1use crate::MessageElement;
13use crate::{
14 DataEndpoint, TelemetryError, TelemetryResult, get_message_name, is_reliable_type,
15 message_meta,
16 packet::Packet,
17 {MAX_VALUE_DATA_ENDPOINT, MAX_VALUE_DATA_TYPE, config::DataType},
18};
19
20use crate::packet::{hash_bytes_u64, sender_address_u32};
21#[cfg(feature = "std")]
22use alloc::borrow::ToOwned;
23#[cfg(feature = "std")]
24use alloc::collections::BTreeMap;
25use alloc::{format, string::String, sync::Arc, vec, vec::Vec};
26use crc32fast::Hasher as Crc32Hasher;
27#[cfg(feature = "std")]
28use std::sync::{Mutex, OnceLock};
29
30#[derive(Clone, Debug, PartialEq, Eq)]
34pub struct TelemetryEnvelope {
35 pub ty: DataType,
37 pub endpoints: Arc<[DataEndpoint]>,
39 pub sender: Arc<str>,
41 pub source_address: u32,
43 pub timestamp_ms: u64,
45 pub wire_shape: Option<MessageElement>,
47 pub target_senders: Arc<[u64]>,
49}
50
51#[derive(Clone, Copy, Debug, PartialEq, Eq)]
53pub struct ReliableHeader {
54 pub flags: u8,
55 pub seq: u32,
56 pub ack: u32,
57}
58
59pub const RELIABLE_FLAG_ACK_ONLY: u8 = 0x01;
61
62pub const RELIABLE_FLAG_UNORDERED: u8 = 0x02;
64
65pub const RELIABLE_FLAG_UNSEQUENCED: u8 = 0x80;
67
68pub const RELIABLE_HEADER_BYTES: usize = 1 + 4 + 4;
70pub const CRC32_BYTES: usize = 4;
72
73const FLAG_COMPRESSED_PAYLOAD: u8 = 0x01;
101const FLAG_WIRE_CONTRACT: u8 = 0x04;
102const FLAG_PACKET_NONCE: u8 = 0x08;
103#[cfg(feature = "cryptography")]
104const FLAG_E2E_ENCRYPTED_PAYLOAD: u8 = 0x10;
105const FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
106const FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
107const CONTRACT_FLAG_TARGETS: u8 = 0x01;
108const CONTRACT_FLAG_SHAPE: u8 = 0x02;
109const CONTRACT_FLAG_RELIABLE_HEADER: u8 = 0x04;
110const RELIABLE_WIRE_FLAG_SEQ_PRESENT: u8 = 0x04;
111const RELIABLE_WIRE_FLAG_ACK_PRESENT: u8 = 0x08;
112const RELIABLE_PUBLIC_FLAGS_MASK: u8 =
113 RELIABLE_FLAG_ACK_ONLY | RELIABLE_FLAG_UNORDERED | RELIABLE_FLAG_UNSEQUENCED;
114#[cfg(feature = "cryptography")]
115const E2E_NONCE_LEN: usize = 12;
116#[cfg(feature = "cryptography")]
117const E2E_TAG_CAP: usize = 32;
118
119#[derive(Clone, Debug, PartialEq, Eq)]
120struct WireContract {
121 shape: Option<MessageElement>,
122 target_senders: Arc<[u64]>,
123 has_reliable_header: bool,
124}
125
126#[inline]
128fn write_uleb128<T>(mut v: u64, out: &mut Vec<T>)
129where
130 T: From<u8>,
131{
132 loop {
133 let mut byte = (v & 0x7F) as u8;
134 v >>= 7;
135 if v != 0 {
136 byte |= 0x80;
137 }
138 out.push(T::from(byte));
139 if v == 0 {
140 break;
141 }
142 }
143}
144
145#[inline]
147fn read_uleb128(r: &mut ByteReader) -> Result<u64, TelemetryError> {
148 let mut result: u64 = 0;
149 let mut shift = 0u32;
150 for _ in 0..10 {
152 let b = r.read_bytes(1)?[0];
153 result |= ((b & 0x7F) as u64) << shift;
154 if (b & 0x80) == 0 {
155 return Ok(result);
156 }
157 shift += 7;
158 }
159 Err(TelemetryError::Unpack("uleb128 too long"))
160}
161
162#[inline]
164fn uleb128_size(mut v: u64) -> usize {
165 let mut n = 1;
166 while v >= 0x80 {
167 v >>= 7;
168 n += 1;
169 }
170 n
171}
172
173#[inline]
175fn bitmap_popcount(bm: &[u8]) -> usize {
176 bm.iter().map(|b| b.count_ones() as usize).sum()
177}
178
179#[derive(Clone, Copy)]
184struct ByteReader<'a> {
185 buf: &'a [u8],
186 off: usize,
187}
188
189impl<'a> ByteReader<'a> {
190 fn new(buf: &'a [u8]) -> Self {
192 Self { buf, off: 0 }
193 }
194
195 fn remaining(&self) -> usize {
197 self.buf.len().saturating_sub(self.off)
198 }
199
200 fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], TelemetryError> {
202 if self.remaining() < n {
203 return Err(TelemetryError::Unpack("short read"));
204 }
205 let s = &self.buf[self.off..self.off + n];
206 self.off += n;
207 Ok(s)
208 }
209}
210
211#[inline]
212fn write_u32_le(v: u32, out: &mut Vec<u8>) {
213 out.extend_from_slice(&v.to_le_bytes());
214}
215
216#[inline]
217fn read_u32_le(r: &mut ByteReader) -> Result<u32, TelemetryError> {
218 let b = r.read_bytes(4)?;
219 Ok(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
220}
221
222#[inline]
223fn encode_wire_shape(shape: MessageElement) -> Result<Vec<u8>, TelemetryError> {
224 let dt = crate::config::message_data_type_code(shape.data_type());
225 let class = crate::config::message_class_code(shape.message_type());
226 let mut out = Vec::with_capacity(6);
227 let mut packed = dt | (class << 4);
228 if matches!(shape, MessageElement::Static(_, _, _)) {
229 packed |= 1 << 6;
230 }
231 out.push(packed);
232 if let MessageElement::Static(count, _, _) = shape {
233 let count = u64::try_from(count).map_err(|_| TelemetryError::Pack("wire shape count"))?;
234 write_uleb128(count, &mut out);
235 }
236 Ok(out)
237}
238
239#[inline]
240fn decode_wire_shape(r: &mut ByteReader) -> Result<MessageElement, TelemetryError> {
241 let packed = r.read_bytes(1)?[0];
242 let dt = crate::config::message_data_type_from_code(packed & 0x0F)
243 .ok_or(TelemetryError::Unpack("wire shape type"))?;
244 let class = crate::config::message_class_from_code((packed >> 4) & 0x03)
245 .ok_or(TelemetryError::Unpack("wire shape class"))?;
246 if (packed & (1 << 6)) != 0 {
247 let count = usize::try_from(read_uleb128(r)?)
248 .map_err(|_| TelemetryError::Unpack("wire shape count"))?;
249 Ok(MessageElement::Static(count, dt, class))
250 } else {
251 Ok(MessageElement::Dynamic(dt, class))
252 }
253}
254
255#[inline]
256fn encode_wire_contract(
257 shape: Option<MessageElement>,
258 target_senders: &[u64],
259 has_reliable_header: bool,
260) -> Result<Vec<u8>, TelemetryError> {
261 let mut out = Vec::new();
265 let mut flags = 0u8;
266 if !target_senders.is_empty() {
267 flags |= CONTRACT_FLAG_TARGETS;
268 }
269 if shape.is_some() {
270 flags |= CONTRACT_FLAG_SHAPE;
271 }
272 if has_reliable_header {
273 flags |= CONTRACT_FLAG_RELIABLE_HEADER;
274 }
275 out.push(flags);
276 if let Some(shape) = shape {
277 out.extend_from_slice(&encode_wire_shape(shape)?);
278 }
279 if !target_senders.is_empty() {
280 write_uleb128(target_senders.len() as u64, &mut out);
281 for hash in target_senders {
282 out.extend_from_slice(&hash.to_le_bytes());
283 }
284 }
285 Ok(out)
286}
287
288#[inline]
289fn decode_wire_contract(
290 r: &mut ByteReader,
291 has_contract: bool,
292) -> Result<WireContract, TelemetryError> {
293 if !has_contract {
294 return Ok(WireContract {
295 shape: None,
296 target_senders: Arc::<[u64]>::from([]),
297 has_reliable_header: false,
298 });
299 }
300 let contract_len = usize::try_from(read_uleb128(r)?)
301 .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
302 let contract_bytes = r.read_bytes(contract_len)?;
303 let mut cr = ByteReader::new(contract_bytes);
304 let flags = cr.read_bytes(1)?[0];
308 let shape = if (flags & CONTRACT_FLAG_SHAPE) != 0 {
309 Some(decode_wire_shape(&mut cr)?)
310 } else {
311 None
312 };
313 let target_senders: Arc<[u64]> = if (flags & CONTRACT_FLAG_TARGETS) != 0 {
314 let count = usize::try_from(read_uleb128(&mut cr)?)
315 .map_err(|_| TelemetryError::Unpack("wire contract target count"))?;
316 let mut targets = Vec::with_capacity(count);
317 for _ in 0..count {
318 let bytes = cr.read_bytes(8)?;
319 targets.push(u64::from_le_bytes([
320 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
321 ]));
322 }
323 Arc::from(targets)
324 } else {
325 Arc::<[u64]>::from([])
326 };
327 if cr.remaining() != 0 {
328 return Err(TelemetryError::Unpack("wire contract trailing bytes"));
329 }
330 Ok(WireContract {
331 shape,
332 target_senders,
333 has_reliable_header: (flags & CONTRACT_FLAG_RELIABLE_HEADER) != 0,
334 })
335}
336
337#[inline]
338fn crc32_bytes(data: &[u8]) -> u32 {
339 let mut hasher = Crc32Hasher::new();
340 hasher.update(data);
341 hasher.finalize()
342}
343
344#[inline]
345fn append_crc32(out: &mut Vec<u8>) {
346 let crc = crc32_bytes(out);
347 out.extend_from_slice(&crc.to_le_bytes());
348}
349
350#[inline]
351fn split_crc32(buf: &[u8]) -> Result<(&[u8], u32), TelemetryError> {
352 if buf.len() < CRC32_BYTES {
353 return Err(TelemetryError::Unpack("short buffer"));
354 }
355 let data_len = buf.len() - CRC32_BYTES;
356 let crc = u32::from_le_bytes([
357 buf[data_len],
358 buf[data_len + 1],
359 buf[data_len + 2],
360 buf[data_len + 3],
361 ]);
362 Ok((&buf[..data_len], crc))
363}
364
365#[inline]
366fn verify_crc32(buf: &[u8]) -> Result<&[u8], TelemetryError> {
367 let (data, expected) = split_crc32(buf)?;
368 let actual = crc32_bytes(data);
369 if actual != expected {
370 return Err(TelemetryError::Unpack("crc32 mismatch"));
371 }
372 Ok(data)
373}
374
375#[cfg(feature = "cryptography")]
376#[inline]
377fn e2e_nonce_for_packet(pkt: &Packet) -> [u8; E2E_NONCE_LEN] {
378 let mut nonce = [0u8; E2E_NONCE_LEN];
379 nonce[..4].copy_from_slice(&pkt.data_type().as_u32().to_le_bytes());
380 nonce[4..10].copy_from_slice(&(pkt.timestamp() & 0x0000_FFFF_FFFF_FFFF).to_le_bytes()[..6]);
381 nonce[10..].copy_from_slice(&pkt.nonce().to_le_bytes());
382 nonce
383}
384
385#[cfg(feature = "cryptography")]
386fn write_encrypted_payload(
387 pkt: &Packet,
388 key_id: u32,
389 plaintext_wire_payload: &[u8],
390 out: &mut Vec<u8>,
391) -> TelemetryResult<()> {
392 let aad_end = out.len();
393 let nonce = e2e_nonce_for_packet(pkt);
394 let mut ciphertext = vec![0u8; plaintext_wire_payload.len()];
395 let mut tag = [0u8; E2E_TAG_CAP];
396 let (ciphertext_len, tag_len) = crate::crypto::seal_with_registered_crypto(
397 key_id,
398 &nonce,
399 &out[..aad_end],
400 plaintext_wire_payload,
401 &mut ciphertext,
402 &mut tag,
403 )?;
404 if ciphertext_len > ciphertext.len() || tag_len > tag.len() {
405 return Err(TelemetryError::SizeMismatchError);
406 }
407 write_uleb128(u64::from(key_id), out);
408 write_uleb128(plaintext_wire_payload.len() as u64, out);
409 write_uleb128(nonce.len() as u64, out);
410 out.extend_from_slice(&nonce);
411 write_uleb128(tag_len as u64, out);
412 out.extend_from_slice(&tag[..tag_len]);
413 out.extend_from_slice(&ciphertext[..ciphertext_len]);
414 Ok(())
415}
416
417#[cfg(feature = "cryptography")]
418fn read_encrypted_payload(
419 r: &mut ByteReader,
420 aad: &[u8],
421 plaintext_len: usize,
422) -> TelemetryResult<Vec<u8>> {
423 let key_id = u32::try_from(read_uleb128(r)?)
424 .map_err(|_| TelemetryError::Unpack("e2e key id too large"))?;
425 let wire_payload_len = usize::try_from(read_uleb128(r)?)
426 .map_err(|_| TelemetryError::Unpack("e2e payload length"))?;
427 if wire_payload_len > plaintext_len {
428 return Err(TelemetryError::Unpack("bad e2e payload length"));
429 }
430 let nonce_len = usize::try_from(read_uleb128(r)?)
431 .map_err(|_| TelemetryError::Unpack("e2e nonce length"))?;
432 if nonce_len == 0 || nonce_len > 64 {
433 return Err(TelemetryError::Unpack("bad e2e nonce length"));
434 }
435 let nonce = r.read_bytes(nonce_len)?;
436 let tag_len =
437 usize::try_from(read_uleb128(r)?).map_err(|_| TelemetryError::Unpack("e2e tag length"))?;
438 if tag_len == 0 || tag_len > E2E_TAG_CAP {
439 return Err(TelemetryError::Unpack("bad e2e tag length"));
440 }
441 let tag = r.read_bytes(tag_len)?;
442 let ciphertext_len = r.remaining();
443 let ciphertext = r.read_bytes(ciphertext_len)?;
444 let mut plaintext = vec![0u8; wire_payload_len];
445 let opened_len = crate::crypto::open_with_registered_crypto(
446 key_id,
447 nonce,
448 aad,
449 ciphertext,
450 tag,
451 &mut plaintext,
452 )?;
453 if opened_len != wire_payload_len {
454 return Err(TelemetryError::SizeMismatchError);
455 }
456 Ok(plaintext)
457}
458
459#[inline]
460fn write_reliable_header(h: ReliableHeader, out: &mut Vec<u8>) {
461 out.push(h.flags);
462 write_u32_le(h.seq, out);
463 write_u32_le(h.ack, out);
464}
465
466#[inline]
467fn reliable_compact_size(h: ReliableHeader) -> usize {
468 let seq_present = (h.flags & RELIABLE_FLAG_ACK_ONLY) == 0 || h.seq != 0;
469 let ack_present = h.ack != 0 || (h.flags & RELIABLE_FLAG_ACK_ONLY) != 0;
470 1 + if seq_present {
471 uleb128_size(h.seq as u64)
472 } else {
473 0
474 } + if ack_present {
475 uleb128_size(h.ack as u64)
476 } else {
477 0
478 }
479}
480
481#[inline]
482fn should_compact_reliable_header(h: ReliableHeader) -> bool {
483 reliable_compact_size(h) < RELIABLE_HEADER_BYTES
484}
485
486#[inline]
487fn reliable_wire_size(h: ReliableHeader, compact: bool) -> usize {
488 if compact {
489 reliable_compact_size(h)
490 } else {
491 RELIABLE_HEADER_BYTES
492 }
493}
494
495pub(crate) fn write_reliable_header_encoded(h: ReliableHeader, compact: bool, out: &mut Vec<u8>) {
496 if !compact {
497 write_reliable_header(h, out);
498 return;
499 }
500
501 let seq_present = (h.flags & RELIABLE_FLAG_ACK_ONLY) == 0 || h.seq != 0;
502 let ack_present = h.ack != 0 || (h.flags & RELIABLE_FLAG_ACK_ONLY) != 0;
503 let mut wire_flags = h.flags & RELIABLE_PUBLIC_FLAGS_MASK;
504 if seq_present {
505 wire_flags |= RELIABLE_WIRE_FLAG_SEQ_PRESENT;
506 }
507 if ack_present {
508 wire_flags |= RELIABLE_WIRE_FLAG_ACK_PRESENT;
509 }
510 out.push(wire_flags);
511 if seq_present {
512 write_uleb128(h.seq as u64, out);
513 }
514 if ack_present {
515 write_uleb128(h.ack as u64, out);
516 }
517}
518
519#[inline]
520fn read_reliable_header(r: &mut ByteReader) -> Result<ReliableHeader, TelemetryError> {
521 let flags = r.read_bytes(1)?[0];
522 let seq = read_u32_le(r)?;
523 let ack = read_u32_le(r)?;
524 Ok(ReliableHeader { flags, seq, ack })
525}
526
527fn read_reliable_header_compact(r: &mut ByteReader) -> Result<ReliableHeader, TelemetryError> {
528 let wire_flags = r.read_bytes(1)?[0];
529 let seq = if (wire_flags & RELIABLE_WIRE_FLAG_SEQ_PRESENT) != 0 {
530 u32::try_from(read_uleb128(r)?)
531 .map_err(|_| TelemetryError::Unpack("reliable seq too large"))?
532 } else {
533 0
534 };
535 let ack = if (wire_flags & RELIABLE_WIRE_FLAG_ACK_PRESENT) != 0 {
536 u32::try_from(read_uleb128(r)?)
537 .map_err(|_| TelemetryError::Unpack("reliable ack too large"))?
538 } else {
539 0
540 };
541 Ok(ReliableHeader {
542 flags: wire_flags & RELIABLE_PUBLIC_FLAGS_MASK,
543 seq,
544 ack,
545 })
546}
547
548#[inline]
549fn read_reliable_header_encoded(
550 r: &mut ByteReader,
551 compact: bool,
552) -> Result<ReliableHeader, TelemetryError> {
553 if compact {
554 read_reliable_header_compact(r)
555 } else {
556 read_reliable_header(r)
557 }
558}
559
560#[cfg(feature = "std")]
561static ADDRESS_BOOK: OnceLock<Mutex<BTreeMap<u32, Arc<str>>>> = OnceLock::new();
562
563#[cfg(feature = "std")]
564fn address_book() -> &'static Mutex<BTreeMap<u32, Arc<str>>> {
565 ADDRESS_BOOK.get_or_init(|| Mutex::new(BTreeMap::new()))
566}
567
568#[inline]
569pub(crate) fn source_address_for_sender(sender: &str) -> u32 {
570 let addr = sender_address_u32(sender);
571 remember_source_address(addr, sender);
572 addr
573}
574
575#[cfg(feature = "std")]
576pub(crate) fn remember_source_address(addr: u32, sender: &str) {
577 address_book()
578 .lock()
579 .expect("wire address book poisoned")
580 .entry(addr)
581 .or_insert_with(|| Arc::<str>::from(sender));
582}
583
584#[cfg(not(feature = "std"))]
585pub(crate) fn remember_source_address(_addr: u32, _sender: &str) {}
586
587#[cfg(feature = "std")]
588fn sender_name_for_address(addr: u32) -> String {
589 address_book()
590 .lock()
591 .expect("wire address book poisoned")
592 .get(&addr)
593 .map(|sender| sender.as_ref().to_owned())
594 .unwrap_or_else(|| format!("@addr:{addr}"))
595}
596
597#[cfg(not(feature = "std"))]
598fn sender_name_for_address(addr: u32) -> String {
599 format!("@addr:{addr}")
600}
601
602const EP_BITMAP_BITS: usize = (MAX_VALUE_DATA_ENDPOINT as usize) + 1;
608
609const EP_BITMAP_BYTES: usize = EP_BITMAP_BITS.div_ceil(8);
611
612#[inline]
617fn build_endpoint_bitmap(eps: &[DataEndpoint]) -> [u8; EP_BITMAP_BYTES] {
618 let mut bm = [0u8; EP_BITMAP_BYTES];
619 for &ep in eps {
620 let idx = ep.as_u32() as usize;
621 debug_assert!(idx < EP_BITMAP_BITS, "endpoint discriminant out of range");
622 if idx < EP_BITMAP_BITS {
623 let byte = idx / 8;
624 let bit = idx % 8;
625 bm[byte] |= 1u8 << bit;
626 }
627 }
628 bm
629}
630
631fn expand_endpoint_bitmap(
637 bm: &[u8],
638) -> Result<([DataEndpoint; EP_BITMAP_BITS], usize), TelemetryError> {
639 if bm.len() != EP_BITMAP_BYTES {
640 return Err(TelemetryError::Unpack("bad endpoint bitmap size"));
641 }
642
643 let dummy = DataEndpoint::TelemetryError;
645
646 let mut arr = [dummy; EP_BITMAP_BITS];
648
649 let mut len = 0usize;
650 for idx in 0..EP_BITMAP_BITS {
651 let byte = idx / 8;
652 let bit = idx % 8;
653 if (bm[byte] >> bit) & 1 != 0 {
654 let v = idx as u32;
655 let ep = DataEndpoint::try_from_u32(v)
656 .ok_or(TelemetryError::Unpack("bad endpoint bit set"))?;
657 arr[len] = ep;
658 len += 1;
659 }
660 }
661
662 Ok((arr, len))
663}
664
665#[inline]
666fn endpoint_bitmap_and_count(eps: &[DataEndpoint]) -> ([u8; EP_BITMAP_BYTES], usize) {
667 let bm = build_endpoint_bitmap(eps);
668 let count = bitmap_popcount(&bm);
669 (bm, count)
670}
671
672#[inline]
673fn endpoints_match_schema(ty: DataType, eps: &[DataEndpoint]) -> bool {
674 let (packet_bm, packet_count) = endpoint_bitmap_and_count(eps);
675 let (schema_bm, schema_count) = endpoint_bitmap_and_count(message_meta(ty).endpoints);
676 packet_count == schema_count && packet_bm == schema_bm
677}
678
679#[inline]
680fn schema_endpoints_from_type(ty: DataType, nep: usize) -> TelemetryResult<Arc<[DataEndpoint]>> {
681 let (bm, count) = endpoint_bitmap_and_count(message_meta(ty).endpoints);
682 let (ep_buf, ep_len) = expand_endpoint_bitmap(&bm)?;
683 if count != nep || ep_len != nep {
684 return Err(TelemetryError::Unpack("endpoint count mismatch"));
685 }
686 Ok(Arc::from(&ep_buf[..ep_len]))
687}
688
689#[inline]
690fn endpoints_from_wire_or_schema(
691 r: &mut ByteReader<'_>,
692 bitmap_present: bool,
693 ty: Option<DataType>,
694 nep: usize,
695) -> TelemetryResult<Arc<[DataEndpoint]>> {
696 if bitmap_present {
697 let bm = r.read_bytes(EP_BITMAP_BYTES)?;
698 let (ep_buf, ep_len) = expand_endpoint_bitmap(bm)?;
699 if ep_len != nep {
700 return Err(TelemetryError::Unpack("endpoint count mismatch"));
701 }
702 Ok(Arc::from(&ep_buf[..ep_len]))
703 } else {
704 let ty = ty.ok_or(TelemetryError::InvalidType)?;
705 schema_endpoints_from_type(ty, nep)
706 }
707}
708
709#[inline]
710fn data_type_id_from_wire(ty_v: u64) -> TelemetryResult<u32> {
711 let ty_u32 = u32::try_from(ty_v).map_err(|_| TelemetryError::Unpack("type too large"))?;
712 if ty_u32 > MAX_VALUE_DATA_TYPE {
713 return Err(TelemetryError::InvalidType);
714 }
715 Ok(ty_u32)
716}
717
718pub fn pack_packet(pkt: &Packet) -> Arc<[u8]> {
731 if is_reliable_type(pkt.data_type()) {
732 let hdr = ReliableHeader {
735 flags: RELIABLE_FLAG_UNSEQUENCED,
736 seq: 0,
737 ack: 0,
738 };
739 return pack_packet_with_reliable(pkt, hdr);
740 }
741 pack_packet_inner(pkt, None)
742}
743
744pub fn pack_packet_with_reliable(pkt: &Packet, header: ReliableHeader) -> Arc<[u8]> {
748 pack_packet_inner(pkt, Some(header))
749}
750
751pub fn pack_reliable_ack(sender: &str, ty: DataType, timestamp_ms: u64, ack: u32) -> Arc<[u8]> {
756 let bm = [0u8; EP_BITMAP_BYTES];
757 let source_address = source_address_for_sender(sender);
758
759 let mut out = Vec::with_capacity(32 + EP_BITMAP_BYTES + CRC32_BYTES);
761
762 let flags: u8 = FLAG_ENDPOINT_BITMAP_PRESENT;
763 out.push(flags);
764 out.push(0u8); write_uleb128(ty.as_u32() as u64, &mut out);
767 write_uleb128(0u64, &mut out); write_uleb128(timestamp_ms, &mut out);
769 write_uleb128(source_address as u64, &mut out);
770
771 out.extend_from_slice(&bm);
772 let reliable = ReliableHeader {
773 flags: RELIABLE_FLAG_ACK_ONLY,
774 seq: 0,
775 ack,
776 };
777 if should_compact_reliable_header(reliable) {
778 out[0] |= FLAG_COMPACT_RELIABLE_HEADER;
779 write_reliable_header_encoded(reliable, true, &mut out);
780 } else {
781 write_reliable_header_encoded(reliable, false, &mut out);
782 }
783 append_crc32(&mut out);
784
785 Arc::<[u8]>::from(out)
786}
787
788fn pack_packet_inner(pkt: &Packet, reliable: Option<ReliableHeader>) -> Arc<[u8]> {
789 pack_packet_inner_with_contract(
790 pkt,
791 reliable,
792 pkt.wire_shape(),
793 pkt.wire_target_senders(),
794 None,
795 )
796 .expect("plaintext packet packing failed")
797}
798
799pub(crate) fn pack_packet_with_wire_contract(
819 pkt: &Packet,
820 reliable: Option<ReliableHeader>,
821 shape: Option<MessageElement>,
822 target_senders: &[u64],
823) -> TelemetryResult<Arc<[u8]>> {
824 pack_packet_inner_with_contract(pkt, reliable, shape, target_senders, None)
825}
826
827#[derive(Clone, Copy, Debug)]
828#[cfg_attr(not(feature = "cryptography"), allow(dead_code))]
829pub(crate) struct E2eSealConfig {
830 pub key_id: u32,
831}
832
833#[cfg(feature = "cryptography")]
834pub(crate) fn pack_packet_with_wire_contract_e2e(
835 pkt: &Packet,
836 reliable: Option<ReliableHeader>,
837 shape: Option<MessageElement>,
838 target_senders: &[u64],
839 e2e: E2eSealConfig,
840) -> TelemetryResult<Arc<[u8]>> {
841 pack_packet_inner_with_contract(pkt, reliable, shape, target_senders, Some(e2e))
842}
843
844fn pack_packet_inner_with_contract(
845 pkt: &Packet,
846 reliable: Option<ReliableHeader>,
847 shape: Option<MessageElement>,
848 target_senders: &[u64],
849 #[cfg_attr(not(feature = "cryptography"), allow(unused_variables))] e2e: Option<E2eSealConfig>,
850) -> TelemetryResult<Arc<[u8]>> {
851 let carries_wire_contract = shape.is_some() || !target_senders.is_empty();
852 let endpoints_are_schema_default = endpoints_match_schema(pkt.data_type(), pkt.endpoints());
853 let endpoint_bitmap_present = carries_wire_contract || !endpoints_are_schema_default;
854 let (bm, nep_unique) = endpoint_bitmap_and_count(pkt.endpoints());
855 let endpoint_bytes = if endpoint_bitmap_present {
856 EP_BITMAP_BYTES
857 } else {
858 0
859 };
860
861 let source_address = source_address_for_sender(pkt.sender());
862
863 let payload = pkt.payload();
865 let (payload_compressed, payload_wire) = payload_compression::compress_if_beneficial(payload);
866
867 let reliable_is_compact = reliable.is_some_and(should_compact_reliable_header);
869 let reliable_len = if let Some(hdr) = reliable {
870 reliable_wire_size(hdr, reliable_is_compact)
871 } else {
872 0
873 };
874 let contract = encode_wire_contract(shape, target_senders, reliable.is_some())
875 .unwrap_or_else(|_| vec![0u8]);
876 let contract_len = if carries_wire_contract {
877 uleb128_size(contract.len() as u64) + contract.len()
878 } else {
879 0
880 };
881 let mut out = Vec::with_capacity(
882 16 + endpoint_bytes + contract_len + reliable_len + payload_wire.len() + CRC32_BYTES,
883 );
884
885 let mut flags: u8 = 0;
887 if payload_compressed {
888 flags |= FLAG_COMPRESSED_PAYLOAD;
889 }
890 if carries_wire_contract {
891 flags |= FLAG_WIRE_CONTRACT;
892 }
893 if pkt.nonce() != 0 {
894 flags |= FLAG_PACKET_NONCE;
895 }
896 if endpoint_bitmap_present {
897 flags |= FLAG_ENDPOINT_BITMAP_PRESENT;
898 }
899 if reliable_is_compact {
900 flags |= FLAG_COMPACT_RELIABLE_HEADER;
901 }
902 #[cfg(feature = "cryptography")]
903 if e2e.is_some() {
904 flags |= FLAG_E2E_ENCRYPTED_PAYLOAD;
905 }
906 out.push(flags);
907
908 assert!(
909 nep_unique <= u8::MAX as usize,
910 "too many endpoints selected to fit in NEP u8"
911 );
912 out.push(nep_unique as u8);
913
914 write_uleb128(pkt.data_type().as_u32() as u64, &mut out);
916 write_uleb128(pkt.data_size() as u64, &mut out);
917 write_uleb128(pkt.timestamp(), &mut out);
918 if pkt.nonce() != 0 {
919 write_uleb128(pkt.nonce() as u64, &mut out);
920 }
921
922 write_uleb128(source_address as u64, &mut out);
923
924 if endpoint_bitmap_present {
925 out.extend_from_slice(&bm);
926 }
927 if (flags & FLAG_WIRE_CONTRACT) != 0 {
928 write_uleb128(contract.len() as u64, &mut out);
932 out.extend_from_slice(&contract);
933 }
934 if let Some(hdr) = reliable {
935 write_reliable_header_encoded(hdr, reliable_is_compact, &mut out);
936 }
937 #[cfg(feature = "cryptography")]
938 if let Some(e2e) = e2e {
939 write_encrypted_payload(pkt, e2e.key_id, &payload_wire, &mut out)?;
940 } else {
941 out.extend_from_slice(&payload_wire);
942 }
943 #[cfg(not(feature = "cryptography"))]
944 {
945 out.extend_from_slice(&payload_wire);
946 }
947 append_crc32(&mut out);
948
949 Ok(Arc::<[u8]>::from(out))
950}
951
952pub fn unpack_packet(buf: &[u8]) -> Result<Packet, TelemetryError> {
978 let data = verify_crc32(buf)?;
979 if data.is_empty() {
980 return Err(TelemetryError::Unpack("short prelude"));
981 }
982 let mut r = ByteReader::new(data);
983
984 let flags = r.read_bytes(1)?[0];
985 let payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
986 let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
987 let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
988 #[cfg(feature = "cryptography")]
989 let payload_is_encrypted = (flags & FLAG_E2E_ENCRYPTED_PAYLOAD) != 0;
990 #[cfg(not(feature = "cryptography"))]
991 if (flags & 0x10) != 0 {
992 return Err(TelemetryError::Unpack("e2e crypto unsupported"));
993 }
994
995 let nep = r.read_bytes(1)?[0] as usize;
996
997 let ty_v = read_uleb128(&mut r)?;
998 let dsz = read_uleb128(&mut r)? as usize; let ts_v = read_uleb128(&mut r)?;
1000 let nonce = if (flags & FLAG_PACKET_NONCE) != 0 {
1001 u16::try_from(read_uleb128(&mut r)?)
1002 .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
1003 } else {
1004 0
1005 };
1006 let source_address = u32::try_from(read_uleb128(&mut r)?)
1007 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1008 let sender_str = sender_name_for_address(source_address);
1009 let ty_u32 = data_type_id_from_wire(ty_v)?;
1010 let known_ty = DataType::try_from_u32(ty_u32);
1011 let endpoint_bytes = if endpoint_bitmap_present {
1012 EP_BITMAP_BYTES
1013 } else {
1014 0
1015 };
1016
1017 if !payload_is_compressed {
1020 if r.remaining() < endpoint_bytes + dsz {
1021 return Err(TelemetryError::Unpack("short buffer"));
1022 }
1023 } else if r.remaining() < endpoint_bytes + 1 {
1024 return Err(TelemetryError::Unpack("short buffer"));
1025 }
1026
1027 let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
1028
1029 let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1030 let ty = known_ty
1031 .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1032 .ok_or(TelemetryError::InvalidType)?;
1033
1034 let mut reliable_hdr: Option<ReliableHeader> = None;
1036 if is_reliable_type(ty) || contract.has_reliable_header {
1037 let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
1038 if (hdr.flags & RELIABLE_FLAG_ACK_ONLY) != 0 {
1039 return Err(TelemetryError::Unpack("reliable control frame"));
1040 }
1041 reliable_hdr = Some(hdr);
1042 }
1043
1044 let payload_arc: Arc<[u8]> = {
1046 #[cfg(feature = "cryptography")]
1047 let payload_wire_owned;
1048 #[cfg(feature = "cryptography")]
1049 let payload_wire: &[u8] = if payload_is_encrypted {
1050 let aad_end = r.off;
1051 payload_wire_owned = read_encrypted_payload(&mut r, &data[..aad_end], dsz)?;
1052 &payload_wire_owned
1053 } else if !payload_is_compressed {
1054 r.read_bytes(dsz)?
1055 } else {
1056 let comp_len = r.remaining();
1057 r.read_bytes(comp_len)?
1058 };
1059
1060 #[cfg(not(feature = "cryptography"))]
1061 let payload_wire: &[u8] = if !payload_is_compressed {
1062 r.read_bytes(dsz)?
1063 } else {
1064 let comp_len = r.remaining();
1065 r.read_bytes(comp_len)?
1066 };
1067
1068 if payload_is_compressed {
1069 let decompressed = payload_compression::decompress(payload_wire, dsz)?;
1070 Arc::<[u8]>::from(decompressed)
1071 } else {
1072 if payload_wire.len() != dsz {
1073 return Err(TelemetryError::Unpack("payload length mismatch"));
1074 }
1075 Arc::<[u8]>::from(payload_wire)
1076 }
1077 };
1078
1079 let _ = reliable_hdr;
1083 Packet::new_with_wire_contract(
1084 ty,
1085 &eps,
1086 &sender_str,
1087 ts_v,
1088 nonce,
1089 payload_arc,
1090 contract.shape,
1091 contract.target_senders,
1092 )
1093}
1094
1095pub fn peek_envelope(buf: &[u8]) -> TelemetryResult<TelemetryEnvelope> {
1116 let data = verify_crc32(buf)?;
1117 if data.is_empty() {
1118 return Err(TelemetryError::Unpack("short prelude"));
1119 }
1120 let mut r = ByteReader::new(data);
1121
1122 let flags = r.read_bytes(1)?[0];
1123 let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1124 let _payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
1126
1127 let nep = r.read_bytes(1)?[0] as usize;
1128
1129 let ty_v = read_uleb128(&mut r)?;
1130 let _dsz = read_uleb128(&mut r)? as usize;
1131 let ts_v = read_uleb128(&mut r)?;
1132 if (flags & FLAG_PACKET_NONCE) != 0 {
1133 let _ = read_uleb128(&mut r)?;
1134 }
1135 let source_address = u32::try_from(read_uleb128(&mut r)?)
1136 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1137 let sender_str = sender_name_for_address(source_address);
1138 let ty_u32 = data_type_id_from_wire(ty_v)?;
1139 let known_ty = DataType::try_from_u32(ty_u32);
1140 let endpoint_bytes = if endpoint_bitmap_present {
1141 EP_BITMAP_BYTES
1142 } else {
1143 0
1144 };
1145
1146 if r.remaining() < endpoint_bytes {
1147 return Err(TelemetryError::Unpack("short buffer"));
1148 }
1149
1150 let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
1151
1152 let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1153 let ty = known_ty
1154 .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1155 .ok_or(TelemetryError::InvalidType)?;
1156
1157 Ok(TelemetryEnvelope {
1158 ty,
1159 endpoints: eps,
1160 sender: Arc::<str>::from(sender_str),
1161 source_address,
1162 timestamp_ms: ts_v,
1163 wire_shape: contract.shape,
1164 target_senders: contract.target_senders,
1165 })
1166}
1167
1168pub struct TelemetryFrameInfo {
1170 pub envelope: TelemetryEnvelope,
1171 pub reliable: Option<ReliableHeader>,
1172}
1173
1174impl TelemetryFrameInfo {
1175 #[inline]
1176 pub fn ack_only(&self) -> bool {
1178 self.reliable
1179 .map(|h| (h.flags & RELIABLE_FLAG_ACK_ONLY) != 0)
1180 .unwrap_or(false)
1181 }
1182}
1183
1184fn peek_frame_info_inner(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
1185 if buf.is_empty() {
1186 return Err(TelemetryError::Unpack("short prelude"));
1187 }
1188 let mut r = ByteReader::new(buf);
1189
1190 let flags = r.read_bytes(1)?[0];
1191 let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1192 let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
1193 let _payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
1194
1195 let nep = r.read_bytes(1)?[0] as usize;
1196
1197 let ty_v = read_uleb128(&mut r)?;
1198 let _dsz = read_uleb128(&mut r)? as usize;
1199 let ts_v = read_uleb128(&mut r)?;
1200 if (flags & FLAG_PACKET_NONCE) != 0 {
1201 let _ = read_uleb128(&mut r)?;
1202 }
1203 let source_address = u32::try_from(read_uleb128(&mut r)?)
1204 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1205 let sender_str = sender_name_for_address(source_address);
1206 let ty_u32 = data_type_id_from_wire(ty_v)?;
1207 let known_ty = DataType::try_from_u32(ty_u32);
1208 let endpoint_bytes = if endpoint_bitmap_present {
1209 EP_BITMAP_BYTES
1210 } else {
1211 0
1212 };
1213
1214 if r.remaining() < endpoint_bytes {
1215 return Err(TelemetryError::Unpack("short buffer"));
1216 }
1217
1218 let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
1219
1220 let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1221 let ty = known_ty
1222 .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1223 .ok_or(TelemetryError::InvalidType)?;
1224
1225 let reliable = if is_reliable_type(ty) || contract.has_reliable_header {
1226 if r.remaining() < 1 {
1227 return Err(TelemetryError::Unpack("short buffer"));
1228 }
1229 Some(read_reliable_header_encoded(
1230 &mut r,
1231 compact_reliable_header,
1232 )?)
1233 } else {
1234 None
1235 };
1236
1237 Ok(TelemetryFrameInfo {
1238 envelope: TelemetryEnvelope {
1239 ty,
1240 endpoints: eps,
1241 sender: Arc::<str>::from(sender_str),
1242 source_address,
1243 timestamp_ms: ts_v,
1244 wire_shape: contract.shape,
1245 target_senders: contract.target_senders,
1246 },
1247 reliable,
1248 })
1249}
1250
1251pub fn peek_frame_info(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
1257 let data = verify_crc32(buf)?;
1258 peek_frame_info_inner(data)
1259}
1260
1261pub fn peek_frame_info_unchecked(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
1267 let (data, _crc) = split_crc32(buf)?;
1268 peek_frame_info_inner(data)
1269}
1270
1271pub fn reliable_header_offset(buf: &[u8]) -> TelemetryResult<Option<usize>> {
1286 Ok(reliable_header_span(buf)?.map(|(off, _, _)| off))
1287}
1288
1289pub(crate) fn reliable_header_span(
1290 buf: &[u8],
1291) -> TelemetryResult<Option<(usize, usize, ReliableHeader)>> {
1292 if buf.len() < CRC32_BYTES + 1 {
1293 return Err(TelemetryError::Unpack("short prelude"));
1294 }
1295 let data_len = buf.len().saturating_sub(CRC32_BYTES);
1296 let mut r = ByteReader::new(&buf[..data_len]);
1297
1298 let flags = r.read_bytes(1)?[0];
1299 let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1300 let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
1301
1302 let _nep = r.read_bytes(1)?[0] as usize;
1303
1304 let ty_v = read_uleb128(&mut r)?;
1305 let _dsz = read_uleb128(&mut r)? as usize;
1306 let _ts_v = read_uleb128(&mut r)?;
1307 if (flags & FLAG_PACKET_NONCE) != 0 {
1308 let _ = read_uleb128(&mut r)?;
1309 }
1310 let _source_address = u32::try_from(read_uleb128(&mut r)?)
1311 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1312 let endpoint_bytes = if endpoint_bitmap_present {
1313 EP_BITMAP_BYTES
1314 } else {
1315 0
1316 };
1317
1318 if r.remaining() < endpoint_bytes {
1319 return Err(TelemetryError::Unpack("short buffer"));
1320 }
1321
1322 if endpoint_bitmap_present {
1323 r.read_bytes(EP_BITMAP_BYTES)?;
1324 }
1325 let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1326 let ty_u32 = data_type_id_from_wire(ty_v)?;
1327 let ty = DataType::try_from_u32(ty_u32)
1328 .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
1329 .ok_or(TelemetryError::InvalidType)?;
1330 if !is_reliable_type(ty) && !contract.has_reliable_header {
1331 return Ok(None);
1332 }
1333
1334 let off = r.off;
1335 let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
1336 Ok(Some((off, r.off - off, hdr)))
1337}
1338
1339pub fn rewrite_reliable_header(
1355 buf: &mut [u8],
1356 flags: u8,
1357 seq: u32,
1358 ack: u32,
1359) -> TelemetryResult<bool> {
1360 let Some((off, old_len, _)) = reliable_header_span(buf)? else {
1361 return Ok(false);
1362 };
1363 let hdr = ReliableHeader { flags, seq, ack };
1364 let compact = should_compact_reliable_header(hdr);
1365 if reliable_wire_size(hdr, compact) != old_len {
1366 return Err(TelemetryError::Unpack(
1367 "reliable header rewrite changes wire size",
1368 ));
1369 }
1370 let data_len = buf.len().saturating_sub(CRC32_BYTES);
1371 if data_len.saturating_sub(off) < old_len {
1372 return Err(TelemetryError::Unpack("short buffer"));
1373 }
1374 if compact {
1375 buf[0] |= FLAG_COMPACT_RELIABLE_HEADER;
1376 } else {
1377 buf[0] &= !FLAG_COMPACT_RELIABLE_HEADER;
1378 }
1379 let mut encoded = Vec::with_capacity(old_len);
1380 write_reliable_header_encoded(hdr, compact, &mut encoded);
1381 buf[off..off + old_len].copy_from_slice(&encoded);
1382 if buf.len() < CRC32_BYTES {
1383 return Err(TelemetryError::Unpack("short buffer"));
1384 }
1385 let crc = crc32_bytes(&buf[..data_len]);
1386 buf[data_len..data_len + CRC32_BYTES].copy_from_slice(&crc.to_le_bytes());
1387 Ok(true)
1388}
1389
1390pub(crate) fn rewrite_reliable_header_owned(
1391 buf: &[u8],
1392 flags: u8,
1393 seq: u32,
1394 ack: u32,
1395) -> TelemetryResult<Option<Arc<[u8]>>> {
1396 let Some((off, old_len, _)) = reliable_header_span(buf)? else {
1397 return Ok(None);
1398 };
1399 let data_len = buf.len().saturating_sub(CRC32_BYTES);
1400 if data_len < off + old_len {
1401 return Err(TelemetryError::Unpack("short buffer"));
1402 }
1403 let hdr = ReliableHeader { flags, seq, ack };
1404 let compact = should_compact_reliable_header(hdr);
1405 let mut encoded = Vec::with_capacity(reliable_wire_size(hdr, compact));
1406 write_reliable_header_encoded(hdr, compact, &mut encoded);
1407
1408 let mut out = Vec::with_capacity(data_len - old_len + encoded.len() + CRC32_BYTES);
1409 out.extend_from_slice(&buf[..off]);
1410 if compact {
1411 out[0] |= FLAG_COMPACT_RELIABLE_HEADER;
1412 } else {
1413 out[0] &= !FLAG_COMPACT_RELIABLE_HEADER;
1414 }
1415 out.extend_from_slice(&encoded);
1416 out.extend_from_slice(&buf[off + old_len..data_len]);
1417 let crc = crc32_bytes(&out);
1418 out.extend_from_slice(&crc.to_le_bytes());
1419 Ok(Some(Arc::from(out)))
1420}
1421
1422pub fn header_size_bytes(pkt: &Packet) -> usize {
1438 let prelude = 2; let source_address = sender_address_u32(pkt.sender());
1441
1442 prelude
1443 + uleb128_size(pkt.data_type().as_u32() as u64)
1444 + uleb128_size(pkt.data_size() as u64)
1445 + uleb128_size(pkt.timestamp())
1446 + if pkt.nonce() != 0 {
1447 uleb128_size(pkt.nonce() as u64)
1448 } else {
1449 0
1450 }
1451 + uleb128_size(source_address as u64)
1452}
1453
1454pub fn packet_wire_size(pkt: &Packet) -> usize {
1466 let header = header_size_bytes(pkt);
1467
1468 let payload = pkt.payload();
1469 let (_payload_compressed, payload_wire) = payload_compression::compress_if_beneficial(payload);
1470
1471 let reliable_len = if is_reliable_type(pkt.data_type()) {
1472 let hdr = ReliableHeader {
1473 flags: 0,
1474 seq: 0,
1475 ack: 0,
1476 };
1477 reliable_wire_size(hdr, should_compact_reliable_header(hdr))
1478 } else {
1479 0
1480 };
1481 let endpoint_len = if endpoints_match_schema(pkt.data_type(), pkt.endpoints()) {
1482 0
1483 } else {
1484 EP_BITMAP_BYTES
1485 };
1486
1487 header + endpoint_len + reliable_len + payload_wire.len() + CRC32_BYTES
1488}
1489
1490#[inline]
1491pub fn packet_id_from_wire(buf: &[u8]) -> Result<u64, TelemetryError> {
1493 let data = verify_crc32(buf)?;
1494 if data.len() < 2 {
1495 return Err(TelemetryError::Unpack("short prelude"));
1496 }
1497
1498 let mut r = ByteReader::new(data);
1499
1500 let flags = r.read_bytes(1)?[0];
1501 let payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
1502 let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
1503 let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
1504 #[cfg(feature = "cryptography")]
1505 let payload_is_encrypted = (flags & FLAG_E2E_ENCRYPTED_PAYLOAD) != 0;
1506 #[cfg(not(feature = "cryptography"))]
1507 if (flags & 0x10) != 0 {
1508 return Err(TelemetryError::Unpack("e2e crypto unsupported"));
1509 }
1510
1511 let _nep = r.read_bytes(1)?[0] as usize;
1512
1513 let ty_v = read_uleb128(&mut r)?;
1514 let dsz = read_uleb128(&mut r)? as usize; let ts_v = read_uleb128(&mut r)?;
1516 let nonce = if (flags & FLAG_PACKET_NONCE) != 0 {
1517 u16::try_from(read_uleb128(&mut r)?)
1518 .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
1519 } else {
1520 0
1521 };
1522 let source_address = u32::try_from(read_uleb128(&mut r)?)
1523 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
1524 let ty_u32 = data_type_id_from_wire(ty_v)?;
1525 let known_ty = DataType::try_from_u32(ty_u32);
1526 let endpoint_bytes = if endpoint_bitmap_present {
1527 EP_BITMAP_BYTES
1528 } else {
1529 0
1530 };
1531
1532 if r.remaining() < endpoint_bytes {
1533 return Err(TelemetryError::Unpack("short buffer"));
1534 }
1535
1536 let endpoints = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, _nep)?;
1537
1538 let _contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
1539 let ty = known_ty
1540 .or_else(|| _contract.shape.map(|_| DataType(ty_u32)))
1541 .ok_or(TelemetryError::InvalidType)?;
1542
1543 if is_reliable_type(ty) || _contract.has_reliable_header {
1545 let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
1546 if (hdr.flags & RELIABLE_FLAG_ACK_ONLY) != 0 {
1547 return Err(TelemetryError::Unpack("reliable control frame"));
1548 }
1549 }
1550
1551 #[cfg(feature = "cryptography")]
1553 let payload_wire_owned;
1554 #[cfg(feature = "cryptography")]
1555 let payload_wire: &[u8] = if payload_is_encrypted {
1556 let aad_end = r.off;
1557 payload_wire_owned =
1558 read_encrypted_payload(&mut r, data.get(..aad_end).unwrap_or(&[]), dsz)?;
1559 &payload_wire_owned
1560 } else if !payload_is_compressed {
1561 if r.remaining() < dsz {
1562 return Err(TelemetryError::Unpack("short buffer"));
1563 }
1564 r.read_bytes(dsz)?
1565 } else {
1566 let comp_len = r.remaining();
1567 if comp_len < 1 {
1568 return Err(TelemetryError::Unpack("short buffer"));
1569 }
1570 r.read_bytes(comp_len)?
1571 };
1572 #[cfg(not(feature = "cryptography"))]
1573 let payload_wire: &[u8] = if !payload_is_compressed {
1574 if r.remaining() < dsz {
1575 return Err(TelemetryError::Unpack("short buffer"));
1576 }
1577 r.read_bytes(dsz)?
1578 } else {
1579 let comp_len = r.remaining();
1580 if comp_len < 1 {
1581 return Err(TelemetryError::Unpack("short buffer"));
1582 }
1583 r.read_bytes(comp_len)?
1584 };
1585 let payload_decompressed;
1586 let payload_bytes: &[u8] = if payload_is_compressed {
1587 payload_decompressed = payload_compression::decompress(payload_wire, dsz)?;
1588 &payload_decompressed
1589 } else {
1590 if payload_wire.len() != dsz {
1591 return Err(TelemetryError::Unpack("payload length mismatch"));
1592 }
1593 payload_wire
1594 };
1595
1596 let mut h: u64 = 0x9E37_79B9_7F4A_7C15;
1598
1599 h = hash_bytes_u64(h, &source_address.to_le_bytes());
1601
1602 h = hash_bytes_u64(h, get_message_name(ty).as_bytes());
1604
1605 for ep in endpoints.iter() {
1607 h = hash_bytes_u64(h, ep.as_str().as_bytes());
1608 }
1609
1610 h = hash_bytes_u64(h, &ts_v.to_le_bytes());
1612 h = hash_bytes_u64(h, &nonce.to_le_bytes());
1613 h = hash_bytes_u64(h, &(dsz as u64).to_le_bytes());
1614
1615 h = hash_bytes_u64(h, payload_bytes);
1617 Ok(h)
1618}
1619
1620mod payload_compression {
1621 use crate::TelemetryError;
1622 use alloc::borrow::Cow;
1623 #[cfg(feature = "compression")]
1624 use alloc::vec;
1625 use alloc::vec::Vec;
1626
1627 #[cfg(feature = "compression")]
1628 use crate::config::runtime_payload_compress_threshold;
1629 #[cfg(feature = "compression")]
1630 use zstd_safe::CompressionLevel;
1631
1632 #[cfg(feature = "compression")]
1640 pub fn compress_if_beneficial(payload: &'_ [u8]) -> (bool, Cow<'_, [u8]>) {
1641 if payload.len() < runtime_payload_compress_threshold() {
1642 return (false, Cow::Borrowed(payload));
1643 }
1644
1645 let Some(compressed) = compress_to_vec_bounded(payload, payload.len().saturating_sub(2))
1647 else {
1648 return (false, Cow::Borrowed(payload));
1649 };
1650
1651 if compressed.len() + 1 >= payload.len() {
1653 (false, Cow::Borrowed(payload))
1654 } else {
1655 (true, Cow::Owned(compressed))
1656 }
1657 }
1658
1659 #[cfg(feature = "compression")]
1660 fn compress_to_vec_bounded(input: &[u8], max_output: usize) -> Option<Vec<u8>> {
1661 if input.is_empty() || max_output == 0 {
1662 return None;
1663 }
1664
1665 let mut out = vec![0u8; max_output];
1666 let level: CompressionLevel = 1;
1668 let written = zstd_safe::compress(&mut out[..], input, level).ok()?;
1669 out.truncate(written);
1670 Some(out)
1671 }
1672
1673 #[cfg(feature = "compression")]
1683 pub fn decompress(compressed: &[u8], expected_len: usize) -> Result<Vec<u8>, TelemetryError> {
1684 let mut out = vec![0u8; expected_len];
1685 let written = zstd_safe::decompress(&mut out[..], compressed)
1686 .map_err(|_| TelemetryError::Unpack("decompression failed"))?;
1687 if written != expected_len {
1688 return Err(TelemetryError::Unpack("decompressed size mismatch"));
1689 }
1690 Ok(out)
1691 }
1692
1693 #[cfg(not(feature = "compression"))]
1695 pub fn compress_if_beneficial<'a>(payload: &'a [u8]) -> (bool, Cow<'a, [u8]>) {
1697 (false, Cow::Borrowed(payload))
1698 }
1699
1700 #[cfg(not(feature = "compression"))]
1701 pub fn decompress(_compressed: &[u8], _expected_len: usize) -> Result<Vec<u8>, TelemetryError> {
1703 Err(TelemetryError::Unpack(
1704 "compressed payloads not supported (compression feature disabled)",
1705 ))
1706 }
1707}