Skip to main content

cassandra_protocol/frame/
frame_decoder.rs

1use crate::compression::{Compression, CompressionError};
2use crate::crc::{crc24, crc32};
3use crate::error::{Error, Result};
4use crate::frame::{
5    Envelope, ParseEnvelopeError, COMPRESSED_FRAME_HEADER_LENGTH, ENVELOPE_HEADER_LEN,
6    FRAME_TRAILER_LENGTH, MAX_FRAME_SIZE, PAYLOAD_SIZE_LIMIT, UNCOMPRESSED_FRAME_HEADER_LENGTH,
7};
8use lz4_flex::decompress;
9use std::convert::TryInto;
10use std::io;
11
12#[inline]
13fn create_unexpected_self_contained_error() -> Error {
14    "Found self-contained frame while waiting for non self-contained continuation!".into()
15}
16
17#[inline]
18fn create_header_crc_mismatch_error(computed_crc: i32, header_crc24: i32) -> Error {
19    format!("Header CRC mismatch - expected {header_crc24}, found {computed_crc}.",).into()
20}
21
22#[inline]
23fn create_payload_crc_mismatch_error(computed_crc: u32, payload_crc32: u32) -> Error {
24    format!("Payload CRC mismatch - read {payload_crc32}, computed {computed_crc}.",).into()
25}
26
27fn extract_envelopes(buffer: &[u8], compression: Compression) -> Result<(usize, Vec<Envelope>)> {
28    let mut current_pos = 0;
29    let mut envelopes = vec![];
30
31    loop {
32        match Envelope::from_buffer(&buffer[current_pos..], compression) {
33            Ok(envelope) => {
34                envelopes.push(envelope.envelope);
35                current_pos += envelope.envelope_len;
36            }
37            Err(ParseEnvelopeError::NotEnoughBytes) => break,
38            Err(error) => return Err(error.to_string().into()),
39        }
40    }
41
42    Ok((current_pos, envelopes))
43}
44
45fn try_decode_envelopes_with_spare_data(
46    buffer: &mut Vec<u8>,
47    compression: Compression,
48) -> Result<(Vec<Envelope>, Vec<u8>)> {
49    let (current_pos, envelopes) = extract_envelopes(buffer.as_slice(), compression)?;
50    Ok((envelopes, buffer.split_off(current_pos)))
51}
52
53fn try_decode_envelopes_without_spare_data(buffer: &[u8]) -> Result<Vec<Envelope>> {
54    let (_, envelopes) = extract_envelopes(buffer, Compression::None)?;
55    Ok(envelopes)
56}
57
/// A decoder for frames. Since protocol v5, frames became "envelopes" and a frame now can contain
/// multiple complete envelopes (self-contained frame) or a part of one bigger envelope.
///
/// Implementations in this file: [`LegacyFrameDecoder`], [`Lz4FrameDecoder`] and
/// [`UncompressedFrameDecoder`].
pub trait FrameDecoder {
    /// Consumes some data and returns decoded envelopes. Decoders can be stateful, so data can be
    /// buffered until envelopes can be parsed.
    /// The buffer passed in should be cleared of consumed data by the decoder.
    ///
    /// The returned vector may be empty when `data` did not complete any envelope.
    /// `compression` is the envelope-level compression; the post-V5 decoders in this file
    /// ignore it.
    fn consume(&mut self, data: &mut Vec<u8>, compression: Compression) -> Result<Vec<Envelope>>;
}
66
/// Pre-V5 frame decoder which simply decodes one envelope directly into a buffer.
#[derive(Clone, Debug)]
pub struct LegacyFrameDecoder {
    // Bytes received so far that do not yet form a complete envelope.
    buffer: Vec<u8>,
}
72
73impl Default for LegacyFrameDecoder {
74    fn default() -> Self {
75        Self {
76            buffer: Vec::with_capacity(MAX_FRAME_SIZE),
77        }
78    }
79}
80
81impl FrameDecoder for LegacyFrameDecoder {
82    fn consume(&mut self, data: &mut Vec<u8>, compression: Compression) -> Result<Vec<Envelope>> {
83        if self.buffer.is_empty() {
84            // optimistic case
85            let (envelopes, buffer) = try_decode_envelopes_with_spare_data(data, compression)?;
86
87            self.buffer = buffer;
88            data.clear();
89
90            return Ok(envelopes);
91        }
92
93        self.buffer.append(data);
94
95        let (envelopes, buffer) =
96            try_decode_envelopes_with_spare_data(&mut self.buffer, compression)?;
97
98        self.buffer = buffer;
99        Ok(envelopes)
100    }
101}
102
/// Post-V5 Lz4 decoder with support for envelope frames with CRC checksum.
#[derive(Clone, Debug, Default)]
pub struct Lz4FrameDecoder {
    // Shared buffering/reassembly logic; this type only contributes the LZ4 frame parser.
    inner_decoder: GenericFrameDecoder,
}
108
109impl FrameDecoder for Lz4FrameDecoder {
110    //noinspection DuplicatedCode
111    #[inline]
112    fn consume(&mut self, data: &mut Vec<u8>, _compression: Compression) -> Result<Vec<Envelope>> {
113        self.inner_decoder.consume(data, Self::try_decode_frame)
114    }
115}
116
117impl Lz4FrameDecoder {
118    fn try_decode_frame(buffer: &mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>> {
119        let buffer_len = buffer.len();
120        if buffer_len < COMPRESSED_FRAME_HEADER_LENGTH {
121            return Ok(None);
122        }
123
124        let header =
125            i64::from_le_bytes(buffer[..COMPRESSED_FRAME_HEADER_LENGTH].try_into().unwrap());
126
127        let header_crc24 = ((header >> 40) & 0xffffff) as i32;
128        let computed_crc = crc24(&header.to_le_bytes()[..5]);
129
130        if header_crc24 != computed_crc {
131            return Err(create_header_crc_mismatch_error(computed_crc, header_crc24));
132        }
133
134        let compressed_length = (header & 0x1ffff) as usize;
135        let compressed_payload_end = compressed_length + COMPRESSED_FRAME_HEADER_LENGTH;
136
137        let frame_end = compressed_payload_end + FRAME_TRAILER_LENGTH;
138        if buffer_len < frame_end {
139            return Ok(None);
140        }
141
142        let compressed_payload_crc32 = u32::from_le_bytes(
143            buffer[compressed_payload_end..frame_end]
144                .try_into()
145                .unwrap(),
146        );
147
148        let computed_crc = crc32(&buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end]);
149
150        if compressed_payload_crc32 != computed_crc {
151            return Err(create_payload_crc_mismatch_error(
152                computed_crc,
153                compressed_payload_crc32,
154            ));
155        }
156
157        let self_contained = (header & (1 << 34)) != 0;
158        let uncompressed_length = ((header >> 17) & 0x1ffff) as usize;
159
160        if uncompressed_length == 0 {
161            // protocol spec 2.2:
162            // An uncompressed length of 0 signals that the compressed payload should be used as-is
163            // and not decompressed.
164            let payload = buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end].into();
165            *buffer = buffer.split_off(frame_end);
166
167            return Ok(Some((self_contained, payload)));
168        }
169
170        decompress(
171            &buffer[COMPRESSED_FRAME_HEADER_LENGTH..compressed_payload_end],
172            uncompressed_length,
173        )
174        .map_err(|error| CompressionError::Lz4(io::Error::other(error)).into())
175        .map(|payload| {
176            *buffer = buffer.split_off(frame_end);
177            Some((self_contained, payload))
178        })
179    }
180}
181
/// Post-V5 decoder with support for envelope frames with CRC checksum.
#[derive(Clone, Debug, Default)]
pub struct UncompressedFrameDecoder {
    // Shared buffering/reassembly logic; this type only contributes the uncompressed frame parser.
    inner_decoder: GenericFrameDecoder,
}
187
188impl FrameDecoder for UncompressedFrameDecoder {
189    //noinspection DuplicatedCode
190    #[inline]
191    fn consume(&mut self, data: &mut Vec<u8>, _compression: Compression) -> Result<Vec<Envelope>> {
192        self.inner_decoder.consume(data, Self::try_decode_frame)
193    }
194}
195
196impl UncompressedFrameDecoder {
197    fn try_decode_frame(buffer: &mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>> {
198        let buffer_len = buffer.len();
199        if buffer_len < UNCOMPRESSED_FRAME_HEADER_LENGTH {
200            return Ok(None);
201        }
202
203        let header = if buffer_len >= 8 {
204            i64::from_le_bytes(buffer[..8].try_into().unwrap()) & 0xffffffffffff
205        } else {
206            let mut header = 0;
207            for (i, byte) in buffer[..UNCOMPRESSED_FRAME_HEADER_LENGTH]
208                .iter()
209                .enumerate()
210            {
211                header |= (*byte as i64) << (8 * i as i64);
212            }
213
214            header
215        };
216
217        let header_crc24 = ((header >> 24) & 0xffffff) as i32;
218        let computed_crc = crc24(&header.to_le_bytes()[..3]);
219
220        if header_crc24 != computed_crc {
221            return Err(create_header_crc_mismatch_error(computed_crc, header_crc24));
222        }
223
224        let payload_length = (header & 0x1ffff) as usize;
225        let payload_end = UNCOMPRESSED_FRAME_HEADER_LENGTH + payload_length;
226
227        let frame_end = payload_end + FRAME_TRAILER_LENGTH;
228        if buffer_len < frame_end {
229            return Ok(None);
230        }
231
232        let payload_crc32 = u32::from_le_bytes(buffer[payload_end..frame_end].try_into().unwrap());
233
234        let computed_crc = crc32(&buffer[UNCOMPRESSED_FRAME_HEADER_LENGTH..payload_end]);
235        if payload_crc32 != computed_crc {
236            return Err(create_payload_crc_mismatch_error(
237                computed_crc,
238                payload_crc32,
239            ));
240        }
241
242        let self_contained = (header & (1 << 17)) != 0;
243
244        let payload = buffer[UNCOMPRESSED_FRAME_HEADER_LENGTH..payload_end].into();
245        *buffer = buffer.split_off(frame_end);
246
247        Ok(Some((self_contained, payload)))
248    }
249}
250
/// Buffering and envelope-reassembly state shared by the post-V5 decoders. The concrete frame
/// format is supplied by the caller as a frame-parsing function.
#[derive(Clone, Debug)]
struct GenericFrameDecoder {
    // Raw bytes received so far that do not yet form a complete frame.
    frame_buffer: Vec<u8>,
    // Payload bytes collected from non self-contained frames, waiting to form whole envelopes.
    payload_buffer: Vec<u8>,
    // Body length read from the envelope header at the start of `payload_buffer`, once known.
    expected_payload_len: Option<usize>,
}
257
258impl Default for GenericFrameDecoder {
259    fn default() -> Self {
260        Self {
261            frame_buffer: Vec::with_capacity(MAX_FRAME_SIZE),
262            payload_buffer: Vec::with_capacity(PAYLOAD_SIZE_LIMIT * 2),
263            expected_payload_len: None,
264        }
265    }
266}
267
268impl GenericFrameDecoder {
269    fn extract_non_self_contained_envelopes(&mut self) -> Result<Vec<Envelope>> {
270        if let Some(expected_payload_len) = self.expected_payload_len {
271            // The Cassandra wire format encodes the body length in bytes 5..9
272            // of the envelope header (after version/flags/stream/opcode). The
273            // FULL envelope on the wire is therefore ENVELOPE_HEADER_LEN bytes
274            // of header plus expected_payload_len bytes of body, so the buffer
275            // must contain at least that many bytes before we can decode it.
276            // Without this header offset we would attempt to decode while the
277            // body was still partial, lose the partial data on the buffer
278            // truncation below, and mis-frame the next envelope.
279            let total_envelope_len = ENVELOPE_HEADER_LEN + expected_payload_len;
280            if self.payload_buffer.len() < total_envelope_len {
281                return Ok(vec![]);
282            }
283
284            // Use extract_envelopes directly so we know exactly how many bytes
285            // got consumed. drain(..consumed) preserves any trailing bytes
286            // that may belong to the next envelope - they could legitimately
287            // be there if a producer packed bytes from envelope N+1 into the
288            // tail of the non-self-contained sequence for envelope N. The
289            // previous code simply called clear() and silently lost them.
290            let (consumed, envelopes) = extract_envelopes(&self.payload_buffer, Compression::None)?;
291            self.payload_buffer.drain(..consumed);
292
293            // Reset envelope-tracking state so the next call re-parses the
294            // body length from whatever envelope header remains at the start
295            // of the buffer (or waits for one if the buffer is empty / a
296            // partial header). Without this reset the next sequence would be
297            // gated against the previous envelope's length.
298            self.expected_payload_len = None;
299            return Ok(envelopes);
300        }
301
302        if let Some(expected_payload_len) = self.extract_expected_payload_len() {
303            self.expected_payload_len = Some(expected_payload_len);
304            self.extract_non_self_contained_envelopes()
305        } else {
306            Ok(vec![])
307        }
308    }
309
310    fn extract_expected_payload_len(&self) -> Option<usize> {
311        if self.payload_buffer.len() < ENVELOPE_HEADER_LEN {
312            return None;
313        }
314
315        Some(i32::from_be_bytes(self.payload_buffer[5..9].try_into().unwrap()) as usize)
316    }
317
318    fn handle_frame(
319        &mut self,
320        envelopes: &mut Vec<Envelope>,
321        self_contained: bool,
322        frame: &mut Vec<u8>,
323    ) -> Result<()> {
324        if self_contained {
325            if !self.payload_buffer.is_empty() {
326                return Err(create_unexpected_self_contained_error());
327            }
328
329            envelopes.append(&mut try_decode_envelopes_without_spare_data(frame)?);
330        } else {
331            self.payload_buffer.append(frame);
332            envelopes.append(&mut self.extract_non_self_contained_envelopes()?);
333        }
334
335        Ok(())
336    }
337
338    fn consume(
339        &mut self,
340        data: &mut Vec<u8>,
341        try_decode_frame: impl Fn(&mut Vec<u8>) -> Result<Option<(bool, Vec<u8>)>>,
342    ) -> Result<Vec<Envelope>> {
343        let mut envelopes = vec![];
344
345        if self.frame_buffer.is_empty() {
346            // optimistic case
347            while !data.is_empty() {
348                if let Some((self_contained, mut frame)) = try_decode_frame(data)? {
349                    self.handle_frame(&mut envelopes, self_contained, &mut frame)?;
350                } else {
351                    // we have some data, but not a full frame yet
352                    self.frame_buffer.append(data);
353                    break;
354                }
355            }
356        } else {
357            self.frame_buffer.append(data);
358
359            while !self.frame_buffer.is_empty() {
360                if let Some((self_contained, mut frame)) = try_decode_frame(&mut self.frame_buffer)?
361                {
362                    self.handle_frame(&mut envelopes, self_contained, &mut frame)?;
363                } else {
364                    break;
365                }
366            }
367        }
368
369        Ok(envelopes)
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::frame_encoder::{FrameEncoder, UncompressedFrameEncoder};
    use crate::frame::{Direction, Envelope, Flags, Opcode, Version};

    /// Encodes an uncompressed V5 query envelope whose body consists of `body_size` copies of
    /// `fill`. Callers choose `body_size` depending on whether the envelope should span several
    /// frames or fit into one.
    fn make_envelope(stream_id: i16, fill: u8, body_size: usize) -> Vec<u8> {
        let envelope = Envelope {
            version: Version::V5,
            direction: Direction::Request,
            flags: Flags::empty(),
            opcode: Opcode::Query,
            stream_id,
            body: vec![fill; body_size],
            tracing_id: None,
            warnings: vec![],
        };

        envelope.encode_with(Compression::None).unwrap()
    }

    /// Splits an encoded envelope into a sequence of non-self-contained frames and returns the
    /// concatenated frame bytes, i.e. what would travel on the wire.
    fn encode_as_non_self_contained(envelope: &[u8]) -> Vec<u8> {
        let mut encoder = UncompressedFrameEncoder::default();
        let mut wire = Vec::new();
        let mut remaining = envelope;

        while !remaining.is_empty() {
            let (consumed, frame) = encoder.finalize_non_self_contained(remaining);
            wire.extend_from_slice(frame);
            remaining = &remaining[consumed..];
            encoder.reset();
        }

        wire
    }

    /// Two multi-frame envelopes of different sizes sent back to back must both be decoded; the
    /// differing sizes would expose a stale `expected_payload_len` carried over from the first
    /// envelope.
    #[test]
    fn decoder_recovers_two_consecutive_non_self_contained_envelopes() {
        let first_body_len = PAYLOAD_SIZE_LIMIT + 100;
        let second_body_len = PAYLOAD_SIZE_LIMIT + 50;

        let mut wire = encode_as_non_self_contained(&make_envelope(1, 0xAA, first_body_len));
        wire.extend(encode_as_non_self_contained(&make_envelope(
            2,
            0xBB,
            second_body_len,
        )));

        let mut decoder = UncompressedFrameDecoder::default();
        let envelopes = decoder
            .consume(&mut wire, Compression::None)
            .expect("decoder must accept two consecutive non-self-contained envelopes");

        // both envelopes must come back intact and in order
        assert_eq!(envelopes.len(), 2, "should decode both envelopes");

        let (first, second) = (&envelopes[0], &envelopes[1]);
        assert_eq!(first.stream_id, 1);
        assert_eq!(first.body, vec![0xAA; first_body_len]);
        assert_eq!(second.stream_id, 2);
        assert_eq!(second.body, vec![0xBB; second_body_len]);
    }

    /// The first non-self-contained frame carries a whole envelope plus the beginning of the
    /// next one; the second frame carries the rest. The decoder has to keep the bytes trailing
    /// the first envelope instead of discarding them, otherwise the second envelope would be
    /// parsed from a misaligned start.
    #[test]
    fn decoder_preserves_trailing_bytes_across_non_self_contained_frames() {
        let envelope_a = make_envelope(1, 0xAA, 100);
        let envelope_b = make_envelope(2, 0xBB, 200);

        // frame 1: all of envelope_a followed by the first 100 bytes of envelope_b
        let split_at = 100usize;
        let packed = [&envelope_a[..], &envelope_b[..split_at]].concat();

        let mut encoder = UncompressedFrameEncoder::default();
        let (consumed, first_frame) = encoder.finalize_non_self_contained(&packed);
        // the scenario only holds if the encoder put the whole packed slice into one frame
        assert_eq!(
            consumed,
            packed.len(),
            "test setup: whole packed slice must fit"
        );
        let mut wire = first_frame.to_vec();
        encoder.reset();

        // frame 2: the remaining bytes of envelope_b
        let (_, second_frame) = encoder.finalize_non_self_contained(&envelope_b[split_at..]);
        wire.extend_from_slice(second_frame);

        let mut decoder = UncompressedFrameDecoder::default();
        let envelopes = decoder
            .consume(&mut wire, Compression::None)
            .expect("decoder must accept the cross-boundary packed frames");

        assert_eq!(envelopes.len(), 2, "both envelopes must be recovered");

        let (first, second) = (&envelopes[0], &envelopes[1]);
        assert_eq!(first.stream_id, 1);
        assert_eq!(first.body, vec![0xAA; 100]);
        assert_eq!(second.stream_id, 2);
        assert_eq!(second.body, vec![0xBB; 200]);
    }
}