Skip to main content

netsky_core/
envelope.rs

1//! Shared agent-bus envelope: the on-disk shape, filename convention, and
2//! atomic-with-create-new write used by every producer on the bus. Consumers
3//! read envelopes with their own deserializer; writers MUST go through this
4//! module so every inbox file agrees on filename shape.
5//!
6//! Filename: `{nanos:020}-{pid:010}-{rand_hex:08}-{seq:010}-from-{from}.json`.
7//! The nanos+pid+rand+seq quartet gives cross-process uniqueness that a
8//! process-local `AtomicU64` alone cannot: two fresh `netsky channel send`
9//! processes both start at `seq=0`, and a wall-clock nanosecond collision
10//! between them would clobber each other's envelope under the old shape.
11//!
12//! Ordering is best-effort: consumers sort filenames lexicographically,
13//! which orders by nanos first. Clock skew between producers or concurrent
14//! writers can reorder causal order. Consumers that need causal ordering
15//! must embed a sequence number in the envelope body.
16//!
17//! Write flow: serialize → write `.<name>.<attempt>.tmp` → hard-link to
18//! `<name>` (create-new semantics; fails on collision) → unlink tmp.
19//! Retries with a fresh name on collision; returns AlreadyExists after 8
20//! failed attempts so a wedged inbox cannot spin forever.
21
22use std::fs;
23use std::io;
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicU64, Ordering};
26
27use serde::{Deserialize, Serialize};
28
29const CHANNEL_WRAPPER_TOKENS: &[&str] = &["</channel>", "<channel source="];
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Envelope {
33    pub id: Option<String>,
34    pub from: String,
35    pub to: Option<String>,
36    pub kind: Option<String>,
37    pub in_reply_to: Option<String>,
38    pub thread: Option<String>,
39    pub swarm: Option<String>,
40    pub idempotency_key: Option<String>,
41    pub requires_ack: Option<bool>,
42    pub text: String,
43    pub ts: String,
44}
45
46impl Envelope {
47    pub fn new(from: impl Into<String>, text: impl Into<String>, ts: impl Into<String>) -> Self {
48        Self {
49            id: None,
50            from: from.into(),
51            to: None,
52            kind: None,
53            in_reply_to: None,
54            thread: None,
55            swarm: None,
56            idempotency_key: None,
57            requires_ack: None,
58            text: text.into(),
59            ts: ts.into(),
60        }
61    }
62
63    pub fn kind_or_text(&self) -> &str {
64        self.kind
65            .as_deref()
66            .unwrap_or(crate::restart_handshake::KIND_TEXT)
67    }
68}
69
70/// `from`/`target` id validator: `agent` prefix + non-empty lowercase-alnum
71/// suffix, optionally preceded by the `test-` test-harness namespace.
72/// Accepted shapes: `agent0`, `agent42`, `agentinfinity`, `test-agent97`,
73/// `test-agentinfinity`. Mirrored in the MCP agent source so filenames
74/// and inbox names round-trip under a single rule.
75pub fn valid_agent_id(id: &str) -> bool {
76    let core = id.strip_prefix("test-").unwrap_or(id);
77    match core.strip_prefix("agent") {
78        Some(suffix) => {
79            !suffix.is_empty()
80                && suffix
81                    .chars()
82                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit())
83        }
84        None => false,
85    }
86}
87
88/// Build the canonical inbox filename. `from` is trusted to have already
89/// passed [`valid_agent_id`] — callers that have not validated must do so
90/// before constructing envelopes.
91pub fn build_filename(from: &str) -> String {
92    static SEQ: AtomicU64 = AtomicU64::new(0);
93    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
94    let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
95    let pid = std::process::id();
96    let rand_hex: u32 = rand::random();
97    format!("{nanos:020}-{pid:010}-{rand_hex:08x}-{seq:010}-from-{from}.json")
98}
99
100/// Atomically write `env` into `inbox`. Loops up to 8 times on filename
101/// collision (AlreadyExists) with a fresh name each attempt. Returns the
102/// final path on success.
103pub fn write_envelope(inbox: &Path, env: &Envelope) -> io::Result<PathBuf> {
104    fs::create_dir_all(inbox)?;
105    let bytes =
106        serde_json::to_vec(env).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
107    let mut last_err: Option<io::Error> = None;
108    for attempt in 0..8 {
109        let name = build_filename(&env.from);
110        let final_path = inbox.join(&name);
111        let tmp_path = inbox.join(format!(".{name}.{attempt}.tmp"));
112        fs::write(&tmp_path, &bytes)?;
113        match fs::hard_link(&tmp_path, &final_path) {
114            Ok(()) => {
115                let _ = fs::remove_file(&tmp_path);
116                return Ok(final_path);
117            }
118            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
119                let _ = fs::remove_file(&tmp_path);
120                last_err = Some(e);
121                continue;
122            }
123            Err(e) => {
124                let _ = fs::remove_file(&tmp_path);
125                return Err(e);
126            }
127        }
128    }
129    Err(last_err.unwrap_or_else(|| {
130        io::Error::new(
131            io::ErrorKind::AlreadyExists,
132            "exhausted 8 attempts to create unique envelope filename",
133        )
134    }))
135}
136
137/// Reject content that would break the shell-facing `<channel>` framing.
138pub fn body_contains_wrapper_tokens(body: &str) -> bool {
139    CHANNEL_WRAPPER_TOKENS
140        .iter()
141        .any(|needle| body.contains(needle))
142}
143
144/// Validate the shared agent-bus envelope shape before a drain prints it
145/// into channel framing.
146pub fn validate_bus_envelope(env: &Envelope) -> Result<(), String> {
147    if !valid_agent_id(&env.from) {
148        return Err(format!(
149            "invalid from {:?} (expected agent<lowercase-alnum>)",
150            env.from
151        ));
152    }
153    if chrono::DateTime::parse_from_rfc3339(&env.ts).is_err() {
154        return Err(format!("invalid ts {:?} (expected RFC3339)", env.ts));
155    }
156    if body_contains_wrapper_tokens(&env.text) {
157        return Err("body contains <channel> wrapper token; framing-break rejected".to_string());
158    }
159    Ok(())
160}
161
162/// Escape the channel body defensively before it is wrapped in XML-like
163/// framing and shown to a model.
164pub fn xml_escape_body(s: &str) -> String {
165    let mut out = String::with_capacity(s.len());
166    for c in s.chars() {
167        match c {
168            '&' => out.push_str("&amp;"),
169            '<' => out.push_str("&lt;"),
170            '>' => out.push_str("&gt;"),
171            '"' => out.push_str("&quot;"),
172            '\'' => out.push_str("&apos;"),
173            other => out.push(other),
174        }
175    }
176    out
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    #[test]
184    fn valid_agent_id_accepts_canonical_shapes() {
185        assert!(valid_agent_id("agent0"));
186        assert!(valid_agent_id("agent42"));
187        assert!(valid_agent_id("agentinfinity"));
188    }
189
190    #[test]
191    fn valid_agent_id_accepts_test_namespace() {
192        assert!(valid_agent_id("test-agent0"));
193        assert!(valid_agent_id("test-agent97"));
194        assert!(valid_agent_id("test-agentinfinity"));
195    }
196
197    #[test]
198    fn valid_agent_id_rejects_junk() {
199        assert!(!valid_agent_id(""));
200        assert!(!valid_agent_id("agent"));
201        assert!(!valid_agent_id("AGENT0"));
202        assert!(!valid_agent_id("agent-7"));
203        assert!(!valid_agent_id("bob"));
204        assert!(!valid_agent_id("agent 0"));
205        // `test-` is only a namespace prefix; bare `test-` or `test-bob` are not valid.
206        assert!(!valid_agent_id("test-"));
207        assert!(!valid_agent_id("test-bob"));
208        assert!(!valid_agent_id("test-agent"));
209    }
210
211    #[test]
212    fn build_filename_shape() {
213        let n = build_filename("agent7");
214        assert!(n.ends_with("-from-agent7.json"));
215        let parts: Vec<&str> = n.splitn(5, '-').collect();
216        assert_eq!(parts.len(), 5);
217        assert_eq!(parts[0].len(), 20);
218        assert_eq!(parts[1].len(), 10);
219        assert_eq!(parts[2].len(), 8);
220    }
221
222    #[test]
223    fn build_filename_is_unique_within_process() {
224        let mut seen = std::collections::HashSet::new();
225        for _ in 0..1000 {
226            assert!(seen.insert(build_filename("agent0")));
227        }
228    }
229
230    #[test]
231    fn write_envelope_round_trips() {
232        let tmp = tempfile::tempdir().unwrap();
233        let env = Envelope {
234            id: None,
235            from: "agent2".to_string(),
236            to: None,
237            kind: None,
238            in_reply_to: None,
239            thread: None,
240            swarm: None,
241            idempotency_key: None,
242            requires_ack: None,
243            text: "hello".to_string(),
244            ts: "2026-04-15T21:00:00Z".to_string(),
245        };
246        let path = write_envelope(tmp.path(), &env).unwrap();
247        let raw = fs::read_to_string(&path).unwrap();
248        let round: Envelope = serde_json::from_str(&raw).unwrap();
249        assert_eq!(round.from, env.from);
250        assert_eq!(round.text, env.text);
251    }
252
253    #[test]
254    fn legacy_envelope_deserializes_with_v1_defaults() {
255        let raw = r#"{"from":"agent0","text":"hello","ts":"2026-04-15T21:00:00Z"}"#;
256        let env: Envelope = serde_json::from_str(raw).unwrap();
257        assert_eq!(env.from, "agent0");
258        assert_eq!(env.text, "hello");
259        assert_eq!(env.ts, "2026-04-15T21:00:00Z");
260        assert_eq!(env.id, None);
261        assert_eq!(env.to, None);
262        assert_eq!(env.kind, None);
263        assert_eq!(env.in_reply_to, None);
264        assert_eq!(env.thread, None);
265        assert_eq!(env.swarm, None);
266        assert_eq!(env.idempotency_key, None);
267        assert_eq!(env.requires_ack, None);
268        assert_eq!(env.kind_or_text(), "text");
269    }
270
271    #[test]
272    fn v1_envelope_round_trips_optional_fields() {
273        let env = Envelope {
274            id: Some("msg-1".to_string()),
275            from: "agent0".to_string(),
276            to: Some("agent4".to_string()),
277            kind: Some("brief".to_string()),
278            in_reply_to: Some("msg-0".to_string()),
279            thread: Some("thread-1".to_string()),
280            swarm: Some("swarm-1".to_string()),
281            idempotency_key: Some("idem-1".to_string()),
282            requires_ack: Some(true),
283            text: "hello".to_string(),
284            ts: "2026-04-15T21:00:00Z".to_string(),
285        };
286        let raw = serde_json::to_string(&env).unwrap();
287        let round: Envelope = serde_json::from_str(&raw).unwrap();
288        assert_eq!(round.id.as_deref(), Some("msg-1"));
289        assert_eq!(round.to.as_deref(), Some("agent4"));
290        assert_eq!(round.kind.as_deref(), Some("brief"));
291        assert_eq!(round.in_reply_to.as_deref(), Some("msg-0"));
292        assert_eq!(round.thread.as_deref(), Some("thread-1"));
293        assert_eq!(round.swarm.as_deref(), Some("swarm-1"));
294        assert_eq!(round.idempotency_key.as_deref(), Some("idem-1"));
295        assert_eq!(round.requires_ack, Some(true));
296    }
297
298    #[test]
299    fn write_envelope_rejects_name_collision() {
300        // Pre-populate the inbox with a file whose hard-link target we
301        // force to collide. We cannot cheaply force the AtomicU64+rand to
302        // repeat, so assert the happy path here and exercise the collision
303        // branch via a unit on the retry loop.
304        let tmp = tempfile::tempdir().unwrap();
305        let env = Envelope {
306            id: None,
307            from: "agent2".to_string(),
308            to: None,
309            kind: None,
310            in_reply_to: None,
311            thread: None,
312            swarm: None,
313            idempotency_key: None,
314            requires_ack: None,
315            text: "hi".to_string(),
316            ts: "2026-04-15T21:00:00Z".to_string(),
317        };
318        let a = write_envelope(tmp.path(), &env).unwrap();
319        let b = write_envelope(tmp.path(), &env).unwrap();
320        assert_ne!(a, b);
321    }
322}