agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66    enable_pii_sanitization: bool,
67}
68
69impl DeepAgent {
70    fn collect_tools(&self) -> HashMap<String, ToolBox> {
71        let mut tools: HashMap<String, ToolBox> = HashMap::new();
72        for tool in &self.base_tools {
73            tools.insert(tool.schema().name.clone(), tool.clone());
74        }
75        for middleware in &self.middlewares {
76            for tool in middleware.tools() {
77                let tool_name = tool.schema().name.clone();
78                if self.should_include(&tool_name) {
79                    tools.insert(tool_name, tool);
80                }
81            }
82        }
83        tools
84    }
85    // no streaming path in baseline
86
87    fn should_include(&self, name: &str) -> bool {
88        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
89        if !is_builtin {
90            return true;
91        }
92        match &self.builtin_tools {
93            None => true,
94            Some(selected) => selected.contains(name),
95        }
96    }
97
98    fn append_history(&self, message: AgentMessage) {
99        if let Ok(mut history) = self.history.write() {
100            history.push(message);
101        }
102    }
103
104    fn current_history(&self) -> Vec<AgentMessage> {
105        self.history.read().map(|h| h.clone()).unwrap_or_default()
106    }
107
108    fn emit_event(&self, event: agents_core::events::AgentEvent) {
109        if let Some(dispatcher) = &self.event_dispatcher {
110            let dispatcher_clone = dispatcher.clone();
111            tokio::spawn(async move {
112                dispatcher_clone.dispatch(event).await;
113            });
114        }
115    }
116
117    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
118        agents_core::events::EventMetadata::new(
119            "default".to_string(),
120            uuid::Uuid::new_v4().to_string(),
121            None,
122        )
123    }
124
125    fn truncate_message(&self, message: &AgentMessage) -> String {
126        let text = match &message.content {
127            MessageContent::Text(t) => t.clone(),
128            MessageContent::Json(v) => v.to_string(),
129        };
130
131        if self.enable_pii_sanitization {
132            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
133        } else {
134            // No sanitization - just truncate
135            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
136        }
137    }
138
139    fn get_full_message_text(&self, message: &AgentMessage) -> String {
140        match &message.content {
141            MessageContent::Text(t) => t.clone(),
142            MessageContent::Json(v) => v.to_string(),
143        }
144    }
145
146    fn summarize_payload(&self, payload: &Value) -> String {
147        if self.enable_pii_sanitization {
148            agents_core::security::sanitize_tool_payload(
149                payload,
150                agents_core::security::MAX_PREVIEW_LENGTH,
151            )
152        } else {
153            // No sanitization - just truncate JSON string
154            let json_str = payload.to_string();
155            agents_core::security::truncate_string(
156                &json_str,
157                agents_core::security::MAX_PREVIEW_LENGTH,
158            )
159        }
160    }
161
162    /// Save the current agent state to the configured checkpointer.
163    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
164        if let Some(ref checkpointer) = self.checkpointer {
165            let state = self
166                .state
167                .read()
168                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
169                .clone();
170
171            // Calculate state size before saving
172            let state_json = serde_json::to_string(&state)?;
173            let state_size = state_json.len();
174
175            // Save state to checkpointer
176            checkpointer.save_state(thread_id, &state).await?;
177
178            // Emit StateCheckpointed event after successful save
179            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
180                agents_core::events::StateCheckpointedEvent {
181                    metadata: self.create_event_metadata(),
182                    checkpoint_id: thread_id.to_string(),
183                    state_size_bytes: state_size,
184                },
185            ));
186
187            tracing::debug!(
188                thread_id = %thread_id,
189                state_size_bytes = state_size,
190                "πŸ’Ύ State checkpointed and event emitted"
191            );
192
193            Ok(())
194        } else {
195            tracing::warn!("Attempted to save state but no checkpointer is configured");
196            Ok(())
197        }
198    }
199
200    /// Load agent state from the configured checkpointer.
201    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
202        if let Some(ref checkpointer) = self.checkpointer {
203            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
204                *self
205                    .state
206                    .write()
207                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
208                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
209                Ok(true)
210            } else {
211                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
212                Ok(false)
213            }
214        } else {
215            tracing::warn!("Attempted to load state but no checkpointer is configured");
216            Ok(false)
217        }
218    }
219
220    /// Delete saved state for the specified thread.
221    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
222        if let Some(ref checkpointer) = self.checkpointer {
223            checkpointer.delete_thread(thread_id).await
224        } else {
225            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
226            Ok(())
227        }
228    }
229
230    /// List all threads with saved state.
231    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
232        if let Some(ref checkpointer) = self.checkpointer {
233            checkpointer.list_threads().await
234        } else {
235            Ok(Vec::new())
236        }
237    }
238
239    async fn execute_tool(
240        &self,
241        tool: ToolBox,
242        _tool_name: String,
243        payload: Value,
244    ) -> anyhow::Result<AgentMessage> {
245        let state_snapshot = self.state.read().unwrap().clone();
246        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
247
248        let result = tool.execute(payload, ctx).await?;
249        Ok(self.apply_tool_result(result))
250    }
251
252    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
253        match result {
254            ToolResult::Message(message) => {
255                // Tool results are not added to conversation history
256                // Only the final LLM response after tool execution is added
257                message
258            }
259            ToolResult::WithStateUpdate {
260                message,
261                state_diff,
262            } => {
263                // Check if todos were updated
264                let todos_updated = state_diff.todos.is_some();
265
266                if let Ok(mut state) = self.state.write() {
267                    let command = agents_core::command::Command::with_state(state_diff);
268                    command.apply_to(&mut state);
269
270                    // Emit TodosUpdated event if todos were modified
271                    if todos_updated {
272                        let (pending_count, in_progress_count, completed_count) =
273                            count_todos(&state.todos);
274
275                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
276                            agents_core::events::TodosUpdatedEvent {
277                                metadata: self.create_event_metadata(),
278                                todos: state.todos.clone(),
279                                pending_count,
280                                in_progress_count,
281                                completed_count,
282                                last_updated: chrono::Utc::now().to_rfc3339(),
283                            },
284                        ));
285
286                        tracing::debug!(
287                            pending = pending_count,
288                            in_progress = in_progress_count,
289                            completed = completed_count,
290                            total = state.todos.len(),
291                            "πŸ“ Todos updated and event emitted"
292                        );
293                    }
294                }
295                // Tool results are not added to conversation history
296                // Only the final LLM response after tool execution is added
297                message
298            }
299        }
300    }
301
302    /// Get the current pending interrupt, if any.
303    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
304        self.state
305            .read()
306            .ok()
307            .and_then(|guard| guard.pending_interrupts.first().cloned())
308    }
309
310    /// Add a broadcaster dynamically to the agent's event dispatcher.
311    ///
312    /// Add a single broadcaster dynamically after the agent is built.
313    ///
314    /// This is useful for per-conversation or per-customer broadcasters.
315    ///
316    /// # Example
317    /// ```no_run
318    /// use std::sync::Arc;
319    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
320    /// ```
321    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
322        if let Some(dispatcher) = &self.event_dispatcher {
323            dispatcher.add_broadcaster(broadcaster);
324            tracing::debug!("Broadcaster added to event dispatcher");
325        } else {
326            tracing::warn!("add_broadcaster called but no event dispatcher configured");
327        }
328    }
329
330    /// Add multiple broadcasters at once.
331    ///
332    /// This is useful when you need to add several broadcasters for a conversation
333    /// (e.g., WhatsApp, SSE, DynamoDB).
334    ///
335    /// # Example
336    /// ```no_run
337    /// use std::sync::Arc;
338    /// // agent.add_broadcasters(vec![
339    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
340    /// //     Arc::new(SseBroadcaster::new(channel)),
341    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
342    /// // ]);
343    /// ```
344    pub fn add_broadcasters(
345        &self,
346        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
347    ) {
348        if let Some(dispatcher) = &self.event_dispatcher {
349            for broadcaster in broadcasters {
350                dispatcher.add_broadcaster(broadcaster);
351            }
352            tracing::debug!("Multiple broadcasters added to event dispatcher");
353        } else {
354            tracing::warn!("add_broadcasters called but no event dispatcher configured");
355        }
356    }
357
358    /// Resume execution after human approval of an interrupt.
359    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
360        // Get the first pending interrupt
361        let interrupt = {
362            let state_guard = self
363                .state
364                .read()
365                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
366            state_guard
367                .pending_interrupts
368                .first()
369                .cloned()
370                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
371        };
372
373        let result_message = match action {
374            HitlAction::Accept => {
375                // Execute with original args
376                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
377                tracing::info!(
378                    tool_name = %hitl.tool_name,
379                    call_id = %hitl.call_id,
380                    "βœ… HITL: Tool approved, executing with original arguments"
381                );
382
383                let tools = self.collect_tools();
384                let tool = tools
385                    .get(&hitl.tool_name)
386                    .cloned()
387                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
388
389                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
390                    .await?
391            }
392
393            HitlAction::Edit {
394                tool_name,
395                tool_args,
396            } => {
397                // Execute with modified args
398                tracing::info!(
399                    tool_name = %tool_name,
400                    "✏️ HITL: Tool edited, executing with modified arguments"
401                );
402
403                let tools = self.collect_tools();
404                let tool = tools
405                    .get(&tool_name)
406                    .cloned()
407                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
408
409                self.execute_tool(tool, tool_name, tool_args).await?
410            }
411
412            HitlAction::Reject { reason } => {
413                // Don't execute - return rejection message
414                tracing::info!("❌ HITL: Tool rejected");
415
416                let text = reason
417                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
418
419                let message = AgentMessage {
420                    role: MessageRole::Tool,
421                    content: MessageContent::Text(text),
422                    metadata: None,
423                };
424
425                self.append_history(message.clone());
426                message
427            }
428
429            HitlAction::Respond { message } => {
430                // Don't execute - return custom message
431                tracing::info!("πŸ’¬ HITL: Custom response provided");
432
433                self.append_history(message.clone());
434                message
435            }
436        };
437
438        // Clear the interrupt from state
439        {
440            let mut state_guard = self
441                .state
442                .write()
443                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
444            state_guard.clear_interrupts();
445        }
446
447        // Persist cleared state
448        if let Some(checkpointer) = &self.checkpointer {
449            let state_clone = self
450                .state
451                .read()
452                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
453                .clone();
454            checkpointer
455                .save_state(&ThreadId::default(), &state_clone)
456                .await?;
457        }
458
459        Ok(result_message)
460    }
461
462    /// Handle message from string input - converts string to AgentMessage internally
463    pub async fn handle_message(
464        &self,
465        input: impl AsRef<str>,
466        state: Arc<AgentStateSnapshot>,
467    ) -> anyhow::Result<AgentMessage> {
468        self.handle_message_with_metadata(input, None, state).await
469    }
470
471    /// Handle message from string input with metadata - converts string to AgentMessage internally
472    pub async fn handle_message_with_metadata(
473        &self,
474        input: impl AsRef<str>,
475        metadata: Option<MessageMetadata>,
476        state: Arc<AgentStateSnapshot>,
477    ) -> anyhow::Result<AgentMessage> {
478        let agent_message = AgentMessage {
479            role: MessageRole::User,
480            content: MessageContent::Text(input.as_ref().to_string()),
481            metadata,
482        };
483        self.handle_message_internal(agent_message, state).await
484    }
485
486    /// Internal method that contains the actual message handling logic
487    async fn handle_message_internal(
488        &self,
489        input: AgentMessage,
490        _state: Arc<AgentStateSnapshot>,
491    ) -> anyhow::Result<AgentMessage> {
492        let start_time = std::time::Instant::now();
493
494        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
495            agents_core::events::AgentStartedEvent {
496                metadata: self.create_event_metadata(),
497                agent_name: self.descriptor.name.clone(),
498                message_preview: self.truncate_message(&input),
499            },
500        ));
501
502        self.append_history(input.clone());
503
504        // ReAct loop: continue until LLM responds with text (not tool calls)
505        let max_iterations = 10;
506        let mut iteration = 0;
507
508        loop {
509            iteration += 1;
510            if iteration > max_iterations {
511                tracing::warn!(
512                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
513                    max_iterations
514                );
515                let response = AgentMessage {
516                    role: MessageRole::Agent,
517                    content: MessageContent::Text(
518                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
519                    ),
520                    metadata: None,
521                };
522                self.append_history(response.clone());
523                return Ok(response);
524            }
525
526            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
527
528            // Build request with current history
529            let mut request = ModelRequest::new(&self.instructions, self.current_history());
530            let tools = self.collect_tools();
531            for middleware in &self.middlewares {
532                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
533                middleware.modify_model_request(&mut ctx).await?;
534            }
535
536            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
537            let context = PlannerContext {
538                history: request.messages.clone(),
539                system_prompt: request.system_prompt.clone(),
540                tools: tool_schemas,
541            };
542            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
543
544            // Ask LLM what to do
545            let decision = self.planner.plan(context, state_snapshot).await?;
546
547            // Emit PlanningComplete event
548            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
549                agents_core::events::PlanningCompleteEvent {
550                    metadata: self.create_event_metadata(),
551                    action_type: match &decision.next_action {
552                        PlannerAction::Respond { .. } => "respond".to_string(),
553                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
554                        PlannerAction::Terminate => "terminate".to_string(),
555                    },
556                    action_summary: match &decision.next_action {
557                        PlannerAction::Respond { message } => {
558                            format!("Respond: {}", self.truncate_message(message))
559                        }
560                        PlannerAction::CallTool { tool_name, .. } => {
561                            format!("Call tool: {}", tool_name)
562                        }
563                        PlannerAction::Terminate => "Terminate".to_string(),
564                    },
565                },
566            ));
567
568            match decision.next_action {
569                PlannerAction::Respond { message } => {
570                    // LLM decided to respond with text - exit loop
571                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
572                        agents_core::events::AgentCompletedEvent {
573                            metadata: self.create_event_metadata(),
574                            agent_name: self.descriptor.name.clone(),
575                            duration_ms: start_time.elapsed().as_millis() as u64,
576                            response_preview: self.truncate_message(&message),
577                            response: self.get_full_message_text(&message),
578                        },
579                    ));
580
581                    self.append_history(message.clone());
582                    return Ok(message);
583                }
584                PlannerAction::CallTool { tool_name, payload } => {
585                    // Add AI's decision to call tool to history
586                    // This is needed for OpenAI's API which expects:
587                    // 1. Assistant message with tool call
588                    // 2. Tool message with result
589                    let tool_call_message = AgentMessage {
590                        role: MessageRole::Agent,
591                        content: MessageContent::Text(format!(
592                            "Calling tool: {} with args: {}",
593                            tool_name,
594                            serde_json::to_string(&payload).unwrap_or_default()
595                        )),
596                        metadata: None,
597                    };
598                    self.append_history(tool_call_message);
599
600                    if let Some(tool) = tools.get(&tool_name).cloned() {
601                        // Check all middleware for interrupts before executing tool
602                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
603                        for middleware in &self.middlewares {
604                            if let Some(interrupt) = middleware
605                                .before_tool_execution(&tool_name, &payload, &call_id)
606                                .await?
607                            {
608                                // Save interrupt to state
609                                {
610                                    let mut state_guard = self.state.write().map_err(|_| {
611                                        anyhow::anyhow!("Failed to acquire write lock on state")
612                                    })?;
613                                    state_guard.add_interrupt(interrupt.clone());
614                                }
615
616                                // Persist state with checkpointer
617                                if let Some(checkpointer) = &self.checkpointer {
618                                    let state_clone = self
619                                        .state
620                                        .read()
621                                        .map_err(|_| {
622                                            anyhow::anyhow!("Failed to acquire read lock on state")
623                                        })?
624                                        .clone();
625                                    checkpointer
626                                        .save_state(&ThreadId::default(), &state_clone)
627                                        .await?;
628                                }
629
630                                // Return interrupt message - execution pauses here
631                                let interrupt_message = AgentMessage {
632                                    role: MessageRole::System,
633                                    content: MessageContent::Text(format!(
634                                        "⏸️ Execution paused: Tool '{}' requires human approval",
635                                        tool_name
636                                    )),
637                                    metadata: None,
638                                };
639                                self.append_history(interrupt_message.clone());
640                                return Ok(interrupt_message);
641                            }
642                        }
643
644                        // No interrupt - execute tool
645                        let tool_start_time = std::time::Instant::now();
646
647                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
648                            agents_core::events::ToolStartedEvent {
649                                metadata: self.create_event_metadata(),
650                                tool_name: tool_name.clone(),
651                                input_summary: self.summarize_payload(&payload),
652                            },
653                        ));
654
655                        tracing::warn!(
656                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
657                            tool_name,
658                            serde_json::to_string(&payload)
659                                .unwrap_or_else(|_| "invalid json".to_string())
660                        );
661
662                        let result = self
663                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
664                            .await;
665
666                        let duration = tool_start_time.elapsed();
667                        match result {
668                            Ok(tool_result_message) => {
669                                let content_preview = match &tool_result_message.content {
670                                    MessageContent::Text(t) => {
671                                        if t.len() > 100 {
672                                            format!("{}... ({} chars)", &t[..100], t.len())
673                                        } else {
674                                            t.clone()
675                                        }
676                                    }
677                                    MessageContent::Json(v) => {
678                                        format!("JSON: {} bytes", v.to_string().len())
679                                    }
680                                };
681
682                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
683                                    agents_core::events::ToolCompletedEvent {
684                                        metadata: self.create_event_metadata(),
685                                        tool_name: tool_name.clone(),
686                                        duration_ms: duration.as_millis() as u64,
687                                        result_summary: content_preview.clone(),
688                                        success: true,
689                                    },
690                                ));
691
692                                tracing::warn!(
693                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
694                                    tool_name,
695                                    duration,
696                                    content_preview
697                                );
698
699                                // Add tool result to history and continue ReAct loop
700                                self.append_history(tool_result_message);
701                                // Loop continues - LLM will see tool result and decide next action
702                            }
703                            Err(e) => {
704                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
705                                    agents_core::events::ToolFailedEvent {
706                                        metadata: self.create_event_metadata(),
707                                        tool_name: tool_name.clone(),
708                                        duration_ms: duration.as_millis() as u64,
709                                        error_message: e.to_string(),
710                                        is_recoverable: true,
711                                        retry_count: 0,
712                                    },
713                                ));
714
715                                tracing::error!(
716                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
717                                    tool_name,
718                                    duration,
719                                    e
720                                );
721
722                                // Add error to history and continue - let LLM handle the error
723                                let error_message = AgentMessage {
724                                    role: MessageRole::Tool,
725                                    content: MessageContent::Text(format!(
726                                        "Error executing {}: {}",
727                                        tool_name, e
728                                    )),
729                                    metadata: None,
730                                };
731                                self.append_history(error_message);
732                                // Loop continues - LLM will see error and decide how to handle it
733                            }
734                        }
735                    } else {
736                        // Tool not found - add error to history and continue
737                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
738                        let error_message = AgentMessage {
739                            role: MessageRole::Tool,
740                            content: MessageContent::Text(format!(
741                                "Tool '{}' not found. Available tools: {}",
742                                tool_name,
743                                tools
744                                    .keys()
745                                    .map(|k| k.as_str())
746                                    .collect::<Vec<_>>()
747                                    .join(", ")
748                            )),
749                            metadata: None,
750                        };
751                        self.append_history(error_message);
752                        // Loop continues - LLM will see error and try something else
753                    }
754                }
755                PlannerAction::Terminate => {
756                    // LLM decided to terminate - exit loop
757                    tracing::debug!("πŸ›‘ Agent terminated");
758                    let message = AgentMessage {
759                        role: MessageRole::Agent,
760                        content: MessageContent::Text("Task completed.".into()),
761                        metadata: None,
762                    };
763                    self.append_history(message.clone());
764                    return Ok(message);
765                }
766            }
767        }
768    }
769}
770
771#[async_trait]
772impl AgentHandle for DeepAgent {
773    async fn describe(&self) -> AgentDescriptor {
774        self.descriptor.clone()
775    }
776
777    async fn handle_message(
778        &self,
779        input: AgentMessage,
780        _state: Arc<AgentStateSnapshot>,
781    ) -> anyhow::Result<AgentMessage> {
782        self.handle_message_internal(input, _state).await
783    }
784
785    async fn handle_message_stream(
786        &self,
787        input: AgentMessage,
788        _state: Arc<AgentStateSnapshot>,
789    ) -> anyhow::Result<agents_core::agent::AgentStream> {
790        use crate::planner::LlmBackedPlanner;
791        use agents_core::llm::{LlmRequest, StreamChunk};
792
793        // Add input to history
794        self.append_history(input.clone());
795
796        // Build the request similar to handle_message_internal
797        let mut request = ModelRequest::new(&self.instructions, self.current_history());
798        let tools = self.collect_tools();
799
800        // Apply middleware modifications
801        for middleware in &self.middlewares {
802            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
803            middleware.modify_model_request(&mut ctx).await?;
804        }
805
806        // Convert ModelRequest to LlmRequest and add tools
807        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
808        let llm_request = LlmRequest {
809            system_prompt: request.system_prompt.clone(),
810            messages: request.messages.clone(),
811            tools: tool_schemas,
812        };
813
814        // Try to get the underlying LLM model for streaming
815        let planner_any = self.planner.as_any();
816
817        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
818            // We have an LlmBackedPlanner, use its model for streaming
819            let model = llm_planner.model().clone();
820            let stream = model.generate_stream(llm_request).await?;
821            Ok(stream)
822        } else {
823            // Fallback to non-streaming
824            let response = self.handle_message_internal(input, _state).await?;
825            Ok(Box::pin(futures::stream::once(async move {
826                Ok(StreamChunk::Done { message: response })
827            })))
828        }
829    }
830
831    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
832        let state_guard = self
833            .state
834            .read()
835            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
836        Ok(state_guard.pending_interrupts.first().cloned())
837    }
838
839    async fn resume_with_approval(
840        &self,
841        action: agents_core::hitl::HitlAction,
842    ) -> anyhow::Result<AgentMessage> {
843        self.resume_with_approval(action).await
844    }
845}
846
847/// Create a deep agent from configuration - matches Python middleware assembly exactly
848///
849/// This function assembles the middleware stack in the same order as the Python SDK:
850/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
851pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
852    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
853    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
854
855    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
856    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
857
858    // Build sub-agents from configurations
859    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
860
861    // Build custom sub-agents from configs
862    tracing::info!(
863        "πŸ“‹ Processing {} sub-agent configurations",
864        config.subagent_configs.len()
865    );
866
867    for subagent_config in &config.subagent_configs {
868        tracing::info!(
869            "πŸ—οΈ Building sub-agent: {} ({})",
870            subagent_config.name,
871            subagent_config.description
872        );
873
874        // Determine the planner for this sub-agent
875        let sub_planner = if let Some(ref model) = subagent_config.model {
876            // Sub-agent has its own model - wrap it in a planner
877            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
878        } else {
879            // Inherit parent's planner
880            config.planner.clone()
881        };
882
883        // Create a DeepAgentConfig for this sub-agent
884        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
885
886        // Configure tools
887        if let Some(ref tools) = subagent_config.tools {
888            tracing::debug!(
889                "  - Configuring {} tools for {}",
890                tools.len(),
891                subagent_config.name
892            );
893            for tool in tools {
894                sub_cfg = sub_cfg.with_tool(tool.clone());
895            }
896        }
897
898        // Configure built-in tools
899        if let Some(ref builtin) = subagent_config.builtin_tools {
900            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
901        }
902
903        // Sub-agents should not have their own sub-agents
904        sub_cfg = sub_cfg.with_auto_general_purpose(false);
905
906        // Configure prompt caching
907        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
908
909        // Inherit PII sanitization setting from parent
910        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
911
912        // Build the sub-agent recursively
913        let sub_agent = create_deep_agent_from_config(sub_cfg);
914
915        // Register the sub-agent
916        registrations.push(SubAgentRegistration {
917            descriptor: SubAgentDescriptor {
918                name: subagent_config.name.clone(),
919                description: subagent_config.description.clone(),
920            },
921            agent: Arc::new(sub_agent),
922        });
923
924        tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
925    }
926
927    tracing::info!("=> Total sub-agents registered: {}", registrations.len());
928
929    // Optionally inject a general-purpose subagent
930    if config.auto_general_purpose {
931        let has_gp = registrations
932            .iter()
933            .any(|r| r.descriptor.name == "general-purpose");
934        if !has_gp {
935            // Create a subagent with inherited planner/tools and same instructions
936            let mut sub_cfg =
937                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
938                    .with_auto_general_purpose(false)
939                    .with_prompt_caching(config.enable_prompt_caching)
940                    .with_pii_sanitization(config.enable_pii_sanitization);
941            if let Some(ref selected) = config.builtin_tools {
942                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
943            }
944            if let Some(ref sum) = config.summarization {
945                sub_cfg = sub_cfg.with_summarization(sum.clone());
946            }
947            for t in &config.tools {
948                sub_cfg = sub_cfg.with_tool(t.clone());
949            }
950
951            let gp = create_deep_agent_from_config(sub_cfg);
952            registrations.push(SubAgentRegistration {
953                descriptor: SubAgentDescriptor {
954                    name: "general-purpose".into(),
955                    description: "Default reasoning agent".into(),
956                },
957                agent: Arc::new(gp),
958            });
959        }
960    }
961
962    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
963        registrations,
964        config.event_dispatcher.clone(),
965    ));
966    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
967    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
968    let summarization = config.summarization.as_ref().map(|cfg| {
969        Arc::new(SummarizationMiddleware::new(
970            cfg.messages_to_keep,
971            cfg.summary_note.clone(),
972        ))
973    });
974    let hitl = if config.tool_interrupts.is_empty() {
975        None
976    } else {
977        // Validate that checkpointer is configured when HITL is enabled
978        if config.checkpointer.is_none() {
979            tracing::error!(
980                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
981                 HITL will be disabled. Please configure a checkpointer to enable HITL."
982            );
983            None
984        } else {
985            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
986            Some(Arc::new(HumanInLoopMiddleware::new(
987                config.tool_interrupts.clone(),
988            )))
989        }
990    };
991
992    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
993    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
994    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
995        base_prompt,
996        deep_agent_prompt,
997        planning,
998        filesystem,
999        subagent,
1000    ];
1001    if let Some(ref summary) = summarization {
1002        middlewares.push(summary.clone());
1003    }
1004    if config.enable_prompt_caching {
1005        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1006    }
1007    if let Some(ref hitl_mw) = hitl {
1008        middlewares.push(hitl_mw.clone());
1009    }
1010
1011    DeepAgent {
1012        descriptor: AgentDescriptor {
1013            name: "deep-agent".into(),
1014            version: "0.0.1".into(),
1015            description: Some("Rust deep agent".into()),
1016        },
1017        instructions: config.instructions,
1018        planner: config.planner,
1019        middlewares,
1020        base_tools: config.tools,
1021        state,
1022        history,
1023        _summarization: summarization,
1024        _hitl: hitl,
1025        builtin_tools: config.builtin_tools,
1026        checkpointer: config.checkpointer,
1027        event_dispatcher: config.event_dispatcher,
1028        enable_pii_sanitization: config.enable_pii_sanitization,
1029    }
1030}