uni_plugin_host/cdc_runtime.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! M11 FU-4 — change-data-capture (CDC) runtime.
5//!
6//! Drives every registered [`uni_plugin::traits::cdc::CdcOutputProvider`]
7//! by subscribing to the commit broadcaster and converting each
8//! `crate::notifications::CommitNotification` into a
9//! [`uni_plugin::traits::cdc::CdcBatch`] delivered to every active
10//! stream.
11//!
12//! ## Lifecycle
13//!
//! - At `Uni::build` time, [`CdcRuntime::spawn`] takes a snapshot of
//!   the registered CDC providers, loads each provider's last
//!   committed LSN from the JSON sidecar
//!   `<data_path>/_system/cdc_checkpoints.json`, and calls
//!   `provider.start(CdcStartContext::new(from_lsn))` to obtain a live
//!   [`uni_plugin::traits::cdc::CdcStream`]. The runtime then spawns a
//!   tokio task that subscribes to the commit broadcaster and
//!   forwards each commit as a `CdcBatch` to every stream.
//! - Providers registered after `Uni::build` (e.g. via
//!   `Uni::add_plugin`) are discovered and started, resuming from their
//!   persisted LSN, at the beginning of each commit delivery.
//! - Per commit, each stream that accepts the batch is asked for its
//!   `checkpoint()`, and the returned LSN is persisted to the sidecar
//!   (when a local data path is configured). A stream whose delivery
//!   fails is not checkpointed for that commit. On restart, providers
//!   resume from the last persisted LSN.
//! - On shutdown the runtime calls `shutdown()` on each stream and
//!   exits.
28//!
29//! ## v1 limitations
30//!
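//! `CdcBatch::mutations` carries the mutation `RecordBatch` that the
//! commit broadcaster pre-materializes when at least one
//! `CdcOutputProvider` is registered. When a notification carries no
//! batch (zero rows, or a provider registered after the commit was
//! broadcast), an empty batch with the canonical
//! `crate::triggers::event_row_schema()` is delivered instead.
//! `commit_timestamp` is stamped with `SystemTime::now()` at delivery
//! time, and commits skipped because the broadcaster lagged are logged
//! but not replayed.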
36
37// Rust guideline compliant
38
39use std::path::PathBuf;
40use std::sync::Arc;
41use std::time::SystemTime;
42
43use parking_lot::Mutex;
44use serde::{Deserialize, Serialize};
45use tokio::sync::broadcast;
46use uni_plugin::PluginRegistry;
47use uni_plugin::traits::cdc::{CdcBatch, CdcLsn, CdcStartContext, CdcStream};
48
49use crate::notifications::CommitNotification;
50use crate::shutdown::ShutdownHandle;
51use uni_sidecar::VecSidecar;
52
53/// Per-provider checkpoint row written to the JSON sidecar.
54#[derive(Clone, Debug, Serialize, Deserialize)]
55pub struct PersistedCheckpoint {
56 /// Provider name (`CdcOutputProvider::name()`).
57 pub name: String,
58 /// Last successfully-acknowledged LSN.
59 pub last_lsn: u64,
60}
61
62/// JSON-sidecar checkpoint store at
63/// `<data_path>/_system/cdc_checkpoints.json`.
64#[derive(Clone, Debug)]
65pub struct CdcCheckpointSidecar {
66 sidecar: VecSidecar<PersistedCheckpoint>,
67}
68
69impl CdcCheckpointSidecar {
70 /// Construct rooted at `<data_path>/_system/cdc_checkpoints.json`.
71 #[must_use]
72 pub fn new(data_path: PathBuf) -> Self {
73 Self {
74 sidecar: VecSidecar::new(data_path, "cdc_checkpoints.json"),
75 }
76 }
77
78 /// Borrow the sidecar path (for diagnostics).
79 #[must_use]
80 pub fn path(&self) -> &std::path::Path {
81 self.sidecar.path()
82 }
83
84 /// Load all persisted checkpoints. Returns an empty vec if the
85 /// sidecar doesn't exist.
86 ///
87 /// # Errors
88 ///
89 /// Returns a free-form error string on I/O or parse failure.
90 pub fn load_all(&self) -> Result<Vec<PersistedCheckpoint>, String> {
91 self.sidecar.load().map_err(|e| e.to_string())
92 }
93
94 /// Write the full checkpoint set atomically.
95 ///
96 /// # Errors
97 ///
98 /// Returns a free-form error string on I/O failure.
99 pub fn write_all(&self, rows: &[PersistedCheckpoint]) -> Result<(), String> {
100 self.sidecar.store(rows).map_err(|e| e.to_string())
101 }
102
103 /// Look up the persisted LSN for a single provider.
104 #[must_use]
105 pub fn lookup(&self, name: &str) -> Option<CdcLsn> {
106 self.load_all()
107 .ok()
108 .and_then(|rows| rows.into_iter().find(|r| r.name == name))
109 .map(|r| CdcLsn(r.last_lsn))
110 }
111
112 /// Replace a single provider's LSN, leaving other providers
113 /// unchanged. Reads-modify-writes the full sidecar atomically.
114 ///
115 /// # Errors
116 ///
117 /// Returns a free-form error string on I/O or parse failure.
118 pub fn write_one(&self, name: &str, lsn: CdcLsn) -> Result<(), String> {
119 let mut rows = self.load_all()?;
120 if let Some(row) = rows.iter_mut().find(|r| r.name == name) {
121 row.last_lsn = lsn.0;
122 } else {
123 rows.push(PersistedCheckpoint {
124 name: name.to_owned(),
125 last_lsn: lsn.0,
126 });
127 }
128 self.write_all(&rows)
129 }
130}
131
132/// Wraps a live CDC stream with the provider's name and most-recent
133/// committed LSN.
134struct ActiveStream {
135 name: String,
136 stream: Box<dyn CdcStream>,
137}
138
139/// Resume `provider` from its persisted LSN and start its stream.
140///
141/// Returns the [`ActiveStream`] on success, or `None` (logged) on failure so
142/// the caller skips it. Shared by [`CdcRuntime::spawn`] (`late = false`) and
143/// [`CdcRuntime::discover_new_providers`] (`late = true`); the only difference
144/// is the log wording.
145fn start_stream(
146 checkpoint: Option<&CdcCheckpointSidecar>,
147 name: &str,
148 provider: &Arc<dyn uni_plugin::traits::cdc::CdcOutputProvider>,
149 late: bool,
150) -> Option<ActiveStream> {
151 let from_lsn = checkpoint.and_then(|c| c.lookup(name));
152 match provider.start(CdcStartContext::new(from_lsn)) {
153 Ok(stream) => {
154 if late {
155 tracing::info!(provider = %name, from_lsn = ?from_lsn, "CdcRuntime: late-registered provider started");
156 } else {
157 tracing::info!(provider = %name, from_lsn = ?from_lsn, "CdcRuntime: provider started");
158 }
159 Some(ActiveStream {
160 name: name.to_owned(),
161 stream,
162 })
163 }
164 Err(e) => {
165 if late {
166 tracing::warn!(provider = %name, error = %e, "CdcRuntime: late-registered provider start failed");
167 } else {
168 tracing::warn!(provider = %name, error = %e, "CdcRuntime: provider start failed; skipping");
169 }
170 None
171 }
172 }
173}
174
175/// Host-side CDC runtime that drives every registered provider on
176/// the commit broadcaster.
177///
178/// One per `Uni` instance. Constructed by [`Self::spawn`] in
179/// `Uni::build`; the running background task exits when
180/// `ShutdownHandle` signals shutdown.
181pub struct CdcRuntime {
182 /// Active streams keyed by provider name.
183 streams: Arc<Mutex<Vec<ActiveStream>>>,
184 /// Checkpoint sidecar (`None` when no local data path).
185 checkpoint: Option<CdcCheckpointSidecar>,
186 /// Shared plugin registry — consulted on every commit to discover
187 /// providers registered *after* `Uni::build` returned (e.g., via
188 /// `Uni::add_plugin`).
189 registry: Arc<PluginRegistry>,
190}
191
192impl std::fmt::Debug for CdcRuntime {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 let count = self.streams.lock().len();
195 f.debug_struct("CdcRuntime")
196 .field("active_streams", &count)
197 .field(
198 "checkpoint_path",
199 &self.checkpoint.as_ref().map(|c| c.path().to_path_buf()),
200 )
201 .finish()
202 }
203}
204
205impl CdcRuntime {
206 /// Construct and spawn the CDC runtime.
207 ///
208 /// Snapshots every registered [`uni_plugin::traits::cdc::CdcOutputProvider`],
209 /// resumes each from its last persisted LSN (via the sidecar at
210 /// `<data_path>/_system/cdc_checkpoints.json`), and starts a tokio
211 /// task that delivers each commit notification to every active
212 /// stream.
213 ///
214 /// When no providers are registered, this is a no-op fast path —
215 /// the background task is still spawned so dynamic
216 /// `Uni::add_plugin` registrations land as a future improvement,
217 /// but it currently subscribes once at startup.
218 #[must_use]
219 pub fn spawn(
220 registry: &Arc<PluginRegistry>,
221 commit_rx: broadcast::Receiver<Arc<CommitNotification>>,
222 data_path: Option<PathBuf>,
223 shutdown: &ShutdownHandle,
224 ) -> Arc<Self> {
225 let checkpoint = data_path.map(CdcCheckpointSidecar::new);
226
227 let mut active: Vec<ActiveStream> = Vec::new();
228 for (name, provider) in registry.cdc_outputs_snapshot() {
229 if let Some(stream) = start_stream(checkpoint.as_ref(), name.as_str(), &provider, false)
230 {
231 active.push(stream);
232 }
233 }
234
235 let runtime = Arc::new(Self {
236 streams: Arc::new(Mutex::new(active)),
237 checkpoint,
238 registry: Arc::clone(registry),
239 });
240
241 // Spawn the driver task. When the broadcast channel sends an
242 // Err (lagged or closed) we re-loop; on `recv` of an
243 // `Arc<CommitNotification>` we forward.
244 let runtime_clone = Arc::clone(&runtime);
245 let mut commit_rx = commit_rx;
246 let mut shutdown_rx = shutdown.subscribe();
247 let handle = tokio::spawn(async move {
248 loop {
249 tokio::select! {
250 biased;
251 _ = shutdown_rx.recv() => {
252 runtime_clone.shutdown_streams();
253 break;
254 }
255 next = commit_rx.recv() => match next {
256 Ok(notif) => runtime_clone.deliver_commit(¬if),
257 Err(broadcast::error::RecvError::Lagged(n)) => {
258 tracing::warn!(
259 lagged = n,
260 "CdcRuntime: commit broadcaster lagged",
261 );
262 }
263 Err(broadcast::error::RecvError::Closed) => break,
264 }
265 }
266 }
267 });
268 shutdown.track_task(handle);
269
270 runtime
271 }
272
273 /// Number of currently-active CDC streams (for diagnostics + tests).
274 #[must_use]
275 pub fn active_stream_count(&self) -> usize {
276 self.streams.lock().len()
277 }
278
279 /// Borrow the checkpoint sidecar, if local-disk persistence is
280 /// enabled. Used by tests to assert on persisted LSN.
281 #[must_use]
282 pub fn checkpoint_sidecar(&self) -> Option<&CdcCheckpointSidecar> {
283 self.checkpoint.as_ref()
284 }
285
286 /// Discover any providers registered after `Uni::build` (e.g.,
287 /// via `Uni::add_plugin`) and start a stream for each one. Called
288 /// at the start of every `deliver_commit` so dynamic
289 /// registrations don't miss any commits past the first.
290 fn discover_new_providers(&self) {
291 let snapshot = self.registry.cdc_outputs_snapshot();
292 let mut streams = self.streams.lock();
293 for (name, provider) in snapshot {
294 if streams.iter().any(|s| s.name == name.as_str()) {
295 continue;
296 }
297 if let Some(stream) =
298 start_stream(self.checkpoint.as_ref(), name.as_str(), &provider, true)
299 {
300 streams.push(stream);
301 }
302 }
303 }
304
305 /// Convert a single [`CommitNotification`] into a [`CdcBatch`] and
306 /// deliver it to every active stream, then checkpoint each
307 /// stream and persist the new LSN to the sidecar.
308 fn deliver_commit(&self, notif: &CommitNotification) {
309 self.discover_new_providers();
310 // FU-4: the broadcaster pre-materializes the mutation RecordBatch
311 // when at least one `CdcOutputProvider` is registered (see
312 // `Transaction::commit`). `None` here means either there were
313 // zero rows or the broadcaster ran without CDC subscribers
314 // (race: provider registered between the snapshot and now —
315 // discover_new_providers above picks them up for the *next*
316 // commit). Fall back to an empty batch matching the canonical
317 // event-row schema so downstream filters see consistent
318 // column types.
319 let mutations = notif.mutations.clone().unwrap_or_else(|| {
320 std::sync::Arc::new(arrow_array::RecordBatch::new_empty(
321 crate::triggers::event_row_schema(),
322 ))
323 });
324 let batch = CdcBatch {
325 lsn_start: CdcLsn(notif.causal_version),
326 lsn_end: CdcLsn(notif.version),
327 mutations,
328 commit_timestamp: SystemTime::now(),
329 };
330
331 let mut streams = self.streams.lock();
332 for active in streams.iter_mut() {
333 if let Err(e) = active.stream.deliver(&batch) {
334 tracing::warn!(
335 provider = %active.name,
336 error = %e,
337 "CdcRuntime: deliver failed",
338 );
339 continue;
340 }
341 match active.stream.checkpoint() {
342 Ok(lsn) => {
343 if let Some(sidecar) = &self.checkpoint
344 && let Err(e) = sidecar.write_one(&active.name, lsn)
345 {
346 tracing::debug!(
347 provider = %active.name,
348 error = %e,
349 "CdcRuntime: checkpoint write failed",
350 );
351 }
352 }
353 Err(e) => tracing::warn!(
354 provider = %active.name,
355 error = %e,
356 "CdcRuntime: checkpoint failed",
357 ),
358 }
359 }
360 }
361
362 /// Call `shutdown()` on every active stream and drop them.
363 fn shutdown_streams(&self) {
364 let mut streams = self.streams.lock();
365 for active in streams.iter_mut() {
366 if let Err(e) = active.stream.shutdown() {
367 tracing::warn!(
368 provider = %active.name,
369 error = %e,
370 "CdcRuntime: shutdown failed",
371 );
372 }
373 }
374 streams.clear();
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use tempfile::TempDir;
382
383 #[test]
384 fn checkpoint_sidecar_round_trip() {
385 let tmp = TempDir::new().unwrap();
386 let s = CdcCheckpointSidecar::new(tmp.path().to_path_buf());
387 assert!(s.load_all().unwrap().is_empty());
388 s.write_one("kafka", CdcLsn(42)).unwrap();
389 s.write_one("pulsar", CdcLsn(7)).unwrap();
390 let rows = s.load_all().unwrap();
391 assert_eq!(rows.len(), 2);
392 assert_eq!(s.lookup("kafka"), Some(CdcLsn(42)));
393 assert_eq!(s.lookup("pulsar"), Some(CdcLsn(7)));
394 }
395
396 #[test]
397 fn checkpoint_sidecar_survives_close_reopen() {
398 let tmp = TempDir::new().unwrap();
399 {
400 let s = CdcCheckpointSidecar::new(tmp.path().to_path_buf());
401 s.write_one("kafka", CdcLsn(99)).unwrap();
402 }
403 let s2 = CdcCheckpointSidecar::new(tmp.path().to_path_buf());
404 assert_eq!(s2.lookup("kafka"), Some(CdcLsn(99)));
405 }
406
407 #[test]
408 fn checkpoint_sidecar_overwrites_existing_provider() {
409 let tmp = TempDir::new().unwrap();
410 let s = CdcCheckpointSidecar::new(tmp.path().to_path_buf());
411 s.write_one("kafka", CdcLsn(1)).unwrap();
412 s.write_one("kafka", CdcLsn(2)).unwrap();
413 s.write_one("kafka", CdcLsn(3)).unwrap();
414 assert_eq!(s.lookup("kafka"), Some(CdcLsn(3)));
415 assert_eq!(s.load_all().unwrap().len(), 1);
416 }
417}