agents_runtime/agent/
runtime.rs

//! Deep Agent runtime implementation
//!
//! This module contains the core `DeepAgent` struct and its runtime behavior,
//! including message handling, tool execution, human-in-the-loop (HITL) support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction, HitlInterrupt};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
/// Names of the built-in tools contributed by middlewares that can be individually enabled
/// through the agent's `builtin_tools` selection.
///
/// Any tool whose name is not listed here (including the sub-agent `task` tool) is never gated
/// by that selection.
const BUILTIN_TOOL_NAMES: &[&str] = &[
    "write_todos",
    "ls",
    "read_file",
    "write_file",
    "edit_file",
];
31
32/// Core Deep Agent runtime implementation
33///
34/// This struct contains all the runtime state and behavior for a Deep Agent,
35/// including middleware management, tool execution, HITL support, and state persistence.
36pub struct DeepAgent {
37    descriptor: AgentDescriptor,
38    instructions: String,
39    planner: Arc<dyn PlannerHandle>,
40    middlewares: Vec<Arc<dyn AgentMiddleware>>,
41    base_tools: Vec<ToolBox>,
42    state: Arc<RwLock<AgentStateSnapshot>>,
43    history: Arc<RwLock<Vec<AgentMessage>>>,
44    _summarization: Option<Arc<SummarizationMiddleware>>,
45    hitl: Option<Arc<HumanInLoopMiddleware>>,
46    pending_hitl: Arc<RwLock<Option<HitlPending>>>,
47    builtin_tools: Option<HashSet<String>>,
48    checkpointer: Option<Arc<dyn Checkpointer>>,
49}
50
51struct HitlPending {
52    tool_name: String,
53    payload: Value,
54    tool: ToolBox,
55    message: AgentMessage,
56}
57
58impl DeepAgent {
59    fn collect_tools(&self) -> HashMap<String, ToolBox> {
60        let mut tools: HashMap<String, ToolBox> = HashMap::new();
61        for tool in &self.base_tools {
62            tools.insert(tool.schema().name.clone(), tool.clone());
63        }
64        for middleware in &self.middlewares {
65            for tool in middleware.tools() {
66                let tool_name = tool.schema().name.clone();
67                if self.should_include(&tool_name) {
68                    tools.insert(tool_name, tool);
69                }
70            }
71        }
72        tools
73    }
74    // no streaming path in baseline
75
76    fn should_include(&self, name: &str) -> bool {
77        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
78        if !is_builtin {
79            return true;
80        }
81        match &self.builtin_tools {
82            None => true,
83            Some(selected) => selected.contains(name),
84        }
85    }
86
87    fn append_history(&self, message: AgentMessage) {
88        if let Ok(mut history) = self.history.write() {
89            history.push(message);
90        }
91    }
92
93    fn current_history(&self) -> Vec<AgentMessage> {
94        self.history.read().map(|h| h.clone()).unwrap_or_default()
95    }
96
97    /// Save the current agent state to the configured checkpointer.
98    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
99        if let Some(ref checkpointer) = self.checkpointer {
100            let state = self
101                .state
102                .read()
103                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
104                .clone();
105            checkpointer.save_state(thread_id, &state).await
106        } else {
107            tracing::warn!("Attempted to save state but no checkpointer is configured");
108            Ok(())
109        }
110    }
111
112    /// Load agent state from the configured checkpointer.
113    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
114        if let Some(ref checkpointer) = self.checkpointer {
115            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
116                *self
117                    .state
118                    .write()
119                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
120                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
121                Ok(true)
122            } else {
123                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
124                Ok(false)
125            }
126        } else {
127            tracing::warn!("Attempted to load state but no checkpointer is configured");
128            Ok(false)
129        }
130    }
131
132    /// Delete saved state for the specified thread.
133    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
134        if let Some(ref checkpointer) = self.checkpointer {
135            checkpointer.delete_thread(thread_id).await
136        } else {
137            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
138            Ok(())
139        }
140    }
141
142    /// List all threads with saved state.
143    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
144        if let Some(ref checkpointer) = self.checkpointer {
145            checkpointer.list_threads().await
146        } else {
147            Ok(Vec::new())
148        }
149    }
150
151    async fn execute_tool(
152        &self,
153        tool: ToolBox,
154        _tool_name: String,
155        payload: Value,
156    ) -> anyhow::Result<AgentMessage> {
157        let state_snapshot = self.state.read().unwrap().clone();
158        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
159
160        let result = tool.execute(payload, ctx).await?;
161        Ok(self.apply_tool_result(result))
162    }
163
164    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
165        match result {
166            ToolResult::Message(message) => {
167                // Tool results are not added to conversation history
168                // Only the final LLM response after tool execution is added
169                message
170            }
171            ToolResult::WithStateUpdate {
172                message,
173                state_diff,
174            } => {
175                if let Ok(mut state) = self.state.write() {
176                    let command = agents_core::command::Command::with_state(state_diff);
177                    command.apply_to(&mut state);
178                }
179                // Tool results are not added to conversation history
180                // Only the final LLM response after tool execution is added
181                message
182            }
183        }
184    }
185
186    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
187        self.pending_hitl.read().ok().and_then(|guard| {
188            guard.as_ref().map(|pending| {
189                AgentInterrupt::HumanInLoop(HitlInterrupt {
190                    tool_name: pending.tool_name.clone(),
191                    message: pending.message.clone(),
192                })
193            })
194        })
195    }
196
197    pub async fn resume_hitl(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
198        let pending = self
199            .pending_hitl
200            .write()
201            .ok()
202            .and_then(|mut guard| guard.take())
203            .ok_or_else(|| anyhow::anyhow!("No pending HITL action"))?;
204        match action {
205            HitlAction::Approve => {
206                let result = self
207                    .execute_tool(
208                        pending.tool.clone(),
209                        pending.tool_name.clone(),
210                        pending.payload.clone(),
211                    )
212                    .await?;
213                Ok(result)
214            }
215            HitlAction::Reject { reason } => {
216                let text =
217                    reason.unwrap_or_else(|| "Tool execution rejected by human reviewer.".into());
218                let message = AgentMessage {
219                    role: MessageRole::System,
220                    content: MessageContent::Text(text),
221                    metadata: None,
222                };
223                self.append_history(message.clone());
224                Ok(message)
225            }
226            HitlAction::Respond { message } => {
227                self.append_history(message.clone());
228                Ok(message)
229            }
230            HitlAction::Edit { action, args } => {
231                // Execute the edited tool/action with provided args
232                let tools = self.collect_tools();
233                if let Some(tool) = tools.get(&action).cloned() {
234                    let result = self.execute_tool(tool, action, args).await?;
235                    Ok(result)
236                } else {
237                    Ok(AgentMessage {
238                        role: MessageRole::System,
239                        content: MessageContent::Text(format!(
240                            "Edited tool '{}' not available",
241                            action
242                        )),
243                        metadata: None,
244                    })
245                }
246            }
247        }
248    }
249
250    /// Handle message from string input - converts string to AgentMessage internally
251    pub async fn handle_message(
252        &self,
253        input: impl AsRef<str>,
254        state: Arc<AgentStateSnapshot>,
255    ) -> anyhow::Result<AgentMessage> {
256        self.handle_message_with_metadata(input, None, state).await
257    }
258
259    /// Handle message from string input with metadata - converts string to AgentMessage internally
260    pub async fn handle_message_with_metadata(
261        &self,
262        input: impl AsRef<str>,
263        metadata: Option<MessageMetadata>,
264        state: Arc<AgentStateSnapshot>,
265    ) -> anyhow::Result<AgentMessage> {
266        let agent_message = AgentMessage {
267            role: MessageRole::User,
268            content: MessageContent::Text(input.as_ref().to_string()),
269            metadata,
270        };
271        self.handle_message_internal(agent_message, state).await
272    }
273
274    /// Internal method that contains the actual message handling logic
275    async fn handle_message_internal(
276        &self,
277        input: AgentMessage,
278        _state: Arc<AgentStateSnapshot>,
279    ) -> anyhow::Result<AgentMessage> {
280        self.append_history(input.clone());
281
282        // Build request with current history
283        let mut request = ModelRequest::new(&self.instructions, self.current_history());
284        let tools = self.collect_tools();
285        for middleware in &self.middlewares {
286            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
287            middleware.modify_model_request(&mut ctx).await?;
288        }
289
290        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
291        let context = PlannerContext {
292            history: request.messages.clone(),
293            system_prompt: request.system_prompt.clone(),
294            tools: tool_schemas,
295        };
296        let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
297
298        // Ask LLM what to do
299        let decision = self.planner.plan(context, state_snapshot).await?;
300
301        match decision.next_action {
302            PlannerAction::Respond { message } => {
303                self.append_history(message.clone());
304                Ok(message)
305            }
306            PlannerAction::CallTool { tool_name, payload } => {
307                if let Some(tool) = tools.get(&tool_name).cloned() {
308                    // Check HITL
309                    if let Some(hitl) = &self.hitl {
310                        if let Some(policy) = hitl.requires_approval(&tool_name) {
311                            let message_text = policy
312                                .note
313                                .clone()
314                                .unwrap_or_else(|| "Awaiting human approval.".into());
315                            let approval_message = AgentMessage {
316                                role: MessageRole::System,
317                                content: MessageContent::Text(format!(
318                                    "HITL_REQUIRED: Tool '{tool}' requires approval: {message}",
319                                    tool = tool_name,
320                                    message = message_text
321                                )),
322                                metadata: None,
323                            };
324                            let pending = HitlPending {
325                                tool_name: tool_name.clone(),
326                                payload: payload.clone(),
327                                tool: tool.clone(),
328                                message: approval_message.clone(),
329                            };
330                            if let Ok(mut guard) = self.pending_hitl.write() {
331                                *guard = Some(pending);
332                            }
333                            self.append_history(approval_message.clone());
334                            return Ok(approval_message);
335                        }
336                    }
337
338                    // Execute tool
339                    let start_time = std::time::Instant::now();
340                    tracing::warn!(
341                        "⚙️ EXECUTING TOOL: {} with payload: {}",
342                        tool_name,
343                        serde_json::to_string(&payload)
344                            .unwrap_or_else(|_| "invalid json".to_string())
345                    );
346
347                    let result = self
348                        .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
349                        .await;
350
351                    let duration = start_time.elapsed();
352                    match result {
353                        Ok(tool_result_message) => {
354                            let content_preview = match &tool_result_message.content {
355                                MessageContent::Text(t) => {
356                                    if t.len() > 100 {
357                                        format!("{}... ({} chars)", &t[..100], t.len())
358                                    } else {
359                                        t.clone()
360                                    }
361                                }
362                                MessageContent::Json(v) => {
363                                    format!("JSON: {} bytes", v.to_string().len())
364                                }
365                            };
366                            tracing::warn!(
367                                "✅ TOOL COMPLETED: {} in {:?} - Result: {}",
368                                tool_name,
369                                duration,
370                                content_preview
371                            );
372
373                            // Tool executed successfully - now respond naturally
374                            // Create a natural response incorporating the tool result
375                            let natural_response = match &tool_result_message.content {
376                                MessageContent::Text(text) => {
377                                    if text.is_empty() {
378                                        format!(
379                                            "I've executed the {} tool successfully.",
380                                            tool_name
381                                        )
382                                    } else {
383                                        // Include the tool result in the response
384                                        text.clone()
385                                    }
386                                }
387                                MessageContent::Json(json) => {
388                                    format!("Tool result: {}", json)
389                                }
390                            };
391
392                            let response = AgentMessage {
393                                role: MessageRole::Agent,
394                                content: MessageContent::Text(natural_response),
395                                metadata: None,
396                            };
397                            self.append_history(response.clone());
398                            Ok(response)
399                        }
400                        Err(e) => {
401                            tracing::error!(
402                                "❌ TOOL FAILED: {} in {:?} - Error: {}",
403                                tool_name,
404                                duration,
405                                e
406                            );
407
408                            // Tool failed - respond with error message
409                            let error_response = AgentMessage {
410                                role: MessageRole::Agent,
411                                content: MessageContent::Text(format!(
412                                    "I encountered an error while executing {}: {}",
413                                    tool_name, e
414                                )),
415                                metadata: None,
416                            };
417                            self.append_history(error_response.clone());
418                            Ok(error_response)
419                        }
420                    }
421                } else {
422                    // Tool not found
423                    tracing::warn!("⚠️ Tool '{}' not found", tool_name);
424                    let error_response = AgentMessage {
425                        role: MessageRole::Agent,
426                        content: MessageContent::Text(format!(
427                            "I don't have access to the '{}' tool.",
428                            tool_name
429                        )),
430                        metadata: None,
431                    };
432                    self.append_history(error_response.clone());
433                    Ok(error_response)
434                }
435            }
436            PlannerAction::Terminate => {
437                tracing::debug!("🛑 Agent terminated");
438                let message = AgentMessage {
439                    role: MessageRole::Agent,
440                    content: MessageContent::Text("Task completed.".into()),
441                    metadata: None,
442                };
443                self.append_history(message.clone());
444                Ok(message)
445            }
446        }
447    }
448}
449
450#[async_trait]
451impl AgentHandle for DeepAgent {
452    async fn describe(&self) -> AgentDescriptor {
453        self.descriptor.clone()
454    }
455
456    async fn handle_message(
457        &self,
458        input: AgentMessage,
459        _state: Arc<AgentStateSnapshot>,
460    ) -> anyhow::Result<AgentMessage> {
461        self.handle_message_internal(input, _state).await
462    }
463
464    async fn handle_message_stream(
465        &self,
466        input: AgentMessage,
467        _state: Arc<AgentStateSnapshot>,
468    ) -> anyhow::Result<agents_core::agent::AgentStream> {
469        use crate::planner::LlmBackedPlanner;
470        use agents_core::llm::{LlmRequest, StreamChunk};
471
472        // Add input to history
473        self.append_history(input.clone());
474
475        // Build the request similar to handle_message_internal
476        let mut request = ModelRequest::new(&self.instructions, self.current_history());
477        let tools = self.collect_tools();
478
479        // Apply middleware modifications
480        for middleware in &self.middlewares {
481            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
482            middleware.modify_model_request(&mut ctx).await?;
483        }
484
485        // Convert ModelRequest to LlmRequest and add tools
486        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
487        let llm_request = LlmRequest {
488            system_prompt: request.system_prompt.clone(),
489            messages: request.messages.clone(),
490            tools: tool_schemas,
491        };
492
493        // Try to get the underlying LLM model for streaming
494        let planner_any = self.planner.as_any();
495
496        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
497            // We have an LlmBackedPlanner, use its model for streaming
498            let model = llm_planner.model().clone();
499            let stream = model.generate_stream(llm_request).await?;
500            Ok(stream)
501        } else {
502            // Fallback to non-streaming
503            let response = self.handle_message_internal(input, _state).await?;
504            Ok(Box::pin(futures::stream::once(async move {
505                Ok(StreamChunk::Done { message: response })
506            })))
507        }
508    }
509}
510
511/// Create a deep agent from configuration - matches Python middleware assembly exactly
512///
513/// This function assembles the middleware stack in the same order as the Python SDK:
514/// planning → filesystem → subagents → summarization → prompt caching → optional HITL
515pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
516    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
517    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
518
519    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
520    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
521
522    // Build sub-agents from configurations
523    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
524
525    // Build custom sub-agents from configs
526    for subagent_config in &config.subagent_configs {
527        // Determine the planner for this sub-agent
528        let sub_planner = if let Some(ref model) = subagent_config.model {
529            // Sub-agent has its own model - wrap it in a planner
530            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
531        } else {
532            // Inherit parent's planner
533            config.planner.clone()
534        };
535
536        // Create a DeepAgentConfig for this sub-agent
537        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
538
539        // Configure tools
540        if let Some(ref tools) = subagent_config.tools {
541            for tool in tools {
542                sub_cfg = sub_cfg.with_tool(tool.clone());
543            }
544        }
545
546        // Configure built-in tools
547        if let Some(ref builtin) = subagent_config.builtin_tools {
548            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
549        }
550
551        // Sub-agents should not have their own sub-agents
552        sub_cfg = sub_cfg.with_auto_general_purpose(false);
553
554        // Configure prompt caching
555        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
556
557        // Build the sub-agent recursively
558        let sub_agent = create_deep_agent_from_config(sub_cfg);
559
560        // Register the sub-agent
561        registrations.push(SubAgentRegistration {
562            descriptor: SubAgentDescriptor {
563                name: subagent_config.name.clone(),
564                description: subagent_config.description.clone(),
565            },
566            agent: Arc::new(sub_agent),
567        });
568    }
569
570    // Optionally inject a general-purpose subagent
571    if config.auto_general_purpose {
572        let has_gp = registrations
573            .iter()
574            .any(|r| r.descriptor.name == "general-purpose");
575        if !has_gp {
576            // Create a subagent with inherited planner/tools and same instructions
577            let mut sub_cfg =
578                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
579                    .with_auto_general_purpose(false)
580                    .with_prompt_caching(config.enable_prompt_caching);
581            if let Some(ref selected) = config.builtin_tools {
582                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
583            }
584            if let Some(ref sum) = config.summarization {
585                sub_cfg = sub_cfg.with_summarization(sum.clone());
586            }
587            for t in &config.tools {
588                sub_cfg = sub_cfg.with_tool(t.clone());
589            }
590
591            let gp = create_deep_agent_from_config(sub_cfg);
592            registrations.push(SubAgentRegistration {
593                descriptor: SubAgentDescriptor {
594                    name: "general-purpose".into(),
595                    description: "Default reasoning agent".into(),
596                },
597                agent: Arc::new(gp),
598            });
599        }
600    }
601
602    let subagent = Arc::new(SubAgentMiddleware::new(registrations));
603    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
604    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
605    let summarization = config.summarization.as_ref().map(|cfg| {
606        Arc::new(SummarizationMiddleware::new(
607            cfg.messages_to_keep,
608            cfg.summary_note.clone(),
609        ))
610    });
611    let hitl = if config.tool_interrupts.is_empty() {
612        None
613    } else {
614        Some(Arc::new(HumanInLoopMiddleware::new(
615            config.tool_interrupts.clone(),
616        )))
617    };
618
619    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
620    // Order: base → deep agent prompt → planning → filesystem → subagents → summarization → caching → HITL
621    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
622        base_prompt,
623        deep_agent_prompt,
624        planning,
625        filesystem,
626        subagent,
627    ];
628    if let Some(ref summary) = summarization {
629        middlewares.push(summary.clone());
630    }
631    if config.enable_prompt_caching {
632        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
633    }
634    if let Some(ref hitl_mw) = hitl {
635        middlewares.push(hitl_mw.clone());
636    }
637
638    DeepAgent {
639        descriptor: AgentDescriptor {
640            name: "deep-agent".into(),
641            version: "0.0.1".into(),
642            description: Some("Rust deep agent".into()),
643        },
644        instructions: config.instructions,
645        planner: config.planner,
646        middlewares,
647        base_tools: config.tools,
648        state,
649        history,
650        _summarization: summarization,
651        hitl,
652        pending_hitl: Arc::new(RwLock::new(None)),
653        builtin_tools: config.builtin_tools,
654        checkpointer: config.checkpointer,
655    }
656}