cassandra_protocol/frame/
message_result.rs

1use crate::error;
2use crate::error::Error;
3use crate::frame::events::SchemaChange;
4use crate::frame::{FromBytes, FromCursor, Serialize, Version};
5use crate::types::rows::Row;
6use crate::types::{
7    from_cursor_str, serialize_str, try_i16_from_bytes, try_i32_from_bytes, try_u64_from_bytes,
8    CBytes, CBytesShort, CInt, CIntShort, INT_LEN, SHORT_LEN,
9};
10use bitflags::bitflags;
11use derive_more::{Constructor, Display};
12use std::convert::{TryFrom, TryInto};
13use std::io::{Cursor, Error as IoError, Read};
14
/// `ResultKind` enumerates the kinds of result body this module can encode and decode.
///
/// Each kind corresponds to a `CInt` code; the mapping is defined by the
/// `From<ResultKind> for CInt` and `TryFrom<CInt> for ResultKind` conversions.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Hash, Display)]
#[non_exhaustive]
pub enum ResultKind {
    /// Void result.
    Void,
    /// Rows result.
    Rows,
    /// Set keyspace result.
    SetKeyspace,
    /// Prepared result.
    Prepared,
    /// Schema change result.
    SchemaChange,
}
30
31impl Serialize for ResultKind {
32    #[inline]
33    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
34        CInt::from(*self).serialize(cursor, version);
35    }
36}
37
38impl FromBytes for ResultKind {
39    fn from_bytes(bytes: &[u8]) -> error::Result<ResultKind> {
40        try_i32_from_bytes(bytes)
41            .map_err(Into::into)
42            .and_then(ResultKind::try_from)
43    }
44}
45
46impl From<ResultKind> for CInt {
47    fn from(value: ResultKind) -> Self {
48        match value {
49            ResultKind::Void => 0x0001,
50            ResultKind::Rows => 0x0002,
51            ResultKind::SetKeyspace => 0x0003,
52            ResultKind::Prepared => 0x0004,
53            ResultKind::SchemaChange => 0x0005,
54        }
55    }
56}
57
58impl TryFrom<CInt> for ResultKind {
59    type Error = Error;
60
61    fn try_from(value: CInt) -> Result<Self, Self::Error> {
62        match value {
63            0x0001 => Ok(ResultKind::Void),
64            0x0002 => Ok(ResultKind::Rows),
65            0x0003 => Ok(ResultKind::SetKeyspace),
66            0x0004 => Ok(ResultKind::Prepared),
67            0x0005 => Ok(ResultKind::SchemaChange),
68            _ => Err(Error::UnexpectedResultKind(value)),
69        }
70    }
71}
72
73impl FromCursor for ResultKind {
74    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<ResultKind> {
75        let mut buff = [0; INT_LEN];
76        cursor.read_exact(&mut buff)?;
77
78        let rk = CInt::from_be_bytes(buff);
79        rk.try_into()
80    }
81}
82
/// `ResResultBody` is a generalized enum that represents all kinds of result bodies. Each
/// variant wraps the body type that belongs to the corresponding `ResultKind`.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
#[non_exhaustive]
pub enum ResResultBody {
    /// Void response body. It carries no data.
    Void,
    /// Rows response body. It represents a body of response which contains rows.
    Rows(BodyResResultRows),
    /// Set keyspace body. It represents the body of a set_keyspace result and contains
    /// the name of the keyspace that was set.
    SetKeyspace(BodyResResultSetKeyspace),
    /// Prepared response body.
    Prepared(BodyResResultPrepared),
    /// Schema change body.
    SchemaChange(SchemaChange),
}
100
101impl Serialize for ResResultBody {
102    #[inline]
103    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
104        match &self {
105            ResResultBody::Void => {
106                ResultKind::Void.serialize(cursor, version);
107            }
108            ResResultBody::Rows(rows) => {
109                ResultKind::Rows.serialize(cursor, version);
110                rows.serialize(cursor, version);
111            }
112            ResResultBody::SetKeyspace(set_keyspace) => {
113                ResultKind::SetKeyspace.serialize(cursor, version);
114                set_keyspace.serialize(cursor, version);
115            }
116            ResResultBody::Prepared(prepared) => {
117                ResultKind::Prepared.serialize(cursor, version);
118                prepared.serialize(cursor, version);
119            }
120            ResResultBody::SchemaChange(schema_change) => {
121                ResultKind::SchemaChange.serialize(cursor, version);
122                schema_change.serialize(cursor, version);
123            }
124        }
125    }
126}
127
128impl ResResultBody {
129    fn parse_body_from_cursor(
130        cursor: &mut Cursor<&[u8]>,
131        result_kind: ResultKind,
132        version: Version,
133    ) -> error::Result<ResResultBody> {
134        Ok(match result_kind {
135            ResultKind::Void => ResResultBody::Void,
136            ResultKind::Rows => {
137                ResResultBody::Rows(BodyResResultRows::from_cursor(cursor, version)?)
138            }
139            ResultKind::SetKeyspace => {
140                ResResultBody::SetKeyspace(BodyResResultSetKeyspace::from_cursor(cursor, version)?)
141            }
142            ResultKind::Prepared => {
143                ResResultBody::Prepared(BodyResResultPrepared::from_cursor(cursor, version)?)
144            }
145            ResultKind::SchemaChange => {
146                ResResultBody::SchemaChange(SchemaChange::from_cursor(cursor, version)?)
147            }
148        })
149    }
150
151    /// Converts body into `Vec<Row>` if body's type is `Row` and returns `None` otherwise.
152    pub fn into_rows(self) -> Option<Vec<Row>> {
153        match self {
154            ResResultBody::Rows(rows_body) => Some(Row::from_body(rows_body)),
155            _ => None,
156        }
157    }
158
159    /// Returns `Some` rows metadata if envelope result is of type rows and `None` otherwise
160    pub fn as_rows_metadata(&self) -> Option<&RowsMetadata> {
161        match self {
162            ResResultBody::Rows(rows_body) => Some(&rows_body.metadata),
163            _ => None,
164        }
165    }
166
167    /// Unwraps body and returns BodyResResultPrepared which contains an exact result of
168    /// PREPARE query.
169    pub fn into_prepared(self) -> Option<BodyResResultPrepared> {
170        match self {
171            ResResultBody::Prepared(p) => Some(p),
172            _ => None,
173        }
174    }
175
176    /// Unwraps body and returns BodyResResultSetKeyspace which contains an exact result of
177    /// use keyspace query.
178    pub fn into_set_keyspace(self) -> Option<BodyResResultSetKeyspace> {
179        match self {
180            ResResultBody::SetKeyspace(p) => Some(p),
181            _ => None,
182        }
183    }
184}
185
186impl ResResultBody {
187    pub fn from_cursor(
188        cursor: &mut Cursor<&[u8]>,
189        version: Version,
190    ) -> error::Result<ResResultBody> {
191        let result_kind = ResultKind::from_cursor(cursor, version)?;
192        ResResultBody::parse_body_from_cursor(cursor, result_kind, version)
193    }
194}
195
/// Body of a set-keyspace result. It carries the keyspace name only.
#[derive(Debug, Constructor, PartialEq, Ord, PartialOrd, Eq, Clone, Hash)]
pub struct BodyResResultSetKeyspace {
    /// Name of the keyspace that was set.
    pub body: String,
}
202
203impl Serialize for BodyResResultSetKeyspace {
204    #[inline]
205    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
206        serialize_str(cursor, &self.body, version);
207    }
208}
209
210impl FromCursor for BodyResResultSetKeyspace {
211    fn from_cursor(
212        cursor: &mut Cursor<&[u8]>,
213        _version: Version,
214    ) -> error::Result<BodyResResultSetKeyspace> {
215        from_cursor_str(cursor).map(|x| BodyResResultSetKeyspace::new(x.to_string()))
216    }
217}
218
/// Structure that represents result of type
/// [rows](https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec#L533).
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct BodyResResultRows {
    /// Rows metadata.
    pub metadata: RowsMetadata,
    /// Number of rows, as stored in the body.
    pub rows_count: CInt,
    /// From spec: it is composed of `rows_count` rows. Each inner vector is one row and
    /// holds one `CBytes` value per column.
    pub rows_content: Vec<Vec<CBytes>>,
    /// Protocol version the body was decoded with.
    pub protocol_version: Version,
}
232
233impl Serialize for BodyResResultRows {
234    #[inline]
235    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
236        self.metadata.serialize(cursor, version);
237        self.rows_count.serialize(cursor, version);
238        self.rows_content
239            .iter()
240            .flatten()
241            .for_each(|x| x.serialize(cursor, version));
242    }
243}
244
245impl BodyResResultRows {
246    fn rows_content(
247        cursor: &mut Cursor<&[u8]>,
248        rows_count: i32,
249        columns_count: i32,
250        version: Version,
251    ) -> error::Result<Vec<Vec<CBytes>>> {
252        (0..rows_count)
253            .map(|_| {
254                (0..columns_count)
255                    .map(|_| CBytes::from_cursor(cursor, version))
256                    .collect::<Result<_, _>>()
257            })
258            .collect::<Result<_, _>>()
259    }
260}
261
262impl FromCursor for BodyResResultRows {
263    fn from_cursor(
264        cursor: &mut Cursor<&[u8]>,
265        version: Version,
266    ) -> error::Result<BodyResResultRows> {
267        let metadata = RowsMetadata::from_cursor(cursor, version)?;
268        let rows_count = CInt::from_cursor(cursor, version)?;
269        let rows_content =
270            BodyResResultRows::rows_content(cursor, rows_count, metadata.columns_count, version)?;
271
272        Ok(BodyResResultRows {
273            metadata,
274            rows_count,
275            rows_content,
276            protocol_version: version,
277        })
278    }
279}
280
/// Rows metadata.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RowsMetadata {
    /// Flags describing which optional parts of the metadata are present.
    pub flags: RowsMetadataFlags,
    /// Number of columns.
    pub columns_count: i32,
    /// Paging state. Decoded only when the `HAS_MORE_PAGES` flag is set.
    pub paging_state: Option<CBytes>,
    /// New, changed result set metadata. The new metadata ID must also be used in subsequent
    /// executions of the corresponding prepared statement, if any.
    /// Decoded only when the `METADATA_CHANGED` flag is set and `NO_METADATA` is not.
    pub new_metadata_id: Option<CBytesShort>,
    /// `Option` that may contain the global table spec: the (unique) keyspace name and
    /// table name the columns belong to. Decoded only when the `GLOBAL_TABLE_SPACE` flag
    /// is set and `NO_METADATA` is not.
    pub global_table_spec: Option<TableSpec>,
    /// List of column specifications. Left empty when the `NO_METADATA` flag is set.
    pub col_specs: Vec<ColSpec>,
}
300
301impl Serialize for RowsMetadata {
302    #[inline]
303    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
304        // First we need assert that the flags match up with the data we were provided.
305        // If they dont match up then it is impossible to encode.
306        assert_eq!(
307            self.flags.contains(RowsMetadataFlags::HAS_MORE_PAGES),
308            self.paging_state.is_some()
309        );
310
311        match (
312            self.flags.contains(RowsMetadataFlags::NO_METADATA),
313            self.flags.contains(RowsMetadataFlags::GLOBAL_TABLE_SPACE),
314        ) {
315            (false, false) => {
316                assert!(self.global_table_spec.is_none());
317                assert!(!self.col_specs.is_empty());
318            }
319            (false, true) => {
320                assert!(!self.col_specs.is_empty());
321            }
322            (true, _) => {
323                assert!(self.global_table_spec.is_none());
324                assert!(self.col_specs.is_empty());
325            }
326        }
327
328        self.flags.serialize(cursor, version);
329
330        self.columns_count.serialize(cursor, version);
331
332        if let Some(paging_state) = &self.paging_state {
333            paging_state.serialize(cursor, version);
334        }
335
336        if let Some(new_metadata_id) = &self.new_metadata_id {
337            new_metadata_id.serialize(cursor, version);
338        }
339
340        if let Some(global_table_spec) = &self.global_table_spec {
341            global_table_spec.serialize(cursor, version);
342        }
343
344        self.col_specs
345            .iter()
346            .for_each(|x| x.serialize(cursor, version));
347    }
348}
349
350impl FromCursor for RowsMetadata {
351    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<RowsMetadata> {
352        let flags = RowsMetadataFlags::from_bits_truncate(CInt::from_cursor(cursor, version)?);
353        let columns_count = CInt::from_cursor(cursor, version)?;
354
355        let paging_state = if flags.contains(RowsMetadataFlags::HAS_MORE_PAGES) {
356            Some(CBytes::from_cursor(cursor, version)?)
357        } else {
358            None
359        };
360
361        if flags.contains(RowsMetadataFlags::NO_METADATA) {
362            return Ok(RowsMetadata {
363                flags,
364                columns_count,
365                paging_state,
366                new_metadata_id: None,
367                global_table_spec: None,
368                col_specs: vec![],
369            });
370        }
371
372        let new_metadata_id = if flags.contains(RowsMetadataFlags::METADATA_CHANGED) {
373            Some(CBytesShort::from_cursor(cursor, version)?)
374        } else {
375            None
376        };
377
378        let has_global_table_space = flags.contains(RowsMetadataFlags::GLOBAL_TABLE_SPACE);
379        let global_table_spec =
380            extract_global_table_space(cursor, has_global_table_space, version)?;
381
382        let col_specs =
383            ColSpec::parse_colspecs(cursor, columns_count, has_global_table_space, version)?;
384
385        Ok(RowsMetadata {
386            flags,
387            columns_count,
388            paging_state,
389            new_metadata_id,
390            global_table_spec,
391            col_specs,
392        })
393    }
394}
395
bitflags! {
    /// Flags of `RowsMetadata`, stored as an `i32` bit set.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct RowsMetadataFlags: i32 {
        /// A single global table spec is present and column specs carry no table spec of
        /// their own.
        const GLOBAL_TABLE_SPACE = 0x0001;
        /// A paging state is present.
        const HAS_MORE_PAGES = 0x0002;
        /// No metadata id, table spec or column specs are present.
        const NO_METADATA = 0x0004;
        /// A new metadata id is present.
        const METADATA_CHANGED = 0x0008;
    }
}
405
406impl Serialize for RowsMetadataFlags {
407    #[inline]
408    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
409        self.bits().serialize(cursor, version)
410    }
411}
412
413impl From<RowsMetadataFlags> for i32 {
414    fn from(value: RowsMetadataFlags) -> Self {
415        value.bits()
416    }
417}
418
419impl FromBytes for RowsMetadataFlags {
420    fn from_bytes(bytes: &[u8]) -> error::Result<RowsMetadataFlags> {
421        try_u64_from_bytes(bytes).map_err(Into::into).and_then(|f| {
422            RowsMetadataFlags::from_bits(f as i32)
423                .ok_or_else(|| "Unexpected rows metadata flag".into())
424        })
425    }
426}
427
/// Table specification: a keyspace name together with a table name.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct TableSpec {
    /// Keyspace name.
    pub ks_name: String,
    /// Table name.
    pub table_name: String,
}
434
435impl Serialize for TableSpec {
436    #[inline]
437    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
438        serialize_str(cursor, &self.ks_name, version);
439        serialize_str(cursor, &self.table_name, version);
440    }
441}
442
443impl FromCursor for TableSpec {
444    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<Self> {
445        let ks_name = from_cursor_str(cursor)?.to_string();
446        let table_name = from_cursor_str(cursor)?.to_string();
447        Ok(TableSpec {
448            ks_name,
449            table_name,
450        })
451    }
452}
453
/// Single column specification.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct ColSpec {
    /// The initial <ks_name> and <table_name> are strings and only present
    /// if the Global_tables_spec flag is NOT set; `None` when a global table spec is used.
    pub table_spec: Option<TableSpec>,
    /// Column name.
    pub name: String,
    /// Column type defined in spec in 4.2.5.2.
    pub col_type: ColTypeOption,
}
465
466impl Serialize for ColSpec {
467    #[inline]
468    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
469        if let Some(table_spec) = &self.table_spec {
470            table_spec.serialize(cursor, version);
471        }
472
473        serialize_str(cursor, &self.name, version);
474        self.col_type.serialize(cursor, version);
475    }
476}
477
478impl ColSpec {
479    pub fn parse_colspecs(
480        cursor: &mut Cursor<&[u8]>,
481        column_count: i32,
482        has_global_table_space: bool,
483        version: Version,
484    ) -> error::Result<Vec<ColSpec>> {
485        (0..column_count)
486            .map(|_| {
487                let table_spec = if !has_global_table_space {
488                    Some(TableSpec::from_cursor(cursor, version)?)
489                } else {
490                    None
491                };
492
493                let name = from_cursor_str(cursor)?.to_string();
494                let col_type = ColTypeOption::from_cursor(cursor, version)?;
495
496                Ok(ColSpec {
497                    table_spec,
498                    name,
499                    col_type,
500                })
501            })
502            .collect::<Result<_, _>>()
503    }
504}
505
/// Cassandra data types which could be returned by a server.
///
/// Each variant is identified by a `CIntShort` code; the code used for encoding is noted
/// on the variant.
#[derive(Debug, Clone, Display, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum ColType {
    /// Code `0x0000`.
    Custom,
    /// Code `0x0001`.
    Ascii,
    /// Code `0x0002`.
    Bigint,
    /// Code `0x0003`.
    Blob,
    /// Code `0x0004`.
    Boolean,
    /// Code `0x0005`.
    Counter,
    /// Code `0x0006`.
    Decimal,
    /// Code `0x0007`.
    Double,
    /// Code `0x0008`.
    Float,
    /// Code `0x0009`.
    Int,
    /// Code `0x000B`.
    Timestamp,
    /// Code `0x000C`.
    Uuid,
    /// Code `0x000D`. The code `0x0080` is also decoded as `Varchar`.
    Varchar,
    /// Code `0x000E`.
    Varint,
    /// Code `0x000F`.
    Timeuuid,
    /// Code `0x0010`.
    Inet,
    /// Code `0x0011`.
    Date,
    /// Code `0x0012`.
    Time,
    /// Code `0x0013`.
    Smallint,
    /// Code `0x0014`.
    Tinyint,
    /// Code `0x0015`.
    Duration,
    /// Code `0x0020`.
    List,
    /// Code `0x0021`.
    Map,
    /// Code `0x0022`.
    Set,
    /// Code `0x0030`.
    Udt,
    /// Code `0x0031`.
    Tuple,
}
537
538impl TryFrom<CIntShort> for ColType {
539    type Error = Error;
540
541    fn try_from(value: CIntShort) -> Result<Self, Self::Error> {
542        match value {
543            0x0000 => Ok(ColType::Custom),
544            0x0001 => Ok(ColType::Ascii),
545            0x0002 => Ok(ColType::Bigint),
546            0x0003 => Ok(ColType::Blob),
547            0x0004 => Ok(ColType::Boolean),
548            0x0005 => Ok(ColType::Counter),
549            0x0006 => Ok(ColType::Decimal),
550            0x0007 => Ok(ColType::Double),
551            0x0008 => Ok(ColType::Float),
552            0x0009 => Ok(ColType::Int),
553            0x000B => Ok(ColType::Timestamp),
554            0x000C => Ok(ColType::Uuid),
555            0x000D => Ok(ColType::Varchar),
556            0x000E => Ok(ColType::Varint),
557            0x000F => Ok(ColType::Timeuuid),
558            0x0010 => Ok(ColType::Inet),
559            0x0011 => Ok(ColType::Date),
560            0x0012 => Ok(ColType::Time),
561            0x0013 => Ok(ColType::Smallint),
562            0x0014 => Ok(ColType::Tinyint),
563            0x0015 => Ok(ColType::Duration),
564            0x0020 => Ok(ColType::List),
565            0x0021 => Ok(ColType::Map),
566            0x0022 => Ok(ColType::Set),
567            0x0030 => Ok(ColType::Udt),
568            0x0031 => Ok(ColType::Tuple),
569            0x0080 => Ok(ColType::Varchar),
570            _ => Err(Error::UnexpectedColumnType(value)),
571        }
572    }
573}
574
575impl FromBytes for ColType {
576    fn from_bytes(bytes: &[u8]) -> error::Result<ColType> {
577        try_i16_from_bytes(bytes)
578            .map_err(Into::into)
579            .and_then(ColType::try_from)
580    }
581}
582
583impl Serialize for ColType {
584    #[inline]
585    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
586        (match self {
587            ColType::Custom => 0x0000,
588            ColType::Ascii => 0x0001,
589            ColType::Bigint => 0x0002,
590            ColType::Blob => 0x0003,
591            ColType::Boolean => 0x0004,
592            ColType::Counter => 0x0005,
593            ColType::Decimal => 0x0006,
594            ColType::Double => 0x0007,
595            ColType::Float => 0x0008,
596            ColType::Int => 0x0009,
597            ColType::Timestamp => 0x000B,
598            ColType::Uuid => 0x000C,
599            ColType::Varchar => 0x000D,
600            ColType::Varint => 0x000E,
601            ColType::Timeuuid => 0x000F,
602            ColType::Inet => 0x0010,
603            ColType::Date => 0x0011,
604            ColType::Time => 0x0012,
605            ColType::Smallint => 0x0013,
606            ColType::Tinyint => 0x0014,
607            ColType::Duration => 0x0015,
608            ColType::List => 0x0020,
609            ColType::Map => 0x0021,
610            ColType::Set => 0x0022,
611            ColType::Udt => 0x0030,
612            ColType::Tuple => 0x0031,
613        } as CIntShort)
614            .serialize(cursor, version);
615    }
616}
617
618impl FromCursor for ColType {
619    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<ColType> {
620        let mut buff = [0; SHORT_LEN];
621        cursor.read_exact(&mut buff)?;
622
623        let t = CIntShort::from_be_bytes(buff);
624        t.try_into()
625    }
626}
627
/// Cassandra option that represents a column type.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct ColTypeOption {
    /// Id refers to `ColType`.
    pub id: ColType,
    /// Extra data that depends on the column type. When decoding, it is `Some` for
    /// `Custom`, `Set`, `List`, `Map`, `Udt` and `Tuple`, and `None` for every other type.
    pub value: Option<ColTypeOptionValue>,
}
636
637impl Serialize for ColTypeOption {
638    #[inline]
639    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
640        self.id.serialize(cursor, version);
641        if let Some(value) = &self.value {
642            value.serialize(cursor, version);
643        }
644    }
645}
646
647impl FromCursor for ColTypeOption {
648    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<ColTypeOption> {
649        let id = ColType::from_cursor(cursor, version)?;
650        let value = match id {
651            ColType::Custom => Some(ColTypeOptionValue::CString(
652                from_cursor_str(cursor)?.to_string(),
653            )),
654            ColType::Set => {
655                let col_type = ColTypeOption::from_cursor(cursor, version)?;
656                Some(ColTypeOptionValue::CSet(Box::new(col_type)))
657            }
658            ColType::List => {
659                let col_type = ColTypeOption::from_cursor(cursor, version)?;
660                Some(ColTypeOptionValue::CList(Box::new(col_type)))
661            }
662            ColType::Udt => Some(ColTypeOptionValue::UdtType(CUdt::from_cursor(
663                cursor, version,
664            )?)),
665            ColType::Tuple => Some(ColTypeOptionValue::TupleType(CTuple::from_cursor(
666                cursor, version,
667            )?)),
668            ColType::Map => {
669                let name_type = ColTypeOption::from_cursor(cursor, version)?;
670                let value_type = ColTypeOption::from_cursor(cursor, version)?;
671                Some(ColTypeOptionValue::CMap(
672                    Box::new(name_type),
673                    Box::new(value_type),
674                ))
675            }
676            _ => None,
677        };
678
679        Ok(ColTypeOption { id, value })
680    }
681}
682
/// Enum that represents all possible types of `value` of `ColTypeOption`.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
#[non_exhaustive]
pub enum ColTypeOptionValue {
    /// String value; produced when decoding a `Custom` column type.
    CString(String),
    /// A bare column type.
    ColType(ColType),
    /// Element type of a set.
    CSet(Box<ColTypeOption>),
    /// Element type of a list.
    CList(Box<ColTypeOption>),
    /// Description of a user defined type.
    UdtType(CUdt),
    /// Description of a tuple type.
    TupleType(CTuple),
    /// Two types of a map, in the order they are encoded: first, then second.
    CMap(Box<ColTypeOption>, Box<ColTypeOption>),
}
695
696impl Serialize for ColTypeOptionValue {
697    #[inline]
698    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
699        match self {
700            Self::CString(c) => serialize_str(cursor, c, version),
701            Self::ColType(c) => c.serialize(cursor, version),
702            Self::CSet(c) => c.serialize(cursor, version),
703            Self::CList(c) => c.serialize(cursor, version),
704            Self::UdtType(c) => c.serialize(cursor, version),
705            Self::TupleType(c) => c.serialize(cursor, version),
706            Self::CMap(v1, v2) => {
707                v1.serialize(cursor, version);
708                v2.serialize(cursor, version);
709            }
710        }
711    }
712}
713
/// User defined type.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct CUdt {
    /// Keyspace name.
    pub ks: String,
    /// Udt name.
    pub udt_name: String,
    /// List of pairs `(name, type)` where name is field name and type is type of field.
    pub descriptions: Vec<(String, ColTypeOption)>,
}
724
725impl Serialize for CUdt {
726    #[inline]
727    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
728        serialize_str(cursor, &self.ks, version);
729        serialize_str(cursor, &self.udt_name, version);
730        (self.descriptions.len() as i16).serialize(cursor, version);
731        self.descriptions.iter().for_each(|(name, col_type)| {
732            serialize_str(cursor, name, version);
733            col_type.serialize(cursor, version);
734        });
735    }
736}
737
738impl FromCursor for CUdt {
739    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<CUdt> {
740        let ks = from_cursor_str(cursor)?.to_string();
741        let udt_name = from_cursor_str(cursor)?.to_string();
742
743        let mut buff = [0; SHORT_LEN];
744        cursor.read_exact(&mut buff)?;
745
746        let n = i16::from_be_bytes(buff);
747        let mut descriptions = Vec::with_capacity(n as usize);
748        for _ in 0..n {
749            let name = from_cursor_str(cursor)?.to_string();
750            let col_type = ColTypeOption::from_cursor(cursor, version)?;
751            descriptions.push((name, col_type));
752        }
753
754        Ok(CUdt {
755            ks,
756            udt_name,
757            descriptions,
758        })
759    }
760}
761
/// Tuple type.
/// [Read more...](https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec#L608)
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct CTuple {
    /// List of the tuple's element types, in order.
    pub types: Vec<ColTypeOption>,
}
769
770impl Serialize for CTuple {
771    #[inline]
772    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
773        (self.types.len() as i16).serialize(cursor, version);
774        self.types.iter().for_each(|f| f.serialize(cursor, version));
775    }
776}
777
778impl FromCursor for CTuple {
779    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<CTuple> {
780        let mut buff = [0; SHORT_LEN];
781        cursor.read_exact(&mut buff)?;
782
783        let n = i16::from_be_bytes(buff);
784        let mut types = Vec::with_capacity(n as usize);
785        for _ in 0..n {
786            let col_type = ColTypeOption::from_cursor(cursor, version)?;
787            types.push(col_type);
788        }
789
790        Ok(CTuple { types })
791    }
792}
793
/// The structure represents a body of a response envelope of type `prepared`.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct BodyResResultPrepared {
    /// Id of prepared request.
    pub id: CBytesShort,
    /// Result metadata id (only available since V5; decoded only for `Version::V5`).
    pub result_metadata_id: Option<CBytesShort>,
    /// Metadata of the prepared statement itself.
    pub metadata: PreparedMetadata,
    /// It is defined exactly the same as <metadata> in the Rows
    /// documentation.
    pub result_metadata: RowsMetadata,
}
807
808impl Serialize for BodyResResultPrepared {
809    #[inline]
810    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
811        self.id.serialize(cursor, version);
812
813        if let Some(result_metadata_id) = &self.result_metadata_id {
814            result_metadata_id.serialize(cursor, version);
815        }
816
817        self.metadata.serialize(cursor, version);
818        self.result_metadata.serialize(cursor, version);
819    }
820}
821
822impl BodyResResultPrepared {
823    fn from_cursor(
824        cursor: &mut Cursor<&[u8]>,
825        version: Version,
826    ) -> error::Result<BodyResResultPrepared> {
827        let id = CBytesShort::from_cursor(cursor, version)?;
828
829        let result_metadata_id = if version == Version::V5 {
830            Some(CBytesShort::from_cursor(cursor, version)?)
831        } else {
832            None
833        };
834
835        let metadata = PreparedMetadata::from_cursor(cursor, version)?;
836        let result_metadata = RowsMetadata::from_cursor(cursor, version)?;
837
838        Ok(BodyResResultPrepared {
839            id,
840            result_metadata_id,
841            metadata,
842            result_metadata,
843        })
844    }
845}
846
bitflags! {
    /// Flags of `PreparedMetadata`, stored as an `i32` bit set.
    pub struct PreparedMetadataFlags: i32 {
        /// A single global table spec is present and column specs carry no table spec of
        /// their own.
        const GLOBAL_TABLE_SPACE = 0x0001;
    }
}
852
853impl Serialize for PreparedMetadataFlags {
854    #[inline]
855    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
856        self.bits().serialize(cursor, version);
857    }
858}
859
/// The structure that represents metadata of prepared response.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct PreparedMetadata {
    /// Partition key indexes. Always empty when decoded with `Version::V3`, which carries
    /// no such list.
    pub pk_indexes: Vec<i16>,
    /// Table spec shared by all columns; present when the `GLOBAL_TABLE_SPACE` flag is set.
    pub global_table_spec: Option<TableSpec>,
    /// List of column specifications.
    pub col_specs: Vec<ColSpec>,
}
867
868impl Serialize for PreparedMetadata {
869    #[inline]
870    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
871        if self.global_table_spec.is_some() {
872            PreparedMetadataFlags::GLOBAL_TABLE_SPACE
873        } else {
874            PreparedMetadataFlags::empty()
875        }
876        .serialize(cursor, version);
877
878        let columns_count = self.col_specs.len() as i32;
879        columns_count.serialize(cursor, version);
880
881        let pk_count = self.pk_indexes.len() as i32;
882        pk_count.serialize(cursor, version);
883
884        self.pk_indexes
885            .iter()
886            .for_each(|f| f.serialize(cursor, version));
887
888        if let Some(global_table_spec) = &self.global_table_spec {
889            global_table_spec.serialize(cursor, version);
890        }
891
892        self.col_specs
893            .iter()
894            .for_each(|x| x.serialize(cursor, version));
895    }
896}
897
898impl PreparedMetadata {
899    fn from_cursor(
900        cursor: &mut Cursor<&[u8]>,
901        version: Version,
902    ) -> error::Result<PreparedMetadata> {
903        let flags = PreparedMetadataFlags::from_bits_truncate(CInt::from_cursor(cursor, version)?);
904        let columns_count = CInt::from_cursor(cursor, version)?;
905
906        let pk_count = if let Version::V3 = version {
907            0
908        } else {
909            // v4 or v5
910            CInt::from_cursor(cursor, version)?
911        };
912
913        let pk_indexes = (0..pk_count)
914            .map(|_| {
915                let mut buff = [0; SHORT_LEN];
916                cursor.read_exact(&mut buff)?;
917
918                Ok(i16::from_be_bytes(buff))
919            })
920            .collect::<Result<Vec<i16>, IoError>>()?;
921
922        let has_global_table_space = flags.contains(PreparedMetadataFlags::GLOBAL_TABLE_SPACE);
923        let global_table_spec =
924            extract_global_table_space(cursor, has_global_table_space, version)?;
925        let col_specs =
926            ColSpec::parse_colspecs(cursor, columns_count, has_global_table_space, version)?;
927
928        Ok(PreparedMetadata {
929            pk_indexes,
930            global_table_spec,
931            col_specs,
932        })
933    }
934}
935
936fn extract_global_table_space(
937    cursor: &mut Cursor<&[u8]>,
938    has_global_table_space: bool,
939    version: Version,
940) -> error::Result<Option<TableSpec>> {
941    Ok(if has_global_table_space {
942        Some(TableSpec::from_cursor(cursor, version)?)
943    } else {
944        None
945    })
946}
947
//noinspection DuplicatedCode
/// Checks that `bytes` decode (as protocol v4) into `expected`, and that `expected` encodes
/// (as protocol v4) back into exactly `bytes`.
#[cfg(test)]
fn test_encode_decode(bytes: &[u8], expected: ResResultBody) {
    // Decoding direction: raw bytes -> result body.
    let decoded = ResResultBody::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
    assert_eq!(expected, decoded);

    // Encoding direction: result body -> raw bytes.
    let mut encoded = Vec::new();
    expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
    assert_eq!(encoded, bytes);
}
964
#[cfg(test)]
mod cudt {
    use super::*;

    //noinspection DuplicatedCode
    /// Round-trips a UDT description with two `Int` fields through decode and encode.
    #[test]
    fn cudt() {
        let bytes: &[u8] = &[
            0, 3, 98, 97, 114, // keyspace name: "bar"
            0, 3, 102, 111, 111, // udt name: "foo"
            0, 2, // number of fields
            // field 1
            0, 3, 98, 97, 114, // field name: "bar"
            0, 9, // field type: int
            // field 2
            0, 3, 102, 111, 111, // field name: "foo"
            0, 9, // field type: int
        ];

        let int_type = || ColTypeOption {
            id: ColType::Int,
            value: None,
        };
        let expected = CUdt {
            ks: "bar".into(),
            udt_name: "foo".into(),
            descriptions: vec![("bar".into(), int_type()), ("foo".into(), int_type())],
        };

        // bytes -> value
        let decoded = CUdt::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
        assert_eq!(decoded, expected);

        // value -> bytes
        let mut encoded = Vec::new();
        expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }
}
1019
#[cfg(test)]
//noinspection DuplicatedCode
mod ctuple {
    use super::*;

    /// Round-trips a tuple of three `Int` elements through decode and encode.
    #[test]
    fn ctuple() {
        // Element count (3) followed by three `int` type ids.
        let bytes: &[u8] = &[0, 3, 0, 9, 0, 9, 0, 9];

        let element = ColTypeOption {
            id: ColType::Int,
            value: None,
        };
        let expected = CTuple {
            types: vec![element; 3],
        };

        // bytes -> value
        let decoded = CTuple::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
        assert_eq!(decoded, expected);

        // value -> bytes
        let mut encoded = Vec::new();
        expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }
}
1052
#[cfg(test)]
//noinspection DuplicatedCode
mod col_spec {
    use super::*;

    /// Parses a single column spec from `bytes` (with the given global-table-spec setting) and
    /// compares it with `expected`, then serializes `expected[0]` and compares it with `bytes`.
    fn assert_round_trip(bytes: &[u8], with_global_table_spec: bool, expected: Vec<ColSpec>) {
        let parsed = ColSpec::parse_colspecs(
            &mut Cursor::new(bytes),
            1,
            with_global_table_spec,
            Version::V4,
        )
        .unwrap();
        assert_eq!(parsed, expected);

        let mut encoded = Vec::new();
        expected[0].serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }

    #[test]
    fn col_spec_with_table_spec() {
        let bytes: &[u8] = &[
            // per-column table spec
            0, 3, 98, 97, 114, // keyspace: "bar"
            0, 3, 102, 111, 111, // table: "foo"
            //
            0, 3, 102, 111, 111, // column name: "foo"
            //
            0, 9, // column type: int
        ];

        let table_spec = TableSpec {
            ks_name: "bar".into(),
            table_name: "foo".into(),
        };
        let expected = vec![ColSpec {
            table_spec: Some(table_spec),
            name: "foo".into(),
            col_type: ColTypeOption {
                id: ColType::Int,
                value: None,
            },
        }];

        assert_round_trip(bytes, false, expected);
    }

    #[test]
    fn col_spec_without_table_spec() {
        let bytes: &[u8] = &[
            0, 3, 102, 111, 111, // column name: "foo"
            //
            0, 9, // column type: int
        ];

        let expected = vec![ColSpec {
            table_spec: None,
            name: "foo".into(),
            col_type: ColTypeOption {
                id: ColType::Int,
                value: None,
            },
        }];

        assert_round_trip(bytes, true, expected);
    }
}
1126
#[cfg(test)]
//noinspection DuplicatedCode
mod col_type_option {
    use super::*;

    /// Decodes `bytes` and compares the result with `expected`, then encodes `expected` and
    /// compares the output with `bytes`.
    fn assert_round_trip(bytes: &[u8], expected: ColTypeOption) {
        let decoded = ColTypeOption::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
        assert_eq!(decoded, expected);

        let mut encoded = Vec::new();
        expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }

    /// Shorthand for a plain `Int` type option without a nested value.
    fn int_option() -> ColTypeOption {
        ColTypeOption {
            id: ColType::Int,
            value: None,
        }
    }

    #[test]
    fn col_type_options_int() {
        assert_round_trip(&[0, 9], int_option());
    }

    #[test]
    fn col_type_options_map() {
        // Map type id followed by the key type and the value type (both int).
        let bytes: &[u8] = &[0, 33, 0, 9, 0, 9];

        let key_type = Box::new(int_option());
        let value_type = Box::new(int_option());
        let expected = ColTypeOption {
            id: ColType::Map,
            value: Some(ColTypeOptionValue::CMap(key_type, value_type)),
        };

        assert_round_trip(bytes, expected);
    }
}
1185
#[cfg(test)]
//noinspection DuplicatedCode
mod table_spec {
    use super::*;

    /// Round-trips a keyspace/table pair through decode and encode.
    #[test]
    fn table_spec() {
        let bytes: &[u8] = &[
            0, 3, 98, 97, 114, // keyspace: "bar"
            0, 3, 102, 111, 111, // table: "foo"
        ];
        let expected = TableSpec {
            ks_name: "bar".into(),
            table_name: "foo".into(),
        };

        // bytes -> value
        let decoded = TableSpec::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
        assert_eq!(decoded, expected);

        // value -> bytes
        let mut encoded = Vec::new();
        expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }
}
1216
#[cfg(test)]
mod void {
    use super::*;

    /// A void result consists solely of its result-kind marker.
    #[test]
    fn test_void() {
        test_encode_decode(&[0, 0, 0, 1], ResResultBody::Void);
    }
}
1228
#[cfg(test)]
//noinspection DuplicatedCode
mod rows_metadata {
    use super::*;

    /// Round-trips rows metadata carrying a new metadata id and two per-column table specs.
    #[test]
    fn rows_metadata() {
        let bytes: &[u8] = &[
            0, 0, 0, 8, // rows metadata flags
            0, 0, 0, 2, // columns count
            0, 1, 1, // new metadata id
            //
            // column spec 1
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 102, 111, 111, // column name: "foo"
            0, 9, // column type id
            //
            // column spec 2
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 98, 97, 114, // column name: "bar"
            0, 19, // column type id
        ];

        // Both columns share the same table spec and differ only in name and type.
        let column = |name: &'static str, id: ColType| ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        };

        let expected = RowsMetadata {
            flags: RowsMetadataFlags::METADATA_CHANGED,
            columns_count: 2,
            paging_state: None,
            new_metadata_id: Some(CBytesShort::new(vec![1])),
            global_table_spec: None,
            col_specs: vec![column("foo", ColType::Int), column("bar", ColType::Smallint)],
        };

        // bytes -> value
        let decoded = RowsMetadata::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
        assert_eq!(decoded, expected);

        // value -> bytes
        let mut encoded = Vec::new();
        expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }
}
1301
#[cfg(test)]
//noinspection DuplicatedCode
mod rows {
    use super::*;

    /// Rows result with full per-column metadata and no rows.
    #[test]
    fn test_rows() {
        let bytes: &[u8] = &[
            0, 0, 0, 2, // result kind: rows
            0, 0, 0, 0, // rows metadata flags
            0, 0, 0, 2, // columns count
            //
            // column spec 1
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 102, 111, 111, // column name: "foo"
            0, 9, // column type id
            //
            // column spec 2
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 98, 97, 114, // column name: "bar"
            0, 19, // column type id
            0, 0, 0, 0, // rows count
        ];

        // Both columns share the same table spec and differ only in name and type.
        let column = |name: &'static str, id: ColType| ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        };

        let metadata = RowsMetadata {
            flags: RowsMetadataFlags::empty(),
            columns_count: 2,
            paging_state: None,
            new_metadata_id: None,
            global_table_spec: None,
            col_specs: vec![column("foo", ColType::Int), column("bar", ColType::Smallint)],
        };

        let expected = ResResultBody::Rows(BodyResResultRows {
            metadata,
            rows_count: 0,
            rows_content: vec![],
            protocol_version: Version::V4,
        });

        test_encode_decode(bytes, expected);
    }

    /// Rows result flagged as carrying no column metadata.
    #[test]
    fn test_rows_no_metadata() {
        let bytes: &[u8] = &[
            0, 0, 0, 2, // result kind: rows
            0, 0, 0, 4, // rows metadata flags
            0, 0, 0, 3, // columns count
            0, 0, 0, 0, // rows count
        ];

        let metadata = RowsMetadata {
            flags: RowsMetadataFlags::NO_METADATA,
            columns_count: 3,
            paging_state: None,
            new_metadata_id: None,
            global_table_spec: None,
            col_specs: vec![],
        };

        let expected = ResResultBody::Rows(BodyResResultRows {
            metadata,
            rows_count: 0,
            rows_content: vec![],
            protocol_version: Version::V4,
        });

        test_encode_decode(bytes, expected);
    }
}
1394
#[cfg(test)]
mod keyspace {
    use super::*;

    /// Set-keyspace result: the result-kind marker followed by the keyspace name.
    #[test]
    fn test_set_keyspace() {
        let bytes: &[u8] = &[
            0, 0, 0, 3, // result kind: set keyspace
            0, 4, 98, 108, 97, 104, // keyspace name: "blah"
        ];

        let body = BodyResResultSetKeyspace {
            body: "blah".into(),
        };

        test_encode_decode(bytes, ResResultBody::SetKeyspace(body));
    }
}
1413
#[cfg(test)]
//noinspection DuplicatedCode
mod prepared_metadata {
    use super::*;

    /// Round-trips prepared metadata with one partition-key index and two column specs.
    #[test]
    fn prepared_metadata() {
        let bytes: &[u8] = &[
            0, 0, 0, 0, // flags (no global table spec)
            0, 0, 0, 2, // columns count
            0, 0, 0, 1, // pk_count
            0, 0, // pk_index
            //
            // column specs
            // column spec 1
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 102, 111, 111, // column name: "foo"
            0, 9, // column type id
            //
            // column spec 2
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 98, 97, 114, // column name: "bar"
            0, 19, // column type id
        ];

        // Both columns share the same table spec and differ only in name and type.
        let column = |name: &'static str, id: ColType| ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        };

        let expected = PreparedMetadata {
            pk_indexes: vec![0],
            global_table_spec: None,
            col_specs: vec![column("foo", ColType::Int), column("bar", ColType::Smallint)],
        };

        // bytes -> value
        let decoded = PreparedMetadata::from_cursor(&mut Cursor::new(bytes), Version::V4).unwrap();
        assert_eq!(decoded, expected);

        // value -> bytes
        let mut encoded = Vec::new();
        expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
        assert_eq!(encoded, bytes);
    }
}
1484
#[cfg(test)]
//noinspection DuplicatedCode
mod prepared {
    use super::*;
    use crate::types::{to_short, CBytesShort};

    /// Prepared result: statement id, prepared metadata, then result (rows) metadata.
    #[test]
    fn test_prepared() {
        let bytes: &[u8] = &[
            0, 0, 0, 4, // result kind: prepared
            0, 2, 0, 1, // id
            //
            // prepared metadata
            0, 0, 0, 0, // flags (no global table spec)
            0, 0, 0, 2, // columns count
            0, 0, 0, 1, // pk_count
            0, 0, // pk_index
            //
            // column specs
            // column spec 1
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 102, 111, 111, // column name: "foo"
            0, 9, // column type id
            //
            // column spec 2
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 98, 97, 114, // column name: "bar"
            0, 19, // column type id
            //
            // rows metadata
            0, 0, 0, 0, // empty flags
            0, 0, 0, 2, // columns count
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 102, 111, 111, // column name: "foo"
            0, 9, // column type id
            0, 7, 107, 115, 110, 97, 109, 101, 49, // keyspace: "ksname1"
            0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101, // table: "tablename"
            0, 3, 98, 97, 114, // column name: "bar"
            0, 19, // column type id
        ];

        // Every column in both metadata sections shares the same table spec.
        let column = |name: &'static str, id: ColType| ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        };
        // The same two columns appear in the prepared and in the result metadata.
        let columns = || vec![column("foo", ColType::Int), column("bar", ColType::Smallint)];

        let metadata = PreparedMetadata {
            pk_indexes: vec![0],
            global_table_spec: None,
            col_specs: columns(),
        };
        let result_metadata = RowsMetadata {
            flags: RowsMetadataFlags::empty(),
            columns_count: 2,
            paging_state: None,
            new_metadata_id: None,
            global_table_spec: None,
            col_specs: columns(),
        };

        let expected = ResResultBody::Prepared(BodyResResultPrepared {
            id: CBytesShort::new(to_short(1)),
            result_metadata_id: None,
            metadata,
            result_metadata,
        });

        test_encode_decode(bytes, expected);
    }
}
1596
#[cfg(test)]
mod schema_change {
    use super::*;
    use crate::frame::events::{SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType};

    /// Schema-change result announcing a created keyspace named "blah".
    #[test]
    fn test_schema_change() {
        let bytes: &[u8] = &[
            0, 0, 0, 5, // result kind: schema change
            0, 7, 67, 82, 69, 65, 84, 69, 68, // change type: "CREATED"
            0, 8, 75, 69, 89, 83, 80, 65, 67, 69, // target: "KEYSPACE"
            0, 4, 98, 108, 97, 104, // options: "blah"
        ];

        let change = SchemaChange {
            change_type: SchemaChangeType::Created,
            target: SchemaChangeTarget::Keyspace,
            options: SchemaChangeOptions::Keyspace("blah".into()),
        };

        test_encode_decode(bytes, ResResultBody::SchemaChange(change));
    }
}