liminal/durability/channel/
codec.rs1use super::{CausalContext, MessageEnvelope};
2use crate::durability::DurabilityError;
3
4const ENVELOPE_MAGIC: [u8; 4] = *b"LME1";
5const ABSENT: u8 = 0;
6const PRESENT: u8 = 1;
7
8impl MessageEnvelope {
9 pub fn serialize(&self) -> Result<Vec<u8>, DurabilityError> {
16 let mut bytes = Vec::new();
17 bytes.extend_from_slice(&ENVELOPE_MAGIC);
18 write_bytes(&mut bytes, &self.payload)?;
19 write_optional_causal_context(&mut bytes, self.causal_context.as_ref())?;
20 write_u64(&mut bytes, self.timestamp);
21 write_string(&mut bytes, &self.publisher_id)?;
22 write_optional_string(&mut bytes, self.idempotency_key.as_deref())?;
23 Ok(bytes)
24 }
25
26 pub fn deserialize(bytes: &[u8]) -> Result<Self, DurabilityError> {
33 let mut reader = EnvelopeReader::new(bytes);
34 reader.read_magic()?;
35 let payload = reader.read_bytes()?.to_vec();
36 let causal_context = reader.read_optional_causal_context()?;
37 let timestamp = reader.read_u64()?;
38 let publisher_id = reader.read_string()?;
39 let idempotency_key = reader.read_optional_string()?;
40 reader.finish()?;
41
42 Ok(Self {
43 payload,
44 causal_context,
45 timestamp,
46 publisher_id,
47 idempotency_key,
48 })
49 }
50}
51
52fn write_optional_causal_context(
53 bytes: &mut Vec<u8>,
54 context: Option<&CausalContext>,
55) -> Result<(), DurabilityError> {
56 match context {
57 Some(context) => {
58 bytes.push(PRESENT);
59 write_optional_string(bytes, context.parent_id.as_deref())?;
60 write_optional_u64(bytes, context.vector_clock_entry);
61 }
62 None => bytes.push(ABSENT),
63 }
64 Ok(())
65}
66
67fn write_optional_string(bytes: &mut Vec<u8>, value: Option<&str>) -> Result<(), DurabilityError> {
68 match value {
69 Some(value) => {
70 bytes.push(PRESENT);
71 write_string(bytes, value)?;
72 }
73 None => bytes.push(ABSENT),
74 }
75 Ok(())
76}
77
78fn write_optional_u64(bytes: &mut Vec<u8>, value: Option<u64>) {
79 match value {
80 Some(value) => {
81 bytes.push(PRESENT);
82 write_u64(bytes, value);
83 }
84 None => bytes.push(ABSENT),
85 }
86}
87
88fn write_string(bytes: &mut Vec<u8>, value: &str) -> Result<(), DurabilityError> {
89 write_bytes(bytes, value.as_bytes())
90}
91
92fn write_bytes(bytes: &mut Vec<u8>, value: &[u8]) -> Result<(), DurabilityError> {
93 let len = u64::try_from(value.len()).map_err(|error| {
94 DurabilityError::EnvelopeError(format!("field length cannot be encoded: {error}"))
95 })?;
96 write_u64(bytes, len);
97 bytes.extend_from_slice(value);
98 Ok(())
99}
100
101fn write_u64(bytes: &mut Vec<u8>, value: u64) {
102 bytes.extend_from_slice(&value.to_be_bytes());
103}
104
105struct EnvelopeReader<'a> {
106 bytes: &'a [u8],
107 cursor: usize,
108}
109
110impl<'a> EnvelopeReader<'a> {
111 const fn new(bytes: &'a [u8]) -> Self {
112 Self { bytes, cursor: 0 }
113 }
114
115 fn read_magic(&mut self) -> Result<(), DurabilityError> {
116 let magic = self.read_exact(ENVELOPE_MAGIC.len())?;
117 if magic == ENVELOPE_MAGIC {
118 Ok(())
119 } else {
120 Err(DurabilityError::EnvelopeError(
121 "invalid envelope magic".to_owned(),
122 ))
123 }
124 }
125
126 fn read_optional_causal_context(&mut self) -> Result<Option<CausalContext>, DurabilityError> {
127 if self.read_presence()? {
128 Ok(Some(CausalContext {
129 parent_id: self.read_optional_string()?,
130 vector_clock_entry: self.read_optional_u64()?,
131 }))
132 } else {
133 Ok(None)
134 }
135 }
136
137 fn read_optional_string(&mut self) -> Result<Option<String>, DurabilityError> {
138 if self.read_presence()? {
139 self.read_string().map(Some)
140 } else {
141 Ok(None)
142 }
143 }
144
145 fn read_optional_u64(&mut self) -> Result<Option<u64>, DurabilityError> {
146 if self.read_presence()? {
147 self.read_u64().map(Some)
148 } else {
149 Ok(None)
150 }
151 }
152
153 fn read_string(&mut self) -> Result<String, DurabilityError> {
154 let bytes = self.read_bytes()?;
155 String::from_utf8(bytes.to_vec()).map_err(|error| {
156 DurabilityError::EnvelopeError(format!("invalid UTF-8 string field: {error}"))
157 })
158 }
159
160 fn read_bytes(&mut self) -> Result<&'a [u8], DurabilityError> {
161 let len_u64 = self.read_u64()?;
162 let len = usize::try_from(len_u64).map_err(|error| {
163 DurabilityError::EnvelopeError(format!("encoded length cannot fit memory: {error}"))
164 })?;
165 self.read_exact(len)
166 }
167
168 fn read_presence(&mut self) -> Result<bool, DurabilityError> {
169 match self.read_u8()? {
170 ABSENT => Ok(false),
171 PRESENT => Ok(true),
172 value => Err(DurabilityError::EnvelopeError(format!(
173 "invalid option marker {value}"
174 ))),
175 }
176 }
177
178 fn read_u8(&mut self) -> Result<u8, DurabilityError> {
179 let byte = self.read_exact(1)?;
180 Ok(byte[0])
181 }
182
183 fn read_u64(&mut self) -> Result<u64, DurabilityError> {
184 let bytes = self.read_exact(8)?;
185 let mut array = [0_u8; 8];
186 array.copy_from_slice(bytes);
187 Ok(u64::from_be_bytes(array))
188 }
189
190 fn read_exact(&mut self, len: usize) -> Result<&'a [u8], DurabilityError> {
191 let end = self
192 .cursor
193 .checked_add(len)
194 .ok_or_else(|| DurabilityError::EnvelopeError("envelope cursor overflow".to_owned()))?;
195 if end > self.bytes.len() {
196 return Err(DurabilityError::EnvelopeError(
197 "truncated envelope bytes".to_owned(),
198 ));
199 }
200 let slice = &self.bytes[self.cursor..end];
201 self.cursor = end;
202 Ok(slice)
203 }
204
205 fn finish(&self) -> Result<(), DurabilityError> {
206 if self.cursor == self.bytes.len() {
207 Ok(())
208 } else {
209 Err(DurabilityError::EnvelopeError(
210 "trailing envelope bytes".to_owned(),
211 ))
212 }
213 }
214}