Skip to main content

sedsnet/
packet.rs

1//! Telemetry packet core type and formatting helpers.
2//!
3//! [`Packet`] is the main payload-bearing type. It:
4//! - holds sender, endpoints, timestamp and raw payload bytes,
5//! - validates payload sizes and encodings against the schema from `message_meta`,
6//! - supports pretty printing (header + decoded values) for debugging/logging,
7//! - uses [`SmallPayload`] internally to keep small messages on the stack.
8
9use crate::config::{StandardSmallPayload, runtime_device_identifier, runtime_string_precision};
10use crate::queue::ByteCost;
11use crate::{
12    MessageClass, MessageDataType, MessageElement, TelemetryError, TelemetryResult,
13    config::{DataEndpoint, DataType},
14    data_type_size, get_data_type, get_message_name, message_meta,
15    router::LeBytes,
16};
17use crate::{impl_data_as_prim, impl_from_prim_slices, impl_ledecode_auto};
18use alloc::{string::String, string::ToString, sync::Arc, vec, vec::Vec};
19use core::any::TypeId;
20use core::fmt::{Formatter, Write};
21use core::sync::atomic::{AtomicU32, Ordering};
22// ============================================================================
23// Constants
24// ============================================================================
25
26/// Threshold (in ms since boot/epoch) above which timestamps are treated as
27/// Unix epoch time rather than an uptime counter.
28///
29/// Anything smaller is formatted as an uptime-style duration; larger values
30/// are formatted as a UTC date-time.
31const EPOCH_MS_THRESHOLD: u64 = 1_000_000_000_000; // clearly not an uptime counter
32
33/// Default starting capacity for human-readable strings.
34const DEFAULT_STRING_CAPACITY: usize = 96;
35
36/// Process-local packet nonce generator used to keep same-timestamp packets
37/// distinct for recent-ID dedupe even when their logical payload matches.
38static PACKET_NONCE_COUNTER: AtomicU32 = AtomicU32::new(0xA5C3_1F27);
39
40#[inline]
41fn next_packet_nonce() -> u16 {
42    let mut x = PACKET_NONCE_COUNTER.fetch_add(0x9E37_79B9, Ordering::Relaxed);
43    x ^= x << 13;
44    x ^= x >> 17;
45    x ^= x << 5;
46    (x as u16) | 1
47}
48
49// ============================================================================
50// Packet
51// ============================================================================
52
53/// Payload-bearing packet (safe, validated, shareable).
54///
55/// This is the primary data structure passed around inside the crate and
56/// across FFI boundaries (via views / wrappers).
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub struct Packet {
59    /// Logical message type (schema selector).
60    ty: DataType,
61
62    /// Size of the payload in bytes.
63    ///
64    /// This is cached and must match `payload.len()`. [`Packet::validate`]
65    /// checks the invariant.
66    data_size: usize,
67
68    /// Logical sender identifier (e.g. device or subsystem name).
69    sender: Arc<str>,
70
71    /// Destination endpoints for this message.
72    endpoints: Arc<[DataEndpoint]>,
73
74    /// Timestamp in milliseconds.
75    ///
76    /// - If `< EPOCH_MS_THRESHOLD`, treated as an uptime counter and formatted
77    ///   like `12m 34s 567ms`.
78    /// - If `>= EPOCH_MS_THRESHOLD`, treated as Unix epoch ms and formatted
79    ///   as `YYYY-MM-DD HH:MM:SS.mmmZ`.
80    timestamp: u64,
81
82    /// Small per-packet nonce carried on the wire and folded into packet ID
83    /// calculation so otherwise-identical packets emitted in the same
84    /// millisecond do not collapse in recent-ID dedupe.
85    nonce: u16,
86
87    /// Raw payload bytes, stored via [`SmallPayload`] for small/large optimization.
88    payload: StandardSmallPayload,
89
90    /// Inline wire-format shape used when this packet came from a migration-safe frame.
91    wire_shape: Option<MessageElement>,
92
93    /// Explicit destination sender hashes frozen into the wire-format contract.
94    wire_target_senders: Arc<[u64]>,
95}
96
97// ============================================================================
98// Internal helpers for validation / formatting
99// ============================================================================
100
101/// Effective element width (in bytes) for the given message data type.
102///
103/// For numeric/bool types this is the true width of one element.
104/// For string/hex we return 1 to treat the payload as a byte stream when
105/// checking dynamic length multiples.
106#[inline]
107const fn element_width(dt: MessageDataType) -> usize {
108    match dt {
109        MessageDataType::UInt8 | MessageDataType::Int8 | MessageDataType::Bool => 1,
110        MessageDataType::UInt16 | MessageDataType::Int16 => 2,
111        MessageDataType::UInt32 | MessageDataType::Int32 | MessageDataType::Float32 => 4,
112        MessageDataType::UInt64 | MessageDataType::Int64 | MessageDataType::Float64 => 8,
113        MessageDataType::UInt128 | MessageDataType::Int128 => 16,
114        // For String/Hex we treat width as 1 (byte granularity) when checking dynamic multiples.
115        MessageDataType::String | MessageDataType::Binary => 1,
116        MessageDataType::NoData => 0,
117    }
118}
119
120// Simple 64-bit hash used for packet IDs (no_std friendly).
121/// Not cryptographic; just a decent mix for deduplication.
122/// Takes an initial hash value and a byte slice, returns the updated hash.
123#[inline]
124pub fn hash_bytes_u64(mut h: u64, bytes: &[u8]) -> u64 {
125    // Arbitrary odd constant (not FNV, but good enough for dedupe).
126    const PRIME: u64 = 0x9E37_79B1;
127    for &b in bytes {
128        h ^= b as u64;
129        h = h.wrapping_mul(PRIME);
130        // a tiny extra mix
131        h ^= h >> 27;
132    }
133    h
134}
135
136/// Stable compact source address derived from a logical sender name.
137///
138/// Network-master address assignment can override the discovery mapping at the
139/// routing layer, but standalone packets need a deterministic address so the
140/// canonical wire header never has to carry the sender string.
141#[inline]
142pub(crate) fn sender_address_u32(sender: &str) -> u32 {
143    let raw = hash_bytes_u64(0xA6D3_8C21_4B7F_19E5, sender.as_bytes()) as u32;
144    if raw == 0 { 1 } else { raw }
145}
146
147/// Validate the payload of a dynamic-length message.
148///
149/// - For `String`: trims trailing NULs for validation and ensures UTF-8 (if non-empty).
150/// - For `Hex`: no additional validation.
151/// - For numerics/bool: ensures the length is a multiple of the element width.
152#[inline]
153fn validate_dynamic_len_and_content_for_element(
154    element: MessageElement,
155    bytes: &[u8],
156) -> TelemetryResult<()> {
157    match element.data_type() {
158        MessageDataType::String => {
159            let end = bytes
160                .iter()
161                .rposition(|&b| b != 0)
162                .map(|i| i + 1)
163                .unwrap_or(0);
164            if end > 0 {
165                core::str::from_utf8(&bytes[..end]).map_err(|_| TelemetryError::InvalidUtf8)?;
166            }
167            Ok(())
168        }
169        MessageDataType::Binary => Ok(()),
170        dt => {
171            let w = element_width(dt);
172            if w == 0 || !bytes.len().is_multiple_of(w) {
173                return Err(TelemetryError::SizeMismatch {
174                    expected: w,
175                    got: bytes.len(),
176                });
177            }
178            Ok(())
179        }
180    }
181}
182
183// ============================================================================
184// Decode bytes trait
185// ============================================================================
186
187trait LeDecode: Sized {
188    const WIDTH: usize;
189    fn from_le(slice: &[u8]) -> Self;
190}
191
192impl_ledecode_auto!(f32);
193impl_ledecode_auto!(f64);
194
195impl_ledecode_auto!(u16);
196impl_ledecode_auto!(u32);
197impl_ledecode_auto!(u64);
198impl_ledecode_auto!(u128);
199
200impl_ledecode_auto!(i16);
201impl_ledecode_auto!(i32);
202impl_ledecode_auto!(i64);
203impl_ledecode_auto!(i128);
204
205// special 1-byte cases
206impl_ledecode_auto!(u8);
207impl_ledecode_auto!(i8);
208
209// ============================================================================
210// Packet impl
211// ============================================================================
212
213impl Packet {
214    /// Validate `payload` against the provided schema element.
215    ///
216    /// This helper is shared by normal packet construction and by
217    /// `new_with_wire_contract(...)`. The latter matters because a packet that
218    /// was already packed may need to keep using the element shape it was
219    /// written with even if the local runtime registry has since changed.
220    ///
221    /// # Parameters
222    /// - `element`: Effective schema element to validate against. This may come
223    ///   from the current runtime registry or from the inline wire contract.
224    /// - `payload`: Raw payload bytes to validate.
225    ///
226    /// # Returns
227    /// - `Ok(())` when the payload is compatible with `element`.
228    /// - `Err(TelemetryError)` when the payload length or encoding does not
229    ///   match the required shape.
230    #[inline]
231    fn validate_payload_against_element(
232        element: MessageElement,
233        payload: &[u8],
234    ) -> TelemetryResult<()> {
235        match element {
236            MessageElement::Static(need, dt, _) => {
237                let need = need * data_type_size(dt);
238                if payload.len() != need {
239                    return Err(TelemetryError::SizeMismatch {
240                        expected: need,
241                        got: payload.len(),
242                    });
243                }
244                Ok(())
245            }
246            MessageElement::Dynamic(_, _) => {
247                validate_dynamic_len_and_content_for_element(element, payload)
248            }
249        }
250    }
251
252    /// Create a packet while preserving wire-contract metadata from a decoded frame.
253    ///
254    /// This constructor is used by unpacking paths that may carry a
255    /// migration-safe wire contract. The contract can supply:
256    ///
257    /// - `wire_shape`: an inline `MessageElement` that keeps payload decoding
258    ///   stable even if the local runtime schema changed after the packet was
259    ///   packed.
260    /// - `wire_target_senders`: a frozen list of destination-holder sender
261    ///   hashes that routers and relays use to keep in-flight forwarding bound
262    ///   to the originally intended remote holders.
263    ///
264    /// # Parameters
265    /// - `ty`: Logical message type ID carried by the frame.
266    /// - `endpoints`: Logical destination endpoints from the endpoint bitmap.
267    /// - `sender`: Logical sender string decoded from the frame.
268    /// - `timestamp`: Wire timestamp in milliseconds.
269    /// - `payload`: Payload bytes after any decompression.
270    /// - `wire_shape`: Optional inline payload shape carried by the wire
271    ///   contract. When present, validation uses this value instead of the
272    ///   current runtime schema entry.
273    /// - `wire_target_senders`: Frozen destination-holder hashes carried by the
274    ///   wire contract. These are routing metadata, not application payload.
275    ///
276    /// # Returns
277    /// - `Ok(Packet)` when all packet invariants and payload validation pass.
278    /// - `Err(TelemetryError)` when endpoints are empty or the payload is
279    ///   incompatible with the effective schema element.
280    #[inline]
281    #[allow(clippy::too_many_arguments)]
282    pub(crate) fn new_with_wire_contract(
283        ty: DataType,
284        endpoints: &[DataEndpoint],
285        sender: &str,
286        timestamp: u64,
287        nonce: u16,
288        payload: Arc<[u8]>,
289        wire_shape: Option<MessageElement>,
290        wire_target_senders: Arc<[u64]>,
291    ) -> TelemetryResult<Self> {
292        if endpoints.is_empty() {
293            return Err(TelemetryError::EmptyEndpoints);
294        }
295
296        let element = wire_shape.unwrap_or(message_meta(ty).element);
297        Self::validate_payload_against_element(element, &payload)?;
298
299        Ok(Self {
300            ty,
301            data_size: payload.len(),
302            sender: sender.into(),
303            endpoints: Arc::<[DataEndpoint]>::from(endpoints),
304            timestamp,
305            nonce,
306            payload: StandardSmallPayload::new(&payload),
307            wire_shape,
308            wire_target_senders,
309        })
310    }
311
312    /// Create a packet from a raw payload, validating against `message_meta(ty)`.
313    ///
314    /// Checks:
315    /// - `endpoints` is non-empty.
316    /// - For static element count:
317    ///   - `payload.len() == element_count * data_type_size(get_data_type(ty))`.
318    /// - For dynamic:
319    ///   - Length and encoding are validated by [`validate_dynamic_len_and_content`].
320    /// # Arguments
321    /// - `ty`: logical message type (schema selector).
322    /// - `endpoints`: destination endpoint list (must be non-empty).
323    /// - `sender`: logical sender identifier (e.g. device or subsystem name).
324    /// - `timestamp`: timestamp in milliseconds.
325    /// - `payload`: raw payload bytes.
326    /// # Returns
327    /// - `Ok(Packet)` if validation passes.
328    /// - `Err(TelemetryError)` if validation fails.
329    /// # Errors
330    /// - [`TelemetryError::EmptyEndpoints`] if `endpoints` is empty.
331    /// - [`TelemetryError::SizeMismatch`] if the payload size does not match
332    ///   the expected size for static element counts, or is not a multiple
333    ///   of the element width for dynamic types.
334    /// - [`TelemetryError::InvalidUtf8`] if the payload is a string
335    ///   type and is not valid UTF-8 .
336    pub fn new(
337        ty: DataType,
338        endpoints: &[DataEndpoint],
339        sender: &str,
340        timestamp: u64,
341        payload: Arc<[u8]>,
342    ) -> TelemetryResult<Self> {
343        Self::new_with_nonce(
344            ty,
345            endpoints,
346            sender,
347            timestamp,
348            next_packet_nonce(),
349            payload,
350        )
351    }
352
353    /// Create a packet with an explicit nonce value.
354    ///
355    /// This is mainly useful when callers need stable byte-for-byte wire output
356    /// across repeated constructions, such as deterministic tests or
357    /// precomputed fixtures.
358    pub fn new_with_nonce(
359        ty: DataType,
360        endpoints: &[DataEndpoint],
361        sender: &str,
362        timestamp: u64,
363        nonce: u16,
364        payload: Arc<[u8]>,
365    ) -> TelemetryResult<Self> {
366        Self::new_with_wire_contract(
367            ty,
368            endpoints,
369            sender,
370            timestamp,
371            nonce,
372            payload,
373            None,
374            Arc::<[u64]>::from([]),
375        )
376    }
377
378    /// Resolve the schema element this packet should use for validation,
379    /// formatting, and typed payload access.
380    ///
381    /// Normal locally-built packets use the current runtime schema entry for
382    /// `self.ty`. Packets decoded from migration-safe frames may instead carry
383    /// `self.wire_shape`, which takes precedence so an in-flight payload can
384    /// still be interpreted after runtime schema churn.
385    #[inline]
386    fn effective_element(&self) -> MessageElement {
387        self.wire_shape.unwrap_or(message_meta(self.ty).element)
388    }
389
390    /// Return the effective primitive payload kind for this packet.
391    ///
392    /// This is derived from `effective_element()` so typed decode helpers keep
393    /// following any inline wire shape when one is present.
394    #[inline]
395    fn effective_data_type(&self) -> MessageDataType {
396        self.effective_element().data_type()
397    }
398
399    /// Return the effective message class for this packet.
400    ///
401    /// Like `effective_data_type()`, this honors inline wire-shape metadata so
402    /// formatting remains consistent for in-flight packets across schema
403    /// changes.
404    #[inline]
405    fn effective_message_class(&self) -> MessageClass {
406        self.effective_element().message_type()
407    }
408
409    /// Compute a stable 64-bit identifier for this packet.
410    ///
411    /// This is *not* packed – it is derived locally from:
412    /// - sender
413    /// - logical type (DataType)
414    /// - endpoints
415    /// - timestamp
416    /// - payload bytes
417    ///
418    /// Identical packets on different boards/links will compute the same ID,
419    /// so the Router/Relay can use it to drop duplicates.
420    #[inline]
421    pub fn packet_id(&self) -> u64 {
422        // Seed with an arbitrary non-zero constant.
423        let mut h: u64 = 0x9E37_79B9_7F4A_7C15;
424
425        // Compact source address. Sender names are discovery/config metadata,
426        // not packet-header identity.
427        h = hash_bytes_u64(h, &sender_address_u32(self.sender.as_ref()).to_le_bytes());
428
429        // Logical type as string
430        h = hash_bytes_u64(h, get_message_name(self.ty).as_bytes());
431
432        // Endpoints (as their string names)
433        for ep in self.endpoints.iter() {
434            h = hash_bytes_u64(h, ep.as_str().as_bytes());
435        }
436
437        // Timestamp + data_size as bytes
438        h = hash_bytes_u64(h, &self.timestamp.to_le_bytes());
439        h = hash_bytes_u64(h, &self.nonce.to_le_bytes());
440        h = hash_bytes_u64(h, &self.data_size.to_le_bytes());
441
442        // Payload bytes
443        h = hash_bytes_u64(h, self.payload());
444
445        h
446    }
447
448    /// Generic helper: decode the payload as a Vec<T> in little-endian,
449    /// after checking the runtime data kind and size.
450    #[inline]
451    fn _as_le_bytes<T>(&self, expected_kind: MessageDataType) -> TelemetryResult<Vec<T>>
452    where
453        T: LeDecode,
454    {
455        self.ensure_kind(expected_kind)?;
456
457        let bytes: &[u8] = self.payload();
458        let width = T::WIDTH;
459
460        if !bytes.len().is_multiple_of(width) {
461            // Packet should already be validated; if not, surface a size error.
462            return Err(TelemetryError::SizeMismatch {
463                expected: (bytes.len() / width) * width,
464                got: bytes.len(),
465            });
466        }
467
468        let count = bytes.len() / width;
469        let mut out = Vec::with_capacity(count);
470
471        for chunk in bytes.chunks_exact(width) {
472            out.push(T::from_le(chunk));
473        }
474
475        Ok(out)
476    }
477
478    /// Validate basic invariants:
479    ///
480    /// - `endpoints` is non-empty.
481    /// - `payload.len() == data_size`.
482    /// - For static element count:
483    ///   - `data_size == element_count * data_type_size(get_data_type(ty))`.
484    /// - For dynamic:
485    ///   - Length and encoding are validated by [`validate_dynamic_len_and_content`].
486    /// # Returns
487    /// - `Ok(())` if validation passes.
488    /// - `Err(TelemetryError)` if validation fails.
489    pub fn validate(&self) -> TelemetryResult<()> {
490        if self.endpoints.is_empty() {
491            return Err(TelemetryError::EmptyEndpoints);
492        }
493        if self.payload.len() != self.data_size {
494            return Err(TelemetryError::SizeMismatch {
495                expected: self.data_size,
496                got: self.payload.len(),
497            });
498        }
499
500        Self::validate_payload_against_element(self.effective_element(), &self.payload)
501    }
502
503    /* ---- Getters ---- */
504    /// Get the message data type.
505    /// This is the logical schema selector.
506    #[inline]
507    pub fn data_type(&self) -> DataType {
508        self.ty
509    }
510
511    /// Get the sender identifier.
512    /// This is typically a device or subsystem name.
513    #[inline]
514    pub fn sender(&self) -> &str {
515        &self.sender
516    }
517
518    /// Get the destination endpoints for this message.
519    #[inline]
520    pub fn endpoints(&self) -> &[DataEndpoint] {
521        &self.endpoints
522    }
523
524    /// Get the timestamp in milliseconds.
525    #[inline]
526    pub fn timestamp(&self) -> u64 {
527        self.timestamp
528    }
529
530    /// Get the packet nonce.
531    #[inline]
532    pub fn nonce(&self) -> u16 {
533        self.nonce
534    }
535
536    /// Get the payload size in bytes.
537    #[inline]
538    pub fn data_size(&self) -> usize {
539        self.data_size
540    }
541
542    /// Get the raw payload bytes.
543    #[inline]
544    pub fn payload(&self) -> &[u8] {
545        &self.payload
546    }
547
548    /// Return the optional inline wire shape preserved from unpacking.
549    ///
550    /// This is crate-visible because packing and routing need to keep the
551    /// contract intact when a packet is forwarded again.
552    #[inline]
553    pub(crate) fn wire_shape(&self) -> Option<MessageElement> {
554        self.wire_shape
555    }
556
557    /// Return the frozen destination-holder hashes preserved from the wire contract.
558    ///
559    /// Routers and relays use this list to avoid delivering an in-flight packet
560    /// to newly-learned holders that were not part of the original delivery
561    /// contract when the packet was packed.
562    #[inline]
563    pub(crate) fn wire_target_senders(&self) -> &[u64] {
564        &self.wire_target_senders
565    }
566
567    /// Override the packet nonce while keeping the rest of the packet intact.
568    #[inline]
569    pub fn with_nonce(mut self, nonce: u16) -> Self {
570        self.nonce = nonce;
571        self
572    }
573
574    /// Header-only string (no decoded data).
575    ///
576    /// Example:
577    /// `Type: FOO, Data Size: 8, Sender: dev0, Endpoints: [EP_A, EP_B], Timestamp: 1234 (1s 234ms)`
578    /// # Returns
579    /// - Human-readable string with header fields.
580    pub fn header_string(&self) -> String {
581        let mut out = String::with_capacity(DEFAULT_STRING_CAPACITY);
582
583        let _ = write!(
584            &mut out,
585            "Type: {}, Data Size: {}, Sender: {}, Endpoints: [",
586            get_message_name(self.ty),
587            self.data_size,
588            self.sender.as_ref(),
589        );
590        for (i, ep) in self.endpoints.iter().enumerate() {
591            if i != 0 {
592                out.push_str(", ");
593            }
594            out.push_str(ep.as_str());
595        }
596        out.push_str("], Timestamp: ");
597        let _ = write!(&mut out, "{}", self.timestamp);
598
599        out.push_str(" (");
600        append_human_time(&mut out, self.timestamp);
601        out.push(')');
602        out
603    }
604
605    /// Borrow the payload as UTF-8 without trailing NULs (no allocation).
606    ///
607    /// Returns `None` if the message `DataType` is not a `String` type or if
608    /// the payload is not valid UTF-8 (after trimming trailing NUL).
609    /// # Returns
610    /// - `Some(&str)` if the payload is a valid UTF-8 string.
611    /// - `None` otherwise.
612    #[inline]
613    pub fn data_as_utf8_ref(&self) -> Option<&str> {
614        if self.effective_data_type() != MessageDataType::String {
615            return None;
616        }
617        let bytes = &self.payload;
618        let end = bytes.iter().rposition(|&b| b != 0).map(|i| i + 1)?;
619        core::str::from_utf8(&bytes[..end]).ok()
620    }
621
622    /// Helper: append decoded numeric/float elements to `s`.
623    ///
624    /// - Uses `LeBytes::from_le_slice` with fixed-width chunks.
625    /// - Floats (`f32`/`f64`) are formatted with a fixed precision
626    ///   the current runtime string precision.
627    #[inline]
628    fn data_to_string<T>(&self, s: &mut String)
629    where
630        T: LeBytes + core::fmt::Display + 'static,
631    {
632        let it = self.payload.chunks_exact(T::WIDTH);
633        let mut first = true;
634
635        for chunk in it {
636            if !first {
637                s.push_str(", ");
638            }
639            first = false;
640
641            let v = T::from_le_slice(chunk);
642
643            // If this is a float type, use precision; otherwise, default formatting.
644            if TypeId::of::<T>() == TypeId::of::<f32>() || TypeId::of::<T>() == TypeId::of::<f64>()
645            {
646                // `{:.*}` = "use this precision argument"
647                let _ = write!(s, "{:.*}", runtime_string_precision(), v);
648            } else {
649                let _ = write!(s, "{v}");
650            }
651        }
652    }
653
654    /// Full pretty string including decoded data portion.
655    ///
656    /// - String payloads are rendered as `"..."`
657    /// - Numeric/bool payloads are rendered as comma-separated values
658    /// - Hex payloads are delegated to [`Packet::to_hex_string`]
659    /// # Returns
660    /// - Human-readable string with header and decoded data.
661    pub fn as_string(&self) -> String {
662        let mut s = String::from("{");
663        s.push_str(&self.header_string());
664
665        if self.payload.is_empty() {
666            s.push_str(", Data: (<NoData>)}");
667            return s;
668        }
669
670        match self.effective_message_class() {
671            MessageClass::Data => {
672                s.push_str(", Data: (");
673            }
674            MessageClass::Error => {
675                s.push_str(", Error: (");
676            }
677            MessageClass::Warning => {
678                s.push_str(", Warning: (");
679            }
680        }
681
682        if let Some(msg) = self.data_as_utf8_ref() {
683            s.push('"');
684            s.push_str(msg);
685            s.push_str("\")}");
686            return s;
687        }
688
689        match self.effective_data_type() {
690            MessageDataType::Float64 => {
691                self.data_to_string::<f64>(&mut s);
692            }
693            MessageDataType::Float32 => {
694                self.data_to_string::<f32>(&mut s);
695            }
696            MessageDataType::UInt128 => {
697                self.data_to_string::<u128>(&mut s);
698            }
699            MessageDataType::UInt64 => {
700                self.data_to_string::<u64>(&mut s);
701            }
702            MessageDataType::UInt32 => {
703                self.data_to_string::<u32>(&mut s);
704            }
705            MessageDataType::UInt16 => {
706                self.data_to_string::<u16>(&mut s);
707            }
708            MessageDataType::UInt8 => {
709                self.data_to_string::<u8>(&mut s);
710            }
711            MessageDataType::Int128 => {
712                self.data_to_string::<i128>(&mut s);
713            }
714            MessageDataType::Int64 => {
715                self.data_to_string::<i64>(&mut s);
716            }
717            MessageDataType::Int32 => {
718                self.data_to_string::<i32>(&mut s);
719            }
720            MessageDataType::Int16 => {
721                self.data_to_string::<i16>(&mut s);
722            }
723            MessageDataType::Int8 => {
724                self.data_to_string::<i8>(&mut s);
725            }
726            MessageDataType::Bool => {
727                // Interpret any nonzero as true.
728                let mut it = self.payload.iter().peekable();
729                while let Some(b) = it.next() {
730                    let _ = write!(s, "{}", *b != 0);
731                    if it.peek().is_some() {
732                        s.push_str(", ");
733                    }
734                }
735            }
736            MessageDataType::String => {
737                // Already handled above via `data_as_utf8_ref`.
738            }
739            MessageDataType::Binary => return self.to_hex_string(),
740            MessageDataType::NoData => {
741                s.push_str("<no data>");
742            }
743        }
744
745        s.push_str(")}");
746        s
747    }
748
749    /// Hex dump variant of [`Packet::as_string`].
750    ///
751    /// Produces:
752    ///
753    /// `Type: ..., Data Size: ..., ..., Timestamp: ... (...), Data (hex): 0xNN 0xNN ...`
754    /// # Returns
755    /// - Human-readable string with header and hex-formatted data.
756    pub fn to_hex_string(&self) -> String {
757        // Header first.
758        let mut s = self.header_string();
759        s.push_str(", Data (hex):");
760
761        if !self.payload.is_empty() {
762            // Reserve roughly 5 chars per byte: " 0xNN".
763            s.reserve(self.payload.len().saturating_mul(5));
764            for &b in self.payload.iter() {
765                let _ = write!(&mut s, " 0x{:02x}", b);
766            }
767        }
768        s
769    }
770
771    // =========================================================================
772    // Typed data accessors
773    // =========================================================================
774
775    /// Ensure this packet's element type matches `expected`.
776    #[inline]
777    fn ensure_kind(&self, expected: MessageDataType) -> TelemetryResult<()> {
778        let dt = self.effective_data_type();
779        if dt != expected {
780            return Err(TelemetryError::TypeMismatch {
781                expected: data_type_size(expected),
782                got: data_type_size(dt),
783            });
784        }
785        Ok(())
786    }
787
788    impl_data_as_prim! {
789        data_as_f32,   f32,   MessageDataType::Float32;
790        data_as_f64,   f64,   MessageDataType::Float64;
791
792        data_as_u8,    u8,    MessageDataType::UInt8;
793        data_as_u16,   u16,   MessageDataType::UInt16;
794        data_as_u32,   u32,   MessageDataType::UInt32;
795        data_as_u64,   u64,   MessageDataType::UInt64;
796        data_as_u128,  u128,  MessageDataType::UInt128;
797
798        data_as_i8,    i8,    MessageDataType::Int8;
799        data_as_i16,   i16,   MessageDataType::Int16;
800        data_as_i32,   i32,   MessageDataType::Int32;
801        data_as_i64,   i64,   MessageDataType::Int64;
802        data_as_i128,  i128,  MessageDataType::Int128;
803    }
804
805    /// Decode payload as `bool`s. Any non-zero byte is treated as `true`.
806    #[inline]
807    pub fn data_as_bool(&self) -> TelemetryResult<Vec<bool>> {
808        self.ensure_kind(MessageDataType::Bool)?;
809        Ok(self.payload.iter().map(|&b| b != 0).collect())
810    }
811
812    /// Decode payload as a string (for String type).
813    #[inline]
814    pub fn data_as_string(&self) -> TelemetryResult<String> {
815        self.ensure_kind(MessageDataType::String)?;
816
817        let bytes = &self.payload;
818        let end = bytes
819            .iter()
820            .rposition(|&b| b != 0)
821            .map(|i| i + 1)
822            .unwrap_or(0);
823
824        if end == 0 {
825            return Ok(String::new());
826        }
827
828        let s = core::str::from_utf8(&bytes[..end])
829            .map_err(|_| TelemetryError::InvalidUtf8)?
830            .to_string();
831        Ok(s)
832    }
833
834    /// Decode payload as raw bytes (for Binary type).
835    #[inline]
836    pub fn data_as_binary(&self) -> TelemetryResult<Vec<u8>> {
837        self.ensure_kind(MessageDataType::Binary)?;
838        Ok(self.payload.to_vec())
839    }
840
841    /// Internal helper: build a packet from a slice of primitive values
842    /// encoded as little-endian, using the given sender.
843    ///
844    /// Works for all numeric types (integer and float) as long as the schema's
845    /// element width matches `T`'s width. Not used for String/Binary/Bool.
846    fn from_prim_le_slice_with_sender<T>(
847        ty: DataType,
848        values: &[T],
849        endpoints: &[DataEndpoint],
850        sender: &str,
851        timestamp: u64,
852    ) -> TelemetryResult<Self>
853    where
854        T: Copy + 'static,
855    {
856        let dt = get_data_type(ty);
857
858        // Only allow numeric-ish types here; String/Binary/Bool are handled elsewhere.
859
860        if dt == MessageDataType::Bool
861            || dt == MessageDataType::String
862            || dt == MessageDataType::Binary
863        {
864            // For these, use dedicated constructors (bool / string / binary).
865            return Err(TelemetryError::BadArg);
866        }
867        let element_size = data_type_size(dt);
868
869        // Ensure T's width matches what the schema expects.
870        if element_size != size_of::<T>() {
871            return Err(TelemetryError::TypeMismatch {
872                expected: element_size,
873                got: size_of::<T>(),
874            });
875        }
876
877        let total_bytes = values.len() * element_size;
878
879        // If the schema has a static element count, enforce it up front.
880        if let MessageElement::Static(exact, _, _) = message_meta(ty).element {
881            let exact_bytes = exact * element_size;
882            if total_bytes != exact_bytes {
883                return Err(TelemetryError::SizeMismatch {
884                    expected: exact_bytes,
885                    got: total_bytes,
886                });
887            }
888        }
889
890        let mut bytes = vec![0u8; total_bytes];
891        unsafe { bytes.set_len(total_bytes) };
892
893        for (i, v) in values.iter().copied().enumerate() {
894            let offset = i * element_size;
895            let dst = &mut bytes[offset..offset + element_size];
896
897            // Copy the raw memory of `v` into the buffer.
898            unsafe {
899                core::ptr::copy_nonoverlapping(
900                    &v as *const T as *const u8,
901                    dst.as_mut_ptr(),
902                    element_size,
903                );
904            }
905
906            // Normalize to little-endian on big-endian targets.
907            // On little-endian this block is compiled out.
908            #[cfg(target_endian = "big")]
909            {
910                dst.reverse();
911            }
912        }
913
914        Self::new(ty, endpoints, sender, timestamp, Arc::<[u8]>::from(bytes))
915    }
916
917    /// Same as `from_prim_le_slice_with_sender` but uses the runtime device identifier.
918    /// as the sender (mirrors `from_u8_slice`, `from_f32_slice`).
919    #[inline]
920    pub fn from_prim_le_slice<T>(
921        ty: DataType,
922        values: &[T],
923        endpoints: &[DataEndpoint],
924        timestamp: u64,
925    ) -> TelemetryResult<Self>
926    where
927        T: Copy + 'static,
928    {
929        let sender = runtime_device_identifier();
930        Self::from_prim_le_slice_with_sender(ty, values, endpoints, &sender, timestamp)
931    }
932
933    // -------------------------------------------------------------------------
934    // Convenience wrappers for all numeric types
935    // -------------------------------------------------------------------------
936    impl_from_prim_slices! {
937        from_u8_slice,   u8;
938        from_u16_slice,  u16;
939        from_i8_slice,   i8;
940        from_i16_slice,  i16;
941        from_u32_slice,  u32;
942        from_i32_slice,  i32;
943        from_u64_slice,  u64;
944        from_i64_slice,  i64;
945        from_u128_slice, u128;
946        from_i128_slice, i128;
947        from_f32_slice,  f32;
948        from_f64_slice,  f64;
949    }
950
951    /// Builds a packet with an empty payload for types whose schema allows zero bytes.
952    #[inline]
953    pub fn from_no_data(
954        ty: DataType,
955        endpoints: &[DataEndpoint],
956        timestamp: u64,
957    ) -> TelemetryResult<Self> {
958        let meta = message_meta(ty);
959        match meta.element {
960            MessageElement::Static(need, _, _) => {
961                if need != 0 {
962                    return Err(TelemetryError::SizeMismatch {
963                        expected: need,
964                        got: 0,
965                    });
966                }
967            }
968            MessageElement::Dynamic(_, _) => {
969                // Dynamic with zero-length payload is OK.
970            }
971        }
972
973        let sender = runtime_device_identifier();
974        Self::new(ty, endpoints, &sender, timestamp, Arc::<[u8]>::from([]))
975    }
976
977    /// Bool constructor: encodes each bool as a single byte (0 / 1).
978    #[inline]
979    pub fn from_bool_slice(
980        ty: DataType,
981        values: &[bool],
982        endpoints: &[DataEndpoint],
983        timestamp: u64,
984    ) -> TelemetryResult<Self> {
985        if get_data_type(ty) != MessageDataType::Bool {
986            return Err(TelemetryError::TypeMismatch {
987                expected: data_type_size(get_data_type(ty)),
988                got: size_of::<bool>(),
989            });
990        }
991
992        let total_bytes = values.len();
993        if let MessageElement::Static(exact, _, _) = message_meta(ty).element
994            && total_bytes != exact
995        {
996            return Err(TelemetryError::SizeMismatch {
997                expected: exact,
998                got: total_bytes,
999            });
1000        }
1001
1002        let mut bytes = Vec::with_capacity(total_bytes);
1003        bytes.extend(values.iter().map(|b| if *b { 1u8 } else { 0u8 }));
1004
1005        let sender = runtime_device_identifier();
1006        Self::new(ty, endpoints, &sender, timestamp, Arc::<[u8]>::from(bytes))
1007    }
1008
1009    /// String constructor (dynamic length). Trailing NULs are not added;
1010    /// `new()` + `validate_dynamic_len_and_content` will do UTF-8 validation.
1011    #[inline]
1012    pub fn from_str_slice(
1013        ty: DataType,
1014        s: &str,
1015        endpoints: &[DataEndpoint],
1016        timestamp: u64,
1017    ) -> TelemetryResult<Self> {
1018        if get_data_type(ty) != MessageDataType::String {
1019            return Err(TelemetryError::TypeMismatch {
1020                expected: data_type_size(get_data_type(ty)),
1021                got: 1,
1022            });
1023        }
1024
1025        let bytes: Arc<[u8]> = Arc::from(s.as_bytes());
1026        let sender = runtime_device_identifier();
1027        Self::new(ty, endpoints, &sender, timestamp, bytes)
1028    }
1029}
1030
1031// ============================================================================
1032// Time formatting (no_std-friendly, UTC or uptime)
1033// ============================================================================
1034
1035#[inline]
1036fn div_mod_u64(n: u64, d: u64) -> (u64, u64) {
1037    (n / d, n % d)
1038}
1039
1040// Howard Hinnant–style civil-from-days (proleptic Gregorian).
1041fn civil_from_days(mut z: i64) -> (i32, u32, u32) {
1042    // epoch (1970-01-01) has days=0
1043    z += 719_468; // shift to civil base
1044    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
1045    let doe = z - era * 146_097; // [0, 146096]
1046    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
1047    let y = (yoe as i32) + era as i32 * 400;
1048    let doy = (doe - (365 * yoe + yoe / 4 - yoe / 100)) as i32; // [0, 365]
1049    let mp = (5 * doy + 2) / 153; // [0, 11]
1050    let d = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
1051    let m = mp + if mp < 10 { 3 } else { -9 }; // [1, 12]
1052    let y = y + (m <= 2) as i32; // year
1053    (y, m as u32, d as u32)
1054}
1055
1056/// Append a human-readable timestamp to `out`, either uptime (`hh:mm:ss.mmm`)
1057/// or UTC epoch like `YYYY-MM-DD HH:MM:SS.mmmZ`, depending on threshold.
1058fn append_human_time(out: &mut String, total_ms: u64) {
1059    if total_ms >= EPOCH_MS_THRESHOLD {
1060        // Unix epoch path.
1061        let (secs, sub_ms) = div_mod_u64(total_ms, 1_000);
1062        let days = (secs / 86_400) as i64;
1063        let sod = (secs % 86_400) as u32; // seconds of day
1064        let (year, month, day) = civil_from_days(days);
1065        let hour = sod / 3_600;
1066        let min = (sod % 3_600) / 60;
1067        let sec = sod % 60;
1068        let _ = Write::write_fmt(
1069            out,
1070            format_args!(
1071                "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:03}Z",
1072                year, month, day, hour, min, sec, sub_ms as u32
1073            ),
1074        );
1075    } else {
1076        // Uptime-style duration.
1077        let hours = total_ms / 3_600_000;
1078        let minutes = (total_ms % 3_600_000) / 60_000;
1079        let seconds = (total_ms % 60_000) / 1_000;
1080        let milliseconds = total_ms % 1_000;
1081        if hours > 0 {
1082            let _ = Write::write_fmt(
1083                out,
1084                format_args!("{hours}h {minutes:02}m {seconds:02}s {milliseconds:03}ms"),
1085            );
1086        } else if minutes > 0 {
1087            let _ = Write::write_fmt(
1088                out,
1089                format_args!("{minutes}m {seconds:02}s {milliseconds:03}ms"),
1090            );
1091        } else {
1092            let _ = Write::write_fmt(out, format_args!("{seconds}s {milliseconds:03}ms"));
1093        }
1094    }
1095}
1096
1097// ============================================================================
1098// Display impl
1099// ============================================================================
1100
1101impl core::fmt::Display for Packet {
1102    #[inline]
1103    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
1104        f.write_str(&Packet::as_string(self))
1105    }
1106}
1107
1108impl ByteCost for Packet {
1109    #[inline]
1110    fn byte_cost(&self) -> usize {
1111        size_of::<Self>()
1112            + self.sender.len()
1113            + self.endpoints.len() * size_of::<DataEndpoint>()
1114            + self.payload.byte_cost()
1115    }
1116}