1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 DeepAgentPromptMiddleware, FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext,
10 ModelRequest, PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction, HitlInterrupt};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
27const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
29
30pub struct DeepAgent {
37 descriptor: AgentDescriptor,
38 instructions: String,
39 planner: Arc<dyn PlannerHandle>,
40 middlewares: Vec<Arc<dyn AgentMiddleware>>,
41 base_tools: Vec<ToolBox>,
42 state: Arc<RwLock<AgentStateSnapshot>>,
43 history: Arc<RwLock<Vec<AgentMessage>>>,
44 _summarization: Option<Arc<SummarizationMiddleware>>,
45 hitl: Option<Arc<HumanInLoopMiddleware>>,
46 pending_hitl: Arc<RwLock<Option<HitlPending>>>,
47 builtin_tools: Option<HashSet<String>>,
48 checkpointer: Option<Arc<dyn Checkpointer>>,
49}
50
51struct HitlPending {
52 tool_name: String,
53 payload: Value,
54 tool: ToolBox,
55 message: AgentMessage,
56}
57
58impl DeepAgent {
59 fn collect_tools(&self) -> HashMap<String, ToolBox> {
60 let mut tools: HashMap<String, ToolBox> = HashMap::new();
61 for tool in &self.base_tools {
62 tools.insert(tool.schema().name.clone(), tool.clone());
63 }
64 for middleware in &self.middlewares {
65 for tool in middleware.tools() {
66 let tool_name = tool.schema().name.clone();
67 if self.should_include(&tool_name) {
68 tools.insert(tool_name, tool);
69 }
70 }
71 }
72 tools
73 }
74 fn should_include(&self, name: &str) -> bool {
77 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
78 if !is_builtin {
79 return true;
80 }
81 match &self.builtin_tools {
82 None => true,
83 Some(selected) => selected.contains(name),
84 }
85 }
86
87 fn append_history(&self, message: AgentMessage) {
88 if let Ok(mut history) = self.history.write() {
89 history.push(message);
90 }
91 }
92
93 fn current_history(&self) -> Vec<AgentMessage> {
94 self.history.read().map(|h| h.clone()).unwrap_or_default()
95 }
96
97 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
99 if let Some(ref checkpointer) = self.checkpointer {
100 let state = self
101 .state
102 .read()
103 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
104 .clone();
105 checkpointer.save_state(thread_id, &state).await
106 } else {
107 tracing::warn!("Attempted to save state but no checkpointer is configured");
108 Ok(())
109 }
110 }
111
112 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
114 if let Some(ref checkpointer) = self.checkpointer {
115 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
116 *self
117 .state
118 .write()
119 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
120 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
121 Ok(true)
122 } else {
123 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
124 Ok(false)
125 }
126 } else {
127 tracing::warn!("Attempted to load state but no checkpointer is configured");
128 Ok(false)
129 }
130 }
131
132 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
134 if let Some(ref checkpointer) = self.checkpointer {
135 checkpointer.delete_thread(thread_id).await
136 } else {
137 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
138 Ok(())
139 }
140 }
141
142 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
144 if let Some(ref checkpointer) = self.checkpointer {
145 checkpointer.list_threads().await
146 } else {
147 Ok(Vec::new())
148 }
149 }
150
151 async fn execute_tool(
152 &self,
153 tool: ToolBox,
154 _tool_name: String,
155 payload: Value,
156 ) -> anyhow::Result<AgentMessage> {
157 let state_snapshot = self.state.read().unwrap().clone();
158 let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
159
160 let result = tool.execute(payload, ctx).await?;
161 Ok(self.apply_tool_result(result))
162 }
163
164 fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
165 match result {
166 ToolResult::Message(message) => {
167 message
170 }
171 ToolResult::WithStateUpdate {
172 message,
173 state_diff,
174 } => {
175 if let Ok(mut state) = self.state.write() {
176 let command = agents_core::command::Command::with_state(state_diff);
177 command.apply_to(&mut state);
178 }
179 message
182 }
183 }
184 }
185
186 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
187 self.pending_hitl.read().ok().and_then(|guard| {
188 guard.as_ref().map(|pending| {
189 AgentInterrupt::HumanInLoop(HitlInterrupt {
190 tool_name: pending.tool_name.clone(),
191 message: pending.message.clone(),
192 })
193 })
194 })
195 }
196
197 pub async fn resume_hitl(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
198 let pending = self
199 .pending_hitl
200 .write()
201 .ok()
202 .and_then(|mut guard| guard.take())
203 .ok_or_else(|| anyhow::anyhow!("No pending HITL action"))?;
204 match action {
205 HitlAction::Approve => {
206 let result = self
207 .execute_tool(
208 pending.tool.clone(),
209 pending.tool_name.clone(),
210 pending.payload.clone(),
211 )
212 .await?;
213 Ok(result)
214 }
215 HitlAction::Reject { reason } => {
216 let text =
217 reason.unwrap_or_else(|| "Tool execution rejected by human reviewer.".into());
218 let message = AgentMessage {
219 role: MessageRole::System,
220 content: MessageContent::Text(text),
221 metadata: None,
222 };
223 self.append_history(message.clone());
224 Ok(message)
225 }
226 HitlAction::Respond { message } => {
227 self.append_history(message.clone());
228 Ok(message)
229 }
230 HitlAction::Edit { action, args } => {
231 let tools = self.collect_tools();
233 if let Some(tool) = tools.get(&action).cloned() {
234 let result = self.execute_tool(tool, action, args).await?;
235 Ok(result)
236 } else {
237 Ok(AgentMessage {
238 role: MessageRole::System,
239 content: MessageContent::Text(format!(
240 "Edited tool '{}' not available",
241 action
242 )),
243 metadata: None,
244 })
245 }
246 }
247 }
248 }
249
250 pub async fn handle_message(
252 &self,
253 input: impl AsRef<str>,
254 state: Arc<AgentStateSnapshot>,
255 ) -> anyhow::Result<AgentMessage> {
256 self.handle_message_with_metadata(input, None, state).await
257 }
258
259 pub async fn handle_message_with_metadata(
261 &self,
262 input: impl AsRef<str>,
263 metadata: Option<MessageMetadata>,
264 state: Arc<AgentStateSnapshot>,
265 ) -> anyhow::Result<AgentMessage> {
266 let agent_message = AgentMessage {
267 role: MessageRole::User,
268 content: MessageContent::Text(input.as_ref().to_string()),
269 metadata,
270 };
271 self.handle_message_internal(agent_message, state).await
272 }
273
274 async fn handle_message_internal(
276 &self,
277 input: AgentMessage,
278 _state: Arc<AgentStateSnapshot>,
279 ) -> anyhow::Result<AgentMessage> {
280 self.append_history(input.clone());
281
282 let mut request = ModelRequest::new(&self.instructions, self.current_history());
284 let tools = self.collect_tools();
285 for middleware in &self.middlewares {
286 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
287 middleware.modify_model_request(&mut ctx).await?;
288 }
289
290 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
291 let context = PlannerContext {
292 history: request.messages.clone(),
293 system_prompt: request.system_prompt.clone(),
294 tools: tool_schemas,
295 };
296 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
297
298 let decision = self.planner.plan(context, state_snapshot).await?;
300
301 match decision.next_action {
302 PlannerAction::Respond { message } => {
303 self.append_history(message.clone());
304 Ok(message)
305 }
306 PlannerAction::CallTool { tool_name, payload } => {
307 if let Some(tool) = tools.get(&tool_name).cloned() {
308 if let Some(hitl) = &self.hitl {
310 if let Some(policy) = hitl.requires_approval(&tool_name) {
311 let message_text = policy
312 .note
313 .clone()
314 .unwrap_or_else(|| "Awaiting human approval.".into());
315 let approval_message = AgentMessage {
316 role: MessageRole::System,
317 content: MessageContent::Text(format!(
318 "HITL_REQUIRED: Tool '{tool}' requires approval: {message}",
319 tool = tool_name,
320 message = message_text
321 )),
322 metadata: None,
323 };
324 let pending = HitlPending {
325 tool_name: tool_name.clone(),
326 payload: payload.clone(),
327 tool: tool.clone(),
328 message: approval_message.clone(),
329 };
330 if let Ok(mut guard) = self.pending_hitl.write() {
331 *guard = Some(pending);
332 }
333 self.append_history(approval_message.clone());
334 return Ok(approval_message);
335 }
336 }
337
338 let start_time = std::time::Instant::now();
340 tracing::warn!(
341 "⚙️ EXECUTING TOOL: {} with payload: {}",
342 tool_name,
343 serde_json::to_string(&payload)
344 .unwrap_or_else(|_| "invalid json".to_string())
345 );
346
347 let result = self
348 .execute_tool(tool.clone(), tool_name.clone(), payload.clone())
349 .await;
350
351 let duration = start_time.elapsed();
352 match result {
353 Ok(tool_result_message) => {
354 let content_preview = match &tool_result_message.content {
355 MessageContent::Text(t) => {
356 if t.len() > 100 {
357 format!("{}... ({} chars)", &t[..100], t.len())
358 } else {
359 t.clone()
360 }
361 }
362 MessageContent::Json(v) => {
363 format!("JSON: {} bytes", v.to_string().len())
364 }
365 };
366 tracing::warn!(
367 "✅ TOOL COMPLETED: {} in {:?} - Result: {}",
368 tool_name,
369 duration,
370 content_preview
371 );
372
373 let natural_response = match &tool_result_message.content {
376 MessageContent::Text(text) => {
377 if text.is_empty() {
378 format!(
379 "I've executed the {} tool successfully.",
380 tool_name
381 )
382 } else {
383 text.clone()
385 }
386 }
387 MessageContent::Json(json) => {
388 format!("Tool result: {}", json)
389 }
390 };
391
392 let response = AgentMessage {
393 role: MessageRole::Agent,
394 content: MessageContent::Text(natural_response),
395 metadata: None,
396 };
397 self.append_history(response.clone());
398 Ok(response)
399 }
400 Err(e) => {
401 tracing::error!(
402 "❌ TOOL FAILED: {} in {:?} - Error: {}",
403 tool_name,
404 duration,
405 e
406 );
407
408 let error_response = AgentMessage {
410 role: MessageRole::Agent,
411 content: MessageContent::Text(format!(
412 "I encountered an error while executing {}: {}",
413 tool_name, e
414 )),
415 metadata: None,
416 };
417 self.append_history(error_response.clone());
418 Ok(error_response)
419 }
420 }
421 } else {
422 tracing::warn!("⚠️ Tool '{}' not found", tool_name);
424 let error_response = AgentMessage {
425 role: MessageRole::Agent,
426 content: MessageContent::Text(format!(
427 "I don't have access to the '{}' tool.",
428 tool_name
429 )),
430 metadata: None,
431 };
432 self.append_history(error_response.clone());
433 Ok(error_response)
434 }
435 }
436 PlannerAction::Terminate => {
437 tracing::debug!("🛑 Agent terminated");
438 let message = AgentMessage {
439 role: MessageRole::Agent,
440 content: MessageContent::Text("Task completed.".into()),
441 metadata: None,
442 };
443 self.append_history(message.clone());
444 Ok(message)
445 }
446 }
447 }
448}
449
450#[async_trait]
451impl AgentHandle for DeepAgent {
452 async fn describe(&self) -> AgentDescriptor {
453 self.descriptor.clone()
454 }
455
456 async fn handle_message(
457 &self,
458 input: AgentMessage,
459 _state: Arc<AgentStateSnapshot>,
460 ) -> anyhow::Result<AgentMessage> {
461 self.handle_message_internal(input, _state).await
462 }
463
464 async fn handle_message_stream(
465 &self,
466 input: AgentMessage,
467 _state: Arc<AgentStateSnapshot>,
468 ) -> anyhow::Result<agents_core::agent::AgentStream> {
469 use crate::planner::LlmBackedPlanner;
470 use agents_core::llm::{LlmRequest, StreamChunk};
471
472 self.append_history(input.clone());
474
475 let mut request = ModelRequest::new(&self.instructions, self.current_history());
477 let tools = self.collect_tools();
478
479 for middleware in &self.middlewares {
481 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
482 middleware.modify_model_request(&mut ctx).await?;
483 }
484
485 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
487 let llm_request = LlmRequest {
488 system_prompt: request.system_prompt.clone(),
489 messages: request.messages.clone(),
490 tools: tool_schemas,
491 };
492
493 let planner_any = self.planner.as_any();
495
496 if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
497 let model = llm_planner.model().clone();
499 let stream = model.generate_stream(llm_request).await?;
500 Ok(stream)
501 } else {
502 let response = self.handle_message_internal(input, _state).await?;
504 Ok(Box::pin(futures::stream::once(async move {
505 Ok(StreamChunk::Done { message: response })
506 })))
507 }
508 }
509}
510
511pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
516 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
517 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
518
519 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
520 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
521
522 let mut registrations: Vec<SubAgentRegistration> = Vec::new();
524
525 for subagent_config in &config.subagent_configs {
527 let sub_planner = if let Some(ref model) = subagent_config.model {
529 Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
531 } else {
532 config.planner.clone()
534 };
535
536 let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
538
539 if let Some(ref tools) = subagent_config.tools {
541 for tool in tools {
542 sub_cfg = sub_cfg.with_tool(tool.clone());
543 }
544 }
545
546 if let Some(ref builtin) = subagent_config.builtin_tools {
548 sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
549 }
550
551 sub_cfg = sub_cfg.with_auto_general_purpose(false);
553
554 sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
556
557 let sub_agent = create_deep_agent_from_config(sub_cfg);
559
560 registrations.push(SubAgentRegistration {
562 descriptor: SubAgentDescriptor {
563 name: subagent_config.name.clone(),
564 description: subagent_config.description.clone(),
565 },
566 agent: Arc::new(sub_agent),
567 });
568 }
569
570 if config.auto_general_purpose {
572 let has_gp = registrations
573 .iter()
574 .any(|r| r.descriptor.name == "general-purpose");
575 if !has_gp {
576 let mut sub_cfg =
578 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
579 .with_auto_general_purpose(false)
580 .with_prompt_caching(config.enable_prompt_caching);
581 if let Some(ref selected) = config.builtin_tools {
582 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
583 }
584 if let Some(ref sum) = config.summarization {
585 sub_cfg = sub_cfg.with_summarization(sum.clone());
586 }
587 for t in &config.tools {
588 sub_cfg = sub_cfg.with_tool(t.clone());
589 }
590
591 let gp = create_deep_agent_from_config(sub_cfg);
592 registrations.push(SubAgentRegistration {
593 descriptor: SubAgentDescriptor {
594 name: "general-purpose".into(),
595 description: "Default reasoning agent".into(),
596 },
597 agent: Arc::new(gp),
598 });
599 }
600 }
601
602 let subagent = Arc::new(SubAgentMiddleware::new(registrations));
603 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
604 let deep_agent_prompt = Arc::new(DeepAgentPromptMiddleware::new(config.instructions.clone()));
605 let summarization = config.summarization.as_ref().map(|cfg| {
606 Arc::new(SummarizationMiddleware::new(
607 cfg.messages_to_keep,
608 cfg.summary_note.clone(),
609 ))
610 });
611 let hitl = if config.tool_interrupts.is_empty() {
612 None
613 } else {
614 Some(Arc::new(HumanInLoopMiddleware::new(
615 config.tool_interrupts.clone(),
616 )))
617 };
618
619 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> = vec![
622 base_prompt,
623 deep_agent_prompt,
624 planning,
625 filesystem,
626 subagent,
627 ];
628 if let Some(ref summary) = summarization {
629 middlewares.push(summary.clone());
630 }
631 if config.enable_prompt_caching {
632 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
633 }
634 if let Some(ref hitl_mw) = hitl {
635 middlewares.push(hitl_mw.clone());
636 }
637
638 DeepAgent {
639 descriptor: AgentDescriptor {
640 name: "deep-agent".into(),
641 version: "0.0.1".into(),
642 description: Some("Rust deep agent".into()),
643 },
644 instructions: config.instructions,
645 planner: config.planner,
646 middlewares,
647 base_tools: config.tools,
648 state,
649 history,
650 _summarization: summarization,
651 hitl,
652 pending_hitl: Arc::new(RwLock::new(None)),
653 builtin_tools: config.builtin_tools,
654 checkpointer: config.checkpointer,
655 }
656}