use crate::audit::{self, AuditArgs};
use crate::compose::{build_message, ComposeArgs, ComposedMessage};
use crate::profile::load_profile;
use crate::{CliError, CliResult};
use clap::Args;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Output};
// Command-line arguments for the herdr send command; consumed by `run`.
//
// NOTE: these are deliberately plain `//` comments rather than `///` doc
// comments: clap's derive macros can pick doc comments up as help/about text,
// and the user-facing help for every flag is already set explicitly below.
#[derive(Debug, Args)]
pub struct SendHerdrArgs {
// Pane id handed to every `herdr pane ...` invocation. For an audited send it
// must also equal the address part of `--to` and look like `<workspace>-<n>`.
#[arg(long, help = "Target herdr pane id.")]
pub pane: String,
// When set, `run` prints the composed message and returns before any herdr
// command is executed and before any audit record is prepared.
#[arg(long, help = "Print composed message without sending.")]
pub dry_run: bool,
// Program spawned for all herdr commands (pane read / run / send-keys).
#[arg(long, default_value = "herdr", help = "herdr executable path.")]
pub herdr_bin: String,
// The send is audited only when this is present AND `no_audit` is false.
#[arg(
long,
help = "session id; on a successful send this records the sender audit + message corpus automatically (ADR 029) — do not also run zynk audit for the same sent message. Omit for an ad-hoc, unaudited send."
)]
pub session_id: Option<String>,
// Turns an otherwise audited send (one with a session id) into an unaudited one.
#[arg(
long,
help = "opt out of the audited send: send the message but write no audit/corpus record (ADR 029)."
)]
pub no_audit: bool,
// Copied into the audit arguments as the artifact root.
#[arg(
long,
default_value = "outputs",
help = "audit artifact root (independent of --db); the record is written under <root>/sessions/<session-id>/audit.md."
)]
pub root: PathBuf,
// Restricted by the value parser to the four listed origins.
#[arg(
long,
default_value = "agent",
value_parser = ["agent", "operator", "helper-tool", "unknown"],
help = "who originated this send, for the audit record (ADR 029 C5)."
)]
pub command_origin: String,
// Forwarded unchanged into the audit arguments; not validated in this file.
#[arg(
long,
default_value = "full",
help = "redaction policy for the audited payload; defaults to full so the corpus is queryable (ADR 029 decision 7)."
)]
pub payload_redaction_policy: String,
// Forwarded unchanged into the audit arguments.
#[arg(
long,
help = "sensitive category; a profile force_hash_only category forces hash-only regardless of --payload-redaction-policy."
)]
pub sensitive_category: Option<String>,
// Passed to the audit projection and to custody retention. `run` warns when
// it is supplied for a send that is not audited.
#[arg(
long,
help = "live DB path for the audited send's projection; defaults to the cwd .zynk/zynk.db (ADR 028); --no-db forces file-only."
)]
pub db: Option<PathBuf>,
// Passed to the audit projection; rejected by `run` together with
// `retain_custody`.
#[arg(
long,
help = "force the audited send file-only; skip the DB projection."
)]
pub no_db: bool,
// `run` rejects this flag unless the send is audited and `no_db` is false.
#[arg(
long,
help = "ADR 034 D7: ALSO retain the full sent message as recoverable ciphertext in the DB custody_vault (additive to the redacted corpus); requires an audited send (--session-id, not --no-audit/--no-db). A retention failure is LOUD (nonzero), never a silent no-retain."
)]
pub retain_custody: bool,
// Only read by `run` inside the `retain_custody` branch.
#[arg(
long,
help = "path to the operator-owned custody key file (default <db-dir>/custody.key or $ZYNK_CUSTODY_KEY_FILE); only used with --retain-custody."
)]
pub custody_key_file: Option<PathBuf>,
// Message-composition flags (profile, from, to, mid, due, ref, re, ...),
// flattened into this command's own flag set.
#[command(flatten)]
pub compose: ComposeArgs,
}
/// Entry point of the herdr send command.
///
/// Steps, in order:
/// 1. load the profile and compose the message;
/// 2. reject or warn about flag combinations that only make sense for an
///    audited send (a `--session-id` without `--no-audit`);
/// 3. on `--dry-run`, print the composed message and stop;
/// 4. for an audited send, build and validate the audit arguments *before*
///    anything is sent;
/// 5. deliver the message to the pane;
/// 6. for an audited send, write the audit file, project the record and, when
///    requested, retain custody of the full message.
///
/// # Errors
///
/// Returns a usage error for invalid flag combinations, propagates any
/// failure from composing, validating, sending or projecting, and returns the
/// `integrity_gap` error when the message was sent but the audit file could
/// not be written.
pub fn run(args: SendHerdrArgs) -> CliResult<()> {
    let profile = load_profile(args.compose.profile.as_deref())?;
    let composed = build_message(&args.compose, &profile)?;

    let audited = !args.no_audit && args.session_id.is_some();

    if !audited && args.db.is_some() {
        eprintln!(
            "warning: --db has no effect without an audited send (--session-id, and not --no-audit)"
        );
    }

    if args.retain_custody && !audited {
        return Err(CliError::usage(
            "--retain-custody requires an audited send (a --session-id, not --no-audit)",
        ));
    }
    if args.retain_custody && args.no_db {
        return Err(CliError::usage(
            "--retain-custody cannot be combined with --no-db: custody is retained in the DB vault",
        ));
    }

    if args.dry_run {
        eprintln!("DRY RUN: message was not sent; do not record delivery_status=sent.");
        println!("{}", composed.message);
        return Ok(());
    }

    // Prepare and validate the audit record up front so that an invalid
    // record is reported before the message leaves.
    let audit_args = match (&args.session_id, audited) {
        (Some(session_id), true) => {
            let prepared = prepare_audit(&args, &composed, session_id.clone())?;
            audit::validate_audit_args(&prepared, &profile)?;
            Some(prepared)
        }
        _ => None,
    };

    dispatch_herdr_message(&args, &composed.message)?;

    let audit_args = match audit_args {
        Some(prepared) => prepared,
        // Unaudited send: nothing left to record.
        None => return Ok(()),
    };

    // From here on the message has been sent; a failed audit write is
    // reported as an integrity gap rather than a plain failure.
    let (audit_path, record) = match audit::write_audit_file(&audit_args, &profile) {
        Ok(written) => written,
        Err(error) => return Err(integrity_gap(&audit_args, &composed.message, error)),
    };

    audit::project_record(
        audit_args.db.as_deref(),
        audit_args.no_db,
        &audit_args.root,
        &record,
    )?;

    if args.retain_custody {
        crate::audit::retain_custody_for(
            args.db.as_deref(),
            args.no_db,
            args.custody_key_file.as_deref(),
            &record,
            composed.message.as_bytes(),
        )
        .map_err(|error| {
            CliError::failure(format!(
                "{}; record written but custody NOT retained: {}",
                audit_path.display(),
                error.message
            ))
        })?;
    }

    Ok(())
}
/// Snapshot of the receiver pane as assembled by `read_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct ReceiverState {
/// How many times the delivery marker occurs in the pane's recent output.
pub marker_count: usize,
/// Classification of the input box found in the pane's visible text.
pub input: InputClass,
}
/// Result of `classify_input`: whether the visible pane text contains an
/// input box this module knows how to interpret.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InputClass {
/// A rule-delimited region containing a `❯` prompt line was found; the
/// payload describes what the box currently holds.
Recognized(InputState),
/// Fewer than two rule lines, or no `❯` prompt line between the last two.
Unrecognized,
}
/// Contents of a recognized input box, as determined by `classify_input`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InputState {
/// Nothing remains once the prompt glyphs and surrounding whitespace are removed.
Empty,
/// The box holds exactly one `[Pasted text ...]` chip and nothing after it.
OneChip,
/// Any other content (typed text, a chip plus text, several chips, ...).
Other,
}
/// Counts the non-overlapping occurrences of `marker` in `recent`.
///
/// An empty `marker` always yields `0` rather than matching at every position.
fn parse_marker_count(recent: &str, marker: &str) -> usize {
    if marker.is_empty() {
        return 0;
    }
    recent.match_indices(marker).count()
}
/// Returns `true` when `line` is a horizontal rule of an input box.
///
/// After trimming surrounding whitespace, the line may contain only `─`
/// glyphs and plain spaces, and must contain at least eight `─` glyphs.
fn is_input_box_rule(line: &str) -> bool {
    const RULE_GLYPH: char = '─';
    const MIN_RULE_GLYPHS: usize = 8;

    let mut glyphs = 0usize;
    for c in line.trim().chars() {
        match c {
            RULE_GLYPH => glyphs += 1,
            ' ' => {}
            // Any other character disqualifies the line.
            _ => return false,
        }
    }
    // An empty (or all-whitespace) line has zero glyphs and is rejected here.
    glyphs >= MIN_RULE_GLYPHS
}
/// Returns `true` when the first non-whitespace character of `line` is the
/// `❯` prompt glyph (U+276F).
fn starts_with_claude_prompt(line: &str) -> bool {
    let first_visible = line.chars().find(|c| !c.is_whitespace());
    matches!(first_visible, Some('\u{276f}'))
}
/// Classifies the input box found in a pane's visible text.
///
/// The input box is taken to be the lines strictly between the last two rule
/// lines (see `is_input_box_rule`). It is only recognized when at least one
/// of those lines starts with the `❯` prompt glyph. The box content is the
/// region's lines joined with spaces, with every `❯` removed and leading `>`
/// characters and surrounding whitespace stripped.
///
/// Returns:
/// * `Unrecognized` — fewer than two rule lines, or no `❯` line in the region;
/// * `Recognized(Empty)` — no content remains;
/// * `Recognized(OneChip)` — the content is a single `[Pasted text ...]` chip
///   with nothing after its closing `]`;
/// * `Recognized(Other)` — anything else.
fn classify_input(visible: &str) -> InputClass {
    let lines: Vec<&str> = visible.lines().collect();

    // Walk from the bottom: the first rule met is the box's lower edge, the
    // next one its upper edge.
    let mut rule_rows = lines
        .iter()
        .enumerate()
        .rev()
        .filter(|(_, line)| is_input_box_rule(line))
        .map(|(row, _)| row);
    let (top, bottom) = match (rule_rows.next(), rule_rows.next()) {
        (Some(bottom), Some(top)) => (top, bottom),
        _ => return InputClass::Unrecognized,
    };

    let boxed = &lines[top + 1..bottom];
    let has_prompt = boxed.iter().any(|line| starts_with_claude_prompt(line));
    if !has_prompt {
        return InputClass::Unrecognized;
    }

    let without_glyph = boxed.join(" ").replace('\u{276f}', " ");
    let content = without_glyph.trim().trim_start_matches('>').trim();
    if content.is_empty() {
        return InputClass::Recognized(InputState::Empty);
    }

    // A sole chip: the chip prefix, then nothing but whitespace after the
    // first closing bracket.
    let sole_chip = content
        .strip_prefix("[Pasted text")
        .and_then(|rest| rest.split_once(']'))
        .map_or(false, |(_, tail)| tail.trim().is_empty());

    InputClass::Recognized(if sole_chip {
        InputState::OneChip
    } else {
        InputState::Other
    })
}
/// Extracts the `[herdr ...]` header from a composed message.
///
/// The result spans from the first `[herdr ` up to and including the first
/// `]` that follows it.
///
/// # Errors
///
/// Fails when the message contains no `[herdr ` header or when the header has
/// no closing `]`.
fn delivery_marker(composed_message: &str) -> CliResult<String> {
    let from_header = match composed_message.find("[herdr ") {
        Some(start) => &composed_message[start..],
        None => {
            return Err(CliError::failure(
                "composed message has no [herdr ...] header to verify",
            ))
        }
    };
    match from_header.find(']') {
        Some(end) => Ok(from_header[..=end].to_string()),
        None => Err(CliError::failure(
            "composed message [herdr ...] header is unterminated",
        )),
    }
}
/// Sends `message` to the pane named in `args`.
///
/// This is a direct pass-through to `verify_delivery`, which both sends the
/// message and confirms that it arrived; its result is returned unchanged.
fn dispatch_herdr_message(args: &SendHerdrArgs, message: &str) -> CliResult<()> {
verify_delivery(args, message)
}
/// Reads the environment variable `var` as a `u64`.
///
/// Leading and trailing whitespace in the value is ignored, so a value such
/// as `"4000\n"` (easy to produce from a shell or an env file) is honored
/// instead of being silently replaced by the default.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or does
/// not parse as an unsigned 64-bit integer.
fn env_u64(var: &str, default: u64) -> u64 {
    match std::env::var(var) {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
/// Reads the receiver pane twice and summarizes what was seen.
///
/// The first read asks herdr for the `recent-unwrapped` source (400 lines)
/// and counts occurrences of `marker` in it; the second asks for the
/// `visible` source and classifies the input box. Output is decoded lossily
/// as UTF-8.
///
/// # Errors
///
/// Fails when herdr cannot be run or when either read exits unsuccessfully.
fn read_state(herdr_bin: &str, pane: &str, marker: &str) -> CliResult<ReceiverState> {
    /// Runs `herdr pane read <pane> <source_args...>` and returns its stdout.
    fn read_pane<'a>(
        herdr_bin: &str,
        pane: &'a str,
        source_args: &[&'a str],
    ) -> CliResult<String> {
        let mut argv: Vec<&'a str> = vec!["pane", "read", pane];
        argv.extend_from_slice(source_args);
        let output = run_herdr_command(herdr_bin, &argv)?;
        ensure_herdr_success(&output, "herdr pane read failed")?;
        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
    }

    let recent_text = read_pane(
        herdr_bin,
        pane,
        &["--source", "recent-unwrapped", "--lines", "400"],
    )?;
    let marker_count = parse_marker_count(&recent_text, marker);

    let visible_text = read_pane(herdr_bin, pane, &["--source", "visible"])?;
    let input = classify_input(&visible_text);

    Ok(ReceiverState {
        marker_count,
        input,
    })
}
/// Sends `message` to the pane and polls until delivery can be confirmed.
///
/// Procedure:
/// 1. derive the delivery marker (the `[herdr ...]` header) from the message;
///    multi-line messages are additionally checked for terminal control bytes;
/// 2. take a baseline reading of the pane; if its input box is recognized but
///    not empty, refuse to send;
/// 3. run `herdr pane run <pane> <message>` and relay its output;
/// 4. poll the pane until the marker count exceeds the baseline:
///    * recognized input box: success additionally requires the box to be
///      empty again; a box showing exactly one paste chip gets a single
///      `Enter` keypress; any other content aborts;
///    * unrecognized layout: a new marker alone counts as success.
///
/// The timeout and poll interval are read from `ZYNK_VERIFY_TIMEOUT_MS`
/// (default 4000) and `ZYNK_VERIFY_POLL_MS` (default 200).
///
/// # Errors
///
/// Fails when the message has no usable header, when a herdr command fails,
/// when the pane holds pending or ambiguous input, or when delivery cannot be
/// confirmed before the timeout elapses.
fn verify_delivery(args: &SendHerdrArgs, message: &str) -> CliResult<()> {
    let marker = delivery_marker(message)?;
    if message.contains('\n') {
        validate_bracketed_paste_payload(message)?;
    }

    let baseline = read_state(&args.herdr_bin, &args.pane, &marker)?;
    let recognized = matches!(baseline.input, InputClass::Recognized(_));
    if recognized && baseline.input != InputClass::Recognized(InputState::Empty) {
        return Err(CliError::failure(format!(
            "receiver pane {} has pre-existing pending input; refusing to send (ADR 041 D3)",
            args.pane
        )));
    }

    let run = run_herdr_command(&args.herdr_bin, &["pane", "run", &args.pane, message])?;
    write_child_output(&run)?;
    ensure_herdr_success(&run, "herdr pane run failed")?;

    let timeout = std::time::Duration::from_millis(env_u64("ZYNK_VERIFY_TIMEOUT_MS", 4000));
    let poll = std::time::Duration::from_millis(env_u64("ZYNK_VERIFY_POLL_MS", 200));
    // Measure elapsed time instead of precomputing `now + timeout`: adding a
    // very large, environment-supplied duration to an `Instant` can panic.
    let started = std::time::Instant::now();
    // Ensures the Enter keypress for a pending paste chip is sent at most once.
    let mut entered = false;

    loop {
        let state = read_state(&args.herdr_bin, &args.pane, &marker)?;
        let new_marker = state.marker_count > baseline.marker_count;

        if recognized {
            if new_marker && state.input == InputClass::Recognized(InputState::Empty) {
                return Ok(());
            }
            match state.input {
                InputClass::Recognized(InputState::OneChip) if !entered => {
                    let keyed = run_herdr_command(
                        &args.herdr_bin,
                        &["pane", "send-keys", &args.pane, "Enter"],
                    )?;
                    ensure_herdr_success(&keyed, "herdr pane send-keys failed")?;
                    entered = true;
                }
                InputClass::Recognized(InputState::Other) => {
                    return Err(CliError::failure(format!(
                        "receiver pane {} shows ambiguous pending input; refusing the auto-submit (ADR 041 D3)",
                        args.pane
                    )));
                }
                // Still empty without a new marker, chip already submitted,
                // or the layout became unrecognized: keep polling.
                _ => {}
            }
        } else if new_marker {
            return Ok(());
        }

        if started.elapsed() >= timeout {
            return Err(CliError::failure(format!(
                "delivery to pane {} unverified within timeout; not recording sent (ADR 041 D1/D4)",
                args.pane
            )));
        }
        std::thread::sleep(poll);
    }
}
/// Rejects messages that contain terminal control introducers.
///
/// # Errors
///
/// Fails when `message` contains ESC (U+001B) or the C1 CSI character
/// (U+009B).
fn validate_bracketed_paste_payload(message: &str) -> CliResult<()> {
    let has_control = message
        .chars()
        .any(|c| matches!(c, '\u{1b}' | '\u{009b}'));
    if !has_control {
        return Ok(());
    }
    Err(CliError::failure(
        "multiline Herdr messages may not contain terminal control bytes (ESC or C1 CSI)",
    ))
}
fn run_herdr_command(herdr_bin: &str, args: &[&str]) -> CliResult<Output> {
match Command::new(herdr_bin).args(args).output() {
Ok(output) => Ok(output),
Err(error) if error.kind() == io::ErrorKind::NotFound => Err(CliError::with_code(
127,
format!("herdr CLI not found at {herdr_bin}"),
)),
Err(error) => Err(CliError::failure(format!("failed to run herdr: {error}"))),
}
}
/// Relays a finished child's captured stdout and stderr to this process's
/// own stdout and stderr, stdout first.
///
/// # Errors
///
/// Fails when either stream cannot be written.
fn write_child_output(output: &Output) -> CliResult<()> {
    if let Err(error) = io::stdout().write_all(&output.stdout) {
        return Err(CliError::failure(format!(
            "failed to write stdout: {error}"
        )));
    }
    if let Err(error) = io::stderr().write_all(&output.stderr) {
        return Err(CliError::failure(format!(
            "failed to write stderr: {error}"
        )));
    }
    Ok(())
}
/// Turns an unsuccessful child exit status into an error carrying `message`.
///
/// # Errors
///
/// When the child did not succeed, the error uses the child's exit code, or
/// `1` when no exit code is available.
fn ensure_herdr_success(output: &Output, message: &str) -> CliResult<()> {
    if output.status.success() {
        return Ok(());
    }
    let exit_code = output.status.code().unwrap_or(1);
    Err(CliError::with_code(exit_code, message))
}
/// Builds the audit arguments describing this send.
///
/// Source and target are taken from the compose `--from` / `--to` values
/// (each `agent:address`); the workspace id is derived from `--pane`. The
/// record is prepared with transport `herdr`, delivery status `sent`, the
/// source agent as observer and `helper-tool` as verifier. Custody fields are
/// left unset here.
///
/// # Errors
///
/// Returns a usage error when `--from`, `--to` or `--mid` is missing, when
/// either address is malformed, when the `--to` address differs from
/// `--pane`, or when the pane id does not look like `<workspace>-<n>`.
fn prepare_audit(
    args: &SendHerdrArgs,
    composed: &ComposedMessage,
    session_id: String,
) -> CliResult<AuditArgs> {
    let compose = &args.compose;

    let from = match compose.from.as_deref() {
        Some(from) => from,
        None => {
            return Err(CliError::usage(
                "audited send (--session-id) requires --from agent:address",
            ))
        }
    };
    let to = match compose.to.as_deref() {
        Some(to) => to,
        None => {
            return Err(CliError::usage(
                "audited send (--session-id) requires --to agent:address",
            ))
        }
    };

    let (source_agent, source_address) = split_agent_address(from, "--from")?;
    let (target_agent, target_address) = split_agent_address(to, "--to")?;

    // The audited destination must be the pane the message is actually sent to.
    if target_address != args.pane {
        return Err(CliError::usage(format!(
            "--to address ({target_address}) must equal --pane ({}) so the audit records the real destination (ADR 029 C2)",
            args.pane
        )));
    }

    let workspace_id = workspace_from_pane(&args.pane)?;

    let mid = match &compose.mid {
        Some(mid) => mid.clone(),
        None => {
            return Err(CliError::usage(
                "audited send (--session-id) requires --mid",
            ))
        }
    };

    // A due value that does not canonicalize is recorded as absent.
    let due = compose
        .due
        .as_deref()
        .and_then(crate::timestamp::canonicalize);

    // The sending agent is also recorded as the observer of the delivery.
    let observed_by = source_agent.clone();

    Ok(AuditArgs {
        profile: compose.profile.clone(),
        root: args.root.clone(),
        session_id,
        audit_id: None,
        previous_audit_id: None,
        timestamp: None,
        due,
        source_agent,
        source_address,
        target_agent,
        target_address,
        transport: "herdr".to_string(),
        workspace_id,
        transport_thread_id: None,
        mid,
        record_type: composed.message_type.clone(),
        mode: composed.mode.clone(),
        r#ref: compose.r#ref.clone(),
        re: compose.re.clone(),
        command_origin: args.command_origin.clone(),
        payload: Some(composed.message.clone()),
        payload_file: None,
        payload_redaction_policy: args.payload_redaction_policy.clone(),
        payload_ref: None,
        sensitive_category: args.sensitive_category.clone(),
        excerpt_chars: 12,
        delivery_status: "sent".to_string(),
        observed_by,
        verified_by: "helper-tool".to_string(),
        db: args.db.clone(),
        no_db: args.no_db,
        retain_custody: false,
        custody_key_file: None,
    })
}
/// Splits `value` at its first `:` into `(agent, address)`.
///
/// `flag` is the name of the command-line flag the value came from and is
/// only used in the error message.
///
/// # Errors
///
/// Returns a usage error when there is no `:` or when either side of the
/// first `:` is empty.
fn split_agent_address(value: &str, flag: &str) -> CliResult<(String, String)> {
    let parsed = value
        .split_once(':')
        .filter(|(agent, address)| !agent.is_empty() && !address.is_empty());
    match parsed {
        Some((agent, address)) => Ok((agent.to_owned(), address.to_owned())),
        None => Err(CliError::usage(format!(
            "{flag} must be agent:address (got {value:?})"
        ))),
    }
}
/// Derives the workspace id from a pane id of the form `<workspace>-<n>`.
///
/// The pane id is split at its last `-`; the part before it is the workspace
/// and the part after it must consist solely of ASCII digits.
///
/// # Errors
///
/// Returns a usage error when there is no `-`, when either part is empty, or
/// when the trailing part contains a non-digit.
fn workspace_from_pane(pane: &str) -> CliResult<String> {
    let workspace = pane.rsplit_once('-').and_then(|(workspace, index)| {
        let numeric_index = !index.is_empty() && index.chars().all(|c| matches!(c, '0'..='9'));
        if numeric_index && !workspace.is_empty() {
            Some(workspace)
        } else {
            None
        }
    });
    match workspace {
        Some(workspace) => Ok(workspace.to_owned()),
        None => Err(CliError::usage(format!(
            "--pane must look like <workspace>-<n> to derive workspace_id (got {pane:?})"
        ))),
    }
}
/// Builds the error reported when the message was sent but the audit file
/// could not be written.
///
/// The sent message is saved to `zynk-recovery-<session-id>-<mid>.txt` in the
/// current directory (or `.` when the current directory is unavailable). If
/// that write fails as well, the message body is embedded in the error text
/// instead. The error also contains a ready-made command for reconciling the
/// record, and always carries exit code 1.
fn integrity_gap(audit_args: &AuditArgs, message: &str, cause: CliError) -> CliError {
    let base_dir = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    };
    let file_name = format!(
        "zynk-recovery-{}-{}.txt",
        audit_args.session_id, audit_args.mid
    );
    let recovery = base_dir.join(file_name);

    let recovery_note = if let Err(error) = fs::write(&recovery, message) {
        format!("FAILED to preserve the sent message ({error}); message body below:\n{message}")
    } else {
        format!("preserved the sent message to {}", recovery.display())
    };

    let reconcile = reconcile_command(audit_args, &recovery);
    let report = format!(
        "INTEGRITY GAP: message was SENT but the audit file write failed: {}\n{}\nreconcile the record with:\n {}",
        cause.message,
        recovery_note,
        reconcile
    );
    CliError::with_code(1, report)
}
/// Quotes `value` for pasting into a POSIX-style shell command line.
///
/// A non-empty value made up only of ASCII letters, digits and the characters
/// `-_./=:+,@%` is returned unchanged. Anything else (including the empty
/// string) is wrapped in single quotes, with each embedded single quote
/// written as `'\''`.
fn shell_quote(value: &str) -> String {
    const SAFE_PUNCTUATION: &[u8] = b"-_./=:+,@%";
    let is_safe = |b: u8| b.is_ascii_alphanumeric() || SAFE_PUNCTUATION.contains(&b);

    if value.is_empty() || !value.bytes().all(is_safe) {
        return format!("'{}'", value.replace('\'', "'\\''"));
    }
    value.to_owned()
}
/// Renders a `zynk audit` command line that records the already-sent message
/// described by `a`, reading the payload from the `recovery` file.
///
/// Every value is passed through `shell_quote`. Delivery status (`sent`) and
/// verifier (`helper-tool`) are emitted as fixed text. `--no-db` takes
/// precedence over `--db`; `--re`, `--ref`, `--mode` and `--due` are appended
/// only when present, in that order.
fn reconcile_command(a: &AuditArgs, recovery: &Path) -> String {
    // Renders one `--flag value` pair with the value shell-quoted.
    let flag = |name: &str, value: &str| format!("{name} {}", shell_quote(value));

    let root = a.root.display().to_string();
    let payload_file = recovery.display().to_string();

    // NOTE(review): `profile` and `sensitive_category` are not emitted here —
    // confirm whether the reconcile command is expected to carry them.
    let mut parts = vec![
        String::from("zynk audit"),
        flag("--root", &root),
        flag("--session-id", &a.session_id),
        flag("--source-agent", &a.source_agent),
        flag("--source-address", &a.source_address),
        flag("--target-agent", &a.target_agent),
        flag("--target-address", &a.target_address),
        flag("--transport", &a.transport),
        flag("--workspace-id", &a.workspace_id),
        flag("--mid", &a.mid),
        flag("--type", &a.record_type),
        flag("--command-origin", &a.command_origin),
        flag("--payload-redaction-policy", &a.payload_redaction_policy),
        String::from("--delivery-status sent"),
        flag("--observed-by", &a.observed_by),
        String::from("--verified-by helper-tool"),
        flag("--payload-file", &payload_file),
    ];

    match &a.db {
        _ if a.no_db => parts.push(String::from("--no-db")),
        Some(db) => parts.push(flag("--db", &db.display().to_string())),
        None => {}
    }

    parts.extend(a.re.as_deref().map(|value| flag("--re", value)));
    parts.extend(a.r#ref.as_deref().map(|value| flag("--ref", value)));
    parts.extend(a.mode.as_deref().map(|value| flag("--mode", value)));
    parts.extend(a.due.as_deref().map(|value| flag("--due", value)));

    parts.join(" ")
}
// Unit tests for the pure helpers of this module: marker counting, input-box
// classification, header extraction and shell quoting. Nothing here spawns
// herdr. The multi-line string fixtures are layout-sensitive; do not re-indent
// their continuation lines.
#[cfg(test)]
mod tests {
use super::{
classify_input, delivery_marker, parse_marker_count, shell_quote, InputClass, InputState,
};
// Pane text fixtures embedded at compile time from the tests/fixtures directory.
const REAL_CLAUDE_VISIBLE: &str =
include_str!("../tests/fixtures/adr041-claude-pane-visible.txt");
const REAL_CODEX_VISIBLE: &str =
include_str!("../tests/fixtures/adr041-codex-pane-visible.txt");
// Zero, multiple and absent occurrences, plus the empty-marker guard.
#[test]
fn parse_marker_count_counts_header_occurrences() {
let marker = "[herdr from=a to=b mid=m1 type=ack]";
assert_eq!(parse_marker_count("", marker), 0);
let twice = format!("line\n{marker}\nmiddle\n{marker}\ntail\n");
assert_eq!(parse_marker_count(&twice, marker), 2);
assert_eq!(
parse_marker_count("[Pasted text #1 +9 lines]\nnothing here\n", marker),
0
);
assert_eq!(parse_marker_count("anything", ""), 0);
}
// The Claude fixture mentions paste chips in its scrollback, yet its input
// box must still classify as empty.
#[test]
fn classify_input_recognizes_real_claude_box_as_empty() {
assert!(
REAL_CLAUDE_VISIBLE.matches("[Pasted text").count() >= 2,
"the fixture must actually contain historical prose chips to be a real test"
);
assert_eq!(
classify_input(REAL_CLAUDE_VISIBLE),
InputClass::Recognized(InputState::Empty),
"the real Claude-Code `❯` input box is recognized and empty"
);
}
// The Codex fixture has no `❯` glyph at all, so it must be Unrecognized.
#[test]
fn classify_input_unrecognized_for_real_codex_pane() {
assert!(
!REAL_CODEX_VISIBLE.contains('\u{276f}'),
"the Codex fixture must have no `❯` glyph to be a real Unrecognized test"
);
assert_eq!(
classify_input(REAL_CODEX_VISIBLE),
InputClass::Unrecognized,
"a Codex/GPT layout (no `❯`-prompt in the rule region) is Unrecognized, not a false abort"
);
}
// A lone prompt glyph between two rules is an empty input box.
#[test]
fn classify_input_recognized_empty_for_bare_prompt() {
let visible = "\
some scrollback
────────────────────────
❯
────────────────────────
";
assert_eq!(
classify_input(visible),
InputClass::Recognized(InputState::Empty)
);
}
// Exactly one paste chip and nothing else is OneChip.
#[test]
fn classify_input_recognized_one_chip_for_sole_paste_chip() {
let visible = "\
some scrollback
────────────────────────
❯ [Pasted text #1 +9 lines]
────────────────────────
";
assert_eq!(
classify_input(visible),
InputClass::Recognized(InputState::OneChip)
);
}
// Plain typed text is Other.
#[test]
fn classify_input_recognized_other_for_plain_typed_text() {
let visible = "\
prior
────────────────────────
❯ draft text
────────────────────────
";
assert_eq!(
classify_input(visible),
InputClass::Recognized(InputState::Other)
);
}
// A chip followed by typed text is Other, not OneChip.
#[test]
fn classify_input_recognized_other_for_chip_plus_text() {
let visible = "\
prior
────────────────────────
❯ [Pasted text #1 +9 lines] and typed text
────────────────────────
";
assert_eq!(
classify_input(visible),
InputClass::Recognized(InputState::Other)
);
}
// Two chips (the second on its own line inside the box) are Other.
#[test]
fn classify_input_recognized_other_for_two_chips() {
let visible = "\
prior
────────────────────────
❯ [Pasted text #1 +9 lines]
[Pasted text #2 +3 lines]
────────────────────────
";
assert_eq!(
classify_input(visible),
InputClass::Recognized(InputState::Other)
);
}
// Without any rule lines there is no input box to recognize.
#[test]
fn classify_input_unrecognized_without_input_box() {
let visible = "$ ls\nfile1 file2\n[Pasted text #1 +9 lines] mentioned in output\n";
assert_eq!(classify_input(visible), InputClass::Unrecognized);
}
// A rule-delimited box whose prompt is `›` rather than `❯` is Unrecognized.
#[test]
fn classify_input_unrecognized_for_non_claude_prompt_in_box() {
let visible = "\
prior
────────────────────────
› ghost placeholder text
────────────────────────
";
assert_eq!(classify_input(visible), InputClass::Unrecognized);
}
// Chip text in scrollback neither hides the marker nor affects the
// classification of an empty input box.
#[test]
fn marker_counted_while_historical_prose_does_not_block() {
let marker = "[herdr from=a to=b mid=m1 type=ack]";
let recent = format!("{marker}\nold: [Pasted text #1 +9 lines] discussed\n");
assert_eq!(parse_marker_count(&recent, marker), 1);
let visible = "\
talk about [Pasted text #1 +9 lines]
────────────────────────
❯
────────────────────────
";
assert_eq!(
classify_input(visible),
InputClass::Recognized(InputState::Empty)
);
}
// The marker is the `[herdr ...]` segment only, not an earlier bracketed tag.
#[test]
fn delivery_marker_extracts_the_herdr_header_segment() {
let composed = "[from-claude via herdr] [herdr from=a to=b mid=m1 type=ack] BODY: hi";
assert_eq!(
delivery_marker(composed).unwrap(),
"[herdr from=a to=b mid=m1 type=ack]"
);
}
// A message without a header must produce an error, not an empty marker.
#[test]
fn delivery_marker_errors_without_a_header() {
let err = delivery_marker("no header here BODY: hi").unwrap_err();
assert!(
err.message.contains("no [herdr"),
"missing header must fail loud: {}",
err.message
);
}
// Safe values pass through; everything else is single-quoted, with embedded
// single quotes escaped.
#[test]
fn shell_quote_passes_safe_values_and_quotes_the_rest() {
assert_eq!(shell_quote("outputs"), "outputs");
assert_eq!(shell_quote("/a/b-c_d.e:1"), "/a/b-c_d.e:1");
assert_eq!(shell_quote(""), "''");
assert_eq!(shell_quote("with space"), "'with space'");
assert_eq!(shell_quote("a;rm -rf b"), "'a;rm -rf b'");
assert_eq!(shell_quote("a'b"), "'a'\\''b'");
}
}