1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10 ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
/// Tool names that are treated as "built-in".
///
/// Only tools whose name appears here are subject to the optional
/// `builtin_tools` allowlist; any other tool name is always exposed.
const BUILTIN_TOOL_NAMES: &[&str] = &[
    "write_todos",
    "ls",
    "read_file",
    "write_file",
    "edit_file",
];
29
30fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34 let mut pending = 0;
35 let mut in_progress = 0;
36 let mut completed = 0;
37
38 for todo in todos {
39 match todo.status {
40 agents_core::state::TodoStatus::Pending => pending += 1,
41 agents_core::state::TodoStatus::InProgress => in_progress += 1,
42 agents_core::state::TodoStatus::Completed => completed += 1,
43 }
44 }
45
46 (pending, in_progress, completed)
47}
48
/// An agent that repeatedly asks a planner for the next action, executes
/// tools on its behalf, and records the conversation in an in-memory history.
pub struct DeepAgent {
    /// Static description returned by `describe`.
    descriptor: AgentDescriptor,
    /// Instructions passed to every `ModelRequest` as the starting system prompt.
    instructions: String,
    /// Decides the next action (respond, call a tool, or terminate).
    planner: Arc<dyn PlannerHandle>,
    /// Middlewares, in the order they are applied to each model request.
    /// They may also contribute tools and veto tool calls with an interrupt.
    middlewares: Vec<Arc<dyn AgentMiddleware>>,
    /// Tools supplied through the configuration; always exposed.
    base_tools: Vec<ToolBox>,
    /// Shared mutable agent state (todos, pending interrupts, ...).
    state: Arc<RwLock<AgentStateSnapshot>>,
    /// Conversation history for this agent instance.
    history: Arc<RwLock<Vec<AgentMessage>>>,
    /// Handle to the summarization middleware, when one is configured.
    _summarization: Option<Arc<SummarizationMiddleware>>,
    /// Handle to the human-in-the-loop middleware, when one is enabled.
    _hitl: Option<Arc<HumanInLoopMiddleware>>,
    /// Allowlist for built-in tools; `None` exposes all of them.
    builtin_tools: Option<HashSet<String>>,
    /// Optional persistence backend for agent state.
    checkpointer: Option<Arc<dyn Checkpointer>>,
    /// Optional sink for agent lifecycle events.
    event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
    /// Selects the sanitizing preview helpers for event payloads when `true`.
    enable_pii_sanitization: bool,
}
68
69impl DeepAgent {
70 fn collect_tools(&self) -> HashMap<String, ToolBox> {
71 let mut tools: HashMap<String, ToolBox> = HashMap::new();
72 for tool in &self.base_tools {
73 tools.insert(tool.schema().name.clone(), tool.clone());
74 }
75 for middleware in &self.middlewares {
76 for tool in middleware.tools() {
77 let tool_name = tool.schema().name.clone();
78 if self.should_include(&tool_name) {
79 tools.insert(tool_name, tool);
80 }
81 }
82 }
83 tools
84 }
85 fn should_include(&self, name: &str) -> bool {
88 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
89 if !is_builtin {
90 return true;
91 }
92 match &self.builtin_tools {
93 None => true,
94 Some(selected) => selected.contains(name),
95 }
96 }
97
98 fn append_history(&self, message: AgentMessage) {
99 if let Ok(mut history) = self.history.write() {
100 history.push(message);
101 }
102 }
103
104 fn current_history(&self) -> Vec<AgentMessage> {
105 self.history.read().map(|h| h.clone()).unwrap_or_default()
106 }
107
108 fn emit_event(&self, event: agents_core::events::AgentEvent) {
109 if let Some(dispatcher) = &self.event_dispatcher {
110 let dispatcher_clone = dispatcher.clone();
111 tokio::spawn(async move {
112 dispatcher_clone.dispatch(event).await;
113 });
114 }
115 }
116
117 fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
118 agents_core::events::EventMetadata::new(
119 "default".to_string(),
120 uuid::Uuid::new_v4().to_string(),
121 None,
122 )
123 }
124
125 fn truncate_message(&self, message: &AgentMessage) -> String {
126 let text = match &message.content {
127 MessageContent::Text(t) => t.clone(),
128 MessageContent::Json(v) => v.to_string(),
129 };
130
131 if self.enable_pii_sanitization {
132 agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
133 } else {
134 agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
136 }
137 }
138
139 fn get_full_message_text(&self, message: &AgentMessage) -> String {
140 match &message.content {
141 MessageContent::Text(t) => t.clone(),
142 MessageContent::Json(v) => v.to_string(),
143 }
144 }
145
146 fn summarize_payload(&self, payload: &Value) -> String {
147 if self.enable_pii_sanitization {
148 agents_core::security::sanitize_tool_payload(
149 payload,
150 agents_core::security::MAX_PREVIEW_LENGTH,
151 )
152 } else {
153 let json_str = payload.to_string();
155 agents_core::security::truncate_string(
156 &json_str,
157 agents_core::security::MAX_PREVIEW_LENGTH,
158 )
159 }
160 }
161
162 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
164 if let Some(ref checkpointer) = self.checkpointer {
165 let state = self
166 .state
167 .read()
168 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
169 .clone();
170
171 let state_json = serde_json::to_string(&state)?;
173 let state_size = state_json.len();
174
175 checkpointer.save_state(thread_id, &state).await?;
177
178 self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
180 agents_core::events::StateCheckpointedEvent {
181 metadata: self.create_event_metadata(),
182 checkpoint_id: thread_id.to_string(),
183 state_size_bytes: state_size,
184 },
185 ));
186
187 tracing::debug!(
188 thread_id = %thread_id,
189 state_size_bytes = state_size,
190 "πΎ State checkpointed and event emitted"
191 );
192
193 Ok(())
194 } else {
195 tracing::warn!("Attempted to save state but no checkpointer is configured");
196 Ok(())
197 }
198 }
199
200 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
202 if let Some(ref checkpointer) = self.checkpointer {
203 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
204 *self
205 .state
206 .write()
207 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
208 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
209 Ok(true)
210 } else {
211 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
212 Ok(false)
213 }
214 } else {
215 tracing::warn!("Attempted to load state but no checkpointer is configured");
216 Ok(false)
217 }
218 }
219
220 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
222 if let Some(ref checkpointer) = self.checkpointer {
223 checkpointer.delete_thread(thread_id).await
224 } else {
225 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
226 Ok(())
227 }
228 }
229
230 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
232 if let Some(ref checkpointer) = self.checkpointer {
233 checkpointer.list_threads().await
234 } else {
235 Ok(Vec::new())
236 }
237 }
238
239 async fn execute_tool(
240 &self,
241 tool: ToolBox,
242 _tool_name: String,
243 payload: Value,
244 ) -> anyhow::Result<AgentMessage> {
245 let state_snapshot = self.state.read().unwrap().clone();
246 let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
247
248 let result = tool.execute(payload, ctx).await?;
249 Ok(self.apply_tool_result(result))
250 }
251
252 fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
253 match result {
254 ToolResult::Message(message) => {
255 message
258 }
259 ToolResult::WithStateUpdate {
260 message,
261 state_diff,
262 } => {
263 let todos_updated = state_diff.todos.is_some();
265
266 if let Ok(mut state) = self.state.write() {
267 let command = agents_core::command::Command::with_state(state_diff);
268 command.apply_to(&mut state);
269
270 if todos_updated {
272 let (pending_count, in_progress_count, completed_count) =
273 count_todos(&state.todos);
274
275 self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
276 agents_core::events::TodosUpdatedEvent {
277 metadata: self.create_event_metadata(),
278 todos: state.todos.clone(),
279 pending_count,
280 in_progress_count,
281 completed_count,
282 last_updated: chrono::Utc::now().to_rfc3339(),
283 },
284 ));
285
286 tracing::debug!(
287 pending = pending_count,
288 in_progress = in_progress_count,
289 completed = completed_count,
290 total = state.todos.len(),
291 "π Todos updated and event emitted"
292 );
293 }
294 }
295 message
298 }
299 }
300 }
301
302 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
304 self.state
305 .read()
306 .ok()
307 .and_then(|guard| guard.pending_interrupts.first().cloned())
308 }
309
310 pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
322 if let Some(dispatcher) = &self.event_dispatcher {
323 dispatcher.add_broadcaster(broadcaster);
324 tracing::debug!("Broadcaster added to event dispatcher");
325 } else {
326 tracing::warn!("add_broadcaster called but no event dispatcher configured");
327 }
328 }
329
330 pub fn add_broadcasters(
345 &self,
346 broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
347 ) {
348 if let Some(dispatcher) = &self.event_dispatcher {
349 for broadcaster in broadcasters {
350 dispatcher.add_broadcaster(broadcaster);
351 }
352 tracing::debug!("Multiple broadcasters added to event dispatcher");
353 } else {
354 tracing::warn!("add_broadcasters called but no event dispatcher configured");
355 }
356 }
357
358 pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
360 let interrupt = {
362 let state_guard = self
363 .state
364 .read()
365 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
366 state_guard
367 .pending_interrupts
368 .first()
369 .cloned()
370 .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
371 };
372
373 let result_message = match action {
374 HitlAction::Accept => {
375 let AgentInterrupt::HumanInLoop(hitl) = interrupt;
377 tracing::info!(
378 tool_name = %hitl.tool_name,
379 call_id = %hitl.call_id,
380 "β
HITL: Tool approved, executing with original arguments"
381 );
382
383 let tools = self.collect_tools();
384 let tool = tools
385 .get(&hitl.tool_name)
386 .cloned()
387 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
388
389 self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
390 .await?
391 }
392
393 HitlAction::Edit {
394 tool_name,
395 tool_args,
396 } => {
397 tracing::info!(
399 tool_name = %tool_name,
400 "βοΈ HITL: Tool edited, executing with modified arguments"
401 );
402
403 let tools = self.collect_tools();
404 let tool = tools
405 .get(&tool_name)
406 .cloned()
407 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
408
409 self.execute_tool(tool, tool_name, tool_args).await?
410 }
411
412 HitlAction::Reject { reason } => {
413 tracing::info!("β HITL: Tool rejected");
415
416 let text = reason
417 .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
418
419 let message = AgentMessage {
420 role: MessageRole::Tool,
421 content: MessageContent::Text(text),
422 metadata: None,
423 };
424
425 self.append_history(message.clone());
426 message
427 }
428
429 HitlAction::Respond { message } => {
430 tracing::info!("π¬ HITL: Custom response provided");
432
433 self.append_history(message.clone());
434 message
435 }
436 };
437
438 {
440 let mut state_guard = self
441 .state
442 .write()
443 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
444 state_guard.clear_interrupts();
445 }
446
447 if let Some(checkpointer) = &self.checkpointer {
449 let state_clone = self
450 .state
451 .read()
452 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
453 .clone();
454 checkpointer
455 .save_state(&ThreadId::default(), &state_clone)
456 .await?;
457 }
458
459 Ok(result_message)
460 }
461
462 pub async fn handle_message(
464 &self,
465 input: impl AsRef<str>,
466 state: Arc<AgentStateSnapshot>,
467 ) -> anyhow::Result<AgentMessage> {
468 self.handle_message_with_metadata(input, None, state).await
469 }
470
471 pub async fn handle_message_with_metadata(
473 &self,
474 input: impl AsRef<str>,
475 metadata: Option<MessageMetadata>,
476 state: Arc<AgentStateSnapshot>,
477 ) -> anyhow::Result<AgentMessage> {
478 let agent_message = AgentMessage {
479 role: MessageRole::User,
480 content: MessageContent::Text(input.as_ref().to_string()),
481 metadata,
482 };
483 self.handle_message_internal(agent_message, state).await
484 }
485
486 async fn handle_message_internal(
488 &self,
489 input: AgentMessage,
490 _state: Arc<AgentStateSnapshot>,
491 ) -> anyhow::Result<AgentMessage> {
492 let start_time = std::time::Instant::now();
493
494 self.emit_event(agents_core::events::AgentEvent::AgentStarted(
495 agents_core::events::AgentStartedEvent {
496 metadata: self.create_event_metadata(),
497 agent_name: self.descriptor.name.clone(),
498 message_preview: self.truncate_message(&input),
499 },
500 ));
501
502 self.append_history(input.clone());
503
504 let max_iterations = 10;
506 let mut iteration = 0;
507
508 loop {
509 iteration += 1;
510 if iteration > max_iterations {
511 tracing::warn!(
512 "β οΈ Max iterations ({}) reached, stopping ReAct loop",
513 max_iterations
514 );
515 let response = AgentMessage {
516 role: MessageRole::Agent,
517 content: MessageContent::Text(
518 "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
519 ),
520 metadata: None,
521 };
522 self.append_history(response.clone());
523 return Ok(response);
524 }
525
526 tracing::debug!("π ReAct iteration {}/{}", iteration, max_iterations);
527
528 let mut request = ModelRequest::new(&self.instructions, self.current_history());
530 let tools = self.collect_tools();
531 for middleware in &self.middlewares {
532 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
533 middleware.modify_model_request(&mut ctx).await?;
534 }
535
536 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
537 let context = PlannerContext {
538 history: request.messages.clone(),
539 system_prompt: request.system_prompt.clone(),
540 tools: tool_schemas,
541 };
542 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
543
544 let decision = self.planner.plan(context, state_snapshot).await?;
546
547 self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
549 agents_core::events::PlanningCompleteEvent {
550 metadata: self.create_event_metadata(),
551 action_type: match &decision.next_action {
552 PlannerAction::Respond { .. } => "respond".to_string(),
553 PlannerAction::CallTool { .. } => "call_tool".to_string(),
554 PlannerAction::Terminate => "terminate".to_string(),
555 },
556 action_summary: match &decision.next_action {
557 PlannerAction::Respond { message } => {
558 format!("Respond: {}", self.truncate_message(message))
559 }
560 PlannerAction::CallTool { tool_name, .. } => {
561 format!("Call tool: {}", tool_name)
562 }
563 PlannerAction::Terminate => "Terminate".to_string(),
564 },
565 },
566 ));
567
568 match decision.next_action {
569 PlannerAction::Respond { message } => {
570 self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
572 agents_core::events::AgentCompletedEvent {
573 metadata: self.create_event_metadata(),
574 agent_name: self.descriptor.name.clone(),
575 duration_ms: start_time.elapsed().as_millis() as u64,
576 response_preview: self.truncate_message(&message),
577 response: self.get_full_message_text(&message),
578 },
579 ));
580
581 self.append_history(message.clone());
582 return Ok(message);
583 }
584 PlannerAction::CallTool { tool_name, payload } => {
585 let tool_call_message = AgentMessage {
590 role: MessageRole::Agent,
591 content: MessageContent::Text(format!(
592 "Calling tool: {} with args: {}",
593 tool_name,
594 serde_json::to_string(&payload).unwrap_or_default()
595 )),
596 metadata: None,
597 };
598 self.append_history(tool_call_message);
599
600 if let Some(tool) = tools.get(&tool_name).cloned() {
601 let call_id = format!("call_{}", uuid::Uuid::new_v4());
603 for middleware in &self.middlewares {
604 if let Some(interrupt) = middleware
605 .before_tool_execution(&tool_name, &payload, &call_id)
606 .await?
607 {
608 {
610 let mut state_guard = self.state.write().map_err(|_| {
611 anyhow::anyhow!("Failed to acquire write lock on state")
612 })?;
613 state_guard.add_interrupt(interrupt.clone());
614 }
615
616 if let Some(checkpointer) = &self.checkpointer {
618 let state_clone = self
619 .state
620 .read()
621 .map_err(|_| {
622 anyhow::anyhow!("Failed to acquire read lock on state")
623 })?
624 .clone();
625 checkpointer
626 .save_state(&ThreadId::default(), &state_clone)
627 .await?;
628 }
629
630 let interrupt_message = AgentMessage {
632 role: MessageRole::System,
633 content: MessageContent::Text(format!(
634 "βΈοΈ Execution paused: Tool '{}' requires human approval",
635 tool_name
636 )),
637 metadata: None,
638 };
639 self.append_history(interrupt_message.clone());
640 return Ok(interrupt_message);
641 }
642 }
643
644 let tool_start_time = std::time::Instant::now();
646
647 self.emit_event(agents_core::events::AgentEvent::ToolStarted(
648 agents_core::events::ToolStartedEvent {
649 metadata: self.create_event_metadata(),
650 tool_name: tool_name.clone(),
651 input_summary: self.summarize_payload(&payload),
652 },
653 ));
654
655 tracing::warn!(
656 "βοΈ EXECUTING TOOL: {} with payload: {}",
657 tool_name,
658 serde_json::to_string(&payload)
659 .unwrap_or_else(|_| "invalid json".to_string())
660 );
661
662 let result = self
663 .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
664 .await;
665
666 let duration = tool_start_time.elapsed();
667 match result {
668 Ok(tool_result_message) => {
669 let content_preview = match &tool_result_message.content {
670 MessageContent::Text(t) => {
671 if t.len() > 100 {
672 format!("{}... ({} chars)", &t[..100], t.len())
673 } else {
674 t.clone()
675 }
676 }
677 MessageContent::Json(v) => {
678 format!("JSON: {} bytes", v.to_string().len())
679 }
680 };
681
682 self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
683 agents_core::events::ToolCompletedEvent {
684 metadata: self.create_event_metadata(),
685 tool_name: tool_name.clone(),
686 duration_ms: duration.as_millis() as u64,
687 result_summary: content_preview.clone(),
688 success: true,
689 },
690 ));
691
692 tracing::warn!(
693 "β
TOOL COMPLETED: {} in {:?} - Result: {}",
694 tool_name,
695 duration,
696 content_preview
697 );
698
699 self.append_history(tool_result_message);
701 }
703 Err(e) => {
704 self.emit_event(agents_core::events::AgentEvent::ToolFailed(
705 agents_core::events::ToolFailedEvent {
706 metadata: self.create_event_metadata(),
707 tool_name: tool_name.clone(),
708 duration_ms: duration.as_millis() as u64,
709 error_message: e.to_string(),
710 is_recoverable: true,
711 retry_count: 0,
712 },
713 ));
714
715 tracing::error!(
716 "β TOOL FAILED: {} in {:?} - Error: {}",
717 tool_name,
718 duration,
719 e
720 );
721
722 let error_message = AgentMessage {
724 role: MessageRole::Tool,
725 content: MessageContent::Text(format!(
726 "Error executing {}: {}",
727 tool_name, e
728 )),
729 metadata: None,
730 };
731 self.append_history(error_message);
732 }
734 }
735 } else {
736 tracing::warn!("β οΈ Tool '{}' not found", tool_name);
738 let error_message = AgentMessage {
739 role: MessageRole::Tool,
740 content: MessageContent::Text(format!(
741 "Tool '{}' not found. Available tools: {}",
742 tool_name,
743 tools
744 .keys()
745 .map(|k| k.as_str())
746 .collect::<Vec<_>>()
747 .join(", ")
748 )),
749 metadata: None,
750 };
751 self.append_history(error_message);
752 }
754 }
755 PlannerAction::Terminate => {
756 tracing::debug!("π Agent terminated");
758 let message = AgentMessage {
759 role: MessageRole::Agent,
760 content: MessageContent::Text("Task completed.".into()),
761 metadata: None,
762 };
763 self.append_history(message.clone());
764 return Ok(message);
765 }
766 }
767 }
768 }
769}
770
771#[async_trait]
772impl AgentHandle for DeepAgent {
773 async fn describe(&self) -> AgentDescriptor {
774 self.descriptor.clone()
775 }
776
777 async fn handle_message(
778 &self,
779 input: AgentMessage,
780 _state: Arc<AgentStateSnapshot>,
781 ) -> anyhow::Result<AgentMessage> {
782 self.handle_message_internal(input, _state).await
783 }
784
785 async fn handle_message_stream(
786 &self,
787 input: AgentMessage,
788 _state: Arc<AgentStateSnapshot>,
789 ) -> anyhow::Result<agents_core::agent::AgentStream> {
790 use crate::planner::LlmBackedPlanner;
791 use agents_core::llm::{LlmRequest, StreamChunk};
792 use futures::StreamExt;
793
794 self.append_history(input.clone());
796
797 let mut request = ModelRequest::new(&self.instructions, self.current_history());
799 let tools = self.collect_tools();
800
801 for middleware in &self.middlewares {
803 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
804 middleware.modify_model_request(&mut ctx).await?;
805 }
806
807 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
809 let llm_request = LlmRequest {
810 system_prompt: request.system_prompt.clone(),
811 messages: request.messages.clone(),
812 tools: tool_schemas,
813 };
814
815 let planner_any = self.planner.as_any();
817
818 if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
819 let model = llm_planner.model().clone();
821 let stream = model.generate_stream(llm_request).await?;
822
823 let agent_name = self.descriptor.name.clone();
825 let event_dispatcher = self.event_dispatcher.clone();
826
827 let wrapped_stream = stream.then(move |chunk_result| {
828 let dispatcher = event_dispatcher.clone();
829 let name = agent_name.clone();
830
831 async move {
832 match &chunk_result {
833 Ok(StreamChunk::TextDelta(token)) => {
834 if let Some(ref dispatcher) = dispatcher {
836 let event = agents_core::events::AgentEvent::StreamingToken(
837 agents_core::events::StreamingTokenEvent {
838 metadata: agents_core::events::EventMetadata::new(
839 "default".to_string(),
840 uuid::Uuid::new_v4().to_string(),
841 None,
842 ),
843 agent_name: name.clone(),
844 token: token.clone(),
845 },
846 );
847 dispatcher.dispatch(event).await;
848 }
849 }
850 Ok(StreamChunk::Done { message }) => {
851 if let Some(ref dispatcher) = dispatcher {
853 let full_text = match &message.content {
854 agents_core::messaging::MessageContent::Text(t) => t.clone(),
855 agents_core::messaging::MessageContent::Json(v) => {
856 v.to_string()
857 }
858 };
859
860 let preview = if full_text.len() > 100 {
861 format!("{}...", &full_text[..100])
862 } else {
863 full_text.clone()
864 };
865
866 let event = agents_core::events::AgentEvent::AgentCompleted(
867 agents_core::events::AgentCompletedEvent {
868 metadata: agents_core::events::EventMetadata::new(
869 "default".to_string(),
870 uuid::Uuid::new_v4().to_string(),
871 None,
872 ),
873 agent_name: name.clone(),
874 duration_ms: 0, response_preview: preview,
876 response: full_text,
877 },
878 );
879 dispatcher.dispatch(event).await;
880 }
881 }
882 _ => {}
883 }
884 chunk_result
885 }
886 });
887
888 Ok(Box::pin(wrapped_stream))
889 } else {
890 let response = self.handle_message_internal(input, _state).await?;
892 Ok(Box::pin(futures::stream::once(async move {
893 Ok(StreamChunk::Done { message: response })
894 })))
895 }
896 }
897
898 async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
899 let state_guard = self
900 .state
901 .read()
902 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
903 Ok(state_guard.pending_interrupts.first().cloned())
904 }
905
906 async fn resume_with_approval(
907 &self,
908 action: agents_core::hitl::HitlAction,
909 ) -> anyhow::Result<AgentMessage> {
910 self.resume_with_approval(action).await
911 }
912}
913
914pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
919 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
920 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
921
922 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
923 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
924
925 let mut registrations: Vec<SubAgentRegistration> = Vec::new();
927
928 tracing::info!(
930 "π Processing {} sub-agent configurations",
931 config.subagent_configs.len()
932 );
933
934 for subagent_config in &config.subagent_configs {
935 tracing::info!(
936 "ποΈ Building sub-agent: {} ({})",
937 subagent_config.name,
938 subagent_config.description
939 );
940
941 let sub_planner = if let Some(ref model) = subagent_config.model {
943 Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
945 } else {
946 config.planner.clone()
948 };
949
950 let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
952
953 if let Some(ref tools) = subagent_config.tools {
955 tracing::debug!(
956 " - Configuring {} tools for {}",
957 tools.len(),
958 subagent_config.name
959 );
960 for tool in tools {
961 sub_cfg = sub_cfg.with_tool(tool.clone());
962 }
963 }
964
965 if let Some(ref builtin) = subagent_config.builtin_tools {
967 sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
968 }
969
970 sub_cfg = sub_cfg.with_auto_general_purpose(false);
972
973 sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
975
976 sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
978
979 let sub_agent = create_deep_agent_from_config(sub_cfg);
981
982 registrations.push(SubAgentRegistration {
984 descriptor: SubAgentDescriptor {
985 name: subagent_config.name.clone(),
986 description: subagent_config.description.clone(),
987 },
988 agent: Arc::new(sub_agent),
989 });
990
991 tracing::info!("=> Registered sub-agent: {}", subagent_config.name);
992 }
993
994 tracing::info!("=> Total sub-agents registered: {}", registrations.len());
995
996 if config.auto_general_purpose {
998 let has_gp = registrations
999 .iter()
1000 .any(|r| r.descriptor.name == "general-purpose");
1001 if !has_gp {
1002 let mut sub_cfg =
1004 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
1005 .with_auto_general_purpose(false)
1006 .with_prompt_caching(config.enable_prompt_caching)
1007 .with_pii_sanitization(config.enable_pii_sanitization);
1008 if let Some(ref selected) = config.builtin_tools {
1009 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
1010 }
1011 if let Some(ref sum) = config.summarization {
1012 sub_cfg = sub_cfg.with_summarization(sum.clone());
1013 }
1014 for t in &config.tools {
1015 sub_cfg = sub_cfg.with_tool(t.clone());
1016 }
1017
1018 let gp = create_deep_agent_from_config(sub_cfg);
1019 registrations.push(SubAgentRegistration {
1020 descriptor: SubAgentDescriptor {
1021 name: "general-purpose".into(),
1022 description: "Default reasoning agent".into(),
1023 },
1024 agent: Arc::new(gp),
1025 });
1026 }
1027 }
1028
1029 let subagent = Arc::new(SubAgentMiddleware::new_with_events(
1030 registrations,
1031 config.event_dispatcher.clone(),
1032 ));
1033 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
1034 let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
1035 let summarization = config.summarization.as_ref().map(|cfg| {
1036 Arc::new(SummarizationMiddleware::new(
1037 cfg.messages_to_keep,
1038 cfg.summary_note.clone(),
1039 ))
1040 });
1041 let hitl = if config.tool_interrupts.is_empty() {
1042 None
1043 } else {
1044 if config.checkpointer.is_none() {
1046 tracing::error!(
1047 "β οΈ HITL middleware requires a checkpointer to persist interrupt state. \
1048 HITL will be disabled. Please configure a checkpointer to enable HITL."
1049 );
1050 None
1051 } else {
1052 tracing::info!("π HITL enabled for {} tools", config.tool_interrupts.len());
1053 Some(Arc::new(HumanInLoopMiddleware::new(
1054 config.tool_interrupts.clone(),
1055 )))
1056 }
1057 };
1058
1059 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
1062 base_prompt,
1063 deep_agent_prompt,
1064 planning,
1065 filesystem,
1066 subagent,
1067 ];
1068 if let Some(ref summary) = summarization {
1069 middlewares.push(summary.clone());
1070 }
1071 if config.enable_prompt_caching {
1072 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
1073 }
1074 if let Some(ref hitl_mw) = hitl {
1075 middlewares.push(hitl_mw.clone());
1076 }
1077
1078 DeepAgent {
1079 descriptor: AgentDescriptor {
1080 name: "deep-agent".into(),
1081 version: "0.0.1".into(),
1082 description: Some("Rust deep agent".into()),
1083 },
1084 instructions: config.instructions,
1085 planner: config.planner,
1086 middlewares,
1087 base_tools: config.tools,
1088 state,
1089 history,
1090 _summarization: summarization,
1091 _hitl: hitl,
1092 builtin_tools: config.builtin_tools,
1093 checkpointer: config.checkpointer,
1094 event_dispatcher: config.event_dispatcher,
1095 enable_pii_sanitization: config.enable_pii_sanitization,
1096 }
1097}