Skip to main content

uni_plugin_host/
cdc_runtime.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! M11 FU-4 — change-data-capture (CDC) runtime.
5//!
6//! Drives every registered [`uni_plugin::traits::cdc::CdcOutputProvider`]
7//! by subscribing to the commit broadcaster and converting each
8//! `crate::notifications::CommitNotification` into a
9//! [`uni_plugin::traits::cdc::CdcBatch`] delivered to every active
10//! stream.
11//!
12//! ## Lifecycle
13//!
14//! - At `Uni::build` time, [`CdcRuntime::spawn`] takes a snapshot of
15//!   the registered CDC providers, loads each provider's last
16//!   committed LSN from the JSON-sidecar
17//!   `<data_path>/_system/cdc_checkpoints.json`, and calls
18//!   `provider.start(CdcStartContext { from_lsn })` to obtain a live
19//!   [`uni_plugin::traits::cdc::CdcStream`]. The runtime spawns a
20//!   tokio task that subscribes to the commit broadcaster and
21//!   forwards each commit as a `CdcBatch` to every stream.
22//! - Per-commit, after every stream has accepted the batch, the
23//!   runtime calls `checkpoint()` on each stream and persists the
24//!   returned LSN to the sidecar. On restart, providers resume from
25//!   that LSN.
26//! - On shutdown the runtime calls `shutdown()` on each stream and
27//!   exits.
28//!
29//! ## v1 limitations
30//!
//! `CdcBatch::mutations` is taken from the `mutations` field of the
//! `crate::notifications::CommitNotification` when that field is
//! populated. When it is absent, the runtime delivers an empty
//! (zero-row) `RecordBatch` built from
//! `crate::triggers::event_row_schema()`, so LSN advancement, ordering,
//! and the checkpoint round-trip still run for that commit.
36
37// Rust guideline compliant
38
39use std::path::PathBuf;
40use std::sync::Arc;
41use std::time::SystemTime;
42
43use parking_lot::Mutex;
44use serde::{Deserialize, Serialize};
45use tokio::sync::broadcast;
46use uni_plugin::PluginRegistry;
47use uni_plugin::traits::cdc::{CdcBatch, CdcLsn, CdcStartContext, CdcStream};
48
49use crate::notifications::CommitNotification;
50use crate::shutdown::ShutdownHandle;
51use uni_sidecar::VecSidecar;
52
/// Per-provider checkpoint row written to the JSON sidecar.
///
/// One row exists per provider name; `CdcCheckpointSidecar::write_one`
/// updates the matching row in place or appends a new one.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedCheckpoint {
    /// Provider name (`CdcOutputProvider::name()`).
    pub name: String,
    /// Last successfully-acknowledged LSN, stored as the raw `u64`
    /// carried inside a `CdcLsn`.
    pub last_lsn: u64,
}
61
/// JSON-sidecar checkpoint store at
/// `<data_path>/_system/cdc_checkpoints.json`.
///
/// Thin wrapper over a [`VecSidecar`] of [`PersistedCheckpoint`] rows.
#[derive(Clone, Debug)]
pub struct CdcCheckpointSidecar {
    /// Backing sidecar holding the full list of per-provider rows.
    sidecar: VecSidecar<PersistedCheckpoint>,
}
68
69impl CdcCheckpointSidecar {
70    /// Construct rooted at `<data_path>/_system/cdc_checkpoints.json`.
71    #[must_use]
72    pub fn new(data_path: PathBuf) -> Self {
73        Self {
74            sidecar: VecSidecar::new(data_path, "cdc_checkpoints.json"),
75        }
76    }
77
78    /// Borrow the sidecar path (for diagnostics).
79    #[must_use]
80    pub fn path(&self) -> &std::path::Path {
81        self.sidecar.path()
82    }
83
84    /// Load all persisted checkpoints. Returns an empty vec if the
85    /// sidecar doesn't exist.
86    ///
87    /// # Errors
88    ///
89    /// Returns a free-form error string on I/O or parse failure.
90    pub fn load_all(&self) -> Result<Vec<PersistedCheckpoint>, String> {
91        self.sidecar.load().map_err(|e| e.to_string())
92    }
93
94    /// Write the full checkpoint set atomically.
95    ///
96    /// # Errors
97    ///
98    /// Returns a free-form error string on I/O failure.
99    pub fn write_all(&self, rows: &[PersistedCheckpoint]) -> Result<(), String> {
100        self.sidecar.store(rows).map_err(|e| e.to_string())
101    }
102
103    /// Look up the persisted LSN for a single provider.
104    #[must_use]
105    pub fn lookup(&self, name: &str) -> Option<CdcLsn> {
106        self.load_all()
107            .ok()
108            .and_then(|rows| rows.into_iter().find(|r| r.name == name))
109            .map(|r| CdcLsn(r.last_lsn))
110    }
111
112    /// Replace a single provider's LSN, leaving other providers
113    /// unchanged. Reads-modify-writes the full sidecar atomically.
114    ///
115    /// # Errors
116    ///
117    /// Returns a free-form error string on I/O or parse failure.
118    pub fn write_one(&self, name: &str, lsn: CdcLsn) -> Result<(), String> {
119        let mut rows = self.load_all()?;
120        if let Some(row) = rows.iter_mut().find(|r| r.name == name) {
121            row.last_lsn = lsn.0;
122        } else {
123            rows.push(PersistedCheckpoint {
124                name: name.to_owned(),
125                last_lsn: lsn.0,
126            });
127        }
128        self.write_all(&rows)
129    }
130}
131
/// A live CDC stream paired with the name of the provider that
/// produced it.
///
/// No LSN is cached here: the runtime asks the stream for its LSN via
/// `checkpoint()` after each successful delivery.
struct ActiveStream {
    /// Provider name; used to de-duplicate streams, to key sidecar
    /// checkpoint rows, and as the `provider` field in log events.
    name: String,
    /// Stream handle returned by the provider's `start` call.
    stream: Box<dyn CdcStream>,
}
138
139/// Resume `provider` from its persisted LSN and start its stream.
140///
141/// Returns the [`ActiveStream`] on success, or `None` (logged) on failure so
142/// the caller skips it. Shared by [`CdcRuntime::spawn`] (`late = false`) and
143/// [`CdcRuntime::discover_new_providers`] (`late = true`); the only difference
144/// is the log wording.
145fn start_stream(
146    checkpoint: Option<&CdcCheckpointSidecar>,
147    name: &str,
148    provider: &Arc<dyn uni_plugin::traits::cdc::CdcOutputProvider>,
149    late: bool,
150) -> Option<ActiveStream> {
151    let from_lsn = checkpoint.and_then(|c| c.lookup(name));
152    match provider.start(CdcStartContext::new(from_lsn)) {
153        Ok(stream) => {
154            if late {
155                tracing::info!(provider = %name, from_lsn = ?from_lsn, "CdcRuntime: late-registered provider started");
156            } else {
157                tracing::info!(provider = %name, from_lsn = ?from_lsn, "CdcRuntime: provider started");
158            }
159            Some(ActiveStream {
160                name: name.to_owned(),
161                stream,
162            })
163        }
164        Err(e) => {
165            if late {
166                tracing::warn!(provider = %name, error = %e, "CdcRuntime: late-registered provider start failed");
167            } else {
168                tracing::warn!(provider = %name, error = %e, "CdcRuntime: provider start failed; skipping");
169            }
170            None
171        }
172    }
173}
174
/// Host-side CDC runtime that drives every registered provider on
/// the commit broadcaster.
///
/// One per `Uni` instance. Constructed by [`Self::spawn`] in
/// `Uni::build`; the running background task exits when
/// `ShutdownHandle` signals shutdown or when the commit broadcaster
/// is closed.
pub struct CdcRuntime {
    /// Active streams, stored as a flat list. Each entry carries its
    /// provider name, and late discovery skips names already present.
    streams: Arc<Mutex<Vec<ActiveStream>>>,
    /// Checkpoint sidecar (`None` when no local data path).
    checkpoint: Option<CdcCheckpointSidecar>,
    /// Shared plugin registry — consulted on every commit to discover
    /// providers registered *after* `Uni::build` returned (e.g., via
    /// `Uni::add_plugin`).
    registry: Arc<PluginRegistry>,
}
191
192impl std::fmt::Debug for CdcRuntime {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        let count = self.streams.lock().len();
195        f.debug_struct("CdcRuntime")
196            .field("active_streams", &count)
197            .field(
198                "checkpoint_path",
199                &self.checkpoint.as_ref().map(|c| c.path().to_path_buf()),
200            )
201            .finish()
202    }
203}
204
205impl CdcRuntime {
206    /// Construct and spawn the CDC runtime.
207    ///
208    /// Snapshots every registered [`uni_plugin::traits::cdc::CdcOutputProvider`],
209    /// resumes each from its last persisted LSN (via the sidecar at
210    /// `<data_path>/_system/cdc_checkpoints.json`), and starts a tokio
211    /// task that delivers each commit notification to every active
212    /// stream.
213    ///
214    /// When no providers are registered, this is a no-op fast path —
215    /// the background task is still spawned so dynamic
216    /// `Uni::add_plugin` registrations land as a future improvement,
217    /// but it currently subscribes once at startup.
218    #[must_use]
219    pub fn spawn(
220        registry: &Arc<PluginRegistry>,
221        commit_rx: broadcast::Receiver<Arc<CommitNotification>>,
222        data_path: Option<PathBuf>,
223        shutdown: &ShutdownHandle,
224    ) -> Arc<Self> {
225        let checkpoint = data_path.map(CdcCheckpointSidecar::new);
226
227        let mut active: Vec<ActiveStream> = Vec::new();
228        for (name, provider) in registry.cdc_outputs_snapshot() {
229            if let Some(stream) = start_stream(checkpoint.as_ref(), name.as_str(), &provider, false)
230            {
231                active.push(stream);
232            }
233        }
234
235        let runtime = Arc::new(Self {
236            streams: Arc::new(Mutex::new(active)),
237            checkpoint,
238            registry: Arc::clone(registry),
239        });
240
241        // Spawn the driver task. When the broadcast channel sends an
242        // Err (lagged or closed) we re-loop; on `recv` of an
243        // `Arc<CommitNotification>` we forward.
244        let runtime_clone = Arc::clone(&runtime);
245        let mut commit_rx = commit_rx;
246        let mut shutdown_rx = shutdown.subscribe();
247        let handle = tokio::spawn(async move {
248            loop {
249                tokio::select! {
250                    biased;
251                    _ = shutdown_rx.recv() => {
252                        runtime_clone.shutdown_streams();
253                        break;
254                    }
255                    next = commit_rx.recv() => match next {
256                        Ok(notif) => runtime_clone.deliver_commit(&notif),
257                        Err(broadcast::error::RecvError::Lagged(n)) => {
258                            tracing::warn!(
259                                lagged = n,
260                                "CdcRuntime: commit broadcaster lagged",
261                            );
262                        }
263                        Err(broadcast::error::RecvError::Closed) => break,
264                    }
265                }
266            }
267        });
268        shutdown.track_task(handle);
269
270        runtime
271    }
272
273    /// Number of currently-active CDC streams (for diagnostics + tests).
274    #[must_use]
275    pub fn active_stream_count(&self) -> usize {
276        self.streams.lock().len()
277    }
278
279    /// Borrow the checkpoint sidecar, if local-disk persistence is
280    /// enabled. Used by tests to assert on persisted LSN.
281    #[must_use]
282    pub fn checkpoint_sidecar(&self) -> Option<&CdcCheckpointSidecar> {
283        self.checkpoint.as_ref()
284    }
285
286    /// Discover any providers registered after `Uni::build` (e.g.,
287    /// via `Uni::add_plugin`) and start a stream for each one. Called
288    /// at the start of every `deliver_commit` so dynamic
289    /// registrations don't miss any commits past the first.
290    fn discover_new_providers(&self) {
291        let snapshot = self.registry.cdc_outputs_snapshot();
292        let mut streams = self.streams.lock();
293        for (name, provider) in snapshot {
294            if streams.iter().any(|s| s.name == name.as_str()) {
295                continue;
296            }
297            if let Some(stream) =
298                start_stream(self.checkpoint.as_ref(), name.as_str(), &provider, true)
299            {
300                streams.push(stream);
301            }
302        }
303    }
304
305    /// Convert a single [`CommitNotification`] into a [`CdcBatch`] and
306    /// deliver it to every active stream, then checkpoint each
307    /// stream and persist the new LSN to the sidecar.
308    fn deliver_commit(&self, notif: &CommitNotification) {
309        self.discover_new_providers();
310        // FU-4: the broadcaster pre-materializes the mutation RecordBatch
311        // when at least one `CdcOutputProvider` is registered (see
312        // `Transaction::commit`). `None` here means either there were
313        // zero rows or the broadcaster ran without CDC subscribers
314        // (race: provider registered between the snapshot and now —
315        // discover_new_providers above picks them up for the *next*
316        // commit). Fall back to an empty batch matching the canonical
317        // event-row schema so downstream filters see consistent
318        // column types.
319        let mutations = notif.mutations.clone().unwrap_or_else(|| {
320            std::sync::Arc::new(arrow_array::RecordBatch::new_empty(
321                crate::triggers::event_row_schema(),
322            ))
323        });
324        let batch = CdcBatch {
325            lsn_start: CdcLsn(notif.causal_version),
326            lsn_end: CdcLsn(notif.version),
327            mutations,
328            commit_timestamp: SystemTime::now(),
329        };
330
331        let mut streams = self.streams.lock();
332        for active in streams.iter_mut() {
333            if let Err(e) = active.stream.deliver(&batch) {
334                tracing::warn!(
335                    provider = %active.name,
336                    error = %e,
337                    "CdcRuntime: deliver failed",
338                );
339                continue;
340            }
341            match active.stream.checkpoint() {
342                Ok(lsn) => {
343                    if let Some(sidecar) = &self.checkpoint
344                        && let Err(e) = sidecar.write_one(&active.name, lsn)
345                    {
346                        tracing::debug!(
347                            provider = %active.name,
348                            error = %e,
349                            "CdcRuntime: checkpoint write failed",
350                        );
351                    }
352                }
353                Err(e) => tracing::warn!(
354                    provider = %active.name,
355                    error = %e,
356                    "CdcRuntime: checkpoint failed",
357                ),
358            }
359        }
360    }
361
362    /// Call `shutdown()` on every active stream and drop them.
363    fn shutdown_streams(&self) {
364        let mut streams = self.streams.lock();
365        for active in streams.iter_mut() {
366            if let Err(e) = active.stream.shutdown() {
367                tracing::warn!(
368                    provider = %active.name,
369                    error = %e,
370                    "CdcRuntime: shutdown failed",
371                );
372            }
373        }
374        streams.clear();
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Rows written for two different providers can both be read back.
    #[test]
    fn checkpoint_sidecar_round_trip() {
        let dir = TempDir::new().unwrap();
        let sidecar = CdcCheckpointSidecar::new(dir.path().to_path_buf());
        // Nothing has been written yet, so the store starts out empty.
        assert!(sidecar.load_all().unwrap().is_empty());

        for (provider, lsn) in [("kafka", 42), ("pulsar", 7)] {
            sidecar.write_one(provider, CdcLsn(lsn)).unwrap();
        }

        let persisted = sidecar.load_all().unwrap();
        assert_eq!(persisted.len(), 2);
        assert_eq!(sidecar.lookup("kafka"), Some(CdcLsn(42)));
        assert_eq!(sidecar.lookup("pulsar"), Some(CdcLsn(7)));
    }

    /// A checkpoint written through one handle is visible through a
    /// fresh handle opened on the same directory.
    #[test]
    fn checkpoint_sidecar_survives_close_reopen() {
        let dir = TempDir::new().unwrap();
        // The writing handle is a temporary and is gone after this statement.
        CdcCheckpointSidecar::new(dir.path().to_path_buf())
            .write_one("kafka", CdcLsn(99))
            .unwrap();

        let reopened = CdcCheckpointSidecar::new(dir.path().to_path_buf());
        assert_eq!(reopened.lookup("kafka"), Some(CdcLsn(99)));
    }

    /// Repeated writes for the same provider update one row instead of
    /// appending new ones.
    #[test]
    fn checkpoint_sidecar_overwrites_existing_provider() {
        let dir = TempDir::new().unwrap();
        let sidecar = CdcCheckpointSidecar::new(dir.path().to_path_buf());
        for lsn in 1..=3 {
            sidecar.write_one("kafka", CdcLsn(lsn)).unwrap();
        }
        assert_eq!(sidecar.lookup("kafka"), Some(CdcLsn(3)));
        assert_eq!(sidecar.load_all().unwrap().len(), 1);
    }
}