use async_trait::async_trait;
use awaken_contract::contract::identity::{RunIdentity, RunOrigin};
use awaken_contract::contract::lifecycle::TerminationReason;
use crate::loop_runner::{AgentLoopParams, prepare_resume, run_agent_loop};
use crate::registry::ResolvedAgent;
use crate::state::StateStore;
use super::{
BackendCapabilities, BackendDelegateContinuation, BackendDelegatePersistence,
BackendDelegateRunRequest, BackendRootRunRequest, BackendRunOutput, BackendRunResult,
BackendRunStatus, ExecutionBackend, ExecutionBackendError,
};
pub struct LocalBackend;
impl LocalBackend {
#[must_use]
pub fn new() -> Self {
Self
}
}
impl Default for LocalBackend {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ExecutionBackend for LocalBackend {
fn capabilities(&self) -> BackendCapabilities {
BackendCapabilities::full()
}
async fn execute_delegate(
&self,
request: BackendDelegateRunRequest<'_>,
) -> Result<BackendRunResult, ExecutionBackendError> {
Self::execute_delegate(self, request).await
}
async fn execute_root(
&self,
request: BackendRootRunRequest<'_>,
) -> Result<BackendRunResult, ExecutionBackendError> {
self.execute_root_with_thread_context(request, None).await
}
}
impl LocalBackend {
pub(crate) async fn execute_root_with_thread_context(
&self,
request: BackendRootRunRequest<'_>,
thread_ctx: Option<crate::ThreadContextSnapshot>,
) -> Result<BackendRunResult, ExecutionBackendError> {
let phase_runtime = request
.local
.as_ref()
.map(|context| context.phase_runtime)
.ok_or_else(|| {
ExecutionBackendError::ExecutionFailed(
"local root execution requires a phase runtime context".into(),
)
})?;
let run_identity = request.run_identity.clone();
let run_id = run_identity.run_id.clone();
if !request.decisions.is_empty() {
prepare_resume(phase_runtime.store(), request.decisions, None)
.map_err(crate::loop_runner::AgentLoopError::PhaseError)
.map_err(ExecutionBackendError::Loop)?;
}
let result = crate::loop_runner::run_agent_loop_with_thread_context(
AgentLoopParams {
resolver: request.resolver,
agent_id: request.agent_id,
runtime: phase_runtime,
sink: request.sink,
checkpoint_store: request.checkpoint_store,
messages: request.messages,
run_identity,
cancellation_token: request.control.cancellation_token,
decision_rx: request.control.decision_rx,
overrides: request.overrides,
frontend_tools: request.frontend_tools,
inbox: request.inbox,
is_continuation: request.is_continuation,
},
thread_ctx,
)
.await
.map_err(ExecutionBackendError::Loop)?;
let response = if result.response.is_empty() {
None
} else {
Some(result.response)
};
Ok(BackendRunResult {
agent_id: request.agent_id.to_string(),
status: map_termination(&result.termination),
termination: result.termination,
status_reason: None,
output: BackendRunOutput::from_text(response.clone()),
response,
steps: result.steps,
run_id: Some(run_id),
inbox: None,
state: None,
})
}
pub async fn execute_delegate(
&self,
request: BackendDelegateRunRequest<'_>,
) -> Result<BackendRunResult, ExecutionBackendError> {
match (request.policy.persistence, request.policy.continuation) {
(BackendDelegatePersistence::Ephemeral, BackendDelegateContinuation::Disabled) => {}
}
let resolved = request
.resolver
.resolve(request.agent_id)
.map_err(|error| {
ExecutionBackendError::AgentNotFound(format!(
"failed to resolve agent '{}': {error}",
request.agent_id
))
})?;
let store = crate::state::StateStore::new();
store
.install_plugin(crate::loop_runner::LoopStatePlugin)
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
let phase_runtime = crate::phase::PhaseRuntime::new(store.clone())
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
let (owner_inbox, inbox_receiver) = {
let (sender, receiver) = crate::inbox::inbox_channel();
(Some(sender), receiver)
};
Self::bind_local_execution_env(&store, &resolved, owner_inbox.as_ref())
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
#[cfg(feature = "background")]
let bg_manager = if resolved
.env
.plugins
.iter()
.any(|plugin| plugin.descriptor().name == "background_tasks")
{
None
} else {
let manager = crate::extensions::background::BackgroundTaskManager::new();
let manager = std::sync::Arc::new(manager);
manager.set_store(store.clone());
Some(manager)
};
#[cfg(feature = "background")]
if let Some(manager) = &bg_manager {
if let Some(sender) = owner_inbox.clone() {
manager.set_owner_inbox(sender);
}
store
.install_plugin(crate::extensions::background::BackgroundTaskPlugin::new(
manager.clone(),
))
.map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
}
let sub_run_id = uuid::Uuid::now_v7().to_string();
let mut run_identity = RunIdentity::new(
sub_run_id.clone(),
request.parent.parent_thread_id.clone(),
sub_run_id.clone(),
request.parent.parent_run_id.clone(),
request.agent_id.to_string(),
RunOrigin::Subagent,
);
if let Some(parent_tool_call_id) = request.parent.parent_tool_call_id.clone() {
run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
}
let result = run_agent_loop(AgentLoopParams {
resolver: request.resolver,
agent_id: request.agent_id,
runtime: &phase_runtime,
sink: request.sink,
checkpoint_store: None,
messages: request.messages,
run_identity,
cancellation_token: request.control.cancellation_token,
decision_rx: request.control.decision_rx,
overrides: None,
frontend_tools: Vec::new(),
inbox: Some(inbox_receiver),
is_continuation: false,
})
.await
.map_err(ExecutionBackendError::Loop)?;
let response = if result.response.is_empty() {
None
} else {
Some(result.response)
};
Ok(BackendRunResult {
agent_id: request.agent_id.to_string(),
status: map_termination(&result.termination),
termination: result.termination,
status_reason: None,
output: BackendRunOutput::from_text(response.clone()),
response,
steps: result.steps,
run_id: Some(sub_run_id),
inbox: owner_inbox,
state: None,
})
}
pub(crate) fn bind_local_execution_env(
store: &StateStore,
resolved: &ResolvedAgent,
owner_inbox: Option<&crate::inbox::InboxSender>,
) -> Result<(), awaken_contract::StateError> {
if !resolved.env.key_registrations.is_empty() {
store.register_keys(&resolved.env.key_registrations)?;
}
for plugin in &resolved.env.plugins {
plugin.bind_runtime_context(store, owner_inbox);
}
Ok(())
}
}
fn map_termination(termination: &TerminationReason) -> BackendRunStatus {
match termination {
TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested => {
BackendRunStatus::Completed
}
TerminationReason::Cancelled => BackendRunStatus::Cancelled,
TerminationReason::Stopped(reason) => {
BackendRunStatus::Failed(format!("stopped: {reason:?}"))
}
TerminationReason::Blocked(message) => {
BackendRunStatus::Failed(format!("blocked: {message}"))
}
TerminationReason::Suspended => BackendRunStatus::Suspended(None),
TerminationReason::Error(message) => BackendRunStatus::Failed(message.clone()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use awaken_contract::contract::content::ContentBlock;
use awaken_contract::contract::event_sink::NullEventSink;
use awaken_contract::contract::executor::{
InferenceExecutionError, InferenceRequest, LlmExecutor,
};
use awaken_contract::contract::inference::{StopReason, StreamResult, TokenUsage};
use awaken_contract::contract::message::{Message, ToolCall};
use awaken_contract::contract::tool::{
Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
use serde_json::{Value, json};
use crate::backend::{
BackendControl, BackendDelegatePolicy, BackendDelegateRunRequest, BackendParentContext,
};
use crate::loop_runner::build_agent_env;
use crate::plugins::{Plugin, PluginDescriptor};
use crate::registry::{AgentResolver, ExecutionResolver, ResolvedExecution};
struct ScriptedLlm {
responses: Mutex<Vec<StreamResult>>,
}
impl ScriptedLlm {
fn new(responses: Vec<StreamResult>) -> Self {
Self {
responses: Mutex::new(responses),
}
}
}
#[async_trait]
impl LlmExecutor for ScriptedLlm {
async fn execute(
&self,
_request: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let mut responses = self.responses.lock().unwrap();
assert!(!responses.is_empty(), "scripted LLM exhausted");
Ok(responses.remove(0))
}
fn name(&self) -> &str {
"scripted"
}
}
fn text_response(text: &str) -> StreamResult {
StreamResult {
content: vec![ContentBlock::text(text)],
tool_calls: vec![],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
}
}
fn tool_call_response(text: &str, tool_name: &str, call_id: &str, args: Value) -> StreamResult {
StreamResult {
content: vec![ContentBlock::text(text)],
tool_calls: vec![ToolCall::new(call_id, tool_name, args)],
usage: Some(TokenUsage::default()),
stop_reason: Some(StopReason::ToolUse),
has_incomplete_tool_calls: false,
}
}
struct EchoTool;
#[async_trait]
impl Tool for EchoTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("echo", "echo", "Echoes input back")
}
async fn execute(
&self,
args: Value,
_ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
Ok(ToolResult::success_with_message("echo", args, "tool result should not win").into())
}
}
struct BindingPlugin {
bind_count: Arc<AtomicUsize>,
}
impl Plugin for BindingPlugin {
fn descriptor(&self) -> PluginDescriptor {
PluginDescriptor {
name: "binding-plugin",
}
}
fn bind_runtime_context(
&self,
_store: &crate::state::StateStore,
_owner_inbox: Option<&crate::inbox::InboxSender>,
) {
self.bind_count.fetch_add(1, Ordering::SeqCst);
}
}
struct FixedResolver {
agent: ResolvedAgent,
plugins: Vec<Arc<dyn Plugin>>,
}
impl AgentResolver for FixedResolver {
fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, crate::RuntimeError> {
let mut agent = self.agent.clone();
agent.env = build_agent_env(&self.plugins, &agent).expect("build env");
Ok(agent)
}
}
impl ExecutionResolver for FixedResolver {
fn resolve_execution(
&self,
agent_id: &str,
) -> Result<ResolvedExecution, crate::RuntimeError> {
self.resolve(agent_id).map(ResolvedExecution::local)
}
}
#[tokio::test]
async fn execute_delegate_binds_plugin_runtime_context() {
let bind_count = Arc::new(AtomicUsize::new(0));
let plugin: Arc<dyn Plugin> = Arc::new(BindingPlugin {
bind_count: bind_count.clone(),
});
let resolver = FixedResolver {
agent: ResolvedAgent::new(
"delegate",
"m",
"sys",
Arc::new(ScriptedLlm::new(vec![text_response("delegated response")])),
),
plugins: vec![plugin],
};
let result = LocalBackend::new()
.execute_delegate(BackendDelegateRunRequest {
agent_id: "delegate",
messages: vec![Message::user("hello")],
new_messages: vec![Message::user("hello")],
sink: Arc::new(NullEventSink),
resolver: &resolver,
parent: BackendParentContext {
parent_run_id: Some("parent-run".into()),
parent_thread_id: Some("parent-thread".into()),
parent_tool_call_id: Some("tool-1".into()),
},
control: BackendControl::default(),
policy: BackendDelegatePolicy::default(),
})
.await
.expect("delegate execution should succeed");
assert!(matches!(result.status, BackendRunStatus::Completed));
assert_eq!(bind_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn execute_delegate_returns_final_non_tool_message_after_tool_output() {
let resolver = FixedResolver {
agent: ResolvedAgent::new(
"delegate",
"m",
"sys",
Arc::new(ScriptedLlm::new(vec![
tool_call_response(
"checking",
"echo",
"call-1",
json!({"message": "tool result should not win"}),
),
text_response("final child answer"),
])),
)
.with_tool(Arc::new(EchoTool)),
plugins: Vec::new(),
};
let result = LocalBackend::new()
.execute_delegate(BackendDelegateRunRequest {
agent_id: "delegate",
messages: vec![Message::user("delegate with a tool")],
new_messages: vec![Message::user("delegate with a tool")],
sink: Arc::new(NullEventSink),
resolver: &resolver,
parent: BackendParentContext {
parent_run_id: Some("parent-run".into()),
parent_thread_id: Some("parent-thread".into()),
parent_tool_call_id: Some("tool-1".into()),
},
control: BackendControl::default(),
policy: BackendDelegatePolicy::default(),
})
.await
.expect("delegate execution should succeed");
assert!(matches!(result.status, BackendRunStatus::Completed));
assert_eq!(result.response.as_deref(), Some("final child answer"));
assert_eq!(result.output.text.as_deref(), Some("final child answer"));
assert_eq!(result.steps, 2);
}
}