use slatedb_txn_obj::DirtyObject;
use tokio::sync::watch;
use crate::db_state::ManifestCore;
use crate::error::SlateDBError;
use crate::manifest::Manifest;
use crate::utils::WatchableOnceCell;
use crate::CloseReason;
#[derive(Clone, Debug, PartialEq)]
pub struct VersionedManifest {
pub id: u64,
pub manifest: ManifestCore,
}
impl From<DirtyObject<Manifest>> for VersionedManifest {
fn from(dirty: DirtyObject<Manifest>) -> Self {
Self {
id: dirty.id.id(),
manifest: dirty.value.core,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct DbStatus {
pub durable_seq: u64,
pub current_manifest: VersionedManifest,
pub close_reason: Option<CloseReason>,
}
pub(crate) trait ClosedResultWriter: std::fmt::Debug + Send + Sync + 'static {
fn write_result(&self, result: Result<(), SlateDBError>);
fn result_reader(&self) -> crate::utils::WatchableOnceCellReader<Result<(), SlateDBError>>;
}
#[derive(Clone, Debug)]
pub(crate) struct DbStatusManager {
cell: WatchableOnceCell<Result<(), SlateDBError>>,
tx: watch::Sender<DbStatus>,
}
impl DbStatusManager {
#[cfg(test)]
pub(crate) fn new(initial_durable_seq: u64) -> Self {
Self::new_with_manifest(
initial_durable_seq,
VersionedManifest {
id: 1,
manifest: ManifestCore::new(),
},
)
}
pub(crate) fn new_with_manifest(
initial_durable_seq: u64,
initial_manifest: VersionedManifest,
) -> Self {
let (tx, _) = watch::channel(DbStatus {
durable_seq: initial_durable_seq,
current_manifest: initial_manifest,
close_reason: None,
});
Self {
cell: WatchableOnceCell::new(),
tx,
}
}
pub(crate) fn report_durable_seq(&self, seq: u64) {
self.tx.send_if_modified(|s| {
if seq > s.durable_seq {
s.durable_seq = seq;
true
} else {
false
}
});
}
pub(crate) fn report_manifest(&self, versioned: VersionedManifest) {
self.tx.send_if_modified(|s| {
if versioned.id >= s.current_manifest.id && s.current_manifest != versioned {
s.current_manifest = versioned;
true
} else {
false
}
});
}
fn report_closed(&self, reason: CloseReason) {
self.tx.send_if_modified(|s| {
if s.close_reason.is_none() {
s.close_reason = Some(reason);
true
} else {
false
}
});
}
pub(crate) fn subscribe(&self) -> watch::Receiver<DbStatus> {
self.tx.subscribe()
}
pub(crate) fn status(&self) -> DbStatus {
self.tx.borrow().clone()
}
}
impl ClosedResultWriter for WatchableOnceCell<Result<(), SlateDBError>> {
fn write_result(&self, result: Result<(), SlateDBError>) {
self.write(result);
}
fn result_reader(&self) -> crate::utils::WatchableOnceCellReader<Result<(), SlateDBError>> {
self.reader()
}
}
impl ClosedResultWriter for DbStatusManager {
fn write_result(&self, result: Result<(), SlateDBError>) {
let reason = match &result {
Ok(()) => CloseReason::Clean,
Err(err) => CloseReason::from(crate::Error::from(err.clone()).kind()),
};
if self.cell.write(result) {
self.report_closed(reason);
}
}
fn result_reader(&self) -> crate::utils::WatchableOnceCellReader<Result<(), SlateDBError>> {
self.cell.reader()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn versioned_manifest(id: u64) -> VersionedManifest {
VersionedManifest {
id,
manifest: ManifestCore::new(),
}
}
#[test]
fn should_not_notify_on_same_manifest() {
let initial = versioned_manifest(1);
let mgr = DbStatusManager::new_with_manifest(0, initial.clone());
let mut rx = mgr.subscribe();
rx.borrow_and_update();
mgr.report_manifest(initial);
assert!(!rx.has_changed().unwrap());
}
#[test]
fn should_not_notify_on_older_manifest() {
let mgr = DbStatusManager::new_with_manifest(0, versioned_manifest(5));
let mut rx = mgr.subscribe();
rx.borrow_and_update();
mgr.report_manifest(versioned_manifest(3));
assert!(!rx.has_changed().unwrap());
}
}