use super::{
batchflags::*,
consistency::Consistency,
encoder::{ColumnEncoder, BE_8_BYTES_LEN, BE_NULL_BYTES_LEN, BE_UNSET_BYTES_LEN},
opcode::BATCH,
Statements, Values, MD5_BE_LENGTH,
};
use crate::cql::compression::{Compression, MyCompression};
const BATCH_HEADER: &'static [u8] = &[4, 0, 0, 0, BATCH, 0, 0, 0, 0];
pub struct Batch(pub Vec<u8>);
#[repr(u8)]
pub enum BatchTypes {
Logged = 0,
Unlogged = 1,
Counter = 2,
}
pub struct BatchBuilder<Type: Copy + Into<u8>, Stage> {
buffer: Vec<u8>,
query_count: u16,
batch_type: Type,
stage: Stage,
}
pub struct BatchHeader;
pub struct BatchType;
#[derive(Copy, Clone)]
pub struct BatchTypeUnset;
impl Into<u8> for BatchTypeUnset {
fn into(self) -> u8 {
panic!("Batch type is not set!")
}
}
#[derive(Copy, Clone)]
pub struct BatchTypeLogged;
impl Into<u8> for BatchTypeLogged {
fn into(self) -> u8 {
0
}
}
#[derive(Copy, Clone)]
pub struct BatchTypeUnlogged;
impl Into<u8> for BatchTypeUnlogged {
fn into(self) -> u8 {
1
}
}
#[derive(Copy, Clone)]
pub struct BatchTypeCounter;
impl Into<u8> for BatchTypeCounter {
fn into(self) -> u8 {
2
}
}
pub struct BatchStatementOrId;
pub struct BatchValues {
value_count: u16,
index: usize,
}
pub struct BatchFlags;
pub struct BatchTimestamp;
pub struct BatchBuild;
impl BatchBuilder<BatchTypeUnset, BatchHeader> {
pub fn new() -> BatchBuilder<BatchTypeUnset, BatchType> {
let mut buffer: Vec<u8> = Vec::new();
buffer.extend_from_slice(&BATCH_HEADER);
BatchBuilder {
buffer,
query_count: 0,
batch_type: BatchTypeUnset,
stage: BatchType,
}
}
pub fn with_capacity(capacity: usize) -> BatchBuilder<BatchTypeUnset, BatchType> {
let mut buffer: Vec<u8> = Vec::with_capacity(capacity);
buffer.extend_from_slice(&BATCH_HEADER);
BatchBuilder {
buffer,
query_count: 0,
batch_type: BatchTypeUnset,
stage: BatchType,
}
}
}
impl BatchBuilder<BatchTypeUnset, BatchType> {
pub fn batch_type<Type: Copy + Into<u8>>(mut self, batch_type: Type) -> BatchBuilder<Type, BatchStatementOrId> {
self.buffer.extend(&[batch_type.into(), 0, 0]);
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type,
stage: BatchStatementOrId,
}
}
pub fn logged(mut self) -> BatchBuilder<BatchTypeLogged, BatchStatementOrId> {
self.buffer.extend(&[0, 0, 0]);
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: BatchTypeLogged,
stage: BatchStatementOrId,
}
}
pub fn unlogged(mut self) -> BatchBuilder<BatchTypeUnlogged, BatchStatementOrId> {
self.buffer.extend(&[1, 0, 0]);
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: BatchTypeUnlogged,
stage: BatchStatementOrId,
}
}
pub fn counter(mut self) -> BatchBuilder<BatchTypeCounter, BatchStatementOrId> {
self.buffer.extend(&[2, 0, 0]);
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: BatchTypeCounter,
stage: BatchStatementOrId,
}
}
}
impl<Type: Copy + Into<u8>> Statements for BatchBuilder<Type, BatchStatementOrId> {
type Return = BatchBuilder<Type, BatchValues>;
fn statement(mut self, statement: &str) -> Self::Return {
self.buffer.push(0);
self.buffer.extend(&i32::to_be_bytes(statement.len() as i32));
self.buffer.extend(statement.bytes());
self.query_count += 1;
let index = self.buffer.len();
self.buffer.extend(&[0, 0]);
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchValues { value_count: 0, index },
}
}
fn id(mut self, id: &[u8; 16]) -> Self::Return {
self.buffer.push(1);
self.buffer.extend(&MD5_BE_LENGTH);
self.buffer.extend(id);
self.query_count += 1;
let index = self.buffer.len();
self.buffer.extend(&[0, 0]);
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchValues { value_count: 0, index },
}
}
}
impl<Type: Copy + Into<u8>> Values for BatchBuilder<Type, BatchValues> {
type Return = BatchBuilder<Type, BatchValues>;
fn value<V: ColumnEncoder>(mut self, value: &V) -> Self {
value.encode(&mut self.buffer);
self.stage.value_count += 1;
self
}
fn unset_value(mut self) -> Self {
self.buffer.extend(&BE_UNSET_BYTES_LEN);
self.stage.value_count += 1;
self
}
fn null_value(mut self) -> Self {
self.buffer.extend(&BE_NULL_BYTES_LEN);
self.stage.value_count += 1;
self
}
}
impl<Type: Copy + Into<u8>> Statements for BatchBuilder<Type, BatchValues> {
type Return = Self;
fn statement(mut self, statement: &str) -> BatchBuilder<Type, BatchValues> {
self.buffer[self.stage.index..(self.stage.index + 2)]
.copy_from_slice(&u16::to_be_bytes(self.stage.value_count));
self.buffer.push(0);
self.buffer.extend(&i32::to_be_bytes(statement.len() as i32));
self.buffer.extend(statement.bytes());
self.query_count += 1;
self.buffer.extend(&[0, 0]);
let index = self.buffer.len();
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchValues { value_count: 0, index },
}
}
fn id(mut self, id: &[u8; 16]) -> BatchBuilder<Type, BatchValues> {
self.buffer[self.stage.index..(self.stage.index + 2)]
.copy_from_slice(&u16::to_be_bytes(self.stage.value_count));
self.buffer.push(1);
self.buffer.extend(&MD5_BE_LENGTH);
self.buffer.extend(id);
self.query_count += 1;
self.buffer.extend(&[0, 0]);
let index = self.buffer.len();
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchValues { value_count: 0, index },
}
}
}
impl<Type: Copy + Into<u8>> BatchBuilder<Type, BatchValues> {
pub fn consistency(mut self, consistency: Consistency) -> BatchBuilder<Type, BatchFlags> {
self.buffer[self.stage.index..(self.stage.index + 2)]
.copy_from_slice(&u16::to_be_bytes(self.stage.value_count));
self.buffer.extend(&u16::to_be_bytes(consistency as u16));
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchFlags,
}
}
}
impl<Type: Copy + Into<u8>> BatchBuilder<Type, BatchFlags> {
pub fn serial_consistency(mut self, consistency: Consistency) -> BatchBuilder<Type, BatchTimestamp> {
self.buffer.push(SERIAL_CONSISTENCY);
self.buffer.extend(&u16::to_be_bytes(consistency as u16));
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchTimestamp,
}
}
pub fn timestamp(mut self, timestamp: i64) -> BatchBuilder<Type, BatchBuild> {
self.buffer.push(TIMESTAMP);
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchBuild,
}
}
pub fn build(mut self) -> anyhow::Result<Batch> {
self.buffer[1] |= MyCompression::flag();
self.buffer.push(NOFLAGS);
self.buffer[10..12].copy_from_slice(&u16::to_be_bytes(self.query_count));
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Batch(self.buffer))
}
}
impl<Type: Copy + Into<u8>> BatchBuilder<Type, BatchTimestamp> {
pub fn timestamp(mut self, timestamp: i64) -> BatchBuilder<Type, BatchBuild> {
self.buffer.last_mut().map(|last_byte| *last_byte |= TIMESTAMP);
self.buffer.extend(&BE_8_BYTES_LEN);
self.buffer.extend(&i64::to_be_bytes(timestamp));
BatchBuilder {
buffer: self.buffer,
query_count: self.query_count,
batch_type: self.batch_type,
stage: BatchBuild,
}
}
pub fn build(mut self) -> anyhow::Result<Batch> {
self.buffer[1] |= MyCompression::flag();
self.buffer[10..12].copy_from_slice(&u16::to_be_bytes(self.query_count));
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Batch(self.buffer))
}
}
impl<Type: Copy + Into<u8>> BatchBuilder<Type, BatchBuild> {
pub fn build(mut self) -> anyhow::Result<Batch> {
self.buffer[1] |= MyCompression::flag();
self.buffer[10..12].copy_from_slice(&u16::to_be_bytes(self.query_count));
self.buffer = MyCompression::get().compress(self.buffer)?;
Ok(Batch(self.buffer))
}
}
impl Batch {
pub fn new() -> BatchBuilder<BatchTypeUnset, BatchType> {
BatchBuilder::new()
}
pub fn with_capacity(capacity: usize) -> BatchBuilder<BatchTypeUnset, BatchType> {
BatchBuilder::with_capacity(capacity)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn simple_query_builder_test() {
let Batch(_payload) = Batch::new()
.logged()
.statement("INSERT_TX_QUERY")
.value(&"HASH_VALUE")
.value(&"PAYLOAD_VALUE")
.id(&[0; 16])
.value(&"JUNK_VALUE")
.consistency(Consistency::One)
.build()
.unwrap();
}
}