agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::num::NonZeroUsize;
26use std::sync::{Arc, RwLock};
27
28// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
29const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31// (no streaming types in baseline)
32
33/// Helper function to count todos by status
34fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
35    let mut pending = 0;
36    let mut in_progress = 0;
37    let mut completed = 0;
38
39    for todo in todos {
40        match todo.status {
41            agents_core::state::TodoStatus::Pending => pending += 1,
42            agents_core::state::TodoStatus::InProgress => in_progress += 1,
43            agents_core::state::TodoStatus::Completed => completed += 1,
44        }
45    }
46
47    (pending, in_progress, completed)
48}
49
50/// Core Deep Agent runtime implementation
51///
52/// This struct contains all the runtime state and behavior for a Deep Agent,
53/// including middleware management, tool execution, HITL support, and state persistence.
54pub struct DeepAgent {
55    descriptor: AgentDescriptor,
56    instructions: String,
57    planner: Arc<dyn PlannerHandle>,
58    middlewares: Vec<Arc<dyn AgentMiddleware>>,
59    base_tools: Vec<ToolBox>,
60    state: Arc<RwLock<AgentStateSnapshot>>,
61    history: Arc<RwLock<Vec<AgentMessage>>>,
62    _summarization: Option<Arc<SummarizationMiddleware>>,
63    _hitl: Option<Arc<HumanInLoopMiddleware>>,
64    builtin_tools: Option<HashSet<String>>,
65    checkpointer: Option<Arc<dyn Checkpointer>>,
66    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
67    enable_pii_sanitization: bool,
68    max_iterations: NonZeroUsize,
69}
70
71impl DeepAgent {
72    fn collect_tools(&self) -> HashMap<String, ToolBox> {
73        let mut tools: HashMap<String, ToolBox> = HashMap::new();
74        for tool in &self.base_tools {
75            tools.insert(tool.schema().name.clone(), tool.clone());
76        }
77        for middleware in &self.middlewares {
78            for tool in middleware.tools() {
79                let tool_name = tool.schema().name.clone();
80                if self.should_include(&tool_name) {
81                    tools.insert(tool_name, tool);
82                }
83            }
84        }
85        tools
86    }
87    // no streaming path in baseline
88
89    fn should_include(&self, name: &str) -> bool {
90        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
91        if !is_builtin {
92            return true;
93        }
94        match &self.builtin_tools {
95            None => true,
96            Some(selected) => selected.contains(name),
97        }
98    }
99
100    fn append_history(&self, message: AgentMessage) {
101        if let Ok(mut history) = self.history.write() {
102            history.push(message);
103        }
104    }
105
106    fn current_history(&self) -> Vec<AgentMessage> {
107        self.history.read().map(|h| h.clone()).unwrap_or_default()
108    }
109
110    fn emit_event(&self, event: agents_core::events::AgentEvent) {
111        if let Some(dispatcher) = &self.event_dispatcher {
112            let dispatcher_clone = dispatcher.clone();
113            tokio::spawn(async move {
114                dispatcher_clone.dispatch(event).await;
115            });
116        }
117    }
118
119    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
120        agents_core::events::EventMetadata::new(
121            "default".to_string(),
122            uuid::Uuid::new_v4().to_string(),
123            None,
124        )
125    }
126
127    fn truncate_message(&self, message: &AgentMessage) -> String {
128        let text = match &message.content {
129            MessageContent::Text(t) => t.clone(),
130            MessageContent::Json(v) => v.to_string(),
131        };
132
133        if self.enable_pii_sanitization {
134            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
135        } else {
136            // No sanitization - just truncate
137            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
138        }
139    }
140
141    fn get_full_message_text(&self, message: &AgentMessage) -> String {
142        match &message.content {
143            MessageContent::Text(t) => t.clone(),
144            MessageContent::Json(v) => v.to_string(),
145        }
146    }
147
148    fn summarize_payload(&self, payload: &Value) -> String {
149        if self.enable_pii_sanitization {
150            agents_core::security::sanitize_tool_payload(
151                payload,
152                agents_core::security::MAX_PREVIEW_LENGTH,
153            )
154        } else {
155            // No sanitization - just truncate JSON string
156            let json_str = payload.to_string();
157            agents_core::security::truncate_string(
158                &json_str,
159                agents_core::security::MAX_PREVIEW_LENGTH,
160            )
161        }
162    }
163
164    /// Save the current agent state to the configured checkpointer.
165    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
166        if let Some(ref checkpointer) = self.checkpointer {
167            let state = self
168                .state
169                .read()
170                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
171                .clone();
172
173            // Calculate state size before saving
174            let state_json = serde_json::to_string(&state)?;
175            let state_size = state_json.len();
176
177            // Save state to checkpointer
178            checkpointer.save_state(thread_id, &state).await?;
179
180            // Emit StateCheckpointed event after successful save
181            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
182                agents_core::events::StateCheckpointedEvent {
183                    metadata: self.create_event_metadata(),
184                    checkpoint_id: thread_id.to_string(),
185                    state_size_bytes: state_size,
186                },
187            ));
188
189            tracing::debug!(
190                thread_id = %thread_id,
191                state_size_bytes = state_size,
192                "πŸ’Ύ State checkpointed and event emitted"
193            );
194
195            Ok(())
196        } else {
197            tracing::warn!("Attempted to save state but no checkpointer is configured");
198            Ok(())
199        }
200    }
201
202    /// Load agent state from the configured checkpointer.
203    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
204        if let Some(ref checkpointer) = self.checkpointer {
205            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
206                *self
207                    .state
208                    .write()
209                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
210                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
211                Ok(true)
212            } else {
213                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
214                Ok(false)
215            }
216        } else {
217            tracing::warn!("Attempted to load state but no checkpointer is configured");
218            Ok(false)
219        }
220    }
221
222    /// Delete saved state for the specified thread.
223    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
224        if let Some(ref checkpointer) = self.checkpointer {
225            checkpointer.delete_thread(thread_id).await
226        } else {
227            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
228            Ok(())
229        }
230    }
231
232    /// List all threads with saved state.
233    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
234        if let Some(ref checkpointer) = self.checkpointer {
235            checkpointer.list_threads().await
236        } else {
237            Ok(Vec::new())
238        }
239    }
240
241    async fn execute_tool(
242        &self,
243        tool: ToolBox,
244        _tool_name: String,
245        payload: Value,
246    ) -> anyhow::Result<AgentMessage> {
247        let state_snapshot = self.state.read().unwrap().clone();
248        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
249
250        let result = tool.execute(payload, ctx).await?;
251        Ok(self.apply_tool_result(result))
252    }
253
254    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
255        match result {
256            ToolResult::Message(message) => {
257                // Tool results are not added to conversation history
258                // Only the final LLM response after tool execution is added
259                message
260            }
261            ToolResult::WithStateUpdate {
262                message,
263                state_diff,
264            } => {
265                // Check if todos were updated
266                let todos_updated = state_diff.todos.is_some();
267
268                if let Ok(mut state) = self.state.write() {
269                    let command = agents_core::command::Command::with_state(state_diff);
270                    command.apply_to(&mut state);
271
272                    // Emit TodosUpdated event if todos were modified
273                    if todos_updated {
274                        let (pending_count, in_progress_count, completed_count) =
275                            count_todos(&state.todos);
276
277                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
278                            agents_core::events::TodosUpdatedEvent {
279                                metadata: self.create_event_metadata(),
280                                todos: state.todos.clone(),
281                                pending_count,
282                                in_progress_count,
283                                completed_count,
284                                last_updated: chrono::Utc::now().to_rfc3339(),
285                            },
286                        ));
287
288                        tracing::debug!(
289                            pending = pending_count,
290                            in_progress = in_progress_count,
291                            completed = completed_count,
292                            total = state.todos.len(),
293                            "πŸ“ Todos updated and event emitted"
294                        );
295                    }
296                }
297                // Tool results are not added to conversation history
298                // Only the final LLM response after tool execution is added
299                message
300            }
301        }
302    }
303
304    /// Get the current pending interrupt, if any.
305    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
306        self.state
307            .read()
308            .ok()
309            .and_then(|guard| guard.pending_interrupts.first().cloned())
310    }
311
312    /// Add a broadcaster dynamically to the agent's event dispatcher.
313    ///
314    /// Add a single broadcaster dynamically after the agent is built.
315    ///
316    /// This is useful for per-conversation or per-customer broadcasters.
317    ///
318    /// # Example
319    /// ```no_run
320    /// use std::sync::Arc;
321    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
322    /// ```
323    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
324        if let Some(dispatcher) = &self.event_dispatcher {
325            dispatcher.add_broadcaster(broadcaster);
326            tracing::debug!("Broadcaster added to event dispatcher");
327        } else {
328            tracing::warn!("add_broadcaster called but no event dispatcher configured");
329        }
330    }
331
332    /// Add multiple broadcasters at once.
333    ///
334    /// This is useful when you need to add several broadcasters for a conversation
335    /// (e.g., WhatsApp, SSE, DynamoDB).
336    ///
337    /// # Example
338    /// ```no_run
339    /// use std::sync::Arc;
340    /// // agent.add_broadcasters(vec![
341    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
342    /// //     Arc::new(SseBroadcaster::new(channel)),
343    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
344    /// // ]);
345    /// ```
346    pub fn add_broadcasters(
347        &self,
348        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
349    ) {
350        if let Some(dispatcher) = &self.event_dispatcher {
351            for broadcaster in broadcasters {
352                dispatcher.add_broadcaster(broadcaster);
353            }
354            tracing::debug!("Multiple broadcasters added to event dispatcher");
355        } else {
356            tracing::warn!("add_broadcasters called but no event dispatcher configured");
357        }
358    }
359
360    /// Resume execution after human approval of an interrupt.
361    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
362        // Get the first pending interrupt
363        let interrupt = {
364            let state_guard = self
365                .state
366                .read()
367                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
368            state_guard
369                .pending_interrupts
370                .first()
371                .cloned()
372                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
373        };
374
375        let result_message = match action {
376            HitlAction::Accept => {
377                // Execute with original args
378                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
379                tracing::info!(
380                    tool_name = %hitl.tool_name,
381                    call_id = %hitl.call_id,
382                    "βœ… HITL: Tool approved, executing with original arguments"
383                );
384
385                let tools = self.collect_tools();
386                let tool = tools
387                    .get(&hitl.tool_name)
388                    .cloned()
389                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
390
391                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
392                    .await?
393            }
394
395            HitlAction::Edit {
396                tool_name,
397                tool_args,
398            } => {
399                // Execute with modified args
400                tracing::info!(
401                    tool_name = %tool_name,
402                    "✏️ HITL: Tool edited, executing with modified arguments"
403                );
404
405                let tools = self.collect_tools();
406                let tool = tools
407                    .get(&tool_name)
408                    .cloned()
409                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
410
411                self.execute_tool(tool, tool_name, tool_args).await?
412            }
413
414            HitlAction::Reject { reason } => {
415                // Don't execute - return rejection message
416                tracing::info!("❌ HITL: Tool rejected");
417
418                let text = reason
419                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
420
421                let message = AgentMessage {
422                    role: MessageRole::Tool,
423                    content: MessageContent::Text(text),
424                    metadata: None,
425                };
426
427                self.append_history(message.clone());
428                message
429            }
430
431            HitlAction::Respond { message } => {
432                // Don't execute - return custom message
433                tracing::info!("πŸ’¬ HITL: Custom response provided");
434
435                self.append_history(message.clone());
436                message
437            }
438        };
439
440        // Clear the interrupt from state
441        {
442            let mut state_guard = self
443                .state
444                .write()
445                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
446            state_guard.clear_interrupts();
447        }
448
449        // Persist cleared state
450        if let Some(checkpointer) = &self.checkpointer {
451            let state_clone = self
452                .state
453                .read()
454                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
455                .clone();
456            checkpointer
457                .save_state(&ThreadId::default(), &state_clone)
458                .await?;
459        }
460
461        Ok(result_message)
462    }
463
464    /// Handle message from string input - converts string to AgentMessage internally
465    pub async fn handle_message(
466        &self,
467        input: impl AsRef<str>,
468        state: Arc<AgentStateSnapshot>,
469    ) -> anyhow::Result<AgentMessage> {
470        self.handle_message_with_metadata(input, None, state).await
471    }
472
473    /// Handle message from string input with metadata - converts string to AgentMessage internally
474    pub async fn handle_message_with_metadata(
475        &self,
476        input: impl AsRef<str>,
477        metadata: Option<MessageMetadata>,
478        state: Arc<AgentStateSnapshot>,
479    ) -> anyhow::Result<AgentMessage> {
480        let agent_message = AgentMessage {
481            role: MessageRole::User,
482            content: MessageContent::Text(input.as_ref().to_string()),
483            metadata,
484        };
485        self.handle_message_internal(agent_message, state).await
486    }
487
488    /// Internal method that contains the actual message handling logic
489    async fn handle_message_internal(
490        &self,
491        input: AgentMessage,
492        loaded_state: Arc<AgentStateSnapshot>,
493    ) -> anyhow::Result<AgentMessage> {
494        let start_time = std::time::Instant::now();
495
496        // Initialize internal state with loaded state from checkpointer
497        // This ensures conversation context is maintained across sessions
498        if let Ok(mut state_guard) = self.state.write() {
499            *state_guard = (*loaded_state).clone();
500        }
501
502        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
503            agents_core::events::AgentStartedEvent {
504                metadata: self.create_event_metadata(),
505                agent_name: self.descriptor.name.clone(),
506                message_preview: self.truncate_message(&input),
507            },
508        ));
509
510        self.append_history(input.clone());
511
512        // ReAct loop: continue until LLM responds with text (not tool calls)
513        let max_iterations = self.max_iterations.get();
514        let mut iteration = 0;
515
516        loop {
517            iteration += 1;
518            if iteration > max_iterations {
519                tracing::warn!(
520                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
521                    max_iterations
522                );
523                let response = AgentMessage {
524                    role: MessageRole::Agent,
525                    content: MessageContent::Text(
526                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
527                    ),
528                    metadata: None,
529                };
530                self.append_history(response.clone());
531                return Ok(response);
532            }
533
534            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
535
536            // Build request with current history
537            let mut request = ModelRequest::new(&self.instructions, self.current_history());
538            let tools = self.collect_tools();
539            for middleware in &self.middlewares {
540                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
541                middleware.modify_model_request(&mut ctx).await?;
542            }
543
544            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
545            let context = PlannerContext {
546                history: request.messages.clone(),
547                system_prompt: request.system_prompt.clone(),
548                tools: tool_schemas,
549            };
550            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
551
552            // Ask LLM what to do
553            let decision = self.planner.plan(context, state_snapshot).await?;
554
555            // Emit PlanningComplete event
556            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
557                agents_core::events::PlanningCompleteEvent {
558                    metadata: self.create_event_metadata(),
559                    action_type: match &decision.next_action {
560                        PlannerAction::Respond { .. } => "respond".to_string(),
561                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
562                        PlannerAction::Terminate => "terminate".to_string(),
563                    },
564                    action_summary: match &decision.next_action {
565                        PlannerAction::Respond { message } => {
566                            format!("Respond: {}", self.truncate_message(message))
567                        }
568                        PlannerAction::CallTool { tool_name, .. } => {
569                            format!("Call tool: {}", tool_name)
570                        }
571                        PlannerAction::Terminate => "Terminate".to_string(),
572                    },
573                },
574            ));
575
576            match decision.next_action {
577                PlannerAction::Respond { message } => {
578                    // LLM decided to respond with text - exit loop
579                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
580                        agents_core::events::AgentCompletedEvent {
581                            metadata: self.create_event_metadata(),
582                            agent_name: self.descriptor.name.clone(),
583                            duration_ms: start_time.elapsed().as_millis() as u64,
584                            response_preview: self.truncate_message(&message),
585                            response: self.get_full_message_text(&message),
586                        },
587                    ));
588
589                    self.append_history(message.clone());
590                    return Ok(message);
591                }
592                PlannerAction::CallTool { tool_name, payload } => {
593                    // Add AI's decision to call tool to history
594                    // This is needed for OpenAI's API which expects:
595                    // 1. Assistant message with tool call
596                    // 2. Tool message with result
597                    let tool_call_message = AgentMessage {
598                        role: MessageRole::Agent,
599                        content: MessageContent::Text(format!(
600                            "Calling tool: {} with args: {}",
601                            tool_name,
602                            serde_json::to_string(&payload).unwrap_or_default()
603                        )),
604                        metadata: None,
605                    };
606                    self.append_history(tool_call_message);
607
608                    if let Some(tool) = tools.get(&tool_name).cloned() {
609                        // Check all middleware for interrupts before executing tool
610                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
611                        for middleware in &self.middlewares {
612                            if let Some(interrupt) = middleware
613                                .before_tool_execution(&tool_name, &payload, &call_id)
614                                .await?
615                            {
616                                // Save interrupt to state
617                                {
618                                    let mut state_guard = self.state.write().map_err(|_| {
619                                        anyhow::anyhow!("Failed to acquire write lock on state")
620                                    })?;
621                                    state_guard.add_interrupt(interrupt.clone());
622                                }
623
624                                // Persist state with checkpointer
625                                if let Some(checkpointer) = &self.checkpointer {
626                                    let state_clone = self
627                                        .state
628                                        .read()
629                                        .map_err(|_| {
630                                            anyhow::anyhow!("Failed to acquire read lock on state")
631                                        })?
632                                        .clone();
633                                    checkpointer
634                                        .save_state(&ThreadId::default(), &state_clone)
635                                        .await?;
636                                }
637
638                                // Return interrupt message - execution pauses here
639                                let interrupt_message = AgentMessage {
640                                    role: MessageRole::System,
641                                    content: MessageContent::Text(format!(
642                                        "⏸️ Execution paused: Tool '{}' requires human approval",
643                                        tool_name
644                                    )),
645                                    metadata: None,
646                                };
647                                self.append_history(interrupt_message.clone());
648                                return Ok(interrupt_message);
649                            }
650                        }
651
652                        // No interrupt - execute tool
653                        let tool_start_time = std::time::Instant::now();
654
655                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
656                            agents_core::events::ToolStartedEvent {
657                                metadata: self.create_event_metadata(),
658                                tool_name: tool_name.clone(),
659                                input_summary: self.summarize_payload(&payload),
660                            },
661                        ));
662
663                        tracing::warn!(
664                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
665                            tool_name,
666                            serde_json::to_string(&payload)
667                                .unwrap_or_else(|_| "invalid json".to_string())
668                        );
669
670                        let result = self
671                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
672                            .await;
673
674                        let duration = tool_start_time.elapsed();
675                        match result {
676                            Ok(tool_result_message) => {
677                                let content_preview = match &tool_result_message.content {
678                                    MessageContent::Text(t) => {
679                                        if t.len() > 100 {
680                                            format!("{}... ({} chars)", &t[..100], t.len())
681                                        } else {
682                                            t.clone()
683                                        }
684                                    }
685                                    MessageContent::Json(v) => {
686                                        format!("JSON: {} bytes", v.to_string().len())
687                                    }
688                                };
689
690                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
691                                    agents_core::events::ToolCompletedEvent {
692                                        metadata: self.create_event_metadata(),
693                                        tool_name: tool_name.clone(),
694                                        duration_ms: duration.as_millis() as u64,
695                                        result_summary: content_preview.clone(),
696                                        success: true,
697                                    },
698                                ));
699
700                                tracing::warn!(
701                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
702                                    tool_name,
703                                    duration,
704                                    content_preview
705                                );
706
707                                // Add tool result to history and continue ReAct loop
708                                self.append_history(tool_result_message);
709                                // Loop continues - LLM will see tool result and decide next action
710                            }
711                            Err(e) => {
712                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
713                                    agents_core::events::ToolFailedEvent {
714                                        metadata: self.create_event_metadata(),
715                                        tool_name: tool_name.clone(),
716                                        duration_ms: duration.as_millis() as u64,
717                                        error_message: e.to_string(),
718                                        is_recoverable: true,
719                                        retry_count: 0,
720                                    },
721                                ));
722
723                                tracing::error!(
724                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
725                                    tool_name,
726                                    duration,
727                                    e
728                                );
729
730                                // Add error to history and continue - let LLM handle the error
731                                let error_message = AgentMessage {
732                                    role: MessageRole::Tool,
733                                    content: MessageContent::Text(format!(
734                                        "Error executing {}: {}",
735                                        tool_name, e
736                                    )),
737                                    metadata: None,
738                                };
739                                self.append_history(error_message);
740                                // Loop continues - LLM will see error and decide how to handle it
741                            }
742                        }
743                    } else {
744                        // Tool not found - add error to history and continue
745                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
746                        let error_message = AgentMessage {
747                            role: MessageRole::Tool,
748                            content: MessageContent::Text(format!(
749                                "Tool '{}' not found. Available tools: {}",
750                                tool_name,
751                                tools
752                                    .keys()
753                                    .map(|k| k.as_str())
754                                    .collect::<Vec<_>>()
755                                    .join(", ")
756                            )),
757                            metadata: None,
758                        };
759                        self.append_history(error_message);
760                        // Loop continues - LLM will see error and try something else
761                    }
762                }
763                PlannerAction::Terminate => {
764                    // LLM decided to terminate - exit loop
765                    tracing::debug!("πŸ›‘ Agent terminated");
766                    let message = AgentMessage {
767                        role: MessageRole::Agent,
768                        content: MessageContent::Text("Task completed.".into()),
769                        metadata: None,
770                    };
771                    self.append_history(message.clone());
772                    return Ok(message);
773                }
774            }
775        }
776    }
777}
778
779#[async_trait]
780impl AgentHandle for DeepAgent {
781    async fn describe(&self) -> AgentDescriptor {
782        self.descriptor.clone()
783    }
784
785    async fn handle_message(
786        &self,
787        input: AgentMessage,
788        _state: Arc<AgentStateSnapshot>,
789    ) -> anyhow::Result<AgentMessage> {
790        let response = self.handle_message_internal(input, _state).await?;
791
792        // Persist state to checkpointer after successful message handling
793        if let Some(checkpointer) = &self.checkpointer {
794            let state_clone = self
795                .state
796                .read()
797                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
798                .clone();
799            checkpointer
800                .save_state(&ThreadId::default(), &state_clone)
801                .await?;
802        }
803
804        Ok(response)
805    }
806
807    async fn handle_message_stream(
808        &self,
809        input: AgentMessage,
810        _state: Arc<AgentStateSnapshot>,
811    ) -> anyhow::Result<agents_core::agent::AgentStream> {
812        use crate::planner::LlmBackedPlanner;
813        use agents_core::llm::{LlmRequest, StreamChunk};
814        use futures::StreamExt;
815
816        // Add input to history
817        self.append_history(input.clone());
818
819        // Build the request similar to handle_message_internal
820        let mut request = ModelRequest::new(&self.instructions, self.current_history());
821        let tools = self.collect_tools();
822
823        // Apply middleware modifications
824        for middleware in &self.middlewares {
825            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
826            middleware.modify_model_request(&mut ctx).await?;
827        }
828
829        // Convert ModelRequest to LlmRequest and add tools
830        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
831        let llm_request = LlmRequest {
832            system_prompt: request.system_prompt.clone(),
833            messages: request.messages.clone(),
834            tools: tool_schemas,
835        };
836
837        // Try to get the underlying LLM model for streaming
838        let planner_any = self.planner.as_any();
839
840        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
841            // We have an LlmBackedPlanner, use its model for streaming
842            let model = llm_planner.model().clone();
843            let stream = model.generate_stream(llm_request).await?;
844
845            // Wrap stream to emit events to broadcasters
846            let agent_name = self.descriptor.name.clone();
847            let event_dispatcher = self.event_dispatcher.clone();
848
849            let wrapped_stream = stream.then(move |chunk_result| {
850                let dispatcher = event_dispatcher.clone();
851                let name = agent_name.clone();
852
853                async move {
854                    match &chunk_result {
855                        Ok(StreamChunk::TextDelta(token)) => {
856                            // Emit streaming token event
857                            if let Some(ref dispatcher) = dispatcher {
858                                let event = agents_core::events::AgentEvent::StreamingToken(
859                                    agents_core::events::StreamingTokenEvent {
860                                        metadata: agents_core::events::EventMetadata::new(
861                                            "default".to_string(),
862                                            uuid::Uuid::new_v4().to_string(),
863                                            None,
864                                        ),
865                                        agent_name: name.clone(),
866                                        token: token.clone(),
867                                    },
868                                );
869                                dispatcher.dispatch(event).await;
870                            }
871                        }
872                        Ok(StreamChunk::Done { message }) => {
873                            // Emit agent completed event
874                            if let Some(ref dispatcher) = dispatcher {
875                                let full_text = match &message.content {
876                                    agents_core::messaging::MessageContent::Text(t) => t.clone(),
877                                    agents_core::messaging::MessageContent::Json(v) => {
878                                        v.to_string()
879                                    }
880                                };
881
882                                let preview = if full_text.len() > 100 {
883                                    format!("{}...", &full_text[..100])
884                                } else {
885                                    full_text.clone()
886                                };
887
888                                let event = agents_core::events::AgentEvent::AgentCompleted(
889                                    agents_core::events::AgentCompletedEvent {
890                                        metadata: agents_core::events::EventMetadata::new(
891                                            "default".to_string(),
892                                            uuid::Uuid::new_v4().to_string(),
893                                            None,
894                                        ),
895                                        agent_name: name.clone(),
896                                        duration_ms: 0, // Duration not tracked in streaming mode
897                                        response_preview: preview,
898                                        response: full_text,
899                                    },
900                                );
901                                dispatcher.dispatch(event).await;
902                            }
903                        }
904                        _ => {}
905                    }
906                    chunk_result
907                }
908            });
909
910            Ok(Box::pin(wrapped_stream))
911        } else {
912            // Fallback to non-streaming
913            let response = self.handle_message_internal(input, _state).await?;
914            Ok(Box::pin(futures::stream::once(async move {
915                Ok(StreamChunk::Done { message: response })
916            })))
917        }
918    }
919
920    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
921        let state_guard = self
922            .state
923            .read()
924            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
925        Ok(state_guard.pending_interrupts.first().cloned())
926    }
927
928    async fn resume_with_approval(
929        &self,
930        action: agents_core::hitl::HitlAction,
931    ) -> anyhow::Result<AgentMessage> {
932        self.resume_with_approval(action).await
933    }
934}
935
936/// Create a deep agent from configuration - matches Python middleware assembly exactly
937///
938/// This function assembles the middleware stack in the same order as the Python SDK:
939/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
940pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
941    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
942    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
943
944    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
945    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
946
947    // Build sub-agents from configurations
948    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
949
950    for subagent_config in &config.subagent_configs {
951        // Determine the planner for this sub-agent
952        let sub_planner = if let Some(ref model) = subagent_config.model {
953            // Sub-agent has its own model - wrap it in a planner
954            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
955        } else {
956            // Inherit parent's planner
957            config.planner.clone()
958        };
959
960        // Create a DeepAgentConfig for this sub-agent
961        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
962
963        // Inherit max_iterations from parent
964        sub_cfg = sub_cfg.with_max_iterations(config.max_iterations.get());
965
966        // Configure tools
967        if let Some(ref tools) = subagent_config.tools {
968            tracing::debug!(
969                "  - Configuring {} tools for {}",
970                tools.len(),
971                subagent_config.name
972            );
973            for tool in tools {
974                sub_cfg = sub_cfg.with_tool(tool.clone());
975            }
976        }
977
978        // Configure built-in tools
979        if let Some(ref builtin) = subagent_config.builtin_tools {
980            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
981        }
982
983        // Sub-agents should not have their own sub-agents
984        sub_cfg = sub_cfg.with_auto_general_purpose(false);
985
986        // Configure prompt caching
987        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
988
989        // Inherit PII sanitization setting from parent
990        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
991
992        // Build the sub-agent recursively
993        let sub_agent = create_deep_agent_from_config(sub_cfg);
994
995        // Register the sub-agent
996        registrations.push(SubAgentRegistration {
997            descriptor: SubAgentDescriptor {
998                name: subagent_config.name.clone(),
999                description: subagent_config.description.clone(),
1000            },
1001            agent: Arc::new(sub_agent),
1002        });
1003
1004        tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
1005    }
1006
1007    tracing::info!("=> Total sub-agents registered: {}", registrations.len());
1008
1009    // Optionally inject a general-purpose subagent
1010    if config.auto_general_purpose {
1011        let has_gp = registrations
1012            .iter()
1013            .any(|r| r.descriptor.name == "general-purpose");
1014        if !has_gp {
1015            // Create a subagent with inherited planner/tools and same instructions
1016            let mut sub_cfg =
1017                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1018                    .with_auto_general_purpose(false)
1019                    .with_prompt_caching(config.enable_prompt_caching)
1020                    .with_pii_sanitization(config.enable_pii_sanitization)
1021                    .with_max_iterations(config.max_iterations.get());
1022            if let Some(ref selected) = config.builtin_tools {
1023                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1024            }
1025            if let Some(ref sum) = config.summarization {
1026                sub_cfg = sub_cfg.with_summarization(sum.clone());
1027            }
1028            for t in &config.tools {
1029                sub_cfg = sub_cfg.with_tool(t.clone());
1030            }
1031
1032            let gp = create_deep_agent_from_config(sub_cfg);
1033            registrations.push(SubAgentRegistration {
1034                descriptor: SubAgentDescriptor {
1035                    name: "general-purpose".into(),
1036                    description: "Default reasoning agent".into(),
1037                },
1038                agent: Arc::new(gp),
1039            });
1040        }
1041    }
1042
1043    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1044        registrations,
1045        config.event_dispatcher.clone(),
1046    ));
1047    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1048    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
1049    let summarization = config.summarization.as_ref().map(|cfg| {
1050        Arc::new(SummarizationMiddleware::new(
1051            cfg.messages_to_keep,
1052            cfg.summary_note.clone(),
1053        ))
1054    });
1055    let hitl = if config.tool_interrupts.is_empty() {
1056        None
1057    } else {
1058        // Validate that checkpointer is configured when HITL is enabled
1059        if config.checkpointer.is_none() {
1060            tracing::error!(
1061                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
1062                 HITL will be disabled. Please configure a checkpointer to enable HITL."
1063            );
1064            None
1065        } else {
1066            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
1067            Some(Arc::new(HumanInLoopMiddleware::new(
1068                config.tool_interrupts.clone(),
1069            )))
1070        }
1071    };
1072
1073    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
1074    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
1075    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1076        base_prompt,
1077        deep_agent_prompt,
1078        planning,
1079        filesystem,
1080        subagent,
1081    ];
1082    if let Some(ref summary) = summarization {
1083        middlewares.push(summary.clone());
1084    }
1085    if config.enable_prompt_caching {
1086        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1087    }
1088    if let Some(ref hitl_mw) = hitl {
1089        middlewares.push(hitl_mw.clone());
1090    }
1091
1092    DeepAgent {
1093        descriptor: AgentDescriptor {
1094            name: "deep-agent".into(),
1095            version: "0.0.1".into(),
1096            description: Some("Rust deep agent".into()),
1097        },
1098        instructions: config.instructions,
1099        planner: config.planner,
1100        middlewares,
1101        base_tools: config.tools,
1102        state,
1103        history,
1104        _summarization: summarization,
1105        _hitl: hitl,
1106        builtin_tools: config.builtin_tools,
1107        checkpointer: config.checkpointer,
1108        event_dispatcher: config.event_dispatcher,
1109        enable_pii_sanitization: config.enable_pii_sanitization,
1110        max_iterations: config.max_iterations,
1111    }
1112}