use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{Value, json};
use awaken_contract::contract::content::ContentBlock;
use awaken_contract::contract::event::AgentEvent;
use awaken_contract::contract::event_sink::{EventSink, NullEventSink};
use awaken_contract::contract::executor::{InferenceExecutionError, InferenceRequest, LlmExecutor};
use awaken_contract::contract::identity::{RunIdentity, RunOrigin};
use awaken_contract::contract::inference::{StopReason, StreamResult, TokenUsage};
use awaken_contract::contract::lifecycle::{RunStatus, TerminationReason};
use awaken_contract::contract::message::{Message, ToolCall};
use awaken_contract::contract::tool::{
Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
use awaken_runtime::agent::state::RunLifecycle;
use awaken_runtime::backend::{
BackendControl, BackendDelegatePolicy, BackendDelegateRunRequest, BackendParentContext,
};
use awaken_runtime::extensions::background::{
BackgroundTaskManager, BackgroundTaskPlugin, TaskParentContext, TaskResult as BgTaskResult,
};
use awaken_runtime::loop_runner::{AgentLoopParams, build_agent_env, run_agent_loop};
use awaken_runtime::phase::PhaseRuntime;
use awaken_runtime::plugins::Plugin;
use awaken_runtime::registry::{
AgentResolver, ExecutionResolver, ResolvedAgent, ResolvedExecution,
};
use awaken_runtime::state::StateStore;
use awaken_runtime::{RuntimeError, inbox};
struct ScriptedLlm {
responses: std::sync::Mutex<Vec<StreamResult>>,
}
impl ScriptedLlm {
fn new(responses: Vec<StreamResult>) -> Self {
Self {
responses: std::sync::Mutex::new(responses),
}
}
}
#[async_trait]
impl LlmExecutor for ScriptedLlm {
async fn execute(
&self,
_req: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let mut responses = self.responses.lock().unwrap();
if responses.is_empty() {
Ok(StreamResult {
content: vec![ContentBlock::text("done")],
tool_calls: vec![],
usage: None,
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
})
} else {
Ok(responses.remove(0))
}
}
fn name(&self) -> &str {
"scripted"
}
}
struct SpawnTaskTool {
manager: Arc<BackgroundTaskManager>,
}
#[async_trait]
impl Tool for SpawnTaskTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("spawn_task", "spawn_task", "Spawn a background task")
}
async fn execute(&self, _args: Value, _ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
let id = self
.manager
.spawn(
"thread-1",
"test",
Some("worker"),
"background worker",
TaskParentContext::default(),
|ctx| async move {
ctx.cancelled().await;
BgTaskResult::Cancelled
},
)
.await
.map_err(|e| ToolError::ExecutionFailed(e.to_string()))?;
Ok(ToolResult::success("spawn_task", json!({"task_id": id})).into())
}
}
struct SpawnEmitterTool {
manager: Arc<BackgroundTaskManager>,
}
#[async_trait]
impl Tool for SpawnEmitterTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new(
"spawn_emitter",
"spawn_emitter",
"Spawn a task that emits events",
)
}
async fn execute(&self, _args: Value, _ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
let id = self
.manager
.spawn(
"thread-1",
"emitter",
None,
"emitting task",
TaskParentContext::default(),
|ctx| async move {
ctx.emit("data", json!({"rows": 42}));
BgTaskResult::Success(json!({"emitted": true}))
},
)
.await
.map_err(|e| ToolError::ExecutionFailed(e.to_string()))?;
Ok(ToolResult::success("spawn_emitter", json!({"task_id": id})).into())
}
}
fn make_bg_runtime() -> (
PhaseRuntime,
StateStore,
Arc<BackgroundTaskManager>,
Vec<Arc<dyn Plugin>>,
) {
let store = StateStore::new();
let manager = Arc::new(BackgroundTaskManager::new());
manager.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
let bg_plugin = Arc::new(BackgroundTaskPlugin::new(manager.clone()));
store
.install_plugin(BackgroundTaskPlugin::new(manager.clone()))
.unwrap();
let runtime = PhaseRuntime::new(store.clone()).unwrap();
(runtime, store, manager, vec![bg_plugin as Arc<dyn Plugin>])
}
struct FixedResolver {
agent: ResolvedAgent,
plugins: Vec<Arc<dyn Plugin>>,
}
impl AgentResolver for FixedResolver {
fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, RuntimeError> {
let mut agent = self.agent.clone();
agent.env = build_agent_env(&self.plugins, &agent)?;
Ok(agent)
}
}
impl ExecutionResolver for FixedResolver {
fn resolve_execution(&self, agent_id: &str) -> Result<ResolvedExecution, RuntimeError> {
self.resolve(agent_id).map(ResolvedExecution::local)
}
}
fn local_delegate_request<'a>(
resolver: &'a dyn ExecutionResolver,
agent_id: &'a str,
messages: Vec<Message>,
sink: Arc<dyn EventSink>,
parent_run_id: Option<String>,
parent_thread_id: Option<String>,
) -> BackendDelegateRunRequest<'a> {
BackendDelegateRunRequest {
agent_id,
new_messages: messages.clone(),
messages,
sink,
resolver,
parent: BackendParentContext {
parent_run_id,
parent_thread_id,
parent_tool_call_id: None,
},
control: BackendControl::default(),
policy: BackendDelegatePolicy::default(),
}
}
fn test_identity() -> RunIdentity {
RunIdentity::new(
"thread-1".into(),
None,
"run-1".into(),
None,
"test-agent".into(),
RunOrigin::User,
)
}
fn make_tool_call_response(tool_name: &str) -> StreamResult {
StreamResult {
content: vec![ContentBlock::text("calling tool")],
tool_calls: vec![ToolCall::new("c1", tool_name, json!({}))],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::ToolUse),
has_incomplete_tool_calls: false,
}
}
fn make_text_response(text: &str) -> StreamResult {
StreamResult {
content: vec![ContentBlock::text(text)],
tool_calls: vec![],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
}
}
#[tokio::test]
async fn agent_with_running_task_enters_awaiting_tasks() {
let (runtime, store, manager, bg_plugins) = make_bg_runtime();
let tool: Arc<dyn Tool> = Arc::new(SpawnTaskTool {
manager: manager.clone(),
});
let llm = Arc::new(ScriptedLlm::new(vec![
make_tool_call_response("spawn_task"),
make_text_response("Task spawned, waiting for completion."),
]));
let agent = ResolvedAgent::new("test", "m", "sys", llm).with_tool(tool);
let resolver = FixedResolver {
agent,
plugins: bg_plugins,
};
let result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &runtime,
sink: Arc::new(NullEventSink),
checkpoint_store: None,
messages: vec![Message::user("spawn a background task")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: None,
is_continuation: false,
})
.await
.unwrap();
assert_eq!(result.termination, TerminationReason::NaturalEnd);
let lifecycle = store.read::<RunLifecycle>().unwrap();
assert_eq!(lifecycle.status, RunStatus::Waiting);
assert_eq!(lifecycle.status_reason.as_deref(), Some("awaiting_tasks"));
assert!(lifecycle.step_count >= 2);
assert!(manager.has_running("thread-1").await);
manager.cancel_all("thread-1").await;
}
#[tokio::test]
async fn agent_without_tasks_completes_normally() {
let (runtime, store, _manager, bg_plugins) = make_bg_runtime();
let llm = Arc::new(ScriptedLlm::new(vec![make_text_response(
"Hello, no tasks needed.",
)]));
let agent = ResolvedAgent::new("test", "m", "sys", llm);
let resolver = FixedResolver {
agent,
plugins: bg_plugins,
};
let result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &runtime,
sink: Arc::new(NullEventSink),
checkpoint_store: None,
messages: vec![Message::user("hello")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: None,
is_continuation: false,
})
.await
.unwrap();
assert_eq!(result.termination, TerminationReason::NaturalEnd);
let lifecycle = store.read::<RunLifecycle>().unwrap();
assert_eq!(lifecycle.status, RunStatus::Done);
assert_eq!(lifecycle.status_reason.as_deref(), Some("natural"));
}
#[tokio::test]
async fn task_event_injected_into_conversation() {
let (runtime, store, manager, _bg_plugins) = make_bg_runtime();
let (inbox_tx, inbox_rx) = inbox::inbox_channel();
drop((runtime, store, manager));
let store = StateStore::new();
let mgr = BackgroundTaskManager::new();
mgr.set_owner_inbox(inbox_tx);
let manager = Arc::new(mgr);
manager.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
let bg_plugin = Arc::new(BackgroundTaskPlugin::new(manager.clone()));
store
.install_plugin(BackgroundTaskPlugin::new(manager.clone()))
.unwrap();
let runtime = PhaseRuntime::new(store.clone()).unwrap();
let tool: Arc<dyn Tool> = Arc::new(SpawnEmitterTool {
manager: manager.clone(),
});
let llm = Arc::new(ScriptedLlm::new(vec![
make_tool_call_response("spawn_emitter"),
make_text_response("Processed the event data."),
]));
let agent = ResolvedAgent::new("test", "m", "sys", llm).with_tool(tool);
let resolver = FixedResolver {
agent,
plugins: vec![bg_plugin as Arc<dyn Plugin>],
};
let result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &runtime,
sink: Arc::new(NullEventSink),
checkpoint_store: None,
messages: vec![Message::user("spawn an emitter")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: Some(inbox_rx),
is_continuation: false,
})
.await
.unwrap();
assert_eq!(result.termination, TerminationReason::NaturalEnd);
}
#[tokio::test]
async fn parent_child_message_roundtrip() {
use awaken_runtime::extensions::background::SendError;
let store = StateStore::new();
let parent_mgr = BackgroundTaskManager::new();
let (parent_inbox_tx, mut parent_inbox_rx) = inbox::inbox_channel();
parent_mgr.set_owner_inbox(parent_inbox_tx);
let parent_mgr = Arc::new(parent_mgr);
parent_mgr.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
store
.install_plugin(BackgroundTaskPlugin::new(parent_mgr.clone()))
.unwrap();
let child_id = parent_mgr
.spawn_agent(
"thread-1",
Some("worker"),
"long-running worker agent",
TaskParentContext {
task_id: None,
run_id: Some("run-parent".into()),
call_id: None,
agent_id: Some("parent-agent".into()),
},
|_cancel, _child_inbox_sender, mut child_inbox_rx| async move {
let mut instruction = None;
for _ in 0..100 {
if let Some(msg) = child_inbox_rx.try_recv() {
instruction = Some(msg);
break;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
let instruction =
instruction.expect("child should receive instruction from parent");
let content = instruction["payload"]["content"]
.as_str()
.unwrap_or("no content");
BgTaskResult::Success(serde_json::json!({
"final_result": format!("completed: {content}"),
}))
},
)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(parent_mgr.has_running("thread-1").await);
let send_result = parent_mgr
.send_task_inbox_message(
&child_id,
"thread-1",
"parent-agent",
"analyze schema drift",
)
.await;
assert!(
send_result.is_ok(),
"parent should successfully send to child"
);
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let final_status = parent_mgr.get(&child_id).await.unwrap();
assert_eq!(
final_status.status,
awaken_runtime::extensions::background::TaskStatus::Completed
);
let result = final_status.result.unwrap();
assert!(
result["final_result"]
.as_str()
.unwrap()
.contains("analyze schema drift"),
"child result should contain the instruction: {result:?}"
);
assert!(!parent_mgr.has_running("thread-1").await);
let parent_msgs = parent_inbox_rx.drain();
assert!(
parent_msgs
.iter()
.any(|m| m.get("kind").and_then(|k| k.as_str()) == Some("completed")),
"parent should receive completion event from child"
);
let late_send = parent_mgr
.send_task_inbox_message(&child_id, "thread-1", "parent-agent", "too late")
.await;
assert!(
matches!(late_send, Err(SendError::TaskTerminated(_))),
"sending to completed child should fail"
);
}
#[tokio::test]
async fn parallel_children_independent_messaging() {
let store = StateStore::new();
let parent_mgr = Arc::new(BackgroundTaskManager::new());
parent_mgr.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
store
.install_plugin(BackgroundTaskPlugin::new(parent_mgr.clone()))
.unwrap();
let mut child_ids = Vec::new();
for name in &["alpha", "beta", "gamma"] {
let id = parent_mgr
.spawn_agent(
"thread-1",
Some(name),
&format!("{name} worker"),
TaskParentContext::default(),
|_cancel, _sender, mut rx| async move {
for _ in 0..100 {
if let Some(msg) = rx.try_recv() {
let content = msg["payload"]["content"]
.as_str()
.unwrap_or("none")
.to_string();
return BgTaskResult::Success(serde_json::json!({"echo": content}));
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
BgTaskResult::Failed("timeout waiting for message".into())
},
)
.await
.unwrap();
child_ids.push((name.to_string(), id));
}
for (name, id) in &child_ids {
parent_mgr
.send_task_inbox_message(id, "thread-1", "parent", &format!("task for {name}"))
.await
.unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
for (name, id) in &child_ids {
let task = parent_mgr.get(id).await.unwrap();
assert_eq!(
task.status,
awaken_runtime::extensions::background::TaskStatus::Completed,
"{name} should be completed"
);
let echo = task.result.as_ref().unwrap()["echo"].as_str().unwrap();
assert_eq!(
echo,
format!("task for {name}"),
"{name} should echo its own instruction"
);
}
assert!(!parent_mgr.has_running("thread-1").await);
}
#[tokio::test]
async fn cancelled_child_rejects_messages() {
let store = StateStore::new();
let parent_mgr = Arc::new(BackgroundTaskManager::new());
parent_mgr.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
store
.install_plugin(BackgroundTaskPlugin::new(parent_mgr.clone()))
.unwrap();
let id = parent_mgr
.spawn_agent(
"thread-1",
Some("worker"),
"cancellable",
TaskParentContext::default(),
|cancel, _sender, _rx| async move {
cancel.cancelled().await;
BgTaskResult::Cancelled
},
)
.await
.unwrap();
let r1 = parent_mgr
.send_task_inbox_message(&id, "thread-1", "parent", "before cancel")
.await;
assert!(r1.is_ok());
parent_mgr.cancel(&id).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let r2 = parent_mgr
.send_task_inbox_message(&id, "thread-1", "parent", "after cancel")
.await;
assert!(
r2.is_err(),
"message to cancelled child should fail: {r2:?}"
);
}
#[tokio::test]
async fn local_backend_sub_agent_receives_bg_task_events() {
use awaken_runtime::extensions::a2a::{DelegateRunStatus, LocalBackend};
use std::sync::atomic::{AtomicUsize, Ordering};
let call_count = Arc::new(AtomicUsize::new(0));
struct CountingLlm {
counter: Arc<AtomicUsize>,
}
#[async_trait]
impl LlmExecutor for CountingLlm {
async fn execute(
&self,
_req: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let n = self.counter.fetch_add(1, Ordering::SeqCst);
Ok(StreamResult {
content: vec![ContentBlock::text(format!("response {n}"))],
tool_calls: vec![],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
})
}
fn name(&self) -> &str {
"counting"
}
}
let llm = Arc::new(CountingLlm {
counter: call_count.clone(),
});
let agent = ResolvedAgent::new("sub", "m", "You are a sub-agent.", llm);
let resolver = Arc::new(FixedResolver {
agent,
plugins: vec![],
});
let backend = LocalBackend::new();
let result = backend
.execute_delegate(local_delegate_request(
resolver.as_ref(),
"sub",
vec![Message::user("do work")],
Arc::new(NullEventSink),
Some("parent-run".into()),
Some("parent-thread".into()),
))
.await
.unwrap();
assert!(matches!(result.status, DelegateRunStatus::Completed));
assert!(call_count.load(Ordering::SeqCst) >= 1);
assert!(result.inbox.is_some());
assert!(result.inbox.as_ref().unwrap().is_closed());
}
#[tokio::test]
async fn multi_level_bg_task_event_reaches_sub_agent() {
use awaken_runtime::extensions::a2a::{DelegateRunStatus, LocalBackend};
use std::sync::atomic::{AtomicUsize, Ordering};
let call_count = Arc::new(AtomicUsize::new(0));
struct BgSpawningLlm {
counter: Arc<AtomicUsize>,
}
#[async_trait]
impl LlmExecutor for BgSpawningLlm {
async fn execute(
&self,
_req: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let n = self.counter.fetch_add(1, Ordering::SeqCst);
if n == 0 {
Ok(StreamResult {
content: vec![ContentBlock::text("spawning bg task")],
tool_calls: vec![ToolCall::new("c1", "spawn_bg", json!({}))],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::ToolUse),
has_incomplete_tool_calls: false,
})
} else {
Ok(StreamResult {
content: vec![ContentBlock::text(format!("done (turn {n})"))],
tool_calls: vec![],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
})
}
}
fn name(&self) -> &str {
"bg-spawning"
}
}
struct SpawnBgTool;
#[async_trait]
impl Tool for SpawnBgTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("spawn_bg", "spawn_bg", "Spawn a background task that emits")
}
async fn execute(
&self,
_args: Value,
_ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
Ok(ToolResult::success("spawn_bg", json!({"spawned": true})).into())
}
}
let llm = Arc::new(BgSpawningLlm {
counter: call_count.clone(),
});
let tool: Arc<dyn Tool> = Arc::new(SpawnBgTool);
let agent = ResolvedAgent::new("sub", "m", "You are a sub-agent.", llm).with_tool(tool);
let resolver = Arc::new(FixedResolver {
agent,
plugins: vec![],
});
let backend = LocalBackend::new();
let result = backend
.execute_delegate(local_delegate_request(
resolver.as_ref(),
"sub",
vec![Message::user("spawn a bg task")],
Arc::new(NullEventSink),
Some("parent-run".into()),
Some("parent-thread".into()),
))
.await
.unwrap();
assert!(matches!(result.status, DelegateRunStatus::Completed));
assert!(
call_count.load(Ordering::SeqCst) >= 2,
"LLM should be called at least twice"
);
assert!(result.inbox.is_some());
assert!(result.inbox.as_ref().unwrap().is_closed());
}
#[tokio::test]
async fn sub_agent_waits_for_bg_task_completion_before_returning() {
use std::sync::atomic::{AtomicUsize, Ordering};
let call_count = Arc::new(AtomicUsize::new(0));
struct ToolCallingLlm {
counter: Arc<AtomicUsize>,
}
#[async_trait]
impl LlmExecutor for ToolCallingLlm {
async fn execute(
&self,
_req: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let n = self.counter.fetch_add(1, Ordering::SeqCst);
match n {
0 => Ok(StreamResult {
content: vec![ContentBlock::text("spawning")],
tool_calls: vec![ToolCall::new("c1", "set_pending_then_clear", json!({}))],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::ToolUse),
has_incomplete_tool_calls: false,
}),
_ => Ok(StreamResult {
content: vec![ContentBlock::text(format!("final summary turn {n}"))],
tool_calls: vec![],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
}),
}
}
fn name(&self) -> &str {
"tool-calling"
}
}
struct SpawnRealBgTaskTool {
manager: Arc<BackgroundTaskManager>,
}
#[async_trait]
impl Tool for SpawnRealBgTaskTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new(
"set_pending_then_clear",
"set_pending_then_clear",
"Spawn a real bg task",
)
}
async fn execute(
&self,
_args: Value,
_ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
self.manager
.spawn(
"thread-sub",
"test",
None,
"delayed task",
TaskParentContext::default(),
|_ctx| async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
BgTaskResult::Success(json!({"data": "task finished"}))
},
)
.await
.map_err(|e| ToolError::ExecutionFailed(e.to_string()))?;
Ok(ToolResult::success("set_pending_then_clear", json!({"spawned": true})).into())
}
}
let store = StateStore::new();
let (inbox_sender, inbox_receiver) = crate::inbox::inbox_channel();
let bg_mgr = BackgroundTaskManager::new();
bg_mgr.set_owner_inbox(inbox_sender);
let bg_mgr = Arc::new(bg_mgr);
bg_mgr.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
let bg_plugin = Arc::new(BackgroundTaskPlugin::new(bg_mgr.clone()));
store
.install_plugin(BackgroundTaskPlugin::new(bg_mgr.clone()))
.unwrap();
let runtime = PhaseRuntime::new(store.clone()).unwrap();
let llm = Arc::new(ToolCallingLlm {
counter: call_count.clone(),
});
let tool: Arc<dyn Tool> = Arc::new(SpawnRealBgTaskTool { manager: bg_mgr });
let agent = ResolvedAgent::new("sub", "m", "sys", llm).with_tool(tool);
let resolver = FixedResolver {
agent,
plugins: vec![bg_plugin as Arc<dyn Plugin>],
};
let sub_identity = RunIdentity::new(
"thread-sub".into(),
None,
"run-sub".into(),
Some("parent-run".into()),
"sub-agent".into(),
RunOrigin::Subagent,
);
let result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "sub",
runtime: &runtime,
sink: Arc::new(NullEventSink),
checkpoint_store: None,
messages: vec![Message::user("run bg task")],
run_identity: sub_identity,
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: Some(inbox_receiver),
is_continuation: false,
})
.await
.unwrap();
let total_calls = call_count.load(Ordering::SeqCst);
assert!(
total_calls >= 3,
"sub-agent should do at least 3 LLM turns (tool+wait+summary), got {total_calls}"
);
assert!(result.response.contains("final summary"));
assert_eq!(result.termination, TerminationReason::NaturalEnd);
}
#[tokio::test]
async fn inbox_events_have_structured_kind_field() {
let store = StateStore::new();
let (inbox_sender, mut inbox_receiver) = inbox::inbox_channel();
let bg_mgr = BackgroundTaskManager::new();
bg_mgr.set_owner_inbox(inbox_sender);
let bg_mgr = Arc::new(bg_mgr);
bg_mgr.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
store
.install_plugin(BackgroundTaskPlugin::new(bg_mgr.clone()))
.unwrap();
bg_mgr
.spawn(
"thread-1",
"emitter",
None,
"quick emit",
TaskParentContext::default(),
|ctx| async move {
ctx.emit("data_ready", json!({"rows": 42}));
BgTaskResult::Success(json!("done"))
},
)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let msgs = inbox_receiver.drain();
let custom = msgs.iter().find(|m| m["kind"] == "custom");
assert!(custom.is_some(), "should have custom event");
assert_eq!(custom.unwrap()["event_type"], "data_ready");
let completed = msgs.iter().find(|m| m["kind"] == "completed");
assert!(completed.is_some(), "should have completed event");
for msg in &msgs {
assert!(
msg.get("task_id").is_some(),
"every event should have task_id"
);
}
}
#[tokio::test]
async fn non_blocking_spawn_agent_same_event_format() {
let store = StateStore::new();
let (inbox_sender, mut inbox_receiver) = inbox::inbox_channel();
let bg_mgr = BackgroundTaskManager::new();
bg_mgr.set_owner_inbox(inbox_sender);
let bg_mgr = Arc::new(bg_mgr);
bg_mgr.set_store(store.clone());
store
.install_plugin(awaken_runtime::loop_runner::LoopStatePlugin)
.unwrap();
store
.install_plugin(BackgroundTaskPlugin::new(bg_mgr.clone()))
.unwrap();
bg_mgr
.spawn_agent(
"thread-1",
Some("worker"),
"emitting worker",
TaskParentContext::default(),
|_cancel, _child_sender, _rx| async move {
BgTaskResult::Success(json!({"done": true}))
},
)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let msgs = inbox_receiver.drain();
assert!(!msgs.is_empty(), "parent inbox should have events");
let completed = msgs
.iter()
.find(|m| m.get("kind").and_then(|k| k.as_str()) == Some("completed"));
assert!(completed.is_some(), "should have completed event");
}
#[tokio::test]
async fn run_finish_signals_awaiting_tasks_in_result() {
let (runtime, _store, manager, bg_plugins) = make_bg_runtime();
let tool: Arc<dyn Tool> = Arc::new(SpawnTaskTool {
manager: manager.clone(),
});
let llm = Arc::new(ScriptedLlm::new(vec![
make_tool_call_response("spawn_task"),
make_text_response("spawned, waiting"),
]));
let agent = ResolvedAgent::new("test", "m", "sys", llm).with_tool(tool);
let resolver = FixedResolver {
agent,
plugins: bg_plugins,
};
let sink = Arc::new(awaken_contract::contract::event_sink::VecEventSink::new());
let _result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &runtime,
sink: sink.clone() as Arc<dyn awaken_contract::contract::event_sink::EventSink>,
checkpoint_store: None,
messages: vec![Message::user("spawn task")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: None,
is_continuation: false,
})
.await
.unwrap();
let events = sink.take();
let run_finish = events
.iter()
.find(|e| matches!(e, AgentEvent::RunFinish { .. }));
assert!(run_finish.is_some(), "should have RunFinish event");
if let AgentEvent::RunFinish {
result,
termination,
..
} = run_finish.unwrap()
{
assert_eq!(*termination, TerminationReason::NaturalEnd);
let result_json = result.as_ref().unwrap();
assert_eq!(result_json["status"], "waiting");
assert_eq!(result_json["status_reason"], "awaiting_tasks");
}
manager.cancel_all("thread-1").await;
}
#[tokio::test]
async fn run_finish_normal_end_no_awaiting_flag() {
let (_runtime, _store, _manager, bg_plugins) = make_bg_runtime();
let llm = Arc::new(ScriptedLlm::new(vec![make_text_response("hello")]));
let agent = ResolvedAgent::new("test", "m", "sys", llm);
let resolver = FixedResolver {
agent,
plugins: bg_plugins,
};
let sink = Arc::new(awaken_contract::contract::event_sink::VecEventSink::new());
let _result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &_runtime,
sink: sink.clone() as Arc<dyn awaken_contract::contract::event_sink::EventSink>,
checkpoint_store: None,
messages: vec![Message::user("hi")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: None,
is_continuation: false,
})
.await
.unwrap();
let events = sink.take();
let run_finish = events
.iter()
.find(|e| matches!(e, AgentEvent::RunFinish { .. }));
if let AgentEvent::RunFinish { result, .. } = run_finish.unwrap() {
let result_json = result.as_ref().unwrap();
assert_eq!(result_json["status"], "done");
assert_eq!(result_json["status_reason"], "natural");
}
}
#[tokio::test]
async fn local_backend_passes_parent_thread_id() {
use awaken_runtime::extensions::a2a::{DelegateRunStatus, LocalBackend};
use std::sync::atomic::{AtomicUsize, Ordering};
let call_count = Arc::new(AtomicUsize::new(0));
struct IdentityCapturingLlm {
counter: Arc<AtomicUsize>,
}
#[async_trait]
impl LlmExecutor for IdentityCapturingLlm {
async fn execute(
&self,
_req: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let n = self.counter.fetch_add(1, Ordering::SeqCst);
if n == 0 {
}
Ok(StreamResult {
content: vec![ContentBlock::text("done")],
tool_calls: vec![],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
})
}
fn name(&self) -> &str {
"identity-capturing"
}
}
let llm = Arc::new(IdentityCapturingLlm {
counter: call_count.clone(),
});
let agent = ResolvedAgent::new("sub", "m", "sys", llm);
let resolver = Arc::new(FixedResolver {
agent,
plugins: vec![],
});
let backend = LocalBackend::new();
let result = backend
.execute_delegate(local_delegate_request(
resolver.as_ref(),
"sub",
vec![Message::user("hello")],
Arc::new(NullEventSink),
Some("parent-run-id".into()),
Some("parent-thread-id".into()),
))
.await
.unwrap();
assert!(matches!(result.status, DelegateRunStatus::Completed));
}
#[tokio::test]
async fn run_finish_cancelled_has_done_status() {
let (_runtime, _store, _manager, bg_plugins) = make_bg_runtime();
struct SlowLlm;
#[async_trait]
impl LlmExecutor for SlowLlm {
async fn execute(
&self,
_req: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
Ok(make_text_response("too slow"))
}
fn name(&self) -> &str {
"slow"
}
}
let llm = Arc::new(SlowLlm);
let agent = ResolvedAgent::new("test", "m", "sys", llm);
let resolver = FixedResolver {
agent,
plugins: bg_plugins,
};
let cancel_token = awaken_runtime::CancellationToken::new();
let cancel_token2 = cancel_token.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
cancel_token2.cancel();
});
let sink = Arc::new(awaken_contract::contract::event_sink::VecEventSink::new());
let _result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &_runtime,
sink: sink.clone() as Arc<dyn awaken_contract::contract::event_sink::EventSink>,
checkpoint_store: None,
messages: vec![Message::user("hi")],
run_identity: test_identity(),
cancellation_token: Some(cancel_token),
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: None,
is_continuation: false,
})
.await
.unwrap();
let events = sink.take();
let run_finish = events
.iter()
.find(|e| matches!(e, AgentEvent::RunFinish { .. }));
if let Some(AgentEvent::RunFinish {
result,
termination,
..
}) = run_finish
{
assert_eq!(*termination, TerminationReason::Cancelled);
let r = result.as_ref().unwrap();
assert_eq!(r["status"], "done");
assert_eq!(r["status_reason"], "cancelled");
} else {
panic!("should have RunFinish event");
}
}
#[tokio::test]
async fn run_finish_suspended_has_waiting_status() {
use awaken_contract::contract::message::ToolCall;
use awaken_contract::contract::tool::{Tool, ToolDescriptor, ToolResult};
struct SuspendTool;
#[async_trait]
impl Tool for SuspendTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("dangerous", "dangerous", "requires approval")
}
async fn execute(
&self,
_args: Value,
_ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
Ok(ToolResult::suspended("dangerous", "needs approval").into())
}
}
let (_runtime, _store, _manager, bg_plugins) = make_bg_runtime();
let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
content: vec![ContentBlock::text("calling tool")],
tool_calls: vec![ToolCall::new("c1", "dangerous", json!({}))],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::ToolUse),
has_incomplete_tool_calls: false,
}]));
let tool: Arc<dyn Tool> = Arc::new(SuspendTool);
let agent = ResolvedAgent::new("test", "m", "sys", llm).with_tool(tool);
let resolver = FixedResolver {
agent,
plugins: bg_plugins,
};
let sink = Arc::new(awaken_contract::contract::event_sink::VecEventSink::new());
let _result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &_runtime,
sink: sink.clone() as Arc<dyn awaken_contract::contract::event_sink::EventSink>,
checkpoint_store: None,
messages: vec![Message::user("do dangerous thing")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
inbox: None,
is_continuation: false,
})
.await
.unwrap();
let events = sink.take();
let run_finishes: Vec<_> = events
.iter()
.filter(|e| matches!(e, AgentEvent::RunFinish { .. }))
.collect();
assert!(!run_finishes.is_empty(), "should have RunFinish events");
if let AgentEvent::RunFinish {
result,
termination,
..
} = run_finishes.last().unwrap()
{
assert_eq!(*termination, TerminationReason::Suspended);
let r = result.as_ref().unwrap();
assert_eq!(r["status"], "waiting");
assert_eq!(r["status_reason"], "suspended");
}
}