Skip to main content

middleware_core/
serialization.rs

1use core_types::{ErrorCode, ErrorDomain, RtError};
2use data_model::{ControlEnvelope, DataEnvelope, PacketHeader, SchemaId, SchemaVersion};
3
4const CONTROL_SEPARATOR: u8 = 0;
5
6pub trait SerializationManager {
7    fn encode_control(&self, envelope: &ControlEnvelope) -> Result<Vec<u8>, RtError>;
8    fn decode_control(&self, bytes: &[u8]) -> Result<ControlEnvelope, RtError>;
9    fn encode_data(&self, envelope: &DataEnvelope) -> Result<Vec<u8>, RtError>;
10}
11
12#[derive(Default)]
13pub struct SimpleSerializationManager;
14
15impl SerializationManager for SimpleSerializationManager {
16    fn encode_control(&self, envelope: &ControlEnvelope) -> Result<Vec<u8>, RtError> {
17        let mut bytes = envelope.label.as_bytes().to_vec();
18        bytes.push(CONTROL_SEPARATOR);
19        bytes.extend_from_slice(&envelope.payload);
20        Ok(bytes)
21    }
22
23    fn decode_control(&self, bytes: &[u8]) -> Result<ControlEnvelope, RtError> {
24        let Some(separator) = bytes.iter().position(|b| *b == CONTROL_SEPARATOR) else {
25            return Err(RtError::new(
26                ErrorCode::CodecError,
27                ErrorDomain::DataModel,
28                false,
29                "invalid control payload: missing separator",
30            ));
31        };
32
33        let label = String::from_utf8(bytes[..separator].to_vec()).map_err(|_| {
34            RtError::new(
35                ErrorCode::CodecError,
36                ErrorDomain::DataModel,
37                false,
38                "invalid control payload: label is not utf8",
39            )
40        })?;
41
42        let payload = bytes[separator + 1..].to_vec();
43
44        Ok(ControlEnvelope {
45            header: PacketHeader {
46                version: 1,
47                domain: core_types::TransportDomain::Local,
48                session_id: None,
49                stream_id: None,
50                sequence: 0,
51                ack: None,
52                timestamp: core_types::Timestamp::now(),
53                schema_id: SchemaId::new("decoded.control"),
54                schema_version: SchemaVersion(1),
55            },
56            label,
57            payload,
58        })
59    }
60
61    fn encode_data(&self, envelope: &DataEnvelope) -> Result<Vec<u8>, RtError> {
62        match &envelope.payload {
63            data_model::DataPayload::Inline(buf) => Ok(buf.clone()),
64            data_model::DataPayload::External(external) => {
65                let marker = format!(
66                    "ext:{}:{}:{}",
67                    external.buffer_id.0, external.offset, external.len
68                );
69                Ok(marker.into_bytes())
70            }
71        }
72    }
73}