#![cfg(all(feature = "comms", any(unix, windows)))]
use std::io::Write;
use std::path::Path;
use anyhow::{Context, Result};
use clap::Subcommand;
use serde_json::json;
use crate::comms::client::{CommsClient, scope_context_for};
use crate::comms::cursor::Cursor;
use crate::comms::ids::{AgentId, RoomId};
use crate::comms::model::{AgentCard, Room, RoomScope};
use crate::comms::protocol::SeqMeta;
/// Page size used by `clamp_limit` when the caller passes no `--limit`.
const DEFAULT_LIMIT: u32 = 100;
/// Upper bound that `clamp_limit` enforces on any requested `--limit`.
const MAX_LIMIT: u32 = 1000;
/// Look-back window, in hours, used by `since_cutoff` when `--since-hours` is absent.
const DEFAULT_SINCE_HOURS: u32 = 24;
/// Number of microseconds in one hour (3600 s * 1_000_000 us).
const MICROS_PER_HOUR: i64 = 3_600_000_000;
/// Idle time, in hours, after which `is_stale` reports a room as stale (168 h = 7 days).
const STALE_AFTER_HOURS: i64 = 168;
// Subcommands of the comms CLI; each variant is handled by one arm of `dispatch`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc
// comments into help text, so switching to `///` would change the CLI output.
//
// Most variants accept `--as-agent`: when given it is parsed as the acting agent id,
// otherwise the identity is resolved by `cli_agent_id`.
#[derive(Subcommand, Debug)]
pub enum CommsAgentCmd {
// Register the acting agent together with an agent card.
Register {
// Card name; an empty string is sent when omitted.
#[arg(long)]
name: Option<String>,
// Card description; an empty string is sent when omitted.
#[arg(long)]
description: Option<String>,
// Card version; an empty string is sent when omitted.
#[arg(long)]
version: Option<String>,
// Repeatable `--skill` flag collected into the card's skill list.
#[arg(long = "skill")]
skills: Vec<String>,
#[arg(long)]
as_agent: Option<String>,
},
// List agents, optionally restricted to a single room.
Agents {
// Room id to filter by; all agents are requested when absent.
#[arg(long)]
room: Option<String>,
#[arg(long)]
as_agent: Option<String>,
},
// Create a room with the given id, scope and optional title.
RoomCreate {
// Room id (positional).
room: String,
// One of `global`, `remote:<url>` or `path:<dir>` (see `parse_scope`).
#[arg(long, default_value = "global")]
scope: String,
#[arg(long)]
title: Option<String>,
#[arg(long)]
as_agent: Option<String>,
},
// List rooms for the scope context of the working root; always uses the default CLI identity.
Rooms,
// Derive the repository room for a filesystem path, then create and join it.
RoomForPath {
// Path used to locate the enclosing repository (positional).
path: String,
#[arg(long)]
as_agent: Option<String>,
},
// Join an existing room.
Join {
room: String,
#[arg(long)]
as_agent: Option<String>,
},
// Leave a room.
Leave {
room: String,
#[arg(long)]
as_agent: Option<String>,
},
// Post a message to a room.
Post {
room: String,
subject: String,
// Message body; an empty body is sent when omitted.
#[arg(long)]
body: Option<String>,
// Repeatable `--tag` flag collected into the message's tag list.
#[arg(long = "tag")]
tags: Vec<String>,
// Value forwarded as the message's reply-to reference.
#[arg(long)]
reply_to: Option<String>,
#[arg(long)]
as_agent: Option<String>,
},
// Send a direct message through a two-party `dm:<lo>:<hi>` room.
Dm {
// Recipient agent id.
#[arg(long)]
to: String,
#[arg(long)]
subject: String,
// Message body; an empty body is sent when omitted.
#[arg(long)]
body: Option<String>,
#[arg(long)]
reply_to: Option<String>,
#[arg(long)]
as_agent: Option<String>,
},
// Page through the message metadata of a room.
History {
room: String,
// Pagination cursor, typically the `next_cursor` printed by a previous call.
#[arg(long)]
cursor: Option<String>,
// Page size; clamped to 1..=MAX_LIMIT, DEFAULT_LIMIT when absent.
#[arg(long)]
limit: Option<u32>,
// Look-back window in hours; 0 disables the cutoff, DEFAULT_SINCE_HOURS when absent.
#[arg(long)]
since_hours: Option<u32>,
#[arg(long)]
as_agent: Option<String>,
},
// Fetch and print the body of a single message; always uses the default CLI identity.
Read {
message_id: String,
},
// Page through the acting agent's inbox and report the unread count.
Inbox {
// Pagination cursor, typically the `next_cursor` printed by a previous call.
#[arg(long)]
cursor: Option<String>,
// Page size; clamped to 1..=MAX_LIMIT, DEFAULT_LIMIT when absent.
#[arg(long)]
limit: Option<u32>,
// Forwarded to the daemon's inbox read as its mark-read flag.
#[arg(long)]
mark_read: bool,
// Look-back window in hours; 0 disables the cutoff, DEFAULT_SINCE_HOURS when absent.
#[arg(long)]
since_hours: Option<u32>,
#[arg(long)]
as_agent: Option<String>,
},
}
/// Converts the textual `--scope` argument into a [`RoomScope`].
///
/// Recognised spellings are `global`, `remote:<url>` and `path:<dir>`. Whatever follows
/// the prefix is taken verbatim (no trimming, no validation).
///
/// # Errors
/// Fails for any input that matches none of the three spellings.
fn parse_scope(raw: &str) -> Result<RoomScope> {
    if let Some(url) = raw.strip_prefix("remote:") {
        Ok(RoomScope::Remote(url.to_owned()))
    } else if let Some(dir) = raw.strip_prefix("path:") {
        Ok(RoomScope::PathPrefix(std::path::PathBuf::from(dir)))
    } else if raw == "global" {
        Ok(RoomScope::Global)
    } else {
        anyhow::bail!("invalid --scope {raw:?}: expected `global`, `remote:<url>`, or `path:<dir>`")
    }
}
/// Resolves the effective page size for a listing command.
///
/// A missing `limit` becomes `DEFAULT_LIMIT`; the result is then forced into
/// `1..=MAX_LIMIT`, so `0` is raised to `1` and oversized requests are capped.
fn clamp_limit(limit: Option<u32>) -> u32 {
    const MIN_LIMIT: u32 = 1;
    let requested = limit.unwrap_or(DEFAULT_LIMIT);
    requested.clamp(MIN_LIMIT, MAX_LIMIT)
}
/// Turns a `--since-hours` value into an absolute cutoff timestamp in microseconds.
///
/// * `None` falls back to `DEFAULT_SINCE_HOURS`.
/// * `0` yields `None`, i.e. no cutoff is produced.
/// * Any other value yields `now_micros() - hours * MICROS_PER_HOUR`.
///
/// The arithmetic saturates instead of overflowing: a `u32` hour count above roughly
/// 2.56e9 multiplied by `MICROS_PER_HOUR` no longer fits in an `i64`, which previously
/// panicked in debug builds and wrapped to a bogus cutoff in release builds.
fn since_cutoff(since_hours: Option<u32>) -> Option<i64> {
    let hours = since_hours.unwrap_or(DEFAULT_SINCE_HOURS);
    if hours == 0 {
        return None;
    }
    // Saturate so an absurdly large window simply means "as far back as representable".
    let window_micros = i64::from(hours).saturating_mul(MICROS_PER_HOUR);
    Some(crate::comms::model::now_micros().saturating_sub(window_micros))
}
/// Resolves the agent identity the CLI acts as when `--as-agent` is not given.
///
/// Sources are tried in order, and a source that is missing or fails to parse is
/// silently skipped:
/// 1. the `BASEMIND_AGENT_ID` environment variable (used as-is),
/// 2. the contents of `<root>/.basemind/agent-id` (surrounding whitespace trimmed),
/// 3. the fixed id `basemind-cli`.
///
/// # Errors
/// Fails only if the fixed fallback id itself is rejected by `AgentId::parse`.
fn cli_agent_id(root: &Path) -> Result<AgentId> {
    let from_env = std::env::var("BASEMIND_AGENT_ID")
        .ok()
        .and_then(|raw| AgentId::parse(raw).ok());
    if let Some(id) = from_env {
        return Ok(id);
    }

    // Only consult the on-disk id once the environment has not produced a usable one.
    let id_file = root.join(".basemind").join("agent-id");
    let from_file = std::fs::read_to_string(id_file)
        .ok()
        .and_then(|existing| AgentId::parse(existing.trim()).ok());
    if let Some(id) = from_file {
        return Ok(id);
    }

    AgentId::parse("basemind-cli").context("construct CLI agent id")
}
/// Synchronous entry point for the comms subcommands.
///
/// Builds a single-threaded tokio runtime with all drivers enabled, then runs
/// `dispatch` to completion on it, writing to a locked stdout handle.
///
/// # Errors
/// Returns an error if the runtime cannot be built or if the dispatched command fails.
pub fn run(root: &Path, json: bool, cmd: CommsAgentCmd) -> Result<()> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let rt = builder.build().context("build tokio runtime")?;

    rt.block_on(async move {
        // Lock stdout once for the whole command instead of once per write.
        let mut sink = std::io::stdout().lock();
        dispatch(root, json, cmd, &mut sink).await
    })
}
/// Opens a client connection to the comms daemon under a chosen agent identity.
///
/// With `as_agent` set, that string is parsed as the agent id; otherwise the identity
/// comes from `cli_agent_id`. The remote/cwd pair from `scope_context_for(root)` is
/// handed to `CommsClient::ensure_and_connect`.
///
/// # Errors
/// Fails on an unparsable `--as-agent`, on identity resolution failure, or when the
/// connection cannot be established.
async fn connect_as(root: &Path, as_agent: Option<String>) -> Result<CommsClient> {
    let agent = if let Some(raw) = as_agent {
        AgentId::parse(raw.clone()).with_context(|| format!("invalid --as-agent {raw:?}"))?
    } else {
        cli_agent_id(root)?
    };

    let (remote, cwd) = scope_context_for(root);
    let client = CommsClient::ensure_and_connect(agent, remote, cwd)
        .await
        .map_err(|e| anyhow::anyhow!("connect to comms daemon: {e}"))?;
    Ok(client)
}
/// Executes one comms subcommand and writes its result to `out`.
///
/// Every arm connects to the daemon (directly via `connect_as`, or inside the `dm` /
/// `room_for_path` helpers), performs the request and prints either a single JSON
/// object (`json == true`) or tab-separated plain text.
///
/// # Errors
/// Returns an error for unparsable ids or scopes, for connection failures, for any
/// failed daemon request (wrapped with a short operation prefix), and for write
/// failures on `out`.
async fn dispatch(root: &Path, json: bool, cmd: CommsAgentCmd, out: &mut impl Write) -> Result<()> {
match cmd {
// Register the acting agent with a card built from the optional flags.
CommsAgentCmd::Register {
name,
description,
version,
skills,
as_agent,
} => {
let mut client = connect_as(root, as_agent).await?;
// Omitted card fields are sent as empty strings.
let card = AgentCard {
name: name.unwrap_or_default(),
description: description.unwrap_or_default(),
version: version.unwrap_or_default(),
skills,
};
// Capture the id as an owned string before the mutable `register_agent` call.
let agent_id = client.agent().as_str().to_string();
client
.register_agent(card)
.await
.map_err(|e| anyhow::anyhow!("register: {e}"))?;
if json {
writeln!(
out,
"{}",
json!({ "agent_id": agent_id, "registered": true })
)?;
} else {
writeln!(out, "registered as {agent_id}")?;
}
}
// List agents, optionally filtered by room.
CommsAgentCmd::Agents { room, as_agent } => {
let mut client = connect_as(root, as_agent).await?;
// Option<String> -> Option<RoomId>, failing if a supplied id does not parse.
let room = room.map(RoomId::parse).transpose().context("room id")?;
let agents = client
.list_agents(room)
.await
.map_err(|e| anyhow::anyhow!("list agents: {e}"))?;
if json {
let rows: Vec<_> = agents
.iter()
.map(|a| {
json!({
"agent_id": a.agent_id.as_str(),
"name": a.card.name,
"version": a.card.version,
})
})
.collect();
writeln!(out, "{}", json!({ "total": rows.len(), "agents": rows }))?;
} else if agents.is_empty() {
writeln!(out, "no agents")?;
} else {
// One line per agent: id, card name, card version.
for a in &agents {
writeln!(
out,
"{}\t{}\t{}",
a.agent_id.as_str(),
a.card.name,
a.card.version
)?;
}
}
}
// Create a room and print the room the daemon returned.
CommsAgentCmd::RoomCreate {
room,
scope,
title,
as_agent,
} => {
let mut client = connect_as(root, as_agent).await?;
let room_id = RoomId::parse(room).context("room id")?;
let scope = parse_scope(&scope)?;
let created = client
.create_room(room_id, scope, title)
.await
.map_err(|e| anyhow::anyhow!("create room: {e}"))?;
render_room(&created, json, out)?;
}
// List rooms for this root's scope context, flagging each as stale or active.
CommsAgentCmd::Rooms => {
// No `--as-agent` on this subcommand: always the default CLI identity.
let mut client = connect_as(root, None).await?;
let (remote, cwd) = scope_context_for(root);
let rooms = client
.list_rooms(remote, cwd)
.await
.map_err(|e| anyhow::anyhow!("list rooms: {e}"))?;
// Single timestamp so every room is judged against the same "now".
let now = crate::comms::model::now_micros();
if json {
let rows: Vec<_> = rooms.iter().map(|r| room_json(r, now)).collect();
writeln!(out, "{}", json!({ "total": rows.len(), "rooms": rows }))?;
} else if rooms.is_empty() {
writeln!(out, "no rooms")?;
} else {
// One line per room: id, title, last-activity timestamp, STALE/ACTIVE.
for r in &rooms {
let marker = if is_stale(r, now) { "STALE" } else { "ACTIVE" };
writeln!(
out,
"{}\t{}\t{}\t{}",
r.room_id.as_str(),
r.title,
r.last_activity,
marker
)?;
}
}
}
// Delegated: derive, create and join the room that belongs to a path.
CommsAgentCmd::RoomForPath { path, as_agent } => {
room_for_path(root, json, path, as_agent, out).await?;
}
// Join a room.
CommsAgentCmd::Join { room, as_agent } => {
let mut client = connect_as(root, as_agent).await?;
let room_id = RoomId::parse(room).context("room id")?;
// Keep a printable copy because `join_room` consumes the id.
let label = room_id.as_str().to_string();
client
.join_room(room_id)
.await
.map_err(|e| anyhow::anyhow!("join: {e}"))?;
if json {
writeln!(out, "{}", json!({ "room": label, "joined": true }))?;
} else {
writeln!(out, "joined {label}")?;
}
}
// Leave a room.
CommsAgentCmd::Leave { room, as_agent } => {
let mut client = connect_as(root, as_agent).await?;
let room_id = RoomId::parse(room).context("room id")?;
// Keep a printable copy because `leave_room` consumes the id.
let label = room_id.as_str().to_string();
client
.leave_room(room_id)
.await
.map_err(|e| anyhow::anyhow!("leave: {e}"))?;
if json {
writeln!(out, "{}", json!({ "room": label, "left": true }))?;
} else {
writeln!(out, "left {label}")?;
}
}
// Post a message and print the id returned by the daemon.
CommsAgentCmd::Post {
room,
subject,
body,
tags,
reply_to,
as_agent,
} => {
let mut client = connect_as(root, as_agent).await?;
let room_id = RoomId::parse(room).context("room id")?;
// A missing body is sent as zero bytes.
let body = body.unwrap_or_default().into_bytes();
// NOTE(review): the final argument is always an empty Vec; its meaning is not
// visible in this file -- confirm against `CommsClient::post_message`.
let message_id = client
.post_message(room_id, subject, body, tags, reply_to, Vec::new())
.await
.map_err(|e| anyhow::anyhow!("post: {e}"))?;
if json {
writeln!(out, "{}", json!({ "message_id": message_id }))?;
} else {
writeln!(out, "{message_id}")?;
}
}
// Delegated: direct message through a two-party room.
CommsAgentCmd::Dm {
to,
subject,
body,
reply_to,
as_agent,
} => {
dm(root, json, to, subject, body, reply_to, as_agent, out).await?;
}
// Page through a room's message metadata.
CommsAgentCmd::History {
room,
cursor,
limit,
since_hours,
as_agent,
} => {
let mut client = connect_as(root, as_agent).await?;
let room_id = RoomId::parse(room).context("room id")?;
let (messages, next_cursor) = client
.read_history(
room_id,
cursor.map(Cursor),
clamp_limit(limit),
since_cutoff(since_hours),
)
.await
.map_err(|e| anyhow::anyhow!("history: {e}"))?;
// History has no unread counter, hence `None`.
render_front_matter(&messages, next_cursor.as_ref(), None, json, out)?;
}
// Fetch one message body by id.
CommsAgentCmd::Read { message_id } => {
// No `--as-agent` on this subcommand: always the default CLI identity.
let mut client = connect_as(root, None).await?;
// `get_body` consumes the id; keep a copy for the JSON output.
let id = message_id.clone();
let body = client
.get_body(message_id)
.await
.map_err(|e| anyhow::anyhow!("read: {e}"))?;
// Decode lossily so non-UTF-8 bytes are replaced rather than causing a failure.
let text = body.map(|b| String::from_utf8_lossy(&b).into_owned());
if json {
writeln!(
out,
"{}",
json!({ "message_id": id, "found": text.is_some(), "body": text })
)?;
} else {
match text {
Some(b) => writeln!(out, "{b}")?,
None => writeln!(out, "(no such message)")?,
}
}
}
// Page through the acting agent's inbox.
CommsAgentCmd::Inbox {
cursor,
limit,
mark_read,
since_hours,
as_agent,
} => {
let mut client = connect_as(root, as_agent).await?;
let (remote, cwd) = scope_context_for(root);
let (messages, unread, next_cursor) = client
.read_inbox(
remote,
cwd,
cursor.map(Cursor),
clamp_limit(limit),
mark_read,
since_cutoff(since_hours),
)
.await
.map_err(|e| anyhow::anyhow!("inbox: {e}"))?;
render_front_matter(&messages, next_cursor.as_ref(), Some(unread), json, out)?;
}
}
Ok(())
}
/// Sends a direct message from the acting agent to `to`.
///
/// The two agent ids are ordered lexicographically to form the room id
/// `dm:<lo>:<hi>`, so both directions of a conversation share one room. The room is
/// created with a session scope of the same name, the sender joins, a second
/// connection made under the recipient's id joins on the recipient's behalf, and the
/// message is then posted by the sender.
///
/// Prints the message id and room id (JSON object or tab-separated).
///
/// # Errors
/// Fails on unparsable agent ids, when sender and recipient are the same agent, and
/// on any connection, room or post failure.
// The argument list mirrors the fields of `CommsAgentCmd::Dm` plus the shared context.
#[allow(clippy::too_many_arguments)]
async fn dm(
    root: &Path,
    json: bool,
    to: String,
    subject: String,
    body: Option<String>,
    reply_to: Option<String>,
    as_agent: Option<String>,
    out: &mut impl Write,
) -> Result<()> {
    // Resolve both identities up front so self-messages are rejected before connecting.
    let sender_id = if let Some(raw) = as_agent.as_ref() {
        AgentId::parse(raw.clone()).with_context(|| format!("invalid --as-agent {raw:?}"))?
    } else {
        cli_agent_id(root)?
    };
    let recipient_id =
        AgentId::parse(to.clone()).with_context(|| format!("invalid --to {to:?}"))?;
    if sender_id == recipient_id {
        anyhow::bail!("cannot dm yourself");
    }

    // Canonical ordering of the pair: smaller id first.
    let (lo, hi) = match sender_id.as_str().cmp(recipient_id.as_str()) {
        std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {
            (sender_id.as_str(), recipient_id.as_str())
        }
        std::cmp::Ordering::Greater => (recipient_id.as_str(), sender_id.as_str()),
    };
    let dm_key = format!("dm:{lo}:{hi}");
    let title = format!("dm {lo} <-> {hi}");
    let room = RoomId::parse(dm_key.clone()).context("derive dm room id")?;
    let dm_scope = RoomScope::Session(dm_key);

    let mut sender = connect_as(root, as_agent).await?;
    sender
        .create_room(room.clone(), dm_scope, Some(title))
        .await
        .map_err(|e| anyhow::anyhow!("create dm room: {e}"))?;
    sender
        .join_room(room.clone())
        .await
        .map_err(|e| anyhow::anyhow!("sender join: {e}"))?;

    // Second connection, acting as the recipient, to add them to the room.
    let mut recipient = connect_as(root, Some(recipient_id.as_str().to_owned())).await?;
    recipient
        .join_room(room.clone())
        .await
        .map_err(|e| anyhow::anyhow!("recipient join: {e}"))?;

    let payload = body.unwrap_or_default().into_bytes();
    let message_id = sender
        .post_message(
            room.clone(),
            subject,
            payload,
            Vec::new(),
            reply_to,
            Vec::new(),
        )
        .await
        .map_err(|e| anyhow::anyhow!("dm post: {e}"))?;

    let room_label = room.into_string();
    if json {
        let report = json!({ "message_id": message_id, "room": room_label });
        writeln!(out, "{}", report)?;
    } else {
        writeln!(out, "{message_id}\t{room_label}")?;
    }
    Ok(())
}
/// Finds the room that belongs to `path`, makes sure it exists, and joins it.
///
/// The base directory is the working directory of the git repository discovered from
/// `path`, or `path` itself when discovery fails. The room descriptor comes from
/// `repo_room_for` applied to that base's scope context; the room is then created and
/// joined over a connection made with `connect_as(root, as_agent)`.
///
/// Prints the room id and a scope label (`remote`, `path`, `session` or `global`);
/// the JSON form additionally carries the title.
///
/// # Errors
/// Fails on connection, create, join or write errors.
async fn room_for_path(
    root: &Path,
    json: bool,
    path: String,
    as_agent: Option<String>,
    out: &mut impl Write,
) -> Result<()> {
    // Repository discovery is best-effort: outside a repo the raw path is used.
    let base = match crate::git::Repo::discover(Path::new(&path)) {
        Ok(repo) => repo.workdir().to_path_buf(),
        Err(_) => std::path::PathBuf::from(&path),
    };
    let (remote, cwd) = scope_context_for(&base);
    let room = crate::comms::daemon::repo_room_for(remote, cwd);

    let mut client = connect_as(root, as_agent).await?;
    client
        .create_room(
            room.room_id.clone(),
            room.scope.clone(),
            Some(room.title.clone()),
        )
        .await
        .map_err(|e| anyhow::anyhow!("create room: {e}"))?;
    client
        .join_room(room.room_id.clone())
        .await
        .map_err(|e| anyhow::anyhow!("join: {e}"))?;

    let scope_label = match &room.scope {
        RoomScope::Global => "global",
        RoomScope::Session(_) => "session",
        RoomScope::PathPrefix(_) => "path",
        RoomScope::Remote(_) => "remote",
    };
    let room_label = room.room_id.as_str().to_string();
    if json {
        let report = json!({ "room": room_label, "scope": scope_label, "title": room.title });
        writeln!(out, "{}", report)?;
    } else {
        writeln!(out, "{room_label}\t{scope_label}")?;
    }
    Ok(())
}
/// Reports whether a room counts as stale at time `now_micros`.
///
/// A room is stale when its `last_activity` is `0`, or when more than
/// `STALE_AFTER_HOURS` hours (in microseconds) lie between `last_activity` and
/// `now_micros`.
fn is_stale(room: &Room, now_micros: i64) -> bool {
    if room.last_activity == 0 {
        return true;
    }
    let idle_micros = now_micros - room.last_activity;
    idle_micros > STALE_AFTER_HOURS * MICROS_PER_HOUR
}
fn room_json(room: &Room, now_micros: i64) -> serde_json::Value {
json!({
"room_id": room.room_id.as_str(),
"title": room.title,
"created_at": room.created_at,
"last_activity": room.last_activity,
"stale": is_stale(room, now_micros),
})
}
fn render_room(room: &Room, json: bool, out: &mut impl Write) -> Result<()> {
if json {
writeln!(
out,
"{}",
json!({ "room": room_json(room, crate::comms::model::now_micros()) })
)?;
} else {
writeln!(out, "{}\t{}", room.room_id.as_str(), room.title)?;
}
Ok(())
}
/// Writes a page of message metadata to `out`.
///
/// JSON mode emits one object with `total`, `messages`, and -- only when present --
/// `unread` and `next_cursor`. Text mode emits an optional `unread:` line, then one
/// tab-separated line per message (subject, sender, timestamp, id) or `(no messages)`,
/// then an optional `next_cursor:` line.
///
/// # Errors
/// Propagates write failures on `out`.
fn render_front_matter(
    messages: &[SeqMeta],
    next_cursor: Option<&Cursor>,
    unread: Option<u32>,
    json: bool,
    out: &mut impl Write,
) -> Result<()> {
    if !json {
        if let Some(u) = unread {
            writeln!(out, "unread: {u}")?;
        }
        if messages.is_empty() {
            writeln!(out, "(no messages)")?;
        }
        // Iterating an empty slice is a no-op, so no `else` is needed here.
        for sm in messages {
            let m = &sm.meta;
            writeln!(
                out,
                "{}\t{}\t{}\t{}",
                m.subject,
                m.from.as_str(),
                m.ts_micros,
                m.id
            )?;
        }
        if let Some(c) = next_cursor {
            writeln!(out, "next_cursor: {}", c.0)?;
        }
        return Ok(());
    }

    let mut rows = Vec::with_capacity(messages.len());
    for sm in messages {
        let m = &sm.meta;
        rows.push(json!({
            "id": m.id,
            "room": m.room.as_str(),
            "from": m.from.as_str(),
            "ts_micros": m.ts_micros,
            "subject": m.subject,
            "tags": m.tags,
            "scope": m.scope,
            "reply_to": m.reply_to,
            "seq": sm.seq,
            "body_len": m.body_len,
        }));
    }

    let mut obj = json!({ "total": rows.len(), "messages": rows });
    // Optional keys are added only when a value exists, never as `null`.
    if let Some(u) = unread {
        obj["unread"] = json!(u);
    }
    if let Some(c) = next_cursor {
        obj["next_cursor"] = json!(c.0);
    }
    writeln!(out, "{obj}")?;
    Ok(())
}