1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
23#[derive(Debug, Clone)]
25pub(crate) enum SessionMode {
26 New,
28 Resume(String),
30}
31
32struct SharedLlm {
37 client: Arc<dyn LlmClient>,
38}
39
40impl SharedLlm {
41 fn new(client: Arc<dyn LlmClient>) -> Self {
42 Self { client }
43 }
44
45 fn client(&self) -> Arc<dyn LlmClient> {
46 Arc::clone(&self.client)
47 }
48}
49
50pub(crate) struct SessionFactory {
51 cwd: PathBuf,
52 settings: crate::settings::Settings,
53 auth: crate::auth::Auth,
54 policy: Arc<crate::permissions::Policy>,
55 session_cache: Arc<crate::permissions::SessionCache>,
56 ui_tx: Option<mpsc::Sender<UiEvent>>,
57 permission_gate: Arc<dyn PermissionGate>,
58 progress_tx: mpsc::Sender<ToolProgressChunk>,
61 skills: Arc<Vec<crate::skills::Skill>>,
62 install_builtin_tools: bool,
63 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
64 max_iterations: usize,
65 context_discovery_disabled: bool,
66 autocompact_enabled: bool,
67 session_store: Option<Arc<dyn SessionStore>>,
68 llm_override: Option<Arc<dyn LlmClient>>,
69 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
70 cancel_token: SharedCancelToken,
71}
72
73impl SessionFactory {
74 fn current_model(&self) -> Option<crate::model::ModelId> {
75 match self.current_model.lock() {
76 Ok(guard) => guard.clone(),
77 Err(poisoned) => poisoned.into_inner().clone(),
78 }
79 }
80
81 fn set_current_model(&self, model: crate::model::ModelId) {
82 match self.current_model.lock() {
83 Ok(mut guard) => *guard = Some(model),
84 Err(poisoned) => *poisoned.into_inner() = Some(model),
85 }
86 }
87
88 async fn build(
93 &self,
94 mode: SessionMode,
95 model_override: Option<&crate::model::ModelId>,
96 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
97 let effective_model = model_override.cloned().or_else(|| self.current_model());
99 let mut settings = self.settings.clone();
100 if let Some(m) = &effective_model {
101 settings.model.name = m.as_str().to_string();
102 }
103
104 let llm = if effective_model.is_none() {
105 self.llm_override.as_ref().map_or_else(
106 || build_llm_client(&settings, &self.auth),
107 |llm| Ok(Arc::clone(llm)),
108 )?
109 } else {
110 build_llm_client(&settings, &self.auth)?
111 };
112
113 let tool_ctx = ToolCtx::new_with_cancel_token(
116 &self.cwd,
117 Arc::clone(&self.permission_gate),
118 self.progress_tx.clone(),
119 self.cancel_token.clone(),
120 );
121 let mut tools = if self.install_builtin_tools {
122 builtin_tools(tool_ctx.clone())
123 } else {
124 Vec::new()
125 };
126 tools.extend(self.extra_tools.iter().cloned());
127
128 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
129 let base_prompt = build_system_prompt(&tool_names, &self.skills);
130 let system_prompt = if self.context_discovery_disabled {
131 base_prompt
132 } else {
133 let agent_dir = crate::paths::agent_dir();
134 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
135 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
136 };
137 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
138
139 let mut engine_builder = Engine::builder()
140 .max_iterations(self.max_iterations)
141 .system_prompt(system_prompt)
142 .tool_context(motosan_tool_context);
143 for tool in tools {
144 engine_builder = engine_builder.tool(tool);
145 }
146 if let Some(ui_tx) = &self.ui_tx {
147 let ext = crate::permissions::PermissionExtension::new(
148 Arc::clone(&self.policy),
149 Arc::clone(&self.session_cache),
150 self.cwd.clone(),
151 ui_tx.clone(),
152 );
153 engine_builder = engine_builder.extension(Box::new(ext));
154 }
155 if self.autocompact_enabled
156 && settings.session.compact_at_context_pct > 0.0
157 && settings.session.compact_at_context_pct < 1.0
158 {
159 let cfg = AutocompactConfig {
160 threshold: settings.session.compact_at_context_pct,
161 max_context_tokens: settings.session.max_context_tokens,
162 keep_turns: settings.session.keep_turns.max(1),
163 };
164 engine_builder = engine_builder
165 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
166 }
167 let engine = engine_builder.build();
168
169 let session = match (&mode, &self.session_store) {
170 (SessionMode::Resume(id), Some(store)) => {
171 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
172 .await
173 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
174 let entries = s
175 .entries()
176 .await
177 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
178 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
179 s
180 }
181 (SessionMode::Resume(_), None) => {
182 return Err(AppError::Config("resume requires a session store".into()));
183 }
184 (SessionMode::New, Some(store)) => {
185 let id = crate::session::SessionId::new();
186 AgentSession::new_with_store(
187 id.into_string(),
188 Arc::clone(store),
189 engine,
190 Arc::clone(&llm),
191 )
192 }
193 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
194 };
195
196 Ok((session, llm))
197 }
198}
199
200pub struct App {
201 session: arc_swap::ArcSwap<AgentSession>,
202 llm: arc_swap::ArcSwap<SharedLlm>,
203 factory: SessionFactory,
204 config: Config,
205 cancel_token: SharedCancelToken,
206 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
207 next_tool_id: Arc<Mutex<ToolCallTracker>>,
208 skills: Arc<Vec<crate::skills::Skill>>,
209 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
210 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
211}
212
213impl App {
214 pub fn config(&self) -> &Config {
215 &self.config
216 }
217
218 pub fn cancel(&self) {
222 self.cancel_token.cancel();
223 }
224
225 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
229 Arc::clone(&self.session_cache)
230 }
231
232 pub fn session_id(&self) -> String {
235 self.session.load().session_id().to_string()
236 }
237
238 pub async fn session_history(
247 &self,
248 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
249 self.session.load_full().history().await
250 }
251
252 pub async fn compact(&self) -> Result<()> {
258 use motosan_agent_loop::ThresholdStrategy;
259 let strategy = ThresholdStrategy {
260 threshold: 0.0,
261 ..ThresholdStrategy::default()
262 };
263 let llm = self.llm.load_full().client();
264 self.session
265 .load_full()
266 .maybe_compact(&strategy, llm)
267 .await
268 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
269 Ok(())
270 }
271
272 pub async fn new_session(&self) -> Result<()> {
276 let (session, llm) = self.factory.build(SessionMode::New, None).await?;
277 self.session.store(Arc::new(session));
278 self.llm.store(Arc::new(SharedLlm::new(llm)));
279 Ok(())
280 }
281
282 pub async fn load_session(&self, id: &str) -> Result<()> {
288 let (session, llm) = self
289 .factory
290 .build(SessionMode::Resume(id.to_string()), None)
291 .await?;
292 self.session.store(Arc::new(session));
293 self.llm.store(Arc::new(SharedLlm::new(llm)));
294 Ok(())
295 }
296
297 pub async fn clone_session(&self) -> Result<String> {
306 let Some(store) = self.factory.session_store.as_ref() else {
307 return Err(AppError::Config("clone requires a session store".into()));
308 };
309 let source_id = self.session.load().session_id().to_string();
310 let new_id = crate::session::SessionId::new().into_string();
311 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
312 catalog
313 .fork(&source_id, &new_id)
314 .await
315 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
316 self.load_session(&new_id).await?;
317 Ok(new_id)
318 }
319
320 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
335 let current_id = self.session.load().session_id().to_string();
336 let (session, llm) = self
337 .factory
338 .build(SessionMode::Resume(current_id), Some(model))
339 .await?;
340 self.factory.set_current_model(model.clone());
341 self.session.store(Arc::new(session));
342 self.llm.store(Arc::new(SharedLlm::new(llm)));
343 Ok(())
344 }
345
346 pub async fn disconnect_mcp(&self) {
349 for (name, server) in &self.mcp_servers {
350 let _ =
351 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
352 tracing::debug!(target: "mcp", server = %name, "disconnected");
353 }
354 }
355
356 fn run_turn(
357 &self,
358 text: String,
359 fork_from: Option<motosan_agent_loop::EntryId>,
360 ) -> impl Stream<Item = UiEvent> + Send + 'static {
361 let session = self.session.load_full();
362 let skills = Arc::clone(&self.skills);
363 let cancel_token = self.cancel_token.clone();
364 let tracker = Arc::clone(&self.next_tool_id);
365 let progress = Arc::clone(&self.progress_rx);
366
367 async_stream::stream! {
368 let mut progress_guard = match progress.try_lock() {
370 Ok(guard) => guard,
371 Err(_) => {
372 yield UiEvent::Error(
373 "another turn is already running; capo is single-turn-per-App".into(),
374 );
375 return;
376 }
377 };
378
379 let cancel = cancel_token.reset();
381
382 yield UiEvent::AgentTurnStarted;
383 yield UiEvent::AgentThinking;
384
385 let text = crate::skills::expand::expand_skill_command(&text, &skills);
386 let new_user = motosan_agent_loop::Message::user(&text);
387
388 let handle = match fork_from {
390 None => {
391 let history = match session.history().await {
393 Ok(h) => h,
394 Err(err) => {
395 yield UiEvent::Error(format!("session.history failed: {err}"));
396 return;
397 }
398 };
399 let mut messages = history;
400 messages.push(new_user);
401 match session.start_turn(messages).await {
402 Ok(h) => h,
403 Err(err) => {
404 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
405 return;
406 }
407 }
408 }
409 Some(from) => {
410 match session.fork_turn(from, vec![new_user]).await {
412 Ok(h) => h,
413 Err(err) => {
414 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
415 return;
416 }
417 }
418 }
419 };
420 let previous_len = handle.previous_len;
421 let epoch = handle.epoch;
422 let branch_parent = handle.branch_parent;
423 let ops_tx = handle.ops_tx.clone();
424 let mut agent_stream = handle.stream;
425
426 let interrupt_bridge = tokio::spawn(async move {
434 cancel.cancelled().await;
435 let _ = ops_tx.send(AgentOp::Interrupt).await;
436 });
437
438 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
440 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
441
442 loop {
443 while let Ok(chunk) = progress_guard.try_recv() {
445 yield UiEvent::ToolCallProgress {
446 id: progress_event_id(&tracker),
447 chunk: ProgressChunk::from(chunk),
448 };
449 }
450
451 tokio::select! {
452 biased;
453 maybe_item = agent_stream.next() => {
454 match maybe_item {
455 Some(AgentStreamItem::Event(ev)) => {
456 if let Some(ui) = map_event(ev, &tracker) {
457 yield ui;
458 }
459 }
460 Some(AgentStreamItem::Terminal(term)) => {
461 terminal_result = Some(term.result);
462 terminal_messages = Some(term.messages);
463 break;
464 }
465 None => break,
466 }
467 }
468 Some(chunk) = progress_guard.recv() => {
469 yield UiEvent::ToolCallProgress {
470 id: progress_event_id(&tracker),
471 chunk: ProgressChunk::from(chunk),
472 };
473 }
474 }
475 }
476
477 interrupt_bridge.abort();
479
480 if let Some(msgs) = terminal_messages.as_ref() {
482 if let Err(err) = session
483 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
484 .await
485 {
486 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
487 }
488 }
489
490 match terminal_result {
492 Some(Ok(_)) => {
493 let final_text = terminal_messages
494 .as_ref()
495 .and_then(|msgs| {
496 msgs.iter()
497 .rev()
498 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
499 .map(|m| m.text())
500 })
501 .unwrap_or_default();
502 if !final_text.is_empty() {
503 yield UiEvent::AgentMessageComplete(final_text);
504 }
505 while let Ok(chunk) = progress_guard.try_recv() {
507 yield UiEvent::ToolCallProgress {
508 id: progress_event_id(&tracker),
509 chunk: ProgressChunk::from(chunk),
510 };
511 }
512 yield UiEvent::AgentTurnComplete;
513 }
514 Some(Err(err)) => {
515 yield UiEvent::Error(format!("{err}"));
516 }
517 None => { }
518 }
519 }
520 }
521
522 pub fn send_user_message(&self, text: String) -> impl Stream<Item = UiEvent> + Send + 'static {
523 self.run_turn(text, None)
524 }
525
526 pub fn fork_from(
531 &self,
532 from: motosan_agent_loop::EntryId,
533 message: String,
534 ) -> impl Stream<Item = UiEvent> + Send + 'static {
535 self.run_turn(message, Some(from))
536 }
537
538 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
542 let entries = self
543 .session
544 .load_full()
545 .entries()
546 .await
547 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
548 let branch = motosan_agent_loop::active_branch(&entries);
549 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
550 .iter()
551 .filter_map(|stored| {
552 let msg = stored.entry.as_message()?;
553 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
554 return None;
555 }
556 let preview: String = msg
557 .text()
558 .lines()
559 .next()
560 .unwrap_or("")
561 .chars()
562 .take(80)
563 .collect();
564 Some((stored.id.clone(), preview))
565 })
566 .collect();
567 out.reverse();
568 Ok(out)
569 }
570
571 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
573 self.session
574 .load_full()
575 .branches()
576 .await
577 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
578 }
579}
580
581#[derive(Debug, Default)]
582struct ToolCallTracker {
583 next_id: usize,
584 pending: VecDeque<(String, String)>,
585}
586
587impl ToolCallTracker {
588 fn start(&mut self, name: &str) -> String {
589 self.next_id += 1;
590 let id = format!("tool_{}", self.next_id);
591 self.pending.push_back((name.to_string(), id.clone()));
592 id
593 }
594
595 fn complete(&mut self, name: &str) -> String {
596 if let Some(pos) = self
597 .pending
598 .iter()
599 .position(|(pending_name, _)| pending_name == name)
600 {
601 if let Some((_, id)) = self.pending.remove(pos) {
602 return id;
603 }
604 }
605
606 self.next_id += 1;
607 format!("tool_{}", self.next_id)
608 }
609
610 fn progress_id(&self) -> Option<String> {
615 match self.pending.len() {
616 1 => self.pending.front().map(|(_, id)| id.clone()),
617 _ => None,
618 }
619 }
620}
621
622fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
623 match tracker.lock() {
624 Ok(guard) => guard,
625 Err(poisoned) => poisoned.into_inner(),
626 }
627}
628
629fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
630 lock_tool_tracker(tracker)
631 .progress_id()
632 .unwrap_or_else(|| "tool_unknown".to_string())
633}
634
635fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
636where
637 F: Fn(&str) -> Option<String>,
638{
639 env_lookup("ANTHROPIC_API_KEY")
640 .map(|key| key.trim().to_string())
641 .filter(|key| !key.is_empty())
642 .or_else(|| auth.api_key("anthropic").map(str::to_string))
643}
644
645fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
646 match ev {
647 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
648 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
649 let id = lock_tool_tracker(tool_tracker).start(&name);
650 Some(UiEvent::ToolCallStarted {
651 id,
652 name,
653 args: serde_json::json!({}),
654 })
655 }
656 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
657 let id = lock_tool_tracker(tool_tracker).complete(&name);
658 Some(UiEvent::ToolCallCompleted {
659 id,
660 result: UiToolResult {
661 is_error: result.is_error,
662 text: format!("{name}: {result:?}"),
663 },
664 })
665 }
666 _ => None,
667 }
668}
669
670type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
671
672pub struct AppBuilder {
673 config: Option<Config>,
674 cwd: Option<PathBuf>,
675 permission_gate: Option<Arc<dyn PermissionGate>>,
676 install_builtin_tools: bool,
677 max_iterations: usize,
678 llm_override: Option<Arc<dyn LlmClient>>,
679 custom_tools_factory: Option<CustomToolsFactory>,
680 permissions_policy_path: Option<PathBuf>,
681 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
682 settings: Option<crate::settings::Settings>,
683 auth: Option<crate::auth::Auth>,
684 context_discovery_disabled: bool,
685 session_store: Option<Arc<dyn SessionStore>>,
687 resume_session_id: Option<crate::session::SessionId>,
688 autocompact_enabled: bool,
689 skills: Vec<crate::skills::Skill>,
691 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
693 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
694}
695
696impl Default for AppBuilder {
697 fn default() -> Self {
698 Self {
699 config: None,
700 cwd: None,
701 permission_gate: None,
702 install_builtin_tools: false,
703 max_iterations: 20,
704 llm_override: None,
705 custom_tools_factory: None,
706 permissions_policy_path: None,
707 ui_tx: None,
708 settings: None,
709 auth: None,
710 context_discovery_disabled: false,
711 session_store: None,
712 resume_session_id: None,
713 autocompact_enabled: false,
714 skills: Vec::new(),
715 extra_tools: Vec::new(),
716 mcp_servers: Vec::new(),
717 }
718 }
719}
720
721impl AppBuilder {
722 pub fn new() -> Self {
723 Self::default()
724 }
725
726 pub fn with_config(mut self, cfg: Config) -> Self {
727 self.config = Some(cfg);
728 self
729 }
730
731 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
732 self.cwd = Some(cwd.into());
733 self
734 }
735
736 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
737 self.permission_gate = Some(gate);
738 self
739 }
740
741 pub fn with_builtin_tools(mut self) -> Self {
747 self.install_builtin_tools = true;
748 self
749 }
750
751 pub fn with_max_iterations(mut self, n: usize) -> Self {
752 self.max_iterations = n;
753 self
754 }
755
756 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
757 self.llm_override = Some(llm);
758 self
759 }
760
761 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
762 self.permissions_policy_path = Some(path);
763 self
764 }
765
766 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
767 self.ui_tx = Some(tx);
768 self
769 }
770
771 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
773 self.settings = Some(settings);
774 self
775 }
776
777 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
779 self.auth = Some(auth);
780 self
781 }
782
783 pub fn disable_context_discovery(mut self) -> Self {
786 self.context_discovery_disabled = true;
787 self
788 }
789
790 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
793 self.session_store = Some(store);
794 self
795 }
796
797 pub fn with_autocompact(mut self) -> Self {
802 self.autocompact_enabled = true;
803 self
804 }
805
806 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
812 self.skills = skills;
813 self
814 }
815
816 pub fn without_skills(mut self) -> Self {
817 self.skills.clear();
818 self
819 }
820
821 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
825 self.extra_tools = tools;
826 self
827 }
828
829 pub fn with_mcp_servers(
832 mut self,
833 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
834 ) -> Self {
835 self.mcp_servers = servers;
836 self
837 }
838
839 pub fn with_custom_tools_factory(
844 mut self,
845 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
846 ) -> Self {
847 self.custom_tools_factory = Some(Box::new(factory));
848 self
849 }
850
851 pub async fn build_with_custom_tools(
855 self,
856 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
857 ) -> Result<App> {
858 self.with_custom_tools_factory(factory).build().await
859 }
860
861 pub async fn build_with_session(
869 mut self,
870 resume: Option<crate::session::SessionId>,
871 ) -> Result<App> {
872 if let Some(id) = resume {
873 if self.session_store.is_none() {
874 return Err(AppError::Config(
875 "build_with_session(Some(id)) requires with_session_store(...)".into(),
876 ));
877 }
878 self.resume_session_id = Some(id);
879 }
880 self.build_internal().await
881 }
882
883 pub async fn build(self) -> Result<App> {
885 self.build_with_session(None).await
886 }
887
888 async fn build_internal(mut self) -> Result<App> {
889 let mcp_servers = std::mem::take(&mut self.mcp_servers);
890 let extra_tools = std::mem::take(&mut self.extra_tools);
891 let skills = self.skills.clone();
892 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
893 return Err(AppError::Config(
894 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
895 ));
896 }
897
898 let has_config = self.config.is_some();
902 let has_auth = self.auth.is_some();
903 let mut config = self.config.unwrap_or_default();
904 let settings = match self.settings {
905 Some(settings) => settings,
906 None => {
907 let mut settings = crate::settings::Settings::default();
908 settings.model.provider = config.model.provider.clone();
909 settings.model.name = config.model.name.clone();
910 settings.model.max_tokens = config.model.max_tokens;
911 settings
912 }
913 };
914 config.model.provider = settings.model.provider.clone();
915 config.model.name = settings.model.name.clone();
916 config.model.max_tokens = settings.model.max_tokens;
917 let mut auth = self.auth.unwrap_or_default();
918 if !has_auth {
919 if let Some(key) = config.anthropic.api_key.as_deref() {
920 auth.0.insert(
921 "anthropic".into(),
922 crate::auth::ProviderAuth::ApiKey {
923 key: key.to_string(),
924 },
925 );
926 }
927 }
928 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
929 if env_or_auth_key.is_some() || has_auth || !has_config {
930 config.anthropic.api_key = env_or_auth_key;
931 }
932 let cwd = self
933 .cwd
934 .or_else(|| std::env::current_dir().ok())
935 .unwrap_or_else(|| PathBuf::from("."));
936 let permission_gate = self.permission_gate.unwrap_or_else(|| {
937 if self.ui_tx.is_some() {
941 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
942 } else {
943 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
944 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
945 }
946 });
947
948 let policy: Arc<crate::permissions::Policy> =
950 Arc::new(match self.permissions_policy_path.as_ref() {
951 Some(path) => crate::permissions::Policy::load_or_default(path)?,
952 None => crate::permissions::Policy::default(),
953 });
954 let session_cache = Arc::new(crate::permissions::SessionCache::new());
955
956 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
958
959 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
964 let cancel_token = probe_ctx.cancel_token.clone();
965 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
966 if let Some(factory_fn) = self.custom_tools_factory.take() {
967 let mut t = factory_fn(probe_ctx);
968 t.extend(extra_tools.clone());
969 (false, t)
970 } else {
971 (self.install_builtin_tools, extra_tools.clone())
972 };
973
974 let factory = SessionFactory {
975 cwd: cwd.clone(),
976 settings: settings.clone(),
977 auth: auth.clone(),
978 policy: Arc::clone(&policy),
979 session_cache: Arc::clone(&session_cache),
980 ui_tx: self.ui_tx.clone(),
981 permission_gate: Arc::clone(&permission_gate),
982 progress_tx: progress_tx.clone(),
983 skills: Arc::new(skills.clone()),
984 install_builtin_tools: install_builtin,
985 extra_tools: factory_extra_tools,
986 max_iterations: self.max_iterations,
987 context_discovery_disabled: self.context_discovery_disabled,
988 autocompact_enabled: self.autocompact_enabled,
989 session_store: self.session_store.clone(),
990 llm_override: self.llm_override.clone(),
991 current_model: Arc::new(Mutex::new(None)),
992 cancel_token: cancel_token.clone(),
993 };
994
995 let mode = match self.resume_session_id.take() {
996 Some(id) => SessionMode::Resume(id.into_string()),
997 None => SessionMode::New,
998 };
999 let (session, llm) = factory.build(mode, None).await?;
1000
1001 Ok(App {
1002 session: arc_swap::ArcSwap::from_pointee(session),
1003 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1004 factory,
1005 config,
1006 cancel_token,
1007 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1008 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1009 skills: Arc::new(skills),
1010 mcp_servers,
1011 session_cache,
1012 })
1013 }
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018 use super::*;
1019 use crate::config::{AnthropicConfig, ModelConfig};
1020 use crate::events::UiEvent;
1021 use async_trait::async_trait;
1022 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1023 use motosan_agent_tool::ToolDef;
1024 use std::sync::atomic::{AtomicUsize, Ordering};
1025
1026 #[tokio::test]
1027 async fn builder_fails_without_api_key() {
1028 let cfg = Config {
1029 anthropic: AnthropicConfig {
1030 api_key: None,
1031 base_url: "https://api.anthropic.com".into(),
1032 },
1033 model: ModelConfig {
1034 provider: "anthropic".into(),
1035 name: "claude-sonnet-4-6".into(),
1036 max_tokens: 4096,
1037 },
1038 };
1039 let err = match AppBuilder::new()
1040 .with_config(cfg)
1041 .with_builtin_tools()
1042 .build()
1043 .await
1044 {
1045 Ok(_) => panic!("must fail without key"),
1046 Err(err) => err,
1047 };
1048 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1049 }
1050
1051 struct ToolOnlyLlm {
1052 turn: AtomicUsize,
1053 }
1054
1055 #[async_trait]
1056 impl LlmClient for ToolOnlyLlm {
1057 async fn chat(
1058 &self,
1059 _messages: &[Message],
1060 _tools: &[ToolDef],
1061 ) -> motosan_agent_loop::Result<ChatOutput> {
1062 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1063 if turn == 0 {
1064 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1065 ToolCallItem {
1066 id: "t1".into(),
1067 name: "read".into(),
1068 args: serde_json::json!({"path":"nope.txt"}),
1069 },
1070 ])))
1071 } else {
1072 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1073 }
1074 }
1075 }
1076
1077 #[tokio::test]
1078 async fn empty_final_message_is_not_emitted() {
1079 let dir = tempfile::tempdir().unwrap();
1080 let mut cfg = Config::default();
1081 cfg.anthropic.api_key = Some("sk-unused".into());
1082 let app = AppBuilder::new()
1083 .with_config(cfg)
1084 .with_cwd(dir.path())
1085 .with_builtin_tools()
1086 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1087 turn: AtomicUsize::new(0),
1088 }))
1089 .build()
1090 .await
1091 .expect("build");
1092 let events: Vec<UiEvent> =
1093 futures::StreamExt::collect(app.send_user_message("x".into())).await;
1094 let empties = events
1095 .iter()
1096 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1097 .count();
1098 assert_eq!(
1099 empties, 0,
1100 "should not emit empty final message, got: {events:?}"
1101 );
1102 }
1103
1104 struct EchoLlm;
1105
1106 #[async_trait]
1107 impl LlmClient for EchoLlm {
1108 async fn chat(
1109 &self,
1110 _messages: &[Message],
1111 _tools: &[ToolDef],
1112 ) -> motosan_agent_loop::Result<ChatOutput> {
1113 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1114 }
1115 }
1116
1117 #[tokio::test]
1118 async fn new_session_swaps_in_a_fresh_empty_session() {
1119 use futures::StreamExt;
1120 let dir = tempfile::tempdir().expect("tempdir");
1121 let mut config = Config::default();
1122 config.anthropic.api_key = Some("sk-unused".into());
1123 let app = AppBuilder::new()
1124 .with_config(config)
1125 .with_cwd(dir.path())
1126 .with_builtin_tools()
1127 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1128 .build()
1129 .await
1130 .expect("build");
1131
1132 let _: Vec<_> = app.send_user_message("hello".into()).collect().await;
1133 let id_before = app.session_id();
1134 assert!(!app.session_history().await.expect("history").is_empty());
1135
1136 app.new_session().await.expect("new_session");
1137
1138 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1139 assert!(
1140 app.session_history().await.expect("history").is_empty(),
1141 "fresh session has no history"
1142 );
1143 }
1144
1145 #[tokio::test]
1146 async fn load_session_restores_a_stored_session_by_id() {
1147 use futures::StreamExt;
1148 let dir = tempfile::tempdir().expect("tempdir");
1149 let store_dir = dir.path().join("sessions");
1150 let store: Arc<dyn SessionStore> =
1151 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1152 let mut config = Config::default();
1153 config.anthropic.api_key = Some("sk-unused".into());
1154 let app = AppBuilder::new()
1155 .with_config(config)
1156 .with_cwd(dir.path())
1157 .with_builtin_tools()
1158 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1159 .with_session_store(Arc::clone(&store))
1160 .build()
1161 .await
1162 .expect("build");
1163
1164 let _: Vec<_> = app
1165 .send_user_message("remember this".into())
1166 .collect()
1167 .await;
1168 let original_id = app.session_id();
1169
1170 app.new_session().await.expect("new_session");
1171 assert_ne!(app.session_id(), original_id);
1172
1173 app.load_session(&original_id).await.expect("load_session");
1174 assert_eq!(app.session_id(), original_id);
1175 let history = app.session_history().await.expect("history");
1176 assert!(
1177 history.iter().any(|m| m.text().contains("remember this")),
1178 "loaded session should carry the original turn"
1179 );
1180 }
1181
1182 #[tokio::test]
1183 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1184 use futures::StreamExt;
1185 let dir = tempfile::tempdir().expect("tempdir");
1186 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1187 dir.path().join("s"),
1188 ));
1189 let mut config = Config::default();
1190 config.anthropic.api_key = Some("sk-unused".into());
1191 let app = AppBuilder::new()
1192 .with_config(config)
1193 .with_cwd(dir.path())
1194 .with_builtin_tools()
1195 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1196 .with_session_store(store)
1197 .build()
1198 .await
1199 .expect("build");
1200
1201 let _: Vec<_> = app.send_user_message("hello".into()).collect().await;
1202 let original_id = app.session_id();
1203
1204 let new_id = app.clone_session().await.expect("clone_session");
1205
1206 assert_ne!(new_id, original_id);
1208 assert_eq!(app.session_id(), new_id);
1209 let history = app.session_history().await.expect("history");
1211 assert!(history.iter().any(|m| m.text().contains("hello")));
1212 }
1213
1214 #[tokio::test]
1215 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1216 use futures::StreamExt;
1217 let dir = tempfile::tempdir().expect("tempdir");
1218 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1219 dir.path().join("s"),
1220 ));
1221 let mut config = Config::default();
1222 config.anthropic.api_key = Some("sk-unused".into());
1223 let app = AppBuilder::new()
1224 .with_config(config)
1225 .with_cwd(dir.path())
1226 .with_builtin_tools()
1227 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1228 .with_session_store(store)
1229 .build()
1230 .await
1231 .expect("build");
1232
1233 let _: Vec<_> = app.send_user_message("first".into()).collect().await;
1235 let _: Vec<_> = app.send_user_message("second".into()).collect().await;
1236
1237 let entries = app.session.load_full().entries().await.expect("entries");
1239 let first_id = entries
1240 .iter()
1241 .find_map(|stored| {
1242 let msg = stored.entry.as_message()?;
1243 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1244 .then(|| stored.id.clone())
1245 })
1246 .expect("first user message present");
1247
1248 let _: Vec<_> = app.fork_from(first_id, "branched".into()).collect().await;
1250
1251 let history = app.session_history().await.expect("history");
1252 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1253 assert!(
1254 texts.iter().any(|t| t.contains("first")),
1255 "fork keeps the fork-point ancestor"
1256 );
1257 assert!(
1258 texts.iter().any(|t| t.contains("branched")),
1259 "fork includes the new message"
1260 );
1261 assert!(
1262 !texts.iter().any(|t| t.contains("second")),
1263 "fork excludes the abandoned branch"
1264 );
1265 }
1266
1267 #[tokio::test]
1268 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1269 use futures::StreamExt;
1270 let dir = tempfile::tempdir().expect("tempdir");
1271 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1272 dir.path().join("s"),
1273 ));
1274 let mut config = Config::default();
1275 config.anthropic.api_key = Some("sk-unused".into());
1276 let app = AppBuilder::new()
1277 .with_config(config)
1278 .with_cwd(dir.path())
1279 .with_builtin_tools()
1280 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1281 .with_session_store(store)
1282 .build()
1283 .await
1284 .expect("build");
1285
1286 let _: Vec<_> = app.send_user_message("alpha".into()).collect().await;
1287 let _: Vec<_> = app.send_user_message("bravo".into()).collect().await;
1288
1289 let candidates = app.fork_candidates().await.expect("candidates");
1290 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1291 assert!(previews[0].contains("bravo"), "got {previews:?}");
1293 assert!(previews.iter().any(|p| p.contains("alpha")));
1294 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1296 }
1297
1298 #[tokio::test]
1299 async fn branches_returns_a_tree_for_a_linear_session() {
1300 use futures::StreamExt;
1301 let dir = tempfile::tempdir().expect("tempdir");
1302 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1303 dir.path().join("s"),
1304 ));
1305 let mut config = Config::default();
1306 config.anthropic.api_key = Some("sk-unused".into());
1307 let app = AppBuilder::new()
1308 .with_config(config)
1309 .with_cwd(dir.path())
1310 .with_builtin_tools()
1311 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1312 .with_session_store(store)
1313 .build()
1314 .await
1315 .expect("build");
1316
1317 let _: Vec<_> = app.send_user_message("hello".into()).collect().await;
1318 let tree = app.branches().await.expect("branches");
1319 assert!(!tree.nodes.is_empty());
1321 assert!(tree.active_leaf.is_some());
1322 }
1323
1324 #[tokio::test]
1325 async fn switch_model_preserves_history() {
1326 use futures::StreamExt;
1327 let dir = tempfile::tempdir().expect("tempdir");
1328 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1329 dir.path().join("s"),
1330 ));
1331 let mut config = Config::default();
1332 config.anthropic.api_key = Some("sk-unused".into());
1333 let app = AppBuilder::new()
1334 .with_config(config)
1335 .with_cwd(dir.path())
1336 .with_builtin_tools()
1337 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1338 .with_session_store(store)
1339 .build()
1340 .await
1341 .expect("build");
1342
1343 let _: Vec<_> = app.send_user_message("keep me".into()).collect().await;
1344 let id_before = app.session_id();
1345
1346 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
1347 .await
1348 .expect("switch_model");
1349
1350 assert_eq!(
1351 app.session_id(),
1352 id_before,
1353 "switch_model keeps the same session"
1354 );
1355 let history = app.session_history().await.expect("history");
1356 assert!(history.iter().any(|m| m.text().contains("keep me")));
1357 }
1358
1359 #[tokio::test]
1360 async fn switch_model_is_sticky_for_future_session_rebuilds() {
1361 let dir = tempfile::tempdir().expect("tempdir");
1362 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1363 dir.path().join("s"),
1364 ));
1365 let mut config = Config::default();
1366 config.anthropic.api_key = Some("sk-unused".into());
1367 let app = AppBuilder::new()
1368 .with_config(config)
1369 .with_cwd(dir.path())
1370 .with_builtin_tools()
1371 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1372 .with_session_store(store)
1373 .build()
1374 .await
1375 .expect("build");
1376
1377 let selected = crate::model::ModelId::from("claude-opus-4-7");
1378 app.switch_model(&selected).await.expect("switch_model");
1379 app.new_session().await.expect("new_session");
1380
1381 assert_eq!(app.factory.current_model(), Some(selected));
1382 }
1383
1384 struct SleepThenDoneLlm {
1385 turn: AtomicUsize,
1386 }
1387
1388 #[async_trait]
1389 impl LlmClient for SleepThenDoneLlm {
1390 async fn chat(
1391 &self,
1392 _messages: &[Message],
1393 _tools: &[ToolDef],
1394 ) -> motosan_agent_loop::Result<ChatOutput> {
1395 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1396 if turn == 0 {
1397 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1398 ToolCallItem {
1399 id: "sleep".into(),
1400 name: "bash".into(),
1401 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
1402 },
1403 ])))
1404 } else {
1405 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1406 }
1407 }
1408 }
1409
1410 #[tokio::test]
1411 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
1412 use futures::StreamExt;
1413 let dir = tempfile::tempdir().expect("tempdir");
1414 let mut config = Config::default();
1415 config.anthropic.api_key = Some("sk-unused".into());
1416 let app = Arc::new(
1417 AppBuilder::new()
1418 .with_config(config)
1419 .with_cwd(dir.path())
1420 .with_builtin_tools()
1421 .with_llm(Arc::new(SleepThenDoneLlm {
1422 turn: AtomicUsize::new(0),
1423 }) as Arc<dyn LlmClient>)
1424 .build()
1425 .await
1426 .expect("build"),
1427 );
1428
1429 app.new_session().await.expect("new_session");
1430 let running_app = Arc::clone(&app);
1431 let handle = tokio::spawn(async move {
1432 running_app
1433 .send_user_message("run a slow command".into())
1434 .collect::<Vec<_>>()
1435 .await
1436 });
1437 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1438 app.cancel();
1439
1440 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
1441 .await
1442 .expect("turn should finish after cancellation")
1443 .expect("join");
1444 assert!(
1445 events.iter().any(|event| {
1446 matches!(
1447 event,
1448 UiEvent::ToolCallCompleted { result, .. }
1449 if result.text.contains("command cancelled by user")
1450 )
1451 }),
1452 "cancel should reach the rebuilt bash tool: {events:?}"
1453 );
1454 }
1455
1456 #[tokio::test]
1457 async fn compact_summarizes_a_session_with_enough_history() {
1458 struct DoneLlm;
1459 #[async_trait]
1460 impl LlmClient for DoneLlm {
1461 async fn chat(
1462 &self,
1463 _messages: &[Message],
1464 _tools: &[ToolDef],
1465 ) -> motosan_agent_loop::Result<ChatOutput> {
1466 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1467 }
1468 }
1469
1470 let dir = tempfile::tempdir().expect("tempdir");
1471 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
1472 dir.path().join("sessions"),
1473 ));
1474 let mut config = Config::default();
1475 config.anthropic.api_key = Some("sk-unused".into());
1476 let app = AppBuilder::new()
1477 .with_config(config)
1478 .with_cwd(dir.path())
1479 .with_builtin_tools()
1480 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
1481 .with_session_store(store)
1482 .build()
1483 .await
1484 .expect("build");
1485
1486 for i in 0..4 {
1493 let _: Vec<_> = app.send_user_message(format!("turn {i}")).collect().await;
1494 }
1495
1496 app.compact().await.expect("compact should succeed");
1497
1498 let history = app.session_history().await.expect("history");
1501 assert!(
1502 !history.is_empty(),
1503 "session should still have content post-compaction"
1504 );
1505 }
1506
1507 #[test]
1508 fn anthropic_env_api_key_overrides_auth_json_key() {
1509 let mut auth = crate::auth::Auth::default();
1510 auth.0.insert(
1511 "anthropic".into(),
1512 crate::auth::ProviderAuth::ApiKey {
1513 key: "sk-auth".into(),
1514 },
1515 );
1516
1517 let key = anthropic_api_key_from(&auth, |name| {
1518 (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
1519 });
1520 assert_eq!(key.as_deref(), Some("sk-env"));
1521 }
1522
1523 #[tokio::test]
1524 async fn with_settings_overrides_deprecated_config_model() {
1525 use crate::settings::Settings;
1526
1527 let mut config = Config::default();
1528 config.model.name = "from-config".into();
1529 config.anthropic.api_key = Some("sk-config".into());
1530
1531 let mut settings = Settings::default();
1532 settings.model.name = "from-settings".into();
1533
1534 let tmp = tempfile::tempdir().unwrap();
1535 let app = AppBuilder::new()
1536 .with_config(config)
1537 .with_settings(settings)
1538 .with_cwd(tmp.path())
1539 .disable_context_discovery()
1540 .with_llm(Arc::new(EchoLlm))
1541 .build()
1542 .await
1543 .expect("build");
1544 assert_eq!(app.config().model.name, "from-settings");
1545 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
1546 }
1547
1548 #[tokio::test]
1549 async fn with_settings_synthesises_legacy_config_for_build() {
1550 use crate::auth::{Auth, ProviderAuth};
1551 use crate::settings::Settings;
1552
1553 let mut settings = Settings::default();
1554 settings.model.name = "claude-sonnet-4-6".into();
1555
1556 let mut auth = Auth::default();
1557 auth.0.insert(
1558 "anthropic".into(),
1559 ProviderAuth::ApiKey {
1560 key: "sk-test".into(),
1561 },
1562 );
1563
1564 let tmp = tempfile::tempdir().unwrap();
1565 let app = AppBuilder::new()
1566 .with_settings(settings)
1567 .with_auth(auth)
1568 .with_cwd(tmp.path())
1569 .with_builtin_tools()
1570 .disable_context_discovery()
1571 .with_llm(Arc::new(EchoLlm))
1572 .build()
1573 .await
1574 .expect("build");
1575 let _ = app;
1576 }
1577
1578 #[tokio::test]
1579 async fn cancel_before_turn_does_not_poison_future_turns() {
1580 let dir = tempfile::tempdir().unwrap();
1581 let mut cfg = Config::default();
1582 cfg.anthropic.api_key = Some("sk-unused".into());
1583 let app = AppBuilder::new()
1584 .with_config(cfg)
1585 .with_cwd(dir.path())
1586 .with_builtin_tools()
1587 .with_llm(std::sync::Arc::new(EchoLlm))
1588 .build()
1589 .await
1590 .expect("build");
1591
1592 app.cancel();
1593 let events: Vec<UiEvent> = app.send_user_message("x".into()).collect().await;
1594
1595 assert!(
1596 events
1597 .iter()
1598 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
1599 "turn should use a fresh cancellation token: {events:?}"
1600 );
1601 }
1602
1603 #[test]
1604 fn map_event_matches_started_and_completed_ids_by_tool_name() {
1605 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1606
1607 let started_bash = map_event(
1608 AgentEvent::Core(CoreEvent::ToolStarted {
1609 name: "bash".into(),
1610 }),
1611 &tracker,
1612 );
1613 let started_read = map_event(
1614 AgentEvent::Core(CoreEvent::ToolStarted {
1615 name: "read".into(),
1616 }),
1617 &tracker,
1618 );
1619 let completed_bash = map_event(
1620 AgentEvent::Core(CoreEvent::ToolCompleted {
1621 name: "bash".into(),
1622 result: motosan_agent_tool::ToolResult::text("ok"),
1623 }),
1624 &tracker,
1625 );
1626 let completed_read = map_event(
1627 AgentEvent::Core(CoreEvent::ToolCompleted {
1628 name: "read".into(),
1629 result: motosan_agent_tool::ToolResult::text("ok"),
1630 }),
1631 &tracker,
1632 );
1633
1634 assert!(matches!(
1635 started_bash,
1636 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
1637 ));
1638 assert!(matches!(
1639 started_read,
1640 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
1641 ));
1642 assert!(matches!(
1643 completed_bash,
1644 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
1645 ));
1646 assert!(matches!(
1647 completed_read,
1648 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
1649 ));
1650 }
1651
1652 #[test]
1653 fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
1654 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1655 let s1 = map_event(
1656 AgentEvent::Core(CoreEvent::ToolStarted {
1657 name: "bash".into(),
1658 }),
1659 &tracker,
1660 );
1661 let s2 = map_event(
1662 AgentEvent::Core(CoreEvent::ToolStarted {
1663 name: "bash".into(),
1664 }),
1665 &tracker,
1666 );
1667 let c1 = map_event(
1668 AgentEvent::Core(CoreEvent::ToolCompleted {
1669 name: "bash".into(),
1670 result: motosan_agent_tool::ToolResult::text("a"),
1671 }),
1672 &tracker,
1673 );
1674 let c2 = map_event(
1675 AgentEvent::Core(CoreEvent::ToolCompleted {
1676 name: "bash".into(),
1677 result: motosan_agent_tool::ToolResult::text("b"),
1678 }),
1679 &tracker,
1680 );
1681
1682 let id_s1 = match s1 {
1683 Some(UiEvent::ToolCallStarted { id, .. }) => id,
1684 other => panic!("{other:?}"),
1685 };
1686 let id_s2 = match s2 {
1687 Some(UiEvent::ToolCallStarted { id, .. }) => id,
1688 other => panic!("{other:?}"),
1689 };
1690 let id_c1 = match c1 {
1691 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
1692 other => panic!("{other:?}"),
1693 };
1694 let id_c2 = match c2 {
1695 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
1696 other => panic!("{other:?}"),
1697 };
1698
1699 assert_eq!(id_s1, id_c1);
1700 assert_eq!(id_s2, id_c2);
1701 assert_ne!(id_s1, id_s2);
1702 }
1703
1704 #[tokio::test]
1705 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
1706 let dir = tempfile::tempdir().unwrap();
1707 let mut cfg = Config::default();
1708 cfg.anthropic.api_key = Some("sk-unused".into());
1709 let app = AppBuilder::new()
1710 .with_config(cfg)
1711 .with_cwd(dir.path())
1712 .with_builtin_tools()
1713 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1714 turn: AtomicUsize::new(0),
1715 }))
1716 .build()
1717 .await
1718 .expect("build");
1719
1720 let mut first = Box::pin(app.send_user_message("first".into()));
1721 let first_event = first.next().await;
1722 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
1723
1724 let second_events: Vec<UiEvent> = app.send_user_message("second".into()).collect().await;
1725 assert_eq!(
1726 second_events.len(),
1727 1,
1728 "expected immediate single error event, got: {second_events:?}"
1729 );
1730 assert!(matches!(
1731 &second_events[0],
1732 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
1733 ));
1734 }
1735
1736 #[test]
1737 fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
1738 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1739 assert_eq!(progress_event_id(&tracker), "tool_unknown");
1740
1741 let only = map_event(
1742 AgentEvent::Core(CoreEvent::ToolStarted {
1743 name: "bash".into(),
1744 }),
1745 &tracker,
1746 );
1747 let only_id = match only {
1748 Some(UiEvent::ToolCallStarted { id, .. }) => id,
1749 other => panic!("{other:?}"),
1750 };
1751 assert_eq!(progress_event_id(&tracker), only_id);
1752
1753 let _second = map_event(
1754 AgentEvent::Core(CoreEvent::ToolStarted {
1755 name: "read".into(),
1756 }),
1757 &tracker,
1758 );
1759 assert_eq!(progress_event_id(&tracker), "tool_unknown");
1760 }
1761
1762 #[tokio::test]
1763 async fn builder_rejects_builtin_and_custom_tools_together() {
1764 let mut cfg = Config::default();
1765 cfg.anthropic.api_key = Some("sk-unused".into());
1766 let dir = tempfile::tempdir().unwrap();
1767 let err = match AppBuilder::new()
1768 .with_config(cfg)
1769 .with_cwd(dir.path())
1770 .with_builtin_tools()
1771 .with_custom_tools_factory(|_| Vec::new())
1772 .build()
1773 .await
1774 {
1775 Ok(_) => panic!("must reject conflicting tool configuration"),
1776 Err(err) => err,
1777 };
1778
1779 assert!(format!("{err}").contains("mutually exclusive"));
1780 }
1781
1782 #[tokio::test]
1784 async fn two_turns_in_same_session_share_history() {
1785 #[derive(Default)]
1786 struct CounterLlm {
1787 turn: AtomicUsize,
1788 }
1789 #[async_trait]
1790 impl LlmClient for CounterLlm {
1791 async fn chat(
1792 &self,
1793 messages: &[Message],
1794 _tools: &[ToolDef],
1795 ) -> motosan_agent_loop::Result<ChatOutput> {
1796 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1797 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
1798 Ok(ChatOutput::new(LlmResponse::Message(answer)))
1799 }
1800 }
1801
1802 let tmp = tempfile::tempdir().unwrap();
1803 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
1804 tmp.path().to_path_buf(),
1805 ));
1806
1807 let app = AppBuilder::new()
1808 .with_settings(crate::settings::Settings::default())
1809 .with_auth(crate::auth::Auth::default())
1810 .with_cwd(tmp.path())
1811 .with_builtin_tools()
1812 .disable_context_discovery()
1813 .with_llm(std::sync::Arc::new(CounterLlm::default()))
1814 .with_session_store(store)
1815 .build_with_session(None)
1816 .await
1817 .expect("build");
1818
1819 let _events1: Vec<UiEvent> = app.send_user_message("hi".into()).collect().await;
1820 let events2: Vec<UiEvent> = app.send_user_message("again".into()).collect().await;
1821
1822 let saw_more_than_one = events2.iter().any(|e| {
1824 matches!(
1825 e,
1826 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
1827 )
1828 });
1829 assert!(
1830 saw_more_than_one,
1831 "second turn should have seen history; events: {events2:?}"
1832 );
1833 }
1834}
1835
1836#[cfg(test)]
1837mod skills_builder_tests {
1838 use super::*;
1839 use crate::skills::types::{Skill, SkillSource};
1840 use std::path::PathBuf;
1841
1842 fn fixture() -> Skill {
1843 Skill {
1844 name: "x".into(),
1845 description: "d".into(),
1846 file_path: PathBuf::from("/x.md"),
1847 base_dir: PathBuf::from("/"),
1848 disable_model_invocation: false,
1849 source: SkillSource::Global,
1850 }
1851 }
1852
1853 #[test]
1854 fn with_skills_stores_skills() {
1855 let b = AppBuilder::new().with_skills(vec![fixture()]);
1856 assert_eq!(b.skills.len(), 1);
1857 assert_eq!(b.skills[0].name, "x");
1858 }
1859
1860 #[test]
1861 fn without_skills_clears() {
1862 let b = AppBuilder::new()
1863 .with_skills(vec![fixture()])
1864 .without_skills();
1865 assert!(b.skills.is_empty());
1866 }
1867}
1868
1869#[cfg(test)]
1870mod mcp_builder_tests {
1871 use super::*;
1872 use motosan_agent_tool::Tool;
1873
1874 struct FakeTool;
1876 impl Tool for FakeTool {
1877 fn def(&self) -> motosan_agent_tool::ToolDef {
1878 motosan_agent_tool::ToolDef {
1879 name: "fake__echo".into(),
1880 description: "test".into(),
1881 input_schema: serde_json::json!({"type": "object"}),
1882 }
1883 }
1884 fn call(
1885 &self,
1886 _args: serde_json::Value,
1887 _ctx: &motosan_agent_tool::ToolContext,
1888 ) -> std::pin::Pin<
1889 Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
1890 > {
1891 Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
1892 }
1893 }
1894
1895 #[test]
1896 fn with_extra_tools_stores_tools() {
1897 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
1898 let b = AppBuilder::new().with_extra_tools(tools);
1899 assert_eq!(b.extra_tools.len(), 1);
1900 }
1901}