Skip to main content

liminal/durability/channel/
codec.rs

1use super::{CausalContext, MessageEnvelope};
2use crate::durability::DurabilityError;
3
4const ENVELOPE_MAGIC: [u8; 4] = *b"LME1";
5const ABSENT: u8 = 0;
6const PRESENT: u8 = 1;
7
8impl MessageEnvelope {
9    /// Serializes the envelope into a canonical binary representation.
10    ///
11    /// # Errors
12    ///
13    /// Returns [`DurabilityError::EnvelopeError`] if a field length cannot be
14    /// represented in the storage format.
15    pub fn serialize(&self) -> Result<Vec<u8>, DurabilityError> {
16        let mut bytes = Vec::new();
17        bytes.extend_from_slice(&ENVELOPE_MAGIC);
18        write_bytes(&mut bytes, &self.payload)?;
19        write_optional_causal_context(&mut bytes, self.causal_context.as_ref())?;
20        write_u64(&mut bytes, self.timestamp);
21        write_string(&mut bytes, &self.publisher_id)?;
22        write_optional_string(&mut bytes, self.idempotency_key.as_deref())?;
23        Ok(bytes)
24    }
25
26    /// Deserializes an envelope previously produced by [`Self::serialize`].
27    ///
28    /// # Errors
29    ///
30    /// Returns [`DurabilityError::EnvelopeError`] when bytes are malformed,
31    /// truncated, contain invalid UTF-8, or carry trailing data.
32    pub fn deserialize(bytes: &[u8]) -> Result<Self, DurabilityError> {
33        let mut reader = EnvelopeReader::new(bytes);
34        reader.read_magic()?;
35        let payload = reader.read_bytes()?.to_vec();
36        let causal_context = reader.read_optional_causal_context()?;
37        let timestamp = reader.read_u64()?;
38        let publisher_id = reader.read_string()?;
39        let idempotency_key = reader.read_optional_string()?;
40        reader.finish()?;
41
42        Ok(Self {
43            payload,
44            causal_context,
45            timestamp,
46            publisher_id,
47            idempotency_key,
48        })
49    }
50}
51
52fn write_optional_causal_context(
53    bytes: &mut Vec<u8>,
54    context: Option<&CausalContext>,
55) -> Result<(), DurabilityError> {
56    match context {
57        Some(context) => {
58            bytes.push(PRESENT);
59            write_optional_string(bytes, context.parent_id.as_deref())?;
60            write_optional_u64(bytes, context.vector_clock_entry);
61        }
62        None => bytes.push(ABSENT),
63    }
64    Ok(())
65}
66
67fn write_optional_string(bytes: &mut Vec<u8>, value: Option<&str>) -> Result<(), DurabilityError> {
68    match value {
69        Some(value) => {
70            bytes.push(PRESENT);
71            write_string(bytes, value)?;
72        }
73        None => bytes.push(ABSENT),
74    }
75    Ok(())
76}
77
78fn write_optional_u64(bytes: &mut Vec<u8>, value: Option<u64>) {
79    match value {
80        Some(value) => {
81            bytes.push(PRESENT);
82            write_u64(bytes, value);
83        }
84        None => bytes.push(ABSENT),
85    }
86}
87
88fn write_string(bytes: &mut Vec<u8>, value: &str) -> Result<(), DurabilityError> {
89    write_bytes(bytes, value.as_bytes())
90}
91
92fn write_bytes(bytes: &mut Vec<u8>, value: &[u8]) -> Result<(), DurabilityError> {
93    let len = u64::try_from(value.len()).map_err(|error| {
94        DurabilityError::EnvelopeError(format!("field length cannot be encoded: {error}"))
95    })?;
96    write_u64(bytes, len);
97    bytes.extend_from_slice(value);
98    Ok(())
99}
100
101fn write_u64(bytes: &mut Vec<u8>, value: u64) {
102    bytes.extend_from_slice(&value.to_be_bytes());
103}
104
105struct EnvelopeReader<'a> {
106    bytes: &'a [u8],
107    cursor: usize,
108}
109
110impl<'a> EnvelopeReader<'a> {
111    const fn new(bytes: &'a [u8]) -> Self {
112        Self { bytes, cursor: 0 }
113    }
114
115    fn read_magic(&mut self) -> Result<(), DurabilityError> {
116        let magic = self.read_exact(ENVELOPE_MAGIC.len())?;
117        if magic == ENVELOPE_MAGIC {
118            Ok(())
119        } else {
120            Err(DurabilityError::EnvelopeError(
121                "invalid envelope magic".to_owned(),
122            ))
123        }
124    }
125
126    fn read_optional_causal_context(&mut self) -> Result<Option<CausalContext>, DurabilityError> {
127        if self.read_presence()? {
128            Ok(Some(CausalContext {
129                parent_id: self.read_optional_string()?,
130                vector_clock_entry: self.read_optional_u64()?,
131            }))
132        } else {
133            Ok(None)
134        }
135    }
136
137    fn read_optional_string(&mut self) -> Result<Option<String>, DurabilityError> {
138        if self.read_presence()? {
139            self.read_string().map(Some)
140        } else {
141            Ok(None)
142        }
143    }
144
145    fn read_optional_u64(&mut self) -> Result<Option<u64>, DurabilityError> {
146        if self.read_presence()? {
147            self.read_u64().map(Some)
148        } else {
149            Ok(None)
150        }
151    }
152
153    fn read_string(&mut self) -> Result<String, DurabilityError> {
154        let bytes = self.read_bytes()?;
155        String::from_utf8(bytes.to_vec()).map_err(|error| {
156            DurabilityError::EnvelopeError(format!("invalid UTF-8 string field: {error}"))
157        })
158    }
159
160    fn read_bytes(&mut self) -> Result<&'a [u8], DurabilityError> {
161        let len_u64 = self.read_u64()?;
162        let len = usize::try_from(len_u64).map_err(|error| {
163            DurabilityError::EnvelopeError(format!("encoded length cannot fit memory: {error}"))
164        })?;
165        self.read_exact(len)
166    }
167
168    fn read_presence(&mut self) -> Result<bool, DurabilityError> {
169        match self.read_u8()? {
170            ABSENT => Ok(false),
171            PRESENT => Ok(true),
172            value => Err(DurabilityError::EnvelopeError(format!(
173                "invalid option marker {value}"
174            ))),
175        }
176    }
177
178    fn read_u8(&mut self) -> Result<u8, DurabilityError> {
179        let byte = self.read_exact(1)?;
180        Ok(byte[0])
181    }
182
183    fn read_u64(&mut self) -> Result<u64, DurabilityError> {
184        let bytes = self.read_exact(8)?;
185        let mut array = [0_u8; 8];
186        array.copy_from_slice(bytes);
187        Ok(u64::from_be_bytes(array))
188    }
189
190    fn read_exact(&mut self, len: usize) -> Result<&'a [u8], DurabilityError> {
191        let end = self
192            .cursor
193            .checked_add(len)
194            .ok_or_else(|| DurabilityError::EnvelopeError("envelope cursor overflow".to_owned()))?;
195        if end > self.bytes.len() {
196            return Err(DurabilityError::EnvelopeError(
197                "truncated envelope bytes".to_owned(),
198            ));
199        }
200        let slice = &self.bytes[self.cursor..end];
201        self.cursor = end;
202        Ok(slice)
203    }
204
205    fn finish(&self) -> Result<(), DurabilityError> {
206        if self.cursor == self.bytes.len() {
207            Ok(())
208        } else {
209            Err(DurabilityError::EnvelopeError(
210                "trailing envelope bytes".to_owned(),
211            ))
212        }
213    }
214}