1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext, ModelRequest,
10 PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use crate::planner::LlmBackedPlanner;
14use agents_core::agent::{
15 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction, HitlInterrupt};
18use agents_core::messaging::{AgentMessage, MessageContent, MessageMetadata, MessageRole};
19use agents_core::persistence::{Checkpointer, ThreadId};
20use agents_core::state::AgentStateSnapshot;
21use agents_core::tools::{ToolBox, ToolContext, ToolResult};
22use async_trait::async_trait;
23use serde_json::Value;
24use std::collections::{HashMap, HashSet};
25use std::sync::{Arc, RwLock};
26
/// Tool names that `DeepAgent::should_include` treats as built-ins.
///
/// Only these names are subject to the optional `builtin_tools` selection;
/// any other tool name is always included.
const BUILTIN_TOOL_NAMES: &[&str] = &[
    "write_todos",
    "ls",
    "read_file",
    "write_file",
    "edit_file",
];
29
30pub struct DeepAgent {
37 descriptor: AgentDescriptor,
38 instructions: String,
39 planner: Arc<dyn PlannerHandle>,
40 middlewares: Vec<Arc<dyn AgentMiddleware>>,
41 base_tools: Vec<ToolBox>,
42 state: Arc<RwLock<AgentStateSnapshot>>,
43 history: Arc<RwLock<Vec<AgentMessage>>>,
44 _summarization: Option<Arc<SummarizationMiddleware>>,
45 hitl: Option<Arc<HumanInLoopMiddleware>>,
46 pending_hitl: Arc<RwLock<Option<HitlPending>>>,
47 builtin_tools: Option<HashSet<String>>,
48 checkpointer: Option<Arc<dyn Checkpointer>>,
49}
50
51struct HitlPending {
52 tool_name: String,
53 payload: Value,
54 tool: ToolBox,
55 message: AgentMessage,
56}
57
58impl DeepAgent {
59 fn collect_tools(&self) -> HashMap<String, ToolBox> {
60 let mut tools: HashMap<String, ToolBox> = HashMap::new();
61 for tool in &self.base_tools {
62 tools.insert(tool.schema().name.clone(), tool.clone());
63 }
64 for middleware in &self.middlewares {
65 for tool in middleware.tools() {
66 let tool_name = tool.schema().name.clone();
67 if self.should_include(&tool_name) {
68 tools.insert(tool_name, tool);
69 }
70 }
71 }
72 tools
73 }
74 fn should_include(&self, name: &str) -> bool {
77 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
78 if !is_builtin {
79 return true;
80 }
81 match &self.builtin_tools {
82 None => true,
83 Some(selected) => selected.contains(name),
84 }
85 }
86
87 fn append_history(&self, message: AgentMessage) {
88 if let Ok(mut history) = self.history.write() {
89 history.push(message);
90 }
91 }
92
93 fn current_history(&self) -> Vec<AgentMessage> {
94 self.history.read().map(|h| h.clone()).unwrap_or_default()
95 }
96
97 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
99 if let Some(ref checkpointer) = self.checkpointer {
100 let state = self
101 .state
102 .read()
103 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
104 .clone();
105 checkpointer.save_state(thread_id, &state).await
106 } else {
107 tracing::warn!("Attempted to save state but no checkpointer is configured");
108 Ok(())
109 }
110 }
111
112 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
114 if let Some(ref checkpointer) = self.checkpointer {
115 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
116 *self
117 .state
118 .write()
119 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
120 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
121 Ok(true)
122 } else {
123 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
124 Ok(false)
125 }
126 } else {
127 tracing::warn!("Attempted to load state but no checkpointer is configured");
128 Ok(false)
129 }
130 }
131
132 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
134 if let Some(ref checkpointer) = self.checkpointer {
135 checkpointer.delete_thread(thread_id).await
136 } else {
137 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
138 Ok(())
139 }
140 }
141
142 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
144 if let Some(ref checkpointer) = self.checkpointer {
145 checkpointer.list_threads().await
146 } else {
147 Ok(Vec::new())
148 }
149 }
150
151 async fn execute_tool(
152 &self,
153 tool: ToolBox,
154 _tool_name: String,
155 payload: Value,
156 ) -> anyhow::Result<AgentMessage> {
157 let state_snapshot = self.state.read().unwrap().clone();
158 let ctx = ToolContext::with_mutable_state(Arc::new(state_snapshot), self.state.clone());
159
160 let result = tool.execute(payload, ctx).await?;
161 Ok(self.apply_tool_result(result))
162 }
163
164 fn apply_tool_result(&self, result: ToolResult) -> AgentMessage {
165 match result {
166 ToolResult::Message(message) => {
167 self.append_history(message.clone());
168 message
169 }
170 ToolResult::WithStateUpdate {
171 message,
172 state_diff,
173 } => {
174 if let Ok(mut state) = self.state.write() {
175 let command = agents_core::command::Command::with_state(state_diff);
176 command.apply_to(&mut state);
177 }
178 self.append_history(message.clone());
179 message
180 }
181 }
182 }
183
184 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
185 self.pending_hitl.read().ok().and_then(|guard| {
186 guard.as_ref().map(|pending| {
187 AgentInterrupt::HumanInLoop(HitlInterrupt {
188 tool_name: pending.tool_name.clone(),
189 message: pending.message.clone(),
190 })
191 })
192 })
193 }
194
195 pub async fn resume_hitl(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
196 let pending = self
197 .pending_hitl
198 .write()
199 .ok()
200 .and_then(|mut guard| guard.take())
201 .ok_or_else(|| anyhow::anyhow!("No pending HITL action"))?;
202 match action {
203 HitlAction::Approve => {
204 let result = self
205 .execute_tool(
206 pending.tool.clone(),
207 pending.tool_name.clone(),
208 pending.payload.clone(),
209 )
210 .await?;
211 Ok(result)
212 }
213 HitlAction::Reject { reason } => {
214 let text =
215 reason.unwrap_or_else(|| "Tool execution rejected by human reviewer.".into());
216 let message = AgentMessage {
217 role: MessageRole::System,
218 content: MessageContent::Text(text),
219 metadata: None,
220 };
221 self.append_history(message.clone());
222 Ok(message)
223 }
224 HitlAction::Respond { message } => {
225 self.append_history(message.clone());
226 Ok(message)
227 }
228 HitlAction::Edit { action, args } => {
229 let tools = self.collect_tools();
231 if let Some(tool) = tools.get(&action).cloned() {
232 let result = self.execute_tool(tool, action, args).await?;
233 Ok(result)
234 } else {
235 Ok(AgentMessage {
236 role: MessageRole::System,
237 content: MessageContent::Text(format!(
238 "Edited tool '{}' not available",
239 action
240 )),
241 metadata: None,
242 })
243 }
244 }
245 }
246 }
247
248 pub async fn handle_message(
250 &self,
251 input: impl AsRef<str>,
252 state: Arc<AgentStateSnapshot>,
253 ) -> anyhow::Result<AgentMessage> {
254 self.handle_message_with_metadata(input, None, state).await
255 }
256
257 pub async fn handle_message_with_metadata(
259 &self,
260 input: impl AsRef<str>,
261 metadata: Option<MessageMetadata>,
262 state: Arc<AgentStateSnapshot>,
263 ) -> anyhow::Result<AgentMessage> {
264 let agent_message = AgentMessage {
265 role: MessageRole::User,
266 content: MessageContent::Text(input.as_ref().to_string()),
267 metadata,
268 };
269 self.handle_message_internal(agent_message, state).await
270 }
271
272 async fn handle_message_internal(
274 &self,
275 input: AgentMessage,
276 _state: Arc<AgentStateSnapshot>,
277 ) -> anyhow::Result<AgentMessage> {
278 self.append_history(input.clone());
279
280 let mut request = ModelRequest::new(&self.instructions, self.current_history());
281 let tools = self.collect_tools();
282 for middleware in &self.middlewares {
283 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
284 middleware.modify_model_request(&mut ctx).await?;
285 }
286
287 let context = PlannerContext {
288 history: request.messages.clone(),
289 system_prompt: request.system_prompt.clone(),
290 };
291 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
292
293 let decision = self.planner.plan(context, state_snapshot).await?;
294
295 match decision.next_action {
296 PlannerAction::Respond { message } => {
297 self.append_history(message.clone());
298 Ok(message)
299 }
300 PlannerAction::CallTool { tool_name, payload } => {
301 if let Some(tool) = tools.get(&tool_name).cloned() {
302 if let Some(hitl) = &self.hitl {
303 if let Some(policy) = hitl.requires_approval(&tool_name) {
304 let message_text = policy
305 .note
306 .clone()
307 .unwrap_or_else(|| "Awaiting human approval.".into());
308 let approval_message = AgentMessage {
309 role: MessageRole::System,
310 content: MessageContent::Text(format!(
311 "HITL_REQUIRED: Tool '{tool}' requires approval: {message}",
312 tool = tool_name,
313 message = message_text
314 )),
315 metadata: None,
316 };
317 let pending = HitlPending {
318 tool_name: tool_name.clone(),
319 payload: payload.clone(),
320 tool: tool.clone(),
321 message: approval_message.clone(),
322 };
323 if let Ok(mut guard) = self.pending_hitl.write() {
324 *guard = Some(pending);
325 }
326 self.append_history(approval_message.clone());
327 return Ok(approval_message);
328 }
329 }
330 self.execute_tool(tool.clone(), tool_name.clone(), payload.clone())
331 .await
332 } else {
333 Ok(AgentMessage {
334 role: MessageRole::Tool,
335 content: MessageContent::Text(format!(
336 "Tool '{tool}' not available",
337 tool = tool_name
338 )),
339 metadata: Some(MessageMetadata {
340 tool_call_id: None,
341 cache_control: None,
342 }),
343 })
344 }
345 }
346 PlannerAction::Terminate => Ok(AgentMessage {
347 role: MessageRole::Agent,
348 content: MessageContent::Text("Terminating conversation.".into()),
349 metadata: Some(MessageMetadata {
350 tool_call_id: None,
351 cache_control: None,
352 }),
353 }),
354 }
355 }
356}
357
358#[async_trait]
359impl AgentHandle for DeepAgent {
360 async fn describe(&self) -> AgentDescriptor {
361 self.descriptor.clone()
362 }
363
364 async fn handle_message(
365 &self,
366 input: AgentMessage,
367 _state: Arc<AgentStateSnapshot>,
368 ) -> anyhow::Result<AgentMessage> {
369 self.handle_message_internal(input, _state).await
370 }
371
372 async fn handle_message_stream(
373 &self,
374 input: AgentMessage,
375 _state: Arc<AgentStateSnapshot>,
376 ) -> anyhow::Result<agents_core::agent::AgentStream> {
377 use crate::planner::LlmBackedPlanner;
378 use agents_core::llm::{LlmRequest, StreamChunk};
379
380 self.append_history(input.clone());
382
383 let mut request = ModelRequest::new(&self.instructions, self.current_history());
385 let tools = self.collect_tools();
386
387 for middleware in &self.middlewares {
389 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
390 middleware.modify_model_request(&mut ctx).await?;
391 }
392
393 let tool_schemas: Vec<_> = tools.values().map(|t| t.schema()).collect();
395 let llm_request = LlmRequest {
396 system_prompt: request.system_prompt.clone(),
397 messages: request.messages.clone(),
398 tools: tool_schemas,
399 };
400
401 let planner_any = self.planner.as_any();
403
404 if let Some(llm_planner) = planner_any.downcast_ref::<LlmBackedPlanner>() {
405 let model = llm_planner.model().clone();
407 let stream = model.generate_stream(llm_request).await?;
408 Ok(stream)
409 } else {
410 let response = self.handle_message_internal(input, _state).await?;
412 Ok(Box::pin(futures::stream::once(async move {
413 Ok(StreamChunk::Done { message: response })
414 })))
415 }
416 }
417}
418
419pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
424 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
425 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
426
427 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
428 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
429
430 let mut registrations: Vec<SubAgentRegistration> = Vec::new();
432
433 for subagent_config in &config.subagent_configs {
435 let sub_planner = if let Some(ref model) = subagent_config.model {
437 Arc::new(LlmBackedPlanner::new(model.clone())) as Arc<dyn PlannerHandle>
439 } else {
440 config.planner.clone()
442 };
443
444 let mut sub_cfg = DeepAgentConfig::new(subagent_config.instructions.clone(), sub_planner);
446
447 if let Some(ref tools) = subagent_config.tools {
449 for tool in tools {
450 sub_cfg = sub_cfg.with_tool(tool.clone());
451 }
452 }
453
454 if let Some(ref builtin) = subagent_config.builtin_tools {
456 sub_cfg = sub_cfg.with_builtin_tools(builtin.iter().cloned());
457 }
458
459 sub_cfg = sub_cfg.with_auto_general_purpose(false);
461
462 sub_cfg = sub_cfg.with_prompt_caching(subagent_config.enable_prompt_caching);
464
465 let sub_agent = create_deep_agent_from_config(sub_cfg);
467
468 registrations.push(SubAgentRegistration {
470 descriptor: SubAgentDescriptor {
471 name: subagent_config.name.clone(),
472 description: subagent_config.description.clone(),
473 },
474 agent: Arc::new(sub_agent),
475 });
476 }
477
478 if config.auto_general_purpose {
480 let has_gp = registrations
481 .iter()
482 .any(|r| r.descriptor.name == "general-purpose");
483 if !has_gp {
484 let mut sub_cfg =
486 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
487 .with_auto_general_purpose(false)
488 .with_prompt_caching(config.enable_prompt_caching);
489 if let Some(ref selected) = config.builtin_tools {
490 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
491 }
492 if let Some(ref sum) = config.summarization {
493 sub_cfg = sub_cfg.with_summarization(sum.clone());
494 }
495 for t in &config.tools {
496 sub_cfg = sub_cfg.with_tool(t.clone());
497 }
498
499 let gp = create_deep_agent_from_config(sub_cfg);
500 registrations.push(SubAgentRegistration {
501 descriptor: SubAgentDescriptor {
502 name: "general-purpose".into(),
503 description: "Default reasoning agent".into(),
504 },
505 agent: Arc::new(gp),
506 });
507 }
508 }
509
510 let subagent = Arc::new(SubAgentMiddleware::new(registrations));
511 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
512 let summarization = config.summarization.as_ref().map(|cfg| {
513 Arc::new(SummarizationMiddleware::new(
514 cfg.messages_to_keep,
515 cfg.summary_note.clone(),
516 ))
517 });
518 let hitl = if config.tool_interrupts.is_empty() {
519 None
520 } else {
521 Some(Arc::new(HumanInLoopMiddleware::new(
522 config.tool_interrupts.clone(),
523 )))
524 };
525
526 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> =
528 vec![base_prompt, planning, filesystem, subagent];
529 if let Some(ref summary) = summarization {
530 middlewares.push(summary.clone());
531 }
532 if config.enable_prompt_caching {
533 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
534 }
535 if let Some(ref hitl_mw) = hitl {
536 middlewares.push(hitl_mw.clone());
537 }
538
539 DeepAgent {
540 descriptor: AgentDescriptor {
541 name: "deep-agent".into(),
542 version: "0.0.1".into(),
543 description: Some("Rust deep agent".into()),
544 },
545 instructions: config.instructions,
546 planner: config.planner,
547 middlewares,
548 base_tools: config.tools,
549 state,
550 history,
551 _summarization: summarization,
552 hitl,
553 pending_hitl: Arc::new(RwLock::new(None)),
554 builtin_tools: config.builtin_tools,
555 checkpointer: config.checkpointer,
556 }
557}