1use std::fs;
23use std::io;
24use std::path::{Path, PathBuf};
25use std::sync::atomic::{AtomicU64, Ordering};
26
27use serde::{Deserialize, Serialize};
28
/// Literal fragments of the `<channel>` wrapper markup.
///
/// `body_contains_wrapper_tokens` reports a body as unsafe when it contains any of
/// these substrings.
const CHANNEL_WRAPPER_TOKENS: &[&str] = &["</channel>", "<channel source="];
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Envelope {
33 pub id: Option<String>,
34 pub from: String,
35 pub to: Option<String>,
36 pub kind: Option<String>,
37 pub in_reply_to: Option<String>,
38 pub thread: Option<String>,
39 pub swarm: Option<String>,
40 pub idempotency_key: Option<String>,
41 pub requires_ack: Option<bool>,
42 pub text: String,
43 pub ts: String,
44}
45
46impl Envelope {
47 pub fn new(from: impl Into<String>, text: impl Into<String>, ts: impl Into<String>) -> Self {
48 Self {
49 id: None,
50 from: from.into(),
51 to: None,
52 kind: None,
53 in_reply_to: None,
54 thread: None,
55 swarm: None,
56 idempotency_key: None,
57 requires_ack: None,
58 text: text.into(),
59 ts: ts.into(),
60 }
61 }
62
63 pub fn kind_or_text(&self) -> &str {
64 self.kind
65 .as_deref()
66 .unwrap_or(crate::restart_handshake::KIND_TEXT)
67 }
68}
69
/// Reports whether `id` is a well-formed agent id.
///
/// Accepted shape: an optional single `test-` prefix, the literal `agent`, then one
/// or more ASCII lowercase letters or ASCII digits. Anything else yields `false`.
pub fn valid_agent_id(id: &str) -> bool {
    let without_namespace = id.strip_prefix("test-").unwrap_or(id);
    let Some(tail) = without_namespace.strip_prefix("agent") else {
        return false;
    };
    // Non-ASCII characters encode as bytes >= 0x80, which fail both predicates,
    // so scanning bytes gives the same answer as scanning chars.
    !tail.is_empty()
        && tail
            .bytes()
            .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit())
}
87
88pub fn build_filename(from: &str) -> String {
92 static SEQ: AtomicU64 = AtomicU64::new(0);
93 let seq = SEQ.fetch_add(1, Ordering::Relaxed);
94 let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
95 let pid = std::process::id();
96 let rand_hex: u32 = rand::random();
97 format!("{nanos:020}-{pid:010}-{rand_hex:08x}-{seq:010}-from-{from}.json")
98}
99
100pub fn write_envelope(inbox: &Path, env: &Envelope) -> io::Result<PathBuf> {
104 fs::create_dir_all(inbox)?;
105 let bytes =
106 serde_json::to_vec(env).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
107 let mut last_err: Option<io::Error> = None;
108 for attempt in 0..8 {
109 let name = build_filename(&env.from);
110 let final_path = inbox.join(&name);
111 let tmp_path = inbox.join(format!(".{name}.{attempt}.tmp"));
112 fs::write(&tmp_path, &bytes)?;
113 match fs::hard_link(&tmp_path, &final_path) {
114 Ok(()) => {
115 let _ = fs::remove_file(&tmp_path);
116 return Ok(final_path);
117 }
118 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
119 let _ = fs::remove_file(&tmp_path);
120 last_err = Some(e);
121 continue;
122 }
123 Err(e) => {
124 let _ = fs::remove_file(&tmp_path);
125 return Err(e);
126 }
127 }
128 }
129 Err(last_err.unwrap_or_else(|| {
130 io::Error::new(
131 io::ErrorKind::AlreadyExists,
132 "exhausted 8 attempts to create unique envelope filename",
133 )
134 }))
135}
136
137pub fn body_contains_wrapper_tokens(body: &str) -> bool {
139 CHANNEL_WRAPPER_TOKENS
140 .iter()
141 .any(|needle| body.contains(needle))
142}
143
144pub fn validate_bus_envelope(env: &Envelope) -> Result<(), String> {
147 if !valid_agent_id(&env.from) {
148 return Err(format!(
149 "invalid from {:?} (expected agent<lowercase-alnum>)",
150 env.from
151 ));
152 }
153 if chrono::DateTime::parse_from_rfc3339(&env.ts).is_err() {
154 return Err(format!("invalid ts {:?} (expected RFC3339)", env.ts));
155 }
156 if body_contains_wrapper_tokens(&env.text) {
157 return Err("body contains <channel> wrapper token; framing-break rejected".to_string());
158 }
159 Ok(())
160}
161
/// Escapes the five XML-significant characters in `s`.
///
/// | input | output   |
/// |-------|----------|
/// | `&`   | `&amp;`  |
/// | `<`   | `&lt;`   |
/// | `>`   | `&gt;`   |
/// | `"`   | `&quot;` |
/// | `'`   | `&apos;` |
///
/// Every other character is copied through unchanged. Input that already
/// contains entity references is escaped again (`&amp;` becomes `&amp;amp;`).
pub fn xml_escape_body(s: &str) -> String {
    // Lower bound only: each escaped character expands the output.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp shared by the fixture envelopes built in these tests.
    const TS: &str = "2026-04-15T21:00:00Z";

    #[test]
    fn valid_agent_id_accepts_canonical_shapes() {
        for id in ["agent0", "agent42", "agentinfinity"] {
            assert!(valid_agent_id(id), "{id:?} should be accepted");
        }
    }

    #[test]
    fn valid_agent_id_accepts_test_namespace() {
        for id in ["test-agent0", "test-agent97", "test-agentinfinity"] {
            assert!(valid_agent_id(id), "{id:?} should be accepted");
        }
    }

    #[test]
    fn valid_agent_id_rejects_junk() {
        let rejected = [
            "",
            "agent",
            "AGENT0",
            "agent-7",
            "bob",
            "agent 0",
            "test-",
            "test-bob",
            "test-agent",
        ];
        for id in rejected {
            assert!(!valid_agent_id(id), "{id:?} should be rejected");
        }
    }

    #[test]
    fn build_filename_shape() {
        let name = build_filename("agent7");
        assert!(name.ends_with("-from-agent7.json"));

        // nanos, pid, random hex, sequence, then the "from-…" tail.
        let fields: Vec<&str> = name.splitn(5, '-').collect();
        assert_eq!(fields.len(), 5);
        let widths: Vec<usize> = fields.iter().take(3).map(|field| field.len()).collect();
        assert_eq!(widths, [20, 10, 8]);
    }

    #[test]
    fn build_filename_is_unique_within_process() {
        let names: std::collections::HashSet<String> =
            (0..1000).map(|_| build_filename("agent0")).collect();
        assert_eq!(names.len(), 1000);
    }

    #[test]
    fn write_envelope_round_trips() {
        let dir = tempfile::tempdir().unwrap();
        let sent = Envelope::new("agent2", "hello", TS);

        let path = write_envelope(dir.path(), &sent).unwrap();
        let received: Envelope =
            serde_json::from_str(&fs::read_to_string(&path).unwrap()).unwrap();

        assert_eq!(received.from, sent.from);
        assert_eq!(received.text, sent.text);
    }

    #[test]
    fn legacy_envelope_deserializes_with_v1_defaults() {
        let raw = r#"{"from":"agent0","text":"hello","ts":"2026-04-15T21:00:00Z"}"#;
        let env: Envelope = serde_json::from_str(raw).unwrap();

        // Mandatory fields come through verbatim.
        assert_eq!(env.from, "agent0");
        assert_eq!(env.text, "hello");
        assert_eq!(env.ts, "2026-04-15T21:00:00Z");

        // Every optional field missing from the JSON defaults to `None`.
        assert!(env.id.is_none());
        assert!(env.to.is_none());
        assert!(env.kind.is_none());
        assert!(env.in_reply_to.is_none());
        assert!(env.thread.is_none());
        assert!(env.swarm.is_none());
        assert!(env.idempotency_key.is_none());
        assert!(env.requires_ack.is_none());
        assert_eq!(env.kind_or_text(), "text");
    }

    #[test]
    fn v1_envelope_round_trips_optional_fields() {
        let original = Envelope {
            id: Some("msg-1".to_string()),
            to: Some("agent4".to_string()),
            kind: Some("brief".to_string()),
            in_reply_to: Some("msg-0".to_string()),
            thread: Some("thread-1".to_string()),
            swarm: Some("swarm-1".to_string()),
            idempotency_key: Some("idem-1".to_string()),
            requires_ack: Some(true),
            ..Envelope::new("agent0", "hello", TS)
        };

        let json = serde_json::to_string(&original).unwrap();
        let decoded: Envelope = serde_json::from_str(&json).unwrap();

        assert_eq!(decoded.id.as_deref(), Some("msg-1"));
        assert_eq!(decoded.to.as_deref(), Some("agent4"));
        assert_eq!(decoded.kind.as_deref(), Some("brief"));
        assert_eq!(decoded.in_reply_to.as_deref(), Some("msg-0"));
        assert_eq!(decoded.thread.as_deref(), Some("thread-1"));
        assert_eq!(decoded.swarm.as_deref(), Some("swarm-1"));
        assert_eq!(decoded.idempotency_key.as_deref(), Some("idem-1"));
        assert_eq!(decoded.requires_ack, Some(true));
    }

    #[test]
    fn write_envelope_rejects_name_collision() {
        // Writing the same envelope twice must yield two distinct paths.
        let dir = tempfile::tempdir().unwrap();
        let env = Envelope::new("agent2", "hi", TS);

        let first = write_envelope(dir.path(), &env).unwrap();
        let second = write_envelope(dir.path(), &env).unwrap();

        assert_ne!(first, second);
    }
}