agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66}
67
68impl DeepAgent {
69    fn collect_tools(&self) -> HashMap<String, ToolBox> {
70        let mut tools: HashMap<String, ToolBox> = HashMap::new();
71        for tool in &self.base_tools {
72            tools.insert(tool.schema().name.clone(), tool.clone());
73        }
74        for middleware in &self.middlewares {
75            for tool in middleware.tools() {
76                let tool_name = tool.schema().name.clone();
77                if self.should_include(&tool_name) {
78                    tools.insert(tool_name, tool);
79                }
80            }
81        }
82        tools
83    }
84    // no streaming path in baseline
85
86    fn should_include(&self, name: &str) -> bool {
87        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
88        if !is_builtin {
89            return true;
90        }
91        match &self.builtin_tools {
92            None => true,
93            Some(selected) => selected.contains(name),
94        }
95    }
96
97    fn append_history(&self, message: AgentMessage) {
98        if let Ok(mut history) = self.history.write() {
99            history.push(message);
100        }
101    }
102
103    fn current_history(&self) -> Vec<AgentMessage> {
104        self.history.read().map(|h| h.clone()).unwrap_or_default()
105    }
106
107    fn emit_event(&self, event: agents_core::events::AgentEvent) {
108        if let Some(dispatcher) = &self.event_dispatcher {
109            let dispatcher_clone = dispatcher.clone();
110            tokio::spawn(async move {
111                dispatcher_clone.dispatch(event).await;
112            });
113        }
114    }
115
116    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
117        agents_core::events::EventMetadata::new(
118            "default".to_string(),
119            uuid::Uuid::new_v4().to_string(),
120            None,
121        )
122    }
123
124    fn truncate_message(&self, message: &AgentMessage) -> String {
125        let text = match &message.content {
126            MessageContent::Text(t) => t.clone(),
127            MessageContent::Json(v) => v.to_string(),
128        };
129        if text.len() > 100 {
130            format!("{}...", &text[..100])
131        } else {
132            text
133        }
134    }
135
136    fn summarize_payload(&self, payload: &Value) -> String {
137        let json_str = payload.to_string();
138        if json_str.len() > 100 {
139            format!("{}...", &json_str[..100])
140        } else {
141            json_str
142        }
143    }
144
145    /// Save the current agent state to the configured checkpointer.
146    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
147        if let Some(ref checkpointer) = self.checkpointer {
148            let state = self
149                .state
150                .read()
151                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
152                .clone();
153
154            // Calculate state size before saving
155            let state_json = serde_json::to_string(&state)?;
156            let state_size = state_json.len();
157
158            // Save state to checkpointer
159            checkpointer.save_state(thread_id, &state).await?;
160
161            // Emit StateCheckpointed event after successful save
162            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
163                agents_core::events::StateCheckpointedEvent {
164                    metadata: self.create_event_metadata(),
165                    checkpoint_id: thread_id.to_string(),
166                    state_size_bytes: state_size,
167                },
168            ));
169
170            tracing::debug!(
171                thread_id = %thread_id,
172                state_size_bytes = state_size,
173                "πŸ’Ύ State checkpointed and event emitted"
174            );
175
176            Ok(())
177        } else {
178            tracing::warn!("Attempted to save state but no checkpointer is configured");
179            Ok(())
180        }
181    }
182
183    /// Load agent state from the configured checkpointer.
184    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
185        if let Some(ref checkpointer) = self.checkpointer {
186            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
187                *self
188                    .state
189                    .write()
190                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
191                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
192                Ok(true)
193            } else {
194                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
195                Ok(false)
196            }
197        } else {
198            tracing::warn!("Attempted to load state but no checkpointer is configured");
199            Ok(false)
200        }
201    }
202
203    /// Delete saved state for the specified thread.
204    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
205        if let Some(ref checkpointer) = self.checkpointer {
206            checkpointer.delete_thread(thread_id).await
207        } else {
208            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
209            Ok(())
210        }
211    }
212
213    /// List all threads with saved state.
214    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
215        if let Some(ref checkpointer) = self.checkpointer {
216            checkpointer.list_threads().await
217        } else {
218            Ok(Vec::new())
219        }
220    }
221
222    async fn execute_tool(
223        &self,
224        tool: ToolBox,
225        _tool_name: String,
226        payload: Value,
227    ) -> anyhow::Result<AgentMessage> {
228        let state_snapshot = self.state.read().unwrap().clone();
229        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
230
231        let result = tool.execute(payload, ctx).await?;
232        Ok(self.apply_tool_result(result))
233    }
234
235    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
236        match result {
237            ToolResult::Message(message) => {
238                // Tool results are not added to conversation history
239                // Only the final LLM response after tool execution is added
240                message
241            }
242            ToolResult::WithStateUpdate {
243                message,
244                state_diff,
245            } => {
246                // Check if todos were updated
247                let todos_updated = state_diff.todos.is_some();
248
249                if let Ok(mut state) = self.state.write() {
250                    let command = agents_core::command::Command::with_state(state_diff);
251                    command.apply_to(&mut state);
252
253                    // Emit TodosUpdated event if todos were modified
254                    if todos_updated {
255                        let (pending_count, in_progress_count, completed_count) =
256                            count_todos(&state.todos);
257
258                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
259                            agents_core::events::TodosUpdatedEvent {
260                                metadata: self.create_event_metadata(),
261                                todos: state.todos.clone(),
262                                pending_count,
263                                in_progress_count,
264                                completed_count,
265                                last_updated: chrono::Utc::now().to_rfc3339(),
266                            },
267                        ));
268
269                        tracing::debug!(
270                            pending = pending_count,
271                            in_progress = in_progress_count,
272                            completed = completed_count,
273                            total = state.todos.len(),
274                            "πŸ“ Todos updated and event emitted"
275                        );
276                    }
277                }
278                // Tool results are not added to conversation history
279                // Only the final LLM response after tool execution is added
280                message
281            }
282        }
283    }
284
285    /// Get the current pending interrupt, if any.
286    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
287        self.state
288            .read()
289            .ok()
290            .and_then(|guard| guard.pending_interrupts.first().cloned())
291    }
292
293    /// Add a broadcaster dynamically to the agent's event dispatcher.
294    ///
295    /// Add a single broadcaster dynamically after the agent is built.
296    ///
297    /// This is useful for per-conversation or per-customer broadcasters.
298    ///
299    /// # Example
300    /// ```no_run
301    /// use std::sync::Arc;
302    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
303    /// ```
304    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
305        if let Some(dispatcher) = &self.event_dispatcher {
306            dispatcher.add_broadcaster(broadcaster);
307            tracing::debug!("Broadcaster added to event dispatcher");
308        } else {
309            tracing::warn!("add_broadcaster called but no event dispatcher configured");
310        }
311    }
312
313    /// Add multiple broadcasters at once.
314    ///
315    /// This is useful when you need to add several broadcasters for a conversation
316    /// (e.g., WhatsApp, SSE, DynamoDB).
317    ///
318    /// # Example
319    /// ```no_run
320    /// use std::sync::Arc;
321    /// // agent.add_broadcasters(vec![
322    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
323    /// //     Arc::new(SseBroadcaster::new(channel)),
324    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
325    /// // ]);
326    /// ```
327    pub fn add_broadcasters(
328        &self,
329        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
330    ) {
331        if let Some(dispatcher) = &self.event_dispatcher {
332            for broadcaster in broadcasters {
333                dispatcher.add_broadcaster(broadcaster);
334            }
335            tracing::debug!("Multiple broadcasters added to event dispatcher");
336        } else {
337            tracing::warn!("add_broadcasters called but no event dispatcher configured");
338        }
339    }
340
341    /// Resume execution after human approval of an interrupt.
342    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
343        // Get the first pending interrupt
344        let interrupt = {
345            let state_guard = self
346                .state
347                .read()
348                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
349            state_guard
350                .pending_interrupts
351                .first()
352                .cloned()
353                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
354        };
355
356        let result_message = match action {
357            HitlAction::Accept => {
358                // Execute with original args
359                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
360                tracing::info!(
361                    tool_name = %hitl.tool_name,
362                    call_id = %hitl.call_id,
363                    "βœ… HITL: Tool approved, executing with original arguments"
364                );
365
366                let tools = self.collect_tools();
367                let tool = tools
368                    .get(&hitl.tool_name)
369                    .cloned()
370                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
371
372                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
373                    .await?
374            }
375
376            HitlAction::Edit {
377                tool_name,
378                tool_args,
379            } => {
380                // Execute with modified args
381                tracing::info!(
382                    tool_name = %tool_name,
383                    "✏️ HITL: Tool edited, executing with modified arguments"
384                );
385
386                let tools = self.collect_tools();
387                let tool = tools
388                    .get(&tool_name)
389                    .cloned()
390                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
391
392                self.execute_tool(tool, tool_name, tool_args).await?
393            }
394
395            HitlAction::Reject { reason } => {
396                // Don't execute - return rejection message
397                tracing::info!("❌ HITL: Tool rejected");
398
399                let text = reason
400                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
401
402                let message = AgentMessage {
403                    role: MessageRole::Tool,
404                    content: MessageContent::Text(text),
405                    metadata: None,
406                };
407
408                self.append_history(message.clone());
409                message
410            }
411
412            HitlAction::Respond { message } => {
413                // Don't execute - return custom message
414                tracing::info!("πŸ’¬ HITL: Custom response provided");
415
416                self.append_history(message.clone());
417                message
418            }
419        };
420
421        // Clear the interrupt from state
422        {
423            let mut state_guard = self
424                .state
425                .write()
426                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
427            state_guard.clear_interrupts();
428        }
429
430        // Persist cleared state
431        if let Some(checkpointer) = &self.checkpointer {
432            let state_clone = self
433                .state
434                .read()
435                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
436                .clone();
437            checkpointer
438                .save_state(&ThreadId::default(), &state_clone)
439                .await?;
440        }
441
442        Ok(result_message)
443    }
444
445    /// Handle message from string input - converts string to AgentMessage internally
446    pub async fn handle_message(
447        &self,
448        input: impl AsRef<str>,
449        state: Arc<AgentStateSnapshot>,
450    ) -> anyhow::Result<AgentMessage> {
451        self.handle_message_with_metadata(input, None, state).await
452    }
453
454    /// Handle message from string input with metadata - converts string to AgentMessage internally
455    pub async fn handle_message_with_metadata(
456        &self,
457        input: impl AsRef<str>,
458        metadata: Option<MessageMetadata>,
459        state: Arc<AgentStateSnapshot>,
460    ) -> anyhow::Result<AgentMessage> {
461        let agent_message = AgentMessage {
462            role: MessageRole::User,
463            content: MessageContent::Text(input.as_ref().to_string()),
464            metadata,
465        };
466        self.handle_message_internal(agent_message, state).await
467    }
468
469    /// Internal method that contains the actual message handling logic
470    async fn handle_message_internal(
471        &self,
472        input: AgentMessage,
473        _state: Arc<AgentStateSnapshot>,
474    ) -> anyhow::Result<AgentMessage> {
475        let start_time = std::time::Instant::now();
476
477        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
478            agents_core::events::AgentStartedEvent {
479                metadata: self.create_event_metadata(),
480                agent_name: self.descriptor.name.clone(),
481                message_preview: self.truncate_message(&input),
482            },
483        ));
484
485        self.append_history(input.clone());
486
487        // ReAct loop: continue until LLM responds with text (not tool calls)
488        let max_iterations = 10;
489        let mut iteration = 0;
490
491        loop {
492            iteration += 1;
493            if iteration > max_iterations {
494                tracing::warn!(
495                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
496                    max_iterations
497                );
498                let response = AgentMessage {
499                    role: MessageRole::Agent,
500                    content: MessageContent::Text(
501                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
502                    ),
503                    metadata: None,
504                };
505                self.append_history(response.clone());
506                return Ok(response);
507            }
508
509            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
510
511            // Build request with current history
512            let mut request = ModelRequest::new(&self.instructions, self.current_history());
513            let tools = self.collect_tools();
514            for middleware in &self.middlewares {
515                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
516                middleware.modify_model_request(&mut ctx).await?;
517            }
518
519            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
520            let context = PlannerContext {
521                history: request.messages.clone(),
522                system_prompt: request.system_prompt.clone(),
523                tools: tool_schemas,
524            };
525            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
526
527            // Ask LLM what to do
528            let decision = self.planner.plan(context, state_snapshot).await?;
529
530            // Emit PlanningComplete event
531            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
532                agents_core::events::PlanningCompleteEvent {
533                    metadata: self.create_event_metadata(),
534                    action_type: match &decision.next_action {
535                        PlannerAction::Respond { .. } => "respond".to_string(),
536                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
537                        PlannerAction::Terminate => "terminate".to_string(),
538                    },
539                    action_summary: match &decision.next_action {
540                        PlannerAction::Respond { message } => {
541                            format!("Respond: {}", self.truncate_message(message))
542                        }
543                        PlannerAction::CallTool { tool_name, .. } => {
544                            format!("Call tool: {}", tool_name)
545                        }
546                        PlannerAction::Terminate => "Terminate".to_string(),
547                    },
548                },
549            ));
550
551            match decision.next_action {
552                PlannerAction::Respond { message } => {
553                    // LLM decided to respond with text - exit loop
554                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
555                        agents_core::events::AgentCompletedEvent {
556                            metadata: self.create_event_metadata(),
557                            agent_name: self.descriptor.name.clone(),
558                            duration_ms: start_time.elapsed().as_millis() as u64,
559                            response_preview: self.truncate_message(&message),
560                        },
561                    ));
562
563                    self.append_history(message.clone());
564                    return Ok(message);
565                }
566                PlannerAction::CallTool { tool_name, payload } => {
567                    // Add AI's decision to call tool to history
568                    // This is needed for OpenAI's API which expects:
569                    // 1. Assistant message with tool call
570                    // 2. Tool message with result
571                    let tool_call_message = AgentMessage {
572                        role: MessageRole::Agent,
573                        content: MessageContent::Text(format!(
574                            "Calling tool: {} with args: {}",
575                            tool_name,
576                            serde_json::to_string(&payload).unwrap_or_default()
577                        )),
578                        metadata: None,
579                    };
580                    self.append_history(tool_call_message);
581
582                    if let Some(tool) = tools.get(&tool_name).cloned() {
583                        // Check all middleware for interrupts before executing tool
584                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
585                        for middleware in &self.middlewares {
586                            if let Some(interrupt) = middleware
587                                .before_tool_execution(&tool_name, &payload, &call_id)
588                                .await?
589                            {
590                                // Save interrupt to state
591                                {
592                                    let mut state_guard = self.state.write().map_err(|_| {
593                                        anyhow::anyhow!("Failed to acquire write lock on state")
594                                    })?;
595                                    state_guard.add_interrupt(interrupt.clone());
596                                }
597
598                                // Persist state with checkpointer
599                                if let Some(checkpointer) = &self.checkpointer {
600                                    let state_clone = self
601                                        .state
602                                        .read()
603                                        .map_err(|_| {
604                                            anyhow::anyhow!("Failed to acquire read lock on state")
605                                        })?
606                                        .clone();
607                                    checkpointer
608                                        .save_state(&ThreadId::default(), &state_clone)
609                                        .await?;
610                                }
611
612                                // Return interrupt message - execution pauses here
613                                let interrupt_message = AgentMessage {
614                                    role: MessageRole::System,
615                                    content: MessageContent::Text(format!(
616                                        "⏸️ Execution paused: Tool '{}' requires human approval",
617                                        tool_name
618                                    )),
619                                    metadata: None,
620                                };
621                                self.append_history(interrupt_message.clone());
622                                return Ok(interrupt_message);
623                            }
624                        }
625
626                        // No interrupt - execute tool
627                        let tool_start_time = std::time::Instant::now();
628
629                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
630                            agents_core::events::ToolStartedEvent {
631                                metadata: self.create_event_metadata(),
632                                tool_name: tool_name.clone(),
633                                input_summary: self.summarize_payload(&payload),
634                            },
635                        ));
636
637                        tracing::warn!(
638                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
639                            tool_name,
640                            serde_json::to_string(&payload)
641                                .unwrap_or_else(|_| "invalid json".to_string())
642                        );
643
644                        let result = self
645                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
646                            .await;
647
648                        let duration = tool_start_time.elapsed();
649                        match result {
650                            Ok(tool_result_message) => {
651                                let content_preview = match &tool_result_message.content {
652                                    MessageContent::Text(t) => {
653                                        if t.len() > 100 {
654                                            format!("{}... ({} chars)", &t[..100], t.len())
655                                        } else {
656                                            t.clone()
657                                        }
658                                    }
659                                    MessageContent::Json(v) => {
660                                        format!("JSON: {} bytes", v.to_string().len())
661                                    }
662                                };
663
664                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
665                                    agents_core::events::ToolCompletedEvent {
666                                        metadata: self.create_event_metadata(),
667                                        tool_name: tool_name.clone(),
668                                        duration_ms: duration.as_millis() as u64,
669                                        result_summary: content_preview.clone(),
670                                        success: true,
671                                    },
672                                ));
673
674                                tracing::warn!(
675                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
676                                    tool_name,
677                                    duration,
678                                    content_preview
679                                );
680
681                                // Add tool result to history and continue ReAct loop
682                                self.append_history(tool_result_message);
683                                // Loop continues - LLM will see tool result and decide next action
684                            }
685                            Err(e) => {
686                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
687                                    agents_core::events::ToolFailedEvent {
688                                        metadata: self.create_event_metadata(),
689                                        tool_name: tool_name.clone(),
690                                        duration_ms: duration.as_millis() as u64,
691                                        error_message: e.to_string(),
692                                        is_recoverable: true,
693                                        retry_count: 0,
694                                    },
695                                ));
696
697                                tracing::error!(
698                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
699                                    tool_name,
700                                    duration,
701                                    e
702                                );
703
704                                // Add error to history and continue - let LLM handle the error
705                                let error_message = AgentMessage {
706                                    role: MessageRole::Tool,
707                                    content: MessageContent::Text(format!(
708                                        "Error executing {}: {}",
709                                        tool_name, e
710                                    )),
711                                    metadata: None,
712                                };
713                                self.append_history(error_message);
714                                // Loop continues - LLM will see error and decide how to handle it
715                            }
716                        }
717                    } else {
718                        // Tool not found - add error to history and continue
719                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
720                        let error_message = AgentMessage {
721                            role: MessageRole::Tool,
722                            content: MessageContent::Text(format!(
723                                "Tool '{}' not found. Available tools: {}",
724                                tool_name,
725                                tools
726                                    .keys()
727                                    .map(|k| k.as_str())
728                                    .collect::<Vec<_>>()
729                                    .join(", ")
730                            )),
731                            metadata: None,
732                        };
733                        self.append_history(error_message);
734                        // Loop continues - LLM will see error and try something else
735                    }
736                }
737                PlannerAction::Terminate => {
738                    // LLM decided to terminate - exit loop
739                    tracing::debug!("πŸ›‘ Agent terminated");
740                    let message = AgentMessage {
741                        role: MessageRole::Agent,
742                        content: MessageContent::Text("Task completed.".into()),
743                        metadata: None,
744                    };
745                    self.append_history(message.clone());
746                    return Ok(message);
747                }
748            }
749        }
750    }
751}
752
753#[async_trait]
754impl AgentHandle for DeepAgent {
755    async fn describe(&self) -> AgentDescriptor {
756        self.descriptor.clone()
757    }
758
759    async fn handle_message(
760        &self,
761        input: AgentMessage,
762        _state: Arc<AgentStateSnapshot>,
763    ) -> anyhow::Result<AgentMessage> {
764        self.handle_message_internal(input, _state).await
765    }
766
767    async fn handle_message_stream(
768        &self,
769        input: AgentMessage,
770        _state: Arc<AgentStateSnapshot>,
771    ) -> anyhow::Result<agents_core::agent::AgentStream> {
772        use crate::planner::LlmBackedPlanner;
773        use agents_core::llm::{LlmRequest, StreamChunk};
774
775        // Add input to history
776        self.append_history(input.clone());
777
778        // Build the request similar to handle_message_internal
779        let mut request = ModelRequest::new(&self.instructions, self.current_history());
780        let tools = self.collect_tools();
781
782        // Apply middleware modifications
783        for middleware in &self.middlewares {
784            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
785            middleware.modify_model_request(&mut ctx).await?;
786        }
787
788        // Convert ModelRequest to LlmRequest and add tools
789        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
790        let llm_request = LlmRequest {
791            system_prompt: request.system_prompt.clone(),
792            messages: request.messages.clone(),
793            tools: tool_schemas,
794        };
795
796        // Try to get the underlying LLM model for streaming
797        let planner_any = self.planner.as_any();
798
799        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
800            // We have an LlmBackedPlanner, use its model for streaming
801            let model = llm_planner.model().clone();
802            let stream = model.generate_stream(llm_request).await?;
803            Ok(stream)
804        } else {
805            // Fallback to non-streaming
806            let response = self.handle_message_internal(input, _state).await?;
807            Ok(Box::pin(futures::stream::once(async move {
808                Ok(StreamChunk::Done { message: response })
809            })))
810        }
811    }
812
813    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
814        let state_guard = self
815            .state
816            .read()
817            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
818        Ok(state_guard.pending_interrupts.first().cloned())
819    }
820
821    async fn resume_with_approval(
822        &self,
823        action: agents_core::hitl::HitlAction,
824    ) -> anyhow::Result<AgentMessage> {
825        self.resume_with_approval(action).await
826    }
827}
828
829/// Create a deep agent from configuration - matches Python middleware assembly exactly
830///
831/// This function assembles the middleware stack in the same order as the Python SDK:
832/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
833pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
834    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
835    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
836
837    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
838    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
839
840    // Build sub-agents from configurations
841    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
842
843    // Build custom sub-agents from configs
844    for subagent_config in &config.subagent_configs {
845        // Determine the planner for this sub-agent
846        let sub_planner = if let Some(ref model) = subagent_config.model {
847            // Sub-agent has its own model - wrap it in a planner
848            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
849        } else {
850            // Inherit parent's planner
851            config.planner.clone()
852        };
853
854        // Create a DeepAgentConfig for this sub-agent
855        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
856
857        // Configure tools
858        if let Some(ref tools) = subagent_config.tools {
859            for tool in tools {
860                sub_cfg = sub_cfg.with_tool(tool.clone());
861            }
862        }
863
864        // Configure built-in tools
865        if let Some(ref builtin) = subagent_config.builtin_tools {
866            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
867        }
868
869        // Sub-agents should not have their own sub-agents
870        sub_cfg = sub_cfg.with_auto_general_purpose(false);
871
872        // Configure prompt caching
873        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
874
875        // Build the sub-agent recursively
876        let sub_agent = create_deep_agent_from_config(sub_cfg);
877
878        // Register the sub-agent
879        registrations.push(SubAgentRegistration {
880            descriptor: SubAgentDescriptor {
881                name: subagent_config.name.clone(),
882                description: subagent_config.description.clone(),
883            },
884            agent: Arc::new(sub_agent),
885        });
886    }
887
888    // Optionally inject a general-purpose subagent
889    if config.auto_general_purpose {
890        let has_gp = registrations
891            .iter()
892            .any(|r| r.descriptor.name == "general-purpose");
893        if !has_gp {
894            // Create a subagent with inherited planner/tools and same instructions
895            let mut sub_cfg =
896                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
897                    .with_auto_general_purpose(false)
898                    .with_prompt_caching(config.enable_prompt_caching);
899            if let Some(ref selected) = config.builtin_tools {
900                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
901            }
902            if let Some(ref sum) = config.summarization {
903                sub_cfg = sub_cfg.with_summarization(sum.clone());
904            }
905            for t in &config.tools {
906                sub_cfg = sub_cfg.with_tool(t.clone());
907            }
908
909            let gp = create_deep_agent_from_config(sub_cfg);
910            registrations.push(SubAgentRegistration {
911                descriptor: SubAgentDescriptor {
912                    name: "general-purpose".into(),
913                    description: "Default reasoning agent".into(),
914                },
915                agent: Arc::new(gp),
916            });
917        }
918    }
919
920    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
921        registrations,
922        config.event_dispatcher.clone(),
923    ));
924    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
925    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
926    let summarization = config.summarization.as_ref().map(|cfg| {
927        Arc::new(SummarizationMiddleware::new(
928            cfg.messages_to_keep,
929            cfg.summary_note.clone(),
930        ))
931    });
932    let hitl = if config.tool_interrupts.is_empty() {
933        None
934    } else {
935        // Validate that checkpointer is configured when HITL is enabled
936        if config.checkpointer.is_none() {
937            tracing::error!(
938                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
939                 HITL will be disabled. Please configure a checkpointer to enable HITL."
940            );
941            None
942        } else {
943            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
944            Some(Arc::new(HumanInLoopMiddleware::new(
945                config.tool_interrupts.clone(),
946            )))
947        }
948    };
949
950    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
951    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
952    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
953        base_prompt,
954        deep_agent_prompt,
955        planning,
956        filesystem,
957        subagent,
958    ];
959    if let Some(ref summary) = summarization {
960        middlewares.push(summary.clone());
961    }
962    if config.enable_prompt_caching {
963        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
964    }
965    if let Some(ref hitl_mw) = hitl {
966        middlewares.push(hitl_mw.clone());
967    }
968
969    DeepAgent {
970        descriptor: AgentDescriptor {
971            name: "deep-agent".into(),
972            version: "0.0.1".into(),
973            description: Some("Rust deep agent".into()),
974        },
975        instructions: config.instructions,
976        planner: config.planner,
977        middlewares,
978        base_tools: config.tools,
979        state,
980        history,
981        _summarization: summarization,
982        _hitl: hitl,
983        builtin_tools: config.builtin_tools,
984        checkpointer: config.checkpointer,
985        event_dispatcher: config.event_dispatcher,
986    }
987}