Skip to main content

liminal/durability/conversation/
codec.rs

1use super::ConversationEvent;
2use crate::durability::DurabilityError;
3
4const EVENT_MAGIC: [u8; 4] = *b"LCE1";
5const TAG_MESSAGE_RECEIVED: u8 = 1;
6const TAG_PROCESSING_STARTED: u8 = 2;
7const TAG_STEP_COMPLETED: u8 = 3;
8const TAG_PROCESSING_FINISHED: u8 = 4;
9const TAG_ERROR_OCCURRED: u8 = 5;
10
11impl ConversationEvent {
12    /// Serializes this event into deterministic storage bytes.
13    ///
14    /// # Errors
15    ///
16    /// Returns [`DurabilityError::EnvelopeError`] when a field length cannot be encoded.
17    pub fn serialize(&self) -> Result<Vec<u8>, DurabilityError> {
18        let mut bytes = Vec::new();
19        bytes.extend_from_slice(&EVENT_MAGIC);
20        match self {
21            Self::MessageReceived {
22                message_id,
23                received_at,
24            } => {
25                bytes.push(TAG_MESSAGE_RECEIVED);
26                write_string(&mut bytes, message_id)?;
27                write_u64(&mut bytes, *received_at);
28            }
29            Self::ProcessingStarted { message_id } => {
30                bytes.push(TAG_PROCESSING_STARTED);
31                write_string(&mut bytes, message_id)?;
32            }
33            Self::StepCompleted {
34                message_id,
35                step_index,
36                output,
37            } => {
38                bytes.push(TAG_STEP_COMPLETED);
39                write_string(&mut bytes, message_id)?;
40                write_u32(&mut bytes, *step_index);
41                write_bytes(&mut bytes, output)?;
42            }
43            Self::ProcessingFinished { message_id } => {
44                bytes.push(TAG_PROCESSING_FINISHED);
45                write_string(&mut bytes, message_id)?;
46            }
47            Self::ErrorOccurred { message_id, error } => {
48                bytes.push(TAG_ERROR_OCCURRED);
49                write_string(&mut bytes, message_id)?;
50                write_string(&mut bytes, error)?;
51            }
52        }
53        Ok(bytes)
54    }
55
56    /// Deserializes an event previously produced by [`Self::serialize`].
57    ///
58    /// # Errors
59    ///
60    /// Returns [`DurabilityError::EnvelopeError`] when bytes are malformed,
61    /// truncated, contain invalid UTF-8, use an unknown tag, or carry trailing data.
62    pub fn deserialize(bytes: &[u8]) -> Result<Self, DurabilityError> {
63        let mut reader = EventReader::new(bytes);
64        reader.read_magic()?;
65        let tag = reader.read_u8()?;
66        let event = match tag {
67            TAG_MESSAGE_RECEIVED => Self::MessageReceived {
68                message_id: reader.read_string()?,
69                received_at: reader.read_u64()?,
70            },
71            TAG_PROCESSING_STARTED => Self::ProcessingStarted {
72                message_id: reader.read_string()?,
73            },
74            TAG_STEP_COMPLETED => Self::StepCompleted {
75                message_id: reader.read_string()?,
76                step_index: reader.read_u32()?,
77                output: reader.read_bytes()?.to_vec(),
78            },
79            TAG_PROCESSING_FINISHED => Self::ProcessingFinished {
80                message_id: reader.read_string()?,
81            },
82            TAG_ERROR_OCCURRED => Self::ErrorOccurred {
83                message_id: reader.read_string()?,
84                error: reader.read_string()?,
85            },
86            value => {
87                return Err(DurabilityError::EnvelopeError(format!(
88                    "invalid conversation event tag {value}"
89                )));
90            }
91        };
92        reader.finish()?;
93        Ok(event)
94    }
95}
96
97fn write_string(bytes: &mut Vec<u8>, value: &str) -> Result<(), DurabilityError> {
98    write_bytes(bytes, value.as_bytes())
99}
100
101fn write_bytes(bytes: &mut Vec<u8>, value: &[u8]) -> Result<(), DurabilityError> {
102    let len = u64::try_from(value.len()).map_err(|error| {
103        DurabilityError::EnvelopeError(format!("field length cannot be encoded: {error}"))
104    })?;
105    write_u64(bytes, len);
106    bytes.extend_from_slice(value);
107    Ok(())
108}
109
110fn write_u32(bytes: &mut Vec<u8>, value: u32) {
111    bytes.extend_from_slice(&value.to_be_bytes());
112}
113
114fn write_u64(bytes: &mut Vec<u8>, value: u64) {
115    bytes.extend_from_slice(&value.to_be_bytes());
116}
117
118struct EventReader<'a> {
119    bytes: &'a [u8],
120    cursor: usize,
121}
122
123impl<'a> EventReader<'a> {
124    const fn new(bytes: &'a [u8]) -> Self {
125        Self { bytes, cursor: 0 }
126    }
127
128    fn read_magic(&mut self) -> Result<(), DurabilityError> {
129        let magic = self.read_exact(EVENT_MAGIC.len())?;
130        if magic == EVENT_MAGIC {
131            Ok(())
132        } else {
133            Err(DurabilityError::EnvelopeError(
134                "invalid conversation event magic".to_owned(),
135            ))
136        }
137    }
138
139    fn read_string(&mut self) -> Result<String, DurabilityError> {
140        let bytes = self.read_bytes()?;
141        String::from_utf8(bytes.to_vec()).map_err(|error| {
142            DurabilityError::EnvelopeError(format!("invalid UTF-8 string field: {error}"))
143        })
144    }
145
146    fn read_bytes(&mut self) -> Result<&'a [u8], DurabilityError> {
147        let len_u64 = self.read_u64()?;
148        let len = usize::try_from(len_u64).map_err(|error| {
149            DurabilityError::EnvelopeError(format!("encoded length cannot fit memory: {error}"))
150        })?;
151        self.read_exact(len)
152    }
153
154    fn read_u8(&mut self) -> Result<u8, DurabilityError> {
155        let byte = self.read_exact(1)?;
156        Ok(byte[0])
157    }
158
159    fn read_u32(&mut self) -> Result<u32, DurabilityError> {
160        let bytes = self.read_exact(4)?;
161        let mut array = [0_u8; 4];
162        array.copy_from_slice(bytes);
163        Ok(u32::from_be_bytes(array))
164    }
165
166    fn read_u64(&mut self) -> Result<u64, DurabilityError> {
167        let bytes = self.read_exact(8)?;
168        let mut array = [0_u8; 8];
169        array.copy_from_slice(bytes);
170        Ok(u64::from_be_bytes(array))
171    }
172
173    fn read_exact(&mut self, len: usize) -> Result<&'a [u8], DurabilityError> {
174        let end = self.cursor.checked_add(len).ok_or_else(|| {
175            DurabilityError::EnvelopeError("conversation event cursor overflow".to_owned())
176        })?;
177        if end > self.bytes.len() {
178            return Err(DurabilityError::EnvelopeError(
179                "truncated conversation event bytes".to_owned(),
180            ));
181        }
182        let slice = &self.bytes[self.cursor..end];
183        self.cursor = end;
184        Ok(slice)
185    }
186
187    fn finish(&self) -> Result<(), DurabilityError> {
188        if self.cursor == self.bytes.len() {
189            Ok(())
190        } else {
191            Err(DurabilityError::EnvelopeError(
192                "trailing conversation event bytes".to_owned(),
193            ))
194        }
195    }
196}