1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
23#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27pub struct TurnStatsAccum {
28 pub cumulative_input: u64,
29 pub cumulative_output: u64,
30 pub turn_count: u64,
31}
32
33impl TurnStatsAccum {
34 pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38 self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39 self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40 self.turn_count = self.turn_count.saturating_add(1);
41 }
42}
43
44pub(crate) fn extract_thinking_events(messages: &[motosan_agent_loop::Message]) -> Vec<UiEvent> {
52 let mut events = Vec::new();
53 for msg in messages {
54 if let motosan_agent_loop::Message::Assistant { content, .. } = msg {
55 for part in content {
56 if let motosan_agent_loop::AssistantContent::Reasoning { text, .. } = part {
57 events.push(UiEvent::ThinkingComplete { text: text.clone() });
58 }
59 }
60 }
61 }
62 events
63}
64
65pub(crate) fn extract_new_thinking_events(
70 messages: &[motosan_agent_loop::Message],
71 previous_len: usize,
72) -> Vec<UiEvent> {
73 extract_thinking_events(messages.get(previous_len..).unwrap_or(&[]))
74}
75
76#[derive(Debug, Clone)]
78pub(crate) enum SessionMode {
79 New,
81 Resume(String),
83}
84
85struct SharedLlm {
90 client: Arc<dyn LlmClient>,
91}
92
93impl SharedLlm {
94 fn new(client: Arc<dyn LlmClient>) -> Self {
95 Self { client }
96 }
97
98 fn client(&self) -> Arc<dyn LlmClient> {
99 Arc::clone(&self.client)
100 }
101}
102
103pub(crate) struct SessionFactory {
104 cwd: PathBuf,
105 settings: Arc<Mutex<crate::settings::Settings>>,
106 auth: crate::auth::Auth,
107 policy: Arc<crate::permissions::Policy>,
108 session_cache: Arc<crate::permissions::SessionCache>,
109 ui_tx: Option<mpsc::Sender<UiEvent>>,
110 headless_permissions: bool,
111 permission_gate: Arc<dyn PermissionGate>,
112 progress_tx: mpsc::Sender<ToolProgressChunk>,
115 skills: Arc<Vec<crate::skills::Skill>>,
116 install_builtin_tools: bool,
117 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
118 max_iterations: usize,
119 context_discovery_disabled: bool,
120 autocompact_enabled: bool,
121 session_store: Option<Arc<dyn SessionStore>>,
122 llm_override: Option<Arc<dyn LlmClient>>,
123 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
124 cancel_token: SharedCancelToken,
125}
126
127impl SessionFactory {
128 fn settings(&self) -> crate::settings::Settings {
129 match self.settings.lock() {
130 Ok(guard) => guard.clone(),
131 Err(poisoned) => poisoned.into_inner().clone(),
132 }
133 }
134
135 fn store_settings(&self, settings: crate::settings::Settings) {
136 match self.settings.lock() {
137 Ok(mut guard) => *guard = settings,
138 Err(poisoned) => *poisoned.into_inner() = settings,
139 }
140 }
141
142 fn current_model(&self) -> Option<crate::model::ModelId> {
143 match self.current_model.lock() {
144 Ok(guard) => guard.clone(),
145 Err(poisoned) => poisoned.into_inner().clone(),
146 }
147 }
148
149 fn set_current_model(&self, model: crate::model::ModelId) {
150 match self.current_model.lock() {
151 Ok(mut guard) => *guard = Some(model),
152 Err(poisoned) => *poisoned.into_inner() = Some(model),
153 }
154 }
155
156 fn clear_current_model(&self) {
157 match self.current_model.lock() {
158 Ok(mut guard) => *guard = None,
159 Err(poisoned) => *poisoned.into_inner() = None,
160 }
161 }
162
163 async fn build(
170 &self,
171 mode: SessionMode,
172 model_override: Option<&crate::model::ModelId>,
173 settings_override: Option<&crate::settings::Settings>,
174 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
175 let effective_model = model_override.cloned().or_else(|| {
179 if settings_override.is_some() {
180 None
181 } else {
182 self.current_model()
183 }
184 });
185 let mut settings = settings_override
186 .cloned()
187 .unwrap_or_else(|| self.settings());
188 if let Some(m) = &effective_model {
189 settings.model.name = m.as_str().to_string();
190 }
191
192 let llm = if effective_model.is_none() {
193 self.llm_override.as_ref().map_or_else(
194 || build_llm_client(&settings, &self.auth),
195 |llm| Ok(Arc::clone(llm)),
196 )?
197 } else {
198 build_llm_client(&settings, &self.auth)?
199 };
200
201 let tool_ctx = ToolCtx::new_with_cancel_token(
204 &self.cwd,
205 Arc::clone(&self.permission_gate),
206 self.progress_tx.clone(),
207 self.cancel_token.clone(),
208 );
209 let mut tools = if self.install_builtin_tools {
210 builtin_tools(tool_ctx.clone())
211 } else {
212 Vec::new()
213 };
214 tools.extend(self.extra_tools.iter().cloned());
215
216 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
217 let base_prompt = build_system_prompt(&tool_names, &self.skills);
218 let system_prompt = if self.context_discovery_disabled {
219 base_prompt
220 } else {
221 let agent_dir = crate::paths::agent_dir();
222 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
223 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
224 };
225 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
226
227 let mut engine_builder = Engine::builder()
228 .max_iterations(self.max_iterations)
229 .system_prompt(system_prompt)
230 .tool_context(motosan_tool_context);
231 for tool in tools {
232 engine_builder = engine_builder.tool(tool);
233 }
234 if let Some(ui_tx) = &self.ui_tx {
235 let ext = crate::permissions::PermissionExtension::new(
236 Arc::clone(&self.policy),
237 Arc::clone(&self.session_cache),
238 self.cwd.clone(),
239 ui_tx.clone(),
240 );
241 engine_builder = engine_builder.extension(Box::new(ext));
242 } else if self.headless_permissions {
243 let ext = crate::permissions::PermissionExtension::headless(
244 Arc::clone(&self.policy),
245 Arc::clone(&self.session_cache),
246 self.cwd.clone(),
247 );
248 engine_builder = engine_builder.extension(Box::new(ext));
249 }
250 if self.autocompact_enabled
251 && settings.session.compact_at_context_pct > 0.0
252 && settings.session.compact_at_context_pct < 1.0
253 {
254 let cfg = AutocompactConfig {
255 threshold: settings.session.compact_at_context_pct,
256 max_context_tokens: settings.session.max_context_tokens,
257 keep_turns: settings.session.keep_turns.max(1),
258 };
259 engine_builder = engine_builder
260 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
261 }
262 let engine = engine_builder.build();
263
264 let session = match (&mode, &self.session_store) {
265 (SessionMode::Resume(id), Some(store)) => {
266 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
267 .await
268 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
269 let entries = s
270 .entries()
271 .await
272 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
273 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
274 s
275 }
276 (SessionMode::Resume(_), None) => {
277 return Err(AppError::Config("resume requires a session store".into()));
278 }
279 (SessionMode::New, Some(store)) => {
280 let id = crate::session::SessionId::new();
281 AgentSession::new_with_store(
282 id.into_string(),
283 Arc::clone(store),
284 engine,
285 Arc::clone(&llm),
286 )
287 }
288 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
289 };
290
291 Ok((session, llm))
292 }
293}
294
295pub struct App {
296 session: arc_swap::ArcSwap<AgentSession>,
297 llm: arc_swap::ArcSwap<SharedLlm>,
298 factory: SessionFactory,
299 config: Config,
300 cancel_token: SharedCancelToken,
301 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
302 next_tool_id: Arc<Mutex<ToolCallTracker>>,
303 skills: Arc<Vec<crate::skills::Skill>>,
304 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
305 pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
307 pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
312 extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
313 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
314}
315
316impl App {
317 pub fn config(&self) -> &Config {
318 &self.config
319 }
320
321 pub fn cancel(&self) {
325 self.cancel_token.cancel();
326 }
327
328 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
332 Arc::clone(&self.session_cache)
333 }
334
335 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
338 Arc::clone(&self.extension_registry)
339 }
340
341 pub fn settings(&self) -> crate::settings::Settings {
343 let mut settings = self.factory.settings();
344 if let Some(model) = self.factory.current_model() {
345 settings.model.name = model.to_string();
346 }
347 settings
348 }
349
350 pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
352 Arc::clone(&self.token_tally)
353 }
354
355 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
357 Arc::clone(&self.extension_diagnostics)
358 }
359
360 pub fn session_id(&self) -> String {
363 self.session.load().session_id().to_string()
364 }
365
366 pub async fn session_history(
375 &self,
376 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
377 self.session.load_full().history().await
378 }
379
380 pub async fn compact(&self) -> Result<()> {
386 use motosan_agent_loop::ThresholdStrategy;
387 let strategy = ThresholdStrategy {
388 threshold: 0.0,
389 ..ThresholdStrategy::default()
390 };
391 let llm = self.llm.load_full().client();
392 self.session
393 .load_full()
394 .maybe_compact(&strategy, llm)
395 .await
396 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
397 Ok(())
398 }
399
400 pub async fn new_session(&self) -> Result<()> {
404 self.fire_session_before_switch("new", None).await?;
405 let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
406 self.session.store(Arc::new(session));
407 self.llm.store(Arc::new(SharedLlm::new(llm)));
408 self.reset_token_tally().await;
409 Ok(())
410 }
411
412 pub async fn load_session(&self, id: &str) -> Result<()> {
418 self.fire_session_before_switch("load", Some(id)).await?;
419 self.load_session_without_hook(id).await
420 }
421
422 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
423 let (session, llm) = self
424 .factory
425 .build(SessionMode::Resume(id.to_string()), None, None)
426 .await?;
427 self.session.store(Arc::new(session));
428 self.llm.store(Arc::new(SharedLlm::new(llm)));
429 self.reset_token_tally().await;
430 Ok(())
431 }
432
433 async fn reset_token_tally(&self) {
434 let mut tally = self.token_tally.lock().await;
435 *tally = TurnStatsAccum::default();
436 }
437
438 pub async fn clone_session(&self) -> Result<String> {
447 self.fire_session_before_switch("clone", None).await?;
448 let Some(store) = self.factory.session_store.as_ref() else {
449 return Err(AppError::Config("clone requires a session store".into()));
450 };
451 let source_id = self.session.load().session_id().to_string();
452 let new_id = crate::session::SessionId::new().into_string();
453 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
454 catalog
455 .fork(&source_id, &new_id)
456 .await
457 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
458 self.load_session_without_hook(&new_id).await?;
459 Ok(new_id)
460 }
461
462 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
477 self.fire_session_before_switch("model_switch", None)
478 .await?;
479 let current_id = self.session.load().session_id().to_string();
480 let (session, llm) = self
481 .factory
482 .build(SessionMode::Resume(current_id), Some(model), None)
483 .await?;
484 self.factory.set_current_model(model.clone());
485 self.session.store(Arc::new(session));
486 self.llm.store(Arc::new(SharedLlm::new(llm)));
487 Ok(())
488 }
489
490 pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
506 let current_id = self.session.load().session_id().to_string();
510 let (session, llm) = self
511 .factory
512 .build(SessionMode::Resume(current_id), None, Some(&new_settings))
513 .await?;
514 self.factory.store_settings(new_settings);
515 self.factory.clear_current_model();
516 self.session.store(Arc::new(session));
517 self.llm.store(Arc::new(SharedLlm::new(llm)));
518
519 self.reset_token_tally().await;
522
523 Ok(())
524 }
525
526 pub async fn disconnect_mcp(&self) {
529 for (name, server) in &self.mcp_servers {
530 let _ =
531 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
532 tracing::debug!(target: "mcp", server = %name, "disconnected");
533 }
534 }
535
536 fn run_turn(
537 &self,
538 msg: crate::user_message::UserMessage,
539 fork_from: Option<motosan_agent_loop::EntryId>,
540 ) -> impl Stream<Item = UiEvent> + Send + 'static {
541 let session = self.session.load_full();
542 let skills = Arc::clone(&self.skills);
543 let cancel_token = self.cancel_token.clone();
544 let tracker = Arc::clone(&self.next_tool_id);
545 let progress = Arc::clone(&self.progress_rx);
546 let token_tally = Arc::clone(&self.token_tally);
547 let settings_model_name = self
548 .factory
549 .current_model()
550 .map(|model| model.to_string())
551 .unwrap_or_else(|| self.factory.settings().model.name);
552
553 async_stream::stream! {
554 let new_user = {
558 let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
562 let expanded_msg = crate::user_message::UserMessage {
563 text: expanded_text,
564 attachments: msg.attachments.clone(),
565 };
566 match crate::user_message::prepare_user_message(&expanded_msg) {
567 Ok(m) => m,
568 Err(err) => {
569 yield UiEvent::AttachmentError {
570 kind: err.kind(),
571 message: err.to_string(),
572 };
573 return;
574 }
575 }
576 };
577
578 let mut progress_guard = match progress.try_lock() {
580 Ok(guard) => guard,
581 Err(_) => {
582 yield UiEvent::Error(
583 "another turn is already running; capo is single-turn-per-App".into(),
584 );
585 return;
586 }
587 };
588
589 let cancel = cancel_token.reset();
591
592 yield UiEvent::AgentTurnStarted;
593 yield UiEvent::AgentThinking;
594
595 let handle = match fork_from {
597 None => {
598 let history = match session.history().await {
600 Ok(h) => h,
601 Err(err) => {
602 yield UiEvent::Error(format!("session.history failed: {err}"));
603 return;
604 }
605 };
606 let mut messages = history;
607 messages.push(new_user);
608 match session.start_turn(messages).await {
609 Ok(h) => h,
610 Err(err) => {
611 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
612 return;
613 }
614 }
615 }
616 Some(from) => {
617 match session.fork_turn(from, vec![new_user]).await {
619 Ok(h) => h,
620 Err(err) => {
621 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
622 return;
623 }
624 }
625 }
626 };
627 let previous_len = handle.previous_len;
628 let epoch = handle.epoch;
629 let branch_parent = handle.branch_parent;
630 let ops_tx = handle.ops_tx.clone();
631 let mut agent_stream = handle.stream;
632
633 let interrupt_bridge = tokio::spawn(async move {
641 cancel.cancelled().await;
642 let _ = ops_tx.send(AgentOp::Interrupt).await;
643 });
644
645 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
647 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
648
649 loop {
650 while let Ok(chunk) = progress_guard.try_recv() {
652 yield UiEvent::ToolCallProgress {
653 id: progress_event_id(&tracker),
654 chunk: ProgressChunk::from(chunk),
655 };
656 }
657
658 tokio::select! {
659 biased;
660 maybe_item = agent_stream.next() => {
661 match maybe_item {
662 Some(AgentStreamItem::Event(ev)) => {
663 if let Some(ui) = map_event(ev, &tracker) {
664 yield ui;
665 }
666 }
667 Some(AgentStreamItem::Terminal(term)) => {
668 terminal_result = Some(term.result);
669 terminal_messages = Some(term.messages);
670 break;
671 }
672 None => break,
673 }
674 }
675 Some(chunk) = progress_guard.recv() => {
676 yield UiEvent::ToolCallProgress {
677 id: progress_event_id(&tracker),
678 chunk: ProgressChunk::from(chunk),
679 };
680 }
681 }
682 }
683
684 interrupt_bridge.abort();
686
687 if let Some(msgs) = terminal_messages.as_ref() {
689 if let Err(err) = session
690 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
691 .await
692 {
693 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
694 }
695 }
696
697 match terminal_result {
699 Some(Ok(result)) => {
700 if let Some(msgs) = terminal_messages.as_ref() {
703 for ev in extract_new_thinking_events(msgs, previous_len) {
704 yield ev;
705 }
706 }
707
708 let final_text = terminal_messages
709 .as_ref()
710 .and_then(|msgs| {
711 msgs.iter()
712 .rev()
713 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
714 .map(|m| m.text())
715 })
716 .unwrap_or_default();
717 if !final_text.is_empty() {
718 yield UiEvent::AgentMessageComplete(final_text);
719 }
720 let usage = result.usage;
723 let (cumulative_input, cumulative_output) = {
724 let mut tally = token_tally.lock().await;
725 tally.add(usage);
726 (tally.cumulative_input, tally.cumulative_output)
727 };
728 yield UiEvent::TurnStats {
729 input_tokens: usage.input_tokens,
730 output_tokens: usage.output_tokens,
731 cumulative_input,
732 cumulative_output,
733 model: settings_model_name.clone(),
734 };
735 while let Ok(chunk) = progress_guard.try_recv() {
737 yield UiEvent::ToolCallProgress {
738 id: progress_event_id(&tracker),
739 chunk: ProgressChunk::from(chunk),
740 };
741 }
742 yield UiEvent::AgentTurnComplete;
743 }
744 Some(Err(err)) => {
745 yield UiEvent::Error(format!("{err}"));
746 }
747 None => { }
748 }
749 }
750 }
751
752 pub fn send_user_message(
753 &self,
754 msg: crate::user_message::UserMessage,
755 ) -> impl Stream<Item = UiEvent> + Send + 'static {
756 self.run_turn(msg, None)
757 }
758
759 pub fn fork_from(
764 &self,
765 from: motosan_agent_loop::EntryId,
766 message: crate::user_message::UserMessage,
767 ) -> impl Stream<Item = UiEvent> + Send + 'static {
768 let registry = Arc::clone(&self.extension_registry);
769 let inner = self.run_turn(message, Some(from));
770 async_stream::stream! {
771 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
772 match dispatch_session_before_switch(®istry, "fork", None).await {
773 HookOutcome::Continue => {}
774 HookOutcome::Cancelled { extension_name, reason } => {
775 let msg = match reason {
776 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
777 None => format!("extension `{extension_name}` cancelled fork"),
778 };
779 yield UiEvent::Error(msg);
780 return;
781 }
782 }
783 let mut inner = Box::pin(inner);
784 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
785 yield ev;
786 }
787 }
788 }
789
790 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
794 let entries = self
795 .session
796 .load_full()
797 .entries()
798 .await
799 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
800 let branch = motosan_agent_loop::active_branch(&entries);
801 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
802 .iter()
803 .filter_map(|stored| {
804 let msg = stored.entry.as_message()?;
805 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
806 return None;
807 }
808 let preview: String = msg
809 .text()
810 .lines()
811 .next()
812 .unwrap_or("")
813 .chars()
814 .take(80)
815 .collect();
816 Some((stored.id.clone(), preview))
817 })
818 .collect();
819 out.reverse();
820 Ok(out)
821 }
822
823 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
825 self.session
826 .load_full()
827 .branches()
828 .await
829 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
830 }
831
832 async fn fire_session_before_switch(
835 &self,
836 reason: &str,
837 session_id: Option<&str>,
838 ) -> Result<()> {
839 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
840 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
841 HookOutcome::Continue => Ok(()),
842 HookOutcome::Cancelled {
843 extension_name,
844 reason,
845 } => Err(AppError::HookCancelled {
846 extension_name,
847 reason,
848 }),
849 }
850 }
851}
852
853#[derive(Debug, Default)]
854struct ToolCallTracker {
855 next_id: usize,
856 pending: VecDeque<(String, String)>,
857}
858
859impl ToolCallTracker {
860 fn start(&mut self, name: &str) -> String {
861 self.next_id += 1;
862 let id = format!("tool_{}", self.next_id);
863 self.pending.push_back((name.to_string(), id.clone()));
864 id
865 }
866
867 fn complete(&mut self, name: &str) -> String {
868 if let Some(pos) = self
869 .pending
870 .iter()
871 .position(|(pending_name, _)| pending_name == name)
872 {
873 if let Some((_, id)) = self.pending.remove(pos) {
874 return id;
875 }
876 }
877
878 self.next_id += 1;
879 format!("tool_{}", self.next_id)
880 }
881
882 fn progress_id(&self) -> Option<String> {
887 match self.pending.len() {
888 1 => self.pending.front().map(|(_, id)| id.clone()),
889 _ => None,
890 }
891 }
892}
893
894fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
895 match tracker.lock() {
896 Ok(guard) => guard,
897 Err(poisoned) => poisoned.into_inner(),
898 }
899}
900
901fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
902 lock_tool_tracker(tracker)
903 .progress_id()
904 .unwrap_or_else(|| "tool_unknown".to_string())
905}
906
907fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
908where
909 F: Fn(&str) -> Option<String>,
910{
911 env_lookup("ANTHROPIC_API_KEY")
912 .map(|key| key.trim().to_string())
913 .filter(|key| !key.is_empty())
914 .or_else(|| auth.api_key("anthropic").map(str::to_string))
915}
916
917fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
918 match ev {
919 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
920 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
921 let id = lock_tool_tracker(tool_tracker).start(&name);
922 Some(UiEvent::ToolCallStarted {
923 id,
924 name,
925 args: serde_json::json!({}),
926 })
927 }
928 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
929 let id = lock_tool_tracker(tool_tracker).complete(&name);
930 Some(UiEvent::ToolCallCompleted {
931 id,
932 result: UiToolResult {
933 is_error: result.is_error,
934 text: format!("{name}: {result:?}"),
935 },
936 })
937 }
938 _ => None,
939 }
940}
941
942type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
943
944pub struct AppBuilder {
945 config: Option<Config>,
946 cwd: Option<PathBuf>,
947 permission_gate: Option<Arc<dyn PermissionGate>>,
948 install_builtin_tools: bool,
949 max_iterations: usize,
950 llm_override: Option<Arc<dyn LlmClient>>,
951 custom_tools_factory: Option<CustomToolsFactory>,
952 permissions_policy_path: Option<PathBuf>,
953 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
954 headless_permissions: bool,
955 settings: Option<crate::settings::Settings>,
956 auth: Option<crate::auth::Auth>,
957 context_discovery_disabled: bool,
958 session_store: Option<Arc<dyn SessionStore>>,
960 resume_session_id: Option<crate::session::SessionId>,
961 autocompact_enabled: bool,
962 skills: Vec<crate::skills::Skill>,
964 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
966 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
967 extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
968 extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
969 token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
970}
971
972impl Default for AppBuilder {
973 fn default() -> Self {
974 Self {
975 config: None,
976 cwd: None,
977 permission_gate: None,
978 install_builtin_tools: false,
979 max_iterations: 20,
980 llm_override: None,
981 custom_tools_factory: None,
982 permissions_policy_path: None,
983 ui_tx: None,
984 headless_permissions: false,
985 settings: None,
986 auth: None,
987 context_discovery_disabled: false,
988 session_store: None,
989 resume_session_id: None,
990 autocompact_enabled: false,
991 skills: Vec::new(),
992 extra_tools: Vec::new(),
993 mcp_servers: Vec::new(),
994 extension_registry: None,
995 extension_diagnostics: None,
996 token_tally: None,
997 }
998 }
999}
1000
1001impl AppBuilder {
1002 pub fn new() -> Self {
1003 Self::default()
1004 }
1005
1006 pub fn with_config(mut self, cfg: Config) -> Self {
1007 self.config = Some(cfg);
1008 self
1009 }
1010
1011 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
1012 self.cwd = Some(cwd.into());
1013 self
1014 }
1015
1016 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
1017 self.permission_gate = Some(gate);
1018 self
1019 }
1020
1021 pub fn with_builtin_tools(mut self) -> Self {
1027 self.install_builtin_tools = true;
1028 self
1029 }
1030
1031 pub fn with_max_iterations(mut self, n: usize) -> Self {
1032 self.max_iterations = n;
1033 self
1034 }
1035
1036 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
1037 self.llm_override = Some(llm);
1038 self
1039 }
1040
1041 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1042 self.permissions_policy_path = Some(path);
1043 self
1044 }
1045
1046 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1047 self.ui_tx = Some(tx);
1048 self
1049 }
1050
1051 pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1053 self.token_tally = Some(tally);
1054 self
1055 }
1056
1057 pub fn with_headless_permissions(mut self) -> Self {
1062 self.headless_permissions = true;
1063 self
1064 }
1065
1066 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1068 self.settings = Some(settings);
1069 self
1070 }
1071
1072 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1074 self.auth = Some(auth);
1075 self
1076 }
1077
1078 pub fn disable_context_discovery(mut self) -> Self {
1081 self.context_discovery_disabled = true;
1082 self
1083 }
1084
1085 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1088 self.session_store = Some(store);
1089 self
1090 }
1091
1092 pub fn with_autocompact(mut self) -> Self {
1097 self.autocompact_enabled = true;
1098 self
1099 }
1100
1101 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1107 self.skills = skills;
1108 self
1109 }
1110
1111 pub fn without_skills(mut self) -> Self {
1112 self.skills.clear();
1113 self
1114 }
1115
1116 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1120 self.extra_tools = tools;
1121 self
1122 }
1123
1124 pub fn with_extension_registry(
1125 mut self,
1126 registry: Arc<crate::extensions::ExtensionRegistry>,
1127 ) -> Self {
1128 self.extension_registry = Some(registry);
1129 self
1130 }
1131
1132 pub fn with_extension_diagnostics(
1133 mut self,
1134 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1135 ) -> Self {
1136 self.extension_diagnostics = Some(diagnostics);
1137 self
1138 }
1139
1140 pub fn with_mcp_servers(
1143 mut self,
1144 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1145 ) -> Self {
1146 self.mcp_servers = servers;
1147 self
1148 }
1149
1150 pub fn with_custom_tools_factory(
1155 mut self,
1156 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1157 ) -> Self {
1158 self.custom_tools_factory = Some(Box::new(factory));
1159 self
1160 }
1161
1162 pub async fn build_with_custom_tools(
1166 self,
1167 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1168 ) -> Result<App> {
1169 self.with_custom_tools_factory(factory).build().await
1170 }
1171
1172 pub async fn build_with_session(
1180 mut self,
1181 resume: Option<crate::session::SessionId>,
1182 ) -> Result<App> {
1183 if let Some(id) = resume {
1184 if self.session_store.is_none() {
1185 return Err(AppError::Config(
1186 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1187 ));
1188 }
1189 self.resume_session_id = Some(id);
1190 }
1191 self.build_internal().await
1192 }
1193
1194 pub async fn build(self) -> Result<App> {
1196 self.build_with_session(None).await
1197 }
1198
1199 async fn build_internal(mut self) -> Result<App> {
1200 let mcp_servers = std::mem::take(&mut self.mcp_servers);
1201 let extra_tools = std::mem::take(&mut self.extra_tools);
1202 let skills = self.skills.clone();
1203 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1204 return Err(AppError::Config(
1205 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1206 ));
1207 }
1208
1209 let has_config = self.config.is_some();
1213 let has_auth = self.auth.is_some();
1214 let mut config = self.config.unwrap_or_default();
1215 let settings = match self.settings {
1216 Some(settings) => settings,
1217 None => {
1218 let mut settings = crate::settings::Settings::default();
1219 settings.model.provider = config.model.provider.clone();
1220 settings.model.name = config.model.name.clone();
1221 settings.model.max_tokens = config.model.max_tokens;
1222 settings
1223 }
1224 };
1225 config.model.provider = settings.model.provider.clone();
1226 config.model.name = settings.model.name.clone();
1227 config.model.max_tokens = settings.model.max_tokens;
1228 let mut auth = self.auth.unwrap_or_default();
1229 if !has_auth {
1230 if let Some(key) = config.anthropic.api_key.as_deref() {
1231 auth.0.insert(
1232 "anthropic".into(),
1233 crate::auth::ProviderAuth::ApiKey {
1234 key: key.to_string(),
1235 },
1236 );
1237 }
1238 }
1239 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1240 if env_or_auth_key.is_some() || has_auth || !has_config {
1241 config.anthropic.api_key = env_or_auth_key;
1242 }
1243 let cwd = self
1244 .cwd
1245 .or_else(|| std::env::current_dir().ok())
1246 .unwrap_or_else(|| PathBuf::from("."));
1247 let agent_dir = crate::paths::agent_dir();
1248 let permission_gate = self.permission_gate.unwrap_or_else(|| {
1249 if self.ui_tx.is_some() || self.headless_permissions {
1253 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1254 } else {
1255 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1256 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1257 }
1258 });
1259
1260 let policy: Arc<crate::permissions::Policy> =
1262 Arc::new(match self.permissions_policy_path.as_ref() {
1263 Some(path) => crate::permissions::Policy::load_or_default(path)?,
1264 None => crate::permissions::Policy::default(),
1265 });
1266 let session_cache = Arc::new(crate::permissions::SessionCache::new());
1267 let token_tally = self
1268 .token_tally
1269 .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1270
1271 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1273
1274 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1279 let cancel_token = probe_ctx.cancel_token.clone();
1280 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1281 if let Some(factory_fn) = self.custom_tools_factory.take() {
1282 let mut t = factory_fn(probe_ctx);
1283 t.extend(extra_tools.clone());
1284 (false, t)
1285 } else {
1286 (self.install_builtin_tools, extra_tools.clone())
1287 };
1288
1289 let factory = SessionFactory {
1290 cwd: cwd.clone(),
1291 settings: Arc::new(Mutex::new(settings.clone())),
1292 auth: auth.clone(),
1293 policy: Arc::clone(&policy),
1294 session_cache: Arc::clone(&session_cache),
1295 ui_tx: self.ui_tx.clone(),
1296 headless_permissions: self.headless_permissions,
1297 permission_gate: Arc::clone(&permission_gate),
1298 progress_tx: progress_tx.clone(),
1299 skills: Arc::new(skills.clone()),
1300 install_builtin_tools: install_builtin,
1301 extra_tools: factory_extra_tools,
1302 max_iterations: self.max_iterations,
1303 context_discovery_disabled: self.context_discovery_disabled,
1304 autocompact_enabled: self.autocompact_enabled,
1305 session_store: self.session_store.clone(),
1306 llm_override: self.llm_override.clone(),
1307 current_model: Arc::new(Mutex::new(None)),
1308 cancel_token: cancel_token.clone(),
1309 };
1310
1311 let mode = match self.resume_session_id.take() {
1312 Some(id) => SessionMode::Resume(id.into_string()),
1313 None => SessionMode::New,
1314 };
1315 let (session, llm) = factory.build(mode, None, None).await?;
1316 let (extension_registry, extension_diagnostics) =
1317 if let Some(reg) = self.extension_registry.take() {
1318 let diagnostics = self
1319 .extension_diagnostics
1320 .unwrap_or_else(|| Arc::new(Vec::new()));
1321 (reg, diagnostics)
1322 } else {
1323 let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1324 let (registry, diagnostics) =
1325 crate::extensions::load_extensions_manifest(&manifest_path).await;
1326 for d in &diagnostics {
1327 let message = d.message.as_str();
1328 match d.severity {
1329 crate::extensions::DiagnosticSeverity::Warn => {
1330 tracing::warn!("extensions: {message}");
1331 }
1332 crate::extensions::DiagnosticSeverity::Error => {
1333 tracing::error!("extensions: {message}");
1334 }
1335 }
1336 }
1337 (Arc::new(registry), Arc::new(diagnostics))
1338 };
1339
1340 Ok(App {
1341 session: arc_swap::ArcSwap::from_pointee(session),
1342 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1343 factory,
1344 config,
1345 cancel_token,
1346 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1347 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1348 skills: Arc::new(skills),
1349 mcp_servers,
1350 extension_registry,
1351 token_tally,
1352 extension_diagnostics,
1353 session_cache,
1354 })
1355 }
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360 use super::*;
1361 use crate::config::{AnthropicConfig, ModelConfig};
1362 use crate::events::UiEvent;
1363 use crate::user_message::UserMessage;
1364 use async_trait::async_trait;
1365 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1366 use motosan_agent_tool::ToolDef;
1367 use std::sync::atomic::{AtomicUsize, Ordering};
1368
1369 #[test]
1370 fn turn_stats_accum_accumulates_across_calls() {
1371 let mut accum = TurnStatsAccum::default();
1372 accum.add(motosan_agent_loop::TokenUsage {
1373 input_tokens: 100,
1374 output_tokens: 25,
1375 });
1376 accum.add(motosan_agent_loop::TokenUsage {
1377 input_tokens: 1000,
1378 output_tokens: 250,
1379 });
1380 assert_eq!(accum.cumulative_input, 1100);
1381 assert_eq!(accum.cumulative_output, 275);
1382 assert_eq!(accum.turn_count, 2);
1383 }
1384
1385 #[test]
1386 fn turn_stats_accum_saturates_on_overflow() {
1387 let mut accum = TurnStatsAccum {
1388 cumulative_input: u64::MAX - 5,
1389 cumulative_output: 0,
1390 turn_count: 1,
1391 };
1392 accum.add(motosan_agent_loop::TokenUsage {
1393 input_tokens: 100,
1394 output_tokens: 0,
1395 });
1396 assert_eq!(accum.cumulative_input, u64::MAX);
1397 }
1398
1399 #[test]
1400 fn extract_thinking_events_extracts_reasoning_in_source_order() {
1401 use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};
1402 let messages = vec![Message::Assistant {
1403 id: new_message_id(),
1404 meta: MessageMeta::default(),
1405 content: vec![
1406 AssistantContent::Reasoning {
1407 text: "first thought".into(),
1408 signature: None,
1409 },
1410 AssistantContent::Text {
1411 text: "answer 1".into(),
1412 },
1413 AssistantContent::Reasoning {
1414 text: "second thought".into(),
1415 signature: None,
1416 },
1417 ],
1418 }];
1419 let events = extract_thinking_events(&messages);
1420 assert_eq!(events.len(), 2);
1421 match (&events[0], &events[1]) {
1422 (UiEvent::ThinkingComplete { text: t1 }, UiEvent::ThinkingComplete { text: t2 }) => {
1423 assert_eq!(t1, "first thought");
1424 assert_eq!(t2, "second thought");
1425 }
1426 _ => panic!("expected two ThinkingComplete events"),
1427 }
1428 }
1429
1430 #[test]
1431 fn extract_new_thinking_events_skips_historical_reasoning_before_previous_len() {
1432 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1433 let messages = vec![
1434 Message::User {
1435 id: new_message_id(),
1436 meta: MessageMeta::default(),
1437 content: vec![ContentPart::text("old question")],
1438 },
1439 Message::Assistant {
1440 id: new_message_id(),
1441 meta: MessageMeta::default(),
1442 content: vec![AssistantContent::Reasoning {
1443 text: "old thought".into(),
1444 signature: None,
1445 }],
1446 },
1447 Message::User {
1448 id: new_message_id(),
1449 meta: MessageMeta::default(),
1450 content: vec![ContentPart::text("new question")],
1451 },
1452 Message::Assistant {
1453 id: new_message_id(),
1454 meta: MessageMeta::default(),
1455 content: vec![AssistantContent::Reasoning {
1456 text: "new thought".into(),
1457 signature: None,
1458 }],
1459 },
1460 ];
1461
1462 let events = extract_new_thinking_events(&messages, 2);
1463 assert_eq!(events.len(), 1, "should only emit current-turn reasoning");
1464 match &events[0] {
1465 UiEvent::ThinkingComplete { text } => assert_eq!(text, "new thought"),
1466 other => panic!("expected ThinkingComplete; got {other:?}"),
1467 }
1468 }
1469
1470 #[test]
1471 fn extract_thinking_events_skips_non_assistant_and_non_reasoning() {
1472 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1473 let messages = vec![
1474 Message::User {
1475 id: new_message_id(),
1476 meta: MessageMeta::default(),
1477 content: vec![ContentPart::text("question?")],
1478 },
1479 Message::Assistant {
1480 id: new_message_id(),
1481 meta: MessageMeta::default(),
1482 content: vec![AssistantContent::Text {
1483 text: "answer".into(),
1484 }],
1485 },
1486 ];
1487 let events = extract_thinking_events(&messages);
1488 assert!(events.is_empty(), "expected no events; got {events:?}");
1489 }
1490
1491 #[tokio::test]
1492 async fn builder_fails_without_api_key() {
1493 let cfg = Config {
1494 anthropic: AnthropicConfig {
1495 api_key: None,
1496 base_url: "https://api.anthropic.com".into(),
1497 },
1498 model: ModelConfig {
1499 provider: "anthropic".into(),
1500 name: "claude-sonnet-4-6".into(),
1501 max_tokens: 4096,
1502 },
1503 };
1504 let err = match AppBuilder::new()
1505 .with_config(cfg)
1506 .with_builtin_tools()
1507 .build()
1508 .await
1509 {
1510 Ok(_) => panic!("must fail without key"),
1511 Err(err) => err,
1512 };
1513 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1514 }
1515
1516 struct ToolOnlyLlm {
1517 turn: AtomicUsize,
1518 }
1519
1520 #[async_trait]
1521 impl LlmClient for ToolOnlyLlm {
1522 async fn chat(
1523 &self,
1524 _messages: &[Message],
1525 _tools: &[ToolDef],
1526 ) -> motosan_agent_loop::Result<ChatOutput> {
1527 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1528 if turn == 0 {
1529 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1530 ToolCallItem {
1531 id: "t1".into(),
1532 name: "read".into(),
1533 args: serde_json::json!({"path":"nope.txt"}),
1534 },
1535 ])))
1536 } else {
1537 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1538 }
1539 }
1540 }
1541
1542 #[tokio::test]
1543 async fn empty_final_message_is_not_emitted() {
1544 let dir = tempfile::tempdir().unwrap();
1545 let mut cfg = Config::default();
1546 cfg.anthropic.api_key = Some("sk-unused".into());
1547 let app = AppBuilder::new()
1548 .with_config(cfg)
1549 .with_cwd(dir.path())
1550 .with_builtin_tools()
1551 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1552 turn: AtomicUsize::new(0),
1553 }))
1554 .build()
1555 .await
1556 .expect("build");
1557 let events: Vec<UiEvent> =
1558 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1559 let empties = events
1560 .iter()
1561 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1562 .count();
1563 assert_eq!(
1564 empties, 0,
1565 "should not emit empty final message, got: {events:?}"
1566 );
1567 }
1568
1569 struct EchoLlm;
1570
1571 struct UsageLlm {
1572 responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1573 }
1574
1575 #[async_trait]
1576 impl LlmClient for UsageLlm {
1577 async fn chat(
1578 &self,
1579 _messages: &[Message],
1580 _tools: &[ToolDef],
1581 ) -> motosan_agent_loop::Result<ChatOutput> {
1582 let next = match self.responses.lock() {
1583 Ok(mut responses) => responses.pop_front(),
1584 Err(poisoned) => poisoned.into_inner().pop_front(),
1585 };
1586 Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1587 }
1588 }
1589
1590 #[async_trait]
1591 impl LlmClient for EchoLlm {
1592 async fn chat(
1593 &self,
1594 _messages: &[Message],
1595 _tools: &[ToolDef],
1596 ) -> motosan_agent_loop::Result<ChatOutput> {
1597 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1598 }
1599 }
1600
1601 #[tokio::test]
1602 async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1603 use motosan_agent_loop::{LlmResponse, TokenUsage};
1604
1605 let dir = tempfile::tempdir().expect("tempdir");
1606 let mut cfg = Config::default();
1607 cfg.anthropic.api_key = Some("sk-unused".into());
1608 let llm = Arc::new(UsageLlm {
1609 responses: std::sync::Mutex::new(VecDeque::from([
1610 ChatOutput::with_usage(
1611 LlmResponse::Message("first".into()),
1612 TokenUsage {
1613 input_tokens: 100,
1614 output_tokens: 50,
1615 },
1616 ),
1617 ChatOutput::with_usage(
1618 LlmResponse::Message("second".into()),
1619 TokenUsage {
1620 input_tokens: 200,
1621 output_tokens: 80,
1622 },
1623 ),
1624 ])),
1625 });
1626 let app = AppBuilder::new()
1627 .with_config(cfg)
1628 .with_cwd(dir.path())
1629 .with_builtin_tools()
1630 .with_llm(llm)
1631 .build()
1632 .await
1633 .expect("build");
1634
1635 let events_1: Vec<UiEvent> = app
1636 .send_user_message(UserMessage::text("hi"))
1637 .collect()
1638 .await;
1639 let events_2: Vec<UiEvent> = app
1640 .send_user_message(UserMessage::text("again"))
1641 .collect()
1642 .await;
1643
1644 let ts1 = events_1
1645 .iter()
1646 .find_map(|e| match e {
1647 UiEvent::TurnStats {
1648 input_tokens,
1649 output_tokens,
1650 cumulative_input,
1651 cumulative_output,
1652 ..
1653 } => Some((
1654 *input_tokens,
1655 *output_tokens,
1656 *cumulative_input,
1657 *cumulative_output,
1658 )),
1659 _ => None,
1660 })
1661 .expect("turn 1 had no TurnStats");
1662 assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1663
1664 let ts2 = events_2
1665 .iter()
1666 .find_map(|e| match e {
1667 UiEvent::TurnStats {
1668 input_tokens,
1669 output_tokens,
1670 cumulative_input,
1671 cumulative_output,
1672 ..
1673 } => Some((
1674 *input_tokens,
1675 *output_tokens,
1676 *cumulative_input,
1677 *cumulative_output,
1678 )),
1679 _ => None,
1680 })
1681 .expect("turn 2 had no TurnStats");
1682 assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1683
1684 let positions: Vec<&str> = events_1
1685 .iter()
1686 .filter_map(|e| match e {
1687 UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1688 UiEvent::TurnStats { .. } => Some("stats"),
1689 UiEvent::AgentTurnComplete => Some("turn_complete"),
1690 _ => None,
1691 })
1692 .collect();
1693 assert_eq!(
1694 positions,
1695 vec!["msg_complete", "stats", "turn_complete"],
1696 "wrong ordering"
1697 );
1698 }
1699
1700 #[tokio::test]
1701 async fn turn_stats_reset_after_new_session() {
1702 use motosan_agent_loop::{LlmResponse, TokenUsage};
1703
1704 let dir = tempfile::tempdir().expect("tempdir");
1705 let mut cfg = Config::default();
1706 cfg.anthropic.api_key = Some("sk-unused".into());
1707 let llm = Arc::new(UsageLlm {
1708 responses: std::sync::Mutex::new(VecDeque::from([
1709 ChatOutput::with_usage(
1710 LlmResponse::Message("first".into()),
1711 TokenUsage {
1712 input_tokens: 100,
1713 output_tokens: 50,
1714 },
1715 ),
1716 ChatOutput::with_usage(
1717 LlmResponse::Message("after-new".into()),
1718 TokenUsage {
1719 input_tokens: 7,
1720 output_tokens: 3,
1721 },
1722 ),
1723 ])),
1724 });
1725 let app = AppBuilder::new()
1726 .with_config(cfg)
1727 .with_cwd(dir.path())
1728 .with_builtin_tools()
1729 .with_llm(llm)
1730 .build()
1731 .await
1732 .expect("build");
1733
1734 let _: Vec<UiEvent> = app
1735 .send_user_message(UserMessage::text("hi"))
1736 .collect()
1737 .await;
1738 app.new_session().await.expect("new session");
1739 let events: Vec<UiEvent> = app
1740 .send_user_message(UserMessage::text("after new"))
1741 .collect()
1742 .await;
1743 let stats = events
1744 .iter()
1745 .find_map(|event| match event {
1746 UiEvent::TurnStats {
1747 input_tokens,
1748 output_tokens,
1749 cumulative_input,
1750 cumulative_output,
1751 ..
1752 } => Some((
1753 *input_tokens,
1754 *output_tokens,
1755 *cumulative_input,
1756 *cumulative_output,
1757 )),
1758 _ => None,
1759 })
1760 .expect("turn had no TurnStats");
1761 assert_eq!(stats, (7, 3, 7, 3));
1762 }
1763
1764 #[tokio::test]
1765 async fn with_headless_permissions_builds_an_app() {
1766 let dir = tempfile::tempdir().expect("tempdir");
1767 let mut config = Config::default();
1768 config.anthropic.api_key = Some("sk-unused".into());
1769 let app = AppBuilder::new()
1770 .with_config(config)
1771 .with_cwd(dir.path())
1772 .with_builtin_tools()
1773 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1774 .with_headless_permissions()
1775 .build()
1776 .await
1777 .expect("build");
1778 assert!(!app.session_id().is_empty());
1780 }
1781
1782 #[tokio::test]
1783 async fn new_session_swaps_in_a_fresh_empty_session() {
1784 use futures::StreamExt;
1785 let dir = tempfile::tempdir().expect("tempdir");
1786 let mut config = Config::default();
1787 config.anthropic.api_key = Some("sk-unused".into());
1788 let app = AppBuilder::new()
1789 .with_config(config)
1790 .with_cwd(dir.path())
1791 .with_builtin_tools()
1792 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1793 .build()
1794 .await
1795 .expect("build");
1796
1797 let _: Vec<_> = app
1798 .send_user_message(UserMessage::text("hello"))
1799 .collect()
1800 .await;
1801 let id_before = app.session_id();
1802 assert!(!app.session_history().await.expect("history").is_empty());
1803
1804 app.new_session().await.expect("new_session");
1805
1806 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1807 assert!(
1808 app.session_history().await.expect("history").is_empty(),
1809 "fresh session has no history"
1810 );
1811 }
1812
1813 #[tokio::test]
1814 async fn load_session_restores_a_stored_session_by_id() {
1815 use futures::StreamExt;
1816 let dir = tempfile::tempdir().expect("tempdir");
1817 let store_dir = dir.path().join("sessions");
1818 let store: Arc<dyn SessionStore> =
1819 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1820 let mut config = Config::default();
1821 config.anthropic.api_key = Some("sk-unused".into());
1822 let app = AppBuilder::new()
1823 .with_config(config)
1824 .with_cwd(dir.path())
1825 .with_builtin_tools()
1826 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1827 .with_session_store(Arc::clone(&store))
1828 .build()
1829 .await
1830 .expect("build");
1831
1832 let _: Vec<_> = app
1833 .send_user_message(UserMessage::text("remember this"))
1834 .collect()
1835 .await;
1836 let original_id = app.session_id();
1837
1838 app.new_session().await.expect("new_session");
1839 assert_ne!(app.session_id(), original_id);
1840
1841 app.load_session(&original_id).await.expect("load_session");
1842 assert_eq!(app.session_id(), original_id);
1843 let history = app.session_history().await.expect("history");
1844 assert!(
1845 history.iter().any(|m| m.text().contains("remember this")),
1846 "loaded session should carry the original turn"
1847 );
1848 }
1849
1850 #[tokio::test]
1851 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1852 use futures::StreamExt;
1853 let dir = tempfile::tempdir().expect("tempdir");
1854 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1855 dir.path().join("s"),
1856 ));
1857 let mut config = Config::default();
1858 config.anthropic.api_key = Some("sk-unused".into());
1859 let app = AppBuilder::new()
1860 .with_config(config)
1861 .with_cwd(dir.path())
1862 .with_builtin_tools()
1863 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1864 .with_session_store(store)
1865 .build()
1866 .await
1867 .expect("build");
1868
1869 let _: Vec<_> = app
1870 .send_user_message(UserMessage::text("hello"))
1871 .collect()
1872 .await;
1873 let original_id = app.session_id();
1874
1875 let new_id = app.clone_session().await.expect("clone_session");
1876
1877 assert_ne!(new_id, original_id);
1879 assert_eq!(app.session_id(), new_id);
1880 let history = app.session_history().await.expect("history");
1882 assert!(history.iter().any(|m| m.text().contains("hello")));
1883 }
1884
1885 #[tokio::test]
1886 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1887 use futures::StreamExt;
1888 let dir = tempfile::tempdir().expect("tempdir");
1889 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1890 dir.path().join("s"),
1891 ));
1892 let mut config = Config::default();
1893 config.anthropic.api_key = Some("sk-unused".into());
1894 let app = AppBuilder::new()
1895 .with_config(config)
1896 .with_cwd(dir.path())
1897 .with_builtin_tools()
1898 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1899 .with_session_store(store)
1900 .build()
1901 .await
1902 .expect("build");
1903
1904 let _: Vec<_> = app
1906 .send_user_message(UserMessage::text("first"))
1907 .collect()
1908 .await;
1909 let _: Vec<_> = app
1910 .send_user_message(UserMessage::text("second"))
1911 .collect()
1912 .await;
1913
1914 let entries = app.session.load_full().entries().await.expect("entries");
1916 let first_id = entries
1917 .iter()
1918 .find_map(|stored| {
1919 let msg = stored.entry.as_message()?;
1920 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1921 .then(|| stored.id.clone())
1922 })
1923 .expect("first user message present");
1924
1925 let _: Vec<_> = app
1927 .fork_from(first_id, UserMessage::text("branched"))
1928 .collect()
1929 .await;
1930
1931 let history = app.session_history().await.expect("history");
1932 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1933 assert!(
1934 texts.iter().any(|t| t.contains("first")),
1935 "fork keeps the fork-point ancestor"
1936 );
1937 assert!(
1938 texts.iter().any(|t| t.contains("branched")),
1939 "fork includes the new message"
1940 );
1941 assert!(
1942 !texts.iter().any(|t| t.contains("second")),
1943 "fork excludes the abandoned branch"
1944 );
1945 }
1946
1947 #[tokio::test]
1948 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1949 use futures::StreamExt;
1950 let dir = tempfile::tempdir().expect("tempdir");
1951 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1952 dir.path().join("s"),
1953 ));
1954 let mut config = Config::default();
1955 config.anthropic.api_key = Some("sk-unused".into());
1956 let app = AppBuilder::new()
1957 .with_config(config)
1958 .with_cwd(dir.path())
1959 .with_builtin_tools()
1960 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1961 .with_session_store(store)
1962 .build()
1963 .await
1964 .expect("build");
1965
1966 let _: Vec<_> = app
1967 .send_user_message(UserMessage::text("alpha"))
1968 .collect()
1969 .await;
1970 let _: Vec<_> = app
1971 .send_user_message(UserMessage::text("bravo"))
1972 .collect()
1973 .await;
1974
1975 let candidates = app.fork_candidates().await.expect("candidates");
1976 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1977 assert!(previews[0].contains("bravo"), "got {previews:?}");
1979 assert!(previews.iter().any(|p| p.contains("alpha")));
1980 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1982 }
1983
1984 #[tokio::test]
1985 async fn branches_returns_a_tree_for_a_linear_session() {
1986 use futures::StreamExt;
1987 let dir = tempfile::tempdir().expect("tempdir");
1988 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1989 dir.path().join("s"),
1990 ));
1991 let mut config = Config::default();
1992 config.anthropic.api_key = Some("sk-unused".into());
1993 let app = AppBuilder::new()
1994 .with_config(config)
1995 .with_cwd(dir.path())
1996 .with_builtin_tools()
1997 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1998 .with_session_store(store)
1999 .build()
2000 .await
2001 .expect("build");
2002
2003 let _: Vec<_> = app
2004 .send_user_message(UserMessage::text("hello"))
2005 .collect()
2006 .await;
2007 let tree = app.branches().await.expect("branches");
2008 assert!(!tree.nodes.is_empty());
2010 assert!(tree.active_leaf.is_some());
2011 }
2012
2013 #[tokio::test]
2014 async fn reload_settings_rebuilds_session_and_resets_token_tally() {
2015 let dir = tempfile::tempdir().expect("tempdir");
2016 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2017 dir.path().join("s"),
2018 ));
2019 let mut cfg = Config::default();
2020 cfg.anthropic.api_key = Some("sk-unused".into());
2021
2022 let llm = Arc::new(UsageLlm {
2023 responses: std::sync::Mutex::new(VecDeque::from([
2024 ChatOutput::with_usage(
2025 LlmResponse::Message("first".into()),
2026 motosan_agent_loop::TokenUsage {
2027 input_tokens: 100,
2028 output_tokens: 50,
2029 },
2030 ),
2031 ChatOutput::with_usage(
2032 LlmResponse::Message("after-reload".into()),
2033 motosan_agent_loop::TokenUsage {
2034 input_tokens: 5,
2035 output_tokens: 2,
2036 },
2037 ),
2038 ])),
2039 });
2040 let app = AppBuilder::new()
2041 .with_config(cfg)
2042 .with_cwd(dir.path())
2043 .with_builtin_tools()
2044 .with_llm(llm)
2045 .with_session_store(store)
2046 .build()
2047 .await
2048 .expect("build");
2049
2050 let _: Vec<UiEvent> = app
2051 .send_user_message(crate::user_message::UserMessage::text("hi"))
2052 .collect()
2053 .await;
2054 {
2055 let token_tally = app.token_tally();
2056 let tally = token_tally.lock().await;
2057 assert_eq!(tally.cumulative_input, 100);
2058 assert_eq!(tally.cumulative_output, 50);
2059 assert_eq!(tally.turn_count, 1);
2060 }
2061
2062 let mut new_settings = crate::settings::Settings::default();
2063 new_settings.model.name = "claude-opus-4-7".into();
2064 new_settings.ui.footer_show_cost = false;
2065 app.reload_settings(new_settings).await.expect("reload");
2066 assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
2067 assert!(!app.factory.settings().ui.footer_show_cost);
2068 assert_eq!(app.factory.current_model(), None);
2069
2070 {
2071 let token_tally = app.token_tally();
2072 let tally = token_tally.lock().await;
2073 assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
2074 assert_eq!(tally.cumulative_output, 0);
2075 assert_eq!(tally.turn_count, 0);
2076 }
2077
2078 let _: Vec<UiEvent> = app
2079 .send_user_message(crate::user_message::UserMessage::text("after"))
2080 .collect()
2081 .await;
2082 {
2083 let token_tally = app.token_tally();
2084 let tally = token_tally.lock().await;
2085 assert_eq!(tally.cumulative_input, 5);
2086 assert_eq!(tally.cumulative_output, 2);
2087 assert_eq!(tally.turn_count, 1);
2088 }
2089 }
2090
2091 #[tokio::test]
2092 async fn switch_model_preserves_history() {
2093 use futures::StreamExt;
2094 let dir = tempfile::tempdir().expect("tempdir");
2095 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2096 dir.path().join("s"),
2097 ));
2098 let mut config = Config::default();
2099 config.anthropic.api_key = Some("sk-unused".into());
2100 let app = AppBuilder::new()
2101 .with_config(config)
2102 .with_cwd(dir.path())
2103 .with_builtin_tools()
2104 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2105 .with_session_store(store)
2106 .build()
2107 .await
2108 .expect("build");
2109
2110 let _: Vec<_> = app
2111 .send_user_message(UserMessage::text("keep me"))
2112 .collect()
2113 .await;
2114 let id_before = app.session_id();
2115
2116 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
2117 .await
2118 .expect("switch_model");
2119
2120 assert_eq!(
2121 app.session_id(),
2122 id_before,
2123 "switch_model keeps the same session"
2124 );
2125 let history = app.session_history().await.expect("history");
2126 assert!(history.iter().any(|m| m.text().contains("keep me")));
2127 }
2128
2129 #[tokio::test]
2130 async fn switch_model_is_sticky_for_future_session_rebuilds() {
2131 let dir = tempfile::tempdir().expect("tempdir");
2132 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2133 dir.path().join("s"),
2134 ));
2135 let mut config = Config::default();
2136 config.anthropic.api_key = Some("sk-unused".into());
2137 let app = AppBuilder::new()
2138 .with_config(config)
2139 .with_cwd(dir.path())
2140 .with_builtin_tools()
2141 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2142 .with_session_store(store)
2143 .build()
2144 .await
2145 .expect("build");
2146
2147 let selected = crate::model::ModelId::from("claude-opus-4-7");
2148 app.switch_model(&selected).await.expect("switch_model");
2149 app.new_session().await.expect("new_session");
2150
2151 assert_eq!(app.factory.current_model(), Some(selected.clone()));
2152 assert_eq!(app.settings().model.name, selected.to_string());
2153 }
2154
2155 struct SleepThenDoneLlm {
2156 turn: AtomicUsize,
2157 }
2158
2159 #[async_trait]
2160 impl LlmClient for SleepThenDoneLlm {
2161 async fn chat(
2162 &self,
2163 _messages: &[Message],
2164 _tools: &[ToolDef],
2165 ) -> motosan_agent_loop::Result<ChatOutput> {
2166 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2167 if turn == 0 {
2168 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2169 ToolCallItem {
2170 id: "sleep".into(),
2171 name: "bash".into(),
2172 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2173 },
2174 ])))
2175 } else {
2176 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2177 }
2178 }
2179 }
2180
2181 #[tokio::test]
2182 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2183 use futures::StreamExt;
2184 let dir = tempfile::tempdir().expect("tempdir");
2185 let mut config = Config::default();
2186 config.anthropic.api_key = Some("sk-unused".into());
2187 let app = Arc::new(
2188 AppBuilder::new()
2189 .with_config(config)
2190 .with_cwd(dir.path())
2191 .with_builtin_tools()
2192 .with_llm(Arc::new(SleepThenDoneLlm {
2193 turn: AtomicUsize::new(0),
2194 }) as Arc<dyn LlmClient>)
2195 .build()
2196 .await
2197 .expect("build"),
2198 );
2199
2200 app.new_session().await.expect("new_session");
2201 let running_app = Arc::clone(&app);
2202 let handle = tokio::spawn(async move {
2203 running_app
2204 .send_user_message(UserMessage::text("run a slow command"))
2205 .collect::<Vec<_>>()
2206 .await
2207 });
2208 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2209 app.cancel();
2210
2211 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2212 .await
2213 .expect("turn should finish after cancellation")
2214 .expect("join");
2215 assert!(
2216 events.iter().any(|event| {
2217 matches!(
2218 event,
2219 UiEvent::ToolCallCompleted { result, .. }
2220 if result.text.contains("command cancelled by user")
2221 )
2222 }),
2223 "cancel should reach the rebuilt bash tool: {events:?}"
2224 );
2225 }
2226
2227 #[tokio::test]
2228 async fn compact_summarizes_a_session_with_enough_history() {
2229 struct DoneLlm;
2230 #[async_trait]
2231 impl LlmClient for DoneLlm {
2232 async fn chat(
2233 &self,
2234 _messages: &[Message],
2235 _tools: &[ToolDef],
2236 ) -> motosan_agent_loop::Result<ChatOutput> {
2237 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2238 }
2239 }
2240
2241 let dir = tempfile::tempdir().expect("tempdir");
2242 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2243 dir.path().join("sessions"),
2244 ));
2245 let mut config = Config::default();
2246 config.anthropic.api_key = Some("sk-unused".into());
2247 let app = AppBuilder::new()
2248 .with_config(config)
2249 .with_cwd(dir.path())
2250 .with_builtin_tools()
2251 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2252 .with_session_store(store)
2253 .build()
2254 .await
2255 .expect("build");
2256
2257 for i in 0..4 {
2264 let _: Vec<_> = app
2265 .send_user_message(UserMessage::text(format!("turn {i}")))
2266 .collect()
2267 .await;
2268 }
2269
2270 app.compact().await.expect("compact should succeed");
2271
2272 let history = app.session_history().await.expect("history");
2275 assert!(
2276 !history.is_empty(),
2277 "session should still have content post-compaction"
2278 );
2279 }
2280
2281 #[test]
2282 fn anthropic_env_api_key_overrides_auth_json_key() {
2283 let mut auth = crate::auth::Auth::default();
2284 auth.0.insert(
2285 "anthropic".into(),
2286 crate::auth::ProviderAuth::ApiKey {
2287 key: "sk-auth".into(),
2288 },
2289 );
2290
2291 let key = anthropic_api_key_from(&auth, |name| {
2292 (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
2293 });
2294 assert_eq!(key.as_deref(), Some("sk-env"));
2295 }
2296
2297 #[tokio::test]
2298 async fn with_settings_overrides_deprecated_config_model() {
2299 use crate::settings::Settings;
2300
2301 let mut config = Config::default();
2302 config.model.name = "from-config".into();
2303 config.anthropic.api_key = Some("sk-config".into());
2304
2305 let mut settings = Settings::default();
2306 settings.model.name = "from-settings".into();
2307
2308 let tmp = tempfile::tempdir().unwrap();
2309 let app = AppBuilder::new()
2310 .with_config(config)
2311 .with_settings(settings)
2312 .with_cwd(tmp.path())
2313 .disable_context_discovery()
2314 .with_llm(Arc::new(EchoLlm))
2315 .build()
2316 .await
2317 .expect("build");
2318 assert_eq!(app.config().model.name, "from-settings");
2319 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2320 }
2321
2322 #[tokio::test]
2323 async fn with_settings_synthesises_legacy_config_for_build() {
2324 use crate::auth::{Auth, ProviderAuth};
2325 use crate::settings::Settings;
2326
2327 let mut settings = Settings::default();
2328 settings.model.name = "claude-sonnet-4-6".into();
2329
2330 let mut auth = Auth::default();
2331 auth.0.insert(
2332 "anthropic".into(),
2333 ProviderAuth::ApiKey {
2334 key: "sk-test".into(),
2335 },
2336 );
2337
2338 let tmp = tempfile::tempdir().unwrap();
2339 let app = AppBuilder::new()
2340 .with_settings(settings)
2341 .with_auth(auth)
2342 .with_cwd(tmp.path())
2343 .with_builtin_tools()
2344 .disable_context_discovery()
2345 .with_llm(Arc::new(EchoLlm))
2346 .build()
2347 .await
2348 .expect("build");
2349 let _ = app;
2350 }
2351
2352 #[tokio::test]
2353 async fn cancel_before_turn_does_not_poison_future_turns() {
2354 let dir = tempfile::tempdir().unwrap();
2355 let mut cfg = Config::default();
2356 cfg.anthropic.api_key = Some("sk-unused".into());
2357 let app = AppBuilder::new()
2358 .with_config(cfg)
2359 .with_cwd(dir.path())
2360 .with_builtin_tools()
2361 .with_llm(std::sync::Arc::new(EchoLlm))
2362 .build()
2363 .await
2364 .expect("build");
2365
2366 app.cancel();
2367 let events: Vec<UiEvent> = app
2368 .send_user_message(UserMessage::text("x"))
2369 .collect()
2370 .await;
2371
2372 assert!(
2373 events
2374 .iter()
2375 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2376 "turn should use a fresh cancellation token: {events:?}"
2377 );
2378 }
2379
2380 #[test]
2381 fn map_event_matches_started_and_completed_ids_by_tool_name() {
2382 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2383
2384 let started_bash = map_event(
2385 AgentEvent::Core(CoreEvent::ToolStarted {
2386 name: "bash".into(),
2387 }),
2388 &tracker,
2389 );
2390 let started_read = map_event(
2391 AgentEvent::Core(CoreEvent::ToolStarted {
2392 name: "read".into(),
2393 }),
2394 &tracker,
2395 );
2396 let completed_bash = map_event(
2397 AgentEvent::Core(CoreEvent::ToolCompleted {
2398 name: "bash".into(),
2399 result: motosan_agent_tool::ToolResult::text("ok"),
2400 }),
2401 &tracker,
2402 );
2403 let completed_read = map_event(
2404 AgentEvent::Core(CoreEvent::ToolCompleted {
2405 name: "read".into(),
2406 result: motosan_agent_tool::ToolResult::text("ok"),
2407 }),
2408 &tracker,
2409 );
2410
2411 assert!(matches!(
2412 started_bash,
2413 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
2414 ));
2415 assert!(matches!(
2416 started_read,
2417 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
2418 ));
2419 assert!(matches!(
2420 completed_bash,
2421 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
2422 ));
2423 assert!(matches!(
2424 completed_read,
2425 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
2426 ));
2427 }
2428
2429 #[test]
2430 fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
2431 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2432 let s1 = map_event(
2433 AgentEvent::Core(CoreEvent::ToolStarted {
2434 name: "bash".into(),
2435 }),
2436 &tracker,
2437 );
2438 let s2 = map_event(
2439 AgentEvent::Core(CoreEvent::ToolStarted {
2440 name: "bash".into(),
2441 }),
2442 &tracker,
2443 );
2444 let c1 = map_event(
2445 AgentEvent::Core(CoreEvent::ToolCompleted {
2446 name: "bash".into(),
2447 result: motosan_agent_tool::ToolResult::text("a"),
2448 }),
2449 &tracker,
2450 );
2451 let c2 = map_event(
2452 AgentEvent::Core(CoreEvent::ToolCompleted {
2453 name: "bash".into(),
2454 result: motosan_agent_tool::ToolResult::text("b"),
2455 }),
2456 &tracker,
2457 );
2458
2459 let id_s1 = match s1 {
2460 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2461 other => panic!("{other:?}"),
2462 };
2463 let id_s2 = match s2 {
2464 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2465 other => panic!("{other:?}"),
2466 };
2467 let id_c1 = match c1 {
2468 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2469 other => panic!("{other:?}"),
2470 };
2471 let id_c2 = match c2 {
2472 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2473 other => panic!("{other:?}"),
2474 };
2475
2476 assert_eq!(id_s1, id_c1);
2477 assert_eq!(id_s2, id_c2);
2478 assert_ne!(id_s1, id_s2);
2479 }
2480
2481 #[tokio::test]
2482 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2483 let dir = tempfile::tempdir().unwrap();
2484 let mut cfg = Config::default();
2485 cfg.anthropic.api_key = Some("sk-unused".into());
2486 let app = AppBuilder::new()
2487 .with_config(cfg)
2488 .with_cwd(dir.path())
2489 .with_builtin_tools()
2490 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2491 turn: AtomicUsize::new(0),
2492 }))
2493 .build()
2494 .await
2495 .expect("build");
2496
2497 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2498 let first_event = first.next().await;
2499 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2500
2501 let second_events: Vec<UiEvent> = app
2502 .send_user_message(UserMessage::text("second"))
2503 .collect()
2504 .await;
2505 assert_eq!(
2506 second_events.len(),
2507 1,
2508 "expected immediate single error event, got: {second_events:?}"
2509 );
2510 assert!(matches!(
2511 &second_events[0],
2512 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2513 ));
2514 }
2515
2516 #[tokio::test]
2517 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2518 let dir = tempfile::tempdir().unwrap();
2524 let mut cfg = Config::default();
2525 cfg.anthropic.api_key = Some("sk-unused".into());
2526 let app = AppBuilder::new()
2527 .with_config(cfg)
2528 .with_cwd(dir.path())
2529 .with_builtin_tools()
2530 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2531 turn: AtomicUsize::new(0),
2532 }))
2533 .build()
2534 .await
2535 .expect("build");
2536
2537 let mut first =
2539 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2540 let first_event = first.next().await;
2541 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2542
2543 let bad = crate::user_message::UserMessage {
2545 text: "second".into(),
2546 attachments: vec![crate::user_message::Attachment::Image {
2547 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2548 }],
2549 };
2550 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2551
2552 assert_eq!(
2553 second_events.len(),
2554 1,
2555 "expected exactly one event (the attachment error); got: {second_events:?}"
2556 );
2557 assert!(
2558 matches!(
2559 &second_events[0],
2560 UiEvent::AttachmentError {
2561 kind: crate::user_message::AttachmentErrorKind::NotFound,
2562 ..
2563 }
2564 ),
2565 "expected AttachmentError::NotFound as first event; got {second_events:?}"
2566 );
2567 }
2568
2569 #[test]
2570 fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
2571 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2572 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2573
2574 let only = map_event(
2575 AgentEvent::Core(CoreEvent::ToolStarted {
2576 name: "bash".into(),
2577 }),
2578 &tracker,
2579 );
2580 let only_id = match only {
2581 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2582 other => panic!("{other:?}"),
2583 };
2584 assert_eq!(progress_event_id(&tracker), only_id);
2585
2586 let _second = map_event(
2587 AgentEvent::Core(CoreEvent::ToolStarted {
2588 name: "read".into(),
2589 }),
2590 &tracker,
2591 );
2592 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2593 }
2594
2595 #[tokio::test]
2596 async fn builder_rejects_builtin_and_custom_tools_together() {
2597 let mut cfg = Config::default();
2598 cfg.anthropic.api_key = Some("sk-unused".into());
2599 let dir = tempfile::tempdir().unwrap();
2600 let err = match AppBuilder::new()
2601 .with_config(cfg)
2602 .with_cwd(dir.path())
2603 .with_builtin_tools()
2604 .with_custom_tools_factory(|_| Vec::new())
2605 .build()
2606 .await
2607 {
2608 Ok(_) => panic!("must reject conflicting tool configuration"),
2609 Err(err) => err,
2610 };
2611
2612 assert!(format!("{err}").contains("mutually exclusive"));
2613 }
2614
2615 #[tokio::test]
2617 async fn two_turns_in_same_session_share_history() {
2618 #[derive(Default)]
2619 struct CounterLlm {
2620 turn: AtomicUsize,
2621 }
2622 #[async_trait]
2623 impl LlmClient for CounterLlm {
2624 async fn chat(
2625 &self,
2626 messages: &[Message],
2627 _tools: &[ToolDef],
2628 ) -> motosan_agent_loop::Result<ChatOutput> {
2629 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2630 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2631 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2632 }
2633 }
2634
2635 let tmp = tempfile::tempdir().unwrap();
2636 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2637 tmp.path().to_path_buf(),
2638 ));
2639
2640 let app = AppBuilder::new()
2641 .with_settings(crate::settings::Settings::default())
2642 .with_auth(crate::auth::Auth::default())
2643 .with_cwd(tmp.path())
2644 .with_builtin_tools()
2645 .disable_context_discovery()
2646 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2647 .with_session_store(store)
2648 .build_with_session(None)
2649 .await
2650 .expect("build");
2651
2652 let _events1: Vec<UiEvent> = app
2653 .send_user_message(UserMessage::text("hi"))
2654 .collect()
2655 .await;
2656 let events2: Vec<UiEvent> = app
2657 .send_user_message(UserMessage::text("again"))
2658 .collect()
2659 .await;
2660
2661 let saw_more_than_one = events2.iter().any(|e| {
2663 matches!(
2664 e,
2665 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2666 )
2667 });
2668 assert!(
2669 saw_more_than_one,
2670 "second turn should have seen history; events: {events2:?}"
2671 );
2672 }
2673}
2674
2675#[cfg(test)]
2676mod skills_builder_tests {
2677 use super::*;
2678 use crate::skills::types::{Skill, SkillSource};
2679 use std::path::PathBuf;
2680
2681 fn fixture() -> Skill {
2682 Skill {
2683 name: "x".into(),
2684 description: "d".into(),
2685 file_path: PathBuf::from("/x.md"),
2686 base_dir: PathBuf::from("/"),
2687 disable_model_invocation: false,
2688 source: SkillSource::Global,
2689 }
2690 }
2691
2692 #[test]
2693 fn with_skills_stores_skills() {
2694 let b = AppBuilder::new().with_skills(vec![fixture()]);
2695 assert_eq!(b.skills.len(), 1);
2696 assert_eq!(b.skills[0].name, "x");
2697 }
2698
2699 #[test]
2700 fn without_skills_clears() {
2701 let b = AppBuilder::new()
2702 .with_skills(vec![fixture()])
2703 .without_skills();
2704 assert!(b.skills.is_empty());
2705 }
2706}
2707
2708#[cfg(test)]
2709mod mcp_builder_tests {
2710 use super::*;
2711 use motosan_agent_tool::Tool;
2712
2713 struct FakeTool;
2715 impl Tool for FakeTool {
2716 fn def(&self) -> motosan_agent_tool::ToolDef {
2717 motosan_agent_tool::ToolDef {
2718 name: "fake__echo".into(),
2719 description: "test".into(),
2720 input_schema: serde_json::json!({"type": "object"}),
2721 }
2722 }
2723 fn call(
2724 &self,
2725 _args: serde_json::Value,
2726 _ctx: &motosan_agent_tool::ToolContext,
2727 ) -> std::pin::Pin<
2728 Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
2729 > {
2730 Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
2731 }
2732 }
2733
2734 #[test]
2735 fn with_extra_tools_stores_tools() {
2736 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
2737 let b = AppBuilder::new().with_extra_tools(tools);
2738 assert_eq!(b.extra_tools.len(), 1);
2739 }
2740}