1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
/// Running token totals accumulated across agent turns.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TurnStatsAccum {
    /// Sum of input tokens over every recorded turn.
    pub cumulative_input: u64,
    /// Sum of output tokens over every recorded turn.
    pub cumulative_output: u64,
    /// Number of turns that have been recorded.
    pub turn_count: u64,
}
32
33impl TurnStatsAccum {
34 pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38 self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39 self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40 self.turn_count = self.turn_count.saturating_add(1);
41 }
42}
43
/// How a session should be obtained when one is built.
#[derive(Debug, Clone)]
pub(crate) enum SessionMode {
    /// Start a brand-new session.
    New,
    /// Resume the stored session identified by the contained id.
    Resume(String),
}
52
53struct SharedLlm {
58 client: Arc<dyn LlmClient>,
59}
60
61impl SharedLlm {
62 fn new(client: Arc<dyn LlmClient>) -> Self {
63 Self { client }
64 }
65
66 fn client(&self) -> Arc<dyn LlmClient> {
67 Arc::clone(&self.client)
68 }
69}
70
71pub(crate) struct SessionFactory {
72 cwd: PathBuf,
73 settings: Arc<Mutex<crate::settings::Settings>>,
74 auth: crate::auth::Auth,
75 policy: Arc<crate::permissions::Policy>,
76 session_cache: Arc<crate::permissions::SessionCache>,
77 ui_tx: Option<mpsc::Sender<UiEvent>>,
78 headless_permissions: bool,
79 permission_gate: Arc<dyn PermissionGate>,
80 progress_tx: mpsc::Sender<ToolProgressChunk>,
83 skills: Arc<Vec<crate::skills::Skill>>,
84 install_builtin_tools: bool,
85 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
86 max_iterations: usize,
87 context_discovery_disabled: bool,
88 autocompact_enabled: bool,
89 session_store: Option<Arc<dyn SessionStore>>,
90 llm_override: Option<Arc<dyn LlmClient>>,
91 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
92 cancel_token: SharedCancelToken,
93}
94
95impl SessionFactory {
96 fn settings(&self) -> crate::settings::Settings {
97 match self.settings.lock() {
98 Ok(guard) => guard.clone(),
99 Err(poisoned) => poisoned.into_inner().clone(),
100 }
101 }
102
103 fn store_settings(&self, settings: crate::settings::Settings) {
104 match self.settings.lock() {
105 Ok(mut guard) => *guard = settings,
106 Err(poisoned) => *poisoned.into_inner() = settings,
107 }
108 }
109
110 fn current_model(&self) -> Option<crate::model::ModelId> {
111 match self.current_model.lock() {
112 Ok(guard) => guard.clone(),
113 Err(poisoned) => poisoned.into_inner().clone(),
114 }
115 }
116
117 fn set_current_model(&self, model: crate::model::ModelId) {
118 match self.current_model.lock() {
119 Ok(mut guard) => *guard = Some(model),
120 Err(poisoned) => *poisoned.into_inner() = Some(model),
121 }
122 }
123
124 fn clear_current_model(&self) {
125 match self.current_model.lock() {
126 Ok(mut guard) => *guard = None,
127 Err(poisoned) => *poisoned.into_inner() = None,
128 }
129 }
130
131 async fn build(
138 &self,
139 mode: SessionMode,
140 model_override: Option<&crate::model::ModelId>,
141 settings_override: Option<&crate::settings::Settings>,
142 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
143 let effective_model = model_override.cloned().or_else(|| {
147 if settings_override.is_some() {
148 None
149 } else {
150 self.current_model()
151 }
152 });
153 let mut settings = settings_override
154 .cloned()
155 .unwrap_or_else(|| self.settings());
156 if let Some(m) = &effective_model {
157 settings.model.name = m.as_str().to_string();
158 }
159
160 let llm = if effective_model.is_none() {
161 self.llm_override.as_ref().map_or_else(
162 || build_llm_client(&settings, &self.auth),
163 |llm| Ok(Arc::clone(llm)),
164 )?
165 } else {
166 build_llm_client(&settings, &self.auth)?
167 };
168
169 let tool_ctx = ToolCtx::new_with_cancel_token(
172 &self.cwd,
173 Arc::clone(&self.permission_gate),
174 self.progress_tx.clone(),
175 self.cancel_token.clone(),
176 );
177 let mut tools = if self.install_builtin_tools {
178 builtin_tools(tool_ctx.clone())
179 } else {
180 Vec::new()
181 };
182 tools.extend(self.extra_tools.iter().cloned());
183
184 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
185 let base_prompt = build_system_prompt(&tool_names, &self.skills);
186 let system_prompt = if self.context_discovery_disabled {
187 base_prompt
188 } else {
189 let agent_dir = crate::paths::agent_dir();
190 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
191 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
192 };
193 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
194
195 let mut engine_builder = Engine::builder()
196 .max_iterations(self.max_iterations)
197 .system_prompt(system_prompt)
198 .tool_context(motosan_tool_context);
199 for tool in tools {
200 engine_builder = engine_builder.tool(tool);
201 }
202 if let Some(ui_tx) = &self.ui_tx {
203 let ext = crate::permissions::PermissionExtension::new(
204 Arc::clone(&self.policy),
205 Arc::clone(&self.session_cache),
206 self.cwd.clone(),
207 ui_tx.clone(),
208 );
209 engine_builder = engine_builder.extension(Box::new(ext));
210 } else if self.headless_permissions {
211 let ext = crate::permissions::PermissionExtension::headless(
212 Arc::clone(&self.policy),
213 Arc::clone(&self.session_cache),
214 self.cwd.clone(),
215 );
216 engine_builder = engine_builder.extension(Box::new(ext));
217 }
218 if self.autocompact_enabled
219 && settings.session.compact_at_context_pct > 0.0
220 && settings.session.compact_at_context_pct < 1.0
221 {
222 let cfg = AutocompactConfig {
223 threshold: settings.session.compact_at_context_pct,
224 max_context_tokens: settings.session.max_context_tokens,
225 keep_turns: settings.session.keep_turns.max(1),
226 };
227 engine_builder = engine_builder
228 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
229 }
230 let engine = engine_builder.build();
231
232 let session = match (&mode, &self.session_store) {
233 (SessionMode::Resume(id), Some(store)) => {
234 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
235 .await
236 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
237 let entries = s
238 .entries()
239 .await
240 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
241 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
242 s
243 }
244 (SessionMode::Resume(_), None) => {
245 return Err(AppError::Config("resume requires a session store".into()));
246 }
247 (SessionMode::New, Some(store)) => {
248 let id = crate::session::SessionId::new();
249 AgentSession::new_with_store(
250 id.into_string(),
251 Arc::clone(store),
252 engine,
253 Arc::clone(&llm),
254 )
255 }
256 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
257 };
258
259 Ok((session, llm))
260 }
261}
262
263pub struct App {
264 session: arc_swap::ArcSwap<AgentSession>,
265 llm: arc_swap::ArcSwap<SharedLlm>,
266 factory: SessionFactory,
267 config: Config,
268 cancel_token: SharedCancelToken,
269 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
270 next_tool_id: Arc<Mutex<ToolCallTracker>>,
271 skills: Arc<Vec<crate::skills::Skill>>,
272 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
273 pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
275 pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
280 extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
281 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
282}
283
284impl App {
285 pub fn config(&self) -> &Config {
286 &self.config
287 }
288
289 pub fn cancel(&self) {
293 self.cancel_token.cancel();
294 }
295
296 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
300 Arc::clone(&self.session_cache)
301 }
302
303 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
306 Arc::clone(&self.extension_registry)
307 }
308
309 pub fn settings(&self) -> crate::settings::Settings {
311 let mut settings = self.factory.settings();
312 if let Some(model) = self.factory.current_model() {
313 settings.model.name = model.to_string();
314 }
315 settings
316 }
317
318 pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
320 Arc::clone(&self.token_tally)
321 }
322
323 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
325 Arc::clone(&self.extension_diagnostics)
326 }
327
328 pub fn session_id(&self) -> String {
331 self.session.load().session_id().to_string()
332 }
333
334 pub async fn session_history(
343 &self,
344 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
345 self.session.load_full().history().await
346 }
347
348 pub async fn compact(&self) -> Result<()> {
354 use motosan_agent_loop::ThresholdStrategy;
355 let strategy = ThresholdStrategy {
356 threshold: 0.0,
357 ..ThresholdStrategy::default()
358 };
359 let llm = self.llm.load_full().client();
360 self.session
361 .load_full()
362 .maybe_compact(&strategy, llm)
363 .await
364 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
365 Ok(())
366 }
367
368 pub async fn new_session(&self) -> Result<()> {
372 self.fire_session_before_switch("new", None).await?;
373 let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
374 self.session.store(Arc::new(session));
375 self.llm.store(Arc::new(SharedLlm::new(llm)));
376 self.reset_token_tally().await;
377 Ok(())
378 }
379
380 pub async fn load_session(&self, id: &str) -> Result<()> {
386 self.fire_session_before_switch("load", Some(id)).await?;
387 self.load_session_without_hook(id).await
388 }
389
390 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
391 let (session, llm) = self
392 .factory
393 .build(SessionMode::Resume(id.to_string()), None, None)
394 .await?;
395 self.session.store(Arc::new(session));
396 self.llm.store(Arc::new(SharedLlm::new(llm)));
397 self.reset_token_tally().await;
398 Ok(())
399 }
400
401 async fn reset_token_tally(&self) {
402 let mut tally = self.token_tally.lock().await;
403 *tally = TurnStatsAccum::default();
404 }
405
406 pub async fn clone_session(&self) -> Result<String> {
415 self.fire_session_before_switch("clone", None).await?;
416 let Some(store) = self.factory.session_store.as_ref() else {
417 return Err(AppError::Config("clone requires a session store".into()));
418 };
419 let source_id = self.session.load().session_id().to_string();
420 let new_id = crate::session::SessionId::new().into_string();
421 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
422 catalog
423 .fork(&source_id, &new_id)
424 .await
425 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
426 self.load_session_without_hook(&new_id).await?;
427 Ok(new_id)
428 }
429
430 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
445 self.fire_session_before_switch("model_switch", None)
446 .await?;
447 let current_id = self.session.load().session_id().to_string();
448 let (session, llm) = self
449 .factory
450 .build(SessionMode::Resume(current_id), Some(model), None)
451 .await?;
452 self.factory.set_current_model(model.clone());
453 self.session.store(Arc::new(session));
454 self.llm.store(Arc::new(SharedLlm::new(llm)));
455 Ok(())
456 }
457
458 pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
474 let current_id = self.session.load().session_id().to_string();
478 let (session, llm) = self
479 .factory
480 .build(SessionMode::Resume(current_id), None, Some(&new_settings))
481 .await?;
482 self.factory.store_settings(new_settings);
483 self.factory.clear_current_model();
484 self.session.store(Arc::new(session));
485 self.llm.store(Arc::new(SharedLlm::new(llm)));
486
487 self.reset_token_tally().await;
490
491 Ok(())
492 }
493
494 pub async fn disconnect_mcp(&self) {
497 for (name, server) in &self.mcp_servers {
498 let _ =
499 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
500 tracing::debug!(target: "mcp", server = %name, "disconnected");
501 }
502 }
503
504 fn run_turn(
505 &self,
506 msg: crate::user_message::UserMessage,
507 fork_from: Option<motosan_agent_loop::EntryId>,
508 ) -> impl Stream<Item = UiEvent> + Send + 'static {
509 let session = self.session.load_full();
510 let skills = Arc::clone(&self.skills);
511 let cancel_token = self.cancel_token.clone();
512 let tracker = Arc::clone(&self.next_tool_id);
513 let progress = Arc::clone(&self.progress_rx);
514 let token_tally = Arc::clone(&self.token_tally);
515 let settings_model_name = self
516 .factory
517 .current_model()
518 .map(|model| model.to_string())
519 .unwrap_or_else(|| self.factory.settings().model.name);
520
521 async_stream::stream! {
522 let new_user = {
526 let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
530 let expanded_msg = crate::user_message::UserMessage {
531 text: expanded_text,
532 attachments: msg.attachments.clone(),
533 };
534 match crate::user_message::prepare_user_message(&expanded_msg) {
535 Ok(m) => m,
536 Err(err) => {
537 yield UiEvent::AttachmentError {
538 kind: err.kind(),
539 message: err.to_string(),
540 };
541 return;
542 }
543 }
544 };
545
546 let mut progress_guard = match progress.try_lock() {
548 Ok(guard) => guard,
549 Err(_) => {
550 yield UiEvent::Error(
551 "another turn is already running; capo is single-turn-per-App".into(),
552 );
553 return;
554 }
555 };
556
557 let cancel = cancel_token.reset();
559
560 yield UiEvent::AgentTurnStarted;
561 yield UiEvent::AgentThinking;
562
563 let handle = match fork_from {
565 None => {
566 let history = match session.history().await {
568 Ok(h) => h,
569 Err(err) => {
570 yield UiEvent::Error(format!("session.history failed: {err}"));
571 return;
572 }
573 };
574 let mut messages = history;
575 messages.push(new_user);
576 match session.start_turn(messages).await {
577 Ok(h) => h,
578 Err(err) => {
579 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
580 return;
581 }
582 }
583 }
584 Some(from) => {
585 match session.fork_turn(from, vec![new_user]).await {
587 Ok(h) => h,
588 Err(err) => {
589 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
590 return;
591 }
592 }
593 }
594 };
595 let previous_len = handle.previous_len;
596 let epoch = handle.epoch;
597 let branch_parent = handle.branch_parent;
598 let ops_tx = handle.ops_tx.clone();
599 let mut agent_stream = handle.stream;
600
601 let interrupt_bridge = tokio::spawn(async move {
609 cancel.cancelled().await;
610 let _ = ops_tx.send(AgentOp::Interrupt).await;
611 });
612
613 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
615 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
616
617 loop {
618 while let Ok(chunk) = progress_guard.try_recv() {
620 yield UiEvent::ToolCallProgress {
621 id: progress_event_id(&tracker),
622 chunk: ProgressChunk::from(chunk),
623 };
624 }
625
626 tokio::select! {
627 biased;
628 maybe_item = agent_stream.next() => {
629 match maybe_item {
630 Some(AgentStreamItem::Event(ev)) => {
631 if let Some(ui) = map_event(ev, &tracker) {
632 yield ui;
633 }
634 }
635 Some(AgentStreamItem::Terminal(term)) => {
636 terminal_result = Some(term.result);
637 terminal_messages = Some(term.messages);
638 break;
639 }
640 None => break,
641 }
642 }
643 Some(chunk) = progress_guard.recv() => {
644 yield UiEvent::ToolCallProgress {
645 id: progress_event_id(&tracker),
646 chunk: ProgressChunk::from(chunk),
647 };
648 }
649 }
650 }
651
652 interrupt_bridge.abort();
654
655 if let Some(msgs) = terminal_messages.as_ref() {
657 if let Err(err) = session
658 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
659 .await
660 {
661 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
662 }
663 }
664
665 match terminal_result {
667 Some(Ok(result)) => {
668 let final_text = terminal_messages
669 .as_ref()
670 .and_then(|msgs| {
671 msgs.iter()
672 .rev()
673 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
674 .map(|m| m.text())
675 })
676 .unwrap_or_default();
677 if !final_text.is_empty() {
678 yield UiEvent::AgentMessageComplete(final_text);
679 }
680 let usage = result.usage;
683 let (cumulative_input, cumulative_output) = {
684 let mut tally = token_tally.lock().await;
685 tally.add(usage);
686 (tally.cumulative_input, tally.cumulative_output)
687 };
688 yield UiEvent::TurnStats {
689 input_tokens: usage.input_tokens,
690 output_tokens: usage.output_tokens,
691 cumulative_input,
692 cumulative_output,
693 model: settings_model_name.clone(),
694 };
695 while let Ok(chunk) = progress_guard.try_recv() {
697 yield UiEvent::ToolCallProgress {
698 id: progress_event_id(&tracker),
699 chunk: ProgressChunk::from(chunk),
700 };
701 }
702 yield UiEvent::AgentTurnComplete;
703 }
704 Some(Err(err)) => {
705 yield UiEvent::Error(format!("{err}"));
706 }
707 None => { }
708 }
709 }
710 }
711
712 pub fn send_user_message(
713 &self,
714 msg: crate::user_message::UserMessage,
715 ) -> impl Stream<Item = UiEvent> + Send + 'static {
716 self.run_turn(msg, None)
717 }
718
719 pub fn fork_from(
724 &self,
725 from: motosan_agent_loop::EntryId,
726 message: crate::user_message::UserMessage,
727 ) -> impl Stream<Item = UiEvent> + Send + 'static {
728 let registry = Arc::clone(&self.extension_registry);
729 let inner = self.run_turn(message, Some(from));
730 async_stream::stream! {
731 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
732 match dispatch_session_before_switch(®istry, "fork", None).await {
733 HookOutcome::Continue => {}
734 HookOutcome::Cancelled { extension_name, reason } => {
735 let msg = match reason {
736 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
737 None => format!("extension `{extension_name}` cancelled fork"),
738 };
739 yield UiEvent::Error(msg);
740 return;
741 }
742 }
743 let mut inner = Box::pin(inner);
744 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
745 yield ev;
746 }
747 }
748 }
749
750 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
754 let entries = self
755 .session
756 .load_full()
757 .entries()
758 .await
759 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
760 let branch = motosan_agent_loop::active_branch(&entries);
761 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
762 .iter()
763 .filter_map(|stored| {
764 let msg = stored.entry.as_message()?;
765 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
766 return None;
767 }
768 let preview: String = msg
769 .text()
770 .lines()
771 .next()
772 .unwrap_or("")
773 .chars()
774 .take(80)
775 .collect();
776 Some((stored.id.clone(), preview))
777 })
778 .collect();
779 out.reverse();
780 Ok(out)
781 }
782
783 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
785 self.session
786 .load_full()
787 .branches()
788 .await
789 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
790 }
791
792 async fn fire_session_before_switch(
795 &self,
796 reason: &str,
797 session_id: Option<&str>,
798 ) -> Result<()> {
799 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
800 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
801 HookOutcome::Continue => Ok(()),
802 HookOutcome::Cancelled {
803 extension_name,
804 reason,
805 } => Err(AppError::HookCancelled {
806 extension_name,
807 reason,
808 }),
809 }
810 }
811}
812
/// Assigns sequential `tool_<n>` ids to tool calls and remembers which calls
/// have started but not yet completed.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Last id number handed out.
    next_id: usize,
    /// Started-but-unfinished calls as `(tool name, id)`, oldest first.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Registers a started call of tool `name` and returns its fresh id.
    fn start(&mut self, name: &str) -> String {
        self.next_id += 1;
        let fresh = format!("tool_{}", self.next_id);
        self.pending.push_back((name.to_string(), fresh.clone()));
        fresh
    }

    /// Completes the oldest pending call of tool `name` and returns its id.
    ///
    /// When no pending call matches, a brand-new id is allocated instead.
    fn complete(&mut self, name: &str) -> String {
        let matched = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name)
            .and_then(|idx| self.pending.remove(idx));

        match matched {
            Some((_, id)) => id,
            None => {
                self.next_id += 1;
                format!("tool_{}", self.next_id)
            }
        }
    }

    /// Returns the id progress should be attributed to: the pending call's id
    /// when exactly one call is pending, otherwise `None`.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() != 1 {
            return None;
        }
        self.pending.front().map(|(_, id)| id.clone())
    }
}
853
854fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
855 match tracker.lock() {
856 Ok(guard) => guard,
857 Err(poisoned) => poisoned.into_inner(),
858 }
859}
860
861fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
862 lock_tool_tracker(tracker)
863 .progress_id()
864 .unwrap_or_else(|| "tool_unknown".to_string())
865}
866
867fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
868where
869 F: Fn(&str) -> Option<String>,
870{
871 env_lookup("ANTHROPIC_API_KEY")
872 .map(|key| key.trim().to_string())
873 .filter(|key| !key.is_empty())
874 .or_else(|| auth.api_key("anthropic").map(str::to_string))
875}
876
877fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
878 match ev {
879 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
880 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
881 let id = lock_tool_tracker(tool_tracker).start(&name);
882 Some(UiEvent::ToolCallStarted {
883 id,
884 name,
885 args: serde_json::json!({}),
886 })
887 }
888 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
889 let id = lock_tool_tracker(tool_tracker).complete(&name);
890 Some(UiEvent::ToolCallCompleted {
891 id,
892 result: UiToolResult {
893 is_error: result.is_error,
894 text: format!("{name}: {result:?}"),
895 },
896 })
897 }
898 _ => None,
899 }
900}
901
902type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
903
904pub struct AppBuilder {
905 config: Option<Config>,
906 cwd: Option<PathBuf>,
907 permission_gate: Option<Arc<dyn PermissionGate>>,
908 install_builtin_tools: bool,
909 max_iterations: usize,
910 llm_override: Option<Arc<dyn LlmClient>>,
911 custom_tools_factory: Option<CustomToolsFactory>,
912 permissions_policy_path: Option<PathBuf>,
913 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
914 headless_permissions: bool,
915 settings: Option<crate::settings::Settings>,
916 auth: Option<crate::auth::Auth>,
917 context_discovery_disabled: bool,
918 session_store: Option<Arc<dyn SessionStore>>,
920 resume_session_id: Option<crate::session::SessionId>,
921 autocompact_enabled: bool,
922 skills: Vec<crate::skills::Skill>,
924 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
926 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
927 extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
928 extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
929 token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
930}
931
932impl Default for AppBuilder {
933 fn default() -> Self {
934 Self {
935 config: None,
936 cwd: None,
937 permission_gate: None,
938 install_builtin_tools: false,
939 max_iterations: 20,
940 llm_override: None,
941 custom_tools_factory: None,
942 permissions_policy_path: None,
943 ui_tx: None,
944 headless_permissions: false,
945 settings: None,
946 auth: None,
947 context_discovery_disabled: false,
948 session_store: None,
949 resume_session_id: None,
950 autocompact_enabled: false,
951 skills: Vec::new(),
952 extra_tools: Vec::new(),
953 mcp_servers: Vec::new(),
954 extension_registry: None,
955 extension_diagnostics: None,
956 token_tally: None,
957 }
958 }
959}
960
961impl AppBuilder {
962 pub fn new() -> Self {
963 Self::default()
964 }
965
966 pub fn with_config(mut self, cfg: Config) -> Self {
967 self.config = Some(cfg);
968 self
969 }
970
971 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
972 self.cwd = Some(cwd.into());
973 self
974 }
975
976 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
977 self.permission_gate = Some(gate);
978 self
979 }
980
981 pub fn with_builtin_tools(mut self) -> Self {
987 self.install_builtin_tools = true;
988 self
989 }
990
991 pub fn with_max_iterations(mut self, n: usize) -> Self {
992 self.max_iterations = n;
993 self
994 }
995
996 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
997 self.llm_override = Some(llm);
998 self
999 }
1000
1001 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1002 self.permissions_policy_path = Some(path);
1003 self
1004 }
1005
1006 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1007 self.ui_tx = Some(tx);
1008 self
1009 }
1010
1011 pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1013 self.token_tally = Some(tally);
1014 self
1015 }
1016
1017 pub fn with_headless_permissions(mut self) -> Self {
1022 self.headless_permissions = true;
1023 self
1024 }
1025
1026 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1028 self.settings = Some(settings);
1029 self
1030 }
1031
1032 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1034 self.auth = Some(auth);
1035 self
1036 }
1037
1038 pub fn disable_context_discovery(mut self) -> Self {
1041 self.context_discovery_disabled = true;
1042 self
1043 }
1044
1045 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1048 self.session_store = Some(store);
1049 self
1050 }
1051
1052 pub fn with_autocompact(mut self) -> Self {
1057 self.autocompact_enabled = true;
1058 self
1059 }
1060
1061 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1067 self.skills = skills;
1068 self
1069 }
1070
1071 pub fn without_skills(mut self) -> Self {
1072 self.skills.clear();
1073 self
1074 }
1075
1076 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1080 self.extra_tools = tools;
1081 self
1082 }
1083
1084 pub fn with_extension_registry(
1085 mut self,
1086 registry: Arc<crate::extensions::ExtensionRegistry>,
1087 ) -> Self {
1088 self.extension_registry = Some(registry);
1089 self
1090 }
1091
1092 pub fn with_extension_diagnostics(
1093 mut self,
1094 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1095 ) -> Self {
1096 self.extension_diagnostics = Some(diagnostics);
1097 self
1098 }
1099
1100 pub fn with_mcp_servers(
1103 mut self,
1104 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1105 ) -> Self {
1106 self.mcp_servers = servers;
1107 self
1108 }
1109
1110 pub fn with_custom_tools_factory(
1115 mut self,
1116 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1117 ) -> Self {
1118 self.custom_tools_factory = Some(Box::new(factory));
1119 self
1120 }
1121
1122 pub async fn build_with_custom_tools(
1126 self,
1127 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1128 ) -> Result<App> {
1129 self.with_custom_tools_factory(factory).build().await
1130 }
1131
1132 pub async fn build_with_session(
1140 mut self,
1141 resume: Option<crate::session::SessionId>,
1142 ) -> Result<App> {
1143 if let Some(id) = resume {
1144 if self.session_store.is_none() {
1145 return Err(AppError::Config(
1146 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1147 ));
1148 }
1149 self.resume_session_id = Some(id);
1150 }
1151 self.build_internal().await
1152 }
1153
1154 pub async fn build(self) -> Result<App> {
1156 self.build_with_session(None).await
1157 }
1158
1159 async fn build_internal(mut self) -> Result<App> {
1160 let mcp_servers = std::mem::take(&mut self.mcp_servers);
1161 let extra_tools = std::mem::take(&mut self.extra_tools);
1162 let skills = self.skills.clone();
1163 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1164 return Err(AppError::Config(
1165 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1166 ));
1167 }
1168
1169 let has_config = self.config.is_some();
1173 let has_auth = self.auth.is_some();
1174 let mut config = self.config.unwrap_or_default();
1175 let settings = match self.settings {
1176 Some(settings) => settings,
1177 None => {
1178 let mut settings = crate::settings::Settings::default();
1179 settings.model.provider = config.model.provider.clone();
1180 settings.model.name = config.model.name.clone();
1181 settings.model.max_tokens = config.model.max_tokens;
1182 settings
1183 }
1184 };
1185 config.model.provider = settings.model.provider.clone();
1186 config.model.name = settings.model.name.clone();
1187 config.model.max_tokens = settings.model.max_tokens;
1188 let mut auth = self.auth.unwrap_or_default();
1189 if !has_auth {
1190 if let Some(key) = config.anthropic.api_key.as_deref() {
1191 auth.0.insert(
1192 "anthropic".into(),
1193 crate::auth::ProviderAuth::ApiKey {
1194 key: key.to_string(),
1195 },
1196 );
1197 }
1198 }
1199 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1200 if env_or_auth_key.is_some() || has_auth || !has_config {
1201 config.anthropic.api_key = env_or_auth_key;
1202 }
1203 let cwd = self
1204 .cwd
1205 .or_else(|| std::env::current_dir().ok())
1206 .unwrap_or_else(|| PathBuf::from("."));
1207 let agent_dir = crate::paths::agent_dir();
1208 let permission_gate = self.permission_gate.unwrap_or_else(|| {
1209 if self.ui_tx.is_some() || self.headless_permissions {
1213 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1214 } else {
1215 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1216 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1217 }
1218 });
1219
1220 let policy: Arc<crate::permissions::Policy> =
1222 Arc::new(match self.permissions_policy_path.as_ref() {
1223 Some(path) => crate::permissions::Policy::load_or_default(path)?,
1224 None => crate::permissions::Policy::default(),
1225 });
1226 let session_cache = Arc::new(crate::permissions::SessionCache::new());
1227 let token_tally = self
1228 .token_tally
1229 .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1230
1231 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1233
1234 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1239 let cancel_token = probe_ctx.cancel_token.clone();
1240 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1241 if let Some(factory_fn) = self.custom_tools_factory.take() {
1242 let mut t = factory_fn(probe_ctx);
1243 t.extend(extra_tools.clone());
1244 (false, t)
1245 } else {
1246 (self.install_builtin_tools, extra_tools.clone())
1247 };
1248
1249 let factory = SessionFactory {
1250 cwd: cwd.clone(),
1251 settings: Arc::new(Mutex::new(settings.clone())),
1252 auth: auth.clone(),
1253 policy: Arc::clone(&policy),
1254 session_cache: Arc::clone(&session_cache),
1255 ui_tx: self.ui_tx.clone(),
1256 headless_permissions: self.headless_permissions,
1257 permission_gate: Arc::clone(&permission_gate),
1258 progress_tx: progress_tx.clone(),
1259 skills: Arc::new(skills.clone()),
1260 install_builtin_tools: install_builtin,
1261 extra_tools: factory_extra_tools,
1262 max_iterations: self.max_iterations,
1263 context_discovery_disabled: self.context_discovery_disabled,
1264 autocompact_enabled: self.autocompact_enabled,
1265 session_store: self.session_store.clone(),
1266 llm_override: self.llm_override.clone(),
1267 current_model: Arc::new(Mutex::new(None)),
1268 cancel_token: cancel_token.clone(),
1269 };
1270
1271 let mode = match self.resume_session_id.take() {
1272 Some(id) => SessionMode::Resume(id.into_string()),
1273 None => SessionMode::New,
1274 };
1275 let (session, llm) = factory.build(mode, None, None).await?;
1276 let (extension_registry, extension_diagnostics) =
1277 if let Some(reg) = self.extension_registry.take() {
1278 let diagnostics = self
1279 .extension_diagnostics
1280 .unwrap_or_else(|| Arc::new(Vec::new()));
1281 (reg, diagnostics)
1282 } else {
1283 let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1284 let (registry, diagnostics) =
1285 crate::extensions::load_extensions_manifest(&manifest_path).await;
1286 for d in &diagnostics {
1287 let message = d.message.as_str();
1288 match d.severity {
1289 crate::extensions::DiagnosticSeverity::Warn => {
1290 tracing::warn!("extensions: {message}");
1291 }
1292 crate::extensions::DiagnosticSeverity::Error => {
1293 tracing::error!("extensions: {message}");
1294 }
1295 }
1296 }
1297 (Arc::new(registry), Arc::new(diagnostics))
1298 };
1299
1300 Ok(App {
1301 session: arc_swap::ArcSwap::from_pointee(session),
1302 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1303 factory,
1304 config,
1305 cancel_token,
1306 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1307 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1308 skills: Arc::new(skills),
1309 mcp_servers,
1310 extension_registry,
1311 token_tally,
1312 extension_diagnostics,
1313 session_cache,
1314 })
1315 }
1316}
1317
1318#[cfg(test)]
1319mod tests {
1320 use super::*;
1321 use crate::config::{AnthropicConfig, ModelConfig};
1322 use crate::events::UiEvent;
1323 use crate::user_message::UserMessage;
1324 use async_trait::async_trait;
1325 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1326 use motosan_agent_tool::ToolDef;
1327 use std::sync::atomic::{AtomicUsize, Ordering};
1328
1329 #[test]
1330 fn turn_stats_accum_accumulates_across_calls() {
1331 let mut accum = TurnStatsAccum::default();
1332 accum.add(motosan_agent_loop::TokenUsage {
1333 input_tokens: 100,
1334 output_tokens: 25,
1335 });
1336 accum.add(motosan_agent_loop::TokenUsage {
1337 input_tokens: 1000,
1338 output_tokens: 250,
1339 });
1340 assert_eq!(accum.cumulative_input, 1100);
1341 assert_eq!(accum.cumulative_output, 275);
1342 assert_eq!(accum.turn_count, 2);
1343 }
1344
1345 #[test]
1346 fn turn_stats_accum_saturates_on_overflow() {
1347 let mut accum = TurnStatsAccum {
1348 cumulative_input: u64::MAX - 5,
1349 cumulative_output: 0,
1350 turn_count: 1,
1351 };
1352 accum.add(motosan_agent_loop::TokenUsage {
1353 input_tokens: 100,
1354 output_tokens: 0,
1355 });
1356 assert_eq!(accum.cumulative_input, u64::MAX);
1357 }
1358
1359 #[tokio::test]
1360 async fn builder_fails_without_api_key() {
1361 let cfg = Config {
1362 anthropic: AnthropicConfig {
1363 api_key: None,
1364 base_url: "https://api.anthropic.com".into(),
1365 },
1366 model: ModelConfig {
1367 provider: "anthropic".into(),
1368 name: "claude-sonnet-4-6".into(),
1369 max_tokens: 4096,
1370 },
1371 };
1372 let err = match AppBuilder::new()
1373 .with_config(cfg)
1374 .with_builtin_tools()
1375 .build()
1376 .await
1377 {
1378 Ok(_) => panic!("must fail without key"),
1379 Err(err) => err,
1380 };
1381 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1382 }
1383
1384 struct ToolOnlyLlm {
1385 turn: AtomicUsize,
1386 }
1387
1388 #[async_trait]
1389 impl LlmClient for ToolOnlyLlm {
1390 async fn chat(
1391 &self,
1392 _messages: &[Message],
1393 _tools: &[ToolDef],
1394 ) -> motosan_agent_loop::Result<ChatOutput> {
1395 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1396 if turn == 0 {
1397 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1398 ToolCallItem {
1399 id: "t1".into(),
1400 name: "read".into(),
1401 args: serde_json::json!({"path":"nope.txt"}),
1402 },
1403 ])))
1404 } else {
1405 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1406 }
1407 }
1408 }
1409
1410 #[tokio::test]
1411 async fn empty_final_message_is_not_emitted() {
1412 let dir = tempfile::tempdir().unwrap();
1413 let mut cfg = Config::default();
1414 cfg.anthropic.api_key = Some("sk-unused".into());
1415 let app = AppBuilder::new()
1416 .with_config(cfg)
1417 .with_cwd(dir.path())
1418 .with_builtin_tools()
1419 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1420 turn: AtomicUsize::new(0),
1421 }))
1422 .build()
1423 .await
1424 .expect("build");
1425 let events: Vec<UiEvent> =
1426 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1427 let empties = events
1428 .iter()
1429 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1430 .count();
1431 assert_eq!(
1432 empties, 0,
1433 "should not emit empty final message, got: {events:?}"
1434 );
1435 }
1436
    /// Stateless LLM stub for tests; its `LlmClient` impl in this module always replies "ok".
    struct EchoLlm;
1438
1439 struct UsageLlm {
1440 responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1441 }
1442
1443 #[async_trait]
1444 impl LlmClient for UsageLlm {
1445 async fn chat(
1446 &self,
1447 _messages: &[Message],
1448 _tools: &[ToolDef],
1449 ) -> motosan_agent_loop::Result<ChatOutput> {
1450 let next = match self.responses.lock() {
1451 Ok(mut responses) => responses.pop_front(),
1452 Err(poisoned) => poisoned.into_inner().pop_front(),
1453 };
1454 Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1455 }
1456 }
1457
1458 #[async_trait]
1459 impl LlmClient for EchoLlm {
1460 async fn chat(
1461 &self,
1462 _messages: &[Message],
1463 _tools: &[ToolDef],
1464 ) -> motosan_agent_loop::Result<ChatOutput> {
1465 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1466 }
1467 }
1468
1469 #[tokio::test]
1470 async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1471 use motosan_agent_loop::{LlmResponse, TokenUsage};
1472
1473 let dir = tempfile::tempdir().expect("tempdir");
1474 let mut cfg = Config::default();
1475 cfg.anthropic.api_key = Some("sk-unused".into());
1476 let llm = Arc::new(UsageLlm {
1477 responses: std::sync::Mutex::new(VecDeque::from([
1478 ChatOutput::with_usage(
1479 LlmResponse::Message("first".into()),
1480 TokenUsage {
1481 input_tokens: 100,
1482 output_tokens: 50,
1483 },
1484 ),
1485 ChatOutput::with_usage(
1486 LlmResponse::Message("second".into()),
1487 TokenUsage {
1488 input_tokens: 200,
1489 output_tokens: 80,
1490 },
1491 ),
1492 ])),
1493 });
1494 let app = AppBuilder::new()
1495 .with_config(cfg)
1496 .with_cwd(dir.path())
1497 .with_builtin_tools()
1498 .with_llm(llm)
1499 .build()
1500 .await
1501 .expect("build");
1502
1503 let events_1: Vec<UiEvent> = app
1504 .send_user_message(UserMessage::text("hi"))
1505 .collect()
1506 .await;
1507 let events_2: Vec<UiEvent> = app
1508 .send_user_message(UserMessage::text("again"))
1509 .collect()
1510 .await;
1511
1512 let ts1 = events_1
1513 .iter()
1514 .find_map(|e| match e {
1515 UiEvent::TurnStats {
1516 input_tokens,
1517 output_tokens,
1518 cumulative_input,
1519 cumulative_output,
1520 ..
1521 } => Some((
1522 *input_tokens,
1523 *output_tokens,
1524 *cumulative_input,
1525 *cumulative_output,
1526 )),
1527 _ => None,
1528 })
1529 .expect("turn 1 had no TurnStats");
1530 assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1531
1532 let ts2 = events_2
1533 .iter()
1534 .find_map(|e| match e {
1535 UiEvent::TurnStats {
1536 input_tokens,
1537 output_tokens,
1538 cumulative_input,
1539 cumulative_output,
1540 ..
1541 } => Some((
1542 *input_tokens,
1543 *output_tokens,
1544 *cumulative_input,
1545 *cumulative_output,
1546 )),
1547 _ => None,
1548 })
1549 .expect("turn 2 had no TurnStats");
1550 assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1551
1552 let positions: Vec<&str> = events_1
1553 .iter()
1554 .filter_map(|e| match e {
1555 UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1556 UiEvent::TurnStats { .. } => Some("stats"),
1557 UiEvent::AgentTurnComplete => Some("turn_complete"),
1558 _ => None,
1559 })
1560 .collect();
1561 assert_eq!(
1562 positions,
1563 vec!["msg_complete", "stats", "turn_complete"],
1564 "wrong ordering"
1565 );
1566 }
1567
1568 #[tokio::test]
1569 async fn turn_stats_reset_after_new_session() {
1570 use motosan_agent_loop::{LlmResponse, TokenUsage};
1571
1572 let dir = tempfile::tempdir().expect("tempdir");
1573 let mut cfg = Config::default();
1574 cfg.anthropic.api_key = Some("sk-unused".into());
1575 let llm = Arc::new(UsageLlm {
1576 responses: std::sync::Mutex::new(VecDeque::from([
1577 ChatOutput::with_usage(
1578 LlmResponse::Message("first".into()),
1579 TokenUsage {
1580 input_tokens: 100,
1581 output_tokens: 50,
1582 },
1583 ),
1584 ChatOutput::with_usage(
1585 LlmResponse::Message("after-new".into()),
1586 TokenUsage {
1587 input_tokens: 7,
1588 output_tokens: 3,
1589 },
1590 ),
1591 ])),
1592 });
1593 let app = AppBuilder::new()
1594 .with_config(cfg)
1595 .with_cwd(dir.path())
1596 .with_builtin_tools()
1597 .with_llm(llm)
1598 .build()
1599 .await
1600 .expect("build");
1601
1602 let _: Vec<UiEvent> = app
1603 .send_user_message(UserMessage::text("hi"))
1604 .collect()
1605 .await;
1606 app.new_session().await.expect("new session");
1607 let events: Vec<UiEvent> = app
1608 .send_user_message(UserMessage::text("after new"))
1609 .collect()
1610 .await;
1611 let stats = events
1612 .iter()
1613 .find_map(|event| match event {
1614 UiEvent::TurnStats {
1615 input_tokens,
1616 output_tokens,
1617 cumulative_input,
1618 cumulative_output,
1619 ..
1620 } => Some((
1621 *input_tokens,
1622 *output_tokens,
1623 *cumulative_input,
1624 *cumulative_output,
1625 )),
1626 _ => None,
1627 })
1628 .expect("turn had no TurnStats");
1629 assert_eq!(stats, (7, 3, 7, 3));
1630 }
1631
1632 #[tokio::test]
1633 async fn with_headless_permissions_builds_an_app() {
1634 let dir = tempfile::tempdir().expect("tempdir");
1635 let mut config = Config::default();
1636 config.anthropic.api_key = Some("sk-unused".into());
1637 let app = AppBuilder::new()
1638 .with_config(config)
1639 .with_cwd(dir.path())
1640 .with_builtin_tools()
1641 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1642 .with_headless_permissions()
1643 .build()
1644 .await
1645 .expect("build");
1646 assert!(!app.session_id().is_empty());
1648 }
1649
1650 #[tokio::test]
1651 async fn new_session_swaps_in_a_fresh_empty_session() {
1652 use futures::StreamExt;
1653 let dir = tempfile::tempdir().expect("tempdir");
1654 let mut config = Config::default();
1655 config.anthropic.api_key = Some("sk-unused".into());
1656 let app = AppBuilder::new()
1657 .with_config(config)
1658 .with_cwd(dir.path())
1659 .with_builtin_tools()
1660 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1661 .build()
1662 .await
1663 .expect("build");
1664
1665 let _: Vec<_> = app
1666 .send_user_message(UserMessage::text("hello"))
1667 .collect()
1668 .await;
1669 let id_before = app.session_id();
1670 assert!(!app.session_history().await.expect("history").is_empty());
1671
1672 app.new_session().await.expect("new_session");
1673
1674 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1675 assert!(
1676 app.session_history().await.expect("history").is_empty(),
1677 "fresh session has no history"
1678 );
1679 }
1680
1681 #[tokio::test]
1682 async fn load_session_restores_a_stored_session_by_id() {
1683 use futures::StreamExt;
1684 let dir = tempfile::tempdir().expect("tempdir");
1685 let store_dir = dir.path().join("sessions");
1686 let store: Arc<dyn SessionStore> =
1687 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1688 let mut config = Config::default();
1689 config.anthropic.api_key = Some("sk-unused".into());
1690 let app = AppBuilder::new()
1691 .with_config(config)
1692 .with_cwd(dir.path())
1693 .with_builtin_tools()
1694 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1695 .with_session_store(Arc::clone(&store))
1696 .build()
1697 .await
1698 .expect("build");
1699
1700 let _: Vec<_> = app
1701 .send_user_message(UserMessage::text("remember this"))
1702 .collect()
1703 .await;
1704 let original_id = app.session_id();
1705
1706 app.new_session().await.expect("new_session");
1707 assert_ne!(app.session_id(), original_id);
1708
1709 app.load_session(&original_id).await.expect("load_session");
1710 assert_eq!(app.session_id(), original_id);
1711 let history = app.session_history().await.expect("history");
1712 assert!(
1713 history.iter().any(|m| m.text().contains("remember this")),
1714 "loaded session should carry the original turn"
1715 );
1716 }
1717
1718 #[tokio::test]
1719 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1720 use futures::StreamExt;
1721 let dir = tempfile::tempdir().expect("tempdir");
1722 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1723 dir.path().join("s"),
1724 ));
1725 let mut config = Config::default();
1726 config.anthropic.api_key = Some("sk-unused".into());
1727 let app = AppBuilder::new()
1728 .with_config(config)
1729 .with_cwd(dir.path())
1730 .with_builtin_tools()
1731 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1732 .with_session_store(store)
1733 .build()
1734 .await
1735 .expect("build");
1736
1737 let _: Vec<_> = app
1738 .send_user_message(UserMessage::text("hello"))
1739 .collect()
1740 .await;
1741 let original_id = app.session_id();
1742
1743 let new_id = app.clone_session().await.expect("clone_session");
1744
1745 assert_ne!(new_id, original_id);
1747 assert_eq!(app.session_id(), new_id);
1748 let history = app.session_history().await.expect("history");
1750 assert!(history.iter().any(|m| m.text().contains("hello")));
1751 }
1752
1753 #[tokio::test]
1754 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1755 use futures::StreamExt;
1756 let dir = tempfile::tempdir().expect("tempdir");
1757 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1758 dir.path().join("s"),
1759 ));
1760 let mut config = Config::default();
1761 config.anthropic.api_key = Some("sk-unused".into());
1762 let app = AppBuilder::new()
1763 .with_config(config)
1764 .with_cwd(dir.path())
1765 .with_builtin_tools()
1766 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1767 .with_session_store(store)
1768 .build()
1769 .await
1770 .expect("build");
1771
1772 let _: Vec<_> = app
1774 .send_user_message(UserMessage::text("first"))
1775 .collect()
1776 .await;
1777 let _: Vec<_> = app
1778 .send_user_message(UserMessage::text("second"))
1779 .collect()
1780 .await;
1781
1782 let entries = app.session.load_full().entries().await.expect("entries");
1784 let first_id = entries
1785 .iter()
1786 .find_map(|stored| {
1787 let msg = stored.entry.as_message()?;
1788 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1789 .then(|| stored.id.clone())
1790 })
1791 .expect("first user message present");
1792
1793 let _: Vec<_> = app
1795 .fork_from(first_id, UserMessage::text("branched"))
1796 .collect()
1797 .await;
1798
1799 let history = app.session_history().await.expect("history");
1800 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1801 assert!(
1802 texts.iter().any(|t| t.contains("first")),
1803 "fork keeps the fork-point ancestor"
1804 );
1805 assert!(
1806 texts.iter().any(|t| t.contains("branched")),
1807 "fork includes the new message"
1808 );
1809 assert!(
1810 !texts.iter().any(|t| t.contains("second")),
1811 "fork excludes the abandoned branch"
1812 );
1813 }
1814
1815 #[tokio::test]
1816 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1817 use futures::StreamExt;
1818 let dir = tempfile::tempdir().expect("tempdir");
1819 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1820 dir.path().join("s"),
1821 ));
1822 let mut config = Config::default();
1823 config.anthropic.api_key = Some("sk-unused".into());
1824 let app = AppBuilder::new()
1825 .with_config(config)
1826 .with_cwd(dir.path())
1827 .with_builtin_tools()
1828 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1829 .with_session_store(store)
1830 .build()
1831 .await
1832 .expect("build");
1833
1834 let _: Vec<_> = app
1835 .send_user_message(UserMessage::text("alpha"))
1836 .collect()
1837 .await;
1838 let _: Vec<_> = app
1839 .send_user_message(UserMessage::text("bravo"))
1840 .collect()
1841 .await;
1842
1843 let candidates = app.fork_candidates().await.expect("candidates");
1844 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1845 assert!(previews[0].contains("bravo"), "got {previews:?}");
1847 assert!(previews.iter().any(|p| p.contains("alpha")));
1848 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1850 }
1851
1852 #[tokio::test]
1853 async fn branches_returns_a_tree_for_a_linear_session() {
1854 use futures::StreamExt;
1855 let dir = tempfile::tempdir().expect("tempdir");
1856 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1857 dir.path().join("s"),
1858 ));
1859 let mut config = Config::default();
1860 config.anthropic.api_key = Some("sk-unused".into());
1861 let app = AppBuilder::new()
1862 .with_config(config)
1863 .with_cwd(dir.path())
1864 .with_builtin_tools()
1865 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1866 .with_session_store(store)
1867 .build()
1868 .await
1869 .expect("build");
1870
1871 let _: Vec<_> = app
1872 .send_user_message(UserMessage::text("hello"))
1873 .collect()
1874 .await;
1875 let tree = app.branches().await.expect("branches");
1876 assert!(!tree.nodes.is_empty());
1878 assert!(tree.active_leaf.is_some());
1879 }
1880
1881 #[tokio::test]
1882 async fn reload_settings_rebuilds_session_and_resets_token_tally() {
1883 let dir = tempfile::tempdir().expect("tempdir");
1884 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1885 dir.path().join("s"),
1886 ));
1887 let mut cfg = Config::default();
1888 cfg.anthropic.api_key = Some("sk-unused".into());
1889
1890 let llm = Arc::new(UsageLlm {
1891 responses: std::sync::Mutex::new(VecDeque::from([
1892 ChatOutput::with_usage(
1893 LlmResponse::Message("first".into()),
1894 motosan_agent_loop::TokenUsage {
1895 input_tokens: 100,
1896 output_tokens: 50,
1897 },
1898 ),
1899 ChatOutput::with_usage(
1900 LlmResponse::Message("after-reload".into()),
1901 motosan_agent_loop::TokenUsage {
1902 input_tokens: 5,
1903 output_tokens: 2,
1904 },
1905 ),
1906 ])),
1907 });
1908 let app = AppBuilder::new()
1909 .with_config(cfg)
1910 .with_cwd(dir.path())
1911 .with_builtin_tools()
1912 .with_llm(llm)
1913 .with_session_store(store)
1914 .build()
1915 .await
1916 .expect("build");
1917
1918 let _: Vec<UiEvent> = app
1919 .send_user_message(crate::user_message::UserMessage::text("hi"))
1920 .collect()
1921 .await;
1922 {
1923 let token_tally = app.token_tally();
1924 let tally = token_tally.lock().await;
1925 assert_eq!(tally.cumulative_input, 100);
1926 assert_eq!(tally.cumulative_output, 50);
1927 assert_eq!(tally.turn_count, 1);
1928 }
1929
1930 let mut new_settings = crate::settings::Settings::default();
1931 new_settings.model.name = "claude-opus-4-7".into();
1932 new_settings.ui.footer_show_cost = false;
1933 app.reload_settings(new_settings).await.expect("reload");
1934 assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
1935 assert!(!app.factory.settings().ui.footer_show_cost);
1936 assert_eq!(app.factory.current_model(), None);
1937
1938 {
1939 let token_tally = app.token_tally();
1940 let tally = token_tally.lock().await;
1941 assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
1942 assert_eq!(tally.cumulative_output, 0);
1943 assert_eq!(tally.turn_count, 0);
1944 }
1945
1946 let _: Vec<UiEvent> = app
1947 .send_user_message(crate::user_message::UserMessage::text("after"))
1948 .collect()
1949 .await;
1950 {
1951 let token_tally = app.token_tally();
1952 let tally = token_tally.lock().await;
1953 assert_eq!(tally.cumulative_input, 5);
1954 assert_eq!(tally.cumulative_output, 2);
1955 assert_eq!(tally.turn_count, 1);
1956 }
1957 }
1958
1959 #[tokio::test]
1960 async fn switch_model_preserves_history() {
1961 use futures::StreamExt;
1962 let dir = tempfile::tempdir().expect("tempdir");
1963 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1964 dir.path().join("s"),
1965 ));
1966 let mut config = Config::default();
1967 config.anthropic.api_key = Some("sk-unused".into());
1968 let app = AppBuilder::new()
1969 .with_config(config)
1970 .with_cwd(dir.path())
1971 .with_builtin_tools()
1972 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1973 .with_session_store(store)
1974 .build()
1975 .await
1976 .expect("build");
1977
1978 let _: Vec<_> = app
1979 .send_user_message(UserMessage::text("keep me"))
1980 .collect()
1981 .await;
1982 let id_before = app.session_id();
1983
1984 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
1985 .await
1986 .expect("switch_model");
1987
1988 assert_eq!(
1989 app.session_id(),
1990 id_before,
1991 "switch_model keeps the same session"
1992 );
1993 let history = app.session_history().await.expect("history");
1994 assert!(history.iter().any(|m| m.text().contains("keep me")));
1995 }
1996
1997 #[tokio::test]
1998 async fn switch_model_is_sticky_for_future_session_rebuilds() {
1999 let dir = tempfile::tempdir().expect("tempdir");
2000 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2001 dir.path().join("s"),
2002 ));
2003 let mut config = Config::default();
2004 config.anthropic.api_key = Some("sk-unused".into());
2005 let app = AppBuilder::new()
2006 .with_config(config)
2007 .with_cwd(dir.path())
2008 .with_builtin_tools()
2009 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2010 .with_session_store(store)
2011 .build()
2012 .await
2013 .expect("build");
2014
2015 let selected = crate::model::ModelId::from("claude-opus-4-7");
2016 app.switch_model(&selected).await.expect("switch_model");
2017 app.new_session().await.expect("new_session");
2018
2019 assert_eq!(app.factory.current_model(), Some(selected.clone()));
2020 assert_eq!(app.settings().model.name, selected.to_string());
2021 }
2022
2023 struct SleepThenDoneLlm {
2024 turn: AtomicUsize,
2025 }
2026
2027 #[async_trait]
2028 impl LlmClient for SleepThenDoneLlm {
2029 async fn chat(
2030 &self,
2031 _messages: &[Message],
2032 _tools: &[ToolDef],
2033 ) -> motosan_agent_loop::Result<ChatOutput> {
2034 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2035 if turn == 0 {
2036 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2037 ToolCallItem {
2038 id: "sleep".into(),
2039 name: "bash".into(),
2040 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2041 },
2042 ])))
2043 } else {
2044 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2045 }
2046 }
2047 }
2048
2049 #[tokio::test]
2050 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2051 use futures::StreamExt;
2052 let dir = tempfile::tempdir().expect("tempdir");
2053 let mut config = Config::default();
2054 config.anthropic.api_key = Some("sk-unused".into());
2055 let app = Arc::new(
2056 AppBuilder::new()
2057 .with_config(config)
2058 .with_cwd(dir.path())
2059 .with_builtin_tools()
2060 .with_llm(Arc::new(SleepThenDoneLlm {
2061 turn: AtomicUsize::new(0),
2062 }) as Arc<dyn LlmClient>)
2063 .build()
2064 .await
2065 .expect("build"),
2066 );
2067
2068 app.new_session().await.expect("new_session");
2069 let running_app = Arc::clone(&app);
2070 let handle = tokio::spawn(async move {
2071 running_app
2072 .send_user_message(UserMessage::text("run a slow command"))
2073 .collect::<Vec<_>>()
2074 .await
2075 });
2076 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2077 app.cancel();
2078
2079 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2080 .await
2081 .expect("turn should finish after cancellation")
2082 .expect("join");
2083 assert!(
2084 events.iter().any(|event| {
2085 matches!(
2086 event,
2087 UiEvent::ToolCallCompleted { result, .. }
2088 if result.text.contains("command cancelled by user")
2089 )
2090 }),
2091 "cancel should reach the rebuilt bash tool: {events:?}"
2092 );
2093 }
2094
2095 #[tokio::test]
2096 async fn compact_summarizes_a_session_with_enough_history() {
2097 struct DoneLlm;
2098 #[async_trait]
2099 impl LlmClient for DoneLlm {
2100 async fn chat(
2101 &self,
2102 _messages: &[Message],
2103 _tools: &[ToolDef],
2104 ) -> motosan_agent_loop::Result<ChatOutput> {
2105 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2106 }
2107 }
2108
2109 let dir = tempfile::tempdir().expect("tempdir");
2110 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2111 dir.path().join("sessions"),
2112 ));
2113 let mut config = Config::default();
2114 config.anthropic.api_key = Some("sk-unused".into());
2115 let app = AppBuilder::new()
2116 .with_config(config)
2117 .with_cwd(dir.path())
2118 .with_builtin_tools()
2119 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2120 .with_session_store(store)
2121 .build()
2122 .await
2123 .expect("build");
2124
2125 for i in 0..4 {
2132 let _: Vec<_> = app
2133 .send_user_message(UserMessage::text(format!("turn {i}")))
2134 .collect()
2135 .await;
2136 }
2137
2138 app.compact().await.expect("compact should succeed");
2139
2140 let history = app.session_history().await.expect("history");
2143 assert!(
2144 !history.is_empty(),
2145 "session should still have content post-compaction"
2146 );
2147 }
2148
2149 #[test]
2150 fn anthropic_env_api_key_overrides_auth_json_key() {
2151 let mut auth = crate::auth::Auth::default();
2152 auth.0.insert(
2153 "anthropic".into(),
2154 crate::auth::ProviderAuth::ApiKey {
2155 key: "sk-auth".into(),
2156 },
2157 );
2158
2159 let key = anthropic_api_key_from(&auth, |name| {
2160 (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
2161 });
2162 assert_eq!(key.as_deref(), Some("sk-env"));
2163 }
2164
2165 #[tokio::test]
2166 async fn with_settings_overrides_deprecated_config_model() {
2167 use crate::settings::Settings;
2168
2169 let mut config = Config::default();
2170 config.model.name = "from-config".into();
2171 config.anthropic.api_key = Some("sk-config".into());
2172
2173 let mut settings = Settings::default();
2174 settings.model.name = "from-settings".into();
2175
2176 let tmp = tempfile::tempdir().unwrap();
2177 let app = AppBuilder::new()
2178 .with_config(config)
2179 .with_settings(settings)
2180 .with_cwd(tmp.path())
2181 .disable_context_discovery()
2182 .with_llm(Arc::new(EchoLlm))
2183 .build()
2184 .await
2185 .expect("build");
2186 assert_eq!(app.config().model.name, "from-settings");
2187 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2188 }
2189
2190 #[tokio::test]
2191 async fn with_settings_synthesises_legacy_config_for_build() {
2192 use crate::auth::{Auth, ProviderAuth};
2193 use crate::settings::Settings;
2194
2195 let mut settings = Settings::default();
2196 settings.model.name = "claude-sonnet-4-6".into();
2197
2198 let mut auth = Auth::default();
2199 auth.0.insert(
2200 "anthropic".into(),
2201 ProviderAuth::ApiKey {
2202 key: "sk-test".into(),
2203 },
2204 );
2205
2206 let tmp = tempfile::tempdir().unwrap();
2207 let app = AppBuilder::new()
2208 .with_settings(settings)
2209 .with_auth(auth)
2210 .with_cwd(tmp.path())
2211 .with_builtin_tools()
2212 .disable_context_discovery()
2213 .with_llm(Arc::new(EchoLlm))
2214 .build()
2215 .await
2216 .expect("build");
2217 let _ = app;
2218 }
2219
2220 #[tokio::test]
2221 async fn cancel_before_turn_does_not_poison_future_turns() {
2222 let dir = tempfile::tempdir().unwrap();
2223 let mut cfg = Config::default();
2224 cfg.anthropic.api_key = Some("sk-unused".into());
2225 let app = AppBuilder::new()
2226 .with_config(cfg)
2227 .with_cwd(dir.path())
2228 .with_builtin_tools()
2229 .with_llm(std::sync::Arc::new(EchoLlm))
2230 .build()
2231 .await
2232 .expect("build");
2233
2234 app.cancel();
2235 let events: Vec<UiEvent> = app
2236 .send_user_message(UserMessage::text("x"))
2237 .collect()
2238 .await;
2239
2240 assert!(
2241 events
2242 .iter()
2243 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2244 "turn should use a fresh cancellation token: {events:?}"
2245 );
2246 }
2247
2248 #[test]
2249 fn map_event_matches_started_and_completed_ids_by_tool_name() {
2250 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2251
2252 let started_bash = map_event(
2253 AgentEvent::Core(CoreEvent::ToolStarted {
2254 name: "bash".into(),
2255 }),
2256 &tracker,
2257 );
2258 let started_read = map_event(
2259 AgentEvent::Core(CoreEvent::ToolStarted {
2260 name: "read".into(),
2261 }),
2262 &tracker,
2263 );
2264 let completed_bash = map_event(
2265 AgentEvent::Core(CoreEvent::ToolCompleted {
2266 name: "bash".into(),
2267 result: motosan_agent_tool::ToolResult::text("ok"),
2268 }),
2269 &tracker,
2270 );
2271 let completed_read = map_event(
2272 AgentEvent::Core(CoreEvent::ToolCompleted {
2273 name: "read".into(),
2274 result: motosan_agent_tool::ToolResult::text("ok"),
2275 }),
2276 &tracker,
2277 );
2278
2279 assert!(matches!(
2280 started_bash,
2281 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
2282 ));
2283 assert!(matches!(
2284 started_read,
2285 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
2286 ));
2287 assert!(matches!(
2288 completed_bash,
2289 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
2290 ));
2291 assert!(matches!(
2292 completed_read,
2293 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
2294 ));
2295 }
2296
2297 #[test]
2298 fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
2299 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2300 let s1 = map_event(
2301 AgentEvent::Core(CoreEvent::ToolStarted {
2302 name: "bash".into(),
2303 }),
2304 &tracker,
2305 );
2306 let s2 = map_event(
2307 AgentEvent::Core(CoreEvent::ToolStarted {
2308 name: "bash".into(),
2309 }),
2310 &tracker,
2311 );
2312 let c1 = map_event(
2313 AgentEvent::Core(CoreEvent::ToolCompleted {
2314 name: "bash".into(),
2315 result: motosan_agent_tool::ToolResult::text("a"),
2316 }),
2317 &tracker,
2318 );
2319 let c2 = map_event(
2320 AgentEvent::Core(CoreEvent::ToolCompleted {
2321 name: "bash".into(),
2322 result: motosan_agent_tool::ToolResult::text("b"),
2323 }),
2324 &tracker,
2325 );
2326
2327 let id_s1 = match s1 {
2328 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2329 other => panic!("{other:?}"),
2330 };
2331 let id_s2 = match s2 {
2332 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2333 other => panic!("{other:?}"),
2334 };
2335 let id_c1 = match c1 {
2336 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2337 other => panic!("{other:?}"),
2338 };
2339 let id_c2 = match c2 {
2340 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2341 other => panic!("{other:?}"),
2342 };
2343
2344 assert_eq!(id_s1, id_c1);
2345 assert_eq!(id_s2, id_c2);
2346 assert_ne!(id_s1, id_s2);
2347 }
2348
2349 #[tokio::test]
2350 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2351 let dir = tempfile::tempdir().unwrap();
2352 let mut cfg = Config::default();
2353 cfg.anthropic.api_key = Some("sk-unused".into());
2354 let app = AppBuilder::new()
2355 .with_config(cfg)
2356 .with_cwd(dir.path())
2357 .with_builtin_tools()
2358 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2359 turn: AtomicUsize::new(0),
2360 }))
2361 .build()
2362 .await
2363 .expect("build");
2364
2365 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2366 let first_event = first.next().await;
2367 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2368
2369 let second_events: Vec<UiEvent> = app
2370 .send_user_message(UserMessage::text("second"))
2371 .collect()
2372 .await;
2373 assert_eq!(
2374 second_events.len(),
2375 1,
2376 "expected immediate single error event, got: {second_events:?}"
2377 );
2378 assert!(matches!(
2379 &second_events[0],
2380 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2381 ));
2382 }
2383
2384 #[tokio::test]
2385 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2386 let dir = tempfile::tempdir().unwrap();
2392 let mut cfg = Config::default();
2393 cfg.anthropic.api_key = Some("sk-unused".into());
2394 let app = AppBuilder::new()
2395 .with_config(cfg)
2396 .with_cwd(dir.path())
2397 .with_builtin_tools()
2398 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2399 turn: AtomicUsize::new(0),
2400 }))
2401 .build()
2402 .await
2403 .expect("build");
2404
2405 let mut first =
2407 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2408 let first_event = first.next().await;
2409 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2410
2411 let bad = crate::user_message::UserMessage {
2413 text: "second".into(),
2414 attachments: vec![crate::user_message::Attachment::Image {
2415 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2416 }],
2417 };
2418 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2419
2420 assert_eq!(
2421 second_events.len(),
2422 1,
2423 "expected exactly one event (the attachment error); got: {second_events:?}"
2424 );
2425 assert!(
2426 matches!(
2427 &second_events[0],
2428 UiEvent::AttachmentError {
2429 kind: crate::user_message::AttachmentErrorKind::NotFound,
2430 ..
2431 }
2432 ),
2433 "expected AttachmentError::NotFound as first event; got {second_events:?}"
2434 );
2435 }
2436
2437 #[test]
2438 fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
2439 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2440 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2441
2442 let only = map_event(
2443 AgentEvent::Core(CoreEvent::ToolStarted {
2444 name: "bash".into(),
2445 }),
2446 &tracker,
2447 );
2448 let only_id = match only {
2449 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2450 other => panic!("{other:?}"),
2451 };
2452 assert_eq!(progress_event_id(&tracker), only_id);
2453
2454 let _second = map_event(
2455 AgentEvent::Core(CoreEvent::ToolStarted {
2456 name: "read".into(),
2457 }),
2458 &tracker,
2459 );
2460 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2461 }
2462
2463 #[tokio::test]
2464 async fn builder_rejects_builtin_and_custom_tools_together() {
2465 let mut cfg = Config::default();
2466 cfg.anthropic.api_key = Some("sk-unused".into());
2467 let dir = tempfile::tempdir().unwrap();
2468 let err = match AppBuilder::new()
2469 .with_config(cfg)
2470 .with_cwd(dir.path())
2471 .with_builtin_tools()
2472 .with_custom_tools_factory(|_| Vec::new())
2473 .build()
2474 .await
2475 {
2476 Ok(_) => panic!("must reject conflicting tool configuration"),
2477 Err(err) => err,
2478 };
2479
2480 assert!(format!("{err}").contains("mutually exclusive"));
2481 }
2482
2483 #[tokio::test]
2485 async fn two_turns_in_same_session_share_history() {
2486 #[derive(Default)]
2487 struct CounterLlm {
2488 turn: AtomicUsize,
2489 }
2490 #[async_trait]
2491 impl LlmClient for CounterLlm {
2492 async fn chat(
2493 &self,
2494 messages: &[Message],
2495 _tools: &[ToolDef],
2496 ) -> motosan_agent_loop::Result<ChatOutput> {
2497 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2498 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2499 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2500 }
2501 }
2502
2503 let tmp = tempfile::tempdir().unwrap();
2504 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2505 tmp.path().to_path_buf(),
2506 ));
2507
2508 let app = AppBuilder::new()
2509 .with_settings(crate::settings::Settings::default())
2510 .with_auth(crate::auth::Auth::default())
2511 .with_cwd(tmp.path())
2512 .with_builtin_tools()
2513 .disable_context_discovery()
2514 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2515 .with_session_store(store)
2516 .build_with_session(None)
2517 .await
2518 .expect("build");
2519
2520 let _events1: Vec<UiEvent> = app
2521 .send_user_message(UserMessage::text("hi"))
2522 .collect()
2523 .await;
2524 let events2: Vec<UiEvent> = app
2525 .send_user_message(UserMessage::text("again"))
2526 .collect()
2527 .await;
2528
2529 let saw_more_than_one = events2.iter().any(|e| {
2531 matches!(
2532 e,
2533 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2534 )
2535 });
2536 assert!(
2537 saw_more_than_one,
2538 "second turn should have seen history; events: {events2:?}"
2539 );
2540 }
2541}
2542
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// Builds the single skill value (named `x`) shared by the tests below.
    fn sample_skill() -> Skill {
        Skill {
            name: "x".into(),
            description: "d".into(),
            source: SkillSource::Global,
            disable_model_invocation: false,
            base_dir: PathBuf::from("/"),
            file_path: PathBuf::from("/x.md"),
        }
    }

    /// `with_skills` keeps the skills it was given on the builder.
    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);

        assert_eq!(builder.skills.len(), 1);
        assert_eq!(builder.skills[0].name, "x");
    }

    /// `without_skills` leaves the builder with no skills, even after
    /// `with_skills` supplied one.
    #[test]
    fn without_skills_clears() {
        let seeded = AppBuilder::new().with_skills(vec![sample_skill()]);
        let cleared = seeded.without_skills();

        assert!(cleared.skills.is_empty());
    }
}
2575
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::Tool;

    /// Boxed, sendable future resolving to a tool result; the return type of
    /// `Tool::call` as implemented in this module.
    type ToolFuture<'a> = std::pin::Pin<
        Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + 'a>,
    >;

    /// Stub tool: advertises the name `fake__echo` and always answers `"ok"`.
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> motosan_agent_tool::ToolDef {
            motosan_agent_tool::ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema: serde_json::json!({"type": "object"}),
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &motosan_agent_tool::ToolContext,
        ) -> ToolFuture<'_> {
            // Arguments and context are ignored; the reply is constant.
            Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
        }
    }

    /// `with_extra_tools` keeps the supplied tool list on the builder.
    #[test]
    fn with_extra_tools_stores_tools() {
        let extra: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
        let builder = AppBuilder::new().with_extra_tools(extra);

        assert_eq!(builder.extra_tools.len(), 1);
    }
}