#[cfg(not(feature = "std"))]
use alloc::{vec, vec::Vec};
use crate::classifier::{Classification, Classifier};
use crate::context::Context;
use crate::error::{EncodeError, Result};
use crate::metrics::CompressionMetrics;
use crate::protocol::{
ChannelInput, CompactHeader, EncodedMessage, EncodingType, MessageHeader, MessageType,
Priority, RawData, COMPACT_MARKER_DATA, COMPACT_MARKER_KEYFRAME,
};
/// Stateful message encoder.
///
/// Tracks the sequence number stamped on outgoing messages and whether
/// `encode_to_bytes` serializes with a checksum.
#[derive(Debug, Clone)]
pub struct Encoder {
/// Sequence number handed to the next encoded message; advanced with wrapping arithmetic.
sequence: u16,
/// When `true`, `encode_to_bytes` uses the checksum-bearing serialization.
include_checksum: bool,
}
impl Encoder {
/// Creates an encoder whose sequence counter starts at 0 and whose byte
/// serialization omits the checksum.
pub fn new() -> Self {
    let sequence = 0;
    let include_checksum = false;
    Self {
        sequence,
        include_checksum,
    }
}
/// Creates an encoder whose sequence counter starts at 0 and whose byte
/// serialization includes the checksum.
pub fn with_checksum() -> Self {
    let sequence = 0;
    let include_checksum = true;
    Self {
        sequence,
        include_checksum,
    }
}
pub fn checksum_enabled(&self) -> bool {
self.include_checksum
}
pub fn sequence(&self) -> u16 {
self.sequence
}
/// Rewinds the sequence counter so the next encoded message is numbered 0.
pub fn reset_sequence(&mut self) {
    let first: u16 = 0;
    self.sequence = first;
}
pub fn restore_sequence(&mut self, seq: u16) {
self.sequence = seq;
}
/// Encodes `data` with [`Encoder::encode`] and serializes the message to bytes.
///
/// The checksum-bearing serialization is used when the encoder was built with
/// checksums enabled, the plain one otherwise.
pub fn encode_to_bytes(
    &mut self,
    data: &RawData,
    classification: &Classification,
    context: &Context,
) -> Vec<u8> {
    let encoded = self.encode(data, classification, context);
    if !self.include_checksum {
        return encoded.to_bytes();
    }
    encoded.to_bytes_with_checksum()
}
/// Encodes `data` with [`Encoder::encode`] and records the outcome in `metrics`.
///
/// A measurement (raw size, encoded length, encoding type) is recorded only
/// when the encoded message reports an encoding type.
pub fn encode_with_metrics(
    &mut self,
    data: &RawData,
    classification: &Classification,
    context: &Context,
    metrics: &mut CompressionMetrics,
) -> EncodedMessage {
    let encoded = self.encode(data, classification, context);
    if let Some(encoding) = encoded.encoding_type() {
        let raw_len = data.raw_size();
        let encoded_len = encoded.len();
        metrics.record_encode(raw_len, encoded_len, encoding);
    }
    encoded
}
/// Encodes a single sample into a `Data` message and advances the sequence counter.
///
/// Non-finite values (NaN, ±infinity) are delegated to `encode_raw`; all other
/// values use the encoding picked by `choose_encoding`.
///
/// Payload layout: `varint(source_id) | encoding-type byte | encoded value bytes`.
/// The header timestamp is `data.timestamp / 1000`, truncated to `u32`.
pub fn encode(
    &mut self,
    data: &RawData,
    classification: &Classification,
    context: &Context,
) -> EncodedMessage {
    if !data.value.is_finite() {
        return self.encode_raw(data, classification.priority, context);
    }

    let (kind, value_bytes) = self.choose_encoding(data, context);

    let mut payload = Vec::new();
    self.encode_varint(data.source_id, &mut payload);
    payload.push(kind as u8);
    payload.extend_from_slice(&value_bytes);

    let sequence = self.next_sequence();
    let timestamp = (data.timestamp / 1000) as u32;
    EncodedMessage::new(
        MessageHeader {
            version: crate::PROTOCOL_VERSION,
            message_type: MessageType::Data,
            priority: classification.priority,
            sequence,
            timestamp,
            context_version: context.version(),
        },
        payload,
    )
}
/// Encodes a sample verbatim as a big-endian `f64` (`Raw64`) in a `Data` message.
///
/// Payload layout: `varint(source_id) | Raw64 type byte | 8 value bytes`.
/// Consumes one sequence number.
fn encode_raw(
    &mut self,
    data: &RawData,
    priority: Priority,
    context: &Context,
) -> EncodedMessage {
    let value_bytes = data.value.to_be_bytes();

    let mut payload = Vec::new();
    self.encode_varint(data.source_id, &mut payload);
    payload.push(EncodingType::Raw64 as u8);
    payload.extend_from_slice(&value_bytes);

    let sequence = self.next_sequence();
    let timestamp = (data.timestamp / 1000) as u32;
    EncodedMessage::new(
        MessageHeader {
            version: crate::PROTOCOL_VERSION,
            message_type: MessageType::Data,
            priority,
            sequence,
            timestamp,
            context_version: context.version(),
        },
        payload,
    )
}
/// Picks the most compact encoding for `data.value` and returns it together
/// with the encoded value bytes (big-endian where multi-byte).
///
/// Selection order:
/// 1. Non-finite value (NaN, ±infinity) → `Raw64`, stored verbatim.
/// 2. Value within `f64::EPSILON` of the context's last value → `Repeated` (no bytes).
/// 3. A prediction exists and the scaled, rounded delta fits → `Delta8`,
///    `Delta16` or `Delta32`.
/// 4. The `f32` round-trip error is below `0.0001` → `Raw32`.
/// 5. Otherwise → `Raw64`.
///
/// Only `data.source_id` and `data.value` are read.
fn choose_encoding(&self, data: &RawData, context: &Context) -> (EncodingType, Vec<u8>) {
    let value = data.value;

    // A float-to-int cast maps NaN to 0, so a NaN value would otherwise be
    // emitted as a zero delta. Non-finite values cannot be represented as a
    // quantized delta at all; store them verbatim.
    if !value.is_finite() {
        return (EncodingType::Raw64, value.to_be_bytes().to_vec());
    }

    if let Some(last) = context.last_value(data.source_id) {
        if (value - last).abs() < f64::EPSILON {
            return (EncodingType::Repeated, vec![]);
        }
    }

    if let Some(prediction) = context.predict(data.source_id) {
        let raw = (value - prediction.value) * context.scale_factor() as f64;
        // A non-finite prediction makes the scaled delta NaN/infinite; skip delta
        // coding instead of letting the cast turn NaN into a bogus zero delta.
        if raw.is_finite() {
            // Round half away from zero; the cast saturates at the i64 bounds.
            let rounded = if raw >= 0.0 { raw + 0.5 } else { raw - 0.5 };
            let quantized = rounded as i64;

            if (i64::from(i8::MIN)..=i64::from(i8::MAX)).contains(&quantized) {
                return (EncodingType::Delta8, vec![quantized as i8 as u8]);
            }
            if (i64::from(i16::MIN)..=i64::from(i16::MAX)).contains(&quantized) {
                return (
                    EncodingType::Delta16,
                    (quantized as i16).to_be_bytes().to_vec(),
                );
            }
            if (i64::from(i32::MIN)..=i64::from(i32::MAX)).contains(&quantized) {
                return (
                    EncodingType::Delta32,
                    (quantized as i32).to_be_bytes().to_vec(),
                );
            }
        }
    }

    let as_f32 = value as f32;
    if (as_f32 as f64 - value).abs() < 0.0001 {
        return (EncodingType::Raw32, as_f32.to_be_bytes().to_vec());
    }
    (EncodingType::Raw64, value.to_be_bytes().to_vec())
}
/// Appends `value` to `output` as a little-endian base-128 varint: seven
/// payload bits per byte, least-significant group first, with the high bit set
/// on every byte except the last.
fn encode_varint(&self, value: u32, output: &mut Vec<u8>) {
    let mut remaining = value;
    loop {
        let low_bits = (remaining & 0x7F) as u8;
        remaining >>= 7;
        if remaining == 0 {
            // Final group: continuation bit stays clear.
            output.push(low_bits);
            break;
        }
        output.push(low_bits | 0x80);
    }
}
/// Returns the current sequence number and advances the counter by one,
/// wrapping from `u16::MAX` back to 0.
fn next_sequence(&mut self) -> u16 {
    let following = self.sequence.wrapping_add(1);
    core::mem::replace(&mut self.sequence, following)
}
/// Encodes several named values from one source into a single `Multi` message.
///
/// Payload layout:
/// `varint(source_id) | Multi type byte | entry count (u8) | entries…`, where
/// each entry is `name_id (u8) | Raw32 type byte | value as big-endian f32`.
///
/// The entry count is a single byte, so at most 255 entries are written; any
/// values beyond the first 255 are dropped so that the count byte always
/// matches the entries that follow it.
///
/// The header timestamp is `timestamp / 1000`, truncated to `u32`. Consumes one
/// sequence number.
pub fn encode_multi(
    &mut self,
    values: &[(u8, f64)],
    source_id: u32,
    timestamp: u64,
    priority: Priority,
    context: &Context,
) -> EncodedMessage {
    /// Largest entry count representable in the one-byte count field.
    const MAX_ENTRIES: usize = u8::MAX as usize;
    /// name id + encoding byte + f32 value.
    const ENTRY_LEN: usize = 1 + 1 + 4;
    /// A u32 varint needs at most five bytes.
    const MAX_VARINT_LEN: usize = 5;

    let entries = &values[..values.len().min(MAX_ENTRIES)];

    let mut payload = Vec::with_capacity(MAX_VARINT_LEN + 2 + entries.len() * ENTRY_LEN);
    self.encode_varint(source_id, &mut payload);
    payload.push(EncodingType::Multi as u8);
    // Lossless: `entries.len()` is capped at `u8::MAX` above.
    payload.push(entries.len() as u8);
    for &(name_id, value) in entries {
        payload.push(name_id);
        payload.push(EncodingType::Raw32 as u8);
        payload.extend_from_slice(&(value as f32).to_be_bytes());
    }

    let header = MessageHeader {
        version: crate::PROTOCOL_VERSION,
        message_type: MessageType::Data,
        priority,
        sequence: self.next_sequence(),
        timestamp: (timestamp / 1000) as u32,
        context_version: context.version(),
    };
    EncodedMessage::new(header, payload)
}
/// Size budget, in bytes (message header + payload), that a deferred channel
/// must fit into to be added to an adaptive multi frame. Must-include channels
/// are written regardless of this budget.
const MULTI_FRAME_CAP: usize = 127;

/// Classifies every channel and packs the selected ones into one `Multi` message.
///
/// Channel selection by classified priority:
/// * `P1Critical`, `P2Important`, `P3Normal` — always written, ordered by priority.
/// * `P4Deferred` — written only while header + payload stays within
///   `MULTI_FRAME_CAP`; a channel that does not fit is skipped and later
///   (possibly smaller) ones are still tried.
/// * `P5Disposable` — never written.
///
/// Payload layout: `varint(0) | Multi type byte | entry count (u8) | entries…`,
/// each entry being `name_id | encoding-type byte | value bytes` as produced by
/// `write_channel_entry`.
///
/// The entry count is a single byte, so at most 255 entries are written; entries
/// beyond that are left out instead of overflowing the counter.
///
/// The message priority is that of the first must-include channel after
/// sorting, or `P3Normal` when there is none.
///
/// Returns the message together with the classification of every input channel,
/// in input order (including channels that were not written).
pub fn encode_multi_adaptive(
    &mut self,
    channels: &[ChannelInput],
    timestamp: u64,
    context: &Context,
    classifier: &Classifier,
) -> (EncodedMessage, Vec<Classification>) {
    /// Largest entry count representable in the one-byte count field.
    const MAX_ENTRIES: usize = u8::MAX as usize;

    let classified: Vec<(&ChannelInput, Classification)> = channels
        .iter()
        .map(|ch| {
            let raw = RawData::with_source(ch.name_id as u32, ch.value, timestamp);
            let cls = classifier.classify(&raw, context);
            (ch, cls)
        })
        .collect();

    let mut must_include: Vec<(&ChannelInput, &Classification)> = Vec::new();
    let mut deferred: Vec<(&ChannelInput, &Classification)> = Vec::new();
    for (ch, cls) in &classified {
        match cls.priority {
            Priority::P1Critical | Priority::P2Important | Priority::P3Normal => {
                must_include.push((*ch, cls));
            }
            Priority::P4Deferred => {
                deferred.push((*ch, cls));
            }
            Priority::P5Disposable => {}
        }
    }
    // Stable sort: channels of equal priority keep their input order.
    must_include.sort_by_key(|(_, cls)| cls.priority);

    let mut payload = Vec::new();
    self.encode_varint(0, &mut payload);
    payload.push(EncodingType::Multi as u8);
    // Placeholder for the entry count, patched once the entries are known.
    let count_pos = payload.len();
    payload.push(0u8);

    let mut included_count: usize = 0;
    for (ch, _cls) in must_include.iter().take(MAX_ENTRIES) {
        self.write_channel_entry(ch, context, &mut payload);
        included_count += 1;
    }

    for (ch, _cls) in &deferred {
        if included_count == MAX_ENTRIES {
            break;
        }
        // Write in place and roll back if the entry pushes the frame over budget.
        let rollback_len = payload.len();
        self.write_channel_entry(ch, context, &mut payload);
        if MessageHeader::SIZE + payload.len() <= Self::MULTI_FRAME_CAP {
            included_count += 1;
        } else {
            payload.truncate(rollback_len);
        }
    }
    // Lossless: `included_count` never exceeds `u8::MAX`.
    payload[count_pos] = included_count as u8;

    let header = MessageHeader {
        version: crate::PROTOCOL_VERSION,
        message_type: MessageType::Data,
        priority: must_include
            .first()
            .map(|(_, cls)| cls.priority)
            .unwrap_or(Priority::P3Normal),
        sequence: self.next_sequence(),
        timestamp: (timestamp / 1000) as u32,
        context_version: context.version(),
    };
    let classifications = classified.into_iter().map(|(_, cls)| cls).collect();
    (EncodedMessage::new(header, payload), classifications)
}
/// Appends one multi-frame entry for `ch` to `payload`:
/// `name_id | encoding-type byte | encoded value bytes`.
///
/// The encoding is chosen by `choose_encoding`, using the channel's `name_id`
/// as the source id. The timestamp passed along is 0; `choose_encoding` does
/// not read it.
fn write_channel_entry(&self, ch: &ChannelInput, context: &Context, payload: &mut Vec<u8>) {
    let sample = RawData::with_source(u32::from(ch.name_id), ch.value, 0);
    let (kind, value_bytes) = self.choose_encoding(&sample, context);

    payload.push(ch.name_id);
    payload.push(kind as u8);
    payload.extend_from_slice(&value_bytes);
}
/// Maps a zero-based channel index of a fixed multi frame to the source id used
/// for context lookups: `channel_index + 1`.
#[inline]
pub(crate) fn fixed_channel_source_id(channel_index: usize) -> u32 {
    let zero_based = channel_index as u32;
    zero_based + 1
}
}
/// Two-bit per-channel encoding tag used by the fixed multi-channel frame.
///
/// The discriminant is the value stored in the frame's encoding bitmap.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FixedEncoding {
    /// No data bytes follow for the channel.
    Repeated = 0b00,
    /// One data byte.
    Delta8 = 0b01,
    /// Two data bytes.
    Delta16 = 0b10,
    /// Four data bytes.
    Raw32 = 0b11,
}

impl FixedEncoding {
    /// Number of data bytes a channel with this encoding occupies.
    #[inline]
    pub(crate) const fn byte_size(self) -> usize {
        match self {
            Self::Repeated => 0,
            Self::Delta8 => 1,
            Self::Delta16 => 2,
            Self::Raw32 => 4,
        }
    }

    /// Decodes the two low bits of `bits` into an encoding tag; higher bits are
    /// ignored.
    #[inline]
    pub(crate) fn from_bits(bits: u8) -> Self {
        let code = bits & 0b11;
        if code == Self::Repeated as u8 {
            Self::Repeated
        } else if code == Self::Delta8 as u8 {
            Self::Delta8
        } else if code == Self::Delta16 as u8 {
            Self::Delta16
        } else {
            Self::Raw32
        }
    }
}
/// Number of bytes needed for the encoding bitmap of `channel_count` channels
/// at two bits per channel, i.e. `ceil(channel_count / 4)`.
///
/// Computed without the intermediate `channel_count * 2 + 7`, so it cannot
/// overflow for any `usize` input.
#[inline]
pub(crate) const fn fixed_bitmap_bytes(channel_count: usize) -> usize {
    channel_count / 4 + (channel_count % 4 != 0) as usize
}
impl Encoder {
/// Encodes up to 64 channel values into `output` as a compact fixed-layout frame.
///
/// Frame layout:
/// `marker byte | CompactHeader | encoding bitmap | channel data`.
/// * The marker is `COMPACT_MARKER_KEYFRAME` for keyframes and
///   `COMPACT_MARKER_DATA` otherwise.
/// * The bitmap holds one two-bit [`FixedEncoding`] tag per channel, four
///   channels per byte, channel 0 in the lowest bits.
/// * Channel data follows in channel order, `byte_size()` bytes per channel.
///
/// Channel `i` is looked up in `context` under `fixed_channel_source_id(i)`.
/// In a keyframe every channel is written as `Raw32`.
///
/// # Errors
///
/// * `EncodeError::PayloadTooLarge` when `values` is empty (reported with
///   `size: 0`) or holds more than 64 values.
/// * `EncodeError::BufferTooSmall` when `output` cannot hold the frame. No
///   sequence number is consumed in that case.
/// * Any error returned by `CompactHeader::write`.
///
/// On success returns the number of bytes written to the front of `output`.
pub fn encode_multi_fixed(
    &mut self,
    values: &[f64],
    context: &Context,
    keyframe: bool,
    output: &mut [u8],
) -> Result<usize> {
    if values.is_empty() {
        return Err(EncodeError::PayloadTooLarge { size: 0, max: 64 }.into());
    }
    if values.len() > 64 {
        return Err(EncodeError::PayloadTooLarge {
            size: values.len(),
            max: 64,
        }
        .into());
    }

    let count = values.len();
    let bitmap_bytes = fixed_bitmap_bytes(count);

    // First pass: decide each channel's encoding so the frame size is known
    // before anything is written or a sequence number is consumed.
    let mut encodings = [FixedEncoding::Raw32; 64];
    let mut channel_bytes = [[0u8; 4]; 64];
    let mut data_bytes: usize = 0;
    for (i, &value) in values.iter().enumerate() {
        let source_id = Self::fixed_channel_source_id(i);
        let (encoding, bytes) =
            Self::fixed_channel_encoding(value, source_id, context, keyframe);
        encodings[i] = encoding;
        channel_bytes[i] = bytes;
        data_bytes += encoding.byte_size();
    }

    let total = 1 + CompactHeader::SIZE + bitmap_bytes + data_bytes;
    if output.len() < total {
        return Err(EncodeError::BufferTooSmall {
            needed: total,
            available: output.len(),
        }
        .into());
    }

    output[0] = if keyframe {
        COMPACT_MARKER_KEYFRAME
    } else {
        COMPACT_MARKER_DATA
    };
    let header = CompactHeader::new(self.next_sequence(), (context.version() & 0xFFFF) as u16);
    header.write(&mut output[1..1 + CompactHeader::SIZE])?;

    // Second pass: fill the bitmap and the channel data side by side.
    let bitmap_start = 1 + CompactHeader::SIZE;
    let (bitmap, data) = output[bitmap_start..total].split_at_mut(bitmap_bytes);
    for b in bitmap.iter_mut() {
        *b = 0;
    }
    let mut cursor = 0;
    for (i, (encoding, bytes)) in encodings
        .iter()
        .zip(channel_bytes.iter())
        .take(count)
        .enumerate()
    {
        bitmap[i / 4] |= (*encoding as u8) << ((i % 4) * 2);
        let len = encoding.byte_size();
        data[cursor..cursor + len].copy_from_slice(&bytes[..len]);
        cursor += len;
    }
    debug_assert_eq!(cursor, data.len());
    Ok(total)
}

/// Chooses the fixed-frame encoding for one channel value and returns it with
/// the value bytes; only the first `byte_size()` bytes of the array are
/// meaningful.
fn fixed_channel_encoding(
    value: f64,
    source_id: u32,
    context: &Context,
    keyframe: bool,
) -> (FixedEncoding, [u8; 4]) {
    let raw32 = (FixedEncoding::Raw32, (value as f32).to_be_bytes());

    // Keyframes and non-finite values are always stored verbatim as f32.
    if keyframe || !value.is_finite() {
        return raw32;
    }

    if let Some(last) = context.last_value(source_id) {
        if (value - last).abs() < f64::EPSILON {
            return (FixedEncoding::Repeated, [0u8; 4]);
        }
    }

    if let Some(prediction) = context.predict(source_id) {
        let raw = (value - prediction.value) * context.scale_factor() as f64;
        // A float-to-int cast maps NaN to 0, so a non-finite scaled delta must
        // not reach the quantization below or it would be emitted as Delta8(0).
        if raw.is_finite() {
            // Round half away from zero; the cast saturates at the i64 bounds.
            let rounded = if raw >= 0.0 { raw + 0.5 } else { raw - 0.5 };
            let quantized = rounded as i64;

            if (i64::from(i8::MIN)..=i64::from(i8::MAX)).contains(&quantized) {
                return (FixedEncoding::Delta8, [quantized as i8 as u8, 0, 0, 0]);
            }
            if (i64::from(i16::MIN)..=i64::from(i16::MAX)).contains(&quantized) {
                let [hi, lo] = (quantized as i16).to_be_bytes();
                return (FixedEncoding::Delta16, [hi, lo, 0, 0]);
            }
        }
    }

    raw32
}
}
impl Default for Encoder {
fn default() -> Self {
Self::new()
}
}
/// Fluent builder that assembles an `EncodedMessage` from a header and a payload.
///
/// Starts from `MessageHeader::default()` and an empty payload; each setter
/// consumes the builder and returns it with one field replaced.
pub struct MessageBuilder {
    header: MessageHeader,
    payload: Vec<u8>,
}

impl MessageBuilder {
    /// Creates a builder with a default header and an empty payload.
    pub fn new() -> Self {
        let header = MessageHeader::default();
        let payload = Vec::new();
        Self { header, payload }
    }

    /// Applies `edit` to the header and hands the builder back.
    fn with_header(mut self, edit: impl FnOnce(&mut MessageHeader)) -> Self {
        edit(&mut self.header);
        self
    }

    /// Sets the header's message type.
    pub fn message_type(self, msg_type: MessageType) -> Self {
        self.with_header(|header| header.message_type = msg_type)
    }

    /// Sets the header's priority.
    pub fn priority(self, priority: Priority) -> Self {
        self.with_header(|header| header.priority = priority)
    }

    /// Sets the header's sequence number.
    pub fn sequence(self, seq: u16) -> Self {
        self.with_header(|header| header.sequence = seq)
    }

    /// Sets the header's timestamp.
    pub fn timestamp(self, ts: u32) -> Self {
        self.with_header(|header| header.timestamp = ts)
    }

    /// Sets the header's context version.
    pub fn context_version(self, version: u32) -> Self {
        self.with_header(|header| header.context_version = version)
    }

    /// Replaces the payload bytes.
    pub fn payload(self, payload: Vec<u8>) -> Self {
        Self { payload, ..self }
    }

    /// Consumes the builder and produces the message.
    pub fn build(self) -> EncodedMessage {
        let Self { header, payload } = self;
        EncodedMessage::new(header, payload)
    }
}
impl Default for MessageBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::classifier::Classifier;

    /// A plain sample with an empty context encodes to something smaller than
    /// the raw sample plus a full header.
    #[test]
    fn test_encode_basic() {
        let mut encoder = Encoder::new();
        let classifier = Classifier::default();
        let context = Context::new();
        let data = RawData::new(42.0, 0);
        let classification = classifier.classify(&data, &context);
        let message = encoder.encode(&data, &classification, &context);
        assert!(!message.is_empty());
        assert!(message.len() < data.raw_size() + MessageHeader::SIZE);
    }

    /// With history in the context, a nearby value is delta-encoded.
    #[test]
    fn test_encode_with_context() {
        let mut encoder = Encoder::new();
        let classifier = Classifier::default();
        let mut context = Context::new();
        for i in 0..10 {
            context.observe(&RawData::new(20.0 + i as f64 * 0.1, i as u64));
        }
        let data = RawData::new(21.0, 100);
        let classification = classifier.classify(&data, &context);
        let message = encoder.encode(&data, &classification, &context);
        let encoding = message.encoding_type();
        assert!(matches!(
            encoding,
            Some(EncodingType::Delta8) | Some(EncodingType::Delta16)
        ));
    }

    /// A value equal to the last observed one is encoded as `Repeated`.
    #[test]
    fn test_encode_repeated() {
        let mut encoder = Encoder::new();
        let classifier = Classifier::default();
        let mut context = Context::new();
        context.observe(&RawData::new(42.0, 0));
        let data = RawData::new(42.0, 1);
        let classification = classifier.classify(&data, &context);
        let message = encoder.encode(&data, &classification, &context);
        assert_eq!(message.encoding_type(), Some(EncodingType::Repeated));
    }

    /// Each encoded message consumes exactly one sequence number, starting at 0.
    #[test]
    fn test_sequence_increment() {
        let mut encoder = Encoder::new();
        let classifier = Classifier::default();
        let context = Context::new();
        let data = RawData::new(42.0, 0);
        let classification = classifier.classify(&data, &context);
        let msg1 = encoder.encode(&data, &classification, &context);
        let msg2 = encoder.encode(&data, &classification, &context);
        let msg3 = encoder.encode(&data, &classification, &context);
        assert_eq!(msg1.header.sequence, 0);
        assert_eq!(msg2.header.sequence, 1);
        assert_eq!(msg3.header.sequence, 2);
    }

    /// NaN cannot be delta-encoded and is stored verbatim as `Raw64`.
    #[test]
    fn test_encode_nan() {
        let mut encoder = Encoder::new();
        let classifier = Classifier::default();
        let context = Context::new();
        let data = RawData::new(f64::NAN, 0);
        let classification = classifier.classify(&data, &context);
        let message = encoder.encode(&data, &classification, &context);
        assert_eq!(message.encoding_type(), Some(EncodingType::Raw64));
    }

    /// `encode_multi` produces a `Multi` message.
    #[test]
    fn test_encode_multi() {
        let mut encoder = Encoder::new();
        let context = Context::new();
        let values: Vec<(u8, f64)> = vec![(1, 22.5), (2, 65.0), (3, 1013.25)];
        let message = encoder.encode_multi(&values, 42, 12345, Priority::P3Normal, &context);
        assert_eq!(message.encoding_type(), Some(EncodingType::Multi));
    }

    /// Pins the exact varint bytes: 7 bits per byte, low group first, high bit
    /// set on every byte but the last.
    #[test]
    fn test_varint_encoding() {
        let encoder = Encoder::new();

        let mut out0 = Vec::new();
        encoder.encode_varint(0, &mut out0);
        assert_eq!(out0, vec![0x00]);

        let mut out1 = Vec::new();
        encoder.encode_varint(42, &mut out1);
        assert_eq!(out1, vec![42]);

        // Largest single-byte value and smallest two-byte value.
        let mut out_127 = Vec::new();
        encoder.encode_varint(127, &mut out_127);
        assert_eq!(out_127, vec![0x7F]);
        let mut out_128 = Vec::new();
        encoder.encode_varint(128, &mut out_128);
        assert_eq!(out_128, vec![0x80, 0x01]);

        let mut out2 = Vec::new();
        encoder.encode_varint(200, &mut out2);
        assert_eq!(out2, vec![0xC8, 0x01]);

        let mut out3 = Vec::new();
        encoder.encode_varint(100000, &mut out3);
        assert_eq!(out3, vec![0xA0, 0x8D, 0x06]);

        let mut out_max = Vec::new();
        encoder.encode_varint(u32::MAX, &mut out_max);
        assert_eq!(out_max, vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F]);
    }

    /// Every builder setter ends up in the built message.
    #[test]
    fn test_message_builder() {
        let message = MessageBuilder::new()
            .message_type(MessageType::Sync)
            .priority(Priority::P1Critical)
            .sequence(42)
            .timestamp(12345)
            .payload(vec![1, 2, 3])
            .build();
        assert_eq!(message.header.message_type, MessageType::Sync);
        assert_eq!(message.header.priority, Priority::P1Critical);
        assert_eq!(message.header.sequence, 42);
        assert_eq!(message.header.timestamp, 12345);
        assert_eq!(message.payload, vec![1, 2, 3]);
    }
}