Skip to main content

sedsnet/
wire_format.rs

1//! Wire-format packing and unpacking of telemetry packets.
2//!
3//! This module defines the compact v2 wire format used to send and receive
4//! [`Packet`]s, along with:
5//! - [`pack_packet`] / [`unpack_packet`] for full packets.
6//! - [`peek_envelope`] for header-only inspection without touching the payload.
7//! - Size helpers like [`header_size_bytes`] and [`packet_wire_size`].
8//!
9//! The core public type here is [`TelemetryEnvelope`], a lightweight view of
10//! the header fields used by `peek_envelope`.
11
12use crate::MessageElement;
13use crate::{
14    DataEndpoint, TelemetryError, TelemetryResult, get_message_name, is_reliable_type,
15    message_meta,
16    packet::Packet,
17    {MAX_VALUE_DATA_ENDPOINT, MAX_VALUE_DATA_TYPE, config::DataType},
18};
19
20use crate::packet::{hash_bytes_u64, sender_address_u32};
21#[cfg(feature = "std")]
22use alloc::borrow::ToOwned;
23#[cfg(feature = "std")]
24use alloc::collections::BTreeMap;
25use alloc::{format, string::String, sync::Arc, vec, vec::Vec};
26use crc32fast::Hasher as Crc32Hasher;
27#[cfg(feature = "std")]
28use std::sync::{Mutex, OnceLock};
29
30/// Lightweight header-only view of a packed [`Packet`].
31///
32/// Produced by [`peek_envelope`] without allocating or copying the payload.
33#[derive(Clone, Debug, PartialEq, Eq)]
34pub struct TelemetryEnvelope {
35    /// Telemetry [`DataType`] discriminant.
36    pub ty: DataType,
37    /// All endpoints this packet is destined for (set bits in the bitmap).
38    pub endpoints: Arc<[DataEndpoint]>,
39    /// Sender identity resolved from discovery/config address state.
40    pub sender: Arc<str>,
41    /// Compact source address carried by the packet header.
42    pub source_address: u32,
43    /// Timestamp in milliseconds (as stored on the wire).
44    pub timestamp_ms: u64,
45    /// Inline wire-format payload shape, if present.
46    pub wire_shape: Option<MessageElement>,
47    /// Frozen destination sender hashes, if present.
48    pub target_senders: Arc<[u64]>,
49}
50
51/// Reliable header included for data types marked `reliable` in the schema.
52#[derive(Clone, Copy, Debug, PartialEq, Eq)]
53pub struct ReliableHeader {
54    pub flags: u8,
55    pub seq: u32,
56    pub ack: u32,
57}
58
59/// Reliable header flag: frame is ACK-only control (no payload).
60pub const RELIABLE_FLAG_ACK_ONLY: u8 = 0x01;
61
62/// Reliable header flag: frame is reliable but unordered (ACK/retransmit without ordering).
63pub const RELIABLE_FLAG_UNORDERED: u8 = 0x02;
64
65/// Reliable header flag: frame is unsequenced best-effort (no ordering/ack).
66pub const RELIABLE_FLAG_UNSEQUENCED: u8 = 0x80;
67
68/// Fixed size of the reliable header on the wire.
69pub const RELIABLE_HEADER_BYTES: usize = 1 + 4 + 4;
70/// Fixed size of the CRC32 trailer on the wire.
71pub const CRC32_BYTES: usize = 4;
72
73// packet Layout:
74//
75//   [FLAGS: u8]
76//       Bit 0: payload compressed flag (1 = compressed)
77//       Bit 5: endpoint bitmap present (0 = derive endpoints from data type metadata)
78//       Bit 6: compact reliable header present (if a reliable header exists)
79//
80//   [NEP: u8]
81//       Number of selected endpoints.
82//
83//   VARINT(ty: u32 as u64)           -- ULEB128
84//   VARINT(data_size: u64)           -- ULEB128   (LOGICAL payload size, uncompressed)
85//   VARINT(timestamp: u64)           -- ULEB128
86//   [VARINT(nonce: u16 as u64)]      -- ULEB128   (ONLY if bit3 set)
87//   VARINT(src_addr: u32 as u64)     -- ULEB128
88//
89//   [ENDPOINTS_BITMAP]               -- ONLY if bit5 set; 1 bit per possible endpoint
90//   [RELIABLE HEADER]                -- present if type is configured `reliable`
91//       [REL_FLAGS: u8]
92//       [SEQ: u32 LE]
93//       [ACK: u32 LE]
94//   PAYLOAD BYTES                    -- raw or compressed payload bytes
95//   [CRC32: u32 LE]                  -- checksum of all prior bytes
96
97// ===========================================================================
98// ULEB128 (varint) encoding helpers
99// ===========================================================================
100const FLAG_COMPRESSED_PAYLOAD: u8 = 0x01;
101const FLAG_WIRE_CONTRACT: u8 = 0x04;
102const FLAG_PACKET_NONCE: u8 = 0x08;
103#[cfg(feature = "cryptography")]
104const FLAG_E2E_ENCRYPTED_PAYLOAD: u8 = 0x10;
105const FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
106const FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
107const CONTRACT_FLAG_TARGETS: u8 = 0x01;
108const CONTRACT_FLAG_SHAPE: u8 = 0x02;
109const CONTRACT_FLAG_RELIABLE_HEADER: u8 = 0x04;
110const RELIABLE_WIRE_FLAG_SEQ_PRESENT: u8 = 0x04;
111const RELIABLE_WIRE_FLAG_ACK_PRESENT: u8 = 0x08;
112const RELIABLE_PUBLIC_FLAGS_MASK: u8 =
113    RELIABLE_FLAG_ACK_ONLY | RELIABLE_FLAG_UNORDERED | RELIABLE_FLAG_UNSEQUENCED;
114#[cfg(feature = "cryptography")]
115const E2E_NONCE_LEN: usize = 12;
116#[cfg(feature = "cryptography")]
117const E2E_TAG_CAP: usize = 32;
118
119#[derive(Clone, Debug, PartialEq, Eq)]
120struct WireContract {
121    shape: Option<MessageElement>,
122    target_senders: Arc<[u64]>,
123    has_reliable_header: bool,
124}
125
126/// Encode a `u64` as ULEB128 and append it to `out`.
127#[inline]
128fn write_uleb128<T>(mut v: u64, out: &mut Vec<T>)
129where
130    T: From<u8>,
131{
132    loop {
133        let mut byte = (v & 0x7F) as u8;
134        v >>= 7;
135        if v != 0 {
136            byte |= 0x80;
137        }
138        out.push(T::from(byte));
139        if v == 0 {
140            break;
141        }
142    }
143}
144
145/// Decode a ULEB128-encoded `u64` from the given reader.
146#[inline]
147fn read_uleb128(r: &mut ByteReader) -> Result<u64, TelemetryError> {
148    let mut result: u64 = 0;
149    let mut shift = 0u32;
150    // u64 fits in at most 10 ULEB128 bytes.
151    for _ in 0..10 {
152        let b = r.read_bytes(1)?[0];
153        result |= ((b & 0x7F) as u64) << shift;
154        if (b & 0x80) == 0 {
155            return Ok(result);
156        }
157        shift += 7;
158    }
159    Err(TelemetryError::Unpack("uleb128 too long"))
160}
161
162/// Compute the encoded length (in bytes) of a ULEB128-encoded `u64`.
163#[inline]
164fn uleb128_size(mut v: u64) -> usize {
165    let mut n = 1;
166    while v >= 0x80 {
167        v >>= 7;
168        n += 1;
169    }
170    n
171}
172
173/// Count the total number of bits set across all bytes of the bitmap.
174#[inline]
175fn bitmap_popcount(bm: &[u8]) -> usize {
176    bm.iter().map(|b| b.count_ones() as usize).sum()
177}
178
179// ===========================================================================
180// ByteReader: tiny cursor over a byte slice
181// ===========================================================================
182
183#[derive(Clone, Copy)]
184struct ByteReader<'a> {
185    buf: &'a [u8],
186    off: usize,
187}
188
189impl<'a> ByteReader<'a> {
190    /// Create a new reader over the given buffer starting at offset 0.
191    fn new(buf: &'a [u8]) -> Self {
192        Self { buf, off: 0 }
193    }
194
195    /// Remaining bytes that can still be read.
196    fn remaining(&self) -> usize {
197        self.buf.len().saturating_sub(self.off)
198    }
199
200    /// Read exactly `n` bytes, advancing the internal offset.
201    fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], TelemetryError> {
202        if self.remaining() < n {
203            return Err(TelemetryError::Unpack("short read"));
204        }
205        let s = &self.buf[self.off..self.off + n];
206        self.off += n;
207        Ok(s)
208    }
209}
210
211#[inline]
212fn write_u32_le(v: u32, out: &mut Vec<u8>) {
213    out.extend_from_slice(&v.to_le_bytes());
214}
215
216#[inline]
217fn read_u32_le(r: &mut ByteReader) -> Result<u32, TelemetryError> {
218    let b = r.read_bytes(4)?;
219    Ok(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
220}
221
222#[inline]
223fn encode_wire_shape(shape: MessageElement) -> Result<Vec<u8>, TelemetryError> {
224    let dt = crate::config::message_data_type_code(shape.data_type());
225    let class = crate::config::message_class_code(shape.message_type());
226    let mut out = Vec::with_capacity(6);
227    let mut packed = dt | (class << 4);
228    if matches!(shape, MessageElement::Static(_, _, _)) {
229        packed |= 1 << 6;
230    }
231    out.push(packed);
232    if let MessageElement::Static(count, _, _) = shape {
233        let count = u64::try_from(count).map_err(|_| TelemetryError::Pack("wire shape count"))?;
234        write_uleb128(count, &mut out);
235    }
236    Ok(out)
237}
238
239#[inline]
240fn decode_wire_shape(r: &mut ByteReader) -> Result<MessageElement, TelemetryError> {
241    let packed = r.read_bytes(1)?[0];
242    let dt = crate::config::message_data_type_from_code(packed & 0x0F)
243        .ok_or(TelemetryError::Unpack("wire shape type"))?;
244    let class = crate::config::message_class_from_code((packed >> 4) & 0x03)
245        .ok_or(TelemetryError::Unpack("wire shape class"))?;
246    if (packed & (1 << 6)) != 0 {
247        let count = usize::try_from(read_uleb128(r)?)
248            .map_err(|_| TelemetryError::Unpack("wire shape count"))?;
249        Ok(MessageElement::Static(count, dt, class))
250    } else {
251        Ok(MessageElement::Dynamic(dt, class))
252    }
253}
254
255#[inline]
256fn encode_wire_contract(
257    shape: Option<MessageElement>,
258    target_senders: &[u64],
259    has_reliable_header: bool,
260) -> Result<Vec<u8>, TelemetryError> {
261    // Keep the contract self-contained and compact. It exists only to preserve
262    // delivery/decode intent for packets that are already in flight while
263    // schema or topology updates are still converging.
264    let mut out = Vec::new();
265    let mut flags = 0u8;
266    if !target_senders.is_empty() {
267        flags |= CONTRACT_FLAG_TARGETS;
268    }
269    if shape.is_some() {
270        flags |= CONTRACT_FLAG_SHAPE;
271    }
272    if has_reliable_header {
273        flags |= CONTRACT_FLAG_RELIABLE_HEADER;
274    }
275    out.push(flags);
276    if let Some(shape) = shape {
277        out.extend_from_slice(&encode_wire_shape(shape)?);
278    }
279    if !target_senders.is_empty() {
280        write_uleb128(target_senders.len() as u64, &mut out);
281        for hash in target_senders {
282            out.extend_from_slice(&hash.to_le_bytes());
283        }
284    }
285    Ok(out)
286}
287
288#[inline]
289fn decode_wire_contract(
290    r: &mut ByteReader,
291    has_contract: bool,
292) -> Result<WireContract, TelemetryError> {
293    if !has_contract {
294        return Ok(WireContract {
295            shape: None,
296            target_senders: Arc::<[u64]>::from([]),
297            has_reliable_header: false,
298        });
299    }
300    let contract_len = usize::try_from(read_uleb128(r)?)
301        .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
302    let contract_bytes = r.read_bytes(contract_len)?;
303    let mut cr = ByteReader::new(contract_bytes);
304    // Parsing through a bounded sub-reader lets us reject trailing garbage in
305    // the contract cleanly instead of accidentally treating it as payload or
306    // reliable-header bytes.
307    let flags = cr.read_bytes(1)?[0];
308    let shape = if (flags & CONTRACT_FLAG_SHAPE) != 0 {
309        Some(decode_wire_shape(&mut cr)?)
310    } else {
311        None
312    };
313    let target_senders: Arc<[u64]> = if (flags & CONTRACT_FLAG_TARGETS) != 0 {
314        let count = usize::try_from(read_uleb128(&mut cr)?)
315            .map_err(|_| TelemetryError::Unpack("wire contract target count"))?;
316        let mut targets = Vec::with_capacity(count);
317        for _ in 0..count {
318            let bytes = cr.read_bytes(8)?;
319            targets.push(u64::from_le_bytes([
320                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
321            ]));
322        }
323        Arc::from(targets)
324    } else {
325        Arc::<[u64]>::from([])
326    };
327    if cr.remaining() != 0 {
328        return Err(TelemetryError::Unpack("wire contract trailing bytes"));
329    }
330    Ok(WireContract {
331        shape,
332        target_senders,
333        has_reliable_header: (flags & CONTRACT_FLAG_RELIABLE_HEADER) != 0,
334    })
335}
336
337#[inline]
338fn crc32_bytes(data: &[u8]) -> u32 {
339    let mut hasher = Crc32Hasher::new();
340    hasher.update(data);
341    hasher.finalize()
342}
343
344#[inline]
345fn append_crc32(out: &mut Vec<u8>) {
346    let crc = crc32_bytes(out);
347    out.extend_from_slice(&crc.to_le_bytes());
348}
349
350#[inline]
351fn split_crc32(buf: &[u8]) -> Result<(&[u8], u32), TelemetryError> {
352    if buf.len() < CRC32_BYTES {
353        return Err(TelemetryError::Unpack("short buffer"));
354    }
355    let data_len = buf.len() - CRC32_BYTES;
356    let crc = u32::from_le_bytes([
357        buf[data_len],
358        buf[data_len + 1],
359        buf[data_len + 2],
360        buf[data_len + 3],
361    ]);
362    Ok((&buf[..data_len], crc))
363}
364
365#[inline]
366fn verify_crc32(buf: &[u8]) -> Result<&[u8], TelemetryError> {
367    let (data, expected) = split_crc32(buf)?;
368    let actual = crc32_bytes(data);
369    if actual != expected {
370        return Err(TelemetryError::Unpack("crc32 mismatch"));
371    }
372    Ok(data)
373}
374
375#[cfg(feature = "cryptography")]
376#[inline]
377fn e2e_nonce_for_packet(pkt: &Packet) -> [u8; E2E_NONCE_LEN] {
378    let mut nonce = [0u8; E2E_NONCE_LEN];
379    nonce[..4].copy_from_slice(&pkt.data_type().as_u32().to_le_bytes());
380    nonce[4..10].copy_from_slice(&(pkt.timestamp() & 0x0000_FFFF_FFFF_FFFF).to_le_bytes()[..6]);
381    nonce[10..].copy_from_slice(&pkt.nonce().to_le_bytes());
382    nonce
383}
384
385#[cfg(feature = "cryptography")]
386fn write_encrypted_payload(
387    pkt: &Packet,
388    key_id: u32,
389    plaintext_wire_payload: &[u8],
390    out: &mut Vec<u8>,
391) -> TelemetryResult<()> {
392    let aad_end = out.len();
393    let nonce = e2e_nonce_for_packet(pkt);
394    let mut ciphertext = vec![0u8; plaintext_wire_payload.len()];
395    let mut tag = [0u8; E2E_TAG_CAP];
396    let (ciphertext_len, tag_len) = crate::crypto::seal_with_registered_crypto(
397        key_id,
398        &nonce,
399        &out[..aad_end],
400        plaintext_wire_payload,
401        &mut ciphertext,
402        &mut tag,
403    )?;
404    if ciphertext_len > ciphertext.len() || tag_len > tag.len() {
405        return Err(TelemetryError::SizeMismatchError);
406    }
407    write_uleb128(u64::from(key_id), out);
408    write_uleb128(plaintext_wire_payload.len() as u64, out);
409    write_uleb128(nonce.len() as u64, out);
410    out.extend_from_slice(&nonce);
411    write_uleb128(tag_len as u64, out);
412    out.extend_from_slice(&tag[..tag_len]);
413    out.extend_from_slice(&ciphertext[..ciphertext_len]);
414    Ok(())
415}
416
417#[cfg(feature = "cryptography")]
418fn read_encrypted_payload(
419    r: &mut ByteReader,
420    aad: &[u8],
421    plaintext_len: usize,
422) -> TelemetryResult<Vec<u8>> {
423    let key_id = u32::try_from(read_uleb128(r)?)
424        .map_err(|_| TelemetryError::Unpack("e2e key id too large"))?;
425    let wire_payload_len = usize::try_from(read_uleb128(r)?)
426        .map_err(|_| TelemetryError::Unpack("e2e payload length"))?;
427    if wire_payload_len > plaintext_len {
428        return Err(TelemetryError::Unpack("bad e2e payload length"));
429    }
430    let nonce_len = usize::try_from(read_uleb128(r)?)
431        .map_err(|_| TelemetryError::Unpack("e2e nonce length"))?;
432    if nonce_len == 0 || nonce_len > 64 {
433        return Err(TelemetryError::Unpack("bad e2e nonce length"));
434    }
435    let nonce = r.read_bytes(nonce_len)?;
436    let tag_len =
437        usize::try_from(read_uleb128(r)?).map_err(|_| TelemetryError::Unpack("e2e tag length"))?;
438    if tag_len == 0 || tag_len > E2E_TAG_CAP {
439        return Err(TelemetryError::Unpack("bad e2e tag length"));
440    }
441    let tag = r.read_bytes(tag_len)?;
442    let ciphertext_len = r.remaining();
443    let ciphertext = r.read_bytes(ciphertext_len)?;
444    let mut plaintext = vec![0u8; wire_payload_len];
445    let opened_len = crate::crypto::open_with_registered_crypto(
446        key_id,
447        nonce,
448        aad,
449        ciphertext,
450        tag,
451        &mut plaintext,
452    )?;
453    if opened_len != wire_payload_len {
454        return Err(TelemetryError::SizeMismatchError);
455    }
456    Ok(plaintext)
457}
458
459#[inline]
460fn write_reliable_header(h: ReliableHeader, out: &mut Vec<u8>) {
461    out.push(h.flags);
462    write_u32_le(h.seq, out);
463    write_u32_le(h.ack, out);
464}
465
466#[inline]
467fn reliable_compact_size(h: ReliableHeader) -> usize {
468    let seq_present = (h.flags & RELIABLE_FLAG_ACK_ONLY) == 0 || h.seq != 0;
469    let ack_present = h.ack != 0 || (h.flags & RELIABLE_FLAG_ACK_ONLY) != 0;
470    1 + if seq_present {
471        uleb128_size(h.seq as u64)
472    } else {
473        0
474    } + if ack_present {
475        uleb128_size(h.ack as u64)
476    } else {
477        0
478    }
479}
480
481#[inline]
482fn should_compact_reliable_header(h: ReliableHeader) -> bool {
483    reliable_compact_size(h) < RELIABLE_HEADER_BYTES
484}
485
486#[inline]
487fn reliable_wire_size(h: ReliableHeader, compact: bool) -> usize {
488    if compact {
489        reliable_compact_size(h)
490    } else {
491        RELIABLE_HEADER_BYTES
492    }
493}
494
495pub(crate) fn write_reliable_header_encoded(h: ReliableHeader, compact: bool, out: &mut Vec<u8>) {
496    if !compact {
497        write_reliable_header(h, out);
498        return;
499    }
500
501    let seq_present = (h.flags & RELIABLE_FLAG_ACK_ONLY) == 0 || h.seq != 0;
502    let ack_present = h.ack != 0 || (h.flags & RELIABLE_FLAG_ACK_ONLY) != 0;
503    let mut wire_flags = h.flags & RELIABLE_PUBLIC_FLAGS_MASK;
504    if seq_present {
505        wire_flags |= RELIABLE_WIRE_FLAG_SEQ_PRESENT;
506    }
507    if ack_present {
508        wire_flags |= RELIABLE_WIRE_FLAG_ACK_PRESENT;
509    }
510    out.push(wire_flags);
511    if seq_present {
512        write_uleb128(h.seq as u64, out);
513    }
514    if ack_present {
515        write_uleb128(h.ack as u64, out);
516    }
517}
518
519#[inline]
520fn read_reliable_header(r: &mut ByteReader) -> Result<ReliableHeader, TelemetryError> {
521    let flags = r.read_bytes(1)?[0];
522    let seq = read_u32_le(r)?;
523    let ack = read_u32_le(r)?;
524    Ok(ReliableHeader { flags, seq, ack })
525}
526
527fn read_reliable_header_compact(r: &mut ByteReader) -> Result<ReliableHeader, TelemetryError> {
528    let wire_flags = r.read_bytes(1)?[0];
529    let seq = if (wire_flags & RELIABLE_WIRE_FLAG_SEQ_PRESENT) != 0 {
530        u32::try_from(read_uleb128(r)?)
531            .map_err(|_| TelemetryError::Unpack("reliable seq too large"))?
532    } else {
533        0
534    };
535    let ack = if (wire_flags & RELIABLE_WIRE_FLAG_ACK_PRESENT) != 0 {
536        u32::try_from(read_uleb128(r)?)
537            .map_err(|_| TelemetryError::Unpack("reliable ack too large"))?
538    } else {
539        0
540    };
541    Ok(ReliableHeader {
542        flags: wire_flags & RELIABLE_PUBLIC_FLAGS_MASK,
543        seq,
544        ack,
545    })
546}
547
548#[inline]
549fn read_reliable_header_encoded(
550    r: &mut ByteReader,
551    compact: bool,
552) -> Result<ReliableHeader, TelemetryError> {
553    if compact {
554        read_reliable_header_compact(r)
555    } else {
556        read_reliable_header(r)
557    }
558}
559
560#[cfg(feature = "std")]
561static ADDRESS_BOOK: OnceLock<Mutex<BTreeMap<u32, Arc<str>>>> = OnceLock::new();
562
563#[cfg(feature = "std")]
564fn address_book() -> &'static Mutex<BTreeMap<u32, Arc<str>>> {
565    ADDRESS_BOOK.get_or_init(|| Mutex::new(BTreeMap::new()))
566}
567
568#[inline]
569pub(crate) fn source_address_for_sender(sender: &str) -> u32 {
570    let addr = sender_address_u32(sender);
571    remember_source_address(addr, sender);
572    addr
573}
574
575#[cfg(feature = "std")]
576pub(crate) fn remember_source_address(addr: u32, sender: &str) {
577    address_book()
578        .lock()
579        .expect("wire address book poisoned")
580        .entry(addr)
581        .or_insert_with(|| Arc::<str>::from(sender));
582}
583
584#[cfg(not(feature = "std"))]
585pub(crate) fn remember_source_address(_addr: u32, _sender: &str) {}
586
587#[cfg(feature = "std")]
588fn sender_name_for_address(addr: u32) -> String {
589    address_book()
590        .lock()
591        .expect("wire address book poisoned")
592        .get(&addr)
593        .map(|sender| sender.as_ref().to_owned())
594        .unwrap_or_else(|| format!("@addr:{addr}"))
595}
596
597#[cfg(not(feature = "std"))]
598fn sender_name_for_address(addr: u32) -> String {
599    format!("@addr:{addr}")
600}
601
602// ===========================================================================
603// Endpoint bitmap constants and helpers
604// ===========================================================================
605
606/// Number of bits needed to cover all possible `DataEndpoint` discriminants.
607const EP_BITMAP_BITS: usize = (MAX_VALUE_DATA_ENDPOINT as usize) + 1;
608
609/// Number of bytes required to store [`EP_BITMAP_BITS`] bits.
610const EP_BITMAP_BYTES: usize = EP_BITMAP_BITS.div_ceil(8);
611
612/// Build a compact endpoint bitmap from the provided list of endpoints.
613///
614/// Each endpoint `ep` sets the bit at position `ep.as_u32()` in the bitmap.
615/// Bits are packed LSB-first within each byte.
616#[inline]
617fn build_endpoint_bitmap(eps: &[DataEndpoint]) -> [u8; EP_BITMAP_BYTES] {
618    let mut bm = [0u8; EP_BITMAP_BYTES];
619    for &ep in eps {
620        let idx = ep.as_u32() as usize;
621        debug_assert!(idx < EP_BITMAP_BITS, "endpoint discriminant out of range");
622        if idx < EP_BITMAP_BITS {
623            let byte = idx / 8;
624            let bit = idx % 8;
625            bm[byte] |= 1u8 << bit;
626        }
627    }
628    bm
629}
630
631/// Expand a bitmap of endpoints into a dense array and its logical length.
632///
633/// Returns `(array, len)` where:
634/// - `array[0..len]` are the active endpoints in ascending discriminant order.
635/// - `array[len..]` is filled with a dummy `DataEndpoint` and should be ignored.
636fn expand_endpoint_bitmap(
637    bm: &[u8],
638) -> Result<([DataEndpoint; EP_BITMAP_BITS], usize), TelemetryError> {
639    if bm.len() != EP_BITMAP_BYTES {
640        return Err(TelemetryError::Unpack("bad endpoint bitmap size"));
641    }
642
643    // Pick *any* valid endpoint as filler/dummy for the array.
644    let dummy = DataEndpoint::TelemetryError;
645
646    // Entire array is initialized to a valid value ⇒ fully safe.
647    let mut arr = [dummy; EP_BITMAP_BITS];
648
649    let mut len = 0usize;
650    for idx in 0..EP_BITMAP_BITS {
651        let byte = idx / 8;
652        let bit = idx % 8;
653        if (bm[byte] >> bit) & 1 != 0 {
654            let v = idx as u32;
655            let ep = DataEndpoint::try_from_u32(v)
656                .ok_or(TelemetryError::Unpack("bad endpoint bit set"))?;
657            arr[len] = ep;
658            len += 1;
659        }
660    }
661
662    Ok((arr, len))
663}
664
665#[inline]
666fn endpoint_bitmap_and_count(eps: &[DataEndpoint]) -> ([u8; EP_BITMAP_BYTES], usize) {
667    let bm = build_endpoint_bitmap(eps);
668    let count = bitmap_popcount(&bm);
669    (bm, count)
670}
671
672#[inline]
673fn endpoints_match_schema(ty: DataType, eps: &[DataEndpoint]) -> bool {
674    let (packet_bm, packet_count) = endpoint_bitmap_and_count(eps);
675    let (schema_bm, schema_count) = endpoint_bitmap_and_count(message_meta(ty).endpoints);
676    packet_count == schema_count && packet_bm == schema_bm
677}
678
679#[inline]
680fn schema_endpoints_from_type(ty: DataType, nep: usize) -> TelemetryResult<Arc<[DataEndpoint]>> {
681    let (bm, count) = endpoint_bitmap_and_count(message_meta(ty).endpoints);
682    let (ep_buf, ep_len) = expand_endpoint_bitmap(&bm)?;
683    if count != nep || ep_len != nep {
684        return Err(TelemetryError::Unpack("endpoint count mismatch"));
685    }
686    Ok(Arc::from(&ep_buf[..ep_len]))
687}
688
689#[inline]
690fn endpoints_from_wire_or_schema(
691    r: &mut ByteReader<'_>,
692    bitmap_present: bool,
693    ty: Option<DataType>,
694    nep: usize,
695) -> TelemetryResult<Arc<[DataEndpoint]>> {
696    if bitmap_present {
697        let bm = r.read_bytes(EP_BITMAP_BYTES)?;
698        let (ep_buf, ep_len) = expand_endpoint_bitmap(bm)?;
699        if ep_len != nep {
700            return Err(TelemetryError::Unpack("endpoint count mismatch"));
701        }
702        Ok(Arc::from(&ep_buf[..ep_len]))
703    } else {
704        let ty = ty.ok_or(TelemetryError::InvalidType)?;
705        schema_endpoints_from_type(ty, nep)
706    }
707}
708
709#[inline]
710fn data_type_id_from_wire(ty_v: u64) -> TelemetryResult<u32> {
711    let ty_u32 = u32::try_from(ty_v).map_err(|_| TelemetryError::Unpack("type too large"))?;
712    if ty_u32 > MAX_VALUE_DATA_TYPE {
713        return Err(TelemetryError::InvalidType);
714    }
715    Ok(ty_u32)
716}
717
718// ===========================================================================
719// Packing
720// ===========================================================================
721
722/// Pack a [`Packet`] into the compact v2 wire format.
723///
724/// The returned `Arc<[u8]>` owns the packed bytes and can be shared cheaply.
725/// # Arguments
726/// - `pkt`: Telemetry packet to pack.
727///
728/// # Returns
729/// - `Arc<[u8]>`: Packed packet in compact v2 wire format.
730pub fn pack_packet(pkt: &Packet) -> Arc<[u8]> {
731    if is_reliable_type(pkt.data_type()) {
732        // Default to an unsequenced reliable header to keep the wire format consistent.
733        // Use `pack_packet_with_reliable` for ordered/retransmitted delivery.
734        let hdr = ReliableHeader {
735            flags: RELIABLE_FLAG_UNSEQUENCED,
736            seq: 0,
737            ack: 0,
738        };
739        return pack_packet_with_reliable(pkt, hdr);
740    }
741    pack_packet_inner(pkt, None)
742}
743
744/// Pack a [`Packet`] with an explicit reliable header.
745///
746/// This should be used for data types configured as `reliable` in the schema.
747pub fn pack_packet_with_reliable(pkt: &Packet, header: ReliableHeader) -> Arc<[u8]> {
748    pack_packet_inner(pkt, Some(header))
749}
750
751/// Pack a reliable ACK-only control frame for the given data type.
752///
753/// The resulting bytes are not a valid `Packet` and should be handled
754/// by the router's reliable layer.
755pub fn pack_reliable_ack(sender: &str, ty: DataType, timestamp_ms: u64, ack: u32) -> Arc<[u8]> {
756    let bm = [0u8; EP_BITMAP_BYTES];
757    let source_address = source_address_for_sender(sender);
758
759    // No payload for ACK-only control frames.
760    let mut out = Vec::with_capacity(32 + EP_BITMAP_BYTES + CRC32_BYTES);
761
762    let flags: u8 = FLAG_ENDPOINT_BITMAP_PRESENT;
763    out.push(flags);
764    out.push(0u8); // NEP = 0
765
766    write_uleb128(ty.as_u32() as u64, &mut out);
767    write_uleb128(0u64, &mut out); // payload size
768    write_uleb128(timestamp_ms, &mut out);
769    write_uleb128(source_address as u64, &mut out);
770
771    out.extend_from_slice(&bm);
772    let reliable = ReliableHeader {
773        flags: RELIABLE_FLAG_ACK_ONLY,
774        seq: 0,
775        ack,
776    };
777    if should_compact_reliable_header(reliable) {
778        out[0] |= FLAG_COMPACT_RELIABLE_HEADER;
779        write_reliable_header_encoded(reliable, true, &mut out);
780    } else {
781        write_reliable_header_encoded(reliable, false, &mut out);
782    }
783    append_crc32(&mut out);
784
785    Arc::<[u8]>::from(out)
786}
787
788fn pack_packet_inner(pkt: &Packet, reliable: Option<ReliableHeader>) -> Arc<[u8]> {
789    pack_packet_inner_with_contract(
790        pkt,
791        reliable,
792        pkt.wire_shape(),
793        pkt.wire_target_senders(),
794        None,
795    )
796    .expect("plaintext packet packing failed")
797}
798
799/// Pack `pkt` while explicitly controlling the wire-contract metadata.
800///
801/// Router and relay forwarding paths use this helper when they need to attach
802/// or preserve a migration-safe contract instead of simply serializing the
803/// packet against the current runtime schema/topology view.
804///
805/// # Parameters
806/// - `pkt`: Logical packet to pack.
807/// - `reliable`: Optional hop-level reliable header to append after the
808///   contract.
809/// - `shape`: Optional inline payload shape to encode into the contract so
810///   downstream unpackers can validate/decode the payload against the
811///   original shape.
812/// - `target_senders`: Frozen destination-holder sender hashes that keep
813///   in-flight routing bound to the intended holders.
814///
815/// # Returns
816/// - `Ok(Arc<[u8]>)` containing the packed frame bytes.
817/// - `Err(TelemetryError)` if contract encoding fails.
818pub(crate) fn pack_packet_with_wire_contract(
819    pkt: &Packet,
820    reliable: Option<ReliableHeader>,
821    shape: Option<MessageElement>,
822    target_senders: &[u64],
823) -> TelemetryResult<Arc<[u8]>> {
824    pack_packet_inner_with_contract(pkt, reliable, shape, target_senders, None)
825}
826
827#[derive(Clone, Copy, Debug)]
828#[cfg_attr(not(feature = "cryptography"), allow(dead_code))]
829pub(crate) struct E2eSealConfig {
830    pub key_id: u32,
831}
832
833#[cfg(feature = "cryptography")]
834pub(crate) fn pack_packet_with_wire_contract_e2e(
835    pkt: &Packet,
836    reliable: Option<ReliableHeader>,
837    shape: Option<MessageElement>,
838    target_senders: &[u64],
839    e2e: E2eSealConfig,
840) -> TelemetryResult<Arc<[u8]>> {
841    pack_packet_inner_with_contract(pkt, reliable, shape, target_senders, Some(e2e))
842}
843
844fn pack_packet_inner_with_contract(
845    pkt: &Packet,
846    reliable: Option<ReliableHeader>,
847    shape: Option<MessageElement>,
848    target_senders: &[u64],
849    #[cfg_attr(not(feature = "cryptography"), allow(unused_variables))] e2e: Option<E2eSealConfig>,
850) -> TelemetryResult<Arc<[u8]>> {
851    let carries_wire_contract = shape.is_some() || !target_senders.is_empty();
852    let endpoints_are_schema_default = endpoints_match_schema(pkt.data_type(), pkt.endpoints());
853    let endpoint_bitmap_present = carries_wire_contract || !endpoints_are_schema_default;
854    let (bm, nep_unique) = endpoint_bitmap_and_count(pkt.endpoints());
855    let endpoint_bytes = if endpoint_bitmap_present {
856        EP_BITMAP_BYTES
857    } else {
858        0
859    };
860
861    let source_address = source_address_for_sender(pkt.sender());
862
863    // Decide whether to compress the payload.
864    let payload = pkt.payload();
865    let (payload_compressed, payload_wire) = payload_compression::compress_if_beneficial(payload);
866
867    // Heuristic capacity: fixed prelude + bitmap + reliable + payload_wire.
868    let reliable_is_compact = reliable.is_some_and(should_compact_reliable_header);
869    let reliable_len = if let Some(hdr) = reliable {
870        reliable_wire_size(hdr, reliable_is_compact)
871    } else {
872        0
873    };
874    let contract = encode_wire_contract(shape, target_senders, reliable.is_some())
875        .unwrap_or_else(|_| vec![0u8]);
876    let contract_len = if carries_wire_contract {
877        uleb128_size(contract.len() as u64) + contract.len()
878    } else {
879        0
880    };
881    let mut out = Vec::with_capacity(
882        16 + endpoint_bytes + contract_len + reliable_len + payload_wire.len() + CRC32_BYTES,
883    );
884
885    // FLAGS byte
886    let mut flags: u8 = 0;
887    if payload_compressed {
888        flags |= FLAG_COMPRESSED_PAYLOAD;
889    }
890    if carries_wire_contract {
891        flags |= FLAG_WIRE_CONTRACT;
892    }
893    if pkt.nonce() != 0 {
894        flags |= FLAG_PACKET_NONCE;
895    }
896    if endpoint_bitmap_present {
897        flags |= FLAG_ENDPOINT_BITMAP_PRESENT;
898    }
899    if reliable_is_compact {
900        flags |= FLAG_COMPACT_RELIABLE_HEADER;
901    }
902    #[cfg(feature = "cryptography")]
903    if e2e.is_some() {
904        flags |= FLAG_E2E_ENCRYPTED_PAYLOAD;
905    }
906    out.push(flags);
907
908    assert!(
909        nep_unique <= u8::MAX as usize,
910        "too many endpoints selected to fit in NEP u8"
911    );
912    out.push(nep_unique as u8);
913
914    // NOTE: data_size is the *logical* (uncompressed) payload size.
915    write_uleb128(pkt.data_type().as_u32() as u64, &mut out);
916    write_uleb128(pkt.data_size() as u64, &mut out);
917    write_uleb128(pkt.timestamp(), &mut out);
918    if pkt.nonce() != 0 {
919        write_uleb128(pkt.nonce() as u64, &mut out);
920    }
921
922    write_uleb128(source_address as u64, &mut out);
923
924    if endpoint_bitmap_present {
925        out.extend_from_slice(&bm);
926    }
927    if (flags & FLAG_WIRE_CONTRACT) != 0 {
928        // The contract must appear before the reliable header because it may
929        // carry the "reliable header present" bit for packets whose current
930        // runtime schema lookup no longer reflects the original wire semantics.
931        write_uleb128(contract.len() as u64, &mut out);
932        out.extend_from_slice(&contract);
933    }
934    if let Some(hdr) = reliable {
935        write_reliable_header_encoded(hdr, reliable_is_compact, &mut out);
936    }
937    #[cfg(feature = "cryptography")]
938    if let Some(e2e) = e2e {
939        write_encrypted_payload(pkt, e2e.key_id, &payload_wire, &mut out)?;
940    } else {
941        out.extend_from_slice(&payload_wire);
942    }
943    #[cfg(not(feature = "cryptography"))]
944    {
945        out.extend_from_slice(&payload_wire);
946    }
947    append_crc32(&mut out);
948
949    Ok(Arc::<[u8]>::from(out))
950}
951
952// ===========================================================================
953// Unpacking (full packet)
954// ===========================================================================
955
956/// Unpack a full [`Packet`] from a packed v2 wire frame.
957///
958/// This validates the frame CRC, resolves the endpoint set, decodes the
959/// optional migration-safe wire contract, parses the optional reliable header,
960/// and reconstructs the logical `Packet`. When the frame carries an inline wire
961/// shape, the returned packet preserves that shape so payload validation and
962/// formatting remain stable even if the local runtime schema has changed since
963/// the packet was originally packed.
964///
965/// # Parameters
966/// - `buf`: Complete packed frame bytes, including the CRC32 trailer.
967///
968/// # Returns
969/// - `Ok(Packet)` when the frame is well-formed and contains a payload-bearing
970///   telemetry packet.
971///
972/// # Errors
973/// - [`TelemetryError::Unpack`] if the frame is malformed, truncated, or
974///   fails CRC validation.
975/// - [`TelemetryError::InvalidType`] if the type ID is not valid and the frame
976///   does not carry enough inline shape information to keep decoding it.
977pub fn unpack_packet(buf: &[u8]) -> Result<Packet, TelemetryError> {
978    let data = verify_crc32(buf)?;
979    if data.is_empty() {
980        return Err(TelemetryError::Unpack("short prelude"));
981    }
982    let mut r = ByteReader::new(data);
983
984    let flags = r.read_bytes(1)?[0];
985    let payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
986    let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
987    let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
988    #[cfg(feature = "cryptography")]
989    let payload_is_encrypted = (flags & FLAG_E2E_ENCRYPTED_PAYLOAD) != 0;
990    #[cfg(not(feature = "cryptography"))]
991    if (flags & 0x10) != 0 {
992        return Err(TelemetryError::Unpack("e2e crypto unsupported"));
993    }
994
995    let nep = r.read_bytes(1)?[0] as usize;
996
997    let ty_v = read_uleb128(&mut r)?;
998    let dsz = read_uleb128(&mut r)? as usize; // logical (uncompressed) payload size
999    let ts_v = read_uleb128(&mut r)?;
1000    let nonce = if (flags & FLAG_PACKET_NONCE) != 0 {
1001        u16::try_from(read_uleb128(&mut r)?)
1002            .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
1003    } else {
1004        0
1005    };
1006    let source_address = u32::try_from(read_uleb128(&mut r)?)
1007        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1008    let sender_str = sender_name_for_address(source_address);
1009    let ty_u32 = data_type_id_from_wire(ty_v)?;
1010    let known_ty = DataType::try_from_u32(ty_u32);
1011    let endpoint_bytes = if endpoint_bitmap_present {
1012        EP_BITMAP_BYTES
1013    } else {
1014        0
1015    };
1016
1017    // For uncompressed payload: bitmap + [contract] + [reliable] + payload(dsz)
1018    // For compressed payload: bitmap + [contract] + [reliable] + at least 1 byte.
1019    if !payload_is_compressed {
1020        if r.remaining() < endpoint_bytes + dsz {
1021            return Err(TelemetryError::Unpack("short buffer"));
1022        }
1023    } else if r.remaining() < endpoint_bytes + 1 {
1024        return Err(TelemetryError::Unpack("short buffer"));
1025    }
1026
1027    let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
1028
1029    let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1030    let ty = known_ty
1031        .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1032        .ok_or(TelemetryError::InvalidType)?;
1033
1034    // ----- Reliable header (optional) -----
1035    let mut reliable_hdr: Option<ReliableHeader> = None;
1036    if is_reliable_type(ty) || contract.has_reliable_header {
1037        let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
1038        if (hdr.flags & RELIABLE_FLAG_ACK_ONLY) != 0 {
1039            return Err(TelemetryError::Unpack("reliable control frame"));
1040        }
1041        reliable_hdr = Some(hdr);
1042    }
1043
1044    // ----- Payload handling -----
1045    let payload_arc: Arc<[u8]> = {
1046        #[cfg(feature = "cryptography")]
1047        let payload_wire_owned;
1048        #[cfg(feature = "cryptography")]
1049        let payload_wire: &[u8] = if payload_is_encrypted {
1050            let aad_end = r.off;
1051            payload_wire_owned = read_encrypted_payload(&mut r, &data[..aad_end], dsz)?;
1052            &payload_wire_owned
1053        } else if !payload_is_compressed {
1054            r.read_bytes(dsz)?
1055        } else {
1056            let comp_len = r.remaining();
1057            r.read_bytes(comp_len)?
1058        };
1059
1060        #[cfg(not(feature = "cryptography"))]
1061        let payload_wire: &[u8] = if !payload_is_compressed {
1062            r.read_bytes(dsz)?
1063        } else {
1064            let comp_len = r.remaining();
1065            r.read_bytes(comp_len)?
1066        };
1067
1068        if payload_is_compressed {
1069            let decompressed = payload_compression::decompress(payload_wire, dsz)?;
1070            Arc::<[u8]>::from(decompressed)
1071        } else {
1072            if payload_wire.len() != dsz {
1073                return Err(TelemetryError::Unpack("payload length mismatch"));
1074            }
1075            Arc::<[u8]>::from(payload_wire)
1076        }
1077    };
1078
1079    // `Packet` preserves logical payload data plus wire-contract metadata, but
1080    // not hop-level reliable transport state. The router/relay reliable layer
1081    // consumes that header before handing the logical packet onward.
1082    let _ = reliable_hdr;
1083    Packet::new_with_wire_contract(
1084        ty,
1085        &eps,
1086        &sender_str,
1087        ts_v,
1088        nonce,
1089        payload_arc,
1090        contract.shape,
1091        contract.target_senders,
1092    )
1093}
1094
1095// ===========================================================================
1096// Peek / envelope-only decode
1097// ===========================================================================
1098
1099/// Decode only the routing-relevant envelope of a packed packet.
1100///
1101/// This reads enough of the frame to expose type, endpoints, sender, timestamp,
1102/// and any migration-safe contract metadata without touching payload bytes.
1103/// That makes it the fast path for routing and other header-only inspection.
1104///
1105/// # Parameters
1106/// - `buf`: Complete packed frame bytes, including the CRC32 trailer.
1107///
1108/// # Returns
1109/// - `Ok(TelemetryEnvelope)` containing the logical packet envelope plus any
1110///   inline wire-shape and target-sender metadata.
1111///
1112/// # Errors
1113/// - [`TelemetryError::Unpack`] if the frame is malformed or fails CRC.
1114/// - [`TelemetryError::InvalidType`] if the type ID cannot be resolved.
1115pub fn peek_envelope(buf: &[u8]) -> TelemetryResult<TelemetryEnvelope> {
1116    let data = verify_crc32(buf)?;
1117    if data.is_empty() {
1118        return Err(TelemetryError::Unpack("short prelude"));
1119    }
1120    let mut r = ByteReader::new(data);
1121
1122    let flags = r.read_bytes(1)?[0];
1123    let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1124    // We don't care about payload compression here.
1125    let _payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
1126
1127    let nep = r.read_bytes(1)?[0] as usize;
1128
1129    let ty_v = read_uleb128(&mut r)?;
1130    let _dsz = read_uleb128(&mut r)? as usize;
1131    let ts_v = read_uleb128(&mut r)?;
1132    if (flags & FLAG_PACKET_NONCE) != 0 {
1133        let _ = read_uleb128(&mut r)?;
1134    }
1135    let source_address = u32::try_from(read_uleb128(&mut r)?)
1136        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1137    let sender_str = sender_name_for_address(source_address);
1138    let ty_u32 = data_type_id_from_wire(ty_v)?;
1139    let known_ty = DataType::try_from_u32(ty_u32);
1140    let endpoint_bytes = if endpoint_bitmap_present {
1141        EP_BITMAP_BYTES
1142    } else {
1143        0
1144    };
1145
1146    if r.remaining() < endpoint_bytes {
1147        return Err(TelemetryError::Unpack("short buffer"));
1148    }
1149
1150    let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
1151
1152    let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1153    let ty = known_ty
1154        .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1155        .ok_or(TelemetryError::InvalidType)?;
1156
1157    Ok(TelemetryEnvelope {
1158        ty,
1159        endpoints: eps,
1160        sender: Arc::<str>::from(sender_str),
1161        source_address,
1162        timestamp_ms: ts_v,
1163        wire_shape: contract.shape,
1164        target_senders: contract.target_senders,
1165    })
1166}
1167
1168/// Decode the header/envelope and optional reliable header without touching the payload.
1169pub struct TelemetryFrameInfo {
1170    pub envelope: TelemetryEnvelope,
1171    pub reliable: Option<ReliableHeader>,
1172}
1173
1174impl TelemetryFrameInfo {
1175    #[inline]
1176    /// Returns `true` when the frame carries only a reliable-delivery acknowledgment.
1177    pub fn ack_only(&self) -> bool {
1178        self.reliable
1179            .map(|h| (h.flags & RELIABLE_FLAG_ACK_ONLY) != 0)
1180            .unwrap_or(false)
1181    }
1182}
1183
1184fn peek_frame_info_inner(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
1185    if buf.is_empty() {
1186        return Err(TelemetryError::Unpack("short prelude"));
1187    }
1188    let mut r = ByteReader::new(buf);
1189
1190    let flags = r.read_bytes(1)?[0];
1191    let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1192    let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
1193    let _payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
1194
1195    let nep = r.read_bytes(1)?[0] as usize;
1196
1197    let ty_v = read_uleb128(&mut r)?;
1198    let _dsz = read_uleb128(&mut r)? as usize;
1199    let ts_v = read_uleb128(&mut r)?;
1200    if (flags & FLAG_PACKET_NONCE) != 0 {
1201        let _ = read_uleb128(&mut r)?;
1202    }
1203    let source_address = u32::try_from(read_uleb128(&mut r)?)
1204        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1205    let sender_str = sender_name_for_address(source_address);
1206    let ty_u32 = data_type_id_from_wire(ty_v)?;
1207    let known_ty = DataType::try_from_u32(ty_u32);
1208    let endpoint_bytes = if endpoint_bitmap_present {
1209        EP_BITMAP_BYTES
1210    } else {
1211        0
1212    };
1213
1214    if r.remaining() < endpoint_bytes {
1215        return Err(TelemetryError::Unpack("short buffer"));
1216    }
1217
1218    let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
1219
1220    let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1221    let ty = known_ty
1222        .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1223        .ok_or(TelemetryError::InvalidType)?;
1224
1225    let reliable = if is_reliable_type(ty) || contract.has_reliable_header {
1226        if r.remaining() < 1 {
1227            return Err(TelemetryError::Unpack("short buffer"));
1228        }
1229        Some(read_reliable_header_encoded(
1230            &mut r,
1231            compact_reliable_header,
1232        )?)
1233    } else {
1234        None
1235    };
1236
1237    Ok(TelemetryFrameInfo {
1238        envelope: TelemetryEnvelope {
1239            ty,
1240            endpoints: eps,
1241            sender: Arc::<str>::from(sender_str),
1242            source_address,
1243            timestamp_ms: ts_v,
1244            wire_shape: contract.shape,
1245            target_senders: contract.target_senders,
1246        },
1247        reliable,
1248    })
1249}
1250
1251/// Peek the envelope plus reliable header (if present) without decoding payload bytes.
1252///
1253/// This is the primary router/relay fast path for reliable-layer decisions on
1254/// packed traffic. It still validates the frame CRC before exposing any
1255/// information.
1256pub fn peek_frame_info(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
1257    let data = verify_crc32(buf)?;
1258    peek_frame_info_inner(data)
1259}
1260
1261/// Peek the envelope plus reliable header (if present) without validating CRC32.
1262///
1263/// This is intended only for internal call sites that have already validated
1264/// frame integrity or intentionally want best-effort inspection of partially
1265/// trusted bytes.
1266pub fn peek_frame_info_unchecked(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
1267    let (data, _crc) = split_crc32(buf)?;
1268    peek_frame_info_inner(data)
1269}
1270
1271/// Locate the reliable-header byte offset within a packed frame.
1272///
1273/// The offset is computed after walking the source address and optional wire
1274/// contract. This matters because the contract can explicitly state that a
1275/// reliable header is present even if the current runtime schema no longer
1276/// marks the type as reliable.
1277///
1278/// # Parameters
1279/// - `buf`: Packed frame bytes, including the CRC32 trailer.
1280///
1281/// # Returns
1282/// - `Ok(Some(offset))` when a reliable header is present.
1283/// - `Ok(None)` when the frame carries no reliable header.
1284/// - `Err(TelemetryError)` when the frame is malformed.
1285pub fn reliable_header_offset(buf: &[u8]) -> TelemetryResult<Option<usize>> {
1286    Ok(reliable_header_span(buf)?.map(|(off, _, _)| off))
1287}
1288
1289pub(crate) fn reliable_header_span(
1290    buf: &[u8],
1291) -> TelemetryResult<Option<(usize, usize, ReliableHeader)>> {
1292    if buf.len() < CRC32_BYTES + 1 {
1293        return Err(TelemetryError::Unpack("short prelude"));
1294    }
1295    let data_len = buf.len().saturating_sub(CRC32_BYTES);
1296    let mut r = ByteReader::new(&buf[..data_len]);
1297
1298    let flags = r.read_bytes(1)?[0];
1299    let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1300    let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
1301
1302    let _nep = r.read_bytes(1)?[0] as usize;
1303
1304    let ty_v = read_uleb128(&mut r)?;
1305    let _dsz = read_uleb128(&mut r)? as usize;
1306    let _ts_v = read_uleb128(&mut r)?;
1307    if (flags & FLAG_PACKET_NONCE) != 0 {
1308        let _ = read_uleb128(&mut r)?;
1309    }
1310    let _source_address = u32::try_from(read_uleb128(&mut r)?)
1311        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1312    let endpoint_bytes = if endpoint_bitmap_present {
1313        EP_BITMAP_BYTES
1314    } else {
1315        0
1316    };
1317
1318    if r.remaining() < endpoint_bytes {
1319        return Err(TelemetryError::Unpack("short buffer"));
1320    }
1321
1322    if endpoint_bitmap_present {
1323        r.read_bytes(EP_BITMAP_BYTES)?;
1324    }
1325    let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1326    let ty_u32 = data_type_id_from_wire(ty_v)?;
1327    let ty = DataType::try_from_u32(ty_u32)
1328        .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1329        .ok_or(TelemetryError::InvalidType)?;
1330    if !is_reliable_type(ty) && !contract.has_reliable_header {
1331        return Ok(None);
1332    }
1333
1334    let off = r.off;
1335    let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
1336    Ok(Some((off, r.off - off, hdr)))
1337}
1338
1339/// Rewrite the reliable header in-place and refresh the frame CRC32.
1340///
1341/// This avoids reserializing the entire packet when reliable transport code
1342/// only needs to change sequence/ACK state in an already-built frame.
1343///
1344/// # Parameters
1345/// - `buf`: Mutable packed frame bytes.
1346/// - `flags`: Replacement reliable-header flags byte.
1347/// - `seq`: Replacement sequence number.
1348/// - `ack`: Replacement cumulative ACK value.
1349///
1350/// # Returns
1351/// - `Ok(true)` if the frame carried a reliable header and it was rewritten.
1352/// - `Ok(false)` if no reliable header is present.
1353/// - `Err(TelemetryError)` if the frame is malformed.
1354pub fn rewrite_reliable_header(
1355    buf: &mut [u8],
1356    flags: u8,
1357    seq: u32,
1358    ack: u32,
1359) -> TelemetryResult<bool> {
1360    let Some((off, old_len, _)) = reliable_header_span(buf)? else {
1361        return Ok(false);
1362    };
1363    let hdr = ReliableHeader { flags, seq, ack };
1364    let compact = should_compact_reliable_header(hdr);
1365    if reliable_wire_size(hdr, compact) != old_len {
1366        return Err(TelemetryError::Unpack(
1367            "reliable header rewrite changes wire size",
1368        ));
1369    }
1370    let data_len = buf.len().saturating_sub(CRC32_BYTES);
1371    if data_len.saturating_sub(off) < old_len {
1372        return Err(TelemetryError::Unpack("short buffer"));
1373    }
1374    if compact {
1375        buf[0] |= FLAG_COMPACT_RELIABLE_HEADER;
1376    } else {
1377        buf[0] &= !FLAG_COMPACT_RELIABLE_HEADER;
1378    }
1379    let mut encoded = Vec::with_capacity(old_len);
1380    write_reliable_header_encoded(hdr, compact, &mut encoded);
1381    buf[off..off + old_len].copy_from_slice(&encoded);
1382    if buf.len() < CRC32_BYTES {
1383        return Err(TelemetryError::Unpack("short buffer"));
1384    }
1385    let crc = crc32_bytes(&buf[..data_len]);
1386    buf[data_len..data_len + CRC32_BYTES].copy_from_slice(&crc.to_le_bytes());
1387    Ok(true)
1388}
1389
1390pub(crate) fn rewrite_reliable_header_owned(
1391    buf: &[u8],
1392    flags: u8,
1393    seq: u32,
1394    ack: u32,
1395) -> TelemetryResult<Option<Arc<[u8]>>> {
1396    let Some((off, old_len, _)) = reliable_header_span(buf)? else {
1397        return Ok(None);
1398    };
1399    let data_len = buf.len().saturating_sub(CRC32_BYTES);
1400    if data_len < off + old_len {
1401        return Err(TelemetryError::Unpack("short buffer"));
1402    }
1403    let hdr = ReliableHeader { flags, seq, ack };
1404    let compact = should_compact_reliable_header(hdr);
1405    let mut encoded = Vec::with_capacity(reliable_wire_size(hdr, compact));
1406    write_reliable_header_encoded(hdr, compact, &mut encoded);
1407
1408    let mut out = Vec::with_capacity(data_len - old_len + encoded.len() + CRC32_BYTES);
1409    out.extend_from_slice(&buf[..off]);
1410    if compact {
1411        out[0] |= FLAG_COMPACT_RELIABLE_HEADER;
1412    } else {
1413        out[0] &= !FLAG_COMPACT_RELIABLE_HEADER;
1414    }
1415    out.extend_from_slice(&encoded);
1416    out.extend_from_slice(&buf[off + old_len..data_len]);
1417    let crc = crc32_bytes(&out);
1418    out.extend_from_slice(&crc.to_le_bytes());
1419    Ok(Some(Arc::from(out)))
1420}
1421
1422// ===========================================================================
1423// Size helpers
1424// ===========================================================================
1425
1426/// Compute the encoded metadata-prefix size for `pkt`.
1427///
1428/// This includes the fixed prelude and top-level varints, but excludes the
1429/// optional endpoint bitmap, optional wire contract, optional reliable header,
1430/// payload bytes, and CRC32 trailer.
1431///
1432/// # Parameters
1433/// - `pkt`: Packet to size.
1434///
1435/// # Returns
1436/// - Byte count of the packed metadata prefix.
1437pub fn header_size_bytes(pkt: &Packet) -> usize {
1438    let prelude = 2; // FLAGS (u8) + NEP (u8)
1439
1440    let source_address = sender_address_u32(pkt.sender());
1441
1442    prelude
1443        + uleb128_size(pkt.data_type().as_u32() as u64)
1444        + uleb128_size(pkt.data_size() as u64)
1445        + uleb128_size(pkt.timestamp())
1446        + if pkt.nonce() != 0 {
1447            uleb128_size(pkt.nonce() as u64)
1448        } else {
1449            0
1450        }
1451        + uleb128_size(source_address as u64)
1452}
1453
1454/// Compute the full packed wire size for `pkt`.
1455///
1456/// This applies the same sender/payload compression heuristics as normal
1457/// packing, then sums the metadata prefix, optional endpoint bitmap, optional
1458/// reliable header, payload bytes, and CRC32 trailer.
1459///
1460/// # Parameters
1461/// - `pkt`: Packet to size.
1462///
1463/// # Returns
1464/// - Total encoded frame size in bytes.
1465pub fn packet_wire_size(pkt: &Packet) -> usize {
1466    let header = header_size_bytes(pkt);
1467
1468    let payload = pkt.payload();
1469    let (_payload_compressed, payload_wire) = payload_compression::compress_if_beneficial(payload);
1470
1471    let reliable_len = if is_reliable_type(pkt.data_type()) {
1472        let hdr = ReliableHeader {
1473            flags: 0,
1474            seq: 0,
1475            ack: 0,
1476        };
1477        reliable_wire_size(hdr, should_compact_reliable_header(hdr))
1478    } else {
1479        0
1480    };
1481    let endpoint_len = if endpoints_match_schema(pkt.data_type(), pkt.endpoints()) {
1482        0
1483    } else {
1484        EP_BITMAP_BYTES
1485    };
1486
1487    header + endpoint_len + reliable_len + payload_wire.len() + CRC32_BYTES
1488}
1489
1490#[inline]
1491/// Computes the same packet ID as [`Packet::packet_id`] directly from a packed wire frame.
1492pub fn packet_id_from_wire(buf: &[u8]) -> Result<u64, TelemetryError> {
1493    let data = verify_crc32(buf)?;
1494    if data.len() < 2 {
1495        return Err(TelemetryError::Unpack("short prelude"));
1496    }
1497
1498    let mut r = ByteReader::new(data);
1499
1500    let flags = r.read_bytes(1)?[0];
1501    let payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
1502    let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1503    let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
1504    #[cfg(feature = "cryptography")]
1505    let payload_is_encrypted = (flags & FLAG_E2E_ENCRYPTED_PAYLOAD) != 0;
1506    #[cfg(not(feature = "cryptography"))]
1507    if (flags & 0x10) != 0 {
1508        return Err(TelemetryError::Unpack("e2e crypto unsupported"));
1509    }
1510
1511    let _nep = r.read_bytes(1)?[0] as usize;
1512
1513    let ty_v = read_uleb128(&mut r)?;
1514    let dsz = read_uleb128(&mut r)? as usize; // logical payload size (uncompressed)
1515    let ts_v = read_uleb128(&mut r)?;
1516    let nonce = if (flags & FLAG_PACKET_NONCE) != 0 {
1517        u16::try_from(read_uleb128(&mut r)?)
1518            .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
1519    } else {
1520        0
1521    };
1522    let source_address = u32::try_from(read_uleb128(&mut r)?)
1523        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1524    let ty_u32 = data_type_id_from_wire(ty_v)?;
1525    let known_ty = DataType::try_from_u32(ty_u32);
1526    let endpoint_bytes = if endpoint_bitmap_present {
1527        EP_BITMAP_BYTES
1528    } else {
1529        0
1530    };
1531
1532    if r.remaining() < endpoint_bytes {
1533        return Err(TelemetryError::Unpack("short buffer"));
1534    }
1535
1536    let endpoints = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, _nep)?;
1537
1538    let _contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1539    let ty = known_ty
1540        .or_else(|| _contract.shape.map(|_| DataType(ty_u32)))
1541        .ok_or(TelemetryError::InvalidType)?;
1542
1543    // ---- reliable header (optional) ----
1544    if is_reliable_type(ty) || _contract.has_reliable_header {
1545        let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
1546        if (hdr.flags & RELIABLE_FLAG_ACK_ONLY) != 0 {
1547            return Err(TelemetryError::Unpack("reliable control frame"));
1548        }
1549    }
1550
1551    // ---- payload bytes (must hash *logical* payload bytes) ----
1552    #[cfg(feature = "cryptography")]
1553    let payload_wire_owned;
1554    #[cfg(feature = "cryptography")]
1555    let payload_wire: &[u8] = if payload_is_encrypted {
1556        let aad_end = r.off;
1557        payload_wire_owned =
1558            read_encrypted_payload(&mut r, data.get(..aad_end).unwrap_or(&[]), dsz)?;
1559        &payload_wire_owned
1560    } else if !payload_is_compressed {
1561        if r.remaining() < dsz {
1562            return Err(TelemetryError::Unpack("short buffer"));
1563        }
1564        r.read_bytes(dsz)?
1565    } else {
1566        let comp_len = r.remaining();
1567        if comp_len < 1 {
1568            return Err(TelemetryError::Unpack("short buffer"));
1569        }
1570        r.read_bytes(comp_len)?
1571    };
1572    #[cfg(not(feature = "cryptography"))]
1573    let payload_wire: &[u8] = if !payload_is_compressed {
1574        if r.remaining() < dsz {
1575            return Err(TelemetryError::Unpack("short buffer"));
1576        }
1577        r.read_bytes(dsz)?
1578    } else {
1579        let comp_len = r.remaining();
1580        if comp_len < 1 {
1581            return Err(TelemetryError::Unpack("short buffer"));
1582        }
1583        r.read_bytes(comp_len)?
1584    };
1585    let payload_decompressed;
1586    let payload_bytes: &[u8] = if payload_is_compressed {
1587        payload_decompressed = payload_compression::decompress(payload_wire, dsz)?;
1588        &payload_decompressed
1589    } else {
1590        if payload_wire.len() != dsz {
1591            return Err(TelemetryError::Unpack("payload length mismatch"));
1592        }
1593        payload_wire
1594    };
1595
1596    // ---- hash exactly like Packet::packet_id() ----
1597    let mut h: u64 = 0x9E37_79B9_7F4A_7C15;
1598
1599    // Compact source address.
1600    h = hash_bytes_u64(h, &source_address.to_le_bytes());
1601
1602    // Logical type as string bytes
1603    h = hash_bytes_u64(h, get_message_name(ty).as_bytes());
1604
1605    // Endpoints as string bytes, in ascending discriminant order.
1606    for ep in endpoints.iter() {
1607        h = hash_bytes_u64(h, ep.as_str().as_bytes());
1608    }
1609
1610    // Timestamp + data_size as bytes
1611    h = hash_bytes_u64(h, &ts_v.to_le_bytes());
1612    h = hash_bytes_u64(h, &nonce.to_le_bytes());
1613    h = hash_bytes_u64(h, &(dsz as u64).to_le_bytes());
1614
1615    // Payload bytes (logical payload)
1616    h = hash_bytes_u64(h, payload_bytes);
1617    Ok(h)
1618}
1619
1620mod payload_compression {
1621    use crate::TelemetryError;
1622    use alloc::borrow::Cow;
1623    #[cfg(feature = "compression")]
1624    use alloc::vec;
1625    use alloc::vec::Vec;
1626
1627    #[cfg(feature = "compression")]
1628    use crate::config::runtime_payload_compress_threshold;
1629    #[cfg(feature = "compression")]
1630    use zstd_safe::CompressionLevel;
1631
1632    /// Compress the given payload if it is beneficial to do so.
1633    /// # Arguments
1634    /// - `payload`: Original uncompressed payload bytes.
1635    /// # Returns
1636    /// - `(bool, Cow<[u8]>)`: Tuple where the first element indicates whether
1637    ///   compression was applied, and the second element is the resulting
1638    ///   payload bytes (compressed or original).
1639    #[cfg(feature = "compression")]
1640    pub fn compress_if_beneficial(payload: &'_ [u8]) -> (bool, Cow<'_, [u8]>) {
1641        if payload.len() < runtime_payload_compress_threshold() {
1642            return (false, Cow::Borrowed(payload));
1643        }
1644
1645        // Bound output and avoid growth beyond useful threshold.
1646        let Some(compressed) = compress_to_vec_bounded(payload, payload.len().saturating_sub(2))
1647        else {
1648            return (false, Cow::Borrowed(payload));
1649        };
1650
1651        // Only use compressed form if it actually saves space.
1652        if compressed.len() + 1 >= payload.len() {
1653            (false, Cow::Borrowed(payload))
1654        } else {
1655            (true, Cow::Owned(compressed))
1656        }
1657    }
1658
1659    #[cfg(feature = "compression")]
1660    fn compress_to_vec_bounded(input: &[u8], max_output: usize) -> Option<Vec<u8>> {
1661        if input.is_empty() || max_output == 0 {
1662            return None;
1663        }
1664
1665        let mut out = vec![0u8; max_output];
1666        // Use default-level behavior for better compression ratio on typical telemetry payloads.
1667        let level: CompressionLevel = 1;
1668        let written = zstd_safe::compress(&mut out[..], input, level).ok()?;
1669        out.truncate(written);
1670        Some(out)
1671    }
1672
1673    /// Decompress the given compressed payload.
1674    /// # Arguments
1675    /// - `compressed`: Compressed payload bytes.
1676    /// - `expected_len`: Expected length of the decompressed payload.
1677    /// # Returns
1678    /// - `Vec<u8>`: Decompressed payload bytes.
1679    /// # Errors
1680    /// - `TelemetryError::Unpack` if decompression fails or the size
1681    ///   does not match `expected_len`.
1682    #[cfg(feature = "compression")]
1683    pub fn decompress(compressed: &[u8], expected_len: usize) -> Result<Vec<u8>, TelemetryError> {
1684        let mut out = vec![0u8; expected_len];
1685        let written = zstd_safe::decompress(&mut out[..], compressed)
1686            .map_err(|_| TelemetryError::Unpack("decompression failed"))?;
1687        if written != expected_len {
1688            return Err(TelemetryError::Unpack("decompressed size mismatch"));
1689        }
1690        Ok(out)
1691    }
1692
1693    // Stub when compression is disabled (never actually produces compressed payloads).
1694    #[cfg(not(feature = "compression"))]
1695    /// Returns the original payload unchanged when compression support is disabled.
1696    pub fn compress_if_beneficial<'a>(payload: &'a [u8]) -> (bool, Cow<'a, [u8]>) {
1697        (false, Cow::Borrowed(payload))
1698    }
1699
1700    #[cfg(not(feature = "compression"))]
1701    /// Reports that compressed payloads cannot be decoded when compression support is disabled.
1702    pub fn decompress(_compressed: &[u8], _expected_len: usize) -> Result<Vec<u8>, TelemetryError> {
1703        Err(TelemetryError::Unpack(
1704            "compressed payloads not supported (compression feature disabled)",
1705        ))
1706    }
1707}