cassandra_protocol/frame/
frame_encoder.rs1use crate::crc::{crc24, crc32};
2use crate::frame::{
3 COMPRESSED_FRAME_HEADER_LENGTH, FRAME_TRAILER_LENGTH, PAYLOAD_SIZE_LIMIT,
4 UNCOMPRESSED_FRAME_HEADER_LENGTH,
5};
6use lz4_flex::block::get_maximum_output_size;
7use lz4_flex::{compress, compress_into};
8
/// Stores the three least-significant bytes of `value` at the start of `buffer`,
/// least-significant byte first.
///
/// The most-significant byte of `value` is dropped and any bytes of `buffer` past the
/// third are left untouched.
///
/// # Panics
///
/// Panics if `buffer` holds fewer than three bytes.
#[inline]
fn put3b(buffer: &mut [u8], value: i32) {
    let le_bytes = value.to_le_bytes();
    buffer[..3].copy_from_slice(&le_bytes[..3]);
}
16
17#[inline]
18fn add_trailer(buffer: &mut Vec<u8>, payload_start: usize) {
19 buffer.reserve(4);
20
21 let crc = crc32(&buffer[payload_start..]).to_le_bytes();
22
23 buffer.push(crc[0]);
24 buffer.push(crc[1]);
25 buffer.push(crc[2]);
26 buffer.push(crc[3]);
27}
28
/// Common interface of the frame encoders in this module: envelopes are collected in an
/// internal buffer and then finalized into the bytes of a single frame.
pub trait FrameEncoder {
    /// Returns `true` if an envelope of `len` bytes may still be added to the frame that is
    /// currently being built.
    fn can_fit(&self, len: usize) -> bool;

    /// Discards everything collected so far so that a new frame can be started.
    fn reset(&mut self);

    /// Adds a whole envelope to the frame that is currently being built.
    ///
    /// The implementations in this module perform no size check here; presumably callers are
    /// expected to consult [`can_fit`](FrameEncoder::can_fit) beforehand.
    fn add_envelope(&mut self, envelope: Vec<u8>);

    /// Completes a frame out of the envelopes added so far and returns its encoded bytes.
    fn finalize_self_contained(&mut self) -> &[u8];

    /// Encodes a frame carrying as much of `envelope` as a single frame can hold.
    ///
    /// Returns the number of bytes of `envelope` that went into this frame together with the
    /// encoded frame. When the count is smaller than `envelope.len()`, the remainder still has
    /// to be sent in further frames.
    fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]);

    /// Returns `true` if at least one envelope is currently buffered.
    fn has_envelopes(&self) -> bool;
}
60
/// Encoder that hands envelope bytes through unchanged: its `FrameEncoder` implementation
/// adds neither a frame header nor a trailer and holds at most one envelope at a time.
#[derive(Clone, Debug, Default)]
pub struct LegacyFrameEncoder {
    /// Bytes of the envelope currently held; empty when there is none.
    buffer: Vec<u8>,
}
66
67impl FrameEncoder for LegacyFrameEncoder {
68 #[inline]
69 fn can_fit(&self, _len: usize) -> bool {
70 self.buffer.is_empty()
72 }
73
74 #[inline]
75 fn reset(&mut self) {
76 self.buffer.clear();
77 }
78
79 #[inline]
80 fn add_envelope(&mut self, envelope: Vec<u8>) {
81 self.buffer = envelope;
82 }
83
84 #[inline]
85 fn finalize_self_contained(&mut self) -> &[u8] {
86 &self.buffer
87 }
88
89 #[inline]
90 fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
91 self.buffer.clear();
96 self.buffer.extend_from_slice(envelope);
97
98 (envelope.len(), &self.buffer)
99 }
100
101 #[inline]
102 fn has_envelopes(&self) -> bool {
103 !self.buffer.is_empty()
104 }
105}
106
/// Encoder producing frames whose payload is stored without compression, framed by a header
/// and a checksum trailer.
#[derive(Clone, Debug)]
pub struct UncompressedFrameEncoder {
    /// Frame under construction: space for the header first, followed by the payload bytes
    /// (and the trailer once the frame has been finalized).
    buffer: Vec<u8>,
}
112
113impl FrameEncoder for UncompressedFrameEncoder {
114 #[inline]
115 fn can_fit(&self, len: usize) -> bool {
116 (self.buffer.len() - UNCOMPRESSED_FRAME_HEADER_LENGTH).saturating_add(len)
117 < PAYLOAD_SIZE_LIMIT
118 }
119
120 #[inline]
121 fn reset(&mut self) {
122 self.buffer.truncate(UNCOMPRESSED_FRAME_HEADER_LENGTH);
123 }
124
125 #[inline]
126 fn add_envelope(&mut self, mut envelope: Vec<u8>) {
127 self.buffer.append(&mut envelope);
128 }
129
130 fn finalize_self_contained(&mut self) -> &[u8] {
131 self.write_header(true);
132 add_trailer(&mut self.buffer, UNCOMPRESSED_FRAME_HEADER_LENGTH);
133
134 &self.buffer
135 }
136
137 fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
138 let max_size = envelope.len().min(PAYLOAD_SIZE_LIMIT - 1);
139
140 self.buffer.extend_from_slice(&envelope[..max_size]);
141 self.buffer.reserve(FRAME_TRAILER_LENGTH);
142
143 self.write_header(false);
144 add_trailer(&mut self.buffer, UNCOMPRESSED_FRAME_HEADER_LENGTH);
145
146 (max_size, &self.buffer)
147 }
148
149 #[inline]
150 fn has_envelopes(&self) -> bool {
151 self.buffer.len() > UNCOMPRESSED_FRAME_HEADER_LENGTH
152 }
153}
154
155impl Default for UncompressedFrameEncoder {
156 fn default() -> Self {
157 let buffer = vec![0; UNCOMPRESSED_FRAME_HEADER_LENGTH];
158 Self { buffer }
159 }
160}
161
162impl UncompressedFrameEncoder {
163 fn write_header(&mut self, self_contained: bool) {
164 let len = self.buffer.len();
165 debug_assert!(
166 len < (PAYLOAD_SIZE_LIMIT + UNCOMPRESSED_FRAME_HEADER_LENGTH),
167 "len: {} max: {}",
168 len,
169 PAYLOAD_SIZE_LIMIT + UNCOMPRESSED_FRAME_HEADER_LENGTH
170 );
171
172 let mut len = (len - UNCOMPRESSED_FRAME_HEADER_LENGTH) as u64;
173 if self_contained {
174 len |= 1 << 17;
175 }
176
177 put3b(self.buffer.as_mut_slice(), len as i32);
178 put3b(&mut self.buffer[3..], crc24(&len.to_le_bytes()[..3]));
179 }
180}
181
/// Encoder producing frames whose payload is LZ4-compressed, framed by a header and a
/// checksum trailer.
#[derive(Clone, Debug)]
pub struct Lz4FrameEncoder {
    /// Frame under construction: space for the header first, followed by the collected
    /// envelope bytes (replaced by the encoded payload and trailer on finalization).
    buffer: Vec<u8>,
}
187
188impl FrameEncoder for Lz4FrameEncoder {
189 #[inline]
190 fn can_fit(&self, len: usize) -> bool {
191 get_maximum_output_size(
194 (self.buffer.len() - COMPRESSED_FRAME_HEADER_LENGTH).saturating_add(len),
195 ) < PAYLOAD_SIZE_LIMIT
196 }
197
198 #[inline]
199 fn reset(&mut self) {
200 self.buffer.truncate(COMPRESSED_FRAME_HEADER_LENGTH);
201 }
202
203 #[inline]
204 fn add_envelope(&mut self, mut envelope: Vec<u8>) {
205 self.buffer.append(&mut envelope);
206 }
207
208 fn finalize_self_contained(&mut self) -> &[u8] {
209 let uncompressed_size = self.buffer.len() - COMPRESSED_FRAME_HEADER_LENGTH;
210 let mut compressed_payload = compress(&self.buffer[COMPRESSED_FRAME_HEADER_LENGTH..]);
211
212 self.buffer.truncate(COMPRESSED_FRAME_HEADER_LENGTH);
213 self.buffer.append(&mut compressed_payload);
214
215 self.write_header(uncompressed_size, true);
216 add_trailer(&mut self.buffer, COMPRESSED_FRAME_HEADER_LENGTH);
217
218 &self.buffer
219 }
220
221 fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
222 let mut uncompressed_size = envelope.len().min(PAYLOAD_SIZE_LIMIT - 1);
223 let offset = uncompressed_size;
224
225 self.buffer.resize(
226 get_maximum_output_size(uncompressed_size)
227 + COMPRESSED_FRAME_HEADER_LENGTH
228 + FRAME_TRAILER_LENGTH, 0,
230 );
231
232 let mut compressed_size = compress_into(
233 &envelope[..uncompressed_size],
234 &mut self.buffer[COMPRESSED_FRAME_HEADER_LENGTH..],
235 )
236 .unwrap(); if compressed_size >= PAYLOAD_SIZE_LIMIT {
239 self.buffer[COMPRESSED_FRAME_HEADER_LENGTH
242 ..(COMPRESSED_FRAME_HEADER_LENGTH + uncompressed_size)]
243 .copy_from_slice(&envelope[..uncompressed_size]);
244
245 compressed_size = uncompressed_size;
246 uncompressed_size = 0; }
248
249 self.buffer
250 .truncate(COMPRESSED_FRAME_HEADER_LENGTH + compressed_size);
251
252 self.write_header(uncompressed_size, false);
253 add_trailer(&mut self.buffer, COMPRESSED_FRAME_HEADER_LENGTH);
254
255 (offset, &self.buffer)
256 }
257
258 #[inline]
259 fn has_envelopes(&self) -> bool {
260 self.buffer.len() > COMPRESSED_FRAME_HEADER_LENGTH
261 }
262}
263
264impl Default for Lz4FrameEncoder {
265 fn default() -> Self {
266 let buffer = vec![0; COMPRESSED_FRAME_HEADER_LENGTH];
267 Self { buffer }
268 }
269}
270
271impl Lz4FrameEncoder {
272 fn write_header(&mut self, uncompressed_size: usize, self_contained: bool) {
273 let len = self.buffer.len();
274 debug_assert!(len < (PAYLOAD_SIZE_LIMIT + COMPRESSED_FRAME_HEADER_LENGTH));
275
276 let mut header =
277 (len - COMPRESSED_FRAME_HEADER_LENGTH) as u64 | ((uncompressed_size as u64) << 17);
278
279 if self_contained {
280 header |= 1 << 34;
281 }
282
283 let crc = crc24(&header.to_le_bytes()[..5]) as u64;
284
285 let header = header | (crc << 40);
286 self.buffer[..8].copy_from_slice(&header.to_le_bytes());
287 }
288}