cassandra_proto/frame/
mod.rs

1//! `frame` module contains general Frame functionality.
2use crate::frame::frame_response::ResponseBody;
3use crate::types::to_n_bytes;
4use crate::uuid::Uuid;
5
6/// Number of stream bytes in accordance to protocol.
7pub const STREAM_LEN: usize = 2;
8/// Number of body length bytes in accordance to protocol.
9pub const LENGTH_LEN: usize = 4;
10
11pub mod events;
12pub mod frame_auth_challenge;
13pub mod frame_auth_response;
14pub mod frame_auth_success;
15pub mod frame_authenticate;
16pub mod frame_batch;
17pub mod frame_error;
18pub mod frame_event;
19pub mod frame_execute;
20pub mod frame_options;
21pub mod frame_prepare;
22pub mod frame_query;
23pub mod frame_ready;
24pub mod frame_register;
25pub mod frame_response;
26pub mod frame_result;
27pub mod frame_startup;
28pub mod frame_supported;
29pub mod parser;
30pub mod parser_async;
31pub mod traits;
32
33use crate::error;
34
35pub use self::traits::*;
36
37#[derive(Debug)]
38pub struct Frame {
39    pub version: Version,
40    pub flags: Vec<Flag>,
41    pub opcode: Opcode,
42    pub stream: u16,
43    pub body: Vec<u8>,
44    pub tracing_id: Option<Uuid>,
45    pub warnings: Vec<String>,
46}
47
48impl Frame {
49    pub fn get_body(&self) -> error::Result<ResponseBody> {
50        ResponseBody::from(self.body.as_slice(), &self.opcode)
51    }
52
53    pub fn tracing_id(&self) -> &Option<Uuid> {
54        &self.tracing_id
55    }
56
57    pub fn warnings(&self) -> &Vec<String> {
58        &self.warnings
59    }
60}
61
62impl<'a> IntoBytes for Frame {
63    fn into_cbytes(&self) -> Vec<u8> {
64        let mut v = vec![];
65
66        let version_bytes = self.version.as_byte();
67        let flag_bytes = Flag::many_to_cbytes(&self.flags);
68        let opcode_bytes = self.opcode.as_byte();
69        let body_len = self.body.len();
70
71        v.push(version_bytes);
72        v.push(flag_bytes);
73        v.extend_from_slice(to_n_bytes(self.stream as u64, STREAM_LEN).as_slice());
74        v.push(opcode_bytes);
75        v.extend_from_slice(to_n_bytes(body_len as u64, LENGTH_LEN).as_slice());
76        v.extend_from_slice(self.body.as_slice());
77
78        v
79    }
80}
81
82/// Frame's version
83#[derive(Debug, PartialEq)]
84pub enum Version {
85    Request,
86    Response,
87}
88
89impl Version {
90    /// Number of bytes that represent Cassandra frame's version.
91    pub const BYTE_LENGTH: usize = 1;
92
93    /// It returns an actual Cassandra request frame version that CDRS can work with.
94    /// This version is based on selected feature - on of `v3`, `v4` or `v5`.
95    fn request_version() -> u8 {
96        if cfg!(feature = "v3") {
97            0x03
98        } else if cfg!(feature = "v4") || cfg!(feature = "v5") {
99            0x04
100        } else {
101            panic!(
102                "{}",
103                "Protocol version is not supported. CDRS should be run with protocol feature \
104                 set to v3, v4 or v5"
105            );
106        }
107    }
108
109    /// It returns an actual Cassandra response frame version that CDRS can work with.
110    /// This version is based on selected feature - on of `v3`, `v4` or `v5`.
111    fn response_version() -> u8 {
112        if cfg!(feature = "v3") {
113            0x83
114        } else if cfg!(feature = "v4") || cfg!(feature = "v5") {
115            0x84
116        } else {
117            panic!(
118                "{}",
119                "Protocol version is not supported. CDRS should be run with protocol feature \
120                 set to v3, v4 or v5"
121            );
122        }
123    }
124}
125
126impl AsByte for Version {
127    fn as_byte(&self) -> u8 {
128        match self {
129            &Version::Request => Version::request_version(),
130            &Version::Response => Version::response_version(),
131        }
132    }
133}
134
135impl From<Vec<u8>> for Version {
136    fn from(v: Vec<u8>) -> Version {
137        if v.len() != Self::BYTE_LENGTH {
138            error!(
139                "Unexpected Cassandra verion. Should has {} byte(-s), got {:?}",
140                Self::BYTE_LENGTH,
141                v
142            );
143            panic!(
144                "Unexpected Cassandra verion. Should has {} byte(-s), got {:?}",
145                Self::BYTE_LENGTH,
146                v
147            );
148        }
149        let version = v[0];
150        let req = Version::request_version();
151        let res = Version::response_version();
152
153        if version == req {
154            Version::Request
155        } else if version == res {
156            Version::Response
157        } else {
158            error!(
159                "Unexpected Cassandra version {:?}, either {:?} or {:?} is expected",
160                version, req, res
161            );
162            panic!(
163                "Unexpected Cassandra version {:?}, either {:?} or {:?} is expected",
164                version, req, res
165            );
166        }
167    }
168}
169
170/// Frame's flag
171// Is not implemented functionality. Only Igonore works for now
172#[derive(Debug, PartialEq)]
173pub enum Flag {
174    Compression,
175    Tracing,
176    CustomPayload,
177    Warning,
178    Ignore,
179}
180
181impl Flag {
182    /// Number of flag bytes in accordance to protocol.
183    const BYTE_LENGTH: usize = 1;
184
185    /// It returns selected flags collection.
186    pub fn get_collection(flags: u8) -> Vec<Flag> {
187        let mut found_flags: Vec<Flag> = vec![];
188
189        if Flag::has_compression(flags) {
190            found_flags.push(Flag::Compression);
191        }
192
193        if Flag::has_tracing(flags) {
194            found_flags.push(Flag::Tracing);
195        }
196
197        if Flag::has_custom_payload(flags) {
198            found_flags.push(Flag::CustomPayload);
199        }
200
201        if Flag::has_warning(flags) {
202            found_flags.push(Flag::Warning);
203        }
204
205        found_flags
206    }
207
208    /// The method converts a serie of `Flag`-s into a single byte.
209    pub fn many_to_cbytes(flags: &Vec<Flag>) -> u8 {
210        flags
211            .iter()
212            .fold(Flag::Ignore.as_byte(), |acc, f| acc | f.as_byte())
213    }
214
215    /// Indicates if flags contains `Flag::Compression`
216    pub fn has_compression(flags: u8) -> bool {
217        (flags & Flag::Compression.as_byte()) > 0
218    }
219
220    /// Indicates if flags contains `Flag::Tracing`
221    pub fn has_tracing(flags: u8) -> bool {
222        (flags & Flag::Tracing.as_byte()) > 0
223    }
224
225    /// Indicates if flags contains `Flag::CustomPayload`
226    pub fn has_custom_payload(flags: u8) -> bool {
227        (flags & Flag::CustomPayload.as_byte()) > 0
228    }
229
230    /// Indicates if flags contains `Flag::Warning`
231    pub fn has_warning(flags: u8) -> bool {
232        (flags & Flag::Warning.as_byte()) > 0
233    }
234}
235
236impl AsByte for Flag {
237    fn as_byte(&self) -> u8 {
238        match self {
239            &Flag::Compression => 0x01,
240            &Flag::Tracing => 0x02,
241            &Flag::CustomPayload => 0x04,
242            &Flag::Warning => 0x08,
243            &Flag::Ignore => 0x00,
244            // assuming that ingoring value whould be other than [0x01, 0x02, 0x04, 0x08]
245        }
246    }
247}
248
249impl From<u8> for Flag {
250    fn from(f: u8) -> Flag {
251        match f {
252            0x01 => Flag::Compression,
253            0x02 => Flag::Tracing,
254            0x04 => Flag::CustomPayload,
255            0x08 => Flag::Warning,
256            _ => Flag::Ignore, // ignore by specification
257        }
258    }
259}
260
261#[derive(Debug, PartialEq)]
262pub enum Opcode {
263    Error,
264    Startup,
265    Ready,
266    Authenticate,
267    Options,
268    Supported,
269    Query,
270    Result,
271    Prepare,
272    Execute,
273    Register,
274    Event,
275    Batch,
276    AuthChallenge,
277    AuthResponse,
278    AuthSuccess,
279}
280
281impl Opcode {
282    // Number of opcode bytes in accordance to protocol.
283    pub const BYTE_LENGTH: usize = 1;
284}
285
286impl AsByte for Opcode {
287    fn as_byte(&self) -> u8 {
288        match self {
289            &Opcode::Error => 0x00,
290            &Opcode::Startup => 0x01,
291            &Opcode::Ready => 0x02,
292            &Opcode::Authenticate => 0x03,
293            &Opcode::Options => 0x05,
294            &Opcode::Supported => 0x06,
295            &Opcode::Query => 0x07,
296            &Opcode::Result => 0x08,
297            &Opcode::Prepare => 0x09,
298            &Opcode::Execute => 0x0A,
299            &Opcode::Register => 0x0B,
300            &Opcode::Event => 0x0C,
301            &Opcode::Batch => 0x0D,
302            &Opcode::AuthChallenge => 0x0E,
303            &Opcode::AuthResponse => 0x0F,
304            &Opcode::AuthSuccess => 0x10,
305        }
306    }
307}
308
309impl From<u8> for Opcode {
310    fn from(b: u8) -> Opcode {
311        match b {
312            0x00 => Opcode::Error,
313            0x01 => Opcode::Startup,
314            0x02 => Opcode::Ready,
315            0x03 => Opcode::Authenticate,
316            0x05 => Opcode::Options,
317            0x06 => Opcode::Supported,
318            0x07 => Opcode::Query,
319            0x08 => Opcode::Result,
320            0x09 => Opcode::Prepare,
321            0x0A => Opcode::Execute,
322            0x0B => Opcode::Register,
323            0x0C => Opcode::Event,
324            0x0D => Opcode::Batch,
325            0x0E => Opcode::AuthChallenge,
326            0x0F => Opcode::AuthResponse,
327            0x10 => Opcode::AuthSuccess,
328            _ => unreachable!(),
329        }
330    }
331}
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336    use crate::frame::traits::AsByte;
337
338    #[test]
339    #[cfg(not(feature = "v3"))]
340    fn test_frame_version_as_byte() {
341        let request_version = Version::Request;
342        assert_eq!(request_version.as_byte(), 0x04);
343        let response_version = Version::Response;
344        assert_eq!(response_version.as_byte(), 0x84);
345    }
346
347    #[test]
348    #[cfg(feature = "v3")]
349    fn test_frame_version_as_byte_v3() {
350        let request_version = Version::Request;
351        assert_eq!(request_version.as_byte(), 0x03);
352        let response_version = Version::Response;
353        assert_eq!(response_version.as_byte(), 0x83);
354    }
355
356    #[test]
357    #[cfg(not(feature = "v3"))]
358    fn test_frame_version_from() {
359        let request: Vec<u8> = vec![0x04];
360        assert_eq!(Version::from(request), Version::Request);
361        let response: Vec<u8> = vec![0x84];
362        assert_eq!(Version::from(response), Version::Response);
363    }
364
365    #[test]
366    #[cfg(feature = "v3")]
367    fn test_frame_version_from_v3() {
368        let request: Vec<u8> = vec![0x03];
369        assert_eq!(Version::from(request), Version::Request);
370        let response: Vec<u8> = vec![0x83];
371        assert_eq!(Version::from(response), Version::Response);
372    }
373
374    #[test]
375    fn test_flag_from() {
376        assert_eq!(Flag::from(0x01 as u8), Flag::Compression);
377        assert_eq!(Flag::from(0x02 as u8), Flag::Tracing);
378        assert_eq!(Flag::from(0x04 as u8), Flag::CustomPayload);
379        assert_eq!(Flag::from(0x08 as u8), Flag::Warning);
380        // rest should be interpreted as Ignore
381        assert_eq!(Flag::from(0x10 as u8), Flag::Ignore);
382        assert_eq!(Flag::from(0x31 as u8), Flag::Ignore);
383    }
384
385    #[test]
386    fn test_flag_as_byte() {
387        assert_eq!(Flag::Compression.as_byte(), 0x01);
388        assert_eq!(Flag::Tracing.as_byte(), 0x02);
389        assert_eq!(Flag::CustomPayload.as_byte(), 0x04);
390        assert_eq!(Flag::Warning.as_byte(), 0x08);
391    }
392
393    #[test]
394    fn test_flag_has_x() {
395        assert!(Flag::has_compression(0x01));
396        assert!(!Flag::has_compression(0x02));
397
398        assert!(Flag::has_tracing(0x02));
399        assert!(!Flag::has_tracing(0x01));
400
401        assert!(Flag::has_custom_payload(0x04));
402        assert!(!Flag::has_custom_payload(0x02));
403
404        assert!(Flag::has_warning(0x08));
405        assert!(!Flag::has_warning(0x01));
406    }
407
408    #[test]
409    fn test_flag_many_to_cbytes() {
410        let all = vec![
411            Flag::Compression,
412            Flag::Tracing,
413            Flag::CustomPayload,
414            Flag::Warning,
415        ];
416        assert_eq!(Flag::many_to_cbytes(&all), 1 | 2 | 4 | 8);
417        let some = vec![Flag::Compression, Flag::Warning];
418        assert_eq!(Flag::many_to_cbytes(&some), 1 | 8);
419        let one = vec![Flag::Compression];
420        assert_eq!(Flag::many_to_cbytes(&one), 1);
421    }
422
423    #[test]
424    fn test_flag_get_collection() {
425        let all = vec![
426            Flag::Compression,
427            Flag::Tracing,
428            Flag::CustomPayload,
429            Flag::Warning,
430        ];
431        assert_eq!(Flag::get_collection(1 | 2 | 4 | 8), all);
432        let some = vec![Flag::Compression, Flag::Warning];
433        assert_eq!(Flag::get_collection(1 | 8), some);
434        let one = vec![Flag::Compression];
435        assert_eq!(Flag::get_collection(1), one);
436    }
437
438    #[test]
439    fn test_opcode_as_byte() {
440        assert_eq!(Opcode::Error.as_byte(), 0x00);
441        assert_eq!(Opcode::Startup.as_byte(), 0x01);
442        assert_eq!(Opcode::Ready.as_byte(), 0x02);
443        assert_eq!(Opcode::Authenticate.as_byte(), 0x03);
444        assert_eq!(Opcode::Options.as_byte(), 0x05);
445        assert_eq!(Opcode::Supported.as_byte(), 0x06);
446        assert_eq!(Opcode::Query.as_byte(), 0x07);
447        assert_eq!(Opcode::Result.as_byte(), 0x08);
448        assert_eq!(Opcode::Prepare.as_byte(), 0x09);
449        assert_eq!(Opcode::Execute.as_byte(), 0x0A);
450        assert_eq!(Opcode::Register.as_byte(), 0x0B);
451        assert_eq!(Opcode::Event.as_byte(), 0x0C);
452        assert_eq!(Opcode::Batch.as_byte(), 0x0D);
453        assert_eq!(Opcode::AuthChallenge.as_byte(), 0x0E);
454        assert_eq!(Opcode::AuthResponse.as_byte(), 0x0F);
455        assert_eq!(Opcode::AuthSuccess.as_byte(), 0x10);
456    }
457
458    #[test]
459    fn test_opcode_from() {
460        assert_eq!(Opcode::from(0x00), Opcode::Error);
461        assert_eq!(Opcode::from(0x01), Opcode::Startup);
462        assert_eq!(Opcode::from(0x02), Opcode::Ready);
463        assert_eq!(Opcode::from(0x03), Opcode::Authenticate);
464        assert_eq!(Opcode::from(0x05), Opcode::Options);
465        assert_eq!(Opcode::from(0x06), Opcode::Supported);
466        assert_eq!(Opcode::from(0x07), Opcode::Query);
467        assert_eq!(Opcode::from(0x08), Opcode::Result);
468        assert_eq!(Opcode::from(0x09), Opcode::Prepare);
469        assert_eq!(Opcode::from(0x0A), Opcode::Execute);
470        assert_eq!(Opcode::from(0x0B), Opcode::Register);
471        assert_eq!(Opcode::from(0x0C), Opcode::Event);
472        assert_eq!(Opcode::from(0x0D), Opcode::Batch);
473        assert_eq!(Opcode::from(0x0E), Opcode::AuthChallenge);
474        assert_eq!(Opcode::from(0x0F), Opcode::AuthResponse);
475        assert_eq!(Opcode::from(0x10), Opcode::AuthSuccess);
476    }
477}