sources_core/cdc/ack.rs
1use std::sync::Arc;
2
3/// Confirms that one [`Change`](super::Change) was durably processed downstream
4/// — written to the sink and checkpointed. Returned inside every change.
5///
6/// Each change carries a monotonically increasing sequence number. The
7/// mechanism only advances its durable resume point (for WAL, a replication
8/// slot's `confirmed_flush_lsn`) to the highest **contiguous** confirmed
9/// sequence. Confirming out of order is therefore safe: a gap simply holds the
10/// resume point back until it is filled, and nothing already confirmed is
11/// re-sent.
12///
13/// Dropping an `Ack` without calling [`confirm`](Self::confirm) does *not*
14/// confirm the change. It will be redelivered after a restart — this is the
15/// at-least-once guarantee.
16#[derive(Debug)]
17pub struct Ack {
18 seq: u64,
19 sink: Arc<dyn AckSink>,
20}
21
22impl Ack {
23 /// Build an `Ack` for the change with sequence number `seq`. Called by a
24 /// mechanism implementation as it emits each change.
25 pub fn new(seq: u64, sink: Arc<dyn AckSink>) -> Self {
26 Self { seq, sink }
27 }
28
29 /// Confirm that this change was durably processed.
30 pub fn confirm(self) {
31 self.sink.confirm(self.seq);
32 }
33}
34
/// Receiving end of an [`Ack`], living on the mechanism side.
///
/// The source crate provides the implementation — e.g. one that translates a
/// sequence number into its LSN and moves the replication slot forward once
/// the confirmed sequences form a contiguous run. Calls must be cheap, and
/// must be safe to make in any order.
pub trait AckSink: Send + Sync + std::fmt::Debug {
    /// Notes that the change identified by `seq` has been durably processed.
    fn confirm(&self, seq: u64);
}
45
46// The design depends on an `Ack` being movable across threads/tasks and held
47// arbitrarily long before [`Ack::confirm`] is called — confirmation happens
48// externally, asynchronously, and out of order. Lock that property in at
49// compile time so it can never silently regress (e.g. if `AckSink` ever lost
50// its `Send + Sync` bound).
51const _: () = {
52 const fn assert_send_sync_static<T: Send + Sync + 'static>() {}
53 assert_send_sync_static::<Ack>();
54};