sedsnet/packet.rs
1//! Telemetry packet core type and formatting helpers.
2//!
3//! [`Packet`] is the main payload-bearing type. It:
4//! - holds sender, endpoints, timestamp and raw payload bytes,
5//! - validates payload sizes and encodings against the schema from `message_meta`,
6//! - supports pretty printing (header + decoded values) for debugging/logging,
7//! - uses [`SmallPayload`] internally to keep small messages on the stack.
8
9use crate::config::{DEVICE_IDENTIFIER, STRING_PRECISION, StandardSmallPayload};
10use crate::queue::ByteCost;
11use crate::{
12 MessageClass, MessageDataType, MessageElement, TelemetryError, TelemetryResult,
13 config::{DataEndpoint, DataType},
14 data_type_size, get_data_type, get_message_name, message_meta,
15 router::LeBytes,
16};
17use crate::{impl_data_as_prim, impl_from_prim_slices, impl_ledecode_auto};
18use alloc::{string::String, string::ToString, sync::Arc, vec, vec::Vec};
19use core::any::TypeId;
20use core::fmt::{Formatter, Write};
21use core::sync::atomic::{AtomicU32, Ordering};
22// ============================================================================
23// Constants
24// ============================================================================
25
26/// Threshold (in ms since boot/epoch) above which timestamps are treated as
27/// Unix epoch time rather than an uptime counter.
28///
29/// Anything smaller is formatted as an uptime-style duration; larger values
30/// are formatted as a UTC date-time.
31const EPOCH_MS_THRESHOLD: u64 = 1_000_000_000_000; // clearly not an uptime counter
32
33/// Default starting capacity for human-readable strings.
34const DEFAULT_STRING_CAPACITY: usize = 96;
35
36/// Process-local packet nonce generator used to keep same-timestamp packets
37/// distinct for recent-ID dedupe even when their logical payload matches.
38static PACKET_NONCE_COUNTER: AtomicU32 = AtomicU32::new(0xA5C3_1F27);
39
40#[inline]
41fn next_packet_nonce() -> u16 {
42 let mut x = PACKET_NONCE_COUNTER.fetch_add(0x9E37_79B9, Ordering::Relaxed);
43 x ^= x << 13;
44 x ^= x >> 17;
45 x ^= x << 5;
46 (x as u16) | 1
47}
48
49// ============================================================================
50// Packet
51// ============================================================================
52
53/// Payload-bearing packet (safe, validated, shareable).
54///
55/// This is the primary data structure passed around inside the crate and
56/// across FFI boundaries (via views / wrappers).
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub struct Packet {
59 /// Logical message type (schema selector).
60 ty: DataType,
61
62 /// Size of the payload in bytes.
63 ///
64 /// This is cached and must match `payload.len()`. [`Packet::validate`]
65 /// checks the invariant.
66 data_size: usize,
67
68 /// Logical sender identifier (e.g. device or subsystem name).
69 sender: Arc<str>,
70
71 /// Destination endpoints for this message.
72 endpoints: Arc<[DataEndpoint]>,
73
74 /// Timestamp in milliseconds.
75 ///
76 /// - If `< EPOCH_MS_THRESHOLD`, treated as an uptime counter and formatted
77 /// like `12m 34s 567ms`.
78 /// - If `>= EPOCH_MS_THRESHOLD`, treated as Unix epoch ms and formatted
79 /// as `YYYY-MM-DD HH:MM:SS.mmmZ`.
80 timestamp: u64,
81
82 /// Small per-packet nonce carried on the wire and folded into packet ID
83 /// calculation so otherwise-identical packets emitted in the same
84 /// millisecond do not collapse in recent-ID dedupe.
85 nonce: u16,
86
87 /// Raw payload bytes, stored via [`SmallPayload`] for small/large optimization.
88 payload: StandardSmallPayload,
89
90 /// Inline wire-format shape used when this packet came from a migration-safe frame.
91 wire_shape: Option<MessageElement>,
92
93 /// Explicit destination sender hashes frozen into the wire-format contract.
94 wire_target_senders: Arc<[u64]>,
95}
96
97// ============================================================================
98// Internal helpers for validation / formatting
99// ============================================================================
100
101/// Effective element width (in bytes) for the given message data type.
102///
103/// For numeric/bool types this is the true width of one element.
104/// For string/hex we return 1 to treat the payload as a byte stream when
105/// checking dynamic length multiples.
106#[inline]
107const fn element_width(dt: MessageDataType) -> usize {
108 match dt {
109 MessageDataType::UInt8 | MessageDataType::Int8 | MessageDataType::Bool => 1,
110 MessageDataType::UInt16 | MessageDataType::Int16 => 2,
111 MessageDataType::UInt32 | MessageDataType::Int32 | MessageDataType::Float32 => 4,
112 MessageDataType::UInt64 | MessageDataType::Int64 | MessageDataType::Float64 => 8,
113 MessageDataType::UInt128 | MessageDataType::Int128 => 16,
114 // For String/Hex we treat width as 1 (byte granularity) when checking dynamic multiples.
115 MessageDataType::String | MessageDataType::Binary => 1,
116 MessageDataType::NoData => 0,
117 }
118}
119
120// Simple 64-bit hash used for packet IDs (no_std friendly).
121/// Not cryptographic; just a decent mix for deduplication.
122/// Takes an initial hash value and a byte slice, returns the updated hash.
123#[inline]
124pub fn hash_bytes_u64(mut h: u64, bytes: &[u8]) -> u64 {
125 // Arbitrary odd constant (not FNV, but good enough for dedupe).
126 const PRIME: u64 = 0x9E37_79B1;
127 for &b in bytes {
128 h ^= b as u64;
129 h = h.wrapping_mul(PRIME);
130 // a tiny extra mix
131 h ^= h >> 27;
132 }
133 h
134}
135
136/// Stable compact source address derived from a logical sender name.
137///
138/// Network-master address assignment can override the discovery mapping at the
139/// routing layer, but standalone packets need a deterministic address so the
140/// canonical wire header never has to carry the sender string.
141#[inline]
142pub(crate) fn sender_address_u32(sender: &str) -> u32 {
143 let raw = hash_bytes_u64(0xA6D3_8C21_4B7F_19E5, sender.as_bytes()) as u32;
144 if raw == 0 { 1 } else { raw }
145}
146
147/// Validate the payload of a dynamic-length message.
148///
149/// - For `String`: trims trailing NULs for validation and ensures UTF-8 (if non-empty).
150/// - For `Hex`: no additional validation.
151/// - For numerics/bool: ensures the length is a multiple of the element width.
152#[inline]
153fn validate_dynamic_len_and_content_for_element(
154 element: MessageElement,
155 bytes: &[u8],
156) -> TelemetryResult<()> {
157 match element.data_type() {
158 MessageDataType::String => {
159 let end = bytes
160 .iter()
161 .rposition(|&b| b != 0)
162 .map(|i| i + 1)
163 .unwrap_or(0);
164 if end > 0 {
165 core::str::from_utf8(&bytes[..end]).map_err(|_| TelemetryError::InvalidUtf8)?;
166 }
167 Ok(())
168 }
169 MessageDataType::Binary => Ok(()),
170 dt => {
171 let w = element_width(dt);
172 if w == 0 || !bytes.len().is_multiple_of(w) {
173 return Err(TelemetryError::SizeMismatch {
174 expected: w,
175 got: bytes.len(),
176 });
177 }
178 Ok(())
179 }
180 }
181}
182
183// ============================================================================
184// Decode bytes trait
185// ============================================================================
186
187trait LeDecode: Sized {
188 const WIDTH: usize;
189 fn from_le(slice: &[u8]) -> Self;
190}
191
192impl_ledecode_auto!(f32);
193impl_ledecode_auto!(f64);
194
195impl_ledecode_auto!(u16);
196impl_ledecode_auto!(u32);
197impl_ledecode_auto!(u64);
198impl_ledecode_auto!(u128);
199
200impl_ledecode_auto!(i16);
201impl_ledecode_auto!(i32);
202impl_ledecode_auto!(i64);
203impl_ledecode_auto!(i128);
204
205// special 1-byte cases
206impl_ledecode_auto!(u8);
207impl_ledecode_auto!(i8);
208
209// ============================================================================
210// Packet impl
211// ============================================================================
212
213impl Packet {
214 /// Validate `payload` against the provided schema element.
215 ///
216 /// This helper is shared by normal packet construction and by
217 /// `new_with_wire_contract(...)`. The latter matters because a packet that
218 /// was already packed may need to keep using the element shape it was
219 /// written with even if the local runtime registry has since changed.
220 ///
221 /// # Parameters
222 /// - `element`: Effective schema element to validate against. This may come
223 /// from the current runtime registry or from the inline wire contract.
224 /// - `payload`: Raw payload bytes to validate.
225 ///
226 /// # Returns
227 /// - `Ok(())` when the payload is compatible with `element`.
228 /// - `Err(TelemetryError)` when the payload length or encoding does not
229 /// match the required shape.
230 #[inline]
231 fn validate_payload_against_element(
232 element: MessageElement,
233 payload: &[u8],
234 ) -> TelemetryResult<()> {
235 match element {
236 MessageElement::Static(need, dt, _) => {
237 let need = need * data_type_size(dt);
238 if payload.len() != need {
239 return Err(TelemetryError::SizeMismatch {
240 expected: need,
241 got: payload.len(),
242 });
243 }
244 Ok(())
245 }
246 MessageElement::Dynamic(_, _) => {
247 validate_dynamic_len_and_content_for_element(element, payload)
248 }
249 }
250 }
251
252 /// Create a packet while preserving wire-contract metadata from a decoded frame.
253 ///
254 /// This constructor is used by unpacking paths that may carry a
255 /// migration-safe wire contract. The contract can supply:
256 ///
257 /// - `wire_shape`: an inline `MessageElement` that keeps payload decoding
258 /// stable even if the local runtime schema changed after the packet was
259 /// packed.
260 /// - `wire_target_senders`: a frozen list of destination-holder sender
261 /// hashes that routers and relays use to keep in-flight forwarding bound
262 /// to the originally intended remote holders.
263 ///
264 /// # Parameters
265 /// - `ty`: Logical message type ID carried by the frame.
266 /// - `endpoints`: Logical destination endpoints from the endpoint bitmap.
267 /// - `sender`: Logical sender string decoded from the frame.
268 /// - `timestamp`: Wire timestamp in milliseconds.
269 /// - `payload`: Payload bytes after any decompression.
270 /// - `wire_shape`: Optional inline payload shape carried by the wire
271 /// contract. When present, validation uses this value instead of the
272 /// current runtime schema entry.
273 /// - `wire_target_senders`: Frozen destination-holder hashes carried by the
274 /// wire contract. These are routing metadata, not application payload.
275 ///
276 /// # Returns
277 /// - `Ok(Packet)` when all packet invariants and payload validation pass.
278 /// - `Err(TelemetryError)` when endpoints are empty or the payload is
279 /// incompatible with the effective schema element.
280 #[inline]
281 #[allow(clippy::too_many_arguments)]
282 pub(crate) fn new_with_wire_contract(
283 ty: DataType,
284 endpoints: &[DataEndpoint],
285 sender: &str,
286 timestamp: u64,
287 nonce: u16,
288 payload: Arc<[u8]>,
289 wire_shape: Option<MessageElement>,
290 wire_target_senders: Arc<[u64]>,
291 ) -> TelemetryResult<Self> {
292 if endpoints.is_empty() {
293 return Err(TelemetryError::EmptyEndpoints);
294 }
295
296 let element = wire_shape.unwrap_or(message_meta(ty).element);
297 Self::validate_payload_against_element(element, &payload)?;
298
299 Ok(Self {
300 ty,
301 data_size: payload.len(),
302 sender: sender.into(),
303 endpoints: Arc::<[DataEndpoint]>::from(endpoints),
304 timestamp,
305 nonce,
306 payload: StandardSmallPayload::new(&payload),
307 wire_shape,
308 wire_target_senders,
309 })
310 }
311
312 /// Create a packet from a raw payload, validating against `message_meta(ty)`.
313 ///
314 /// Checks:
315 /// - `endpoints` is non-empty.
316 /// - For static element count:
317 /// - `payload.len() == element_count * data_type_size(get_data_type(ty))`.
318 /// - For dynamic:
319 /// - Length and encoding are validated by [`validate_dynamic_len_and_content`].
320 /// # Arguments
321 /// - `ty`: logical message type (schema selector).
322 /// - `endpoints`: destination endpoint list (must be non-empty).
323 /// - `sender`: logical sender identifier (e.g. device or subsystem name).
324 /// - `timestamp`: timestamp in milliseconds.
325 /// - `payload`: raw payload bytes.
326 /// # Returns
327 /// - `Ok(Packet)` if validation passes.
328 /// - `Err(TelemetryError)` if validation fails.
329 /// # Errors
330 /// - [`TelemetryError::EmptyEndpoints`] if `endpoints` is empty.
331 /// - [`TelemetryError::SizeMismatch`] if the payload size does not match
332 /// the expected size for static element counts, or is not a multiple
333 /// of the element width for dynamic types.
334 /// - [`TelemetryError::InvalidUtf8`] if the payload is a string
335 /// type and is not valid UTF-8 .
336 pub fn new(
337 ty: DataType,
338 endpoints: &[DataEndpoint],
339 sender: &str,
340 timestamp: u64,
341 payload: Arc<[u8]>,
342 ) -> TelemetryResult<Self> {
343 Self::new_with_nonce(
344 ty,
345 endpoints,
346 sender,
347 timestamp,
348 next_packet_nonce(),
349 payload,
350 )
351 }
352
353 /// Create a packet with an explicit nonce value.
354 ///
355 /// This is mainly useful when callers need stable byte-for-byte wire output
356 /// across repeated constructions, such as deterministic tests or
357 /// precomputed fixtures.
358 pub fn new_with_nonce(
359 ty: DataType,
360 endpoints: &[DataEndpoint],
361 sender: &str,
362 timestamp: u64,
363 nonce: u16,
364 payload: Arc<[u8]>,
365 ) -> TelemetryResult<Self> {
366 Self::new_with_wire_contract(
367 ty,
368 endpoints,
369 sender,
370 timestamp,
371 nonce,
372 payload,
373 None,
374 Arc::<[u64]>::from([]),
375 )
376 }
377
378 /// Resolve the schema element this packet should use for validation,
379 /// formatting, and typed payload access.
380 ///
381 /// Normal locally-built packets use the current runtime schema entry for
382 /// `self.ty`. Packets decoded from migration-safe frames may instead carry
383 /// `self.wire_shape`, which takes precedence so an in-flight payload can
384 /// still be interpreted after runtime schema churn.
385 #[inline]
386 fn effective_element(&self) -> MessageElement {
387 self.wire_shape.unwrap_or(message_meta(self.ty).element)
388 }
389
390 /// Return the effective primitive payload kind for this packet.
391 ///
392 /// This is derived from `effective_element()` so typed decode helpers keep
393 /// following any inline wire shape when one is present.
394 #[inline]
395 fn effective_data_type(&self) -> MessageDataType {
396 self.effective_element().data_type()
397 }
398
399 /// Return the effective message class for this packet.
400 ///
401 /// Like `effective_data_type()`, this honors inline wire-shape metadata so
402 /// formatting remains consistent for in-flight packets across schema
403 /// changes.
404 #[inline]
405 fn effective_message_class(&self) -> MessageClass {
406 self.effective_element().message_type()
407 }
408
409 /// Compute a stable 64-bit identifier for this packet.
410 ///
411 /// This is *not* packed – it is derived locally from:
412 /// - sender
413 /// - logical type (DataType)
414 /// - endpoints
415 /// - timestamp
416 /// - payload bytes
417 ///
418 /// Identical packets on different boards/links will compute the same ID,
419 /// so the Router/Relay can use it to drop duplicates.
420 #[inline]
421 pub fn packet_id(&self) -> u64 {
422 // Seed with an arbitrary non-zero constant.
423 let mut h: u64 = 0x9E37_79B9_7F4A_7C15;
424
425 // Compact source address. Sender names are discovery/config metadata,
426 // not packet-header identity.
427 h = hash_bytes_u64(h, &sender_address_u32(self.sender.as_ref()).to_le_bytes());
428
429 // Logical type as string
430 h = hash_bytes_u64(h, get_message_name(self.ty).as_bytes());
431
432 // Endpoints (as their string names)
433 for ep in self.endpoints.iter() {
434 h = hash_bytes_u64(h, ep.as_str().as_bytes());
435 }
436
437 // Timestamp + data_size as bytes
438 h = hash_bytes_u64(h, &self.timestamp.to_le_bytes());
439 h = hash_bytes_u64(h, &self.nonce.to_le_bytes());
440 h = hash_bytes_u64(h, &self.data_size.to_le_bytes());
441
442 // Payload bytes
443 h = hash_bytes_u64(h, self.payload());
444
445 h
446 }
447
448 /// Generic helper: decode the payload as a Vec<T> in little-endian,
449 /// after checking the runtime data kind and size.
450 #[inline]
451 fn _as_le_bytes<T>(&self, expected_kind: MessageDataType) -> TelemetryResult<Vec<T>>
452 where
453 T: LeDecode,
454 {
455 self.ensure_kind(expected_kind)?;
456
457 let bytes: &[u8] = self.payload();
458 let width = T::WIDTH;
459
460 if !bytes.len().is_multiple_of(width) {
461 // Packet should already be validated; if not, surface a size error.
462 return Err(TelemetryError::SizeMismatch {
463 expected: (bytes.len() / width) * width,
464 got: bytes.len(),
465 });
466 }
467
468 let count = bytes.len() / width;
469 let mut out = Vec::with_capacity(count);
470
471 for chunk in bytes.chunks_exact(width) {
472 out.push(T::from_le(chunk));
473 }
474
475 Ok(out)
476 }
477
478 /// Validate basic invariants:
479 ///
480 /// - `endpoints` is non-empty.
481 /// - `payload.len() == data_size`.
482 /// - For static element count:
483 /// - `data_size == element_count * data_type_size(get_data_type(ty))`.
484 /// - For dynamic:
485 /// - Length and encoding are validated by [`validate_dynamic_len_and_content`].
486 /// # Returns
487 /// - `Ok(())` if validation passes.
488 /// - `Err(TelemetryError)` if validation fails.
489 pub fn validate(&self) -> TelemetryResult<()> {
490 if self.endpoints.is_empty() {
491 return Err(TelemetryError::EmptyEndpoints);
492 }
493 if self.payload.len() != self.data_size {
494 return Err(TelemetryError::SizeMismatch {
495 expected: self.data_size,
496 got: self.payload.len(),
497 });
498 }
499
500 Self::validate_payload_against_element(self.effective_element(), &self.payload)
501 }
502
503 /* ---- Getters ---- */
504 /// Get the message data type.
505 /// This is the logical schema selector.
506 #[inline]
507 pub fn data_type(&self) -> DataType {
508 self.ty
509 }
510
511 /// Get the sender identifier.
512 /// This is typically a device or subsystem name.
513 #[inline]
514 pub fn sender(&self) -> &str {
515 &self.sender
516 }
517
518 /// Get the destination endpoints for this message.
519 #[inline]
520 pub fn endpoints(&self) -> &[DataEndpoint] {
521 &self.endpoints
522 }
523
524 /// Get the timestamp in milliseconds.
525 #[inline]
526 pub fn timestamp(&self) -> u64 {
527 self.timestamp
528 }
529
530 /// Get the packet nonce.
531 #[inline]
532 pub fn nonce(&self) -> u16 {
533 self.nonce
534 }
535
536 /// Get the payload size in bytes.
537 #[inline]
538 pub fn data_size(&self) -> usize {
539 self.data_size
540 }
541
542 /// Get the raw payload bytes.
543 #[inline]
544 pub fn payload(&self) -> &[u8] {
545 &self.payload
546 }
547
548 /// Return the optional inline wire shape preserved from unpacking.
549 ///
550 /// This is crate-visible because packing and routing need to keep the
551 /// contract intact when a packet is forwarded again.
552 #[inline]
553 pub(crate) fn wire_shape(&self) -> Option<MessageElement> {
554 self.wire_shape
555 }
556
557 /// Return the frozen destination-holder hashes preserved from the wire contract.
558 ///
559 /// Routers and relays use this list to avoid delivering an in-flight packet
560 /// to newly-learned holders that were not part of the original delivery
561 /// contract when the packet was packed.
562 #[inline]
563 pub(crate) fn wire_target_senders(&self) -> &[u64] {
564 &self.wire_target_senders
565 }
566
567 /// Override the packet nonce while keeping the rest of the packet intact.
568 #[inline]
569 pub fn with_nonce(mut self, nonce: u16) -> Self {
570 self.nonce = nonce;
571 self
572 }
573
574 /// Header-only string (no decoded data).
575 ///
576 /// Example:
577 /// `Type: FOO, Data Size: 8, Sender: dev0, Endpoints: [EP_A, EP_B], Timestamp: 1234 (1s 234ms)`
578 /// # Returns
579 /// - Human-readable string with header fields.
580 pub fn header_string(&self) -> String {
581 let mut out = String::with_capacity(DEFAULT_STRING_CAPACITY);
582
583 let _ = write!(
584 &mut out,
585 "Type: {}, Data Size: {}, Sender: {}, Endpoints: [",
586 get_message_name(self.ty),
587 self.data_size,
588 self.sender.as_ref(),
589 );
590 for (i, ep) in self.endpoints.iter().enumerate() {
591 if i != 0 {
592 out.push_str(", ");
593 }
594 out.push_str(ep.as_str());
595 }
596 out.push_str("], Timestamp: ");
597 let _ = write!(&mut out, "{}", self.timestamp);
598
599 out.push_str(" (");
600 append_human_time(&mut out, self.timestamp);
601 out.push(')');
602 out
603 }
604
605 /// Borrow the payload as UTF-8 without trailing NULs (no allocation).
606 ///
607 /// Returns `None` if the message `DataType` is not a `String` type or if
608 /// the payload is not valid UTF-8 (after trimming trailing NUL).
609 /// # Returns
610 /// - `Some(&str)` if the payload is a valid UTF-8 string.
611 /// - `None` otherwise.
612 #[inline]
613 pub fn data_as_utf8_ref(&self) -> Option<&str> {
614 if self.effective_data_type() != MessageDataType::String {
615 return None;
616 }
617 let bytes = &self.payload;
618 let end = bytes.iter().rposition(|&b| b != 0).map(|i| i + 1)?;
619 core::str::from_utf8(&bytes[..end]).ok()
620 }
621
622 /// Helper: append decoded numeric/float elements to `s`.
623 ///
624 /// - Uses `LeBytes::from_le_slice` with fixed-width chunks.
625 /// - Floats (`f32`/`f64`) are formatted with a fixed precision
626 /// [`STRING_PRECISION`].
627 #[inline]
628 fn data_to_string<T>(&self, s: &mut String)
629 where
630 T: LeBytes + core::fmt::Display + 'static,
631 {
632 let it = self.payload.chunks_exact(T::WIDTH);
633 let mut first = true;
634
635 for chunk in it {
636 if !first {
637 s.push_str(", ");
638 }
639 first = false;
640
641 let v = T::from_le_slice(chunk);
642
643 // If this is a float type, use precision; otherwise, default formatting.
644 if TypeId::of::<T>() == TypeId::of::<f32>() || TypeId::of::<T>() == TypeId::of::<f64>()
645 {
646 // `{:.*}` = "use this precision argument"
647 let _ = write!(s, "{:.*}", STRING_PRECISION, v);
648 } else {
649 let _ = write!(s, "{v}");
650 }
651 }
652 }
653
654 /// Full pretty string including decoded data portion.
655 ///
656 /// - String payloads are rendered as `"..."`
657 /// - Numeric/bool payloads are rendered as comma-separated values
658 /// - Hex payloads are delegated to [`Packet::to_hex_string`]
659 /// # Returns
660 /// - Human-readable string with header and decoded data.
661 pub fn as_string(&self) -> String {
662 let mut s = String::from("{");
663 s.push_str(&self.header_string());
664
665 if self.payload.is_empty() {
666 s.push_str(", Data: (<NoData>)}");
667 return s;
668 }
669
670 match self.effective_message_class() {
671 MessageClass::Data => {
672 s.push_str(", Data: (");
673 }
674 MessageClass::Error => {
675 s.push_str(", Error: (");
676 }
677 MessageClass::Warning => {
678 s.push_str(", Warning: (");
679 }
680 }
681
682 if let Some(msg) = self.data_as_utf8_ref() {
683 s.push('"');
684 s.push_str(msg);
685 s.push_str("\")}");
686 return s;
687 }
688
689 match self.effective_data_type() {
690 MessageDataType::Float64 => {
691 self.data_to_string::<f64>(&mut s);
692 }
693 MessageDataType::Float32 => {
694 self.data_to_string::<f32>(&mut s);
695 }
696 MessageDataType::UInt128 => {
697 self.data_to_string::<u128>(&mut s);
698 }
699 MessageDataType::UInt64 => {
700 self.data_to_string::<u64>(&mut s);
701 }
702 MessageDataType::UInt32 => {
703 self.data_to_string::<u32>(&mut s);
704 }
705 MessageDataType::UInt16 => {
706 self.data_to_string::<u16>(&mut s);
707 }
708 MessageDataType::UInt8 => {
709 self.data_to_string::<u8>(&mut s);
710 }
711 MessageDataType::Int128 => {
712 self.data_to_string::<i128>(&mut s);
713 }
714 MessageDataType::Int64 => {
715 self.data_to_string::<i64>(&mut s);
716 }
717 MessageDataType::Int32 => {
718 self.data_to_string::<i32>(&mut s);
719 }
720 MessageDataType::Int16 => {
721 self.data_to_string::<i16>(&mut s);
722 }
723 MessageDataType::Int8 => {
724 self.data_to_string::<i8>(&mut s);
725 }
726 MessageDataType::Bool => {
727 // Interpret any nonzero as true.
728 let mut it = self.payload.iter().peekable();
729 while let Some(b) = it.next() {
730 let _ = write!(s, "{}", *b != 0);
731 if it.peek().is_some() {
732 s.push_str(", ");
733 }
734 }
735 }
736 MessageDataType::String => {
737 // Already handled above via `data_as_utf8_ref`.
738 }
739 MessageDataType::Binary => return self.to_hex_string(),
740 MessageDataType::NoData => {
741 s.push_str("<no data>");
742 }
743 }
744
745 s.push_str(")}");
746 s
747 }
748
749 /// Hex dump variant of [`Packet::as_string`].
750 ///
751 /// Produces:
752 ///
753 /// `Type: ..., Data Size: ..., ..., Timestamp: ... (...), Data (hex): 0xNN 0xNN ...`
754 /// # Returns
755 /// - Human-readable string with header and hex-formatted data.
756 pub fn to_hex_string(&self) -> String {
757 // Header first.
758 let mut s = self.header_string();
759 s.push_str(", Data (hex):");
760
761 if !self.payload.is_empty() {
762 // Reserve roughly 5 chars per byte: " 0xNN".
763 s.reserve(self.payload.len().saturating_mul(5));
764 for &b in self.payload.iter() {
765 let _ = write!(&mut s, " 0x{:02x}", b);
766 }
767 }
768 s
769 }
770
771 // =========================================================================
772 // Typed data accessors
773 // =========================================================================
774
775 /// Ensure this packet's element type matches `expected`.
776 #[inline]
777 fn ensure_kind(&self, expected: MessageDataType) -> TelemetryResult<()> {
778 let dt = self.effective_data_type();
779 if dt != expected {
780 return Err(TelemetryError::TypeMismatch {
781 expected: data_type_size(expected),
782 got: data_type_size(dt),
783 });
784 }
785 Ok(())
786 }
787
788 impl_data_as_prim! {
789 data_as_f32, f32, MessageDataType::Float32;
790 data_as_f64, f64, MessageDataType::Float64;
791
792 data_as_u8, u8, MessageDataType::UInt8;
793 data_as_u16, u16, MessageDataType::UInt16;
794 data_as_u32, u32, MessageDataType::UInt32;
795 data_as_u64, u64, MessageDataType::UInt64;
796 data_as_u128, u128, MessageDataType::UInt128;
797
798 data_as_i8, i8, MessageDataType::Int8;
799 data_as_i16, i16, MessageDataType::Int16;
800 data_as_i32, i32, MessageDataType::Int32;
801 data_as_i64, i64, MessageDataType::Int64;
802 data_as_i128, i128, MessageDataType::Int128;
803 }
804
805 /// Decode payload as `bool`s. Any non-zero byte is treated as `true`.
806 #[inline]
807 pub fn data_as_bool(&self) -> TelemetryResult<Vec<bool>> {
808 self.ensure_kind(MessageDataType::Bool)?;
809 Ok(self.payload.iter().map(|&b| b != 0).collect())
810 }
811
812 /// Decode payload as a string (for String type).
813 #[inline]
814 pub fn data_as_string(&self) -> TelemetryResult<String> {
815 self.ensure_kind(MessageDataType::String)?;
816
817 let bytes = &self.payload;
818 let end = bytes
819 .iter()
820 .rposition(|&b| b != 0)
821 .map(|i| i + 1)
822 .unwrap_or(0);
823
824 if end == 0 {
825 return Ok(String::new());
826 }
827
828 let s = core::str::from_utf8(&bytes[..end])
829 .map_err(|_| TelemetryError::InvalidUtf8)?
830 .to_string();
831 Ok(s)
832 }
833
834 /// Decode payload as raw bytes (for Binary type).
835 #[inline]
836 pub fn data_as_binary(&self) -> TelemetryResult<Vec<u8>> {
837 self.ensure_kind(MessageDataType::Binary)?;
838 Ok(self.payload.to_vec())
839 }
840
841 /// Internal helper: build a packet from a slice of primitive values
842 /// encoded as little-endian, using the given sender.
843 ///
844 /// Works for all numeric types (integer and float) as long as the schema's
845 /// element width matches `T`'s width. Not used for String/Binary/Bool.
846 fn from_prim_le_slice_with_sender<T>(
847 ty: DataType,
848 values: &[T],
849 endpoints: &[DataEndpoint],
850 sender: &str,
851 timestamp: u64,
852 ) -> TelemetryResult<Self>
853 where
854 T: Copy + 'static,
855 {
856 let dt = get_data_type(ty);
857
858 // Only allow numeric-ish types here; String/Binary/Bool are handled elsewhere.
859
860 if dt == MessageDataType::Bool
861 || dt == MessageDataType::String
862 || dt == MessageDataType::Binary
863 {
864 // For these, use dedicated constructors (bool / string / binary).
865 return Err(TelemetryError::BadArg);
866 }
867 let element_size = data_type_size(dt);
868
869 // Ensure T's width matches what the schema expects.
870 if element_size != size_of::<T>() {
871 return Err(TelemetryError::TypeMismatch {
872 expected: element_size,
873 got: size_of::<T>(),
874 });
875 }
876
877 let total_bytes = values.len() * element_size;
878
879 // If the schema has a static element count, enforce it up front.
880 if let MessageElement::Static(exact, _, _) = message_meta(ty).element {
881 let exact_bytes = exact * element_size;
882 if total_bytes != exact_bytes {
883 return Err(TelemetryError::SizeMismatch {
884 expected: exact_bytes,
885 got: total_bytes,
886 });
887 }
888 }
889
890 let mut bytes = vec![0u8; total_bytes];
891 unsafe { bytes.set_len(total_bytes) };
892
893 for (i, v) in values.iter().copied().enumerate() {
894 let offset = i * element_size;
895 let dst = &mut bytes[offset..offset + element_size];
896
897 // Copy the raw memory of `v` into the buffer.
898 unsafe {
899 core::ptr::copy_nonoverlapping(
900 &v as *const T as *const u8,
901 dst.as_mut_ptr(),
902 element_size,
903 );
904 }
905
906 // Normalize to little-endian on big-endian targets.
907 // On little-endian this block is compiled out.
908 #[cfg(target_endian = "big")]
909 {
910 dst.reverse();
911 }
912 }
913
914 Self::new(ty, endpoints, sender, timestamp, Arc::<[u8]>::from(bytes))
915 }
916
917 /// Same as `from_prim_le_slice_with_sender` but uses `DEVICE_IDENTIFIER`
918 /// as the sender (mirrors `from_u8_slice`, `from_f32_slice`).
919 #[inline]
920 pub fn from_prim_le_slice<T>(
921 ty: DataType,
922 values: &[T],
923 endpoints: &[DataEndpoint],
924 timestamp: u64,
925 ) -> TelemetryResult<Self>
926 where
927 T: Copy + 'static,
928 {
929 Self::from_prim_le_slice_with_sender(ty, values, endpoints, DEVICE_IDENTIFIER, timestamp)
930 }
931
932 // -------------------------------------------------------------------------
933 // Convenience wrappers for all numeric types
934 // -------------------------------------------------------------------------
935 impl_from_prim_slices! {
936 from_u8_slice, u8;
937 from_u16_slice, u16;
938 from_i8_slice, i8;
939 from_i16_slice, i16;
940 from_u32_slice, u32;
941 from_i32_slice, i32;
942 from_u64_slice, u64;
943 from_i64_slice, i64;
944 from_u128_slice, u128;
945 from_i128_slice, i128;
946 from_f32_slice, f32;
947 from_f64_slice, f64;
948 }
949
950 /// Builds a packet with an empty payload for types whose schema allows zero bytes.
951 #[inline]
952 pub fn from_no_data(
953 ty: DataType,
954 endpoints: &[DataEndpoint],
955 timestamp: u64,
956 ) -> TelemetryResult<Self> {
957 let meta = message_meta(ty);
958 match meta.element {
959 MessageElement::Static(need, _, _) => {
960 if need != 0 {
961 return Err(TelemetryError::SizeMismatch {
962 expected: need,
963 got: 0,
964 });
965 }
966 }
967 MessageElement::Dynamic(_, _) => {
968 // Dynamic with zero-length payload is OK.
969 }
970 }
971
972 Self::new(
973 ty,
974 endpoints,
975 DEVICE_IDENTIFIER,
976 timestamp,
977 Arc::<[u8]>::from([]),
978 )
979 }
980
981 /// Bool constructor: encodes each bool as a single byte (0 / 1).
982 #[inline]
983 pub fn from_bool_slice(
984 ty: DataType,
985 values: &[bool],
986 endpoints: &[DataEndpoint],
987 timestamp: u64,
988 ) -> TelemetryResult<Self> {
989 if get_data_type(ty) != MessageDataType::Bool {
990 return Err(TelemetryError::TypeMismatch {
991 expected: data_type_size(get_data_type(ty)),
992 got: size_of::<bool>(),
993 });
994 }
995
996 let total_bytes = values.len();
997 if let MessageElement::Static(exact, _, _) = message_meta(ty).element
998 && total_bytes != exact
999 {
1000 return Err(TelemetryError::SizeMismatch {
1001 expected: exact,
1002 got: total_bytes,
1003 });
1004 }
1005
1006 let mut bytes = Vec::with_capacity(total_bytes);
1007 bytes.extend(values.iter().map(|b| if *b { 1u8 } else { 0u8 }));
1008
1009 Self::new(
1010 ty,
1011 endpoints,
1012 DEVICE_IDENTIFIER,
1013 timestamp,
1014 Arc::<[u8]>::from(bytes),
1015 )
1016 }
1017
1018 /// String constructor (dynamic length). Trailing NULs are not added;
1019 /// `new()` + `validate_dynamic_len_and_content` will do UTF-8 validation.
1020 #[inline]
1021 pub fn from_str_slice(
1022 ty: DataType,
1023 s: &str,
1024 endpoints: &[DataEndpoint],
1025 timestamp: u64,
1026 ) -> TelemetryResult<Self> {
1027 if get_data_type(ty) != MessageDataType::String {
1028 return Err(TelemetryError::TypeMismatch {
1029 expected: data_type_size(get_data_type(ty)),
1030 got: 1,
1031 });
1032 }
1033
1034 let bytes: Arc<[u8]> = Arc::from(s.as_bytes());
1035 Self::new(ty, endpoints, DEVICE_IDENTIFIER, timestamp, bytes)
1036 }
1037}
1038
1039// ============================================================================
1040// Time formatting (no_std-friendly, UTC or uptime)
1041// ============================================================================
1042
1043#[inline]
1044fn div_mod_u64(n: u64, d: u64) -> (u64, u64) {
1045 (n / d, n % d)
1046}
1047
1048// Howard Hinnant–style civil-from-days (proleptic Gregorian).
1049fn civil_from_days(mut z: i64) -> (i32, u32, u32) {
1050 // epoch (1970-01-01) has days=0
1051 z += 719_468; // shift to civil base
1052 let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
1053 let doe = z - era * 146_097; // [0, 146096]
1054 let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
1055 let y = (yoe as i32) + era as i32 * 400;
1056 let doy = (doe - (365 * yoe + yoe / 4 - yoe / 100)) as i32; // [0, 365]
1057 let mp = (5 * doy + 2) / 153; // [0, 11]
1058 let d = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
1059 let m = mp + if mp < 10 { 3 } else { -9 }; // [1, 12]
1060 let y = y + (m <= 2) as i32; // year
1061 (y, m as u32, d as u32)
1062}
1063
1064/// Append a human-readable timestamp to `out`, either uptime (`hh:mm:ss.mmm`)
1065/// or UTC epoch like `YYYY-MM-DD HH:MM:SS.mmmZ`, depending on threshold.
1066fn append_human_time(out: &mut String, total_ms: u64) {
1067 if total_ms >= EPOCH_MS_THRESHOLD {
1068 // Unix epoch path.
1069 let (secs, sub_ms) = div_mod_u64(total_ms, 1_000);
1070 let days = (secs / 86_400) as i64;
1071 let sod = (secs % 86_400) as u32; // seconds of day
1072 let (year, month, day) = civil_from_days(days);
1073 let hour = sod / 3_600;
1074 let min = (sod % 3_600) / 60;
1075 let sec = sod % 60;
1076 let _ = Write::write_fmt(
1077 out,
1078 format_args!(
1079 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:03}Z",
1080 year, month, day, hour, min, sec, sub_ms as u32
1081 ),
1082 );
1083 } else {
1084 // Uptime-style duration.
1085 let hours = total_ms / 3_600_000;
1086 let minutes = (total_ms % 3_600_000) / 60_000;
1087 let seconds = (total_ms % 60_000) / 1_000;
1088 let milliseconds = total_ms % 1_000;
1089 if hours > 0 {
1090 let _ = Write::write_fmt(
1091 out,
1092 format_args!("{hours}h {minutes:02}m {seconds:02}s {milliseconds:03}ms"),
1093 );
1094 } else if minutes > 0 {
1095 let _ = Write::write_fmt(
1096 out,
1097 format_args!("{minutes}m {seconds:02}s {milliseconds:03}ms"),
1098 );
1099 } else {
1100 let _ = Write::write_fmt(out, format_args!("{seconds}s {milliseconds:03}ms"));
1101 }
1102 }
1103}
1104
1105// ============================================================================
1106// Display impl
1107// ============================================================================
1108
1109impl core::fmt::Display for Packet {
1110 #[inline]
1111 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
1112 f.write_str(&Packet::as_string(self))
1113 }
1114}
1115
1116impl ByteCost for Packet {
1117 #[inline]
1118 fn byte_cost(&self) -> usize {
1119 size_of::<Self>()
1120 + self.sender.len()
1121 + self.endpoints.len() * size_of::<DataEndpoint>()
1122 + self.payload.byte_cost()
1123 }
1124}