use std::sync::Arc;
use async_trait::async_trait;
use awaken_runtime::RunActivation;
use awaken_runtime::loop_runner::{AgentLoopError, AgentRunResult};
use awaken_server_contract::contract::event_sink::EventSink;
use awaken_server_contract::contract::lifecycle::{RunStatus, TerminationReason};
use awaken_server_contract::contract::message::{
DeliveryBoundary, DeliveryGranularity, DeliveryMode, Message,
};
use awaken_server_contract::contract::storage::{RunRecord, RunStore, ThreadStore};
use awaken_server_contract::contract::suspension::ToolCallResume;
use awaken_stores::{InMemoryMailboxStore, InMemoryStore, PendingMessageStore};
use super::*;
use crate::mailbox::{MailboxConfig, RunDispatchExecutor};
struct NoopExecutor;
fn created_run_record(thread_id: &str, run_id: &str) -> RunRecord {
RunRecord {
run_id: run_id.to_string(),
thread_id: thread_id.to_string(),
agent_id: "agent-1".to_string(),
status: RunStatus::Created,
..Default::default()
}
}
#[async_trait]
impl RunDispatchExecutor for NoopExecutor {
async fn run(
&self,
activation: RunActivation,
_sink: Arc<dyn EventSink>,
) -> Result<AgentRunResult, AgentLoopError> {
Ok(AgentRunResult {
run_id: activation
.run_id_hint()
.unwrap_or("pending-test-run")
.to_string(),
response: "ok".to_string(),
termination: TerminationReason::NaturalEnd,
steps: 1,
})
}
fn cancel(&self, _id: &str) -> bool {
false
}
async fn cancel_and_wait_by_thread(&self, _thread_id: &str) -> bool {
false
}
fn send_decision(&self, _id: &str, _tool_call_id: String, _resume: ToolCallResume) -> bool {
false
}
}
#[tokio::test]
async fn foreground_interrupt_skips_queued_new_run_pending() {
let thread_store = Arc::new(InMemoryStore::new());
let mailbox = Arc::new(Mailbox::new_with_pending_thread_run_store(
Arc::new(NoopExecutor),
Arc::new(InMemoryMailboxStore::new()),
thread_store.clone(),
"consumer".to_string(),
MailboxConfig::default(),
));
mailbox
.deliver(
"thread-foreground-lane",
&[Message::user("queued future").with_id("queued-id".to_string())],
DeliveryMode::new_run(DeliveryGranularity::Batch),
)
.await
.unwrap();
let mut request = RunActivation::new(
"thread-foreground-lane",
vec![Message::user("interrupt now").with_id("interrupt-id".to_string())],
)
.with_agent_id("agent-1");
let messages = request.messages().to_vec();
let run_id = mailbox
.prepare_run_for_dispatch(&mut request, "thread-foreground-lane", &messages)
.await
.unwrap();
let committed = thread_store
.load_committed_messages("thread-foreground-lane")
.await
.unwrap()
.unwrap();
assert_eq!(committed.len(), 1);
assert_eq!(committed[0].id.as_deref(), Some("interrupt-id"));
let pending = thread_store
.load_pending_message_records("thread-foreground-lane")
.await
.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].pending_id, "queued-id");
assert_eq!(pending[0].delivery_mode.boundary, DeliveryBoundary::NewRun);
let run = thread_store.load_run(&run_id).await.unwrap().unwrap();
assert_eq!(
run.activation.unwrap().input.trigger_message_ids,
vec!["interrupt-id".to_string()]
);
}
#[tokio::test]
async fn next_step_freeze_skips_queued_new_run_pending() {
let thread_store = Arc::new(InMemoryStore::new());
let mailbox = Arc::new(Mailbox::new_with_pending_thread_run_store(
Arc::new(NoopExecutor),
Arc::new(InMemoryMailboxStore::new()),
thread_store.clone(),
"consumer".to_string(),
MailboxConfig::default(),
));
thread_store
.create_run(&created_run_record(
"thread-next-step-lane",
"run-next-step-lane",
))
.await
.unwrap();
let request = RunActivation::new("thread-next-step-lane", Vec::new())
.with_run_id_hint("run-next-step-lane");
let handler = mailbox
.pending_boundary_handler(&request, "run-next-step-lane", "resolution-test")
.expect("handler configured");
mailbox
.deliver(
"thread-next-step-lane",
&[Message::user("queued future").with_id("queued-id".to_string())],
DeliveryMode::new_run(DeliveryGranularity::Batch),
)
.await
.unwrap();
mailbox
.deliver(
"thread-next-step-lane",
&[Message::user("live steering").with_id("live-id".to_string())],
DeliveryMode::next_step(DeliveryGranularity::Batch),
)
.await
.unwrap();
handler
.freeze_pending_boundary(DeliveryBoundary::NextStep)
.await
.unwrap()
.expect("live message frozen");
let committed = thread_store
.load_committed_messages("thread-next-step-lane")
.await
.unwrap()
.unwrap();
assert_eq!(committed.len(), 1);
assert_eq!(committed[0].id.as_deref(), Some("live-id"));
let pending = thread_store
.load_pending_message_records("thread-next-step-lane")
.await
.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].pending_id, "queued-id");
assert_eq!(pending[0].delivery_mode.boundary, DeliveryBoundary::NewRun);
let run = thread_store
.load_run("run-next-step-lane")
.await
.unwrap()
.unwrap();
assert_eq!(
run.input.unwrap().trigger_message_ids,
vec!["live-id".to_string()]
);
}
#[tokio::test]
async fn targeted_next_step_does_not_fall_through_to_new_run() {
let thread_store = Arc::new(InMemoryStore::new());
let mailbox = Arc::new(Mailbox::new_with_pending_thread_run_store(
Arc::new(NoopExecutor),
Arc::new(InMemoryMailboxStore::new()),
thread_store.clone(),
"consumer".to_string(),
MailboxConfig::default(),
));
thread_store
.create_run(&created_run_record("thread-targeted", "run-b"))
.await
.unwrap();
let request = RunActivation::new("thread-targeted", Vec::new()).with_run_id_hint("run-b");
let handler = mailbox
.pending_boundary_handler(&request, "run-b", "resolution-test")
.expect("handler configured");
mailbox
.deliver(
"thread-targeted",
&[Message::user("stale steer").with_id("stale-id".to_string())],
DeliveryMode::next_step(DeliveryGranularity::Batch).targeted_to_run("run-a", false),
)
.await
.unwrap();
mailbox
.deliver(
"thread-targeted",
&[Message::user("queued").with_id("queued-id".to_string())],
DeliveryMode::new_run(DeliveryGranularity::Batch),
)
.await
.unwrap();
handler
.freeze_pending_boundary(DeliveryBoundary::NewRun)
.await
.unwrap()
.expect("queued message frozen");
let committed = thread_store
.load_committed_messages("thread-targeted")
.await
.unwrap()
.unwrap();
assert_eq!(committed.len(), 1);
assert_eq!(committed[0].id.as_deref(), Some("queued-id"));
let pending = thread_store
.load_pending_message_records("thread-targeted")
.await
.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].pending_id, "stale-id");
}
#[tokio::test]
async fn resume_input_does_not_consume_unrelated_queued_new_run() {
use awaken_server_contract::contract::tool_intercept::RunMode;
let thread_store = Arc::new(InMemoryStore::new());
let mailbox = Arc::new(Mailbox::new_with_pending_thread_run_store(
Arc::new(NoopExecutor),
Arc::new(InMemoryMailboxStore::new()),
thread_store.clone(),
"consumer".to_string(),
MailboxConfig::default(),
));
mailbox
.deliver(
"thread-resume-lane",
&[Message::user("later task").with_id("later-id".to_string())],
DeliveryMode::new_run(DeliveryGranularity::Batch),
)
.await
.unwrap();
let messages = vec![Message::user("yes").with_id("yes-id".to_string())];
let mut request =
RunActivation::new("thread-resume-lane", messages.clone()).with_run_id_hint("run-r");
request.trace.run_mode = RunMode::Resume;
let mut record = created_run_record("thread-resume-lane", "run-r");
mailbox
.prepare_pending_messages_for_dispatch(
&request,
"thread-resume-lane",
&messages,
"run-r",
&mut record,
"resolution-test",
)
.await
.unwrap()
.expect("resume input frozen");
let committed = thread_store
.load_committed_messages("thread-resume-lane")
.await
.unwrap()
.unwrap();
assert_eq!(committed.len(), 1);
assert_eq!(committed[0].id.as_deref(), Some("yes-id"));
let pending = thread_store
.load_pending_message_records("thread-resume-lane")
.await
.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].pending_id, "later-id");
assert_eq!(pending[0].delivery_mode.boundary, DeliveryBoundary::NewRun);
}