#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, MutexGuard};
use futures::{Stream, StreamExt};
use motosan_agent_loop::{
AgentEvent, AgentOp, AgentSession, AgentStreamItem, AutocompactConfig, AutocompactExtension,
CoreEvent, Engine, LlmClient, SessionStore,
};
use motosan_agent_tool::ToolContext;
use tokio::sync::mpsc;
use crate::agent::build_system_prompt;
use crate::config::Config;
use crate::error::{AppError, Result};
use crate::events::{ProgressChunk, UiEvent, UiToolResult};
use crate::llm::build_llm_client;
use crate::permissions::{NoOpPermissionGate, PermissionGate};
use crate::tools::{builtin_tools, SharedCancelToken, ToolCtx, ToolProgressChunk};
pub struct App {
session: Arc<AgentSession>,
config: Config,
cancel_token: SharedCancelToken,
progress_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<ToolProgressChunk>>>,
next_tool_id: Arc<Mutex<ToolCallTracker>>,
skills: Arc<Vec<crate::skills::Skill>>,
mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
pub(crate) session_cache: Arc<crate::permissions::SessionCache>,
}
impl App {
pub fn config(&self) -> &Config {
&self.config
}
pub fn cancel(&self) {
self.cancel_token.cancel();
}
pub fn permissions_cache(&self) -> Arc<crate::permissions::SessionCache> {
Arc::clone(&self.session_cache)
}
pub fn session_id(&self) -> &str {
self.session.session_id()
}
pub async fn session_history(
&self,
) -> motosan_agent_loop::Result<Vec<motosan_agent_loop::Message>> {
self.session.history().await
}
pub async fn disconnect_mcp(&self) {
for (name, server) in &self.mcp_servers {
let _ =
tokio::time::timeout(std::time::Duration::from_secs(2), server.disconnect()).await;
tracing::debug!(target: "mcp", server = %name, "disconnected");
}
}
pub fn send_user_message(&self, text: String) -> impl Stream<Item = UiEvent> + Send + 'static {
let session = Arc::clone(&self.session);
let skills = Arc::clone(&self.skills);
let cancel_token = self.cancel_token.clone();
let tracker = Arc::clone(&self.next_tool_id);
let progress = Arc::clone(&self.progress_rx);
async_stream::stream! {
let mut progress_guard = match progress.try_lock() {
Ok(guard) => guard,
Err(_) => {
yield UiEvent::Error(
"another turn is already running; capo is single-turn-per-App".into(),
);
return;
}
};
let cancel = cancel_token.reset();
yield UiEvent::AgentTurnStarted;
yield UiEvent::AgentThinking;
let history = match session.history().await {
Ok(h) => h,
Err(err) => {
yield UiEvent::Error(format!("session.history failed: {err}"));
return;
}
};
let mut messages = history;
let text = crate::skills::expand::expand_skill_command(&text, &skills);
messages.push(motosan_agent_loop::Message::user(&text));
let handle = match session.start_turn(messages).await {
Ok(h) => h,
Err(err) => {
yield UiEvent::Error(format!("session.start_turn failed: {err}"));
return;
}
};
let previous_len = handle.previous_len;
let epoch = handle.epoch;
let ops_tx = handle.ops_tx.clone();
let mut agent_stream = handle.stream;
let interrupt_bridge = tokio::spawn(async move {
cancel.cancelled().await;
let _ = ops_tx.send(AgentOp::Interrupt).await;
});
let mut terminal_messages: Option<Vec<motosan_agent_loop::Message>> = None;
let mut terminal_result: Option<motosan_agent_loop::Result<motosan_agent_loop::AgentResult>> = None;
loop {
while let Ok(chunk) = progress_guard.try_recv() {
yield UiEvent::ToolCallProgress {
id: progress_event_id(&tracker),
chunk: ProgressChunk::from(chunk),
};
}
tokio::select! {
biased;
maybe_item = agent_stream.next() => {
match maybe_item {
Some(AgentStreamItem::Event(ev)) => {
if let Some(ui) = map_event(ev, &tracker) {
yield ui;
}
}
Some(AgentStreamItem::Terminal(term)) => {
terminal_result = Some(term.result);
terminal_messages = Some(term.messages);
break;
}
None => break,
}
}
Some(chunk) = progress_guard.recv() => {
yield UiEvent::ToolCallProgress {
id: progress_event_id(&tracker),
chunk: ProgressChunk::from(chunk),
};
}
}
}
interrupt_bridge.abort();
if let Some(msgs) = terminal_messages.as_ref() {
if let Err(err) = session.record_turn_outcome(epoch, previous_len, msgs).await {
yield UiEvent::Error(format!("session.record_turn_outcome: {err}"));
}
}
match terminal_result {
Some(Ok(_)) => {
let final_text = terminal_messages
.as_ref()
.and_then(|msgs| {
msgs.iter()
.rev()
.find(|m| m.role() == motosan_agent_loop::Role::Assistant)
.map(|m| m.text())
})
.unwrap_or_default();
if !final_text.is_empty() {
yield UiEvent::AgentMessageComplete(final_text);
}
while let Ok(chunk) = progress_guard.try_recv() {
yield UiEvent::ToolCallProgress {
id: progress_event_id(&tracker),
chunk: ProgressChunk::from(chunk),
};
}
yield UiEvent::AgentTurnComplete;
}
Some(Err(err)) => {
yield UiEvent::Error(format!("{err}"));
}
None => { }
}
}
}
}
#[derive(Debug, Default)]
struct ToolCallTracker {
next_id: usize,
pending: VecDeque<(String, String)>,
}
impl ToolCallTracker {
fn start(&mut self, name: &str) -> String {
self.next_id += 1;
let id = format!("tool_{}", self.next_id);
self.pending.push_back((name.to_string(), id.clone()));
id
}
fn complete(&mut self, name: &str) -> String {
if let Some(pos) = self
.pending
.iter()
.position(|(pending_name, _)| pending_name == name)
{
if let Some((_, id)) = self.pending.remove(pos) {
return id;
}
}
self.next_id += 1;
format!("tool_{}", self.next_id)
}
fn progress_id(&self) -> Option<String> {
match self.pending.len() {
1 => self.pending.front().map(|(_, id)| id.clone()),
_ => None,
}
}
}
fn lock_tool_tracker(tracker: &Arc<Mutex<ToolCallTracker>>) -> MutexGuard<'_, ToolCallTracker> {
match tracker.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
}
}
fn progress_event_id(tracker: &Arc<Mutex<ToolCallTracker>>) -> String {
lock_tool_tracker(tracker)
.progress_id()
.unwrap_or_else(|| "tool_unknown".to_string())
}
fn anthropic_api_key_from<F>(auth: &crate::auth::Auth, env_lookup: F) -> Option<String>
where
F: Fn(&str) -> Option<String>,
{
env_lookup("ANTHROPIC_API_KEY")
.map(|key| key.trim().to_string())
.filter(|key| !key.is_empty())
.or_else(|| auth.api_key("anthropic").map(str::to_string))
}
fn map_event(ev: AgentEvent, tool_tracker: &Arc<Mutex<ToolCallTracker>>) -> Option<UiEvent> {
match ev {
AgentEvent::Core(CoreEvent::TextChunk(delta)) => Some(UiEvent::AgentTextDelta(delta)),
AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
let id = lock_tool_tracker(tool_tracker).start(&name);
Some(UiEvent::ToolCallStarted {
id,
name,
args: serde_json::json!({}),
})
}
AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
let id = lock_tool_tracker(tool_tracker).complete(&name);
Some(UiEvent::ToolCallCompleted {
id,
result: UiToolResult {
is_error: result.is_error,
text: format!("{name}: {result:?}"),
},
})
}
_ => None,
}
}
type CustomToolsFactory = Box<dyn FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>>>;
pub struct AppBuilder {
config: Option<Config>,
cwd: Option<PathBuf>,
permission_gate: Option<Arc<dyn PermissionGate>>,
install_builtin_tools: bool,
max_iterations: usize,
llm_override: Option<Arc<dyn LlmClient>>,
custom_tools_factory: Option<CustomToolsFactory>,
permissions_policy_path: Option<PathBuf>,
ui_tx: Option<mpsc::Sender<crate::events::UiEvent>>,
settings: Option<crate::settings::Settings>,
auth: Option<crate::auth::Auth>,
context_discovery_disabled: bool,
session_store: Option<Arc<dyn SessionStore>>,
resume_session_id: Option<crate::session::SessionId>,
autocompact_enabled: bool,
skills: Vec<crate::skills::Skill>,
extra_tools: Vec<Arc<dyn motosan_agent_tool::Tool>>,
mcp_servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
}
impl Default for AppBuilder {
fn default() -> Self {
Self {
config: None,
cwd: None,
permission_gate: None,
install_builtin_tools: false,
max_iterations: 20,
llm_override: None,
custom_tools_factory: None,
permissions_policy_path: None,
ui_tx: None,
settings: None,
auth: None,
context_discovery_disabled: false,
session_store: None,
resume_session_id: None,
autocompact_enabled: false,
skills: Vec::new(),
extra_tools: Vec::new(),
mcp_servers: Vec::new(),
}
}
}
impl AppBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_config(mut self, cfg: Config) -> Self {
self.config = Some(cfg);
self
}
pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
self.cwd = Some(cwd.into());
self
}
pub fn with_permission_gate(mut self, gate: Arc<dyn PermissionGate>) -> Self {
self.permission_gate = Some(gate);
self
}
pub fn with_builtin_tools(mut self) -> Self {
self.install_builtin_tools = true;
self
}
pub fn with_max_iterations(mut self, n: usize) -> Self {
self.max_iterations = n;
self
}
pub fn with_llm(mut self, llm: Arc<dyn LlmClient>) -> Self {
self.llm_override = Some(llm);
self
}
pub fn with_permissions_config(mut self, path: PathBuf) -> Self {
self.permissions_policy_path = Some(path);
self
}
pub fn with_ui_channel(mut self, tx: mpsc::Sender<UiEvent>) -> Self {
self.ui_tx = Some(tx);
self
}
pub fn with_settings(mut self, settings: crate::settings::Settings) -> Self {
self.settings = Some(settings);
self
}
pub fn with_auth(mut self, auth: crate::auth::Auth) -> Self {
self.auth = Some(auth);
self
}
pub fn disable_context_discovery(mut self) -> Self {
self.context_discovery_disabled = true;
self
}
pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
self.session_store = Some(store);
self
}
pub fn with_autocompact(mut self) -> Self {
self.autocompact_enabled = true;
self
}
pub fn with_skills(mut self, skills: Vec<crate::skills::Skill>) -> Self {
self.skills = skills;
self
}
pub fn without_skills(mut self) -> Self {
self.skills.clear();
self
}
pub fn with_extra_tools(mut self, tools: Vec<Arc<dyn motosan_agent_tool::Tool>>) -> Self {
self.extra_tools = tools;
self
}
pub fn with_mcp_servers(
mut self,
servers: Vec<(String, Arc<dyn motosan_agent_loop::mcp::McpServer>)>,
) -> Self {
self.mcp_servers = servers;
self
}
pub fn with_custom_tools_factory(
mut self,
factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
) -> Self {
self.custom_tools_factory = Some(Box::new(factory));
self
}
pub async fn build_with_custom_tools(
self,
factory: impl FnOnce(ToolCtx) -> Vec<Arc<dyn motosan_agent_tool::Tool>> + 'static,
) -> Result<App> {
self.with_custom_tools_factory(factory).build().await
}
pub async fn build_with_session(
mut self,
resume: Option<crate::session::SessionId>,
) -> Result<App> {
if let Some(id) = resume {
if self.session_store.is_none() {
return Err(AppError::Config(
"build_with_session(Some(id)) requires with_session_store(...)".into(),
));
}
self.resume_session_id = Some(id);
}
self.build_internal().await
}
pub async fn build(self) -> Result<App> {
self.build_with_session(None).await
}
async fn build_internal(mut self) -> Result<App> {
let mcp_servers = std::mem::take(&mut self.mcp_servers);
let extra_tools = std::mem::take(&mut self.extra_tools);
let skills = self.skills.clone();
if self.install_builtin_tools && self.custom_tools_factory.is_some() {
return Err(AppError::Config(
"with_builtin_tools and with_custom_tools_factory are mutually exclusive".into(),
));
}
let has_config = self.config.is_some();
let has_auth = self.auth.is_some();
let mut config = self.config.unwrap_or_default();
let settings = match self.settings {
Some(settings) => settings,
None => {
let mut settings = crate::settings::Settings::default();
settings.model.provider = config.model.provider.clone();
settings.model.name = config.model.name.clone();
settings.model.max_tokens = config.model.max_tokens;
settings
}
};
config.model.provider = settings.model.provider.clone();
config.model.name = settings.model.name.clone();
config.model.max_tokens = settings.model.max_tokens;
let mut auth = self.auth.unwrap_or_default();
if !has_auth {
if let Some(key) = config.anthropic.api_key.as_deref() {
auth.0.insert(
"anthropic".into(),
crate::auth::ProviderAuth::ApiKey {
key: key.to_string(),
},
);
}
}
let env_or_auth_key = anthropic_api_key_from(&auth, |name| std::env::var(name).ok());
if env_or_auth_key.is_some() || has_auth || !has_config {
config.anthropic.api_key = env_or_auth_key;
}
let cwd = self
.cwd
.or_else(|| std::env::current_dir().ok())
.unwrap_or_else(|| PathBuf::from("."));
let permission_gate = self.permission_gate.unwrap_or_else(|| {
if self.ui_tx.is_some() {
Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
} else {
tracing::warn!("no PermissionGate and no UI channel — tools run unchecked");
Arc::new(NoOpPermissionGate) as Arc<dyn PermissionGate>
}
});
let llm = if let Some(llm) = self.llm_override {
llm
} else {
build_llm_client(&settings, &auth)?
};
let (progress_tx, progress_rx) = mpsc::channel::<ToolProgressChunk>(64);
let tool_ctx = ToolCtx::new(&cwd, Arc::clone(&permission_gate), progress_tx);
let cancel_token = tool_ctx.cancel_token.clone();
let mut tools = if self.install_builtin_tools {
builtin_tools(tool_ctx.clone())
} else if let Some(factory) = self.custom_tools_factory {
factory(tool_ctx.clone())
} else {
Vec::new()
};
tools.extend(extra_tools);
let tool_names: Vec<String> = tools.iter().map(|t| t.def().name).collect();
let base_prompt = build_system_prompt(&tool_names, &skills);
let system_prompt = if self.context_discovery_disabled {
base_prompt
} else {
let agent_dir = crate::paths::agent_dir();
let context = crate::context_files::load_project_context_files(&cwd, &agent_dir);
crate::context_files::assemble_system_prompt(&base_prompt, &context, &cwd)
};
let motosan_tool_context = ToolContext::new("capo", "capo").with_cwd(&cwd);
let policy: Arc<crate::permissions::Policy> =
Arc::new(match self.permissions_policy_path.as_ref() {
Some(path) => crate::permissions::Policy::load_or_default(path)?,
None => crate::permissions::Policy::default(),
});
let session_cache = Arc::new(crate::permissions::SessionCache::new());
let mut engine_builder = Engine::builder()
.max_iterations(self.max_iterations)
.system_prompt(system_prompt)
.tool_context(motosan_tool_context);
for tool in tools {
engine_builder = engine_builder.tool(tool);
}
if let Some(ui_tx) = self.ui_tx {
let ext = crate::permissions::PermissionExtension::new(
Arc::clone(&policy),
Arc::clone(&session_cache),
cwd.clone(),
ui_tx,
);
engine_builder = engine_builder.extension(Box::new(ext));
}
if self.autocompact_enabled
&& settings.session.compact_at_context_pct > 0.0
&& settings.session.compact_at_context_pct < 1.0
{
let cfg = AutocompactConfig {
threshold: settings.session.compact_at_context_pct,
max_context_tokens: settings.session.max_context_tokens,
keep_turns: settings.session.keep_turns.max(1),
};
let ext = AutocompactExtension::new(cfg, Arc::clone(&llm));
engine_builder = engine_builder.extension(Box::new(ext));
}
let engine = engine_builder.build();
let session = match (self.resume_session_id, self.session_store) {
(Some(id), Some(store)) => {
let s =
AgentSession::resume(id.as_str(), Arc::clone(&store), engine, Arc::clone(&llm))
.await
.map_err(|err| AppError::Config(format!("resume failed: {err}")))?;
let entries = s
.entries()
.await
.map_err(|err| AppError::Config(format!("entries failed: {err}")))?;
crate::session::hydrate_read_files(&entries, &tool_ctx).await?;
s
}
(None, Some(store)) => {
let id = crate::session::SessionId::new();
AgentSession::new_with_store(id.into_string(), store, engine, Arc::clone(&llm))
}
(None, None) => AgentSession::new(engine, Arc::clone(&llm)),
(Some(_), None) => unreachable!("guarded in build_with_session"),
};
Ok(App {
session: Arc::new(session),
config,
cancel_token,
progress_rx: Arc::new(tokio::sync::Mutex::new(progress_rx)),
next_tool_id: Arc::new(Mutex::new(ToolCallTracker::default())),
skills: Arc::new(skills),
mcp_servers,
session_cache,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{AnthropicConfig, ModelConfig};
use crate::events::UiEvent;
use async_trait::async_trait;
use motosan_agent_loop::{ChatOutput, LlmClient, LlmResponse, Message, ToolCallItem};
use motosan_agent_tool::ToolDef;
use std::sync::atomic::{AtomicUsize, Ordering};
#[tokio::test]
async fn builder_fails_without_api_key() {
let cfg = Config {
anthropic: AnthropicConfig {
api_key: None,
base_url: "https://api.anthropic.com".into(),
},
model: ModelConfig {
provider: "anthropic".into(),
name: "claude-sonnet-4-6".into(),
max_tokens: 4096,
},
};
let err = match AppBuilder::new()
.with_config(cfg)
.with_builtin_tools()
.build()
.await
{
Ok(_) => panic!("must fail without key"),
Err(err) => err,
};
assert!(format!("{err}").contains("ANTHROPIC_API_KEY"));
}
struct ToolOnlyLlm {
turn: AtomicUsize,
}
#[async_trait]
impl LlmClient for ToolOnlyLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
let turn = self.turn.fetch_add(1, Ordering::SeqCst);
if turn == 0 {
Ok(ChatOutput::new(LlmResponse::ToolCalls(vec![
ToolCallItem {
id: "t1".into(),
name: "read".into(),
args: serde_json::json!({"path":"nope.txt"}),
},
])))
} else {
Ok(ChatOutput::new(LlmResponse::Message(String::new())))
}
}
}
#[tokio::test]
async fn empty_final_message_is_not_emitted() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(ToolOnlyLlm {
turn: AtomicUsize::new(0),
}))
.build()
.await
.expect("build");
let events: Vec<UiEvent> =
futures::StreamExt::collect(app.send_user_message("x".into())).await;
let empties = events
.iter()
.filter(|e| matches!(e, UiEvent::AgentMessageComplete(t) if t.is_empty()))
.count();
assert_eq!(
empties, 0,
"should not emit empty final message, got: {events:?}"
);
}
struct EchoLlm;
#[async_trait]
impl LlmClient for EchoLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
Ok(ChatOutput::new(LlmResponse::Message("ok".into())))
}
}
#[test]
fn anthropic_env_api_key_overrides_auth_json_key() {
let mut auth = crate::auth::Auth::default();
auth.0.insert(
"anthropic".into(),
crate::auth::ProviderAuth::ApiKey {
key: "sk-auth".into(),
},
);
let key = anthropic_api_key_from(&auth, |name| {
(name == "ANTHROPIC_API_KEY").then(|| " sk-env ".to_string())
});
assert_eq!(key.as_deref(), Some("sk-env"));
}
#[tokio::test]
async fn with_settings_overrides_deprecated_config_model() {
use crate::settings::Settings;
let mut config = Config::default();
config.model.name = "from-config".into();
config.anthropic.api_key = Some("sk-config".into());
let mut settings = Settings::default();
settings.model.name = "from-settings".into();
let tmp = tempfile::tempdir().unwrap();
let app = AppBuilder::new()
.with_config(config)
.with_settings(settings)
.with_cwd(tmp.path())
.disable_context_discovery()
.with_llm(Arc::new(EchoLlm))
.build()
.await
.expect("build");
assert_eq!(app.config().model.name, "from-settings");
assert_eq!(app.config().anthropic.api_key.as_deref(), Some("sk-config"));
}
#[tokio::test]
async fn with_settings_synthesises_legacy_config_for_build() {
use crate::auth::{Auth, ProviderAuth};
use crate::settings::Settings;
let mut settings = Settings::default();
settings.model.name = "claude-sonnet-4-6".into();
let mut auth = Auth::default();
auth.0.insert(
"anthropic".into(),
ProviderAuth::ApiKey {
key: "sk-test".into(),
},
);
let tmp = tempfile::tempdir().unwrap();
let app = AppBuilder::new()
.with_settings(settings)
.with_auth(auth)
.with_cwd(tmp.path())
.with_builtin_tools()
.disable_context_discovery()
.with_llm(Arc::new(EchoLlm))
.build()
.await
.expect("build");
let _ = app;
}
#[tokio::test]
async fn cancel_before_turn_does_not_poison_future_turns() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(EchoLlm))
.build()
.await
.expect("build");
app.cancel();
let events: Vec<UiEvent> = app.send_user_message("x".into()).collect().await;
assert!(
events
.iter()
.any(|e| matches!(e, UiEvent::AgentMessageComplete(text) if text == "ok")),
"turn should use a fresh cancellation token: {events:?}"
);
}
#[test]
fn map_event_matches_started_and_completed_ids_by_tool_name() {
let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
let started_bash = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let started_read = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "read".into(),
}),
&tracker,
);
let completed_bash = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "bash".into(),
result: motosan_agent_tool::ToolResult::text("ok"),
}),
&tracker,
);
let completed_read = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "read".into(),
result: motosan_agent_tool::ToolResult::text("ok"),
}),
&tracker,
);
assert!(matches!(
started_bash,
Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_1" && name == "bash"
));
assert!(matches!(
started_read,
Some(UiEvent::ToolCallStarted { ref id, ref name, .. }) if id == "tool_2" && name == "read"
));
assert!(matches!(
completed_bash,
Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_1"
));
assert!(matches!(
completed_read,
Some(UiEvent::ToolCallCompleted { ref id, .. }) if id == "tool_2"
));
}
#[test]
fn tool_tracker_handles_two_concurrent_invocations_of_same_tool() {
let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
let s1 = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let s2 = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let c1 = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "bash".into(),
result: motosan_agent_tool::ToolResult::text("a"),
}),
&tracker,
);
let c2 = map_event(
AgentEvent::Core(CoreEvent::ToolCompleted {
name: "bash".into(),
result: motosan_agent_tool::ToolResult::text("b"),
}),
&tracker,
);
let id_s1 = match s1 {
Some(UiEvent::ToolCallStarted { id, .. }) => id,
other => panic!("{other:?}"),
};
let id_s2 = match s2 {
Some(UiEvent::ToolCallStarted { id, .. }) => id,
other => panic!("{other:?}"),
};
let id_c1 = match c1 {
Some(UiEvent::ToolCallCompleted { id, .. }) => id,
other => panic!("{other:?}"),
};
let id_c2 = match c2 {
Some(UiEvent::ToolCallCompleted { id, .. }) => id,
other => panic!("{other:?}"),
};
assert_eq!(id_s1, id_c1);
assert_eq!(id_s2, id_c2);
assert_ne!(id_s1, id_s2);
}
#[tokio::test]
async fn concurrent_send_user_message_returns_an_error_instead_of_hanging() {
let dir = tempfile::tempdir().unwrap();
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_llm(std::sync::Arc::new(ToolOnlyLlm {
turn: AtomicUsize::new(0),
}))
.build()
.await
.expect("build");
let mut first = Box::pin(app.send_user_message("first".into()));
let first_event = first.next().await;
assert!(matches!(first_event, Some(UiEvent::AgentTurnStarted)));
let second_events: Vec<UiEvent> = app.send_user_message("second".into()).collect().await;
assert_eq!(
second_events.len(),
1,
"expected immediate single error event, got: {second_events:?}"
);
assert!(matches!(
&second_events[0],
UiEvent::Error(msg) if msg.contains("single-turn-per-App")
));
}
#[test]
fn progress_events_only_claim_an_id_when_exactly_one_tool_is_pending() {
let tracker = Arc::new(Mutex::new(ToolCallTracker::default()));
assert_eq!(progress_event_id(&tracker), "tool_unknown");
let only = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "bash".into(),
}),
&tracker,
);
let only_id = match only {
Some(UiEvent::ToolCallStarted { id, .. }) => id,
other => panic!("{other:?}"),
};
assert_eq!(progress_event_id(&tracker), only_id);
let _second = map_event(
AgentEvent::Core(CoreEvent::ToolStarted {
name: "read".into(),
}),
&tracker,
);
assert_eq!(progress_event_id(&tracker), "tool_unknown");
}
#[tokio::test]
async fn builder_rejects_builtin_and_custom_tools_together() {
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let dir = tempfile::tempdir().unwrap();
let err = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_builtin_tools()
.with_custom_tools_factory(|_| Vec::new())
.build()
.await
{
Ok(_) => panic!("must reject conflicting tool configuration"),
Err(err) => err,
};
assert!(format!("{err}").contains("mutually exclusive"));
}
#[tokio::test]
async fn two_turns_in_same_session_share_history() {
#[derive(Default)]
struct CounterLlm {
turn: AtomicUsize,
}
#[async_trait]
impl LlmClient for CounterLlm {
async fn chat(
&self,
messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
let turn = self.turn.fetch_add(1, Ordering::SeqCst);
let answer = format!("turn-{turn}-saw-{}-messages", messages.len());
Ok(ChatOutput::new(LlmResponse::Message(answer)))
}
}
let tmp = tempfile::tempdir().unwrap();
let store = std::sync::Arc::new(motosan_agent_loop::FileSessionStore::new(
tmp.path().to_path_buf(),
));
let app = AppBuilder::new()
.with_settings(crate::settings::Settings::default())
.with_auth(crate::auth::Auth::default())
.with_cwd(tmp.path())
.with_builtin_tools()
.disable_context_discovery()
.with_llm(std::sync::Arc::new(CounterLlm::default()))
.with_session_store(store)
.build_with_session(None)
.await
.expect("build");
let _events1: Vec<UiEvent> = app.send_user_message("hi".into()).collect().await;
let events2: Vec<UiEvent> = app.send_user_message("again".into()).collect().await;
let saw_more_than_one = events2.iter().any(|e| {
matches!(
e,
UiEvent::AgentMessageComplete(t) if t.contains("messages") && !t.contains("saw-1-")
)
});
assert!(
saw_more_than_one,
"second turn should have seen history; events: {events2:?}"
);
}
}
#[cfg(test)]
mod skills_builder_tests {
use super::*;
use crate::skills::types::{Skill, SkillSource};
use std::path::PathBuf;
fn fixture() -> Skill {
Skill {
name: "x".into(),
description: "d".into(),
file_path: PathBuf::from("/x.md"),
base_dir: PathBuf::from("/"),
disable_model_invocation: false,
source: SkillSource::Global,
}
}
#[test]
fn with_skills_stores_skills() {
let b = AppBuilder::new().with_skills(vec![fixture()]);
assert_eq!(b.skills.len(), 1);
assert_eq!(b.skills[0].name, "x");
}
#[test]
fn without_skills_clears() {
let b = AppBuilder::new()
.with_skills(vec![fixture()])
.without_skills();
assert!(b.skills.is_empty());
}
}
#[cfg(test)]
mod mcp_builder_tests {
use super::*;
use motosan_agent_tool::Tool;
struct FakeTool;
impl Tool for FakeTool {
fn def(&self) -> motosan_agent_tool::ToolDef {
motosan_agent_tool::ToolDef {
name: "fake__echo".into(),
description: "test".into(),
input_schema: serde_json::json!({"type": "object"}),
}
}
fn call(
&self,
_args: serde_json::Value,
_ctx: &motosan_agent_tool::ToolContext,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = motosan_agent_tool::ToolResult> + Send + '_>,
> {
Box::pin(async { motosan_agent_tool::ToolResult::text("ok") })
}
}
#[test]
fn with_extra_tools_stores_tools() {
let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(FakeTool)];
let b = AppBuilder::new().with_extra_tools(tools);
assert_eq!(b.extra_tools.len(), 1);
}
}