agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66    enable_pii_sanitization: bool,
67}
68
69impl DeepAgent {
70    fn collect_tools(&self) -> HashMap<String, ToolBox> {
71        let mut tools: HashMap<String, ToolBox> = HashMap::new();
72        for tool in &self.base_tools {
73            tools.insert(tool.schema().name.clone(), tool.clone());
74        }
75        for middleware in &self.middlewares {
76            for tool in middleware.tools() {
77                let tool_name = tool.schema().name.clone();
78                if self.should_include(&tool_name) {
79                    tools.insert(tool_name, tool);
80                }
81            }
82        }
83        tools
84    }
85    // no streaming path in baseline
86
87    fn should_include(&self, name: &str) -> bool {
88        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
89        if !is_builtin {
90            return true;
91        }
92        match &self.builtin_tools {
93            None => true,
94            Some(selected) => selected.contains(name),
95        }
96    }
97
98    fn append_history(&self, message: AgentMessage) {
99        if let Ok(mut history) = self.history.write() {
100            history.push(message);
101        }
102    }
103
104    fn current_history(&self) -> Vec<AgentMessage> {
105        self.history.read().map(|h| h.clone()).unwrap_or_default()
106    }
107
108    fn emit_event(&self, event: agents_core::events::AgentEvent) {
109        if let Some(dispatcher) = &self.event_dispatcher {
110            let dispatcher_clone = dispatcher.clone();
111            tokio::spawn(async move {
112                dispatcher_clone.dispatch(event).await;
113            });
114        }
115    }
116
117    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
118        agents_core::events::EventMetadata::new(
119            "default".to_string(),
120            uuid::Uuid::new_v4().to_string(),
121            None,
122        )
123    }
124
125    fn truncate_message(&self, message: &AgentMessage) -> String {
126        let text = match &message.content {
127            MessageContent::Text(t) => t.clone(),
128            MessageContent::Json(v) => v.to_string(),
129        };
130
131        if self.enable_pii_sanitization {
132            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
133        } else {
134            // No sanitization - just truncate
135            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
136        }
137    }
138
139    fn get_full_message_text(&self, message: &AgentMessage) -> String {
140        match &message.content {
141            MessageContent::Text(t) => t.clone(),
142            MessageContent::Json(v) => v.to_string(),
143        }
144    }
145
146    fn summarize_payload(&self, payload: &Value) -> String {
147        if self.enable_pii_sanitization {
148            agents_core::security::sanitize_tool_payload(
149                payload,
150                agents_core::security::MAX_PREVIEW_LENGTH,
151            )
152        } else {
153            // No sanitization - just truncate JSON string
154            let json_str = payload.to_string();
155            agents_core::security::truncate_string(
156                &json_str,
157                agents_core::security::MAX_PREVIEW_LENGTH,
158            )
159        }
160    }
161
162    /// Save the current agent state to the configured checkpointer.
163    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
164        if let Some(ref checkpointer) = self.checkpointer {
165            let state = self
166                .state
167                .read()
168                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
169                .clone();
170
171            // Calculate state size before saving
172            let state_json = serde_json::to_string(&state)?;
173            let state_size = state_json.len();
174
175            // Save state to checkpointer
176            checkpointer.save_state(thread_id, &state).await?;
177
178            // Emit StateCheckpointed event after successful save
179            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
180                agents_core::events::StateCheckpointedEvent {
181                    metadata: self.create_event_metadata(),
182                    checkpoint_id: thread_id.to_string(),
183                    state_size_bytes: state_size,
184                },
185            ));
186
187            tracing::debug!(
188                thread_id = %thread_id,
189                state_size_bytes = state_size,
190                "πŸ’Ύ State checkpointed and event emitted"
191            );
192
193            Ok(())
194        } else {
195            tracing::warn!("Attempted to save state but no checkpointer is configured");
196            Ok(())
197        }
198    }
199
200    /// Load agent state from the configured checkpointer.
201    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
202        if let Some(ref checkpointer) = self.checkpointer {
203            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
204                *self
205                    .state
206                    .write()
207                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
208                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
209                Ok(true)
210            } else {
211                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
212                Ok(false)
213            }
214        } else {
215            tracing::warn!("Attempted to load state but no checkpointer is configured");
216            Ok(false)
217        }
218    }
219
220    /// Delete saved state for the specified thread.
221    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
222        if let Some(ref checkpointer) = self.checkpointer {
223            checkpointer.delete_thread(thread_id).await
224        } else {
225            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
226            Ok(())
227        }
228    }
229
230    /// List all threads with saved state.
231    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
232        if let Some(ref checkpointer) = self.checkpointer {
233            checkpointer.list_threads().await
234        } else {
235            Ok(Vec::new())
236        }
237    }
238
239    async fn execute_tool(
240        &self,
241        tool: ToolBox,
242        _tool_name: String,
243        payload: Value,
244    ) -> anyhow::Result<AgentMessage> {
245        let state_snapshot = self.state.read().unwrap().clone();
246        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
247
248        let result = tool.execute(payload, ctx).await?;
249        Ok(self.apply_tool_result(result))
250    }
251
252    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
253        match result {
254            ToolResult::Message(message) => {
255                // Tool results are not added to conversation history
256                // Only the final LLM response after tool execution is added
257                message
258            }
259            ToolResult::WithStateUpdate {
260                message,
261                state_diff,
262            } => {
263                // Check if todos were updated
264                let todos_updated = state_diff.todos.is_some();
265
266                if let Ok(mut state) = self.state.write() {
267                    let command = agents_core::command::Command::with_state(state_diff);
268                    command.apply_to(&mut state);
269
270                    // Emit TodosUpdated event if todos were modified
271                    if todos_updated {
272                        let (pending_count, in_progress_count, completed_count) =
273                            count_todos(&state.todos);
274
275                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
276                            agents_core::events::TodosUpdatedEvent {
277                                metadata: self.create_event_metadata(),
278                                todos: state.todos.clone(),
279                                pending_count,
280                                in_progress_count,
281                                completed_count,
282                                last_updated: chrono::Utc::now().to_rfc3339(),
283                            },
284                        ));
285
286                        tracing::debug!(
287                            pending = pending_count,
288                            in_progress = in_progress_count,
289                            completed = completed_count,
290                            total = state.todos.len(),
291                            "πŸ“ Todos updated and event emitted"
292                        );
293                    }
294                }
295                // Tool results are not added to conversation history
296                // Only the final LLM response after tool execution is added
297                message
298            }
299        }
300    }
301
302    /// Get the current pending interrupt, if any.
303    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
304        self.state
305            .read()
306            .ok()
307            .and_then(|guard| guard.pending_interrupts.first().cloned())
308    }
309
310    /// Add a broadcaster dynamically to the agent's event dispatcher.
311    ///
312    /// Add a single broadcaster dynamically after the agent is built.
313    ///
314    /// This is useful for per-conversation or per-customer broadcasters.
315    ///
316    /// # Example
317    /// ```no_run
318    /// use std::sync::Arc;
319    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
320    /// ```
321    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
322        if let Some(dispatcher) = &self.event_dispatcher {
323            dispatcher.add_broadcaster(broadcaster);
324            tracing::debug!("Broadcaster added to event dispatcher");
325        } else {
326            tracing::warn!("add_broadcaster called but no event dispatcher configured");
327        }
328    }
329
330    /// Add multiple broadcasters at once.
331    ///
332    /// This is useful when you need to add several broadcasters for a conversation
333    /// (e.g., WhatsApp, SSE, DynamoDB).
334    ///
335    /// # Example
336    /// ```no_run
337    /// use std::sync::Arc;
338    /// // agent.add_broadcasters(vec![
339    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
340    /// //     Arc::new(SseBroadcaster::new(channel)),
341    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
342    /// // ]);
343    /// ```
344    pub fn add_broadcasters(
345        &self,
346        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
347    ) {
348        if let Some(dispatcher) = &self.event_dispatcher {
349            for broadcaster in broadcasters {
350                dispatcher.add_broadcaster(broadcaster);
351            }
352            tracing::debug!("Multiple broadcasters added to event dispatcher");
353        } else {
354            tracing::warn!("add_broadcasters called but no event dispatcher configured");
355        }
356    }
357
358    /// Resume execution after human approval of an interrupt.
359    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
360        // Get the first pending interrupt
361        let interrupt = {
362            let state_guard = self
363                .state
364                .read()
365                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
366            state_guard
367                .pending_interrupts
368                .first()
369                .cloned()
370                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
371        };
372
373        let result_message = match action {
374            HitlAction::Accept => {
375                // Execute with original args
376                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
377                tracing::info!(
378                    tool_name = %hitl.tool_name,
379                    call_id = %hitl.call_id,
380                    "βœ… HITL: Tool approved, executing with original arguments"
381                );
382
383                let tools = self.collect_tools();
384                let tool = tools
385                    .get(&hitl.tool_name)
386                    .cloned()
387                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
388
389                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
390                    .await?
391            }
392
393            HitlAction::Edit {
394                tool_name,
395                tool_args,
396            } => {
397                // Execute with modified args
398                tracing::info!(
399                    tool_name = %tool_name,
400                    "✏️ HITL: Tool edited, executing with modified arguments"
401                );
402
403                let tools = self.collect_tools();
404                let tool = tools
405                    .get(&tool_name)
406                    .cloned()
407                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
408
409                self.execute_tool(tool, tool_name, tool_args).await?
410            }
411
412            HitlAction::Reject { reason } => {
413                // Don't execute - return rejection message
414                tracing::info!("❌ HITL: Tool rejected");
415
416                let text = reason
417                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
418
419                let message = AgentMessage {
420                    role: MessageRole::Tool,
421                    content: MessageContent::Text(text),
422                    metadata: None,
423                };
424
425                self.append_history(message.clone());
426                message
427            }
428
429            HitlAction::Respond { message } => {
430                // Don't execute - return custom message
431                tracing::info!("πŸ’¬ HITL: Custom response provided");
432
433                self.append_history(message.clone());
434                message
435            }
436        };
437
438        // Clear the interrupt from state
439        {
440            let mut state_guard = self
441                .state
442                .write()
443                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
444            state_guard.clear_interrupts();
445        }
446
447        // Persist cleared state
448        if let Some(checkpointer) = &self.checkpointer {
449            let state_clone = self
450                .state
451                .read()
452                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
453                .clone();
454            checkpointer
455                .save_state(&ThreadId::default(), &state_clone)
456                .await?;
457        }
458
459        Ok(result_message)
460    }
461
462    /// Handle message from string input - converts string to AgentMessage internally
463    pub async fn handle_message(
464        &self,
465        input: impl AsRef<str>,
466        state: Arc<AgentStateSnapshot>,
467    ) -> anyhow::Result<AgentMessage> {
468        self.handle_message_with_metadata(input, None, state).await
469    }
470
471    /// Handle message from string input with metadata - converts string to AgentMessage internally
472    pub async fn handle_message_with_metadata(
473        &self,
474        input: impl AsRef<str>,
475        metadata: Option<MessageMetadata>,
476        state: Arc<AgentStateSnapshot>,
477    ) -> anyhow::Result<AgentMessage> {
478        let agent_message = AgentMessage {
479            role: MessageRole::User,
480            content: MessageContent::Text(input.as_ref().to_string()),
481            metadata,
482        };
483        self.handle_message_internal(agent_message, state).await
484    }
485
486    /// Internal method that contains the actual message handling logic
487    async fn handle_message_internal(
488        &self,
489        input: AgentMessage,
490        _state: Arc<AgentStateSnapshot>,
491    ) -> anyhow::Result<AgentMessage> {
492        let start_time = std::time::Instant::now();
493
494        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
495            agents_core::events::AgentStartedEvent {
496                metadata: self.create_event_metadata(),
497                agent_name: self.descriptor.name.clone(),
498                message_preview: self.truncate_message(&input),
499            },
500        ));
501
502        self.append_history(input.clone());
503
504        // ReAct loop: continue until LLM responds with text (not tool calls)
505        let max_iterations = 10;
506        let mut iteration = 0;
507
508        loop {
509            iteration += 1;
510            if iteration > max_iterations {
511                tracing::warn!(
512                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
513                    max_iterations
514                );
515                let response = AgentMessage {
516                    role: MessageRole::Agent,
517                    content: MessageContent::Text(
518                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
519                    ),
520                    metadata: None,
521                };
522                self.append_history(response.clone());
523                return Ok(response);
524            }
525
526            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
527
528            // Build request with current history
529            let mut request = ModelRequest::new(&self.instructions, self.current_history());
530            let tools = self.collect_tools();
531            for middleware in &self.middlewares {
532                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
533                middleware.modify_model_request(&mut ctx).await?;
534            }
535
536            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
537            let context = PlannerContext {
538                history: request.messages.clone(),
539                system_prompt: request.system_prompt.clone(),
540                tools: tool_schemas,
541            };
542            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
543
544            // Ask LLM what to do
545            let decision = self.planner.plan(context, state_snapshot).await?;
546
547            // Emit PlanningComplete event
548            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
549                agents_core::events::PlanningCompleteEvent {
550                    metadata: self.create_event_metadata(),
551                    action_type: match &decision.next_action {
552                        PlannerAction::Respond { .. } => "respond".to_string(),
553                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
554                        PlannerAction::Terminate => "terminate".to_string(),
555                    },
556                    action_summary: match &decision.next_action {
557                        PlannerAction::Respond { message } => {
558                            format!("Respond: {}", self.truncate_message(message))
559                        }
560                        PlannerAction::CallTool { tool_name, .. } => {
561                            format!("Call tool: {}", tool_name)
562                        }
563                        PlannerAction::Terminate => "Terminate".to_string(),
564                    },
565                },
566            ));
567
568            match decision.next_action {
569                PlannerAction::Respond { message } => {
570                    // LLM decided to respond with text - exit loop
571                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
572                        agents_core::events::AgentCompletedEvent {
573                            metadata: self.create_event_metadata(),
574                            agent_name: self.descriptor.name.clone(),
575                            duration_ms: start_time.elapsed().as_millis() as u64,
576                            response_preview: self.truncate_message(&message),
577                            response: self.get_full_message_text(&message),
578                        },
579                    ));
580
581                    self.append_history(message.clone());
582                    return Ok(message);
583                }
584                PlannerAction::CallTool { tool_name, payload } => {
585                    // Add AI's decision to call tool to history
586                    // This is needed for OpenAI's API which expects:
587                    // 1. Assistant message with tool call
588                    // 2. Tool message with result
589                    let tool_call_message = AgentMessage {
590                        role: MessageRole::Agent,
591                        content: MessageContent::Text(format!(
592                            "Calling tool: {} with args: {}",
593                            tool_name,
594                            serde_json::to_string(&payload).unwrap_or_default()
595                        )),
596                        metadata: None,
597                    };
598                    self.append_history(tool_call_message);
599
600                    if let Some(tool) = tools.get(&tool_name).cloned() {
601                        // Check all middleware for interrupts before executing tool
602                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
603                        for middleware in &self.middlewares {
604                            if let Some(interrupt) = middleware
605                                .before_tool_execution(&tool_name, &payload, &call_id)
606                                .await?
607                            {
608                                // Save interrupt to state
609                                {
610                                    let mut state_guard = self.state.write().map_err(|_| {
611                                        anyhow::anyhow!("Failed to acquire write lock on state")
612                                    })?;
613                                    state_guard.add_interrupt(interrupt.clone());
614                                }
615
616                                // Persist state with checkpointer
617                                if let Some(checkpointer) = &self.checkpointer {
618                                    let state_clone = self
619                                        .state
620                                        .read()
621                                        .map_err(|_| {
622                                            anyhow::anyhow!("Failed to acquire read lock on state")
623                                        })?
624                                        .clone();
625                                    checkpointer
626                                        .save_state(&ThreadId::default(), &state_clone)
627                                        .await?;
628                                }
629
630                                // Return interrupt message - execution pauses here
631                                let interrupt_message = AgentMessage {
632                                    role: MessageRole::System,
633                                    content: MessageContent::Text(format!(
634                                        "⏸️ Execution paused: Tool '{}' requires human approval",
635                                        tool_name
636                                    )),
637                                    metadata: None,
638                                };
639                                self.append_history(interrupt_message.clone());
640                                return Ok(interrupt_message);
641                            }
642                        }
643
644                        // No interrupt - execute tool
645                        let tool_start_time = std::time::Instant::now();
646
647                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
648                            agents_core::events::ToolStartedEvent {
649                                metadata: self.create_event_metadata(),
650                                tool_name: tool_name.clone(),
651                                input_summary: self.summarize_payload(&payload),
652                            },
653                        ));
654
655                        tracing::warn!(
656                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
657                            tool_name,
658                            serde_json::to_string(&payload)
659                                .unwrap_or_else(|_| "invalid json".to_string())
660                        );
661
662                        let result = self
663                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
664                            .await;
665
666                        let duration = tool_start_time.elapsed();
667                        match result {
668                            Ok(tool_result_message) => {
669                                let content_preview = match &tool_result_message.content {
670                                    MessageContent::Text(t) => {
671                                        if t.len() > 100 {
672                                            format!("{}... ({} chars)", &t[..100], t.len())
673                                        } else {
674                                            t.clone()
675                                        }
676                                    }
677                                    MessageContent::Json(v) => {
678                                        format!("JSON: {} bytes", v.to_string().len())
679                                    }
680                                };
681
682                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
683                                    agents_core::events::ToolCompletedEvent {
684                                        metadata: self.create_event_metadata(),
685                                        tool_name: tool_name.clone(),
686                                        duration_ms: duration.as_millis() as u64,
687                                        result_summary: content_preview.clone(),
688                                        success: true,
689                                    },
690                                ));
691
692                                tracing::warn!(
693                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
694                                    tool_name,
695                                    duration,
696                                    content_preview
697                                );
698
699                                // Add tool result to history and continue ReAct loop
700                                self.append_history(tool_result_message);
701                                // Loop continues - LLM will see tool result and decide next action
702                            }
703                            Err(e) => {
704                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
705                                    agents_core::events::ToolFailedEvent {
706                                        metadata: self.create_event_metadata(),
707                                        tool_name: tool_name.clone(),
708                                        duration_ms: duration.as_millis() as u64,
709                                        error_message: e.to_string(),
710                                        is_recoverable: true,
711                                        retry_count: 0,
712                                    },
713                                ));
714
715                                tracing::error!(
716                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
717                                    tool_name,
718                                    duration,
719                                    e
720                                );
721
722                                // Add error to history and continue - let LLM handle the error
723                                let error_message = AgentMessage {
724                                    role: MessageRole::Tool,
725                                    content: MessageContent::Text(format!(
726                                        "Error executing {}: {}",
727                                        tool_name, e
728                                    )),
729                                    metadata: None,
730                                };
731                                self.append_history(error_message);
732                                // Loop continues - LLM will see error and decide how to handle it
733                            }
734                        }
735                    } else {
736                        // Tool not found - add error to history and continue
737                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
738                        let error_message = AgentMessage {
739                            role: MessageRole::Tool,
740                            content: MessageContent::Text(format!(
741                                "Tool '{}' not found. Available tools: {}",
742                                tool_name,
743                                tools
744                                    .keys()
745                                    .map(|k| k.as_str())
746                                    .collect::<Vec<_>>()
747                                    .join(", ")
748                            )),
749                            metadata: None,
750                        };
751                        self.append_history(error_message);
752                        // Loop continues - LLM will see error and try something else
753                    }
754                }
755                PlannerAction::Terminate => {
756                    // LLM decided to terminate - exit loop
757                    tracing::debug!("πŸ›‘ Agent terminated");
758                    let message = AgentMessage {
759                        role: MessageRole::Agent,
760                        content: MessageContent::Text("Task completed.".into()),
761                        metadata: None,
762                    };
763                    self.append_history(message.clone());
764                    return Ok(message);
765                }
766            }
767        }
768    }
769}
770
771#[async_trait]
772impl AgentHandle for DeepAgent {
773    async fn describe(&self) -> AgentDescriptor {
774        self.descriptor.clone()
775    }
776
777    async fn handle_message(
778        &self,
779        input: AgentMessage,
780        _state: Arc<AgentStateSnapshot>,
781    ) -> anyhow::Result<AgentMessage> {
782        self.handle_message_internal(input, _state).await
783    }
784
785    async fn handle_message_stream(
786        &self,
787        input: AgentMessage,
788        _state: Arc<AgentStateSnapshot>,
789    ) -> anyhow::Result<agents_core::agent::AgentStream> {
790        use crate::planner::LlmBackedPlanner;
791        use agents_core::llm::{LlmRequest, StreamChunk};
792        use futures::StreamExt;
793
794        // Add input to history
795        self.append_history(input.clone());
796
797        // Build the request similar to handle_message_internal
798        let mut request = ModelRequest::new(&self.instructions, self.current_history());
799        let tools = self.collect_tools();
800
801        // Apply middleware modifications
802        for middleware in &self.middlewares {
803            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
804            middleware.modify_model_request(&mut ctx).await?;
805        }
806
807        // Convert ModelRequest to LlmRequest and add tools
808        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
809        let llm_request = LlmRequest {
810            system_prompt: request.system_prompt.clone(),
811            messages: request.messages.clone(),
812            tools: tool_schemas,
813        };
814
815        // Try to get the underlying LLM model for streaming
816        let planner_any = self.planner.as_any();
817
818        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
819            // We have an LlmBackedPlanner, use its model for streaming
820            let model = llm_planner.model().clone();
821            let stream = model.generate_stream(llm_request).await?;
822
823            // Wrap stream to emit events to broadcasters
824            let agent_name = self.descriptor.name.clone();
825            let event_dispatcher = self.event_dispatcher.clone();
826
827            let wrapped_stream = stream.then(move |chunk_result| {
828                let dispatcher = event_dispatcher.clone();
829                let name = agent_name.clone();
830
831                async move {
832                    match &chunk_result {
833                        Ok(StreamChunk::TextDelta(token)) => {
834                            // Emit streaming token event
835                            if let Some(ref dispatcher) = dispatcher {
836                                let event = agents_core::events::AgentEvent::StreamingToken(
837                                    agents_core::events::StreamingTokenEvent {
838                                        metadata: agents_core::events::EventMetadata::new(
839                                            "default".to_string(),
840                                            uuid::Uuid::new_v4().to_string(),
841                                            None,
842                                        ),
843                                        agent_name: name.clone(),
844                                        token: token.clone(),
845                                    },
846                                );
847                                dispatcher.dispatch(event).await;
848                            }
849                        }
850                        Ok(StreamChunk::Done { message }) => {
851                            // Emit agent completed event
852                            if let Some(ref dispatcher) = dispatcher {
853                                let full_text = match &message.content {
854                                    agents_core::messaging::MessageContent::Text(t) => t.clone(),
855                                    agents_core::messaging::MessageContent::Json(v) => {
856                                        v.to_string()
857                                    }
858                                };
859
860                                let preview = if full_text.len() > 100 {
861                                    format!("{}...", &full_text[..100])
862                                } else {
863                                    full_text.clone()
864                                };
865
866                                let event = agents_core::events::AgentEvent::AgentCompleted(
867                                    agents_core::events::AgentCompletedEvent {
868                                        metadata: agents_core::events::EventMetadata::new(
869                                            "default".to_string(),
870                                            uuid::Uuid::new_v4().to_string(),
871                                            None,
872                                        ),
873                                        agent_name: name.clone(),
874                                        duration_ms: 0, // Duration not tracked in streaming mode
875                                        response_preview: preview,
876                                        response: full_text,
877                                    },
878                                );
879                                dispatcher.dispatch(event).await;
880                            }
881                        }
882                        _ => {}
883                    }
884                    chunk_result
885                }
886            });
887
888            Ok(Box::pin(wrapped_stream))
889        } else {
890            // Fallback to non-streaming
891            let response = self.handle_message_internal(input, _state).await?;
892            Ok(Box::pin(futures::stream::once(async move {
893                Ok(StreamChunk::Done { message: response })
894            })))
895        }
896    }
897
898    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
899        let state_guard = self
900            .state
901            .read()
902            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
903        Ok(state_guard.pending_interrupts.first().cloned())
904    }
905
906    async fn resume_with_approval(
907        &self,
908        action: agents_core::hitl::HitlAction,
909    ) -> anyhow::Result<AgentMessage> {
910        self.resume_with_approval(action).await
911    }
912}
913
914/// Create a deep agent from configuration - matches Python middleware assembly exactly
915///
916/// This function assembles the middleware stack in the same order as the Python SDK:
917/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
918pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
919    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
920    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
921
922    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
923    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
924
925    // Build sub-agents from configurations
926    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
927
928    // Build custom sub-agents from configs
929    tracing::info!(
930        "πŸ“‹ Processing {} sub-agent configurations",
931        config.subagent_configs.len()
932    );
933
934    for subagent_config in &config.subagent_configs {
935        tracing::info!(
936            "πŸ—οΈ Building sub-agent: {} ({})",
937            subagent_config.name,
938            subagent_config.description
939        );
940
941        // Determine the planner for this sub-agent
942        let sub_planner = if let Some(ref model) = subagent_config.model {
943            // Sub-agent has its own model - wrap it in a planner
944            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
945        } else {
946            // Inherit parent's planner
947            config.planner.clone()
948        };
949
950        // Create a DeepAgentConfig for this sub-agent
951        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
952
953        // Configure tools
954        if let Some(ref tools) = subagent_config.tools {
955            tracing::debug!(
956                "  - Configuring {} tools for {}",
957                tools.len(),
958                subagent_config.name
959            );
960            for tool in tools {
961                sub_cfg = sub_cfg.with_tool(tool.clone());
962            }
963        }
964
965        // Configure built-in tools
966        if let Some(ref builtin) = subagent_config.builtin_tools {
967            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
968        }
969
970        // Sub-agents should not have their own sub-agents
971        sub_cfg = sub_cfg.with_auto_general_purpose(false);
972
973        // Configure prompt caching
974        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
975
976        // Inherit PII sanitization setting from parent
977        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
978
979        // Build the sub-agent recursively
980        let sub_agent = create_deep_agent_from_config(sub_cfg);
981
982        // Register the sub-agent
983        registrations.push(SubAgentRegistration {
984            descriptor: SubAgentDescriptor {
985                name: subagent_config.name.clone(),
986                description: subagent_config.description.clone(),
987            },
988            agent: Arc::new(sub_agent),
989        });
990
991        tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
992    }
993
994    tracing::info!("=> Total sub-agents registered: {}", registrations.len());
995
996    // Optionally inject a general-purpose subagent
997    if config.auto_general_purpose {
998        let has_gp = registrations
999            .iter()
1000            .any(|r| r.descriptor.name == "general-purpose");
1001        if !has_gp {
1002            // Create a subagent with inherited planner/tools and same instructions
1003            let mut sub_cfg =
1004                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1005                    .with_auto_general_purpose(false)
1006                    .with_prompt_caching(config.enable_prompt_caching)
1007                    .with_pii_sanitization(config.enable_pii_sanitization);
1008            if let Some(ref selected) = config.builtin_tools {
1009                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1010            }
1011            if let Some(ref sum) = config.summarization {
1012                sub_cfg = sub_cfg.with_summarization(sum.clone());
1013            }
1014            for t in &config.tools {
1015                sub_cfg = sub_cfg.with_tool(t.clone());
1016            }
1017
1018            let gp = create_deep_agent_from_config(sub_cfg);
1019            registrations.push(SubAgentRegistration {
1020                descriptor: SubAgentDescriptor {
1021                    name: "general-purpose".into(),
1022                    description: "Default reasoning agent".into(),
1023                },
1024                agent: Arc::new(gp),
1025            });
1026        }
1027    }
1028
1029    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1030        registrations,
1031        config.event_dispatcher.clone(),
1032    ));
1033    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1034    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
1035    let summarization = config.summarization.as_ref().map(|cfg| {
1036        Arc::new(SummarizationMiddleware::new(
1037            cfg.messages_to_keep,
1038            cfg.summary_note.clone(),
1039        ))
1040    });
1041    let hitl = if config.tool_interrupts.is_empty() {
1042        None
1043    } else {
1044        // Validate that checkpointer is configured when HITL is enabled
1045        if config.checkpointer.is_none() {
1046            tracing::error!(
1047                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
1048                 HITL will be disabled. Please configure a checkpointer to enable HITL."
1049            );
1050            None
1051        } else {
1052            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
1053            Some(Arc::new(HumanInLoopMiddleware::new(
1054                config.tool_interrupts.clone(),
1055            )))
1056        }
1057    };
1058
1059    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
1060    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
1061    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1062        base_prompt,
1063        deep_agent_prompt,
1064        planning,
1065        filesystem,
1066        subagent,
1067    ];
1068    if let Some(ref summary) = summarization {
1069        middlewares.push(summary.clone());
1070    }
1071    if config.enable_prompt_caching {
1072        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1073    }
1074    if let Some(ref hitl_mw) = hitl {
1075        middlewares.push(hitl_mw.clone());
1076    }
1077
1078    DeepAgent {
1079        descriptor: AgentDescriptor {
1080            name: "deep-agent".into(),
1081            version: "0.0.1".into(),
1082            description: Some("Rust deep agent".into()),
1083        },
1084        instructions: config.instructions,
1085        planner: config.planner,
1086        middlewares,
1087        base_tools: config.tools,
1088        state,
1089        history,
1090        _summarization: summarization,
1091        _hitl: hitl,
1092        builtin_tools: config.builtin_tools,
1093        checkpointer: config.checkpointer,
1094        event_dispatcher: config.event_dispatcher,
1095        enable_pii_sanitization: config.enable_pii_sanitization,
1096    }
1097}