1use crate::compression::{Compression, CompressionError};
2use crate::frame::message_request::RequestBody;
3use crate::frame::message_response::ResponseBody;
4use crate::types::data_serialization_types::decode_timeuuid;
5use crate::types::{from_cursor_string_list, try_i16_from_bytes, try_i32_from_bytes, UUID_LEN};
6use bitflags::bitflags;
7use derivative::Derivative;
8use derive_more::{Constructor, Display};
9use std::convert::TryFrom;
10use std::io::Cursor;
11use thiserror::Error;
12use uuid::Uuid;
13
14pub use crate::frame::traits::*;
15
16const ENVELOPE_HEADER_LEN: usize = 9;
18pub const STREAM_LEN: usize = 2;
20pub const LENGTH_LEN: usize = 4;
22
23pub mod events;
24pub mod frame_decoder;
25pub mod frame_encoder;
26pub mod message_auth_challenge;
27pub mod message_auth_response;
28pub mod message_auth_success;
29pub mod message_authenticate;
30pub mod message_batch;
31pub mod message_error;
32pub mod message_event;
33pub mod message_execute;
34pub mod message_options;
35pub mod message_prepare;
36pub mod message_query;
37pub mod message_ready;
38pub mod message_register;
39pub mod message_request;
40pub mod message_response;
41pub mod message_result;
42pub mod message_startup;
43pub mod message_supported;
44pub mod traits;
45
46use crate::error;
47
48pub const EVENT_STREAM_ID: i16 = -1;
49
50const fn const_max(a: usize, b: usize) -> usize {
51 if a < b {
52 a
53 } else {
54 b
55 }
56}
57
58pub const PAYLOAD_SIZE_LIMIT: usize = 1 << 17;
60
61const UNCOMPRESSED_FRAME_HEADER_LENGTH: usize = 6;
62const COMPRESSED_FRAME_HEADER_LENGTH: usize = 8;
63const FRAME_TRAILER_LENGTH: usize = 4;
64
65pub const MAX_FRAME_SIZE: usize = PAYLOAD_SIZE_LIMIT
67 + const_max(
68 UNCOMPRESSED_FRAME_HEADER_LENGTH,
69 COMPRESSED_FRAME_HEADER_LENGTH,
70 )
71 + FRAME_TRAILER_LENGTH;
72
73pub type StreamId = i16;
75
76#[derive(Debug, Clone, Eq, PartialEq, Hash, Constructor)]
77pub struct ParsedEnvelope {
78 pub envelope_len: usize,
80 pub envelope: Envelope,
82}
83
84#[derive(Derivative, Clone, PartialEq, Eq, Hash)]
85#[derivative(Debug)]
86pub struct Envelope {
87 pub version: Version,
88 pub direction: Direction,
89 pub flags: Flags,
90 pub opcode: Opcode,
91 pub stream_id: StreamId,
92 #[derivative(Debug = "ignore")]
93 pub body: Vec<u8>,
94 pub tracing_id: Option<Uuid>,
95 pub warnings: Vec<String>,
96}
97
98impl Envelope {
99 #[inline]
100 #[allow(clippy::too_many_arguments)]
101 pub fn new(
102 version: Version,
103 direction: Direction,
104 flags: Flags,
105 opcode: Opcode,
106 stream_id: StreamId,
107 body: Vec<u8>,
108 tracing_id: Option<Uuid>,
109 warnings: Vec<String>,
110 ) -> Self {
111 Envelope {
112 version,
113 direction,
114 flags,
115 opcode,
116 stream_id,
117 body,
118 tracing_id,
119 warnings,
120 }
121 }
122
123 #[inline]
124 pub fn request_body(&self) -> error::Result<RequestBody> {
125 RequestBody::try_from(self.body.as_slice(), self.opcode, self.version)
126 }
127
128 #[inline]
129 pub fn response_body(&self) -> error::Result<ResponseBody> {
130 ResponseBody::try_from(self.body.as_slice(), self.opcode, self.version)
131 }
132
133 #[inline]
134 pub fn tracing_id(&self) -> &Option<Uuid> {
135 &self.tracing_id
136 }
137
138 #[inline]
139 pub fn warnings(&self) -> &[String] {
140 &self.warnings
141 }
142
143 pub fn from_buffer(
150 data: &[u8],
151 compression: Compression,
152 ) -> Result<ParsedEnvelope, ParseEnvelopeError> {
153 if data.len() < ENVELOPE_HEADER_LEN {
154 return Err(ParseEnvelopeError::NotEnoughBytes);
155 }
156
157 let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
158 let envelope_len = ENVELOPE_HEADER_LEN + body_len;
159 if data.len() < envelope_len {
160 return Err(ParseEnvelopeError::NotEnoughBytes);
161 }
162
163 let version = Version::try_from(data[0])
164 .map_err(|_| ParseEnvelopeError::UnsupportedVersion(data[0] & 0x7f))?;
165 let direction = Direction::from(data[0]);
166 let flags = Flags::from_bits_truncate(data[1]);
167 let stream_id = try_i16_from_bytes(&data[2..4]).unwrap();
168 let opcode = Opcode::try_from(data[4])
169 .map_err(|_| ParseEnvelopeError::UnsupportedOpcode(data[4]))?;
170
171 let body_bytes = &data[ENVELOPE_HEADER_LEN..envelope_len];
172
173 let full_body = if flags.contains(Flags::COMPRESSION) {
174 compression.decode(body_bytes.to_vec())
175 } else {
176 Compression::None.decode(body_bytes.to_vec())
177 }
178 .map_err(ParseEnvelopeError::DecompressionError)?;
179
180 let body_len = full_body.len();
181
182 let mut body_cursor = Cursor::new(full_body.as_slice());
184
185 let tracing_id = if flags.contains(Flags::TRACING) && direction == Direction::Response {
186 let mut tracing_bytes = [0; UUID_LEN];
187 std::io::Read::read_exact(&mut body_cursor, &mut tracing_bytes).unwrap();
188
189 Some(decode_timeuuid(&tracing_bytes).map_err(ParseEnvelopeError::InvalidUuid)?)
190 } else {
191 None
192 };
193
194 let warnings = if flags.contains(Flags::WARNING) {
195 from_cursor_string_list(&mut body_cursor)
196 .map_err(ParseEnvelopeError::InvalidWarnings)?
197 } else {
198 vec![]
199 };
200
201 let mut body = Vec::with_capacity(body_len - body_cursor.position() as usize);
202
203 std::io::Read::read_to_end(&mut body_cursor, &mut body)
204 .expect("Read cannot fail because cursor is backed by slice");
205
206 Ok(ParsedEnvelope::new(
207 envelope_len,
208 Envelope {
209 version,
210 direction,
211 flags,
212 opcode,
213 stream_id,
214 body,
215 tracing_id,
216 warnings,
217 },
218 ))
219 }
220
221 pub fn check_envelope_size(data: &[u8]) -> Result<usize, CheckEnvelopeSizeError> {
222 if data.len() < ENVELOPE_HEADER_LEN {
223 return Err(CheckEnvelopeSizeError::NotEnoughBytes);
224 }
225
226 let body_len = try_i32_from_bytes(&data[5..9]).unwrap() as usize;
227 let envelope_len = ENVELOPE_HEADER_LEN + body_len;
228 if data.len() < envelope_len {
229 return Err(CheckEnvelopeSizeError::NotEnoughBytes);
230 }
231 let _ = Version::try_from(data[0])
232 .map_err(|_| CheckEnvelopeSizeError::UnsupportedVersion(data[0] & 0x7f))?;
233
234 Ok(envelope_len)
235 }
236
237 pub fn encode_with(&self, compressor: Compression) -> error::Result<Vec<u8>> {
238 let is_compressed = self.version < Version::V5 && compressor.is_compressed();
240
241 let combined_version_byte = u8::from(self.version) | u8::from(self.direction);
242 let flag_byte = (if is_compressed {
243 self.flags | Flags::COMPRESSION
244 } else {
245 self.flags.difference(Flags::COMPRESSION)
246 })
247 .bits();
248
249 let opcode_byte = u8::from(self.opcode);
250
251 let mut v = Vec::with_capacity(9);
252
253 v.push(combined_version_byte);
254 v.push(flag_byte);
255 v.extend_from_slice(&self.stream_id.to_be_bytes());
256 v.push(opcode_byte);
257
258 let mut flags_buffer = vec![];
259
260 if self.flags.contains(Flags::TRACING) && self.direction == Direction::Response {
261 let mut tracing_id = self
262 .tracing_id
263 .ok_or_else(|| {
264 error::Error::Io(std::io::Error::other(
265 "Tracing flag was set but Envelope has no tracing_id",
266 ))
267 })?
268 .into_bytes()
269 .to_vec();
270
271 flags_buffer.append(&mut tracing_id);
272 };
273
274 if self.flags.contains(Flags::WARNING) && self.direction == Direction::Response {
275 let warnings_len = self.warnings.len() as i16;
276 flags_buffer.extend_from_slice(&warnings_len.to_be_bytes());
277
278 for warning in &self.warnings {
279 let warning_len = warning.len() as i16;
280 flags_buffer.extend_from_slice(&warning_len.to_be_bytes());
281 flags_buffer.append(&mut warning.as_bytes().to_vec());
282 }
283 }
284
285 if is_compressed {
286 let encoded_body = if flags_buffer.is_empty() {
288 compressor.encode(&self.body)?
289 } else {
290 flags_buffer.extend_from_slice(&self.body);
291 compressor.encode(&flags_buffer)?
292 };
293
294 let body_len = encoded_body.len() as i32;
295 v.extend_from_slice(&body_len.to_be_bytes());
296 v.extend_from_slice(&encoded_body);
297 } else {
298 if flags_buffer.is_empty() {
300 let body_len = self.body.len() as i32;
301 v.extend_from_slice(&body_len.to_be_bytes());
302 v.extend_from_slice(&self.body);
303 } else {
304 let body_len = self.body.len() as i32 + flags_buffer.len() as i32;
305 v.extend_from_slice(&body_len.to_be_bytes());
306 flags_buffer.extend_from_slice(&self.body);
307 v.append(&mut flags_buffer);
308 }
309 }
310
311 Ok(v)
312 }
313}
314
315#[derive(Debug, Error)]
316#[non_exhaustive]
317pub enum CheckEnvelopeSizeError {
318 #[error("Not enough bytes!")]
319 NotEnoughBytes,
320 #[error("Unsupported version: {0}")]
321 UnsupportedVersion(u8),
322 #[error("Unsupported opcode: {0}")]
323 UnsupportedOpcode(u8),
324}
325
326#[derive(Debug, Error)]
327#[non_exhaustive]
328pub enum ParseEnvelopeError {
329 #[error("Not enough bytes!")]
331 NotEnoughBytes,
332 #[error("Unsupported version: {0}")]
334 UnsupportedVersion(u8),
335 #[error("Unsupported opcode: {0}")]
336 UnsupportedOpcode(u8),
337 #[error("Decompression error: {0}")]
338 DecompressionError(CompressionError),
339 #[error("Invalid uuid: {0}")]
340 InvalidUuid(uuid::Error),
341 #[error("Invalid warnings: {0}")]
342 InvalidWarnings(error::Error),
343}
344
345#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
347#[non_exhaustive]
348pub enum Version {
349 V3,
350 V4,
351 V5,
352}
353
354impl From<Version> for u8 {
355 fn from(value: Version) -> Self {
356 match value {
357 Version::V3 => 3,
358 Version::V4 => 4,
359 Version::V5 => 5,
360 }
361 }
362}
363
364impl TryFrom<u8> for Version {
365 type Error = error::Error;
366
367 fn try_from(version: u8) -> Result<Self, Self::Error> {
368 match version & 0x7F {
369 3 => Ok(Version::V3),
370 4 => Ok(Version::V4),
371 5 => Ok(Version::V5),
372 v => Err(error::Error::General(format!(
373 "Unknown cassandra version: {v}"
374 ))),
375 }
376 }
377}
378
379impl Version {
380 pub const BYTE_LENGTH: usize = 1;
382}
383
384#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
385pub enum Direction {
386 Request,
387 Response,
388}
389
390impl From<Direction> for u8 {
391 fn from(value: Direction) -> u8 {
392 match value {
393 Direction::Request => 0x00,
394 Direction::Response => 0x80,
395 }
396 }
397}
398
399impl From<u8> for Direction {
400 fn from(value: u8) -> Self {
401 match value & 0x80 {
402 0 => Direction::Request,
403 _ => Direction::Response,
404 }
405 }
406}
407
408bitflags! {
409 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
411 pub struct Flags: u8 {
412 const COMPRESSION = 0x01;
413 const TRACING = 0x02;
414 const CUSTOM_PAYLOAD = 0x04;
415 const WARNING = 0x08;
416 const BETA = 0x10;
417 }
418}
419
420impl Default for Flags {
421 #[inline]
422 fn default() -> Self {
423 Flags::empty()
424 }
425}
426
427impl Flags {
428 pub const BYTE_LENGTH: usize = 1;
430}
431
432#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash, Display)]
433#[non_exhaustive]
434pub enum Opcode {
435 Error,
436 Startup,
437 Ready,
438 Authenticate,
439 Options,
440 Supported,
441 Query,
442 Result,
443 Prepare,
444 Execute,
445 Register,
446 Event,
447 Batch,
448 AuthChallenge,
449 AuthResponse,
450 AuthSuccess,
451}
452
453impl Opcode {
454 pub const BYTE_LENGTH: usize = 1;
456}
457
458impl From<Opcode> for u8 {
459 fn from(value: Opcode) -> Self {
460 match value {
461 Opcode::Error => 0x00,
462 Opcode::Startup => 0x01,
463 Opcode::Ready => 0x02,
464 Opcode::Authenticate => 0x03,
465 Opcode::Options => 0x05,
466 Opcode::Supported => 0x06,
467 Opcode::Query => 0x07,
468 Opcode::Result => 0x08,
469 Opcode::Prepare => 0x09,
470 Opcode::Execute => 0x0A,
471 Opcode::Register => 0x0B,
472 Opcode::Event => 0x0C,
473 Opcode::Batch => 0x0D,
474 Opcode::AuthChallenge => 0x0E,
475 Opcode::AuthResponse => 0x0F,
476 Opcode::AuthSuccess => 0x10,
477 }
478 }
479}
480
481impl TryFrom<u8> for Opcode {
482 type Error = error::Error;
483
484 fn try_from(value: u8) -> Result<Self, <Opcode as TryFrom<u8>>::Error> {
485 match value {
486 0x00 => Ok(Opcode::Error),
487 0x01 => Ok(Opcode::Startup),
488 0x02 => Ok(Opcode::Ready),
489 0x03 => Ok(Opcode::Authenticate),
490 0x05 => Ok(Opcode::Options),
491 0x06 => Ok(Opcode::Supported),
492 0x07 => Ok(Opcode::Query),
493 0x08 => Ok(Opcode::Result),
494 0x09 => Ok(Opcode::Prepare),
495 0x0A => Ok(Opcode::Execute),
496 0x0B => Ok(Opcode::Register),
497 0x0C => Ok(Opcode::Event),
498 0x0D => Ok(Opcode::Batch),
499 0x0E => Ok(Opcode::AuthChallenge),
500 0x0F => Ok(Opcode::AuthResponse),
501 0x10 => Ok(Opcode::AuthSuccess),
502 _ => Err(error::Error::General(format!("Unknown opcode: {value}"))),
503 }
504 }
505}
506
507#[cfg(test)]
508mod helpers {
509 use super::*;
510
511 pub fn test_encode_decode_roundtrip_response(
512 raw_envelope: &[u8],
513 envelope: Envelope,
514 body: ResponseBody,
515 ) {
516 let encoded_body = body.serialize_to_vec(Version::V4);
518 assert_eq!(
519 &envelope.body, &encoded_body,
520 "encoded body did not match envelope's body"
521 );
522
523 let encoded_envelope = envelope.encode_with(Compression::None).unwrap();
524 assert_eq!(
525 raw_envelope, &encoded_envelope,
526 "encoded envelope did not match expected raw envelope"
527 );
528
529 let decoded_envelope = Envelope::from_buffer(raw_envelope, Compression::None)
531 .unwrap()
532 .envelope;
533 assert_eq!(decoded_envelope, envelope);
534
535 let decoded_body = envelope.response_body().unwrap();
536 assert_eq!(
537 body, decoded_body,
538 "decoded envelope.body did not match body"
539 )
540 }
541
542 pub fn test_encode_decode_roundtrip_request(
543 raw_envelope: &[u8],
544 envelope: Envelope,
545 body: RequestBody,
546 ) {
547 let encoded_body = body.serialize_to_vec(Version::V4);
549 assert_eq!(
550 &envelope.body, &encoded_body,
551 "encoded body did not match envelope's body"
552 );
553
554 let encoded_envelope = envelope.encode_with(Compression::None).unwrap();
555 assert_eq!(
556 raw_envelope, &encoded_envelope,
557 "encoded envelope did not match expected raw envelope"
558 );
559
560 let decoded_envelope = Envelope::from_buffer(raw_envelope, Compression::None)
562 .unwrap()
563 .envelope;
564 assert_eq!(envelope, decoded_envelope);
565
566 let decoded_body = envelope.request_body().unwrap();
567 assert_eq!(
568 body, decoded_body,
569 "decoded envelope.body did not match body"
570 )
571 }
572
573 pub fn test_encode_decode_roundtrip_nondeterministic_request(
575 mut envelope: Envelope,
576 body: RequestBody,
577 ) {
578 envelope.body = body.serialize_to_vec(Version::V4);
580
581 let decoded_body = envelope.request_body().unwrap();
583 assert_eq!(
584 body, decoded_body,
585 "decoded envelope.body did not match body"
586 )
587 }
588}
589
590#[cfg(test)]
592mod tests {
593 use super::*;
594 use crate::consistency::Consistency;
595 use crate::frame::frame_decoder::{
596 FrameDecoder, LegacyFrameDecoder, Lz4FrameDecoder, UncompressedFrameDecoder,
597 };
598 use crate::frame::frame_encoder::{
599 FrameEncoder, LegacyFrameEncoder, Lz4FrameEncoder, UncompressedFrameEncoder,
600 };
601 use crate::frame::message_query::BodyReqQuery;
602 use crate::query::query_params::QueryParams;
603 use crate::query::query_values::QueryValues;
604 use crate::types::value::Value;
605 use crate::types::CBytes;
606
607 #[test]
608 fn test_frame_version_as_byte() {
609 assert_eq!(u8::from(Version::V3), 0x03);
610 assert_eq!(u8::from(Version::V4), 0x04);
611 assert_eq!(u8::from(Version::V5), 0x05);
612
613 assert_eq!(u8::from(Direction::Request), 0x00);
614 assert_eq!(u8::from(Direction::Response), 0x80);
615 }
616
617 #[test]
618 fn test_frame_version_from() {
619 assert_eq!(Version::try_from(0x03).unwrap(), Version::V3);
620 assert_eq!(Version::try_from(0x83).unwrap(), Version::V3);
621 assert_eq!(Version::try_from(0x04).unwrap(), Version::V4);
622 assert_eq!(Version::try_from(0x84).unwrap(), Version::V4);
623 assert_eq!(Version::try_from(0x05).unwrap(), Version::V5);
624 assert_eq!(Version::try_from(0x85).unwrap(), Version::V5);
625
626 assert_eq!(Direction::from(0x03), Direction::Request);
627 assert_eq!(Direction::from(0x04), Direction::Request);
628 assert_eq!(Direction::from(0x05), Direction::Request);
629 assert_eq!(Direction::from(0x83), Direction::Response);
630 assert_eq!(Direction::from(0x84), Direction::Response);
631 assert_eq!(Direction::from(0x85), Direction::Response);
632 }
633
634 #[test]
635 fn test_opcode_as_byte() {
636 assert_eq!(u8::from(Opcode::Error), 0x00);
637 assert_eq!(u8::from(Opcode::Startup), 0x01);
638 assert_eq!(u8::from(Opcode::Ready), 0x02);
639 assert_eq!(u8::from(Opcode::Authenticate), 0x03);
640 assert_eq!(u8::from(Opcode::Options), 0x05);
641 assert_eq!(u8::from(Opcode::Supported), 0x06);
642 assert_eq!(u8::from(Opcode::Query), 0x07);
643 assert_eq!(u8::from(Opcode::Result), 0x08);
644 assert_eq!(u8::from(Opcode::Prepare), 0x09);
645 assert_eq!(u8::from(Opcode::Execute), 0x0A);
646 assert_eq!(u8::from(Opcode::Register), 0x0B);
647 assert_eq!(u8::from(Opcode::Event), 0x0C);
648 assert_eq!(u8::from(Opcode::Batch), 0x0D);
649 assert_eq!(u8::from(Opcode::AuthChallenge), 0x0E);
650 assert_eq!(u8::from(Opcode::AuthResponse), 0x0F);
651 assert_eq!(u8::from(Opcode::AuthSuccess), 0x10);
652 }
653
654 #[test]
655 fn test_opcode_from() {
656 assert_eq!(Opcode::try_from(0x00).unwrap(), Opcode::Error);
657 assert_eq!(Opcode::try_from(0x01).unwrap(), Opcode::Startup);
658 assert_eq!(Opcode::try_from(0x02).unwrap(), Opcode::Ready);
659 assert_eq!(Opcode::try_from(0x03).unwrap(), Opcode::Authenticate);
660 assert_eq!(Opcode::try_from(0x05).unwrap(), Opcode::Options);
661 assert_eq!(Opcode::try_from(0x06).unwrap(), Opcode::Supported);
662 assert_eq!(Opcode::try_from(0x07).unwrap(), Opcode::Query);
663 assert_eq!(Opcode::try_from(0x08).unwrap(), Opcode::Result);
664 assert_eq!(Opcode::try_from(0x09).unwrap(), Opcode::Prepare);
665 assert_eq!(Opcode::try_from(0x0A).unwrap(), Opcode::Execute);
666 assert_eq!(Opcode::try_from(0x0B).unwrap(), Opcode::Register);
667 assert_eq!(Opcode::try_from(0x0C).unwrap(), Opcode::Event);
668 assert_eq!(Opcode::try_from(0x0D).unwrap(), Opcode::Batch);
669 assert_eq!(Opcode::try_from(0x0E).unwrap(), Opcode::AuthChallenge);
670 assert_eq!(Opcode::try_from(0x0F).unwrap(), Opcode::AuthResponse);
671 assert_eq!(Opcode::try_from(0x10).unwrap(), Opcode::AuthSuccess);
672 }
673
674 #[test]
675 fn test_ready() {
676 let raw_envelope = vec![4, 0, 0, 0, 2, 0, 0, 0, 0];
677 let envelope = Envelope {
678 version: Version::V4,
679 direction: Direction::Request,
680 flags: Flags::empty(),
681 opcode: Opcode::Ready,
682 stream_id: 0,
683 body: vec![],
684 tracing_id: None,
685 warnings: vec![],
686 };
687 let body = ResponseBody::Ready;
688 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
689 }
690
691 #[test]
692 fn test_query_minimal() {
693 let raw_envelope = [
694 4, 0, 0, 0, 7, 0, 0, 0, 11, 0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64,
695 ];
696 let envelope = Envelope {
697 version: Version::V4,
698 direction: Direction::Request,
699 flags: Flags::empty(),
700 opcode: Opcode::Query,
701 stream_id: 0,
702 body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
703 tracing_id: None,
704 warnings: vec![],
705 };
706 let body = RequestBody::Query(BodyReqQuery {
707 query: "blah".into(),
708 query_params: QueryParams {
709 consistency: Consistency::Any,
710 with_names: true,
711 values: None,
712 page_size: None,
713 paging_state: None,
714 serial_consistency: None,
715 timestamp: None,
716 keyspace: None,
717 now_in_seconds: None,
718 },
719 });
720 helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
721 }
722
723 #[test]
724 fn test_query_simple_values() {
725 let raw_envelope = [
726 4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
727 121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
728 ];
729 let envelope = Envelope {
730 version: Version::V4,
731 direction: Direction::Request,
732 flags: Flags::empty(),
733 opcode: Opcode::Query,
734 stream_id: 0,
735 body: vec![
736 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
737 0, 3, 1, 2, 3, 255, 255, 255, 255,
738 ],
739 tracing_id: None,
740 warnings: vec![],
741 };
742 let body = RequestBody::Query(BodyReqQuery {
743 query: "some query".into(),
744 query_params: QueryParams {
745 consistency: Consistency::Serial,
746 with_names: false,
747 values: Some(QueryValues::SimpleValues(vec![
748 Value::Some(vec![1, 2, 3]),
749 Value::Null,
750 ])),
751 page_size: None,
752 paging_state: None,
753 serial_consistency: None,
754 timestamp: None,
755 keyspace: None,
756 now_in_seconds: None,
757 },
758 });
759 helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
760 }
761
762 #[test]
763 fn test_query_named_values() {
764 let envelope = Envelope {
765 version: Version::V4,
766 direction: Direction::Request,
767 flags: Flags::empty(),
768 opcode: Opcode::Query,
769 stream_id: 0,
770 body: vec![],
771 tracing_id: None,
772 warnings: vec![],
773 };
774 let body = RequestBody::Query(BodyReqQuery {
775 query: "another query".into(),
776 query_params: QueryParams {
777 consistency: Consistency::Three,
778 with_names: true,
779 values: Some(QueryValues::NamedValues(
780 vec![
781 ("foo".to_string(), Value::Some(vec![11, 12, 13])),
782 ("bar".to_string(), Value::NotSet),
783 ("baz".to_string(), Value::Some(vec![42, 10, 99, 100, 4])),
784 ]
785 .into_iter()
786 .collect(),
787 )),
788 page_size: Some(4),
789 paging_state: Some(CBytes::new(vec![0, 1, 2, 3])),
790 serial_consistency: Some(Consistency::One),
791 timestamp: Some(2000),
792 keyspace: None,
793 now_in_seconds: None,
794 },
795 });
796 helpers::test_encode_decode_roundtrip_nondeterministic_request(envelope, body);
797 }
798
799 #[test]
800 fn test_result_prepared_statement() {
801 use crate::frame::message_result::{
802 BodyResResultPrepared, ColSpec, ColType, ColTypeOption, PreparedMetadata,
803 ResResultBody, RowsMetadata, RowsMetadataFlags, TableSpec,
804 };
805 use crate::types::CBytesShort;
806
807 let raw_envelope = [
808 132, 0, 0, 0, 8, 0, 0, 0, 97, 0, 0, 0, 4, 0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
811 73, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 1, 0, 0, 0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97, 116,
817 101, 109, 101, 110, 116,
818 115, 0, 7, 116, 97, 98, 108, 101, 95, 49, 0, 2, 105, 100, 0, 9, 0, 1, 120, 0, 9, 0, 4, 110, 97, 109, 101, 0, 13, 0, 0, 0, 4, 0, 0, 0, 0, ];
829 let envelope = Envelope {
830 version: Version::V4,
831 direction: Direction::Response,
832 flags: Flags::empty(),
833 opcode: Opcode::Result,
834 stream_id: 0,
835 body: vec![
836 0, 0, 0, 4, 0, 16, 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27,
838 73, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 1, 0, 0, 0, 23, 116, 101, 115, 116, 95, 112, 114, 101, 112, 97, 114, 101, 95, 115, 116, 97,
844 116, 101, 109, 101, 110, 116,
845 115, 0, 7, 116, 97, 98, 108, 101, 95, 49, 0, 2, 105, 100, 0, 9, 0, 1, 120, 0, 9, 0, 4, 110, 97, 109, 101, 0, 13, 0, 0, 0, 4, 0, 0, 0, 0, ],
856 tracing_id: None,
857 warnings: vec![],
858 };
859 let body = ResponseBody::Result(ResResultBody::Prepared(BodyResResultPrepared {
860 id: CBytesShort::new(vec![
861 195, 165, 42, 38, 120, 170, 232, 144, 214, 187, 158, 200, 160, 226, 27, 73,
862 ]),
863 result_metadata_id: None,
864 metadata: PreparedMetadata {
865 pk_indexes: vec![0],
866 global_table_spec: Some(TableSpec {
867 ks_name: "test_prepare_statements".into(),
868 table_name: "table_1".into(),
869 }),
870 col_specs: vec![
871 ColSpec {
872 table_spec: None,
873 name: "id".into(),
874 col_type: ColTypeOption {
875 id: ColType::Int,
876 value: None,
877 },
878 },
879 ColSpec {
880 table_spec: None,
881 name: "x".into(),
882 col_type: ColTypeOption {
883 id: ColType::Int,
884 value: None,
885 },
886 },
887 ColSpec {
888 table_spec: None,
889 name: "name".into(),
890 col_type: ColTypeOption {
891 id: ColType::Varchar,
892 value: None,
893 },
894 },
895 ],
896 },
897 result_metadata: RowsMetadata {
898 flags: RowsMetadataFlags::NO_METADATA,
899 columns_count: 0,
900 paging_state: None,
901 new_metadata_id: None,
902 global_table_spec: None,
903 col_specs: vec![],
904 },
905 }));
906
907 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
908 }
909
910 fn create_small_envelope_data() -> (Envelope, Vec<u8>) {
911 let raw_envelope = vec![
912 4, 0, 0, 0, 7, 0, 0, 0, 30, 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114,
913 121, 0, 8, 1, 0, 2, 0, 0, 0, 3, 1, 2, 3, 255, 255, 255, 255,
914 ];
915 let envelope = Envelope {
916 version: Version::V4,
917 direction: Direction::Request,
918 flags: Flags::empty(),
919 opcode: Opcode::Query,
920 stream_id: 0,
921 body: vec![
922 0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
923 0, 3, 1, 2, 3, 255, 255, 255, 255,
924 ],
925 tracing_id: None,
926 warnings: vec![],
927 };
928
929 (envelope, raw_envelope)
930 }
931
932 fn create_large_envelope_data() -> (Envelope, Vec<u8>) {
933 let body: Vec<u8> = (0..262144).map(|value| (value % 256) as u8).collect();
934
935 let mut raw_envelope = vec![4, 0, 0, 0, 7, 0, 4, 0, 0];
936 raw_envelope.append(&mut body.clone());
937
938 let envelope = Envelope {
939 version: Version::V4,
940 direction: Direction::Request,
941 flags: Flags::empty(),
942 opcode: Opcode::Query,
943 stream_id: 0,
944 body,
945 tracing_id: None,
946 warnings: vec![],
947 };
948
949 (envelope, raw_envelope)
950 }
951
952 #[test]
953 fn should_encode_and_decode_legacy_frames() {
954 let (envelope, raw_envelope) = create_small_envelope_data();
955
956 let mut encoder = LegacyFrameEncoder::default();
957 assert!(encoder.can_fit(raw_envelope.len()));
958
959 encoder.add_envelope(raw_envelope.clone());
960 assert!(!encoder.can_fit(1));
961
962 let mut frame = encoder.finalize_self_contained().to_vec();
963 assert_eq!(frame, raw_envelope);
964
965 let mut decoder = LegacyFrameDecoder::default();
966
967 let envelopes = decoder.consume(&mut frame, Compression::None).unwrap();
968 assert_eq!(envelopes.len(), 1);
969 assert_eq!(envelopes[0], envelope);
970
971 encoder.reset();
972 assert!(encoder.can_fit(raw_envelope.len()));
973 }
974
975 #[test]
976 fn should_encode_and_decode_uncompressed_self_contained_frames() {
977 let (envelope, raw_envelope) = create_small_envelope_data();
978
979 let mut encoder = UncompressedFrameEncoder::default();
980 assert!(encoder.can_fit(raw_envelope.len()));
981
982 encoder.add_envelope(raw_envelope.clone());
983 assert!(encoder.can_fit(raw_envelope.len()));
984
985 encoder.add_envelope(raw_envelope);
986
987 let mut buffer1 = encoder.finalize_self_contained().to_vec();
988 let mut buffer2 = buffer1.split_off(5);
989
990 let mut decoder = UncompressedFrameDecoder::default();
991
992 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
993 assert!(buffer1.is_empty());
994 assert!(envelopes.is_empty());
995
996 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
997 assert!(buffer2.is_empty());
998 assert_eq!(envelopes.len(), 2);
999 assert_eq!(envelopes[0], envelope);
1000 assert_eq!(envelopes[1], envelope);
1001 }
1002
1003 #[test]
1004 fn should_encode_and_decode_uncompressed_non_self_contained_frames() {
1005 let (envelope, raw_envelope) = create_large_envelope_data();
1006
1007 let mut encoder = UncompressedFrameEncoder::default();
1008 assert!(!encoder.can_fit(raw_envelope.len()));
1009
1010 let data_len = raw_envelope.len();
1011 let mut data_start = 0;
1012 let mut buffer1 = vec![];
1013
1014 while data_start < data_len {
1015 let (data_start_offset, frame) =
1016 encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1017
1018 data_start += data_start_offset;
1019
1020 buffer1.extend_from_slice(frame);
1021
1022 encoder.reset();
1023 }
1024
1025 let mut buffer2 = buffer1.split_off(PAYLOAD_SIZE_LIMIT);
1026
1027 let mut decoder = UncompressedFrameDecoder::default();
1028
1029 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1030 assert!(buffer1.is_empty());
1031 assert!(envelopes.is_empty());
1032
1033 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1034 assert!(buffer2.is_empty());
1035 assert_eq!(envelopes.len(), 1);
1036 assert_eq!(envelopes[0], envelope);
1037 }
1038
1039 #[test]
1040 fn should_encode_and_decode_compressed_self_contained_frames() {
1041 let (envelope, raw_envelope) = create_small_envelope_data();
1042
1043 let mut encoder = Lz4FrameEncoder::default();
1044 assert!(encoder.can_fit(raw_envelope.len()));
1045
1046 encoder.add_envelope(raw_envelope.clone());
1047 assert!(encoder.can_fit(raw_envelope.len()));
1048
1049 encoder.add_envelope(raw_envelope);
1050
1051 let mut buffer1 = encoder.finalize_self_contained().to_vec();
1052 let mut buffer2 = buffer1.split_off(5);
1053
1054 let mut decoder = Lz4FrameDecoder::default();
1055
1056 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1057 assert!(buffer1.is_empty());
1058 assert!(envelopes.is_empty());
1059
1060 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1061 assert!(buffer2.is_empty());
1062 assert_eq!(envelopes.len(), 2);
1063 assert_eq!(envelopes[0], envelope);
1064 assert_eq!(envelopes[1], envelope);
1065 }
1066
1067 #[test]
1068 fn should_encode_and_decode_compressed_non_self_contained_frames() {
1069 let (envelope, raw_envelope) = create_large_envelope_data();
1070
1071 let mut encoder = Lz4FrameEncoder::default();
1072 assert!(!encoder.can_fit(raw_envelope.len()));
1073
1074 let data_len = raw_envelope.len();
1075 let mut data_start = 0;
1076 let mut buffer1 = vec![];
1077
1078 while data_start < data_len {
1079 let (data_start_offset, frame) =
1080 encoder.finalize_non_self_contained(&raw_envelope[data_start..]);
1081
1082 data_start += data_start_offset;
1083
1084 buffer1.extend_from_slice(frame);
1085
1086 encoder.reset();
1087 }
1088
1089 let mut buffer2 = buffer1.split_off(1000);
1090
1091 let mut decoder = Lz4FrameDecoder::default();
1092
1093 let envelopes = decoder.consume(&mut buffer1, Compression::None).unwrap();
1094 assert!(buffer1.is_empty());
1095 assert!(envelopes.is_empty());
1096
1097 let envelopes = decoder.consume(&mut buffer2, Compression::None).unwrap();
1098 assert!(buffer2.is_empty());
1099 assert_eq!(envelopes.len(), 1);
1100 assert_eq!(envelopes[0], envelope);
1101 }
1102}
1103
1104#[cfg(test)]
1105mod flags {
1106 use super::*;
1107 use crate::consistency::Consistency;
1108 use crate::frame::message_query::BodyReqQuery;
1109 use crate::frame::message_result::ResResultBody;
1110 use crate::query::query_params::QueryParams;
1111
1112 #[test]
1113 fn test_tracing_id_request() {
1114 let raw_envelope = [
1115 4, 2, 0, 12, 7, 0, 0, 0, 11, 0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64, ];
1122 let envelope = Envelope {
1123 version: Version::V4,
1124 direction: Direction::Request,
1125 flags: Flags::TRACING,
1126 opcode: Opcode::Query,
1127 stream_id: 12,
1128 body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
1129 tracing_id: None,
1130 warnings: vec![],
1131 };
1132
1133 let body = RequestBody::Query(BodyReqQuery {
1134 query: "blah".into(),
1135 query_params: QueryParams {
1136 consistency: Consistency::Any,
1137 with_names: true,
1138 values: None,
1139 page_size: None,
1140 paging_state: None,
1141 serial_consistency: None,
1142 timestamp: None,
1143 keyspace: None,
1144 now_in_seconds: None,
1145 },
1146 });
1147 helpers::test_encode_decode_roundtrip_request(&raw_envelope, envelope, body);
1148 }
1149
1150 #[test]
1151 fn test_tracing_id_response() {
1152 let raw_envelope = [
1153 132, 2, 0, 12, 8, 0, 0, 0, 20, 4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87, 0, 0, 0, 1, ];
1161 let envelope = Envelope {
1162 version: Version::V4,
1163 direction: Direction::Response,
1164 flags: Flags::TRACING,
1165 opcode: Opcode::Result,
1166 stream_id: 12,
1167 body: vec![0, 0, 0, 1],
1168 tracing_id: Some(uuid::Uuid::from_bytes([
1169 4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87,
1170 ])),
1171 warnings: vec![],
1172 };
1173
1174 let body = ResponseBody::Result(ResResultBody::Void);
1175
1176 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
1177 }
1178
1179 #[test]
1180 fn test_warnings_response() {
1181 let raw_envelope = [
1182 132, 8, 5, 64, 8, 0, 0, 0, 19, 0, 1, 0, 11, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 0, 0, 0, 1, ];
1191
1192 let body = ResponseBody::Result(ResResultBody::Void);
1193
1194 let envelope = Envelope {
1195 version: Version::V4,
1196 opcode: Opcode::Result,
1197 flags: Flags::WARNING,
1198 direction: Direction::Response,
1199 stream_id: 1344,
1200 tracing_id: None,
1201 body: vec![0, 0, 0, 1],
1202 warnings: vec!["Hello World".into()],
1203 };
1204
1205 helpers::test_encode_decode_roundtrip_response(&raw_envelope, envelope, body);
1206 }
1207}