// cassandra_protocol/frame/frame_decoder.rs

1use crate::compression::{Compression, CompressionError};
2use crate::crc::{crc24, crc32};
3use crate::error::{Error, Result};
4use crate::frame::{
5    Envelope, ParseEnvelopeError, COMPRESSED_FRAME_HEADER_LENGTH, ENVELOPE_HEADER_LEN,
6    FRAME_TRAILER_LENGTH, MAX_FRAME_SIZE, PAYLOAD_SIZE_LIMIT, UNCOMPRESSED_FRAME_HEADER_LENGTH,
7};
8use lz4_flex::decompress;
9use std::convert::TryInto;
10use std::io;
11
12#[inline]
13fn create_unexpected_self_contained_error() -> Error {
14    "Found self-contained frame while waiting for non self-contained continuation!".into()
15}
16
17#[inline]
18fn create_header_crc_mismatch_error(computed_crc: i32, header_crc24: i32) -> Error {
19    format!("Header CRC mismatch - expected {header_crc24}, found {computed_crc}.",).into()
20}
21
22#[inline]
23fn create_payload_crc_mismatch_error(computed_crc: u32, payload_crc32: u32) -> Error {
24    format!("Payload CRC mismatch - read {payload_crc32}, computed {computed_crc}.",).into()
25}
26
27fn extract_envelopes(buffer: &[u8], compression: Compression) -> Result<(usize, Vec<Envelope>)> {
28    let mut current_pos = 0;
29    let mut envelopes = vec![];
30
31    loop {
32        match Envelope::from_buffer(&buffer[current_pos..], compression) {
33            Ok(envelope) => {
34                envelopes.push(envelope.envelope);
35                current_pos += envelope.envelope_len;
36            }
37            Err(ParseEnvelopeError::NotEnoughBytes) => break,
38            Err(error) => return Err(error.to_string().into()),
39        }
40    }
41
42    Ok((current_pos, envelopes))
43}
44
45fn try_decode_envelopes_with_spare_data(
46    buffer: &mut Vec<u8>,
47    compression: Compression,
48) -> Result<(Vec<Envelope>, Vec<u8>)> {
49    let (current_pos, envelopes) = extract_envelopes(buffer.as_slice(), compression)?;
50    Ok((envelopes, buffer.split_off(current_pos)))
51}
52
53fn try_decode_envelopes_without_spare_data(buffer: &[u8]) -> Result<Vec<Envelope>> {
54    let (_, envelopes) = extract_envelopes(buffer, Compression::None)?;
55    Ok(envelopes)
56}
57
58/// A decoder for frames. Since protocol v5, frames became "envelopes" and a frame now can contain
59/// multiple complete envelopes (self-contained frame) or a part of one bigger envelope.
60pub trait FrameDecoder {
61    /// Consumes some data and returns decoded envelopes. Decoders can be stateful, so data can be
62    /// buffered until envelopes can be parsed.
63    /// The buffer passed in should be cleared of consumed data by the decoder.
64    fn consume(&mut self, data: &mut Vec<u8>, compression: Compression) -> Result<Vec<Envelope>>;
65}
66
67/// Pre-V5 frame decoder which simply decodes one envelope directly into a buffer.
68#[derive(Clone, Debug)]
69pub struct LegacyFrameDecoder {
70    buffer: Vec<u8>,
71}
72
73impl Default for LegacyFrameDecoder {
74    fn default() -> Self {
75        Self {
76            buffer: Vec::with_capacity(MAX_FRAME_SIZE),
77        }
78    }
79}
80
81impl FrameDecoder for LegacyFrameDecoder {
82    fn consume(&mut self, data: &mut Vec<u8>, compression: Compression) -> Result<Vec<Envelope>> {
83        if self.buffer.is_empty() {
84            // optimistic case
85            let (envelopes, buffer) = try_decode_envelopes_with_spare_data(data, compression)?;
86
87            self.buffer = buffer;
88            data.clear();
89
90            return Ok(envelopes);
91        }
92
93        self.buffer.append(data);
94
95        let (envelopes, buffer) =
96            try_decode_envelopes_with_spare_data(&mut self.buffer, compression)?;
97
98        self.buffer = buffer;
99        Ok(envelopes)
100    }
101}
102
103/// Post-V5 Lz4 decoder with support for envelope frames with CRC checksum.
104#[derive(Clone, Debug, Default)]
105pub struct Lz4FrameDecoder {
106    inner_decoder: GenericFrameDecoder,
107}
108
109impl FrameDecoder for Lz4FrameDecoder {
110    //noinspection DuplicatedCode
111    #[inline]
112    fn consume(&mut self, data: &mut Vec<u8>, _compression: Compression) -> Result<Vec<Envelope>> {
113        self.inner_decoder.consume(data, Self::try_decode_frame)
114    }
115}
116
117impl Lz4FrameDecoder {
118    fn try_decode_frame(buffer: &mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>> {
119        let buffer_len = buffer.len();
120        if buffer_len < COMPRESSED_FRAME_HEADER_LENGTH {
121            return Ok(None);
122        }
123
124        let header =
125            i64::from_le_bytes(buffer[..COMPRESSED_FRAME_HEADER_LENGTH].try_into().unwrap());
126
127        let header_crc24 = ((header >> 40) & 0xffffff) as i32;
128        let computed_crc = crc24(&header.to_le_bytes()[..5]);
129
130        if header_crc24 != computed_crc {
131            return Err(create_header_crc_mismatch_error(computed_crc, header_crc24));
132        }
133
134        let compressed_length = (header & 0x1ffff) as usize;
135        let compressed_payload_end = compressed_length + COMPRESSED_FRAME_HEADER_LENGTH;
136
137        let frame_end = compressed_payload_end + FRAME_TRAILER_LENGTH;
138        if buffer_len < frame_end {
139            return Ok(None);
140        }
141
142        let compressed_payload_crc32 = u32::from_le_bytes(
143            buffer[compressed_payload_end..frame_end]
144                .try_into()
145                .unwrap(),
146        );
147
148        let computed_crc = crc32(&buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end]);
149
150        if compressed_payload_crc32 != computed_crc {
151            return Err(create_payload_crc_mismatch_error(
152                computed_crc,
153                compressed_payload_crc32,
154            ));
155        }
156
157        let self_contained = (header & (1 << 34)) != 0;
158        let uncompressed_length = ((header >> 17) & 0x1ffff) as usize;
159
160        if uncompressed_length == 0 {
161            // protocol spec 2.2:
162            // An uncompressed length of 0 signals that the compressed payload should be used as-is
163            // and not decompressed.
164            let payload = buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end].into();
165            *buffer = buffer.split_off(frame_end);
166
167            return Ok(Some((self_contained, payload)));
168        }
169
170        decompress(
171            &buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end],
172            uncompressed_length,
173        )
174        .map_err(|error| CompressionError::Lz4(io::Error::new(io::ErrorKind::Other, error)).into())
175        .map(|payload| {
176            *buffer = buffer.split_off(frame_end);
177            Some((self_contained, payload))
178        })
179    }
180}
181
182/// Post-V5 decoder with support for envelope frames with CRC checksum.
183#[derive(Clone, Debug, Default)]
184pub struct UncompressedFrameDecoder {
185    inner_decoder: GenericFrameDecoder,
186}
187
188impl FrameDecoder for UncompressedFrameDecoder {
189    //noinspection DuplicatedCode
190    #[inline]
191    fn consume(&mut self, data: &mut Vec<u8>, _compression: Compression) -> Result<Vec<Envelope>> {
192        self.inner_decoder.consume(data, Self::try_decode_frame)
193    }
194}
195
196impl UncompressedFrameDecoder {
197    fn try_decode_frame(buffer: &mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>> {
198        let buffer_len = buffer.len();
199        if buffer_len < UNCOMPRESSED_FRAME_HEADER_LENGTH {
200            return Ok(None);
201        }
202
203        let header = if buffer_len >= 8 {
204            i64::from_le_bytes(buffer[..8].try_into().unwrap()) & 0xffffffffffff
205        } else {
206            let mut header = 0;
207            for (i, byte) in buffer[..UNCOMPRESSED_FRAME_HEADER_LENGTH]
208                .iter()
209                .enumerate()
210            {
211                header |= (*byte as i64) << (8 * i as i64);
212            }
213
214            header
215        };
216
217        let header_crc24 = ((header >> 24) & 0xffffff) as i32;
218        let computed_crc = crc24(&header.to_le_bytes()[..3]);
219
220        if header_crc24 != computed_crc {
221            return Err(create_header_crc_mismatch_error(computed_crc, header_crc24));
222        }
223
224        let payload_length = (header & 0x1ffff) as usize;
225        let payload_end = UNCOMPRESSED_FRAME_HEADER_LENGTH + payload_length;
226
227        let frame_end = payload_end + FRAME_TRAILER_LENGTH;
228        if buffer_len < frame_end {
229            return Ok(None);
230        }
231
232        let payload_crc32 = u32::from_le_bytes(buffer[payload_end..frame_end].try_into().unwrap());
233
234        let computed_crc = crc32(&buffer[UNCOMPRESSED_FRAME_HEADER_LENGTH..payload_end]);
235        if payload_crc32 != computed_crc {
236            return Err(create_payload_crc_mismatch_error(
237                computed_crc,
238                payload_crc32,
239            ));
240        }
241
242        let self_contained = (header & (1 << 17)) != 0;
243
244        let payload = buffer[UNCOMPRESSED_FRAME_HEADER_LENGTH..payload_end].into();
245        *buffer = buffer.split_off(frame_end);
246
247        Ok(Some((self_contained, payload)))
248    }
249}
250
251#[derive(Clone, Debug)]
252struct GenericFrameDecoder {
253    frame_buffer: Vec<u8>,
254    payload_buffer: Vec<u8>,
255    expected_payload_len: Option<usize>,
256}
257
258impl Default for GenericFrameDecoder {
259    fn default() -> Self {
260        Self {
261            frame_buffer: Vec::with_capacity(MAX_FRAME_SIZE),
262            payload_buffer: Vec::with_capacity(PAYLOAD_SIZE_LIMIT * 2),
263            expected_payload_len: None,
264        }
265    }
266}
267
268impl GenericFrameDecoder {
269    fn extract_non_self_contained_envelopes(&mut self) -> Result<Vec<Envelope>> {
270        if let Some(expected_payload_len) = self.expected_payload_len {
271            if self.payload_buffer.len() < expected_payload_len {
272                return Ok(vec![]);
273            }
274
275            let envelopes = try_decode_envelopes_without_spare_data(&self.payload_buffer)?;
276
277            self.payload_buffer.clear();
278            return Ok(envelopes);
279        }
280
281        if let Some(expected_payload_len) = self.extract_expected_payload_len() {
282            self.expected_payload_len = Some(expected_payload_len);
283            self.extract_non_self_contained_envelopes()
284        } else {
285            Ok(vec![])
286        }
287    }
288
289    fn extract_expected_payload_len(&self) -> Option<usize> {
290        if self.payload_buffer.len() < ENVELOPE_HEADER_LEN {
291            return None;
292        }
293
294        Some(i32::from_be_bytes(self.payload_buffer[5..9].try_into().unwrap()) as usize)
295    }
296
297    fn handle_frame(
298        &mut self,
299        envelopes: &mut Vec<Envelope>,
300        self_contained: bool,
301        frame: &mut Vec<u8>,
302    ) -> Result<()> {
303        if self_contained {
304            if !self.payload_buffer.is_empty() {
305                return Err(create_unexpected_self_contained_error());
306            }
307
308            envelopes.append(&mut try_decode_envelopes_without_spare_data(frame)?);
309        } else {
310            self.payload_buffer.append(frame);
311            envelopes.append(&mut self.extract_non_self_contained_envelopes()?);
312        }
313
314        Ok(())
315    }
316
317    fn consume(
318        &mut self,
319        data: &mut Vec<u8>,
320        try_decode_frame: impl Fn(&mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>>,
321    ) -> Result<Vec<Envelope>> {
322        let mut envelopes = vec![];
323
324        if self.frame_buffer.is_empty() {
325            // optimistic case
326            while !data.is_empty() {
327                if let Some((self_contained, mut frame)) = try_decode_frame(data)? {
328                    self.handle_frame(&mut envelopes, self_contained, &mut frame)?;
329                } else {
330                    // we have some data, but not a full frame yet
331                    self.frame_buffer.append(data);
332                    break;
333                }
334            }
335        } else {
336            self.frame_buffer.append(data);
337
338            while !self.frame_buffer.is_empty() {
339                if let Some((self_contained, mut frame)) = try_decode_frame(&mut self.frame_buffer)?
340                {
341                    self.handle_frame(&mut envelopes, self_contained, &mut frame)?;
342                } else {
343                    break;
344                }
345            }
346        }
347
348        Ok(envelopes)
349    }
350}