use serde::{Deserialize, Serialize};
use crate::error::SidecarError;
/// Position marker persisted per table by a [`WatermarkStore`].
///
/// It pairs a timestamp with the primary-key values recorded by the most
/// recent [`Watermark::advance`] call. `last_pk` holds one string per key
/// column, so composite keys are multi-element vectors. Presumably this is
/// used as a `(timestamp, pk)` resume cursor; confirm against callers.
///
/// `Default` produces the same value as [`Watermark::new`].
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Watermark {
    /// Timestamp recorded by the last `advance`; `0` for a fresh watermark.
    pub timestamp: i64,
    /// Primary-key values recorded by the last `advance`, one entry per key
    /// column; `None` for a fresh watermark.
    pub last_pk: Option<Vec<String>>,
}

impl Watermark {
    /// Creates an empty watermark: `timestamp == 0` and no primary key.
    #[must_use]
    pub fn new() -> Self {
        // The derived `Default` already yields exactly this state.
        Self::default()
    }

    /// Moves the watermark to `timestamp` with `pk_values` as the last key.
    ///
    /// Both fields are overwritten unconditionally. No check is made that the
    /// new position is later than the current one, so callers that require
    /// monotonic progress must enforce it themselves.
    pub fn advance(&mut self, timestamp: i64, pk_values: Vec<String>) {
        self.timestamp = timestamp;
        self.last_pk = Some(pk_values);
    }
}
/// Persistence backend for per-table [`Watermark`]s and one global sequence
/// value.
///
/// The `Send + Sync` supertraits allow a single store to be shared across
/// threads.
pub trait WatermarkStore: Send + Sync {
    /// Returns the watermark saved under `table_name`.
    ///
    /// Returns `Ok(None)` when nothing has been saved for that table.
    fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError>;

    /// Stores `wm` under `table_name`.
    ///
    /// Implementations are presumably expected to replace any previously
    /// saved value.
    fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError>;

    /// Returns the stored global sequence value.
    ///
    /// The in-crate implementations report `1` when nothing is stored.
    fn load_global_seq(&self) -> Result<i64, SidecarError>;

    /// Stores `seq` as the global sequence value.
    fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError>;
}
/// A [`WatermarkStore`] that persists nothing.
///
/// Behaviour of each method:
/// - `load` always reports that no watermark exists.
/// - `load_global_seq` always returns `1`, the same value the RocksDB-backed
///   store reports when nothing has been saved.
/// - Both `save` methods succeed without storing anything.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullWatermarkStore;

impl WatermarkStore for NullWatermarkStore {
    fn load(&self, _table_name: &str) -> Result<Option<Watermark>, SidecarError> {
        Ok(None)
    }

    fn save(&self, _table_name: &str, _wm: &Watermark) -> Result<(), SidecarError> {
        // Intentionally discarded: this store never retains state.
        Ok(())
    }

    fn load_global_seq(&self) -> Result<i64, SidecarError> {
        Ok(1)
    }

    fn save_global_seq(&self, _seq: i64) -> Result<(), SidecarError> {
        // Intentionally discarded: this store never retains state.
        Ok(())
    }
}
#[cfg(feature = "rocksdb-watermark")]
mod rocksdb_store {
    //! RocksDB-backed [`WatermarkStore`] implementation.
    //!
    //! Storage layout:
    //! - Per-table watermarks are stored as JSON under `wm/<table_name>`.
    //! - The global sequence is stored as an 8-byte big-endian `i64` under
    //!   `GLOBAL_SEQ_KEY`.
    //!
    //! NOTE(review): all writes use RocksDB's default write options (no
    //! explicit sync); confirm that durability level is acceptable.

    use super::*;
    use rust_rocksdb::DB;
    use std::sync::Arc;

    /// Prefix prepended to the table name to form a watermark key.
    const WM_PREFIX: &[u8] = b"wm/";
    /// Key holding the global sequence. Its leading `0xFF` byte differs from
    /// the first byte of `WM_PREFIX`, so it can never collide with a
    /// watermark key.
    const GLOBAL_SEQ_KEY: &[u8] = b"\xff__meta__/global_seq";
    /// Value reported by `load_global_seq` when no sequence has been stored.
    const INITIAL_GLOBAL_SEQ: i64 = 1;

    /// [`WatermarkStore`] persisting watermarks and the global sequence in a
    /// RocksDB database.
    pub struct RocksDbWatermarkStore {
        db: Arc<DB>,
    }

    impl RocksDbWatermarkStore {
        /// Opens the RocksDB database at `path`, creating it if missing.
        ///
        /// # Errors
        /// Returns [`SidecarError::WatermarkStore`] if RocksDB cannot open
        /// the database.
        pub fn open(path: &str) -> Result<Self, SidecarError> {
            let mut opts = rust_rocksdb::Options::default();
            opts.create_if_missing(true);
            let db = DB::open(&opts, path)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb open: {e}")))?;
            Ok(Self { db: Arc::new(db) })
        }

        /// Builds the key `wm/<table_name>` for a table's watermark.
        fn table_key(table_name: &str) -> Vec<u8> {
            let mut key = Vec::with_capacity(WM_PREFIX.len() + table_name.len());
            key.extend_from_slice(WM_PREFIX);
            key.extend_from_slice(table_name.as_bytes());
            key
        }
    }

    impl WatermarkStore for RocksDbWatermarkStore {
        /// Reads and JSON-decodes the watermark for `table_name`.
        ///
        /// Returns `Ok(None)` if the key is absent.
        fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError> {
            let key = Self::table_key(table_name);
            let stored = self.db.get(&key).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb get {table_name}: {e}"))
            })?;
            match stored {
                Some(bytes) => {
                    let wm: Watermark = serde_json::from_slice(&bytes).map_err(|e| {
                        SidecarError::WatermarkStore(format!(
                            "deserialize watermark {table_name}: {e}"
                        ))
                    })?;
                    Ok(Some(wm))
                }
                None => Ok(None),
            }
        }

        /// JSON-encodes `wm` and writes it under `table_name`'s key,
        /// overwriting any previous value.
        fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError> {
            let key = Self::table_key(table_name);
            let value = serde_json::to_vec(wm).map_err(|e| {
                SidecarError::WatermarkStore(format!("serialize watermark {table_name}: {e}"))
            })?;
            self.db.put(&key, &value).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb put {table_name}: {e}"))
            })
        }

        /// Reads the global sequence.
        ///
        /// Returns `INITIAL_GLOBAL_SEQ` when the key is absent. A stored
        /// value that is not exactly 8 bytes is reported as an error.
        fn load_global_seq(&self) -> Result<i64, SidecarError> {
            let stored = self.db.get(GLOBAL_SEQ_KEY).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb get global_seq: {e}"))
            })?;
            match stored {
                None => Ok(INITIAL_GLOBAL_SEQ),
                Some(bytes) => {
                    // `save_global_seq` always writes exactly 8 bytes, so any
                    // other length means corruption. Falling back to the
                    // initial value here would silently reset the sequence;
                    // surface the problem instead.
                    let raw: [u8; 8] = bytes.as_slice().try_into().map_err(|_| {
                        SidecarError::WatermarkStore(format!(
                            "corrupt global_seq: expected 8 bytes, found {}",
                            bytes.len()
                        ))
                    })?;
                    Ok(i64::from_be_bytes(raw))
                }
            }
        }

        /// Writes `seq` as 8 big-endian bytes under `GLOBAL_SEQ_KEY`.
        fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError> {
            self.db.put(GLOBAL_SEQ_KEY, seq.to_be_bytes()).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb put global_seq: {e}"))
            })
        }
    }
}
#[cfg(feature = "rocksdb-watermark")]
pub use rocksdb_store::RocksDbWatermarkStore;
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_watermark_advancement() {
        let mut wm = Watermark::new();
        assert_eq!(wm.timestamp, 0);
        assert!(wm.last_pk.is_none());

        wm.advance(1000, vec!["42".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(vec!["42".to_string()]));

        // Same timestamp, later key: only the key changes.
        wm.advance(1000, vec!["43".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(vec!["43".to_string()]));

        wm.advance(2000, vec!["1".to_string()]);
        assert_eq!(wm.timestamp, 2000);
        assert_eq!(wm.last_pk, Some(vec!["1".to_string()]));
    }

    #[test]
    fn test_watermark_composite_pk() {
        let mut wm = Watermark::new();
        wm.advance(1000, vec!["tenant_1".to_string(), "42".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(
            wm.last_pk,
            Some(vec!["tenant_1".to_string(), "42".to_string()])
        );
    }

    #[test]
    fn test_watermark_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(1234, vec!["pk_val".to_string()]);
        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 1234);
        assert_eq!(restored.last_pk, Some(vec!["pk_val".to_string()]));
    }

    #[test]
    fn test_watermark_composite_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(
            5000,
            vec!["a".to_string(), "b".to_string(), "c".to_string()],
        );
        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 5000);
        assert_eq!(
            restored.last_pk,
            Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
        );
    }

    #[test]
    fn test_null_watermark_store() {
        let store = NullWatermarkStore;
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
        store.save("t", &Watermark::new()).unwrap();
        store.save_global_seq(42).unwrap();
        // Nothing is retained after saving.
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
    }

    // `RocksDbWatermarkStore` is only re-exported when the optional RocksDB
    // backend is enabled. Without this gate, `cargo test` fails to compile
    // when the feature is off.
    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_test");
        {
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();
            assert!(store.load("users").unwrap().is_none());
            assert_eq!(store.load_global_seq().unwrap(), 1);

            let mut wm = Watermark::new();
            wm.advance(500, vec!["pk_42".to_string()]);
            store.save("users", &wm).unwrap();
            store.save_global_seq(99).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(vec!["pk_42".to_string()]));
            assert_eq!(store.load_global_seq().unwrap(), 99);
        }
        // Reopen to confirm the values survived closing the database.
        {
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();
            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(vec!["pk_42".to_string()]));
            assert_eq!(store.load_global_seq().unwrap(), 99);
            assert!(store.load("orders").unwrap().is_none());
        }
    }

    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store_composite_pk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_composite_test");
        let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

        let mut wm = Watermark::new();
        wm.advance(1000, vec!["tenant_a".to_string(), "42".to_string()]);
        store.save("orders", &wm).unwrap();

        let loaded = store.load("orders").unwrap().unwrap();
        assert_eq!(loaded.timestamp, 1000);
        assert_eq!(
            loaded.last_pk,
            Some(vec!["tenant_a".to_string(), "42".to_string()])
        );
    }
}