1mod events;
2mod profile;
3mod subagent;
4
5pub use events::{AgentEvent, QuestionResponder, TodoItem, TodoStatus};
6pub use profile::AgentProfile;
7
8use events::PendingToolCall;
9
10use crate::command::CommandRegistry;
11use crate::config::Config;
12use crate::db::Db;
13use crate::extension::{Event, EventContext, HookRegistry, HookResult};
14use crate::memory::MemoryStore;
15use crate::provider::{ContentBlock, Message, Provider, Role, StreamEventType, Usage};
16use crate::tools::ToolRegistry;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::mpsc::UnboundedSender;
21use uuid::Uuid;
22
23const COMPACT_THRESHOLD: f32 = 0.8;
24const COMPACT_KEEP_MESSAGES: usize = 10;
25
26const MEMORY_INSTRUCTIONS: &str = "\n\n\
27# Memory
28
29You have persistent memory across conversations. **Core blocks** (above) are always visible — update them via `core_memory_update` for essential user/agent facts. **Archival memory** is searched per turn — use `memory_add`/`memory_search`/`memory_list`/`memory_delete` to manage it.
30
31When the user says \"remember\"/\"forget\"/\"what do you know about me\", use the appropriate memory tool. Memories are also auto-extracted in the background, so focus on explicit requests.";
32
33#[derive(Debug, Clone)]
35pub struct InterruptedToolCall {
36 pub name: String,
37 pub input: String,
38 pub output: Option<String>,
39 pub is_error: bool,
40}
41
42const TITLE_SYSTEM_PROMPT: &str = "\
43You are a title generator. You output ONLY a thread title. Nothing else.
44
45Generate a brief title that would help the user find this conversation later.
46
47Rules:
48- A single line, 50 characters or fewer
49- No explanations, no quotes, no punctuation wrapping
50- Use the same language as the user message
51- Title must be grammatically correct and read naturally
52- Never include tool names (e.g. read tool, bash tool, edit tool)
53- Focus on the main topic or question the user wants to retrieve
54- Vary your phrasing — avoid repetitive patterns like always starting with \"Analyzing\"
55- When a file is mentioned, focus on WHAT the user wants to do WITH the file
56- Keep exact: technical terms, numbers, filenames, HTTP codes
57- Remove filler words: the, this, my, a, an
58- If the user message is short or conversational (e.g. \"hello\", \"hey\"): \
59 create a title reflecting the user's tone (Greeting, Quick check-in, etc.)";
60
61pub struct Agent {
62 providers: Vec<Arc<dyn Provider>>,
63 active: usize,
64 tools: Arc<ToolRegistry>,
65 db: Db,
66 memory: Option<Arc<MemoryStore>>,
67 memory_auto_extract: bool,
68 memory_inject_count: usize,
69 conversation_id: String,
70 persisted: bool,
71 messages: Vec<Message>,
72 profiles: Vec<AgentProfile>,
73 active_profile: usize,
74 pub thinking_budget: u32,
75 cwd: String,
76 agents_context: crate::context::AgentsContext,
77 last_input_tokens: u32,
78 permissions: HashMap<String, String>,
79 snapshots: crate::snapshot::SnapshotManager,
80 hooks: HookRegistry,
81 commands: CommandRegistry,
82 subagent_enabled: bool,
83 subagent_max_turns: usize,
84 background_results: Arc<std::sync::Mutex<HashMap<String, String>>>,
85 background_handles: HashMap<String, tokio::task::JoinHandle<()>>,
86 background_tx: Option<UnboundedSender<AgentEvent>>,
87}
88
89impl Agent {
90 #[allow(clippy::too_many_arguments)]
91 pub fn new(
92 providers: Vec<Box<dyn Provider>>,
93 db: Db,
94 config: &Config,
95 memory: Option<Arc<MemoryStore>>,
96 tools: ToolRegistry,
97 profiles: Vec<AgentProfile>,
98 cwd: String,
99 agents_context: crate::context::AgentsContext,
100 hooks: HookRegistry,
101 commands: CommandRegistry,
102 ) -> Result<Self> {
103 assert!(!providers.is_empty(), "at least one provider required");
104 let providers: Vec<Arc<dyn Provider>> = providers.into_iter().map(Arc::from).collect();
105 let conversation_id = uuid::Uuid::new_v4().to_string();
106 tracing::debug!("Agent created with conversation {}", conversation_id);
107 let mut profiles = if profiles.is_empty() {
108 vec![AgentProfile::default_profile()]
109 } else {
110 profiles
111 };
112 if !profiles.iter().any(|p| p.name == "plan") {
113 let at = 1.min(profiles.len());
114 profiles.insert(at, AgentProfile::plan_profile());
115 }
116 Ok(Agent {
117 providers,
118 active: 0,
119 tools: Arc::new(tools),
120 db,
121 memory,
122 memory_auto_extract: config.memory.auto_extract,
123 memory_inject_count: config.memory.inject_count,
124 conversation_id,
125 persisted: false,
126 messages: Vec::new(),
127 profiles,
128 active_profile: 0,
129 thinking_budget: 0,
130 cwd,
131 agents_context,
132 last_input_tokens: 0,
133 permissions: config.permissions.clone(),
134 snapshots: crate::snapshot::SnapshotManager::new(),
135 hooks,
136 commands,
137 subagent_enabled: config.subagents.enabled,
138 subagent_max_turns: config.subagents.max_turns,
139 background_results: Arc::new(std::sync::Mutex::new(HashMap::new())),
140 background_handles: HashMap::new(),
141 background_tx: None,
142 })
143 }
144 fn ensure_persisted(&mut self) -> Result<()> {
145 if !self.persisted {
146 self.db.create_conversation_with_id(
147 &self.conversation_id,
148 self.provider().model(),
149 self.provider().name(),
150 &self.cwd,
151 )?;
152 self.persisted = true;
153 }
154 Ok(())
155 }
156 fn provider(&self) -> &dyn Provider {
157 &*self.providers[self.active]
158 }
159 fn provider_arc(&self) -> Arc<dyn Provider> {
160 Arc::clone(&self.providers[self.active])
161 }
162 pub fn aside_provider(&self) -> Arc<dyn Provider> {
163 Arc::clone(&self.providers[self.active])
164 }
165 pub fn set_background_tx(&mut self, tx: UnboundedSender<AgentEvent>) {
166 self.background_tx = Some(tx);
167 }
168 pub fn background_tx(&self) -> Option<UnboundedSender<AgentEvent>> {
169 self.background_tx.clone()
170 }
171 fn event_context(&self, event: &Event) -> EventContext {
172 EventContext {
173 event: event.as_str().to_string(),
174 model: self.provider().model().to_string(),
175 provider: self.provider().name().to_string(),
176 cwd: self.cwd.clone(),
177 session_id: self.conversation_id.clone(),
178 ..Default::default()
179 }
180 }
181 pub fn execute_command(&self, name: &str, args: &str) -> Result<String> {
182 self.commands.execute(name, args, &self.cwd)
183 }
184 pub fn list_commands(&self) -> Vec<(&str, &str)> {
185 self.commands.list()
186 }
187 pub fn has_command(&self, name: &str) -> bool {
188 self.commands.has(name)
189 }
190 pub fn hooks(&self) -> &HookRegistry {
191 &self.hooks
192 }
193 fn profile(&self) -> &AgentProfile {
194 &self.profiles[self.active_profile]
195 }
196 pub fn conversation_id(&self) -> &str {
197 &self.conversation_id
198 }
199 pub fn messages(&self) -> &[Message] {
200 &self.messages
201 }
202 pub fn set_model(&mut self, model: String) {
203 if let Some(p) = Arc::get_mut(&mut self.providers[self.active]) {
204 p.set_model(model);
205 } else {
206 tracing::warn!("cannot change model while background subagent is active");
207 }
208 }
209 pub fn set_active_provider(&mut self, provider_name: &str, model: &str) {
210 if let Some(idx) = self
211 .providers
212 .iter()
213 .position(|p| p.name() == provider_name)
214 {
215 self.active = idx;
216 if let Some(p) = Arc::get_mut(&mut self.providers[idx]) {
217 p.set_model(model.to_string());
218 } else {
219 tracing::warn!("cannot change model while background subagent is active");
220 }
221 }
222 }
223 pub fn set_thinking_budget(&mut self, budget: u32) {
224 self.thinking_budget = budget;
225 }
226 pub fn available_models(&self) -> Vec<String> {
227 self.provider().available_models()
228 }
229 pub fn cached_all_models(&self) -> Vec<(String, Vec<String>)> {
230 self.providers
231 .iter()
232 .map(|p| (p.name().to_string(), p.available_models()))
233 .collect()
234 }
235 pub async fn fetch_all_models(&mut self) -> Vec<(String, Vec<String>)> {
236 let mut result = Vec::new();
237 for p in &self.providers {
238 let models = match p.fetch_models().await {
239 Ok(m) => m,
240 Err(e) => {
241 tracing::warn!("Failed to fetch models for {}: {e}", p.name());
242 Vec::new()
243 }
244 };
245 result.push((p.name().to_string(), models));
246 }
247 result
248 }
249 pub fn current_model(&self) -> &str {
250 self.provider().model()
251 }
252 pub fn current_provider_name(&self) -> &str {
253 self.provider().name()
254 }
255 pub fn current_agent_name(&self) -> &str {
256 &self.profile().name
257 }
258 pub fn context_window(&self) -> u32 {
259 self.provider().context_window()
260 }
261 pub async fn fetch_context_window(&self) -> u32 {
262 match self.provider().fetch_context_window().await {
263 Ok(cw) => cw,
264 Err(e) => {
265 tracing::warn!("Failed to fetch context window: {e}");
266 0
267 }
268 }
269 }
270 pub fn agent_profiles(&self) -> &[AgentProfile] {
271 &self.profiles
272 }
273 pub fn switch_agent(&mut self, name: &str) -> bool {
274 if let Some(idx) = self.profiles.iter().position(|p| p.name == name) {
275 self.active_profile = idx;
276 let model_spec = self.profiles[idx].model_spec.clone();
277
278 if let Some(spec) = model_spec {
279 let (provider, model) = Config::parse_model_spec(&spec);
280 if let Some(prov) = provider {
281 self.set_active_provider(prov, model);
282 } else {
283 self.set_model(model.to_string());
284 }
285 }
286 tracing::info!("Switched to agent '{}'", name);
287 true
288 } else {
289 false
290 }
291 }
292 pub fn cleanup_if_empty(&mut self) {
293 if self.messages.is_empty() && self.persisted {
294 let _ = self.db.delete_conversation(&self.conversation_id);
295 }
296 }
297 pub fn new_conversation(&mut self) -> Result<()> {
298 self.cleanup_if_empty();
299 self.conversation_id = uuid::Uuid::new_v4().to_string();
300 self.persisted = false;
301 self.messages.clear();
302 Ok(())
303 }
304 pub fn resume_conversation(&mut self, conversation: &crate::db::Conversation) -> Result<()> {
305 self.conversation_id = conversation.id.clone();
306 self.persisted = true;
307 self.messages = conversation
308 .messages
309 .iter()
310 .map(|m| Message {
311 role: if m.role == "user" {
312 Role::User
313 } else {
314 Role::Assistant
315 },
316 content: vec![ContentBlock::Text(m.content.clone())],
317 })
318 .collect();
319 tracing::debug!("Resumed conversation {}", conversation.id);
320 {
321 let ctx = self.event_context(&Event::OnResume);
322 self.hooks.emit(&Event::OnResume, &ctx);
323 }
324 Ok(())
325 }
326
327 pub fn add_interrupted_message(
330 &mut self,
331 content: String,
332 tool_calls: Vec<InterruptedToolCall>,
333 thinking: Option<String>,
334 ) -> Result<()> {
335 let mut blocks: Vec<ContentBlock> = Vec::new();
336 if let Some(t) = thinking
337 && !t.is_empty()
338 {
339 blocks.push(ContentBlock::Thinking {
340 thinking: t,
341 signature: String::new(),
342 });
343 }
344 if !content.is_empty() {
345 blocks.push(ContentBlock::Text(content.clone()));
346 }
347 let mut tool_ids: Vec<String> = Vec::new();
348 for tc in &tool_calls {
349 let id = Uuid::new_v4().to_string();
350 tool_ids.push(id.clone());
351 let input_value: serde_json::Value =
352 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
353 blocks.push(ContentBlock::ToolUse {
354 id: id.clone(),
355 name: tc.name.clone(),
356 input: input_value,
357 });
358 }
359 for (tc, id) in tool_calls.iter().zip(tool_ids.iter()) {
360 blocks.push(ContentBlock::ToolResult {
361 tool_use_id: id.clone(),
362 content: tc.output.clone().unwrap_or_default(),
363 is_error: tc.is_error,
364 });
365 }
366 self.messages.push(Message {
367 role: Role::Assistant,
368 content: blocks,
369 });
370 let stored_text = if content.is_empty() {
371 String::from("[tool use]")
372 } else {
373 content
374 };
375 self.ensure_persisted()?;
376 let assistant_msg_id =
377 self.db
378 .add_message(&self.conversation_id, "assistant", &stored_text)?;
379 for (tc, id) in tool_calls.iter().zip(tool_ids.iter()) {
380 let _ = self
381 .db
382 .add_tool_call(&assistant_msg_id, id, &tc.name, &tc.input);
383 if let Some(ref output) = tc.output {
384 let _ = self.db.update_tool_result(id, output, tc.is_error);
385 }
386 }
387 Ok(())
388 }
389 pub fn list_sessions(&self) -> Result<Vec<crate::db::ConversationSummary>> {
390 self.db.list_conversations_for_cwd(&self.cwd, 50)
391 }
392 pub fn get_session(&self, id: &str) -> Result<crate::db::Conversation> {
393 self.db.get_conversation(id)
394 }
395 pub fn conversation_title(&self) -> Option<String> {
396 self.db
397 .get_conversation(&self.conversation_id)
398 .ok()
399 .and_then(|c| c.title)
400 }
401 pub fn rename_session(&self, title: &str) -> Result<()> {
402 self.db
403 .update_conversation_title(&self.conversation_id, title)
404 .context("failed to rename session")
405 }
406 pub fn cwd(&self) -> &str {
407 &self.cwd
408 }
409
410 pub fn truncate_messages(&mut self, count: usize) {
411 let target = count.min(self.messages.len());
412 self.messages.truncate(target);
413 }
414
415 pub fn revert_to_message(&mut self, keep: usize) -> Result<Vec<String>> {
416 let keep = keep.min(self.messages.len());
417 let checkpoint_idx = self.messages[..keep]
418 .iter()
419 .filter(|m| m.role == Role::Assistant)
420 .count();
421 self.messages.truncate(keep);
422 self.db
423 .truncate_messages(&self.conversation_id, keep)
424 .context("truncating db messages")?;
425 let restored = if checkpoint_idx > 0 {
426 let res = self.snapshots.restore_to_checkpoint(checkpoint_idx - 1)?;
427 self.snapshots.truncate_checkpoints(checkpoint_idx);
428 res
429 } else {
430 let res = self.snapshots.restore_all()?;
431 self.snapshots.truncate_checkpoints(0);
432 res
433 };
434 Ok(restored)
435 }
436
437 pub fn fork_conversation(&mut self, msg_count: usize) -> Result<()> {
438 let kept = self.messages[..msg_count.min(self.messages.len())].to_vec();
439 self.cleanup_if_empty();
440 let conversation_id = self.db.create_conversation(
441 self.provider().model(),
442 self.provider().name(),
443 &self.cwd,
444 )?;
445 self.conversation_id = conversation_id;
446 self.persisted = true;
447 self.messages = kept;
448 for msg in &self.messages {
449 let role = match msg.role {
450 Role::User => "user",
451 Role::Assistant => "assistant",
452 Role::System => "system",
453 };
454 let text: String = msg
455 .content
456 .iter()
457 .filter_map(|b| {
458 if let ContentBlock::Text(t) = b {
459 Some(t.as_str())
460 } else {
461 None
462 }
463 })
464 .collect::<Vec<_>>()
465 .join("\n");
466 if !text.is_empty() {
467 let _ = self.db.add_message(&self.conversation_id, role, &text);
468 }
469 }
470 Ok(())
471 }
472
473 fn title_model(&self) -> &str {
474 self.provider().model()
475 }
476
477 fn should_compact(&self) -> bool {
478 let limit = self.provider().context_window();
479 let threshold = (limit as f32 * COMPACT_THRESHOLD) as u32;
480 self.last_input_tokens >= threshold
481 }
482 fn emit_compact_hooks(&self, phase: &Event) {
483 let ctx = self.event_context(phase);
484 self.hooks.emit(phase, &ctx);
485 }
486 async fn compact(&mut self, event_tx: &UnboundedSender<AgentEvent>) -> Result<()> {
487 let keep = COMPACT_KEEP_MESSAGES;
488 if self.messages.len() <= keep + 2 {
489 return Ok(());
490 }
491 let cutoff = self.messages.len() - keep;
492 let old_messages = self.messages[..cutoff].to_vec();
493 let kept = self.messages[cutoff..].to_vec();
494
495 let mut summary_text = String::new();
496 for msg in &old_messages {
497 let role = match msg.role {
498 Role::User => "User",
499 Role::Assistant => "Assistant",
500 Role::System => "System",
501 };
502 for block in &msg.content {
503 if let ContentBlock::Text(t) = block {
504 summary_text.push_str(&format!("{}:\n{}\n\n", role, t));
505 }
506 }
507 }
508 let summary_request = vec![Message {
509 role: Role::User,
510 content: vec![ContentBlock::Text(format!(
511 "Summarize the following conversation history concisely, preserving all key decisions, facts, code changes, and context that would be needed to continue the work:\n\n{}",
512 summary_text
513 ))],
514 }];
515
516 self.emit_compact_hooks(&Event::BeforeCompact);
517 let mut stream_rx = self
518 .provider()
519 .stream(
520 &summary_request,
521 Some("You are a concise summarizer. Produce a dense, factual summary."),
522 &[],
523 4096,
524 0,
525 )
526 .await?;
527 let mut full_summary = String::new();
528 while let Some(event) = stream_rx.recv().await {
529 if let StreamEventType::TextDelta(text) = event.event_type {
530 full_summary.push_str(&text);
531 }
532 }
533 self.messages = vec![
534 Message {
535 role: Role::User,
536 content: vec![ContentBlock::Text(
537 "[Previous conversation summarized below]".to_string(),
538 )],
539 },
540 Message {
541 role: Role::Assistant,
542 content: vec![ContentBlock::Text(format!(
543 "Summary of prior context:\n\n{}",
544 full_summary
545 ))],
546 },
547 ];
548 self.messages.extend(kept);
549
550 let _ = self.db.add_message(
551 &self.conversation_id,
552 "assistant",
553 &format!("[Compacted {} messages into summary]", cutoff),
554 );
555 self.last_input_tokens = 0;
556 let _ = event_tx.send(AgentEvent::Compacted {
557 messages_removed: cutoff,
558 });
559 self.emit_compact_hooks(&Event::AfterCompact);
560 Ok(())
561 }
562 fn sanitize(&mut self) {
570 loop {
571 let dominated = match self.messages.last() {
572 None => false,
573 Some(msg) if msg.role == Role::User => {
574 !msg.content.is_empty()
575 && msg
576 .content
577 .iter()
578 .all(|b| matches!(b, ContentBlock::ToolResult { .. }))
579 }
580 Some(msg) if msg.role == Role::Assistant => msg
581 .content
582 .iter()
583 .any(|b| matches!(b, ContentBlock::ToolUse { .. })),
584 _ => false,
585 };
586 if dominated {
587 self.messages.pop();
588 } else {
589 break;
590 }
591 }
592 if matches!(self.messages.last(), Some(msg) if msg.role == Role::User) {
596 self.messages.pop();
597 }
598 }
599
600 pub async fn send_message(
601 &mut self,
602 content: &str,
603 event_tx: UnboundedSender<AgentEvent>,
604 ) -> Result<()> {
605 self.send_message_with_images(content, Vec::new(), event_tx)
606 .await
607 }
608
609 pub async fn send_message_with_images(
610 &mut self,
611 content: &str,
612 images: Vec<(String, String)>,
613 event_tx: UnboundedSender<AgentEvent>,
614 ) -> Result<()> {
615 self.sanitize();
616 {
617 let mut ctx = self.event_context(&Event::OnUserInput);
618 ctx.prompt = Some(content.to_string());
619 self.hooks.emit(&Event::OnUserInput, &ctx);
620 }
621 if !self.provider().supports_server_compaction() && self.should_compact() {
622 self.compact(&event_tx).await?;
623 }
624 self.ensure_persisted()?;
625 self.db
626 .add_message(&self.conversation_id, "user", content)?;
627 let mut blocks: Vec<ContentBlock> = Vec::new();
628 for (media_type, data) in images {
629 blocks.push(ContentBlock::Image { media_type, data });
630 }
631 blocks.push(ContentBlock::Text(content.to_string()));
632 self.messages.push(Message {
633 role: Role::User,
634 content: blocks,
635 });
636 let title_rx = if self.messages.len() == 1 {
637 let preview: String = content.chars().take(50).collect();
638 let preview = preview.trim().to_string();
639 if !preview.is_empty() {
640 let _ = self
641 .db
642 .update_conversation_title(&self.conversation_id, &preview);
643 let _ = event_tx.send(AgentEvent::TitleGenerated(preview));
644 }
645 let title_messages = vec![Message {
646 role: Role::User,
647 content: vec![ContentBlock::Text(format!(
648 "Generate a title for this conversation:\n\n{}",
649 content
650 ))],
651 }];
652 match self
653 .provider()
654 .stream_with_model(
655 self.title_model(),
656 &title_messages,
657 Some(TITLE_SYSTEM_PROMPT),
658 &[],
659 100,
660 0,
661 )
662 .await
663 {
664 Ok(rx) => Some(rx),
665 Err(e) => {
666 tracing::warn!("title generation stream failed: {e}");
667 None
668 }
669 }
670 } else {
671 None
672 };
673 let mut final_usage: Option<Usage> = None;
674 let mut system_prompt = self
675 .agents_context
676 .apply_to_system_prompt(&self.profile().system_prompt);
677 if let Some(ref store) = self.memory {
678 let query: String = content.chars().take(200).collect();
679 match store.inject_context(&query, self.memory_inject_count) {
680 Ok(ctx) if !ctx.is_empty() => {
681 system_prompt.push_str("\n\n");
682 system_prompt.push_str(&ctx);
683 }
684 Err(e) => tracing::warn!("memory injection failed: {e}"),
685 _ => {}
686 }
687 system_prompt.push_str(MEMORY_INSTRUCTIONS);
688 }
689 let tool_filter = self.profile().tool_filter.clone();
690 let thinking_budget = self.thinking_budget;
691 loop {
692 let mut tool_defs = self.tools.definitions_filtered(&tool_filter);
693 tool_defs.push(crate::provider::ToolDefinition {
694 name: "todo_write".to_string(),
695 description: "Create or update the task list for the current session. Use to track progress on multi-step tasks.".to_string(),
696 input_schema: serde_json::json!({
697 "type": "object",
698 "properties": {
699 "todos": {
700 "type": "array",
701 "items": {
702 "type": "object",
703 "properties": {
704 "content": { "type": "string", "description": "Brief description of the task" },
705 "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], "description": "Current status" }
706 },
707 "required": ["content", "status"]
708 }
709 }
710 },
711 "required": ["todos"]
712 }),
713 });
714 tool_defs.push(crate::provider::ToolDefinition {
715 name: "question".to_string(),
716 description: "Ask the user a question and wait for their response. Use when you need clarification or a decision from the user.".to_string(),
717 input_schema: serde_json::json!({
718 "type": "object",
719 "properties": {
720 "question": { "type": "string", "description": "The question to ask the user" },
721 "options": { "type": "array", "items": { "type": "string" }, "description": "Optional list of choices" }
722 },
723 "required": ["question"]
724 }),
725 });
726 tool_defs.push(crate::provider::ToolDefinition {
727 name: "snapshot_list".to_string(),
728 description: "List all files that have been created or modified in this session."
729 .to_string(),
730 input_schema: serde_json::json!({
731 "type": "object",
732 "properties": {},
733 }),
734 });
735 tool_defs.push(crate::provider::ToolDefinition {
736 name: "snapshot_restore".to_string(),
737 description: "Restore a file to its original state before this session modified it. Pass a path or omit to restore all files.".to_string(),
738 input_schema: serde_json::json!({
739 "type": "object",
740 "properties": {
741 "path": { "type": "string", "description": "File path to restore (omit to restore all)" }
742 },
743 }),
744 });
745 if self.subagent_enabled {
746 let profile_names: Vec<String> =
747 self.profiles.iter().map(|p| p.name.clone()).collect();
748 let profiles_desc = if profile_names.is_empty() {
749 String::new()
750 } else {
751 format!(" Available profiles: {}.", profile_names.join(", "))
752 };
753 tool_defs.push(crate::provider::ToolDefinition {
754 name: "subagent".to_string(),
755 description: format!(
756 "Delegate a focused task to a subagent that runs in isolated context with its own conversation. \
757 The subagent has access to tools and works autonomously without user interaction. \
758 Use for complex subtasks that benefit from separate context (research, code analysis, multi-file changes). \
759 Set background=true to run non-blocking (returns immediately with an ID; retrieve results later with subagent_result).{}",
760 profiles_desc
761 ),
762 input_schema: serde_json::json!({
763 "type": "object",
764 "properties": {
765 "description": {
766 "type": "string",
767 "description": "What the subagent should do (used as system prompt context)"
768 },
769 "task": {
770 "type": "string",
771 "description": "The specific task prompt for the subagent"
772 },
773 "profile": {
774 "type": "string",
775 "description": "Agent profile to use (affects available tools and system prompt)"
776 },
777 "background": {
778 "type": "boolean",
779 "description": "Run in background (non-blocking). Returns an ID to check later with subagent_result."
780 }
781 },
782 "required": ["description", "task"]
783 }),
784 });
785 tool_defs.push(crate::provider::ToolDefinition {
786 name: "subagent_result".to_string(),
787 description: "Retrieve the result of a background subagent by ID. Returns the output if complete, or a status message if still running.".to_string(),
788 input_schema: serde_json::json!({
789 "type": "object",
790 "properties": {
791 "id": {
792 "type": "string",
793 "description": "The subagent ID returned when it was launched in background mode"
794 }
795 },
796 "required": ["id"]
797 }),
798 });
799 }
800 if self.memory.is_some() {
801 tool_defs.extend(crate::memory::tools::definitions());
802 }
803 {
804 let mut ctx = self.event_context(&Event::BeforePrompt);
805 ctx.prompt = Some(content.to_string());
806 match self.hooks.emit_blocking(&Event::BeforePrompt, &ctx) {
807 HookResult::Block(reason) => {
808 let _ = event_tx.send(AgentEvent::TextComplete(format!(
809 "[blocked by hook: {}]",
810 reason.trim()
811 )));
812 return Ok(());
813 }
814 HookResult::Modify(_modified) => {}
815 HookResult::Allow => {}
816 }
817 }
818 self.hooks.emit(
819 &Event::OnStreamStart,
820 &self.event_context(&Event::OnStreamStart),
821 );
822 let mut stream_rx = self
823 .provider()
824 .stream(
825 &self.messages,
826 Some(&system_prompt),
827 &tool_defs,
828 8192,
829 thinking_budget,
830 )
831 .await?;
832 self.hooks.emit(
833 &Event::OnStreamEnd,
834 &self.event_context(&Event::OnStreamEnd),
835 );
836 let mut full_text = String::new();
837 let mut full_thinking = String::new();
838 let mut full_thinking_signature = String::new();
839 let mut compaction_content: Option<String> = None;
840 let mut tool_calls: Vec<PendingToolCall> = Vec::new();
841 let mut current_tool_input = String::new();
842 while let Some(event) = stream_rx.recv().await {
843 match event.event_type {
844 StreamEventType::TextDelta(text) => {
845 full_text.push_str(&text);
846 let _ = event_tx.send(AgentEvent::TextDelta(text));
847 }
848 StreamEventType::ThinkingDelta(text) => {
849 full_thinking.push_str(&text);
850 let _ = event_tx.send(AgentEvent::ThinkingDelta(text));
851 }
852 StreamEventType::ThinkingComplete {
853 thinking,
854 signature,
855 } => {
856 full_thinking = thinking;
857 full_thinking_signature = signature;
858 }
859 StreamEventType::CompactionComplete(content) => {
860 let _ = event_tx.send(AgentEvent::Compacting);
861 compaction_content = Some(content);
862 }
863 StreamEventType::ToolUseStart { id, name } => {
864 current_tool_input.clear();
865 let _ = event_tx.send(AgentEvent::ToolCallStart {
866 id: id.clone(),
867 name: name.clone(),
868 });
869 tool_calls.push(PendingToolCall {
870 id,
871 name,
872 input: String::new(),
873 });
874 }
875 StreamEventType::ToolUseInputDelta(delta) => {
876 current_tool_input.push_str(&delta);
877 let _ = event_tx.send(AgentEvent::ToolCallInputDelta(delta));
878 }
879 StreamEventType::ToolUseEnd => {
880 if let Some(tc) = tool_calls.last_mut() {
881 tc.input = current_tool_input.clone();
882 }
883 current_tool_input.clear();
884 }
885 StreamEventType::MessageEnd {
886 stop_reason: _,
887 usage,
888 } => {
889 self.last_input_tokens = usage.input_tokens;
890 let _ = self
891 .db
892 .update_last_input_tokens(&self.conversation_id, usage.input_tokens);
893 final_usage = Some(usage);
894 }
895
896 _ => {}
897 }
898 }
899
900 let mut content_blocks: Vec<ContentBlock> = Vec::new();
901 if let Some(ref summary) = compaction_content {
902 content_blocks.push(ContentBlock::Compaction {
903 content: summary.clone(),
904 });
905 }
906 if !full_thinking.is_empty() {
907 content_blocks.push(ContentBlock::Thinking {
908 thinking: full_thinking.clone(),
909 signature: full_thinking_signature.clone(),
910 });
911 }
912 if !full_text.is_empty() {
913 content_blocks.push(ContentBlock::Text(full_text.clone()));
914 }
915
916 for tc in &tool_calls {
917 let input_value: serde_json::Value =
918 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
919 content_blocks.push(ContentBlock::ToolUse {
920 id: tc.id.clone(),
921 name: tc.name.clone(),
922 input: input_value,
923 });
924 }
925
926 self.messages.push(Message {
927 role: Role::Assistant,
928 content: content_blocks,
929 });
930 let stored_text = if !full_text.is_empty() {
931 full_text.clone()
932 } else {
933 String::from("[tool use]")
934 };
935 let assistant_msg_id =
936 self.db
937 .add_message(&self.conversation_id, "assistant", &stored_text)?;
938 for tc in &tool_calls {
939 let _ = self
940 .db
941 .add_tool_call(&assistant_msg_id, &tc.id, &tc.name, &tc.input);
942 }
943 {
944 let mut ctx = self.event_context(&Event::AfterPrompt);
945 ctx.prompt = Some(full_text.clone());
946 self.hooks.emit(&Event::AfterPrompt, &ctx);
947 }
948 self.snapshots.checkpoint();
949 if tool_calls.is_empty() {
950 let _ = event_tx.send(AgentEvent::TextComplete(full_text));
951 if let Some(usage) = final_usage {
952 let _ = event_tx.send(AgentEvent::Done { usage });
953 }
954 if self.memory_auto_extract
955 && let Some(ref store) = self.memory
956 {
957 let msgs = self.messages.clone();
958 let provider = self.provider_arc();
959 let store = Arc::clone(store);
960 let conv_id = self.conversation_id.clone();
961 let etx = event_tx.clone();
962 tokio::spawn(async move {
963 match crate::memory::extract::extract(&msgs, &*provider, &store, &conv_id)
964 .await
965 {
966 Ok(result)
967 if result.added > 0 || result.updated > 0 || result.deleted > 0 =>
968 {
969 let _ = etx.send(AgentEvent::MemoryExtracted {
970 added: result.added,
971 updated: result.updated,
972 deleted: result.deleted,
973 });
974 }
975 Err(e) => tracing::warn!("memory extraction failed: {e}"),
976 _ => {}
977 }
978 });
979 }
980 break;
981 }
982
983 let mut result_blocks: Vec<ContentBlock> = Vec::new();
984
985 for tc in &tool_calls {
986 let input_value: serde_json::Value =
987 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
988 if tc.name == "todo_write" {
990 if let Some(todos_arr) = input_value.get("todos").and_then(|v| v.as_array()) {
991 let items: Vec<TodoItem> = todos_arr
992 .iter()
993 .filter_map(|t| {
994 let content = t.get("content")?.as_str()?.to_string();
995 let status = match t
996 .get("status")
997 .and_then(|s| s.as_str())
998 .unwrap_or("pending")
999 {
1000 "in_progress" => TodoStatus::InProgress,
1001 "completed" => TodoStatus::Completed,
1002 _ => TodoStatus::Pending,
1003 };
1004 Some(TodoItem { content, status })
1005 })
1006 .collect();
1007 let _ = event_tx.send(AgentEvent::TodoUpdate(items));
1008 }
1009 let _ = event_tx.send(AgentEvent::ToolCallResult {
1010 id: tc.id.clone(),
1011 name: tc.name.clone(),
1012 output: "ok".to_string(),
1013 is_error: false,
1014 });
1015 result_blocks.push(ContentBlock::ToolResult {
1016 tool_use_id: tc.id.clone(),
1017 content: "ok".to_string(),
1018 is_error: false,
1019 });
1020 continue;
1021 }
1022 if tc.name == "question" {
1024 let question = input_value
1025 .get("question")
1026 .and_then(|v| v.as_str())
1027 .unwrap_or("?")
1028 .to_string();
1029 let options: Vec<String> = input_value
1030 .get("options")
1031 .and_then(|v| v.as_array())
1032 .map(|arr| {
1033 arr.iter()
1034 .filter_map(|v| v.as_str().map(String::from))
1035 .collect()
1036 })
1037 .unwrap_or_default();
1038 let (tx, rx) = tokio::sync::oneshot::channel();
1039 let _ = event_tx.send(AgentEvent::Question {
1040 id: tc.id.clone(),
1041 question: question.clone(),
1042 options,
1043 responder: QuestionResponder(tx),
1044 });
1045 let answer = match rx.await {
1046 Ok(a) => a,
1047 Err(_) => "[cancelled]".to_string(),
1048 };
1049 let _ = event_tx.send(AgentEvent::ToolCallResult {
1050 id: tc.id.clone(),
1051 name: tc.name.clone(),
1052 output: answer.clone(),
1053 is_error: false,
1054 });
1055 result_blocks.push(ContentBlock::ToolResult {
1056 tool_use_id: tc.id.clone(),
1057 content: answer,
1058 is_error: false,
1059 });
1060 continue;
1061 }
1062 if tc.name == "snapshot_list" {
1064 let changes = self.snapshots.list_changes();
1065 let output = if changes.is_empty() {
1066 "No file changes in this session.".to_string()
1067 } else {
1068 changes
1069 .iter()
1070 .map(|(p, k)| format!("{} {}", k.icon(), p))
1071 .collect::<Vec<_>>()
1072 .join("\n")
1073 };
1074 let _ = event_tx.send(AgentEvent::ToolCallResult {
1075 id: tc.id.clone(),
1076 name: tc.name.clone(),
1077 output: output.clone(),
1078 is_error: false,
1079 });
1080 result_blocks.push(ContentBlock::ToolResult {
1081 tool_use_id: tc.id.clone(),
1082 content: output,
1083 is_error: false,
1084 });
1085 continue;
1086 }
1087 if tc.name == "snapshot_restore" {
1089 let output =
1090 if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
1091 match self.snapshots.restore(path) {
1092 Ok(msg) => msg,
1093 Err(e) => e.to_string(),
1094 }
1095 } else {
1096 match self.snapshots.restore_all() {
1097 Ok(msgs) => {
1098 if msgs.is_empty() {
1099 "Nothing to restore.".to_string()
1100 } else {
1101 msgs.join("\n")
1102 }
1103 }
1104 Err(e) => e.to_string(),
1105 }
1106 };
1107 let _ = event_tx.send(AgentEvent::ToolCallResult {
1108 id: tc.id.clone(),
1109 name: tc.name.clone(),
1110 output: output.clone(),
1111 is_error: false,
1112 });
1113 result_blocks.push(ContentBlock::ToolResult {
1114 tool_use_id: tc.id.clone(),
1115 content: output,
1116 is_error: false,
1117 });
1118 continue;
1119 }
1120 if tc.name == "batch" {
1122 let invocations = input_value
1123 .get("invocations")
1124 .and_then(|v| v.as_array())
1125 .cloned()
1126 .unwrap_or_default();
1127 tracing::debug!("batch: {} invocations", invocations.len());
1128 let results: Vec<serde_json::Value> = invocations
1129 .iter()
1130 .map(|inv| {
1131 let name = inv.get("tool_name").and_then(|v| v.as_str()).unwrap_or("");
1132 let input = inv.get("input").cloned().unwrap_or(serde_json::Value::Null);
1133 match self.tools.execute(name, input) {
1134 Ok(out) => serde_json::json!({ "tool_name": name, "result": out, "is_error": false }),
1135 Err(e) => serde_json::json!({ "tool_name": name, "result": e.to_string(), "is_error": true }),
1136 }
1137 })
1138 .collect();
1139 let output = serde_json::to_string(&results).unwrap_or_else(|e| e.to_string());
1140 let _ = event_tx.send(AgentEvent::ToolCallResult {
1141 id: tc.id.clone(),
1142 name: tc.name.clone(),
1143 output: output.clone(),
1144 is_error: false,
1145 });
1146 result_blocks.push(ContentBlock::ToolResult {
1147 tool_use_id: tc.id.clone(),
1148 content: output,
1149 is_error: false,
1150 });
1151 continue;
1152 }
1153 if tc.name == "subagent" {
1155 let description = input_value
1156 .get("description")
1157 .and_then(|v| v.as_str())
1158 .unwrap_or("subtask")
1159 .to_string();
1160 let task = input_value
1161 .get("task")
1162 .and_then(|v| v.as_str())
1163 .unwrap_or("")
1164 .to_string();
1165 let profile = input_value
1166 .get("profile")
1167 .and_then(|v| v.as_str())
1168 .map(String::from);
1169 let background = input_value
1170 .get("background")
1171 .and_then(|v| v.as_bool())
1172 .unwrap_or(false);
1173
1174 let output = if background {
1175 match self.spawn_background_subagent(
1176 &description,
1177 &task,
1178 profile.as_deref(),
1179 ) {
1180 Ok(id) => format!("Background subagent launched with id: {id}"),
1181 Err(e) => {
1182 tracing::error!("background subagent error: {e}");
1183 format!("[subagent error: {e}]")
1184 }
1185 }
1186 } else {
1187 match self
1188 .run_subagent(&description, &task, profile.as_deref(), &event_tx)
1189 .await
1190 {
1191 Ok(text) => text,
1192 Err(e) => {
1193 tracing::error!("subagent error: {e}");
1194 format!("[subagent error: {e}]")
1195 }
1196 }
1197 };
1198 let is_error = output.starts_with("[subagent error:");
1199 let _ = event_tx.send(AgentEvent::ToolCallResult {
1200 id: tc.id.clone(),
1201 name: tc.name.clone(),
1202 output: output.clone(),
1203 is_error,
1204 });
1205 result_blocks.push(ContentBlock::ToolResult {
1206 tool_use_id: tc.id.clone(),
1207 content: output,
1208 is_error,
1209 });
1210 continue;
1211 }
1212 if tc.name == "subagent_result" {
1214 let id = input_value.get("id").and_then(|v| v.as_str()).unwrap_or("");
1215 let output = {
1216 let results = self
1217 .background_results
1218 .lock()
1219 .unwrap_or_else(|e| e.into_inner());
1220 if let Some(result) = results.get(id) {
1221 result.clone()
1222 } else if self.background_handles.contains_key(id) {
1223 format!("Subagent '{id}' is still running.")
1224 } else {
1225 format!("No subagent found with id '{id}'.")
1226 }
1227 };
1228 let _ = event_tx.send(AgentEvent::ToolCallResult {
1229 id: tc.id.clone(),
1230 name: tc.name.clone(),
1231 output: output.clone(),
1232 is_error: false,
1233 });
1234 result_blocks.push(ContentBlock::ToolResult {
1235 tool_use_id: tc.id.clone(),
1236 content: output,
1237 is_error: false,
1238 });
1239 continue;
1240 }
1241 if let Some(ref store) = self.memory
1243 && let Some((output, is_error)) = crate::memory::tools::handle(
1244 &tc.name,
1245 &input_value,
1246 store,
1247 &self.conversation_id,
1248 )
1249 {
1250 let _ = event_tx.send(AgentEvent::ToolCallResult {
1251 id: tc.id.clone(),
1252 name: tc.name.clone(),
1253 output: output.clone(),
1254 is_error,
1255 });
1256 result_blocks.push(ContentBlock::ToolResult {
1257 tool_use_id: tc.id.clone(),
1258 content: output,
1259 is_error,
1260 });
1261 continue;
1262 }
1263 let perm = self
1265 .permissions
1266 .get(&tc.name)
1267 .map(|s| s.as_str())
1268 .unwrap_or("allow");
1269 if perm == "deny" {
1270 let output = format!("Tool '{}' is denied by permissions config.", tc.name);
1271 let _ = event_tx.send(AgentEvent::ToolCallResult {
1272 id: tc.id.clone(),
1273 name: tc.name.clone(),
1274 output: output.clone(),
1275 is_error: true,
1276 });
1277 result_blocks.push(ContentBlock::ToolResult {
1278 tool_use_id: tc.id.clone(),
1279 content: output,
1280 is_error: true,
1281 });
1282 continue;
1283 }
1284 if perm == "ask" {
1285 let summary = format!("{}: {}", tc.name, &tc.input[..tc.input.len().min(100)]);
1286 let (ptx, prx) = tokio::sync::oneshot::channel();
1287 let _ = event_tx.send(AgentEvent::PermissionRequest {
1288 tool_name: tc.name.clone(),
1289 input_summary: summary,
1290 responder: QuestionResponder(ptx),
1291 });
1292 let answer = match prx.await {
1293 Ok(a) => a,
1294 Err(_) => "deny".to_string(),
1295 };
1296 if answer != "allow" {
1297 let output = format!("Tool '{}' denied by user.", tc.name);
1298 let _ = event_tx.send(AgentEvent::ToolCallResult {
1299 id: tc.id.clone(),
1300 name: tc.name.clone(),
1301 output: output.clone(),
1302 is_error: true,
1303 });
1304 result_blocks.push(ContentBlock::ToolResult {
1305 tool_use_id: tc.id.clone(),
1306 content: output,
1307 is_error: true,
1308 });
1309 continue;
1310 }
1311 }
1312 if tc.name == "write_file" || tc.name == "apply_patch" {
1314 if tc.name == "write_file" {
1315 if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
1316 self.snapshots.before_write(path);
1317 }
1318 } else if let Some(patches) =
1319 input_value.get("patches").and_then(|v| v.as_array())
1320 {
1321 for patch in patches {
1322 if let Some(path) = patch.get("path").and_then(|v| v.as_str()) {
1323 self.snapshots.before_write(path);
1324 }
1325 }
1326 }
1327 }
1328 {
1329 let mut ctx = self.event_context(&Event::BeforeToolCall);
1330 ctx.tool_name = Some(tc.name.clone());
1331 ctx.tool_input = Some(tc.input.clone());
1332 match self.hooks.emit_blocking(&Event::BeforeToolCall, &ctx) {
1333 HookResult::Block(reason) => {
1334 let output = format!("[blocked by hook: {}]", reason.trim());
1335 let _ = event_tx.send(AgentEvent::ToolCallResult {
1336 id: tc.id.clone(),
1337 name: tc.name.clone(),
1338 output: output.clone(),
1339 is_error: true,
1340 });
1341 result_blocks.push(ContentBlock::ToolResult {
1342 tool_use_id: tc.id.clone(),
1343 content: output,
1344 is_error: true,
1345 });
1346 continue;
1347 }
1348 HookResult::Modify(_modified) => {}
1349 HookResult::Allow => {}
1350 }
1351 }
1352 let _ = event_tx.send(AgentEvent::ToolCallExecuting {
1353 id: tc.id.clone(),
1354 name: tc.name.clone(),
1355 input: tc.input.clone(),
1356 });
1357 let tool_name = tc.name.clone();
1358 let tool_input = input_value.clone();
1359 let exec_result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1360 tokio::task::block_in_place(|| self.tools.execute(&tool_name, tool_input))
1361 })
1362 .await;
1363 let (output, is_error) = match exec_result {
1364 Err(_elapsed) => {
1365 let msg = format!("Tool '{}' timed out after 30 seconds.", tc.name);
1366 let mut ctx = self.event_context(&Event::OnToolError);
1367 ctx.tool_name = Some(tc.name.clone());
1368 ctx.error = Some(msg.clone());
1369 self.hooks.emit(&Event::OnToolError, &ctx);
1370 (msg, true)
1371 }
1372 Ok(Err(e)) => {
1373 let msg = e.to_string();
1374 let mut ctx = self.event_context(&Event::OnToolError);
1375 ctx.tool_name = Some(tc.name.clone());
1376 ctx.error = Some(msg.clone());
1377 self.hooks.emit(&Event::OnToolError, &ctx);
1378 (msg, true)
1379 }
1380 Ok(Ok(out)) => (out, false),
1381 };
1382 tracing::debug!(
1383 "Tool '{}' result (error={}): {}",
1384 tc.name,
1385 is_error,
1386 &output[..output.len().min(200)]
1387 );
1388 let _ = self.db.update_tool_result(&tc.id, &output, is_error);
1389 let _ = event_tx.send(AgentEvent::ToolCallResult {
1390 id: tc.id.clone(),
1391 name: tc.name.clone(),
1392 output: output.clone(),
1393 is_error,
1394 });
1395 {
1396 let mut ctx = self.event_context(&Event::AfterToolCall);
1397 ctx.tool_name = Some(tc.name.clone());
1398 ctx.tool_output = Some(output.clone());
1399 self.hooks.emit(&Event::AfterToolCall, &ctx);
1400 }
1401 result_blocks.push(ContentBlock::ToolResult {
1402 tool_use_id: tc.id.clone(),
1403 content: output,
1404 is_error,
1405 });
1406 }
1407
1408 self.messages.push(Message {
1409 role: Role::User,
1410 content: result_blocks,
1411 });
1412 }
1413
1414 let title = if let Some(mut rx) = title_rx {
1415 let mut raw = String::new();
1416 while let Some(event) = rx.recv().await {
1417 match event.event_type {
1418 StreamEventType::TextDelta(text) => raw.push_str(&text),
1419 StreamEventType::Error(e) => {
1420 tracing::warn!("title stream error: {e}");
1421 }
1422 _ => {}
1423 }
1424 }
1425 let t = raw
1426 .trim()
1427 .trim_matches('"')
1428 .trim_matches('`')
1429 .trim_matches('*')
1430 .replace('\n', " ");
1431 let t: String = t.chars().take(50).collect();
1432 if t.is_empty() {
1433 tracing::warn!("title stream returned empty text");
1434 None
1435 } else {
1436 Some(t)
1437 }
1438 } else {
1439 None
1440 };
1441 let fallback = || -> String {
1442 self.messages
1443 .first()
1444 .and_then(|m| {
1445 m.content.iter().find_map(|b| {
1446 if let ContentBlock::Text(t) = b {
1447 let s: String = t.chars().take(50).collect();
1448 let s = s.trim().to_string();
1449 if s.is_empty() { None } else { Some(s) }
1450 } else {
1451 None
1452 }
1453 })
1454 })
1455 .unwrap_or_else(|| "Chat".to_string())
1456 };
1457 let title = title.unwrap_or_else(fallback);
1458 let _ = self
1459 .db
1460 .update_conversation_title(&self.conversation_id, &title);
1461 let _ = event_tx.send(AgentEvent::TitleGenerated(title.clone()));
1462 {
1463 let mut ctx = self.event_context(&Event::OnTitleGenerated);
1464 ctx.title = Some(title);
1465 self.hooks.emit(&Event::OnTitleGenerated, &ctx);
1466 }
1467
1468 Ok(())
1469 }
1470}