1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10 ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::num::NonZeroUsize;
26use std::sync::{Arc, RwLock};
27
/// Names treated as built-in tools by `DeepAgent::should_include`.
///
/// A middleware tool whose name appears here is subject to the agent's optional
/// `builtin_tools` allow-list; tools with any other name are always included.
const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
35 let mut pending = 0;
36 let mut in_progress = 0;
37 let mut completed = 0;
38
39 for todo in todos {
40 match todo.status {
41 agents_core::state::TodoStatus::Pending => pending += 1,
42 agents_core::state::TodoStatus::InProgress => in_progress += 1,
43 agents_core::state::TodoStatus::Completed => completed += 1,
44 }
45 }
46
47 (pending, in_progress, completed)
48}
49
/// Agent runtime that drives a planner/tool loop over shared state and history.
pub struct DeepAgent {
    /// Descriptor returned by `describe`; its `name` is also stamped on emitted events.
    descriptor: AgentDescriptor,
    /// Base instructions used as the starting system prompt of every model request.
    instructions: String,
    /// Planner consulted on each loop iteration to choose the next action.
    planner: Arc<dyn PlannerHandle>,
    /// Middlewares that adjust model requests, contribute tools, and may interrupt tool calls.
    middlewares: Vec<Arc<dyn AgentMiddleware>>,
    /// Tools supplied directly via configuration; always offered, never filtered.
    base_tools: Vec<ToolBox>,
    /// Shared, mutable agent state (todos, pending interrupts, ...).
    state: Arc<RwLock<AgentStateSnapshot>>,
    /// Conversation history fed to the planner.
    history: Arc<RwLock<Vec<AgentMessage>>>,
    /// Handle to the summarization middleware, if configured. Not read by the code
    /// visible in this file; the same instance is also registered in `middlewares`.
    _summarization: Option<Arc<SummarizationMiddleware>>,
    /// Handle to the human-in-the-loop middleware, if enabled. Not read by the code
    /// visible in this file; the same instance is also registered in `middlewares`.
    _hitl: Option<Arc<HumanInLoopMiddleware>>,
    /// Optional allow-list for built-in tool names; `None` allows all of them.
    builtin_tools: Option<HashSet<String>>,
    /// Optional persistence backend for state snapshots.
    checkpointer: Option<Arc<dyn Checkpointer>>,
    /// Optional dispatcher that receives agent lifecycle events.
    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
    /// Selects the sanitizing preview helpers instead of plain truncation for event text.
    enable_pii_sanitization: bool,
    /// Upper bound on planner iterations per handled message.
    max_iterations: NonZeroUsize,
}
70
71impl DeepAgent {
72 fn collect_tools(&self) -> HashMap<String, ToolBox> {
73 let mut tools: HashMap<String, ToolBox> = HashMap::new();
74 for tool in &self.base_tools {
75 tools.insert(tool.schema().name.clone(), tool.clone());
76 }
77 for middleware in &self.middlewares {
78 for tool in middleware.tools() {
79 let tool_name = tool.schema().name.clone();
80 if self.should_include(&tool_name) {
81 tools.insert(tool_name, tool);
82 }
83 }
84 }
85 tools
86 }
87 fn should_include(&self, name: &str) -> bool {
90 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
91 if !is_builtin {
92 return true;
93 }
94 match &self.builtin_tools {
95 None => true,
96 Some(selected) => selected.contains(name),
97 }
98 }
99
100 fn append_history(&self, message: AgentMessage) {
101 if let Ok(mut history) = self.history.write() {
102 history.push(message);
103 }
104 }
105
106 fn current_history(&self) -> Vec<AgentMessage> {
107 self.history.read().map(|h| h.clone()).unwrap_or_default()
108 }
109
110 fn emit_event(&self, event: agents_core::events::AgentEvent) {
111 if let Some(dispatcher) = &self.event_dispatcher {
112 let dispatcher_clone = dispatcher.clone();
113 tokio::spawn(async move {
114 dispatcher_clone.dispatch(event).await;
115 });
116 }
117 }
118
119 fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
120 agents_core::events::EventMetadata::new(
121 "default".to_string(),
122 uuid::Uuid::new_v4().to_string(),
123 None,
124 )
125 }
126
127 fn truncate_message(&self, message: &AgentMessage) -> String {
128 let text = match &message.content {
129 MessageContent::Text(t) => t.clone(),
130 MessageContent::Json(v) => v.to_string(),
131 };
132
133 if self.enable_pii_sanitization {
134 agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
135 } else {
136 agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
138 }
139 }
140
141 fn get_full_message_text(&self, message: &AgentMessage) -> String {
142 match &message.content {
143 MessageContent::Text(t) => t.clone(),
144 MessageContent::Json(v) => v.to_string(),
145 }
146 }
147
148 fn summarize_payload(&self, payload: &Value) -> String {
149 if self.enable_pii_sanitization {
150 agents_core::security::sanitize_tool_payload(
151 payload,
152 agents_core::security::MAX_PREVIEW_LENGTH,
153 )
154 } else {
155 let json_str = payload.to_string();
157 agents_core::security::truncate_string(
158 &json_str,
159 agents_core::security::MAX_PREVIEW_LENGTH,
160 )
161 }
162 }
163
164 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
166 if let Some(ref checkpointer) = self.checkpointer {
167 let state = self
168 .state
169 .read()
170 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
171 .clone();
172
173 let state_json = serde_json::to_string(&state)?;
175 let state_size = state_json.len();
176
177 checkpointer.save_state(thread_id, &state).await?;
179
180 self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
182 agents_core::events::StateCheckpointedEvent {
183 metadata: self.create_event_metadata(),
184 checkpoint_id: thread_id.to_string(),
185 state_size_bytes: state_size,
186 },
187 ));
188
189 tracing::debug!(
190 thread_id = %thread_id,
191 state_size_bytes = state_size,
192 "πΎ State checkpointed and event emitted"
193 );
194
195 Ok(())
196 } else {
197 tracing::warn!("Attempted to save state but no checkpointer is configured");
198 Ok(())
199 }
200 }
201
202 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
204 if let Some(ref checkpointer) = self.checkpointer {
205 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
206 *self
207 .state
208 .write()
209 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
210 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
211 Ok(true)
212 } else {
213 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
214 Ok(false)
215 }
216 } else {
217 tracing::warn!("Attempted to load state but no checkpointer is configured");
218 Ok(false)
219 }
220 }
221
222 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
224 if let Some(ref checkpointer) = self.checkpointer {
225 checkpointer.delete_thread(thread_id).await
226 } else {
227 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
228 Ok(())
229 }
230 }
231
232 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
234 if let Some(ref checkpointer) = self.checkpointer {
235 checkpointer.list_threads().await
236 } else {
237 Ok(Vec::new())
238 }
239 }
240
241 async fn execute_tool(
242 &self,
243 tool: ToolBox,
244 _tool_name: String,
245 payload: Value,
246 ) -> anyhow::Result<AgentMessage> {
247 let state_snapshot = self.state.read().unwrap().clone();
248 let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
249
250 let result = tool.execute(payload, ctx).await?;
251 Ok(self.apply_tool_result(result))
252 }
253
254 fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
255 match result {
256 ToolResult::Message(message) => {
257 message
260 }
261 ToolResult::WithStateUpdate {
262 message,
263 state_diff,
264 } => {
265 let todos_updated = state_diff.todos.is_some();
267
268 if let Ok(mut state) = self.state.write() {
269 let command = agents_core::command::Command::with_state(state_diff);
270 command.apply_to(&mut state);
271
272 if todos_updated {
274 let (pending_count, in_progress_count, completed_count) =
275 count_todos(&state.todos);
276
277 self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
278 agents_core::events::TodosUpdatedEvent {
279 metadata: self.create_event_metadata(),
280 todos: state.todos.clone(),
281 pending_count,
282 in_progress_count,
283 completed_count,
284 last_updated: chrono::Utc::now().to_rfc3339(),
285 },
286 ));
287
288 tracing::debug!(
289 pending = pending_count,
290 in_progress = in_progress_count,
291 completed = completed_count,
292 total = state.todos.len(),
293 "π Todos updated and event emitted"
294 );
295 }
296 }
297 message
300 }
301 }
302 }
303
304 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
306 self.state
307 .read()
308 .ok()
309 .and_then(|guard| guard.pending_interrupts.first().cloned())
310 }
311
312 pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
324 if let Some(dispatcher) = &self.event_dispatcher {
325 dispatcher.add_broadcaster(broadcaster);
326 tracing::debug!("Broadcaster added to event dispatcher");
327 } else {
328 tracing::warn!("add_broadcaster called but no event dispatcher configured");
329 }
330 }
331
332 pub fn add_broadcasters(
347 &self,
348 broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
349 ) {
350 if let Some(dispatcher) = &self.event_dispatcher {
351 for broadcaster in broadcasters {
352 dispatcher.add_broadcaster(broadcaster);
353 }
354 tracing::debug!("Multiple broadcasters added to event dispatcher");
355 } else {
356 tracing::warn!("add_broadcasters called but no event dispatcher configured");
357 }
358 }
359
360 pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
362 let interrupt = {
364 let state_guard = self
365 .state
366 .read()
367 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
368 state_guard
369 .pending_interrupts
370 .first()
371 .cloned()
372 .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
373 };
374
375 let result_message = match action {
376 HitlAction::Accept => {
377 let AgentInterrupt::HumanInLoop(hitl) = interrupt;
379 tracing::info!(
380 tool_name = %hitl.tool_name,
381 call_id = %hitl.call_id,
382 "β
HITL: Tool approved, executing with original arguments"
383 );
384
385 let tools = self.collect_tools();
386 let tool = tools
387 .get(&hitl.tool_name)
388 .cloned()
389 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
390
391 self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
392 .await?
393 }
394
395 HitlAction::Edit {
396 tool_name,
397 tool_args,
398 } => {
399 tracing::info!(
401 tool_name = %tool_name,
402 "βοΈ HITL: Tool edited, executing with modified arguments"
403 );
404
405 let tools = self.collect_tools();
406 let tool = tools
407 .get(&tool_name)
408 .cloned()
409 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
410
411 self.execute_tool(tool, tool_name, tool_args).await?
412 }
413
414 HitlAction::Reject { reason } => {
415 tracing::info!("β HITL: Tool rejected");
417
418 let text = reason
419 .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
420
421 let message = AgentMessage {
422 role: MessageRole::Tool,
423 content: MessageContent::Text(text),
424 metadata: None,
425 };
426
427 self.append_history(message.clone());
428 message
429 }
430
431 HitlAction::Respond { message } => {
432 tracing::info!("π¬ HITL: Custom response provided");
434
435 self.append_history(message.clone());
436 message
437 }
438 };
439
440 {
442 let mut state_guard = self
443 .state
444 .write()
445 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
446 state_guard.clear_interrupts();
447 }
448
449 if let Some(checkpointer) = &self.checkpointer {
451 let state_clone = self
452 .state
453 .read()
454 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
455 .clone();
456 checkpointer
457 .save_state(&ThreadId::default(), &state_clone)
458 .await?;
459 }
460
461 Ok(result_message)
462 }
463
464 pub async fn handle_message(
466 &self,
467 input: impl AsRef<str>,
468 state: Arc<AgentStateSnapshot>,
469 ) -> anyhow::Result<AgentMessage> {
470 self.handle_message_with_metadata(input, None, state).await
471 }
472
473 pub async fn handle_message_with_metadata(
475 &self,
476 input: impl AsRef<str>,
477 metadata: Option<MessageMetadata>,
478 state: Arc<AgentStateSnapshot>,
479 ) -> anyhow::Result<AgentMessage> {
480 let agent_message = AgentMessage {
481 role: MessageRole::User,
482 content: MessageContent::Text(input.as_ref().to_string()),
483 metadata,
484 };
485 self.handle_message_internal(agent_message, state).await
486 }
487
488 async fn handle_message_internal(
490 &self,
491 input: AgentMessage,
492 loaded_state: Arc<AgentStateSnapshot>,
493 ) -> anyhow::Result<AgentMessage> {
494 let start_time = std::time::Instant::now();
495
496 if let Ok(mut state_guard) = self.state.write() {
499 *state_guard = (*loaded_state).clone();
500 }
501
502 self.emit_event(agents_core::events::AgentEvent::AgentStarted(
503 agents_core::events::AgentStartedEvent {
504 metadata: self.create_event_metadata(),
505 agent_name: self.descriptor.name.clone(),
506 message_preview: self.truncate_message(&input),
507 },
508 ));
509
510 self.append_history(input.clone());
511
512 let max_iterations = self.max_iterations.get();
514 let mut iteration = 0;
515
516 loop {
517 iteration += 1;
518 if iteration > max_iterations {
519 tracing::warn!(
520 "β οΈ Max iterations ({}) reached, stopping ReAct loop",
521 max_iterations
522 );
523 let response = AgentMessage {
524 role: MessageRole::Agent,
525 content: MessageContent::Text(
526 "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
527 ),
528 metadata: None,
529 };
530 self.append_history(response.clone());
531 return Ok(response);
532 }
533
534 tracing::debug!("π ReAct iteration {}/{}", iteration, max_iterations);
535
536 let mut request = ModelRequest::new(&self.instructions, self.current_history());
538 let tools = self.collect_tools();
539 for middleware in &self.middlewares {
540 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
541 middleware.modify_model_request(&mut ctx).await?;
542 }
543
544 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
545 let context = PlannerContext {
546 history: request.messages.clone(),
547 system_prompt: request.system_prompt.clone(),
548 tools: tool_schemas,
549 };
550 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
551
552 let decision = self.planner.plan(context, state_snapshot).await?;
554
555 self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
557 agents_core::events::PlanningCompleteEvent {
558 metadata: self.create_event_metadata(),
559 action_type: match &decision.next_action {
560 PlannerAction::Respond { .. } => "respond".to_string(),
561 PlannerAction::CallTool { .. } => "call_tool".to_string(),
562 PlannerAction::Terminate => "terminate".to_string(),
563 },
564 action_summary: match &decision.next_action {
565 PlannerAction::Respond { message } => {
566 format!("Respond: {}", self.truncate_message(message))
567 }
568 PlannerAction::CallTool { tool_name, .. } => {
569 format!("Call tool: {}", tool_name)
570 }
571 PlannerAction::Terminate => "Terminate".to_string(),
572 },
573 },
574 ));
575
576 match decision.next_action {
577 PlannerAction::Respond { message } => {
578 self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
580 agents_core::events::AgentCompletedEvent {
581 metadata: self.create_event_metadata(),
582 agent_name: self.descriptor.name.clone(),
583 duration_ms: start_time.elapsed().as_millis() as u64,
584 response_preview: self.truncate_message(&message),
585 response: self.get_full_message_text(&message),
586 },
587 ));
588
589 self.append_history(message.clone());
590 return Ok(message);
591 }
592 PlannerAction::CallTool { tool_name, payload } => {
593 let tool_call_message = AgentMessage {
598 role: MessageRole::Agent,
599 content: MessageContent::Text(format!(
600 "Calling tool: {} with args: {}",
601 tool_name,
602 serde_json::to_string(&payload).unwrap_or_default()
603 )),
604 metadata: None,
605 };
606 self.append_history(tool_call_message);
607
608 if let Some(tool) = tools.get(&tool_name).cloned() {
609 let call_id = format!("call_{}", uuid::Uuid::new_v4());
611 for middleware in &self.middlewares {
612 if let Some(interrupt) = middleware
613 .before_tool_execution(&tool_name, &payload, &call_id)
614 .await?
615 {
616 {
618 let mut state_guard = self.state.write().map_err(|_| {
619 anyhow::anyhow!("Failed to acquire write lock on state")
620 })?;
621 state_guard.add_interrupt(interrupt.clone());
622 }
623
624 if let Some(checkpointer) = &self.checkpointer {
626 let state_clone = self
627 .state
628 .read()
629 .map_err(|_| {
630 anyhow::anyhow!("Failed to acquire read lock on state")
631 })?
632 .clone();
633 checkpointer
634 .save_state(&ThreadId::default(), &state_clone)
635 .await?;
636 }
637
638 let interrupt_message = AgentMessage {
640 role: MessageRole::System,
641 content: MessageContent::Text(format!(
642 "βΈοΈ Execution paused: Tool '{}' requires human approval",
643 tool_name
644 )),
645 metadata: None,
646 };
647 self.append_history(interrupt_message.clone());
648 return Ok(interrupt_message);
649 }
650 }
651
652 let tool_start_time = std::time::Instant::now();
654
655 self.emit_event(agents_core::events::AgentEvent::ToolStarted(
656 agents_core::events::ToolStartedEvent {
657 metadata: self.create_event_metadata(),
658 tool_name: tool_name.clone(),
659 input_summary: self.summarize_payload(&payload),
660 },
661 ));
662
663 tracing::warn!(
664 "βοΈ EXECUTING TOOL: {} with payload: {}",
665 tool_name,
666 serde_json::to_string(&payload)
667 .unwrap_or_else(|_| "invalid json".to_string())
668 );
669
670 let result = self
671 .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
672 .await;
673
674 let duration = tool_start_time.elapsed();
675 match result {
676 Ok(tool_result_message) => {
677 let content_preview = match &tool_result_message.content {
678 MessageContent::Text(t) => {
679 if t.len() > 100 {
680 format!("{}... ({} chars)", &t[..100], t.len())
681 } else {
682 t.clone()
683 }
684 }
685 MessageContent::Json(v) => {
686 format!("JSON: {} bytes", v.to_string().len())
687 }
688 };
689
690 self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
691 agents_core::events::ToolCompletedEvent {
692 metadata: self.create_event_metadata(),
693 tool_name: tool_name.clone(),
694 duration_ms: duration.as_millis() as u64,
695 result_summary: content_preview.clone(),
696 success: true,
697 },
698 ));
699
700 tracing::warn!(
701 "β
TOOL COMPLETED: {} in {:?} - Result: {}",
702 tool_name,
703 duration,
704 content_preview
705 );
706
707 self.append_history(tool_result_message);
709 }
711 Err(e) => {
712 self.emit_event(agents_core::events::AgentEvent::ToolFailed(
713 agents_core::events::ToolFailedEvent {
714 metadata: self.create_event_metadata(),
715 tool_name: tool_name.clone(),
716 duration_ms: duration.as_millis() as u64,
717 error_message: e.to_string(),
718 is_recoverable: true,
719 retry_count: 0,
720 },
721 ));
722
723 tracing::error!(
724 "β TOOL FAILED: {} in {:?} - Error: {}",
725 tool_name,
726 duration,
727 e
728 );
729
730 let error_message = AgentMessage {
732 role: MessageRole::Tool,
733 content: MessageContent::Text(format!(
734 "Error executing {}: {}",
735 tool_name, e
736 )),
737 metadata: None,
738 };
739 self.append_history(error_message);
740 }
742 }
743 } else {
744 tracing::warn!("β οΈ Tool '{}' not found", tool_name);
746 let error_message = AgentMessage {
747 role: MessageRole::Tool,
748 content: MessageContent::Text(format!(
749 "Tool '{}' not found. Available tools: {}",
750 tool_name,
751 tools
752 .keys()
753 .map(|k| k.as_str())
754 .collect::<Vec<_>>()
755 .join(", ")
756 )),
757 metadata: None,
758 };
759 self.append_history(error_message);
760 }
762 }
763 PlannerAction::Terminate => {
764 tracing::debug!("π Agent terminated");
766 let message = AgentMessage {
767 role: MessageRole::Agent,
768 content: MessageContent::Text("Task completed.".into()),
769 metadata: None,
770 };
771 self.append_history(message.clone());
772 return Ok(message);
773 }
774 }
775 }
776 }
777}
778
779#[async_trait]
780impl AgentHandle for DeepAgent {
781 async fn describe(&self) -> AgentDescriptor {
782 self.descriptor.clone()
783 }
784
785 async fn handle_message(
786 &self,
787 input: AgentMessage,
788 _state: Arc<AgentStateSnapshot>,
789 ) -> anyhow::Result<AgentMessage> {
790 let response = self.handle_message_internal(input, _state).await?;
791
792 if let Some(checkpointer) = &self.checkpointer {
794 let state_clone = self
795 .state
796 .read()
797 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
798 .clone();
799 checkpointer
800 .save_state(&ThreadId::default(), &state_clone)
801 .await?;
802 }
803
804 Ok(response)
805 }
806
807 async fn handle_message_stream(
808 &self,
809 input: AgentMessage,
810 _state: Arc<AgentStateSnapshot>,
811 ) -> anyhow::Result<agents_core::agent::AgentStream> {
812 use crate::planner::LlmBackedPlanner;
813 use agents_core::llm::{LlmRequest, StreamChunk};
814 use futures::StreamExt;
815
816 self.append_history(input.clone());
818
819 let mut request = ModelRequest::new(&self.instructions, self.current_history());
821 let tools = self.collect_tools();
822
823 for middleware in &self.middlewares {
825 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
826 middleware.modify_model_request(&mut ctx).await?;
827 }
828
829 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
831 let llm_request = LlmRequest {
832 system_prompt: request.system_prompt.clone(),
833 messages: request.messages.clone(),
834 tools: tool_schemas,
835 };
836
837 let planner_any = self.planner.as_any();
839
840 if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
841 let model = llm_planner.model().clone();
843 let stream = model.generate_stream(llm_request).await?;
844
845 let agent_name = self.descriptor.name.clone();
847 let event_dispatcher = self.event_dispatcher.clone();
848
849 let wrapped_stream = stream.then(move |chunk_result| {
850 let dispatcher = event_dispatcher.clone();
851 let name = agent_name.clone();
852
853 async move {
854 match &chunk_result {
855 Ok(StreamChunk::TextDelta(token)) => {
856 if let Some(ref dispatcher) = dispatcher {
858 let event = agents_core::events::AgentEvent::StreamingToken(
859 agents_core::events::StreamingTokenEvent {
860 metadata: agents_core::events::EventMetadata::new(
861 "default".to_string(),
862 uuid::Uuid::new_v4().to_string(),
863 None,
864 ),
865 agent_name: name.clone(),
866 token: token.clone(),
867 },
868 );
869 dispatcher.dispatch(event).await;
870 }
871 }
872 Ok(StreamChunk::Done { message }) => {
873 if let Some(ref dispatcher) = dispatcher {
875 let full_text = match &message.content {
876 agents_core::messaging::MessageContent::Text(t) => t.clone(),
877 agents_core::messaging::MessageContent::Json(v) => {
878 v.to_string()
879 }
880 };
881
882 let preview = if full_text.len() > 100 {
883 format!("{}...", &full_text[..100])
884 } else {
885 full_text.clone()
886 };
887
888 let event = agents_core::events::AgentEvent::AgentCompleted(
889 agents_core::events::AgentCompletedEvent {
890 metadata: agents_core::events::EventMetadata::new(
891 "default".to_string(),
892 uuid::Uuid::new_v4().to_string(),
893 None,
894 ),
895 agent_name: name.clone(),
896 duration_ms: 0, response_preview: preview,
898 response: full_text,
899 },
900 );
901 dispatcher.dispatch(event).await;
902 }
903 }
904 _ => {}
905 }
906 chunk_result
907 }
908 });
909
910 Ok(Box::pin(wrapped_stream))
911 } else {
912 let response = self.handle_message_internal(input, _state).await?;
914 Ok(Box::pin(futures::stream::once(async move {
915 Ok(StreamChunk::Done { message: response })
916 })))
917 }
918 }
919
920 async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
921 let state_guard = self
922 .state
923 .read()
924 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
925 Ok(state_guard.pending_interrupts.first().cloned())
926 }
927
928 async fn resume_with_approval(
929 &self,
930 action: agents_core::hitl::HitlAction,
931 ) -> anyhow::Result<AgentMessage> {
932 self.resume_with_approval(action).await
933 }
934}
935
936pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
941 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
942 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
943
944 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
945 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
946
947 let mut registrations: Vec<SubAgentRegistration> = Vec::new();
949
950 for subagent_config in &config.subagent_configs {
951 let sub_planner = if let Some(ref model) = subagent_config.model {
953 Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
955 } else {
956 config.planner.clone()
958 };
959
960 let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
962
963 sub_cfg = sub_cfg.with_max_iterations(config.max_iterations.get());
965
966 if let Some(ref tools) = subagent_config.tools {
968 tracing::debug!(
969 " - Configuring {} tools for {}",
970 tools.len(),
971 subagent_config.name
972 );
973 for tool in tools {
974 sub_cfg = sub_cfg.with_tool(tool.clone());
975 }
976 }
977
978 if let Some(ref builtin) = subagent_config.builtin_tools {
980 sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
981 }
982
983 sub_cfg = sub_cfg.with_auto_general_purpose(false);
985
986 sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
988
989 sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
991
992 let sub_agent = create_deep_agent_from_config(sub_cfg);
994
995 registrations.push(SubAgentRegistration {
997 descriptor: SubAgentDescriptor {
998 name: subagent_config.name.clone(),
999 description: subagent_config.description.clone(),
1000 },
1001 agent: Arc::new(sub_agent),
1002 });
1003
1004 tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
1005 }
1006
1007 tracing::info!("=> Total sub-agents registered: {}", registrations.len());
1008
1009 if config.auto_general_purpose {
1011 let has_gp = registrations
1012 .iter()
1013 .any(|r| r.descriptor.name == "general-purpose");
1014 if !has_gp {
1015 let mut sub_cfg =
1017 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1018 .with_auto_general_purpose(false)
1019 .with_prompt_caching(config.enable_prompt_caching)
1020 .with_pii_sanitization(config.enable_pii_sanitization)
1021 .with_max_iterations(config.max_iterations.get());
1022 if let Some(ref selected) = config.builtin_tools {
1023 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1024 }
1025 if let Some(ref sum) = config.summarization {
1026 sub_cfg = sub_cfg.with_summarization(sum.clone());
1027 }
1028 for t in &config.tools {
1029 sub_cfg = sub_cfg.with_tool(t.clone());
1030 }
1031
1032 let gp = create_deep_agent_from_config(sub_cfg);
1033 registrations.push(SubAgentRegistration {
1034 descriptor: SubAgentDescriptor {
1035 name: "general-purpose".into(),
1036 description: "Default reasoning agent".into(),
1037 },
1038 agent: Arc::new(gp),
1039 });
1040 }
1041 }
1042
1043 let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1044 registrations,
1045 config.event_dispatcher.clone(),
1046 ));
1047 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1048 let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
1049 let summarization = config.summarization.as_ref().map(|cfg| {
1050 Arc::new(SummarizationMiddleware::new(
1051 cfg.messages_to_keep,
1052 cfg.summary_note.clone(),
1053 ))
1054 });
1055 let hitl = if config.tool_interrupts.is_empty() {
1056 None
1057 } else {
1058 if config.checkpointer.is_none() {
1060 tracing::error!(
1061 "β οΈ HITL middleware requires a checkpointer to persist interrupt state. \
1062 HITL will be disabled. Please configure a checkpointer to enable HITL."
1063 );
1064 None
1065 } else {
1066 tracing::info!("π HITL enabled for {} tools", config.tool_interrupts.len());
1067 Some(Arc::new(HumanInLoopMiddleware::new(
1068 config.tool_interrupts.clone(),
1069 )))
1070 }
1071 };
1072
1073 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1076 base_prompt,
1077 deep_agent_prompt,
1078 planning,
1079 filesystem,
1080 subagent,
1081 ];
1082 if let Some(ref summary) = summarization {
1083 middlewares.push(summary.clone());
1084 }
1085 if config.enable_prompt_caching {
1086 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1087 }
1088 if let Some(ref hitl_mw) = hitl {
1089 middlewares.push(hitl_mw.clone());
1090 }
1091
1092 DeepAgent {
1093 descriptor: AgentDescriptor {
1094 name: "deep-agent".into(),
1095 version: "0.0.1".into(),
1096 description: Some("Rust deep agent".into()),
1097 },
1098 instructions: config.instructions,
1099 planner: config.planner,
1100 middlewares,
1101 base_tools: config.tools,
1102 state,
1103 history,
1104 _summarization: summarization,
1105 _hitl: hitl,
1106 builtin_tools: config.builtin_tools,
1107 checkpointer: config.checkpointer,
1108 event_dispatcher: config.event_dispatcher,
1109 enable_pii_sanitization: config.enable_pii_sanitization,
1110 max_iterations: config.max_iterations,
1111 }
1112}