use bincode::config::Configuration;
#[cfg(feature = "atomic")]
use crate::traits::AtomicStorage;
use crate::{
wrap::{encode_value, wrap, Subtable, WrapPrelude},
Database, DatabaseEntry, DatabaseError, DeriveKey, Incrementable, KeyBytes, Manifest,
Manifests, RecordKey, SerializationError, Storage,
};
type Write = (Vec<u8>, Vec<u8>);
pub struct DatabaseTransaction<Manifest> {
pending_writes: Vec<Write>,
pending_deletes: Vec<Vec<u8>>,
serialization_config: Configuration,
_marker: std::marker::PhantomData<Manifest>,
}
impl<M: Manifest> DatabaseTransaction<M> {
pub fn new<S: Storage>(database: &Database<S, M>) -> Self {
Self {
pending_writes: Vec::new(),
pending_deletes: Vec::new(),
serialization_config: database.serialization_config(),
_marker: std::marker::PhantomData,
}
}
pub fn new_with_serialization_config(serialization_config: Configuration) -> Self {
Self {
pending_writes: Vec::new(),
pending_deletes: Vec::new(),
serialization_config,
_marker: std::marker::PhantomData,
}
}
pub fn insert<K: RecordKey<Record = R>, R>(
&mut self,
record: R,
) -> Result<K, SerializationError>
where
R: DeriveKey<Key = K> + DatabaseEntry<Key = K>,
M: Manifests<R>,
{
let original_key = R::key(&record);
let writes = self.prepare_writes::<R>(&record, &original_key)?;
self.pending_writes.extend(writes);
Ok(original_key)
}
pub fn put<S: Storage, R: DatabaseEntry>(
&mut self,
record: R,
database: &mut Database<S, M>,
) -> Result<R::Key, DatabaseError<S::StoreError>>
where
R::Key: RecordKey<Record = R> + Incrementable + Ord,
M: Manifests<R>,
{
let last_key = database.manifest.last();
let new_key = last_key
.as_ref()
.map(|k| k.next_id().unwrap())
.unwrap_or_default();
let writes = self.prepare_writes::<R>(&record, &new_key)?;
self.pending_writes.extend(writes);
last_key.replace(new_key.clone());
Ok(new_key)
}
pub fn write(&mut self, key: Vec<u8>, value: Vec<u8>) {
self.pending_writes.retain(|(k, _)| k != &key);
self.pending_deletes.retain(|k| k != &key);
self.pending_writes.push((key, value));
}
pub fn delete(&mut self, key: Vec<u8>) {
if !self.pending_writes.iter().any(|(k, _)| k == &key) {
self.pending_deletes.push(key);
}
}
pub fn is_empty(&self) -> bool {
self.pending_writes.is_empty() && self.pending_deletes.is_empty()
}
pub fn write_count(&self) -> usize {
self.pending_writes.len()
}
pub fn delete_count(&self) -> usize {
self.pending_deletes.len()
}
pub fn pending_writes(&self) -> impl Iterator<Item = &Write> {
self.pending_writes.iter()
}
pub fn pending_deletes(&self) -> impl Iterator<Item = &Vec<u8>> {
self.pending_deletes.iter()
}
fn serialization_config(&self) -> Configuration {
self.serialization_config
}
#[cfg(feature = "atomic")]
pub fn commit<S: AtomicStorage>(
self,
storage: &mut S,
) -> Result<Vec<Option<Vec<u8>>>, DatabaseError<S::StoreError>> {
if self.is_empty() {
return Ok(Vec::new());
}
let inserts: Vec<Write> = self.pending_writes.into_iter().collect();
let removes: Vec<Vec<u8>> = self.pending_deletes.into_iter().collect();
storage
.batch_mixed(inserts, removes)
.map_err(DatabaseError::Storage)
}
pub fn rollback(self) {
drop(self);
}
pub fn consume(self) -> (impl Iterator<Item = Write>, impl Iterator<Item = Vec<u8>>) {
(
self.pending_writes.into_iter(),
self.pending_deletes.into_iter(),
)
}
fn prepare_writes<R: DatabaseEntry>(
&self,
record: &R,
key: &R::Key,
) -> Result<Vec<Write>, SerializationError>
where
R::Key: RecordKey<Record = R>,
{
let index_keys = record.index_keys();
let mut writes = Vec::with_capacity(1 + index_keys.len());
for (discriminator, index_key) in record.index_keys() {
let mut entry = WrapPrelude::new::<R>(Subtable::Index(discriminator))
.to_bytes(self.serialization_config());
entry.extend_from_slice(&index_key.to_bytes(self.serialization_config()));
let key_bytes = key.to_bytes(self.serialization_config());
entry.extend_from_slice(&key_bytes);
writes.push((entry.clone(), key_bytes.clone()));
}
let key = wrap::<R>(key, self.serialization_config())?;
let value = encode_value(record, self.serialization_config())?;
writes.push((key, value));
Ok(writes)
}
}