1mod events;
2mod profile;
3mod subagent;
4
5pub use events::{AgentEvent, QuestionResponder, TodoItem, TodoStatus};
6pub use profile::AgentProfile;
7
8use events::PendingToolCall;
9
10use crate::command::CommandRegistry;
11use crate::config::Config;
12use crate::db::Db;
13use crate::extension::{Event, EventContext, HookRegistry, HookResult};
14use crate::memory::MemoryStore;
15use crate::provider::{ContentBlock, Message, Provider, Role, StreamEventType, Usage};
16use crate::tools::ToolRegistry;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::mpsc::UnboundedSender;
21use uuid::Uuid;
22
23const COMPACT_THRESHOLD: f32 = 0.8;
24const COMPACT_KEEP_MESSAGES: usize = 10;
25
26const MEMORY_INSTRUCTIONS: &str = "\n\n\
27# Memory
28
29You have persistent memory across conversations. **Core blocks** (above) are always visible — update them via `core_memory_update` for essential user/agent facts. **Archival memory** is searched per turn — use `memory_add`/`memory_search`/`memory_list`/`memory_delete` to manage it.
30
31When the user says \"remember\"/\"forget\"/\"what do you know about me\", use the appropriate memory tool. Memories are also auto-extracted in the background, so focus on explicit requests.";
32
33#[derive(Debug, Clone)]
35pub struct InterruptedToolCall {
36 pub name: String,
37 pub input: String,
38 pub output: Option<String>,
39 pub is_error: bool,
40}
41
42const TITLE_SYSTEM_PROMPT: &str = "\
43You are a title generator. You output ONLY a thread title. Nothing else.
44
45Generate a brief title that would help the user find this conversation later.
46
47Rules:
48- A single line, 50 characters or fewer
49- No explanations, no quotes, no punctuation wrapping
50- Use the same language as the user message
51- Title must be grammatically correct and read naturally
52- Never include tool names (e.g. read tool, bash tool, edit tool)
53- Focus on the main topic or question the user wants to retrieve
54- Vary your phrasing — avoid repetitive patterns like always starting with \"Analyzing\"
55- When a file is mentioned, focus on WHAT the user wants to do WITH the file
56- Keep exact: technical terms, numbers, filenames, HTTP codes
57- Remove filler words: the, this, my, a, an
58- If the user message is short or conversational (e.g. \"hello\", \"hey\"): \
59 create a title reflecting the user's tone (Greeting, Quick check-in, etc.)";
60
61pub struct Agent {
62 providers: Vec<Arc<dyn Provider>>,
63 active: usize,
64 tools: Arc<ToolRegistry>,
65 db: Db,
66 memory: Option<Arc<MemoryStore>>,
67 memory_auto_extract: bool,
68 memory_inject_count: usize,
69 conversation_id: String,
70 persisted: bool,
71 messages: Vec<Message>,
72 profiles: Vec<AgentProfile>,
73 active_profile: usize,
74 pub thinking_budget: u32,
75 cwd: String,
76 agents_context: crate::context::AgentsContext,
77 last_input_tokens: u32,
78 permissions: HashMap<String, String>,
79 snapshots: crate::snapshot::SnapshotManager,
80 hooks: HookRegistry,
81 commands: CommandRegistry,
82 subagent_enabled: bool,
83 subagent_max_turns: usize,
84 background_results: Arc<std::sync::Mutex<HashMap<String, String>>>,
85 background_handles: HashMap<String, tokio::task::JoinHandle<()>>,
86 background_tx: Option<UnboundedSender<AgentEvent>>,
87}
88
89impl Agent {
90 #[allow(clippy::too_many_arguments)]
91 pub fn new(
92 providers: Vec<Box<dyn Provider>>,
93 db: Db,
94 config: &Config,
95 memory: Option<Arc<MemoryStore>>,
96 tools: ToolRegistry,
97 profiles: Vec<AgentProfile>,
98 cwd: String,
99 agents_context: crate::context::AgentsContext,
100 hooks: HookRegistry,
101 commands: CommandRegistry,
102 ) -> Result<Self> {
103 assert!(!providers.is_empty(), "at least one provider required");
104 let providers: Vec<Arc<dyn Provider>> = providers.into_iter().map(Arc::from).collect();
105 let conversation_id = uuid::Uuid::new_v4().to_string();
106 tracing::debug!("Agent created with conversation {}", conversation_id);
107 let mut profiles = if profiles.is_empty() {
108 vec![AgentProfile::default_profile()]
109 } else {
110 profiles
111 };
112 if !profiles.iter().any(|p| p.name == "plan") {
113 let at = 1.min(profiles.len());
114 profiles.insert(at, AgentProfile::plan_profile());
115 }
116 Ok(Agent {
117 providers,
118 active: 0,
119 tools: Arc::new(tools),
120 db,
121 memory,
122 memory_auto_extract: config.memory.auto_extract,
123 memory_inject_count: config.memory.inject_count,
124 conversation_id,
125 persisted: false,
126 messages: Vec::new(),
127 profiles,
128 active_profile: 0,
129 thinking_budget: 0,
130 cwd,
131 agents_context,
132 last_input_tokens: 0,
133 permissions: config.permissions.clone(),
134 snapshots: crate::snapshot::SnapshotManager::new(),
135 hooks,
136 commands,
137 subagent_enabled: config.subagents.enabled,
138 subagent_max_turns: config.subagents.max_turns,
139 background_results: Arc::new(std::sync::Mutex::new(HashMap::new())),
140 background_handles: HashMap::new(),
141 background_tx: None,
142 })
143 }
144 fn ensure_persisted(&mut self) -> Result<()> {
145 if !self.persisted {
146 self.db.create_conversation_with_id(
147 &self.conversation_id,
148 self.provider().model(),
149 self.provider().name(),
150 &self.cwd,
151 )?;
152 self.persisted = true;
153 }
154 Ok(())
155 }
156 fn provider(&self) -> &dyn Provider {
157 &*self.providers[self.active]
158 }
159 fn provider_arc(&self) -> Arc<dyn Provider> {
160 Arc::clone(&self.providers[self.active])
161 }
162 pub fn set_background_tx(&mut self, tx: UnboundedSender<AgentEvent>) {
163 self.background_tx = Some(tx);
164 }
165 fn event_context(&self, event: &Event) -> EventContext {
166 EventContext {
167 event: event.as_str().to_string(),
168 model: self.provider().model().to_string(),
169 provider: self.provider().name().to_string(),
170 cwd: self.cwd.clone(),
171 session_id: self.conversation_id.clone(),
172 ..Default::default()
173 }
174 }
175 pub fn execute_command(&self, name: &str, args: &str) -> Result<String> {
176 self.commands.execute(name, args, &self.cwd)
177 }
178 pub fn list_commands(&self) -> Vec<(&str, &str)> {
179 self.commands.list()
180 }
181 pub fn has_command(&self, name: &str) -> bool {
182 self.commands.has(name)
183 }
184 pub fn hooks(&self) -> &HookRegistry {
185 &self.hooks
186 }
187 fn profile(&self) -> &AgentProfile {
188 &self.profiles[self.active_profile]
189 }
190 pub fn conversation_id(&self) -> &str {
191 &self.conversation_id
192 }
193 pub fn messages(&self) -> &[Message] {
194 &self.messages
195 }
196 pub fn set_model(&mut self, model: String) {
197 if let Some(p) = Arc::get_mut(&mut self.providers[self.active]) {
198 p.set_model(model);
199 } else {
200 tracing::warn!("cannot change model while background subagent is active");
201 }
202 }
203 pub fn set_active_provider(&mut self, provider_name: &str, model: &str) {
204 if let Some(idx) = self
205 .providers
206 .iter()
207 .position(|p| p.name() == provider_name)
208 {
209 self.active = idx;
210 if let Some(p) = Arc::get_mut(&mut self.providers[idx]) {
211 p.set_model(model.to_string());
212 } else {
213 tracing::warn!("cannot change model while background subagent is active");
214 }
215 }
216 }
217 pub fn set_thinking_budget(&mut self, budget: u32) {
218 self.thinking_budget = budget;
219 }
220 pub fn available_models(&self) -> Vec<String> {
221 self.provider().available_models()
222 }
223 pub fn cached_all_models(&self) -> Vec<(String, Vec<String>)> {
224 self.providers
225 .iter()
226 .map(|p| (p.name().to_string(), p.available_models()))
227 .collect()
228 }
229 pub async fn fetch_all_models(&mut self) -> Vec<(String, Vec<String>)> {
230 let mut result = Vec::new();
231 for p in &self.providers {
232 let models = match p.fetch_models().await {
233 Ok(m) => m,
234 Err(e) => {
235 tracing::warn!("Failed to fetch models for {}: {e}", p.name());
236 Vec::new()
237 }
238 };
239 result.push((p.name().to_string(), models));
240 }
241 result
242 }
243 pub fn current_model(&self) -> &str {
244 self.provider().model()
245 }
246 pub fn current_provider_name(&self) -> &str {
247 self.provider().name()
248 }
249 pub fn current_agent_name(&self) -> &str {
250 &self.profile().name
251 }
252 pub fn context_window(&self) -> u32 {
253 self.provider().context_window()
254 }
255 pub async fn fetch_context_window(&self) -> u32 {
256 match self.provider().fetch_context_window().await {
257 Ok(cw) => cw,
258 Err(e) => {
259 tracing::warn!("Failed to fetch context window: {e}");
260 0
261 }
262 }
263 }
264 pub fn agent_profiles(&self) -> &[AgentProfile] {
265 &self.profiles
266 }
267 pub fn switch_agent(&mut self, name: &str) -> bool {
268 if let Some(idx) = self.profiles.iter().position(|p| p.name == name) {
269 self.active_profile = idx;
270 let model_spec = self.profiles[idx].model_spec.clone();
271
272 if let Some(spec) = model_spec {
273 let (provider, model) = Config::parse_model_spec(&spec);
274 if let Some(prov) = provider {
275 self.set_active_provider(prov, model);
276 } else {
277 self.set_model(model.to_string());
278 }
279 }
280 tracing::info!("Switched to agent '{}'", name);
281 true
282 } else {
283 false
284 }
285 }
286 pub fn cleanup_if_empty(&mut self) {
287 if self.messages.is_empty() && self.persisted {
288 let _ = self.db.delete_conversation(&self.conversation_id);
289 }
290 }
291 pub fn new_conversation(&mut self) -> Result<()> {
292 self.cleanup_if_empty();
293 self.conversation_id = uuid::Uuid::new_v4().to_string();
294 self.persisted = false;
295 self.messages.clear();
296 Ok(())
297 }
298 pub fn resume_conversation(&mut self, conversation: &crate::db::Conversation) -> Result<()> {
299 self.conversation_id = conversation.id.clone();
300 self.persisted = true;
301 self.messages = conversation
302 .messages
303 .iter()
304 .map(|m| Message {
305 role: if m.role == "user" {
306 Role::User
307 } else {
308 Role::Assistant
309 },
310 content: vec![ContentBlock::Text(m.content.clone())],
311 })
312 .collect();
313 tracing::debug!("Resumed conversation {}", conversation.id);
314 {
315 let ctx = self.event_context(&Event::OnResume);
316 self.hooks.emit(&Event::OnResume, &ctx);
317 }
318 Ok(())
319 }
320
321 pub fn add_interrupted_message(
324 &mut self,
325 content: String,
326 tool_calls: Vec<InterruptedToolCall>,
327 thinking: Option<String>,
328 ) -> Result<()> {
329 let mut blocks: Vec<ContentBlock> = Vec::new();
330 if let Some(t) = thinking {
331 if !t.is_empty() {
332 blocks.push(ContentBlock::Thinking {
333 thinking: t,
334 signature: String::new(),
335 });
336 }
337 }
338 if !content.is_empty() {
339 blocks.push(ContentBlock::Text(content.clone()));
340 }
341 let mut tool_ids: Vec<String> = Vec::new();
342 for tc in &tool_calls {
343 let id = Uuid::new_v4().to_string();
344 tool_ids.push(id.clone());
345 let input_value: serde_json::Value =
346 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
347 blocks.push(ContentBlock::ToolUse {
348 id: id.clone(),
349 name: tc.name.clone(),
350 input: input_value,
351 });
352 }
353 for (tc, id) in tool_calls.iter().zip(tool_ids.iter()) {
354 blocks.push(ContentBlock::ToolResult {
355 tool_use_id: id.clone(),
356 content: tc.output.clone().unwrap_or_default(),
357 is_error: tc.is_error,
358 });
359 }
360 self.messages.push(Message {
361 role: Role::Assistant,
362 content: blocks,
363 });
364 let stored_text = if content.is_empty() {
365 String::from("[tool use]")
366 } else {
367 content
368 };
369 self.ensure_persisted()?;
370 let assistant_msg_id =
371 self.db
372 .add_message(&self.conversation_id, "assistant", &stored_text)?;
373 for (tc, id) in tool_calls.iter().zip(tool_ids.iter()) {
374 let _ = self
375 .db
376 .add_tool_call(&assistant_msg_id, id, &tc.name, &tc.input);
377 if let Some(ref output) = tc.output {
378 let _ = self.db.update_tool_result(id, output, tc.is_error);
379 }
380 }
381 Ok(())
382 }
383 pub fn list_sessions(&self) -> Result<Vec<crate::db::ConversationSummary>> {
384 self.db.list_conversations_for_cwd(&self.cwd, 50)
385 }
386 pub fn get_session(&self, id: &str) -> Result<crate::db::Conversation> {
387 self.db.get_conversation(id)
388 }
389 pub fn conversation_title(&self) -> Option<String> {
390 self.db
391 .get_conversation(&self.conversation_id)
392 .ok()
393 .and_then(|c| c.title)
394 }
395 pub fn rename_session(&self, title: &str) -> Result<()> {
396 self.db
397 .update_conversation_title(&self.conversation_id, title)
398 .context("failed to rename session")
399 }
400 pub fn cwd(&self) -> &str {
401 &self.cwd
402 }
403
404 pub fn truncate_messages(&mut self, count: usize) {
405 let target = count.min(self.messages.len());
406 self.messages.truncate(target);
407 }
408
409 pub fn revert_to_message(&mut self, keep: usize) -> Result<Vec<String>> {
410 let keep = keep.min(self.messages.len());
411 let checkpoint_idx = self.messages[..keep]
412 .iter()
413 .filter(|m| m.role == Role::Assistant)
414 .count();
415 self.messages.truncate(keep);
416 self.db
417 .truncate_messages(&self.conversation_id, keep)
418 .context("truncating db messages")?;
419 let restored = if checkpoint_idx > 0 {
420 let res = self.snapshots.restore_to_checkpoint(checkpoint_idx - 1)?;
421 self.snapshots.truncate_checkpoints(checkpoint_idx);
422 res
423 } else {
424 let res = self.snapshots.restore_all()?;
425 self.snapshots.truncate_checkpoints(0);
426 res
427 };
428 Ok(restored)
429 }
430
431 pub fn fork_conversation(&mut self, msg_count: usize) -> Result<()> {
432 let kept = self.messages[..msg_count.min(self.messages.len())].to_vec();
433 self.cleanup_if_empty();
434 let conversation_id = self.db.create_conversation(
435 self.provider().model(),
436 self.provider().name(),
437 &self.cwd,
438 )?;
439 self.conversation_id = conversation_id;
440 self.persisted = true;
441 self.messages = kept;
442 for msg in &self.messages {
443 let role = match msg.role {
444 Role::User => "user",
445 Role::Assistant => "assistant",
446 Role::System => "system",
447 };
448 let text: String = msg
449 .content
450 .iter()
451 .filter_map(|b| {
452 if let ContentBlock::Text(t) = b {
453 Some(t.as_str())
454 } else {
455 None
456 }
457 })
458 .collect::<Vec<_>>()
459 .join("\n");
460 if !text.is_empty() {
461 let _ = self.db.add_message(&self.conversation_id, role, &text);
462 }
463 }
464 Ok(())
465 }
466
467 fn title_model(&self) -> &str {
468 self.provider().model()
469 }
470
471 fn should_compact(&self) -> bool {
472 let limit = self.provider().context_window();
473 let threshold = (limit as f32 * COMPACT_THRESHOLD) as u32;
474 self.last_input_tokens >= threshold
475 }
476 fn emit_compact_hooks(&self, phase: &Event) {
477 let ctx = self.event_context(phase);
478 self.hooks.emit(phase, &ctx);
479 }
480 async fn compact(&mut self, event_tx: &UnboundedSender<AgentEvent>) -> Result<()> {
481 let keep = COMPACT_KEEP_MESSAGES;
482 if self.messages.len() <= keep + 2 {
483 return Ok(());
484 }
485 let cutoff = self.messages.len() - keep;
486 let old_messages = self.messages[..cutoff].to_vec();
487 let kept = self.messages[cutoff..].to_vec();
488
489 let mut summary_text = String::new();
490 for msg in &old_messages {
491 let role = match msg.role {
492 Role::User => "User",
493 Role::Assistant => "Assistant",
494 Role::System => "System",
495 };
496 for block in &msg.content {
497 if let ContentBlock::Text(t) = block {
498 summary_text.push_str(&format!("{}:\n{}\n\n", role, t));
499 }
500 }
501 }
502 let summary_request = vec![Message {
503 role: Role::User,
504 content: vec![ContentBlock::Text(format!(
505 "Summarize the following conversation history concisely, preserving all key decisions, facts, code changes, and context that would be needed to continue the work:\n\n{}",
506 summary_text
507 ))],
508 }];
509
510 self.emit_compact_hooks(&Event::BeforeCompact);
511 let mut stream_rx = self
512 .provider()
513 .stream(
514 &summary_request,
515 Some("You are a concise summarizer. Produce a dense, factual summary."),
516 &[],
517 4096,
518 0,
519 )
520 .await?;
521 let mut full_summary = String::new();
522 while let Some(event) = stream_rx.recv().await {
523 if let StreamEventType::TextDelta(text) = event.event_type {
524 full_summary.push_str(&text);
525 }
526 }
527 self.messages = vec![
528 Message {
529 role: Role::User,
530 content: vec![ContentBlock::Text(
531 "[Previous conversation summarized below]".to_string(),
532 )],
533 },
534 Message {
535 role: Role::Assistant,
536 content: vec![ContentBlock::Text(format!(
537 "Summary of prior context:\n\n{}",
538 full_summary
539 ))],
540 },
541 ];
542 self.messages.extend(kept);
543
544 let _ = self.db.add_message(
545 &self.conversation_id,
546 "assistant",
547 &format!("[Compacted {} messages into summary]", cutoff),
548 );
549 self.last_input_tokens = 0;
550 let _ = event_tx.send(AgentEvent::Compacted {
551 messages_removed: cutoff,
552 });
553 self.emit_compact_hooks(&Event::AfterCompact);
554 Ok(())
555 }
556 fn sanitize(&mut self) {
564 loop {
565 let dominated = match self.messages.last() {
566 None => false,
567 Some(msg) if msg.role == Role::User => {
568 !msg.content.is_empty()
569 && msg
570 .content
571 .iter()
572 .all(|b| matches!(b, ContentBlock::ToolResult { .. }))
573 }
574 Some(msg) if msg.role == Role::Assistant => msg
575 .content
576 .iter()
577 .any(|b| matches!(b, ContentBlock::ToolUse { .. })),
578 _ => false,
579 };
580 if dominated {
581 self.messages.pop();
582 } else {
583 break;
584 }
585 }
586 if matches!(self.messages.last(), Some(msg) if msg.role == Role::User) {
590 self.messages.pop();
591 }
592 }
593
594 pub async fn send_message(
595 &mut self,
596 content: &str,
597 event_tx: UnboundedSender<AgentEvent>,
598 ) -> Result<()> {
599 self.send_message_with_images(content, Vec::new(), event_tx)
600 .await
601 }
602
603 pub async fn send_message_with_images(
604 &mut self,
605 content: &str,
606 images: Vec<(String, String)>,
607 event_tx: UnboundedSender<AgentEvent>,
608 ) -> Result<()> {
609 self.sanitize();
610 {
611 let mut ctx = self.event_context(&Event::OnUserInput);
612 ctx.prompt = Some(content.to_string());
613 self.hooks.emit(&Event::OnUserInput, &ctx);
614 }
615 if !self.provider().supports_server_compaction() && self.should_compact() {
616 self.compact(&event_tx).await?;
617 }
618 self.ensure_persisted()?;
619 self.db
620 .add_message(&self.conversation_id, "user", content)?;
621 let mut blocks: Vec<ContentBlock> = Vec::new();
622 for (media_type, data) in images {
623 blocks.push(ContentBlock::Image { media_type, data });
624 }
625 blocks.push(ContentBlock::Text(content.to_string()));
626 self.messages.push(Message {
627 role: Role::User,
628 content: blocks,
629 });
630 let title_rx = if self.messages.len() == 1 {
631 let preview: String = content.chars().take(50).collect();
632 let preview = preview.trim().to_string();
633 if !preview.is_empty() {
634 let _ = self
635 .db
636 .update_conversation_title(&self.conversation_id, &preview);
637 let _ = event_tx.send(AgentEvent::TitleGenerated(preview));
638 }
639 let title_messages = vec![Message {
640 role: Role::User,
641 content: vec![ContentBlock::Text(format!(
642 "Generate a title for this conversation:\n\n{}",
643 content
644 ))],
645 }];
646 match self
647 .provider()
648 .stream_with_model(
649 self.title_model(),
650 &title_messages,
651 Some(TITLE_SYSTEM_PROMPT),
652 &[],
653 100,
654 0,
655 )
656 .await
657 {
658 Ok(rx) => Some(rx),
659 Err(e) => {
660 tracing::warn!("title generation stream failed: {e}");
661 None
662 }
663 }
664 } else {
665 None
666 };
667 let mut final_usage: Option<Usage> = None;
668 let mut system_prompt = self
669 .agents_context
670 .apply_to_system_prompt(&self.profile().system_prompt);
671 if let Some(ref store) = self.memory {
672 let query: String = content.chars().take(200).collect();
673 match store.inject_context(&query, self.memory_inject_count) {
674 Ok(ctx) if !ctx.is_empty() => {
675 system_prompt.push_str("\n\n");
676 system_prompt.push_str(&ctx);
677 }
678 Err(e) => tracing::warn!("memory injection failed: {e}"),
679 _ => {}
680 }
681 system_prompt.push_str(MEMORY_INSTRUCTIONS);
682 }
683 let tool_filter = self.profile().tool_filter.clone();
684 let thinking_budget = self.thinking_budget;
685 loop {
686 let mut tool_defs = self.tools.definitions_filtered(&tool_filter);
687 tool_defs.push(crate::provider::ToolDefinition {
688 name: "todo_write".to_string(),
689 description: "Create or update the task list for the current session. Use to track progress on multi-step tasks.".to_string(),
690 input_schema: serde_json::json!({
691 "type": "object",
692 "properties": {
693 "todos": {
694 "type": "array",
695 "items": {
696 "type": "object",
697 "properties": {
698 "content": { "type": "string", "description": "Brief description of the task" },
699 "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], "description": "Current status" }
700 },
701 "required": ["content", "status"]
702 }
703 }
704 },
705 "required": ["todos"]
706 }),
707 });
708 tool_defs.push(crate::provider::ToolDefinition {
709 name: "question".to_string(),
710 description: "Ask the user a question and wait for their response. Use when you need clarification or a decision from the user.".to_string(),
711 input_schema: serde_json::json!({
712 "type": "object",
713 "properties": {
714 "question": { "type": "string", "description": "The question to ask the user" },
715 "options": { "type": "array", "items": { "type": "string" }, "description": "Optional list of choices" }
716 },
717 "required": ["question"]
718 }),
719 });
720 tool_defs.push(crate::provider::ToolDefinition {
721 name: "snapshot_list".to_string(),
722 description: "List all files that have been created or modified in this session."
723 .to_string(),
724 input_schema: serde_json::json!({
725 "type": "object",
726 "properties": {},
727 }),
728 });
729 tool_defs.push(crate::provider::ToolDefinition {
730 name: "snapshot_restore".to_string(),
731 description: "Restore a file to its original state before this session modified it. Pass a path or omit to restore all files.".to_string(),
732 input_schema: serde_json::json!({
733 "type": "object",
734 "properties": {
735 "path": { "type": "string", "description": "File path to restore (omit to restore all)" }
736 },
737 }),
738 });
739 if self.subagent_enabled {
740 let profile_names: Vec<String> =
741 self.profiles.iter().map(|p| p.name.clone()).collect();
742 let profiles_desc = if profile_names.is_empty() {
743 String::new()
744 } else {
745 format!(" Available profiles: {}.", profile_names.join(", "))
746 };
747 tool_defs.push(crate::provider::ToolDefinition {
748 name: "subagent".to_string(),
749 description: format!(
750 "Delegate a focused task to a subagent that runs in isolated context with its own conversation. \
751 The subagent has access to tools and works autonomously without user interaction. \
752 Use for complex subtasks that benefit from separate context (research, code analysis, multi-file changes). \
753 Set background=true to run non-blocking (returns immediately with an ID; retrieve results later with subagent_result).{}",
754 profiles_desc
755 ),
756 input_schema: serde_json::json!({
757 "type": "object",
758 "properties": {
759 "description": {
760 "type": "string",
761 "description": "What the subagent should do (used as system prompt context)"
762 },
763 "task": {
764 "type": "string",
765 "description": "The specific task prompt for the subagent"
766 },
767 "profile": {
768 "type": "string",
769 "description": "Agent profile to use (affects available tools and system prompt)"
770 },
771 "background": {
772 "type": "boolean",
773 "description": "Run in background (non-blocking). Returns an ID to check later with subagent_result."
774 }
775 },
776 "required": ["description", "task"]
777 }),
778 });
779 tool_defs.push(crate::provider::ToolDefinition {
780 name: "subagent_result".to_string(),
781 description: "Retrieve the result of a background subagent by ID. Returns the output if complete, or a status message if still running.".to_string(),
782 input_schema: serde_json::json!({
783 "type": "object",
784 "properties": {
785 "id": {
786 "type": "string",
787 "description": "The subagent ID returned when it was launched in background mode"
788 }
789 },
790 "required": ["id"]
791 }),
792 });
793 }
794 if self.memory.is_some() {
795 tool_defs.extend(crate::memory::tools::definitions());
796 }
797 {
798 let mut ctx = self.event_context(&Event::BeforePrompt);
799 ctx.prompt = Some(content.to_string());
800 match self.hooks.emit_blocking(&Event::BeforePrompt, &ctx) {
801 HookResult::Block(reason) => {
802 let _ = event_tx.send(AgentEvent::TextComplete(format!(
803 "[blocked by hook: {}]",
804 reason.trim()
805 )));
806 return Ok(());
807 }
808 HookResult::Modify(_modified) => {}
809 HookResult::Allow => {}
810 }
811 }
812 self.hooks.emit(
813 &Event::OnStreamStart,
814 &self.event_context(&Event::OnStreamStart),
815 );
816 let mut stream_rx = self
817 .provider()
818 .stream(
819 &self.messages,
820 Some(&system_prompt),
821 &tool_defs,
822 8192,
823 thinking_budget,
824 )
825 .await?;
826 self.hooks.emit(
827 &Event::OnStreamEnd,
828 &self.event_context(&Event::OnStreamEnd),
829 );
830 let mut full_text = String::new();
831 let mut full_thinking = String::new();
832 let mut full_thinking_signature = String::new();
833 let mut compaction_content: Option<String> = None;
834 let mut tool_calls: Vec<PendingToolCall> = Vec::new();
835 let mut current_tool_input = String::new();
836 while let Some(event) = stream_rx.recv().await {
837 match event.event_type {
838 StreamEventType::TextDelta(text) => {
839 full_text.push_str(&text);
840 let _ = event_tx.send(AgentEvent::TextDelta(text));
841 }
842 StreamEventType::ThinkingDelta(text) => {
843 full_thinking.push_str(&text);
844 let _ = event_tx.send(AgentEvent::ThinkingDelta(text));
845 }
846 StreamEventType::ThinkingComplete {
847 thinking,
848 signature,
849 } => {
850 full_thinking = thinking;
851 full_thinking_signature = signature;
852 }
853 StreamEventType::CompactionComplete(content) => {
854 let _ = event_tx.send(AgentEvent::Compacting);
855 compaction_content = Some(content);
856 }
857 StreamEventType::ToolUseStart { id, name } => {
858 current_tool_input.clear();
859 let _ = event_tx.send(AgentEvent::ToolCallStart {
860 id: id.clone(),
861 name: name.clone(),
862 });
863 tool_calls.push(PendingToolCall {
864 id,
865 name,
866 input: String::new(),
867 });
868 }
869 StreamEventType::ToolUseInputDelta(delta) => {
870 current_tool_input.push_str(&delta);
871 let _ = event_tx.send(AgentEvent::ToolCallInputDelta(delta));
872 }
873 StreamEventType::ToolUseEnd => {
874 if let Some(tc) = tool_calls.last_mut() {
875 tc.input = current_tool_input.clone();
876 }
877 current_tool_input.clear();
878 }
879 StreamEventType::MessageEnd {
880 stop_reason: _,
881 usage,
882 } => {
883 self.last_input_tokens = usage.input_tokens;
884 let _ = self
885 .db
886 .update_last_input_tokens(&self.conversation_id, usage.input_tokens);
887 final_usage = Some(usage);
888 }
889
890 _ => {}
891 }
892 }
893
894 let mut content_blocks: Vec<ContentBlock> = Vec::new();
895 if let Some(ref summary) = compaction_content {
896 content_blocks.push(ContentBlock::Compaction {
897 content: summary.clone(),
898 });
899 }
900 if !full_thinking.is_empty() {
901 content_blocks.push(ContentBlock::Thinking {
902 thinking: full_thinking.clone(),
903 signature: full_thinking_signature.clone(),
904 });
905 }
906 if !full_text.is_empty() {
907 content_blocks.push(ContentBlock::Text(full_text.clone()));
908 }
909
910 for tc in &tool_calls {
911 let input_value: serde_json::Value =
912 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
913 content_blocks.push(ContentBlock::ToolUse {
914 id: tc.id.clone(),
915 name: tc.name.clone(),
916 input: input_value,
917 });
918 }
919
920 self.messages.push(Message {
921 role: Role::Assistant,
922 content: content_blocks,
923 });
924 let stored_text = if !full_text.is_empty() {
925 full_text.clone()
926 } else {
927 String::from("[tool use]")
928 };
929 let assistant_msg_id =
930 self.db
931 .add_message(&self.conversation_id, "assistant", &stored_text)?;
932 for tc in &tool_calls {
933 let _ = self
934 .db
935 .add_tool_call(&assistant_msg_id, &tc.id, &tc.name, &tc.input);
936 }
937 {
938 let mut ctx = self.event_context(&Event::AfterPrompt);
939 ctx.prompt = Some(full_text.clone());
940 self.hooks.emit(&Event::AfterPrompt, &ctx);
941 }
942 self.snapshots.checkpoint();
943 if tool_calls.is_empty() {
944 let _ = event_tx.send(AgentEvent::TextComplete(full_text));
945 if let Some(usage) = final_usage {
946 let _ = event_tx.send(AgentEvent::Done { usage });
947 }
948 if self.memory_auto_extract
949 && let Some(ref store) = self.memory
950 {
951 let msgs = self.messages.clone();
952 let provider = self.provider_arc();
953 let store = Arc::clone(store);
954 let conv_id = self.conversation_id.clone();
955 let etx = event_tx.clone();
956 tokio::spawn(async move {
957 match crate::memory::extract::extract(&msgs, &*provider, &store, &conv_id)
958 .await
959 {
960 Ok(result)
961 if result.added > 0 || result.updated > 0 || result.deleted > 0 =>
962 {
963 let _ = etx.send(AgentEvent::MemoryExtracted {
964 added: result.added,
965 updated: result.updated,
966 deleted: result.deleted,
967 });
968 }
969 Err(e) => tracing::warn!("memory extraction failed: {e}"),
970 _ => {}
971 }
972 });
973 }
974 break;
975 }
976
977 let mut result_blocks: Vec<ContentBlock> = Vec::new();
978
979 for tc in &tool_calls {
980 let input_value: serde_json::Value =
981 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
982 if tc.name == "todo_write" {
984 if let Some(todos_arr) = input_value.get("todos").and_then(|v| v.as_array()) {
985 let items: Vec<TodoItem> = todos_arr
986 .iter()
987 .filter_map(|t| {
988 let content = t.get("content")?.as_str()?.to_string();
989 let status = match t
990 .get("status")
991 .and_then(|s| s.as_str())
992 .unwrap_or("pending")
993 {
994 "in_progress" => TodoStatus::InProgress,
995 "completed" => TodoStatus::Completed,
996 _ => TodoStatus::Pending,
997 };
998 Some(TodoItem { content, status })
999 })
1000 .collect();
1001 let _ = event_tx.send(AgentEvent::TodoUpdate(items));
1002 }
1003 let _ = event_tx.send(AgentEvent::ToolCallResult {
1004 id: tc.id.clone(),
1005 name: tc.name.clone(),
1006 output: "ok".to_string(),
1007 is_error: false,
1008 });
1009 result_blocks.push(ContentBlock::ToolResult {
1010 tool_use_id: tc.id.clone(),
1011 content: "ok".to_string(),
1012 is_error: false,
1013 });
1014 continue;
1015 }
1016 if tc.name == "question" {
1018 let question = input_value
1019 .get("question")
1020 .and_then(|v| v.as_str())
1021 .unwrap_or("?")
1022 .to_string();
1023 let options: Vec<String> = input_value
1024 .get("options")
1025 .and_then(|v| v.as_array())
1026 .map(|arr| {
1027 arr.iter()
1028 .filter_map(|v| v.as_str().map(String::from))
1029 .collect()
1030 })
1031 .unwrap_or_default();
1032 let (tx, rx) = tokio::sync::oneshot::channel();
1033 let _ = event_tx.send(AgentEvent::Question {
1034 id: tc.id.clone(),
1035 question: question.clone(),
1036 options,
1037 responder: QuestionResponder(tx),
1038 });
1039 let answer = match rx.await {
1040 Ok(a) => a,
1041 Err(_) => "[cancelled]".to_string(),
1042 };
1043 let _ = event_tx.send(AgentEvent::ToolCallResult {
1044 id: tc.id.clone(),
1045 name: tc.name.clone(),
1046 output: answer.clone(),
1047 is_error: false,
1048 });
1049 result_blocks.push(ContentBlock::ToolResult {
1050 tool_use_id: tc.id.clone(),
1051 content: answer,
1052 is_error: false,
1053 });
1054 continue;
1055 }
1056 if tc.name == "snapshot_list" {
1058 let changes = self.snapshots.list_changes();
1059 let output = if changes.is_empty() {
1060 "No file changes in this session.".to_string()
1061 } else {
1062 changes
1063 .iter()
1064 .map(|(p, k)| format!("{} {}", k.icon(), p))
1065 .collect::<Vec<_>>()
1066 .join("\n")
1067 };
1068 let _ = event_tx.send(AgentEvent::ToolCallResult {
1069 id: tc.id.clone(),
1070 name: tc.name.clone(),
1071 output: output.clone(),
1072 is_error: false,
1073 });
1074 result_blocks.push(ContentBlock::ToolResult {
1075 tool_use_id: tc.id.clone(),
1076 content: output,
1077 is_error: false,
1078 });
1079 continue;
1080 }
1081 if tc.name == "snapshot_restore" {
1083 let output =
1084 if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
1085 match self.snapshots.restore(path) {
1086 Ok(msg) => msg,
1087 Err(e) => e.to_string(),
1088 }
1089 } else {
1090 match self.snapshots.restore_all() {
1091 Ok(msgs) => {
1092 if msgs.is_empty() {
1093 "Nothing to restore.".to_string()
1094 } else {
1095 msgs.join("\n")
1096 }
1097 }
1098 Err(e) => e.to_string(),
1099 }
1100 };
1101 let _ = event_tx.send(AgentEvent::ToolCallResult {
1102 id: tc.id.clone(),
1103 name: tc.name.clone(),
1104 output: output.clone(),
1105 is_error: false,
1106 });
1107 result_blocks.push(ContentBlock::ToolResult {
1108 tool_use_id: tc.id.clone(),
1109 content: output,
1110 is_error: false,
1111 });
1112 continue;
1113 }
1114 if tc.name == "batch" {
1116 let invocations = input_value
1117 .get("invocations")
1118 .and_then(|v| v.as_array())
1119 .cloned()
1120 .unwrap_or_default();
1121 tracing::debug!("batch: {} invocations", invocations.len());
1122 let results: Vec<serde_json::Value> = invocations
1123 .iter()
1124 .map(|inv| {
1125 let name = inv.get("tool_name").and_then(|v| v.as_str()).unwrap_or("");
1126 let input = inv.get("input").cloned().unwrap_or(serde_json::Value::Null);
1127 match self.tools.execute(name, input) {
1128 Ok(out) => serde_json::json!({ "tool_name": name, "result": out, "is_error": false }),
1129 Err(e) => serde_json::json!({ "tool_name": name, "result": e.to_string(), "is_error": true }),
1130 }
1131 })
1132 .collect();
1133 let output = serde_json::to_string(&results).unwrap_or_else(|e| e.to_string());
1134 let _ = event_tx.send(AgentEvent::ToolCallResult {
1135 id: tc.id.clone(),
1136 name: tc.name.clone(),
1137 output: output.clone(),
1138 is_error: false,
1139 });
1140 result_blocks.push(ContentBlock::ToolResult {
1141 tool_use_id: tc.id.clone(),
1142 content: output,
1143 is_error: false,
1144 });
1145 continue;
1146 }
1147 if tc.name == "subagent" {
1149 let description = input_value
1150 .get("description")
1151 .and_then(|v| v.as_str())
1152 .unwrap_or("subtask")
1153 .to_string();
1154 let task = input_value
1155 .get("task")
1156 .and_then(|v| v.as_str())
1157 .unwrap_or("")
1158 .to_string();
1159 let profile = input_value
1160 .get("profile")
1161 .and_then(|v| v.as_str())
1162 .map(String::from);
1163 let background = input_value
1164 .get("background")
1165 .and_then(|v| v.as_bool())
1166 .unwrap_or(false);
1167
1168 let output = if background {
1169 match self.spawn_background_subagent(
1170 &description,
1171 &task,
1172 profile.as_deref(),
1173 ) {
1174 Ok(id) => format!("Background subagent launched with id: {id}"),
1175 Err(e) => {
1176 tracing::error!("background subagent error: {e}");
1177 format!("[subagent error: {e}]")
1178 }
1179 }
1180 } else {
1181 match self
1182 .run_subagent(&description, &task, profile.as_deref(), &event_tx)
1183 .await
1184 {
1185 Ok(text) => text,
1186 Err(e) => {
1187 tracing::error!("subagent error: {e}");
1188 format!("[subagent error: {e}]")
1189 }
1190 }
1191 };
1192 let is_error = output.starts_with("[subagent error:");
1193 let _ = event_tx.send(AgentEvent::ToolCallResult {
1194 id: tc.id.clone(),
1195 name: tc.name.clone(),
1196 output: output.clone(),
1197 is_error,
1198 });
1199 result_blocks.push(ContentBlock::ToolResult {
1200 tool_use_id: tc.id.clone(),
1201 content: output,
1202 is_error,
1203 });
1204 continue;
1205 }
1206 if tc.name == "subagent_result" {
1208 let id = input_value.get("id").and_then(|v| v.as_str()).unwrap_or("");
1209 let output = {
1210 let results = self
1211 .background_results
1212 .lock()
1213 .unwrap_or_else(|e| e.into_inner());
1214 if let Some(result) = results.get(id) {
1215 result.clone()
1216 } else if self.background_handles.contains_key(id) {
1217 format!("Subagent '{id}' is still running.")
1218 } else {
1219 format!("No subagent found with id '{id}'.")
1220 }
1221 };
1222 let _ = event_tx.send(AgentEvent::ToolCallResult {
1223 id: tc.id.clone(),
1224 name: tc.name.clone(),
1225 output: output.clone(),
1226 is_error: false,
1227 });
1228 result_blocks.push(ContentBlock::ToolResult {
1229 tool_use_id: tc.id.clone(),
1230 content: output,
1231 is_error: false,
1232 });
1233 continue;
1234 }
1235 if let Some(ref store) = self.memory
1237 && let Some((output, is_error)) = crate::memory::tools::handle(
1238 &tc.name,
1239 &input_value,
1240 store,
1241 &self.conversation_id,
1242 )
1243 {
1244 let _ = event_tx.send(AgentEvent::ToolCallResult {
1245 id: tc.id.clone(),
1246 name: tc.name.clone(),
1247 output: output.clone(),
1248 is_error,
1249 });
1250 result_blocks.push(ContentBlock::ToolResult {
1251 tool_use_id: tc.id.clone(),
1252 content: output,
1253 is_error,
1254 });
1255 continue;
1256 }
1257 let perm = self
1259 .permissions
1260 .get(&tc.name)
1261 .map(|s| s.as_str())
1262 .unwrap_or("allow");
1263 if perm == "deny" {
1264 let output = format!("Tool '{}' is denied by permissions config.", tc.name);
1265 let _ = event_tx.send(AgentEvent::ToolCallResult {
1266 id: tc.id.clone(),
1267 name: tc.name.clone(),
1268 output: output.clone(),
1269 is_error: true,
1270 });
1271 result_blocks.push(ContentBlock::ToolResult {
1272 tool_use_id: tc.id.clone(),
1273 content: output,
1274 is_error: true,
1275 });
1276 continue;
1277 }
1278 if perm == "ask" {
1279 let summary = format!("{}: {}", tc.name, &tc.input[..tc.input.len().min(100)]);
1280 let (ptx, prx) = tokio::sync::oneshot::channel();
1281 let _ = event_tx.send(AgentEvent::PermissionRequest {
1282 tool_name: tc.name.clone(),
1283 input_summary: summary,
1284 responder: QuestionResponder(ptx),
1285 });
1286 let answer = match prx.await {
1287 Ok(a) => a,
1288 Err(_) => "deny".to_string(),
1289 };
1290 if answer != "allow" {
1291 let output = format!("Tool '{}' denied by user.", tc.name);
1292 let _ = event_tx.send(AgentEvent::ToolCallResult {
1293 id: tc.id.clone(),
1294 name: tc.name.clone(),
1295 output: output.clone(),
1296 is_error: true,
1297 });
1298 result_blocks.push(ContentBlock::ToolResult {
1299 tool_use_id: tc.id.clone(),
1300 content: output,
1301 is_error: true,
1302 });
1303 continue;
1304 }
1305 }
1306 if tc.name == "write_file" || tc.name == "apply_patch" {
1308 if tc.name == "write_file" {
1309 if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
1310 self.snapshots.before_write(path);
1311 }
1312 } else if let Some(patches) =
1313 input_value.get("patches").and_then(|v| v.as_array())
1314 {
1315 for patch in patches {
1316 if let Some(path) = patch.get("path").and_then(|v| v.as_str()) {
1317 self.snapshots.before_write(path);
1318 }
1319 }
1320 }
1321 }
1322 {
1323 let mut ctx = self.event_context(&Event::BeforeToolCall);
1324 ctx.tool_name = Some(tc.name.clone());
1325 ctx.tool_input = Some(tc.input.clone());
1326 match self.hooks.emit_blocking(&Event::BeforeToolCall, &ctx) {
1327 HookResult::Block(reason) => {
1328 let output = format!("[blocked by hook: {}]", reason.trim());
1329 let _ = event_tx.send(AgentEvent::ToolCallResult {
1330 id: tc.id.clone(),
1331 name: tc.name.clone(),
1332 output: output.clone(),
1333 is_error: true,
1334 });
1335 result_blocks.push(ContentBlock::ToolResult {
1336 tool_use_id: tc.id.clone(),
1337 content: output,
1338 is_error: true,
1339 });
1340 continue;
1341 }
1342 HookResult::Modify(_modified) => {}
1343 HookResult::Allow => {}
1344 }
1345 }
1346 let _ = event_tx.send(AgentEvent::ToolCallExecuting {
1347 id: tc.id.clone(),
1348 name: tc.name.clone(),
1349 input: tc.input.clone(),
1350 });
1351 let tool_name = tc.name.clone();
1352 let tool_input = input_value.clone();
1353 let exec_result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1354 tokio::task::block_in_place(|| self.tools.execute(&tool_name, tool_input))
1355 })
1356 .await;
1357 let (output, is_error) = match exec_result {
1358 Err(_elapsed) => {
1359 let msg = format!("Tool '{}' timed out after 30 seconds.", tc.name);
1360 let mut ctx = self.event_context(&Event::OnToolError);
1361 ctx.tool_name = Some(tc.name.clone());
1362 ctx.error = Some(msg.clone());
1363 self.hooks.emit(&Event::OnToolError, &ctx);
1364 (msg, true)
1365 }
1366 Ok(Err(e)) => {
1367 let msg = e.to_string();
1368 let mut ctx = self.event_context(&Event::OnToolError);
1369 ctx.tool_name = Some(tc.name.clone());
1370 ctx.error = Some(msg.clone());
1371 self.hooks.emit(&Event::OnToolError, &ctx);
1372 (msg, true)
1373 }
1374 Ok(Ok(out)) => (out, false),
1375 };
1376 tracing::debug!(
1377 "Tool '{}' result (error={}): {}",
1378 tc.name,
1379 is_error,
1380 &output[..output.len().min(200)]
1381 );
1382 let _ = self.db.update_tool_result(&tc.id, &output, is_error);
1383 let _ = event_tx.send(AgentEvent::ToolCallResult {
1384 id: tc.id.clone(),
1385 name: tc.name.clone(),
1386 output: output.clone(),
1387 is_error,
1388 });
1389 {
1390 let mut ctx = self.event_context(&Event::AfterToolCall);
1391 ctx.tool_name = Some(tc.name.clone());
1392 ctx.tool_output = Some(output.clone());
1393 self.hooks.emit(&Event::AfterToolCall, &ctx);
1394 }
1395 result_blocks.push(ContentBlock::ToolResult {
1396 tool_use_id: tc.id.clone(),
1397 content: output,
1398 is_error,
1399 });
1400 }
1401
1402 self.messages.push(Message {
1403 role: Role::User,
1404 content: result_blocks,
1405 });
1406 }
1407
1408 let title = if let Some(mut rx) = title_rx {
1409 let mut raw = String::new();
1410 while let Some(event) = rx.recv().await {
1411 match event.event_type {
1412 StreamEventType::TextDelta(text) => raw.push_str(&text),
1413 StreamEventType::Error(e) => {
1414 tracing::warn!("title stream error: {e}");
1415 }
1416 _ => {}
1417 }
1418 }
1419 let t = raw
1420 .trim()
1421 .trim_matches('"')
1422 .trim_matches('`')
1423 .trim_matches('*')
1424 .replace('\n', " ");
1425 let t: String = t.chars().take(50).collect();
1426 if t.is_empty() {
1427 tracing::warn!("title stream returned empty text");
1428 None
1429 } else {
1430 Some(t)
1431 }
1432 } else {
1433 None
1434 };
1435 let fallback = || -> String {
1436 self.messages
1437 .first()
1438 .and_then(|m| {
1439 m.content.iter().find_map(|b| {
1440 if let ContentBlock::Text(t) = b {
1441 let s: String = t.chars().take(50).collect();
1442 let s = s.trim().to_string();
1443 if s.is_empty() { None } else { Some(s) }
1444 } else {
1445 None
1446 }
1447 })
1448 })
1449 .unwrap_or_else(|| "Chat".to_string())
1450 };
1451 let title = title.unwrap_or_else(fallback);
1452 let _ = self
1453 .db
1454 .update_conversation_title(&self.conversation_id, &title);
1455 let _ = event_tx.send(AgentEvent::TitleGenerated(title.clone()));
1456 {
1457 let mut ctx = self.event_context(&Event::OnTitleGenerated);
1458 ctx.title = Some(title);
1459 self.hooks.emit(&Event::OnTitleGenerated, &ctx);
1460 }
1461
1462 Ok(())
1463 }
1464}