use crate::herdr_orchestration::{load_inventory, Inventory, PaneView};
use crate::{CliError, CliResult};
use clap::Args;
use std::fs;
use std::io::IsTerminal;
use std::process::Command;
/// Identity of the Herdr pane this process is running in.
///
/// Built by `parse_self_from_pane_get` from the `result.pane` object of the
/// JSON printed by `herdr pane get`.
#[derive(Debug, Clone)]
pub(crate) struct SelfContext {
/// Agent name, read from `result.pane.agent` (required by the parser).
pub agent: String,
/// Pane id, read from `result.pane.pane_id` (required by the parser).
pub pane_id: String,
/// Workspace id, read from `result.pane.workspace_id` (required by the parser).
pub workspace_id: String,
/// Tab id taken from `result.pane.tab_id`, if available.
pub tab: Option<String>,
/// Working directory, preferring `result.pane.foreground_cwd` over `result.pane.cwd`.
pub cwd: Option<String>,
}
// Command-line arguments consumed by `run_whoami`.
// Regular comments (not doc comments) are used on purpose so the help text
// generated by the clap derive stays exactly as declared in the attributes.
#[derive(Debug, Args)]
pub struct WhoamiArgs {
// Selects between the JSON document and the plain-text table in `run_whoami`.
#[arg(long, default_value = "table", value_enum, help = "Output format.")]
pub format: crate::herdr_orchestration::OutputFormat,
// Executable used both for `herdr pane get` (self resolution) and for loading
// the pane inventory.
#[arg(long, default_value = "herdr", help = "herdr executable path.")]
pub herdr_bin: String,
}
/// Resolves `agent` to exactly one pane of `inventory`.
///
/// When `workspace` is given, only workspaces accepted by `workspace_matches`
/// are searched. Returns `(agent, pane_id)` for a single match.
///
/// # Errors
///
/// A usage error when no pane hosts the agent, or when more than one does
/// (the message lists every candidate as `workspace_id:pane_id`).
pub(crate) fn resolve_target_in(
    inventory: &Inventory,
    agent: &str,
    workspace: Option<&str>,
) -> CliResult<(String, String)> {
    // Collect every pane occupied by `agent` inside the selected workspaces.
    let mut hits: Vec<&PaneView> = Vec::new();
    for ws in inventory.workspaces.iter() {
        if let Some(selector) = workspace {
            if !workspace_matches(ws, selector) {
                continue;
            }
        }
        for tab in ws.tabs.iter() {
            for pane in tab.panes.iter() {
                if pane.agent.as_deref() == Some(agent) {
                    hits.push(pane);
                }
            }
        }
    }

    if hits.is_empty() {
        let scope = workspace
            .map(|w| format!(" in workspace {w}"))
            .unwrap_or_default();
        return Err(CliError::usage(format!(
            "no live pane for agent {agent}{scope}"
        )));
    }

    if let [only] = hits.as_slice() {
        return Ok((agent.to_string(), only.pane_id.clone()));
    }

    // Two or more candidates: refuse to guess and show the caller the options.
    let mut entries: Vec<String> = Vec::with_capacity(hits.len());
    for pane in &hits {
        entries.push(format!("{}:{}", pane.workspace_id, pane.pane_id));
    }
    let listing = entries.join(", ");
    Err(CliError::usage(format!(
        "agent {agent} is ambiguous across live panes ({listing}); pass --to-pane <id> or --workspace"
    )))
}
/// Returns `true` when `selector` names `workspace` by its id, by its label,
/// or by the decimal rendering of its number.
fn workspace_matches(
    workspace: &crate::herdr_orchestration::WorkspaceView,
    selector: &str,
) -> bool {
    if workspace.workspace_id == selector {
        return true;
    }
    if workspace.label.as_deref() == Some(selector) {
        return true;
    }
    match workspace.number {
        Some(n) => n.to_string() == selector,
        None => false,
    }
}
/// Loads the inventory through `herdr_bin` and resolves `agent` in it.
///
/// Thin wrapper around `load_inventory` + `resolve_target_in`; errors from
/// either step are returned unchanged.
pub(crate) fn resolve_target(
    herdr_bin: &str,
    agent: &str,
    workspace: Option<&str>,
) -> CliResult<(String, String)> {
    resolve_target_in(&load_inventory(herdr_bin)?, agent, workspace)
}
/// Resolves the identity of the pane this process runs in.
///
/// Reads the pane id from the `HERDR_PANE_ID` environment variable, runs
/// `<herdr_bin> pane get <pane id>` and parses its JSON output with
/// `parse_self_from_pane_get`.
///
/// The pane id is trimmed before use, so the value that is validated as
/// non-empty is the same value that is handed to herdr.
///
/// # Errors
///
/// * failure when `HERDR_PANE_ID` is unset, not valid Unicode, or blank;
/// * exit code 127 when `herdr_bin` cannot be found;
/// * failure when the command cannot be run, exits unsuccessfully, prints
///   non-UTF-8 output, or prints JSON the parser rejects.
pub(crate) fn resolve_self(herdr_bin: &str) -> CliResult<SelfContext> {
    let raw_pane_id = std::env::var("HERDR_PANE_ID").map_err(|error| match error {
        std::env::VarError::NotPresent => CliError::failure(
            "cannot resolve self: HERDR_PANE_ID is not set (run inside a Herdr pane)",
        ),
        // Present but undecodable is a different problem from "not set".
        std::env::VarError::NotUnicode(_) => {
            CliError::failure("cannot resolve self: HERDR_PANE_ID is not valid Unicode")
        }
    })?;

    // Stray surrounding whitespace would otherwise be passed on as part of the id.
    let pane_id = raw_pane_id.trim();
    if pane_id.is_empty() {
        return Err(CliError::failure(
            "cannot resolve self: HERDR_PANE_ID is empty",
        ));
    }

    let output = Command::new(herdr_bin)
        .args(["pane", "get", pane_id])
        .output()
        .map_err(|error| {
            if error.kind() == std::io::ErrorKind::NotFound {
                CliError::with_code(127, format!("herdr CLI not found at {herdr_bin}"))
            } else {
                CliError::failure(format!("failed to run herdr pane get: {error}"))
            }
        })?;

    if !output.status.success() {
        return Err(CliError::failure(format!(
            "cannot resolve self: herdr pane get {pane_id} failed: {}",
            String::from_utf8_lossy(&output.stderr).trim()
        )));
    }

    let stdout = String::from_utf8(output.stdout)
        .map_err(|error| CliError::failure(format!("herdr pane get output not utf-8: {error}")))?;
    parse_self_from_pane_get(&stdout)
}
/// Parses the JSON printed by `herdr pane get` into a [`SelfContext`].
///
/// All fields are read from the `result.pane` object. `agent`, `pane_id` and
/// `workspace_id` are required. `tab_id` is optional, and the working
/// directory is `foreground_cwd` with a fallback to `cwd`.
///
/// Empty strings are treated like missing values for every field, so an empty
/// `foreground_cwd` no longer hides a usable `cwd`, and an empty `tab_id`
/// yields `None` rather than `Some("")`.
///
/// # Errors
///
/// Failure when `stdout` is not valid JSON or a required field is missing,
/// not a string, or empty.
fn parse_self_from_pane_get(stdout: &str) -> CliResult<SelfContext> {
    /// Returns `pane[key]` when it is a non-empty JSON string.
    fn non_empty_str<'a>(pane: &'a serde_json::Value, key: &str) -> Option<&'a str> {
        pane[key].as_str().filter(|s| !s.is_empty())
    }

    let value: serde_json::Value = serde_json::from_str(stdout).map_err(|error| {
        CliError::failure(format!("failed to parse herdr pane get JSON: {error}"))
    })?;
    let pane = &value["result"]["pane"];

    let agent = non_empty_str(pane, "agent")
        .ok_or_else(|| {
            CliError::failure("herdr pane get JSON missing result.pane.agent (cannot resolve self)")
        })?
        .to_string();
    let pane_id = non_empty_str(pane, "pane_id")
        .ok_or_else(|| CliError::failure("herdr pane get JSON missing result.pane.pane_id"))?
        .to_string();
    let workspace_id = non_empty_str(pane, "workspace_id")
        .ok_or_else(|| CliError::failure("herdr pane get JSON missing result.pane.workspace_id"))?
        .to_string();

    let tab = non_empty_str(pane, "tab_id").map(str::to_string);
    let cwd = non_empty_str(pane, "foreground_cwd")
        .or_else(|| non_empty_str(pane, "cwd"))
        .map(str::to_string);

    Ok(SelfContext {
        agent,
        pane_id,
        workspace_id,
        tab,
        cwd,
    })
}
pub fn run_whoami(args: WhoamiArgs) -> CliResult<()> {
use crate::herdr_orchestration::OutputFormat;
let me = resolve_self(&args.herdr_bin)?;
let inventory = load_inventory(&args.herdr_bin)?;
let peers = peer_summary(&inventory, &me);
match args.format {
OutputFormat::Json => {
let value = serde_json::json!({
"agent": me.agent,
"pane_id": me.pane_id,
"workspace_id": me.workspace_id,
"tab": me.tab,
"cwd": me.cwd,
"peers": peers.iter().map(|p| serde_json::json!({
"agent": p.agent,
"pane_id": p.pane_id,
"workspace_id": p.workspace_id,
"uniquely_addressable": p.uniquely_addressable,
})).collect::<Vec<_>>(),
});
println!(
"{}",
serde_json::to_string_pretty(&value).map_err(|error| {
CliError::failure(format!("failed to render whoami JSON: {error}"))
})?
);
}
OutputFormat::Table => {
println!("agent {}", me.agent);
println!("pane_id {}", me.pane_id);
println!("workspace {}", me.workspace_id);
println!("tab {}", me.tab.as_deref().unwrap_or("-"));
println!("cwd {}", me.cwd.as_deref().unwrap_or("-"));
println!();
println!(
"{:<12} {:<18} {:<12} ADDRESSABLE",
"PEER", "PANE", "WORKSPACE"
);
for peer in &peers {
println!(
"{:<12} {:<18} {:<12} {}",
peer.agent,
peer.pane_id,
peer.workspace_id,
if peer.uniquely_addressable {
"yes"
} else {
"no (ambiguous)"
}
);
}
}
}
Ok(())
}
/// One row of the peer listing printed by `run_whoami`; produced by `peer_summary`.
struct PeerSummary {
/// Agent occupying the pane.
agent: String,
/// Id of the pane.
pane_id: String,
/// Id of the workspace the pane reports.
workspace_id: String,
/// `true` when exactly one pane in the whole inventory hosts this agent name.
uniquely_addressable: bool,
}
/// Lists every agent-occupied pane other than the caller's own.
///
/// A peer is flagged `uniquely_addressable` when its agent name occurs on
/// exactly one pane of the inventory. The caller's own pane is included in
/// that count even though it is left out of the returned list.
fn peer_summary(inventory: &Inventory, me: &SelfContext) -> Vec<PeerSummary> {
    use std::collections::HashMap;

    // First pass: how many panes does each agent name occupy?
    let mut panes_per_agent: HashMap<&str, usize> = HashMap::new();
    for name in all_panes(inventory).filter_map(|pane| pane.agent.as_deref()) {
        *panes_per_agent.entry(name).or_default() += 1;
    }

    // Second pass: emit one summary per agent pane that is not ours.
    all_panes(inventory)
        .filter(|pane| pane.pane_id != me.pane_id)
        .filter_map(|pane| {
            let name = pane.agent.as_deref()?;
            Some(PeerSummary {
                agent: name.to_string(),
                pane_id: pane.pane_id.clone(),
                workspace_id: pane.workspace_id.clone(),
                uniquely_addressable: panes_per_agent.get(name).copied() == Some(1),
            })
        })
        .collect()
}
/// Iterates over every pane of every tab of every workspace, in inventory order.
fn all_panes(inventory: &Inventory) -> impl Iterator<Item = &PaneView> {
    let tabs = inventory
        .workspaces
        .iter()
        .flat_map(|workspace| workspace.tabs.iter());
    tabs.flat_map(|tab| tab.panes.iter())
}
// Command-line arguments consumed by `run_reply`.
// Regular comments (not doc comments) are used on purpose so the help text
// generated by the clap derive stays exactly as declared in the attributes.
#[derive(Debug, Args)]
pub struct ReplyArgs {
// Positional: the parent message id; also becomes the reply's `re` value.
#[arg(help = "Parent message id to reply to (sets re=<mid>; routing inferred from it).")]
pub mid: String,
// Optional at the clap level, but `run_reply` rejects a missing value with a usage error.
#[arg(
long = "type",
help = "Message type (ADR 040 D3: NEVER inferred — the default reply to a request-review is not approve)."
)]
pub message_type: Option<String>,
#[arg(
long,
help = "Artifact reference (ADR 040 D3: NOT defaulted from the parent — a reply's artifact usually differs)."
)]
pub r#ref: Option<String>,
// When absent, `run_reply` falls back to the parent's recorded mode.
#[arg(
long,
help = "Collaboration mode; defaults to the parent's mode, else the message default (ADR 040 D3)."
)]
pub mode: Option<String>,
// Body source precedence is implemented in `read_body`: --body, then --body-file, then stdin.
#[arg(
long,
help = "Reply body inline; otherwise --body-file, otherwise stdin (when not a TTY)."
)]
pub body: Option<String>,
#[arg(
long,
help = "Reply body file ('-' reads stdin); otherwise stdin when not a TTY."
)]
pub body_file: Option<std::path::PathBuf>,
#[arg(long = "reply-mid", help = "Override the auto-minted reply mid.")]
pub reply_mid: Option<String>,
// Passed to `load_parent` to scope the parent lookup to one session.
#[arg(
long,
help = "Disambiguate the parent when the mid is reused across sessions (ADR 001): scope the parent lookup to this session. Required when a cross-session mid collision is detected."
)]
pub session_id: Option<String>,
#[arg(
long,
default_value = "outputs",
help = "Audit artifact root (read for the parent, written for the reply)."
)]
pub root: std::path::PathBuf,
// `db` and `no_db` are combined by `resolve_db_path` for the parent lookup.
#[arg(
long,
help = "Live DB path for the parent lookup + reply projection; defaults to cwd .zynk/zynk.db."
)]
pub db: Option<std::path::PathBuf>,
#[arg(
long,
help = "Force file-only (no DB for the parent lookup or the reply projection)."
)]
pub no_db: bool,
#[arg(
long,
help = "Opt out of the audited reply (send only; no audit/corpus record)."
)]
pub no_audit: bool,
#[arg(
long,
default_value = "agent",
value_parser = ["agent", "operator", "helper-tool", "unknown"],
help = "Who originated this reply, for the audit record."
)]
pub command_origin: String,
#[arg(
long,
default_value = "full",
help = "Redaction policy for the audited reply payload."
)]
pub payload_redaction_policy: String,
#[arg(
long,
help = "Preview the inferred routing + composed message without sending; writes no audit/outputs (--type still required)."
)]
pub dry_run: bool,
#[arg(long, default_value = "herdr", help = "herdr executable path.")]
pub herdr_bin: String,
}
/// Routing facts about a parent message, as recovered by `load_parent`.
///
/// Equality is used by `load_parent` to detect a disagreement between the DB
/// record and the `audit.md` record of the same parent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ParentContext {
/// Agent that sent the parent; `run_reply` uses it as the reply target.
pub source_agent: String,
/// Session the parent was recorded in; the reply is sent under the same session.
pub session_id: String,
/// Mode recorded on the parent, if any; default for the reply's mode.
pub mode: Option<String>,
/// Workspace recorded on the parent; `run_reply` restricts target resolution to it.
pub workspace_id: String,
}
/// Replies to the message identified by `args.mid`.
///
/// Steps, in order: require `--type`; look the parent up with `load_parent`;
/// resolve the caller with `resolve_self`; resolve the parent's sender to a
/// pane inside the parent's workspace; pick the mode (explicit `--mode`, else
/// the parent's); read the body; then hand the composed arguments to
/// `crate::send_herdr::run`. The reply is sent under the parent's session id
/// with `re` set to the parent mid.
///
/// # Errors
///
/// A usage error when `--type` is missing; otherwise whatever the lookup,
/// resolution, body-reading or send step reports.
pub fn run_reply(args: ReplyArgs) -> CliResult<()> {
    let Some(message_type) = args.message_type.clone() else {
        return Err(CliError::usage(
            "reply requires --type (ADR 040 D3: the type is never inferred)",
        ));
    };

    let lookup_db = resolve_db_path(args.db.as_deref(), args.no_db);
    let parent = load_parent(
        &args.mid,
        lookup_db.as_deref(),
        &args.root,
        args.session_id.as_deref(),
    )?;

    let me = resolve_self(&args.herdr_bin)?;
    let (peer_agent, peer_pane) = resolve_target(
        &args.herdr_bin,
        &parent.source_agent,
        Some(parent.workspace_id.as_str()),
    )?;

    // An explicit --mode wins; otherwise inherit whatever the parent recorded.
    let mode = match args.mode.clone() {
        Some(explicit) => Some(explicit),
        None => parent.mode.clone(),
    };
    let body = read_body(args.body.as_deref(), args.body_file.as_deref())?;

    let spec = BuildSend {
        herdr_bin: args.herdr_bin,
        me: &me,
        peer_agent: &peer_agent,
        peer_pane: &peer_pane,
        mid: args.reply_mid,
        message_type,
        re: Some(args.mid),
        r#ref: args.r#ref,
        mode,
        session_id: parent.session_id,
        body,
        command_origin: args.command_origin,
        payload_redaction_policy: args.payload_redaction_policy,
        no_audit: args.no_audit,
        dry_run: args.dry_run,
        db: args.db,
        no_db: args.no_db,
        root: args.root,
    };
    let send_args = build_send_args(spec)?;
    crate::send_herdr::run(send_args)
}
/// Decides which database file, if any, should be read.
///
/// * `no_db` set: always `None`.
/// * explicit `db`: that path, whether or not it exists.
/// * otherwise: `.zynk/zynk.db` relative to the current directory, but only
///   when it exists.
fn resolve_db_path(db: Option<&std::path::Path>, no_db: bool) -> Option<std::path::PathBuf> {
    match (no_db, db) {
        (true, _) => None,
        (false, Some(explicit)) => Some(explicit.to_path_buf()),
        (false, None) => {
            let fallback = std::path::Path::new(".zynk/zynk.db");
            if fallback.exists() {
                Some(fallback.to_path_buf())
            } else {
                None
            }
        }
    }
}
/// Finds the parent message `mid` and returns its routing context.
///
/// Candidates are gathered per session from the DB (only when `db` is given
/// and exists) and from the `audit.md` files under `root`. When both sources
/// know the same session but disagree, the file record replaces the DB record
/// and a warning is written to stderr.
///
/// With `session` set, only that session's candidate is accepted. Without
/// it, the lookup succeeds only when exactly one session contains the mid.
///
/// # Errors
///
/// A usage error when no candidate exists (overall or in the requested
/// session) or when several sessions contain the mid and none was requested.
/// Errors from reading the DB or the files are propagated.
pub(crate) fn load_parent(
    mid: &str,
    db: Option<&std::path::Path>,
    root: &std::path::Path,
    session: Option<&str>,
) -> CliResult<ParentContext> {
    use std::collections::BTreeMap;

    let mut sessions: BTreeMap<String, ParentContext> = match db {
        Some(path) if path.exists() => load_parent_candidates_from_db(mid, path)?,
        _ => BTreeMap::new(),
    };

    // Overlay the file view: it fills gaps and wins any disagreement.
    for (session_id, file_ctx) in load_parent_candidates_from_file(mid, root)? {
        let conflicting = match sessions.get(&session_id) {
            None => false,
            Some(db_ctx) if *db_ctx == file_ctx => continue,
            Some(_) => true,
        };
        if conflicting {
            eprintln!(
                "warning: parent {mid} in session {session_id} disagrees between the DB and the outputs file; resolving from the file (ADR 027/040 D7 conflict authority)"
            );
        }
        sessions.insert(session_id, file_ctx);
    }

    if let Some(session) = session {
        return match sessions.remove(session) {
            Some(parent) => Ok(parent),
            None => Err(CliError::usage(format!(
                "no parent message found for mid {mid:?} in session {session:?} (looked in the DB and {})",
                root.display()
            ))),
        };
    }

    if sessions.len() > 1 {
        let candidates = sessions.keys().cloned().collect::<Vec<_>>().join(", ");
        return Err(CliError::usage(format!(
            "mid {mid:?} is ambiguous across {} sessions ({candidates}); pass --session-id <session> to disambiguate (ADR 001: a mid is unique only within a session)",
            sessions.len()
        )));
    }

    match sessions.into_values().next() {
        Some(only) => Ok(only),
        None => Err(CliError::usage(format!(
            "no parent message found for mid {mid:?} (looked in the DB and {})",
            root.display()
        ))),
    }
}
/// Collects, per session, the parent candidate for `mid` from the database.
///
/// Opens `db` through `crate::db::open_read_database` and queries
/// `audit_records` for rows with the given mid. Within each session only the
/// first row is kept, ordered by `timestamp` and then `audit_id` (both
/// ascending). The result maps session id to [`ParentContext`].
///
/// # Errors
///
/// Failure when the database cannot be opened or queried, when a row cannot
/// be decoded, or when a selected row has a NULL `source_agent_id`.
fn load_parent_candidates_from_db(
mid: &str,
db: &std::path::Path,
) -> CliResult<std::collections::BTreeMap<String, ParentContext>> {
let connection = crate::db::open_read_database(db)?;
// The window function ranks rows inside each session; `rn = 1` keeps the earliest.
let mut statement = connection
.prepare(
"SELECT session_id, source_agent_id, mode, workspace_id FROM (
SELECT session_id, source_agent_id, mode, workspace_id,
ROW_NUMBER() OVER (
PARTITION BY session_id
ORDER BY timestamp ASC, audit_id ASC
) AS rn
FROM audit_records WHERE mid = ?1
) WHERE rn = 1",
)
.map_err(|error| CliError::failure(format!("failed to query parent {mid}: {error}")))?;
// Column order matches the SELECT list: session_id, source_agent_id, mode, workspace_id.
let rows = statement
.query_map(rusqlite::params![mid], |row| {
Ok((
row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?, row.get::<_, Option<String>>(2)?, row.get::<_, String>(3)?, ))
})
.map_err(|error| CliError::failure(format!("failed to read parent {mid}: {error}")))?;
let mut out = std::collections::BTreeMap::new();
for row in rows {
let (session_id, source_agent, mode, workspace_id) = row
.map_err(|error| CliError::failure(format!("failed to read parent {mid}: {error}")))?;
// Without a sender there is nobody to route the reply to, so this is a hard error.
let source_agent = source_agent.ok_or_else(|| {
CliError::failure(format!(
"parent {mid} in session {session_id} has no source_agent in the DB; cannot infer the reply target"
))
})?;
out.insert(
session_id.clone(),
ParentContext {
source_agent,
session_id,
mode,
workspace_id,
},
);
}
Ok(out)
}
fn load_parent_candidates_from_file(
mid: &str,
root: &std::path::Path,
) -> CliResult<std::collections::BTreeMap<String, ParentContext>> {
let mut out = std::collections::BTreeMap::new();
let sessions = root.join("sessions");
if !sessions.is_dir() {
return Ok(out);
}
for entry in fs::read_dir(&sessions).map_err(|error| {
CliError::failure(format!("failed to read {}: {error}", sessions.display()))
})? {
let entry = entry
.map_err(|error| CliError::failure(format!("failed to read session dir: {error}")))?;
let audit_path = entry.path().join("audit.md");
if !audit_path.is_file() {
continue;
}
let content = fs::read_to_string(&audit_path).map_err(|error| {
CliError::failure(format!("failed to read {}: {error}", audit_path.display()))
})?;
for block in parse_audit_blocks_full(&content) {
if block.get("mid").map(String::as_str) != Some(mid) {
continue;
}
let source_agent = block.get("source_agent").cloned().ok_or_else(|| {
CliError::failure(format!("parent {mid} record is missing source_agent"))
})?;
let session_id = block.get("session_id").cloned().ok_or_else(|| {
CliError::failure(format!("parent {mid} record is missing session_id"))
})?;
let workspace_id = block.get("workspace_id").cloned().ok_or_else(|| {
CliError::failure(format!("parent {mid} record is missing workspace_id"))
})?;
out.entry(session_id.clone()).or_insert(ParentContext {
source_agent,
session_id,
mode: block.get("mode").cloned(),
workspace_id,
});
}
}
Ok(out)
}
/// Obtains the message body and normalizes its line endings to `\n`.
///
/// Source precedence: inline `body`; then `body_file` (`-` means stdin, via
/// `read_stdin_body`); then stdin, provided it is not a terminal. Both
/// `\r\n` and lone `\r` become `\n` in the returned text.
///
/// # Errors
///
/// A usage error when nothing was supplied and stdin is a terminal; a
/// failure when the file or stdin cannot be read.
fn read_body(body: Option<&str>, body_file: Option<&std::path::Path>) -> CliResult<String> {
    use std::io::Read;

    let raw = match (body, body_file) {
        (Some(inline), _) => inline.to_string(),
        (None, Some(path)) if path.as_os_str() == "-" => read_stdin_body()?,
        (None, Some(path)) => fs::read_to_string(path).map_err(|error| {
            CliError::failure(format!(
                "failed to read body file {}: {error}",
                path.display()
            ))
        })?,
        (None, None) => {
            let mut stdin = std::io::stdin();
            if stdin.is_terminal() {
                return Err(CliError::usage(
                    "no body: pass --body, --body-file <path>, or pipe the body on stdin",
                ));
            }
            let mut piped = String::new();
            stdin.read_to_string(&mut piped).map_err(|error| {
                CliError::failure(format!("failed to read body from stdin: {error}"))
            })?;
            piped
        }
    };

    // CRLF first, so the lone-CR pass does not double the line breaks.
    Ok(raw.replace("\r\n", "\n").replace('\r', "\n"))
}
fn read_stdin_body() -> CliResult<String> {
use std::io::Read;
if std::io::stdin().is_terminal() {
return Err(CliError::usage(
"--body-file - expects the body on stdin, but stdin is a TTY",
));
}
let mut buffer = String::new();
std::io::stdin()
.read_to_string(&mut buffer)
.map_err(|error| CliError::failure(format!("failed to read body from stdin: {error}")))?;
Ok(buffer)
}
/// Everything `build_send_args` needs to assemble a send request.
///
/// Filled in by `run_reply` and `run_send_agent`; most fields are forwarded
/// unchanged into `SendHerdrArgs` or its nested `ComposeArgs`.
struct BuildSend<'a> {
/// herdr executable path, forwarded to the send step.
herdr_bin: String,
/// The sender; rendered as `agent:pane_id` in the composed `from` field.
me: &'a SelfContext,
/// Target agent name; first half of the composed `to` field.
peer_agent: &'a str,
/// Target pane id; second half of the composed `to` field and the pane to send to.
peer_pane: &'a str,
/// Message id; a fresh one is minted when `None`.
mid: Option<String>,
/// Message type.
message_type: String,
/// Parent message id (`re`), set for replies.
re: Option<String>,
/// Artifact reference.
r#ref: Option<String>,
/// Collaboration mode.
mode: Option<String>,
/// Session the message is sent under.
session_id: String,
/// Message body text.
body: String,
/// Forwarded to `SendHerdrArgs::command_origin`.
command_origin: String,
/// Forwarded to `SendHerdrArgs::payload_redaction_policy`.
payload_redaction_policy: String,
/// Forwarded to `SendHerdrArgs::no_audit`.
no_audit: bool,
/// Forwarded to `SendHerdrArgs::dry_run`.
dry_run: bool,
/// Forwarded to `SendHerdrArgs::db`.
db: Option<std::path::PathBuf>,
/// Forwarded to `SendHerdrArgs::no_db`.
no_db: bool,
/// Forwarded to `SendHerdrArgs::root`.
root: std::path::PathBuf,
}
fn build_send_args(spec: BuildSend<'_>) -> CliResult<crate::send_herdr::SendHerdrArgs> {
let mid = spec.mid.unwrap_or_else(crate::dashboard_write::mint_mid);
let from = format!("{}:{}", spec.me.agent, spec.me.pane_id);
let to = format!("{}:{}", spec.peer_agent, spec.peer_pane);
let compose = crate::compose::ComposeArgs {
profile: None,
from: Some(from),
to: Some(to),
mid: Some(mid),
message_type: Some(spec.message_type),
shorthand: None,
r#ref: spec.r#ref,
re: spec.re,
due: None,
mode: spec.mode,
transport: None,
body: Some(spec.body),
body_file: None,
field: Vec::new(),
var: Vec::new(),
};
Ok(crate::send_herdr::SendHerdrArgs {
pane: spec.peer_pane.to_string(),
dry_run: spec.dry_run,
herdr_bin: spec.herdr_bin,
session_id: Some(spec.session_id),
no_audit: spec.no_audit,
root: spec.root,
command_origin: spec.command_origin,
payload_redaction_policy: spec.payload_redaction_policy,
sensitive_category: None,
db: spec.db,
no_db: spec.no_db,
retain_custody: false,
custody_key_file: None,
compose,
})
}
// Command-line arguments consumed by `run_send_agent`.
// Regular comments (not doc comments) are used on purpose so the help text
// generated by the clap derive stays exactly as declared in the attributes.
#[derive(Debug, Args)]
pub struct SendAgentArgs {
#[arg(help = "Target agent name; resolved to exactly one LIVE pane (ADR 040 D2).")]
pub agent: String,
// When set, `resolve_send_target` checks the pane's occupant instead of searching.
#[arg(
long,
help = "Pin the exact target pane id (verified to be occupied by <agent>); disambiguates duplicates."
)]
pub to_pane: Option<String>,
// Only consulted when `to_pane` is absent.
#[arg(
long,
help = "Restrict resolution to this workspace (id, label, or number); disambiguates cross-workspace duplicates."
)]
pub workspace: Option<String>,
// Optional at the clap level, but `run_send_agent` rejects a missing value with a usage error.
#[arg(long = "type", help = "Message type (required; never inferred).")]
pub message_type: Option<String>,
// Optional at the clap level, but `run_send_agent` rejects a missing value with a usage error.
#[arg(long, help = "Session id for the audited send (required).")]
pub session_id: Option<String>,
#[arg(long, help = "Artifact reference (optional).")]
pub r#ref: Option<String>,
#[arg(
long,
help = "Collaboration mode (optional; the message default applies otherwise)."
)]
pub mode: Option<String>,
// Body source precedence is implemented in `read_body`: --body, then --body-file, then stdin.
#[arg(
long,
help = "Message body inline; otherwise --body-file, otherwise stdin."
)]
pub body: Option<String>,
#[arg(
long,
help = "Body file ('-' reads stdin); otherwise stdin when not a TTY."
)]
pub body_file: Option<std::path::PathBuf>,
#[arg(long, help = "Message id; auto-minted (op-<hex>) when omitted.")]
pub mid: Option<String>,
#[arg(
long,
default_value = "outputs",
help = "Audit artifact root for the send."
)]
pub root: std::path::PathBuf,
#[arg(
long,
help = "Live DB path for the projection; defaults to cwd .zynk/zynk.db."
)]
pub db: Option<std::path::PathBuf>,
#[arg(long, help = "Force file-only (skip the DB projection).")]
pub no_db: bool,
#[arg(
long,
help = "Opt out of the audited send (send only; no audit/corpus record)."
)]
pub no_audit: bool,
#[arg(
long,
default_value = "agent",
value_parser = ["agent", "operator", "helper-tool", "unknown"],
help = "Who originated this send, for the audit record."
)]
pub command_origin: String,
#[arg(
long,
default_value = "full",
help = "Redaction policy for the audited payload."
)]
pub payload_redaction_policy: String,
#[arg(
long,
help = "Preview the inferred routing + composed message without sending; writes no audit/outputs (--type/--session-id still required)."
)]
pub dry_run: bool,
#[arg(long, default_value = "herdr", help = "herdr executable path.")]
pub herdr_bin: String,
}
/// Sends a new (non-reply) message to a named agent.
///
/// Steps, in order: require `--type` and `--session-id`; resolve the caller;
/// load the inventory; resolve the target pane with `resolve_send_target`;
/// read the body; then hand the composed arguments to
/// `crate::send_herdr::run`. No `re` is set on the message.
///
/// # Errors
///
/// A usage error when `--type` or `--session-id` is missing; otherwise
/// whatever the resolution, body-reading or send step reports.
pub fn run_send_agent(args: SendAgentArgs) -> CliResult<()> {
    let Some(message_type) = args.message_type.clone() else {
        return Err(CliError::usage(
            "send agent requires --type (the type is never inferred)",
        ));
    };
    let Some(session_id) = args.session_id.clone() else {
        return Err(CliError::usage(
            "send agent requires --session-id for the audited record",
        ));
    };

    let me = resolve_self(&args.herdr_bin)?;
    let inventory = load_inventory(&args.herdr_bin)?;
    let (peer_agent, peer_pane) = resolve_send_target(
        &inventory,
        &args.agent,
        args.to_pane.as_deref(),
        args.workspace.as_deref(),
    )?;
    let body = read_body(args.body.as_deref(), args.body_file.as_deref())?;

    let spec = BuildSend {
        herdr_bin: args.herdr_bin,
        me: &me,
        peer_agent: &peer_agent,
        peer_pane: &peer_pane,
        mid: args.mid,
        message_type,
        re: None,
        r#ref: args.r#ref,
        mode: args.mode,
        session_id,
        body,
        command_origin: args.command_origin,
        payload_redaction_policy: args.payload_redaction_policy,
        no_audit: args.no_audit,
        dry_run: args.dry_run,
        db: args.db,
        no_db: args.no_db,
        root: args.root,
    };
    crate::send_herdr::run(build_send_args(spec)?)
}
/// Picks the pane a new message should go to.
///
/// Without `to_pane` this defers to `resolve_target_in`. With `to_pane`, the
/// pane must exist in the inventory and be occupied by `agent`; `workspace`
/// is not consulted in that case.
///
/// # Errors
///
/// A usage error when the pinned pane is unknown, hosts a different agent,
/// or hosts no agent; otherwise the errors of `resolve_target_in`.
fn resolve_send_target(
    inventory: &Inventory,
    agent: &str,
    to_pane: Option<&str>,
    workspace: Option<&str>,
) -> CliResult<(String, String)> {
    let Some(pane_id) = to_pane else {
        return resolve_target_in(inventory, agent, workspace);
    };

    let Some(pane) = all_panes(inventory).find(|pane| pane.pane_id == pane_id) else {
        return Err(CliError::usage(format!(
            "--to-pane {pane_id} is not a live Herdr pane"
        )));
    };

    let occupant = pane.agent.as_deref();
    if occupant == Some(agent) {
        return Ok((agent.to_string(), pane_id.to_string()));
    }
    Err(match occupant {
        Some(occupant) => CliError::usage(format!(
            "--to-pane {pane_id} is occupied by {occupant}, not {agent} (ADR 040 D2)"
        )),
        None => CliError::usage(format!(
            "--to-pane {pane_id} has no agent; cannot target {agent}"
        )),
    })
}
// Command-line arguments consumed by `run_inbox`.
// Regular comments (not doc comments) are used on purpose so the help text
// generated by the clap derive stays exactly as declared in the attributes.
#[derive(Debug, Args)]
pub struct InboxArgs {
// When absent, `run_inbox` resolves the agent name through `resolve_self`.
#[arg(
long,
help = "Show messages addressed to this agent; defaults to the live self (HERDR_PANE_ID)."
)]
pub me: Option<String>,
#[arg(long, help = "Restrict to one session id.")]
pub session: Option<String>,
// Keeps only rows whose type starts with `request-` and that no loaded message references via `re`.
#[arg(
long,
help = "BEST-EFFORT HEURISTIC: my request-* messages with no live re= reply yet (never an authoritative answered state)."
)]
pub unanswered: bool,
#[arg(
long,
default_value = "outputs",
help = "Audit artifact root for the file-only view."
)]
pub root: std::path::PathBuf,
// `db` and `no_db` are combined by `resolve_db_path`.
#[arg(
long,
help = "DB path to read; defaults to cwd .zynk/zynk.db if it exists (read-only)."
)]
pub db: Option<std::path::PathBuf>,
#[arg(long, help = "Force the file-only view (ignore any DB).")]
pub no_db: bool,
#[arg(long, default_value = "table", value_enum, help = "Output format.")]
pub format: crate::herdr_orchestration::OutputFormat,
#[arg(
long,
default_value = "herdr",
help = "herdr executable path (only for self resolution)."
)]
pub herdr_bin: String,
}
// Command-line arguments consumed by `run_thread`.
// Regular comments (not doc comments) are used on purpose so the help text
// generated by the clap derive stays exactly as declared in the attributes.
#[derive(Debug, Args)]
pub struct ThreadArgs {
// Positional: the message whose `re` component is displayed.
#[arg(
help = "Any message in the thread; shows the full connected re= component (its ancestors up to the root plus the descendants), oldest-first."
)]
pub mid: String,
// Without it, `run_thread` succeeds only when exactly one session contains the mid.
#[arg(
long,
help = "Disambiguate when the mid is reused across sessions (ADR 001): scope the thread to this session. Required when a cross-session mid collision is detected."
)]
pub session_id: Option<String>,
#[arg(
long,
default_value = "outputs",
help = "Audit artifact root for the file-only view."
)]
pub root: std::path::PathBuf,
// `db` and `no_db` are combined by `resolve_db_path`.
#[arg(
long,
help = "DB path to read; defaults to cwd .zynk/zynk.db if it exists (read-only)."
)]
pub db: Option<std::path::PathBuf>,
#[arg(long, help = "Force the file-only view (ignore any DB).")]
pub no_db: bool,
#[arg(long, default_value = "table", value_enum, help = "Output format.")]
pub format: crate::herdr_orchestration::OutputFormat,
}
/// One message as shown by the inbox and thread views.
///
/// Rows are produced from the database (`load_messages_from_db`) or from
/// `audit.md` blocks (`load_messages_from_files`) and are serialized as-is
/// for the JSON output format.
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct MessageRow {
/// Message id; treated as unique only together with `session_id`.
pub mid: String,
/// Session the message belongs to (empty when a file block has no `session_id`).
pub session_id: String,
/// Message type, e.g. the `request-` prefixed types the inbox heuristic looks for.
pub message_type: String,
/// Sending agent, if recorded.
pub source_agent: Option<String>,
/// Receiving agent, if recorded; the inbox filters on this.
pub target_agent: Option<String>,
/// Artifact reference, if any.
pub r#ref: Option<String>,
/// Mid of the message this one replies to, if any.
pub re: Option<String>,
/// Timestamp string; the views order rows by comparing it as text.
pub timestamp: String,
/// Payload redaction policy; `hash-only` suppresses the excerpt.
pub redaction_policy: String,
/// Payload excerpt; `None` when absent or when the policy is `hash-only`.
pub excerpt: Option<String>,
}
/// Lists the messages addressed to an agent, newest first.
///
/// The agent is `--me` or, failing that, the resolved self. Rows come from
/// `load_messages` restricted to `--session` when given. With `--unanswered`,
/// only `request-` rows are kept whose `(session, mid)` is not referenced by
/// the `re` of any message in any session; the table output then carries a
/// note that this is a heuristic.
///
/// # Errors
///
/// Propagates self-resolution, message-loading and JSON-rendering errors.
pub fn run_inbox(args: InboxArgs) -> CliResult<()> {
    use crate::herdr_orchestration::OutputFormat;
    use std::collections::HashSet;

    let me = if let Some(name) = args.me.clone() {
        name
    } else {
        resolve_self(&args.herdr_bin)?.agent
    };

    let db_path = resolve_db_path(args.db.as_deref(), args.no_db);
    let mut rows = load_messages(db_path.as_deref(), &args.root, args.session.as_deref())?;
    rows.retain(|row| row.target_agent.as_deref() == Some(me.as_str()));

    if args.unanswered {
        // Every (session, mid) that some message points at through `re`.
        let mut answered: HashSet<(String, String)> = HashSet::new();
        for row in load_messages(db_path.as_deref(), &args.root, None)? {
            if let Some(re) = row.re {
                answered.insert((row.session_id, re));
            }
        }
        rows.retain(|row| {
            row.message_type.starts_with("request-")
                && !answered.contains(&(row.session_id.clone(), row.mid.clone()))
        });
    }

    // Newest first; ties broken by descending mid.
    rows.sort_by(|a, b| {
        b.timestamp
            .cmp(&a.timestamp)
            .then_with(|| b.mid.cmp(&a.mid))
    });

    match args.format {
        OutputFormat::Table => {
            if args.unanswered {
                println!("# inbox --unanswered is a BEST-EFFORT HEURISTIC over the re= chain (not an authoritative answered state)");
            }
            print_rows_table(&rows, &format!("inbox for {me}"));
        }
        OutputFormat::Json => print_rows_json(&rows)?,
    }
    Ok(())
}
pub fn run_thread(args: ThreadArgs) -> CliResult<()> {
use crate::herdr_orchestration::OutputFormat;
let db = resolve_db_path(args.db.as_deref(), args.no_db);
let all = load_messages(db.as_deref(), &args.root, None)?;
let start_sessions: std::collections::BTreeSet<String> = all
.iter()
.filter(|row| row.mid == args.mid)
.map(|row| row.session_id.clone())
.collect();
let session = match (args.session_id.as_deref(), start_sessions.len()) {
(Some(session), _) => {
if !start_sessions.contains(session) {
return Err(CliError::usage(format!(
"no message found for mid {:?} in session {session:?} (looked in the DB and {})",
args.mid,
args.root.display()
)));
}
session.to_string()
}
(None, 0) => {
return Err(CliError::usage(format!(
"no message found for mid {:?} in the DB or {}",
args.mid,
args.root.display()
)));
}
(None, 1) => start_sessions.iter().next().expect("len==1").clone(),
(None, _) => {
let candidates = start_sessions
.iter()
.cloned()
.collect::<Vec<_>>()
.join(", ");
return Err(CliError::usage(format!(
"mid {:?} is ambiguous across {} sessions ({candidates}); pass --session-id <session> to disambiguate (ADR 001: a mid is unique only within a session)",
args.mid,
start_sessions.len()
)));
}
};
let scoped: Vec<MessageRow> = all
.into_iter()
.filter(|row| row.session_id == session)
.collect();
let members = connected_re_component(&scoped, &session, &args.mid);
let mut rows: Vec<MessageRow> = scoped
.iter()
.filter(|row| members.contains(&(row.session_id.clone(), row.mid.clone())))
.cloned()
.collect();
if rows.is_empty() {
return Err(CliError::usage(format!(
"no message found for mid {:?} in the DB or {}",
args.mid,
args.root.display()
)));
}
let depth = re_depth(&scoped);
rows.sort_by(|a, b| {
let ka = (a.session_id.clone(), a.mid.clone());
let kb = (b.session_id.clone(), b.mid.clone());
a.timestamp
.cmp(&b.timestamp)
.then(depth.get(&ka).cmp(&depth.get(&kb)))
.then(a.mid.cmp(&b.mid))
});
match args.format {
OutputFormat::Json => print_rows_json(&rows)?,
OutputFormat::Table => print_rows_table(&rows, &format!("thread {}", args.mid)),
}
Ok(())
}
/// Collects every `(session, mid)` node linked to the start node through
/// `re` references, followed in both directions.
///
/// A row with `re` contributes an edge from its own `(session, mid)` to
/// `(session, re)`; the parent is assumed to live in the same session. The
/// start node is always part of the result, and a referenced parent is
/// included even when no row exists for it.
fn connected_re_component(
    all: &[MessageRow],
    start_session: &str,
    start_mid: &str,
) -> std::collections::HashSet<(String, String)> {
    use std::collections::{HashMap, HashSet};
    type Node = (String, String);

    let mut parent_of: HashMap<Node, Node> = HashMap::new();
    let mut children_of: HashMap<Node, Vec<Node>> = HashMap::new();
    for row in all {
        let Some(re) = row.re.as_deref() else {
            continue;
        };
        let child: Node = (row.session_id.clone(), row.mid.clone());
        let parent: Node = (row.session_id.clone(), re.to_string());
        children_of
            .entry(parent.clone())
            .or_default()
            .push(child.clone());
        parent_of.insert(child, parent);
    }

    let origin: Node = (start_session.to_string(), start_mid.to_string());
    let mut component: HashSet<Node> = HashSet::new();
    component.insert(origin.clone());

    // Worklist traversal; visiting order does not matter for a set result.
    let mut pending: Vec<Node> = vec![origin];
    while let Some(node) = pending.pop() {
        let upward = parent_of.get(&node).into_iter();
        let downward = children_of.get(&node).into_iter().flatten();
        for neighbour in upward.chain(downward) {
            if component.insert(neighbour.clone()) {
                pending.push(neighbour.clone());
            }
        }
    }
    component
}
/// Computes, for every row, how many `re` hops separate it from the top of
/// its chain.
///
/// The walk climbs from a row to the node named by its `re` (same session)
/// while that node is itself one of the rows. It stops at a row without
/// `re`, at a reference to a message that is not in `all`, or after
/// `all.len()` hops, which bounds the walk when references form a cycle.
fn re_depth(all: &[MessageRow]) -> std::collections::HashMap<(String, String), usize> {
    use std::collections::{HashMap, HashSet};
    type Node = (String, String);

    let mut parent_of: HashMap<Node, Node> = HashMap::new();
    let mut present: HashSet<Node> = HashSet::new();
    for row in all {
        let node: Node = (row.session_id.clone(), row.mid.clone());
        if let Some(re) = row.re.as_deref() {
            parent_of.insert(node.clone(), (row.session_id.clone(), re.to_string()));
        }
        present.insert(node);
    }

    let cap = all.len();
    let mut depths: HashMap<Node, usize> = HashMap::new();
    for row in all {
        let key: Node = (row.session_id.clone(), row.mid.clone());
        let mut hops = 0usize;
        let mut cursor = &key;
        while hops < cap {
            match parent_of.get(cursor) {
                Some(parent) if present.contains(parent) => {
                    hops += 1;
                    cursor = parent;
                }
                _ => break,
            }
        }
        depths.insert(key, hops);
    }
    depths
}
/// Prints `rows` to stdout as a fixed-width table under a `# title (n message(s))` heading.
///
/// Missing optional values are shown as `-`. For rows whose redaction policy
/// is `hash-only` the excerpt column shows `[hash-only]` regardless of any
/// stored excerpt; other excerpts go through `truncate_excerpt`.
fn print_rows_table(rows: &[MessageRow], title: &str) {
    println!("# {title} ({} message(s))", rows.len());
    println!(
        "{:<20} {:<16} {:<18} {:<14} {:<14} EXCERPT",
        "MID", "TYPE", "FROM->TO", "REF", "RE"
    );
    for row in rows {
        let sender = row.source_agent.as_deref().unwrap_or("-");
        let recipient = row.target_agent.as_deref().unwrap_or("-");
        let from_to = format!("{}->{}", sender, recipient);

        let excerpt = if row.redaction_policy == "hash-only" {
            "[hash-only]".to_string()
        } else {
            match &row.excerpt {
                Some(text) => truncate_excerpt(text),
                None => "-".to_string(),
            }
        };

        let reference = row.r#ref.as_deref().unwrap_or("-");
        let parent = row.re.as_deref().unwrap_or("-");
        println!(
            "{:<20} {:<16} {:<18} {:<14} {:<14} {}",
            row.mid, row.message_type, from_to, reference, parent, excerpt
        );
    }
}
/// Flattens an excerpt to one printable line of at most 60 characters.
///
/// Every control character (newline, carriage return, tab, escape, ...) is
/// replaced by a single space, so an excerpt cannot break the table row or
/// inject terminal control sequences. Text longer than 60 characters is cut
/// to its first 57 characters followed by `...`. Lengths are counted in
/// `char`s, so multi-byte text is never split inside a character.
fn truncate_excerpt(text: &str) -> String {
    const MAX_CHARS: usize = 60;
    const ELLIPSIS: &str = "...";

    let single: String = text
        .chars()
        .map(|c| if c.is_control() { ' ' } else { c })
        .collect();

    if single.chars().count() > MAX_CHARS {
        let mut head: String = single.chars().take(MAX_CHARS - ELLIPSIS.len()).collect();
        head.push_str(ELLIPSIS);
        head
    } else {
        single
    }
}
fn print_rows_json(rows: &[MessageRow]) -> CliResult<()> {
println!(
"{}",
serde_json::to_string_pretty(rows)
.map_err(|error| CliError::failure(format!("failed to render JSON: {error}")))?
);
Ok(())
}
/// Loads messages from the `audit.md` files and, when available, the database.
///
/// The files are always read first. The database is consulted only when `db`
/// is given and the path exists; its rows are then merged in through
/// `merge_file_authoritative`, so a file row wins over a DB row with the same
/// `(session, mid)`. `session`, when set, restricts both sources.
pub(crate) fn load_messages(
    db: Option<&std::path::Path>,
    root: &std::path::Path,
    session: Option<&str>,
) -> CliResult<Vec<MessageRow>> {
    let file_rows = load_messages_from_files(root, session)?;
    match db {
        Some(path) if path.exists() => {
            let db_rows = load_messages_from_db(path, session)?;
            Ok(merge_file_authoritative(db_rows, file_rows))
        }
        _ => Ok(file_rows),
    }
}
/// Merges DB rows into file rows, letting the file rows win.
///
/// The result starts with all `file_rows` in their original order, followed
/// by those `db_rows` (in their original order) whose `(session_id, mid)`
/// does not occur among the file rows.
fn merge_file_authoritative(
    db_rows: Vec<MessageRow>,
    file_rows: Vec<MessageRow>,
) -> Vec<MessageRow> {
    use std::collections::HashSet;

    let mut known: HashSet<(String, String)> = HashSet::with_capacity(file_rows.len());
    for row in &file_rows {
        known.insert((row.session_id.clone(), row.mid.clone()));
    }

    let mut merged = file_rows;
    merged.extend(
        db_rows
            .into_iter()
            .filter(|row| !known.contains(&(row.session_id.clone(), row.mid.clone()))),
    );
    merged
}
/// Loads message rows from the database.
///
/// Opens `db` through `crate::db::open_read_database` and reads the
/// `messages` table, left-joined to `audit_records` on `latest_audit_id` to
/// obtain `re`. The excerpt column is NULLed in SQL for rows whose redaction
/// policy is `hash-only`. Rows are ordered by timestamp, then mid. The
/// `session` restriction is applied afterwards in Rust via `filter_session`;
/// the query itself always reads every message.
///
/// # Errors
///
/// Failure when the database cannot be opened or queried, or a row cannot be
/// decoded.
fn load_messages_from_db(
db: &std::path::Path,
session: Option<&str>,
) -> CliResult<Vec<MessageRow>> {
let connection = crate::db::open_read_database(db)?;
let mut statement = connection
.prepare(
"SELECT m.mid, m.session_id, m.message_type, m.source_agent_id, m.target_agent_id,
m.ref, a.re, m.timestamp, m.payload_redaction_policy,
CASE WHEN m.payload_redaction_policy = 'hash-only'
THEN NULL ELSE m.payload_excerpt END
FROM messages AS m
LEFT JOIN audit_records AS a ON a.audit_id = m.latest_audit_id
ORDER BY m.timestamp, m.mid",
)
.map_err(|error| CliError::failure(format!("failed to query messages: {error}")))?;
// Column indexes below follow the SELECT list order.
let rows = statement
.query_map([], |row| {
Ok(MessageRow {
mid: row.get(0)?,
session_id: row.get(1)?,
message_type: row.get(2)?,
source_agent: row.get(3)?,
target_agent: row.get(4)?,
r#ref: row.get(5)?,
re: row.get(6)?,
timestamp: row.get(7)?,
redaction_policy: row.get(8)?,
excerpt: row.get(9)?,
})
})
.map_err(|error| CliError::failure(format!("failed to read messages: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read messages: {error}")))?;
Ok(filter_session(rows, session))
}
fn load_messages_from_files(
root: &std::path::Path,
session: Option<&str>,
) -> CliResult<Vec<MessageRow>> {
let sessions = root.join("sessions");
if !sessions.is_dir() {
return Ok(Vec::new());
}
let mut rows = Vec::new();
let mut seen: std::collections::HashSet<(String, String)> = std::collections::HashSet::new();
for entry in fs::read_dir(&sessions).map_err(|error| {
CliError::failure(format!("failed to read {}: {error}", sessions.display()))
})? {
let entry = entry
.map_err(|error| CliError::failure(format!("failed to read session dir: {error}")))?;
let audit_path = entry.path().join("audit.md");
if !audit_path.is_file() {
continue;
}
let content = fs::read_to_string(&audit_path).map_err(|error| {
CliError::failure(format!("failed to read {}: {error}", audit_path.display()))
})?;
for block in parse_audit_blocks_full(&content) {
let Some(mid) = block.get("mid").cloned() else {
continue;
};
let session_id = block.get("session_id").cloned().unwrap_or_default();
if !seen.insert((session_id.clone(), mid.clone())) {
continue; }
let redaction_policy = block
.get("payload_redaction_policy")
.cloned()
.unwrap_or_else(|| "hash-only".to_string());
let excerpt = if redaction_policy == "hash-only" {
None
} else {
block.get("payload_excerpt").cloned()
};
rows.push(MessageRow {
mid,
session_id,
message_type: block.get("type").cloned().unwrap_or_default(),
source_agent: block.get("source_agent").cloned(),
target_agent: block.get("target_agent").cloned(),
r#ref: block.get("ref").cloned(),
re: block.get("re").cloned(),
timestamp: block.get("timestamp").cloned().unwrap_or_default(),
redaction_policy,
excerpt,
});
}
}
Ok(filter_session(rows, session))
}
/// Keeps only the rows belonging to `session`; with no session given, every
/// row is returned untouched. Relative order is preserved.
fn filter_session(rows: Vec<MessageRow>, session: Option<&str>) -> Vec<MessageRow> {
    let Some(wanted) = session else {
        return rows;
    };
    let mut kept = rows;
    kept.retain(|row| row.session_id == wanted);
    kept
}
/// Extracts the `key=value` blocks from an audit markdown document.
///
/// A block opens on a line that trims to "```text" and closes on a line that
/// trims to "```". Inside a block each line is split at its first `=`; the
/// first value seen for a key wins. A `payload_excerpt=` line switches to raw
/// capture: its value and every following line are kept verbatim (joined with
/// `\n`) until a line that is exactly "```", which also closes the block.
/// Blocks still open when the input ends are not returned.
fn parse_audit_blocks_full(content: &str) -> Vec<std::collections::BTreeMap<String, String>> {
    type Fields = std::collections::BTreeMap<String, String>;

    /// Where the scanner currently is within the document.
    enum Scan {
        /// Not inside any fenced block.
        Outside,
        /// Inside a block, reading `key=value` lines.
        Block(Fields),
        /// Inside a block, capturing the multi-line payload excerpt.
        Payload(Fields, Vec<String>),
    }

    let mut finished = Vec::new();
    let mut scan = Scan::Outside;
    for raw in content.lines() {
        scan = match scan {
            // While capturing, only an untrimmed bare fence ends the payload.
            Scan::Payload(mut fields, mut payload) => {
                if raw == "```" {
                    fields.insert("payload_excerpt".to_string(), payload.join("\n"));
                    finished.push(fields);
                    Scan::Outside
                } else {
                    payload.push(raw.to_string());
                    Scan::Payload(fields, payload)
                }
            }
            previous => {
                let line = raw.trim();
                match (line, previous) {
                    // An opening fence always starts a fresh block, discarding
                    // any block that was still open.
                    ("```text", _) => Scan::Block(Fields::new()),
                    ("```", Scan::Block(fields)) => {
                        finished.push(fields);
                        Scan::Outside
                    }
                    ("```", _) => Scan::Outside,
                    (_, Scan::Block(mut fields)) => match line.split_once('=') {
                        Some(("payload_excerpt", value)) => {
                            Scan::Payload(fields, vec![value.to_string()])
                        }
                        Some((key, value)) => {
                            fields
                                .entry(key.to_string())
                                .or_insert_with(|| value.to_string());
                            Scan::Block(fields)
                        }
                        None => Scan::Block(fields),
                    },
                    (_, unchanged) => unchanged,
                }
            }
        };
    }
    finished
}
// Command-line arguments accepted by the `doctor` command (consumed by `run_doctor`).
// Plain `//` comments are used here rather than doc comments so that nothing
// in this block can feed into the clap-generated help output.
#[derive(Debug, Args)]
pub struct DoctorArgs {
// herdr executable to probe; `run_doctor` hands it to `load_inventory` and `resolve_self`.
#[arg(
long,
default_value = "herdr",
help = "herdr executable path to probe."
)]
pub herdr_bin: String,
// Optional database path; `run_doctor` resolves it through `resolve_db_path`
// and only inspects the result when that path exists.
#[arg(
long,
help = "DB path to inspect read-only (default cwd .zynk/zynk.db if it exists)."
)]
pub db: Option<std::path::PathBuf>,
// Audit artifact root, defaulting to "outputs".
// NOTE(review): the `run_doctor` body in this file does not read this field — confirm
// whether an artifact-root check is still planned or the flag is vestigial.
#[arg(
long,
default_value = "outputs",
help = "Audit artifact root to inspect."
)]
pub root: std::path::PathBuf,
// Selects between the table and JSON renderings emitted at the end of `run_doctor`.
#[arg(long, default_value = "table", value_enum, help = "Output format.")]
pub format: crate::herdr_orchestration::OutputFormat,
}
/// One line of the doctor report; serialized as-is for the JSON output format.
#[derive(Debug, Clone, serde::Serialize)]
struct Finding {
/// Severity tag; the producers in this file use "ok", "info" and "warn".
level: &'static str,
/// Short identifier of the check that produced the finding (e.g. "herdr", "db").
check: &'static str,
/// Human-readable explanation shown in the DETAIL column.
detail: String,
}
pub fn run_doctor(args: DoctorArgs) -> CliResult<()> {
use crate::herdr_orchestration::OutputFormat;
let mut findings: Vec<Finding> = Vec::new();
findings.push(Finding {
level: "info",
check: "version",
detail: format!("zynk {}", env!("CARGO_PKG_VERSION")),
});
match load_inventory(&args.herdr_bin) {
Ok(inventory) => {
findings.push(Finding {
level: "ok",
check: "herdr",
detail: format!(
"reachable ({} workspace(s) visible)",
inventory.workspaces.len()
),
});
match resolve_self(&args.herdr_bin) {
Ok(me) => findings.push(Finding {
level: "ok",
check: "self-pane",
detail: format!(
"{} at {} (workspace {})",
me.agent, me.pane_id, me.workspace_id
),
}),
Err(error) => findings.push(Finding {
level: "warn",
check: "self-pane",
detail: format!("cannot resolve self: {}", error.message()),
}),
}
for (agent, count) in ambiguous_agents(&inventory) {
findings.push(Finding {
level: "warn",
check: "ambiguous-agent",
detail: format!(
"agent {agent} occupies {count} live panes — name alone is ambiguous; use --workspace/--to-pane"
),
});
}
}
Err(error) => findings.push(Finding {
level: "warn",
check: "herdr",
detail: format!(
"not reachable: {} (send/whoami need a live herdr)",
error.message()
),
}),
}
let db = resolve_db_path(args.db.as_deref(), false);
match db {
Some(path) if path.exists() => match doctor_projection_gap(&path) {
Ok(0) => findings.push(Finding {
level: "ok",
check: "db",
detail: format!("{} present; no obvious projection gap", path.display()),
}),
Ok(gap) => findings.push(Finding {
level: "warn",
check: "db",
detail: format!(
"{}: {gap} message(s) with no latest_audit_id (run `zynk db import outputs` to reconcile)",
path.display()
),
}),
Err(error) => findings.push(Finding {
level: "warn",
check: "db",
detail: format!("could not inspect {}: {}", path.display(), error.message()),
}),
},
_ => findings.push(Finding {
level: "info",
check: "db",
detail: "no .zynk/zynk.db in cwd (file-only mode; status/audit/send auto-create it)".to_string(),
}),
}
findings.push(skill_mirror_finding());
findings.push(Finding {
level: "info",
check: "send-hints",
detail: "addresses are agent:pane-id; reply/send-agent resolve panes live, so a stale pane aborts loudly rather than misdelivering".to_string(),
});
match args.format {
OutputFormat::Json => {
println!(
"{}",
serde_json::to_string_pretty(&findings).map_err(|error| CliError::failure(
format!("failed to render doctor JSON: {error}")
))?
);
}
OutputFormat::Table => {
println!("{:<6} {:<18} DETAIL", "LEVEL", "CHECK");
for finding in &findings {
println!(
"{:<6} {:<18} {}",
finding.level, finding.check, finding.detail
);
}
}
}
Ok(())
}
/// Lists every agent name that is attached to more than one pane, together
/// with its pane count, sorted by agent name. Panes without an agent are
/// ignored.
fn ambiguous_agents(inventory: &Inventory) -> Vec<(String, usize)> {
    use std::collections::BTreeMap;

    let mut panes_per_agent = BTreeMap::<String, usize>::new();
    for pane in all_panes(inventory) {
        let Some(name) = pane.agent.as_deref() else {
            continue;
        };
        *panes_per_agent.entry(name.to_owned()).or_default() += 1;
    }
    // Only names that appear on two or more panes are ambiguous.
    panes_per_agent.retain(|_, live_panes| *live_panes > 1);
    panes_per_agent.into_iter().collect()
}
fn doctor_projection_gap(db: &std::path::Path) -> CliResult<i64> {
let connection = crate::db::open_read_database(db)?;
connection
.query_row(
"SELECT COUNT(*) FROM messages WHERE latest_audit_id IS NULL",
[],
|row| row.get(0),
)
.map_err(|error| CliError::failure(format!("failed to inspect messages: {error}")))
}
/// Reports whether a zynk skill file can be found.
///
/// Looks first at `skills/zynk/SKILL.md` relative to the working directory and
/// then, when `HOME` is set, at `$HOME/.claude/skills/zynk/SKILL.md`. The
/// first existing path yields an "ok" finding; otherwise an "info" finding
/// notes that the skill is optional.
fn skill_mirror_finding() -> Finding {
    const CHECK: &str = "skill-mirror";

    let local = std::path::PathBuf::from("skills/zynk/SKILL.md");
    let under_home = std::env::var("HOME")
        .ok()
        .map(|home| std::path::Path::new(&home).join(".claude/skills/zynk/SKILL.md"));

    // Probe in priority order: the working-directory copy first, then $HOME.
    let located = std::iter::once(local)
        .chain(under_home)
        .find(|candidate| candidate.exists());

    match located {
        Some(found) => Finding {
            level: "ok",
            check: CHECK,
            detail: format!("zynk skill present at {}", found.display()),
        },
        None => Finding {
            level: "info",
            check: CHECK,
            detail: "no zynk skill found under ./skills or $HOME/.claude/skills (optional)"
                .to_string(),
        },
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `resolve_target_in`, run against hand-built inventories.

    use super::*;
    use crate::herdr_orchestration::{Inventory, PaneView, TabView, WorkspaceView};

    /// Builds an idle, unfocused pane running `agent` in the first tab of `workspace`.
    fn pane(id: &str, workspace: &str, agent: &str) -> PaneView {
        PaneView {
            pane_id: String::from(id),
            workspace_id: String::from(workspace),
            tab_id: format!("{workspace}:1"),
            agent: Some(String::from(agent)),
            agent_status: String::from("idle"),
            label: None,
            cwd: None,
            focused: false,
        }
    }

    /// Builds a workspace holding a single tab that owns all of `panes`.
    fn ws(workspace_id: &str, label: &str, number: i64, panes: Vec<PaneView>) -> WorkspaceView {
        let only_tab = TabView {
            tab_id: format!("{workspace_id}:1"),
            workspace_id: String::from(workspace_id),
            label: None,
            number: Some(1),
            focused: false,
            panes,
        };
        WorkspaceView {
            workspace_id: String::from(workspace_id),
            label: Some(String::from(label)),
            number: Some(number),
            focused: false,
            tabs: vec![only_tab],
        }
    }

    /// Wraps workspaces into an inventory.
    fn inventory(workspaces: Vec<WorkspaceView>) -> Inventory {
        Inventory { workspaces }
    }

    /// Two workspaces that each host one `codex` pane.
    fn codex_in_two_workspaces() -> Inventory {
        inventory(vec![
            ws("w1", "smscode", 1, vec![pane("w1-1", "w1", "codex")]),
            ws("w2", "infra", 2, vec![pane("w2-1", "w2", "codex")]),
        ])
    }

    #[test]
    fn resolve_target_exactly_one_pane_ok() {
        let panes = vec![pane("w1-1", "w1", "codex"), pane("w1-2", "w1", "claude")];
        let inv = inventory(vec![ws("w1", "smscode", 1, panes)]);
        let resolved = resolve_target_in(&inv, "codex", None).unwrap();
        assert_eq!(resolved.0, "codex");
        assert_eq!(resolved.1, "w1-1");
    }

    #[test]
    fn resolve_target_zero_panes_errors_no_live_pane() {
        let panes = vec![pane("w1-1", "w1", "claude")];
        let inv = inventory(vec![ws("w1", "smscode", 1, panes)]);
        let err = resolve_target_in(&inv, "codex", None).unwrap_err();
        assert!(
            err.message().contains("no live pane"),
            "expected no-live-pane error, got: {}",
            err.message()
        );
    }

    #[test]
    fn resolve_target_two_cross_workspace_without_scope_lists_candidates() {
        let inv = codex_in_two_workspaces();
        let err = resolve_target_in(&inv, "codex", None).unwrap_err();
        let msg = err.message();
        assert!(
            msg.contains("ambiguous"),
            "expected ambiguity error, got: {msg}"
        );
        assert!(
            msg.contains("w1:w1-1"),
            "should list candidate w1:w1-1, got: {msg}"
        );
        assert!(
            msg.contains("w2:w2-1"),
            "should list candidate w2:w2-1, got: {msg}"
        );
    }

    #[test]
    fn resolve_target_two_panes_with_workspace_scope_narrows_to_one() {
        let inv = codex_in_two_workspaces();
        // Selector by workspace id, by label, and by number respectively.
        let cases = [("w2", "w2-1"), ("infra", "w2-1"), ("1", "w1-1")];
        for (selector, expected_pane) in cases {
            assert_eq!(
                resolve_target_in(&inv, "codex", Some(selector)).unwrap().1,
                expected_pane
            );
        }
    }
}