agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext, ModelRequest,
10    PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use agents_core::agent::{
14    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle, ToolHandle,
15    ToolResponse,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction, HitlInterrupt};
18use agents_core::messaging::{
19    AgentMessage, MessageContent, MessageMetadata, MessageRole, ToolInvocation,
20};
21use agents_core::persistence::{Checkpointer, ThreadId};
22use agents_core::state::AgentStateSnapshot;
23use async_trait::async_trait;
24use serde_json::Value;
25use std::collections::{HashMap, HashSet};
26use std::sync::{Arc, RwLock};
27
// Built-in tool names exposed by middlewares. `DeepAgent::should_include` checks
// these names against the optional `builtin_tools` selection; any tool whose name
// is not listed here (such as the `task` tool for subagents) is never gated.
const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31// (no streaming types in baseline)
32
/// Core Deep Agent runtime implementation
///
/// This struct contains all the runtime state and behavior for a Deep Agent,
/// including middleware management, tool execution, HITL support, and state persistence.
pub struct DeepAgent {
    /// Descriptor returned by `AgentHandle::describe`.
    descriptor: AgentDescriptor,
    /// Base instructions used to seed each `ModelRequest`.
    instructions: String,
    /// Planner asked for the next action on every incoming message.
    planner: Arc<dyn PlannerHandle>,
    /// Middleware stack; each entry may modify the model request and contribute tools.
    middlewares: Vec<Arc<dyn AgentMiddleware>>,
    /// Caller-supplied tools. They are added to the tool map without filtering; a
    /// middleware tool with the same name replaces the entry.
    base_tools: Vec<Arc<dyn ToolHandle>>,
    /// Shared agent state: handed to middlewares, snapshotted for the planner,
    /// updated by tool commands, and saved/loaded through the checkpointer.
    state: Arc<RwLock<AgentStateSnapshot>>,
    /// In-memory message log for this agent instance.
    history: Arc<RwLock<Vec<AgentMessage>>>,
    /// Handle to the summarization middleware when one is configured. It is not
    /// read anywhere in the code visible here.
    _summarization: Option<Arc<SummarizationMiddleware>>,
    /// Human-in-the-loop middleware consulted before a tool call is executed.
    hitl: Option<Arc<HumanInLoopMiddleware>>,
    /// Tool call currently waiting for a human decision, if any.
    pending_hitl: Arc<RwLock<Option<HitlPending>>>,
    /// Optional selection of built-in tool names; `None` means all built-ins are exposed.
    builtin_tools: Option<HashSet<String>>,
    /// Optional persistence backend used by the save/load/delete/list state methods.
    checkpointer: Option<Arc<dyn Checkpointer>>,
}
51
/// A tool call that was intercepted for human approval and is waiting to be resumed.
struct HitlPending {
    /// Name of the tool the planner asked to call.
    tool_name: String,
    /// Arguments the planner supplied for the call.
    payload: Value,
    /// Handle of the tool to invoke if the call is approved.
    tool: Arc<dyn ToolHandle>,
    /// Approval prompt that was returned to the caller and is exposed via the interrupt.
    message: AgentMessage,
}
58
59impl DeepAgent {
60    fn collect_tools(&self) -> HashMap<String, Arc<dyn ToolHandle>> {
61        let mut tools: HashMap<String, Arc<dyn ToolHandle>> = HashMap::new();
62        for tool in &self.base_tools {
63            tools.insert(tool.name().to_string(), tool.clone());
64        }
65        for middleware in &self.middlewares {
66            for tool in middleware.tools() {
67                if self.should_include(tool.name()) {
68                    tools.insert(tool.name().to_string(), tool);
69                }
70            }
71        }
72        tools
73    }
74    // no streaming path in baseline
75
76    fn should_include(&self, name: &str) -> bool {
77        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
78        if !is_builtin {
79            return true;
80        }
81        match &self.builtin_tools {
82            None => true,
83            Some(selected) => selected.contains(name),
84        }
85    }
86
87    fn append_history(&self, message: AgentMessage) {
88        if let Ok(mut history) = self.history.write() {
89            history.push(message);
90        }
91    }
92
93    fn current_history(&self) -> Vec<AgentMessage> {
94        self.history.read().map(|h| h.clone()).unwrap_or_default()
95    }
96
97    /// Save the current agent state to the configured checkpointer.
98    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
99        if let Some(ref checkpointer) = self.checkpointer {
100            let state = self
101                .state
102                .read()
103                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
104                .clone();
105            checkpointer.save_state(thread_id, &state).await
106        } else {
107            tracing::warn!("Attempted to save state but no checkpointer is configured");
108            Ok(())
109        }
110    }
111
112    /// Load agent state from the configured checkpointer.
113    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
114        if let Some(ref checkpointer) = self.checkpointer {
115            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
116                *self
117                    .state
118                    .write()
119                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
120                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
121                Ok(true)
122            } else {
123                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
124                Ok(false)
125            }
126        } else {
127            tracing::warn!("Attempted to load state but no checkpointer is configured");
128            Ok(false)
129        }
130    }
131
132    /// Delete saved state for the specified thread.
133    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
134        if let Some(ref checkpointer) = self.checkpointer {
135            checkpointer.delete_thread(thread_id).await
136        } else {
137            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
138            Ok(())
139        }
140    }
141
142    /// List all threads with saved state.
143    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
144        if let Some(ref checkpointer) = self.checkpointer {
145            checkpointer.list_threads().await
146        } else {
147            Ok(Vec::new())
148        }
149    }
150
151    async fn execute_tool(
152        &self,
153        tool: Arc<dyn ToolHandle>,
154        tool_name: String,
155        payload: Value,
156    ) -> anyhow::Result<AgentMessage> {
157        let response = tool
158            .invoke(ToolInvocation {
159                tool_name: tool_name.clone(),
160                args: payload,
161                tool_call_id: None,
162            })
163            .await?;
164
165        Ok(self.apply_tool_response(response))
166    }
167
168    fn apply_tool_response(&self, response: ToolResponse) -> AgentMessage {
169        match response {
170            ToolResponse::Message(message) => {
171                self.append_history(message.clone());
172                message
173            }
174            ToolResponse::Command(command) => {
175                if let Ok(mut state) = self.state.write() {
176                    command.clone().apply_to(&mut state);
177                }
178                let mut final_message = None;
179                for message in &command.messages {
180                    self.append_history(message.clone());
181                    final_message = Some(message.clone());
182                }
183                final_message.unwrap_or_else(|| AgentMessage {
184                    role: MessageRole::Tool,
185                    content: MessageContent::Text("Command executed.".into()),
186                    metadata: Some(MessageMetadata {
187                        tool_call_id: None,
188                        cache_control: None,
189                    }),
190                })
191            }
192        }
193    }
194
195    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
196        self.pending_hitl.read().ok().and_then(|guard| {
197            guard.as_ref().map(|pending| {
198                AgentInterrupt::HumanInLoop(HitlInterrupt {
199                    tool_name: pending.tool_name.clone(),
200                    message: pending.message.clone(),
201                })
202            })
203        })
204    }
205
206    pub async fn resume_hitl(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
207        let pending = self
208            .pending_hitl
209            .write()
210            .ok()
211            .and_then(|mut guard| guard.take())
212            .ok_or_else(|| anyhow::anyhow!("No pending HITL action"))?;
213        match action {
214            HitlAction::Approve => {
215                let result = self
216                    .execute_tool(
217                        pending.tool.clone(),
218                        pending.tool_name.clone(),
219                        pending.payload.clone(),
220                    )
221                    .await?;
222                Ok(result)
223            }
224            HitlAction::Reject { reason } => {
225                let text =
226                    reason.unwrap_or_else(|| "Tool execution rejected by human reviewer.".into());
227                let message = AgentMessage {
228                    role: MessageRole::System,
229                    content: MessageContent::Text(text),
230                    metadata: None,
231                };
232                self.append_history(message.clone());
233                Ok(message)
234            }
235            HitlAction::Respond { message } => {
236                self.append_history(message.clone());
237                Ok(message)
238            }
239            HitlAction::Edit { action, args } => {
240                // Execute the edited tool/action with provided args
241                let tools = self.collect_tools();
242                if let Some(tool) = tools.get(&action).cloned() {
243                    let result = self.execute_tool(tool, action, args).await?;
244                    Ok(result)
245                } else {
246                    Ok(AgentMessage {
247                        role: MessageRole::System,
248                        content: MessageContent::Text(format!(
249                            "Edited tool '{}' not available",
250                            action
251                        )),
252                        metadata: None,
253                    })
254                }
255            }
256        }
257    }
258
259    /// Handle message from string input - converts string to AgentMessage internally
260    pub async fn handle_message(
261        &self,
262        input: impl AsRef<str>,
263        state: Arc<AgentStateSnapshot>,
264    ) -> anyhow::Result<AgentMessage> {
265        self.handle_message_with_metadata(input, None, state).await
266    }
267
268    /// Handle message from string input with metadata - converts string to AgentMessage internally
269    pub async fn handle_message_with_metadata(
270        &self,
271        input: impl AsRef<str>,
272        metadata: Option<MessageMetadata>,
273        state: Arc<AgentStateSnapshot>,
274    ) -> anyhow::Result<AgentMessage> {
275        let agent_message = AgentMessage {
276            role: MessageRole::User,
277            content: MessageContent::Text(input.as_ref().to_string()),
278            metadata,
279        };
280        self.handle_message_internal(agent_message, state).await
281    }
282
283    /// Internal method that contains the actual message handling logic
284    async fn handle_message_internal(
285        &self,
286        input: AgentMessage,
287        _state: Arc<AgentStateSnapshot>,
288    ) -> anyhow::Result<AgentMessage> {
289        self.append_history(input.clone());
290
291        let mut request = ModelRequest::new(&self.instructions, self.current_history());
292        let tools = self.collect_tools();
293        for middleware in &self.middlewares {
294            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
295            middleware.modify_model_request(&mut ctx).await?;
296        }
297
298        let context = PlannerContext {
299            history: request.messages.clone(),
300            system_prompt: request.system_prompt.clone(),
301        };
302        let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
303
304        let decision = self.planner.plan(context, state_snapshot).await?;
305
306        match decision.next_action {
307            PlannerAction::Respond { message } => {
308                self.append_history(message.clone());
309                Ok(message)
310            }
311            PlannerAction::CallTool { tool_name, payload } => {
312                if let Some(tool) = tools.get(&tool_name).cloned() {
313                    if let Some(hitl) = &self.hitl {
314                        if let Some(policy) = hitl.requires_approval(&tool_name) {
315                            let message_text = policy
316                                .note
317                                .clone()
318                                .unwrap_or_else(|| "Awaiting human approval.".into());
319                            let approval_message = AgentMessage {
320                                role: MessageRole::System,
321                                content: MessageContent::Text(format!(
322                                    "HITL_REQUIRED: Tool '{tool}' requires approval: {message}",
323                                    tool = tool_name,
324                                    message = message_text
325                                )),
326                                metadata: None,
327                            };
328                            let pending = HitlPending {
329                                tool_name: tool_name.clone(),
330                                payload: payload.clone(),
331                                tool: tool.clone(),
332                                message: approval_message.clone(),
333                            };
334                            if let Ok(mut guard) = self.pending_hitl.write() {
335                                *guard = Some(pending);
336                            }
337                            self.append_history(approval_message.clone());
338                            return Ok(approval_message);
339                        }
340                    }
341                    self.execute_tool(tool.clone(), tool_name.clone(), payload.clone())
342                        .await
343                } else {
344                    Ok(AgentMessage {
345                        role: MessageRole::Tool,
346                        content: MessageContent::Text(format!(
347                            "Tool '{tool}' not available",
348                            tool = tool_name
349                        )),
350                        metadata: Some(MessageMetadata {
351                            tool_call_id: None,
352                            cache_control: None,
353                        }),
354                    })
355                }
356            }
357            PlannerAction::Terminate => Ok(AgentMessage {
358                role: MessageRole::Agent,
359                content: MessageContent::Text("Terminating conversation.".into()),
360                metadata: Some(MessageMetadata {
361                    tool_call_id: None,
362                    cache_control: None,
363                }),
364            }),
365        }
366    }
367}
368
369#[async_trait]
370impl AgentHandle for DeepAgent {
371    async fn describe(&self) -> AgentDescriptor {
372        self.descriptor.clone()
373    }
374
375    async fn handle_message(
376        &self,
377        input: AgentMessage,
378        _state: Arc<AgentStateSnapshot>,
379    ) -> anyhow::Result<AgentMessage> {
380        self.handle_message_internal(input, _state).await
381    }
382}
383
384/// Create a deep agent from configuration - matches Python middleware assembly exactly
385///
386/// This function assembles the middleware stack in the same order as the Python SDK:
387/// planning → filesystem → subagents → summarization → prompt caching → optional HITL
388pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
389    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
390    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
391
392    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
393    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
394
395    // Prepare subagent registrations, optionally injecting a general-purpose subagent
396    let mut registrations = config.subagents.clone();
397    if config.auto_general_purpose {
398        let has_gp = registrations
399            .iter()
400            .any(|r| r.descriptor.name == "general-purpose");
401        if !has_gp {
402            // Create a subagent with inherited planner/tools and same instructions
403            let mut sub_cfg =
404                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
405                    .with_auto_general_purpose(false)
406                    .with_prompt_caching(config.enable_prompt_caching);
407            if let Some(ref selected) = config.builtin_tools {
408                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
409            }
410            if let Some(ref sum) = config.summarization {
411                sub_cfg = sub_cfg.with_summarization(sum.clone());
412            }
413            for t in &config.tools {
414                sub_cfg = sub_cfg.with_tool(t.clone());
415            }
416
417            let gp = create_deep_agent_from_config(sub_cfg);
418            registrations.push(SubAgentRegistration {
419                descriptor: SubAgentDescriptor {
420                    name: "general-purpose".into(),
421                    description: "Default reasoning agent".into(),
422                },
423                agent: Arc::new(gp),
424            });
425        }
426    }
427
428    let subagent = Arc::new(SubAgentMiddleware::new(registrations));
429    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
430    let summarization = config.summarization.as_ref().map(|cfg| {
431        Arc::new(SummarizationMiddleware::new(
432            cfg.messages_to_keep,
433            cfg.summary_note.clone(),
434        ))
435    });
436    let hitl = if config.tool_interrupts.is_empty() {
437        None
438    } else {
439        Some(Arc::new(HumanInLoopMiddleware::new(
440            config.tool_interrupts.clone(),
441        )))
442    };
443
444    // Assemble middleware stack in Python SDK order
445    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> =
446        vec![base_prompt, planning, filesystem, subagent];
447    if let Some(ref summary) = summarization {
448        middlewares.push(summary.clone());
449    }
450    if config.enable_prompt_caching {
451        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
452    }
453    if let Some(ref hitl_mw) = hitl {
454        middlewares.push(hitl_mw.clone());
455    }
456
457    DeepAgent {
458        descriptor: AgentDescriptor {
459            name: "deep-agent".into(),
460            version: "0.0.1".into(),
461            description: Some("Rust deep agent".into()),
462        },
463        instructions: config.instructions,
464        planner: config.planner,
465        middlewares,
466        base_tools: config.tools,
467        state,
468        history,
469        _summarization: summarization,
470        hitl,
471        pending_hitl: Arc::new(RwLock::new(None)),
472        builtin_tools: config.builtin_tools,
473        checkpointer: config.checkpointer,
474    }
475}