use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::SeqCst;
use crate::db_status::DbStatusManager;
pub(crate) trait Oracle: Send + Sync + 'static {
fn last_committed_seq(&self) -> u64;
fn last_remote_persisted_seq(&self) -> u64;
}
pub(crate) struct DbOracle {
last_seq: AtomicU64,
last_committed_seq: AtomicU64,
last_durable_seq: AtomicU64,
status_reporter: DbStatusManager,
}
impl DbOracle {
pub(crate) fn new(
last_seq: u64,
last_committed_seq: u64,
last_durable_seq: u64,
status_reporter: DbStatusManager,
) -> Self {
Self {
last_seq: AtomicU64::new(last_seq),
last_committed_seq: AtomicU64::new(last_committed_seq),
last_durable_seq: AtomicU64::new(last_durable_seq),
status_reporter,
}
}
pub(crate) fn next_seq(&self) -> u64 {
self.last_seq.fetch_add(1, SeqCst) + 1
}
pub(crate) fn peek_next_seq(&self) -> u64 {
self.last_seq.load(SeqCst) + 1
}
pub(crate) fn last_seq(&self) -> u64 {
self.last_seq.load(SeqCst)
}
pub(crate) fn advance_last_seq(&self, seq: u64) {
self.last_seq.fetch_max(seq, SeqCst);
}
pub(crate) fn advance_committed_seq(&self, seq: u64) {
self.last_committed_seq.fetch_max(seq, SeqCst);
}
pub(crate) fn advance_durable_seq(&self, seq: u64) {
self.last_durable_seq.fetch_max(seq, SeqCst);
self.status_reporter.report_durable_seq(seq);
}
#[cfg(test)]
pub(crate) fn set_durable_seq_unsafe(&self, value: u64) {
self.last_durable_seq.store(value, SeqCst);
self.status_reporter.report_durable_seq(value);
}
}
impl Oracle for DbOracle {
fn last_committed_seq(&self) -> u64 {
self.last_committed_seq.load(SeqCst)
}
fn last_remote_persisted_seq(&self) -> u64 {
self.last_durable_seq.load(SeqCst)
}
}
pub(crate) struct DbReaderOracle {
last_remote_persisted_seq: AtomicU64,
status_reporter: DbStatusManager,
}
impl DbReaderOracle {
pub(crate) fn new(last_remote_persisted_seq: u64, status_reporter: DbStatusManager) -> Self {
Self {
last_remote_persisted_seq: AtomicU64::new(last_remote_persisted_seq),
status_reporter,
}
}
pub(crate) fn advance_durable_seq(&self, seq: u64) {
self.last_remote_persisted_seq.fetch_max(seq, SeqCst);
self.status_reporter.report_durable_seq(seq);
}
}
impl Oracle for DbReaderOracle {
fn last_committed_seq(&self) -> u64 {
self.last_remote_persisted_seq.load(SeqCst)
}
fn last_remote_persisted_seq(&self) -> u64 {
self.last_remote_persisted_seq.load(SeqCst)
}
}