agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66}
67
68impl DeepAgent {
69    fn collect_tools(&self) -> HashMap<String, ToolBox> {
70        let mut tools: HashMap<String, ToolBox> = HashMap::new();
71        for tool in &self.base_tools {
72            tools.insert(tool.schema().name.clone(), tool.clone());
73        }
74        for middleware in &self.middlewares {
75            for tool in middleware.tools() {
76                let tool_name = tool.schema().name.clone();
77                if self.should_include(&tool_name) {
78                    tools.insert(tool_name, tool);
79                }
80            }
81        }
82        tools
83    }
84    // no streaming path in baseline
85
86    fn should_include(&self, name: &str) -> bool {
87        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
88        if !is_builtin {
89            return true;
90        }
91        match &self.builtin_tools {
92            None => true,
93            Some(selected) => selected.contains(name),
94        }
95    }
96
97    fn append_history(&self, message: AgentMessage) {
98        if let Ok(mut history) = self.history.write() {
99            history.push(message);
100        }
101    }
102
103    fn current_history(&self) -> Vec<AgentMessage> {
104        self.history.read().map(|h| h.clone()).unwrap_or_default()
105    }
106
107    fn emit_event(&self, event: agents_core::events::AgentEvent) {
108        if let Some(dispatcher) = &self.event_dispatcher {
109            let dispatcher_clone = dispatcher.clone();
110            tokio::spawn(async move {
111                dispatcher_clone.dispatch(event).await;
112            });
113        }
114    }
115
116    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
117        agents_core::events::EventMetadata::new(
118            "default".to_string(),
119            uuid::Uuid::new_v4().to_string(),
120            None,
121        )
122    }
123
124    fn truncate_message(&self, message: &AgentMessage) -> String {
125        let text = match &message.content {
126            MessageContent::Text(t) => t.clone(),
127            MessageContent::Json(v) => v.to_string(),
128        };
129        if text.len() > 100 {
130            format!("{}...", &text[..100])
131        } else {
132            text
133        }
134    }
135
136    fn summarize_payload(&self, payload: &Value) -> String {
137        let json_str = payload.to_string();
138        if json_str.len() > 100 {
139            format!("{}...", &json_str[..100])
140        } else {
141            json_str
142        }
143    }
144
145    /// Save the current agent state to the configured checkpointer.
146    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
147        if let Some(ref checkpointer) = self.checkpointer {
148            let state = self
149                .state
150                .read()
151                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
152                .clone();
153
154            // Calculate state size before saving
155            let state_json = serde_json::to_string(&state)?;
156            let state_size = state_json.len();
157
158            // Save state to checkpointer
159            checkpointer.save_state(thread_id, &state).await?;
160
161            // Emit StateCheckpointed event after successful save
162            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
163                agents_core::events::StateCheckpointedEvent {
164                    metadata: self.create_event_metadata(),
165                    checkpoint_id: thread_id.to_string(),
166                    state_size_bytes: state_size,
167                },
168            ));
169
170            tracing::debug!(
171                thread_id = %thread_id,
172                state_size_bytes = state_size,
173                "πŸ’Ύ State checkpointed and event emitted"
174            );
175
176            Ok(())
177        } else {
178            tracing::warn!("Attempted to save state but no checkpointer is configured");
179            Ok(())
180        }
181    }
182
183    /// Load agent state from the configured checkpointer.
184    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
185        if let Some(ref checkpointer) = self.checkpointer {
186            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
187                *self
188                    .state
189                    .write()
190                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
191                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
192                Ok(true)
193            } else {
194                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
195                Ok(false)
196            }
197        } else {
198            tracing::warn!("Attempted to load state but no checkpointer is configured");
199            Ok(false)
200        }
201    }
202
203    /// Delete saved state for the specified thread.
204    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
205        if let Some(ref checkpointer) = self.checkpointer {
206            checkpointer.delete_thread(thread_id).await
207        } else {
208            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
209            Ok(())
210        }
211    }
212
213    /// List all threads with saved state.
214    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
215        if let Some(ref checkpointer) = self.checkpointer {
216            checkpointer.list_threads().await
217        } else {
218            Ok(Vec::new())
219        }
220    }
221
222    async fn execute_tool(
223        &self,
224        tool: ToolBox,
225        _tool_name: String,
226        payload: Value,
227    ) -> anyhow::Result<AgentMessage> {
228        let state_snapshot = self.state.read().unwrap().clone();
229        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
230
231        let result = tool.execute(payload, ctx).await?;
232        Ok(self.apply_tool_result(result))
233    }
234
235    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
236        match result {
237            ToolResult::Message(message) => {
238                // Tool results are not added to conversation history
239                // Only the final LLM response after tool execution is added
240                message
241            }
242            ToolResult::WithStateUpdate {
243                message,
244                state_diff,
245            } => {
246                // Check if todos were updated
247                let todos_updated = state_diff.todos.is_some();
248
249                if let Ok(mut state) = self.state.write() {
250                    let command = agents_core::command::Command::with_state(state_diff);
251                    command.apply_to(&mut state);
252
253                    // Emit TodosUpdated event if todos were modified
254                    if todos_updated {
255                        let (pending_count, in_progress_count, completed_count) =
256                            count_todos(&state.todos);
257
258                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
259                            agents_core::events::TodosUpdatedEvent {
260                                metadata: self.create_event_metadata(),
261                                todos: state.todos.clone(),
262                                pending_count,
263                                in_progress_count,
264                                completed_count,
265                                last_updated: chrono::Utc::now().to_rfc3339(),
266                            },
267                        ));
268
269                        tracing::debug!(
270                            pending = pending_count,
271                            in_progress = in_progress_count,
272                            completed = completed_count,
273                            total = state.todos.len(),
274                            "πŸ“ Todos updated and event emitted"
275                        );
276                    }
277                }
278                // Tool results are not added to conversation history
279                // Only the final LLM response after tool execution is added
280                message
281            }
282        }
283    }
284
285    /// Get the current pending interrupt, if any.
286    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
287        self.state
288            .read()
289            .ok()
290            .and_then(|guard| guard.pending_interrupts.first().cloned())
291    }
292
293    /// Add a broadcaster dynamically to the agent's event dispatcher.
294    ///
295    /// This allows adding broadcasters after the agent is built, which is useful
296    /// for per-conversation or per-customer broadcasters.
297    ///
298    /// # Example
299    /// ```no_run
300    /// use std::sync::Arc;
301    /// // agent.add_broadcaster(Arc::new(MyBroadcaster::new()));
302    /// ```
303    pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
304        if let Some(dispatcher) = &self.event_dispatcher {
305            dispatcher.add_broadcaster(broadcaster);
306            tracing::debug!("Broadcaster added to event dispatcher");
307        } else {
308            tracing::warn!("add_broadcaster called but no event dispatcher configured");
309        }
310    }
311
312    /// Resume execution after human approval of an interrupt.
313    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
314        // Get the first pending interrupt
315        let interrupt = {
316            let state_guard = self
317                .state
318                .read()
319                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
320            state_guard
321                .pending_interrupts
322                .first()
323                .cloned()
324                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
325        };
326
327        let result_message = match action {
328            HitlAction::Accept => {
329                // Execute with original args
330                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
331                tracing::info!(
332                    tool_name = %hitl.tool_name,
333                    call_id = %hitl.call_id,
334                    "βœ… HITL: Tool approved, executing with original arguments"
335                );
336
337                let tools = self.collect_tools();
338                let tool = tools
339                    .get(&hitl.tool_name)
340                    .cloned()
341                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
342
343                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
344                    .await?
345            }
346
347            HitlAction::Edit {
348                tool_name,
349                tool_args,
350            } => {
351                // Execute with modified args
352                tracing::info!(
353                    tool_name = %tool_name,
354                    "✏️ HITL: Tool edited, executing with modified arguments"
355                );
356
357                let tools = self.collect_tools();
358                let tool = tools
359                    .get(&tool_name)
360                    .cloned()
361                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
362
363                self.execute_tool(tool, tool_name, tool_args).await?
364            }
365
366            HitlAction::Reject { reason } => {
367                // Don't execute - return rejection message
368                tracing::info!("❌ HITL: Tool rejected");
369
370                let text = reason
371                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
372
373                let message = AgentMessage {
374                    role: MessageRole::Tool,
375                    content: MessageContent::Text(text),
376                    metadata: None,
377                };
378
379                self.append_history(message.clone());
380                message
381            }
382
383            HitlAction::Respond { message } => {
384                // Don't execute - return custom message
385                tracing::info!("πŸ’¬ HITL: Custom response provided");
386
387                self.append_history(message.clone());
388                message
389            }
390        };
391
392        // Clear the interrupt from state
393        {
394            let mut state_guard = self
395                .state
396                .write()
397                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
398            state_guard.clear_interrupts();
399        }
400
401        // Persist cleared state
402        if let Some(checkpointer) = &self.checkpointer {
403            let state_clone = self
404                .state
405                .read()
406                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
407                .clone();
408            checkpointer
409                .save_state(&ThreadId::default(), &state_clone)
410                .await?;
411        }
412
413        Ok(result_message)
414    }
415
416    /// Handle message from string input - converts string to AgentMessage internally
417    pub async fn handle_message(
418        &self,
419        input: impl AsRef<str>,
420        state: Arc<AgentStateSnapshot>,
421    ) -> anyhow::Result<AgentMessage> {
422        self.handle_message_with_metadata(input, None, state).await
423    }
424
425    /// Handle message from string input with metadata - converts string to AgentMessage internally
426    pub async fn handle_message_with_metadata(
427        &self,
428        input: impl AsRef<str>,
429        metadata: Option<MessageMetadata>,
430        state: Arc<AgentStateSnapshot>,
431    ) -> anyhow::Result<AgentMessage> {
432        let agent_message = AgentMessage {
433            role: MessageRole::User,
434            content: MessageContent::Text(input.as_ref().to_string()),
435            metadata,
436        };
437        self.handle_message_internal(agent_message, state).await
438    }
439
440    /// Internal method that contains the actual message handling logic
441    async fn handle_message_internal(
442        &self,
443        input: AgentMessage,
444        _state: Arc<AgentStateSnapshot>,
445    ) -> anyhow::Result<AgentMessage> {
446        let start_time = std::time::Instant::now();
447
448        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
449            agents_core::events::AgentStartedEvent {
450                metadata: self.create_event_metadata(),
451                agent_name: self.descriptor.name.clone(),
452                message_preview: self.truncate_message(&input),
453            },
454        ));
455
456        self.append_history(input.clone());
457
458        // ReAct loop: continue until LLM responds with text (not tool calls)
459        let max_iterations = 10;
460        let mut iteration = 0;
461
462        loop {
463            iteration += 1;
464            if iteration > max_iterations {
465                tracing::warn!(
466                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
467                    max_iterations
468                );
469                let response = AgentMessage {
470                    role: MessageRole::Agent,
471                    content: MessageContent::Text(
472                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
473                    ),
474                    metadata: None,
475                };
476                self.append_history(response.clone());
477                return Ok(response);
478            }
479
480            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
481
482            // Build request with current history
483            let mut request = ModelRequest::new(&self.instructions, self.current_history());
484            let tools = self.collect_tools();
485            for middleware in &self.middlewares {
486                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
487                middleware.modify_model_request(&mut ctx).await?;
488            }
489
490            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
491            let context = PlannerContext {
492                history: request.messages.clone(),
493                system_prompt: request.system_prompt.clone(),
494                tools: tool_schemas,
495            };
496            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
497
498            // Ask LLM what to do
499            let decision = self.planner.plan(context, state_snapshot).await?;
500
501            // Emit PlanningComplete event
502            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
503                agents_core::events::PlanningCompleteEvent {
504                    metadata: self.create_event_metadata(),
505                    action_type: match &decision.next_action {
506                        PlannerAction::Respond { .. } => "respond".to_string(),
507                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
508                        PlannerAction::Terminate => "terminate".to_string(),
509                    },
510                    action_summary: match &decision.next_action {
511                        PlannerAction::Respond { message } => {
512                            format!("Respond: {}", self.truncate_message(message))
513                        }
514                        PlannerAction::CallTool { tool_name, .. } => {
515                            format!("Call tool: {}", tool_name)
516                        }
517                        PlannerAction::Terminate => "Terminate".to_string(),
518                    },
519                },
520            ));
521
522            match decision.next_action {
523                PlannerAction::Respond { message } => {
524                    // LLM decided to respond with text - exit loop
525                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
526                        agents_core::events::AgentCompletedEvent {
527                            metadata: self.create_event_metadata(),
528                            agent_name: self.descriptor.name.clone(),
529                            duration_ms: start_time.elapsed().as_millis() as u64,
530                            response_preview: self.truncate_message(&message),
531                        },
532                    ));
533
534                    self.append_history(message.clone());
535                    return Ok(message);
536                }
537                PlannerAction::CallTool { tool_name, payload } => {
538                    // Add AI's decision to call tool to history
539                    // This is needed for OpenAI's API which expects:
540                    // 1. Assistant message with tool call
541                    // 2. Tool message with result
542                    let tool_call_message = AgentMessage {
543                        role: MessageRole::Agent,
544                        content: MessageContent::Text(format!(
545                            "Calling tool: {} with args: {}",
546                            tool_name,
547                            serde_json::to_string(&payload).unwrap_or_default()
548                        )),
549                        metadata: None,
550                    };
551                    self.append_history(tool_call_message);
552
553                    if let Some(tool) = tools.get(&tool_name).cloned() {
554                        // Check all middleware for interrupts before executing tool
555                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
556                        for middleware in &self.middlewares {
557                            if let Some(interrupt) = middleware
558                                .before_tool_execution(&tool_name, &payload, &call_id)
559                                .await?
560                            {
561                                // Save interrupt to state
562                                {
563                                    let mut state_guard = self.state.write().map_err(|_| {
564                                        anyhow::anyhow!("Failed to acquire write lock on state")
565                                    })?;
566                                    state_guard.add_interrupt(interrupt.clone());
567                                }
568
569                                // Persist state with checkpointer
570                                if let Some(checkpointer) = &self.checkpointer {
571                                    let state_clone = self
572                                        .state
573                                        .read()
574                                        .map_err(|_| {
575                                            anyhow::anyhow!("Failed to acquire read lock on state")
576                                        })?
577                                        .clone();
578                                    checkpointer
579                                        .save_state(&ThreadId::default(), &state_clone)
580                                        .await?;
581                                }
582
583                                // Return interrupt message - execution pauses here
584                                let interrupt_message = AgentMessage {
585                                    role: MessageRole::System,
586                                    content: MessageContent::Text(format!(
587                                        "⏸️ Execution paused: Tool '{}' requires human approval",
588                                        tool_name
589                                    )),
590                                    metadata: None,
591                                };
592                                self.append_history(interrupt_message.clone());
593                                return Ok(interrupt_message);
594                            }
595                        }
596
597                        // No interrupt - execute tool
598                        let tool_start_time = std::time::Instant::now();
599
600                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
601                            agents_core::events::ToolStartedEvent {
602                                metadata: self.create_event_metadata(),
603                                tool_name: tool_name.clone(),
604                                input_summary: self.summarize_payload(&payload),
605                            },
606                        ));
607
608                        tracing::warn!(
609                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
610                            tool_name,
611                            serde_json::to_string(&payload)
612                                .unwrap_or_else(|_| "invalid json".to_string())
613                        );
614
615                        let result = self
616                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
617                            .await;
618
619                        let duration = tool_start_time.elapsed();
620                        match result {
621                            Ok(tool_result_message) => {
622                                let content_preview = match &tool_result_message.content {
623                                    MessageContent::Text(t) => {
624                                        if t.len() > 100 {
625                                            format!("{}... ({} chars)", &t[..100], t.len())
626                                        } else {
627                                            t.clone()
628                                        }
629                                    }
630                                    MessageContent::Json(v) => {
631                                        format!("JSON: {} bytes", v.to_string().len())
632                                    }
633                                };
634
635                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
636                                    agents_core::events::ToolCompletedEvent {
637                                        metadata: self.create_event_metadata(),
638                                        tool_name: tool_name.clone(),
639                                        duration_ms: duration.as_millis() as u64,
640                                        result_summary: content_preview.clone(),
641                                        success: true,
642                                    },
643                                ));
644
645                                tracing::warn!(
646                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
647                                    tool_name,
648                                    duration,
649                                    content_preview
650                                );
651
652                                // Add tool result to history and continue ReAct loop
653                                self.append_history(tool_result_message);
654                                // Loop continues - LLM will see tool result and decide next action
655                            }
656                            Err(e) => {
657                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
658                                    agents_core::events::ToolFailedEvent {
659                                        metadata: self.create_event_metadata(),
660                                        tool_name: tool_name.clone(),
661                                        duration_ms: duration.as_millis() as u64,
662                                        error_message: e.to_string(),
663                                        is_recoverable: true,
664                                        retry_count: 0,
665                                    },
666                                ));
667
668                                tracing::error!(
669                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
670                                    tool_name,
671                                    duration,
672                                    e
673                                );
674
675                                // Add error to history and continue - let LLM handle the error
676                                let error_message = AgentMessage {
677                                    role: MessageRole::Tool,
678                                    content: MessageContent::Text(format!(
679                                        "Error executing {}: {}",
680                                        tool_name, e
681                                    )),
682                                    metadata: None,
683                                };
684                                self.append_history(error_message);
685                                // Loop continues - LLM will see error and decide how to handle it
686                            }
687                        }
688                    } else {
689                        // Tool not found - add error to history and continue
690                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
691                        let error_message = AgentMessage {
692                            role: MessageRole::Tool,
693                            content: MessageContent::Text(format!(
694                                "Tool '{}' not found. Available tools: {}",
695                                tool_name,
696                                tools
697                                    .keys()
698                                    .map(|k| k.as_str())
699                                    .collect::<Vec<_>>()
700                                    .join(", ")
701                            )),
702                            metadata: None,
703                        };
704                        self.append_history(error_message);
705                        // Loop continues - LLM will see error and try something else
706                    }
707                }
708                PlannerAction::Terminate => {
709                    // LLM decided to terminate - exit loop
710                    tracing::debug!("πŸ›‘ Agent terminated");
711                    let message = AgentMessage {
712                        role: MessageRole::Agent,
713                        content: MessageContent::Text("Task completed.".into()),
714                        metadata: None,
715                    };
716                    self.append_history(message.clone());
717                    return Ok(message);
718                }
719            }
720        }
721    }
722}
723
724#[async_trait]
725impl AgentHandle for DeepAgent {
726    async fn describe(&self) -> AgentDescriptor {
727        self.descriptor.clone()
728    }
729
730    async fn handle_message(
731        &self,
732        input: AgentMessage,
733        _state: Arc<AgentStateSnapshot>,
734    ) -> anyhow::Result<AgentMessage> {
735        self.handle_message_internal(input, _state).await
736    }
737
738    async fn handle_message_stream(
739        &self,
740        input: AgentMessage,
741        _state: Arc<AgentStateSnapshot>,
742    ) -> anyhow::Result<agents_core::agent::AgentStream> {
743        use crate::planner::LlmBackedPlanner;
744        use agents_core::llm::{LlmRequest, StreamChunk};
745
746        // Add input to history
747        self.append_history(input.clone());
748
749        // Build the request similar to handle_message_internal
750        let mut request = ModelRequest::new(&self.instructions, self.current_history());
751        let tools = self.collect_tools();
752
753        // Apply middleware modifications
754        for middleware in &self.middlewares {
755            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
756            middleware.modify_model_request(&mut ctx).await?;
757        }
758
759        // Convert ModelRequest to LlmRequest and add tools
760        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
761        let llm_request = LlmRequest {
762            system_prompt: request.system_prompt.clone(),
763            messages: request.messages.clone(),
764            tools: tool_schemas,
765        };
766
767        // Try to get the underlying LLM model for streaming
768        let planner_any = self.planner.as_any();
769
770        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
771            // We have an LlmBackedPlanner, use its model for streaming
772            let model = llm_planner.model().clone();
773            let stream = model.generate_stream(llm_request).await?;
774            Ok(stream)
775        } else {
776            // Fallback to non-streaming
777            let response = self.handle_message_internal(input, _state).await?;
778            Ok(Box::pin(futures::stream::once(async move {
779                Ok(StreamChunk::Done { message: response })
780            })))
781        }
782    }
783
784    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
785        let state_guard = self
786            .state
787            .read()
788            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
789        Ok(state_guard.pending_interrupts.first().cloned())
790    }
791
792    async fn resume_with_approval(
793        &self,
794        action: agents_core::hitl::HitlAction,
795    ) -> anyhow::Result<AgentMessage> {
796        self.resume_with_approval(action).await
797    }
798}
799
800/// Create a deep agent from configuration - matches Python middleware assembly exactly
801///
802/// This function assembles the middleware stack in the same order as the Python SDK:
803/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
804pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
805    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
806    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
807
808    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
809    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
810
811    // Build sub-agents from configurations
812    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
813
814    // Build custom sub-agents from configs
815    for subagent_config in &config.subagent_configs {
816        // Determine the planner for this sub-agent
817        let sub_planner = if let Some(ref model) = subagent_config.model {
818            // Sub-agent has its own model - wrap it in a planner
819            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
820        } else {
821            // Inherit parent's planner
822            config.planner.clone()
823        };
824
825        // Create a DeepAgentConfig for this sub-agent
826        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
827
828        // Configure tools
829        if let Some(ref tools) = subagent_config.tools {
830            for tool in tools {
831                sub_cfg = sub_cfg.with_tool(tool.clone());
832            }
833        }
834
835        // Configure built-in tools
836        if let Some(ref builtin) = subagent_config.builtin_tools {
837            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
838        }
839
840        // Sub-agents should not have their own sub-agents
841        sub_cfg = sub_cfg.with_auto_general_purpose(false);
842
843        // Configure prompt caching
844        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
845
846        // Build the sub-agent recursively
847        let sub_agent = create_deep_agent_from_config(sub_cfg);
848
849        // Register the sub-agent
850        registrations.push(SubAgentRegistration {
851            descriptor: SubAgentDescriptor {
852                name: subagent_config.name.clone(),
853                description: subagent_config.description.clone(),
854            },
855            agent: Arc::new(sub_agent),
856        });
857    }
858
859    // Optionally inject a general-purpose subagent
860    if config.auto_general_purpose {
861        let has_gp = registrations
862            .iter()
863            .any(|r| r.descriptor.name == "general-purpose");
864        if !has_gp {
865            // Create a subagent with inherited planner/tools and same instructions
866            let mut sub_cfg =
867                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
868                    .with_auto_general_purpose(false)
869                    .with_prompt_caching(config.enable_prompt_caching);
870            if let Some(ref selected) = config.builtin_tools {
871                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
872            }
873            if let Some(ref sum) = config.summarization {
874                sub_cfg = sub_cfg.with_summarization(sum.clone());
875            }
876            for t in &config.tools {
877                sub_cfg = sub_cfg.with_tool(t.clone());
878            }
879
880            let gp = create_deep_agent_from_config(sub_cfg);
881            registrations.push(SubAgentRegistration {
882                descriptor: SubAgentDescriptor {
883                    name: "general-purpose".into(),
884                    description: "Default reasoning agent".into(),
885                },
886                agent: Arc::new(gp),
887            });
888        }
889    }
890
891    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
892        registrations,
893        config.event_dispatcher.clone(),
894    ));
895    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
896    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
897    let summarization = config.summarization.as_ref().map(|cfg| {
898        Arc::new(SummarizationMiddleware::new(
899            cfg.messages_to_keep,
900            cfg.summary_note.clone(),
901        ))
902    });
903    let hitl = if config.tool_interrupts.is_empty() {
904        None
905    } else {
906        // Validate that checkpointer is configured when HITL is enabled
907        if config.checkpointer.is_none() {
908            tracing::error!(
909                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
910                 HITL will be disabled. Please configure a checkpointer to enable HITL."
911            );
912            None
913        } else {
914            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
915            Some(Arc::new(HumanInLoopMiddleware::new(
916                config.tool_interrupts.clone(),
917            )))
918        }
919    };
920
921    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
922    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
923    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
924        base_prompt,
925        deep_agent_prompt,
926        planning,
927        filesystem,
928        subagent,
929    ];
930    if let Some(ref summary) = summarization {
931        middlewares.push(summary.clone());
932    }
933    if config.enable_prompt_caching {
934        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
935    }
936    if let Some(ref hitl_mw) = hitl {
937        middlewares.push(hitl_mw.clone());
938    }
939
940    DeepAgent {
941        descriptor: AgentDescriptor {
942            name: "deep-agent".into(),
943            version: "0.0.1".into(),
944            description: Some("Rust deep agent".into()),
945        },
946        instructions: config.instructions,
947        planner: config.planner,
948        middlewares,
949        base_tools: config.tools,
950        state,
951        history,
952        _summarization: summarization,
953        _hitl: hitl,
954        builtin_tools: config.builtin_tools,
955        checkpointer: config.checkpointer,
956        event_dispatcher: config.event_dispatcher,
957    }
958}