Skip to main content

dot/agent/
mod.rs

1mod events;
2mod profile;
3mod subagent;
4
5pub use events::{AgentEvent, QuestionResponder, TodoItem, TodoStatus};
6pub use profile::AgentProfile;
7
8use events::PendingToolCall;
9
10use crate::command::CommandRegistry;
11use crate::config::Config;
12use crate::db::Db;
13use crate::extension::{Event, EventContext, HookRegistry, HookResult};
14use crate::memory::MemoryStore;
15use crate::provider::{ContentBlock, Message, Provider, Role, StreamEventType, Usage};
16use crate::tools::ToolRegistry;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::mpsc::UnboundedSender;
21
/// Fraction of the provider's context window that the last request's input
/// token count must reach before client-side compaction is attempted.
const COMPACT_THRESHOLD: f32 = 0.8;
/// Number of most recent messages kept verbatim when older history is
/// replaced by a summary.
const COMPACT_KEEP_MESSAGES: usize = 10;
24
/// Instructions appended to the system prompt when a memory store is
/// configured, describing the memory tools to the model.
///
/// The text begins with two newlines so it can be appended directly after the
/// preceding prompt content.
const MEMORY_INSTRUCTIONS: &str = "\n\n\
# Memory

You have persistent memory across conversations. **Core blocks** (above) are always visible — update them via `core_memory_update` for essential user/agent facts. **Archival memory** is searched per turn — use `memory_add`/`memory_search`/`memory_list`/`memory_delete` to manage it.

When the user says \"remember\"/\"forget\"/\"what do you know about me\", use the appropriate memory tool. Memories are also auto-extracted in the background, so focus on explicit requests.";
31
/// System prompt used for the side request that generates a conversation
/// title from the first user message.
const TITLE_SYSTEM_PROMPT: &str = "\
You are a title generator. You output ONLY a thread title. Nothing else.

Generate a brief title that would help the user find this conversation later.

Rules:
- A single line, 50 characters or fewer
- No explanations, no quotes, no punctuation wrapping
- Use the same language as the user message
- Title must be grammatically correct and read naturally
- Never include tool names (e.g. read tool, bash tool, edit tool)
- Focus on the main topic or question the user wants to retrieve
- Vary your phrasing — avoid repetitive patterns like always starting with \"Analyzing\"
- When a file is mentioned, focus on WHAT the user wants to do WITH the file
- Keep exact: technical terms, numbers, filenames, HTTP codes
- Remove filler words: the, this, my, a, an
- If the user message is short or conversational (e.g. \"hello\", \"hey\"): \
  create a title reflecting the user's tone (Greeting, Quick check-in, etc.)";
50
51pub struct Agent {
52    providers: Vec<Arc<dyn Provider>>,
53    active: usize,
54    tools: Arc<ToolRegistry>,
55    db: Db,
56    memory: Option<Arc<MemoryStore>>,
57    memory_auto_extract: bool,
58    memory_inject_count: usize,
59    conversation_id: String,
60    messages: Vec<Message>,
61    profiles: Vec<AgentProfile>,
62    active_profile: usize,
63    pub thinking_budget: u32,
64    cwd: String,
65    agents_context: crate::context::AgentsContext,
66    last_input_tokens: u32,
67    permissions: HashMap<String, String>,
68    snapshots: crate::snapshot::SnapshotManager,
69    hooks: HookRegistry,
70    commands: CommandRegistry,
71    subagent_enabled: bool,
72    subagent_max_turns: usize,
73    background_results: Arc<std::sync::Mutex<HashMap<String, String>>>,
74    background_handles: HashMap<String, tokio::task::JoinHandle<()>>,
75    background_tx: Option<UnboundedSender<AgentEvent>>,
76}
77
78impl Agent {
79    #[allow(clippy::too_many_arguments)]
80    pub fn new(
81        providers: Vec<Box<dyn Provider>>,
82        db: Db,
83        config: &Config,
84        memory: Option<Arc<MemoryStore>>,
85        tools: ToolRegistry,
86        profiles: Vec<AgentProfile>,
87        cwd: String,
88        agents_context: crate::context::AgentsContext,
89        hooks: HookRegistry,
90        commands: CommandRegistry,
91    ) -> Result<Self> {
92        assert!(!providers.is_empty(), "at least one provider required");
93        let providers: Vec<Arc<dyn Provider>> = providers.into_iter().map(Arc::from).collect();
94        let conversation_id =
95            db.create_conversation(providers[0].model(), providers[0].name(), &cwd)?;
96        tracing::debug!("Agent created with conversation {}", conversation_id);
97        let mut profiles = if profiles.is_empty() {
98            vec![AgentProfile::default_profile()]
99        } else {
100            profiles
101        };
102        if !profiles.iter().any(|p| p.name == "plan") {
103            let at = 1.min(profiles.len());
104            profiles.insert(at, AgentProfile::plan_profile());
105        }
106        Ok(Agent {
107            providers,
108            active: 0,
109            tools: Arc::new(tools),
110            db,
111            memory,
112            memory_auto_extract: config.memory.auto_extract,
113            memory_inject_count: config.memory.inject_count,
114            conversation_id,
115            messages: Vec::new(),
116            profiles,
117            active_profile: 0,
118            thinking_budget: 0,
119            cwd,
120            agents_context,
121            last_input_tokens: 0,
122            permissions: config.permissions.clone(),
123            snapshots: crate::snapshot::SnapshotManager::new(),
124            hooks,
125            commands,
126            subagent_enabled: config.subagents.enabled,
127            subagent_max_turns: config.subagents.max_turns,
128            background_results: Arc::new(std::sync::Mutex::new(HashMap::new())),
129            background_handles: HashMap::new(),
130            background_tx: None,
131        })
132    }
133    fn provider(&self) -> &dyn Provider {
134        &*self.providers[self.active]
135    }
136    fn provider_arc(&self) -> Arc<dyn Provider> {
137        Arc::clone(&self.providers[self.active])
138    }
139    pub fn set_background_tx(&mut self, tx: UnboundedSender<AgentEvent>) {
140        self.background_tx = Some(tx);
141    }
142    fn event_context(&self, event: &Event) -> EventContext {
143        EventContext {
144            event: event.as_str().to_string(),
145            model: self.provider().model().to_string(),
146            provider: self.provider().name().to_string(),
147            cwd: self.cwd.clone(),
148            session_id: self.conversation_id.clone(),
149            ..Default::default()
150        }
151    }
152    pub fn execute_command(&self, name: &str, args: &str) -> Result<String> {
153        self.commands.execute(name, args, &self.cwd)
154    }
155    pub fn list_commands(&self) -> Vec<(&str, &str)> {
156        self.commands.list()
157    }
158    pub fn has_command(&self, name: &str) -> bool {
159        self.commands.has(name)
160    }
161    pub fn hooks(&self) -> &HookRegistry {
162        &self.hooks
163    }
164    fn profile(&self) -> &AgentProfile {
165        &self.profiles[self.active_profile]
166    }
167    pub fn conversation_id(&self) -> &str {
168        &self.conversation_id
169    }
170    pub fn messages(&self) -> &[Message] {
171        &self.messages
172    }
173    pub fn set_model(&mut self, model: String) {
174        if let Some(p) = Arc::get_mut(&mut self.providers[self.active]) {
175            p.set_model(model);
176        } else {
177            tracing::warn!("cannot change model while background subagent is active");
178        }
179    }
180    pub fn set_active_provider(&mut self, provider_name: &str, model: &str) {
181        if let Some(idx) = self
182            .providers
183            .iter()
184            .position(|p| p.name() == provider_name)
185        {
186            self.active = idx;
187            if let Some(p) = Arc::get_mut(&mut self.providers[idx]) {
188                p.set_model(model.to_string());
189            } else {
190                tracing::warn!("cannot change model while background subagent is active");
191            }
192        }
193    }
194    pub fn set_thinking_budget(&mut self, budget: u32) {
195        self.thinking_budget = budget;
196    }
197    pub fn available_models(&self) -> Vec<String> {
198        self.provider().available_models()
199    }
200    pub async fn fetch_all_models(&self) -> Vec<(String, Vec<String>)> {
201        let mut result = Vec::new();
202        for p in &self.providers {
203            let models = match p.fetch_models().await {
204                Ok(m) => m,
205                Err(e) => {
206                    tracing::warn!("Failed to fetch models for {}: {e}", p.name());
207                    Vec::new()
208                }
209            };
210            result.push((p.name().to_string(), models));
211        }
212        result
213    }
214    pub fn current_model(&self) -> &str {
215        self.provider().model()
216    }
217    pub fn current_provider_name(&self) -> &str {
218        self.provider().name()
219    }
220    pub fn current_agent_name(&self) -> &str {
221        &self.profile().name
222    }
223    pub fn context_window(&self) -> u32 {
224        self.provider().context_window()
225    }
226    pub async fn fetch_context_window(&self) -> u32 {
227        match self.provider().fetch_context_window().await {
228            Ok(cw) => cw,
229            Err(e) => {
230                tracing::warn!("Failed to fetch context window: {e}");
231                0
232            }
233        }
234    }
235    pub fn agent_profiles(&self) -> &[AgentProfile] {
236        &self.profiles
237    }
238    pub fn switch_agent(&mut self, name: &str) -> bool {
239        if let Some(idx) = self.profiles.iter().position(|p| p.name == name) {
240            self.active_profile = idx;
241            let model_spec = self.profiles[idx].model_spec.clone();
242
243            if let Some(spec) = model_spec {
244                let (provider, model) = Config::parse_model_spec(&spec);
245                if let Some(prov) = provider {
246                    self.set_active_provider(prov, model);
247                } else {
248                    self.set_model(model.to_string());
249                }
250            }
251            tracing::info!("Switched to agent '{}'", name);
252            true
253        } else {
254            false
255        }
256    }
257    pub fn cleanup_if_empty(&mut self) {
258        if self.messages.is_empty() {
259            let _ = self.db.delete_conversation(&self.conversation_id);
260        }
261    }
262    pub fn new_conversation(&mut self) -> Result<()> {
263        self.cleanup_if_empty();
264        let conversation_id = self.db.create_conversation(
265            self.provider().model(),
266            self.provider().name(),
267            &self.cwd,
268        )?;
269        self.conversation_id = conversation_id;
270        self.messages.clear();
271        Ok(())
272    }
273    pub fn resume_conversation(&mut self, conversation: &crate::db::Conversation) -> Result<()> {
274        self.conversation_id = conversation.id.clone();
275        self.messages = conversation
276            .messages
277            .iter()
278            .map(|m| Message {
279                role: if m.role == "user" {
280                    Role::User
281                } else {
282                    Role::Assistant
283                },
284                content: vec![ContentBlock::Text(m.content.clone())],
285            })
286            .collect();
287        tracing::debug!("Resumed conversation {}", conversation.id);
288        {
289            let ctx = self.event_context(&Event::OnResume);
290            self.hooks.emit(&Event::OnResume, &ctx);
291        }
292        Ok(())
293    }
294    pub fn list_sessions(&self) -> Result<Vec<crate::db::ConversationSummary>> {
295        self.db.list_conversations_for_cwd(&self.cwd, 50)
296    }
297    pub fn get_session(&self, id: &str) -> Result<crate::db::Conversation> {
298        self.db.get_conversation(id)
299    }
300    pub fn conversation_title(&self) -> Option<String> {
301        self.db
302            .get_conversation(&self.conversation_id)
303            .ok()
304            .and_then(|c| c.title)
305    }
306    pub fn rename_session(&self, title: &str) -> Result<()> {
307        self.db
308            .update_conversation_title(&self.conversation_id, title)
309            .context("failed to rename session")
310    }
311    pub fn cwd(&self) -> &str {
312        &self.cwd
313    }
314
315    pub fn truncate_messages(&mut self, count: usize) {
316        let target = count.min(self.messages.len());
317        self.messages.truncate(target);
318    }
319
320    pub fn revert_to_message(&mut self, keep: usize) -> Result<Vec<String>> {
321        let keep = keep.min(self.messages.len());
322        let checkpoint_idx = self.messages[..keep]
323            .iter()
324            .filter(|m| m.role == Role::Assistant)
325            .count();
326        self.messages.truncate(keep);
327        self.db
328            .truncate_messages(&self.conversation_id, keep)
329            .context("truncating db messages")?;
330        let restored = if checkpoint_idx > 0 {
331            let res = self.snapshots.restore_to_checkpoint(checkpoint_idx - 1)?;
332            self.snapshots.truncate_checkpoints(checkpoint_idx);
333            res
334        } else {
335            let res = self.snapshots.restore_all()?;
336            self.snapshots.truncate_checkpoints(0);
337            res
338        };
339        Ok(restored)
340    }
341
342    pub fn fork_conversation(&mut self, msg_count: usize) -> Result<()> {
343        let kept = self.messages[..msg_count.min(self.messages.len())].to_vec();
344        self.cleanup_if_empty();
345        let conversation_id = self.db.create_conversation(
346            self.provider().model(),
347            self.provider().name(),
348            &self.cwd,
349        )?;
350        self.conversation_id = conversation_id;
351        self.messages = kept;
352        for msg in &self.messages {
353            let role = match msg.role {
354                Role::User => "user",
355                Role::Assistant => "assistant",
356                Role::System => "system",
357            };
358            let text: String = msg
359                .content
360                .iter()
361                .filter_map(|b| {
362                    if let ContentBlock::Text(t) = b {
363                        Some(t.as_str())
364                    } else {
365                        None
366                    }
367                })
368                .collect::<Vec<_>>()
369                .join("\n");
370            if !text.is_empty() {
371                let _ = self.db.add_message(&self.conversation_id, role, &text);
372            }
373        }
374        Ok(())
375    }
376
377    fn title_model(&self) -> &str {
378        self.provider().model()
379    }
380
381    fn should_compact(&self) -> bool {
382        let limit = self.provider().context_window();
383        let threshold = (limit as f32 * COMPACT_THRESHOLD) as u32;
384        self.last_input_tokens >= threshold
385    }
386    fn emit_compact_hooks(&self, phase: &Event) {
387        let ctx = self.event_context(phase);
388        self.hooks.emit(phase, &ctx);
389    }
390    async fn compact(&mut self, event_tx: &UnboundedSender<AgentEvent>) -> Result<()> {
391        let keep = COMPACT_KEEP_MESSAGES;
392        if self.messages.len() <= keep + 2 {
393            return Ok(());
394        }
395        let cutoff = self.messages.len() - keep;
396        let old_messages = self.messages[..cutoff].to_vec();
397        let kept = self.messages[cutoff..].to_vec();
398
399        let mut summary_text = String::new();
400        for msg in &old_messages {
401            let role = match msg.role {
402                Role::User => "User",
403                Role::Assistant => "Assistant",
404                Role::System => "System",
405            };
406            for block in &msg.content {
407                if let ContentBlock::Text(t) = block {
408                    summary_text.push_str(&format!("{}:\n{}\n\n", role, t));
409                }
410            }
411        }
412        let summary_request = vec![Message {
413            role: Role::User,
414            content: vec![ContentBlock::Text(format!(
415                "Summarize the following conversation history concisely, preserving all key decisions, facts, code changes, and context that would be needed to continue the work:\n\n{}",
416                summary_text
417            ))],
418        }];
419
420        self.emit_compact_hooks(&Event::BeforeCompact);
421        let mut stream_rx = self
422            .provider()
423            .stream(
424                &summary_request,
425                Some("You are a concise summarizer. Produce a dense, factual summary."),
426                &[],
427                4096,
428                0,
429            )
430            .await?;
431        let mut full_summary = String::new();
432        while let Some(event) = stream_rx.recv().await {
433            if let StreamEventType::TextDelta(text) = event.event_type {
434                full_summary.push_str(&text);
435            }
436        }
437        self.messages = vec![
438            Message {
439                role: Role::User,
440                content: vec![ContentBlock::Text(
441                    "[Previous conversation summarized below]".to_string(),
442                )],
443            },
444            Message {
445                role: Role::Assistant,
446                content: vec![ContentBlock::Text(format!(
447                    "Summary of prior context:\n\n{}",
448                    full_summary
449                ))],
450            },
451        ];
452        self.messages.extend(kept);
453
454        let _ = self.db.add_message(
455            &self.conversation_id,
456            "assistant",
457            &format!("[Compacted {} messages into summary]", cutoff),
458        );
459        self.last_input_tokens = 0;
460        let _ = event_tx.send(AgentEvent::Compacted {
461            messages_removed: cutoff,
462        });
463        self.emit_compact_hooks(&Event::AfterCompact);
464        Ok(())
465    }
466    /// Remove orphaned tool-call cycles and trailing user messages from the
467    /// end of the conversation. This handles cases where a previous stream was
468    /// cancelled or errored mid-execution, leaving ToolResult messages without
469    /// a subsequent assistant response, assistant ToolUse messages without
470    /// corresponding ToolResult messages, or plain user messages that never
471    /// received a response (which would cause consecutive user messages on the
472    /// next send).
473    fn sanitize(&mut self) {
474        loop {
475            let dominated = match self.messages.last() {
476                None => false,
477                Some(msg) if msg.role == Role::User => {
478                    !msg.content.is_empty()
479                        && msg
480                            .content
481                            .iter()
482                            .all(|b| matches!(b, ContentBlock::ToolResult { .. }))
483                }
484                Some(msg) if msg.role == Role::Assistant => msg
485                    .content
486                    .iter()
487                    .any(|b| matches!(b, ContentBlock::ToolUse { .. })),
488                _ => false,
489            };
490            if dominated {
491                self.messages.pop();
492            } else {
493                break;
494            }
495        }
496        // Drop any trailing user message to prevent consecutive user messages.
497        // This happens when a previous send_message was cancelled after pushing
498        // the user message but before the assistant could respond.
499        if matches!(self.messages.last(), Some(msg) if msg.role == Role::User) {
500            self.messages.pop();
501        }
502    }
503
504    pub async fn send_message(
505        &mut self,
506        content: &str,
507        event_tx: UnboundedSender<AgentEvent>,
508    ) -> Result<()> {
509        self.send_message_with_images(content, Vec::new(), event_tx)
510            .await
511    }
512
513    pub async fn send_message_with_images(
514        &mut self,
515        content: &str,
516        images: Vec<(String, String)>,
517        event_tx: UnboundedSender<AgentEvent>,
518    ) -> Result<()> {
519        self.sanitize();
520        {
521            let mut ctx = self.event_context(&Event::OnUserInput);
522            ctx.prompt = Some(content.to_string());
523            self.hooks.emit(&Event::OnUserInput, &ctx);
524        }
525        if !self.provider().supports_server_compaction() && self.should_compact() {
526            self.compact(&event_tx).await?;
527        }
528        self.db
529            .add_message(&self.conversation_id, "user", content)?;
530        let mut blocks: Vec<ContentBlock> = Vec::new();
531        for (media_type, data) in images {
532            blocks.push(ContentBlock::Image { media_type, data });
533        }
534        blocks.push(ContentBlock::Text(content.to_string()));
535        self.messages.push(Message {
536            role: Role::User,
537            content: blocks,
538        });
539        let title_rx = if self.messages.len() == 1 {
540            let preview: String = content.chars().take(50).collect();
541            let preview = preview.trim().to_string();
542            if !preview.is_empty() {
543                let _ = self
544                    .db
545                    .update_conversation_title(&self.conversation_id, &preview);
546                let _ = event_tx.send(AgentEvent::TitleGenerated(preview));
547            }
548            let title_messages = vec![Message {
549                role: Role::User,
550                content: vec![ContentBlock::Text(format!(
551                    "Generate a title for this conversation:\n\n{}",
552                    content
553                ))],
554            }];
555            match self
556                .provider()
557                .stream_with_model(
558                    self.title_model(),
559                    &title_messages,
560                    Some(TITLE_SYSTEM_PROMPT),
561                    &[],
562                    100,
563                    0,
564                )
565                .await
566            {
567                Ok(rx) => Some(rx),
568                Err(e) => {
569                    tracing::warn!("title generation stream failed: {e}");
570                    None
571                }
572            }
573        } else {
574            None
575        };
576        let mut final_usage: Option<Usage> = None;
577        let mut system_prompt = self
578            .agents_context
579            .apply_to_system_prompt(&self.profile().system_prompt);
580        if let Some(ref store) = self.memory {
581            let query: String = content.chars().take(200).collect();
582            match store.inject_context(&query, self.memory_inject_count) {
583                Ok(ctx) if !ctx.is_empty() => {
584                    system_prompt.push_str("\n\n");
585                    system_prompt.push_str(&ctx);
586                }
587                Err(e) => tracing::warn!("memory injection failed: {e}"),
588                _ => {}
589            }
590            system_prompt.push_str(MEMORY_INSTRUCTIONS);
591        }
592        let tool_filter = self.profile().tool_filter.clone();
593        let thinking_budget = self.thinking_budget;
594        loop {
595            let mut tool_defs = self.tools.definitions_filtered(&tool_filter);
596            tool_defs.push(crate::provider::ToolDefinition {
597                name: "todo_write".to_string(),
598                description: "Create or update the task list for the current session. Use to track progress on multi-step tasks.".to_string(),
599                input_schema: serde_json::json!({
600                    "type": "object",
601                    "properties": {
602                        "todos": {
603                            "type": "array",
604                            "items": {
605                                "type": "object",
606                                "properties": {
607                                    "content": { "type": "string", "description": "Brief description of the task" },
608                                    "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], "description": "Current status" }
609                                },
610                                "required": ["content", "status"]
611                            }
612                        }
613                    },
614                    "required": ["todos"]
615                }),
616            });
617            tool_defs.push(crate::provider::ToolDefinition {
618                name: "question".to_string(),
619                description: "Ask the user a question and wait for their response. Use when you need clarification or a decision from the user.".to_string(),
620                input_schema: serde_json::json!({
621                    "type": "object",
622                    "properties": {
623                        "question": { "type": "string", "description": "The question to ask the user" },
624                        "options": { "type": "array", "items": { "type": "string" }, "description": "Optional list of choices" }
625                    },
626                    "required": ["question"]
627                }),
628            });
629            tool_defs.push(crate::provider::ToolDefinition {
630                name: "snapshot_list".to_string(),
631                description: "List all files that have been created or modified in this session."
632                    .to_string(),
633                input_schema: serde_json::json!({
634                    "type": "object",
635                    "properties": {},
636                }),
637            });
638            tool_defs.push(crate::provider::ToolDefinition {
639                name: "snapshot_restore".to_string(),
640                description: "Restore a file to its original state before this session modified it. Pass a path or omit to restore all files.".to_string(),
641                input_schema: serde_json::json!({
642                    "type": "object",
643                    "properties": {
644                        "path": { "type": "string", "description": "File path to restore (omit to restore all)" }
645                    },
646                }),
647            });
648            if self.subagent_enabled {
649                let profile_names: Vec<String> =
650                    self.profiles.iter().map(|p| p.name.clone()).collect();
651                let profiles_desc = if profile_names.is_empty() {
652                    String::new()
653                } else {
654                    format!(" Available profiles: {}.", profile_names.join(", "))
655                };
656                tool_defs.push(crate::provider::ToolDefinition {
657                    name: "subagent".to_string(),
658                    description: format!(
659                        "Delegate a focused task to a subagent that runs in isolated context with its own conversation. \
660                         The subagent has access to tools and works autonomously without user interaction. \
661                         Use for complex subtasks that benefit from separate context (research, code analysis, multi-file changes). \
662                         Set background=true to run non-blocking (returns immediately with an ID; retrieve results later with subagent_result).{}",
663                        profiles_desc
664                    ),
665                    input_schema: serde_json::json!({
666                        "type": "object",
667                        "properties": {
668                            "description": {
669                                "type": "string",
670                                "description": "What the subagent should do (used as system prompt context)"
671                            },
672                            "task": {
673                                "type": "string",
674                                "description": "The specific task prompt for the subagent"
675                            },
676                            "profile": {
677                                "type": "string",
678                                "description": "Agent profile to use (affects available tools and system prompt)"
679                            },
680                            "background": {
681                                "type": "boolean",
682                                "description": "Run in background (non-blocking). Returns an ID to check later with subagent_result."
683                            }
684                        },
685                        "required": ["description", "task"]
686                    }),
687                });
688                tool_defs.push(crate::provider::ToolDefinition {
689                    name: "subagent_result".to_string(),
690                    description: "Retrieve the result of a background subagent by ID. Returns the output if complete, or a status message if still running.".to_string(),
691                    input_schema: serde_json::json!({
692                        "type": "object",
693                        "properties": {
694                            "id": {
695                                "type": "string",
696                                "description": "The subagent ID returned when it was launched in background mode"
697                            }
698                        },
699                        "required": ["id"]
700                    }),
701                });
702            }
703            if self.memory.is_some() {
704                tool_defs.extend(crate::memory::tools::definitions());
705            }
706            {
707                let mut ctx = self.event_context(&Event::BeforePrompt);
708                ctx.prompt = Some(content.to_string());
709                match self.hooks.emit_blocking(&Event::BeforePrompt, &ctx) {
710                    HookResult::Block(reason) => {
711                        let _ = event_tx.send(AgentEvent::TextComplete(format!(
712                            "[blocked by hook: {}]",
713                            reason.trim()
714                        )));
715                        return Ok(());
716                    }
717                    HookResult::Modify(_modified) => {}
718                    HookResult::Allow => {}
719                }
720            }
721            self.hooks.emit(
722                &Event::OnStreamStart,
723                &self.event_context(&Event::OnStreamStart),
724            );
725            let mut stream_rx = self
726                .provider()
727                .stream(
728                    &self.messages,
729                    Some(&system_prompt),
730                    &tool_defs,
731                    8192,
732                    thinking_budget,
733                )
734                .await?;
735            self.hooks.emit(
736                &Event::OnStreamEnd,
737                &self.event_context(&Event::OnStreamEnd),
738            );
739            let mut full_text = String::new();
740            let mut full_thinking = String::new();
741            let mut full_thinking_signature = String::new();
742            let mut compaction_content: Option<String> = None;
743            let mut tool_calls: Vec<PendingToolCall> = Vec::new();
744            let mut current_tool_input = String::new();
745            while let Some(event) = stream_rx.recv().await {
746                match event.event_type {
747                    StreamEventType::TextDelta(text) => {
748                        full_text.push_str(&text);
749                        let _ = event_tx.send(AgentEvent::TextDelta(text));
750                    }
751                    StreamEventType::ThinkingDelta(text) => {
752                        full_thinking.push_str(&text);
753                        let _ = event_tx.send(AgentEvent::ThinkingDelta(text));
754                    }
755                    StreamEventType::ThinkingComplete {
756                        thinking,
757                        signature,
758                    } => {
759                        full_thinking = thinking;
760                        full_thinking_signature = signature;
761                    }
762                    StreamEventType::CompactionComplete(content) => {
763                        let _ = event_tx.send(AgentEvent::Compacting);
764                        compaction_content = Some(content);
765                    }
766                    StreamEventType::ToolUseStart { id, name } => {
767                        current_tool_input.clear();
768                        let _ = event_tx.send(AgentEvent::ToolCallStart {
769                            id: id.clone(),
770                            name: name.clone(),
771                        });
772                        tool_calls.push(PendingToolCall {
773                            id,
774                            name,
775                            input: String::new(),
776                        });
777                    }
778                    StreamEventType::ToolUseInputDelta(delta) => {
779                        current_tool_input.push_str(&delta);
780                        let _ = event_tx.send(AgentEvent::ToolCallInputDelta(delta));
781                    }
782                    StreamEventType::ToolUseEnd => {
783                        if let Some(tc) = tool_calls.last_mut() {
784                            tc.input = current_tool_input.clone();
785                        }
786                        current_tool_input.clear();
787                    }
788                    StreamEventType::MessageEnd {
789                        stop_reason: _,
790                        usage,
791                    } => {
792                        self.last_input_tokens = usage.input_tokens;
793                        let _ = self
794                            .db
795                            .update_last_input_tokens(&self.conversation_id, usage.input_tokens);
796                        final_usage = Some(usage);
797                    }
798
799                    _ => {}
800                }
801            }
802
803            let mut content_blocks: Vec<ContentBlock> = Vec::new();
804            if let Some(ref summary) = compaction_content {
805                content_blocks.push(ContentBlock::Compaction {
806                    content: summary.clone(),
807                });
808            }
809            if !full_thinking.is_empty() {
810                content_blocks.push(ContentBlock::Thinking {
811                    thinking: full_thinking.clone(),
812                    signature: full_thinking_signature.clone(),
813                });
814            }
815            if !full_text.is_empty() {
816                content_blocks.push(ContentBlock::Text(full_text.clone()));
817            }
818
819            for tc in &tool_calls {
820                let input_value: serde_json::Value =
821                    serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
822                content_blocks.push(ContentBlock::ToolUse {
823                    id: tc.id.clone(),
824                    name: tc.name.clone(),
825                    input: input_value,
826                });
827            }
828
829            self.messages.push(Message {
830                role: Role::Assistant,
831                content: content_blocks,
832            });
833            let stored_text = if !full_text.is_empty() {
834                full_text.clone()
835            } else {
836                String::from("[tool use]")
837            };
838            let assistant_msg_id =
839                self.db
840                    .add_message(&self.conversation_id, "assistant", &stored_text)?;
841            for tc in &tool_calls {
842                let _ = self
843                    .db
844                    .add_tool_call(&assistant_msg_id, &tc.id, &tc.name, &tc.input);
845            }
846            {
847                let mut ctx = self.event_context(&Event::AfterPrompt);
848                ctx.prompt = Some(full_text.clone());
849                self.hooks.emit(&Event::AfterPrompt, &ctx);
850            }
851            self.snapshots.checkpoint();
852            if tool_calls.is_empty() {
853                let _ = event_tx.send(AgentEvent::TextComplete(full_text));
854                if let Some(usage) = final_usage {
855                    let _ = event_tx.send(AgentEvent::Done { usage });
856                }
857                if self.memory_auto_extract
858                    && let Some(ref store) = self.memory
859                {
860                    let msgs = self.messages.clone();
861                    let provider = self.provider_arc();
862                    let store = Arc::clone(store);
863                    let conv_id = self.conversation_id.clone();
864                    let etx = event_tx.clone();
865                    tokio::spawn(async move {
866                        match crate::memory::extract::extract(&msgs, &*provider, &store, &conv_id)
867                            .await
868                        {
869                            Ok(result)
870                                if result.added > 0 || result.updated > 0 || result.deleted > 0 =>
871                            {
872                                let _ = etx.send(AgentEvent::MemoryExtracted {
873                                    added: result.added,
874                                    updated: result.updated,
875                                    deleted: result.deleted,
876                                });
877                            }
878                            Err(e) => tracing::warn!("memory extraction failed: {e}"),
879                            _ => {}
880                        }
881                    });
882                }
883                break;
884            }
885
886            let mut result_blocks: Vec<ContentBlock> = Vec::new();
887
888            for tc in &tool_calls {
889                let input_value: serde_json::Value =
890                    serde_json::from_str(&tc.input).unwrap_or_else(|_| serde_json::json!({}));
891                // Virtual tool: todo_write
892                if tc.name == "todo_write" {
893                    if let Some(todos_arr) = input_value.get("todos").and_then(|v| v.as_array()) {
894                        let items: Vec<TodoItem> = todos_arr
895                            .iter()
896                            .filter_map(|t| {
897                                let content = t.get("content")?.as_str()?.to_string();
898                                let status = match t
899                                    .get("status")
900                                    .and_then(|s| s.as_str())
901                                    .unwrap_or("pending")
902                                {
903                                    "in_progress" => TodoStatus::InProgress,
904                                    "completed" => TodoStatus::Completed,
905                                    _ => TodoStatus::Pending,
906                                };
907                                Some(TodoItem { content, status })
908                            })
909                            .collect();
910                        let _ = event_tx.send(AgentEvent::TodoUpdate(items));
911                    }
912                    let _ = event_tx.send(AgentEvent::ToolCallResult {
913                        id: tc.id.clone(),
914                        name: tc.name.clone(),
915                        output: "ok".to_string(),
916                        is_error: false,
917                    });
918                    result_blocks.push(ContentBlock::ToolResult {
919                        tool_use_id: tc.id.clone(),
920                        content: "ok".to_string(),
921                        is_error: false,
922                    });
923                    continue;
924                }
925                // Virtual tool: question
926                if tc.name == "question" {
927                    let question = input_value
928                        .get("question")
929                        .and_then(|v| v.as_str())
930                        .unwrap_or("?")
931                        .to_string();
932                    let options: Vec<String> = input_value
933                        .get("options")
934                        .and_then(|v| v.as_array())
935                        .map(|arr| {
936                            arr.iter()
937                                .filter_map(|v| v.as_str().map(String::from))
938                                .collect()
939                        })
940                        .unwrap_or_default();
941                    let (tx, rx) = tokio::sync::oneshot::channel();
942                    let _ = event_tx.send(AgentEvent::Question {
943                        id: tc.id.clone(),
944                        question: question.clone(),
945                        options,
946                        responder: QuestionResponder(tx),
947                    });
948                    let answer = match rx.await {
949                        Ok(a) => a,
950                        Err(_) => "[cancelled]".to_string(),
951                    };
952                    let _ = event_tx.send(AgentEvent::ToolCallResult {
953                        id: tc.id.clone(),
954                        name: tc.name.clone(),
955                        output: answer.clone(),
956                        is_error: false,
957                    });
958                    result_blocks.push(ContentBlock::ToolResult {
959                        tool_use_id: tc.id.clone(),
960                        content: answer,
961                        is_error: false,
962                    });
963                    continue;
964                }
965                // Virtual tool: snapshot_list
966                if tc.name == "snapshot_list" {
967                    let changes = self.snapshots.list_changes();
968                    let output = if changes.is_empty() {
969                        "No file changes in this session.".to_string()
970                    } else {
971                        changes
972                            .iter()
973                            .map(|(p, k)| format!("{} {}", k.icon(), p))
974                            .collect::<Vec<_>>()
975                            .join("\n")
976                    };
977                    let _ = event_tx.send(AgentEvent::ToolCallResult {
978                        id: tc.id.clone(),
979                        name: tc.name.clone(),
980                        output: output.clone(),
981                        is_error: false,
982                    });
983                    result_blocks.push(ContentBlock::ToolResult {
984                        tool_use_id: tc.id.clone(),
985                        content: output,
986                        is_error: false,
987                    });
988                    continue;
989                }
990                // Virtual tool: snapshot_restore
991                if tc.name == "snapshot_restore" {
992                    let output =
993                        if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
994                            match self.snapshots.restore(path) {
995                                Ok(msg) => msg,
996                                Err(e) => e.to_string(),
997                            }
998                        } else {
999                            match self.snapshots.restore_all() {
1000                                Ok(msgs) => {
1001                                    if msgs.is_empty() {
1002                                        "Nothing to restore.".to_string()
1003                                    } else {
1004                                        msgs.join("\n")
1005                                    }
1006                                }
1007                                Err(e) => e.to_string(),
1008                            }
1009                        };
1010                    let _ = event_tx.send(AgentEvent::ToolCallResult {
1011                        id: tc.id.clone(),
1012                        name: tc.name.clone(),
1013                        output: output.clone(),
1014                        is_error: false,
1015                    });
1016                    result_blocks.push(ContentBlock::ToolResult {
1017                        tool_use_id: tc.id.clone(),
1018                        content: output,
1019                        is_error: false,
1020                    });
1021                    continue;
1022                }
1023                // Virtual tool: batch
1024                if tc.name == "batch" {
1025                    let invocations = input_value
1026                        .get("invocations")
1027                        .and_then(|v| v.as_array())
1028                        .cloned()
1029                        .unwrap_or_default();
1030                    tracing::debug!("batch: {} invocations", invocations.len());
1031                    let results: Vec<serde_json::Value> = invocations
1032                        .iter()
1033                        .map(|inv| {
1034                            let name = inv.get("tool_name").and_then(|v| v.as_str()).unwrap_or("");
1035                            let input = inv.get("input").cloned().unwrap_or(serde_json::Value::Null);
1036                            match self.tools.execute(name, input) {
1037                                Ok(out) => serde_json::json!({ "tool_name": name, "result": out, "is_error": false }),
1038                                Err(e) => serde_json::json!({ "tool_name": name, "result": e.to_string(), "is_error": true }),
1039                            }
1040                        })
1041                        .collect();
1042                    let output = serde_json::to_string(&results).unwrap_or_else(|e| e.to_string());
1043                    let _ = event_tx.send(AgentEvent::ToolCallResult {
1044                        id: tc.id.clone(),
1045                        name: tc.name.clone(),
1046                        output: output.clone(),
1047                        is_error: false,
1048                    });
1049                    result_blocks.push(ContentBlock::ToolResult {
1050                        tool_use_id: tc.id.clone(),
1051                        content: output,
1052                        is_error: false,
1053                    });
1054                    continue;
1055                }
1056                // Virtual tool: subagent
1057                if tc.name == "subagent" {
1058                    let description = input_value
1059                        .get("description")
1060                        .and_then(|v| v.as_str())
1061                        .unwrap_or("subtask")
1062                        .to_string();
1063                    let task = input_value
1064                        .get("task")
1065                        .and_then(|v| v.as_str())
1066                        .unwrap_or("")
1067                        .to_string();
1068                    let profile = input_value
1069                        .get("profile")
1070                        .and_then(|v| v.as_str())
1071                        .map(String::from);
1072                    let background = input_value
1073                        .get("background")
1074                        .and_then(|v| v.as_bool())
1075                        .unwrap_or(false);
1076
1077                    let output = if background {
1078                        match self.spawn_background_subagent(
1079                            &description,
1080                            &task,
1081                            profile.as_deref(),
1082                        ) {
1083                            Ok(id) => format!("Background subagent launched with id: {id}"),
1084                            Err(e) => {
1085                                tracing::error!("background subagent error: {e}");
1086                                format!("[subagent error: {e}]")
1087                            }
1088                        }
1089                    } else {
1090                        match self
1091                            .run_subagent(&description, &task, profile.as_deref(), &event_tx)
1092                            .await
1093                        {
1094                            Ok(text) => text,
1095                            Err(e) => {
1096                                tracing::error!("subagent error: {e}");
1097                                format!("[subagent error: {e}]")
1098                            }
1099                        }
1100                    };
1101                    let is_error = output.starts_with("[subagent error:");
1102                    let _ = event_tx.send(AgentEvent::ToolCallResult {
1103                        id: tc.id.clone(),
1104                        name: tc.name.clone(),
1105                        output: output.clone(),
1106                        is_error,
1107                    });
1108                    result_blocks.push(ContentBlock::ToolResult {
1109                        tool_use_id: tc.id.clone(),
1110                        content: output,
1111                        is_error,
1112                    });
1113                    continue;
1114                }
1115                // Virtual tool: subagent_result
1116                if tc.name == "subagent_result" {
1117                    let id = input_value.get("id").and_then(|v| v.as_str()).unwrap_or("");
1118                    let output = {
1119                        let results = self
1120                            .background_results
1121                            .lock()
1122                            .unwrap_or_else(|e| e.into_inner());
1123                        if let Some(result) = results.get(id) {
1124                            result.clone()
1125                        } else if self.background_handles.contains_key(id) {
1126                            format!("Subagent '{id}' is still running.")
1127                        } else {
1128                            format!("No subagent found with id '{id}'.")
1129                        }
1130                    };
1131                    let _ = event_tx.send(AgentEvent::ToolCallResult {
1132                        id: tc.id.clone(),
1133                        name: tc.name.clone(),
1134                        output: output.clone(),
1135                        is_error: false,
1136                    });
1137                    result_blocks.push(ContentBlock::ToolResult {
1138                        tool_use_id: tc.id.clone(),
1139                        content: output,
1140                        is_error: false,
1141                    });
1142                    continue;
1143                }
1144                // Virtual tools: memory
1145                if let Some(ref store) = self.memory
1146                    && let Some((output, is_error)) = crate::memory::tools::handle(
1147                        &tc.name,
1148                        &input_value,
1149                        store,
1150                        &self.conversation_id,
1151                    )
1152                {
1153                    let _ = event_tx.send(AgentEvent::ToolCallResult {
1154                        id: tc.id.clone(),
1155                        name: tc.name.clone(),
1156                        output: output.clone(),
1157                        is_error,
1158                    });
1159                    result_blocks.push(ContentBlock::ToolResult {
1160                        tool_use_id: tc.id.clone(),
1161                        content: output,
1162                        is_error,
1163                    });
1164                    continue;
1165                }
1166                // Permission check
1167                let perm = self
1168                    .permissions
1169                    .get(&tc.name)
1170                    .map(|s| s.as_str())
1171                    .unwrap_or("allow");
1172                if perm == "deny" {
1173                    let output = format!("Tool '{}' is denied by permissions config.", tc.name);
1174                    let _ = event_tx.send(AgentEvent::ToolCallResult {
1175                        id: tc.id.clone(),
1176                        name: tc.name.clone(),
1177                        output: output.clone(),
1178                        is_error: true,
1179                    });
1180                    result_blocks.push(ContentBlock::ToolResult {
1181                        tool_use_id: tc.id.clone(),
1182                        content: output,
1183                        is_error: true,
1184                    });
1185                    continue;
1186                }
1187                if perm == "ask" {
1188                    let summary = format!("{}: {}", tc.name, &tc.input[..tc.input.len().min(100)]);
1189                    let (ptx, prx) = tokio::sync::oneshot::channel();
1190                    let _ = event_tx.send(AgentEvent::PermissionRequest {
1191                        tool_name: tc.name.clone(),
1192                        input_summary: summary,
1193                        responder: QuestionResponder(ptx),
1194                    });
1195                    let answer = match prx.await {
1196                        Ok(a) => a,
1197                        Err(_) => "deny".to_string(),
1198                    };
1199                    if answer != "allow" {
1200                        let output = format!("Tool '{}' denied by user.", tc.name);
1201                        let _ = event_tx.send(AgentEvent::ToolCallResult {
1202                            id: tc.id.clone(),
1203                            name: tc.name.clone(),
1204                            output: output.clone(),
1205                            is_error: true,
1206                        });
1207                        result_blocks.push(ContentBlock::ToolResult {
1208                            tool_use_id: tc.id.clone(),
1209                            content: output,
1210                            is_error: true,
1211                        });
1212                        continue;
1213                    }
1214                }
1215                // Snapshot before file writes
1216                if tc.name == "write_file" || tc.name == "apply_patch" {
1217                    if tc.name == "write_file" {
1218                        if let Some(path) = input_value.get("path").and_then(|v| v.as_str()) {
1219                            self.snapshots.before_write(path);
1220                        }
1221                    } else if let Some(patches) =
1222                        input_value.get("patches").and_then(|v| v.as_array())
1223                    {
1224                        for patch in patches {
1225                            if let Some(path) = patch.get("path").and_then(|v| v.as_str()) {
1226                                self.snapshots.before_write(path);
1227                            }
1228                        }
1229                    }
1230                }
1231                {
1232                    let mut ctx = self.event_context(&Event::BeforeToolCall);
1233                    ctx.tool_name = Some(tc.name.clone());
1234                    ctx.tool_input = Some(tc.input.clone());
1235                    match self.hooks.emit_blocking(&Event::BeforeToolCall, &ctx) {
1236                        HookResult::Block(reason) => {
1237                            let output = format!("[blocked by hook: {}]", reason.trim());
1238                            let _ = event_tx.send(AgentEvent::ToolCallResult {
1239                                id: tc.id.clone(),
1240                                name: tc.name.clone(),
1241                                output: output.clone(),
1242                                is_error: true,
1243                            });
1244                            result_blocks.push(ContentBlock::ToolResult {
1245                                tool_use_id: tc.id.clone(),
1246                                content: output,
1247                                is_error: true,
1248                            });
1249                            continue;
1250                        }
1251                        HookResult::Modify(_modified) => {}
1252                        HookResult::Allow => {}
1253                    }
1254                }
1255                let _ = event_tx.send(AgentEvent::ToolCallExecuting {
1256                    id: tc.id.clone(),
1257                    name: tc.name.clone(),
1258                    input: tc.input.clone(),
1259                });
1260                let tool_name = tc.name.clone();
1261                let tool_input = input_value.clone();
1262                let exec_result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1263                    tokio::task::block_in_place(|| self.tools.execute(&tool_name, tool_input))
1264                })
1265                .await;
1266                let (output, is_error) = match exec_result {
1267                    Err(_elapsed) => {
1268                        let msg = format!("Tool '{}' timed out after 30 seconds.", tc.name);
1269                        let mut ctx = self.event_context(&Event::OnToolError);
1270                        ctx.tool_name = Some(tc.name.clone());
1271                        ctx.error = Some(msg.clone());
1272                        self.hooks.emit(&Event::OnToolError, &ctx);
1273                        (msg, true)
1274                    }
1275                    Ok(Err(e)) => {
1276                        let msg = e.to_string();
1277                        let mut ctx = self.event_context(&Event::OnToolError);
1278                        ctx.tool_name = Some(tc.name.clone());
1279                        ctx.error = Some(msg.clone());
1280                        self.hooks.emit(&Event::OnToolError, &ctx);
1281                        (msg, true)
1282                    }
1283                    Ok(Ok(out)) => (out, false),
1284                };
1285                tracing::debug!(
1286                    "Tool '{}' result (error={}): {}",
1287                    tc.name,
1288                    is_error,
1289                    &output[..output.len().min(200)]
1290                );
1291                let _ = self.db.update_tool_result(&tc.id, &output, is_error);
1292                let _ = event_tx.send(AgentEvent::ToolCallResult {
1293                    id: tc.id.clone(),
1294                    name: tc.name.clone(),
1295                    output: output.clone(),
1296                    is_error,
1297                });
1298                {
1299                    let mut ctx = self.event_context(&Event::AfterToolCall);
1300                    ctx.tool_name = Some(tc.name.clone());
1301                    ctx.tool_output = Some(output.clone());
1302                    self.hooks.emit(&Event::AfterToolCall, &ctx);
1303                }
1304                result_blocks.push(ContentBlock::ToolResult {
1305                    tool_use_id: tc.id.clone(),
1306                    content: output,
1307                    is_error,
1308                });
1309            }
1310
1311            self.messages.push(Message {
1312                role: Role::User,
1313                content: result_blocks,
1314            });
1315        }
1316
1317        let title = if let Some(mut rx) = title_rx {
1318            let mut raw = String::new();
1319            while let Some(event) = rx.recv().await {
1320                match event.event_type {
1321                    StreamEventType::TextDelta(text) => raw.push_str(&text),
1322                    StreamEventType::Error(e) => {
1323                        tracing::warn!("title stream error: {e}");
1324                    }
1325                    _ => {}
1326                }
1327            }
1328            let t = raw
1329                .trim()
1330                .trim_matches('"')
1331                .trim_matches('`')
1332                .trim_matches('*')
1333                .replace('\n', " ");
1334            let t: String = t.chars().take(50).collect();
1335            if t.is_empty() {
1336                tracing::warn!("title stream returned empty text");
1337                None
1338            } else {
1339                Some(t)
1340            }
1341        } else {
1342            None
1343        };
1344        let fallback = || -> String {
1345            self.messages
1346                .first()
1347                .and_then(|m| {
1348                    m.content.iter().find_map(|b| {
1349                        if let ContentBlock::Text(t) = b {
1350                            let s: String = t.chars().take(50).collect();
1351                            let s = s.trim().to_string();
1352                            if s.is_empty() { None } else { Some(s) }
1353                        } else {
1354                            None
1355                        }
1356                    })
1357                })
1358                .unwrap_or_else(|| "Chat".to_string())
1359        };
1360        let title = title.unwrap_or_else(fallback);
1361        let _ = self
1362            .db
1363            .update_conversation_title(&self.conversation_id, &title);
1364        let _ = event_tx.send(AgentEvent::TitleGenerated(title.clone()));
1365        {
1366            let mut ctx = self.event_context(&Event::OnTitleGenerated);
1367            ctx.title = Some(title);
1368            self.hooks.emit(&Event::OnTitleGenerated, &ctx);
1369        }
1370
1371        Ok(())
1372    }
1373}