uni_plugin/traits/cdc.rs
1//! CDC output / logical-replication plugins.
2
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use datafusion::arrow::record_batch::RecordBatch;
7
8use crate::errors::FnError;
9
/// Logical sequence number (LSN) marking a position in a change-data-capture stream.
///
/// A thin newtype over `u64`: equality and ordering are derived from the wrapped
/// integer, and the derived `Default` is `CdcLsn(0)`. `Hash` is derived alongside
/// `Eq` so an LSN can be used directly as a `HashMap` / `HashSet` key.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CdcLsn(pub u64);
13
14/// Per-instance start context for a CDC sink.
15#[derive(Debug)]
16#[non_exhaustive]
17pub struct CdcStartContext<'a> {
18 /// LSN to resume from (`None` for fresh streams).
19 pub from_lsn: Option<CdcLsn>,
20 /// Lifetime marker — host adapter wires session reference.
21 pub _marker: std::marker::PhantomData<&'a ()>,
22}
23
24impl<'a> CdcStartContext<'a> {
25 /// Construct a fresh context. The struct is `#[non_exhaustive]` so
26 /// external callers can't use struct-literal syntax; this
27 /// constructor is the supported path.
28 #[must_use]
29 pub fn new(from_lsn: Option<CdcLsn>) -> Self {
30 Self {
31 from_lsn,
32 _marker: std::marker::PhantomData,
33 }
34 }
35}
36
37/// A batch of CDC events with the LSN range it covers.
38#[derive(Clone, Debug)]
39pub struct CdcBatch {
40 /// Inclusive start of the LSN range.
41 pub lsn_start: CdcLsn,
42 /// Exclusive end of the LSN range.
43 pub lsn_end: CdcLsn,
44 /// Schema-stable mutation events as a typed batch.
45 pub mutations: Arc<RecordBatch>,
46 /// Wall-clock timestamp of the source commit.
47 pub commit_timestamp: SystemTime,
48}
49
50/// A CDC-output provider — produces an `Arc<dyn CdcStream>` on start.
51pub trait CdcOutputProvider: Send + Sync {
52 /// Provider name (`"kafka"`, `"pulsar"`, `"jsonl"`, …).
53 fn name(&self) -> &str;
54
55 /// Start a new CDC stream.
56 ///
57 /// # Errors
58 ///
59 /// Returns [`FnError`] if the sink cannot be initialized.
60 fn start(&self, ctx: CdcStartContext<'_>) -> Result<Box<dyn CdcStream>, FnError>;
61}
62
63/// A live CDC sink instance.
64pub trait CdcStream: Send {
65 /// Deliver a batch to the sink.
66 ///
67 /// # Errors
68 ///
69 /// Returns [`FnError`] on delivery failure (network error, queue full).
70 fn deliver(&mut self, batch: &CdcBatch) -> Result<(), FnError>;
71
72 /// Acknowledge progress — host advances retention to this LSN.
73 ///
74 /// # Errors
75 ///
76 /// Returns [`FnError`] if the checkpoint cannot be persisted.
77 fn checkpoint(&mut self) -> Result<CdcLsn, FnError>;
78
79 /// Gracefully shut down the sink.
80 ///
81 /// # Errors
82 ///
83 /// Returns [`FnError`] if shutdown fails (network errors, etc.).
84 fn shutdown(&mut self) -> Result<(), FnError>;
85}