1use crate::compression::{Compression, CompressionError};
2use crate::frame::message_request::RequestBody;
3use crate::frame::message_response::ResponseBody;
4use crate::types::data_serialization_types::decode_timeuuid;
5use crate::types::{from_cursor_string_list, try_i16_from_bytes, try_i32_from_bytes, UUID_LEN};
6use bitflags::bitflags;
7use derivative::Derivative;
8use derive_more::{Constructor, Display};
9use std::convert::TryFrom;
10use std::io::Cursor;
11use thiserror::Error;
12use uuid::Uuid;
13
14pub use crate::frame::traits::*;
15
16const ENVELOPE_HEADER_LEN: usize = 9;
18pub const STREAM_LEN: usize = 2;
20pub const LENGTH_LEN: usize = 4;
22
23pub mod events;
24pub mod frame_decoder;
25pub mod frame_encoder;
26pub mod message_auth_challenge;
27pub mod message_auth_response;
28pub mod message_auth_success;
29pub mod message_authenticate;
30pub mod message_batch;
31pub mod message_error;
32pub mod message_event;
33pub mod message_execute;
34pub mod message_options;
35pub mod message_prepare;
36pub mod message_query;
37pub mod message_ready;
38pub mod message_register;
39pub mod message_request;
40pub mod message_response;
41pub mod message_result;
42pub mod message_startup;
43pub mod message_supported;
44pub mod traits;
45
46use crate::error;
47
48pub const EVENT_STREAM_ID: i16 = -1;
49
50const fn const_max(a: usize, b: usize) -> usize {
51 if a < b {
52 a
53 } else {
54 b
55 }
56}
57
58pub const PAYLOAD_SIZE_LIMIT: usize = 1 << 17;
60
61const UNCOMPRESSED_FRAME_HEADER_LENGTH: usize = 6;
62const COMPRESSED_FRAME_HEADER_LENGTH: usize = 8;
63const FRAME_TRAILER_LENGTH: usize = 4;
64
65pub const MAX_FRAME_SIZE: usize = PAYLOAD_SIZE_LIMIT
67 + const_max(
68 UNCOMPRESSED_FRAME_HEADER_LENGTH,
69 COMPRESSED_FRAME_HEADER_LENGTH,
70 )
71 + FRAME_TRAILER_LENGTH;
72
73pub type StreamId = i16;
75
76#[derive(Debug, Clone, Eq, PartialEq, Hash, Constructor)]
77pub struct ParsedEnvelope {
78 pub envelope_len: usize,
80 pub envelope: Envelope,
82}
83
84#[derive(Derivative, Clone, PartialEq, Eq, Hash)]
85#[derivative(Debug)]
86pub struct Envelope {
87 pub version: Version,
88 pub direction: Direction,
89 pub flags: Flags,
90 pub opcode: Opcode,
91 pub stream_id: StreamId,
92 #[derivative(Debug = "ignore")]
93 pub body: Vec<u8>,
94 pub tracing_id: Option<Uuid>,
95 pub warnings: Vec<String>,
96}
97
98impl Envelope {
99 #[inline]
100 #[allow(clippy::too_many_arguments)]
101 pub fn new(
102 version: Version,
103 direction: Direction,
104 flags: Flags,
105 opcode: Opcode,
106 stream_id: StreamId,
107 body: Vec<u8>,
108 tracing_id: Option<Uuid>,
109 warnings: Vec<String>,
110 ) -> Self {
111 Envelope {
112 version,
113 direction,
114 flags,
115 opcode,
116 stream_id,
117 body,
118 tracing_id,
119 warnings,
120 }
121 }
122
123 #[inline]
124 pub fn request_body(&self) -> error::Result<RequestBody> {
125 RequestBody::try_from(self.body.as_slice(), self.opcode, self.version)
126 }
127
128 #[inline]
129 pub fn response_body(&self) -> error::Result<ResponseBody> {
130 ResponseBody::try_from(self.body.as_slice(), self.opcode, self.version)
131 }
132
133 #[inline]
134 pub fn tracing_id(&self) -> &Option<Uuid> {
135 &self.tracing_id
136 }
137
138 #[inline]
139 pub fn warnings(&self) -> &[String] {
140 &self.warnings
141 }
142
143 pub fn from_buffer(
150 data: &[u8],
151 compression: Compression,
152 ) -> Result<ParsedEnvelope, ParseEnvelopeError> {
153 if data.len() < ENVELOPE_HEADER_LEN {
154 return Err(ParseEnvelopeError::NotEnoughBytes);
155 }
156
157 let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
158 let envelope_len = ENVELOPE_HEADER_LEN + body_len;
159 if data.len() < envelope_len {
160 return Err(ParseEnvelopeError::NotEnoughBytes);
161 }
162
163 let version = Version::try_from(data[0])
164 .map_err(|_| ParseEnvelopeError::UnsupportedVersion(data[0] & 0x7f))?;
165 let direction = Direction::from(data[0]);
166 let flags = Flags::from_bits_truncate(data[1]);
167 let stream_id = try_i16_from_bytes(&data[2..4]).unwrap();
168 let opcode = Opcode::try_from(data[4])
169 .map_err(|_| ParseEnvelopeError::UnsupportedOpcode(data[4]))?;
170
171 let body_bytes = &data[ENVELOPE_HEADER_LEN..envelope_len];
172
173 let full_body = if flags.contains(Flags::COMPRESSION) {
174 compression.decode(body_bytes.to_vec())
175 } else {
176 Compression::None.decode(body_bytes.to_vec())
177 }
178 .map_err(ParseEnvelopeError::DecompressionError)?;
179
180 let body_len = full_body.len();
181
182 let mut body_cursor = Cursor::new(full_body.as_slice());
184
185 let tracing_id = if flags.contains(Flags::TRACING) && direction == Direction::Response {
186 let mut tracing_bytes = [0; UUID_LEN];
187 std::io::Read::read_exact(&mut body_cursor, &mut tracing_bytes).unwrap();
188
189 Some(decode_timeuuid(&tracing_bytes).map_err(ParseEnvelopeError::InvalidUuid)?)
190 } else {
191 None
192 };
193
194 let warnings = if flags.contains(Flags::WARNING) {
195 from_cursor_string_list(&mut body_cursor)
196 .map_err(ParseEnvelopeError::InvalidWarnings)?
197 } else {
198 vec![]
199 };
200
201 let mut body = Vec::with_capacity(body_len - body_cursor.position() as usize);
202
203 std::io::Read::read_to_end(&mut body_cursor, &mut body)
204 .expect("Read cannot fail because cursor is backed by slice");
205
206 Ok(ParsedEnvelope::new(
207 envelope_len,
208 Envelope {
209 version,
210 direction,
211 flags,
212 opcode,
213 stream_id,
214 body,
215 tracing_id,
216 warnings,
217 },
218 ))
219 }
220
221 pub fn check_envelope_size(data: &[u8]) -> Result<usize, CheckEnvelopeSizeError> {
222 if data.len() < ENVELOPE_HEADER_LEN {
223 return Err(CheckEnvelopeSizeError::NotEnoughBytes);
224 }
225
226 let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
227 let envelope_len = ENVELOPE_HEADER_LEN + body_len;
228 if data.len() < envelope_len {
229 return Err(CheckEnvelopeSizeError::NotEnoughBytes);
230 }
231 let _ = Version::try_from(data[0])
232 .map_err(|_| CheckEnvelopeSizeError::UnsupportedVersion(data[0] & 0x7f))?;
233
234 Ok(envelope_len)
235 }
236
237 pub fn encode_with(&self, compressor: Compression) -> error::Result<Vec<u8>> {
238 let is_compressed = self.version < Version::V5 && compressor.is_compressed();
240
241 let combined_version_byte = u8::from(self.version) | u8::from(self.direction);
242 let flag_byte = (if is_compressed {
243 self.flags | Flags::COMPRESSION
244 } else {
245 self.flags.difference(Flags::COMPRESSION)
246 })
247 .bits();
248
249 let opcode_byte = u8::from(self.opcode);
250
251 let mut v = Vec::with_capacity(9);
252
253 v.push(combined_version_byte);
254 v.push(flag_byte);
255 v.extend_from_slice(&self.stream_id.to_be_bytes());
256 v.push(opcode_byte);
257
258 let mut flags_buffer = vec![];
259
260 if self.flags.contains(Flags::TRACING) && self.direction == Direction::Response {
261 let mut tracing_id = self
262 .tracing_id
263 .ok_or_else(|| {
264 error::Error::Io(std::io::Error::new(
265 std::io::ErrorKind::Other,
266 "Tracing flag was set but Envelope has no tracing_id",
267 ))
268 })?
269 .into_bytes()
270 .to_vec();
271
272 flags_buffer.append(&mut tracing_id);
273 };
274
275 if self.flags.contains(Flags::WARNING) && self.direction == Direction::Response {
276 let warnings_len = self.warnings.len() as i16;
277 flags_buffer.extend_from_slice(&warnings_len.to_be_bytes());
278
279 for warning in &self.warnings {
280 let warning_len = warning.len() as i16;
281 flags_buffer.extend_from_slice(&warning_len.to_be_bytes());
282 flags_buffer.append(&mut warning.as_bytes().to_vec());
283 }
284 }
285
286 if is_compressed {
287 let encoded_body = if flags_buffer.is_empty() {
289 compressor.encode(&self.body)?
290 } else {
291 flags_buffer.extend_from_slice(&self.body);
292 compressor.encode(&flags_buffer)?
293 };
294
295 let body_len = encoded_body.len() as i32;
296 v.extend_from_slice(&body_len.to_be_bytes());
297 v.extend_from_slice(&encoded_body);
298 } else {
299 if flags_buffer.is_empty() {
301 let body_len = self.body.len() as i32;
302 v.extend_from_slice(&body_len.to_be_bytes());
303 v.extend_from_slice(&self.body);
304 } else {
305 let body_len = self.body.len() as i32 + flags_buffer.len() as i32;
306 v.extend_from_slice(&body_len.to_be_bytes());
307 flags_buffer.extend_from_slice(&self.body);
308 v.append(&mut flags_buffer);
309 }
310 }
311
312 Ok(v)
313 }
314}
315
316#[derive(Debug, Error)]
317#[non_exhaustive]
318pub enum CheckEnvelopeSizeError {
319 #[error("Not enough bytes!")]
320 NotEnoughBytes,
321 #[error("Unsupported version: {0}")]
322 UnsupportedVersion(u8),
323 #[error("Unsupported opcode: {0}")]
324 UnsupportedOpcode(u8),
325}
326
327#[derive(Debug, Error)]
328#[non_exhaustive]
329pub enum ParseEnvelopeError {
330 #[error("Not enough bytes!")]
332 NotEnoughBytes,
333 #[error("Unsupported version: {0}")]
335 UnsupportedVersion(u8),
336 #[error("Unsupported opcode: {0}")]
337 UnsupportedOpcode(u8),
338 #[error("Decompression error: {0}")]
339 DecompressionError(CompressionError),
340 #[error("Invalid uuid: {0}")]
341 InvalidUuid(uuid::Error),
342 #[error("Invalid warnings: {0}")]
343 InvalidWarnings(error::Error),
344}
345
346#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
348#[non_exhaustive]
349pub enum Version {
350 V3,
351 V4,
352 V5,
353}
354
355impl From<Version> for u8 {
356 fn from(value: Version) -> Self {
357 match value {
358 Version::V3 => 3,
359 Version::V4 => 4,
360 Version::V5 => 5,
361 }
362 }
363}
364
365impl TryFrom<u8> for Version {
366 type Error = error::Error;
367
368 fn try_from(version: u8) -> Result<Self, Self::Error> {
369 match version & 0x7F {
370 3 => Ok(Version::V3),
371 4 => Ok(Version::V4),
372 5 => Ok(Version::V5),
373 v => Err(error::Error::General(format!(
374 "Unknown cassandra version: {v}"
375 ))),
376 }
377 }
378}
379
380impl Version {
381 pub const BYTE_LENGTH: usize = 1;
383}
384
385#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
386pub enum Direction {
387 Request,
388 Response,
389}
390
391impl From<Direction> for u8 {
392 fn from(value: Direction) -> u8 {
393 match value {
394 Direction::Request => 0x00,
395 Direction::Response => 0x80,
396 }
397 }
398}
399
400impl From<u8> for Direction {
401 fn from(value: u8) -> Self {
402 match value & 0x80 {
403 0 => Direction::Request,
404 _ => Direction::Response,
405 }
406 }
407}
408
409bitflags! {
410 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
412 pub struct Flags: u8 {
413 const COMPRESSION = 0x01;
414 const TRACING = 0x02;
415 const CUSTOM_PAYLOAD = 0x04;
416 const WARNING = 0x08;
417 const BETA = 0x10;
418 }
419}
420
421impl Default for Flags {
422 #[inline]
423 fn default() -> Self {
424 Flags::empty()
425 }
426}
427
428impl Flags {
429 pub const BYTE_LENGTH: usize = 1;
431}
432
433#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
434#[non_exhaustive]
435pub enum Opcode {
436 Error,
437 Startup,
438 Ready,
439 Authenticate,
440 Options,
441 Supported,
442 Query,
443 Result,
444 Prepare,
445 Execute,
446 Register,
447 Event,
448 Batch,
449 AuthChallenge,
450 AuthResponse,
451 AuthSuccess,
452}
453
454impl Opcode {
455 pub const BYTE_LENGTH: usize = 1;
457}
458
459impl From<Opcode> for u8 {
460 fn from(value: Opcode) -> Self {
461 match value {
462 Opcode::Error => 0x00,
463 Opcode::Startup => 0x01,
464 Opcode::Ready => 0x02,
465 Opcode::Authenticate => 0x03,
466 Opcode::Options => 0x05,
467 Opcode::Supported => 0x06,
468 Opcode::Query => 0x07,
469 Opcode::Result => 0x08,
470 Opcode::Prepare => 0x09,
471 Opcode::Execute => 0x0A,
472 Opcode::Register => 0x0B,
473 Opcode::Event => 0x0C,
474 Opcode::Batch => 0x0D,
475 Opcode::AuthChallenge => 0x0E,
476 Opcode::AuthResponse => 0x0F,
477 Opcode::AuthSuccess => 0x10,
478 }
479 }
480}
481
482impl TryFrom<u8> for Opcode {
483 type Error = error::Error;
484
485 fn try_from(value: u8) -> Result<Self, <Opcode as TryFrom<u8>>::Error> {
486 match value {
487 0x00 => Ok(Opcode::Error),
488 0x01 => Ok(Opcode::Startup),
489 0x02 => Ok(Opcode::Ready),
490 0x03 => Ok(Opcode::Authenticate),
491 0x05 => Ok(Opcode::Options),
492 0x06 => Ok(Opcode::Supported),
493 0x07 => Ok(Opcode::Query),
494 0x08 => Ok(Opcode::Result),
495 0x09 => Ok(Opcode::Prepare),
496 0x0A => Ok(Opcode::Execute),
497 0x0B => Ok(Opcode::Register),
498 0x0C => Ok(Opcode::Event),
499 0x0D => Ok(Opcode::Batch),
500 0x0E => Ok(Opcode::AuthChallenge),
501 0x0F => Ok(Opcode::AuthResponse),
502 0x10 => Ok(Opcode::AuthSuccess),
503 _ => Err(error::Error::General(format!("Unknown opcode: {value}"))),
504 }
505 }
506}
507
508#[cfg(test)]
509mod helpers {
510 use super::*;
511
512 pub fn test_encode_decode_roundtrip_response(
513 raw_envelope: &[u8],
514 envelope: Envelope,
515 body: ResponseBody,
516 ) {
517 let encoded_body = body.serialize_to_vec(Version::V4);
519 assert_eq!(
520 &envelope.body, &encoded_body,
521 "encoded body did not match envelope's body"
522 );
523
524 let encoded_envelope = envelope.encode_with(Compression::None).unwrap();
525 assert_eq!(
526 raw_envelope, &encoded_envelope,
527 "encoded envelope did not match expected raw envelope"
528 );
529
530 let decoded_envelope = Envelope::from_buffer(raw_envelope, Compression::None)
532 .unwrap()
533 .envelope;
534 assert_eq!(decoded_envelope, envelope);
535
536 let decoded_body = envelope.response_body().unwrap();
537 assert_eq!(
538 body, decoded_body,
539 "decoded envelope.body did not match body"
540 )
541 }
542
543 pub fn test_encode_decode_roundtrip_request(
544 raw_envelope: &[u8],
545 envelope: Envelope,
546 body: RequestBody,
547 ) {
548 let encoded_body = body.serialize_to_vec(Version::V4);
550 assert_eq!(
551 &envelope.body, &encoded_body,
552 "encoded body did not match envelope's body"
553 );
554
555 let encoded_envelope = envelope.encode_with(Compression::None).unwrap();
556 assert_eq!(
557 raw_envelope, &encoded_envelope,
558 "encoded envelope did not match expected raw envelope"
559 );
560
561 let decoded_envelope = Envelope::from_buffer(raw_envelope, Compression::None)
563 .unwrap()
564 .envelope;
565 assert_eq!(envelope, decoded_envelope);
566
567 let decoded_body = envelope.request_body().unwrap();
568 assert_eq!(
569 body, decoded_body,
570 "decoded envelope.body did not match body"
571 )
572 }
573
574 pub fn test_encode_decode_roundtrip_nondeterministic_request(
576 mut envelope: Envelope,
577 body: RequestBody,
578 ) {
579 envelope.body = body.serialize_to_vec(Version::V4);
581
582 let decoded_body = envelope.request_body().unwrap();
584 assert_eq!(
585 body, decoded_body,
586 "decoded envelope.body did not match body"
587 )
588 }
589}
590
591#[cfg(test)]
593mod tests {
594 use super::*;
595 use crate::consistency::Consistency;
596 use crate::frame::frame_decoder::{
597 FrameDecoder, LegacyFrameDecoder, Lz4FrameDecoder, UncompressedFrameDecoder,
598 };
599 use crate::frame::frame_encoder::{
600 FrameEncoder, LegacyFrameEncoder, Lz4FrameEncoder, UncompressedFrameEncoder,
601 };
602 use crate::frame::message_query::BodyReqQuery;
603 use crate::query::query_params::QueryParams;
604 use crate::query::query_values::QueryValues;
605 use crate::types::value::Value;
606 use crate::types::CBytes;
607
608 #[test]
609 fn test_frame_version_as_byte() {
610 assert_eq!(u8::from(Version::V3), 0x03);
611 assert_eq!(u8::from(Version::V4), 0x04);
612 assert_eq!(u8::from(Version::V5), 0x05);
613
614 assert_eq!(u8::from(Direction::Request), 0x00);
615 assert_eq!(u8::from(Direction::Response), 0x80);
616 }
617
618 #[test]
619 fn test_frame_version_from() {
620 assert_eq!(Version::try_from(0x03).unwrap(), Version::V3);
621 assert_eq!(Version::try_from(0x83).unwrap(), Version::V3);
622 assert_eq!(Version::try_from(0x04).unwrap(), Version::V4);
623 assert_eq!(Version::try_from(0x84).unwrap(), Version::V4);
624 assert_eq!(Version::try_from(0x05).unwrap(), Version::V5);
625 assert_eq!(Version::try_from(0x85).unwrap(), Version::V5);
626
627 assert_eq!(Direction::from(0x03), Direction::Request);
628 assert_eq!(Direction::from(0x04), Direction::Request);
629 assert_eq!(Direction::from(0x05), Direction::Request);
630 assert_eq!(Direction::from(0x83), Direction::Response);
631 assert_eq!(Direction::from(0x84), Direction::Response);
632 assert_eq!(Direction::from(0x85), Direction::Response);
633 }
634
635 #[test]
636 fn test_opcode_as_byte() {
637 assert_eq!(u8::from(Opcode::Error), 0x00);
638 assert_eq!(u8::from(Opcode::Startup), 0x01);
639 assert_eq!(u8::from(Opcode::Ready), 0x02);
640 assert_eq!(u8::from(Opcode::Authenticate), 0x03);
641 assert_eq!(u8::from(Opcode::Options), 0x05);
642 assert_eq!(u8::from(Opcode::Supported), 0x06);
643 assert_eq!(u8::from(Opcode::Query), 0x07);
644 assert_eq!(u8::from(Opcode::Result), 0x08);
645 assert_eq!(u8::from(Opcode::Prepare), 0x09);
646 assert_eq!(u8::from(Opcode::Execute), 0x0A);
647 assert_eq!(u8::from(Opcode::Register), 0x0B);
648 assert_eq!(u8::from(Opcode::Event), 0x0C);
649 assert_eq!(u8::from(Opcode::Batch), 0x0D);
650 assert_eq!(u8::from(Opcode::AuthChallenge), 0x0E);
651 assert_eq!(u8::from(Opcode::AuthResponse), 0x0F);
652 assert_eq!(u8::from(Opcode::AuthSuccess), 0x10);
653 }
654
655 #[test]
656 fn test_opcode_from() {
657 assert_eq!(Opcode::try_from(0x00).unwrap(), Opcode::Error);
658 assert_eq!(Opcode::try_from(0x01).unwrap(), Opcode::Startup);
659 assert_eq!(Opcode::try_from(0x02).unwrap(), Opcode::Ready);
660 assert_eq!(Opcode::try_from(0x03).unwrap(), Opcode::Authenticate);
661 assert_eq!(Opcode::try_from(0x05).unwrap(), Opcode::Options);
662 assert_eq!(Opcode::try_from(0x06).unwrap(), Opcode::Supported);
663 assert_eq!(Opcode::try_from(0x07).unwrap(), Opcode::Query);
664 assert_eq!(Opcode::try_from(0x08).unwrap(), Opcode::Result);
665 assert_eq!(Opcode::try_from(0x09).unwrap(), Opcode::Prepare);
666 assert_eq!(Opcode::try_from(0x0A).unwrap(), Opcode::Execute);
667 assert_eq!(Opcode::try_from(0x0B).unwrap(), Opcode::Register);
668 assert_eq!(Opcode::try_from(0x0C).unwrap(), Opcode::Event);
669 assert_eq!(Opcode::try_from(0x0D).unwrap(), Opcode::Batch);
670 assert_eq!(Opcode::try_from(0x0E).unwrap(), Opcode::AuthChallenge);
671 assert_eq!(Opcode::try_from(0x0F).unwrap(), Opcode::AuthResponse);
672 assert_eq!(Opcode::try_from(0x10).unwrap(), Opcode::AuthSuccess);
673 }
674
675 #[test]
676 fn test_ready() {
677 let raw_envelope = vec![4, 0, 0, 0, 2, 0, 0, 0, 0];
678 let envelope = Envelope {
679 version: Version::V4,
680 direction: Direction::Request,
681 flags: Flags::empty(),
682 opcode: Opcode::Ready,
683 stream_id: 0,
684 body: vec![],
685 tracing_id: None,
686 warnings: vec![],
687 };
688 let body = ResponseBody::Ready;
689 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
690 }
691
692 #[test]
693 fn test_query_minimal() {
694 let raw_envelope = [
695 4, 0, 0, 0, 7, 0, 0, 0, 11, 0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64,
696 ];
697 let envelope = Envelope {
698 version: Version::V4,
699 direction: Direction::Request,
700 flags: Flags::empty(),
701 opcode: Opcode::Query,
702 stream_id: 0,
703 body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
704 tracing_id: None,
705 warnings: vec![],
706 };
707 let body = RequestBody::Query(BodyReqQuery {
708 query: "blah".into(),
709 query_params: QueryParams {
710 consistency: Consistency::Any,
711 with_names: true,
712 values: None,
713 page_size: None,
714 paging_state: None,
715 serial_consistency: None,
716 timestamp: None,
717 keyspace: None,
718 now_in_seconds: None,
719 },
720 });
721 helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
722 }
723
724 #[test]
725 fn test_query_simple_values() {
726 let raw_envelope = [
727 4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
728 121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
729 ];
730 let envelope = Envelope {
731 version: Version::V4,
732 direction: Direction::Request,
733 flags: Flags::empty(),
734 opcode: Opcode::Query,
735 stream_id: 0,
736 body: vec![
737 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
738 0, 3, 1, 2, 3, 255, 255, 255, 255,
739 ],
740 tracing_id: None,
741 warnings: vec![],
742 };
743 let body = RequestBody::Query(BodyReqQuery {
744 query: "some query".into(),
745 query_params: QueryParams {
746 consistency: Consistency::Serial,
747 with_names: false,
748 values: Some(QueryValues::SimpleValues(vec![
749 Value::Some(vec![1, 2, 3]),
750 Value::Null,
751 ])),
752 page_size: None,
753 paging_state: None,
754 serial_consistency: None,
755 timestamp: None,
756 keyspace: None,
757 now_in_seconds: None,
758 },
759 });
760 helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
761 }
762
763 #[test]
764 fn test_query_named_values() {
765 let envelope = Envelope {
766 version: Version::V4,
767 direction: Direction::Request,
768 flags: Flags::empty(),
769 opcode: Opcode::Query,
770 stream_id: 0,
771 body: vec![],
772 tracing_id: None,
773 warnings: vec![],
774 };
775 let body = RequestBody::Query(BodyReqQuery {
776 query: "another query".into(),
777 query_params: QueryParams {
778 consistency: Consistency::Three,
779 with_names: true,
780 values: Some(QueryValues::NamedValues(
781 vec![
782 ("foo".to_string(), Value::Some(vec![11, 12, 13])),
783 ("bar".to_string(), Value::NotSet),
784 ("baz".to_string(), Value::Some(vec![42, 10, 99, 100, 4])),
785 ]
786 .into_iter()
787 .collect(),
788 )),
789 page_size: Some(4),
790 paging_state: Some(CBytes::new(vec![0, 1, 2, 3])),
791 serial_consistency: Some(Consistency::One),
792 timestamp: Some(2000),
793 keyspace: None,
794 now_in_seconds: None,
795 },
796 });
797 helpers::test_encode_decode_roundtrip_nondeterministic_request(envelope, body);
798 }
799
800 #[test]
801 fn test_result_prepared_statement() {
802 use crate::frame::message_result::{
803 BodyResResultPrepared, ColSpec, ColType, ColTypeOption, PreparedMetadata,
804 ResResultBody, RowsMetadata, RowsMetadataFlags, TableSpec,
805 };
806 use crate::types::CBytesShort;
807
808 let raw_envelope = [
809 132, 0, 0, 0, 8, 0, 0, 0, 97, 0, 0, 0, 4, 0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
812 73, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 1, 0, 0, 0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97, 116,
818 101, 109, 101, 110, 116,
819 115, 0, 7, 116, 97, 98, 108, 101, 95, 49, 0, 2, 105, 100, 0, 9, 0, 1, 120, 0, 9, 0, 4, 110, 97, 109, 101, 0, 13, 0, 0, 0, 4, 0, 0, 0, 0, ];
830 let envelope = Envelope {
831 version: Version::V4,
832 direction: Direction::Response,
833 flags: Flags::empty(),
834 opcode: Opcode::Result,
835 stream_id: 0,
836 body: vec![
837 0, 0, 0, 4, 0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
839 73, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 1, 0, 0, 0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97,
845 116, 101, 109, 101, 110, 116,
846 115, 0, 7, 116, 97, 98, 108, 101, 95, 49, 0, 2, 105, 100, 0, 9, 0, 1, 120, 0, 9, 0, 4, 110, 97, 109, 101, 0, 13, 0, 0, 0, 4, 0, 0, 0, 0, ],
857 tracing_id: None,
858 warnings: vec![],
859 };
860 let body = ResponseBody::Result(ResResultBody::Prepared(BodyResResultPrepared {
861 id: CBytesShort::new(vec![
862 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27, 73,
863 ]),
864 result_metadata_id: None,
865 metadata: PreparedMetadata {
866 pk_indexes: vec![0],
867 global_table_spec: Some(TableSpec {
868 ks_name: "test_prepare_statements".into(),
869 table_name: "table_1".into(),
870 }),
871 col_specs: vec![
872 ColSpec {
873 table_spec: None,
874 name: "id".into(),
875 col_type: ColTypeOption {
876 id: ColType::Int,
877 value: None,
878 },
879 },
880 ColSpec {
881 table_spec: None,
882 name: "x".into(),
883 col_type: ColTypeOption {
884 id: ColType::Int,
885 value: None,
886 },
887 },
888 ColSpec {
889 table_spec: None,
890 name: "name".into(),
891 col_type: ColTypeOption {
892 id: ColType::Varchar,
893 value: None,
894 },
895 },
896 ],
897 },
898 result_metadata: RowsMetadata {
899 flags: RowsMetadataFlags::NO_METADATA,
900 columns_count: 0,
901 paging_state: None,
902 new_metadata_id: None,
903 global_table_spec: None,
904 col_specs: vec![],
905 },
906 }));
907
908 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
909 }
910
911 fn create_small_envelope_data() -> (Envelope, Vec<u8>) {
912 let raw_envelope = vec![
913 4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
914 121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
915 ];
916 let envelope = Envelope {
917 version: Version::V4,
918 direction: Direction::Request,
919 flags: Flags::empty(),
920 opcode: Opcode::Query,
921 stream_id: 0,
922 body: vec![
923 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
924 0, 3, 1, 2, 3, 255, 255, 255, 255,
925 ],
926 tracing_id: None,
927 warnings: vec![],
928 };
929
930 (envelope, raw_envelope)
931 }
932
933 fn create_large_envelope_data() -> (Envelope, Vec<u8>) {
934 let body: Vec<u8> = (0..262144).map(|value| (value % 256) as u8).collect();
935
936 let mut raw_envelope = vec![4, 0, 0, 0, 7, 0, 4, 0, 0];
937 raw_envelope.append(&mut body.clone());
938
939 let envelope = Envelope {
940 version: Version::V4,
941 direction: Direction::Request,
942 flags: Flags::empty(),
943 opcode: Opcode::Query,
944 stream_id: 0,
945 body,
946 tracing_id: None,
947 warnings: vec![],
948 };
949
950 (envelope, raw_envelope)
951 }
952
953 #[test]
954 fn should_encode_and_decode_legacy_frames() {
955 let (envelope, raw_envelope) = create_small_envelope_data();
956
957 let mut encoder = LegacyFrameEncoder::default();
958 assert!(encoder.can_fit(raw_envelope.len()));
959
960 encoder.add_envelope(raw_envelope.clone());
961 assert!(!encoder.can_fit(1));
962
963 let mut frame = encoder.finalize_self_contained().to_vec();
964 assert_eq!(frame, raw_envelope);
965
966 let mut decoder = LegacyFrameDecoder::default();
967
968 let envelopes = decoder.consume(&mut frame, Compression::None).unwrap();
969 assert_eq!(envelopes.len(), 1);
970 assert_eq!(envelopes[0], envelope);
971
972 encoder.reset();
973 assert!(encoder.can_fit(raw_envelope.len()));
974 }
975
976 #[test]
977 fn should_encode_and_decode_uncompressed_self_contained_frames() {
978 let (envelope, raw_envelope) = create_small_envelope_data();
979
980 let mut encoder = UncompressedFrameEncoder::default();
981 assert!(encoder.can_fit(raw_envelope.len()));
982
983 encoder.add_envelope(raw_envelope.clone());
984 assert!(encoder.can_fit(raw_envelope.len()));
985
986 encoder.add_envelope(raw_envelope);
987
988 let mut buffer1 = encoder.finalize_self_contained().to_vec();
989 let mut buffer2 = buffer1.split_off(5);
990
991 let mut decoder = UncompressedFrameDecoder::default();
992
993 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
994 assert!(buffer1.is_empty());
995 assert!(envelopes.is_empty());
996
997 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
998 assert!(buffer2.is_empty());
999 assert_eq!(envelopes.len(), 2);
1000 assert_eq!(envelopes[0], envelope);
1001 assert_eq!(envelopes[1], envelope);
1002 }
1003
1004 #[test]
1005 fn should_encode_and_decode_uncompressed_non_self_contained_frames() {
1006 let (envelope, raw_envelope) = create_large_envelope_data();
1007
1008 let mut encoder = UncompressedFrameEncoder::default();
1009 assert!(!encoder.can_fit(raw_envelope.len()));
1010
1011 let data_len = raw_envelope.len();
1012 let mut data_start = 0;
1013 let mut buffer1 = vec![];
1014
1015 while data_start < data_len {
1016 let (data_start_offset, frame) =
1017 encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1018
1019 data_start += data_start_offset;
1020
1021 buffer1.extend_from_slice(frame);
1022
1023 encoder.reset();
1024 }
1025
1026 let mut buffer2 = buffer1.split_off(PAYLOAD_SIZE_LIMIT);
1027
1028 let mut decoder = UncompressedFrameDecoder::default();
1029
1030 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1031 assert!(buffer1.is_empty());
1032 assert!(envelopes.is_empty());
1033
1034 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1035 assert!(buffer2.is_empty());
1036 assert_eq!(envelopes.len(), 1);
1037 assert_eq!(envelopes[0], envelope);
1038 }
1039
1040 #[test]
1041 fn should_encode_and_decode_compressed_self_contained_frames() {
1042 let (envelope, raw_envelope) = create_small_envelope_data();
1043
1044 let mut encoder = Lz4FrameEncoder::default();
1045 assert!(encoder.can_fit(raw_envelope.len()));
1046
1047 encoder.add_envelope(raw_envelope.clone());
1048 assert!(encoder.can_fit(raw_envelope.len()));
1049
1050 encoder.add_envelope(raw_envelope);
1051
1052 let mut buffer1 = encoder.finalize_self_contained().to_vec();
1053 let mut buffer2 = buffer1.split_off(5);
1054
1055 let mut decoder = Lz4FrameDecoder::default();
1056
1057 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1058 assert!(buffer1.is_empty());
1059 assert!(envelopes.is_empty());
1060
1061 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1062 assert!(buffer2.is_empty());
1063 assert_eq!(envelopes.len(), 2);
1064 assert_eq!(envelopes[0], envelope);
1065 assert_eq!(envelopes[1], envelope);
1066 }
1067
1068 #[test]
1069 fn should_encode_and_decode_compressed_non_self_contained_frames() {
1070 let (envelope, raw_envelope) = create_large_envelope_data();
1071
1072 let mut encoder = Lz4FrameEncoder::default();
1073 assert!(!encoder.can_fit(raw_envelope.len()));
1074
1075 let data_len = raw_envelope.len();
1076 let mut data_start = 0;
1077 let mut buffer1 = vec![];
1078
1079 while data_start < data_len {
1080 let (data_start_offset, frame) =
1081 encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1082
1083 data_start += data_start_offset;
1084
1085 buffer1.extend_from_slice(frame);
1086
1087 encoder.reset();
1088 }
1089
1090 let mut buffer2 = buffer1.split_off(1000);
1091
1092 let mut decoder = Lz4FrameDecoder::default();
1093
1094 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1095 assert!(buffer1.is_empty());
1096 assert!(envelopes.is_empty());
1097
1098 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1099 assert!(buffer2.is_empty());
1100 assert_eq!(envelopes.len(), 1);
1101 assert_eq!(envelopes[0], envelope);
1102 }
1103}
1104
1105#[cfg(test)]
1106mod flags {
1107 use super::*;
1108 use crate::consistency::Consistency;
1109 use crate::frame::message_query::BodyReqQuery;
1110 use crate::frame::message_result::ResResultBody;
1111 use crate::query::query_params::QueryParams;
1112
1113 #[test]
1114 fn test_tracing_id_request() {
1115 let raw_envelope = [
1116 4, 2, 0, 12, 7, 0, 0, 0, 11, 0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64, ];
1123 let envelope = Envelope {
1124 version: Version::V4,
1125 direction: Direction::Request,
1126 flags: Flags::TRACING,
1127 opcode: Opcode::Query,
1128 stream_id: 12,
1129 body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
1130 tracing_id: None,
1131 warnings: vec![],
1132 };
1133
1134 let body = RequestBody::Query(BodyReqQuery {
1135 query: "blah".into(),
1136 query_params: QueryParams {
1137 consistency: Consistency::Any,
1138 with_names: true,
1139 values: None,
1140 page_size: None,
1141 paging_state: None,
1142 serial_consistency: None,
1143 timestamp: None,
1144 keyspace: None,
1145 now_in_seconds: None,
1146 },
1147 });
1148 helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
1149 }
1150
1151 #[test]
1152 fn test_tracing_id_response() {
1153 let raw_envelope = [
1154 132, 2, 0, 12, 8, 0, 0, 0, 20, 4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87, 0, 0, 0, 1, ];
1162 let envelope = Envelope {
1163 version: Version::V4,
1164 direction: Direction::Response,
1165 flags: Flags::TRACING,
1166 opcode: Opcode::Result,
1167 stream_id: 12,
1168 body: vec![0, 0, 0, 1],
1169 tracing_id: Some(uuid::Uuid::from_bytes([
1170 4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87,
1171 ])),
1172 warnings: vec![],
1173 };
1174
1175 let body = ResponseBody::Result(ResResultBody::Void);
1176
1177 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
1178 }
1179
1180 #[test]
1181 fn test_warnings_response() {
1182 let raw_envelope = [
1183 132, 8, 5, 64, 8, 0, 0, 0, 19, 0, 1, 0, 11, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 0, 0, 0, 1, ];
1192
1193 let body = ResponseBody::Result(ResResultBody::Void);
1194
1195 let envelope = Envelope {
1196 version: Version::V4,
1197 opcode: Opcode::Result,
1198 flags: Flags::WARNING,
1199 direction: Direction::Response,
1200 stream_id: 1344,
1201 tracing_id: None,
1202 body: vec![0, 0, 0, 1],
1203 warnings: vec!["Hello World".into()],
1204 };
1205
1206 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
1207 }
1208}