#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, MutexGuard};
use futures::{Stream, StreamExt};
use motosan_agent_loop::{
AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
CoreEvent, Engine, LlmClient, SessionStore,
};
use motosan_agent_tool::ToolContext;
use tokio::sync::mpsc;
use crate::agent::build_system_prompt;
use crate::config::Config;
use crate::error::{AppError, Result};
use crate::events::{ProgressChunk, UiEvent, UiToolResult};
use crate::llm::build_llm_client;
use crate::permissions::{NoOpPermissionGate, PermissionGate};
use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
#[derive(Debug, Clone)]
pub(crate) enum SessionMode {
New,
Resume(String),
}
struct SharedLlm {
client: Arc<dyn LlmClient>,
}
impl SharedLlm {
fn new(client: Arc<dyn LlmClient>) -> Self {
Self { client }
}
fn client(&self) -> Arc<dyn LlmClient> {
Arc::clone(&self.client)
}
}
pub(crate) struct SessionFactory {
cwd: PathBuf,
settings: crate::settings::Settings,
auth: crate::auth::Auth,
policy: Arc<crate::permissions::Policy>,
session_cache: Arc<crate::permissions::SessionCache>,
ui_tx: Option<mpsc::Sender<UiEvent>>,
headless_permissions: bool,
permission_gate: Arc<dyn PermissionGate>,
progress_tx: mpsc::Sender<ToolProgressChunk>,
skills: Arc<Vec<crate::skills::Skill>>,
install_builtin_tools: bool,
extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
max_iterations: usize,
context_discovery_disabled: bool,
autocompact_enabled: bool,
session_store: Option<Arc<dyn SessionStore>>,
llm_override: Option<Arc<dyn LlmClient>>,
current_model: Arc<Mutex<Option<crate::model::ModelId>>>,
cancel_token: SharedCancelToken,
}
impl SessionFactory {
fn current_model(&self) -> Option<crate::model::ModelId> {
match self.current_model.lock() {
Ok(guard) => guard.clone(),
Err(poisoned) => poisoned.into_inner().clone(),
}
}
fn set_current_model(&self, model: crate::model::ModelId) {
match self.current_model.lock() {
Ok(mut guard) => *guard = Some(model),
Err(poisoned) => *poisoned.into_inner() = Some(model),
}
}
async fn build(
&self,
mode: SessionMode,
model_override: Option<&crate::model::ModelId>,
) -> Result<(AgentSession, Arc<dyn LlmClient>)> {
let effective_model = model_override.cloned().or_else(|| self.current_model());
let mut settings = self.settings.clone();
if let Some(m) = &effective_model {
settings.model.name = m.as_str().to_string();
}
let llm = if effective_model.is_none() {
self.llm_override.as_ref().map_or_else(
|| build_llm_client(&settings, &self.auth),
|llm| Ok(Arc::clone(llm)),
)?
} else {
build_llm_client(&settings, &self.auth)?
};
let tool_ctx = ToolCtx::new_with_cancel_token(
&self.cwd,
Arc::clone(&self.permission_gate),
self.progress_tx.clone(),
self.cancel_token.clone(),
);
let mut tools = if self.install_builtin_tools {
builtin_tools(tool_ctx.clone())
} else {
Vec::new()
};
tools.extend(self.extra_tools.iter().cloned());
let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
let base_prompt = build_system_prompt(&tool_names, &self.skills);
let system_prompt = if self.context_discovery_disabled {
base_prompt
} else {
let agent_dir = crate::paths::agent_dir();
let context = crate::context_files::load_project_context_files(&self.cwd, &agent_dir);
crate::context_files::assemble_system_prompt(&base_prompt, &context, &self.cwd)
};
let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&self.cwd);
let mut engine_builder = Engine::builder()
.max_iterations(self.max_iterations)
.system_prompt(system_prompt)
.tool_context(motosan_tool_context);
for tool in tools {
engine_builder = engine_builder.tool(tool);
}
if let Some(ui_tx) = &self.ui_tx {
let ext = crate::permissions::PermissionExtension::new(
Arc::clone(&self.policy),
Arc::clone(&self.session_cache),
self.cwd.clone(),
ui_tx.clone(),
);
engine_builder = engine_builder.extension(Box::new(ext));
} else if self.headless_permissions {
let ext = crate::permissions::PermissionExtension::headless(
Arc::clone(&self.policy),
Arc::clone(&self.session_cache),
self.cwd.clone(),
);
engine_builder = engine_builder.extension(Box::new(ext));
}
if self.autocompact_enabled
&& settings.session.compact_at_context_pct > 0.0
&& settings.session.compact_at_context_pct < 1.0
{
let cfg = AutocompactConfig {
threshold: settings.session.compact_at_context_pct,
max_context_tokens: settings.session.max_context_tokens,
keep_turns: settings.session.keep_turns.max(1),
};
engine_builder = engine_builder
.extension(Box::new(AutocompactExtension::new(cfg, Arc::clone(&llm))));
}
let engine = engine_builder.build();
let session = match (&mode, &self.session_store) {
(SessionMode::Resume(id), Some(store)) => {
let s = AgentSession::resume(id, Arc::clone(store), engine, Arc::clone(&llm))
.await
.map_err(|e| AppError::Config(format!("resume failed: {e}")))?;
let entries = s
.entries()
.await
.map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
s
}
(SessionMode::Resume(_), None) => {
return Err(AppError::Config("resume requires a session store".into()));
}
(SessionMode::New, Some(store)) => {
let id = crate::session::SessionId::new();
AgentSession::new_with_store(
id.into_string(),
Arc::clone(store),
engine,
Arc::clone(&llm),
)
}
(SessionMode::New, None) => AgentSession::new(engine, Arc::clone(&llm)),
};
Ok((session, llm))
}
}
pub struct App {
session: arc_swap::ArcSwap<AgentSession>,
llm: arc_swap::ArcSwap<SharedLlm>,
factory: SessionFactory,
config: Config,
cancel_token: SharedCancelToken,
progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
next_tool_id: Arc<Mutex<ToolCallTracker>>,
skills: Arc<Vec<crate::skills::Skill>>,
mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
}
impl App {
pub fn config(&self) -> &Config {
&self.config
}
pub fn cancel(&self) {
self.cancel_token.cancel();
}
pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
Arc::clone(&self.session_cache)
}
pub fn session_id(&self) -> String {
self.session.load().session_id().to_string()
}
pub async fn session_history(
&self,
) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
self.session.load_full().history().await
}
pub async fn compact(&self) -> Result<()> {
use motosan_agent_loop::ThresholdStrategy;
let strategy = ThresholdStrategy {
threshold: 0.0,
..ThresholdStrategy::default()
};
let llm = self.llm.load_full().client();
self.session
.load_full()
.maybe_compact(&strategy, llm)
.await
.map_err(|e| AppError::Config(format!("compaction failed: {e}")))?;
Ok(())
}
pub async fn new_session(&self) -> Result<()> {
let (session, llm) = self.factory.build(SessionMode::New, None).await?;
self.session.store(Arc::new(session));
self.llm.store(Arc::new(SharedLlm::new(llm)));
Ok(())
}
pub async fn load_session(&self, id: &str) -> Result<()> {
let (session, llm) = self
.factory
.build(SessionMode::Resume(id.to_string()), None)
.await?;
self.session.store(Arc::new(session));
self.llm.store(Arc::new(SharedLlm::new(llm)));
Ok(())
}
pub async fn clone_session(&self) -> Result<String> {
let Some(store) = self.factory.session_store.as_ref() else {
return Err(AppError::Config("clone requires a session store".into()));
};
let source_id = self.session.load().session_id().to_string();
let new_id = crate::session::SessionId::new().into_string();
let catalog = motosan_agent_loop::SessionCatalog::new(Arc::clone(store));
catalog
.fork(&source_id, &new_id)
.await
.map_err(|e| AppError::Config(format!("clone failed: {e}")))?;
self.load_session(&new_id).await?;
Ok(new_id)
}
pub async fn switch_model(&self, model: &crate::model::ModelId) -> Result<()> {
let current_id = self.session.load().session_id().to_string();
let (session, llm) = self
.factory
.build(SessionMode::Resume(current_id), Some(model))
.await?;
self.factory.set_current_model(model.clone());
self.session.store(Arc::new(session));
self.llm.store(Arc::new(SharedLlm::new(llm)));
Ok(())
}
pub async fn disconnect_mcp(&self) {
for (name, server) in &self.mcp_servers {
let _ =
tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
tracing::debug!(target: "mcp", server = %name, "disconnected");
}
}
fn run_turn(
&self,
msg: crate::user_message::UserMessage,
fork_from: Option<motosan_agent_loop::EntryId>,
) -> impl Stream<Item = UiEvent> + Send + 'static {
let session = self.session.load_full();
let skills = Arc::clone(&self.skills);
let cancel_token = self.cancel_token.clone();
let tracker = Arc::clone(&self.next_tool_id);
let progress = Arc::clone(&self.progress_rx);
async_stream::stream! {
let new_user = {
let expanded_text = crate::skills::expand::expand_skill_command(&msg.text, &skills);
let expanded_msg = crate::user_message::UserMessage {
text: expanded_text,
attachments: msg.attachments.clone(),
};
match crate::user_message::prepare_user_message(&expanded_msg) {
Ok(m) => m,
Err(err) => {
yield UiEvent::AttachmentError {
kind: err.kind(),
message: err.to_string(),
};
return;
}
}
};
let mut progress_guard = match progress.try_lock() {
Ok(guard) => guard,
Err(_) => {
yield UiEvent::Error(
"another turn is already running; capo is single-turn-per-App".into(),
);
return;
}
};
let cancel = cancel_token.reset();
yield UiEvent::AgentTurnStarted;
yield UiEvent::AgentThinking;
let handle = match fork_from {
None => {
let history = match session.history().await {
Ok(h) => h,
Err(err) => {
yield UiEvent::Error(format!("session.history failed: {err}"));
return;
}
};
let mut messages = history;
messages.push(new_user);
match session.start_turn(messages).await {
Ok(h) => h,
Err(err) => {
yield UiEvent::Error(format!("session.start_turn failed: {err}"));
return;
}
}
}
Some(from) => {
match session.fork_turn(from, vec![new_user]).await {
Ok(h) => h,
Err(err) => {
yield UiEvent::Error(format!("session.fork_turn failed: {err}"));
return;
}
}
}
};
let previous_len = handle.previous_len;
let epoch = handle.epoch;
let branch_parent = handle.branch_parent;
let ops_tx = handle.ops_tx.clone();
let mut agent_stream = handle.stream;
let interrupt_bridge = tokio::spawn(async move {
cancel.cancelled().await;
let _ = ops_tx.send(AgentOp::Interrupt).await;
});
let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
loop {
while let Ok(chunk) = progress_guard.try_recv() {
yield UiEvent::ToolCallProgress {
id: progress_event_id(&tracker),
chunk: ProgressChunk::from(chunk),
};
}
tokio::select! {
biased;
maybe_item = agent_stream.next() => {
match maybe_item {
Some(AgentStreamItem::Event(ev)) => {
if let Some(ui) = map_event(ev, &tracker) {
yield ui;
}
}
Some(AgentStreamItem::Terminal(term)) => {
terminal_result = Some(term.result);
terminal_messages = Some(term.messages);
break;
}
None => break,
}
}
Some(chunk) = progress_guard.recv() => {
yield UiEvent::ToolCallProgress {
id: progress_event_id(&tracker),
chunk: ProgressChunk::from(chunk),
};
}
}
}
interrupt_bridge.abort();
if let Some(msgs) = terminal_messages.as_ref() {
if let Err(err) = session
.record_turn_outcome(epoch, previous_len, msgs, branch_parent)
.await
{
yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
}
}
match terminal_result {
Some(Ok(_)) => {
let final_text = terminal_messages
.as_ref()
.and_then(|msgs| {
msgs.iter()
.rev()
.find(|m| m.role() == motosan_agent_loop::Role::Assistant)
.map(|m| m.text())
})
.unwrap_or_default();
if !final_text.is_empty() {
yield UiEvent::AgentMessageComplete(final_text);
}
while let Ok(chunk) = progress_guard.try_recv() {
yield UiEvent::ToolCallProgress {
id: progress_event_id(&tracker),
chunk: ProgressChunk::from(chunk),
};
}
yield UiEvent::AgentTurnComplete;
}
Some(Err(err)) => {
yield UiEvent::Error(format!("{err}"));
}
None => { }
}
}
}
pub fn send_user_message(
&self,
msg: crate::user_message::UserMessage,
) -> impl Stream<Item = UiEvent> + Send + 'static {
self.run_turn(msg, None)
}
pub fn fork_from(
&self,
from: motosan_agent_loop::EntryId,
message: crate::user_message::UserMessage,
) -> impl Stream<Item = UiEvent> + Send + 'static {
self.run_turn(message, Some(from))
}
pub async fn fork_candidates(&self) -> Result<Vec<(motosan_agent_loop::EntryId, String)>> {
let entries = self
.session
.load_full()
.entries()
.await
.map_err(|e| AppError::Config(format!("entries failed: {e}")))?;
let branch = motosan_agent_loop::active_branch(&entries);
let mut out: Vec<(motosan_agent_loop::EntryId, String)> = branch
.iter()
.filter_map(|stored| {
let msg = stored.entry.as_message()?;
if !matches!(msg.role(), motosan_agent_loop::Role::User) {
return None;
}
let preview: String = msg
.text()
.lines()
.next()
.unwrap_or("")
.chars()
.take(80)
.collect();
Some((stored.id.clone(), preview))
})
.collect();
out.reverse();
Ok(out)
}
pub async fn branches(&self) -> Result<motosan_agent_loop::BranchTree> {
self.session
.load_full()
.branches()
.await
.map_err(|e| AppError::Config(format!("branches failed: {e}")))
}
}
#[derive(Debug, Default)]
struct ToolCallTracker {
next_id: usize,
pending: VecDeque<(String, String)>,
}
impl ToolCallTracker {
fn start(&mut self, name: &str) -> String {
self.next_id += 1;
let id = format!("tool_{}", self.next_id);
self.pending.push_back((name.to_string(), id.clone()));
id
}
fn complete(&mut self, name: &str) -> String {
if let Some(pos) = self
.pending
.iter()
.position(|(pending_name, _)| pending_name == name)
{
if let Some((_, id)) = self.pending.remove(pos) {
return id;
}
}
self.next_id += 1;
format!("tool_{}", self.next_id)
}
fn progress_id(&self) -> Option<String> {
match self.pending.len() {
1 => self.pending.front().map(|(_, id)| id.clone()),
_ => None,
}
}
}
fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
match tracker.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
}
}
fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
lock_tool_tracker(tracker)
.progress_id()
.unwrap_or_else(|| "tool_unknown".to_string())
}
fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
where
F: Fn(&str) -> Option<String>,
{
env_lookup("ANTHROPIC_API_KEY")
.map(|key| key.trim().to_string())
.filter(|key| !key.is_empty())
.or_else(|| auth.api_key("anthropic").map(str::to_string))
}
fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
match ev {
AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
let id = lock_tool_tracker(tool_tracker).start(&name);
Some(UiEvent::ToolCallStarted {
id,
name,
args: serde_json::json!({}),
})
}
AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
let id = lock_tool_tracker(tool_tracker).complete(&name);
Some(UiEvent::ToolCallCompleted {
id,
result: UiToolResult {
is_error: result.is_error,
text: format!("{name}: {result:?}"),
},
})
}
_ => None,
}
}
type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
pub struct AppBuilder {
config: Option<Config>,
cwd: Option<PathBuf>,
permission_gate: Option<Arc<dyn PermissionGate>>,
install_builtin_tools: bool,
max_iterations: usize,
llm_override: Option<Arc<dyn LlmClient>>,
custom_tools_factory: Option<CustomToolsFactory>,
permissions_policy_path: Option<PathBuf>,
ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
headless_permissions: bool,
settings: Option<crate::settings::Settings>,
auth: Option<crate::auth::Auth>,
context_discovery_disabled: bool,
session_store: Option<Arc<dyn SessionStore>>,
resume_session_id: Option<crate::session::SessionId>,
autocompact_enabled: bool,
skills: Vec<crate::skills::Skill>,
extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
}
impl Default for AppBuilder {
fn default() -> Self {
Self {
config: None,
cwd: None,
permission_gate: None,
install_builtin_tools: false,
max_iterations: 20,
llm_override: None,
custom_tools_factory: None,
permissions_policy_path: None,
ui_tx: None,
headless_permissions: false,
settings: None,
auth: None,
context_discovery_disabled: false,
session_store: None,
resume_session_id: None,
autocompact_enabled: false,
skills: Vec::new(),
extra_tools: Vec::new(),
mcp_servers: Vec::new(),
}
}
}
impl AppBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_config(mut self, cfg: Config) -> Self {
self.config = Some(cfg);
self
}
pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
self.cwd = Some(cwd.into());
self
}
pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
self.permission_gate = Some(gate);
self
}
pub fn with_builtin_tools(mut self) -> Self {
self.install_builtin_tools = true;
self
}
pub fn with_max_iterations(mut self, n: usize) -> Self {
self.max_iterations = n;
self
}
pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
self.llm_override = Some(llm);
self
}
pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
self.permissions_policy_path = Some(path);
self
}
pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
self.ui_tx = Some(tx);
self
}
pub fn with_headless_permissions(mut self) -> Self {
self.headless_permissions = true;
self
}
pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
self.settings = Some(settings);
self
}
pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
self.auth = Some(auth);
self
}
pub fn disable_context_discovery(mut self) -> Self {
self.context_discovery_disabled = true;
self
}
pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
self.session_store = Some(store);
self
}
pub fn with_autocompact(mut self) -> Self {
self.autocompact_enabled = true;
self
}
pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
self.skills = skills;
self
}
pub fn without_skills(mut self) -> Self {
self.skills.clear();
self
}
pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
self.extra_tools = tools;
self
}
pub fn with_mcp_servers(
mut self,
servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
) -> Self {
self.mcp_servers = servers;
self
}
pub fn with_custom_tools_factory(
mut self,
factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
) -> Self {
self.custom_tools_factory = Some(Box::new(factory));
self
}
pub async fn build_with_custom_tools(
self,
factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
) -> Result<App> {
self.with_custom_tools_factory(factory).build().await
}
pub async fn build_with_session(
mut self,
resume: Option<crate::session::SessionId>,
) -> Result<App> {
if let Some(id) = resume {
if self.session_store.is_none() {
return Err(AppError::Config(
"build_with_session(Some(id)) requires with_session_store(...)".into(),
));
}
self.resume_session_id = Some(id);
}
self.build_internal().await
}
pub async fn build(self) -> Result<App> {
self.build_with_session(None).await
}
async fn build_internal(mut self) -> Result<App> {
let mcp_servers = std::mem::take(&mut self.mcp_servers);
let extra_tools = std::mem::take(&mut self.extra_tools);
let skills = self.skills.clone();
if self.install_builtin_tools && self.custom_tools_factory.is_some() {
return Err(AppError::Config(
"with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
));
}
let has_config = self.config.is_some();
let has_auth = self.auth.is_some();
let mut config = self.config.unwrap_or_default();
let settings = match self.settings {
Some(settings) => settings,
None => {
let mut settings = crate::settings::Settings::default();
settings.model.provider = config.model.provider.clone();
settings.model.name = config.model.name.clone();
settings.model.max_tokens = config.model.max_tokens;
settings
}
};
config.model.provider = settings.model.provider.clone();
config.model.name = settings.model.name.clone();
config.model.max_tokens = settings.model.max_tokens;
let mut auth = self.auth.unwrap_or_default();
if !has_auth {
if let Some(key) = config.anthropic.api_key.as_deref() {
auth.0.insert(
"anthropic".into(),
crate::auth::ProviderAuth::ApiKey {
key: key.to_string(),
},
);
}
}
let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
if env_or_auth_key.is_some() || has_auth || !has_config {
config.anthropic.api_key = env_or_auth_key;
}
let cwd = self
.cwd
.or_else(|| std::env::current_dir().ok())
.unwrap_or_else(|| PathBuf::from("."));
let permission_gate = self.permission_gate.unwrap_or_else(|| {
if self.ui_tx.is_some() || self.headless_permissions {
Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
} else {
tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
}
});
let policy: Arc<crate::permissions::Policy> =
Arc::new(match self.permissions_policy_path.as_ref() {
Some(path) => crate::permissions::Policy::load_or_default(path)?,
None => crate::permissions::Policy::default(),
});
let session_cache = Arc::new(crate::permissions::SessionCache::new());
let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
let probe_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx.clone());
let cancel_token = probe_ctx.cancel_token.clone();
let (install_builtin, factory_extra_tools): (bool, Vec<Arc<dyn motosan_agent_tool::Tool>>) =
if let Some(factory_fn) = self.custom_tools_factory.take() {
let mut t = factory_fn(probe_ctx);
t.extend(extra_tools.clone());
(false, t)
} else {
(self.install_builtin_tools, extra_tools.clone())
};
let factory = SessionFactory {
cwd: cwd.clone(),
settings: settings.clone(),
auth: auth.clone(),
policy: Arc::clone(&policy),
session_cache: Arc::clone(&session_cache),
ui_tx: self.ui_tx.clone(),
headless_permissions: self.headless_permissions,
permission_gate: Arc::clone(&permission_gate),
progress_tx: progress_tx.clone(),
skills: Arc::new(skills.clone()),
install_builtin_tools: install_builtin,
extra_tools: factory_extra_tools,
max_iterations: self.max_iterations,
context_discovery_disabled: self.context_discovery_disabled,
autocompact_enabled: self.autocompact_enabled,
session_store: self.session_store.clone(),
llm_override: self.llm_override.clone(),
current_model: Arc::new(Mutex::new(None)),
cancel_token: cancel_token.clone(),
};
let mode = match self.resume_session_id.take() {
Some(id) => SessionMode::Resume(id.into_string()),
None => SessionMode::New,
};
let (session, llm) = factory.build(mode, None).await?;
Ok(App {
session: arc_swap::ArcSwap::from_pointee(session),
llm: arc_swap::ArcSwap::from_pointee(SharedLlm::new(llm)),
factory,
config,
cancel_token,
progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
skills: Arc::new(skills),
mcp_servers,
session_cache,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{AnthropicConfig, ModelConfig};
use crate::events::UiEvent;
use crate::user_message::UserMessage;
use async_trait::async_trait;
use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
use motosan_agent_tool::ToolDef;
use std::sync::atomic::{AtomicUsize, Ordering};
#[tokio::test]
async fn builder_fails_without_api_key() {
let cfg = Config {
anthropic: AnthropicConfig {
api_key: None,
base_url: "https://api.anthropic.com".into(),
},
model: ModelConfig {
provider: "anthropic".into(),
name: "claude-sonnet-4-6".into(),
max_tokens: 4096,
},
};
let err = match AppBuilder::new()
.with_config(cfg)
.with_builtin_tools()
.build()
.await
{
Ok(_) => panic!("must fail without key"),
Err(err) => err,
};
assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
}
struct ToolOnlyLlm {
turn: AtomicUsize,
}
#[async_trait]
impl LlmClient for ToolOnlyLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
let turn = self.turn.fetch_add(1, Ordering::SeqCst);
if turn == 0 {
Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
ToolCallItem {
id: "t1".into(),
name: "read".into(),
args: serde_json::json!({"path":"nope.txt"}),
},
])))
} else {
Ok(ChatOutput::new(LlmResponse::Message(String::new())))
}
}
}
#[tokio::test]
async fn empty_final_message_is_not_emitted() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(ToolOnlyLlm {
turn: AtomicUsize::new(0),
}))
.build()
.await
.expect("build");
let events: Vec<UiEvent> =
futures::StreamExt::collect(app.send_user_message(UserMessage::text("x"))).await;
let empties = events
.iter()
.filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
.count();
assert_eq!(
empties, 0,
"should not emit empty final message, got: {events:?}"
);
}
struct EchoLlm;
#[async_trait]
impl LlmClient for EchoLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
}
}
#[tokio::test]
async fn with_headless_permissions_builds_an_app() {
let dir = tempfile::tempdir().expect("tempdir");
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_headless_permissions()
.build()
.await
.expect("build");
assert!(!app.session_id().is_empty());
}
#[tokio::test]
async fn new_session_swaps_in_a_fresh_empty_session() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("hello"))
.collect()
.await;
let id_before = app.session_id();
assert!(!app.session_history().await.expect("history").is_empty());
app.new_session().await.expect("new_session");
assert_ne!(app.session_id(), id_before, "a fresh session has a new id");
assert!(
app.session_history().await.expect("history").is_empty(),
"fresh session has no history"
);
}
#[tokio::test]
async fn load_session_restores_a_stored_session_by_id() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let store_dir = dir.path().join("sessions");
let store: Arc<dyn SessionStore> =
Arc::new(motosan_agent_loop::FileSessionStore::new(store_dir));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(Arc::clone(&store))
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("remember this"))
.collect()
.await;
let original_id = app.session_id();
app.new_session().await.expect("new_session");
assert_ne!(app.session_id(), original_id);
app.load_session(&original_id).await.expect("load_session");
assert_eq!(app.session_id(), original_id);
let history = app.session_history().await.expect("history");
assert!(
history.iter().any(|m| m.text().contains("remember this")),
"loaded session should carry the original turn"
);
}
#[tokio::test]
async fn clone_session_copies_to_a_new_id_and_switches_to_it() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("s"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("hello"))
.collect()
.await;
let original_id = app.session_id();
let new_id = app.clone_session().await.expect("clone_session");
assert_ne!(new_id, original_id);
assert_eq!(app.session_id(), new_id);
let history = app.session_history().await.expect("history");
assert!(history.iter().any(|m| m.text().contains("hello")));
}
#[tokio::test]
async fn fork_from_creates_a_branch_off_an_earlier_entry() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("s"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("first"))
.collect()
.await;
let _: Vec<_> = app
.send_user_message(UserMessage::text("second"))
.collect()
.await;
let entries = app.session.load_full().entries().await.expect("entries");
let first_id = entries
.iter()
.find_map(|stored| {
let msg = stored.entry.as_message()?;
(msg.role() == motosan_agent_loop::Role::User && msg.text().contains("first"))
.then(|| stored.id.clone())
})
.expect("first user message present");
let _: Vec<_> = app
.fork_from(first_id, UserMessage::text("branched"))
.collect()
.await;
let history = app.session_history().await.expect("history");
let texts: Vec<String> = history.iter().map(|m| m.text()).collect();
assert!(
texts.iter().any(|t| t.contains("first")),
"fork keeps the fork-point ancestor"
);
assert!(
texts.iter().any(|t| t.contains("branched")),
"fork includes the new message"
);
assert!(
!texts.iter().any(|t| t.contains("second")),
"fork excludes the abandoned branch"
);
}
#[tokio::test]
async fn fork_candidates_lists_active_branch_user_messages_newest_first() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("s"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("alpha"))
.collect()
.await;
let _: Vec<_> = app
.send_user_message(UserMessage::text("bravo"))
.collect()
.await;
let candidates = app.fork_candidates().await.expect("candidates");
let previews: Vec<&str> = candidates.iter().map(|(_, p)| p.as_str()).collect();
assert!(previews[0].contains("bravo"), "got {previews:?}");
assert!(previews.iter().any(|p| p.contains("alpha")));
assert!(candidates.iter().all(|(id, _)| !id.is_empty()));
}
#[tokio::test]
async fn branches_returns_a_tree_for_a_linear_session() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("s"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("hello"))
.collect()
.await;
let tree = app.branches().await.expect("branches");
assert!(!tree.nodes.is_empty());
assert!(tree.active_leaf.is_some());
}
#[tokio::test]
async fn switch_model_preserves_history() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("s"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
let _: Vec<_> = app
.send_user_message(UserMessage::text("keep me"))
.collect()
.await;
let id_before = app.session_id();
app.switch_model(&crate::model::ModelId::from("claude-opus-4-7"))
.await
.expect("switch_model");
assert_eq!(
app.session_id(),
id_before,
"switch_model keeps the same session"
);
let history = app.session_history().await.expect("history");
assert!(history.iter().any(|m| m.text().contains("keep me")));
}
#[tokio::test]
async fn switch_model_is_sticky_for_future_session_rebuilds() {
let dir = tempfile::tempdir().expect("tempdir");
let store: Arc<dyn SessionStore> = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("s"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(EchoLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
let selected = crate::model::ModelId::from("claude-opus-4-7");
app.switch_model(&selected).await.expect("switch_model");
app.new_session().await.expect("new_session");
assert_eq!(app.factory.current_model(), Some(selected));
}
struct SleepThenDoneLlm {
turn: AtomicUsize,
}
#[async_trait]
impl LlmClient for SleepThenDoneLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
let turn = self.turn.fetch_add(1, Ordering::SeqCst);
if turn == 0 {
Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
ToolCallItem {
id: "sleep".into(),
name: "bash".into(),
args: serde_json::json!({"command":"sleep 5", "timeout_ms": 10000}),
},
])))
} else {
Ok(ChatOutput::new(LlmResponse::Message("done".into())))
}
}
}
#[tokio::test]
async fn cancel_reaches_builtin_tools_after_session_rebuild() {
use futures::StreamExt;
let dir = tempfile::tempdir().expect("tempdir");
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = Arc::new(
AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(SleepThenDoneLlm {
turn: AtomicUsize::new(0),
}) as Arc<dyn LlmClient>)
.build()
.await
.expect("build"),
);
app.new_session().await.expect("new_session");
let running_app = Arc::clone(&app);
let handle = tokio::spawn(async move {
running_app
.send_user_message(UserMessage::text("run a slow command"))
.collect::<Vec<_>>()
.await
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
app.cancel();
let events = tokio::time::timeout(std::time::Duration::from_secs(2), handle)
.await
.expect("turn should finish after cancellation")
.expect("join");
assert!(
events.iter().any(|event| {
matches!(
event,
UiEvent::ToolCallCompleted { result, .. }
if result.text.contains("command cancelled by user")
)
}),
"cancel should reach the rebuilt bash tool: {events:?}"
);
}
#[tokio::test]
async fn compact_summarizes_a_session_with_enough_history() {
struct DoneLlm;
#[async_trait]
impl LlmClient for DoneLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
Ok(ChatOutput::new(LlmResponse::Message("done".into())))
}
}
let dir = tempfile::tempdir().expect("tempdir");
let store = Arc::new(motosan_agent_loop::FileSessionStore::new(
dir.path().join("sessions"),
));
let mut config = Config::default();
config.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(config)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(Arc::new(DoneLlm) as Arc<dyn LlmClient>)
.with_session_store(store)
.build()
.await
.expect("build");
for i in 0..4 {
let _: Vec<_> = app
.send_user_message(UserMessage::text(format!("turn {i}")))
.collect()
.await;
}
app.compact().await.expect("compact should succeed");
let history = app.session_history().await.expect("history");
assert!(
!history.is_empty(),
"session should still have content post-compaction"
);
}
#[test]
fn anthropic_env_api_key_overrides_auth_json_key() {
let mut auth = crate::auth::Auth::default();
auth.0.insert(
"anthropic".into(),
crate::auth::ProviderAuth::ApiKey {
key: "sk-auth".into(),
},
);
let key = anthropic_api_key_from(&auth, |name| {
(name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
});
assert_eq!(key.as_deref(), Some("sk-env"));
}
#[tokio::test]
async fn with_settings_overrides_deprecated_config_model() {
use crate::settings::Settings;
let mut config = Config::default();
config.model.name = "from-config".into();
config.anthropic.api_key = Some("sk-config".into());
let mut settings = Settings::default();
settings.model.name = "from-settings".into();
let tmp = tempfile::tempdir().unwrap();
let app = AppBuilder::new()
.with_config(config)
.with_settings(settings)
.with_cwd(tmp.path())
.disable_context_discovery()
.with_llm(Arc::new(EchoLlm))
.build()
.await
.expect("build");
assert_eq!(app.config().model.name, "from-settings");
assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
}
#[tokio::test]
async fn with_settings_synthesises_legacy_config_for_build() {
use crate::auth::{Auth, ProviderAuth};
use crate::settings::Settings;
let mut settings = Settings::default();
settings.model.name = "claude-sonnet-4-6".into();
let mut auth = Auth::default();
auth.0.insert(
"anthropic".into(),
ProviderAuth::ApiKey {
key: "sk-test".into(),
},
);
let tmp = tempfile::tempdir().unwrap();
let app = AppBuilder::new()
.with_settings(settings)
.with_auth(auth)
.with_cwd(tmp.path())
.with_builtin_tools()
.disable_context_discovery()
.with_llm(Arc::new(EchoLlm))
.build()
.await
.expect("build");
let _ = app;
}
#[tokio::test]
async fn cancel_before_turn_does_not_poison_future_turns() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(EchoLlm))
.build()
.await
.expect("build");
app.cancel();
let events: Vec<UiEvent> = app
.send_user_message(UserMessage::text("x"))
.collect()
.await;
assert!(
events
.iter()
.any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
"turn should use a fresh cancellation token: {events:?}"
);
}
#[test]
fn map_event_matches_started_and_completed_ids_by_tool_name() {
let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
let started_bash = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let started_read = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "read".into(),
}),
&tracker,
);
let completed_bash = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "bash".into(),
result: motosan_agent_tool::ToolResult::text("ok"),
}),
&tracker,
);
let completed_read = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "read".into(),
result: motosan_agent_tool::ToolResult::text("ok"),
}),
&tracker,
);
assert!(matches!(
started_bash,
Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
));
assert!(matches!(
started_read,
Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
));
assert!(matches!(
completed_bash,
Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
));
assert!(matches!(
completed_read,
Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
));
}
#[test]
fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
let s1 = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let s2 = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let c1 = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "bash".into(),
result: motosan_agent_tool::ToolResult::text("a"),
}),
&tracker,
);
let c2 = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "bash".into(),
result: motosan_agent_tool::ToolResult::text("b"),
}),
&tracker,
);
let id_s1 = match s1 {
Some(UiEvent::ToolCallStarted { id, .. }) => id,
other => panic!("{other:?}"),
};
let id_s2 = match s2 {
Some(UiEvent::ToolCallStarted { id, .. }) => id,
other => panic!("{other:?}"),
};
let id_c1 = match c1 {
Some(UiEvent::ToolCallCompleted { id, .. }) => id,
other => panic!("{other:?}"),
};
let id_c2 = match c2 {
Some(UiEvent::ToolCallCompleted { id, .. }) => id,
other => panic!("{other:?}"),
};
assert_eq!(id_s1, id_c1);
assert_eq!(id_s2, id_c2);
assert_ne!(id_s1, id_s2);
}
#[tokio::test]
async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(ToolOnlyLlm {
turn: AtomicUsize::new(0),
}))
.build()
.await
.expect("build");
let mut first = Box::pin(app.send_user_message(UserMessage::text("first")));
let first_event = first.next().await;
assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
let second_events: Vec<UiEvent> = app
.send_user_message(UserMessage::text("second"))
.collect()
.await;
assert_eq!(
second_events.len(),
1,
"expected immediate single error event, got: {second_events:?}"
);
assert!(matches!(
&second_events[0],
UiEvent::Error(msg) if msg.contains("single-turn-per-App")
));
}
#[tokio::test]
async fn attachment_error_is_emitted_before_the_single_turn_guard_fires() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(ToolOnlyLlm {
turn: AtomicUsize::new(0),
}))
.build()
.await
.expect("build");
let mut first =
Box::pin(app.send_user_message(crate::user_message::UserMessage::text("first")));
let first_event = first.next().await;
assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
let bad = crate::user_message::UserMessage {
text: "second".into(),
attachments: vec![crate::user_message::Attachment::Image {
path: std::path::PathBuf::from("/tmp/this-file-must-not-exist-capo-v06-test.png"),
}],
};
let second_events: Vec<UiEvent> = app.send_user_message(bad).collect().await;
assert_eq!(
second_events.len(),
1,
"expected exactly one event (the attachment error); got: {second_events:?}"
);
assert!(
matches!(
&second_events[0],
UiEvent::AttachmentError {
kind: crate::user_message::AttachmentErrorKind::NotFound,
..
}
),
"expected AttachmentError::NotFound as first event; got {second_events:?}"
);
}
#[test]
fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
assert_eq!(progress_event_id(&tracker), "tool_unknown");
let only = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let only_id = match only {
Some(UiEvent::ToolCallStarted { id, .. }) => id,
other => panic!("{other:?}"),
};
assert_eq!(progress_event_id(&tracker), only_id);
let _second = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "read".into(),
}),
&tracker,
);
assert_eq!(progress_event_id(&tracker), "tool_unknown");
}
#[tokio::test]
async fn builder_rejects_builtin_and_custom_tools_together() {
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let dir = tempfile::tempdir().unwrap();
let err = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_custom_tools_factory(|_| Vec::new())
.build()
.await
{
Ok(_) => panic!("must reject conflicting tool configuration"),
Err(err) => err,
};
assert!(format!("{err}").contains("mutually exclusive"));
}
#[tokio::test]
async fn two_turns_in_same_session_share_history() {
#[derive(Default)]
struct CounterLlm {
turn: AtomicUsize,
}
#[async_trait]
impl LlmClient for CounterLlm {
async fn chat(
&self,
messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
let turn = self.turn.fetch_add(1, Ordering::SeqCst);
let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
Ok(ChatOutput::new(LlmResponse::Message(answer)))
}
}
let tmp = tempfile::tempdir().unwrap();
let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
tmp.path().to_path_buf(),
));
let app = AppBuilder::new()
.with_settings(crate::settings::Settings::default())
.with_auth(crate::auth::Auth::default())
.with_cwd(tmp.path())
.with_builtin_tools()
.disable_context_discovery()
.with_llm(std::sync::Arc::new(CounterLlm::default()))
.with_session_store(store)
.build_with_session(None)
.await
.expect("build");
let _events1: Vec<UiEvent> = app
.send_user_message(UserMessage::text("hi"))
.collect()
.await;
let events2: Vec<UiEvent> = app
.send_user_message(UserMessage::text("again"))
.collect()
.await;
let saw_more_than_one = events2.iter().any(|e| {
matches!(
e,
UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
)
});
assert!(
saw_more_than_one,
"second turn should have seen history; events: {events2:?}"
);
}
}
#[cfg(test)]
mod skills_builder_tests {
use super::*;
use crate::skills::types::{Skill, SkillSource};
use std::path::PathBuf;
fn fixture() -> Skill {
Skill {
name: "x".into(),
description: "d".into(),
file_path: PathBuf::from("/x.md"),
base_dir: PathBuf::from("/"),
disable_model_invocation: false,
source: SkillSource::Global,
}
}
#[test]
fn with_skills_stores_skills() {
let b = AppBuilder::new().with_skills(vec![fixture()]);
assert_eq!(b.skills.len(), 1);
assert_eq!(b.skills[0].name, "x");
}
#[test]
fn without_skills_clears() {
let b = AppBuilder::new()
.with_skills(vec![fixture()])
.without_skills();
assert!(b.skills.is_empty());
}
}
#[cfg(test)]
mod mcp_builder_tests {
use super::*;
use motosan_agent_tool::Tool;
struct FakeTool;
impl Tool for FakeTool {
fn def(&self) -> motosan_agent_tool::ToolDef {
motosan_agent_tool::ToolDef {
name: "fake__echo".into(),
description: "test".into(),
input_schema: serde_json::json!({"type": "object"}),
}
}
fn call(
&self,
_args: serde_json::Value,
_ctx: &motosan_agent_tool::ToolContext,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
> {
Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
}
}
#[test]
fn with_extra_tools_stores_tools() {
let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
let b = AppBuilder::new().with_extra_tools(tools);
assert_eq!(b.extra_tools.len(), 1);
}
}