use std::{
borrow::Cow,
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use lru::LruCache;
use parking_lot::{Mutex, MutexGuard};
use super::{LogEntry, TransactionHandle, TreeLock, TreeLocks};
/// Sentinel transaction id meaning "nothing recorded yet".
///
/// It is the starting value of both the transaction id counter and
/// `LogPosition::last_written_transaction`, and
/// `State::current_transaction_id` reports it as `None`.
const UNINITIALIZED_ID: u64 = 0;
/// Handle to the shared transaction state.
///
/// The data lives behind an `Arc`, so cloning a `State` yields another handle
/// to the same underlying `ActiveState` rather than an independent copy.
#[derive(Clone, Debug)]
pub struct State {
// Reference-counted storage shared by every clone of this handle.
state: Arc<ActiveState>,
}
/// The data shared by every clone of a `State`.
#[derive(Debug)]
struct ActiveState {
// Path supplied to `State::from_path` and returned by `State::path`.
path: PathBuf,
// Id that the next call to `State::new_transaction` will hand out; it is
// read-and-incremented atomically there.
current_transaction_id: AtomicU64,
// One `TreeLock` per tree name, created the first time a name is requested.
tree_locks: Mutex<HashMap<Cow<'static, [u8]>, TreeLock>>,
// File offset and last written transaction id, guarded together by one
// mutex.
log_position: Mutex<LogPosition>,
// LRU cache mapping a transaction id to its recorded `Option<u64>` position.
// NOTE(review): the meaning of a cached `None` position is decided by the
// callers and is not visible in this file.
known_completed_transactions: Mutex<LruCache<u64, Option<u64>>>,
}
/// Position information guarded by the state's `log_position` mutex.
#[derive(Debug)]
pub struct LogPosition {
/// Offset into the log file. `State::len` returns this value, so it is
/// presumably the current end of the written data (confirm with the
/// writer that updates it).
pub file_offset: u64,
/// Id of the last transaction recorded, or `UNINITIALIZED_ID` when none has
/// been recorded yet.
pub last_written_transaction: u64,
}
impl Default for LogPosition {
    /// Returns the position used before anything has been recorded: offset
    /// zero and the `UNINITIALIZED_ID` sentinel as the last written
    /// transaction.
    fn default() -> Self {
        let file_offset = 0;
        let last_written_transaction = UNINITIALIZED_ID;
        Self {
            file_offset,
            last_written_transaction,
        }
    }
}
impl State {
    /// Creates a fresh, uninitialized state associated with `path`.
    ///
    /// The transaction id counter and the log position both start at their
    /// "uninitialized" values, no tree locks exist yet, and the cache of
    /// known transaction positions is empty with room for 1024 entries.
    pub fn from_path(path: impl AsRef<Path>) -> Self {
        Self {
            state: Arc::new(ActiveState {
                path: path.as_ref().to_path_buf(),
                tree_locks: Mutex::default(),
                current_transaction_id: AtomicU64::new(UNINITIALIZED_ID),
                log_position: Mutex::new(LogPosition::default()),
                known_completed_transactions: Mutex::new(LruCache::new(1024)),
            }),
        }
    }

    /// Records the last written transaction id and the log file offset, and
    /// sets the next transaction id to `last_written_transaction + 1`.
    ///
    /// # Panics
    ///
    /// Panics with "state already initialized" if the transaction id counter
    /// no longer holds `UNINITIALIZED_ID`, i.e. the state was already
    /// initialized or a transaction id has already been handed out.
    pub(crate) fn initialize(&self, last_written_transaction: u64, log_position: u64) {
        // Hold the position lock for the whole call so the counter and the
        // position are updated together from the point of view of anyone who
        // reads through this mutex.
        let mut state_position = self.state.log_position.lock();
        self.state
            .current_transaction_id
            .compare_exchange(
                UNINITIALIZED_ID,
                last_written_transaction + 1,
                Ordering::SeqCst,
                Ordering::SeqCst,
            )
            .expect("state already initialized");
        state_position.file_offset = log_position;
        state_position.last_written_transaction = last_written_transaction;
    }

    /// Returns the id of the last written transaction, or `None` while the
    /// stored value is still `UNINITIALIZED_ID`.
    #[must_use]
    pub fn current_transaction_id(&self) -> Option<u64> {
        let position = self.state.log_position.lock();
        match position.last_written_transaction {
            UNINITIALIZED_ID => None,
            other => Some(other),
        }
    }

    /// Returns the id that the next call to `new_transaction` will assign.
    #[must_use]
    pub fn next_transaction_id(&self) -> u64 {
        self.state.current_transaction_id.load(Ordering::SeqCst)
    }

    /// Returns the path this state was created with.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.state.path
    }

    /// Returns the currently recorded log file offset.
    #[must_use]
    pub fn len(&self) -> u64 {
        let position = self.state.log_position.lock();
        position.file_offset
    }

    /// Returns `true` when the recorded log file offset is zero.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Locks every tree named in `trees` and appends the resulting lock
    /// handles to `locks`.
    ///
    /// Locks for names that have not been seen before are created on demand.
    /// The map of tree locks stays locked for the whole acquisition.
    fn fetch_tree_locks<'a>(&self, trees: impl Iterator<Item = &'a [u8]>, locks: &mut TreeLocks) {
        // Sort so that every caller acquires its locks in the same order
        // (presumably to rule out lock-ordering deadlocks between callers).
        let mut trees = trees.collect::<Vec<_>>();
        trees.sort_unstable();
        // Drop repeated names. Without this, a duplicated name would call
        // `lock()` twice on the same `TreeLock`; assuming `TreeLock::lock`
        // blocks until the lock is free, the second call would never return,
        // and it would do so while holding the `tree_locks` mutex. The list
        // is sorted, so duplicates are adjacent and `dedup` removes them all.
        trees.dedup();
        let mut tree_locks = self.state.tree_locks.lock();
        for tree in trees {
            if let Some(lock) = tree_locks.get(&Cow::Borrowed(tree)) {
                locks.push(lock.lock());
            } else {
                // First use of this tree name: create its lock, acquire it,
                // then publish the lock in the map for later callers.
                let lock = TreeLock::new();
                let locked = lock.lock();
                tree_locks.insert(Cow::Owned(tree.to_vec()), lock);
                locks.push(locked);
            }
        }
    }

    /// Starts a new transaction over the given trees.
    ///
    /// All requested tree locks are acquired first (a tree named more than
    /// once is locked only once); only then is the next transaction id taken
    /// from the counter. The returned handle owns the lock handles and a
    /// `LogEntry` that carries the new id and no data.
    #[must_use]
    pub fn new_transaction<
        'a,
        I: IntoIterator<Item = &'a [u8], IntoIter = II>,
        II: ExactSizeIterator<Item = &'a [u8]>,
    >(
        &self,
        trees: I,
    ) -> TransactionHandle {
        let trees = trees.into_iter();
        let mut locked_trees = Vec::with_capacity(trees.len());
        self.fetch_tree_locks(trees, &mut locked_trees);
        TransactionHandle {
            locked_trees,
            transaction: LogEntry {
                id: self
                    .state
                    .current_transaction_id
                    .fetch_add(1, Ordering::SeqCst),
                data: None,
            },
        }
    }

    /// Caches `position` as the known position for `transaction_id`,
    /// replacing any value cached earlier for that id.
    pub(crate) fn note_transaction_id_status(&self, transaction_id: u64, position: Option<u64>) {
        let mut cache = self.state.known_completed_transactions.lock();
        cache.put(transaction_id, position);
    }

    /// Caches every `(transaction id, position)` pair in `transaction_ids`
    /// while taking the cache lock only once.
    pub(crate) fn note_transaction_ids_completed(&self, transaction_ids: &[(u64, Option<u64>)]) {
        let mut cache = self.state.known_completed_transactions.lock();
        for (id, position) in transaction_ids {
            cache.put(*id, *position);
        }
    }

    /// Looks up the cached position for `transaction_id`.
    ///
    /// Returns `None` when the id is not in the cache, and `Some(position)`
    /// when it is, where `position` is whatever value was cached for the id
    /// (which may itself be `None`).
    // The nested `Option` is intentional: the outer level distinguishes
    // "not cached" from a cached `None` position.
    #[allow(clippy::option_option)]
    pub(crate) fn transaction_id_position(&self, transaction_id: u64) -> Option<Option<u64>> {
        let mut cache = self.state.known_completed_transactions.lock();
        cache.get(&transaction_id).copied()
    }
}
impl State {
    /// Locks the log position and returns the guard, giving the caller
    /// exclusive access to the `LogPosition` until the guard is dropped.
    pub(crate) fn lock_for_write(&self) -> MutexGuard<'_, LogPosition> {
        let position = &self.state.log_position;
        position.lock()
    }
}