use crate::batch::BatchDeletePolicy;
use crate::batch::BatchReadPolicy;
use crate::batch::BatchUDFPolicy;
use crate::batch::BatchWritePolicy;
use crate::commands::buffer;
use crate::expressions::Expression;
use crate::operations::{Operation, OperationBin, OperationType};
use crate::policy::BatchPolicy;
use crate::CommitLevel;
use crate::GenerationPolicy;
use crate::RecordExistsAction;
#[derive(Default)]
pub struct BatchAttr {
pub(crate) filter_expression: Option<Expression>,
pub(crate) read_attr: u8,
pub(crate) write_attr: u8,
pub(crate) info_attr: u8,
pub(crate) txn_attr: u8,
pub(crate) expiration: u32,
pub(crate) generation: u32,
pub(crate) has_write: bool,
pub(crate) send_key: bool,
}
impl BatchAttr {
#[allow(dead_code)]
pub(crate) fn set_read(&mut self, rp: &BatchPolicy) {
self.filter_expression = None;
self.read_attr = buffer::INFO1_READ;
self.write_attr = 0;
self.txn_attr = 0;
self.expiration = rp.base_policy.read_touch_ttl.into();
self.generation = 0;
self.has_write = false;
self.send_key = false;
}
pub(crate) fn set_batch_read(&mut self, rp: &BatchReadPolicy, parent: &BatchPolicy) {
self.filter_expression = rp
.filter_expression
.clone()
.or_else(|| parent.filter_expression.clone());
self.read_attr = buffer::INFO1_READ;
self.write_attr = 0;
self.txn_attr = 0;
self.expiration = rp.read_touch_ttl.into();
self.generation = 0;
self.has_write = false;
self.send_key = false;
}
pub(crate) fn adjust_read(&mut self, ops: &Vec<Operation>) {
for op in ops {
if matches!(op.op, OperationType::Read) {
match op.bin {
OperationBin::All => {
self.read_attr |= buffer::INFO1_GET_ALL;
}
OperationBin::None => {
self.read_attr |= buffer::INFO1_NOBINDATA;
}
OperationBin::Name(_) => (),
}
}
}
}
pub(crate) const fn adjust_read_for_all_bins(&mut self, read_all_bins: bool) {
if read_all_bins {
self.read_attr |= buffer::INFO1_GET_ALL;
} else {
self.read_attr |= buffer::INFO1_NOBINDATA;
}
}
pub(crate) fn set_batch_write(&mut self, wp: &BatchWritePolicy, parent: &BatchPolicy) {
self.filter_expression = wp
.filter_expression
.clone()
.or_else(|| parent.filter_expression.clone());
self.read_attr = 0;
self.write_attr = buffer::INFO2_WRITE | buffer::INFO2_RESPOND_ALL_OPS;
self.info_attr = 0;
self.txn_attr = 0;
self.expiration = wp.expiration.into();
self.has_write = true;
self.send_key = wp.send_key;
match wp.generation_policy {
GenerationPolicy::None => {
self.generation = 0;
}
GenerationPolicy::ExpectGenEqual => {
self.generation = wp.generation;
self.write_attr |= buffer::INFO2_GENERATION;
}
GenerationPolicy::ExpectGenGreater => {
self.generation = wp.generation;
self.write_attr |= buffer::INFO2_GENERATION_GT;
}
}
match wp.record_exists_action {
RecordExistsAction::Update => (),
RecordExistsAction::UpdateOnly => self.info_attr |= buffer::INFO3_UPDATE_ONLY,
RecordExistsAction::Replace => self.info_attr |= buffer::INFO3_CREATE_OR_REPLACE,
RecordExistsAction::ReplaceOnly => self.info_attr |= buffer::INFO3_REPLACE_ONLY,
RecordExistsAction::CreateOnly => self.write_attr |= buffer::INFO2_CREATE_ONLY,
}
if wp.durable_delete {
self.write_attr |= buffer::INFO2_DURABLE_DELETE;
}
if wp.commit_level == CommitLevel::CommitMaster {
self.info_attr |= buffer::INFO3_COMMIT_MASTER;
}
}
pub(crate) fn adjust_write(&mut self, ops: &Vec<Operation>) {
let mut read_all_bins = false;
let mut read_header = false;
let mut has_read = false;
for op in ops {
match op.op {
OperationType::BitRead
| OperationType::ExpRead
| OperationType::HllRead
| OperationType::CdtRead
| OperationType::Read => {
match op.bin {
OperationBin::All => {
read_all_bins = true;
}
OperationBin::None => {
read_header = true;
}
OperationBin::Name(_) => (),
}
has_read = true;
}
_ => (),
}
}
if has_read {
self.read_attr |= buffer::INFO1_READ;
if read_all_bins {
self.read_attr |= buffer::INFO1_GET_ALL;
} else if read_header {
self.read_attr |= buffer::INFO1_NOBINDATA;
}
}
}
pub(crate) fn set_batch_udf(&mut self, up: &BatchUDFPolicy, parent: &BatchPolicy) {
self.filter_expression = up
.filter_expression
.clone()
.or_else(|| parent.filter_expression.clone());
self.read_attr = 0;
self.write_attr = buffer::INFO2_WRITE;
self.info_attr = 0;
self.txn_attr = 0;
self.expiration = up.expiration.into();
self.generation = 0;
self.has_write = true;
self.send_key = up.send_key;
if up.durable_delete {
self.write_attr |= buffer::INFO2_DURABLE_DELETE;
}
if up.commit_level == CommitLevel::CommitMaster {
self.info_attr |= buffer::INFO3_COMMIT_MASTER;
}
}
pub(crate) fn set_batch_delete(&mut self, dp: &BatchDeletePolicy, parent: &BatchPolicy) {
self.filter_expression = dp
.filter_expression
.clone()
.or_else(|| parent.filter_expression.clone());
self.read_attr = 0;
self.write_attr =
buffer::INFO2_WRITE | buffer::INFO2_RESPOND_ALL_OPS | buffer::INFO2_DELETE;
self.info_attr = 0;
self.txn_attr = 0;
self.expiration = 0;
self.has_write = true;
self.send_key = dp.send_key;
match dp.generation_policy {
GenerationPolicy::None => {
self.generation = 0;
}
GenerationPolicy::ExpectGenEqual => {
self.generation = dp.generation;
self.write_attr |= buffer::INFO2_GENERATION;
}
GenerationPolicy::ExpectGenGreater => {
self.generation = dp.generation;
self.write_attr |= buffer::INFO2_GENERATION_GT;
}
}
if dp.durable_delete {
self.write_attr |= buffer::INFO2_DURABLE_DELETE;
}
if dp.commit_level == CommitLevel::CommitMaster {
self.info_attr |= buffer::INFO3_COMMIT_MASTER;
}
}
}