Skip to main content

capo_agent/
app.rs

1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9    AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10    CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate, PromptStrategy};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
23/// Internal accumulator for per-session cumulative token counts. Distinct
24/// from the wire-form `UiEvent::TurnStats` (which also carries per-turn
25/// deltas + model). See spec §3.1 and §3.4.
26#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27pub struct TurnStatsAccum {
28    pub cumulative_input: u64,
29    pub cumulative_output: u64,
30    pub turn_count: u64,
31}
32
33impl TurnStatsAccum {
34    /// Add a `TokenUsage` from a completed turn. Saturating arithmetic
35    /// — `u64::MAX` is the cap (overflow is practically unreachable
36    /// at realistic token counts).
37    pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38        self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39        self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40        self.turn_count = self.turn_count.saturating_add(1);
41    }
42}
43
44/// Extract `Reasoning` (thinking/chain-of-thought) ContentParts from any
45/// `Assistant` Messages in `messages`. Returns one `UiEvent::ThinkingComplete`
46/// per Reasoning part, in source order. Pure function — unit-testable.
47///
48/// Used by `run_turn` to surface thinking blocks to the TUI after a turn
49/// completes (motosan-agent-loop 0.21.1 doesn't stream thinking deltas;
50/// see v0.9 design spec §3.5).
51pub(crate) fn extract_thinking_events(messages: &[motosan_agent_loop::Message]) -> Vec<UiEvent> {
52    let mut events = Vec::new();
53    for msg in messages {
54        if let motosan_agent_loop::Message::Assistant { content, .. } = msg {
55            for part in content {
56                if let motosan_agent_loop::AssistantContent::Reasoning { text, .. } = part {
57                    events.push(UiEvent::ThinkingComplete { text: text.clone() });
58                }
59            }
60        }
61    }
62    events
63}
64
65/// Extract thinking events only for messages appended during the just-finished
66/// turn. `AgentResult.messages` / terminal messages contain full session
67/// history; slicing at `previous_len` prevents historical reasoning blocks
68/// from being re-emitted on every later turn.
69pub(crate) fn extract_new_thinking_events(
70    messages: &[motosan_agent_loop::Message],
71    previous_len: usize,
72) -> Vec<UiEvent> {
73    extract_thinking_events(messages.get(previous_len..).unwrap_or(&[]))
74}
75
/// Selects how `SessionFactory::build` obtains its `AgentSession`.
#[derive(Clone, Debug)]
pub(crate) enum SessionMode {
    /// Start an empty session. A fresh session id is generated, and the
    /// session is persisted when a session store is configured.
    New,
    /// Re-open the stored session with this id. Building fails when no
    /// session store is configured.
    Resume(String),
}
84
85/// Everything needed to (re)build an `AgentSession` + its `LlmClient`.
86/// Stored on `App` so `new_session` / `load_session` / `switch_model` can
87/// rebuild without re-running `AppBuilder`. All fields are cheap to clone
88/// (owned values or `Arc`s).
89struct SharedLlm {
90    client: Arc<dyn LlmClient>,
91}
92
93impl SharedLlm {
94    fn new(client: Arc<dyn LlmClient>) -> Self {
95        Self { client }
96    }
97
98    fn client(&self) -> Arc<dyn LlmClient> {
99        Arc::clone(&self.client)
100    }
101}
102
103pub(crate) struct SessionFactory {
104    cwd: PathBuf,
105    settings: Arc<Mutex<crate::settings::Settings>>,
106    auth: crate::auth::Auth,
107    policy: Arc<crate::permissions::Policy>,
108    session_cache: Arc<crate::permissions::SessionCache>,
109    ui_tx: Option<mpsc::Sender<UiEvent>>,
110    headless_permissions: bool,
111    permission_gate: Arc<dyn PermissionGate>,
112    permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
113    /// The SHARED progress sender. Rebuilt tools must use this exact
114    /// sender so `App.progress_rx` keeps receiving — see Task 5.
115    progress_tx: mpsc::Sender<ToolProgressChunk>,
116    skills: Arc<Vec<crate::skills::Skill>>,
117    install_builtin_tools: bool,
118    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
119    max_iterations: usize,
120    context_discovery_disabled: bool,
121    autocompact_enabled: bool,
122    session_store: Option<Arc<dyn SessionStore>>,
123    llm_override: Option<Arc<dyn LlmClient>>,
124    current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
125    cancel_token: SharedCancelToken,
126}
127
128impl SessionFactory {
129    fn settings(&self) -> crate::settings::Settings {
130        match self.settings.lock() {
131            Ok(guard) => guard.clone(),
132            Err(poisoned) => poisoned.into_inner().clone(),
133        }
134    }
135
136    fn store_settings(&self, settings: crate::settings::Settings) {
137        match self.settings.lock() {
138            Ok(mut guard) => *guard = settings,
139            Err(poisoned) => *poisoned.into_inner() = settings,
140        }
141    }
142
143    fn current_model(&self) -> Option<crate::model::ModelId> {
144        match self.current_model.lock() {
145            Ok(guard) => guard.clone(),
146            Err(poisoned) => poisoned.into_inner().clone(),
147        }
148    }
149
150    fn set_current_model(&self, model: crate::model::ModelId) {
151        match self.current_model.lock() {
152            Ok(mut guard) => *guard = Some(model),
153            Err(poisoned) => *poisoned.into_inner() = Some(model),
154        }
155    }
156
157    fn clear_current_model(&self) {
158        match self.current_model.lock() {
159            Ok(mut guard) => *guard = None,
160            Err(poisoned) => *poisoned.into_inner() = None,
161        }
162    }
163
164    /// Build a fresh `AgentSession` + its `LlmClient`. `model_override`,
165    /// when set, replaces `settings.model.name` for this build (used by
166    /// `switch_model`). `settings_override`, when set, supplies the full
167    /// settings struct for this build and bypasses any sticky `/model`
168    /// override (used by `/settings`). The returned `LlmClient` is what
169    /// `App.llm` should be swapped to.
170    async fn build(
171        &self,
172        mode: SessionMode,
173        model_override: Option<&crate::model::ModelId>,
174        settings_override: Option<&crate::settings::Settings>,
175    ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
176        // Apply the model override (or last switched model) onto a settings clone.
177        // A settings override is authoritative: `/settings` should not be
178        // shadowed by an earlier `/model` sticky override.
179        let effective_model = model_override.cloned().or_else(|| {
180            if settings_override.is_some() {
181                None
182            } else {
183                self.current_model()
184            }
185        });
186        let mut settings = settings_override
187            .cloned()
188            .unwrap_or_else(|| self.settings());
189        if let Some(m) = &effective_model {
190            settings.model.name = m.as_str().to_string();
191        }
192
193        let llm = if effective_model.is_none() {
194            self.llm_override.as_ref().map_or_else(
195                || build_llm_client(&settings, &self.auth),
196                |llm| Ok(Arc::clone(llm)),
197            )?
198        } else {
199            build_llm_client(&settings, &self.auth)?
200        };
201
202        // Tools — rebuilt fresh each time, sharing the SAME progress_tx so
203        // `App.progress_rx` keeps receiving from whatever session is live.
204        let tool_ctx = ToolCtx::new_with_cancel_token(
205            &self.cwd,
206            Arc::clone(&self.permission_gate),
207            self.progress_tx.clone(),
208            self.cancel_token.clone(),
209        );
210        let mut tools = if self.install_builtin_tools {
211            builtin_tools(tool_ctx.clone())
212        } else {
213            Vec::new()
214        };
215        tools.extend(self.extra_tools.iter().cloned());
216
217        let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
218        let base_prompt = build_system_prompt(&tool_names, &self.skills);
219        let system_prompt = if self.context_discovery_disabled {
220            base_prompt
221        } else {
222            let agent_dir = crate::paths::agent_dir();
223            let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
224            crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
225        };
226        let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
227
228        let mut engine_builder = Engine::builder()
229            .max_iterations(self.max_iterations)
230            .system_prompt(system_prompt)
231            .tool_context(motosan_tool_context);
232        for tool in tools {
233            engine_builder = engine_builder.tool(tool);
234        }
235        if let Some(ui_tx) = &self.ui_tx {
236            let ext = if let Some(handle) = &self.permission_strategy_handle {
237                crate::permissions::PermissionExtension::with_strategy_handle(
238                    Arc::clone(&self.policy),
239                    Arc::clone(&self.session_cache),
240                    self.cwd.clone(),
241                    Some(ui_tx.clone()),
242                    Arc::clone(handle),
243                )
244            } else {
245                crate::permissions::PermissionExtension::new(
246                    Arc::clone(&self.policy),
247                    Arc::clone(&self.session_cache),
248                    self.cwd.clone(),
249                    ui_tx.clone(),
250                )
251            };
252            engine_builder = engine_builder.extension(Box::new(ext));
253        } else if self.headless_permissions {
254            let ext = if let Some(handle) = &self.permission_strategy_handle {
255                crate::permissions::PermissionExtension::with_strategy_handle(
256                    Arc::clone(&self.policy),
257                    Arc::clone(&self.session_cache),
258                    self.cwd.clone(),
259                    None,
260                    Arc::clone(handle),
261                )
262            } else {
263                crate::permissions::PermissionExtension::headless(
264                    Arc::clone(&self.policy),
265                    Arc::clone(&self.session_cache),
266                    self.cwd.clone(),
267                )
268            };
269            engine_builder = engine_builder.extension(Box::new(ext));
270        }
271        if self.autocompact_enabled
272            && settings.session.compact_at_context_pct > 0.0
273            && settings.session.compact_at_context_pct < 1.0
274        {
275            let cfg = AutocompactConfig {
276                threshold: settings.session.compact_at_context_pct,
277                max_context_tokens: settings.session.max_context_tokens,
278                keep_turns: settings.session.keep_turns.max(1),
279            };
280            engine_builder = engine_builder
281                .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
282        }
283        let engine = engine_builder.build();
284
285        let session = match (&mode, &self.session_store) {
286            (SessionMode::Resume(id), Some(store)) => {
287                let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
288                    .await
289                    .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
290                let entries = s
291                    .entries()
292                    .await
293                    .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
294                crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
295                s
296            }
297            (SessionMode::Resume(_), None) => {
298                return Err(AppError::Config("resume requires a session store".into()));
299            }
300            (SessionMode::New, Some(store)) => {
301                let id = crate::session::SessionId::new();
302                AgentSession::new_with_store(
303                    id.into_string(),
304                    Arc::clone(store),
305                    engine,
306                    Arc::clone(&llm),
307                )
308            }
309            (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
310        };
311
312        Ok((session, llm))
313    }
314}
315
/// Front-end facing handle to a running capo agent.
///
/// Owns the live `AgentSession` and its LLM client (both hot-swappable via
/// `ArcSwap`), the factory used to rebuild them, and per-session state such
/// as the token tally and permission mode that TUI / RPC front-ends drive.
pub struct App {
    /// Live session. Replaced by `new_session`, `load_session`,
    /// `clone_session`, `switch_model` and `reload_settings`; `run_turn`
    /// takes its own `load_full` snapshot when a turn starts.
    session: arc_swap::ArcSwap<AgentSession>,
    /// LLM client of the live session, replaced together with `session`.
    llm: arc_swap::ArcSwap<SharedLlm>,
    /// Rebuilds sessions and clients from the current settings.
    factory: SessionFactory,
    config: Config,
    /// Cancellation token for the running turn; `cancel` fires it and
    /// `run_turn` resets it at the start of each turn.
    cancel_token: SharedCancelToken,
    /// Receiver for tool progress chunks. `run_turn` acquires it with
    /// `try_lock`, so a failed lock means another turn is already running.
    progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
    /// Tool-call id bookkeeping shared with `run_turn` (passed to
    /// `map_event` / `progress_event_id`).
    next_tool_id: Arc<Mutex<ToolCallTracker>>,
    /// Loaded skills; `run_turn` uses them to expand skill commands in the
    /// user's text.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Registered MCP servers keyed by name; see `disconnect_mcp`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
    /// Loaded extensions for hook dispatch. Cheap to clone (Arc).
    pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
    /// Cumulative token counts for this session. Cheap to clone (Arc).
    /// Updated after each turn's `AgentResult.usage`; not persisted across
    /// `/resume` (motosan-agent-loop 0.21.1 doesn't carry per-turn
    /// `TokenUsage` in `MessageMeta` — see spec §3.4 graceful degradation).
    pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
    /// Diagnostics collected while building the extension registry.
    extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
    /// Permission decisions remembered for this session ("Allow for session").
    pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
    /// v0.10 Phase A: current permission mode (RwLock for runtime mutation).
    /// Read by App::permission_mode(); written by App::set_permission_mode().
    pub(crate) permission_mode: Arc<tokio::sync::RwLock<crate::permissions::PermissionMode>>,
    /// v0.10 Phase A: handle to the PermissionExtension's strategy lock.
    /// Some when a PermissionExtension is registered.
    pub(crate) permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
    /// v0.10 Phase A: ui_tx kept on App for emitting PermissionModeChanged.
    /// Same Sender already used elsewhere (extension events, etc).
    pub(crate) ui_tx_owned: Option<mpsc::Sender<UiEvent>>,
}
345
346impl App {
347    pub fn config(&self) -> &Config {
348        &self.config
349    }
350
351    /// Request graceful cancellation of the currently-running turn (if any).
352    /// Safe to call from any task; the engine observes the token and halts
353    /// at the next safe point.
354    pub fn cancel(&self) {
355        self.cancel_token.cancel();
356    }
357
358    /// Exposes the session cache so front-ends can write "Allow for session"
359    /// decisions resolved by the user. Exposed as `Arc` so callers can drop
360    /// their reference freely.
361    pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
362        Arc::clone(&self.session_cache)
363    }
364
365    /// Read-only handle on the loaded extensions registry. For the
366    /// `Command::ListExtensions` dispatcher reply.
367    pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
368        Arc::clone(&self.extension_registry)
369    }
370
371    /// Snapshot of the live settings currently used for session rebuilds.
372    pub fn settings(&self) -> crate::settings::Settings {
373        let mut settings = self.factory.settings();
374        if let Some(model) = self.factory.current_model() {
375            settings.model.name = model.to_string();
376        }
377        settings
378    }
379
380    /// Read-only handle on the cumulative token tally.
381    pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
382        Arc::clone(&self.token_tally)
383    }
384
385    /// v0.10 Phase A: change the permission mode for the current session.
386    /// Mutates the PermissionExtension's strategy lock; emits
387    /// UiEvent::PermissionModeChanged so RPC/TUI clients can sync.
388    /// Session-only — does NOT persist to settings.toml.
389    pub async fn set_permission_mode(&self, mode: crate::permissions::PermissionMode) {
390        let new_strategy = match mode {
391            crate::permissions::PermissionMode::Bypass => PromptStrategy::AllowAll,
392            crate::permissions::PermissionMode::AcceptEdits => PromptStrategy::AcceptEdits,
393            crate::permissions::PermissionMode::Prompt if self.ui_tx_owned.is_none() => {
394                PromptStrategy::HeadlessDeny
395            }
396            crate::permissions::PermissionMode::Prompt => PromptStrategy::Prompt,
397        };
398
399        if let Some(handle) = &self.permission_strategy_handle {
400            *handle.write().await = new_strategy;
401        }
402
403        *self.permission_mode.write().await = mode;
404
405        if let Some(ui_tx) = &self.ui_tx_owned {
406            let _ = ui_tx.send(UiEvent::PermissionModeChanged { mode }).await;
407        }
408    }
409
410    /// v0.10 Phase B-1: emit `UiEvent::SettingsSnapshot` carrying the
411    /// current live settings so clients learn startup settings without
412    /// issuing `Command::ReloadSettings`.
413    pub async fn emit_settings_snapshot(&self) {
414        if let Some(ui_tx) = &self.ui_tx_owned {
415            let _ = ui_tx
416                .send(UiEvent::SettingsSnapshot {
417                    settings: self.settings(),
418                })
419                .await;
420        }
421    }
422
423    /// Read the current permission mode.
424    pub async fn permission_mode(&self) -> crate::permissions::PermissionMode {
425        *self.permission_mode.read().await
426    }
427
428    /// Load-time diagnostics collected while building the extension registry.
429    pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
430        Arc::clone(&self.extension_diagnostics)
431    }
432
433    /// M3: the underlying motosan session id. Always populated; ephemeral
434    /// sessions use a synthetic id internally.
435    pub fn session_id(&self) -> String {
436        self.session.load().session_id().to_string()
437    }
438
439    /// M3: snapshot of the session's persisted message history. Used by the
440    /// binary on `--continue` / `--session` to seed the TUI transcript so
441    /// the user can see what was said in prior runs. `AgentSession::resume`
442    /// already populates this internally for the *agent* to use as context
443    /// on the next turn; this method exposes it so the *front-end* can
444    /// render it too. Returns motosan's `Vec<Message>` verbatim (callers
445    /// decide how to map roles → UI blocks; system messages are typically
446    /// dropped because they're the prompt, not transcript).
447    pub async fn session_history(
448        &self,
449    ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
450        self.session.load_full().history().await
451    }
452
453    /// Manually compact the current session's context. Forces a one-shot
454    /// compaction (a zero-threshold `ThresholdStrategy` so `should_compact`
455    /// is always true), keeping the most recent few user turns. The
456    /// compaction marker is appended to the session jsonl like an
457    /// autocompact event.
458    pub async fn compact(&self) -> Result<()> {
459        use motosan_agent_loop::ThresholdStrategy;
460        let strategy = ThresholdStrategy {
461            threshold: 0.0,
462            ..ThresholdStrategy::default()
463        };
464        let llm = self.llm.load_full().client();
465        self.session
466            .load_full()
467            .maybe_compact(&strategy, llm)
468            .await
469            .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
470        Ok(())
471    }
472
473    /// Replace the live session with a brand-new empty one. An in-flight
474    /// turn (if any) keeps its own session snapshot and is unaffected; the
475    /// next `send_user_message` uses the new session.
476    pub async fn new_session(&self) -> Result<()> {
477        self.fire_session_before_switch("new", None).await?;
478        let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
479        self.session.store(Arc::new(session));
480        self.llm.store(Arc::new(SharedLlm::new(llm)));
481        self.reset_token_tally().await;
482        Ok(())
483    }
484
485    /// Replace the live session with a stored session loaded by id.
486    /// Errors if no session store is configured or the id is unknown.
487    ///
488    /// **Not turn-safe** — see `switch_model`'s doc. Phase D2's command
489    /// handler must gate this on no active turn.
490    pub async fn load_session(&self, id: &str) -> Result<()> {
491        self.fire_session_before_switch("load", Some(id)).await?;
492        self.load_session_without_hook(id).await
493    }
494
495    async fn load_session_without_hook(&self, id: &str) -> Result<()> {
496        let (session, llm) = self
497            .factory
498            .build(SessionMode::Resume(id.to_string()), None, None)
499            .await?;
500        self.session.store(Arc::new(session));
501        self.llm.store(Arc::new(SharedLlm::new(llm)));
502        self.reset_token_tally().await;
503        Ok(())
504    }
505
506    async fn reset_token_tally(&self) {
507        let mut tally = self.token_tally.lock().await;
508        *tally = TurnStatsAccum::default();
509    }
510
511    /// Copy the live session to a brand-new, fully independent session file
512    /// and switch to the copy. Unlike `/fork` (an in-file branch), the clone
513    /// is a separate file and appears in the `/resume` picker. Returns the
514    /// new session id. Requires a session store.
515    ///
516    /// **Not turn-safe** — like `load_session`, the caller (the
517    /// `Command::CloneSession` arm in `forward_commands`) must gate this on
518    /// no active turn.
519    pub async fn clone_session(&self) -> Result<String> {
520        self.fire_session_before_switch("clone", None).await?;
521        let Some(store) = self.factory.session_store.as_ref() else {
522            return Err(AppError::Config("clone requires a session store".into()));
523        };
524        let source_id = self.session.load().session_id().to_string();
525        let new_id = crate::session::SessionId::new().into_string();
526        let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
527        catalog
528            .fork(&source_id, &new_id)
529            .await
530            .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
531        self.load_session_without_hook(&new_id).await?;
532        Ok(new_id)
533    }
534
535    /// Rebuild the live session with a different model, preserving the
536    /// conversation. Requires a session store (the current session is
537    /// resumed by id under the new model). Errors otherwise.
538    ///
539    /// **Not turn-safe.** Like `load_session`, this resumes a session by
540    /// id. If a turn is still in flight on the *old* session when this is
541    /// called, that turn keeps writing entries under the same `session_id`
542    /// while the resumed session reads from it — a write/read interleaving
543    /// that the resumed session won't observe until its next reload. The
544    /// `App` API does not expose a turn-in-progress flag, so the *caller*
545    /// (Phase D2's `/model` / `/resume` command handlers in
546    /// `forward_commands`) MUST gate these calls on no active turn — the
547    /// `active_turn` bookkeeping already in `forward_commands` is exactly
548    /// that gate. D2's plan must wire that guard; D1 documents the contract.
549    pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
550        self.fire_session_before_switch("model_switch", None)
551            .await?;
552        let current_id = self.session.load().session_id().to_string();
553        let (session, llm) = self
554            .factory
555            .build(SessionMode::Resume(current_id), Some(model), None)
556            .await?;
557        self.factory.set_current_model(model.clone());
558        self.session.store(Arc::new(session));
559        self.llm.store(Arc::new(SharedLlm::new(llm)));
560        Ok(())
561    }
562
563    /// Replace the live session's settings + rebuild the underlying
564    /// session and LLM client. Used by the TUI `/settings` editor (via
565    /// `Command::ApplySettings`) and external RPC clients (via
566    /// `Command::ReloadSettings`).
567    ///
568    /// Does NOT fire `session_before_switch` — settings save is
569    /// user-intentional + the editor is right there; blocking via an
570    /// extension would be hostile UX. See v0.8 Phase B plan.
571    ///
572    /// Per the v0.8 Phase A mutable-accumulator audit lesson, resets
573    /// `token_tally` (the cumulative count is per-session; new settings
574    /// = effectively a new session even if the session id is preserved).
575    ///
576    /// **Not turn-safe** — like `switch_model`, the caller must gate on
577    /// no active turn.
578    pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
579        // Rebuild session + LLM with the override. SessionMode::Resume
580        // preserves the session id and history; the LLM is rebuilt from
581        // the new settings (could be a different provider entirely).
582        let current_id = self.session.load().session_id().to_string();
583        let (session, llm) = self
584            .factory
585            .build(SessionMode::Resume(current_id), None, Some(&new_settings))
586            .await?;
587        self.factory.store_settings(new_settings);
588        self.factory.clear_current_model();
589        self.session.store(Arc::new(session));
590        self.llm.store(Arc::new(SharedLlm::new(llm)));
591
592        // Per v0.8 Phase A audit rule (mutable-accumulator extension):
593        // reset token tally on session replacement.
594        self.reset_token_tally().await;
595
596        Ok(())
597    }
598
599    /// M4 Phase B: disconnect every registered MCP server (2s per-server
600    /// timeout, best-effort). Call from the binary's ctrl-C handler.
601    pub async fn disconnect_mcp(&self) {
602        for (name, server) in &self.mcp_servers {
603            let _ =
604                tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
605            tracing::debug!(target: "mcp", server = %name, "disconnected");
606        }
607    }
608
609    fn run_turn(
610        &self,
611        msg: crate::user_message::UserMessage,
612        fork_from: Option<motosan_agent_loop::EntryId>,
613    ) -> impl Stream<Item = UiEvent> + Send + 'static {
614        let session = self.session.load_full();
615        let skills = Arc::clone(&self.skills);
616        let cancel_token = self.cancel_token.clone();
617        let tracker = Arc::clone(&self.next_tool_id);
618        let progress = Arc::clone(&self.progress_rx);
619        let token_tally = Arc::clone(&self.token_tally);
620        let settings_model_name = self
621            .factory
622            .current_model()
623            .map(|model| model.to_string())
624            .unwrap_or_else(|| self.factory.settings().model.name);
625
626        async_stream::stream! {
627            // Validate + encode attachments BEFORE the single-turn guard so a
628            // bad attachment error is never masked by "another turn is already
629            // running". See spec §3.
630            let new_user = {
631                // Skill expansion only applies to the user-visible text, not
632                // attachments. We thread the expanded text back into the
633                // UserMessage before preparation.
634                let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
635                let expanded_msg = crate::user_message::UserMessage {
636                    text: expanded_text,
637                    attachments: msg.attachments.clone(),
638                };
639                match crate::user_message::prepare_user_message(&expanded_msg) {
640                    Ok(m) => m,
641                    Err(err) => {
642                        yield UiEvent::AttachmentError {
643                            kind: err.kind(),
644                            message: err.to_string(),
645                        };
646                        return;
647                    }
648                }
649            };
650
651            // Single-turn guard (M2 contract preserved).
652            let mut progress_guard = match progress.try_lock() {
653                Ok(guard) => guard,
654                Err(_) => {
655                    yield UiEvent::Error(
656                        "another turn is already running; capo is single-turn-per-App".into(),
657                    );
658                    return;
659                }
660            };
661
662            // Reset cancel token for THIS turn.
663            let cancel = cancel_token.reset();
664
665            yield UiEvent::AgentTurnStarted;
666            yield UiEvent::AgentThinking;
667
668            // Start the turn (gives us a TurnHandle with stream + previous_len + branch_parent + ops_tx).
669            let handle = match fork_from {
670                None => {
671                    // Linear turn: load history, append, start_turn.
672                    let history = match session.history().await {
673                        Ok(h) => h,
674                        Err(err) => {
675                            yield UiEvent::Error(format!("session.history failed: {err}"));
676                            return;
677                        }
678                    };
679                    let mut messages = history;
680                    messages.push(new_user);
681                    match session.start_turn(messages).await {
682                        Ok(h) => h,
683                        Err(err) => {
684                            yield UiEvent::Error(format!("session.start_turn failed: {err}"));
685                            return;
686                        }
687                    }
688                }
689                Some(from) => {
690                    // Fork: fork_turn assembles the branch's prior history itself.
691                    match session.fork_turn(from, vec![new_user]).await {
692                        Ok(h) => h,
693                        Err(err) => {
694                            yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
695                            return;
696                        }
697                    }
698                }
699            };
700            let previous_len = handle.previous_len;
701            let epoch = handle.epoch;
702            let branch_parent = handle.branch_parent;
703            let ops_tx = handle.ops_tx.clone();
704            let mut agent_stream = handle.stream;
705
706            // Bridge our SharedCancelToken to motosan's AgentOp::Interrupt.
707            //
708            // `AgentSession::start_turn` does NOT take a CancellationToken —
709            // the engine's cancel-token path used in M2's direct `.run().cancel(tok)`
710            // is bypassed when going through AgentSession. The control plane is
711            // `ops_tx`. We spawn a tiny task that waits on `cancel.cancelled()`
712            // and forwards an `Interrupt` op when fired.
713            let interrupt_bridge = tokio::spawn(async move {
714                cancel.cancelled().await;
715                let _ = ops_tx.send(AgentOp::Interrupt).await;
716            });
717
718            // Drain events.
719            let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
720            let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
721
722            loop {
723                // Forward any progress chunks that arrived in this iteration.
724                while let Ok(chunk) = progress_guard.try_recv() {
725                    yield UiEvent::ToolCallProgress {
726                        id: progress_event_id(&tracker),
727                        chunk: ProgressChunk::from(chunk),
728                    };
729                }
730
731                tokio::select! {
732                    biased;
733                    maybe_item = agent_stream.next() => {
734                        match maybe_item {
735                            Some(AgentStreamItem::Event(ev)) => {
736                                if let Some(ui) = map_event(ev, &tracker) {
737                                    yield ui;
738                                }
739                            }
740                            Some(AgentStreamItem::Terminal(term)) => {
741                                terminal_result = Some(term.result);
742                                terminal_messages = Some(term.messages);
743                                break;
744                            }
745                            None => break,
746                        }
747                    }
748                    Some(chunk) = progress_guard.recv() => {
749                        yield UiEvent::ToolCallProgress {
750                            id: progress_event_id(&tracker),
751                            chunk: ProgressChunk::from(chunk),
752                        };
753                    }
754                }
755            }
756
757            // Tear down the interrupt bridge whether or not cancellation fired.
758            interrupt_bridge.abort();
759
760            // Persist new messages via record_turn_outcome (only when terminal reached).
761            if let Some(msgs) = terminal_messages.as_ref() {
762                if let Err(err) = session
763                    .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
764                    .await
765                {
766                    yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
767                }
768            }
769
770            // Translate the terminal Result into final UiEvents.
771            match terminal_result {
772                Some(Ok(result)) => {
773                    // v0.9 Phase A: surface thinking ContentParts BEFORE the final
774                    // assistant text. Spec §3.5: transcript order = thinking → text.
775                    if let Some(msgs) = terminal_messages.as_ref() {
776                        for ev in extract_new_thinking_events(msgs, previous_len) {
777                            yield ev;
778                        }
779                    }
780
781                    let final_text = terminal_messages
782                        .as_ref()
783                        .and_then(|msgs| {
784                            msgs.iter()
785                                .rev()
786                                .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
787                                .map(|m| m.text())
788                        })
789                        .unwrap_or_default();
790                    if !final_text.is_empty() {
791                        yield UiEvent::AgentMessageComplete(final_text);
792                    }
793                    // Emit TurnStats after AgentMessageComplete, before
794                    // AgentTurnComplete. See spec §3.2 emit ordering.
795                    let usage = result.usage;
796                    let (cumulative_input, cumulative_output) = {
797                        let mut tally = token_tally.lock().await;
798                        tally.add(usage);
799                        (tally.cumulative_input, tally.cumulative_output)
800                    };
801                    yield UiEvent::TurnStats {
802                        input_tokens: usage.input_tokens,
803                        output_tokens: usage.output_tokens,
804                        cumulative_input,
805                        cumulative_output,
806                        model: settings_model_name.clone(),
807                    };
808                    // Flush any remaining progress chunks.
809                    while let Ok(chunk) = progress_guard.try_recv() {
810                        yield UiEvent::ToolCallProgress {
811                            id: progress_event_id(&tracker),
812                            chunk: ProgressChunk::from(chunk),
813                        };
814                    }
815                    yield UiEvent::AgentTurnComplete;
816                }
817                Some(Err(err)) => {
818                    // MaxIterations is a soft cap: partial work was already
819                    // persisted by record_turn_outcome above, and the user
820                    // can just send another message to resume. Surface as a
821                    // Notice with a friendly affordance instead of a red
822                    // `[error]` block, and close the turn lifecycle cleanly
823                    // so the footer spinner / in_flight state is cleared.
824                    //
825                    // Token usage from the partial turn is intentionally
826                    // not emitted here — motosan's `AgentTerminal` only
827                    // carries `result + messages`, not the accumulated
828                    // `total_usage` on the error path. Surfacing it needs
829                    // an upstream `AgentTerminal::meta` extension.
830                    if let motosan_agent_loop::AgentError::MaxIterations(n) = err {
831                        yield UiEvent::Notice {
832                            title: "Agent stopped".to_string(),
833                            body: format!(
834                                "Reached the per-turn iteration cap ({n}). \
835                                 Partial work is saved — send another message to continue."
836                            ),
837                        };
838                        yield UiEvent::AgentTurnComplete;
839                    } else {
840                        yield UiEvent::Error(format!("{err}"));
841                    }
842                }
843                None => { /* stream closed without terminal — cancelled */ }
844            }
845        }
846    }
847
848    pub fn send_user_message(
849        &self,
850        msg: crate::user_message::UserMessage,
851    ) -> impl Stream<Item = UiEvent> + Send + 'static {
852        self.run_turn(msg, None)
853    }
854
855    /// Continue the conversation from an earlier entry, creating a branch.
856    /// `from` is the `EntryId` to attach under (typically a past user
857    /// message from `fork_candidates`). Streams `UiEvent`s exactly as
858    /// `send_user_message` does.
859    pub fn fork_from(
860        &self,
861        from: motosan_agent_loop::EntryId,
862        message: crate::user_message::UserMessage,
863    ) -> impl Stream<Item = UiEvent> + Send + 'static {
864        let registry = Arc::clone(&self.extension_registry);
865        let inner = self.run_turn(message, Some(from));
866        async_stream::stream! {
867            use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
868            match dispatch_session_before_switch(&registry, "fork", None).await {
869                HookOutcome::Continue => {}
870                HookOutcome::Cancelled { extension_name, reason } => {
871                    let msg = match reason {
872                        Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
873                        None => format!("extension `{extension_name}` cancelled fork"),
874                    };
875                    yield UiEvent::Error(msg);
876                    return;
877                }
878            }
879            let mut inner = Box::pin(inner);
880            while let Some(ev) = futures::StreamExt::next(&mut inner).await {
881                yield ev;
882            }
883        }
884    }
885
886    /// User messages on the current (active) branch, newest first, each
887    /// paired with its `EntryId` and a one-line preview — the rows the
888    /// `/fork` picker offers.
889    pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
890        let entries = self
891            .session
892            .load_full()
893            .entries()
894            .await
895            .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
896        let branch = motosan_agent_loop::active_branch(&entries);
897        let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
898            .iter()
899            .filter_map(|stored| {
900                let msg = stored.entry.as_message()?;
901                if !matches!(msg.role(), motosan_agent_loop::Role::User) {
902                    return None;
903                }
904                let preview: String = msg
905                    .text()
906                    .lines()
907                    .next()
908                    .unwrap_or("")
909                    .chars()
910                    .take(80)
911                    .collect();
912                Some((stored.id.clone(), preview))
913            })
914            .collect();
915        out.reverse();
916        Ok(out)
917    }
918
919    /// The session log as a navigable branch tree — for the `/tree` UI.
920    pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
921        self.session
922            .load_full()
923            .branches()
924            .await
925            .map_err(|e| AppError::Config(format!("branches failed: {e}")))
926    }
927
928    /// Fire `session_before_switch` and translate `Cancelled` into
929    /// `AppError::HookCancelled`. Returns `Ok(())` on `Continue`.
930    async fn fire_session_before_switch(
931        &self,
932        reason: &str,
933        session_id: Option<&str>,
934    ) -> Result<()> {
935        use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
936        match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
937            HookOutcome::Continue => Ok(()),
938            HookOutcome::Cancelled {
939                extension_name,
940                reason,
941            } => Err(AppError::HookCancelled {
942                extension_name,
943                reason,
944            }),
945        }
946    }
947}
948
/// Hands out synthetic tool-call ids (`tool_1`, `tool_2`, …) and pairs each
/// completion with its start.
///
/// The start/complete events this is fed from carry only a tool name. Starts
/// are therefore queued in arrival order, and a completion is matched to the
/// oldest outstanding start with the same name.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Number of ids handed out so far; ids are 1-based.
    next_id: usize,
    /// Outstanding calls as `(tool_name, id)`, oldest first.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Advance the counter and render the next `tool_<n>` id.
    fn mint_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }

    /// Record that a call to `name` started and return its fresh id.
    fn start(&mut self, name: &str) -> String {
        let id = self.mint_id();
        self.pending.push_back((name.to_owned(), id.clone()));
        id
    }

    /// Resolve the id for a finished call to `name`.
    ///
    /// Takes the oldest pending entry with that name out of the queue and
    /// returns its id. When no such entry exists, a brand-new id is minted
    /// instead, so the completion still gets a unique identifier.
    fn complete(&mut self, name: &str) -> String {
        let found = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name);
        let matched = found.and_then(|idx| self.pending.remove(idx));
        match matched {
            Some((_, id)) => id,
            None => self.mint_id(),
        }
    }

    /// Id to attribute an anonymous progress chunk to, if that is unambiguous.
    ///
    /// `ToolProgressChunk` does not carry a tool-call id. With exactly one
    /// call pending, the chunk can only belong to that call. With zero or
    /// several pending, queue order must not be used as a guess (e.g.
    /// `pending.back()`), because concurrent tool calls would mislabel output.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() != 1 {
            return None;
        }
        self.pending.front().map(|(_, id)| id.clone())
    }
}
989
990fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
991    match tracker.lock() {
992        Ok(guard) => guard,
993        Err(poisoned) => poisoned.into_inner(),
994    }
995}
996
997fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
998    lock_tool_tracker(tracker)
999        .progress_id()
1000        .unwrap_or_else(|| "tool_unknown".to_string())
1001}
1002
1003fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
1004where
1005    F: Fn(&str) -> Option<String>,
1006{
1007    env_lookup("ANTHROPIC_API_KEY")
1008        .map(|key| key.trim().to_string())
1009        .filter(|key| !key.is_empty())
1010        .or_else(|| auth.api_key("anthropic").map(str::to_string))
1011}
1012
1013fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
1014    match ev {
1015        AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
1016        AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
1017            let id = lock_tool_tracker(tool_tracker).start(&name);
1018            Some(UiEvent::ToolCallStarted {
1019                id,
1020                name,
1021                args: serde_json::json!({}),
1022            })
1023        }
1024        AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
1025            let id = lock_tool_tracker(tool_tracker).complete(&name);
1026            Some(UiEvent::ToolCallCompleted {
1027                id,
1028                result: UiToolResult {
1029                    is_error: result.is_error,
1030                    text: format!("{name}: {result:?}"),
1031                },
1032            })
1033        }
1034        _ => None,
1035    }
1036}
1037
1038type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
1039
1040pub struct AppBuilder {
1041    config: Option<Config>,
1042    cwd: Option<PathBuf>,
1043    permission_gate: Option<Arc<dyn PermissionGate>>,
1044    install_builtin_tools: bool,
1045    max_iterations: usize,
1046    llm_override: Option<Arc<dyn LlmClient>>,
1047    custom_tools_factory: Option<CustomToolsFactory>,
1048    permissions_policy_path: Option<PathBuf>,
1049    ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
1050    headless_permissions: bool,
1051    settings: Option<crate::settings::Settings>,
1052    auth: Option<crate::auth::Auth>,
1053    context_discovery_disabled: bool,
1054    // M3 Phase A:
1055    session_store: Option<Arc<dyn SessionStore>>,
1056    resume_session_id: Option<crate::session::SessionId>,
1057    autocompact_enabled: bool,
1058    // M4 Phase A:
1059    skills: Vec<crate::skills::Skill>,
1060    // M4 Phase B:
1061    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
1062    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1063    extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
1064    extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
1065    token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
1066}
1067
1068impl Default for AppBuilder {
1069    fn default() -> Self {
1070        Self {
1071            config: None,
1072            cwd: None,
1073            permission_gate: None,
1074            install_builtin_tools: false,
1075            // Per-turn (not session-wide) cap on LLM round-trips. Resets
1076            // every `send_user_message`. 200 is effectively "never hit in
1077            // practice" — codex/pi run uncapped; capo keeps a finite cap
1078            // as a runaway-loop safety net. When it does fire,
1079            // `run_turn` translates it to a `Notice` (not `Error`) so
1080            // the user can resume by sending another message.
1081            max_iterations: 200,
1082            llm_override: None,
1083            custom_tools_factory: None,
1084            permissions_policy_path: None,
1085            ui_tx: None,
1086            headless_permissions: false,
1087            settings: None,
1088            auth: None,
1089            context_discovery_disabled: false,
1090            session_store: None,
1091            resume_session_id: None,
1092            autocompact_enabled: false,
1093            skills: Vec::new(),
1094            extra_tools: Vec::new(),
1095            mcp_servers: Vec::new(),
1096            extension_registry: None,
1097            extension_diagnostics: None,
1098            token_tally: None,
1099        }
1100    }
1101}
1102
1103impl AppBuilder {
1104    pub fn new() -> Self {
1105        Self::default()
1106    }
1107
1108    pub fn with_config(mut self, cfg: Config) -> Self {
1109        self.config = Some(cfg);
1110        self
1111    }
1112
1113    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
1114        self.cwd = Some(cwd.into());
1115        self
1116    }
1117
1118    pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
1119        self.permission_gate = Some(gate);
1120        self
1121    }
1122
1123    /// Install Capo's builtin tools (`read`, `bash`).
1124    ///
1125    /// This is mutually exclusive with `with_custom_tools_factory` /
1126    /// `build_with_custom_tools`; `build()` returns a configuration error if
1127    /// both are set.
1128    pub fn with_builtin_tools(mut self) -> Self {
1129        self.install_builtin_tools = true;
1130        self
1131    }
1132
1133    pub fn with_max_iterations(mut self, n: usize) -> Self {
1134        self.max_iterations = n;
1135        self
1136    }
1137
1138    pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
1139        self.llm_override = Some(llm);
1140        self
1141    }
1142
1143    pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1144        self.permissions_policy_path = Some(path);
1145        self
1146    }
1147
1148    pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1149        self.ui_tx = Some(tx);
1150        self
1151    }
1152
1153    /// Test/library hook to inject a pre-seeded token tally.
1154    pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1155        self.token_tally = Some(tally);
1156        self
1157    }
1158
1159    /// Register `PermissionExtension` in headless-deny mode — `permissions.toml`
1160    /// and read-only auto-allow still apply, but a tool call that would
1161    /// otherwise prompt is denied (there is no UI to ask). Used by `--json`.
1162    /// Ignored if `with_ui_channel` is also set (an interactive channel wins).
1163    pub fn with_headless_permissions(mut self) -> Self {
1164        self.headless_permissions = true;
1165        self
1166    }
1167
1168    /// M3: install user `Settings`. Replaces `with_config` for new code.
1169    pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1170        self.settings = Some(settings);
1171        self
1172    }
1173
1174    /// M3: install `Auth` (credentials for LLM providers).
1175    pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1176        self.auth = Some(auth);
1177        self
1178    }
1179
1180    /// M3: disable AGENTS.md / CLAUDE.md discovery for this App.
1181    /// Opt-out — discovery is on by default in `build()`.
1182    pub fn disable_context_discovery(mut self) -> Self {
1183        self.context_discovery_disabled = true;
1184        self
1185    }
1186
1187    /// M3 Phase A: install a `SessionStore` (e.g. `motosan_agent_loop::FileSessionStore`)
1188    /// for persistence. When omitted, sessions are ephemeral (no jsonl on disk).
1189    pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1190        self.session_store = Some(store);
1191        self
1192    }
1193
1194    /// M3 Phase A: enable autocompact at the `Settings::session.compact_at_context_pct`
1195    /// threshold. Requires `with_settings` to have been called; otherwise uses
1196    /// `Settings::default()`. Settings provide `max_context_tokens` and
1197    /// `keep_turns`. No-op when `settings.session.compact_at_context_pct == 0.0`.
1198    pub fn with_autocompact(mut self) -> Self {
1199        self.autocompact_enabled = true;
1200        self
1201    }
1202
1203    /// M4 Phase A: register skills. Pass an empty Vec (or omit the call,
1204    /// or call `without_skills()`) to disable skill injection. Skills are
1205    /// rendered into the system prompt's `<available_skills>` block when
1206    /// the `read` tool is available, and matched against `/skill:<name>`
1207    /// expansion before user messages reach the LLM.
1208    pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1209        self.skills = skills;
1210        self
1211    }
1212
1213    pub fn without_skills(mut self) -> Self {
1214        self.skills.clear();
1215        self
1216    }
1217
1218    /// M4 Phase B: register additional tools (typically MCP). Unlike
1219    /// `with_custom_tools_factory`, this APPENDS to the builtin tools
1220    /// and does NOT replace them.
1221    pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1222        self.extra_tools = tools;
1223        self
1224    }
1225
1226    pub fn with_extension_registry(
1227        mut self,
1228        registry: Arc<crate::extensions::ExtensionRegistry>,
1229    ) -> Self {
1230        self.extension_registry = Some(registry);
1231        self
1232    }
1233
1234    pub fn with_extension_diagnostics(
1235        mut self,
1236        diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1237    ) -> Self {
1238        self.extension_diagnostics = Some(diagnostics);
1239        self
1240    }
1241
1242    /// M4 Phase B: register MCP server handles so `App::disconnect_mcp`
1243    /// can iterate them on shutdown. Storage only — does not connect.
1244    pub fn with_mcp_servers(
1245        mut self,
1246        servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1247    ) -> Self {
1248        self.mcp_servers = servers;
1249        self
1250    }
1251
1252    /// Install a custom tool set for this app.
1253    ///
1254    /// This is mutually exclusive with `with_builtin_tools`; `build()` returns
1255    /// a configuration error if both are set.
1256    pub fn with_custom_tools_factory(
1257        mut self,
1258        factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1259    ) -> Self {
1260        self.custom_tools_factory = Some(Box::new(factory));
1261        self
1262    }
1263
1264    /// Convenience wrapper for `with_custom_tools_factory(...).build()`.
1265    ///
1266    /// This is mutually exclusive with `with_builtin_tools`.
1267    pub async fn build_with_custom_tools(
1268        self,
1269        factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1270    ) -> Result<App> {
1271        self.with_custom_tools_factory(factory).build().await
1272    }
1273
1274    /// M3 Phase A: build the App with an optional resume session id.
1275    ///
1276    /// - `Some(id)` + `session_store: Some(_)` → `AgentSession::resume(id, store, engine, llm)`,
1277    ///   then replay history into `ToolCtx.read_files`.
1278    /// - `Some(id)` + `session_store: None` → error (resume requires a store).
1279    /// - `None` + `session_store: Some(_)` → `AgentSession::new_with_store(fresh_id, store, engine, llm)`.
1280    /// - `None` + `session_store: None` → `AgentSession::new(engine, llm)` (ephemeral).
1281    pub async fn build_with_session(
1282        mut self,
1283        resume: Option<crate::session::SessionId>,
1284    ) -> Result<App> {
1285        if let Some(id) = resume {
1286            if self.session_store.is_none() {
1287                return Err(AppError::Config(
1288                    "build_with_session(Some(id)) requires with_session_store(...)".into(),
1289                ));
1290            }
1291            self.resume_session_id = Some(id);
1292        }
1293        self.build_internal().await
1294    }
1295
1296    /// Legacy entry point; equivalent to `build_with_session(None)`.
1297    pub async fn build(self) -> Result<App> {
1298        self.build_with_session(None).await
1299    }
1300
1301    async fn build_internal(mut self) -> Result<App> {
1302        let mcp_servers = std::mem::take(&mut self.mcp_servers);
1303        let extra_tools = std::mem::take(&mut self.extra_tools);
1304        let skills = self.skills.clone();
1305        if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1306            return Err(AppError::Config(
1307                "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1308            ));
1309        }
1310
1311        // Synthesise the legacy `Config` so the rest of `build()` keeps
1312        // working without a wholesale rewrite. Settings + Auth override the
1313        // deprecated `Config` when both APIs are supplied.
1314        let has_config = self.config.is_some();
1315        let has_auth = self.auth.is_some();
1316        let mut config = self.config.unwrap_or_default();
1317        let settings = match self.settings {
1318            Some(settings) => settings,
1319            None => {
1320                let mut settings = crate::settings::Settings::default();
1321                settings.model.provider = config.model.provider.clone();
1322                settings.model.name = config.model.name.clone();
1323                settings.model.max_tokens = config.model.max_tokens;
1324                settings
1325            }
1326        };
1327        config.model.provider = settings.model.provider.clone();
1328        config.model.name = settings.model.name.clone();
1329        config.model.max_tokens = settings.model.max_tokens;
1330        let mut auth = self.auth.unwrap_or_default();
1331        if !has_auth {
1332            if let Some(key) = config.anthropic.api_key.as_deref() {
1333                auth.0.insert(
1334                    "anthropic".into(),
1335                    crate::auth::ProviderAuth::ApiKey {
1336                        key: key.to_string(),
1337                    },
1338                );
1339            }
1340        }
1341        let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1342        if env_or_auth_key.is_some() || has_auth || !has_config {
1343            config.anthropic.api_key = env_or_auth_key;
1344        }
1345        let cwd = self
1346            .cwd
1347            .or_else(|| std::env::current_dir().ok())
1348            .unwrap_or_else(|| PathBuf::from("."));
1349        let agent_dir = crate::paths::agent_dir();
1350        let permission_gate = self.permission_gate.unwrap_or_else(|| {
1351            // When no gate is provided *and* no ui channel is wired,
1352            // fall back to NoOp with a warning log; when ui channel IS
1353            // wired, the PermissionExtension handles the real decisions.
1354            if self.ui_tx.is_some() || self.headless_permissions {
1355                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1356            } else {
1357                tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1358                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1359            }
1360        });
1361
1362        // Permissions.
1363        let policy: Arc<crate::permissions::Policy> =
1364            Arc::new(match self.permissions_policy_path.as_ref() {
1365                Some(path) => crate::permissions::Policy::load_or_default(path)?,
1366                None => crate::permissions::Policy::default(),
1367            });
1368        let session_cache = Arc::new(crate::permissions::SessionCache::new());
1369        let permission_strategy_handle = if self.ui_tx.is_some() || self.headless_permissions {
1370            let initial_strategy = if self.ui_tx.is_some() {
1371                PromptStrategy::Prompt
1372            } else {
1373                PromptStrategy::HeadlessDeny
1374            };
1375            Some(Arc::new(tokio::sync::RwLock::new(initial_strategy)))
1376        } else {
1377            None
1378        };
1379        let permission_mode = Arc::new(tokio::sync::RwLock::new(
1380            crate::permissions::PermissionMode::default(),
1381        ));
1382        let token_tally = self
1383            .token_tally
1384            .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1385
1386        // Shared progress channel consumed by `send_user_message`.
1387        let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1388
1389        // Resolve the tool set ONCE. Builtin tools, or a custom factory's
1390        // output, plus MCP `extra_tools`. A FnOnce custom factory can't be
1391        // re-run per session, so its output is captured here as a Vec.
1392        // `probe_ctx` also yields the build-time cancel token for `App`.
1393        let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1394        let cancel_token = probe_ctx.cancel_token.clone();
1395        let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1396            if let Some(factory_fn) = self.custom_tools_factory.take() {
1397                let mut t = factory_fn(probe_ctx);
1398                t.extend(extra_tools.clone());
1399                (false, t)
1400            } else {
1401                (self.install_builtin_tools, extra_tools.clone())
1402            };
1403
1404        let factory = SessionFactory {
1405            cwd: cwd.clone(),
1406            settings: Arc::new(Mutex::new(settings.clone())),
1407            auth: auth.clone(),
1408            policy: Arc::clone(&policy),
1409            session_cache: Arc::clone(&session_cache),
1410            ui_tx: self.ui_tx.clone(),
1411            headless_permissions: self.headless_permissions,
1412            permission_gate: Arc::clone(&permission_gate),
1413            permission_strategy_handle: permission_strategy_handle.clone(),
1414            progress_tx: progress_tx.clone(),
1415            skills: Arc::new(skills.clone()),
1416            install_builtin_tools: install_builtin,
1417            extra_tools: factory_extra_tools,
1418            max_iterations: self.max_iterations,
1419            context_discovery_disabled: self.context_discovery_disabled,
1420            autocompact_enabled: self.autocompact_enabled,
1421            session_store: self.session_store.clone(),
1422            llm_override: self.llm_override.clone(),
1423            current_model: Arc::new(Mutex::new(None)),
1424            cancel_token: cancel_token.clone(),
1425        };
1426
1427        let mode = match self.resume_session_id.take() {
1428            Some(id) => SessionMode::Resume(id.into_string()),
1429            None => SessionMode::New,
1430        };
1431        let (session, llm) = factory.build(mode, None, None).await?;
1432        let (extension_registry, extension_diagnostics) =
1433            if let Some(reg) = self.extension_registry.take() {
1434                let diagnostics = self
1435                    .extension_diagnostics
1436                    .unwrap_or_else(|| Arc::new(Vec::new()));
1437                (reg, diagnostics)
1438            } else {
1439                let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1440                let (registry, diagnostics) =
1441                    crate::extensions::load_extensions_manifest(&manifest_path).await;
1442                for d in &diagnostics {
1443                    let message = d.message.as_str();
1444                    match d.severity {
1445                        crate::extensions::DiagnosticSeverity::Warn => {
1446                            tracing::warn!("extensions: {message}");
1447                        }
1448                        crate::extensions::DiagnosticSeverity::Error => {
1449                            tracing::error!("extensions: {message}");
1450                        }
1451                    }
1452                }
1453                (Arc::new(registry), Arc::new(diagnostics))
1454            };
1455
1456        Ok(App {
1457            session: arc_swap::ArcSwap::from_pointee(session),
1458            llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1459            factory,
1460            config,
1461            cancel_token,
1462            progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1463            next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1464            skills: Arc::new(skills),
1465            mcp_servers,
1466            extension_registry,
1467            token_tally,
1468            extension_diagnostics,
1469            session_cache,
1470            permission_mode,
1471            permission_strategy_handle,
1472            ui_tx_owned: self.ui_tx.clone(),
1473        })
1474    }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479    use super::*;
1480    use crate::config::{AnthropicConfig, ModelConfig};
1481    use crate::events::UiEvent;
1482    use crate::user_message::UserMessage;
1483    use async_trait::async_trait;
1484    use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1485    use motosan_agent_tool::ToolDef;
1486    use std::sync::atomic::{AtomicUsize, Ordering};
1487
1488    #[test]
1489    fn turn_stats_accum_accumulates_across_calls() {
1490        let mut accum = TurnStatsAccum::default();
1491        accum.add(motosan_agent_loop::TokenUsage {
1492            input_tokens: 100,
1493            output_tokens: 25,
1494        });
1495        accum.add(motosan_agent_loop::TokenUsage {
1496            input_tokens: 1000,
1497            output_tokens: 250,
1498        });
1499        assert_eq!(accum.cumulative_input, 1100);
1500        assert_eq!(accum.cumulative_output, 275);
1501        assert_eq!(accum.turn_count, 2);
1502    }
1503
1504    #[test]
1505    fn turn_stats_accum_saturates_on_overflow() {
1506        let mut accum = TurnStatsAccum {
1507            cumulative_input: u64::MAX - 5,
1508            cumulative_output: 0,
1509            turn_count: 1,
1510        };
1511        accum.add(motosan_agent_loop::TokenUsage {
1512            input_tokens: 100,
1513            output_tokens: 0,
1514        });
1515        assert_eq!(accum.cumulative_input, u64::MAX);
1516    }
1517
1518    #[test]
1519    fn extract_thinking_events_extracts_reasoning_in_source_order() {
1520        use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};
1521        let messages = vec![Message::Assistant {
1522            id: new_message_id(),
1523            meta: MessageMeta::default(),
1524            content: vec![
1525                AssistantContent::Reasoning {
1526                    text: "first thought".into(),
1527                    signature: None,
1528                },
1529                AssistantContent::Text {
1530                    text: "answer 1".into(),
1531                },
1532                AssistantContent::Reasoning {
1533                    text: "second thought".into(),
1534                    signature: None,
1535                },
1536            ],
1537        }];
1538        let events = extract_thinking_events(&messages);
1539        assert_eq!(events.len(), 2);
1540        match (&events[0], &events[1]) {
1541            (UiEvent::ThinkingComplete { text: t1 }, UiEvent::ThinkingComplete { text: t2 }) => {
1542                assert_eq!(t1, "first thought");
1543                assert_eq!(t2, "second thought");
1544            }
1545            _ => panic!("expected two ThinkingComplete events"),
1546        }
1547    }
1548
1549    #[test]
1550    fn extract_new_thinking_events_skips_historical_reasoning_before_previous_len() {
1551        use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1552        let messages = vec![
1553            Message::User {
1554                id: new_message_id(),
1555                meta: MessageMeta::default(),
1556                content: vec![ContentPart::text("old question")],
1557            },
1558            Message::Assistant {
1559                id: new_message_id(),
1560                meta: MessageMeta::default(),
1561                content: vec![AssistantContent::Reasoning {
1562                    text: "old thought".into(),
1563                    signature: None,
1564                }],
1565            },
1566            Message::User {
1567                id: new_message_id(),
1568                meta: MessageMeta::default(),
1569                content: vec![ContentPart::text("new question")],
1570            },
1571            Message::Assistant {
1572                id: new_message_id(),
1573                meta: MessageMeta::default(),
1574                content: vec![AssistantContent::Reasoning {
1575                    text: "new thought".into(),
1576                    signature: None,
1577                }],
1578            },
1579        ];
1580
1581        let events = extract_new_thinking_events(&messages, 2);
1582        assert_eq!(events.len(), 1, "should only emit current-turn reasoning");
1583        match &events[0] {
1584            UiEvent::ThinkingComplete { text } => assert_eq!(text, "new thought"),
1585            other => panic!("expected ThinkingComplete; got {other:?}"),
1586        }
1587    }
1588
1589    #[test]
1590    fn extract_thinking_events_skips_non_assistant_and_non_reasoning() {
1591        use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1592        let messages = vec![
1593            Message::User {
1594                id: new_message_id(),
1595                meta: MessageMeta::default(),
1596                content: vec![ContentPart::text("question?")],
1597            },
1598            Message::Assistant {
1599                id: new_message_id(),
1600                meta: MessageMeta::default(),
1601                content: vec![AssistantContent::Text {
1602                    text: "answer".into(),
1603                }],
1604            },
1605        ];
1606        let events = extract_thinking_events(&messages);
1607        assert!(events.is_empty(), "expected no events; got {events:?}");
1608    }
1609
1610    #[tokio::test]
1611    async fn builder_fails_without_api_key() {
1612        let cfg = Config {
1613            anthropic: AnthropicConfig {
1614                api_key: None,
1615                base_url: "https://api.anthropic.com".into(),
1616            },
1617            model: ModelConfig {
1618                provider: "anthropic".into(),
1619                name: "claude-sonnet-4-6".into(),
1620                max_tokens: 4096,
1621            },
1622        };
1623        let err = match AppBuilder::new()
1624            .with_config(cfg)
1625            .with_builtin_tools()
1626            .build()
1627            .await
1628        {
1629            Ok(_) => panic!("must fail without key"),
1630            Err(err) => err,
1631        };
1632        assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1633    }
1634
1635    struct ToolOnlyLlm {
1636        turn: AtomicUsize,
1637    }
1638
1639    #[async_trait]
1640    impl LlmClient for ToolOnlyLlm {
1641        async fn chat(
1642            &self,
1643            _messages: &[Message],
1644            _tools: &[ToolDef],
1645        ) -> motosan_agent_loop::Result<ChatOutput> {
1646            let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1647            if turn == 0 {
1648                Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1649                    ToolCallItem {
1650                        id: "t1".into(),
1651                        name: "read".into(),
1652                        args: serde_json::json!({"path":"nope.txt"}),
1653                    },
1654                ])))
1655            } else {
1656                Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1657            }
1658        }
1659    }
1660
1661    #[tokio::test]
1662    async fn empty_final_message_is_not_emitted() {
1663        let dir = tempfile::tempdir().unwrap();
1664        let mut cfg = Config::default();
1665        cfg.anthropic.api_key = Some("sk-unused".into());
1666        let app = AppBuilder::new()
1667            .with_config(cfg)
1668            .with_cwd(dir.path())
1669            .with_builtin_tools()
1670            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1671                turn: AtomicUsize::new(0),
1672            }))
1673            .build()
1674            .await
1675            .expect("build");
1676        let events: Vec<UiEvent> =
1677            futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1678        let empties = events
1679            .iter()
1680            .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1681            .count();
1682        assert_eq!(
1683            empties, 0,
1684            "should not emit empty final message, got: {events:?}"
1685        );
1686    }
1687
1688    struct EchoLlm;
1689
1690    struct UsageLlm {
1691        responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1692    }
1693
1694    #[async_trait]
1695    impl LlmClient for UsageLlm {
1696        async fn chat(
1697            &self,
1698            _messages: &[Message],
1699            _tools: &[ToolDef],
1700        ) -> motosan_agent_loop::Result<ChatOutput> {
1701            let next = match self.responses.lock() {
1702                Ok(mut responses) => responses.pop_front(),
1703                Err(poisoned) => poisoned.into_inner().pop_front(),
1704            };
1705            Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1706        }
1707    }
1708
1709    #[async_trait]
1710    impl LlmClient for EchoLlm {
1711        async fn chat(
1712            &self,
1713            _messages: &[Message],
1714            _tools: &[ToolDef],
1715        ) -> motosan_agent_loop::Result<ChatOutput> {
1716            Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1717        }
1718    }
1719
1720    #[tokio::test]
1721    async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1722        use motosan_agent_loop::{LlmResponse, TokenUsage};
1723
1724        let dir = tempfile::tempdir().expect("tempdir");
1725        let mut cfg = Config::default();
1726        cfg.anthropic.api_key = Some("sk-unused".into());
1727        let llm = Arc::new(UsageLlm {
1728            responses: std::sync::Mutex::new(VecDeque::from([
1729                ChatOutput::with_usage(
1730                    LlmResponse::Message("first".into()),
1731                    TokenUsage {
1732                        input_tokens: 100,
1733                        output_tokens: 50,
1734                    },
1735                ),
1736                ChatOutput::with_usage(
1737                    LlmResponse::Message("second".into()),
1738                    TokenUsage {
1739                        input_tokens: 200,
1740                        output_tokens: 80,
1741                    },
1742                ),
1743            ])),
1744        });
1745        let app = AppBuilder::new()
1746            .with_config(cfg)
1747            .with_cwd(dir.path())
1748            .with_builtin_tools()
1749            .with_llm(llm)
1750            .build()
1751            .await
1752            .expect("build");
1753
1754        let events_1: Vec<UiEvent> = app
1755            .send_user_message(UserMessage::text("hi"))
1756            .collect()
1757            .await;
1758        let events_2: Vec<UiEvent> = app
1759            .send_user_message(UserMessage::text("again"))
1760            .collect()
1761            .await;
1762
1763        let ts1 = events_1
1764            .iter()
1765            .find_map(|e| match e {
1766                UiEvent::TurnStats {
1767                    input_tokens,
1768                    output_tokens,
1769                    cumulative_input,
1770                    cumulative_output,
1771                    ..
1772                } => Some((
1773                    *input_tokens,
1774                    *output_tokens,
1775                    *cumulative_input,
1776                    *cumulative_output,
1777                )),
1778                _ => None,
1779            })
1780            .expect("turn 1 had no TurnStats");
1781        assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1782
1783        let ts2 = events_2
1784            .iter()
1785            .find_map(|e| match e {
1786                UiEvent::TurnStats {
1787                    input_tokens,
1788                    output_tokens,
1789                    cumulative_input,
1790                    cumulative_output,
1791                    ..
1792                } => Some((
1793                    *input_tokens,
1794                    *output_tokens,
1795                    *cumulative_input,
1796                    *cumulative_output,
1797                )),
1798                _ => None,
1799            })
1800            .expect("turn 2 had no TurnStats");
1801        assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1802
1803        let positions: Vec<&str> = events_1
1804            .iter()
1805            .filter_map(|e| match e {
1806                UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1807                UiEvent::TurnStats { .. } => Some("stats"),
1808                UiEvent::AgentTurnComplete => Some("turn_complete"),
1809                _ => None,
1810            })
1811            .collect();
1812        assert_eq!(
1813            positions,
1814            vec!["msg_complete", "stats", "turn_complete"],
1815            "wrong ordering"
1816        );
1817    }
1818
1819    #[tokio::test]
1820    async fn turn_stats_reset_after_new_session() {
1821        use motosan_agent_loop::{LlmResponse, TokenUsage};
1822
1823        let dir = tempfile::tempdir().expect("tempdir");
1824        let mut cfg = Config::default();
1825        cfg.anthropic.api_key = Some("sk-unused".into());
1826        let llm = Arc::new(UsageLlm {
1827            responses: std::sync::Mutex::new(VecDeque::from([
1828                ChatOutput::with_usage(
1829                    LlmResponse::Message("first".into()),
1830                    TokenUsage {
1831                        input_tokens: 100,
1832                        output_tokens: 50,
1833                    },
1834                ),
1835                ChatOutput::with_usage(
1836                    LlmResponse::Message("after-new".into()),
1837                    TokenUsage {
1838                        input_tokens: 7,
1839                        output_tokens: 3,
1840                    },
1841                ),
1842            ])),
1843        });
1844        let app = AppBuilder::new()
1845            .with_config(cfg)
1846            .with_cwd(dir.path())
1847            .with_builtin_tools()
1848            .with_llm(llm)
1849            .build()
1850            .await
1851            .expect("build");
1852
1853        let _: Vec<UiEvent> = app
1854            .send_user_message(UserMessage::text("hi"))
1855            .collect()
1856            .await;
1857        app.new_session().await.expect("new session");
1858        let events: Vec<UiEvent> = app
1859            .send_user_message(UserMessage::text("after new"))
1860            .collect()
1861            .await;
1862        let stats = events
1863            .iter()
1864            .find_map(|event| match event {
1865                UiEvent::TurnStats {
1866                    input_tokens,
1867                    output_tokens,
1868                    cumulative_input,
1869                    cumulative_output,
1870                    ..
1871                } => Some((
1872                    *input_tokens,
1873                    *output_tokens,
1874                    *cumulative_input,
1875                    *cumulative_output,
1876                )),
1877                _ => None,
1878            })
1879            .expect("turn had no TurnStats");
1880        assert_eq!(stats, (7, 3, 7, 3));
1881    }
1882
1883    #[tokio::test]
1884    async fn with_headless_permissions_builds_an_app() {
1885        let dir = tempfile::tempdir().expect("tempdir");
1886        let mut config = Config::default();
1887        config.anthropic.api_key = Some("sk-unused".into());
1888        let app = AppBuilder::new()
1889            .with_config(config)
1890            .with_cwd(dir.path())
1891            .with_builtin_tools()
1892            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1893            .with_headless_permissions()
1894            .build()
1895            .await
1896            .expect("build");
1897        // Build succeeds and yields a usable session id.
1898        assert!(!app.session_id().is_empty());
1899    }
1900
1901    #[tokio::test]
1902    async fn new_session_swaps_in_a_fresh_empty_session() {
1903        use futures::StreamExt;
1904        let dir = tempfile::tempdir().expect("tempdir");
1905        let mut config = Config::default();
1906        config.anthropic.api_key = Some("sk-unused".into());
1907        let app = AppBuilder::new()
1908            .with_config(config)
1909            .with_cwd(dir.path())
1910            .with_builtin_tools()
1911            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1912            .build()
1913            .await
1914            .expect("build");
1915
1916        let _: Vec<_> = app
1917            .send_user_message(UserMessage::text("hello"))
1918            .collect()
1919            .await;
1920        let id_before = app.session_id();
1921        assert!(!app.session_history().await.expect("history").is_empty());
1922
1923        app.new_session().await.expect("new_session");
1924
1925        assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1926        assert!(
1927            app.session_history().await.expect("history").is_empty(),
1928            "fresh session has no history"
1929        );
1930    }
1931
1932    #[tokio::test]
1933    async fn load_session_restores_a_stored_session_by_id() {
1934        use futures::StreamExt;
1935        let dir = tempfile::tempdir().expect("tempdir");
1936        let store_dir = dir.path().join("sessions");
1937        let store: Arc<dyn SessionStore> =
1938            Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1939        let mut config = Config::default();
1940        config.anthropic.api_key = Some("sk-unused".into());
1941        let app = AppBuilder::new()
1942            .with_config(config)
1943            .with_cwd(dir.path())
1944            .with_builtin_tools()
1945            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1946            .with_session_store(Arc::clone(&store))
1947            .build()
1948            .await
1949            .expect("build");
1950
1951        let _: Vec<_> = app
1952            .send_user_message(UserMessage::text("remember this"))
1953            .collect()
1954            .await;
1955        let original_id = app.session_id();
1956
1957        app.new_session().await.expect("new_session");
1958        assert_ne!(app.session_id(), original_id);
1959
1960        app.load_session(&original_id).await.expect("load_session");
1961        assert_eq!(app.session_id(), original_id);
1962        let history = app.session_history().await.expect("history");
1963        assert!(
1964            history.iter().any(|m| m.text().contains("remember this")),
1965            "loaded session should carry the original turn"
1966        );
1967    }
1968
1969    #[tokio::test]
1970    async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1971        use futures::StreamExt;
1972        let dir = tempfile::tempdir().expect("tempdir");
1973        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1974            dir.path().join("s"),
1975        ));
1976        let mut config = Config::default();
1977        config.anthropic.api_key = Some("sk-unused".into());
1978        let app = AppBuilder::new()
1979            .with_config(config)
1980            .with_cwd(dir.path())
1981            .with_builtin_tools()
1982            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1983            .with_session_store(store)
1984            .build()
1985            .await
1986            .expect("build");
1987
1988        let _: Vec<_> = app
1989            .send_user_message(UserMessage::text("hello"))
1990            .collect()
1991            .await;
1992        let original_id = app.session_id();
1993
1994        let new_id = app.clone_session().await.expect("clone_session");
1995
1996        // The clone has a distinct id, and the live session switched to it.
1997        assert_ne!(new_id, original_id);
1998        assert_eq!(app.session_id(), new_id);
1999        // The copy carries the same conversation.
2000        let history = app.session_history().await.expect("history");
2001        assert!(history.iter().any(|m| m.text().contains("hello")));
2002    }
2003
2004    #[tokio::test]
2005    async fn fork_from_creates_a_branch_off_an_earlier_entry() {
2006        use futures::StreamExt;
2007        let dir = tempfile::tempdir().expect("tempdir");
2008        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2009            dir.path().join("s"),
2010        ));
2011        let mut config = Config::default();
2012        config.anthropic.api_key = Some("sk-unused".into());
2013        let app = AppBuilder::new()
2014            .with_config(config)
2015            .with_cwd(dir.path())
2016            .with_builtin_tools()
2017            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2018            .with_session_store(store)
2019            .build()
2020            .await
2021            .expect("build");
2022
2023        // Two linear turns.
2024        let _: Vec<_> = app
2025            .send_user_message(UserMessage::text("first"))
2026            .collect()
2027            .await;
2028        let _: Vec<_> = app
2029            .send_user_message(UserMessage::text("second"))
2030            .collect()
2031            .await;
2032
2033        // The EntryId of the first user message.
2034        let entries = app.session.load_full().entries().await.expect("entries");
2035        let first_id = entries
2036            .iter()
2037            .find_map(|stored| {
2038                let msg = stored.entry.as_message()?;
2039                (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
2040                    .then(|| stored.id.clone())
2041            })
2042            .expect("first user message present");
2043
2044        // Fork from it.
2045        let _: Vec<_> = app
2046            .fork_from(first_id, UserMessage::text("branched"))
2047            .collect()
2048            .await;
2049
2050        let history = app.session_history().await.expect("history");
2051        let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
2052        assert!(
2053            texts.iter().any(|t| t.contains("first")),
2054            "fork keeps the fork-point ancestor"
2055        );
2056        assert!(
2057            texts.iter().any(|t| t.contains("branched")),
2058            "fork includes the new message"
2059        );
2060        assert!(
2061            !texts.iter().any(|t| t.contains("second")),
2062            "fork excludes the abandoned branch"
2063        );
2064    }
2065
2066    #[tokio::test]
2067    async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
2068        use futures::StreamExt;
2069        let dir = tempfile::tempdir().expect("tempdir");
2070        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2071            dir.path().join("s"),
2072        ));
2073        let mut config = Config::default();
2074        config.anthropic.api_key = Some("sk-unused".into());
2075        let app = AppBuilder::new()
2076            .with_config(config)
2077            .with_cwd(dir.path())
2078            .with_builtin_tools()
2079            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2080            .with_session_store(store)
2081            .build()
2082            .await
2083            .expect("build");
2084
2085        let _: Vec<_> = app
2086            .send_user_message(UserMessage::text("alpha"))
2087            .collect()
2088            .await;
2089        let _: Vec<_> = app
2090            .send_user_message(UserMessage::text("bravo"))
2091            .collect()
2092            .await;
2093
2094        let candidates = app.fork_candidates().await.expect("candidates");
2095        let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
2096        // Newest first.
2097        assert!(previews[0].contains("bravo"), "got {previews:?}");
2098        assert!(previews.iter().any(|p| p.contains("alpha")));
2099        // Every id is non-empty and distinct.
2100        assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
2101    }
2102
2103    #[tokio::test]
2104    async fn branches_returns_a_tree_for_a_linear_session() {
2105        use futures::StreamExt;
2106        let dir = tempfile::tempdir().expect("tempdir");
2107        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2108            dir.path().join("s"),
2109        ));
2110        let mut config = Config::default();
2111        config.anthropic.api_key = Some("sk-unused".into());
2112        let app = AppBuilder::new()
2113            .with_config(config)
2114            .with_cwd(dir.path())
2115            .with_builtin_tools()
2116            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2117            .with_session_store(store)
2118            .build()
2119            .await
2120            .expect("build");
2121
2122        let _: Vec<_> = app
2123            .send_user_message(UserMessage::text("hello"))
2124            .collect()
2125            .await;
2126        let tree = app.branches().await.expect("branches");
2127        // A linear 1-turn session: non-empty node list, an active leaf.
2128        assert!(!tree.nodes.is_empty());
2129        assert!(tree.active_leaf.is_some());
2130    }
2131
2132    #[tokio::test]
2133    async fn reload_settings_rebuilds_session_and_resets_token_tally() {
2134        let dir = tempfile::tempdir().expect("tempdir");
2135        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2136            dir.path().join("s"),
2137        ));
2138        let mut cfg = Config::default();
2139        cfg.anthropic.api_key = Some("sk-unused".into());
2140
2141        let llm = Arc::new(UsageLlm {
2142            responses: std::sync::Mutex::new(VecDeque::from([
2143                ChatOutput::with_usage(
2144                    LlmResponse::Message("first".into()),
2145                    motosan_agent_loop::TokenUsage {
2146                        input_tokens: 100,
2147                        output_tokens: 50,
2148                    },
2149                ),
2150                ChatOutput::with_usage(
2151                    LlmResponse::Message("after-reload".into()),
2152                    motosan_agent_loop::TokenUsage {
2153                        input_tokens: 5,
2154                        output_tokens: 2,
2155                    },
2156                ),
2157            ])),
2158        });
2159        let app = AppBuilder::new()
2160            .with_config(cfg)
2161            .with_cwd(dir.path())
2162            .with_builtin_tools()
2163            .with_llm(llm)
2164            .with_session_store(store)
2165            .build()
2166            .await
2167            .expect("build");
2168
2169        let _: Vec<UiEvent> = app
2170            .send_user_message(crate::user_message::UserMessage::text("hi"))
2171            .collect()
2172            .await;
2173        {
2174            let token_tally = app.token_tally();
2175            let tally = token_tally.lock().await;
2176            assert_eq!(tally.cumulative_input, 100);
2177            assert_eq!(tally.cumulative_output, 50);
2178            assert_eq!(tally.turn_count, 1);
2179        }
2180
2181        let mut new_settings = crate::settings::Settings::default();
2182        new_settings.model.name = "claude-opus-4-7".into();
2183        new_settings.ui.footer_show_cost = false;
2184        app.reload_settings(new_settings).await.expect("reload");
2185        assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
2186        assert!(!app.factory.settings().ui.footer_show_cost);
2187        assert_eq!(app.factory.current_model(), None);
2188
2189        {
2190            let token_tally = app.token_tally();
2191            let tally = token_tally.lock().await;
2192            assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
2193            assert_eq!(tally.cumulative_output, 0);
2194            assert_eq!(tally.turn_count, 0);
2195        }
2196
2197        let _: Vec<UiEvent> = app
2198            .send_user_message(crate::user_message::UserMessage::text("after"))
2199            .collect()
2200            .await;
2201        {
2202            let token_tally = app.token_tally();
2203            let tally = token_tally.lock().await;
2204            assert_eq!(tally.cumulative_input, 5);
2205            assert_eq!(tally.cumulative_output, 2);
2206            assert_eq!(tally.turn_count, 1);
2207        }
2208    }
2209
2210    #[tokio::test]
2211    async fn switch_model_preserves_history() {
2212        use futures::StreamExt;
2213        let dir = tempfile::tempdir().expect("tempdir");
2214        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2215            dir.path().join("s"),
2216        ));
2217        let mut config = Config::default();
2218        config.anthropic.api_key = Some("sk-unused".into());
2219        let app = AppBuilder::new()
2220            .with_config(config)
2221            .with_cwd(dir.path())
2222            .with_builtin_tools()
2223            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2224            .with_session_store(store)
2225            .build()
2226            .await
2227            .expect("build");
2228
2229        let _: Vec<_> = app
2230            .send_user_message(UserMessage::text("keep me"))
2231            .collect()
2232            .await;
2233        let id_before = app.session_id();
2234
2235        app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
2236            .await
2237            .expect("switch_model");
2238
2239        assert_eq!(
2240            app.session_id(),
2241            id_before,
2242            "switch_model keeps the same session"
2243        );
2244        let history = app.session_history().await.expect("history");
2245        assert!(history.iter().any(|m| m.text().contains("keep me")));
2246    }
2247
2248    #[tokio::test]
2249    async fn switch_model_is_sticky_for_future_session_rebuilds() {
2250        let dir = tempfile::tempdir().expect("tempdir");
2251        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2252            dir.path().join("s"),
2253        ));
2254        let mut config = Config::default();
2255        config.anthropic.api_key = Some("sk-unused".into());
2256        let app = AppBuilder::new()
2257            .with_config(config)
2258            .with_cwd(dir.path())
2259            .with_builtin_tools()
2260            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2261            .with_session_store(store)
2262            .build()
2263            .await
2264            .expect("build");
2265
2266        let selected = crate::model::ModelId::from("claude-opus-4-7");
2267        app.switch_model(&selected).await.expect("switch_model");
2268        app.new_session().await.expect("new_session");
2269
2270        assert_eq!(app.factory.current_model(), Some(selected.clone()));
2271        assert_eq!(app.settings().model.name, selected.to_string());
2272    }
2273
2274    struct SleepThenDoneLlm {
2275        turn: AtomicUsize,
2276    }
2277
2278    #[async_trait]
2279    impl LlmClient for SleepThenDoneLlm {
2280        async fn chat(
2281            &self,
2282            _messages: &[Message],
2283            _tools: &[ToolDef],
2284        ) -> motosan_agent_loop::Result<ChatOutput> {
2285            let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2286            if turn == 0 {
2287                Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2288                    ToolCallItem {
2289                        id: "sleep".into(),
2290                        name: "bash".into(),
2291                        args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2292                    },
2293                ])))
2294            } else {
2295                Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2296            }
2297        }
2298    }
2299
2300    #[tokio::test]
2301    async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2302        use futures::StreamExt;
2303        let dir = tempfile::tempdir().expect("tempdir");
2304        let mut config = Config::default();
2305        config.anthropic.api_key = Some("sk-unused".into());
2306        let app = Arc::new(
2307            AppBuilder::new()
2308                .with_config(config)
2309                .with_cwd(dir.path())
2310                .with_builtin_tools()
2311                .with_llm(Arc::new(SleepThenDoneLlm {
2312                    turn: AtomicUsize::new(0),
2313                }) as Arc<dyn LlmClient>)
2314                .build()
2315                .await
2316                .expect("build"),
2317        );
2318
2319        app.new_session().await.expect("new_session");
2320        let running_app = Arc::clone(&app);
2321        let handle = tokio::spawn(async move {
2322            running_app
2323                .send_user_message(UserMessage::text("run a slow command"))
2324                .collect::<Vec<_>>()
2325                .await
2326        });
2327        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2328        app.cancel();
2329
2330        let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2331            .await
2332            .expect("turn should finish after cancellation")
2333            .expect("join");
2334        assert!(
2335            events.iter().any(|event| {
2336                matches!(
2337                    event,
2338                    UiEvent::ToolCallCompleted { result, .. }
2339                        if result.text.contains("command cancelled by user")
2340                )
2341            }),
2342            "cancel should reach the rebuilt bash tool: {events:?}"
2343        );
2344    }
2345
2346    #[tokio::test]
2347    async fn compact_summarizes_a_session_with_enough_history() {
2348        struct DoneLlm;
2349        #[async_trait]
2350        impl LlmClient for DoneLlm {
2351            async fn chat(
2352                &self,
2353                _messages: &[Message],
2354                _tools: &[ToolDef],
2355            ) -> motosan_agent_loop::Result<ChatOutput> {
2356                Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2357            }
2358        }
2359
2360        let dir = tempfile::tempdir().expect("tempdir");
2361        let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2362            dir.path().join("sessions"),
2363        ));
2364        let mut config = Config::default();
2365        config.anthropic.api_key = Some("sk-unused".into());
2366        let app = AppBuilder::new()
2367            .with_config(config)
2368            .with_cwd(dir.path())
2369            .with_builtin_tools()
2370            .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2371            .with_session_store(store)
2372            .build()
2373            .await
2374            .expect("build");
2375
2376        // Run 4 user turns. `App::compact()` uses ThresholdStrategy with the
2377        // default keep_turns (3); `select_cutoff` only returns a cutoff once
2378        // there are >= keep_turns user messages — so a 1-turn session would
2379        // make compact() a no-op (Ok, but nothing summarized). 4 turns
2380        // guarantees the real compaction path (and the DoneLlm summarizer
2381        // call) is exercised.
2382        for i in 0..4 {
2383            let _: Vec<_> = app
2384                .send_user_message(UserMessage::text(format!("turn {i}")))
2385                .collect()
2386                .await;
2387        }
2388
2389        app.compact().await.expect("compact should succeed");
2390
2391        // The compaction marker is appended as a session entry — history
2392        // after compaction is shorter than the raw 4-turn transcript.
2393        let history = app.session_history().await.expect("history");
2394        assert!(
2395            !history.is_empty(),
2396            "session should still have content post-compaction"
2397        );
2398    }
2399
2400    #[test]
2401    fn anthropic_env_api_key_overrides_auth_json_key() {
2402        let mut auth = crate::auth::Auth::default();
2403        auth.0.insert(
2404            "anthropic".into(),
2405            crate::auth::ProviderAuth::ApiKey {
2406                key: "sk-auth".into(),
2407            },
2408        );
2409
2410        let key = anthropic_api_key_from(&auth, |name| {
2411            (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
2412        });
2413        assert_eq!(key.as_deref(), Some("sk-env"));
2414    }
2415
2416    #[tokio::test]
2417    async fn with_settings_overrides_deprecated_config_model() {
2418        use crate::settings::Settings;
2419
2420        let mut config = Config::default();
2421        config.model.name = "from-config".into();
2422        config.anthropic.api_key = Some("sk-config".into());
2423
2424        let mut settings = Settings::default();
2425        settings.model.name = "from-settings".into();
2426
2427        let tmp = tempfile::tempdir().unwrap();
2428        let app = AppBuilder::new()
2429            .with_config(config)
2430            .with_settings(settings)
2431            .with_cwd(tmp.path())
2432            .disable_context_discovery()
2433            .with_llm(Arc::new(EchoLlm))
2434            .build()
2435            .await
2436            .expect("build");
2437        assert_eq!(app.config().model.name, "from-settings");
2438        assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2439    }
2440
2441    #[tokio::test]
2442    async fn with_settings_synthesises_legacy_config_for_build() {
2443        use crate::auth::{Auth, ProviderAuth};
2444        use crate::settings::Settings;
2445
2446        let mut settings = Settings::default();
2447        settings.model.name = "claude-sonnet-4-6".into();
2448
2449        let mut auth = Auth::default();
2450        auth.0.insert(
2451            "anthropic".into(),
2452            ProviderAuth::ApiKey {
2453                key: "sk-test".into(),
2454            },
2455        );
2456
2457        let tmp = tempfile::tempdir().unwrap();
2458        let app = AppBuilder::new()
2459            .with_settings(settings)
2460            .with_auth(auth)
2461            .with_cwd(tmp.path())
2462            .with_builtin_tools()
2463            .disable_context_discovery()
2464            .with_llm(Arc::new(EchoLlm))
2465            .build()
2466            .await
2467            .expect("build");
2468        let _ = app;
2469    }
2470
2471    #[tokio::test]
2472    async fn cancel_before_turn_does_not_poison_future_turns() {
2473        let dir = tempfile::tempdir().unwrap();
2474        let mut cfg = Config::default();
2475        cfg.anthropic.api_key = Some("sk-unused".into());
2476        let app = AppBuilder::new()
2477            .with_config(cfg)
2478            .with_cwd(dir.path())
2479            .with_builtin_tools()
2480            .with_llm(std::sync::Arc::new(EchoLlm))
2481            .build()
2482            .await
2483            .expect("build");
2484
2485        app.cancel();
2486        let events: Vec<UiEvent> = app
2487            .send_user_message(UserMessage::text("x"))
2488            .collect()
2489            .await;
2490
2491        assert!(
2492            events
2493                .iter()
2494                .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2495            "turn should use a fresh cancellation token: {events:?}"
2496        );
2497    }
2498
2499    #[test]
2500    fn map_event_matches_started_and_completed_ids_by_tool_name() {
2501        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2502
2503        let started_bash = map_event(
2504            AgentEvent::Core(CoreEvent::ToolStarted {
2505                name: "bash".into(),
2506            }),
2507            &tracker,
2508        );
2509        let started_read = map_event(
2510            AgentEvent::Core(CoreEvent::ToolStarted {
2511                name: "read".into(),
2512            }),
2513            &tracker,
2514        );
2515        let completed_bash = map_event(
2516            AgentEvent::Core(CoreEvent::ToolCompleted {
2517                name: "bash".into(),
2518                result: motosan_agent_tool::ToolResult::text("ok"),
2519            }),
2520            &tracker,
2521        );
2522        let completed_read = map_event(
2523            AgentEvent::Core(CoreEvent::ToolCompleted {
2524                name: "read".into(),
2525                result: motosan_agent_tool::ToolResult::text("ok"),
2526            }),
2527            &tracker,
2528        );
2529
2530        assert!(matches!(
2531            started_bash,
2532            Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
2533        ));
2534        assert!(matches!(
2535            started_read,
2536            Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
2537        ));
2538        assert!(matches!(
2539            completed_bash,
2540            Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
2541        ));
2542        assert!(matches!(
2543            completed_read,
2544            Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
2545        ));
2546    }
2547
2548    #[test]
2549    fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
2550        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2551        let s1 = map_event(
2552            AgentEvent::Core(CoreEvent::ToolStarted {
2553                name: "bash".into(),
2554            }),
2555            &tracker,
2556        );
2557        let s2 = map_event(
2558            AgentEvent::Core(CoreEvent::ToolStarted {
2559                name: "bash".into(),
2560            }),
2561            &tracker,
2562        );
2563        let c1 = map_event(
2564            AgentEvent::Core(CoreEvent::ToolCompleted {
2565                name: "bash".into(),
2566                result: motosan_agent_tool::ToolResult::text("a"),
2567            }),
2568            &tracker,
2569        );
2570        let c2 = map_event(
2571            AgentEvent::Core(CoreEvent::ToolCompleted {
2572                name: "bash".into(),
2573                result: motosan_agent_tool::ToolResult::text("b"),
2574            }),
2575            &tracker,
2576        );
2577
2578        let id_s1 = match s1 {
2579            Some(UiEvent::ToolCallStarted { id, .. }) => id,
2580            other => panic!("{other:?}"),
2581        };
2582        let id_s2 = match s2 {
2583            Some(UiEvent::ToolCallStarted { id, .. }) => id,
2584            other => panic!("{other:?}"),
2585        };
2586        let id_c1 = match c1 {
2587            Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2588            other => panic!("{other:?}"),
2589        };
2590        let id_c2 = match c2 {
2591            Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2592            other => panic!("{other:?}"),
2593        };
2594
2595        assert_eq!(id_s1, id_c1);
2596        assert_eq!(id_s2, id_c2);
2597        assert_ne!(id_s1, id_s2);
2598    }
2599
2600    #[tokio::test]
2601    async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2602        let dir = tempfile::tempdir().unwrap();
2603        let mut cfg = Config::default();
2604        cfg.anthropic.api_key = Some("sk-unused".into());
2605        let app = AppBuilder::new()
2606            .with_config(cfg)
2607            .with_cwd(dir.path())
2608            .with_builtin_tools()
2609            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2610                turn: AtomicUsize::new(0),
2611            }))
2612            .build()
2613            .await
2614            .expect("build");
2615
2616        let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2617        let first_event = first.next().await;
2618        assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2619
2620        let second_events: Vec<UiEvent> = app
2621            .send_user_message(UserMessage::text("second"))
2622            .collect()
2623            .await;
2624        assert_eq!(
2625            second_events.len(),
2626            1,
2627            "expected immediate single error event, got: {second_events:?}"
2628        );
2629        assert!(matches!(
2630            &second_events[0],
2631            UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2632        ));
2633    }
2634
2635    #[tokio::test]
2636    async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2637        // Mirror of the concurrent_send_user_message... test above, but the
2638        // second call carries a bad attachment. The single-turn guard would
2639        // emit UiEvent::Error("...single-turn-per-App"); we expect the EARLIER
2640        // prepare_user_message step to short-circuit with AttachmentError
2641        // instead. See spec §3 ("prepare runs before guard") and §7.1.
2642        let dir = tempfile::tempdir().unwrap();
2643        let mut cfg = Config::default();
2644        cfg.anthropic.api_key = Some("sk-unused".into());
2645        let app = AppBuilder::new()
2646            .with_config(cfg)
2647            .with_cwd(dir.path())
2648            .with_builtin_tools()
2649            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2650                turn: AtomicUsize::new(0),
2651            }))
2652            .build()
2653            .await
2654            .expect("build");
2655
2656        // Park the first turn (claims the single-turn lock).
2657        let mut first =
2658            Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2659        let first_event = first.next().await;
2660        assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2661
2662        // Fire a second call with a bad attachment.
2663        let bad = crate::user_message::UserMessage {
2664            text: "second".into(),
2665            attachments: vec![crate::user_message::Attachment::Image {
2666                path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2667            }],
2668        };
2669        let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2670
2671        assert_eq!(
2672            second_events.len(),
2673            1,
2674            "expected exactly one event (the attachment error); got: {second_events:?}"
2675        );
2676        assert!(
2677            matches!(
2678                &second_events[0],
2679                UiEvent::AttachmentError {
2680                    kind: crate::user_message::AttachmentErrorKind::NotFound,
2681                    ..
2682                }
2683            ),
2684            "expected AttachmentError::NotFound as first event; got {second_events:?}"
2685        );
2686    }
2687
2688    struct InfiniteToolLlm;
2689
2690    #[async_trait]
2691    impl LlmClient for InfiniteToolLlm {
2692        async fn chat(
2693            &self,
2694            _messages: &[Message],
2695            _tools: &[ToolDef],
2696        ) -> motosan_agent_loop::Result<ChatOutput> {
2697            // Every chat returns another tool call, never a terminating
2698            // Message — motosan loops until max_iterations is reached.
2699            Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2700                ToolCallItem {
2701                    id: "loop".into(),
2702                    name: "read".into(),
2703                    args: serde_json::json!({"path": "nope.txt"}),
2704                },
2705            ])))
2706        }
2707    }
2708
2709    #[tokio::test]
2710    async fn max_iterations_surfaces_as_notice_with_lifecycle_complete() {
2711        let dir = tempfile::tempdir().unwrap();
2712        let mut cfg = Config::default();
2713        cfg.anthropic.api_key = Some("sk-unused".into());
2714        let app = AppBuilder::new()
2715            .with_config(cfg)
2716            .with_cwd(dir.path())
2717            .with_builtin_tools()
2718            .with_llm(std::sync::Arc::new(InfiniteToolLlm))
2719            .with_max_iterations(3)
2720            .build()
2721            .await
2722            .expect("build");
2723
2724        let events: Vec<UiEvent> =
2725            futures::StreamExt::collect(app.send_user_message(UserMessage::text("loop"))).await;
2726
2727        assert!(
2728            !events.iter().any(|e| matches!(e, UiEvent::Error(_))),
2729            "MaxIterations should surface as Notice, not Error; got: {events:?}"
2730        );
2731
2732        let notice = events.iter().find_map(|e| match e {
2733            UiEvent::Notice { title, body } => Some((title.clone(), body.clone())),
2734            _ => None,
2735        });
2736        let (title, body) =
2737            notice.unwrap_or_else(|| panic!("expected Notice event; got: {events:?}"));
2738        let title_lower = title.to_lowercase();
2739        assert!(
2740            title_lower.contains("stop") || title_lower.contains("iteration"),
2741            "notice title should mention stop/iteration; got title={title:?} body={body:?}"
2742        );
2743        assert!(
2744            body.contains("3"),
2745            "body should reference the per-turn cap (3); got body={body:?}"
2746        );
2747        let body_lower = body.to_lowercase();
2748        assert!(
2749            body_lower.contains("continue") || body_lower.contains("send another"),
2750            "body should hint that user can continue; got body={body:?}"
2751        );
2752
2753        assert!(
2754            events
2755                .iter()
2756                .any(|e| matches!(e, UiEvent::AgentTurnComplete)),
2757            "AgentTurnComplete should fire on the soft-cap path; got: {events:?}"
2758        );
2759    }
2760
2761    #[test]
2762    fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
2763        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2764        assert_eq!(progress_event_id(&tracker), "tool_unknown");
2765
2766        let only = map_event(
2767            AgentEvent::Core(CoreEvent::ToolStarted {
2768                name: "bash".into(),
2769            }),
2770            &tracker,
2771        );
2772        let only_id = match only {
2773            Some(UiEvent::ToolCallStarted { id, .. }) => id,
2774            other => panic!("{other:?}"),
2775        };
2776        assert_eq!(progress_event_id(&tracker), only_id);
2777
2778        let _second = map_event(
2779            AgentEvent::Core(CoreEvent::ToolStarted {
2780                name: "read".into(),
2781            }),
2782            &tracker,
2783        );
2784        assert_eq!(progress_event_id(&tracker), "tool_unknown");
2785    }
2786
2787    #[tokio::test]
2788    async fn builder_rejects_builtin_and_custom_tools_together() {
2789        let mut cfg = Config::default();
2790        cfg.anthropic.api_key = Some("sk-unused".into());
2791        let dir = tempfile::tempdir().unwrap();
2792        let err = match AppBuilder::new()
2793            .with_config(cfg)
2794            .with_cwd(dir.path())
2795            .with_builtin_tools()
2796            .with_custom_tools_factory(|_| Vec::new())
2797            .build()
2798            .await
2799        {
2800            Ok(_) => panic!("must reject conflicting tool configuration"),
2801            Err(err) => err,
2802        };
2803
2804        assert!(format!("{err}").contains("mutually exclusive"));
2805    }
2806
2807    /// M3 Phase A smoke: two turns share history when a SessionStore is wired.
2808    #[tokio::test]
2809    async fn two_turns_in_same_session_share_history() {
2810        #[derive(Default)]
2811        struct CounterLlm {
2812            turn: AtomicUsize,
2813        }
2814        #[async_trait]
2815        impl LlmClient for CounterLlm {
2816            async fn chat(
2817                &self,
2818                messages: &[Message],
2819                _tools: &[ToolDef],
2820            ) -> motosan_agent_loop::Result<ChatOutput> {
2821                let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2822                let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2823                Ok(ChatOutput::new(LlmResponse::Message(answer)))
2824            }
2825        }
2826
2827        let tmp = tempfile::tempdir().unwrap();
2828        let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2829            tmp.path().to_path_buf(),
2830        ));
2831
2832        let app = AppBuilder::new()
2833            .with_settings(crate::settings::Settings::default())
2834            .with_auth(crate::auth::Auth::default())
2835            .with_cwd(tmp.path())
2836            .with_builtin_tools()
2837            .disable_context_discovery()
2838            .with_llm(std::sync::Arc::new(CounterLlm::default()))
2839            .with_session_store(store)
2840            .build_with_session(None)
2841            .await
2842            .expect("build");
2843
2844        let _events1: Vec<UiEvent> = app
2845            .send_user_message(UserMessage::text("hi"))
2846            .collect()
2847            .await;
2848        let events2: Vec<UiEvent> = app
2849            .send_user_message(UserMessage::text("again"))
2850            .collect()
2851            .await;
2852
2853        // Turn 2's LLM saw turn 1's user message + turn 1's assistant + turn 2's new user.
2854        let saw_more_than_one = events2.iter().any(|e| {
2855            matches!(
2856                e,
2857                UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2858            )
2859        });
2860        assert!(
2861            saw_more_than_one,
2862            "second turn should have seen history; events: {events2:?}"
2863        );
2864    }
2865}
2866
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// A minimal global skill named "x", used to exercise the builder's
    /// skill plumbing.
    fn sample_skill() -> Skill {
        Skill {
            source: SkillSource::Global,
            disable_model_invocation: false,
            base_dir: PathBuf::from("/"),
            file_path: PathBuf::from("/x.md"),
            description: "d".into(),
            name: "x".into(),
        }
    }

    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);
        assert_eq!(builder.skills.len(), 1);
        let stored = &builder.skills[0];
        assert_eq!(stored.name, "x");
    }

    #[test]
    fn without_skills_clears() {
        let populated = AppBuilder::new().with_skills(vec![sample_skill()]);
        let cleared = populated.without_skills();
        assert!(cleared.skills.is_empty());
    }
}
2899
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::{Tool, ToolContext, ToolDef, ToolResult};
    use std::future::Future;
    use std::pin::Pin;

    /// Inert tool whose only purpose is to check that `with_extra_tools`
    /// keeps the `Arc`s it is given.
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> ToolDef {
            let schema = serde_json::json!({"type": "object"});
            ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema: schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &ToolContext,
        ) -> Pin<Box<dyn Future<Output = ToolResult> + Send + '_>> {
            Box::pin(async { ToolResult::text("ok") })
        }
    }

    #[test]
    fn with_extra_tools_stores_tools() {
        let builder = AppBuilder::new().with_extra_tools(vec![Arc::new(FakeTool) as Arc<dyn Tool>]);
        assert_eq!(builder.extra_tools.len(), 1);
    }
}
2928        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
2929        let b = AppBuilder::new().with_extra_tools(tools);
2930        assert_eq!(b.extra_tools.len(), 1);
2931    }
2932}