// rhei-sidecar 1.5.0
//
// Sidecar CDC consumer for Rhei — polls external databases by timestamp columns.
// Documentation:
//! Watermark types and persistence backends for timestamp-based CDC polling.
//!
//! A [`Watermark`] records the last-seen `(updated_at, pk1, pk2, …)` position
//! for a single table, enabling the consumer to issue incremental poll queries
//! without re-reading rows it has already processed.
//!
//! The [`WatermarkStore`] trait abstracts persistence. Two implementations are
//! provided:
//!
//! - [`NullWatermarkStore`] — in-memory only; watermarks are lost on restart.
//! - [`RocksDbWatermarkStore`] — durable persistence via RocksDB (requires the
//!   `rocksdb-watermark` crate feature).

use serde::{Deserialize, Serialize};

use crate::error::SidecarError;

/// Internal watermark state for timestamp-based polling.
///
/// Tracks the last-seen `(timestamp, primary_key_values)` pair per table to
/// handle timestamp ties. Rows are polled with compound ordering over the
/// timestamp and all PK columns:
///
/// ```sql
/// WHERE updated_at > $watermark_ts
///    OR (updated_at = $watermark_ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
/// ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
/// ```
///
/// The derived `Default` value is the "nothing seen yet" position:
/// `timestamp == 0` and `last_pk == None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Watermark {
    /// Last-seen timestamp (unix seconds).
    pub timestamp: i64,
    /// Stringified last-seen primary key values for tie-breaking (one per PK column).
    /// `None` if this is the initial watermark (no rows seen yet).
    pub last_pk: Option<Vec<String>>,
}

impl Watermark {
    /// Build the initial watermark: timestamp `0` and no primary key recorded.
    pub fn new() -> Self {
        // The derived `Default` yields exactly this state (0 / `None`).
        Self::default()
    }

    /// Move the watermark to the given `(timestamp, pk_values)` position.
    ///
    /// Both fields are overwritten unconditionally; no check is made that the
    /// new position lies ahead of the current one.
    pub fn advance(&mut self, timestamp: i64, pk_values: Vec<String>) {
        *self = Self {
            timestamp,
            last_pk: Some(pk_values),
        };
    }
}

// ---------------------------------------------------------------------------
// WatermarkStore — persistence abstraction
// ---------------------------------------------------------------------------

/// Persistence backend for sidecar watermarks.
///
/// Implementations store per-table watermarks and the global sequence counter
/// so that sidecar CDC can resume from its last position after a restart.
///
/// All methods take `&self`, and implementors must be `Send + Sync`.
pub trait WatermarkStore: Send + Sync {
    /// Load the watermark for a table. Returns `None` if no watermark has been
    /// persisted for the table yet.
    ///
    /// # Errors
    ///
    /// Returns a [`SidecarError`] if the backend cannot be read or the stored
    /// value cannot be decoded.
    fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError>;

    /// Persist the watermark for a table.
    ///
    /// # Errors
    ///
    /// Returns a [`SidecarError`] if the watermark cannot be written.
    fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError>;

    /// Load the persisted global sequence counter.
    /// Returns `1` if nothing has been persisted yet (the starting value).
    ///
    /// # Errors
    ///
    /// Returns a [`SidecarError`] if the backend cannot be read.
    fn load_global_seq(&self) -> Result<i64, SidecarError>;

    /// Persist the global sequence counter.
    ///
    /// # Errors
    ///
    /// Returns a [`SidecarError`] if the counter cannot be written.
    fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError>;
}

/// Watermark store that persists nothing.
///
/// Every save succeeds without storing anything, and every load reports the
/// "nothing persisted yet" default. This keeps the backward-compatible
/// in-memory-only behavior: positions are lost on restart.
pub struct NullWatermarkStore;

impl WatermarkStore for NullWatermarkStore {
    /// Always reports that no watermark exists, whatever the table.
    fn load(&self, _: &str) -> Result<Option<Watermark>, SidecarError> {
        Ok(None)
    }

    /// Accepts the watermark and discards it.
    fn save(&self, _: &str, _: &Watermark) -> Result<(), SidecarError> {
        Ok(())
    }

    /// Always reports the starting sequence value.
    fn load_global_seq(&self) -> Result<i64, SidecarError> {
        const INITIAL_SEQ: i64 = 1;
        Ok(INITIAL_SEQ)
    }

    /// Accepts the sequence value and discards it.
    fn save_global_seq(&self, _: i64) -> Result<(), SidecarError> {
        Ok(())
    }
}

// ---------------------------------------------------------------------------
// RocksDB-backed watermark store
// ---------------------------------------------------------------------------

#[cfg(feature = "rocksdb-watermark")]
mod rocksdb_store {
    use super::*;
    use rust_rocksdb::{WriteBatch, DB};
    use std::path::Path;
    use std::sync::Arc;

    /// Key prefix for per-table watermarks.
    const WM_PREFIX: &[u8] = b"wm/";
    /// Key for the global sequence counter.
    ///
    /// It starts with byte `0xff`, so it can never equal a `wm/<table_name>` key.
    const GLOBAL_SEQ_KEY: &[u8] = b"\xff__meta__/global_seq";

    /// A [`WatermarkStore`] backed by RocksDB for durable sidecar state.
    ///
    /// Available on crate feature `rocksdb-watermark` only.
    ///
    /// Persists per-table [`Watermark`]s and the global sequence counter in a
    /// local RocksDB database so that [`crate::TimestampCdcConsumer`] can
    /// resume exactly where it left off after a process restart.
    ///
    /// Keys use a `wm/<table_name>` prefix for per-table watermarks and a
    /// reserved `\xff__meta__/global_seq` key for the sequence counter.
    /// Watermark values are JSON-serialized for human readability and forward
    /// compatibility; the sequence counter is stored as an 8-byte big-endian
    /// `i64`.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use rhei_sidecar::RocksDbWatermarkStore;
    ///
    /// let store = RocksDbWatermarkStore::open("/var/lib/rhei/watermarks")
    ///     .expect("open watermark store");
    /// ```
    pub struct RocksDbWatermarkStore {
        db: Arc<DB>,
    }

    impl RocksDbWatermarkStore {
        /// Open (or create) the RocksDB database at `path`.
        ///
        /// `path` may be anything convertible to a [`Path`] (`&str`, `String`,
        /// `&Path`, `PathBuf`, …), so non-UTF-8 paths are accepted too. The
        /// database is opened with `create_if_missing`, so a missing database
        /// is created automatically.
        ///
        /// # Errors
        ///
        /// Returns a [`crate::SidecarError::WatermarkStore`] if RocksDB
        /// cannot open the path (e.g. locked by another process, permission
        /// denied).
        pub fn open(path: impl AsRef<Path>) -> Result<Self, SidecarError> {
            let mut opts = rust_rocksdb::Options::default();
            opts.create_if_missing(true);

            let db = DB::open(&opts, path)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb open: {e}")))?;

            Ok(Self { db: Arc::new(db) })
        }

        /// Build the storage key for a table: `wm/` followed by the table name bytes.
        fn table_key(table_name: &str) -> Vec<u8> {
            [WM_PREFIX, table_name.as_bytes()].concat()
        }
    }

    impl WatermarkStore for RocksDbWatermarkStore {
        /// Read and JSON-decode the watermark stored for `table_name`.
        ///
        /// Returns `Ok(None)` when the key is absent. A read failure or an
        /// undecodable value is reported as [`SidecarError::WatermarkStore`].
        fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError> {
            let key = Self::table_key(table_name);
            let stored = self.db.get(&key).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb get {table_name}: {e}"))
            })?;

            stored
                .map(|bytes| {
                    serde_json::from_slice::<Watermark>(&bytes).map_err(|e| {
                        SidecarError::WatermarkStore(format!(
                            "deserialize watermark {table_name}: {e}"
                        ))
                    })
                })
                .transpose()
        }

        /// JSON-encode `wm` and write it under the key for `table_name`.
        fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError> {
            let key = Self::table_key(table_name);
            let value = serde_json::to_vec(wm).map_err(|e| {
                SidecarError::WatermarkStore(format!("serialize watermark {table_name}: {e}"))
            })?;
            self.db
                .put(&key, &value)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb put: {e}")))?;
            Ok(())
        }

        /// Read the global sequence counter.
        ///
        /// Returns `1` (the starting value) when the key is absent. A stored
        /// value that is not exactly 8 bytes is reported as
        /// [`SidecarError::WatermarkStore`] instead of being treated as
        /// "nothing persisted": silently restarting the sequence at `1` would
        /// hide a corrupt store.
        fn load_global_seq(&self) -> Result<i64, SidecarError> {
            let stored = self
                .db
                .get(GLOBAL_SEQ_KEY)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb get global_seq: {e}")))?;

            match stored {
                None => Ok(1),
                Some(bytes) => {
                    let raw = <[u8; 8]>::try_from(bytes.as_slice()).map_err(|_| {
                        SidecarError::WatermarkStore(format!(
                            "corrupt global_seq: expected 8 bytes, found {}",
                            bytes.len()
                        ))
                    })?;
                    Ok(i64::from_be_bytes(raw))
                }
            }
        }

        /// Write the global sequence counter as 8 big-endian bytes.
        fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError> {
            let mut batch = WriteBatch::default();
            batch.put(GLOBAL_SEQ_KEY, seq.to_be_bytes());
            self.db
                .write(&batch)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb write: {e}")))?;
            Ok(())
        }
    }
}

#[cfg(feature = "rocksdb-watermark")]
pub use rocksdb_store::RocksDbWatermarkStore;

#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned primary-key vector from string literals.
    fn pk(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|&p| p.to_owned()).collect()
    }

    #[test]
    fn test_watermark_advancement() {
        let mut wm = Watermark::new();
        assert_eq!(wm.timestamp, 0);
        assert!(wm.last_pk.is_none());

        wm.advance(1000, pk(&["42"]));
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(pk(&["42"])));

        // Same timestamp, different pk
        wm.advance(1000, pk(&["43"]));
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(pk(&["43"])));

        // New timestamp
        wm.advance(2000, pk(&["1"]));
        assert_eq!(wm.timestamp, 2000);
        assert_eq!(wm.last_pk, Some(pk(&["1"])));
    }

    #[test]
    fn test_watermark_composite_pk() {
        let mut wm = Watermark::new();
        wm.advance(1000, pk(&["tenant_1", "42"]));
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(pk(&["tenant_1", "42"])));
    }

    #[test]
    fn test_watermark_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(1234, pk(&["pk_val"]));

        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 1234);
        assert_eq!(restored.last_pk, Some(pk(&["pk_val"])));
    }

    #[test]
    fn test_watermark_composite_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(5000, pk(&["a", "b", "c"]));

        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 5000);
        assert_eq!(restored.last_pk, Some(pk(&["a", "b", "c"])));
    }

    #[test]
    fn test_null_watermark_store() {
        let store = NullWatermarkStore;
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
        store.save("t", &Watermark::new()).unwrap();
        store.save_global_seq(42).unwrap();
        // Still returns defaults
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
    }

    // `RocksDbWatermarkStore` only exists with the `rocksdb-watermark` feature,
    // so the tests that use it must be gated the same way; otherwise the test
    // build fails to compile when the feature is disabled.
    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_test");

        // Open, save, load
        {
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

            assert!(store.load("users").unwrap().is_none());
            assert_eq!(store.load_global_seq().unwrap(), 1);

            let mut wm = Watermark::new();
            wm.advance(500, pk(&["pk_42"]));
            store.save("users", &wm).unwrap();
            store.save_global_seq(99).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(pk(&["pk_42"])));
            assert_eq!(store.load_global_seq().unwrap(), 99);
        }

        // Reopen — verify persistence
        {
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(pk(&["pk_42"])));
            assert_eq!(store.load_global_seq().unwrap(), 99);

            // Other tables are still None
            assert!(store.load("orders").unwrap().is_none());
        }
    }

    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store_composite_pk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_composite_test");

        let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

        let mut wm = Watermark::new();
        wm.advance(1000, pk(&["tenant_a", "42"]));
        store.save("orders", &wm).unwrap();

        let loaded = store.load("orders").unwrap().unwrap();
        assert_eq!(loaded.timestamp, 1000);
        assert_eq!(loaded.last_pk, Some(pk(&["tenant_a", "42"])));
    }
}