pub mod batch_executor;
pub mod batch_record;
use crate::commands::buffer::{FIELD_HEADER_SIZE, OPERATION_HEADER_SIZE};
use crate::expressions::Expression;
use crate::msgpack::encoder;
use crate::operations::Operation;
use crate::Bins;
use crate::CommitLevel;
use crate::Expiration;
use crate::GenerationPolicy;
use crate::Key;
use crate::ReadTouchTTL;
use crate::Record;
use crate::RecordExistsAction;
use crate::ResultCode;
use crate::Value;
pub use self::batch_executor::BatchExecutor;
pub use self::batch_record::BatchRecord;
use crate::errors::{Error, Result};
pub struct BatchRecordIndex {
pub batch_index: usize,
pub record: Option<crate::Record>,
pub result_code: ResultCode,
}
#[derive(Debug, Clone, PartialEq)]
pub struct BatchReadPolicy {
pub read_touch_ttl: ReadTouchTTL,
pub filter_expression: Option<Expression>,
}
impl Default for BatchReadPolicy {
fn default() -> Self {
Self {
read_touch_ttl: ReadTouchTTL::ServerDefault,
filter_expression: None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct BatchWritePolicy {
pub record_exists_action: RecordExistsAction,
pub generation_policy: GenerationPolicy,
pub commit_level: CommitLevel,
pub generation: u32,
pub expiration: Expiration,
pub send_key: bool,
pub durable_delete: bool,
pub filter_expression: Option<Expression>,
}
impl Default for BatchWritePolicy {
fn default() -> Self {
Self {
record_exists_action: RecordExistsAction::Update,
generation_policy: GenerationPolicy::None,
commit_level: CommitLevel::CommitAll,
generation: 0,
expiration: Expiration::NamespaceDefault,
send_key: false,
durable_delete: false,
filter_expression: None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct BatchDeletePolicy {
pub generation_policy: GenerationPolicy,
pub commit_level: CommitLevel,
pub generation: u32,
pub send_key: bool,
pub durable_delete: bool,
pub filter_expression: Option<Expression>,
}
impl Default for BatchDeletePolicy {
fn default() -> Self {
Self {
generation_policy: GenerationPolicy::None,
commit_level: CommitLevel::CommitAll,
generation: 0,
send_key: false,
durable_delete: false,
filter_expression: None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct BatchUDFPolicy {
pub commit_level: CommitLevel,
pub expiration: Expiration,
pub send_key: bool,
pub durable_delete: bool,
pub filter_expression: Option<Expression>,
}
impl Default for BatchUDFPolicy {
fn default() -> Self {
Self {
commit_level: CommitLevel::CommitAll,
expiration: Expiration::NamespaceDefault,
send_key: false,
durable_delete: false,
filter_expression: None,
}
}
}
#[derive(Clone, Debug)]
pub enum BatchOperation {
#[doc(hidden)]
Read {
br: BatchRecord,
policy: BatchReadPolicy,
bins: Bins,
ops: Option<Vec<Operation>>,
},
#[doc(hidden)]
Write {
br: BatchRecord,
policy: BatchWritePolicy,
ops: Vec<Operation>,
},
#[doc(hidden)]
Delete {
br: BatchRecord,
policy: BatchDeletePolicy,
},
#[doc(hidden)]
UDF {
br: BatchRecord,
policy: BatchUDFPolicy,
udf_name: String,
function_name: String,
args: Option<Vec<Value>>,
},
}
impl BatchOperation {
pub fn read(policy: &BatchReadPolicy, key: Key, bins: Bins) -> Self {
BatchOperation::Read {
br: BatchRecord::new(key, false),
policy: policy.clone(),
bins,
ops: None,
}
}
pub fn read_ops(policy: &BatchReadPolicy, key: Key, ops: Vec<Operation>) -> Self {
BatchOperation::Read {
br: BatchRecord::new(key, false),
policy: policy.clone(),
bins: Bins::None,
ops: Some(ops),
}
}
pub fn write(policy: &BatchWritePolicy, key: Key, ops: Vec<Operation>) -> Self {
BatchOperation::Write {
br: BatchRecord::new(key, true),
policy: policy.clone(),
ops,
}
}
pub fn delete(policy: &BatchDeletePolicy, key: Key) -> Self {
BatchOperation::Delete {
br: BatchRecord::new(key, true),
policy: policy.clone(),
}
}
pub fn udf(
policy: &BatchUDFPolicy,
key: Key,
udf_name: &str,
function_name: &str,
args: Option<Vec<Value>>,
) -> Self {
BatchOperation::UDF {
br: BatchRecord::new(key, true),
policy: policy.clone(),
udf_name: udf_name.into(),
function_name: function_name.into(),
args,
}
}
pub(crate) fn size(&self, parent_fe: Option<&Expression>) -> Result<usize> {
match self {
Self::Read {
policy, bins, ops, ..
} => {
let mut size: usize = 0;
match (&policy.filter_expression, parent_fe) {
(Some(fe), _) => {
size += fe.size()? + FIELD_HEADER_SIZE as usize;
}
(_, Some(pfe)) => {
size += pfe.size()? + FIELD_HEADER_SIZE as usize;
}
_ => (),
}
if let Bins::Some(bin_names) = bins {
for bin in bin_names {
size += bin.len() + OPERATION_HEADER_SIZE as usize;
}
}
if let Some(ops) = ops {
for op in ops {
if op.is_write() {
return Err(Error::ClientError(
"Write operations not allowed in batch read".into(),
));
}
size += op.estimate_size()? + 8;
}
}
Ok(size)
}
Self::Write {
br, policy, ops, ..
} => {
let mut size: usize = 2;
match (&policy.filter_expression, parent_fe) {
(Some(fe), _) => {
size += fe.size()? + FIELD_HEADER_SIZE as usize;
}
(_, Some(pfe)) => {
size += pfe.size()? + FIELD_HEADER_SIZE as usize;
}
_ => (),
}
if policy.send_key && br.key.has_value_to_send() {
if let Some(ref user_key) = br.key.user_key {
size += user_key.estimate_size()? + FIELD_HEADER_SIZE as usize + 1;
}
}
let mut has_write = false;
for op in ops {
if op.is_write() {
has_write = true;
}
size += op.estimate_size()? + 8;
}
if !has_write {
return Err(Error::ClientError(
"Batch write operations do not contain a write".into(),
));
}
Ok(size)
}
Self::Delete { br, policy } => {
let mut size: usize = 2;
match (&policy.filter_expression, parent_fe) {
(Some(fe), _) => {
size += fe.size()? + FIELD_HEADER_SIZE as usize;
}
(_, Some(pfe)) => {
size += pfe.size()? + FIELD_HEADER_SIZE as usize;
}
_ => (),
}
if policy.send_key && br.key.has_value_to_send() {
if let Some(ref user_key) = br.key.user_key {
size += user_key.estimate_size()? + FIELD_HEADER_SIZE as usize + 1;
}
}
Ok(size)
}
Self::UDF {
br,
policy,
udf_name,
function_name,
args,
} => {
let mut size: usize = 2;
match (&policy.filter_expression, parent_fe) {
(Some(fe), _) => {
size += fe.size()? + FIELD_HEADER_SIZE as usize;
}
(_, Some(pfe)) => {
size += pfe.size()? + FIELD_HEADER_SIZE as usize;
}
_ => (),
}
if policy.send_key && br.key.has_value_to_send() {
if let Some(ref user_key) = br.key.user_key {
size += user_key.estimate_size()? + FIELD_HEADER_SIZE as usize + 1;
}
}
size += udf_name.len() + FIELD_HEADER_SIZE as usize;
size += function_name.len() + FIELD_HEADER_SIZE as usize;
if let Some(args) = args {
size += encoder::pack_array(&mut None, args)? + FIELD_HEADER_SIZE as usize;
} else {
size += encoder::pack_empty_args_array(&mut None) + FIELD_HEADER_SIZE as usize;
}
Ok(size)
}
}
}
pub(crate) const fn match_header(&self, _prev: Option<&BatchOperation>) -> bool {
false
}
pub(crate) fn key(&self) -> Key {
match self {
Self::Read { br, .. }
| Self::Write { br, .. }
| Self::Delete { br, .. }
| Self::UDF { br, .. } => br.key.clone(),
}
}
pub fn batch_record(&self) -> BatchRecord {
match self {
Self::Read { br, .. }
| Self::Write { br, .. }
| Self::Delete { br, .. }
| Self::UDF { br, .. } => br.clone(),
}
}
pub(crate) fn set_record(&mut self, record: Option<Record>) {
match self {
Self::Read { br, .. }
| Self::Write { br, .. }
| Self::Delete { br, .. }
| Self::UDF { br, .. } => {
br.record = record;
br.result_code = Some(ResultCode::Ok);
}
}
}
pub(crate) const fn set_result_code(&mut self, rc: ResultCode, in_doubt: bool) {
match self {
Self::Read { br, .. } => {
br.result_code = Some(rc);
br.in_doubt = false;
}
Self::Write { br, .. } | Self::Delete { br, .. } | Self::UDF { br, .. } => {
br.result_code = Some(rc);
br.in_doubt = in_doubt;
}
}
}
}