1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate, PromptStrategy};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
23#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27pub struct TurnStatsAccum {
28 pub cumulative_input: u64,
29 pub cumulative_output: u64,
30 pub turn_count: u64,
31}
32
33impl TurnStatsAccum {
34 pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38 self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39 self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40 self.turn_count = self.turn_count.saturating_add(1);
41 }
42}
43
44pub(crate) fn extract_thinking_events(messages: &[motosan_agent_loop::Message]) -> Vec<UiEvent> {
52 let mut events = Vec::new();
53 for msg in messages {
54 if let motosan_agent_loop::Message::Assistant { content, .. } = msg {
55 for part in content {
56 if let motosan_agent_loop::AssistantContent::Reasoning { text, .. } = part {
57 events.push(UiEvent::ThinkingComplete { text: text.clone() });
58 }
59 }
60 }
61 }
62 events
63}
64
65pub(crate) fn extract_new_thinking_events(
70 messages: &[motosan_agent_loop::Message],
71 previous_len: usize,
72) -> Vec<UiEvent> {
73 extract_thinking_events(messages.get(previous_len..).unwrap_or(&[]))
74}
75
76#[derive(Debug, Clone)]
78pub(crate) enum SessionMode {
79 New,
81 Resume(String),
83}
84
85struct SharedLlm {
90 client: Arc<dyn LlmClient>,
91}
92
93impl SharedLlm {
94 fn new(client: Arc<dyn LlmClient>) -> Self {
95 Self { client }
96 }
97
98 fn client(&self) -> Arc<dyn LlmClient> {
99 Arc::clone(&self.client)
100 }
101}
102
103pub(crate) struct SessionFactory {
104 cwd: PathBuf,
105 settings: Arc<Mutex<crate::settings::Settings>>,
106 auth: crate::auth::Auth,
107 policy: Arc<crate::permissions::Policy>,
108 session_cache: Arc<crate::permissions::SessionCache>,
109 ui_tx: Option<mpsc::Sender<UiEvent>>,
110 headless_permissions: bool,
111 permission_gate: Arc<dyn PermissionGate>,
112 permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
113 progress_tx: mpsc::Sender<ToolProgressChunk>,
116 skills: Arc<Vec<crate::skills::Skill>>,
117 install_builtin_tools: bool,
118 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
119 max_iterations: usize,
120 context_discovery_disabled: bool,
121 autocompact_enabled: bool,
122 session_store: Option<Arc<dyn SessionStore>>,
123 llm_override: Option<Arc<dyn LlmClient>>,
124 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
125 cancel_token: SharedCancelToken,
126}
127
128impl SessionFactory {
129 fn settings(&self) -> crate::settings::Settings {
130 match self.settings.lock() {
131 Ok(guard) => guard.clone(),
132 Err(poisoned) => poisoned.into_inner().clone(),
133 }
134 }
135
136 fn store_settings(&self, settings: crate::settings::Settings) {
137 match self.settings.lock() {
138 Ok(mut guard) => *guard = settings,
139 Err(poisoned) => *poisoned.into_inner() = settings,
140 }
141 }
142
143 fn current_model(&self) -> Option<crate::model::ModelId> {
144 match self.current_model.lock() {
145 Ok(guard) => guard.clone(),
146 Err(poisoned) => poisoned.into_inner().clone(),
147 }
148 }
149
150 fn set_current_model(&self, model: crate::model::ModelId) {
151 match self.current_model.lock() {
152 Ok(mut guard) => *guard = Some(model),
153 Err(poisoned) => *poisoned.into_inner() = Some(model),
154 }
155 }
156
157 fn clear_current_model(&self) {
158 match self.current_model.lock() {
159 Ok(mut guard) => *guard = None,
160 Err(poisoned) => *poisoned.into_inner() = None,
161 }
162 }
163
164 async fn build(
171 &self,
172 mode: SessionMode,
173 model_override: Option<&crate::model::ModelId>,
174 settings_override: Option<&crate::settings::Settings>,
175 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
176 let effective_model = model_override.cloned().or_else(|| {
180 if settings_override.is_some() {
181 None
182 } else {
183 self.current_model()
184 }
185 });
186 let mut settings = settings_override
187 .cloned()
188 .unwrap_or_else(|| self.settings());
189 if let Some(m) = &effective_model {
190 settings.model.name = m.as_str().to_string();
191 }
192
193 let llm = if effective_model.is_none() {
194 self.llm_override.as_ref().map_or_else(
195 || build_llm_client(&settings, &self.auth),
196 |llm| Ok(Arc::clone(llm)),
197 )?
198 } else {
199 build_llm_client(&settings, &self.auth)?
200 };
201
202 let tool_ctx = ToolCtx::new_with_cancel_token(
205 &self.cwd,
206 Arc::clone(&self.permission_gate),
207 self.progress_tx.clone(),
208 self.cancel_token.clone(),
209 );
210 let mut tools = if self.install_builtin_tools {
211 builtin_tools(tool_ctx.clone())
212 } else {
213 Vec::new()
214 };
215 tools.extend(self.extra_tools.iter().cloned());
216
217 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
218 let base_prompt = build_system_prompt(&tool_names, &self.skills);
219 let system_prompt = if self.context_discovery_disabled {
220 base_prompt
221 } else {
222 let agent_dir = crate::paths::agent_dir();
223 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
224 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
225 };
226 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
227
228 let mut engine_builder = Engine::builder()
229 .max_iterations(self.max_iterations)
230 .system_prompt(system_prompt)
231 .tool_context(motosan_tool_context);
232 for tool in tools {
233 engine_builder = engine_builder.tool(tool);
234 }
235 if let Some(ui_tx) = &self.ui_tx {
236 let ext = if let Some(handle) = &self.permission_strategy_handle {
237 crate::permissions::PermissionExtension::with_strategy_handle(
238 Arc::clone(&self.policy),
239 Arc::clone(&self.session_cache),
240 self.cwd.clone(),
241 Some(ui_tx.clone()),
242 Arc::clone(handle),
243 )
244 } else {
245 crate::permissions::PermissionExtension::new(
246 Arc::clone(&self.policy),
247 Arc::clone(&self.session_cache),
248 self.cwd.clone(),
249 ui_tx.clone(),
250 )
251 };
252 engine_builder = engine_builder.extension(Box::new(ext));
253 } else if self.headless_permissions {
254 let ext = if let Some(handle) = &self.permission_strategy_handle {
255 crate::permissions::PermissionExtension::with_strategy_handle(
256 Arc::clone(&self.policy),
257 Arc::clone(&self.session_cache),
258 self.cwd.clone(),
259 None,
260 Arc::clone(handle),
261 )
262 } else {
263 crate::permissions::PermissionExtension::headless(
264 Arc::clone(&self.policy),
265 Arc::clone(&self.session_cache),
266 self.cwd.clone(),
267 )
268 };
269 engine_builder = engine_builder.extension(Box::new(ext));
270 }
271 if self.autocompact_enabled
272 && settings.session.compact_at_context_pct > 0.0
273 && settings.session.compact_at_context_pct < 1.0
274 {
275 let cfg = AutocompactConfig {
276 threshold: settings.session.compact_at_context_pct,
277 max_context_tokens: settings.session.max_context_tokens,
278 keep_turns: settings.session.keep_turns.max(1),
279 };
280 engine_builder = engine_builder
281 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
282 }
283 let engine = engine_builder.build();
284
285 let session = match (&mode, &self.session_store) {
286 (SessionMode::Resume(id), Some(store)) => {
287 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
288 .await
289 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
290 let entries = s
291 .entries()
292 .await
293 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
294 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
295 s
296 }
297 (SessionMode::Resume(_), None) => {
298 return Err(AppError::Config("resume requires a session store".into()));
299 }
300 (SessionMode::New, Some(store)) => {
301 let id = crate::session::SessionId::new();
302 AgentSession::new_with_store(
303 id.into_string(),
304 Arc::clone(store),
305 engine,
306 Arc::clone(&llm),
307 )
308 }
309 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
310 };
311
312 Ok((session, llm))
313 }
314}
315
316pub struct App {
317 session: arc_swap::ArcSwap<AgentSession>,
318 llm: arc_swap::ArcSwap<SharedLlm>,
319 factory: SessionFactory,
320 config: Config,
321 cancel_token: SharedCancelToken,
322 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
323 next_tool_id: Arc<Mutex<ToolCallTracker>>,
324 skills: Arc<Vec<crate::skills::Skill>>,
325 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
326 pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
328 pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
333 extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
334 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
335 pub(crate) permission_mode: Arc<tokio::sync::RwLock<crate::permissions::PermissionMode>>,
338 pub(crate) permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
341 pub(crate) ui_tx_owned: Option<mpsc::Sender<UiEvent>>,
344}
345
346impl App {
347 pub fn config(&self) -> &Config {
348 &self.config
349 }
350
351 pub fn cancel(&self) {
355 self.cancel_token.cancel();
356 }
357
358 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
362 Arc::clone(&self.session_cache)
363 }
364
365 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
368 Arc::clone(&self.extension_registry)
369 }
370
371 pub fn settings(&self) -> crate::settings::Settings {
373 let mut settings = self.factory.settings();
374 if let Some(model) = self.factory.current_model() {
375 settings.model.name = model.to_string();
376 }
377 settings
378 }
379
380 pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
382 Arc::clone(&self.token_tally)
383 }
384
385 pub async fn set_permission_mode(&self, mode: crate::permissions::PermissionMode) {
390 let new_strategy = match mode {
391 crate::permissions::PermissionMode::Bypass => PromptStrategy::AllowAll,
392 crate::permissions::PermissionMode::AcceptEdits => PromptStrategy::AcceptEdits,
393 crate::permissions::PermissionMode::Prompt if self.ui_tx_owned.is_none() => {
394 PromptStrategy::HeadlessDeny
395 }
396 crate::permissions::PermissionMode::Prompt => PromptStrategy::Prompt,
397 };
398
399 if let Some(handle) = &self.permission_strategy_handle {
400 *handle.write().await = new_strategy;
401 }
402
403 *self.permission_mode.write().await = mode;
404
405 if let Some(ui_tx) = &self.ui_tx_owned {
406 let _ = ui_tx.send(UiEvent::PermissionModeChanged { mode }).await;
407 }
408 }
409
410 pub async fn emit_settings_snapshot(&self) {
414 if let Some(ui_tx) = &self.ui_tx_owned {
415 let _ = ui_tx
416 .send(UiEvent::SettingsSnapshot {
417 settings: self.settings(),
418 })
419 .await;
420 }
421 }
422
423 pub async fn permission_mode(&self) -> crate::permissions::PermissionMode {
425 *self.permission_mode.read().await
426 }
427
428 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
430 Arc::clone(&self.extension_diagnostics)
431 }
432
433 pub fn session_id(&self) -> String {
436 self.session.load().session_id().to_string()
437 }
438
439 pub async fn session_history(
448 &self,
449 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
450 self.session.load_full().history().await
451 }
452
453 pub async fn compact(&self) -> Result<()> {
459 use motosan_agent_loop::ThresholdStrategy;
460 let strategy = ThresholdStrategy {
461 threshold: 0.0,
462 ..ThresholdStrategy::default()
463 };
464 let llm = self.llm.load_full().client();
465 self.session
466 .load_full()
467 .maybe_compact(&strategy, llm)
468 .await
469 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
470 Ok(())
471 }
472
473 pub async fn new_session(&self) -> Result<()> {
477 self.fire_session_before_switch("new", None).await?;
478 let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
479 self.session.store(Arc::new(session));
480 self.llm.store(Arc::new(SharedLlm::new(llm)));
481 self.reset_token_tally().await;
482 Ok(())
483 }
484
485 pub async fn load_session(&self, id: &str) -> Result<()> {
491 self.fire_session_before_switch("load", Some(id)).await?;
492 self.load_session_without_hook(id).await
493 }
494
495 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
496 let (session, llm) = self
497 .factory
498 .build(SessionMode::Resume(id.to_string()), None, None)
499 .await?;
500 self.session.store(Arc::new(session));
501 self.llm.store(Arc::new(SharedLlm::new(llm)));
502 self.reset_token_tally().await;
503 Ok(())
504 }
505
506 async fn reset_token_tally(&self) {
507 let mut tally = self.token_tally.lock().await;
508 *tally = TurnStatsAccum::default();
509 }
510
511 pub async fn clone_session(&self) -> Result<String> {
520 self.fire_session_before_switch("clone", None).await?;
521 let Some(store) = self.factory.session_store.as_ref() else {
522 return Err(AppError::Config("clone requires a session store".into()));
523 };
524 let source_id = self.session.load().session_id().to_string();
525 let new_id = crate::session::SessionId::new().into_string();
526 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
527 catalog
528 .fork(&source_id, &new_id)
529 .await
530 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
531 self.load_session_without_hook(&new_id).await?;
532 Ok(new_id)
533 }
534
535 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
550 self.fire_session_before_switch("model_switch", None)
551 .await?;
552 let current_id = self.session.load().session_id().to_string();
553 let (session, llm) = self
554 .factory
555 .build(SessionMode::Resume(current_id), Some(model), None)
556 .await?;
557 self.factory.set_current_model(model.clone());
558 self.session.store(Arc::new(session));
559 self.llm.store(Arc::new(SharedLlm::new(llm)));
560 Ok(())
561 }
562
563 pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
579 let current_id = self.session.load().session_id().to_string();
583 let (session, llm) = self
584 .factory
585 .build(SessionMode::Resume(current_id), None, Some(&new_settings))
586 .await?;
587 self.factory.store_settings(new_settings);
588 self.factory.clear_current_model();
589 self.session.store(Arc::new(session));
590 self.llm.store(Arc::new(SharedLlm::new(llm)));
591
592 self.reset_token_tally().await;
595
596 Ok(())
597 }
598
599 pub async fn disconnect_mcp(&self) {
602 for (name, server) in &self.mcp_servers {
603 let _ =
604 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
605 tracing::debug!(target: "mcp", server = %name, "disconnected");
606 }
607 }
608
609 fn run_turn(
610 &self,
611 msg: crate::user_message::UserMessage,
612 fork_from: Option<motosan_agent_loop::EntryId>,
613 ) -> impl Stream<Item = UiEvent> + Send + 'static {
614 let session = self.session.load_full();
615 let skills = Arc::clone(&self.skills);
616 let cancel_token = self.cancel_token.clone();
617 let tracker = Arc::clone(&self.next_tool_id);
618 let progress = Arc::clone(&self.progress_rx);
619 let token_tally = Arc::clone(&self.token_tally);
620 let settings_model_name = self
621 .factory
622 .current_model()
623 .map(|model| model.to_string())
624 .unwrap_or_else(|| self.factory.settings().model.name);
625
626 async_stream::stream! {
627 let new_user = {
631 let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
635 let expanded_msg = crate::user_message::UserMessage {
636 text: expanded_text,
637 attachments: msg.attachments.clone(),
638 };
639 match crate::user_message::prepare_user_message(&expanded_msg) {
640 Ok(m) => m,
641 Err(err) => {
642 yield UiEvent::AttachmentError {
643 kind: err.kind(),
644 message: err.to_string(),
645 };
646 return;
647 }
648 }
649 };
650
651 let mut progress_guard = match progress.try_lock() {
653 Ok(guard) => guard,
654 Err(_) => {
655 yield UiEvent::Error(
656 "another turn is already running; capo is single-turn-per-App".into(),
657 );
658 return;
659 }
660 };
661
662 let cancel = cancel_token.reset();
664
665 yield UiEvent::AgentTurnStarted;
666 yield UiEvent::AgentThinking;
667
668 let handle = match fork_from {
670 None => {
671 let history = match session.history().await {
673 Ok(h) => h,
674 Err(err) => {
675 yield UiEvent::Error(format!("session.history failed: {err}"));
676 return;
677 }
678 };
679 let mut messages = history;
680 messages.push(new_user);
681 match session.start_turn(messages).await {
682 Ok(h) => h,
683 Err(err) => {
684 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
685 return;
686 }
687 }
688 }
689 Some(from) => {
690 match session.fork_turn(from, vec![new_user]).await {
692 Ok(h) => h,
693 Err(err) => {
694 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
695 return;
696 }
697 }
698 }
699 };
700 let previous_len = handle.previous_len;
701 let epoch = handle.epoch;
702 let branch_parent = handle.branch_parent;
703 let ops_tx = handle.ops_tx.clone();
704 let mut agent_stream = handle.stream;
705
706 let interrupt_bridge = tokio::spawn(async move {
714 cancel.cancelled().await;
715 let _ = ops_tx.send(AgentOp::Interrupt).await;
716 });
717
718 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
720 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
721
722 loop {
723 while let Ok(chunk) = progress_guard.try_recv() {
725 yield UiEvent::ToolCallProgress {
726 id: progress_event_id(&tracker),
727 chunk: ProgressChunk::from(chunk),
728 };
729 }
730
731 tokio::select! {
732 biased;
733 maybe_item = agent_stream.next() => {
734 match maybe_item {
735 Some(AgentStreamItem::Event(ev)) => {
736 if let Some(ui) = map_event(ev, &tracker) {
737 yield ui;
738 }
739 }
740 Some(AgentStreamItem::Terminal(term)) => {
741 terminal_result = Some(term.result);
742 terminal_messages = Some(term.messages);
743 break;
744 }
745 None => break,
746 }
747 }
748 Some(chunk) = progress_guard.recv() => {
749 yield UiEvent::ToolCallProgress {
750 id: progress_event_id(&tracker),
751 chunk: ProgressChunk::from(chunk),
752 };
753 }
754 }
755 }
756
757 interrupt_bridge.abort();
759
760 if let Some(msgs) = terminal_messages.as_ref() {
762 if let Err(err) = session
763 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
764 .await
765 {
766 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
767 }
768 }
769
770 match terminal_result {
772 Some(Ok(result)) => {
773 if let Some(msgs) = terminal_messages.as_ref() {
776 for ev in extract_new_thinking_events(msgs, previous_len) {
777 yield ev;
778 }
779 }
780
781 let final_text = terminal_messages
782 .as_ref()
783 .and_then(|msgs| {
784 msgs.iter()
785 .rev()
786 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
787 .map(|m| m.text())
788 })
789 .unwrap_or_default();
790 if !final_text.is_empty() {
791 yield UiEvent::AgentMessageComplete(final_text);
792 }
793 let usage = result.usage;
796 let (cumulative_input, cumulative_output) = {
797 let mut tally = token_tally.lock().await;
798 tally.add(usage);
799 (tally.cumulative_input, tally.cumulative_output)
800 };
801 yield UiEvent::TurnStats {
802 input_tokens: usage.input_tokens,
803 output_tokens: usage.output_tokens,
804 cumulative_input,
805 cumulative_output,
806 model: settings_model_name.clone(),
807 };
808 while let Ok(chunk) = progress_guard.try_recv() {
810 yield UiEvent::ToolCallProgress {
811 id: progress_event_id(&tracker),
812 chunk: ProgressChunk::from(chunk),
813 };
814 }
815 yield UiEvent::AgentTurnComplete;
816 }
817 Some(Err(err)) => {
818 yield UiEvent::Error(format!("{err}"));
819 }
820 None => { }
821 }
822 }
823 }
824
825 pub fn send_user_message(
826 &self,
827 msg: crate::user_message::UserMessage,
828 ) -> impl Stream<Item = UiEvent> + Send + 'static {
829 self.run_turn(msg, None)
830 }
831
832 pub fn fork_from(
837 &self,
838 from: motosan_agent_loop::EntryId,
839 message: crate::user_message::UserMessage,
840 ) -> impl Stream<Item = UiEvent> + Send + 'static {
841 let registry = Arc::clone(&self.extension_registry);
842 let inner = self.run_turn(message, Some(from));
843 async_stream::stream! {
844 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
845 match dispatch_session_before_switch(®istry, "fork", None).await {
846 HookOutcome::Continue => {}
847 HookOutcome::Cancelled { extension_name, reason } => {
848 let msg = match reason {
849 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
850 None => format!("extension `{extension_name}` cancelled fork"),
851 };
852 yield UiEvent::Error(msg);
853 return;
854 }
855 }
856 let mut inner = Box::pin(inner);
857 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
858 yield ev;
859 }
860 }
861 }
862
863 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
867 let entries = self
868 .session
869 .load_full()
870 .entries()
871 .await
872 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
873 let branch = motosan_agent_loop::active_branch(&entries);
874 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
875 .iter()
876 .filter_map(|stored| {
877 let msg = stored.entry.as_message()?;
878 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
879 return None;
880 }
881 let preview: String = msg
882 .text()
883 .lines()
884 .next()
885 .unwrap_or("")
886 .chars()
887 .take(80)
888 .collect();
889 Some((stored.id.clone(), preview))
890 })
891 .collect();
892 out.reverse();
893 Ok(out)
894 }
895
896 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
898 self.session
899 .load_full()
900 .branches()
901 .await
902 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
903 }
904
905 async fn fire_session_before_switch(
908 &self,
909 reason: &str,
910 session_id: Option<&str>,
911 ) -> Result<()> {
912 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
913 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
914 HookOutcome::Continue => Ok(()),
915 HookOutcome::Cancelled {
916 extension_name,
917 reason,
918 } => Err(AppError::HookCancelled {
919 extension_name,
920 reason,
921 }),
922 }
923 }
924}
925
926#[derive(Debug, Default)]
927struct ToolCallTracker {
928 next_id: usize,
929 pending: VecDeque<(String, String)>,
930}
931
932impl ToolCallTracker {
933 fn start(&mut self, name: &str) -> String {
934 self.next_id += 1;
935 let id = format!("tool_{}", self.next_id);
936 self.pending.push_back((name.to_string(), id.clone()));
937 id
938 }
939
940 fn complete(&mut self, name: &str) -> String {
941 if let Some(pos) = self
942 .pending
943 .iter()
944 .position(|(pending_name, _)| pending_name == name)
945 {
946 if let Some((_, id)) = self.pending.remove(pos) {
947 return id;
948 }
949 }
950
951 self.next_id += 1;
952 format!("tool_{}", self.next_id)
953 }
954
955 fn progress_id(&self) -> Option<String> {
960 match self.pending.len() {
961 1 => self.pending.front().map(|(_, id)| id.clone()),
962 _ => None,
963 }
964 }
965}
966
967fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
968 match tracker.lock() {
969 Ok(guard) => guard,
970 Err(poisoned) => poisoned.into_inner(),
971 }
972}
973
974fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
975 lock_tool_tracker(tracker)
976 .progress_id()
977 .unwrap_or_else(|| "tool_unknown".to_string())
978}
979
980fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
981where
982 F: Fn(&str) -> Option<String>,
983{
984 env_lookup("ANTHROPIC_API_KEY")
985 .map(|key| key.trim().to_string())
986 .filter(|key| !key.is_empty())
987 .or_else(|| auth.api_key("anthropic").map(str::to_string))
988}
989
990fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
991 match ev {
992 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
993 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
994 let id = lock_tool_tracker(tool_tracker).start(&name);
995 Some(UiEvent::ToolCallStarted {
996 id,
997 name,
998 args: serde_json::json!({}),
999 })
1000 }
1001 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
1002 let id = lock_tool_tracker(tool_tracker).complete(&name);
1003 Some(UiEvent::ToolCallCompleted {
1004 id,
1005 result: UiToolResult {
1006 is_error: result.is_error,
1007 text: format!("{name}: {result:?}"),
1008 },
1009 })
1010 }
1011 _ => None,
1012 }
1013}
1014
1015type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
1016
1017pub struct AppBuilder {
1018 config: Option<Config>,
1019 cwd: Option<PathBuf>,
1020 permission_gate: Option<Arc<dyn PermissionGate>>,
1021 install_builtin_tools: bool,
1022 max_iterations: usize,
1023 llm_override: Option<Arc<dyn LlmClient>>,
1024 custom_tools_factory: Option<CustomToolsFactory>,
1025 permissions_policy_path: Option<PathBuf>,
1026 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
1027 headless_permissions: bool,
1028 settings: Option<crate::settings::Settings>,
1029 auth: Option<crate::auth::Auth>,
1030 context_discovery_disabled: bool,
1031 session_store: Option<Arc<dyn SessionStore>>,
1033 resume_session_id: Option<crate::session::SessionId>,
1034 autocompact_enabled: bool,
1035 skills: Vec<crate::skills::Skill>,
1037 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
1039 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1040 extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
1041 extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
1042 token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
1043}
1044
1045impl Default for AppBuilder {
1046 fn default() -> Self {
1047 Self {
1048 config: None,
1049 cwd: None,
1050 permission_gate: None,
1051 install_builtin_tools: false,
1052 max_iterations: 20,
1053 llm_override: None,
1054 custom_tools_factory: None,
1055 permissions_policy_path: None,
1056 ui_tx: None,
1057 headless_permissions: false,
1058 settings: None,
1059 auth: None,
1060 context_discovery_disabled: false,
1061 session_store: None,
1062 resume_session_id: None,
1063 autocompact_enabled: false,
1064 skills: Vec::new(),
1065 extra_tools: Vec::new(),
1066 mcp_servers: Vec::new(),
1067 extension_registry: None,
1068 extension_diagnostics: None,
1069 token_tally: None,
1070 }
1071 }
1072}
1073
1074impl AppBuilder {
1075 pub fn new() -> Self {
1076 Self::default()
1077 }
1078
1079 pub fn with_config(mut self, cfg: Config) -> Self {
1080 self.config = Some(cfg);
1081 self
1082 }
1083
1084 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
1085 self.cwd = Some(cwd.into());
1086 self
1087 }
1088
1089 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
1090 self.permission_gate = Some(gate);
1091 self
1092 }
1093
1094 pub fn with_builtin_tools(mut self) -> Self {
1100 self.install_builtin_tools = true;
1101 self
1102 }
1103
1104 pub fn with_max_iterations(mut self, n: usize) -> Self {
1105 self.max_iterations = n;
1106 self
1107 }
1108
1109 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
1110 self.llm_override = Some(llm);
1111 self
1112 }
1113
1114 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1115 self.permissions_policy_path = Some(path);
1116 self
1117 }
1118
1119 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1120 self.ui_tx = Some(tx);
1121 self
1122 }
1123
1124 pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1126 self.token_tally = Some(tally);
1127 self
1128 }
1129
1130 pub fn with_headless_permissions(mut self) -> Self {
1135 self.headless_permissions = true;
1136 self
1137 }
1138
1139 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1141 self.settings = Some(settings);
1142 self
1143 }
1144
1145 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1147 self.auth = Some(auth);
1148 self
1149 }
1150
1151 pub fn disable_context_discovery(mut self) -> Self {
1154 self.context_discovery_disabled = true;
1155 self
1156 }
1157
1158 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1161 self.session_store = Some(store);
1162 self
1163 }
1164
1165 pub fn with_autocompact(mut self) -> Self {
1170 self.autocompact_enabled = true;
1171 self
1172 }
1173
1174 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1180 self.skills = skills;
1181 self
1182 }
1183
1184 pub fn without_skills(mut self) -> Self {
1185 self.skills.clear();
1186 self
1187 }
1188
1189 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1193 self.extra_tools = tools;
1194 self
1195 }
1196
1197 pub fn with_extension_registry(
1198 mut self,
1199 registry: Arc<crate::extensions::ExtensionRegistry>,
1200 ) -> Self {
1201 self.extension_registry = Some(registry);
1202 self
1203 }
1204
1205 pub fn with_extension_diagnostics(
1206 mut self,
1207 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1208 ) -> Self {
1209 self.extension_diagnostics = Some(diagnostics);
1210 self
1211 }
1212
1213 pub fn with_mcp_servers(
1216 mut self,
1217 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1218 ) -> Self {
1219 self.mcp_servers = servers;
1220 self
1221 }
1222
1223 pub fn with_custom_tools_factory(
1228 mut self,
1229 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1230 ) -> Self {
1231 self.custom_tools_factory = Some(Box::new(factory));
1232 self
1233 }
1234
1235 pub async fn build_with_custom_tools(
1239 self,
1240 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1241 ) -> Result<App> {
1242 self.with_custom_tools_factory(factory).build().await
1243 }
1244
1245 pub async fn build_with_session(
1253 mut self,
1254 resume: Option<crate::session::SessionId>,
1255 ) -> Result<App> {
1256 if let Some(id) = resume {
1257 if self.session_store.is_none() {
1258 return Err(AppError::Config(
1259 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1260 ));
1261 }
1262 self.resume_session_id = Some(id);
1263 }
1264 self.build_internal().await
1265 }
1266
1267 pub async fn build(self) -> Result<App> {
1269 self.build_with_session(None).await
1270 }
1271
1272 async fn build_internal(mut self) -> Result<App> {
1273 let mcp_servers = std::mem::take(&mut self.mcp_servers);
1274 let extra_tools = std::mem::take(&mut self.extra_tools);
1275 let skills = self.skills.clone();
1276 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1277 return Err(AppError::Config(
1278 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1279 ));
1280 }
1281
1282 let has_config = self.config.is_some();
1286 let has_auth = self.auth.is_some();
1287 let mut config = self.config.unwrap_or_default();
1288 let settings = match self.settings {
1289 Some(settings) => settings,
1290 None => {
1291 let mut settings = crate::settings::Settings::default();
1292 settings.model.provider = config.model.provider.clone();
1293 settings.model.name = config.model.name.clone();
1294 settings.model.max_tokens = config.model.max_tokens;
1295 settings
1296 }
1297 };
1298 config.model.provider = settings.model.provider.clone();
1299 config.model.name = settings.model.name.clone();
1300 config.model.max_tokens = settings.model.max_tokens;
1301 let mut auth = self.auth.unwrap_or_default();
1302 if !has_auth {
1303 if let Some(key) = config.anthropic.api_key.as_deref() {
1304 auth.0.insert(
1305 "anthropic".into(),
1306 crate::auth::ProviderAuth::ApiKey {
1307 key: key.to_string(),
1308 },
1309 );
1310 }
1311 }
1312 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1313 if env_or_auth_key.is_some() || has_auth || !has_config {
1314 config.anthropic.api_key = env_or_auth_key;
1315 }
1316 let cwd = self
1317 .cwd
1318 .or_else(|| std::env::current_dir().ok())
1319 .unwrap_or_else(|| PathBuf::from("."));
1320 let agent_dir = crate::paths::agent_dir();
1321 let permission_gate = self.permission_gate.unwrap_or_else(|| {
1322 if self.ui_tx.is_some() || self.headless_permissions {
1326 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1327 } else {
1328 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1329 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1330 }
1331 });
1332
1333 let policy: Arc<crate::permissions::Policy> =
1335 Arc::new(match self.permissions_policy_path.as_ref() {
1336 Some(path) => crate::permissions::Policy::load_or_default(path)?,
1337 None => crate::permissions::Policy::default(),
1338 });
1339 let session_cache = Arc::new(crate::permissions::SessionCache::new());
1340 let permission_strategy_handle = if self.ui_tx.is_some() || self.headless_permissions {
1341 let initial_strategy = if self.ui_tx.is_some() {
1342 PromptStrategy::Prompt
1343 } else {
1344 PromptStrategy::HeadlessDeny
1345 };
1346 Some(Arc::new(tokio::sync::RwLock::new(initial_strategy)))
1347 } else {
1348 None
1349 };
1350 let permission_mode = Arc::new(tokio::sync::RwLock::new(
1351 crate::permissions::PermissionMode::default(),
1352 ));
1353 let token_tally = self
1354 .token_tally
1355 .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1356
1357 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1359
1360 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1365 let cancel_token = probe_ctx.cancel_token.clone();
1366 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1367 if let Some(factory_fn) = self.custom_tools_factory.take() {
1368 let mut t = factory_fn(probe_ctx);
1369 t.extend(extra_tools.clone());
1370 (false, t)
1371 } else {
1372 (self.install_builtin_tools, extra_tools.clone())
1373 };
1374
1375 let factory = SessionFactory {
1376 cwd: cwd.clone(),
1377 settings: Arc::new(Mutex::new(settings.clone())),
1378 auth: auth.clone(),
1379 policy: Arc::clone(&policy),
1380 session_cache: Arc::clone(&session_cache),
1381 ui_tx: self.ui_tx.clone(),
1382 headless_permissions: self.headless_permissions,
1383 permission_gate: Arc::clone(&permission_gate),
1384 permission_strategy_handle: permission_strategy_handle.clone(),
1385 progress_tx: progress_tx.clone(),
1386 skills: Arc::new(skills.clone()),
1387 install_builtin_tools: install_builtin,
1388 extra_tools: factory_extra_tools,
1389 max_iterations: self.max_iterations,
1390 context_discovery_disabled: self.context_discovery_disabled,
1391 autocompact_enabled: self.autocompact_enabled,
1392 session_store: self.session_store.clone(),
1393 llm_override: self.llm_override.clone(),
1394 current_model: Arc::new(Mutex::new(None)),
1395 cancel_token: cancel_token.clone(),
1396 };
1397
1398 let mode = match self.resume_session_id.take() {
1399 Some(id) => SessionMode::Resume(id.into_string()),
1400 None => SessionMode::New,
1401 };
1402 let (session, llm) = factory.build(mode, None, None).await?;
1403 let (extension_registry, extension_diagnostics) =
1404 if let Some(reg) = self.extension_registry.take() {
1405 let diagnostics = self
1406 .extension_diagnostics
1407 .unwrap_or_else(|| Arc::new(Vec::new()));
1408 (reg, diagnostics)
1409 } else {
1410 let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1411 let (registry, diagnostics) =
1412 crate::extensions::load_extensions_manifest(&manifest_path).await;
1413 for d in &diagnostics {
1414 let message = d.message.as_str();
1415 match d.severity {
1416 crate::extensions::DiagnosticSeverity::Warn => {
1417 tracing::warn!("extensions: {message}");
1418 }
1419 crate::extensions::DiagnosticSeverity::Error => {
1420 tracing::error!("extensions: {message}");
1421 }
1422 }
1423 }
1424 (Arc::new(registry), Arc::new(diagnostics))
1425 };
1426
1427 Ok(App {
1428 session: arc_swap::ArcSwap::from_pointee(session),
1429 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1430 factory,
1431 config,
1432 cancel_token,
1433 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1434 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1435 skills: Arc::new(skills),
1436 mcp_servers,
1437 extension_registry,
1438 token_tally,
1439 extension_diagnostics,
1440 session_cache,
1441 permission_mode,
1442 permission_strategy_handle,
1443 ui_tx_owned: self.ui_tx.clone(),
1444 })
1445 }
1446}
1447
1448#[cfg(test)]
1449mod tests {
1450 use super::*;
1451 use crate::config::{AnthropicConfig, ModelConfig};
1452 use crate::events::UiEvent;
1453 use crate::user_message::UserMessage;
1454 use async_trait::async_trait;
1455 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1456 use motosan_agent_tool::ToolDef;
1457 use std::sync::atomic::{AtomicUsize, Ordering};
1458
1459 #[test]
1460 fn turn_stats_accum_accumulates_across_calls() {
1461 let mut accum = TurnStatsAccum::default();
1462 accum.add(motosan_agent_loop::TokenUsage {
1463 input_tokens: 100,
1464 output_tokens: 25,
1465 });
1466 accum.add(motosan_agent_loop::TokenUsage {
1467 input_tokens: 1000,
1468 output_tokens: 250,
1469 });
1470 assert_eq!(accum.cumulative_input, 1100);
1471 assert_eq!(accum.cumulative_output, 275);
1472 assert_eq!(accum.turn_count, 2);
1473 }
1474
1475 #[test]
1476 fn turn_stats_accum_saturates_on_overflow() {
1477 let mut accum = TurnStatsAccum {
1478 cumulative_input: u64::MAX - 5,
1479 cumulative_output: 0,
1480 turn_count: 1,
1481 };
1482 accum.add(motosan_agent_loop::TokenUsage {
1483 input_tokens: 100,
1484 output_tokens: 0,
1485 });
1486 assert_eq!(accum.cumulative_input, u64::MAX);
1487 }
1488
1489 #[test]
1490 fn extract_thinking_events_extracts_reasoning_in_source_order() {
1491 use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};
1492 let messages = vec![Message::Assistant {
1493 id: new_message_id(),
1494 meta: MessageMeta::default(),
1495 content: vec![
1496 AssistantContent::Reasoning {
1497 text: "first thought".into(),
1498 signature: None,
1499 },
1500 AssistantContent::Text {
1501 text: "answer 1".into(),
1502 },
1503 AssistantContent::Reasoning {
1504 text: "second thought".into(),
1505 signature: None,
1506 },
1507 ],
1508 }];
1509 let events = extract_thinking_events(&messages);
1510 assert_eq!(events.len(), 2);
1511 match (&events[0], &events[1]) {
1512 (UiEvent::ThinkingComplete { text: t1 }, UiEvent::ThinkingComplete { text: t2 }) => {
1513 assert_eq!(t1, "first thought");
1514 assert_eq!(t2, "second thought");
1515 }
1516 _ => panic!("expected two ThinkingComplete events"),
1517 }
1518 }
1519
1520 #[test]
1521 fn extract_new_thinking_events_skips_historical_reasoning_before_previous_len() {
1522 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1523 let messages = vec![
1524 Message::User {
1525 id: new_message_id(),
1526 meta: MessageMeta::default(),
1527 content: vec![ContentPart::text("old question")],
1528 },
1529 Message::Assistant {
1530 id: new_message_id(),
1531 meta: MessageMeta::default(),
1532 content: vec![AssistantContent::Reasoning {
1533 text: "old thought".into(),
1534 signature: None,
1535 }],
1536 },
1537 Message::User {
1538 id: new_message_id(),
1539 meta: MessageMeta::default(),
1540 content: vec![ContentPart::text("new question")],
1541 },
1542 Message::Assistant {
1543 id: new_message_id(),
1544 meta: MessageMeta::default(),
1545 content: vec![AssistantContent::Reasoning {
1546 text: "new thought".into(),
1547 signature: None,
1548 }],
1549 },
1550 ];
1551
1552 let events = extract_new_thinking_events(&messages, 2);
1553 assert_eq!(events.len(), 1, "should only emit current-turn reasoning");
1554 match &events[0] {
1555 UiEvent::ThinkingComplete { text } => assert_eq!(text, "new thought"),
1556 other => panic!("expected ThinkingComplete; got {other:?}"),
1557 }
1558 }
1559
1560 #[test]
1561 fn extract_thinking_events_skips_non_assistant_and_non_reasoning() {
1562 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1563 let messages = vec![
1564 Message::User {
1565 id: new_message_id(),
1566 meta: MessageMeta::default(),
1567 content: vec![ContentPart::text("question?")],
1568 },
1569 Message::Assistant {
1570 id: new_message_id(),
1571 meta: MessageMeta::default(),
1572 content: vec![AssistantContent::Text {
1573 text: "answer".into(),
1574 }],
1575 },
1576 ];
1577 let events = extract_thinking_events(&messages);
1578 assert!(events.is_empty(), "expected no events; got {events:?}");
1579 }
1580
1581 #[tokio::test]
1582 async fn builder_fails_without_api_key() {
1583 let cfg = Config {
1584 anthropic: AnthropicConfig {
1585 api_key: None,
1586 base_url: "https://api.anthropic.com".into(),
1587 },
1588 model: ModelConfig {
1589 provider: "anthropic".into(),
1590 name: "claude-sonnet-4-6".into(),
1591 max_tokens: 4096,
1592 },
1593 };
1594 let err = match AppBuilder::new()
1595 .with_config(cfg)
1596 .with_builtin_tools()
1597 .build()
1598 .await
1599 {
1600 Ok(_) => panic!("must fail without key"),
1601 Err(err) => err,
1602 };
1603 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1604 }
1605
1606 struct ToolOnlyLlm {
1607 turn: AtomicUsize,
1608 }
1609
1610 #[async_trait]
1611 impl LlmClient for ToolOnlyLlm {
1612 async fn chat(
1613 &self,
1614 _messages: &[Message],
1615 _tools: &[ToolDef],
1616 ) -> motosan_agent_loop::Result<ChatOutput> {
1617 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1618 if turn == 0 {
1619 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1620 ToolCallItem {
1621 id: "t1".into(),
1622 name: "read".into(),
1623 args: serde_json::json!({"path":"nope.txt"}),
1624 },
1625 ])))
1626 } else {
1627 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1628 }
1629 }
1630 }
1631
1632 #[tokio::test]
1633 async fn empty_final_message_is_not_emitted() {
1634 let dir = tempfile::tempdir().unwrap();
1635 let mut cfg = Config::default();
1636 cfg.anthropic.api_key = Some("sk-unused".into());
1637 let app = AppBuilder::new()
1638 .with_config(cfg)
1639 .with_cwd(dir.path())
1640 .with_builtin_tools()
1641 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1642 turn: AtomicUsize::new(0),
1643 }))
1644 .build()
1645 .await
1646 .expect("build");
1647 let events: Vec<UiEvent> =
1648 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1649 let empties = events
1650 .iter()
1651 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1652 .count();
1653 assert_eq!(
1654 empties, 0,
1655 "should not emit empty final message, got: {events:?}"
1656 );
1657 }
1658
1659 struct EchoLlm;
1660
1661 struct UsageLlm {
1662 responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1663 }
1664
1665 #[async_trait]
1666 impl LlmClient for UsageLlm {
1667 async fn chat(
1668 &self,
1669 _messages: &[Message],
1670 _tools: &[ToolDef],
1671 ) -> motosan_agent_loop::Result<ChatOutput> {
1672 let next = match self.responses.lock() {
1673 Ok(mut responses) => responses.pop_front(),
1674 Err(poisoned) => poisoned.into_inner().pop_front(),
1675 };
1676 Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1677 }
1678 }
1679
1680 #[async_trait]
1681 impl LlmClient for EchoLlm {
1682 async fn chat(
1683 &self,
1684 _messages: &[Message],
1685 _tools: &[ToolDef],
1686 ) -> motosan_agent_loop::Result<ChatOutput> {
1687 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1688 }
1689 }
1690
1691 #[tokio::test]
1692 async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1693 use motosan_agent_loop::{LlmResponse, TokenUsage};
1694
1695 let dir = tempfile::tempdir().expect("tempdir");
1696 let mut cfg = Config::default();
1697 cfg.anthropic.api_key = Some("sk-unused".into());
1698 let llm = Arc::new(UsageLlm {
1699 responses: std::sync::Mutex::new(VecDeque::from([
1700 ChatOutput::with_usage(
1701 LlmResponse::Message("first".into()),
1702 TokenUsage {
1703 input_tokens: 100,
1704 output_tokens: 50,
1705 },
1706 ),
1707 ChatOutput::with_usage(
1708 LlmResponse::Message("second".into()),
1709 TokenUsage {
1710 input_tokens: 200,
1711 output_tokens: 80,
1712 },
1713 ),
1714 ])),
1715 });
1716 let app = AppBuilder::new()
1717 .with_config(cfg)
1718 .with_cwd(dir.path())
1719 .with_builtin_tools()
1720 .with_llm(llm)
1721 .build()
1722 .await
1723 .expect("build");
1724
1725 let events_1: Vec<UiEvent> = app
1726 .send_user_message(UserMessage::text("hi"))
1727 .collect()
1728 .await;
1729 let events_2: Vec<UiEvent> = app
1730 .send_user_message(UserMessage::text("again"))
1731 .collect()
1732 .await;
1733
1734 let ts1 = events_1
1735 .iter()
1736 .find_map(|e| match e {
1737 UiEvent::TurnStats {
1738 input_tokens,
1739 output_tokens,
1740 cumulative_input,
1741 cumulative_output,
1742 ..
1743 } => Some((
1744 *input_tokens,
1745 *output_tokens,
1746 *cumulative_input,
1747 *cumulative_output,
1748 )),
1749 _ => None,
1750 })
1751 .expect("turn 1 had no TurnStats");
1752 assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1753
1754 let ts2 = events_2
1755 .iter()
1756 .find_map(|e| match e {
1757 UiEvent::TurnStats {
1758 input_tokens,
1759 output_tokens,
1760 cumulative_input,
1761 cumulative_output,
1762 ..
1763 } => Some((
1764 *input_tokens,
1765 *output_tokens,
1766 *cumulative_input,
1767 *cumulative_output,
1768 )),
1769 _ => None,
1770 })
1771 .expect("turn 2 had no TurnStats");
1772 assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1773
1774 let positions: Vec<&str> = events_1
1775 .iter()
1776 .filter_map(|e| match e {
1777 UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1778 UiEvent::TurnStats { .. } => Some("stats"),
1779 UiEvent::AgentTurnComplete => Some("turn_complete"),
1780 _ => None,
1781 })
1782 .collect();
1783 assert_eq!(
1784 positions,
1785 vec!["msg_complete", "stats", "turn_complete"],
1786 "wrong ordering"
1787 );
1788 }
1789
1790 #[tokio::test]
1791 async fn turn_stats_reset_after_new_session() {
1792 use motosan_agent_loop::{LlmResponse, TokenUsage};
1793
1794 let dir = tempfile::tempdir().expect("tempdir");
1795 let mut cfg = Config::default();
1796 cfg.anthropic.api_key = Some("sk-unused".into());
1797 let llm = Arc::new(UsageLlm {
1798 responses: std::sync::Mutex::new(VecDeque::from([
1799 ChatOutput::with_usage(
1800 LlmResponse::Message("first".into()),
1801 TokenUsage {
1802 input_tokens: 100,
1803 output_tokens: 50,
1804 },
1805 ),
1806 ChatOutput::with_usage(
1807 LlmResponse::Message("after-new".into()),
1808 TokenUsage {
1809 input_tokens: 7,
1810 output_tokens: 3,
1811 },
1812 ),
1813 ])),
1814 });
1815 let app = AppBuilder::new()
1816 .with_config(cfg)
1817 .with_cwd(dir.path())
1818 .with_builtin_tools()
1819 .with_llm(llm)
1820 .build()
1821 .await
1822 .expect("build");
1823
1824 let _: Vec<UiEvent> = app
1825 .send_user_message(UserMessage::text("hi"))
1826 .collect()
1827 .await;
1828 app.new_session().await.expect("new session");
1829 let events: Vec<UiEvent> = app
1830 .send_user_message(UserMessage::text("after new"))
1831 .collect()
1832 .await;
1833 let stats = events
1834 .iter()
1835 .find_map(|event| match event {
1836 UiEvent::TurnStats {
1837 input_tokens,
1838 output_tokens,
1839 cumulative_input,
1840 cumulative_output,
1841 ..
1842 } => Some((
1843 *input_tokens,
1844 *output_tokens,
1845 *cumulative_input,
1846 *cumulative_output,
1847 )),
1848 _ => None,
1849 })
1850 .expect("turn had no TurnStats");
1851 assert_eq!(stats, (7, 3, 7, 3));
1852 }
1853
1854 #[tokio::test]
1855 async fn with_headless_permissions_builds_an_app() {
1856 let dir = tempfile::tempdir().expect("tempdir");
1857 let mut config = Config::default();
1858 config.anthropic.api_key = Some("sk-unused".into());
1859 let app = AppBuilder::new()
1860 .with_config(config)
1861 .with_cwd(dir.path())
1862 .with_builtin_tools()
1863 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1864 .with_headless_permissions()
1865 .build()
1866 .await
1867 .expect("build");
1868 assert!(!app.session_id().is_empty());
1870 }
1871
1872 #[tokio::test]
1873 async fn new_session_swaps_in_a_fresh_empty_session() {
1874 use futures::StreamExt;
1875 let dir = tempfile::tempdir().expect("tempdir");
1876 let mut config = Config::default();
1877 config.anthropic.api_key = Some("sk-unused".into());
1878 let app = AppBuilder::new()
1879 .with_config(config)
1880 .with_cwd(dir.path())
1881 .with_builtin_tools()
1882 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1883 .build()
1884 .await
1885 .expect("build");
1886
1887 let _: Vec<_> = app
1888 .send_user_message(UserMessage::text("hello"))
1889 .collect()
1890 .await;
1891 let id_before = app.session_id();
1892 assert!(!app.session_history().await.expect("history").is_empty());
1893
1894 app.new_session().await.expect("new_session");
1895
1896 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1897 assert!(
1898 app.session_history().await.expect("history").is_empty(),
1899 "fresh session has no history"
1900 );
1901 }
1902
1903 #[tokio::test]
1904 async fn load_session_restores_a_stored_session_by_id() {
1905 use futures::StreamExt;
1906 let dir = tempfile::tempdir().expect("tempdir");
1907 let store_dir = dir.path().join("sessions");
1908 let store: Arc<dyn SessionStore> =
1909 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1910 let mut config = Config::default();
1911 config.anthropic.api_key = Some("sk-unused".into());
1912 let app = AppBuilder::new()
1913 .with_config(config)
1914 .with_cwd(dir.path())
1915 .with_builtin_tools()
1916 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1917 .with_session_store(Arc::clone(&store))
1918 .build()
1919 .await
1920 .expect("build");
1921
1922 let _: Vec<_> = app
1923 .send_user_message(UserMessage::text("remember this"))
1924 .collect()
1925 .await;
1926 let original_id = app.session_id();
1927
1928 app.new_session().await.expect("new_session");
1929 assert_ne!(app.session_id(), original_id);
1930
1931 app.load_session(&original_id).await.expect("load_session");
1932 assert_eq!(app.session_id(), original_id);
1933 let history = app.session_history().await.expect("history");
1934 assert!(
1935 history.iter().any(|m| m.text().contains("remember this")),
1936 "loaded session should carry the original turn"
1937 );
1938 }
1939
1940 #[tokio::test]
1941 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1942 use futures::StreamExt;
1943 let dir = tempfile::tempdir().expect("tempdir");
1944 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1945 dir.path().join("s"),
1946 ));
1947 let mut config = Config::default();
1948 config.anthropic.api_key = Some("sk-unused".into());
1949 let app = AppBuilder::new()
1950 .with_config(config)
1951 .with_cwd(dir.path())
1952 .with_builtin_tools()
1953 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1954 .with_session_store(store)
1955 .build()
1956 .await
1957 .expect("build");
1958
1959 let _: Vec<_> = app
1960 .send_user_message(UserMessage::text("hello"))
1961 .collect()
1962 .await;
1963 let original_id = app.session_id();
1964
1965 let new_id = app.clone_session().await.expect("clone_session");
1966
1967 assert_ne!(new_id, original_id);
1969 assert_eq!(app.session_id(), new_id);
1970 let history = app.session_history().await.expect("history");
1972 assert!(history.iter().any(|m| m.text().contains("hello")));
1973 }
1974
1975 #[tokio::test]
1976 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1977 use futures::StreamExt;
1978 let dir = tempfile::tempdir().expect("tempdir");
1979 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1980 dir.path().join("s"),
1981 ));
1982 let mut config = Config::default();
1983 config.anthropic.api_key = Some("sk-unused".into());
1984 let app = AppBuilder::new()
1985 .with_config(config)
1986 .with_cwd(dir.path())
1987 .with_builtin_tools()
1988 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1989 .with_session_store(store)
1990 .build()
1991 .await
1992 .expect("build");
1993
1994 let _: Vec<_> = app
1996 .send_user_message(UserMessage::text("first"))
1997 .collect()
1998 .await;
1999 let _: Vec<_> = app
2000 .send_user_message(UserMessage::text("second"))
2001 .collect()
2002 .await;
2003
2004 let entries = app.session.load_full().entries().await.expect("entries");
2006 let first_id = entries
2007 .iter()
2008 .find_map(|stored| {
2009 let msg = stored.entry.as_message()?;
2010 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
2011 .then(|| stored.id.clone())
2012 })
2013 .expect("first user message present");
2014
2015 let _: Vec<_> = app
2017 .fork_from(first_id, UserMessage::text("branched"))
2018 .collect()
2019 .await;
2020
2021 let history = app.session_history().await.expect("history");
2022 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
2023 assert!(
2024 texts.iter().any(|t| t.contains("first")),
2025 "fork keeps the fork-point ancestor"
2026 );
2027 assert!(
2028 texts.iter().any(|t| t.contains("branched")),
2029 "fork includes the new message"
2030 );
2031 assert!(
2032 !texts.iter().any(|t| t.contains("second")),
2033 "fork excludes the abandoned branch"
2034 );
2035 }
2036
2037 #[tokio::test]
2038 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
2039 use futures::StreamExt;
2040 let dir = tempfile::tempdir().expect("tempdir");
2041 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2042 dir.path().join("s"),
2043 ));
2044 let mut config = Config::default();
2045 config.anthropic.api_key = Some("sk-unused".into());
2046 let app = AppBuilder::new()
2047 .with_config(config)
2048 .with_cwd(dir.path())
2049 .with_builtin_tools()
2050 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2051 .with_session_store(store)
2052 .build()
2053 .await
2054 .expect("build");
2055
2056 let _: Vec<_> = app
2057 .send_user_message(UserMessage::text("alpha"))
2058 .collect()
2059 .await;
2060 let _: Vec<_> = app
2061 .send_user_message(UserMessage::text("bravo"))
2062 .collect()
2063 .await;
2064
2065 let candidates = app.fork_candidates().await.expect("candidates");
2066 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
2067 assert!(previews[0].contains("bravo"), "got {previews:?}");
2069 assert!(previews.iter().any(|p| p.contains("alpha")));
2070 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
2072 }
2073
2074 #[tokio::test]
2075 async fn branches_returns_a_tree_for_a_linear_session() {
2076 use futures::StreamExt;
2077 let dir = tempfile::tempdir().expect("tempdir");
2078 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2079 dir.path().join("s"),
2080 ));
2081 let mut config = Config::default();
2082 config.anthropic.api_key = Some("sk-unused".into());
2083 let app = AppBuilder::new()
2084 .with_config(config)
2085 .with_cwd(dir.path())
2086 .with_builtin_tools()
2087 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2088 .with_session_store(store)
2089 .build()
2090 .await
2091 .expect("build");
2092
2093 let _: Vec<_> = app
2094 .send_user_message(UserMessage::text("hello"))
2095 .collect()
2096 .await;
2097 let tree = app.branches().await.expect("branches");
2098 assert!(!tree.nodes.is_empty());
2100 assert!(tree.active_leaf.is_some());
2101 }
2102
2103 #[tokio::test]
2104 async fn reload_settings_rebuilds_session_and_resets_token_tally() {
2105 let dir = tempfile::tempdir().expect("tempdir");
2106 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2107 dir.path().join("s"),
2108 ));
2109 let mut cfg = Config::default();
2110 cfg.anthropic.api_key = Some("sk-unused".into());
2111
2112 let llm = Arc::new(UsageLlm {
2113 responses: std::sync::Mutex::new(VecDeque::from([
2114 ChatOutput::with_usage(
2115 LlmResponse::Message("first".into()),
2116 motosan_agent_loop::TokenUsage {
2117 input_tokens: 100,
2118 output_tokens: 50,
2119 },
2120 ),
2121 ChatOutput::with_usage(
2122 LlmResponse::Message("after-reload".into()),
2123 motosan_agent_loop::TokenUsage {
2124 input_tokens: 5,
2125 output_tokens: 2,
2126 },
2127 ),
2128 ])),
2129 });
2130 let app = AppBuilder::new()
2131 .with_config(cfg)
2132 .with_cwd(dir.path())
2133 .with_builtin_tools()
2134 .with_llm(llm)
2135 .with_session_store(store)
2136 .build()
2137 .await
2138 .expect("build");
2139
2140 let _: Vec<UiEvent> = app
2141 .send_user_message(crate::user_message::UserMessage::text("hi"))
2142 .collect()
2143 .await;
2144 {
2145 let token_tally = app.token_tally();
2146 let tally = token_tally.lock().await;
2147 assert_eq!(tally.cumulative_input, 100);
2148 assert_eq!(tally.cumulative_output, 50);
2149 assert_eq!(tally.turn_count, 1);
2150 }
2151
2152 let mut new_settings = crate::settings::Settings::default();
2153 new_settings.model.name = "claude-opus-4-7".into();
2154 new_settings.ui.footer_show_cost = false;
2155 app.reload_settings(new_settings).await.expect("reload");
2156 assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
2157 assert!(!app.factory.settings().ui.footer_show_cost);
2158 assert_eq!(app.factory.current_model(), None);
2159
2160 {
2161 let token_tally = app.token_tally();
2162 let tally = token_tally.lock().await;
2163 assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
2164 assert_eq!(tally.cumulative_output, 0);
2165 assert_eq!(tally.turn_count, 0);
2166 }
2167
2168 let _: Vec<UiEvent> = app
2169 .send_user_message(crate::user_message::UserMessage::text("after"))
2170 .collect()
2171 .await;
2172 {
2173 let token_tally = app.token_tally();
2174 let tally = token_tally.lock().await;
2175 assert_eq!(tally.cumulative_input, 5);
2176 assert_eq!(tally.cumulative_output, 2);
2177 assert_eq!(tally.turn_count, 1);
2178 }
2179 }
2180
2181 #[tokio::test]
2182 async fn switch_model_preserves_history() {
2183 use futures::StreamExt;
2184 let dir = tempfile::tempdir().expect("tempdir");
2185 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2186 dir.path().join("s"),
2187 ));
2188 let mut config = Config::default();
2189 config.anthropic.api_key = Some("sk-unused".into());
2190 let app = AppBuilder::new()
2191 .with_config(config)
2192 .with_cwd(dir.path())
2193 .with_builtin_tools()
2194 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2195 .with_session_store(store)
2196 .build()
2197 .await
2198 .expect("build");
2199
2200 let _: Vec<_> = app
2201 .send_user_message(UserMessage::text("keep me"))
2202 .collect()
2203 .await;
2204 let id_before = app.session_id();
2205
2206 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
2207 .await
2208 .expect("switch_model");
2209
2210 assert_eq!(
2211 app.session_id(),
2212 id_before,
2213 "switch_model keeps the same session"
2214 );
2215 let history = app.session_history().await.expect("history");
2216 assert!(history.iter().any(|m| m.text().contains("keep me")));
2217 }
2218
2219 #[tokio::test]
2220 async fn switch_model_is_sticky_for_future_session_rebuilds() {
2221 let dir = tempfile::tempdir().expect("tempdir");
2222 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2223 dir.path().join("s"),
2224 ));
2225 let mut config = Config::default();
2226 config.anthropic.api_key = Some("sk-unused".into());
2227 let app = AppBuilder::new()
2228 .with_config(config)
2229 .with_cwd(dir.path())
2230 .with_builtin_tools()
2231 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2232 .with_session_store(store)
2233 .build()
2234 .await
2235 .expect("build");
2236
2237 let selected = crate::model::ModelId::from("claude-opus-4-7");
2238 app.switch_model(&selected).await.expect("switch_model");
2239 app.new_session().await.expect("new_session");
2240
2241 assert_eq!(app.factory.current_model(), Some(selected.clone()));
2242 assert_eq!(app.settings().model.name, selected.to_string());
2243 }
2244
2245 struct SleepThenDoneLlm {
2246 turn: AtomicUsize,
2247 }
2248
2249 #[async_trait]
2250 impl LlmClient for SleepThenDoneLlm {
2251 async fn chat(
2252 &self,
2253 _messages: &[Message],
2254 _tools: &[ToolDef],
2255 ) -> motosan_agent_loop::Result<ChatOutput> {
2256 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2257 if turn == 0 {
2258 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2259 ToolCallItem {
2260 id: "sleep".into(),
2261 name: "bash".into(),
2262 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2263 },
2264 ])))
2265 } else {
2266 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2267 }
2268 }
2269 }
2270
2271 #[tokio::test]
2272 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
2273 use futures::StreamExt;
2274 let dir = tempfile::tempdir().expect("tempdir");
2275 let mut config = Config::default();
2276 config.anthropic.api_key = Some("sk-unused".into());
2277 let app = Arc::new(
2278 AppBuilder::new()
2279 .with_config(config)
2280 .with_cwd(dir.path())
2281 .with_builtin_tools()
2282 .with_llm(Arc::new(SleepThenDoneLlm {
2283 turn: AtomicUsize::new(0),
2284 }) as Arc<dyn LlmClient>)
2285 .build()
2286 .await
2287 .expect("build"),
2288 );
2289
2290 app.new_session().await.expect("new_session");
2291 let running_app = Arc::clone(&app);
2292 let handle = tokio::spawn(async move {
2293 running_app
2294 .send_user_message(UserMessage::text("run a slow command"))
2295 .collect::<Vec<_>>()
2296 .await
2297 });
2298 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2299 app.cancel();
2300
2301 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
2302 .await
2303 .expect("turn should finish after cancellation")
2304 .expect("join");
2305 assert!(
2306 events.iter().any(|event| {
2307 matches!(
2308 event,
2309 UiEvent::ToolCallCompleted { result, .. }
2310 if result.text.contains("command cancelled by user")
2311 )
2312 }),
2313 "cancel should reach the rebuilt bash tool: {events:?}"
2314 );
2315 }
2316
2317 #[tokio::test]
2318 async fn compact_summarizes_a_session_with_enough_history() {
2319 struct DoneLlm;
2320 #[async_trait]
2321 impl LlmClient for DoneLlm {
2322 async fn chat(
2323 &self,
2324 _messages: &[Message],
2325 _tools: &[ToolDef],
2326 ) -> motosan_agent_loop::Result<ChatOutput> {
2327 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2328 }
2329 }
2330
2331 let dir = tempfile::tempdir().expect("tempdir");
2332 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2333 dir.path().join("sessions"),
2334 ));
2335 let mut config = Config::default();
2336 config.anthropic.api_key = Some("sk-unused".into());
2337 let app = AppBuilder::new()
2338 .with_config(config)
2339 .with_cwd(dir.path())
2340 .with_builtin_tools()
2341 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2342 .with_session_store(store)
2343 .build()
2344 .await
2345 .expect("build");
2346
2347 for i in 0..4 {
2354 let _: Vec<_> = app
2355 .send_user_message(UserMessage::text(format!("turn {i}")))
2356 .collect()
2357 .await;
2358 }
2359
2360 app.compact().await.expect("compact should succeed");
2361
2362 let history = app.session_history().await.expect("history");
2365 assert!(
2366 !history.is_empty(),
2367 "session should still have content post-compaction"
2368 );
2369 }
2370
2371 #[test]
2372 fn anthropic_env_api_key_overrides_auth_json_key() {
2373 let mut auth = crate::auth::Auth::default();
2374 auth.0.insert(
2375 "anthropic".into(),
2376 crate::auth::ProviderAuth::ApiKey {
2377 key: "sk-auth".into(),
2378 },
2379 );
2380
2381 let key = anthropic_api_key_from(&auth, |name| {
2382 (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
2383 });
2384 assert_eq!(key.as_deref(), Some("sk-env"));
2385 }
2386
2387 #[tokio::test]
2388 async fn with_settings_overrides_deprecated_config_model() {
2389 use crate::settings::Settings;
2390
2391 let mut config = Config::default();
2392 config.model.name = "from-config".into();
2393 config.anthropic.api_key = Some("sk-config".into());
2394
2395 let mut settings = Settings::default();
2396 settings.model.name = "from-settings".into();
2397
2398 let tmp = tempfile::tempdir().unwrap();
2399 let app = AppBuilder::new()
2400 .with_config(config)
2401 .with_settings(settings)
2402 .with_cwd(tmp.path())
2403 .disable_context_discovery()
2404 .with_llm(Arc::new(EchoLlm))
2405 .build()
2406 .await
2407 .expect("build");
2408 assert_eq!(app.config().model.name, "from-settings");
2409 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2410 }
2411
2412 #[tokio::test]
2413 async fn with_settings_synthesises_legacy_config_for_build() {
2414 use crate::auth::{Auth, ProviderAuth};
2415 use crate::settings::Settings;
2416
2417 let mut settings = Settings::default();
2418 settings.model.name = "claude-sonnet-4-6".into();
2419
2420 let mut auth = Auth::default();
2421 auth.0.insert(
2422 "anthropic".into(),
2423 ProviderAuth::ApiKey {
2424 key: "sk-test".into(),
2425 },
2426 );
2427
2428 let tmp = tempfile::tempdir().unwrap();
2429 let app = AppBuilder::new()
2430 .with_settings(settings)
2431 .with_auth(auth)
2432 .with_cwd(tmp.path())
2433 .with_builtin_tools()
2434 .disable_context_discovery()
2435 .with_llm(Arc::new(EchoLlm))
2436 .build()
2437 .await
2438 .expect("build");
2439 let _ = app;
2440 }
2441
2442 #[tokio::test]
2443 async fn cancel_before_turn_does_not_poison_future_turns() {
2444 let dir = tempfile::tempdir().unwrap();
2445 let mut cfg = Config::default();
2446 cfg.anthropic.api_key = Some("sk-unused".into());
2447 let app = AppBuilder::new()
2448 .with_config(cfg)
2449 .with_cwd(dir.path())
2450 .with_builtin_tools()
2451 .with_llm(std::sync::Arc::new(EchoLlm))
2452 .build()
2453 .await
2454 .expect("build");
2455
2456 app.cancel();
2457 let events: Vec<UiEvent> = app
2458 .send_user_message(UserMessage::text("x"))
2459 .collect()
2460 .await;
2461
2462 assert!(
2463 events
2464 .iter()
2465 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2466 "turn should use a fresh cancellation token: {events:?}"
2467 );
2468 }
2469
2470 #[test]
2471 fn map_event_matches_started_and_completed_ids_by_tool_name() {
2472 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2473
2474 let started_bash = map_event(
2475 AgentEvent::Core(CoreEvent::ToolStarted {
2476 name: "bash".into(),
2477 }),
2478 &tracker,
2479 );
2480 let started_read = map_event(
2481 AgentEvent::Core(CoreEvent::ToolStarted {
2482 name: "read".into(),
2483 }),
2484 &tracker,
2485 );
2486 let completed_bash = map_event(
2487 AgentEvent::Core(CoreEvent::ToolCompleted {
2488 name: "bash".into(),
2489 result: motosan_agent_tool::ToolResult::text("ok"),
2490 }),
2491 &tracker,
2492 );
2493 let completed_read = map_event(
2494 AgentEvent::Core(CoreEvent::ToolCompleted {
2495 name: "read".into(),
2496 result: motosan_agent_tool::ToolResult::text("ok"),
2497 }),
2498 &tracker,
2499 );
2500
2501 assert!(matches!(
2502 started_bash,
2503 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
2504 ));
2505 assert!(matches!(
2506 started_read,
2507 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
2508 ));
2509 assert!(matches!(
2510 completed_bash,
2511 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
2512 ));
2513 assert!(matches!(
2514 completed_read,
2515 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
2516 ));
2517 }
2518
2519 #[test]
2520 fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
2521 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2522 let s1 = map_event(
2523 AgentEvent::Core(CoreEvent::ToolStarted {
2524 name: "bash".into(),
2525 }),
2526 &tracker,
2527 );
2528 let s2 = map_event(
2529 AgentEvent::Core(CoreEvent::ToolStarted {
2530 name: "bash".into(),
2531 }),
2532 &tracker,
2533 );
2534 let c1 = map_event(
2535 AgentEvent::Core(CoreEvent::ToolCompleted {
2536 name: "bash".into(),
2537 result: motosan_agent_tool::ToolResult::text("a"),
2538 }),
2539 &tracker,
2540 );
2541 let c2 = map_event(
2542 AgentEvent::Core(CoreEvent::ToolCompleted {
2543 name: "bash".into(),
2544 result: motosan_agent_tool::ToolResult::text("b"),
2545 }),
2546 &tracker,
2547 );
2548
2549 let id_s1 = match s1 {
2550 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2551 other => panic!("{other:?}"),
2552 };
2553 let id_s2 = match s2 {
2554 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2555 other => panic!("{other:?}"),
2556 };
2557 let id_c1 = match c1 {
2558 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2559 other => panic!("{other:?}"),
2560 };
2561 let id_c2 = match c2 {
2562 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2563 other => panic!("{other:?}"),
2564 };
2565
2566 assert_eq!(id_s1, id_c1);
2567 assert_eq!(id_s2, id_c2);
2568 assert_ne!(id_s1, id_s2);
2569 }
2570
2571 #[tokio::test]
2572 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2573 let dir = tempfile::tempdir().unwrap();
2574 let mut cfg = Config::default();
2575 cfg.anthropic.api_key = Some("sk-unused".into());
2576 let app = AppBuilder::new()
2577 .with_config(cfg)
2578 .with_cwd(dir.path())
2579 .with_builtin_tools()
2580 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2581 turn: AtomicUsize::new(0),
2582 }))
2583 .build()
2584 .await
2585 .expect("build");
2586
2587 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2588 let first_event = first.next().await;
2589 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2590
2591 let second_events: Vec<UiEvent> = app
2592 .send_user_message(UserMessage::text("second"))
2593 .collect()
2594 .await;
2595 assert_eq!(
2596 second_events.len(),
2597 1,
2598 "expected immediate single error event, got: {second_events:?}"
2599 );
2600 assert!(matches!(
2601 &second_events[0],
2602 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2603 ));
2604 }
2605
2606 #[tokio::test]
2607 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2608 let dir = tempfile::tempdir().unwrap();
2614 let mut cfg = Config::default();
2615 cfg.anthropic.api_key = Some("sk-unused".into());
2616 let app = AppBuilder::new()
2617 .with_config(cfg)
2618 .with_cwd(dir.path())
2619 .with_builtin_tools()
2620 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2621 turn: AtomicUsize::new(0),
2622 }))
2623 .build()
2624 .await
2625 .expect("build");
2626
2627 let mut first =
2629 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2630 let first_event = first.next().await;
2631 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2632
2633 let bad = crate::user_message::UserMessage {
2635 text: "second".into(),
2636 attachments: vec![crate::user_message::Attachment::Image {
2637 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2638 }],
2639 };
2640 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2641
2642 assert_eq!(
2643 second_events.len(),
2644 1,
2645 "expected exactly one event (the attachment error); got: {second_events:?}"
2646 );
2647 assert!(
2648 matches!(
2649 &second_events[0],
2650 UiEvent::AttachmentError {
2651 kind: crate::user_message::AttachmentErrorKind::NotFound,
2652 ..
2653 }
2654 ),
2655 "expected AttachmentError::NotFound as first event; got {second_events:?}"
2656 );
2657 }
2658
2659 #[test]
2660 fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
2661 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2662 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2663
2664 let only = map_event(
2665 AgentEvent::Core(CoreEvent::ToolStarted {
2666 name: "bash".into(),
2667 }),
2668 &tracker,
2669 );
2670 let only_id = match only {
2671 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2672 other => panic!("{other:?}"),
2673 };
2674 assert_eq!(progress_event_id(&tracker), only_id);
2675
2676 let _second = map_event(
2677 AgentEvent::Core(CoreEvent::ToolStarted {
2678 name: "read".into(),
2679 }),
2680 &tracker,
2681 );
2682 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2683 }
2684
2685 #[tokio::test]
2686 async fn builder_rejects_builtin_and_custom_tools_together() {
2687 let mut cfg = Config::default();
2688 cfg.anthropic.api_key = Some("sk-unused".into());
2689 let dir = tempfile::tempdir().unwrap();
2690 let err = match AppBuilder::new()
2691 .with_config(cfg)
2692 .with_cwd(dir.path())
2693 .with_builtin_tools()
2694 .with_custom_tools_factory(|_| Vec::new())
2695 .build()
2696 .await
2697 {
2698 Ok(_) => panic!("must reject conflicting tool configuration"),
2699 Err(err) => err,
2700 };
2701
2702 assert!(format!("{err}").contains("mutually exclusive"));
2703 }
2704
2705 #[tokio::test]
2707 async fn two_turns_in_same_session_share_history() {
2708 #[derive(Default)]
2709 struct CounterLlm {
2710 turn: AtomicUsize,
2711 }
2712 #[async_trait]
2713 impl LlmClient for CounterLlm {
2714 async fn chat(
2715 &self,
2716 messages: &[Message],
2717 _tools: &[ToolDef],
2718 ) -> motosan_agent_loop::Result<ChatOutput> {
2719 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2720 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2721 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2722 }
2723 }
2724
2725 let tmp = tempfile::tempdir().unwrap();
2726 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2727 tmp.path().to_path_buf(),
2728 ));
2729
2730 let app = AppBuilder::new()
2731 .with_settings(crate::settings::Settings::default())
2732 .with_auth(crate::auth::Auth::default())
2733 .with_cwd(tmp.path())
2734 .with_builtin_tools()
2735 .disable_context_discovery()
2736 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2737 .with_session_store(store)
2738 .build_with_session(None)
2739 .await
2740 .expect("build");
2741
2742 let _events1: Vec<UiEvent> = app
2743 .send_user_message(UserMessage::text("hi"))
2744 .collect()
2745 .await;
2746 let events2: Vec<UiEvent> = app
2747 .send_user_message(UserMessage::text("again"))
2748 .collect()
2749 .await;
2750
2751 let saw_more_than_one = events2.iter().any(|e| {
2753 matches!(
2754 e,
2755 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2756 )
2757 });
2758 assert!(
2759 saw_more_than_one,
2760 "second turn should have seen history; events: {events2:?}"
2761 );
2762 }
2763}
2764
2765#[cfg(test)]
2766mod skills_builder_tests {
2767 use super::*;
2768 use crate::skills::types::{Skill, SkillSource};
2769 use std::path::PathBuf;
2770
2771 fn fixture() -> Skill {
2772 Skill {
2773 name: "x".into(),
2774 description: "d".into(),
2775 file_path: PathBuf::from("/x.md"),
2776 base_dir: PathBuf::from("/"),
2777 disable_model_invocation: false,
2778 source: SkillSource::Global,
2779 }
2780 }
2781
2782 #[test]
2783 fn with_skills_stores_skills() {
2784 let b = AppBuilder::new().with_skills(vec![fixture()]);
2785 assert_eq!(b.skills.len(), 1);
2786 assert_eq!(b.skills[0].name, "x");
2787 }
2788
2789 #[test]
2790 fn without_skills_clears() {
2791 let b = AppBuilder::new()
2792 .with_skills(vec![fixture()])
2793 .without_skills();
2794 assert!(b.skills.is_empty());
2795 }
2796}
2797
2798#[cfg(test)]
2799mod mcp_builder_tests {
2800 use super::*;
2801 use motosan_agent_tool::Tool;
2802
2803 struct FakeTool;
2805 impl Tool for FakeTool {
2806 fn def(&self) -> motosan_agent_tool::ToolDef {
2807 motosan_agent_tool::ToolDef {
2808 name: "fake__echo".into(),
2809 description: "test".into(),
2810 input_schema: serde_json::json!({"type": "object"}),
2811 }
2812 }
2813 fn call(
2814 &self,
2815 _args: serde_json::Value,
2816 _ctx: &motosan_agent_tool::ToolContext,
2817 ) -> std::pin::Pin<
2818 Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
2819 > {
2820 Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
2821 }
2822 }
2823
2824 #[test]
2825 fn with_extra_tools_stores_tools() {
2826 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
2827 let b = AppBuilder::new().with_extra_tools(tools);
2828 assert_eq!(b.extra_tools.len(), 1);
2829 }
2830}