use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, PoisonError};
use sources_core::cdc::AckSink;
/// Thread-safe tracker that folds per-record confirmations into a single
/// confirmed-LSN watermark.
///
/// Every record is registered with its LSN and receives a sequence number.
/// Confirmations may arrive in any order, but the watermark only moves once
/// every lower sequence number has been confirmed as well.
#[derive(Debug)]
pub(crate) struct AckShared {
    /// All mutable state, guarded by one mutex.
    inner: Mutex<AckInner>,
}

/// State protected by the mutex inside [`AckShared`].
#[derive(Debug)]
struct AckInner {
    /// Sequence number the next `register` call will hand out.
    next_seq: u64,
    /// Smallest sequence number not yet confirmed; everything below it has
    /// already been folded into `confirmed_lsn`.
    lowest_unconfirmed: u64,
    /// Confirmed sequence numbers above `lowest_unconfirmed`, parked until the
    /// gap below them closes.
    confirmed_ahead: BTreeSet<u64>,
    /// LSN of each registered sequence number that has not yet been folded
    /// into the watermark.
    lsn_by_seq: BTreeMap<u64, u64>,
    /// Largest LSN reached by a contiguous run of confirmations (or the start
    /// LSN, if that is larger). Never decreases.
    confirmed_lsn: u64,
}

impl AckShared {
    /// Creates a tracker whose watermark starts at `start_lsn` and whose
    /// sequence numbers start at 0.
    pub(crate) fn new(start_lsn: u64) -> Self {
        Self {
            inner: Mutex::new(AckInner {
                next_seq: 0,
                lowest_unconfirmed: 0,
                confirmed_ahead: BTreeSet::new(),
                lsn_by_seq: BTreeMap::new(),
                confirmed_lsn: start_lsn,
            }),
        }
    }

    /// Locks the state. A poisoned mutex is recovered by taking the guard out
    /// of the `PoisonError` instead of panicking.
    fn lock(&self) -> std::sync::MutexGuard<'_, AckInner> {
        self.inner.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Records `lsn` under the next sequence number and returns that number.
    ///
    /// Sequence numbers are handed out consecutively, starting at 0.
    pub(crate) fn register(&self, lsn: u64) -> u64 {
        let mut inner = self.lock();
        let seq = inner.next_seq;
        inner.next_seq += 1;
        inner.lsn_by_seq.insert(seq, lsn);
        seq
    }

    /// Returns the current watermark.
    pub(crate) fn confirmed_lsn(&self) -> u64 {
        self.lock().confirmed_lsn
    }

    /// Marks `seq` as confirmed and advances the watermark over every
    /// sequence number that is now part of a gap-free confirmed prefix.
    ///
    /// Ignored without effect:
    /// * a `seq` below `lowest_unconfirmed` (duplicate confirmation);
    /// * a `seq` that `register` has not handed out yet. Accepting it would
    ///   either push `lowest_unconfirmed` past `next_seq`, so the next real
    ///   record's confirmation would be dropped, or pre-confirm a record that
    ///   does not exist yet.
    fn confirm(&self, seq: u64) {
        let mut inner = self.lock();
        if seq < inner.lowest_unconfirmed || seq >= inner.next_seq {
            return;
        }
        if seq > inner.lowest_unconfirmed {
            // There is still a gap below `seq`; park it until the gap closes.
            inner.confirmed_ahead.insert(seq);
            return;
        }

        // `seq` is exactly the lowest outstanding number: consume it and then
        // every directly following number that was confirmed ahead of time.
        let mut current = seq;
        loop {
            if let Some(lsn) = inner.lsn_by_seq.remove(&current) {
                // `max` keeps the watermark from moving backwards when a
                // record carries an LSN below the current watermark.
                inner.confirmed_lsn = inner.confirmed_lsn.max(lsn);
            }
            current += 1;
            if !inner.confirmed_ahead.remove(&current) {
                break;
            }
        }
        inner.lowest_unconfirmed = current;
    }
}
/// Handle onto a shared [`AckShared`] tracker.
#[derive(Debug)]
pub(crate) struct WalAckSink {
    /// Reference-counted tracker this handle points at.
    shared: Arc<AckShared>,
}

impl WalAckSink {
    /// Builds a sink around the given tracker handle.
    pub(crate) fn new(shared: Arc<AckShared>) -> Self {
        WalAckSink { shared }
    }
}
impl AckSink for WalAckSink {
fn confirm(&self, seq: u64) {
self.shared.confirm(seq);
}
}
#[cfg(test)]
// Permits `unwrap` inside this module (none is used at present).
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Confirmations arriving in registration order move the watermark one
    /// record at a time.
    #[test]
    fn in_order_confirmations_advance_watermark() {
        let tracker = AckShared::new(0);
        let first = tracker.register(10);
        let second = tracker.register(20);
        assert_eq!(tracker.confirmed_lsn(), 0);

        tracker.confirm(first);
        assert_eq!(tracker.confirmed_lsn(), 10);

        tracker.confirm(second);
        assert_eq!(tracker.confirmed_lsn(), 20);
    }

    /// Later records confirmed early leave the watermark untouched until the
    /// earliest outstanding record is confirmed; then it jumps over all of them.
    #[test]
    fn out_of_order_confirmation_holds_until_gap_fills() {
        let tracker = AckShared::new(0);
        let first = tracker.register(10);
        let second = tracker.register(20);
        let third = tracker.register(30);

        // (sequence number to confirm, watermark expected right afterwards)
        for (seq, expected) in [(third, 0), (second, 0), (first, 30)] {
            tracker.confirm(seq);
            assert_eq!(tracker.confirmed_lsn(), expected);
        }
    }

    /// A record whose LSN lies below the starting watermark does not pull the
    /// watermark backwards.
    #[test]
    fn never_regresses_below_start_lsn() {
        let tracker = AckShared::new(100);
        let stale = tracker.register(50);
        tracker.confirm(stale);
        assert_eq!(tracker.confirmed_lsn(), 100);
    }
}