use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, PoisonError};
use sources_core::cdc::AckSink;
#[derive(Debug)]
pub(crate) struct AckShared {
inner: Mutex<AckInner>,
}
#[derive(Debug)]
struct AckInner {
next_seq: u64,
lowest_unconfirmed: u64,
confirmed_ahead: BTreeSet<u64>,
lsn_by_seq: BTreeMap<u64, u64>,
confirmed_lsn: u64,
}
impl AckShared {
pub(crate) fn new(start_lsn: u64) -> Self {
Self {
inner: Mutex::new(AckInner {
next_seq: 0,
lowest_unconfirmed: 0,
confirmed_ahead: BTreeSet::new(),
lsn_by_seq: BTreeMap::new(),
confirmed_lsn: start_lsn,
}),
}
}
fn lock(&self) -> std::sync::MutexGuard<'_, AckInner> {
self.inner.lock().unwrap_or_else(PoisonError::into_inner)
}
pub(crate) fn register(&self, lsn: u64) -> u64 {
let mut inner = self.lock();
let seq = inner.next_seq;
inner.next_seq += 1;
inner.lsn_by_seq.insert(seq, lsn);
seq
}
pub(crate) fn confirmed_lsn(&self) -> u64 {
self.lock().confirmed_lsn
}
fn confirm(&self, seq: u64) {
let mut inner = self.lock();
if seq < inner.lowest_unconfirmed {
return; }
if seq > inner.lowest_unconfirmed {
inner.confirmed_ahead.insert(seq);
return;
}
let mut current = seq;
loop {
if let Some(lsn) = inner.lsn_by_seq.remove(¤t)
&& lsn > inner.confirmed_lsn
{
inner.confirmed_lsn = lsn;
}
let next = current + 1;
inner.lowest_unconfirmed = next;
if inner.confirmed_ahead.remove(&next) {
current = next;
} else {
break;
}
}
}
}
#[derive(Debug)]
pub(crate) struct WalAckSink {
shared: Arc<AckShared>,
}
impl WalAckSink {
pub(crate) fn new(shared: Arc<AckShared>) -> Self {
Self { shared }
}
}
impl AckSink for WalAckSink {
fn confirm(&self, seq: u64) {
self.shared.confirm(seq);
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests;