agents_runtime/agent/
runtime.rs

1//! Deep Agent runtime implementation
2//!
3//! This module contains the core DeepAgent struct and its runtime behavior,
4//! including message handling, tool execution, HITL support, and state management.
5
6use super::config::DeepAgentConfig;
7use crate::middleware::{
8    AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9    DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10    ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11    SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15    AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27// Built-in tool names exposed by middlewares. The `task` tool for subagents is not gated.
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30// (no streaming types in baseline)
31
32/// Helper function to count todos by status
33fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34    let mut pending = 0;
35    let mut in_progress = 0;
36    let mut completed = 0;
37
38    for todo in todos {
39        match todo.status {
40            agents_core::state::TodoStatus::Pending => pending += 1,
41            agents_core::state::TodoStatus::InProgress => in_progress += 1,
42            agents_core::state::TodoStatus::Completed => completed += 1,
43        }
44    }
45
46    (pending, in_progress, completed)
47}
48
49/// Core Deep Agent runtime implementation
50///
51/// This struct contains all the runtime state and behavior for a Deep Agent,
52/// including middleware management, tool execution, HITL support, and state persistence.
53pub struct DeepAgent {
54    descriptor: AgentDescriptor,
55    instructions: String,
56    planner: Arc<dyn PlannerHandle>,
57    middlewares: Vec<Arc<dyn AgentMiddleware>>,
58    base_tools: Vec<ToolBox>,
59    state: Arc<RwLock<AgentStateSnapshot>>,
60    history: Arc<RwLock<Vec<AgentMessage>>>,
61    _summarization: Option<Arc<SummarizationMiddleware>>,
62    _hitl: Option<Arc<HumanInLoopMiddleware>>,
63    builtin_tools: Option<HashSet<String>>,
64    checkpointer: Option<Arc<dyn Checkpointer>>,
65    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66}
67
68impl DeepAgent {
69    fn collect_tools(&self) -> HashMap<String, ToolBox> {
70        let mut tools: HashMap<String, ToolBox> = HashMap::new();
71        for tool in &self.base_tools {
72            tools.insert(tool.schema().name.clone(), tool.clone());
73        }
74        for middleware in &self.middlewares {
75            for tool in middleware.tools() {
76                let tool_name = tool.schema().name.clone();
77                if self.should_include(&tool_name) {
78                    tools.insert(tool_name, tool);
79                }
80            }
81        }
82        tools
83    }
84    // no streaming path in baseline
85
86    fn should_include(&self, name: &str) -> bool {
87        let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
88        if !is_builtin {
89            return true;
90        }
91        match &self.builtin_tools {
92            None => true,
93            Some(selected) => selected.contains(name),
94        }
95    }
96
97    fn append_history(&self, message: AgentMessage) {
98        if let Ok(mut history) = self.history.write() {
99            history.push(message);
100        }
101    }
102
103    fn current_history(&self) -> Vec<AgentMessage> {
104        self.history.read().map(|h| h.clone()).unwrap_or_default()
105    }
106
107    fn emit_event(&self, event: agents_core::events::AgentEvent) {
108        if let Some(dispatcher) = &self.event_dispatcher {
109            let dispatcher_clone = dispatcher.clone();
110            tokio::spawn(async move {
111                dispatcher_clone.dispatch(event).await;
112            });
113        }
114    }
115
116    fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
117        agents_core::events::EventMetadata::new(
118            "default".to_string(),
119            uuid::Uuid::new_v4().to_string(),
120            None,
121        )
122    }
123
124    fn truncate_message(&self, message: &AgentMessage) -> String {
125        let text = match &message.content {
126            MessageContent::Text(t) => t.clone(),
127            MessageContent::Json(v) => v.to_string(),
128        };
129        if text.len() > 100 {
130            format!("{}...", &text[..100])
131        } else {
132            text
133        }
134    }
135
136    fn summarize_payload(&self, payload: &Value) -> String {
137        let json_str = payload.to_string();
138        if json_str.len() > 100 {
139            format!("{}...", &json_str[..100])
140        } else {
141            json_str
142        }
143    }
144
145    /// Save the current agent state to the configured checkpointer.
146    pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
147        if let Some(ref checkpointer) = self.checkpointer {
148            let state = self
149                .state
150                .read()
151                .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
152                .clone();
153
154            // Calculate state size before saving
155            let state_json = serde_json::to_string(&state)?;
156            let state_size = state_json.len();
157
158            // Save state to checkpointer
159            checkpointer.save_state(thread_id, &state).await?;
160
161            // Emit StateCheckpointed event after successful save
162            self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
163                agents_core::events::StateCheckpointedEvent {
164                    metadata: self.create_event_metadata(),
165                    checkpoint_id: thread_id.to_string(),
166                    state_size_bytes: state_size,
167                },
168            ));
169
170            tracing::debug!(
171                thread_id = %thread_id,
172                state_size_bytes = state_size,
173                "πŸ’Ύ State checkpointed and event emitted"
174            );
175
176            Ok(())
177        } else {
178            tracing::warn!("Attempted to save state but no checkpointer is configured");
179            Ok(())
180        }
181    }
182
183    /// Load agent state from the configured checkpointer.
184    pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
185        if let Some(ref checkpointer) = self.checkpointer {
186            if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
187                *self
188                    .state
189                    .write()
190                    .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
191                tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
192                Ok(true)
193            } else {
194                tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
195                Ok(false)
196            }
197        } else {
198            tracing::warn!("Attempted to load state but no checkpointer is configured");
199            Ok(false)
200        }
201    }
202
203    /// Delete saved state for the specified thread.
204    pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
205        if let Some(ref checkpointer) = self.checkpointer {
206            checkpointer.delete_thread(thread_id).await
207        } else {
208            tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
209            Ok(())
210        }
211    }
212
213    /// List all threads with saved state.
214    pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
215        if let Some(ref checkpointer) = self.checkpointer {
216            checkpointer.list_threads().await
217        } else {
218            Ok(Vec::new())
219        }
220    }
221
222    async fn execute_tool(
223        &self,
224        tool: ToolBox,
225        _tool_name: String,
226        payload: Value,
227    ) -> anyhow::Result<AgentMessage> {
228        let state_snapshot = self.state.read().unwrap().clone();
229        let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
230
231        let result = tool.execute(payload, ctx).await?;
232        Ok(self.apply_tool_result(result))
233    }
234
235    fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
236        match result {
237            ToolResult::Message(message) => {
238                // Tool results are not added to conversation history
239                // Only the final LLM response after tool execution is added
240                message
241            }
242            ToolResult::WithStateUpdate {
243                message,
244                state_diff,
245            } => {
246                // Check if todos were updated
247                let todos_updated = state_diff.todos.is_some();
248
249                if let Ok(mut state) = self.state.write() {
250                    let command = agents_core::command::Command::with_state(state_diff);
251                    command.apply_to(&mut state);
252
253                    // Emit TodosUpdated event if todos were modified
254                    if todos_updated {
255                        let (pending_count, in_progress_count, completed_count) =
256                            count_todos(&state.todos);
257
258                        self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
259                            agents_core::events::TodosUpdatedEvent {
260                                metadata: self.create_event_metadata(),
261                                todos: state.todos.clone(),
262                                pending_count,
263                                in_progress_count,
264                                completed_count,
265                                last_updated: chrono::Utc::now().to_rfc3339(),
266                            },
267                        ));
268
269                        tracing::debug!(
270                            pending = pending_count,
271                            in_progress = in_progress_count,
272                            completed = completed_count,
273                            total = state.todos.len(),
274                            "πŸ“ Todos updated and event emitted"
275                        );
276                    }
277                }
278                // Tool results are not added to conversation history
279                // Only the final LLM response after tool execution is added
280                message
281            }
282        }
283    }
284
285    /// Get the current pending interrupt, if any.
286    pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
287        self.state
288            .read()
289            .ok()
290            .and_then(|guard| guard.pending_interrupts.first().cloned())
291    }
292
293    /// Resume execution after human approval of an interrupt.
294    pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
295        // Get the first pending interrupt
296        let interrupt = {
297            let state_guard = self
298                .state
299                .read()
300                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
301            state_guard
302                .pending_interrupts
303                .first()
304                .cloned()
305                .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
306        };
307
308        let result_message = match action {
309            HitlAction::Accept => {
310                // Execute with original args
311                let AgentInterrupt::HumanInLoop(hitl) = interrupt;
312                tracing::info!(
313                    tool_name = %hitl.tool_name,
314                    call_id = %hitl.call_id,
315                    "βœ… HITL: Tool approved, executing with original arguments"
316                );
317
318                let tools = self.collect_tools();
319                let tool = tools
320                    .get(&hitl.tool_name)
321                    .cloned()
322                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
323
324                self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
325                    .await?
326            }
327
328            HitlAction::Edit {
329                tool_name,
330                tool_args,
331            } => {
332                // Execute with modified args
333                tracing::info!(
334                    tool_name = %tool_name,
335                    "✏️ HITL: Tool edited, executing with modified arguments"
336                );
337
338                let tools = self.collect_tools();
339                let tool = tools
340                    .get(&tool_name)
341                    .cloned()
342                    .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
343
344                self.execute_tool(tool, tool_name, tool_args).await?
345            }
346
347            HitlAction::Reject { reason } => {
348                // Don't execute - return rejection message
349                tracing::info!("❌ HITL: Tool rejected");
350
351                let text = reason
352                    .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
353
354                let message = AgentMessage {
355                    role: MessageRole::Tool,
356                    content: MessageContent::Text(text),
357                    metadata: None,
358                };
359
360                self.append_history(message.clone());
361                message
362            }
363
364            HitlAction::Respond { message } => {
365                // Don't execute - return custom message
366                tracing::info!("πŸ’¬ HITL: Custom response provided");
367
368                self.append_history(message.clone());
369                message
370            }
371        };
372
373        // Clear the interrupt from state
374        {
375            let mut state_guard = self
376                .state
377                .write()
378                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
379            state_guard.clear_interrupts();
380        }
381
382        // Persist cleared state
383        if let Some(checkpointer) = &self.checkpointer {
384            let state_clone = self
385                .state
386                .read()
387                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
388                .clone();
389            checkpointer
390                .save_state(&ThreadId::default(), &state_clone)
391                .await?;
392        }
393
394        Ok(result_message)
395    }
396
397    /// Handle message from string input - converts string to AgentMessage internally
398    pub async fn handle_message(
399        &self,
400        input: impl AsRef<str>,
401        state: Arc<AgentStateSnapshot>,
402    ) -> anyhow::Result<AgentMessage> {
403        self.handle_message_with_metadata(input, None, state).await
404    }
405
406    /// Handle message from string input with metadata - converts string to AgentMessage internally
407    pub async fn handle_message_with_metadata(
408        &self,
409        input: impl AsRef<str>,
410        metadata: Option<MessageMetadata>,
411        state: Arc<AgentStateSnapshot>,
412    ) -> anyhow::Result<AgentMessage> {
413        let agent_message = AgentMessage {
414            role: MessageRole::User,
415            content: MessageContent::Text(input.as_ref().to_string()),
416            metadata,
417        };
418        self.handle_message_internal(agent_message, state).await
419    }
420
421    /// Internal method that contains the actual message handling logic
422    async fn handle_message_internal(
423        &self,
424        input: AgentMessage,
425        _state: Arc<AgentStateSnapshot>,
426    ) -> anyhow::Result<AgentMessage> {
427        let start_time = std::time::Instant::now();
428
429        self.emit_event(agents_core::events::AgentEvent::AgentStarted(
430            agents_core::events::AgentStartedEvent {
431                metadata: self.create_event_metadata(),
432                agent_name: self.descriptor.name.clone(),
433                message_preview: self.truncate_message(&input),
434            },
435        ));
436
437        self.append_history(input.clone());
438
439        // ReAct loop: continue until LLM responds with text (not tool calls)
440        let max_iterations = 10;
441        let mut iteration = 0;
442
443        loop {
444            iteration += 1;
445            if iteration > max_iterations {
446                tracing::warn!(
447                    "⚠️ Max iterations ({}) reached, stopping ReAct loop",
448                    max_iterations
449                );
450                let response = AgentMessage {
451                    role: MessageRole::Agent,
452                    content: MessageContent::Text(
453                        "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
454                    ),
455                    metadata: None,
456                };
457                self.append_history(response.clone());
458                return Ok(response);
459            }
460
461            tracing::debug!("πŸ”„ ReAct iteration {}/{}", iteration, max_iterations);
462
463            // Build request with current history
464            let mut request = ModelRequest::new(&self.instructions, self.current_history());
465            let tools = self.collect_tools();
466            for middleware in &self.middlewares {
467                let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
468                middleware.modify_model_request(&mut ctx).await?;
469            }
470
471            let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
472            let context = PlannerContext {
473                history: request.messages.clone(),
474                system_prompt: request.system_prompt.clone(),
475                tools: tool_schemas,
476            };
477            let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
478
479            // Ask LLM what to do
480            let decision = self.planner.plan(context, state_snapshot).await?;
481
482            // Emit PlanningComplete event
483            self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
484                agents_core::events::PlanningCompleteEvent {
485                    metadata: self.create_event_metadata(),
486                    action_type: match &decision.next_action {
487                        PlannerAction::Respond { .. } => "respond".to_string(),
488                        PlannerAction::CallTool { .. } => "call_tool".to_string(),
489                        PlannerAction::Terminate => "terminate".to_string(),
490                    },
491                    action_summary: match &decision.next_action {
492                        PlannerAction::Respond { message } => {
493                            format!("Respond: {}", self.truncate_message(message))
494                        }
495                        PlannerAction::CallTool { tool_name, .. } => {
496                            format!("Call tool: {}", tool_name)
497                        }
498                        PlannerAction::Terminate => "Terminate".to_string(),
499                    },
500                },
501            ));
502
503            match decision.next_action {
504                PlannerAction::Respond { message } => {
505                    // LLM decided to respond with text - exit loop
506                    self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
507                        agents_core::events::AgentCompletedEvent {
508                            metadata: self.create_event_metadata(),
509                            agent_name: self.descriptor.name.clone(),
510                            duration_ms: start_time.elapsed().as_millis() as u64,
511                            response_preview: self.truncate_message(&message),
512                        },
513                    ));
514
515                    self.append_history(message.clone());
516                    return Ok(message);
517                }
518                PlannerAction::CallTool { tool_name, payload } => {
519                    // Add AI's decision to call tool to history
520                    // This is needed for OpenAI's API which expects:
521                    // 1. Assistant message with tool call
522                    // 2. Tool message with result
523                    let tool_call_message = AgentMessage {
524                        role: MessageRole::Agent,
525                        content: MessageContent::Text(format!(
526                            "Calling tool: {} with args: {}",
527                            tool_name,
528                            serde_json::to_string(&payload).unwrap_or_default()
529                        )),
530                        metadata: None,
531                    };
532                    self.append_history(tool_call_message);
533
534                    if let Some(tool) = tools.get(&tool_name).cloned() {
535                        // Check all middleware for interrupts before executing tool
536                        let call_id = format!("call_{}", uuid::Uuid::new_v4());
537                        for middleware in &self.middlewares {
538                            if let Some(interrupt) = middleware
539                                .before_tool_execution(&tool_name, &payload, &call_id)
540                                .await?
541                            {
542                                // Save interrupt to state
543                                {
544                                    let mut state_guard = self.state.write().map_err(|_| {
545                                        anyhow::anyhow!("Failed to acquire write lock on state")
546                                    })?;
547                                    state_guard.add_interrupt(interrupt.clone());
548                                }
549
550                                // Persist state with checkpointer
551                                if let Some(checkpointer) = &self.checkpointer {
552                                    let state_clone = self
553                                        .state
554                                        .read()
555                                        .map_err(|_| {
556                                            anyhow::anyhow!("Failed to acquire read lock on state")
557                                        })?
558                                        .clone();
559                                    checkpointer
560                                        .save_state(&ThreadId::default(), &state_clone)
561                                        .await?;
562                                }
563
564                                // Return interrupt message - execution pauses here
565                                let interrupt_message = AgentMessage {
566                                    role: MessageRole::System,
567                                    content: MessageContent::Text(format!(
568                                        "⏸️ Execution paused: Tool '{}' requires human approval",
569                                        tool_name
570                                    )),
571                                    metadata: None,
572                                };
573                                self.append_history(interrupt_message.clone());
574                                return Ok(interrupt_message);
575                            }
576                        }
577
578                        // No interrupt - execute tool
579                        let tool_start_time = std::time::Instant::now();
580
581                        self.emit_event(agents_core::events::AgentEvent::ToolStarted(
582                            agents_core::events::ToolStartedEvent {
583                                metadata: self.create_event_metadata(),
584                                tool_name: tool_name.clone(),
585                                input_summary: self.summarize_payload(&payload),
586                            },
587                        ));
588
589                        tracing::warn!(
590                            "βš™οΈ EXECUTING TOOL: {} with payload: {}",
591                            tool_name,
592                            serde_json::to_string(&payload)
593                                .unwrap_or_else(|_| "invalid json".to_string())
594                        );
595
596                        let result = self
597                            .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
598                            .await;
599
600                        let duration = tool_start_time.elapsed();
601                        match result {
602                            Ok(tool_result_message) => {
603                                let content_preview = match &tool_result_message.content {
604                                    MessageContent::Text(t) => {
605                                        if t.len() > 100 {
606                                            format!("{}... ({} chars)", &t[..100], t.len())
607                                        } else {
608                                            t.clone()
609                                        }
610                                    }
611                                    MessageContent::Json(v) => {
612                                        format!("JSON: {} bytes", v.to_string().len())
613                                    }
614                                };
615
616                                self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
617                                    agents_core::events::ToolCompletedEvent {
618                                        metadata: self.create_event_metadata(),
619                                        tool_name: tool_name.clone(),
620                                        duration_ms: duration.as_millis() as u64,
621                                        result_summary: content_preview.clone(),
622                                        success: true,
623                                    },
624                                ));
625
626                                tracing::warn!(
627                                    "βœ… TOOL COMPLETED: {} in {:?} - Result: {}",
628                                    tool_name,
629                                    duration,
630                                    content_preview
631                                );
632
633                                // Add tool result to history and continue ReAct loop
634                                self.append_history(tool_result_message);
635                                // Loop continues - LLM will see tool result and decide next action
636                            }
637                            Err(e) => {
638                                self.emit_event(agents_core::events::AgentEvent::ToolFailed(
639                                    agents_core::events::ToolFailedEvent {
640                                        metadata: self.create_event_metadata(),
641                                        tool_name: tool_name.clone(),
642                                        duration_ms: duration.as_millis() as u64,
643                                        error_message: e.to_string(),
644                                        is_recoverable: true,
645                                        retry_count: 0,
646                                    },
647                                ));
648
649                                tracing::error!(
650                                    "❌ TOOL FAILED: {} in {:?} - Error: {}",
651                                    tool_name,
652                                    duration,
653                                    e
654                                );
655
656                                // Add error to history and continue - let LLM handle the error
657                                let error_message = AgentMessage {
658                                    role: MessageRole::Tool,
659                                    content: MessageContent::Text(format!(
660                                        "Error executing {}: {}",
661                                        tool_name, e
662                                    )),
663                                    metadata: None,
664                                };
665                                self.append_history(error_message);
666                                // Loop continues - LLM will see error and decide how to handle it
667                            }
668                        }
669                    } else {
670                        // Tool not found - add error to history and continue
671                        tracing::warn!("⚠️ Tool '{}' not found", tool_name);
672                        let error_message = AgentMessage {
673                            role: MessageRole::Tool,
674                            content: MessageContent::Text(format!(
675                                "Tool '{}' not found. Available tools: {}",
676                                tool_name,
677                                tools
678                                    .keys()
679                                    .map(|k| k.as_str())
680                                    .collect::<Vec<_>>()
681                                    .join(", ")
682                            )),
683                            metadata: None,
684                        };
685                        self.append_history(error_message);
686                        // Loop continues - LLM will see error and try something else
687                    }
688                }
689                PlannerAction::Terminate => {
690                    // LLM decided to terminate - exit loop
691                    tracing::debug!("πŸ›‘ Agent terminated");
692                    let message = AgentMessage {
693                        role: MessageRole::Agent,
694                        content: MessageContent::Text("Task completed.".into()),
695                        metadata: None,
696                    };
697                    self.append_history(message.clone());
698                    return Ok(message);
699                }
700            }
701        }
702    }
703}
704
705#[async_trait]
706impl AgentHandle for DeepAgent {
707    async fn describe(&self) -> AgentDescriptor {
708        self.descriptor.clone()
709    }
710
711    async fn handle_message(
712        &self,
713        input: AgentMessage,
714        _state: Arc<AgentStateSnapshot>,
715    ) -> anyhow::Result<AgentMessage> {
716        self.handle_message_internal(input, _state).await
717    }
718
719    async fn handle_message_stream(
720        &self,
721        input: AgentMessage,
722        _state: Arc<AgentStateSnapshot>,
723    ) -> anyhow::Result<agents_core::agent::AgentStream> {
724        use crate::planner::LlmBackedPlanner;
725        use agents_core::llm::{LlmRequest, StreamChunk};
726
727        // Add input to history
728        self.append_history(input.clone());
729
730        // Build the request similar to handle_message_internal
731        let mut request = ModelRequest::new(&self.instructions, self.current_history());
732        let tools = self.collect_tools();
733
734        // Apply middleware modifications
735        for middleware in &self.middlewares {
736            let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
737            middleware.modify_model_request(&mut ctx).await?;
738        }
739
740        // Convert ModelRequest to LlmRequest and add tools
741        let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
742        let llm_request = LlmRequest {
743            system_prompt: request.system_prompt.clone(),
744            messages: request.messages.clone(),
745            tools: tool_schemas,
746        };
747
748        // Try to get the underlying LLM model for streaming
749        let planner_any = self.planner.as_any();
750
751        if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
752            // We have an LlmBackedPlanner, use its model for streaming
753            let model = llm_planner.model().clone();
754            let stream = model.generate_stream(llm_request).await?;
755            Ok(stream)
756        } else {
757            // Fallback to non-streaming
758            let response = self.handle_message_internal(input, _state).await?;
759            Ok(Box::pin(futures::stream::once(async move {
760                Ok(StreamChunk::Done { message: response })
761            })))
762        }
763    }
764
765    async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
766        let state_guard = self
767            .state
768            .read()
769            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
770        Ok(state_guard.pending_interrupts.first().cloned())
771    }
772
773    async fn resume_with_approval(
774        &self,
775        action: agents_core::hitl::HitlAction,
776    ) -> anyhow::Result<AgentMessage> {
777        self.resume_with_approval(action).await
778    }
779}
780
781/// Create a deep agent from configuration - matches Python middleware assembly exactly
782///
783/// This function assembles the middleware stack in the same order as the Python SDK:
784/// planning β†’ filesystem β†’ subagents β†’ summarization β†’ prompt caching β†’ optional HITL
785pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
786    let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
787    let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
788
789    let planning = Arc::new(PlanningMiddleware::new(state.clone()));
790    let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
791
792    // Build sub-agents from configurations
793    let mut registrations: Vec<SubAgentRegistration> = Vec::new();
794
795    // Build custom sub-agents from configs
796    for subagent_config in &config.subagent_configs {
797        // Determine the planner for this sub-agent
798        let sub_planner = if let Some(ref model) = subagent_config.model {
799            // Sub-agent has its own model - wrap it in a planner
800            Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
801        } else {
802            // Inherit parent's planner
803            config.planner.clone()
804        };
805
806        // Create a DeepAgentConfig for this sub-agent
807        let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
808
809        // Configure tools
810        if let Some(ref tools) = subagent_config.tools {
811            for tool in tools {
812                sub_cfg = sub_cfg.with_tool(tool.clone());
813            }
814        }
815
816        // Configure built-in tools
817        if let Some(ref builtin) = subagent_config.builtin_tools {
818            sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
819        }
820
821        // Sub-agents should not have their own sub-agents
822        sub_cfg = sub_cfg.with_auto_general_purpose(false);
823
824        // Configure prompt caching
825        sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
826
827        // Build the sub-agent recursively
828        let sub_agent = create_deep_agent_from_config(sub_cfg);
829
830        // Register the sub-agent
831        registrations.push(SubAgentRegistration {
832            descriptor: SubAgentDescriptor {
833                name: subagent_config.name.clone(),
834                description: subagent_config.description.clone(),
835            },
836            agent: Arc::new(sub_agent),
837        });
838    }
839
840    // Optionally inject a general-purpose subagent
841    if config.auto_general_purpose {
842        let has_gp = registrations
843            .iter()
844            .any(|r| r.descriptor.name == "general-purpose");
845        if !has_gp {
846            // Create a subagent with inherited planner/tools and same instructions
847            let mut sub_cfg =
848                DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
849                    .with_auto_general_purpose(false)
850                    .with_prompt_caching(config.enable_prompt_caching);
851            if let Some(ref selected) = config.builtin_tools {
852                sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
853            }
854            if let Some(ref sum) = config.summarization {
855                sub_cfg = sub_cfg.with_summarization(sum.clone());
856            }
857            for t in &config.tools {
858                sub_cfg = sub_cfg.with_tool(t.clone());
859            }
860
861            let gp = create_deep_agent_from_config(sub_cfg);
862            registrations.push(SubAgentRegistration {
863                descriptor: SubAgentDescriptor {
864                    name: "general-purpose".into(),
865                    description: "Default reasoning agent".into(),
866                },
867                agent: Arc::new(gp),
868            });
869        }
870    }
871
872    let subagent = Arc::new(SubAgentMiddleware::new_with_events(
873        registrations,
874        config.event_dispatcher.clone(),
875    ));
876    let base_prompt = Arc::new(BaseSystemPromptMiddleware);
877    let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
878    let summarization = config.summarization.as_ref().map(|cfg| {
879        Arc::new(SummarizationMiddleware::new(
880            cfg.messages_to_keep,
881            cfg.summary_note.clone(),
882        ))
883    });
884    let hitl = if config.tool_interrupts.is_empty() {
885        None
886    } else {
887        // Validate that checkpointer is configured when HITL is enabled
888        if config.checkpointer.is_none() {
889            tracing::error!(
890                "⚠️ HITL middleware requires a checkpointer to persist interrupt state. \
891                 HITL will be disabled. Please configure a checkpointer to enable HITL."
892            );
893            None
894        } else {
895            tracing::info!("πŸ”’ HITL enabled for {} tools", config.tool_interrupts.len());
896            Some(Arc::new(HumanInLoopMiddleware::new(
897                config.tool_interrupts.clone(),
898            )))
899        }
900    };
901
902    // Assemble middleware stack with Deep Agent prompt for automatic tool usage
903    // Order: base β†’ deep agent prompt β†’ planning β†’ filesystem β†’ subagents β†’ summarization β†’ caching β†’ HITL
904    let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
905        base_prompt,
906        deep_agent_prompt,
907        planning,
908        filesystem,
909        subagent,
910    ];
911    if let Some(ref summary) = summarization {
912        middlewares.push(summary.clone());
913    }
914    if config.enable_prompt_caching {
915        middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
916    }
917    if let Some(ref hitl_mw) = hitl {
918        middlewares.push(hitl_mw.clone());
919    }
920
921    DeepAgent {
922        descriptor: AgentDescriptor {
923            name: "deep-agent".into(),
924            version: "0.0.1".into(),
925            description: Some("Rust deep agent".into()),
926        },
927        instructions: config.instructions,
928        planner: config.planner,
929        middlewares,
930        base_tools: config.tools,
931        state,
932        history,
933        _summarization: summarization,
934        _hitl: hitl,
935        builtin_tools: config.builtin_tools,
936        checkpointer: config.checkpointer,
937        event_dispatcher: config.event_dispatcher,
938    }
939}