cassandra_protocol/
frame.rs

1use crate::compression::{Compression, CompressionError};
2use crate::frame::message_request::RequestBody;
3use crate::frame::message_response::ResponseBody;
4use crate::types::data_serialization_types::decode_timeuuid;
5use crate::types::{from_cursor_string_list, try_i16_from_bytes, try_i32_from_bytes, UUID_LEN};
6use bitflags::bitflags;
7use derivative::Derivative;
8use derive_more::{Constructor, Display};
9use std::convert::TryFrom;
10use std::io::Cursor;
11use thiserror::Error;
12use uuid::Uuid;
13
14pub use crate::frame::traits::*;
15
16/// Number of bytes in the header
17const ENVELOPE_HEADER_LEN: usize = 9;
18/// Number of stream bytes in accordance to protocol.
19pub const STREAM_LEN: usize = 2;
20/// Number of body length bytes in accordance to protocol.
21pub const LENGTH_LEN: usize = 4;
22
23pub mod events;
24pub mod frame_decoder;
25pub mod frame_encoder;
26pub mod message_auth_challenge;
27pub mod message_auth_response;
28pub mod message_auth_success;
29pub mod message_authenticate;
30pub mod message_batch;
31pub mod message_error;
32pub mod message_event;
33pub mod message_execute;
34pub mod message_options;
35pub mod message_prepare;
36pub mod message_query;
37pub mod message_ready;
38pub mod message_register;
39pub mod message_request;
40pub mod message_response;
41pub mod message_result;
42pub mod message_startup;
43pub mod message_supported;
44pub mod traits;
45
46use crate::error;
47
48pub const EVENT_STREAM_ID: i16 = -1;
49
50const fn const_max(a: usize, b: usize) -> usize {
51    if a < b {
52        a
53    } else {
54        b
55    }
56}
57
58/// Maximum size of frame payloads - aggregated envelopes or a part of a single envelope.
59pub const PAYLOAD_SIZE_LIMIT: usize = 1 << 17;
60
61const UNCOMPRESSED_FRAME_HEADER_LENGTH: usize = 6;
62const COMPRESSED_FRAME_HEADER_LENGTH: usize = 8;
63const FRAME_TRAILER_LENGTH: usize = 4;
64
65/// Maximum size of an entire frame.
66pub const MAX_FRAME_SIZE: usize = PAYLOAD_SIZE_LIMIT
67    + const_max(
68        UNCOMPRESSED_FRAME_HEADER_LENGTH,
69        COMPRESSED_FRAME_HEADER_LENGTH,
70    )
71    + FRAME_TRAILER_LENGTH;
72
73/// Cassandra stream identifier.
74pub type StreamId = i16;
75
76#[derive(Debug, Clone, Eq, PartialEq, Hash, Constructor)]
77pub struct ParsedEnvelope {
78    /// How many bytes from the buffer have been read.
79    pub envelope_len: usize,
80    /// The parsed envelope.
81    pub envelope: Envelope,
82}
83
84#[derive(Derivative, Clone, PartialEq, Eq, Hash)]
85#[derivative(Debug)]
86pub struct Envelope {
87    pub version: Version,
88    pub direction: Direction,
89    pub flags: Flags,
90    pub opcode: Opcode,
91    pub stream_id: StreamId,
92    #[derivative(Debug = "ignore")]
93    pub body: Vec<u8>,
94    pub tracing_id: Option<Uuid>,
95    pub warnings: Vec<String>,
96}
97
98impl Envelope {
99    #[inline]
100    #[allow(clippy::too_many_arguments)]
101    pub fn new(
102        version: Version,
103        direction: Direction,
104        flags: Flags,
105        opcode: Opcode,
106        stream_id: StreamId,
107        body: Vec<u8>,
108        tracing_id: Option<Uuid>,
109        warnings: Vec<String>,
110    ) -> Self {
111        Envelope {
112            version,
113            direction,
114            flags,
115            opcode,
116            stream_id,
117            body,
118            tracing_id,
119            warnings,
120        }
121    }
122
123    #[inline]
124    pub fn request_body(&self) -> error::Result<RequestBody> {
125        RequestBody::try_from(self.body.as_slice(), self.opcode, self.version)
126    }
127
128    #[inline]
129    pub fn response_body(&self) -> error::Result<ResponseBody> {
130        ResponseBody::try_from(self.body.as_slice(), self.opcode, self.version)
131    }
132
133    #[inline]
134    pub fn tracing_id(&self) -> &Option<Uuid> {
135        &self.tracing_id
136    }
137
138    #[inline]
139    pub fn warnings(&self) -> &[String] {
140        &self.warnings
141    }
142
143    /// Parses the raw bytes of a cassandra envelope returning a [`ParsedEnvelope`] struct.
144    /// The typical use case is reading from a buffer that may contain 0 or more envelopes and where
145    /// the last envelope may be incomplete. The possible return values are:
146    /// * `Ok(ParsedEnvelope)` - The first envelope in the buffer has been successfully parsed.
147    /// * `Err(ParseEnvelopeError::NotEnoughBytes)` - There are not enough bytes to parse a single envelope, [`Envelope::from_buffer`] should be recalled when it is possible that there are more bytes.
148    /// * `Err(_)` - The envelope is malformed and you should close the connection as this method does not provide a way to tell how many bytes to advance the buffer in this case.
149    pub fn from_buffer(
150        data: &[u8],
151        compression: Compression,
152    ) -> Result<ParsedEnvelope, ParseEnvelopeError> {
153        if data.len() < ENVELOPE_HEADER_LEN {
154            return Err(ParseEnvelopeError::NotEnoughBytes);
155        }
156
157        let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
158        let envelope_len = ENVELOPE_HEADER_LEN + body_len;
159        if data.len() < envelope_len {
160            return Err(ParseEnvelopeError::NotEnoughBytes);
161        }
162
163        let version = Version::try_from(data[0])
164            .map_err(|_| ParseEnvelopeError::UnsupportedVersion(data[0] & 0x7f))?;
165        let direction = Direction::from(data[0]);
166        let flags = Flags::from_bits_truncate(data[1]);
167        let stream_id = try_i16_from_bytes(&data[2..4]).unwrap();
168        let opcode = Opcode::try_from(data[4])
169            .map_err(|_| ParseEnvelopeError::UnsupportedOpcode(data[4]))?;
170
171        let body_bytes = &data[ENVELOPE_HEADER_LEN..envelope_len];
172
173        let full_body = if flags.contains(Flags::COMPRESSION) {
174            compression.decode(body_bytes.to_vec())
175        } else {
176            Compression::None.decode(body_bytes.to_vec())
177        }
178        .map_err(ParseEnvelopeError::DecompressionError)?;
179
180        let body_len = full_body.len();
181
182        // Use cursor to get tracing id, warnings and actual body
183        let mut body_cursor = Cursor::new(full_body.as_slice());
184
185        let tracing_id = if flags.contains(Flags::TRACING) && direction == Direction::Response {
186            let mut tracing_bytes = [0; UUID_LEN];
187            std::io::Read::read_exact(&mut body_cursor, &mut tracing_bytes).unwrap();
188
189            Some(decode_timeuuid(&tracing_bytes).map_err(ParseEnvelopeError::InvalidUuid)?)
190        } else {
191            None
192        };
193
194        let warnings = if flags.contains(Flags::WARNING) {
195            from_cursor_string_list(&mut body_cursor)
196                .map_err(ParseEnvelopeError::InvalidWarnings)?
197        } else {
198            vec![]
199        };
200
201        let mut body = Vec::with_capacity(body_len - body_cursor.position() as usize);
202
203        std::io::Read::read_to_end(&mut body_cursor, &mut body)
204            .expect("Read cannot fail because cursor is backed by slice");
205
206        Ok(ParsedEnvelope::new(
207            envelope_len,
208            Envelope {
209                version,
210                direction,
211                flags,
212                opcode,
213                stream_id,
214                body,
215                tracing_id,
216                warnings,
217            },
218        ))
219    }
220
221    pub fn check_envelope_size(data: &[u8]) -> Result<usize, CheckEnvelopeSizeError> {
222        if data.len() < ENVELOPE_HEADER_LEN {
223            return Err(CheckEnvelopeSizeError::NotEnoughBytes);
224        }
225
226        let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
227        let envelope_len = ENVELOPE_HEADER_LEN + body_len;
228        if data.len() < envelope_len {
229            return Err(CheckEnvelopeSizeError::NotEnoughBytes);
230        }
231        let _ = Version::try_from(data[0])
232            .map_err(|_| CheckEnvelopeSizeError::UnsupportedVersion(data[0] & 0x7f))?;
233
234        Ok(envelope_len)
235    }
236
237    pub fn encode_with(&self, compressor: Compression) -> error::Result<Vec<u8>> {
238        // compression is ignored since v5
239        let is_compressed = self.version < Version::V5 && compressor.is_compressed();
240
241        let combined_version_byte = u8::from(self.version) | u8::from(self.direction);
242        let flag_byte = (if is_compressed {
243            self.flags | Flags::COMPRESSION
244        } else {
245            self.flags.difference(Flags::COMPRESSION)
246        })
247        .bits();
248
249        let opcode_byte = u8::from(self.opcode);
250
251        let mut v = Vec::with_capacity(9);
252
253        v.push(combined_version_byte);
254        v.push(flag_byte);
255        v.extend_from_slice(&self.stream_id.to_be_bytes());
256        v.push(opcode_byte);
257
258        let mut flags_buffer = vec![];
259
260        if self.flags.contains(Flags::TRACING) && self.direction == Direction::Response {
261            let mut tracing_id = self
262                .tracing_id
263                .ok_or_else(|| {
264                    error::Error::Io(std::io::Error::other(
265                        "Tracing flag was set but Envelope has no tracing_id",
266                    ))
267                })?
268                .into_bytes()
269                .to_vec();
270
271            flags_buffer.append(&mut tracing_id);
272        };
273
274        if self.flags.contains(Flags::WARNING) && self.direction == Direction::Response {
275            let warnings_len = self.warnings.len() as i16;
276            flags_buffer.extend_from_slice(&warnings_len.to_be_bytes());
277
278            for warning in &self.warnings {
279                let warning_len = warning.len() as i16;
280                flags_buffer.extend_from_slice(&warning_len.to_be_bytes());
281                flags_buffer.append(&mut warning.as_bytes().to_vec());
282            }
283        }
284
285        if is_compressed {
286            // avoid having to copy the body if there is nothing in flags_buffer
287            let encoded_body = if flags_buffer.is_empty() {
288                compressor.encode(&self.body)?
289            } else {
290                flags_buffer.extend_from_slice(&self.body);
291                compressor.encode(&flags_buffer)?
292            };
293
294            let body_len = encoded_body.len() as i32;
295            v.extend_from_slice(&body_len.to_be_bytes());
296            v.extend_from_slice(&encoded_body);
297        } else {
298            // avoid having to copy the body if there is nothing in flags_buffer
299            if flags_buffer.is_empty() {
300                let body_len = self.body.len() as i32;
301                v.extend_from_slice(&body_len.to_be_bytes());
302                v.extend_from_slice(&self.body);
303            } else {
304                let body_len = self.body.len() as i32 + flags_buffer.len() as i32;
305                v.extend_from_slice(&body_len.to_be_bytes());
306                flags_buffer.extend_from_slice(&self.body);
307                v.append(&mut flags_buffer);
308            }
309        }
310
311        Ok(v)
312    }
313}
314
315#[derive(Debug, Error)]
316#[non_exhaustive]
317pub enum CheckEnvelopeSizeError {
318    #[error("Not enough bytes!")]
319    NotEnoughBytes,
320    #[error("Unsupported version: {0}")]
321    UnsupportedVersion(u8),
322    #[error("Unsupported opcode: {0}")]
323    UnsupportedOpcode(u8),
324}
325
326#[derive(Debug, Error)]
327#[non_exhaustive]
328pub enum ParseEnvelopeError {
329    /// There are not enough bytes to parse a single envelope, [`Envelope::from_buffer`] should be recalled when it is possible that there are more bytes.
330    #[error("Not enough bytes!")]
331    NotEnoughBytes,
332    /// The version is not supported by cassandra-protocol, a server implementation should handle this by returning a server error with the message "Invalid or unsupported protocol version".
333    #[error("Unsupported version: {0}")]
334    UnsupportedVersion(u8),
335    #[error("Unsupported opcode: {0}")]
336    UnsupportedOpcode(u8),
337    #[error("Decompression error: {0}")]
338    DecompressionError(CompressionError),
339    #[error("Invalid uuid: {0}")]
340    InvalidUuid(uuid::Error),
341    #[error("Invalid warnings: {0}")]
342    InvalidWarnings(error::Error),
343}
344
345/// Protocol version.
346#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
347#[non_exhaustive]
348pub enum Version {
349    V3,
350    V4,
351    V5,
352}
353
354impl From<Version> for u8 {
355    fn from(value: Version) -> Self {
356        match value {
357            Version::V3 => 3,
358            Version::V4 => 4,
359            Version::V5 => 5,
360        }
361    }
362}
363
364impl TryFrom<u8> for Version {
365    type Error = error::Error;
366
367    fn try_from(version: u8) -> Result<Self, Self::Error> {
368        match version & 0x7F {
369            3 => Ok(Version::V3),
370            4 => Ok(Version::V4),
371            5 => Ok(Version::V5),
372            v => Err(error::Error::General(format!(
373                "Unknown cassandra version: {v}"
374            ))),
375        }
376    }
377}
378
379impl Version {
380    /// Number of bytes that represent Cassandra frame's version.
381    pub const BYTE_LENGTH: usize = 1;
382}
383
384#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
385pub enum Direction {
386    Request,
387    Response,
388}
389
390impl From<Direction> for u8 {
391    fn from(value: Direction) -> u8 {
392        match value {
393            Direction::Request => 0x00,
394            Direction::Response => 0x80,
395        }
396    }
397}
398
399impl From<u8> for Direction {
400    fn from(value: u8) -> Self {
401        match value & 0x80 {
402            0 => Direction::Request,
403            _ => Direction::Response,
404        }
405    }
406}
407
408bitflags! {
409    /// Envelope flags
410    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
411    pub struct Flags: u8 {
412        const COMPRESSION = 0x01;
413        const TRACING = 0x02;
414        const CUSTOM_PAYLOAD = 0x04;
415        const WARNING = 0x08;
416        const BETA = 0x10;
417    }
418}
419
420impl Default for Flags {
421    #[inline]
422    fn default() -> Self {
423        Flags::empty()
424    }
425}
426
427impl Flags {
428    // Number of opcode bytes in accordance to protocol.
429    pub const BYTE_LENGTH: usize = 1;
430}
431
432#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
433#[non_exhaustive]
434pub enum Opcode {
435    Error,
436    Startup,
437    Ready,
438    Authenticate,
439    Options,
440    Supported,
441    Query,
442    Result,
443    Prepare,
444    Execute,
445    Register,
446    Event,
447    Batch,
448    AuthChallenge,
449    AuthResponse,
450    AuthSuccess,
451}
452
453impl Opcode {
454    // Number of opcode bytes in accordance to protocol.
455    pub const BYTE_LENGTH: usize = 1;
456}
457
458impl From<Opcode> for u8 {
459    fn from(value: Opcode) -> Self {
460        match value {
461            Opcode::Error => 0x00,
462            Opcode::Startup => 0x01,
463            Opcode::Ready => 0x02,
464            Opcode::Authenticate => 0x03,
465            Opcode::Options => 0x05,
466            Opcode::Supported => 0x06,
467            Opcode::Query => 0x07,
468            Opcode::Result => 0x08,
469            Opcode::Prepare => 0x09,
470            Opcode::Execute => 0x0A,
471            Opcode::Register => 0x0B,
472            Opcode::Event => 0x0C,
473            Opcode::Batch => 0x0D,
474            Opcode::AuthChallenge => 0x0E,
475            Opcode::AuthResponse => 0x0F,
476            Opcode::AuthSuccess => 0x10,
477        }
478    }
479}
480
481impl TryFrom<u8> for Opcode {
482    type Error = error::Error;
483
484    fn try_from(value: u8) -> Result<Self, <Opcode as TryFrom<u8>>::Error> {
485        match value {
486            0x00 => Ok(Opcode::Error),
487            0x01 => Ok(Opcode::Startup),
488            0x02 => Ok(Opcode::Ready),
489            0x03 => Ok(Opcode::Authenticate),
490            0x05 => Ok(Opcode::Options),
491            0x06 => Ok(Opcode::Supported),
492            0x07 => Ok(Opcode::Query),
493            0x08 => Ok(Opcode::Result),
494            0x09 => Ok(Opcode::Prepare),
495            0x0A => Ok(Opcode::Execute),
496            0x0B => Ok(Opcode::Register),
497            0x0C => Ok(Opcode::Event),
498            0x0D => Ok(Opcode::Batch),
499            0x0E => Ok(Opcode::AuthChallenge),
500            0x0F => Ok(Opcode::AuthResponse),
501            0x10 => Ok(Opcode::AuthSuccess),
502            _ => Err(error::Error::General(format!("Unknown opcode: {value}"))),
503        }
504    }
505}
506
507#[cfg(test)]
508mod helpers {
509    use super::*;
510
511    pub fn test_encode_decode_roundtrip_response(
512        raw_envelope: &[u8],
513        envelope: Envelope,
514        body: ResponseBody,
515    ) {
516        // test encode
517        let encoded_body = body.serialize_to_vec(Version::V4);
518        assert_eq!(
519            &envelope.body, &encoded_body,
520            "encoded body did not match envelope's body"
521        );
522
523        let encoded_envelope = envelope.encode_with(Compression::None).unwrap();
524        assert_eq!(
525            raw_envelope, &encoded_envelope,
526            "encoded envelope did not match expected raw envelope"
527        );
528
529        // test decode
530        let decoded_envelope = Envelope::from_buffer(raw_envelope, Compression::None)
531            .unwrap()
532            .envelope;
533        assert_eq!(decoded_envelope, envelope);
534
535        let decoded_body = envelope.response_body().unwrap();
536        assert_eq!(
537            body, decoded_body,
538            "decoded envelope.body did not match body"
539        )
540    }
541
542    pub fn test_encode_decode_roundtrip_request(
543        raw_envelope: &[u8],
544        envelope: Envelope,
545        body: RequestBody,
546    ) {
547        // test encode
548        let encoded_body = body.serialize_to_vec(Version::V4);
549        assert_eq!(
550            &envelope.body, &encoded_body,
551            "encoded body did not match envelope's body"
552        );
553
554        let encoded_envelope = envelope.encode_with(Compression::None).unwrap();
555        assert_eq!(
556            raw_envelope, &encoded_envelope,
557            "encoded envelope did not match expected raw envelope"
558        );
559
560        // test decode
561        let decoded_envelope = Envelope::from_buffer(raw_envelope, Compression::None)
562            .unwrap()
563            .envelope;
564        assert_eq!(envelope, decoded_envelope);
565
566        let decoded_body = envelope.request_body().unwrap();
567        assert_eq!(
568            body, decoded_body,
569            "decoded envelope.body did not match body"
570        )
571    }
572
573    /// Use this when the body binary representation is nondeterministic but the body typed representation is deterministic
574    pub fn test_encode_decode_roundtrip_nondeterministic_request(
575        mut envelope: Envelope,
576        body: RequestBody,
577    ) {
578        // test encode
579        envelope.body = body.serialize_to_vec(Version::V4);
580
581        // test decode
582        let decoded_body = envelope.request_body().unwrap();
583        assert_eq!(
584            body, decoded_body,
585            "decoded envelope.body did not match body"
586        )
587    }
588}
589
590//noinspection DuplicatedCode
591#[cfg(test)]
592mod tests {
593    use super::*;
594    use crate::consistency::Consistency;
595    use crate::frame::frame_decoder::{
596        FrameDecoder, LegacyFrameDecoder, Lz4FrameDecoder, UncompressedFrameDecoder,
597    };
598    use crate::frame::frame_encoder::{
599        FrameEncoder, LegacyFrameEncoder, Lz4FrameEncoder, UncompressedFrameEncoder,
600    };
601    use crate::frame::message_query::BodyReqQuery;
602    use crate::query::query_params::QueryParams;
603    use crate::query::query_values::QueryValues;
604    use crate::types::value::Value;
605    use crate::types::CBytes;
606
607    #[test]
608    fn test_frame_version_as_byte() {
609        assert_eq!(u8::from(Version::V3), 0x03);
610        assert_eq!(u8::from(Version::V4), 0x04);
611        assert_eq!(u8::from(Version::V5), 0x05);
612
613        assert_eq!(u8::from(Direction::Request), 0x00);
614        assert_eq!(u8::from(Direction::Response), 0x80);
615    }
616
617    #[test]
618    fn test_frame_version_from() {
619        assert_eq!(Version::try_from(0x03).unwrap(), Version::V3);
620        assert_eq!(Version::try_from(0x83).unwrap(), Version::V3);
621        assert_eq!(Version::try_from(0x04).unwrap(), Version::V4);
622        assert_eq!(Version::try_from(0x84).unwrap(), Version::V4);
623        assert_eq!(Version::try_from(0x05).unwrap(), Version::V5);
624        assert_eq!(Version::try_from(0x85).unwrap(), Version::V5);
625
626        assert_eq!(Direction::from(0x03), Direction::Request);
627        assert_eq!(Direction::from(0x04), Direction::Request);
628        assert_eq!(Direction::from(0x05), Direction::Request);
629        assert_eq!(Direction::from(0x83), Direction::Response);
630        assert_eq!(Direction::from(0x84), Direction::Response);
631        assert_eq!(Direction::from(0x85), Direction::Response);
632    }
633
634    #[test]
635    fn test_opcode_as_byte() {
636        assert_eq!(u8::from(Opcode::Error), 0x00);
637        assert_eq!(u8::from(Opcode::Startup), 0x01);
638        assert_eq!(u8::from(Opcode::Ready), 0x02);
639        assert_eq!(u8::from(Opcode::Authenticate), 0x03);
640        assert_eq!(u8::from(Opcode::Options), 0x05);
641        assert_eq!(u8::from(Opcode::Supported), 0x06);
642        assert_eq!(u8::from(Opcode::Query), 0x07);
643        assert_eq!(u8::from(Opcode::Result), 0x08);
644        assert_eq!(u8::from(Opcode::Prepare), 0x09);
645        assert_eq!(u8::from(Opcode::Execute), 0x0A);
646        assert_eq!(u8::from(Opcode::Register), 0x0B);
647        assert_eq!(u8::from(Opcode::Event), 0x0C);
648        assert_eq!(u8::from(Opcode::Batch), 0x0D);
649        assert_eq!(u8::from(Opcode::AuthChallenge), 0x0E);
650        assert_eq!(u8::from(Opcode::AuthResponse), 0x0F);
651        assert_eq!(u8::from(Opcode::AuthSuccess), 0x10);
652    }
653
654    #[test]
655    fn test_opcode_from() {
656        assert_eq!(Opcode::try_from(0x00).unwrap(), Opcode::Error);
657        assert_eq!(Opcode::try_from(0x01).unwrap(), Opcode::Startup);
658        assert_eq!(Opcode::try_from(0x02).unwrap(), Opcode::Ready);
659        assert_eq!(Opcode::try_from(0x03).unwrap(), Opcode::Authenticate);
660        assert_eq!(Opcode::try_from(0x05).unwrap(), Opcode::Options);
661        assert_eq!(Opcode::try_from(0x06).unwrap(), Opcode::Supported);
662        assert_eq!(Opcode::try_from(0x07).unwrap(), Opcode::Query);
663        assert_eq!(Opcode::try_from(0x08).unwrap(), Opcode::Result);
664        assert_eq!(Opcode::try_from(0x09).unwrap(), Opcode::Prepare);
665        assert_eq!(Opcode::try_from(0x0A).unwrap(), Opcode::Execute);
666        assert_eq!(Opcode::try_from(0x0B).unwrap(), Opcode::Register);
667        assert_eq!(Opcode::try_from(0x0C).unwrap(), Opcode::Event);
668        assert_eq!(Opcode::try_from(0x0D).unwrap(), Opcode::Batch);
669        assert_eq!(Opcode::try_from(0x0E).unwrap(), Opcode::AuthChallenge);
670        assert_eq!(Opcode::try_from(0x0F).unwrap(), Opcode::AuthResponse);
671        assert_eq!(Opcode::try_from(0x10).unwrap(), Opcode::AuthSuccess);
672    }
673
674    #[test]
675    fn test_ready() {
676        let raw_envelope = vec![4, 0, 0, 0, 2, 0, 0, 0, 0];
677        let envelope = Envelope {
678            version: Version::V4,
679            direction: Direction::Request,
680            flags: Flags::empty(),
681            opcode: Opcode::Ready,
682            stream_id: 0,
683            body: vec![],
684            tracing_id: None,
685            warnings: vec![],
686        };
687        let body = ResponseBody::Ready;
688        helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
689    }
690
691    #[test]
692    fn test_query_minimal() {
693        let raw_envelope = [
694            4, 0, 0, 0, 7, 0, 0, 0, 11, 0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64,
695        ];
696        let envelope = Envelope {
697            version: Version::V4,
698            direction: Direction::Request,
699            flags: Flags::empty(),
700            opcode: Opcode::Query,
701            stream_id: 0,
702            body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
703            tracing_id: None,
704            warnings: vec![],
705        };
706        let body = RequestBody::Query(BodyReqQuery {
707            query: "blah".into(),
708            query_params: QueryParams {
709                consistency: Consistency::Any,
710                with_names: true,
711                values: None,
712                page_size: None,
713                paging_state: None,
714                serial_consistency: None,
715                timestamp: None,
716                keyspace: None,
717                now_in_seconds: None,
718            },
719        });
720        helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
721    }
722
723    #[test]
724    fn test_query_simple_values() {
725        let raw_envelope = [
726            4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
727            121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
728        ];
729        let envelope = Envelope {
730            version: Version::V4,
731            direction: Direction::Request,
732            flags: Flags::empty(),
733            opcode: Opcode::Query,
734            stream_id: 0,
735            body: vec![
736                0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
737                0, 3, 1, 2, 3, 255, 255, 255, 255,
738            ],
739            tracing_id: None,
740            warnings: vec![],
741        };
742        let body = RequestBody::Query(BodyReqQuery {
743            query: "some query".into(),
744            query_params: QueryParams {
745                consistency: Consistency::Serial,
746                with_names: false,
747                values: Some(QueryValues::SimpleValues(vec![
748                    Value::Some(vec![1, 2, 3]),
749                    Value::Null,
750                ])),
751                page_size: None,
752                paging_state: None,
753                serial_consistency: None,
754                timestamp: None,
755                keyspace: None,
756                now_in_seconds: None,
757            },
758        });
759        helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
760    }
761
762    #[test]
763    fn test_query_named_values() {
764        let envelope = Envelope {
765            version: Version::V4,
766            direction: Direction::Request,
767            flags: Flags::empty(),
768            opcode: Opcode::Query,
769            stream_id: 0,
770            body: vec![],
771            tracing_id: None,
772            warnings: vec![],
773        };
774        let body = RequestBody::Query(BodyReqQuery {
775            query: "another query".into(),
776            query_params: QueryParams {
777                consistency: Consistency::Three,
778                with_names: true,
779                values: Some(QueryValues::NamedValues(
780                    vec![
781                        ("foo".to_string(), Value::Some(vec![11, 12, 13])),
782                        ("bar".to_string(), Value::NotSet),
783                        ("baz".to_string(), Value::Some(vec![42, 10, 99, 100, 4])),
784                    ]
785                    .into_iter()
786                    .collect(),
787                )),
788                page_size: Some(4),
789                paging_state: Some(CBytes::new(vec![0, 1, 2, 3])),
790                serial_consistency: Some(Consistency::One),
791                timestamp: Some(2000),
792                keyspace: None,
793                now_in_seconds: None,
794            },
795        });
796        helpers::test_encode_decode_roundtrip_nondeterministic_request(envelope, body);
797    }
798
799    #[test]
800    fn test_result_prepared_statement() {
801        use crate::frame::message_result::{
802            BodyResResultPrepared, ColSpec, ColType, ColTypeOption, PreparedMetadata,
803            ResResultBody, RowsMetadata, RowsMetadataFlags, TableSpec,
804        };
805        use crate::types::CBytesShort;
806
807        let raw_envelope = [
808            132, 0, 0, 0, 8, 0, 0, 0, 97, // cassandra header
809            0, 0, 0, 4, // prepared statement result
810            0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
811            73, // id
812            0, 0, 0, 1, // prepared metadata flags
813            0, 0, 0, 3, // columns count
814            0, 0, 0, 1, // pk count
815            0, 0, // pk index 1
816            0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97, 116,
817            101, 109, 101, 110, 116,
818            115, // global_table_spec.ks_name = test_prepare_statements
819            0, 7, 116, 97, 98, 108, 101, 95, 49, // global_table_spec.table_name = table_1
820            0, 2, 105, 100, // ColSpec.name = "id"
821            0, 9, // ColSpec.col_type = Int
822            0, 1, 120, // ColSpec.name = "x"
823            0, 9, // ColSpec.col_type = Int
824            0, 4, 110, 97, 109, 101, // ColSpec.name = "name"
825            0, 13, // ColSpec.col_type = VarChar
826            0, 0, 0, 4, // row metadata flags
827            0, 0, 0, 0, // columns count
828        ];
829        let envelope = Envelope {
830            version: Version::V4,
831            direction: Direction::Response,
832            flags: Flags::empty(),
833            opcode: Opcode::Result,
834            stream_id: 0,
835            body: vec![
836                0, 0, 0, 4, // prepared statement result
837                0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
838                73, // id
839                0, 0, 0, 1, // prepared metadata flags
840                0, 0, 0, 3, // columns count
841                0, 0, 0, 1, // pk count
842                0, 0, // pk index 1
843                0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97,
844                116, 101, 109, 101, 110, 116,
845                115, // global_table_spec.ks_name = test_prepare_statements
846                0, 7, 116, 97, 98, 108, 101, 95, 49, // global_table_spec.table_name = table_1
847                0, 2, 105, 100, // ColSpec.name = "id"
848                0, 9, // ColSpec.col_type = Int
849                0, 1, 120, // ColSpec.name = "x"
850                0, 9, // ColSpec.col_type = Int
851                0, 4, 110, 97, 109, 101, // ColSpec.name = "name"
852                0, 13, // ColSpec.col_type = VarChar
853                0, 0, 0, 4, // row metadata flags
854                0, 0, 0, 0, // columns count
855            ],
856            tracing_id: None,
857            warnings: vec![],
858        };
859        let body = ResponseBody::Result(ResResultBody::Prepared(BodyResResultPrepared {
860            id: CBytesShort::new(vec![
861                195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27, 73,
862            ]),
863            result_metadata_id: None,
864            metadata: PreparedMetadata {
865                pk_indexes: vec![0],
866                global_table_spec: Some(TableSpec {
867                    ks_name: "test_prepare_statements".into(),
868                    table_name: "table_1".into(),
869                }),
870                col_specs: vec![
871                    ColSpec {
872                        table_spec: None,
873                        name: "id".into(),
874                        col_type: ColTypeOption {
875                            id: ColType::Int,
876                            value: None,
877                        },
878                    },
879                    ColSpec {
880                        table_spec: None,
881                        name: "x".into(),
882                        col_type: ColTypeOption {
883                            id: ColType::Int,
884                            value: None,
885                        },
886                    },
887                    ColSpec {
888                        table_spec: None,
889                        name: "name".into(),
890                        col_type: ColTypeOption {
891                            id: ColType::Varchar,
892                            value: None,
893                        },
894                    },
895                ],
896            },
897            result_metadata: RowsMetadata {
898                flags: RowsMetadataFlags::NO_METADATA,
899                columns_count: 0,
900                paging_state: None,
901                new_metadata_id: None,
902                global_table_spec: None,
903                col_specs: vec![],
904            },
905        }));
906
907        helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
908    }
909
910    fn create_small_envelope_data() -> (Envelope, Vec<u8>) {
911        let raw_envelope = vec![
912            4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
913            121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
914        ];
915        let envelope = Envelope {
916            version: Version::V4,
917            direction: Direction::Request,
918            flags: Flags::empty(),
919            opcode: Opcode::Query,
920            stream_id: 0,
921            body: vec![
922                0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
923                0, 3, 1, 2, 3, 255, 255, 255, 255,
924            ],
925            tracing_id: None,
926            warnings: vec![],
927        };
928
929        (envelope, raw_envelope)
930    }
931
932    fn create_large_envelope_data() -> (Envelope, Vec<u8>) {
933        let body: Vec<u8> = (0..262144).map(|value| (value % 256) as u8).collect();
934
935        let mut raw_envelope = vec![4, 0, 0, 0, 7, 0, 4, 0, 0];
936        raw_envelope.append(&mut body.clone());
937
938        let envelope = Envelope {
939            version: Version::V4,
940            direction: Direction::Request,
941            flags: Flags::empty(),
942            opcode: Opcode::Query,
943            stream_id: 0,
944            body,
945            tracing_id: None,
946            warnings: vec![],
947        };
948
949        (envelope, raw_envelope)
950    }
951
952    #[test]
953    fn should_encode_and_decode_legacy_frames() {
954        let (envelope, raw_envelope) = create_small_envelope_data();
955
956        let mut encoder = LegacyFrameEncoder::default();
957        assert!(encoder.can_fit(raw_envelope.len()));
958
959        encoder.add_envelope(raw_envelope.clone());
960        assert!(!encoder.can_fit(1));
961
962        let mut frame = encoder.finalize_self_contained().to_vec();
963        assert_eq!(frame, raw_envelope);
964
965        let mut decoder = LegacyFrameDecoder::default();
966
967        let envelopes = decoder.consume(&mut frame, Compression::None).unwrap();
968        assert_eq!(envelopes.len(), 1);
969        assert_eq!(envelopes[0], envelope);
970
971        encoder.reset();
972        assert!(encoder.can_fit(raw_envelope.len()));
973    }
974
975    #[test]
976    fn should_encode_and_decode_uncompressed_self_contained_frames() {
977        let (envelope, raw_envelope) = create_small_envelope_data();
978
979        let mut encoder = UncompressedFrameEncoder::default();
980        assert!(encoder.can_fit(raw_envelope.len()));
981
982        encoder.add_envelope(raw_envelope.clone());
983        assert!(encoder.can_fit(raw_envelope.len()));
984
985        encoder.add_envelope(raw_envelope);
986
987        let mut buffer1 = encoder.finalize_self_contained().to_vec();
988        let mut buffer2 = buffer1.split_off(5);
989
990        let mut decoder = UncompressedFrameDecoder::default();
991
992        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
993        assert!(buffer1.is_empty());
994        assert!(envelopes.is_empty());
995
996        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
997        assert!(buffer2.is_empty());
998        assert_eq!(envelopes.len(), 2);
999        assert_eq!(envelopes[0], envelope);
1000        assert_eq!(envelopes[1], envelope);
1001    }
1002
1003    #[test]
1004    fn should_encode_and_decode_uncompressed_non_self_contained_frames() {
1005        let (envelope, raw_envelope) = create_large_envelope_data();
1006
1007        let mut encoder = UncompressedFrameEncoder::default();
1008        assert!(!encoder.can_fit(raw_envelope.len()));
1009
1010        let data_len = raw_envelope.len();
1011        let mut data_start = 0;
1012        let mut buffer1 = vec![];
1013
1014        while data_start < data_len {
1015            let (data_start_offset, frame) =
1016                encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1017
1018            data_start += data_start_offset;
1019
1020            buffer1.extend_from_slice(frame);
1021
1022            encoder.reset();
1023        }
1024
1025        let mut buffer2 = buffer1.split_off(PAYLOAD_SIZE_LIMIT);
1026
1027        let mut decoder = UncompressedFrameDecoder::default();
1028
1029        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1030        assert!(buffer1.is_empty());
1031        assert!(envelopes.is_empty());
1032
1033        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1034        assert!(buffer2.is_empty());
1035        assert_eq!(envelopes.len(), 1);
1036        assert_eq!(envelopes[0], envelope);
1037    }
1038
1039    #[test]
1040    fn should_encode_and_decode_compressed_self_contained_frames() {
1041        let (envelope, raw_envelope) = create_small_envelope_data();
1042
1043        let mut encoder = Lz4FrameEncoder::default();
1044        assert!(encoder.can_fit(raw_envelope.len()));
1045
1046        encoder.add_envelope(raw_envelope.clone());
1047        assert!(encoder.can_fit(raw_envelope.len()));
1048
1049        encoder.add_envelope(raw_envelope);
1050
1051        let mut buffer1 = encoder.finalize_self_contained().to_vec();
1052        let mut buffer2 = buffer1.split_off(5);
1053
1054        let mut decoder = Lz4FrameDecoder::default();
1055
1056        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1057        assert!(buffer1.is_empty());
1058        assert!(envelopes.is_empty());
1059
1060        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1061        assert!(buffer2.is_empty());
1062        assert_eq!(envelopes.len(), 2);
1063        assert_eq!(envelopes[0], envelope);
1064        assert_eq!(envelopes[1], envelope);
1065    }
1066
1067    #[test]
1068    fn should_encode_and_decode_compressed_non_self_contained_frames() {
1069        let (envelope, raw_envelope) = create_large_envelope_data();
1070
1071        let mut encoder = Lz4FrameEncoder::default();
1072        assert!(!encoder.can_fit(raw_envelope.len()));
1073
1074        let data_len = raw_envelope.len();
1075        let mut data_start = 0;
1076        let mut buffer1 = vec![];
1077
1078        while data_start < data_len {
1079            let (data_start_offset, frame) =
1080                encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1081
1082            data_start += data_start_offset;
1083
1084            buffer1.extend_from_slice(frame);
1085
1086            encoder.reset();
1087        }
1088
1089        let mut buffer2 = buffer1.split_off(1000);
1090
1091        let mut decoder = Lz4FrameDecoder::default();
1092
1093        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1094        assert!(buffer1.is_empty());
1095        assert!(envelopes.is_empty());
1096
1097        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1098        assert!(buffer2.is_empty());
1099        assert_eq!(envelopes.len(), 1);
1100        assert_eq!(envelopes[0], envelope);
1101    }
1102}
1103
1104#[cfg(test)]
1105mod flags {
1106    use super::*;
1107    use crate::consistency::Consistency;
1108    use crate::frame::message_query::BodyReqQuery;
1109    use crate::frame::message_result::ResResultBody;
1110    use crate::query::query_params::QueryParams;
1111
1112    #[test]
1113    fn test_tracing_id_request() {
1114        let raw_envelope = [
1115            4, // version
1116            2, // flags
1117            0, 12, // stream id
1118            7,  // opcode
1119            0, 0, 0, 11, //length
1120            0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64, // body
1121        ];
1122        let envelope = Envelope {
1123            version: Version::V4,
1124            direction: Direction::Request,
1125            flags: Flags::TRACING,
1126            opcode: Opcode::Query,
1127            stream_id: 12,
1128            body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
1129            tracing_id: None,
1130            warnings: vec![],
1131        };
1132
1133        let body = RequestBody::Query(BodyReqQuery {
1134            query: "blah".into(),
1135            query_params: QueryParams {
1136                consistency: Consistency::Any,
1137                with_names: true,
1138                values: None,
1139                page_size: None,
1140                paging_state: None,
1141                serial_consistency: None,
1142                timestamp: None,
1143                keyspace: None,
1144                now_in_seconds: None,
1145            },
1146        });
1147        helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
1148    }
1149
1150    #[test]
1151    fn test_tracing_id_response() {
1152        let raw_envelope = [
1153            132, //version
1154            2,   // flags
1155            0, 12, // stream id
1156            8,  //opcode
1157            0, 0, 0, 20, // length
1158            4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87, // tracing_id
1159            0, 0, 0, 1, // body
1160        ];
1161        let envelope = Envelope {
1162            version: Version::V4,
1163            direction: Direction::Response,
1164            flags: Flags::TRACING,
1165            opcode: Opcode::Result,
1166            stream_id: 12,
1167            body: vec![0, 0, 0, 1],
1168            tracing_id: Some(uuid::Uuid::from_bytes([
1169                4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87,
1170            ])),
1171            warnings: vec![],
1172        };
1173
1174        let body = ResponseBody::Result(ResResultBody::Void);
1175
1176        helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
1177    }
1178
1179    #[test]
1180    fn test_warnings_response() {
1181        let raw_envelope = [
1182            132, // version
1183            8,   // flags
1184            5, 64, // stream id
1185            8,  // opcode
1186            0, 0, 0, 19, // length
1187            // warnings
1188            0, 1, 0, 11, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, // warnings
1189            0, 0, 0, 1, // body
1190        ];
1191
1192        let body = ResponseBody::Result(ResResultBody::Void);
1193
1194        let envelope = Envelope {
1195            version: Version::V4,
1196            opcode: Opcode::Result,
1197            flags: Flags::WARNING,
1198            direction: Direction::Response,
1199            stream_id: 1344,
1200            tracing_id: None,
1201            body: vec![0, 0, 0, 1],
1202            warnings: vec!["Hello World".into()],
1203        };
1204
1205        helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
1206    }
1207}