1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::{Stream, StreamExt};
8use motosan_agent_loop::{
9 AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
10 CoreEvent, Engine, LlmClient, SessionStore,
11};
12use motosan_agent_tool::ToolContext;
13use tokio::sync::mpsc;
14
15use crate::agent::build_system_prompt;
16use crate::config::Config;
17use crate::error::{AppError, Result};
18use crate::events::{ProgressChunk, UiEvent, UiToolResult};
19use crate::llm::build_llm_client;
20use crate::permissions::{NoOpPermissionGate, PermissionGate};
21use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
22
23#[derive(Debug, Clone)]
25pub(crate) enum SessionMode {
26 New,
28 Resume(String),
30}
31
32struct SharedLlm {
37 client: Arc<dyn LlmClient>,
38}
39
40impl SharedLlm {
41 fn new(client: Arc<dyn LlmClient>) -> Self {
42 Self { client }
43 }
44
45 fn client(&self) -> Arc<dyn LlmClient> {
46 Arc::clone(&self.client)
47 }
48}
49
50pub(crate) struct SessionFactory {
51 cwd: PathBuf,
52 settings: crate::settings::Settings,
53 auth: crate::auth::Auth,
54 policy: Arc<crate::permissions::Policy>,
55 session_cache: Arc<crate::permissions::SessionCache>,
56 ui_tx: Option<mpsc::Sender<UiEvent>>,
57 headless_permissions: bool,
58 permission_gate: Arc<dyn PermissionGate>,
59 progress_tx: mpsc::Sender<ToolProgressChunk>,
62 skills: Arc<Vec<crate::skills::Skill>>,
63 install_builtin_tools: bool,
64 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
65 max_iterations: usize,
66 context_discovery_disabled: bool,
67 autocompact_enabled: bool,
68 session_store: Option<Arc<dyn SessionStore>>,
69 llm_override: Option<Arc<dyn LlmClient>>,
70 current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
71 cancel_token: SharedCancelToken,
72}
73
74impl SessionFactory {
75 fn current_model(&self) -> Option<crate::model::ModelId> {
76 match self.current_model.lock() {
77 Ok(guard) => guard.clone(),
78 Err(poisoned) => poisoned.into_inner().clone(),
79 }
80 }
81
82 fn set_current_model(&self, model: crate::model::ModelId) {
83 match self.current_model.lock() {
84 Ok(mut guard) => *guard = Some(model),
85 Err(poisoned) => *poisoned.into_inner() = Some(model),
86 }
87 }
88
89 async fn build(
94 &self,
95 mode: SessionMode,
96 model_override: Option<&crate::model::ModelId>,
97 ) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
98 let effective_model = model_override.cloned().or_else(|| self.current_model());
100 let mut settings = self.settings.clone();
101 if let Some(m) = &effective_model {
102 settings.model.name = m.as_str().to_string();
103 }
104
105 let llm = if effective_model.is_none() {
106 self.llm_override.as_ref().map_or_else(
107 || build_llm_client(&settings, &self.auth),
108 |llm| Ok(Arc::clone(llm)),
109 )?
110 } else {
111 build_llm_client(&settings, &self.auth)?
112 };
113
114 let tool_ctx = ToolCtx::new_with_cancel_token(
117 &self.cwd,
118 Arc::clone(&self.permission_gate),
119 self.progress_tx.clone(),
120 self.cancel_token.clone(),
121 );
122 let mut tools = if self.install_builtin_tools {
123 builtin_tools(tool_ctx.clone())
124 } else {
125 Vec::new()
126 };
127 tools.extend(self.extra_tools.iter().cloned());
128
129 let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
130 let base_prompt = build_system_prompt(&tool_names, &self.skills);
131 let system_prompt = if self.context_discovery_disabled {
132 base_prompt
133 } else {
134 let agent_dir = crate::paths::agent_dir();
135 let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
136 crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
137 };
138 let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
139
140 let mut engine_builder = Engine::builder()
141 .max_iterations(self.max_iterations)
142 .system_prompt(system_prompt)
143 .tool_context(motosan_tool_context);
144 for tool in tools {
145 engine_builder = engine_builder.tool(tool);
146 }
147 if let Some(ui_tx) = &self.ui_tx {
148 let ext = crate::permissions::PermissionExtension::new(
149 Arc::clone(&self.policy),
150 Arc::clone(&self.session_cache),
151 self.cwd.clone(),
152 ui_tx.clone(),
153 );
154 engine_builder = engine_builder.extension(Box::new(ext));
155 } else if self.headless_permissions {
156 let ext = crate::permissions::PermissionExtension::headless(
157 Arc::clone(&self.policy),
158 Arc::clone(&self.session_cache),
159 self.cwd.clone(),
160 );
161 engine_builder = engine_builder.extension(Box::new(ext));
162 }
163 if self.autocompact_enabled
164 && settings.session.compact_at_context_pct > 0.0
165 && settings.session.compact_at_context_pct < 1.0
166 {
167 let cfg = AutocompactConfig {
168 threshold: settings.session.compact_at_context_pct,
169 max_context_tokens: settings.session.max_context_tokens,
170 keep_turns: settings.session.keep_turns.max(1),
171 };
172 engine_builder = engine_builder
173 .extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
174 }
175 let engine = engine_builder.build();
176
177 let session = match (&mode, &self.session_store) {
178 (SessionMode::Resume(id), Some(store)) => {
179 let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
180 .await
181 .map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
182 let entries = s
183 .entries()
184 .await
185 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
186 crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
187 s
188 }
189 (SessionMode::Resume(_), None) => {
190 return Err(AppError::Config("resume requires a session store".into()));
191 }
192 (SessionMode::New, Some(store)) => {
193 let id = crate::session::SessionId::new();
194 AgentSession::new_with_store(
195 id.into_string(),
196 Arc::clone(store),
197 engine,
198 Arc::clone(&llm),
199 )
200 }
201 (SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
202 };
203
204 Ok((session, llm))
205 }
206}
207
208pub struct App {
209 session: arc_swap::ArcSwap<AgentSession>,
210 llm: arc_swap::ArcSwap<SharedLlm>,
211 factory: SessionFactory,
212 config: Config,
213 cancel_token: SharedCancelToken,
214 progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
215 next_tool_id: Arc<Mutex<ToolCallTracker>>,
216 skills: Arc<Vec<crate::skills::Skill>>,
217 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
218 pub(crate) extension_registry: Arc<crate::extensions::ExtensionRegistry>,
220 extension_diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
221 pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
222}
223
224impl App {
225 pub fn config(&self) -> &Config {
226 &self.config
227 }
228
229 pub fn cancel(&self) {
233 self.cancel_token.cancel();
234 }
235
236 pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
240 Arc::clone(&self.session_cache)
241 }
242
243 pub fn extension_registry(&self) -> Arc<crate::extensions::ExtensionRegistry> {
246 Arc::clone(&self.extension_registry)
247 }
248
249 pub fn extension_diagnostics(&self) -> Arc<Vec<crate::extensions::ExtensionDiagnostic>> {
251 Arc::clone(&self.extension_diagnostics)
252 }
253
254 pub fn session_id(&self) -> String {
257 self.session.load().session_id().to_string()
258 }
259
260 pub async fn session_history(
269 &self,
270 ) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
271 self.session.load_full().history().await
272 }
273
274 pub async fn compact(&self) -> Result<()> {
280 use motosan_agent_loop::ThresholdStrategy;
281 let strategy = ThresholdStrategy {
282 threshold: 0.0,
283 ..ThresholdStrategy::default()
284 };
285 let llm = self.llm.load_full().client();
286 self.session
287 .load_full()
288 .maybe_compact(&strategy, llm)
289 .await
290 .map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
291 Ok(())
292 }
293
294 pub async fn new_session(&self) -> Result<()> {
298 self.fire_session_before_switch("new", None).await?;
299 let (session, llm) = self.factory.build(SessionMode::New, None).await?;
300 self.session.store(Arc::new(session));
301 self.llm.store(Arc::new(SharedLlm::new(llm)));
302 Ok(())
303 }
304
305 pub async fn load_session(&self, id: &str) -> Result<()> {
311 self.fire_session_before_switch("load", Some(id)).await?;
312 self.load_session_without_hook(id).await
313 }
314
315 async fn load_session_without_hook(&self, id: &str) -> Result<()> {
316 let (session, llm) = self
317 .factory
318 .build(SessionMode::Resume(id.to_string()), None)
319 .await?;
320 self.session.store(Arc::new(session));
321 self.llm.store(Arc::new(SharedLlm::new(llm)));
322 Ok(())
323 }
324
325 pub async fn clone_session(&self) -> Result<String> {
334 self.fire_session_before_switch("clone", None).await?;
335 let Some(store) = self.factory.session_store.as_ref() else {
336 return Err(AppError::Config("clone requires a session store".into()));
337 };
338 let source_id = self.session.load().session_id().to_string();
339 let new_id = crate::session::SessionId::new().into_string();
340 let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
341 catalog
342 .fork(&source_id, &new_id)
343 .await
344 .map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
345 self.load_session_without_hook(&new_id).await?;
346 Ok(new_id)
347 }
348
349 pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
364 self.fire_session_before_switch("model_switch", None)
365 .await?;
366 let current_id = self.session.load().session_id().to_string();
367 let (session, llm) = self
368 .factory
369 .build(SessionMode::Resume(current_id), Some(model))
370 .await?;
371 self.factory.set_current_model(model.clone());
372 self.session.store(Arc::new(session));
373 self.llm.store(Arc::new(SharedLlm::new(llm)));
374 Ok(())
375 }
376
377 pub async fn disconnect_mcp(&self) {
380 for (name, server) in &self.mcp_servers {
381 let _ =
382 tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
383 tracing::debug!(target: "mcp", server = %name, "disconnected");
384 }
385 }
386
387 fn run_turn(
388 &self,
389 msg: crate::user_message::UserMessage,
390 fork_from: Option<motosan_agent_loop::EntryId>,
391 ) -> impl Stream<Item = UiEvent> + Send + 'static {
392 let session = self.session.load_full();
393 let skills = Arc::clone(&self.skills);
394 let cancel_token = self.cancel_token.clone();
395 let tracker = Arc::clone(&self.next_tool_id);
396 let progress = Arc::clone(&self.progress_rx);
397
398 async_stream::stream! {
399 let new_user = {
403 let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
407 let expanded_msg = crate::user_message::UserMessage {
408 text: expanded_text,
409 attachments: msg.attachments.clone(),
410 };
411 match crate::user_message::prepare_user_message(&expanded_msg) {
412 Ok(m) => m,
413 Err(err) => {
414 yield UiEvent::AttachmentError {
415 kind: err.kind(),
416 message: err.to_string(),
417 };
418 return;
419 }
420 }
421 };
422
423 let mut progress_guard = match progress.try_lock() {
425 Ok(guard) => guard,
426 Err(_) => {
427 yield UiEvent::Error(
428 "another turn is already running; capo is single-turn-per-App".into(),
429 );
430 return;
431 }
432 };
433
434 let cancel = cancel_token.reset();
436
437 yield UiEvent::AgentTurnStarted;
438 yield UiEvent::AgentThinking;
439
440 let handle = match fork_from {
442 None => {
443 let history = match session.history().await {
445 Ok(h) => h,
446 Err(err) => {
447 yield UiEvent::Error(format!("session.history failed: {err}"));
448 return;
449 }
450 };
451 let mut messages = history;
452 messages.push(new_user);
453 match session.start_turn(messages).await {
454 Ok(h) => h,
455 Err(err) => {
456 yield UiEvent::Error(format!("session.start_turn failed: {err}"));
457 return;
458 }
459 }
460 }
461 Some(from) => {
462 match session.fork_turn(from, vec![new_user]).await {
464 Ok(h) => h,
465 Err(err) => {
466 yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
467 return;
468 }
469 }
470 }
471 };
472 let previous_len = handle.previous_len;
473 let epoch = handle.epoch;
474 let branch_parent = handle.branch_parent;
475 let ops_tx = handle.ops_tx.clone();
476 let mut agent_stream = handle.stream;
477
478 let interrupt_bridge = tokio::spawn(async move {
486 cancel.cancelled().await;
487 let _ = ops_tx.send(AgentOp::Interrupt).await;
488 });
489
490 let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
492 let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
493
494 loop {
495 while let Ok(chunk) = progress_guard.try_recv() {
497 yield UiEvent::ToolCallProgress {
498 id: progress_event_id(&tracker),
499 chunk: ProgressChunk::from(chunk),
500 };
501 }
502
503 tokio::select! {
504 biased;
505 maybe_item = agent_stream.next() => {
506 match maybe_item {
507 Some(AgentStreamItem::Event(ev)) => {
508 if let Some(ui) = map_event(ev, &tracker) {
509 yield ui;
510 }
511 }
512 Some(AgentStreamItem::Terminal(term)) => {
513 terminal_result = Some(term.result);
514 terminal_messages = Some(term.messages);
515 break;
516 }
517 None => break,
518 }
519 }
520 Some(chunk) = progress_guard.recv() => {
521 yield UiEvent::ToolCallProgress {
522 id: progress_event_id(&tracker),
523 chunk: ProgressChunk::from(chunk),
524 };
525 }
526 }
527 }
528
529 interrupt_bridge.abort();
531
532 if let Some(msgs) = terminal_messages.as_ref() {
534 if let Err(err) = session
535 .record_turn_outcome(epoch, previous_len, msgs, branch_parent)
536 .await
537 {
538 yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
539 }
540 }
541
542 match terminal_result {
544 Some(Ok(_)) => {
545 let final_text = terminal_messages
546 .as_ref()
547 .and_then(|msgs| {
548 msgs.iter()
549 .rev()
550 .find(|m| m.role() == motosan_agent_loop::Role::Assistant)
551 .map(|m| m.text())
552 })
553 .unwrap_or_default();
554 if !final_text.is_empty() {
555 yield UiEvent::AgentMessageComplete(final_text);
556 }
557 while let Ok(chunk) = progress_guard.try_recv() {
559 yield UiEvent::ToolCallProgress {
560 id: progress_event_id(&tracker),
561 chunk: ProgressChunk::from(chunk),
562 };
563 }
564 yield UiEvent::AgentTurnComplete;
565 }
566 Some(Err(err)) => {
567 yield UiEvent::Error(format!("{err}"));
568 }
569 None => { }
570 }
571 }
572 }
573
574 pub fn send_user_message(
575 &self,
576 msg: crate::user_message::UserMessage,
577 ) -> impl Stream<Item = UiEvent> + Send + 'static {
578 self.run_turn(msg, None)
579 }
580
581 pub fn fork_from(
586 &self,
587 from: motosan_agent_loop::EntryId,
588 message: crate::user_message::UserMessage,
589 ) -> impl Stream<Item = UiEvent> + Send + 'static {
590 let registry = Arc::clone(&self.extension_registry);
591 let inner = self.run_turn(message, Some(from));
592 async_stream::stream! {
593 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
594 match dispatch_session_before_switch(®istry, "fork", None).await {
595 HookOutcome::Continue => {}
596 HookOutcome::Cancelled { extension_name, reason } => {
597 let msg = match reason {
598 Some(r) => format!("extension `{extension_name}` cancelled fork: {r}"),
599 None => format!("extension `{extension_name}` cancelled fork"),
600 };
601 yield UiEvent::Error(msg);
602 return;
603 }
604 }
605 let mut inner = Box::pin(inner);
606 while let Some(ev) = futures::StreamExt::next(&mut inner).await {
607 yield ev;
608 }
609 }
610 }
611
612 pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
616 let entries = self
617 .session
618 .load_full()
619 .entries()
620 .await
621 .map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
622 let branch = motosan_agent_loop::active_branch(&entries);
623 let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
624 .iter()
625 .filter_map(|stored| {
626 let msg = stored.entry.as_message()?;
627 if !matches!(msg.role(), motosan_agent_loop::Role::User) {
628 return None;
629 }
630 let preview: String = msg
631 .text()
632 .lines()
633 .next()
634 .unwrap_or("")
635 .chars()
636 .take(80)
637 .collect();
638 Some((stored.id.clone(), preview))
639 })
640 .collect();
641 out.reverse();
642 Ok(out)
643 }
644
645 pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
647 self.session
648 .load_full()
649 .branches()
650 .await
651 .map_err(|e| AppError::Config(format!("branches failed: {e}")))
652 }
653
654 async fn fire_session_before_switch(
657 &self,
658 reason: &str,
659 session_id: Option<&str>,
660 ) -> Result<()> {
661 use crate::extensions::dispatcher::{dispatch_session_before_switch, HookOutcome};
662 match dispatch_session_before_switch(&self.extension_registry, reason, session_id).await {
663 HookOutcome::Continue => Ok(()),
664 HookOutcome::Cancelled {
665 extension_name,
666 reason,
667 } => Err(AppError::HookCancelled {
668 extension_name,
669 reason,
670 }),
671 }
672 }
673}
674
675#[derive(Debug, Default)]
676struct ToolCallTracker {
677 next_id: usize,
678 pending: VecDeque<(String, String)>,
679}
680
681impl ToolCallTracker {
682 fn start(&mut self, name: &str) -> String {
683 self.next_id += 1;
684 let id = format!("tool_{}", self.next_id);
685 self.pending.push_back((name.to_string(), id.clone()));
686 id
687 }
688
689 fn complete(&mut self, name: &str) -> String {
690 if let Some(pos) = self
691 .pending
692 .iter()
693 .position(|(pending_name, _)| pending_name == name)
694 {
695 if let Some((_, id)) = self.pending.remove(pos) {
696 return id;
697 }
698 }
699
700 self.next_id += 1;
701 format!("tool_{}", self.next_id)
702 }
703
704 fn progress_id(&self) -> Option<String> {
709 match self.pending.len() {
710 1 => self.pending.front().map(|(_, id)| id.clone()),
711 _ => None,
712 }
713 }
714}
715
716fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
717 match tracker.lock() {
718 Ok(guard) => guard,
719 Err(poisoned) => poisoned.into_inner(),
720 }
721}
722
723fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
724 lock_tool_tracker(tracker)
725 .progress_id()
726 .unwrap_or_else(|| "tool_unknown".to_string())
727}
728
729fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
730where
731 F: Fn(&str) -> Option<String>,
732{
733 env_lookup("ANTHROPIC_API_KEY")
734 .map(|key| key.trim().to_string())
735 .filter(|key| !key.is_empty())
736 .or_else(|| auth.api_key("anthropic").map(str::to_string))
737}
738
739fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
740 match ev {
741 AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
742 AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
743 let id = lock_tool_tracker(tool_tracker).start(&name);
744 Some(UiEvent::ToolCallStarted {
745 id,
746 name,
747 args: serde_json::json!({}),
748 })
749 }
750 AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
751 let id = lock_tool_tracker(tool_tracker).complete(&name);
752 Some(UiEvent::ToolCallCompleted {
753 id,
754 result: UiToolResult {
755 is_error: result.is_error,
756 text: format!("{name}: {result:?}"),
757 },
758 })
759 }
760 _ => None,
761 }
762}
763
764type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
765
766pub struct AppBuilder {
767 config: Option<Config>,
768 cwd: Option<PathBuf>,
769 permission_gate: Option<Arc<dyn PermissionGate>>,
770 install_builtin_tools: bool,
771 max_iterations: usize,
772 llm_override: Option<Arc<dyn LlmClient>>,
773 custom_tools_factory: Option<CustomToolsFactory>,
774 permissions_policy_path: Option<PathBuf>,
775 ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
776 headless_permissions: bool,
777 settings: Option<crate::settings::Settings>,
778 auth: Option<crate::auth::Auth>,
779 context_discovery_disabled: bool,
780 session_store: Option<Arc<dyn SessionStore>>,
782 resume_session_id: Option<crate::session::SessionId>,
783 autocompact_enabled: bool,
784 skills: Vec<crate::skills::Skill>,
786 extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
788 mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
789 extension_registry: Option<Arc<crate::extensions::ExtensionRegistry>>,
790 extension_diagnostics: Option<Arc<Vec<crate::extensions::ExtensionDiagnostic>>>,
791}
792
793impl Default for AppBuilder {
794 fn default() -> Self {
795 Self {
796 config: None,
797 cwd: None,
798 permission_gate: None,
799 install_builtin_tools: false,
800 max_iterations: 20,
801 llm_override: None,
802 custom_tools_factory: None,
803 permissions_policy_path: None,
804 ui_tx: None,
805 headless_permissions: false,
806 settings: None,
807 auth: None,
808 context_discovery_disabled: false,
809 session_store: None,
810 resume_session_id: None,
811 autocompact_enabled: false,
812 skills: Vec::new(),
813 extra_tools: Vec::new(),
814 mcp_servers: Vec::new(),
815 extension_registry: None,
816 extension_diagnostics: None,
817 }
818 }
819}
820
821impl AppBuilder {
822 pub fn new() -> Self {
823 Self::default()
824 }
825
826 pub fn with_config(mut self, cfg: Config) -> Self {
827 self.config = Some(cfg);
828 self
829 }
830
831 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
832 self.cwd = Some(cwd.into());
833 self
834 }
835
836 pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
837 self.permission_gate = Some(gate);
838 self
839 }
840
841 pub fn with_builtin_tools(mut self) -> Self {
847 self.install_builtin_tools = true;
848 self
849 }
850
851 pub fn with_max_iterations(mut self, n: usize) -> Self {
852 self.max_iterations = n;
853 self
854 }
855
856 pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
857 self.llm_override = Some(llm);
858 self
859 }
860
861 pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
862 self.permissions_policy_path = Some(path);
863 self
864 }
865
866 pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
867 self.ui_tx = Some(tx);
868 self
869 }
870
871 pub fn with_headless_permissions(mut self) -> Self {
876 self.headless_permissions = true;
877 self
878 }
879
880 pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
882 self.settings = Some(settings);
883 self
884 }
885
886 pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
888 self.auth = Some(auth);
889 self
890 }
891
892 pub fn disable_context_discovery(mut self) -> Self {
895 self.context_discovery_disabled = true;
896 self
897 }
898
899 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
902 self.session_store = Some(store);
903 self
904 }
905
906 pub fn with_autocompact(mut self) -> Self {
911 self.autocompact_enabled = true;
912 self
913 }
914
915 pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
921 self.skills = skills;
922 self
923 }
924
925 pub fn without_skills(mut self) -> Self {
926 self.skills.clear();
927 self
928 }
929
930 pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
934 self.extra_tools = tools;
935 self
936 }
937
938 pub fn with_extension_registry(
939 mut self,
940 registry: Arc<crate::extensions::ExtensionRegistry>,
941 ) -> Self {
942 self.extension_registry = Some(registry);
943 self
944 }
945
946 pub fn with_extension_diagnostics(
947 mut self,
948 diagnostics: Arc<Vec<crate::extensions::ExtensionDiagnostic>>,
949 ) -> Self {
950 self.extension_diagnostics = Some(diagnostics);
951 self
952 }
953
954 pub fn with_mcp_servers(
957 mut self,
958 servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
959 ) -> Self {
960 self.mcp_servers = servers;
961 self
962 }
963
964 pub fn with_custom_tools_factory(
969 mut self,
970 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
971 ) -> Self {
972 self.custom_tools_factory = Some(Box::new(factory));
973 self
974 }
975
976 pub async fn build_with_custom_tools(
980 self,
981 factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
982 ) -> Result<App> {
983 self.with_custom_tools_factory(factory).build().await
984 }
985
986 pub async fn build_with_session(
994 mut self,
995 resume: Option<crate::session::SessionId>,
996 ) -> Result<App> {
997 if let Some(id) = resume {
998 if self.session_store.is_none() {
999 return Err(AppError::Config(
1000 "build_with_session(Some(id)) requires with_session_store(...)".into(),
1001 ));
1002 }
1003 self.resume_session_id = Some(id);
1004 }
1005 self.build_internal().await
1006 }
1007
1008 pub async fn build(self) -> Result<App> {
1010 self.build_with_session(None).await
1011 }
1012
1013 async fn build_internal(mut self) -> Result<App> {
1014 let mcp_servers = std::mem::take(&mut self.mcp_servers);
1015 let extra_tools = std::mem::take(&mut self.extra_tools);
1016 let skills = self.skills.clone();
1017 if self.install_builtin_tools && self.custom_tools_factory.is_some() {
1018 return Err(AppError::Config(
1019 "with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
1020 ));
1021 }
1022
1023 let has_config = self.config.is_some();
1027 let has_auth = self.auth.is_some();
1028 let mut config = self.config.unwrap_or_default();
1029 let settings = match self.settings {
1030 Some(settings) => settings,
1031 None => {
1032 let mut settings = crate::settings::Settings::default();
1033 settings.model.provider = config.model.provider.clone();
1034 settings.model.name = config.model.name.clone();
1035 settings.model.max_tokens = config.model.max_tokens;
1036 settings
1037 }
1038 };
1039 config.model.provider = settings.model.provider.clone();
1040 config.model.name = settings.model.name.clone();
1041 config.model.max_tokens = settings.model.max_tokens;
1042 let mut auth = self.auth.unwrap_or_default();
1043 if !has_auth {
1044 if let Some(key) = config.anthropic.api_key.as_deref() {
1045 auth.0.insert(
1046 "anthropic".into(),
1047 crate::auth::ProviderAuth::ApiKey {
1048 key: key.to_string(),
1049 },
1050 );
1051 }
1052 }
1053 let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
1054 if env_or_auth_key.is_some() || has_auth || !has_config {
1055 config.anthropic.api_key = env_or_auth_key;
1056 }
1057 let cwd = self
1058 .cwd
1059 .or_else(|| std::env::current_dir().ok())
1060 .unwrap_or_else(|| PathBuf::from("."));
1061 let agent_dir = crate::paths::agent_dir();
1062 let permission_gate = self.permission_gate.unwrap_or_else(|| {
1063 if self.ui_tx.is_some() || self.headless_permissions {
1067 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1068 } else {
1069 tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
1070 Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
1071 }
1072 });
1073
1074 let policy: Arc<crate::permissions::Policy> =
1076 Arc::new(match self.permissions_policy_path.as_ref() {
1077 Some(path) => crate::permissions::Policy::load_or_default(path)?,
1078 None => crate::permissions::Policy::default(),
1079 });
1080 let session_cache = Arc::new(crate::permissions::SessionCache::new());
1081
1082 let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
1084
1085 let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
1090 let cancel_token = probe_ctx.cancel_token.clone();
1091 let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
1092 if let Some(factory_fn) = self.custom_tools_factory.take() {
1093 let mut t = factory_fn(probe_ctx);
1094 t.extend(extra_tools.clone());
1095 (false, t)
1096 } else {
1097 (self.install_builtin_tools, extra_tools.clone())
1098 };
1099
1100 let factory = SessionFactory {
1101 cwd: cwd.clone(),
1102 settings: settings.clone(),
1103 auth: auth.clone(),
1104 policy: Arc::clone(&policy),
1105 session_cache: Arc::clone(&session_cache),
1106 ui_tx: self.ui_tx.clone(),
1107 headless_permissions: self.headless_permissions,
1108 permission_gate: Arc::clone(&permission_gate),
1109 progress_tx: progress_tx.clone(),
1110 skills: Arc::new(skills.clone()),
1111 install_builtin_tools: install_builtin,
1112 extra_tools: factory_extra_tools,
1113 max_iterations: self.max_iterations,
1114 context_discovery_disabled: self.context_discovery_disabled,
1115 autocompact_enabled: self.autocompact_enabled,
1116 session_store: self.session_store.clone(),
1117 llm_override: self.llm_override.clone(),
1118 current_model: Arc::new(Mutex::new(None)),
1119 cancel_token: cancel_token.clone(),
1120 };
1121
1122 let mode = match self.resume_session_id.take() {
1123 Some(id) => SessionMode::Resume(id.into_string()),
1124 None => SessionMode::New,
1125 };
1126 let (session, llm) = factory.build(mode, None).await?;
1127 let (extension_registry, extension_diagnostics) =
1128 if let Some(reg) = self.extension_registry.take() {
1129 let diagnostics = self
1130 .extension_diagnostics
1131 .unwrap_or_else(|| Arc::new(Vec::new()));
1132 (reg, diagnostics)
1133 } else {
1134 let manifest_path = crate::paths::extensions_manifest_path(&agent_dir);
1135 let (registry, diagnostics) =
1136 crate::extensions::load_extensions_manifest(&manifest_path).await;
1137 for d in &diagnostics {
1138 let message = d.message.as_str();
1139 match d.severity {
1140 crate::extensions::DiagnosticSeverity::Warn => {
1141 tracing::warn!("extensions: {message}");
1142 }
1143 crate::extensions::DiagnosticSeverity::Error => {
1144 tracing::error!("extensions: {message}");
1145 }
1146 }
1147 }
1148 (Arc::new(registry), Arc::new(diagnostics))
1149 };
1150
1151 Ok(App {
1152 session: arc_swap::ArcSwap::from_pointee(session),
1153 llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
1154 factory,
1155 config,
1156 cancel_token,
1157 progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
1158 next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
1159 skills: Arc::new(skills),
1160 mcp_servers,
1161 extension_registry,
1162 extension_diagnostics,
1163 session_cache,
1164 })
1165 }
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170 use super::*;
1171 use crate::config::{AnthropicConfig, ModelConfig};
1172 use crate::events::UiEvent;
1173 use crate::user_message::UserMessage;
1174 use async_trait::async_trait;
1175 use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
1176 use motosan_agent_tool::ToolDef;
1177 use std::sync::atomic::{AtomicUsize, Ordering};
1178
1179 #[tokio::test]
1180 async fn builder_fails_without_api_key() {
1181 let cfg = Config {
1182 anthropic: AnthropicConfig {
1183 api_key: None,
1184 base_url: "https://api.anthropic.com".into(),
1185 },
1186 model: ModelConfig {
1187 provider: "anthropic".into(),
1188 name: "claude-sonnet-4-6".into(),
1189 max_tokens: 4096,
1190 },
1191 };
1192 let err = match AppBuilder::new()
1193 .with_config(cfg)
1194 .with_builtin_tools()
1195 .build()
1196 .await
1197 {
1198 Ok(_) => panic!("must fail without key"),
1199 Err(err) => err,
1200 };
1201 assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
1202 }
1203
1204 struct ToolOnlyLlm {
1205 turn: AtomicUsize,
1206 }
1207
1208 #[async_trait]
1209 impl LlmClient for ToolOnlyLlm {
1210 async fn chat(
1211 &self,
1212 _messages: &[Message],
1213 _tools: &[ToolDef],
1214 ) -> motosan_agent_loop::Result<ChatOutput> {
1215 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1216 if turn == 0 {
1217 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1218 ToolCallItem {
1219 id: "t1".into(),
1220 name: "read".into(),
1221 args: serde_json::json!({"path":"nope.txt"}),
1222 },
1223 ])))
1224 } else {
1225 Ok(ChatOutput::new(LlmResponse::Message(String::new())))
1226 }
1227 }
1228 }
1229
1230 #[tokio::test]
1231 async fn empty_final_message_is_not_emitted() {
1232 let dir = tempfile::tempdir().unwrap();
1233 let mut cfg = Config::default();
1234 cfg.anthropic.api_key = Some("sk-unused".into());
1235 let app = AppBuilder::new()
1236 .with_config(cfg)
1237 .with_cwd(dir.path())
1238 .with_builtin_tools()
1239 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1240 turn: AtomicUsize::new(0),
1241 }))
1242 .build()
1243 .await
1244 .expect("build");
1245 let events: Vec<UiEvent> =
1246 futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
1247 let empties = events
1248 .iter()
1249 .filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
1250 .count();
1251 assert_eq!(
1252 empties, 0,
1253 "should not emit empty final message, got: {events:?}"
1254 );
1255 }
1256
1257 struct EchoLlm;
1258
1259 #[async_trait]
1260 impl LlmClient for EchoLlm {
1261 async fn chat(
1262 &self,
1263 _messages: &[Message],
1264 _tools: &[ToolDef],
1265 ) -> motosan_agent_loop::Result<ChatOutput> {
1266 Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
1267 }
1268 }
1269
1270 #[tokio::test]
1271 async fn with_headless_permissions_builds_an_app() {
1272 let dir = tempfile::tempdir().expect("tempdir");
1273 let mut config = Config::default();
1274 config.anthropic.api_key = Some("sk-unused".into());
1275 let app = AppBuilder::new()
1276 .with_config(config)
1277 .with_cwd(dir.path())
1278 .with_builtin_tools()
1279 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1280 .with_headless_permissions()
1281 .build()
1282 .await
1283 .expect("build");
1284 assert!(!app.session_id().is_empty());
1286 }
1287
1288 #[tokio::test]
1289 async fn new_session_swaps_in_a_fresh_empty_session() {
1290 use futures::StreamExt;
1291 let dir = tempfile::tempdir().expect("tempdir");
1292 let mut config = Config::default();
1293 config.anthropic.api_key = Some("sk-unused".into());
1294 let app = AppBuilder::new()
1295 .with_config(config)
1296 .with_cwd(dir.path())
1297 .with_builtin_tools()
1298 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1299 .build()
1300 .await
1301 .expect("build");
1302
1303 let _: Vec<_> = app
1304 .send_user_message(UserMessage::text("hello"))
1305 .collect()
1306 .await;
1307 let id_before = app.session_id();
1308 assert!(!app.session_history().await.expect("history").is_empty());
1309
1310 app.new_session().await.expect("new_session");
1311
1312 assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
1313 assert!(
1314 app.session_history().await.expect("history").is_empty(),
1315 "fresh session has no history"
1316 );
1317 }
1318
1319 #[tokio::test]
1320 async fn load_session_restores_a_stored_session_by_id() {
1321 use futures::StreamExt;
1322 let dir = tempfile::tempdir().expect("tempdir");
1323 let store_dir = dir.path().join("sessions");
1324 let store: Arc<dyn SessionStore> =
1325 Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
1326 let mut config = Config::default();
1327 config.anthropic.api_key = Some("sk-unused".into());
1328 let app = AppBuilder::new()
1329 .with_config(config)
1330 .with_cwd(dir.path())
1331 .with_builtin_tools()
1332 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1333 .with_session_store(Arc::clone(&store))
1334 .build()
1335 .await
1336 .expect("build");
1337
1338 let _: Vec<_> = app
1339 .send_user_message(UserMessage::text("remember this"))
1340 .collect()
1341 .await;
1342 let original_id = app.session_id();
1343
1344 app.new_session().await.expect("new_session");
1345 assert_ne!(app.session_id(), original_id);
1346
1347 app.load_session(&original_id).await.expect("load_session");
1348 assert_eq!(app.session_id(), original_id);
1349 let history = app.session_history().await.expect("history");
1350 assert!(
1351 history.iter().any(|m| m.text().contains("remember this")),
1352 "loaded session should carry the original turn"
1353 );
1354 }
1355
1356 #[tokio::test]
1357 async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
1358 use futures::StreamExt;
1359 let dir = tempfile::tempdir().expect("tempdir");
1360 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1361 dir.path().join("s"),
1362 ));
1363 let mut config = Config::default();
1364 config.anthropic.api_key = Some("sk-unused".into());
1365 let app = AppBuilder::new()
1366 .with_config(config)
1367 .with_cwd(dir.path())
1368 .with_builtin_tools()
1369 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1370 .with_session_store(store)
1371 .build()
1372 .await
1373 .expect("build");
1374
1375 let _: Vec<_> = app
1376 .send_user_message(UserMessage::text("hello"))
1377 .collect()
1378 .await;
1379 let original_id = app.session_id();
1380
1381 let new_id = app.clone_session().await.expect("clone_session");
1382
1383 assert_ne!(new_id, original_id);
1385 assert_eq!(app.session_id(), new_id);
1386 let history = app.session_history().await.expect("history");
1388 assert!(history.iter().any(|m| m.text().contains("hello")));
1389 }
1390
1391 #[tokio::test]
1392 async fn fork_from_creates_a_branch_off_an_earlier_entry() {
1393 use futures::StreamExt;
1394 let dir = tempfile::tempdir().expect("tempdir");
1395 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1396 dir.path().join("s"),
1397 ));
1398 let mut config = Config::default();
1399 config.anthropic.api_key = Some("sk-unused".into());
1400 let app = AppBuilder::new()
1401 .with_config(config)
1402 .with_cwd(dir.path())
1403 .with_builtin_tools()
1404 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1405 .with_session_store(store)
1406 .build()
1407 .await
1408 .expect("build");
1409
1410 let _: Vec<_> = app
1412 .send_user_message(UserMessage::text("first"))
1413 .collect()
1414 .await;
1415 let _: Vec<_> = app
1416 .send_user_message(UserMessage::text("second"))
1417 .collect()
1418 .await;
1419
1420 let entries = app.session.load_full().entries().await.expect("entries");
1422 let first_id = entries
1423 .iter()
1424 .find_map(|stored| {
1425 let msg = stored.entry.as_message()?;
1426 (msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
1427 .then(|| stored.id.clone())
1428 })
1429 .expect("first user message present");
1430
1431 let _: Vec<_> = app
1433 .fork_from(first_id, UserMessage::text("branched"))
1434 .collect()
1435 .await;
1436
1437 let history = app.session_history().await.expect("history");
1438 let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
1439 assert!(
1440 texts.iter().any(|t| t.contains("first")),
1441 "fork keeps the fork-point ancestor"
1442 );
1443 assert!(
1444 texts.iter().any(|t| t.contains("branched")),
1445 "fork includes the new message"
1446 );
1447 assert!(
1448 !texts.iter().any(|t| t.contains("second")),
1449 "fork excludes the abandoned branch"
1450 );
1451 }
1452
1453 #[tokio::test]
1454 async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
1455 use futures::StreamExt;
1456 let dir = tempfile::tempdir().expect("tempdir");
1457 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1458 dir.path().join("s"),
1459 ));
1460 let mut config = Config::default();
1461 config.anthropic.api_key = Some("sk-unused".into());
1462 let app = AppBuilder::new()
1463 .with_config(config)
1464 .with_cwd(dir.path())
1465 .with_builtin_tools()
1466 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1467 .with_session_store(store)
1468 .build()
1469 .await
1470 .expect("build");
1471
1472 let _: Vec<_> = app
1473 .send_user_message(UserMessage::text("alpha"))
1474 .collect()
1475 .await;
1476 let _: Vec<_> = app
1477 .send_user_message(UserMessage::text("bravo"))
1478 .collect()
1479 .await;
1480
1481 let candidates = app.fork_candidates().await.expect("candidates");
1482 let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
1483 assert!(previews[0].contains("bravo"), "got {previews:?}");
1485 assert!(previews.iter().any(|p| p.contains("alpha")));
1486 assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
1488 }
1489
1490 #[tokio::test]
1491 async fn branches_returns_a_tree_for_a_linear_session() {
1492 use futures::StreamExt;
1493 let dir = tempfile::tempdir().expect("tempdir");
1494 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1495 dir.path().join("s"),
1496 ));
1497 let mut config = Config::default();
1498 config.anthropic.api_key = Some("sk-unused".into());
1499 let app = AppBuilder::new()
1500 .with_config(config)
1501 .with_cwd(dir.path())
1502 .with_builtin_tools()
1503 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1504 .with_session_store(store)
1505 .build()
1506 .await
1507 .expect("build");
1508
1509 let _: Vec<_> = app
1510 .send_user_message(UserMessage::text("hello"))
1511 .collect()
1512 .await;
1513 let tree = app.branches().await.expect("branches");
1514 assert!(!tree.nodes.is_empty());
1516 assert!(tree.active_leaf.is_some());
1517 }
1518
1519 #[tokio::test]
1520 async fn switch_model_preserves_history() {
1521 use futures::StreamExt;
1522 let dir = tempfile::tempdir().expect("tempdir");
1523 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1524 dir.path().join("s"),
1525 ));
1526 let mut config = Config::default();
1527 config.anthropic.api_key = Some("sk-unused".into());
1528 let app = AppBuilder::new()
1529 .with_config(config)
1530 .with_cwd(dir.path())
1531 .with_builtin_tools()
1532 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1533 .with_session_store(store)
1534 .build()
1535 .await
1536 .expect("build");
1537
1538 let _: Vec<_> = app
1539 .send_user_message(UserMessage::text("keep me"))
1540 .collect()
1541 .await;
1542 let id_before = app.session_id();
1543
1544 app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
1545 .await
1546 .expect("switch_model");
1547
1548 assert_eq!(
1549 app.session_id(),
1550 id_before,
1551 "switch_model keeps the same session"
1552 );
1553 let history = app.session_history().await.expect("history");
1554 assert!(history.iter().any(|m| m.text().contains("keep me")));
1555 }
1556
1557 #[tokio::test]
1558 async fn switch_model_is_sticky_for_future_session_rebuilds() {
1559 let dir = tempfile::tempdir().expect("tempdir");
1560 let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
1561 dir.path().join("s"),
1562 ));
1563 let mut config = Config::default();
1564 config.anthropic.api_key = Some("sk-unused".into());
1565 let app = AppBuilder::new()
1566 .with_config(config)
1567 .with_cwd(dir.path())
1568 .with_builtin_tools()
1569 .with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
1570 .with_session_store(store)
1571 .build()
1572 .await
1573 .expect("build");
1574
1575 let selected = crate::model::ModelId::from("claude-opus-4-7");
1576 app.switch_model(&selected).await.expect("switch_model");
1577 app.new_session().await.expect("new_session");
1578
1579 assert_eq!(app.factory.current_model(), Some(selected));
1580 }
1581
1582 struct SleepThenDoneLlm {
1583 turn: AtomicUsize,
1584 }
1585
1586 #[async_trait]
1587 impl LlmClient for SleepThenDoneLlm {
1588 async fn chat(
1589 &self,
1590 _messages: &[Message],
1591 _tools: &[ToolDef],
1592 ) -> motosan_agent_loop::Result<ChatOutput> {
1593 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
1594 if turn == 0 {
1595 Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
1596 ToolCallItem {
1597 id: "sleep".into(),
1598 name: "bash".into(),
1599 args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
1600 },
1601 ])))
1602 } else {
1603 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1604 }
1605 }
1606 }
1607
1608 #[tokio::test]
1609 async fn cancel_reaches_builtin_tools_after_session_rebuild() {
1610 use futures::StreamExt;
1611 let dir = tempfile::tempdir().expect("tempdir");
1612 let mut config = Config::default();
1613 config.anthropic.api_key = Some("sk-unused".into());
1614 let app = Arc::new(
1615 AppBuilder::new()
1616 .with_config(config)
1617 .with_cwd(dir.path())
1618 .with_builtin_tools()
1619 .with_llm(Arc::new(SleepThenDoneLlm {
1620 turn: AtomicUsize::new(0),
1621 }) as Arc<dyn LlmClient>)
1622 .build()
1623 .await
1624 .expect("build"),
1625 );
1626
1627 app.new_session().await.expect("new_session");
1628 let running_app = Arc::clone(&app);
1629 let handle = tokio::spawn(async move {
1630 running_app
1631 .send_user_message(UserMessage::text("run a slow command"))
1632 .collect::<Vec<_>>()
1633 .await
1634 });
1635 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1636 app.cancel();
1637
1638 let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
1639 .await
1640 .expect("turn should finish after cancellation")
1641 .expect("join");
1642 assert!(
1643 events.iter().any(|event| {
1644 matches!(
1645 event,
1646 UiEvent::ToolCallCompleted { result, .. }
1647 if result.text.contains("command cancelled by user")
1648 )
1649 }),
1650 "cancel should reach the rebuilt bash tool: {events:?}"
1651 );
1652 }
1653
1654 #[tokio::test]
1655 async fn compact_summarizes_a_session_with_enough_history() {
1656 struct DoneLlm;
1657 #[async_trait]
1658 impl LlmClient for DoneLlm {
1659 async fn chat(
1660 &self,
1661 _messages: &[Message],
1662 _tools: &[ToolDef],
1663 ) -> motosan_agent_loop::Result<ChatOutput> {
1664 Ok(ChatOutput::new(LlmResponse::Message("done".into())))
1665 }
1666 }
1667
1668 let dir = tempfile::tempdir().expect("tempdir");
1669 let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
1670 dir.path().join("sessions"),
1671 ));
1672 let mut config = Config::default();
1673 config.anthropic.api_key = Some("sk-unused".into());
1674 let app = AppBuilder::new()
1675 .with_config(config)
1676 .with_cwd(dir.path())
1677 .with_builtin_tools()
1678 .with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
1679 .with_session_store(store)
1680 .build()
1681 .await
1682 .expect("build");
1683
1684 for i in 0..4 {
1691 let _: Vec<_> = app
1692 .send_user_message(UserMessage::text(format!("turn {i}")))
1693 .collect()
1694 .await;
1695 }
1696
1697 app.compact().await.expect("compact should succeed");
1698
1699 let history = app.session_history().await.expect("history");
1702 assert!(
1703 !history.is_empty(),
1704 "session should still have content post-compaction"
1705 );
1706 }
1707
1708 #[test]
1709 fn anthropic_env_api_key_overrides_auth_json_key() {
1710 let mut auth = crate::auth::Auth::default();
1711 auth.0.insert(
1712 "anthropic".into(),
1713 crate::auth::ProviderAuth::ApiKey {
1714 key: "sk-auth".into(),
1715 },
1716 );
1717
1718 let key = anthropic_api_key_from(&auth, |name| {
1719 (name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
1720 });
1721 assert_eq!(key.as_deref(), Some("sk-env"));
1722 }
1723
1724 #[tokio::test]
1725 async fn with_settings_overrides_deprecated_config_model() {
1726 use crate::settings::Settings;
1727
1728 let mut config = Config::default();
1729 config.model.name = "from-config".into();
1730 config.anthropic.api_key = Some("sk-config".into());
1731
1732 let mut settings = Settings::default();
1733 settings.model.name = "from-settings".into();
1734
1735 let tmp = tempfile::tempdir().unwrap();
1736 let app = AppBuilder::new()
1737 .with_config(config)
1738 .with_settings(settings)
1739 .with_cwd(tmp.path())
1740 .disable_context_discovery()
1741 .with_llm(Arc::new(EchoLlm))
1742 .build()
1743 .await
1744 .expect("build");
1745 assert_eq!(app.config().model.name, "from-settings");
1746 assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
1747 }
1748
1749 #[tokio::test]
1750 async fn with_settings_synthesises_legacy_config_for_build() {
1751 use crate::auth::{Auth, ProviderAuth};
1752 use crate::settings::Settings;
1753
1754 let mut settings = Settings::default();
1755 settings.model.name = "claude-sonnet-4-6".into();
1756
1757 let mut auth = Auth::default();
1758 auth.0.insert(
1759 "anthropic".into(),
1760 ProviderAuth::ApiKey {
1761 key: "sk-test".into(),
1762 },
1763 );
1764
1765 let tmp = tempfile::tempdir().unwrap();
1766 let app = AppBuilder::new()
1767 .with_settings(settings)
1768 .with_auth(auth)
1769 .with_cwd(tmp.path())
1770 .with_builtin_tools()
1771 .disable_context_discovery()
1772 .with_llm(Arc::new(EchoLlm))
1773 .build()
1774 .await
1775 .expect("build");
1776 let _ = app;
1777 }
1778
1779 #[tokio::test]
1780 async fn cancel_before_turn_does_not_poison_future_turns() {
1781 let dir = tempfile::tempdir().unwrap();
1782 let mut cfg = Config::default();
1783 cfg.anthropic.api_key = Some("sk-unused".into());
1784 let app = AppBuilder::new()
1785 .with_config(cfg)
1786 .with_cwd(dir.path())
1787 .with_builtin_tools()
1788 .with_llm(std::sync::Arc::new(EchoLlm))
1789 .build()
1790 .await
1791 .expect("build");
1792
1793 app.cancel();
1794 let events: Vec<UiEvent> = app
1795 .send_user_message(UserMessage::text("x"))
1796 .collect()
1797 .await;
1798
1799 assert!(
1800 events
1801 .iter()
1802 .any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
1803 "turn should use a fresh cancellation token: {events:?}"
1804 );
1805 }
1806
1807 #[test]
1808 fn map_event_matches_started_and_completed_ids_by_tool_name() {
1809 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1810
1811 let started_bash = map_event(
1812 AgentEvent::Core(CoreEvent::ToolStarted {
1813 name: "bash".into(),
1814 }),
1815 &tracker,
1816 );
1817 let started_read = map_event(
1818 AgentEvent::Core(CoreEvent::ToolStarted {
1819 name: "read".into(),
1820 }),
1821 &tracker,
1822 );
1823 let completed_bash = map_event(
1824 AgentEvent::Core(CoreEvent::ToolCompleted {
1825 name: "bash".into(),
1826 result: motosan_agent_tool::ToolResult::text("ok"),
1827 }),
1828 &tracker,
1829 );
1830 let completed_read = map_event(
1831 AgentEvent::Core(CoreEvent::ToolCompleted {
1832 name: "read".into(),
1833 result: motosan_agent_tool::ToolResult::text("ok"),
1834 }),
1835 &tracker,
1836 );
1837
1838 assert!(matches!(
1839 started_bash,
1840 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
1841 ));
1842 assert!(matches!(
1843 started_read,
1844 Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
1845 ));
1846 assert!(matches!(
1847 completed_bash,
1848 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
1849 ));
1850 assert!(matches!(
1851 completed_read,
1852 Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
1853 ));
1854 }
1855
1856 #[test]
1857 fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
1858 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1859 let s1 = map_event(
1860 AgentEvent::Core(CoreEvent::ToolStarted {
1861 name: "bash".into(),
1862 }),
1863 &tracker,
1864 );
1865 let s2 = map_event(
1866 AgentEvent::Core(CoreEvent::ToolStarted {
1867 name: "bash".into(),
1868 }),
1869 &tracker,
1870 );
1871 let c1 = map_event(
1872 AgentEvent::Core(CoreEvent::ToolCompleted {
1873 name: "bash".into(),
1874 result: motosan_agent_tool::ToolResult::text("a"),
1875 }),
1876 &tracker,
1877 );
1878 let c2 = map_event(
1879 AgentEvent::Core(CoreEvent::ToolCompleted {
1880 name: "bash".into(),
1881 result: motosan_agent_tool::ToolResult::text("b"),
1882 }),
1883 &tracker,
1884 );
1885
1886 let id_s1 = match s1 {
1887 Some(UiEvent::ToolCallStarted { id, .. }) => id,
1888 other => panic!("{other:?}"),
1889 };
1890 let id_s2 = match s2 {
1891 Some(UiEvent::ToolCallStarted { id, .. }) => id,
1892 other => panic!("{other:?}"),
1893 };
1894 let id_c1 = match c1 {
1895 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
1896 other => panic!("{other:?}"),
1897 };
1898 let id_c2 = match c2 {
1899 Some(UiEvent::ToolCallCompleted { id, .. }) => id,
1900 other => panic!("{other:?}"),
1901 };
1902
1903 assert_eq!(id_s1, id_c1);
1904 assert_eq!(id_s2, id_c2);
1905 assert_ne!(id_s1, id_s2);
1906 }
1907
1908 #[tokio::test]
1909 async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
1910 let dir = tempfile::tempdir().unwrap();
1911 let mut cfg = Config::default();
1912 cfg.anthropic.api_key = Some("sk-unused".into());
1913 let app = AppBuilder::new()
1914 .with_config(cfg)
1915 .with_cwd(dir.path())
1916 .with_builtin_tools()
1917 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1918 turn: AtomicUsize::new(0),
1919 }))
1920 .build()
1921 .await
1922 .expect("build");
1923
1924 let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
1925 let first_event = first.next().await;
1926 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
1927
1928 let second_events: Vec<UiEvent> = app
1929 .send_user_message(UserMessage::text("second"))
1930 .collect()
1931 .await;
1932 assert_eq!(
1933 second_events.len(),
1934 1,
1935 "expected immediate single error event, got: {second_events:?}"
1936 );
1937 assert!(matches!(
1938 &second_events[0],
1939 UiEvent::Error(msg) if msg.contains("single-turn-per-App")
1940 ));
1941 }
1942
1943 #[tokio::test]
1944 async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
1945 let dir = tempfile::tempdir().unwrap();
1951 let mut cfg = Config::default();
1952 cfg.anthropic.api_key = Some("sk-unused".into());
1953 let app = AppBuilder::new()
1954 .with_config(cfg)
1955 .with_cwd(dir.path())
1956 .with_builtin_tools()
1957 .with_llm(std::sync::Arc::new(ToolOnlyLlm {
1958 turn: AtomicUsize::new(0),
1959 }))
1960 .build()
1961 .await
1962 .expect("build");
1963
1964 let mut first =
1966 Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
1967 let first_event = first.next().await;
1968 assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
1969
1970 let bad = crate::user_message::UserMessage {
1972 text: "second".into(),
1973 attachments: vec![crate::user_message::Attachment::Image {
1974 path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
1975 }],
1976 };
1977 let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
1978
1979 assert_eq!(
1980 second_events.len(),
1981 1,
1982 "expected exactly one event (the attachment error); got: {second_events:?}"
1983 );
1984 assert!(
1985 matches!(
1986 &second_events[0],
1987 UiEvent::AttachmentError {
1988 kind: crate::user_message::AttachmentErrorKind::NotFound,
1989 ..
1990 }
1991 ),
1992 "expected AttachmentError::NotFound as first event; got {second_events:?}"
1993 );
1994 }
1995
1996 #[test]
1997 fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
1998 let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
1999 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2000
2001 let only = map_event(
2002 AgentEvent::Core(CoreEvent::ToolStarted {
2003 name: "bash".into(),
2004 }),
2005 &tracker,
2006 );
2007 let only_id = match only {
2008 Some(UiEvent::ToolCallStarted { id, .. }) => id,
2009 other => panic!("{other:?}"),
2010 };
2011 assert_eq!(progress_event_id(&tracker), only_id);
2012
2013 let _second = map_event(
2014 AgentEvent::Core(CoreEvent::ToolStarted {
2015 name: "read".into(),
2016 }),
2017 &tracker,
2018 );
2019 assert_eq!(progress_event_id(&tracker), "tool_unknown");
2020 }
2021
2022 #[tokio::test]
2023 async fn builder_rejects_builtin_and_custom_tools_together() {
2024 let mut cfg = Config::default();
2025 cfg.anthropic.api_key = Some("sk-unused".into());
2026 let dir = tempfile::tempdir().unwrap();
2027 let err = match AppBuilder::new()
2028 .with_config(cfg)
2029 .with_cwd(dir.path())
2030 .with_builtin_tools()
2031 .with_custom_tools_factory(|_| Vec::new())
2032 .build()
2033 .await
2034 {
2035 Ok(_) => panic!("must reject conflicting tool configuration"),
2036 Err(err) => err,
2037 };
2038
2039 assert!(format!("{err}").contains("mutually exclusive"));
2040 }
2041
2042 #[tokio::test]
2044 async fn two_turns_in_same_session_share_history() {
2045 #[derive(Default)]
2046 struct CounterLlm {
2047 turn: AtomicUsize,
2048 }
2049 #[async_trait]
2050 impl LlmClient for CounterLlm {
2051 async fn chat(
2052 &self,
2053 messages: &[Message],
2054 _tools: &[ToolDef],
2055 ) -> motosan_agent_loop::Result<ChatOutput> {
2056 let turn = self.turn.fetch_add(1, Ordering::SeqCst);
2057 let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
2058 Ok(ChatOutput::new(LlmResponse::Message(answer)))
2059 }
2060 }
2061
2062 let tmp = tempfile::tempdir().unwrap();
2063 let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
2064 tmp.path().to_path_buf(),
2065 ));
2066
2067 let app = AppBuilder::new()
2068 .with_settings(crate::settings::Settings::default())
2069 .with_auth(crate::auth::Auth::default())
2070 .with_cwd(tmp.path())
2071 .with_builtin_tools()
2072 .disable_context_discovery()
2073 .with_llm(std::sync::Arc::new(CounterLlm::default()))
2074 .with_session_store(store)
2075 .build_with_session(None)
2076 .await
2077 .expect("build");
2078
2079 let _events1: Vec<UiEvent> = app
2080 .send_user_message(UserMessage::text("hi"))
2081 .collect()
2082 .await;
2083 let events2: Vec<UiEvent> = app
2084 .send_user_message(UserMessage::text("again"))
2085 .collect()
2086 .await;
2087
2088 let saw_more_than_one = events2.iter().any(|e| {
2090 matches!(
2091 e,
2092 UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
2093 )
2094 });
2095 assert!(
2096 saw_more_than_one,
2097 "second turn should have seen history; events: {events2:?}"
2098 );
2099 }
2100}
2101
2102#[cfg(test)]
2103mod skills_builder_tests {
2104 use super::*;
2105 use crate::skills::types::{Skill, SkillSource};
2106 use std::path::PathBuf;
2107
2108 fn fixture() -> Skill {
2109 Skill {
2110 name: "x".into(),
2111 description: "d".into(),
2112 file_path: PathBuf::from("/x.md"),
2113 base_dir: PathBuf::from("/"),
2114 disable_model_invocation: false,
2115 source: SkillSource::Global,
2116 }
2117 }
2118
2119 #[test]
2120 fn with_skills_stores_skills() {
2121 let b = AppBuilder::new().with_skills(vec![fixture()]);
2122 assert_eq!(b.skills.len(), 1);
2123 assert_eq!(b.skills[0].name, "x");
2124 }
2125
2126 #[test]
2127 fn without_skills_clears() {
2128 let b = AppBuilder::new()
2129 .with_skills(vec![fixture()])
2130 .without_skills();
2131 assert!(b.skills.is_empty());
2132 }
2133}
2134
2135#[cfg(test)]
2136mod mcp_builder_tests {
2137 use super::*;
2138 use motosan_agent_tool::Tool;
2139
2140 struct FakeTool;
2142 impl Tool for FakeTool {
2143 fn def(&self) -> motosan_agent_tool::ToolDef {
2144 motosan_agent_tool::ToolDef {
2145 name: "fake__echo".into(),
2146 description: "test".into(),
2147 input_schema: serde_json::json!({"type": "object"}),
2148 }
2149 }
2150 fn call(
2151 &self,
2152 _args: serde_json::Value,
2153 _ctx: &motosan_agent_tool::ToolContext,
2154 ) -> std::pin::Pin<
2155 Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
2156 > {
2157 Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
2158 }
2159 }
2160
2161 #[test]
2162 fn with_extra_tools_stores_tools() {
2163 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
2164 let b = AppBuilder::new().with_extra_tools(tools);
2165 assert_eq!(b.extra_tools.len(), 1);
2166 }
2167}