mod cli;
use anyhow::{Context, Result};
use capo_agent::{AppBuilder, UiEvent};
use clap::Parser;
use cli::{Cli, Mode};
use futures::StreamExt;
use tracing_subscriber::EnvFilter;
/// Process entry point: installs the `color_eyre` report hooks (best effort),
/// builds a current-thread Tokio runtime and drives [`async_main`] on it.
///
/// # Errors
/// Fails if the runtime cannot be constructed, or with whatever error
/// `async_main` returns.
fn main() -> Result<()> {
    // Installation failure is deliberately ignored.
    let _ = color_eyre::install();

    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();
    let runtime = runtime_builder
        .build()
        .context("failed to start tokio runtime")?;

    runtime.block_on(async_main())
}
/// Picks the `permissions.toml` file to use.
///
/// Lookup order:
/// 1. `<agent_dir>/permissions.toml` (the canonical location),
/// 2. `$HOME/.config/capo/permissions.toml` (legacy),
/// 3. `<cwd>/.capo/permissions.toml` (legacy).
///
/// When a legacy file is selected a warning asks the user to move it to the
/// canonical location. If none of the candidates exists the canonical path is
/// returned anyway.
fn resolve_permissions_path(
    agent_dir: &std::path::Path,
    cwd: &std::path::Path,
) -> std::path::PathBuf {
    let canonical = agent_dir.join("permissions.toml");
    if canonical.exists() {
        return canonical;
    }

    // `var_os` keeps a HOME that is not valid UTF-8 usable. An empty HOME is
    // skipped because joining onto it would produce a *relative* path that is
    // silently resolved against the current directory.
    let home_config = std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .map(|home| std::path::PathBuf::from(home).join(".config/capo/permissions.toml"));
    let project_legacy = cwd.join(".capo").join("permissions.toml");

    // Candidates are probed lazily, in priority order.
    let legacy = home_config
        .into_iter()
        .chain(std::iter::once(project_legacy))
        .find(|candidate| candidate.exists());

    match legacy {
        Some(found) => {
            tracing::warn!(
                "permissions.toml found at {}; please mv to {}",
                found.display(),
                canonical.display()
            );
            found
        }
        None => canonical,
    }
}
/// Async entry point: parses the CLI, loads settings, auth and skills,
/// resolves an optional session to resume, connects the configured MCP
/// servers, and then dispatches to print mode or the interactive TUI
/// according to `cli.mode()`.
///
/// # Errors
/// Returns an error when the current directory cannot be read, settings or
/// auth fail to load, session resolution fails, the `App` cannot be built, or
/// the selected mode itself fails. Ctrl-C in print mode is reported as an
/// "interrupted" error.
async fn async_main() -> Result<()> {
let cli = Cli::parse();
init_tracing(cli.verbosity);
let cwd = std::env::current_dir().context("cannot read cwd")?;
let agent_dir = capo_agent::agent_dir();
let overrides = capo_agent::CliOverrides {
model: cli.model.clone(),
};
let mut settings = capo_agent::Settings::load(&overrides).context("failed to load settings")?;
// A provider given on the command line replaces the one from loaded settings.
if let Some(provider) = &cli.provider {
settings.model.provider = provider.clone();
}
let auth = capo_agent::Auth::load(&agent_dir).context("failed to load auth.json")?;
// Skill diagnostics are logged as warnings; the loaded count goes to stderr.
// NOTE(review): this calls `capo_agent::paths::agent_dir()` instead of reusing
// the `agent_dir` local — presumably the same directory; confirm.
let skills = if cli.no_skills {
Vec::new()
} else {
let result = capo_agent::skills::load_all(&cwd, &capo_agent::paths::agent_dir());
for d in &result.diagnostics {
tracing::warn!(target: "skills", path = %d.file_path.display(), "{}", d.message);
}
eprintln!(" skills: {} loaded", result.skills.len());
result.skills
};
let permissions_path = resolve_permissions_path(&agent_dir, &cwd);
let session_paths = capo_agent::SessionPaths::for_cwd(agent_dir.clone(), &cwd);
// `cli.session` takes precedence over `cli.continue_session`; with neither
// set no session is resumed.
let resume_session_id: Option<capo_agent::SessionId> = if let Some(s) = &cli.session {
Some(
capo_agent::SessionLookup::resolve(&session_paths, s)
.await
.context("--session resolution failed")?,
)
} else if cli.continue_session {
Some(
capo_agent::SessionLookup::most_recent(&session_paths)
.await
.context("--continue: no prior session in this directory")?,
)
} else {
None
};
eprintln!(
" provider: {} | model: {}",
settings.model.provider, settings.model.name
);
if let Some(id) = &resume_session_id {
eprintln!(" resuming session: {}", id.as_str());
}
// A failing MCP config load is reported on stderr and replaced by the default
// config with no diagnostics, so startup continues.
let (mcp_cfg, mcp_diags) =
match capo_agent::mcp::load_config(&cwd, &capo_agent::paths::agent_dir(), &|k| {
std::env::var(k).ok()
}) {
Ok(out) => out,
Err(err) => {
eprintln!(" mcp: config load failed: {err}");
(capo_agent::mcp::McpConfig::default(), Vec::new())
}
};
for d in &mcp_diags {
eprintln!(" mcp: {d}");
}
let started = capo_agent::mcp::connect_all(&mcp_cfg, &capo_agent::paths::agent_dir()).await;
eprintln!(" mcp: {} server(s) connected", started.len());
// Gather the tools of every connected server; a server whose adapter call
// fails is logged and contributes no tools.
let mcp_tools: Vec<std::sync::Arc<dyn motosan_agent_tool::Tool>> = {
let mut acc = Vec::new();
for s in &started {
match motosan_agent_loop::mcp::McpToolAdapter::from_server(std::sync::Arc::clone(
&s.server,
))
.await
{
Ok(mut tools) => acc.append(&mut tools),
Err(e) => {
tracing::error!(target: "mcp", server = %s.name, "list_tools failed: {e}")
}
}
}
acc
};
let mcp_pairs = capo_agent::mcp::into_pairs(started);
// NOTE(review): the result of `ensure_bucket()` is discarded, so a failure
// here is not reported — confirm this is intended as best effort.
session_paths.ensure_bucket().ok();
let store: std::sync::Arc<dyn motosan_agent_loop::SessionStore> = std::sync::Arc::new(
motosan_agent_loop::FileSessionStore::new(session_paths.bucket_dir.clone()),
);
match cli.mode() {
Mode::Print(prompt) => {
let mut builder = AppBuilder::new()
.with_settings(settings)
.with_auth(auth)
.with_cwd(&cwd)
.with_builtin_tools()
.with_permissions_config(permissions_path)
.with_session_store(store)
.with_autocompact()
.with_skills(skills.clone());
if cli.no_context_files {
builder = builder.disable_context_discovery();
}
builder = builder
.with_extra_tools(mcp_tools.clone())
.with_mcp_servers(mcp_pairs.clone());
// The MCP servers were connected before the App existed, so on a build
// failure they are disconnected directly through `mcp_pairs`.
let app = match builder.build_with_session(resume_session_id).await {
Ok(app) => app,
Err(err) => {
disconnect_mcp_pairs(&mcp_pairs).await;
return Err(err).context("failed to build App");
}
};
// Race the prompt against Ctrl-C; in both cases the MCP servers are
// disconnected before the result is returned.
let result = tokio::select! {
res = run_print_mode(&app, prompt) => res,
_ = tokio::signal::ctrl_c() => {
eprintln!("\n shutting down (ctrl-C)…");
Err(anyhow::anyhow!("interrupted"))
}
};
app.disconnect_mcp().await;
result
}
Mode::InteractiveStub => {
run_interactive(
settings,
auth,
cwd,
permissions_path,
store,
resume_session_id,
skills,
mcp_tools,
mcp_pairs,
&cli,
)
.await
}
}
}
/// Runs the interactive TUI.
///
/// Builds the `App` wired to a UI event channel, seeds the TUI state (footer,
/// file index, session list, optional resume picker, replayed transcript),
/// spawns [`forward_commands`] to service TUI commands, and then runs the TUI
/// until it exits or Ctrl-C is received. MCP servers are disconnected on every
/// exit path that follows their connection.
///
/// # Errors
/// Fails when the `App` cannot be built, the session history cannot be loaded,
/// the TUI reports an error, or the run is interrupted with Ctrl-C.
// Every argument is produced by `async_main`'s startup sequence and passed
// through as-is, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
async fn run_interactive(
    settings: capo_agent::Settings,
    auth: capo_agent::Auth,
    cwd: std::path::PathBuf,
    permissions_path: std::path::PathBuf,
    store: std::sync::Arc<dyn motosan_agent_loop::SessionStore>,
    resume_session_id: Option<capo_agent::SessionId>,
    skills: Vec<capo_agent::Skill>,
    mcp_tools: Vec<std::sync::Arc<dyn motosan_agent_tool::Tool>>,
    mcp_pairs: Vec<(
        String,
        std::sync::Arc<dyn motosan_agent_loop::mcp::McpServer>,
    )>,
    cli: &Cli,
) -> Result<()> {
    use tokio::sync::mpsc;

    // Agent -> TUI events, and TUI -> agent commands.
    let (event_tx, event_rx) = mpsc::channel::<capo_agent::UiEvent>(256);
    let (command_tx, command_rx) = mpsc::channel::<capo_agent::Command>(64);

    let base = AppBuilder::new()
        .with_settings(settings.clone())
        .with_auth(auth)
        .with_cwd(&cwd)
        .with_builtin_tools()
        .with_permissions_config(permissions_path)
        .with_ui_channel(event_tx.clone())
        .with_session_store(std::sync::Arc::clone(&store))
        .with_autocompact()
        .with_skills(skills.clone());
    let base = if cli.no_context_files {
        base.disable_context_discovery()
    } else {
        base
    };
    let configured = base
        .with_extra_tools(mcp_tools.clone())
        .with_mcp_servers(mcp_pairs.clone());

    // No App exists yet on build failure, so disconnect via the raw pairs.
    let app = match configured.build_with_session(resume_session_id).await {
        Ok(built) => built,
        Err(err) => {
            disconnect_mcp_pairs(&mcp_pairs).await;
            return Err(err).context("failed to build App");
        }
    };

    let transcript = match app.session_history().await {
        Ok(loaded) => loaded,
        Err(err) => {
            app.disconnect_mcp().await;
            return Err(err).context("failed to load session history for transcript replay");
        }
    };

    let mut tui_state = capo_tui::state::AppState::default();
    tui_state.footer.cwd = cwd.display().to_string();
    tui_state.footer.model = settings.model.name.clone();
    tui_state.footer.skills_loaded = skills.len();
    tui_state.file_index = capo_tui::file_index::FileIndex::build(&cwd);

    // A failing session listing simply leaves the session list untouched.
    if let Ok(metas) = store.list_meta().await {
        tui_state.sessions = metas
            .into_iter()
            .map(|m| {
                let short: String = m.session_id.chars().take(10).collect();
                let name = m.name.unwrap_or_else(|| "(unnamed)".to_string());
                let label = format!("{short} {} {name}", m.updated_at_ms);
                capo_tui::state::SessionSummary {
                    label,
                    id: m.session_id,
                }
            })
            .collect();
    }

    if cli.resume {
        let picker = capo_tui::state::ResumePickerState::default();
        tui_state.bottom_pane = Some(capo_tui::state::BottomPane::ResumePicker(picker));
    }

    // Replay non-empty user/assistant messages; other roles are skipped.
    for entry in &transcript {
        let text = entry.text();
        if !text.is_empty() {
            let block = match entry.role() {
                motosan_agent_loop::Role::User => capo_tui::state::MessageBlock::User(text),
                motosan_agent_loop::Role::Assistant => {
                    capo_tui::state::MessageBlock::Assistant(text)
                }
                _ => continue,
            };
            tui_state.messages.push(block);
        }
    }

    let app = std::sync::Arc::new(app);
    let forwarder = forward_commands(std::sync::Arc::clone(&app), event_tx.clone(), command_rx);
    tokio::spawn(forwarder);

    // Adapt the receiver into a stream for the TUI.
    let ui_stream = Box::pin(futures::stream::unfold(
        event_rx,
        |mut receiver| async move { receiver.recv().await.map(|event| (event, receiver)) },
    ));

    let outcome = tokio::select! {
        res = capo_tui::run_tui(tui_state, ui_stream, command_tx) => {
            res.map(|_| ()).map_err(|e| anyhow::anyhow!("TUI error: {e}"))
        }
        _ = tokio::signal::ctrl_c() => {
            eprintln!("\n shutting down (ctrl-C)…");
            Err(anyhow::anyhow!("interrupted"))
        }
    };

    app.disconnect_mcp().await;
    outcome
}
/// Disconnects MCP servers that were connected before an `App` build failed.
///
/// Servers are handled one after another, each with a 2-second budget. The
/// value returned by `disconnect()` is not inspected; only whether the call
/// finished within the budget is logged.
async fn disconnect_mcp_pairs(
    servers: &[(
        String,
        std::sync::Arc<dyn motosan_agent_loop::mcp::McpServer>,
    )],
) {
    const DISCONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);

    for (name, server) in servers {
        match tokio::time::timeout(DISCONNECT_TIMEOUT, server.disconnect()).await {
            Ok(_) => {
                tracing::debug!(target: "mcp", server = %name, "disconnected after build failure");
            }
            // Do not claim a disconnect that did not complete in time.
            Err(_) => {
                tracing::warn!(
                    target: "mcp",
                    server = %name,
                    "disconnect timed out after build failure"
                );
            }
        }
    }
}
async fn forward_commands(
app: std::sync::Arc<capo_agent::App>,
ui_tx: tokio::sync::mpsc::Sender<capo_agent::UiEvent>,
mut cmd_rx: tokio::sync::mpsc::Receiver<capo_agent::Command>,
) {
use capo_agent::Command;
let mut active_turn: Option<tokio::task::JoinHandle<()>> = None;
while let Some(cmd) = cmd_rx.recv().await {
if active_turn
.as_ref()
.map(|handle| handle.is_finished())
.unwrap_or(false)
{
active_turn = None;
}
match cmd {
Command::SendUserMessage(text) => {
if active_turn.is_some() {
tracing::debug!("turn already running; ignoring extra SendUserMessage");
continue;
}
let app = std::sync::Arc::clone(&app);
let ui_tx = ui_tx.clone();
active_turn = Some(tokio::spawn(async move {
let mut stream = Box::pin(app.send_user_message(text));
while let Some(ev) = stream.next().await {
if ui_tx.send(ev).await.is_err() {
break;
}
}
}));
}
Command::CancelAgent => app.cancel(),
Command::Quit => {
if let Some(handle) = active_turn.take() {
handle.abort();
}
return;
}
Command::ResolvePermission(resolution) => {
use capo_agent::PermissionChoice;
if resolution.choice == PermissionChoice::AllowSession {
let key = capo_agent::permissions::SessionCache::key(
&resolution.tool,
&resolution.args,
);
app.permissions_cache()
.insert(key, capo_agent::Decision::Allowed);
}
}
Command::RunInlineBash {
command,
send_to_llm,
} => {
if send_to_llm && active_turn.is_some() {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(
"cannot run !cmd while an agent turn is running".into(),
))
.await;
continue;
}
let output = run_inline_bash(&command).await;
if send_to_llm {
let composed =
format!("I ran `{command}` in the shell. Output:\n\n```\n{output}\n```");
let app = std::sync::Arc::clone(&app);
let ui_tx = ui_tx.clone();
active_turn = Some(tokio::spawn(async move {
let mut stream = Box::pin(app.send_user_message(composed));
while let Some(ev) = stream.next().await {
if ui_tx.send(ev).await.is_err() {
break;
}
}
}));
} else {
let _ = ui_tx
.send(capo_agent::UiEvent::InlineBashOutput { command, output })
.await;
}
}
Command::Compact => {
if active_turn.is_some() {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(
"cannot compact while an agent turn is running".into(),
))
.await;
continue;
}
if let Err(err) = app.compact().await {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(err.to_string()))
.await;
}
}
Command::NewSession => {
if active_turn.is_some() {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(
"finish or cancel the current turn before /new".into(),
))
.await;
continue;
}
match app.new_session().await {
Ok(()) => {
let _ = ui_tx
.send(capo_agent::UiEvent::SessionReplaced(Vec::new()))
.await;
}
Err(err) => {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(format!("/new failed: {err}")))
.await;
}
}
}
Command::LoadSession(id) => {
if active_turn.is_some() {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(
"finish or cancel the current turn before /resume".into(),
))
.await;
continue;
}
match app.load_session(&id).await {
Ok(()) => {
let history = app.session_history().await.unwrap_or_default();
let _ = ui_tx
.send(capo_agent::UiEvent::SessionReplaced(history))
.await;
}
Err(err) => {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(format!("/resume failed: {err}")))
.await;
}
}
}
Command::SwitchModel(model) => {
if active_turn.is_some() {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(
"finish or cancel the current turn before /model".into(),
))
.await;
continue;
}
match app.switch_model(&model).await {
Ok(()) => {
let _ = ui_tx.send(capo_agent::UiEvent::ModelSwitched(model)).await;
}
Err(err) => {
let _ = ui_tx
.send(capo_agent::UiEvent::Error(format!("/model failed: {err}")))
.await;
}
}
}
}
}
}
/// Runs `command` through the shell with a 30-second time limit and returns
/// the text produced by [`run_inline_bash_with_timeout`].
async fn run_inline_bash(command: &str) -> String {
    let limit = std::time::Duration::from_secs(30);
    run_inline_bash_with_timeout(command, limit).await
}
/// Runs `command` via `sh -c`, waiting at most `timeout`.
///
/// Returns stdout followed by stderr (both decoded lossily as UTF-8). Output
/// longer than 30 KiB is cut at a character boundary and marked as truncated;
/// empty output becomes `"(no output)"`. Spawn failures and timeouts are
/// returned as descriptive text rather than as errors. The child is
/// configured with `kill_on_drop`.
async fn run_inline_bash_with_timeout(command: &str, timeout: std::time::Duration) -> String {
    use tokio::process::Command as ProcCommand;
    const MAX: usize = 30 * 1024;

    let mut shell = ProcCommand::new("sh");
    shell.arg("-c").arg(command).kill_on_drop(true);

    let finished = match tokio::time::timeout(timeout, shell.output()).await {
        Err(_) => return format!("command timed out after {}s", timeout.as_secs()),
        Ok(Err(e)) => return format!("failed to run command: {e}"),
        Ok(Ok(out)) => out,
    };

    let mut combined = String::from_utf8_lossy(&finished.stdout).into_owned();
    combined.push_str(&String::from_utf8_lossy(&finished.stderr));

    if combined.len() > MAX {
        truncate_utf8(&mut combined, MAX);
        combined.push_str("\n… (output truncated)");
    }

    if combined.is_empty() {
        return "(no output)".to_string();
    }
    combined
}
/// Shortens `s` to at most `max` bytes without splitting a UTF-8 character:
/// the cut is moved back to the nearest character boundary at or below `max`.
/// Strings that already fit are left unchanged.
fn truncate_utf8(s: &mut String, max: usize) {
    let limit = s.len().min(max);
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=limit)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    s.truncate(cut);
}
async fn run_print_mode(app: &capo_agent::App, prompt: String) -> Result<()> {
let events: Vec<UiEvent> = app.send_user_message(prompt).collect().await;
let rendered = render_print_mode_events(&events)?;
print!("{}", rendered.stdout);
eprint!("{}", rendered.stderr);
Ok(())
}
#[derive(Debug, Default, PartialEq, Eq)]
struct RenderedPrintMode {
stdout: String,
stderr: String,
}
fn render_print_mode_events(events: &[UiEvent]) -> Result<RenderedPrintMode> {
let mut rendered = RenderedPrintMode::default();
let mut saw_text_delta = false;
for event in events {
match event {
UiEvent::AgentTurnStarted => {}
UiEvent::AgentThinking => rendered.stderr.push_str("[thinking]\n"),
UiEvent::AgentTextDelta(chunk) => {
saw_text_delta = true;
rendered.stdout.push_str(chunk);
}
UiEvent::AgentMessageComplete(text) => {
if !saw_text_delta && !text.is_empty() {
rendered.stdout.push_str(text);
rendered.stdout.push('\n');
}
}
UiEvent::ToolCallStarted { name, args, .. } => {
rendered
.stderr
.push_str(&format!("[tool:start] {name} {args}\n"));
}
UiEvent::ToolCallProgress { .. } => {}
UiEvent::ToolCallCompleted { result, .. } => {
if result.is_error {
rendered
.stderr
.push_str(&format!("[tool:error] {}\n", result.text));
} else {
rendered.stderr.push_str("[tool:done]\n");
}
}
UiEvent::AgentTurnComplete => {
if saw_text_delta {
rendered.stdout.push('\n');
}
}
UiEvent::PermissionRequested { .. } => {
}
UiEvent::InlineBashOutput { .. } => {}
UiEvent::SessionReplaced(_) | UiEvent::ModelSwitched(_) => {}
UiEvent::Error(msg) => {
rendered.stderr.push_str(&format!("[error] {msg}\n"));
return Err(anyhow::anyhow!("agent error: {msg}"));
}
}
}
Ok(rendered)
}
/// Installs the global tracing subscriber, writing compact output without
/// targets to stderr.
///
/// The filter is taken from the environment when `EnvFilter` can build one
/// from it; otherwise the level follows `verbosity`: 0 is `info`, 1 is
/// `debug`, anything higher is `trace`.
fn init_tracing(verbosity: u8) {
    let fallback_level = if verbosity == 0 {
        "info"
    } else if verbosity == 1 {
        "debug"
    } else {
        "trace"
    };

    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new(fallback_level),
    };

    let subscriber = tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_writer(std::io::stderr)
        .with_target(false)
        .compact();
    subscriber.init();
}
// Unit tests for print-mode rendering, output truncation, inline shell
// execution and the command forwarder.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use capo_agent::{
AppBuilder, Command, Config, PermissionChoice, PermissionResolution, UiToolResult,
};
use motosan_agent_loop::{ChatOutput, LlmClient, Message};
use motosan_agent_tool::ToolDef;
use super::*;
// Streamed deltas followed by the complete message must print the text once.
#[test]
fn print_mode_does_not_duplicate_streamed_final_text() {
let events = vec![
UiEvent::AgentTurnStarted,
UiEvent::AgentThinking,
UiEvent::AgentTextDelta("hello".into()),
UiEvent::AgentTextDelta(" world".into()),
UiEvent::AgentMessageComplete("hello world".into()),
UiEvent::AgentTurnComplete,
];
let rendered = render_print_mode_events(&events)
.unwrap_or_else(|e| panic!("unexpected render error: {e}"));
assert_eq!(rendered.stdout, "hello world\n");
assert_eq!(rendered.stderr, "[thinking]\n");
}
// Without deltas, the complete message is the only source of stdout text.
#[test]
fn print_mode_prints_non_streamed_final_text_once() {
let events = vec![
UiEvent::AgentTurnStarted,
UiEvent::AgentMessageComplete("hello world".into()),
UiEvent::AgentTurnComplete,
];
let rendered = render_print_mode_events(&events)
.unwrap_or_else(|e| panic!("unexpected render error: {e}"));
assert_eq!(rendered.stdout, "hello world\n");
assert!(rendered.stderr.is_empty());
}
// Tool start/completion markers go to stderr.
#[test]
fn print_mode_renders_tool_events_to_stderr() {
let events = vec![
UiEvent::ToolCallStarted {
id: "tool_1".into(),
name: "bash".into(),
args: serde_json::json!({"command": "echo hi"}),
},
UiEvent::ToolCallCompleted {
id: "tool_1".into(),
result: UiToolResult {
is_error: false,
text: "ok".into(),
},
},
];
let rendered = render_print_mode_events(&events)
.unwrap_or_else(|e| panic!("unexpected render error: {e}"));
assert!(rendered.stderr.contains("[tool:start] bash"));
assert!(rendered.stderr.contains("[tool:done]"));
}
// The input mixes 1-byte and 4-byte characters so the byte limit lands
// inside a character.
#[test]
fn truncate_utf8_never_splits_multibyte_characters() {
let mut text = "a😀".repeat(10_000);
truncate_utf8(&mut text, 30 * 1024);
assert!(text.len() <= 30 * 1024);
assert!(text.is_char_boundary(text.len()));
}
// A command outliving the timeout is reported as timed out.
#[tokio::test]
async fn inline_bash_times_out() {
let output = run_inline_bash_with_timeout("sleep 5", Duration::from_millis(20)).await;
assert!(output.contains("timed out"), "got: {output}");
}
// LLM stub whose `chat` never completes, keeping a turn running indefinitely.
struct PendingLlm;
#[async_trait]
impl LlmClient for PendingLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
futures::future::pending::<motosan_agent_loop::Result<ChatOutput>>().await
}
}
// A permission resolution must reach the session cache even though a turn is
// still in flight.
#[tokio::test]
async fn command_forwarder_handles_resolution_while_turn_is_running() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("tempdir: {e}"));
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_llm(Arc::new(PendingLlm))
.build()
.await
{
Ok(app) => Arc::new(app),
Err(err) => panic!("build: {err}"),
};
let (ui_tx, _ui_rx) = tokio::sync::mpsc::channel::<UiEvent>(16);
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<Command>(16);
let forwarder = tokio::spawn(forward_commands(Arc::clone(&app), ui_tx, cmd_rx));
if cmd_tx
.send(Command::SendUserMessage("hi".into()))
.await
.is_err()
{
panic!("command channel closed");
}
tokio::task::yield_now().await;
let args = serde_json::json!({"command": "echo hi"});
let key = capo_agent::permissions::SessionCache::key("bash", &args);
if cmd_tx
.send(Command::ResolvePermission(PermissionResolution {
tool: "bash".into(),
args,
choice: PermissionChoice::AllowSession,
}))
.await
.is_err()
{
panic!("command channel closed");
}
// Poll the cache until the key shows up or the 200 ms budget runs out.
let observed = tokio::time::timeout(Duration::from_millis(200), async {
loop {
if app.permissions_cache().get(&key).is_some() {
break true;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap_or(false);
if cmd_tx.send(Command::Quit).await.is_err() {
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), forwarder).await {
Ok(Ok(())) => {}
Ok(Err(err)) => panic!("forwarder join failed: {err}"),
Err(_) => panic!("forwarder did not exit"),
}
assert!(observed, "session cache was not updated while turn ran");
}
// `RunInlineBash` with `send_to_llm` during a running turn must be rejected
// before the shell command executes; the marker file proves it did not run.
#[tokio::test]
async fn command_forwarder_rejects_inline_bash_send_before_running_shell() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("tempdir: {e}"));
let marker = dir.path().join("inline-bash-should-not-run");
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_llm(Arc::new(PendingLlm))
.build()
.await
{
Ok(app) => Arc::new(app),
Err(err) => panic!("build: {err}"),
};
let (ui_tx, mut ui_rx) = tokio::sync::mpsc::channel::<UiEvent>(16);
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<Command>(16);
let forwarder = tokio::spawn(forward_commands(app, ui_tx, cmd_rx));
if cmd_tx
.send(Command::SendUserMessage("first".into()))
.await
.is_err()
{
panic!("command channel closed");
}
// Wait until the first turn has visibly started.
match tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await {
Ok(Some(UiEvent::AgentTurnStarted)) => {}
Ok(Some(other)) => panic!("expected turn start, got {other:?}"),
Ok(None) => panic!("ui channel closed"),
Err(_) => panic!("timed out waiting for first turn"),
}
let command = format!("touch {}", marker.display());
if cmd_tx
.send(Command::RunInlineBash {
command,
send_to_llm: true,
})
.await
.is_err()
{
panic!("command channel closed");
}
// Skip unrelated events until the rejection arrives or the wait times out.
let mut saw_rejection = false;
while let Ok(Some(event)) =
tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await
{
if matches!(event, UiEvent::Error(message) if message.contains("cannot run !cmd")) {
saw_rejection = true;
break;
}
}
if cmd_tx.send(Command::Quit).await.is_err() {
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), forwarder).await {
Ok(Ok(())) => {}
Ok(Err(err)) => panic!("forwarder join failed: {err}"),
Err(_) => panic!("forwarder did not exit"),
}
assert!(saw_rejection, "inline bash send was not rejected");
assert!(!marker.exists(), "inline bash command ran before rejection");
}
// `Compact` during a running turn yields an error event instead of compacting.
#[tokio::test]
async fn command_forwarder_rejects_compact_while_turn_is_running() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("tempdir: {e}"));
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_llm(Arc::new(PendingLlm))
.build()
.await
{
Ok(app) => Arc::new(app),
Err(err) => panic!("build: {err}"),
};
let (ui_tx, mut ui_rx) = tokio::sync::mpsc::channel::<UiEvent>(16);
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<Command>(16);
let forwarder = tokio::spawn(forward_commands(app, ui_tx, cmd_rx));
if cmd_tx
.send(Command::SendUserMessage("first".into()))
.await
.is_err()
{
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await {
Ok(Some(UiEvent::AgentTurnStarted)) => {}
Ok(Some(other)) => panic!("expected turn start, got {other:?}"),
Ok(None) => panic!("ui channel closed"),
Err(_) => panic!("timed out waiting for first turn"),
}
if cmd_tx.send(Command::Compact).await.is_err() {
panic!("command channel closed");
}
// The test expects the running turn's thinking event to arrive before the
// rejection.
match tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await {
Ok(Some(UiEvent::AgentThinking)) => {}
Ok(Some(other)) => panic!("expected thinking before compact rejection, got {other:?}"),
Ok(None) => panic!("ui channel closed"),
Err(_) => panic!("timed out waiting for thinking event"),
}
match tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await {
Ok(Some(UiEvent::Error(message))) => {
assert!(message.contains("cannot compact while an agent turn is running"));
}
Ok(Some(other)) => panic!("expected compact rejection, got {other:?}"),
Ok(None) => panic!("ui channel closed"),
Err(_) => panic!("timed out waiting for compact rejection"),
}
if cmd_tx.send(Command::Quit).await.is_err() {
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), forwarder).await {
Ok(Ok(())) => {}
Ok(Err(err)) => panic!("forwarder join failed: {err}"),
Err(_) => panic!("forwarder did not exit"),
}
}
// LLM stub that immediately answers every chat with the message "done".
struct DoneLlm;
#[async_trait]
impl LlmClient for DoneLlm {
async fn chat(
&self,
_messages: &[Message],
_tools: &[ToolDef],
) -> motosan_agent_loop::Result<ChatOutput> {
Ok(ChatOutput::new(motosan_agent_loop::LlmResponse::Message(
"done".into(),
)))
}
}
// A compaction error must be surfaced with the "compaction failed" text
// appearing exactly once in the message.
#[tokio::test]
async fn command_forwarder_reports_compact_errors_once() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("tempdir: {e}"));
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_llm(Arc::new(DoneLlm))
.build()
.await
{
Ok(app) => Arc::new(app),
Err(err) => panic!("build: {err}"),
};
// Run a few complete turns before compaction is requested.
for i in 0..4 {
let _: Vec<_> = app.send_user_message(format!("turn {i}")).collect().await;
}
let (ui_tx, mut ui_rx) = tokio::sync::mpsc::channel::<UiEvent>(16);
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<Command>(16);
let forwarder = tokio::spawn(forward_commands(app, ui_tx, cmd_rx));
if cmd_tx.send(Command::Compact).await.is_err() {
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await {
Ok(Some(UiEvent::Error(message))) => {
assert_eq!(message.matches("compaction failed").count(), 1, "{message}");
}
Ok(Some(other)) => panic!("expected compact error, got {other:?}"),
Ok(None) => panic!("ui channel closed"),
Err(_) => panic!("timed out waiting for compact error"),
}
if cmd_tx.send(Command::Quit).await.is_err() {
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), forwarder).await {
Ok(Ok(())) => {}
Ok(Err(err)) => panic!("forwarder join failed: {err}"),
Err(_) => panic!("forwarder did not exit"),
}
}
// A second `SendUserMessage` during a running turn must be dropped by the
// forwarder: no new turn starts and no "another turn" error is emitted.
#[tokio::test]
async fn command_forwarder_ignores_second_send_while_turn_is_running() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("tempdir: {e}"));
let mut cfg = Config::default();
cfg.anthropic.api_key = Some("sk-unused".into());
let app = match AppBuilder::new()
.with_config(cfg)
.with_cwd(dir.path())
.with_llm(Arc::new(PendingLlm))
.build()
.await
{
Ok(app) => Arc::new(app),
Err(err) => panic!("build: {err}"),
};
let (ui_tx, mut ui_rx) = tokio::sync::mpsc::channel::<UiEvent>(16);
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<Command>(16);
let forwarder = tokio::spawn(forward_commands(app, ui_tx, cmd_rx));
if cmd_tx
.send(Command::SendUserMessage("first".into()))
.await
.is_err()
{
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), ui_rx.recv()).await {
Ok(Some(UiEvent::AgentTurnStarted)) => {}
Ok(Some(other)) => panic!("expected turn start, got {other:?}"),
Ok(None) => panic!("ui channel closed"),
Err(_) => panic!("timed out waiting for first turn"),
}
if cmd_tx
.send(Command::SendUserMessage("second".into()))
.await
.is_err()
{
panic!("command channel closed");
}
// Drain events until the channel stays quiet for 25 ms.
let mut saw_concurrency_error = false;
let mut extra_turn_starts = 0;
while let Ok(Some(event)) =
tokio::time::timeout(Duration::from_millis(25), ui_rx.recv()).await
{
match event {
UiEvent::Error(message) if message.contains("another turn") => {
saw_concurrency_error = true;
}
UiEvent::AgentTurnStarted => extra_turn_starts += 1,
_ => {}
}
}
if cmd_tx.send(Command::Quit).await.is_err() {
panic!("command channel closed");
}
match tokio::time::timeout(Duration::from_millis(200), forwarder).await {
Ok(Ok(())) => {}
Ok(Err(err)) => panic!("forwarder join failed: {err}"),
Err(_) => panic!("forwarder did not exit"),
}
assert!(!saw_concurrency_error, "second send leaked to App");
assert_eq!(extra_turn_starts, 0, "second send started another turn");
}
}