1use async_trait::async_trait;
4use awaken_contract::contract::identity::{RunIdentity, RunOrigin};
5use awaken_contract::contract::lifecycle::TerminationReason;
6
7use crate::loop_runner::{AgentLoopParams, prepare_resume, run_agent_loop};
8use crate::registry::ResolvedAgent;
9use crate::state::StateStore;
10
11use super::{
12 BackendCapabilities, BackendDelegateContinuation, BackendDelegatePersistence,
13 BackendDelegateRunRequest, BackendRootRunRequest, BackendRunOutput, BackendRunResult,
14 BackendRunStatus, ExecutionBackend, ExecutionBackendError,
15};
16
/// Execution backend whose [`ExecutionBackend`] implementation drives
/// `run_agent_loop` directly instead of handing the run to another service.
///
/// The type is a field-less unit struct, so all instances are interchangeable
/// and the common traits can simply be derived.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalBackend;

impl LocalBackend {
    /// Creates a new local backend.
    ///
    /// Produces the same value as [`LocalBackend::default`]; being `const`, it
    /// can also be used to initialise constants and statics.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
32
33#[async_trait]
34impl ExecutionBackend for LocalBackend {
35 fn capabilities(&self) -> BackendCapabilities {
36 BackendCapabilities::full()
37 }
38
39 async fn execute_delegate(
40 &self,
41 request: BackendDelegateRunRequest<'_>,
42 ) -> Result<BackendRunResult, ExecutionBackendError> {
43 Self::execute_delegate(self, request).await
44 }
45
46 async fn execute_root(
47 &self,
48 request: BackendRootRunRequest<'_>,
49 ) -> Result<BackendRunResult, ExecutionBackendError> {
50 let phase_runtime = request
51 .local
52 .as_ref()
53 .map(|context| context.phase_runtime)
54 .ok_or_else(|| {
55 ExecutionBackendError::ExecutionFailed(
56 "local root execution requires a phase runtime context".into(),
57 )
58 })?;
59 let run_identity = request.run_identity.clone();
60 let run_id = run_identity.run_id.clone();
61 if !request.decisions.is_empty() {
62 prepare_resume(phase_runtime.store(), request.decisions, None)
63 .map_err(crate::loop_runner::AgentLoopError::PhaseError)
64 .map_err(ExecutionBackendError::Loop)?;
65 }
66
67 let result = run_agent_loop(AgentLoopParams {
68 resolver: request.resolver,
69 agent_id: request.agent_id,
70 runtime: phase_runtime,
71 sink: request.sink,
72 checkpoint_store: request.checkpoint_store,
73 messages: request.messages,
74 run_identity,
75 cancellation_token: request.control.cancellation_token,
76 decision_rx: request.control.decision_rx,
77 overrides: request.overrides,
78 frontend_tools: request.frontend_tools,
79 inbox: request.inbox,
80 is_continuation: request.is_continuation,
81 })
82 .await
83 .map_err(ExecutionBackendError::Loop)?;
84
85 let response = if result.response.is_empty() {
86 None
87 } else {
88 Some(result.response)
89 };
90 Ok(BackendRunResult {
91 agent_id: request.agent_id.to_string(),
92 status: map_termination(&result.termination),
93 termination: result.termination,
94 status_reason: None,
95 output: BackendRunOutput::from_text(response.clone()),
96 response,
97 steps: result.steps,
98 run_id: Some(run_id),
99 inbox: None,
100 state: None,
101 })
102 }
103}
104
105impl LocalBackend {
106 pub async fn execute_delegate(
107 &self,
108 request: BackendDelegateRunRequest<'_>,
109 ) -> Result<BackendRunResult, ExecutionBackendError> {
110 match (request.policy.persistence, request.policy.continuation) {
111 (BackendDelegatePersistence::Ephemeral, BackendDelegateContinuation::Disabled) => {}
112 }
113 let resolved = request
114 .resolver
115 .resolve(request.agent_id)
116 .map_err(|error| {
117 ExecutionBackendError::AgentNotFound(format!(
118 "failed to resolve agent '{}': {error}",
119 request.agent_id
120 ))
121 })?;
122
123 let store = crate::state::StateStore::new();
124 store
125 .install_plugin(crate::loop_runner::LoopStatePlugin)
126 .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
127
128 let phase_runtime = crate::phase::PhaseRuntime::new(store.clone())
129 .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
130
131 let (owner_inbox, inbox_receiver) = {
132 let (sender, receiver) = crate::inbox::inbox_channel();
133 (Some(sender), receiver)
134 };
135
136 Self::bind_local_execution_env(&store, &resolved, owner_inbox.as_ref())
137 .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
138
139 #[cfg(feature = "background")]
140 let bg_manager = if resolved
141 .env
142 .plugins
143 .iter()
144 .any(|plugin| plugin.descriptor().name == "background_tasks")
145 {
146 None
147 } else {
148 let manager = crate::extensions::background::BackgroundTaskManager::new();
149 let manager = std::sync::Arc::new(manager);
150 manager.set_store(store.clone());
151 Some(manager)
152 };
153
154 #[cfg(feature = "background")]
155 if let Some(manager) = &bg_manager {
156 if let Some(sender) = owner_inbox.clone() {
157 manager.set_owner_inbox(sender);
158 }
159 store
160 .install_plugin(crate::extensions::background::BackgroundTaskPlugin::new(
161 manager.clone(),
162 ))
163 .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
164 }
165
166 let sub_run_id = uuid::Uuid::now_v7().to_string();
167 let mut run_identity = RunIdentity::new(
168 sub_run_id.clone(),
169 request.parent.parent_thread_id.clone(),
170 sub_run_id.clone(),
171 request.parent.parent_run_id.clone(),
172 request.agent_id.to_string(),
173 RunOrigin::Subagent,
174 );
175 if let Some(parent_tool_call_id) = request.parent.parent_tool_call_id.clone() {
176 run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
177 }
178
179 let result = run_agent_loop(AgentLoopParams {
180 resolver: request.resolver,
181 agent_id: request.agent_id,
182 runtime: &phase_runtime,
183 sink: request.sink,
184 checkpoint_store: None,
185 messages: request.messages,
186 run_identity,
187 cancellation_token: request.control.cancellation_token,
188 decision_rx: request.control.decision_rx,
189 overrides: None,
190 frontend_tools: Vec::new(),
191 inbox: Some(inbox_receiver),
192 is_continuation: false,
193 })
194 .await
195 .map_err(ExecutionBackendError::Loop)?;
196
197 let response = if result.response.is_empty() {
198 None
199 } else {
200 Some(result.response)
201 };
202 Ok(BackendRunResult {
203 agent_id: request.agent_id.to_string(),
204 status: map_termination(&result.termination),
205 termination: result.termination,
206 status_reason: None,
207 output: BackendRunOutput::from_text(response.clone()),
208 response,
209 steps: result.steps,
210 run_id: Some(sub_run_id),
211 inbox: owner_inbox,
212 state: None,
213 })
214 }
215
216 pub(crate) fn bind_local_execution_env(
217 store: &StateStore,
218 resolved: &ResolvedAgent,
219 owner_inbox: Option<&crate::inbox::InboxSender>,
220 ) -> Result<(), awaken_contract::StateError> {
221 if !resolved.env.key_registrations.is_empty() {
222 store.register_keys(&resolved.env.key_registrations)?;
223 }
224 for plugin in &resolved.env.plugins {
225 plugin.bind_runtime_context(store, owner_inbox);
226 }
227 Ok(())
228 }
229}
230
231fn map_termination(termination: &TerminationReason) -> BackendRunStatus {
232 match termination {
233 TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested => {
234 BackendRunStatus::Completed
235 }
236 TerminationReason::Cancelled => BackendRunStatus::Cancelled,
237 TerminationReason::Stopped(reason) => {
238 BackendRunStatus::Failed(format!("stopped: {reason:?}"))
239 }
240 TerminationReason::Blocked(message) => {
241 BackendRunStatus::Failed(format!("blocked: {message}"))
242 }
243 TerminationReason::Suspended => BackendRunStatus::Suspended(None),
244 TerminationReason::Error(message) => BackendRunStatus::Failed(message.clone()),
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use awaken_contract::contract::content::ContentBlock;
    use awaken_contract::contract::event_sink::NullEventSink;
    use awaken_contract::contract::executor::{
        InferenceExecutionError, InferenceRequest, LlmExecutor,
    };
    use awaken_contract::contract::inference::{StopReason, StreamResult, TokenUsage};
    use awaken_contract::contract::message::{Message, ToolCall};
    use awaken_contract::contract::tool::{
        Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
    };
    use serde_json::{Value, json};

    use crate::backend::{
        BackendControl, BackendDelegatePolicy, BackendDelegateRunRequest, BackendParentContext,
    };
    use crate::loop_runner::build_agent_env;
    use crate::plugins::{Plugin, PluginDescriptor};
    use crate::registry::{AgentResolver, ExecutionResolver, ResolvedExecution};

    /// LLM stub that hands out a fixed list of responses, front to back.
    struct ScriptedLlm {
        responses: Mutex<Vec<StreamResult>>,
    }

    impl ScriptedLlm {
        fn new(responses: Vec<StreamResult>) -> Self {
            let responses = Mutex::new(responses);
            Self { responses }
        }
    }

    #[async_trait]
    impl LlmExecutor for ScriptedLlm {
        async fn execute(
            &self,
            _request: InferenceRequest,
        ) -> Result<StreamResult, InferenceExecutionError> {
            let mut queue = self.responses.lock().unwrap();
            // Running out of scripted responses is a test bug, so fail loudly.
            assert!(!queue.is_empty(), "scripted LLM exhausted");
            let next = queue.remove(0);
            Ok(next)
        }

        fn name(&self) -> &str {
            "scripted"
        }
    }

    /// Builds a complete stream result with one text block and default usage.
    fn scripted_result(
        text: &str,
        tool_calls: Vec<ToolCall>,
        stop_reason: StopReason,
    ) -> StreamResult {
        StreamResult {
            content: vec![ContentBlock::text(text)],
            tool_calls,
            usage: Some(TokenUsage::default()),
            stop_reason: Some(stop_reason),
            has_incomplete_tool_calls: false,
        }
    }

    /// A plain text answer that ends the turn.
    fn text_response(text: &str) -> StreamResult {
        scripted_result(text, Vec::new(), StopReason::EndTurn)
    }

    /// A text answer that also requests a single tool call.
    fn tool_call_response(text: &str, tool_name: &str, call_id: &str, args: Value) -> StreamResult {
        let call = ToolCall::new(call_id, tool_name, args);
        scripted_result(text, vec![call], StopReason::ToolUse)
    }

    /// Tool named "echo" that succeeds with its arguments and a fixed message.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn descriptor(&self) -> ToolDescriptor {
            ToolDescriptor::new("echo", "echo", "Echoes input back")
        }

        async fn execute(
            &self,
            args: Value,
            _ctx: &ToolCallContext,
        ) -> Result<ToolOutput, ToolError> {
            let result =
                ToolResult::success_with_message("echo", args, "tool result should not win");
            Ok(result.into())
        }
    }

    /// Plugin that counts how often its runtime context gets bound.
    struct BindingPlugin {
        bind_count: Arc<AtomicUsize>,
    }

    impl Plugin for BindingPlugin {
        fn descriptor(&self) -> PluginDescriptor {
            PluginDescriptor {
                name: "binding-plugin",
            }
        }

        fn bind_runtime_context(
            &self,
            _store: &crate::state::StateStore,
            _owner_inbox: Option<&crate::inbox::InboxSender>,
        ) {
            self.bind_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Resolver that returns the same agent for every id, with an environment
    /// rebuilt from `plugins` on each call.
    struct FixedResolver {
        agent: ResolvedAgent,
        plugins: Vec<Arc<dyn Plugin>>,
    }

    impl AgentResolver for FixedResolver {
        fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, crate::RuntimeError> {
            let mut resolved = self.agent.clone();
            let env = build_agent_env(&self.plugins, &resolved).expect("build env");
            resolved.env = env;
            Ok(resolved)
        }
    }

    impl ExecutionResolver for FixedResolver {
        fn resolve_execution(
            &self,
            agent_id: &str,
        ) -> Result<ResolvedExecution, crate::RuntimeError> {
            let agent = self.resolve(agent_id)?;
            Ok(ResolvedExecution::local(agent))
        }
    }

    /// Builds the "delegate" agent used by every test around the given LLM.
    fn delegate_agent(llm: ScriptedLlm) -> ResolvedAgent {
        ResolvedAgent::new("delegate", "m", "sys", Arc::new(llm))
    }

    /// Sends `prompt` to the "delegate" agent through a fresh `LocalBackend`
    /// and unwraps the result.
    async fn run_delegate(resolver: &FixedResolver, prompt: &str) -> BackendRunResult {
        let request = BackendDelegateRunRequest {
            agent_id: "delegate",
            messages: vec![Message::user(prompt)],
            new_messages: vec![Message::user(prompt)],
            sink: Arc::new(NullEventSink),
            resolver,
            parent: BackendParentContext {
                parent_run_id: Some("parent-run".into()),
                parent_thread_id: Some("parent-thread".into()),
                parent_tool_call_id: Some("tool-1".into()),
            },
            control: BackendControl::default(),
            policy: BackendDelegatePolicy::default(),
        };

        LocalBackend::new()
            .execute_delegate(request)
            .await
            .expect("delegate execution should succeed")
    }

    #[tokio::test]
    async fn execute_delegate_binds_plugin_runtime_context() {
        let bind_count = Arc::new(AtomicUsize::new(0));
        let plugin: Arc<dyn Plugin> = Arc::new(BindingPlugin {
            bind_count: Arc::clone(&bind_count),
        });
        let llm = ScriptedLlm::new(vec![text_response("delegated response")]);
        let resolver = FixedResolver {
            agent: delegate_agent(llm),
            plugins: vec![plugin],
        };

        let result = run_delegate(&resolver, "hello").await;

        assert!(matches!(result.status, BackendRunStatus::Completed));
        assert_eq!(bind_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn execute_delegate_returns_final_non_tool_message_after_tool_output() {
        // First turn requests the echo tool, second turn gives the final text.
        let llm = ScriptedLlm::new(vec![
            tool_call_response(
                "checking",
                "echo",
                "call-1",
                json!({"message": "tool result should not win"}),
            ),
            text_response("final child answer"),
        ]);
        let resolver = FixedResolver {
            agent: delegate_agent(llm).with_tool(Arc::new(EchoTool)),
            plugins: Vec::new(),
        };

        let result = run_delegate(&resolver, "delegate with a tool").await;

        assert!(matches!(result.status, BackendRunStatus::Completed));
        assert_eq!(result.response.as_deref(), Some("final child answer"));
        assert_eq!(result.output.text.as_deref(), Some("final child answer"));
        assert_eq!(result.steps, 2);
    }
}