Skip to main content

capo_agent/
app.rs

1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9    AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10    CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
/// Running, per-session token totals kept by `App` between turns.
///
/// This is the internal accumulator only; the wire-form
/// `UiEvent::TurnStats` additionally carries per-turn deltas and the model
/// name. See spec §3.1 and §3.4.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct TurnStatsAccum {
    /// Sum of `input_tokens` over every turn folded in so far.
    pub cumulative_input: u64,
    /// Sum of `output_tokens` over every turn folded in so far.
    pub cumulative_output: u64,
    /// Number of turns folded in so far.
    pub turn_count: u64,
}
32
33impl TurnStatsAccum {
34    /// Add a `TokenUsage` from a completed turn. Saturating arithmetic
35    /// — `u64::MAX` is the cap (overflow is practically unreachable
36    /// at realistic token counts).
37    pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38        self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39        self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40        self.turn_count = self.turn_count.saturating_add(1);
41    }
42}
43
/// Tells `SessionFactory` whether a build starts a new session or reopens
/// a stored one.
#[derive(Clone, Debug)]
pub(crate) enum SessionMode {
    /// Start an empty session under a fresh ulid; it is persisted only when
    /// a session store is configured.
    New,
    /// Reopen the stored session with this id. Fails without a session
    /// store.
    Resume(String),
}
52
53/// Everything needed to (re)build an `AgentSession` + its `LlmClient`.
54/// Stored on `App` so `new_session` / `load_session` / `switch_model` can
55/// rebuild without re-running `AppBuilder`. All fields are cheap to clone
56/// (owned values or `Arc`s).
57struct SharedLlm {
58    client: Arc<dyn LlmClient>,
59}
60
61impl SharedLlm {
62    fn new(client: Arc<dyn LlmClient>) -> Self {
63        Self { client }
64    }
65
66    fn client(&self) -> Arc<dyn LlmClient> {
67        Arc::clone(&self.client)
68    }
69}
70
71pub(crate) struct SessionFactory {
72    cwd: PathBuf,
73    settings: Arc<Mutex<crate::settings::Settings>>,
74    auth: crate::auth::Auth,
75    policy: Arc<crate::permissions::Policy>,
76    session_cache: Arc<crate::permissions::SessionCache>,
77    ui_tx: Option<mpsc::Sender<UiEvent>>,
78    headless_permissions: bool,
79    permission_gate: Arc<dyn PermissionGate>,
80    /// The SHARED progress sender. Rebuilt tools must use this exact
81    /// sender so `App.progress_rx` keeps receiving — see Task 5.
82    progress_tx: mpsc::Sender<ToolProgressChunk>,
83    skills: Arc<Vec<crate::skills::Skill>>,
84    install_builtin_tools: bool,
85    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
86    max_iterations: usize,
87    context_discovery_disabled: bool,
88    autocompact_enabled: bool,
89    session_store: Option<Arc<dyn SessionStore>>,
90    llm_override: Option<Arc<dyn LlmClient>>,
91    current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
92    cancel_token: SharedCancelToken,
93}
94
95impl SessionFactory {
96    fn settings(&self) -> crate::settings::Settings {
97        match self.settings.lock() {
98            Ok(guard) => guard.clone(),
99            Err(poisoned) => poisoned.into_inner().clone(),
100        }
101    }
102
103    fn store_settings(&self, settings: crate::settings::Settings) {
104        match self.settings.lock() {
105            Ok(mut guard) => *guard = settings,
106            Err(poisoned) => *poisoned.into_inner() = settings,
107        }
108    }
109
110    fn current_model(&self) -> Option<crate::model::ModelId> {
111        match self.current_model.lock() {
112            Ok(guard) => guard.clone(),
113            Err(poisoned) => poisoned.into_inner().clone(),
114        }
115    }
116
117    fn set_current_model(&self, model: crate::model::ModelId) {
118        match self.current_model.lock() {
119            Ok(mut guard) => *guard = Some(model),
120            Err(poisoned) => *poisoned.into_inner() = Some(model),
121        }
122    }
123
124    fn clear_current_model(&self) {
125        match self.current_model.lock() {
126            Ok(mut guard) => *guard = None,
127            Err(poisoned) => *poisoned.into_inner() = None,
128        }
129    }
130
131    /// Build a fresh `AgentSession` + its `LlmClient`. `model_override`,
132    /// when set, replaces `settings.model.name` for this build (used by
133    /// `switch_model`). `settings_override`, when set, supplies the full
134    /// settings struct for this build and bypasses any sticky `/model`
135    /// override (used by `/settings`). The returned `LlmClient` is what
136    /// `App.llm` should be swapped to.
137    async fn build(
138        &self,
139        mode: SessionMode,
140        model_override: Option<&crate::model::ModelId>,
141        settings_override: Option<&crate::settings::Settings>,
142    ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
143        // Apply the model override (or last switched model) onto a settings clone.
144        // A settings override is authoritative: `/settings` should not be
145        // shadowed by an earlier `/model` sticky override.
146        let effective_model = model_override.cloned().or_else(|| {
147            if settings_override.is_some() {
148                None
149            } else {
150                self.current_model()
151            }
152        });
153        let mut settings = settings_override
154            .cloned()
155            .unwrap_or_else(|| self.settings());
156        if let Some(m) = &effective_model {
157            settings.model.name = m.as_str().to_string();
158        }
159
160        let llm = if effective_model.is_none() {
161            self.llm_override.as_ref().map_or_else(
162                || build_llm_client(&settings, &self.auth),
163                |llm| Ok(Arc::clone(llm)),
164            )?
165        } else {
166            build_llm_client(&settings, &self.auth)?
167        };
168
169        // Tools — rebuilt fresh each time, sharing the SAME progress_tx so
170        // `App.progress_rx` keeps receiving from whatever session is live.
171        let tool_ctx = ToolCtx::new_with_cancel_token(
172            &self.cwd,
173            Arc::clone(&self.permission_gate),
174            self.progress_tx.clone(),
175            self.cancel_token.clone(),
176        );
177        let mut tools = if self.install_builtin_tools {
178            builtin_tools(tool_ctx.clone())
179        } else {
180            Vec::new()
181        };
182        tools.extend(self.extra_tools.iter().cloned());
183
184        let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
185        let base_prompt = build_system_prompt(&tool_names, &self.skills);
186        let system_prompt = if self.context_discovery_disabled {
187            base_prompt
188        } else {
189            let agent_dir = crate::paths::agent_dir();
190            let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
191            crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
192        };
193        let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
194
195        let mut engine_builder = Engine::builder()
196            .max_iterations(self.max_iterations)
197            .system_prompt(system_prompt)
198            .tool_context(motosan_tool_context);
199        for tool in tools {
200            engine_builder = engine_builder.tool(tool);
201        }
202        if let Some(ui_tx) = &self.ui_tx {
203            let ext = crate::permissions::PermissionExtension::new(
204                Arc::clone(&self.policy),
205                Arc::clone(&self.session_cache),
206                self.cwd.clone(),
207                ui_tx.clone(),
208            );
209            engine_builder = engine_builder.extension(Box::new(ext));
210        } else if self.headless_permissions {
211            let ext = crate::permissions::PermissionExtension::headless(
212                Arc::clone(&self.policy),
213                Arc::clone(&self.session_cache),
214                self.cwd.clone(),
215            );
216            engine_builder = engine_builder.extension(Box::new(ext));
217        }
218        if self.autocompact_enabled
219            && settings.session.compact_at_context_pct > 0.0
220            && settings.session.compact_at_context_pct < 1.0
221        {
222            let cfg = AutocompactConfig {
223                threshold: settings.session.compact_at_context_pct,
224                max_context_tokens: settings.session.max_context_tokens,
225                keep_turns: settings.session.keep_turns.max(1),
226            };
227            engine_builder = engine_builder
228                .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
229        }
230        let engine = engine_builder.build();
231
232        let session = match (&mode, &self.session_store) {
233            (SessionMode::Resume(id), Some(store)) => {
234                let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
235                    .await
236                    .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
237                let entries = s
238                    .entries()
239                    .await
240                    .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
241                crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
242                s
243            }
244            (SessionMode::Resume(_), None) => {
245                return Err(AppError::Config("resume requires a session store".into()));
246            }
247            (SessionMode::New, Some(store)) => {
248                let id = crate::session::SessionId::new();
249                AgentSession::new_with_store(
250                    id.into_string(),
251                    Arc::clone(store),
252                    engine,
253                    Arc::clone(&llm),
254                )
255            }
256            (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
257        };
258
259        Ok((session, llm))
260    }
261}
262
263pub struct App {
264    session: arc_swap::ArcSwap<AgentSession>,
265    llm: arc_swap::ArcSwap<SharedLlm>,
266    factory: SessionFactory,
267    config: Config,
268    cancel_token: SharedCancelToken,
269    progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
270    next_tool_id: Arc<Mutex<ToolCallTracker>>,
271    skills: Arc<Vec<crate::skills::Skill>>,
272    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
273    /// Loaded extensions for hook dispatch. Cheap to clone (Arc).
274    pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
275    /// Cumulative token counts for this session. Cheap to clone (Arc).
276    /// Updated after each turn's `AgentResult.usage`; not persisted across
277    /// `/resume` (motosan-agent-loop 0.21.1 doesn't carry per-turn
278    /// `TokenUsage` in `MessageMeta` — see spec §3.4 graceful degradation).
279    pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
280    extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
281    pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
282}
283
284impl App {
285    pub fn config(&self) -> &Config {
286        &self.config
287    }
288
289    /// Request graceful cancellation of the currently-running turn (if any).
290    /// Safe to call from any task; the engine observes the token and halts
291    /// at the next safe point.
292    pub fn cancel(&self) {
293        self.cancel_token.cancel();
294    }
295
296    /// Exposes the session cache so front-ends can write "Allow for session"
297    /// decisions resolved by the user. Exposed as `Arc` so callers can drop
298    /// their reference freely.
299    pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
300        Arc::clone(&self.session_cache)
301    }
302
303    /// Read-only handle on the loaded extensions registry. For the
304    /// `Command::ListExtensions` dispatcher reply.
305    pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
306        Arc::clone(&self.extension_registry)
307    }
308
309    /// Snapshot of the live settings currently used for session rebuilds.
310    pub fn settings(&self) -> crate::settings::Settings {
311        let mut settings = self.factory.settings();
312        if let Some(model) = self.factory.current_model() {
313            settings.model.name = model.to_string();
314        }
315        settings
316    }
317
318    /// Read-only handle on the cumulative token tally.
319    pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
320        Arc::clone(&self.token_tally)
321    }
322
323    /// Load-time diagnostics collected while building the extension registry.
324    pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
325        Arc::clone(&self.extension_diagnostics)
326    }
327
328    /// M3: the underlying motosan session id. Always populated; ephemeral
329    /// sessions use a synthetic id internally.
330    pub fn session_id(&self) -> String {
331        self.session.load().session_id().to_string()
332    }
333
334    /// M3: snapshot of the session's persisted message history. Used by the
335    /// binary on `--continue` / `--session` to seed the TUI transcript so
336    /// the user can see what was said in prior runs. `AgentSession::resume`
337    /// already populates this internally for the *agent* to use as context
338    /// on the next turn; this method exposes it so the *front-end* can
339    /// render it too. Returns motosan's `Vec<Message>` verbatim (callers
340    /// decide how to map roles → UI blocks; system messages are typically
341    /// dropped because they're the prompt, not transcript).
342    pub async fn session_history(
343        &self,
344    ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
345        self.session.load_full().history().await
346    }
347
348    /// Manually compact the current session's context. Forces a one-shot
349    /// compaction (a zero-threshold `ThresholdStrategy` so `should_compact`
350    /// is always true), keeping the most recent few user turns. The
351    /// compaction marker is appended to the session jsonl like an
352    /// autocompact event.
353    pub async fn compact(&self) -> Result<()> {
354        use motosan_agent_loop::ThresholdStrategy;
355        let strategy = ThresholdStrategy {
356            threshold: 0.0,
357            ..ThresholdStrategy::default()
358        };
359        let llm = self.llm.load_full().client();
360        self.session
361            .load_full()
362            .maybe_compact(&strategy, llm)
363            .await
364            .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
365        Ok(())
366    }
367
368    /// Replace the live session with a brand-new empty one. An in-flight
369    /// turn (if any) keeps its own session snapshot and is unaffected; the
370    /// next `send_user_message` uses the new session.
371    pub async fn new_session(&self) -> Result<()> {
372        self.fire_session_before_switch("new", None).await?;
373        let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
374        self.session.store(Arc::new(session));
375        self.llm.store(Arc::new(SharedLlm::new(llm)));
376        self.reset_token_tally().await;
377        Ok(())
378    }
379
380    /// Replace the live session with a stored session loaded by id.
381    /// Errors if no session store is configured or the id is unknown.
382    ///
383    /// **Not turn-safe** — see `switch_model`'s doc. Phase D2's command
384    /// handler must gate this on no active turn.
385    pub async fn load_session(&self, id: &str) -> Result<()> {
386        self.fire_session_before_switch("load", Some(id)).await?;
387        self.load_session_without_hook(id).await
388    }
389
390    async fn load_session_without_hook(&self, id: &str) -> Result<()> {
391        let (session, llm) = self
392            .factory
393            .build(SessionMode::Resume(id.to_string()), None, None)
394            .await?;
395        self.session.store(Arc::new(session));
396        self.llm.store(Arc::new(SharedLlm::new(llm)));
397        self.reset_token_tally().await;
398        Ok(())
399    }
400
401    async fn reset_token_tally(&self) {
402        let mut tally = self.token_tally.lock().await;
403        *tally = TurnStatsAccum::default();
404    }
405
406    /// Copy the live session to a brand-new, fully independent session file
407    /// and switch to the copy. Unlike `/fork` (an in-file branch), the clone
408    /// is a separate file and appears in the `/resume` picker. Returns the
409    /// new session id. Requires a session store.
410    ///
411    /// **Not turn-safe** — like `load_session`, the caller (the
412    /// `Command::CloneSession` arm in `forward_commands`) must gate this on
413    /// no active turn.
414    pub async fn clone_session(&self) -> Result<String> {
415        self.fire_session_before_switch("clone", None).await?;
416        let Some(store) = self.factory.session_store.as_ref() else {
417            return Err(AppError::Config("clone requires a session store".into()));
418        };
419        let source_id = self.session.load().session_id().to_string();
420        let new_id = crate::session::SessionId::new().into_string();
421        let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
422        catalog
423            .fork(&source_id, &new_id)
424            .await
425            .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
426        self.load_session_without_hook(&new_id).await?;
427        Ok(new_id)
428    }
429
430    /// Rebuild the live session with a different model, preserving the
431    /// conversation. Requires a session store (the current session is
432    /// resumed by id under the new model). Errors otherwise.
433    ///
434    /// **Not turn-safe.** Like `load_session`, this resumes a session by
435    /// id. If a turn is still in flight on the *old* session when this is
436    /// called, that turn keeps writing entries under the same `session_id`
437    /// while the resumed session reads from it — a write/read interleaving
438    /// that the resumed session won't observe until its next reload. The
439    /// `App` API does not expose a turn-in-progress flag, so the *caller*
440    /// (Phase D2's `/model` / `/resume` command handlers in
441    /// `forward_commands`) MUST gate these calls on no active turn — the
442    /// `active_turn` bookkeeping already in `forward_commands` is exactly
443    /// that gate. D2's plan must wire that guard; D1 documents the contract.
444    pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
445        self.fire_session_before_switch("model_switch", None)
446            .await?;
447        let current_id = self.session.load().session_id().to_string();
448        let (session, llm) = self
449            .factory
450            .build(SessionMode::Resume(current_id), Some(model), None)
451            .await?;
452        self.factory.set_current_model(model.clone());
453        self.session.store(Arc::new(session));
454        self.llm.store(Arc::new(SharedLlm::new(llm)));
455        Ok(())
456    }
457
458    /// Replace the live session's settings + rebuild the underlying
459    /// session and LLM client. Used by the TUI `/settings` editor (via
460    /// `Command::ApplySettings`) and external RPC clients (via
461    /// `Command::ReloadSettings`).
462    ///
463    /// Does NOT fire `session_before_switch` — settings save is
464    /// user-intentional + the editor is right there; blocking via an
465    /// extension would be hostile UX. See v0.8 Phase B plan.
466    ///
467    /// Per the v0.8 Phase A mutable-accumulator audit lesson, resets
468    /// `token_tally` (the cumulative count is per-session; new settings
469    /// = effectively a new session even if the session id is preserved).
470    ///
471    /// **Not turn-safe** — like `switch_model`, the caller must gate on
472    /// no active turn.
473    pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
474        // Rebuild session + LLM with the override. SessionMode::Resume
475        // preserves the session id and history; the LLM is rebuilt from
476        // the new settings (could be a different provider entirely).
477        let current_id = self.session.load().session_id().to_string();
478        let (session, llm) = self
479            .factory
480            .build(SessionMode::Resume(current_id), None, Some(&new_settings))
481            .await?;
482        self.factory.store_settings(new_settings);
483        self.factory.clear_current_model();
484        self.session.store(Arc::new(session));
485        self.llm.store(Arc::new(SharedLlm::new(llm)));
486
487        // Per v0.8 Phase A audit rule (mutable-accumulator extension):
488        // reset token tally on session replacement.
489        self.reset_token_tally().await;
490
491        Ok(())
492    }
493
494    /// M4 Phase B: disconnect every registered MCP server (2s per-server
495    /// timeout, best-effort). Call from the binary's ctrl-C handler.
496    pub async fn disconnect_mcp(&self) {
497        for (name, server) in &self.mcp_servers {
498            let _ =
499                tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
500            tracing::debug!(target: "mcp", server = %name, "disconnected");
501        }
502    }
503
504    fn run_turn(
505        &self,
506        msg: crate::user_message::UserMessage,
507        fork_from: Option<motosan_agent_loop::EntryId>,
508    ) -> impl Stream<Item = UiEvent> + Send + 'static {
509        let session = self.session.load_full();
510        let skills = Arc::clone(&self.skills);
511        let cancel_token = self.cancel_token.clone();
512        let tracker = Arc::clone(&self.next_tool_id);
513        let progress = Arc::clone(&self.progress_rx);
514        let token_tally = Arc::clone(&self.token_tally);
515        let settings_model_name = self
516            .factory
517            .current_model()
518            .map(|model| model.to_string())
519            .unwrap_or_else(|| self.factory.settings().model.name);
520
521        async_stream::stream! {
522            // Validate + encode attachments BEFORE the single-turn guard so a
523            // bad attachment error is never masked by "another turn is already
524            // running". See spec §3.
525            let new_user = {
526                // Skill expansion only applies to the user-visible text, not
527                // attachments. We thread the expanded text back into the
528                // UserMessage before preparation.
529                let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
530                let expanded_msg = crate::user_message::UserMessage {
531                    text: expanded_text,
532                    attachments: msg.attachments.clone(),
533                };
534                match crate::user_message::prepare_user_message(&expanded_msg) {
535                    Ok(m) => m,
536                    Err(err) => {
537                        yield UiEvent::AttachmentError {
538                            kind: err.kind(),
539                            message: err.to_string(),
540                        };
541                        return;
542                    }
543                }
544            };
545
546            // Single-turn guard (M2 contract preserved).
547            let mut progress_guard = match progress.try_lock() {
548                Ok(guard) => guard,
549                Err(_) => {
550                    yield UiEvent::Error(
551                        "another turn is already running; capo is single-turn-per-App".into(),
552                    );
553                    return;
554                }
555            };
556
557            // Reset cancel token for THIS turn.
558            let cancel = cancel_token.reset();
559
560            yield UiEvent::AgentTurnStarted;
561            yield UiEvent::AgentThinking;
562
563            // Start the turn (gives us a TurnHandle with stream + previous_len + branch_parent + ops_tx).
564            let handle = match fork_from {
565                None => {
566                    // Linear turn: load history, append, start_turn.
567                    let history = match session.history().await {
568                        Ok(h) => h,
569                        Err(err) => {
570                            yield UiEvent::Error(format!("session.history failed: {err}"));
571                            return;
572                        }
573                    };
574                    let mut messages = history;
575                    messages.push(new_user);
576                    match session.start_turn(messages).await {
577                        Ok(h) => h,
578                        Err(err) => {
579                            yield UiEvent::Error(format!("session.start_turn failed: {err}"));
580                            return;
581                        }
582                    }
583                }
584                Some(from) => {
585                    // Fork: fork_turn assembles the branch's prior history itself.
586                    match session.fork_turn(from, vec![new_user]).await {
587                        Ok(h) => h,
588                        Err(err) => {
589                            yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
590                            return;
591                        }
592                    }
593                }
594            };
595            let previous_len = handle.previous_len;
596            let epoch = handle.epoch;
597            let branch_parent = handle.branch_parent;
598            let ops_tx = handle.ops_tx.clone();
599            let mut agent_stream = handle.stream;
600
601            // Bridge our SharedCancelToken to motosan's AgentOp::Interrupt.
602            //
603            // `AgentSession::start_turn` does NOT take a CancellationToken —
604            // the engine's cancel-token path used in M2's direct `.run().cancel(tok)`
605            // is bypassed when going through AgentSession. The control plane is
606            // `ops_tx`. We spawn a tiny task that waits on `cancel.cancelled()`
607            // and forwards an `Interrupt` op when fired.
608            let interrupt_bridge = tokio::spawn(async move {
609                cancel.cancelled().await;
610                let _ = ops_tx.send(AgentOp::Interrupt).await;
611            });
612
613            // Drain events.
614            let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
615            let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
616
617            loop {
618                // Forward any progress chunks that arrived in this iteration.
619                while let Ok(chunk) = progress_guard.try_recv() {
620                    yield UiEvent::ToolCallProgress {
621                        id: progress_event_id(&tracker),
622                        chunk: ProgressChunk::from(chunk),
623                    };
624                }
625
626                tokio::select! {
627                    biased;
628                    maybe_item = agent_stream.next() => {
629                        match maybe_item {
630                            Some(AgentStreamItem::Event(ev)) => {
631                                if let Some(ui) = map_event(ev, &tracker) {
632                                    yield ui;
633                                }
634                            }
635                            Some(AgentStreamItem::Terminal(term)) => {
636                                terminal_result = Some(term.result);
637                                terminal_messages = Some(term.messages);
638                                break;
639                            }
640                            None => break,
641                        }
642                    }
643                    Some(chunk) = progress_guard.recv() => {
644                        yield UiEvent::ToolCallProgress {
645                            id: progress_event_id(&tracker),
646                            chunk: ProgressChunk::from(chunk),
647                        };
648                    }
649                }
650            }
651
652            // Tear down the interrupt bridge whether or not cancellation fired.
653            interrupt_bridge.abort();
654
655            // Persist new messages via record_turn_outcome (only when terminal reached).
656            if let Some(msgs) = terminal_messages.as_ref() {
657                if let Err(err) = session
658                    .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
659                    .await
660                {
661                    yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
662                }
663            }
664
665            // Translate the terminal Result into final UiEvents.
666            match terminal_result {
667                Some(Ok(result)) => {
668                    let final_text = terminal_messages
669                        .as_ref()
670                        .and_then(|msgs| {
671                            msgs.iter()
672                                .rev()
673                                .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
674                                .map(|m| m.text())
675                        })
676                        .unwrap_or_default();
677                    if !final_text.is_empty() {
678                        yield UiEvent::AgentMessageComplete(final_text);
679                    }
680                    // Emit TurnStats after AgentMessageComplete, before
681                    // AgentTurnComplete. See spec §3.2 emit ordering.
682                    let usage = result.usage;
683                    let (cumulative_input, cumulative_output) = {
684                        let mut tally = token_tally.lock().await;
685                        tally.add(usage);
686                        (tally.cumulative_input, tally.cumulative_output)
687                    };
688                    yield UiEvent::TurnStats {
689                        input_tokens: usage.input_tokens,
690                        output_tokens: usage.output_tokens,
691                        cumulative_input,
692                        cumulative_output,
693                        model: settings_model_name.clone(),
694                    };
695                    // Flush any remaining progress chunks.
696                    while let Ok(chunk) = progress_guard.try_recv() {
697                        yield UiEvent::ToolCallProgress {
698                            id: progress_event_id(&tracker),
699                            chunk: ProgressChunk::from(chunk),
700                        };
701                    }
702                    yield UiEvent::AgentTurnComplete;
703                }
704                Some(Err(err)) => {
705                    yield UiEvent::Error(format!("{err}"));
706                }
707                None => { /* stream closed without terminal — cancelled */ }
708            }
709        }
710    }
711
712    pub fn send_user_message(
713        &self,
714        msg: crate::user_message::UserMessage,
715    ) -> impl Stream<Item = UiEvent> + Send + 'static {
716        self.run_turn(msg, None)
717    }
718
719    /// Continue the conversation from an earlier entry, creating a branch.
720    /// `from` is the `EntryId` to attach under (typically a past user
721    /// message from `fork_candidates`). Streams `UiEvent`s exactly as
722    /// `send_user_message` does.
723    pub fn fork_from(
724        &self,
725        from: motosan_agent_loop::EntryId,
726        message: crate::user_message::UserMessage,
727    ) -> impl Stream<Item = UiEvent> + Send + 'static {
728        let registry = Arc::clone(&self.extension_registry);
729        let inner = self.run_turn(message, Some(from));
730        async_stream::stream! {
731            use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
732            match dispatch_session_before_switch(&registry, "fork", None).await {
733                HookOutcome::Continue => {}
734                HookOutcome::Cancelled { extension_name, reason } => {
735                    let msg = match reason {
736                        Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
737                        None => format!("extension `{extension_name}` cancelled fork"),
738                    };
739                    yield UiEvent::Error(msg);
740                    return;
741                }
742            }
743            let mut inner = Box::pin(inner);
744            while let Some(ev) = futures::StreamExt::next(&mut inner).await {
745                yield ev;
746            }
747        }
748    }
749
750    /// User messages on the current (active) branch, newest first, each
751    /// paired with its `EntryId` and a one-line preview — the rows the
752    /// `/fork` picker offers.
753    pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
754        let entries = self
755            .session
756            .load_full()
757            .entries()
758            .await
759            .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
760        let branch = motosan_agent_loop::active_branch(&entries);
761        let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
762            .iter()
763            .filter_map(|stored| {
764                let msg = stored.entry.as_message()?;
765                if !matches!(msg.role(), motosan_agent_loop::Role::User) {
766                    return None;
767                }
768                let preview: String = msg
769                    .text()
770                    .lines()
771                    .next()
772                    .unwrap_or("")
773                    .chars()
774                    .take(80)
775                    .collect();
776                Some((stored.id.clone(), preview))
777            })
778            .collect();
779        out.reverse();
780        Ok(out)
781    }
782
783    /// The session log as a navigable branch tree — for the `/tree` UI.
784    pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
785        self.session
786            .load_full()
787            .branches()
788            .await
789            .map_err(|e| AppError::Config(format!("branches failed: {e}")))
790    }
791
792    /// Fire `session_before_switch` and translate `Cancelled` into
793    /// `AppError::HookCancelled`. Returns `Ok(())` on `Continue`.
794    async fn fire_session_before_switch(
795        &self,
796        reason: &str,
797        session_id: Option<&str>,
798    ) -> Result<()> {
799        use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
800        match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
801            HookOutcome::Continue => Ok(()),
802            HookOutcome::Cancelled {
803                extension_name,
804                reason,
805            } => Err(AppError::HookCancelled {
806                extension_name,
807                reason,
808            }),
809        }
810    }
811}
812
/// Issues synthetic `tool_N` ids for tool calls.
///
/// Each completion is paired with the earliest still-pending start of the
/// same tool name.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Number of the most recently issued id (`tool_1`, `tool_2`, ...).
    next_id: usize,
    /// In-flight calls as `(tool name, id)`, in the order they started.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Bump the counter and format the next `tool_N` id.
    fn fresh_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }

    /// Record a started call of tool `name` and return its new id.
    fn start(&mut self, name: &str) -> String {
        let id = self.fresh_id();
        self.pending.push_back((name.to_owned(), id.clone()));
        id
    }

    /// Retire the oldest pending call of tool `name` and return its id.
    ///
    /// If no call of that name is pending, a brand-new id is issued so the
    /// completion still carries a unique id.
    fn complete(&mut self, name: &str) -> String {
        let slot = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name);
        match slot.and_then(|idx| self.pending.remove(idx)) {
            Some((_, id)) => id,
            None => self.fresh_id(),
        }
    }

    /// Id to attach to a progress chunk, if it can be attributed safely.
    ///
    /// `ToolProgressChunk` carries no tool-call id. Progress can only be
    /// attributed unambiguously while exactly one call is pending. With
    /// several in flight, guessing from queue order (e.g. the newest entry)
    /// would mislabel output from concurrent calls, so `None` is returned.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() == 1 {
            self.pending.front().map(|(_, id)| id.clone())
        } else {
            None
        }
    }
}
853
854fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
855    match tracker.lock() {
856        Ok(guard) => guard,
857        Err(poisoned) => poisoned.into_inner(),
858    }
859}
860
861fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
862    lock_tool_tracker(tracker)
863        .progress_id()
864        .unwrap_or_else(|| "tool_unknown".to_string())
865}
866
867fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
868where
869    F: Fn(&str) -> Option<String>,
870{
871    env_lookup("ANTHROPIC_API_KEY")
872        .map(|key| key.trim().to_string())
873        .filter(|key| !key.is_empty())
874        .or_else(|| auth.api_key("anthropic").map(str::to_string))
875}
876
877fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
878    match ev {
879        AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
880        AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
881            let id = lock_tool_tracker(tool_tracker).start(&name);
882            Some(UiEvent::ToolCallStarted {
883                id,
884                name,
885                args: serde_json::json!({}),
886            })
887        }
888        AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
889            let id = lock_tool_tracker(tool_tracker).complete(&name);
890            Some(UiEvent::ToolCallCompleted {
891                id,
892                result: UiToolResult {
893                    is_error: result.is_error,
894                    text: format!("{name}: {result:?}"),
895                },
896            })
897        }
898        _ => None,
899    }
900}
901
/// Caller-supplied constructor for a custom tool set. It receives the
/// build-time `ToolCtx` and returns the tools to install. It is `FnOnce`
/// because the builder invokes it a single time during `build()` and keeps
/// the resulting `Vec` for every session it creates.
type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
903
/// Builder for [`App`].
///
/// Collects configuration, credentials, tools, permission wiring, session
/// persistence and extensions. It is consumed by `build()` /
/// `build_with_session()`.
pub struct AppBuilder {
    /// Legacy configuration; `settings` / `auth` override it when present.
    config: Option<Config>,
    /// Working directory; `None` means the process cwd (or `.`).
    cwd: Option<PathBuf>,
    /// Explicit permission gate; `None` falls back to `NoOpPermissionGate`.
    permission_gate: Option<Arc<dyn PermissionGate>>,
    /// Install the builtin tools (mutually exclusive with `custom_tools_factory`).
    install_builtin_tools: bool,
    /// Iteration limit forwarded to the session factory.
    max_iterations: usize,
    /// Caller-supplied LLM client, forwarded to the session factory.
    llm_override: Option<Arc<dyn LlmClient>>,
    /// One-shot custom tool constructor (mutually exclusive with builtins).
    custom_tools_factory: Option<CustomToolsFactory>,
    /// Path of the permissions policy file, if any.
    permissions_policy_path: Option<PathBuf>,
    /// Interactive UI event channel.
    ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
    /// Deny-instead-of-prompt permission mode (used by `--json`).
    headless_permissions: bool,
    /// User settings; synthesised from `config` when absent.
    settings: Option<crate::settings::Settings>,
    /// Provider credentials; seeded from `config` when absent.
    auth: Option<crate::auth::Auth>,
    /// Opt-out flag for AGENTS.md / CLAUDE.md discovery.
    context_discovery_disabled: bool,
    // M3 Phase A:
    /// Persistence backend; `None` means ephemeral sessions.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Session to resume; set by `build_with_session(Some(id))`.
    resume_session_id: Option<crate::session::SessionId>,
    /// Enable autocompact using thresholds from `settings`.
    autocompact_enabled: bool,
    // M4 Phase A:
    /// Skills for the system prompt and `/skill:<name>` expansion.
    skills: Vec<crate::skills::Skill>,
    // M4 Phase B:
    /// Extra tools (typically MCP), appended to the builtin or custom set.
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// MCP server handles kept for shutdown; the builder does not connect them.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
    /// Pre-built extension registry; `None` loads the extensions manifest.
    extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
    /// Diagnostics paired with an injected `extension_registry`.
    extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
    /// Shared token counter; a fresh one is created when `None`.
    token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
}
931
932impl Default for AppBuilder {
933    fn default() -> Self {
934        Self {
935            config: None,
936            cwd: None,
937            permission_gate: None,
938            install_builtin_tools: false,
939            max_iterations: 20,
940            llm_override: None,
941            custom_tools_factory: None,
942            permissions_policy_path: None,
943            ui_tx: None,
944            headless_permissions: false,
945            settings: None,
946            auth: None,
947            context_discovery_disabled: false,
948            session_store: None,
949            resume_session_id: None,
950            autocompact_enabled: false,
951            skills: Vec::new(),
952            extra_tools: Vec::new(),
953            mcp_servers: Vec::new(),
954            extension_registry: None,
955            extension_diagnostics: None,
956            token_tally: None,
957        }
958    }
959}
960
961impl AppBuilder {
962    pub fn new() -> Self {
963        Self::default()
964    }
965
966    pub fn with_config(mut self, cfg: Config) -> Self {
967        self.config = Some(cfg);
968        self
969    }
970
971    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
972        self.cwd = Some(cwd.into());
973        self
974    }
975
976    pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
977        self.permission_gate = Some(gate);
978        self
979    }
980
981    /// Install Capo's builtin tools (`read`, `bash`).
982    ///
983    /// This is mutually exclusive with `with_custom_tools_factory` /
984    /// `build_with_custom_tools`; `build()` returns a configuration error if
985    /// both are set.
986    pub fn with_builtin_tools(mut self) -> Self {
987        self.install_builtin_tools = true;
988        self
989    }
990
991    pub fn with_max_iterations(mut self, n: usize) -> Self {
992        self.max_iterations = n;
993        self
994    }
995
996    pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
997        self.llm_override = Some(llm);
998        self
999    }
1000
1001    pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1002        self.permissions_policy_path = Some(path);
1003        self
1004    }
1005
1006    pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1007        self.ui_tx = Some(tx);
1008        self
1009    }
1010
1011    /// Test/library hook to inject a pre-seeded token tally.
1012    pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1013        self.token_tally = Some(tally);
1014        self
1015    }
1016
1017    /// Register `PermissionExtension` in headless-deny mode — `permissions.toml`
1018    /// and read-only auto-allow still apply, but a tool call that would
1019    /// otherwise prompt is denied (there is no UI to ask). Used by `--json`.
1020    /// Ignored if `with_ui_channel` is also set (an interactive channel wins).
1021    pub fn with_headless_permissions(mut self) -> Self {
1022        self.headless_permissions = true;
1023        self
1024    }
1025
1026    /// M3: install user `Settings`. Replaces `with_config` for new code.
1027    pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1028        self.settings = Some(settings);
1029        self
1030    }
1031
1032    /// M3: install `Auth` (credentials for LLM providers).
1033    pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1034        self.auth = Some(auth);
1035        self
1036    }
1037
1038    /// M3: disable AGENTS.md / CLAUDE.md discovery for this App.
1039    /// Opt-out — discovery is on by default in `build()`.
1040    pub fn disable_context_discovery(mut self) -> Self {
1041        self.context_discovery_disabled = true;
1042        self
1043    }
1044
1045    /// M3 Phase A: install a `SessionStore` (e.g. `motosan_agent_loop::FileSessionStore`)
1046    /// for persistence. When omitted, sessions are ephemeral (no jsonl on disk).
1047    pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1048        self.session_store = Some(store);
1049        self
1050    }
1051
1052    /// M3 Phase A: enable autocompact at the `Settings::session.compact_at_context_pct`
1053    /// threshold. Requires `with_settings` to have been called; otherwise uses
1054    /// `Settings::default()`. Settings provide `max_context_tokens` and
1055    /// `keep_turns`. No-op when `settings.session.compact_at_context_pct == 0.0`.
1056    pub fn with_autocompact(mut self) -> Self {
1057        self.autocompact_enabled = true;
1058        self
1059    }
1060
1061    /// M4 Phase A: register skills. Pass an empty Vec (or omit the call,
1062    /// or call `without_skills()`) to disable skill injection. Skills are
1063    /// rendered into the system prompt's `<available_skills>` block when
1064    /// the `read` tool is available, and matched against `/skill:<name>`
1065    /// expansion before user messages reach the LLM.
1066    pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1067        self.skills = skills;
1068        self
1069    }
1070
1071    pub fn without_skills(mut self) -> Self {
1072        self.skills.clear();
1073        self
1074    }
1075
1076    /// M4 Phase B: register additional tools (typically MCP). Unlike
1077    /// `with_custom_tools_factory`, this APPENDS to the builtin tools
1078    /// and does NOT replace them.
1079    pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1080        self.extra_tools = tools;
1081        self
1082    }
1083
1084    pub fn with_extension_registry(
1085        mut self,
1086        registry: Arc<crate::extensions::ExtensionRegistry>,
1087    ) -> Self {
1088        self.extension_registry = Some(registry);
1089        self
1090    }
1091
1092    pub fn with_extension_diagnostics(
1093        mut self,
1094        diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1095    ) -> Self {
1096        self.extension_diagnostics = Some(diagnostics);
1097        self
1098    }
1099
1100    /// M4 Phase B: register MCP server handles so `App::disconnect_mcp`
1101    /// can iterate them on shutdown. Storage only — does not connect.
1102    pub fn with_mcp_servers(
1103        mut self,
1104        servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1105    ) -> Self {
1106        self.mcp_servers = servers;
1107        self
1108    }
1109
1110    /// Install a custom tool set for this app.
1111    ///
1112    /// This is mutually exclusive with `with_builtin_tools`; `build()` returns
1113    /// a configuration error if both are set.
1114    pub fn with_custom_tools_factory(
1115        mut self,
1116        factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1117    ) -> Self {
1118        self.custom_tools_factory = Some(Box::new(factory));
1119        self
1120    }
1121
1122    /// Convenience wrapper for `with_custom_tools_factory(...).build()`.
1123    ///
1124    /// This is mutually exclusive with `with_builtin_tools`.
1125    pub async fn build_with_custom_tools(
1126        self,
1127        factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1128    ) -> Result<App> {
1129        self.with_custom_tools_factory(factory).build().await
1130    }
1131
1132    /// M3 Phase A: build the App with an optional resume session id.
1133    ///
1134    /// - `Some(id)` + `session_store: Some(_)` → `AgentSession::resume(id, store, engine, llm)`,
1135    ///   then replay history into `ToolCtx.read_files`.
1136    /// - `Some(id)` + `session_store: None` → error (resume requires a store).
1137    /// - `None` + `session_store: Some(_)` → `AgentSession::new_with_store(fresh_id, store, engine, llm)`.
1138    /// - `None` + `session_store: None` → `AgentSession::new(engine, llm)` (ephemeral).
1139    pub async fn build_with_session(
1140        mut self,
1141        resume: Option<crate::session::SessionId>,
1142    ) -> Result<App> {
1143        if let Some(id) = resume {
1144            if self.session_store.is_none() {
1145                return Err(AppError::Config(
1146                    "build_with_session(Some(id)) requires with_session_store(...)".into(),
1147                ));
1148            }
1149            self.resume_session_id = Some(id);
1150        }
1151        self.build_internal().await
1152    }
1153
1154    /// Legacy entry point; equivalent to `build_with_session(None)`.
1155    pub async fn build(self) -> Result<App> {
1156        self.build_with_session(None).await
1157    }
1158
1159    async fn build_internal(mut self) -> Result<App> {
1160        let mcp_servers = std::mem::take(&mut self.mcp_servers);
1161        let extra_tools = std::mem::take(&mut self.extra_tools);
1162        let skills = self.skills.clone();
1163        if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1164            return Err(AppError::Config(
1165                "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1166            ));
1167        }
1168
1169        // Synthesise the legacy `Config` so the rest of `build()` keeps
1170        // working without a wholesale rewrite. Settings + Auth override the
1171        // deprecated `Config` when both APIs are supplied.
1172        let has_config = self.config.is_some();
1173        let has_auth = self.auth.is_some();
1174        let mut config = self.config.unwrap_or_default();
1175        let settings = match self.settings {
1176            Some(settings) => settings,
1177            None => {
1178                let mut settings = crate::settings::Settings::default();
1179                settings.model.provider = config.model.provider.clone();
1180                settings.model.name = config.model.name.clone();
1181                settings.model.max_tokens = config.model.max_tokens;
1182                settings
1183            }
1184        };
1185        config.model.provider = settings.model.provider.clone();
1186        config.model.name = settings.model.name.clone();
1187        config.model.max_tokens = settings.model.max_tokens;
1188        let mut auth = self.auth.unwrap_or_default();
1189        if !has_auth {
1190            if let Some(key) = config.anthropic.api_key.as_deref() {
1191                auth.0.insert(
1192                    "anthropic".into(),
1193                    crate::auth::ProviderAuth::ApiKey {
1194                        key: key.to_string(),
1195                    },
1196                );
1197            }
1198        }
1199        let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1200        if env_or_auth_key.is_some() || has_auth || !has_config {
1201            config.anthropic.api_key = env_or_auth_key;
1202        }
1203        let cwd = self
1204            .cwd
1205            .or_else(|| std::env::current_dir().ok())
1206            .unwrap_or_else(|| PathBuf::from("."));
1207        let agent_dir = crate::paths::agent_dir();
1208        let permission_gate = self.permission_gate.unwrap_or_else(|| {
1209            // When no gate is provided *and* no ui channel is wired,
1210            // fall back to NoOp with a warning log; when ui channel IS
1211            // wired, the PermissionExtension handles the real decisions.
1212            if self.ui_tx.is_some() || self.headless_permissions {
1213                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1214            } else {
1215                tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1216                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1217            }
1218        });
1219
1220        // Permissions.
1221        let policy: Arc<crate::permissions::Policy> =
1222            Arc::new(match self.permissions_policy_path.as_ref() {
1223                Some(path) => crate::permissions::Policy::load_or_default(path)?,
1224                None => crate::permissions::Policy::default(),
1225            });
1226        let session_cache = Arc::new(crate::permissions::SessionCache::new());
1227        let token_tally = self
1228            .token_tally
1229            .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1230
1231        // Shared progress channel consumed by `send_user_message`.
1232        let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1233
1234        // Resolve the tool set ONCE. Builtin tools, or a custom factory's
1235        // output, plus MCP `extra_tools`. A FnOnce custom factory can't be
1236        // re-run per session, so its output is captured here as a Vec.
1237        // `probe_ctx` also yields the build-time cancel token for `App`.
1238        let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1239        let cancel_token = probe_ctx.cancel_token.clone();
1240        let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1241            if let Some(factory_fn) = self.custom_tools_factory.take() {
1242                let mut t = factory_fn(probe_ctx);
1243                t.extend(extra_tools.clone());
1244                (false, t)
1245            } else {
1246                (self.install_builtin_tools, extra_tools.clone())
1247            };
1248
1249        let factory = SessionFactory {
1250            cwd: cwd.clone(),
1251            settings: Arc::new(Mutex::new(settings.clone())),
1252            auth: auth.clone(),
1253            policy: Arc::clone(&policy),
1254            session_cache: Arc::clone(&session_cache),
1255            ui_tx: self.ui_tx.clone(),
1256            headless_permissions: self.headless_permissions,
1257            permission_gate: Arc::clone(&permission_gate),
1258            progress_tx: progress_tx.clone(),
1259            skills: Arc::new(skills.clone()),
1260            install_builtin_tools: install_builtin,
1261            extra_tools: factory_extra_tools,
1262            max_iterations: self.max_iterations,
1263            context_discovery_disabled: self.context_discovery_disabled,
1264            autocompact_enabled: self.autocompact_enabled,
1265            session_store: self.session_store.clone(),
1266            llm_override: self.llm_override.clone(),
1267            current_model: Arc::new(Mutex::new(None)),
1268            cancel_token: cancel_token.clone(),
1269        };
1270
1271        let mode = match self.resume_session_id.take() {
1272            Some(id) => SessionMode::Resume(id.into_string()),
1273            None => SessionMode::New,
1274        };
1275        let (session, llm) = factory.build(mode, None, None).await?;
1276        let (extension_registry, extension_diagnostics) =
1277            if let Some(reg) = self.extension_registry.take() {
1278                let diagnostics = self
1279                    .extension_diagnostics
1280                    .unwrap_or_else(|| Arc::new(Vec::new()));
1281                (reg, diagnostics)
1282            } else {
1283                let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1284                let (registry, diagnostics) =
1285                    crate::extensions::load_extensions_manifest(&manifest_path).await;
1286                for d in &diagnostics {
1287                    let message = d.message.as_str();
1288                    match d.severity {
1289                        crate::extensions::DiagnosticSeverity::Warn => {
1290                            tracing::warn!("extensions: {message}");
1291                        }
1292                        crate::extensions::DiagnosticSeverity::Error => {
1293                            tracing::error!("extensions: {message}");
1294                        }
1295                    }
1296                }
1297                (Arc::new(registry), Arc::new(diagnostics))
1298            };
1299
1300        Ok(App {
1301            session: arc_swap::ArcSwap::from_pointee(session),
1302            llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1303            factory,
1304            config,
1305            cancel_token,
1306            progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1307            next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1308            skills: Arc::new(skills),
1309            mcp_servers,
1310            extension_registry,
1311            token_tally,
1312            extension_diagnostics,
1313            session_cache,
1314        })
1315    }
1316}
1317
1318#[cfg(test)]
1319mod tests {
1320    use super::*;
1321    use crate::config::{AnthropicConfig, ModelConfig};
1322    use crate::events::UiEvent;
1323    use crate::user_message::UserMessage;
1324    use async_trait::async_trait;
1325    use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1326    use motosan_agent_tool::ToolDef;
1327    use std::sync::atomic::{AtomicUsize, Ordering};
1328
1329    #[test]
1330    fn turn_stats_accum_accumulates_across_calls() {
1331        let mut accum = TurnStatsAccum::default();
1332        accum.add(motosan_agent_loop::TokenUsage {
1333            input_tokens: 100,
1334            output_tokens: 25,
1335        });
1336        accum.add(motosan_agent_loop::TokenUsage {
1337            input_tokens: 1000,
1338            output_tokens: 250,
1339        });
1340        assert_eq!(accum.cumulative_input, 1100);
1341        assert_eq!(accum.cumulative_output, 275);
1342        assert_eq!(accum.turn_count, 2);
1343    }
1344
1345    #[test]
1346    fn turn_stats_accum_saturates_on_overflow() {
1347        let mut accum = TurnStatsAccum {
1348            cumulative_input: u64::MAX - 5,
1349            cumulative_output: 0,
1350            turn_count: 1,
1351        };
1352        accum.add(motosan_agent_loop::TokenUsage {
1353            input_tokens: 100,
1354            output_tokens: 0,
1355        });
1356        assert_eq!(accum.cumulative_input, u64::MAX);
1357    }
1358
1359    #[tokio::test]
1360    async fn builder_fails_without_api_key() {
1361        let cfg = Config {
1362            anthropic: AnthropicConfig {
1363                api_key: None,
1364                base_url: "https://api.anthropic.com".into(),
1365            },
1366            model: ModelConfig {
1367                provider: "anthropic".into(),
1368                name: "claude-sonnet-4-6".into(),
1369                max_tokens: 4096,
1370            },
1371        };
1372        let err = match AppBuilder::new()
1373            .with_config(cfg)
1374            .with_builtin_tools()
1375            .build()
1376            .await
1377        {
1378            Ok(_) => panic!("must fail without key"),
1379            Err(err) => err,
1380        };
1381        assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1382    }
1383
1384    struct ToolOnlyLlm {
1385        turn: AtomicUsize,
1386    }
1387
1388    #[async_trait]
1389    impl LlmClient for ToolOnlyLlm {
1390        async fn chat(
1391            &self,
1392            _messages: &[Message],
1393            _tools: &[ToolDef],
1394        ) -> motosan_agent_loop::Result<ChatOutput> {
1395            let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1396            if turn == 0 {
1397                Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1398                    ToolCallItem {
1399                        id: "t1".into(),
1400                        name: "read".into(),
1401                        args: serde_json::json!({"path":"nope.txt"}),
1402                    },
1403                ])))
1404            } else {
1405                Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1406            }
1407        }
1408    }
1409
1410    #[tokio::test]
1411    async fn empty_final_message_is_not_emitted() {
1412        let dir = tempfile::tempdir().unwrap();
1413        let mut cfg = Config::default();
1414        cfg.anthropic.api_key = Some("sk-unused".into());
1415        let app = AppBuilder::new()
1416            .with_config(cfg)
1417            .with_cwd(dir.path())
1418            .with_builtin_tools()
1419            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1420                turn: AtomicUsize::new(0),
1421            }))
1422            .build()
1423            .await
1424            .expect("build");
1425        let events: Vec<UiEvent> =
1426            futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1427        let empties = events
1428            .iter()
1429            .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1430            .count();
1431        assert_eq!(
1432            empties, 0,
1433            "should not emit empty final message, got: {events:?}"
1434        );
1435    }
1436
1437    struct EchoLlm;
1438
1439    struct UsageLlm {
1440        responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1441    }
1442
1443    #[async_trait]
1444    impl LlmClient for UsageLlm {
1445        async fn chat(
1446            &self,
1447            _messages: &[Message],
1448            _tools: &[ToolDef],
1449        ) -> motosan_agent_loop::Result<ChatOutput> {
1450            let next = match self.responses.lock() {
1451                Ok(mut responses) => responses.pop_front(),
1452                Err(poisoned) => poisoned.into_inner().pop_front(),
1453            };
1454            Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1455        }
1456    }
1457
1458    #[async_trait]
1459    impl LlmClient for EchoLlm {
1460        async fn chat(
1461            &self,
1462            _messages: &[Message],
1463            _tools: &[ToolDef],
1464        ) -> motosan_agent_loop::Result<ChatOutput> {
1465            Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1466        }
1467    }
1468
1469    #[tokio::test]
1470    async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1471        use motosan_agent_loop::{LlmResponse, TokenUsage};
1472
1473        let dir = tempfile::tempdir().expect("tempdir");
1474        let mut cfg = Config::default();
1475        cfg.anthropic.api_key = Some("sk-unused".into());
1476        let llm = Arc::new(UsageLlm {
1477            responses: std::sync::Mutex::new(VecDeque::from([
1478                ChatOutput::with_usage(
1479                    LlmResponse::Message("first".into()),
1480                    TokenUsage {
1481                        input_tokens: 100,
1482                        output_tokens: 50,
1483                    },
1484                ),
1485                ChatOutput::with_usage(
1486                    LlmResponse::Message("second".into()),
1487                    TokenUsage {
1488                        input_tokens: 200,
1489                        output_tokens: 80,
1490                    },
1491                ),
1492            ])),
1493        });
1494        let app = AppBuilder::new()
1495            .with_config(cfg)
1496            .with_cwd(dir.path())
1497            .with_builtin_tools()
1498            .with_llm(llm)
1499            .build()
1500            .await
1501            .expect("build");
1502
1503        let events_1: Vec<UiEvent> = app
1504            .send_user_message(UserMessage::text("hi"))
1505            .collect()
1506            .await;
1507        let events_2: Vec<UiEvent> = app
1508            .send_user_message(UserMessage::text("again"))
1509            .collect()
1510            .await;
1511
1512        let ts1 = events_1
1513            .iter()
1514            .find_map(|e| match e {
1515                UiEvent::TurnStats {
1516                    input_tokens,
1517                    output_tokens,
1518                    cumulative_input,
1519                    cumulative_output,
1520                    ..
1521                } => Some((
1522                    *input_tokens,
1523                    *output_tokens,
1524                    *cumulative_input,
1525                    *cumulative_output,
1526                )),
1527                _ => None,
1528            })
1529            .expect("turn 1 had no TurnStats");
1530        assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1531
1532        let ts2 = events_2
1533            .iter()
1534            .find_map(|e| match e {
1535                UiEvent::TurnStats {
1536                    input_tokens,
1537                    output_tokens,
1538                    cumulative_input,
1539                    cumulative_output,
1540                    ..
1541                } => Some((
1542                    *input_tokens,
1543                    *output_tokens,
1544                    *cumulative_input,
1545                    *cumulative_output,
1546                )),
1547                _ => None,
1548            })
1549            .expect("turn 2 had no TurnStats");
1550        assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1551
1552        let positions: Vec<&str> = events_1
1553            .iter()
1554            .filter_map(|e| match e {
1555                UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1556                UiEvent::TurnStats { .. } => Some("stats"),
1557                UiEvent::AgentTurnComplete => Some("turn_complete"),
1558                _ => None,
1559            })
1560            .collect();
1561        assert_eq!(
1562            positions,
1563            vec!["msg_complete", "stats", "turn_complete"],
1564            "wrong ordering"
1565        );
1566    }
1567
1568    #[tokio::test]
1569    async fn turn_stats_reset_after_new_session() {
1570        use motosan_agent_loop::{LlmResponse, TokenUsage};
1571
1572        let dir = tempfile::tempdir().expect("tempdir");
1573        let mut cfg = Config::default();
1574        cfg.anthropic.api_key = Some("sk-unused".into());
1575        let llm = Arc::new(UsageLlm {
1576            responses: std::sync::Mutex::new(VecDeque::from([
1577                ChatOutput::with_usage(
1578                    LlmResponse::Message("first".into()),
1579                    TokenUsage {
1580                        input_tokens: 100,
1581                        output_tokens: 50,
1582                    },
1583                ),
1584                ChatOutput::with_usage(
1585                    LlmResponse::Message("after-new".into()),
1586                    TokenUsage {
1587                        input_tokens: 7,
1588                        output_tokens: 3,
1589                    },
1590                ),
1591            ])),
1592        });
1593        let app = AppBuilder::new()
1594            .with_config(cfg)
1595            .with_cwd(dir.path())
1596            .with_builtin_tools()
1597            .with_llm(llm)
1598            .build()
1599            .await
1600            .expect("build");
1601
1602        let _: Vec<UiEvent> = app
1603            .send_user_message(UserMessage::text("hi"))
1604            .collect()
1605            .await;
1606        app.new_session().await.expect("new session");
1607        let events: Vec<UiEvent> = app
1608            .send_user_message(UserMessage::text("after new"))
1609            .collect()
1610            .await;
1611        let stats = events
1612            .iter()
1613            .find_map(|event| match event {
1614                UiEvent::TurnStats {
1615                    input_tokens,
1616                    output_tokens,
1617                    cumulative_input,
1618                    cumulative_output,
1619                    ..
1620                } => Some((
1621                    *input_tokens,
1622                    *output_tokens,
1623                    *cumulative_input,
1624                    *cumulative_output,
1625                )),
1626                _ => None,
1627            })
1628            .expect("turn had no TurnStats");
1629        assert_eq!(stats, (7, 3, 7, 3));
1630    }
1631
1632    #[tokio::test]
1633    async fn with_headless_permissions_builds_an_app() {
1634        let dir = tempfile::tempdir().expect("tempdir");
1635        let mut config = Config::default();
1636        config.anthropic.api_key = Some("sk-unused".into());
1637        let app = AppBuilder::new()
1638            .with_config(config)
1639            .with_cwd(dir.path())
1640            .with_builtin_tools()
1641            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1642            .with_headless_permissions()
1643            .build()
1644            .await
1645            .expect("build");
1646        // Build succeeds and yields a usable session id.
1647        assert!(!app.session_id().is_empty());
1648    }
1649
1650    #[tokio::test]
1651    async fn new_session_swaps_in_a_fresh_empty_session() {
1652        use futures::StreamExt;
1653        let dir = tempfile::tempdir().expect("tempdir");
1654        let mut config = Config::default();
1655        config.anthropic.api_key = Some("sk-unused".into());
1656        let app = AppBuilder::new()
1657            .with_config(config)
1658            .with_cwd(dir.path())
1659            .with_builtin_tools()
1660            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1661            .build()
1662            .await
1663            .expect("build");
1664
1665        let _: Vec<_> = app
1666            .send_user_message(UserMessage::text("hello"))
1667            .collect()
1668            .await;
1669        let id_before = app.session_id();
1670        assert!(!app.session_history().await.expect("history").is_empty());
1671
1672        app.new_session().await.expect("new_session");
1673
1674        assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1675        assert!(
1676            app.session_history().await.expect("history").is_empty(),
1677            "fresh session has no history"
1678        );
1679    }
1680
1681    #[tokio::test]
1682    async fn load_session_restores_a_stored_session_by_id() {
1683        use futures::StreamExt;
1684        let dir = tempfile::tempdir().expect("tempdir");
1685        let store_dir = dir.path().join("sessions");
1686        let store: Arc<dyn SessionStore> =
1687            Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1688        let mut config = Config::default();
1689        config.anthropic.api_key = Some("sk-unused".into());
1690        let app = AppBuilder::new()
1691            .with_config(config)
1692            .with_cwd(dir.path())
1693            .with_builtin_tools()
1694            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1695            .with_session_store(Arc::clone(&store))
1696            .build()
1697            .await
1698            .expect("build");
1699
1700        let _: Vec<_> = app
1701            .send_user_message(UserMessage::text("remember this"))
1702            .collect()
1703            .await;
1704        let original_id = app.session_id();
1705
1706        app.new_session().await.expect("new_session");
1707        assert_ne!(app.session_id(), original_id);
1708
1709        app.load_session(&original_id).await.expect("load_session");
1710        assert_eq!(app.session_id(), original_id);
1711        let history = app.session_history().await.expect("history");
1712        assert!(
1713            history.iter().any(|m| m.text().contains("remember this")),
1714            "loaded session should carry the original turn"
1715        );
1716    }
1717
1718    #[tokio::test]
1719    async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1720        use futures::StreamExt;
1721        let dir = tempfile::tempdir().expect("tempdir");
1722        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1723            dir.path().join("s"),
1724        ));
1725        let mut config = Config::default();
1726        config.anthropic.api_key = Some("sk-unused".into());
1727        let app = AppBuilder::new()
1728            .with_config(config)
1729            .with_cwd(dir.path())
1730            .with_builtin_tools()
1731            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1732            .with_session_store(store)
1733            .build()
1734            .await
1735            .expect("build");
1736
1737        let _: Vec<_> = app
1738            .send_user_message(UserMessage::text("hello"))
1739            .collect()
1740            .await;
1741        let original_id = app.session_id();
1742
1743        let new_id = app.clone_session().await.expect("clone_session");
1744
1745        // The clone has a distinct id, and the live session switched to it.
1746        assert_ne!(new_id, original_id);
1747        assert_eq!(app.session_id(), new_id);
1748        // The copy carries the same conversation.
1749        let history = app.session_history().await.expect("history");
1750        assert!(history.iter().any(|m| m.text().contains("hello")));
1751    }
1752
1753    #[tokio::test]
1754    async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1755        use futures::StreamExt;
1756        let dir = tempfile::tempdir().expect("tempdir");
1757        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1758            dir.path().join("s"),
1759        ));
1760        let mut config = Config::default();
1761        config.anthropic.api_key = Some("sk-unused".into());
1762        let app = AppBuilder::new()
1763            .with_config(config)
1764            .with_cwd(dir.path())
1765            .with_builtin_tools()
1766            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1767            .with_session_store(store)
1768            .build()
1769            .await
1770            .expect("build");
1771
1772        // Two linear turns.
1773        let _: Vec<_> = app
1774            .send_user_message(UserMessage::text("first"))
1775            .collect()
1776            .await;
1777        let _: Vec<_> = app
1778            .send_user_message(UserMessage::text("second"))
1779            .collect()
1780            .await;
1781
1782        // The EntryId of the first user message.
1783        let entries = app.session.load_full().entries().await.expect("entries");
1784        let first_id = entries
1785            .iter()
1786            .find_map(|stored| {
1787                let msg = stored.entry.as_message()?;
1788                (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1789                    .then(|| stored.id.clone())
1790            })
1791            .expect("first user message present");
1792
1793        // Fork from it.
1794        let _: Vec<_> = app
1795            .fork_from(first_id, UserMessage::text("branched"))
1796            .collect()
1797            .await;
1798
1799        let history = app.session_history().await.expect("history");
1800        let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1801        assert!(
1802            texts.iter().any(|t| t.contains("first")),
1803            "fork keeps the fork-point ancestor"
1804        );
1805        assert!(
1806            texts.iter().any(|t| t.contains("branched")),
1807            "fork includes the new message"
1808        );
1809        assert!(
1810            !texts.iter().any(|t| t.contains("second")),
1811            "fork excludes the abandoned branch"
1812        );
1813    }
1814
1815    #[tokio::test]
1816    async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1817        use futures::StreamExt;
1818        let dir = tempfile::tempdir().expect("tempdir");
1819        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1820            dir.path().join("s"),
1821        ));
1822        let mut config = Config::default();
1823        config.anthropic.api_key = Some("sk-unused".into());
1824        let app = AppBuilder::new()
1825            .with_config(config)
1826            .with_cwd(dir.path())
1827            .with_builtin_tools()
1828            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1829            .with_session_store(store)
1830            .build()
1831            .await
1832            .expect("build");
1833
1834        let _: Vec<_> = app
1835            .send_user_message(UserMessage::text("alpha"))
1836            .collect()
1837            .await;
1838        let _: Vec<_> = app
1839            .send_user_message(UserMessage::text("bravo"))
1840            .collect()
1841            .await;
1842
1843        let candidates = app.fork_candidates().await.expect("candidates");
1844        let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1845        // Newest first.
1846        assert!(previews[0].contains("bravo"), "got {previews:?}");
1847        assert!(previews.iter().any(|p| p.contains("alpha")));
1848        // Every id is non-empty and distinct.
1849        assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1850    }
1851
1852    #[tokio::test]
1853    async fn branches_returns_a_tree_for_a_linear_session() {
1854        use futures::StreamExt;
1855        let dir = tempfile::tempdir().expect("tempdir");
1856        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1857            dir.path().join("s"),
1858        ));
1859        let mut config = Config::default();
1860        config.anthropic.api_key = Some("sk-unused".into());
1861        let app = AppBuilder::new()
1862            .with_config(config)
1863            .with_cwd(dir.path())
1864            .with_builtin_tools()
1865            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1866            .with_session_store(store)
1867            .build()
1868            .await
1869            .expect("build");
1870
1871        let _: Vec<_> = app
1872            .send_user_message(UserMessage::text("hello"))
1873            .collect()
1874            .await;
1875        let tree = app.branches().await.expect("branches");
1876        // A linear 1-turn session: non-empty node list, an active leaf.
1877        assert!(!tree.nodes.is_empty());
1878        assert!(tree.active_leaf.is_some());
1879    }
1880
1881    #[tokio::test]
1882    async fn reload_settings_rebuilds_session_and_resets_token_tally() {
1883        let dir = tempfile::tempdir().expect("tempdir");
1884        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1885            dir.path().join("s"),
1886        ));
1887        let mut cfg = Config::default();
1888        cfg.anthropic.api_key = Some("sk-unused".into());
1889
1890        let llm = Arc::new(UsageLlm {
1891            responses: std::sync::Mutex::new(VecDeque::from([
1892                ChatOutput::with_usage(
1893                    LlmResponse::Message("first".into()),
1894                    motosan_agent_loop::TokenUsage {
1895                        input_tokens: 100,
1896                        output_tokens: 50,
1897                    },
1898                ),
1899                ChatOutput::with_usage(
1900                    LlmResponse::Message("after-reload".into()),
1901                    motosan_agent_loop::TokenUsage {
1902                        input_tokens: 5,
1903                        output_tokens: 2,
1904                    },
1905                ),
1906            ])),
1907        });
1908        let app = AppBuilder::new()
1909            .with_config(cfg)
1910            .with_cwd(dir.path())
1911            .with_builtin_tools()
1912            .with_llm(llm)
1913            .with_session_store(store)
1914            .build()
1915            .await
1916            .expect("build");
1917
1918        let _: Vec<UiEvent> = app
1919            .send_user_message(crate::user_message::UserMessage::text("hi"))
1920            .collect()
1921            .await;
1922        {
1923            let token_tally = app.token_tally();
1924            let tally = token_tally.lock().await;
1925            assert_eq!(tally.cumulative_input, 100);
1926            assert_eq!(tally.cumulative_output, 50);
1927            assert_eq!(tally.turn_count, 1);
1928        }
1929
1930        let mut new_settings = crate::settings::Settings::default();
1931        new_settings.model.name = "claude-opus-4-7".into();
1932        new_settings.ui.footer_show_cost = false;
1933        app.reload_settings(new_settings).await.expect("reload");
1934        assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
1935        assert!(!app.factory.settings().ui.footer_show_cost);
1936        assert_eq!(app.factory.current_model(), None);
1937
1938        {
1939            let token_tally = app.token_tally();
1940            let tally = token_tally.lock().await;
1941            assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
1942            assert_eq!(tally.cumulative_output, 0);
1943            assert_eq!(tally.turn_count, 0);
1944        }
1945
1946        let _: Vec<UiEvent> = app
1947            .send_user_message(crate::user_message::UserMessage::text("after"))
1948            .collect()
1949            .await;
1950        {
1951            let token_tally = app.token_tally();
1952            let tally = token_tally.lock().await;
1953            assert_eq!(tally.cumulative_input, 5);
1954            assert_eq!(tally.cumulative_output, 2);
1955            assert_eq!(tally.turn_count, 1);
1956        }
1957    }
1958
1959    #[tokio::test]
1960    async fn switch_model_preserves_history() {
1961        use futures::StreamExt;
1962        let dir = tempfile::tempdir().expect("tempdir");
1963        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1964            dir.path().join("s"),
1965        ));
1966        let mut config = Config::default();
1967        config.anthropic.api_key = Some("sk-unused".into());
1968        let app = AppBuilder::new()
1969            .with_config(config)
1970            .with_cwd(dir.path())
1971            .with_builtin_tools()
1972            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1973            .with_session_store(store)
1974            .build()
1975            .await
1976            .expect("build");
1977
1978        let _: Vec<_> = app
1979            .send_user_message(UserMessage::text("keep me"))
1980            .collect()
1981            .await;
1982        let id_before = app.session_id();
1983
1984        app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
1985            .await
1986            .expect("switch_model");
1987
1988        assert_eq!(
1989            app.session_id(),
1990            id_before,
1991            "switch_model keeps the same session"
1992        );
1993        let history = app.session_history().await.expect("history");
1994        assert!(history.iter().any(|m| m.text().contains("keep me")));
1995    }
1996
1997    #[tokio::test]
1998    async fn switch_model_is_sticky_for_future_session_rebuilds() {
1999        let dir = tempfile::tempdir().expect("tempdir");
2000        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2001            dir.path().join("s"),
2002        ));
2003        let mut config = Config::default();
2004        config.anthropic.api_key = Some("sk-unused".into());
2005        let app = AppBuilder::new()
2006            .with_config(config)
2007            .with_cwd(dir.path())
2008            .with_builtin_tools()
2009            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2010            .with_session_store(store)
2011            .build()
2012            .await
2013            .expect("build");
2014
2015        let selected = crate::model::ModelId::from("claude-opus-4-7");
2016        app.switch_model(&selected).await.expect("switch_model");
2017        app.new_session().await.expect("new_session");
2018
2019        assert_eq!(app.factory.current_model(), Some(selected.clone()));
2020        assert_eq!(app.settings().model.name, selected.to_string());
2021    }
2022
2023    struct SleepThenDoneLlm {
2024        turn: AtomicUsize,
2025    }
2026
2027    #[async_trait]
2028    impl LlmClient for SleepThenDoneLlm {
2029        async fn chat(
2030            &self,
2031            _messages: &[Message],
2032            _tools: &[ToolDef],
2033        ) -> motosan_agent_loop::Result<ChatOutput> {
2034            let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2035            if turn == 0 {
2036                Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2037                    ToolCallItem {
2038                        id: "sleep".into(),
2039                        name: "bash".into(),
2040                        args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2041                    },
2042                ])))
2043            } else {
2044                Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2045            }
2046        }
2047    }
2048
2049    #[tokio::test]
2050    async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2051        use futures::StreamExt;
2052        let dir = tempfile::tempdir().expect("tempdir");
2053        let mut config = Config::default();
2054        config.anthropic.api_key = Some("sk-unused".into());
2055        let app = Arc::new(
2056            AppBuilder::new()
2057                .with_config(config)
2058                .with_cwd(dir.path())
2059                .with_builtin_tools()
2060                .with_llm(Arc::new(SleepThenDoneLlm {
2061                    turn: AtomicUsize::new(0),
2062                }) as Arc<dyn LlmClient>)
2063                .build()
2064                .await
2065                .expect("build"),
2066        );
2067
2068        app.new_session().await.expect("new_session");
2069        let running_app = Arc::clone(&app);
2070        let handle = tokio::spawn(async move {
2071            running_app
2072                .send_user_message(UserMessage::text("run a slow command"))
2073                .collect::<Vec<_>>()
2074                .await
2075        });
2076        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2077        app.cancel();
2078
2079        let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2080            .await
2081            .expect("turn should finish after cancellation")
2082            .expect("join");
2083        assert!(
2084            events.iter().any(|event| {
2085                matches!(
2086                    event,
2087                    UiEvent::ToolCallCompleted { result, .. }
2088                        if result.text.contains("command cancelled by user")
2089                )
2090            }),
2091            "cancel should reach the rebuilt bash tool: {events:?}"
2092        );
2093    }
2094
2095    #[tokio::test]
2096    async fn compact_summarizes_a_session_with_enough_history() {
2097        struct DoneLlm;
2098        #[async_trait]
2099        impl LlmClient for DoneLlm {
2100            async fn chat(
2101                &self,
2102                _messages: &[Message],
2103                _tools: &[ToolDef],
2104            ) -> motosan_agent_loop::Result<ChatOutput> {
2105                Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2106            }
2107        }
2108
2109        let dir = tempfile::tempdir().expect("tempdir");
2110        let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2111            dir.path().join("sessions"),
2112        ));
2113        let mut config = Config::default();
2114        config.anthropic.api_key = Some("sk-unused".into());
2115        let app = AppBuilder::new()
2116            .with_config(config)
2117            .with_cwd(dir.path())
2118            .with_builtin_tools()
2119            .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2120            .with_session_store(store)
2121            .build()
2122            .await
2123            .expect("build");
2124
2125        // Run 4 user turns. `App::compact()` uses ThresholdStrategy with the
2126        // default keep_turns (3); `select_cutoff` only returns a cutoff once
2127        // there are >= keep_turns user messages — so a 1-turn session would
2128        // make compact() a no-op (Ok, but nothing summarized). 4 turns
2129        // guarantees the real compaction path (and the DoneLlm summarizer
2130        // call) is exercised.
2131        for i in 0..4 {
2132            let _: Vec<_> = app
2133                .send_user_message(UserMessage::text(format!("turn {i}")))
2134                .collect()
2135                .await;
2136        }
2137
2138        app.compact().await.expect("compact should succeed");
2139
2140        // The compaction marker is appended as a session entry — history
2141        // after compaction is shorter than the raw 4-turn transcript.
2142        let history = app.session_history().await.expect("history");
2143        assert!(
2144            !history.is_empty(),
2145            "session should still have content post-compaction"
2146        );
2147    }
2148
2149    #[test]
2150    fn anthropic_env_api_key_overrides_auth_json_key() {
2151        let mut auth = crate::auth::Auth::default();
2152        auth.0.insert(
2153            "anthropic".into(),
2154            crate::auth::ProviderAuth::ApiKey {
2155                key: "sk-auth".into(),
2156            },
2157        );
2158
2159        let key = anthropic_api_key_from(&auth, |name| {
2160            (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
2161        });
2162        assert_eq!(key.as_deref(), Some("sk-env"));
2163    }
2164
2165    #[tokio::test]
2166    async fn with_settings_overrides_deprecated_config_model() {
2167        use crate::settings::Settings;
2168
2169        let mut config = Config::default();
2170        config.model.name = "from-config".into();
2171        config.anthropic.api_key = Some("sk-config".into());
2172
2173        let mut settings = Settings::default();
2174        settings.model.name = "from-settings".into();
2175
2176        let tmp = tempfile::tempdir().unwrap();
2177        let app = AppBuilder::new()
2178            .with_config(config)
2179            .with_settings(settings)
2180            .with_cwd(tmp.path())
2181            .disable_context_discovery()
2182            .with_llm(Arc::new(EchoLlm))
2183            .build()
2184            .await
2185            .expect("build");
2186        assert_eq!(app.config().model.name, "from-settings");
2187        assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2188    }
2189
2190    #[tokio::test]
2191    async fn with_settings_synthesises_legacy_config_for_build() {
2192        use crate::auth::{Auth, ProviderAuth};
2193        use crate::settings::Settings;
2194
2195        let mut settings = Settings::default();
2196        settings.model.name = "claude-sonnet-4-6".into();
2197
2198        let mut auth = Auth::default();
2199        auth.0.insert(
2200            "anthropic".into(),
2201            ProviderAuth::ApiKey {
2202                key: "sk-test".into(),
2203            },
2204        );
2205
2206        let tmp = tempfile::tempdir().unwrap();
2207        let app = AppBuilder::new()
2208            .with_settings(settings)
2209            .with_auth(auth)
2210            .with_cwd(tmp.path())
2211            .with_builtin_tools()
2212            .disable_context_discovery()
2213            .with_llm(Arc::new(EchoLlm))
2214            .build()
2215            .await
2216            .expect("build");
2217        let _ = app;
2218    }
2219
2220    #[tokio::test]
2221    async fn cancel_before_turn_does_not_poison_future_turns() {
2222        let dir = tempfile::tempdir().unwrap();
2223        let mut cfg = Config::default();
2224        cfg.anthropic.api_key = Some("sk-unused".into());
2225        let app = AppBuilder::new()
2226            .with_config(cfg)
2227            .with_cwd(dir.path())
2228            .with_builtin_tools()
2229            .with_llm(std::sync::Arc::new(EchoLlm))
2230            .build()
2231            .await
2232            .expect("build");
2233
2234        app.cancel();
2235        let events: Vec<UiEvent> = app
2236            .send_user_message(UserMessage::text("x"))
2237            .collect()
2238            .await;
2239
2240        assert!(
2241            events
2242                .iter()
2243                .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2244            "turn should use a fresh cancellation token: {events:?}"
2245        );
2246    }
2247
2248    #[test]
2249    fn map_event_matches_started_and_completed_ids_by_tool_name() {
2250        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2251
2252        let started_bash = map_event(
2253            AgentEvent::Core(CoreEvent::ToolStarted {
2254                name: "bash".into(),
2255            }),
2256            &tracker,
2257        );
2258        let started_read = map_event(
2259            AgentEvent::Core(CoreEvent::ToolStarted {
2260                name: "read".into(),
2261            }),
2262            &tracker,
2263        );
2264        let completed_bash = map_event(
2265            AgentEvent::Core(CoreEvent::ToolCompleted {
2266                name: "bash".into(),
2267                result: motosan_agent_tool::ToolResult::text("ok"),
2268            }),
2269            &tracker,
2270        );
2271        let completed_read = map_event(
2272            AgentEvent::Core(CoreEvent::ToolCompleted {
2273                name: "read".into(),
2274                result: motosan_agent_tool::ToolResult::text("ok"),
2275            }),
2276            &tracker,
2277        );
2278
2279        assert!(matches!(
2280            started_bash,
2281            Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
2282        ));
2283        assert!(matches!(
2284            started_read,
2285            Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
2286        ));
2287        assert!(matches!(
2288            completed_bash,
2289            Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
2290        ));
2291        assert!(matches!(
2292            completed_read,
2293            Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
2294        ));
2295    }
2296
2297    #[test]
2298    fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
2299        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2300        let s1 = map_event(
2301            AgentEvent::Core(CoreEvent::ToolStarted {
2302                name: "bash".into(),
2303            }),
2304            &tracker,
2305        );
2306        let s2 = map_event(
2307            AgentEvent::Core(CoreEvent::ToolStarted {
2308                name: "bash".into(),
2309            }),
2310            &tracker,
2311        );
2312        let c1 = map_event(
2313            AgentEvent::Core(CoreEvent::ToolCompleted {
2314                name: "bash".into(),
2315                result: motosan_agent_tool::ToolResult::text("a"),
2316            }),
2317            &tracker,
2318        );
2319        let c2 = map_event(
2320            AgentEvent::Core(CoreEvent::ToolCompleted {
2321                name: "bash".into(),
2322                result: motosan_agent_tool::ToolResult::text("b"),
2323            }),
2324            &tracker,
2325        );
2326
2327        let id_s1 = match s1 {
2328            Some(UiEvent::ToolCallStarted { id, .. }) => id,
2329            other => panic!("{other:?}"),
2330        };
2331        let id_s2 = match s2 {
2332            Some(UiEvent::ToolCallStarted { id, .. }) => id,
2333            other => panic!("{other:?}"),
2334        };
2335        let id_c1 = match c1 {
2336            Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2337            other => panic!("{other:?}"),
2338        };
2339        let id_c2 = match c2 {
2340            Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2341            other => panic!("{other:?}"),
2342        };
2343
2344        assert_eq!(id_s1, id_c1);
2345        assert_eq!(id_s2, id_c2);
2346        assert_ne!(id_s1, id_s2);
2347    }
2348
2349    #[tokio::test]
2350    async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2351        let dir = tempfile::tempdir().unwrap();
2352        let mut cfg = Config::default();
2353        cfg.anthropic.api_key = Some("sk-unused".into());
2354        let app = AppBuilder::new()
2355            .with_config(cfg)
2356            .with_cwd(dir.path())
2357            .with_builtin_tools()
2358            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2359                turn: AtomicUsize::new(0),
2360            }))
2361            .build()
2362            .await
2363            .expect("build");
2364
2365        let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2366        let first_event = first.next().await;
2367        assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2368
2369        let second_events: Vec<UiEvent> = app
2370            .send_user_message(UserMessage::text("second"))
2371            .collect()
2372            .await;
2373        assert_eq!(
2374            second_events.len(),
2375            1,
2376            "expected immediate single error event, got: {second_events:?}"
2377        );
2378        assert!(matches!(
2379            &second_events[0],
2380            UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2381        ));
2382    }
2383
2384    #[tokio::test]
2385    async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2386        // Mirror of the concurrent_send_user_message... test above, but the
2387        // second call carries a bad attachment. The single-turn guard would
2388        // emit UiEvent::Error("...single-turn-per-App"); we expect the EARLIER
2389        // prepare_user_message step to short-circuit with AttachmentError
2390        // instead. See spec §3 ("prepare runs before guard") and §7.1.
2391        let dir = tempfile::tempdir().unwrap();
2392        let mut cfg = Config::default();
2393        cfg.anthropic.api_key = Some("sk-unused".into());
2394        let app = AppBuilder::new()
2395            .with_config(cfg)
2396            .with_cwd(dir.path())
2397            .with_builtin_tools()
2398            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2399                turn: AtomicUsize::new(0),
2400            }))
2401            .build()
2402            .await
2403            .expect("build");
2404
2405        // Park the first turn (claims the single-turn lock).
2406        let mut first =
2407            Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2408        let first_event = first.next().await;
2409        assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2410
2411        // Fire a second call with a bad attachment.
2412        let bad = crate::user_message::UserMessage {
2413            text: "second".into(),
2414            attachments: vec![crate::user_message::Attachment::Image {
2415                path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2416            }],
2417        };
2418        let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2419
2420        assert_eq!(
2421            second_events.len(),
2422            1,
2423            "expected exactly one event (the attachment error); got: {second_events:?}"
2424        );
2425        assert!(
2426            matches!(
2427                &second_events[0],
2428                UiEvent::AttachmentError {
2429                    kind: crate::user_message::AttachmentErrorKind::NotFound,
2430                    ..
2431                }
2432            ),
2433            "expected AttachmentError::NotFound as first event; got {second_events:?}"
2434        );
2435    }
2436
2437    #[test]
2438    fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
2439        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2440        assert_eq!(progress_event_id(&tracker), "tool_unknown");
2441
2442        let only = map_event(
2443            AgentEvent::Core(CoreEvent::ToolStarted {
2444                name: "bash".into(),
2445            }),
2446            &tracker,
2447        );
2448        let only_id = match only {
2449            Some(UiEvent::ToolCallStarted { id, .. }) => id,
2450            other => panic!("{other:?}"),
2451        };
2452        assert_eq!(progress_event_id(&tracker), only_id);
2453
2454        let _second = map_event(
2455            AgentEvent::Core(CoreEvent::ToolStarted {
2456                name: "read".into(),
2457            }),
2458            &tracker,
2459        );
2460        assert_eq!(progress_event_id(&tracker), "tool_unknown");
2461    }
2462
2463    #[tokio::test]
2464    async fn builder_rejects_builtin_and_custom_tools_together() {
2465        let mut cfg = Config::default();
2466        cfg.anthropic.api_key = Some("sk-unused".into());
2467        let dir = tempfile::tempdir().unwrap();
2468        let err = match AppBuilder::new()
2469            .with_config(cfg)
2470            .with_cwd(dir.path())
2471            .with_builtin_tools()
2472            .with_custom_tools_factory(|_| Vec::new())
2473            .build()
2474            .await
2475        {
2476            Ok(_) => panic!("must reject conflicting tool configuration"),
2477            Err(err) => err,
2478        };
2479
2480        assert!(format!("{err}").contains("mutually exclusive"));
2481    }
2482
2483    /// M3 Phase A smoke: two turns share history when a SessionStore is wired.
2484    #[tokio::test]
2485    async fn two_turns_in_same_session_share_history() {
2486        #[derive(Default)]
2487        struct CounterLlm {
2488            turn: AtomicUsize,
2489        }
2490        #[async_trait]
2491        impl LlmClient for CounterLlm {
2492            async fn chat(
2493                &self,
2494                messages: &[Message],
2495                _tools: &[ToolDef],
2496            ) -> motosan_agent_loop::Result<ChatOutput> {
2497                let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2498                let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2499                Ok(ChatOutput::new(LlmResponse::Message(answer)))
2500            }
2501        }
2502
2503        let tmp = tempfile::tempdir().unwrap();
2504        let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2505            tmp.path().to_path_buf(),
2506        ));
2507
2508        let app = AppBuilder::new()
2509            .with_settings(crate::settings::Settings::default())
2510            .with_auth(crate::auth::Auth::default())
2511            .with_cwd(tmp.path())
2512            .with_builtin_tools()
2513            .disable_context_discovery()
2514            .with_llm(std::sync::Arc::new(CounterLlm::default()))
2515            .with_session_store(store)
2516            .build_with_session(None)
2517            .await
2518            .expect("build");
2519
2520        let _events1: Vec<UiEvent> = app
2521            .send_user_message(UserMessage::text("hi"))
2522            .collect()
2523            .await;
2524        let events2: Vec<UiEvent> = app
2525            .send_user_message(UserMessage::text("again"))
2526            .collect()
2527            .await;
2528
2529        // Turn 2's LLM saw turn 1's user message + turn 1's assistant + turn 2's new user.
2530        let saw_more_than_one = events2.iter().any(|e| {
2531            matches!(
2532                e,
2533                UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2534            )
2535        });
2536        assert!(
2537            saw_more_than_one,
2538            "second turn should have seen history; events: {events2:?}"
2539        );
2540    }
2541}
2542
#[cfg(test)]
mod skills_builder_tests {
    //! Tests for the skill-related `AppBuilder` setters.
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// A minimal global skill named "x" used as builder input.
    fn sample_skill() -> Skill {
        Skill {
            source: SkillSource::Global,
            disable_model_invocation: false,
            base_dir: PathBuf::from("/"),
            file_path: PathBuf::from("/x.md"),
            description: "d".into(),
            name: "x".into(),
        }
    }

    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);
        assert_eq!(builder.skills.len(), 1);
        let stored = &builder.skills[0];
        assert_eq!(stored.name, "x");
    }

    #[test]
    fn without_skills_clears() {
        let cleared = AppBuilder::new()
            .with_skills(vec![sample_skill()])
            .without_skills();
        assert!(cleared.skills.is_empty());
    }
}
2575
#[cfg(test)]
mod mcp_builder_tests {
    //! Tests for `AppBuilder::with_extra_tools`.
    use super::*;
    use motosan_agent_tool::Tool;

    /// Boxed future type returned by `Tool::call`.
    type BoxedCall<'a> = std::pin::Pin<
        Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + 'a>,
    >;

    /// Stand-in tool whose only job is to be wrapped in an `Arc` and handed
    /// to `with_extra_tools`; calling it always yields the text "ok".
    struct StubEchoTool;

    impl Tool for StubEchoTool {
        fn def(&self) -> motosan_agent_tool::ToolDef {
            let input_schema = serde_json::json!({"type": "object"});
            motosan_agent_tool::ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &motosan_agent_tool::ToolContext,
        ) -> BoxedCall<'_> {
            Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
        }
    }

    #[test]
    fn with_extra_tools_stores_tools() {
        let builder =
            AppBuilder::new().with_extra_tools(vec![Arc::new(StubEchoTool) as Arc<dyn Tool>]);
        assert_eq!(builder.extra_tools.len(), 1);
    }
}
2608}