use crate::repl::runner::Runner;
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use futures_util::StreamExt;
use std::{collections::HashMap, ffi::OsString};
pub mod attach;
pub mod config;
pub mod console;
pub mod external;
mod foreground;
mod service;
#[derive(Parser, Debug)]
#[command(name = "crabtalk", about = "Crabtalk — AI agent platform")]
pub struct Cli {
#[arg(long, group = "daemon_op")]
pub start: bool,
#[arg(long, group = "daemon_op")]
pub restart: bool,
#[arg(long, group = "daemon_op")]
pub foreground: bool,
#[arg(long, group = "daemon_op")]
pub stop: bool,
#[arg(long, group = "daemon_op")]
pub reload: bool,
#[arg(long, group = "daemon_op")]
pub events: bool,
#[arg(short, long, action = clap::ArgAction::Count)]
pub verbose: u8,
#[arg(long)]
pub tcp: bool,
#[arg(long, default_value = "crab")]
pub agent: String,
#[command(subcommand)]
pub command: Option<Command>,
}
impl Cli {
pub fn log_filter(&self) -> Option<&'static str> {
if self.foreground && self.verbose > 0 {
Some(match self.verbose {
1 => "crabtalk=info",
2 => "crabtalk=debug",
_ => "crabtalk=trace",
})
} else if matches!(self.command, Some(Command::Pull { .. })) {
Some("crabtalk=info")
} else {
None
}
}
pub async fn run(self) -> Result<()> {
if self.start || self.restart {
if self.restart {
let _ = service::uninstall();
}
daemon::config::scaffold_config_dir(&wcore::paths::CONFIG_DIR)?;
let config_path = wcore::paths::CONFIG_DIR.join(wcore::paths::CONFIG_FILE);
let config = daemon::DaemonConfig::load(&config_path)?;
if config.provider.is_empty() {
attach::setup_provider(&config_path)?;
}
return service::install(self.verbose.max(1), self.restart);
}
if self.foreground {
return foreground::start().await;
}
if self.stop {
return service::uninstall();
}
if self.reload {
let mut runner = connect_default_or_tcp(self.tcp).await?;
runner.reload().await?;
println!("daemon reloaded");
return Ok(());
}
if self.events {
use wcore::protocol::message::AgentEventKind;
let mut runner = connect_default_or_tcp(self.tcp).await?;
let stream = runner.subscribe_events();
tokio::pin!(stream);
let mut buffers: HashMap<(String, String), String> = HashMap::new();
while let Some(Ok(event)) = stream.next().await {
let key = (event.agent.clone(), event.sender.clone());
match AgentEventKind::try_from(event.kind) {
Ok(AgentEventKind::TextStart | AgentEventKind::ThinkingStart) => {
buffers.insert(key, String::new());
}
Ok(AgentEventKind::TextDelta | AgentEventKind::ThinkingDelta) => {
if let Some(buf) = buffers.get_mut(&key) {
buf.push_str(&event.content);
}
}
Ok(end @ (AgentEventKind::TextEnd | AgentEventKind::ThinkingEnd)) => {
if let Some(text) = buffers.remove(&key) {
let label = if end == AgentEventKind::ThinkingEnd {
"thinking"
} else {
"text"
};
let trimmed = truncate_for_display(&text, 80);
println!("[{}] {label}: {trimmed}", event.agent);
}
}
_ => {
println!(
"[{}] {} (sender {})",
event.agent, event.content, event.sender
);
}
}
}
return Ok(());
}
match self.command {
None => {
let runner = connect_or_start(self.tcp, self.verbose.max(1)).await?;
let mut repl = crate::repl::ChatRepl::new(runner, self.agent)?;
repl.run().await
}
Some(Command::Resume { file }) => {
let runner = connect_default_or_tcp(self.tcp).await?;
if let Some(path) = file {
let mut repl = crate::repl::ChatRepl::new(runner, self.agent)?;
repl.resume(std::path::PathBuf::from(path)).await
} else {
let cmd = console::Console;
let selected = cmd.run(runner).await?;
if let Some(path) = selected {
let runner = connect_default_or_tcp(self.tcp).await?;
let mut repl = crate::repl::ChatRepl::new(runner, self.agent)?;
repl.resume(path).await
} else {
Ok(())
}
}
}
Some(Command::Pull { plugin, force }) => {
use std::io::Write;
use wcore::protocol::message::plugin_event;
let mut runner = connect_default_or_tcp(self.tcp).await?;
let mut stream = std::pin::pin!(runner.install_plugin(&plugin, "", "", force));
let mut last_was_output = false;
while let Some(event) = stream.next().await {
match event? {
plugin_event::Event::Step(s) => {
if last_was_output {
println!();
last_was_output = false;
}
println!(" {}", s.message);
}
plugin_event::Event::SetupOutput(o) => {
print!("\r {}", o.content);
let _ = std::io::stdout().flush();
last_was_output = true;
}
plugin_event::Event::Warning(w) => {
if last_was_output {
println!();
last_was_output = false;
}
eprintln!(" warning: {}", w.message);
}
plugin_event::Event::Done(d) => {
if last_was_output {
println!();
}
if !d.error.is_empty() {
anyhow::bail!("{}", d.error);
}
}
}
}
println!("Done: {plugin}");
Ok(())
}
Some(Command::Rm { plugin }) => {
let mut runner = connect_default_or_tcp(self.tcp).await?;
let mut stream = std::pin::pin!(runner.uninstall_plugin(&plugin));
while let Some(event) = stream.next().await {
match event? {
wcore::protocol::message::plugin_event::Event::Step(s) => {
println!(" {}", s.message);
}
wcore::protocol::message::plugin_event::Event::Warning(w) => {
eprintln!(" warning: {}", w.message);
}
wcore::protocol::message::plugin_event::Event::Done(d) => {
if !d.error.is_empty() {
anyhow::bail!("{}", d.error);
}
}
_ => {}
}
}
println!("Done: {plugin}");
Ok(())
}
Some(Command::Config(cmd)) => cmd.run().await,
Some(Command::Ps) => {
let run_dir = &*wcore::paths::RUN_DIR;
let mut found = false;
if let Ok(entries) = std::fs::read_dir(run_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("port") {
continue;
}
let Some(name) = path.file_stem().and_then(|s| s.to_str()) else {
continue;
};
if name == "crabtalk" {
continue;
}
if let Ok(contents) = std::fs::read_to_string(&path)
&& let Ok(port) = contents.trim().parse::<u16>()
{
let alive = std::net::TcpStream::connect(("127.0.0.1", port)).is_ok();
let status = if alive { "running" } else { "stale" };
println!("{name}\t:{port}\t{status}");
found = true;
}
}
}
if !found {
println!("no services running");
}
Ok(())
}
Some(Command::Logs { tail_args }) => crabtalk_command::view_logs("daemon", &tail_args),
Some(Command::External(args)) => external::run(args),
}
}
}
#[derive(Subcommand, Debug)]
pub enum Command {
Pull {
plugin: String,
#[arg(long)]
force: bool,
},
Rm {
plugin: String,
},
Config(config::Config),
Ps,
Logs {
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
tail_args: Vec<String>,
},
Resume {
file: Option<String>,
},
#[command(external_subcommand)]
External(Vec<OsString>),
}
async fn connect_or_start(use_tcp: bool, verbose: u8) -> Result<Runner> {
match connect_default_or_tcp(use_tcp).await {
Ok(runner) => Ok(runner),
Err(e) => {
tracing::debug!("daemon not reachable, starting: {e}");
daemon::config::scaffold_config_dir(&wcore::paths::CONFIG_DIR)?;
let config_path = wcore::paths::CONFIG_DIR.join(wcore::paths::CONFIG_FILE);
let config = daemon::DaemonConfig::load(&config_path)?;
if config.provider.is_empty() {
attach::setup_provider(&config_path)?;
}
service::install(verbose, false)?;
for _ in 0..20 {
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
if let Ok(runner) = connect_default_or_tcp(use_tcp).await {
return Ok(runner);
}
}
anyhow::bail!("daemon started but not reachable after 5s")
}
}
}
async fn connect_default_or_tcp(use_tcp: bool) -> Result<Runner> {
if use_tcp {
connect_tcp().await
} else {
connect_default().await
}
}
pub(crate) async fn connect_default() -> Result<Runner> {
#[cfg(unix)]
{
let socket_path = &*wcore::paths::SOCKET_PATH;
Runner::connect(socket_path).await.with_context(|| {
format!(
"failed to connect to crabtalk daemon at {}",
socket_path.display()
)
})
}
#[cfg(not(unix))]
{
connect_tcp().await
}
}
pub(crate) async fn connect_tcp() -> Result<Runner> {
let tcp_port_file = &*wcore::paths::TCP_PORT_FILE;
let port_str = std::fs::read_to_string(tcp_port_file).with_context(|| {
format!(
"failed to read TCP port file at {}",
tcp_port_file.display()
)
})?;
let port: u16 = port_str
.trim()
.parse()
.with_context(|| format!("invalid port in {}", tcp_port_file.display()))?;
Runner::connect_tcp(port)
.await
.with_context(|| format!("failed to connect to crabtalk daemon via TCP on port {port}"))
}
fn truncate_for_display(s: &str, max: usize) -> String {
if s.len() <= max {
return s.to_owned();
}
let mut end = max.saturating_sub(3);
while !s.is_char_boundary(end) {
end -= 1;
}
format!("{}...", &s[..end])
}