use crate::oracle::Oracle;
#[cfg(not(target_arch = "wasm32"))]
use crate::persistence::Persistence;
use crate::queue::{Commit, Merge};
use crate::versions::Versions;
use crate::DatabaseOptions;
use bytes::Bytes;
use crossbeam_queue::SegQueue;
use crossbeam_skiplist::SkipMap;
use ferntree::Tree;
use parking_lot::RwLock;
use std::sync::atomic::{fence, AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use std::thread::JoinHandle;
use std::time::Duration;
/// Sentinel counter value (`u64::MAX`).
///
/// `earliest_active` skips any counter whose loaded value equals this
/// sentinel, exactly as it skips a counter of zero.
// NOTE(review): presumably written into a counter to mark its entry as
// retired / pending removal — confirm against the code that stores it.
pub(crate) const COUNTER_TOMBSTONE: u64 = u64::MAX;
/// Shared state of a database instance.
///
/// Every field is either immutable after construction or wrapped in an
/// atomic, a lock, or a concurrent container. Fields guarded by
/// `#[cfg(not(target_arch = "wasm32"))]` do not exist on `wasm32` targets.
pub struct Inner {
/// Shared oracle, built from `DatabaseOptions::resync_interval`.
// NOTE(review): presumably the timestamp/version source — confirm in `oracle`.
pub(crate) oracle: Arc<Oracle>,
/// Tree mapping each key (`Bytes`) to its `Versions` record.
pub(crate) datastore: Tree<Bytes, Versions>,
/// Counters keyed by `u64`; scanned by `earliest_active_version` for the
/// first entry whose counter is neither zero nor `COUNTER_TOMBSTONE`.
// NOTE(review): key is presumably an oracle-issued version — confirm.
pub(crate) counter_by_oracle: SkipMap<u64, Arc<AtomicU64>>,
/// Counters keyed by `u64`; scanned by `earliest_active_commit` for the
/// first entry whose counter is neither zero nor `COUNTER_TOMBSTONE`.
// NOTE(review): key is presumably a commit id — confirm.
pub(crate) counter_by_commit: SkipMap<u64, Arc<AtomicU64>>,
/// Atomic id counter, starts at 0.
// NOTE(review): presumably the next commit-queue position — confirm.
pub(crate) transaction_queue_id: AtomicU64,
/// Atomic id counter, starts at 0.
// NOTE(review): presumably the latest commit id — confirm.
pub(crate) transaction_commit_id: AtomicU64,
/// Atomic id counter, starts at 0.
// NOTE(review): presumably the latest merge id — confirm.
pub(crate) transaction_merge_id: AtomicU64,
/// `Commit` entries keyed by a `u64` id.
pub(crate) transaction_commit_queue: SkipMap<u64, Arc<Commit>>,
/// `Merge` entries keyed by a `u64` id.
pub(crate) transaction_merge_queue: SkipMap<u64, Arc<Merge>>,
/// Optional garbage-collection duration; `None` after construction.
// NOTE(review): presumably the history-retention window — confirm.
pub(crate) garbage_collection_epoch: RwLock<Option<Duration>>,
/// Optional persistence layer; `None` after construction (non-wasm only).
#[cfg(not(target_arch = "wasm32"))]
pub(crate) persistence: RwLock<Option<Arc<Persistence>>>,
/// Flag initialised to `true` by `new`.
// NOTE(review): presumably polled by the background workers to know when
// to stop — confirm.
pub(crate) background_threads_enabled: AtomicBool,
/// Join handle slot for a transaction-cleanup thread; `None` after
/// construction (non-wasm only).
#[cfg(not(target_arch = "wasm32"))]
pub(crate) transaction_cleanup_handle: RwLock<Option<JoinHandle<()>>>,
/// Join handle slot for a garbage-collection thread; `None` after
/// construction (non-wasm only).
#[cfg(not(target_arch = "wasm32"))]
pub(crate) garbage_collection_handle: RwLock<Option<JoinHandle<()>>>,
/// Concurrent queue of keys.
// NOTE(review): presumably keys awaiting a garbage-collection pass — confirm.
pub(crate) gc_dirty_keys: SegQueue<Bytes>,
/// Atomic `u64`, starts at 0.
// NOTE(review): presumably the version below which history may be
// discarded — confirm.
pub(crate) gc_floor: AtomicU64,
/// Copied verbatim from `DatabaseOptions::reset_threshold`.
pub(crate) reset_threshold: usize,
}
impl Inner {
    /// Builds an empty `Inner` configured from `opts`.
    ///
    /// Only `opts.resync_interval` (handed to the oracle) and
    /// `opts.reset_threshold` are read. All id counters and the GC floor
    /// start at zero, every map and queue starts empty, every optional slot
    /// starts as `None`, and `background_threads_enabled` starts as `true`.
    pub fn new(opts: &DatabaseOptions) -> Self {
        // The only two values taken from the options.
        let oracle = Oracle::new(opts.resync_interval);
        let reset_threshold = opts.reset_threshold;

        Self {
            oracle,
            datastore: Tree::new(),
            // Active-transaction counters.
            counter_by_oracle: SkipMap::new(),
            counter_by_commit: SkipMap::new(),
            // Monotonic ids all begin at zero.
            transaction_queue_id: AtomicU64::new(0),
            transaction_commit_id: AtomicU64::new(0),
            transaction_merge_id: AtomicU64::new(0),
            transaction_commit_queue: SkipMap::new(),
            transaction_merge_queue: SkipMap::new(),
            garbage_collection_epoch: RwLock::new(None),
            #[cfg(not(target_arch = "wasm32"))]
            persistence: RwLock::new(None),
            background_threads_enabled: AtomicBool::new(true),
            // No worker threads have been spawned yet.
            #[cfg(not(target_arch = "wasm32"))]
            transaction_cleanup_handle: RwLock::new(None),
            #[cfg(not(target_arch = "wasm32"))]
            garbage_collection_handle: RwLock::new(None),
            gc_dirty_keys: SegQueue::new(),
            gc_floor: AtomicU64::new(0),
            reset_threshold,
        }
    }
}
impl Inner {
    /// Returns the key of the first entry in `counter_by_oracle` whose
    /// counter is neither zero nor `COUNTER_TOMBSTONE`, or `fallback` when
    /// no such entry exists.
    #[inline]
    pub(crate) fn earliest_active_version(&self, fallback: u64) -> u64 {
        let counters = &self.counter_by_oracle;
        earliest_active(counters, fallback)
    }

    /// Returns the key of the first entry in `counter_by_commit` whose
    /// counter is neither zero nor `COUNTER_TOMBSTONE`, or `fallback` when
    /// no such entry exists.
    #[inline]
    pub(crate) fn earliest_active_commit(&self, fallback: u64) -> u64 {
        let counters = &self.counter_by_commit;
        earliest_active(counters, fallback)
    }
}
#[inline]
fn earliest_active(map: &SkipMap<u64, Arc<AtomicU64>>, fallback: u64) -> u64 {
fence(Ordering::SeqCst);
for entry in map.iter() {
let c = entry.value().load(Ordering::Acquire);
if c != 0 && c != COUNTER_TOMBSTONE {
return *entry.key();
}
}
fallback
}
impl Default for Inner {
    /// Equivalent to [`Inner::new`] called with `DatabaseOptions::default()`.
    fn default() -> Self {
        let options = DatabaseOptions::default();
        Self::new(&options)
    }
}