1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate, PromptStrategy};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
/// Running token totals for a session, accumulated one usage report at a time.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TurnStatsAccum {
    /// Sum of the input tokens of every usage folded in so far.
    pub cumulative_input: u64,
    /// Sum of the output tokens of every usage folded in so far.
    pub cumulative_output: u64,
    /// How many usages have been folded in.
    pub turn_count: u64,
}
32
33impl TurnStatsAccum {
34 pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38 self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39 self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40 self.turn_count = self.turn_count.saturating_add(1);
41 }
42}
43
44pub(crate) fn extract_thinking_events(messages: &[motosan_agent_loop::Message]) -> Vec<UiEvent> {
52 let mut events = Vec::new();
53 for msg in messages {
54 if let motosan_agent_loop::Message::Assistant { content, .. } = msg {
55 for part in content {
56 if let motosan_agent_loop::AssistantContent::Reasoning { text, .. } = part {
57 events.push(UiEvent::ThinkingComplete { text: text.clone() });
58 }
59 }
60 }
61 }
62 events
63}
64
65pub(crate) fn extract_new_thinking_events(
70 messages: &[motosan_agent_loop::Message],
71 previous_len: usize,
72) -> Vec<UiEvent> {
73 extract_thinking_events(messages.get(previous_len..).unwrap_or(&[]))
74}
75
/// How `SessionFactory::build` should obtain its session.
#[derive(Debug, Clone)]
pub(crate) enum SessionMode {
    /// Start a brand-new session.
    New,
    /// Resume the stored session with the given id.
    Resume(String),
}
84
85struct SharedLlm {
90 client: Arc<dyn LlmClient>,
91}
92
93impl SharedLlm {
94 fn new(client: Arc<dyn LlmClient>) -> Self {
95 Self { client }
96 }
97
98 fn client(&self) -> Arc<dyn LlmClient> {
99 Arc::clone(&self.client)
100 }
101}
102
103pub(crate) struct SessionFactory {
104 cwd: PathBuf,
105 settings: Arc<Mutex<crate::settings::Settings>>,
106 auth: crate::auth::Auth,
107 policy: Arc<crate::permissions::Policy>,
108 session_cache: Arc<crate::permissions::SessionCache>,
109 ui_tx: Option<mpsc::Sender<UiEvent>>,
110 headless_permissions: bool,
111 permission_gate: Arc<dyn PermissionGate>,
112 permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
113 progress_tx: mpsc::Sender<ToolProgressChunk>,
116 skills: Arc<Vec<crate::skills::Skill>>,
117 install_builtin_tools: bool,
118 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
119 max_iterations: usize,
120 context_discovery_disabled: bool,
121 autocompact_enabled: bool,
122 session_store: Option<Arc<dyn SessionStore>>,
123 llm_override: Option<Arc<dyn LlmClient>>,
124 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
125 cancel_token: SharedCancelToken,
126}
127
128impl SessionFactory {
129 fn settings(&self) -> crate::settings::Settings {
130 match self.settings.lock() {
131 Ok(guard) => guard.clone(),
132 Err(poisoned) => poisoned.into_inner().clone(),
133 }
134 }
135
136 fn store_settings(&self, settings: crate::settings::Settings) {
137 match self.settings.lock() {
138 Ok(mut guard) => *guard = settings,
139 Err(poisoned) => *poisoned.into_inner() = settings,
140 }
141 }
142
143 fn current_model(&self) -> Option<crate::model::ModelId> {
144 match self.current_model.lock() {
145 Ok(guard) => guard.clone(),
146 Err(poisoned) => poisoned.into_inner().clone(),
147 }
148 }
149
150 fn set_current_model(&self, model: crate::model::ModelId) {
151 match self.current_model.lock() {
152 Ok(mut guard) => *guard = Some(model),
153 Err(poisoned) => *poisoned.into_inner() = Some(model),
154 }
155 }
156
157 fn clear_current_model(&self) {
158 match self.current_model.lock() {
159 Ok(mut guard) => *guard = None,
160 Err(poisoned) => *poisoned.into_inner() = None,
161 }
162 }
163
164 async fn build(
171 &self,
172 mode: SessionMode,
173 model_override: Option<&crate::model::ModelId>,
174 settings_override: Option<&crate::settings::Settings>,
175 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
176 let effective_model = model_override.cloned().or_else(|| {
180 if settings_override.is_some() {
181 None
182 } else {
183 self.current_model()
184 }
185 });
186 let mut settings = settings_override
187 .cloned()
188 .unwrap_or_else(|| self.settings());
189 if let Some(m) = &effective_model {
190 settings.model.name = m.as_str().to_string();
191 }
192
193 let llm = if effective_model.is_none() {
194 self.llm_override.as_ref().map_or_else(
195 || build_llm_client(&settings, &self.auth),
196 |llm| Ok(Arc::clone(llm)),
197 )?
198 } else {
199 build_llm_client(&settings, &self.auth)?
200 };
201
202 let tool_ctx = ToolCtx::new_with_cancel_token(
205 &self.cwd,
206 Arc::clone(&self.permission_gate),
207 self.progress_tx.clone(),
208 self.cancel_token.clone(),
209 );
210 let mut tools = if self.install_builtin_tools {
211 builtin_tools(tool_ctx.clone())
212 } else {
213 Vec::new()
214 };
215 tools.extend(self.extra_tools.iter().cloned());
216
217 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
218 let base_prompt = build_system_prompt(&tool_names, &self.skills);
219 let system_prompt = if self.context_discovery_disabled {
220 base_prompt
221 } else {
222 let agent_dir = crate::paths::agent_dir();
223 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
224 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
225 };
226 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
227
228 let mut engine_builder = Engine::builder()
229 .max_iterations(self.max_iterations)
230 .system_prompt(system_prompt)
231 .tool_context(motosan_tool_context);
232 for tool in tools {
233 engine_builder = engine_builder.tool(tool);
234 }
235 if let Some(ui_tx) = &self.ui_tx {
236 let ext = if let Some(handle) = &self.permission_strategy_handle {
237 crate::permissions::PermissionExtension::with_strategy_handle(
238 Arc::clone(&self.policy),
239 Arc::clone(&self.session_cache),
240 self.cwd.clone(),
241 Some(ui_tx.clone()),
242 Arc::clone(handle),
243 )
244 } else {
245 crate::permissions::PermissionExtension::new(
246 Arc::clone(&self.policy),
247 Arc::clone(&self.session_cache),
248 self.cwd.clone(),
249 ui_tx.clone(),
250 )
251 };
252 engine_builder = engine_builder.extension(Box::new(ext));
253 } else if self.headless_permissions {
254 let ext = if let Some(handle) = &self.permission_strategy_handle {
255 crate::permissions::PermissionExtension::with_strategy_handle(
256 Arc::clone(&self.policy),
257 Arc::clone(&self.session_cache),
258 self.cwd.clone(),
259 None,
260 Arc::clone(handle),
261 )
262 } else {
263 crate::permissions::PermissionExtension::headless(
264 Arc::clone(&self.policy),
265 Arc::clone(&self.session_cache),
266 self.cwd.clone(),
267 )
268 };
269 engine_builder = engine_builder.extension(Box::new(ext));
270 }
271 if self.autocompact_enabled
272 && settings.session.compact_at_context_pct > 0.0
273 && settings.session.compact_at_context_pct < 1.0
274 {
275 let cfg = AutocompactConfig {
276 threshold: settings.session.compact_at_context_pct,
277 max_context_tokens: settings.session.max_context_tokens,
278 keep_turns: settings.session.keep_turns.max(1),
279 };
280 engine_builder = engine_builder
281 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
282 }
283 let engine = engine_builder.build();
284
285 let session = match (&mode, &self.session_store) {
286 (SessionMode::Resume(id), Some(store)) => {
287 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
288 .await
289 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
290 let entries = s
291 .entries()
292 .await
293 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
294 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
295 s
296 }
297 (SessionMode::Resume(_), None) => {
298 return Err(AppError::Config("resume requires a session store".into()));
299 }
300 (SessionMode::New, Some(store)) => {
301 let id = crate::session::SessionId::new();
302 AgentSession::new_with_store(
303 id.into_string(),
304 Arc::clone(store),
305 engine,
306 Arc::clone(&llm),
307 )
308 }
309 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
310 };
311
312 Ok((session, llm))
313 }
314}
315
316pub struct App {
317 session: arc_swap::ArcSwap<AgentSession>,
318 llm: arc_swap::ArcSwap<SharedLlm>,
319 factory: SessionFactory,
320 config: Config,
321 cancel_token: SharedCancelToken,
322 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
323 next_tool_id: Arc<Mutex<ToolCallTracker>>,
324 skills: Arc<Vec<crate::skills::Skill>>,
325 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
326 pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
328 pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
333 extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
334 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
335 pub(crate) permission_mode: Arc<tokio::sync::RwLock<crate::permissions::PermissionMode>>,
338 pub(crate) permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
341 pub(crate) ui_tx_owned: Option<mpsc::Sender<UiEvent>>,
344}
345
346impl App {
347 pub fn config(&self) -> &Config {
348 &self.config
349 }
350
351 pub fn cancel(&self) {
355 self.cancel_token.cancel();
356 }
357
358 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
362 Arc::clone(&self.session_cache)
363 }
364
365 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
368 Arc::clone(&self.extension_registry)
369 }
370
371 pub fn settings(&self) -> crate::settings::Settings {
373 let mut settings = self.factory.settings();
374 if let Some(model) = self.factory.current_model() {
375 settings.model.name = model.to_string();
376 }
377 settings
378 }
379
380 pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
382 Arc::clone(&self.token_tally)
383 }
384
385 pub async fn set_permission_mode(&self, mode: crate::permissions::PermissionMode) {
390 let new_strategy = match mode {
391 crate::permissions::PermissionMode::Bypass => PromptStrategy::AllowAll,
392 crate::permissions::PermissionMode::AcceptEdits => PromptStrategy::AcceptEdits,
393 crate::permissions::PermissionMode::Prompt if self.ui_tx_owned.is_none() => {
394 PromptStrategy::HeadlessDeny
395 }
396 crate::permissions::PermissionMode::Prompt => PromptStrategy::Prompt,
397 };
398
399 if let Some(handle) = &self.permission_strategy_handle {
400 *handle.write().await = new_strategy;
401 }
402
403 *self.permission_mode.write().await = mode;
404
405 if let Some(ui_tx) = &self.ui_tx_owned {
406 let _ = ui_tx.send(UiEvent::PermissionModeChanged { mode }).await;
407 }
408 }
409
410 pub async fn emit_settings_snapshot(&self) {
414 if let Some(ui_tx) = &self.ui_tx_owned {
415 let _ = ui_tx
416 .send(UiEvent::SettingsSnapshot {
417 settings: self.settings(),
418 })
419 .await;
420 }
421 }
422
423 pub async fn permission_mode(&self) -> crate::permissions::PermissionMode {
425 *self.permission_mode.read().await
426 }
427
428 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
430 Arc::clone(&self.extension_diagnostics)
431 }
432
433 pub fn session_id(&self) -> String {
436 self.session.load().session_id().to_string()
437 }
438
439 pub async fn session_history(
448 &self,
449 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
450 self.session.load_full().history().await
451 }
452
453 pub async fn compact(&self) -> Result<()> {
459 use motosan_agent_loop::ThresholdStrategy;
460 let strategy = ThresholdStrategy {
461 threshold: 0.0,
462 ..ThresholdStrategy::default()
463 };
464 let llm = self.llm.load_full().client();
465 self.session
466 .load_full()
467 .maybe_compact(&strategy, llm)
468 .await
469 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
470 Ok(())
471 }
472
473 pub async fn new_session(&self) -> Result<()> {
477 self.fire_session_before_switch("new", None).await?;
478 let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
479 self.session.store(Arc::new(session));
480 self.llm.store(Arc::new(SharedLlm::new(llm)));
481 self.reset_token_tally().await;
482 Ok(())
483 }
484
485 pub async fn load_session(&self, id: &str) -> Result<()> {
491 self.fire_session_before_switch("load", Some(id)).await?;
492 self.load_session_without_hook(id).await
493 }
494
495 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
496 let (session, llm) = self
497 .factory
498 .build(SessionMode::Resume(id.to_string()), None, None)
499 .await?;
500 self.session.store(Arc::new(session));
501 self.llm.store(Arc::new(SharedLlm::new(llm)));
502 self.reset_token_tally().await;
503 Ok(())
504 }
505
506 async fn reset_token_tally(&self) {
507 let mut tally = self.token_tally.lock().await;
508 *tally = TurnStatsAccum::default();
509 }
510
511 pub async fn clone_session(&self) -> Result<String> {
520 self.fire_session_before_switch("clone", None).await?;
521 let Some(store) = self.factory.session_store.as_ref() else {
522 return Err(AppError::Config("clone requires a session store".into()));
523 };
524 let source_id = self.session.load().session_id().to_string();
525 let new_id = crate::session::SessionId::new().into_string();
526 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
527 catalog
528 .fork(&source_id, &new_id)
529 .await
530 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
531 self.load_session_without_hook(&new_id).await?;
532 Ok(new_id)
533 }
534
535 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
550 self.fire_session_before_switch("model_switch", None)
551 .await?;
552 let current_id = self.session.load().session_id().to_string();
553 let (session, llm) = self
554 .factory
555 .build(SessionMode::Resume(current_id), Some(model), None)
556 .await?;
557 self.factory.set_current_model(model.clone());
558 self.session.store(Arc::new(session));
559 self.llm.store(Arc::new(SharedLlm::new(llm)));
560 Ok(())
561 }
562
563 pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
579 let current_id = self.session.load().session_id().to_string();
583 let (session, llm) = self
584 .factory
585 .build(SessionMode::Resume(current_id), None, Some(&new_settings))
586 .await?;
587 self.factory.store_settings(new_settings);
588 self.factory.clear_current_model();
589 self.session.store(Arc::new(session));
590 self.llm.store(Arc::new(SharedLlm::new(llm)));
591
592 self.reset_token_tally().await;
595
596 Ok(())
597 }
598
599 pub async fn disconnect_mcp(&self) {
602 for (name, server) in &self.mcp_servers {
603 let _ =
604 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
605 tracing::debug!(target: "mcp", server = %name, "disconnected");
606 }
607 }
608
609 fn run_turn(
610 &self,
611 msg: crate::user_message::UserMessage,
612 fork_from: Option<motosan_agent_loop::EntryId>,
613 ) -> impl Stream<Item = UiEvent> + Send + 'static {
614 let session = self.session.load_full();
615 let skills = Arc::clone(&self.skills);
616 let cancel_token = self.cancel_token.clone();
617 let tracker = Arc::clone(&self.next_tool_id);
618 let progress = Arc::clone(&self.progress_rx);
619 let token_tally = Arc::clone(&self.token_tally);
620 let settings_model_name = self
621 .factory
622 .current_model()
623 .map(|model| model.to_string())
624 .unwrap_or_else(|| self.factory.settings().model.name);
625
626 async_stream::stream! {
627 let new_user = {
631 let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
635 let expanded_msg = crate::user_message::UserMessage {
636 text: expanded_text,
637 attachments: msg.attachments.clone(),
638 };
639 match crate::user_message::prepare_user_message(&expanded_msg) {
640 Ok(m) => m,
641 Err(err) => {
642 yield UiEvent::AttachmentError {
643 kind: err.kind(),
644 message: err.to_string(),
645 };
646 return;
647 }
648 }
649 };
650
651 let mut progress_guard = match progress.try_lock() {
653 Ok(guard) => guard,
654 Err(_) => {
655 yield UiEvent::Error(
656 "another turn is already running; capo is single-turn-per-App".into(),
657 );
658 return;
659 }
660 };
661
662 let cancel = cancel_token.reset();
664
665 yield UiEvent::AgentTurnStarted;
666 yield UiEvent::AgentThinking;
667
668 let handle = match fork_from {
670 None => {
671 let history = match session.history().await {
673 Ok(h) => h,
674 Err(err) => {
675 yield UiEvent::Error(format!("session.history failed: {err}"));
676 return;
677 }
678 };
679 let mut messages = history;
680 messages.push(new_user);
681 match session.start_turn(messages).await {
682 Ok(h) => h,
683 Err(err) => {
684 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
685 return;
686 }
687 }
688 }
689 Some(from) => {
690 match session.fork_turn(from, vec![new_user]).await {
692 Ok(h) => h,
693 Err(err) => {
694 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
695 return;
696 }
697 }
698 }
699 };
700 let previous_len = handle.previous_len;
701 let epoch = handle.epoch;
702 let branch_parent = handle.branch_parent;
703 let ops_tx = handle.ops_tx.clone();
704 let mut agent_stream = handle.stream;
705
706 let interrupt_bridge = tokio::spawn(async move {
714 cancel.cancelled().await;
715 let _ = ops_tx.send(AgentOp::Interrupt).await;
716 });
717
718 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
720 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
721
722 loop {
723 while let Ok(chunk) = progress_guard.try_recv() {
725 yield UiEvent::ToolCallProgress {
726 id: progress_event_id(&tracker),
727 chunk: ProgressChunk::from(chunk),
728 };
729 }
730
731 tokio::select! {
732 biased;
733 maybe_item = agent_stream.next() => {
734 match maybe_item {
735 Some(AgentStreamItem::Event(ev)) => {
736 if let Some(ui) = map_event(ev, &tracker) {
737 yield ui;
738 }
739 }
740 Some(AgentStreamItem::Terminal(term)) => {
741 terminal_result = Some(term.result);
742 terminal_messages = Some(term.messages);
743 break;
744 }
745 None => break,
746 }
747 }
748 Some(chunk) = progress_guard.recv() => {
749 yield UiEvent::ToolCallProgress {
750 id: progress_event_id(&tracker),
751 chunk: ProgressChunk::from(chunk),
752 };
753 }
754 }
755 }
756
757 interrupt_bridge.abort();
759
760 if let Some(msgs) = terminal_messages.as_ref() {
762 if let Err(err) = session
763 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
764 .await
765 {
766 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
767 }
768 }
769
770 match terminal_result {
772 Some(Ok(result)) => {
773 if let Some(msgs) = terminal_messages.as_ref() {
776 for ev in extract_new_thinking_events(msgs, previous_len) {
777 yield ev;
778 }
779 }
780
781 let final_text = terminal_messages
782 .as_ref()
783 .and_then(|msgs| {
784 msgs.iter()
785 .rev()
786 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
787 .map(|m| m.text())
788 })
789 .unwrap_or_default();
790 if !final_text.is_empty() {
791 yield UiEvent::AgentMessageComplete(final_text);
792 }
793 let usage = result.usage;
796 let (cumulative_input, cumulative_output) = {
797 let mut tally = token_tally.lock().await;
798 tally.add(usage);
799 (tally.cumulative_input, tally.cumulative_output)
800 };
801 yield UiEvent::TurnStats {
802 input_tokens: usage.input_tokens,
803 output_tokens: usage.output_tokens,
804 cumulative_input,
805 cumulative_output,
806 model: settings_model_name.clone(),
807 };
808 while let Ok(chunk) = progress_guard.try_recv() {
810 yield UiEvent::ToolCallProgress {
811 id: progress_event_id(&tracker),
812 chunk: ProgressChunk::from(chunk),
813 };
814 }
815 yield UiEvent::AgentTurnComplete;
816 }
817 Some(Err(err)) => {
818 if let motosan_agent_loop::AgentError::MaxIterations(n) = err {
831 yield UiEvent::Notice {
832 title: "Agent stopped".to_string(),
833 body: format!(
834 "Reached the per-turn iteration cap ({n}). \
835 Partial work is saved — send another message to continue."
836 ),
837 };
838 yield UiEvent::AgentTurnComplete;
839 } else {
840 yield UiEvent::Error(format!("{err}"));
841 }
842 }
843 None => { }
844 }
845 }
846 }
847
848 pub fn send_user_message(
849 &self,
850 msg: crate::user_message::UserMessage,
851 ) -> impl Stream<Item = UiEvent> + Send + 'static {
852 self.run_turn(msg, None)
853 }
854
855 pub fn fork_from(
860 &self,
861 from: motosan_agent_loop::EntryId,
862 message: crate::user_message::UserMessage,
863 ) -> impl Stream<Item = UiEvent> + Send + 'static {
864 let registry = Arc::clone(&self.extension_registry);
865 let inner = self.run_turn(message, Some(from));
866 async_stream::stream! {
867 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
868 match dispatch_session_before_switch(®istry, "fork", None).await {
869 HookOutcome::Continue => {}
870 HookOutcome::Cancelled { extension_name, reason } => {
871 let msg = match reason {
872 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
873 None => format!("extension `{extension_name}` cancelled fork"),
874 };
875 yield UiEvent::Error(msg);
876 return;
877 }
878 }
879 let mut inner = Box::pin(inner);
880 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
881 yield ev;
882 }
883 }
884 }
885
886 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
890 let entries = self
891 .session
892 .load_full()
893 .entries()
894 .await
895 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
896 let branch = motosan_agent_loop::active_branch(&entries);
897 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
898 .iter()
899 .filter_map(|stored| {
900 let msg = stored.entry.as_message()?;
901 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
902 return None;
903 }
904 let preview: String = msg
905 .text()
906 .lines()
907 .next()
908 .unwrap_or("")
909 .chars()
910 .take(80)
911 .collect();
912 Some((stored.id.clone(), preview))
913 })
914 .collect();
915 out.reverse();
916 Ok(out)
917 }
918
919 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
921 self.session
922 .load_full()
923 .branches()
924 .await
925 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
926 }
927
928 async fn fire_session_before_switch(
931 &self,
932 reason: &str,
933 session_id: Option<&str>,
934 ) -> Result<()> {
935 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
936 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
937 HookOutcome::Continue => Ok(()),
938 HookOutcome::Cancelled {
939 extension_name,
940 reason,
941 } => Err(AppError::HookCancelled {
942 extension_name,
943 reason,
944 }),
945 }
946 }
947}
948
/// Hands out `tool_<n>` ids for tool calls and remembers which calls are
/// still in flight.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Number of the most recently issued id (0 before the first one).
    next_id: usize,
    /// In-flight calls as `(tool name, id)`, oldest first.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Issues the next `tool_<n>` id.
    fn mint_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }

    /// Registers a started call of tool `name` and returns its new id.
    fn start(&mut self, name: &str) -> String {
        let id = self.mint_id();
        self.pending.push_back((name.to_owned(), id.clone()));
        id
    }

    /// Marks the oldest in-flight call of tool `name` as finished and returns
    /// its id. When no such call is pending, a fresh id is issued instead.
    fn complete(&mut self, name: &str) -> String {
        let slot = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name);
        match slot.and_then(|index| self.pending.remove(index)) {
            Some((_, id)) => id,
            None => self.mint_id(),
        }
    }

    /// Returns the id progress should be attributed to: the single in-flight
    /// call, or `None` when zero or several calls are pending.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() != 1 {
            return None;
        }
        self.pending.front().map(|(_, id)| id.clone())
    }
}
989
990fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
991 match tracker.lock() {
992 Ok(guard) => guard,
993 Err(poisoned) => poisoned.into_inner(),
994 }
995}
996
997fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
998 lock_tool_tracker(tracker)
999 .progress_id()
1000 .unwrap_or_else(|| "tool_unknown".to_string())
1001}
1002
1003fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
1004where
1005 F: Fn(&str) -> Option<String>,
1006{
1007 env_lookup("ANTHROPIC_API_KEY")
1008 .map(|key| key.trim().to_string())
1009 .filter(|key| !key.is_empty())
1010 .or_else(|| auth.api_key("anthropic").map(str::to_string))
1011}
1012
1013fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
1014 match ev {
1015 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
1016 AgentEvent::Core(CoreEvent::ToolStarted { name, args }) => {
1017 let id = lock_tool_tracker(tool_tracker).start(&name);
1018 Some(UiEvent::ToolCallStarted { id, name, args })
1019 }
1020 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
1021 let id = lock_tool_tracker(tool_tracker).complete(&name);
1022 Some(UiEvent::ToolCallCompleted {
1023 id,
1024 result: UiToolResult {
1025 is_error: result.is_error,
1026 text: format!("{name}: {result:?}"),
1027 },
1028 })
1029 }
1030 _ => None,
1031 }
1032}
1033
1034type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
1035
1036pub struct AppBuilder {
1037 config: Option<Config>,
1038 cwd: Option<PathBuf>,
1039 permission_gate: Option<Arc<dyn PermissionGate>>,
1040 install_builtin_tools: bool,
1041 max_iterations: usize,
1042 llm_override: Option<Arc<dyn LlmClient>>,
1043 custom_tools_factory: Option<CustomToolsFactory>,
1044 permissions_policy_path: Option<PathBuf>,
1045 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
1046 headless_permissions: bool,
1047 settings: Option<crate::settings::Settings>,
1048 auth: Option<crate::auth::Auth>,
1049 context_discovery_disabled: bool,
1050 session_store: Option<Arc<dyn SessionStore>>,
1052 resume_session_id: Option<crate::session::SessionId>,
1053 autocompact_enabled: bool,
1054 skills: Vec<crate::skills::Skill>,
1056 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
1058 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1059 extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
1060 extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
1061 token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
1062}
1063
1064impl Default for AppBuilder {
1065 fn default() -> Self {
1066 Self {
1067 config: None,
1068 cwd: None,
1069 permission_gate: None,
1070 install_builtin_tools: false,
1071 max_iterations: 200,
1078 llm_override: None,
1079 custom_tools_factory: None,
1080 permissions_policy_path: None,
1081 ui_tx: None,
1082 headless_permissions: false,
1083 settings: None,
1084 auth: None,
1085 context_discovery_disabled: false,
1086 session_store: None,
1087 resume_session_id: None,
1088 autocompact_enabled: false,
1089 skills: Vec::new(),
1090 extra_tools: Vec::new(),
1091 mcp_servers: Vec::new(),
1092 extension_registry: None,
1093 extension_diagnostics: None,
1094 token_tally: None,
1095 }
1096 }
1097}
1098
1099impl AppBuilder {
1100 pub fn new() -> Self {
1101 Self::default()
1102 }
1103
1104 pub fn with_config(mut self, cfg: Config) -> Self {
1105 self.config = Some(cfg);
1106 self
1107 }
1108
1109 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
1110 self.cwd = Some(cwd.into());
1111 self
1112 }
1113
1114 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
1115 self.permission_gate = Some(gate);
1116 self
1117 }
1118
1119 pub fn with_builtin_tools(mut self) -> Self {
1125 self.install_builtin_tools = true;
1126 self
1127 }
1128
1129 pub fn with_max_iterations(mut self, n: usize) -> Self {
1130 self.max_iterations = n;
1131 self
1132 }
1133
1134 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
1135 self.llm_override = Some(llm);
1136 self
1137 }
1138
1139 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1140 self.permissions_policy_path = Some(path);
1141 self
1142 }
1143
1144 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1145 self.ui_tx = Some(tx);
1146 self
1147 }
1148
1149 pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1151 self.token_tally = Some(tally);
1152 self
1153 }
1154
1155 pub fn with_headless_permissions(mut self) -> Self {
1160 self.headless_permissions = true;
1161 self
1162 }
1163
1164 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1166 self.settings = Some(settings);
1167 self
1168 }
1169
1170 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1172 self.auth = Some(auth);
1173 self
1174 }
1175
1176 pub fn disable_context_discovery(mut self) -> Self {
1179 self.context_discovery_disabled = true;
1180 self
1181 }
1182
1183 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1186 self.session_store = Some(store);
1187 self
1188 }
1189
1190 pub fn with_autocompact(mut self) -> Self {
1195 self.autocompact_enabled = true;
1196 self
1197 }
1198
1199 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1205 self.skills = skills;
1206 self
1207 }
1208
1209 pub fn without_skills(mut self) -> Self {
1210 self.skills.clear();
1211 self
1212 }
1213
1214 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1218 self.extra_tools = tools;
1219 self
1220 }
1221
1222 pub fn with_extension_registry(
1223 mut self,
1224 registry: Arc<crate::extensions::ExtensionRegistry>,
1225 ) -> Self {
1226 self.extension_registry = Some(registry);
1227 self
1228 }
1229
1230 pub fn with_extension_diagnostics(
1231 mut self,
1232 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1233 ) -> Self {
1234 self.extension_diagnostics = Some(diagnostics);
1235 self
1236 }
1237
1238 pub fn with_mcp_servers(
1241 mut self,
1242 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1243 ) -> Self {
1244 self.mcp_servers = servers;
1245 self
1246 }
1247
1248 pub fn with_custom_tools_factory(
1253 mut self,
1254 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1255 ) -> Self {
1256 self.custom_tools_factory = Some(Box::new(factory));
1257 self
1258 }
1259
1260 pub async fn build_with_custom_tools(
1264 self,
1265 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1266 ) -> Result<App> {
1267 self.with_custom_tools_factory(factory).build().await
1268 }
1269
1270 pub async fn build_with_session(
1278 mut self,
1279 resume: Option<crate::session::SessionId>,
1280 ) -> Result<App> {
1281 if let Some(id) = resume {
1282 if self.session_store.is_none() {
1283 return Err(AppError::Config(
1284 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1285 ));
1286 }
1287 self.resume_session_id = Some(id);
1288 }
1289 self.build_internal().await
1290 }
1291
1292 pub async fn build(self) -> Result<App> {
1294 self.build_with_session(None).await
1295 }
1296
1297 async fn build_internal(mut self) -> Result<App> {
1298 let mcp_servers = std::mem::take(&mut self.mcp_servers);
1299 let extra_tools = std::mem::take(&mut self.extra_tools);
1300 let skills = self.skills.clone();
1301 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1302 return Err(AppError::Config(
1303 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1304 ));
1305 }
1306
1307 let has_config = self.config.is_some();
1311 let has_auth = self.auth.is_some();
1312 let mut config = self.config.unwrap_or_default();
1313 let settings = match self.settings {
1314 Some(settings) => settings,
1315 None => {
1316 let mut settings = crate::settings::Settings::default();
1317 settings.model.provider = config.model.provider.clone();
1318 settings.model.name = config.model.name.clone();
1319 settings.model.max_tokens = config.model.max_tokens;
1320 settings
1321 }
1322 };
1323 config.model.provider = settings.model.provider.clone();
1324 config.model.name = settings.model.name.clone();
1325 config.model.max_tokens = settings.model.max_tokens;
1326 let mut auth = self.auth.unwrap_or_default();
1327 if !has_auth {
1328 if let Some(key) = config.anthropic.api_key.as_deref() {
1329 auth.0.insert(
1330 "anthropic".into(),
1331 crate::auth::ProviderAuth::ApiKey {
1332 key: key.to_string(),
1333 },
1334 );
1335 }
1336 }
1337 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1338 if env_or_auth_key.is_some() || has_auth || !has_config {
1339 config.anthropic.api_key = env_or_auth_key;
1340 }
1341 let cwd = self
1342 .cwd
1343 .or_else(|| std::env::current_dir().ok())
1344 .unwrap_or_else(|| PathBuf::from("."));
1345 let agent_dir = crate::paths::agent_dir();
1346 let permission_gate = self.permission_gate.unwrap_or_else(|| {
1347 if self.ui_tx.is_some() || self.headless_permissions {
1351 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1352 } else {
1353 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1354 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1355 }
1356 });
1357
1358 let policy: Arc<crate::permissions::Policy> =
1360 Arc::new(match self.permissions_policy_path.as_ref() {
1361 Some(path) => crate::permissions::Policy::load_or_default(path)?,
1362 None => crate::permissions::Policy::default(),
1363 });
1364 let session_cache = Arc::new(crate::permissions::SessionCache::new());
1365 let permission_strategy_handle = if self.ui_tx.is_some() || self.headless_permissions {
1366 let initial_strategy = if self.ui_tx.is_some() {
1367 PromptStrategy::Prompt
1368 } else {
1369 PromptStrategy::HeadlessDeny
1370 };
1371 Some(Arc::new(tokio::sync::RwLock::new(initial_strategy)))
1372 } else {
1373 None
1374 };
1375 let permission_mode = Arc::new(tokio::sync::RwLock::new(
1376 crate::permissions::PermissionMode::default(),
1377 ));
1378 let token_tally = self
1379 .token_tally
1380 .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1381
1382 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1384
1385 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1390 let cancel_token = probe_ctx.cancel_token.clone();
1391 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1392 if let Some(factory_fn) = self.custom_tools_factory.take() {
1393 let mut t = factory_fn(probe_ctx);
1394 t.extend(extra_tools.clone());
1395 (false, t)
1396 } else {
1397 (self.install_builtin_tools, extra_tools.clone())
1398 };
1399
1400 let factory = SessionFactory {
1401 cwd: cwd.clone(),
1402 settings: Arc::new(Mutex::new(settings.clone())),
1403 auth: auth.clone(),
1404 policy: Arc::clone(&policy),
1405 session_cache: Arc::clone(&session_cache),
1406 ui_tx: self.ui_tx.clone(),
1407 headless_permissions: self.headless_permissions,
1408 permission_gate: Arc::clone(&permission_gate),
1409 permission_strategy_handle: permission_strategy_handle.clone(),
1410 progress_tx: progress_tx.clone(),
1411 skills: Arc::new(skills.clone()),
1412 install_builtin_tools: install_builtin,
1413 extra_tools: factory_extra_tools,
1414 max_iterations: self.max_iterations,
1415 context_discovery_disabled: self.context_discovery_disabled,
1416 autocompact_enabled: self.autocompact_enabled,
1417 session_store: self.session_store.clone(),
1418 llm_override: self.llm_override.clone(),
1419 current_model: Arc::new(Mutex::new(None)),
1420 cancel_token: cancel_token.clone(),
1421 };
1422
1423 let mode = match self.resume_session_id.take() {
1424 Some(id) => SessionMode::Resume(id.into_string()),
1425 None => SessionMode::New,
1426 };
1427 let (session, llm) = factory.build(mode, None, None).await?;
1428 let (extension_registry, extension_diagnostics) =
1429 if let Some(reg) = self.extension_registry.take() {
1430 let diagnostics = self
1431 .extension_diagnostics
1432 .unwrap_or_else(|| Arc::new(Vec::new()));
1433 (reg, diagnostics)
1434 } else {
1435 let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1436 let (registry, diagnostics) =
1437 crate::extensions::load_extensions_manifest(&manifest_path).await;
1438 for d in &diagnostics {
1439 let message = d.message.as_str();
1440 match d.severity {
1441 crate::extensions::DiagnosticSeverity::Warn => {
1442 tracing::warn!("extensions: {message}");
1443 }
1444 crate::extensions::DiagnosticSeverity::Error => {
1445 tracing::error!("extensions: {message}");
1446 }
1447 }
1448 }
1449 (Arc::new(registry), Arc::new(diagnostics))
1450 };
1451
1452 Ok(App {
1453 session: arc_swap::ArcSwap::from_pointee(session),
1454 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1455 factory,
1456 config,
1457 cancel_token,
1458 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1459 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1460 skills: Arc::new(skills),
1461 mcp_servers,
1462 extension_registry,
1463 token_tally,
1464 extension_diagnostics,
1465 session_cache,
1466 permission_mode,
1467 permission_strategy_handle,
1468 ui_tx_owned: self.ui_tx.clone(),
1469 })
1470 }
1471}
1472
1473#[cfg(test)]
1474mod tests {
1475 use super::*;
1476 use crate::config::{AnthropicConfig, ModelConfig};
1477 use crate::events::UiEvent;
1478 use crate::user_message::UserMessage;
1479 use async_trait::async_trait;
1480 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1481 use motosan_agent_tool::ToolDef;
1482 use std::sync::atomic::{AtomicUsize, Ordering};
1483
/// Two `add` calls sum their token counts and bump the turn counter twice.
#[test]
fn turn_stats_accum_accumulates_across_calls() {
    use motosan_agent_loop::TokenUsage;

    let usages = vec![
        TokenUsage {
            input_tokens: 100,
            output_tokens: 25,
        },
        TokenUsage {
            input_tokens: 1000,
            output_tokens: 250,
        },
    ];
    let mut tally = TurnStatsAccum::default();
    for usage in usages {
        tally.add(usage);
    }
    assert_eq!(tally.cumulative_input, 1100);
    assert_eq!(tally.cumulative_output, 275);
    assert_eq!(tally.turn_count, 2);
}
1499
/// Adding past `u64::MAX` clamps the cumulative input instead of wrapping.
#[test]
fn turn_stats_accum_saturates_on_overflow() {
    let near_max = u64::MAX - 5;
    let mut tally = TurnStatsAccum {
        turn_count: 1,
        cumulative_output: 0,
        cumulative_input: near_max,
    };
    let overflowing = motosan_agent_loop::TokenUsage {
        input_tokens: 100,
        output_tokens: 0,
    };
    tally.add(overflowing);
    assert_eq!(tally.cumulative_input, u64::MAX);
}
1513
/// Reasoning parts become `ThinkingComplete` events in the order they appear,
/// and text parts between them are skipped.
#[test]
fn extract_thinking_events_extracts_reasoning_in_source_order() {
    use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};

    let reasoning = |text: &'static str| AssistantContent::Reasoning {
        text: text.into(),
        signature: None,
    };
    let content = vec![
        reasoning("first thought"),
        AssistantContent::Text {
            text: "answer 1".into(),
        },
        reasoning("second thought"),
    ];
    let messages = vec![Message::Assistant {
        id: new_message_id(),
        meta: MessageMeta::default(),
        content,
    }];

    let events = extract_thinking_events(&messages);
    assert_eq!(events.len(), 2);
    if let (UiEvent::ThinkingComplete { text: first }, UiEvent::ThinkingComplete { text: second }) =
        (&events[0], &events[1])
    {
        assert_eq!(first, "first thought");
        assert_eq!(second, "second thought");
    } else {
        panic!("expected two ThinkingComplete events");
    }
}
1544
/// Only reasoning found at or after `previous_len` is reported.
#[test]
fn extract_new_thinking_events_skips_historical_reasoning_before_previous_len() {
    use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};

    let user = |text: &'static str| Message::User {
        id: new_message_id(),
        meta: MessageMeta::default(),
        content: vec![ContentPart::text(text)],
    };
    let thinking = |text: &'static str| Message::Assistant {
        id: new_message_id(),
        meta: MessageMeta::default(),
        content: vec![AssistantContent::Reasoning {
            text: text.into(),
            signature: None,
        }],
    };
    let messages = vec![
        user("old question"),
        thinking("old thought"),
        user("new question"),
        thinking("new thought"),
    ];

    // The first two messages are history from an earlier turn.
    let events = extract_new_thinking_events(&messages, 2);
    assert_eq!(events.len(), 1, "should only emit current-turn reasoning");
    let only = &events[0];
    if let UiEvent::ThinkingComplete { text } = only {
        assert_eq!(text, "new thought");
    } else {
        let other = only;
        panic!("expected ThinkingComplete; got {other:?}");
    }
}
1584
/// A user message plus a plain-text assistant message yields no events.
#[test]
fn extract_thinking_events_skips_non_assistant_and_non_reasoning() {
    use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};

    let question = Message::User {
        id: new_message_id(),
        meta: MessageMeta::default(),
        content: vec![ContentPart::text("question?")],
    };
    let plain_answer = Message::Assistant {
        id: new_message_id(),
        meta: MessageMeta::default(),
        content: vec![AssistantContent::Text {
            text: "answer".into(),
        }],
    };
    let messages = vec![question, plain_answer];

    let events = extract_thinking_events(&messages);
    assert!(events.is_empty(), "expected no events; got {events:?}");
}
1605
1606 #[tokio::test]
1607 async fn builder_fails_without_api_key() {
1608 let cfg = Config {
1609 anthropic: AnthropicConfig {
1610 api_key: None,
1611 base_url: "https://api.anthropic.com".into(),
1612 },
1613 model: ModelConfig {
1614 provider: "anthropic".into(),
1615 name: "claude-sonnet-4-6".into(),
1616 max_tokens: 4096,
1617 },
1618 };
1619 let err = match AppBuilder::new()
1620 .with_config(cfg)
1621 .with_builtin_tools()
1622 .build()
1623 .await
1624 {
1625 Ok(_) => panic!("must fail without key"),
1626 Err(err) => err,
1627 };
1628 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1629 }
1630
1631 struct ToolOnlyLlm {
1632 turn: AtomicUsize,
1633 }
1634
1635 #[async_trait]
1636 impl LlmClient for ToolOnlyLlm {
1637 async fn chat(
1638 &self,
1639 _messages: &[Message],
1640 _tools: &[ToolDef],
1641 ) -> motosan_agent_loop::Result<ChatOutput> {
1642 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1643 if turn == 0 {
1644 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1645 ToolCallItem {
1646 id: "t1".into(),
1647 name: "read".into(),
1648 args: serde_json::json!({"path":"nope.txt"}),
1649 },
1650 ])))
1651 } else {
1652 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1653 }
1654 }
1655 }
1656
1657 #[tokio::test]
1658 async fn empty_final_message_is_not_emitted() {
1659 let dir = tempfile::tempdir().unwrap();
1660 let mut cfg = Config::default();
1661 cfg.anthropic.api_key = Some("sk-unused".into());
1662 let app = AppBuilder::new()
1663 .with_config(cfg)
1664 .with_cwd(dir.path())
1665 .with_builtin_tools()
1666 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1667 turn: AtomicUsize::new(0),
1668 }))
1669 .build()
1670 .await
1671 .expect("build");
1672 let events: Vec<UiEvent> =
1673 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1674 let empties = events
1675 .iter()
1676 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1677 .count();
1678 assert_eq!(
1679 empties, 0,
1680 "should not emit empty final message, got: {events:?}"
1681 );
1682 }
1683
/// Stateless test LLM; its `LlmClient` impl answers every call with the message "ok".
struct EchoLlm;
1685
1686 struct UsageLlm {
1687 responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1688 }
1689
1690 #[async_trait]
1691 impl LlmClient for UsageLlm {
1692 async fn chat(
1693 &self,
1694 _messages: &[Message],
1695 _tools: &[ToolDef],
1696 ) -> motosan_agent_loop::Result<ChatOutput> {
1697 let next = match self.responses.lock() {
1698 Ok(mut responses) => responses.pop_front(),
1699 Err(poisoned) => poisoned.into_inner().pop_front(),
1700 };
1701 Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1702 }
1703 }
1704
1705 #[async_trait]
1706 impl LlmClient for EchoLlm {
1707 async fn chat(
1708 &self,
1709 _messages: &[Message],
1710 _tools: &[ToolDef],
1711 ) -> motosan_agent_loop::Result<ChatOutput> {
1712 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1713 }
1714 }
1715
1716 #[tokio::test]
1717 async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1718 use motosan_agent_loop::{LlmResponse, TokenUsage};
1719
1720 let dir = tempfile::tempdir().expect("tempdir");
1721 let mut cfg = Config::default();
1722 cfg.anthropic.api_key = Some("sk-unused".into());
1723 let llm = Arc::new(UsageLlm {
1724 responses: std::sync::Mutex::new(VecDeque::from([
1725 ChatOutput::with_usage(
1726 LlmResponse::Message("first".into()),
1727 TokenUsage {
1728 input_tokens: 100,
1729 output_tokens: 50,
1730 },
1731 ),
1732 ChatOutput::with_usage(
1733 LlmResponse::Message("second".into()),
1734 TokenUsage {
1735 input_tokens: 200,
1736 output_tokens: 80,
1737 },
1738 ),
1739 ])),
1740 });
1741 let app = AppBuilder::new()
1742 .with_config(cfg)
1743 .with_cwd(dir.path())
1744 .with_builtin_tools()
1745 .with_llm(llm)
1746 .build()
1747 .await
1748 .expect("build");
1749
1750 let events_1: Vec<UiEvent> = app
1751 .send_user_message(UserMessage::text("hi"))
1752 .collect()
1753 .await;
1754 let events_2: Vec<UiEvent> = app
1755 .send_user_message(UserMessage::text("again"))
1756 .collect()
1757 .await;
1758
1759 let ts1 = events_1
1760 .iter()
1761 .find_map(|e| match e {
1762 UiEvent::TurnStats {
1763 input_tokens,
1764 output_tokens,
1765 cumulative_input,
1766 cumulative_output,
1767 ..
1768 } => Some((
1769 *input_tokens,
1770 *output_tokens,
1771 *cumulative_input,
1772 *cumulative_output,
1773 )),
1774 _ => None,
1775 })
1776 .expect("turn 1 had no TurnStats");
1777 assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1778
1779 let ts2 = events_2
1780 .iter()
1781 .find_map(|e| match e {
1782 UiEvent::TurnStats {
1783 input_tokens,
1784 output_tokens,
1785 cumulative_input,
1786 cumulative_output,
1787 ..
1788 } => Some((
1789 *input_tokens,
1790 *output_tokens,
1791 *cumulative_input,
1792 *cumulative_output,
1793 )),
1794 _ => None,
1795 })
1796 .expect("turn 2 had no TurnStats");
1797 assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1798
1799 let positions: Vec<&str> = events_1
1800 .iter()
1801 .filter_map(|e| match e {
1802 UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1803 UiEvent::TurnStats { .. } => Some("stats"),
1804 UiEvent::AgentTurnComplete => Some("turn_complete"),
1805 _ => None,
1806 })
1807 .collect();
1808 assert_eq!(
1809 positions,
1810 vec!["msg_complete", "stats", "turn_complete"],
1811 "wrong ordering"
1812 );
1813 }
1814
1815 #[tokio::test]
1816 async fn turn_stats_reset_after_new_session() {
1817 use motosan_agent_loop::{LlmResponse, TokenUsage};
1818
1819 let dir = tempfile::tempdir().expect("tempdir");
1820 let mut cfg = Config::default();
1821 cfg.anthropic.api_key = Some("sk-unused".into());
1822 let llm = Arc::new(UsageLlm {
1823 responses: std::sync::Mutex::new(VecDeque::from([
1824 ChatOutput::with_usage(
1825 LlmResponse::Message("first".into()),
1826 TokenUsage {
1827 input_tokens: 100,
1828 output_tokens: 50,
1829 },
1830 ),
1831 ChatOutput::with_usage(
1832 LlmResponse::Message("after-new".into()),
1833 TokenUsage {
1834 input_tokens: 7,
1835 output_tokens: 3,
1836 },
1837 ),
1838 ])),
1839 });
1840 let app = AppBuilder::new()
1841 .with_config(cfg)
1842 .with_cwd(dir.path())
1843 .with_builtin_tools()
1844 .with_llm(llm)
1845 .build()
1846 .await
1847 .expect("build");
1848
1849 let _: Vec<UiEvent> = app
1850 .send_user_message(UserMessage::text("hi"))
1851 .collect()
1852 .await;
1853 app.new_session().await.expect("new session");
1854 let events: Vec<UiEvent> = app
1855 .send_user_message(UserMessage::text("after new"))
1856 .collect()
1857 .await;
1858 let stats = events
1859 .iter()
1860 .find_map(|event| match event {
1861 UiEvent::TurnStats {
1862 input_tokens,
1863 output_tokens,
1864 cumulative_input,
1865 cumulative_output,
1866 ..
1867 } => Some((
1868 *input_tokens,
1869 *output_tokens,
1870 *cumulative_input,
1871 *cumulative_output,
1872 )),
1873 _ => None,
1874 })
1875 .expect("turn had no TurnStats");
1876 assert_eq!(stats, (7, 3, 7, 3));
1877 }
1878
1879 #[tokio::test]
1880 async fn with_headless_permissions_builds_an_app() {
1881 let dir = tempfile::tempdir().expect("tempdir");
1882 let mut config = Config::default();
1883 config.anthropic.api_key = Some("sk-unused".into());
1884 let app = AppBuilder::new()
1885 .with_config(config)
1886 .with_cwd(dir.path())
1887 .with_builtin_tools()
1888 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1889 .with_headless_permissions()
1890 .build()
1891 .await
1892 .expect("build");
1893 assert!(!app.session_id().is_empty());
1895 }
1896
1897 #[tokio::test]
1898 async fn new_session_swaps_in_a_fresh_empty_session() {
1899 use futures::StreamExt;
1900 let dir = tempfile::tempdir().expect("tempdir");
1901 let mut config = Config::default();
1902 config.anthropic.api_key = Some("sk-unused".into());
1903 let app = AppBuilder::new()
1904 .with_config(config)
1905 .with_cwd(dir.path())
1906 .with_builtin_tools()
1907 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1908 .build()
1909 .await
1910 .expect("build");
1911
1912 let _: Vec<_> = app
1913 .send_user_message(UserMessage::text("hello"))
1914 .collect()
1915 .await;
1916 let id_before = app.session_id();
1917 assert!(!app.session_history().await.expect("history").is_empty());
1918
1919 app.new_session().await.expect("new_session");
1920
1921 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1922 assert!(
1923 app.session_history().await.expect("history").is_empty(),
1924 "fresh session has no history"
1925 );
1926 }
1927
1928 #[tokio::test]
1929 async fn load_session_restores_a_stored_session_by_id() {
1930 use futures::StreamExt;
1931 let dir = tempfile::tempdir().expect("tempdir");
1932 let store_dir = dir.path().join("sessions");
1933 let store: Arc<dyn SessionStore> =
1934 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1935 let mut config = Config::default();
1936 config.anthropic.api_key = Some("sk-unused".into());
1937 let app = AppBuilder::new()
1938 .with_config(config)
1939 .with_cwd(dir.path())
1940 .with_builtin_tools()
1941 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1942 .with_session_store(Arc::clone(&store))
1943 .build()
1944 .await
1945 .expect("build");
1946
1947 let _: Vec<_> = app
1948 .send_user_message(UserMessage::text("remember this"))
1949 .collect()
1950 .await;
1951 let original_id = app.session_id();
1952
1953 app.new_session().await.expect("new_session");
1954 assert_ne!(app.session_id(), original_id);
1955
1956 app.load_session(&original_id).await.expect("load_session");
1957 assert_eq!(app.session_id(), original_id);
1958 let history = app.session_history().await.expect("history");
1959 assert!(
1960 history.iter().any(|m| m.text().contains("remember this")),
1961 "loaded session should carry the original turn"
1962 );
1963 }
1964
1965 #[tokio::test]
1966 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1967 use futures::StreamExt;
1968 let dir = tempfile::tempdir().expect("tempdir");
1969 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1970 dir.path().join("s"),
1971 ));
1972 let mut config = Config::default();
1973 config.anthropic.api_key = Some("sk-unused".into());
1974 let app = AppBuilder::new()
1975 .with_config(config)
1976 .with_cwd(dir.path())
1977 .with_builtin_tools()
1978 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1979 .with_session_store(store)
1980 .build()
1981 .await
1982 .expect("build");
1983
1984 let _: Vec<_> = app
1985 .send_user_message(UserMessage::text("hello"))
1986 .collect()
1987 .await;
1988 let original_id = app.session_id();
1989
1990 let new_id = app.clone_session().await.expect("clone_session");
1991
1992 assert_ne!(new_id, original_id);
1994 assert_eq!(app.session_id(), new_id);
1995 let history = app.session_history().await.expect("history");
1997 assert!(history.iter().any(|m| m.text().contains("hello")));
1998 }
1999
2000 #[tokio::test]
2001 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
2002 use futures::StreamExt;
2003 let dir = tempfile::tempdir().expect("tempdir");
2004 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2005 dir.path().join("s"),
2006 ));
2007 let mut config = Config::default();
2008 config.anthropic.api_key = Some("sk-unused".into());
2009 let app = AppBuilder::new()
2010 .with_config(config)
2011 .with_cwd(dir.path())
2012 .with_builtin_tools()
2013 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2014 .with_session_store(store)
2015 .build()
2016 .await
2017 .expect("build");
2018
2019 let _: Vec<_> = app
2021 .send_user_message(UserMessage::text("first"))
2022 .collect()
2023 .await;
2024 let _: Vec<_> = app
2025 .send_user_message(UserMessage::text("second"))
2026 .collect()
2027 .await;
2028
2029 let entries = app.session.load_full().entries().await.expect("entries");
2031 let first_id = entries
2032 .iter()
2033 .find_map(|stored| {
2034 let msg = stored.entry.as_message()?;
2035 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
2036 .then(|| stored.id.clone())
2037 })
2038 .expect("first user message present");
2039
2040 let _: Vec<_> = app
2042 .fork_from(first_id, UserMessage::text("branched"))
2043 .collect()
2044 .await;
2045
2046 let history = app.session_history().await.expect("history");
2047 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
2048 assert!(
2049 texts.iter().any(|t| t.contains("first")),
2050 "fork keeps the fork-point ancestor"
2051 );
2052 assert!(
2053 texts.iter().any(|t| t.contains("branched")),
2054 "fork includes the new message"
2055 );
2056 assert!(
2057 !texts.iter().any(|t| t.contains("second")),
2058 "fork excludes the abandoned branch"
2059 );
2060 }
2061
2062 #[tokio::test]
2063 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
2064 use futures::StreamExt;
2065 let dir = tempfile::tempdir().expect("tempdir");
2066 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2067 dir.path().join("s"),
2068 ));
2069 let mut config = Config::default();
2070 config.anthropic.api_key = Some("sk-unused".into());
2071 let app = AppBuilder::new()
2072 .with_config(config)
2073 .with_cwd(dir.path())
2074 .with_builtin_tools()
2075 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2076 .with_session_store(store)
2077 .build()
2078 .await
2079 .expect("build");
2080
2081 let _: Vec<_> = app
2082 .send_user_message(UserMessage::text("alpha"))
2083 .collect()
2084 .await;
2085 let _: Vec<_> = app
2086 .send_user_message(UserMessage::text("bravo"))
2087 .collect()
2088 .await;
2089
2090 let candidates = app.fork_candidates().await.expect("candidates");
2091 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
2092 assert!(previews[0].contains("bravo"), "got {previews:?}");
2094 assert!(previews.iter().any(|p| p.contains("alpha")));
2095 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
2097 }
2098
2099 #[tokio::test]
2100 async fn branches_returns_a_tree_for_a_linear_session() {
2101 use futures::StreamExt;
2102 let dir = tempfile::tempdir().expect("tempdir");
2103 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2104 dir.path().join("s"),
2105 ));
2106 let mut config = Config::default();
2107 config.anthropic.api_key = Some("sk-unused".into());
2108 let app = AppBuilder::new()
2109 .with_config(config)
2110 .with_cwd(dir.path())
2111 .with_builtin_tools()
2112 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2113 .with_session_store(store)
2114 .build()
2115 .await
2116 .expect("build");
2117
2118 let _: Vec<_> = app
2119 .send_user_message(UserMessage::text("hello"))
2120 .collect()
2121 .await;
2122 let tree = app.branches().await.expect("branches");
2123 assert!(!tree.nodes.is_empty());
2125 assert!(tree.active_leaf.is_some());
2126 }
2127
2128 #[tokio::test]
2129 async fn reload_settings_rebuilds_session_and_resets_token_tally() {
2130 let dir = tempfile::tempdir().expect("tempdir");
2131 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2132 dir.path().join("s"),
2133 ));
2134 let mut cfg = Config::default();
2135 cfg.anthropic.api_key = Some("sk-unused".into());
2136
2137 let llm = Arc::new(UsageLlm {
2138 responses: std::sync::Mutex::new(VecDeque::from([
2139 ChatOutput::with_usage(
2140 LlmResponse::Message("first".into()),
2141 motosan_agent_loop::TokenUsage {
2142 input_tokens: 100,
2143 output_tokens: 50,
2144 },
2145 ),
2146 ChatOutput::with_usage(
2147 LlmResponse::Message("after-reload".into()),
2148 motosan_agent_loop::TokenUsage {
2149 input_tokens: 5,
2150 output_tokens: 2,
2151 },
2152 ),
2153 ])),
2154 });
2155 let app = AppBuilder::new()
2156 .with_config(cfg)
2157 .with_cwd(dir.path())
2158 .with_builtin_tools()
2159 .with_llm(llm)
2160 .with_session_store(store)
2161 .build()
2162 .await
2163 .expect("build");
2164
2165 let _: Vec<UiEvent> = app
2166 .send_user_message(crate::user_message::UserMessage::text("hi"))
2167 .collect()
2168 .await;
2169 {
2170 let token_tally = app.token_tally();
2171 let tally = token_tally.lock().await;
2172 assert_eq!(tally.cumulative_input, 100);
2173 assert_eq!(tally.cumulative_output, 50);
2174 assert_eq!(tally.turn_count, 1);
2175 }
2176
2177 let mut new_settings = crate::settings::Settings::default();
2178 new_settings.model.name = "claude-opus-4-7".into();
2179 new_settings.ui.footer_show_cost = false;
2180 app.reload_settings(new_settings).await.expect("reload");
2181 assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
2182 assert!(!app.factory.settings().ui.footer_show_cost);
2183 assert_eq!(app.factory.current_model(), None);
2184
2185 {
2186 let token_tally = app.token_tally();
2187 let tally = token_tally.lock().await;
2188 assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
2189 assert_eq!(tally.cumulative_output, 0);
2190 assert_eq!(tally.turn_count, 0);
2191 }
2192
2193 let _: Vec<UiEvent> = app
2194 .send_user_message(crate::user_message::UserMessage::text("after"))
2195 .collect()
2196 .await;
2197 {
2198 let token_tally = app.token_tally();
2199 let tally = token_tally.lock().await;
2200 assert_eq!(tally.cumulative_input, 5);
2201 assert_eq!(tally.cumulative_output, 2);
2202 assert_eq!(tally.turn_count, 1);
2203 }
2204 }
2205
2206 #[tokio::test]
2207 async fn switch_model_preserves_history() {
2208 use futures::StreamExt;
2209 let dir = tempfile::tempdir().expect("tempdir");
2210 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2211 dir.path().join("s"),
2212 ));
2213 let mut config = Config::default();
2214 config.anthropic.api_key = Some("sk-unused".into());
2215 let app = AppBuilder::new()
2216 .with_config(config)
2217 .with_cwd(dir.path())
2218 .with_builtin_tools()
2219 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2220 .with_session_store(store)
2221 .build()
2222 .await
2223 .expect("build");
2224
2225 let _: Vec<_> = app
2226 .send_user_message(UserMessage::text("keep me"))
2227 .collect()
2228 .await;
2229 let id_before = app.session_id();
2230
2231 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
2232 .await
2233 .expect("switch_model");
2234
2235 assert_eq!(
2236 app.session_id(),
2237 id_before,
2238 "switch_model keeps the same session"
2239 );
2240 let history = app.session_history().await.expect("history");
2241 assert!(history.iter().any(|m| m.text().contains("keep me")));
2242 }
2243
2244 #[tokio::test]
2245 async fn switch_model_is_sticky_for_future_session_rebuilds() {
2246 let dir = tempfile::tempdir().expect("tempdir");
2247 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2248 dir.path().join("s"),
2249 ));
2250 let mut config = Config::default();
2251 config.anthropic.api_key = Some("sk-unused".into());
2252 let app = AppBuilder::new()
2253 .with_config(config)
2254 .with_cwd(dir.path())
2255 .with_builtin_tools()
2256 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2257 .with_session_store(store)
2258 .build()
2259 .await
2260 .expect("build");
2261
2262 let selected = crate::model::ModelId::from("claude-opus-4-7");
2263 app.switch_model(&selected).await.expect("switch_model");
2264 app.new_session().await.expect("new_session");
2265
2266 assert_eq!(app.factory.current_model(), Some(selected.clone()));
2267 assert_eq!(app.settings().model.name, selected.to_string());
2268 }
2269
2270 struct SleepThenDoneLlm {
2271 turn: AtomicUsize,
2272 }
2273
2274 #[async_trait]
2275 impl LlmClient for SleepThenDoneLlm {
2276 async fn chat(
2277 &self,
2278 _messages: &[Message],
2279 _tools: &[ToolDef],
2280 ) -> motosan_agent_loop::Result<ChatOutput> {
2281 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2282 if turn == 0 {
2283 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2284 ToolCallItem {
2285 id: "sleep".into(),
2286 name: "bash".into(),
2287 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2288 },
2289 ])))
2290 } else {
2291 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2292 }
2293 }
2294 }
2295
2296 #[tokio::test]
2297 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2298 use futures::StreamExt;
2299 let dir = tempfile::tempdir().expect("tempdir");
2300 let mut config = Config::default();
2301 config.anthropic.api_key = Some("sk-unused".into());
2302 let app = Arc::new(
2303 AppBuilder::new()
2304 .with_config(config)
2305 .with_cwd(dir.path())
2306 .with_builtin_tools()
2307 .with_llm(Arc::new(SleepThenDoneLlm {
2308 turn: AtomicUsize::new(0),
2309 }) as Arc<dyn LlmClient>)
2310 .build()
2311 .await
2312 .expect("build"),
2313 );
2314
2315 app.new_session().await.expect("new_session");
2316 let running_app = Arc::clone(&app);
2317 let handle = tokio::spawn(async move {
2318 running_app
2319 .send_user_message(UserMessage::text("run a slow command"))
2320 .collect::<Vec<_>>()
2321 .await
2322 });
2323 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2324 app.cancel();
2325
2326 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2327 .await
2328 .expect("turn should finish after cancellation")
2329 .expect("join");
2330 assert!(
2331 events.iter().any(|event| {
2332 matches!(
2333 event,
2334 UiEvent::ToolCallCompleted { result, .. }
2335 if result.text.contains("command cancelled by user")
2336 )
2337 }),
2338 "cancel should reach the rebuilt bash tool: {events:?}"
2339 );
2340 }
2341
2342 #[tokio::test]
2343 async fn compact_summarizes_a_session_with_enough_history() {
2344 struct DoneLlm;
2345 #[async_trait]
2346 impl LlmClient for DoneLlm {
2347 async fn chat(
2348 &self,
2349 _messages: &[Message],
2350 _tools: &[ToolDef],
2351 ) -> motosan_agent_loop::Result<ChatOutput> {
2352 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2353 }
2354 }
2355
2356 let dir = tempfile::tempdir().expect("tempdir");
2357 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2358 dir.path().join("sessions"),
2359 ));
2360 let mut config = Config::default();
2361 config.anthropic.api_key = Some("sk-unused".into());
2362 let app = AppBuilder::new()
2363 .with_config(config)
2364 .with_cwd(dir.path())
2365 .with_builtin_tools()
2366 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2367 .with_session_store(store)
2368 .build()
2369 .await
2370 .expect("build");
2371
2372 for i in 0..4 {
2379 let _: Vec<_> = app
2380 .send_user_message(UserMessage::text(format!("turn {i}")))
2381 .collect()
2382 .await;
2383 }
2384
2385 app.compact().await.expect("compact should succeed");
2386
2387 let history = app.session_history().await.expect("history");
2390 assert!(
2391 !history.is_empty(),
2392 "session should still have content post-compaction"
2393 );
2394 }
2395
/// Checks that `anthropic_api_key_from` prefers the `ANTHROPIC_API_KEY` value
/// from the supplied lookup over the key stored in `Auth`, and that the
/// whitespace-padded value comes back as `sk-env`.
#[test]
fn anthropic_env_api_key_overrides_auth_json_key() {
    let stored = crate::auth::ProviderAuth::ApiKey {
        key: "sk-auth".into(),
    };
    let mut auth = crate::auth::Auth::default();
    auth.0.insert("anthropic".into(), stored);

    // Fake environment: only `ANTHROPIC_API_KEY` is set, with surrounding spaces.
    let key = anthropic_api_key_from(&auth, |name| {
        if name == "ANTHROPIC_API_KEY" {
            Some(" sk-env ".to_string())
        } else {
            None
        }
    });

    assert_eq!(key.as_deref(), Some("sk-env"));
}
2411
2412 #[tokio::test]
2413 async fn with_settings_overrides_deprecated_config_model() {
2414 use crate::settings::Settings;
2415
2416 let mut config = Config::default();
2417 config.model.name = "from-config".into();
2418 config.anthropic.api_key = Some("sk-config".into());
2419
2420 let mut settings = Settings::default();
2421 settings.model.name = "from-settings".into();
2422
2423 let tmp = tempfile::tempdir().unwrap();
2424 let app = AppBuilder::new()
2425 .with_config(config)
2426 .with_settings(settings)
2427 .with_cwd(tmp.path())
2428 .disable_context_discovery()
2429 .with_llm(Arc::new(EchoLlm))
2430 .build()
2431 .await
2432 .expect("build");
2433 assert_eq!(app.config().model.name, "from-settings");
2434 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2435 }
2436
2437 #[tokio::test]
2438 async fn with_settings_synthesises_legacy_config_for_build() {
2439 use crate::auth::{Auth, ProviderAuth};
2440 use crate::settings::Settings;
2441
2442 let mut settings = Settings::default();
2443 settings.model.name = "claude-sonnet-4-6".into();
2444
2445 let mut auth = Auth::default();
2446 auth.0.insert(
2447 "anthropic".into(),
2448 ProviderAuth::ApiKey {
2449 key: "sk-test".into(),
2450 },
2451 );
2452
2453 let tmp = tempfile::tempdir().unwrap();
2454 let app = AppBuilder::new()
2455 .with_settings(settings)
2456 .with_auth(auth)
2457 .with_cwd(tmp.path())
2458 .with_builtin_tools()
2459 .disable_context_discovery()
2460 .with_llm(Arc::new(EchoLlm))
2461 .build()
2462 .await
2463 .expect("build");
2464 let _ = app;
2465 }
2466
2467 #[tokio::test]
2468 async fn cancel_before_turn_does_not_poison_future_turns() {
2469 let dir = tempfile::tempdir().unwrap();
2470 let mut cfg = Config::default();
2471 cfg.anthropic.api_key = Some("sk-unused".into());
2472 let app = AppBuilder::new()
2473 .with_config(cfg)
2474 .with_cwd(dir.path())
2475 .with_builtin_tools()
2476 .with_llm(std::sync::Arc::new(EchoLlm))
2477 .build()
2478 .await
2479 .expect("build");
2480
2481 app.cancel();
2482 let events: Vec<UiEvent> = app
2483 .send_user_message(UserMessage::text("x"))
2484 .collect()
2485 .await;
2486
2487 assert!(
2488 events
2489 .iter()
2490 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2491 "turn should use a fresh cancellation token: {events:?}"
2492 );
2493 }
2494
/// Feeds interleaved start/complete events for `bash` and `read` through
/// `map_event` and checks that each completion carries the id of its own start
/// (`tool_1` for `bash`, `tool_2` for `read`).
#[test]
fn map_event_matches_started_and_completed_ids_by_tool_name() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    // Small factories so the sequence of events below reads as a script.
    let start = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted {
                name: tool.into(),
                args: serde_json::json!({}),
            }),
            &tracker,
        )
    };
    let complete = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolCompleted {
                name: tool.into(),
                result: motosan_agent_tool::ToolResult::text("ok"),
            }),
            &tracker,
        )
    };

    // Order matters: both tools start before either completes.
    let started_bash = start("bash");
    let started_read = start("read");
    let completed_bash = complete("bash");
    let completed_read = complete("read");

    let is_started = |event: &Option<UiEvent>, want_id: &str, want_name: &str| {
        matches!(
            event,
            Some(UiEvent::ToolCallStarted { id, name, .. }) if id == want_id && name == want_name
        )
    };
    let is_completed = |event: &Option<UiEvent>, want_id: &str| {
        matches!(
            event,
            Some(UiEvent::ToolCallCompleted { id, .. }) if id == want_id
        )
    };

    assert!(is_started(&started_bash, "tool_1", "bash"));
    assert!(is_started(&started_read, "tool_2", "read"));
    assert!(is_completed(&completed_bash, "tool_1"));
    assert!(is_completed(&completed_read, "tool_2"));
}
2545
/// Checks that the `args` of a `CoreEvent::ToolStarted` come out unchanged on the
/// `UiEvent::ToolCallStarted` produced by `map_event`.
#[test]
fn map_event_forwards_tool_started_args() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
    let args = serde_json::json!({"path": "src/main.rs", "limit": 20});

    let event = AgentEvent::Core(CoreEvent::ToolStarted {
        name: "read".into(),
        args: args.clone(),
    });
    let mapped = map_event(event, &tracker);

    if let Some(UiEvent::ToolCallStarted {
        args: forwarded, ..
    }) = &mapped
    {
        assert_eq!(forwarded, &args, "args must round-trip from CoreEvent");
    } else {
        panic!("expected ToolCallStarted; got {mapped:?}");
    }
}
2568
/// Starts `bash` twice before completing it twice and checks that the first
/// completion gets the first start's id, the second completion gets the second
/// start's id, and the two ids differ.
#[test]
fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    let emit = |event: CoreEvent| map_event(AgentEvent::Core(event), &tracker);
    let bash_started = || CoreEvent::ToolStarted {
        name: "bash".into(),
        args: serde_json::json!({}),
    };
    let bash_completed = |text: &'static str| CoreEvent::ToolCompleted {
        name: "bash".into(),
        result: motosan_agent_tool::ToolResult::text(text),
    };

    // All four events are mapped before any result is inspected.
    let first_start = emit(bash_started());
    let second_start = emit(bash_started());
    let first_done = emit(bash_completed("a"));
    let second_done = emit(bash_completed("b"));

    let started_id = |event: Option<UiEvent>| match event {
        Some(UiEvent::ToolCallStarted { id, .. }) => id,
        other => panic!("{other:?}"),
    };
    let completed_id = |event: Option<UiEvent>| match event {
        Some(UiEvent::ToolCallCompleted { id, .. }) => id,
        other => panic!("{other:?}"),
    };

    let id_s1 = started_id(first_start);
    let id_s2 = started_id(second_start);
    let id_c1 = completed_id(first_done);
    let id_c2 = completed_id(second_done);

    assert_eq!(id_s1, id_c1);
    assert_eq!(id_s2, id_c2);
    assert_ne!(id_s1, id_s2);
}
2622
2623 #[tokio::test]
2624 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2625 let dir = tempfile::tempdir().unwrap();
2626 let mut cfg = Config::default();
2627 cfg.anthropic.api_key = Some("sk-unused".into());
2628 let app = AppBuilder::new()
2629 .with_config(cfg)
2630 .with_cwd(dir.path())
2631 .with_builtin_tools()
2632 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2633 turn: AtomicUsize::new(0),
2634 }))
2635 .build()
2636 .await
2637 .expect("build");
2638
2639 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2640 let first_event = first.next().await;
2641 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2642
2643 let second_events: Vec<UiEvent> = app
2644 .send_user_message(UserMessage::text("second"))
2645 .collect()
2646 .await;
2647 assert_eq!(
2648 second_events.len(),
2649 1,
2650 "expected immediate single error event, got: {second_events:?}"
2651 );
2652 assert!(matches!(
2653 &second_events[0],
2654 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2655 ));
2656 }
2657
2658 #[tokio::test]
2659 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2660 let dir = tempfile::tempdir().unwrap();
2666 let mut cfg = Config::default();
2667 cfg.anthropic.api_key = Some("sk-unused".into());
2668 let app = AppBuilder::new()
2669 .with_config(cfg)
2670 .with_cwd(dir.path())
2671 .with_builtin_tools()
2672 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2673 turn: AtomicUsize::new(0),
2674 }))
2675 .build()
2676 .await
2677 .expect("build");
2678
2679 let mut first =
2681 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2682 let first_event = first.next().await;
2683 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2684
2685 let bad = crate::user_message::UserMessage {
2687 text: "second".into(),
2688 attachments: vec![crate::user_message::Attachment::Image {
2689 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2690 }],
2691 };
2692 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2693
2694 assert_eq!(
2695 second_events.len(),
2696 1,
2697 "expected exactly one event (the attachment error); got: {second_events:?}"
2698 );
2699 assert!(
2700 matches!(
2701 &second_events[0],
2702 UiEvent::AttachmentError {
2703 kind: crate::user_message::AttachmentErrorKind::NotFound,
2704 ..
2705 }
2706 ),
2707 "expected AttachmentError::NotFound as first event; got {second_events:?}"
2708 );
2709 }
2710
2711 struct InfiniteToolLlm;
2712
2713 #[async_trait]
2714 impl LlmClient for InfiniteToolLlm {
2715 async fn chat(
2716 &self,
2717 _messages: &[Message],
2718 _tools: &[ToolDef],
2719 ) -> motosan_agent_loop::Result<ChatOutput> {
2720 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2723 ToolCallItem {
2724 id: "loop".into(),
2725 name: "read".into(),
2726 args: serde_json::json!({"path": "nope.txt"}),
2727 },
2728 ])))
2729 }
2730 }
2731
2732 #[tokio::test]
2733 async fn max_iterations_surfaces_as_notice_with_lifecycle_complete() {
2734 let dir = tempfile::tempdir().unwrap();
2735 let mut cfg = Config::default();
2736 cfg.anthropic.api_key = Some("sk-unused".into());
2737 let app = AppBuilder::new()
2738 .with_config(cfg)
2739 .with_cwd(dir.path())
2740 .with_builtin_tools()
2741 .with_llm(std::sync::Arc::new(InfiniteToolLlm))
2742 .with_max_iterations(3)
2743 .build()
2744 .await
2745 .expect("build");
2746
2747 let events: Vec<UiEvent> =
2748 futures::StreamExt::collect(app.send_user_message(UserMessage::text("loop"))).await;
2749
2750 assert!(
2751 !events.iter().any(|e| matches!(e, UiEvent::Error(_))),
2752 "MaxIterations should surface as Notice, not Error; got: {events:?}"
2753 );
2754
2755 let notice = events.iter().find_map(|e| match e {
2756 UiEvent::Notice { title, body } => Some((title.clone(), body.clone())),
2757 _ => None,
2758 });
2759 let (title, body) =
2760 notice.unwrap_or_else(|| panic!("expected Notice event; got: {events:?}"));
2761 let title_lower = title.to_lowercase();
2762 assert!(
2763 title_lower.contains("stop") || title_lower.contains("iteration"),
2764 "notice title should mention stop/iteration; got title={title:?} body={body:?}"
2765 );
2766 assert!(
2767 body.contains("3"),
2768 "body should reference the per-turn cap (3); got body={body:?}"
2769 );
2770 let body_lower = body.to_lowercase();
2771 assert!(
2772 body_lower.contains("continue") || body_lower.contains("send another"),
2773 "body should hint that user can continue; got body={body:?}"
2774 );
2775
2776 assert!(
2777 events
2778 .iter()
2779 .any(|e| matches!(e, UiEvent::AgentTurnComplete)),
2780 "AgentTurnComplete should fire on the soft-cap path; got: {events:?}"
2781 );
2782 }
2783
/// Checks `progress_event_id` at three points: with no tool started it returns
/// `tool_unknown`, with one started tool it returns that tool's id, and after a
/// second tool starts it returns `tool_unknown` again.
#[test]
fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    let start = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted {
                name: tool.into(),
                args: serde_json::json!({}),
            }),
            &tracker,
        )
    };

    // Nothing started yet.
    assert_eq!(progress_event_id(&tracker), "tool_unknown");

    // Exactly one tool started: its id is reported.
    let only_id = match start("bash") {
        Some(UiEvent::ToolCallStarted { id, .. }) => id,
        other => panic!("{other:?}"),
    };
    assert_eq!(progress_event_id(&tracker), only_id);

    // A second tool started as well: the id is no longer claimed.
    let _second = start("read");
    assert_eq!(progress_event_id(&tracker), "tool_unknown");
}
2811
2812 #[tokio::test]
2813 async fn builder_rejects_builtin_and_custom_tools_together() {
2814 let mut cfg = Config::default();
2815 cfg.anthropic.api_key = Some("sk-unused".into());
2816 let dir = tempfile::tempdir().unwrap();
2817 let err = match AppBuilder::new()
2818 .with_config(cfg)
2819 .with_cwd(dir.path())
2820 .with_builtin_tools()
2821 .with_custom_tools_factory(|_| Vec::new())
2822 .build()
2823 .await
2824 {
2825 Ok(_) => panic!("must reject conflicting tool configuration"),
2826 Err(err) => err,
2827 };
2828
2829 assert!(format!("{err}").contains("mutually exclusive"));
2830 }
2831
2832 #[tokio::test]
2834 async fn two_turns_in_same_session_share_history() {
2835 #[derive(Default)]
2836 struct CounterLlm {
2837 turn: AtomicUsize,
2838 }
2839 #[async_trait]
2840 impl LlmClient for CounterLlm {
2841 async fn chat(
2842 &self,
2843 messages: &[Message],
2844 _tools: &[ToolDef],
2845 ) -> motosan_agent_loop::Result<ChatOutput> {
2846 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2847 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2848 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2849 }
2850 }
2851
2852 let tmp = tempfile::tempdir().unwrap();
2853 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2854 tmp.path().to_path_buf(),
2855 ));
2856
2857 let app = AppBuilder::new()
2858 .with_settings(crate::settings::Settings::default())
2859 .with_auth(crate::auth::Auth::default())
2860 .with_cwd(tmp.path())
2861 .with_builtin_tools()
2862 .disable_context_discovery()
2863 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2864 .with_session_store(store)
2865 .build_with_session(None)
2866 .await
2867 .expect("build");
2868
2869 let _events1: Vec<UiEvent> = app
2870 .send_user_message(UserMessage::text("hi"))
2871 .collect()
2872 .await;
2873 let events2: Vec<UiEvent> = app
2874 .send_user_message(UserMessage::text("again"))
2875 .collect()
2876 .await;
2877
2878 let saw_more_than_one = events2.iter().any(|e| {
2880 matches!(
2881 e,
2882 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2883 )
2884 });
2885 assert!(
2886 saw_more_than_one,
2887 "second turn should have seen history; events: {events2:?}"
2888 );
2889 }
2890}
2891
/// Tests for the skill-related setters on `AppBuilder`.
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// Builds a minimal global skill named `x` for use in the tests below.
    fn sample_skill() -> Skill {
        let file_path = PathBuf::from("/x.md");
        let base_dir = PathBuf::from("/");
        Skill {
            name: "x".into(),
            description: "d".into(),
            file_path,
            base_dir,
            disable_model_invocation: false,
            source: SkillSource::Global,
        }
    }

    /// `with_skills` keeps the skills it is given on the builder.
    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);
        let stored = &builder.skills;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].name, "x");
    }

    /// `without_skills` leaves the builder with no skills, even after `with_skills`.
    #[test]
    fn without_skills_clears() {
        let seeded = AppBuilder::new().with_skills(vec![sample_skill()]);
        let cleared = seeded.without_skills();
        assert!(cleared.skills.is_empty());
    }
}
2924
/// Tests for `AppBuilder::with_extra_tools`.
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::Tool;
    use std::future::Future;
    use std::pin::Pin;

    /// Minimal `Tool` implementation: fixed definition, always replies "ok".
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> motosan_agent_tool::ToolDef {
            let input_schema = serde_json::json!({"type": "object"});
            motosan_agent_tool::ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &motosan_agent_tool::ToolContext,
        ) -> Pin<Box<dyn Future<Output = motosan_agent_tool::ToolResult> + Send + '_>> {
            let reply = async { motosan_agent_tool::ToolResult::text("ok") };
            Box::pin(reply)
        }
    }

    /// `with_extra_tools` keeps the tools it is given on the builder.
    #[test]
    fn with_extra_tools_stores_tools() {
        let fake: Arc<dyn Tool> = Arc::new(FakeTool);
        let builder = AppBuilder::new().with_extra_tools(vec![fake]);
        assert_eq!(builder.extra_tools.len(), 1);
    }
}
2957}