use crate::audit::{self, AuditArgs};
use crate::compose::{build_message, ComposeArgs, ComposedMessage};
use crate::profile::load_profile;
use crate::{CliError, CliResult};
use clap::Args;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Output};
use std::time::Duration;
// Command-line arguments for the herdr send command, parsed through clap's `Args` derive.
// Plain `//` comments are used here on purpose so that the explicit `help = "..."` strings
// remain the only text that clap shows to the user.
#[derive(Debug, Args)]
pub struct SendHerdrArgs {
// Destination pane; also the source for the derived workspace id on an audited send.
#[arg(long, help = "Target herdr pane id.")]
pub pane: String,
// When set, `run` prints the composed message and returns without sending.
#[arg(long, help = "Print composed message without sending.")]
pub dry_run: bool,
// Executable used for every herdr invocation made by this module.
#[arg(long, default_value = "herdr", help = "herdr executable path.")]
pub herdr_bin: String,
// A send is treated as audited only when this is present and `no_audit` is false.
#[arg(
long,
help = "session id; on a successful send this records the sender audit + message corpus automatically (ADR 029) — do not also run zynk audit for the same sent message. Omit for an ad-hoc, unaudited send."
)]
pub session_id: Option<String>,
// Turns an otherwise audited send into an unaudited one.
#[arg(
long,
help = "opt out of the audited send: send the message but write no audit/corpus record (ADR 029)."
)]
pub no_audit: bool,
// Copied into the audit arguments as the artifact root.
#[arg(
long,
default_value = "outputs",
help = "audit artifact root (independent of --db); the record is written under <root>/sessions/<session-id>/audit.md."
)]
pub root: PathBuf,
// Restricted by clap to the four listed values.
#[arg(
long,
default_value = "agent",
value_parser = ["agent", "operator", "helper-tool", "unknown"],
help = "who originated this send, for the audit record (ADR 029 C5)."
)]
pub command_origin: String,
// Passed through unchanged to the audit arguments.
#[arg(
long,
default_value = "full",
help = "redaction policy for the audited payload; defaults to full so the corpus is queryable (ADR 029 decision 7)."
)]
pub payload_redaction_policy: String,
// Passed through unchanged to the audit arguments.
#[arg(
long,
help = "sensitive category; a profile force_hash_only category forces hash-only regardless of --payload-redaction-policy."
)]
pub sensitive_category: Option<String>,
// `run` warns when this is given on a send that is not audited.
#[arg(
long,
help = "live DB path for the audited send's projection; defaults to the cwd .zynk/zynk.db (ADR 028); --no-db forces file-only."
)]
pub db: Option<PathBuf>,
// Rejected by `run` when combined with `retain_custody`.
#[arg(
long,
help = "force the audited send file-only; skip the DB projection."
)]
pub no_db: bool,
// Requires an audited send; `run` returns a usage error otherwise.
#[arg(
long,
help = "ADR 034 D7: ALSO retain the full sent message as recoverable ciphertext in the DB custody_vault (additive to the redacted corpus); requires an audited send (--session-id, not --no-audit/--no-db). A retention failure is LOUD (nonzero), never a silent no-retain."
)]
pub retain_custody: bool,
// Forwarded to the custody retention call only when `retain_custody` is set.
#[arg(
long,
help = "path to the operator-owned custody key file (default <db-dir>/custody.key or $ZYNK_CUSTODY_KEY_FILE); only used with --retain-custody."
)]
pub custody_key_file: Option<PathBuf>,
// Message composition flags shared with the compose module (from/to/mid/... are read from here).
#[command(flatten)]
pub compose: ComposeArgs,
}
// Entry point of the herdr send command.
//
// Flow: load the profile and compose the message, validate flag combinations, honour
// `--dry-run`, prepare and validate the audit arguments BEFORE sending (so an invalid audit
// request fails before anything is dispatched), send and verify delivery, and only then
// write the audit file, project the record, and optionally retain custody.
//
// Returns an error when composition, validation, delivery, or any post-send recording step
// fails. A failed audit file write after a successful send is reported through
// `integrity_gap`.
pub fn run(args: SendHerdrArgs) -> CliResult<()> {
let profile = load_profile(args.compose.profile.as_deref())?;
let composed = build_message(&args.compose, &profile)?;
// A send is audited only with a session id and without the explicit opt-out.
let audited = args.session_id.is_some() && !args.no_audit;
if args.db.is_some() && !audited {
eprintln!(
"warning: --db has no effect without an audited send (--session-id, and not --no-audit)"
);
}
// Custody retention needs both an audited send and the DB, so reject bad combinations early.
if args.retain_custody {
if !audited {
return Err(CliError::usage(
"--retain-custody requires an audited send (a --session-id, not --no-audit)",
));
}
if args.no_db {
return Err(CliError::usage(
"--retain-custody cannot be combined with --no-db: custody is retained in the DB vault",
));
}
}
// Dry run: the notice goes to stderr, the composed message to stdout; nothing is sent.
if args.dry_run {
eprintln!("DRY RUN: message was not sent; do not record delivery_status=sent.");
println!("{}", composed.message);
return Ok(());
}
// Build and validate the audit arguments before dispatching the message.
let audit_args = if audited {
let session_id = args
.session_id
.clone()
.expect("audited implies --session-id is present");
let prepared = prepare_audit(&args, &composed, session_id)?;
audit::validate_audit_args(&prepared, &profile)?;
Some(prepared)
} else {
None
};
dispatch_herdr_message(&args, &composed.message)?;
// From here on the message has been sent; failures below concern recording only.
if let Some(audit_args) = audit_args {
match audit::write_audit_file(&audit_args, &profile) {
Ok((audit_path, record)) => {
audit::project_record(
audit_args.db.as_deref(),
audit_args.no_db,
&audit_args.root,
&record,
)?;
if args.retain_custody {
crate::audit::retain_custody_for(
args.db.as_deref(),
args.no_db,
args.custody_key_file.as_deref(),
&record,
composed.message.as_bytes(),
)
// Keep the audit path in the error so the operator knows the record exists.
.map_err(|error| {
CliError::failure(format!(
"{}; record written but custody NOT retained: {}",
audit_path.display(),
error.message
))
})?;
}
}
// The message went out but the record did not: surface it as an integrity gap.
Err(error) => return Err(integrity_gap(&audit_args, &composed.message, error)),
}
}
Ok(())
}
/// Parses a `[herdr key=value ...]` header into a key-to-value map.
///
/// Surrounding whitespace is trimmed first. Returns `None` when the text is not wrapped in
/// `[herdr ` ... `]`, when a whitespace-separated token has no `=`, when a key or value is
/// empty, or when a key appears more than once.
fn parse_herdr_header_fields(header: &str) -> Option<BTreeMap<String, String>> {
    let trimmed = header.trim();
    let without_opener = trimmed.strip_prefix("[herdr ")?;
    let body = without_opener.strip_suffix(']')?;
    body.split_whitespace()
        .try_fold(BTreeMap::new(), |mut fields, token| {
            let (key, value) = token.split_once('=')?;
            if key.is_empty() || value.is_empty() {
                return None;
            }
            // A repeated key makes the header ambiguous, so the whole parse is rejected.
            match fields.insert(key.to_string(), value.to_string()) {
                Some(_) => None,
                None => Some(fields),
            }
        })
}
/// Rewrites `header` so that every run of whitespace (including newlines) becomes a single
/// space and leading/trailing whitespace is dropped.
fn collapse_header_whitespace(header: &str) -> String {
    let mut collapsed = String::with_capacity(header.len());
    for word in header.split_whitespace() {
        // Words are never empty, so an empty buffer means this is the first word.
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }
    collapsed
}
/// Glues the lines of `header` back together without inserting any separator.
///
/// The first line is kept verbatim; every following line loses its leading whitespace. When
/// the result would be empty, the original text is returned unchanged.
fn join_hard_wrapped_header(header: &str) -> String {
    let mut lines = header.lines();
    let mut joined = lines.next().unwrap_or("").to_string();
    joined.extend(lines.map(str::trim_start));
    if joined.is_empty() {
        return header.to_string();
    }
    joined
}
/// Decides whether a parsed candidate header carries the expected fields.
///
/// The four core fields (`from`, `to`, `mid`, `type`) must agree exactly, including being
/// absent on both sides. Every field present in `expected` must also appear in `candidate`
/// with the same value; extra fields in `candidate` are tolerated.
fn header_fields_match(
    candidate: &BTreeMap<String, String>,
    expected: &BTreeMap<String, String>,
) -> bool {
    const REQUIRED: [&str; 4] = ["from", "to", "mid", "type"];
    let core_fields_agree = REQUIRED
        .iter()
        .all(|field| candidate.get(*field) == expected.get(*field));
    core_fields_agree
        && expected
            .iter()
            .all(|(key, value)| candidate.get(key) == Some(value))
}
/// Checks a raw `[herdr ...]` slice against the expected fields.
///
/// Three readings of the slice are tried in turn: as-is, with whitespace collapsed, and with
/// hard-wrapped lines joined. The candidate matches as soon as one reading parses and
/// satisfies `header_fields_match`.
fn header_candidate_matches(candidate: &str, expected: &BTreeMap<String, String>) -> bool {
    let accepts = |text: &str| match parse_herdr_header_fields(text) {
        Some(fields) => header_fields_match(&fields, expected),
        None => false,
    };
    accepts(candidate)
        || accepts(&collapse_header_whitespace(candidate))
        || accepts(&join_hard_wrapped_header(candidate))
}
/// Counts how many `[herdr ...]` headers in `recent` match the fields of `marker`.
///
/// Returns 0 when `marker` is empty or is not a parseable herdr header. Each candidate runs
/// from a `[herdr ` opener to the next `]`; scanning stops when an opener has no closing
/// bracket after it.
///
/// After a match the scan resumes past the closing bracket. After a non-match it resumes
/// just past the opener instead, so a stray or truncated `[herdr ` fragment cannot swallow
/// the genuine header that ends at the same `]`.
fn parse_marker_count(recent: &str, marker: &str) -> usize {
    const OPENER: &str = "[herdr ";
    if marker.is_empty() {
        return 0;
    }
    let Some(expected) = parse_herdr_header_fields(marker) else {
        return 0;
    };
    let mut count = 0;
    let mut cursor = 0;
    while let Some(relative_start) = recent[cursor..].find(OPENER) {
        let start = cursor + relative_start;
        let Some(relative_end) = recent[start..].find(']') else {
            // No closing bracket remains, so no later opener can form a candidate either.
            break;
        };
        let end = start + relative_end + 1;
        if header_candidate_matches(&recent[start..end], &expected) {
            count += 1;
            cursor = end;
        } else {
            // Re-scan inside the rejected span: a later opener may start the real header.
            cursor = start + OPENER.len();
        }
    }
    count
}
/// Extracts the first `[herdr ...]` header (brackets included) from a composed message.
///
/// # Errors
/// Fails when the message contains no `[herdr ` opener, or when the opener is never closed
/// by a `]`.
fn delivery_marker(composed_message: &str) -> CliResult<String> {
    let Some(start) = composed_message.find("[herdr ") else {
        return Err(CliError::failure(
            "composed message has no [herdr ...] header to verify",
        ));
    };
    let from_opener = &composed_message[start..];
    match from_opener.find(']') {
        Some(close) => Ok(from_opener[..=close].to_string()),
        None => Err(CliError::failure(
            "composed message [herdr ...] header is unterminated",
        )),
    }
}
// Sends `message` to the pane named in `args`. Currently a direct pass-through to
// `deliver_with_marker_verify`, whose result is returned unchanged.
fn dispatch_herdr_message(args: &SendHerdrArgs, message: &str) -> CliResult<()> {
deliver_with_marker_verify(args, message)
}
/// Reads environment variable `var` as a `u64`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode, or does not
/// parse as an unsigned integer.
fn env_u64(var: &str, default: u64) -> u64 {
    match std::env::var(var) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
// Oldest Herdr version, as (major, minor, patch), accepted by the pre-submit version check.
const MIN_HERDR_ATOMIC_RUN_VERSION: (u64, u64, u64) = (0, 6, 8);
// Lowest `protocol:` value from `herdr status` accepted by the pre-submit status check.
const MIN_HERDR_ATOMIC_RUN_PROTOCOL: u64 = 12;
/// Returns the child's stdout followed directly by its stderr, each decoded lossily as UTF-8.
fn combined_output(output: &Output) -> String {
    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);
    [stdout.as_ref(), stderr.as_ref()].concat()
}
/// Finds the first whitespace-separated token in `text` that looks like a version and
/// returns it as `(major, minor, patch)`.
///
/// A token qualifies when, after stripping leading `v` characters, trailing punctuation, and
/// any `-`/`+` suffix, it consists of two or three dot-separated unsigned integers. A missing
/// patch component counts as 0. Returns `None` when no token qualifies.
///
/// Tokens that contain a dot but are not numeric (for example `herdr.exe`) are skipped
/// rather than aborting the whole parse, so a later genuine version is still found.
fn parse_herdr_version(text: &str) -> Option<(u64, u64, u64)> {
    text.split_whitespace().find_map(|token| {
        let candidate = token
            .trim_start_matches('v')
            .trim_end_matches(|ch: char| !(ch.is_ascii_alphanumeric() || ch == '.' || ch == '-'));
        // Drop pre-release / build metadata such as `-preview` or `+build5`.
        let numeric = candidate.split(['-', '+']).next().unwrap_or(candidate);
        let mut parts = numeric.split('.');
        // Inside this closure `?` only rejects the current token; the search continues.
        let major = parts.next()?.parse::<u64>().ok()?;
        let minor = parts.next()?.parse::<u64>().ok()?;
        let patch = match parts.next() {
            Some(part) => part.parse::<u64>().ok()?,
            None => 0,
        };
        // More than three components is not a version this parser recognises.
        if parts.next().is_some() {
            return None;
        }
        Some((major, minor, patch))
    })
}
/// Returns `true` when `found` is equal to or newer than `minimum`, comparing the
/// `(major, minor, patch)` tuples component by component from the left.
fn version_at_least(found: (u64, u64, u64), minimum: (u64, u64, u64)) -> bool {
    matches!(found.cmp(&minimum), Ordering::Equal | Ordering::Greater)
}
/// Renders a `(major, minor, patch)` tuple as `major.minor.patch`.
fn format_version(version: (u64, u64, u64)) -> String {
    let (major, minor, patch) = version;
    format!("{major}.{minor}.{patch}")
}
/// Collects, in order of appearance, every numeric value from lines of the form
/// `protocol: <n>` in `text`.
///
/// Lines without a colon, lines with a different key, and `protocol` lines whose value is
/// not an unsigned integer are ignored.
fn parse_herdr_status_protocols(text: &str) -> Vec<u64> {
    let mut protocols = Vec::new();
    for line in text.lines() {
        let Some((key, value)) = line.trim().split_once(':') else {
            continue;
        };
        if key.trim() != "protocol" {
            continue;
        }
        if let Ok(protocol) = value.trim().parse::<u64>() {
            protocols.push(protocol);
        }
    }
    protocols
}
/// Reads the first `compatible: <value>` line in `text`.
///
/// Returns `Some(true)` when the value is exactly `yes`, `true`, or `1`, `Some(false)` for
/// any other value, and `None` when no `compatible` line exists.
fn parse_herdr_status_compatible(text: &str) -> Option<bool> {
    for line in text.lines() {
        if let Some((key, value)) = line.trim().split_once(':') {
            if key.trim() == "compatible" {
                let flag = value.trim();
                return Some(flag == "yes" || flag == "true" || flag == "1");
            }
        }
    }
    None
}
/// Runs `herdr status` and confirms the installation can be used for an atomic submit.
///
/// # Errors
/// Fails when the command cannot be run or exits unsuccessfully (the child's exit code is
/// propagated, or 1 when there is none), when no `protocol:` line can be parsed, when any
/// reported protocol is below `MIN_HERDR_ATOMIC_RUN_PROTOCOL`, or when the status does not
/// report a truthy `compatible:` value.
fn ensure_herdr_status_compatible(herdr_bin: &str) -> CliResult<()> {
    let status = run_herdr_command(herdr_bin, &["status"])?;
    let status_text = combined_output(&status);
    if !status.status.success() {
        let exit_code = status.status.code().unwrap_or(1);
        let detail = format!(
            "failed to check Herdr client/server status before atomic submit; require protocol >= {} and compatible=yes. No message sent and no sent audit/corpus recorded: {}",
            MIN_HERDR_ATOMIC_RUN_PROTOCOL,
            status_text.trim()
        );
        return Err(CliError::with_code(exit_code, detail));
    }

    let protocols = parse_herdr_status_protocols(&status_text);
    if protocols.is_empty() {
        let detail = format!(
            "could not parse Herdr protocol from `herdr status`; require protocol >= {} and compatible=yes before atomic submit. No message sent and no sent audit/corpus recorded.",
            MIN_HERDR_ATOMIC_RUN_PROTOCOL
        );
        return Err(CliError::failure(detail));
    }

    // Every reported protocol has to meet the minimum.
    let all_recent_enough = protocols
        .iter()
        .all(|protocol| *protocol >= MIN_HERDR_ATOMIC_RUN_PROTOCOL);
    if !all_recent_enough {
        let detail = format!(
            "Herdr protocol {:?} is too old for zynk's atomic submit path; require protocol >= {} and compatible=yes. Upgrade/restart Herdr before sending; no message sent and no sent audit/corpus recorded.",
            protocols,
            MIN_HERDR_ATOMIC_RUN_PROTOCOL
        );
        return Err(CliError::failure(detail));
    }

    match parse_herdr_status_compatible(&status_text) {
        Some(true) => Ok(()),
        _ => Err(CliError::failure(
            "Herdr client/server status is not compatible=yes; restart or upgrade Herdr before sending. No message sent and no sent audit/corpus recorded.",
        )),
    }
}
/// Pre-flight check performed before anything is sent: the Herdr binary must be new enough,
/// its status must be compatible, and `herdr pane --help` must mention `pane run`.
///
/// # Errors
/// Fails when `--version` or `pane --help` cannot be run or exit unsuccessfully (the
/// child's exit code is propagated, or 1 when there is none), when the version cannot be
/// parsed or is below `MIN_HERDR_ATOMIC_RUN_VERSION`, when `ensure_herdr_status_compatible`
/// fails, or when the help text lacks `pane run`.
fn ensure_atomic_pane_run_supported(herdr_bin: &str) -> CliResult<()> {
    // Step 1: the version reported by the binary.
    let version = run_herdr_command(herdr_bin, &["--version"])?;
    let version_text = combined_output(&version);
    if !version.status.success() {
        let exit_code = version.status.code().unwrap_or(1);
        let detail = format!(
            "failed to check Herdr version before atomic submit; no message sent and no sent audit/corpus recorded: {}",
            version_text.trim()
        );
        return Err(CliError::with_code(exit_code, detail));
    }
    let Some(found) = parse_herdr_version(&version_text) else {
        return Err(CliError::failure(format!(
            "could not parse Herdr version from `{}`; zynk v1.5.2 requires Herdr >= {} with `herdr pane run`; no message sent and no sent audit/corpus recorded",
            version_text.trim(),
            format_version(MIN_HERDR_ATOMIC_RUN_VERSION)
        )));
    };
    if !version_at_least(found, MIN_HERDR_ATOMIC_RUN_VERSION) {
        return Err(CliError::failure(format!(
            "Herdr {} is too old for zynk's atomic submit path; require Herdr >= {} with `herdr pane run`. Upgrade Herdr before sending; no message sent and no sent audit/corpus recorded.",
            format_version(found),
            format_version(MIN_HERDR_ATOMIC_RUN_VERSION)
        )));
    }

    // Step 2: protocol level and compatibility flag from `herdr status`.
    ensure_herdr_status_compatible(herdr_bin)?;

    // Step 3: the `pane` subcommand has to advertise `pane run`.
    let pane_help = run_herdr_command(herdr_bin, &["pane", "--help"])?;
    let pane_help_text = combined_output(&pane_help);
    if !pane_help.status.success() {
        let exit_code = pane_help.status.code().unwrap_or(1);
        let detail = format!(
            "failed to check Herdr pane command capabilities before atomic submit; require `herdr pane run`; no message sent and no sent audit/corpus recorded: {}",
            pane_help_text.trim()
        );
        return Err(CliError::with_code(exit_code, detail));
    }
    if pane_help_text.contains("pane run") {
        return Ok(());
    }
    Err(CliError::failure(format!(
        "Herdr {} does not advertise required atomic submit capability `herdr pane run`; upgrade Herdr before sending. No message sent and no sent audit/corpus recorded.",
        format_version(found)
    )))
}
/// Submits `message` to `pane` with `herdr pane run`, echoing the child's stdout and stderr
/// to this process's own streams.
///
/// # Errors
/// Fails when the command cannot be run, when its output cannot be echoed, or when it exits
/// unsuccessfully.
fn submit_message(herdr_bin: &str, pane: &str, message: &str) -> CliResult<()> {
    let argv = ["pane", "run", pane, message];
    let submitted = run_herdr_command(herdr_bin, &argv)?;
    // Echo the child's output before judging the exit status so it is visible on failure too.
    write_child_output(&submitted)?;
    ensure_herdr_success(
        &submitted,
        "herdr pane run failed (atomic submit); no sent audit/corpus recorded. Confirm Herdr >= 0.6.8, protocol >= 12, compatible=yes, and `pane run` support; inspect the target pane for stuck input, and recover manually before retrying",
    )
}
/// Number of matching delivery markers found in each of the two pane reads.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct MarkerCounts {
/// Matches in the `recent-unwrapped` pane source.
recent_unwrapped: usize,
/// Matches in the `visible` pane source.
visible: usize,
}
/// Marker counts captured from a pane at one point in time.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct MarkerSnapshot {
/// Counts observed when the snapshot was taken.
counts: MarkerCounts,
}
impl MarkerSnapshot {
    /// Decides whether this snapshot proves delivery relative to earlier snapshots.
    ///
    /// Delivery is verified when the `recent_unwrapped` count grew past `baseline`, or when
    /// the `visible` count exceeds the larger of the baseline and (if given) post-submit
    /// visible counts.
    fn verifies_since(&self, baseline: &Self, post_submit: Option<&Self>) -> bool {
        let visible_floor = match post_submit {
            Some(snapshot) => baseline.counts.visible.max(snapshot.counts.visible),
            None => baseline.counts.visible,
        };
        let recent_grew = self.counts.recent_unwrapped > baseline.counts.recent_unwrapped;
        let visible_grew = self.counts.visible > visible_floor;
        recent_grew || visible_grew
    }
}
/// Reads the pane twice (the last 400 `recent-unwrapped` lines, then the `visible` source)
/// and counts the headers matching `marker` in each read.
///
/// # Errors
/// Fails when either `herdr pane read` invocation cannot be run or exits unsuccessfully.
fn marker_snapshot(herdr_bin: &str, pane: &str, marker: &str) -> CliResult<MarkerSnapshot> {
    let recent_argv = [
        "pane",
        "read",
        pane,
        "--source",
        "recent-unwrapped",
        "--lines",
        "400",
    ];
    let recent = run_herdr_command(herdr_bin, &recent_argv)?;
    ensure_herdr_success(&recent, "herdr pane read failed")?;

    let visible_argv = ["pane", "read", pane, "--source", "visible"];
    let visible = run_herdr_command(herdr_bin, &visible_argv)?;
    ensure_herdr_success(&visible, "herdr pane read failed")?;

    let recent_text = String::from_utf8_lossy(&recent.stdout);
    let visible_text = String::from_utf8_lossy(&visible.stdout);
    let counts = MarkerCounts {
        recent_unwrapped: parse_marker_count(&recent_text, marker),
        visible: parse_marker_count(&visible_text, marker),
    };
    Ok(MarkerSnapshot { counts })
}
// Submits `message` to the target pane and waits until its `[herdr ...]` header shows up in
// the pane output.
//
// Sequence: extract the marker, reject payloads with terminal control characters, run the
// Herdr capability checks, snapshot the pane before submitting, submit, snapshot again, then
// poll until a snapshot verifies against both earlier snapshots or the timeout elapses.
//
// The timeout and poll interval are read from ZYNK_VERIFY_TIMEOUT_MS (default 30000) and
// ZYNK_VERIFY_POLL_MS (default 200).
//
// Returns an error when any preparatory step, herdr invocation, or the verification itself
// fails.
fn deliver_with_marker_verify(args: &SendHerdrArgs, message: &str) -> CliResult<()> {
let marker = delivery_marker(message)?;
validate_herdr_payload(message)?;
let timeout = Duration::from_millis(env_u64("ZYNK_VERIFY_TIMEOUT_MS", 30_000));
let poll = Duration::from_millis(env_u64("ZYNK_VERIFY_POLL_MS", 200));
ensure_atomic_pane_run_supported(&args.herdr_bin)?;
// Baseline is taken before the submit so markers already on screen are not counted as proof.
let baseline = marker_snapshot(&args.herdr_bin, &args.pane, &marker)?;
submit_message(&args.herdr_bin, &args.pane, message)?;
// Taken right after the submit; its visible count raises the floor used by `verifies_since`.
let post_submit = marker_snapshot(&args.herdr_bin, &args.pane, &marker)?;
let deadline = std::time::Instant::now() + timeout;
loop {
if marker_snapshot(&args.herdr_bin, &args.pane, &marker)?
.verifies_since(&baseline, Some(&post_submit))
{
return Ok(());
}
// The deadline is checked after the snapshot, so at least one check always runs.
if std::time::Instant::now() >= deadline {
break;
}
std::thread::sleep(poll);
}
Err(CliError::failure(format!(
"delivery to pane {} unverified within timeout; not recording sent (ADR 041 D1/D4). \
If the message is visible but still stuck in the target input, inspect the target pane and recover manually; zynk recorded no sent audit/corpus.",
args.pane
)))
}
/// Rejects messages containing ESC (U+001B) or the C1 CSI character (U+009B).
///
/// # Errors
/// Fails when either character occurs anywhere in `message`.
fn validate_herdr_payload(message: &str) -> CliResult<()> {
    let has_control = message
        .chars()
        .any(|ch| matches!(ch, '\u{1b}' | '\u{009b}'));
    if has_control {
        Err(CliError::failure(
            "Herdr messages may not contain terminal control bytes (ESC or C1 CSI)",
        ))
    } else {
        Ok(())
    }
}
fn run_herdr_command(herdr_bin: &str, args: &[&str]) -> CliResult<Output> {
match Command::new(herdr_bin).args(args).output() {
Ok(output) => Ok(output),
Err(error) if error.kind() == io::ErrorKind::NotFound => Err(CliError::with_code(
127,
format!("herdr CLI not found at {herdr_bin}"),
)),
Err(error) => Err(CliError::failure(format!("failed to run herdr: {error}"))),
}
}
/// Copies the child's captured stdout and stderr to this process's stdout and stderr.
///
/// # Errors
/// Fails when either stream cannot be written; stdout is attempted first.
fn write_child_output(output: &Output) -> CliResult<()> {
    if let Err(error) = io::stdout().write_all(&output.stdout) {
        return Err(CliError::failure(format!(
            "failed to write stdout: {error}"
        )));
    }
    if let Err(error) = io::stderr().write_all(&output.stderr) {
        return Err(CliError::failure(format!(
            "failed to write stderr: {error}"
        )));
    }
    Ok(())
}
/// Turns an unsuccessful child exit into an error that carries `message`.
///
/// # Errors
/// Fails when the child did not exit successfully; the error code is the child's exit code,
/// or 1 when no code is available.
fn ensure_herdr_success(output: &Output, message: &str) -> CliResult<()> {
    if !output.status.success() {
        let exit_code = output.status.code().unwrap_or(1);
        return Err(CliError::with_code(exit_code, message));
    }
    Ok(())
}
/// Builds the audit arguments for an audited send from the CLI flags and the composed message.
///
/// The record is prepared with transport `herdr`, delivery status `sent`, verifier
/// `helper-tool`, and the sender as observer. Custody fields are left disabled here.
///
/// # Errors
/// Returns a usage error when `--from`, `--to`, or `--mid` is missing, when either address
/// is not `agent:address`, when the `--to` address differs from `--pane`, or when no
/// workspace id can be derived from the pane.
fn prepare_audit(
    args: &SendHerdrArgs,
    composed: &ComposedMessage,
    session_id: String,
) -> CliResult<AuditArgs> {
    let compose = &args.compose;

    let Some(from) = compose.from.as_deref() else {
        return Err(CliError::usage(
            "audited send (--session-id) requires --from agent:address",
        ));
    };
    let Some(to) = compose.to.as_deref() else {
        return Err(CliError::usage(
            "audited send (--session-id) requires --to agent:address",
        ));
    };
    let (source_agent, source_address) = split_agent_address(from, "--from")?;
    let (target_agent, target_address) = split_agent_address(to, "--to")?;

    // The audited destination has to be the pane the message is actually sent to.
    if target_address != args.pane {
        return Err(CliError::usage(format!(
            "--to address ({target_address}) must equal --pane ({}) so the audit records the real destination (ADR 029 C2)",
            args.pane
        )));
    }
    let workspace_id = workspace_from_pane(&args.pane)?;

    let Some(mid) = compose.mid.clone() else {
        return Err(CliError::usage(
            "audited send (--session-id) requires --mid",
        ));
    };
    let due = match compose.due.as_deref() {
        Some(raw) => crate::timestamp::canonicalize(raw),
        None => None,
    };
    // The sender is recorded both as the source agent and as the observer.
    let observed_by = source_agent.clone();

    Ok(AuditArgs {
        profile: compose.profile.clone(),
        root: args.root.clone(),
        session_id,
        audit_id: None,
        previous_audit_id: None,
        timestamp: None,
        due,
        source_agent,
        source_address,
        target_agent,
        target_address,
        transport: "herdr".to_string(),
        workspace_id,
        transport_thread_id: None,
        mid,
        record_type: composed.message_type.clone(),
        mode: composed.mode.clone(),
        r#ref: compose.r#ref.clone(),
        re: compose.re.clone(),
        command_origin: args.command_origin.clone(),
        payload: Some(composed.message.clone()),
        payload_file: None,
        payload_redaction_policy: args.payload_redaction_policy.clone(),
        payload_ref: None,
        sensitive_category: args.sensitive_category.clone(),
        excerpt_chars: 12,
        delivery_status: "sent".to_string(),
        observed_by,
        verified_by: "helper-tool".to_string(),
        db: args.db.clone(),
        no_db: args.no_db,
        retain_custody: false,
        custody_key_file: None,
    })
}
/// Splits `value` at its first `:` into `(agent, address)`.
///
/// # Errors
/// Returns a usage error naming `flag` when there is no colon or when either side is empty.
fn split_agent_address(value: &str, flag: &str) -> CliResult<(String, String)> {
    if let Some((agent, address)) = value.split_once(':') {
        if !(agent.is_empty() || address.is_empty()) {
            return Ok((agent.to_string(), address.to_string()));
        }
    }
    Err(CliError::usage(format!(
        "{flag} must be agent:address (got {value:?})"
    )))
}
/// Derives the workspace id from a pane id shaped like `<workspace>-<n>`: everything before
/// the last `-`, provided the part after it is a non-empty run of ASCII digits.
///
/// # Errors
/// Returns a usage error when the pane id does not have that shape.
fn workspace_from_pane(pane: &str) -> CliResult<String> {
    let derived = pane.rsplit_once('-').filter(|(workspace, index)| {
        !workspace.is_empty()
            && !index.is_empty()
            && index.bytes().all(|byte| byte.is_ascii_digit())
    });
    match derived {
        Some((workspace, _)) => Ok(workspace.to_string()),
        None => Err(CliError::usage(format!(
            "--pane must look like <workspace>-<n> to derive workspace_id (got {pane:?})"
        ))),
    }
}
/// Builds the error reported when a message was sent but its audit file could not be written.
///
/// The sent message is saved to `zynk-recovery-<session-id>-<mid>.txt` in the current
/// directory (or `.` when the current directory is unavailable). If that write fails too,
/// the message body is embedded in the error text instead. The error always ends with a
/// `zynk audit` command line for reconciling the record.
fn integrity_gap(audit_args: &AuditArgs, message: &str, cause: CliError) -> CliError {
    let base_dir = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    };
    let file_name = format!(
        "zynk-recovery-{}-{}.txt",
        audit_args.session_id, audit_args.mid
    );
    let recovery = base_dir.join(file_name);

    let recovery_note = if let Err(error) = fs::write(&recovery, message) {
        format!("FAILED to preserve the sent message ({error}); message body below:\n{message}")
    } else {
        format!("preserved the sent message to {}", recovery.display())
    };

    let detail = format!(
        "INTEGRITY GAP: message was SENT but the audit file write failed: {}\n{}\nreconcile the record with:\n {}",
        cause.message,
        recovery_note,
        reconcile_command(audit_args, &recovery)
    );
    CliError::with_code(1, detail)
}
/// Quotes `value` for use as a single word on a POSIX shell command line.
///
/// Non-empty values made only of ASCII alphanumerics and `-_./=:+,@%` are returned as-is.
/// Everything else is wrapped in single quotes, with each embedded single quote written
/// as `'\''`.
fn shell_quote(value: &str) -> String {
    let is_safe = |byte: u8| {
        byte.is_ascii_alphanumeric()
            || matches!(
                byte,
                b'-' | b'_' | b'.' | b'/' | b'=' | b':' | b'+' | b',' | b'@' | b'%'
            )
    };
    if value.is_empty() || !value.bytes().all(is_safe) {
        return format!("'{}'", value.replace('\'', "'\\''"));
    }
    value.to_string()
}
/// Renders a shell-quoted `zynk audit` command line that records the already-sent message
/// from the recovery file at `recovery`.
///
/// Mandatory flags come first in a fixed order, followed by `--no-db` or `--db <path>` when
/// applicable, then whichever of `--re`, `--ref`, `--mode`, and `--due` are set.
fn reconcile_command(a: &AuditArgs, recovery: &Path) -> String {
    // NOTE(review): `profile` and `sensitive_category` are not emitted here; confirm whether
    // `zynk audit` needs them to reproduce the same redaction decision.
    let flag = |name: &str, value: &str| format!("{name} {}", shell_quote(value));
    let root = a.root.display().to_string();
    let payload_file = recovery.display().to_string();

    let mut parts = vec![
        "zynk audit".to_string(),
        flag("--root", &root),
        flag("--session-id", &a.session_id),
        flag("--source-agent", &a.source_agent),
        flag("--source-address", &a.source_address),
        flag("--target-agent", &a.target_agent),
        flag("--target-address", &a.target_address),
        flag("--transport", &a.transport),
        flag("--workspace-id", &a.workspace_id),
        flag("--mid", &a.mid),
        flag("--type", &a.record_type),
        flag("--command-origin", &a.command_origin),
        flag("--payload-redaction-policy", &a.payload_redaction_policy),
        "--delivery-status sent".to_string(),
        flag("--observed-by", &a.observed_by),
        "--verified-by helper-tool".to_string(),
        flag("--payload-file", &payload_file),
    ];

    // `--no-db` takes precedence over an explicit database path.
    if a.no_db {
        parts.push("--no-db".to_string());
    } else if let Some(db) = &a.db {
        parts.push(flag("--db", &db.display().to_string()));
    }

    if let Some(value) = &a.re {
        parts.push(flag("--re", value));
    }
    if let Some(value) = &a.r#ref {
        parts.push(flag("--ref", value));
    }
    if let Some(value) = &a.mode {
        parts.push(flag("--mode", value));
    }
    if let Some(value) = &a.due {
        parts.push(flag("--due", value));
    }
    parts.join(" ")
}
// Unit tests for the pure helpers of this module: version and status parsing, delivery
// marker extraction and counting, and shell quoting. None of them invoke herdr.
#[cfg(test)]
mod tests {
use super::{
delivery_marker, parse_herdr_status_compatible, parse_herdr_status_protocols,
parse_herdr_version, parse_marker_count, shell_quote, version_at_least,
MIN_HERDR_ATOMIC_RUN_VERSION,
};
// Plain and `v`-prefixed pre-release version strings parse, and the minimum is enforced.
#[test]
fn herdr_version_parser_accepts_current_cli_output() {
assert_eq!(parse_herdr_version("herdr 0.6.8\n"), Some((0, 6, 8)));
assert_eq!(
parse_herdr_version("herdr v0.7.0-preview\n"),
Some((0, 7, 0))
);
assert!(version_at_least((0, 6, 8), MIN_HERDR_ATOMIC_RUN_VERSION));
assert!(version_at_least((0, 7, 0), MIN_HERDR_ATOMIC_RUN_VERSION));
assert!(!version_at_least((0, 6, 7), MIN_HERDR_ATOMIC_RUN_VERSION));
}
// Both the client and server `protocol:` lines are collected; `compatible:` maps to a bool.
#[test]
fn herdr_status_parser_accepts_current_status_output() {
let status = "client:\n version: 0.6.8\n channel: stable\n protocol: 12\n\nserver:\n status: running\n version: 0.6.8\n protocol: 12\n compatible: yes\n";
assert_eq!(parse_herdr_status_protocols(status), vec![12, 12]);
assert_eq!(parse_herdr_status_compatible(status), Some(true));
assert_eq!(
parse_herdr_status_compatible("compatible: no\n"),
Some(false)
);
assert_eq!(
parse_herdr_status_protocols("protocol: 11\nprotocol: 12\n"),
vec![11, 12]
);
}
// Basic counting: empty input, repeated headers, unrelated brackets, and an empty marker.
#[test]
fn parse_marker_count_counts_header_occurrences() {
let marker = "[herdr from=a to=b mid=m1 type=ack]";
assert_eq!(parse_marker_count("", marker), 0);
let twice = format!("line\n{marker}\nmiddle\n{marker}\ntail\n");
assert_eq!(parse_marker_count(&twice, marker), 2);
assert_eq!(
parse_marker_count("[some other bracketed text]\nnothing here\n", marker),
0
);
assert_eq!(parse_marker_count("anything", ""), 0);
}
// A line break in the middle of a field value must still match once the lines are joined.
#[test]
fn parse_marker_count_matches_hard_wrapped_header_value() {
let marker = "[herdr from=claude:w652eed593568a3-3 to=codex:w652eed593568a3-1 mid=smscode-claude-zynk-v14-ack type=ack ref=zynk-v1.4.0-release re=smscode-zynk-v14-ack-claude-1 mode=status]";
let wrapped = "\
› [from-claude via herdr] [herdr from=claude:w652eed593568a3-3 to=codex:w652eed593568a3-1 mid=smscode-claude-zynk-v14-ack type=ack ref=zynk-v1.4.0-release re=smscode-zynk-v14-ack-claude-
1 mode=status] BODY: delivered text
";
assert_eq!(
parse_marker_count(wrapped, marker),
1,
"a terminal hard-wrap inside a header value must not hide a delivered marker"
);
}
// A line break between two fields is plain whitespace and must still match.
#[test]
fn parse_marker_count_matches_header_wrapped_between_fields() {
let marker = "[herdr from=a to=b mid=m1 type=ack ref=topic re=parent mode=review]";
let wrapped = "\
[from-a via herdr] [herdr from=a to=b mid=m1 type=ack
ref=topic re=parent mode=review] BODY: ok
";
assert_eq!(parse_marker_count(wrapped, marker), 1);
}
// Matching is on the full field set, never on the message id alone.
#[test]
fn parse_marker_count_requires_structured_field_match_not_mid_only() {
let marker = "[herdr from=a to=b mid=m1 type=ack ref=topic]";
assert_eq!(
parse_marker_count("[herdr from=x to=b mid=m1 type=ack ref=topic]", marker),
0,
"wrong sender must not verify"
);
assert_eq!(
parse_marker_count("[herdr from=a to=b mid=m1 type=ack]", marker),
0,
"missing expected optional fields must not verify"
);
assert_eq!(
parse_marker_count(
"[herdr from=x from=a to=b mid=m1 type=ack ref=topic]",
marker
),
0,
"malformed duplicate fields must not verify"
);
assert_eq!(
parse_marker_count("plain prose mentions mid=m1 but has no header", marker),
0,
"mid-only prose is not delivery proof"
);
}
// Bracketed text that is not a herdr header neither counts nor disturbs the count.
#[test]
fn marker_counted_while_unrelated_bracketed_prose_does_not_block() {
let marker = "[herdr from=a to=b mid=m1 type=ack]";
let recent = format!("{marker}\nold: [unrelated bracketed note] discussed\n");
assert_eq!(parse_marker_count(&recent, marker), 1);
}
// The marker is the `[herdr ...]` segment, not the preceding `[from-... via herdr]` tag.
#[test]
fn delivery_marker_extracts_the_herdr_header_segment() {
let composed = "[from-claude via herdr] [herdr from=a to=b mid=m1 type=ack] BODY: hi";
assert_eq!(
delivery_marker(composed).unwrap(),
"[herdr from=a to=b mid=m1 type=ack]"
);
}
// A message without a header is an error rather than an empty marker.
#[test]
fn delivery_marker_errors_without_a_header() {
let err = delivery_marker("no header here BODY: hi").unwrap_err();
assert!(
err.message.contains("no [herdr"),
"missing header must fail loud: {}",
err.message
);
}
// Safe words pass through; everything else, including the empty string, is single-quoted.
#[test]
fn shell_quote_passes_safe_values_and_quotes_the_rest() {
assert_eq!(shell_quote("outputs"), "outputs");
assert_eq!(shell_quote("/a/b-c_d.e:1"), "/a/b-c_d.e:1");
assert_eq!(shell_quote(""), "''");
assert_eq!(shell_quote("with space"), "'with space'");
assert_eq!(shell_quote("a;rm -rf b"), "'a;rm -rf b'");
assert_eq!(shell_quote("a'b"), "'a'\\''b'");
}
}