use async_trait::async_trait;
use awaken_contract::contract::identity::{RunIdentity, RunOrigin};
use awaken_contract::contract::lifecycle::TerminationReason;
use crate::loop_runner::{AgentLoopParams, prepare_resume, run_agent_loop};
use crate::registry::ResolvedAgent;
use crate::state::StateStore;
use super::{
BackendCapabilities, BackendDelegateContinuation, BackendDelegatePersistence,
BackendDelegateRunRequest, BackendRootRunRequest, BackendRunOutput, BackendRunResult,
BackendRunStatus, ExecutionBackend, ExecutionBackendError,
};
/// Resolver wrapper used for delegate runs when the `background` feature is on.
///
/// Agent lookups are forwarded to `inner`; when `context` is `Some`, resolved
/// agents are additionally passed through
/// `LocalBackend::ensure_background_cancel_tool`.
#[cfg(feature = "background")]
struct BackgroundControlResolver<'a> {
/// Resolver that performs the actual agent and execution lookups.
inner: &'a dyn crate::registry::ExecutionResolver,
/// Background task context to attach to resolved agents; `None` disables the cancel-tool injection.
context: Option<crate::extensions::background::BackgroundTaskExecutionContext>,
}
#[cfg(feature = "background")]
impl<'a> BackgroundControlResolver<'a> {
    /// Wraps `inner`, keeping the optional background task `context` that is
    /// consulted on every agent resolution.
    fn new(
        inner: &'a dyn crate::registry::ExecutionResolver,
        context: Option<crate::extensions::background::BackgroundTaskExecutionContext>,
    ) -> Self {
        BackgroundControlResolver { inner, context }
    }
}
#[cfg(feature = "background")]
impl crate::registry::AgentResolver for BackgroundControlResolver<'_> {
    /// Resolves `agent_id` through the wrapped resolver and, when a background
    /// task context is held, ensures the resolved agent carries the cancel tool.
    ///
    /// Errors from the wrapped resolver are returned unchanged.
    fn resolve(&self, agent_id: &str) -> Result<ResolvedAgent, crate::RuntimeError> {
        self.inner.resolve(agent_id).map(|mut agent| {
            if let Some(task_context) = self.context.as_ref() {
                LocalBackend::ensure_background_cancel_tool(&mut agent, task_context);
            }
            agent
        })
    }

    /// Lists the agent ids known to the wrapped resolver.
    fn agent_ids(&self) -> Vec<String> {
        self.inner.agent_ids()
    }
}
#[cfg(feature = "background")]
impl crate::registry::ExecutionResolver for BackgroundControlResolver<'_> {
    /// Forwards execution resolution to the wrapped resolver without modification.
    fn resolve_execution(
        &self,
        agent_id: &str,
    ) -> Result<crate::registry::ResolvedExecution, crate::RuntimeError> {
        crate::registry::ExecutionResolver::resolve_execution(self.inner, agent_id)
    }
}
/// Execution backend that drives agent loops directly via `crate::loop_runner`.
///
/// The type is a field-less unit struct; all per-run state is built inside the
/// individual `execute_*` calls.
pub struct LocalBackend;

impl Default for LocalBackend {
    /// Returns the unit backend value.
    fn default() -> Self {
        LocalBackend
    }
}

impl LocalBackend {
    /// Creates a new local backend (equivalent to [`LocalBackend::default`]).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
#[async_trait]
impl ExecutionBackend for LocalBackend {
fn capabilities(&self) -> BackendCapabilities {
BackendCapabilities::full()
}
async fn execute_delegate(
&self,
request: BackendDelegateRunRequest<'_>,
) -> Result<BackendRunResult, ExecutionBackendError> {
Self::execute_delegate(self, request).await
}
async fn execute_root(
&self,
request: BackendRootRunRequest<'_>,
) -> Result<BackendRunResult, ExecutionBackendError> {
self.execute_root_with_thread_context(request, None).await
}
}
impl LocalBackend {
pub(crate) async fn execute_root_with_thread_context(
&self,
request: BackendRootRunRequest<'_>,
thread_ctx: Option<crate::ThreadContextSnapshot>,
) -> Result<BackendRunResult, ExecutionBackendError> {
let phase_runtime = request
.local
.as_ref()
.map(|context| context.phase_runtime)
.ok_or_else(|| {
ExecutionBackendError::ExecutionFailed(
"local root execution requires a phase runtime context".into(),
)
})?;
let run_identity = request.run_identity.clone();
let run_id = run_identity.run_id.clone();
if !request.decisions.is_empty() {
prepare_resume(phase_runtime.store(), request.decisions, None)
.map_err(crate::loop_runner::AgentLoopError::PhaseError)
.map_err(ExecutionBackendError::Loop)?;
}
let result = crate::loop_runner::run_agent_loop_with_thread_context(
AgentLoopParams {
resolver: request.resolver,
agent_id: request.agent_id,
runtime: phase_runtime,
sink: request.sink,
checkpoint_store: request.checkpoint_store,
messages: request.messages,
run_identity,
cancellation_token: request.control.cancellation_token,
decision_rx: request.control.decision_rx,
overrides: request.overrides,
frontend_tools: request.frontend_tools,
inbox: request.inbox,
is_continuation: request.is_continuation,
},
thread_ctx,
)
.await
.map_err(ExecutionBackendError::Loop)?;
let response = if result.response.is_empty() {
None
} else {
Some(result.response)
};
Ok(BackendRunResult {
agent_id: request.agent_id.to_string(),
status: map_termination(&result.termination),
termination: result.termination,
status_reason: None,
output: BackendRunOutput::from_text(response.clone()),
response,
steps: result.steps,
run_id: Some(run_id),
inbox: None,
state: None,
})
}
/// Runs a delegated (sub-agent) loop in a freshly created `StateStore` and
/// `PhaseRuntime`.
///
/// The agent is resolved once up front so its environment can be bound to the
/// new store, an inbox channel is created for the run, and the loop is started
/// with a newly generated run id and `RunOrigin::Subagent`.
///
/// On success the result carries the generated run id in `run_id` and the
/// inbox sender in `inbox`; an empty loop response is reported as `None`.
///
/// # Errors
/// * [`ExecutionBackendError::AgentNotFound`] when the resolver fails to resolve `request.agent_id`.
/// * [`ExecutionBackendError::ExecutionFailed`] when installing a plugin,
///   creating the phase runtime, or binding the execution env fails.
/// * [`ExecutionBackendError::Loop`] when the agent loop itself fails.
pub async fn execute_delegate(
&self,
request: BackendDelegateRunRequest<'_>,
) -> Result<BackendRunResult, ExecutionBackendError> {
// Only the (Ephemeral, Disabled) combination is matched.
// NOTE(review): presumably an exhaustiveness guard so that adding a policy
// variant forces this function to be revisited — confirm against the enum definitions.
match (request.policy.persistence, request.policy.continuation) {
(BackendDelegatePersistence::Ephemeral, BackendDelegateContinuation::Disabled) => {}
}
// Capture the current background task context (if any) before starting the run.
#[cfg(feature = "background")]
let background_context = crate::extensions::background::current_background_task_context();
// Resolve once up front; any resolver error is reported as `AgentNotFound`.
let resolved = crate::registry::AgentResolver::resolve(request.resolver, request.agent_id)
.map_err(|error| {
ExecutionBackendError::AgentNotFound(format!(
"failed to resolve agent '{}': {error}",
request.agent_id
))
})?;
// Fresh store for this delegate run, with the loop state plugin installed.
let store = crate::state::StateStore::new();
store
.install_plugin(crate::loop_runner::LoopStatePlugin)
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
let phase_runtime = crate::phase::PhaseRuntime::new(store.clone())
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
// The sender is shared with plugins and returned to the caller; the receiver feeds the loop.
let (owner_inbox, inbox_receiver) = {
let (sender, receiver) = crate::inbox::inbox_channel();
(Some(sender), receiver)
};
Self::bind_local_execution_env(&store, &resolved, owner_inbox.as_ref())
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
// Create a run-local background task manager only when the agent's env does
// not already include a plugin named "background_tasks".
#[cfg(feature = "background")]
let bg_manager = if resolved
.env
.plugins
.iter()
.any(|plugin| plugin.descriptor().name == "background_tasks")
{
None
} else {
let manager = crate::extensions::background::BackgroundTaskManager::new();
let manager = std::sync::Arc::new(manager);
manager.set_store(store.clone());
Some(manager)
};
// Wire the run-local manager to the inbox and install its plugin on the store.
#[cfg(feature = "background")]
if let Some(manager) = &bg_manager {
if let Some(sender) = owner_inbox.clone() {
manager.set_owner_inbox(sender);
}
store
.install_plugin(crate::extensions::background::BackgroundTaskPlugin::new(
manager.clone(),
))
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
}
// With the feature on, the loop resolves agents through the wrapper so the
// cancel tool can be attached; otherwise the caller's resolver is used directly.
#[cfg(feature = "background")]
let background_resolver =
BackgroundControlResolver::new(request.resolver, background_context.clone());
#[cfg(not(feature = "background"))]
let background_resolver = request.resolver;
// The generated id is passed as both the first and third `RunIdentity::new` argument.
let sub_run_id = uuid::Uuid::now_v7().to_string();
let mut run_identity = RunIdentity::new(
sub_run_id.clone(),
request.parent.parent_thread_id.clone(),
sub_run_id.clone(),
request.parent.parent_run_id.clone(),
request.agent_id.to_string(),
RunOrigin::Subagent,
);
if let Some(parent_tool_call_id) = request.parent.parent_tool_call_id.clone() {
run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
}
// Delegate runs use no checkpoint store, overrides, or frontend tools and
// are never started as a continuation.
let result = run_agent_loop(AgentLoopParams {
resolver: &background_resolver,
agent_id: request.agent_id,
runtime: &phase_runtime,
sink: request.sink,
checkpoint_store: None,
messages: request.messages,
run_identity,
cancellation_token: request.control.cancellation_token,
decision_rx: request.control.decision_rx,
overrides: None,
frontend_tools: Vec::new(),
inbox: Some(inbox_receiver),
is_continuation: false,
})
.await
.map_err(ExecutionBackendError::Loop)?;
// An empty response string is reported as "no response".
let response = if result.response.is_empty() {
None
} else {
Some(result.response)
};
Ok(BackendRunResult {
agent_id: request.agent_id.to_string(),
status: map_termination(&result.termination),
termination: result.termination,
status_reason: None,
output: BackendRunOutput::from_text(response.clone()),
response,
steps: result.steps,
run_id: Some(sub_run_id),
inbox: owner_inbox,
state: None,
})
}
/// Registers the background cancel-task tool on `resolved` unless
/// `resolved.tools` already has an entry under `CANCEL_TASK_TOOL_ID`.
///
/// The tool is built from the manager and task id found in `context` and is
/// inserted into both `resolved.tools` and `resolved.env.tools`.
#[cfg(feature = "background")]
fn ensure_background_cancel_tool(
    resolved: &mut ResolvedAgent,
    context: &crate::extensions::background::BackgroundTaskExecutionContext,
) {
    use crate::extensions::background::{CANCEL_TASK_TOOL_ID, CancelTaskTool};

    // A tool already registered under this id is left untouched.
    if !resolved.tools.contains_key(CANCEL_TASK_TOOL_ID) {
        let cancel_tool: std::sync::Arc<dyn awaken_contract::contract::tool::Tool> =
            std::sync::Arc::new(CancelTaskTool::with_current_task(
                context.manager.clone(),
                context.task_id.clone(),
            ));
        resolved.tools.insert(
            CANCEL_TASK_TOOL_ID.into(),
            std::sync::Arc::clone(&cancel_tool),
        );
        resolved
            .env
            .tools
            .insert(CANCEL_TASK_TOOL_ID.into(), cancel_tool);
    }
}
/// Binds a resolved agent's execution environment to `store`.
///
/// Registers the env's key registrations (skipped when there are none) and
/// then calls `bind_runtime_context` on every env plugin, passing `store` and
/// `owner_inbox`.
///
/// # Errors
/// Returns the `StateError` produced by `StateStore::register_keys`.
pub(crate) fn bind_local_execution_env(
    store: &StateStore,
    resolved: &ResolvedAgent,
    owner_inbox: Option<&crate::inbox::InboxSender>,
) -> Result<(), awaken_contract::StateError> {
    let env = &resolved.env;
    if !env.key_registrations.is_empty() {
        store.register_keys(&env.key_registrations)?;
    }
    env.plugins.iter().for_each(|plugin| {
        plugin.bind_runtime_context(store, owner_inbox);
    });
    Ok(())
}
}
/// Translates a loop [`TerminationReason`] into the backend-level run status.
///
/// `NaturalEnd` and `BehaviorRequested` count as completed; `Stopped`,
/// `Blocked` and `Error` all map to `Failed` with a descriptive message.
fn map_termination(termination: &TerminationReason) -> BackendRunStatus {
    match termination {
        // Non-failure outcomes.
        TerminationReason::Cancelled => BackendRunStatus::Cancelled,
        TerminationReason::Suspended => BackendRunStatus::Suspended(None),
        TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested => {
            BackendRunStatus::Completed
        }
        // Failure outcomes, each with its own message shape.
        TerminationReason::Error(message) => BackendRunStatus::Failed(message.clone()),
        TerminationReason::Blocked(message) => {
            let detail = format!("blocked: {message}");
            BackendRunStatus::Failed(detail)
        }
        TerminationReason::Stopped(reason) => {
            let detail = format!("stopped: {reason:?}");
            BackendRunStatus::Failed(detail)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use awaken_contract::contract::content::ContentBlock;
use awaken_contract::contract::event_sink::NullEventSink;
use awaken_contract::contract::executor::{
InferenceExecutionError, InferenceRequest, LlmExecutor,
};
use awaken_contract::contract::inference::{StopReason, StreamResult, TokenUsage};
use awaken_contract::contract::message::{Message, ToolCall};
use awaken_contract::contract::tool::{
Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
use serde_json::{Value, json};
use crate::backend::{
BackendControl, BackendDelegatePolicy, BackendDelegateRunRequest, BackendParentContext,
BackendRunStatus,
};
#[cfg(feature = "background")]
use crate::extensions::background::{
BackgroundTaskManager, BackgroundTaskPlugin, TaskParentContext, TaskResult as BgTaskResult,
TaskStatus,
};
use crate::loop_runner::build_agent_env;
use crate::plugins::{Plugin, PluginDescriptor};
use crate::registry::{AgentResolver, ExecutionResolver, ResolvedExecution};
#[cfg(feature = "background")]
use crate::state::StateStore;
/// Test double for [`LlmExecutor`] that replays pre-scripted responses in order.
struct ScriptedLlm {
    /// Remaining responses; the front entry is returned by the next `execute` call.
    ///
    /// Stored as a `VecDeque` so taking the next response is an O(1) `pop_front`
    /// rather than an O(n) `Vec::remove(0)`.
    responses: Mutex<std::collections::VecDeque<StreamResult>>,
}

impl ScriptedLlm {
    /// Builds an executor that hands out `responses` from first to last.
    fn new(responses: Vec<StreamResult>) -> Self {
        Self {
            responses: Mutex::new(responses.into()),
        }
    }
}

#[async_trait]
impl LlmExecutor for ScriptedLlm {
    /// Returns the next scripted response; the request itself is ignored.
    ///
    /// # Panics
    /// Panics with "scripted LLM exhausted" when no scripted responses remain,
    /// or if the internal mutex is poisoned.
    async fn execute(
        &self,
        _request: InferenceRequest,
    ) -> Result<StreamResult, InferenceExecutionError> {
        // The guard is a temporary, so the lock is released at the end of this statement.
        let next = self
            .responses
            .lock()
            .unwrap()
            .pop_front()
            .expect("scripted LLM exhausted");
        Ok(next)
    }

    /// Fixed executor name used by the tests.
    fn name(&self) -> &str {
        "scripted"
    }
}
/// Builds a scripted response containing only `text`, with no tool calls and
/// an `EndTurn` stop reason.
fn text_response(text: &str) -> StreamResult {
    let content = vec![ContentBlock::text(text)];
    StreamResult {
        content,
        tool_calls: Vec::new(),
        usage: Some(TokenUsage::default()),
        stop_reason: Some(StopReason::EndTurn),
        has_incomplete_tool_calls: false,
    }
}
/// Builds a scripted response that emits `text` and requests a single call to
/// `tool_name` (identified by `call_id`) with `args`, using a `ToolUse` stop reason.
fn tool_call_response(text: &str, tool_name: &str, call_id: &str, args: Value) -> StreamResult {
    let content = vec![ContentBlock::text(text)];
    let tool_calls = vec![ToolCall::new(call_id, tool_name, args)];
    StreamResult {
        content,
        tool_calls,
        usage: Some(TokenUsage::default()),
        stop_reason: Some(StopReason::ToolUse),
        has_incomplete_tool_calls: false,
    }
}
/// Test tool registered as "echo" that returns its arguments back to the caller.
struct EchoTool;

#[async_trait]
impl Tool for EchoTool {
    /// Describes the tool under the id and name "echo".
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("echo", "echo", "Echoes input back")
    }

    /// Always succeeds, returning `args` together with a fixed message.
    async fn execute(
        &self,
        args: Value,
        _ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError> {
        let output: ToolOutput =
            ToolResult::success_with_message("echo", args, "tool result should not win").into();
        Ok(output)
    }
}
/// Test tool registered under the "cancel_task" id that only counts how often
/// it is invoked.
#[cfg(feature = "background")]
struct CustomCancelTool {
    /// Incremented once per `execute` call.
    called: Arc<AtomicUsize>,
}

#[cfg(feature = "background")]
#[async_trait]
impl Tool for CustomCancelTool {
    /// Describes the tool under the id and name "cancel_task".
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("cancel_task", "cancel_task", "custom cancel task tool")
    }

    /// Records the invocation and returns a success result echoing `args`.
    async fn execute(
        &self,
        args: Value,
        _ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError> {
        self.called.fetch_add(1, Ordering::SeqCst);
        let output: ToolOutput =
            ToolResult::success_with_message("cancel_task", args, "custom cancel handled").into();
        Ok(output)
    }
}
/// Test plugin that counts how many times its runtime context is bound.
struct BindingPlugin {
    /// Incremented once per `bind_runtime_context` call.
    bind_count: Arc<AtomicUsize>,
}

impl Plugin for BindingPlugin {
    /// Identifies the plugin as "binding-plugin".
    fn descriptor(&self) -> PluginDescriptor {
        let name = "binding-plugin";
        PluginDescriptor { name }
    }

    /// Ignores both arguments and only records that binding happened.
    fn bind_runtime_context(
        &self,
        _store: &crate::state::StateStore,
        _owner_inbox: Option<&crate::inbox::InboxSender>,
    ) {
        let counter = &self.bind_count;
        counter.fetch_add(1, Ordering::SeqCst);
    }
}
/// Test resolver that returns the same agent for every id, rebuilding its
/// environment from `plugins` on each resolution.
struct FixedResolver {
    /// Template agent that is cloned on every `resolve` call.
    agent: ResolvedAgent,
    /// Plugins used to build the resolved agent's environment.
    plugins: Vec<Arc<dyn Plugin>>,
}

impl AgentResolver for FixedResolver {
    /// Clones the template agent and attaches a freshly built env.
    ///
    /// Panics with "build env" if `build_agent_env` fails.
    fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, crate::RuntimeError> {
        let mut resolved = self.agent.clone();
        let env = build_agent_env(&self.plugins, &resolved).expect("build env");
        resolved.env = env;
        Ok(resolved)
    }
}

impl ExecutionResolver for FixedResolver {
    /// Resolves the agent and wraps it via `ResolvedExecution::local`.
    fn resolve_execution(
        &self,
        agent_id: &str,
    ) -> Result<ResolvedExecution, crate::RuntimeError> {
        let agent = self.resolve(agent_id)?;
        Ok(ResolvedExecution::local(agent))
    }
}
/// A delegate run must bind each env plugin's runtime context exactly once.
#[tokio::test]
async fn execute_delegate_binds_plugin_runtime_context() {
    let bind_count = Arc::new(AtomicUsize::new(0));
    let plugin: Arc<dyn Plugin> = Arc::new(BindingPlugin {
        bind_count: Arc::clone(&bind_count),
    });
    let scripted = vec![text_response("delegated response")];
    let resolver = FixedResolver {
        agent: ResolvedAgent::new("delegate", "m", "sys", Arc::new(ScriptedLlm::new(scripted))),
        plugins: vec![plugin],
    };

    let request = BackendDelegateRunRequest {
        agent_id: "delegate",
        messages: vec![Message::user("hello")],
        new_messages: vec![Message::user("hello")],
        sink: Arc::new(NullEventSink),
        resolver: &resolver,
        parent: BackendParentContext {
            parent_run_id: Some("parent-run".into()),
            parent_thread_id: Some("parent-thread".into()),
            parent_tool_call_id: Some("tool-1".into()),
        },
        control: BackendControl::default(),
        policy: BackendDelegatePolicy::default(),
    };
    let backend = LocalBackend::new();
    let result = backend
        .execute_delegate(request)
        .await
        .expect("delegate execution should succeed");

    assert!(matches!(result.status, BackendRunStatus::Completed));
    let observed_binds = bind_count.load(Ordering::SeqCst);
    assert_eq!(observed_binds, 1);
}
/// After a tool round-trip, the delegate result must carry the model's final
/// text answer (not the tool's message) and report two steps.
#[tokio::test]
async fn execute_delegate_returns_final_non_tool_message_after_tool_output() {
    // Turn 1 requests the echo tool; turn 2 answers in plain text.
    let scripted = vec![
        tool_call_response(
            "checking",
            "echo",
            "call-1",
            json!({"message": "tool result should not win"}),
        ),
        text_response("final child answer"),
    ];
    let agent = ResolvedAgent::new("delegate", "m", "sys", Arc::new(ScriptedLlm::new(scripted)))
        .with_tool(Arc::new(EchoTool));
    let resolver = FixedResolver {
        agent,
        plugins: Vec::new(),
    };

    let request = BackendDelegateRunRequest {
        agent_id: "delegate",
        messages: vec![Message::user("delegate with a tool")],
        new_messages: vec![Message::user("delegate with a tool")],
        sink: Arc::new(NullEventSink),
        resolver: &resolver,
        parent: BackendParentContext {
            parent_run_id: Some("parent-run".into()),
            parent_thread_id: Some("parent-thread".into()),
            parent_tool_call_id: Some("tool-1".into()),
        },
        control: BackendControl::default(),
        policy: BackendDelegatePolicy::default(),
    };
    let backend = LocalBackend::new();
    let result = backend
        .execute_delegate(request)
        .await
        .expect("delegate execution should succeed");

    let expected_answer = Some("final child answer");
    assert!(matches!(result.status, BackendRunStatus::Completed));
    assert_eq!(result.response.as_deref(), expected_answer);
    assert_eq!(result.output.text.as_deref(), expected_answer);
    assert_eq!(result.steps, 2);
}
/// A delegate running inside a background task can cancel its own task via the
/// "cancel_task" tool, and that cancellation also reaches a child task spawned
/// under the same parent task.
///
/// The resolver's agent is built without any tools, so the "cancel_task" tool
/// the scripted LLM calls has to be supplied by the backend during the run.
#[cfg(feature = "background")]
#[tokio::test]
async fn execute_delegate_in_background_task_can_self_cancel_and_cascade() {
// Manager backed by a store that has the background plugin's keys registered.
let store = StateStore::new();
let manager = Arc::new(BackgroundTaskManager::new());
manager.set_store(store.clone());
let plugin: Arc<dyn Plugin> = Arc::new(BackgroundTaskPlugin::new(manager.clone()));
let env = crate::phase::ExecutionEnv::from_plugins(&[plugin], &Default::default())
.expect("background plugin env");
store
.register_keys(&env.key_registrations)
.expect("background keys should register");
// Script: the first turn asks to cancel the current task; the second turn
// is only there to detect a run that kept going after cancellation.
let resolver = FixedResolver {
agent: ResolvedAgent::new(
"delegate",
"m",
"sys",
Arc::new(ScriptedLlm::new(vec![
tool_call_response(
"cancel self",
"cancel_task",
"call-1",
json!({"target": {"relation": "self"}}),
),
text_response("should not be reached after cancellation"),
])),
),
plugins: Vec::new(),
};
// Shared slot through which the spawned task reports the child task id.
let child_task_id = Arc::new(tokio::sync::Mutex::new(None::<String>));
let child_task_id_seen = child_task_id.clone();
let resolver = Arc::new(resolver);
let task_id = manager
.spawn_agent_with_context(
"thread-1",
Some("worker"),
"worker agent",
TaskParentContext::default(),
{
let manager = manager.clone();
move |ctx| {
let manager = manager.clone();
let child_task_id_seen = child_task_id_seen.clone();
let resolver = resolver.clone();
async move {
assert!(
crate::extensions::background::current_background_task_context()
.is_some(),
"background task context should be visible inside spawned task"
);
// Child task parented to this task; it simply waits to be cancelled.
let child_id = manager
.spawn(
"thread-1",
"child",
Some("leaf"),
"child task",
TaskParentContext {
task_id: Some(ctx.task_id.clone()),
..TaskParentContext::default()
},
|child_ctx| async move {
child_ctx.cancelled().await;
BgTaskResult::Cancelled
},
)
.await
.expect("child task should spawn");
*child_task_id_seen.lock().await = Some(child_id);
// Run the delegate with this task's cancel token as its cancellation token.
let result = LocalBackend::new()
.execute_delegate(BackendDelegateRunRequest {
agent_id: "delegate",
messages: vec![Message::user("cancel yourself")],
new_messages: vec![Message::user("cancel yourself")],
sink: Arc::new(NullEventSink),
resolver: resolver.as_ref(),
parent: BackendParentContext {
parent_run_id: Some("parent-run".into()),
parent_thread_id: Some("parent-thread".into()),
parent_tool_call_id: Some("tool-1".into()),
},
control: BackendControl {
cancellation_token: Some(ctx.cancel_token.clone()),
decision_rx: None,
},
policy: BackendDelegatePolicy::default(),
})
.await
.expect("delegate should run");
// Anything other than a cancelled delegate is turned into a task failure
// so the status assertion below reports the details.
match result.status {
BackendRunStatus::Cancelled => BgTaskResult::Cancelled,
other => BgTaskResult::Failed(format!(
"expected cancelled delegate status, got {other}; response={:?}; output={:?}",
result.response,
result.output
)),
}
}
}
},
)
.await
.expect("background sub-agent should spawn");
// NOTE(review): fixed 100 ms wait assumes both tasks have settled by then;
// this may be flaky on slow machines — consider polling the task status instead.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let child_id = child_task_id
.lock()
.await
.clone()
.expect("child task id should be recorded");
let task_summary = manager
.get(&task_id)
.await
.expect("root task should still be queryable");
let child_summary = manager
.get(&child_id)
.await
.expect("child task should still be queryable");
// Both the root task and its child must end up cancelled.
assert_eq!(
task_summary.status,
TaskStatus::Cancelled,
"task={task_summary:?} child={child_summary:?}"
);
assert_eq!(
child_summary.status,
TaskStatus::Cancelled,
"task={task_summary:?} child={child_summary:?}"
);
}
/// When the agent already registers its own "cancel_task" tool, a delegate run
/// inside a background task must call that tool (exactly once here) and the
/// task must complete normally instead of being cancelled.
#[cfg(feature = "background")]
#[tokio::test]
async fn execute_delegate_preserves_existing_cancel_task_tool_in_background_context() {
// Manager backed by a store that has the background plugin's keys registered.
let store = StateStore::new();
let manager = Arc::new(BackgroundTaskManager::new());
manager.set_store(store.clone());
let plugin: Arc<dyn Plugin> = Arc::new(BackgroundTaskPlugin::new(manager.clone()));
let env = crate::phase::ExecutionEnv::from_plugins(&[plugin], &Default::default())
.expect("background plugin env");
store
.register_keys(&env.key_registrations)
.expect("background keys should register");
// Counter incremented by the custom tool each time it is executed.
let cancel_calls = Arc::new(AtomicUsize::new(0));
// Script: call "cancel_task" once, then finish with a plain text answer.
let resolver = Arc::new(FixedResolver {
agent: ResolvedAgent::new(
"delegate",
"m",
"sys",
Arc::new(ScriptedLlm::new(vec![
tool_call_response(
"use custom cancel",
"cancel_task",
"call-1",
json!({"target": {"relation": "self"}}),
),
text_response("custom tool won"),
])),
)
.with_tool(Arc::new(CustomCancelTool {
called: cancel_calls.clone(),
})),
plugins: Vec::new(),
});
let task_id = manager
.spawn_agent_with_context(
"thread-1",
Some("worker"),
"worker agent",
TaskParentContext::default(),
move |ctx| {
let resolver = resolver.clone();
async move {
// Run the delegate with this task's cancel token as its cancellation token.
let result = LocalBackend::new()
.execute_delegate(BackendDelegateRunRequest {
agent_id: "delegate",
messages: vec![Message::user("use custom cancel")],
new_messages: vec![Message::user("use custom cancel")],
sink: Arc::new(NullEventSink),
resolver: resolver.as_ref(),
parent: BackendParentContext {
parent_run_id: Some("parent-run".into()),
parent_thread_id: Some("parent-thread".into()),
parent_tool_call_id: Some("tool-1".into()),
},
control: BackendControl {
cancellation_token: Some(ctx.cancel_token),
decision_rx: None,
},
policy: BackendDelegatePolicy::default(),
})
.await
.expect("delegate should run");
// Success requires both a completed status and the scripted final answer;
// anything else becomes a task failure carrying the details.
match result.status {
BackendRunStatus::Completed
if result.response.as_deref() == Some("custom tool won") =>
{
BgTaskResult::Success(json!({
"response": result.response,
"steps": result.steps,
}))
}
other => BgTaskResult::Failed(format!(
"expected completed delegate status, got {other}; response={:?}; output={:?}",
result.response,
result.output
)),
}
}
},
)
.await
.expect("background sub-agent should spawn");
// NOTE(review): fixed 100 ms wait assumes the background task has finished by then;
// this may be flaky on slow machines — consider polling the task status instead.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(cancel_calls.load(Ordering::SeqCst), 1);
let summary = manager
.get(&task_id)
.await
.expect("root task should be queryable");
assert_eq!(summary.status, TaskStatus::Completed, "task={summary:?}");
}
}