1use crate::error;
2use crate::error::Error;
3use crate::frame::events::SchemaChange;
4use crate::frame::{FromBytes, FromCursor, Serialize, Version};
5use crate::types::rows::Row;
6use crate::types::{
7 from_cursor_str, serialize_str, try_i16_from_bytes, try_i32_from_bytes, try_u64_from_bytes,
8 CBytes, CBytesShort, CInt, CIntShort, INT_LEN, SHORT_LEN,
9};
10use bitflags::bitflags;
11use derive_more::{Constructor, Display};
12use std::convert::{TryFrom, TryInto};
13use std::io::{Cursor, Error as IoError, Read};
14
15#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Hash, Display)]
17#[non_exhaustive]
18pub enum ResultKind {
19 Void,
21 Rows,
23 SetKeyspace,
25 Prepared,
27 SchemaChange,
29}
30
31impl Serialize for ResultKind {
32 #[inline]
33 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
34 CInt::from(*self).serialize(cursor, version);
35 }
36}
37
38impl FromBytes for ResultKind {
39 fn from_bytes(bytes: &[u8]) -> error::Result<ResultKind> {
40 try_i32_from_bytes(bytes)
41 .map_err(Into::into)
42 .and_then(ResultKind::try_from)
43 }
44}
45
46impl From<ResultKind> for CInt {
47 fn from(value: ResultKind) -> Self {
48 match value {
49 ResultKind::Void => 0x0001,
50 ResultKind::Rows => 0x0002,
51 ResultKind::SetKeyspace => 0x0003,
52 ResultKind::Prepared => 0x0004,
53 ResultKind::SchemaChange => 0x0005,
54 }
55 }
56}
57
58impl TryFrom<CInt> for ResultKind {
59 type Error = Error;
60
61 fn try_from(value: CInt) -> Result<Self, Self::Error> {
62 match value {
63 0x0001 => Ok(ResultKind::Void),
64 0x0002 => Ok(ResultKind::Rows),
65 0x0003 => Ok(ResultKind::SetKeyspace),
66 0x0004 => Ok(ResultKind::Prepared),
67 0x0005 => Ok(ResultKind::SchemaChange),
68 _ => Err(Error::UnexpectedResultKind(value)),
69 }
70 }
71}
72
73impl FromCursor for ResultKind {
74 fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<ResultKind> {
75 let mut buff = [0; INT_LEN];
76 cursor.read_exact(&mut buff)?;
77
78 let rk = CInt::from_be_bytes(buff);
79 rk.try_into()
80 }
81}
82
83#[derive(Debug, PartialEq, Eq, Clone, Hash)]
86#[non_exhaustive]
87pub enum ResResultBody {
88 Void,
90 Rows(BodyResResultRows),
92 SetKeyspace(BodyResResultSetKeyspace),
95 Prepared(BodyResResultPrepared),
97 SchemaChange(SchemaChange),
99}
100
101impl Serialize for ResResultBody {
102 #[inline]
103 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
104 match &self {
105 ResResultBody::Void => {
106 ResultKind::Void.serialize(cursor, version);
107 }
108 ResResultBody::Rows(rows) => {
109 ResultKind::Rows.serialize(cursor, version);
110 rows.serialize(cursor, version);
111 }
112 ResResultBody::SetKeyspace(set_keyspace) => {
113 ResultKind::SetKeyspace.serialize(cursor, version);
114 set_keyspace.serialize(cursor, version);
115 }
116 ResResultBody::Prepared(prepared) => {
117 ResultKind::Prepared.serialize(cursor, version);
118 prepared.serialize(cursor, version);
119 }
120 ResResultBody::SchemaChange(schema_change) => {
121 ResultKind::SchemaChange.serialize(cursor, version);
122 schema_change.serialize(cursor, version);
123 }
124 }
125 }
126}
127
128impl ResResultBody {
129 fn parse_body_from_cursor(
130 cursor: &mut Cursor<&[u8]>,
131 result_kind: ResultKind,
132 version: Version,
133 ) -> error::Result<ResResultBody> {
134 Ok(match result_kind {
135 ResultKind::Void => ResResultBody::Void,
136 ResultKind::Rows => {
137 ResResultBody::Rows(BodyResResultRows::from_cursor(cursor, version)?)
138 }
139 ResultKind::SetKeyspace => {
140 ResResultBody::SetKeyspace(BodyResResultSetKeyspace::from_cursor(cursor, version)?)
141 }
142 ResultKind::Prepared => {
143 ResResultBody::Prepared(BodyResResultPrepared::from_cursor(cursor, version)?)
144 }
145 ResultKind::SchemaChange => {
146 ResResultBody::SchemaChange(SchemaChange::from_cursor(cursor, version)?)
147 }
148 })
149 }
150
151 pub fn into_rows(self) -> Option<Vec<Row>> {
153 match self {
154 ResResultBody::Rows(rows_body) => Some(Row::from_body(rows_body)),
155 _ => None,
156 }
157 }
158
159 pub fn as_rows_metadata(&self) -> Option<&RowsMetadata> {
161 match self {
162 ResResultBody::Rows(rows_body) => Some(&rows_body.metadata),
163 _ => None,
164 }
165 }
166
167 pub fn into_prepared(self) -> Option<BodyResResultPrepared> {
170 match self {
171 ResResultBody::Prepared(p) => Some(p),
172 _ => None,
173 }
174 }
175
176 pub fn into_set_keyspace(self) -> Option<BodyResResultSetKeyspace> {
179 match self {
180 ResResultBody::SetKeyspace(p) => Some(p),
181 _ => None,
182 }
183 }
184}
185
186impl ResResultBody {
187 pub fn from_cursor(
188 cursor: &mut Cursor<&[u8]>,
189 version: Version,
190 ) -> error::Result<ResResultBody> {
191 let result_kind = ResultKind::from_cursor(cursor, version)?;
192 ResResultBody::parse_body_from_cursor(cursor, result_kind, version)
193 }
194}
195
196#[derive(Debug, Constructor, PartialEq, Ord, PartialOrd, Eq, Clone, Hash)]
198pub struct BodyResResultSetKeyspace {
199 pub body: String,
201}
202
203impl Serialize for BodyResResultSetKeyspace {
204 #[inline]
205 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
206 serialize_str(cursor, &self.body, version);
207 }
208}
209
210impl FromCursor for BodyResResultSetKeyspace {
211 fn from_cursor(
212 cursor: &mut Cursor<&[u8]>,
213 _version: Version,
214 ) -> error::Result<BodyResResultSetKeyspace> {
215 from_cursor_str(cursor).map(|x| BodyResResultSetKeyspace::new(x.to_string()))
216 }
217}
218
219#[derive(Debug, PartialEq, Eq, Clone, Hash)]
222pub struct BodyResResultRows {
223 pub metadata: RowsMetadata,
225 pub rows_count: CInt,
227 pub rows_content: Vec<Vec<CBytes>>,
229 pub protocol_version: Version,
231}
232
233impl Serialize for BodyResResultRows {
234 #[inline]
235 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
236 self.metadata.serialize(cursor, version);
237 self.rows_count.serialize(cursor, version);
238 self.rows_content
239 .iter()
240 .flatten()
241 .for_each(|x| x.serialize(cursor, version));
242 }
243}
244
245impl BodyResResultRows {
246 fn rows_content(
247 cursor: &mut Cursor<&[u8]>,
248 rows_count: i32,
249 columns_count: i32,
250 version: Version,
251 ) -> error::Result<Vec<Vec<CBytes>>> {
252 (0..rows_count)
253 .map(|_| {
254 (0..columns_count)
255 .map(|_| CBytes::from_cursor(cursor, version))
256 .collect::<Result<_, _>>()
257 })
258 .collect::<Result<_, _>>()
259 }
260}
261
262impl FromCursor for BodyResResultRows {
263 fn from_cursor(
264 cursor: &mut Cursor<&[u8]>,
265 version: Version,
266 ) -> error::Result<BodyResResultRows> {
267 let metadata = RowsMetadata::from_cursor(cursor, version)?;
268 let rows_count = CInt::from_cursor(cursor, version)?;
269 let rows_content =
270 BodyResResultRows::rows_content(cursor, rows_count, metadata.columns_count, version)?;
271
272 Ok(BodyResResultRows {
273 metadata,
274 rows_count,
275 rows_content,
276 protocol_version: version,
277 })
278 }
279}
280
281#[derive(Debug, Clone, PartialEq, Eq, Hash)]
283pub struct RowsMetadata {
284 pub flags: RowsMetadataFlags,
286 pub columns_count: i32,
288 pub paging_state: Option<CBytes>,
290 pub new_metadata_id: Option<CBytesShort>,
293 pub global_table_spec: Option<TableSpec>,
297 pub col_specs: Vec<ColSpec>,
299}
300
301impl Serialize for RowsMetadata {
302 #[inline]
303 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
304 assert_eq!(
307 self.flags.contains(RowsMetadataFlags::HAS_MORE_PAGES),
308 self.paging_state.is_some()
309 );
310
311 match (
312 self.flags.contains(RowsMetadataFlags::NO_METADATA),
313 self.flags.contains(RowsMetadataFlags::GLOBAL_TABLE_SPACE),
314 ) {
315 (false, false) => {
316 assert!(self.global_table_spec.is_none());
317 assert!(!self.col_specs.is_empty());
318 }
319 (false, true) => {
320 assert!(!self.col_specs.is_empty());
321 }
322 (true, _) => {
323 assert!(self.global_table_spec.is_none());
324 assert!(self.col_specs.is_empty());
325 }
326 }
327
328 self.flags.serialize(cursor, version);
329
330 self.columns_count.serialize(cursor, version);
331
332 if let Some(paging_state) = &self.paging_state {
333 paging_state.serialize(cursor, version);
334 }
335
336 if let Some(new_metadata_id) = &self.new_metadata_id {
337 new_metadata_id.serialize(cursor, version);
338 }
339
340 if let Some(global_table_spec) = &self.global_table_spec {
341 global_table_spec.serialize(cursor, version);
342 }
343
344 self.col_specs
345 .iter()
346 .for_each(|x| x.serialize(cursor, version));
347 }
348}
349
350impl FromCursor for RowsMetadata {
351 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<RowsMetadata> {
352 let flags = RowsMetadataFlags::from_bits_truncate(CInt::from_cursor(cursor, version)?);
353 let columns_count = CInt::from_cursor(cursor, version)?;
354
355 let paging_state = if flags.contains(RowsMetadataFlags::HAS_MORE_PAGES) {
356 Some(CBytes::from_cursor(cursor, version)?)
357 } else {
358 None
359 };
360
361 if flags.contains(RowsMetadataFlags::NO_METADATA) {
362 return Ok(RowsMetadata {
363 flags,
364 columns_count,
365 paging_state,
366 new_metadata_id: None,
367 global_table_spec: None,
368 col_specs: vec![],
369 });
370 }
371
372 let new_metadata_id = if flags.contains(RowsMetadataFlags::METADATA_CHANGED) {
373 Some(CBytesShort::from_cursor(cursor, version)?)
374 } else {
375 None
376 };
377
378 let has_global_table_space = flags.contains(RowsMetadataFlags::GLOBAL_TABLE_SPACE);
379 let global_table_spec =
380 extract_global_table_space(cursor, has_global_table_space, version)?;
381
382 let col_specs =
383 ColSpec::parse_colspecs(cursor, columns_count, has_global_table_space, version)?;
384
385 Ok(RowsMetadata {
386 flags,
387 columns_count,
388 paging_state,
389 new_metadata_id,
390 global_table_spec,
391 col_specs,
392 })
393 }
394}
395
396bitflags! {
397 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
398 pub struct RowsMetadataFlags: i32 {
399 const GLOBAL_TABLE_SPACE = 0x0001;
400 const HAS_MORE_PAGES = 0x0002;
401 const NO_METADATA = 0x0004;
402 const METADATA_CHANGED = 0x0008;
403 }
404}
405
406impl Serialize for RowsMetadataFlags {
407 #[inline]
408 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
409 self.bits().serialize(cursor, version)
410 }
411}
412
413impl From<RowsMetadataFlags> for i32 {
414 fn from(value: RowsMetadataFlags) -> Self {
415 value.bits()
416 }
417}
418
419impl FromBytes for RowsMetadataFlags {
420 fn from_bytes(bytes: &[u8]) -> error::Result<RowsMetadataFlags> {
421 try_u64_from_bytes(bytes).map_err(Into::into).and_then(|f| {
422 RowsMetadataFlags::from_bits(f as i32)
423 .ok_or_else(|| "Unexpected rows metadata flag".into())
424 })
425 }
426}
427
428#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
430pub struct TableSpec {
431 pub ks_name: String,
432 pub table_name: String,
433}
434
435impl Serialize for TableSpec {
436 #[inline]
437 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
438 serialize_str(cursor, &self.ks_name, version);
439 serialize_str(cursor, &self.table_name, version);
440 }
441}
442
443impl FromCursor for TableSpec {
444 fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<Self> {
445 let ks_name = from_cursor_str(cursor)?.to_string();
446 let table_name = from_cursor_str(cursor)?.to_string();
447 Ok(TableSpec {
448 ks_name,
449 table_name,
450 })
451 }
452}
453
454#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
456pub struct ColSpec {
457 pub table_spec: Option<TableSpec>,
460 pub name: String,
462 pub col_type: ColTypeOption,
464}
465
466impl Serialize for ColSpec {
467 #[inline]
468 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
469 if let Some(table_spec) = &self.table_spec {
470 table_spec.serialize(cursor, version);
471 }
472
473 serialize_str(cursor, &self.name, version);
474 self.col_type.serialize(cursor, version);
475 }
476}
477
478impl ColSpec {
479 pub fn parse_colspecs(
480 cursor: &mut Cursor<&[u8]>,
481 column_count: i32,
482 has_global_table_space: bool,
483 version: Version,
484 ) -> error::Result<Vec<ColSpec>> {
485 (0..column_count)
486 .map(|_| {
487 let table_spec = if !has_global_table_space {
488 Some(TableSpec::from_cursor(cursor, version)?)
489 } else {
490 None
491 };
492
493 let name = from_cursor_str(cursor)?.to_string();
494 let col_type = ColTypeOption::from_cursor(cursor, version)?;
495
496 Ok(ColSpec {
497 table_spec,
498 name,
499 col_type,
500 })
501 })
502 .collect::<Result<_, _>>()
503 }
504}
505
506#[derive(Debug, Clone, Display, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
508#[non_exhaustive]
509pub enum ColType {
510 Custom,
511 Ascii,
512 Bigint,
513 Blob,
514 Boolean,
515 Counter,
516 Decimal,
517 Double,
518 Float,
519 Int,
520 Timestamp,
521 Uuid,
522 Varchar,
523 Varint,
524 Timeuuid,
525 Inet,
526 Date,
527 Time,
528 Smallint,
529 Tinyint,
530 Duration,
531 List,
532 Map,
533 Set,
534 Udt,
535 Tuple,
536}
537
538impl TryFrom<CIntShort> for ColType {
539 type Error = Error;
540
541 fn try_from(value: CIntShort) -> Result<Self, Self::Error> {
542 match value {
543 0x0000 => Ok(ColType::Custom),
544 0x0001 => Ok(ColType::Ascii),
545 0x0002 => Ok(ColType::Bigint),
546 0x0003 => Ok(ColType::Blob),
547 0x0004 => Ok(ColType::Boolean),
548 0x0005 => Ok(ColType::Counter),
549 0x0006 => Ok(ColType::Decimal),
550 0x0007 => Ok(ColType::Double),
551 0x0008 => Ok(ColType::Float),
552 0x0009 => Ok(ColType::Int),
553 0x000B => Ok(ColType::Timestamp),
554 0x000C => Ok(ColType::Uuid),
555 0x000D => Ok(ColType::Varchar),
556 0x000E => Ok(ColType::Varint),
557 0x000F => Ok(ColType::Timeuuid),
558 0x0010 => Ok(ColType::Inet),
559 0x0011 => Ok(ColType::Date),
560 0x0012 => Ok(ColType::Time),
561 0x0013 => Ok(ColType::Smallint),
562 0x0014 => Ok(ColType::Tinyint),
563 0x0015 => Ok(ColType::Duration),
564 0x0020 => Ok(ColType::List),
565 0x0021 => Ok(ColType::Map),
566 0x0022 => Ok(ColType::Set),
567 0x0030 => Ok(ColType::Udt),
568 0x0031 => Ok(ColType::Tuple),
569 0x0080 => Ok(ColType::Varchar),
570 _ => Err(Error::UnexpectedColumnType(value)),
571 }
572 }
573}
574
575impl FromBytes for ColType {
576 fn from_bytes(bytes: &[u8]) -> error::Result<ColType> {
577 try_i16_from_bytes(bytes)
578 .map_err(Into::into)
579 .and_then(ColType::try_from)
580 }
581}
582
583impl Serialize for ColType {
584 #[inline]
585 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
586 (match self {
587 ColType::Custom => 0x0000,
588 ColType::Ascii => 0x0001,
589 ColType::Bigint => 0x0002,
590 ColType::Blob => 0x0003,
591 ColType::Boolean => 0x0004,
592 ColType::Counter => 0x0005,
593 ColType::Decimal => 0x0006,
594 ColType::Double => 0x0007,
595 ColType::Float => 0x0008,
596 ColType::Int => 0x0009,
597 ColType::Timestamp => 0x000B,
598 ColType::Uuid => 0x000C,
599 ColType::Varchar => 0x000D,
600 ColType::Varint => 0x000E,
601 ColType::Timeuuid => 0x000F,
602 ColType::Inet => 0x0010,
603 ColType::Date => 0x0011,
604 ColType::Time => 0x0012,
605 ColType::Smallint => 0x0013,
606 ColType::Tinyint => 0x0014,
607 ColType::Duration => 0x0015,
608 ColType::List => 0x0020,
609 ColType::Map => 0x0021,
610 ColType::Set => 0x0022,
611 ColType::Udt => 0x0030,
612 ColType::Tuple => 0x0031,
613 } as CIntShort)
614 .serialize(cursor, version);
615 }
616}
617
618impl FromCursor for ColType {
619 fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<ColType> {
620 let mut buff = [0; SHORT_LEN];
621 cursor.read_exact(&mut buff)?;
622
623 let t = CIntShort::from_be_bytes(buff);
624 t.try_into()
625 }
626}
627
628#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
630pub struct ColTypeOption {
631 pub id: ColType,
633 pub value: Option<ColTypeOptionValue>,
635}
636
637impl Serialize for ColTypeOption {
638 #[inline]
639 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
640 self.id.serialize(cursor, version);
641 if let Some(value) = &self.value {
642 value.serialize(cursor, version);
643 }
644 }
645}
646
647impl FromCursor for ColTypeOption {
648 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<ColTypeOption> {
649 let id = ColType::from_cursor(cursor, version)?;
650 let value = match id {
651 ColType::Custom => Some(ColTypeOptionValue::CString(
652 from_cursor_str(cursor)?.to_string(),
653 )),
654 ColType::Set => {
655 let col_type = ColTypeOption::from_cursor(cursor, version)?;
656 Some(ColTypeOptionValue::CSet(Box::new(col_type)))
657 }
658 ColType::List => {
659 let col_type = ColTypeOption::from_cursor(cursor, version)?;
660 Some(ColTypeOptionValue::CList(Box::new(col_type)))
661 }
662 ColType::Udt => Some(ColTypeOptionValue::UdtType(CUdt::from_cursor(
663 cursor, version,
664 )?)),
665 ColType::Tuple => Some(ColTypeOptionValue::TupleType(CTuple::from_cursor(
666 cursor, version,
667 )?)),
668 ColType::Map => {
669 let name_type = ColTypeOption::from_cursor(cursor, version)?;
670 let value_type = ColTypeOption::from_cursor(cursor, version)?;
671 Some(ColTypeOptionValue::CMap(
672 Box::new(name_type),
673 Box::new(value_type),
674 ))
675 }
676 _ => None,
677 };
678
679 Ok(ColTypeOption { id, value })
680 }
681}
682
683#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
685#[non_exhaustive]
686pub enum ColTypeOptionValue {
687 CString(String),
688 ColType(ColType),
689 CSet(Box<ColTypeOption>),
690 CList(Box<ColTypeOption>),
691 UdtType(CUdt),
692 TupleType(CTuple),
693 CMap(Box<ColTypeOption>, Box<ColTypeOption>),
694}
695
696impl Serialize for ColTypeOptionValue {
697 #[inline]
698 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
699 match self {
700 Self::CString(c) => serialize_str(cursor, c, version),
701 Self::ColType(c) => c.serialize(cursor, version),
702 Self::CSet(c) => c.serialize(cursor, version),
703 Self::CList(c) => c.serialize(cursor, version),
704 Self::UdtType(c) => c.serialize(cursor, version),
705 Self::TupleType(c) => c.serialize(cursor, version),
706 Self::CMap(v1, v2) => {
707 v1.serialize(cursor, version);
708 v2.serialize(cursor, version);
709 }
710 }
711 }
712}
713
714#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
716pub struct CUdt {
717 pub ks: String,
719 pub udt_name: String,
721 pub descriptions: Vec<(String, ColTypeOption)>,
723}
724
725impl Serialize for CUdt {
726 #[inline]
727 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
728 serialize_str(cursor, &self.ks, version);
729 serialize_str(cursor, &self.udt_name, version);
730 (self.descriptions.len() as i16).serialize(cursor, version);
731 self.descriptions.iter().for_each(|(name, col_type)| {
732 serialize_str(cursor, name, version);
733 col_type.serialize(cursor, version);
734 });
735 }
736}
737
738impl FromCursor for CUdt {
739 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<CUdt> {
740 let ks = from_cursor_str(cursor)?.to_string();
741 let udt_name = from_cursor_str(cursor)?.to_string();
742
743 let mut buff = [0; SHORT_LEN];
744 cursor.read_exact(&mut buff)?;
745
746 let n = i16::from_be_bytes(buff);
747 let mut descriptions = Vec::with_capacity(n as usize);
748 for _ in 0..n {
749 let name = from_cursor_str(cursor)?.to_string();
750 let col_type = ColTypeOption::from_cursor(cursor, version)?;
751 descriptions.push((name, col_type));
752 }
753
754 Ok(CUdt {
755 ks,
756 udt_name,
757 descriptions,
758 })
759 }
760}
761
762#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
765pub struct CTuple {
766 pub types: Vec<ColTypeOption>,
768}
769
770impl Serialize for CTuple {
771 #[inline]
772 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
773 (self.types.len() as i16).serialize(cursor, version);
774 self.types.iter().for_each(|f| f.serialize(cursor, version));
775 }
776}
777
778impl FromCursor for CTuple {
779 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<CTuple> {
780 let mut buff = [0; SHORT_LEN];
781 cursor.read_exact(&mut buff)?;
782
783 let n = i16::from_be_bytes(buff);
784 let mut types = Vec::with_capacity(n as usize);
785 for _ in 0..n {
786 let col_type = ColTypeOption::from_cursor(cursor, version)?;
787 types.push(col_type);
788 }
789
790 Ok(CTuple { types })
791 }
792}
793
794#[derive(Debug, PartialEq, Eq, Clone, Hash)]
796pub struct BodyResResultPrepared {
797 pub id: CBytesShort,
799 pub result_metadata_id: Option<CBytesShort>,
801 pub metadata: PreparedMetadata,
803 pub result_metadata: RowsMetadata,
806}
807
808impl Serialize for BodyResResultPrepared {
809 #[inline]
810 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
811 self.id.serialize(cursor, version);
812
813 if let Some(result_metadata_id) = &self.result_metadata_id {
814 result_metadata_id.serialize(cursor, version);
815 }
816
817 self.metadata.serialize(cursor, version);
818 self.result_metadata.serialize(cursor, version);
819 }
820}
821
822impl BodyResResultPrepared {
823 fn from_cursor(
824 cursor: &mut Cursor<&[u8]>,
825 version: Version,
826 ) -> error::Result<BodyResResultPrepared> {
827 let id = CBytesShort::from_cursor(cursor, version)?;
828
829 let result_metadata_id = if version == Version::V5 {
830 Some(CBytesShort::from_cursor(cursor, version)?)
831 } else {
832 None
833 };
834
835 let metadata = PreparedMetadata::from_cursor(cursor, version)?;
836 let result_metadata = RowsMetadata::from_cursor(cursor, version)?;
837
838 Ok(BodyResResultPrepared {
839 id,
840 result_metadata_id,
841 metadata,
842 result_metadata,
843 })
844 }
845}
846
847bitflags! {
848 pub struct PreparedMetadataFlags: i32 {
849 const GLOBAL_TABLE_SPACE = 0x0001;
850 }
851}
852
853impl Serialize for PreparedMetadataFlags {
854 #[inline]
855 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
856 self.bits().serialize(cursor, version);
857 }
858}
859
860#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
862pub struct PreparedMetadata {
863 pub pk_indexes: Vec<i16>,
864 pub global_table_spec: Option<TableSpec>,
865 pub col_specs: Vec<ColSpec>,
866}
867
868impl Serialize for PreparedMetadata {
869 #[inline]
870 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
871 if self.global_table_spec.is_some() {
872 PreparedMetadataFlags::GLOBAL_TABLE_SPACE
873 } else {
874 PreparedMetadataFlags::empty()
875 }
876 .serialize(cursor, version);
877
878 let columns_count = self.col_specs.len() as i32;
879 columns_count.serialize(cursor, version);
880
881 let pk_count = self.pk_indexes.len() as i32;
882 pk_count.serialize(cursor, version);
883
884 self.pk_indexes
885 .iter()
886 .for_each(|f| f.serialize(cursor, version));
887
888 if let Some(global_table_spec) = &self.global_table_spec {
889 global_table_spec.serialize(cursor, version);
890 }
891
892 self.col_specs
893 .iter()
894 .for_each(|x| x.serialize(cursor, version));
895 }
896}
897
898impl PreparedMetadata {
899 fn from_cursor(
900 cursor: &mut Cursor<&[u8]>,
901 version: Version,
902 ) -> error::Result<PreparedMetadata> {
903 let flags = PreparedMetadataFlags::from_bits_truncate(CInt::from_cursor(cursor, version)?);
904 let columns_count = CInt::from_cursor(cursor, version)?;
905
906 let pk_count = if let Version::V3 = version {
907 0
908 } else {
909 CInt::from_cursor(cursor, version)?
911 };
912
913 let pk_indexes = (0..pk_count)
914 .map(|_| {
915 let mut buff = [0; SHORT_LEN];
916 cursor.read_exact(&mut buff)?;
917
918 Ok(i16::from_be_bytes(buff))
919 })
920 .collect::<Result<Vec<i16>, IoError>>()?;
921
922 let has_global_table_space = flags.contains(PreparedMetadataFlags::GLOBAL_TABLE_SPACE);
923 let global_table_spec =
924 extract_global_table_space(cursor, has_global_table_space, version)?;
925 let col_specs =
926 ColSpec::parse_colspecs(cursor, columns_count, has_global_table_space, version)?;
927
928 Ok(PreparedMetadata {
929 pk_indexes,
930 global_table_spec,
931 col_specs,
932 })
933 }
934}
935
936fn extract_global_table_space(
937 cursor: &mut Cursor<&[u8]>,
938 has_global_table_space: bool,
939 version: Version,
940) -> error::Result<Option<TableSpec>> {
941 Ok(if has_global_table_space {
942 Some(TableSpec::from_cursor(cursor, version)?)
943 } else {
944 None
945 })
946}
947
948#[cfg(test)]
950fn test_encode_decode(bytes: &[u8], expected: ResResultBody) {
951 {
952 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
953 let result = ResResultBody::from_cursor(&mut cursor, Version::V4).unwrap();
954 assert_eq!(expected, result);
955 }
956
957 {
958 let mut buffer = Vec::new();
959 let mut cursor = Cursor::new(&mut buffer);
960 expected.serialize(&mut cursor, Version::V4);
961 assert_eq!(buffer, bytes);
962 }
963}
964
965#[cfg(test)]
966mod cudt {
967 use super::*;
968
969 #[test]
971 fn cudt() {
972 let bytes = &[
973 0, 3, 98, 97, 114, 0, 3, 102, 111, 111, 0, 2, 0, 3, 98, 97, 114, 0, 9, 0, 3, 102, 111, 111, 0, 9, ];
984 let expected = CUdt {
985 ks: "bar".into(),
986 udt_name: "foo".into(),
987 descriptions: vec![
988 (
989 "bar".into(),
990 ColTypeOption {
991 id: ColType::Int,
992 value: None,
993 },
994 ),
995 (
996 "foo".into(),
997 ColTypeOption {
998 id: ColType::Int,
999 value: None,
1000 },
1001 ),
1002 ],
1003 };
1004
1005 {
1006 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1007 let udt = CUdt::from_cursor(&mut cursor, Version::V4).unwrap();
1008 assert_eq!(udt, expected);
1009 }
1010
1011 {
1012 let mut buffer = Vec::new();
1013 let mut cursor = Cursor::new(&mut buffer);
1014 expected.serialize(&mut cursor, Version::V4);
1015 assert_eq!(buffer, bytes);
1016 }
1017 }
1018}
1019
1020#[cfg(test)]
1021mod ctuple {
1023 use super::*;
1024
1025 #[test]
1026 fn ctuple() {
1027 let bytes = &[0, 3, 0, 9, 0, 9, 0, 9];
1028 let expected = CTuple {
1029 types: vec![
1030 ColTypeOption {
1031 id: ColType::Int,
1032 value: None,
1033 };
1034 3
1035 ],
1036 };
1037
1038 {
1039 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1040 let tuple = CTuple::from_cursor(&mut cursor, Version::V4).unwrap();
1041 assert_eq!(tuple, expected);
1042 }
1043
1044 {
1045 let mut buffer = Vec::new();
1046 let mut cursor = Cursor::new(&mut buffer);
1047 expected.serialize(&mut cursor, Version::V4);
1048 assert_eq!(buffer, bytes);
1049 }
1050 }
1051}
1052
1053#[cfg(test)]
1054mod col_spec {
1056 use super::*;
1057
1058 #[test]
1059 fn col_spec_with_table_spec() {
1060 let bytes = &[
1061 0, 3, 98, 97, 114, 0, 3, 102, 111, 111, 0, 3, 102, 111, 111, 0, 9, ];
1069
1070 let expected = vec![ColSpec {
1071 table_spec: Some(TableSpec {
1072 ks_name: "bar".into(),
1073 table_name: "foo".into(),
1074 }),
1075 name: "foo".into(),
1076 col_type: ColTypeOption {
1077 id: ColType::Int,
1078 value: None,
1079 },
1080 }];
1081
1082 {
1083 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1084 let col_spec = ColSpec::parse_colspecs(&mut cursor, 1, false, Version::V4).unwrap();
1085 assert_eq!(col_spec, expected);
1086 }
1087
1088 {
1089 let mut buffer = Vec::new();
1090 let mut cursor = Cursor::new(&mut buffer);
1091 expected[0].serialize(&mut cursor, Version::V4);
1092 assert_eq!(buffer, bytes);
1093 }
1094 }
1095
1096 #[test]
1097 fn col_spec_without_table_spec() {
1098 let bytes = &[
1099 0, 3, 102, 111, 111, 0, 9, ];
1103 let expected = vec![ColSpec {
1104 table_spec: None,
1105 name: "foo".into(),
1106 col_type: ColTypeOption {
1107 id: ColType::Int,
1108 value: None,
1109 },
1110 }];
1111
1112 {
1113 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1114 let col_spec = ColSpec::parse_colspecs(&mut cursor, 1, true, Version::V4).unwrap();
1115 assert_eq!(col_spec, expected);
1116 }
1117
1118 {
1119 let mut buffer = Vec::new();
1120 let mut cursor = Cursor::new(&mut buffer);
1121 expected[0].serialize(&mut cursor, Version::V4);
1122 assert_eq!(buffer, bytes);
1123 }
1124 }
1125}
1126
1127#[cfg(test)]
1128mod col_type_option {
1130 use super::*;
1131
1132 #[test]
1133 fn col_type_options_int() {
1134 let bytes = &[0, 9];
1135 let expected = ColTypeOption {
1136 id: ColType::Int,
1137 value: None,
1138 };
1139
1140 {
1141 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1142 let col_type_option = ColTypeOption::from_cursor(&mut cursor, Version::V4).unwrap();
1143 assert_eq!(col_type_option, expected);
1144 }
1145
1146 {
1147 let mut buffer = Vec::new();
1148 let mut cursor = Cursor::new(&mut buffer);
1149 expected.serialize(&mut cursor, Version::V4);
1150 assert_eq!(buffer, bytes);
1151 }
1152 }
1153
1154 #[test]
1155 fn col_type_options_map() {
1156 let bytes = &[0, 33, 0, 9, 0, 9];
1157 let expected = ColTypeOption {
1158 id: ColType::Map,
1159 value: Some(ColTypeOptionValue::CMap(
1160 Box::new(ColTypeOption {
1161 id: ColType::Int,
1162 value: None,
1163 }),
1164 Box::new(ColTypeOption {
1165 id: ColType::Int,
1166 value: None,
1167 }),
1168 )),
1169 };
1170
1171 {
1172 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1173 let col_type_option = ColTypeOption::from_cursor(&mut cursor, Version::V4).unwrap();
1174 assert_eq!(col_type_option, expected);
1175 }
1176
1177 {
1178 let mut buffer = Vec::new();
1179 let mut cursor = Cursor::new(&mut buffer);
1180 expected.serialize(&mut cursor, Version::V4);
1181 assert_eq!(buffer, bytes);
1182 }
1183 }
1184}
1185
1186#[cfg(test)]
1187mod table_spec {
1189 use super::*;
1190
1191 #[test]
1192 fn table_spec() {
1193 let bytes = &[
1194 0, 3, 98, 97, 114, 0, 3, 102, 111, 111, ];
1197 let expected = TableSpec {
1198 ks_name: "bar".into(),
1199 table_name: "foo".into(),
1200 };
1201
1202 {
1203 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1204 let table_spec = TableSpec::from_cursor(&mut cursor, Version::V4).unwrap();
1205 assert_eq!(table_spec, expected);
1206 }
1207
1208 {
1209 let mut buffer = Vec::new();
1210 let mut cursor = Cursor::new(&mut buffer);
1211 expected.serialize(&mut cursor, Version::V4);
1212 assert_eq!(buffer, bytes);
1213 }
1214 }
1215}
1216
1217#[cfg(test)]
1218mod void {
1219 use super::*;
1220
1221 #[test]
1222 fn test_void() {
1223 let bytes = &[0, 0, 0, 1];
1224 let expected = ResResultBody::Void;
1225 test_encode_decode(bytes, expected);
1226 }
1227}
1228
1229#[cfg(test)]
1230mod rows_metadata {
1232 use super::*;
1233
1234 #[test]
1235 fn rows_metadata() {
1236 let bytes = &[
1237 0, 0, 0, 8, 0, 0, 0, 2, 0, 1, 1, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 102, 111, 111, 0, 9, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 98, 97, 114, 0, 19, ];
1253
1254 let expected = RowsMetadata {
1255 flags: RowsMetadataFlags::METADATA_CHANGED,
1256
1257 columns_count: 2,
1258 paging_state: None,
1259 new_metadata_id: Some(CBytesShort::new(vec![1])),
1260 global_table_spec: None,
1261 col_specs: vec![
1262 ColSpec {
1263 table_spec: Some(TableSpec {
1264 ks_name: "ksname1".into(),
1265 table_name: "tablename".into(),
1266 }),
1267 name: "foo".into(),
1268 col_type: ColTypeOption {
1269 id: ColType::Int,
1270 value: None,
1271 },
1272 },
1273 ColSpec {
1274 table_spec: Some(TableSpec {
1275 ks_name: "ksname1".into(),
1276 table_name: "tablename".into(),
1277 }),
1278 name: "bar".into(),
1279 col_type: ColTypeOption {
1280 id: ColType::Smallint,
1281 value: None,
1282 },
1283 },
1284 ],
1285 };
1286
1287 {
1288 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1289 let metadata = RowsMetadata::from_cursor(&mut cursor, Version::V4).unwrap();
1290 assert_eq!(metadata, expected);
1291 }
1292
1293 {
1294 let mut buffer = Vec::new();
1295 let mut cursor = Cursor::new(&mut buffer);
1296 expected.serialize(&mut cursor, Version::V4);
1297 assert_eq!(buffer, bytes);
1298 }
1299 }
1300}
1301
1302#[cfg(test)]
1303mod rows {
1305 use super::*;
1306
1307 #[test]
1308 fn test_rows() {
1309 let bytes = &[
1310 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 102, 111, 111, 0, 9, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 98, 97, 114, 0, 19, 0, 0, 0, 0, ];
1327
1328 let expected = ResResultBody::Rows(BodyResResultRows {
1329 metadata: RowsMetadata {
1330 flags: RowsMetadataFlags::empty(),
1331 columns_count: 2,
1332 paging_state: None,
1333 new_metadata_id: None,
1334 global_table_spec: None,
1335 col_specs: vec![
1336 ColSpec {
1337 table_spec: Some(TableSpec {
1338 ks_name: "ksname1".into(),
1339 table_name: "tablename".into(),
1340 }),
1341 name: "foo".into(),
1342 col_type: ColTypeOption {
1343 id: ColType::Int,
1344 value: None,
1345 },
1346 },
1347 ColSpec {
1348 table_spec: Some(TableSpec {
1349 ks_name: "ksname1".into(),
1350 table_name: "tablename".into(),
1351 }),
1352 name: "bar".into(),
1353 col_type: ColTypeOption {
1354 id: ColType::Smallint,
1355 value: None,
1356 },
1357 },
1358 ],
1359 },
1360 rows_count: 0,
1361 rows_content: vec![],
1362 protocol_version: Version::V4,
1363 });
1364
1365 test_encode_decode(bytes, expected);
1366 }
1367
1368 #[test]
1369 fn test_rows_no_metadata() {
1370 let bytes = &[
1371 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0, 3, 0, 0, 0, 0, ];
1376
1377 let expected = ResResultBody::Rows(BodyResResultRows {
1378 metadata: RowsMetadata {
1379 flags: RowsMetadataFlags::NO_METADATA,
1380 columns_count: 3,
1381 paging_state: None,
1382 new_metadata_id: None,
1383 global_table_spec: None,
1384 col_specs: vec![],
1385 },
1386 rows_count: 0,
1387 rows_content: vec![],
1388 protocol_version: Version::V4,
1389 });
1390
1391 test_encode_decode(bytes, expected);
1392 }
1393}
1394
1395#[cfg(test)]
1396mod keyspace {
1397 use super::*;
1398
1399 #[test]
1400 fn test_set_keyspace() {
1401 let bytes = &[
1402 0, 0, 0, 3, 0, 4, 98, 108, 97, 104, ];
1405
1406 let expected = ResResultBody::SetKeyspace(BodyResResultSetKeyspace {
1407 body: "blah".into(),
1408 });
1409
1410 test_encode_decode(bytes, expected);
1411 }
1412}
1413
1414#[cfg(test)]
1415mod prepared_metadata {
1417 use super::*;
1418
1419 #[test]
1420 fn prepared_metadata() {
1421 let bytes = &[
1422 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 102, 111, 111, 0, 9, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 98, 97, 114, 0, 19, ];
1440
1441 let expected = PreparedMetadata {
1442 pk_indexes: vec![0],
1443 global_table_spec: None,
1444 col_specs: vec![
1445 ColSpec {
1446 table_spec: Some(TableSpec {
1447 ks_name: "ksname1".into(),
1448 table_name: "tablename".into(),
1449 }),
1450 name: "foo".into(),
1451 col_type: ColTypeOption {
1452 id: ColType::Int,
1453 value: None,
1454 },
1455 },
1456 ColSpec {
1457 table_spec: Some(TableSpec {
1458 ks_name: "ksname1".into(),
1459 table_name: "tablename".into(),
1460 }),
1461 name: "bar".into(),
1462 col_type: ColTypeOption {
1463 id: ColType::Smallint,
1464 value: None,
1465 },
1466 },
1467 ],
1468 };
1469
1470 {
1471 let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
1472 let metadata = PreparedMetadata::from_cursor(&mut cursor, Version::V4).unwrap();
1473 assert_eq!(metadata, expected);
1474 }
1475
1476 {
1477 let mut buffer = Vec::new();
1478 let mut cursor = Cursor::new(&mut buffer);
1479 expected.serialize(&mut cursor, Version::V4);
1480 assert_eq!(buffer, bytes);
1481 }
1482 }
1483}
1484
1485#[cfg(test)]
1486mod prepared {
1488 use super::*;
1489 use crate::types::{to_short, CBytesShort};
1490
1491 #[test]
1492 fn test_prepared() {
1493 let bytes = &[
1494 0, 0, 0, 4, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 102, 111, 111, 0, 9, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 98, 97, 114, 0, 19, 0, 0, 0, 0, 0, 0, 0, 2, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 102, 111, 111, 0, 9, 0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, 0, 3, 98, 97, 114, 0, 19, ];
1528
1529 let expected = ResResultBody::Prepared(BodyResResultPrepared {
1530 id: CBytesShort::new(to_short(1)),
1531 result_metadata_id: None,
1532 metadata: PreparedMetadata {
1533 pk_indexes: vec![0],
1534 global_table_spec: None,
1535 col_specs: vec![
1536 ColSpec {
1537 table_spec: Some(TableSpec {
1538 ks_name: "ksname1".into(),
1539 table_name: "tablename".into(),
1540 }),
1541 name: "foo".into(),
1542 col_type: ColTypeOption {
1543 id: ColType::Int,
1544 value: None,
1545 },
1546 },
1547 ColSpec {
1548 table_spec: Some(TableSpec {
1549 ks_name: "ksname1".into(),
1550 table_name: "tablename".into(),
1551 }),
1552 name: "bar".into(),
1553 col_type: ColTypeOption {
1554 id: ColType::Smallint,
1555 value: None,
1556 },
1557 },
1558 ],
1559 },
1560 result_metadata: RowsMetadata {
1561 flags: RowsMetadataFlags::empty(),
1562 columns_count: 2,
1563 paging_state: None,
1564 new_metadata_id: None,
1565 global_table_spec: None,
1566 col_specs: vec![
1567 ColSpec {
1568 table_spec: Some(TableSpec {
1569 ks_name: "ksname1".into(),
1570 table_name: "tablename".into(),
1571 }),
1572 name: "foo".into(),
1573 col_type: ColTypeOption {
1574 id: ColType::Int,
1575 value: None,
1576 },
1577 },
1578 ColSpec {
1579 table_spec: Some(TableSpec {
1580 table_name: "tablename".into(),
1581 ks_name: "ksname1".into(),
1582 }),
1583 name: "bar".into(),
1584 col_type: ColTypeOption {
1585 id: ColType::Smallint,
1586 value: None,
1587 },
1588 },
1589 ],
1590 },
1591 });
1592
1593 test_encode_decode(bytes, expected);
1594 }
1595}
1596
1597#[cfg(test)]
1598mod schema_change {
1599 use super::*;
1600 use crate::frame::events::{SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType};
1601
1602 #[test]
1603 fn test_schema_change() {
1604 let bytes = &[
1605 0, 0, 0, 5, 0, 7, 67, 82, 69, 65, 84, 69, 68, 0, 8, 75, 69, 89, 83, 80, 65, 67, 69, 0, 4, 98, 108, 97, 104, ];
1610
1611 let expected = ResResultBody::SchemaChange(SchemaChange {
1612 change_type: SchemaChangeType::Created,
1613 target: SchemaChangeTarget::Keyspace,
1614 options: SchemaChangeOptions::Keyspace("blah".into()),
1615 });
1616
1617 test_encode_decode(bytes, expected);
1618 }
1619}