1#![allow(clippy::unusual_byte_groupings)]
2use crate::encoding::STANDARD;
6use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
7use flate2::read::DeflateDecoder;
8use flate2::write::DeflateEncoder;
9use flate2::Compression;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use sha2::{Digest, Sha256};
13use std::io::{Read, Write};
14
15use crate::canonicalize;
16use crate::expiration::{is_within_window, Window};
17
18#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
19pub struct PacketSignatureEnvelope {
20 pub algorithm: String,
21 pub signer: String,
22 pub signature: String,
23}
24
25#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
26pub struct PacketFragmentHeader {
27 pub fragment_id: String,
28 pub index: u32,
29 pub count: u32,
30 pub total_payload_bytes: u32,
31 pub payload_digest: String,
32}
33
34#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
35pub struct Packet {
36 pub packet_version: String,
37 pub packet_id: String,
38 pub source: String,
39 pub destination: String,
40 pub priority: String,
41 #[serde(skip_serializing_if = "Option::is_none", default)]
42 pub emergency: Option<bool>,
43 pub created_at: String,
44 #[serde(skip_serializing_if = "Option::is_none", default)]
45 pub expires_at: Option<String>,
46 #[serde(skip_serializing_if = "Option::is_none", default)]
47 pub ttl_hops: Option<u32>,
48 #[serde(skip_serializing_if = "Option::is_none", default)]
49 pub route_constraints: Option<Vec<String>>,
50 #[serde(skip_serializing_if = "Option::is_none", default)]
51 pub encoding: Option<String>,
52 #[serde(skip_serializing_if = "Option::is_none", default)]
53 pub compression: Option<String>,
54 pub payload: String,
55 #[serde(skip_serializing_if = "Option::is_none", default)]
56 pub session_ref: Option<String>,
57 #[serde(skip_serializing_if = "Option::is_none", default)]
58 pub fragment: Option<PacketFragmentHeader>,
59 pub signature: PacketSignatureEnvelope,
60}
61
62#[derive(Clone, Debug)]
63pub struct SignPacketArgs<'a> {
64 pub packet_id: String,
65 pub source: String,
66 pub destination: String,
67 pub priority: String,
68 pub payload: &'a [u8],
69 pub encoding: Option<String>,
70 pub compression: Option<String>,
71 pub emergency: bool,
72 pub expires_at: Option<String>,
73 pub ttl_hops: Option<u32>,
74 pub route_constraints: Option<Vec<String>>,
75 pub session_ref: Option<String>,
76 pub private_key: [u8; 32],
77 pub signer: String,
78 pub created_at: Option<String>,
79}
80
81pub fn packet_signing_bytes(p: &Packet) -> [u8; 32] {
82 let mut value = serde_json::to_value(p).unwrap_or(Value::Null);
83 if let Value::Object(map) = &mut value {
84 map.remove("signature");
85 }
86 let canonical = canonicalize(&value).unwrap_or_default();
87 Sha256::digest(canonical.as_bytes()).into()
88}
89
90pub fn sign_packet(args: SignPacketArgs<'_>) -> Result<Packet, String> {
91 if args.priority == "P0" && !args.emergency {
92 return Err("P0 priority is reserved for emergency packets".into());
93 }
94 let encoding = args.encoding.unwrap_or_else(|| "cbor".to_string());
95 let compression = args.compression.unwrap_or_else(|| "none".to_string());
96
97 let payload_bytes: Vec<u8> = if encoding == "cbor" {
99 serde_cbor_envelope(args.payload)
100 } else {
101 let canonical = canonicalize(&serde_json::json!({
102 "raw": STANDARD.encode(args.payload),
103 }))
104 .map_err(|e| format!("canonicalize: {}", e))?;
105 canonical.into_bytes()
106 };
107
108 let final_bytes = if compression == "deflate" {
109 let mut enc = DeflateEncoder::new(Vec::new(), Compression::default());
110 enc.write_all(&payload_bytes).map_err(|e| e.to_string())?;
111 enc.finish().map_err(|e| e.to_string())?
112 } else {
113 payload_bytes
114 };
115
116 let mut draft = Packet {
117 packet_version: "1".into(),
118 packet_id: args.packet_id,
119 source: args.source.clone(),
120 destination: args.destination,
121 priority: args.priority,
122 emergency: if args.emergency { Some(true) } else { None },
123 created_at: args.created_at.unwrap_or_else(now_iso8601),
124 expires_at: args.expires_at,
125 ttl_hops: args.ttl_hops,
126 route_constraints: args.route_constraints.filter(|r| !r.is_empty()),
127 encoding: Some(encoding),
128 compression: Some(compression),
129 payload: STANDARD.encode(&final_bytes),
130 session_ref: args.session_ref,
131 fragment: None,
132 signature: PacketSignatureEnvelope {
133 algorithm: "ed25519".into(),
134 signer: args.signer.clone(),
135 signature: String::new(),
136 },
137 };
138 let digest = packet_signing_bytes(&draft);
139 let signing = SigningKey::from_bytes(&args.private_key);
140 let sig: Signature = signing.sign(&digest);
141 draft.signature.signature = STANDARD.encode(sig.to_bytes());
142 Ok(draft)
143}
144
145#[derive(Debug)]
146pub struct VerifyPacketResult {
147 pub ok: bool,
148 pub reason: Option<String>,
149 pub payload: Option<Vec<u8>>,
150}
151
152pub fn verify_packet(packet: &Packet, public_key: &[u8; 32], now: &str) -> VerifyPacketResult {
153 let rejected = |r: &str| VerifyPacketResult {
154 ok: false,
155 reason: Some(r.to_string()),
156 payload: None,
157 };
158 if packet.packet_version != "1" {
159 return rejected(&format!(
160 "unsupported packet_version {}",
161 packet.packet_version
162 ));
163 }
164 if packet.signature.signer != packet.source {
165 return rejected("signature signer does not match source");
166 }
167 if packet.priority == "P0" && packet.emergency != Some(true) {
168 return rejected("P0 reserved for emergency packets");
169 }
170 if let Some(expires) = &packet.expires_at {
171 let window = Window {
172 valid_until: Some(expires.as_str()),
173 ..Window::default()
174 };
175 if !is_within_window(&window, now) {
176 return rejected("packet expired");
177 }
178 }
179 let digest = packet_signing_bytes(packet);
180 let sig_bytes = match STANDARD.decode(&packet.signature.signature) {
181 Ok(b) => b,
182 Err(e) => return rejected(&format!("signature base64 decode: {}", e)),
183 };
184 let sig = match Signature::from_slice(&sig_bytes) {
185 Ok(s) => s,
186 Err(e) => return rejected(&format!("signature parse: {}", e)),
187 };
188 let vk = match VerifyingKey::from_bytes(public_key) {
189 Ok(v) => v,
190 Err(e) => return rejected(&format!("verifying key: {}", e)),
191 };
192 if vk.verify(&digest, &sig).is_err() {
193 return rejected("signature verification failed");
194 }
195 let wire = match STANDARD.decode(&packet.payload) {
196 Ok(b) => b,
197 Err(e) => return rejected(&format!("payload base64: {}", e)),
198 };
199 let decompressed = if packet.compression.as_deref() == Some("deflate") {
200 let mut dec = DeflateDecoder::new(&wire[..]);
201 let mut out = Vec::new();
202 if dec.read_to_end(&mut out).is_err() {
203 return rejected("deflate inflate failed");
204 }
205 out
206 } else {
207 wire
208 };
209 let payload = match packet.encoding.as_deref() {
210 Some("json") | None => match serde_json::from_slice::<Value>(&decompressed) {
211 Ok(v) => match v.get("raw").and_then(|r| r.as_str()) {
212 Some(b64) => match STANDARD.decode(b64) {
213 Ok(b) => b,
214 Err(_) => return rejected("payload base64 inner"),
215 },
216 None => return rejected("json envelope missing raw"),
217 },
218 Err(e) => return rejected(&format!("json decode: {}", e)),
219 },
220 Some("cbor") => match decode_cbor_envelope(&decompressed) {
221 Ok(b) => b,
222 Err(e) => return rejected(&format!("cbor decode: {}", e)),
223 },
224 Some(other) => return rejected(&format!("unknown encoding {}", other)),
225 };
226 VerifyPacketResult {
227 ok: true,
228 reason: None,
229 payload: Some(payload),
230 }
231}
232
233#[derive(Clone, Debug, Default)]
234pub struct FragmentOptions {
235 pub mtu: Option<usize>,
236}
237
238pub fn fragment_packet(
239 source: &Packet,
240 private_key: &[u8; 32],
241 opts: FragmentOptions,
242) -> Vec<Packet> {
243 let mtu = opts.mtu.unwrap_or(256);
244 let original = STANDARD.decode(&source.payload).unwrap_or_default();
245 let total_bytes = original.len();
246 if total_bytes <= mtu {
247 return vec![source.clone()];
248 }
249 let count = total_bytes.div_ceil(mtu);
250 let digest_hex = sha256_hex(&original);
251 let payload_digest = format!("sha256:{}", digest_hex);
252 let fragment_id = format!("frag-{}", source.packet_id);
253 let mut out = Vec::with_capacity(count);
254 for i in 0..count {
255 let start = i * mtu;
256 let end = (start + mtu).min(total_bytes);
257 let slice = &original[start..end];
258 let mut draft = Packet {
259 packet_version: "1".into(),
260 packet_id: format!("{}-{}", source.packet_id, i),
261 source: source.source.clone(),
262 destination: source.destination.clone(),
263 priority: source.priority.clone(),
264 emergency: source.emergency,
265 created_at: source.created_at.clone(),
266 expires_at: source.expires_at.clone(),
267 ttl_hops: source.ttl_hops,
268 route_constraints: source.route_constraints.clone(),
269 encoding: source.encoding.clone(),
270 compression: source.compression.clone(),
271 payload: STANDARD.encode(slice),
272 session_ref: source.session_ref.clone(),
273 fragment: Some(PacketFragmentHeader {
274 fragment_id: fragment_id.clone(),
275 index: i as u32,
276 count: count as u32,
277 total_payload_bytes: total_bytes as u32,
278 payload_digest: payload_digest.clone(),
279 }),
280 signature: PacketSignatureEnvelope {
281 algorithm: "ed25519".into(),
282 signer: source.source.clone(),
283 signature: String::new(),
284 },
285 };
286 let digest = packet_signing_bytes(&draft);
287 let signing = SigningKey::from_bytes(private_key);
288 let sig: Signature = signing.sign(&digest);
289 draft.signature.signature = STANDARD.encode(sig.to_bytes());
290 out.push(draft);
291 }
292 out
293}
294
295#[derive(Debug)]
296pub struct ReassembleResult {
297 pub ok: bool,
298 pub reason: Option<String>,
299 pub packet_id: Option<String>,
300 pub payload: Option<Vec<u8>>,
301}
302
303pub fn reassemble_fragments(fragments: &[Packet]) -> ReassembleResult {
304 let rejected = |r: &str| ReassembleResult {
305 ok: false,
306 reason: Some(r.to_string()),
307 packet_id: None,
308 payload: None,
309 };
310 if fragments.is_empty() {
311 return rejected("no fragments");
312 }
313 let header = match fragments[0].fragment.as_ref() {
314 Some(h) => h.clone(),
315 None => return rejected("first fragment missing fragment header"),
316 };
317 if fragments.len() != header.count as usize {
318 return rejected(&format!(
319 "expected {} fragments, got {}",
320 header.count,
321 fragments.len()
322 ));
323 }
324 let mut ordered: Vec<Option<&Packet>> = vec![None; header.count as usize];
325 for f in fragments {
326 let h = match f.fragment.as_ref() {
327 Some(h) => h,
328 None => return rejected("fragment missing header"),
329 };
330 if h.fragment_id != header.fragment_id {
331 return rejected("mismatched fragment_id");
332 }
333 if h.count != header.count {
334 return rejected("mismatched fragment count");
335 }
336 if ordered[h.index as usize].is_some() {
337 return rejected(&format!("duplicate fragment index {}", h.index));
338 }
339 ordered[h.index as usize] = Some(f);
340 }
341 let mut out = Vec::with_capacity(header.total_payload_bytes as usize);
342 for slot in ordered {
343 match slot {
344 Some(p) => {
345 let bytes = match STANDARD.decode(&p.payload) {
346 Ok(b) => b,
347 Err(e) => return rejected(&format!("base64: {}", e)),
348 };
349 out.extend_from_slice(&bytes);
350 }
351 None => return rejected("missing fragment slot"),
352 }
353 }
354 if out.len() != header.total_payload_bytes as usize {
355 return rejected(&format!(
356 "assembled {} bytes, expected {}",
357 out.len(),
358 header.total_payload_bytes
359 ));
360 }
361 let computed = format!("sha256:{}", sha256_hex(&out));
362 if computed != header.payload_digest {
363 return rejected("reassembled payload digest mismatch");
364 }
365 ReassembleResult {
366 ok: true,
367 reason: None,
368 packet_id: Some(header.fragment_id),
369 payload: Some(out),
370 }
371}
372
373fn serde_cbor_envelope(raw: &[u8]) -> Vec<u8> {
374 let value = crate::cbor::Value::Map(vec![(
375 crate::cbor::Value::Text("raw".into()),
376 crate::cbor::Value::Bytes(raw.to_vec()),
377 )]);
378 crate::cbor::encode(&value).expect("cbor encode")
379}
380
381fn decode_cbor_envelope(bytes: &[u8]) -> Result<Vec<u8>, String> {
382 let value = crate::cbor::decode(bytes).map_err(|e| format!("cbor: {}", e))?;
383 match value {
384 crate::cbor::Value::Map(entries) => {
385 for (k, v) in entries {
386 if matches!(k, crate::cbor::Value::Text(ref s) if s == "raw") {
387 if let crate::cbor::Value::Bytes(b) = v {
388 return Ok(b);
389 }
390 }
391 }
392 Err("cbor envelope missing raw".into())
393 }
394 _ => Err("cbor envelope not a map".into()),
395 }
396}
397
398fn sha256_hex(b: &[u8]) -> String {
399 let digest = Sha256::digest(b);
400 digest.iter().map(|byte| format!("{:02x}", byte)).collect()
401}
402
403fn now_iso8601() -> String {
404 let secs = std::time::SystemTime::now()
405 .duration_since(std::time::UNIX_EPOCH)
406 .unwrap_or_default()
407 .as_secs() as i64;
408 let (y, m, d, h, mi, s) = secs_to_ymdhms(secs);
409 format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", y, m, d, h, mi, s)
410}
411
412fn secs_to_ymdhms(secs: i64) -> (i32, u32, u32, u32, u32, u32) {
413 let days = secs.div_euclid(86_400);
414 let time = secs.rem_euclid(86_400);
415 let hour = (time / 3600) as u32;
416 let minute = ((time % 3600) / 60) as u32;
417 let second = (time % 60) as u32;
418 let z = days + 719_468;
419 let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
420 let doe = (z - era * 146_097) as u64;
421 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
422 let y = yoe as i64 + era * 400;
423 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
424 let mp = (5 * doy + 2) / 153;
425 let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
426 let m = if mp < 10 {
427 (mp + 3) as u32
428 } else {
429 (mp - 9) as u32
430 };
431 let year = if m <= 2 { y + 1 } else { y };
432 (year as i32, m, d, hour, minute, second)
433}
434
435#[derive(Clone, Debug, Default)]
440pub struct LoraSimOptions {
441 pub packet_loss: Option<f64>,
443 pub bandwidth_bps: Option<f64>,
445 pub base_latency_ms: Option<f64>,
447}
448
449#[derive(Debug, Default)]
450pub struct LoraSimResult {
451 pub delivered: Vec<Packet>,
452 pub dropped: Vec<Packet>,
453 pub total_latency_ms: f64,
455}
456
457pub fn simulate_lora(
462 packets: &[Packet],
463 opts: LoraSimOptions,
464 rng_seed: Option<u64>,
465) -> LoraSimResult {
466 let loss = opts.packet_loss.unwrap_or(0.0);
467 let bw = opts.bandwidth_bps.unwrap_or(250.0);
468 let base = opts.base_latency_ms.unwrap_or(5000.0);
469 let mut state = rng_seed.unwrap_or(0xdeadbeef_dead_beefu64);
470 let mut next = move || {
471 state ^= state << 13;
473 state ^= state >> 7;
474 state ^= state << 17;
475
476 (state.wrapping_mul(0x2545_F491_4F6C_DD1Du64) >> 11) as f64 / (1u64 << 53) as f64
477 };
478 let mut delivered = Vec::with_capacity(packets.len());
479 let mut dropped: Vec<Packet> = Vec::new();
480 let mut total_latency_ms = 0.0_f64;
481 for p in packets {
482 let canonical = serde_json::to_string(p).unwrap_or_default();
483 let size_bytes = canonical.len() as f64;
484 let tx_ms = (size_bytes / bw) * 1000.0;
485 total_latency_ms += base + tx_ms;
486 if next() < loss {
487 dropped.push(p.clone());
488 } else {
489 delivered.push(p.clone());
490 }
491 }
492 LoraSimResult {
493 delivered,
494 dropped,
495 total_latency_ms,
496 }
497}
498
499#[cfg(test)]
500mod lora_tests {
501 use super::*;
502 use rand::rngs::OsRng;
503
504 fn fixture(packet_id: &str) -> Packet {
505 let signer = "tf:actor:agent:example.com/x";
506 let mut signer_seed = [0u8; 32];
507 rand::RngCore::fill_bytes(&mut OsRng, &mut signer_seed);
508 sign_packet(SignPacketArgs {
509 packet_id: packet_id.into(),
510 source: signer.into(),
511 destination: "tf:actor:service:example.com/d".into(),
512 priority: "P3".into(),
513 payload: b"hi",
514 encoding: None,
515 compression: None,
516 emergency: false,
517 expires_at: None,
518 ttl_hops: None,
519 route_constraints: None,
520 session_ref: None,
521 private_key: signer_seed,
522 signer: signer.into(),
523 created_at: Some("2026-04-24T12:00:00Z".into()),
524 })
525 .expect("sign")
526 }
527
528 #[test]
529 fn lossless_channel_delivers_everything() {
530 let packets = vec![fixture("a"), fixture("b"), fixture("c")];
531 let r = simulate_lora(
532 &packets,
533 LoraSimOptions {
534 packet_loss: Some(0.0),
535 bandwidth_bps: Some(250.0),
536 base_latency_ms: Some(5000.0),
537 },
538 Some(1),
539 );
540 assert_eq!(r.delivered.len(), 3);
541 assert_eq!(r.dropped.len(), 0);
542 assert!(r.total_latency_ms > 15_000.0);
543 }
544
545 #[test]
546 fn full_loss_drops_everything() {
547 let packets = vec![fixture("a"), fixture("b")];
548 let r = simulate_lora(
549 &packets,
550 LoraSimOptions {
551 packet_loss: Some(1.0),
552 ..Default::default()
553 },
554 Some(42),
555 );
556 assert_eq!(r.delivered.len(), 0);
557 assert_eq!(r.dropped.len(), 2);
558 }
559
560 #[test]
561 fn deterministic_with_seed() {
562 let packets: Vec<Packet> = (0..10).map(|i| fixture(&format!("pkt-{}", i))).collect();
563 let r1 = simulate_lora(
564 &packets,
565 LoraSimOptions {
566 packet_loss: Some(0.5),
567 ..Default::default()
568 },
569 Some(99),
570 );
571 let r2 = simulate_lora(
572 &packets,
573 LoraSimOptions {
574 packet_loss: Some(0.5),
575 ..Default::default()
576 },
577 Some(99),
578 );
579 assert_eq!(r1.delivered.len(), r2.delivered.len());
580 assert_eq!(r1.dropped.len(), r2.dropped.len());
581 }
582}