Skip to main content

rhei_sidecar/
watermark.rs

1//! Watermark types and persistence backends for timestamp-based CDC polling.
2//!
3//! A [`Watermark`] records the last-seen `(updated_at, pk1, pk2, …)` position
4//! for a single table, enabling the consumer to issue incremental poll queries
5//! without re-reading rows it has already processed.
6//!
7//! The [`WatermarkStore`] trait abstracts persistence. Two implementations are
8//! provided:
9//!
10//! - [`NullWatermarkStore`] — in-memory only; watermarks are lost on restart.
11//! - [`RocksDbWatermarkStore`] — durable persistence via RocksDB (requires the
12//!   `rocksdb-watermark` crate feature).
13
14use serde::{Deserialize, Serialize};
15
16use crate::error::SidecarError;
17
18/// Internal watermark state for timestamp-based polling.
19///
20/// Tracks the last-seen (timestamp, primary_key_values) pair per table to handle
21/// timestamp ties. Rows are polled with compound ordering over all PK columns:
22///   WHERE updated_at > $watermark_ts
23///     OR (updated_at = $watermark_ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
24///   ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
25#[derive(Debug, Clone, Default, Serialize, Deserialize)]
26pub struct Watermark {
27    /// Last-seen timestamp (unix seconds).
28    pub timestamp: i64,
29    /// Stringified last-seen primary key values for tie-breaking (one per PK column).
30    /// None if this is the initial watermark (no rows seen yet).
31    pub last_pk: Option<Vec<String>>,
32}
33
34impl Watermark {
35    /// Create a new watermark starting from the beginning of time.
36    pub fn new() -> Self {
37        Self {
38            timestamp: 0,
39            last_pk: None,
40        }
41    }
42
43    /// Advance the watermark to a new (timestamp, pk_values) pair.
44    pub fn advance(&mut self, timestamp: i64, pk_values: Vec<String>) {
45        self.timestamp = timestamp;
46        self.last_pk = Some(pk_values);
47    }
48}
49
50// ---------------------------------------------------------------------------
51// WatermarkStore — persistence abstraction
52// ---------------------------------------------------------------------------
53
54/// Persistence backend for sidecar watermarks.
55///
56/// Implementations store per-table watermarks and the global sequence counter
57/// so that sidecar CDC can resume from its last position after a restart.
58pub trait WatermarkStore: Send + Sync {
59    /// Load the watermark for a table. Returns `None` if no watermark has been
60    /// persisted for the table yet.
61    fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError>;
62
63    /// Persist the watermark for a table.
64    fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError>;
65
66    /// Load the persisted global sequence counter.
67    /// Returns `1` if nothing has been persisted yet (the starting value).
68    fn load_global_seq(&self) -> Result<i64, SidecarError>;
69
70    /// Persist the global sequence counter.
71    fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError>;
72}
73
/// No-op watermark store — provides backward-compatible in-memory-only behavior.
///
/// All loads return defaults; all saves are silently discarded. The type is a
/// zero-sized unit struct, so it is trivially copyable and can be created with
/// either `NullWatermarkStore` or `NullWatermarkStore::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullWatermarkStore;
78
79impl WatermarkStore for NullWatermarkStore {
80    fn load(&self, _table_name: &str) -> Result<Option<Watermark>, SidecarError> {
81        Ok(None)
82    }
83    fn save(&self, _table_name: &str, _wm: &Watermark) -> Result<(), SidecarError> {
84        Ok(())
85    }
86    fn load_global_seq(&self) -> Result<i64, SidecarError> {
87        Ok(1)
88    }
89    fn save_global_seq(&self, _seq: i64) -> Result<(), SidecarError> {
90        Ok(())
91    }
92}
93
94// ---------------------------------------------------------------------------
95// RocksDB-backed watermark store
96// ---------------------------------------------------------------------------
97
#[cfg(feature = "rocksdb-watermark")]
mod rocksdb_store {
    use super::*;
    use rust_rocksdb::DB;
    use std::path::Path;
    use std::sync::Arc;

    /// Key prefix for per-table watermarks.
    const WM_PREFIX: &[u8] = b"wm/";
    /// Key for the global sequence counter.
    ///
    /// It starts with the byte `0xff`, whereas every table key starts with
    /// `wm/`, so the two key families can never collide.
    const GLOBAL_SEQ_KEY: &[u8] = b"\xff__meta__/global_seq";

    /// A [`WatermarkStore`] backed by RocksDB for durable sidecar state.
    ///
    /// Available on crate feature `rocksdb-watermark` only.
    ///
    /// Persists per-table [`Watermark`]s and the global sequence counter in a
    /// local RocksDB database so that [`crate::TimestampCdcConsumer`] can
    /// resume exactly where it left off after a process restart.
    ///
    /// Keys use a `wm/<table_name>` prefix for per-table watermarks and a
    /// reserved `\xff__meta__/global_seq` key for the sequence counter.
    /// Watermark values are JSON-serialized; the sequence counter is stored as
    /// the 8 big-endian bytes of the `i64`.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use rhei_sidecar::RocksDbWatermarkStore;
    ///
    /// let store = RocksDbWatermarkStore::open("/var/lib/rhei/watermarks")
    ///     .expect("open watermark store");
    /// ```
    pub struct RocksDbWatermarkStore {
        db: Arc<DB>,
    }

    impl RocksDbWatermarkStore {
        /// Open (or create) the RocksDB database at `path`.
        ///
        /// `path` may be anything path-like (`&str`, `String`, `&Path`,
        /// `PathBuf`, …). The database is opened with `create_if_missing`, so
        /// it is created when it does not exist yet.
        ///
        /// # Errors
        ///
        /// Returns a [`crate::SidecarError::WatermarkStore`] if RocksDB
        /// cannot open the path (e.g. locked by another process, permission
        /// denied).
        pub fn open(path: impl AsRef<Path>) -> Result<Self, SidecarError> {
            let mut opts = rust_rocksdb::Options::default();
            opts.create_if_missing(true);

            let db = DB::open(&opts, path)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb open: {e}")))?;

            Ok(Self { db: Arc::new(db) })
        }

        /// Build the RocksDB key for a table: `wm/` followed by the table name bytes.
        fn table_key(table_name: &str) -> Vec<u8> {
            let mut key = Vec::with_capacity(WM_PREFIX.len() + table_name.len());
            key.extend_from_slice(WM_PREFIX);
            key.extend_from_slice(table_name.as_bytes());
            key
        }
    }

    impl WatermarkStore for RocksDbWatermarkStore {
        /// Read and JSON-decode the watermark stored under `wm/<table_name>`.
        ///
        /// Returns `Ok(None)` when the key is absent. A RocksDB read failure or
        /// a value that is not valid watermark JSON is reported as
        /// `SidecarError::WatermarkStore`.
        fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError> {
            let key = Self::table_key(table_name);
            let stored = self.db.get(&key).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb get {table_name}: {e}"))
            })?;

            stored
                .map(|bytes| {
                    serde_json::from_slice(&bytes).map_err(|e| {
                        SidecarError::WatermarkStore(format!(
                            "deserialize watermark {table_name}: {e}"
                        ))
                    })
                })
                .transpose()
        }

        /// JSON-encode `wm` and write it under `wm/<table_name>`.
        fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError> {
            let key = Self::table_key(table_name);
            let value = serde_json::to_vec(wm).map_err(|e| {
                SidecarError::WatermarkStore(format!("serialize watermark {table_name}: {e}"))
            })?;
            // Name the table in the error, mirroring the message used by `load`.
            self.db.put(&key, &value).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb put {table_name}: {e}"))
            })
        }

        /// Read the global sequence counter.
        ///
        /// Returns `1` when the key is absent. A stored value that is not
        /// exactly 8 bytes is reported as `SidecarError::WatermarkStore`
        /// instead of being treated as "nothing persisted", so a damaged
        /// counter cannot silently restart the sequence from `1`.
        fn load_global_seq(&self) -> Result<i64, SidecarError> {
            let stored = self
                .db
                .get(GLOBAL_SEQ_KEY)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb get global_seq: {e}")))?;

            match stored {
                None => Ok(1),
                Some(bytes) => {
                    let raw: [u8; 8] = bytes.as_slice().try_into().map_err(|_| {
                        SidecarError::WatermarkStore(format!(
                            "corrupt global_seq: expected 8 bytes, found {}",
                            bytes.len()
                        ))
                    })?;
                    Ok(i64::from_be_bytes(raw))
                }
            }
        }

        /// Write the global sequence counter as the 8 big-endian bytes of `seq`.
        fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError> {
            // A single key is written, so a plain `put` is enough; no batch needed.
            self.db
                .put(GLOBAL_SEQ_KEY, seq.to_be_bytes())
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb put global_seq: {e}")))
        }
    }
}
211
212#[cfg(feature = "rocksdb-watermark")]
213pub use rocksdb_store::RocksDbWatermarkStore;
214
#[cfg(test)]
mod tests {
    use super::*;

    /// `advance` replaces both the timestamp and the last-seen primary key.
    #[test]
    fn test_watermark_advancement() {
        let mut wm = Watermark::new();
        assert_eq!(wm.timestamp, 0);
        assert!(wm.last_pk.is_none());

        wm.advance(1000, vec!["42".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(
            wm.last_pk.as_deref(),
            Some(vec!["42".to_string()].as_slice())
        );

        // Same timestamp, different pk
        wm.advance(1000, vec!["43".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(vec!["43".to_string()]));

        // New timestamp
        wm.advance(2000, vec!["1".to_string()]);
        assert_eq!(wm.timestamp, 2000);
        assert_eq!(wm.last_pk, Some(vec!["1".to_string()]));
    }

    /// A multi-column primary key is kept in column order.
    #[test]
    fn test_watermark_composite_pk() {
        let mut wm = Watermark::new();
        wm.advance(1000, vec!["tenant_1".to_string(), "42".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(
            wm.last_pk,
            Some(vec!["tenant_1".to_string(), "42".to_string()])
        );
    }

    /// A watermark survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_watermark_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(1234, vec!["pk_val".to_string()]);

        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 1234);
        assert_eq!(restored.last_pk, Some(vec!["pk_val".to_string()]));
    }

    /// Same round trip with a three-column primary key.
    #[test]
    fn test_watermark_composite_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(
            5000,
            vec!["a".to_string(), "b".to_string(), "c".to_string()],
        );

        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 5000);
        assert_eq!(
            restored.last_pk,
            Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
        );
    }

    /// The null store keeps returning defaults even after saves.
    #[test]
    fn test_null_watermark_store() {
        let store = NullWatermarkStore;
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
        store.save("t", &Watermark::new()).unwrap();
        store.save_global_seq(42).unwrap();
        // Still returns defaults
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
    }

    /// Saved state is readable immediately and after reopening the database.
    ///
    /// Gated on the `rocksdb-watermark` feature because `RocksDbWatermarkStore`
    /// is only compiled and re-exported when that feature is enabled.
    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_test");

        // Open, save, load
        {
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

            assert!(store.load("users").unwrap().is_none());
            assert_eq!(store.load_global_seq().unwrap(), 1);

            let mut wm = Watermark::new();
            wm.advance(500, vec!["pk_42".to_string()]);
            store.save("users", &wm).unwrap();
            store.save_global_seq(99).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(vec!["pk_42".to_string()]));
            assert_eq!(store.load_global_seq().unwrap(), 99);
        }

        // Reopen — verify persistence
        {
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(vec!["pk_42".to_string()]));
            assert_eq!(store.load_global_seq().unwrap(), 99);

            // Other tables are still None
            assert!(store.load("orders").unwrap().is_none());
        }
    }

    /// A composite primary key round-trips through the RocksDB store.
    ///
    /// Gated on the `rocksdb-watermark` feature for the same reason as the
    /// persistence test: the store type does not exist without it.
    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store_composite_pk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_composite_test");

        let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

        let mut wm = Watermark::new();
        wm.advance(1000, vec!["tenant_a".to_string(), "42".to_string()]);
        store.save("orders", &wm).unwrap();

        let loaded = store.load("orders").unwrap().unwrap();
        assert_eq!(loaded.timestamp, 1000);
        assert_eq!(
            loaded.last_pk,
            Some(vec!["tenant_a".to_string(), "42".to_string()])
        );
    }
}
345            loaded.last_pk,
346            Some(vec!["tenant_a".to_string(), "42".to_string()])
347        );
348    }
349}