1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10 ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30fn count_todos(todos: &[agents_core::state::TodoItem]) -> (usize, usize, usize) {
34 let mut pending = 0;
35 let mut in_progress = 0;
36 let mut completed = 0;
37
38 for todo in todos {
39 match todo.status {
40 agents_core::state::TodoStatus::Pending => pending += 1,
41 agents_core::state::TodoStatus::InProgress => in_progress += 1,
42 agents_core::state::TodoStatus::Completed => completed += 1,
43 }
44 }
45
46 (pending, in_progress, completed)
47}
48
49pub struct DeepAgent {
54 descriptor: AgentDescriptor,
55 instructions: String,
56 planner: Arc<dyn PlannerHandle>,
57 middlewares: Vec<Arc<dyn AgentMiddleware>>,
58 base_tools: Vec<ToolBox>,
59 state: Arc<RwLock<AgentStateSnapshot>>,
60 history: Arc<RwLock<Vec<AgentMessage>>>,
61 _summarization: Option<Arc<SummarizationMiddleware>>,
62 _hitl: Option<Arc<HumanInLoopMiddleware>>,
63 builtin_tools: Option<HashSet<String>>,
64 checkpointer: Option<Arc<dyn Checkpointer>>,
65 event_dispatcher: Option<Arc<agents_core::events::EventDispatcher>>,
66 enable_pii_sanitization: bool,
67}
68
69impl DeepAgent {
70 fn collect_tools(&self) -> HashMap<String, ToolBox> {
71 let mut tools: HashMap<String, ToolBox> = HashMap::new();
72 for tool in &self.base_tools {
73 tools.insert(tool.schema().name.clone(), tool.clone());
74 }
75 for middleware in &self.middlewares {
76 for tool in middleware.tools() {
77 let tool_name = tool.schema().name.clone();
78 if self.should_include(&tool_name) {
79 tools.insert(tool_name, tool);
80 }
81 }
82 }
83 tools
84 }
85 fn should_include(&self, name: &str) -> bool {
88 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
89 if !is_builtin {
90 return true;
91 }
92 match &self.builtin_tools {
93 None => true,
94 Some(selected) => selected.contains(name),
95 }
96 }
97
98 fn append_history(&self, message: AgentMessage) {
99 if let Ok(mut history) = self.history.write() {
100 history.push(message);
101 }
102 }
103
104 fn current_history(&self) -> Vec<AgentMessage> {
105 self.history.read().map(|h| h.clone()).unwrap_or_default()
106 }
107
108 fn emit_event(&self, event: agents_core::events::AgentEvent) {
109 if let Some(dispatcher) = &self.event_dispatcher {
110 let dispatcher_clone = dispatcher.clone();
111 tokio::spawn(async move {
112 dispatcher_clone.dispatch(event).await;
113 });
114 }
115 }
116
117 fn create_event_metadata(&self) -> agents_core::events::EventMetadata {
118 agents_core::events::EventMetadata::new(
119 "default".to_string(),
120 uuid::Uuid::new_v4().to_string(),
121 None,
122 )
123 }
124
125 fn truncate_message(&self, message: &AgentMessage) -> String {
126 let text = match &message.content {
127 MessageContent::Text(t) => t.clone(),
128 MessageContent::Json(v) => v.to_string(),
129 };
130
131 if self.enable_pii_sanitization {
132 agents_core::security::safe_preview(&text, agents_core::security::MAX_PREVIEW_LENGTH)
133 } else {
134 agents_core::security::truncate_string(&text, agents_core::security::MAX_PREVIEW_LENGTH)
136 }
137 }
138
139 fn summarize_payload(&self, payload: &Value) -> String {
140 if self.enable_pii_sanitization {
141 agents_core::security::sanitize_tool_payload(
142 payload,
143 agents_core::security::MAX_PREVIEW_LENGTH,
144 )
145 } else {
146 let json_str = payload.to_string();
148 agents_core::security::truncate_string(
149 &json_str,
150 agents_core::security::MAX_PREVIEW_LENGTH,
151 )
152 }
153 }
154
155 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
157 if let Some(ref checkpointer) = self.checkpointer {
158 let state = self
159 .state
160 .read()
161 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
162 .clone();
163
164 let state_json = serde_json::to_string(&state)?;
166 let state_size = state_json.len();
167
168 checkpointer.save_state(thread_id, &state).await?;
170
171 self.emit_event(agents_core::events::AgentEvent::StateCheckpointed(
173 agents_core::events::StateCheckpointedEvent {
174 metadata: self.create_event_metadata(),
175 checkpoint_id: thread_id.to_string(),
176 state_size_bytes: state_size,
177 },
178 ));
179
180 tracing::debug!(
181 thread_id = %thread_id,
182 state_size_bytes = state_size,
183 "πΎ State checkpointed and event emitted"
184 );
185
186 Ok(())
187 } else {
188 tracing::warn!("Attempted to save state but no checkpointer is configured");
189 Ok(())
190 }
191 }
192
193 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
195 if let Some(ref checkpointer) = self.checkpointer {
196 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
197 *self
198 .state
199 .write()
200 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
201 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
202 Ok(true)
203 } else {
204 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
205 Ok(false)
206 }
207 } else {
208 tracing::warn!("Attempted to load state but no checkpointer is configured");
209 Ok(false)
210 }
211 }
212
213 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
215 if let Some(ref checkpointer) = self.checkpointer {
216 checkpointer.delete_thread(thread_id).await
217 } else {
218 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
219 Ok(())
220 }
221 }
222
223 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
225 if let Some(ref checkpointer) = self.checkpointer {
226 checkpointer.list_threads().await
227 } else {
228 Ok(Vec::new())
229 }
230 }
231
232 async fn execute_tool(
233 &self,
234 tool: ToolBox,
235 _tool_name: String,
236 payload: Value,
237 ) -> anyhow::Result<AgentMessage> {
238 let state_snapshot = self.state.read().unwrap().clone();
239 let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
240
241 let result = tool.execute(payload, ctx).await?;
242 Ok(self.apply_tool_result(result))
243 }
244
245 fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
246 match result {
247 ToolResult::Message(message) => {
248 message
251 }
252 ToolResult::WithStateUpdate {
253 message,
254 state_diff,
255 } => {
256 let todos_updated = state_diff.todos.is_some();
258
259 if let Ok(mut state) = self.state.write() {
260 let command = agents_core::command::Command::with_state(state_diff);
261 command.apply_to(&mut state);
262
263 if todos_updated {
265 let (pending_count, in_progress_count, completed_count) =
266 count_todos(&state.todos);
267
268 self.emit_event(agents_core::events::AgentEvent::TodosUpdated(
269 agents_core::events::TodosUpdatedEvent {
270 metadata: self.create_event_metadata(),
271 todos: state.todos.clone(),
272 pending_count,
273 in_progress_count,
274 completed_count,
275 last_updated: chrono::Utc::now().to_rfc3339(),
276 },
277 ));
278
279 tracing::debug!(
280 pending = pending_count,
281 in_progress = in_progress_count,
282 completed = completed_count,
283 total = state.todos.len(),
284 "π Todos updated and event emitted"
285 );
286 }
287 }
288 message
291 }
292 }
293 }
294
295 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
297 self.state
298 .read()
299 .ok()
300 .and_then(|guard| guard.pending_interrupts.first().cloned())
301 }
302
303 pub fn add_broadcaster(&self, broadcaster: Arc<dyn agents_core::events::EventBroadcaster>) {
315 if let Some(dispatcher) = &self.event_dispatcher {
316 dispatcher.add_broadcaster(broadcaster);
317 tracing::debug!("Broadcaster added to event dispatcher");
318 } else {
319 tracing::warn!("add_broadcaster called but no event dispatcher configured");
320 }
321 }
322
323 pub fn add_broadcasters(
338 &self,
339 broadcasters: Vec<Arc<dyn agents_core::events::EventBroadcaster>>,
340 ) {
341 if let Some(dispatcher) = &self.event_dispatcher {
342 for broadcaster in broadcasters {
343 dispatcher.add_broadcaster(broadcaster);
344 }
345 tracing::debug!("Multiple broadcasters added to event dispatcher");
346 } else {
347 tracing::warn!("add_broadcasters called but no event dispatcher configured");
348 }
349 }
350
351 pub async fn resume_with_approval(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
353 let interrupt = {
355 let state_guard = self
356 .state
357 .read()
358 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
359 state_guard
360 .pending_interrupts
361 .first()
362 .cloned()
363 .ok_or_else(|| anyhow::anyhow!("No pending interrupts"))?
364 };
365
366 let result_message = match action {
367 HitlAction::Accept => {
368 let AgentInterrupt::HumanInLoop(hitl) = interrupt;
370 tracing::info!(
371 tool_name = %hitl.tool_name,
372 call_id = %hitl.call_id,
373 "β
HITL: Tool approved, executing with original arguments"
374 );
375
376 let tools = self.collect_tools();
377 let tool = tools
378 .get(&hitl.tool_name)
379 .cloned()
380 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", hitl.tool_name))?;
381
382 self.execute_tool(tool, hitl.tool_name, hitl.tool_args)
383 .await?
384 }
385
386 HitlAction::Edit {
387 tool_name,
388 tool_args,
389 } => {
390 tracing::info!(
392 tool_name = %tool_name,
393 "βοΈ HITL: Tool edited, executing with modified arguments"
394 );
395
396 let tools = self.collect_tools();
397 let tool = tools
398 .get(&tool_name)
399 .cloned()
400 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
401
402 self.execute_tool(tool, tool_name, tool_args).await?
403 }
404
405 HitlAction::Reject { reason } => {
406 tracing::info!("β HITL: Tool rejected");
408
409 let text = reason
410 .unwrap_or_else(|| "Tool execution rejected by human reviewer.".to_string());
411
412 let message = AgentMessage {
413 role: MessageRole::Tool,
414 content: MessageContent::Text(text),
415 metadata: None,
416 };
417
418 self.append_history(message.clone());
419 message
420 }
421
422 HitlAction::Respond { message } => {
423 tracing::info!("π¬ HITL: Custom response provided");
425
426 self.append_history(message.clone());
427 message
428 }
429 };
430
431 {
433 let mut state_guard = self
434 .state
435 .write()
436 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on state"))?;
437 state_guard.clear_interrupts();
438 }
439
440 if let Some(checkpointer) = &self.checkpointer {
442 let state_clone = self
443 .state
444 .read()
445 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?
446 .clone();
447 checkpointer
448 .save_state(&ThreadId::default(), &state_clone)
449 .await?;
450 }
451
452 Ok(result_message)
453 }
454
455 pub async fn handle_message(
457 &self,
458 input: impl AsRef<str>,
459 state: Arc<AgentStateSnapshot>,
460 ) -> anyhow::Result<AgentMessage> {
461 self.handle_message_with_metadata(input, None, state).await
462 }
463
464 pub async fn handle_message_with_metadata(
466 &self,
467 input: impl AsRef<str>,
468 metadata: Option<MessageMetadata>,
469 state: Arc<AgentStateSnapshot>,
470 ) -> anyhow::Result<AgentMessage> {
471 let agent_message = AgentMessage {
472 role: MessageRole::User,
473 content: MessageContent::Text(input.as_ref().to_string()),
474 metadata,
475 };
476 self.handle_message_internal(agent_message, state).await
477 }
478
479 async fn handle_message_internal(
481 &self,
482 input: AgentMessage,
483 _state: Arc<AgentStateSnapshot>,
484 ) -> anyhow::Result<AgentMessage> {
485 let start_time = std::time::Instant::now();
486
487 self.emit_event(agents_core::events::AgentEvent::AgentStarted(
488 agents_core::events::AgentStartedEvent {
489 metadata: self.create_event_metadata(),
490 agent_name: self.descriptor.name.clone(),
491 message_preview: self.truncate_message(&input),
492 },
493 ));
494
495 self.append_history(input.clone());
496
497 let max_iterations = 10;
499 let mut iteration = 0;
500
501 loop {
502 iteration += 1;
503 if iteration > max_iterations {
504 tracing::warn!(
505 "β οΈ Max iterations ({}) reached, stopping ReAct loop",
506 max_iterations
507 );
508 let response = AgentMessage {
509 role: MessageRole::Agent,
510 content: MessageContent::Text(
511 "I've reached the maximum number of steps. Let me summarize what I've done so far.".to_string()
512 ),
513 metadata: None,
514 };
515 self.append_history(response.clone());
516 return Ok(response);
517 }
518
519 tracing::debug!("π ReAct iteration {}/{}", iteration, max_iterations);
520
521 let mut request = ModelRequest::new(&self.instructions, self.current_history());
523 let tools = self.collect_tools();
524 for middleware in &self.middlewares {
525 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
526 middleware.modify_model_request(&mut ctx).await?;
527 }
528
529 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
530 let context = PlannerContext {
531 history: request.messages.clone(),
532 system_prompt: request.system_prompt.clone(),
533 tools: tool_schemas,
534 };
535 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
536
537 let decision = self.planner.plan(context, state_snapshot).await?;
539
540 self.emit_event(agents_core::events::AgentEvent::PlanningComplete(
542 agents_core::events::PlanningCompleteEvent {
543 metadata: self.create_event_metadata(),
544 action_type: match &decision.next_action {
545 PlannerAction::Respond { .. } => "respond".to_string(),
546 PlannerAction::CallTool { .. } => "call_tool".to_string(),
547 PlannerAction::Terminate => "terminate".to_string(),
548 },
549 action_summary: match &decision.next_action {
550 PlannerAction::Respond { message } => {
551 format!("Respond: {}", self.truncate_message(message))
552 }
553 PlannerAction::CallTool { tool_name, .. } => {
554 format!("Call tool: {}", tool_name)
555 }
556 PlannerAction::Terminate => "Terminate".to_string(),
557 },
558 },
559 ));
560
561 match decision.next_action {
562 PlannerAction::Respond { message } => {
563 self.emit_event(agents_core::events::AgentEvent::AgentCompleted(
565 agents_core::events::AgentCompletedEvent {
566 metadata: self.create_event_metadata(),
567 agent_name: self.descriptor.name.clone(),
568 duration_ms: start_time.elapsed().as_millis() as u64,
569 response_preview: self.truncate_message(&message),
570 },
571 ));
572
573 self.append_history(message.clone());
574 return Ok(message);
575 }
576 PlannerAction::CallTool { tool_name, payload } => {
577 let tool_call_message = AgentMessage {
582 role: MessageRole::Agent,
583 content: MessageContent::Text(format!(
584 "Calling tool: {} with args: {}",
585 tool_name,
586 serde_json::to_string(&payload).unwrap_or_default()
587 )),
588 metadata: None,
589 };
590 self.append_history(tool_call_message);
591
592 if let Some(tool) = tools.get(&tool_name).cloned() {
593 let call_id = format!("call_{}", uuid::Uuid::new_v4());
595 for middleware in &self.middlewares {
596 if let Some(interrupt) = middleware
597 .before_tool_execution(&tool_name, &payload, &call_id)
598 .await?
599 {
600 {
602 let mut state_guard = self.state.write().map_err(|_| {
603 anyhow::anyhow!("Failed to acquire write lock on state")
604 })?;
605 state_guard.add_interrupt(interrupt.clone());
606 }
607
608 if let Some(checkpointer) = &self.checkpointer {
610 let state_clone = self
611 .state
612 .read()
613 .map_err(|_| {
614 anyhow::anyhow!("Failed to acquire read lock on state")
615 })?
616 .clone();
617 checkpointer
618 .save_state(&ThreadId::default(), &state_clone)
619 .await?;
620 }
621
622 let interrupt_message = AgentMessage {
624 role: MessageRole::System,
625 content: MessageContent::Text(format!(
626 "βΈοΈ Execution paused: Tool '{}' requires human approval",
627 tool_name
628 )),
629 metadata: None,
630 };
631 self.append_history(interrupt_message.clone());
632 return Ok(interrupt_message);
633 }
634 }
635
636 let tool_start_time = std::time::Instant::now();
638
639 self.emit_event(agents_core::events::AgentEvent::ToolStarted(
640 agents_core::events::ToolStartedEvent {
641 metadata: self.create_event_metadata(),
642 tool_name: tool_name.clone(),
643 input_summary: self.summarize_payload(&payload),
644 },
645 ));
646
647 tracing::warn!(
648 "βοΈ EXECUTING TOOL: {} with payload: {}",
649 tool_name,
650 serde_json::to_string(&payload)
651 .unwrap_or_else(|_| "invalid json".to_string())
652 );
653
654 let result = self
655 .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
656 .await;
657
658 let duration = tool_start_time.elapsed();
659 match result {
660 Ok(tool_result_message) => {
661 let content_preview = match &tool_result_message.content {
662 MessageContent::Text(t) => {
663 if t.len() > 100 {
664 format!("{}... ({} chars)", &t[..100], t.len())
665 } else {
666 t.clone()
667 }
668 }
669 MessageContent::Json(v) => {
670 format!("JSON: {} bytes", v.to_string().len())
671 }
672 };
673
674 self.emit_event(agents_core::events::AgentEvent::ToolCompleted(
675 agents_core::events::ToolCompletedEvent {
676 metadata: self.create_event_metadata(),
677 tool_name: tool_name.clone(),
678 duration_ms: duration.as_millis() as u64,
679 result_summary: content_preview.clone(),
680 success: true,
681 },
682 ));
683
684 tracing::warn!(
685 "β
TOOL COMPLETED: {} in {:?} - Result: {}",
686 tool_name,
687 duration,
688 content_preview
689 );
690
691 self.append_history(tool_result_message);
693 }
695 Err(e) => {
696 self.emit_event(agents_core::events::AgentEvent::ToolFailed(
697 agents_core::events::ToolFailedEvent {
698 metadata: self.create_event_metadata(),
699 tool_name: tool_name.clone(),
700 duration_ms: duration.as_millis() as u64,
701 error_message: e.to_string(),
702 is_recoverable: true,
703 retry_count: 0,
704 },
705 ));
706
707 tracing::error!(
708 "β TOOL FAILED: {} in {:?} - Error: {}",
709 tool_name,
710 duration,
711 e
712 );
713
714 let error_message = AgentMessage {
716 role: MessageRole::Tool,
717 content: MessageContent::Text(format!(
718 "Error executing {}: {}",
719 tool_name, e
720 )),
721 metadata: None,
722 };
723 self.append_history(error_message);
724 }
726 }
727 } else {
728 tracing::warn!("β οΈ Tool '{}' not found", tool_name);
730 let error_message = AgentMessage {
731 role: MessageRole::Tool,
732 content: MessageContent::Text(format!(
733 "Tool '{}' not found. Available tools: {}",
734 tool_name,
735 tools
736 .keys()
737 .map(|k| k.as_str())
738 .collect::<Vec<_>>()
739 .join(", ")
740 )),
741 metadata: None,
742 };
743 self.append_history(error_message);
744 }
746 }
747 PlannerAction::Terminate => {
748 tracing::debug!("π Agent terminated");
750 let message = AgentMessage {
751 role: MessageRole::Agent,
752 content: MessageContent::Text("Task completed.".into()),
753 metadata: None,
754 };
755 self.append_history(message.clone());
756 return Ok(message);
757 }
758 }
759 }
760 }
761}
762
763#[async_trait]
764impl AgentHandle for DeepAgent {
765 async fn describe(&self) -> AgentDescriptor {
766 self.descriptor.clone()
767 }
768
769 async fn handle_message(
770 &self,
771 input: AgentMessage,
772 _state: Arc<AgentStateSnapshot>,
773 ) -> anyhow::Result<AgentMessage> {
774 self.handle_message_internal(input, _state).await
775 }
776
777 async fn handle_message_stream(
778 &self,
779 input: AgentMessage,
780 _state: Arc<AgentStateSnapshot>,
781 ) -> anyhow::Result<agents_core::agent::AgentStream> {
782 use crate::planner::LlmBackedPlanner;
783 use agents_core::llm::{LlmRequest, StreamChunk};
784
785 self.append_history(input.clone());
787
788 let mut request = ModelRequest::new(&self.instructions, self.current_history());
790 let tools = self.collect_tools();
791
792 for middleware in &self.middlewares {
794 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
795 middleware.modify_model_request(&mut ctx).await?;
796 }
797
798 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
800 let llm_request = LlmRequest {
801 system_prompt: request.system_prompt.clone(),
802 messages: request.messages.clone(),
803 tools: tool_schemas,
804 };
805
806 let planner_any = self.planner.as_any();
808
809 if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
810 let model = llm_planner.model().clone();
812 let stream = model.generate_stream(llm_request).await?;
813 Ok(stream)
814 } else {
815 let response = self.handle_message_internal(input, _state).await?;
817 Ok(Box::pin(futures::stream::once(async move {
818 Ok(StreamChunk::Done { message: response })
819 })))
820 }
821 }
822
823 async fn current_interrupt(&self) -> anyhow::Result<Option<AgentInterrupt>> {
824 let state_guard = self
825 .state
826 .read()
827 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on state"))?;
828 Ok(state_guard.pending_interrupts.first().cloned())
829 }
830
831 async fn resume_with_approval(
832 &self,
833 action: agents_core::hitl::HitlAction,
834 ) -> anyhow::Result<AgentMessage> {
835 self.resume_with_approval(action).await
836 }
837}
838
839pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
844 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
845 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
846
847 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
848 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
849
850 let mut registrations: Vec<SubAgentRegistration> = Vec::new();
852
853 for subagent_config in &config.subagent_configs {
855 let sub_planner = if let Some(ref model) = subagent_config.model {
857 Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
859 } else {
860 config.planner.clone()
862 };
863
864 let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
866
867 if let Some(ref tools) = subagent_config.tools {
869 for tool in tools {
870 sub_cfg = sub_cfg.with_tool(tool.clone());
871 }
872 }
873
874 if let Some(ref builtin) = subagent_config.builtin_tools {
876 sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
877 }
878
879 sub_cfg = sub_cfg.with_auto_general_purpose(false);
881
882 sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
884
885 sub_cfg = sub_cfg.with_pii_sanitization(config.enable_pii_sanitization);
887
888 let sub_agent = create_deep_agent_from_config(sub_cfg);
890
891 registrations.push(SubAgentRegistration {
893 descriptor: SubAgentDescriptor {
894 name: subagent_config.name.clone(),
895 description: subagent_config.description.clone(),
896 },
897 agent: Arc::new(sub_agent),
898 });
899 }
900
901 if config.auto_general_purpose {
903 let has_gp = registrations
904 .iter()
905 .any(|r| r.descriptor.name == "general-purpose");
906 if !has_gp {
907 let mut sub_cfg =
909 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
910 .with_auto_general_purpose(false)
911 .with_prompt_caching(config.enable_prompt_caching)
912 .with_pii_sanitization(config.enable_pii_sanitization);
913 if let Some(ref selected) = config.builtin_tools {
914 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
915 }
916 if let Some(ref sum) = config.summarization {
917 sub_cfg = sub_cfg.with_summarization(sum.clone());
918 }
919 for t in &config.tools {
920 sub_cfg = sub_cfg.with_tool(t.clone());
921 }
922
923 let gp = create_deep_agent_from_config(sub_cfg);
924 registrations.push(SubAgentRegistration {
925 descriptor: SubAgentDescriptor {
926 name: "general-purpose".into(),
927 description: "Default reasoning agent".into(),
928 },
929 agent: Arc::new(gp),
930 });
931 }
932 }
933
934 let subagent = Arc::new(SubAgentMiddleware::new_with_events(
935 registrations,
936 config.event_dispatcher.clone(),
937 ));
938 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
939 let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
940 let summarization = config.summarization.as_ref().map(|cfg| {
941 Arc::new(SummarizationMiddleware::new(
942 cfg.messages_to_keep,
943 cfg.summary_note.clone(),
944 ))
945 });
946 let hitl = if config.tool_interrupts.is_empty() {
947 None
948 } else {
949 if config.checkpointer.is_none() {
951 tracing::error!(
952 "β οΈ HITL middleware requires a checkpointer to persist interrupt state. \
953 HITL will be disabled. Please configure a checkpointer to enable HITL."
954 );
955 None
956 } else {
957 tracing::info!("π HITL enabled for {} tools", config.tool_interrupts.len());
958 Some(Arc::new(HumanInLoopMiddleware::new(
959 config.tool_interrupts.clone(),
960 )))
961 }
962 };
963
964 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
967 base_prompt,
968 deep_agent_prompt,
969 planning,
970 filesystem,
971 subagent,
972 ];
973 if let Some(ref summary) = summarization {
974 middlewares.push(summary.clone());
975 }
976 if config.enable_prompt_caching {
977 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
978 }
979 if let Some(ref hitl_mw) = hitl {
980 middlewares.push(hitl_mw.clone());
981 }
982
983 DeepAgent {
984 descriptor: AgentDescriptor {
985 name: "deep-agent".into(),
986 version: "0.0.1".into(),
987 description: Some("Rust deep agent".into()),
988 },
989 instructions: config.instructions,
990 planner: config.planner,
991 middlewares,
992 base_tools: config.tools,
993 state,
994 history,
995 _summarization: summarization,
996 _hitl: hitl,
997 builtin_tools: config.builtin_tools,
998 checkpointer: config.checkpointer,
999 event_dispatcher: config.event_dispatcher,
1000 enable_pii_sanitization: config.enable_pii_sanitization,
1001 }
1002}