1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10 ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::num::NonZeroUsize;
26use std::sync::{Arc, RwLock};
27
/// Names of the tools this module treats as built-in.
///
/// `DeepAgent::should_include` applies the optional `builtin_tools` allow-list
/// only to tools whose name appears here; any other tool is always kept.
const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
35 let mut pending = 0;
36 let mut in_progress = 0;
37 let mut completed = 0;
38
39 for todo in todos {
40 match todo.status {
41 agents_core::state::TodoStatus::Pending => pending += 1,
42 agents_core::state::TodoStatus::InProgress => in_progress += 1,
43 agents_core::state::TodoStatus::Completed => completed += 1,
44 }
45 }
46
47 (pending, in_progress, completed)
48}
49
/// An agent that drives a planner in a loop, executing tools and recording the
/// conversation, with optional checkpointing, events and human approval.
pub struct DeepAgent {
    /// Static description returned by `describe`; its `name` is also attached
    /// to emitted events.
    descriptor: AgentDescriptor,
    /// Base instructions handed to every `ModelRequest` before middlewares run.
    instructions: String,
    /// Planner asked for the next action on every loop iteration.
    planner: Arc<dyn PlannerHandle>,
    /// Middlewares that modify model requests, contribute tools and may raise
    /// an interrupt before a tool executes.
    middlewares: Vec<Arc<dyn AgentMiddleware>>,
    /// Tools supplied directly through configuration; always exposed.
    base_tools: Vec<ToolBox>,
    /// Shared, mutable agent state (todos, pending interrupts, ...).
    state: Arc<RwLock<AgentStateSnapshot>>,
    /// Conversation history accumulated by this agent instance.
    history: Arc<RwLock<Vec<AgentMessage>>>,
    /// Handle to the summarization middleware, if configured. Not read by the
    /// methods in this file; the same middleware is also stored in `middlewares`.
    _summarization: Option<Arc<SummarizationMiddleware>>,
    /// Handle to the human-in-the-loop middleware, if enabled. Not read by the
    /// methods in this file; the same middleware is also stored in `middlewares`.
    _hitl: Option<Arc<HumanInLoopMiddleware>>,
    /// Optional allow-list restricting which built-in tools are exposed;
    /// `None` exposes all of them.
    builtin_tools: Option<HashSet<String>>,
    /// Optional persistence backend for agent state.
    checkpointer: Option<Arc<dyn Checkpointer>>,
    /// Optional dispatcher that receives agent events.
    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
    /// Selects the sanitizing preview helpers for event summaries.
    enable_pii_sanitization: bool,
    /// Upper bound on planner iterations per handled message.
    max_iterations: NonZeroUsize,
}
70
71impl DeepAgent {
72 fn collect_tools(&self) -> HashMap<String, ToolBox> {
73 let mut tools: HashMap<String, ToolBox> = HashMap::new();
74 for tool in &self.base_tools {
75 tools.insert(tool.schema().name.clone(), tool.clone());
76 }
77 for middleware in &self.middlewares {
78 for tool in middleware.tools() {
79 let tool_name = tool.schema().name.clone();
80 if self.should_include(&tool_name) {
81 tools.insert(tool_name, tool);
82 }
83 }
84 }
85 tools
86 }
87 fn should_include(&self, name: &str) -> bool {
90 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
91 if !is_builtin {
92 return true;
93 }
94 match &self.builtin_tools {
95 None => true,
96 Some(selected) => selected.contains(name),
97 }
98 }
99
100 fn append_history(&self, message: AgentMessage) {
101 if let Ok(mut history) = self.history.write() {
102 history.push(message);
103 }
104 }
105
106 fn current_history(&self) -> Vec<AgentMessage> {
107 self.history.read().map(|h| h.clone()).unwrap_or_default()
108 }
109
110 fn emit_event(&self, event: agents_core::events::AgentEvent) {
111 if let Some(dispatcher) = &self.event_dispatcher {
112 let dispatcher_clone = dispatcher.clone();
113 tokio::spawn(async move {
114 dispatcher_clone.dispatch(event).await;
115 });
116 }
117 }
118
119 fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
120 agents_core::events::EventMetadata::new(
121 "default".to_string(),
122 uuid::Uuid::new_v4().to_string(),
123 None,
124 )
125 }
126
127 fn truncate_message(&self, message: &AgentMessage) -> String {
128 let text = match &message.content {
129 MessageContent::Text(t) => t.clone(),
130 MessageContent::Json(v) => v.to_string(),
131 };
132
133 if self.enable_pii_sanitization {
134 agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
135 } else {
136 agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
138 }
139 }
140
141 fn get_full_message_text(&self, message: &AgentMessage) -> String {
142 match &message.content {
143 MessageContent::Text(t) => t.clone(),
144 MessageContent::Json(v) => v.to_string(),
145 }
146 }
147
148 fn summarize_payload(&self, payload: &Value) -> String {
149 if self.enable_pii_sanitization {
150 agents_core::security::sanitize_tool_payload(
151 payload,
152 agents_core::security::MAX_PREVIEW_LENGTH,
153 )
154 } else {
155 let json_str = payload.to_string();
157 agents_core::security::truncate_string(
158 &json_str,
159 agents_core::security::MAX_PREVIEW_LENGTH,
160 )
161 }
162 }
163
164 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
166 if let Some(ref checkpointer) = self.checkpointer {
167 let state = self
168 .state
169 .read()
170 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
171 .clone();
172
173 let state_json = serde_json::to_string(&state)?;
175 let state_size = state_json.len();
176
177 checkpointer.save_state(thread_id, &state).await?;
179
180 self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
182 agents_core::events::StateCheckpointedEvent {
183 metadata: self.create_event_metadata(),
184 checkpoint_id: thread_id.to_string(),
185 state_size_bytes: state_size,
186 },
187 ));
188
189 tracing::debug!(
190 thread_id = %thread_id,
191 state_size_bytes = state_size,
192 "πΎ State checkpointed and event emitted"
193 );
194
195 Ok(())
196 } else {
197 tracing::warn!("Attempted to save state but no checkpointer is configured");
198 Ok(())
199 }
200 }
201
202 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
204 if let Some(ref checkpointer) = self.checkpointer {
205 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
206 *self
207 .state
208 .write()
209 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
210 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
211 Ok(true)
212 } else {
213 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
214 Ok(false)
215 }
216 } else {
217 tracing::warn!("Attempted to load state but no checkpointer is configured");
218 Ok(false)
219 }
220 }
221
222 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
224 if let Some(ref checkpointer) = self.checkpointer {
225 checkpointer.delete_thread(thread_id).await
226 } else {
227 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
228 Ok(())
229 }
230 }
231
232 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
234 if let Some(ref checkpointer) = self.checkpointer {
235 checkpointer.list_threads().await
236 } else {
237 Ok(Vec::new())
238 }
239 }
240
241 async fn execute_tool(
242 &self,
243 tool: ToolBox,
244 _tool_name: String,
245 payload: Value,
246 ) -> anyhow::Result<AgentMessage> {
247 let state_snapshot = self.state.read().unwrap().clone();
248 let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
249
250 let result = tool.execute(payload, ctx).await?;
251 Ok(self.apply_tool_result(result))
252 }
253
254 fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
255 match result {
256 ToolResult::Message(message) => {
257 message
260 }
261 ToolResult::WithStateUpdate {
262 message,
263 state_diff,
264 } => {
265 let todos_updated = state_diff.todos.is_some();
267
268 if let Ok(mut state) = self.state.write() {
269 let command = agents_core::command::Command::with_state(state_diff);
270 command.apply_to(&mut state);
271
272 if todos_updated {
274 let (pending_count, in_progress_count, completed_count) =
275 count_todos(&state.todos);
276
277 self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
278 agents_core::events::TodosUpdatedEvent {
279 metadata: self.create_event_metadata(),
280 todos: state.todos.clone(),
281 pending_count,
282 in_progress_count,
283 completed_count,
284 last_updated: chrono::Utc::now().to_rfc3339(),
285 },
286 ));
287
288 tracing::debug!(
289 pending = pending_count,
290 in_progress = in_progress_count,
291 completed = completed_count,
292 total = state.todos.len(),
293 "π Todos updated and event emitted"
294 );
295 }
296 }
297 message
300 }
301 }
302 }
303
304 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
306 self.state
307 .read()
308 .ok()
309 .and_then(|guard| guard.pending_interrupts.first().cloned())
310 }
311
312 pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
324 if let Some(dispatcher) = &self.event_dispatcher {
325 dispatcher.add_broadcaster(broadcaster);
326 tracing::debug!("Broadcaster added to event dispatcher");
327 } else {
328 tracing::warn!("add_broadcaster called but no event dispatcher configured");
329 }
330 }
331
332 pub fn add_broadcasters(
347 &self,
348 broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
349 ) {
350 if let Some(dispatcher) = &self.event_dispatcher {
351 for broadcaster in broadcasters {
352 dispatcher.add_broadcaster(broadcaster);
353 }
354 tracing::debug!("Multiple broadcasters added to event dispatcher");
355 } else {
356 tracing::warn!("add_broadcasters called but no event dispatcher configured");
357 }
358 }
359
360 pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
362 let interrupt = {
364 let state_guard = self
365 .state
366 .read()
367 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
368 state_guard
369 .pending_interrupts
370 .first()
371 .cloned()
372 .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
373 };
374
375 let result_message = match action {
376 HitlAction::Accept => {
377 let AgentInterrupt::HumanInLoop(hitl) = interrupt;
379 tracing::info!(
380 tool_name = %hitl.tool_name,
381 call_id = %hitl.call_id,
382 "β
HITL: Tool approved, executing with original arguments"
383 );
384
385 let tools = self.collect_tools();
386 let tool = tools
387 .get(&hitl.tool_name)
388 .cloned()
389 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
390
391 self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
392 .await?
393 }
394
395 HitlAction::Edit {
396 tool_name,
397 tool_args,
398 } => {
399 tracing::info!(
401 tool_name = %tool_name,
402 "βοΈ HITL: Tool edited, executing with modified arguments"
403 );
404
405 let tools = self.collect_tools();
406 let tool = tools
407 .get(&tool_name)
408 .cloned()
409 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
410
411 self.execute_tool(tool, tool_name, tool_args).await?
412 }
413
414 HitlAction::Reject { reason } => {
415 tracing::info!("β HITL: Tool rejected");
417
418 let text = reason
419 .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
420
421 let message = AgentMessage {
422 role: MessageRole::Tool,
423 content: MessageContent::Text(text),
424 metadata: None,
425 };
426
427 self.append_history(message.clone());
428 message
429 }
430
431 HitlAction::Respond { message } => {
432 tracing::info!("π¬ HITL: Custom response provided");
434
435 self.append_history(message.clone());
436 message
437 }
438 };
439
440 {
442 let mut state_guard = self
443 .state
444 .write()
445 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
446 state_guard.clear_interrupts();
447 }
448
449 if let Some(checkpointer) = &self.checkpointer {
451 let state_clone = self
452 .state
453 .read()
454 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
455 .clone();
456 checkpointer
457 .save_state(&ThreadId::default(), &state_clone)
458 .await?;
459 }
460
461 Ok(result_message)
462 }
463
464 pub async fn handle_message(
466 &self,
467 input: impl AsRef<str>,
468 state: Arc<AgentStateSnapshot>,
469 ) -> anyhow::Result<AgentMessage> {
470 self.handle_message_with_metadata(input, None, state).await
471 }
472
473 pub async fn handle_message_with_metadata(
475 &self,
476 input: impl AsRef<str>,
477 metadata: Option<MessageMetadata>,
478 state: Arc<AgentStateSnapshot>,
479 ) -> anyhow::Result<AgentMessage> {
480 let agent_message = AgentMessage {
481 role: MessageRole::User,
482 content: MessageContent::Text(input.as_ref().to_string()),
483 metadata,
484 };
485 self.handle_message_internal(agent_message, state).await
486 }
487
488 async fn handle_message_internal(
490 &self,
491 input: AgentMessage,
492 loaded_state: Arc<AgentStateSnapshot>,
493 ) -> anyhow::Result<AgentMessage> {
494 let start_time = std::time::Instant::now();
495
496 if let Ok(mut state_guard) = self.state.write() {
499 *state_guard = (*loaded_state).clone();
500 }
501
502 self.emit_event(agents_core::events::AgentEvent::AgentStarted(
503 agents_core::events::AgentStartedEvent {
504 metadata: self.create_event_metadata(),
505 agent_name: self.descriptor.name.clone(),
506 message_preview: self.truncate_message(&input),
507 },
508 ));
509
510 self.append_history(input.clone());
511
512 let max_iterations = self.max_iterations.get();
514 let mut iteration = 0;
515
516 loop {
517 iteration += 1;
518 if iteration > max_iterations {
519 tracing::warn!(
520 "β οΈ Max iterations ({}) reached, stopping ReAct loop",
521 max_iterations
522 );
523 let response = AgentMessage {
524 role: MessageRole::Agent,
525 content: MessageContent::Text(
526 "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
527 ),
528 metadata: None,
529 };
530 self.append_history(response.clone());
531 return Ok(response);
532 }
533
534 tracing::debug!("π ReAct iteration {}/{}", iteration, max_iterations);
535
536 let mut request = ModelRequest::new(&self.instructions, self.current_history());
538 let tools = self.collect_tools();
539 for middleware in &self.middlewares {
540 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
541 middleware.modify_model_request(&mut ctx).await?;
542 }
543
544 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
545 let context = PlannerContext {
546 history: request.messages.clone(),
547 system_prompt: request.system_prompt.clone(),
548 tools: tool_schemas,
549 };
550 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
551
552 let decision = self.planner.plan(context, state_snapshot).await?;
554
555 self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
557 agents_core::events::PlanningCompleteEvent {
558 metadata: self.create_event_metadata(),
559 action_type: match &decision.next_action {
560 PlannerAction::Respond { .. } => "respond".to_string(),
561 PlannerAction::CallTool { .. } => "call_tool".to_string(),
562 PlannerAction::Terminate => "terminate".to_string(),
563 },
564 action_summary: match &decision.next_action {
565 PlannerAction::Respond { message } => {
566 format!("Respond: {}", self.truncate_message(message))
567 }
568 PlannerAction::CallTool { tool_name, .. } => {
569 format!("Call tool: {}", tool_name)
570 }
571 PlannerAction::Terminate => "Terminate".to_string(),
572 },
573 },
574 ));
575
576 match decision.next_action {
577 PlannerAction::Respond { message } => {
578 self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
580 agents_core::events::AgentCompletedEvent {
581 metadata: self.create_event_metadata(),
582 agent_name: self.descriptor.name.clone(),
583 duration_ms: start_time.elapsed().as_millis() as u64,
584 response_preview: self.truncate_message(&message),
585 response: self.get_full_message_text(&message),
586 },
587 ));
588
589 self.append_history(message.clone());
590 return Ok(message);
591 }
592 PlannerAction::CallTool { tool_name, payload } => {
593 let tool_call_message = AgentMessage {
599 role: MessageRole::System,
600 content: MessageContent::Text(format!(
601 "Calling tool: {} with args: {}",
602 tool_name,
603 serde_json::to_string(&payload).unwrap_or_default()
604 )),
605 metadata: None,
606 };
607 self.append_history(tool_call_message);
608
609 if let Some(tool) = tools.get(&tool_name).cloned() {
610 let call_id = format!("call_{}", uuid::Uuid::new_v4());
612 for middleware in &self.middlewares {
613 if let Some(interrupt) = middleware
614 .before_tool_execution(&tool_name, &payload, &call_id)
615 .await?
616 {
617 {
619 let mut state_guard = self.state.write().map_err(|_| {
620 anyhow::anyhow!("Failed to acquire write lock on state")
621 })?;
622 state_guard.add_interrupt(interrupt.clone());
623 }
624
625 if let Some(checkpointer) = &self.checkpointer {
627 let state_clone = self
628 .state
629 .read()
630 .map_err(|_| {
631 anyhow::anyhow!("Failed to acquire read lock on state")
632 })?
633 .clone();
634 checkpointer
635 .save_state(&ThreadId::default(), &state_clone)
636 .await?;
637 }
638
639 let interrupt_message = AgentMessage {
641 role: MessageRole::System,
642 content: MessageContent::Text(format!(
643 "βΈοΈ Execution paused: Tool '{}' requires human approval",
644 tool_name
645 )),
646 metadata: None,
647 };
648 self.append_history(interrupt_message.clone());
649 return Ok(interrupt_message);
650 }
651 }
652
653 let tool_start_time = std::time::Instant::now();
655
656 self.emit_event(agents_core::events::AgentEvent::ToolStarted(
657 agents_core::events::ToolStartedEvent {
658 metadata: self.create_event_metadata(),
659 tool_name: tool_name.clone(),
660 input_summary: self.summarize_payload(&payload),
661 },
662 ));
663
664 tracing::warn!(
665 "βοΈ EXECUTING TOOL: {} with payload: {}",
666 tool_name,
667 serde_json::to_string(&payload)
668 .unwrap_or_else(|_| "invalid json".to_string())
669 );
670
671 let result = self
672 .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
673 .await;
674
675 let duration = tool_start_time.elapsed();
676 match result {
677 Ok(tool_result_message) => {
678 let content_preview = match &tool_result_message.content {
679 MessageContent::Text(t) => {
680 if t.len() > 100 {
681 let truncated: String = t.chars().take(100).collect();
682 format!(
683 "{}... ({} chars)",
684 truncated,
685 t.chars().count()
686 )
687 } else {
688 t.clone()
689 }
690 }
691 MessageContent::Json(v) => {
692 format!("JSON: {} bytes", v.to_string().len())
693 }
694 };
695
696 self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
697 agents_core::events::ToolCompletedEvent {
698 metadata: self.create_event_metadata(),
699 tool_name: tool_name.clone(),
700 duration_ms: duration.as_millis() as u64,
701 result_summary: content_preview.clone(),
702 success: true,
703 },
704 ));
705
706 tracing::warn!(
707 "β
TOOL COMPLETED: {} in {:?} - Result: {}",
708 tool_name,
709 duration,
710 content_preview
711 );
712
713 self.append_history(tool_result_message);
715 }
717 Err(e) => {
718 self.emit_event(agents_core::events::AgentEvent::ToolFailed(
719 agents_core::events::ToolFailedEvent {
720 metadata: self.create_event_metadata(),
721 tool_name: tool_name.clone(),
722 duration_ms: duration.as_millis() as u64,
723 error_message: e.to_string(),
724 is_recoverable: true,
725 retry_count: 0,
726 },
727 ));
728
729 tracing::error!(
730 "β TOOL FAILED: {} in {:?} - Error: {}",
731 tool_name,
732 duration,
733 e
734 );
735
736 let error_message = AgentMessage {
738 role: MessageRole::Tool,
739 content: MessageContent::Text(format!(
740 "Error executing {}: {}",
741 tool_name, e
742 )),
743 metadata: None,
744 };
745 self.append_history(error_message);
746 }
748 }
749 } else {
750 tracing::warn!("β οΈ Tool '{}' not found", tool_name);
752 let error_message = AgentMessage {
753 role: MessageRole::Tool,
754 content: MessageContent::Text(format!(
755 "Tool '{}' not found. Available tools: {}",
756 tool_name,
757 tools
758 .keys()
759 .map(|k| k.as_str())
760 .collect::<Vec<_>>()
761 .join(", ")
762 )),
763 metadata: None,
764 };
765 self.append_history(error_message);
766 }
768 }
769 PlannerAction::Terminate => {
770 tracing::debug!("π Agent terminated");
772 let message = AgentMessage {
773 role: MessageRole::Agent,
774 content: MessageContent::Text("Task completed.".into()),
775 metadata: None,
776 };
777 self.append_history(message.clone());
778 return Ok(message);
779 }
780 }
781 }
782 }
783}
784
785#[async_trait]
786impl AgentHandle for DeepAgent {
787 async fn describe(&self) -> AgentDescriptor {
788 self.descriptor.clone()
789 }
790
791 async fn handle_message(
792 &self,
793 input: AgentMessage,
794 _state: Arc<AgentStateSnapshot>,
795 ) -> anyhow::Result<AgentMessage> {
796 let response = self.handle_message_internal(input, _state).await?;
797
798 if let Some(checkpointer) = &self.checkpointer {
800 let state_clone = self
801 .state
802 .read()
803 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
804 .clone();
805 checkpointer
806 .save_state(&ThreadId::default(), &state_clone)
807 .await?;
808 }
809
810 Ok(response)
811 }
812
813 async fn handle_message_stream(
814 &self,
815 input: AgentMessage,
816 _state: Arc<AgentStateSnapshot>,
817 ) -> anyhow::Result<agents_core::agent::AgentStream> {
818 use crate::planner::LlmBackedPlanner;
819 use agents_core::llm::{LlmRequest, StreamChunk};
820 use futures::StreamExt;
821
822 self.append_history(input.clone());
824
825 let mut request = ModelRequest::new(&self.instructions, self.current_history());
827 let tools = self.collect_tools();
828
829 for middleware in &self.middlewares {
831 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
832 middleware.modify_model_request(&mut ctx).await?;
833 }
834
835 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
837 let llm_request = LlmRequest {
838 system_prompt: request.system_prompt.clone(),
839 messages: request.messages.clone(),
840 tools: tool_schemas,
841 };
842
843 let planner_any = self.planner.as_any();
845
846 if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
847 let model = llm_planner.model().clone();
849 let stream = model.generate_stream(llm_request).await?;
850
851 let agent_name = self.descriptor.name.clone();
853 let event_dispatcher = self.event_dispatcher.clone();
854
855 let wrapped_stream = stream.then(move |chunk_result| {
856 let dispatcher = event_dispatcher.clone();
857 let name = agent_name.clone();
858
859 async move {
860 match &chunk_result {
861 Ok(StreamChunk::TextDelta(token)) => {
862 if let Some(ref dispatcher) = dispatcher {
864 let event = agents_core::events::AgentEvent::StreamingToken(
865 agents_core::events::StreamingTokenEvent {
866 metadata: agents_core::events::EventMetadata::new(
867 "default".to_string(),
868 uuid::Uuid::new_v4().to_string(),
869 None,
870 ),
871 agent_name: name.clone(),
872 token: token.clone(),
873 },
874 );
875 dispatcher.dispatch(event).await;
876 }
877 }
878 Ok(StreamChunk::Done { message }) => {
879 if let Some(ref dispatcher) = dispatcher {
881 let full_text = match &message.content {
882 agents_core::messaging::MessageContent::Text(t) => t.clone(),
883 agents_core::messaging::MessageContent::Json(v) => {
884 v.to_string()
885 }
886 };
887
888 let preview = if full_text.len() > 100 {
889 format!("{}...", &full_text[..100])
890 } else {
891 full_text.clone()
892 };
893
894 let event = agents_core::events::AgentEvent::AgentCompleted(
895 agents_core::events::AgentCompletedEvent {
896 metadata: agents_core::events::EventMetadata::new(
897 "default".to_string(),
898 uuid::Uuid::new_v4().to_string(),
899 None,
900 ),
901 agent_name: name.clone(),
902 duration_ms: 0, response_preview: preview,
904 response: full_text,
905 },
906 );
907 dispatcher.dispatch(event).await;
908 }
909 }
910 _ => {}
911 }
912 chunk_result
913 }
914 });
915
916 Ok(Box::pin(wrapped_stream))
917 } else {
918 let response = self.handle_message_internal(input, _state).await?;
920 Ok(Box::pin(futures::stream::once(async move {
921 Ok(StreamChunk::Done { message: response })
922 })))
923 }
924 }
925
926 async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
927 let state_guard = self
928 .state
929 .read()
930 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
931 Ok(state_guard.pending_interrupts.first().cloned())
932 }
933
934 async fn resume_with_approval(
935 &self,
936 action: agents_core::hitl::HitlAction,
937 ) -> anyhow::Result<AgentMessage> {
938 self.resume_with_approval(action).await
939 }
940}
941
942pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
947 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
948 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
949
950 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
951 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
952
953 let mut registrations: Vec<SubAgentRegistration> = Vec::new();
955
956 for subagent_config in &config.subagent_configs {
957 let sub_planner = if let Some(ref model) = subagent_config.model {
959 Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
961 } else {
962 config.planner.clone()
964 };
965
966 let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
968
969 sub_cfg = sub_cfg.with_max_iterations(config.max_iterations.get());
971
972 if let Some(ref tools) = subagent_config.tools {
974 tracing::debug!(
975 " - Configuring {} tools for {}",
976 tools.len(),
977 subagent_config.name
978 );
979 for tool in tools {
980 sub_cfg = sub_cfg.with_tool(tool.clone());
981 }
982 }
983
984 if let Some(ref builtin) = subagent_config.builtin_tools {
986 sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
987 }
988
989 sub_cfg = sub_cfg.with_auto_general_purpose(false);
991
992 sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
994
995 sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
997
998 let sub_agent = create_deep_agent_from_config(sub_cfg);
1000
1001 registrations.push(SubAgentRegistration {
1003 descriptor: SubAgentDescriptor {
1004 name: subagent_config.name.clone(),
1005 description: subagent_config.description.clone(),
1006 },
1007 agent: Arc::new(sub_agent),
1008 });
1009
1010 tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
1011 }
1012
1013 tracing::info!("=> Total sub-agents registered: {}", registrations.len());
1014
1015 if config.auto_general_purpose {
1017 let has_gp = registrations
1018 .iter()
1019 .any(|r| r.descriptor.name == "general-purpose");
1020 if !has_gp {
1021 let mut sub_cfg =
1023 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1024 .with_auto_general_purpose(false)
1025 .with_prompt_caching(config.enable_prompt_caching)
1026 .with_pii_sanitization(config.enable_pii_sanitization)
1027 .with_max_iterations(config.max_iterations.get());
1028 if let Some(ref selected) = config.builtin_tools {
1029 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1030 }
1031 if let Some(ref sum) = config.summarization {
1032 sub_cfg = sub_cfg.with_summarization(sum.clone());
1033 }
1034 for t in &config.tools {
1035 sub_cfg = sub_cfg.with_tool(t.clone());
1036 }
1037
1038 let gp = create_deep_agent_from_config(sub_cfg);
1039 registrations.push(SubAgentRegistration {
1040 descriptor: SubAgentDescriptor {
1041 name: "general-purpose".into(),
1042 description: "Default reasoning agent".into(),
1043 },
1044 agent: Arc::new(gp),
1045 });
1046 }
1047 }
1048
1049 let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1050 registrations,
1051 config.event_dispatcher.clone(),
1052 ));
1053 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1054
1055 let deep_agent_prompt: Arc<dyn AgentMiddleware> =
1057 if let Some(ref custom_prompt) = config.custom_system_prompt {
1058 Arc::new(DeepAgentPromptMiddleware::with_override(
1059 custom_prompt.clone(),
1060 ))
1061 } else {
1062 Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()))
1063 };
1064 let summarization = config.summarization.as_ref().map(|cfg| {
1065 Arc::new(SummarizationMiddleware::new(
1066 cfg.messages_to_keep,
1067 cfg.summary_note.clone(),
1068 ))
1069 });
1070 let hitl = if config.tool_interrupts.is_empty() {
1071 None
1072 } else {
1073 if config.checkpointer.is_none() {
1075 tracing::error!(
1076 "β οΈ HITL middleware requires a checkpointer to persist interrupt state. \
1077 HITL will be disabled. Please configure a checkpointer to enable HITL."
1078 );
1079 None
1080 } else {
1081 tracing::info!("π HITL enabled for {} tools", config.tool_interrupts.len());
1082 Some(Arc::new(HumanInLoopMiddleware::new(
1083 config.tool_interrupts.clone(),
1084 )))
1085 }
1086 };
1087
1088 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1091 base_prompt,
1092 deep_agent_prompt,
1093 planning,
1094 filesystem,
1095 subagent,
1096 ];
1097 if let Some(ref summary) = summarization {
1098 middlewares.push(summary.clone());
1099 }
1100 if config.enable_prompt_caching {
1101 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1102 }
1103 if let Some(ref hitl_mw) = hitl {
1104 middlewares.push(hitl_mw.clone());
1105 }
1106
1107 DeepAgent {
1108 descriptor: AgentDescriptor {
1109 name: "deep-agent".into(),
1110 version: "0.0.1".into(),
1111 description: Some("Rust deep agent".into()),
1112 },
1113 instructions: config.instructions,
1114 planner: config.planner,
1115 middlewares,
1116 base_tools: config.tools,
1117 state,
1118 history,
1119 _summarization: summarization,
1120 _hitl: hitl,
1121 builtin_tools: config.builtin_tools,
1122 checkpointer: config.checkpointer,
1123 event_dispatcher: config.event_dispatcher,
1124 enable_pii_sanitization: config.enable_pii_sanitization,
1125 max_iterations: config.max_iterations,
1126 }
1127}