use crate::error;
use crate::error::Error;
use crate::frame::events::SchemaChange;
use crate::frame::{FromBytes, FromCursor, Serialize, Version};
use crate::types::rows::Row;
use crate::types::{
from_cursor_str, serialize_str, try_i16_from_bytes, try_i32_from_bytes, try_u64_from_bytes,
CBytes, CBytesShort, CInt, CIntShort, INT_LEN, SHORT_LEN,
};
use bitflags::bitflags;
use derive_more::{Constructor, Display};
use std::convert::{TryFrom, TryInto};
use std::io::{Cursor, Error as IoError, Read};
/// Discriminant of a result body, encoded on the wire as a `CInt` code.
///
/// The code for each variant is defined by the `From<ResultKind> for CInt`
/// and `TryFrom<CInt> for ResultKind` conversions in this file.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Hash, Display)]
#[non_exhaustive]
pub enum ResultKind {
/// Result without any payload (code `0x0001`).
Void,
/// Result carrying rows (code `0x0002`).
Rows,
/// Result of a set-keyspace operation (code `0x0003`).
SetKeyspace,
/// Result describing a prepared statement (code `0x0004`).
Prepared,
/// Result describing a schema change (code `0x0005`).
SchemaChange,
}
impl Serialize for ResultKind {
#[inline]
fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
CInt::from(*self).serialize(cursor, version);
}
}
impl FromBytes for ResultKind {
    /// Decodes an `i32` from `bytes` and maps it to a result kind.
    ///
    /// Fails when the bytes cannot be decoded or the code is not a known kind.
    fn from_bytes(bytes: &[u8]) -> error::Result<ResultKind> {
        let code = try_i32_from_bytes(bytes)?;
        ResultKind::try_from(code)
    }
}
impl From<ResultKind> for CInt {
fn from(value: ResultKind) -> Self {
match value {
ResultKind::Void => 0x0001,
ResultKind::Rows => 0x0002,
ResultKind::SetKeyspace => 0x0003,
ResultKind::Prepared => 0x0004,
ResultKind::SchemaChange => 0x0005,
}
}
}
impl TryFrom<CInt> for ResultKind {
type Error = Error;
fn try_from(value: CInt) -> Result<Self, Self::Error> {
match value {
0x0001 => Ok(ResultKind::Void),
0x0002 => Ok(ResultKind::Rows),
0x0003 => Ok(ResultKind::SetKeyspace),
0x0004 => Ok(ResultKind::Prepared),
0x0005 => Ok(ResultKind::SchemaChange),
_ => Err(Error::UnexpectedResultKind(value)),
}
}
}
impl FromCursor for ResultKind {
    /// Reads `INT_LEN` big-endian bytes from the cursor and converts them to a kind.
    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<ResultKind> {
        let mut raw = [0u8; INT_LEN];
        cursor.read_exact(&mut raw)?;
        CInt::from_be_bytes(raw).try_into()
    }
}
/// Body of a result response.
///
/// On the wire it is a [`ResultKind`] code followed by the kind-specific payload
/// (see the `Serialize` impl and `ResResultBody::from_cursor`).
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
#[non_exhaustive]
pub enum ResResultBody {
/// Result without payload.
Void,
/// Rows payload: metadata, row count and row content.
Rows(BodyResResultRows),
/// Set-keyspace payload.
SetKeyspace(BodyResResultSetKeyspace),
/// Prepared-statement payload.
Prepared(BodyResResultPrepared),
/// Schema-change payload.
SchemaChange(SchemaChange),
}
impl Serialize for ResResultBody {
    /// Writes the result kind code first, then the payload of the variant (if any).
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let kind = match self {
            Self::Void => ResultKind::Void,
            Self::Rows(_) => ResultKind::Rows,
            Self::SetKeyspace(_) => ResultKind::SetKeyspace,
            Self::Prepared(_) => ResultKind::Prepared,
            Self::SchemaChange(_) => ResultKind::SchemaChange,
        };
        kind.serialize(cursor, version);

        match self {
            Self::Void => {}
            Self::Rows(payload) => payload.serialize(cursor, version),
            Self::SetKeyspace(payload) => payload.serialize(cursor, version),
            Self::Prepared(payload) => payload.serialize(cursor, version),
            Self::SchemaChange(payload) => payload.serialize(cursor, version),
        }
    }
}
impl ResResultBody {
    /// Parses the payload that follows an already-decoded `result_kind`.
    fn parse_body_from_cursor(
        cursor: &mut Cursor<&[u8]>,
        result_kind: ResultKind,
        version: Version,
    ) -> error::Result<ResResultBody> {
        let body = match result_kind {
            ResultKind::Void => ResResultBody::Void,
            ResultKind::Rows => {
                let rows = BodyResResultRows::from_cursor(cursor, version)?;
                ResResultBody::Rows(rows)
            }
            ResultKind::SetKeyspace => {
                let set_keyspace = BodyResResultSetKeyspace::from_cursor(cursor, version)?;
                ResResultBody::SetKeyspace(set_keyspace)
            }
            ResultKind::Prepared => {
                let prepared = BodyResResultPrepared::from_cursor(cursor, version)?;
                ResResultBody::Prepared(prepared)
            }
            ResultKind::SchemaChange => {
                let schema_change = SchemaChange::from_cursor(cursor, version)?;
                ResResultBody::SchemaChange(schema_change)
            }
        };
        Ok(body)
    }

    /// Consumes the body and converts a `Rows` payload into rows; `None` for other variants.
    pub fn into_rows(self) -> Option<Vec<Row>> {
        if let ResResultBody::Rows(rows_body) = self {
            Some(Row::from_body(rows_body))
        } else {
            None
        }
    }

    /// Borrows the metadata of a `Rows` payload; `None` for other variants.
    pub fn as_rows_metadata(&self) -> Option<&RowsMetadata> {
        if let ResResultBody::Rows(rows_body) = self {
            Some(&rows_body.metadata)
        } else {
            None
        }
    }

    /// Consumes the body and returns the `Prepared` payload; `None` for other variants.
    pub fn into_prepared(self) -> Option<BodyResResultPrepared> {
        if let ResResultBody::Prepared(prepared) = self {
            Some(prepared)
        } else {
            None
        }
    }

    /// Consumes the body and returns the `SetKeyspace` payload; `None` for other variants.
    pub fn into_set_keyspace(self) -> Option<BodyResResultSetKeyspace> {
        if let ResResultBody::SetKeyspace(set_keyspace) = self {
            Some(set_keyspace)
        } else {
            None
        }
    }
}
impl ResResultBody {
    /// Decodes a complete result body: the kind code, then the matching payload.
    pub fn from_cursor(
        cursor: &mut Cursor<&[u8]>,
        version: Version,
    ) -> error::Result<ResResultBody> {
        ResultKind::from_cursor(cursor, version)
            .and_then(|kind| ResResultBody::parse_body_from_cursor(cursor, kind, version))
    }
}
/// Payload of a set-keyspace result: a single string.
#[derive(Debug, Constructor, PartialEq, Ord, PartialOrd, Eq, Clone, Hash)]
pub struct BodyResResultSetKeyspace {
/// The string carried by the payload (presumably the keyspace name - confirm with callers).
pub body: String,
}
impl Serialize for BodyResResultSetKeyspace {
    /// Writes the payload string with `serialize_str`.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let Self { body } = self;
        serialize_str(cursor, body, version);
    }
}
impl FromCursor for BodyResResultSetKeyspace {
    /// Reads one string from the cursor and wraps it into the payload.
    fn from_cursor(
        cursor: &mut Cursor<&[u8]>,
        _version: Version,
    ) -> error::Result<BodyResResultSetKeyspace> {
        let body = from_cursor_str(cursor)?.to_string();
        Ok(BodyResResultSetKeyspace::new(body))
    }
}
/// Payload of a rows result.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct BodyResResultRows {
/// Metadata decoded ahead of the rows.
pub metadata: RowsMetadata,
/// Row count exactly as read from (and written to) the wire.
pub rows_count: CInt,
/// Row content: one inner vector per row, holding one `CBytes` cell per column.
pub rows_content: Vec<Vec<CBytes>>,
/// Protocol version that was passed to `from_cursor` when decoding.
pub protocol_version: Version,
}
impl Serialize for BodyResResultRows {
    /// Writes metadata, the row count and then every cell in row-major order.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        self.metadata.serialize(cursor, version);
        self.rows_count.serialize(cursor, version);
        for row in &self.rows_content {
            for cell in row {
                cell.serialize(cursor, version);
            }
        }
    }
}
impl BodyResResultRows {
    /// Reads `rows_count` rows of `columns_count` cells each from the cursor.
    ///
    /// Stops at the first cell that fails to decode and returns that error.
    /// Non-positive counts produce no iterations.
    fn rows_content(
        cursor: &mut Cursor<&[u8]>,
        rows_count: i32,
        columns_count: i32,
        version: Version,
    ) -> error::Result<Vec<Vec<CBytes>>> {
        let mut rows = Vec::new();
        for _ in 0..rows_count {
            let mut row = Vec::new();
            for _ in 0..columns_count {
                row.push(CBytes::from_cursor(cursor, version)?);
            }
            rows.push(row);
        }
        Ok(rows)
    }
}
impl FromCursor for BodyResResultRows {
    /// Decodes metadata, the row count and the row content, in that order.
    fn from_cursor(
        cursor: &mut Cursor<&[u8]>,
        version: Version,
    ) -> error::Result<BodyResResultRows> {
        let metadata = RowsMetadata::from_cursor(cursor, version)?;
        let rows_count = CInt::from_cursor(cursor, version)?;
        // The number of cells per row comes from the metadata just decoded.
        let columns_count = metadata.columns_count;

        BodyResResultRows::rows_content(cursor, rows_count, columns_count, version).map(
            |rows_content| BodyResResultRows {
                metadata,
                rows_count,
                rows_content,
                protocol_version: version,
            },
        )
    }
}
/// Metadata describing the columns of a rows result.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RowsMetadata {
/// Flags that decide which of the optional parts below are present.
pub flags: RowsMetadataFlags,
/// Column count as read from the wire.
pub columns_count: i32,
/// Paging state; decoded only when `HAS_MORE_PAGES` is set.
pub paging_state: Option<CBytes>,
/// New metadata id; decoded only when `METADATA_CHANGED` is set and `NO_METADATA` is not.
pub new_metadata_id: Option<CBytesShort>,
/// Table spec shared by all columns; decoded only when `GLOBAL_TABLE_SPACE` is set.
pub global_table_spec: Option<TableSpec>,
/// Per-column specs; left empty by the decoder when `NO_METADATA` is set.
pub col_specs: Vec<ColSpec>,
}
impl Serialize for RowsMetadata {
    /// Writes flags, column count and then each optional part that is present.
    ///
    /// # Panics
    ///
    /// Panics when the fields disagree with the flags:
    /// * `HAS_MORE_PAGES` must be set exactly when `paging_state` is `Some`;
    /// * with `NO_METADATA`, there must be no global table spec and no column specs;
    /// * without `NO_METADATA`, column specs must be non-empty, and the global table
    ///   spec must be absent unless `GLOBAL_TABLE_SPACE` is set.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let has_more_pages = self.flags.contains(RowsMetadataFlags::HAS_MORE_PAGES);
        assert_eq!(has_more_pages, self.paging_state.is_some());

        let no_metadata = self.flags.contains(RowsMetadataFlags::NO_METADATA);
        let has_global_spec = self.flags.contains(RowsMetadataFlags::GLOBAL_TABLE_SPACE);
        if no_metadata {
            assert!(self.global_table_spec.is_none());
            assert!(self.col_specs.is_empty());
        } else {
            if !has_global_spec {
                assert!(self.global_table_spec.is_none());
            }
            assert!(!self.col_specs.is_empty());
        }

        self.flags.serialize(cursor, version);
        self.columns_count.serialize(cursor, version);
        if let Some(ref state) = self.paging_state {
            state.serialize(cursor, version);
        }
        if let Some(ref metadata_id) = self.new_metadata_id {
            metadata_id.serialize(cursor, version);
        }
        if let Some(ref table_spec) = self.global_table_spec {
            table_spec.serialize(cursor, version);
        }
        for col_spec in &self.col_specs {
            col_spec.serialize(cursor, version);
        }
    }
}
impl FromCursor for RowsMetadata {
    /// Decodes rows metadata.
    ///
    /// Order on the wire: flags, column count, optional paging state, and, unless
    /// `NO_METADATA` is set, optional new metadata id, optional global table spec
    /// and the column specs. Unknown flag bits are dropped.
    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<RowsMetadata> {
        let raw_flags = CInt::from_cursor(cursor, version)?;
        let flags = RowsMetadataFlags::from_bits_truncate(raw_flags);
        let columns_count = CInt::from_cursor(cursor, version)?;

        let mut paging_state = None;
        if flags.contains(RowsMetadataFlags::HAS_MORE_PAGES) {
            paging_state = Some(CBytes::from_cursor(cursor, version)?);
        }

        let (new_metadata_id, global_table_spec, col_specs) =
            if flags.contains(RowsMetadataFlags::NO_METADATA) {
                // Nothing else follows: the metadata consists of the header only.
                (None, None, Vec::new())
            } else {
                let mut metadata_id = None;
                if flags.contains(RowsMetadataFlags::METADATA_CHANGED) {
                    metadata_id = Some(CBytesShort::from_cursor(cursor, version)?);
                }
                let global = flags.contains(RowsMetadataFlags::GLOBAL_TABLE_SPACE);
                let table_spec = extract_global_table_space(cursor, global, version)?;
                let specs = ColSpec::parse_colspecs(cursor, columns_count, global, version)?;
                (metadata_id, table_spec, specs)
            };

        Ok(RowsMetadata {
            flags,
            columns_count,
            paging_state,
            new_metadata_id,
            global_table_spec,
            col_specs,
        })
    }
}
bitflags! {
/// Flag bits stored in the leading `i32` of rows metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct RowsMetadataFlags: i32 {
/// A single table spec precedes the column specs, which then omit their own.
const GLOBAL_TABLE_SPACE = 0x0001;
/// A paging state follows the column count.
const HAS_MORE_PAGES = 0x0002;
/// Nothing follows the header (and optional paging state); column specs are omitted.
const NO_METADATA = 0x0004;
/// A new metadata id is present (only read when `NO_METADATA` is not set).
const METADATA_CHANGED = 0x0008;
}
}
impl Serialize for RowsMetadataFlags {
    /// Writes the raw `i32` bit pattern of the flags.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let raw: i32 = self.bits();
        raw.serialize(cursor, version);
    }
}
impl From<RowsMetadataFlags> for i32 {
fn from(value: RowsMetadataFlags) -> Self {
value.bits()
}
}
impl FromBytes for RowsMetadataFlags {
fn from_bytes(bytes: &[u8]) -> error::Result<RowsMetadataFlags> {
try_u64_from_bytes(bytes).map_err(Into::into).and_then(|f| {
RowsMetadataFlags::from_bits(f as i32)
.ok_or_else(|| "Unexpected rows metadata flag".into())
})
}
}
/// Pair of strings identifying a table; encoded as `ks_name` followed by `table_name`.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct TableSpec {
/// First string of the pair (keyspace name).
pub ks_name: String,
/// Second string of the pair (table name).
pub table_name: String,
}
impl Serialize for TableSpec {
#[inline]
fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
serialize_str(cursor, &self.ks_name, version);
serialize_str(cursor, &self.table_name, version);
}
}
impl FromCursor for TableSpec {
    /// Reads two consecutive strings: keyspace name, then table name.
    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<Self> {
        let keyspace = from_cursor_str(cursor)?.to_string();
        let table = from_cursor_str(cursor)?.to_string();
        Ok(TableSpec {
            ks_name: keyspace,
            table_name: table,
        })
    }
}
/// Specification of a single column.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct ColSpec {
/// Table the column belongs to; `None` when decoded with a global table spec in effect.
pub table_spec: Option<TableSpec>,
/// Column name.
pub name: String,
/// Column type, including nested type information where applicable.
pub col_type: ColTypeOption,
}
impl Serialize for ColSpec {
#[inline]
fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
if let Some(table_spec) = &self.table_spec {
table_spec.serialize(cursor, version);
}
serialize_str(cursor, &self.name, version);
self.col_type.serialize(cursor, version);
}
}
impl ColSpec {
    /// Reads `column_count` column specs from the cursor.
    ///
    /// When `has_global_table_space` is `false`, every column is preceded by its own
    /// table spec; otherwise the per-column table spec is absent and left as `None`.
    /// Returns the first decoding error encountered.
    pub fn parse_colspecs(
        cursor: &mut Cursor<&[u8]>,
        column_count: i32,
        has_global_table_space: bool,
        version: Version,
    ) -> error::Result<Vec<ColSpec>> {
        let mut specs = Vec::new();
        for _ in 0..column_count {
            let table_spec = if has_global_table_space {
                None
            } else {
                Some(TableSpec::from_cursor(cursor, version)?)
            };
            let name = from_cursor_str(cursor)?.to_string();
            let col_type = ColTypeOption::from_cursor(cursor, version)?;
            specs.push(ColSpec {
                table_spec,
                name,
                col_type,
            });
        }
        Ok(specs)
    }
}
/// Column type identifier, encoded on the wire as a `CIntShort` code.
///
/// The code of each variant is defined by `TryFrom<CIntShort> for ColType` and
/// the `Serialize` impl in this file.
#[derive(Debug, Clone, Display, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum ColType {
// Followed by a string when decoded as a `ColTypeOption`.
Custom,
// Types without extra type information.
Ascii,
Bigint,
Blob,
Boolean,
Counter,
Decimal,
Double,
Float,
Int,
Timestamp,
Uuid,
Varchar,
Varint,
Timeuuid,
Inet,
Date,
Time,
Smallint,
Tinyint,
Duration,
// Types followed by nested type information when decoded as a `ColTypeOption`.
List,
Map,
Set,
Udt,
Tuple,
}
impl TryFrom<CIntShort> for ColType {
type Error = Error;
fn try_from(value: CIntShort) -> Result<Self, Self::Error> {
match value {
0x0000 => Ok(ColType::Custom),
0x0001 => Ok(ColType::Ascii),
0x0002 => Ok(ColType::Bigint),
0x0003 => Ok(ColType::Blob),
0x0004 => Ok(ColType::Boolean),
0x0005 => Ok(ColType::Counter),
0x0006 => Ok(ColType::Decimal),
0x0007 => Ok(ColType::Double),
0x0008 => Ok(ColType::Float),
0x0009 => Ok(ColType::Int),
0x000B => Ok(ColType::Timestamp),
0x000C => Ok(ColType::Uuid),
0x000D => Ok(ColType::Varchar),
0x000E => Ok(ColType::Varint),
0x000F => Ok(ColType::Timeuuid),
0x0010 => Ok(ColType::Inet),
0x0011 => Ok(ColType::Date),
0x0012 => Ok(ColType::Time),
0x0013 => Ok(ColType::Smallint),
0x0014 => Ok(ColType::Tinyint),
0x0015 => Ok(ColType::Duration),
0x0020 => Ok(ColType::List),
0x0021 => Ok(ColType::Map),
0x0022 => Ok(ColType::Set),
0x0030 => Ok(ColType::Udt),
0x0031 => Ok(ColType::Tuple),
0x0080 => Ok(ColType::Varchar),
_ => Err(Error::UnexpectedColumnType(value)),
}
}
}
impl FromBytes for ColType {
    /// Decodes an `i16` from `bytes` and maps it to a column type.
    fn from_bytes(bytes: &[u8]) -> error::Result<ColType> {
        let code = try_i16_from_bytes(bytes)?;
        ColType::try_from(code)
    }
}
impl Serialize for ColType {
    /// Writes the column type as its `CIntShort` wire code.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let code: CIntShort = match *self {
            ColType::Custom => 0x0000,
            ColType::Ascii => 0x0001,
            ColType::Bigint => 0x0002,
            ColType::Blob => 0x0003,
            ColType::Boolean => 0x0004,
            ColType::Counter => 0x0005,
            ColType::Decimal => 0x0006,
            ColType::Double => 0x0007,
            ColType::Float => 0x0008,
            ColType::Int => 0x0009,
            ColType::Timestamp => 0x000B,
            ColType::Uuid => 0x000C,
            ColType::Varchar => 0x000D,
            ColType::Varint => 0x000E,
            ColType::Timeuuid => 0x000F,
            ColType::Inet => 0x0010,
            ColType::Date => 0x0011,
            ColType::Time => 0x0012,
            ColType::Smallint => 0x0013,
            ColType::Tinyint => 0x0014,
            ColType::Duration => 0x0015,
            ColType::List => 0x0020,
            ColType::Map => 0x0021,
            ColType::Set => 0x0022,
            ColType::Udt => 0x0030,
            ColType::Tuple => 0x0031,
        };
        code.serialize(cursor, version);
    }
}
impl FromCursor for ColType {
    /// Reads `SHORT_LEN` big-endian bytes from the cursor and converts them to a column type.
    fn from_cursor(cursor: &mut Cursor<&[u8]>, _version: Version) -> error::Result<ColType> {
        let mut raw = [0u8; SHORT_LEN];
        cursor.read_exact(&mut raw)?;
        CIntShort::from_be_bytes(raw).try_into()
    }
}
/// Column type together with its optional extra type information.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct ColTypeOption {
/// Type identifier.
pub id: ColType,
/// Extra information; the decoder fills it for `Custom`, `Set`, `List`, `Udt`,
/// `Tuple` and `Map`, and leaves it `None` for every other type.
pub value: Option<ColTypeOptionValue>,
}
impl Serialize for ColTypeOption {
#[inline]
fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
self.id.serialize(cursor, version);
if let Some(value) = &self.value {
value.serialize(cursor, version);
}
}
}
impl FromCursor for ColTypeOption {
    /// Decodes a type id and, depending on the id, the nested type information.
    ///
    /// * `Custom` is followed by a string;
    /// * `Set` and `List` are followed by one nested type;
    /// * `Map` is followed by two nested types (first, then second);
    /// * `Udt` and `Tuple` are followed by their own descriptions;
    /// * every other id has no extra information.
    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<ColTypeOption> {
        let id = ColType::from_cursor(cursor, version)?;
        let value = match id {
            ColType::Custom => {
                let text = from_cursor_str(cursor)?.to_string();
                Some(ColTypeOptionValue::CString(text))
            }
            ColType::Set => {
                let element = Box::new(ColTypeOption::from_cursor(cursor, version)?);
                Some(ColTypeOptionValue::CSet(element))
            }
            ColType::List => {
                let element = Box::new(ColTypeOption::from_cursor(cursor, version)?);
                Some(ColTypeOptionValue::CList(element))
            }
            ColType::Udt => {
                let udt = CUdt::from_cursor(cursor, version)?;
                Some(ColTypeOptionValue::UdtType(udt))
            }
            ColType::Tuple => {
                let tuple = CTuple::from_cursor(cursor, version)?;
                Some(ColTypeOptionValue::TupleType(tuple))
            }
            ColType::Map => {
                let first = Box::new(ColTypeOption::from_cursor(cursor, version)?);
                let second = Box::new(ColTypeOption::from_cursor(cursor, version)?);
                Some(ColTypeOptionValue::CMap(first, second))
            }
            _ => None,
        };
        Ok(ColTypeOption { id, value })
    }
}
/// Extra type information attached to a [`ColTypeOption`].
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
#[non_exhaustive]
pub enum ColTypeOptionValue {
/// String payload (produced by the decoder for `ColType::Custom`).
CString(String),
/// Bare column type; never produced by the decoder in this file.
ColType(ColType),
/// Element type of a set.
CSet(Box<ColTypeOption>),
/// Element type of a list.
CList(Box<ColTypeOption>),
/// Description of a user-defined type.
UdtType(CUdt),
/// Description of a tuple.
TupleType(CTuple),
/// The two nested types of a map, in wire order.
CMap(Box<ColTypeOption>, Box<ColTypeOption>),
}
impl Serialize for ColTypeOptionValue {
    /// Writes the payload of the variant; a map writes its two nested types in order.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        match self {
            Self::CString(text) => serialize_str(cursor, text, version),
            Self::ColType(col_type) => col_type.serialize(cursor, version),
            Self::CSet(element) | Self::CList(element) => element.serialize(cursor, version),
            Self::UdtType(udt) => udt.serialize(cursor, version),
            Self::TupleType(tuple) => tuple.serialize(cursor, version),
            Self::CMap(first, second) => {
                first.serialize(cursor, version);
                second.serialize(cursor, version);
            }
        }
    }
}
/// Description of a user-defined type.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct CUdt {
/// First string of the description (keyspace).
pub ks: String,
/// Second string of the description (type name).
pub udt_name: String,
/// Fields in wire order, each as a `(name, type)` pair.
pub descriptions: Vec<(String, ColTypeOption)>,
}
impl Serialize for CUdt {
    /// Writes keyspace, type name, the field count as `i16` and then each field.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        serialize_str(cursor, &self.ks, version);
        serialize_str(cursor, &self.udt_name, version);

        let field_count = self.descriptions.len() as i16;
        field_count.serialize(cursor, version);
        for (name, col_type) in &self.descriptions {
            serialize_str(cursor, name, version);
            col_type.serialize(cursor, version);
        }
    }
}
impl FromCursor for CUdt {
fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<CUdt> {
let ks = from_cursor_str(cursor)?.to_string();
let udt_name = from_cursor_str(cursor)?.to_string();
let mut buff = [0; SHORT_LEN];
cursor.read_exact(&mut buff)?;
let n = i16::from_be_bytes(buff);
let mut descriptions = Vec::with_capacity(n as usize);
for _ in 0..n {
let name = from_cursor_str(cursor)?.to_string();
let col_type = ColTypeOption::from_cursor(cursor, version)?;
descriptions.push((name, col_type));
}
Ok(CUdt {
ks,
udt_name,
descriptions,
})
}
}
/// Description of a tuple type.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct CTuple {
/// Element types in wire order.
pub types: Vec<ColTypeOption>,
}
impl Serialize for CTuple {
    /// Writes the element count as `i16` followed by every element type.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let arity = self.types.len() as i16;
        arity.serialize(cursor, version);
        for element in &self.types {
            element.serialize(cursor, version);
        }
    }
}
impl FromCursor for CTuple {
fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<CTuple> {
let mut buff = [0; SHORT_LEN];
cursor.read_exact(&mut buff)?;
let n = i16::from_be_bytes(buff);
let mut types = Vec::with_capacity(n as usize);
for _ in 0..n {
let col_type = ColTypeOption::from_cursor(cursor, version)?;
types.push(col_type);
}
Ok(CTuple { types })
}
}
/// Payload of a prepared-statement result.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct BodyResResultPrepared {
/// Identifier read first from the payload.
pub id: CBytesShort,
/// Result metadata id; the decoder reads it only for `Version::V5`.
pub result_metadata_id: Option<CBytesShort>,
/// Prepared metadata, decoded before `result_metadata`.
pub metadata: PreparedMetadata,
/// Rows metadata, decoded last.
pub result_metadata: RowsMetadata,
}
impl Serialize for BodyResResultPrepared {
#[inline]
fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
self.id.serialize(cursor, version);
if let Some(result_metadata_id) = &self.result_metadata_id {
result_metadata_id.serialize(cursor, version);
}
self.metadata.serialize(cursor, version);
self.result_metadata.serialize(cursor, version);
}
}
impl BodyResResultPrepared {
    /// Decodes a prepared-statement payload.
    ///
    /// The result metadata id is read only when `version` is `Version::V5`.
    fn from_cursor(
        cursor: &mut Cursor<&[u8]>,
        version: Version,
    ) -> error::Result<BodyResResultPrepared> {
        let id = CBytesShort::from_cursor(cursor, version)?;

        let mut result_metadata_id = None;
        if version == Version::V5 {
            result_metadata_id = Some(CBytesShort::from_cursor(cursor, version)?);
        }

        let metadata = PreparedMetadata::from_cursor(cursor, version)?;
        RowsMetadata::from_cursor(cursor, version).map(|result_metadata| BodyResResultPrepared {
            id,
            result_metadata_id,
            metadata,
            result_metadata,
        })
    }
}
bitflags! {
    /// Flag bits stored in the leading `i32` of prepared-statement metadata.
    // Same derives as `RowsMetadataFlags`, so the type can be copied, compared and debug-printed.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct PreparedMetadataFlags: i32 {
        /// A single table spec precedes the column specs, which then omit their own.
        const GLOBAL_TABLE_SPACE = 0x0001;
    }
}
impl Serialize for PreparedMetadataFlags {
    /// Writes the raw `i32` bit pattern of the flags.
    #[inline]
    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
        let raw: i32 = self.bits();
        raw.serialize(cursor, version);
    }
}
/// Metadata of a prepared statement.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct PreparedMetadata {
/// Partition-key indexes; the decoder leaves this empty for `Version::V3`.
pub pk_indexes: Vec<i16>,
/// Table spec shared by all columns; decoded only when `GLOBAL_TABLE_SPACE` is set.
pub global_table_spec: Option<TableSpec>,
/// Per-column specs.
pub col_specs: Vec<ColSpec>,
}
impl Serialize for PreparedMetadata {
#[inline]
fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
if self.global_table_spec.is_some() {
PreparedMetadataFlags::GLOBAL_TABLE_SPACE
} else {
PreparedMetadataFlags::empty()
}
.serialize(cursor, version);
let columns_count = self.col_specs.len() as i32;
columns_count.serialize(cursor, version);
let pk_count = self.pk_indexes.len() as i32;
pk_count.serialize(cursor, version);
self.pk_indexes
.iter()
.for_each(|f| f.serialize(cursor, version));
if let Some(global_table_spec) = &self.global_table_spec {
global_table_spec.serialize(cursor, version);
}
self.col_specs
.iter()
.for_each(|x| x.serialize(cursor, version));
}
}
impl PreparedMetadata {
    /// Decodes prepared-statement metadata.
    ///
    /// Order on the wire: flags, column count, partition-key count and indexes
    /// (not read for `Version::V3`), optional global table spec, column specs.
    /// Unknown flag bits are dropped.
    fn from_cursor(
        cursor: &mut Cursor<&[u8]>,
        version: Version,
    ) -> error::Result<PreparedMetadata> {
        let raw_flags = CInt::from_cursor(cursor, version)?;
        let flags = PreparedMetadataFlags::from_bits_truncate(raw_flags);
        let columns_count = CInt::from_cursor(cursor, version)?;

        let pk_count = match version {
            Version::V3 => 0,
            _ => CInt::from_cursor(cursor, version)?,
        };
        let pk_indexes = (0..pk_count)
            .map(|_| {
                let mut raw = [0u8; SHORT_LEN];
                cursor
                    .read_exact(&mut raw)
                    .map(|()| i16::from_be_bytes(raw))
            })
            .collect::<Result<Vec<i16>, IoError>>()?;

        let global = flags.contains(PreparedMetadataFlags::GLOBAL_TABLE_SPACE);
        let global_table_spec = extract_global_table_space(cursor, global, version)?;
        let col_specs = ColSpec::parse_colspecs(cursor, columns_count, global, version)?;

        Ok(PreparedMetadata {
            pk_indexes,
            global_table_spec,
            col_specs,
        })
    }
}
/// Reads a table spec from the cursor when `has_global_table_space` is `true`;
/// otherwise returns `Ok(None)` without touching the cursor.
fn extract_global_table_space(
    cursor: &mut Cursor<&[u8]>,
    has_global_table_space: bool,
    version: Version,
) -> error::Result<Option<TableSpec>> {
    if !has_global_table_space {
        return Ok(None);
    }
    TableSpec::from_cursor(cursor, version).map(Some)
}
/// Test helper: checks that `bytes` decode (as V4) to `expected` and that
/// `expected` encodes (as V4) back to exactly `bytes`.
#[cfg(test)]
fn test_encode_decode(bytes: &[u8], expected: ResResultBody) {
    // Decode direction.
    let mut reader: Cursor<&[u8]> = Cursor::new(bytes);
    let decoded = ResResultBody::from_cursor(&mut reader, Version::V4).unwrap();
    assert_eq!(expected, decoded);

    // Encode direction.
    let mut encoded = Vec::new();
    let mut writer = Cursor::new(&mut encoded);
    expected.serialize(&mut writer, Version::V4);
    assert_eq!(encoded, bytes);
}
#[cfg(test)]
mod cudt {
    use super::*;

    /// Round-trips a UDT description with two `Int` fields.
    #[test]
    fn cudt() {
        let wire = &[
            0, 3, 98, 97, 114, // "bar"
            0, 3, 102, 111, 111, // "foo"
            0, 2, // field count
            0, 3, 98, 97, 114, 0, 9, // "bar": Int
            0, 3, 102, 111, 111, 0, 9, // "foo": Int
        ];
        let int_field = |name: &str| {
            (
                String::from(name),
                ColTypeOption {
                    id: ColType::Int,
                    value: None,
                },
            )
        };
        let want = CUdt {
            ks: "bar".into(),
            udt_name: "foo".into(),
            descriptions: vec![int_field("bar"), int_field("foo")],
        };

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = CUdt::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod ctuple {
    use super::*;

    /// Round-trips a tuple description of three `Int` elements.
    #[test]
    fn ctuple() {
        let wire = &[0, 3, 0, 9, 0, 9, 0, 9];
        let int_type = ColTypeOption {
            id: ColType::Int,
            value: None,
        };
        let want = CTuple {
            types: vec![int_type; 3],
        };

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = CTuple::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod col_spec {
    use super::*;

    /// Builds the expected `Int` column named "foo" with the given table spec.
    fn foo_column(table_spec: Option<TableSpec>) -> ColSpec {
        ColSpec {
            table_spec,
            name: "foo".into(),
            col_type: ColTypeOption {
                id: ColType::Int,
                value: None,
            },
        }
    }

    /// Column spec carrying its own table spec (no global table spec in effect).
    #[test]
    fn col_spec_with_table_spec() {
        let wire = &[
            0, 3, 98, 97, 114, // "bar"
            0, 3, 102, 111, 111, // "foo"
            0, 3, 102, 111, 111, // column "foo"
            0, 9, // Int
        ];
        let want = vec![foo_column(Some(TableSpec {
            ks_name: "bar".into(),
            table_name: "foo".into(),
        }))];

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = ColSpec::parse_colspecs(&mut reader, 1, false, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want[0].serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }

    /// Column spec without a table spec (global table spec in effect).
    #[test]
    fn col_spec_without_table_spec() {
        let wire = &[
            0, 3, 102, 111, 111, // column "foo"
            0, 9, // Int
        ];
        let want = vec![foo_column(None)];

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = ColSpec::parse_colspecs(&mut reader, 1, true, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want[0].serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod col_type_option {
    use super::*;

    /// Plain `Int` type option without extra information.
    fn int_option() -> ColTypeOption {
        ColTypeOption {
            id: ColType::Int,
            value: None,
        }
    }

    /// Round-trips a type without nested information.
    #[test]
    fn col_type_options_int() {
        let wire = &[0, 9];
        let want = int_option();

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = ColTypeOption::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }

    /// Round-trips a map type with two nested `Int` types.
    #[test]
    fn col_type_options_map() {
        let wire = &[0, 33, 0, 9, 0, 9];
        let want = ColTypeOption {
            id: ColType::Map,
            value: Some(ColTypeOptionValue::CMap(
                Box::new(int_option()),
                Box::new(int_option()),
            )),
        };

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = ColTypeOption::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod table_spec {
    use super::*;

    /// Round-trips a table spec made of two short strings.
    #[test]
    fn table_spec() {
        let wire = &[
            0, 3, 98, 97, 114, // "bar"
            0, 3, 102, 111, 111, // "foo"
        ];
        let want = TableSpec {
            ks_name: "bar".into(),
            table_name: "foo".into(),
        };

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = TableSpec::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod void {
    use super::*;

    /// A void result is just the kind code `1`.
    #[test]
    fn test_void() {
        let wire = &[0, 0, 0, 1];
        test_encode_decode(wire, ResResultBody::Void);
    }
}
#[cfg(test)]
mod rows_metadata {
    use super::*;

    /// Builds an expected column of table `ksname1.tablename` without nested type info.
    fn column(name: &str, id: ColType) -> ColSpec {
        ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        }
    }

    /// Round-trips metadata with `METADATA_CHANGED` set and two columns.
    #[test]
    fn rows_metadata() {
        let wire = &[
            0, 0, 0, 8, // flags: METADATA_CHANGED
            0, 0, 0, 2, // column count
            0, 1, 1, // new metadata id
            // column "foo": Int
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 102, 111, 111, 0, 9,
            // column "bar": Smallint
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 98, 97, 114, 0, 19,
        ];
        let want = RowsMetadata {
            flags: RowsMetadataFlags::METADATA_CHANGED,
            columns_count: 2,
            paging_state: None,
            new_metadata_id: Some(CBytesShort::new(vec![1])),
            global_table_spec: None,
            col_specs: vec![
                column("foo", ColType::Int),
                column("bar", ColType::Smallint),
            ],
        };

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = RowsMetadata::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod rows {
    use super::*;

    /// Builds an expected column of table `ksname1.tablename` without nested type info.
    fn column(name: &str, id: ColType) -> ColSpec {
        ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        }
    }

    /// Rows result with full metadata for two columns and zero rows.
    #[test]
    fn test_rows() {
        let wire = &[
            0, 0, 0, 2, // kind: Rows
            0, 0, 0, 0, // flags: none
            0, 0, 0, 2, // column count
            // column "foo": Int
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 102, 111, 111, 0, 9,
            // column "bar": Smallint
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 98, 97, 114, 0, 19,
            0, 0, 0, 0, // row count
        ];
        let metadata = RowsMetadata {
            flags: RowsMetadataFlags::empty(),
            columns_count: 2,
            paging_state: None,
            new_metadata_id: None,
            global_table_spec: None,
            col_specs: vec![
                column("foo", ColType::Int),
                column("bar", ColType::Smallint),
            ],
        };
        let want = ResResultBody::Rows(BodyResResultRows {
            metadata,
            rows_count: 0,
            rows_content: vec![],
            protocol_version: Version::V4,
        });
        test_encode_decode(wire, want);
    }

    /// Rows result with `NO_METADATA` set: header only, no column specs.
    #[test]
    fn test_rows_no_metadata() {
        let wire = &[
            0, 0, 0, 2, // kind: Rows
            0, 0, 0, 4, // flags: NO_METADATA
            0, 0, 0, 3, // column count
            0, 0, 0, 0, // row count
        ];
        let metadata = RowsMetadata {
            flags: RowsMetadataFlags::NO_METADATA,
            columns_count: 3,
            paging_state: None,
            new_metadata_id: None,
            global_table_spec: None,
            col_specs: vec![],
        };
        let want = ResResultBody::Rows(BodyResResultRows {
            metadata,
            rows_count: 0,
            rows_content: vec![],
            protocol_version: Version::V4,
        });
        test_encode_decode(wire, want);
    }
}
#[cfg(test)]
mod keyspace {
    use super::*;

    /// Set-keyspace result carrying the string "blah".
    #[test]
    fn test_set_keyspace() {
        let wire = &[
            0, 0, 0, 3, // kind: SetKeyspace
            0, 4, 98, 108, 97, 104, // "blah"
        ];
        let want = ResResultBody::SetKeyspace(BodyResResultSetKeyspace {
            body: "blah".into(),
        });
        test_encode_decode(wire, want);
    }
}
#[cfg(test)]
mod prepared_metadata {
    use super::*;

    /// Builds an expected column of table `ksname1.tablename` without nested type info.
    fn column(name: &str, id: ColType) -> ColSpec {
        ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        }
    }

    /// Round-trips V4 prepared metadata with one partition-key index and two columns.
    #[test]
    fn prepared_metadata() {
        let wire = &[
            0, 0, 0, 0, // flags: none
            0, 0, 0, 2, // column count
            0, 0, 0, 1, // partition-key count
            0, 0, // partition-key index 0
            // column "foo": Int
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 102, 111, 111, 0, 9,
            // column "bar": Smallint
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 98, 97, 114, 0, 19,
        ];
        let want = PreparedMetadata {
            pk_indexes: vec![0],
            global_table_spec: None,
            col_specs: vec![
                column("foo", ColType::Int),
                column("bar", ColType::Smallint),
            ],
        };

        let mut reader: Cursor<&[u8]> = Cursor::new(wire);
        let decoded = PreparedMetadata::from_cursor(&mut reader, Version::V4).unwrap();
        assert_eq!(decoded, want);

        let mut encoded = Vec::new();
        let mut writer = Cursor::new(&mut encoded);
        want.serialize(&mut writer, Version::V4);
        assert_eq!(encoded, wire);
    }
}
#[cfg(test)]
mod prepared {
    use super::*;
    use crate::types::{to_short, CBytesShort};

    /// Builds an expected column of table `ksname1.tablename` without nested type info.
    fn column(name: &str, id: ColType) -> ColSpec {
        ColSpec {
            table_spec: Some(TableSpec {
                ks_name: "ksname1".into(),
                table_name: "tablename".into(),
            }),
            name: name.into(),
            col_type: ColTypeOption { id, value: None },
        }
    }

    /// The two columns shared by the prepared metadata and the result metadata.
    fn columns() -> Vec<ColSpec> {
        vec![
            column("foo", ColType::Int),
            column("bar", ColType::Smallint),
        ]
    }

    /// V4 prepared result: id, prepared metadata, then result metadata.
    #[test]
    fn test_prepared() {
        let wire = &[
            0, 0, 0, 4, // kind: Prepared
            0, 2, 0, 1, // id
            // --- prepared metadata ---
            0, 0, 0, 0, // flags: none
            0, 0, 0, 2, // column count
            0, 0, 0, 1, // partition-key count
            0, 0, // partition-key index 0
            // column "foo": Int
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 102, 111, 111, 0, 9,
            // column "bar": Smallint
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 98, 97, 114, 0, 19,
            // --- result metadata ---
            0, 0, 0, 0, // flags: none
            0, 0, 0, 2, // column count
            // column "foo": Int
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 102, 111, 111, 0, 9,
            // column "bar": Smallint
            0, 7, 107, 115, 110, 97, 109, 101, 49, 0, 9, 116, 97, 98, 108, 101, 110, 97, 109, 101,
            0, 3, 98, 97, 114, 0, 19,
        ];
        let metadata = PreparedMetadata {
            pk_indexes: vec![0],
            global_table_spec: None,
            col_specs: columns(),
        };
        let result_metadata = RowsMetadata {
            flags: RowsMetadataFlags::empty(),
            columns_count: 2,
            paging_state: None,
            new_metadata_id: None,
            global_table_spec: None,
            col_specs: columns(),
        };
        let want = ResResultBody::Prepared(BodyResResultPrepared {
            id: CBytesShort::new(to_short(1)),
            result_metadata_id: None,
            metadata,
            result_metadata,
        });
        test_encode_decode(wire, want);
    }
}
#[cfg(test)]
mod schema_change {
    use super::*;
    use crate::frame::events::{SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType};

    /// Schema-change result: "CREATED" / "KEYSPACE" / "blah".
    #[test]
    fn test_schema_change() {
        let wire = &[
            0, 0, 0, 5, // kind: SchemaChange
            0, 7, 67, 82, 69, 65, 84, 69, 68, // "CREATED"
            0, 8, 75, 69, 89, 83, 80, 65, 67, 69, // "KEYSPACE"
            0, 4, 98, 108, 97, 104, // "blah"
        ];
        let change = SchemaChange {
            change_type: SchemaChangeType::Created,
            target: SchemaChangeTarget::Keyspace,
            options: SchemaChangeOptions::Keyspace("blah".into()),
        };
        test_encode_decode(wire, ResResultBody::SchemaChange(change));
    }
}