use crate::MessageElement;
use crate::{
DataEndpoint, TelemetryError, TelemetryResult, get_message_name, is_reliable_type,
message_meta,
packet::Packet,
{MAX_VALUE_DATA_ENDPOINT, MAX_VALUE_DATA_TYPE, config::DataType},
};
use crate::packet::{hash_bytes_u64, sender_address_u32};
#[cfg(feature = "std")]
use alloc::borrow::ToOwned;
#[cfg(feature = "std")]
use alloc::collections::BTreeMap;
use alloc::{format, string::String, sync::Arc, vec, vec::Vec};
use crc32fast::Hasher as Crc32Hasher;
#[cfg(feature = "std")]
use std::sync::{Mutex, OnceLock};
/// Routing metadata decoded from the header of a wire frame, without its payload.
///
/// Produced by `peek_envelope` and, as part of `TelemetryFrameInfo`, by the
/// `peek_frame_info*` functions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TelemetryEnvelope {
/// Message type: a known type id, or `DataType(id)` when only a wire shape describes it.
pub ty: DataType,
/// Endpoints read from the frame's bitmap, or taken from the type's schema when no bitmap is sent.
pub endpoints: Arc<[DataEndpoint]>,
/// Sender name resolved from `source_address` (`@addr:<n>` when the address is not known locally).
pub sender: Arc<str>,
/// Numeric source address read from the frame.
pub source_address: u32,
/// Timestamp read from the frame (milliseconds, per the field name).
pub timestamp_ms: u64,
/// Shape carried in the frame's wire contract, if one is present.
pub wire_shape: Option<MessageElement>,
/// Target sender hashes carried in the wire contract; empty when none are present.
pub target_senders: Arc<[u64]>,
}
/// Reliable-delivery header carried by frames of reliable types (or frames whose
/// wire contract marks a reliable header as present).
///
/// On the wire it is either the fixed `RELIABLE_HEADER_BYTES` layout (flags byte,
/// little-endian `seq`, little-endian `ack`) or a compact ULEB128-based layout.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReliableHeader {
/// Bit set of `RELIABLE_FLAG_*` values.
pub flags: u8,
/// Sequence number.
pub seq: u32,
/// Acknowledgement number.
pub ack: u32,
}
/// Reliable header flag marking an ack-only control frame (`unpack_packet` rejects such frames).
pub const RELIABLE_FLAG_ACK_ONLY: u8 = 0x01;
/// Reliable header flag; presumably marks unordered delivery — not interpreted in this part of the file.
pub const RELIABLE_FLAG_UNORDERED: u8 = 0x02;
/// Reliable header flag used by `pack_packet` for the default header of reliable types.
pub const RELIABLE_FLAG_UNSEQUENCED: u8 = 0x80;
/// Size of the fixed reliable header encoding: flags byte + 4-byte `seq` + 4-byte `ack`.
pub const RELIABLE_HEADER_BYTES: usize = 1 + 4 + 4;
/// Size of the CRC32 trailer appended to every frame.
pub const CRC32_BYTES: usize = 4;
// Bits of the first frame byte (the frame flags).
/// Frame flag: the payload bytes are compressed.
const FLAG_COMPRESSED_PAYLOAD: u8 = 0x01;
/// Frame flag: a length-prefixed wire contract follows the header.
const FLAG_WIRE_CONTRACT: u8 = 0x04;
/// Frame flag: a non-zero packet nonce is encoded in the header.
const FLAG_PACKET_NONCE: u8 = 0x08;
/// Frame flag: the payload is end-to-end encrypted.
#[cfg(feature = "cryptography")]
const FLAG_E2E_ENCRYPTED_PAYLOAD: u8 = 0x10;
/// Frame flag: an explicit endpoint bitmap is present.
const FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
/// Frame flag: the reliable header uses the compact encoding.
const FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
// Bits of the wire contract's own flags byte.
/// Contract flag: a list of target sender hashes is present.
const CONTRACT_FLAG_TARGETS: u8 = 0x01;
/// Contract flag: an encoded wire shape is present.
const CONTRACT_FLAG_SHAPE: u8 = 0x02;
/// Contract flag: the frame carries a reliable header.
const CONTRACT_FLAG_RELIABLE_HEADER: u8 = 0x04;
// Bits used only inside the compact reliable header's flags byte.
/// Compact reliable header: the `seq` field is encoded.
const RELIABLE_WIRE_FLAG_SEQ_PRESENT: u8 = 0x04;
/// Compact reliable header: the `ack` field is encoded.
const RELIABLE_WIRE_FLAG_ACK_PRESENT: u8 = 0x08;
/// Mask of the `RELIABLE_FLAG_*` bits that survive a compact header round trip.
const RELIABLE_PUBLIC_FLAGS_MASK: u8 =
RELIABLE_FLAG_ACK_ONLY | RELIABLE_FLAG_UNORDERED | RELIABLE_FLAG_UNSEQUENCED;
/// Length of the nonce built by `e2e_nonce_for_packet`.
#[cfg(feature = "cryptography")]
const E2E_NONCE_LEN: usize = 12;
/// Capacity of the authentication tag buffer and the largest tag length accepted on read.
#[cfg(feature = "cryptography")]
const E2E_TAG_CAP: usize = 32;
/// Decoded form of the optional wire contract section of a frame.
///
/// When a frame has no contract, `decode_wire_contract` returns a value with no
/// shape, no targets and `has_reliable_header == false`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WireContract {
/// Shape decoded when `CONTRACT_FLAG_SHAPE` is set.
shape: Option<MessageElement>,
/// Target sender hashes decoded when `CONTRACT_FLAG_TARGETS` is set; empty otherwise.
target_senders: Arc<[u64]>,
/// Whether `CONTRACT_FLAG_RELIABLE_HEADER` was set.
has_reliable_header: bool,
}
/// Appends `v` to `out` as an unsigned LEB128 value.
///
/// Seven bits are emitted per element, least-significant group first; every
/// element except the last has its high bit (`0x80`) set. Zero is written as a
/// single `0` element.
#[inline]
fn write_uleb128<T>(mut v: u64, out: &mut Vec<T>)
where
    T: From<u8>,
{
    // Every group that is followed by more data carries the continuation bit.
    while v >= 0x80 {
        out.push(T::from(((v & 0x7F) as u8) | 0x80));
        v >>= 7;
    }
    // Final group: fits in seven bits, no continuation bit.
    out.push(T::from(v as u8));
}
/// Reads one unsigned LEB128 value from `r`.
///
/// # Errors
/// - the reader's "short read" error if the input ends mid-value;
/// - `Unpack("uleb128 overflow")` if the tenth byte carries bits that do not
///   fit in a `u64` (previously such bits were silently discarded);
/// - `Unpack("uleb128 too long")` if the tenth byte still has its
///   continuation bit set.
#[inline]
fn read_uleb128(r: &mut ByteReader) -> Result<u64, TelemetryError> {
    let mut result: u64 = 0;
    for group in 0..10u32 {
        let b = r.read_bytes(1)?[0];
        let payload = u64::from(b & 0x7F);
        // The tenth group is shifted to bit 63, so only its lowest bit fits.
        if group == 9 && payload > 1 {
            return Err(TelemetryError::Unpack("uleb128 overflow"));
        }
        result |= payload << (7 * group);
        if (b & 0x80) == 0 {
            return Ok(result);
        }
    }
    Err(TelemetryError::Unpack("uleb128 too long"))
}
/// Returns how many bytes `write_uleb128` emits for `v` (between 1 and 10).
#[inline]
fn uleb128_size(v: u64) -> usize {
    // Zero still occupies one byte, so treat it as having one significant bit.
    let significant_bits = (u64::BITS - v.leading_zeros()).max(1);
    // Each encoded byte carries seven payload bits.
    significant_bits.div_ceil(7) as usize
}
/// Counts the set bits across all bytes of `bm`.
#[inline]
fn bitmap_popcount(bm: &[u8]) -> usize {
    bm.iter()
        .fold(0usize, |total, byte| total + byte.count_ones() as usize)
}
/// Forward-only cursor over a borrowed byte slice.
#[derive(Clone, Copy)]
struct ByteReader<'a> {
    /// The bytes being read.
    buf: &'a [u8],
    /// Offset of the next unread byte within `buf`.
    off: usize,
}

impl<'a> ByteReader<'a> {
    /// Creates a reader positioned at the start of `buf`.
    fn new(buf: &'a [u8]) -> Self {
        ByteReader { buf, off: 0 }
    }

    /// Number of bytes that have not been consumed yet.
    fn remaining(&self) -> usize {
        self.buf.len().saturating_sub(self.off)
    }

    /// Consumes and returns the next `n` bytes.
    ///
    /// Returns `Unpack("short read")`, leaving the cursor untouched, when fewer
    /// than `n` bytes remain.
    fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], TelemetryError> {
        if n > self.remaining() {
            return Err(TelemetryError::Unpack("short read"));
        }
        let start = self.off;
        let end = start + n;
        self.off = end;
        Ok(&self.buf[start..end])
    }
}
/// Appends `v` to `out` as four little-endian bytes.
#[inline]
fn write_u32_le(v: u32, out: &mut Vec<u8>) {
    let bytes = v.to_le_bytes();
    out.extend(bytes);
}
/// Reads four bytes from `r` and interprets them as a little-endian `u32`.
///
/// Fails with the reader's short-read error when fewer than four bytes remain.
#[inline]
fn read_u32_le(r: &mut ByteReader) -> Result<u32, TelemetryError> {
    let raw = r.read_bytes(4)?;
    let mut word = [0u8; 4];
    word.copy_from_slice(raw);
    Ok(u32::from_le_bytes(word))
}
/// Encodes a wire shape as one packed byte, followed by a ULEB128 element count
/// for `Static` shapes.
///
/// Packed byte layout: data type code in the low bits, class code shifted left
/// by four, and bit 6 set when the shape is `Static`.
///
/// # Errors
/// `Pack("wire shape count")` if the static count does not fit in a `u64`.
#[inline]
fn encode_wire_shape(shape: MessageElement) -> Result<Vec<u8>, TelemetryError> {
    let type_code = crate::config::message_data_type_code(shape.data_type());
    let class_code = crate::config::message_class_code(shape.message_type());
    let static_count = if let MessageElement::Static(count, _, _) = shape {
        Some(count)
    } else {
        None
    };

    let mut header = type_code | (class_code << 4);
    if static_count.is_some() {
        header |= 1 << 6;
    }

    let mut encoded = Vec::with_capacity(6);
    encoded.push(header);
    if let Some(count) = static_count {
        let count = u64::try_from(count).map_err(|_| TelemetryError::Pack("wire shape count"))?;
        write_uleb128(count, &mut encoded);
    }
    Ok(encoded)
}
/// Decodes a wire shape written by `encode_wire_shape`.
///
/// The low four bits of the packed byte select the data type, bits 4-5 the
/// message class, and bit 6 says whether a ULEB128 element count follows
/// (`Static`) or not (`Dynamic`).
///
/// # Errors
/// `Unpack` errors for an unknown type code, an unknown class code, a count
/// that does not fit in `usize`, or truncated input.
#[inline]
fn decode_wire_shape(r: &mut ByteReader) -> Result<MessageElement, TelemetryError> {
    let header = r.read_bytes(1)?[0];
    let data_type = crate::config::message_data_type_from_code(header & 0x0F)
        .ok_or(TelemetryError::Unpack("wire shape type"))?;
    let class = crate::config::message_class_from_code((header >> 4) & 0x03)
        .ok_or(TelemetryError::Unpack("wire shape class"))?;

    let is_static = header & (1 << 6) != 0;
    if !is_static {
        return Ok(MessageElement::Dynamic(data_type, class));
    }
    let raw_count = read_uleb128(r)?;
    let count =
        usize::try_from(raw_count).map_err(|_| TelemetryError::Unpack("wire shape count"))?;
    Ok(MessageElement::Static(count, data_type, class))
}
/// Encodes the wire contract body: a flags byte, then the optional shape, then
/// the optional list of target sender hashes (ULEB128 count followed by
/// little-endian `u64` values).
///
/// The length prefix that precedes the contract on the wire is written by the
/// caller, not here.
///
/// # Errors
/// Propagates any error from `encode_wire_shape`.
#[inline]
fn encode_wire_contract(
    shape: Option<MessageElement>,
    target_senders: &[u64],
    has_reliable_header: bool,
) -> Result<Vec<u8>, TelemetryError> {
    let has_targets = !target_senders.is_empty();

    let mut flags = 0u8;
    for (enabled, bit) in [
        (has_targets, CONTRACT_FLAG_TARGETS),
        (shape.is_some(), CONTRACT_FLAG_SHAPE),
        (has_reliable_header, CONTRACT_FLAG_RELIABLE_HEADER),
    ] {
        if enabled {
            flags |= bit;
        }
    }

    let mut encoded = vec![flags];
    if let Some(shape) = shape {
        let shape_bytes = encode_wire_shape(shape)?;
        encoded.extend_from_slice(&shape_bytes);
    }
    if has_targets {
        write_uleb128(target_senders.len() as u64, &mut encoded);
        encoded.extend(target_senders.iter().flat_map(|hash| hash.to_le_bytes()));
    }
    Ok(encoded)
}
/// Decodes the optional wire contract section of a frame.
///
/// When `has_contract` is false nothing is read and an empty contract (no
/// shape, no targets, no reliable header marker) is returned. Otherwise a
/// ULEB128 length prefix is read, followed by exactly that many contract bytes:
/// a flags byte, the optional shape and the optional target sender list.
///
/// # Errors
/// `Unpack` errors for truncated input, values that do not fit in `usize`,
/// a target count larger than the remaining contract bytes can hold, or bytes
/// left over after the contract has been fully decoded.
#[inline]
fn decode_wire_contract(
    r: &mut ByteReader,
    has_contract: bool,
) -> Result<WireContract, TelemetryError> {
    if !has_contract {
        return Ok(WireContract {
            shape: None,
            target_senders: Arc::<[u64]>::from([]),
            has_reliable_header: false,
        });
    }
    let contract_len = usize::try_from(read_uleb128(r)?)
        .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
    let contract_bytes = r.read_bytes(contract_len)?;
    let mut cr = ByteReader::new(contract_bytes);
    let flags = cr.read_bytes(1)?[0];
    let shape = if (flags & CONTRACT_FLAG_SHAPE) != 0 {
        Some(decode_wire_shape(&mut cr)?)
    } else {
        None
    };
    let target_senders: Arc<[u64]> = if (flags & CONTRACT_FLAG_TARGETS) != 0 {
        let count = usize::try_from(read_uleb128(&mut cr)?)
            .map_err(|_| TelemetryError::Unpack("wire contract target count"))?;
        // The count comes from the wire. Reject anything the remaining bytes
        // cannot satisfy *before* allocating, so a crafted frame cannot trigger
        // a huge allocation or a capacity-overflow panic.
        if count > cr.remaining() / 8 {
            return Err(TelemetryError::Unpack("short read"));
        }
        let mut targets = Vec::with_capacity(count);
        for _ in 0..count {
            let bytes = cr.read_bytes(8)?;
            targets.push(u64::from_le_bytes([
                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
            ]));
        }
        Arc::from(targets)
    } else {
        Arc::<[u64]>::from([])
    };
    if cr.remaining() != 0 {
        return Err(TelemetryError::Unpack("wire contract trailing bytes"));
    }
    Ok(WireContract {
        shape,
        target_senders,
        has_reliable_header: (flags & CONTRACT_FLAG_RELIABLE_HEADER) != 0,
    })
}
/// Computes the CRC32 checksum of `data`.
#[inline]
fn crc32_bytes(data: &[u8]) -> u32 {
    let mut digest = Crc32Hasher::new();
    digest.update(data);
    let checksum = digest.finalize();
    checksum
}
/// Appends the little-endian CRC32 of the current contents of `out` to `out`.
#[inline]
fn append_crc32(out: &mut Vec<u8>) {
    let trailer = crc32_bytes(out).to_le_bytes();
    out.extend_from_slice(&trailer);
}
/// Splits a frame into its data bytes and the little-endian CRC32 stored in the
/// trailing `CRC32_BYTES` bytes, without verifying the checksum.
///
/// # Errors
/// `Unpack("short buffer")` when `buf` is shorter than the CRC trailer.
#[inline]
fn split_crc32(buf: &[u8]) -> Result<(&[u8], u32), TelemetryError> {
    let Some(data_len) = buf.len().checked_sub(CRC32_BYTES) else {
        return Err(TelemetryError::Unpack("short buffer"));
    };
    let (data, trailer) = buf.split_at(data_len);
    let stored = u32::from_le_bytes([trailer[0], trailer[1], trailer[2], trailer[3]]);
    Ok((data, stored))
}
/// Checks the CRC32 trailer of `buf` and returns the data bytes that precede it.
///
/// # Errors
/// `Unpack("short buffer")` when `buf` is shorter than the trailer, and
/// `Unpack("crc32 mismatch")` when the stored and computed checksums differ.
#[inline]
fn verify_crc32(buf: &[u8]) -> Result<&[u8], TelemetryError> {
    let (data, stored) = split_crc32(buf)?;
    if crc32_bytes(data) == stored {
        Ok(data)
    } else {
        Err(TelemetryError::Unpack("crc32 mismatch"))
    }
}
/// Builds the end-to-end encryption nonce for `pkt`.
///
/// Layout (all little-endian): 4 bytes of the data type id, the low 6 bytes of
/// the timestamp, then the packet nonce in the remaining bytes.
#[cfg(feature = "cryptography")]
#[inline]
fn e2e_nonce_for_packet(pkt: &Packet) -> [u8; E2E_NONCE_LEN] {
    let type_bytes = pkt.data_type().as_u32().to_le_bytes();
    let timestamp_bytes = (pkt.timestamp() & 0x0000_FFFF_FFFF_FFFF).to_le_bytes();
    let packet_nonce_bytes = pkt.nonce().to_le_bytes();

    let mut nonce = [0u8; E2E_NONCE_LEN];
    let (type_part, rest) = nonce.split_at_mut(4);
    let (timestamp_part, nonce_part) = rest.split_at_mut(6);
    type_part.copy_from_slice(&type_bytes);
    timestamp_part.copy_from_slice(&timestamp_bytes[..6]);
    nonce_part.copy_from_slice(&packet_nonce_bytes);
    nonce
}
/// Seals `plaintext_wire_payload` with the registered crypto backend and appends
/// the encrypted payload section to `out`.
///
/// Everything already in `out` when this is called is passed to the backend as
/// associated data. The appended section is: ULEB128 key id, ULEB128 plaintext
/// length, ULEB128 nonce length, nonce, ULEB128 tag length, tag, ciphertext.
///
/// # Errors
/// Propagates the backend's error, and returns `SizeMismatchError` if the
/// backend reports a ciphertext or tag length larger than the buffers it was
/// given.
#[cfg(feature = "cryptography")]
fn write_encrypted_payload(
    pkt: &Packet,
    key_id: u32,
    plaintext_wire_payload: &[u8],
    out: &mut Vec<u8>,
) -> TelemetryResult<()> {
    let plaintext_len = plaintext_wire_payload.len();
    let nonce = e2e_nonce_for_packet(pkt);
    let mut sealed = vec![0u8; plaintext_len];
    let mut tag_buf = [0u8; E2E_TAG_CAP];

    // The header written so far is authenticated as associated data.
    let header_len = out.len();
    let (sealed_len, tag_len) = crate::crypto::seal_with_registered_crypto(
        key_id,
        &nonce,
        &out[..header_len],
        plaintext_wire_payload,
        &mut sealed,
        &mut tag_buf,
    )?;
    let (Some(ciphertext), Some(tag)) = (sealed.get(..sealed_len), tag_buf.get(..tag_len)) else {
        return Err(TelemetryError::SizeMismatchError);
    };

    write_uleb128(u64::from(key_id), out);
    write_uleb128(plaintext_len as u64, out);
    write_uleb128(nonce.len() as u64, out);
    out.extend_from_slice(&nonce);
    write_uleb128(tag_len as u64, out);
    out.extend_from_slice(tag);
    out.extend_from_slice(ciphertext);
    Ok(())
}
/// Reads an encrypted payload section (as written by `write_encrypted_payload`)
/// from `r` and opens it with the registered crypto backend.
///
/// `aad` is the associated data to authenticate and `plaintext_len` is the upper
/// bound accepted for the declared payload length. All bytes remaining in `r`
/// after the tag are treated as ciphertext.
///
/// # Errors
/// `Unpack` errors for truncated or out-of-range fields, the backend's error if
/// opening fails, and `SizeMismatchError` if the opened length differs from the
/// declared payload length.
#[cfg(feature = "cryptography")]
fn read_encrypted_payload(
    r: &mut ByteReader,
    aad: &[u8],
    plaintext_len: usize,
) -> TelemetryResult<Vec<u8>> {
    /// Reads a ULEB128 value and converts it to `usize`, reporting `what` on overflow.
    fn read_len(r: &mut ByteReader, what: &'static str) -> TelemetryResult<usize> {
        let raw = read_uleb128(r)?;
        usize::try_from(raw).map_err(|_| TelemetryError::Unpack(what))
    }

    let raw_key_id = read_uleb128(r)?;
    let key_id =
        u32::try_from(raw_key_id).map_err(|_| TelemetryError::Unpack("e2e key id too large"))?;

    let declared_len = read_len(r, "e2e payload length")?;
    if declared_len > plaintext_len {
        return Err(TelemetryError::Unpack("bad e2e payload length"));
    }

    let nonce_len = read_len(r, "e2e nonce length")?;
    if !(1..=64).contains(&nonce_len) {
        return Err(TelemetryError::Unpack("bad e2e nonce length"));
    }
    let nonce = r.read_bytes(nonce_len)?;

    let tag_len = read_len(r, "e2e tag length")?;
    if !(1..=E2E_TAG_CAP).contains(&tag_len) {
        return Err(TelemetryError::Unpack("bad e2e tag length"));
    }
    let tag = r.read_bytes(tag_len)?;

    // Whatever is left in the reader is the ciphertext.
    let rest = r.remaining();
    let ciphertext = r.read_bytes(rest)?;

    let mut opened = vec![0u8; declared_len];
    let opened_len = crate::crypto::open_with_registered_crypto(
        key_id,
        nonce,
        aad,
        ciphertext,
        tag,
        &mut opened,
    )?;
    if opened_len == declared_len {
        Ok(opened)
    } else {
        Err(TelemetryError::SizeMismatchError)
    }
}
#[inline]
fn write_reliable_header(h: ReliableHeader, out: &mut Vec<u8>) {
out.push(h.flags);
write_u32_le(h.seq, out);
write_u32_le(h.ack, out);
}
/// Returns the number of bytes the compact encoding of `h` occupies.
///
/// One flags byte, plus the ULEB128 size of `seq` when it is encoded (always,
/// unless the header is ack-only with `seq == 0`), plus the ULEB128 size of
/// `ack` when it is encoded (when non-zero or when the header is ack-only).
#[inline]
fn reliable_compact_size(h: ReliableHeader) -> usize {
    let ack_only = h.flags & RELIABLE_FLAG_ACK_ONLY != 0;
    let seq_len = if !ack_only || h.seq != 0 {
        uleb128_size(u64::from(h.seq))
    } else {
        0
    };
    let ack_len = if ack_only || h.ack != 0 {
        uleb128_size(u64::from(h.ack))
    } else {
        0
    };
    1 + seq_len + ack_len
}
/// Returns `true` when the compact encoding of `h` is strictly smaller than the
/// fixed `RELIABLE_HEADER_BYTES` encoding.
#[inline]
fn should_compact_reliable_header(h: ReliableHeader) -> bool {
    let compact_len = reliable_compact_size(h);
    RELIABLE_HEADER_BYTES > compact_len
}
/// Returns the encoded size of `h`: the compact size when `compact` is set,
/// otherwise the fixed `RELIABLE_HEADER_BYTES`.
#[inline]
fn reliable_wire_size(h: ReliableHeader, compact: bool) -> usize {
    match compact {
        true => reliable_compact_size(h),
        false => RELIABLE_HEADER_BYTES,
    }
}
/// Appends `h` to `out` using either the fixed or the compact encoding.
///
/// The compact form writes a flags byte holding the public `RELIABLE_FLAG_*`
/// bits plus presence bits for `seq` and `ack`, followed by each present field
/// as ULEB128. `seq` is omitted only for ack-only headers with `seq == 0`;
/// `ack` is omitted only when it is zero and the header is not ack-only.
pub(crate) fn write_reliable_header_encoded(h: ReliableHeader, compact: bool, out: &mut Vec<u8>) {
    if !compact {
        write_reliable_header(h, out);
        return;
    }

    let ack_only = h.flags & RELIABLE_FLAG_ACK_ONLY != 0;
    let write_seq = !ack_only || h.seq != 0;
    let write_ack = ack_only || h.ack != 0;

    let seq_bit = if write_seq {
        RELIABLE_WIRE_FLAG_SEQ_PRESENT
    } else {
        0
    };
    let ack_bit = if write_ack {
        RELIABLE_WIRE_FLAG_ACK_PRESENT
    } else {
        0
    };
    out.push((h.flags & RELIABLE_PUBLIC_FLAGS_MASK) | seq_bit | ack_bit);

    if write_seq {
        write_uleb128(u64::from(h.seq), out);
    }
    if write_ack {
        write_uleb128(u64::from(h.ack), out);
    }
}
/// Reads a reliable header in the fixed encoding: flags byte, then
/// little-endian `seq` and `ack`.
#[inline]
fn read_reliable_header(r: &mut ByteReader) -> Result<ReliableHeader, TelemetryError> {
    let flag_byte = r.read_bytes(1)?;
    let flags = flag_byte[0];
    let seq = read_u32_le(r)?;
    let ack = read_u32_le(r)?;
    Ok(ReliableHeader { flags, seq, ack })
}
/// Reads a reliable header in the compact encoding.
///
/// The flags byte carries presence bits for `seq` and `ack`; a field that is not
/// present decodes as zero. Only the bits in `RELIABLE_PUBLIC_FLAGS_MASK` are
/// kept in the returned `flags`.
///
/// # Errors
/// `Unpack` errors for truncated input or a field that does not fit in `u32`.
fn read_reliable_header_compact(r: &mut ByteReader) -> Result<ReliableHeader, TelemetryError> {
    /// Reads a ULEB128 `u32` when `present`, otherwise yields zero without reading.
    fn optional_u32(
        r: &mut ByteReader,
        present: bool,
        too_large: &'static str,
    ) -> Result<u32, TelemetryError> {
        if !present {
            return Ok(0);
        }
        let raw = read_uleb128(r)?;
        u32::try_from(raw).map_err(|_| TelemetryError::Unpack(too_large))
    }

    let wire_flags = r.read_bytes(1)?[0];
    let seq = optional_u32(
        r,
        wire_flags & RELIABLE_WIRE_FLAG_SEQ_PRESENT != 0,
        "reliable seq too large",
    )?;
    let ack = optional_u32(
        r,
        wire_flags & RELIABLE_WIRE_FLAG_ACK_PRESENT != 0,
        "reliable ack too large",
    )?;
    let flags = wire_flags & RELIABLE_PUBLIC_FLAGS_MASK;
    Ok(ReliableHeader { flags, seq, ack })
}
/// Reads a reliable header, dispatching on whether the frame uses the compact
/// or the fixed encoding.
#[inline]
fn read_reliable_header_encoded(
    r: &mut ByteReader,
    compact: bool,
) -> Result<ReliableHeader, TelemetryError> {
    match compact {
        true => read_reliable_header_compact(r),
        false => read_reliable_header(r),
    }
}
/// Process-wide map from numeric source address to sender name, created lazily
/// on first use through `address_book()`.
#[cfg(feature = "std")]
static ADDRESS_BOOK: OnceLock<Mutex<BTreeMap<u32, Arc<str>>>> = OnceLock::new();
/// Returns the global address book, initialising it to an empty map on first use.
#[cfg(feature = "std")]
fn address_book() -> &'static Mutex<BTreeMap<u32, Arc<str>>> {
    ADDRESS_BOOK.get_or_init(Mutex::default)
}
/// Derives the numeric source address for `sender` and records the
/// address-to-name mapping so the name can be recovered when unpacking.
#[inline]
pub(crate) fn source_address_for_sender(sender: &str) -> u32 {
    let address = sender_address_u32(sender);
    remember_source_address(address, sender);
    address
}
/// Records `sender` as the name for `addr` in the global address book.
///
/// An existing entry for `addr` is left unchanged, so the first name recorded
/// for an address wins.
///
/// # Panics
/// Panics if the address book mutex is poisoned.
#[cfg(feature = "std")]
pub(crate) fn remember_source_address(addr: u32, sender: &str) {
    let mut book = address_book()
        .lock()
        .expect("wire address book poisoned");
    book.entry(addr).or_insert_with(|| Arc::<str>::from(sender));
}
/// No-op without the `std` feature: there is no address book to record into.
#[cfg(not(feature = "std"))]
pub(crate) fn remember_source_address(_addr: u32, _sender: &str) {}
/// Looks up the sender name recorded for `addr`, falling back to the synthetic
/// name `@addr:<addr>` when the address is not in the address book.
///
/// # Panics
/// Panics if the address book mutex is poisoned.
#[cfg(feature = "std")]
fn sender_name_for_address(addr: u32) -> String {
    let book = address_book()
        .lock()
        .expect("wire address book poisoned");
    match book.get(&addr) {
        Some(sender) => sender.as_ref().to_owned(),
        None => format!("@addr:{addr}"),
    }
}
/// Without the `std` feature there is no address book, so every address maps to
/// the synthetic name `@addr:<addr>`.
#[cfg(not(feature = "std"))]
fn sender_name_for_address(addr: u32) -> String {
    format!("@addr:{}", addr)
}
/// Number of bits in the endpoint bitmap: one per endpoint discriminant from 0
/// through `MAX_VALUE_DATA_ENDPOINT` inclusive.
const EP_BITMAP_BITS: usize = (MAX_VALUE_DATA_ENDPOINT as usize) + 1;
/// Number of bytes needed to hold `EP_BITMAP_BITS` bits (rounded up).
const EP_BITMAP_BYTES: usize = EP_BITMAP_BITS.div_ceil(8);
/// Builds the endpoint bitmap for `eps`: bit `i` (byte `i / 8`, bit `i % 8`) is
/// set for every endpoint whose discriminant is `i`.
///
/// Duplicate endpoints collapse into one bit. Out-of-range discriminants trip a
/// debug assertion and are otherwise ignored.
#[inline]
fn build_endpoint_bitmap(eps: &[DataEndpoint]) -> [u8; EP_BITMAP_BYTES] {
    let mut bitmap = [0u8; EP_BITMAP_BYTES];
    for idx in eps.iter().map(|ep| ep.as_u32() as usize) {
        debug_assert!(idx < EP_BITMAP_BITS, "endpoint discriminant out of range");
        if idx >= EP_BITMAP_BITS {
            continue;
        }
        bitmap[idx / 8] |= 1u8 << (idx % 8);
    }
    bitmap
}
/// Expands an endpoint bitmap into the endpoints whose bits are set, in
/// ascending discriminant order.
///
/// Returns a fixed-size array together with the number of leading entries that
/// are valid; the remaining entries hold a placeholder value.
///
/// # Errors
/// `Unpack("bad endpoint bitmap size")` when `bm` is not exactly
/// `EP_BITMAP_BYTES` long, and `Unpack("bad endpoint bit set")` when a set bit
/// does not correspond to a known endpoint.
fn expand_endpoint_bitmap(
    bm: &[u8],
) -> Result<([DataEndpoint; EP_BITMAP_BITS], usize), TelemetryError> {
    if bm.len() != EP_BITMAP_BYTES {
        return Err(TelemetryError::Unpack("bad endpoint bitmap size"));
    }

    // Placeholder for the unused tail of the array.
    let filler = DataEndpoint::TelemetryError;
    let mut endpoints = [filler; EP_BITMAP_BITS];
    let mut found = 0usize;

    let set_bits = (0..EP_BITMAP_BITS).filter(|&idx| bm[idx / 8] & (1u8 << (idx % 8)) != 0);
    for idx in set_bits {
        let endpoint = DataEndpoint::try_from_u32(idx as u32)
            .ok_or(TelemetryError::Unpack("bad endpoint bit set"))?;
        endpoints[found] = endpoint;
        found += 1;
    }
    Ok((endpoints, found))
}
/// Builds the endpoint bitmap for `eps` and returns it together with the number
/// of distinct endpoints it contains.
#[inline]
fn endpoint_bitmap_and_count(eps: &[DataEndpoint]) -> ([u8; EP_BITMAP_BYTES], usize) {
    let bitmap = build_endpoint_bitmap(eps);
    let unique = bitmap_popcount(&bitmap);
    (bitmap, unique)
}
/// Returns `true` when `eps` selects exactly the same set of endpoints as the
/// schema for `ty` (order and duplicates are irrelevant).
#[inline]
fn endpoints_match_schema(ty: DataType, eps: &[DataEndpoint]) -> bool {
    let packet = endpoint_bitmap_and_count(eps);
    let schema = endpoint_bitmap_and_count(message_meta(ty).endpoints);
    // Compare the counts first, then the bitmaps themselves.
    packet.1 == schema.1 && packet.0 == schema.0
}
/// Returns the schema endpoints for `ty`, deduplicated and in ascending
/// discriminant order, after checking that their count equals `nep`.
///
/// # Errors
/// `Unpack("endpoint count mismatch")` when the schema does not contain exactly
/// `nep` distinct endpoints; also propagates bitmap expansion errors.
#[inline]
fn schema_endpoints_from_type(ty: DataType, nep: usize) -> TelemetryResult<Arc<[DataEndpoint]>> {
    let schema = message_meta(ty).endpoints;
    let (bitmap, unique) = endpoint_bitmap_and_count(schema);
    let (expanded, expanded_len) = expand_endpoint_bitmap(&bitmap)?;
    if unique == nep && expanded_len == nep {
        Ok(Arc::from(&expanded[..expanded_len]))
    } else {
        Err(TelemetryError::Unpack("endpoint count mismatch"))
    }
}
/// Resolves a frame's endpoints.
///
/// When `bitmap_present` is set, the bitmap is read from `r` and expanded.
/// Otherwise the endpoints come from the schema of `ty`, which must then be a
/// known type. In both cases the number of endpoints must equal `nep`.
///
/// # Errors
/// `Unpack("endpoint count mismatch")` on a count mismatch, `InvalidType` when
/// no bitmap is present and `ty` is `None`, plus any read or expansion error.
#[inline]
fn endpoints_from_wire_or_schema(
    r: &mut ByteReader<'_>,
    bitmap_present: bool,
    ty: Option<DataType>,
    nep: usize,
) -> TelemetryResult<Arc<[DataEndpoint]>> {
    if !bitmap_present {
        let known = ty.ok_or(TelemetryError::InvalidType)?;
        return schema_endpoints_from_type(known, nep);
    }

    let bitmap = r.read_bytes(EP_BITMAP_BYTES)?;
    let (expanded, expanded_len) = expand_endpoint_bitmap(bitmap)?;
    if expanded_len != nep {
        return Err(TelemetryError::Unpack("endpoint count mismatch"));
    }
    Ok(Arc::from(&expanded[..expanded_len]))
}
/// Validates a type id read from the wire and narrows it to `u32`.
///
/// # Errors
/// `Unpack("type too large")` when the value does not fit in `u32`, and
/// `InvalidType` when it exceeds `MAX_VALUE_DATA_TYPE`.
#[inline]
fn data_type_id_from_wire(ty_v: u64) -> TelemetryResult<u32> {
    match u32::try_from(ty_v) {
        Err(_) => Err(TelemetryError::Unpack("type too large")),
        Ok(id) if id > MAX_VALUE_DATA_TYPE => Err(TelemetryError::InvalidType),
        Ok(id) => Ok(id),
    }
}
/// Serialises `pkt` into a wire frame.
///
/// Packets of a reliable type get a default reliable header (flagged
/// `RELIABLE_FLAG_UNSEQUENCED`, with `seq` and `ack` zero); all other packets
/// are packed without one.
///
/// # Panics
/// Panics if packing fails (see `pack_packet_inner`).
pub fn pack_packet(pkt: &Packet) -> Arc<[u8]> {
    if !is_reliable_type(pkt.data_type()) {
        return pack_packet_inner(pkt, None);
    }
    let default_header = ReliableHeader {
        flags: RELIABLE_FLAG_UNSEQUENCED,
        seq: 0,
        ack: 0,
    };
    pack_packet_with_reliable(pkt, default_header)
}
/// Serialises `pkt` into a wire frame that carries the given reliable `header`.
///
/// # Panics
/// Panics if packing fails (see `pack_packet_inner`).
pub fn pack_packet_with_reliable(pkt: &Packet, header: ReliableHeader) -> Arc<[u8]> {
    let reliable = Some(header);
    pack_packet_inner(pkt, reliable)
}
/// Builds an ack-only control frame for data type `ty`.
///
/// The frame has no endpoints (a count of zero and an all-zero bitmap), a
/// payload size of zero, and a reliable header flagged `RELIABLE_FLAG_ACK_ONLY`
/// with `seq == 0` and the given `ack`. The header uses the compact encoding
/// whenever that is smaller than the fixed one. A CRC32 trailer is appended.
pub fn pack_reliable_ack(sender: &str, ty: DataType, timestamp_ms: u64, ack: u32) -> Arc<[u8]> {
    let source_address = source_address_for_sender(sender);
    let reliable = ReliableHeader {
        flags: RELIABLE_FLAG_ACK_ONLY,
        seq: 0,
        ack,
    };
    let compact = should_compact_reliable_header(reliable);

    let mut frame_flags: u8 = FLAG_ENDPOINT_BITMAP_PRESENT;
    if compact {
        frame_flags |= FLAG_COMPACT_RELIABLE_HEADER;
    }

    let mut out = Vec::with_capacity(32 + EP_BITMAP_BYTES + CRC32_BYTES);
    out.push(frame_flags);
    // Endpoint count: an ack frame addresses no endpoints.
    out.push(0u8);
    write_uleb128(ty.as_u32() as u64, &mut out);
    // Payload size: ack frames carry no payload.
    write_uleb128(0u64, &mut out);
    write_uleb128(timestamp_ms, &mut out);
    write_uleb128(source_address as u64, &mut out);
    out.extend_from_slice(&[0u8; EP_BITMAP_BYTES]);
    write_reliable_header_encoded(reliable, compact, &mut out);
    append_crc32(&mut out);
    Arc::<[u8]>::from(out)
}
/// Packs `pkt` without encryption, using the wire shape and target senders
/// stored on the packet itself.
///
/// # Panics
/// Panics with "plaintext packet packing failed" if the underlying packer
/// returns an error.
fn pack_packet_inner(pkt: &Packet, reliable: Option<ReliableHeader>) -> Arc<[u8]> {
    let packed = pack_packet_inner_with_contract(
        pkt,
        reliable,
        pkt.wire_shape(),
        pkt.wire_target_senders(),
        None,
    );
    packed.expect("plaintext packet packing failed")
}
pub(crate) fn pack_packet_with_wire_contract(
pkt: &Packet,
reliable: Option<ReliableHeader>,
shape: Option<MessageElement>,
target_senders: &[u64],
) -> TelemetryResult<Arc<[u8]>> {
pack_packet_inner_with_contract(pkt, reliable, shape, target_senders, None)
}
/// Parameters for end-to-end sealing of a packet payload.
///
/// Only consumed when the `cryptography` feature is enabled, hence the
/// conditional `dead_code` allowance.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(not(feature = "cryptography"), allow(dead_code))]
pub(crate) struct E2eSealConfig {
/// Identifier of the registered key to seal with; written into the frame.
pub key_id: u32,
}
/// Packs `pkt` with an explicit wire contract and an end-to-end encrypted
/// payload sealed under `e2e.key_id`.
///
/// # Errors
/// Propagates any error from the underlying packer, including sealing failures.
#[cfg(feature = "cryptography")]
pub(crate) fn pack_packet_with_wire_contract_e2e(
    pkt: &Packet,
    reliable: Option<ReliableHeader>,
    shape: Option<MessageElement>,
    target_senders: &[u64],
    e2e: E2eSealConfig,
) -> TelemetryResult<Arc<[u8]>> {
    let seal = Some(e2e);
    pack_packet_inner_with_contract(pkt, reliable, shape, target_senders, seal)
}
/// Core packer: serialises `pkt` into a complete wire frame.
///
/// Frame layout, in order:
/// 1. flags byte (`FLAG_*` bits);
/// 2. number of distinct endpoints (one byte);
/// 3. ULEB128 data type id, payload size and timestamp;
/// 4. ULEB128 packet nonce, only when it is non-zero;
/// 5. ULEB128 source address;
/// 6. endpoint bitmap, only when a wire contract is carried or the endpoints
///    differ from the type's schema;
/// 7. length-prefixed wire contract, only when a shape or target senders exist;
/// 8. reliable header, when `reliable` is `Some`;
/// 9. payload (compressed when beneficial, sealed when `e2e` is `Some` and the
///    `cryptography` feature is enabled);
/// 10. CRC32 trailer over everything before it.
///
/// NOTE(review): a reliable header on a frame without a wire contract is only
/// discoverable by the decoder when the data type itself is reliable — confirm
/// callers never pass `reliable` for other types without a shape or targets.
///
/// # Errors
/// Returns the error from encoding the wire contract (previously swallowed and
/// replaced by an empty contract, which would have dropped the shape, targets
/// and reliable-header marker) and any error from sealing the payload.
///
/// # Panics
/// Panics if more than 255 distinct endpoints are selected.
fn pack_packet_inner_with_contract(
    pkt: &Packet,
    reliable: Option<ReliableHeader>,
    shape: Option<MessageElement>,
    target_senders: &[u64],
    #[cfg_attr(not(feature = "cryptography"), allow(unused_variables))] e2e: Option<E2eSealConfig>,
) -> TelemetryResult<Arc<[u8]>> {
    let carries_wire_contract = shape.is_some() || !target_senders.is_empty();
    let endpoints_are_schema_default = endpoints_match_schema(pkt.data_type(), pkt.endpoints());
    // A contract always forces an explicit bitmap; otherwise it is only needed
    // when the endpoints cannot be recovered from the schema.
    let endpoint_bitmap_present = carries_wire_contract || !endpoints_are_schema_default;
    let (bm, nep_unique) = endpoint_bitmap_and_count(pkt.endpoints());
    let endpoint_bytes = if endpoint_bitmap_present {
        EP_BITMAP_BYTES
    } else {
        0
    };
    let source_address = source_address_for_sender(pkt.sender());
    let payload = pkt.payload();
    let (payload_compressed, payload_wire) = payload_compression::compress_if_beneficial(payload);
    let reliable_is_compact = reliable.is_some_and(should_compact_reliable_header);
    let reliable_len = if let Some(hdr) = reliable {
        reliable_wire_size(hdr, reliable_is_compact)
    } else {
        0
    };
    // Only encode the contract when it is actually written, and surface
    // encoding failures instead of silently emitting an empty contract.
    let contract = if carries_wire_contract {
        encode_wire_contract(shape, target_senders, reliable.is_some())?
    } else {
        vec![]
    };
    let contract_len = if carries_wire_contract {
        uleb128_size(contract.len() as u64) + contract.len()
    } else {
        0
    };
    let mut out = Vec::with_capacity(
        16 + endpoint_bytes + contract_len + reliable_len + payload_wire.len() + CRC32_BYTES,
    );
    let mut flags: u8 = 0;
    if payload_compressed {
        flags |= FLAG_COMPRESSED_PAYLOAD;
    }
    if carries_wire_contract {
        flags |= FLAG_WIRE_CONTRACT;
    }
    if pkt.nonce() != 0 {
        flags |= FLAG_PACKET_NONCE;
    }
    if endpoint_bitmap_present {
        flags |= FLAG_ENDPOINT_BITMAP_PRESENT;
    }
    if reliable_is_compact {
        flags |= FLAG_COMPACT_RELIABLE_HEADER;
    }
    #[cfg(feature = "cryptography")]
    if e2e.is_some() {
        flags |= FLAG_E2E_ENCRYPTED_PAYLOAD;
    }
    out.push(flags);
    assert!(
        nep_unique <= u8::MAX as usize,
        "too many endpoints selected to fit in NEP u8"
    );
    out.push(nep_unique as u8);
    write_uleb128(pkt.data_type().as_u32() as u64, &mut out);
    write_uleb128(pkt.data_size() as u64, &mut out);
    write_uleb128(pkt.timestamp(), &mut out);
    if pkt.nonce() != 0 {
        write_uleb128(pkt.nonce() as u64, &mut out);
    }
    write_uleb128(source_address as u64, &mut out);
    if endpoint_bitmap_present {
        out.extend_from_slice(&bm);
    }
    if carries_wire_contract {
        write_uleb128(contract.len() as u64, &mut out);
        out.extend_from_slice(&contract);
    }
    if let Some(hdr) = reliable {
        write_reliable_header_encoded(hdr, reliable_is_compact, &mut out);
    }
    // Everything written so far is the associated data for the sealed payload.
    #[cfg(feature = "cryptography")]
    if let Some(e2e) = e2e {
        write_encrypted_payload(pkt, e2e.key_id, &payload_wire, &mut out)?;
    } else {
        out.extend_from_slice(&payload_wire);
    }
    #[cfg(not(feature = "cryptography"))]
    {
        out.extend_from_slice(&payload_wire);
    }
    append_crc32(&mut out);
    Ok(Arc::<[u8]>::from(out))
}
/// Decodes a complete wire frame into a `Packet`.
///
/// The CRC32 trailer is verified first. The header, the optional endpoint
/// bitmap, the optional wire contract and the optional reliable header are then
/// parsed, and finally the payload is read (and decrypted and/or decompressed
/// as indicated by the frame flags). The reliable header is validated but not
/// returned.
///
/// # Errors
/// Returns `TelemetryError::Unpack` for a CRC mismatch, truncated or malformed
/// fields, a payload size that does not fit in `usize`, ack-only control frames
/// ("reliable control frame"), or encrypted frames when the `cryptography`
/// feature is disabled; `TelemetryError::InvalidType` for an out-of-range type
/// id or an unknown type without a wire shape; plus any error from decryption,
/// decompression or packet construction.
pub fn unpack_packet(buf: &[u8]) -> Result<Packet, TelemetryError> {
    let data = verify_crc32(buf)?;
    if data.is_empty() {
        return Err(TelemetryError::Unpack("short prelude"));
    }
    let mut r = ByteReader::new(data);
    let flags = r.read_bytes(1)?[0];
    let payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
    let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
    let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
    #[cfg(feature = "cryptography")]
    let payload_is_encrypted = (flags & FLAG_E2E_ENCRYPTED_PAYLOAD) != 0;
    #[cfg(not(feature = "cryptography"))]
    if (flags & 0x10) != 0 {
        return Err(TelemetryError::Unpack("e2e crypto unsupported"));
    }
    let nep = r.read_bytes(1)?[0] as usize;
    let ty_v = read_uleb128(&mut r)?;
    // The size is untrusted wire data: convert it checked rather than
    // truncating with `as` on targets where `usize` is narrower than 64 bits.
    let dsz = usize::try_from(read_uleb128(&mut r)?)
        .map_err(|_| TelemetryError::Unpack("payload size too large"))?;
    let ts_v = read_uleb128(&mut r)?;
    let nonce = if (flags & FLAG_PACKET_NONCE) != 0 {
        u16::try_from(read_uleb128(&mut r)?)
            .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
    } else {
        0
    };
    let source_address = u32::try_from(read_uleb128(&mut r)?)
        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
    let sender_str = sender_name_for_address(source_address);
    let ty_u32 = data_type_id_from_wire(ty_v)?;
    let known_ty = DataType::try_from_u32(ty_u32);
    let endpoint_bytes = if endpoint_bitmap_present {
        EP_BITMAP_BYTES
    } else {
        0
    };
    // Early length check. A compressed payload needs at least one byte; an
    // uncompressed one needs `dsz`. `checked_add` keeps a huge declared size
    // from overflowing (panic in debug builds, wrap-around in release).
    let min_payload = if payload_is_compressed { 1 } else { dsz };
    let has_enough = endpoint_bytes
        .checked_add(min_payload)
        .is_some_and(|needed| r.remaining() >= needed);
    if !has_enough {
        return Err(TelemetryError::Unpack("short buffer"));
    }
    let eps = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, nep)?;
    let contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
    // An unknown type id is only acceptable when the contract describes its shape.
    let ty = known_ty
        .or_else(|| contract.shape.map(|_| DataType(ty_u32)))
        .ok_or(TelemetryError::InvalidType)?;
    if is_reliable_type(ty) || contract.has_reliable_header {
        // The header must be consumed to reach the payload; ack-only frames are
        // control traffic and are not data packets.
        let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
        if (hdr.flags & RELIABLE_FLAG_ACK_ONLY) != 0 {
            return Err(TelemetryError::Unpack("reliable control frame"));
        }
    }
    let payload_arc: Arc<[u8]> = {
        #[cfg(feature = "cryptography")]
        let payload_wire_owned;
        #[cfg(feature = "cryptography")]
        let payload_wire: &[u8] = if payload_is_encrypted {
            // Everything before the encrypted section is the associated data.
            let aad_end = r.off;
            payload_wire_owned = read_encrypted_payload(&mut r, &data[..aad_end], dsz)?;
            &payload_wire_owned
        } else if !payload_is_compressed {
            r.read_bytes(dsz)?
        } else {
            let comp_len = r.remaining();
            r.read_bytes(comp_len)?
        };
        #[cfg(not(feature = "cryptography"))]
        let payload_wire: &[u8] = if !payload_is_compressed {
            r.read_bytes(dsz)?
        } else {
            let comp_len = r.remaining();
            r.read_bytes(comp_len)?
        };
        if payload_is_compressed {
            let decompressed = payload_compression::decompress(payload_wire, dsz)?;
            Arc::<[u8]>::from(decompressed)
        } else {
            if payload_wire.len() != dsz {
                return Err(TelemetryError::Unpack("payload length mismatch"));
            }
            Arc::<[u8]>::from(payload_wire)
        }
    };
    Packet::new_with_wire_contract(
        ty,
        &eps,
        &sender_str,
        ts_v,
        nonce,
        payload_arc,
        contract.shape,
        contract.target_senders,
    )
}
/// Verifies the CRC of `buf` and decodes only the routing envelope (type,
/// endpoints, sender, timestamp and wire contract), leaving the reliable header
/// and payload untouched.
///
/// # Errors
/// `Unpack` errors for a CRC mismatch or truncated/malformed header fields, and
/// `InvalidType` for an out-of-range type id or an unknown type that carries no
/// wire shape.
pub fn peek_envelope(buf: &[u8]) -> TelemetryResult<TelemetryEnvelope> {
    let data = verify_crc32(buf)?;
    if data.is_empty() {
        return Err(TelemetryError::Unpack("short prelude"));
    }
    let mut reader = ByteReader::new(data);

    let flags = reader.read_bytes(1)?[0];
    let has_bitmap = flags & FLAG_ENDPOINT_BITMAP_PRESENT != 0;
    let has_nonce = flags & FLAG_PACKET_NONCE != 0;
    let has_contract = flags & FLAG_WIRE_CONTRACT != 0;

    let endpoint_count = usize::from(reader.read_bytes(1)?[0]);
    let raw_type = read_uleb128(&mut reader)?;
    // The payload size is present in the header but irrelevant for the envelope.
    read_uleb128(&mut reader)?;
    let timestamp_ms = read_uleb128(&mut reader)?;
    if has_nonce {
        // Skip the packet nonce.
        read_uleb128(&mut reader)?;
    }
    let source_address = u32::try_from(read_uleb128(&mut reader)?)
        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
    let sender_name = sender_name_for_address(source_address);

    let type_id = data_type_id_from_wire(raw_type)?;
    let known_type = DataType::try_from_u32(type_id);

    let bitmap_len = if has_bitmap { EP_BITMAP_BYTES } else { 0 };
    if bitmap_len > reader.remaining() {
        return Err(TelemetryError::Unpack("short buffer"));
    }
    let endpoints =
        endpoints_from_wire_or_schema(&mut reader, has_bitmap, known_type, endpoint_count)?;
    let contract = decode_wire_contract(&mut reader, has_contract)?;

    // An unknown type id is only accepted when the contract supplies a shape.
    let ty = match known_type {
        Some(known) => known,
        None if contract.shape.is_some() => DataType(type_id),
        None => return Err(TelemetryError::InvalidType),
    };

    Ok(TelemetryEnvelope {
        ty,
        endpoints,
        sender: Arc::<str>::from(sender_name),
        source_address,
        timestamp_ms,
        wire_shape: contract.shape,
        target_senders: contract.target_senders,
    })
}
/// Header-level description of a frame: its routing envelope plus the reliable
/// header, when the frame carries one.
pub struct TelemetryFrameInfo {
    /// Routing metadata decoded from the frame header.
    pub envelope: TelemetryEnvelope,
    /// Reliable header, present for reliable types or when the wire contract
    /// marks one.
    pub reliable: Option<ReliableHeader>,
}

impl TelemetryFrameInfo {
    /// Returns `true` when the frame has a reliable header flagged
    /// `RELIABLE_FLAG_ACK_ONLY`.
    #[inline]
    pub fn ack_only(&self) -> bool {
        self.reliable
            .is_some_and(|header| header.flags & RELIABLE_FLAG_ACK_ONLY != 0)
    }
}
/// Decodes the envelope and the optional reliable header from frame data whose
/// CRC trailer has already been stripped. The payload is not read.
///
/// # Errors
/// `Unpack` errors for empty, truncated or malformed input, and `InvalidType`
/// for an out-of-range type id or an unknown type that carries no wire shape.
fn peek_frame_info_inner(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
    if buf.is_empty() {
        return Err(TelemetryError::Unpack("short prelude"));
    }
    let mut reader = ByteReader::new(buf);

    let flags = reader.read_bytes(1)?[0];
    let has_bitmap = flags & FLAG_ENDPOINT_BITMAP_PRESENT != 0;
    let compact_header = flags & FLAG_COMPACT_RELIABLE_HEADER != 0;
    let has_nonce = flags & FLAG_PACKET_NONCE != 0;
    let has_contract = flags & FLAG_WIRE_CONTRACT != 0;

    let endpoint_count = usize::from(reader.read_bytes(1)?[0]);
    let raw_type = read_uleb128(&mut reader)?;
    // Payload size: present in the header, not needed here.
    read_uleb128(&mut reader)?;
    let timestamp_ms = read_uleb128(&mut reader)?;
    if has_nonce {
        // Skip the packet nonce.
        read_uleb128(&mut reader)?;
    }
    let source_address = u32::try_from(read_uleb128(&mut reader)?)
        .map_err(|_| TelemetryError::Unpack("source address too large"))?;
    let sender_name = sender_name_for_address(source_address);

    let type_id = data_type_id_from_wire(raw_type)?;
    let known_type = DataType::try_from_u32(type_id);

    let bitmap_len = if has_bitmap { EP_BITMAP_BYTES } else { 0 };
    if bitmap_len > reader.remaining() {
        return Err(TelemetryError::Unpack("short buffer"));
    }
    let endpoints =
        endpoints_from_wire_or_schema(&mut reader, has_bitmap, known_type, endpoint_count)?;
    let contract = decode_wire_contract(&mut reader, has_contract)?;

    // An unknown type id is only accepted when the contract supplies a shape.
    let ty = match known_type {
        Some(known) => known,
        None if contract.shape.is_some() => DataType(type_id),
        None => return Err(TelemetryError::InvalidType),
    };

    let expects_reliable = is_reliable_type(ty) || contract.has_reliable_header;
    let reliable = match expects_reliable {
        false => None,
        true => {
            if reader.remaining() < 1 {
                return Err(TelemetryError::Unpack("short buffer"));
            }
            Some(read_reliable_header_encoded(&mut reader, compact_header)?)
        }
    };

    let envelope = TelemetryEnvelope {
        ty,
        endpoints,
        sender: Arc::<str>::from(sender_name),
        source_address,
        timestamp_ms,
        wire_shape: contract.shape,
        target_senders: contract.target_senders,
    };
    Ok(TelemetryFrameInfo { envelope, reliable })
}
pub fn peek_frame_info(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
let data = verify_crc32(buf)?;
peek_frame_info_inner(data)
}
/// Like `peek_frame_info`, but strips the CRC32 trailer without verifying it.
///
/// # Errors
/// Fails when `buf` is shorter than the trailer or on any header decoding error.
pub fn peek_frame_info_unchecked(buf: &[u8]) -> TelemetryResult<TelemetryFrameInfo> {
    // The stored checksum is deliberately ignored.
    let (data, _stored_crc) = split_crc32(buf)?;
    peek_frame_info_inner(data)
}
/// Returns the byte offset of the reliable header within `buf`, or `None` when
/// the frame carries no reliable header.
///
/// # Errors
/// Propagates header decoding errors from `reliable_header_span`.
pub fn reliable_header_offset(buf: &[u8]) -> TelemetryResult<Option<usize>> {
    let span = reliable_header_span(buf)?;
    Ok(span.map(|(offset, _len, _hdr)| offset))
}
/// Locates the reliable header inside a full frame (CRC trailer included)
/// without verifying the CRC.
///
/// Returns `(offset, encoded_len, header)` for frames that carry a reliable
/// header, or `None` when the type is not reliable and the wire contract does
/// not mark one.
///
/// # Errors
/// `Unpack` errors for frames that are too short or have malformed header
/// fields, and `InvalidType` for an out-of-range type id or an unknown type
/// that carries no wire shape.
pub(crate) fn reliable_header_span(
    buf: &[u8],
) -> TelemetryResult<Option<(usize, usize, ReliableHeader)>> {
    if buf.len() < CRC32_BYTES + 1 {
        return Err(TelemetryError::Unpack("short prelude"));
    }
    let body_len = buf.len().saturating_sub(CRC32_BYTES);
    let mut reader = ByteReader::new(&buf[..body_len]);

    let flags = reader.read_bytes(1)?[0];
    let has_bitmap = flags & FLAG_ENDPOINT_BITMAP_PRESENT != 0;
    let compact_header = flags & FLAG_COMPACT_RELIABLE_HEADER != 0;

    // Endpoint count, payload size and timestamp are skipped; only the type id
    // is needed to decide whether a reliable header follows.
    reader.read_bytes(1)?;
    let raw_type = read_uleb128(&mut reader)?;
    read_uleb128(&mut reader)?;
    read_uleb128(&mut reader)?;
    if flags & FLAG_PACKET_NONCE != 0 {
        read_uleb128(&mut reader)?;
    }
    // The source address is validated for range even though it is not used.
    u32::try_from(read_uleb128(&mut reader)?)
        .map_err(|_| TelemetryError::Unpack("source address too large"))?;

    let bitmap_len = if has_bitmap { EP_BITMAP_BYTES } else { 0 };
    if bitmap_len > reader.remaining() {
        return Err(TelemetryError::Unpack("short buffer"));
    }
    if has_bitmap {
        reader.read_bytes(EP_BITMAP_BYTES)?;
    }

    let contract = decode_wire_contract(&mut reader, flags & FLAG_WIRE_CONTRACT != 0)?;
    let type_id = data_type_id_from_wire(raw_type)?;
    let ty = match DataType::try_from_u32(type_id) {
        Some(known) => known,
        None if contract.shape.is_some() => DataType(type_id),
        None => return Err(TelemetryError::InvalidType),
    };

    if !(is_reliable_type(ty) || contract.has_reliable_header) {
        return Ok(None);
    }
    let start = reader.off;
    let header = read_reliable_header_encoded(&mut reader, compact_header)?;
    let encoded_len = reader.off - start;
    Ok(Some((start, encoded_len, header)))
}
/// Overwrites the reliable header of a frame in place and refreshes its CRC32
/// trailer.
///
/// Returns `Ok(false)` when the frame carries no reliable header. The new
/// header is written in whichever encoding (compact or fixed) this module would
/// choose for it, and the frame's compact flag is updated to match.
///
/// # Errors
/// `Unpack("reliable header rewrite changes wire size")` when the new header's
/// encoded size differs from the existing one (an in-place rewrite cannot
/// resize the frame), `Unpack("short buffer")` when the frame is too short, and
/// any error from locating the existing header.
pub fn rewrite_reliable_header(
    buf: &mut [u8],
    flags: u8,
    seq: u32,
    ack: u32,
) -> TelemetryResult<bool> {
    let (off, old_len) = match reliable_header_span(buf)? {
        Some((off, len, _)) => (off, len),
        None => return Ok(false),
    };

    let new_hdr = ReliableHeader { flags, seq, ack };
    let use_compact = should_compact_reliable_header(new_hdr);
    if reliable_wire_size(new_hdr, use_compact) != old_len {
        return Err(TelemetryError::Unpack(
            "reliable header rewrite changes wire size",
        ));
    }

    let data_len = buf.len().saturating_sub(CRC32_BYTES);
    if data_len.saturating_sub(off) < old_len {
        return Err(TelemetryError::Unpack("short buffer"));
    }

    // Keep the frame flag in sync with the encoding about to be written.
    buf[0] = if use_compact {
        buf[0] | FLAG_COMPACT_RELIABLE_HEADER
    } else {
        buf[0] & !FLAG_COMPACT_RELIABLE_HEADER
    };

    let mut encoded = Vec::with_capacity(old_len);
    write_reliable_header_encoded(new_hdr, use_compact, &mut encoded);
    buf[off..off + old_len].copy_from_slice(&encoded);

    if buf.len() < CRC32_BYTES {
        return Err(TelemetryError::Unpack("short buffer"));
    }
    // Recompute the checksum over the modified body and store it in the trailer.
    let (body, trailer) = buf.split_at_mut(data_len);
    let crc = crc32_bytes(body);
    trailer[..CRC32_BYTES].copy_from_slice(&crc.to_le_bytes());
    Ok(true)
}
/// Builds a new frame equal to `buf` but with its reliable header replaced and
/// its CRC32 trailer recomputed.
///
/// Unlike the in-place variant, the new header may have a different encoded
/// size. Returns `Ok(None)` when the frame carries no reliable header.
///
/// # Errors
/// `Unpack("short buffer")` when the existing header does not fit inside the
/// frame body, and any error from locating the existing header.
pub(crate) fn rewrite_reliable_header_owned(
    buf: &[u8],
    flags: u8,
    seq: u32,
    ack: u32,
) -> TelemetryResult<Option<Arc<[u8]>>> {
    let (off, old_len) = match reliable_header_span(buf)? {
        Some((off, len, _)) => (off, len),
        None => return Ok(None),
    };
    let data_len = buf.len().saturating_sub(CRC32_BYTES);
    let old_end = off + old_len;
    if data_len < old_end {
        return Err(TelemetryError::Unpack("short buffer"));
    }

    let new_hdr = ReliableHeader { flags, seq, ack };
    let use_compact = should_compact_reliable_header(new_hdr);
    let mut encoded = Vec::with_capacity(reliable_wire_size(new_hdr, use_compact));
    write_reliable_header_encoded(new_hdr, use_compact, &mut encoded);

    let before = &buf[..off];
    let after = &buf[old_end..data_len];
    let mut out = Vec::with_capacity(data_len - old_len + encoded.len() + CRC32_BYTES);
    out.extend_from_slice(before);
    // Keep the frame flag in sync with the encoding of the new header.
    out[0] = if use_compact {
        out[0] | FLAG_COMPACT_RELIABLE_HEADER
    } else {
        out[0] & !FLAG_COMPACT_RELIABLE_HEADER
    };
    out.extend_from_slice(&encoded);
    out.extend_from_slice(after);

    let crc = crc32_bytes(&out).to_le_bytes();
    out.extend_from_slice(&crc);
    Ok(Some(Arc::from(out)))
}
/// Returns the size of the fixed part of `pkt`'s frame header: the two prelude
/// bytes (flags and endpoint count) plus the ULEB128-encoded type id, payload
/// size, timestamp, packet nonce (only when non-zero) and source address.
///
/// The endpoint bitmap, wire contract, reliable header, payload and CRC trailer
/// are not included.
pub fn header_size_bytes(pkt: &Packet) -> usize {
    // Flags byte + endpoint count byte.
    const PRELUDE_BYTES: usize = 2;

    let nonce_len = match pkt.nonce() {
        0 => 0,
        nonce => uleb128_size(nonce as u64),
    };
    let field_lens = [
        uleb128_size(pkt.data_type().as_u32() as u64),
        uleb128_size(pkt.data_size() as u64),
        uleb128_size(pkt.timestamp()),
        nonce_len,
        uleb128_size(sender_address_u32(pkt.sender()) as u64),
    ];
    PRELUDE_BYTES + field_lens.iter().sum::<usize>()
}
/// Computes the size in bytes of `pkt` as laid out by this function's model
/// of the wire format.
///
/// The result is the sum of:
/// * the header size from `header_size_bytes`,
/// * `EP_BITMAP_BYTES` when the packet's endpoints differ from the schema
///   endpoints for its data type,
/// * the size of a reliable header with zeroed `flags`, `seq` and `ack` when
///   the data type is reliable,
/// * the payload length after `payload_compression::compress_if_beneficial`,
/// * the CRC32 trailer.
pub fn packet_wire_size(pkt: &Packet) -> usize {
    let ty = pkt.data_type();

    // Only the resulting length matters here; the "was compressed" flag is unused.
    let (_, wire_payload) = payload_compression::compress_if_beneficial(pkt.payload());

    let mut total = header_size_bytes(pkt);
    if !endpoints_match_schema(ty, pkt.endpoints()) {
        total += EP_BITMAP_BYTES;
    }
    if is_reliable_type(ty) {
        // Sized using an all-zero header.
        let zeroed = ReliableHeader {
            flags: 0,
            seq: 0,
            ack: 0,
        };
        total += reliable_wire_size(zeroed, should_compact_reliable_header(zeroed));
    }
    total + wire_payload.len() + CRC32_BYTES
}
#[inline]
pub fn packet_id_from_wire(buf: &[u8]) -> Result<u64, TelemetryError> {
let data = verify_crc32(buf)?;
if data.len() < 2 {
return Err(TelemetryError::Unpack("short prelude"));
}
let mut r = ByteReader::new(data);
let flags = r.read_bytes(1)?[0];
let payload_is_compressed = (flags & FLAG_COMPRESSED_PAYLOAD) != 0;
let endpoint_bitmap_present = (flags & FLAG_ENDPOINT_BITMAP_PRESENT) != 0;
let compact_reliable_header = (flags & FLAG_COMPACT_RELIABLE_HEADER) != 0;
#[cfg(feature = "cryptography")]
let payload_is_encrypted = (flags & FLAG_E2E_ENCRYPTED_PAYLOAD) != 0;
#[cfg(not(feature = "cryptography"))]
if (flags & 0x10) != 0 {
return Err(TelemetryError::Unpack("e2e crypto unsupported"));
}
let _nep = r.read_bytes(1)?[0] as usize;
let ty_v = read_uleb128(&mut r)?;
let dsz = read_uleb128(&mut r)? as usize; let ts_v = read_uleb128(&mut r)?;
let nonce = if (flags & FLAG_PACKET_NONCE) != 0 {
u16::try_from(read_uleb128(&mut r)?)
.map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
} else {
0
};
let source_address = u32::try_from(read_uleb128(&mut r)?)
.map_err(|_| TelemetryError::Unpack("source address too large"))?;
let ty_u32 = data_type_id_from_wire(ty_v)?;
let known_ty = DataType::try_from_u32(ty_u32);
let endpoint_bytes = if endpoint_bitmap_present {
EP_BITMAP_BYTES
} else {
0
};
if r.remaining() < endpoint_bytes {
return Err(TelemetryError::Unpack("short buffer"));
}
let endpoints = endpoints_from_wire_or_schema(&mut r, endpoint_bitmap_present, known_ty, _nep)?;
let _contract = decode_wire_contract(&mut r, (flags & FLAG_WIRE_CONTRACT) != 0)?;
let ty = known_ty
.or_else(|| _contract.shape.map(|_| DataType(ty_u32)))
.ok_or(TelemetryError::InvalidType)?;
if is_reliable_type(ty) || _contract.has_reliable_header {
let hdr = read_reliable_header_encoded(&mut r, compact_reliable_header)?;
if (hdr.flags & RELIABLE_FLAG_ACK_ONLY) != 0 {
return Err(TelemetryError::Unpack("reliable control frame"));
}
}
#[cfg(feature = "cryptography")]
let payload_wire_owned;
#[cfg(feature = "cryptography")]
let payload_wire: &[u8] = if payload_is_encrypted {
let aad_end = r.off;
payload_wire_owned =
read_encrypted_payload(&mut r, data.get(..aad_end).unwrap_or(&[]), dsz)?;
&payload_wire_owned
} else if !payload_is_compressed {
if r.remaining() < dsz {
return Err(TelemetryError::Unpack("short buffer"));
}
r.read_bytes(dsz)?
} else {
let comp_len = r.remaining();
if comp_len < 1 {
return Err(TelemetryError::Unpack("short buffer"));
}
r.read_bytes(comp_len)?
};
#[cfg(not(feature = "cryptography"))]
let payload_wire: &[u8] = if !payload_is_compressed {
if r.remaining() < dsz {
return Err(TelemetryError::Unpack("short buffer"));
}
r.read_bytes(dsz)?
} else {
let comp_len = r.remaining();
if comp_len < 1 {
return Err(TelemetryError::Unpack("short buffer"));
}
r.read_bytes(comp_len)?
};
let payload_decompressed;
let payload_bytes: &[u8] = if payload_is_compressed {
payload_decompressed = payload_compression::decompress(payload_wire, dsz)?;
&payload_decompressed
} else {
if payload_wire.len() != dsz {
return Err(TelemetryError::Unpack("payload length mismatch"));
}
payload_wire
};
let mut h: u64 = 0x9E37_79B9_7F4A_7C15;
h = hash_bytes_u64(h, &source_address.to_le_bytes());
h = hash_bytes_u64(h, get_message_name(ty).as_bytes());
for ep in endpoints.iter() {
h = hash_bytes_u64(h, ep.as_str().as_bytes());
}
h = hash_bytes_u64(h, &ts_v.to_le_bytes());
h = hash_bytes_u64(h, &nonce.to_le_bytes());
h = hash_bytes_u64(h, &(dsz as u64).to_le_bytes());
h = hash_bytes_u64(h, payload_bytes);
Ok(h)
}
/// Optional payload compression for the wire format.
///
/// With the `compression` feature enabled, payloads are compressed with zstd
/// when that makes them smaller. Without the feature, compression is a no-op
/// and decompression always fails.
mod payload_compression {
    use crate::TelemetryError;
    use alloc::borrow::Cow;
    #[cfg(feature = "compression")]
    use alloc::vec;
    use alloc::vec::Vec;

    #[cfg(feature = "compression")]
    use crate::config::runtime_payload_compress_threshold;
    #[cfg(feature = "compression")]
    use zstd_safe::CompressionLevel;

    /// Compresses `payload` when it is at least the runtime threshold in
    /// length and the compressed form is strictly smaller.
    ///
    /// Returns `(true, owned compressed bytes)` on success, otherwise
    /// `(false, borrowed original payload)`.
    #[cfg(feature = "compression")]
    pub fn compress_if_beneficial(payload: &'_ [u8]) -> (bool, Cow<'_, [u8]>) {
        let raw_len = payload.len();
        if raw_len < runtime_payload_compress_threshold() {
            return (false, Cow::Borrowed(payload));
        }
        // The output buffer is capped at two bytes below the input length, so
        // a result that would not save space fails to compress at all.
        match compress_to_vec_bounded(payload, raw_len.saturating_sub(2)) {
            Some(packed) if packed.len() + 1 < raw_len => (true, Cow::Owned(packed)),
            _ => (false, Cow::Borrowed(payload)),
        }
    }

    /// Compresses `input` at zstd level 1 into a buffer of at most
    /// `max_output` bytes.
    ///
    /// Returns `None` for empty input, a zero-sized budget, or when zstd
    /// reports an error (including output that does not fit the budget).
    #[cfg(feature = "compression")]
    fn compress_to_vec_bounded(input: &[u8], max_output: usize) -> Option<Vec<u8>> {
        if input.is_empty() || max_output == 0 {
            return None;
        }
        let level: CompressionLevel = 1;
        let mut scratch = vec![0u8; max_output];
        match zstd_safe::compress(&mut scratch[..], input, level) {
            Ok(used) => {
                scratch.truncate(used);
                Some(scratch)
            }
            Err(_) => None,
        }
    }

    /// Decompresses `compressed` into exactly `expected_len` bytes.
    ///
    /// # Errors
    ///
    /// `TelemetryError::Unpack` when zstd fails or when the decompressed
    /// length differs from `expected_len`.
    #[cfg(feature = "compression")]
    pub fn decompress(compressed: &[u8], expected_len: usize) -> Result<Vec<u8>, TelemetryError> {
        let mut restored = vec![0u8; expected_len];
        let outcome = zstd_safe::decompress(&mut restored[..], compressed);
        match outcome {
            Ok(produced) if produced == expected_len => Ok(restored),
            Ok(_) => Err(TelemetryError::Unpack("decompressed size mismatch")),
            Err(_) => Err(TelemetryError::Unpack("decompression failed")),
        }
    }

    /// Fallback used without the `compression` feature: never compresses and
    /// always returns the payload borrowed and flagged as uncompressed.
    #[cfg(not(feature = "compression"))]
    pub fn compress_if_beneficial(payload: &'_ [u8]) -> (bool, Cow<'_, [u8]>) {
        (false, Cow::Borrowed(payload))
    }

    /// Fallback used without the `compression` feature: compressed payloads
    /// cannot be decoded, so this always returns an error.
    #[cfg(not(feature = "compression"))]
    pub fn decompress(_compressed: &[u8], _expected_len: usize) -> Result<Vec<u8>, TelemetryError> {
        Err(TelemetryError::Unpack(
            "compressed payloads not supported (compression feature disabled)",
        ))
    }
}