use crate::SkipList;
use crate::arc::ArcManager;
use crate::error::{FluxError, PersistenceError};
use crate::mem::{EvictionPolicy, MemSize};
use crate::persistence::{DurabilityLevel, PersistenceEngine, PersistenceOptions};
use crate::transaction::Transaction;
use futures::stream::StreamExt;
use log::error;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::future::Future;
use std::hash::Hash;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
/// Transaction isolation level handed to the transaction manager at commit
/// time. `Serializable` is the default.
///
/// The manual `Default` impl was replaced with `#[derive(Default)]` plus the
/// `#[default]` variant attribute (stable since Rust 1.62; this file already
/// relies on newer std APIs such as `BTreeMap::pop_first`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    #[default]
    Serializable,
    Snapshot,
}
/// Configuration for the background auto-vacuum thread.
#[derive(Debug, Clone, Copy)]
pub struct VacuumOptions {
    /// How long the vacuum thread sleeps between vacuum passes.
    pub interval: Duration,
}
/// Snapshot of the store's memory accounting, as returned by
/// `Database::memory_info`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemoryInfo {
    /// Configured memory ceiling in bytes; `None` when no limit was set.
    pub max_bytes: Option<u64>,
    /// Bytes currently accounted for by the store.
    pub current_bytes: u64,
}
/// Per-key bookkeeping exposed by `Handle::scan_metadata`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyMetadata<K> {
    /// The key itself.
    pub key: K,
    /// Logical timestamp of the most recent access — presumably driven by the
    /// shared `access_clock` counter set up in `build_internal`; TODO confirm.
    pub last_accessed: u64,
    /// Number of times the key has been accessed.
    pub access_count: u64,
    /// Accounted size of the entry in bytes.
    pub size_bytes: u64,
}
/// Marker trait for the typestate parameter of `DatabaseBuilder`.
pub trait BuilderState {}
/// Initial state: no durability mode has been chosen yet.
#[derive(Debug, Default)]
pub struct Initial;
impl BuilderState for Initial {}
/// Relaxed durability chosen but no flush trigger configured yet; `build`
/// is unavailable until at least one trigger is set.
#[derive(Debug, Default)]
pub struct RelaxedDurability;
impl BuilderState for RelaxedDurability {}
/// Terminal state in which `build` may be called.
#[derive(Debug, Default)]
pub struct Buildable;
impl BuilderState for Buildable {}
/// Typestate builder for `Database`. The `S` parameter tracks which
/// configuration steps have been taken (`Initial`, `RelaxedDurability`,
/// `Buildable`) so that invalid combinations fail to compile.
pub struct DatabaseBuilder<K, V, S: BuilderState = Initial> {
    /// Optional background auto-vacuum configuration.
    vacuum: Option<VacuumOptions>,
    /// Persistence settings; `Some` once a durability mode has been chosen.
    persistence_options: Option<PersistenceOptions>,
    /// `true` for `DurabilityLevel::Full`, `false` otherwise.
    is_full_durability: bool,
    // Relaxed-durability flush triggers (any combination may be set).
    flush_interval: Option<Duration>,
    flush_after_n_commits: Option<usize>,
    flush_after_m_bytes: Option<u64>,
    /// Memory ceiling required by the automatic eviction policies.
    max_memory_bytes: Option<u64>,
    eviction_policy: EvictionPolicy,
    /// Time-to-live used by `EvictionPolicy::Ttl`.
    ttl: Option<Duration>,
    /// Skiplist level-promotion probability override.
    p_factor: Option<f64>,
    isolation_level: IsolationLevel,
    /// Carries `K`, `V` and the typestate without storing values.
    _phantom: PhantomData<(K, V, S)>,
}
impl<K, V, S: BuilderState> DatabaseBuilder<K, V, S> {
    /// Schedules automatic vacuuming with the given configuration.
    pub fn auto_vacuum(self, vacuum: VacuumOptions) -> Self {
        Self {
            vacuum: Some(vacuum),
            ..self
        }
    }

    /// Sets the memory ceiling used by automatic eviction.
    pub fn max_memory(self, bytes: u64) -> Self {
        Self {
            max_memory_bytes: Some(bytes),
            ..self
        }
    }

    /// Chooses how eviction victims are selected under memory pressure.
    pub fn eviction_policy(self, policy: EvictionPolicy) -> Self {
        Self {
            eviction_policy: policy,
            ..self
        }
    }

    /// Sets the time-to-live used by the TTL eviction policy.
    pub fn eviction_ttl(self, ttl: Duration) -> Self {
        Self {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Overrides the skiplist's level-promotion probability.
    pub fn skiplist_p(self, p: f64) -> Self {
        Self {
            p_factor: Some(p),
            ..self
        }
    }

    /// Sets the isolation level applied to every commit.
    pub fn isolation_level(self, level: IsolationLevel) -> Self {
        Self {
            isolation_level: level,
            ..self
        }
    }

    /// Re-tags the builder with a new typestate, copying every field.
    /// (`..self` cannot be used here because the target is a different type.)
    fn transition<NewState: BuilderState>(self) -> DatabaseBuilder<K, V, NewState> {
        DatabaseBuilder {
            vacuum: self.vacuum,
            persistence_options: self.persistence_options,
            is_full_durability: self.is_full_durability,
            flush_interval: self.flush_interval,
            flush_after_n_commits: self.flush_after_n_commits,
            flush_after_m_bytes: self.flush_after_m_bytes,
            max_memory_bytes: self.max_memory_bytes,
            eviction_policy: self.eviction_policy,
            ttl: self.ttl,
            p_factor: self.p_factor,
            isolation_level: self.isolation_level,
            _phantom: PhantomData,
        }
    }
}
impl<K, V> DatabaseBuilder<K, V, Initial>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + for<'de> Deserialize<'de>
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + for<'de> Deserialize<'de> + MemSize,
{
    /// Selects full durability and moves straight to the `Buildable` state.
    pub fn durability_full(self, options: PersistenceOptions) -> DatabaseBuilder<K, V, Buildable> {
        Self {
            persistence_options: Some(options),
            is_full_durability: true,
            ..self
        }
        .transition()
    }

    /// Selects relaxed durability; a flush trigger must still be configured
    /// before `build` becomes available.
    pub fn durability_relaxed(
        self,
        options: PersistenceOptions,
    ) -> DatabaseBuilder<K, V, RelaxedDurability> {
        Self {
            persistence_options: Some(options),
            is_full_durability: false,
            ..self
        }
        .transition()
    }

    /// Builds a purely in-memory database (no persistence).
    pub async fn build(self) -> Result<Database<K, V>, FluxError> {
        build_internal(self, DurabilityLevel::InMemory).await
    }
}
impl<K, V> DatabaseBuilder<K, V, RelaxedDurability> {
    /// Flush at least every `interval`; unlocks `build`.
    pub fn flush_interval(self, interval: Duration) -> DatabaseBuilder<K, V, Buildable> {
        Self {
            flush_interval: Some(interval),
            ..self
        }
        .transition()
    }

    /// Flush after every `n` commits; unlocks `build`.
    pub fn flush_after_commits(self, n: usize) -> DatabaseBuilder<K, V, Buildable> {
        Self {
            flush_after_n_commits: Some(n),
            ..self
        }
        .transition()
    }

    /// Flush once `m` bytes of log data have accumulated; unlocks `build`.
    pub fn flush_after_bytes(self, m: u64) -> DatabaseBuilder<K, V, Buildable> {
        Self {
            flush_after_m_bytes: Some(m),
            ..self
        }
        .transition()
    }
}
impl<K, V> DatabaseBuilder<K, V, Buildable>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + for<'de> Deserialize<'de>
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + for<'de> Deserialize<'de> + MemSize,
{
    /// Adjusts the flush interval after the builder is already buildable.
    pub fn flush_interval(mut self, interval: Duration) -> Self {
        self.flush_interval = Some(interval);
        self
    }
    /// Adjusts the flush-after-commits trigger.
    pub fn flush_after_commits(mut self, n: usize) -> Self {
        self.flush_after_n_commits = Some(n);
        self
    }
    /// Adjusts the flush-after-bytes trigger.
    pub fn flush_after_bytes(mut self, m: u64) -> Self {
        self.flush_after_m_bytes = Some(m);
        self
    }
    /// Builds the database with the configured durability mode.
    pub async fn build(mut self) -> Result<Database<K, V>, FluxError> {
        // The typestate guarantees `durability_full`/`durability_relaxed`
        // ran, so the options are always present here. `take` + `expect`
        // documents that invariant and also avoids the previous needless
        // clone of `PersistenceOptions` (build_internal never reads the
        // field again).
        let options = self
            .persistence_options
            .take()
            .expect("Buildable typestate implies persistence options were set");
        let durability = if self.is_full_durability {
            DurabilityLevel::Full { options }
        } else {
            DurabilityLevel::Relaxed {
                options,
                flush_interval_ms: self.flush_interval.map(|d| d.as_millis() as u64),
                flush_after_n_commits: self.flush_after_n_commits,
                flush_after_m_bytes: self.flush_after_m_bytes,
            }
        };
        build_internal(self, durability).await
    }
}
/// Shared construction path for every builder state: validates the eviction
/// configuration, sets up optional persistence (with recovery), the shared
/// memory/access counters, the optional auto-vacuum thread, and the optional
/// ARC manager, then assembles the `Database`.
async fn build_internal<K, V, S: BuilderState>(
    builder: DatabaseBuilder<K, V, S>,
    durability: DurabilityLevel,
) -> Result<Database<K, V>, FluxError>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + for<'de> Deserialize<'de>
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + for<'de> Deserialize<'de> + MemSize,
{
    // TTL eviction cannot work without a TTL duration.
    if builder.eviction_policy == EvictionPolicy::Ttl && builder.ttl.is_none() {
        return Err(FluxError::Configuration(
            "The TTL eviction policy requires a TTL duration to be set using .eviction_ttl()."
                .to_string(),
        ));
    }
    // Every non-manual policy needs a ceiling to evict against.
    if builder.eviction_policy != EvictionPolicy::Manual && builder.max_memory_bytes.is_none() {
        return Err(FluxError::Configuration(
            "An automatic eviction policy requires max_memory to be set.".to_string(),
        ));
    }
    // Slot the persistence layer can fill with an unrecoverable error;
    // handles check it before every operation.
    let fatal_error = Arc::new(Mutex::new(None));
    let persistence_engine = PersistenceEngine::new(durability, fatal_error.clone())?.map(Arc::new);
    // Shared counters: live byte accounting plus a logical access clock.
    let current_memory_bytes = Arc::new(AtomicU64::new(0));
    let access_clock = Arc::new(AtomicU64::new(0));
    // With persistence, the skiplist is rebuilt from the recovered state;
    // otherwise a fresh list is created (optionally with a custom p factor).
    let skiplist = if let Some(engine) = &persistence_engine {
        Arc::new(
            engine
                .recover(
                    current_memory_bytes.clone(),
                    access_clock.clone(),
                    builder.p_factor,
                )
                .await?,
        )
    } else {
        let list = match builder.p_factor {
            Some(p) => SkipList::with_p(p, current_memory_bytes.clone(), access_clock.clone()),
            None => SkipList::new(current_memory_bytes.clone(), access_clock.clone()),
        };
        Arc::new(list)
    };
    let shutdown = Arc::new(AtomicBool::new(false));
    // Optional background vacuum thread; it runs its own single-threaded
    // tokio runtime so it can drive the async `vacuum` future.
    let vacuum_handle = if let Some(vacuum_opts) = builder.vacuum {
        let skiplist_clone = skiplist.clone();
        let shutdown_clone = shutdown.clone();
        let handle = std::thread::spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            runtime.block_on(async move {
                // NOTE(review): the sleep runs before the shutdown check, so
                // `Database::drop` may block for up to one full interval
                // while joining this thread.
                while !shutdown_clone.load(Ordering::Relaxed) {
                    tokio::time::sleep(vacuum_opts.interval).await;
                    if shutdown_clone.load(Ordering::Relaxed) {
                        break;
                    }
                    if let Err(e) = skiplist_clone.vacuum().await {
                        error!("Automatic vacuuming failed: {:?}", e);
                    }
                }
            });
        });
        Some(handle)
    } else {
        None
    };
    // The ARC policy keeps its own manager sized to the memory ceiling. The
    // `unwrap` is safe: the non-manual-policy check above guarantees
    // `max_memory_bytes` is set when the policy is `Arc`.
    let arc_manager = if builder.eviction_policy == EvictionPolicy::Arc {
        let max_mem = builder.max_memory_bytes.unwrap();
        Some(Arc::new(ArcManager::new(max_mem)))
    } else {
        None
    };
    Ok(Database {
        skiplist,
        persistence_engine,
        _vacuum_handle: vacuum_handle,
        shutdown,
        max_memory_bytes: builder.max_memory_bytes,
        current_memory_bytes,
        eviction_policy: builder.eviction_policy,
        ttl: builder.ttl,
        arc_manager,
        fatal_error,
        isolation_level: builder.isolation_level,
    })
}
/// A concurrent, transactional key-value store backed by a skiplist, with
/// optional WAL persistence, background vacuuming, and memory-bounded
/// eviction. Construct via `Database::builder()` or the `new*` helpers;
/// perform operations through `Database::handle()`.
pub struct Database<K, V>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + DeserializeOwned
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned + MemSize,
{
    // Public only in test builds so tests can reach the underlying list.
    #[cfg(test)]
    pub skiplist: Arc<SkipList<K, V>>,
    #[cfg(not(test))]
    skiplist: Arc<SkipList<K, V>>,
    /// `Some` when a durability mode other than in-memory was configured.
    persistence_engine: Option<Arc<PersistenceEngine<K, V>>>,
    /// Joined (after signalling `shutdown`) in `Drop`.
    _vacuum_handle: Option<JoinHandle<()>>,
    /// Cooperative stop flag for the vacuum thread.
    shutdown: Arc<AtomicBool>,
    max_memory_bytes: Option<u64>,
    /// Live byte accounting shared with the skiplist.
    current_memory_bytes: Arc<AtomicU64>,
    eviction_policy: EvictionPolicy,
    /// TTL used when `eviction_policy == EvictionPolicy::Ttl`.
    ttl: Option<Duration>,
    /// Present only for `EvictionPolicy::Arc`.
    arc_manager: Option<Arc<ArcManager<K>>>,
    /// First unrecoverable persistence error, if any; checked by handles.
    fatal_error: Arc<Mutex<Option<String>>>,
    isolation_level: IsolationLevel,
}
impl<K, V> Drop for Database<K, V>
where
K: Ord
+ Clone
+ Send
+ Sync
+ 'static
+ std::hash::Hash
+ Eq
+ Serialize
+ DeserializeOwned
+ MemSize
+ Borrow<str>,
V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned + MemSize,
{
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
if let Some(handle) = self._vacuum_handle.take() {
if let Err(e) = handle.join() {
error!("Automatic vacuum thread panicked: {:?}", e);
}
}
}
}
impl<K, V> Database<K, V>
where
K: Ord
+ Clone
+ Send
+ Sync
+ 'static
+ std::hash::Hash
+ Eq
+ Serialize
+ for<'de> Deserialize<'de>
+ MemSize
+ Borrow<str>,
V: Clone + Send + Sync + 'static + Serialize + for<'de> Deserialize<'de> + MemSize,
{
/// Evicts entries until memory usage drops back under `max_memory_bytes`.
/// `spare_key` is forwarded to victim selection — presumably so a key that
/// was just inserted is never chosen; confirm in `SkipList::find_victim_keys`.
/// No-op for the manual policy or when no ceiling is configured.
async fn evict_if_needed(&self, spare_key: Option<&K>) {
    if self.eviction_policy == EvictionPolicy::Manual {
        return;
    }
    // Number of victim candidates fetched per pass for the batch policies.
    const EVICTION_BATCH_SIZE: usize = 10;
    if let Some(max_mem) = self.max_memory_bytes {
        while self.current_memory_bytes.load(Ordering::Relaxed) > max_mem {
            // ARC evicts one key at a time via the ARC manager.
            if self.eviction_policy == EvictionPolicy::Arc {
                if self.evict_one(spare_key).await.is_err() {
                    break;
                }
                continue;
            }
            // Other policies operate on batches of candidate keys. The
            // `unwrap` is safe: build_internal rejects the TTL policy
            // without a configured TTL duration.
            let victim_candidates = if self.eviction_policy == EvictionPolicy::Ttl {
                self.skiplist
                    .find_ttl_victim_keys(self.ttl.unwrap(), EVICTION_BATCH_SIZE)
            } else {
                self.skiplist.find_victim_keys(
                    self.eviction_policy,
                    spare_key,
                    EVICTION_BATCH_SIZE,
                )
            };
            if victim_candidates.is_empty() {
                break;
            }
            let mut evicted_in_batch = false;
            for victim_key in victim_candidates {
                if self.skiplist.evict(&victim_key).is_some() {
                    evicted_in_batch = true;
                }
                // Stop mid-batch once we are back under the ceiling.
                if self.current_memory_bytes.load(Ordering::Relaxed) <= max_mem {
                    break;
                }
            }
            // A whole batch that evicted nothing means no progress is
            // possible; bail out instead of spinning.
            if !evicted_in_batch {
                break;
            }
        }
    }
}
/// Evicts a single victim: chosen by the ARC manager when present, by the
/// skiplist's policy-based selection otherwise. Returns `EvictionError`
/// when no victim could be found or the chosen key was already gone.
async fn evict_one(&self, spare_key: Option<&K>) -> Result<(), FluxError> {
    let candidate = match &self.arc_manager {
        Some(manager) => manager.find_victim(),
        None => self
            .skiplist
            .find_victim_keys(self.eviction_policy, spare_key, 1)
            .into_iter()
            .next(),
    };
    match candidate {
        Some(key) if self.skiplist.evict(&key).is_some() => Ok(()),
        _ => Err(FluxError::EvictionError),
    }
}
pub fn builder() -> DatabaseBuilder<K, V, Initial> {
DatabaseBuilder {
vacuum: None,
persistence_options: None,
is_full_durability: false,
flush_interval: None,
flush_after_n_commits: None,
flush_after_m_bytes: None,
max_memory_bytes: None,
eviction_policy: EvictionPolicy::default(),
ttl: None,
p_factor: None,
isolation_level: IsolationLevel::default(),
_phantom: PhantomData,
}
}
pub async fn new_in_memory() -> Result<Self, FluxError> {
Database::builder().build().await
}
/// Builds a database from a ready-made `DurabilityLevel`, dispatching to the
/// equivalent typestate-builder calls. Relaxed durability must carry at
/// least one flush condition or a configuration error is returned.
pub async fn new(config: DurabilityLevel) -> Result<Self, FluxError>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + for<'de> Deserialize<'de>
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + for<'de> Deserialize<'de> + MemSize,
{
    match config {
        DurabilityLevel::InMemory => Database::builder().build().await,
        DurabilityLevel::Full { options } => {
            Database::builder().durability_full(options).build().await
        }
        DurabilityLevel::Relaxed {
            options,
            flush_interval_ms,
            flush_after_n_commits,
            flush_after_m_bytes,
        } => {
            let relaxed_builder = Database::builder().durability_relaxed(options);
            // The first configured trigger moves the builder into the
            // `Buildable` state; any remaining triggers are added through
            // the plain setters on that state.
            if let Some(interval) = flush_interval_ms {
                let mut buildable =
                    relaxed_builder.flush_interval(Duration::from_millis(interval));
                if let Some(n) = flush_after_n_commits {
                    buildable = buildable.flush_after_commits(n);
                }
                if let Some(m) = flush_after_m_bytes {
                    buildable = buildable.flush_after_bytes(m);
                }
                buildable.build().await
            } else if let Some(n) = flush_after_n_commits {
                let mut buildable = relaxed_builder.flush_after_commits(n);
                if let Some(m) = flush_after_m_bytes {
                    buildable = buildable.flush_after_bytes(m);
                }
                buildable.build().await
            } else if let Some(m) = flush_after_m_bytes {
                relaxed_builder.flush_after_bytes(m).build().await
            } else {
                // No trigger at all: the typestate cannot reach `Buildable`.
                Err(FluxError::Configuration("Relaxed durability mode requires at least one flush condition (interval, commits, or bytes).".to_string()))
            }
        }
    }
}
/// Runs a manual vacuum pass, forwarding the skiplist's two counters
/// unchanged (their exact meaning is defined by `SkipList::vacuum`).
pub async fn vacuum(&self) -> Result<(usize, usize), ()> {
    self.skiplist.vacuum().await
}
/// Reports the configured memory ceiling and the current accounted usage.
pub fn memory_info(&self) -> MemoryInfo {
    let current_bytes = self.current_memory_bytes.load(Ordering::Relaxed);
    MemoryInfo {
        max_bytes: self.max_memory_bytes,
        current_bytes,
    }
}
/// Creates a lightweight handle through which all reads, writes, and
/// transactions are performed. Each handle holds at most one active
/// transaction; dropping the handle aborts any transaction still open
/// (see `Drop for Handle`).
pub fn handle(&self) -> Handle<'_, K, V> {
    Handle {
        db: self,
        skiplist: &self.skiplist,
        active_tx: None,
        persistence_engine: &self.persistence_engine,
        arc_manager: &self.arc_manager,
        fatal_error: &self.fatal_error,
    }
}
}
/// A per-caller view of a `Database` holding at most one active transaction.
/// All key-value operations go through a handle; obtain one with
/// `Database::handle`.
pub struct Handle<'db, K, V>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + DeserializeOwned
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned + MemSize,
{
    /// Owning database (consulted for eviction policy and isolation level).
    db: &'db Database<K, V>,
    /// Direct alias of `db`'s skiplist for convenience.
    skiplist: &'db Arc<SkipList<K, V>>,
    // The in-progress transaction, if `begin` was called; public in tests.
    #[cfg(test)]
    pub active_tx: Option<Arc<Transaction<K, V>>>,
    #[cfg(not(test))]
    active_tx: Option<Arc<Transaction<K, V>>>,
    persistence_engine: &'db Option<Arc<PersistenceEngine<K, V>>>,
    arc_manager: &'db Option<Arc<ArcManager<K>>>,
    /// Shared fatal-persistence-error slot, checked before every operation.
    fatal_error: &'db Arc<Mutex<Option<String>>>,
}
impl<'db, K, V> Drop for Handle<'db, K, V>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + Hash
        + Eq
        + Serialize
        + DeserializeOwned
        + MemSize
        + Borrow<str>,
    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned + MemSize,
{
    /// Aborts any transaction that was begun on this handle but never
    /// committed or rolled back.
    fn drop(&mut self) {
        let Some(open_tx) = self.active_tx.take() else {
            return;
        };
        self.skiplist.transaction_manager().abort(&open_tx);
    }
}
impl<'db, K, V> Handle<'db, K, V>
where
K: Ord
+ Clone
+ Send
+ Sync
+ 'static
+ Hash
+ Eq
+ Serialize
+ DeserializeOwned
+ MemSize
+ Borrow<str>,
V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned + MemSize,
{
/// Manually evicts `key` from the in-memory store, returning whether an
/// entry was actually removed. NOTE(review): this only touches the
/// skiplist — no tombstone is logged to the WAL; confirm the intended
/// recovery semantics for persistent configurations.
pub async fn evict(&self, key: &K) -> Result<bool, FluxError> {
    self.check_fatal_error()?;
    // Collapse the redundant `if … { Ok(true) } else { Ok(false) }` and go
    // through the handle's own `skiplist` field like every other method
    // (it aliases `self.db.skiplist`, so behavior is unchanged).
    Ok(self.skiplist.evict(key).is_some())
}
/// Streams per-key bookkeeping (`KeyMetadata`) straight from the skiplist.
/// This read bypasses any active transaction on the handle.
pub fn scan_metadata<'a>(&'a self) -> impl futures::stream::Stream<Item = KeyMetadata<K>> + 'a {
    self.skiplist.scan_metadata()
}
/// Fails fast with `FatalPersistenceError` once the persistence layer has
/// recorded an unrecoverable error; called at the top of every operation.
fn check_fatal_error(&self) -> Result<(), FluxError> {
    let guard = self.fatal_error.lock().unwrap();
    match guard.as_ref() {
        Some(message) => Err(FluxError::FatalPersistenceError(message.clone())),
        None => Ok(()),
    }
}
/// Reads `key`. Inside a transaction the workspace shadows the store
/// (including tombstones, which correctly read back as `None`); otherwise
/// the read runs under a short-lived auto-committed transaction.
pub fn get(&self, key: &K) -> Result<Option<Arc<V>>, FluxError> {
    self.check_fatal_error()?;
    if let Some(active_tx) = &self.active_tx {
        let workspace = active_tx.workspace.read().unwrap();
        // A workspace entry may itself be `None` (a delete staged in this
        // transaction); cloning it returns `None` to the caller as desired.
        if let Some(workspace_value) = workspace.get(key.borrow()) {
            return Ok(workspace_value.clone());
        }
        Ok(self.skiplist.get(key, active_tx))
    } else {
        let tx_manager = self.skiplist.transaction_manager();
        let tx = tx_manager.begin();
        let result = self.skiplist.get(key, &tx);
        // NOTE(review): ARC hits are only recorded on this
        // non-transactional path; reads inside an explicit transaction do
        // not update ARC state.
        if let (Some(manager), Some(_)) = (&self.arc_manager, &result) {
            manager.hit(key);
        }
        tx_manager.commit(&tx, || Ok(()), self.db.isolation_level)?;
        Ok(result)
    }
}
/// Writes `key -> value`. Inside a transaction the write is only staged in
/// the workspace; otherwise it is applied and committed immediately, logged
/// to the persistence engine (when enabled), and followed by eviction or a
/// memory-limit check depending on the eviction policy.
pub async fn insert(&self, key: K, value: V) -> Result<(), FluxError> {
    self.check_fatal_error()?;
    if let Some(active_tx) = &self.active_tx {
        // Transactional path: stage the write; it reaches the skiplist and
        // the log only on `commit`.
        active_tx
            .workspace
            .write()
            .unwrap()
            .insert(key, Some(Arc::new(value)));
        Ok(())
    } else {
        let tx_manager = self.skiplist.transaction_manager();
        let tx = tx_manager.begin();
        let key_for_eviction = key.clone();
        let allocated_size = self
            .skiplist
            .insert(key.clone(), Arc::new(value.clone()), &tx)
            .await;
        // Serialize a one-entry workspace and hand it to the persistence
        // engine before the commit is acknowledged.
        let on_pre_commit = || {
            if let Some(engine) = self.persistence_engine {
                let mut workspace = crate::transaction::Workspace::new();
                workspace.insert(key, Some(Arc::new(value)));
                let mut serialized_data = Vec::new();
                ciborium::into_writer(&workspace, &mut serialized_data).map_err(|e| {
                    FluxError::Persistence(PersistenceError::Serialization(e.to_string()))
                })?;
                engine.log(&serialized_data)?;
            }
            Ok(())
        };
        match tx_manager.commit(&tx, on_pre_commit, self.db.isolation_level) {
            Ok(()) => {
                // Record the new resident entry with the ARC manager.
                if let Some(manager) = &self.arc_manager {
                    manager.miss(key_for_eviction.clone(), allocated_size as usize);
                }
                if self.db.eviction_policy == EvictionPolicy::Manual {
                    // Manual policy: never evict, but surface a hard error
                    // when the configured ceiling is exceeded.
                    // NOTE(review): the insert itself is NOT rolled back in
                    // this case — the value stays in the store.
                    if let Some(max_mem) = self.db.max_memory_bytes {
                        if self.db.current_memory_bytes.load(Ordering::Relaxed) > max_mem {
                            return Err(FluxError::MemoryLimitExceeded);
                        }
                    }
                } else {
                    // Automatic policies evict, sparing the key just written.
                    self.db.evict_if_needed(Some(&key_for_eviction)).await;
                }
                Ok(())
            }
            Err(e) => Err(e),
        }
    }
}
/// Deletes `key`, returning the value that was visible to this handle just
/// before the delete (`None` if the key was absent). Inside a transaction
/// only a tombstone is staged; otherwise the delete is committed and logged
/// immediately.
pub async fn remove(&self, key: &K) -> Result<Option<Arc<V>>, FluxError> {
    self.check_fatal_error()?;
    if let Some(active_tx) = &self.active_tx {
        let mut workspace = active_tx.workspace.write().unwrap();
        // Stage a tombstone; the previous workspace entry tells us what was
        // visible inside this transaction.
        match workspace.insert(key.clone(), None) {
            // The transaction had staged a value: that was the visible one.
            Some(Some(val)) => Ok(Some(val)),
            // The transaction had already deleted the key, so nothing was
            // visible. (Bug fix: this case previously fell through to the
            // skiplist read and resurfaced the committed value after an
            // in-transaction delete, contradicting `get`'s tombstone view.)
            Some(None) => Ok(None),
            // Nothing staged: report the committed value, if any.
            None => Ok(self.skiplist.get(key, active_tx)),
        }
    } else {
        let tx_manager = self.skiplist.transaction_manager();
        let tx = tx_manager.begin();
        let result = self.skiplist.remove(key, &tx).await;
        let key_clone = key.clone();
        // Log the delete (a `None` workspace entry) before the commit is
        // acknowledged.
        let on_pre_commit = || {
            if let Some(engine) = self.persistence_engine {
                let mut workspace: crate::transaction::Workspace<K, V> =
                    crate::transaction::Workspace::new();
                workspace.insert(key_clone, None);
                let mut serialized_data = Vec::new();
                ciborium::into_writer(&workspace, &mut serialized_data).map_err(|e| {
                    FluxError::Persistence(PersistenceError::Serialization(e.to_string()))
                })?;
                engine.log(&serialized_data)?;
            }
            Ok(())
        };
        match tx_manager.commit(&tx, on_pre_commit, self.db.isolation_level) {
            Ok(()) => Ok(result),
            Err(e) => Err(e),
        }
    }
}
/// Returns the `(key, value)` pairs in the range, sorted by key. The
/// workspace overlay treats the range as inclusive on both ends; the
/// skiplist's own `range` is assumed to match — TODO confirm. Inside a
/// transaction the scanned range is recorded on the transaction, presumably
/// for commit-time conflict detection.
pub fn range(&self, start: &K, end: &K) -> Result<Vec<(K, Arc<V>)>, FluxError> {
    self.check_fatal_error()?;
    if let Some(active_tx) = &self.active_tx {
        // Record the scan for the transaction manager.
        active_tx
            .range_scans
            .write()
            .unwrap()
            .push((start.clone(), end.clone()));
        let mut results: BTreeMap<K, Arc<V>> = self
            .skiplist
            .range(start, end, active_tx)
            .into_iter()
            .collect();
        // Overlay this transaction's staged changes: staged writes replace
        // committed entries, tombstones remove them.
        let workspace = active_tx.workspace.read().unwrap();
        for (key, value) in workspace.iter() {
            if key >= start && key <= end {
                match value {
                    Some(v) => {
                        results.insert(key.clone(), v.clone());
                    }
                    None => {
                        results.remove(key.borrow());
                    }
                }
            }
        }
        Ok(results.into_iter().collect())
    } else {
        // No explicit transaction: read under a short-lived auto-commit tx.
        let tx_manager = self.skiplist.transaction_manager();
        let tx = tx_manager.begin();
        let results = self.skiplist.range(start, end, &tx);
        tx_manager.commit(&tx, || Ok(()), self.db.isolation_level)?;
        Ok(results)
    }
}
/// Streaming version of `range`: lazily yields `(key, value)` pairs in key
/// order. Inside a transaction, the skiplist stream is merged on the fly
/// with the transaction's staged entries (staged values win on equal keys,
/// tombstones suppress keys).
pub fn range_stream<'a>(
    &'a self,
    start: &'a K,
    end: &'a K,
) -> impl futures::stream::Stream<Item = Result<(K, Arc<V>), FluxError>> + 'a {
    async_stream::stream! {
        if let Err(e) = self.check_fatal_error() {
            yield Err(e);
            return;
        }
        if let Some(active_tx) = &self.active_tx {
            // Record the scan for the transaction manager.
            active_tx
                .range_scans
                .write()
                .unwrap()
                .push((start.clone(), end.clone()));
            let skiplist_stream = self.skiplist.range_stream(start, end, active_tx).peekable();
            futures::pin_mut!(skiplist_stream);
            // NOTE(review): this std RwLock read guard stays alive across
            // the `.await`s below; verify no writer can be starved while a
            // consumer is mid-stream.
            let workspace = active_tx.workspace.read().unwrap();
            let mut workspace_items: BTreeMap<K, Option<Arc<V>>> = workspace
                .iter()
                .filter(|(k, _)| *k >= start && *k <= end)
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            // Ordered two-way merge of the skiplist stream and the staged map.
            loop {
                let skiplist_peek = skiplist_stream.as_mut().peek().await;
                match skiplist_peek {
                    Some((skiplist_key, _)) => {
                        let workspace_key = workspace_items.keys().next().cloned();
                        if let Some(wk) = workspace_key {
                            if skiplist_key < &wk {
                                // Committed key comes first: yield it.
                                let (key, value) = skiplist_stream.as_mut().next().await.unwrap();
                                yield Ok((key, value));
                            } else if skiplist_key > &wk {
                                // Staged key comes first: yield unless tombstoned.
                                let (key, value_opt) = workspace_items.pop_first().unwrap();
                                if let Some(value) = value_opt {
                                    yield Ok((key, value));
                                }
                            } else {
                                // Same key on both sides: the staged entry wins;
                                // consume and discard the committed one.
                                let (key, value_opt) = workspace_items.pop_first().unwrap();
                                if let Some(value) = value_opt {
                                    yield Ok((key, value));
                                }
                                let _ = skiplist_stream.as_mut().next().await;
                            }
                        } else {
                            // Staged entries exhausted: drain the skiplist stream.
                            while let Some((key, value)) = skiplist_stream.as_mut().next().await {
                                yield Ok((key, value));
                            }
                            break;
                        }
                    }
                    None => {
                        // Skiplist exhausted: drain the remaining staged entries.
                        for (key, value_opt) in workspace_items {
                            if let Some(value) = value_opt {
                                yield Ok((key, value));
                            }
                        }
                        break;
                    }
                }
            }
        } else {
            // No explicit transaction: stream under an auto-commit tx and
            // surface any commit failure as the stream's final item.
            let tx_manager = self.skiplist.transaction_manager();
            let tx = tx_manager.begin();
            let stream = self.skiplist.range_stream(start, end, &tx);
            futures::pin_mut!(stream);
            while let Some(item) = stream.next().await {
                yield Ok(item);
            }
            if let Err(e) = tx_manager.commit(&tx, || Ok(()), self.db.isolation_level) {
                yield Err(e);
            }
        }
    }
}
/// Starts an explicit transaction on this handle; fails if one is already
/// active.
pub fn begin(&mut self) -> Result<(), FluxError> {
    self.check_fatal_error()?;
    match self.active_tx {
        Some(_) => Err(FluxError::TransactionAlreadyActive),
        None => {
            self.active_tx = Some(self.skiplist.transaction_manager().begin());
            Ok(())
        }
    }
}
/// Commits the handle's active transaction: applies every staged workspace
/// entry to the skiplist, logs the whole workspace in one record before the
/// commit is acknowledged, then runs the post-commit eviction or
/// memory-limit logic. The active transaction is consumed (via `take`)
/// whether or not the commit succeeds.
pub async fn commit(&mut self) -> Result<(), FluxError> {
    self.check_fatal_error()?;
    let active_tx = self
        .active_tx
        .take()
        .ok_or(FluxError::NoActiveTransaction)?;
    // Apply staged writes and tombstones to the skiplist under this tx.
    let workspace = active_tx.workspace.read().unwrap().clone();
    for (key, value) in workspace.iter() {
        match value {
            Some(val) => {
                self.skiplist
                    .insert(key.clone(), val.clone(), &active_tx)
                    .await;
            }
            None => {
                self.skiplist.remove(key, &active_tx).await;
            }
        }
    }
    let tx_manager = self.skiplist.transaction_manager();
    let persistence_engine = self.persistence_engine;
    // Serialize the full workspace into a single log record pre-commit;
    // empty transactions are not logged.
    let on_pre_commit = || {
        if let Some(engine) = persistence_engine {
            if !workspace.is_empty() {
                let mut serialized_data = Vec::new();
                ciborium::into_writer(&workspace, &mut serialized_data).map_err(|e| {
                    FluxError::Persistence(PersistenceError::Serialization(e.to_string()))
                })?;
                engine.log(&serialized_data)?;
            }
        }
        Ok(())
    };
    match tx_manager.commit(&active_tx, on_pre_commit, self.db.isolation_level) {
        Ok(()) => {
            if self.db.eviction_policy == EvictionPolicy::Manual {
                // Manual policy: report (but do not undo) ceiling overruns.
                if let Some(max_mem) = self.db.max_memory_bytes {
                    if self.db.current_memory_bytes.load(Ordering::Relaxed) > max_mem {
                        return Err(FluxError::MemoryLimitExceeded);
                    }
                }
            } else {
                self.db.evict_if_needed(None).await;
            }
            Ok(())
        }
        Err(e) => {
            Err(e)
        }
    }
}
/// Aborts the active transaction, discarding its staged workspace.
pub fn rollback(&mut self) -> Result<(), FluxError> {
    self.check_fatal_error()?;
    match self.active_tx.take() {
        Some(tx) => {
            self.skiplist.transaction_manager().abort(&tx);
            Ok(())
        }
        None => Err(FluxError::NoActiveTransaction),
    }
}
/// Records a named savepoint: a snapshot of the current workspace that
/// `rollback_to` can later restore. Duplicate names are allowed; the most
/// recent one wins.
pub fn savepoint(&mut self, name: &str) -> Result<(), FluxError> {
    self.check_fatal_error()?;
    let tx = self
        .active_tx
        .as_ref()
        .ok_or(FluxError::NoActiveTransaction)?;
    let snapshot = tx.workspace.read().unwrap().clone();
    tx.savepoints
        .write()
        .unwrap()
        .push((name.to_string(), snapshot));
    Ok(())
}
/// Restores the workspace to the state captured by the most recent
/// savepoint named `name`, discarding that savepoint and every later one.
/// Note: `truncate(pos)` drops the named savepoint itself, so the same
/// savepoint cannot be rolled back to twice.
pub fn rollback_to(&mut self, name: &str) -> Result<(), FluxError> {
    self.check_fatal_error()?;
    let active_tx = self
        .active_tx
        .as_ref()
        .ok_or(FluxError::NoActiveTransaction)?;
    let mut savepoints = active_tx.savepoints.write().unwrap();
    // `rposition` picks the most recently pushed savepoint with this name.
    if let Some(pos) = savepoints.iter().rposition(|(sp_name, _)| sp_name == name) {
        let (_, saved_workspace) = savepoints[pos].clone();
        let mut current_workspace = active_tx.workspace.write().unwrap();
        *current_workspace = saved_workspace;
        savepoints.truncate(pos);
        Ok(())
    } else {
        Err(FluxError::SavepointNotFound(name.to_string()))
    }
}
/// Runs `f` inside a transaction on this handle: begins, invokes the
/// closure, then commits on `Ok` or rolls back on `Err`. A commit failure
/// replaces the closure's `Ok` result with the commit error; a rollback
/// failure is deliberately swallowed so the closure's own error wins.
pub async fn transaction<F, T, E>(&mut self, f: F) -> Result<T, E>
where
    F: for<'a> FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
    E: From<FluxError>,
{
    self.check_fatal_error()?;
    self.begin()?;
    let result = f(self).await;
    if result.is_ok() {
        match self.commit().await {
            Ok(_) => result,
            Err(commit_err) => {
                Err(commit_err.into())
            }
        }
    } else {
        // Best-effort rollback: keep the closure's error as the outcome.
        self.rollback().unwrap_or_else(|_| {
        });
        result
    }
}
/// Returns all pairs whose key (viewed as `&str` via `Borrow<str>`) starts
/// with `prefix`, sorted by key. Mirrors `range`: inside a transaction the
/// committed results are overlaid with staged writes and tombstones, and
/// the prefix is recorded on the transaction, presumably for commit-time
/// conflict detection.
pub fn prefix_scan(&self, prefix: &str) -> Result<Vec<(K, Arc<V>)>, FluxError> {
    self.check_fatal_error()?;
    if let Some(active_tx) = &self.active_tx {
        // Record the scan for the transaction manager.
        active_tx
            .prefix_scans
            .write()
            .unwrap()
            .push(prefix.to_string());
        let mut results: BTreeMap<K, Arc<V>> = self
            .skiplist
            .prefix_scan(prefix, active_tx)
            .into_iter()
            .collect();
        // Overlay staged changes whose key matches the prefix.
        let workspace = active_tx.workspace.read().unwrap();
        for (key, value) in workspace.iter() {
            if key.borrow().starts_with(prefix) {
                match value {
                    Some(v) => {
                        results.insert(key.clone(), v.clone());
                    }
                    None => {
                        results.remove(key.borrow());
                    }
                }
            }
        }
        Ok(results.into_iter().collect())
    } else {
        // No explicit transaction: read under a short-lived auto-commit tx.
        let tx_manager = self.skiplist.transaction_manager();
        let tx = tx_manager.begin();
        let results = self.skiplist.prefix_scan(prefix, &tx);
        tx_manager.commit(&tx, || Ok(()), self.db.isolation_level)?;
        Ok(results)
    }
}
/// Streaming version of `prefix_scan`: lazily yields matching `(key, value)`
/// pairs in key order, merging the skiplist stream with the transaction's
/// staged entries exactly as `range_stream` does.
pub fn prefix_scan_stream<'a>(
    &'a self,
    prefix: &'a str,
) -> impl futures::stream::Stream<Item = Result<(K, Arc<V>), FluxError>> + 'a {
    async_stream::stream! {
        if let Err(e) = self.check_fatal_error() {
            yield Err(e);
            return;
        }
        if let Some(active_tx) = &self.active_tx {
            // Record the scan for the transaction manager.
            active_tx
                .prefix_scans
                .write()
                .unwrap()
                .push(prefix.to_string());
            let skiplist_stream = self.skiplist.prefix_scan_stream(prefix, active_tx).peekable();
            futures::pin_mut!(skiplist_stream);
            // NOTE(review): as in `range_stream`, this std RwLock read guard
            // is held across the `.await`s below.
            let workspace = active_tx.workspace.read().unwrap();
            let mut workspace_items: BTreeMap<K, Option<Arc<V>>> = workspace
                .iter()
                .filter(|(k, _)| <K as Borrow<str>>::borrow(k).starts_with(prefix))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            // Ordered two-way merge of the skiplist stream and the staged map.
            loop {
                let skiplist_peek = skiplist_stream.as_mut().peek().await;
                match skiplist_peek {
                    Some((skiplist_key, _)) => {
                        let workspace_key = workspace_items.keys().next().cloned();
                        if let Some(wk) = workspace_key {
                            if skiplist_key < &wk {
                                // Committed key comes first: yield it.
                                let (key, value) = skiplist_stream.as_mut().next().await.unwrap();
                                yield Ok((key, value));
                            } else if skiplist_key > &wk {
                                // Staged key comes first: yield unless tombstoned.
                                let (key, value_opt) = workspace_items.pop_first().unwrap();
                                if let Some(value) = value_opt {
                                    yield Ok((key, value));
                                }
                            } else {
                                // Same key on both sides: staged entry wins;
                                // consume and discard the committed one.
                                let (key, value_opt) = workspace_items.pop_first().unwrap();
                                if let Some(value) = value_opt {
                                    yield Ok((key, value));
                                }
                                let _ = skiplist_stream.as_mut().next().await;
                            }
                        } else {
                            // Staged entries exhausted: drain the skiplist stream.
                            while let Some((key, value)) = skiplist_stream.as_mut().next().await {
                                yield Ok((key, value));
                            }
                            break;
                        }
                    }
                    None => {
                        // Skiplist exhausted: drain the remaining staged entries.
                        for (key, value_opt) in workspace_items {
                            if let Some(value) = value_opt {
                                yield Ok((key, value));
                            }
                        }
                        break;
                    }
                }
            }
        } else {
            // No explicit transaction: stream under an auto-commit tx and
            // surface any commit failure as the stream's final item.
            let tx_manager = self.skiplist.transaction_manager();
            let tx = tx_manager.begin();
            let stream = self.skiplist.prefix_scan_stream(prefix, &tx);
            futures::pin_mut!(stream);
            while let Some(item) = stream.next().await {
                yield Ok(item);
            }
            if let Err(e) = tx_manager.commit(&tx, || Ok(()), self.db.isolation_level) {
                yield Err(e);
            }
        }
    }
}
}