use std::str;
use std::time::Duration;
use byteorder::{ByteOrder, LittleEndian, NetworkEndian};
use crate::batch::batch_executor::SharedSlice;
use crate::commands::field_type::FieldType;
use crate::errors::Result;
use crate::expressions::FilterExpression;
use crate::msgpack::encoder;
use crate::operations::{Operation, OperationBin, OperationData, OperationType};
use crate::policy::{
BatchPolicy, CommitLevel, ConsistencyLevel, GenerationPolicy, QueryPolicy, ReadPolicy,
RecordExistsAction, ScanPolicy, WritePolicy,
};
use crate::{BatchRead, Bin, Bins, CollectionIndexType, Key, Statement, Value};
const INFO1_READ: u8 = 1;
const INFO1_GET_ALL: u8 = 1 << 1;
const INFO1_BATCH: u8 = 1 << 3;
const INFO1_NOBINDATA: u8 = 1 << 5;
const INFO1_CONSISTENCY_ALL: u8 = 1 << 6;
const INFO2_WRITE: u8 = 1;
const INFO2_DELETE: u8 = 1 << 1;
const INFO2_GENERATION: u8 = 1 << 2;
const INFO2_GENERATION_GT: u8 = 1 << 3;
const INFO2_DURABLE_DELETE: u8 = 1 << 4;
const INFO2_CREATE_ONLY: u8 = 1 << 5;
const INFO2_RESPOND_ALL_OPS: u8 = 1 << 7;
pub const INFO3_LAST: u8 = 1;
const INFO3_COMMIT_MASTER: u8 = 1 << 1;
pub const _INFO3_PARTITION_DONE: u8 = 1 << 2;
const INFO3_UPDATE_ONLY: u8 = 1 << 3;
const INFO3_CREATE_OR_REPLACE: u8 = 1 << 4;
const INFO3_REPLACE_ONLY: u8 = 1 << 5;
pub const MSG_TOTAL_HEADER_SIZE: u8 = 30;
const FIELD_HEADER_SIZE: u8 = 5;
const OPERATION_HEADER_SIZE: u8 = 8;
pub const MSG_REMAINING_HEADER_SIZE: u8 = 22;
const DIGEST_SIZE: u8 = 20;
const CL_MSG_VERSION: u8 = 2;
const AS_MSG_TYPE: u8 = 3;
const MAX_BUFFER_SIZE: usize = 1024 * 1024 + 8;
#[derive(Debug, Default)]
pub struct Buffer {
pub data_buffer: Vec<u8>,
pub data_offset: usize,
pub reclaim_threshold: usize,
}
impl Buffer {
pub fn new(reclaim_threshold: usize) -> Self {
Buffer {
data_buffer: Vec::with_capacity(1024),
data_offset: 0,
reclaim_threshold,
}
}
fn begin(&mut self) -> Result<()> {
self.data_offset = MSG_TOTAL_HEADER_SIZE as usize;
Ok(())
}
pub fn size_buffer(&mut self) -> Result<()> {
let offset = self.data_offset;
self.resize_buffer(offset)
}
pub fn resize_buffer(&mut self, size: usize) -> Result<()> {
if size > MAX_BUFFER_SIZE {
bail!("Invalid size for buffer: {}", size);
}
let mem_size = self.data_buffer.capacity();
self.data_buffer.resize(size, 0);
if mem_size > self.reclaim_threshold && size < mem_size {
self.data_buffer.shrink_to_fit();
}
Ok(())
}
pub fn reset_offset(&mut self) -> Result<()> {
self.data_offset = 0;
Ok(())
}
pub fn end(&mut self) -> Result<()> {
let size = ((self.data_offset - 8) as i64)
| ((i64::from(CL_MSG_VERSION) << 56) as i64)
| (i64::from(AS_MSG_TYPE) << 48);
self.reset_offset()?;
self.write_i64(size)?;
Ok(())
}
pub fn set_write<'b, A: AsRef<Bin<'b>>>(
&mut self,
policy: &WritePolicy,
op_type: OperationType,
key: &Key,
bins: &[A],
) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, policy.send_key)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
for bin in bins {
self.estimate_operation_size_for_bin(bin.as_ref())?;
}
self.size_buffer()?;
self.write_header_with_policy(
policy,
0,
INFO2_WRITE,
field_count as u16,
bins.len() as u16,
)?;
self.write_key(key, policy.send_key)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
for bin in bins {
self.write_operation_for_bin(bin.as_ref(), op_type)?;
}
self.end()
}
pub fn set_delete(&mut self, policy: &WritePolicy, key: &Key) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, false)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.size_buffer()?;
self.write_header_with_policy(
policy,
0,
INFO2_WRITE | INFO2_DELETE,
field_count as u16,
0,
)?;
self.write_key(key, false)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.end()
}
pub fn set_touch(&mut self, policy: &WritePolicy, key: &Key) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, policy.send_key)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.estimate_operation_size()?;
self.size_buffer()?;
self.write_header_with_policy(policy, 0, INFO2_WRITE, field_count as u16, 1)?;
self.write_key(key, policy.send_key)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.write_operation_for_operation_type(OperationType::Touch)?;
self.end()
}
pub fn set_exists(&mut self, policy: &WritePolicy, key: &Key) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, false)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.size_buffer()?;
self.write_header(
&policy.base_policy,
INFO1_READ | INFO1_NOBINDATA,
0,
field_count,
0,
)?;
self.write_key(key, false)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.end()
}
pub fn set_read(&mut self, policy: &ReadPolicy, key: &Key, bins: &Bins) -> Result<()> {
match bins {
Bins::None => self.set_read_header(policy, key),
Bins::All => self.set_read_for_key_only(policy, key),
Bins::Some(ref bin_names) => {
self.begin()?;
let mut field_count = self.estimate_key_size(key, false)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
for bin_name in bin_names {
self.estimate_operation_size_for_bin_name(bin_name)?;
}
self.size_buffer()?;
self.write_header(policy, INFO1_READ, 0, field_count, bin_names.len() as u16)?;
self.write_key(key, false)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
for bin_name in bin_names {
self.write_operation_for_bin_name(bin_name, OperationType::Read)?;
}
self.end()?;
Ok(())
}
}
}
pub fn set_read_header(&mut self, policy: &ReadPolicy, key: &Key) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, false)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.estimate_operation_size_for_bin_name("")?;
self.size_buffer()?;
self.write_header(policy, INFO1_READ | INFO1_NOBINDATA, 0, field_count, 1)?;
self.write_key(key, false)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.write_operation_for_bin_name("", OperationType::Read)?;
self.end()
}
pub fn set_read_for_key_only(&mut self, policy: &ReadPolicy, key: &Key) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, false)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.size_buffer()?;
self.write_header(policy, INFO1_READ | INFO1_GET_ALL, 0, field_count, 0)?;
self.write_key(key, false)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.end()
}
pub fn set_batch_read<'a>(
&mut self,
policy: &BatchPolicy,
batch_reads: SharedSlice<BatchRead<'a>>,
offsets: &[usize],
) -> Result<()> {
let field_count_row = if policy.send_set_name { 2 } else { 1 };
self.begin()?;
let mut field_count = 1;
self.data_offset += FIELD_HEADER_SIZE as usize + 5;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
let mut prev: Option<&BatchRead> = None;
for idx in offsets {
let batch_read: &BatchRead = batch_reads.get(*idx).unwrap();
self.data_offset += batch_read.key.digest.len() + 4;
match prev {
Some(prev) if batch_read.match_header(prev, policy.send_set_name) => {
self.data_offset += 1;
}
_ => {
let key = &batch_read.key;
self.data_offset += key.namespace.len() + FIELD_HEADER_SIZE as usize + 6;
if policy.send_set_name {
self.data_offset += key.set_name.len() + FIELD_HEADER_SIZE as usize;
}
if let Bins::Some(ref bin_names) = batch_read.bins {
for name in bin_names {
self.estimate_operation_size_for_bin_name(name)?;
}
}
}
}
prev = Some(batch_read);
}
self.size_buffer()?;
self.write_header(
&policy.base_policy,
INFO1_READ | INFO1_BATCH,
0,
field_count,
0,
)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
let field_size_offset = self.data_offset;
let field_type = if policy.send_set_name {
FieldType::BatchIndexWithSet
} else {
FieldType::BatchIndex
};
self.write_field_header(0, field_type)?;
self.write_u32(offsets.len() as u32)?;
self.write_u8(if policy.allow_inline { 1 } else { 0 })?;
prev = None;
for idx in offsets {
let batch_read = batch_reads.get(*idx).unwrap();
let key = &batch_read.key;
self.write_u32(*idx as u32)?;
self.write_bytes(&key.digest)?;
match prev {
Some(prev) if batch_read.match_header(prev, policy.send_set_name) => {
self.write_u8(1)?;
}
_ => {
self.write_u8(0)?;
match batch_read.bins {
Bins::None => {
self.write_u8(INFO1_READ | INFO1_NOBINDATA)?;
self.write_u16(field_count_row)?;
self.write_u16(0)?;
self.write_field_string(&key.namespace, FieldType::Namespace)?;
if policy.send_set_name {
self.write_field_string(&key.set_name, FieldType::Table)?;
}
}
Bins::All => {
self.write_u8(INFO1_READ | INFO1_GET_ALL)?;
self.write_u16(field_count_row)?;
self.write_u16(0)?;
self.write_field_string(&key.namespace, FieldType::Namespace)?;
if policy.send_set_name {
self.write_field_string(&key.set_name, FieldType::Table)?;
}
}
Bins::Some(ref bin_names) => {
self.write_u8(INFO1_READ)?;
self.write_u16(field_count_row)?;
self.write_u16(bin_names.len() as u16)?;
self.write_field_string(&key.namespace, FieldType::Namespace)?;
if policy.send_set_name {
self.write_field_string(&key.set_name, FieldType::Table)?;
}
for bin in bin_names {
self.write_operation_for_bin_name(bin, OperationType::Read)?;
}
}
}
}
}
prev = Some(batch_read);
}
let field_size = self.data_offset - MSG_TOTAL_HEADER_SIZE as usize - 4;
NetworkEndian::write_u32(
&mut self.data_buffer[field_size_offset..field_size_offset + 4],
field_size as u32,
);
self.end()
}
pub fn set_operate<'a>(
&mut self,
policy: &WritePolicy,
key: &Key,
operations: &'a [Operation<'a>],
) -> Result<()> {
self.begin()?;
let mut read_attr = 0;
let mut write_attr = 0;
for operation in operations {
match *operation {
Operation {
op: OperationType::Read,
bin: OperationBin::None,
..
} => read_attr |= INFO1_READ | INFO1_NOBINDATA,
Operation {
op: OperationType::Read,
bin: OperationBin::All,
..
} => read_attr |= INFO1_READ | INFO1_GET_ALL,
Operation {
op: OperationType::Read,
..
}
| Operation {
op: OperationType::CdtRead,
..
}
| Operation {
op: OperationType::BitRead,
..
}
| Operation {
op: OperationType::HllRead,
..
}
| Operation {
op: OperationType::ExpRead,
..
} => read_attr |= INFO1_READ,
_ => write_attr |= INFO2_WRITE,
}
let each_op = matches!(
operation.data,
OperationData::CdtMapOp(_)
| OperationData::CdtBitOp(_)
| OperationData::HLLOp(_)
| OperationData::EXPOp(_)
);
if policy.respond_per_each_op || each_op {
write_attr |= INFO2_RESPOND_ALL_OPS;
}
self.data_offset += operation.estimate_size()? + OPERATION_HEADER_SIZE as usize;
}
let mut field_count = self.estimate_key_size(key, policy.send_key && write_attr != 0)?;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.size_buffer()?;
if write_attr == 0 {
self.write_header(
&policy.base_policy,
read_attr,
write_attr,
field_count,
operations.len() as u16,
)?;
} else {
self.write_header_with_policy(
policy,
read_attr,
write_attr,
field_count,
operations.len() as u16,
)?;
}
self.write_key(key, policy.send_key && write_attr != 0)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
for operation in operations {
operation.write_to(self)?;
}
self.end()
}
pub fn set_udf(
&mut self,
policy: &WritePolicy,
key: &Key,
package_name: &str,
function_name: &str,
args: Option<&[Value]>,
) -> Result<()> {
self.begin()?;
let mut field_count = self.estimate_key_size(key, policy.send_key)?;
field_count += self.estimate_udf_size(package_name, function_name, args)? as u16;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
self.size_buffer()?;
self.write_header(&policy.base_policy, 0, INFO2_WRITE, field_count, 0)?;
self.write_key(key, policy.send_key)?;
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.write_field_string(package_name, FieldType::UdfPackageName)?;
self.write_field_string(function_name, FieldType::UdfFunction)?;
self.write_args(args, FieldType::UdfArgList)?;
self.end()
}
pub fn set_scan(
&mut self,
policy: &ScanPolicy,
namespace: &str,
set_name: &str,
bins: &Bins,
task_id: u64,
partitions: &Vec<u16>,
) -> Result<()> {
self.begin()?;
let mut field_count = 0;
let filter_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_size > 0 {
field_count += 1;
}
if !namespace.is_empty() {
self.data_offset += namespace.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
if !set_name.is_empty() {
self.data_offset += set_name.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
self.data_offset += partitions.len() * 2 + FIELD_HEADER_SIZE as usize;
field_count += 1;
self.data_offset += 4 + FIELD_HEADER_SIZE as usize;
field_count += 1;
self.data_offset += 8 + FIELD_HEADER_SIZE as usize;
field_count += 1;
let bin_count = match *bins {
Bins::All | Bins::None => 0,
Bins::Some(ref bin_names) => {
for bin_name in bin_names {
self.estimate_operation_size_for_bin_name(bin_name)?;
}
bin_names.len()
}
};
self.size_buffer()?;
let mut read_attr = INFO1_READ;
if bins.is_none() {
read_attr |= INFO1_NOBINDATA;
}
self.write_header(
&policy.base_policy,
read_attr,
0,
field_count,
bin_count as u16,
)?;
if !namespace.is_empty() {
self.write_field_string(namespace, FieldType::Namespace)?;
}
if !set_name.is_empty() {
self.write_field_string(set_name, FieldType::Table)?;
}
self.write_field_header(partitions.len() * 2, FieldType::PIDArray)?;
for pid in partitions {
self.write_u16_little_endian(*pid)?;
}
if let Some(filter) = policy.filter_expression() {
self.write_filter_expression(filter, filter_size)?;
}
self.write_field_header(4, FieldType::ScanTimeout)?;
self.write_u32(policy.socket_timeout)?;
self.write_field_header(8, FieldType::TranId)?;
self.write_u64(task_id)?;
if let Bins::Some(ref bin_names) = *bins {
for bin_name in bin_names {
self.write_operation_for_bin_name(bin_name, OperationType::Read)?;
}
}
self.end()
}
#[allow(clippy::cognitive_complexity)]
pub fn set_query(
&mut self,
policy: &QueryPolicy,
statement: &Statement,
write: bool,
task_id: u64,
partitions: &Vec<u16>,
) -> Result<()> {
let filter = match statement.filters {
Some(ref filters) => Some(&filters[0]),
None => None,
};
self.begin()?;
let mut field_count = 0;
let mut filter_size = 0;
let mut bin_name_size = 0;
if !statement.namespace.is_empty() {
self.data_offset += statement.namespace.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
if !statement.set_name.is_empty() {
self.data_offset += statement.set_name.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
if let Some(ref index_name) = statement.index_name {
if !index_name.is_empty() {
self.data_offset += index_name.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
}
self.data_offset += 8 + FIELD_HEADER_SIZE as usize;
field_count += 1;
if let Some(filter) = filter {
let idx_type = filter.collection_index_type();
if idx_type != CollectionIndexType::Default {
self.data_offset += 1 + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
filter_size = 1 + filter.estimate_size()?;
self.data_offset += filter_size + FIELD_HEADER_SIZE as usize;
field_count += 1;
if let Bins::Some(ref bin_names) = statement.bins {
self.data_offset += FIELD_HEADER_SIZE as usize;
bin_name_size += 1;
for bin_name in bin_names {
bin_name_size += bin_name.len() + 1;
}
self.data_offset += bin_name_size;
field_count += 1;
}
} else {
self.data_offset += partitions.len() * 2 + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
let filter_exp_size = self.estimate_filter_size(policy.filter_expression())?;
if filter_exp_size > 0 {
field_count += 1;
}
if let Some(ref aggregation) = statement.aggregation {
self.data_offset += 1 + FIELD_HEADER_SIZE as usize; self.data_offset += aggregation.package_name.len() + FIELD_HEADER_SIZE as usize;
self.data_offset += aggregation.function_name.len() + FIELD_HEADER_SIZE as usize;
if let Some(ref args) = aggregation.function_args {
self.estimate_args_size(Some(args))?;
} else {
self.estimate_args_size(None)?;
}
field_count += 4;
}
if statement.is_scan() {
if let Bins::Some(ref bin_names) = statement.bins {
for bin_name in bin_names {
self.estimate_operation_size_for_bin_name(bin_name)?;
}
}
}
self.size_buffer()?;
let mut operation_count: usize = 0;
if statement.is_scan() {
if let Bins::Some(ref bin_names) = statement.bins {
operation_count += bin_names.len();
}
}
let info1 = if statement.bins.is_none() {
INFO1_READ | INFO1_NOBINDATA
} else {
INFO1_READ
};
let info2 = if write { INFO2_WRITE } else { 0 };
self.write_header(
&policy.base_policy,
info1,
info2,
field_count,
operation_count as u16,
)?;
if !statement.namespace.is_empty() {
self.write_field_string(&statement.namespace, FieldType::Namespace)?;
}
if let Some(ref index_name) = statement.index_name {
if !index_name.is_empty() {
self.write_field_string(index_name, FieldType::IndexName)?;
}
}
if !statement.set_name.is_empty() {
self.write_field_string(&statement.set_name, FieldType::Table)?;
}
self.write_field_header(8, FieldType::TranId)?;
self.write_u64(task_id)?;
if let Some(filter) = filter {
let idx_type = filter.collection_index_type();
if idx_type != CollectionIndexType::Default {
self.write_field_header(1, FieldType::IndexType)?;
self.write_u8(idx_type as u8)?;
}
self.write_field_header(filter_size, FieldType::IndexRange)?;
self.write_u8(1)?;
filter.write(self)?;
if let Bins::Some(ref bin_names) = statement.bins {
if !bin_names.is_empty() {
self.write_field_header(bin_name_size, FieldType::QueryBinList)?;
self.write_u8(bin_names.len() as u8)?;
for bin_name in bin_names {
self.write_u8(bin_name.len() as u8)?;
self.write_str(bin_name)?;
}
}
}
} else {
self.write_field_header(partitions.len() * 2, FieldType::PIDArray)?;
for pid in partitions {
self.write_u16_little_endian(*pid)?;
}
}
if let Some(filter_exp) = policy.filter_expression() {
self.write_filter_expression(filter_exp, filter_exp_size)?;
}
if let Some(ref aggregation) = statement.aggregation {
self.write_field_header(1, FieldType::UdfOp)?;
if statement.bins.is_none() {
self.write_u8(2)?;
} else {
self.write_u8(1)?;
}
self.write_field_string(&aggregation.package_name, FieldType::UdfPackageName)?;
self.write_field_string(&aggregation.function_name, FieldType::UdfFunction)?;
if let Some(ref args) = aggregation.function_args {
self.write_args(Some(args), FieldType::UdfArgList)?;
} else {
self.write_args(None, FieldType::UdfArgList)?;
}
}
if statement.is_scan() {
if let Bins::Some(ref bin_names) = statement.bins {
for bin_name in bin_names {
self.write_operation_for_bin_name(bin_name, OperationType::Read)?;
}
}
}
self.end()
}
fn estimate_filter_size(&mut self, filter: &Option<FilterExpression>) -> Result<usize> {
if let Some(filter) = filter {
let filter_size = filter.pack(&mut None)?;
self.data_offset += filter_size + FIELD_HEADER_SIZE as usize;
Ok(filter_size)
} else {
Ok(0)
}
}
fn estimate_key_size(&mut self, key: &Key, send_key: bool) -> Result<u16> {
let mut field_count: u16 = 0;
if !key.namespace.is_empty() {
self.data_offset += key.namespace.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
if !key.set_name.is_empty() {
self.data_offset += key.set_name.len() + FIELD_HEADER_SIZE as usize;
field_count += 1;
}
self.data_offset += (DIGEST_SIZE + FIELD_HEADER_SIZE) as usize;
field_count += 1;
if send_key {
if let Some(ref user_key) = key.user_key {
self.data_offset += user_key.estimate_size()? + FIELD_HEADER_SIZE as usize + 1;
field_count += 1;
}
}
Ok(field_count)
}
fn estimate_args_size(&mut self, args: Option<&[Value]>) -> Result<()> {
if let Some(args) = args {
self.data_offset += encoder::pack_array(&mut None, args)? + FIELD_HEADER_SIZE as usize;
} else {
self.data_offset +=
encoder::pack_empty_args_array(&mut None)? + FIELD_HEADER_SIZE as usize;
}
Ok(())
}
fn estimate_udf_size(
&mut self,
package_name: &str,
function_name: &str,
args: Option<&[Value]>,
) -> Result<usize> {
self.data_offset += package_name.len() + FIELD_HEADER_SIZE as usize;
self.data_offset += function_name.len() + FIELD_HEADER_SIZE as usize;
self.estimate_args_size(args)?;
Ok(3)
}
fn estimate_operation_size_for_bin(&mut self, bin: &Bin) -> Result<()> {
self.data_offset += bin.name.len() + OPERATION_HEADER_SIZE as usize;
self.data_offset += bin.value.estimate_size()?;
Ok(())
}
fn estimate_operation_size_for_bin_name(&mut self, bin_name: &str) -> Result<()> {
self.data_offset += bin_name.len() + OPERATION_HEADER_SIZE as usize;
Ok(())
}
fn estimate_operation_size(&mut self) -> Result<()> {
self.data_offset += OPERATION_HEADER_SIZE as usize;
Ok(())
}
fn write_header(
&mut self,
policy: &ReadPolicy,
read_attr: u8,
write_attr: u8,
field_count: u16,
operation_count: u16,
) -> Result<()> {
let mut read_attr = read_attr;
if policy.consistency_level == ConsistencyLevel::ConsistencyAll {
read_attr |= INFO1_CONSISTENCY_ALL;
}
self.data_buffer[8] = MSG_REMAINING_HEADER_SIZE; self.data_buffer[9] = read_attr;
self.data_buffer[10] = write_attr;
for i in 11..26 {
self.data_buffer[i] = 0;
}
self.data_offset = 26;
self.write_u16(field_count as u16)?;
self.write_u16(operation_count as u16)?;
self.data_offset = MSG_TOTAL_HEADER_SIZE as usize;
Ok(())
}
fn write_header_with_policy(
&mut self,
policy: &WritePolicy,
read_attr: u8,
write_attr: u8,
field_count: u16,
operation_count: u16,
) -> Result<()> {
let mut generation: u32 = 0;
let mut info_attr: u8 = 0;
let mut read_attr = read_attr;
let mut write_attr = write_attr;
match policy.record_exists_action {
RecordExistsAction::Update => (),
RecordExistsAction::UpdateOnly => info_attr |= INFO3_UPDATE_ONLY,
RecordExistsAction::Replace => info_attr |= INFO3_CREATE_OR_REPLACE,
RecordExistsAction::ReplaceOnly => info_attr |= INFO3_REPLACE_ONLY,
RecordExistsAction::CreateOnly => write_attr |= INFO2_CREATE_ONLY,
}
match policy.generation_policy {
GenerationPolicy::None => (),
GenerationPolicy::ExpectGenEqual => {
generation = policy.generation;
write_attr |= INFO2_GENERATION;
}
GenerationPolicy::ExpectGenGreater => {
generation = policy.generation;
write_attr |= INFO2_GENERATION_GT;
}
}
if policy.commit_level == CommitLevel::CommitMaster {
info_attr |= INFO3_COMMIT_MASTER
}
if policy.base_policy.consistency_level == ConsistencyLevel::ConsistencyAll {
read_attr |= INFO1_CONSISTENCY_ALL
}
if policy.durable_delete {
write_attr |= INFO2_DURABLE_DELETE
}
self.data_offset = 8;
self.write_u8(MSG_REMAINING_HEADER_SIZE)?; self.write_u8(read_attr)?;
self.write_u8(write_attr)?;
self.write_u8(info_attr)?;
self.write_u8(0)?; self.write_u8(0)?;
self.write_u32(generation)?;
self.write_u32(policy.expiration.into())?;
self.write_u8(0)?;
self.write_u8(0)?;
self.write_u8(0)?;
self.write_u8(0)?;
self.write_u16(field_count)?;
self.write_u16(operation_count)?;
self.data_offset = MSG_TOTAL_HEADER_SIZE as usize;
Ok(())
}
fn write_key(&mut self, key: &Key, send_key: bool) -> Result<()> {
if !key.namespace.is_empty() {
self.write_field_string(&key.namespace, FieldType::Namespace)?;
}
if !key.set_name.is_empty() {
self.write_field_string(&key.set_name, FieldType::Table)?;
}
self.write_field_bytes(&key.digest, FieldType::DigestRipe)?;
if send_key {
if let Some(ref user_key) = key.user_key {
self.write_field_value(user_key, FieldType::Key)?;
}
}
Ok(())
}
fn write_filter_expression(&mut self, filter: &FilterExpression, size: usize) -> Result<()> {
self.write_field_header(size, FieldType::FilterExp)?;
filter.pack(&mut Some(self))?;
Ok(())
}
fn write_field_header(&mut self, size: usize, ftype: FieldType) -> Result<()> {
self.write_i32(size as i32 + 1)?;
self.write_u8(ftype as u8)?;
Ok(())
}
fn write_field_string(&mut self, field: &str, ftype: FieldType) -> Result<()> {
self.write_field_header(field.len(), ftype)?;
self.write_str(field)?;
Ok(())
}
fn write_field_bytes(&mut self, bytes: &[u8], ftype: FieldType) -> Result<()> {
self.write_field_header(bytes.len(), ftype)?;
self.write_bytes(bytes)?;
Ok(())
}
fn write_field_value(&mut self, value: &Value, ftype: FieldType) -> Result<()> {
self.write_field_header(value.estimate_size()? + 1, ftype)?;
self.write_u8(value.particle_type() as u8)?;
value.write_to(self)?;
Ok(())
}
fn write_args(&mut self, args: Option<&[Value]>, ftype: FieldType) -> Result<()> {
if let Some(args) = args {
self.write_field_header(encoder::pack_array(&mut None, args)?, ftype)?;
encoder::pack_array(&mut Some(self), args)?;
} else {
self.write_field_header(encoder::pack_empty_args_array(&mut None)?, ftype)?;
encoder::pack_empty_args_array(&mut Some(self))?;
}
Ok(())
}
fn write_operation_for_bin(&mut self, bin: &Bin, op_type: OperationType) -> Result<()> {
let name_length = bin.name.len();
let value_length = bin.value.estimate_size()?;
self.write_i32((name_length + value_length + 4) as i32)?;
self.write_u8(op_type as u8)?;
self.write_u8(bin.value.particle_type() as u8)?;
self.write_u8(0)?;
self.write_u8(name_length as u8)?;
self.write_str(bin.name)?;
bin.value.write_to(self)?;
Ok(())
}
fn write_operation_for_bin_name(&mut self, name: &str, op_type: OperationType) -> Result<()> {
self.write_i32(name.len() as i32 + 4)?;
self.write_u8(op_type as u8)?;
self.write_u8(0)?;
self.write_u8(0)?;
self.write_u8(name.len() as u8)?;
self.write_str(name)?;
Ok(())
}
fn write_operation_for_operation_type(&mut self, op_type: OperationType) -> Result<()> {
self.write_i32(4)?;
self.write_u8(op_type as u8)?;
self.write_u8(0)?;
self.write_u8(0)?;
self.write_u8(0)?;
Ok(())
}
pub const fn data_offset(&self) -> usize {
self.data_offset
}
pub fn skip_bytes(&mut self, count: usize) {
self.data_offset += count;
}
pub fn skip(&mut self, count: usize) -> Result<()> {
self.data_offset += count;
Ok(())
}
pub fn peek(&self) -> u8 {
self.data_buffer[self.data_offset]
}
#[allow(clippy::option_if_let_else)]
pub fn read_u8(&mut self, pos: Option<usize>) -> Result<u8> {
if let Some(pos) = pos {
Ok(self.data_buffer[pos])
} else {
let res = self.data_buffer[self.data_offset];
self.data_offset += 1;
Ok(res)
}
}
#[allow(clippy::option_if_let_else)]
pub fn read_i8(&mut self, pos: Option<usize>) -> Result<i8> {
if let Some(pos) = pos {
Ok(self.data_buffer[pos] as i8)
} else {
let res = self.data_buffer[self.data_offset] as i8;
self.data_offset += 1;
Ok(res)
}
}
#[allow(clippy::option_if_let_else)]
pub fn read_u16(&mut self, pos: Option<usize>) -> Result<u16> {
let len = 2;
if let Some(pos) = pos {
Ok(NetworkEndian::read_u16(&self.data_buffer[pos..pos + len]))
} else {
let res = NetworkEndian::read_u16(
&self.data_buffer[self.data_offset..self.data_offset + len],
);
self.data_offset += len;
Ok(res)
}
}
pub fn read_i16(&mut self, pos: Option<usize>) -> Result<i16> {
let val = self.read_u16(pos)?;
Ok(val as i16)
}
#[allow(clippy::option_if_let_else)]
pub fn read_u32(&mut self, pos: Option<usize>) -> Result<u32> {
let len = 4;
if let Some(pos) = pos {
Ok(NetworkEndian::read_u32(&self.data_buffer[pos..pos + len]))
} else {
let res = NetworkEndian::read_u32(
&self.data_buffer[self.data_offset..self.data_offset + len],
);
self.data_offset += len;
Ok(res)
}
}
pub fn read_i32(&mut self, pos: Option<usize>) -> Result<i32> {
let val = self.read_u32(pos)?;
Ok(val as i32)
}
#[allow(clippy::option_if_let_else)]
pub fn read_u64(&mut self, pos: Option<usize>) -> Result<u64> {
let len = 8;
if let Some(pos) = pos {
Ok(NetworkEndian::read_u64(&self.data_buffer[pos..pos + len]))
} else {
let res = NetworkEndian::read_u64(
&self.data_buffer[self.data_offset..self.data_offset + len],
);
self.data_offset += len;
Ok(res)
}
}
pub fn read_i64(&mut self, pos: Option<usize>) -> Result<i64> {
let val = self.read_u64(pos)?;
Ok(val as i64)
}
pub fn read_msg_size(&mut self, pos: Option<usize>) -> Result<usize> {
let size = self.read_i64(pos)?;
let size = size & 0xFFFF_FFFF_FFFF;
Ok(size as usize)
}
#[allow(clippy::option_if_let_else)]
pub fn read_f32(&mut self, pos: Option<usize>) -> Result<f32> {
let len = 4;
if let Some(pos) = pos {
Ok(NetworkEndian::read_f32(&self.data_buffer[pos..pos + len]))
} else {
let res = NetworkEndian::read_f32(
&self.data_buffer[self.data_offset..self.data_offset + len],
);
self.data_offset += len;
Ok(res)
}
}
#[allow(clippy::option_if_let_else)]
pub fn read_f64(&mut self, pos: Option<usize>) -> Result<f64> {
let len = 8;
if let Some(pos) = pos {
Ok(NetworkEndian::read_f64(&self.data_buffer[pos..pos + len]))
} else {
let res = NetworkEndian::read_f64(
&self.data_buffer[self.data_offset..self.data_offset + len],
);
self.data_offset += len;
Ok(res)
}
}
pub fn read_str(&mut self, len: usize) -> Result<String> {
let s = str::from_utf8(&self.data_buffer[self.data_offset..self.data_offset + len])?;
self.data_offset += len;
Ok(s.to_owned())
}
pub fn read_bytes(&mut self, pos: usize, count: usize) -> Result<&[u8]> {
Ok(&self.data_buffer[pos..pos + count])
}
pub fn read_slice(&mut self, count: usize) -> Result<&[u8]> {
Ok(&self.data_buffer[self.data_offset..self.data_offset + count])
}
pub fn read_blob(&mut self, len: usize) -> Result<Vec<u8>> {
let val = self.data_buffer[self.data_offset..self.data_offset + len].to_vec();
self.data_offset += len;
Ok(val)
}
pub fn write_u8(&mut self, val: u8) -> Result<usize> {
self.data_buffer[self.data_offset] = val;
self.data_offset += 1;
Ok(1)
}
pub fn write_i8(&mut self, val: i8) -> Result<usize> {
self.data_buffer[self.data_offset] = val as u8;
self.data_offset += 1;
Ok(1)
}
pub fn write_u16(&mut self, val: u16) -> Result<usize> {
NetworkEndian::write_u16(
&mut self.data_buffer[self.data_offset..self.data_offset + 2],
val,
);
self.data_offset += 2;
Ok(2)
}
pub fn write_u16_little_endian(&mut self, val: u16) -> Result<usize> {
LittleEndian::write_u16(
&mut self.data_buffer[self.data_offset..self.data_offset + 2],
val,
);
self.data_offset += 2;
Ok(2)
}
pub fn write_i16(&mut self, val: i16) -> Result<usize> {
self.write_u16(val as u16)
}
pub fn write_u32(&mut self, val: u32) -> Result<usize> {
NetworkEndian::write_u32(
&mut self.data_buffer[self.data_offset..self.data_offset + 4],
val,
);
self.data_offset += 4;
Ok(4)
}
pub fn write_i32(&mut self, val: i32) -> Result<usize> {
self.write_u32(val as u32)
}
pub fn write_u64(&mut self, val: u64) -> Result<usize> {
NetworkEndian::write_u64(
&mut self.data_buffer[self.data_offset..self.data_offset + 8],
val,
);
self.data_offset += 8;
Ok(8)
}
pub fn write_i64(&mut self, val: i64) -> Result<usize> {
self.write_u64(val as u64)
}
pub fn write_bool(&mut self, val: bool) -> Result<usize> {
let val = if val { 1 } else { 0 };
self.write_i64(val)
}
pub fn write_f32(&mut self, val: f32) -> Result<usize> {
NetworkEndian::write_f32(
&mut self.data_buffer[self.data_offset..self.data_offset + 4],
val,
);
self.data_offset += 4;
Ok(4)
}
pub fn write_f64(&mut self, val: f64) -> Result<usize> {
NetworkEndian::write_f64(
&mut self.data_buffer[self.data_offset..self.data_offset + 8],
val,
);
self.data_offset += 8;
Ok(8)
}
pub fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
for b in bytes {
self.write_u8(*b)?;
}
Ok(bytes.len())
}
pub fn write_str(&mut self, val: &str) -> Result<usize> {
self.write_bytes(val.as_bytes())
}
pub fn write_geo(&mut self, value: &str) -> Result<usize> {
self.write_u8(0)?;
self.write_u8(0)?;
self.write_u8(0)?;
self.write_bytes(value.as_bytes())?;
Ok(3 + value.len())
}
pub fn write_timeout(&mut self, val: Option<Duration>) {
if let Some(val) = val {
let millis: i32 = (val.as_secs() * 1_000) as i32 + val.subsec_millis() as i32;
NetworkEndian::write_i32(&mut self.data_buffer[22..22 + 4], millis);
}
}
pub fn dump_buffer(&self) {
println!(">>>>>>>>>>>>>>> {:?}", self.data_buffer.to_vec());
}
}