Skip to main content

netsky_core/
envelope.rs

1//! Shared agent-bus envelope: the on-disk shape, filename convention, and
2//! atomic-with-create-new write used by every producer on the bus. Consumers
3//! read envelopes with their own deserializer; writers MUST go through this
4//! module so every inbox file agrees on filename shape.
5//!
6//! Filename: `{nanos:020}-{pid:010}-{rand_hex:08}-{seq:010}-from-{from}.json`.
7//! The nanos+pid+rand+seq quartet gives cross-process uniqueness that a
8//! process-local `AtomicU64` alone cannot: two fresh `netsky channel send`
9//! processes both start at `seq=0`, and a wall-clock nanosecond collision
10//! between them would clobber each other's envelope under the old shape.
11//!
12//! Ordering is best-effort: consumers sort filenames lexicographically,
13//! which orders by nanos first. Clock skew between producers or concurrent
14//! writers can reorder causal order. Consumers that need causal ordering
15//! must embed a sequence number in the envelope body.
16//!
17//! Write flow: serialize → write `.<name>.<attempt>.tmp` → hard-link to
18//! `<name>` (create-new semantics; fails on collision) → unlink tmp.
19//! Retries with a fresh name on collision; returns AlreadyExists after 8
20//! failed attempts so a wedged inbox cannot spin forever.
21
22use std::fs;
23use std::io;
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicU64, Ordering};
26
27use serde::{Deserialize, Serialize};
28
/// Substrings that must not appear in an envelope body because they would
/// break the `<channel>` framing: the closing tag and the start of an
/// opening tag. Scanned by `body_contains_wrapper_tokens`.
const CHANNEL_WRAPPER_TOKENS: &[&str] = &["</channel>", "<channel source="];
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Envelope {
33    pub from: String,
34    pub text: String,
35    pub ts: String,
36}
37
/// Returns `true` when `id` is a well-formed agent id, as used for both
/// `from` and `target`.
///
/// A well-formed id is the literal prefix `agent` followed by at least one
/// character, all of which are ASCII lowercase letters or ASCII digits
/// (`agent0`, `agent42`, `agentinfinity`). The MCP agent source mirrors this
/// rule so filenames and inbox names round-trip under a single definition.
pub fn valid_agent_id(id: &str) -> bool {
    let Some(tail) = id.strip_prefix("agent") else {
        return false;
    };
    // A bare `agent` is rejected. Bytes of multi-byte UTF-8 characters are
    // all >= 0x80, so they can never match the ASCII classes below.
    !tail.is_empty()
        && tail
            .bytes()
            .all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9'))
}
52
53/// Build the canonical inbox filename. `from` is trusted to have already
54/// passed [`valid_agent_id`] — callers that have not validated must do so
55/// before constructing envelopes.
56pub fn build_filename(from: &str) -> String {
57    static SEQ: AtomicU64 = AtomicU64::new(0);
58    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
59    let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
60    let pid = std::process::id();
61    let rand_hex: u32 = rand::random();
62    format!("{nanos:020}-{pid:010}-{rand_hex:08x}-{seq:010}-from-{from}.json")
63}
64
65/// Atomically write `env` into `inbox`. Loops up to 8 times on filename
66/// collision (AlreadyExists) with a fresh name each attempt. Returns the
67/// final path on success.
68pub fn write_envelope(inbox: &Path, env: &Envelope) -> io::Result<PathBuf> {
69    fs::create_dir_all(inbox)?;
70    let bytes =
71        serde_json::to_vec(env).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
72    let mut last_err: Option<io::Error> = None;
73    for attempt in 0..8 {
74        let name = build_filename(&env.from);
75        let final_path = inbox.join(&name);
76        let tmp_path = inbox.join(format!(".{name}.{attempt}.tmp"));
77        fs::write(&tmp_path, &bytes)?;
78        match fs::hard_link(&tmp_path, &final_path) {
79            Ok(()) => {
80                let _ = fs::remove_file(&tmp_path);
81                return Ok(final_path);
82            }
83            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
84                let _ = fs::remove_file(&tmp_path);
85                last_err = Some(e);
86                continue;
87            }
88            Err(e) => {
89                let _ = fs::remove_file(&tmp_path);
90                return Err(e);
91            }
92        }
93    }
94    Err(last_err.unwrap_or_else(|| {
95        io::Error::new(
96            io::ErrorKind::AlreadyExists,
97            "exhausted 8 attempts to create unique envelope filename",
98        )
99    }))
100}
101
102/// Reject content that would break the shell-facing `<channel>` framing.
103pub fn body_contains_wrapper_tokens(body: &str) -> bool {
104    CHANNEL_WRAPPER_TOKENS
105        .iter()
106        .any(|needle| body.contains(needle))
107}
108
109/// Validate the shared agent-bus envelope shape before a drain prints it
110/// into channel framing.
111pub fn validate_bus_envelope(env: &Envelope) -> Result<(), String> {
112    if !valid_agent_id(&env.from) {
113        return Err(format!(
114            "invalid from {:?} (expected agent<lowercase-alnum>)",
115            env.from
116        ));
117    }
118    if chrono::DateTime::parse_from_rfc3339(&env.ts).is_err() {
119        return Err(format!("invalid ts {:?} (expected RFC3339)", env.ts));
120    }
121    if body_contains_wrapper_tokens(&env.text) {
122        return Err("body contains <channel> wrapper token; framing-break rejected".to_string());
123    }
124    Ok(())
125}
126
/// Defensively escape a channel body before it is wrapped in XML-like
/// framing and shown to a model.
///
/// Replaces `&`, `<`, `>`, `"` and `'` with `&amp;`, `&lt;`, `&gt;`,
/// `&quot;` and `&apos;` respectively; every other character is copied
/// through unchanged.
pub fn xml_escape_body(s: &str) -> String {
    // The output is at least as long as the input, so reserve that up front.
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '&' => "&amp;",
            '<' => "&lt;",
            '>' => "&gt;",
            '"' => "&quot;",
            '\'' => "&apos;",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed envelope from `agent2` carrying `text`.
    fn sample_envelope(text: &str) -> Envelope {
        Envelope {
            from: "agent2".to_string(),
            text: text.to_string(),
            ts: "2026-04-15T21:00:00Z".to_string(),
        }
    }

    #[test]
    fn valid_agent_id_accepts_canonical_shapes() {
        assert!(valid_agent_id("agent0"));
        assert!(valid_agent_id("agent42"));
        assert!(valid_agent_id("agentinfinity"));
    }

    #[test]
    fn valid_agent_id_rejects_junk() {
        assert!(!valid_agent_id(""));
        assert!(!valid_agent_id("agent"));
        assert!(!valid_agent_id("AGENT0"));
        assert!(!valid_agent_id("agent-7"));
        assert!(!valid_agent_id("bob"));
        assert!(!valid_agent_id("agent 0"));
    }

    #[test]
    fn build_filename_shape() {
        let n = build_filename("agent7");
        assert!(n.ends_with("-from-agent7.json"));
        // Four fixed-width fields, then the `from-<id>.json` trailer.
        let parts: Vec<&str> = n.splitn(5, '-').collect();
        assert_eq!(parts.len(), 5);
        assert_eq!(parts[0].len(), 20);
        assert_eq!(parts[1].len(), 10);
        assert_eq!(parts[2].len(), 8);
        assert_eq!(parts[3].len(), 10);
        assert_eq!(parts[4], "from-agent7.json");
    }

    #[test]
    fn build_filename_is_unique_within_process() {
        let mut seen = std::collections::HashSet::new();
        for _ in 0..1000 {
            assert!(seen.insert(build_filename("agent0")));
        }
    }

    #[test]
    fn write_envelope_round_trips() {
        let tmp = tempfile::tempdir().unwrap();
        let env = sample_envelope("hello");
        let path = write_envelope(tmp.path(), &env).unwrap();
        let raw = fs::read_to_string(&path).unwrap();
        let round: Envelope = serde_json::from_str(&raw).unwrap();
        assert_eq!(round.from, env.from);
        assert_eq!(round.text, env.text);
        assert_eq!(round.ts, env.ts);
    }

    #[test]
    fn write_envelope_gives_distinct_paths_and_leaves_no_tmp_files() {
        // Happy path only: a genuine filename collision cannot be forced
        // cheaply because the name depends on the clock, pid, rand and a
        // process-local counter. Two writes of the same envelope must land
        // in two different files and leave no `.tmp` litter behind.
        let tmp = tempfile::tempdir().unwrap();
        let env = sample_envelope("hi");
        let a = write_envelope(tmp.path(), &env).unwrap();
        let b = write_envelope(tmp.path(), &env).unwrap();
        assert_ne!(a, b);
        assert!(a.exists());
        assert!(b.exists());

        let names: Vec<String> = fs::read_dir(tmp.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name().to_string_lossy().into_owned())
            .collect();
        assert_eq!(names.len(), 2, "unexpected inbox contents: {names:?}");
        assert!(
            names.iter().all(|name| !name.starts_with('.')),
            "stale tmp files: {names:?}"
        );
    }

    #[test]
    fn body_contains_wrapper_tokens_detects_framing() {
        assert!(!body_contains_wrapper_tokens("hello"));
        assert!(body_contains_wrapper_tokens("x </channel> y"));
        assert!(body_contains_wrapper_tokens("<channel source=\"agent1\">"));
    }

    #[test]
    fn validate_bus_envelope_accepts_well_formed() {
        assert!(validate_bus_envelope(&sample_envelope("hello")).is_ok());
    }

    #[test]
    fn validate_bus_envelope_rejects_bad_fields() {
        let mut bad_from = sample_envelope("hello");
        bad_from.from = "bob".to_string();
        assert!(validate_bus_envelope(&bad_from).is_err());

        let mut bad_ts = sample_envelope("hello");
        bad_ts.ts = "not-a-timestamp".to_string();
        assert!(validate_bus_envelope(&bad_ts).is_err());

        let bad_body = sample_envelope("sneaky </channel> payload");
        assert!(validate_bus_envelope(&bad_body).is_err());
    }

    #[test]
    fn xml_escape_body_escapes_markup_characters() {
        assert_eq!(xml_escape_body("plain"), "plain");
        assert_eq!(
            xml_escape_body("<a href=\"x\">O'R & B</a>"),
            "&lt;a href=&quot;x&quot;&gt;O&apos;R &amp; B&lt;/a&gt;"
        );
    }
}
215}