use crate::{
bucket::DEFAULT_BUCKET_NAME,
db::Db,
error::Result,
options::WriteOptions,
types::{CommitInfo, KeyRange, Sequence, Value},
write_batch::WriteBatch,
};
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TransactionOptions {
pub write_options: WriteOptions,
}
#[derive(Debug, Clone)]
pub struct Transaction {
db: Db,
read_sequence: Sequence,
options: TransactionOptions,
writes: WriteBatch,
point_reads: Vec<ReadKey>,
range_reads: Vec<ReadRange>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ReadKey {
pub(crate) bucket: String,
pub(crate) key: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ReadRange {
pub(crate) bucket: String,
pub(crate) range: KeyRange,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct TransactionReadSet {
pub(crate) point_reads: Vec<ReadKey>,
pub(crate) range_reads: Vec<ReadRange>,
}
impl Transaction {
#[must_use]
pub(crate) fn new(db: Db, read_sequence: Sequence, options: TransactionOptions) -> Self {
Self {
db,
read_sequence,
options,
writes: WriteBatch::new(),
point_reads: Vec::new(),
range_reads: Vec::new(),
}
}
#[must_use]
pub const fn read_sequence(&self) -> Sequence {
self.read_sequence
}
#[must_use]
pub const fn options(&self) -> TransactionOptions {
self.options
}
pub fn get_sync(&mut self, key: &[u8]) -> Result<Option<Value>> {
self.get_bucket_sync(DEFAULT_BUCKET_NAME, key)
}
pub fn get_bucket_sync(
&mut self,
bucket: impl Into<String>,
key: &[u8],
) -> Result<Option<Value>> {
let bucket = bucket.into();
let value = self.db.get_at_sequence(&bucket, key, self.read_sequence)?;
self.point_reads.push(ReadKey {
bucket,
key: key.to_vec(),
});
Ok(value)
}
pub fn read_range_sync(&mut self, range: KeyRange) -> Result<()> {
self.read_range_bucket_sync(DEFAULT_BUCKET_NAME, range)
}
pub fn read_range_bucket_sync(
&mut self,
bucket: impl Into<String>,
range: KeyRange,
) -> Result<()> {
self.db.ensure_open()?;
let bucket = bucket.into();
let iter = self.db.range_at_sequence(
&bucket,
&range,
self.read_sequence,
crate::Direction::Forward,
)?;
for item in iter {
item?;
}
self.range_reads.push(ReadRange { bucket, range });
Ok(())
}
pub fn put(&mut self, key: impl Into<Vec<u8>>, value: impl Into<Value>) {
self.writes.put(key, value);
}
pub fn put_bucket(
&mut self,
bucket: impl Into<String>,
key: impl Into<Vec<u8>>,
value: impl Into<Value>,
) -> Result<()> {
self.writes.put_bucket(bucket, key, value)
}
pub fn delete(&mut self, key: impl Into<Vec<u8>>) {
self.writes.delete(key);
}
pub fn delete_bucket(
&mut self,
bucket: impl Into<String>,
key: impl Into<Vec<u8>>,
) -> Result<()> {
self.writes.delete_bucket(bucket, key)
}
pub fn delete_range(&mut self, range: KeyRange) {
self.writes.delete_range(range);
}
pub fn delete_range_bucket(
&mut self,
bucket: impl Into<String>,
range: KeyRange,
) -> Result<()> {
self.writes.delete_range_bucket(bucket, range)
}
pub fn commit_sync(self) -> Result<CommitInfo> {
let read_set = TransactionReadSet {
point_reads: self.point_reads,
range_reads: self.range_reads,
};
self.db.commit_transaction(
self.read_sequence,
read_set,
self.writes,
self.options.write_options,
)
}
}
#[allow(clippy::unused_async)]
impl Transaction {
pub async fn get(&mut self, key: &[u8]) -> Result<Option<Value>> {
self.get_bucket(DEFAULT_BUCKET_NAME, key).await
}
pub async fn get_bucket(
&mut self,
bucket: impl Into<String>,
key: &[u8],
) -> Result<Option<Value>> {
let bucket = bucket.into();
let value = self
.db
.get_at_sequence_async(&bucket, key, self.read_sequence)
.await?;
self.point_reads.push(ReadKey {
bucket,
key: key.to_vec(),
});
Ok(value)
}
pub async fn read_range(&mut self, range: KeyRange) -> Result<()> {
self.read_range_bucket(DEFAULT_BUCKET_NAME, range).await
}
pub async fn read_range_bucket(
&mut self,
bucket: impl Into<String>,
range: KeyRange,
) -> Result<()> {
self.db.ensure_open()?;
let bucket = bucket.into();
let mut iter = self
.db
.range_at_sequence_async(
&bucket,
&range,
self.read_sequence,
crate::Direction::Forward,
)
.await?;
while iter.next().await?.is_some() {}
self.range_reads.push(ReadRange { bucket, range });
Ok(())
}
pub async fn commit(self) -> Result<CommitInfo> {
let read_set = TransactionReadSet {
point_reads: self.point_reads,
range_reads: self.range_reads,
};
self.db
.commit_transaction_async(
self.read_sequence,
read_set,
self.writes,
self.options.write_options,
)
.await
}
}