agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66    enable_pii_sanitization: bool,
67}
68
69impl DeepAgent {
70    fn collect_tools(&self) -> HashMap<String, ToolBox> {
71        let mut tools: HashMap<String, ToolBox> = HashMap::new();
72        for tool in &self.base_tools {
73            tools.insert(tool.schema().name.clone(), tool.clone());
74        }
75        for middleware in &self.middlewares {
76            for tool in middleware.tools() {
77                let tool_name = tool.schema().name.clone();
78                if self.should_include(&tool_name) {
79                    tools.insert(tool_name, tool);
80                }
81            }
82        }
83        tools
84    }
85    // no streaming path in baseline
86
87    fn should_include(&self, name: &str) -> bool {
88        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
89        if !is_builtin {
90            return true;
91        }
92        match &self.builtin_tools {
93            None => true,
94            Some(selected) => selected.contains(name),
95        }
96    }
97
98    fn append_history(&self, message: AgentMessage) {
99        if let Ok(mut history) = self.history.write() {
100            history.push(message);
101        }
102    }
103
104    fn current_history(&self) -> Vec<AgentMessage> {
105        self.history.read().map(|h| h.clone()).unwrap_or_default()
106    }
107
108    fn emit_event(&self, event: agents_core::events::AgentEvent) {
109        if let Some(dispatcher) = &self.event_dispatcher {
110            let dispatcher_clone = dispatcher.clone();
111            tokio::spawn(async move {
112                dispatcher_clone.dispatch(event).await;
113            });
114        }
115    }
116
117    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
118        agents_core::events::EventMetadata::new(
119            "default".to_string(),
120            uuid::Uuid::new_v4().to_string(),
121            None,
122        )
123    }
124
125    fn truncate_message(&self, message: &AgentMessage) -> String {
126        let text = match &message.content {
127            MessageContent::Text(t) => t.clone(),
128            MessageContent::Json(v) => v.to_string(),
129        };
130
131        if self.enable_pii_sanitization {
132            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
133        } else {
134            // No sanitization - just truncate
135            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
136        }
137    }
138
139    fn summarize_payload(&self, payload: &Value) -> String {
140        if self.enable_pii_sanitization {
141            agents_core::security::sanitize_tool_payload(
142                payload,
143                agents_core::security::MAX_PREVIEW_LENGTH,
144            )
145        } else {
146            // No sanitization - just truncate JSON string
147            let json_str = payload.to_string();
148            agents_core::security::truncate_string(
149                &json_str,
150                agents_core::security::MAX_PREVIEW_LENGTH,
151            )
152        }
153    }
154
155    /// Save the current agent state to the configured checkpointer.
156    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
157        if let Some(ref checkpointer) = self.checkpointer {
158            let state = self
159                .state
160                .read()
161                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
162                .clone();
163
164            // Calculate state size before saving
165            let state_json = serde_json::to_string(&state)?;
166            let state_size = state_json.len();
167
168            // Save state to checkpointer
169            checkpointer.save_state(thread_id, &state).await?;
170
171            // Emit StateCheckpointed event after successful save
172            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
173                agents_core::events::StateCheckpointedEvent {
174                    metadata: self.create_event_metadata(),
175                    checkpoint_id: thread_id.to_string(),
176                    state_size_bytes: state_size,
177                },
178            ));
179
180            tracing::debug!(
181                thread_id = %thread_id,
182                state_size_bytes = state_size,
183                "πŸ’Ύ State checkpointed and event emitted"
184            );
185
186            Ok(())
187        } else {
188            tracing::warn!("Attempted to save state but no checkpointer is configured");
189            Ok(())
190        }
191    }
192
193    /// Load agent state from the configured checkpointer.
194    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
195        if let Some(ref checkpointer) = self.checkpointer {
196            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
197                *self
198                    .state
199                    .write()
200                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
201                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
202                Ok(true)
203            } else {
204                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
205                Ok(false)
206            }
207        } else {
208            tracing::warn!("Attempted to load state but no checkpointer is configured");
209            Ok(false)
210        }
211    }
212
213    /// Delete saved state for the specified thread.
214    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
215        if let Some(ref checkpointer) = self.checkpointer {
216            checkpointer.delete_thread(thread_id).await
217        } else {
218            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
219            Ok(())
220        }
221    }
222
223    /// List all threads with saved state.
224    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
225        if let Some(ref checkpointer) = self.checkpointer {
226            checkpointer.list_threads().await
227        } else {
228            Ok(Vec::new())
229        }
230    }
231
232    async fn execute_tool(
233        &self,
234        tool: ToolBox,
235        _tool_name: String,
236        payload: Value,
237    ) -> anyhow::Result<AgentMessage> {
238        let state_snapshot = self.state.read().unwrap().clone();
239        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
240
241        let result = tool.execute(payload, ctx).await?;
242        Ok(self.apply_tool_result(result))
243    }
244
245    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
246        match result {
247            ToolResult::Message(message) => {
248                // Tool results are not added to conversation history
249                // Only the final LLM response after tool execution is added
250                message
251            }
252            ToolResult::WithStateUpdate {
253                message,
254                state_diff,
255            } => {
256                // Check if todos were updated
257                let todos_updated = state_diff.todos.is_some();
258
259                if let Ok(mut state) = self.state.write() {
260                    let command = agents_core::command::Command::with_state(state_diff);
261                    command.apply_to(&mut state);
262
263                    // Emit TodosUpdated event if todos were modified
264                    if todos_updated {
265                        let (pending_count, in_progress_count, completed_count) =
266                            count_todos(&state.todos);
267
268                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
269                            agents_core::events::TodosUpdatedEvent {
270                                metadata: self.create_event_metadata(),
271                                todos: state.todos.clone(),
272                                pending_count,
273                                in_progress_count,
274                                completed_count,
275                                last_updated: chrono::Utc::now().to_rfc3339(),
276                            },
277                        ));
278
279                        tracing::debug!(
280                            pending = pending_count,
281                            in_progress = in_progress_count,
282                            completed = completed_count,
283                            total = state.todos.len(),
284                            "πŸ“ Todos updated and event emitted"
285                        );
286                    }
287                }
288                // Tool results are not added to conversation history
289                // Only the final LLM response after tool execution is added
290                message
291            }
292        }
293    }
294
295    /// Get the current pending interrupt, if any.
296    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
297        self.state
298            .read()
299            .ok()
300            .and_then(|guard| guard.pending_interrupts.first().cloned())
301    }
302
303    /// Add a broadcaster dynamically to the agent's event dispatcher.
304    ///
305    /// Add a single broadcaster dynamically after the agent is built.
306    ///
307    /// This is useful for per-conversation or per-customer broadcasters.
308    ///
309    /// # Example
310    /// ```no_run
311    /// use std::sync::Arc;
312    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
313    /// ```
314    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
315        if let Some(dispatcher) = &self.event_dispatcher {
316            dispatcher.add_broadcaster(broadcaster);
317            tracing::debug!("Broadcaster added to event dispatcher");
318        } else {
319            tracing::warn!("add_broadcaster called but no event dispatcher configured");
320        }
321    }
322
323    /// Add multiple broadcasters at once.
324    ///
325    /// This is useful when you need to add several broadcasters for a conversation
326    /// (e.g., WhatsApp, SSE, DynamoDB).
327    ///
328    /// # Example
329    /// ```no_run
330    /// use std::sync::Arc;
331    /// // agent.add_broadcasters(vec![
332    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
333    /// //     Arc::new(SseBroadcaster::new(channel)),
334    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
335    /// // ]);
336    /// ```
337    pub fn add_broadcasters(
338        &self,
339        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
340    ) {
341        if let Some(dispatcher) = &self.event_dispatcher {
342            for broadcaster in broadcasters {
343                dispatcher.add_broadcaster(broadcaster);
344            }
345            tracing::debug!("Multiple broadcasters added to event dispatcher");
346        } else {
347            tracing::warn!("add_broadcasters called but no event dispatcher configured");
348        }
349    }
350
351    /// Resume execution after human approval of an interrupt.
352    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
353        // Get the first pending interrupt
354        let interrupt = {
355            let state_guard = self
356                .state
357                .read()
358                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
359            state_guard
360                .pending_interrupts
361                .first()
362                .cloned()
363                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
364        };
365
366        let result_message = match action {
367            HitlAction::Accept => {
368                // Execute with original args
369                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
370                tracing::info!(
371                    tool_name = %hitl.tool_name,
372                    call_id = %hitl.call_id,
373                    "βœ… HITL: Tool approved, executing with original arguments"
374                );
375
376                let tools = self.collect_tools();
377                let tool = tools
378                    .get(&hitl.tool_name)
379                    .cloned()
380                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
381
382                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
383                    .await?
384            }
385
386            HitlAction::Edit {
387                tool_name,
388                tool_args,
389            } => {
390                // Execute with modified args
391                tracing::info!(
392                    tool_name = %tool_name,
393                    "✏️ HITL: Tool edited, executing with modified arguments"
394                );
395
396                let tools = self.collect_tools();
397                let tool = tools
398                    .get(&tool_name)
399                    .cloned()
400                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
401
402                self.execute_tool(tool, tool_name, tool_args).await?
403            }
404
405            HitlAction::Reject { reason } => {
406                // Don't execute - return rejection message
407                tracing::info!("❌ HITL: Tool rejected");
408
409                let text = reason
410                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
411
412                let message = AgentMessage {
413                    role: MessageRole::Tool,
414                    content: MessageContent::Text(text),
415                    metadata: None,
416                };
417
418                self.append_history(message.clone());
419                message
420            }
421
422            HitlAction::Respond { message } => {
423                // Don't execute - return custom message
424                tracing::info!("πŸ’¬ HITL: Custom response provided");
425
426                self.append_history(message.clone());
427                message
428            }
429        };
430
431        // Clear the interrupt from state
432        {
433            let mut state_guard = self
434                .state
435                .write()
436                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
437            state_guard.clear_interrupts();
438        }
439
440        // Persist cleared state
441        if let Some(checkpointer) = &self.checkpointer {
442            let state_clone = self
443                .state
444                .read()
445                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
446                .clone();
447            checkpointer
448                .save_state(&ThreadId::default(), &state_clone)
449                .await?;
450        }
451
452        Ok(result_message)
453    }
454
455    /// Handle message from string input - converts string to AgentMessage internally
456    pub async fn handle_message(
457        &self,
458        input: impl AsRef<str>,
459        state: Arc<AgentStateSnapshot>,
460    ) -> anyhow::Result<AgentMessage> {
461        self.handle_message_with_metadata(input, None, state).await
462    }
463
464    /// Handle message from string input with metadata - converts string to AgentMessage internally
465    pub async fn handle_message_with_metadata(
466        &self,
467        input: impl AsRef<str>,
468        metadata: Option<MessageMetadata>,
469        state: Arc<AgentStateSnapshot>,
470    ) -> anyhow::Result<AgentMessage> {
471        let agent_message = AgentMessage {
472            role: MessageRole::User,
473            content: MessageContent::Text(input.as_ref().to_string()),
474            metadata,
475        };
476        self.handle_message_internal(agent_message, state).await
477    }
478
479    /// Internal method that contains the actual message handling logic
480    async fn handle_message_internal(
481        &self,
482        input: AgentMessage,
483        _state: Arc<AgentStateSnapshot>,
484    ) -> anyhow::Result<AgentMessage> {
485        let start_time = std::time::Instant::now();
486
487        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
488            agents_core::events::AgentStartedEvent {
489                metadata: self.create_event_metadata(),
490                agent_name: self.descriptor.name.clone(),
491                message_preview: self.truncate_message(&input),
492            },
493        ));
494
495        self.append_history(input.clone());
496
497        // ReAct loop: continue until LLM responds with text (not tool calls)
498        let max_iterations = 10;
499        let mut iteration = 0;
500
501        loop {
502            iteration += 1;
503            if iteration > max_iterations {
504                tracing::warn!(
505                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
506                    max_iterations
507                );
508                let response = AgentMessage {
509                    role: MessageRole::Agent,
510                    content: MessageContent::Text(
511                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
512                    ),
513                    metadata: None,
514                };
515                self.append_history(response.clone());
516                return Ok(response);
517            }
518
519            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
520
521            // Build request with current history
522            let mut request = ModelRequest::new(&self.instructions, self.current_history());
523            let tools = self.collect_tools();
524            for middleware in &self.middlewares {
525                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
526                middleware.modify_model_request(&mut ctx).await?;
527            }
528
529            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
530            let context = PlannerContext {
531                history: request.messages.clone(),
532                system_prompt: request.system_prompt.clone(),
533                tools: tool_schemas,
534            };
535            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
536
537            // Ask LLM what to do
538            let decision = self.planner.plan(context, state_snapshot).await?;
539
540            // Emit PlanningComplete event
541            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
542                agents_core::events::PlanningCompleteEvent {
543                    metadata: self.create_event_metadata(),
544                    action_type: match &decision.next_action {
545                        PlannerAction::Respond { .. } => "respond".to_string(),
546                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
547                        PlannerAction::Terminate => "terminate".to_string(),
548                    },
549                    action_summary: match &decision.next_action {
550                        PlannerAction::Respond { message } => {
551                            format!("Respond: {}", self.truncate_message(message))
552                        }
553                        PlannerAction::CallTool { tool_name, .. } => {
554                            format!("Call tool: {}", tool_name)
555                        }
556                        PlannerAction::Terminate => "Terminate".to_string(),
557                    },
558                },
559            ));
560
561            match decision.next_action {
562                PlannerAction::Respond { message } => {
563                    // LLM decided to respond with text - exit loop
564                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
565                        agents_core::events::AgentCompletedEvent {
566                            metadata: self.create_event_metadata(),
567                            agent_name: self.descriptor.name.clone(),
568                            duration_ms: start_time.elapsed().as_millis() as u64,
569                            response_preview: self.truncate_message(&message),
570                        },
571                    ));
572
573                    self.append_history(message.clone());
574                    return Ok(message);
575                }
576                PlannerAction::CallTool { tool_name, payload } => {
577                    // Add AI's decision to call tool to history
578                    // This is needed for OpenAI's API which expects:
579                    // 1. Assistant message with tool call
580                    // 2. Tool message with result
581                    let tool_call_message = AgentMessage {
582                        role: MessageRole::Agent,
583                        content: MessageContent::Text(format!(
584                            "Calling tool: {} with args: {}",
585                            tool_name,
586                            serde_json::to_string(&payload).unwrap_or_default()
587                        )),
588                        metadata: None,
589                    };
590                    self.append_history(tool_call_message);
591
592                    if let Some(tool) = tools.get(&tool_name).cloned() {
593                        // Check all middleware for interrupts before executing tool
594                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
595                        for middleware in &self.middlewares {
596                            if let Some(interrupt) = middleware
597                                .before_tool_execution(&tool_name, &payload, &call_id)
598                                .await?
599                            {
600                                // Save interrupt to state
601                                {
602                                    let mut state_guard = self.state.write().map_err(|_| {
603                                        anyhow::anyhow!("Failed to acquire write lock on state")
604                                    })?;
605                                    state_guard.add_interrupt(interrupt.clone());
606                                }
607
608                                // Persist state with checkpointer
609                                if let Some(checkpointer) = &self.checkpointer {
610                                    let state_clone = self
611                                        .state
612                                        .read()
613                                        .map_err(|_| {
614                                            anyhow::anyhow!("Failed to acquire read lock on state")
615                                        })?
616                                        .clone();
617                                    checkpointer
618                                        .save_state(&ThreadId::default(), &state_clone)
619                                        .await?;
620                                }
621
622                                // Return interrupt message - execution pauses here
623                                let interrupt_message = AgentMessage {
624                                    role: MessageRole::System,
625                                    content: MessageContent::Text(format!(
626                                        "⏸️ Execution paused: Tool '{}' requires human approval",
627                                        tool_name
628                                    )),
629                                    metadata: None,
630                                };
631                                self.append_history(interrupt_message.clone());
632                                return Ok(interrupt_message);
633                            }
634                        }
635
636                        // No interrupt - execute tool
637                        let tool_start_time = std::time::Instant::now();
638
639                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
640                            agents_core::events::ToolStartedEvent {
641                                metadata: self.create_event_metadata(),
642                                tool_name: tool_name.clone(),
643                                input_summary: self.summarize_payload(&payload),
644                            },
645                        ));
646
647                        tracing::warn!(
648                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
649                            tool_name,
650                            serde_json::to_string(&payload)
651                                .unwrap_or_else(|_| "invalid json".to_string())
652                        );
653
654                        let result = self
655                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
656                            .await;
657
658                        let duration = tool_start_time.elapsed();
659                        match result {
660                            Ok(tool_result_message) => {
661                                let content_preview = match &tool_result_message.content {
662                                    MessageContent::Text(t) => {
663                                        if t.len() > 100 {
664                                            format!("{}... ({} chars)", &t[..100], t.len())
665                                        } else {
666                                            t.clone()
667                                        }
668                                    }
669                                    MessageContent::Json(v) => {
670                                        format!("JSON: {} bytes", v.to_string().len())
671                                    }
672                                };
673
674                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
675                                    agents_core::events::ToolCompletedEvent {
676                                        metadata: self.create_event_metadata(),
677                                        tool_name: tool_name.clone(),
678                                        duration_ms: duration.as_millis() as u64,
679                                        result_summary: content_preview.clone(),
680                                        success: true,
681                                    },
682                                ));
683
684                                tracing::warn!(
685                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
686                                    tool_name,
687                                    duration,
688                                    content_preview
689                                );
690
691                                // Add tool result to history and continue ReAct loop
692                                self.append_history(tool_result_message);
693                                // Loop continues - LLM will see tool result and decide next action
694                            }
695                            Err(e) => {
696                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
697                                    agents_core::events::ToolFailedEvent {
698                                        metadata: self.create_event_metadata(),
699                                        tool_name: tool_name.clone(),
700                                        duration_ms: duration.as_millis() as u64,
701                                        error_message: e.to_string(),
702                                        is_recoverable: true,
703                                        retry_count: 0,
704                                    },
705                                ));
706
707                                tracing::error!(
708                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
709                                    tool_name,
710                                    duration,
711                                    e
712                                );
713
714                                // Add error to history and continue - let LLM handle the error
715                                let error_message = AgentMessage {
716                                    role: MessageRole::Tool,
717                                    content: MessageContent::Text(format!(
718                                        "Error executing {}: {}",
719                                        tool_name, e
720                                    )),
721                                    metadata: None,
722                                };
723                                self.append_history(error_message);
724                                // Loop continues - LLM will see error and decide how to handle it
725                            }
726                        }
727                    } else {
728                        // Tool not found - add error to history and continue
729                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
730                        let error_message = AgentMessage {
731                            role: MessageRole::Tool,
732                            content: MessageContent::Text(format!(
733                                "Tool '{}' not found. Available tools: {}",
734                                tool_name,
735                                tools
736                                    .keys()
737                                    .map(|k| k.as_str())
738                                    .collect::<Vec<_>>()
739                                    .join(", ")
740                            )),
741                            metadata: None,
742                        };
743                        self.append_history(error_message);
744                        // Loop continues - LLM will see error and try something else
745                    }
746                }
747                PlannerAction::Terminate => {
748                    // LLM decided to terminate - exit loop
749                    tracing::debug!("πŸ›‘ Agent terminated");
750                    let message = AgentMessage {
751                        role: MessageRole::Agent,
752                        content: MessageContent::Text("Task completed.".into()),
753                        metadata: None,
754                    };
755                    self.append_history(message.clone());
756                    return Ok(message);
757                }
758            }
759        }
760    }
761}
762
763#[async_trait]
764impl AgentHandle for DeepAgent {
765    async fn describe(&self) -> AgentDescriptor {
766        self.descriptor.clone()
767    }
768
769    async fn handle_message(
770        &self,
771        input: AgentMessage,
772        _state: Arc<AgentStateSnapshot>,
773    ) -> anyhow::Result<AgentMessage> {
774        self.handle_message_internal(input, _state).await
775    }
776
777    async fn handle_message_stream(
778        &self,
779        input: AgentMessage,
780        _state: Arc<AgentStateSnapshot>,
781    ) -> anyhow::Result<agents_core::agent::AgentStream> {
782        use crate::planner::LlmBackedPlanner;
783        use agents_core::llm::{LlmRequest, StreamChunk};
784
785        // Add input to history
786        self.append_history(input.clone());
787
788        // Build the request similar to handle_message_internal
789        let mut request = ModelRequest::new(&self.instructions, self.current_history());
790        let tools = self.collect_tools();
791
792        // Apply middleware modifications
793        for middleware in &self.middlewares {
794            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
795            middleware.modify_model_request(&mut ctx).await?;
796        }
797
798        // Convert ModelRequest to LlmRequest and add tools
799        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
800        let llm_request = LlmRequest {
801            system_prompt: request.system_prompt.clone(),
802            messages: request.messages.clone(),
803            tools: tool_schemas,
804        };
805
806        // Try to get the underlying LLM model for streaming
807        let planner_any = self.planner.as_any();
808
809        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
810            // We have an LlmBackedPlanner, use its model for streaming
811            let model = llm_planner.model().clone();
812            let stream = model.generate_stream(llm_request).await?;
813            Ok(stream)
814        } else {
815            // Fallback to non-streaming
816            let response = self.handle_message_internal(input, _state).await?;
817            Ok(Box::pin(futures::stream::once(async move {
818                Ok(StreamChunk::Done { message: response })
819            })))
820        }
821    }
822
823    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
824        let state_guard = self
825            .state
826            .read()
827            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
828        Ok(state_guard.pending_interrupts.first().cloned())
829    }
830
831    async fn resume_with_approval(
832        &self,
833        action: agents_core::hitl::HitlAction,
834    ) -> anyhow::Result<AgentMessage> {
835        self.resume_with_approval(action).await
836    }
837}
838
839/// Create a deep agent from configuration - matches Python middleware assembly exactly
840///
841/// This function assembles the middleware stack in the same order as the Python SDK:
842/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
843pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
844    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
845    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
846
847    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
848    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
849
850    // Build sub-agents from configurations
851    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
852
853    // Build custom sub-agents from configs
854    for subagent_config in &config.subagent_configs {
855        // Determine the planner for this sub-agent
856        let sub_planner = if let Some(ref model) = subagent_config.model {
857            // Sub-agent has its own model - wrap it in a planner
858            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
859        } else {
860            // Inherit parent's planner
861            config.planner.clone()
862        };
863
864        // Create a DeepAgentConfig for this sub-agent
865        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
866
867        // Configure tools
868        if let Some(ref tools) = subagent_config.tools {
869            for tool in tools {
870                sub_cfg = sub_cfg.with_tool(tool.clone());
871            }
872        }
873
874        // Configure built-in tools
875        if let Some(ref builtin) = subagent_config.builtin_tools {
876            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
877        }
878
879        // Sub-agents should not have their own sub-agents
880        sub_cfg = sub_cfg.with_auto_general_purpose(false);
881
882        // Configure prompt caching
883        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
884
885        // Inherit PII sanitization setting from parent
886        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
887
888        // Build the sub-agent recursively
889        let sub_agent = create_deep_agent_from_config(sub_cfg);
890
891        // Register the sub-agent
892        registrations.push(SubAgentRegistration {
893            descriptor: SubAgentDescriptor {
894                name: subagent_config.name.clone(),
895                description: subagent_config.description.clone(),
896            },
897            agent: Arc::new(sub_agent),
898        });
899    }
900
901    // Optionally inject a general-purpose subagent
902    if config.auto_general_purpose {
903        let has_gp = registrations
904            .iter()
905            .any(|r| r.descriptor.name == "general-purpose");
906        if !has_gp {
907            // Create a subagent with inherited planner/tools and same instructions
908            let mut sub_cfg =
909                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
910                    .with_auto_general_purpose(false)
911                    .with_prompt_caching(config.enable_prompt_caching)
912                    .with_pii_sanitization(config.enable_pii_sanitization);
913            if let Some(ref selected) = config.builtin_tools {
914                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
915            }
916            if let Some(ref sum) = config.summarization {
917                sub_cfg = sub_cfg.with_summarization(sum.clone());
918            }
919            for t in &config.tools {
920                sub_cfg = sub_cfg.with_tool(t.clone());
921            }
922
923            let gp = create_deep_agent_from_config(sub_cfg);
924            registrations.push(SubAgentRegistration {
925                descriptor: SubAgentDescriptor {
926                    name: "general-purpose".into(),
927                    description: "Default reasoning agent".into(),
928                },
929                agent: Arc::new(gp),
930            });
931        }
932    }
933
934    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
935        registrations,
936        config.event_dispatcher.clone(),
937    ));
938    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
939    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
940    let summarization = config.summarization.as_ref().map(|cfg| {
941        Arc::new(SummarizationMiddleware::new(
942            cfg.messages_to_keep,
943            cfg.summary_note.clone(),
944        ))
945    });
946    let hitl = if config.tool_interrupts.is_empty() {
947        None
948    } else {
949        // Validate that checkpointer is configured when HITL is enabled
950        if config.checkpointer.is_none() {
951            tracing::error!(
952                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
953                 HITL will be disabled. Please configure a checkpointer to enable HITL."
954            );
955            None
956        } else {
957            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
958            Some(Arc::new(HumanInLoopMiddleware::new(
959                config.tool_interrupts.clone(),
960            )))
961        }
962    };
963
964    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
965    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
966    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
967        base_prompt,
968        deep_agent_prompt,
969        planning,
970        filesystem,
971        subagent,
972    ];
973    if let Some(ref summary) = summarization {
974        middlewares.push(summary.clone());
975    }
976    if config.enable_prompt_caching {
977        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
978    }
979    if let Some(ref hitl_mw) = hitl {
980        middlewares.push(hitl_mw.clone());
981    }
982
983    DeepAgent {
984        descriptor: AgentDescriptor {
985            name: "deep-agent".into(),
986            version: "0.0.1".into(),
987            description: Some("Rust deep agent".into()),
988        },
989        instructions: config.instructions,
990        planner: config.planner,
991        middlewares,
992        base_tools: config.tools,
993        state,
994        history,
995        _summarization: summarization,
996        _hitl: hitl,
997        builtin_tools: config.builtin_tools,
998        checkpointer: config.checkpointer,
999        event_dispatcher: config.event_dispatcher,
1000        enable_pii_sanitization: config.enable_pii_sanitization,
1001    }
1002}