liminal/durability/conversation/
codec.rs1use super::ConversationEvent;
2use crate::durability::DurabilityError;
3
4const EVENT_MAGIC: [u8; 4] = *b"LCE1";
5const TAG_MESSAGE_RECEIVED: u8 = 1;
6const TAG_PROCESSING_STARTED: u8 = 2;
7const TAG_STEP_COMPLETED: u8 = 3;
8const TAG_PROCESSING_FINISHED: u8 = 4;
9const TAG_ERROR_OCCURRED: u8 = 5;
10
11impl ConversationEvent {
12 pub fn serialize(&self) -> Result<Vec<u8>, DurabilityError> {
18 let mut bytes = Vec::new();
19 bytes.extend_from_slice(&EVENT_MAGIC);
20 match self {
21 Self::MessageReceived {
22 message_id,
23 received_at,
24 } => {
25 bytes.push(TAG_MESSAGE_RECEIVED);
26 write_string(&mut bytes, message_id)?;
27 write_u64(&mut bytes, *received_at);
28 }
29 Self::ProcessingStarted { message_id } => {
30 bytes.push(TAG_PROCESSING_STARTED);
31 write_string(&mut bytes, message_id)?;
32 }
33 Self::StepCompleted {
34 message_id,
35 step_index,
36 output,
37 } => {
38 bytes.push(TAG_STEP_COMPLETED);
39 write_string(&mut bytes, message_id)?;
40 write_u32(&mut bytes, *step_index);
41 write_bytes(&mut bytes, output)?;
42 }
43 Self::ProcessingFinished { message_id } => {
44 bytes.push(TAG_PROCESSING_FINISHED);
45 write_string(&mut bytes, message_id)?;
46 }
47 Self::ErrorOccurred { message_id, error } => {
48 bytes.push(TAG_ERROR_OCCURRED);
49 write_string(&mut bytes, message_id)?;
50 write_string(&mut bytes, error)?;
51 }
52 }
53 Ok(bytes)
54 }
55
56 pub fn deserialize(bytes: &[u8]) -> Result<Self, DurabilityError> {
63 let mut reader = EventReader::new(bytes);
64 reader.read_magic()?;
65 let tag = reader.read_u8()?;
66 let event = match tag {
67 TAG_MESSAGE_RECEIVED => Self::MessageReceived {
68 message_id: reader.read_string()?,
69 received_at: reader.read_u64()?,
70 },
71 TAG_PROCESSING_STARTED => Self::ProcessingStarted {
72 message_id: reader.read_string()?,
73 },
74 TAG_STEP_COMPLETED => Self::StepCompleted {
75 message_id: reader.read_string()?,
76 step_index: reader.read_u32()?,
77 output: reader.read_bytes()?.to_vec(),
78 },
79 TAG_PROCESSING_FINISHED => Self::ProcessingFinished {
80 message_id: reader.read_string()?,
81 },
82 TAG_ERROR_OCCURRED => Self::ErrorOccurred {
83 message_id: reader.read_string()?,
84 error: reader.read_string()?,
85 },
86 value => {
87 return Err(DurabilityError::EnvelopeError(format!(
88 "invalid conversation event tag {value}"
89 )));
90 }
91 };
92 reader.finish()?;
93 Ok(event)
94 }
95}
96
97fn write_string(bytes: &mut Vec<u8>, value: &str) -> Result<(), DurabilityError> {
98 write_bytes(bytes, value.as_bytes())
99}
100
101fn write_bytes(bytes: &mut Vec<u8>, value: &[u8]) -> Result<(), DurabilityError> {
102 let len = u64::try_from(value.len()).map_err(|error| {
103 DurabilityError::EnvelopeError(format!("field length cannot be encoded: {error}"))
104 })?;
105 write_u64(bytes, len);
106 bytes.extend_from_slice(value);
107 Ok(())
108}
109
110fn write_u32(bytes: &mut Vec<u8>, value: u32) {
111 bytes.extend_from_slice(&value.to_be_bytes());
112}
113
114fn write_u64(bytes: &mut Vec<u8>, value: u64) {
115 bytes.extend_from_slice(&value.to_be_bytes());
116}
117
118struct EventReader<'a> {
119 bytes: &'a [u8],
120 cursor: usize,
121}
122
123impl<'a> EventReader<'a> {
124 const fn new(bytes: &'a [u8]) -> Self {
125 Self { bytes, cursor: 0 }
126 }
127
128 fn read_magic(&mut self) -> Result<(), DurabilityError> {
129 let magic = self.read_exact(EVENT_MAGIC.len())?;
130 if magic == EVENT_MAGIC {
131 Ok(())
132 } else {
133 Err(DurabilityError::EnvelopeError(
134 "invalid conversation event magic".to_owned(),
135 ))
136 }
137 }
138
139 fn read_string(&mut self) -> Result<String, DurabilityError> {
140 let bytes = self.read_bytes()?;
141 String::from_utf8(bytes.to_vec()).map_err(|error| {
142 DurabilityError::EnvelopeError(format!("invalid UTF-8 string field: {error}"))
143 })
144 }
145
146 fn read_bytes(&mut self) -> Result<&'a [u8], DurabilityError> {
147 let len_u64 = self.read_u64()?;
148 let len = usize::try_from(len_u64).map_err(|error| {
149 DurabilityError::EnvelopeError(format!("encoded length cannot fit memory: {error}"))
150 })?;
151 self.read_exact(len)
152 }
153
154 fn read_u8(&mut self) -> Result<u8, DurabilityError> {
155 let byte = self.read_exact(1)?;
156 Ok(byte[0])
157 }
158
159 fn read_u32(&mut self) -> Result<u32, DurabilityError> {
160 let bytes = self.read_exact(4)?;
161 let mut array = [0_u8; 4];
162 array.copy_from_slice(bytes);
163 Ok(u32::from_be_bytes(array))
164 }
165
166 fn read_u64(&mut self) -> Result<u64, DurabilityError> {
167 let bytes = self.read_exact(8)?;
168 let mut array = [0_u8; 8];
169 array.copy_from_slice(bytes);
170 Ok(u64::from_be_bytes(array))
171 }
172
173 fn read_exact(&mut self, len: usize) -> Result<&'a [u8], DurabilityError> {
174 let end = self.cursor.checked_add(len).ok_or_else(|| {
175 DurabilityError::EnvelopeError("conversation event cursor overflow".to_owned())
176 })?;
177 if end > self.bytes.len() {
178 return Err(DurabilityError::EnvelopeError(
179 "truncated conversation event bytes".to_owned(),
180 ));
181 }
182 let slice = &self.bytes[self.cursor..end];
183 self.cursor = end;
184 Ok(slice)
185 }
186
187 fn finish(&self) -> Result<(), DurabilityError> {
188 if self.cursor == self.bytes.len() {
189 Ok(())
190 } else {
191 Err(DurabilityError::EnvelopeError(
192 "trailing conversation event bytes".to_owned(),
193 ))
194 }
195 }
196}