agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::num::NonZeroUsize;
26use std::sync::{Arc, RwLock};
27
28// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
29const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31// (no streaming types in baseline)
32
33/// Helper function to count todos by status
34fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
35    let mut pending = 0;
36    let mut in_progress = 0;
37    let mut completed = 0;
38
39    for todo in todos {
40        match todo.status {
41            agents_core::state::TodoStatus::Pending => pending += 1,
42            agents_core::state::TodoStatus::InProgress => in_progress += 1,
43            agents_core::state::TodoStatus::Completed => completed += 1,
44        }
45    }
46
47    (pending, in_progress, completed)
48}
49
50/// Core Deep Agent runtime implementation
51///
52/// This struct contains all the runtime state and behavior for a Deep Agent,
53/// including middleware management, tool execution, HITL support, and state persistence.
54pub struct DeepAgent {
55    descriptor: AgentDescriptor,
56    instructions: String,
57    planner: Arc<dyn PlannerHandle>,
58    middlewares: Vec<Arc<dyn AgentMiddleware>>,
59    base_tools: Vec<ToolBox>,
60    state: Arc<RwLock<AgentStateSnapshot>>,
61    history: Arc<RwLock<Vec<AgentMessage>>>,
62    _summarization: Option<Arc<SummarizationMiddleware>>,
63    _hitl: Option<Arc<HumanInLoopMiddleware>>,
64    builtin_tools: Option<HashSet<String>>,
65    checkpointer: Option<Arc<dyn Checkpointer>>,
66    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
67    enable_pii_sanitization: bool,
68    max_iterations: NonZeroUsize,
69}
70
71impl DeepAgent {
72    fn collect_tools(&self) -> HashMap<String, ToolBox> {
73        let mut tools: HashMap<String, ToolBox> = HashMap::new();
74        for tool in &self.base_tools {
75            tools.insert(tool.schema().name.clone(), tool.clone());
76        }
77        for middleware in &self.middlewares {
78            for tool in middleware.tools() {
79                let tool_name = tool.schema().name.clone();
80                if self.should_include(&tool_name) {
81                    tools.insert(tool_name, tool);
82                }
83            }
84        }
85        tools
86    }
87    // no streaming path in baseline
88
89    fn should_include(&self, name: &str) -> bool {
90        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
91        if !is_builtin {
92            return true;
93        }
94        match &self.builtin_tools {
95            None => true,
96            Some(selected) => selected.contains(name),
97        }
98    }
99
100    fn append_history(&self, message: AgentMessage) {
101        if let Ok(mut history) = self.history.write() {
102            history.push(message);
103        }
104    }
105
106    fn current_history(&self) -> Vec<AgentMessage> {
107        self.history.read().map(|h| h.clone()).unwrap_or_default()
108    }
109
110    fn emit_event(&self, event: agents_core::events::AgentEvent) {
111        if let Some(dispatcher) = &self.event_dispatcher {
112            let dispatcher_clone = dispatcher.clone();
113            tokio::spawn(async move {
114                dispatcher_clone.dispatch(event).await;
115            });
116        }
117    }
118
119    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
120        agents_core::events::EventMetadata::new(
121            "default".to_string(),
122            uuid::Uuid::new_v4().to_string(),
123            None,
124        )
125    }
126
127    fn truncate_message(&self, message: &AgentMessage) -> String {
128        let text = match &message.content {
129            MessageContent::Text(t) => t.clone(),
130            MessageContent::Json(v) => v.to_string(),
131        };
132
133        if self.enable_pii_sanitization {
134            agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
135        } else {
136            // No sanitization - just truncate
137            agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
138        }
139    }
140
141    fn get_full_message_text(&self, message: &AgentMessage) -> String {
142        match &message.content {
143            MessageContent::Text(t) => t.clone(),
144            MessageContent::Json(v) => v.to_string(),
145        }
146    }
147
148    fn summarize_payload(&self, payload: &Value) -> String {
149        if self.enable_pii_sanitization {
150            agents_core::security::sanitize_tool_payload(
151                payload,
152                agents_core::security::MAX_PREVIEW_LENGTH,
153            )
154        } else {
155            // No sanitization - just truncate JSON string
156            let json_str = payload.to_string();
157            agents_core::security::truncate_string(
158                &json_str,
159                agents_core::security::MAX_PREVIEW_LENGTH,
160            )
161        }
162    }
163
164    /// Save the current agent state to the configured checkpointer.
165    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
166        if let Some(ref checkpointer) = self.checkpointer {
167            let state = self
168                .state
169                .read()
170                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
171                .clone();
172
173            // Calculate state size before saving
174            let state_json = serde_json::to_string(&state)?;
175            let state_size = state_json.len();
176
177            // Save state to checkpointer
178            checkpointer.save_state(thread_id, &state).await?;
179
180            // Emit StateCheckpointed event after successful save
181            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
182                agents_core::events::StateCheckpointedEvent {
183                    metadata: self.create_event_metadata(),
184                    checkpoint_id: thread_id.to_string(),
185                    state_size_bytes: state_size,
186                },
187            ));
188
189            tracing::debug!(
190                thread_id = %thread_id,
191                state_size_bytes = state_size,
192                "πŸ’Ύ State checkpointed and event emitted"
193            );
194
195            Ok(())
196        } else {
197            tracing::warn!("Attempted to save state but no checkpointer is configured");
198            Ok(())
199        }
200    }
201
202    /// Load agent state from the configured checkpointer.
203    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
204        if let Some(ref checkpointer) = self.checkpointer {
205            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
206                *self
207                    .state
208                    .write()
209                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
210                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
211                Ok(true)
212            } else {
213                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
214                Ok(false)
215            }
216        } else {
217            tracing::warn!("Attempted to load state but no checkpointer is configured");
218            Ok(false)
219        }
220    }
221
222    /// Delete saved state for the specified thread.
223    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
224        if let Some(ref checkpointer) = self.checkpointer {
225            checkpointer.delete_thread(thread_id).await
226        } else {
227            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
228            Ok(())
229        }
230    }
231
232    /// List all threads with saved state.
233    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
234        if let Some(ref checkpointer) = self.checkpointer {
235            checkpointer.list_threads().await
236        } else {
237            Ok(Vec::new())
238        }
239    }
240
241    async fn execute_tool(
242        &self,
243        tool: ToolBox,
244        _tool_name: String,
245        payload: Value,
246    ) -> anyhow::Result<AgentMessage> {
247        let state_snapshot = self.state.read().unwrap().clone();
248        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
249
250        let result = tool.execute(payload, ctx).await?;
251        Ok(self.apply_tool_result(result))
252    }
253
254    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
255        match result {
256            ToolResult::Message(message) => {
257                // Tool results are not added to conversation history
258                // Only the final LLM response after tool execution is added
259                message
260            }
261            ToolResult::WithStateUpdate {
262                message,
263                state_diff,
264            } => {
265                // Check if todos were updated
266                let todos_updated = state_diff.todos.is_some();
267
268                if let Ok(mut state) = self.state.write() {
269                    let command = agents_core::command::Command::with_state(state_diff);
270                    command.apply_to(&mut state);
271
272                    // Emit TodosUpdated event if todos were modified
273                    if todos_updated {
274                        let (pending_count, in_progress_count, completed_count) =
275                            count_todos(&state.todos);
276
277                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
278                            agents_core::events::TodosUpdatedEvent {
279                                metadata: self.create_event_metadata(),
280                                todos: state.todos.clone(),
281                                pending_count,
282                                in_progress_count,
283                                completed_count,
284                                last_updated: chrono::Utc::now().to_rfc3339(),
285                            },
286                        ));
287
288                        tracing::debug!(
289                            pending = pending_count,
290                            in_progress = in_progress_count,
291                            completed = completed_count,
292                            total = state.todos.len(),
293                            "πŸ“ Todos updated and event emitted"
294                        );
295                    }
296                }
297                // Tool results are not added to conversation history
298                // Only the final LLM response after tool execution is added
299                message
300            }
301        }
302    }
303
304    /// Get the current pending interrupt, if any.
305    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
306        self.state
307            .read()
308            .ok()
309            .and_then(|guard| guard.pending_interrupts.first().cloned())
310    }
311
312    /// Add a broadcaster dynamically to the agent's event dispatcher.
313    ///
314    /// Add a single broadcaster dynamically after the agent is built.
315    ///
316    /// This is useful for per-conversation or per-customer broadcasters.
317    ///
318    /// # Example
319    /// ```no_run
320    /// use std::sync::Arc;
321    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
322    /// ```
323    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
324        if let Some(dispatcher) = &self.event_dispatcher {
325            dispatcher.add_broadcaster(broadcaster);
326            tracing::debug!("Broadcaster added to event dispatcher");
327        } else {
328            tracing::warn!("add_broadcaster called but no event dispatcher configured");
329        }
330    }
331
332    /// Add multiple broadcasters at once.
333    ///
334    /// This is useful when you need to add several broadcasters for a conversation
335    /// (e.g., WhatsApp, SSE, DynamoDB).
336    ///
337    /// # Example
338    /// ```no_run
339    /// use std::sync::Arc;
340    /// // agent.add_broadcasters(vec![
341    /// //     Arc::new(WhatsAppBroadcaster::new(phone)),
342    /// //     Arc::new(SseBroadcaster::new(channel)),
343    /// //     Arc::new(DynamoDbBroadcaster::new(table)),
344    /// // ]);
345    /// ```
346    pub fn add_broadcasters(
347        &self,
348        broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
349    ) {
350        if let Some(dispatcher) = &self.event_dispatcher {
351            for broadcaster in broadcasters {
352                dispatcher.add_broadcaster(broadcaster);
353            }
354            tracing::debug!("Multiple broadcasters added to event dispatcher");
355        } else {
356            tracing::warn!("add_broadcasters called but no event dispatcher configured");
357        }
358    }
359
360    /// Resume execution after human approval of an interrupt.
361    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
362        // Get the first pending interrupt
363        let interrupt = {
364            let state_guard = self
365                .state
366                .read()
367                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
368            state_guard
369                .pending_interrupts
370                .first()
371                .cloned()
372                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
373        };
374
375        let result_message = match action {
376            HitlAction::Accept => {
377                // Execute with original args
378                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
379                tracing::info!(
380                    tool_name = %hitl.tool_name,
381                    call_id = %hitl.call_id,
382                    "βœ… HITL: Tool approved, executing with original arguments"
383                );
384
385                let tools = self.collect_tools();
386                let tool = tools
387                    .get(&hitl.tool_name)
388                    .cloned()
389                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
390
391                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
392                    .await?
393            }
394
395            HitlAction::Edit {
396                tool_name,
397                tool_args,
398            } => {
399                // Execute with modified args
400                tracing::info!(
401                    tool_name = %tool_name,
402                    "✏️ HITL: Tool edited, executing with modified arguments"
403                );
404
405                let tools = self.collect_tools();
406                let tool = tools
407                    .get(&tool_name)
408                    .cloned()
409                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
410
411                self.execute_tool(tool, tool_name, tool_args).await?
412            }
413
414            HitlAction::Reject { reason } => {
415                // Don't execute - return rejection message
416                tracing::info!("❌ HITL: Tool rejected");
417
418                let text = reason
419                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
420
421                let message = AgentMessage {
422                    role: MessageRole::Tool,
423                    content: MessageContent::Text(text),
424                    metadata: None,
425                };
426
427                self.append_history(message.clone());
428                message
429            }
430
431            HitlAction::Respond { message } => {
432                // Don't execute - return custom message
433                tracing::info!("πŸ’¬ HITL: Custom response provided");
434
435                self.append_history(message.clone());
436                message
437            }
438        };
439
440        // Clear the interrupt from state
441        {
442            let mut state_guard = self
443                .state
444                .write()
445                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
446            state_guard.clear_interrupts();
447        }
448
449        // Persist cleared state
450        if let Some(checkpointer) = &self.checkpointer {
451            let state_clone = self
452                .state
453                .read()
454                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
455                .clone();
456            checkpointer
457                .save_state(&ThreadId::default(), &state_clone)
458                .await?;
459        }
460
461        Ok(result_message)
462    }
463
464    /// Handle message from string input - converts string to AgentMessage internally
465    pub async fn handle_message(
466        &self,
467        input: impl AsRef<str>,
468        state: Arc<AgentStateSnapshot>,
469    ) -> anyhow::Result<AgentMessage> {
470        self.handle_message_with_metadata(input, None, state).await
471    }
472
473    /// Handle message from string input with metadata - converts string to AgentMessage internally
474    pub async fn handle_message_with_metadata(
475        &self,
476        input: impl AsRef<str>,
477        metadata: Option<MessageMetadata>,
478        state: Arc<AgentStateSnapshot>,
479    ) -> anyhow::Result<AgentMessage> {
480        let agent_message = AgentMessage {
481            role: MessageRole::User,
482            content: MessageContent::Text(input.as_ref().to_string()),
483            metadata,
484        };
485        self.handle_message_internal(agent_message, state).await
486    }
487
488    /// Internal method that contains the actual message handling logic
489    async fn handle_message_internal(
490        &self,
491        input: AgentMessage,
492        loaded_state: Arc<AgentStateSnapshot>,
493    ) -> anyhow::Result<AgentMessage> {
494        let start_time = std::time::Instant::now();
495
496        // Initialize internal state with loaded state from checkpointer
497        // This ensures conversation context is maintained across sessions
498        if let Ok(mut state_guard) = self.state.write() {
499            *state_guard = (*loaded_state).clone();
500        }
501
502        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
503            agents_core::events::AgentStartedEvent {
504                metadata: self.create_event_metadata(),
505                agent_name: self.descriptor.name.clone(),
506                message_preview: self.truncate_message(&input),
507            },
508        ));
509
510        self.append_history(input.clone());
511
512        // ReAct loop: continue until LLM responds with text (not tool calls)
513        let max_iterations = self.max_iterations.get();
514        let mut iteration = 0;
515
516        loop {
517            iteration += 1;
518            if iteration > max_iterations {
519                tracing::warn!(
520                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
521                    max_iterations
522                );
523                let response = AgentMessage {
524                    role: MessageRole::Agent,
525                    content: MessageContent::Text(
526                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
527                    ),
528                    metadata: None,
529                };
530                self.append_history(response.clone());
531                return Ok(response);
532            }
533
534            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
535
536            // Build request with current history
537            let mut request = ModelRequest::new(&self.instructions, self.current_history());
538            let tools = self.collect_tools();
539            for middleware in &self.middlewares {
540                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
541                middleware.modify_model_request(&mut ctx).await?;
542            }
543
544            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
545            let context = PlannerContext {
546                history: request.messages.clone(),
547                system_prompt: request.system_prompt.clone(),
548                tools: tool_schemas,
549            };
550            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
551
552            // Ask LLM what to do
553            let decision = self.planner.plan(context, state_snapshot).await?;
554
555            // Emit PlanningComplete event
556            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
557                agents_core::events::PlanningCompleteEvent {
558                    metadata: self.create_event_metadata(),
559                    action_type: match &decision.next_action {
560                        PlannerAction::Respond { .. } => "respond".to_string(),
561                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
562                        PlannerAction::Terminate => "terminate".to_string(),
563                    },
564                    action_summary: match &decision.next_action {
565                        PlannerAction::Respond { message } => {
566                            format!("Respond: {}", self.truncate_message(message))
567                        }
568                        PlannerAction::CallTool { tool_name, .. } => {
569                            format!("Call tool: {}", tool_name)
570                        }
571                        PlannerAction::Terminate => "Terminate".to_string(),
572                    },
573                },
574            ));
575
576            match decision.next_action {
577                PlannerAction::Respond { message } => {
578                    // LLM decided to respond with text - exit loop
579                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
580                        agents_core::events::AgentCompletedEvent {
581                            metadata: self.create_event_metadata(),
582                            agent_name: self.descriptor.name.clone(),
583                            duration_ms: start_time.elapsed().as_millis() as u64,
584                            response_preview: self.truncate_message(&message),
585                            response: self.get_full_message_text(&message),
586                        },
587                    ));
588
589                    self.append_history(message.clone());
590                    return Ok(message);
591                }
592                PlannerAction::CallTool { tool_name, payload } => {
593                    // Add AI's decision to call tool to history
594                    // This is needed for OpenAI's API which expects:
595                    // 1. Assistant message with tool call
596                    // 2. Tool message with result
597                    // Using System role to avoid LLM echoing this back as a response
598                    let tool_call_message = AgentMessage {
599                        role: MessageRole::System,
600                        content: MessageContent::Text(format!(
601                            "Calling tool: {} with args: {}",
602                            tool_name,
603                            serde_json::to_string(&payload).unwrap_or_default()
604                        )),
605                        metadata: None,
606                    };
607                    self.append_history(tool_call_message);
608
609                    if let Some(tool) = tools.get(&tool_name).cloned() {
610                        // Check all middleware for interrupts before executing tool
611                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
612                        for middleware in &self.middlewares {
613                            if let Some(interrupt) = middleware
614                                .before_tool_execution(&tool_name, &payload, &call_id)
615                                .await?
616                            {
617                                // Save interrupt to state
618                                {
619                                    let mut state_guard = self.state.write().map_err(|_| {
620                                        anyhow::anyhow!("Failed to acquire write lock on state")
621                                    })?;
622                                    state_guard.add_interrupt(interrupt.clone());
623                                }
624
625                                // Persist state with checkpointer
626                                if let Some(checkpointer) = &self.checkpointer {
627                                    let state_clone = self
628                                        .state
629                                        .read()
630                                        .map_err(|_| {
631                                            anyhow::anyhow!("Failed to acquire read lock on state")
632                                        })?
633                                        .clone();
634                                    checkpointer
635                                        .save_state(&ThreadId::default(), &state_clone)
636                                        .await?;
637                                }
638
639                                // Return interrupt message - execution pauses here
640                                let interrupt_message = AgentMessage {
641                                    role: MessageRole::System,
642                                    content: MessageContent::Text(format!(
643                                        "⏸️ Execution paused: Tool '{}' requires human approval",
644                                        tool_name
645                                    )),
646                                    metadata: None,
647                                };
648                                self.append_history(interrupt_message.clone());
649                                return Ok(interrupt_message);
650                            }
651                        }
652
653                        // No interrupt - execute tool
654                        let tool_start_time = std::time::Instant::now();
655
656                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
657                            agents_core::events::ToolStartedEvent {
658                                metadata: self.create_event_metadata(),
659                                tool_name: tool_name.clone(),
660                                input_summary: self.summarize_payload(&payload),
661                            },
662                        ));
663
664                        tracing::warn!(
665                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
666                            tool_name,
667                            serde_json::to_string(&payload)
668                                .unwrap_or_else(|_| "invalid json".to_string())
669                        );
670
671                        let result = self
672                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
673                            .await;
674
675                        let duration = tool_start_time.elapsed();
676                        match result {
677                            Ok(tool_result_message) => {
678                                let content_preview = match &tool_result_message.content {
679                                    MessageContent::Text(t) => {
680                                        if t.len() > 100 {
681                                            let truncated: String = t.chars().take(100).collect();
682                                            format!(
683                                                "{}... ({} chars)",
684                                                truncated,
685                                                t.chars().count()
686                                            )
687                                        } else {
688                                            t.clone()
689                                        }
690                                    }
691                                    MessageContent::Json(v) => {
692                                        format!("JSON: {} bytes", v.to_string().len())
693                                    }
694                                };
695
696                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
697                                    agents_core::events::ToolCompletedEvent {
698                                        metadata: self.create_event_metadata(),
699                                        tool_name: tool_name.clone(),
700                                        duration_ms: duration.as_millis() as u64,
701                                        result_summary: content_preview.clone(),
702                                        success: true,
703                                    },
704                                ));
705
706                                tracing::warn!(
707                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
708                                    tool_name,
709                                    duration,
710                                    content_preview
711                                );
712
713                                // Add tool result to history and continue ReAct loop
714                                self.append_history(tool_result_message);
715                                // Loop continues - LLM will see tool result and decide next action
716                            }
717                            Err(e) => {
718                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
719                                    agents_core::events::ToolFailedEvent {
720                                        metadata: self.create_event_metadata(),
721                                        tool_name: tool_name.clone(),
722                                        duration_ms: duration.as_millis() as u64,
723                                        error_message: e.to_string(),
724                                        is_recoverable: true,
725                                        retry_count: 0,
726                                    },
727                                ));
728
729                                tracing::error!(
730                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
731                                    tool_name,
732                                    duration,
733                                    e
734                                );
735
736                                // Add error to history and continue - let LLM handle the error
737                                let error_message = AgentMessage {
738                                    role: MessageRole::Tool,
739                                    content: MessageContent::Text(format!(
740                                        "Error executing {}: {}",
741                                        tool_name, e
742                                    )),
743                                    metadata: None,
744                                };
745                                self.append_history(error_message);
746                                // Loop continues - LLM will see error and decide how to handle it
747                            }
748                        }
749                    } else {
750                        // Tool not found - add error to history and continue
751                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
752                        let error_message = AgentMessage {
753                            role: MessageRole::Tool,
754                            content: MessageContent::Text(format!(
755                                "Tool '{}' not found. Available tools: {}",
756                                tool_name,
757                                tools
758                                    .keys()
759                                    .map(|k| k.as_str())
760                                    .collect::<Vec<_>>()
761                                    .join(", ")
762                            )),
763                            metadata: None,
764                        };
765                        self.append_history(error_message);
766                        // Loop continues - LLM will see error and try something else
767                    }
768                }
769                PlannerAction::Terminate => {
770                    // LLM decided to terminate - exit loop
771                    tracing::debug!("πŸ›‘ Agent terminated");
772                    let message = AgentMessage {
773                        role: MessageRole::Agent,
774                        content: MessageContent::Text("Task completed.".into()),
775                        metadata: None,
776                    };
777                    self.append_history(message.clone());
778                    return Ok(message);
779                }
780            }
781        }
782    }
783}
784
785#[async_trait]
786impl AgentHandle for DeepAgent {
787    async fn describe(&self) -> AgentDescriptor {
788        self.descriptor.clone()
789    }
790
791    async fn handle_message(
792        &self,
793        input: AgentMessage,
794        _state: Arc<AgentStateSnapshot>,
795    ) -> anyhow::Result<AgentMessage> {
796        let response = self.handle_message_internal(input, _state).await?;
797
798        // Persist state to checkpointer after successful message handling
799        if let Some(checkpointer) = &self.checkpointer {
800            let state_clone = self
801                .state
802                .read()
803                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
804                .clone();
805            checkpointer
806                .save_state(&ThreadId::default(), &state_clone)
807                .await?;
808        }
809
810        Ok(response)
811    }
812
813    async fn handle_message_stream(
814        &self,
815        input: AgentMessage,
816        _state: Arc<AgentStateSnapshot>,
817    ) -> anyhow::Result<agents_core::agent::AgentStream> {
818        use crate::planner::LlmBackedPlanner;
819        use agents_core::llm::{LlmRequest, StreamChunk};
820        use futures::StreamExt;
821
822        // Add input to history
823        self.append_history(input.clone());
824
825        // Build the request similar to handle_message_internal
826        let mut request = ModelRequest::new(&self.instructions, self.current_history());
827        let tools = self.collect_tools();
828
829        // Apply middleware modifications
830        for middleware in &self.middlewares {
831            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
832            middleware.modify_model_request(&mut ctx).await?;
833        }
834
835        // Convert ModelRequest to LlmRequest and add tools
836        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
837        let llm_request = LlmRequest {
838            system_prompt: request.system_prompt.clone(),
839            messages: request.messages.clone(),
840            tools: tool_schemas,
841        };
842
843        // Try to get the underlying LLM model for streaming
844        let planner_any = self.planner.as_any();
845
846        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
847            // We have an LlmBackedPlanner, use its model for streaming
848            let model = llm_planner.model().clone();
849            let stream = model.generate_stream(llm_request).await?;
850
851            // Wrap stream to emit events to broadcasters
852            let agent_name = self.descriptor.name.clone();
853            let event_dispatcher = self.event_dispatcher.clone();
854
855            let wrapped_stream = stream.then(move |chunk_result| {
856                let dispatcher = event_dispatcher.clone();
857                let name = agent_name.clone();
858
859                async move {
860                    match &chunk_result {
861                        Ok(StreamChunk::TextDelta(token)) => {
862                            // Emit streaming token event
863                            if let Some(ref dispatcher) = dispatcher {
864                                let event = agents_core::events::AgentEvent::StreamingToken(
865                                    agents_core::events::StreamingTokenEvent {
866                                        metadata: agents_core::events::EventMetadata::new(
867                                            "default".to_string(),
868                                            uuid::Uuid::new_v4().to_string(),
869                                            None,
870                                        ),
871                                        agent_name: name.clone(),
872                                        token: token.clone(),
873                                    },
874                                );
875                                dispatcher.dispatch(event).await;
876                            }
877                        }
878                        Ok(StreamChunk::Done { message }) => {
879                            // Emit agent completed event
880                            if let Some(ref dispatcher) = dispatcher {
881                                let full_text = match &message.content {
882                                    agents_core::messaging::MessageContent::Text(t) => t.clone(),
883                                    agents_core::messaging::MessageContent::Json(v) => {
884                                        v.to_string()
885                                    }
886                                };
887
888                                let preview = if full_text.len() > 100 {
889                                    format!("{}...", &full_text[..100])
890                                } else {
891                                    full_text.clone()
892                                };
893
894                                let event = agents_core::events::AgentEvent::AgentCompleted(
895                                    agents_core::events::AgentCompletedEvent {
896                                        metadata: agents_core::events::EventMetadata::new(
897                                            "default".to_string(),
898                                            uuid::Uuid::new_v4().to_string(),
899                                            None,
900                                        ),
901                                        agent_name: name.clone(),
902                                        duration_ms: 0, // Duration not tracked in streaming mode
903                                        response_preview: preview,
904                                        response: full_text,
905                                    },
906                                );
907                                dispatcher.dispatch(event).await;
908                            }
909                        }
910                        _ => {}
911                    }
912                    chunk_result
913                }
914            });
915
916            Ok(Box::pin(wrapped_stream))
917        } else {
918            // Fallback to non-streaming
919            let response = self.handle_message_internal(input, _state).await?;
920            Ok(Box::pin(futures::stream::once(async move {
921                Ok(StreamChunk::Done { message: response })
922            })))
923        }
924    }
925
926    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
927        let state_guard = self
928            .state
929            .read()
930            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
931        Ok(state_guard.pending_interrupts.first().cloned())
932    }
933
934    async fn resume_with_approval(
935        &self,
936        action: agents_core::hitl::HitlAction,
937    ) -> anyhow::Result<AgentMessage> {
938        self.resume_with_approval(action).await
939    }
940}
941
942/// Create a deep agent from configuration - matches Python middleware assembly exactly
943///
944/// This function assembles the middleware stack in the same order as the Python SDK:
945/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
946pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
947    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
948    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
949
950    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
951    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
952
953    // Build sub-agents from configurations
954    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
955
956    for subagent_config in &config.subagent_configs {
957        // Determine the planner for this sub-agent
958        let sub_planner = if let Some(ref model) = subagent_config.model {
959            // Sub-agent has its own model - wrap it in a planner
960            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
961        } else {
962            // Inherit parent's planner
963            config.planner.clone()
964        };
965
966        // Create a DeepAgentConfig for this sub-agent
967        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
968
969        // Inherit max_iterations from parent
970        sub_cfg = sub_cfg.with_max_iterations(config.max_iterations.get());
971
972        // Configure tools
973        if let Some(ref tools) = subagent_config.tools {
974            tracing::debug!(
975                "  - Configuring {} tools for {}",
976                tools.len(),
977                subagent_config.name
978            );
979            for tool in tools {
980                sub_cfg = sub_cfg.with_tool(tool.clone());
981            }
982        }
983
984        // Configure built-in tools
985        if let Some(ref builtin) = subagent_config.builtin_tools {
986            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
987        }
988
989        // Sub-agents should not have their own sub-agents
990        sub_cfg = sub_cfg.with_auto_general_purpose(false);
991
992        // Configure prompt caching
993        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
994
995        // Inherit PII sanitization setting from parent
996        sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
997
998        // Build the sub-agent recursively
999        let sub_agent = create_deep_agent_from_config(sub_cfg);
1000
1001        // Register the sub-agent
1002        registrations.push(SubAgentRegistration {
1003            descriptor: SubAgentDescriptor {
1004                name: subagent_config.name.clone(),
1005                description: subagent_config.description.clone(),
1006            },
1007            agent: Arc::new(sub_agent),
1008        });
1009
1010        tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
1011    }
1012
1013    tracing::info!("=> Total sub-agents registered: {}", registrations.len());
1014
1015    // Optionally inject a general-purpose subagent
1016    if config.auto_general_purpose {
1017        let has_gp = registrations
1018            .iter()
1019            .any(|r| r.descriptor.name == "general-purpose");
1020        if !has_gp {
1021            // Create a subagent with inherited planner/tools and same instructions
1022            let mut sub_cfg =
1023                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1024                    .with_auto_general_purpose(false)
1025                    .with_prompt_caching(config.enable_prompt_caching)
1026                    .with_pii_sanitization(config.enable_pii_sanitization)
1027                    .with_max_iterations(config.max_iterations.get());
1028            if let Some(ref selected) = config.builtin_tools {
1029                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1030            }
1031            if let Some(ref sum) = config.summarization {
1032                sub_cfg = sub_cfg.with_summarization(sum.clone());
1033            }
1034            for t in &config.tools {
1035                sub_cfg = sub_cfg.with_tool(t.clone());
1036            }
1037
1038            let gp = create_deep_agent_from_config(sub_cfg);
1039            registrations.push(SubAgentRegistration {
1040                descriptor: SubAgentDescriptor {
1041                    name: "general-purpose".into(),
1042                    description: "Default reasoning agent".into(),
1043                },
1044                agent: Arc::new(gp),
1045            });
1046        }
1047    }
1048
1049    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1050        registrations,
1051        config.event_dispatcher.clone(),
1052    ));
1053    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1054
1055    // Create Deep Agent prompt middleware - use override if custom system prompt is set
1056    let deep_agent_prompt: Arc<dyn AgentMiddleware> =
1057        if let Some(ref custom_prompt) = config.custom_system_prompt {
1058            Arc::new(DeepAgentPromptMiddleware::with_override(
1059                custom_prompt.clone(),
1060            ))
1061        } else {
1062            Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()))
1063        };
1064    let summarization = config.summarization.as_ref().map(|cfg| {
1065        Arc::new(SummarizationMiddleware::new(
1066            cfg.messages_to_keep,
1067            cfg.summary_note.clone(),
1068        ))
1069    });
1070    let hitl = if config.tool_interrupts.is_empty() {
1071        None
1072    } else {
1073        // Validate that checkpointer is configured when HITL is enabled
1074        if config.checkpointer.is_none() {
1075            tracing::error!(
1076                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
1077                 HITL will be disabled. Please configure a checkpointer to enable HITL."
1078            );
1079            None
1080        } else {
1081            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
1082            Some(Arc::new(HumanInLoopMiddleware::new(
1083                config.tool_interrupts.clone(),
1084            )))
1085        }
1086    };
1087
1088    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
1089    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
1090    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1091        base_prompt,
1092        deep_agent_prompt,
1093        planning,
1094        filesystem,
1095        subagent,
1096    ];
1097    if let Some(ref summary) = summarization {
1098        middlewares.push(summary.clone());
1099    }
1100    if config.enable_prompt_caching {
1101        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1102    }
1103    if let Some(ref hitl_mw) = hitl {
1104        middlewares.push(hitl_mw.clone());
1105    }
1106
1107    DeepAgent {
1108        descriptor: AgentDescriptor {
1109            name: "deep-agent".into(),
1110            version: "0.0.1".into(),
1111            description: Some("Rust deep agent".into()),
1112        },
1113        instructions: config.instructions,
1114        planner: config.planner,
1115        middlewares,
1116        base_tools: config.tools,
1117        state,
1118        history,
1119        _summarization: summarization,
1120        _hitl: hitl,
1121        builtin_tools: config.builtin_tools,
1122        checkpointer: config.checkpointer,
1123        event_dispatcher: config.event_dispatcher,
1124        enable_pii_sanitization: config.enable_pii_sanitization,
1125        max_iterations: config.max_iterations,
1126    }
1127}