agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext, ModelRequest,
10    PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction, HitlInterrupt};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Core Deep Agent runtime implementation
33///
34/// This struct contains all the runtime state and behavior for a Deep Agent,
35/// including middleware management, tool execution, HITL support, and state persistence.
36pub struct DeepAgent {
37    descriptor: AgentDescriptor,
38    instructions: String,
39    planner: Arc<dyn PlannerHandle>,
40    middlewares: Vec<Arc<dyn AgentMiddleware>>,
41    base_tools: Vec<ToolBox>,
42    state: Arc<RwLock<AgentStateSnapshot>>,
43    history: Arc<RwLock<Vec<AgentMessage>>>,
44    _summarization: Option<Arc<SummarizationMiddleware>>,
45    hitl: Option<Arc<HumanInLoopMiddleware>>,
46    pending_hitl: Arc<RwLock<Option<HitlPending>>>,
47    builtin_tools: Option<HashSet<String>>,
48    checkpointer: Option<Arc<dyn Checkpointer>>,
49}
50
51struct HitlPending {
52    tool_name: String,
53    payload: Value,
54    tool: ToolBox,
55    message: AgentMessage,
56}
57
58impl DeepAgent {
59    fn collect_tools(&self) -> HashMap<String, ToolBox> {
60        let mut tools: HashMap<String, ToolBox> = HashMap::new();
61        for tool in &self.base_tools {
62            tools.insert(tool.schema().name.clone(), tool.clone());
63        }
64        for middleware in &self.middlewares {
65            for tool in middleware.tools() {
66                let tool_name = tool.schema().name.clone();
67                if self.should_include(&tool_name) {
68                    tools.insert(tool_name, tool);
69                }
70            }
71        }
72        tools
73    }
74    // no streaming path in baseline
75
76    fn should_include(&self, name: &str) -> bool {
77        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
78        if !is_builtin {
79            return true;
80        }
81        match &self.builtin_tools {
82            None => true,
83            Some(selected) => selected.contains(name),
84        }
85    }
86
87    fn append_history(&self, message: AgentMessage) {
88        if let Ok(mut history) = self.history.write() {
89            history.push(message);
90        }
91    }
92
93    fn current_history(&self) -> Vec<AgentMessage> {
94        self.history.read().map(|h| h.clone()).unwrap_or_default()
95    }
96
97    /// Save the current agent state to the configured checkpointer.
98    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
99        if let Some(ref checkpointer) = self.checkpointer {
100            let state = self
101                .state
102                .read()
103                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
104                .clone();
105            checkpointer.save_state(thread_id, &state).await
106        } else {
107            tracing::warn!("Attempted to save state but no checkpointer is configured");
108            Ok(())
109        }
110    }
111
112    /// Load agent state from the configured checkpointer.
113    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
114        if let Some(ref checkpointer) = self.checkpointer {
115            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
116                *self
117                    .state
118                    .write()
119                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
120                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
121                Ok(true)
122            } else {
123                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
124                Ok(false)
125            }
126        } else {
127            tracing::warn!("Attempted to load state but no checkpointer is configured");
128            Ok(false)
129        }
130    }
131
132    /// Delete saved state for the specified thread.
133    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
134        if let Some(ref checkpointer) = self.checkpointer {
135            checkpointer.delete_thread(thread_id).await
136        } else {
137            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
138            Ok(())
139        }
140    }
141
142    /// List all threads with saved state.
143    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
144        if let Some(ref checkpointer) = self.checkpointer {
145            checkpointer.list_threads().await
146        } else {
147            Ok(Vec::new())
148        }
149    }
150
151    async fn execute_tool(
152        &self,
153        tool: ToolBox,
154        _tool_name: String,
155        payload: Value,
156    ) -> anyhow::Result<AgentMessage> {
157        let state_snapshot = self.state.read().unwrap().clone();
158        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
159
160        let result = tool.execute(payload, ctx).await?;
161        Ok(self.apply_tool_result(result))
162    }
163
164    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
165        match result {
166            ToolResult::Message(message) => {
167                self.append_history(message.clone());
168                message
169            }
170            ToolResult::WithStateUpdate {
171                message,
172                state_diff,
173            } => {
174                if let Ok(mut state) = self.state.write() {
175                    let command = agents_core::command::Command::with_state(state_diff);
176                    command.apply_to(&mut state);
177                }
178                self.append_history(message.clone());
179                message
180            }
181        }
182    }
183
184    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
185        self.pending_hitl.read().ok().and_then(|guard| {
186            guard.as_ref().map(|pending| {
187                AgentInterrupt::HumanInLoop(HitlInterrupt {
188                    tool_name: pending.tool_name.clone(),
189                    message: pending.message.clone(),
190                })
191            })
192        })
193    }
194
195    pub async fn resume_hitl(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
196        let pending = self
197            .pending_hitl
198            .write()
199            .ok()
200            .and_then(|mut guard| guard.take())
201            .ok_or_else(|| anyhow::anyhow!("No pending HITL action"))?;
202        match action {
203            HitlAction::Approve => {
204                let result = self
205                    .execute_tool(
206                        pending.tool.clone(),
207                        pending.tool_name.clone(),
208                        pending.payload.clone(),
209                    )
210                    .await?;
211                Ok(result)
212            }
213            HitlAction::Reject { reason } => {
214                let text =
215                    reason.unwrap_or_else(|| "Tool execution rejected by human reviewer.".into());
216                let message = AgentMessage {
217                    role: MessageRole::System,
218                    content: MessageContent::Text(text),
219                    metadata: None,
220                };
221                self.append_history(message.clone());
222                Ok(message)
223            }
224            HitlAction::Respond { message } => {
225                self.append_history(message.clone());
226                Ok(message)
227            }
228            HitlAction::Edit { action, args } => {
229                // Execute the edited tool/action with provided args
230                let tools = self.collect_tools();
231                if let Some(tool) = tools.get(&action).cloned() {
232                    let result = self.execute_tool(tool, action, args).await?;
233                    Ok(result)
234                } else {
235                    Ok(AgentMessage {
236                        role: MessageRole::System,
237                        content: MessageContent::Text(format!(
238                            "Edited tool '{}' not available",
239                            action
240                        )),
241                        metadata: None,
242                    })
243                }
244            }
245        }
246    }
247
248    /// Handle message from string input - converts string to AgentMessage internally
249    pub async fn handle_message(
250        &self,
251        input: impl AsRef<str>,
252        state: Arc<AgentStateSnapshot>,
253    ) -> anyhow::Result<AgentMessage> {
254        self.handle_message_with_metadata(input, None, state).await
255    }
256
257    /// Handle message from string input with metadata - converts string to AgentMessage internally
258    pub async fn handle_message_with_metadata(
259        &self,
260        input: impl AsRef<str>,
261        metadata: Option<MessageMetadata>,
262        state: Arc<AgentStateSnapshot>,
263    ) -> anyhow::Result<AgentMessage> {
264        let agent_message = AgentMessage {
265            role: MessageRole::User,
266            content: MessageContent::Text(input.as_ref().to_string()),
267            metadata,
268        };
269        self.handle_message_internal(agent_message, state).await
270    }
271
272    /// Internal method that contains the actual message handling logic
273    async fn handle_message_internal(
274        &self,
275        input: AgentMessage,
276        _state: Arc<AgentStateSnapshot>,
277    ) -> anyhow::Result<AgentMessage> {
278        self.append_history(input.clone());
279
280        let mut request = ModelRequest::new(&self.instructions, self.current_history());
281        let tools = self.collect_tools();
282        for middleware in &self.middlewares {
283            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
284            middleware.modify_model_request(&mut ctx).await?;
285        }
286
287        let context = PlannerContext {
288            history: request.messages.clone(),
289            system_prompt: request.system_prompt.clone(),
290        };
291        let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
292
293        let decision = self.planner.plan(context, state_snapshot).await?;
294
295        match decision.next_action {
296            PlannerAction::Respond { message } => {
297                self.append_history(message.clone());
298                Ok(message)
299            }
300            PlannerAction::CallTool { tool_name, payload } => {
301                if let Some(tool) = tools.get(&tool_name).cloned() {
302                    if let Some(hitl) = &self.hitl {
303                        if let Some(policy) = hitl.requires_approval(&tool_name) {
304                            let message_text = policy
305                                .note
306                                .clone()
307                                .unwrap_or_else(|| "Awaiting human approval.".into());
308                            let approval_message = AgentMessage {
309                                role: MessageRole::System,
310                                content: MessageContent::Text(format!(
311                                    "HITL_REQUIRED: Tool '{tool}' requires approval: {message}",
312                                    tool = tool_name,
313                                    message = message_text
314                                )),
315                                metadata: None,
316                            };
317                            let pending = HitlPending {
318                                tool_name: tool_name.clone(),
319                                payload: payload.clone(),
320                                tool: tool.clone(),
321                                message: approval_message.clone(),
322                            };
323                            if let Ok(mut guard) = self.pending_hitl.write() {
324                                *guard = Some(pending);
325                            }
326                            self.append_history(approval_message.clone());
327                            return Ok(approval_message);
328                        }
329                    }
330                    self.execute_tool(tool.clone(), tool_name.clone(), payload.clone())
331                        .await
332                } else {
333                    Ok(AgentMessage {
334                        role: MessageRole::Tool,
335                        content: MessageContent::Text(format!(
336                            "Tool '{tool}' not available",
337                            tool = tool_name
338                        )),
339                        metadata: Some(MessageMetadata {
340                            tool_call_id: None,
341                            cache_control: None,
342                        }),
343                    })
344                }
345            }
346            PlannerAction::Terminate => Ok(AgentMessage {
347                role: MessageRole::Agent,
348                content: MessageContent::Text("Terminating conversation.".into()),
349                metadata: Some(MessageMetadata {
350                    tool_call_id: None,
351                    cache_control: None,
352                }),
353            }),
354        }
355    }
356}
357
358#[async_trait]
359impl AgentHandle for DeepAgent {
360    async fn describe(&self) -> AgentDescriptor {
361        self.descriptor.clone()
362    }
363
364    async fn handle_message(
365        &self,
366        input: AgentMessage,
367        _state: Arc<AgentStateSnapshot>,
368    ) -> anyhow::Result<AgentMessage> {
369        self.handle_message_internal(input, _state).await
370    }
371
372    async fn handle_message_stream(
373        &self,
374        input: AgentMessage,
375        _state: Arc<AgentStateSnapshot>,
376    ) -> anyhow::Result<agents_core::agent::AgentStream> {
377        use crate::planner::LlmBackedPlanner;
378        use agents_core::llm::{LlmRequest, StreamChunk};
379
380        // Add input to history
381        self.append_history(input.clone());
382
383        // Build the request similar to handle_message_internal
384        let mut request = ModelRequest::new(&self.instructions, self.current_history());
385        let tools = self.collect_tools();
386
387        // Apply middleware modifications
388        for middleware in &self.middlewares {
389            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
390            middleware.modify_model_request(&mut ctx).await?;
391        }
392
393        // Convert ModelRequest to LlmRequest and add tools
394        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
395        let llm_request = LlmRequest {
396            system_prompt: request.system_prompt.clone(),
397            messages: request.messages.clone(),
398            tools: tool_schemas,
399        };
400
401        // Try to get the underlying LLM model for streaming
402        let planner_any = self.planner.as_any();
403
404        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
405            // We have an LlmBackedPlanner, use its model for streaming
406            let model = llm_planner.model().clone();
407            let stream = model.generate_stream(llm_request).await?;
408            Ok(stream)
409        } else {
410            // Fallback to non-streaming
411            let response = self.handle_message_internal(input, _state).await?;
412            Ok(Box::pin(futures::stream::once(async move {
413                Ok(StreamChunk::Done { message: response })
414            })))
415        }
416    }
417}
418
419/// Create a deep agent from configuration - matches Python middleware assembly exactly
420///
421/// This function assembles the middleware stack in the same order as the Python SDK:
422/// planning → filesystem → subagents → summarization → prompt caching → optional HITL
423pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
424    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
425    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
426
427    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
428    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
429
430    // Build sub-agents from configurations
431    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
432
433    // Build custom sub-agents from configs
434    for subagent_config in &config.subagent_configs {
435        // Determine the planner for this sub-agent
436        let sub_planner = if let Some(ref model) = subagent_config.model {
437            // Sub-agent has its own model - wrap it in a planner
438            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
439        } else {
440            // Inherit parent's planner
441            config.planner.clone()
442        };
443
444        // Create a DeepAgentConfig for this sub-agent
445        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
446
447        // Configure tools
448        if let Some(ref tools) = subagent_config.tools {
449            for tool in tools {
450                sub_cfg = sub_cfg.with_tool(tool.clone());
451            }
452        }
453
454        // Configure built-in tools
455        if let Some(ref builtin) = subagent_config.builtin_tools {
456            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
457        }
458
459        // Sub-agents should not have their own sub-agents
460        sub_cfg = sub_cfg.with_auto_general_purpose(false);
461
462        // Configure prompt caching
463        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
464
465        // Build the sub-agent recursively
466        let sub_agent = create_deep_agent_from_config(sub_cfg);
467
468        // Register the sub-agent
469        registrations.push(SubAgentRegistration {
470            descriptor: SubAgentDescriptor {
471                name: subagent_config.name.clone(),
472                description: subagent_config.description.clone(),
473            },
474            agent: Arc::new(sub_agent),
475        });
476    }
477
478    // Optionally inject a general-purpose subagent
479    if config.auto_general_purpose {
480        let has_gp = registrations
481            .iter()
482            .any(|r| r.descriptor.name == "general-purpose");
483        if !has_gp {
484            // Create a subagent with inherited planner/tools and same instructions
485            let mut sub_cfg =
486                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
487                    .with_auto_general_purpose(false)
488                    .with_prompt_caching(config.enable_prompt_caching);
489            if let Some(ref selected) = config.builtin_tools {
490                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
491            }
492            if let Some(ref sum) = config.summarization {
493                sub_cfg = sub_cfg.with_summarization(sum.clone());
494            }
495            for t in &config.tools {
496                sub_cfg = sub_cfg.with_tool(t.clone());
497            }
498
499            let gp = create_deep_agent_from_config(sub_cfg);
500            registrations.push(SubAgentRegistration {
501                descriptor: SubAgentDescriptor {
502                    name: "general-purpose".into(),
503                    description: "Default reasoning agent".into(),
504                },
505                agent: Arc::new(gp),
506            });
507        }
508    }
509
510    let subagent = Arc::new(SubAgentMiddleware::new(registrations));
511    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
512    let summarization = config.summarization.as_ref().map(|cfg| {
513        Arc::new(SummarizationMiddleware::new(
514            cfg.messages_to_keep,
515            cfg.summary_note.clone(),
516        ))
517    });
518    let hitl = if config.tool_interrupts.is_empty() {
519        None
520    } else {
521        Some(Arc::new(HumanInLoopMiddleware::new(
522            config.tool_interrupts.clone(),
523        )))
524    };
525
526    // Assemble middleware stack in Python SDK order
527    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> =
528        vec![base_prompt, planning, filesystem, subagent];
529    if let Some(ref summary) = summarization {
530        middlewares.push(summary.clone());
531    }
532    if config.enable_prompt_caching {
533        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
534    }
535    if let Some(ref hitl_mw) = hitl {
536        middlewares.push(hitl_mw.clone());
537    }
538
539    DeepAgent {
540        descriptor: AgentDescriptor {
541            name: "deep-agent".into(),
542            version: "0.0.1".into(),
543            description: Some("Rust deep agent".into()),
544        },
545        instructions: config.instructions,
546        planner: config.planner,
547        middlewares,
548        base_tools: config.tools,
549        state,
550        history,
551        _summarization: summarization,
552        hitl,
553        pending_hitl: Arc::new(RwLock::new(None)),
554        builtin_tools: config.builtin_tools,
555        checkpointer: config.checkpointer,
556    }
557}