1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate, PromptStrategy};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
23#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27pub struct TurnStatsAccum {
28 pub cumulative_input: u64,
29 pub cumulative_output: u64,
30 pub turn_count: u64,
31}
32
33impl TurnStatsAccum {
34 pub fn add(&mut self, usage: motosan_agent_loop::TokenUsage) {
38 self.cumulative_input = self.cumulative_input.saturating_add(usage.input_tokens);
39 self.cumulative_output = self.cumulative_output.saturating_add(usage.output_tokens);
40 self.turn_count = self.turn_count.saturating_add(1);
41 }
42}
43
44pub(crate) fn extract_thinking_events(messages: &[motosan_agent_loop::Message]) -> Vec<UiEvent> {
52 let mut events = Vec::new();
53 for msg in messages {
54 if let motosan_agent_loop::Message::Assistant { content, .. } = msg {
55 for part in content {
56 if let motosan_agent_loop::AssistantContent::Reasoning { text, .. } = part {
57 events.push(UiEvent::ThinkingComplete { text: text.clone() });
58 }
59 }
60 }
61 }
62 events
63}
64
65pub(crate) fn extract_new_thinking_events(
70 messages: &[motosan_agent_loop::Message],
71 previous_len: usize,
72) -> Vec<UiEvent> {
73 extract_thinking_events(messages.get(previous_len..).unwrap_or(&[]))
74}
75
/// How `SessionFactory::build` should obtain the session it returns.
#[derive(Debug, Clone)]
pub(crate) enum SessionMode {
    /// Start a brand-new session.
    New,
    /// Resume the stored session with the given id (requires a session store).
    Resume(String),
}
84
85struct SharedLlm {
90 client: Arc<dyn LlmClient>,
91}
92
93impl SharedLlm {
94 fn new(client: Arc<dyn LlmClient>) -> Self {
95 Self { client }
96 }
97
98 fn client(&self) -> Arc<dyn LlmClient> {
99 Arc::clone(&self.client)
100 }
101}
102
/// Everything `build` needs to construct (or re-construct) an `AgentSession`
/// together with its LLM client.
pub(crate) struct SessionFactory {
    /// Working directory passed to tools, context discovery and permission extensions.
    cwd: PathBuf,
    /// Current settings; read via `settings()` and replaced via `store_settings()`.
    settings: Arc<Mutex<crate::settings::Settings>>,
    /// Credentials handed to `build_llm_client`.
    auth: crate::auth::Auth,
    /// Permission policy shared with the permission extension.
    policy: Arc<crate::permissions::Policy>,
    /// Permission cache shared with the permission extension.
    session_cache: Arc<crate::permissions::SessionCache>,
    /// UI event channel; when present, the permission extension is built with it.
    ui_tx: Option<mpsc::Sender<UiEvent>>,
    /// When true (and there is no `ui_tx`), a headless permission extension is installed.
    headless_permissions: bool,
    /// Gate given to the tool context.
    permission_gate: Arc<dyn PermissionGate>,
    /// Optional shared strategy cell passed to the permission extension.
    permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
    /// Sender side of the tool progress channel, cloned into each tool context.
    progress_tx: mpsc::Sender<ToolProgressChunk>,
    /// Skills passed to `build_system_prompt`.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Whether `builtin_tools(..)` is installed on the engine.
    install_builtin_tools: bool,
    /// Additional tools appended after the built-in ones.
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// Forwarded to `Engine::builder().max_iterations(..)`.
    max_iterations: usize,
    /// When true, project context files are not loaded into the system prompt.
    context_discovery_disabled: bool,
    /// Master switch for the autocompact extension (settings thresholds also apply).
    autocompact_enabled: bool,
    /// Optional persistence backend; required for `SessionMode::Resume`.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Pre-built client used instead of `build_llm_client` when no model is in effect.
    llm_override: Option<Arc<dyn LlmClient>>,
    /// Model selected at runtime, if any; takes precedence over the settings' model name.
    current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
    /// Cancellation token shared with the tool context.
    cancel_token: SharedCancelToken,
}
127
128impl SessionFactory {
129 fn settings(&self) -> crate::settings::Settings {
130 match self.settings.lock() {
131 Ok(guard) => guard.clone(),
132 Err(poisoned) => poisoned.into_inner().clone(),
133 }
134 }
135
136 fn store_settings(&self, settings: crate::settings::Settings) {
137 match self.settings.lock() {
138 Ok(mut guard) => *guard = settings,
139 Err(poisoned) => *poisoned.into_inner() = settings,
140 }
141 }
142
143 fn current_model(&self) -> Option<crate::model::ModelId> {
144 match self.current_model.lock() {
145 Ok(guard) => guard.clone(),
146 Err(poisoned) => poisoned.into_inner().clone(),
147 }
148 }
149
150 fn set_current_model(&self, model: crate::model::ModelId) {
151 match self.current_model.lock() {
152 Ok(mut guard) => *guard = Some(model),
153 Err(poisoned) => *poisoned.into_inner() = Some(model),
154 }
155 }
156
157 fn clear_current_model(&self) {
158 match self.current_model.lock() {
159 Ok(mut guard) => *guard = None,
160 Err(poisoned) => *poisoned.into_inner() = None,
161 }
162 }
163
/// Builds an `AgentSession` and the LLM client that drives it.
///
/// * `mode` — create a new session or resume a stored one.
/// * `model_override` — model to use for this build; wins over everything else.
/// * `settings_override` — settings to use instead of the stored ones. When
///   given, the remembered runtime model (`current_model`) is ignored.
///
/// # Errors
///
/// Returns an error when the LLM client cannot be built, when resuming fails
/// (including `Resume` without a session store), or when hydrating read files
/// from the resumed entries fails.
async fn build(
    &self,
    mode: SessionMode,
    model_override: Option<&crate::model::ModelId>,
    settings_override: Option<&crate::settings::Settings>,
) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
    // Precedence: explicit override > remembered runtime model, but the
    // remembered model is skipped when fresh settings are being applied.
    let effective_model = model_override.cloned().or_else(|| {
        if settings_override.is_some() {
            None
        } else {
            self.current_model()
        }
    });
    let mut settings = settings_override
        .cloned()
        .unwrap_or_else(|| self.settings());
    if let Some(m) = &effective_model {
        settings.model.name = m.as_str().to_string();
    }

    // The injected client is only honoured when no model is in effect; any
    // effective model forces a freshly built client for that model.
    let llm = if effective_model.is_none() {
        self.llm_override.as_ref().map_or_else(
            || build_llm_client(&settings, &self.auth),
            |llm| Ok(Arc::clone(llm)),
        )?
    } else {
        build_llm_client(&settings, &self.auth)?
    };

    let tool_ctx = ToolCtx::new_with_cancel_token(
        &self.cwd,
        Arc::clone(&self.permission_gate),
        self.progress_tx.clone(),
        self.cancel_token.clone(),
    );
    let mut tools = if self.install_builtin_tools {
        builtin_tools(tool_ctx.clone())
    } else {
        Vec::new()
    };
    tools.extend(self.extra_tools.iter().cloned());

    // The system prompt lists the installed tools and skills, optionally
    // extended with discovered project context files.
    let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
    let base_prompt = build_system_prompt(&tool_names, &self.skills);
    let system_prompt = if self.context_discovery_disabled {
        base_prompt
    } else {
        let agent_dir = crate::paths::agent_dir();
        let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
        crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
    };
    let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);

    let mut engine_builder = Engine::builder()
        .max_iterations(self.max_iterations)
        .system_prompt(system_prompt)
        .tool_context(motosan_tool_context);
    for tool in tools {
        engine_builder = engine_builder.tool(tool);
    }
    // Permission extension: interactive when a UI channel exists, headless
    // when explicitly requested, otherwise none at all.
    if let Some(ui_tx) = &self.ui_tx {
        let ext = if let Some(handle) = &self.permission_strategy_handle {
            crate::permissions::PermissionExtension::with_strategy_handle(
                Arc::clone(&self.policy),
                Arc::clone(&self.session_cache),
                self.cwd.clone(),
                Some(ui_tx.clone()),
                Arc::clone(handle),
            )
        } else {
            crate::permissions::PermissionExtension::new(
                Arc::clone(&self.policy),
                Arc::clone(&self.session_cache),
                self.cwd.clone(),
                ui_tx.clone(),
            )
        };
        engine_builder = engine_builder.extension(Box::new(ext));
    } else if self.headless_permissions {
        let ext = if let Some(handle) = &self.permission_strategy_handle {
            crate::permissions::PermissionExtension::with_strategy_handle(
                Arc::clone(&self.policy),
                Arc::clone(&self.session_cache),
                self.cwd.clone(),
                None,
                Arc::clone(handle),
            )
        } else {
            crate::permissions::PermissionExtension::headless(
                Arc::clone(&self.policy),
                Arc::clone(&self.session_cache),
                self.cwd.clone(),
            )
        };
        engine_builder = engine_builder.extension(Box::new(ext));
    }
    // Autocompact needs both the builder flag and a threshold strictly
    // between 0 and 1 in the settings.
    if self.autocompact_enabled
        && settings.session.compact_at_context_pct > 0.0
        && settings.session.compact_at_context_pct < 1.0
    {
        let cfg = AutocompactConfig {
            threshold: settings.session.compact_at_context_pct,
            max_context_tokens: settings.session.max_context_tokens,
            // Never keep fewer than one turn.
            keep_turns: settings.session.keep_turns.max(1),
        };
        engine_builder = engine_builder
            .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
    }
    let engine = engine_builder.build();

    let session = match (&mode, &self.session_store) {
        (SessionMode::Resume(id), Some(store)) => {
            let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
                .await
                .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
            // Replay stored entries into the tool context's read-file state.
            let entries = s
                .entries()
                .await
                .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
            crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
            s
        }
        (SessionMode::Resume(_), None) => {
            return Err(AppError::Config("resume requires a session store".into()));
        }
        (SessionMode::New, Some(store)) => {
            let id = crate::session::SessionId::new();
            AgentSession::new_with_store(
                id.into_string(),
                Arc::clone(store),
                engine,
                Arc::clone(&llm),
            )
        }
        // No store configured: the session is not backed by a store.
        (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
    };

    Ok((session, llm))
}
314}
315
/// The application core: owns the active agent session, its LLM client and
/// the shared state needed to run turns and switch sessions.
pub struct App {
    /// Active session; swapped when a session is created, loaded or rebuilt.
    session: arc_swap::ArcSwap<AgentSession>,
    /// LLM client belonging to the active session; swapped together with it.
    llm: arc_swap::ArcSwap<SharedLlm>,
    /// Builds replacement sessions and holds the mutable settings/model state.
    factory: SessionFactory,
    /// Configuration exposed through `config()`.
    config: Config,
    /// Token used by `cancel()` and reset at the start of every turn.
    cancel_token: SharedCancelToken,
    /// Receiver for tool progress chunks; also serves as the "one turn at a time" lock.
    progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
    /// Assigns UI ids to tool calls and tracks the ones still pending.
    next_tool_id: Arc<Mutex<ToolCallTracker>>,
    /// Skills used to expand skill commands in user messages.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Named MCP servers, disconnected by `disconnect_mcp()`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
    /// Registry consulted for session-before-switch hooks.
    pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
    /// Cumulative token usage; reset when the session is replaced.
    pub(crate) token_tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>,
    /// Diagnostics exposed through `extension_diagnostics()`.
    extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
    /// Permission cache exposed through `permissions_cache()`.
    pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
    /// Current permission mode, updated by `set_permission_mode()`.
    pub(crate) permission_mode: Arc<tokio::sync::RwLock<crate::permissions::PermissionMode>>,
    /// Shared strategy cell updated alongside the permission mode, when present.
    pub(crate) permission_strategy_handle: Option<Arc<tokio::sync::RwLock<PromptStrategy>>>,
    /// UI event channel, if a UI is attached.
    pub(crate) ui_tx_owned: Option<mpsc::Sender<UiEvent>>,
}
345
346impl App {
347 pub fn config(&self) -> &Config {
348 &self.config
349 }
350
351 pub fn cancel(&self) {
355 self.cancel_token.cancel();
356 }
357
358 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
362 Arc::clone(&self.session_cache)
363 }
364
365 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
368 Arc::clone(&self.extension_registry)
369 }
370
371 pub fn settings(&self) -> crate::settings::Settings {
373 let mut settings = self.factory.settings();
374 if let Some(model) = self.factory.current_model() {
375 settings.model.name = model.to_string();
376 }
377 settings
378 }
379
380 pub fn token_tally(&self) -> Arc<tokio::sync::Mutex<TurnStatsAccum>> {
382 Arc::clone(&self.token_tally)
383 }
384
385 pub async fn set_permission_mode(&self, mode: crate::permissions::PermissionMode) {
390 let new_strategy = match mode {
391 crate::permissions::PermissionMode::Bypass => PromptStrategy::AllowAll,
392 crate::permissions::PermissionMode::AcceptEdits => PromptStrategy::AcceptEdits,
393 crate::permissions::PermissionMode::Prompt if self.ui_tx_owned.is_none() => {
394 PromptStrategy::HeadlessDeny
395 }
396 crate::permissions::PermissionMode::Prompt => PromptStrategy::Prompt,
397 };
398
399 if let Some(handle) = &self.permission_strategy_handle {
400 *handle.write().await = new_strategy;
401 }
402
403 *self.permission_mode.write().await = mode;
404
405 if let Some(ui_tx) = &self.ui_tx_owned {
406 let _ = ui_tx.send(UiEvent::PermissionModeChanged { mode }).await;
407 }
408 }
409
410 pub async fn emit_settings_snapshot(&self) {
414 if let Some(ui_tx) = &self.ui_tx_owned {
415 let _ = ui_tx
416 .send(UiEvent::SettingsSnapshot {
417 settings: self.settings(),
418 })
419 .await;
420 }
421 }
422
423 pub async fn permission_mode(&self) -> crate::permissions::PermissionMode {
425 *self.permission_mode.read().await
426 }
427
428 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
430 Arc::clone(&self.extension_diagnostics)
431 }
432
433 pub fn session_id(&self) -> String {
436 self.session.load().session_id().to_string()
437 }
438
439 pub async fn session_history(
448 &self,
449 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
450 self.session.load_full().history().await
451 }
452
453 pub async fn compact(&self) -> Result<()> {
459 use motosan_agent_loop::ThresholdStrategy;
460 let strategy = ThresholdStrategy {
461 threshold: 0.0,
462 ..ThresholdStrategy::default()
463 };
464 let llm = self.llm.load_full().client();
465 self.session
466 .load_full()
467 .maybe_compact(&strategy, llm)
468 .await
469 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
470 Ok(())
471 }
472
473 pub async fn new_session(&self) -> Result<()> {
477 self.fire_session_before_switch("new", None).await?;
478 let (session, llm) = self.factory.build(SessionMode::New, None, None).await?;
479 self.session.store(Arc::new(session));
480 self.llm.store(Arc::new(SharedLlm::new(llm)));
481 self.reset_token_tally().await;
482 Ok(())
483 }
484
485 pub async fn load_session(&self, id: &str) -> Result<()> {
491 self.fire_session_before_switch("load", Some(id)).await?;
492 self.load_session_without_hook(id).await
493 }
494
495 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
496 let (session, llm) = self
497 .factory
498 .build(SessionMode::Resume(id.to_string()), None, None)
499 .await?;
500 self.session.store(Arc::new(session));
501 self.llm.store(Arc::new(SharedLlm::new(llm)));
502 self.reset_token_tally().await;
503 Ok(())
504 }
505
506 async fn reset_token_tally(&self) {
507 let mut tally = self.token_tally.lock().await;
508 *tally = TurnStatsAccum::default();
509 }
510
511 pub async fn clone_session(&self) -> Result<String> {
520 self.fire_session_before_switch("clone", None).await?;
521 let Some(store) = self.factory.session_store.as_ref() else {
522 return Err(AppError::Config("clone requires a session store".into()));
523 };
524 let source_id = self.session.load().session_id().to_string();
525 let new_id = crate::session::SessionId::new().into_string();
526 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
527 catalog
528 .fork(&source_id, &new_id)
529 .await
530 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
531 self.load_session_without_hook(&new_id).await?;
532 Ok(new_id)
533 }
534
535 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
550 self.fire_session_before_switch("model_switch", None)
551 .await?;
552 let current_id = self.session.load().session_id().to_string();
553 let (session, llm) = self
554 .factory
555 .build(SessionMode::Resume(current_id), Some(model), None)
556 .await?;
557 self.factory.set_current_model(model.clone());
558 self.session.store(Arc::new(session));
559 self.llm.store(Arc::new(SharedLlm::new(llm)));
560 Ok(())
561 }
562
563 pub async fn reload_settings(&self, new_settings: crate::settings::Settings) -> Result<()> {
579 let current_id = self.session.load().session_id().to_string();
583 let (session, llm) = self
584 .factory
585 .build(SessionMode::Resume(current_id), None, Some(&new_settings))
586 .await?;
587 self.factory.store_settings(new_settings);
588 self.factory.clear_current_model();
589 self.session.store(Arc::new(session));
590 self.llm.store(Arc::new(SharedLlm::new(llm)));
591
592 self.reset_token_tally().await;
595
596 Ok(())
597 }
598
599 pub async fn disconnect_mcp(&self) {
602 for (name, server) in &self.mcp_servers {
603 let _ =
604 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
605 tracing::debug!(target: "mcp", server = %name, "disconnected");
606 }
607 }
608
/// Runs one agent turn and returns the UI events it produces as a stream.
///
/// * `msg` — the user message; skill commands in its text are expanded first.
/// * `fork_from` — `None` appends to the session history; `Some(entry)` forks
///   the turn from that entry instead.
///
/// The stream captures the session, tracker, progress receiver and model name
/// at call time; no work happens until it is polled. Only one turn may run at
/// a time: a second concurrent turn yields a single `UiEvent::Error`.
fn run_turn(
    &self,
    msg: crate::user_message::UserMessage,
    fork_from: Option<motosan_agent_loop::EntryId>,
) -> impl Stream<Item = UiEvent> + Send + 'static {
    let session = self.session.load_full();
    let skills = Arc::clone(&self.skills);
    let cancel_token = self.cancel_token.clone();
    let tracker = Arc::clone(&self.next_tool_id);
    let progress = Arc::clone(&self.progress_rx);
    let token_tally = Arc::clone(&self.token_tally);
    // Model name reported in TurnStats: runtime-selected model, else settings.
    let settings_model_name = self
        .factory
        .current_model()
        .map(|model| model.to_string())
        .unwrap_or_else(|| self.factory.settings().model.name);

    async_stream::stream! {
        // Expand skill commands, then turn the message (with attachments)
        // into the form the session expects.
        let new_user = {
            let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
            let expanded_msg = crate::user_message::UserMessage {
                text: expanded_text,
                attachments: msg.attachments.clone(),
            };
            match crate::user_message::prepare_user_message(&expanded_msg) {
                Ok(m) => m,
                Err(err) => {
                    yield UiEvent::AttachmentError {
                        kind: err.kind(),
                        message: err.to_string(),
                    };
                    return;
                }
            }
        };

        // Holding the progress receiver doubles as the single-turn lock.
        let mut progress_guard = match progress.try_lock() {
            Ok(guard) => guard,
            Err(_) => {
                yield UiEvent::Error(
                    "another turn is already running; capo is single-turn-per-App".into(),
                );
                return;
            }
        };

        let cancel = cancel_token.reset();

        yield UiEvent::AgentTurnStarted;
        yield UiEvent::AgentThinking;

        let handle = match fork_from {
            // Normal turn: full history plus the new user message.
            None => {
                let history = match session.history().await {
                    Ok(h) => h,
                    Err(err) => {
                        yield UiEvent::Error(format!("session.history failed: {err}"));
                        return;
                    }
                };
                let mut messages = history;
                messages.push(new_user);
                match session.start_turn(messages).await {
                    Ok(h) => h,
                    Err(err) => {
                        yield UiEvent::Error(format!("session.start_turn failed: {err}"));
                        return;
                    }
                }
            }
            // Forked turn: only the new message is supplied.
            Some(from) => {
                match session.fork_turn(from, vec![new_user]).await {
                    Ok(h) => h,
                    Err(err) => {
                        yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
                        return;
                    }
                }
            }
        };
        let previous_len = handle.previous_len;
        let epoch = handle.epoch;
        let branch_parent = handle.branch_parent;
        let ops_tx = handle.ops_tx.clone();
        let mut agent_stream = handle.stream;

        // Forward a cancellation to the running turn as an Interrupt op.
        // NOTE(review): this task is only aborted after the loop below; if the
        // returned stream is dropped mid-turn it appears to keep waiting on
        // the cancel token — confirm whether that is acceptable.
        let interrupt_bridge = tokio::spawn(async move {
            cancel.cancelled().await;
            let _ = ops_tx.send(AgentOp::Interrupt).await;
        });

        let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
        let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
        // Set once a ThinkingDone event was streamed, so reasoning text is not
        // emitted a second time from the terminal messages.
        let mut streamed_thinking_seen = false;

        loop {
            // Flush progress chunks that are already queued.
            while let Ok(chunk) = progress_guard.try_recv() {
                yield UiEvent::ToolCallProgress {
                    id: progress_event_id(&tracker),
                    chunk: ProgressChunk::from(chunk),
                };
            }

            tokio::select! {
                // `biased`: agent events are polled before progress chunks.
                biased;
                maybe_item = agent_stream.next() => {
                    match maybe_item {
                        Some(AgentStreamItem::Event(ev)) => {
                            if matches!(
                                &ev,
                                motosan_agent_loop::AgentEvent::Core(
                                    motosan_agent_loop::CoreEvent::ThinkingDone(_)
                                )
                            ) {
                                streamed_thinking_seen = true;
                            }
                            if let Some(ui) = map_event(ev, &tracker) {
                                yield ui;
                            }
                        }
                        Some(AgentStreamItem::Terminal(term)) => {
                            terminal_result = Some(term.result);
                            terminal_messages = Some(term.messages);
                            break;
                        }
                        None => break,
                    }
                }
                Some(chunk) = progress_guard.recv() => {
                    yield UiEvent::ToolCallProgress {
                        id: progress_event_id(&tracker),
                        chunk: ProgressChunk::from(chunk),
                    };
                }
            }
        }

        interrupt_bridge.abort();

        // Record the outcome whenever terminal messages exist, regardless of
        // whether the turn itself succeeded.
        if let Some(msgs) = terminal_messages.as_ref() {
            if let Err(err) = session
                .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
                .await
            {
                yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
            }
        }

        match terminal_result {
            Some(Ok(result)) => {
                // Fallback: no ThinkingDone was streamed, so derive thinking
                // events from the messages added during this turn.
                if !streamed_thinking_seen {
                    if let Some(msgs) = terminal_messages.as_ref() {
                        for ev in extract_new_thinking_events(msgs, previous_len) {
                            yield ev;
                        }
                    }
                }

                // Final text = text of the last assistant message, if any.
                let final_text = terminal_messages
                    .as_ref()
                    .and_then(|msgs| {
                        msgs.iter()
                            .rev()
                            .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
                            .map(|m| m.text())
                    })
                    .unwrap_or_default();
                if !final_text.is_empty() {
                    yield UiEvent::AgentMessageComplete(final_text);
                }
                let usage = result.usage;
                // Keep the tally lock scoped to this block.
                let (cumulative_input, cumulative_output) = {
                    let mut tally = token_tally.lock().await;
                    tally.add(usage);
                    (tally.cumulative_input, tally.cumulative_output)
                };
                yield UiEvent::TurnStats {
                    input_tokens: usage.input_tokens,
                    output_tokens: usage.output_tokens,
                    cumulative_input,
                    cumulative_output,
                    model: settings_model_name.clone(),
                };
                // Drain late progress chunks before signalling completion.
                while let Ok(chunk) = progress_guard.try_recv() {
                    yield UiEvent::ToolCallProgress {
                        id: progress_event_id(&tracker),
                        chunk: ProgressChunk::from(chunk),
                    };
                }
                yield UiEvent::AgentTurnComplete;
            }
            Some(Err(err)) => {
                // Hitting the iteration cap is reported as a notice plus a
                // normal turn completion, not as an error.
                if let motosan_agent_loop::AgentError::MaxIterations(n) = err {
                    yield UiEvent::Notice {
                        title: "Agent stopped".to_string(),
                        body: format!(
                            "Reached the per-turn iteration cap ({n}). \
                             Partial work is saved — send another message to continue."
                        ),
                    };
                    yield UiEvent::AgentTurnComplete;
                } else {
                    yield UiEvent::Error(format!("{err}"));
                }
            }
            // NOTE(review): the agent stream ended without a terminal item;
            // no completion or error event is emitted here — confirm intended.
            None => { }
        }
    }
}
865
866 pub fn send_user_message(
867 &self,
868 msg: crate::user_message::UserMessage,
869 ) -> impl Stream<Item = UiEvent> + Send + 'static {
870 self.run_turn(msg, None)
871 }
872
873 pub fn fork_from(
878 &self,
879 from: motosan_agent_loop::EntryId,
880 message: crate::user_message::UserMessage,
881 ) -> impl Stream<Item = UiEvent> + Send + 'static {
882 let registry = Arc::clone(&self.extension_registry);
883 let inner = self.run_turn(message, Some(from));
884 async_stream::stream! {
885 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
886 match dispatch_session_before_switch(®istry, "fork", None).await {
887 HookOutcome::Continue => {}
888 HookOutcome::Cancelled { extension_name, reason } => {
889 let msg = match reason {
890 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
891 None => format!("extension `{extension_name}` cancelled fork"),
892 };
893 yield UiEvent::Error(msg);
894 return;
895 }
896 }
897 let mut inner = Box::pin(inner);
898 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
899 yield ev;
900 }
901 }
902 }
903
904 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
908 let entries = self
909 .session
910 .load_full()
911 .entries()
912 .await
913 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
914 let branch = motosan_agent_loop::active_branch(&entries);
915 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
916 .iter()
917 .filter_map(|stored| {
918 let msg = stored.entry.as_message()?;
919 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
920 return None;
921 }
922 let preview: String = msg
923 .text()
924 .lines()
925 .next()
926 .unwrap_or("")
927 .chars()
928 .take(80)
929 .collect();
930 Some((stored.id.clone(), preview))
931 })
932 .collect();
933 out.reverse();
934 Ok(out)
935 }
936
937 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
939 self.session
940 .load_full()
941 .branches()
942 .await
943 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
944 }
945
946 async fn fire_session_before_switch(
949 &self,
950 reason: &str,
951 session_id: Option<&str>,
952 ) -> Result<()> {
953 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
954 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
955 HookOutcome::Continue => Ok(()),
956 HookOutcome::Cancelled {
957 extension_name,
958 reason,
959 } => Err(AppError::HookCancelled {
960 extension_name,
961 reason,
962 }),
963 }
964 }
965}
966
/// Hands out sequential `tool_N` ids for tool calls and remembers which calls
/// have started but not yet completed.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Number of the most recently issued id (`0` before the first one).
    next_id: usize,
    /// Started-but-not-completed calls as `(tool name, id)`, oldest first.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Registers a started call to tool `name` and returns its fresh id.
    fn start(&mut self, name: &str) -> String {
        let id = self.mint_id();
        self.pending.push_back((name.to_owned(), id.clone()));
        id
    }

    /// Resolves the id for a completed call to tool `name`.
    ///
    /// The oldest pending call with that name is removed and its id returned.
    /// If none is pending, a fresh id is issued instead.
    fn complete(&mut self, name: &str) -> String {
        let matched = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name)
            .and_then(|index| self.pending.remove(index));

        match matched {
            Some((_, id)) => id,
            None => self.mint_id(),
        }
    }

    /// Returns the id progress output should be attributed to: the pending
    /// call's id when exactly one call is pending, otherwise `None`.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() == 1 {
            self.pending.front().map(|(_, id)| id.clone())
        } else {
            None
        }
    }

    /// Advances the counter and formats the next `tool_N` id.
    fn mint_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }
}
1007
1008fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
1009 match tracker.lock() {
1010 Ok(guard) => guard,
1011 Err(poisoned) => poisoned.into_inner(),
1012 }
1013}
1014
1015fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
1016 lock_tool_tracker(tracker)
1017 .progress_id()
1018 .unwrap_or_else(|| "tool_unknown".to_string())
1019}
1020
1021fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
1022where
1023 F: Fn(&str) -> Option<String>,
1024{
1025 env_lookup("ANTHROPIC_API_KEY")
1026 .map(|key| key.trim().to_string())
1027 .filter(|key| !key.is_empty())
1028 .or_else(|| auth.api_key("anthropic").map(str::to_string))
1029}
1030
1031fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
1032 match ev {
1033 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
1034 AgentEvent::Core(CoreEvent::ThinkingChunk(delta)) => {
1035 Some(UiEvent::AgentThinkingDelta(delta))
1036 }
1037 AgentEvent::Core(CoreEvent::ThinkingDone(text)) => Some(UiEvent::ThinkingComplete { text }),
1038 AgentEvent::Core(CoreEvent::ToolStarted { name, args }) => {
1039 let id = lock_tool_tracker(tool_tracker).start(&name);
1040 Some(UiEvent::ToolCallStarted { id, name, args })
1041 }
1042 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
1043 let id = lock_tool_tracker(tool_tracker).complete(&name);
1044 Some(UiEvent::ToolCallCompleted {
1045 id,
1046 result: UiToolResult {
1047 is_error: result.is_error,
1048 text: format!("{name}: {result:?}"),
1049 },
1050 })
1051 }
1052 _ => None,
1053 }
1054}
1055
/// One-shot closure that receives a `ToolCtx` and returns the tools to install.
type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
1057
/// Builder for [`App`]. Every field starts at the value given by `Default`
/// and is changed through the `with_*` / `disable_*` / `without_*` methods.
pub struct AppBuilder {
    /// Set by `with_config`.
    config: Option<Config>,
    /// Set by `with_cwd`.
    cwd: Option<PathBuf>,
    /// Set by `with_permission_gate`.
    permission_gate: Option<Arc<dyn PermissionGate>>,
    /// Set by `with_builtin_tools`; mutually exclusive with `custom_tools_factory`.
    install_builtin_tools: bool,
    /// Set by `with_max_iterations`; defaults to 200.
    max_iterations: usize,
    /// Set by `with_llm`.
    llm_override: Option<Arc<dyn LlmClient>>,
    /// Set by `with_custom_tools_factory`.
    custom_tools_factory: Option<CustomToolsFactory>,
    /// Set by `with_permissions_config`.
    permissions_policy_path: Option<PathBuf>,
    /// Set by `with_ui_channel`.
    ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
    /// Set by `with_headless_permissions`.
    headless_permissions: bool,
    /// Set by `with_settings`.
    settings: Option<crate::settings::Settings>,
    /// Set by `with_auth`.
    auth: Option<crate::auth::Auth>,
    /// Set by `disable_context_discovery`.
    context_discovery_disabled: bool,
    /// Set by `with_session_store`.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Set by `build_with_session(Some(id))`.
    resume_session_id: Option<crate::session::SessionId>,
    /// Set by `with_autocompact`.
    autocompact_enabled: bool,
    /// Set by `with_skills`, emptied by `without_skills`.
    skills: Vec<crate::skills::Skill>,
    /// Set by `with_extra_tools`.
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// Set by `with_mcp_servers`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
    /// Set by `with_extension_registry`.
    extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
    /// Set by `with_extension_diagnostics`.
    extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
    /// Set by `with_token_tally`.
    token_tally: Option<Arc<tokio::sync::Mutex<TurnStatsAccum>>>,
}
1085
1086impl Default for AppBuilder {
1087 fn default() -> Self {
1088 Self {
1089 config: None,
1090 cwd: None,
1091 permission_gate: None,
1092 install_builtin_tools: false,
1093 max_iterations: 200,
1100 llm_override: None,
1101 custom_tools_factory: None,
1102 permissions_policy_path: None,
1103 ui_tx: None,
1104 headless_permissions: false,
1105 settings: None,
1106 auth: None,
1107 context_discovery_disabled: false,
1108 session_store: None,
1109 resume_session_id: None,
1110 autocompact_enabled: false,
1111 skills: Vec::new(),
1112 extra_tools: Vec::new(),
1113 mcp_servers: Vec::new(),
1114 extension_registry: None,
1115 extension_diagnostics: None,
1116 token_tally: None,
1117 }
1118 }
1119}
1120
1121impl AppBuilder {
1122 pub fn new() -> Self {
1123 Self::default()
1124 }
1125
1126 pub fn with_config(mut self, cfg: Config) -> Self {
1127 self.config = Some(cfg);
1128 self
1129 }
1130
1131 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
1132 self.cwd = Some(cwd.into());
1133 self
1134 }
1135
1136 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
1137 self.permission_gate = Some(gate);
1138 self
1139 }
1140
1141 pub fn with_builtin_tools(mut self) -> Self {
1147 self.install_builtin_tools = true;
1148 self
1149 }
1150
1151 pub fn with_max_iterations(mut self, n: usize) -> Self {
1152 self.max_iterations = n;
1153 self
1154 }
1155
1156 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
1157 self.llm_override = Some(llm);
1158 self
1159 }
1160
1161 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
1162 self.permissions_policy_path = Some(path);
1163 self
1164 }
1165
1166 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
1167 self.ui_tx = Some(tx);
1168 self
1169 }
1170
1171 pub fn with_token_tally(mut self, tally: Arc<tokio::sync::Mutex<TurnStatsAccum>>) -> Self {
1173 self.token_tally = Some(tally);
1174 self
1175 }
1176
1177 pub fn with_headless_permissions(mut self) -> Self {
1182 self.headless_permissions = true;
1183 self
1184 }
1185
1186 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
1188 self.settings = Some(settings);
1189 self
1190 }
1191
1192 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
1194 self.auth = Some(auth);
1195 self
1196 }
1197
1198 pub fn disable_context_discovery(mut self) -> Self {
1201 self.context_discovery_disabled = true;
1202 self
1203 }
1204
1205 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
1208 self.session_store = Some(store);
1209 self
1210 }
1211
1212 pub fn with_autocompact(mut self) -> Self {
1217 self.autocompact_enabled = true;
1218 self
1219 }
1220
1221 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
1227 self.skills = skills;
1228 self
1229 }
1230
1231 pub fn without_skills(mut self) -> Self {
1232 self.skills.clear();
1233 self
1234 }
1235
1236 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
1240 self.extra_tools = tools;
1241 self
1242 }
1243
1244 pub fn with_extension_registry(
1245 mut self,
1246 registry: Arc<crate::extensions::ExtensionRegistry>,
1247 ) -> Self {
1248 self.extension_registry = Some(registry);
1249 self
1250 }
1251
1252 pub fn with_extension_diagnostics(
1253 mut self,
1254 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
1255 ) -> Self {
1256 self.extension_diagnostics = Some(diagnostics);
1257 self
1258 }
1259
1260 pub fn with_mcp_servers(
1263 mut self,
1264 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
1265 ) -> Self {
1266 self.mcp_servers = servers;
1267 self
1268 }
1269
1270 pub fn with_custom_tools_factory(
1275 mut self,
1276 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1277 ) -> Self {
1278 self.custom_tools_factory = Some(Box::new(factory));
1279 self
1280 }
1281
1282 pub async fn build_with_custom_tools(
1286 self,
1287 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
1288 ) -> Result<App> {
1289 self.with_custom_tools_factory(factory).build().await
1290 }
1291
1292 pub async fn build_with_session(
1300 mut self,
1301 resume: Option<crate::session::SessionId>,
1302 ) -> Result<App> {
1303 if let Some(id) = resume {
1304 if self.session_store.is_none() {
1305 return Err(AppError::Config(
1306 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1307 ));
1308 }
1309 self.resume_session_id = Some(id);
1310 }
1311 self.build_internal().await
1312 }
1313
1314 pub async fn build(self) -> Result<App> {
1316 self.build_with_session(None).await
1317 }
1318
1319 async fn build_internal(mut self) -> Result<App> {
1320 let mcp_servers = std::mem::take(&mut self.mcp_servers);
1321 let extra_tools = std::mem::take(&mut self.extra_tools);
1322 let skills = self.skills.clone();
1323 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1324 return Err(AppError::Config(
1325 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1326 ));
1327 }
1328
1329 let has_config = self.config.is_some();
1333 let has_auth = self.auth.is_some();
1334 let mut config = self.config.unwrap_or_default();
1335 let settings = match self.settings {
1336 Some(settings) => settings,
1337 None => {
1338 let mut settings = crate::settings::Settings::default();
1339 settings.model.provider = config.model.provider.clone();
1340 settings.model.name = config.model.name.clone();
1341 settings.model.max_tokens = config.model.max_tokens;
1342 settings
1343 }
1344 };
1345 config.model.provider = settings.model.provider.clone();
1346 config.model.name = settings.model.name.clone();
1347 config.model.max_tokens = settings.model.max_tokens;
1348 let mut auth = self.auth.unwrap_or_default();
1349 if !has_auth {
1350 if let Some(key) = config.anthropic.api_key.as_deref() {
1351 auth.0.insert(
1352 "anthropic".into(),
1353 crate::auth::ProviderAuth::ApiKey {
1354 key: key.to_string(),
1355 },
1356 );
1357 }
1358 }
1359 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1360 if env_or_auth_key.is_some() || has_auth || !has_config {
1361 config.anthropic.api_key = env_or_auth_key;
1362 }
1363 let cwd = self
1364 .cwd
1365 .or_else(|| std::env::current_dir().ok())
1366 .unwrap_or_else(|| PathBuf::from("."));
1367 let agent_dir = crate::paths::agent_dir();
1368 let permission_gate = self.permission_gate.unwrap_or_else(|| {
1369 if self.ui_tx.is_some() || self.headless_permissions {
1373 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1374 } else {
1375 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1376 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1377 }
1378 });
1379
1380 let policy: Arc<crate::permissions::Policy> =
1382 Arc::new(match self.permissions_policy_path.as_ref() {
1383 Some(path) => crate::permissions::Policy::load_or_default(path)?,
1384 None => crate::permissions::Policy::default(),
1385 });
1386 let session_cache = Arc::new(crate::permissions::SessionCache::new());
1387 let permission_strategy_handle = if self.ui_tx.is_some() || self.headless_permissions {
1388 let initial_strategy = if self.ui_tx.is_some() {
1389 PromptStrategy::Prompt
1390 } else {
1391 PromptStrategy::HeadlessDeny
1392 };
1393 Some(Arc::new(tokio::sync::RwLock::new(initial_strategy)))
1394 } else {
1395 None
1396 };
1397 let permission_mode = Arc::new(tokio::sync::RwLock::new(
1398 crate::permissions::PermissionMode::default(),
1399 ));
1400 let token_tally = self
1401 .token_tally
1402 .unwrap_or_else(|| Arc::new(tokio::sync::Mutex::new(TurnStatsAccum::default())));
1403
1404 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1406
1407 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1412 let cancel_token = probe_ctx.cancel_token.clone();
1413 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1414 if let Some(factory_fn) = self.custom_tools_factory.take() {
1415 let mut t = factory_fn(probe_ctx);
1416 t.extend(extra_tools.clone());
1417 (false, t)
1418 } else {
1419 (self.install_builtin_tools, extra_tools.clone())
1420 };
1421
1422 let factory = SessionFactory {
1423 cwd: cwd.clone(),
1424 settings: Arc::new(Mutex::new(settings.clone())),
1425 auth: auth.clone(),
1426 policy: Arc::clone(&policy),
1427 session_cache: Arc::clone(&session_cache),
1428 ui_tx: self.ui_tx.clone(),
1429 headless_permissions: self.headless_permissions,
1430 permission_gate: Arc::clone(&permission_gate),
1431 permission_strategy_handle: permission_strategy_handle.clone(),
1432 progress_tx: progress_tx.clone(),
1433 skills: Arc::new(skills.clone()),
1434 install_builtin_tools: install_builtin,
1435 extra_tools: factory_extra_tools,
1436 max_iterations: self.max_iterations,
1437 context_discovery_disabled: self.context_discovery_disabled,
1438 autocompact_enabled: self.autocompact_enabled,
1439 session_store: self.session_store.clone(),
1440 llm_override: self.llm_override.clone(),
1441 current_model: Arc::new(Mutex::new(None)),
1442 cancel_token: cancel_token.clone(),
1443 };
1444
1445 let mode = match self.resume_session_id.take() {
1446 Some(id) => SessionMode::Resume(id.into_string()),
1447 None => SessionMode::New,
1448 };
1449 let (session, llm) = factory.build(mode, None, None).await?;
1450 let (extension_registry, extension_diagnostics) =
1451 if let Some(reg) = self.extension_registry.take() {
1452 let diagnostics = self
1453 .extension_diagnostics
1454 .unwrap_or_else(|| Arc::new(Vec::new()));
1455 (reg, diagnostics)
1456 } else {
1457 let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1458 let (registry, diagnostics) =
1459 crate::extensions::load_extensions_manifest(&manifest_path).await;
1460 for d in &diagnostics {
1461 let message = d.message.as_str();
1462 match d.severity {
1463 crate::extensions::DiagnosticSeverity::Warn => {
1464 tracing::warn!("extensions: {message}");
1465 }
1466 crate::extensions::DiagnosticSeverity::Error => {
1467 tracing::error!("extensions: {message}");
1468 }
1469 }
1470 }
1471 (Arc::new(registry), Arc::new(diagnostics))
1472 };
1473
1474 Ok(App {
1475 session: arc_swap::ArcSwap::from_pointee(session),
1476 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1477 factory,
1478 config,
1479 cancel_token,
1480 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1481 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1482 skills: Arc::new(skills),
1483 mcp_servers,
1484 extension_registry,
1485 token_tally,
1486 extension_diagnostics,
1487 session_cache,
1488 permission_mode,
1489 permission_strategy_handle,
1490 ui_tx_owned: self.ui_tx.clone(),
1491 })
1492 }
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497 use super::*;
1498 use crate::config::{AnthropicConfig, ModelConfig};
1499 use crate::events::UiEvent;
1500 use crate::user_message::UserMessage;
1501 use async_trait::async_trait;
1502 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1503 use motosan_agent_tool::ToolDef;
1504 use std::sync::atomic::{AtomicUsize, Ordering};
1505
1506 #[test]
1507 fn turn_stats_accum_accumulates_across_calls() {
1508 let mut accum = TurnStatsAccum::default();
1509 accum.add(motosan_agent_loop::TokenUsage {
1510 input_tokens: 100,
1511 output_tokens: 25,
1512 });
1513 accum.add(motosan_agent_loop::TokenUsage {
1514 input_tokens: 1000,
1515 output_tokens: 250,
1516 });
1517 assert_eq!(accum.cumulative_input, 1100);
1518 assert_eq!(accum.cumulative_output, 275);
1519 assert_eq!(accum.turn_count, 2);
1520 }
1521
1522 #[test]
1523 fn turn_stats_accum_saturates_on_overflow() {
1524 let mut accum = TurnStatsAccum {
1525 cumulative_input: u64::MAX - 5,
1526 cumulative_output: 0,
1527 turn_count: 1,
1528 };
1529 accum.add(motosan_agent_loop::TokenUsage {
1530 input_tokens: 100,
1531 output_tokens: 0,
1532 });
1533 assert_eq!(accum.cumulative_input, u64::MAX);
1534 }
1535
1536 #[test]
1537 fn extract_thinking_events_extracts_reasoning_in_source_order() {
1538 use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};
1539 let messages = vec![Message::Assistant {
1540 id: new_message_id(),
1541 meta: MessageMeta::default(),
1542 content: vec![
1543 AssistantContent::Reasoning {
1544 text: "first thought".into(),
1545 signature: None,
1546 },
1547 AssistantContent::Text {
1548 text: "answer 1".into(),
1549 },
1550 AssistantContent::Reasoning {
1551 text: "second thought".into(),
1552 signature: None,
1553 },
1554 ],
1555 }];
1556 let events = extract_thinking_events(&messages);
1557 assert_eq!(events.len(), 2);
1558 match (&events[0], &events[1]) {
1559 (UiEvent::ThinkingComplete { text: t1 }, UiEvent::ThinkingComplete { text: t2 }) => {
1560 assert_eq!(t1, "first thought");
1561 assert_eq!(t2, "second thought");
1562 }
1563 _ => panic!("expected two ThinkingComplete events"),
1564 }
1565 }
1566
1567 #[test]
1568 fn extract_new_thinking_events_skips_historical_reasoning_before_previous_len() {
1569 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1570 let messages = vec![
1571 Message::User {
1572 id: new_message_id(),
1573 meta: MessageMeta::default(),
1574 content: vec![ContentPart::text("old question")],
1575 },
1576 Message::Assistant {
1577 id: new_message_id(),
1578 meta: MessageMeta::default(),
1579 content: vec![AssistantContent::Reasoning {
1580 text: "old thought".into(),
1581 signature: None,
1582 }],
1583 },
1584 Message::User {
1585 id: new_message_id(),
1586 meta: MessageMeta::default(),
1587 content: vec![ContentPart::text("new question")],
1588 },
1589 Message::Assistant {
1590 id: new_message_id(),
1591 meta: MessageMeta::default(),
1592 content: vec![AssistantContent::Reasoning {
1593 text: "new thought".into(),
1594 signature: None,
1595 }],
1596 },
1597 ];
1598
1599 let events = extract_new_thinking_events(&messages, 2);
1600 assert_eq!(events.len(), 1, "should only emit current-turn reasoning");
1601 match &events[0] {
1602 UiEvent::ThinkingComplete { text } => assert_eq!(text, "new thought"),
1603 other => panic!("expected ThinkingComplete; got {other:?}"),
1604 }
1605 }
1606
1607 #[test]
1608 fn extract_new_thinking_events_still_emits_when_not_suppressed() {
1609 use motosan_agent_loop::{new_message_id, AssistantContent, MessageMeta};
1614 let messages = vec![Message::Assistant {
1615 id: new_message_id(),
1616 meta: MessageMeta::default(),
1617 content: vec![AssistantContent::Reasoning {
1618 text: "the model was thinking".into(),
1619 signature: None,
1620 }],
1621 }];
1622 let events = extract_new_thinking_events(&messages, 0);
1623 assert_eq!(events.len(), 1);
1624 }
1625
1626 #[test]
1627 fn extract_thinking_events_skips_non_assistant_and_non_reasoning() {
1628 use motosan_agent_loop::{new_message_id, AssistantContent, ContentPart, MessageMeta};
1629 let messages = vec![
1630 Message::User {
1631 id: new_message_id(),
1632 meta: MessageMeta::default(),
1633 content: vec![ContentPart::text("question?")],
1634 },
1635 Message::Assistant {
1636 id: new_message_id(),
1637 meta: MessageMeta::default(),
1638 content: vec![AssistantContent::Text {
1639 text: "answer".into(),
1640 }],
1641 },
1642 ];
1643 let events = extract_thinking_events(&messages);
1644 assert!(events.is_empty(), "expected no events; got {events:?}");
1645 }
1646
1647 #[tokio::test]
1648 async fn builder_fails_without_api_key() {
1649 let cfg = Config {
1650 anthropic: AnthropicConfig {
1651 api_key: None,
1652 base_url: "https://api.anthropic.com".into(),
1653 },
1654 model: ModelConfig {
1655 provider: "anthropic".into(),
1656 name: "claude-sonnet-4-6".into(),
1657 max_tokens: 4096,
1658 },
1659 };
1660 let err = match AppBuilder::new()
1661 .with_config(cfg)
1662 .with_builtin_tools()
1663 .build()
1664 .await
1665 {
1666 Ok(_) => panic!("must fail without key"),
1667 Err(err) => err,
1668 };
1669 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1670 }
1671
1672 struct ToolOnlyLlm {
1673 turn: AtomicUsize,
1674 }
1675
1676 #[async_trait]
1677 impl LlmClient for ToolOnlyLlm {
1678 async fn chat(
1679 &self,
1680 _messages: &[Message],
1681 _tools: &[ToolDef],
1682 ) -> motosan_agent_loop::Result<ChatOutput> {
1683 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1684 if turn == 0 {
1685 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1686 ToolCallItem {
1687 id: "t1".into(),
1688 name: "read".into(),
1689 args: serde_json::json!({"path":"nope.txt"}),
1690 },
1691 ])))
1692 } else {
1693 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1694 }
1695 }
1696 }
1697
1698 #[tokio::test]
1699 async fn empty_final_message_is_not_emitted() {
1700 let dir = tempfile::tempdir().unwrap();
1701 let mut cfg = Config::default();
1702 cfg.anthropic.api_key = Some("sk-unused".into());
1703 let app = AppBuilder::new()
1704 .with_config(cfg)
1705 .with_cwd(dir.path())
1706 .with_builtin_tools()
1707 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1708 turn: AtomicUsize::new(0),
1709 }))
1710 .build()
1711 .await
1712 .expect("build");
1713 let events: Vec<UiEvent> =
1714 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1715 let empties = events
1716 .iter()
1717 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1718 .count();
1719 assert_eq!(
1720 empties, 0,
1721 "should not emit empty final message, got: {events:?}"
1722 );
1723 }
1724
    /// Stateless test LLM; its `LlmClient` impl (further down in this module)
    /// answers every `chat` call with the message "ok".
    struct EchoLlm;
1726
1727 struct UsageLlm {
1728 responses: std::sync::Mutex<VecDeque<ChatOutput>>,
1729 }
1730
1731 #[async_trait]
1732 impl LlmClient for UsageLlm {
1733 async fn chat(
1734 &self,
1735 _messages: &[Message],
1736 _tools: &[ToolDef],
1737 ) -> motosan_agent_loop::Result<ChatOutput> {
1738 let next = match self.responses.lock() {
1739 Ok(mut responses) => responses.pop_front(),
1740 Err(poisoned) => poisoned.into_inner().pop_front(),
1741 };
1742 Ok(next.unwrap_or_else(|| panic!("UsageLlm script exhausted")))
1743 }
1744 }
1745
1746 #[async_trait]
1747 impl LlmClient for EchoLlm {
1748 async fn chat(
1749 &self,
1750 _messages: &[Message],
1751 _tools: &[ToolDef],
1752 ) -> motosan_agent_loop::Result<ChatOutput> {
1753 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1754 }
1755 }
1756
1757 #[tokio::test]
1758 async fn turn_stats_emitted_with_cumulative_after_each_turn() {
1759 use motosan_agent_loop::{LlmResponse, TokenUsage};
1760
1761 let dir = tempfile::tempdir().expect("tempdir");
1762 let mut cfg = Config::default();
1763 cfg.anthropic.api_key = Some("sk-unused".into());
1764 let llm = Arc::new(UsageLlm {
1765 responses: std::sync::Mutex::new(VecDeque::from([
1766 ChatOutput::with_usage(
1767 LlmResponse::Message("first".into()),
1768 TokenUsage {
1769 input_tokens: 100,
1770 output_tokens: 50,
1771 },
1772 ),
1773 ChatOutput::with_usage(
1774 LlmResponse::Message("second".into()),
1775 TokenUsage {
1776 input_tokens: 200,
1777 output_tokens: 80,
1778 },
1779 ),
1780 ])),
1781 });
1782 let app = AppBuilder::new()
1783 .with_config(cfg)
1784 .with_cwd(dir.path())
1785 .with_builtin_tools()
1786 .with_llm(llm)
1787 .build()
1788 .await
1789 .expect("build");
1790
1791 let events_1: Vec<UiEvent> = app
1792 .send_user_message(UserMessage::text("hi"))
1793 .collect()
1794 .await;
1795 let events_2: Vec<UiEvent> = app
1796 .send_user_message(UserMessage::text("again"))
1797 .collect()
1798 .await;
1799
1800 let ts1 = events_1
1801 .iter()
1802 .find_map(|e| match e {
1803 UiEvent::TurnStats {
1804 input_tokens,
1805 output_tokens,
1806 cumulative_input,
1807 cumulative_output,
1808 ..
1809 } => Some((
1810 *input_tokens,
1811 *output_tokens,
1812 *cumulative_input,
1813 *cumulative_output,
1814 )),
1815 _ => None,
1816 })
1817 .expect("turn 1 had no TurnStats");
1818 assert_eq!(ts1, (100, 50, 100, 50), "turn 1 stats");
1819
1820 let ts2 = events_2
1821 .iter()
1822 .find_map(|e| match e {
1823 UiEvent::TurnStats {
1824 input_tokens,
1825 output_tokens,
1826 cumulative_input,
1827 cumulative_output,
1828 ..
1829 } => Some((
1830 *input_tokens,
1831 *output_tokens,
1832 *cumulative_input,
1833 *cumulative_output,
1834 )),
1835 _ => None,
1836 })
1837 .expect("turn 2 had no TurnStats");
1838 assert_eq!(ts2, (200, 80, 300, 130), "turn 2 stats");
1839
1840 let positions: Vec<&str> = events_1
1841 .iter()
1842 .filter_map(|e| match e {
1843 UiEvent::AgentMessageComplete(_) => Some("msg_complete"),
1844 UiEvent::TurnStats { .. } => Some("stats"),
1845 UiEvent::AgentTurnComplete => Some("turn_complete"),
1846 _ => None,
1847 })
1848 .collect();
1849 assert_eq!(
1850 positions,
1851 vec!["msg_complete", "stats", "turn_complete"],
1852 "wrong ordering"
1853 );
1854 }
1855
1856 #[tokio::test]
1857 async fn turn_stats_reset_after_new_session() {
1858 use motosan_agent_loop::{LlmResponse, TokenUsage};
1859
1860 let dir = tempfile::tempdir().expect("tempdir");
1861 let mut cfg = Config::default();
1862 cfg.anthropic.api_key = Some("sk-unused".into());
1863 let llm = Arc::new(UsageLlm {
1864 responses: std::sync::Mutex::new(VecDeque::from([
1865 ChatOutput::with_usage(
1866 LlmResponse::Message("first".into()),
1867 TokenUsage {
1868 input_tokens: 100,
1869 output_tokens: 50,
1870 },
1871 ),
1872 ChatOutput::with_usage(
1873 LlmResponse::Message("after-new".into()),
1874 TokenUsage {
1875 input_tokens: 7,
1876 output_tokens: 3,
1877 },
1878 ),
1879 ])),
1880 });
1881 let app = AppBuilder::new()
1882 .with_config(cfg)
1883 .with_cwd(dir.path())
1884 .with_builtin_tools()
1885 .with_llm(llm)
1886 .build()
1887 .await
1888 .expect("build");
1889
1890 let _: Vec<UiEvent> = app
1891 .send_user_message(UserMessage::text("hi"))
1892 .collect()
1893 .await;
1894 app.new_session().await.expect("new session");
1895 let events: Vec<UiEvent> = app
1896 .send_user_message(UserMessage::text("after new"))
1897 .collect()
1898 .await;
1899 let stats = events
1900 .iter()
1901 .find_map(|event| match event {
1902 UiEvent::TurnStats {
1903 input_tokens,
1904 output_tokens,
1905 cumulative_input,
1906 cumulative_output,
1907 ..
1908 } => Some((
1909 *input_tokens,
1910 *output_tokens,
1911 *cumulative_input,
1912 *cumulative_output,
1913 )),
1914 _ => None,
1915 })
1916 .expect("turn had no TurnStats");
1917 assert_eq!(stats, (7, 3, 7, 3));
1918 }
1919
1920 #[tokio::test]
1921 async fn with_headless_permissions_builds_an_app() {
1922 let dir = tempfile::tempdir().expect("tempdir");
1923 let mut config = Config::default();
1924 config.anthropic.api_key = Some("sk-unused".into());
1925 let app = AppBuilder::new()
1926 .with_config(config)
1927 .with_cwd(dir.path())
1928 .with_builtin_tools()
1929 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1930 .with_headless_permissions()
1931 .build()
1932 .await
1933 .expect("build");
1934 assert!(!app.session_id().is_empty());
1936 }
1937
1938 #[tokio::test]
1939 async fn new_session_swaps_in_a_fresh_empty_session() {
1940 use futures::StreamExt;
1941 let dir = tempfile::tempdir().expect("tempdir");
1942 let mut config = Config::default();
1943 config.anthropic.api_key = Some("sk-unused".into());
1944 let app = AppBuilder::new()
1945 .with_config(config)
1946 .with_cwd(dir.path())
1947 .with_builtin_tools()
1948 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1949 .build()
1950 .await
1951 .expect("build");
1952
1953 let _: Vec<_> = app
1954 .send_user_message(UserMessage::text("hello"))
1955 .collect()
1956 .await;
1957 let id_before = app.session_id();
1958 assert!(!app.session_history().await.expect("history").is_empty());
1959
1960 app.new_session().await.expect("new_session");
1961
1962 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1963 assert!(
1964 app.session_history().await.expect("history").is_empty(),
1965 "fresh session has no history"
1966 );
1967 }
1968
1969 #[tokio::test]
1970 async fn load_session_restores_a_stored_session_by_id() {
1971 use futures::StreamExt;
1972 let dir = tempfile::tempdir().expect("tempdir");
1973 let store_dir = dir.path().join("sessions");
1974 let store: Arc<dyn SessionStore> =
1975 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1976 let mut config = Config::default();
1977 config.anthropic.api_key = Some("sk-unused".into());
1978 let app = AppBuilder::new()
1979 .with_config(config)
1980 .with_cwd(dir.path())
1981 .with_builtin_tools()
1982 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1983 .with_session_store(Arc::clone(&store))
1984 .build()
1985 .await
1986 .expect("build");
1987
1988 let _: Vec<_> = app
1989 .send_user_message(UserMessage::text("remember this"))
1990 .collect()
1991 .await;
1992 let original_id = app.session_id();
1993
1994 app.new_session().await.expect("new_session");
1995 assert_ne!(app.session_id(), original_id);
1996
1997 app.load_session(&original_id).await.expect("load_session");
1998 assert_eq!(app.session_id(), original_id);
1999 let history = app.session_history().await.expect("history");
2000 assert!(
2001 history.iter().any(|m| m.text().contains("remember this")),
2002 "loaded session should carry the original turn"
2003 );
2004 }
2005
2006 #[tokio::test]
2007 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
2008 use futures::StreamExt;
2009 let dir = tempfile::tempdir().expect("tempdir");
2010 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2011 dir.path().join("s"),
2012 ));
2013 let mut config = Config::default();
2014 config.anthropic.api_key = Some("sk-unused".into());
2015 let app = AppBuilder::new()
2016 .with_config(config)
2017 .with_cwd(dir.path())
2018 .with_builtin_tools()
2019 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2020 .with_session_store(store)
2021 .build()
2022 .await
2023 .expect("build");
2024
2025 let _: Vec<_> = app
2026 .send_user_message(UserMessage::text("hello"))
2027 .collect()
2028 .await;
2029 let original_id = app.session_id();
2030
2031 let new_id = app.clone_session().await.expect("clone_session");
2032
2033 assert_ne!(new_id, original_id);
2035 assert_eq!(app.session_id(), new_id);
2036 let history = app.session_history().await.expect("history");
2038 assert!(history.iter().any(|m| m.text().contains("hello")));
2039 }
2040
2041 #[tokio::test]
2042 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
2043 use futures::StreamExt;
2044 let dir = tempfile::tempdir().expect("tempdir");
2045 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2046 dir.path().join("s"),
2047 ));
2048 let mut config = Config::default();
2049 config.anthropic.api_key = Some("sk-unused".into());
2050 let app = AppBuilder::new()
2051 .with_config(config)
2052 .with_cwd(dir.path())
2053 .with_builtin_tools()
2054 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2055 .with_session_store(store)
2056 .build()
2057 .await
2058 .expect("build");
2059
2060 let _: Vec<_> = app
2062 .send_user_message(UserMessage::text("first"))
2063 .collect()
2064 .await;
2065 let _: Vec<_> = app
2066 .send_user_message(UserMessage::text("second"))
2067 .collect()
2068 .await;
2069
2070 let entries = app.session.load_full().entries().await.expect("entries");
2072 let first_id = entries
2073 .iter()
2074 .find_map(|stored| {
2075 let msg = stored.entry.as_message()?;
2076 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
2077 .then(|| stored.id.clone())
2078 })
2079 .expect("first user message present");
2080
2081 let _: Vec<_> = app
2083 .fork_from(first_id, UserMessage::text("branched"))
2084 .collect()
2085 .await;
2086
2087 let history = app.session_history().await.expect("history");
2088 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
2089 assert!(
2090 texts.iter().any(|t| t.contains("first")),
2091 "fork keeps the fork-point ancestor"
2092 );
2093 assert!(
2094 texts.iter().any(|t| t.contains("branched")),
2095 "fork includes the new message"
2096 );
2097 assert!(
2098 !texts.iter().any(|t| t.contains("second")),
2099 "fork excludes the abandoned branch"
2100 );
2101 }
2102
2103 #[tokio::test]
2104 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
2105 use futures::StreamExt;
2106 let dir = tempfile::tempdir().expect("tempdir");
2107 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2108 dir.path().join("s"),
2109 ));
2110 let mut config = Config::default();
2111 config.anthropic.api_key = Some("sk-unused".into());
2112 let app = AppBuilder::new()
2113 .with_config(config)
2114 .with_cwd(dir.path())
2115 .with_builtin_tools()
2116 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2117 .with_session_store(store)
2118 .build()
2119 .await
2120 .expect("build");
2121
2122 let _: Vec<_> = app
2123 .send_user_message(UserMessage::text("alpha"))
2124 .collect()
2125 .await;
2126 let _: Vec<_> = app
2127 .send_user_message(UserMessage::text("bravo"))
2128 .collect()
2129 .await;
2130
2131 let candidates = app.fork_candidates().await.expect("candidates");
2132 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
2133 assert!(previews[0].contains("bravo"), "got {previews:?}");
2135 assert!(previews.iter().any(|p| p.contains("alpha")));
2136 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
2138 }
2139
2140 #[tokio::test]
2141 async fn branches_returns_a_tree_for_a_linear_session() {
2142 use futures::StreamExt;
2143 let dir = tempfile::tempdir().expect("tempdir");
2144 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2145 dir.path().join("s"),
2146 ));
2147 let mut config = Config::default();
2148 config.anthropic.api_key = Some("sk-unused".into());
2149 let app = AppBuilder::new()
2150 .with_config(config)
2151 .with_cwd(dir.path())
2152 .with_builtin_tools()
2153 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2154 .with_session_store(store)
2155 .build()
2156 .await
2157 .expect("build");
2158
2159 let _: Vec<_> = app
2160 .send_user_message(UserMessage::text("hello"))
2161 .collect()
2162 .await;
2163 let tree = app.branches().await.expect("branches");
2164 assert!(!tree.nodes.is_empty());
2166 assert!(tree.active_leaf.is_some());
2167 }
2168
2169 #[tokio::test]
2170 async fn reload_settings_rebuilds_session_and_resets_token_tally() {
2171 let dir = tempfile::tempdir().expect("tempdir");
2172 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2173 dir.path().join("s"),
2174 ));
2175 let mut cfg = Config::default();
2176 cfg.anthropic.api_key = Some("sk-unused".into());
2177
2178 let llm = Arc::new(UsageLlm {
2179 responses: std::sync::Mutex::new(VecDeque::from([
2180 ChatOutput::with_usage(
2181 LlmResponse::Message("first".into()),
2182 motosan_agent_loop::TokenUsage {
2183 input_tokens: 100,
2184 output_tokens: 50,
2185 },
2186 ),
2187 ChatOutput::with_usage(
2188 LlmResponse::Message("after-reload".into()),
2189 motosan_agent_loop::TokenUsage {
2190 input_tokens: 5,
2191 output_tokens: 2,
2192 },
2193 ),
2194 ])),
2195 });
2196 let app = AppBuilder::new()
2197 .with_config(cfg)
2198 .with_cwd(dir.path())
2199 .with_builtin_tools()
2200 .with_llm(llm)
2201 .with_session_store(store)
2202 .build()
2203 .await
2204 .expect("build");
2205
2206 let _: Vec<UiEvent> = app
2207 .send_user_message(crate::user_message::UserMessage::text("hi"))
2208 .collect()
2209 .await;
2210 {
2211 let token_tally = app.token_tally();
2212 let tally = token_tally.lock().await;
2213 assert_eq!(tally.cumulative_input, 100);
2214 assert_eq!(tally.cumulative_output, 50);
2215 assert_eq!(tally.turn_count, 1);
2216 }
2217
2218 let mut new_settings = crate::settings::Settings::default();
2219 new_settings.model.name = "claude-opus-4-7".into();
2220 new_settings.ui.footer_show_cost = false;
2221 app.reload_settings(new_settings).await.expect("reload");
2222 assert_eq!(app.factory.settings().model.name, "claude-opus-4-7");
2223 assert!(!app.factory.settings().ui.footer_show_cost);
2224 assert_eq!(app.factory.current_model(), None);
2225
2226 {
2227 let token_tally = app.token_tally();
2228 let tally = token_tally.lock().await;
2229 assert_eq!(tally.cumulative_input, 0, "tally not reset on reload");
2230 assert_eq!(tally.cumulative_output, 0);
2231 assert_eq!(tally.turn_count, 0);
2232 }
2233
2234 let _: Vec<UiEvent> = app
2235 .send_user_message(crate::user_message::UserMessage::text("after"))
2236 .collect()
2237 .await;
2238 {
2239 let token_tally = app.token_tally();
2240 let tally = token_tally.lock().await;
2241 assert_eq!(tally.cumulative_input, 5);
2242 assert_eq!(tally.cumulative_output, 2);
2243 assert_eq!(tally.turn_count, 1);
2244 }
2245 }
2246
2247 #[tokio::test]
2248 async fn switch_model_preserves_history() {
2249 use futures::StreamExt;
2250 let dir = tempfile::tempdir().expect("tempdir");
2251 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2252 dir.path().join("s"),
2253 ));
2254 let mut config = Config::default();
2255 config.anthropic.api_key = Some("sk-unused".into());
2256 let app = AppBuilder::new()
2257 .with_config(config)
2258 .with_cwd(dir.path())
2259 .with_builtin_tools()
2260 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2261 .with_session_store(store)
2262 .build()
2263 .await
2264 .expect("build");
2265
2266 let _: Vec<_> = app
2267 .send_user_message(UserMessage::text("keep me"))
2268 .collect()
2269 .await;
2270 let id_before = app.session_id();
2271
2272 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
2273 .await
2274 .expect("switch_model");
2275
2276 assert_eq!(
2277 app.session_id(),
2278 id_before,
2279 "switch_model keeps the same session"
2280 );
2281 let history = app.session_history().await.expect("history");
2282 assert!(history.iter().any(|m| m.text().contains("keep me")));
2283 }
2284
2285 #[tokio::test]
2286 async fn switch_model_is_sticky_for_future_session_rebuilds() {
2287 let dir = tempfile::tempdir().expect("tempdir");
2288 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
2289 dir.path().join("s"),
2290 ));
2291 let mut config = Config::default();
2292 config.anthropic.api_key = Some("sk-unused".into());
2293 let app = AppBuilder::new()
2294 .with_config(config)
2295 .with_cwd(dir.path())
2296 .with_builtin_tools()
2297 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
2298 .with_session_store(store)
2299 .build()
2300 .await
2301 .expect("build");
2302
2303 let selected = crate::model::ModelId::from("claude-opus-4-7");
2304 app.switch_model(&selected).await.expect("switch_model");
2305 app.new_session().await.expect("new_session");
2306
2307 assert_eq!(app.factory.current_model(), Some(selected.clone()));
2308 assert_eq!(app.settings().model.name, selected.to_string());
2309 }
2310
2311 struct SleepThenDoneLlm {
2312 turn: AtomicUsize,
2313 }
2314
2315 #[async_trait]
2316 impl LlmClient for SleepThenDoneLlm {
2317 async fn chat(
2318 &self,
2319 _messages: &[Message],
2320 _tools: &[ToolDef],
2321 ) -> motosan_agent_loop::Result<ChatOutput> {
2322 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2323 if turn == 0 {
2324 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2325 ToolCallItem {
2326 id: "sleep".into(),
2327 name: "bash".into(),
2328 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
2329 },
2330 ])))
2331 } else {
2332 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2333 }
2334 }
2335 }
2336
    /// After `new_session` rebuilds the session, `App::cancel` must still
    /// interrupt a running builtin `bash` tool: the turn has to finish within
    /// the timeout and report "command cancelled by user".
    #[tokio::test]
    async fn cancel_reaches_builtin_tools_after_session_rebuild() {
        use futures::StreamExt;
        let dir = tempfile::tempdir().expect("tempdir");
        let mut config = Config::default();
        config.anthropic.api_key = Some("sk-unused".into());
        // The app is shared between the spawned turn and the cancelling test body.
        let app = Arc::new(
            AppBuilder::new()
                .with_config(config)
                .with_cwd(dir.path())
                .with_builtin_tools()
                .with_llm(Arc::new(SleepThenDoneLlm {
                    turn: AtomicUsize::new(0),
                }) as Arc<dyn LlmClient>)
                .build()
                .await
                .expect("build"),
        );

        // Rebuild the session first so the tools under test are the rebuilt ones.
        app.new_session().await.expect("new_session");
        let running_app = Arc::clone(&app);
        // Run the turn in the background; the scripted LLM starts `sleep 5`.
        let handle = tokio::spawn(async move {
            running_app
                .send_user_message(UserMessage::text("run a slow command"))
                .collect::<Vec<_>>()
                .await
        });
        // Give the turn a moment to start the command before cancelling.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        app.cancel();

        // 2s is well below the 5s sleep, so finishing in time implies the
        // command was interrupted rather than run to completion.
        let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
            .await
            .expect("turn should finish after cancellation")
            .expect("join");
        assert!(
            events.iter().any(|event| {
                matches!(
                    event,
                    UiEvent::ToolCallCompleted { result, .. }
                        if result.text.contains("command cancelled by user")
                )
            }),
            "cancel should reach the rebuilt bash tool: {events:?}"
        );
    }
2382
2383 #[tokio::test]
2384 async fn compact_summarizes_a_session_with_enough_history() {
2385 struct DoneLlm;
2386 #[async_trait]
2387 impl LlmClient for DoneLlm {
2388 async fn chat(
2389 &self,
2390 _messages: &[Message],
2391 _tools: &[ToolDef],
2392 ) -> motosan_agent_loop::Result<ChatOutput> {
2393 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
2394 }
2395 }
2396
2397 let dir = tempfile::tempdir().expect("tempdir");
2398 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
2399 dir.path().join("sessions"),
2400 ));
2401 let mut config = Config::default();
2402 config.anthropic.api_key = Some("sk-unused".into());
2403 let app = AppBuilder::new()
2404 .with_config(config)
2405 .with_cwd(dir.path())
2406 .with_builtin_tools()
2407 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
2408 .with_session_store(store)
2409 .build()
2410 .await
2411 .expect("build");
2412
2413 for i in 0..4 {
2420 let _: Vec<_> = app
2421 .send_user_message(UserMessage::text(format!("turn {i}")))
2422 .collect()
2423 .await;
2424 }
2425
2426 app.compact().await.expect("compact should succeed");
2427
2428 let history = app.session_history().await.expect("history");
2431 assert!(
2432 !history.is_empty(),
2433 "session should still have content post-compaction"
2434 );
2435 }
2436
2437 #[test]
2438 fn anthropic_env_api_key_overrides_auth_json_key() {
2439 let mut auth = crate::auth::Auth::default();
2440 auth.0.insert(
2441 "anthropic".into(),
2442 crate::auth::ProviderAuth::ApiKey {
2443 key: "sk-auth".into(),
2444 },
2445 );
2446
2447 let key = anthropic_api_key_from(&auth, |name| {
2448 (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
2449 });
2450 assert_eq!(key.as_deref(), Some("sk-env"));
2451 }
2452
2453 #[tokio::test]
2454 async fn with_settings_overrides_deprecated_config_model() {
2455 use crate::settings::Settings;
2456
2457 let mut config = Config::default();
2458 config.model.name = "from-config".into();
2459 config.anthropic.api_key = Some("sk-config".into());
2460
2461 let mut settings = Settings::default();
2462 settings.model.name = "from-settings".into();
2463
2464 let tmp = tempfile::tempdir().unwrap();
2465 let app = AppBuilder::new()
2466 .with_config(config)
2467 .with_settings(settings)
2468 .with_cwd(tmp.path())
2469 .disable_context_discovery()
2470 .with_llm(Arc::new(EchoLlm))
2471 .build()
2472 .await
2473 .expect("build");
2474 assert_eq!(app.config().model.name, "from-settings");
2475 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
2476 }
2477
2478 #[tokio::test]
2479 async fn with_settings_synthesises_legacy_config_for_build() {
2480 use crate::auth::{Auth, ProviderAuth};
2481 use crate::settings::Settings;
2482
2483 let mut settings = Settings::default();
2484 settings.model.name = "claude-sonnet-4-6".into();
2485
2486 let mut auth = Auth::default();
2487 auth.0.insert(
2488 "anthropic".into(),
2489 ProviderAuth::ApiKey {
2490 key: "sk-test".into(),
2491 },
2492 );
2493
2494 let tmp = tempfile::tempdir().unwrap();
2495 let app = AppBuilder::new()
2496 .with_settings(settings)
2497 .with_auth(auth)
2498 .with_cwd(tmp.path())
2499 .with_builtin_tools()
2500 .disable_context_discovery()
2501 .with_llm(Arc::new(EchoLlm))
2502 .build()
2503 .await
2504 .expect("build");
2505 let _ = app;
2506 }
2507
2508 #[tokio::test]
2509 async fn cancel_before_turn_does_not_poison_future_turns() {
2510 let dir = tempfile::tempdir().unwrap();
2511 let mut cfg = Config::default();
2512 cfg.anthropic.api_key = Some("sk-unused".into());
2513 let app = AppBuilder::new()
2514 .with_config(cfg)
2515 .with_cwd(dir.path())
2516 .with_builtin_tools()
2517 .with_llm(std::sync::Arc::new(EchoLlm))
2518 .build()
2519 .await
2520 .expect("build");
2521
2522 app.cancel();
2523 let events: Vec<UiEvent> = app
2524 .send_user_message(UserMessage::text("x"))
2525 .collect()
2526 .await;
2527
2528 assert!(
2529 events
2530 .iter()
2531 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
2532 "turn should use a fresh cancellation token: {events:?}"
2533 );
2534 }
2535
2536 #[test]
2537 fn map_event_matches_started_and_completed_ids_by_tool_name() {
2538 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2539
2540 let started_bash = map_event(
2541 AgentEvent::Core(CoreEvent::ToolStarted {
2542 name: "bash".into(),
2543 args: serde_json::json!({}),
2544 }),
2545 &tracker,
2546 );
2547 let started_read = map_event(
2548 AgentEvent::Core(CoreEvent::ToolStarted {
2549 name: "read".into(),
2550 args: serde_json::json!({}),
2551 }),
2552 &tracker,
2553 );
2554 let completed_bash = map_event(
2555 AgentEvent::Core(CoreEvent::ToolCompleted {
2556 name: "bash".into(),
2557 result: motosan_agent_tool::ToolResult::text("ok"),
2558 }),
2559 &tracker,
2560 );
2561 let completed_read = map_event(
2562 AgentEvent::Core(CoreEvent::ToolCompleted {
2563 name: "read".into(),
2564 result: motosan_agent_tool::ToolResult::text("ok"),
2565 }),
2566 &tracker,
2567 );
2568
2569 assert!(matches!(
2570 started_bash,
2571 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
2572 ));
2573 assert!(matches!(
2574 started_read,
2575 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
2576 ));
2577 assert!(matches!(
2578 completed_bash,
2579 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
2580 ));
2581 assert!(matches!(
2582 completed_read,
2583 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
2584 ));
2585 }
2586
2587 #[test]
2588 fn map_event_forwards_thinking_chunk_as_delta() {
2589 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2590 let ev = AgentEvent::Core(CoreEvent::ThinkingChunk("partial ".into()));
2591 match map_event(ev, &tracker) {
2592 Some(UiEvent::AgentThinkingDelta(s)) => assert_eq!(s, "partial "),
2593 other => panic!("expected AgentThinkingDelta, got {other:?}"),
2594 }
2595 }
2596
2597 #[test]
2598 fn map_event_forwards_thinking_done_as_complete() {
2599 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2600 let ev = AgentEvent::Core(CoreEvent::ThinkingDone("full thought".into()));
2601 match map_event(ev, &tracker) {
2602 Some(UiEvent::ThinkingComplete { text }) => assert_eq!(text, "full thought"),
2603 other => panic!("expected ThinkingComplete, got {other:?}"),
2604 }
2605 }
2606
2607 #[test]
2608 fn map_event_forwards_tool_started_args() {
2609 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2614 let args = serde_json::json!({"path": "src/main.rs", "limit": 20});
2615 let mapped = map_event(
2616 AgentEvent::Core(CoreEvent::ToolStarted {
2617 name: "read".into(),
2618 args: args.clone(),
2619 }),
2620 &tracker,
2621 );
2622 match mapped {
2623 Some(UiEvent::ToolCallStarted {
2624 args: forwarded, ..
2625 }) => assert_eq!(forwarded, args, "args must round-trip from CoreEvent"),
2626 other => panic!("expected ToolCallStarted; got {other:?}"),
2627 }
2628 }
2629
2630 #[test]
2631 fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
2632 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2633 let s1 = map_event(
2634 AgentEvent::Core(CoreEvent::ToolStarted {
2635 name: "bash".into(),
2636 args: serde_json::json!({}),
2637 }),
2638 &tracker,
2639 );
2640 let s2 = map_event(
2641 AgentEvent::Core(CoreEvent::ToolStarted {
2642 name: "bash".into(),
2643 args: serde_json::json!({}),
2644 }),
2645 &tracker,
2646 );
2647 let c1 = map_event(
2648 AgentEvent::Core(CoreEvent::ToolCompleted {
2649 name: "bash".into(),
2650 result: motosan_agent_tool::ToolResult::text("a"),
2651 }),
2652 &tracker,
2653 );
2654 let c2 = map_event(
2655 AgentEvent::Core(CoreEvent::ToolCompleted {
2656 name: "bash".into(),
2657 result: motosan_agent_tool::ToolResult::text("b"),
2658 }),
2659 &tracker,
2660 );
2661
2662 let id_s1 = match s1 {
2663 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2664 other => panic!("{other:?}"),
2665 };
2666 let id_s2 = match s2 {
2667 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2668 other => panic!("{other:?}"),
2669 };
2670 let id_c1 = match c1 {
2671 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2672 other => panic!("{other:?}"),
2673 };
2674 let id_c2 = match c2 {
2675 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
2676 other => panic!("{other:?}"),
2677 };
2678
2679 assert_eq!(id_s1, id_c1);
2680 assert_eq!(id_s2, id_c2);
2681 assert_ne!(id_s1, id_s2);
2682 }
2683
2684 #[tokio::test]
2685 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
2686 let dir = tempfile::tempdir().unwrap();
2687 let mut cfg = Config::default();
2688 cfg.anthropic.api_key = Some("sk-unused".into());
2689 let app = AppBuilder::new()
2690 .with_config(cfg)
2691 .with_cwd(dir.path())
2692 .with_builtin_tools()
2693 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2694 turn: AtomicUsize::new(0),
2695 }))
2696 .build()
2697 .await
2698 .expect("build");
2699
2700 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
2701 let first_event = first.next().await;
2702 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2703
2704 let second_events: Vec<UiEvent> = app
2705 .send_user_message(UserMessage::text("second"))
2706 .collect()
2707 .await;
2708 assert_eq!(
2709 second_events.len(),
2710 1,
2711 "expected immediate single error event, got: {second_events:?}"
2712 );
2713 assert!(matches!(
2714 &second_events[0],
2715 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
2716 ));
2717 }
2718
2719 #[tokio::test]
2720 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
2721 let dir = tempfile::tempdir().unwrap();
2727 let mut cfg = Config::default();
2728 cfg.anthropic.api_key = Some("sk-unused".into());
2729 let app = AppBuilder::new()
2730 .with_config(cfg)
2731 .with_cwd(dir.path())
2732 .with_builtin_tools()
2733 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
2734 turn: AtomicUsize::new(0),
2735 }))
2736 .build()
2737 .await
2738 .expect("build");
2739
2740 let mut first =
2742 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
2743 let first_event = first.next().await;
2744 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
2745
2746 let bad = crate::user_message::UserMessage {
2748 text: "second".into(),
2749 attachments: vec![crate::user_message::Attachment::Image {
2750 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
2751 }],
2752 };
2753 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
2754
2755 assert_eq!(
2756 second_events.len(),
2757 1,
2758 "expected exactly one event (the attachment error); got: {second_events:?}"
2759 );
2760 assert!(
2761 matches!(
2762 &second_events[0],
2763 UiEvent::AttachmentError {
2764 kind: crate::user_message::AttachmentErrorKind::NotFound,
2765 ..
2766 }
2767 ),
2768 "expected AttachmentError::NotFound as first event; got {second_events:?}"
2769 );
2770 }
2771
2772 struct InfiniteToolLlm;
2773
2774 #[async_trait]
2775 impl LlmClient for InfiniteToolLlm {
2776 async fn chat(
2777 &self,
2778 _messages: &[Message],
2779 _tools: &[ToolDef],
2780 ) -> motosan_agent_loop::Result<ChatOutput> {
2781 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
2784 ToolCallItem {
2785 id: "loop".into(),
2786 name: "read".into(),
2787 args: serde_json::json!({"path": "nope.txt"}),
2788 },
2789 ])))
2790 }
2791 }
2792
2793 #[tokio::test]
2794 async fn max_iterations_surfaces_as_notice_with_lifecycle_complete() {
2795 let dir = tempfile::tempdir().unwrap();
2796 let mut cfg = Config::default();
2797 cfg.anthropic.api_key = Some("sk-unused".into());
2798 let app = AppBuilder::new()
2799 .with_config(cfg)
2800 .with_cwd(dir.path())
2801 .with_builtin_tools()
2802 .with_llm(std::sync::Arc::new(InfiniteToolLlm))
2803 .with_max_iterations(3)
2804 .build()
2805 .await
2806 .expect("build");
2807
2808 let events: Vec<UiEvent> =
2809 futures::StreamExt::collect(app.send_user_message(UserMessage::text("loop"))).await;
2810
2811 assert!(
2812 !events.iter().any(|e| matches!(e, UiEvent::Error(_))),
2813 "MaxIterations should surface as Notice, not Error; got: {events:?}"
2814 );
2815
2816 let notice = events.iter().find_map(|e| match e {
2817 UiEvent::Notice { title, body } => Some((title.clone(), body.clone())),
2818 _ => None,
2819 });
2820 let (title, body) =
2821 notice.unwrap_or_else(|| panic!("expected Notice event; got: {events:?}"));
2822 let title_lower = title.to_lowercase();
2823 assert!(
2824 title_lower.contains("stop") || title_lower.contains("iteration"),
2825 "notice title should mention stop/iteration; got title={title:?} body={body:?}"
2826 );
2827 assert!(
2828 body.contains("3"),
2829 "body should reference the per-turn cap (3); got body={body:?}"
2830 );
2831 let body_lower = body.to_lowercase();
2832 assert!(
2833 body_lower.contains("continue") || body_lower.contains("send another"),
2834 "body should hint that user can continue; got body={body:?}"
2835 );
2836
2837 assert!(
2838 events
2839 .iter()
2840 .any(|e| matches!(e, UiEvent::AgentTurnComplete)),
2841 "AgentTurnComplete should fire on the soft-cap path; got: {events:?}"
2842 );
2843 }
2844
2845 #[test]
2846 fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
2847 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
2848 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2849
2850 let only = map_event(
2851 AgentEvent::Core(CoreEvent::ToolStarted {
2852 name: "bash".into(),
2853 args: serde_json::json!({}),
2854 }),
2855 &tracker,
2856 );
2857 let only_id = match only {
2858 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2859 other => panic!("{other:?}"),
2860 };
2861 assert_eq!(progress_event_id(&tracker), only_id);
2862
2863 let _second = map_event(
2864 AgentEvent::Core(CoreEvent::ToolStarted {
2865 name: "read".into(),
2866 args: serde_json::json!({}),
2867 }),
2868 &tracker,
2869 );
2870 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2871 }
2872
2873 #[tokio::test]
2874 async fn builder_rejects_builtin_and_custom_tools_together() {
2875 let mut cfg = Config::default();
2876 cfg.anthropic.api_key = Some("sk-unused".into());
2877 let dir = tempfile::tempdir().unwrap();
2878 let err = match AppBuilder::new()
2879 .with_config(cfg)
2880 .with_cwd(dir.path())
2881 .with_builtin_tools()
2882 .with_custom_tools_factory(|_| Vec::new())
2883 .build()
2884 .await
2885 {
2886 Ok(_) => panic!("must reject conflicting tool configuration"),
2887 Err(err) => err,
2888 };
2889
2890 assert!(format!("{err}").contains("mutually exclusive"));
2891 }
2892
2893 #[tokio::test]
2895 async fn two_turns_in_same_session_share_history() {
2896 #[derive(Default)]
2897 struct CounterLlm {
2898 turn: AtomicUsize,
2899 }
2900 #[async_trait]
2901 impl LlmClient for CounterLlm {
2902 async fn chat(
2903 &self,
2904 messages: &[Message],
2905 _tools: &[ToolDef],
2906 ) -> motosan_agent_loop::Result<ChatOutput> {
2907 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2908 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2909 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2910 }
2911 }
2912
2913 let tmp = tempfile::tempdir().unwrap();
2914 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2915 tmp.path().to_path_buf(),
2916 ));
2917
2918 let app = AppBuilder::new()
2919 .with_settings(crate::settings::Settings::default())
2920 .with_auth(crate::auth::Auth::default())
2921 .with_cwd(tmp.path())
2922 .with_builtin_tools()
2923 .disable_context_discovery()
2924 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2925 .with_session_store(store)
2926 .build_with_session(None)
2927 .await
2928 .expect("build");
2929
2930 let _events1: Vec<UiEvent> = app
2931 .send_user_message(UserMessage::text("hi"))
2932 .collect()
2933 .await;
2934 let events2: Vec<UiEvent> = app
2935 .send_user_message(UserMessage::text("again"))
2936 .collect()
2937 .await;
2938
2939 let saw_more_than_one = events2.iter().any(|e| {
2941 matches!(
2942 e,
2943 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2944 )
2945 });
2946 assert!(
2947 saw_more_than_one,
2948 "second turn should have seen history; events: {events2:?}"
2949 );
2950 }
2951}
2952
// Tests for the skill-related `AppBuilder` methods.
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// Minimal global skill named "x", used as input by the tests below.
    fn sample_skill() -> Skill {
        let file_path = PathBuf::from("/x.md");
        let base_dir = PathBuf::from("/");
        Skill {
            name: "x".into(),
            description: "d".into(),
            file_path,
            base_dir,
            disable_model_invocation: false,
            source: SkillSource::Global,
        }
    }

    /// `with_skills` keeps the supplied skills on the builder.
    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);
        let stored = &builder.skills;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].name, "x");
    }

    /// `without_skills` empties a builder that previously had skills.
    #[test]
    fn without_skills_clears() {
        let seeded = AppBuilder::new().with_skills(vec![sample_skill()]);
        let cleared = seeded.without_skills();
        assert!(cleared.skills.is_empty());
    }
}
2985
// Tests for `AppBuilder::with_extra_tools`.
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::{Tool, ToolContext, ToolDef, ToolResult};
    use std::future::Future;
    use std::pin::Pin;

    /// Stand-in tool named `fake__echo` whose call always resolves to the text "ok".
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> ToolDef {
            let input_schema = serde_json::json!({"type": "object"});
            ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &ToolContext,
        ) -> Pin<Box<dyn Future<Output = ToolResult> + Send + '_>> {
            // Arguments and context are ignored; the result is constant.
            let fut = async { ToolResult::text("ok") };
            Box::pin(fut)
        }
    }

    /// `with_extra_tools` keeps the supplied tools on the builder.
    #[test]
    fn with_extra_tools_stores_tools() {
        let fake: Arc<dyn Tool> = Arc::new(FakeTool);
        let builder = AppBuilder::new().with_extra_tools(vec![fake]);
        assert_eq!(builder.extra_tools.len(), 1);
    }
}