Skip to main content

awaken_runtime/backend/
local.rs

1//! In-process execution backend backed by the standard loop runner.
2
3use async_trait::async_trait;
4use awaken_contract::contract::identity::{RunIdentity, RunOrigin};
5use awaken_contract::contract::lifecycle::TerminationReason;
6
7use crate::loop_runner::{AgentLoopParams, prepare_resume, run_agent_loop};
8use crate::registry::ResolvedAgent;
9use crate::state::StateStore;
10
11use super::{
12    BackendCapabilities, BackendDelegateContinuation, BackendDelegatePersistence,
13    BackendDelegateRunRequest, BackendRootRunRequest, BackendRunOutput, BackendRunResult,
14    BackendRunStatus, ExecutionBackend, ExecutionBackendError,
15};
16
/// Local runtime backend for executing the standard loop in-process.
///
/// The type is a stateless unit struct: it carries no fields, so every value
/// is interchangeable and constructing one is free.
#[derive(Debug, Default)]
pub struct LocalBackend;

impl LocalBackend {
    /// Creates a new local backend.
    ///
    /// Equivalent to [`LocalBackend::default`].
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}
32
33#[async_trait]
34impl ExecutionBackend for LocalBackend {
35    fn capabilities(&self) -> BackendCapabilities {
36        BackendCapabilities::full()
37    }
38
39    async fn execute_delegate(
40        &self,
41        request: BackendDelegateRunRequest<'_>,
42    ) -> Result<BackendRunResult, ExecutionBackendError> {
43        Self::execute_delegate(self, request).await
44    }
45
46    async fn execute_root(
47        &self,
48        request: BackendRootRunRequest<'_>,
49    ) -> Result<BackendRunResult, ExecutionBackendError> {
50        let phase_runtime = request
51            .local
52            .as_ref()
53            .map(|context| context.phase_runtime)
54            .ok_or_else(|| {
55                ExecutionBackendError::ExecutionFailed(
56                    "local root execution requires a phase runtime context".into(),
57                )
58            })?;
59        let run_identity = request.run_identity.clone();
60        let run_id = run_identity.run_id.clone();
61        if !request.decisions.is_empty() {
62            prepare_resume(phase_runtime.store(), request.decisions, None)
63                .map_err(crate::loop_runner::AgentLoopError::PhaseError)
64                .map_err(ExecutionBackendError::Loop)?;
65        }
66
67        let result = run_agent_loop(AgentLoopParams {
68            resolver: request.resolver,
69            agent_id: request.agent_id,
70            runtime: phase_runtime,
71            sink: request.sink,
72            checkpoint_store: request.checkpoint_store,
73            messages: request.messages,
74            run_identity,
75            cancellation_token: request.control.cancellation_token,
76            decision_rx: request.control.decision_rx,
77            overrides: request.overrides,
78            frontend_tools: request.frontend_tools,
79            inbox: request.inbox,
80            is_continuation: request.is_continuation,
81        })
82        .await
83        .map_err(ExecutionBackendError::Loop)?;
84
85        let response = if result.response.is_empty() {
86            None
87        } else {
88            Some(result.response)
89        };
90        Ok(BackendRunResult {
91            agent_id: request.agent_id.to_string(),
92            status: map_termination(&result.termination),
93            termination: result.termination,
94            status_reason: None,
95            output: BackendRunOutput::from_text(response.clone()),
96            response,
97            steps: result.steps,
98            run_id: Some(run_id),
99            inbox: None,
100            state: None,
101        })
102    }
103}
104
105impl LocalBackend {
106    pub async fn execute_delegate(
107        &self,
108        request: BackendDelegateRunRequest<'_>,
109    ) -> Result<BackendRunResult, ExecutionBackendError> {
110        match (request.policy.persistence, request.policy.continuation) {
111            (BackendDelegatePersistence::Ephemeral, BackendDelegateContinuation::Disabled) => {}
112        }
113        let resolved = request
114            .resolver
115            .resolve(request.agent_id)
116            .map_err(|error| {
117                ExecutionBackendError::AgentNotFound(format!(
118                    "failed to resolve agent '{}': {error}",
119                    request.agent_id
120                ))
121            })?;
122
123        let store = crate::state::StateStore::new();
124        store
125            .install_plugin(crate::loop_runner::LoopStatePlugin)
126            .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
127
128        let phase_runtime = crate::phase::PhaseRuntime::new(store.clone())
129            .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
130
131        let (owner_inbox, inbox_receiver) = {
132            let (sender, receiver) = crate::inbox::inbox_channel();
133            (Some(sender), receiver)
134        };
135
136        Self::bind_local_execution_env(&store, &resolved, owner_inbox.as_ref())
137            .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
138
139        #[cfg(feature = "background")]
140        let bg_manager = if resolved
141            .env
142            .plugins
143            .iter()
144            .any(|plugin| plugin.descriptor().name == "background_tasks")
145        {
146            None
147        } else {
148            let manager = crate::extensions::background::BackgroundTaskManager::new();
149            let manager = std::sync::Arc::new(manager);
150            manager.set_store(store.clone());
151            Some(manager)
152        };
153
154        #[cfg(feature = "background")]
155        if let Some(manager) = &bg_manager {
156            if let Some(sender) = owner_inbox.clone() {
157                manager.set_owner_inbox(sender);
158            }
159            store
160                .install_plugin(crate::extensions::background::BackgroundTaskPlugin::new(
161                    manager.clone(),
162                ))
163                .map_err(|error| ExecutionBackendError::ExecutionFailed(error.to_string()))?;
164        }
165
166        let sub_run_id = uuid::Uuid::now_v7().to_string();
167        let mut run_identity = RunIdentity::new(
168            sub_run_id.clone(),
169            request.parent.parent_thread_id.clone(),
170            sub_run_id.clone(),
171            request.parent.parent_run_id.clone(),
172            request.agent_id.to_string(),
173            RunOrigin::Subagent,
174        );
175        if let Some(parent_tool_call_id) = request.parent.parent_tool_call_id.clone() {
176            run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
177        }
178
179        let result = run_agent_loop(AgentLoopParams {
180            resolver: request.resolver,
181            agent_id: request.agent_id,
182            runtime: &phase_runtime,
183            sink: request.sink,
184            checkpoint_store: None,
185            messages: request.messages,
186            run_identity,
187            cancellation_token: request.control.cancellation_token,
188            decision_rx: request.control.decision_rx,
189            overrides: None,
190            frontend_tools: Vec::new(),
191            inbox: Some(inbox_receiver),
192            is_continuation: false,
193        })
194        .await
195        .map_err(ExecutionBackendError::Loop)?;
196
197        let response = if result.response.is_empty() {
198            None
199        } else {
200            Some(result.response)
201        };
202        Ok(BackendRunResult {
203            agent_id: request.agent_id.to_string(),
204            status: map_termination(&result.termination),
205            termination: result.termination,
206            status_reason: None,
207            output: BackendRunOutput::from_text(response.clone()),
208            response,
209            steps: result.steps,
210            run_id: Some(sub_run_id),
211            inbox: owner_inbox,
212            state: None,
213        })
214    }
215
216    pub(crate) fn bind_local_execution_env(
217        store: &StateStore,
218        resolved: &ResolvedAgent,
219        owner_inbox: Option<&crate::inbox::InboxSender>,
220    ) -> Result<(), awaken_contract::StateError> {
221        if !resolved.env.key_registrations.is_empty() {
222            store.register_keys(&resolved.env.key_registrations)?;
223        }
224        for plugin in &resolved.env.plugins {
225            plugin.bind_runtime_context(store, owner_inbox);
226        }
227        Ok(())
228    }
229}
230
231fn map_termination(termination: &TerminationReason) -> BackendRunStatus {
232    match termination {
233        TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested => {
234            BackendRunStatus::Completed
235        }
236        TerminationReason::Cancelled => BackendRunStatus::Cancelled,
237        TerminationReason::Stopped(reason) => {
238            BackendRunStatus::Failed(format!("stopped: {reason:?}"))
239        }
240        TerminationReason::Blocked(message) => {
241            BackendRunStatus::Failed(format!("blocked: {message}"))
242        }
243        TerminationReason::Suspended => BackendRunStatus::Suspended(None),
244        TerminationReason::Error(message) => BackendRunStatus::Failed(message.clone()),
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use async_trait::async_trait;
    use awaken_contract::contract::content::ContentBlock;
    use awaken_contract::contract::event_sink::NullEventSink;
    use awaken_contract::contract::executor::{
        InferenceExecutionError, InferenceRequest, LlmExecutor,
    };
    use awaken_contract::contract::inference::{StopReason, StreamResult, TokenUsage};
    use awaken_contract::contract::message::{Message, ToolCall};
    use awaken_contract::contract::tool::{
        Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
    };
    use serde_json::{Value, json};

    use crate::backend::{
        BackendControl, BackendDelegatePolicy, BackendDelegateRunRequest, BackendParentContext,
    };
    use crate::loop_runner::build_agent_env;
    use crate::plugins::{Plugin, PluginDescriptor};
    use crate::registry::{AgentResolver, ExecutionResolver, ResolvedExecution};

    /// LLM test double that replays a fixed script, one response per call.
    struct ScriptedLlm {
        /// Remaining responses, stored reversed so the next one sits at the end.
        pending: Mutex<Vec<StreamResult>>,
    }

    impl ScriptedLlm {
        /// Builds a double that yields `responses` in the given order.
        fn new(mut responses: Vec<StreamResult>) -> Self {
            responses.reverse();
            Self {
                pending: Mutex::new(responses),
            }
        }
    }

    #[async_trait]
    impl LlmExecutor for ScriptedLlm {
        /// Returns the next scripted response; panics once the script is used up.
        async fn execute(
            &self,
            _request: InferenceRequest,
        ) -> Result<StreamResult, InferenceExecutionError> {
            let mut pending = self.pending.lock().unwrap();
            match pending.pop() {
                Some(next) => Ok(next),
                None => panic!("scripted LLM exhausted"),
            }
        }

        fn name(&self) -> &str {
            "scripted"
        }
    }

    /// A plain text reply that ends the turn.
    fn text_response(text: &str) -> StreamResult {
        StreamResult {
            content: vec![ContentBlock::text(text)],
            tool_calls: vec![],
            usage: Some(TokenUsage::default()),
            stop_reason: Some(StopReason::EndTurn),
            has_incomplete_tool_calls: false,
        }
    }

    /// A reply with the same text content that additionally requests one tool call.
    fn tool_call_response(text: &str, tool_name: &str, call_id: &str, args: Value) -> StreamResult {
        StreamResult {
            tool_calls: vec![ToolCall::new(call_id, tool_name, args)],
            stop_reason: Some(StopReason::ToolUse),
            ..text_response(text)
        }
    }

    /// Tool that succeeds with its arguments and a fixed message.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn descriptor(&self) -> ToolDescriptor {
            ToolDescriptor::new("echo", "echo", "Echoes input back")
        }

        async fn execute(
            &self,
            args: Value,
            _ctx: &ToolCallContext,
        ) -> Result<ToolOutput, ToolError> {
            let result =
                ToolResult::success_with_message("echo", args, "tool result should not win");
            Ok(result.into())
        }
    }

    /// Plugin that counts how many times its runtime context gets bound.
    struct BindingPlugin {
        bind_count: Arc<AtomicUsize>,
    }

    impl Plugin for BindingPlugin {
        fn descriptor(&self) -> PluginDescriptor {
            PluginDescriptor {
                name: "binding-plugin",
            }
        }

        fn bind_runtime_context(
            &self,
            _store: &crate::state::StateStore,
            _owner_inbox: Option<&crate::inbox::InboxSender>,
        ) {
            self.bind_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Resolver that returns the same agent for every id, rebuilding its
    /// environment from `plugins` on each call.
    struct FixedResolver {
        agent: ResolvedAgent,
        plugins: Vec<Arc<dyn Plugin>>,
    }

    impl AgentResolver for FixedResolver {
        fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, crate::RuntimeError> {
            let mut agent = self.agent.clone();
            let env = build_agent_env(&self.plugins, &agent).expect("build env");
            agent.env = env;
            Ok(agent)
        }
    }

    impl ExecutionResolver for FixedResolver {
        fn resolve_execution(
            &self,
            agent_id: &str,
        ) -> Result<ResolvedExecution, crate::RuntimeError> {
            let agent = self.resolve(agent_id)?;
            Ok(ResolvedExecution::local(agent))
        }
    }

    /// The "delegate" agent driven by the given LLM script.
    fn delegate_agent(script: Vec<StreamResult>) -> ResolvedAgent {
        ResolvedAgent::new("delegate", "m", "sys", Arc::new(ScriptedLlm::new(script)))
    }

    /// A delegate request carrying one user message, with a fixed parent context.
    fn delegate_request<'a>(
        resolver: &'a FixedResolver,
        prompt: &str,
    ) -> BackendDelegateRunRequest<'a> {
        BackendDelegateRunRequest {
            agent_id: "delegate",
            // Built twice rather than cloned, so each list holds its own message value.
            messages: vec![Message::user(prompt)],
            new_messages: vec![Message::user(prompt)],
            sink: Arc::new(NullEventSink),
            resolver,
            parent: BackendParentContext {
                parent_run_id: Some("parent-run".into()),
                parent_thread_id: Some("parent-thread".into()),
                parent_tool_call_id: Some("tool-1".into()),
            },
            control: BackendControl::default(),
            policy: BackendDelegatePolicy::default(),
        }
    }

    #[tokio::test]
    async fn execute_delegate_binds_plugin_runtime_context() {
        let bind_count = Arc::new(AtomicUsize::new(0));
        let plugin: Arc<dyn Plugin> = Arc::new(BindingPlugin {
            bind_count: Arc::clone(&bind_count),
        });
        let resolver = FixedResolver {
            agent: delegate_agent(vec![text_response("delegated response")]),
            plugins: vec![plugin],
        };

        let result = LocalBackend::new()
            .execute_delegate(delegate_request(&resolver, "hello"))
            .await
            .expect("delegate execution should succeed");

        assert!(matches!(result.status, BackendRunStatus::Completed));
        assert_eq!(bind_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn execute_delegate_returns_final_non_tool_message_after_tool_output() {
        let script = vec![
            tool_call_response(
                "checking",
                "echo",
                "call-1",
                json!({"message": "tool result should not win"}),
            ),
            text_response("final child answer"),
        ];
        let resolver = FixedResolver {
            agent: delegate_agent(script).with_tool(Arc::new(EchoTool)),
            plugins: Vec::new(),
        };

        let result = LocalBackend::new()
            .execute_delegate(delegate_request(&resolver, "delegate with a tool"))
            .await
            .expect("delegate execution should succeed");

        assert!(matches!(result.status, BackendRunStatus::Completed));
        assert_eq!(result.response.as_deref(), Some("final child answer"));
        assert_eq!(result.output.text.as_deref(), Some("final child answer"));
        assert_eq!(result.steps, 2);
    }
}