1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
/// How `SessionFactory::build` should obtain the session it returns.
#[derive(Debug, Clone)]
pub(crate) enum SessionMode {
    /// Start a fresh session. It is created against the session store when
    /// one is configured, otherwise without a store.
    New,
    /// Resume the stored session with this id. Building fails with a config
    /// error when no session store is configured.
    Resume(String),
}
31
32struct SharedLlm {
37 client: Arc<dyn LlmClient>,
38}
39
40impl SharedLlm {
41 fn new(client: Arc<dyn LlmClient>) -> Self {
42 Self { client }
43 }
44
45 fn client(&self) -> Arc<dyn LlmClient> {
46 Arc::clone(&self.client)
47 }
48}
49
/// Everything needed to (re)build an `AgentSession` plus its LLM client.
///
/// `App` keeps one factory for its whole lifetime and calls `build` whenever
/// the session is created, resumed, or rebuilt for a different model.
pub(crate) struct SessionFactory {
    /// Working directory given to the tool context, the permission extension
    /// and project-context discovery.
    cwd: PathBuf,
    /// Base settings; `build` clones them and may override the model name.
    settings: crate::settings::Settings,
    /// Credentials passed to `build_llm_client`.
    auth: crate::auth::Auth,
    /// Permission policy handed to the permission extension.
    policy: Arc<crate::permissions::Policy>,
    /// Permission cache handed to the permission extension.
    session_cache: Arc<crate::permissions::SessionCache>,
    /// UI channel; when present the interactive permission extension is installed.
    ui_tx: Option<mpsc::Sender<UiEvent>>,
    /// When there is no UI channel, install the headless permission extension.
    headless_permissions: bool,
    /// Gate handed to every `ToolCtx` created by `build`.
    permission_gate: Arc<dyn PermissionGate>,
    /// Sender side of the tool progress channel, cloned into every `ToolCtx`.
    progress_tx: mpsc::Sender<ToolProgressChunk>,
    /// Skills passed to `build_system_prompt`.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Whether `builtin_tools(...)` are registered on the engine.
    install_builtin_tools: bool,
    /// Additional tools registered after the builtin ones.
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// Iteration cap passed to the engine builder.
    max_iterations: usize,
    /// When true, project context files are not loaded into the system prompt.
    context_discovery_disabled: bool,
    /// Enables the autocompact extension (subject to the configured threshold).
    autocompact_enabled: bool,
    /// Optional persistence backend; required for `SessionMode::Resume`.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Injected LLM client, used only while no model has been selected.
    llm_override: Option<Arc<dyn LlmClient>>,
    /// Model most recently recorded via `set_current_model`, if any.
    current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
    /// Cancel token shared with every `ToolCtx` created by `build`.
    cancel_token: SharedCancelToken,
}
73
74impl SessionFactory {
75 fn current_model(&self) -> Option<crate::model::ModelId> {
76 match self.current_model.lock() {
77 Ok(guard) => guard.clone(),
78 Err(poisoned) => poisoned.into_inner().clone(),
79 }
80 }
81
82 fn set_current_model(&self, model: crate::model::ModelId) {
83 match self.current_model.lock() {
84 Ok(mut guard) => *guard = Some(model),
85 Err(poisoned) => *poisoned.into_inner() = Some(model),
86 }
87 }
88
89 async fn build(
94 &self,
95 mode: SessionMode,
96 model_override: Option<&crate::model::ModelId>,
97 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
98 let effective_model = model_override.cloned().or_else(|| self.current_model());
100 let mut settings = self.settings.clone();
101 if let Some(m) = &effective_model {
102 settings.model.name = m.as_str().to_string();
103 }
104
105 let llm = if effective_model.is_none() {
106 self.llm_override.as_ref().map_or_else(
107 || build_llm_client(&settings, &self.auth),
108 |llm| Ok(Arc::clone(llm)),
109 )?
110 } else {
111 build_llm_client(&settings, &self.auth)?
112 };
113
114 let tool_ctx = ToolCtx::new_with_cancel_token(
117 &self.cwd,
118 Arc::clone(&self.permission_gate),
119 self.progress_tx.clone(),
120 self.cancel_token.clone(),
121 );
122 let mut tools = if self.install_builtin_tools {
123 builtin_tools(tool_ctx.clone())
124 } else {
125 Vec::new()
126 };
127 tools.extend(self.extra_tools.iter().cloned());
128
129 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
130 let base_prompt = build_system_prompt(&tool_names, &self.skills);
131 let system_prompt = if self.context_discovery_disabled {
132 base_prompt
133 } else {
134 let agent_dir = crate::paths::agent_dir();
135 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
136 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
137 };
138 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
139
140 let mut engine_builder = Engine::builder()
141 .max_iterations(self.max_iterations)
142 .system_prompt(system_prompt)
143 .tool_context(motosan_tool_context);
144 for tool in tools {
145 engine_builder = engine_builder.tool(tool);
146 }
147 if let Some(ui_tx) = &self.ui_tx {
148 let ext = crate::permissions::PermissionExtension::new(
149 Arc::clone(&self.policy),
150 Arc::clone(&self.session_cache),
151 self.cwd.clone(),
152 ui_tx.clone(),
153 );
154 engine_builder = engine_builder.extension(Box::new(ext));
155 } else if self.headless_permissions {
156 let ext = crate::permissions::PermissionExtension::headless(
157 Arc::clone(&self.policy),
158 Arc::clone(&self.session_cache),
159 self.cwd.clone(),
160 );
161 engine_builder = engine_builder.extension(Box::new(ext));
162 }
163 if self.autocompact_enabled
164 && settings.session.compact_at_context_pct > 0.0
165 && settings.session.compact_at_context_pct < 1.0
166 {
167 let cfg = AutocompactConfig {
168 threshold: settings.session.compact_at_context_pct,
169 max_context_tokens: settings.session.max_context_tokens,
170 keep_turns: settings.session.keep_turns.max(1),
171 };
172 engine_builder = engine_builder
173 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
174 }
175 let engine = engine_builder.build();
176
177 let session = match (&mode, &self.session_store) {
178 (SessionMode::Resume(id), Some(store)) => {
179 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
180 .await
181 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
182 let entries = s
183 .entries()
184 .await
185 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
186 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
187 s
188 }
189 (SessionMode::Resume(_), None) => {
190 return Err(AppError::Config("resume requires a session store".into()));
191 }
192 (SessionMode::New, Some(store)) => {
193 let id = crate::session::SessionId::new();
194 AgentSession::new_with_store(
195 id.into_string(),
196 Arc::clone(store),
197 engine,
198 Arc::clone(&llm),
199 )
200 }
201 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
202 };
203
204 Ok((session, llm))
205 }
206}
207
/// The running application: the active agent session plus everything needed
/// to drive turns on it and to swap it for another session.
pub struct App {
    /// Active session; replaced atomically by `new_session`, `load_session`
    /// and `switch_model`.
    session: arc_swap::ArcSwap<AgentSession>,
    /// LLM client belonging to the active session; used by `compact`.
    llm: arc_swap::ArcSwap<SharedLlm>,
    /// Rebuilds sessions on demand.
    factory: SessionFactory,
    /// Resolved configuration, exposed through `config()`.
    config: Config,
    /// Shared cancel token: `cancel()` triggers it, each turn resets it.
    cancel_token: SharedCancelToken,
    /// Receiving end of the tool progress channel. A turn holds this lock for
    /// its whole duration, which also prevents concurrent turns.
    progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
    /// Assigns UI ids to tool calls and tracks the ones still pending.
    next_tool_id: Arc<Mutex<ToolCallTracker>>,
    /// Skills used to expand skill commands in user messages.
    skills: Arc<Vec<crate::skills::Skill>>,
    /// Named MCP servers, disconnected by `disconnect_mcp`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
    /// Permission cache shared with the session factory.
    pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
}
220
221impl App {
222 pub fn config(&self) -> &Config {
223 &self.config
224 }
225
226 pub fn cancel(&self) {
230 self.cancel_token.cancel();
231 }
232
233 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
237 Arc::clone(&self.session_cache)
238 }
239
240 pub fn session_id(&self) -> String {
243 self.session.load().session_id().to_string()
244 }
245
246 pub async fn session_history(
255 &self,
256 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
257 self.session.load_full().history().await
258 }
259
260 pub async fn compact(&self) -> Result<()> {
266 use motosan_agent_loop::ThresholdStrategy;
267 let strategy = ThresholdStrategy {
268 threshold: 0.0,
269 ..ThresholdStrategy::default()
270 };
271 let llm = self.llm.load_full().client();
272 self.session
273 .load_full()
274 .maybe_compact(&strategy, llm)
275 .await
276 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
277 Ok(())
278 }
279
280 pub async fn new_session(&self) -> Result<()> {
284 let (session, llm) = self.factory.build(SessionMode::New, None).await?;
285 self.session.store(Arc::new(session));
286 self.llm.store(Arc::new(SharedLlm::new(llm)));
287 Ok(())
288 }
289
290 pub async fn load_session(&self, id: &str) -> Result<()> {
296 let (session, llm) = self
297 .factory
298 .build(SessionMode::Resume(id.to_string()), None)
299 .await?;
300 self.session.store(Arc::new(session));
301 self.llm.store(Arc::new(SharedLlm::new(llm)));
302 Ok(())
303 }
304
305 pub async fn clone_session(&self) -> Result<String> {
314 let Some(store) = self.factory.session_store.as_ref() else {
315 return Err(AppError::Config("clone requires a session store".into()));
316 };
317 let source_id = self.session.load().session_id().to_string();
318 let new_id = crate::session::SessionId::new().into_string();
319 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
320 catalog
321 .fork(&source_id, &new_id)
322 .await
323 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
324 self.load_session(&new_id).await?;
325 Ok(new_id)
326 }
327
328 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
343 let current_id = self.session.load().session_id().to_string();
344 let (session, llm) = self
345 .factory
346 .build(SessionMode::Resume(current_id), Some(model))
347 .await?;
348 self.factory.set_current_model(model.clone());
349 self.session.store(Arc::new(session));
350 self.llm.store(Arc::new(SharedLlm::new(llm)));
351 Ok(())
352 }
353
354 pub async fn disconnect_mcp(&self) {
357 for (name, server) in &self.mcp_servers {
358 let _ =
359 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
360 tracing::debug!(target: "mcp", server = %name, "disconnected");
361 }
362 }
363
364 fn run_turn(
365 &self,
366 msg: crate::user_message::UserMessage,
367 fork_from: Option<motosan_agent_loop::EntryId>,
368 ) -> impl Stream<Item = UiEvent> + Send + 'static {
369 let session = self.session.load_full();
370 let skills = Arc::clone(&self.skills);
371 let cancel_token = self.cancel_token.clone();
372 let tracker = Arc::clone(&self.next_tool_id);
373 let progress = Arc::clone(&self.progress_rx);
374
375 async_stream::stream! {
376 let new_user = {
380 let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
384 let expanded_msg = crate::user_message::UserMessage {
385 text: expanded_text,
386 attachments: msg.attachments.clone(),
387 };
388 match crate::user_message::prepare_user_message(&expanded_msg) {
389 Ok(m) => m,
390 Err(err) => {
391 yield UiEvent::AttachmentError {
392 kind: err.kind(),
393 message: err.to_string(),
394 };
395 return;
396 }
397 }
398 };
399
400 let mut progress_guard = match progress.try_lock() {
402 Ok(guard) => guard,
403 Err(_) => {
404 yield UiEvent::Error(
405 "another turn is already running; capo is single-turn-per-App".into(),
406 );
407 return;
408 }
409 };
410
411 let cancel = cancel_token.reset();
413
414 yield UiEvent::AgentTurnStarted;
415 yield UiEvent::AgentThinking;
416
417 let handle = match fork_from {
419 None => {
420 let history = match session.history().await {
422 Ok(h) => h,
423 Err(err) => {
424 yield UiEvent::Error(format!("session.history failed: {err}"));
425 return;
426 }
427 };
428 let mut messages = history;
429 messages.push(new_user);
430 match session.start_turn(messages).await {
431 Ok(h) => h,
432 Err(err) => {
433 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
434 return;
435 }
436 }
437 }
438 Some(from) => {
439 match session.fork_turn(from, vec![new_user]).await {
441 Ok(h) => h,
442 Err(err) => {
443 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
444 return;
445 }
446 }
447 }
448 };
449 let previous_len = handle.previous_len;
450 let epoch = handle.epoch;
451 let branch_parent = handle.branch_parent;
452 let ops_tx = handle.ops_tx.clone();
453 let mut agent_stream = handle.stream;
454
455 let interrupt_bridge = tokio::spawn(async move {
463 cancel.cancelled().await;
464 let _ = ops_tx.send(AgentOp::Interrupt).await;
465 });
466
467 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
469 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
470
471 loop {
472 while let Ok(chunk) = progress_guard.try_recv() {
474 yield UiEvent::ToolCallProgress {
475 id: progress_event_id(&tracker),
476 chunk: ProgressChunk::from(chunk),
477 };
478 }
479
480 tokio::select! {
481 biased;
482 maybe_item = agent_stream.next() => {
483 match maybe_item {
484 Some(AgentStreamItem::Event(ev)) => {
485 if let Some(ui) = map_event(ev, &tracker) {
486 yield ui;
487 }
488 }
489 Some(AgentStreamItem::Terminal(term)) => {
490 terminal_result = Some(term.result);
491 terminal_messages = Some(term.messages);
492 break;
493 }
494 None => break,
495 }
496 }
497 Some(chunk) = progress_guard.recv() => {
498 yield UiEvent::ToolCallProgress {
499 id: progress_event_id(&tracker),
500 chunk: ProgressChunk::from(chunk),
501 };
502 }
503 }
504 }
505
506 interrupt_bridge.abort();
508
509 if let Some(msgs) = terminal_messages.as_ref() {
511 if let Err(err) = session
512 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
513 .await
514 {
515 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
516 }
517 }
518
519 match terminal_result {
521 Some(Ok(_)) => {
522 let final_text = terminal_messages
523 .as_ref()
524 .and_then(|msgs| {
525 msgs.iter()
526 .rev()
527 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
528 .map(|m| m.text())
529 })
530 .unwrap_or_default();
531 if !final_text.is_empty() {
532 yield UiEvent::AgentMessageComplete(final_text);
533 }
534 while let Ok(chunk) = progress_guard.try_recv() {
536 yield UiEvent::ToolCallProgress {
537 id: progress_event_id(&tracker),
538 chunk: ProgressChunk::from(chunk),
539 };
540 }
541 yield UiEvent::AgentTurnComplete;
542 }
543 Some(Err(err)) => {
544 yield UiEvent::Error(format!("{err}"));
545 }
546 None => { }
547 }
548 }
549 }
550
551 pub fn send_user_message(
552 &self,
553 msg: crate::user_message::UserMessage,
554 ) -> impl Stream<Item = UiEvent> + Send + 'static {
555 self.run_turn(msg, None)
556 }
557
558 pub fn fork_from(
563 &self,
564 from: motosan_agent_loop::EntryId,
565 message: crate::user_message::UserMessage,
566 ) -> impl Stream<Item = UiEvent> + Send + 'static {
567 self.run_turn(message, Some(from))
568 }
569
570 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
574 let entries = self
575 .session
576 .load_full()
577 .entries()
578 .await
579 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
580 let branch = motosan_agent_loop::active_branch(&entries);
581 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
582 .iter()
583 .filter_map(|stored| {
584 let msg = stored.entry.as_message()?;
585 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
586 return None;
587 }
588 let preview: String = msg
589 .text()
590 .lines()
591 .next()
592 .unwrap_or("")
593 .chars()
594 .take(80)
595 .collect();
596 Some((stored.id.clone(), preview))
597 })
598 .collect();
599 out.reverse();
600 Ok(out)
601 }
602
603 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
605 self.session
606 .load_full()
607 .branches()
608 .await
609 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
610 }
611}
612
/// Hands out UI ids (`tool_1`, `tool_2`, …) for tool calls and remembers
/// which calls have started but not yet completed.
#[derive(Debug, Default)]
struct ToolCallTracker {
    /// Number of ids handed out so far; the next id is `next_id + 1`.
    next_id: usize,
    /// Started-but-not-completed calls as `(tool name, id)`, oldest first.
    pending: VecDeque<(String, String)>,
}

impl ToolCallTracker {
    /// Bumps the counter and formats the resulting id.
    fn fresh_id(&mut self) -> String {
        self.next_id += 1;
        format!("tool_{}", self.next_id)
    }

    /// Registers a started call to `name` and returns its new id.
    fn start(&mut self, name: &str) -> String {
        let id = self.fresh_id();
        self.pending.push_back((name.to_string(), id.clone()));
        id
    }

    /// Resolves the oldest pending call to `name` and returns its id.
    ///
    /// When no call with that name is pending, a brand-new id is returned so
    /// the completion can still be reported.
    fn complete(&mut self, name: &str) -> String {
        let slot = self
            .pending
            .iter()
            .position(|(pending_name, _)| pending_name == name);
        match slot.and_then(|index| self.pending.remove(index)) {
            Some((_, id)) => id,
            None => self.fresh_id(),
        }
    }

    /// Returns the id progress should be attributed to.
    ///
    /// Only unambiguous when exactly one call is pending; otherwise `None`.
    fn progress_id(&self) -> Option<String> {
        if self.pending.len() != 1 {
            return None;
        }
        self.pending.front().map(|(_, id)| id.clone())
    }
}
653
654fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
655 match tracker.lock() {
656 Ok(guard) => guard,
657 Err(poisoned) => poisoned.into_inner(),
658 }
659}
660
661fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
662 lock_tool_tracker(tracker)
663 .progress_id()
664 .unwrap_or_else(|| "tool_unknown".to_string())
665}
666
667fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
668where
669 F: Fn(&str) -> Option<String>,
670{
671 env_lookup("ANTHROPIC_API_KEY")
672 .map(|key| key.trim().to_string())
673 .filter(|key| !key.is_empty())
674 .or_else(|| auth.api_key("anthropic").map(str::to_string))
675}
676
677fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
678 match ev {
679 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
680 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
681 let id = lock_tool_tracker(tool_tracker).start(&name);
682 Some(UiEvent::ToolCallStarted {
683 id,
684 name,
685 args: serde_json::json!({}),
686 })
687 }
688 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
689 let id = lock_tool_tracker(tool_tracker).complete(&name);
690 Some(UiEvent::ToolCallCompleted {
691 id,
692 result: UiToolResult {
693 is_error: result.is_error,
694 text: format!("{name}: {result:?}"),
695 },
696 })
697 }
698 _ => None,
699 }
700}
701
/// One-shot callback that receives a `ToolCtx` and returns the tools to
/// register in place of the builtin set.
type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
703
/// Builder for [`App`]. Every field starts out unset or disabled except
/// `max_iterations` (see the `Default` impl).
pub struct AppBuilder {
    /// Explicit configuration; `Config::default()` is used when absent.
    config: Option<Config>,
    /// Working directory; falls back to the process cwd, then to `"."`.
    cwd: Option<PathBuf>,
    /// Permission gate for tools; a no-op gate is used when absent.
    permission_gate: Option<Arc<dyn PermissionGate>>,
    /// Register the builtin tools (mutually exclusive with a custom factory).
    install_builtin_tools: bool,
    /// Iteration cap passed to the engine.
    max_iterations: usize,
    /// Injected LLM client, used instead of building one from settings/auth.
    llm_override: Option<Arc<dyn LlmClient>>,
    /// Callback producing the tool set instead of the builtin tools.
    custom_tools_factory: Option<CustomToolsFactory>,
    /// Path the permission policy is loaded from; default policy when absent.
    permissions_policy_path: Option<PathBuf>,
    /// UI channel; enables the interactive permission extension.
    ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
    /// Enables the headless permission extension when there is no UI channel.
    headless_permissions: bool,
    /// Explicit settings; derived from `config.model` when absent.
    settings: Option<crate::settings::Settings>,
    /// Explicit credentials; seeded from `config.anthropic.api_key` when absent.
    auth: Option<crate::auth::Auth>,
    /// Skip loading project context files into the system prompt.
    context_discovery_disabled: bool,
    /// Persistence backend for sessions.
    session_store: Option<Arc<dyn SessionStore>>,
    /// Session to resume at build time (set by `build_with_session`).
    resume_session_id: Option<crate::session::SessionId>,
    /// Enables the autocompact extension.
    autocompact_enabled: bool,
    /// Skills made available to the agent.
    skills: Vec<crate::skills::Skill>,
    /// Tools registered in addition to the builtin or factory-made ones.
    extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
    /// Named MCP servers handed over to the built `App`.
    mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
}
728
729impl Default for AppBuilder {
730 fn default() -> Self {
731 Self {
732 config: None,
733 cwd: None,
734 permission_gate: None,
735 install_builtin_tools: false,
736 max_iterations: 20,
737 llm_override: None,
738 custom_tools_factory: None,
739 permissions_policy_path: None,
740 ui_tx: None,
741 headless_permissions: false,
742 settings: None,
743 auth: None,
744 context_discovery_disabled: false,
745 session_store: None,
746 resume_session_id: None,
747 autocompact_enabled: false,
748 skills: Vec::new(),
749 extra_tools: Vec::new(),
750 mcp_servers: Vec::new(),
751 }
752 }
753}
754
755impl AppBuilder {
756 pub fn new() -> Self {
757 Self::default()
758 }
759
760 pub fn with_config(mut self, cfg: Config) -> Self {
761 self.config = Some(cfg);
762 self
763 }
764
765 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
766 self.cwd = Some(cwd.into());
767 self
768 }
769
770 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
771 self.permission_gate = Some(gate);
772 self
773 }
774
775 pub fn with_builtin_tools(mut self) -> Self {
781 self.install_builtin_tools = true;
782 self
783 }
784
785 pub fn with_max_iterations(mut self, n: usize) -> Self {
786 self.max_iterations = n;
787 self
788 }
789
790 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
791 self.llm_override = Some(llm);
792 self
793 }
794
795 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
796 self.permissions_policy_path = Some(path);
797 self
798 }
799
800 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
801 self.ui_tx = Some(tx);
802 self
803 }
804
805 pub fn with_headless_permissions(mut self) -> Self {
810 self.headless_permissions = true;
811 self
812 }
813
814 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
816 self.settings = Some(settings);
817 self
818 }
819
820 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
822 self.auth = Some(auth);
823 self
824 }
825
826 pub fn disable_context_discovery(mut self) -> Self {
829 self.context_discovery_disabled = true;
830 self
831 }
832
833 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
836 self.session_store = Some(store);
837 self
838 }
839
840 pub fn with_autocompact(mut self) -> Self {
845 self.autocompact_enabled = true;
846 self
847 }
848
849 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
855 self.skills = skills;
856 self
857 }
858
859 pub fn without_skills(mut self) -> Self {
860 self.skills.clear();
861 self
862 }
863
864 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
868 self.extra_tools = tools;
869 self
870 }
871
872 pub fn with_mcp_servers(
875 mut self,
876 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
877 ) -> Self {
878 self.mcp_servers = servers;
879 self
880 }
881
882 pub fn with_custom_tools_factory(
887 mut self,
888 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
889 ) -> Self {
890 self.custom_tools_factory = Some(Box::new(factory));
891 self
892 }
893
894 pub async fn build_with_custom_tools(
898 self,
899 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
900 ) -> Result<App> {
901 self.with_custom_tools_factory(factory).build().await
902 }
903
904 pub async fn build_with_session(
912 mut self,
913 resume: Option<crate::session::SessionId>,
914 ) -> Result<App> {
915 if let Some(id) = resume {
916 if self.session_store.is_none() {
917 return Err(AppError::Config(
918 "build_with_session(Some(id)) requires with_session_store(...)".into(),
919 ));
920 }
921 self.resume_session_id = Some(id);
922 }
923 self.build_internal().await
924 }
925
926 pub async fn build(self) -> Result<App> {
928 self.build_with_session(None).await
929 }
930
931 async fn build_internal(mut self) -> Result<App> {
932 let mcp_servers = std::mem::take(&mut self.mcp_servers);
933 let extra_tools = std::mem::take(&mut self.extra_tools);
934 let skills = self.skills.clone();
935 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
936 return Err(AppError::Config(
937 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
938 ));
939 }
940
941 let has_config = self.config.is_some();
945 let has_auth = self.auth.is_some();
946 let mut config = self.config.unwrap_or_default();
947 let settings = match self.settings {
948 Some(settings) => settings,
949 None => {
950 let mut settings = crate::settings::Settings::default();
951 settings.model.provider = config.model.provider.clone();
952 settings.model.name = config.model.name.clone();
953 settings.model.max_tokens = config.model.max_tokens;
954 settings
955 }
956 };
957 config.model.provider = settings.model.provider.clone();
958 config.model.name = settings.model.name.clone();
959 config.model.max_tokens = settings.model.max_tokens;
960 let mut auth = self.auth.unwrap_or_default();
961 if !has_auth {
962 if let Some(key) = config.anthropic.api_key.as_deref() {
963 auth.0.insert(
964 "anthropic".into(),
965 crate::auth::ProviderAuth::ApiKey {
966 key: key.to_string(),
967 },
968 );
969 }
970 }
971 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
972 if env_or_auth_key.is_some() || has_auth || !has_config {
973 config.anthropic.api_key = env_or_auth_key;
974 }
975 let cwd = self
976 .cwd
977 .or_else(|| std::env::current_dir().ok())
978 .unwrap_or_else(|| PathBuf::from("."));
979 let permission_gate = self.permission_gate.unwrap_or_else(|| {
980 if self.ui_tx.is_some() || self.headless_permissions {
984 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
985 } else {
986 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
987 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
988 }
989 });
990
991 let policy: Arc<crate::permissions::Policy> =
993 Arc::new(match self.permissions_policy_path.as_ref() {
994 Some(path) => crate::permissions::Policy::load_or_default(path)?,
995 None => crate::permissions::Policy::default(),
996 });
997 let session_cache = Arc::new(crate::permissions::SessionCache::new());
998
999 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1001
1002 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1007 let cancel_token = probe_ctx.cancel_token.clone();
1008 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1009 if let Some(factory_fn) = self.custom_tools_factory.take() {
1010 let mut t = factory_fn(probe_ctx);
1011 t.extend(extra_tools.clone());
1012 (false, t)
1013 } else {
1014 (self.install_builtin_tools, extra_tools.clone())
1015 };
1016
1017 let factory = SessionFactory {
1018 cwd: cwd.clone(),
1019 settings: settings.clone(),
1020 auth: auth.clone(),
1021 policy: Arc::clone(&policy),
1022 session_cache: Arc::clone(&session_cache),
1023 ui_tx: self.ui_tx.clone(),
1024 headless_permissions: self.headless_permissions,
1025 permission_gate: Arc::clone(&permission_gate),
1026 progress_tx: progress_tx.clone(),
1027 skills: Arc::new(skills.clone()),
1028 install_builtin_tools: install_builtin,
1029 extra_tools: factory_extra_tools,
1030 max_iterations: self.max_iterations,
1031 context_discovery_disabled: self.context_discovery_disabled,
1032 autocompact_enabled: self.autocompact_enabled,
1033 session_store: self.session_store.clone(),
1034 llm_override: self.llm_override.clone(),
1035 current_model: Arc::new(Mutex::new(None)),
1036 cancel_token: cancel_token.clone(),
1037 };
1038
1039 let mode = match self.resume_session_id.take() {
1040 Some(id) => SessionMode::Resume(id.into_string()),
1041 None => SessionMode::New,
1042 };
1043 let (session, llm) = factory.build(mode, None).await?;
1044
1045 Ok(App {
1046 session: arc_swap::ArcSwap::from_pointee(session),
1047 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1048 factory,
1049 config,
1050 cancel_token,
1051 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1052 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1053 skills: Arc::new(skills),
1054 mcp_servers,
1055 session_cache,
1056 })
1057 }
1058}
1059
1060#[cfg(test)]
1061mod tests {
1062 use super::*;
1063 use crate::config::{AnthropicConfig, ModelConfig};
1064 use crate::events::UiEvent;
1065 use crate::user_message::UserMessage;
1066 use async_trait::async_trait;
1067 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1068 use motosan_agent_tool::ToolDef;
1069 use std::sync::atomic::{AtomicUsize, Ordering};
1070
1071 #[tokio::test]
1072 async fn builder_fails_without_api_key() {
1073 let cfg = Config {
1074 anthropic: AnthropicConfig {
1075 api_key: None,
1076 base_url: "https://api.anthropic.com".into(),
1077 },
1078 model: ModelConfig {
1079 provider: "anthropic".into(),
1080 name: "claude-sonnet-4-6".into(),
1081 max_tokens: 4096,
1082 },
1083 };
1084 let err = match AppBuilder::new()
1085 .with_config(cfg)
1086 .with_builtin_tools()
1087 .build()
1088 .await
1089 {
1090 Ok(_) => panic!("must fail without key"),
1091 Err(err) => err,
1092 };
1093 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1094 }
1095
1096 struct ToolOnlyLlm {
1097 turn: AtomicUsize,
1098 }
1099
1100 #[async_trait]
1101 impl LlmClient for ToolOnlyLlm {
1102 async fn chat(
1103 &self,
1104 _messages: &[Message],
1105 _tools: &[ToolDef],
1106 ) -> motosan_agent_loop::Result<ChatOutput> {
1107 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1108 if turn == 0 {
1109 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1110 ToolCallItem {
1111 id: "t1".into(),
1112 name: "read".into(),
1113 args: serde_json::json!({"path":"nope.txt"}),
1114 },
1115 ])))
1116 } else {
1117 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1118 }
1119 }
1120 }
1121
1122 #[tokio::test]
1123 async fn empty_final_message_is_not_emitted() {
1124 let dir = tempfile::tempdir().unwrap();
1125 let mut cfg = Config::default();
1126 cfg.anthropic.api_key = Some("sk-unused".into());
1127 let app = AppBuilder::new()
1128 .with_config(cfg)
1129 .with_cwd(dir.path())
1130 .with_builtin_tools()
1131 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1132 turn: AtomicUsize::new(0),
1133 }))
1134 .build()
1135 .await
1136 .expect("build");
1137 let events: Vec<UiEvent> =
1138 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1139 let empties = events
1140 .iter()
1141 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1142 .count();
1143 assert_eq!(
1144 empties, 0,
1145 "should not emit empty final message, got: {events:?}"
1146 );
1147 }
1148
1149 struct EchoLlm;
1150
1151 #[async_trait]
1152 impl LlmClient for EchoLlm {
1153 async fn chat(
1154 &self,
1155 _messages: &[Message],
1156 _tools: &[ToolDef],
1157 ) -> motosan_agent_loop::Result<ChatOutput> {
1158 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1159 }
1160 }
1161
1162 #[tokio::test]
1163 async fn with_headless_permissions_builds_an_app() {
1164 let dir = tempfile::tempdir().expect("tempdir");
1165 let mut config = Config::default();
1166 config.anthropic.api_key = Some("sk-unused".into());
1167 let app = AppBuilder::new()
1168 .with_config(config)
1169 .with_cwd(dir.path())
1170 .with_builtin_tools()
1171 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1172 .with_headless_permissions()
1173 .build()
1174 .await
1175 .expect("build");
1176 assert!(!app.session_id().is_empty());
1178 }
1179
1180 #[tokio::test]
1181 async fn new_session_swaps_in_a_fresh_empty_session() {
1182 use futures::StreamExt;
1183 let dir = tempfile::tempdir().expect("tempdir");
1184 let mut config = Config::default();
1185 config.anthropic.api_key = Some("sk-unused".into());
1186 let app = AppBuilder::new()
1187 .with_config(config)
1188 .with_cwd(dir.path())
1189 .with_builtin_tools()
1190 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1191 .build()
1192 .await
1193 .expect("build");
1194
1195 let _: Vec<_> = app
1196 .send_user_message(UserMessage::text("hello"))
1197 .collect()
1198 .await;
1199 let id_before = app.session_id();
1200 assert!(!app.session_history().await.expect("history").is_empty());
1201
1202 app.new_session().await.expect("new_session");
1203
1204 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1205 assert!(
1206 app.session_history().await.expect("history").is_empty(),
1207 "fresh session has no history"
1208 );
1209 }
1210
1211 #[tokio::test]
1212 async fn load_session_restores_a_stored_session_by_id() {
1213 use futures::StreamExt;
1214 let dir = tempfile::tempdir().expect("tempdir");
1215 let store_dir = dir.path().join("sessions");
1216 let store: Arc<dyn SessionStore> =
1217 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1218 let mut config = Config::default();
1219 config.anthropic.api_key = Some("sk-unused".into());
1220 let app = AppBuilder::new()
1221 .with_config(config)
1222 .with_cwd(dir.path())
1223 .with_builtin_tools()
1224 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1225 .with_session_store(Arc::clone(&store))
1226 .build()
1227 .await
1228 .expect("build");
1229
1230 let _: Vec<_> = app
1231 .send_user_message(UserMessage::text("remember this"))
1232 .collect()
1233 .await;
1234 let original_id = app.session_id();
1235
1236 app.new_session().await.expect("new_session");
1237 assert_ne!(app.session_id(), original_id);
1238
1239 app.load_session(&original_id).await.expect("load_session");
1240 assert_eq!(app.session_id(), original_id);
1241 let history = app.session_history().await.expect("history");
1242 assert!(
1243 history.iter().any(|m| m.text().contains("remember this")),
1244 "loaded session should carry the original turn"
1245 );
1246 }
1247
1248 #[tokio::test]
1249 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1250 use futures::StreamExt;
1251 let dir = tempfile::tempdir().expect("tempdir");
1252 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1253 dir.path().join("s"),
1254 ));
1255 let mut config = Config::default();
1256 config.anthropic.api_key = Some("sk-unused".into());
1257 let app = AppBuilder::new()
1258 .with_config(config)
1259 .with_cwd(dir.path())
1260 .with_builtin_tools()
1261 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1262 .with_session_store(store)
1263 .build()
1264 .await
1265 .expect("build");
1266
1267 let _: Vec<_> = app
1268 .send_user_message(UserMessage::text("hello"))
1269 .collect()
1270 .await;
1271 let original_id = app.session_id();
1272
1273 let new_id = app.clone_session().await.expect("clone_session");
1274
1275 assert_ne!(new_id, original_id);
1277 assert_eq!(app.session_id(), new_id);
1278 let history = app.session_history().await.expect("history");
1280 assert!(history.iter().any(|m| m.text().contains("hello")));
1281 }
1282
1283 #[tokio::test]
1284 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1285 use futures::StreamExt;
1286 let dir = tempfile::tempdir().expect("tempdir");
1287 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1288 dir.path().join("s"),
1289 ));
1290 let mut config = Config::default();
1291 config.anthropic.api_key = Some("sk-unused".into());
1292 let app = AppBuilder::new()
1293 .with_config(config)
1294 .with_cwd(dir.path())
1295 .with_builtin_tools()
1296 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1297 .with_session_store(store)
1298 .build()
1299 .await
1300 .expect("build");
1301
1302 let _: Vec<_> = app
1304 .send_user_message(UserMessage::text("first"))
1305 .collect()
1306 .await;
1307 let _: Vec<_> = app
1308 .send_user_message(UserMessage::text("second"))
1309 .collect()
1310 .await;
1311
1312 let entries = app.session.load_full().entries().await.expect("entries");
1314 let first_id = entries
1315 .iter()
1316 .find_map(|stored| {
1317 let msg = stored.entry.as_message()?;
1318 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1319 .then(|| stored.id.clone())
1320 })
1321 .expect("first user message present");
1322
1323 let _: Vec<_> = app
1325 .fork_from(first_id, UserMessage::text("branched"))
1326 .collect()
1327 .await;
1328
1329 let history = app.session_history().await.expect("history");
1330 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1331 assert!(
1332 texts.iter().any(|t| t.contains("first")),
1333 "fork keeps the fork-point ancestor"
1334 );
1335 assert!(
1336 texts.iter().any(|t| t.contains("branched")),
1337 "fork includes the new message"
1338 );
1339 assert!(
1340 !texts.iter().any(|t| t.contains("second")),
1341 "fork excludes the abandoned branch"
1342 );
1343 }
1344
1345 #[tokio::test]
1346 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1347 use futures::StreamExt;
1348 let dir = tempfile::tempdir().expect("tempdir");
1349 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1350 dir.path().join("s"),
1351 ));
1352 let mut config = Config::default();
1353 config.anthropic.api_key = Some("sk-unused".into());
1354 let app = AppBuilder::new()
1355 .with_config(config)
1356 .with_cwd(dir.path())
1357 .with_builtin_tools()
1358 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1359 .with_session_store(store)
1360 .build()
1361 .await
1362 .expect("build");
1363
1364 let _: Vec<_> = app
1365 .send_user_message(UserMessage::text("alpha"))
1366 .collect()
1367 .await;
1368 let _: Vec<_> = app
1369 .send_user_message(UserMessage::text("bravo"))
1370 .collect()
1371 .await;
1372
1373 let candidates = app.fork_candidates().await.expect("candidates");
1374 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1375 assert!(previews[0].contains("bravo"), "got {previews:?}");
1377 assert!(previews.iter().any(|p| p.contains("alpha")));
1378 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1380 }
1381
1382 #[tokio::test]
1383 async fn branches_returns_a_tree_for_a_linear_session() {
1384 use futures::StreamExt;
1385 let dir = tempfile::tempdir().expect("tempdir");
1386 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1387 dir.path().join("s"),
1388 ));
1389 let mut config = Config::default();
1390 config.anthropic.api_key = Some("sk-unused".into());
1391 let app = AppBuilder::new()
1392 .with_config(config)
1393 .with_cwd(dir.path())
1394 .with_builtin_tools()
1395 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1396 .with_session_store(store)
1397 .build()
1398 .await
1399 .expect("build");
1400
1401 let _: Vec<_> = app
1402 .send_user_message(UserMessage::text("hello"))
1403 .collect()
1404 .await;
1405 let tree = app.branches().await.expect("branches");
1406 assert!(!tree.nodes.is_empty());
1408 assert!(tree.active_leaf.is_some());
1409 }
1410
1411 #[tokio::test]
1412 async fn switch_model_preserves_history() {
1413 use futures::StreamExt;
1414 let dir = tempfile::tempdir().expect("tempdir");
1415 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1416 dir.path().join("s"),
1417 ));
1418 let mut config = Config::default();
1419 config.anthropic.api_key = Some("sk-unused".into());
1420 let app = AppBuilder::new()
1421 .with_config(config)
1422 .with_cwd(dir.path())
1423 .with_builtin_tools()
1424 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1425 .with_session_store(store)
1426 .build()
1427 .await
1428 .expect("build");
1429
1430 let _: Vec<_> = app
1431 .send_user_message(UserMessage::text("keep me"))
1432 .collect()
1433 .await;
1434 let id_before = app.session_id();
1435
1436 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
1437 .await
1438 .expect("switch_model");
1439
1440 assert_eq!(
1441 app.session_id(),
1442 id_before,
1443 "switch_model keeps the same session"
1444 );
1445 let history = app.session_history().await.expect("history");
1446 assert!(history.iter().any(|m| m.text().contains("keep me")));
1447 }
1448
1449 #[tokio::test]
1450 async fn switch_model_is_sticky_for_future_session_rebuilds() {
1451 let dir = tempfile::tempdir().expect("tempdir");
1452 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1453 dir.path().join("s"),
1454 ));
1455 let mut config = Config::default();
1456 config.anthropic.api_key = Some("sk-unused".into());
1457 let app = AppBuilder::new()
1458 .with_config(config)
1459 .with_cwd(dir.path())
1460 .with_builtin_tools()
1461 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1462 .with_session_store(store)
1463 .build()
1464 .await
1465 .expect("build");
1466
1467 let selected = crate::model::ModelId::from("claude-opus-4-7");
1468 app.switch_model(&selected).await.expect("switch_model");
1469 app.new_session().await.expect("new_session");
1470
1471 assert_eq!(app.factory.current_model(), Some(selected));
1472 }
1473
1474 struct SleepThenDoneLlm {
1475 turn: AtomicUsize,
1476 }
1477
1478 #[async_trait]
1479 impl LlmClient for SleepThenDoneLlm {
1480 async fn chat(
1481 &self,
1482 _messages: &[Message],
1483 _tools: &[ToolDef],
1484 ) -> motosan_agent_loop::Result<ChatOutput> {
1485 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1486 if turn == 0 {
1487 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1488 ToolCallItem {
1489 id: "sleep".into(),
1490 name: "bash".into(),
1491 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
1492 },
1493 ])))
1494 } else {
1495 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1496 }
1497 }
1498 }
1499
1500 #[tokio::test]
1501 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
1502 use futures::StreamExt;
1503 let dir = tempfile::tempdir().expect("tempdir");
1504 let mut config = Config::default();
1505 config.anthropic.api_key = Some("sk-unused".into());
1506 let app = Arc::new(
1507 AppBuilder::new()
1508 .with_config(config)
1509 .with_cwd(dir.path())
1510 .with_builtin_tools()
1511 .with_llm(Arc::new(SleepThenDoneLlm {
1512 turn: AtomicUsize::new(0),
1513 }) as Arc<dyn LlmClient>)
1514 .build()
1515 .await
1516 .expect("build"),
1517 );
1518
1519 app.new_session().await.expect("new_session");
1520 let running_app = Arc::clone(&app);
1521 let handle = tokio::spawn(async move {
1522 running_app
1523 .send_user_message(UserMessage::text("run a slow command"))
1524 .collect::<Vec<_>>()
1525 .await
1526 });
1527 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1528 app.cancel();
1529
1530 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
1531 .await
1532 .expect("turn should finish after cancellation")
1533 .expect("join");
1534 assert!(
1535 events.iter().any(|event| {
1536 matches!(
1537 event,
1538 UiEvent::ToolCallCompleted { result, .. }
1539 if result.text.contains("command cancelled by user")
1540 )
1541 }),
1542 "cancel should reach the rebuilt bash tool: {events:?}"
1543 );
1544 }
1545
1546 #[tokio::test]
1547 async fn compact_summarizes_a_session_with_enough_history() {
1548 struct DoneLlm;
1549 #[async_trait]
1550 impl LlmClient for DoneLlm {
1551 async fn chat(
1552 &self,
1553 _messages: &[Message],
1554 _tools: &[ToolDef],
1555 ) -> motosan_agent_loop::Result<ChatOutput> {
1556 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1557 }
1558 }
1559
1560 let dir = tempfile::tempdir().expect("tempdir");
1561 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
1562 dir.path().join("sessions"),
1563 ));
1564 let mut config = Config::default();
1565 config.anthropic.api_key = Some("sk-unused".into());
1566 let app = AppBuilder::new()
1567 .with_config(config)
1568 .with_cwd(dir.path())
1569 .with_builtin_tools()
1570 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
1571 .with_session_store(store)
1572 .build()
1573 .await
1574 .expect("build");
1575
1576 for i in 0..4 {
1583 let _: Vec<_> = app
1584 .send_user_message(UserMessage::text(format!("turn {i}")))
1585 .collect()
1586 .await;
1587 }
1588
1589 app.compact().await.expect("compact should succeed");
1590
1591 let history = app.session_history().await.expect("history");
1594 assert!(
1595 !history.is_empty(),
1596 "session should still have content post-compaction"
1597 );
1598 }
1599
/// Checks that `anthropic_api_key_from` prefers the `ANTHROPIC_API_KEY` value
/// supplied by the lookup closure over the key stored in `Auth`, and that the
/// returned key has its surrounding whitespace removed.
#[test]
fn anthropic_env_api_key_overrides_auth_json_key() {
    let stored_key = crate::auth::ProviderAuth::ApiKey {
        key: "sk-auth".into(),
    };
    let mut auth = crate::auth::Auth::default();
    auth.0.insert("anthropic".into(), stored_key);

    // Fake environment: only ANTHROPIC_API_KEY is set, padded with spaces.
    let resolved = anthropic_api_key_from(&auth, |var| {
        if var == "ANTHROPIC_API_KEY" {
            Some(" sk-env ".to_string())
        } else {
            None
        }
    });

    assert_eq!(resolved.as_deref(), Some("sk-env"));
}
1615
1616 #[tokio::test]
1617 async fn with_settings_overrides_deprecated_config_model() {
1618 use crate::settings::Settings;
1619
1620 let mut config = Config::default();
1621 config.model.name = "from-config".into();
1622 config.anthropic.api_key = Some("sk-config".into());
1623
1624 let mut settings = Settings::default();
1625 settings.model.name = "from-settings".into();
1626
1627 let tmp = tempfile::tempdir().unwrap();
1628 let app = AppBuilder::new()
1629 .with_config(config)
1630 .with_settings(settings)
1631 .with_cwd(tmp.path())
1632 .disable_context_discovery()
1633 .with_llm(Arc::new(EchoLlm))
1634 .build()
1635 .await
1636 .expect("build");
1637 assert_eq!(app.config().model.name, "from-settings");
1638 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
1639 }
1640
1641 #[tokio::test]
1642 async fn with_settings_synthesises_legacy_config_for_build() {
1643 use crate::auth::{Auth, ProviderAuth};
1644 use crate::settings::Settings;
1645
1646 let mut settings = Settings::default();
1647 settings.model.name = "claude-sonnet-4-6".into();
1648
1649 let mut auth = Auth::default();
1650 auth.0.insert(
1651 "anthropic".into(),
1652 ProviderAuth::ApiKey {
1653 key: "sk-test".into(),
1654 },
1655 );
1656
1657 let tmp = tempfile::tempdir().unwrap();
1658 let app = AppBuilder::new()
1659 .with_settings(settings)
1660 .with_auth(auth)
1661 .with_cwd(tmp.path())
1662 .with_builtin_tools()
1663 .disable_context_discovery()
1664 .with_llm(Arc::new(EchoLlm))
1665 .build()
1666 .await
1667 .expect("build");
1668 let _ = app;
1669 }
1670
1671 #[tokio::test]
1672 async fn cancel_before_turn_does_not_poison_future_turns() {
1673 let dir = tempfile::tempdir().unwrap();
1674 let mut cfg = Config::default();
1675 cfg.anthropic.api_key = Some("sk-unused".into());
1676 let app = AppBuilder::new()
1677 .with_config(cfg)
1678 .with_cwd(dir.path())
1679 .with_builtin_tools()
1680 .with_llm(std::sync::Arc::new(EchoLlm))
1681 .build()
1682 .await
1683 .expect("build");
1684
1685 app.cancel();
1686 let events: Vec<UiEvent> = app
1687 .send_user_message(UserMessage::text("x"))
1688 .collect()
1689 .await;
1690
1691 assert!(
1692 events
1693 .iter()
1694 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
1695 "turn should use a fresh cancellation token: {events:?}"
1696 );
1697 }
1698
/// Checks that `map_event` pairs completion events with start events by tool
/// name: "bash" started first gets id "tool_1", "read" started second gets
/// "tool_2", and each completion reports the id of its own tool.
#[test]
fn map_event_matches_started_and_completed_ids_by_tool_name() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    // Small builders so the event sequence below reads as a script.
    let start = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted { name: tool.into() }),
            &tracker,
        )
    };
    let complete = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolCompleted {
                name: tool.into(),
                result: motosan_agent_tool::ToolResult::text("ok"),
            }),
            &tracker,
        )
    };

    let bash_started = start("bash");
    let read_started = start("read");
    let bash_completed = complete("bash");
    let read_completed = complete("read");

    assert!(matches!(
        &bash_started,
        Some(UiEvent::ToolCallStarted { id, name, .. }) if id == "tool_1" && name == "bash"
    ));
    assert!(matches!(
        &read_started,
        Some(UiEvent::ToolCallStarted { id, name, .. }) if id == "tool_2" && name == "read"
    ));
    assert!(matches!(
        &bash_completed,
        Some(UiEvent::ToolCallCompleted { id, .. }) if id == "tool_1"
    ));
    assert!(matches!(
        &read_completed,
        Some(UiEvent::ToolCallCompleted { id, .. }) if id == "tool_2"
    ));
}
1747
/// Checks id assignment when the same tool is started twice before either
/// call completes: the first completion must carry the first start's id, the
/// second completion the second start's id, and the two ids must differ.
#[test]
fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));

    let start_bash = || {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted {
                name: "bash".into(),
            }),
            &tracker,
        )
    };
    let complete_bash = |output: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolCompleted {
                name: "bash".into(),
                result: motosan_agent_tool::ToolResult::text(output),
            }),
            &tracker,
        )
    };

    // Feed all four events first; ids are extracted afterwards.
    let first_start = start_bash();
    let second_start = start_bash();
    let first_done = complete_bash("a");
    let second_done = complete_bash("b");

    // Extractors that panic with the unexpected event's debug output.
    let started_id = |event: Option<UiEvent>| match event {
        Some(UiEvent::ToolCallStarted { id, .. }) => id,
        other => panic!("{other:?}"),
    };
    let completed_id = |event: Option<UiEvent>| match event {
        Some(UiEvent::ToolCallCompleted { id, .. }) => id,
        other => panic!("{other:?}"),
    };

    let id_s1 = started_id(first_start);
    let id_s2 = started_id(second_start);
    let id_c1 = completed_id(first_done);
    let id_c2 = completed_id(second_done);

    assert_eq!(id_s1, id_c1);
    assert_eq!(id_s2, id_c2);
    assert_ne!(id_s1, id_s2);
}
1799
1800 #[tokio::test]
1801 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
1802 let dir = tempfile::tempdir().unwrap();
1803 let mut cfg = Config::default();
1804 cfg.anthropic.api_key = Some("sk-unused".into());
1805 let app = AppBuilder::new()
1806 .with_config(cfg)
1807 .with_cwd(dir.path())
1808 .with_builtin_tools()
1809 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1810 turn: AtomicUsize::new(0),
1811 }))
1812 .build()
1813 .await
1814 .expect("build");
1815
1816 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
1817 let first_event = first.next().await;
1818 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
1819
1820 let second_events: Vec<UiEvent> = app
1821 .send_user_message(UserMessage::text("second"))
1822 .collect()
1823 .await;
1824 assert_eq!(
1825 second_events.len(),
1826 1,
1827 "expected immediate single error event, got: {second_events:?}"
1828 );
1829 assert!(matches!(
1830 &second_events[0],
1831 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
1832 ));
1833 }
1834
1835 #[tokio::test]
1836 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
1837 let dir = tempfile::tempdir().unwrap();
1843 let mut cfg = Config::default();
1844 cfg.anthropic.api_key = Some("sk-unused".into());
1845 let app = AppBuilder::new()
1846 .with_config(cfg)
1847 .with_cwd(dir.path())
1848 .with_builtin_tools()
1849 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1850 turn: AtomicUsize::new(0),
1851 }))
1852 .build()
1853 .await
1854 .expect("build");
1855
1856 let mut first =
1858 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
1859 let first_event = first.next().await;
1860 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
1861
1862 let bad = crate::user_message::UserMessage {
1864 text: "second".into(),
1865 attachments: vec![crate::user_message::Attachment::Image {
1866 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
1867 }],
1868 };
1869 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
1870
1871 assert_eq!(
1872 second_events.len(),
1873 1,
1874 "expected exactly one event (the attachment error); got: {second_events:?}"
1875 );
1876 assert!(
1877 matches!(
1878 &second_events[0],
1879 UiEvent::AttachmentError {
1880 kind: crate::user_message::AttachmentErrorKind::NotFound,
1881 ..
1882 }
1883 ),
1884 "expected AttachmentError::NotFound as first event; got {second_events:?}"
1885 );
1886 }
1887
/// Checks `progress_event_id` against the number of started tools: with none
/// started it returns "tool_unknown", with exactly one it returns that tool's
/// id, and with two it returns "tool_unknown" again.
#[test]
fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
    let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
    let start_tool = |tool: &'static str| {
        map_event(
            AgentEvent::Core(CoreEvent::ToolStarted { name: tool.into() }),
            &tracker,
        )
    };

    // Nothing started yet.
    assert_eq!(progress_event_id(&tracker), "tool_unknown");

    // Exactly one tool started: its id is used.
    let only_id = match start_tool("bash") {
        Some(UiEvent::ToolCallStarted { id, .. }) => id,
        other => panic!("{other:?}"),
    };
    assert_eq!(progress_event_id(&tracker), only_id);

    // A second tool started: the id is no longer unambiguous.
    let _second = start_tool("read");
    assert_eq!(progress_event_id(&tracker), "tool_unknown");
}
1913
1914 #[tokio::test]
1915 async fn builder_rejects_builtin_and_custom_tools_together() {
1916 let mut cfg = Config::default();
1917 cfg.anthropic.api_key = Some("sk-unused".into());
1918 let dir = tempfile::tempdir().unwrap();
1919 let err = match AppBuilder::new()
1920 .with_config(cfg)
1921 .with_cwd(dir.path())
1922 .with_builtin_tools()
1923 .with_custom_tools_factory(|_| Vec::new())
1924 .build()
1925 .await
1926 {
1927 Ok(_) => panic!("must reject conflicting tool configuration"),
1928 Err(err) => err,
1929 };
1930
1931 assert!(format!("{err}").contains("mutually exclusive"));
1932 }
1933
1934 #[tokio::test]
1936 async fn two_turns_in_same_session_share_history() {
1937 #[derive(Default)]
1938 struct CounterLlm {
1939 turn: AtomicUsize,
1940 }
1941 #[async_trait]
1942 impl LlmClient for CounterLlm {
1943 async fn chat(
1944 &self,
1945 messages: &[Message],
1946 _tools: &[ToolDef],
1947 ) -> motosan_agent_loop::Result<ChatOutput> {
1948 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1949 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
1950 Ok(ChatOutput::new(LlmResponse::Message(answer)))
1951 }
1952 }
1953
1954 let tmp = tempfile::tempdir().unwrap();
1955 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
1956 tmp.path().to_path_buf(),
1957 ));
1958
1959 let app = AppBuilder::new()
1960 .with_settings(crate::settings::Settings::default())
1961 .with_auth(crate::auth::Auth::default())
1962 .with_cwd(tmp.path())
1963 .with_builtin_tools()
1964 .disable_context_discovery()
1965 .with_llm(std::sync::Arc::new(CounterLlm::default()))
1966 .with_session_store(store)
1967 .build_with_session(None)
1968 .await
1969 .expect("build");
1970
1971 let _events1: Vec<UiEvent> = app
1972 .send_user_message(UserMessage::text("hi"))
1973 .collect()
1974 .await;
1975 let events2: Vec<UiEvent> = app
1976 .send_user_message(UserMessage::text("again"))
1977 .collect()
1978 .await;
1979
1980 let saw_more_than_one = events2.iter().any(|e| {
1982 matches!(
1983 e,
1984 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
1985 )
1986 });
1987 assert!(
1988 saw_more_than_one,
1989 "second turn should have seen history; events: {events2:?}"
1990 );
1991 }
1992}
1993
/// Tests for the skill-related setters on `AppBuilder`.
#[cfg(test)]
mod skills_builder_tests {
    use super::*;
    use crate::skills::types::{Skill, SkillSource};
    use std::path::PathBuf;

    /// Builds a minimal global skill named "x" for use as test input.
    fn sample_skill() -> Skill {
        let name = "x".into();
        let description = "d".into();
        Skill {
            name,
            description,
            file_path: PathBuf::from("/x.md"),
            base_dir: PathBuf::from("/"),
            disable_model_invocation: false,
            source: SkillSource::Global,
        }
    }

    /// `with_skills` keeps the skills it is given.
    #[test]
    fn with_skills_stores_skills() {
        let builder = AppBuilder::new().with_skills(vec![sample_skill()]);

        assert_eq!(builder.skills.len(), 1);
        assert_eq!(builder.skills[0].name, "x");
    }

    /// `without_skills` discards skills that were set earlier.
    #[test]
    fn without_skills_clears() {
        let with_one_skill = AppBuilder::new().with_skills(vec![sample_skill()]);
        let builder = with_one_skill.without_skills();

        assert!(builder.skills.is_empty());
    }
}
2026
/// Tests for registering additional tools on `AppBuilder`.
#[cfg(test)]
mod mcp_builder_tests {
    use super::*;
    use motosan_agent_tool::Tool;
    use std::future::Future;
    use std::pin::Pin;

    /// Minimal `Tool` implementation: advertises itself as "fake__echo" and
    /// always answers with the text "ok".
    struct FakeTool;

    impl Tool for FakeTool {
        fn def(&self) -> motosan_agent_tool::ToolDef {
            let input_schema = serde_json::json!({"type": "object"});
            motosan_agent_tool::ToolDef {
                name: "fake__echo".into(),
                description: "test".into(),
                input_schema,
            }
        }

        fn call(
            &self,
            _args: serde_json::Value,
            _ctx: &motosan_agent_tool::ToolContext,
        ) -> Pin<Box<dyn Future<Output = motosan_agent_tool::ToolResult> + Send + '_>> {
            let reply = async { motosan_agent_tool::ToolResult::text("ok") };
            Box::pin(reply)
        }
    }

    /// `with_extra_tools` keeps the tools it is given.
    #[test]
    fn with_extra_tools_stores_tools() {
        let fake: Arc<dyn Tool> = Arc::new(FakeTool);
        let builder = AppBuilder::new().with_extra_tools(vec![fake]);

        assert_eq!(builder.extra_tools.len(), 1);
    }
}