cassandra_protocol/frame/
frame_decoder.rs1use crate::compression::{Compression, CompressionError};
2use crate::crc::{crc24, crc32};
3use crate::error::{Error, Result};
4use crate::frame::{
5 Envelope, ParseEnvelopeError, COMPRESSED_FRAME_HEADER_LENGTH, ENVELOPE_HEADER_LEN,
6 FRAME_TRAILER_LENGTH, MAX_FRAME_SIZE, PAYLOAD_SIZE_LIMIT, UNCOMPRESSED_FRAME_HEADER_LENGTH,
7};
8use lz4_flex::decompress;
9use std::convert::TryInto;
10use std::io;
11
12#[inline]
13fn create_unexpected_self_contained_error() -> Error {
14 "Found self-contained frame while waiting for non self-contained continuation!".into()
15}
16
17#[inline]
18fn create_header_crc_mismatch_error(computed_crc: i32, header_crc24: i32) -> Error {
19 format!("Header CRC mismatch - expected {header_crc24}, found {computed_crc}.",).into()
20}
21
22#[inline]
23fn create_payload_crc_mismatch_error(computed_crc: u32, payload_crc32: u32) -> Error {
24 format!("Payload CRC mismatch - read {payload_crc32}, computed {computed_crc}.",).into()
25}
26
27fn extract_envelopes(buffer: &[u8], compression: Compression) -> Result<(usize, Vec<Envelope>)> {
28 let mut current_pos = 0;
29 let mut envelopes = vec![];
30
31 loop {
32 match Envelope::from_buffer(&buffer[current_pos..], compression) {
33 Ok(envelope) => {
34 envelopes.push(envelope.envelope);
35 current_pos += envelope.envelope_len;
36 }
37 Err(ParseEnvelopeError::NotEnoughBytes) => break,
38 Err(error) => return Err(error.to_string().into()),
39 }
40 }
41
42 Ok((current_pos, envelopes))
43}
44
45fn try_decode_envelopes_with_spare_data(
46 buffer: &mut Vec<u8>,
47 compression: Compression,
48) -> Result<(Vec<Envelope>, Vec<u8>)> {
49 let (current_pos, envelopes) = extract_envelopes(buffer.as_slice(), compression)?;
50 Ok((envelopes, buffer.split_off(current_pos)))
51}
52
53fn try_decode_envelopes_without_spare_data(buffer: &[u8]) -> Result<Vec<Envelope>> {
54 let (_, envelopes) = extract_envelopes(buffer, Compression::None)?;
55 Ok(envelopes)
56}
57
58pub trait FrameDecoder {
61 fn consume(&mut self, data: &mut Vec<u8>, compression: Compression) -> Result<Vec<Envelope>>;
65}
66
/// Decoder that parses envelopes directly from the byte stream, without any
/// frame header, CRC or trailer handling.
#[derive(Debug, Clone)]
pub struct LegacyFrameDecoder {
    /// Bytes received so far that don't yet form a complete envelope.
    buffer: Vec<u8>,
}
72
73impl Default for LegacyFrameDecoder {
74 fn default() -> Self {
75 Self {
76 buffer: Vec::with_capacity(MAX_FRAME_SIZE),
77 }
78 }
79}
80
81impl FrameDecoder for LegacyFrameDecoder {
82 fn consume(&mut self, data: &mut Vec<u8>, compression: Compression) -> Result<Vec<Envelope>> {
83 if self.buffer.is_empty() {
84 let (envelopes, buffer) = try_decode_envelopes_with_spare_data(data, compression)?;
86
87 self.buffer = buffer;
88 data.clear();
89
90 return Ok(envelopes);
91 }
92
93 self.buffer.append(data);
94
95 let (envelopes, buffer) =
96 try_decode_envelopes_with_spare_data(&mut self.buffer, compression)?;
97
98 self.buffer = buffer;
99 Ok(envelopes)
100 }
101}
102
103#[derive(Clone, Debug, Default)]
105pub struct Lz4FrameDecoder {
106 inner_decoder: GenericFrameDecoder,
107}
108
109impl FrameDecoder for Lz4FrameDecoder {
110 #[inline]
112 fn consume(&mut self, data: &mut Vec<u8>, _compression: Compression) -> Result<Vec<Envelope>> {
113 self.inner_decoder.consume(data, Self::try_decode_frame)
114 }
115}
116
117impl Lz4FrameDecoder {
118 fn try_decode_frame(buffer: &mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>> {
119 let buffer_len = buffer.len();
120 if buffer_len < COMPRESSED_FRAME_HEADER_LENGTH {
121 return Ok(None);
122 }
123
124 let header =
125 i64::from_le_bytes(buffer[..COMPRESSED_FRAME_HEADER_LENGTH].try_into().unwrap());
126
127 let header_crc24 = ((header >> 40) & 0xffffff) as i32;
128 let computed_crc = crc24(&header.to_le_bytes()[..5]);
129
130 if header_crc24 != computed_crc {
131 return Err(create_header_crc_mismatch_error(computed_crc, header_crc24));
132 }
133
134 let compressed_length = (header & 0x1ffff) as usize;
135 let compressed_payload_end = compressed_length + COMPRESSED_FRAME_HEADER_LENGTH;
136
137 let frame_end = compressed_payload_end + FRAME_TRAILER_LENGTH;
138 if buffer_len < frame_end {
139 return Ok(None);
140 }
141
142 let compressed_payload_crc32 = u32::from_le_bytes(
143 buffer[compressed_payload_end..frame_end]
144 .try_into()
145 .unwrap(),
146 );
147
148 let computed_crc = crc32(&buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end]);
149
150 if compressed_payload_crc32 != computed_crc {
151 return Err(create_payload_crc_mismatch_error(
152 computed_crc,
153 compressed_payload_crc32,
154 ));
155 }
156
157 let self_contained = (header & (1 << 34)) != 0;
158 let uncompressed_length = ((header >> 17) & 0x1ffff) as usize;
159
160 if uncompressed_length == 0 {
161 let payload = buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end].into();
165 *buffer = buffer.split_off(frame_end);
166
167 return Ok(Some((self_contained, payload)));
168 }
169
170 decompress(
171 &buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end],
172 uncompressed_length,
173 )
174 .map_err(|error| CompressionError::Lz4(io::Error::new(io::ErrorKind::Other, error)).into())
175 .map(|payload| {
176 *buffer = buffer.split_off(frame_end);
177 Some((self_contained, payload))
178 })
179 }
180}
181
182#[derive(Clone, Debug, Default)]
184pub struct UncompressedFrameDecoder {
185 inner_decoder: GenericFrameDecoder,
186}
187
188impl FrameDecoder for UncompressedFrameDecoder {
189 #[inline]
191 fn consume(&mut self, data: &mut Vec<u8>, _compression: Compression) -> Result<Vec<Envelope>> {
192 self.inner_decoder.consume(data, Self::try_decode_frame)
193 }
194}
195
196impl UncompressedFrameDecoder {
197 fn try_decode_frame(buffer: &mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>> {
198 let buffer_len = buffer.len();
199 if buffer_len < UNCOMPRESSED_FRAME_HEADER_LENGTH {
200 return Ok(None);
201 }
202
203 let header = if buffer_len >= 8 {
204 i64::from_le_bytes(buffer[..8].try_into().unwrap()) & 0xffffffffffff
205 } else {
206 let mut header = 0;
207 for (i, byte) in buffer[..UNCOMPRESSED_FRAME_HEADER_LENGTH]
208 .iter()
209 .enumerate()
210 {
211 header |= (*byte as i64) << (8 * i as i64);
212 }
213
214 header
215 };
216
217 let header_crc24 = ((header >> 24) & 0xffffff) as i32;
218 let computed_crc = crc24(&header.to_le_bytes()[..3]);
219
220 if header_crc24 != computed_crc {
221 return Err(create_header_crc_mismatch_error(computed_crc, header_crc24));
222 }
223
224 let payload_length = (header & 0x1ffff) as usize;
225 let payload_end = UNCOMPRESSED_FRAME_HEADER_LENGTH + payload_length;
226
227 let frame_end = payload_end + FRAME_TRAILER_LENGTH;
228 if buffer_len < frame_end {
229 return Ok(None);
230 }
231
232 let payload_crc32 = u32::from_le_bytes(buffer[payload_end..frame_end].try_into().unwrap());
233
234 let computed_crc = crc32(&buffer[UNCOMPRESSED_FRAME_HEADER_LENGTH..payload_end]);
235 if payload_crc32 != computed_crc {
236 return Err(create_payload_crc_mismatch_error(
237 computed_crc,
238 payload_crc32,
239 ));
240 }
241
242 let self_contained = (header & (1 << 17)) != 0;
243
244 let payload = buffer[UNCOMPRESSED_FRAME_HEADER_LENGTH..payload_end].into();
245 *buffer = buffer.split_off(frame_end);
246
247 Ok(Some((self_contained, payload)))
248 }
249}
250
/// Buffering logic shared by the frame decoders: collects partial frames and
/// the payloads of non self-contained frames until they can be decoded.
#[derive(Debug, Clone)]
struct GenericFrameDecoder {
    /// Raw bytes of a frame that hasn't been fully received yet.
    frame_buffer: Vec<u8>,
    /// Payload bytes accumulated from non self-contained frames.
    payload_buffer: Vec<u8>,
    /// Length read from the header of the envelope currently being accumulated
    /// in `payload_buffer`, once that header is available.
    expected_payload_len: Option<usize>,
}
257
258impl Default for GenericFrameDecoder {
259 fn default() -> Self {
260 Self {
261 frame_buffer: Vec::with_capacity(MAX_FRAME_SIZE),
262 payload_buffer: Vec::with_capacity(PAYLOAD_SIZE_LIMIT * 2),
263 expected_payload_len: None,
264 }
265 }
266}
267
268impl GenericFrameDecoder {
269 fn extract_non_self_contained_envelopes(&mut self) -> Result<Vec<Envelope>> {
270 if let Some(expected_payload_len) = self.expected_payload_len {
271 if self.payload_buffer.len() < expected_payload_len {
272 return Ok(vec![]);
273 }
274
275 let envelopes = try_decode_envelopes_without_spare_data(&self.payload_buffer)?;
276
277 self.payload_buffer.clear();
278 return Ok(envelopes);
279 }
280
281 if let Some(expected_payload_len) = self.extract_expected_payload_len() {
282 self.expected_payload_len = Some(expected_payload_len);
283 self.extract_non_self_contained_envelopes()
284 } else {
285 Ok(vec![])
286 }
287 }
288
289 fn extract_expected_payload_len(&self) -> Option<usize> {
290 if self.payload_buffer.len() < ENVELOPE_HEADER_LEN {
291 return None;
292 }
293
294 Some(i32::from_be_bytes(self.payload_buffer[5..9].try_into().unwrap()) as usize)
295 }
296
297 fn handle_frame(
298 &mut self,
299 envelopes: &mut Vec<Envelope>,
300 self_contained: bool,
301 frame: &mut Vec<u8>,
302 ) -> Result<()> {
303 if self_contained {
304 if !self.payload_buffer.is_empty() {
305 return Err(create_unexpected_self_contained_error());
306 }
307
308 envelopes.append(&mut try_decode_envelopes_without_spare_data(frame)?);
309 } else {
310 self.payload_buffer.append(frame);
311 envelopes.append(&mut self.extract_non_self_contained_envelopes()?);
312 }
313
314 Ok(())
315 }
316
317 fn consume(
318 &mut self,
319 data: &mut Vec<u8>,
320 try_decode_frame: impl Fn(&mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>>,
321 ) -> Result<Vec<Envelope>> {
322 let mut envelopes = vec![];
323
324 if self.frame_buffer.is_empty() {
325 while !data.is_empty() {
327 if let Some((self_contained, mut frame)) = try_decode_frame(data)? {
328 self.handle_frame(&mut envelopes, self_contained, &mut frame)?;
329 } else {
330 self.frame_buffer.append(data);
332 break;
333 }
334 }
335 } else {
336 self.frame_buffer.append(data);
337
338 while !self.frame_buffer.is_empty() {
339 if let Some((self_contained, mut frame)) = try_decode_frame(&mut self.frame_buffer)?
340 {
341 self.handle_frame(&mut envelopes, self_contained, &mut frame)?;
342 } else {
343 break;
344 }
345 }
346 }
347
348 Ok(envelopes)
349 }
350}