cassandra_protocol/
frame.rs

1use crate::compression::{Compression, CompressionError};
2use crate::frame::message_request::RequestBody;
3use crate::frame::message_response::ResponseBody;
4use crate::types::data_serialization_types::decode_timeuuid;
5use crate::types::{from_cursor_string_list, try_i16_from_bytes, try_i32_from_bytes, UUID_LEN};
6use bitflags::bitflags;
7use derivative::Derivative;
8use derive_more::{Constructor, Display};
9use std::convert::TryFrom;
10use std::io::Cursor;
11use thiserror::Error;
12use uuid::Uuid;
13
14pub use crate::frame::traits::*;
15
/// Number of bytes in an envelope header: one version/direction byte, one flags byte,
/// two stream id bytes, one opcode byte and four body length bytes.
const ENVELOPE_HEADER_LEN: usize = 9;
/// Number of stream bytes in accordance to protocol.
pub const STREAM_LEN: usize = 2;
/// Number of body length bytes in accordance to protocol.
pub const LENGTH_LEN: usize = 4;
22
23pub mod events;
24pub mod frame_decoder;
25pub mod frame_encoder;
26pub mod message_auth_challenge;
27pub mod message_auth_response;
28pub mod message_auth_success;
29pub mod message_authenticate;
30pub mod message_batch;
31pub mod message_error;
32pub mod message_event;
33pub mod message_execute;
34pub mod message_options;
35pub mod message_prepare;
36pub mod message_query;
37pub mod message_ready;
38pub mod message_register;
39pub mod message_request;
40pub mod message_response;
41pub mod message_result;
42pub mod message_startup;
43pub mod message_supported;
44pub mod traits;
45
46use crate::error;
47
/// Stream id reserved for event envelopes.
///
/// NOTE(review): not referenced in this file; presumably the id the server puts on pushed
/// `Event` messages - confirm against the protocol specification.
pub const EVENT_STREAM_ID: i16 = -1;
49
/// Returns the larger of `a` and `b`.
///
/// Written as a `const fn` so it can be evaluated while computing constants such as
/// `MAX_FRAME_SIZE`.
const fn const_max(a: usize, b: usize) -> usize {
    if a > b {
        a
    } else {
        b
    }
}
57
/// Maximum size of frame payloads - aggregated envelopes or a part of a single envelope.
pub const PAYLOAD_SIZE_LIMIT: usize = 1 << 17;

/// Header length of a frame whose payload is not compressed.
const UNCOMPRESSED_FRAME_HEADER_LENGTH: usize = 6;
/// Header length of a frame whose payload is compressed.
const COMPRESSED_FRAME_HEADER_LENGTH: usize = 8;
/// Length of the trailer that follows a frame's payload.
const FRAME_TRAILER_LENGTH: usize = 4;

/// Maximum size of an entire frame: the payload limit plus a header and the trailer.
///
/// NOTE(review): the header term is whatever `const_max` yields for the two header lengths;
/// make sure that helper really returns the larger one, otherwise compressed frames are
/// under-counted.
pub const MAX_FRAME_SIZE: usize = PAYLOAD_SIZE_LIMIT
    + const_max(
        UNCOMPRESSED_FRAME_HEADER_LENGTH,
        COMPRESSED_FRAME_HEADER_LENGTH,
    )
    + FRAME_TRAILER_LENGTH;
72
/// Cassandra stream identifier, carried in the two header bytes that follow the flags byte.
pub type StreamId = i16;
75
/// Successful result of [`Envelope::from_buffer`]: the decoded envelope together with the
/// number of input bytes it occupied.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Constructor)]
pub struct ParsedEnvelope {
    /// How many bytes from the buffer have been read: the header plus the body as it appeared
    /// on the wire (i.e. before any decompression).
    pub envelope_len: usize,
    /// The parsed envelope.
    pub envelope: Envelope,
}
83
/// A single protocol envelope: the decoded header fields plus the message body.
#[derive(Derivative, Clone, PartialEq, Eq, Hash)]
#[derivative(Debug)]
pub struct Envelope {
    /// Protocol version, taken from the low seven bits of the first header byte.
    pub version: Version,
    /// Request or response, taken from the top bit of the first header byte.
    pub direction: Direction,
    /// Header flags.
    pub flags: Flags,
    /// Kind of message carried in `body`.
    pub opcode: Opcode,
    /// Stream this envelope belongs to.
    pub stream_id: StreamId,
    /// Message body, uncompressed and without the tracing id and warnings, which are kept in
    /// their own fields. Left out of the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub body: Vec<u8>,
    /// Tracing id; `encode_with` requires it for responses that have the `TRACING` flag set.
    pub tracing_id: Option<Uuid>,
    /// Warnings; `encode_with` writes them for responses that have the `WARNING` flag set.
    pub warnings: Vec<String>,
}
97
98impl Envelope {
99    #[inline]
100    #[allow(clippy::too_many_arguments)]
101    pub fn new(
102        version: Version,
103        direction: Direction,
104        flags: Flags,
105        opcode: Opcode,
106        stream_id: StreamId,
107        body: Vec<u8>,
108        tracing_id: Option<Uuid>,
109        warnings: Vec<String>,
110    ) -> Self {
111        Envelope {
112            version,
113            direction,
114            flags,
115            opcode,
116            stream_id,
117            body,
118            tracing_id,
119            warnings,
120        }
121    }
122
123    #[inline]
124    pub fn request_body(&self) -> error::Result<RequestBody> {
125        RequestBody::try_from(self.body.as_slice(), self.opcode, self.version)
126    }
127
128    #[inline]
129    pub fn response_body(&self) -> error::Result<ResponseBody> {
130        ResponseBody::try_from(self.body.as_slice(), self.opcode, self.version)
131    }
132
    /// Returns the tracing id stored in this envelope, if any.
    #[inline]
    pub fn tracing_id(&self) -> &Option<Uuid> {
        &self.tracing_id
    }
137
138    #[inline]
139    pub fn warnings(&self) -> &[String] {
140        &self.warnings
141    }
142
143    /// Parses the raw bytes of a cassandra envelope returning a [`ParsedEnvelope`] struct.
144    /// The typical use case is reading from a buffer that may contain 0 or more envelopes and where
145    /// the last envelope may be incomplete. The possible return values are:
146    /// * `Ok(ParsedEnvelope)` - The first envelope in the buffer has been successfully parsed.
147    /// * `Err(ParseEnvelopeError::NotEnoughBytes)` - There are not enough bytes to parse a single envelope, [`Envelope::from_buffer`] should be recalled when it is possible that there are more bytes.
148    /// * `Err(_)` - The envelope is malformed and you should close the connection as this method does not provide a way to tell how many bytes to advance the buffer in this case.
149    pub fn from_buffer(
150        data: &[u8],
151        compression: Compression,
152    ) -> Result<ParsedEnvelope, ParseEnvelopeError> {
153        if data.len() < ENVELOPE_HEADER_LEN {
154            return Err(ParseEnvelopeError::NotEnoughBytes);
155        }
156
157        let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
158        let envelope_len = ENVELOPE_HEADER_LEN + body_len;
159        if data.len() < envelope_len {
160            return Err(ParseEnvelopeError::NotEnoughBytes);
161        }
162
163        let version = Version::try_from(data[0])
164            .map_err(|_| ParseEnvelopeError::UnsupportedVersion(data[0] & 0x7f))?;
165        let direction = Direction::from(data[0]);
166        let flags = Flags::from_bits_truncate(data[1]);
167        let stream_id = try_i16_from_bytes(&data[2..4]).unwrap();
168        let opcode = Opcode::try_from(data[4])
169            .map_err(|_| ParseEnvelopeError::UnsupportedOpcode(data[4]))?;
170
171        let body_bytes = &data[ENVELOPE_HEADER_LEN..envelope_len];
172
173        let full_body = if flags.contains(Flags::COMPRESSION) {
174            compression.decode(body_bytes.to_vec())
175        } else {
176            Compression::None.decode(body_bytes.to_vec())
177        }
178        .map_err(ParseEnvelopeError::DecompressionError)?;
179
180        let body_len = full_body.len();
181
182        // Use cursor to get tracing id, warnings and actual body
183        let mut body_cursor = Cursor::new(full_body.as_slice());
184
185        let tracing_id = if flags.contains(Flags::TRACING) && direction == Direction::Response {
186            let mut tracing_bytes = [0; UUID_LEN];
187            std::io::Read::read_exact(&mut body_cursor, &mut tracing_bytes).unwrap();
188
189            Some(decode_timeuuid(&tracing_bytes).map_err(ParseEnvelopeError::InvalidUuid)?)
190        } else {
191            None
192        };
193
194        let warnings = if flags.contains(Flags::WARNING) {
195            from_cursor_string_list(&mut body_cursor)
196                .map_err(ParseEnvelopeError::InvalidWarnings)?
197        } else {
198            vec![]
199        };
200
201        let mut body = Vec::with_capacity(body_len - body_cursor.position() as usize);
202
203        std::io::Read::read_to_end(&mut body_cursor, &mut body)
204            .expect("Read cannot fail because cursor is backed by slice");
205
206        Ok(ParsedEnvelope::new(
207            envelope_len,
208            Envelope {
209                version,
210                direction,
211                flags,
212                opcode,
213                stream_id,
214                body,
215                tracing_id,
216                warnings,
217            },
218        ))
219    }
220
221    pub fn check_envelope_size(data: &[u8]) -> Result<usize, CheckEnvelopeSizeError> {
222        if data.len() < ENVELOPE_HEADER_LEN {
223            return Err(CheckEnvelopeSizeError::NotEnoughBytes);
224        }
225
226        let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
227        let envelope_len = ENVELOPE_HEADER_LEN + body_len;
228        if data.len() < envelope_len {
229            return Err(CheckEnvelopeSizeError::NotEnoughBytes);
230        }
231        let _ = Version::try_from(data[0])
232            .map_err(|_| CheckEnvelopeSizeError::UnsupportedVersion(data[0] & 0x7f))?;
233
234        Ok(envelope_len)
235    }
236
237    pub fn encode_with(&self, compressor: Compression) -> error::Result<Vec<u8>> {
238        // compression is ignored since v5
239        let is_compressed = self.version < Version::V5 && compressor.is_compressed();
240
241        let combined_version_byte = u8::from(self.version) | u8::from(self.direction);
242        let flag_byte = (if is_compressed {
243            self.flags | Flags::COMPRESSION
244        } else {
245            self.flags.difference(Flags::COMPRESSION)
246        })
247        .bits();
248
249        let opcode_byte = u8::from(self.opcode);
250
251        let mut v = Vec::with_capacity(9);
252
253        v.push(combined_version_byte);
254        v.push(flag_byte);
255        v.extend_from_slice(&self.stream_id.to_be_bytes());
256        v.push(opcode_byte);
257
258        let mut flags_buffer = vec![];
259
260        if self.flags.contains(Flags::TRACING) && self.direction == Direction::Response {
261            let mut tracing_id = self
262                .tracing_id
263                .ok_or_else(|| {
264                    error::Error::Io(std::io::Error::new(
265                        std::io::ErrorKind::Other,
266                        "Tracing flag was set but Envelope has no tracing_id",
267                    ))
268                })?
269                .into_bytes()
270                .to_vec();
271
272            flags_buffer.append(&mut tracing_id);
273        };
274
275        if self.flags.contains(Flags::WARNING) && self.direction == Direction::Response {
276            let warnings_len = self.warnings.len() as i16;
277            flags_buffer.extend_from_slice(&warnings_len.to_be_bytes());
278
279            for warning in &self.warnings {
280                let warning_len = warning.len() as i16;
281                flags_buffer.extend_from_slice(&warning_len.to_be_bytes());
282                flags_buffer.append(&mut warning.as_bytes().to_vec());
283            }
284        }
285
286        if is_compressed {
287            // avoid having to copy the body if there is nothing in flags_buffer
288            let encoded_body = if flags_buffer.is_empty() {
289                compressor.encode(&self.body)?
290            } else {
291                flags_buffer.extend_from_slice(&self.body);
292                compressor.encode(&flags_buffer)?
293            };
294
295            let body_len = encoded_body.len() as i32;
296            v.extend_from_slice(&body_len.to_be_bytes());
297            v.extend_from_slice(&encoded_body);
298        } else {
299            // avoid having to copy the body if there is nothing in flags_buffer
300            if flags_buffer.is_empty() {
301                let body_len = self.body.len() as i32;
302                v.extend_from_slice(&body_len.to_be_bytes());
303                v.extend_from_slice(&self.body);
304            } else {
305                let body_len = self.body.len() as i32 + flags_buffer.len() as i32;
306                v.extend_from_slice(&body_len.to_be_bytes());
307                flags_buffer.extend_from_slice(&self.body);
308                v.append(&mut flags_buffer);
309            }
310        }
311
312        Ok(v)
313    }
314}
315
/// Errors returned by [`Envelope::check_envelope_size`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum CheckEnvelopeSizeError {
    /// The buffer is shorter than an envelope header, or shorter than the header plus the body
    /// length declared in it.
    #[error("Not enough bytes!")]
    NotEnoughBytes,
    /// The version byte does not name a supported protocol version. Carries the version number
    /// with the direction bit masked off.
    #[error("Unsupported version: {0}")]
    UnsupportedVersion(u8),
    /// An opcode byte that is not known. Not produced by `check_envelope_size` as written in
    /// this file.
    #[error("Unsupported opcode: {0}")]
    UnsupportedOpcode(u8),
}
326
/// Errors returned by [`Envelope::from_buffer`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ParseEnvelopeError {
    /// There are not enough bytes to parse a single envelope, [`Envelope::from_buffer`] should be recalled when it is possible that there are more bytes.
    #[error("Not enough bytes!")]
    NotEnoughBytes,
    /// The version is not supported by cassandra-protocol, a server implementation should handle this by returning a server error with the message "Invalid or unsupported protocol version".
    #[error("Unsupported version: {0}")]
    UnsupportedVersion(u8),
    /// The opcode byte of the header is not a known opcode. Carries the offending byte.
    #[error("Unsupported opcode: {0}")]
    UnsupportedOpcode(u8),
    /// The envelope has the `COMPRESSION` flag set but its body could not be decompressed.
    #[error("Decompression error: {0}")]
    DecompressionError(CompressionError),
    /// The tracing id at the start of a traced response's body could not be decoded.
    #[error("Invalid uuid: {0}")]
    InvalidUuid(uuid::Error),
    /// The envelope has the `WARNING` flag set but the warnings list could not be read from
    /// its body.
    #[error("Invalid warnings: {0}")]
    InvalidWarnings(error::Error),
}
345
/// Protocol version.
///
/// Variants are declared in ascending order, so the derived ordering compares versions
/// numerically (e.g. `Version::V4 < Version::V5`).
#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
#[non_exhaustive]
pub enum Version {
    /// Protocol version 3 (wire value `3`).
    V3,
    /// Protocol version 4 (wire value `4`).
    V4,
    /// Protocol version 5 (wire value `5`).
    V5,
}
354
355impl From<Version> for u8 {
356    fn from(value: Version) -> Self {
357        match value {
358            Version::V3 => 3,
359            Version::V4 => 4,
360            Version::V5 => 5,
361        }
362    }
363}
364
365impl TryFrom<u8> for Version {
366    type Error = error::Error;
367
368    fn try_from(version: u8) -> Result<Self, Self::Error> {
369        match version & 0x7F {
370            3 => Ok(Version::V3),
371            4 => Ok(Version::V4),
372            5 => Ok(Version::V5),
373            v => Err(error::Error::General(format!(
374                "Unknown cassandra version: {v}"
375            ))),
376        }
377    }
378}
379
impl Version {
    /// Number of bytes that represent Cassandra frame's version. The same byte also carries
    /// the direction in its top bit.
    pub const BYTE_LENGTH: usize = 1;
}
384
/// Direction of an envelope, stored in the top bit of the first header byte.
#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
pub enum Direction {
    /// Top bit clear (`0x00`).
    Request,
    /// Top bit set (`0x80`).
    Response,
}
390
391impl From<Direction> for u8 {
392    fn from(value: Direction) -> u8 {
393        match value {
394            Direction::Request => 0x00,
395            Direction::Response => 0x80,
396        }
397    }
398}
399
400impl From<u8> for Direction {
401    fn from(value: u8) -> Self {
402        match value & 0x80 {
403            0 => Direction::Request,
404            _ => Direction::Response,
405        }
406    }
407}
408
bitflags! {
    /// Envelope flags, stored in the second byte of the envelope header.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct Flags: u8 {
        /// The body is compressed; `Envelope::from_buffer` decompresses it when this is set.
        const COMPRESSION = 0x01;
        /// For responses, the body starts with a tracing id.
        const TRACING = 0x02;
        /// Not interpreted by the code in this file.
        /// NOTE(review): presumably announces a custom payload in the body - confirm against
        /// the protocol specification.
        const CUSTOM_PAYLOAD = 0x04;
        /// The body carries a list of warnings (after the tracing id, if there is one).
        const WARNING = 0x08;
        /// Not interpreted by the code in this file.
        /// NOTE(review): presumably opts into a beta protocol version - confirm against the
        /// protocol specification.
        const BETA = 0x10;
    }
}
420
421impl Default for Flags {
422    #[inline]
423    fn default() -> Self {
424        Flags::empty()
425    }
426}
427
impl Flags {
    /// Number of flags bytes in accordance to protocol.
    pub const BYTE_LENGTH: usize = 1;
}
432
/// Kind of message carried by an envelope, stored in the fifth header byte.
///
/// The wire value of each variant is given by the `From<Opcode> for u8` conversion; note that
/// `0x04` is not assigned.
#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
#[non_exhaustive]
pub enum Opcode {
    /// Wire value `0x00`.
    Error,
    /// Wire value `0x01`.
    Startup,
    /// Wire value `0x02`.
    Ready,
    /// Wire value `0x03`.
    Authenticate,
    /// Wire value `0x05`.
    Options,
    /// Wire value `0x06`.
    Supported,
    /// Wire value `0x07`.
    Query,
    /// Wire value `0x08`.
    Result,
    /// Wire value `0x09`.
    Prepare,
    /// Wire value `0x0A`.
    Execute,
    /// Wire value `0x0B`.
    Register,
    /// Wire value `0x0C`.
    Event,
    /// Wire value `0x0D`.
    Batch,
    /// Wire value `0x0E`.
    AuthChallenge,
    /// Wire value `0x0F`.
    AuthResponse,
    /// Wire value `0x10`.
    AuthSuccess,
}
453
impl Opcode {
    /// Number of opcode bytes in accordance to protocol.
    pub const BYTE_LENGTH: usize = 1;
}
458
459impl From<Opcode> for u8 {
460    fn from(value: Opcode) -> Self {
461        match value {
462            Opcode::Error => 0x00,
463            Opcode::Startup => 0x01,
464            Opcode::Ready => 0x02,
465            Opcode::Authenticate => 0x03,
466            Opcode::Options => 0x05,
467            Opcode::Supported => 0x06,
468            Opcode::Query => 0x07,
469            Opcode::Result => 0x08,
470            Opcode::Prepare => 0x09,
471            Opcode::Execute => 0x0A,
472            Opcode::Register => 0x0B,
473            Opcode::Event => 0x0C,
474            Opcode::Batch => 0x0D,
475            Opcode::AuthChallenge => 0x0E,
476            Opcode::AuthResponse => 0x0F,
477            Opcode::AuthSuccess => 0x10,
478        }
479    }
480}
481
482impl TryFrom<u8> for Opcode {
483    type Error = error::Error;
484
485    fn try_from(value: u8) -> Result<Self, <Opcode as TryFrom<u8>>::Error> {
486        match value {
487            0x00 => Ok(Opcode::Error),
488            0x01 => Ok(Opcode::Startup),
489            0x02 => Ok(Opcode::Ready),
490            0x03 => Ok(Opcode::Authenticate),
491            0x05 => Ok(Opcode::Options),
492            0x06 => Ok(Opcode::Supported),
493            0x07 => Ok(Opcode::Query),
494            0x08 => Ok(Opcode::Result),
495            0x09 => Ok(Opcode::Prepare),
496            0x0A => Ok(Opcode::Execute),
497            0x0B => Ok(Opcode::Register),
498            0x0C => Ok(Opcode::Event),
499            0x0D => Ok(Opcode::Batch),
500            0x0E => Ok(Opcode::AuthChallenge),
501            0x0F => Ok(Opcode::AuthResponse),
502            0x10 => Ok(Opcode::AuthSuccess),
503            _ => Err(error::Error::General(format!("Unknown opcode: {value}"))),
504        }
505    }
506}
507
#[cfg(test)]
mod helpers {
    use super::*;

    /// Checks a response in both directions: `body` must serialize to `envelope.body`,
    /// `envelope` must encode to `raw_envelope`, and decoding must give both back.
    pub fn test_encode_decode_roundtrip_response(
        raw_envelope: &[u8],
        envelope: Envelope,
        body: ResponseBody,
    ) {
        // typed body -> body bytes
        let serialized_body = body.serialize_to_vec(Version::V4);
        assert_eq!(
            &envelope.body, &serialized_body,
            "encoded body did not match envelope's body"
        );

        // envelope -> raw bytes
        let serialized_envelope = envelope.encode_with(Compression::None).unwrap();
        assert_eq!(
            raw_envelope, &serialized_envelope,
            "encoded envelope did not match expected raw envelope"
        );

        // raw bytes -> envelope
        let parsed = Envelope::from_buffer(raw_envelope, Compression::None).unwrap();
        assert_eq!(parsed.envelope, envelope);

        // body bytes -> typed body
        let typed_body = envelope.response_body().unwrap();
        assert_eq!(
            body, typed_body,
            "decoded envelope.body did not match body"
        );
    }

    /// Checks a request in both directions: `body` must serialize to `envelope.body`,
    /// `envelope` must encode to `raw_envelope`, and decoding must give both back.
    pub fn test_encode_decode_roundtrip_request(
        raw_envelope: &[u8],
        envelope: Envelope,
        body: RequestBody,
    ) {
        // typed body -> body bytes
        let serialized_body = body.serialize_to_vec(Version::V4);
        assert_eq!(
            &envelope.body, &serialized_body,
            "encoded body did not match envelope's body"
        );

        // envelope -> raw bytes
        let serialized_envelope = envelope.encode_with(Compression::None).unwrap();
        assert_eq!(
            raw_envelope, &serialized_envelope,
            "encoded envelope did not match expected raw envelope"
        );

        // raw bytes -> envelope
        let parsed = Envelope::from_buffer(raw_envelope, Compression::None).unwrap();
        assert_eq!(envelope, parsed.envelope);

        // body bytes -> typed body
        let typed_body = envelope.request_body().unwrap();
        assert_eq!(
            body, typed_body,
            "decoded envelope.body did not match body"
        );
    }

    /// Use this when the body binary representation is nondeterministic but the body typed
    /// representation is deterministic: the body is serialized into the envelope and must
    /// decode back to an equal value.
    pub fn test_encode_decode_roundtrip_nondeterministic_request(
        mut envelope: Envelope,
        body: RequestBody,
    ) {
        // typed body -> body bytes (the bytes themselves are not checked)
        let serialized_body = body.serialize_to_vec(Version::V4);
        envelope.body = serialized_body;

        // body bytes -> typed body
        let typed_body = envelope.request_body().unwrap();
        assert_eq!(
            body, typed_body,
            "decoded envelope.body did not match body"
        );
    }
}
590
591//noinspection DuplicatedCode
592#[cfg(test)]
593mod tests {
594    use super::*;
595    use crate::consistency::Consistency;
596    use crate::frame::frame_decoder::{
597        FrameDecoder, LegacyFrameDecoder, Lz4FrameDecoder, UncompressedFrameDecoder,
598    };
599    use crate::frame::frame_encoder::{
600        FrameEncoder, LegacyFrameEncoder, Lz4FrameEncoder, UncompressedFrameEncoder,
601    };
602    use crate::frame::message_query::BodyReqQuery;
603    use crate::query::query_params::QueryParams;
604    use crate::query::query_values::QueryValues;
605    use crate::types::value::Value;
606    use crate::types::CBytes;
607
608    #[test]
609    fn test_frame_version_as_byte() {
610        assert_eq!(u8::from(Version::V3), 0x03);
611        assert_eq!(u8::from(Version::V4), 0x04);
612        assert_eq!(u8::from(Version::V5), 0x05);
613
614        assert_eq!(u8::from(Direction::Request), 0x00);
615        assert_eq!(u8::from(Direction::Response), 0x80);
616    }
617
618    #[test]
619    fn test_frame_version_from() {
620        assert_eq!(Version::try_from(0x03).unwrap(), Version::V3);
621        assert_eq!(Version::try_from(0x83).unwrap(), Version::V3);
622        assert_eq!(Version::try_from(0x04).unwrap(), Version::V4);
623        assert_eq!(Version::try_from(0x84).unwrap(), Version::V4);
624        assert_eq!(Version::try_from(0x05).unwrap(), Version::V5);
625        assert_eq!(Version::try_from(0x85).unwrap(), Version::V5);
626
627        assert_eq!(Direction::from(0x03), Direction::Request);
628        assert_eq!(Direction::from(0x04), Direction::Request);
629        assert_eq!(Direction::from(0x05), Direction::Request);
630        assert_eq!(Direction::from(0x83), Direction::Response);
631        assert_eq!(Direction::from(0x84), Direction::Response);
632        assert_eq!(Direction::from(0x85), Direction::Response);
633    }
634
635    #[test]
636    fn test_opcode_as_byte() {
637        assert_eq!(u8::from(Opcode::Error), 0x00);
638        assert_eq!(u8::from(Opcode::Startup), 0x01);
639        assert_eq!(u8::from(Opcode::Ready), 0x02);
640        assert_eq!(u8::from(Opcode::Authenticate), 0x03);
641        assert_eq!(u8::from(Opcode::Options), 0x05);
642        assert_eq!(u8::from(Opcode::Supported), 0x06);
643        assert_eq!(u8::from(Opcode::Query), 0x07);
644        assert_eq!(u8::from(Opcode::Result), 0x08);
645        assert_eq!(u8::from(Opcode::Prepare), 0x09);
646        assert_eq!(u8::from(Opcode::Execute), 0x0A);
647        assert_eq!(u8::from(Opcode::Register), 0x0B);
648        assert_eq!(u8::from(Opcode::Event), 0x0C);
649        assert_eq!(u8::from(Opcode::Batch), 0x0D);
650        assert_eq!(u8::from(Opcode::AuthChallenge), 0x0E);
651        assert_eq!(u8::from(Opcode::AuthResponse), 0x0F);
652        assert_eq!(u8::from(Opcode::AuthSuccess), 0x10);
653    }
654
655    #[test]
656    fn test_opcode_from() {
657        assert_eq!(Opcode::try_from(0x00).unwrap(), Opcode::Error);
658        assert_eq!(Opcode::try_from(0x01).unwrap(), Opcode::Startup);
659        assert_eq!(Opcode::try_from(0x02).unwrap(), Opcode::Ready);
660        assert_eq!(Opcode::try_from(0x03).unwrap(), Opcode::Authenticate);
661        assert_eq!(Opcode::try_from(0x05).unwrap(), Opcode::Options);
662        assert_eq!(Opcode::try_from(0x06).unwrap(), Opcode::Supported);
663        assert_eq!(Opcode::try_from(0x07).unwrap(), Opcode::Query);
664        assert_eq!(Opcode::try_from(0x08).unwrap(), Opcode::Result);
665        assert_eq!(Opcode::try_from(0x09).unwrap(), Opcode::Prepare);
666        assert_eq!(Opcode::try_from(0x0A).unwrap(), Opcode::Execute);
667        assert_eq!(Opcode::try_from(0x0B).unwrap(), Opcode::Register);
668        assert_eq!(Opcode::try_from(0x0C).unwrap(), Opcode::Event);
669        assert_eq!(Opcode::try_from(0x0D).unwrap(), Opcode::Batch);
670        assert_eq!(Opcode::try_from(0x0E).unwrap(), Opcode::AuthChallenge);
671        assert_eq!(Opcode::try_from(0x0F).unwrap(), Opcode::AuthResponse);
672        assert_eq!(Opcode::try_from(0x10).unwrap(), Opcode::AuthSuccess);
673    }
674
675    #[test]
676    fn test_ready() {
677        let raw_envelope = vec![4, 0, 0, 0, 2, 0, 0, 0, 0];
678        let envelope = Envelope {
679            version: Version::V4,
680            direction: Direction::Request,
681            flags: Flags::empty(),
682            opcode: Opcode::Ready,
683            stream_id: 0,
684            body: vec![],
685            tracing_id: None,
686            warnings: vec![],
687        };
688        let body = ResponseBody::Ready;
689        helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
690    }
691
692    #[test]
693    fn test_query_minimal() {
694        let raw_envelope = [
695            4, 0, 0, 0, 7, 0, 0, 0, 11, 0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64,
696        ];
697        let envelope = Envelope {
698            version: Version::V4,
699            direction: Direction::Request,
700            flags: Flags::empty(),
701            opcode: Opcode::Query,
702            stream_id: 0,
703            body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
704            tracing_id: None,
705            warnings: vec![],
706        };
707        let body = RequestBody::Query(BodyReqQuery {
708            query: "blah".into(),
709            query_params: QueryParams {
710                consistency: Consistency::Any,
711                with_names: true,
712                values: None,
713                page_size: None,
714                paging_state: None,
715                serial_consistency: None,
716                timestamp: None,
717                keyspace: None,
718                now_in_seconds: None,
719            },
720        });
721        helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
722    }
723
724    #[test]
725    fn test_query_simple_values() {
726        let raw_envelope = [
727            4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
728            121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
729        ];
730        let envelope = Envelope {
731            version: Version::V4,
732            direction: Direction::Request,
733            flags: Flags::empty(),
734            opcode: Opcode::Query,
735            stream_id: 0,
736            body: vec![
737                0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
738                0, 3, 1, 2, 3, 255, 255, 255, 255,
739            ],
740            tracing_id: None,
741            warnings: vec![],
742        };
743        let body = RequestBody::Query(BodyReqQuery {
744            query: "some query".into(),
745            query_params: QueryParams {
746                consistency: Consistency::Serial,
747                with_names: false,
748                values: Some(QueryValues::SimpleValues(vec![
749                    Value::Some(vec![1, 2, 3]),
750                    Value::Null,
751                ])),
752                page_size: None,
753                paging_state: None,
754                serial_consistency: None,
755                timestamp: None,
756                keyspace: None,
757                now_in_seconds: None,
758            },
759        });
760        helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
761    }
762
763    #[test]
764    fn test_query_named_values() {
765        let envelope = Envelope {
766            version: Version::V4,
767            direction: Direction::Request,
768            flags: Flags::empty(),
769            opcode: Opcode::Query,
770            stream_id: 0,
771            body: vec![],
772            tracing_id: None,
773            warnings: vec![],
774        };
775        let body = RequestBody::Query(BodyReqQuery {
776            query: "another query".into(),
777            query_params: QueryParams {
778                consistency: Consistency::Three,
779                with_names: true,
780                values: Some(QueryValues::NamedValues(
781                    vec![
782                        ("foo".to_string(), Value::Some(vec![11, 12, 13])),
783                        ("bar".to_string(), Value::NotSet),
784                        ("baz".to_string(), Value::Some(vec![42, 10, 99, 100, 4])),
785                    ]
786                    .into_iter()
787                    .collect(),
788                )),
789                page_size: Some(4),
790                paging_state: Some(CBytes::new(vec![0, 1, 2, 3])),
791                serial_consistency: Some(Consistency::One),
792                timestamp: Some(2000),
793                keyspace: None,
794                now_in_seconds: None,
795            },
796        });
797        helpers::test_encode_decode_roundtrip_nondeterministic_request(envelope, body);
798    }
799
800    #[test]
801    fn test_result_prepared_statement() {
802        use crate::frame::message_result::{
803            BodyResResultPrepared, ColSpec, ColType, ColTypeOption, PreparedMetadata,
804            ResResultBody, RowsMetadata, RowsMetadataFlags, TableSpec,
805        };
806        use crate::types::CBytesShort;
807
808        let raw_envelope = [
809            132, 0, 0, 0, 8, 0, 0, 0, 97, // cassandra header
810            0, 0, 0, 4, // prepared statement result
811            0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
812            73, // id
813            0, 0, 0, 1, // prepared metadata flags
814            0, 0, 0, 3, // columns count
815            0, 0, 0, 1, // pk count
816            0, 0, // pk index 1
817            0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97, 116,
818            101, 109, 101, 110, 116,
819            115, // global_table_spec.ks_name = test_prepare_statements
820            0, 7, 116, 97, 98, 108, 101, 95, 49, // global_table_spec.table_name = table_1
821            0, 2, 105, 100, // ColSpec.name = "id"
822            0, 9, // ColSpec.col_type = Int
823            0, 1, 120, // ColSpec.name = "x"
824            0, 9, // ColSpec.col_type = Int
825            0, 4, 110, 97, 109, 101, // ColSpec.name = "name"
826            0, 13, // ColSpec.col_type = VarChar
827            0, 0, 0, 4, // row metadata flags
828            0, 0, 0, 0, // columns count
829        ];
830        let envelope = Envelope {
831            version: Version::V4,
832            direction: Direction::Response,
833            flags: Flags::empty(),
834            opcode: Opcode::Result,
835            stream_id: 0,
836            body: vec![
837                0, 0, 0, 4, // prepared statement result
838                0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
839                73, // id
840                0, 0, 0, 1, // prepared metadata flags
841                0, 0, 0, 3, // columns count
842                0, 0, 0, 1, // pk count
843                0, 0, // pk index 1
844                0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97,
845                116, 101, 109, 101, 110, 116,
846                115, // global_table_spec.ks_name = test_prepare_statements
847                0, 7, 116, 97, 98, 108, 101, 95, 49, // global_table_spec.table_name = table_1
848                0, 2, 105, 100, // ColSpec.name = "id"
849                0, 9, // ColSpec.col_type = Int
850                0, 1, 120, // ColSpec.name = "x"
851                0, 9, // ColSpec.col_type = Int
852                0, 4, 110, 97, 109, 101, // ColSpec.name = "name"
853                0, 13, // ColSpec.col_type = VarChar
854                0, 0, 0, 4, // row metadata flags
855                0, 0, 0, 0, // columns count
856            ],
857            tracing_id: None,
858            warnings: vec![],
859        };
860        let body = ResponseBody::Result(ResResultBody::Prepared(BodyResResultPrepared {
861            id: CBytesShort::new(vec![
862                195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27, 73,
863            ]),
864            result_metadata_id: None,
865            metadata: PreparedMetadata {
866                pk_indexes: vec![0],
867                global_table_spec: Some(TableSpec {
868                    ks_name: "test_prepare_statements".into(),
869                    table_name: "table_1".into(),
870                }),
871                col_specs: vec![
872                    ColSpec {
873                        table_spec: None,
874                        name: "id".into(),
875                        col_type: ColTypeOption {
876                            id: ColType::Int,
877                            value: None,
878                        },
879                    },
880                    ColSpec {
881                        table_spec: None,
882                        name: "x".into(),
883                        col_type: ColTypeOption {
884                            id: ColType::Int,
885                            value: None,
886                        },
887                    },
888                    ColSpec {
889                        table_spec: None,
890                        name: "name".into(),
891                        col_type: ColTypeOption {
892                            id: ColType::Varchar,
893                            value: None,
894                        },
895                    },
896                ],
897            },
898            result_metadata: RowsMetadata {
899                flags: RowsMetadataFlags::NO_METADATA,
900                columns_count: 0,
901                paging_state: None,
902                new_metadata_id: None,
903                global_table_spec: None,
904                col_specs: vec![],
905            },
906        }));
907
908        helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
909    }
910
911    fn create_small_envelope_data() -> (Envelope, Vec<u8>) {
912        let raw_envelope = vec![
913            4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
914            121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
915        ];
916        let envelope = Envelope {
917            version: Version::V4,
918            direction: Direction::Request,
919            flags: Flags::empty(),
920            opcode: Opcode::Query,
921            stream_id: 0,
922            body: vec![
923                0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
924                0, 3, 1, 2, 3, 255, 255, 255, 255,
925            ],
926            tracing_id: None,
927            warnings: vec![],
928        };
929
930        (envelope, raw_envelope)
931    }
932
933    fn create_large_envelope_data() -> (Envelope, Vec<u8>) {
934        let body: Vec<u8> = (0..262144).map(|value| (value % 256) as u8).collect();
935
936        let mut raw_envelope = vec![4, 0, 0, 0, 7, 0, 4, 0, 0];
937        raw_envelope.append(&mut body.clone());
938
939        let envelope = Envelope {
940            version: Version::V4,
941            direction: Direction::Request,
942            flags: Flags::empty(),
943            opcode: Opcode::Query,
944            stream_id: 0,
945            body,
946            tracing_id: None,
947            warnings: vec![],
948        };
949
950        (envelope, raw_envelope)
951    }
952
953    #[test]
954    fn should_encode_and_decode_legacy_frames() {
955        let (envelope, raw_envelope) = create_small_envelope_data();
956
957        let mut encoder = LegacyFrameEncoder::default();
958        assert!(encoder.can_fit(raw_envelope.len()));
959
960        encoder.add_envelope(raw_envelope.clone());
961        assert!(!encoder.can_fit(1));
962
963        let mut frame = encoder.finalize_self_contained().to_vec();
964        assert_eq!(frame, raw_envelope);
965
966        let mut decoder = LegacyFrameDecoder::default();
967
968        let envelopes = decoder.consume(&mut frame, Compression::None).unwrap();
969        assert_eq!(envelopes.len(), 1);
970        assert_eq!(envelopes[0], envelope);
971
972        encoder.reset();
973        assert!(encoder.can_fit(raw_envelope.len()));
974    }
975
976    #[test]
977    fn should_encode_and_decode_uncompressed_self_contained_frames() {
978        let (envelope, raw_envelope) = create_small_envelope_data();
979
980        let mut encoder = UncompressedFrameEncoder::default();
981        assert!(encoder.can_fit(raw_envelope.len()));
982
983        encoder.add_envelope(raw_envelope.clone());
984        assert!(encoder.can_fit(raw_envelope.len()));
985
986        encoder.add_envelope(raw_envelope);
987
988        let mut buffer1 = encoder.finalize_self_contained().to_vec();
989        let mut buffer2 = buffer1.split_off(5);
990
991        let mut decoder = UncompressedFrameDecoder::default();
992
993        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
994        assert!(buffer1.is_empty());
995        assert!(envelopes.is_empty());
996
997        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
998        assert!(buffer2.is_empty());
999        assert_eq!(envelopes.len(), 2);
1000        assert_eq!(envelopes[0], envelope);
1001        assert_eq!(envelopes[1], envelope);
1002    }
1003
1004    #[test]
1005    fn should_encode_and_decode_uncompressed_non_self_contained_frames() {
1006        let (envelope, raw_envelope) = create_large_envelope_data();
1007
1008        let mut encoder = UncompressedFrameEncoder::default();
1009        assert!(!encoder.can_fit(raw_envelope.len()));
1010
1011        let data_len = raw_envelope.len();
1012        let mut data_start = 0;
1013        let mut buffer1 = vec![];
1014
1015        while data_start < data_len {
1016            let (data_start_offset, frame) =
1017                encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1018
1019            data_start += data_start_offset;
1020
1021            buffer1.extend_from_slice(frame);
1022
1023            encoder.reset();
1024        }
1025
1026        let mut buffer2 = buffer1.split_off(PAYLOAD_SIZE_LIMIT);
1027
1028        let mut decoder = UncompressedFrameDecoder::default();
1029
1030        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1031        assert!(buffer1.is_empty());
1032        assert!(envelopes.is_empty());
1033
1034        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1035        assert!(buffer2.is_empty());
1036        assert_eq!(envelopes.len(), 1);
1037        assert_eq!(envelopes[0], envelope);
1038    }
1039
1040    #[test]
1041    fn should_encode_and_decode_compressed_self_contained_frames() {
1042        let (envelope, raw_envelope) = create_small_envelope_data();
1043
1044        let mut encoder = Lz4FrameEncoder::default();
1045        assert!(encoder.can_fit(raw_envelope.len()));
1046
1047        encoder.add_envelope(raw_envelope.clone());
1048        assert!(encoder.can_fit(raw_envelope.len()));
1049
1050        encoder.add_envelope(raw_envelope);
1051
1052        let mut buffer1 = encoder.finalize_self_contained().to_vec();
1053        let mut buffer2 = buffer1.split_off(5);
1054
1055        let mut decoder = Lz4FrameDecoder::default();
1056
1057        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1058        assert!(buffer1.is_empty());
1059        assert!(envelopes.is_empty());
1060
1061        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1062        assert!(buffer2.is_empty());
1063        assert_eq!(envelopes.len(), 2);
1064        assert_eq!(envelopes[0], envelope);
1065        assert_eq!(envelopes[1], envelope);
1066    }
1067
1068    #[test]
1069    fn should_encode_and_decode_compressed_non_self_contained_frames() {
1070        let (envelope, raw_envelope) = create_large_envelope_data();
1071
1072        let mut encoder = Lz4FrameEncoder::default();
1073        assert!(!encoder.can_fit(raw_envelope.len()));
1074
1075        let data_len = raw_envelope.len();
1076        let mut data_start = 0;
1077        let mut buffer1 = vec![];
1078
1079        while data_start < data_len {
1080            let (data_start_offset, frame) =
1081                encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1082
1083            data_start += data_start_offset;
1084
1085            buffer1.extend_from_slice(frame);
1086
1087            encoder.reset();
1088        }
1089
1090        let mut buffer2 = buffer1.split_off(1000);
1091
1092        let mut decoder = Lz4FrameDecoder::default();
1093
1094        let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1095        assert!(buffer1.is_empty());
1096        assert!(envelopes.is_empty());
1097
1098        let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1099        assert!(buffer2.is_empty());
1100        assert_eq!(envelopes.len(), 1);
1101        assert_eq!(envelopes[0], envelope);
1102    }
1103}
1104
/// Round-trip tests for envelopes whose header carries the tracing or warning
/// flag.
#[cfg(test)]
mod flags {
    use super::*;
    use crate::consistency::Consistency;
    use crate::frame::message_query::BodyReqQuery;
    use crate::frame::message_result::ResResultBody;
    use crate::query::query_params::QueryParams;

    /// A request with the tracing flag set: the expected envelope carries the
    /// flag but no tracing id.
    #[test]
    fn test_tracing_id_request() {
        let wire_bytes = [
            4, // version
            2, // flags
            0, 12, // stream id
            7,  // opcode
            0, 0, 0, 11, // length
            0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64, // body
        ];

        let expected_params = QueryParams {
            consistency: Consistency::Any,
            with_names: true,
            values: None,
            page_size: None,
            paging_state: None,
            serial_consistency: None,
            timestamp: None,
            keyspace: None,
            now_in_seconds: None,
        };
        let expected_body = RequestBody::Query(BodyReqQuery {
            query: "blah".into(),
            query_params: expected_params,
        });

        let expected_envelope = Envelope {
            version: Version::V4,
            direction: Direction::Request,
            flags: Flags::TRACING,
            opcode: Opcode::Query,
            stream_id: 12,
            body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
            tracing_id: None,
            warnings: vec![],
        };

        helpers::test_encode_decode_roundtrip_request(
            &wire_bytes,
            expected_envelope,
            expected_body,
        );
    }

    /// A response with the tracing flag set: the 16 bytes after the header are
    /// expected to surface as `tracing_id`, leaving only the result bytes in
    /// `body`.
    #[test]
    fn test_tracing_id_response() {
        let wire_bytes = [
            132, // version
            2,   // flags
            0, 12, // stream id
            8,  // opcode
            0, 0, 0, 20, // length
            4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87, // tracing_id
            0, 0, 0, 1, // body
        ];

        let expected_tracing_id = uuid::Uuid::from_bytes([
            4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87,
        ]);
        let expected_envelope = Envelope {
            version: Version::V4,
            direction: Direction::Response,
            flags: Flags::TRACING,
            opcode: Opcode::Result,
            stream_id: 12,
            body: vec![0, 0, 0, 1],
            tracing_id: Some(expected_tracing_id),
            warnings: vec![],
        };
        let expected_body = ResponseBody::Result(ResResultBody::Void);

        helpers::test_encode_decode_roundtrip_response(
            &wire_bytes,
            expected_envelope,
            expected_body,
        );
    }

    /// A response with the warning flag set: the warning list after the header
    /// is expected to surface in `warnings`, leaving only the result bytes in
    /// `body`.
    #[test]
    fn test_warnings_response() {
        let wire_bytes = [
            132, // version
            8,   // flags
            5, 64, // stream id
            8,  // opcode
            0, 0, 0, 19, // length
            0, 1, 0, 11, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, // warnings
            0, 0, 0, 1, // body
        ];

        let expected_envelope = Envelope {
            version: Version::V4,
            direction: Direction::Response,
            flags: Flags::WARNING,
            opcode: Opcode::Result,
            stream_id: 1344,
            body: vec![0, 0, 0, 1],
            tracing_id: None,
            warnings: vec!["Hello World".into()],
        };
        let expected_body = ResponseBody::Result(ResResultBody::Void);

        helpers::test_encode_decode_roundtrip_response(
            &wire_bytes,
            expected_envelope,
            expected_body,
        );
    }
}