use crate::config::{StandardSmallPayload, runtime_device_identifier, runtime_string_precision};
use crate::queue::ByteCost;
use crate::{
MessageClass, MessageDataType, MessageElement, TelemetryError, TelemetryResult,
config::{DataEndpoint, DataType},
data_type_size, get_data_type, get_message_name, message_meta,
router::LeBytes,
};
use crate::{impl_data_as_prim, impl_from_prim_slices, impl_ledecode_auto};
use alloc::{string::String, string::ToString, sync::Arc, vec, vec::Vec};
use core::any::TypeId;
use core::fmt::{Formatter, Write};
use core::sync::atomic::{AtomicU32, Ordering};
const EPOCH_MS_THRESHOLD: u64 = 1_000_000_000_000;
const DEFAULT_STRING_CAPACITY: usize = 96;
static PACKET_NONCE_COUNTER: AtomicU32 = AtomicU32::new(0xA5C3_1F27);
#[inline]
fn next_packet_nonce() -> u16 {
let mut x = PACKET_NONCE_COUNTER.fetch_add(0x9E37_79B9, Ordering::Relaxed);
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
(x as u16) | 1
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Packet {
ty: DataType,
data_size: usize,
sender: Arc<str>,
endpoints: Arc<[DataEndpoint]>,
timestamp: u64,
nonce: u16,
payload: StandardSmallPayload,
wire_shape: Option<MessageElement>,
wire_target_senders: Arc<[u64]>,
}
#[inline]
const fn element_width(dt: MessageDataType) -> usize {
match dt {
MessageDataType::UInt8 | MessageDataType::Int8 | MessageDataType::Bool => 1,
MessageDataType::UInt16 | MessageDataType::Int16 => 2,
MessageDataType::UInt32 | MessageDataType::Int32 | MessageDataType::Float32 => 4,
MessageDataType::UInt64 | MessageDataType::Int64 | MessageDataType::Float64 => 8,
MessageDataType::UInt128 | MessageDataType::Int128 => 16,
MessageDataType::String | MessageDataType::Binary => 1,
MessageDataType::NoData => 0,
}
}
#[inline]
pub fn hash_bytes_u64(mut h: u64, bytes: &[u8]) -> u64 {
const PRIME: u64 = 0x9E37_79B1;
for &b in bytes {
h ^= b as u64;
h = h.wrapping_mul(PRIME);
h ^= h >> 27;
}
h
}
#[inline]
pub(crate) fn sender_address_u32(sender: &str) -> u32 {
let raw = hash_bytes_u64(0xA6D3_8C21_4B7F_19E5, sender.as_bytes()) as u32;
if raw == 0 { 1 } else { raw }
}
#[inline]
fn validate_dynamic_len_and_content_for_element(
element: MessageElement,
bytes: &[u8],
) -> TelemetryResult<()> {
match element.data_type() {
MessageDataType::String => {
let end = bytes
.iter()
.rposition(|&b| b != 0)
.map(|i| i + 1)
.unwrap_or(0);
if end > 0 {
core::str::from_utf8(&bytes[..end]).map_err(|_| TelemetryError::InvalidUtf8)?;
}
Ok(())
}
MessageDataType::Binary => Ok(()),
dt => {
let w = element_width(dt);
if w == 0 || !bytes.len().is_multiple_of(w) {
return Err(TelemetryError::SizeMismatch {
expected: w,
got: bytes.len(),
});
}
Ok(())
}
}
}
trait LeDecode: Sized {
const WIDTH: usize;
fn from_le(slice: &[u8]) -> Self;
}
impl_ledecode_auto!(f32);
impl_ledecode_auto!(f64);
impl_ledecode_auto!(u16);
impl_ledecode_auto!(u32);
impl_ledecode_auto!(u64);
impl_ledecode_auto!(u128);
impl_ledecode_auto!(i16);
impl_ledecode_auto!(i32);
impl_ledecode_auto!(i64);
impl_ledecode_auto!(i128);
impl_ledecode_auto!(u8);
impl_ledecode_auto!(i8);
impl Packet {
#[inline]
fn validate_payload_against_element(
element: MessageElement,
payload: &[u8],
) -> TelemetryResult<()> {
match element {
MessageElement::Static(need, dt, _) => {
let need = need * data_type_size(dt);
if payload.len() != need {
return Err(TelemetryError::SizeMismatch {
expected: need,
got: payload.len(),
});
}
Ok(())
}
MessageElement::Dynamic(_, _) => {
validate_dynamic_len_and_content_for_element(element, payload)
}
}
}
#[inline]
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_with_wire_contract(
ty: DataType,
endpoints: &[DataEndpoint],
sender: &str,
timestamp: u64,
nonce: u16,
payload: Arc<[u8]>,
wire_shape: Option<MessageElement>,
wire_target_senders: Arc<[u64]>,
) -> TelemetryResult<Self> {
if endpoints.is_empty() {
return Err(TelemetryError::EmptyEndpoints);
}
let element = wire_shape.unwrap_or(message_meta(ty).element);
Self::validate_payload_against_element(element, &payload)?;
Ok(Self {
ty,
data_size: payload.len(),
sender: sender.into(),
endpoints: Arc::<[DataEndpoint]>::from(endpoints),
timestamp,
nonce,
payload: StandardSmallPayload::new(&payload),
wire_shape,
wire_target_senders,
})
}
pub fn new(
ty: DataType,
endpoints: &[DataEndpoint],
sender: &str,
timestamp: u64,
payload: Arc<[u8]>,
) -> TelemetryResult<Self> {
Self::new_with_nonce(
ty,
endpoints,
sender,
timestamp,
next_packet_nonce(),
payload,
)
}
pub fn new_with_nonce(
ty: DataType,
endpoints: &[DataEndpoint],
sender: &str,
timestamp: u64,
nonce: u16,
payload: Arc<[u8]>,
) -> TelemetryResult<Self> {
Self::new_with_wire_contract(
ty,
endpoints,
sender,
timestamp,
nonce,
payload,
None,
Arc::<[u64]>::from([]),
)
}
#[inline]
fn effective_element(&self) -> MessageElement {
self.wire_shape.unwrap_or(message_meta(self.ty).element)
}
#[inline]
fn effective_data_type(&self) -> MessageDataType {
self.effective_element().data_type()
}
#[inline]
fn effective_message_class(&self) -> MessageClass {
self.effective_element().message_type()
}
#[inline]
pub fn packet_id(&self) -> u64 {
let mut h: u64 = 0x9E37_79B9_7F4A_7C15;
h = hash_bytes_u64(h, &sender_address_u32(self.sender.as_ref()).to_le_bytes());
h = hash_bytes_u64(h, get_message_name(self.ty).as_bytes());
for ep in self.endpoints.iter() {
h = hash_bytes_u64(h, ep.as_str().as_bytes());
}
h = hash_bytes_u64(h, &self.timestamp.to_le_bytes());
h = hash_bytes_u64(h, &self.nonce.to_le_bytes());
h = hash_bytes_u64(h, &self.data_size.to_le_bytes());
h = hash_bytes_u64(h, self.payload());
h
}
#[inline]
fn _as_le_bytes<T>(&self, expected_kind: MessageDataType) -> TelemetryResult<Vec<T>>
where
T: LeDecode,
{
self.ensure_kind(expected_kind)?;
let bytes: &[u8] = self.payload();
let width = T::WIDTH;
if !bytes.len().is_multiple_of(width) {
return Err(TelemetryError::SizeMismatch {
expected: (bytes.len() / width) * width,
got: bytes.len(),
});
}
let count = bytes.len() / width;
let mut out = Vec::with_capacity(count);
for chunk in bytes.chunks_exact(width) {
out.push(T::from_le(chunk));
}
Ok(out)
}
pub fn validate(&self) -> TelemetryResult<()> {
if self.endpoints.is_empty() {
return Err(TelemetryError::EmptyEndpoints);
}
if self.payload.len() != self.data_size {
return Err(TelemetryError::SizeMismatch {
expected: self.data_size,
got: self.payload.len(),
});
}
Self::validate_payload_against_element(self.effective_element(), &self.payload)
}
#[inline]
pub fn data_type(&self) -> DataType {
self.ty
}
#[inline]
pub fn sender(&self) -> &str {
&self.sender
}
#[inline]
pub fn endpoints(&self) -> &[DataEndpoint] {
&self.endpoints
}
#[inline]
pub fn timestamp(&self) -> u64 {
self.timestamp
}
#[inline]
pub fn nonce(&self) -> u16 {
self.nonce
}
#[inline]
pub fn data_size(&self) -> usize {
self.data_size
}
#[inline]
pub fn payload(&self) -> &[u8] {
&self.payload
}
#[inline]
pub(crate) fn wire_shape(&self) -> Option<MessageElement> {
self.wire_shape
}
#[inline]
pub(crate) fn wire_target_senders(&self) -> &[u64] {
&self.wire_target_senders
}
#[inline]
pub fn with_nonce(mut self, nonce: u16) -> Self {
self.nonce = nonce;
self
}
pub fn header_string(&self) -> String {
let mut out = String::with_capacity(DEFAULT_STRING_CAPACITY);
let _ = write!(
&mut out,
"Type: {}, Data Size: {}, Sender: {}, Endpoints: [",
get_message_name(self.ty),
self.data_size,
self.sender.as_ref(),
);
for (i, ep) in self.endpoints.iter().enumerate() {
if i != 0 {
out.push_str(", ");
}
out.push_str(ep.as_str());
}
out.push_str("], Timestamp: ");
let _ = write!(&mut out, "{}", self.timestamp);
out.push_str(" (");
append_human_time(&mut out, self.timestamp);
out.push(')');
out
}
#[inline]
pub fn data_as_utf8_ref(&self) -> Option<&str> {
if self.effective_data_type() != MessageDataType::String {
return None;
}
let bytes = &self.payload;
let end = bytes.iter().rposition(|&b| b != 0).map(|i| i + 1)?;
core::str::from_utf8(&bytes[..end]).ok()
}
#[inline]
fn data_to_string<T>(&self, s: &mut String)
where
T: LeBytes + core::fmt::Display + 'static,
{
let it = self.payload.chunks_exact(T::WIDTH);
let mut first = true;
for chunk in it {
if !first {
s.push_str(", ");
}
first = false;
let v = T::from_le_slice(chunk);
if TypeId::of::<T>() == TypeId::of::<f32>() || TypeId::of::<T>() == TypeId::of::<f64>()
{
let _ = write!(s, "{:.*}", runtime_string_precision(), v);
} else {
let _ = write!(s, "{v}");
}
}
}
pub fn as_string(&self) -> String {
let mut s = String::from("{");
s.push_str(&self.header_string());
if self.payload.is_empty() {
s.push_str(", Data: (<NoData>)}");
return s;
}
match self.effective_message_class() {
MessageClass::Data => {
s.push_str(", Data: (");
}
MessageClass::Error => {
s.push_str(", Error: (");
}
MessageClass::Warning => {
s.push_str(", Warning: (");
}
}
if let Some(msg) = self.data_as_utf8_ref() {
s.push('"');
s.push_str(msg);
s.push_str("\")}");
return s;
}
match self.effective_data_type() {
MessageDataType::Float64 => {
self.data_to_string::<f64>(&mut s);
}
MessageDataType::Float32 => {
self.data_to_string::<f32>(&mut s);
}
MessageDataType::UInt128 => {
self.data_to_string::<u128>(&mut s);
}
MessageDataType::UInt64 => {
self.data_to_string::<u64>(&mut s);
}
MessageDataType::UInt32 => {
self.data_to_string::<u32>(&mut s);
}
MessageDataType::UInt16 => {
self.data_to_string::<u16>(&mut s);
}
MessageDataType::UInt8 => {
self.data_to_string::<u8>(&mut s);
}
MessageDataType::Int128 => {
self.data_to_string::<i128>(&mut s);
}
MessageDataType::Int64 => {
self.data_to_string::<i64>(&mut s);
}
MessageDataType::Int32 => {
self.data_to_string::<i32>(&mut s);
}
MessageDataType::Int16 => {
self.data_to_string::<i16>(&mut s);
}
MessageDataType::Int8 => {
self.data_to_string::<i8>(&mut s);
}
MessageDataType::Bool => {
let mut it = self.payload.iter().peekable();
while let Some(b) = it.next() {
let _ = write!(s, "{}", *b != 0);
if it.peek().is_some() {
s.push_str(", ");
}
}
}
MessageDataType::String => {
}
MessageDataType::Binary => return self.to_hex_string(),
MessageDataType::NoData => {
s.push_str("<no data>");
}
}
s.push_str(")}");
s
}
pub fn to_hex_string(&self) -> String {
let mut s = self.header_string();
s.push_str(", Data (hex):");
if !self.payload.is_empty() {
s.reserve(self.payload.len().saturating_mul(5));
for &b in self.payload.iter() {
let _ = write!(&mut s, " 0x{:02x}", b);
}
}
s
}
#[inline]
fn ensure_kind(&self, expected: MessageDataType) -> TelemetryResult<()> {
let dt = self.effective_data_type();
if dt != expected {
return Err(TelemetryError::TypeMismatch {
expected: data_type_size(expected),
got: data_type_size(dt),
});
}
Ok(())
}
impl_data_as_prim! {
data_as_f32, f32, MessageDataType::Float32;
data_as_f64, f64, MessageDataType::Float64;
data_as_u8, u8, MessageDataType::UInt8;
data_as_u16, u16, MessageDataType::UInt16;
data_as_u32, u32, MessageDataType::UInt32;
data_as_u64, u64, MessageDataType::UInt64;
data_as_u128, u128, MessageDataType::UInt128;
data_as_i8, i8, MessageDataType::Int8;
data_as_i16, i16, MessageDataType::Int16;
data_as_i32, i32, MessageDataType::Int32;
data_as_i64, i64, MessageDataType::Int64;
data_as_i128, i128, MessageDataType::Int128;
}
#[inline]
pub fn data_as_bool(&self) -> TelemetryResult<Vec<bool>> {
self.ensure_kind(MessageDataType::Bool)?;
Ok(self.payload.iter().map(|&b| b != 0).collect())
}
#[inline]
pub fn data_as_string(&self) -> TelemetryResult<String> {
self.ensure_kind(MessageDataType::String)?;
let bytes = &self.payload;
let end = bytes
.iter()
.rposition(|&b| b != 0)
.map(|i| i + 1)
.unwrap_or(0);
if end == 0 {
return Ok(String::new());
}
let s = core::str::from_utf8(&bytes[..end])
.map_err(|_| TelemetryError::InvalidUtf8)?
.to_string();
Ok(s)
}
#[inline]
pub fn data_as_binary(&self) -> TelemetryResult<Vec<u8>> {
self.ensure_kind(MessageDataType::Binary)?;
Ok(self.payload.to_vec())
}
fn from_prim_le_slice_with_sender<T>(
ty: DataType,
values: &[T],
endpoints: &[DataEndpoint],
sender: &str,
timestamp: u64,
) -> TelemetryResult<Self>
where
T: Copy + 'static,
{
let dt = get_data_type(ty);
if dt == MessageDataType::Bool
|| dt == MessageDataType::String
|| dt == MessageDataType::Binary
{
return Err(TelemetryError::BadArg);
}
let element_size = data_type_size(dt);
if element_size != size_of::<T>() {
return Err(TelemetryError::TypeMismatch {
expected: element_size,
got: size_of::<T>(),
});
}
let total_bytes = values.len() * element_size;
if let MessageElement::Static(exact, _, _) = message_meta(ty).element {
let exact_bytes = exact * element_size;
if total_bytes != exact_bytes {
return Err(TelemetryError::SizeMismatch {
expected: exact_bytes,
got: total_bytes,
});
}
}
let mut bytes = vec![0u8; total_bytes];
unsafe { bytes.set_len(total_bytes) };
for (i, v) in values.iter().copied().enumerate() {
let offset = i * element_size;
let dst = &mut bytes[offset..offset + element_size];
unsafe {
core::ptr::copy_nonoverlapping(
&v as *const T as *const u8,
dst.as_mut_ptr(),
element_size,
);
}
#[cfg(target_endian = "big")]
{
dst.reverse();
}
}
Self::new(ty, endpoints, sender, timestamp, Arc::<[u8]>::from(bytes))
}
#[inline]
pub fn from_prim_le_slice<T>(
ty: DataType,
values: &[T],
endpoints: &[DataEndpoint],
timestamp: u64,
) -> TelemetryResult<Self>
where
T: Copy + 'static,
{
let sender = runtime_device_identifier();
Self::from_prim_le_slice_with_sender(ty, values, endpoints, &sender, timestamp)
}
impl_from_prim_slices! {
from_u8_slice, u8;
from_u16_slice, u16;
from_i8_slice, i8;
from_i16_slice, i16;
from_u32_slice, u32;
from_i32_slice, i32;
from_u64_slice, u64;
from_i64_slice, i64;
from_u128_slice, u128;
from_i128_slice, i128;
from_f32_slice, f32;
from_f64_slice, f64;
}
#[inline]
pub fn from_no_data(
ty: DataType,
endpoints: &[DataEndpoint],
timestamp: u64,
) -> TelemetryResult<Self> {
let meta = message_meta(ty);
match meta.element {
MessageElement::Static(need, _, _) => {
if need != 0 {
return Err(TelemetryError::SizeMismatch {
expected: need,
got: 0,
});
}
}
MessageElement::Dynamic(_, _) => {
}
}
let sender = runtime_device_identifier();
Self::new(ty, endpoints, &sender, timestamp, Arc::<[u8]>::from([]))
}
#[inline]
pub fn from_bool_slice(
ty: DataType,
values: &[bool],
endpoints: &[DataEndpoint],
timestamp: u64,
) -> TelemetryResult<Self> {
if get_data_type(ty) != MessageDataType::Bool {
return Err(TelemetryError::TypeMismatch {
expected: data_type_size(get_data_type(ty)),
got: size_of::<bool>(),
});
}
let total_bytes = values.len();
if let MessageElement::Static(exact, _, _) = message_meta(ty).element
&& total_bytes != exact
{
return Err(TelemetryError::SizeMismatch {
expected: exact,
got: total_bytes,
});
}
let mut bytes = Vec::with_capacity(total_bytes);
bytes.extend(values.iter().map(|b| if *b { 1u8 } else { 0u8 }));
let sender = runtime_device_identifier();
Self::new(ty, endpoints, &sender, timestamp, Arc::<[u8]>::from(bytes))
}
#[inline]
pub fn from_str_slice(
ty: DataType,
s: &str,
endpoints: &[DataEndpoint],
timestamp: u64,
) -> TelemetryResult<Self> {
if get_data_type(ty) != MessageDataType::String {
return Err(TelemetryError::TypeMismatch {
expected: data_type_size(get_data_type(ty)),
got: 1,
});
}
let bytes: Arc<[u8]> = Arc::from(s.as_bytes());
let sender = runtime_device_identifier();
Self::new(ty, endpoints, &sender, timestamp, bytes)
}
}
#[inline]
fn div_mod_u64(n: u64, d: u64) -> (u64, u64) {
(n / d, n % d)
}
fn civil_from_days(mut z: i64) -> (i32, u32, u32) {
z += 719_468; let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
let doe = z - era * 146_097; let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; let y = (yoe as i32) + era as i32 * 400;
let doy = (doe - (365 * yoe + yoe / 4 - yoe / 100)) as i32; let mp = (5 * doy + 2) / 153; let d = doy - (153 * mp + 2) / 5 + 1; let m = mp + if mp < 10 { 3 } else { -9 }; let y = y + (m <= 2) as i32; (y, m as u32, d as u32)
}
fn append_human_time(out: &mut String, total_ms: u64) {
if total_ms >= EPOCH_MS_THRESHOLD {
let (secs, sub_ms) = div_mod_u64(total_ms, 1_000);
let days = (secs / 86_400) as i64;
let sod = (secs % 86_400) as u32; let (year, month, day) = civil_from_days(days);
let hour = sod / 3_600;
let min = (sod % 3_600) / 60;
let sec = sod % 60;
let _ = Write::write_fmt(
out,
format_args!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:03}Z",
year, month, day, hour, min, sec, sub_ms as u32
),
);
} else {
let hours = total_ms / 3_600_000;
let minutes = (total_ms % 3_600_000) / 60_000;
let seconds = (total_ms % 60_000) / 1_000;
let milliseconds = total_ms % 1_000;
if hours > 0 {
let _ = Write::write_fmt(
out,
format_args!("{hours}h {minutes:02}m {seconds:02}s {milliseconds:03}ms"),
);
} else if minutes > 0 {
let _ = Write::write_fmt(
out,
format_args!("{minutes}m {seconds:02}s {milliseconds:03}ms"),
);
} else {
let _ = Write::write_fmt(out, format_args!("{seconds}s {milliseconds:03}ms"));
}
}
}
impl core::fmt::Display for Packet {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.write_str(&Packet::as_string(self))
}
}
impl ByteCost for Packet {
#[inline]
fn byte_cost(&self) -> usize {
size_of::<Self>()
+ self.sender.len()
+ self.endpoints.len() * size_of::<DataEndpoint>()
+ self.payload.byte_cost()
}
}