1use super::config::DeepAgentConfig;
7use crate::middleware::{
8 AgentMiddleware, AnthropicPromptCachingMiddleware, BaseSystemPromptMiddleware,
9 FilesystemMiddleware, HumanInLoopMiddleware, MiddlewareContext, ModelRequest,
10 PlanningMiddleware, SubAgentDescriptor, SubAgentMiddleware, SubAgentRegistration,
11 SummarizationMiddleware,
12};
13use agents_core::agent::{
14 AgentDescriptor, AgentHandle, PlannerAction, PlannerContext, PlannerHandle, ToolHandle,
15 ToolResponse,
16};
17use agents_core::hitl::{AgentInterrupt, HitlAction, HitlInterrupt};
18use agents_core::messaging::{
19 AgentMessage, MessageContent, MessageMetadata, MessageRole, ToolInvocation,
20};
21use agents_core::persistence::{Checkpointer, ThreadId};
22use agents_core::state::AgentStateSnapshot;
23use async_trait::async_trait;
24use serde_json::Value;
25use std::collections::{HashMap, HashSet};
26use std::sync::{Arc, RwLock};
27
28const BUILTIN_TOOL_NAMES: &[&str] = &["write_todos", "ls", "read_file", "write_file", "edit_file"];
30
31pub struct DeepAgent {
38 descriptor: AgentDescriptor,
39 instructions: String,
40 planner: Arc<dyn PlannerHandle>,
41 middlewares: Vec<Arc<dyn AgentMiddleware>>,
42 base_tools: Vec<Arc<dyn ToolHandle>>,
43 state: Arc<RwLock<AgentStateSnapshot>>,
44 history: Arc<RwLock<Vec<AgentMessage>>>,
45 _summarization: Option<Arc<SummarizationMiddleware>>,
46 hitl: Option<Arc<HumanInLoopMiddleware>>,
47 pending_hitl: Arc<RwLock<Option<HitlPending>>>,
48 builtin_tools: Option<HashSet<String>>,
49 checkpointer: Option<Arc<dyn Checkpointer>>,
50}
51
52struct HitlPending {
53 tool_name: String,
54 payload: Value,
55 tool: Arc<dyn ToolHandle>,
56 message: AgentMessage,
57}
58
59impl DeepAgent {
60 fn collect_tools(&self) -> HashMap<String, Arc<dyn ToolHandle>> {
61 let mut tools: HashMap<String, Arc<dyn ToolHandle>> = HashMap::new();
62 for tool in &self.base_tools {
63 tools.insert(tool.name().to_string(), tool.clone());
64 }
65 for middleware in &self.middlewares {
66 for tool in middleware.tools() {
67 if self.should_include(tool.name()) {
68 tools.insert(tool.name().to_string(), tool);
69 }
70 }
71 }
72 tools
73 }
74 fn should_include(&self, name: &str) -> bool {
77 let is_builtin = BUILTIN_TOOL_NAMES.contains(&name);
78 if !is_builtin {
79 return true;
80 }
81 match &self.builtin_tools {
82 None => true,
83 Some(selected) => selected.contains(name),
84 }
85 }
86
87 fn append_history(&self, message: AgentMessage) {
88 if let Ok(mut history) = self.history.write() {
89 history.push(message);
90 }
91 }
92
93 fn current_history(&self) -> Vec<AgentMessage> {
94 self.history.read().map(|h| h.clone()).unwrap_or_default()
95 }
96
97 pub async fn save_state(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
99 if let Some(ref checkpointer) = self.checkpointer {
100 let state = self
101 .state
102 .read()
103 .map_err(|_| anyhow::anyhow!("Failed to read agent state"))?
104 .clone();
105 checkpointer.save_state(thread_id, &state).await
106 } else {
107 tracing::warn!("Attempted to save state but no checkpointer is configured");
108 Ok(())
109 }
110 }
111
112 pub async fn load_state(&self, thread_id: &ThreadId) -> anyhow::Result<bool> {
114 if let Some(ref checkpointer) = self.checkpointer {
115 if let Some(saved_state) = checkpointer.load_state(thread_id).await? {
116 *self
117 .state
118 .write()
119 .map_err(|_| anyhow::anyhow!("Failed to write agent state"))? = saved_state;
120 tracing::info!(thread_id = %thread_id, "Loaded agent state from checkpointer");
121 Ok(true)
122 } else {
123 tracing::debug!(thread_id = %thread_id, "No saved state found for thread");
124 Ok(false)
125 }
126 } else {
127 tracing::warn!("Attempted to load state but no checkpointer is configured");
128 Ok(false)
129 }
130 }
131
132 pub async fn delete_thread(&self, thread_id: &ThreadId) -> anyhow::Result<()> {
134 if let Some(ref checkpointer) = self.checkpointer {
135 checkpointer.delete_thread(thread_id).await
136 } else {
137 tracing::warn!("Attempted to delete thread state but no checkpointer is configured");
138 Ok(())
139 }
140 }
141
142 pub async fn list_threads(&self) -> anyhow::Result<Vec<ThreadId>> {
144 if let Some(ref checkpointer) = self.checkpointer {
145 checkpointer.list_threads().await
146 } else {
147 Ok(Vec::new())
148 }
149 }
150
151 async fn execute_tool(
152 &self,
153 tool: Arc<dyn ToolHandle>,
154 tool_name: String,
155 payload: Value,
156 ) -> anyhow::Result<AgentMessage> {
157 let response = tool
158 .invoke(ToolInvocation {
159 tool_name: tool_name.clone(),
160 args: payload,
161 tool_call_id: None,
162 })
163 .await?;
164
165 Ok(self.apply_tool_response(response))
166 }
167
168 fn apply_tool_response(&self, response: ToolResponse) -> AgentMessage {
169 match response {
170 ToolResponse::Message(message) => {
171 self.append_history(message.clone());
172 message
173 }
174 ToolResponse::Command(command) => {
175 if let Ok(mut state) = self.state.write() {
176 command.clone().apply_to(&mut state);
177 }
178 let mut final_message = None;
179 for message in &command.messages {
180 self.append_history(message.clone());
181 final_message = Some(message.clone());
182 }
183 final_message.unwrap_or_else(|| AgentMessage {
184 role: MessageRole::Tool,
185 content: MessageContent::Text("Command executed.".into()),
186 metadata: Some(MessageMetadata {
187 tool_call_id: None,
188 cache_control: None,
189 }),
190 })
191 }
192 }
193 }
194
195 pub fn current_interrupt(&self) -> Option<AgentInterrupt> {
196 self.pending_hitl.read().ok().and_then(|guard| {
197 guard.as_ref().map(|pending| {
198 AgentInterrupt::HumanInLoop(HitlInterrupt {
199 tool_name: pending.tool_name.clone(),
200 message: pending.message.clone(),
201 })
202 })
203 })
204 }
205
206 pub async fn resume_hitl(&self, action: HitlAction) -> anyhow::Result<AgentMessage> {
207 let pending = self
208 .pending_hitl
209 .write()
210 .ok()
211 .and_then(|mut guard| guard.take())
212 .ok_or_else(|| anyhow::anyhow!("No pending HITL action"))?;
213 match action {
214 HitlAction::Approve => {
215 let result = self
216 .execute_tool(
217 pending.tool.clone(),
218 pending.tool_name.clone(),
219 pending.payload.clone(),
220 )
221 .await?;
222 Ok(result)
223 }
224 HitlAction::Reject { reason } => {
225 let text =
226 reason.unwrap_or_else(|| "Tool execution rejected by human reviewer.".into());
227 let message = AgentMessage {
228 role: MessageRole::System,
229 content: MessageContent::Text(text),
230 metadata: None,
231 };
232 self.append_history(message.clone());
233 Ok(message)
234 }
235 HitlAction::Respond { message } => {
236 self.append_history(message.clone());
237 Ok(message)
238 }
239 HitlAction::Edit { action, args } => {
240 let tools = self.collect_tools();
242 if let Some(tool) = tools.get(&action).cloned() {
243 let result = self.execute_tool(tool, action, args).await?;
244 Ok(result)
245 } else {
246 Ok(AgentMessage {
247 role: MessageRole::System,
248 content: MessageContent::Text(format!(
249 "Edited tool '{}' not available",
250 action
251 )),
252 metadata: None,
253 })
254 }
255 }
256 }
257 }
258
259 pub async fn handle_message(
261 &self,
262 input: impl AsRef<str>,
263 state: Arc<AgentStateSnapshot>,
264 ) -> anyhow::Result<AgentMessage> {
265 self.handle_message_with_metadata(input, None, state).await
266 }
267
268 pub async fn handle_message_with_metadata(
270 &self,
271 input: impl AsRef<str>,
272 metadata: Option<MessageMetadata>,
273 state: Arc<AgentStateSnapshot>,
274 ) -> anyhow::Result<AgentMessage> {
275 let agent_message = AgentMessage {
276 role: MessageRole::User,
277 content: MessageContent::Text(input.as_ref().to_string()),
278 metadata,
279 };
280 self.handle_message_internal(agent_message, state).await
281 }
282
283 async fn handle_message_internal(
285 &self,
286 input: AgentMessage,
287 _state: Arc<AgentStateSnapshot>,
288 ) -> anyhow::Result<AgentMessage> {
289 self.append_history(input.clone());
290
291 let mut request = ModelRequest::new(&self.instructions, self.current_history());
292 let tools = self.collect_tools();
293 for middleware in &self.middlewares {
294 let mut ctx = MiddlewareContext::with_request(&mut request, self.state.clone());
295 middleware.modify_model_request(&mut ctx).await?;
296 }
297
298 let context = PlannerContext {
299 history: request.messages.clone(),
300 system_prompt: request.system_prompt.clone(),
301 };
302 let state_snapshot = Arc::new(self.state.read().map(|s| s.clone()).unwrap_or_default());
303
304 let decision = self.planner.plan(context, state_snapshot).await?;
305
306 match decision.next_action {
307 PlannerAction::Respond { message } => {
308 self.append_history(message.clone());
309 Ok(message)
310 }
311 PlannerAction::CallTool { tool_name, payload } => {
312 if let Some(tool) = tools.get(&tool_name).cloned() {
313 if let Some(hitl) = &self.hitl {
314 if let Some(policy) = hitl.requires_approval(&tool_name) {
315 let message_text = policy
316 .note
317 .clone()
318 .unwrap_or_else(|| "Awaiting human approval.".into());
319 let approval_message = AgentMessage {
320 role: MessageRole::System,
321 content: MessageContent::Text(format!(
322 "HITL_REQUIRED: Tool '{tool}' requires approval: {message}",
323 tool = tool_name,
324 message = message_text
325 )),
326 metadata: None,
327 };
328 let pending = HitlPending {
329 tool_name: tool_name.clone(),
330 payload: payload.clone(),
331 tool: tool.clone(),
332 message: approval_message.clone(),
333 };
334 if let Ok(mut guard) = self.pending_hitl.write() {
335 *guard = Some(pending);
336 }
337 self.append_history(approval_message.clone());
338 return Ok(approval_message);
339 }
340 }
341 self.execute_tool(tool.clone(), tool_name.clone(), payload.clone())
342 .await
343 } else {
344 Ok(AgentMessage {
345 role: MessageRole::Tool,
346 content: MessageContent::Text(format!(
347 "Tool '{tool}' not available",
348 tool = tool_name
349 )),
350 metadata: Some(MessageMetadata {
351 tool_call_id: None,
352 cache_control: None,
353 }),
354 })
355 }
356 }
357 PlannerAction::Terminate => Ok(AgentMessage {
358 role: MessageRole::Agent,
359 content: MessageContent::Text("Terminating conversation.".into()),
360 metadata: Some(MessageMetadata {
361 tool_call_id: None,
362 cache_control: None,
363 }),
364 }),
365 }
366 }
367}
368
369#[async_trait]
370impl AgentHandle for DeepAgent {
371 async fn describe(&self) -> AgentDescriptor {
372 self.descriptor.clone()
373 }
374
375 async fn handle_message(
376 &self,
377 input: AgentMessage,
378 _state: Arc<AgentStateSnapshot>,
379 ) -> anyhow::Result<AgentMessage> {
380 self.handle_message_internal(input, _state).await
381 }
382}
383
384pub fn create_deep_agent_from_config(config: DeepAgentConfig) -> DeepAgent {
389 let state = Arc::new(RwLock::new(AgentStateSnapshot::default()));
390 let history = Arc::new(RwLock::new(Vec::<AgentMessage>::new()));
391
392 let planning = Arc::new(PlanningMiddleware::new(state.clone()));
393 let filesystem = Arc::new(FilesystemMiddleware::new(state.clone()));
394
395 let mut registrations = config.subagents.clone();
397 if config.auto_general_purpose {
398 let has_gp = registrations
399 .iter()
400 .any(|r| r.descriptor.name == "general-purpose");
401 if !has_gp {
402 let mut sub_cfg =
404 DeepAgentConfig::new(config.instructions.clone(), config.planner.clone())
405 .with_auto_general_purpose(false)
406 .with_prompt_caching(config.enable_prompt_caching);
407 if let Some(ref selected) = config.builtin_tools {
408 sub_cfg = sub_cfg.with_builtin_tools(selected.iter().cloned());
409 }
410 if let Some(ref sum) = config.summarization {
411 sub_cfg = sub_cfg.with_summarization(sum.clone());
412 }
413 for t in &config.tools {
414 sub_cfg = sub_cfg.with_tool(t.clone());
415 }
416
417 let gp = create_deep_agent_from_config(sub_cfg);
418 registrations.push(SubAgentRegistration {
419 descriptor: SubAgentDescriptor {
420 name: "general-purpose".into(),
421 description: "Default reasoning agent".into(),
422 },
423 agent: Arc::new(gp),
424 });
425 }
426 }
427
428 let subagent = Arc::new(SubAgentMiddleware::new(registrations));
429 let base_prompt = Arc::new(BaseSystemPromptMiddleware);
430 let summarization = config.summarization.as_ref().map(|cfg| {
431 Arc::new(SummarizationMiddleware::new(
432 cfg.messages_to_keep,
433 cfg.summary_note.clone(),
434 ))
435 });
436 let hitl = if config.tool_interrupts.is_empty() {
437 None
438 } else {
439 Some(Arc::new(HumanInLoopMiddleware::new(
440 config.tool_interrupts.clone(),
441 )))
442 };
443
444 let mut middlewares: Vec<Arc<dyn AgentMiddleware>> =
446 vec![base_prompt, planning, filesystem, subagent];
447 if let Some(ref summary) = summarization {
448 middlewares.push(summary.clone());
449 }
450 if config.enable_prompt_caching {
451 middlewares.push(Arc::new(AnthropicPromptCachingMiddleware::with_defaults()));
452 }
453 if let Some(ref hitl_mw) = hitl {
454 middlewares.push(hitl_mw.clone());
455 }
456
457 DeepAgent {
458 descriptor: AgentDescriptor {
459 name: "deep-agent".into(),
460 version: "0.0.1".into(),
461 description: Some("Rust deep agent".into()),
462 },
463 instructions: config.instructions,
464 planner: config.planner,
465 middlewares,
466 base_tools: config.tools,
467 state,
468 history,
469 _summarization: summarization,
470 hitl,
471 pending_hitl: Arc::new(RwLock::new(None)),
472 builtin_tools: config.builtin_tools,
473 checkpointer: config.checkpointer,
474 }
475}