agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::num::NonZeroUsize;
26use std::sync::{Arc, RwLock};
27
28// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
29const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31// (no streaming types in baseline)
32
33/// Helper function to count todos by status
34fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
35    let mut pending = 0;
36    let mut in_progress = 0;
37    let mut completed = 0;
38
39    for todo in todos {
40        match todo.status {
41            agents_core::state::TodoStatus::Pending => pending += 1,
42            agents_core::state::TodoStatus::InProgress => in_progress += 1,
43            agents_core::state::TodoStatus::Completed => completed += 1,
44        }
45    }
46
47    (pending, in_progress, completed)
48}
49
50/// Core Deep Agent runtime implementation
51///
52/// This struct contains all the runtime state and behavior for a Deep Agent,
53/// including middleware management, tool execution, HITL support, and state persistence.
54pub struct DeepAgent {
55    descriptor: AgentDescriptor,
56    instructions: String,
57    planner: Arc<dyn PlannerHandle>,
58    middlewares: Vec<Arc<dyn AgentMiddleware>>,
59    base_tools: Vec<ToolBox>,
60    state: Arc<RwLock<AgentStateSnapshot>>,
61    history: Arc<RwLock<Vec<AgentMessage>>>,
62    _summarization: Option<Arc<SummarizationMiddleware>>,
63    _hitl: Option<Arc<HumanInLoopMiddleware>>,
64    builtin_tools: Option<HashSet<String>>,
65    checkpointer: Option<Arc<dyn Checkpointer>>,
66    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
67    enable_pii_sanitization: bool,
68    max_iterations: NonZeroUsize,
69}
70
71impl DeepAgent {
72    fn collect_tools(&self) -> HashMap<String, ToolBox> {
73        let mut tools: HashMap<String, ToolBox> = HashMap::new();
74        for tool in &self.base_tools {
75            tools.insert(tool.schema().name.clone(), tool.clone());
76        }
77        for middleware in &self.middlewares {
78            for tool in middleware.tools() {
79                let tool_name = tool.schema().name.clone();
80                if self.should_include(&tool_name) {
81                    tools.insert(tool_name, tool);
82                }
83            }
84        }
85        tools
86    }
87    // no streaming path in baseline
88
89    fn should_include(&self, name: &str) -> bool {
90        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
91        if !is_builtin {
92            return true;
93        }
94        match &self.builtin_tools {
95            None => true,
96            Some(selected) => selected.contains(name),
97        }
98    }
99
100    fn append_history(&self, message: AgentMessage) {
101        if let Ok(mut history) = self.history.write() {
102            history.push(message);
103        }
104    }
105
106    fn current_history(&self) -> Vec<AgentMessage> {
107        self.history.read().map(|h| h.clone()).unwrap_or_default()
108    }
109
110    fn emit_event(&self, event: agents_core::events::AgentEvent) {
111        if let Some(dispatcher) = &self.event_dispatcher {
112            let dispatcher_clone = dispatcher.clone();
113            tokio::spawn(async move {
114                dispatcher_clone.dispatch(event).await;
115            });
116        }
117    }
118
119    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
120        agents_core::events::EventMetadata::new(
121            "default".to_string(),
122            uuid::Uuid::new_v4().to_string(),
123            None,
124        )
125    }
126
127    fn truncate_message(&self, message: &AgentMessage) -> String {
128        let text = match &message.content {
129            MessageContent::Text(t) => t.clone(),
130            MessageContent::Json(v) => v.to_string(),
131        };
132
133        if self.enable_pii_sanitization {
134            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
135        } else {
136            // No sanitization - just truncate
137            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
138        }
139    }
140
141    fn get_full_message_text(&self, message: &AgentMessage) -> String {
142        match &message.content {
143            MessageContent::Text(t) => t.clone(),
144            MessageContent::Json(v) => v.to_string(),
145        }
146    }
147
148    fn summarize_payload(&self, payload: &Value) -> String {
149        if self.enable_pii_sanitization {
150            agents_core::security::sanitize_tool_payload(
151                payload,
152                agents_core::security::MAX_PREVIEW_LENGTH,
153            )
154        } else {
155            // No sanitization - just truncate JSON string
156            let json_str = payload.to_string();
157            agents_core::security::truncate_string(
158                &json_str,
159                agents_core::security::MAX_PREVIEW_LENGTH,
160            )
161        }
162    }
163
164    /// Save the current agent state to the configured checkpointer.
165    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
166        if let Some(ref checkpointer) = self.checkpointer {
167            let state = self
168                .state
169                .read()
170                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
171                .clone();
172
173            // Calculate state size before saving
174            let state_json = serde_json::to_string(&state)?;
175            let state_size = state_json.len();
176
177            // Save state to checkpointer
178            checkpointer.save_state(thread_id, &state).await?;
179
180            // Emit StateCheckpointed event after successful save
181            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
182                agents_core::events::StateCheckpointedEvent {
183                    metadata: self.create_event_metadata(),
184                    checkpoint_id: thread_id.to_string(),
185                    state_size_bytes: state_size,
186                },
187            ));
188
189            tracing::debug!(
190                thread_id = %thread_id,
191                state_size_bytes = state_size,
192                "πŸ’Ύ State checkpointed and event emitted"
193            );
194
195            Ok(())
196        } else {
197            tracing::warn!("Attempted to save state but no checkpointer is configured");
198            Ok(())
199        }
200    }
201
202    /// Load agent state from the configured checkpointer.
203    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
204        if let Some(ref checkpointer) = self.checkpointer {
205            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
206                *self
207                    .state
208                    .write()
209                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
210                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
211                Ok(true)
212            } else {
213                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
214                Ok(false)
215            }
216        } else {
217            tracing::warn!("Attempted to load state but no checkpointer is configured");
218            Ok(false)
219        }
220    }
221
222    /// Delete saved state for the specified thread.
223    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
224        if let Some(ref checkpointer) = self.checkpointer {
225            checkpointer.delete_thread(thread_id).await
226        } else {
227            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
228            Ok(())
229        }
230    }
231
232    /// List all threads with saved state.
233    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
234        if let Some(ref checkpointer) = self.checkpointer {
235            checkpointer.list_threads().await
236        } else {
237            Ok(Vec::new())
238        }
239    }
240
241    async fn execute_tool(
242        &self,
243        tool: ToolBox,
244        _tool_name: String,
245        payload: Value,
246    ) -> anyhow::Result<AgentMessage> {
247        let state_snapshot = self.state.read().unwrap().clone();
248        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
249
250        let result = tool.execute(payload, ctx).await?;
251        Ok(self.apply_tool_result(result))
252    }
253
254    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
255        match result {
256            ToolResult::Message(message) => {
257                // Tool results are not added to conversation history
258                // Only the final LLM response after tool execution is added
259                message
260            }
261            ToolResult::WithStateUpdate {
262                message,
263                state_diff,
264            } => {
265                // Check if todos were updated
266                let todos_updated = state_diff.todos.is_some();
267
268                if let Ok(mut state) = self.state.write() {
269                    let command = agents_core::command::Command::with_state(state_diff);
270                    command.apply_to(&mut state);
271
272                    // Emit TodosUpdated event if todos were modified
273                    if todos_updated {
274                        let (pending_count, in_progress_count, completed_count) =
275                            count_todos(&state.todos);
276
277                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
278                            agents_core::events::TodosUpdatedEvent {
279                                metadata: self.create_event_metadata(),
280                                todos: state.todos.clone(),
281                                pending_count,
282                                in_progress_count,
283                                completed_count,
284                                last_updated: chrono::Utc::now().to_rfc3339(),
285                            },
286                        ));
287
288                        tracing::debug!(
289                            pending = pending_count,
290                            in_progress = in_progress_count,
291                            completed = completed_count,
292                            total = state.todos.len(),
293                            "πŸ“ Todos updated and event emitted"
294                        );
295                    }
296                }
297                // Tool results are not added to conversation history
298                // Only the final LLM response after tool execution is added
299                message
300            }
301        }
302    }
303
304    /// Get the current pending interrupt, if any.
305    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
306        self.state
307            .read()
308            .ok()
309            .and_then(|guard| guard.pending_interrupts.first().cloned())
310    }
311
312    /// Add a broadcaster dynamically to the agent's event dispatcher.
313    ///
314    /// Add a single broadcaster dynamically after the agent is built.
315    ///
316    /// This is useful for per-conversation or per-customer broadcasters.
317    ///
318    /// # Example
319    /// ```no_run
320    /// use std::sync::Arc;
321    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
322    /// ```
323    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
324        if let Some(dispatcher) = &self.event_dispatcher {
325            dispatcher.add_broadcaster(broadcaster);
326            tracing::debug!("Broadcaster added to event dispatcher");
327        } else {
328            tracing::warn!("add_broadcaster called but no event dispatcher configured");
329        }
330    }
331
332    /// Add multiple broadcasters at once.
333    ///
334    /// This is useful when you need to add several broadcasters for a conversation
335    /// (e.g., WhatsApp, SSE, DynamoDB).
336    ///
337    /// # Example
338    /// ```no_run
339    /// use std::sync::Arc;
340    /// // agent.add_broadcasters(vec![
341    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
342    /// //     Arc::new(SseBroadcaster::new(channel)),
343    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
344    /// // ]);
345    /// ```
346    pub fn add_broadcasters(
347        &self,
348        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
349    ) {
350        if let Some(dispatcher) = &self.event_dispatcher {
351            for broadcaster in broadcasters {
352                dispatcher.add_broadcaster(broadcaster);
353            }
354            tracing::debug!("Multiple broadcasters added to event dispatcher");
355        } else {
356            tracing::warn!("add_broadcasters called but no event dispatcher configured");
357        }
358    }
359
360    /// Resume execution after human approval of an interrupt.
361    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
362        // Get the first pending interrupt
363        let interrupt = {
364            let state_guard = self
365                .state
366                .read()
367                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
368            state_guard
369                .pending_interrupts
370                .first()
371                .cloned()
372                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
373        };
374
375        let result_message = match action {
376            HitlAction::Accept => {
377                // Execute with original args
378                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
379                tracing::info!(
380                    tool_name = %hitl.tool_name,
381                    call_id = %hitl.call_id,
382                    "βœ… HITL: Tool approved, executing with original arguments"
383                );
384
385                let tools = self.collect_tools();
386                let tool = tools
387                    .get(&hitl.tool_name)
388                    .cloned()
389                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
390
391                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
392                    .await?
393            }
394
395            HitlAction::Edit {
396                tool_name,
397                tool_args,
398            } => {
399                // Execute with modified args
400                tracing::info!(
401                    tool_name = %tool_name,
402                    "✏️ HITL: Tool edited, executing with modified arguments"
403                );
404
405                let tools = self.collect_tools();
406                let tool = tools
407                    .get(&tool_name)
408                    .cloned()
409                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
410
411                self.execute_tool(tool, tool_name, tool_args).await?
412            }
413
414            HitlAction::Reject { reason } => {
415                // Don't execute - return rejection message
416                tracing::info!("❌ HITL: Tool rejected");
417
418                let text = reason
419                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
420
421                let message = AgentMessage {
422                    role: MessageRole::Tool,
423                    content: MessageContent::Text(text),
424                    metadata: None,
425                };
426
427                self.append_history(message.clone());
428                message
429            }
430
431            HitlAction::Respond { message } => {
432                // Don't execute - return custom message
433                tracing::info!("πŸ’¬ HITL: Custom response provided");
434
435                self.append_history(message.clone());
436                message
437            }
438        };
439
440        // Clear the interrupt from state
441        {
442            let mut state_guard = self
443                .state
444                .write()
445                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
446            state_guard.clear_interrupts();
447        }
448
449        // Persist cleared state
450        if let Some(checkpointer) = &self.checkpointer {
451            let state_clone = self
452                .state
453                .read()
454                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
455                .clone();
456            checkpointer
457                .save_state(&ThreadId::default(), &state_clone)
458                .await?;
459        }
460
461        Ok(result_message)
462    }
463
464    /// Handle message from string input - converts string to AgentMessage internally
465    pub async fn handle_message(
466        &self,
467        input: impl AsRef<str>,
468        state: Arc<AgentStateSnapshot>,
469    ) -> anyhow::Result<AgentMessage> {
470        self.handle_message_with_metadata(input, None, state).await
471    }
472
473    /// Handle message from string input with metadata - converts string to AgentMessage internally
474    pub async fn handle_message_with_metadata(
475        &self,
476        input: impl AsRef<str>,
477        metadata: Option<MessageMetadata>,
478        state: Arc<AgentStateSnapshot>,
479    ) -> anyhow::Result<AgentMessage> {
480        let agent_message = AgentMessage {
481            role: MessageRole::User,
482            content: MessageContent::Text(input.as_ref().to_string()),
483            metadata,
484        };
485        self.handle_message_internal(agent_message, state).await
486    }
487
488    /// Internal method that contains the actual message handling logic
489    async fn handle_message_internal(
490        &self,
491        input: AgentMessage,
492        loaded_state: Arc<AgentStateSnapshot>,
493    ) -> anyhow::Result<AgentMessage> {
494        let start_time = std::time::Instant::now();
495
496        // Initialize internal state with loaded state from checkpointer
497        // This ensures conversation context is maintained across sessions
498        if let Ok(mut state_guard) = self.state.write() {
499            *state_guard = (*loaded_state).clone();
500        }
501
502        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
503            agents_core::events::AgentStartedEvent {
504                metadata: self.create_event_metadata(),
505                agent_name: self.descriptor.name.clone(),
506                message_preview: self.truncate_message(&input),
507            },
508        ));
509
510        self.append_history(input.clone());
511
512        // ReAct loop: continue until LLM responds with text (not tool calls)
513        let max_iterations = self.max_iterations.get();
514        let mut iteration = 0;
515
516        loop {
517            iteration += 1;
518            if iteration > max_iterations {
519                tracing::warn!(
520                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
521                    max_iterations
522                );
523                let response = AgentMessage {
524                    role: MessageRole::Agent,
525                    content: MessageContent::Text(
526                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
527                    ),
528                    metadata: None,
529                };
530                self.append_history(response.clone());
531                return Ok(response);
532            }
533
534            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
535
536            // Build request with current history
537            let mut request = ModelRequest::new(&self.instructions, self.current_history());
538            let tools = self.collect_tools();
539            for middleware in &self.middlewares {
540                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
541                middleware.modify_model_request(&mut ctx).await?;
542            }
543
544            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
545            let context = PlannerContext {
546                history: request.messages.clone(),
547                system_prompt: request.system_prompt.clone(),
548                tools: tool_schemas,
549            };
550            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
551
552            // Ask LLM what to do
553            let decision = self.planner.plan(context, state_snapshot).await?;
554
555            // Emit PlanningComplete event
556            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
557                agents_core::events::PlanningCompleteEvent {
558                    metadata: self.create_event_metadata(),
559                    action_type: match &decision.next_action {
560                        PlannerAction::Respond { .. } => "respond".to_string(),
561                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
562                        PlannerAction::Terminate => "terminate".to_string(),
563                    },
564                    action_summary: match &decision.next_action {
565                        PlannerAction::Respond { message } => {
566                            format!("Respond: {}", self.truncate_message(message))
567                        }
568                        PlannerAction::CallTool { tool_name, .. } => {
569                            format!("Call tool: {}", tool_name)
570                        }
571                        PlannerAction::Terminate => "Terminate".to_string(),
572                    },
573                },
574            ));
575
576            match decision.next_action {
577                PlannerAction::Respond { message } => {
578                    // LLM decided to respond with text - exit loop
579                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
580                        agents_core::events::AgentCompletedEvent {
581                            metadata: self.create_event_metadata(),
582                            agent_name: self.descriptor.name.clone(),
583                            duration_ms: start_time.elapsed().as_millis() as u64,
584                            response_preview: self.truncate_message(&message),
585                            response: self.get_full_message_text(&message),
586                        },
587                    ));
588
589                    self.append_history(message.clone());
590                    return Ok(message);
591                }
592                PlannerAction::CallTool { tool_name, payload } => {
593                    // Add AI's decision to call tool to history
594                    // This is needed for OpenAI's API which expects:
595                    // 1. Assistant message with tool call
596                    // 2. Tool message with result
597                    // Using System role to avoid LLM echoing this back as a response
598                    let tool_call_message = AgentMessage {
599                        role: MessageRole::System,
600                        content: MessageContent::Text(format!(
601                            "Calling tool: {} with args: {}",
602                            tool_name,
603                            serde_json::to_string(&payload).unwrap_or_default()
604                        )),
605                        metadata: None,
606                    };
607                    self.append_history(tool_call_message);
608
609                    if let Some(tool) = tools.get(&tool_name).cloned() {
610                        // Check all middleware for interrupts before executing tool
611                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
612                        for middleware in &self.middlewares {
613                            if let Some(interrupt) = middleware
614                                .before_tool_execution(&tool_name, &payload, &call_id)
615                                .await?
616                            {
617                                // Save interrupt to state
618                                {
619                                    let mut state_guard = self.state.write().map_err(|_| {
620                                        anyhow::anyhow!("Failed to acquire write lock on state")
621                                    })?;
622                                    state_guard.add_interrupt(interrupt.clone());
623                                }
624
625                                // Persist state with checkpointer
626                                if let Some(checkpointer) = &self.checkpointer {
627                                    let state_clone = self
628                                        .state
629                                        .read()
630                                        .map_err(|_| {
631                                            anyhow::anyhow!("Failed to acquire read lock on state")
632                                        })?
633                                        .clone();
634                                    checkpointer
635                                        .save_state(&ThreadId::default(), &state_clone)
636                                        .await?;
637                                }
638
639                                // Return interrupt message - execution pauses here
640                                let interrupt_message = AgentMessage {
641                                    role: MessageRole::System,
642                                    content: MessageContent::Text(format!(
643                                        "⏸️ Execution paused: Tool '{}' requires human approval",
644                                        tool_name
645                                    )),
646                                    metadata: None,
647                                };
648                                self.append_history(interrupt_message.clone());
649                                return Ok(interrupt_message);
650                            }
651                        }
652
653                        // No interrupt - execute tool
654                        let tool_start_time = std::time::Instant::now();
655
656                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
657                            agents_core::events::ToolStartedEvent {
658                                metadata: self.create_event_metadata(),
659                                tool_name: tool_name.clone(),
660                                input_summary: self.summarize_payload(&payload),
661                            },
662                        ));
663
664                        tracing::warn!(
665                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
666                            tool_name,
667                            serde_json::to_string(&payload)
668                                .unwrap_or_else(|_| "invalid json".to_string())
669                        );
670
671                        let result = self
672                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
673                            .await;
674
675                        let duration = tool_start_time.elapsed();
676                        match result {
677                            Ok(tool_result_message) => {
678                                let content_preview = match &tool_result_message.content {
679                                    MessageContent::Text(t) => {
680                                        if t.chars().count() > 100 {
681                                            format!("{:.100}... ({} chars)", t, t.chars().count())
682                                        } else {
683                                            t.clone()
684                                        }
685                                    }
686                                    MessageContent::Json(v) => {
687                                        format!("JSON: {} bytes", v.to_string().len())
688                                    }
689                                };
690
691                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
692                                    agents_core::events::ToolCompletedEvent {
693                                        metadata: self.create_event_metadata(),
694                                        tool_name: tool_name.clone(),
695                                        duration_ms: duration.as_millis() as u64,
696                                        result_summary: content_preview.clone(),
697                                        success: true,
698                                    },
699                                ));
700
701                                tracing::warn!(
702                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
703                                    tool_name,
704                                    duration,
705                                    content_preview
706                                );
707
708                                // Add tool result to history and continue ReAct loop
709                                self.append_history(tool_result_message);
710                                // Loop continues - LLM will see tool result and decide next action
711                            }
712                            Err(e) => {
713                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
714                                    agents_core::events::ToolFailedEvent {
715                                        metadata: self.create_event_metadata(),
716                                        tool_name: tool_name.clone(),
717                                        duration_ms: duration.as_millis() as u64,
718                                        error_message: e.to_string(),
719                                        is_recoverable: true,
720                                        retry_count: 0,
721                                    },
722                                ));
723
724                                tracing::error!(
725                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
726                                    tool_name,
727                                    duration,
728                                    e
729                                );
730
731                                // Add error to history and continue - let LLM handle the error
732                                let error_message = AgentMessage {
733                                    role: MessageRole::Tool,
734                                    content: MessageContent::Text(format!(
735                                        "Error executing {}: {}",
736                                        tool_name, e
737                                    )),
738                                    metadata: None,
739                                };
740                                self.append_history(error_message);
741                                // Loop continues - LLM will see error and decide how to handle it
742                            }
743                        }
744                    } else {
745                        // Tool not found - add error to history and continue
746                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
747                        let error_message = AgentMessage {
748                            role: MessageRole::Tool,
749                            content: MessageContent::Text(format!(
750                                "Tool '{}' not found. Available tools: {}",
751                                tool_name,
752                                tools
753                                    .keys()
754                                    .map(|k| k.as_str())
755                                    .collect::<Vec<_>>()
756                                    .join(", ")
757                            )),
758                            metadata: None,
759                        };
760                        self.append_history(error_message);
761                        // Loop continues - LLM will see error and try something else
762                    }
763                }
764                PlannerAction::Terminate => {
765                    // LLM decided to terminate - exit loop
766                    tracing::debug!("πŸ›‘ Agent terminated");
767                    let message = AgentMessage {
768                        role: MessageRole::Agent,
769                        content: MessageContent::Text("Task completed.".into()),
770                        metadata: None,
771                    };
772                    self.append_history(message.clone());
773                    return Ok(message);
774                }
775            }
776        }
777    }
778}
779
780#[async_trait]
781impl AgentHandle for DeepAgent {
782    async fn describe(&self) -> AgentDescriptor {
783        self.descriptor.clone()
784    }
785
786    async fn handle_message(
787        &self,
788        input: AgentMessage,
789        _state: Arc<AgentStateSnapshot>,
790    ) -> anyhow::Result<AgentMessage> {
791        let response = self.handle_message_internal(input, _state).await?;
792
793        // Persist state to checkpointer after successful message handling
794        if let Some(checkpointer) = &self.checkpointer {
795            let state_clone = self
796                .state
797                .read()
798                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
799                .clone();
800            checkpointer
801                .save_state(&ThreadId::default(), &state_clone)
802                .await?;
803        }
804
805        Ok(response)
806    }
807
808    async fn handle_message_stream(
809        &self,
810        input: AgentMessage,
811        _state: Arc<AgentStateSnapshot>,
812    ) -> anyhow::Result<agents_core::agent::AgentStream> {
813        use crate::planner::LlmBackedPlanner;
814        use agents_core::llm::{LlmRequest, StreamChunk};
815        use futures::StreamExt;
816
817        // Add input to history
818        self.append_history(input.clone());
819
820        // Build the request similar to handle_message_internal
821        let mut request = ModelRequest::new(&self.instructions, self.current_history());
822        let tools = self.collect_tools();
823
824        // Apply middleware modifications
825        for middleware in &self.middlewares {
826            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
827            middleware.modify_model_request(&mut ctx).await?;
828        }
829
830        // Convert ModelRequest to LlmRequest and add tools
831        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
832        let llm_request = LlmRequest {
833            system_prompt: request.system_prompt.clone(),
834            messages: request.messages.clone(),
835            tools: tool_schemas,
836        };
837
838        // Try to get the underlying LLM model for streaming
839        let planner_any = self.planner.as_any();
840
841        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
842            // We have an LlmBackedPlanner, use its model for streaming
843            let model = llm_planner.model().clone();
844            let stream = model.generate_stream(llm_request).await?;
845
846            // Wrap stream to emit events to broadcasters
847            let agent_name = self.descriptor.name.clone();
848            let event_dispatcher = self.event_dispatcher.clone();
849
850            let wrapped_stream = stream.then(move |chunk_result| {
851                let dispatcher = event_dispatcher.clone();
852                let name = agent_name.clone();
853
854                async move {
855                    match &chunk_result {
856                        Ok(StreamChunk::TextDelta(token)) => {
857                            // Emit streaming token event
858                            if let Some(ref dispatcher) = dispatcher {
859                                let event = agents_core::events::AgentEvent::StreamingToken(
860                                    agents_core::events::StreamingTokenEvent {
861                                        metadata: agents_core::events::EventMetadata::new(
862                                            "default".to_string(),
863                                            uuid::Uuid::new_v4().to_string(),
864                                            None,
865                                        ),
866                                        agent_name: name.clone(),
867                                        token: token.clone(),
868                                    },
869                                );
870                                dispatcher.dispatch(event).await;
871                            }
872                        }
873                        Ok(StreamChunk::Done { message }) => {
874                            // Emit agent completed event
875                            if let Some(ref dispatcher) = dispatcher {
876                                let full_text = match &message.content {
877                                    agents_core::messaging::MessageContent::Text(t) => t.clone(),
878                                    agents_core::messaging::MessageContent::Json(v) => {
879                                        v.to_string()
880                                    }
881                                };
882
883                                let preview = if full_text.len() > 100 {
884                                    format!("{}...", &full_text[..100])
885                                } else {
886                                    full_text.clone()
887                                };
888
889                                let event = agents_core::events::AgentEvent::AgentCompleted(
890                                    agents_core::events::AgentCompletedEvent {
891                                        metadata: agents_core::events::EventMetadata::new(
892                                            "default".to_string(),
893                                            uuid::Uuid::new_v4().to_string(),
894                                            None,
895                                        ),
896                                        agent_name: name.clone(),
897                                        duration_ms: 0, // Duration not tracked in streaming mode
898                                        response_preview: preview,
899                                        response: full_text,
900                                    },
901                                );
902                                dispatcher.dispatch(event).await;
903                            }
904                        }
905                        _ => {}
906                    }
907                    chunk_result
908                }
909            });
910
911            Ok(Box::pin(wrapped_stream))
912        } else {
913            // Fallback to non-streaming
914            let response = self.handle_message_internal(input, _state).await?;
915            Ok(Box::pin(futures::stream::once(async move {
916                Ok(StreamChunk::Done { message: response })
917            })))
918        }
919    }
920
921    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
922        let state_guard = self
923            .state
924            .read()
925            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
926        Ok(state_guard.pending_interrupts.first().cloned())
927    }
928
929    async fn resume_with_approval(
930        &self,
931        action: agents_core::hitl::HitlAction,
932    ) -> anyhow::Result<AgentMessage> {
933        self.resume_with_approval(action).await
934    }
935}
936
937/// Create a deep agent from configuration - matches Python middleware assembly exactly
938///
939/// This function assembles the middleware stack in the same order as the Python SDK:
940/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
941pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
942    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
943    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
944
945    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
946    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
947
948    // Build sub-agents from configurations
949    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
950
951    for subagent_config in &config.subagent_configs {
952        // Determine the planner for this sub-agent
953        let sub_planner = if let Some(ref model) = subagent_config.model {
954            // Sub-agent has its own model - wrap it in a planner
955            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
956        } else {
957            // Inherit parent's planner
958            config.planner.clone()
959        };
960
961        // Create a DeepAgentConfig for this sub-agent
962        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
963
964        // Inherit max_iterations from parent
965        sub_cfg = sub_cfg.with_max_iterations(config.max_iterations.get());
966
967        // Configure tools
968        if let Some(ref tools) = subagent_config.tools {
969            tracing::debug!(
970                "  - Configuring {} tools for {}",
971                tools.len(),
972                subagent_config.name
973            );
974            for tool in tools {
975                sub_cfg = sub_cfg.with_tool(tool.clone());
976            }
977        }
978
979        // Configure built-in tools
980        if let Some(ref builtin) = subagent_config.builtin_tools {
981            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
982        }
983
984        // Sub-agents should not have their own sub-agents
985        sub_cfg = sub_cfg.with_auto_general_purpose(false);
986
987        // Configure prompt caching
988        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
989
990        // Inherit PII sanitization setting from parent
991        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
992
993        // Build the sub-agent recursively
994        let sub_agent = create_deep_agent_from_config(sub_cfg);
995
996        // Register the sub-agent
997        registrations.push(SubAgentRegistration {
998            descriptor: SubAgentDescriptor {
999                name: subagent_config.name.clone(),
1000                description: subagent_config.description.clone(),
1001            },
1002            agent: Arc::new(sub_agent),
1003        });
1004
1005        tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
1006    }
1007
1008    tracing::info!("=> Total sub-agents registered: {}", registrations.len());
1009
1010    // Optionally inject a general-purpose subagent
1011    if config.auto_general_purpose {
1012        let has_gp = registrations
1013            .iter()
1014            .any(|r| r.descriptor.name == "general-purpose");
1015        if !has_gp {
1016            // Create a subagent with inherited planner/tools and same instructions
1017            let mut sub_cfg =
1018                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1019                    .with_auto_general_purpose(false)
1020                    .with_prompt_caching(config.enable_prompt_caching)
1021                    .with_pii_sanitization(config.enable_pii_sanitization)
1022                    .with_max_iterations(config.max_iterations.get());
1023            if let Some(ref selected) = config.builtin_tools {
1024                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1025            }
1026            if let Some(ref sum) = config.summarization {
1027                sub_cfg = sub_cfg.with_summarization(sum.clone());
1028            }
1029            for t in &config.tools {
1030                sub_cfg = sub_cfg.with_tool(t.clone());
1031            }
1032
1033            let gp = create_deep_agent_from_config(sub_cfg);
1034            registrations.push(SubAgentRegistration {
1035                descriptor: SubAgentDescriptor {
1036                    name: "general-purpose".into(),
1037                    description: "Default reasoning agent".into(),
1038                },
1039                agent: Arc::new(gp),
1040            });
1041        }
1042    }
1043
1044    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1045        registrations,
1046        config.event_dispatcher.clone(),
1047    ));
1048    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1049
1050    // Create Deep Agent prompt middleware - use override if custom system prompt is set
1051    // Otherwise use the configured prompt format (JSON or TOON)
1052    let deep_agent_prompt: Arc<dyn AgentMiddleware> =
1053        if let Some(ref custom_prompt) = config.custom_system_prompt {
1054            Arc::new(DeepAgentPromptMiddleware::with_override(
1055                custom_prompt.clone(),
1056            ))
1057        } else {
1058            Arc::new(DeepAgentPromptMiddleware::with_format(
1059                config.instructions.clone(),
1060                config.prompt_format,
1061            ))
1062        };
1063    let summarization = config.summarization.as_ref().map(|cfg| {
1064        Arc::new(SummarizationMiddleware::new(
1065            cfg.messages_to_keep,
1066            cfg.summary_note.clone(),
1067        ))
1068    });
1069    let hitl = if config.tool_interrupts.is_empty() {
1070        None
1071    } else {
1072        // Validate that checkpointer is configured when HITL is enabled
1073        if config.checkpointer.is_none() {
1074            tracing::error!(
1075                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
1076                 HITL will be disabled. Please configure a checkpointer to enable HITL."
1077            );
1078            None
1079        } else {
1080            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
1081            Some(Arc::new(HumanInLoopMiddleware::new(
1082                config.tool_interrupts.clone(),
1083            )))
1084        }
1085    };
1086
1087    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
1088    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
1089    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1090        base_prompt,
1091        deep_agent_prompt,
1092        planning,
1093        filesystem,
1094        subagent,
1095    ];
1096    if let Some(ref summary) = summarization {
1097        middlewares.push(summary.clone());
1098    }
1099    if config.enable_prompt_caching {
1100        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1101    }
1102    if let Some(ref hitl_mw) = hitl {
1103        middlewares.push(hitl_mw.clone());
1104    }
1105
1106    DeepAgent {
1107        descriptor: AgentDescriptor {
1108            name: "deep-agent".into(),
1109            version: "0.0.1".into(),
1110            description: Some("Rust deep agent".into()),
1111        },
1112        instructions: config.instructions,
1113        planner: config.planner,
1114        middlewares,
1115        base_tools: config.tools,
1116        state,
1117        history,
1118        _summarization: summarization,
1119        _hitl: hitl,
1120        builtin_tools: config.builtin_tools,
1121        checkpointer: config.checkpointer,
1122        event_dispatcher: config.event_dispatcher,
1123        enable_pii_sanitization: config.enable_pii_sanitization,
1124        max_iterations: config.max_iterations,
1125    }
1126}