Skip to main content

uni_plugin/traits/
cdc.rs

1//! CDC output / logical-replication plugins.
2
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use datafusion::arrow::record_batch::RecordBatch;
7
8use crate::errors::FnError;
9
/// Logical sequence number (LSN) identifying a position in a
/// change-data-capture stream.
///
/// A thin newtype over `u64`. Equality and ordering are derived from the
/// wrapped value, and the `Default` value is `CdcLsn(0)`. `Hash` is derived
/// alongside `Eq` so an LSN can be used as a key in hashed collections.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CdcLsn(pub u64);
13
14/// Per-instance start context for a CDC sink.
15#[derive(Debug)]
16#[non_exhaustive]
17pub struct CdcStartContext<'a> {
18    /// LSN to resume from (`None` for fresh streams).
19    pub from_lsn: Option<CdcLsn>,
20    /// Lifetime marker — host adapter wires session reference.
21    pub _marker: std::marker::PhantomData<&'a ()>,
22}
23
24impl<'a> CdcStartContext<'a> {
25    /// Construct a fresh context. The struct is `#[non_exhaustive]` so
26    /// external callers can't use struct-literal syntax; this
27    /// constructor is the supported path.
28    #[must_use]
29    pub fn new(from_lsn: Option<CdcLsn>) -> Self {
30        Self {
31            from_lsn,
32            _marker: std::marker::PhantomData,
33        }
34    }
35}
36
37/// A batch of CDC events with the LSN range it covers.
38#[derive(Clone, Debug)]
39pub struct CdcBatch {
40    /// Inclusive start of the LSN range.
41    pub lsn_start: CdcLsn,
42    /// Exclusive end of the LSN range.
43    pub lsn_end: CdcLsn,
44    /// Schema-stable mutation events as a typed batch.
45    pub mutations: Arc<RecordBatch>,
46    /// Wall-clock timestamp of the source commit.
47    pub commit_timestamp: SystemTime,
48}
49
50/// A CDC-output provider — produces an `Arc<dyn CdcStream>` on start.
51pub trait CdcOutputProvider: Send + Sync {
52    /// Provider name (`"kafka"`, `"pulsar"`, `"jsonl"`, …).
53    fn name(&self) -> &str;
54
55    /// Start a new CDC stream.
56    ///
57    /// # Errors
58    ///
59    /// Returns [`FnError`] if the sink cannot be initialized.
60    fn start(&self, ctx: CdcStartContext<'_>) -> Result<Box<dyn CdcStream>, FnError>;
61}
62
63/// A live CDC sink instance.
64pub trait CdcStream: Send {
65    /// Deliver a batch to the sink.
66    ///
67    /// # Errors
68    ///
69    /// Returns [`FnError`] on delivery failure (network error, queue full).
70    fn deliver(&mut self, batch: &CdcBatch) -> Result<(), FnError>;
71
72    /// Acknowledge progress — host advances retention to this LSN.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`FnError`] if the checkpoint cannot be persisted.
77    fn checkpoint(&mut self) -> Result<CdcLsn, FnError>;
78
79    /// Gracefully shut down the sink.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`FnError`] if shutdown fails (network errors, etc.).
84    fn shutdown(&mut self) -> Result<(), FnError>;
85}