Skip to main content

capo_agent/
app.rs

1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9    AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10    CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
/// Selects how `SessionFactory::build` obtains the `AgentSession` it returns.
#[derive(Clone, Debug)]
pub(crate) enum SessionMode {
    /// Start an empty conversation under a freshly generated id. It is
    /// persisted when a session store is configured and ephemeral otherwise.
    New,
    /// Reload the stored session with this id. `build` fails when no
    /// session store is configured.
    Resume(String),
}
31
32/// Everything needed to (re)build an `AgentSession` + its `LlmClient`.
33/// Stored on `App` so `new_session` / `load_session` / `switch_model` can
34/// rebuild without re-running `AppBuilder`. All fields are cheap to clone
35/// (owned values or `Arc`s).
36struct SharedLlm {
37    client: Arc<dyn LlmClient>,
38}
39
40impl SharedLlm {
41    fn new(client: Arc<dyn LlmClient>) -> Self {
42        Self { client }
43    }
44
45    fn client(&self) -> Arc<dyn LlmClient> {
46        Arc::clone(&self.client)
47    }
48}
49
/// Everything needed to (re)build an `AgentSession` together with its
/// `LlmClient`.
///
/// Stored on `App` so `new_session`, `load_session` and `switch_model` can
/// rebuild without re-running `AppBuilder`.
pub(crate) struct SessionFactory {
    /// Working directory handed to the tools, the permission extension,
    /// the motosan `ToolContext` and project-context discovery.
    cwd: PathBuf,
    /// Base settings. `build` clones them and may override the model name
    /// on the clone.
    settings: crate::settings::Settings,
    /// Credentials used by `build_llm_client`.
    auth: crate::auth::Auth,
    /// Permission policy passed to `PermissionExtension`.
    policy: Arc<crate::permissions::Policy>,
    /// "Allow for session" decisions shared with `PermissionExtension`.
    session_cache: Arc<crate::permissions::SessionCache>,
    /// When set, permissions are resolved interactively over this channel.
    /// This takes precedence over `headless_permissions`.
    ui_tx: Option<mpsc::Sender<UiEvent>>,
    /// Install the headless (deny-on-prompt) permission extension. Only
    /// consulted when `ui_tx` is `None`.
    headless_permissions: bool,
    /// Gate handed to every rebuilt `ToolCtx`.
    permission_gate: Arc<dyn PermissionGate>,
    /// The SHARED progress sender. Every rebuilt tool set must use this
    /// exact sender so that `App.progress_rx` keeps receiving progress from
    /// whichever session is currently live.
    progress_tx: mpsc::Sender<ToolProgressChunk>,
    /// Skills advertised in the system prompt.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Whether Capo's builtin tools are installed on each rebuild.
    install_builtin_tools: bool,
    /// Additional tools appended after the builtins (e.g. MCP-provided tools
    /// — presumably; confirm against `AppBuilder::build`).
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// Engine iteration cap.
    max_iterations: usize,
    /// Skip AGENTS.md / CLAUDE.md discovery when assembling the system prompt.
    context_discovery_disabled: bool,
    /// Install `AutocompactExtension`. It is also subject to
    /// `settings.session.compact_at_context_pct` lying strictly inside (0, 1).
    autocompact_enabled: bool,
    /// Persistence backend. Required for `SessionMode::Resume`; without it,
    /// new sessions are ephemeral.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Injected client, used only while no model has been selected
    /// (neither an explicit override nor a previous `set_current_model`).
    llm_override: Option<Arc<dyn LlmClient>>,
    /// Model chosen by the last `switch_model`. Later builds fall back to it.
    current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
    /// Cancel token handed to every rebuilt `ToolCtx`.
    cancel_token: SharedCancelToken,
}
73
impl SessionFactory {
    /// Returns the model recorded by the last `set_current_model`, or
    /// `None` if none has been recorded. A poisoned lock is recovered
    /// rather than propagated.
    fn current_model(&self) -> Option<crate::model::ModelId> {
        match self.current_model.lock() {
            Ok(guard) => guard.clone(),
            Err(poisoned) => poisoned.into_inner().clone(),
        }
    }

    /// Records `model` as the fallback model for later `build` calls that
    /// pass no explicit override. A poisoned lock is recovered.
    fn set_current_model(&self, model: crate::model::ModelId) {
        match self.current_model.lock() {
            Ok(mut guard) => *guard = Some(model),
            Err(poisoned) => *poisoned.into_inner() = Some(model),
        }
    }

    /// Build a fresh `AgentSession` + its `LlmClient`. The returned
    /// `LlmClient` is what `App.llm` should be swapped to.
    ///
    /// The effective model is `model_override` if given (used by
    /// `switch_model`), otherwise the last model recorded via
    /// `set_current_model`. When one exists, it replaces
    /// `settings.model.name` on a private settings clone and a client is
    /// built from settings + auth. Only when no model is selected is the
    /// injected `llm_override` used.
    ///
    /// # Errors
    /// - Any error from `build_llm_client`.
    /// - `SessionMode::Resume` without a configured session store.
    /// - Failure to resume the session, read its entries, or hydrate the
    ///   tools' read-file state from them.
    async fn build(
        &self,
        mode: SessionMode,
        model_override: Option<&crate::model::ModelId>,
    ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
        // Apply the model override (or last switched model) onto a settings clone.
        let effective_model = model_override.cloned().or_else(|| self.current_model());
        let mut settings = self.settings.clone();
        if let Some(m) = &effective_model {
            settings.model.name = m.as_str().to_string();
        }

        // The injected client is only honoured while no model is selected;
        // once a model is chosen, a real client must be built for it.
        let llm = if effective_model.is_none() {
            self.llm_override.as_ref().map_or_else(
                || build_llm_client(&settings, &self.auth),
                |llm| Ok(Arc::clone(llm)),
            )?
        } else {
            build_llm_client(&settings, &self.auth)?
        };

        // Tools — rebuilt fresh each time, sharing the SAME progress_tx so
        // `App.progress_rx` keeps receiving from whatever session is live.
        let tool_ctx = ToolCtx::new_with_cancel_token(
            &self.cwd,
            Arc::clone(&self.permission_gate),
            self.progress_tx.clone(),
            self.cancel_token.clone(),
        );
        let mut tools = if self.install_builtin_tools {
            builtin_tools(tool_ctx.clone())
        } else {
            Vec::new()
        };
        tools.extend(self.extra_tools.iter().cloned());

        // The system prompt lists the installed tool names and skills,
        // optionally followed by discovered project context files.
        let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
        let base_prompt = build_system_prompt(&tool_names, &self.skills);
        let system_prompt = if self.context_discovery_disabled {
            base_prompt
        } else {
            let agent_dir = crate::paths::agent_dir();
            let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
            crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
        };
        let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);

        let mut engine_builder = Engine::builder()
            .max_iterations(self.max_iterations)
            .system_prompt(system_prompt)
            .tool_context(motosan_tool_context);
        for tool in tools {
            engine_builder = engine_builder.tool(tool);
        }
        // Permissions: an interactive UI channel wins over headless mode;
        // with neither, no PermissionExtension is registered at all.
        if let Some(ui_tx) = &self.ui_tx {
            let ext = crate::permissions::PermissionExtension::new(
                Arc::clone(&self.policy),
                Arc::clone(&self.session_cache),
                self.cwd.clone(),
                ui_tx.clone(),
            );
            engine_builder = engine_builder.extension(Box::new(ext));
        } else if self.headless_permissions {
            let ext = crate::permissions::PermissionExtension::headless(
                Arc::clone(&self.policy),
                Arc::clone(&self.session_cache),
                self.cwd.clone(),
            );
            engine_builder = engine_builder.extension(Box::new(ext));
        }
        // Autocompact only for a threshold strictly inside (0, 1); out-of-range
        // values silently disable it. keep_turns is clamped to at least 1.
        if self.autocompact_enabled
            && settings.session.compact_at_context_pct > 0.0
            && settings.session.compact_at_context_pct < 1.0
        {
            let cfg = AutocompactConfig {
                threshold: settings.session.compact_at_context_pct,
                max_context_tokens: settings.session.max_context_tokens,
                keep_turns: settings.session.keep_turns.max(1),
            };
            engine_builder = engine_builder
                .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
        }
        let engine = engine_builder.build();

        let session = match (&mode, &self.session_store) {
            (SessionMode::Resume(id), Some(store)) => {
                let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
                    .await
                    .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
                let entries = s
                    .entries()
                    .await
                    .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
                // Presumably restores the tools' record of previously read
                // files from the stored entries — confirm in crate::session.
                crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
                s
            }
            (SessionMode::Resume(_), None) => {
                return Err(AppError::Config("resume requires a session store".into()));
            }
            (SessionMode::New, Some(store)) => {
                let id = crate::session::SessionId::new();
                AgentSession::new_with_store(
                    id.into_string(),
                    Arc::clone(store),
                    engine,
                    Arc::clone(&llm),
                )
            }
            (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
        };

        Ok((session, llm))
    }
}
207
/// Front-end handle to a running agent.
///
/// Owns the live `AgentSession` and its LLM client, plus the per-turn
/// plumbing that `send_user_message` / `fork_from` use to stream
/// `UiEvent`s.
pub struct App {
    /// The live session. Swapped atomically by `new_session`,
    /// `load_session` and `switch_model`; `run_turn` snapshots it when called.
    session: arc_swap::ArcSwap<AgentSession>,
    /// The LLM client paired with `session`; swapped alongside it.
    llm: arc_swap::ArcSwap<SharedLlm>,
    /// Rebuilds sessions for new / resume / model-switch operations.
    factory: SessionFactory,
    config: Config,
    /// Cancels the running turn. `run_turn` resets it at the start of each turn.
    cancel_token: SharedCancelToken,
    /// Tool progress stream. A running turn holds this lock for its whole
    /// duration, and a failed `try_lock` is how a second concurrent turn
    /// is rejected.
    progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
    /// Despite the name, a full `ToolCallTracker`. It assigns and matches
    /// synthetic tool-call ids.
    next_tool_id: Arc<Mutex<ToolCallTracker>>,
    /// Skills used to expand skill commands in user input.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Registered MCP servers, disconnected by `disconnect_mcp`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
    /// "Allow for session" permission decisions (see `permissions_cache`).
    pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
}
220
221impl App {
222    pub fn config(&self) -> &Config {
223        &self.config
224    }
225
226    /// Request graceful cancellation of the currently-running turn (if any).
227    /// Safe to call from any task; the engine observes the token and halts
228    /// at the next safe point.
229    pub fn cancel(&self) {
230        self.cancel_token.cancel();
231    }
232
233    /// Exposes the session cache so front-ends can write "Allow for session"
234    /// decisions resolved by the user. Exposed as `Arc` so callers can drop
235    /// their reference freely.
236    pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
237        Arc::clone(&self.session_cache)
238    }
239
240    /// M3: the underlying motosan session id. Always populated; ephemeral
241    /// sessions use a synthetic id internally.
242    pub fn session_id(&self) -> String {
243        self.session.load().session_id().to_string()
244    }
245
246    /// M3: snapshot of the session's persisted message history. Used by the
247    /// binary on `--continue` / `--session` to seed the TUI transcript so
248    /// the user can see what was said in prior runs. `AgentSession::resume`
249    /// already populates this internally for the *agent* to use as context
250    /// on the next turn; this method exposes it so the *front-end* can
251    /// render it too. Returns motosan's `Vec<Message>` verbatim (callers
252    /// decide how to map roles → UI blocks; system messages are typically
253    /// dropped because they're the prompt, not transcript).
254    pub async fn session_history(
255        &self,
256    ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
257        self.session.load_full().history().await
258    }
259
260    /// Manually compact the current session's context. Forces a one-shot
261    /// compaction (a zero-threshold `ThresholdStrategy` so `should_compact`
262    /// is always true), keeping the most recent few user turns. The
263    /// compaction marker is appended to the session jsonl like an
264    /// autocompact event.
265    pub async fn compact(&self) -> Result<()> {
266        use motosan_agent_loop::ThresholdStrategy;
267        let strategy = ThresholdStrategy {
268            threshold: 0.0,
269            ..ThresholdStrategy::default()
270        };
271        let llm = self.llm.load_full().client();
272        self.session
273            .load_full()
274            .maybe_compact(&strategy, llm)
275            .await
276            .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
277        Ok(())
278    }
279
280    /// Replace the live session with a brand-new empty one. An in-flight
281    /// turn (if any) keeps its own session snapshot and is unaffected; the
282    /// next `send_user_message` uses the new session.
283    pub async fn new_session(&self) -> Result<()> {
284        let (session, llm) = self.factory.build(SessionMode::New, None).await?;
285        self.session.store(Arc::new(session));
286        self.llm.store(Arc::new(SharedLlm::new(llm)));
287        Ok(())
288    }
289
290    /// Replace the live session with a stored session loaded by id.
291    /// Errors if no session store is configured or the id is unknown.
292    ///
293    /// **Not turn-safe** — see `switch_model`'s doc. Phase D2's command
294    /// handler must gate this on no active turn.
295    pub async fn load_session(&self, id: &str) -> Result<()> {
296        let (session, llm) = self
297            .factory
298            .build(SessionMode::Resume(id.to_string()), None)
299            .await?;
300        self.session.store(Arc::new(session));
301        self.llm.store(Arc::new(SharedLlm::new(llm)));
302        Ok(())
303    }
304
305    /// Copy the live session to a brand-new, fully independent session file
306    /// and switch to the copy. Unlike `/fork` (an in-file branch), the clone
307    /// is a separate file and appears in the `/resume` picker. Returns the
308    /// new session id. Requires a session store.
309    ///
310    /// **Not turn-safe** — like `load_session`, the caller (the
311    /// `Command::CloneSession` arm in `forward_commands`) must gate this on
312    /// no active turn.
313    pub async fn clone_session(&self) -> Result<String> {
314        let Some(store) = self.factory.session_store.as_ref() else {
315            return Err(AppError::Config("clone requires a session store".into()));
316        };
317        let source_id = self.session.load().session_id().to_string();
318        let new_id = crate::session::SessionId::new().into_string();
319        let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
320        catalog
321            .fork(&source_id, &new_id)
322            .await
323            .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
324        self.load_session(&new_id).await?;
325        Ok(new_id)
326    }
327
328    /// Rebuild the live session with a different model, preserving the
329    /// conversation. Requires a session store (the current session is
330    /// resumed by id under the new model). Errors otherwise.
331    ///
332    /// **Not turn-safe.** Like `load_session`, this resumes a session by
333    /// id. If a turn is still in flight on the *old* session when this is
334    /// called, that turn keeps writing entries under the same `session_id`
335    /// while the resumed session reads from it — a write/read interleaving
336    /// that the resumed session won't observe until its next reload. The
337    /// `App` API does not expose a turn-in-progress flag, so the *caller*
338    /// (Phase D2's `/model` / `/resume` command handlers in
339    /// `forward_commands`) MUST gate these calls on no active turn — the
340    /// `active_turn` bookkeeping already in `forward_commands` is exactly
341    /// that gate. D2's plan must wire that guard; D1 documents the contract.
342    pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
343        let current_id = self.session.load().session_id().to_string();
344        let (session, llm) = self
345            .factory
346            .build(SessionMode::Resume(current_id), Some(model))
347            .await?;
348        self.factory.set_current_model(model.clone());
349        self.session.store(Arc::new(session));
350        self.llm.store(Arc::new(SharedLlm::new(llm)));
351        Ok(())
352    }
353
354    /// M4 Phase B: disconnect every registered MCP server (2s per-server
355    /// timeout, best-effort). Call from the binary's ctrl-C handler.
356    pub async fn disconnect_mcp(&self) {
357        for (name, server) in &self.mcp_servers {
358            let _ =
359                tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
360            tracing::debug!(target: "mcp", server = %name, "disconnected");
361        }
362    }
363
364    fn run_turn(
365        &self,
366        text: String,
367        fork_from: Option<motosan_agent_loop::EntryId>,
368    ) -> impl Stream<Item = UiEvent> + Send + 'static {
369        let session = self.session.load_full();
370        let skills = Arc::clone(&self.skills);
371        let cancel_token = self.cancel_token.clone();
372        let tracker = Arc::clone(&self.next_tool_id);
373        let progress = Arc::clone(&self.progress_rx);
374
375        async_stream::stream! {
376            // Single-turn guard (M2 contract preserved).
377            let mut progress_guard = match progress.try_lock() {
378                Ok(guard) => guard,
379                Err(_) => {
380                    yield UiEvent::Error(
381                        "another turn is already running; capo is single-turn-per-App".into(),
382                    );
383                    return;
384                }
385            };
386
387            // Reset cancel token for THIS turn.
388            let cancel = cancel_token.reset();
389
390            yield UiEvent::AgentTurnStarted;
391            yield UiEvent::AgentThinking;
392
393            let text = crate::skills::expand::expand_skill_command(&text, &skills);
394            let new_user = motosan_agent_loop::Message::user(&text);
395
396            // Start the turn (gives us a TurnHandle with stream + previous_len + branch_parent + ops_tx).
397            let handle = match fork_from {
398                None => {
399                    // Linear turn: load history, append, start_turn.
400                    let history = match session.history().await {
401                        Ok(h) => h,
402                        Err(err) => {
403                            yield UiEvent::Error(format!("session.history failed: {err}"));
404                            return;
405                        }
406                    };
407                    let mut messages = history;
408                    messages.push(new_user);
409                    match session.start_turn(messages).await {
410                        Ok(h) => h,
411                        Err(err) => {
412                            yield UiEvent::Error(format!("session.start_turn failed: {err}"));
413                            return;
414                        }
415                    }
416                }
417                Some(from) => {
418                    // Fork: fork_turn assembles the branch's prior history itself.
419                    match session.fork_turn(from, vec![new_user]).await {
420                        Ok(h) => h,
421                        Err(err) => {
422                            yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
423                            return;
424                        }
425                    }
426                }
427            };
428            let previous_len = handle.previous_len;
429            let epoch = handle.epoch;
430            let branch_parent = handle.branch_parent;
431            let ops_tx = handle.ops_tx.clone();
432            let mut agent_stream = handle.stream;
433
434            // Bridge our SharedCancelToken to motosan's AgentOp::Interrupt.
435            //
436            // `AgentSession::start_turn` does NOT take a CancellationToken —
437            // the engine's cancel-token path used in M2's direct `.run().cancel(tok)`
438            // is bypassed when going through AgentSession. The control plane is
439            // `ops_tx`. We spawn a tiny task that waits on `cancel.cancelled()`
440            // and forwards an `Interrupt` op when fired.
441            let interrupt_bridge = tokio::spawn(async move {
442                cancel.cancelled().await;
443                let _ = ops_tx.send(AgentOp::Interrupt).await;
444            });
445
446            // Drain events.
447            let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
448            let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
449
450            loop {
451                // Forward any progress chunks that arrived in this iteration.
452                while let Ok(chunk) = progress_guard.try_recv() {
453                    yield UiEvent::ToolCallProgress {
454                        id: progress_event_id(&tracker),
455                        chunk: ProgressChunk::from(chunk),
456                    };
457                }
458
459                tokio::select! {
460                    biased;
461                    maybe_item = agent_stream.next() => {
462                        match maybe_item {
463                            Some(AgentStreamItem::Event(ev)) => {
464                                if let Some(ui) = map_event(ev, &tracker) {
465                                    yield ui;
466                                }
467                            }
468                            Some(AgentStreamItem::Terminal(term)) => {
469                                terminal_result = Some(term.result);
470                                terminal_messages = Some(term.messages);
471                                break;
472                            }
473                            None => break,
474                        }
475                    }
476                    Some(chunk) = progress_guard.recv() => {
477                        yield UiEvent::ToolCallProgress {
478                            id: progress_event_id(&tracker),
479                            chunk: ProgressChunk::from(chunk),
480                        };
481                    }
482                }
483            }
484
485            // Tear down the interrupt bridge whether or not cancellation fired.
486            interrupt_bridge.abort();
487
488            // Persist new messages via record_turn_outcome (only when terminal reached).
489            if let Some(msgs) = terminal_messages.as_ref() {
490                if let Err(err) = session
491                    .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
492                    .await
493                {
494                    yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
495                }
496            }
497
498            // Translate the terminal Result into final UiEvents.
499            match terminal_result {
500                Some(Ok(_)) => {
501                    let final_text = terminal_messages
502                        .as_ref()
503                        .and_then(|msgs| {
504                            msgs.iter()
505                                .rev()
506                                .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
507                                .map(|m| m.text())
508                        })
509                        .unwrap_or_default();
510                    if !final_text.is_empty() {
511                        yield UiEvent::AgentMessageComplete(final_text);
512                    }
513                    // Flush any remaining progress chunks.
514                    while let Ok(chunk) = progress_guard.try_recv() {
515                        yield UiEvent::ToolCallProgress {
516                            id: progress_event_id(&tracker),
517                            chunk: ProgressChunk::from(chunk),
518                        };
519                    }
520                    yield UiEvent::AgentTurnComplete;
521                }
522                Some(Err(err)) => {
523                    yield UiEvent::Error(format!("{err}"));
524                }
525                None => { /* stream closed without terminal — cancelled */ }
526            }
527        }
528    }
529
530    pub fn send_user_message(&self, text: String) -> impl Stream<Item = UiEvent> + Send + 'static {
531        self.run_turn(text, None)
532    }
533
534    /// Continue the conversation from an earlier entry, creating a branch.
535    /// `from` is the `EntryId` to attach under (typically a past user
536    /// message from `fork_candidates`). Streams `UiEvent`s exactly as
537    /// `send_user_message` does.
538    pub fn fork_from(
539        &self,
540        from: motosan_agent_loop::EntryId,
541        message: String,
542    ) -> impl Stream<Item = UiEvent> + Send + 'static {
543        self.run_turn(message, Some(from))
544    }
545
546    /// User messages on the current (active) branch, newest first, each
547    /// paired with its `EntryId` and a one-line preview — the rows the
548    /// `/fork` picker offers.
549    pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
550        let entries = self
551            .session
552            .load_full()
553            .entries()
554            .await
555            .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
556        let branch = motosan_agent_loop::active_branch(&entries);
557        let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
558            .iter()
559            .filter_map(|stored| {
560                let msg = stored.entry.as_message()?;
561                if !matches!(msg.role(), motosan_agent_loop::Role::User) {
562                    return None;
563                }
564                let preview: String = msg
565                    .text()
566                    .lines()
567                    .next()
568                    .unwrap_or("")
569                    .chars()
570                    .take(80)
571                    .collect();
572                Some((stored.id.clone(), preview))
573            })
574            .collect();
575        out.reverse();
576        Ok(out)
577    }
578
579    /// The session log as a navigable branch tree — for the `/tree` UI.
580    pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
581        self.session
582            .load_full()
583            .branches()
584            .await
585            .map_err(|e| AppError::Config(format!("branches failed: {e}")))
586    }
587}
588
/// Hands out synthetic tool-call ids (`tool_1`, `tool_2`, …) and pairs each
/// completion with the id issued when that tool started.
///
/// Agent events carry only a tool *name*, so completions are matched by
/// name in FIFO order: the oldest pending call with that name wins.
#[derive(Debug, Default)]
struct ToolCallTracker {
    next_id: usize,
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Bumps the counter and renders the next id.
    fn allocate_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }

    /// Issues an id for a newly started tool `name` and queues it as pending.
    fn start(&mut self, name: &str) -> String {
        let fresh = self.allocate_id();
        self.pending.push_back((name.to_owned(), fresh.clone()));
        fresh
    }

    /// Resolves the id of the oldest pending call named `name` and removes
    /// it from the queue. If none is pending, issues a brand-new id.
    fn complete(&mut self, name: &str) -> String {
        let slot = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name);
        match slot.and_then(|index| self.pending.remove(index)) {
            Some((_, matched)) => matched,
            None => self.allocate_id(),
        }
    }

    /// Id to attribute a progress chunk to.
    ///
    /// `ToolProgressChunk` carries no tool-call id. Attribution is only safe
    /// while exactly one call is pending; with several concurrent calls,
    /// guessing from queue order would mislabel output, so this returns `None`.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() != 1 {
            return None;
        }
        self.pending.front().map(|(_, id)| id.clone())
    }
}
629
630fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
631    match tracker.lock() {
632        Ok(guard) => guard,
633        Err(poisoned) => poisoned.into_inner(),
634    }
635}
636
637fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
638    lock_tool_tracker(tracker)
639        .progress_id()
640        .unwrap_or_else(|| "tool_unknown".to_string())
641}
642
643fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
644where
645    F: Fn(&str) -> Option<String>,
646{
647    env_lookup("ANTHROPIC_API_KEY")
648        .map(|key| key.trim().to_string())
649        .filter(|key| !key.is_empty())
650        .or_else(|| auth.api_key("anthropic").map(str::to_string))
651}
652
653fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
654    match ev {
655        AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
656        AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
657            let id = lock_tool_tracker(tool_tracker).start(&name);
658            Some(UiEvent::ToolCallStarted {
659                id,
660                name,
661                args: serde_json::json!({}),
662            })
663        }
664        AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
665            let id = lock_tool_tracker(tool_tracker).complete(&name);
666            Some(UiEvent::ToolCallCompleted {
667                id,
668                result: UiToolResult {
669                    is_error: result.is_error,
670                    text: format!("{name}: {result:?}"),
671                },
672            })
673        }
674        _ => None,
675    }
676}
677
/// One-shot callback: receives a `ToolCtx` and returns the tools to install.
/// `FnOnce`, so it can be invoked at most once. Per the `with_builtin_tools`
/// docs it is mutually exclusive with Capo's builtin tools — TODO confirm
/// where it is invoked (presumably `build_with_custom_tools`).
type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
679
/// Builder for `App`. Start from `AppBuilder::new()` and chain the
/// `with_*` setters.
pub struct AppBuilder {
    /// Set by `with_config`.
    config: Option<Config>,
    /// Set by `with_cwd`.
    cwd: Option<PathBuf>,
    /// Set by `with_permission_gate`.
    permission_gate: Option<Arc<dyn PermissionGate>>,
    /// Set by `with_builtin_tools`; off by default.
    install_builtin_tools: bool,
    /// Engine iteration cap; 20 by default, set by `with_max_iterations`.
    max_iterations: usize,
    /// Pre-built client injected by `with_llm`.
    llm_override: Option<Arc<dyn LlmClient>>,
    /// Custom tool factory; mutually exclusive with builtin tools.
    custom_tools_factory: Option<CustomToolsFactory>,
    /// Permissions policy file, set by `with_permissions_config`.
    permissions_policy_path: Option<PathBuf>,
    /// Interactive UI channel, set by `with_ui_channel`.
    ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
    /// Headless-deny permissions, set by `with_headless_permissions`.
    headless_permissions: bool,
    /// Set by `with_settings`.
    settings: Option<crate::settings::Settings>,
    /// Set by `with_auth`.
    auth: Option<crate::auth::Auth>,
    /// Set by `disable_context_discovery`; discovery is on by default.
    context_discovery_disabled: bool,
    // M3 Phase A:
    /// Persistence backend, set by `with_session_store`.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Session to resume at build time (setter not shown here — confirm).
    resume_session_id: Option<crate::session::SessionId>,
    /// Autocompact opt-in; off by default.
    autocompact_enabled: bool,
    // M4 Phase A:
    /// Skills made available to the agent.
    skills: Vec<crate::skills::Skill>,
    // M4 Phase B:
    /// Additional tools installed alongside (or instead of) the builtins.
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// Named MCP servers handed on to the built `App`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
}
704
705impl Default for AppBuilder {
706    fn default() -> Self {
707        Self {
708            config: None,
709            cwd: None,
710            permission_gate: None,
711            install_builtin_tools: false,
712            max_iterations: 20,
713            llm_override: None,
714            custom_tools_factory: None,
715            permissions_policy_path: None,
716            ui_tx: None,
717            headless_permissions: false,
718            settings: None,
719            auth: None,
720            context_discovery_disabled: false,
721            session_store: None,
722            resume_session_id: None,
723            autocompact_enabled: false,
724            skills: Vec::new(),
725            extra_tools: Vec::new(),
726            mcp_servers: Vec::new(),
727        }
728    }
729}
730
731impl AppBuilder {
732    pub fn new() -> Self {
733        Self::default()
734    }
735
736    pub fn with_config(mut self, cfg: Config) -> Self {
737        self.config = Some(cfg);
738        self
739    }
740
741    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
742        self.cwd = Some(cwd.into());
743        self
744    }
745
746    pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
747        self.permission_gate = Some(gate);
748        self
749    }
750
751    /// Install Capo's builtin tools (`read`, `bash`).
752    ///
753    /// This is mutually exclusive with `with_custom_tools_factory` /
754    /// `build_with_custom_tools`; `build()` returns a configuration error if
755    /// both are set.
756    pub fn with_builtin_tools(mut self) -> Self {
757        self.install_builtin_tools = true;
758        self
759    }
760
761    pub fn with_max_iterations(mut self, n: usize) -> Self {
762        self.max_iterations = n;
763        self
764    }
765
766    pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
767        self.llm_override = Some(llm);
768        self
769    }
770
771    pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
772        self.permissions_policy_path = Some(path);
773        self
774    }
775
776    pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
777        self.ui_tx = Some(tx);
778        self
779    }
780
781    /// Register `PermissionExtension` in headless-deny mode — `permissions.toml`
782    /// and read-only auto-allow still apply, but a tool call that would
783    /// otherwise prompt is denied (there is no UI to ask). Used by `--json`.
784    /// Ignored if `with_ui_channel` is also set (an interactive channel wins).
785    pub fn with_headless_permissions(mut self) -> Self {
786        self.headless_permissions = true;
787        self
788    }
789
790    /// M3: install user `Settings`. Replaces `with_config` for new code.
791    pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
792        self.settings = Some(settings);
793        self
794    }
795
796    /// M3: install `Auth` (credentials for LLM providers).
797    pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
798        self.auth = Some(auth);
799        self
800    }
801
802    /// M3: disable AGENTS.md / CLAUDE.md discovery for this App.
803    /// Opt-out — discovery is on by default in `build()`.
804    pub fn disable_context_discovery(mut self) -> Self {
805        self.context_discovery_disabled = true;
806        self
807    }
808
809    /// M3 Phase A: install a `SessionStore` (e.g. `motosan_agent_loop::FileSessionStore`)
810    /// for persistence. When omitted, sessions are ephemeral (no jsonl on disk).
811    pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
812        self.session_store = Some(store);
813        self
814    }
815
816    /// M3 Phase A: enable autocompact at the `Settings::session.compact_at_context_pct`
817    /// threshold. Requires `with_settings` to have been called; otherwise uses
818    /// `Settings::default()`. Settings provide `max_context_tokens` and
819    /// `keep_turns`. No-op when `settings.session.compact_at_context_pct == 0.0`.
820    pub fn with_autocompact(mut self) -> Self {
821        self.autocompact_enabled = true;
822        self
823    }
824
825    /// M4 Phase A: register skills. Pass an empty Vec (or omit the call,
826    /// or call `without_skills()`) to disable skill injection. Skills are
827    /// rendered into the system prompt's `<available_skills>` block when
828    /// the `read` tool is available, and matched against `/skill:<name>`
829    /// expansion before user messages reach the LLM.
830    pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
831        self.skills = skills;
832        self
833    }
834
835    pub fn without_skills(mut self) -> Self {
836        self.skills.clear();
837        self
838    }
839
840    /// M4 Phase B: register additional tools (typically MCP). Unlike
841    /// `with_custom_tools_factory`, this APPENDS to the builtin tools
842    /// and does NOT replace them.
843    pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
844        self.extra_tools = tools;
845        self
846    }
847
848    /// M4 Phase B: register MCP server handles so `App::disconnect_mcp`
849    /// can iterate them on shutdown. Storage only — does not connect.
850    pub fn with_mcp_servers(
851        mut self,
852        servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
853    ) -> Self {
854        self.mcp_servers = servers;
855        self
856    }
857
858    /// Install a custom tool set for this app.
859    ///
860    /// This is mutually exclusive with `with_builtin_tools`; `build()` returns
861    /// a configuration error if both are set.
862    pub fn with_custom_tools_factory(
863        mut self,
864        factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
865    ) -> Self {
866        self.custom_tools_factory = Some(Box::new(factory));
867        self
868    }
869
870    /// Convenience wrapper for `with_custom_tools_factory(...).build()`.
871    ///
872    /// This is mutually exclusive with `with_builtin_tools`.
873    pub async fn build_with_custom_tools(
874        self,
875        factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
876    ) -> Result<App> {
877        self.with_custom_tools_factory(factory).build().await
878    }
879
880    /// M3 Phase A: build the App with an optional resume session id.
881    ///
882    /// - `Some(id)` + `session_store: Some(_)` → `AgentSession::resume(id, store, engine, llm)`,
883    ///   then replay history into `ToolCtx.read_files`.
884    /// - `Some(id)` + `session_store: None` → error (resume requires a store).
885    /// - `None` + `session_store: Some(_)` → `AgentSession::new_with_store(fresh_id, store, engine, llm)`.
886    /// - `None` + `session_store: None` → `AgentSession::new(engine, llm)` (ephemeral).
887    pub async fn build_with_session(
888        mut self,
889        resume: Option<crate::session::SessionId>,
890    ) -> Result<App> {
891        if let Some(id) = resume {
892            if self.session_store.is_none() {
893                return Err(AppError::Config(
894                    "build_with_session(Some(id)) requires with_session_store(...)".into(),
895                ));
896            }
897            self.resume_session_id = Some(id);
898        }
899        self.build_internal().await
900    }
901
902    /// Legacy entry point; equivalent to `build_with_session(None)`.
903    pub async fn build(self) -> Result<App> {
904        self.build_with_session(None).await
905    }
906
907    async fn build_internal(mut self) -> Result<App> {
908        let mcp_servers = std::mem::take(&mut self.mcp_servers);
909        let extra_tools = std::mem::take(&mut self.extra_tools);
910        let skills = self.skills.clone();
911        if self.install_builtin_tools && self.custom_tools_factory.is_some() {
912            return Err(AppError::Config(
913                "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
914            ));
915        }
916
917        // Synthesise the legacy `Config` so the rest of `build()` keeps
918        // working without a wholesale rewrite. Settings + Auth override the
919        // deprecated `Config` when both APIs are supplied.
920        let has_config = self.config.is_some();
921        let has_auth = self.auth.is_some();
922        let mut config = self.config.unwrap_or_default();
923        let settings = match self.settings {
924            Some(settings) => settings,
925            None => {
926                let mut settings = crate::settings::Settings::default();
927                settings.model.provider = config.model.provider.clone();
928                settings.model.name = config.model.name.clone();
929                settings.model.max_tokens = config.model.max_tokens;
930                settings
931            }
932        };
933        config.model.provider = settings.model.provider.clone();
934        config.model.name = settings.model.name.clone();
935        config.model.max_tokens = settings.model.max_tokens;
936        let mut auth = self.auth.unwrap_or_default();
937        if !has_auth {
938            if let Some(key) = config.anthropic.api_key.as_deref() {
939                auth.0.insert(
940                    "anthropic".into(),
941                    crate::auth::ProviderAuth::ApiKey {
942                        key: key.to_string(),
943                    },
944                );
945            }
946        }
947        let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
948        if env_or_auth_key.is_some() || has_auth || !has_config {
949            config.anthropic.api_key = env_or_auth_key;
950        }
951        let cwd = self
952            .cwd
953            .or_else(|| std::env::current_dir().ok())
954            .unwrap_or_else(|| PathBuf::from("."));
955        let permission_gate = self.permission_gate.unwrap_or_else(|| {
956            // When no gate is provided *and* no ui channel is wired,
957            // fall back to NoOp with a warning log; when ui channel IS
958            // wired, the PermissionExtension handles the real decisions.
959            if self.ui_tx.is_some() || self.headless_permissions {
960                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
961            } else {
962                tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
963                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
964            }
965        });
966
967        // Permissions.
968        let policy: Arc<crate::permissions::Policy> =
969            Arc::new(match self.permissions_policy_path.as_ref() {
970                Some(path) => crate::permissions::Policy::load_or_default(path)?,
971                None => crate::permissions::Policy::default(),
972            });
973        let session_cache = Arc::new(crate::permissions::SessionCache::new());
974
975        // Shared progress channel consumed by `send_user_message`.
976        let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
977
978        // Resolve the tool set ONCE. Builtin tools, or a custom factory's
979        // output, plus MCP `extra_tools`. A FnOnce custom factory can't be
980        // re-run per session, so its output is captured here as a Vec.
981        // `probe_ctx` also yields the build-time cancel token for `App`.
982        let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
983        let cancel_token = probe_ctx.cancel_token.clone();
984        let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
985            if let Some(factory_fn) = self.custom_tools_factory.take() {
986                let mut t = factory_fn(probe_ctx);
987                t.extend(extra_tools.clone());
988                (false, t)
989            } else {
990                (self.install_builtin_tools, extra_tools.clone())
991            };
992
993        let factory = SessionFactory {
994            cwd: cwd.clone(),
995            settings: settings.clone(),
996            auth: auth.clone(),
997            policy: Arc::clone(&policy),
998            session_cache: Arc::clone(&session_cache),
999            ui_tx: self.ui_tx.clone(),
1000            headless_permissions: self.headless_permissions,
1001            permission_gate: Arc::clone(&permission_gate),
1002            progress_tx: progress_tx.clone(),
1003            skills: Arc::new(skills.clone()),
1004            install_builtin_tools: install_builtin,
1005            extra_tools: factory_extra_tools,
1006            max_iterations: self.max_iterations,
1007            context_discovery_disabled: self.context_discovery_disabled,
1008            autocompact_enabled: self.autocompact_enabled,
1009            session_store: self.session_store.clone(),
1010            llm_override: self.llm_override.clone(),
1011            current_model: Arc::new(Mutex::new(None)),
1012            cancel_token: cancel_token.clone(),
1013        };
1014
1015        let mode = match self.resume_session_id.take() {
1016            Some(id) => SessionMode::Resume(id.into_string()),
1017            None => SessionMode::New,
1018        };
1019        let (session, llm) = factory.build(mode, None).await?;
1020
1021        Ok(App {
1022            session: arc_swap::ArcSwap::from_pointee(session),
1023            llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1024            factory,
1025            config,
1026            cancel_token,
1027            progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1028            next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1029            skills: Arc::new(skills),
1030            mcp_servers,
1031            session_cache,
1032        })
1033    }
1034}
1035
1036#[cfg(test)]
1037mod tests {
1038    use super::*;
1039    use crate::config::{AnthropicConfig, ModelConfig};
1040    use crate::events::UiEvent;
1041    use async_trait::async_trait;
1042    use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1043    use motosan_agent_tool::ToolDef;
1044    use std::sync::atomic::{AtomicUsize, Ordering};
1045
1046    #[tokio::test]
1047    async fn builder_fails_without_api_key() {
1048        let cfg = Config {
1049            anthropic: AnthropicConfig {
1050                api_key: None,
1051                base_url: "https://api.anthropic.com".into(),
1052            },
1053            model: ModelConfig {
1054                provider: "anthropic".into(),
1055                name: "claude-sonnet-4-6".into(),
1056                max_tokens: 4096,
1057            },
1058        };
1059        let err = match AppBuilder::new()
1060            .with_config(cfg)
1061            .with_builtin_tools()
1062            .build()
1063            .await
1064        {
1065            Ok(_) => panic!("must fail without key"),
1066            Err(err) => err,
1067        };
1068        assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1069    }
1070
1071    struct ToolOnlyLlm {
1072        turn: AtomicUsize,
1073    }
1074
1075    #[async_trait]
1076    impl LlmClient for ToolOnlyLlm {
1077        async fn chat(
1078            &self,
1079            _messages: &[Message],
1080            _tools: &[ToolDef],
1081        ) -> motosan_agent_loop::Result<ChatOutput> {
1082            let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1083            if turn == 0 {
1084                Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1085                    ToolCallItem {
1086                        id: "t1".into(),
1087                        name: "read".into(),
1088                        args: serde_json::json!({"path":"nope.txt"}),
1089                    },
1090                ])))
1091            } else {
1092                Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1093            }
1094        }
1095    }
1096
1097    #[tokio::test]
1098    async fn empty_final_message_is_not_emitted() {
1099        let dir = tempfile::tempdir().unwrap();
1100        let mut cfg = Config::default();
1101        cfg.anthropic.api_key = Some("sk-unused".into());
1102        let app = AppBuilder::new()
1103            .with_config(cfg)
1104            .with_cwd(dir.path())
1105            .with_builtin_tools()
1106            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1107                turn: AtomicUsize::new(0),
1108            }))
1109            .build()
1110            .await
1111            .expect("build");
1112        let events: Vec<UiEvent> =
1113            futures::StreamExt::collect(app.send_user_message("x".into())).await;
1114        let empties = events
1115            .iter()
1116            .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1117            .count();
1118        assert_eq!(
1119            empties, 0,
1120            "should not emit empty final message, got: {events:?}"
1121        );
1122    }
1123
1124    struct EchoLlm;
1125
1126    #[async_trait]
1127    impl LlmClient for EchoLlm {
1128        async fn chat(
1129            &self,
1130            _messages: &[Message],
1131            _tools: &[ToolDef],
1132        ) -> motosan_agent_loop::Result<ChatOutput> {
1133            Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1134        }
1135    }
1136
1137    #[tokio::test]
1138    async fn with_headless_permissions_builds_an_app() {
1139        let dir = tempfile::tempdir().expect("tempdir");
1140        let mut config = Config::default();
1141        config.anthropic.api_key = Some("sk-unused".into());
1142        let app = AppBuilder::new()
1143            .with_config(config)
1144            .with_cwd(dir.path())
1145            .with_builtin_tools()
1146            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1147            .with_headless_permissions()
1148            .build()
1149            .await
1150            .expect("build");
1151        // Build succeeds and yields a usable session id.
1152        assert!(!app.session_id().is_empty());
1153    }
1154
1155    #[tokio::test]
1156    async fn new_session_swaps_in_a_fresh_empty_session() {
1157        use futures::StreamExt;
1158        let dir = tempfile::tempdir().expect("tempdir");
1159        let mut config = Config::default();
1160        config.anthropic.api_key = Some("sk-unused".into());
1161        let app = AppBuilder::new()
1162            .with_config(config)
1163            .with_cwd(dir.path())
1164            .with_builtin_tools()
1165            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1166            .build()
1167            .await
1168            .expect("build");
1169
1170        let _: Vec<_> = app.send_user_message("hello".into()).collect().await;
1171        let id_before = app.session_id();
1172        assert!(!app.session_history().await.expect("history").is_empty());
1173
1174        app.new_session().await.expect("new_session");
1175
1176        assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1177        assert!(
1178            app.session_history().await.expect("history").is_empty(),
1179            "fresh session has no history"
1180        );
1181    }
1182
1183    #[tokio::test]
1184    async fn load_session_restores_a_stored_session_by_id() {
1185        use futures::StreamExt;
1186        let dir = tempfile::tempdir().expect("tempdir");
1187        let store_dir = dir.path().join("sessions");
1188        let store: Arc<dyn SessionStore> =
1189            Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1190        let mut config = Config::default();
1191        config.anthropic.api_key = Some("sk-unused".into());
1192        let app = AppBuilder::new()
1193            .with_config(config)
1194            .with_cwd(dir.path())
1195            .with_builtin_tools()
1196            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1197            .with_session_store(Arc::clone(&store))
1198            .build()
1199            .await
1200            .expect("build");
1201
1202        let _: Vec<_> = app
1203            .send_user_message("remember this".into())
1204            .collect()
1205            .await;
1206        let original_id = app.session_id();
1207
1208        app.new_session().await.expect("new_session");
1209        assert_ne!(app.session_id(), original_id);
1210
1211        app.load_session(&original_id).await.expect("load_session");
1212        assert_eq!(app.session_id(), original_id);
1213        let history = app.session_history().await.expect("history");
1214        assert!(
1215            history.iter().any(|m| m.text().contains("remember this")),
1216            "loaded session should carry the original turn"
1217        );
1218    }
1219
1220    #[tokio::test]
1221    async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1222        use futures::StreamExt;
1223        let dir = tempfile::tempdir().expect("tempdir");
1224        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1225            dir.path().join("s"),
1226        ));
1227        let mut config = Config::default();
1228        config.anthropic.api_key = Some("sk-unused".into());
1229        let app = AppBuilder::new()
1230            .with_config(config)
1231            .with_cwd(dir.path())
1232            .with_builtin_tools()
1233            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1234            .with_session_store(store)
1235            .build()
1236            .await
1237            .expect("build");
1238
1239        let _: Vec<_> = app.send_user_message("hello".into()).collect().await;
1240        let original_id = app.session_id();
1241
1242        let new_id = app.clone_session().await.expect("clone_session");
1243
1244        // The clone has a distinct id, and the live session switched to it.
1245        assert_ne!(new_id, original_id);
1246        assert_eq!(app.session_id(), new_id);
1247        // The copy carries the same conversation.
1248        let history = app.session_history().await.expect("history");
1249        assert!(history.iter().any(|m| m.text().contains("hello")));
1250    }
1251
1252    #[tokio::test]
1253    async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1254        use futures::StreamExt;
1255        let dir = tempfile::tempdir().expect("tempdir");
1256        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1257            dir.path().join("s"),
1258        ));
1259        let mut config = Config::default();
1260        config.anthropic.api_key = Some("sk-unused".into());
1261        let app = AppBuilder::new()
1262            .with_config(config)
1263            .with_cwd(dir.path())
1264            .with_builtin_tools()
1265            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1266            .with_session_store(store)
1267            .build()
1268            .await
1269            .expect("build");
1270
1271        // Two linear turns.
1272        let _: Vec<_> = app.send_user_message("first".into()).collect().await;
1273        let _: Vec<_> = app.send_user_message("second".into()).collect().await;
1274
1275        // The EntryId of the first user message.
1276        let entries = app.session.load_full().entries().await.expect("entries");
1277        let first_id = entries
1278            .iter()
1279            .find_map(|stored| {
1280                let msg = stored.entry.as_message()?;
1281                (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1282                    .then(|| stored.id.clone())
1283            })
1284            .expect("first user message present");
1285
1286        // Fork from it.
1287        let _: Vec<_> = app.fork_from(first_id, "branched".into()).collect().await;
1288
1289        let history = app.session_history().await.expect("history");
1290        let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1291        assert!(
1292            texts.iter().any(|t| t.contains("first")),
1293            "fork keeps the fork-point ancestor"
1294        );
1295        assert!(
1296            texts.iter().any(|t| t.contains("branched")),
1297            "fork includes the new message"
1298        );
1299        assert!(
1300            !texts.iter().any(|t| t.contains("second")),
1301            "fork excludes the abandoned branch"
1302        );
1303    }
1304
1305    #[tokio::test]
1306    async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1307        use futures::StreamExt;
1308        let dir = tempfile::tempdir().expect("tempdir");
1309        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1310            dir.path().join("s"),
1311        ));
1312        let mut config = Config::default();
1313        config.anthropic.api_key = Some("sk-unused".into());
1314        let app = AppBuilder::new()
1315            .with_config(config)
1316            .with_cwd(dir.path())
1317            .with_builtin_tools()
1318            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1319            .with_session_store(store)
1320            .build()
1321            .await
1322            .expect("build");
1323
1324        let _: Vec<_> = app.send_user_message("alpha".into()).collect().await;
1325        let _: Vec<_> = app.send_user_message("bravo".into()).collect().await;
1326
1327        let candidates = app.fork_candidates().await.expect("candidates");
1328        let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1329        // Newest first.
1330        assert!(previews[0].contains("bravo"), "got {previews:?}");
1331        assert!(previews.iter().any(|p| p.contains("alpha")));
1332        // Every id is non-empty and distinct.
1333        assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1334    }
1335
1336    #[tokio::test]
1337    async fn branches_returns_a_tree_for_a_linear_session() {
1338        use futures::StreamExt;
1339        let dir = tempfile::tempdir().expect("tempdir");
1340        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1341            dir.path().join("s"),
1342        ));
1343        let mut config = Config::default();
1344        config.anthropic.api_key = Some("sk-unused".into());
1345        let app = AppBuilder::new()
1346            .with_config(config)
1347            .with_cwd(dir.path())
1348            .with_builtin_tools()
1349            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1350            .with_session_store(store)
1351            .build()
1352            .await
1353            .expect("build");
1354
1355        let _: Vec<_> = app.send_user_message("hello".into()).collect().await;
1356        let tree = app.branches().await.expect("branches");
1357        // A linear 1-turn session: non-empty node list, an active leaf.
1358        assert!(!tree.nodes.is_empty());
1359        assert!(tree.active_leaf.is_some());
1360    }
1361
1362    #[tokio::test]
1363    async fn switch_model_preserves_history() {
1364        use futures::StreamExt;
1365        let dir = tempfile::tempdir().expect("tempdir");
1366        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1367            dir.path().join("s"),
1368        ));
1369        let mut config = Config::default();
1370        config.anthropic.api_key = Some("sk-unused".into());
1371        let app = AppBuilder::new()
1372            .with_config(config)
1373            .with_cwd(dir.path())
1374            .with_builtin_tools()
1375            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1376            .with_session_store(store)
1377            .build()
1378            .await
1379            .expect("build");
1380
1381        let _: Vec<_> = app.send_user_message("keep me".into()).collect().await;
1382        let id_before = app.session_id();
1383
1384        app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
1385            .await
1386            .expect("switch_model");
1387
1388        assert_eq!(
1389            app.session_id(),
1390            id_before,
1391            "switch_model keeps the same session"
1392        );
1393        let history = app.session_history().await.expect("history");
1394        assert!(history.iter().any(|m| m.text().contains("keep me")));
1395    }
1396
1397    #[tokio::test]
1398    async fn switch_model_is_sticky_for_future_session_rebuilds() {
1399        let dir = tempfile::tempdir().expect("tempdir");
1400        let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1401            dir.path().join("s"),
1402        ));
1403        let mut config = Config::default();
1404        config.anthropic.api_key = Some("sk-unused".into());
1405        let app = AppBuilder::new()
1406            .with_config(config)
1407            .with_cwd(dir.path())
1408            .with_builtin_tools()
1409            .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1410            .with_session_store(store)
1411            .build()
1412            .await
1413            .expect("build");
1414
1415        let selected = crate::model::ModelId::from("claude-opus-4-7");
1416        app.switch_model(&selected).await.expect("switch_model");
1417        app.new_session().await.expect("new_session");
1418
1419        assert_eq!(app.factory.current_model(), Some(selected));
1420    }
1421
1422    struct SleepThenDoneLlm {
1423        turn: AtomicUsize,
1424    }
1425
1426    #[async_trait]
1427    impl LlmClient for SleepThenDoneLlm {
1428        async fn chat(
1429            &self,
1430            _messages: &[Message],
1431            _tools: &[ToolDef],
1432        ) -> motosan_agent_loop::Result<ChatOutput> {
1433            let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1434            if turn == 0 {
1435                Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1436                    ToolCallItem {
1437                        id: "sleep".into(),
1438                        name: "bash".into(),
1439                        args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
1440                    },
1441                ])))
1442            } else {
1443                Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1444            }
1445        }
1446    }
1447
1448    #[tokio::test]
1449    async fn cancel_reaches_builtin_tools_after_session_rebuild() {
1450        use futures::StreamExt;
1451        let dir = tempfile::tempdir().expect("tempdir");
1452        let mut config = Config::default();
1453        config.anthropic.api_key = Some("sk-unused".into());
1454        let app = Arc::new(
1455            AppBuilder::new()
1456                .with_config(config)
1457                .with_cwd(dir.path())
1458                .with_builtin_tools()
1459                .with_llm(Arc::new(SleepThenDoneLlm {
1460                    turn: AtomicUsize::new(0),
1461                }) as Arc<dyn LlmClient>)
1462                .build()
1463                .await
1464                .expect("build"),
1465        );
1466
1467        app.new_session().await.expect("new_session");
1468        let running_app = Arc::clone(&app);
1469        let handle = tokio::spawn(async move {
1470            running_app
1471                .send_user_message("run a slow command".into())
1472                .collect::<Vec<_>>()
1473                .await
1474        });
1475        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1476        app.cancel();
1477
1478        let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
1479            .await
1480            .expect("turn should finish after cancellation")
1481            .expect("join");
1482        assert!(
1483            events.iter().any(|event| {
1484                matches!(
1485                    event,
1486                    UiEvent::ToolCallCompleted { result, .. }
1487                        if result.text.contains("command cancelled by user")
1488                )
1489            }),
1490            "cancel should reach the rebuilt bash tool: {events:?}"
1491        );
1492    }
1493
1494    #[tokio::test]
1495    async fn compact_summarizes_a_session_with_enough_history() {
1496        struct DoneLlm;
1497        #[async_trait]
1498        impl LlmClient for DoneLlm {
1499            async fn chat(
1500                &self,
1501                _messages: &[Message],
1502                _tools: &[ToolDef],
1503            ) -> motosan_agent_loop::Result<ChatOutput> {
1504                Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1505            }
1506        }
1507
1508        let dir = tempfile::tempdir().expect("tempdir");
1509        let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
1510            dir.path().join("sessions"),
1511        ));
1512        let mut config = Config::default();
1513        config.anthropic.api_key = Some("sk-unused".into());
1514        let app = AppBuilder::new()
1515            .with_config(config)
1516            .with_cwd(dir.path())
1517            .with_builtin_tools()
1518            .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
1519            .with_session_store(store)
1520            .build()
1521            .await
1522            .expect("build");
1523
1524        // Run 4 user turns. `App::compact()` uses ThresholdStrategy with the
1525        // default keep_turns (3); `select_cutoff` only returns a cutoff once
1526        // there are >= keep_turns user messages — so a 1-turn session would
1527        // make compact() a no-op (Ok, but nothing summarized). 4 turns
1528        // guarantees the real compaction path (and the DoneLlm summarizer
1529        // call) is exercised.
1530        for i in 0..4 {
1531            let _: Vec<_> = app.send_user_message(format!("turn {i}")).collect().await;
1532        }
1533
1534        app.compact().await.expect("compact should succeed");
1535
1536        // The compaction marker is appended as a session entry — history
1537        // after compaction is shorter than the raw 4-turn transcript.
1538        let history = app.session_history().await.expect("history");
1539        assert!(
1540            !history.is_empty(),
1541            "session should still have content post-compaction"
1542        );
1543    }
1544
1545    #[test]
1546    fn anthropic_env_api_key_overrides_auth_json_key() {
1547        let mut auth = crate::auth::Auth::default();
1548        auth.0.insert(
1549            "anthropic".into(),
1550            crate::auth::ProviderAuth::ApiKey {
1551                key: "sk-auth".into(),
1552            },
1553        );
1554
1555        let key = anthropic_api_key_from(&auth, |name| {
1556            (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
1557        });
1558        assert_eq!(key.as_deref(), Some("sk-env"));
1559    }
1560
1561    #[tokio::test]
1562    async fn with_settings_overrides_deprecated_config_model() {
1563        use crate::settings::Settings;
1564
1565        let mut config = Config::default();
1566        config.model.name = "from-config".into();
1567        config.anthropic.api_key = Some("sk-config".into());
1568
1569        let mut settings = Settings::default();
1570        settings.model.name = "from-settings".into();
1571
1572        let tmp = tempfile::tempdir().unwrap();
1573        let app = AppBuilder::new()
1574            .with_config(config)
1575            .with_settings(settings)
1576            .with_cwd(tmp.path())
1577            .disable_context_discovery()
1578            .with_llm(Arc::new(EchoLlm))
1579            .build()
1580            .await
1581            .expect("build");
1582        assert_eq!(app.config().model.name, "from-settings");
1583        assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
1584    }
1585
1586    #[tokio::test]
1587    async fn with_settings_synthesises_legacy_config_for_build() {
1588        use crate::auth::{Auth, ProviderAuth};
1589        use crate::settings::Settings;
1590
1591        let mut settings = Settings::default();
1592        settings.model.name = "claude-sonnet-4-6".into();
1593
1594        let mut auth = Auth::default();
1595        auth.0.insert(
1596            "anthropic".into(),
1597            ProviderAuth::ApiKey {
1598                key: "sk-test".into(),
1599            },
1600        );
1601
1602        let tmp = tempfile::tempdir().unwrap();
1603        let app = AppBuilder::new()
1604            .with_settings(settings)
1605            .with_auth(auth)
1606            .with_cwd(tmp.path())
1607            .with_builtin_tools()
1608            .disable_context_discovery()
1609            .with_llm(Arc::new(EchoLlm))
1610            .build()
1611            .await
1612            .expect("build");
1613        let _ = app;
1614    }
1615
1616    #[tokio::test]
1617    async fn cancel_before_turn_does_not_poison_future_turns() {
1618        let dir = tempfile::tempdir().unwrap();
1619        let mut cfg = Config::default();
1620        cfg.anthropic.api_key = Some("sk-unused".into());
1621        let app = AppBuilder::new()
1622            .with_config(cfg)
1623            .with_cwd(dir.path())
1624            .with_builtin_tools()
1625            .with_llm(std::sync::Arc::new(EchoLlm))
1626            .build()
1627            .await
1628            .expect("build");
1629
1630        app.cancel();
1631        let events: Vec<UiEvent> = app.send_user_message("x".into()).collect().await;
1632
1633        assert!(
1634            events
1635                .iter()
1636                .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
1637            "turn should use a fresh cancellation token: {events:?}"
1638        );
1639    }
1640
1641    #[test]
1642    fn map_event_matches_started_and_completed_ids_by_tool_name() {
1643        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1644
1645        let started_bash = map_event(
1646            AgentEvent::Core(CoreEvent::ToolStarted {
1647                name: "bash".into(),
1648            }),
1649            &tracker,
1650        );
1651        let started_read = map_event(
1652            AgentEvent::Core(CoreEvent::ToolStarted {
1653                name: "read".into(),
1654            }),
1655            &tracker,
1656        );
1657        let completed_bash = map_event(
1658            AgentEvent::Core(CoreEvent::ToolCompleted {
1659                name: "bash".into(),
1660                result: motosan_agent_tool::ToolResult::text("ok"),
1661            }),
1662            &tracker,
1663        );
1664        let completed_read = map_event(
1665            AgentEvent::Core(CoreEvent::ToolCompleted {
1666                name: "read".into(),
1667                result: motosan_agent_tool::ToolResult::text("ok"),
1668            }),
1669            &tracker,
1670        );
1671
1672        assert!(matches!(
1673            started_bash,
1674            Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
1675        ));
1676        assert!(matches!(
1677            started_read,
1678            Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
1679        ));
1680        assert!(matches!(
1681            completed_bash,
1682            Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
1683        ));
1684        assert!(matches!(
1685            completed_read,
1686            Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
1687        ));
1688    }
1689
1690    #[test]
1691    fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
1692        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1693        let s1 = map_event(
1694            AgentEvent::Core(CoreEvent::ToolStarted {
1695                name: "bash".into(),
1696            }),
1697            &tracker,
1698        );
1699        let s2 = map_event(
1700            AgentEvent::Core(CoreEvent::ToolStarted {
1701                name: "bash".into(),
1702            }),
1703            &tracker,
1704        );
1705        let c1 = map_event(
1706            AgentEvent::Core(CoreEvent::ToolCompleted {
1707                name: "bash".into(),
1708                result: motosan_agent_tool::ToolResult::text("a"),
1709            }),
1710            &tracker,
1711        );
1712        let c2 = map_event(
1713            AgentEvent::Core(CoreEvent::ToolCompleted {
1714                name: "bash".into(),
1715                result: motosan_agent_tool::ToolResult::text("b"),
1716            }),
1717            &tracker,
1718        );
1719
1720        let id_s1 = match s1 {
1721            Some(UiEvent::ToolCallStarted { id, .. }) => id,
1722            other => panic!("{other:?}"),
1723        };
1724        let id_s2 = match s2 {
1725            Some(UiEvent::ToolCallStarted { id, .. }) => id,
1726            other => panic!("{other:?}"),
1727        };
1728        let id_c1 = match c1 {
1729            Some(UiEvent::ToolCallCompleted { id, .. }) => id,
1730            other => panic!("{other:?}"),
1731        };
1732        let id_c2 = match c2 {
1733            Some(UiEvent::ToolCallCompleted { id, .. }) => id,
1734            other => panic!("{other:?}"),
1735        };
1736
1737        assert_eq!(id_s1, id_c1);
1738        assert_eq!(id_s2, id_c2);
1739        assert_ne!(id_s1, id_s2);
1740    }
1741
1742    #[tokio::test]
1743    async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
1744        let dir = tempfile::tempdir().unwrap();
1745        let mut cfg = Config::default();
1746        cfg.anthropic.api_key = Some("sk-unused".into());
1747        let app = AppBuilder::new()
1748            .with_config(cfg)
1749            .with_cwd(dir.path())
1750            .with_builtin_tools()
1751            .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1752                turn: AtomicUsize::new(0),
1753            }))
1754            .build()
1755            .await
1756            .expect("build");
1757
1758        let mut first = Box::pin(app.send_user_message("first".into()));
1759        let first_event = first.next().await;
1760        assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
1761
1762        let second_events: Vec<UiEvent> = app.send_user_message("second".into()).collect().await;
1763        assert_eq!(
1764            second_events.len(),
1765            1,
1766            "expected immediate single error event, got: {second_events:?}"
1767        );
1768        assert!(matches!(
1769            &second_events[0],
1770            UiEvent::Error(msg) if msg.contains("single-turn-per-App")
1771        ));
1772    }
1773
1774    #[test]
1775    fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
1776        let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1777        assert_eq!(progress_event_id(&tracker), "tool_unknown");
1778
1779        let only = map_event(
1780            AgentEvent::Core(CoreEvent::ToolStarted {
1781                name: "bash".into(),
1782            }),
1783            &tracker,
1784        );
1785        let only_id = match only {
1786            Some(UiEvent::ToolCallStarted { id, .. }) => id,
1787            other => panic!("{other:?}"),
1788        };
1789        assert_eq!(progress_event_id(&tracker), only_id);
1790
1791        let _second = map_event(
1792            AgentEvent::Core(CoreEvent::ToolStarted {
1793                name: "read".into(),
1794            }),
1795            &tracker,
1796        );
1797        assert_eq!(progress_event_id(&tracker), "tool_unknown");
1798    }
1799
1800    #[tokio::test]
1801    async fn builder_rejects_builtin_and_custom_tools_together() {
1802        let mut cfg = Config::default();
1803        cfg.anthropic.api_key = Some("sk-unused".into());
1804        let dir = tempfile::tempdir().unwrap();
1805        let err = match AppBuilder::new()
1806            .with_config(cfg)
1807            .with_cwd(dir.path())
1808            .with_builtin_tools()
1809            .with_custom_tools_factory(|_| Vec::new())
1810            .build()
1811            .await
1812        {
1813            Ok(_) => panic!("must reject conflicting tool configuration"),
1814            Err(err) => err,
1815        };
1816
1817        assert!(format!("{err}").contains("mutually exclusive"));
1818    }
1819
1820    /// M3 Phase A smoke: two turns share history when a SessionStore is wired.
1821    #[tokio::test]
1822    async fn two_turns_in_same_session_share_history() {
1823        #[derive(Default)]
1824        struct CounterLlm {
1825            turn: AtomicUsize,
1826        }
1827        #[async_trait]
1828        impl LlmClient for CounterLlm {
1829            async fn chat(
1830                &self,
1831                messages: &[Message],
1832                _tools: &[ToolDef],
1833            ) -> motosan_agent_loop::Result<ChatOutput> {
1834                let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1835                let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
1836                Ok(ChatOutput::new(LlmResponse::Message(answer)))
1837            }
1838        }
1839
1840        let tmp = tempfile::tempdir().unwrap();
1841        let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
1842            tmp.path().to_path_buf(),
1843        ));
1844
1845        let app = AppBuilder::new()
1846            .with_settings(crate::settings::Settings::default())
1847            .with_auth(crate::auth::Auth::default())
1848            .with_cwd(tmp.path())
1849            .with_builtin_tools()
1850            .disable_context_discovery()
1851            .with_llm(std::sync::Arc::new(CounterLlm::default()))
1852            .with_session_store(store)
1853            .build_with_session(None)
1854            .await
1855            .expect("build");
1856
1857        let _events1: Vec<UiEvent> = app.send_user_message("hi".into()).collect().await;
1858        let events2: Vec<UiEvent> = app.send_user_message("again".into()).collect().await;
1859
1860        // Turn 2's LLM saw turn 1's user message + turn 1's assistant + turn 2's new user.
1861        let saw_more_than_one = events2.iter().any(|e| {
1862            matches!(
1863                e,
1864                UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
1865            )
1866        });
1867        assert!(
1868            saw_more_than_one,
1869            "second turn should have seen history; events: {events2:?}"
1870        );
1871    }
1872}
1873
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// A minimal, model-invocable global skill named `"x"`.
    fn sample_skill() -> Skill {
        Skill {
            source: SkillSource::Global,
            disable_model_invocation: false,
            base_dir: PathBuf::from("/"),
            file_path: PathBuf::from("/x.md"),
            description: "d".into(),
            name: "x".into(),
        }
    }

    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);
        assert_eq!(builder.skills.len(), 1);
        let stored = &builder.skills[0];
        assert_eq!(stored.name, "x");
    }

    #[test]
    fn without_skills_clears() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);
        let builder = builder.without_skills();
        assert!(builder.skills.is_empty());
    }
}
1906
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::{Tool, ToolContext, ToolDef, ToolResult};

    /// Inert `Tool` stub; it exists only so `with_extra_tools` has an
    /// `Arc<dyn Tool>` to store.
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> ToolDef {
            let input_schema = serde_json::json!({"type": "object"});
            ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &ToolContext,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ToolResult> + Send + '_>> {
            Box::pin(async { ToolResult::text("ok") })
        }
    }

    #[test]
    fn with_extra_tools_stores_tools() {
        let fake: Arc<dyn Tool> = Arc::new(FakeTool);
        let builder = AppBuilder::new().with_extra_tools(vec![fake]);
        assert_eq!(builder.extra_tools.len(), 1);
    }
}