1use crate::frame::frame_response::ResponseBody;
3use crate::types::to_n_bytes;
4use crate::uuid::Uuid;
5
/// Number of bytes used to encode the stream id when a frame is serialized.
pub const STREAM_LEN: usize = 2;
/// Number of bytes used to encode the body length when a frame is serialized.
pub const LENGTH_LEN: usize = 4;
10
11pub mod events;
12pub mod frame_auth_challenge;
13pub mod frame_auth_response;
14pub mod frame_auth_success;
15pub mod frame_authenticate;
16pub mod frame_batch;
17pub mod frame_error;
18pub mod frame_event;
19pub mod frame_execute;
20pub mod frame_options;
21pub mod frame_prepare;
22pub mod frame_query;
23pub mod frame_ready;
24pub mod frame_register;
25pub mod frame_response;
26pub mod frame_result;
27pub mod frame_startup;
28pub mod frame_supported;
29pub mod parser;
30pub mod parser_async;
31pub mod traits;
32
33use crate::error;
34
35pub use self::traits::*;
36
/// A protocol frame: the header fields plus the raw, still-encoded body.
#[derive(Debug)]
pub struct Frame {
    /// Request/response marker; serialized as the first header byte.
    pub version: Version,
    /// Header flags; serialized OR-ed together into a single byte.
    pub flags: Vec<Flag>,
    /// Operation code; also used by `get_body` to decide how to decode `body`.
    pub opcode: Opcode,
    /// Stream identifier; serialized in `STREAM_LEN` bytes.
    pub stream: u16,
    /// Raw body bytes; use `get_body` to decode them.
    pub body: Vec<u8>,
    /// Tracing id attached to the frame, if any.
    // NOTE(review): presumably filled in by the parser when the tracing flag is set — confirm.
    pub tracing_id: Option<Uuid>,
    /// Warnings attached to the frame.
    // NOTE(review): presumably filled in by the parser when the warning flag is set — confirm.
    pub warnings: Vec<String>,
}
47
48impl Frame {
49 pub fn get_body(&self) -> error::Result<ResponseBody> {
50 ResponseBody::from(self.body.as_slice(), &self.opcode)
51 }
52
53 pub fn tracing_id(&self) -> &Option<Uuid> {
54 &self.tracing_id
55 }
56
57 pub fn warnings(&self) -> &Vec<String> {
58 &self.warnings
59 }
60}
61
62impl<'a> IntoBytes for Frame {
63 fn into_cbytes(&self) -> Vec<u8> {
64 let mut v = vec![];
65
66 let version_bytes = self.version.as_byte();
67 let flag_bytes = Flag::many_to_cbytes(&self.flags);
68 let opcode_bytes = self.opcode.as_byte();
69 let body_len = self.body.len();
70
71 v.push(version_bytes);
72 v.push(flag_bytes);
73 v.extend_from_slice(to_n_bytes(self.stream as u64, STREAM_LEN).as_slice());
74 v.push(opcode_bytes);
75 v.extend_from_slice(to_n_bytes(body_len as u64, LENGTH_LEN).as_slice());
76 v.extend_from_slice(self.body.as_slice());
77
78 v
79 }
80}
81
/// Frame version marker that distinguishes requests from responses.
///
/// The byte each variant maps to depends on the protocol feature the crate
/// is built with (see the `AsByte` implementation).
#[derive(Debug, PartialEq)]
pub enum Version {
    /// Marks a request frame.
    Request,
    /// Marks a response frame.
    Response,
}
88
89impl Version {
90 pub const BYTE_LENGTH: usize = 1;
92
93 fn request_version() -> u8 {
96 if cfg!(feature = "v3") {
97 0x03
98 } else if cfg!(feature = "v4") || cfg!(feature = "v5") {
99 0x04
100 } else {
101 panic!(
102 "{}",
103 "Protocol version is not supported. CDRS should be run with protocol feature \
104 set to v3, v4 or v5"
105 );
106 }
107 }
108
109 fn response_version() -> u8 {
112 if cfg!(feature = "v3") {
113 0x83
114 } else if cfg!(feature = "v4") || cfg!(feature = "v5") {
115 0x84
116 } else {
117 panic!(
118 "{}",
119 "Protocol version is not supported. CDRS should be run with protocol feature \
120 set to v3, v4 or v5"
121 );
122 }
123 }
124}
125
126impl AsByte for Version {
127 fn as_byte(&self) -> u8 {
128 match self {
129 &Version::Request => Version::request_version(),
130 &Version::Response => Version::response_version(),
131 }
132 }
133}
134
135impl From<Vec<u8>> for Version {
136 fn from(v: Vec<u8>) -> Version {
137 if v.len() != Self::BYTE_LENGTH {
138 error!(
139 "Unexpected Cassandra verion. Should has {} byte(-s), got {:?}",
140 Self::BYTE_LENGTH,
141 v
142 );
143 panic!(
144 "Unexpected Cassandra verion. Should has {} byte(-s), got {:?}",
145 Self::BYTE_LENGTH,
146 v
147 );
148 }
149 let version = v[0];
150 let req = Version::request_version();
151 let res = Version::response_version();
152
153 if version == req {
154 Version::Request
155 } else if version == res {
156 Version::Response
157 } else {
158 error!(
159 "Unexpected Cassandra version {:?}, either {:?} or {:?} is expected",
160 version, req, res
161 );
162 panic!(
163 "Unexpected Cassandra version {:?}, either {:?} or {:?} is expected",
164 version, req, res
165 );
166 }
167 }
168}
169
/// Frame header flags. Each variant except `Ignore` maps to a single bit of
/// the flags byte (see the `AsByte` implementation).
#[derive(Debug, PartialEq)]
pub enum Flag {
    /// Bit `0x01`.
    Compression,
    /// Bit `0x02`.
    Tracing,
    /// Bit `0x04`.
    CustomPayload,
    /// Bit `0x08`.
    Warning,
    /// No bit (`0x00`); also what `Flag::from` returns for any unrecognized byte.
    Ignore,
}
180
181impl Flag {
182 const BYTE_LENGTH: usize = 1;
184
185 pub fn get_collection(flags: u8) -> Vec<Flag> {
187 let mut found_flags: Vec<Flag> = vec![];
188
189 if Flag::has_compression(flags) {
190 found_flags.push(Flag::Compression);
191 }
192
193 if Flag::has_tracing(flags) {
194 found_flags.push(Flag::Tracing);
195 }
196
197 if Flag::has_custom_payload(flags) {
198 found_flags.push(Flag::CustomPayload);
199 }
200
201 if Flag::has_warning(flags) {
202 found_flags.push(Flag::Warning);
203 }
204
205 found_flags
206 }
207
208 pub fn many_to_cbytes(flags: &Vec<Flag>) -> u8 {
210 flags
211 .iter()
212 .fold(Flag::Ignore.as_byte(), |acc, f| acc | f.as_byte())
213 }
214
215 pub fn has_compression(flags: u8) -> bool {
217 (flags & Flag::Compression.as_byte()) > 0
218 }
219
220 pub fn has_tracing(flags: u8) -> bool {
222 (flags & Flag::Tracing.as_byte()) > 0
223 }
224
225 pub fn has_custom_payload(flags: u8) -> bool {
227 (flags & Flag::CustomPayload.as_byte()) > 0
228 }
229
230 pub fn has_warning(flags: u8) -> bool {
232 (flags & Flag::Warning.as_byte()) > 0
233 }
234}
235
236impl AsByte for Flag {
237 fn as_byte(&self) -> u8 {
238 match self {
239 &Flag::Compression => 0x01,
240 &Flag::Tracing => 0x02,
241 &Flag::CustomPayload => 0x04,
242 &Flag::Warning => 0x08,
243 &Flag::Ignore => 0x00,
244 }
246 }
247}
248
249impl From<u8> for Flag {
250 fn from(f: u8) -> Flag {
251 match f {
252 0x01 => Flag::Compression,
253 0x02 => Flag::Tracing,
254 0x04 => Flag::CustomPayload,
255 0x08 => Flag::Warning,
256 _ => Flag::Ignore, }
258 }
259}
260
/// Operation code of a frame. Each variant maps to one byte (see the `AsByte`
/// and `From<u8>` implementations); the value `0x04` is not assigned.
#[derive(Debug, PartialEq)]
pub enum Opcode {
    /// Byte `0x00`.
    Error,
    /// Byte `0x01`.
    Startup,
    /// Byte `0x02`.
    Ready,
    /// Byte `0x03`.
    Authenticate,
    /// Byte `0x05`.
    Options,
    /// Byte `0x06`.
    Supported,
    /// Byte `0x07`.
    Query,
    /// Byte `0x08`.
    Result,
    /// Byte `0x09`.
    Prepare,
    /// Byte `0x0A`.
    Execute,
    /// Byte `0x0B`.
    Register,
    /// Byte `0x0C`.
    Event,
    /// Byte `0x0D`.
    Batch,
    /// Byte `0x0E`.
    AuthChallenge,
    /// Byte `0x0F`.
    AuthResponse,
    /// Byte `0x10`.
    AuthSuccess,
}
280
impl Opcode {
    /// Number of bytes the opcode occupies.
    pub const BYTE_LENGTH: usize = 1;
}
285
286impl AsByte for Opcode {
287 fn as_byte(&self) -> u8 {
288 match self {
289 &Opcode::Error => 0x00,
290 &Opcode::Startup => 0x01,
291 &Opcode::Ready => 0x02,
292 &Opcode::Authenticate => 0x03,
293 &Opcode::Options => 0x05,
294 &Opcode::Supported => 0x06,
295 &Opcode::Query => 0x07,
296 &Opcode::Result => 0x08,
297 &Opcode::Prepare => 0x09,
298 &Opcode::Execute => 0x0A,
299 &Opcode::Register => 0x0B,
300 &Opcode::Event => 0x0C,
301 &Opcode::Batch => 0x0D,
302 &Opcode::AuthChallenge => 0x0E,
303 &Opcode::AuthResponse => 0x0F,
304 &Opcode::AuthSuccess => 0x10,
305 }
306 }
307}
308
309impl From<u8> for Opcode {
310 fn from(b: u8) -> Opcode {
311 match b {
312 0x00 => Opcode::Error,
313 0x01 => Opcode::Startup,
314 0x02 => Opcode::Ready,
315 0x03 => Opcode::Authenticate,
316 0x05 => Opcode::Options,
317 0x06 => Opcode::Supported,
318 0x07 => Opcode::Query,
319 0x08 => Opcode::Result,
320 0x09 => Opcode::Prepare,
321 0x0A => Opcode::Execute,
322 0x0B => Opcode::Register,
323 0x0C => Opcode::Event,
324 0x0D => Opcode::Batch,
325 0x0E => Opcode::AuthChallenge,
326 0x0F => Opcode::AuthResponse,
327 0x10 => Opcode::AuthSuccess,
328 _ => unreachable!(),
329 }
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::traits::AsByte;

    /// Every opcode paired with the byte it is expected to map to.
    fn opcode_table() -> Vec<(u8, Opcode)> {
        vec![
            (0x00, Opcode::Error),
            (0x01, Opcode::Startup),
            (0x02, Opcode::Ready),
            (0x03, Opcode::Authenticate),
            (0x05, Opcode::Options),
            (0x06, Opcode::Supported),
            (0x07, Opcode::Query),
            (0x08, Opcode::Result),
            (0x09, Opcode::Prepare),
            (0x0A, Opcode::Execute),
            (0x0B, Opcode::Register),
            (0x0C, Opcode::Event),
            (0x0D, Opcode::Batch),
            (0x0E, Opcode::AuthChallenge),
            (0x0F, Opcode::AuthResponse),
            (0x10, Opcode::AuthSuccess),
        ]
    }

    #[test]
    #[cfg(not(feature = "v3"))]
    fn test_frame_version_as_byte() {
        assert_eq!(Version::Request.as_byte(), 0x04);
        assert_eq!(Version::Response.as_byte(), 0x84);
    }

    #[test]
    #[cfg(feature = "v3")]
    fn test_frame_version_as_byte_v3() {
        assert_eq!(Version::Request.as_byte(), 0x03);
        assert_eq!(Version::Response.as_byte(), 0x83);
    }

    #[test]
    #[cfg(not(feature = "v3"))]
    fn test_frame_version_from() {
        assert_eq!(Version::from(vec![0x04_u8]), Version::Request);
        assert_eq!(Version::from(vec![0x84_u8]), Version::Response);
    }

    #[test]
    #[cfg(feature = "v3")]
    fn test_frame_version_from_v3() {
        assert_eq!(Version::from(vec![0x03_u8]), Version::Request);
        assert_eq!(Version::from(vec![0x83_u8]), Version::Response);
    }

    #[test]
    fn test_flag_from() {
        let expectations = vec![
            (0x01_u8, Flag::Compression),
            (0x02_u8, Flag::Tracing),
            (0x04_u8, Flag::CustomPayload),
            (0x08_u8, Flag::Warning),
            // Bytes that are not a single known flag bit fall back to `Ignore`.
            (0x10_u8, Flag::Ignore),
            (0x31_u8, Flag::Ignore),
        ];
        for (byte, flag) in expectations {
            assert_eq!(Flag::from(byte), flag);
        }
    }

    #[test]
    fn test_flag_as_byte() {
        let expectations = vec![
            (Flag::Compression, 0x01),
            (Flag::Tracing, 0x02),
            (Flag::CustomPayload, 0x04),
            (Flag::Warning, 0x08),
        ];
        for (flag, byte) in expectations {
            assert_eq!(flag.as_byte(), byte);
        }
    }

    #[test]
    fn test_flag_has_x() {
        assert!(Flag::has_compression(0x01));
        assert!(!Flag::has_compression(0x02));

        assert!(Flag::has_tracing(0x02));
        assert!(!Flag::has_tracing(0x01));

        assert!(Flag::has_custom_payload(0x04));
        assert!(!Flag::has_custom_payload(0x02));

        assert!(Flag::has_warning(0x08));
        assert!(!Flag::has_warning(0x01));
    }

    #[test]
    fn test_flag_many_to_cbytes() {
        let cases = vec![
            (
                vec![
                    Flag::Compression,
                    Flag::Tracing,
                    Flag::CustomPayload,
                    Flag::Warning,
                ],
                0x0F,
            ),
            (vec![Flag::Compression, Flag::Warning], 0x09),
            (vec![Flag::Compression], 0x01),
        ];
        for (flags, byte) in cases {
            assert_eq!(Flag::many_to_cbytes(&flags), byte);
        }
    }

    #[test]
    fn test_flag_get_collection() {
        let cases = vec![
            (
                0x0F,
                vec![
                    Flag::Compression,
                    Flag::Tracing,
                    Flag::CustomPayload,
                    Flag::Warning,
                ],
            ),
            (0x09, vec![Flag::Compression, Flag::Warning]),
            (0x01, vec![Flag::Compression]),
        ];
        for (byte, flags) in cases {
            assert_eq!(Flag::get_collection(byte), flags);
        }
    }

    #[test]
    fn test_opcode_as_byte() {
        for (byte, opcode) in opcode_table() {
            assert_eq!(opcode.as_byte(), byte);
        }
    }

    #[test]
    fn test_opcode_from() {
        for (byte, opcode) in opcode_table() {
            assert_eq!(Opcode::from(byte), opcode);
        }
    }
}