1mod events;
2mod profile;
3mod subagent;
4
5pub use events::{AgentEvent, QuestionResponder, TodoItem, TodoStatus};
6pub use profile::AgentProfile;
7
8use events::PendingToolCall;
9
10use crate::command::CommandRegistry;
11use crate::config::Config;
12use crate::db::Db;
13use crate::extension::{Event, EventContext, HookRegistry, HookResult};
14use crate::memory::MemoryStore;
15use crate::provider::{ContentBlock, Message, Provider, Role, StreamEventType, Usage};
16use crate::tools::ToolRegistry;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::mpsc::UnboundedSender;
21
22const COMPACT_THRESHOLD: f32 = 0.8;
23const COMPACT_KEEP_MESSAGES: usize = 10;
24
25const MEMORY_INSTRUCTIONS: &str = "\n\n\
26# Memory
27
28You have persistent memory across conversations. **Core blocks** (above) are always visible — update them via `core_memory_update` for essential user/agent facts. **Archival memory** is searched per turn — use `memory_add`/`memory_search`/`memory_list`/`memory_delete` to manage it.
29
30When the user says \"remember\"/\"forget\"/\"what do you know about me\", use the appropriate memory tool. Memories are also auto-extracted in the background, so focus on explicit requests.";
31
32const TITLE_SYSTEM_PROMPT: &str = "\
33You are a title generator. You output ONLY a thread title. Nothing else.
34
35Generate a brief title that would help the user find this conversation later.
36
37Rules:
38- A single line, 50 characters or fewer
39- No explanations, no quotes, no punctuation wrapping
40- Use the same language as the user message
41- Title must be grammatically correct and read naturally
42- Never include tool names (e.g. read tool, bash tool, edit tool)
43- Focus on the main topic or question the user wants to retrieve
44- Vary your phrasing — avoid repetitive patterns like always starting with \"Analyzing\"
45- When a file is mentioned, focus on WHAT the user wants to do WITH the file
46- Keep exact: technical terms, numbers, filenames, HTTP codes
47- Remove filler words: the, this, my, a, an
48- If the user message is short or conversational (e.g. \"hello\", \"hey\"): \
49 create a title reflecting the user's tone (Greeting, Quick check-in, etc.)";
50
51pub struct Agent {
52 providers: Vec<Arc<dyn Provider>>,
53 active: usize,
54 tools: Arc<ToolRegistry>,
55 db: Db,
56 memory: Option<Arc<MemoryStore>>,
57 memory_auto_extract: bool,
58 memory_inject_count: usize,
59 conversation_id: String,
60 messages: Vec<Message>,
61 profiles: Vec<AgentProfile>,
62 active_profile: usize,
63 pub thinking_budget: u32,
64 cwd: String,
65 agents_context: crate::context::AgentsContext,
66 last_input_tokens: u32,
67 permissions: HashMap<String, String>,
68 snapshots: crate::snapshot::SnapshotManager,
69 hooks: HookRegistry,
70 commands: CommandRegistry,
71 subagent_enabled: bool,
72 subagent_max_turns: usize,
73 background_results: Arc<std::sync::Mutex<HashMap<String, String>>>,
74 background_handles: HashMap<String, tokio::task::JoinHandle<()>>,
75 background_tx: Option<UnboundedSender<AgentEvent>>,
76}
77
78impl Agent {
79 #[allow(clippy::too_many_arguments)]
80 pub fn new(
81 providers: Vec<Box<dyn Provider>>,
82 db: Db,
83 config: &Config,
84 memory: Option<Arc<MemoryStore>>,
85 tools: ToolRegistry,
86 profiles: Vec<AgentProfile>,
87 cwd: String,
88 agents_context: crate::context::AgentsContext,
89 hooks: HookRegistry,
90 commands: CommandRegistry,
91 ) -> Result<Self> {
92 assert!(!providers.is_empty(), "at least one provider required");
93 let providers: Vec<Arc<dyn Provider>> = providers.into_iter().map(Arc::from).collect();
94 let conversation_id =
95 db.create_conversation(providers[0].model(), providers[0].name(), &cwd)?;
96 tracing::debug!("Agent created with conversation {}", conversation_id);
97 let mut profiles = if profiles.is_empty() {
98 vec![AgentProfile::default_profile()]
99 } else {
100 profiles
101 };
102 if !profiles.iter().any(|p| p.name == "plan") {
103 let at = 1.min(profiles.len());
104 profiles.insert(at, AgentProfile::plan_profile());
105 }
106 Ok(Agent {
107 providers,
108 active: 0,
109 tools: Arc::new(tools),
110 db,
111 memory,
112 memory_auto_extract: config.memory.auto_extract,
113 memory_inject_count: config.memory.inject_count,
114 conversation_id,
115 messages: Vec::new(),
116 profiles,
117 active_profile: 0,
118 thinking_budget: 0,
119 cwd,
120 agents_context,
121 last_input_tokens: 0,
122 permissions: config.permissions.clone(),
123 snapshots: crate::snapshot::SnapshotManager::new(),
124 hooks,
125 commands,
126 subagent_enabled: config.subagents.enabled,
127 subagent_max_turns: config.subagents.max_turns,
128 background_results: Arc::new(std::sync::Mutex::new(HashMap::new())),
129 background_handles: HashMap::new(),
130 background_tx: None,
131 })
132 }
133 fn provider(&self) -> &dyn Provider {
134 &*self.providers[self.active]
135 }
136 fn provider_arc(&self) -> Arc<dyn Provider> {
137 Arc::clone(&self.providers[self.active])
138 }
139 pub fn set_background_tx(&mut self, tx: UnboundedSender<AgentEvent>) {
140 self.background_tx = Some(tx);
141 }
142 fn event_context(&self, event: &Event) -> EventContext {
143 EventContext {
144 event: event.as_str().to_string(),
145 model: self.provider().model().to_string(),
146 provider: self.provider().name().to_string(),
147 cwd: self.cwd.clone(),
148 session_id: self.conversation_id.clone(),
149 ..Default::default()
150 }
151 }
152 pub fn execute_command(&self, name: &str, args: &str) -> Result<String> {
153 self.commands.execute(name, args, &self.cwd)
154 }
155 pub fn list_commands(&self) -> Vec<(&str, &str)> {
156 self.commands.list()
157 }
158 pub fn has_command(&self, name: &str) -> bool {
159 self.commands.has(name)
160 }
161 pub fn hooks(&self) -> &HookRegistry {
162 &self.hooks
163 }
164 fn profile(&self) -> &AgentProfile {
165 &self.profiles[self.active_profile]
166 }
167 pub fn conversation_id(&self) -> &str {
168 &self.conversation_id
169 }
170 pub fn messages(&self) -> &[Message] {
171 &self.messages
172 }
173 pub fn set_model(&mut self, model: String) {
174 if let Some(p) = Arc::get_mut(&mut self.providers[self.active]) {
175 p.set_model(model);
176 } else {
177 tracing::warn!("cannot change model while background subagent is active");
178 }
179 }
180 pub fn set_active_provider(&mut self, provider_name: &str, model: &str) {
181 if let Some(idx) = self
182 .providers
183 .iter()
184 .position(|p| p.name() == provider_name)
185 {
186 self.active = idx;
187 if let Some(p) = Arc::get_mut(&mut self.providers[idx]) {
188 p.set_model(model.to_string());
189 } else {
190 tracing::warn!("cannot change model while background subagent is active");
191 }
192 }
193 }
194 pub fn set_thinking_budget(&mut self, budget: u32) {
195 self.thinking_budget = budget;
196 }
197 pub fn available_models(&self) -> Vec<String> {
198 self.provider().available_models()
199 }
200 pub async fn fetch_all_models(&self) -> Vec<(String, Vec<String>)> {
201 let mut result = Vec::new();
202 for p in &self.providers {
203 let models = match p.fetch_models().await {
204 Ok(m) => m,
205 Err(e) => {
206 tracing::warn!("Failed to fetch models for {}: {e}", p.name());
207 Vec::new()
208 }
209 };
210 result.push((p.name().to_string(), models));
211 }
212 result
213 }
214 pub fn current_model(&self) -> &str {
215 self.provider().model()
216 }
217 pub fn current_provider_name(&self) -> &str {
218 self.provider().name()
219 }
220 pub fn current_agent_name(&self) -> &str {
221 &self.profile().name
222 }
223 pub fn context_window(&self) -> u32 {
224 self.provider().context_window()
225 }
226 pub async fn fetch_context_window(&self) -> u32 {
227 match self.provider().fetch_context_window().await {
228 Ok(cw) => cw,
229 Err(e) => {
230 tracing::warn!("Failed to fetch context window: {e}");
231 0
232 }
233 }
234 }
235 pub fn agent_profiles(&self) -> &[AgentProfile] {
236 &self.profiles
237 }
238 pub fn switch_agent(&mut self, name: &str) -> bool {
239 if let Some(idx) = self.profiles.iter().position(|p| p.name == name) {
240 self.active_profile = idx;
241 let model_spec = self.profiles[idx].model_spec.clone();
242
243 if let Some(spec) = model_spec {
244 let (provider, model) = Config::parse_model_spec(&spec);
245 if let Some(prov) = provider {
246 self.set_active_provider(prov, model);
247 } else {
248 self.set_model(model.to_string());
249 }
250 }
251 tracing::info!("Switched to agent '{}'", name);
252 true
253 } else {
254 false
255 }
256 }
257 pub fn cleanup_if_empty(&mut self) {
258 if self.messages.is_empty() {
259 let _ = self.db.delete_conversation(&self.conversation_id);
260 }
261 }
262 pub fn new_conversation(&mut self) -> Result<()> {
263 self.cleanup_if_empty();
264 let conversation_id = self.db.create_conversation(
265 self.provider().model(),
266 self.provider().name(),
267 &self.cwd,
268 )?;
269 self.conversation_id = conversation_id;
270 self.messages.clear();
271 Ok(())
272 }
273 pub fn resume_conversation(&mut self, conversation: &crate::db::Conversation) -> Result<()> {
274 self.conversation_id = conversation.id.clone();
275 self.messages = conversation
276 .messages
277 .iter()
278 .map(|m| Message {
279 role: if m.role == "user" {
280 Role::User
281 } else {
282 Role::Assistant
283 },
284 content: vec![ContentBlock::Text(m.content.clone())],
285 })
286 .collect();
287 tracing::debug!("Resumed conversation {}", conversation.id);
288 {
289 let ctx = self.event_context(&Event::OnResume);
290 self.hooks.emit(&Event::OnResume, &ctx);
291 }
292 Ok(())
293 }
294 pub fn list_sessions(&self) -> Result<Vec<crate::db::ConversationSummary>> {
295 self.db.list_conversations_for_cwd(&self.cwd, 50)
296 }
297 pub fn get_session(&self, id: &str) -> Result<crate::db::Conversation> {
298 self.db.get_conversation(id)
299 }
300 pub fn conversation_title(&self) -> Option<String> {
301 self.db
302 .get_conversation(&self.conversation_id)
303 .ok()
304 .and_then(|c| c.title)
305 }
306 pub fn rename_session(&self, title: &str) -> Result<()> {
307 self.db
308 .update_conversation_title(&self.conversation_id, title)
309 .context("failed to rename session")
310 }
311 pub fn cwd(&self) -> &str {
312 &self.cwd
313 }
314
315 pub fn truncate_messages(&mut self, count: usize) {
316 let target = count.min(self.messages.len());
317 self.messages.truncate(target);
318 }
319
320 pub fn revert_to_message(&mut self, keep: usize) -> Result<Vec<String>> {
321 let keep = keep.min(self.messages.len());
322 let checkpoint_idx = self.messages[..keep]
323 .iter()
324 .filter(|m| m.role == Role::Assistant)
325 .count();
326 self.messages.truncate(keep);
327 self.db
328 .truncate_messages(&self.conversation_id, keep)
329 .context("truncating db messages")?;
330 let restored = if checkpoint_idx > 0 {
331 let res = self.snapshots.restore_to_checkpoint(checkpoint_idx - 1)?;
332 self.snapshots.truncate_checkpoints(checkpoint_idx);
333 res
334 } else {
335 let res = self.snapshots.restore_all()?;
336 self.snapshots.truncate_checkpoints(0);
337 res
338 };
339 Ok(restored)
340 }
341
342 pub fn fork_conversation(&mut self, msg_count: usize) -> Result<()> {
343 let kept = self.messages[..msg_count.min(self.messages.len())].to_vec();
344 self.cleanup_if_empty();
345 let conversation_id = self.db.create_conversation(
346 self.provider().model(),
347 self.provider().name(),
348 &self.cwd,
349 )?;
350 self.conversation_id = conversation_id;
351 self.messages = kept;
352 for msg in &self.messages {
353 let role = match msg.role {
354 Role::User => "user",
355 Role::Assistant => "assistant",
356 Role::System => "system",
357 };
358 let text: String = msg
359 .content
360 .iter()
361 .filter_map(|b| {
362 if let ContentBlock::Text(t) = b {
363 Some(t.as_str())
364 } else {
365 None
366 }
367 })
368 .collect::<Vec<_>>()
369 .join("\n");
370 if !text.is_empty() {
371 let _ = self.db.add_message(&self.conversation_id, role, &text);
372 }
373 }
374 Ok(())
375 }
376
377 fn title_model(&self) -> &str {
378 self.provider().model()
379 }
380
381 fn should_compact(&self) -> bool {
382 let limit = self.provider().context_window();
383 let threshold = (limit as f32 * COMPACT_THRESHOLD) as u32;
384 self.last_input_tokens >= threshold
385 }
386 fn emit_compact_hooks(&self, phase: &Event) {
387 let ctx = self.event_context(phase);
388 self.hooks.emit(phase, &ctx);
389 }
390 async fn compact(&mut self, event_tx: &UnboundedSender<AgentEvent>) -> Result<()> {
391 let keep = COMPACT_KEEP_MESSAGES;
392 if self.messages.len() <= keep + 2 {
393 return Ok(());
394 }
395 let cutoff = self.messages.len() - keep;
396 let old_messages = self.messages[..cutoff].to_vec();
397 let kept = self.messages[cutoff..].to_vec();
398
399 let mut summary_text = String::new();
400 for msg in &old_messages {
401 let role = match msg.role {
402 Role::User => "User",
403 Role::Assistant => "Assistant",
404 Role::System => "System",
405 };
406 for block in &msg.content {
407 if let ContentBlock::Text(t) = block {
408 summary_text.push_str(&format!("{}:\n{}\n\n", role, t));
409 }
410 }
411 }
412 let summary_request = vec![Message {
413 role: Role::User,
414 content: vec![ContentBlock::Text(format!(
415 "Summarize the following conversation history concisely, preserving all key decisions, facts, code changes, and context that would be needed to continue the work:\n\n{}",
416 summary_text
417 ))],
418 }];
419
420 self.emit_compact_hooks(&Event::BeforeCompact);
421 let mut stream_rx = self
422 .provider()
423 .stream(
424 &summary_request,
425 Some("You are a concise summarizer. Produce a dense, factual summary."),
426 &[],
427 4096,
428 0,
429 )
430 .await?;
431 let mut full_summary = String::new();
432 while let Some(event) = stream_rx.recv().await {
433 if let StreamEventType::TextDelta(text) = event.event_type {
434 full_summary.push_str(&text);
435 }
436 }
437 self.messages = vec![
438 Message {
439 role: Role::User,
440 content: vec![ContentBlock::Text(
441 "[Previous conversation summarized below]".to_string(),
442 )],
443 },
444 Message {
445 role: Role::Assistant,
446 content: vec![ContentBlock::Text(format!(
447 "Summary of prior context:\n\n{}",
448 full_summary
449 ))],
450 },
451 ];
452 self.messages.extend(kept);
453
454 let _ = self.db.add_message(
455 &self.conversation_id,
456 "assistant",
457 &format!("[Compacted {} messages into summary]", cutoff),
458 );
459 self.last_input_tokens = 0;
460 let _ = event_tx.send(AgentEvent::Compacted {
461 messages_removed: cutoff,
462 });
463 self.emit_compact_hooks(&Event::AfterCompact);
464 Ok(())
465 }
466 fn sanitize(&mut self) {
474 loop {
475 let dominated = match self.messages.last() {
476 None => false,
477 Some(msg) if msg.role == Role::User => {
478 !msg.content.is_empty()
479 && msg
480 .content
481 .iter()
482 .all(|b| matches!(b, ContentBlock::ToolResult { .. }))
483 }
484 Some(msg) if msg.role == Role::Assistant => msg
485 .content
486 .iter()
487 .any(|b| matches!(b, ContentBlock::ToolUse { .. })),
488 _ => false,
489 };
490 if dominated {
491 self.messages.pop();
492 } else {
493 break;
494 }
495 }
496 if matches!(self.messages.last(), Some(msg) if msg.role == Role::User) {
500 self.messages.pop();
501 }
502 }
503
504 pub async fn send_message(
505 &mut self,
506 content: &str,
507 event_tx: UnboundedSender<AgentEvent>,
508 ) -> Result<()> {
509 self.send_message_with_images(content, Vec::new(), event_tx)
510 .await
511 }
512
513 pub async fn send_message_with_images(
514 &mut self,
515 content: &str,
516 images: Vec<(String, String)>,
517 event_tx: UnboundedSender<AgentEvent>,
518 ) -> Result<()> {
519 self.sanitize();
520 {
521 let mut ctx = self.event_context(&Event::OnUserInput);
522 ctx.prompt = Some(content.to_string());
523 self.hooks.emit(&Event::OnUserInput, &ctx);
524 }
525 if !self.provider().supports_server_compaction() && self.should_compact() {
526 self.compact(&event_tx).await?;
527 }
528 self.db
529 .add_message(&self.conversation_id, "user", content)?;
530 let mut blocks: Vec<ContentBlock> = Vec::new();
531 for (media_type, data) in images {
532 blocks.push(ContentBlock::Image { media_type, data });
533 }
534 blocks.push(ContentBlock::Text(content.to_string()));
535 self.messages.push(Message {
536 role: Role::User,
537 content: blocks,
538 });
539 let title_rx = if self.messages.len() == 1 {
540 let preview: String = content.chars().take(50).collect();
541 let preview = preview.trim().to_string();
542 if !preview.is_empty() {
543 let _ = self
544 .db
545 .update_conversation_title(&self.conversation_id, &preview);
546 let _ = event_tx.send(AgentEvent::TitleGenerated(preview));
547 }
548 let title_messages = vec![Message {
549 role: Role::User,
550 content: vec![ContentBlock::Text(format!(
551 "Generate a title for this conversation:\n\n{}",
552 content
553 ))],
554 }];
555 match self
556 .provider()
557 .stream_with_model(
558 self.title_model(),
559 &title_messages,
560 Some(TITLE_SYSTEM_PROMPT),
561 &[],
562 100,
563 0,
564 )
565 .await
566 {
567 Ok(rx) => Some(rx),
568 Err(e) => {
569 tracing::warn!("title generation stream failed: {e}");
570 None
571 }
572 }
573 } else {
574 None
575 };
576 let mut final_usage: Option<Usage> = None;
577 let mut system_prompt = self
578 .agents_context
579 .apply_to_system_prompt(&self.profile().system_prompt);
580 if let Some(ref store) = self.memory {
581 let query: String = content.chars().take(200).collect();
582 match store.inject_context(&query, self.memory_inject_count) {
583 Ok(ctx) if !ctx.is_empty() => {
584 system_prompt.push_str("\n\n");
585 system_prompt.push_str(&ctx);
586 }
587 Err(e) => tracing::warn!("memory injection failed: {e}"),
588 _ => {}
589 }
590 system_prompt.push_str(MEMORY_INSTRUCTIONS);
591 }
592 let tool_filter = self.profile().tool_filter.clone();
593 let thinking_budget = self.thinking_budget;
594 loop {
595 let mut tool_defs = self.tools.definitions_filtered(&tool_filter);
596 tool_defs.push(crate::provider::ToolDefinition {
597 name: "todo_write".to_string(),
598 description: "Create or update the task list for the current session. Use to track progress on multi-step tasks.".to_string(),
599 input_schema: serde_json::json!({
600 "type": "object",
601 "properties": {
602 "todos": {
603 "type": "array",
604 "items": {
605 "type": "object",
606 "properties": {
607 "content": { "type": "string", "description": "Brief description of the task" },
608 "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], "description": "Current status" }
609 },
610 "required": ["content", "status"]
611 }
612 }
613 },
614 "required": ["todos"]
615 }),
616 });
617 tool_defs.push(crate::provider::ToolDefinition {
618 name: "question".to_string(),
619 description: "Ask the user a question and wait for their response. Use when you need clarification or a decision from the user.".to_string(),
620 input_schema: serde_json::json!({
621 "type": "object",
622 "properties": {
623 "question": { "type": "string", "description": "The question to ask the user" },
624 "options": { "type": "array", "items": { "type": "string" }, "description": "Optional list of choices" }
625 },
626 "required": ["question"]
627 }),
628 });
629 tool_defs.push(crate::provider::ToolDefinition {
630 name: "snapshot_list".to_string(),
631 description: "List all files that have been created or modified in this session."
632 .to_string(),
633 input_schema: serde_json::json!({
634 "type": "object",
635 "properties": {},
636 }),
637 });
638 tool_defs.push(crate::provider::ToolDefinition {
639 name: "snapshot_restore".to_string(),
640 description: "Restore a file to its original state before this session modified it. Pass a path or omit to restore all files.".to_string(),
641 input_schema: serde_json::json!({
642 "type": "object",
643 "properties": {
644 "path": { "type": "string", "description": "File path to restore (omit to restore all)" }
645 },
646 }),
647 });
648 if self.subagent_enabled {
649 let profile_names: Vec<String> =
650 self.profiles.iter().map(|p| p.name.clone()).collect();
651 let profiles_desc = if profile_names.is_empty() {
652 String::new()
653 } else {
654 format!(" Available profiles: {}.", profile_names.join(", "))
655 };
656 tool_defs.push(crate::provider::ToolDefinition {
657 name: "subagent".to_string(),
658 description: format!(
659 "Delegate a focused task to a subagent that runs in isolated context with its own conversation. \
660 The subagent has access to tools and works autonomously without user interaction. \
661 Use for complex subtasks that benefit from separate context (research, code analysis, multi-file changes). \
662 Set background=true to run non-blocking (returns immediately with an ID; retrieve results later with subagent_result).{}",
663 profiles_desc
664 ),
665 input_schema: serde_json::json!({
666 "type": "object",
667 "properties": {
668 "description": {
669 "type": "string",
670 "description": "What the subagent should do (used as system prompt context)"
671 },
672 "task": {
673 "type": "string",
674 "description": "The specific task prompt for the subagent"
675 },
676 "profile": {
677 "type": "string",
678 "description": "Agent profile to use (affects available tools and system prompt)"
679 },
680 "background": {
681 "type": "boolean",
682 "description": "Run in background (non-blocking). Returns an ID to check later with subagent_result."
683 }
684 },
685 "required": ["description", "task"]
686 }),
687 });
688 tool_defs.push(crate::provider::ToolDefinition {
689 name: "subagent_result".to_string(),
690 description: "Retrieve the result of a background subagent by ID. Returns the output if complete, or a status message if still running.".to_string(),
691 input_schema: serde_json::json!({
692 "type": "object",
693 "properties": {
694 "id": {
695 "type": "string",
696 "description": "The subagent ID returned when it was launched in background mode"
697 }
698 },
699 "required": ["id"]
700 }),
701 });
702 }
703 if self.memory.is_some() {
704 tool_defs.extend(crate::memory::tools::definitions());
705 }
706 {
707 let mut ctx = self.event_context(&Event::BeforePrompt);
708 ctx.prompt = Some(content.to_string());
709 match self.hooks.emit_blocking(&Event::BeforePrompt, &ctx) {
710 HookResult::Block(reason) => {
711 let _ = event_tx.send(AgentEvent::TextComplete(format!(
712 "[blocked by hook: {}]",
713 reason.trim()
714 )));
715 return Ok(());
716 }
717 HookResult::Modify(_modified) => {}
718 HookResult::Allow => {}
719 }
720 }
721 self.hooks.emit(
722 &Event::OnStreamStart,
723 &self.event_context(&Event::OnStreamStart),
724 );
725 let mut stream_rx = self
726 .provider()
727 .stream(
728 &self.messages,
729 Some(&system_prompt),
730 &tool_defs,
731 8192,
732 thinking_budget,
733 )
734 .await?;
735 self.hooks.emit(
736 &Event::OnStreamEnd,
737 &self.event_context(&Event::OnStreamEnd),
738 );
739 let mut full_text = String::new();
740 let mut full_thinking = String::new();
741 let mut full_thinking_signature = String::new();
742 let mut compaction_content: Option<String> = None;
743 let mut tool_calls: Vec<PendingToolCall> = Vec::new();
744 let mut current_tool_input = String::new();
745 while let Some(event) = stream_rx.recv().await {
746 match event.event_type {
747 StreamEventType::TextDelta(text) => {
748 full_text.push_str(&text);
749 let _ = event_tx.send(AgentEvent::TextDelta(text));
750 }
751 StreamEventType::ThinkingDelta(text) => {
752 full_thinking.push_str(&text);
753 let _ = event_tx.send(AgentEvent::ThinkingDelta(text));
754 }
755 StreamEventType::ThinkingComplete {
756 thinking,
757 signature,
758 } => {
759 full_thinking = thinking;
760 full_thinking_signature = signature;
761 }
762 StreamEventType::CompactionComplete(content) => {
763 let _ = event_tx.send(AgentEvent::Compacting);
764 compaction_content = Some(content);
765 }
766 StreamEventType::ToolUseStart { id, name } => {
767 current_tool_input.clear();
768 let _ = event_tx.send(AgentEvent::ToolCallStart {
769 id: id.clone(),
770 name: name.clone(),
771 });
772 tool_calls.push(PendingToolCall {
773 id,
774 name,
775 input: String::new(),
776 });
777 }
778 StreamEventType::ToolUseInputDelta(delta) => {
779 current_tool_input.push_str(&delta);
780 let _ = event_tx.send(AgentEvent::ToolCallInputDelta(delta));
781 }
782 StreamEventType::ToolUseEnd => {
783 if let Some(tc) = tool_calls.last_mut() {
784 tc.input = current_tool_input.clone();
785 }
786 current_tool_input.clear();
787 }
788 StreamEventType::MessageEnd {
789 stop_reason: _,
790 usage,
791 } => {
792 self.last_input_tokens = usage.input_tokens;
793 let _ = self
794 .db
795 .update_last_input_tokens(&self.conversation_id, usage.input_tokens);
796 final_usage = Some(usage);
797 }
798
799 _ => {}
800 }
801 }
802
803 let mut content_blocks: Vec<ContentBlock> = Vec::new();
804 if let Some(ref summary) = compaction_content {
805 content_blocks.push(ContentBlock::Compaction {
806 content: summary.clone(),
807 });
808 }
809 if !full_thinking.is_empty() {
810 content_blocks.push(ContentBlock::Thinking {
811 thinking: full_thinking.clone(),
812 signature: full_thinking_signature.clone(),
813 });
814 }
815 if !full_text.is_empty() {
816 content_blocks.push(ContentBlock::Text(full_text.clone()));
817 }
818
819 for tc in &tool_calls {
820 let input_value: serde_json::Value =
821 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
822 content_blocks.push(ContentBlock::ToolUse {
823 id: tc.id.clone(),
824 name: tc.name.clone(),
825 input: input_value,
826 });
827 }
828
829 self.messages.push(Message {
830 role: Role::Assistant,
831 content: content_blocks,
832 });
833 let stored_text = if !full_text.is_empty() {
834 full_text.clone()
835 } else {
836 String::from("[tool use]")
837 };
838 let assistant_msg_id =
839 self.db
840 .add_message(&self.conversation_id, "assistant", &stored_text)?;
841 for tc in &tool_calls {
842 let _ = self
843 .db
844 .add_tool_call(&assistant_msg_id, &tc.id, &tc.name, &tc.input);
845 }
846 {
847 let mut ctx = self.event_context(&Event::AfterPrompt);
848 ctx.prompt = Some(full_text.clone());
849 self.hooks.emit(&Event::AfterPrompt, &ctx);
850 }
851 self.snapshots.checkpoint();
852 if tool_calls.is_empty() {
853 let _ = event_tx.send(AgentEvent::TextComplete(full_text));
854 if let Some(usage) = final_usage {
855 let _ = event_tx.send(AgentEvent::Done { usage });
856 }
857 if self.memory_auto_extract
858 && let Some(ref store) = self.memory
859 {
860 let msgs = self.messages.clone();
861 let provider = self.provider_arc();
862 let store = Arc::clone(store);
863 let conv_id = self.conversation_id.clone();
864 let etx = event_tx.clone();
865 tokio::spawn(async move {
866 match crate::memory::extract::extract(&msgs, &*provider, &store, &conv_id)
867 .await
868 {
869 Ok(result)
870 if result.added > 0 || result.updated > 0 || result.deleted > 0 =>
871 {
872 let _ = etx.send(AgentEvent::MemoryExtracted {
873 added: result.added,
874 updated: result.updated,
875 deleted: result.deleted,
876 });
877 }
878 Err(e) => tracing::warn!("memory extraction failed: {e}"),
879 _ => {}
880 }
881 });
882 }
883 break;
884 }
885
886 let mut result_blocks: Vec<ContentBlock> = Vec::new();
887
888 for tc in &tool_calls {
889 let input_value: serde_json::Value =
890 serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
891 if tc.name == "todo_write" {
893 if let Some(todos_arr) = input_value.get("todos").and_then(|v| v.as_array()) {
894 let items: Vec<TodoItem> = todos_arr
895 .iter()
896 .filter_map(|t| {
897 let content = t.get("content")?.as_str()?.to_string();
898 let status = match t
899 .get("status")
900 .and_then(|s| s.as_str())
901 .unwrap_or("pending")
902 {
903 "in_progress" => TodoStatus::InProgress,
904 "completed" => TodoStatus::Completed,
905 _ => TodoStatus::Pending,
906 };
907 Some(TodoItem { content, status })
908 })
909 .collect();
910 let _ = event_tx.send(AgentEvent::TodoUpdate(items));
911 }
912 let _ = event_tx.send(AgentEvent::ToolCallResult {
913 id: tc.id.clone(),
914 name: tc.name.clone(),
915 output: "ok".to_string(),
916 is_error: false,
917 });
918 result_blocks.push(ContentBlock::ToolResult {
919 tool_use_id: tc.id.clone(),
920 content: "ok".to_string(),
921 is_error: false,
922 });
923 continue;
924 }
925 if tc.name == "question" {
927 let question = input_value
928 .get("question")
929 .and_then(|v| v.as_str())
930 .unwrap_or("?")
931 .to_string();
932 let options: Vec<String> = input_value
933 .get("options")
934 .and_then(|v| v.as_array())
935 .map(|arr| {
936 arr.iter()
937 .filter_map(|v| v.as_str().map(String::from))
938 .collect()
939 })
940 .unwrap_or_default();
941 let (tx, rx) = tokio::sync::oneshot::channel();
942 let _ = event_tx.send(AgentEvent::Question {
943 id: tc.id.clone(),
944 question: question.clone(),
945 options,
946 responder: QuestionResponder(tx),
947 });
948 let answer = match rx.await {
949 Ok(a) => a,
950 Err(_) => "[cancelled]".to_string(),
951 };
952 let _ = event_tx.send(AgentEvent::ToolCallResult {
953 id: tc.id.clone(),
954 name: tc.name.clone(),
955 output: answer.clone(),
956 is_error: false,
957 });
958 result_blocks.push(ContentBlock::ToolResult {
959 tool_use_id: tc.id.clone(),
960 content: answer,
961 is_error: false,
962 });
963 continue;
964 }
965 if tc.name == "snapshot_list" {
967 let changes = self.snapshots.list_changes();
968 let output = if changes.is_empty() {
969 "No file changes in this session.".to_string()
970 } else {
971 changes
972 .iter()
973 .map(|(p, k)| format!("{} {}", k.icon(), p))
974 .collect::<Vec<_>>()
975 .join("\n")
976 };
977 let _ = event_tx.send(AgentEvent::ToolCallResult {
978 id: tc.id.clone(),
979 name: tc.name.clone(),
980 output: output.clone(),
981 is_error: false,
982 });
983 result_blocks.push(ContentBlock::ToolResult {
984 tool_use_id: tc.id.clone(),
985 content: output,
986 is_error: false,
987 });
988 continue;
989 }
990 if tc.name == "snapshot_restore" {
992 let output =
993 if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
994 match self.snapshots.restore(path) {
995 Ok(msg) => msg,
996 Err(e) => e.to_string(),
997 }
998 } else {
999 match self.snapshots.restore_all() {
1000 Ok(msgs) => {
1001 if msgs.is_empty() {
1002 "Nothing to restore.".to_string()
1003 } else {
1004 msgs.join("\n")
1005 }
1006 }
1007 Err(e) => e.to_string(),
1008 }
1009 };
1010 let _ = event_tx.send(AgentEvent::ToolCallResult {
1011 id: tc.id.clone(),
1012 name: tc.name.clone(),
1013 output: output.clone(),
1014 is_error: false,
1015 });
1016 result_blocks.push(ContentBlock::ToolResult {
1017 tool_use_id: tc.id.clone(),
1018 content: output,
1019 is_error: false,
1020 });
1021 continue;
1022 }
1023 if tc.name == "batch" {
1025 let invocations = input_value
1026 .get("invocations")
1027 .and_then(|v| v.as_array())
1028 .cloned()
1029 .unwrap_or_default();
1030 tracing::debug!("batch: {} invocations", invocations.len());
1031 let results: Vec<serde_json::Value> = invocations
1032 .iter()
1033 .map(|inv| {
1034 let name = inv.get("tool_name").and_then(|v| v.as_str()).unwrap_or("");
1035 let input = inv.get("input").cloned().unwrap_or(serde_json::Value::Null);
1036 match self.tools.execute(name, input) {
1037 Ok(out) => serde_json::json!({ "tool_name": name, "result": out, "is_error": false }),
1038 Err(e) => serde_json::json!({ "tool_name": name, "result": e.to_string(), "is_error": true }),
1039 }
1040 })
1041 .collect();
1042 let output = serde_json::to_string(&results).unwrap_or_else(|e| e.to_string());
1043 let _ = event_tx.send(AgentEvent::ToolCallResult {
1044 id: tc.id.clone(),
1045 name: tc.name.clone(),
1046 output: output.clone(),
1047 is_error: false,
1048 });
1049 result_blocks.push(ContentBlock::ToolResult {
1050 tool_use_id: tc.id.clone(),
1051 content: output,
1052 is_error: false,
1053 });
1054 continue;
1055 }
1056 if tc.name == "subagent" {
1058 let description = input_value
1059 .get("description")
1060 .and_then(|v| v.as_str())
1061 .unwrap_or("subtask")
1062 .to_string();
1063 let task = input_value
1064 .get("task")
1065 .and_then(|v| v.as_str())
1066 .unwrap_or("")
1067 .to_string();
1068 let profile = input_value
1069 .get("profile")
1070 .and_then(|v| v.as_str())
1071 .map(String::from);
1072 let background = input_value
1073 .get("background")
1074 .and_then(|v| v.as_bool())
1075 .unwrap_or(false);
1076
1077 let output = if background {
1078 match self.spawn_background_subagent(
1079 &description,
1080 &task,
1081 profile.as_deref(),
1082 ) {
1083 Ok(id) => format!("Background subagent launched with id: {id}"),
1084 Err(e) => {
1085 tracing::error!("background subagent error: {e}");
1086 format!("[subagent error: {e}]")
1087 }
1088 }
1089 } else {
1090 match self
1091 .run_subagent(&description, &task, profile.as_deref(), &event_tx)
1092 .await
1093 {
1094 Ok(text) => text,
1095 Err(e) => {
1096 tracing::error!("subagent error: {e}");
1097 format!("[subagent error: {e}]")
1098 }
1099 }
1100 };
1101 let is_error = output.starts_with("[subagent error:");
1102 let _ = event_tx.send(AgentEvent::ToolCallResult {
1103 id: tc.id.clone(),
1104 name: tc.name.clone(),
1105 output: output.clone(),
1106 is_error,
1107 });
1108 result_blocks.push(ContentBlock::ToolResult {
1109 tool_use_id: tc.id.clone(),
1110 content: output,
1111 is_error,
1112 });
1113 continue;
1114 }
1115 if tc.name == "subagent_result" {
1117 let id = input_value.get("id").and_then(|v| v.as_str()).unwrap_or("");
1118 let output = {
1119 let results = self
1120 .background_results
1121 .lock()
1122 .unwrap_or_else(|e| e.into_inner());
1123 if let Some(result) = results.get(id) {
1124 result.clone()
1125 } else if self.background_handles.contains_key(id) {
1126 format!("Subagent '{id}' is still running.")
1127 } else {
1128 format!("No subagent found with id '{id}'.")
1129 }
1130 };
1131 let _ = event_tx.send(AgentEvent::ToolCallResult {
1132 id: tc.id.clone(),
1133 name: tc.name.clone(),
1134 output: output.clone(),
1135 is_error: false,
1136 });
1137 result_blocks.push(ContentBlock::ToolResult {
1138 tool_use_id: tc.id.clone(),
1139 content: output,
1140 is_error: false,
1141 });
1142 continue;
1143 }
1144 if let Some(ref store) = self.memory
1146 && let Some((output, is_error)) = crate::memory::tools::handle(
1147 &tc.name,
1148 &input_value,
1149 store,
1150 &self.conversation_id,
1151 )
1152 {
1153 let _ = event_tx.send(AgentEvent::ToolCallResult {
1154 id: tc.id.clone(),
1155 name: tc.name.clone(),
1156 output: output.clone(),
1157 is_error,
1158 });
1159 result_blocks.push(ContentBlock::ToolResult {
1160 tool_use_id: tc.id.clone(),
1161 content: output,
1162 is_error,
1163 });
1164 continue;
1165 }
1166 let perm = self
1168 .permissions
1169 .get(&tc.name)
1170 .map(|s| s.as_str())
1171 .unwrap_or("allow");
1172 if perm == "deny" {
1173 let output = format!("Tool '{}' is denied by permissions config.", tc.name);
1174 let _ = event_tx.send(AgentEvent::ToolCallResult {
1175 id: tc.id.clone(),
1176 name: tc.name.clone(),
1177 output: output.clone(),
1178 is_error: true,
1179 });
1180 result_blocks.push(ContentBlock::ToolResult {
1181 tool_use_id: tc.id.clone(),
1182 content: output,
1183 is_error: true,
1184 });
1185 continue;
1186 }
1187 if perm == "ask" {
1188 let summary = format!("{}: {}", tc.name, &tc.input[..tc.input.len().min(100)]);
1189 let (ptx, prx) = tokio::sync::oneshot::channel();
1190 let _ = event_tx.send(AgentEvent::PermissionRequest {
1191 tool_name: tc.name.clone(),
1192 input_summary: summary,
1193 responder: QuestionResponder(ptx),
1194 });
1195 let answer = match prx.await {
1196 Ok(a) => a,
1197 Err(_) => "deny".to_string(),
1198 };
1199 if answer != "allow" {
1200 let output = format!("Tool '{}' denied by user.", tc.name);
1201 let _ = event_tx.send(AgentEvent::ToolCallResult {
1202 id: tc.id.clone(),
1203 name: tc.name.clone(),
1204 output: output.clone(),
1205 is_error: true,
1206 });
1207 result_blocks.push(ContentBlock::ToolResult {
1208 tool_use_id: tc.id.clone(),
1209 content: output,
1210 is_error: true,
1211 });
1212 continue;
1213 }
1214 }
1215 if tc.name == "write_file" || tc.name == "apply_patch" {
1217 if tc.name == "write_file" {
1218 if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
1219 self.snapshots.before_write(path);
1220 }
1221 } else if let Some(patches) =
1222 input_value.get("patches").and_then(|v| v.as_array())
1223 {
1224 for patch in patches {
1225 if let Some(path) = patch.get("path").and_then(|v| v.as_str()) {
1226 self.snapshots.before_write(path);
1227 }
1228 }
1229 }
1230 }
1231 {
1232 let mut ctx = self.event_context(&Event::BeforeToolCall);
1233 ctx.tool_name = Some(tc.name.clone());
1234 ctx.tool_input = Some(tc.input.clone());
1235 match self.hooks.emit_blocking(&Event::BeforeToolCall, &ctx) {
1236 HookResult::Block(reason) => {
1237 let output = format!("[blocked by hook: {}]", reason.trim());
1238 let _ = event_tx.send(AgentEvent::ToolCallResult {
1239 id: tc.id.clone(),
1240 name: tc.name.clone(),
1241 output: output.clone(),
1242 is_error: true,
1243 });
1244 result_blocks.push(ContentBlock::ToolResult {
1245 tool_use_id: tc.id.clone(),
1246 content: output,
1247 is_error: true,
1248 });
1249 continue;
1250 }
1251 HookResult::Modify(_modified) => {}
1252 HookResult::Allow => {}
1253 }
1254 }
1255 let _ = event_tx.send(AgentEvent::ToolCallExecuting {
1256 id: tc.id.clone(),
1257 name: tc.name.clone(),
1258 input: tc.input.clone(),
1259 });
1260 let tool_name = tc.name.clone();
1261 let tool_input = input_value.clone();
1262 let exec_result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1263 tokio::task::block_in_place(|| self.tools.execute(&tool_name, tool_input))
1264 })
1265 .await;
1266 let (output, is_error) = match exec_result {
1267 Err(_elapsed) => {
1268 let msg = format!("Tool '{}' timed out after 30 seconds.", tc.name);
1269 let mut ctx = self.event_context(&Event::OnToolError);
1270 ctx.tool_name = Some(tc.name.clone());
1271 ctx.error = Some(msg.clone());
1272 self.hooks.emit(&Event::OnToolError, &ctx);
1273 (msg, true)
1274 }
1275 Ok(Err(e)) => {
1276 let msg = e.to_string();
1277 let mut ctx = self.event_context(&Event::OnToolError);
1278 ctx.tool_name = Some(tc.name.clone());
1279 ctx.error = Some(msg.clone());
1280 self.hooks.emit(&Event::OnToolError, &ctx);
1281 (msg, true)
1282 }
1283 Ok(Ok(out)) => (out, false),
1284 };
1285 tracing::debug!(
1286 "Tool '{}' result (error={}): {}",
1287 tc.name,
1288 is_error,
1289 &output[..output.len().min(200)]
1290 );
1291 let _ = self.db.update_tool_result(&tc.id, &output, is_error);
1292 let _ = event_tx.send(AgentEvent::ToolCallResult {
1293 id: tc.id.clone(),
1294 name: tc.name.clone(),
1295 output: output.clone(),
1296 is_error,
1297 });
1298 {
1299 let mut ctx = self.event_context(&Event::AfterToolCall);
1300 ctx.tool_name = Some(tc.name.clone());
1301 ctx.tool_output = Some(output.clone());
1302 self.hooks.emit(&Event::AfterToolCall, &ctx);
1303 }
1304 result_blocks.push(ContentBlock::ToolResult {
1305 tool_use_id: tc.id.clone(),
1306 content: output,
1307 is_error,
1308 });
1309 }
1310
1311 self.messages.push(Message {
1312 role: Role::User,
1313 content: result_blocks,
1314 });
1315 }
1316
1317 let title = if let Some(mut rx) = title_rx {
1318 let mut raw = String::new();
1319 while let Some(event) = rx.recv().await {
1320 match event.event_type {
1321 StreamEventType::TextDelta(text) => raw.push_str(&text),
1322 StreamEventType::Error(e) => {
1323 tracing::warn!("title stream error: {e}");
1324 }
1325 _ => {}
1326 }
1327 }
1328 let t = raw
1329 .trim()
1330 .trim_matches('"')
1331 .trim_matches('`')
1332 .trim_matches('*')
1333 .replace('\n', " ");
1334 let t: String = t.chars().take(50).collect();
1335 if t.is_empty() {
1336 tracing::warn!("title stream returned empty text");
1337 None
1338 } else {
1339 Some(t)
1340 }
1341 } else {
1342 None
1343 };
1344 let fallback = || -> String {
1345 self.messages
1346 .first()
1347 .and_then(|m| {
1348 m.content.iter().find_map(|b| {
1349 if let ContentBlock::Text(t) = b {
1350 let s: String = t.chars().take(50).collect();
1351 let s = s.trim().to_string();
1352 if s.is_empty() { None } else { Some(s) }
1353 } else {
1354 None
1355 }
1356 })
1357 })
1358 .unwrap_or_else(|| "Chat".to_string())
1359 };
1360 let title = title.unwrap_or_else(fallback);
1361 let _ = self
1362 .db
1363 .update_conversation_title(&self.conversation_id, &title);
1364 let _ = event_tx.send(AgentEvent::TitleGenerated(title.clone()));
1365 {
1366 let mut ctx = self.event_context(&Event::OnTitleGenerated);
1367 ctx.title = Some(title);
1368 self.hooks.emit(&Event::OnTitleGenerated, &ctx);
1369 }
1370
1371 Ok(())
1372 }
1373}