use super::{
consistency::Consistency,
encoder::{ColumnEncoder, BE_8_BYTES_LEN, BE_NULL_BYTES_LEN, BE_UNSET_BYTES_LEN},
opcode::{EXECUTE, QUERY},
queryflags::*,
QueryOrPrepared, Statements, Values,
};
use crate::cql::compression::{Compression, MyCompression};
/// Initial nine bytes every builder buffer is seeded with.
///
/// Byte 1 is the header flags byte (the `build` methods OR the compression flag into it) and
/// byte 4 is the opcode (`QUERY` here, overwritten with `EXECUTE` when a prepared id is encoded).
// NOTE(review): the remaining bytes presumably follow the CQL v4 frame layout (version, stream id,
// body-length placeholder) — confirm against the protocol spec.
const QUERY_HEADER: &[u8] = &[4, 0, 0, 0, QUERY, 0, 0, 0, 0];
/// Typestate builder for a query frame: `buffer` accumulates the encoded bytes while the `Stage`
/// type parameter determines which builder methods are available next.
pub struct QueryBuilder<Stage> {
/// Frame bytes written so far; the constructors seed it with `QUERY_HEADER`.
buffer: Vec<u8>,
/// Marker or bookkeeping data for the current build step.
stage: Stage,
}
/// Stage marker of the builder type that hosts the `new` / `with_capacity` constructors.
pub struct QueryHeader;
/// Stage in which the statement is supplied; as a `QueryOrPrepared` it encodes the statement text.
pub struct QueryStatement;
/// `QueryOrPrepared` selector that encodes the MD5 digest of the statement as an id instead.
pub struct PreparedStatement;
/// Stage reached once the statement (or id) has been written; the consistency level comes next.
pub struct QueryConsistency;
/// Stage reached after the consistency level is written; remembers where the query-flags byte lives.
pub struct QueryFlags {
/// Offset of the query-flags byte in the builder buffer, recorded so later stages can OR
/// additional flags into that byte once it has been written.
index: usize,
}
/// Stage active while bound values are being appended to the frame.
pub struct QueryValues {
/// Location of the query-flags byte; the two bytes right after it hold the value count.
query_flags: QueryFlags,
/// Number of values appended so far; written back into the buffer when leaving this stage.
value_count: u16,
}
/// Stage reached after `page_size`; offers `paging_state`, `serial_consistency`, `timestamp`
/// and `build`.
pub struct QueryPagingState {
/// Location of the query-flags byte, so further flags can still be OR-ed in.
query_flags: QueryFlags,
}
/// Stage reached after `paging_state`; offers `serial_consistency`, `timestamp` and `build`.
pub struct QuerySerialConsistency {
/// Location of the query-flags byte, so further flags can still be OR-ed in.
query_flags: QueryFlags,
}
/// Stage reached after `serial_consistency`; offers `timestamp` and `build`.
pub struct QueryTimestamp {
/// Location of the query-flags byte, so the timestamp flag can still be OR-ed in.
query_flags: QueryFlags,
}
/// Final stage, reached after `timestamp`; the only remaining operation is `build`.
pub struct QueryBuild;
impl QueryBuilder<QueryHeader> {
    /// Creates a builder whose buffer already contains `QUERY_HEADER`, ready for the statement.
    pub fn new() -> QueryBuilder<QueryStatement> {
        let mut frame = Vec::new();
        frame.extend_from_slice(QUERY_HEADER);
        QueryBuilder {
            buffer: frame,
            stage: QueryStatement,
        }
    }

    /// Same as [`QueryBuilder::new`], but the buffer is allocated with room for `capacity` bytes
    /// before `QUERY_HEADER` is written into it.
    pub fn with_capacity(capacity: usize) -> QueryBuilder<QueryStatement> {
        let mut frame = Vec::with_capacity(capacity);
        frame.extend_from_slice(QUERY_HEADER);
        QueryBuilder {
            buffer: frame,
            stage: QueryStatement,
        }
    }
}
impl QueryOrPrepared for QueryStatement {
    /// Encodes `statement` as statement text by delegating to `Statements::statement`.
    fn encode_statement<T: Statements>(query_or_batch: T, statement: &str) -> T::Return {
        <T as Statements>::statement(query_or_batch, statement)
    }

    /// Always `false`: this selector sends the statement text, not a prepared id.
    fn is_prepared() -> bool {
        false
    }
}
impl QueryOrPrepared for PreparedStatement {
    /// Encodes the MD5 digest of `statement` as the id by delegating to `Statements::id`.
    fn encode_statement<T: Statements>(query_or_batch: T, statement: &str) -> T::Return {
        let prepared_id: [u8; 16] = md5::compute(statement.as_bytes()).into();
        query_or_batch.id(&prepared_id)
    }

    /// Always `true`: this selector sends an id derived from the statement.
    fn is_prepared() -> bool {
        true
    }
}
impl<T: QueryOrPrepared> Statements for QueryBuilder<T> {
    type Return = QueryBuilder<QueryConsistency>;

    /// Appends the statement as a 4-byte big-endian length followed by its UTF-8 bytes and moves
    /// on to the consistency stage.
    fn statement(self, statement: &str) -> Self::Return {
        let mut buffer = self.buffer;
        // The length is narrowed with `as`, so statements longer than `i32::MAX` bytes would be
        // truncated rather than rejected.
        let byte_len = statement.len() as i32;
        buffer.extend_from_slice(&byte_len.to_be_bytes());
        buffer.extend_from_slice(statement.as_bytes());
        QueryBuilder {
            buffer,
            stage: QueryConsistency,
        }
    }

    /// Switches the header opcode (byte 4) to `EXECUTE`, then appends `MD5_BE_LENGTH` followed by
    /// the 16-byte id, and moves on to the consistency stage.
    fn id(self, id: &[u8; 16]) -> Self::Return {
        let mut buffer = self.buffer;
        buffer[4] = EXECUTE;
        buffer.extend(&super::MD5_BE_LENGTH);
        buffer.extend_from_slice(id);
        QueryBuilder {
            buffer,
            stage: QueryConsistency,
        }
    }
}
impl QueryBuilder<QueryConsistency> {
    /// Appends the consistency level as a big-endian `u16` and records the offset at which the
    /// query-flags byte will be written by the next stage.
    pub fn consistency(self, consistency: Consistency) -> QueryBuilder<QueryFlags> {
        let mut buffer = self.buffer;
        let level = consistency as u16;
        buffer.extend_from_slice(&level.to_be_bytes());
        // The flags byte has not been pushed yet, so the current length is its future offset.
        let index = buffer.len();
        QueryBuilder {
            buffer,
            stage: QueryFlags { index },
        }
    }
}
impl Values for QueryBuilder<QueryFlags> {
    type Return = QueryBuilder<QueryValues>;

    /// Writes the flags byte (`SKIP_METADATA | VALUES`), a value count of one, and
    /// `BE_NULL_BYTES_LEN` as the first value.
    fn null_value(self) -> QueryBuilder<QueryValues> {
        let QueryBuilder {
            mut buffer,
            stage: query_flags,
        } = self;
        let value_count: u16 = 1;
        buffer.push(SKIP_METADATA | VALUES);
        buffer.extend_from_slice(&value_count.to_be_bytes());
        buffer.extend(&BE_NULL_BYTES_LEN);
        QueryBuilder {
            buffer,
            stage: QueryValues {
                query_flags,
                value_count,
            },
        }
    }

    /// Writes the flags byte (`SKIP_METADATA | VALUES`), a value count of one, and
    /// `BE_UNSET_BYTES_LEN` as the first value.
    fn unset_value(self) -> QueryBuilder<QueryValues> {
        let QueryBuilder {
            mut buffer,
            stage: query_flags,
        } = self;
        let value_count: u16 = 1;
        buffer.push(SKIP_METADATA | VALUES);
        buffer.extend_from_slice(&value_count.to_be_bytes());
        buffer.extend(&BE_UNSET_BYTES_LEN);
        QueryBuilder {
            buffer,
            stage: QueryValues {
                query_flags,
                value_count,
            },
        }
    }

    /// Writes the flags byte (`SKIP_METADATA | VALUES`), a value count of one, and then lets
    /// `value` encode itself into the buffer as the first value.
    fn value<V: ColumnEncoder>(self, value: &V) -> QueryBuilder<QueryValues> {
        let QueryBuilder {
            mut buffer,
            stage: query_flags,
        } = self;
        let value_count: u16 = 1;
        buffer.push(SKIP_METADATA | VALUES);
        buffer.extend_from_slice(&value_count.to_be_bytes());
        value.encode(&mut buffer);
        QueryBuilder {
            buffer,
            stage: QueryValues {
                query_flags,
                value_count,
            },
        }
    }
}
impl QueryBuilder<QueryFlags> {
pub fn page_size(mut self, page_size: i32) -> QueryBuilder<QueryPagingState> {
self.buffer.push(SKIP_METADATA | PAGE_SIZE);
self.buffer.extend(&i32::to_be_bytes(page_size));
let query_paging_state = QueryPagingState {
query_flags: self.stage,
};
QueryBuilder::<QueryPagingState> {
buffer: self.buffer,
stage: query_paging_state,
}
}
pub fn paging_state(mut self, paging_state: &Option<Vec<u8>>) -> QueryBuilder<QuerySerialConsistency> {
if let Some(paging_state) = paging_state {
self.buffer.push(SKIP_METADATA | PAGING_STATE);
self.buffer.extend(&i32::to_be_bytes(paging_state.len() as i32));
self.buffer.extend(paging_state);
} else {
self.buffer.push(SKIP_METADATA);
}
let query_serial_consistency = QuerySerialConsistency {
query_flags: self.stage,
};
QueryBuilder::<QuerySerialConsistency> {
buffer: self.buffer,
stage: query_serial_consistency,
}
}
pub fn serial_consistency(mut self, consistency: Consistency) -> QueryBuilder<QueryTimestamp> {
self.buffer.push(SKIP_METADATA | SERIAL_CONSISTENCY);
self.buffer.extend(&u16::to_be_bytes(consistency as u16));
let query_timestamp = QueryTimestamp {
query_flags: self.stage,
};
QueryBuilder::<QueryTimestamp> {
buffer: self.buffer,
stage: query_timestamp,
}
}
pub fn timestamp(mut self, timestamp: i64) -> QueryBuilder<QueryBuild> {
self.buffer.push(SKIP_METADATA | TIMESTAMP);
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
let query_build = QueryBuild;
QueryBuilder::<QueryBuild> {
buffer: self.buffer,
stage: query_build,
}
}
pub fn build(mut self) -> anyhow::Result<Query> {
self.buffer[1] |= MyCompression::flag();
self.buffer.push(SKIP_METADATA);
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Query(self.buffer))
}
}
impl Values for QueryBuilder<QueryValues> {
type Return = QueryBuilder<QueryValues>;
fn value<V: ColumnEncoder>(mut self, value: &V) -> Self {
self.stage.value_count += 1;
value.encode(&mut self.buffer);
self
}
fn unset_value(mut self) -> Self {
self.stage.value_count += 1;
self.buffer.extend(&BE_UNSET_BYTES_LEN);
self
}
fn null_value(mut self) -> Self {
self.stage.value_count += 1;
self.buffer.extend(&BE_NULL_BYTES_LEN);
self
}
}
impl QueryBuilder<QueryValues> {
pub fn page_size(mut self, page_size: i32) -> QueryBuilder<QueryPagingState> {
self.buffer[self.stage.query_flags.index] |= PAGE_SIZE;
self.buffer.extend(&i32::to_be_bytes(page_size));
let start = self.stage.query_flags.index + 1;
let end = start + 2;
self.buffer[start..end].copy_from_slice(&self.stage.value_count.to_be_bytes());
let query_page_size = QueryPagingState {
query_flags: self.stage.query_flags,
};
QueryBuilder::<QueryPagingState> {
buffer: self.buffer,
stage: query_page_size,
}
}
pub fn paging_state(mut self, paging_state: &Option<Vec<u8>>) -> QueryBuilder<QuerySerialConsistency> {
if let Some(paging_state) = paging_state {
self.buffer[self.stage.query_flags.index] |= PAGING_STATE;
self.buffer.extend(&i32::to_be_bytes(paging_state.len() as i32));
self.buffer.extend(paging_state);
}
let start = self.stage.query_flags.index + 1;
let end = start + 2;
self.buffer[start..end].copy_from_slice(&self.stage.value_count.to_be_bytes());
let query_serial_consistency = QuerySerialConsistency {
query_flags: self.stage.query_flags,
};
QueryBuilder::<QuerySerialConsistency> {
buffer: self.buffer,
stage: query_serial_consistency,
}
}
pub fn serial_consistency(mut self, consistency: Consistency) -> QueryBuilder<QueryTimestamp> {
self.buffer[self.stage.query_flags.index] |= SERIAL_CONSISTENCY;
self.buffer.extend(&u16::to_be_bytes(consistency as u16));
let start = self.stage.query_flags.index + 1;
let end = start + 2;
self.buffer[start..end].copy_from_slice(&self.stage.value_count.to_be_bytes());
let query_timestamp = QueryTimestamp {
query_flags: self.stage.query_flags,
};
QueryBuilder::<QueryTimestamp> {
buffer: self.buffer,
stage: query_timestamp,
}
}
pub fn timestamp(mut self, timestamp: i64) -> QueryBuilder<QueryBuild> {
self.buffer[self.stage.query_flags.index] |= TIMESTAMP;
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
let start = self.stage.query_flags.index + 1;
let end = start + 2;
self.buffer[start..end].copy_from_slice(&self.stage.value_count.to_be_bytes());
let query_build = QueryBuild;
QueryBuilder::<QueryBuild> {
buffer: self.buffer,
stage: query_build,
}
}
pub fn build(mut self) -> anyhow::Result<Query> {
self.buffer[1] |= MyCompression::flag();
let start = self.stage.query_flags.index + 1;
let end = start + 2;
self.buffer[start..end].copy_from_slice(&self.stage.value_count.to_be_bytes());
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Query(self.buffer))
}
}
impl QueryBuilder<QueryPagingState> {
pub fn paging_state(mut self, paging_state: &Option<Vec<u8>>) -> QueryBuilder<QuerySerialConsistency> {
if let Some(paging_state) = paging_state {
self.buffer[self.stage.query_flags.index] |= PAGING_STATE;
self.buffer.extend(&i32::to_be_bytes(paging_state.len() as i32));
self.buffer.extend(paging_state);
}
let query_serial_consistency = QuerySerialConsistency {
query_flags: self.stage.query_flags,
};
QueryBuilder::<QuerySerialConsistency> {
buffer: self.buffer,
stage: query_serial_consistency,
}
}
pub fn serial_consistency(mut self, consistency: Consistency) -> QueryBuilder<QueryTimestamp> {
self.buffer[self.stage.query_flags.index] |= SERIAL_CONSISTENCY;
self.buffer.extend(&u16::to_be_bytes(consistency as u16));
let query_timestamp = QueryTimestamp {
query_flags: self.stage.query_flags,
};
QueryBuilder::<QueryTimestamp> {
buffer: self.buffer,
stage: query_timestamp,
}
}
pub fn timestamp(mut self, timestamp: i64) -> QueryBuilder<QueryBuild> {
self.buffer[self.stage.query_flags.index] |= TIMESTAMP;
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
let query_build = QueryBuild;
QueryBuilder::<QueryBuild> {
buffer: self.buffer,
stage: query_build,
}
}
pub fn build(mut self) -> anyhow::Result<Query> {
self.buffer[1] |= MyCompression::flag();
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Query(self.buffer))
}
}
impl QueryBuilder<QuerySerialConsistency> {
pub fn serial_consistency(mut self, consistency: Consistency) -> QueryBuilder<QueryTimestamp> {
self.buffer[self.stage.query_flags.index] |= SERIAL_CONSISTENCY;
self.buffer.extend(&u16::to_be_bytes(consistency as u16));
let query_timestamp = QueryTimestamp {
query_flags: self.stage.query_flags,
};
QueryBuilder::<QueryTimestamp> {
buffer: self.buffer,
stage: query_timestamp,
}
}
pub fn timestamp(mut self, timestamp: i64) -> QueryBuilder<QueryBuild> {
self.buffer[self.stage.query_flags.index] |= TIMESTAMP;
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
let query_build = QueryBuild;
QueryBuilder::<QueryBuild> {
buffer: self.buffer,
stage: query_build,
}
}
pub fn build(mut self) -> anyhow::Result<Query> {
self.buffer[1] |= MyCompression::flag();
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Query(self.buffer))
}
}
impl QueryBuilder<QueryTimestamp> {
pub fn timestamp(mut self, timestamp: i64) -> QueryBuilder<QueryBuild> {
self.buffer[self.stage.query_flags.index] |= TIMESTAMP;
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
let query_build = QueryBuild;
QueryBuilder::<QueryBuild> {
buffer: self.buffer,
stage: query_build,
}
}
pub fn build(mut self) -> anyhow::Result<Query> {
self.buffer[1] |= MyCompression::flag();
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Query(self.buffer))
}
}
impl QueryBuilder<QueryBuild> {
    /// Finishes the frame: ORs the compression flag into header byte 1 and runs the buffer
    /// through the compressor.
    ///
    /// # Errors
    /// Propagates any error returned by `MyCompression::get().compress`.
    pub fn build(self) -> anyhow::Result<Query> {
        let mut frame = self.buffer;
        frame[1] |= MyCompression::flag();
        let frame = MyCompression::get().compress(frame)?;
        Ok(Query(frame))
    }
}
#[derive(Default, Clone)]
pub struct Query(pub Vec<u8>);
impl Query {
pub fn new() -> QueryBuilder<QueryStatement> {
QueryBuilder::<QueryHeader>::new()
}
pub fn with_capacity(capacity: usize) -> QueryBuilder<QueryStatement> {
QueryBuilder::<QueryHeader>::with_capacity(capacity)
}
}
impl Into<Vec<u8>> for Query {
fn into(self) -> Vec<u8> {
self.0
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Smoke test: walks the builder through statement, consistency and a run of mixed values,
    /// then checks only that `build` succeeds.
    #[test]
    fn simple_query_builder_test() {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let built = Query::new()
            .statement("INSERT_TX_QUERY")
            .consistency(Consistency::One)
            .value(&"HASH_VALUE")
            .value(&"PAYLOAD_VALUE")
            .value(&"ADDRESS_VALUE")
            .value::<i64>(&0)
            .value(&"OBSOLETE_TAG_VALUE")
            .value(&now_secs)
            .value(&0)
            .unset_value()
            .build();
        let Query(_payload) = built.unwrap();
    }
}