use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use parking_lot::Mutex as SyncMutex;
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use awaken_runtime::{ResolveError, RunActivation};
use awaken_server_contract::contract::commit_coordinator::CommitCoordinator;
use awaken_server_contract::contract::event::AgentEvent;
use awaken_server_contract::contract::event_sink::EventSink;
use awaken_server_contract::contract::mailbox::{MailboxStore, RunDispatchStatus};
use awaken_server_contract::contract::message::Message;
use awaken_server_contract::contract::run::{
RunActivationSnapshot, RunInputSnapshot, RunIntent, RunKind, RunOptions, RunTraceContext,
};
use awaken_server_contract::contract::staged_commit::{
OutboxServerEventPublisher, StagedCommitCoordinator,
};
use awaken_server_contract::contract::storage::{RunRecord, StorageError, ThreadRunStore};
use awaken_server_contract::contract::suspension::{ToolCallOutcome, ToolCallResume};
use awaken_server_contract::contract::tool_intercept::{AdapterKind, RunMode};
use crate::transport::channel_sink::ReconnectableEventSink;
const INLINE_CLAIM_GUARD_MS: u64 = 60_000;
#[cfg(not(test))]
const REMOTE_CANCEL_WAIT_MS: u64 = 5_000;
#[cfg(test)]
const REMOTE_CANCEL_WAIT_MS: u64 = 250;
const REMOTE_CANCEL_POLL_MS: u64 = 25;
const DISPATCH_SIGNAL_BATCH_DEFAULT: usize = 32;
const DISPATCH_SIGNAL_EXPIRES_DEFAULT: Duration = Duration::from_millis(500);
const DISPATCH_SIGNAL_ERROR_DELAY: Duration = Duration::from_millis(250);
const DISPATCH_SIGNAL_BLOCKED_NACK_BASE_DELAY_DEFAULT: Duration = Duration::from_millis(500);
const DISPATCH_SIGNAL_BLOCKED_NACK_MAX_DELAY_DEFAULT: Duration = Duration::from_secs(30);
const DISPATCH_SIGNAL_BATCH_ENV: &str = "AWAKEN_DISPATCH_SIGNAL_BATCH_SIZE";
const DISPATCH_SIGNAL_EXPIRES_ENV: &str = "AWAKEN_DISPATCH_SIGNAL_FETCH_EXPIRES_MS";
const DISPATCH_SIGNAL_NACK_BASE_DELAY_ENV: &str = "AWAKEN_DISPATCH_SIGNAL_NACK_BASE_DELAY_MS";
const DISPATCH_SIGNAL_NACK_MAX_DELAY_ENV: &str = "AWAKEN_DISPATCH_SIGNAL_NACK_MAX_DELAY_MS";
const DISPATCH_SIGNAL_MAX_CONCURRENT_HANDLERS_DEFAULT: usize = 32;
const DISPATCH_SIGNAL_MAX_CONCURRENT_HANDLERS_ENV: &str =
"AWAKEN_DISPATCH_SIGNAL_MAX_CONCURRENT_HANDLERS";
const TERMINAL_RECONCILE_BATCH: usize = 100;
const MAILBOX_DEPTH_STATUSES: [RunDispatchStatus; 6] = [
RunDispatchStatus::Queued,
RunDispatchStatus::Claimed,
RunDispatchStatus::Acked,
RunDispatchStatus::Cancelled,
RunDispatchStatus::Superseded,
RunDispatchStatus::DeadLetter,
];
pub(crate) const ACTIVE_RUN_CONFLICT_MESSAGE: &str =
"thread has an active run; cannot claim inline";
pub(super) fn run_activation_snapshot(
request: &RunActivation,
persisted_input: RunInputSnapshot,
resolution_id: Option<String>,
) -> RunActivationSnapshot {
RunActivationSnapshot {
intent: request.intent.clone(),
input: persisted_input,
options: request.options.clone(),
trace: request.trace.clone(),
seeded_decisions: request.control.seeded_decisions.clone(),
resolution_id,
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(super) struct LegacyRunSnapshotExtras {
#[serde(default, skip_serializing_if = "Option::is_none")]
overrides: Option<awaken_server_contract::contract::inference::InferenceOverride>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
decisions: Vec<(
String,
awaken_server_contract::contract::suspension::ToolCallResume,
)>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
frontend_tools: Vec<awaken_server_contract::contract::tool::ToolDescriptor>,
#[serde(default, skip_serializing_if = "Option::is_none")]
continue_run_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
run_id_hint: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
dispatch_id_hint: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_thread_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
transport_request_id: Option<String>,
#[serde(default)]
run_mode: RunMode,
#[serde(default)]
adapter: AdapterKind,
}
impl LegacyRunSnapshotExtras {
#[cfg(test)]
fn from_request(request: &awaken_runtime::RunActivation) -> Self {
Self {
overrides: request.options.overrides.clone(),
decisions: request.control.seeded_decisions.clone(),
frontend_tools: request.options.frontend_tools.clone(),
continue_run_id: request.resume_run_id().map(str::to_owned),
run_id_hint: request.persistence.run_id_hint.clone(),
dispatch_id_hint: request.persistence.dispatch_id_hint.clone(),
parent_thread_id: request.trace.parent_thread_id.clone(),
transport_request_id: request.trace.transport_request_id.clone(),
run_mode: request.trace.run_mode,
adapter: request.trace.adapter,
}
}
#[cfg(test)]
fn to_value(&self) -> Result<Option<serde_json::Value>, serde_json::Error> {
if self.overrides.is_none()
&& self.decisions.is_empty()
&& self.frontend_tools.is_empty()
&& self.continue_run_id.is_none()
&& self.run_id_hint.is_none()
&& self.dispatch_id_hint.is_none()
&& self.parent_thread_id.is_none()
&& self.transport_request_id.is_none()
&& self.run_mode == RunMode::Foreground
&& self.adapter == AdapterKind::Internal
{
Ok(None)
} else {
serde_json::to_value(self).map(Some)
}
}
fn from_value(value: &serde_json::Value) -> Result<Self, serde_json::Error> {
serde_json::from_value(value.clone())
}
#[cfg(test)]
fn apply_to(self, mut request: awaken_runtime::RunActivation) -> awaken_runtime::RunActivation {
if let Some(ov) = self.overrides {
request = request.with_overrides(ov);
}
if !self.decisions.is_empty() {
request = request.with_decisions(self.decisions);
}
if !self.frontend_tools.is_empty() {
request = request.with_frontend_tools(self.frontend_tools);
}
if let Some(crid) = self.continue_run_id {
request = request.with_continue_run_id(crid);
}
if let Some(run_id_hint) = self.run_id_hint {
request = request.with_run_id_hint(run_id_hint);
}
if let Some(dispatch_id_hint) = self.dispatch_id_hint {
request = request.with_dispatch_id_hint(dispatch_id_hint);
}
if let Some(parent_thread_id) = self.parent_thread_id {
request = request.with_parent_thread_id(parent_thread_id);
}
if let Some(transport_request_id) = self.transport_request_id {
request = request.with_transport_request_id(transport_request_id);
}
request
.with_run_mode(self.run_mode)
.with_adapter(self.adapter)
}
}
pub(super) struct LegacyRunRequestSnapshotAdapter {
pub snapshot: awaken_server_contract::contract::storage::RunRequestSnapshot,
pub input: RunInputSnapshot,
pub resolution_id: Option<String>,
pub thread_id: String,
pub agent_id: Option<String>,
pub parent_run_id: Option<String>,
pub extras: Option<LegacyRunSnapshotExtras>,
}
impl TryFrom<LegacyRunRequestSnapshotAdapter> for RunActivationSnapshot {
type Error = String;
fn try_from(value: LegacyRunRequestSnapshotAdapter) -> Result<Self, Self::Error> {
let mut options = RunOptions {
overrides: None,
frontend_tools: value.snapshot.frontend_tools,
};
let mut trace = RunTraceContext {
parent_run_id: value.parent_run_id,
parent_thread_id: value.snapshot.parent_thread_id,
origin: value.snapshot.origin.into(),
adapter: AdapterKind::Internal,
run_mode: RunMode::Resume,
dispatch_id: None,
session_id: None,
transport_request_id: value.snapshot.transport_request_id,
correlation_id: None,
};
let mut kind = RunKind::NewIntent;
let mut seeded_decisions = value
.snapshot
.decisions
.into_iter()
.map(|decision| (decision.call_id, decision.resume))
.collect::<Vec<_>>();
if let Some(extras) = value.extras {
if let Some(overrides) = extras.overrides {
options.overrides = Some(overrides);
}
if !extras.frontend_tools.is_empty() {
options.frontend_tools = extras.frontend_tools;
}
if !extras.decisions.is_empty() {
seeded_decisions = extras.decisions;
}
if let Some(run_id) = extras.continue_run_id {
kind = RunKind::HitlResume { run_id };
}
if let Some(parent_thread_id) = extras.parent_thread_id {
trace.parent_thread_id = Some(parent_thread_id);
}
if let Some(transport_request_id) = extras.transport_request_id {
trace.transport_request_id = Some(transport_request_id);
}
trace.run_mode = extras.run_mode;
trace.adapter = extras.adapter;
}
Ok(RunActivationSnapshot {
intent: RunIntent {
agent_id: value.agent_id,
thread_id: value.thread_id,
kind,
},
input: value.input,
options,
trace,
seeded_decisions,
resolution_id: value.resolution_id,
})
}
}
pub(super) fn legacy_input_snapshot(
run: &RunRecord,
snapshot: &awaken_server_contract::contract::storage::RunRequestSnapshot,
) -> RunInputSnapshot {
if let Some(input) = run.input.as_ref() {
return RunInputSnapshot {
thread_id: input.thread_id.clone(),
range: input.range,
trigger_message_ids: input.trigger_message_ids.clone(),
selected_message_ids: input.selected_message_ids.clone(),
context_policy: input.context_policy.clone(),
compacted_snapshot_id: input.compacted_snapshot_id.clone(),
};
}
RunInputSnapshot {
thread_id: run.thread_id.clone(),
range: None,
trigger_message_ids: snapshot.input_message_ids.clone(),
selected_message_ids: Vec::new(),
context_policy: None,
compacted_snapshot_id: None,
}
}
pub struct TaskDoneMailboxNotify {
mailbox: Arc<Mailbox>,
thread_id: String,
continue_run_id: Option<String>,
}
impl TaskDoneMailboxNotify {
pub fn new(mailbox: Arc<Mailbox>, thread_id: String, continue_run_id: Option<String>) -> Self {
Self {
mailbox,
thread_id,
continue_run_id,
}
}
}
impl awaken_runtime::inbox::OnInboxClosed for TaskDoneMailboxNotify {
fn closed(&self, message: &serde_json::Value) {
let mailbox = self.mailbox.clone();
let thread_id = self.thread_id.clone();
let continue_run_id = self.continue_run_id.clone();
let wake_message = awaken_runtime::inbox::inbox_event_message(message);
tokio::spawn(async move {
let mut request = RunActivation::new(thread_id.clone(), vec![wake_message])
.with_origin(awaken_server_contract::contract::storage::RunRequestOrigin::Internal)
.with_run_mode(RunMode::InternalWake)
.with_adapter(AdapterKind::Internal);
if let Some(run_id) = continue_run_id {
request = request.with_continue_run_id(run_id);
}
if let Err(e) = mailbox.submit_background(request).await {
tracing::warn!(thread_id, error = %e, "failed to enqueue background task wake dispatch");
}
});
}
}
#[derive(Debug, Clone)]
pub struct MailboxSubmitResult {
pub dispatch_id: String,
pub run_id: String,
pub thread_id: String,
pub status: MailboxDispatchStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxDispatchStatus {
Running,
Queued,
}
#[derive(Debug, Error)]
pub enum MailboxError {
#[error("validation error: {0}")]
Validation(String),
#[error("store error: {0}")]
Store(#[from] StorageError),
#[error("resolution error while {context}: {source}")]
Resolution {
context: &'static str,
source: ResolveError,
},
#[error("internal error: {0}")]
Internal(String),
#[error("delivery blocked by barrier: pending '{blocking_pending_id}' must be consumed first")]
DeliveryBlockedByBarrier { blocking_pending_id: String },
}
#[derive(Debug)]
pub enum MailboxRunOutcome {
Completed,
TransientError(String),
PermanentError(String),
}
impl MailboxRunOutcome {
fn metric_label(&self) -> &'static str {
match self {
Self::Completed => "completed",
Self::TransientError(_) => "transient_error",
Self::PermanentError(_) => "permanent_error",
}
}
}
#[derive(Debug, Clone)]
pub struct MailboxConfig {
pub lease_ms: u64,
pub suspended_lease_ms: u64,
pub lease_renewal_interval: Duration,
pub sweep_interval: Duration,
pub gc_interval: Duration,
pub gc_ttl: Duration,
pub default_max_attempts: u32,
pub default_retry_delay_ms: u64,
pub max_retry_delay_ms: u64,
}
impl Default for MailboxConfig {
fn default() -> Self {
Self {
lease_ms: 30_000,
suspended_lease_ms: 600_000,
lease_renewal_interval: Duration::from_secs(10),
sweep_interval: Duration::from_secs(30),
gc_interval: Duration::from_secs(60),
gc_ttl: Duration::from_secs(24 * 60 * 60),
default_max_attempts: 5,
default_retry_delay_ms: 250,
max_retry_delay_ms: 30_000,
}
}
}
pub type MailboxMaintenanceCallback = Arc<dyn Fn() + Send + Sync + 'static>;
#[derive(Clone)]
pub struct MailboxStartupRecoveryConfig {
pub max_attempts: u32,
pub retry_delay: Duration,
}
impl Default for MailboxStartupRecoveryConfig {
fn default() -> Self {
Self {
max_attempts: 1,
retry_delay: Duration::from_millis(250),
}
}
}
#[derive(Clone)]
pub struct MailboxLifecycleConfig {
pub startup_delay: Duration,
pub startup_recovery: MailboxStartupRecoveryConfig,
pub maintenance_callback: Option<MailboxMaintenanceCallback>,
}
impl Default for MailboxLifecycleConfig {
fn default() -> Self {
Self {
startup_delay: Duration::ZERO,
startup_recovery: MailboxStartupRecoveryConfig::default(),
maintenance_callback: None,
}
}
}
#[derive(Clone)]
pub struct MailboxLifecycleHandle {
tasks: Arc<StdMutex<Option<MailboxLifecycleTasks>>>,
transition_lock: Arc<Mutex<()>>,
}
impl MailboxLifecycleHandle {
pub fn abort(&self) {
if let Some(tasks) = self.tasks.lock().expect("lifecycle lock poisoned").take() {
tasks.abort();
}
}
pub async fn shutdown(&self) -> Result<(), MailboxError> {
let _transition_guard = self.transition_lock.lock().await;
let tasks = self.tasks.lock().expect("lifecycle lock poisoned").take();
if let Some(tasks) = tasks {
tasks.shutdown().await?;
}
Ok(())
}
pub fn is_running(&self) -> bool {
self.tasks
.lock()
.expect("lifecycle lock poisoned")
.is_some()
}
}
struct MailboxLifecycleTasks {
recover_task: Option<JoinHandle<()>>,
dispatch_signal_task: Option<JoinHandle<()>>,
maintenance_task: JoinHandle<()>,
}
impl MailboxLifecycleTasks {
fn abort(self) {
if let Some(task) = self.recover_task {
task.abort();
}
if let Some(task) = self.dispatch_signal_task {
task.abort();
}
self.maintenance_task.abort();
}
async fn shutdown(self) -> Result<(), MailboxError> {
if let Some(task) = self.recover_task {
task.abort();
await_lifecycle_task("mailbox startup recovery", task).await?;
}
if let Some(task) = self.dispatch_signal_task {
task.abort();
await_lifecycle_task("mailbox dispatch signal loop", task).await?;
}
self.maintenance_task.abort();
await_lifecycle_task("mailbox maintenance", self.maintenance_task).await
}
}
async fn await_lifecycle_task(name: &str, task: JoinHandle<()>) -> Result<(), MailboxError> {
match task.await {
Ok(()) => Ok(()),
Err(error) if error.is_cancelled() => Ok(()),
Err(error) if error.is_panic() => Err(MailboxError::Internal(format!("{name} panicked"))),
Err(error) => Err(MailboxError::Internal(format!("{name} failed: {error}"))),
}
}
enum MailboxWorkerStatus {
Idle,
Claiming,
Running {
dispatch_id: String,
run_id: String,
lease_handle: JoinHandle<()>,
sink: Arc<ReconnectableEventSink>,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchAttempt {
Claimed,
Busy,
NoEligible,
TransientError,
}
impl DispatchAttempt {
fn started_execution(self) -> bool {
matches!(self, DispatchAttempt::Claimed)
}
}
struct ThreadContext {
messages: Vec<Message>,
latest_run: Option<RunRecord>,
run_cache: HashMap<String, RunRecord>,
}
impl ThreadContext {
async fn load(run_store: &dyn ThreadRunStore, thread_id: &str) -> Result<Self, MailboxError> {
let messages = run_store
.load_messages(thread_id)
.await?
.unwrap_or_default();
let latest_run = run_store.latest_run(thread_id).await?;
let mut run_cache = HashMap::new();
if let Some(ref run) = latest_run {
run_cache.insert(run.run_id.clone(), run.clone());
}
Ok(Self {
messages,
latest_run,
run_cache,
})
}
fn get_run(&self, run_id: &str) -> Option<&RunRecord> {
self.run_cache.get(run_id)
}
fn apply_checkpoint(&mut self, messages: &[Message], run: &RunRecord) {
self.messages = messages.to_vec();
self.latest_run = Some(run.clone());
self.run_cache.insert(run.run_id.clone(), run.clone());
}
}
struct MailboxWorker {
status: MailboxWorkerStatus,
thread_ctx: Option<ThreadContext>,
}
impl Default for MailboxWorker {
fn default() -> Self {
Self {
status: MailboxWorkerStatus::Idle,
thread_ctx: None,
}
}
}
struct SuspensionAwareSink {
inner: Arc<dyn EventSink>,
suspended: Arc<AtomicBool>,
}
#[async_trait]
impl EventSink for SuspensionAwareSink {
async fn emit(&self, event: AgentEvent) {
if matches!(
&event,
AgentEvent::ToolCallDone {
outcome: ToolCallOutcome::Suspended,
..
}
) {
self.suspended.store(true, Ordering::Release);
}
if matches!(&event, AgentEvent::ToolCallResumed { .. }) {
self.suspended.store(false, Ordering::Release);
}
self.inner.emit(event).await;
}
async fn close(&self) {
self.inner.close().await;
}
}
struct ActiveRunGuard;
impl Drop for ActiveRunGuard {
fn drop(&mut self) {
crate::metrics::dec_active_runs();
}
}
pub struct Mailbox {
executor: Arc<dyn RunDispatchExecutor>,
store: Arc<dyn MailboxStore>,
coordinator: Arc<dyn CommitCoordinator>,
staged_coordinator: Option<Arc<dyn StagedCommitCoordinator>>,
run_store: Arc<dyn ThreadRunStore>,
pending_thread_run_store: Option<Arc<dyn awaken_stores::PendingThreadRunStore>>,
consumer_id: String,
workers: RwLock<HashMap<String, Arc<SyncMutex<MailboxWorker>>>>,
config: MailboxConfig,
runtime_event_capture: Option<RuntimeEventCaptureConfig>,
server_event_publisher: Option<Arc<dyn OutboxServerEventPublisher>>,
server_event_origin: String,
lifecycle_tasks: Arc<StdMutex<Option<MailboxLifecycleTasks>>>,
lifecycle_start_lock: Arc<Mutex<()>>,
thread_append_locks: Box<[Mutex<()>]>,
checkpoint_repair_queue: Arc<StdMutex<VecDeque<checkpoint_repair::CheckpointRepairTask>>>,
pinned_registry: Option<PinnedRegistrySource>,
}
#[derive(Clone)]
struct PinnedRegistrySource {
store: Arc<dyn awaken_server_contract::VersionedRegistryStore>,
scope: awaken_server_contract::ScopeId,
}
impl Mailbox {
pub fn new(
executor: impl IntoDispatchExecutor,
store: Arc<dyn MailboxStore>,
run_store: Arc<dyn ThreadRunStore>,
consumer_id: String,
config: MailboxConfig,
) -> Self {
Self::try_new(executor, store, run_store, consumer_id, config)
.expect("Mailbox requires a CommitCoordinator outside unit-test fallback")
}
pub fn try_new(
executor: impl IntoDispatchExecutor,
store: Arc<dyn MailboxStore>,
run_store: Arc<dyn ThreadRunStore>,
consumer_id: String,
config: MailboxConfig,
) -> Result<Self, MailboxError> {
let executor = executor.into_dispatch_executor();
let staged_coordinator = executor.staged_commit_coordinator();
let coordinator = if let Some(coordinator) = executor.commit_coordinator() {
coordinator
} else if cfg!(test) {
tracing::warn!(
"using unit-test MailboxRunStoreCoordinator fallback; non-test executors must \
provide a CommitCoordinator"
);
Arc::new(MailboxRunStoreCoordinator::new(Arc::clone(&run_store)))
as Arc<dyn CommitCoordinator>
} else {
return Err(MailboxError::Internal(
"Mailbox requires a CommitCoordinator; wire a durable \
Memory/File/Pg coordinator through the runtime"
.to_string(),
));
};
if let (Some(coordinator_identity), Some(run_store_identity)) = (
coordinator.thread_run_storage_identity(),
run_store.thread_run_storage_identity(),
) && coordinator_identity != run_store_identity
{
return Err(MailboxError::Validation(format!(
"mailbox run_store must match executor CommitCoordinator thread/run store \
(coordinator={coordinator_identity}, run_store={run_store_identity})"
)));
}
Ok(Self {
executor,
store,
coordinator,
staged_coordinator,
run_store,
pending_thread_run_store: None,
consumer_id,
workers: RwLock::new(HashMap::new()),
config,
runtime_event_capture: None,
server_event_publisher: None,
server_event_origin: "mailbox".to_string(),
lifecycle_tasks: Arc::new(StdMutex::new(None)),
lifecycle_start_lock: Arc::new(Mutex::new(())),
thread_append_locks: (0..Self::THREAD_APPEND_STRIPES)
.map(|_| Mutex::new(()))
.collect(),
checkpoint_repair_queue: Arc::new(StdMutex::new(VecDeque::new())),
pinned_registry: None,
})
}
#[must_use]
pub fn with_pinned_registry(
self,
store: Arc<dyn awaken_server_contract::VersionedRegistryStore>,
scope: impl Into<String>,
) -> Self {
self.try_with_pinned_registry(store, scope)
.expect("pinned registry scope_id must be valid")
}
pub fn try_with_pinned_registry(
mut self,
store: Arc<dyn awaken_server_contract::VersionedRegistryStore>,
scope: impl Into<String>,
) -> Result<Self, awaken_server_contract::ScopeError> {
let scope = awaken_server_contract::ScopeId::new(scope.into())?;
self.pinned_registry = Some(PinnedRegistrySource { store, scope });
Ok(self)
}
async fn materialize_pinned_registry_set(
&self,
resolution_id: &str,
) -> Result<Option<awaken_runtime::registry::RegistrySet>, MailboxError> {
let Some(source) = self.pinned_registry.as_ref() else {
return Ok(None);
};
let snapshot_version = resolution_id.parse::<u64>().map_err(|error| {
MailboxError::Internal(format!(
"invalid pinned registry resolution id '{resolution_id}': {error}"
))
})?;
let live = self.executor.live_registry_set().ok_or_else(|| {
MailboxError::Internal(
"pinned registry materialization requires a live registry snapshot".to_string(),
)
})?;
let materializer = crate::services::frozen_registry::FrozenAgentRegistryMaterializer::new(
source.store.clone(),
);
let frozen = materializer
.materialize(awaken_server_contract::VersionSelector::Publication {
scope_id: source.scope.as_str().to_string(),
snapshot_version,
})
.await
.map_err(|error| {
MailboxError::Internal(format!(
"failed to materialize pinned registry publication {snapshot_version}: {error}"
))
})?;
Ok(Some(frozen.to_registry_set(&live)))
}
const THREAD_APPEND_STRIPES: usize = 256;
const EVENT_CHANNEL_CAPACITY: usize = 256;
pub fn thread_run_store(&self) -> &Arc<dyn ThreadRunStore> {
&self.run_store
}
pub fn coordinator(&self) -> &Arc<dyn CommitCoordinator> {
&self.coordinator
}
pub fn send_decision(&self, id: &str, tool_call_id: String, resume: ToolCallResume) -> bool {
self.executor.send_decision(id, tool_call_id, resume)
}
}
mod cancel;
mod checkpoint;
mod checkpoint_repair;
mod coordinator_facade;
mod decision;
mod dispatch_execution;
mod helpers;
mod lifecycle;
mod live_delivery;
mod pending_delivery;
mod runtime_event_capture;
mod server_event_capture;
mod signal_loop;
mod staging_coordinator;
mod submit;
use self::coordinator_facade::MailboxRunStoreCoordinator;
use self::{helpers::*, runtime_event_capture::RuntimeEventCaptureConfig};
pub use crate::run_dispatch::RunDispatchExecutor;
pub use coordinator_facade::IntoDispatchExecutor;
#[cfg(test)]
mod tests;