use super::*;
pub(super) type FlushEpoch = Arc<AtomicU64>;
pub(super) struct FlushEpochDurable {
tag: ProviderTag,
seq: AtomicU64,
flush_epoch: FlushEpoch,
pub(super) flushes: AtomicU64,
append_epoch: Mutex<BTreeMap<u64, u64>>,
}
impl FlushEpochDurable {
pub(super) fn new(tag: &[u8; 4], flush_epoch: FlushEpoch) -> Arc<Self> {
Arc::new(Self {
tag: ProviderTag(*tag),
seq: AtomicU64::new(0),
flush_epoch,
flushes: AtomicU64::new(0),
append_epoch: Mutex::new(BTreeMap::new()),
})
}
pub(super) fn append_epoch_of(&self, seq: u64) -> u64 {
*self
.append_epoch
.lock()
.unwrap()
.get(&seq)
.expect("append epoch recorded for seq")
}
}
impl DurableProvider for FlushEpochDurable {
fn provider_tag(&self) -> ProviderTag {
self.tag
}
fn write_commit(
&self,
_principal: Option<&Arc<[u8]>>,
_changes: &[Change],
_timestamp: HlcTimestamp,
) -> Result<u64, ProviderError> {
let seq = self.seq.fetch_add(1, Ordering::SeqCst) + 1;
let epoch = self.flush_epoch.load(Ordering::SeqCst);
self.append_epoch.lock().unwrap().insert(seq, epoch);
Ok(seq)
}
fn flush(&self) -> Result<Option<u64>, ProviderError> {
self.flushes.fetch_add(1, Ordering::SeqCst);
self.flush_epoch.fetch_add(1, Ordering::SeqCst);
Ok(Some(self.seq.load(Ordering::SeqCst)))
}
}
pub(super) struct FlushEpochObserver {
tag: ProviderTag,
flush_epoch: FlushEpoch,
publish_epoch: Mutex<BTreeMap<u64, u64>>,
}
impl FlushEpochObserver {
pub(super) fn new(tag: &[u8; 4], flush_epoch: FlushEpoch) -> Arc<Self> {
Arc::new(Self {
tag: ProviderTag(*tag),
flush_epoch,
publish_epoch: Mutex::new(BTreeMap::new()),
})
}
pub(super) fn publish_epoch_of(&self, id: u64) -> u64 {
*self
.publish_epoch
.lock()
.unwrap()
.get(&id)
.expect("publish epoch recorded for node id")
}
}
impl IndexProvider for FlushEpochObserver {
fn provider_tag(&self) -> ProviderTag {
self.tag
}
fn read_section(&self, _sub: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
Ok(())
}
fn write_section(&self, _sub: SubTag) -> Result<Vec<u8>, ProviderError> {
Ok(Vec::new())
}
fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
if let Change::NodeCreated { id, .. } = change {
let epoch = self.flush_epoch.load(Ordering::SeqCst);
self.publish_epoch.lock().unwrap().insert(id.get(), epoch);
}
Ok(())
}
fn declared_sub_tags(&self) -> &[SubTag] {
&[]
}
}