use crate::audit::{self, AuditArgs};
use crate::compose::{build_message, ComposeArgs, ComposedMessage};
use crate::profile::load_profile;
use crate::{CliError, CliResult};
use clap::Args;
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Output};
use std::time::Duration;
#[derive(Debug, Args)]
pub struct SendHerdrArgs {
#[arg(long, help = "Target herdr pane id.")]
pub pane: String,
#[arg(long, help = "Print composed message without sending.")]
pub dry_run: bool,
#[arg(long, default_value = "herdr", help = "herdr executable path.")]
pub herdr_bin: String,
#[arg(
long,
help = "session id; on a successful send this records the sender audit + message corpus automatically (ADR 029) — do not also run zynk audit for the same sent message. Omit for an ad-hoc, unaudited send."
)]
pub session_id: Option<String>,
#[arg(
long,
help = "opt out of the audited send: send the message but write no audit/corpus record (ADR 029)."
)]
pub no_audit: bool,
#[arg(
long,
default_value = "outputs",
help = "audit artifact root (independent of --db); the record is written under <root>/sessions/<session-id>/audit.md."
)]
pub root: PathBuf,
#[arg(
long,
default_value = "agent",
value_parser = ["agent", "operator", "helper-tool", "unknown"],
help = "who originated this send, for the audit record (ADR 029 C5)."
)]
pub command_origin: String,
#[arg(
long,
default_value = "full",
help = "redaction policy for the audited payload; defaults to full so the corpus is queryable (ADR 029 decision 7)."
)]
pub payload_redaction_policy: String,
#[arg(
long,
help = "sensitive category; a profile force_hash_only category forces hash-only regardless of --payload-redaction-policy."
)]
pub sensitive_category: Option<String>,
#[arg(
long,
help = "live DB path for the audited send's projection; defaults to the cwd .zynk/zynk.db (ADR 028); --no-db forces file-only."
)]
pub db: Option<PathBuf>,
#[arg(
long,
help = "force the audited send file-only; skip the DB projection."
)]
pub no_db: bool,
#[arg(
long,
help = "ADR 034 D7: ALSO retain the full sent message as recoverable ciphertext in the DB custody_vault (additive to the redacted corpus); requires an audited send (--session-id, not --no-audit/--no-db). A retention failure is LOUD (nonzero), never a silent no-retain."
)]
pub retain_custody: bool,
#[arg(
long,
help = "path to the operator-owned custody key file (default <db-dir>/custody.key or $ZYNK_CUSTODY_KEY_FILE); only used with --retain-custody."
)]
pub custody_key_file: Option<PathBuf>,
#[command(flatten)]
pub compose: ComposeArgs,
}
pub fn run(args: SendHerdrArgs) -> CliResult<()> {
let profile = load_profile(args.compose.profile.as_deref())?;
let composed = build_message(&args.compose, &profile)?;
let audited = args.session_id.is_some() && !args.no_audit;
if args.db.is_some() && !audited {
eprintln!(
"warning: --db has no effect without an audited send (--session-id, and not --no-audit)"
);
}
if args.retain_custody {
if !audited {
return Err(CliError::usage(
"--retain-custody requires an audited send (a --session-id, not --no-audit)",
));
}
if args.no_db {
return Err(CliError::usage(
"--retain-custody cannot be combined with --no-db: custody is retained in the DB vault",
));
}
}
if args.dry_run {
eprintln!("DRY RUN: message was not sent; do not record delivery_status=sent.");
println!("{}", composed.message);
return Ok(());
}
let audit_args = if audited {
let session_id = args
.session_id
.clone()
.expect("audited implies --session-id is present");
let prepared = prepare_audit(&args, &composed, session_id)?;
audit::validate_audit_args(&prepared, &profile)?;
Some(prepared)
} else {
None
};
dispatch_herdr_message(&args, &composed.message)?;
if let Some(audit_args) = audit_args {
match audit::write_audit_file(&audit_args, &profile) {
Ok((audit_path, record)) => {
audit::project_record(
audit_args.db.as_deref(),
audit_args.no_db,
&audit_args.root,
&record,
)?;
if args.retain_custody {
crate::audit::retain_custody_for(
args.db.as_deref(),
args.no_db,
args.custody_key_file.as_deref(),
&record,
composed.message.as_bytes(),
)
.map_err(|error| {
CliError::failure(format!(
"{}; record written but custody NOT retained: {}",
audit_path.display(),
error.message
))
})?;
}
}
Err(error) => return Err(integrity_gap(&audit_args, &composed.message, error)),
}
}
Ok(())
}
fn parse_herdr_header_fields(header: &str) -> Option<BTreeMap<String, String>> {
let inner = header.trim().strip_prefix("[herdr ")?.strip_suffix(']')?;
let mut fields = BTreeMap::new();
for token in inner.split_whitespace() {
let (key, value) = token.split_once('=')?;
if key.is_empty() || value.is_empty() {
return None;
}
if fields.insert(key.to_string(), value.to_string()).is_some() {
return None;
}
}
Some(fields)
}
fn collapse_header_whitespace(header: &str) -> String {
header.split_whitespace().collect::<Vec<_>>().join(" ")
}
fn join_hard_wrapped_header(header: &str) -> String {
let mut joined = String::new();
for (idx, line) in header.lines().enumerate() {
if idx == 0 {
joined.push_str(line);
} else {
joined.push_str(line.trim_start());
}
}
if joined.is_empty() {
header.to_string()
} else {
joined
}
}
fn header_fields_match(
candidate: &BTreeMap<String, String>,
expected: &BTreeMap<String, String>,
) -> bool {
for required in ["from", "to", "mid", "type"] {
if candidate.get(required) != expected.get(required) {
return false;
}
}
expected
.iter()
.all(|(key, value)| candidate.get(key) == Some(value))
}
fn header_candidate_matches(candidate: &str, expected: &BTreeMap<String, String>) -> bool {
let variants = [
candidate.to_string(),
collapse_header_whitespace(candidate),
join_hard_wrapped_header(candidate),
];
variants.iter().any(|variant| {
parse_herdr_header_fields(variant)
.as_ref()
.is_some_and(|fields| header_fields_match(fields, expected))
})
}
fn parse_marker_count(recent: &str, marker: &str) -> usize {
if marker.is_empty() {
return 0;
}
let Some(expected) = parse_herdr_header_fields(marker) else {
return 0;
};
let mut count = 0;
let mut cursor = 0;
while let Some(relative_start) = recent[cursor..].find("[herdr ") {
let start = cursor + relative_start;
let Some(relative_end) = recent[start..].find(']') else {
break;
};
let end = start + relative_end + 1;
if header_candidate_matches(&recent[start..end], &expected) {
count += 1;
}
cursor = end;
}
count
}
fn delivery_marker(composed_message: &str) -> CliResult<String> {
let start = composed_message
.find("[herdr ")
.ok_or_else(|| CliError::failure("composed message has no [herdr ...] header to verify"))?;
let rel_end = composed_message[start..]
.find(']')
.ok_or_else(|| CliError::failure("composed message [herdr ...] header is unterminated"))?;
Ok(composed_message[start..=start + rel_end].to_string())
}
fn dispatch_herdr_message(args: &SendHerdrArgs, message: &str) -> CliResult<()> {
deliver_with_retry(args, message)
}
fn env_u64(var: &str, default: u64) -> u64 {
std::env::var(var)
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(default)
}
fn submit_message(herdr_bin: &str, pane: &str, message: &str, settle: Duration) -> CliResult<()> {
let pasted = run_herdr_command(herdr_bin, &["pane", "send-text", pane, message])?;
write_child_output(&pasted)?;
ensure_herdr_success(&pasted, "herdr pane send-text failed")?;
std::thread::sleep(settle);
let keyed = run_herdr_command(herdr_bin, &["pane", "send-keys", pane, "Enter"])?;
write_child_output(&keyed)?;
ensure_herdr_success(&keyed, "herdr pane send-keys failed")?;
Ok(())
}
fn marker_count(herdr_bin: &str, pane: &str, marker: &str) -> CliResult<usize> {
let out = run_herdr_command(
herdr_bin,
&[
"pane",
"read",
pane,
"--source",
"recent-unwrapped",
"--lines",
"400",
],
)?;
ensure_herdr_success(&out, "herdr pane read failed")?;
Ok(parse_marker_count(
&String::from_utf8_lossy(&out.stdout),
marker,
))
}
fn deliver_with_retry(args: &SendHerdrArgs, message: &str) -> CliResult<()> {
let marker = delivery_marker(message)?;
validate_bracketed_paste_payload(message)?;
let settle = Duration::from_millis(env_u64("ZYNK_VERIFY_SETTLE_MS", 300));
let timeout = Duration::from_millis(env_u64("ZYNK_VERIFY_TIMEOUT_MS", 4000));
let poll = Duration::from_millis(env_u64("ZYNK_VERIFY_POLL_MS", 200));
let attempts = env_u64("ZYNK_VERIFY_ATTEMPTS", 3).max(1);
let baseline = marker_count(&args.herdr_bin, &args.pane, &marker)?;
for _ in 0..attempts {
if marker_count(&args.herdr_bin, &args.pane, &marker)? > baseline {
return Ok(());
}
submit_message(&args.herdr_bin, &args.pane, message, settle)?;
let deadline = std::time::Instant::now() + timeout;
loop {
if marker_count(&args.herdr_bin, &args.pane, &marker)? > baseline {
return Ok(());
}
if std::time::Instant::now() >= deadline {
break;
}
std::thread::sleep(poll);
}
}
Err(CliError::failure(format!(
"delivery to pane {} unverified within timeout after {} attempt(s); not recording sent (ADR 041 D1/D4)",
args.pane, attempts
)))
}
fn validate_bracketed_paste_payload(message: &str) -> CliResult<()> {
if message.contains('\u{1b}') || message.contains('\u{009b}') {
return Err(CliError::failure(
"Herdr messages may not contain terminal control bytes (ESC or C1 CSI)",
));
}
Ok(())
}
fn run_herdr_command(herdr_bin: &str, args: &[&str]) -> CliResult<Output> {
match Command::new(herdr_bin).args(args).output() {
Ok(output) => Ok(output),
Err(error) if error.kind() == io::ErrorKind::NotFound => Err(CliError::with_code(
127,
format!("herdr CLI not found at {herdr_bin}"),
)),
Err(error) => Err(CliError::failure(format!("failed to run herdr: {error}"))),
}
}
fn write_child_output(output: &Output) -> CliResult<()> {
io::stdout()
.write_all(&output.stdout)
.map_err(|error| CliError::failure(format!("failed to write stdout: {error}")))?;
io::stderr()
.write_all(&output.stderr)
.map_err(|error| CliError::failure(format!("failed to write stderr: {error}")))?;
Ok(())
}
fn ensure_herdr_success(output: &Output, message: &str) -> CliResult<()> {
if output.status.success() {
Ok(())
} else {
Err(CliError::with_code(
output.status.code().unwrap_or(1),
message,
))
}
}
fn prepare_audit(
args: &SendHerdrArgs,
composed: &ComposedMessage,
session_id: String,
) -> CliResult<AuditArgs> {
let from = args.compose.from.as_deref().ok_or_else(|| {
CliError::usage("audited send (--session-id) requires --from agent:address")
})?;
let to = args.compose.to.as_deref().ok_or_else(|| {
CliError::usage("audited send (--session-id) requires --to agent:address")
})?;
let (source_agent, source_address) = split_agent_address(from, "--from")?;
let (target_agent, target_address) = split_agent_address(to, "--to")?;
if target_address != args.pane {
return Err(CliError::usage(format!(
"--to address ({target_address}) must equal --pane ({}) so the audit records the real destination (ADR 029 C2)",
args.pane
)));
}
let workspace_id = workspace_from_pane(&args.pane)?;
let mid = args
.compose
.mid
.clone()
.ok_or_else(|| CliError::usage("audited send (--session-id) requires --mid"))?;
let due = args
.compose
.due
.as_deref()
.and_then(crate::timestamp::canonicalize);
Ok(AuditArgs {
profile: args.compose.profile.clone(),
root: args.root.clone(),
session_id,
audit_id: None,
previous_audit_id: None,
timestamp: None,
due,
source_agent: source_agent.clone(),
source_address,
target_agent,
target_address,
transport: "herdr".to_string(),
workspace_id,
transport_thread_id: None,
mid,
record_type: composed.message_type.clone(),
mode: composed.mode.clone(),
r#ref: args.compose.r#ref.clone(),
re: args.compose.re.clone(),
command_origin: args.command_origin.clone(),
payload: Some(composed.message.clone()),
payload_file: None,
payload_redaction_policy: args.payload_redaction_policy.clone(),
payload_ref: None,
sensitive_category: args.sensitive_category.clone(),
excerpt_chars: 12,
delivery_status: "sent".to_string(),
observed_by: source_agent,
verified_by: "helper-tool".to_string(),
db: args.db.clone(),
no_db: args.no_db,
retain_custody: false,
custody_key_file: None,
})
}
fn split_agent_address(value: &str, flag: &str) -> CliResult<(String, String)> {
match value.split_once(':') {
Some((agent, address)) if !agent.is_empty() && !address.is_empty() => {
Ok((agent.to_string(), address.to_string()))
}
_ => Err(CliError::usage(format!(
"{flag} must be agent:address (got {value:?})"
))),
}
}
fn workspace_from_pane(pane: &str) -> CliResult<String> {
match pane.rsplit_once('-') {
Some((workspace, index))
if !workspace.is_empty()
&& !index.is_empty()
&& index.chars().all(|c| c.is_ascii_digit()) =>
{
Ok(workspace.to_string())
}
_ => Err(CliError::usage(format!(
"--pane must look like <workspace>-<n> to derive workspace_id (got {pane:?})"
))),
}
}
fn integrity_gap(audit_args: &AuditArgs, message: &str, cause: CliError) -> CliError {
let recovery = std::env::current_dir()
.unwrap_or_else(|_| PathBuf::from("."))
.join(format!(
"zynk-recovery-{}-{}.txt",
audit_args.session_id, audit_args.mid
));
let recovery_note = match fs::write(&recovery, message) {
Ok(()) => format!("preserved the sent message to {}", recovery.display()),
Err(error) => {
format!("FAILED to preserve the sent message ({error}); message body below:\n{message}")
}
};
CliError::with_code(
1,
format!(
"INTEGRITY GAP: message was SENT but the audit file write failed: {}\n{}\nreconcile the record with:\n {}",
cause.message,
recovery_note,
reconcile_command(audit_args, &recovery)
),
)
}
fn shell_quote(value: &str) -> String {
if !value.is_empty()
&& value
.bytes()
.all(|b| b.is_ascii_alphanumeric() || b"-_./=:+,@%".contains(&b))
{
value.to_string()
} else {
format!("'{}'", value.replace('\'', "'\\''"))
}
}
fn reconcile_command(a: &AuditArgs, recovery: &Path) -> String {
let mut parts = vec![
"zynk audit".to_string(),
format!("--root {}", shell_quote(&a.root.display().to_string())),
format!("--session-id {}", shell_quote(&a.session_id)),
format!("--source-agent {}", shell_quote(&a.source_agent)),
format!("--source-address {}", shell_quote(&a.source_address)),
format!("--target-agent {}", shell_quote(&a.target_agent)),
format!("--target-address {}", shell_quote(&a.target_address)),
format!("--transport {}", shell_quote(&a.transport)),
format!("--workspace-id {}", shell_quote(&a.workspace_id)),
format!("--mid {}", shell_quote(&a.mid)),
format!("--type {}", shell_quote(&a.record_type)),
format!("--command-origin {}", shell_quote(&a.command_origin)),
format!(
"--payload-redaction-policy {}",
shell_quote(&a.payload_redaction_policy)
),
"--delivery-status sent".to_string(),
format!("--observed-by {}", shell_quote(&a.observed_by)),
"--verified-by helper-tool".to_string(),
format!(
"--payload-file {}",
shell_quote(&recovery.display().to_string())
),
];
if a.no_db {
parts.push("--no-db".to_string());
} else if let Some(db) = &a.db {
parts.push(format!("--db {}", shell_quote(&db.display().to_string())));
}
if let Some(value) = &a.re {
parts.push(format!("--re {}", shell_quote(value)));
}
if let Some(value) = &a.r#ref {
parts.push(format!("--ref {}", shell_quote(value)));
}
if let Some(value) = &a.mode {
parts.push(format!("--mode {}", shell_quote(value)));
}
if let Some(value) = &a.due {
parts.push(format!("--due {}", shell_quote(value)));
}
parts.join(" ")
}
#[cfg(test)]
mod tests {
use super::{delivery_marker, parse_marker_count, shell_quote};
#[test]
fn parse_marker_count_counts_header_occurrences() {
let marker = "[herdr from=a to=b mid=m1 type=ack]";
assert_eq!(parse_marker_count("", marker), 0);
let twice = format!("line\n{marker}\nmiddle\n{marker}\ntail\n");
assert_eq!(parse_marker_count(&twice, marker), 2);
assert_eq!(
parse_marker_count("[some other bracketed text]\nnothing here\n", marker),
0
);
assert_eq!(parse_marker_count("anything", ""), 0);
}
#[test]
fn parse_marker_count_matches_hard_wrapped_header_value() {
let marker = "[herdr from=claude:w652eed593568a3-3 to=codex:w652eed593568a3-1 mid=smscode-claude-zynk-v14-ack type=ack ref=zynk-v1.4.0-release re=smscode-zynk-v14-ack-claude-1 mode=status]";
let wrapped = "\
› [from-claude via herdr] [herdr from=claude:w652eed593568a3-3 to=codex:w652eed593568a3-1 mid=smscode-claude-zynk-v14-ack type=ack ref=zynk-v1.4.0-release re=smscode-zynk-v14-ack-claude-
1 mode=status] BODY: delivered text
";
assert_eq!(
parse_marker_count(wrapped, marker),
1,
"a terminal hard-wrap inside a header value must not hide a delivered marker"
);
}
#[test]
fn parse_marker_count_matches_header_wrapped_between_fields() {
let marker = "[herdr from=a to=b mid=m1 type=ack ref=topic re=parent mode=review]";
let wrapped = "\
[from-a via herdr] [herdr from=a to=b mid=m1 type=ack
ref=topic re=parent mode=review] BODY: ok
";
assert_eq!(parse_marker_count(wrapped, marker), 1);
}
#[test]
fn parse_marker_count_requires_structured_field_match_not_mid_only() {
let marker = "[herdr from=a to=b mid=m1 type=ack ref=topic]";
assert_eq!(
parse_marker_count("[herdr from=x to=b mid=m1 type=ack ref=topic]", marker),
0,
"wrong sender must not verify"
);
assert_eq!(
parse_marker_count("[herdr from=a to=b mid=m1 type=ack]", marker),
0,
"missing expected optional fields must not verify"
);
assert_eq!(
parse_marker_count(
"[herdr from=x from=a to=b mid=m1 type=ack ref=topic]",
marker
),
0,
"malformed duplicate fields must not verify"
);
assert_eq!(
parse_marker_count("plain prose mentions mid=m1 but has no header", marker),
0,
"mid-only prose is not delivery proof"
);
}
#[test]
fn marker_counted_while_unrelated_bracketed_prose_does_not_block() {
let marker = "[herdr from=a to=b mid=m1 type=ack]";
let recent = format!("{marker}\nold: [unrelated bracketed note] discussed\n");
assert_eq!(parse_marker_count(&recent, marker), 1);
}
#[test]
fn delivery_marker_extracts_the_herdr_header_segment() {
let composed = "[from-claude via herdr] [herdr from=a to=b mid=m1 type=ack] BODY: hi";
assert_eq!(
delivery_marker(composed).unwrap(),
"[herdr from=a to=b mid=m1 type=ack]"
);
}
#[test]
fn delivery_marker_errors_without_a_header() {
let err = delivery_marker("no header here BODY: hi").unwrap_err();
assert!(
err.message.contains("no [herdr"),
"missing header must fail loud: {}",
err.message
);
}
#[test]
fn shell_quote_passes_safe_values_and_quotes_the_rest() {
assert_eq!(shell_quote("outputs"), "outputs");
assert_eq!(shell_quote("/a/b-c_d.e:1"), "/a/b-c_d.e:1");
assert_eq!(shell_quote(""), "''");
assert_eq!(shell_quote("with space"), "'with space'");
assert_eq!(shell_quote("a;rm -rf b"), "'a;rm -rf b'");
assert_eq!(shell_quote("a'b"), "'a'\\''b'");
}
}