// cassandra_protocol/frame/frame_encoder.rs

1use crate::crc::{crc24, crc32};
2use crate::frame::{
3    COMPRESSED_FRAME_HEADER_LENGTH, FRAME_TRAILER_LENGTH, PAYLOAD_SIZE_LIMIT,
4    UNCOMPRESSED_FRAME_HEADER_LENGTH,
5};
6use lz4_flex::block::get_maximum_output_size;
7use lz4_flex::{compress, compress_into};
8
/// Writes the three least-significant bytes of `value` to the start of `buffer`, lowest byte
/// first (little-endian). Any bytes of `buffer` past index 2 are left untouched.
///
/// # Panics
///
/// Panics if `buffer` is shorter than three bytes.
#[inline]
fn put3b(buffer: &mut [u8], value: i32) {
    let le_bytes = value.to_le_bytes();
    buffer[..3].copy_from_slice(&le_bytes[..3]);
}
16
17#[inline]
18fn add_trailer(buffer: &mut Vec<u8>, payload_start: usize) {
19    buffer.reserve(4);
20
21    let crc = crc32(&buffer[payload_start..]).to_le_bytes();
22
23    buffer.push(crc[0]);
24    buffer.push(crc[1]);
25    buffer.push(crc[2]);
26    buffer.push(crc[3]);
27}
28
/// An encoder for frames. Since protocol *v5*, frames became "envelopes" and a frame can now
/// contain either multiple complete envelopes (a self-contained frame) or a part of one bigger
/// envelope (a non self-contained frame).
///
/// Encoders are stateful and can either:
/// 1. Have multiple self-contained envelopes added, or
/// 2. Have a single non self-contained envelope added.
///
/// In either case, the encoder is assumed to have the buffer ready to accept envelopes before
/// adding the first one or after calling [`reset`](FrameEncoder::reset). At some point the frame
/// becomes finalized (which is the only possible outcome when adding a non self-contained
/// envelope), and the returned buffer is assumed to be immutable and ready to be sent.
pub trait FrameEncoder {
    /// Determines if a payload of the given size can fit in the current frame buffer.
    fn can_fit(&self, len: usize) -> bool;

    /// Resets the internal state and prepares it for encoding envelopes.
    fn reset(&mut self);

    /// Adds a self-contained envelope to the current frame.
    fn add_envelope(&mut self, envelope: Vec<u8>);

    /// Finalizes a self-contained encoded frame in the buffer and returns the encoded bytes.
    fn finalize_self_contained(&mut self) -> &[u8];

    /// Appends a (possibly large) envelope and finalizes a non self-contained encoded frame in
    /// the buffer, copying as much envelope data as fits in a single frame.
    ///
    /// Returns the number of envelope bytes consumed, together with the encoded frame bytes.
    /// The consumed count is the offset in `envelope` at which the next frame should continue.
    fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]);

    /// Checks if the current frame contains any envelopes.
    fn has_envelopes(&self) -> bool;
}
60
/// Frame encoder for protocol versions before *v5*. There is no separate framing there, so the
/// single envelope is kept in the buffer and handed out as-is.
#[derive(Clone, Debug, Default)]
pub struct LegacyFrameEncoder {
    buffer: Vec<u8>,
}
66
67impl FrameEncoder for LegacyFrameEncoder {
68    #[inline]
69    fn can_fit(&self, _len: usize) -> bool {
70        // we support only one envelope per frame
71        self.buffer.is_empty()
72    }
73
74    #[inline]
75    fn reset(&mut self) {
76        self.buffer.clear();
77    }
78
79    #[inline]
80    fn add_envelope(&mut self, envelope: Vec<u8>) {
81        self.buffer = envelope;
82    }
83
84    #[inline]
85    fn finalize_self_contained(&mut self) -> &[u8] {
86        &self.buffer
87    }
88
89    #[inline]
90    fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
91        // attempting to finalize a non self-contained frame via the legacy encoder - while this
92        // will work, the legacy encoder doesn't distinguish such frames and all are considered
93        // self-contained
94
95        self.buffer.clear();
96        self.buffer.extend_from_slice(envelope);
97
98        (envelope.len(), &self.buffer)
99    }
100
101    #[inline]
102    fn has_envelopes(&self) -> bool {
103        !self.buffer.is_empty()
104    }
105}
106
/// Protocol *v5+* frame encoder producing uncompressed frames. Each frame carries a header
/// protected by a CRC24 checksum and is followed by a CRC32 trailer over the payload.
#[derive(Clone, Debug)]
pub struct UncompressedFrameEncoder {
    buffer: Vec<u8>,
}
112
113impl FrameEncoder for UncompressedFrameEncoder {
114    #[inline]
115    fn can_fit(&self, len: usize) -> bool {
116        (self.buffer.len() - UNCOMPRESSED_FRAME_HEADER_LENGTH).saturating_add(len)
117            < PAYLOAD_SIZE_LIMIT
118    }
119
120    #[inline]
121    fn reset(&mut self) {
122        self.buffer.truncate(UNCOMPRESSED_FRAME_HEADER_LENGTH);
123    }
124
125    #[inline]
126    fn add_envelope(&mut self, mut envelope: Vec<u8>) {
127        self.buffer.append(&mut envelope);
128    }
129
130    fn finalize_self_contained(&mut self) -> &[u8] {
131        self.write_header(true);
132        add_trailer(&mut self.buffer, UNCOMPRESSED_FRAME_HEADER_LENGTH);
133
134        &self.buffer
135    }
136
137    fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
138        let max_size = envelope.len().min(PAYLOAD_SIZE_LIMIT - 1);
139
140        self.buffer.extend_from_slice(&envelope[..max_size]);
141        self.buffer.reserve(FRAME_TRAILER_LENGTH);
142
143        self.write_header(false);
144        add_trailer(&mut self.buffer, UNCOMPRESSED_FRAME_HEADER_LENGTH);
145
146        (max_size, &self.buffer)
147    }
148
149    #[inline]
150    fn has_envelopes(&self) -> bool {
151        self.buffer.len() > UNCOMPRESSED_FRAME_HEADER_LENGTH
152    }
153}
154
155impl Default for UncompressedFrameEncoder {
156    fn default() -> Self {
157        let buffer = vec![0; UNCOMPRESSED_FRAME_HEADER_LENGTH];
158        Self { buffer }
159    }
160}
161
162impl UncompressedFrameEncoder {
163    fn write_header(&mut self, self_contained: bool) {
164        let len = self.buffer.len();
165        debug_assert!(
166            len < (PAYLOAD_SIZE_LIMIT + UNCOMPRESSED_FRAME_HEADER_LENGTH),
167            "len: {} max: {}",
168            len,
169            PAYLOAD_SIZE_LIMIT + UNCOMPRESSED_FRAME_HEADER_LENGTH
170        );
171
172        let mut len = (len - UNCOMPRESSED_FRAME_HEADER_LENGTH) as u64;
173        if self_contained {
174            len |= 1 << 17;
175        }
176
177        put3b(self.buffer.as_mut_slice(), len as i32);
178        put3b(&mut self.buffer[3..], crc24(&len.to_le_bytes()[..3]));
179    }
180}
181
/// Protocol *v5+* frame encoder that LZ4-compresses the frame payload. Each frame carries a
/// header protected by a CRC24 checksum and is followed by a CRC32 trailer over the payload.
#[derive(Clone, Debug)]
pub struct Lz4FrameEncoder {
    buffer: Vec<u8>,
}
187
188impl FrameEncoder for Lz4FrameEncoder {
189    #[inline]
190    fn can_fit(&self, len: usize) -> bool {
191        // we don't know the whole compressed payload size, so we need to be conservative and expect
192        // the worst case
193        get_maximum_output_size(
194            (self.buffer.len() - COMPRESSED_FRAME_HEADER_LENGTH).saturating_add(len),
195        ) < PAYLOAD_SIZE_LIMIT
196    }
197
198    #[inline]
199    fn reset(&mut self) {
200        self.buffer.truncate(COMPRESSED_FRAME_HEADER_LENGTH);
201    }
202
203    #[inline]
204    fn add_envelope(&mut self, mut envelope: Vec<u8>) {
205        self.buffer.append(&mut envelope);
206    }
207
208    fn finalize_self_contained(&mut self) -> &[u8] {
209        let uncompressed_size = self.buffer.len() - COMPRESSED_FRAME_HEADER_LENGTH;
210        let mut compressed_payload = compress(&self.buffer[COMPRESSED_FRAME_HEADER_LENGTH..]);
211
212        self.buffer.truncate(COMPRESSED_FRAME_HEADER_LENGTH);
213        self.buffer.append(&mut compressed_payload);
214
215        self.write_header(uncompressed_size, true);
216        add_trailer(&mut self.buffer, COMPRESSED_FRAME_HEADER_LENGTH);
217
218        &self.buffer
219    }
220
221    fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
222        let mut uncompressed_size = envelope.len().min(PAYLOAD_SIZE_LIMIT - 1);
223        let offset = uncompressed_size;
224
225        self.buffer.resize(
226            get_maximum_output_size(uncompressed_size)
227                + COMPRESSED_FRAME_HEADER_LENGTH
228                + FRAME_TRAILER_LENGTH, // add space for trailer, so we don't allocate later
229            0,
230        );
231
232        let mut compressed_size = compress_into(
233            &envelope[..uncompressed_size],
234            &mut self.buffer[COMPRESSED_FRAME_HEADER_LENGTH..],
235        )
236        .unwrap(); // we can safely unwrap, since we have at least the amount of space needed
237
238        if compressed_size >= PAYLOAD_SIZE_LIMIT {
239            // compressed size can exceed source size, therefore can exceed max payload size
240            // Java driver simply ignores compression at this point, so ¯\_(ツ)_/¯
241            self.buffer[COMPRESSED_FRAME_HEADER_LENGTH
242                ..(COMPRESSED_FRAME_HEADER_LENGTH + uncompressed_size)]
243                .copy_from_slice(&envelope[..uncompressed_size]);
244
245            compressed_size = uncompressed_size;
246            uncompressed_size = 0; // compressed size of 0 means no compression
247        }
248
249        self.buffer
250            .truncate(COMPRESSED_FRAME_HEADER_LENGTH + compressed_size);
251
252        self.write_header(uncompressed_size, false);
253        add_trailer(&mut self.buffer, COMPRESSED_FRAME_HEADER_LENGTH);
254
255        (offset, &self.buffer)
256    }
257
258    #[inline]
259    fn has_envelopes(&self) -> bool {
260        self.buffer.len() > COMPRESSED_FRAME_HEADER_LENGTH
261    }
262}
263
264impl Default for Lz4FrameEncoder {
265    fn default() -> Self {
266        let buffer = vec![0; COMPRESSED_FRAME_HEADER_LENGTH];
267        Self { buffer }
268    }
269}
270
271impl Lz4FrameEncoder {
272    fn write_header(&mut self, uncompressed_size: usize, self_contained: bool) {
273        let len = self.buffer.len();
274        debug_assert!(len < (PAYLOAD_SIZE_LIMIT + COMPRESSED_FRAME_HEADER_LENGTH));
275
276        let mut header =
277            (len - COMPRESSED_FRAME_HEADER_LENGTH) as u64 | ((uncompressed_size as u64) << 17);
278
279        if self_contained {
280            header |= 1 << 34;
281        }
282
283        let crc = crc24(&header.to_le_bytes()[..5]) as u64;
284
285        let header = header | (crc << 40);
286        self.buffer[..8].copy_from_slice(&header.to_le_bytes());
287    }
288}