1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate, PromptStrategy};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
/// Running token-usage totals accumulated over the turns recorded so far.
///
/// Every counter starts at zero (`Default`); the type is `Copy`, so snapshots
/// can be taken by value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TurnStatsAccum {
    /// Sum of the input tokens of every usage record added.
    pub cumulative_input: u64,
    /// Sum of the output tokens of every usage record added.
    pub cumulative_output: u64,
    /// Number of usage records added.
    pub turn_count: u64,
}
32
33impl TurnStatsAccum {
34 pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38 self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39 self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40 self.turn_count = self.turn_count.saturating_add(1);
41 }
42}
43
44pub(crate) fn extract_thinking_events(messages: &[motosan_agent_loop::Message]) -> Vec<UiEvent> {
52 let mut events = Vec::new();
53 for msg in messages {
54 if let motosan_agent_loop::Message::Assistant { content, .. } = msg {
55 for part in content {
56 if let motosan_agent_loop::AssistantContent::Reasoning { text, .. } = part {
57 events.push(UiEvent::ThinkingComplete { text: text.clone() });
58 }
59 }
60 }
61 }
62 events
63}
64
65pub(crate) fn extract_new_thinking_events(
70 messages: &[motosan_agent_loop::Message],
71 previous_len: usize,
72) -> Vec<UiEvent> {
73 extract_thinking_events(messages.get(previous_len..).unwrap_or(&[]))
74}
75
/// Selects how `SessionFactory::build` obtains its session.
#[derive(Debug, Clone)]
pub(crate) enum SessionMode {
    /// Create a brand-new session.
    New,
    /// Resume the stored session with the given id; `build` rejects this
    /// mode when no session store is configured.
    Resume(String),
}
84
85struct SharedLlm {
90 client: Arc<dyn LlmClient>,
91}
92
93impl SharedLlm {
94 fn new(client: Arc<dyn LlmClient>) -> Self {
95 Self { client }
96 }
97
98 fn client(&self) -> Arc<dyn LlmClient> {
99 Arc::clone(&self.client)
100 }
101}
102
103pub(crate) struct SessionFactory {
104 cwd: PathBuf,
105 settings: Arc<Mutex<crate::settings::Settings>>,
106 auth: crate::auth::Auth,
107 policy: Arc<crate::permissions::Policy>,
108 session_cache: Arc<crate::permissions::SessionCache>,
109 ui_tx: Option<mpsc::Sender<UiEvent>>,
110 headless_permissions: bool,
111 permission_gate: Arc<dyn PermissionGate>,
112 permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
113 progress_tx: mpsc::Sender<ToolProgressChunk>,
116 skills: Arc<Vec<crate::skills::Skill>>,
117 install_builtin_tools: bool,
118 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
119 max_iterations: usize,
120 context_discovery_disabled: bool,
121 autocompact_enabled: bool,
122 session_store: Option<Arc<dyn SessionStore>>,
123 llm_override: Option<Arc<dyn LlmClient>>,
124 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
125 cancel_token: SharedCancelToken,
126}
127
128impl SessionFactory {
129 fn settings(&self) -> crate::settings::Settings {
130 match self.settings.lock() {
131 Ok(guard) => guard.clone(),
132 Err(poisoned) => poisoned.into_inner().clone(),
133 }
134 }
135
136 fn store_settings(&self, settings: crate::settings::Settings) {
137 match self.settings.lock() {
138 Ok(mut guard) => *guard = settings,
139 Err(poisoned) => *poisoned.into_inner() = settings,
140 }
141 }
142
143 fn current_model(&self) -> Option<crate::model::ModelId> {
144 match self.current_model.lock() {
145 Ok(guard) => guard.clone(),
146 Err(poisoned) => poisoned.into_inner().clone(),
147 }
148 }
149
150 fn set_current_model(&self, model: crate::model::ModelId) {
151 match self.current_model.lock() {
152 Ok(mut guard) => *guard = Some(model),
153 Err(poisoned) => *poisoned.into_inner() = Some(model),
154 }
155 }
156
157 fn clear_current_model(&self) {
158 match self.current_model.lock() {
159 Ok(mut guard) => *guard = None,
160 Err(poisoned) => *poisoned.into_inner() = None,
161 }
162 }
163
164 async fn build(
171 &self,
172 mode: SessionMode,
173 model_override: Option<&crate::model::ModelId>,
174 settings_override: Option<&crate::settings::Settings>,
175 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
176 let effective_model = model_override.cloned().or_else(|| {
180 if settings_override.is_some() {
181 None
182 } else {
183 self.current_model()
184 }
185 });
186 let mut settings = settings_override
187 .cloned()
188 .unwrap_or_else(|| self.settings());
189 if let Some(m) = &effective_model {
190 settings.model.name = m.as_str().to_string();
191 }
192
193 let llm = if effective_model.is_none() {
194 self.llm_override.as_ref().map_or_else(
195 || build_llm_client(&settings, &self.auth),
196 |llm| Ok(Arc::clone(llm)),
197 )?
198 } else {
199 build_llm_client(&settings, &self.auth)?
200 };
201
202 let tool_ctx = ToolCtx::new_with_cancel_token(
205 &self.cwd,
206 Arc::clone(&self.permission_gate),
207 self.progress_tx.clone(),
208 self.cancel_token.clone(),
209 );
210 let mut tools = if self.install_builtin_tools {
211 builtin_tools(tool_ctx.clone())
212 } else {
213 Vec::new()
214 };
215 tools.extend(self.extra_tools.iter().cloned());
216
217 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
218 let base_prompt = build_system_prompt(&tool_names, &self.skills);
219 let system_prompt = if self.context_discovery_disabled {
220 base_prompt
221 } else {
222 let agent_dir = crate::paths::agent_dir();
223 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
224 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
225 };
226 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
227
228 let mut engine_builder = Engine::builder()
229 .max_iterations(self.max_iterations)
230 .system_prompt(system_prompt)
231 .tool_context(motosan_tool_context);
232 for tool in tools {
233 engine_builder = engine_builder.tool(tool);
234 }
235 if let Some(ui_tx) = &self.ui_tx {
236 let ext = if let Some(handle) = &self.permission_strategy_handle {
237 crate::permissions::PermissionExtension::with_strategy_handle(
238 Arc::clone(&self.policy),
239 Arc::clone(&self.session_cache),
240 self.cwd.clone(),
241 Some(ui_tx.clone()),
242 Arc::clone(handle),
243 )
244 } else {
245 crate::permissions::PermissionExtension::new(
246 Arc::clone(&self.policy),
247 Arc::clone(&self.session_cache),
248 self.cwd.clone(),
249 ui_tx.clone(),
250 )
251 };
252 engine_builder = engine_builder.extension(Box::new(ext));
253 } else if self.headless_permissions {
254 let ext = if let Some(handle) = &self.permission_strategy_handle {
255 crate::permissions::PermissionExtension::with_strategy_handle(
256 Arc::clone(&self.policy),
257 Arc::clone(&self.session_cache),
258 self.cwd.clone(),
259 None,
260 Arc::clone(handle),
261 )
262 } else {
263 crate::permissions::PermissionExtension::headless(
264 Arc::clone(&self.policy),
265 Arc::clone(&self.session_cache),
266 self.cwd.clone(),
267 )
268 };
269 engine_builder = engine_builder.extension(Box::new(ext));
270 }
271 if self.autocompact_enabled
272 && settings.session.compact_at_context_pct > 0.0
273 && settings.session.compact_at_context_pct < 1.0
274 {
275 let cfg = AutocompactConfig {
276 threshold: settings.session.compact_at_context_pct,
277 max_context_tokens: settings.session.max_context_tokens,
278 keep_turns: settings.session.keep_turns.max(1),
279 };
280 engine_builder = engine_builder
281 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
282 }
283 let engine = engine_builder.build();
284
285 let session = match (&mode, &self.session_store) {
286 (SessionMode::Resume(id), Some(store)) => {
287 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
288 .await
289 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
290 let entries = s
291 .entries()
292 .await
293 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
294 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
295 s
296 }
297 (SessionMode::Resume(_), None) => {
298 return Err(AppError::Config("resume requires a session store".into()));
299 }
300 (SessionMode::New, Some(store)) => {
301 let id = crate::session::SessionId::new();
302 AgentSession::new_with_store(
303 id.into_string(),
304 Arc::clone(store),
305 engine,
306 Arc::clone(&llm),
307 )
308 }
309 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
310 };
311
312 Ok((session, llm))
313 }
314}
315
316pub struct App {
317 session: arc_swap::ArcSwap<AgentSession>,
318 llm: arc_swap::ArcSwap<SharedLlm>,
319 factory: SessionFactory,
320 config: Config,
321 cancel_token: SharedCancelToken,
322 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
323 next_tool_id: Arc<Mutex<ToolCallTracker>>,
324 skills: Arc<Vec<crate::skills::Skill>>,
325 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
326 pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
328 pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
333 extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
334 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
335 pub(crate) permission_mode: Arc<tokio::sync::RwLock<crate::permissions::PermissionMode>>,
338 pub(crate) permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
341 pub(crate) ui_tx_owned: Option<mpsc::Sender<UiEvent>>,
344}
345
346impl App {
347 pub fn config(&self) -> &Config {
348 &self.config
349 }
350
351 pub fn cancel(&self) {
355 self.cancel_token.cancel();
356 }
357
358 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
362 Arc::clone(&self.session_cache)
363 }
364
365 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
368 Arc::clone(&self.extension_registry)
369 }
370
371 pub fn settings(&self) -> crate::settings::Settings {
373 let mut settings = self.factory.settings();
374 if let Some(model) = self.factory.current_model() {
375 settings.model.name = model.to_string();
376 }
377 settings
378 }
379
380 pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
382 Arc::clone(&self.token_tally)
383 }
384
385 pub async fn set_permission_mode(&self, mode: crate::permissions::PermissionMode) {
390 let new_strategy = match mode {
391 crate::permissions::PermissionMode::Bypass => PromptStrategy::AllowAll,
392 crate::permissions::PermissionMode::AcceptEdits => PromptStrategy::AcceptEdits,
393 crate::permissions::PermissionMode::Prompt if self.ui_tx_owned.is_none() => {
394 PromptStrategy::HeadlessDeny
395 }
396 crate::permissions::PermissionMode::Prompt => PromptStrategy::Prompt,
397 };
398
399 if let Some(handle) = &self.permission_strategy_handle {
400 *handle.write().await = new_strategy;
401 }
402
403 *self.permission_mode.write().await = mode;
404
405 if let Some(ui_tx) = &self.ui_tx_owned {
406 let _ = ui_tx.send(UiEvent::PermissionModeChanged { mode }).await;
407 }
408 }
409
410 pub async fn emit_settings_snapshot(&self) {
414 if let Some(ui_tx) = &self.ui_tx_owned {
415 let _ = ui_tx
416 .send(UiEvent::SettingsSnapshot {
417 settings: self.settings(),
418 })
419 .await;
420 }
421 }
422
423 pub async fn permission_mode(&self) -> crate::permissions::PermissionMode {
425 *self.permission_mode.read().await
426 }
427
428 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
430 Arc::clone(&self.extension_diagnostics)
431 }
432
433 pub fn session_id(&self) -> String {
436 self.session.load().session_id().to_string()
437 }
438
439 pub async fn session_history(
448 &self,
449 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
450 self.session.load_full().history().await
451 }
452
453 pub async fn compact(&self) -> Result<()> {
459 use motosan_agent_loop::ThresholdStrategy;
460 let strategy = ThresholdStrategy {
461 threshold: 0.0,
462 ..ThresholdStrategy::default()
463 };
464 let llm = self.llm.load_full().client();
465 self.session
466 .load_full()
467 .maybe_compact(&strategy, llm)
468 .await
469 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
470 Ok(())
471 }
472
473 pub async fn new_session(&self) -> Result<()> {
477 self.fire_session_before_switch("new", None).await?;
478 let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
479 self.session.store(Arc::new(session));
480 self.llm.store(Arc::new(SharedLlm::new(llm)));
481 self.reset_token_tally().await;
482 Ok(())
483 }
484
485 pub async fn load_session(&self, id: &str) -> Result<()> {
491 self.fire_session_before_switch("load", Some(id)).await?;
492 self.load_session_without_hook(id).await
493 }
494
495 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
496 let (session, llm) = self
497 .factory
498 .build(SessionMode::Resume(id.to_string()), None, None)
499 .await?;
500 self.session.store(Arc::new(session));
501 self.llm.store(Arc::new(SharedLlm::new(llm)));
502 self.reset_token_tally().await;
503 Ok(())
504 }
505
506 async fn reset_token_tally(&self) {
507 let mut tally = self.token_tally.lock().await;
508 *tally = TurnStatsAccum::default();
509 }
510
511 pub async fn clone_session(&self) -> Result<String> {
520 self.fire_session_before_switch("clone", None).await?;
521 let Some(store) = self.factory.session_store.as_ref() else {
522 return Err(AppError::Config("clone requires a session store".into()));
523 };
524 let source_id = self.session.load().session_id().to_string();
525 let new_id = crate::session::SessionId::new().into_string();
526 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
527 catalog
528 .fork(&source_id, &new_id)
529 .await
530 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
531 self.load_session_without_hook(&new_id).await?;
532 Ok(new_id)
533 }
534
535 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
550 self.fire_session_before_switch("model_switch", None)
551 .await?;
552 let current_id = self.session.load().session_id().to_string();
553 let (session, llm) = self
554 .factory
555 .build(SessionMode::Resume(current_id), Some(model), None)
556 .await?;
557 self.factory.set_current_model(model.clone());
558 self.session.store(Arc::new(session));
559 self.llm.store(Arc::new(SharedLlm::new(llm)));
560 Ok(())
561 }
562
563 pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
579 let current_id = self.session.load().session_id().to_string();
583 let (session, llm) = self
584 .factory
585 .build(SessionMode::Resume(current_id), None, Some(&new_settings))
586 .await?;
587 self.factory.store_settings(new_settings);
588 self.factory.clear_current_model();
589 self.session.store(Arc::new(session));
590 self.llm.store(Arc::new(SharedLlm::new(llm)));
591
592 self.reset_token_tally().await;
595
596 Ok(())
597 }
598
599 pub async fn disconnect_mcp(&self) {
602 for (name, server) in &self.mcp_servers {
603 let _ =
604 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
605 tracing::debug!(target: "mcp", server = %name, "disconnected");
606 }
607 }
608
    /// Build the event stream for one agent turn.
    ///
    /// `msg` is the user's message. With `fork_from == None` it is appended to
    /// the session history and run via `start_turn`; with `Some(entry)` it is
    /// run via `fork_turn` from that entry.
    ///
    /// Everything the stream needs is captured up front (session, skills,
    /// cancel token, tracker, progress receiver, tally, model name), which is
    /// what lets the returned stream be `'static`.
    ///
    /// Failures are reported in-band as `UiEvent::Error` /
    /// `UiEvent::AttachmentError`; the stream simply ends afterwards.
    fn run_turn(
        &self,
        msg: crate::user_message::UserMessage,
        fork_from: Option<motosan_agent_loop::EntryId>,
    ) -> impl Stream<Item = UiEvent> + Send + 'static {
        let session = self.session.load_full();
        let skills = Arc::clone(&self.skills);
        let cancel_token = self.cancel_token.clone();
        let tracker = Arc::clone(&self.next_tool_id);
        let progress = Arc::clone(&self.progress_rx);
        let token_tally = Arc::clone(&self.token_tally);
        // Model name reported in TurnStats: the runtime-selected model if any,
        // otherwise the name from the stored settings.
        let settings_model_name = self
            .factory
            .current_model()
            .map(|model| model.to_string())
            .unwrap_or_else(|| self.factory.settings().model.name);

        async_stream::stream! {
            // Expand skill commands in the text, then prepare the message
            // (attachments included). A preparation failure ends the turn
            // before any "turn started" event is emitted.
            let new_user = {
                let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
                let expanded_msg = crate::user_message::UserMessage {
                    text: expanded_text,
                    attachments: msg.attachments.clone(),
                };
                match crate::user_message::prepare_user_message(&expanded_msg) {
                    Ok(m) => m,
                    Err(err) => {
                        yield UiEvent::AttachmentError {
                            kind: err.kind(),
                            message: err.to_string(),
                        };
                        return;
                    }
                }
            };

            // The progress receiver lock doubles as the "one turn at a time"
            // guard: if it is already held, another turn is still running.
            let mut progress_guard = match progress.try_lock() {
                Ok(guard) => guard,
                Err(_) => {
                    yield UiEvent::Error(
                        "another turn is already running; capo is single-turn-per-App".into(),
                    );
                    return;
                }
            };

            // NOTE(review): `reset()` presumably arms a fresh cancellation
            // token for this turn — confirm against `SharedCancelToken`.
            let cancel = cancel_token.reset();

            yield UiEvent::AgentTurnStarted;
            yield UiEvent::AgentThinking;

            let handle = match fork_from {
                // Normal turn: full history plus the new user message.
                None => {
                    let history = match session.history().await {
                        Ok(h) => h,
                        Err(err) => {
                            yield UiEvent::Error(format!("session.history failed: {err}"));
                            return;
                        }
                    };
                    let mut messages = history;
                    messages.push(new_user);
                    match session.start_turn(messages).await {
                        Ok(h) => h,
                        Err(err) => {
                            yield UiEvent::Error(format!("session.start_turn failed: {err}"));
                            return;
                        }
                    }
                }
                // Forked turn: only the new user message is supplied.
                Some(from) => {
                    match session.fork_turn(from, vec![new_user]).await {
                        Ok(h) => h,
                        Err(err) => {
                            yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
                            return;
                        }
                    }
                }
            };
            let previous_len = handle.previous_len;
            let epoch = handle.epoch;
            let branch_parent = handle.branch_parent;
            let ops_tx = handle.ops_tx.clone();
            let mut agent_stream = handle.stream;

            // Bridge task: once the cancel token fires, forward an Interrupt
            // op to the agent. Aborted below when the turn ends first.
            let interrupt_bridge = tokio::spawn(async move {
                cancel.cancelled().await;
                let _ = ops_tx.send(AgentOp::Interrupt).await;
            });

            let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
            let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;

            loop {
                // Flush progress chunks that are already queued, without waiting.
                while let Ok(chunk) = progress_guard.try_recv() {
                    yield UiEvent::ToolCallProgress {
                        id: progress_event_id(&tracker),
                        chunk: ProgressChunk::from(chunk),
                    };
                }

                // `biased` polls the branches in written order, so agent
                // events are checked before progress chunks.
                tokio::select! {
                    biased;
                    maybe_item = agent_stream.next() => {
                        match maybe_item {
                            Some(AgentStreamItem::Event(ev)) => {
                                if let Some(ui) = map_event(ev, &tracker) {
                                    yield ui;
                                }
                            }
                            Some(AgentStreamItem::Terminal(term)) => {
                                terminal_result = Some(term.result);
                                terminal_messages = Some(term.messages);
                                break;
                            }
                            // Stream ended without a terminal item.
                            None => break,
                        }
                    }
                    Some(chunk) = progress_guard.recv() => {
                        yield UiEvent::ToolCallProgress {
                            id: progress_event_id(&tracker),
                            chunk: ProgressChunk::from(chunk),
                        };
                    }
                }
            }

            interrupt_bridge.abort();

            // Record the outcome whenever terminal messages exist, whether the
            // turn succeeded or failed; a recording failure is reported but
            // does not stop the rest of the handling below.
            if let Some(msgs) = terminal_messages.as_ref() {
                if let Err(err) = session
                    .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
                    .await
                {
                    yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
                }
            }

            match terminal_result {
                Some(Ok(result)) => {
                    // Thinking events only for messages added by this turn.
                    if let Some(msgs) = terminal_messages.as_ref() {
                        for ev in extract_new_thinking_events(msgs, previous_len) {
                            yield ev;
                        }
                    }

                    // Text of the last assistant message, if any.
                    let final_text = terminal_messages
                        .as_ref()
                        .and_then(|msgs| {
                            msgs.iter()
                                .rev()
                                .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
                                .map(|m| m.text())
                        })
                        .unwrap_or_default();
                    if !final_text.is_empty() {
                        yield UiEvent::AgentMessageComplete(final_text);
                    }
                    let usage = result.usage;
                    // Update the tally inside a short scope so the lock is
                    // released before the next yield.
                    let (cumulative_input, cumulative_output) = {
                        let mut tally = token_tally.lock().await;
                        tally.add(usage);
                        (tally.cumulative_input, tally.cumulative_output)
                    };
                    yield UiEvent::TurnStats {
                        input_tokens: usage.input_tokens,
                        output_tokens: usage.output_tokens,
                        cumulative_input,
                        cumulative_output,
                        model: settings_model_name.clone(),
                    };
                    // Final drain so late progress chunks precede AgentTurnComplete.
                    while let Ok(chunk) = progress_guard.try_recv() {
                        yield UiEvent::ToolCallProgress {
                            id: progress_event_id(&tracker),
                            chunk: ProgressChunk::from(chunk),
                        };
                    }
                    yield UiEvent::AgentTurnComplete;
                }
                Some(Err(err)) => {
                    // Hitting the iteration cap is surfaced as a notice plus a
                    // normal turn completion; every other error becomes
                    // `UiEvent::Error` with no completion event.
                    if let motosan_agent_loop::AgentError::MaxIterations(n) = err {
                        yield UiEvent::Notice {
                            title: "Agent stopped".to_string(),
                            body: format!(
                                "Reached the per-turn iteration cap ({n}). \
                                 Partial work is saved — send another message to continue."
                            ),
                        };
                        yield UiEvent::AgentTurnComplete;
                    } else {
                        yield UiEvent::Error(format!("{err}"));
                    }
                }
                // No terminal item was received: nothing further is emitted.
                None => { }
            }
        }
    }
847
848 pub fn send_user_message(
849 &self,
850 msg: crate::user_message::UserMessage,
851 ) -> impl Stream<Item = UiEvent> + Send + 'static {
852 self.run_turn(msg, None)
853 }
854
855 pub fn fork_from(
860 &self,
861 from: motosan_agent_loop::EntryId,
862 message: crate::user_message::UserMessage,
863 ) -> impl Stream<Item = UiEvent> + Send + 'static {
864 let registry = Arc::clone(&self.extension_registry);
865 let inner = self.run_turn(message, Some(from));
866 async_stream::stream! {
867 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
868 match dispatch_session_before_switch(®istry, "fork", None).await {
869 HookOutcome::Continue => {}
870 HookOutcome::Cancelled { extension_name, reason } => {
871 let msg = match reason {
872 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
873 None => format!("extension `{extension_name}` cancelled fork"),
874 };
875 yield UiEvent::Error(msg);
876 return;
877 }
878 }
879 let mut inner = Box::pin(inner);
880 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
881 yield ev;
882 }
883 }
884 }
885
886 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
890 let entries = self
891 .session
892 .load_full()
893 .entries()
894 .await
895 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
896 let branch = motosan_agent_loop::active_branch(&entries);
897 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
898 .iter()
899 .filter_map(|stored| {
900 let msg = stored.entry.as_message()?;
901 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
902 return None;
903 }
904 let preview: String = msg
905 .text()
906 .lines()
907 .next()
908 .unwrap_or("")
909 .chars()
910 .take(80)
911 .collect();
912 Some((stored.id.clone(), preview))
913 })
914 .collect();
915 out.reverse();
916 Ok(out)
917 }
918
919 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
921 self.session
922 .load_full()
923 .branches()
924 .await
925 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
926 }
927
928 async fn fire_session_before_switch(
931 &self,
932 reason: &str,
933 session_id: Option<&str>,
934 ) -> Result<()> {
935 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
936 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
937 HookOutcome::Continue => Ok(()),
938 HookOutcome::Cancelled {
939 extension_name,
940 reason,
941 } => Err(AppError::HookCancelled {
942 extension_name,
943 reason,
944 }),
945 }
946 }
947}
948
/// Hands out `tool_<n>` ids for tool calls and pairs completions with starts.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Last numeric id handed out; ids start at 1.
    next_id: usize,
    /// Started-but-not-completed calls as `(tool name, id)`, oldest first.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Advance the counter and format the next id.
    fn fresh_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }

    /// Register a started call to tool `name` and return its new id.
    fn start(&mut self, name: &str) -> String {
        let id = self.fresh_id();
        let entry = (name.to_string(), id.clone());
        self.pending.push_back(entry);
        id
    }

    /// Resolve a completed call to tool `name`.
    ///
    /// Returns the id of the oldest pending call with that name and removes
    /// it; with no such call pending, a brand-new id is allocated instead.
    fn complete(&mut self, name: &str) -> String {
        let slot = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name);
        match slot.and_then(|idx| self.pending.remove(idx)) {
            Some((_, id)) => id,
            None => self.fresh_id(),
        }
    }

    /// Id to attribute a progress chunk to: only available when exactly one
    /// call is pending, because otherwise the owner is ambiguous.
    fn progress_id(&self) -> Option<String> {
        match (self.pending.front(), self.pending.len()) {
            (Some((_, id)), 1) => Some(id.clone()),
            _ => None,
        }
    }
}
989
990fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
991 match tracker.lock() {
992 Ok(guard) => guard,
993 Err(poisoned) => poisoned.into_inner(),
994 }
995}
996
997fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
998 lock_tool_tracker(tracker)
999 .progress_id()
1000 .unwrap_or_else(|| "tool_unknown".to_string())
1001}
1002
1003fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
1004where
1005 F: Fn(&str) -> Option<String>,
1006{
1007 env_lookup("ANTHROPIC_API_KEY")
1008 .map(|key| key.trim().to_string())
1009 .filter(|key| !key.is_empty())
1010 .or_else(|| auth.api_key("anthropic").map(str::to_string))
1011}
1012
1013fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
1014 match ev {
1015 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
1016 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
1017 let id = lock_tool_tracker(tool_tracker).start(&name);
1018 Some(UiEvent::ToolCallStarted {
1019 id,
1020 name,
1021 args: serde_json::json!({}),
1022 })
1023 }
1024 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
1025 let id = lock_tool_tracker(tool_tracker).complete(&name);
1026 Some(UiEvent::ToolCallCompleted {
1027 id,
1028 result: UiToolResult {
1029 is_error: result.is_error,
1030 text: format!("{name}: {result:?}"),
1031 },
1032 })
1033 }
1034 _ => None,
1035 }
1036}
1037
1038type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
1039
1040pub struct AppBuilder {
1041 config: Option<Config>,
1042 cwd: Option<PathBuf>,
1043 permission_gate: Option<Arc<dyn PermissionGate>>,
1044 install_builtin_tools: bool,
1045 max_iterations: usize,
1046 llm_override: Option<Arc<dyn LlmClient>>,
1047 custom_tools_factory: Option<CustomToolsFactory>,
1048 permissions_policy_path: Option<PathBuf>,
1049 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
1050 headless_permissions: bool,
1051 settings: Option<crate::settings::Settings>,
1052 auth: Option<crate::auth::Auth>,
1053 context_discovery_disabled: bool,
1054 session_store: Option<Arc<dyn SessionStore>>,
1056 resume_session_id: Option<crate::session::SessionId>,
1057 autocompact_enabled: bool,
1058 skills: Vec<crate::skills::Skill>,
1060 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
1062 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1063 extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
1064 extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
1065 token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
1066}
1067
1068impl Default for AppBuilder {
1069 fn default() -> Self {
1070 Self {
1071 config: None,
1072 cwd: None,
1073 permission_gate: None,
1074 install_builtin_tools: false,
1075 max_iterations: 200,
1082 llm_override: None,
1083 custom_tools_factory: None,
1084 permissions_policy_path: None,
1085 ui_tx: None,
1086 headless_permissions: false,
1087 settings: None,
1088 auth: None,
1089 context_discovery_disabled: false,
1090 session_store: None,
1091 resume_session_id: None,
1092 autocompact_enabled: false,
1093 skills: Vec::new(),
1094 extra_tools: Vec::new(),
1095 mcp_servers: Vec::new(),
1096 extension_registry: None,
1097 extension_diagnostics: None,
1098 token_tally: None,
1099 }
1100 }
1101}
1102
1103impl AppBuilder {
1104 pub fn new() -> Self {
1105 Self::default()
1106 }
1107
1108 pub fn with_config(mut self, cfg: Config) -> Self {
1109 self.config = Some(cfg);
1110 self
1111 }
1112
1113 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
1114 self.cwd = Some(cwd.into());
1115 self
1116 }
1117
1118 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
1119 self.permission_gate = Some(gate);
1120 self
1121 }
1122
1123 pub fn with_builtin_tools(mut self) -> Self {
1129 self.install_builtin_tools = true;
1130 self
1131 }
1132
1133 pub fn with_max_iterations(mut self, n: usize) -> Self {
1134 self.max_iterations = n;
1135 self
1136 }
1137
1138 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
1139 self.llm_override = Some(llm);
1140 self
1141 }
1142
1143 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1144 self.permissions_policy_path = Some(path);
1145 self
1146 }
1147
1148 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1149 self.ui_tx = Some(tx);
1150 self
1151 }
1152
1153 pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1155 self.token_tally = Some(tally);
1156 self
1157 }
1158
1159 pub fn with_headless_permissions(mut self) -> Self {
1164 self.headless_permissions = true;
1165 self
1166 }
1167
1168 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1170 self.settings = Some(settings);
1171 self
1172 }
1173
1174 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1176 self.auth = Some(auth);
1177 self
1178 }
1179
1180 pub fn disable_context_discovery(mut self) -> Self {
1183 self.context_discovery_disabled = true;
1184 self
1185 }
1186
1187 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1190 self.session_store = Some(store);
1191 self
1192 }
1193
1194 pub fn with_autocompact(mut self) -> Self {
1199 self.autocompact_enabled = true;
1200 self
1201 }
1202
1203 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1209 self.skills = skills;
1210 self
1211 }
1212
1213 pub fn without_skills(mut self) -> Self {
1214 self.skills.clear();
1215 self
1216 }
1217
1218 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1222 self.extra_tools = tools;
1223 self
1224 }
1225
1226 pub fn with_extension_registry(
1227 mut self,
1228 registry: Arc<crate::extensions::ExtensionRegistry>,
1229 ) -> Self {
1230 self.extension_registry = Some(registry);
1231 self
1232 }
1233
1234 pub fn with_extension_diagnostics(
1235 mut self,
1236 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1237 ) -> Self {
1238 self.extension_diagnostics = Some(diagnostics);
1239 self
1240 }
1241
1242 pub fn with_mcp_servers(
1245 mut self,
1246 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1247 ) -> Self {
1248 self.mcp_servers = servers;
1249 self
1250 }
1251
1252 pub fn with_custom_tools_factory(
1257 mut self,
1258 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1259 ) -> Self {
1260 self.custom_tools_factory = Some(Box::new(factory));
1261 self
1262 }
1263
1264 pub async fn build_with_custom_tools(
1268 self,
1269 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1270 ) -> Result<App> {
1271 self.with_custom_tools_factory(factory).build().await
1272 }
1273
1274 pub async fn build_with_session(
1282 mut self,
1283 resume: Option<crate::session::SessionId>,
1284 ) -> Result<App> {
1285 if let Some(id) = resume {
1286 if self.session_store.is_none() {
1287 return Err(AppError::Config(
1288 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1289 ));
1290 }
1291 self.resume_session_id = Some(id);
1292 }
1293 self.build_internal().await
1294 }
1295
1296 pub async fn build(self) -> Result<App> {
1298 self.build_with_session(None).await
1299 }
1300
    /// Consumes the builder and assembles the final [`App`].
    ///
    /// In order, this resolves config/settings/auth, picks the working directory, sets up
    /// permission state, creates the tool progress channel, resolves the tool set, builds
    /// the initial session through a `SessionFactory`, and resolves the extension registry.
    ///
    /// # Errors
    ///
    /// Returns `AppError::Config` when built-in tools and a custom tools factory were both
    /// requested. Also propagates errors from `Policy::load_or_default` and from the
    /// session factory's `build`.
    async fn build_internal(mut self) -> Result<App> {
        // Move the owned collections out first; `self` is partially moved further down.
        let mcp_servers = std::mem::take(&mut self.mcp_servers);
        let extra_tools = std::mem::take(&mut self.extra_tools);
        let skills = self.skills.clone();
        // The two ways of choosing the tool set cannot be combined.
        if self.install_builtin_tools && self.custom_tools_factory.is_some() {
            return Err(AppError::Config(
                "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
            ));
        }

        // Remember which inputs were supplied explicitly before the `Option`s are consumed.
        let has_config = self.config.is_some();
        let has_auth = self.auth.is_some();
        let mut config = self.config.unwrap_or_default();
        // Without explicit settings, start from default settings seeded with the config's
        // model fields.
        let settings = match self.settings {
            Some(settings) => settings,
            None => {
                let mut settings = crate::settings::Settings::default();
                settings.model.provider = config.model.provider.clone();
                settings.model.name = config.model.name.clone();
                settings.model.max_tokens = config.model.max_tokens;
                settings
            }
        };
        // Settings then overwrite the config's model fields, so the two always agree.
        config.model.provider = settings.model.provider.clone();
        config.model.name = settings.model.name.clone();
        config.model.max_tokens = settings.model.max_tokens;
        let mut auth = self.auth.unwrap_or_default();
        // When no auth was supplied, carry a config-provided Anthropic key into the auth map.
        if !has_auth {
            if let Some(key) = config.anthropic.api_key.as_deref() {
                auth.0.insert(
                    "anthropic".into(),
                    crate::auth::ProviderAuth::ApiKey {
                        key: key.to_string(),
                    },
                );
            }
        }
        // Resolve the key from `auth` and the process environment. It replaces the config's
        // key when one was found, when auth was explicit, or when no config was supplied;
        // otherwise the config keeps its own value.
        let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
        if env_or_auth_key.is_some() || has_auth || !has_config {
            config.anthropic.api_key = env_or_auth_key;
        }
        // Working directory: explicit value, else the process's current dir, else ".".
        let cwd = self
            .cwd
            .or_else(|| std::env::current_dir().ok())
            .unwrap_or_else(|| PathBuf::from("."));
        let agent_dir = crate::paths::agent_dir();
        // Both fallback branches install `NoOpPermissionGate`; the only difference is the
        // warning logged when neither a UI channel nor headless permissions is configured.
        let permission_gate = self.permission_gate.unwrap_or_else(|| {
            if self.ui_tx.is_some() || self.headless_permissions {
                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
            } else {
                tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
                Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
            }
        });

        // Load the policy from the configured path when there is one (errors propagate),
        // otherwise use the default policy.
        let policy: Arc<crate::permissions::Policy> =
            Arc::new(match self.permissions_policy_path.as_ref() {
                Some(path) => crate::permissions::Policy::load_or_default(path)?,
                None => crate::permissions::Policy::default(),
            });
        let session_cache = Arc::new(crate::permissions::SessionCache::new());
        // A strategy handle exists only with a UI channel or headless permissions:
        // `Prompt` when a UI channel is present, `HeadlessDeny` otherwise.
        let permission_strategy_handle = if self.ui_tx.is_some() || self.headless_permissions {
            let initial_strategy = if self.ui_tx.is_some() {
                PromptStrategy::Prompt
            } else {
                PromptStrategy::HeadlessDeny
            };
            Some(Arc::new(tokio::sync::RwLock::new(initial_strategy)))
        } else {
            None
        };
        let permission_mode = Arc::new(tokio::sync::RwLock::new(
            crate::permissions::PermissionMode::default(),
        ));
        // Reuse a caller-supplied tally if present, else start from zeroed counters.
        let token_tally = self
            .token_tally
            .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));

        // Bounded channel (capacity 64) for tool progress chunks; the receiver ends up on
        // the `App`, the sender is cloned into the tool context and the session factory.
        let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);

        // This context's cancel token is the one shared with the session factory and the
        // `App`; the context itself is handed to the custom tools factory, if any.
        let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
        let cancel_token = probe_ctx.cancel_token.clone();
        // With a custom factory: its tools come first, then `extra_tools`, and built-in
        // installation is switched off. Otherwise honour `install_builtin_tools`.
        let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
            if let Some(factory_fn) = self.custom_tools_factory.take() {
                let mut t = factory_fn(probe_ctx);
                t.extend(extra_tools.clone());
                (false, t)
            } else {
                (self.install_builtin_tools, extra_tools.clone())
            };

        let factory = SessionFactory {
            cwd: cwd.clone(),
            settings: Arc::new(Mutex::new(settings.clone())),
            auth: auth.clone(),
            policy: Arc::clone(&policy),
            session_cache: Arc::clone(&session_cache),
            ui_tx: self.ui_tx.clone(),
            headless_permissions: self.headless_permissions,
            permission_gate: Arc::clone(&permission_gate),
            permission_strategy_handle: permission_strategy_handle.clone(),
            progress_tx: progress_tx.clone(),
            skills: Arc::new(skills.clone()),
            install_builtin_tools: install_builtin,
            extra_tools: factory_extra_tools,
            max_iterations: self.max_iterations,
            context_discovery_disabled: self.context_discovery_disabled,
            autocompact_enabled: self.autocompact_enabled,
            session_store: self.session_store.clone(),
            llm_override: self.llm_override.clone(),
            current_model: Arc::new(Mutex::new(None)),
            cancel_token: cancel_token.clone(),
        };

        // Resume the recorded session id if one was set, otherwise start a new session.
        let mode = match self.resume_session_id.take() {
            Some(id) => SessionMode::Resume(id.into_string()),
            None => SessionMode::New,
        };
        // NOTE(review): the meaning of the two `None` arguments is defined by
        // `SessionFactory::build`, which is not visible here.
        let (session, llm) = factory.build(mode, None, None).await?;
        // An explicitly supplied registry wins (paired with the supplied diagnostics or an
        // empty list). Otherwise the manifest under the agent dir is loaded and each
        // diagnostic is logged at its severity.
        // NOTE(review): diagnostics supplied via the builder are not read on the manifest
        // branch — confirm that is intended.
        let (extension_registry, extension_diagnostics) =
            if let Some(reg) = self.extension_registry.take() {
                let diagnostics = self
                    .extension_diagnostics
                    .unwrap_or_else(|| Arc::new(Vec::new()));
                (reg, diagnostics)
            } else {
                let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
                let (registry, diagnostics) =
                    crate::extensions::load_extensions_manifest(&manifest_path).await;
                for d in &diagnostics {
                    let message = d.message.as_str();
                    match d.severity {
                        crate::extensions::DiagnosticSeverity::Warn => {
                            tracing::warn!("extensions: {message}");
                        }
                        crate::extensions::DiagnosticSeverity::Error => {
                            tracing::error!("extensions: {message}");
                        }
                    }
                }
                (Arc::new(registry), Arc::new(diagnostics))
            };

        Ok(App {
            session: arc_swap::ArcSwap::from_pointee(session),
            llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
            factory,
            config,
            cancel_token,
            progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
            next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
            skills: Arc::new(skills),
            mcp_servers,
            extension_registry,
            token_tally,
            extension_diagnostics,
            session_cache,
            permission_mode,
            permission_strategy_handle,
            ui_tx_owned: self.ui_tx.clone(),
        })
    }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479 use super::*;
1480 use crate::config::{AnthropicConfig, ModelConfig};
1481 use crate::events::UiEvent;
1482 use crate::user_message::UserMessage;
1483 use async_trait::async_trait;
1484 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1485 use motosan_agent_tool::ToolDef;
1486 use std::sync::atomic::{AtomicUsize, Ordering};
1487
1488 #[test]
1489 fn turn_stats_accum_accumulates_across_calls() {
1490 let mut accum = TurnStatsAccum::default();
1491 accum.add(motosan_agent_loop::TokenUsage {
1492 input_tokens: 100,
1493 output_tokens: 25,
1494 });
1495 accum.add(motosan_agent_loop::TokenUsage {
1496 input_tokens: 1000,
1497 output_tokens: 250,
1498 });
1499 assert_eq!(accum.cumulative_input, 1100);
1500 assert_eq!(accum.cumulative_output, 275);
1501 assert_eq!(accum.turn_count, 2);
1502 }
1503
1504 #[test]
1505 fn turn_stats_accum_saturates_on_overflow() {
1506 let mut accum = TurnStatsAccum {
1507 cumulative_input: u64::MAX - 5,
1508 cumulative_output: 0,
1509 turn_count: 1,
1510 };
1511 accum.add(motosan_agent_loop::TokenUsage {
1512 input_tokens: 100,
1513 output_tokens: 0,
1514 });
1515 assert_eq!(accum.cumulative_input, u64::MAX);
1516 }
1517
1518 #[test]
1519 fn extract_thinking_events_extracts_reasoning_in_source_order() {
1520 use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};
1521 let messages = vec![Message::Assistant {
1522 id: new_message_id(),
1523 meta: MessageMeta::default(),
1524 content: vec![
1525 AssistantContent::Reasoning {
1526 text: "first thought".into(),
1527 signature: None,
1528 },
1529 AssistantContent::Text {
1530 text: "answer 1".into(),
1531 },
1532 AssistantContent::Reasoning {
1533 text: "second thought".into(),
1534 signature: None,
1535 },
1536 ],
1537 }];
1538 let events = extract_thinking_events(&messages);
1539 assert_eq!(events.len(), 2);
1540 match (&events[0], &events[1]) {
1541 (UiEvent::ThinkingComplete { text: t1 }, UiEvent::ThinkingComplete { text: t2 }) => {
1542 assert_eq!(t1, "first thought");
1543 assert_eq!(t2, "second thought");
1544 }
1545 _ => panic!("expected two ThinkingComplete events"),
1546 }
1547 }
1548
1549 #[test]
1550 fn extract_new_thinking_events_skips_historical_reasoning_before_previous_len() {
1551 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1552 let messages = vec![
1553 Message::User {
1554 id: new_message_id(),
1555 meta: MessageMeta::default(),
1556 content: vec![ContentPart::text("old question")],
1557 },
1558 Message::Assistant {
1559 id: new_message_id(),
1560 meta: MessageMeta::default(),
1561 content: vec![AssistantContent::Reasoning {
1562 text: "old thought".into(),
1563 signature: None,
1564 }],
1565 },
1566 Message::User {
1567 id: new_message_id(),
1568 meta: MessageMeta::default(),
1569 content: vec![ContentPart::text("new question")],
1570 },
1571 Message::Assistant {
1572 id: new_message_id(),
1573 meta: MessageMeta::default(),
1574 content: vec![AssistantContent::Reasoning {
1575 text: "new thought".into(),
1576 signature: None,
1577 }],
1578 },
1579 ];
1580
1581 let events = extract_new_thinking_events(&messages, 2);
1582 assert_eq!(events.len(), 1, "should only emit current-turn reasoning");
1583 match &events[0] {
1584 UiEvent::ThinkingComplete { text } => assert_eq!(text, "new thought"),
1585 other => panic!("expected ThinkingComplete; got {other:?}"),
1586 }
1587 }
1588
1589 #[test]
1590 fn extract_thinking_events_skips_non_assistant_and_non_reasoning() {
1591 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1592 let messages = vec![
1593 Message::User {
1594 id: new_message_id(),
1595 meta: MessageMeta::default(),
1596 content: vec![ContentPart::text("question?")],
1597 },
1598 Message::Assistant {
1599 id: new_message_id(),
1600 meta: MessageMeta::default(),
1601 content: vec![AssistantContent::Text {
1602 text: "answer".into(),
1603 }],
1604 },
1605 ];
1606 let events = extract_thinking_events(&messages);
1607 assert!(events.is_empty(), "expected no events; got {events:?}");
1608 }
1609
1610 #[tokio::test]
1611 async fn builder_fails_without_api_key() {
1612 let cfg = Config {
1613 anthropic: AnthropicConfig {
1614 api_key: None,
1615 base_url: "https://api.anthropic.com".into(),
1616 },
1617 model: ModelConfig {
1618 provider: "anthropic".into(),
1619 name: "claude-sonnet-4-6".into(),
1620 max_tokens: 4096,
1621 },
1622 };
1623 let err = match AppBuilder::new()
1624 .with_config(cfg)
1625 .with_builtin_tools()
1626 .build()
1627 .await
1628 {
1629 Ok(_) => panic!("must fail without key"),
1630 Err(err) => err,
1631 };
1632 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1633 }
1634
1635 struct ToolOnlyLlm {
1636 turn: AtomicUsize,
1637 }
1638
1639 #[async_trait]
1640 impl LlmClient for ToolOnlyLlm {
1641 async fn chat(
1642 &self,
1643 _messages: &[Message],
1644 _tools: &[ToolDef],
1645 ) -> motosan_agent_loop::Result<ChatOutput> {
1646 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1647 if turn == 0 {
1648 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1649 ToolCallItem {
1650 id: "t1".into(),
1651 name: "read".into(),
1652 args: serde_json::json!({"path":"nope.txt"}),
1653 },
1654 ])))
1655 } else {
1656 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1657 }
1658 }
1659 }
1660
1661 #[tokio::test]
1662 async fn empty_final_message_is_not_emitted() {
1663 let dir = tempfile::tempdir().unwrap();
1664 let mut cfg = Config::default();
1665 cfg.anthropic.api_key = Some("sk-unused".into());
1666 let app = AppBuilder::new()
1667 .with_config(cfg)
1668 .with_cwd(dir.path())
1669 .with_builtin_tools()
1670 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1671 turn: AtomicUsize::new(0),
1672 }))
1673 .build()
1674 .await
1675 .expect("build");
1676 let events: Vec<UiEvent> =
1677 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1678 let empties = events
1679 .iter()
1680 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1681 .count();
1682 assert_eq!(
1683 empties, 0,
1684 "should not emit empty final message, got: {events:?}"
1685 );
1686 }
1687
    /// LLM double whose `chat` always answers with the plain message `"ok"`.
    struct EchoLlm;
1689
1690 struct UsageLlm {
1691 responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1692 }
1693
1694 #[async_trait]
1695 impl LlmClient for UsageLlm {
1696 async fn chat(
1697 &self,
1698 _messages: &[Message],
1699 _tools: &[ToolDef],
1700 ) -> motosan_agent_loop::Result<ChatOutput> {
1701 let next = match self.responses.lock() {
1702 Ok(mut responses) => responses.pop_front(),
1703 Err(poisoned) => poisoned.into_inner().pop_front(),
1704 };
1705 Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1706 }
1707 }
1708
1709 #[async_trait]
1710 impl LlmClient for EchoLlm {
1711 async fn chat(
1712 &self,
1713 _messages: &[Message],
1714 _tools: &[ToolDef],
1715 ) -> motosan_agent_loop::Result<ChatOutput> {
1716 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1717 }
1718 }
1719
1720 #[tokio::test]
1721 async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1722 use motosan_agent_loop::{LlmResponse, TokenUsage};
1723
1724 let dir = tempfile::tempdir().expect("tempdir");
1725 let mut cfg = Config::default();
1726 cfg.anthropic.api_key = Some("sk-unused".into());
1727 let llm = Arc::new(UsageLlm {
1728 responses: std::sync::Mutex::new(VecDeque::from([
1729 ChatOutput::with_usage(
1730 LlmResponse::Message("first".into()),
1731 TokenUsage {
1732 input_tokens: 100,
1733 output_tokens: 50,
1734 },
1735 ),
1736 ChatOutput::with_usage(
1737 LlmResponse::Message("second".into()),
1738 TokenUsage {
1739 input_tokens: 200,
1740 output_tokens: 80,
1741 },
1742 ),
1743 ])),
1744 });
1745 let app = AppBuilder::new()
1746 .with_config(cfg)
1747 .with_cwd(dir.path())
1748 .with_builtin_tools()
1749 .with_llm(llm)
1750 .build()
1751 .await
1752 .expect("build");
1753
1754 let events_1: Vec<UiEvent> = app
1755 .send_user_message(UserMessage::text("hi"))
1756 .collect()
1757 .await;
1758 let events_2: Vec<UiEvent> = app
1759 .send_user_message(UserMessage::text("again"))
1760 .collect()
1761 .await;
1762
1763 let ts1 = events_1
1764 .iter()
1765 .find_map(|e| match e {
1766 UiEvent::TurnStats {
1767 input_tokens,
1768 output_tokens,
1769 cumulative_input,
1770 cumulative_output,
1771 ..
1772 } => Some((
1773 *input_tokens,
1774 *output_tokens,
1775 *cumulative_input,
1776 *cumulative_output,
1777 )),
1778 _ => None,
1779 })
1780 .expect("turn 1 had no TurnStats");
1781 assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1782
1783 let ts2 = events_2
1784 .iter()
1785 .find_map(|e| match e {
1786 UiEvent::TurnStats {
1787 input_tokens,
1788 output_tokens,
1789 cumulative_input,
1790 cumulative_output,
1791 ..
1792 } => Some((
1793 *input_tokens,
1794 *output_tokens,
1795 *cumulative_input,
1796 *cumulative_output,
1797 )),
1798 _ => None,
1799 })
1800 .expect("turn 2 had no TurnStats");
1801 assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1802
1803 let positions: Vec<&str> = events_1
1804 .iter()
1805 .filter_map(|e| match e {
1806 UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1807 UiEvent::TurnStats { .. } => Some("stats"),
1808 UiEvent::AgentTurnComplete => Some("turn_complete"),
1809 _ => None,
1810 })
1811 .collect();
1812 assert_eq!(
1813 positions,
1814 vec!["msg_complete", "stats", "turn_complete"],
1815 "wrong ordering"
1816 );
1817 }
1818
1819 #[tokio::test]
1820 async fn turn_stats_reset_after_new_session() {
1821 use motosan_agent_loop::{LlmResponse, TokenUsage};
1822
1823 let dir = tempfile::tempdir().expect("tempdir");
1824 let mut cfg = Config::default();
1825 cfg.anthropic.api_key = Some("sk-unused".into());
1826 let llm = Arc::new(UsageLlm {
1827 responses: std::sync::Mutex::new(VecDeque::from([
1828 ChatOutput::with_usage(
1829 LlmResponse::Message("first".into()),
1830 TokenUsage {
1831 input_tokens: 100,
1832 output_tokens: 50,
1833 },
1834 ),
1835 ChatOutput::with_usage(
1836 LlmResponse::Message("after-new".into()),
1837 TokenUsage {
1838 input_tokens: 7,
1839 output_tokens: 3,
1840 },
1841 ),
1842 ])),
1843 });
1844 let app = AppBuilder::new()
1845 .with_config(cfg)
1846 .with_cwd(dir.path())
1847 .with_builtin_tools()
1848 .with_llm(llm)
1849 .build()
1850 .await
1851 .expect("build");
1852
1853 let _: Vec<UiEvent> = app
1854 .send_user_message(UserMessage::text("hi"))
1855 .collect()
1856 .await;
1857 app.new_session().await.expect("new session");
1858 let events: Vec<UiEvent> = app
1859 .send_user_message(UserMessage::text("after new"))
1860 .collect()
1861 .await;
1862 let stats = events
1863 .iter()
1864 .find_map(|event| match event {
1865 UiEvent::TurnStats {
1866 input_tokens,
1867 output_tokens,
1868 cumulative_input,
1869 cumulative_output,
1870 ..
1871 } => Some((
1872 *input_tokens,
1873 *output_tokens,
1874 *cumulative_input,
1875 *cumulative_output,
1876 )),
1877 _ => None,
1878 })
1879 .expect("turn had no TurnStats");
1880 assert_eq!(stats, (7, 3, 7, 3));
1881 }
1882
1883 #[tokio::test]
1884 async fn with_headless_permissions_builds_an_app() {
1885 let dir = tempfile::tempdir().expect("tempdir");
1886 let mut config = Config::default();
1887 config.anthropic.api_key = Some("sk-unused".into());
1888 let app = AppBuilder::new()
1889 .with_config(config)
1890 .with_cwd(dir.path())
1891 .with_builtin_tools()
1892 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1893 .with_headless_permissions()
1894 .build()
1895 .await
1896 .expect("build");
1897 assert!(!app.session_id().is_empty());
1899 }
1900
1901 #[tokio::test]
1902 async fn new_session_swaps_in_a_fresh_empty_session() {
1903 use futures::StreamExt;
1904 let dir = tempfile::tempdir().expect("tempdir");
1905 let mut config = Config::default();
1906 config.anthropic.api_key = Some("sk-unused".into());
1907 let app = AppBuilder::new()
1908 .with_config(config)
1909 .with_cwd(dir.path())
1910 .with_builtin_tools()
1911 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1912 .build()
1913 .await
1914 .expect("build");
1915
1916 let _: Vec<_> = app
1917 .send_user_message(UserMessage::text("hello"))
1918 .collect()
1919 .await;
1920 let id_before = app.session_id();
1921 assert!(!app.session_history().await.expect("history").is_empty());
1922
1923 app.new_session().await.expect("new_session");
1924
1925 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1926 assert!(
1927 app.session_history().await.expect("history").is_empty(),
1928 "fresh session has no history"
1929 );
1930 }
1931
1932 #[tokio::test]
1933 async fn load_session_restores_a_stored_session_by_id() {
1934 use futures::StreamExt;
1935 let dir = tempfile::tempdir().expect("tempdir");
1936 let store_dir = dir.path().join("sessions");
1937 let store: Arc<dyn SessionStore> =
1938 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1939 let mut config = Config::default();
1940 config.anthropic.api_key = Some("sk-unused".into());
1941 let app = AppBuilder::new()
1942 .with_config(config)
1943 .with_cwd(dir.path())
1944 .with_builtin_tools()
1945 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1946 .with_session_store(Arc::clone(&store))
1947 .build()
1948 .await
1949 .expect("build");
1950
1951 let _: Vec<_> = app
1952 .send_user_message(UserMessage::text("remember this"))
1953 .collect()
1954 .await;
1955 let original_id = app.session_id();
1956
1957 app.new_session().await.expect("new_session");
1958 assert_ne!(app.session_id(), original_id);
1959
1960 app.load_session(&original_id).await.expect("load_session");
1961 assert_eq!(app.session_id(), original_id);
1962 let history = app.session_history().await.expect("history");
1963 assert!(
1964 history.iter().any(|m| m.text().contains("remember this")),
1965 "loaded session should carry the original turn"
1966 );
1967 }
1968
1969 #[tokio::test]
1970 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1971 use futures::StreamExt;
1972 let dir = tempfile::tempdir().expect("tempdir");
1973 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1974 dir.path().join("s"),
1975 ));
1976 let mut config = Config::default();
1977 config.anthropic.api_key = Some("sk-unused".into());
1978 let app = AppBuilder::new()
1979 .with_config(config)
1980 .with_cwd(dir.path())
1981 .with_builtin_tools()
1982 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1983 .with_session_store(store)
1984 .build()
1985 .await
1986 .expect("build");
1987
1988 let _: Vec<_> = app
1989 .send_user_message(UserMessage::text("hello"))
1990 .collect()
1991 .await;
1992 let original_id = app.session_id();
1993
1994 let new_id = app.clone_session().await.expect("clone_session");
1995
1996 assert_ne!(new_id, original_id);
1998 assert_eq!(app.session_id(), new_id);
1999 let history = app.session_history().await.expect("history");
2001 assert!(history.iter().any(|m| m.text().contains("hello")));
2002 }
2003
2004 #[tokio::test]
2005 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
2006 use futures::StreamExt;
2007 let dir = tempfile::tempdir().expect("tempdir");
2008 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2009 dir.path().join("s"),
2010 ));
2011 let mut config = Config::default();
2012 config.anthropic.api_key = Some("sk-unused".into());
2013 let app = AppBuilder::new()
2014 .with_config(config)
2015 .with_cwd(dir.path())
2016 .with_builtin_tools()
2017 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2018 .with_session_store(store)
2019 .build()
2020 .await
2021 .expect("build");
2022
2023 let _: Vec<_> = app
2025 .send_user_message(UserMessage::text("first"))
2026 .collect()
2027 .await;
2028 let _: Vec<_> = app
2029 .send_user_message(UserMessage::text("second"))
2030 .collect()
2031 .await;
2032
2033 let entries = app.session.load_full().entries().await.expect("entries");
2035 let first_id = entries
2036 .iter()
2037 .find_map(|stored| {
2038 let msg = stored.entry.as_message()?;
2039 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
2040 .then(|| stored.id.clone())
2041 })
2042 .expect("first user message present");
2043
2044 let _: Vec<_> = app
2046 .fork_from(first_id, UserMessage::text("branched"))
2047 .collect()
2048 .await;
2049
2050 let history = app.session_history().await.expect("history");
2051 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
2052 assert!(
2053 texts.iter().any(|t| t.contains("first")),
2054 "fork keeps the fork-point ancestor"
2055 );
2056 assert!(
2057 texts.iter().any(|t| t.contains("branched")),
2058 "fork includes the new message"
2059 );
2060 assert!(
2061 !texts.iter().any(|t| t.contains("second")),
2062 "fork excludes the abandoned branch"
2063 );
2064 }
2065
2066 #[tokio::test]
2067 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
2068 use futures::StreamExt;
2069 let dir = tempfile::tempdir().expect("tempdir");
2070 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2071 dir.path().join("s"),
2072 ));
2073 let mut config = Config::default();
2074 config.anthropic.api_key = Some("sk-unused".into());
2075 let app = AppBuilder::new()
2076 .with_config(config)
2077 .with_cwd(dir.path())
2078 .with_builtin_tools()
2079 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2080 .with_session_store(store)
2081 .build()
2082 .await
2083 .expect("build");
2084
2085 let _: Vec<_> = app
2086 .send_user_message(UserMessage::text("alpha"))
2087 .collect()
2088 .await;
2089 let _: Vec<_> = app
2090 .send_user_message(UserMessage::text("bravo"))
2091 .collect()
2092 .await;
2093
2094 let candidates = app.fork_candidates().await.expect("candidates");
2095 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
2096 assert!(previews[0].contains("bravo"), "got {previews:?}");
2098 assert!(previews.iter().any(|p| p.contains("alpha")));
2099 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
2101 }
2102
2103 #[tokio::test]
2104 async fn branches_returns_a_tree_for_a_linear_session() {
2105 use futures::StreamExt;
2106 let dir = tempfile::tempdir().expect("tempdir");
2107 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2108 dir.path().join("s"),
2109 ));
2110 let mut config = Config::default();
2111 config.anthropic.api_key = Some("sk-unused".into());
2112 let app = AppBuilder::new()
2113 .with_config(config)
2114 .with_cwd(dir.path())
2115 .with_builtin_tools()
2116 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2117 .with_session_store(store)
2118 .build()
2119 .await
2120 .expect("build");
2121
2122 let _: Vec<_> = app
2123 .send_user_message(UserMessage::text("hello"))
2124 .collect()
2125 .await;
2126 let tree = app.branches().await.expect("branches");
2127 assert!(!tree.nodes.is_empty());
2129 assert!(tree.active_leaf.is_some());
2130 }
2131
2132 #[tokio::test]
2133 async fn reload_settings_rebuilds_session_and_resets_token_tally() {
2134 let dir = tempfile::tempdir().expect("tempdir");
2135 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2136 dir.path().join("s"),
2137 ));
2138 let mut cfg = Config::default();
2139 cfg.anthropic.api_key = Some("sk-unused".into());
2140
2141 let llm = Arc::new(UsageLlm {
2142 responses: std::sync::Mutex::new(VecDeque::from([
2143 ChatOutput::with_usage(
2144 LlmResponse::Message("first".into()),
2145 motosan_agent_loop::TokenUsage {
2146 input_tokens: 100,
2147 output_tokens: 50,
2148 },
2149 ),
2150 ChatOutput::with_usage(
2151 LlmResponse::Message("after-reload".into()),
2152 motosan_agent_loop::TokenUsage {
2153 input_tokens: 5,
2154 output_tokens: 2,
2155 },
2156 ),
2157 ])),
2158 });
2159 let app = AppBuilder::new()
2160 .with_config(cfg)
2161 .with_cwd(dir.path())
2162 .with_builtin_tools()
2163 .with_llm(llm)
2164 .with_session_store(store)
2165 .build()
2166 .await
2167 .expect("build");
2168
2169 let _: Vec<UiEvent> = app
2170 .send_user_message(crate::user_message::UserMessage::text("hi"))
2171 .collect()
2172 .await;
2173 {
2174 let token_tally = app.token_tally();
2175 let tally = token_tally.lock().await;
2176 assert_eq!(tally.cumulative_input, 100);
2177 assert_eq!(tally.cumulative_output, 50);
2178 assert_eq!(tally.turn_count, 1);
2179 }
2180
2181 let mut new_settings = crate::settings::Settings::default();
2182 new_settings.model.name = "claude-opus-4-7".into();
2183 new_settings.ui.footer_show_cost = false;
2184 app.reload_settings(new_settings).await.expect("reload");
2185 assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
2186 assert!(!app.factory.settings().ui.footer_show_cost);
2187 assert_eq!(app.factory.current_model(), None);
2188
2189 {
2190 let token_tally = app.token_tally();
2191 let tally = token_tally.lock().await;
2192 assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
2193 assert_eq!(tally.cumulative_output, 0);
2194 assert_eq!(tally.turn_count, 0);
2195 }
2196
2197 let _: Vec<UiEvent> = app
2198 .send_user_message(crate::user_message::UserMessage::text("after"))
2199 .collect()
2200 .await;
2201 {
2202 let token_tally = app.token_tally();
2203 let tally = token_tally.lock().await;
2204 assert_eq!(tally.cumulative_input, 5);
2205 assert_eq!(tally.cumulative_output, 2);
2206 assert_eq!(tally.turn_count, 1);
2207 }
2208 }
2209
2210 #[tokio::test]
2211 async fn switch_model_preserves_history() {
2212 use futures::StreamExt;
2213 let dir = tempfile::tempdir().expect("tempdir");
2214 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2215 dir.path().join("s"),
2216 ));
2217 let mut config = Config::default();
2218 config.anthropic.api_key = Some("sk-unused".into());
2219 let app = AppBuilder::new()
2220 .with_config(config)
2221 .with_cwd(dir.path())
2222 .with_builtin_tools()
2223 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2224 .with_session_store(store)
2225 .build()
2226 .await
2227 .expect("build");
2228
2229 let _: Vec<_> = app
2230 .send_user_message(UserMessage::text("keep me"))
2231 .collect()
2232 .await;
2233 let id_before = app.session_id();
2234
2235 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
2236 .await
2237 .expect("switch_model");
2238
2239 assert_eq!(
2240 app.session_id(),
2241 id_before,
2242 "switch_model keeps the same session"
2243 );
2244 let history = app.session_history().await.expect("history");
2245 assert!(history.iter().any(|m| m.text().contains("keep me")));
2246 }
2247
2248 #[tokio::test]
2249 async fn switch_model_is_sticky_for_future_session_rebuilds() {
2250 let dir = tempfile::tempdir().expect("tempdir");
2251 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2252 dir.path().join("s"),
2253 ));
2254 let mut config = Config::default();
2255 config.anthropic.api_key = Some("sk-unused".into());
2256 let app = AppBuilder::new()
2257 .with_config(config)
2258 .with_cwd(dir.path())
2259 .with_builtin_tools()
2260 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2261 .with_session_store(store)
2262 .build()
2263 .await
2264 .expect("build");
2265
2266 let selected = crate::model::ModelId::from("claude-opus-4-7");
2267 app.switch_model(&selected).await.expect("switch_model");
2268 app.new_session().await.expect("new_session");
2269
2270 assert_eq!(app.factory.current_model(), Some(selected.clone()));
2271 assert_eq!(app.settings().model.name, selected.to_string());
2272 }
2273
2274 struct SleepThenDoneLlm {
2275 turn: AtomicUsize,
2276 }
2277
2278 #[async_trait]
2279 impl LlmClient for SleepThenDoneLlm {
2280 async fn chat(
2281 &self,
2282 _messages: &[Message],
2283 _tools: &[ToolDef],
2284 ) -> motosan_agent_loop::Result<ChatOutput> {
2285 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2286 if turn == 0 {
2287 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2288 ToolCallItem {
2289 id: "sleep".into(),
2290 name: "bash".into(),
2291 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2292 },
2293 ])))
2294 } else {
2295 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2296 }
2297 }
2298 }
2299
2300 #[tokio::test]
2301 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2302 use futures::StreamExt;
2303 let dir = tempfile::tempdir().expect("tempdir");
2304 let mut config = Config::default();
2305 config.anthropic.api_key = Some("sk-unused".into());
2306 let app = Arc::new(
2307 AppBuilder::new()
2308 .with_config(config)
2309 .with_cwd(dir.path())
2310 .with_builtin_tools()
2311 .with_llm(Arc::new(SleepThenDoneLlm {
2312 turn: AtomicUsize::new(0),
2313 }) as Arc<dyn LlmClient>)
2314 .build()
2315 .await
2316 .expect("build"),
2317 );
2318
2319 app.new_session().await.expect("new_session");
2320 let running_app = Arc::clone(&app);
2321 let handle = tokio::spawn(async move {
2322 running_app
2323 .send_user_message(UserMessage::text("run a slow command"))
2324 .collect::<Vec<_>>()
2325 .await
2326 });
2327 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2328 app.cancel();
2329
2330 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2331 .await
2332 .expect("turn should finish after cancellation")
2333 .expect("join");
2334 assert!(
2335 events.iter().any(|event| {
2336 matches!(
2337 event,
2338 UiEvent::ToolCallCompleted { result, .. }
2339 if result.text.contains("command cancelled by user")
2340 )
2341 }),
2342 "cancel should reach the rebuilt bash tool: {events:?}"
2343 );
2344 }
2345
2346 #[tokio::test]
2347 async fn compact_summarizes_a_session_with_enough_history() {
2348 struct DoneLlm;
2349 #[async_trait]
2350 impl LlmClient for DoneLlm {
2351 async fn chat(
2352 &self,
2353 _messages: &[Message],
2354 _tools: &[ToolDef],
2355 ) -> motosan_agent_loop::Result<ChatOutput> {
2356 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2357 }
2358 }
2359
2360 let dir = tempfile::tempdir().expect("tempdir");
2361 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2362 dir.path().join("sessions"),
2363 ));
2364 let mut config = Config::default();
2365 config.anthropic.api_key = Some("sk-unused".into());
2366 let app = AppBuilder::new()
2367 .with_config(config)
2368 .with_cwd(dir.path())
2369 .with_builtin_tools()
2370 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2371 .with_session_store(store)
2372 .build()
2373 .await
2374 .expect("build");
2375
2376 for i in 0..4 {
2383 let _: Vec<_> = app
2384 .send_user_message(UserMessage::text(format!("turn {i}")))
2385 .collect()
2386 .await;
2387 }
2388
2389 app.compact().await.expect("compact should succeed");
2390
2391 let history = app.session_history().await.expect("history");
2394 assert!(
2395 !history.is_empty(),
2396 "session should still have content post-compaction"
2397 );
2398 }
2399
/// When both an auth entry and the `ANTHROPIC_API_KEY` environment lookup
/// provide a key, the environment value wins and is returned trimmed.
#[test]
fn anthropic_env_api_key_overrides_auth_json_key() {
    let stored = crate::auth::ProviderAuth::ApiKey {
        key: "sk-auth".into(),
    };
    let mut auth = crate::auth::Auth::default();
    auth.0.insert("anthropic".into(), stored);

    // Fake environment: only ANTHROPIC_API_KEY is set, padded with spaces.
    let fake_env = |name| {
        if name == "ANTHROPIC_API_KEY" {
            Some(" sk-env ".to_string())
        } else {
            None
        }
    };

    let resolved = anthropic_api_key_from(&auth, fake_env);
    assert_eq!(resolved.as_deref(), Some("sk-env"));
}
2415
2416 #[tokio::test]
2417 async fn with_settings_overrides_deprecated_config_model() {
2418 use crate::settings::Settings;
2419
2420 let mut config = Config::default();
2421 config.model.name = "from-config".into();
2422 config.anthropic.api_key = Some("sk-config".into());
2423
2424 let mut settings = Settings::default();
2425 settings.model.name = "from-settings".into();
2426
2427 let tmp = tempfile::tempdir().unwrap();
2428 let app = AppBuilder::new()
2429 .with_config(config)
2430 .with_settings(settings)
2431 .with_cwd(tmp.path())
2432 .disable_context_discovery()
2433 .with_llm(Arc::new(EchoLlm))
2434 .build()
2435 .await
2436 .expect("build");
2437 assert_eq!(app.config().model.name, "from-settings");
2438 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2439 }
2440
2441 #[tokio::test]
2442 async fn with_settings_synthesises_legacy_config_for_build() {
2443 use crate::auth::{Auth, ProviderAuth};
2444 use crate::settings::Settings;
2445
2446 let mut settings = Settings::default();
2447 settings.model.name = "claude-sonnet-4-6".into();
2448
2449 let mut auth = Auth::default();
2450 auth.0.insert(
2451 "anthropic".into(),
2452 ProviderAuth::ApiKey {
2453 key: "sk-test".into(),
2454 },
2455 );
2456
2457 let tmp = tempfile::tempdir().unwrap();
2458 let app = AppBuilder::new()
2459 .with_settings(settings)
2460 .with_auth(auth)
2461 .with_cwd(tmp.path())
2462 .with_builtin_tools()
2463 .disable_context_discovery()
2464 .with_llm(Arc::new(EchoLlm))
2465 .build()
2466 .await
2467 .expect("build");
2468 let _ = app;
2469 }
2470
2471 #[tokio::test]
2472 async fn cancel_before_turn_does_not_poison_future_turns() {
2473 let dir = tempfile::tempdir().unwrap();
2474 let mut cfg = Config::default();
2475 cfg.anthropic.api_key = Some("sk-unused".into());
2476 let app = AppBuilder::new()
2477 .with_config(cfg)
2478 .with_cwd(dir.path())
2479 .with_builtin_tools()
2480 .with_llm(std::sync::Arc::new(EchoLlm))
2481 .build()
2482 .await
2483 .expect("build");
2484
2485 app.cancel();
2486 let events: Vec<UiEvent> = app
2487 .send_user_message(UserMessage::text("x"))
2488 .collect()
2489 .await;
2490
2491 assert!(
2492 events
2493 .iter()
2494 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2495 "turn should use a fresh cancellation token: {events:?}"
2496 );
2497 }
2498
/// Two different tools started back to back get ids `tool_1` and `tool_2`,
/// and each completion event carries the id of the start with the same name.
#[test]
fn map_event_matches_started_and_completed_ids_by_tool_name() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    // Small factories so the event sequence below reads as a script.
    let start = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted { name: tool.into() }),
            &tracker,
        )
    };
    let complete = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolCompleted {
                name: tool.into(),
                result: motosan_agent_tool::ToolResult::text("ok"),
            }),
            &tracker,
        )
    };

    let bash_started = start("bash");
    let read_started = start("read");
    let bash_completed = complete("bash");
    let read_completed = complete("read");

    assert!(matches!(
        &bash_started,
        Some(UiEvent::ToolCallStarted { id, name, .. }) if id == "tool_1" && name == "bash"
    ));
    assert!(matches!(
        &read_started,
        Some(UiEvent::ToolCallStarted { id, name, .. }) if id == "tool_2" && name == "read"
    ));
    assert!(matches!(
        &bash_completed,
        Some(UiEvent::ToolCallCompleted { id, .. }) if id == "tool_1"
    ));
    assert!(matches!(
        &read_completed,
        Some(UiEvent::ToolCallCompleted { id, .. }) if id == "tool_2"
    ));
}
2547
/// Two overlapping invocations of the same tool must receive distinct ids,
/// and the completions must pair up with the starts in order
/// (first start with first completion, second with second).
#[test]
fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    let start_bash = || {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted {
                name: "bash".into(),
            }),
            &tracker,
        )
    };
    let complete_bash = |output: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolCompleted {
                name: "bash".into(),
                result: motosan_agent_tool::ToolResult::text(output),
            }),
            &tracker,
        )
    };

    // Feed all four events first, then inspect what came back.
    let first_start = start_bash();
    let second_start = start_bash();
    let first_done = complete_bash("a");
    let second_done = complete_bash("b");

    // Anything other than the expected variant is a test failure.
    let started_id = |event: Option<UiEvent>| match event {
        Some(UiEvent::ToolCallStarted { id, .. }) => id,
        other => panic!("{other:?}"),
    };
    let completed_id = |event: Option<UiEvent>| match event {
        Some(UiEvent::ToolCallCompleted { id, .. }) => id,
        other => panic!("{other:?}"),
    };

    let id_s1 = started_id(first_start);
    let id_s2 = started_id(second_start);
    let id_c1 = completed_id(first_done);
    let id_c2 = completed_id(second_done);

    assert_eq!(id_s1, id_c1);
    assert_eq!(id_s2, id_c2);
    assert_ne!(id_s1, id_s2);
}
2599
2600 #[tokio::test]
2601 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2602 let dir = tempfile::tempdir().unwrap();
2603 let mut cfg = Config::default();
2604 cfg.anthropic.api_key = Some("sk-unused".into());
2605 let app = AppBuilder::new()
2606 .with_config(cfg)
2607 .with_cwd(dir.path())
2608 .with_builtin_tools()
2609 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2610 turn: AtomicUsize::new(0),
2611 }))
2612 .build()
2613 .await
2614 .expect("build");
2615
2616 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2617 let first_event = first.next().await;
2618 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2619
2620 let second_events: Vec<UiEvent> = app
2621 .send_user_message(UserMessage::text("second"))
2622 .collect()
2623 .await;
2624 assert_eq!(
2625 second_events.len(),
2626 1,
2627 "expected immediate single error event, got: {second_events:?}"
2628 );
2629 assert!(matches!(
2630 &second_events[0],
2631 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2632 ));
2633 }
2634
2635 #[tokio::test]
2636 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2637 let dir = tempfile::tempdir().unwrap();
2643 let mut cfg = Config::default();
2644 cfg.anthropic.api_key = Some("sk-unused".into());
2645 let app = AppBuilder::new()
2646 .with_config(cfg)
2647 .with_cwd(dir.path())
2648 .with_builtin_tools()
2649 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2650 turn: AtomicUsize::new(0),
2651 }))
2652 .build()
2653 .await
2654 .expect("build");
2655
2656 let mut first =
2658 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2659 let first_event = first.next().await;
2660 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2661
2662 let bad = crate::user_message::UserMessage {
2664 text: "second".into(),
2665 attachments: vec![crate::user_message::Attachment::Image {
2666 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2667 }],
2668 };
2669 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2670
2671 assert_eq!(
2672 second_events.len(),
2673 1,
2674 "expected exactly one event (the attachment error); got: {second_events:?}"
2675 );
2676 assert!(
2677 matches!(
2678 &second_events[0],
2679 UiEvent::AttachmentError {
2680 kind: crate::user_message::AttachmentErrorKind::NotFound,
2681 ..
2682 }
2683 ),
2684 "expected AttachmentError::NotFound as first event; got {second_events:?}"
2685 );
2686 }
2687
2688 struct InfiniteToolLlm;
2689
2690 #[async_trait]
2691 impl LlmClient for InfiniteToolLlm {
2692 async fn chat(
2693 &self,
2694 _messages: &[Message],
2695 _tools: &[ToolDef],
2696 ) -> motosan_agent_loop::Result<ChatOutput> {
2697 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2700 ToolCallItem {
2701 id: "loop".into(),
2702 name: "read".into(),
2703 args: serde_json::json!({"path": "nope.txt"}),
2704 },
2705 ])))
2706 }
2707 }
2708
2709 #[tokio::test]
2710 async fn max_iterations_surfaces_as_notice_with_lifecycle_complete() {
2711 let dir = tempfile::tempdir().unwrap();
2712 let mut cfg = Config::default();
2713 cfg.anthropic.api_key = Some("sk-unused".into());
2714 let app = AppBuilder::new()
2715 .with_config(cfg)
2716 .with_cwd(dir.path())
2717 .with_builtin_tools()
2718 .with_llm(std::sync::Arc::new(InfiniteToolLlm))
2719 .with_max_iterations(3)
2720 .build()
2721 .await
2722 .expect("build");
2723
2724 let events: Vec<UiEvent> =
2725 futures::StreamExt::collect(app.send_user_message(UserMessage::text("loop"))).await;
2726
2727 assert!(
2728 !events.iter().any(|e| matches!(e, UiEvent::Error(_))),
2729 "MaxIterations should surface as Notice, not Error; got: {events:?}"
2730 );
2731
2732 let notice = events.iter().find_map(|e| match e {
2733 UiEvent::Notice { title, body } => Some((title.clone(), body.clone())),
2734 _ => None,
2735 });
2736 let (title, body) =
2737 notice.unwrap_or_else(|| panic!("expected Notice event; got: {events:?}"));
2738 let title_lower = title.to_lowercase();
2739 assert!(
2740 title_lower.contains("stop") || title_lower.contains("iteration"),
2741 "notice title should mention stop/iteration; got title={title:?} body={body:?}"
2742 );
2743 assert!(
2744 body.contains("3"),
2745 "body should reference the per-turn cap (3); got body={body:?}"
2746 );
2747 let body_lower = body.to_lowercase();
2748 assert!(
2749 body_lower.contains("continue") || body_lower.contains("send another"),
2750 "body should hint that user can continue; got body={body:?}"
2751 );
2752
2753 assert!(
2754 events
2755 .iter()
2756 .any(|e| matches!(e, UiEvent::AgentTurnComplete)),
2757 "AgentTurnComplete should fire on the soft-cap path; got: {events:?}"
2758 );
2759 }
2760
/// `progress_event_id` yields "tool_unknown" with zero or two pending tools
/// and the pending tool's own id when exactly one tool has been started.
#[test]
fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
    let start = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted { name: tool.into() }),
            &tracker,
        )
    };

    // Nothing pending yet.
    assert_eq!(progress_event_id(&tracker), "tool_unknown");

    // Exactly one pending tool: progress is attributed to it.
    let only_id = match start("bash") {
        Some(UiEvent::ToolCallStarted { id, .. }) => id,
        other => panic!("{other:?}"),
    };
    assert_eq!(progress_event_id(&tracker), only_id);

    // A second pending tool makes the attribution ambiguous again.
    let _second = start("read");
    assert_eq!(progress_event_id(&tracker), "tool_unknown");
}
2786
2787 #[tokio::test]
2788 async fn builder_rejects_builtin_and_custom_tools_together() {
2789 let mut cfg = Config::default();
2790 cfg.anthropic.api_key = Some("sk-unused".into());
2791 let dir = tempfile::tempdir().unwrap();
2792 let err = match AppBuilder::new()
2793 .with_config(cfg)
2794 .with_cwd(dir.path())
2795 .with_builtin_tools()
2796 .with_custom_tools_factory(|_| Vec::new())
2797 .build()
2798 .await
2799 {
2800 Ok(_) => panic!("must reject conflicting tool configuration"),
2801 Err(err) => err,
2802 };
2803
2804 assert!(format!("{err}").contains("mutually exclusive"));
2805 }
2806
2807 #[tokio::test]
2809 async fn two_turns_in_same_session_share_history() {
2810 #[derive(Default)]
2811 struct CounterLlm {
2812 turn: AtomicUsize,
2813 }
2814 #[async_trait]
2815 impl LlmClient for CounterLlm {
2816 async fn chat(
2817 &self,
2818 messages: &[Message],
2819 _tools: &[ToolDef],
2820 ) -> motosan_agent_loop::Result<ChatOutput> {
2821 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2822 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2823 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2824 }
2825 }
2826
2827 let tmp = tempfile::tempdir().unwrap();
2828 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2829 tmp.path().to_path_buf(),
2830 ));
2831
2832 let app = AppBuilder::new()
2833 .with_settings(crate::settings::Settings::default())
2834 .with_auth(crate::auth::Auth::default())
2835 .with_cwd(tmp.path())
2836 .with_builtin_tools()
2837 .disable_context_discovery()
2838 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2839 .with_session_store(store)
2840 .build_with_session(None)
2841 .await
2842 .expect("build");
2843
2844 let _events1: Vec<UiEvent> = app
2845 .send_user_message(UserMessage::text("hi"))
2846 .collect()
2847 .await;
2848 let events2: Vec<UiEvent> = app
2849 .send_user_message(UserMessage::text("again"))
2850 .collect()
2851 .await;
2852
2853 let saw_more_than_one = events2.iter().any(|e| {
2855 matches!(
2856 e,
2857 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2858 )
2859 });
2860 assert!(
2861 saw_more_than_one,
2862 "second turn should have seen history; events: {events2:?}"
2863 );
2864 }
2865}
2866
/// Tests for the skill-related `AppBuilder` setters.
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// Builds a throwaway global skill named "x" for the tests below.
    fn sample_skill() -> Skill {
        let file_path = PathBuf::from("/x.md");
        let base_dir = PathBuf::from("/");
        Skill {
            name: "x".into(),
            description: "d".into(),
            file_path,
            base_dir,
            disable_model_invocation: false,
            source: SkillSource::Global,
        }
    }

    /// `with_skills` keeps the supplied skills on the builder.
    #[test]
    fn with_skills_stores_skills() {
        let skills = vec![sample_skill()];
        let builder = AppBuilder::new().with_skills(skills);
        assert_eq!(builder.skills.len(), 1);
        assert_eq!(builder.skills[0].name, "x");
    }

    /// `without_skills` removes skills that were added earlier.
    #[test]
    fn without_skills_clears() {
        let with_one = AppBuilder::new().with_skills(vec![sample_skill()]);
        let cleared = with_one.without_skills();
        assert!(cleared.skills.is_empty());
    }
}
2899
/// Tests for registering extra tools on `AppBuilder`.
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::Tool;

    /// Minimal `Tool` implementation: named "fake__echo", always returns the
    /// text result "ok".
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> motosan_agent_tool::ToolDef {
            let input_schema = serde_json::json!({"type": "object"});
            motosan_agent_tool::ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &motosan_agent_tool::ToolContext,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
        > {
            let reply = async move { motosan_agent_tool::ToolResult::text("ok") };
            Box::pin(reply)
        }
    }

    /// `with_extra_tools` keeps the supplied tools on the builder.
    #[test]
    fn with_extra_tools_stores_tools() {
        let fake: Arc<dyn Tool> = Arc::new(FakeTool);
        let builder = AppBuilder::new().with_extra_tools(vec![fake]);
        assert_eq!(builder.extra_tools.len(), 1);
    }
}