1use serde::{Deserialize, Serialize};
15
16use crate::error::SidecarError;
17
18#[derive(Debug, Clone, Default, Serialize, Deserialize)]
26pub struct Watermark {
27 pub timestamp: i64,
29 pub last_pk: Option<Vec<String>>,
32}
33
34impl Watermark {
35 pub fn new() -> Self {
37 Self {
38 timestamp: 0,
39 last_pk: None,
40 }
41 }
42
43 pub fn advance(&mut self, timestamp: i64, pk_values: Vec<String>) {
45 self.timestamp = timestamp;
46 self.last_pk = Some(pk_values);
47 }
48}
49
50pub trait WatermarkStore: Send + Sync {
59 fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError>;
62
63 fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError>;
65
66 fn load_global_seq(&self) -> Result<i64, SidecarError>;
69
70 fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError>;
72}
73
74pub struct NullWatermarkStore;
78
79impl WatermarkStore for NullWatermarkStore {
80 fn load(&self, _table_name: &str) -> Result<Option<Watermark>, SidecarError> {
81 Ok(None)
82 }
83 fn save(&self, _table_name: &str, _wm: &Watermark) -> Result<(), SidecarError> {
84 Ok(())
85 }
86 fn load_global_seq(&self) -> Result<i64, SidecarError> {
87 Ok(1)
88 }
89 fn save_global_seq(&self, _seq: i64) -> Result<(), SidecarError> {
90 Ok(())
91 }
92}
93
/// RocksDB-backed [`WatermarkStore`], compiled only when the
/// `rocksdb-watermark` feature is enabled.
#[cfg(feature = "rocksdb-watermark")]
mod rocksdb_store {
    use super::*;
    use rust_rocksdb::{WriteBatch, DB};
    use std::sync::Arc;

    /// Key prefix under which per-table watermarks are stored.
    const WM_PREFIX: &[u8] = b"wm/";
    /// Key holding the global sequence number. It does not start with
    /// `WM_PREFIX`, so it can never equal a per-table key.
    const GLOBAL_SEQ_KEY: &[u8] = b"\xff__meta__/global_seq";

    /// Watermark store that keeps its data in a RocksDB database.
    ///
    /// Watermarks are stored as JSON under `wm/<table_name>`; the global
    /// sequence number is stored as 8 big-endian bytes under a separate key.
    pub struct RocksDbWatermarkStore {
        db: Arc<DB>,
    }

    impl RocksDbWatermarkStore {
        /// Opens the database at `path`, creating it if it does not exist.
        ///
        /// # Errors
        /// Returns `SidecarError::WatermarkStore` if RocksDB fails to open
        /// the database.
        pub fn open(path: &str) -> Result<Self, SidecarError> {
            let mut opts = rust_rocksdb::Options::default();
            opts.create_if_missing(true);

            let db = DB::open(&opts, path)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb open: {e}")))?;

            Ok(Self { db: Arc::new(db) })
        }

        /// Builds the storage key for `table_name`: `WM_PREFIX` followed by
        /// the table name's bytes.
        fn table_key(table_name: &str) -> Vec<u8> {
            let mut key = Vec::with_capacity(WM_PREFIX.len() + table_name.len());
            key.extend_from_slice(WM_PREFIX);
            key.extend_from_slice(table_name.as_bytes());
            key
        }
    }

    impl WatermarkStore for RocksDbWatermarkStore {
        /// Reads and JSON-decodes the watermark for `table_name`.
        ///
        /// Returns `Ok(None)` when the key is absent.
        ///
        /// # Errors
        /// Returns `SidecarError::WatermarkStore` if the read fails or the
        /// stored bytes are not a valid JSON-encoded [`Watermark`].
        fn load(&self, table_name: &str) -> Result<Option<Watermark>, SidecarError> {
            let key = Self::table_key(table_name);
            let stored = self.db.get(&key).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb get {table_name}: {e}"))
            })?;

            match stored {
                Some(bytes) => {
                    let wm: Watermark = serde_json::from_slice(&bytes).map_err(|e| {
                        SidecarError::WatermarkStore(format!(
                            "deserialize watermark {table_name}: {e}"
                        ))
                    })?;
                    Ok(Some(wm))
                }
                None => Ok(None),
            }
        }

        /// JSON-encodes `wm` and writes it under the key for `table_name`.
        ///
        /// # Errors
        /// Returns `SidecarError::WatermarkStore` if serialization or the
        /// write fails.
        fn save(&self, table_name: &str, wm: &Watermark) -> Result<(), SidecarError> {
            let key = Self::table_key(table_name);
            let value = serde_json::to_vec(wm).map_err(|e| {
                SidecarError::WatermarkStore(format!("serialize watermark {table_name}: {e}"))
            })?;
            // Name the table in the error, matching the messages produced by `load`.
            self.db.put(&key, &value).map_err(|e| {
                SidecarError::WatermarkStore(format!("rocksdb put {table_name}: {e}"))
            })?;
            Ok(())
        }

        /// Reads the global sequence number.
        ///
        /// Returns `1` when no value has been stored yet.
        ///
        /// # Errors
        /// Returns `SidecarError::WatermarkStore` if the read fails, or if a
        /// value is present but is not exactly 8 bytes long. A malformed
        /// value is reported instead of being treated as "missing", so a
        /// corrupted entry cannot silently restart the sequence at `1`.
        fn load_global_seq(&self) -> Result<i64, SidecarError> {
            let stored = self
                .db
                .get(GLOBAL_SEQ_KEY)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb get global_seq: {e}")))?;

            match stored {
                None => Ok(1),
                Some(bytes) => {
                    // The slice-to-array conversion succeeds only for exactly 8 bytes.
                    let raw: [u8; 8] = bytes.as_slice().try_into().map_err(|_| {
                        SidecarError::WatermarkStore(format!(
                            "corrupt global_seq: expected 8 bytes, found {}",
                            bytes.len()
                        ))
                    })?;
                    Ok(i64::from_be_bytes(raw))
                }
            }
        }

        /// Writes `seq` as 8 big-endian bytes under the global sequence key.
        ///
        /// # Errors
        /// Returns `SidecarError::WatermarkStore` if the write fails.
        fn save_global_seq(&self, seq: i64) -> Result<(), SidecarError> {
            let mut batch = WriteBatch::default();
            batch.put(GLOBAL_SEQ_KEY, seq.to_be_bytes());
            self.db
                .write(&batch)
                .map_err(|e| SidecarError::WatermarkStore(format!("rocksdb write: {e}")))?;
            Ok(())
        }
    }
}
211
212#[cfg(feature = "rocksdb-watermark")]
213pub use rocksdb_store::RocksDbWatermarkStore;
214
#[cfg(test)]
mod tests {
    use super::*;

    /// `advance` replaces both fields on every call, including when the
    /// timestamp stays the same.
    #[test]
    fn test_watermark_advancement() {
        let mut wm = Watermark::new();
        assert_eq!(wm.timestamp, 0);
        assert!(wm.last_pk.is_none());

        wm.advance(1000, vec!["42".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(
            wm.last_pk.as_deref(),
            Some(vec!["42".to_string()].as_slice())
        );

        // Same timestamp, different primary key.
        wm.advance(1000, vec!["43".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(wm.last_pk, Some(vec!["43".to_string()]));

        // New timestamp.
        wm.advance(2000, vec!["1".to_string()]);
        assert_eq!(wm.timestamp, 2000);
        assert_eq!(wm.last_pk, Some(vec!["1".to_string()]));
    }

    /// A composite primary key is kept as several values, in order.
    #[test]
    fn test_watermark_composite_pk() {
        let mut wm = Watermark::new();
        wm.advance(1000, vec!["tenant_1".to_string(), "42".to_string()]);
        assert_eq!(wm.timestamp, 1000);
        assert_eq!(
            wm.last_pk,
            Some(vec!["tenant_1".to_string(), "42".to_string()])
        );
    }

    /// A watermark survives a JSON round trip.
    #[test]
    fn test_watermark_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(1234, vec!["pk_val".to_string()]);

        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 1234);
        assert_eq!(restored.last_pk, Some(vec!["pk_val".to_string()]));
    }

    /// A composite-key watermark survives a JSON round trip.
    #[test]
    fn test_watermark_composite_serde_roundtrip() {
        let mut wm = Watermark::new();
        wm.advance(
            5000,
            vec!["a".to_string(), "b".to_string(), "c".to_string()],
        );

        let json = serde_json::to_string(&wm).unwrap();
        let restored: Watermark = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.timestamp, 5000);
        assert_eq!(
            restored.last_pk,
            Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
        );
    }

    /// The null store discards every save and keeps reporting an empty state.
    #[test]
    fn test_null_watermark_store() {
        let store = NullWatermarkStore;
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
        store.save("t", &Watermark::new()).unwrap();
        store.save_global_seq(42).unwrap();
        // Nothing was persisted by the saves above.
        assert!(store.load("t").unwrap().is_none());
        assert_eq!(store.load_global_seq().unwrap(), 1);
    }

    /// Values written to the RocksDB store are readable both immediately and
    /// after the database is reopened.
    ///
    /// Gated on the feature because `RocksDbWatermarkStore` only exists when
    /// `rocksdb-watermark` is enabled; without the gate the test build fails
    /// to compile with the feature off.
    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_test");

        {
            // First session: starts empty, then writes.
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

            assert!(store.load("users").unwrap().is_none());
            assert_eq!(store.load_global_seq().unwrap(), 1);

            let mut wm = Watermark::new();
            wm.advance(500, vec!["pk_42".to_string()]);
            store.save("users", &wm).unwrap();
            store.save_global_seq(99).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(vec!["pk_42".to_string()]));
            assert_eq!(store.load_global_seq().unwrap(), 99);
        }

        {
            // Second session: the first store was dropped, so reopen and
            // check that the data persisted.
            let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

            let loaded = store.load("users").unwrap().unwrap();
            assert_eq!(loaded.timestamp, 500);
            assert_eq!(loaded.last_pk, Some(vec!["pk_42".to_string()]));
            assert_eq!(store.load_global_seq().unwrap(), 99);

            // A table that was never saved is still absent.
            assert!(store.load("orders").unwrap().is_none());
        }
    }

    /// A composite-key watermark round-trips through the RocksDB store.
    ///
    /// Gated on the feature for the same reason as the test above.
    #[cfg(feature = "rocksdb-watermark")]
    #[test]
    fn test_rocksdb_watermark_store_composite_pk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wm_composite_test");

        let store = RocksDbWatermarkStore::open(path.to_str().unwrap()).unwrap();

        let mut wm = Watermark::new();
        wm.advance(1000, vec!["tenant_a".to_string(), "42".to_string()]);
        store.save("orders", &wm).unwrap();

        let loaded = store.load("orders").unwrap().unwrap();
        assert_eq!(loaded.timestamp, 1000);
        assert_eq!(
            loaded.last_pk,
            Some(vec!["tenant_a".to_string(), "42".to_string()])
        );
    }
}