use std::pin::Pin;
use std::time::Duration;
use async_trait::async_trait;
use futures::Stream;
use serde::{Deserialize, Serialize};
use super::lifecycle::{RunStatus, TerminationReason};
use super::message::Message;
use super::storage::StorageError;
use super::suspension::ToolCallResume;
/// Status of a [`RunDispatch`] record in the mailbox.
///
/// `is_terminal` (implemented on this enum) classifies `Acked`, `Cancelled`,
/// `Superseded` and `DeadLetter` as terminal; `Queued` and `Claimed` are not.
/// No serde rename attribute is applied here, so each variant serializes under
/// its Rust name; renaming a variant changes the serialized form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunDispatchStatus {
/// Non-terminal. Presumably: enqueued and waiting to be claimed — confirm against store implementations.
Queued,
/// Non-terminal. Presumably: currently held by a consumer under a lease — confirm against store implementations.
Claimed,
/// Terminal. Presumably reached through `MailboxStore::ack` — confirm.
Acked,
/// Terminal. Presumably reached through `MailboxStore::cancel` — confirm.
Cancelled,
/// Terminal. Presumably reached through `MailboxStore::interrupt` / `supersede_claimed` — confirm.
Superseded,
/// Terminal. Presumably reached through `MailboxStore::dead_letter` — confirm.
DeadLetter,
}
impl RunDispatchStatus {
    /// Returns `true` for the statuses this module treats as terminal
    /// (`Acked`, `Cancelled`, `Superseded`, `DeadLetter`) and `false` for
    /// `Queued` and `Claimed`.
    ///
    /// The match is deliberately exhaustive (no wildcard arm): adding a new
    /// variant to the enum becomes a compile error here instead of being
    /// silently classified as non-terminal.
    #[must_use]
    pub const fn is_terminal(self) -> bool {
        match self {
            Self::Queued | Self::Claimed => false,
            Self::Acked | Self::Cancelled | Self::Superseded | Self::DeadLetter => true,
        }
    }
}
/// Result of a run, in the shape accepted by `MailboxStore::record_run_result`.
///
/// The three optional fields default to `None` when missing from the input and
/// are left out of the serialized output when they are `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunDispatchResult {
/// Identifier of the run this result belongs to.
pub run_id: String,
/// Identifier of the dispatch instance that produced the result
/// (presumably the value passed to `MailboxStore::record_dispatch_start` — confirm).
pub dispatch_instance_id: String,
/// Run status reported with the result.
pub status: RunStatus,
/// Termination reason, when one was recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub termination: Option<TerminationReason>,
/// Response text, when one was recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub response: Option<String>,
/// Error text, when one was recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// A single dispatch record stored in the mailbox.
///
/// All `u64` time fields are caller-supplied timestamps; their unit is not
/// stated in this file. NOTE(review): the `lease_ms` / `extension_ms`
/// parameters on `MailboxStore` suggest epoch milliseconds — confirm.
///
/// Fields carrying `#[serde(default, skip_serializing_if = ...)]` may be absent
/// from the input (they then deserialize to `None`) and are omitted from the
/// output when `None`. The other `Option` fields have no `skip_serializing_if`
/// and are therefore always written, as `null` when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunDispatch {
/// Identifier of this dispatch; the key taken by the per-dispatch `MailboxStore` methods.
pub dispatch_id: String,
/// Identifier of the thread this dispatch belongs to.
pub thread_id: String,
/// Identifier of the run associated with this dispatch.
pub run_id: String,
/// Priority value. Whether higher or lower wins is not defined in this file — confirm with store implementations.
pub priority: u8,
/// Optional deduplication key; how it is applied is up to the store (not shown here).
pub dedupe_key: Option<String>,
/// Dispatch epoch recorded on this dispatch (compare `MailboxStore::current_dispatch_epoch`).
pub dispatch_epoch: u64,
/// Current status of the dispatch.
pub status: RunDispatchStatus,
/// Timestamp associated with availability; presumably the earliest time the dispatch may be claimed — confirm.
pub available_at: u64,
/// Attempt counter; when it is incremented is decided by the store (not shown here).
pub attempt_count: u32,
/// Attempt limit; enforcement is decided by the store (not shown here).
pub max_attempts: u32,
/// Most recent error text, if any.
pub last_error: Option<String>,
/// Claim token, if any. The token is what `ack` / `nack` / `extend_lease` and related methods take as `claim_token`.
pub claim_token: Option<String>,
/// Identifier of the claiming consumer, if any.
pub claimed_by: Option<String>,
/// Lease expiry timestamp, if any.
pub lease_until: Option<u64>,
/// Dispatch instance identifier, if recorded (compare `MailboxStore::record_dispatch_start`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dispatch_instance_id: Option<String>,
/// Run status, if recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub run_status: Option<RunStatus>,
/// Termination reason, if recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub termination: Option<TerminationReason>,
/// Run response text, if recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub run_response: Option<String>,
/// Run error text, if recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub run_error: Option<String>,
/// Completion timestamp, if recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at: Option<u64>,
/// Creation timestamp.
pub created_at: u64,
/// Last-update timestamp.
pub updated_at: u64,
}
/// Summary returned by `MailboxStore::interrupt`.
///
/// Carries the same first three fields as [`MailboxInterruptDetails`] but not
/// the list of superseded dispatches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailboxInterrupt {
/// Dispatch epoch reported by the interrupt (presumably the thread's epoch after the interrupt — confirm).
pub new_dispatch_epoch: u64,
/// A dispatch reported as active, if any (presumably one that was claimed at interrupt time — confirm).
pub active_dispatch: Option<RunDispatch>,
/// Number of dispatches reported as superseded.
pub superseded_count: usize,
}
/// Detailed interrupt result returned by `MailboxStore::interrupt_detailed`.
///
/// Extends [`MailboxInterrupt`] with the superseded dispatch records.
/// `superseded_dispatches` is not guaranteed to have `superseded_count`
/// entries: converting from a [`MailboxInterrupt`] fills it with an empty
/// vector, and it deserializes to empty when the field is missing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailboxInterruptDetails {
/// Dispatch epoch reported by the interrupt (presumably the thread's epoch after the interrupt — confirm).
pub new_dispatch_epoch: u64,
/// A dispatch reported as active, if any.
pub active_dispatch: Option<RunDispatch>,
/// Number of dispatches reported as superseded.
pub superseded_count: usize,
/// Superseded dispatch records; defaults to empty when absent from the input.
#[serde(default)]
pub superseded_dispatches: Vec<RunDispatch>,
}
impl MailboxInterruptDetails {
    /// Consumes the details and keeps only the summary fields; the
    /// `superseded_dispatches` list is dropped.
    #[must_use]
    pub fn into_summary(self) -> MailboxInterrupt {
        let Self {
            new_dispatch_epoch,
            active_dispatch,
            superseded_count,
            ..
        } = self;
        MailboxInterrupt {
            new_dispatch_epoch,
            active_dispatch,
            superseded_count,
        }
    }

    /// Builds a summary without consuming `self`; only `active_dispatch` is
    /// cloned, the `superseded_dispatches` list is not touched.
    #[must_use]
    pub fn summary(&self) -> MailboxInterrupt {
        let Self {
            new_dispatch_epoch,
            active_dispatch,
            superseded_count,
            ..
        } = self;
        MailboxInterrupt {
            new_dispatch_epoch: *new_dispatch_epoch,
            active_dispatch: active_dispatch.clone(),
            superseded_count: *superseded_count,
        }
    }
}
impl From<MailboxInterrupt> for MailboxInterruptDetails {
fn from(interrupt: MailboxInterrupt) -> Self {
Self {
new_dispatch_epoch: interrupt.new_dispatch_epoch,
active_dispatch: interrupt.active_dispatch,
superseded_count: interrupt.superseded_count,
superseded_dispatches: Vec::new(),
}
}
}
impl From<MailboxInterruptDetails> for MailboxInterrupt {
fn from(details: MailboxInterruptDetails) -> Self {
details.into_summary()
}
}
/// Command that can be delivered to a live run through
/// `MailboxStore::deliver_live` / `deliver_live_to`.
///
/// Marked `#[non_exhaustive]`: code in other crates must include a wildcard
/// arm when matching, so variants can be added later.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum LiveRunCommand {
/// Carries a batch of messages for the run.
Messages(Vec<Message>),
/// Carries no payload; presumably asks the run to stop — confirm with the consumer of this command.
Cancel,
/// Carries `(String, ToolCallResume)` pairs.
/// NOTE(review): the `String` is presumably the tool-call identifier being resumed — confirm.
Decision(Vec<(String, ToolCallResume)>),
}
/// Addressing information for a live run, taken by
/// `MailboxStore::deliver_live_to` and `MailboxStore::open_live_channel_for`.
///
/// Note that the default implementations of those two methods only use
/// `thread_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LiveRunTarget {
/// Identifier of the thread.
pub thread_id: String,
/// Identifier of the run.
pub run_id: String,
/// Optional dispatch identifier; deserializes to `None` when absent and is
/// omitted from the output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dispatch_id: Option<String>,
}
impl LiveRunTarget {
    /// Creates a target for the given thread and run, with no dispatch id set.
    #[must_use]
    pub fn new(thread_id: impl Into<String>, run_id: impl Into<String>) -> Self {
        let thread_id: String = thread_id.into();
        let run_id: String = run_id.into();
        Self {
            thread_id,
            run_id,
            dispatch_id: None,
        }
    }

    /// Builder-style setter: returns the target with `dispatch_id` set to the
    /// given value, replacing any previous one.
    #[must_use]
    pub fn with_dispatch_id(self, dispatch_id: impl Into<String>) -> Self {
        Self {
            dispatch_id: Some(dispatch_id.into()),
            ..self
        }
    }
}
/// Receipt that accompanies a live command (see `LiveRunCommandEntry::receipt`).
///
/// Implementations must be `Send + Sync`.
pub trait LiveCommandReceipt: Send + Sync {
/// Acknowledges the command, consuming the boxed receipt. Synchronous and
/// infallible by signature. NOTE(review): what acknowledgement means to the
/// transport is implementation-defined and not shown in this file.
fn ack(self: Box<Self>);
}
/// A live command paired with its receipt; the item type of
/// [`LiveRunCommandStream`].
pub struct LiveRunCommandEntry {
/// The command that was received.
pub command: LiveRunCommand,
/// Receipt for the command; call `ack` on it to acknowledge.
pub receipt: Box<dyn LiveCommandReceipt>,
}
impl std::fmt::Debug for LiveRunCommandEntry {
    /// Formats the entry showing only `command`. The receipt is a trait
    /// object with no `Debug` bound, so it is elided and the output is marked
    /// non-exhaustive (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { command, receipt: _ } = self;
        let mut builder = f.debug_struct("LiveRunCommandEntry");
        builder.field("command", command);
        builder.finish_non_exhaustive()
    }
}
/// Pinned, boxed, `Send` stream of [`LiveRunCommandEntry`] items; the return
/// type of `MailboxStore::open_live_channel` and `open_live_channel_for`.
pub type LiveRunCommandStream = Pin<Box<dyn Stream<Item = LiveRunCommandEntry> + Send>>;
/// Outcome reported by `MailboxStore::deliver_live` / `deliver_live_to`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LiveDeliveryOutcome {
/// The command was handed to a subscriber (as reported by the store).
Delivered,
/// Nothing was listening. This is what the default `deliver_live`
/// implementation always returns.
NoSubscriber,
}
/// Receipt attached to a dispatch signal (see `DispatchSignalEntry::receipt`).
///
/// Each settling method consumes the boxed receipt, so a receipt can be
/// settled at most once.
#[async_trait]
pub trait DispatchSignalReceipt: Send + Sync {
/// Number of redelivery attempts, when the transport reports one.
/// The default implementation returns `None`.
fn redelivery_attempts(&self) -> Option<u64> {
None
}
/// Acknowledges the signal.
///
/// # Errors
/// Returns the `StorageError` produced by the implementation.
async fn ack(self: Box<Self>) -> Result<(), StorageError>;
/// Negatively acknowledges the signal (presumably requesting redelivery — confirm with implementations).
///
/// # Errors
/// Returns the `StorageError` produced by the implementation.
async fn nack(self: Box<Self>) -> Result<(), StorageError>;
/// Negatively acknowledges the signal with a requested delay.
///
/// The default implementation ignores `delay` and simply calls `nack`;
/// implementations that can honour a delay should override it.
///
/// # Errors
/// Returns the `StorageError` produced by `nack` (default) or by the override.
async fn nack_with_delay(self: Box<Self>, delay: Duration) -> Result<(), StorageError> {
// Deliberately unused in the default implementation.
let _ = delay;
self.nack().await
}
}
/// One dispatch signal together with its receipt; the element type returned
/// by `MailboxStore::pull_dispatch_signals`.
pub struct DispatchSignalEntry {
/// Identifier of the thread the signal refers to.
pub thread_id: String,
/// Identifier of the dispatch the signal refers to.
pub dispatch_id: String,
/// Receipt used to `ack` / `nack` the signal.
pub receipt: Box<dyn DispatchSignalReceipt>,
}
impl std::fmt::Debug for DispatchSignalEntry {
    /// Formats the entry showing `thread_id` and `dispatch_id`. The receipt is
    /// a trait object with no `Debug` bound, so it is elided and the output is
    /// marked non-exhaustive (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            thread_id,
            dispatch_id,
            receipt: _,
        } = self;
        let mut builder = f.debug_struct("DispatchSignalEntry");
        builder.field("thread_id", thread_id);
        builder.field("dispatch_id", dispatch_id);
        builder.finish_non_exhaustive()
    }
}
/// Storage backend contract for run dispatches (the "mailbox").
///
/// Methods without a body must be provided by every implementation. Methods
/// with a default body are optional capabilities; what each default does is
/// documented on the method.
///
/// `now`, `retry_at` and `older_than` are caller-supplied `u64` timestamps.
/// Their unit is not stated in this file. NOTE(review): `lease_ms` and
/// `extension_ms` suggest epoch milliseconds — confirm.
///
/// Unless noted otherwise, every method returns `Err(StorageError)` when the
/// implementation fails; the precise error conditions are implementation-defined.
#[async_trait]
pub trait MailboxStore: Send + Sync {
/// Adds `dispatch` to the mailbox.
async fn enqueue(&self, dispatch: &RunDispatch) -> Result<(), StorageError>;
/// Claims dispatches of `thread_id` on behalf of `consumer_id` and returns
/// the claimed records. Presumably at most `limit` records are returned and
/// the lease lasts `lease_ms` from `now` — confirm with implementations.
async fn claim(
&self,
thread_id: &str,
consumer_id: &str,
lease_ms: u64,
now: u64,
limit: usize,
) -> Result<Vec<RunDispatch>, StorageError>;
/// Claims the single dispatch `dispatch_id` on behalf of `consumer_id`.
/// Returns `Ok(None)` when nothing was claimed (presumably: missing or not
/// claimable — confirm with implementations).
async fn claim_dispatch(
&self,
dispatch_id: &str,
consumer_id: &str,
lease_ms: u64,
now: u64,
) -> Result<Option<RunDispatch>, StorageError>;
/// Acknowledges the dispatch `dispatch_id` held under `claim_token`.
async fn ack(&self, dispatch_id: &str, claim_token: &str, now: u64)
-> Result<(), StorageError>;
/// Records that the dispatch held under `claim_token` has started, tagging
/// it with `dispatch_instance_id`.
async fn record_dispatch_start(
&self,
dispatch_id: &str,
claim_token: &str,
dispatch_instance_id: &str,
now: u64,
) -> Result<(), StorageError>;
/// Records `result` for the dispatch held under `claim_token`.
async fn record_run_result(
&self,
dispatch_id: &str,
claim_token: &str,
result: &RunDispatchResult,
now: u64,
) -> Result<(), StorageError>;
/// Negatively acknowledges the dispatch held under `claim_token`, passing
/// the failure text in `error` and a retry timestamp in `retry_at`
/// (presumably the earliest time it may be claimed again — confirm).
async fn nack(
&self,
dispatch_id: &str,
claim_token: &str,
retry_at: u64,
error: &str,
now: u64,
) -> Result<(), StorageError>;
/// Dead-letters the dispatch held under `claim_token`, passing the failure
/// text in `error`.
async fn dead_letter(
&self,
dispatch_id: &str,
claim_token: &str,
error: &str,
now: u64,
) -> Result<(), StorageError>;
/// Cancels the dispatch `dispatch_id`. No claim token is required.
/// Returns the affected record, or `Ok(None)` when nothing was cancelled
/// (presumably: missing or not cancellable — confirm with implementations).
async fn cancel(
&self,
dispatch_id: &str,
now: u64,
) -> Result<Option<RunDispatch>, StorageError>;
/// Requests a lease extension of `extension_ms` for the dispatch held under
/// `claim_token`. The `bool` presumably reports whether the lease was
/// extended — confirm with implementations.
async fn extend_lease(
&self,
dispatch_id: &str,
claim_token: &str,
extension_ms: u64,
now: u64,
) -> Result<bool, StorageError>;
/// Interrupts `thread_id` and returns a summary of the outcome.
async fn interrupt(&self, thread_id: &str, now: u64) -> Result<MailboxInterrupt, StorageError>;
/// Interrupts `thread_id` and returns the detailed outcome.
///
/// The default implementation calls `interrupt` and converts the summary,
/// so `superseded_dispatches` is always empty unless an implementation
/// overrides this method.
async fn interrupt_detailed(
&self,
thread_id: &str,
now: u64,
) -> Result<MailboxInterruptDetails, StorageError> {
self.interrupt(thread_id, now).await.map(Into::into)
}
/// Returns the current dispatch epoch of `thread_id`.
///
/// The default implementation ignores `thread_id` and returns `Ok(0)`.
async fn current_dispatch_epoch(&self, thread_id: &str) -> Result<u64, StorageError> {
let _ = thread_id;
Ok(0)
}
/// Supersedes a dispatch that is currently held under `claim_token`,
/// passing the explanation in `reason`.
///
/// The default implementation ignores its arguments and always returns
/// `Err(StorageError::Io(..))` stating that the operation is unsupported.
async fn supersede_claimed(
&self,
dispatch_id: &str,
claim_token: &str,
now: u64,
reason: &str,
) -> Result<Option<RunDispatch>, StorageError> {
let _ = (dispatch_id, claim_token, now, reason);
Err(StorageError::Io(
"supersede claimed dispatch is not supported by this mailbox store".into(),
))
}
/// Loads the dispatch `dispatch_id`; `Ok(None)` presumably means it does
/// not exist — confirm with implementations.
async fn load_dispatch(&self, dispatch_id: &str) -> Result<Option<RunDispatch>, StorageError>;
/// Lists dispatches of `thread_id`, paged by `limit` / `offset`.
/// `status_filter` optionally restricts the statuses; presumably `None`
/// means "all statuses" — confirm with implementations.
async fn list_dispatches(
&self,
thread_id: &str,
status_filter: Option<&[RunDispatchStatus]>,
limit: usize,
offset: usize,
) -> Result<Vec<RunDispatch>, StorageError>;
/// Counts dispatches with the given `status`.
///
/// The default implementation ignores `status` and always returns
/// `Err(StorageError::Io(..))` stating that the operation is unsupported.
async fn count_dispatches_by_status(
&self,
status: RunDispatchStatus,
) -> Result<usize, StorageError> {
let _ = status;
Err(StorageError::Io(
"count dispatches by status is not supported by this mailbox store".into(),
))
}
/// Lists dispatches in a terminal status, paged by `limit` / `offset`.
///
/// The default implementation ignores its arguments and always returns
/// `Err(StorageError::Io(..))` stating that the operation is unsupported.
async fn list_terminal_dispatches(
&self,
limit: usize,
offset: usize,
) -> Result<Vec<RunDispatch>, StorageError> {
let _ = (limit, offset);
Err(StorageError::Io(
"list terminal dispatches is not supported by this mailbox store".into(),
))
}
/// Reclaims dispatches whose lease has expired as of `now` and returns the
/// affected records (presumably at most `limit` — confirm with implementations).
async fn reclaim_expired_leases(
&self,
now: u64,
limit: usize,
) -> Result<Vec<RunDispatch>, StorageError>;
/// Purges terminal dispatches relative to the `older_than` timestamp and
/// returns a count (presumably the number of records removed — confirm).
async fn purge_terminal(&self, older_than: u64) -> Result<usize, StorageError>;
/// Returns thread identifiers (presumably those that currently have queued
/// dispatches — confirm with implementations).
async fn queued_thread_ids(&self) -> Result<Vec<String>, StorageError>;
/// Whether this store supports dispatch signals. Defaults to `false`.
fn supports_dispatch_signals(&self) -> bool {
false
}
/// Pulls pending dispatch signals.
///
/// The default implementation ignores `max` and `expires` and returns an
/// empty vector. NOTE(review): `max` is presumably a batch-size cap and
/// `expires` a wait timeout — confirm with overriding implementations.
async fn pull_dispatch_signals(
&self,
max: usize,
expires: Duration,
) -> Result<Vec<DispatchSignalEntry>, StorageError> {
let _ = (max, expires);
Ok(Vec::new())
}
/// Delivers `cmd` to a live run on `thread_id`.
///
/// The default implementation drops the command and reports
/// `LiveDeliveryOutcome::NoSubscriber`.
async fn deliver_live(
&self,
thread_id: &str,
cmd: LiveRunCommand,
) -> Result<LiveDeliveryOutcome, StorageError> {
let _ = (thread_id, cmd);
Ok(LiveDeliveryOutcome::NoSubscriber)
}
/// Delivers `cmd` to the run described by `target`.
///
/// The default implementation forwards to `deliver_live` using only
/// `target.thread_id`; `run_id` and `dispatch_id` are not consulted.
async fn deliver_live_to(
&self,
target: &LiveRunTarget,
cmd: LiveRunCommand,
) -> Result<LiveDeliveryOutcome, StorageError> {
self.deliver_live(&target.thread_id, cmd).await
}
/// Opens the stream of live commands for `thread_id`.
///
/// The default implementation ignores `thread_id` and returns an empty
/// stream (it yields no items and ends immediately).
async fn open_live_channel(
&self,
thread_id: &str,
) -> Result<LiveRunCommandStream, StorageError> {
let _ = thread_id;
Ok(Box::pin(futures::stream::empty()))
}
/// Opens the stream of live commands for the run described by `target`.
///
/// The default implementation forwards to `open_live_channel` using only
/// `target.thread_id`; `run_id` and `dispatch_id` are not consulted.
async fn open_live_channel_for(
&self,
target: &LiveRunTarget,
) -> Result<LiveRunCommandStream, StorageError> {
self.open_live_channel(&target.thread_id).await
}
}
// Unit and property tests for the data types of this module: terminal-status
// classification and serde round-trips. The `MailboxStore` trait itself is not
// exercised here.
#[cfg(test)]
mod tests {
use super::*;
// Property-based tests (proptest).
mod proptest_mailbox {
use super::*;
use proptest::prelude::*;
// Strategy that yields each of the six status variants.
fn arb_dispatch_status() -> impl Strategy<Value = RunDispatchStatus> {
prop_oneof![
Just(RunDispatchStatus::Queued),
Just(RunDispatchStatus::Claimed),
Just(RunDispatchStatus::Acked),
Just(RunDispatchStatus::Cancelled),
Just(RunDispatchStatus::Superseded),
Just(RunDispatchStatus::DeadLetter),
]
}
// Strategy that builds a `RunDispatch` with fixed identifiers and randomized
// numeric fields. Claim-related fields (`claim_token`, `claimed_by`,
// `lease_until`) are populated only when the generated status is `Claimed`.
fn arb_dispatch() -> impl Strategy<Value = RunDispatch> {
(
arb_dispatch_status(),
0u32..100,
0u64..u64::MAX,
0u64..u64::MAX,
0u8..=255u8,
1u32..20,
0u64..1_000_000,
)
.prop_map(
|(
status,
attempt_count,
created_at,
available_at,
priority,
max_attempts,
dispatch_epoch,
)| {
let claim_token = match status {
RunDispatchStatus::Claimed => Some("token-123".to_string()),
_ => None,
};
let claimed_by = match status {
RunDispatchStatus::Claimed => Some("consumer-1".to_string()),
_ => None,
};
RunDispatch {
dispatch_id: "dispatch-prop".to_string(),
thread_id: "thread-prop".to_string(),
run_id: "run-prop".to_string(),
priority,
dedupe_key: None,
dispatch_epoch,
status,
available_at,
attempt_count,
max_attempts,
last_error: None,
claim_token,
claimed_by,
// saturating_add keeps the lease computation from overflowing for
// `created_at` values close to `u64::MAX`.
lease_until: if status == RunDispatchStatus::Claimed {
Some(created_at.saturating_add(30_000))
} else {
None
},
dispatch_instance_id: None,
run_status: None,
termination: None,
run_response: None,
run_error: None,
completed_at: None,
created_at,
updated_at: created_at,
}
},
)
}
proptest! {
// `is_terminal` agrees with an independently written list of terminal variants.
#[test]
fn terminal_status_is_terminal(status in arb_dispatch_status()) {
let expected_terminal = matches!(
status,
RunDispatchStatus::Acked
| RunDispatchStatus::Cancelled
| RunDispatchStatus::Superseded
| RunDispatchStatus::DeadLetter
);
prop_assert_eq!(status.is_terminal(), expected_terminal);
}
// Checks the generator's own invariant: a generated `Claimed` dispatch carries a token.
#[test]
fn claimed_dispatch_always_has_claim_token(dispatch in arb_dispatch()) {
if dispatch.status == RunDispatchStatus::Claimed {
prop_assert!(
dispatch.claim_token.is_some(),
"Claimed dispatch must have a claim_token"
);
}
}
// Checks the generator's own invariant: a generated `Queued` dispatch carries no token.
#[test]
fn queued_dispatch_never_has_claim_token(dispatch in arb_dispatch()) {
if dispatch.status == RunDispatchStatus::Queued {
prop_assert!(
dispatch.claim_token.is_none(),
"Queued dispatch must not have a claim_token"
);
}
}
// JSON round-trip preserves the compared subset of `RunDispatch` fields.
#[test]
fn run_dispatch_serde_roundtrip(dispatch in arb_dispatch()) {
let json = serde_json::to_string(&dispatch).unwrap();
let parsed: RunDispatch = serde_json::from_str(&json).unwrap();
prop_assert_eq!(parsed.dispatch_id, dispatch.dispatch_id);
prop_assert_eq!(parsed.status, dispatch.status);
prop_assert_eq!(parsed.attempt_count, dispatch.attempt_count);
prop_assert_eq!(parsed.priority, dispatch.priority);
prop_assert_eq!(parsed.dispatch_epoch, dispatch.dispatch_epoch);
prop_assert_eq!(parsed.claim_token, dispatch.claim_token);
prop_assert_eq!(parsed.available_at, dispatch.available_at);
prop_assert_eq!(parsed.max_attempts, dispatch.max_attempts);
}
// JSON round-trip preserves every status variant.
#[test]
fn run_dispatch_status_serde_roundtrip_prop(status in arb_dispatch_status()) {
let json = serde_json::to_string(&status).unwrap();
let parsed: RunDispatchStatus = serde_json::from_str(&json).unwrap();
prop_assert_eq!(parsed, status);
}
}
}
// The four terminal variants report `is_terminal() == true`.
#[test]
fn is_terminal_returns_true_for_terminal_states() {
assert!(RunDispatchStatus::Acked.is_terminal());
assert!(RunDispatchStatus::Cancelled.is_terminal());
assert!(RunDispatchStatus::Superseded.is_terminal());
assert!(RunDispatchStatus::DeadLetter.is_terminal());
}
// `Queued` and `Claimed` report `is_terminal() == false`.
#[test]
fn is_terminal_returns_false_for_non_terminal_states() {
assert!(!RunDispatchStatus::Queued.is_terminal());
assert!(!RunDispatchStatus::Claimed.is_terminal());
}
// Fixture: a freshly queued dispatch with no claim and no run outcome recorded.
fn make_run_dispatch() -> RunDispatch {
RunDispatch {
dispatch_id: "dispatch-001".to_string(),
thread_id: "thread-abc".to_string(),
run_id: "run-001".to_string(),
priority: 128,
dedupe_key: Some("req-xyz".to_string()),
dispatch_epoch: 1,
status: RunDispatchStatus::Queued,
available_at: 1000,
attempt_count: 0,
max_attempts: 5,
last_error: None,
claim_token: None,
claimed_by: None,
lease_until: None,
dispatch_instance_id: None,
run_status: None,
termination: None,
run_response: None,
run_error: None,
completed_at: None,
created_at: 1000,
updated_at: 1000,
}
}
// JSON round-trip of the fixture preserves every field, including the `None`s.
#[test]
fn run_dispatch_serde_roundtrip() {
let dispatch = make_run_dispatch();
let json = serde_json::to_string(&dispatch).unwrap();
let parsed: RunDispatch = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.dispatch_id, "dispatch-001");
assert_eq!(parsed.thread_id, "thread-abc");
assert_eq!(parsed.run_id, "run-001");
assert_eq!(parsed.priority, 128);
assert_eq!(parsed.dedupe_key.as_deref(), Some("req-xyz"));
assert_eq!(parsed.dispatch_epoch, 1);
assert_eq!(parsed.status, RunDispatchStatus::Queued);
assert_eq!(parsed.available_at, 1000);
assert_eq!(parsed.attempt_count, 0);
assert_eq!(parsed.max_attempts, 5);
assert!(parsed.last_error.is_none());
assert!(parsed.claim_token.is_none());
assert!(parsed.claimed_by.is_none());
assert!(parsed.lease_until.is_none());
assert!(parsed.dispatch_instance_id.is_none());
assert!(parsed.run_status.is_none());
assert!(parsed.termination.is_none());
assert!(parsed.run_response.is_none());
assert!(parsed.run_error.is_none());
assert!(parsed.completed_at.is_none());
assert_eq!(parsed.created_at, 1000);
assert_eq!(parsed.updated_at, 1000);
}
// JSON round-trip preserves the optional run-outcome fields once they are set.
#[test]
fn run_dispatch_runtime_trace_serde_roundtrip() {
use super::super::lifecycle::TerminationReason;
let mut dispatch = make_run_dispatch();
dispatch.dispatch_instance_id = Some("dispatch-1".into());
dispatch.run_status = Some(RunStatus::Done);
dispatch.termination = Some(TerminationReason::NaturalEnd);
dispatch.run_response = Some("done".into());
dispatch.completed_at = Some(2000);
let json = serde_json::to_string(&dispatch).unwrap();
let parsed: RunDispatch = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.run_id, "run-001");
assert_eq!(parsed.dispatch_instance_id.as_deref(), Some("dispatch-1"));
assert_eq!(parsed.run_status, Some(RunStatus::Done));
assert_eq!(parsed.termination, Some(TerminationReason::NaturalEnd));
assert_eq!(parsed.run_response.as_deref(), Some("done"));
assert_eq!(parsed.completed_at, Some(2000));
// The fixture's status is untouched by setting the outcome fields.
assert_eq!(parsed.status, RunDispatchStatus::Queued);
}
// JSON round-trip of `RunDispatchResult` yields an equal value.
#[test]
fn run_dispatch_result_serde_roundtrip() {
use super::super::lifecycle::TerminationReason;
let result = RunDispatchResult {
run_id: "run-1".into(),
dispatch_instance_id: "dispatch-1".into(),
status: RunStatus::Done,
termination: Some(TerminationReason::Blocked("needs approval".into())),
response: None,
error: Some("needs approval".into()),
};
let json = serde_json::to_string(&result).unwrap();
let parsed: RunDispatchResult = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, result);
}
// Example-based counterpart of the status round-trip property test.
#[test]
fn run_dispatch_status_serde_roundtrip() {
for status in [
RunDispatchStatus::Queued,
RunDispatchStatus::Claimed,
RunDispatchStatus::Acked,
RunDispatchStatus::Cancelled,
RunDispatchStatus::Superseded,
RunDispatchStatus::DeadLetter,
] {
let json = serde_json::to_string(&status).unwrap();
let parsed: RunDispatchStatus = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, status);
}
}
// JSON round-trip of the interrupt summary.
#[test]
fn mailbox_interrupt_serde_roundtrip() {
let interrupt = MailboxInterrupt {
new_dispatch_epoch: 5,
active_dispatch: Some(make_run_dispatch()),
superseded_count: 3,
};
let json = serde_json::to_string(&interrupt).unwrap();
let parsed: MailboxInterrupt = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.new_dispatch_epoch, 5);
assert!(parsed.active_dispatch.is_some());
assert_eq!(parsed.superseded_count, 3);
}
// A detailed payload (with `superseded_dispatches`) still deserializes into the
// summary type: the extra field is ignored rather than rejected.
#[test]
fn mailbox_interrupt_ignores_detailed_payload_for_legacy_summary() {
let json = serde_json::json!({
"new_dispatch_epoch": 5,
"active_dispatch": null,
"superseded_count": 3,
"superseded_dispatches": [make_run_dispatch()]
});
let parsed: MailboxInterrupt = serde_json::from_value(json).unwrap();
assert_eq!(parsed.new_dispatch_epoch, 5);
assert!(parsed.active_dispatch.is_none());
assert_eq!(parsed.superseded_count, 3);
}
// JSON round-trip of the detailed interrupt, plus a check that `summary()`
// carries the count over.
#[test]
fn mailbox_interrupt_details_serde_roundtrip() {
let details = MailboxInterruptDetails {
new_dispatch_epoch: 5,
active_dispatch: Some(make_run_dispatch()),
superseded_count: 3,
superseded_dispatches: vec![make_run_dispatch()],
};
let json = serde_json::to_string(&details).unwrap();
let parsed: MailboxInterruptDetails = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.new_dispatch_epoch, 5);
assert!(parsed.active_dispatch.is_some());
assert_eq!(parsed.superseded_count, 3);
assert_eq!(parsed.superseded_dispatches.len(), 1);
assert_eq!(parsed.summary().superseded_count, 3);
}
}