1use crate::event::Event;
19use indexmap::IndexMap;
20use rustc_hash::FxBuildHasher;
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tracing::info;
26
27#[cfg(feature = "persistence")]
28use std::path::Path;
29#[cfg(feature = "persistence")]
30use tracing::debug;
31
/// Settings that control how a [`CheckpointManager`] schedules and retains checkpoints.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Minimum time that must have elapsed since the last checkpoint before
    /// [`CheckpointManager::should_checkpoint`] returns `true`.
    pub interval: Duration,
    /// Number of checkpoints passed as `keep` to `StateStore::prune_checkpoints`
    /// after every successful save.
    pub max_checkpoints: usize,
    /// Whether a checkpoint should be written on shutdown.
    /// NOTE(review): not read anywhere in this file; presumably honored by the caller.
    pub checkpoint_on_shutdown: bool,
    /// Prefix for keys written to the store.
    /// NOTE(review): not read anywhere in this file; confirm where it is applied.
    pub key_prefix: String,
}
44
45impl Default for CheckpointConfig {
46 fn default() -> Self {
47 Self {
48 interval: Duration::from_secs(60),
49 max_checkpoints: 3,
50 checkpoint_on_shutdown: true,
51 key_prefix: "varpulis".to_string(),
52 }
53 }
54}
55
/// Serde-friendly snapshot of an [`Event`], convertible in both directions via `From`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableEvent {
    /// The event's type name.
    pub event_type: String,
    /// The event timestamp as produced by `timestamp_millis()` (milliseconds).
    pub timestamp_ms: i64,
    /// The event's data fields, keyed by field name.
    pub fields: HashMap<String, SerializableValue>,
}
63
/// Serde-friendly mirror of `varpulis_core::Value`; each variant corresponds to the
/// `Value` variant it is converted from/to in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SerializableValue {
    /// Signed 64-bit integer.
    Int(i64),
    /// 64-bit float.
    Float(f64),
    /// Boolean.
    Bool(bool),
    /// Owned string (converted from/to `Value::Str`).
    String(String),
    /// Absence of a value.
    Null,
    /// Timestamp payload, copied verbatim from `Value::Timestamp` (unit not shown here).
    Timestamp(i64),
    /// Duration payload, copied verbatim from `Value::Duration` (unit not shown here).
    Duration(u64),
    /// Sequence of nested values.
    Array(Vec<Self>),
    /// Map stored as a list of `(key, value)` pairs in the source map's iteration order.
    Map(Vec<(String, Self)>),
}
77
78fn value_to_serializable(v: &varpulis_core::Value) -> SerializableValue {
80 match v {
81 varpulis_core::Value::Int(i) => SerializableValue::Int(*i),
82 varpulis_core::Value::Float(f) => SerializableValue::Float(*f),
83 varpulis_core::Value::Bool(b) => SerializableValue::Bool(*b),
84 varpulis_core::Value::Str(s) => SerializableValue::String(s.to_string()),
85 varpulis_core::Value::Null => SerializableValue::Null,
86 varpulis_core::Value::Timestamp(ts) => SerializableValue::Timestamp(*ts),
87 varpulis_core::Value::Duration(d) => SerializableValue::Duration(*d),
88 varpulis_core::Value::Array(arr) => {
89 SerializableValue::Array(arr.iter().map(value_to_serializable).collect())
90 }
91 varpulis_core::Value::Map(map) => SerializableValue::Map(
92 map.iter()
93 .map(|(k, v)| (k.to_string(), value_to_serializable(v)))
94 .collect(),
95 ),
96 }
97}
98
99fn serializable_to_value(sv: SerializableValue) -> varpulis_core::Value {
101 match sv {
102 SerializableValue::Int(i) => varpulis_core::Value::Int(i),
103 SerializableValue::Float(f) => varpulis_core::Value::Float(f),
104 SerializableValue::Bool(b) => varpulis_core::Value::Bool(b),
105 SerializableValue::String(s) => varpulis_core::Value::Str(s.into()),
106 SerializableValue::Null => varpulis_core::Value::Null,
107 SerializableValue::Timestamp(ts) => varpulis_core::Value::Timestamp(ts),
108 SerializableValue::Duration(d) => varpulis_core::Value::Duration(d),
109 SerializableValue::Array(arr) => {
110 varpulis_core::Value::array(arr.into_iter().map(serializable_to_value).collect())
111 }
112 SerializableValue::Map(entries) => {
113 let mut map: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
114 IndexMap::with_hasher(FxBuildHasher);
115 for (k, v) in entries {
116 map.insert(k.into(), serializable_to_value(v));
117 }
118 varpulis_core::Value::map(map)
119 }
120 }
121}
122
123impl From<&Event> for SerializableEvent {
124 fn from(event: &Event) -> Self {
125 let mut fields = HashMap::new();
126 for (k, v) in &event.data {
127 fields.insert(k.to_string(), value_to_serializable(v));
128 }
129 Self {
130 event_type: event.event_type.to_string(),
131 timestamp_ms: event.timestamp.timestamp_millis(),
132 fields,
133 }
134 }
135}
136
137impl From<SerializableEvent> for Event {
138 fn from(se: SerializableEvent) -> Self {
139 let mut event = Self::new(se.event_type);
140 event.timestamp = chrono::DateTime::from_timestamp_millis(se.timestamp_ms)
141 .unwrap_or_else(chrono::Utc::now);
142 for (k, v) in se.fields {
143 event.data.insert(k.into(), serializable_to_value(v));
144 }
145 event
146 }
147}
148
/// A persisted snapshot handed to a [`StateStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Checkpoint identifier; overwritten by `CheckpointManager::checkpoint` with
    /// its next sequential id. Stores key checkpoints as `checkpoint:<id>`.
    pub id: u64,
    /// Creation time in milliseconds; overwritten by `CheckpointManager::checkpoint`
    /// with the current UTC time.
    pub timestamp_ms: i64,
    /// Number of events processed when the checkpoint was taken.
    pub events_processed: u64,
    /// Window snapshots keyed by a string identifier (presumably the stream or
    /// window name — TODO confirm against the producer).
    pub window_states: HashMap<String, WindowCheckpoint>,
    /// Pattern snapshots keyed by a string identifier (presumably the pattern
    /// name — TODO confirm).
    pub pattern_states: HashMap<String, PatternCheckpoint>,
    /// Free-form string metadata.
    pub metadata: HashMap<String, String>,
    /// Per-context engine snapshots; `#[serde(default)]` makes this an empty map
    /// when the field is absent from the serialized data.
    #[serde(default)]
    pub context_states: HashMap<String, EngineCheckpoint>,
}
168
/// Serialized state of a window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowCheckpoint {
    /// Events held by the window.
    pub events: Vec<SerializableEvent>,
    /// Start of the window in milliseconds, if any (presumably `None` when the
    /// window has not started — TODO confirm).
    pub window_start_ms: Option<i64>,
    /// Time of the last emission in milliseconds, if any.
    pub last_emit_ms: Option<i64>,
    /// Per-partition window state keyed by a string (presumably the partition
    /// key — TODO confirm).
    pub partitions: HashMap<String, PartitionedWindowCheckpoint>,
}
181
/// Serialized state of a single partition of a window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionedWindowCheckpoint {
    /// Events held by this partition.
    pub events: Vec<SerializableEvent>,
    /// Start of this partition's window in milliseconds, if any.
    pub window_start_ms: Option<i64>,
}
188
/// Serialized state of a pattern matcher.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternCheckpoint {
    /// Matches that were in progress when the checkpoint was taken.
    pub partial_matches: Vec<PartialMatchCheckpoint>,
}
195
/// Serialized state of one in-progress pattern match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialMatchCheckpoint {
    /// Name of the matcher state this partial match is in
    /// (format not shown in this file — TODO confirm).
    pub state: String,
    /// Events matched so far.
    pub matched_events: Vec<SerializableEvent>,
    /// Start of the match in milliseconds.
    pub start_ms: i64,
}
206
/// Errors returned by [`StateStore`] implementations and checkpoint validation.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Backend failure carrying the underlying error's message. In this file it
    /// is also used for poisoned locks and for key/encryption/decryption failures.
    #[error("I/O error: {0}")]
    IoError(String),
    /// Encoding or decoding failure.
    /// NOTE(review): not constructed in this file; presumably produced by `crate::codec`.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// A requested key does not exist.
    /// NOTE(review): not constructed in this file; lookups here return `Ok(None)` instead.
    #[error("Key not found: {0}")]
    NotFound(String),
    /// The store was used before being initialized.
    /// NOTE(review): not constructed in this file.
    #[error("Store not initialized")]
    NotInitialized,
    /// The checkpoint's version is greater than [`CHECKPOINT_VERSION`]; returned
    /// by `EngineCheckpoint::validate_and_migrate`.
    #[error(
        "Checkpoint version {checkpoint_version} is newer than supported version {current_version}"
    )]
    IncompatibleVersion {
        /// Version recorded in the checkpoint.
        checkpoint_version: u32,
        /// Highest version this build supports.
        current_version: u32,
    },
}
231
/// Storage backend for checkpoints plus a raw key/value interface.
///
/// The implementations in this file store checkpoints under `checkpoint:<id>`.
pub trait StateStore: Send + Sync {
    /// Serializes `checkpoint` and stores it under its `id`.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError>;

    /// Loads the most recent checkpoint, or `Ok(None)` when none is stored.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError>;

    /// Loads the checkpoint with the given `id`, or `Ok(None)` when it is absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError>;

    /// Lists the ids of stored checkpoints. The implementations in this file
    /// return them sorted in ascending order.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError>;

    /// Deletes the oldest (lowest-id) checkpoints so that at most `keep` remain
    /// and returns how many were deleted.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError>;

    /// Stores `value` under `key`, replacing any existing value.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError>;

    /// Returns the value stored under `key`, or `Ok(None)` when it is absent.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError>;

    /// Removes `key`. In the implementations in this file a missing key is not an error.
    fn delete(&self, key: &str) -> Result<(), StoreError>;

    /// Flushes pending writes to the backend, where the backend has any.
    fn flush(&self) -> Result<(), StoreError>;
}
261
/// In-memory [`StateStore`] backed by a lock-protected `HashMap`; nothing is persisted.
#[derive(Debug, Default)]
pub struct MemoryStore {
    /// Key → value bytes, guarded by a read/write lock.
    data: std::sync::RwLock<HashMap<String, Vec<u8>>>,
}
267
268impl MemoryStore {
269 pub fn new() -> Self {
270 Self::default()
271 }
272}
273
274impl StateStore for MemoryStore {
275 fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
276 let key = format!("checkpoint:{}", checkpoint.id);
277 let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
278 self.put(&key, &data)
279 }
280
281 fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
282 let checkpoints = self.list_checkpoints()?;
283 if let Some(id) = checkpoints.last() {
284 self.load_checkpoint(*id)
285 } else {
286 Ok(None)
287 }
288 }
289
290 fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
291 let key = format!("checkpoint:{id}");
292 if let Some(data) = self.get(&key)? {
293 let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
294 Ok(Some(checkpoint))
295 } else {
296 Ok(None)
297 }
298 }
299
300 fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
301 let data = self
302 .data
303 .read()
304 .map_err(|e| StoreError::IoError(e.to_string()))?;
305 let mut ids: Vec<u64> = data
306 .keys()
307 .filter_map(|k| k.strip_prefix("checkpoint:").and_then(|s| s.parse().ok()))
308 .collect();
309 ids.sort_unstable();
310 Ok(ids)
311 }
312
313 fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
314 let checkpoints = self.list_checkpoints()?;
315 let to_delete = checkpoints.len().saturating_sub(keep);
316 for id in checkpoints.iter().take(to_delete) {
317 let key = format!("checkpoint:{id}");
318 self.delete(&key)?;
319 }
320 Ok(to_delete)
321 }
322
323 fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
324 let mut data = self
325 .data
326 .write()
327 .map_err(|e| StoreError::IoError(e.to_string()))?;
328 data.insert(key.to_string(), value.to_vec());
329 Ok(())
330 }
331
332 fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
333 let data = self
334 .data
335 .read()
336 .map_err(|e| StoreError::IoError(e.to_string()))?;
337 Ok(data.get(key).cloned())
338 }
339
340 fn delete(&self, key: &str) -> Result<(), StoreError> {
341 let mut data = self
342 .data
343 .write()
344 .map_err(|e| StoreError::IoError(e.to_string()))?;
345 data.remove(key);
346 Ok(())
347 }
348
349 fn flush(&self) -> Result<(), StoreError> {
350 Ok(()) }
352}
353
/// [`StateStore`] backed by a RocksDB database (requires the `persistence` feature).
#[cfg(feature = "persistence")]
pub struct RocksDbStore {
    /// Open database handle.
    db: rocksdb::DB,
    /// Namespace prepended (followed by `:`) to every key written by this store.
    prefix: String,
}
360
#[cfg(feature = "persistence")]
impl RocksDbStore {
    /// Opens (creating if necessary) the database at `path` with the default
    /// `"varpulis"` key prefix.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
        Self::open_with_prefix(path, "varpulis")
    }

    /// Opens (creating if necessary) the database at `path`, namespacing every
    /// key with `prefix`.
    ///
    /// # Errors
    /// Returns [`StoreError::IoError`] when RocksDB fails to open the database.
    pub fn open_with_prefix<P: AsRef<Path>>(path: P, prefix: &str) -> Result<Self, StoreError> {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);

        // Write-path tuning: 64 MiB write buffers (up to 3) and 64 MiB target files.
        options.set_write_buffer_size(64 * 1024 * 1024);
        options.set_max_write_buffer_number(3);
        options.set_target_file_size_base(64 * 1024 * 1024);

        let db = match rocksdb::DB::open(&options, path) {
            Ok(handle) => handle,
            Err(e) => return Err(StoreError::IoError(e.to_string())),
        };

        info!("Opened RocksDB state store");
        let prefix = prefix.to_string();
        Ok(Self { db, prefix })
    }

    /// Builds the namespaced key `<prefix>:<key>`.
    fn prefixed_key(&self, key: &str) -> String {
        let mut full = String::with_capacity(self.prefix.len() + 1 + key.len());
        full.push_str(&self.prefix);
        full.push(':');
        full.push_str(key);
        full
    }
}
392
#[cfg(feature = "persistence")]
impl StateStore for RocksDbStore {
    /// Serializes the checkpoint and stores it under `<prefix>:checkpoint:<id>`,
    /// updating the `<prefix>:checkpoint:latest` pointer (little-endian id).
    ///
    /// Both writes go through a single `WriteBatch` so the pointer can never be
    /// persisted without the checkpoint it refers to (or vice versa).
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let key = self.prefixed_key(&format!("checkpoint:{}", checkpoint.id));
        let latest_key = self.prefixed_key("checkpoint:latest");
        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;

        let mut batch = rocksdb::WriteBatch::default();
        batch.put(key.as_bytes(), &data);
        batch.put(latest_key.as_bytes(), checkpoint.id.to_le_bytes());
        self.db
            .write(batch)
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        debug!("Saved checkpoint {} ({} bytes)", checkpoint.id, data.len());
        Ok(())
    }

    /// Loads the checkpoint referenced by the `checkpoint:latest` pointer.
    ///
    /// Returns `Ok(None)` when the pointer is missing, is not exactly 8 bytes,
    /// or refers to a checkpoint that no longer exists.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        let latest_key = self.prefixed_key("checkpoint:latest");
        if let Some(id_bytes) = self
            .db
            .get(latest_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?
        {
            let Ok(bytes) = <[u8; 8]>::try_from(id_bytes.as_ref()) else {
                return Ok(None);
            };
            let id = u64::from_le_bytes(bytes);
            return self.load_checkpoint(id);
        }
        Ok(None)
    }

    /// Loads and decodes the checkpoint stored under `<prefix>:checkpoint:<id>`.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let key = self.prefixed_key(&format!("checkpoint:{}", id));
        if let Some(data) = self
            .db
            .get(key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?
        {
            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
            debug!("Loaded checkpoint {}", id);
            Ok(Some(checkpoint))
        } else {
            Ok(None)
        }
    }

    /// Returns the ids of all stored checkpoints in ascending order.
    ///
    /// Keys under the checkpoint prefix whose suffix is not a `u64` (such as the
    /// `latest` pointer) are skipped.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        let prefix = self.prefixed_key("checkpoint:");
        let prefix_bytes = prefix.as_bytes();
        let mut ids = Vec::new();

        for item in self.db.prefix_iterator(prefix_bytes) {
            let (key, _) = item.map_err(|e| StoreError::IoError(e.to_string()))?;
            // The database is opened without a prefix extractor, so the iterator
            // keeps going past the prefix range. Keys are visited in byte order,
            // so the first non-matching key ends the range.
            let Some(suffix) = key.strip_prefix(prefix_bytes) else {
                break;
            };
            if let Some(id) = std::str::from_utf8(suffix)
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
            {
                ids.push(id);
            }
        }

        // Keys sort lexicographically ("10" < "9"), so a numeric sort is still needed.
        ids.sort_unstable();
        Ok(ids)
    }

    /// Deletes the lowest-id checkpoints until at most `keep` remain; returns
    /// the number deleted.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        let checkpoints = self.list_checkpoints()?;
        let to_delete = checkpoints.len().saturating_sub(keep);

        for id in checkpoints.iter().take(to_delete) {
            let key = self.prefixed_key(&format!("checkpoint:{}", id));
            self.db
                .delete(key.as_bytes())
                .map_err(|e| StoreError::IoError(e.to_string()))?;
        }

        if to_delete > 0 {
            info!("Pruned {} old checkpoints", to_delete);
        }
        Ok(to_delete)
    }

    /// Stores `value` under the namespaced `key`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .put(full_key.as_bytes(), value)
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Returns the value stored under the namespaced `key`, if present.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .get(full_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Deletes the namespaced `key`.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .delete(full_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Asks RocksDB to flush its in-memory write buffers.
    fn flush(&self) -> Result<(), StoreError> {
        self.db
            .flush()
            .map_err(|e| StoreError::IoError(e.to_string()))
    }
}
509
/// [`StateStore`] that keeps each key in its own file beneath a root directory.
#[derive(Debug)]
pub struct FileStore {
    /// Root directory under which all key files are created.
    dir: std::path::PathBuf,
}
519
520impl FileStore {
521 pub fn open(dir: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
523 let dir = dir.as_ref().to_path_buf();
524 std::fs::create_dir_all(&dir).map_err(|e| StoreError::IoError(e.to_string()))?;
525 Ok(Self { dir })
526 }
527
528 fn key_to_path(&self, key: &str) -> std::path::PathBuf {
529 let path_str = key.replace(':', std::path::MAIN_SEPARATOR_STR);
531 self.dir.join(path_str)
532 }
533}
534
535impl StateStore for FileStore {
536 fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
537 let key = format!("checkpoint:{}", checkpoint.id);
538 let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
539 self.put(&key, &data)
540 }
541
542 fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
543 let checkpoints = self.list_checkpoints()?;
544 if let Some(id) = checkpoints.last() {
545 self.load_checkpoint(*id)
546 } else {
547 Ok(None)
548 }
549 }
550
551 fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
552 let key = format!("checkpoint:{id}");
553 if let Some(data) = self.get(&key)? {
554 let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
555 Ok(Some(checkpoint))
556 } else {
557 Ok(None)
558 }
559 }
560
561 fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
562 let checkpoint_dir = self.dir.join("checkpoint");
563 if !checkpoint_dir.exists() {
564 return Ok(Vec::new());
565 }
566
567 let mut ids: Vec<u64> = Vec::new();
568 let entries =
569 std::fs::read_dir(&checkpoint_dir).map_err(|e| StoreError::IoError(e.to_string()))?;
570 for entry in entries {
571 let entry = entry.map_err(|e| StoreError::IoError(e.to_string()))?;
572 if let Some(name) = entry.file_name().to_str() {
573 if name != "latest" {
574 if let Ok(id) = name.parse::<u64>() {
575 ids.push(id);
576 }
577 }
578 }
579 }
580 ids.sort_unstable();
581 Ok(ids)
582 }
583
584 fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
585 let checkpoints = self.list_checkpoints()?;
586 let to_delete = checkpoints.len().saturating_sub(keep);
587 for id in checkpoints.iter().take(to_delete) {
588 let key = format!("checkpoint:{id}");
589 self.delete(&key)?;
590 }
591 Ok(to_delete)
592 }
593
594 fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
595 let path = self.key_to_path(key);
596 if let Some(parent) = path.parent() {
597 std::fs::create_dir_all(parent).map_err(|e| StoreError::IoError(e.to_string()))?;
598 }
599
600 let tmp_path = path.with_extension("tmp");
602 std::fs::write(&tmp_path, value).map_err(|e| StoreError::IoError(e.to_string()))?;
603 std::fs::rename(&tmp_path, &path).map_err(|e| StoreError::IoError(e.to_string()))?;
604 Ok(())
605 }
606
607 fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
608 let path = self.key_to_path(key);
609 match std::fs::read(&path) {
610 Ok(data) => Ok(Some(data)),
611 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
612 Err(e) => Err(StoreError::IoError(e.to_string())),
613 }
614 }
615
616 fn delete(&self, key: &str) -> Result<(), StoreError> {
617 let path = self.key_to_path(key);
618 match std::fs::remove_file(&path) {
619 Ok(()) => Ok(()),
620 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
621 Err(e) => Err(StoreError::IoError(e.to_string())),
622 }
623 }
624
625 fn flush(&self) -> Result<(), StoreError> {
626 Ok(()) }
628}
629
/// Drives periodic checkpointing against a [`StateStore`].
pub struct CheckpointManager {
    /// Backend that checkpoints are saved to and recovered from.
    store: Arc<dyn StateStore>,
    /// Interval and retention settings.
    config: CheckpointConfig,
    /// When the last checkpoint was written (initialized to construction time).
    last_checkpoint: Instant,
    /// Id that will be assigned to the next checkpoint.
    next_checkpoint_id: u64,
}
637
638impl std::fmt::Debug for CheckpointManager {
639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640 f.debug_struct("CheckpointManager")
641 .field("config", &self.config)
642 .field("last_checkpoint", &self.last_checkpoint)
643 .field("next_checkpoint_id", &self.next_checkpoint_id)
644 .finish_non_exhaustive()
645 }
646}
647
648impl CheckpointManager {
649 pub fn new(store: Arc<dyn StateStore>, config: CheckpointConfig) -> Result<Self, StoreError> {
651 let next_id = store.load_latest_checkpoint()?.map_or(1, |c| c.id + 1);
653
654 Ok(Self {
655 store,
656 config,
657 last_checkpoint: Instant::now(),
658 next_checkpoint_id: next_id,
659 })
660 }
661
662 pub fn should_checkpoint(&self) -> bool {
664 self.last_checkpoint.elapsed() >= self.config.interval
665 }
666
667 pub fn checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), StoreError> {
669 let mut checkpoint = checkpoint;
670 checkpoint.id = self.next_checkpoint_id;
671 checkpoint.timestamp_ms = chrono::Utc::now().timestamp_millis();
672
673 self.store.save_checkpoint(&checkpoint)?;
674 self.store.prune_checkpoints(self.config.max_checkpoints)?;
675 self.store.flush()?;
676
677 self.last_checkpoint = Instant::now();
678 self.next_checkpoint_id += 1;
679
680 info!(
681 "Created checkpoint {} ({} events processed)",
682 checkpoint.id, checkpoint.events_processed
683 );
684 Ok(())
685 }
686
687 pub fn recover(&self) -> Result<Option<Checkpoint>, StoreError> {
689 self.store.load_latest_checkpoint()
690 }
691
692 pub fn store(&self) -> &Arc<dyn StateStore> {
694 &self.store
695 }
696}
697
/// Highest [`EngineCheckpoint`] version this build accepts; checkpoints with a
/// greater `version` are rejected by `EngineCheckpoint::validate_and_migrate`.
pub const CHECKPOINT_VERSION: u32 = 1;
700
/// Serde default for `EngineCheckpoint::version`: data without a `version`
/// field is treated as version 1.
const fn default_checkpoint_version() -> u32 {
    1_u32
}
705
/// Versioned snapshot of an engine's state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineCheckpoint {
    /// Format version; defaults to 1 when absent from the serialized data.
    #[serde(default = "default_checkpoint_version")]
    pub version: u32,
    /// Window snapshots keyed by a string identifier (presumably the stream
    /// name — TODO confirm).
    pub window_states: HashMap<String, WindowCheckpoint>,
    /// SASE pattern-engine snapshots keyed by a string identifier.
    pub sase_states: HashMap<String, SaseCheckpoint>,
    /// Join snapshots keyed by a string identifier.
    pub join_states: HashMap<String, JoinCheckpoint>,
    /// Engine variables by name.
    pub variables: HashMap<String, SerializableValue>,
    /// Number of events processed when the snapshot was taken.
    pub events_processed: u64,
    /// Number of output events emitted when the snapshot was taken.
    pub output_events_emitted: u64,
    /// Watermark tracking state; `None` when absent from the serialized data.
    #[serde(default)]
    pub watermark_state: Option<WatermarkCheckpoint>,
    /// Distinct-operator snapshots; empty when absent from the serialized data.
    #[serde(default)]
    pub distinct_states: HashMap<String, DistinctCheckpoint>,
    /// Limit-operator snapshots; empty when absent from the serialized data.
    #[serde(default)]
    pub limit_states: HashMap<String, LimitCheckpoint>,
}
734
735impl EngineCheckpoint {
736 pub const fn validate_and_migrate(&mut self) -> Result<(), StoreError> {
741 if self.version > CHECKPOINT_VERSION {
742 return Err(StoreError::IncompatibleVersion {
743 checkpoint_version: self.version,
744 current_version: CHECKPOINT_VERSION,
745 });
746 }
747
748 Ok(())
757 }
758}
759
/// Serialized state of a SASE pattern engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaseCheckpoint {
    /// Runs that are not associated with a partition.
    pub active_runs: Vec<RunCheckpoint>,
    /// Runs grouped by a string key (presumably the partition key — TODO confirm).
    pub partitioned_runs: HashMap<String, Vec<RunCheckpoint>>,
    /// Current watermark in milliseconds, if any.
    pub watermark_ms: Option<i64>,
    /// Largest event timestamp seen in milliseconds, if any.
    pub max_timestamp_ms: Option<i64>,
    /// Counter: runs created.
    pub total_runs_created: u64,
    /// Counter: runs completed.
    pub total_runs_completed: u64,
    /// Counter: runs dropped.
    pub total_runs_dropped: u64,
    /// Counter: runs evicted.
    pub total_runs_evicted: u64,
}
780
/// Serialized state of a single pattern-matching run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunCheckpoint {
    /// Index of the run's current state (presumably into the engine's state
    /// table — TODO confirm).
    pub current_state: usize,
    /// Stack of matched events with their optional aliases.
    pub stack: Vec<StackEntryCheckpoint>,
    /// Captured events keyed by a string (presumably the alias — TODO confirm).
    pub captured: HashMap<String, SerializableEvent>,
    /// Event-time start of the run in milliseconds, if any.
    pub event_time_started_at_ms: Option<i64>,
    /// Event-time deadline of the run in milliseconds, if any.
    pub event_time_deadline_ms: Option<i64>,
    /// Partition key value for this run, if any.
    pub partition_key: Option<SerializableValue>,
    /// Whether the run has been invalidated.
    pub invalidated: bool,
    /// Number of pending negation checks.
    pub pending_negation_count: usize,
    /// Events accumulated for a Kleene closure, if any.
    pub kleene_events: Option<Vec<SerializableEvent>>,
}
803
/// One entry of a run's event stack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StackEntryCheckpoint {
    /// The matched event.
    pub event: SerializableEvent,
    /// Alias attached to the event, if any.
    pub alias: Option<String>,
}
810
/// Serialized state of a join operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinCheckpoint {
    /// Buffered events as a two-level map of `(i64, event)` lists.
    /// NOTE(review): presumably source name → join-key value → (timestamp, event);
    /// confirm against the join implementation.
    pub buffers: HashMap<String, HashMap<String, Vec<(i64, SerializableEvent)>>>,
    /// Names of the joined sources.
    pub sources: Vec<String>,
    /// String-to-string mapping of join keys (presumably source name → key
    /// field — TODO confirm).
    pub join_keys: HashMap<String, String>,
    /// Join window length in milliseconds.
    pub window_duration_ms: i64,
}
823
/// Serialized watermark tracking state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkCheckpoint {
    /// Per-source watermark state keyed by a string (presumably the source
    /// name — TODO confirm).
    pub sources: HashMap<String, SourceWatermarkCheckpoint>,
    /// Effective (combined) watermark in milliseconds, if any.
    pub effective_watermark_ms: Option<i64>,
}
832
/// Serialized watermark state of a single source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceWatermarkCheckpoint {
    /// The source's watermark in milliseconds, if any.
    pub watermark_ms: Option<i64>,
    /// Largest event timestamp seen from the source in milliseconds, if any.
    pub max_timestamp_ms: Option<i64>,
    /// Allowed out-of-orderness for the source in milliseconds.
    pub max_out_of_orderness_ms: i64,
}
843
/// Serialized state of a distinct operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistinctCheckpoint {
    /// Keys recorded by the operator (presumably those already seen — TODO confirm).
    pub keys: Vec<String>,
}
850
/// Serialized state of a limit operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LimitCheckpoint {
    /// Configured maximum.
    pub max: usize,
    /// Count reached so far.
    pub count: usize,
}
859
/// Crate-visible entry point for converting a runtime `Value` into a
/// [`SerializableValue`]; delegates to `value_to_serializable`.
pub(crate) fn value_to_ser(v: &varpulis_core::Value) -> SerializableValue {
    value_to_serializable(v)
}
864
/// Crate-visible entry point for converting a [`SerializableValue`] back into
/// a runtime `Value`; delegates to `serializable_to_value`.
pub(crate) fn ser_to_value(sv: SerializableValue) -> varpulis_core::Value {
    serializable_to_value(sv)
}
869
/// [`StateStore`] wrapper that AES-256-GCM-encrypts values before handing them
/// to the inner store (requires the `encryption` feature).
#[cfg(feature = "encryption")]
pub struct EncryptedStateStore<S: StateStore> {
    /// Backend that receives the encrypted bytes.
    inner: S,
    /// 256-bit AES key.
    key: [u8; 32],
}

// Written by hand rather than derived so the key never reaches logs or panic
// messages through `{:?}`. The `S: Debug` bound matches what the derive required.
#[cfg(feature = "encryption")]
impl<S: StateStore + std::fmt::Debug> std::fmt::Debug for EncryptedStateStore<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EncryptedStateStore")
            .field("inner", &self.inner)
            .field("key", &"<redacted>")
            .finish()
    }
}
896
#[cfg(feature = "encryption")]
impl<S: StateStore> EncryptedStateStore<S> {
    /// Wraps `inner` so that values are encrypted with `key`.
    pub fn new(inner: S, key: [u8; 32]) -> Self {
        Self { inner, key }
    }

    /// Parses a 64-character hex string (surrounding whitespace ignored) into a
    /// 32-byte key.
    ///
    /// # Errors
    /// Returns [`StoreError::IoError`] when the input is not valid hex or does
    /// not decode to exactly 32 bytes.
    pub fn key_from_hex(hex_str: &str) -> Result<[u8; 32], StoreError> {
        let decoded = match hex::decode(hex_str.trim()) {
            Ok(bytes) => bytes,
            Err(e) => return Err(StoreError::IoError(format!("Invalid hex key: {}", e))),
        };

        <[u8; 32]>::try_from(decoded.as_slice()).map_err(|_| {
            StoreError::IoError(format!(
                "Encryption key must be 32 bytes (64 hex chars), got {} bytes",
                decoded.len()
            ))
        })
    }

    /// Derives a 32-byte key from `passphrase` and `salt` using Argon2 with its
    /// default parameters.
    ///
    /// # Errors
    /// Returns [`StoreError::IoError`] when Argon2 rejects the inputs.
    pub fn key_from_passphrase(passphrase: &str, salt: &[u8]) -> Result<[u8; 32], StoreError> {
        use argon2::Argon2;

        let mut derived = [0u8; 32];
        let hasher = Argon2::default();
        match hasher.hash_password_into(passphrase.as_bytes(), salt, &mut derived) {
            Ok(()) => Ok(derived),
            Err(e) => Err(StoreError::IoError(format!(
                "Argon2 key derivation failed: {}",
                e
            ))),
        }
    }

    /// Encrypts `plaintext` with a freshly generated random nonce.
    ///
    /// The output is the 12-byte nonce followed by the ciphertext.
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::{Aead, OsRng};
        use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};

        let cipher = match Aes256Gcm::new_from_slice(&self.key) {
            Ok(cipher) => cipher,
            Err(e) => return Err(StoreError::IoError(format!("AES key error: {}", e))),
        };

        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
        let sealed = match cipher.encrypt(&nonce, plaintext) {
            Ok(bytes) => bytes,
            Err(e) => return Err(StoreError::IoError(format!("Encryption failed: {}", e))),
        };

        // Frame layout: nonce || ciphertext.
        let mut framed = Vec::with_capacity(12 + sealed.len());
        framed.extend_from_slice(&nonce);
        framed.extend_from_slice(&sealed);
        Ok(framed)
    }

    /// Decrypts data produced by `encrypt` (12-byte nonce followed by ciphertext).
    ///
    /// # Errors
    /// Returns [`StoreError::IoError`] when the input is shorter than the nonce
    /// or when decryption fails.
    fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::Aead;
        use aes_gcm::{Aes256Gcm, KeyInit, Nonce};

        const NONCE_LEN: usize = 12;

        if data.len() < NONCE_LEN {
            return Err(StoreError::IoError(
                "Encrypted data too short (missing nonce)".to_string(),
            ));
        }

        let (head, sealed) = data.split_at(NONCE_LEN);
        let nonce_arr: [u8; 12] = match head.try_into() {
            Ok(arr) => arr,
            Err(_) => return Err(StoreError::IoError("Invalid nonce length".to_string())),
        };
        let nonce = Nonce::from(nonce_arr);

        let cipher = match Aes256Gcm::new_from_slice(&self.key) {
            Ok(cipher) => cipher,
            Err(e) => return Err(StoreError::IoError(format!("AES key error: {}", e))),
        };

        match cipher.decrypt(&nonce, sealed) {
            Ok(plaintext) => Ok(plaintext),
            Err(e) => Err(StoreError::IoError(format!("Decryption failed: {}", e))),
        }
    }
}
975
#[cfg(feature = "encryption")]
impl<S: StateStore> StateStore for EncryptedStateStore<S> {
    /// Serializes and encrypts the checkpoint, then stores it in the inner
    /// store under `checkpoint:<id>`.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let format = crate::codec::CheckpointFormat::active();
        let plaintext = crate::codec::serialize(checkpoint, format)?;
        let sealed = self.encrypt(&plaintext)?;
        self.inner
            .put(&format!("checkpoint:{}", checkpoint.id), &sealed)
    }

    /// Loads the checkpoint with the highest id listed by the inner store.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        let ids = self.list_checkpoints()?;
        match ids.last() {
            Some(&newest) => self.load_checkpoint(newest),
            None => Ok(None),
        }
    }

    /// Fetches, decrypts and decodes the checkpoint stored under `checkpoint:<id>`.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let Some(sealed) = self.inner.get(&format!("checkpoint:{}", id))? else {
            return Ok(None);
        };
        let plaintext = self.decrypt(&sealed)?;
        let checkpoint: Checkpoint = crate::codec::deserialize(&plaintext)?;
        Ok(Some(checkpoint))
    }

    /// Delegates to the inner store.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        self.inner.list_checkpoints()
    }

    /// Delegates to the inner store.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        self.inner.prune_checkpoints(keep)
    }

    /// Encrypts `value` and stores it under the unmodified `key`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let sealed = self.encrypt(value)?;
        self.inner.put(key, &sealed)
    }

    /// Fetches and decrypts the value stored under `key`, if present.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        self.inner
            .get(key)?
            .map(|sealed| self.decrypt(&sealed))
            .transpose()
    }

    /// Delegates to the inner store.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        self.inner.delete(key)
    }

    /// Delegates to the inner store.
    fn flush(&self) -> Result<(), StoreError> {
        self.inner.flush()
    }
}
1040
1041#[cfg(test)]
1042mod tests {
1043 use super::*;
1044
1045 #[test]
1046 fn test_memory_store_checkpoint() {
1047 let store = MemoryStore::new();
1048
1049 let checkpoint = Checkpoint {
1050 id: 1,
1051 timestamp_ms: 1000,
1052 events_processed: 100,
1053 window_states: HashMap::new(),
1054 pattern_states: HashMap::new(),
1055 metadata: HashMap::new(),
1056 context_states: HashMap::new(),
1057 };
1058
1059 store.save_checkpoint(&checkpoint).unwrap();
1060
1061 let loaded = store.load_checkpoint(1).unwrap();
1062 assert!(loaded.is_some());
1063 let loaded = loaded.unwrap();
1064 assert_eq!(loaded.id, 1);
1065 assert_eq!(loaded.events_processed, 100);
1066 }
1067
1068 #[test]
1069 fn test_memory_store_prune() {
1070 let store = MemoryStore::new();
1071
1072 for i in 1..=5 {
1073 let checkpoint = Checkpoint {
1074 id: i,
1075 timestamp_ms: i as i64 * 1000,
1076 events_processed: i * 100,
1077 window_states: HashMap::new(),
1078 pattern_states: HashMap::new(),
1079 metadata: HashMap::new(),
1080 context_states: HashMap::new(),
1081 };
1082 store.save_checkpoint(&checkpoint).unwrap();
1083 }
1084
1085 let checkpoints = store.list_checkpoints().unwrap();
1086 assert_eq!(checkpoints.len(), 5);
1087
1088 let pruned = store.prune_checkpoints(2).unwrap();
1089 assert_eq!(pruned, 3);
1090
1091 let checkpoints = store.list_checkpoints().unwrap();
1092 assert_eq!(checkpoints.len(), 2);
1093 assert_eq!(checkpoints, vec![4, 5]);
1094 }
1095
1096 #[test]
1097 fn test_file_store_put_get_delete() {
1098 let dir = tempfile::tempdir().unwrap();
1099 let store = FileStore::open(dir.path()).unwrap();
1100
1101 store.put("test:key1", b"hello world").unwrap();
1103 let val = store.get("test:key1").unwrap();
1104 assert_eq!(val, Some(b"hello world".to_vec()));
1105
1106 let val = store.get("test:missing").unwrap();
1108 assert!(val.is_none());
1109
1110 store.delete("test:key1").unwrap();
1112 let val = store.get("test:key1").unwrap();
1113 assert!(val.is_none());
1114
1115 store.delete("test:missing").unwrap();
1117 }
1118
1119 #[test]
1120 fn test_file_store_atomic_write() {
1121 let dir = tempfile::tempdir().unwrap();
1122 let store = FileStore::open(dir.path()).unwrap();
1123
1124 store.put("data:file1", b"version 1").unwrap();
1126 assert_eq!(
1127 store.get("data:file1").unwrap(),
1128 Some(b"version 1".to_vec())
1129 );
1130
1131 store.put("data:file1", b"version 2").unwrap();
1133 assert_eq!(
1134 store.get("data:file1").unwrap(),
1135 Some(b"version 2".to_vec())
1136 );
1137
1138 let data_dir = dir.path().join("data");
1140 if data_dir.exists() {
1141 for entry in std::fs::read_dir(&data_dir).unwrap() {
1142 let entry = entry.unwrap();
1143 let name = entry.file_name().to_string_lossy().to_string();
1144 assert!(
1145 !std::path::Path::new(&name)
1146 .extension()
1147 .is_some_and(|ext| ext.eq_ignore_ascii_case("tmp")),
1148 "tmp file left behind: {name}"
1149 );
1150 }
1151 }
1152 }
1153
1154 #[test]
1155 fn test_serializable_event() {
1156 let mut event = Event::new("TestEvent");
1157 event
1158 .data
1159 .insert("count".into(), varpulis_core::Value::Int(42));
1160 event
1161 .data
1162 .insert("value".into(), varpulis_core::Value::Float(1.5));
1163 event
1164 .data
1165 .insert("name".into(), varpulis_core::Value::Str("test".into()));
1166
1167 let serializable: SerializableEvent = (&event).into();
1168 let restored: Event = serializable.into();
1169
1170 assert_eq!(&*restored.event_type, "TestEvent");
1171 assert_eq!(restored.get_int("count"), Some(42));
1172 assert_eq!(restored.get_float("value"), Some(1.5));
1173 assert_eq!(restored.get_str("name"), Some("test"));
1174 }
1175
1176 #[test]
1177 fn test_serializable_event_complex_values() {
1178 let mut event = Event::new("ComplexEvent");
1179
1180 event.data.insert(
1182 "ts".into(),
1183 varpulis_core::Value::Timestamp(1_700_000_000_000_000_000),
1184 );
1185
1186 event
1188 .data
1189 .insert("dur".into(), varpulis_core::Value::Duration(5_000_000_000));
1190
1191 event.data.insert(
1193 "tags".into(),
1194 varpulis_core::Value::array(vec![
1195 varpulis_core::Value::Str("a".into()),
1196 varpulis_core::Value::Int(1),
1197 ]),
1198 );
1199
1200 let mut inner_map = IndexMap::with_hasher(FxBuildHasher);
1202 inner_map.insert("nested_key".into(), varpulis_core::Value::Float(3.15));
1203 inner_map.insert("flag".into(), varpulis_core::Value::Bool(true));
1204 event
1205 .data
1206 .insert("meta".into(), varpulis_core::Value::map(inner_map));
1207
1208 let serializable: SerializableEvent = (&event).into();
1210
1211 assert!(matches!(
1213 serializable.fields.get("ts"),
1214 Some(SerializableValue::Timestamp(1_700_000_000_000_000_000))
1215 ));
1216 assert!(matches!(
1217 serializable.fields.get("dur"),
1218 Some(SerializableValue::Duration(5_000_000_000))
1219 ));
1220 assert!(matches!(
1221 serializable.fields.get("tags"),
1222 Some(SerializableValue::Array(_))
1223 ));
1224 assert!(matches!(
1225 serializable.fields.get("meta"),
1226 Some(SerializableValue::Map(_))
1227 ));
1228
1229 let restored: Event = serializable.into();
1230
1231 assert_eq!(
1233 restored.data.get("ts"),
1234 Some(&varpulis_core::Value::Timestamp(1_700_000_000_000_000_000))
1235 );
1236 assert_eq!(
1237 restored.data.get("dur"),
1238 Some(&varpulis_core::Value::Duration(5_000_000_000))
1239 );
1240
1241 match restored.data.get("tags") {
1243 Some(varpulis_core::Value::Array(arr)) => {
1244 assert_eq!(arr.len(), 2);
1245 assert_eq!(arr[0], varpulis_core::Value::Str("a".into()));
1246 assert_eq!(arr[1], varpulis_core::Value::Int(1));
1247 }
1248 other => panic!("Expected Array, got {other:?}"),
1249 }
1250
1251 match restored.data.get("meta") {
1253 Some(varpulis_core::Value::Map(m)) => {
1254 assert_eq!(m.len(), 2);
1255 assert_eq!(
1256 m.get("nested_key"),
1257 Some(&varpulis_core::Value::Float(3.15))
1258 );
1259 assert_eq!(m.get("flag"), Some(&varpulis_core::Value::Bool(true)));
1260 }
1261 other => panic!("Expected Map, got {other:?}"),
1262 }
1263 }
1264
1265 #[cfg(feature = "encryption")]
1270 mod encryption_tests {
1271 use super::*;
1272
1273 fn test_key() -> [u8; 32] {
1274 let mut key = [0u8; 32];
1276 for (i, b) in key.iter_mut().enumerate() {
1277 *b = i as u8;
1278 }
1279 key
1280 }
1281
1282 #[test]
1283 fn test_encrypted_put_get_roundtrip() {
1284 let inner = MemoryStore::new();
1285 let store = EncryptedStateStore::new(inner, test_key());
1286
1287 let data = b"hello, encrypted world!";
1288 store.put("test_key", data).unwrap();
1289
1290 let retrieved = store.get("test_key").unwrap().unwrap();
1291 assert_eq!(retrieved, data);
1292 }
1293
1294 #[test]
1295 fn test_encrypted_data_differs_from_plaintext() {
1296 let inner = MemoryStore::new();
1297 let store = EncryptedStateStore::new(inner, test_key());
1298
1299 let data = b"sensitive data";
1300 store.put("test_key", data).unwrap();
1301
1302 let raw = store.inner.get("test_key").unwrap().unwrap();
1304 assert_ne!(raw, data.to_vec());
1305 assert!(raw.len() > data.len()); }
1307
1308 #[test]
1309 fn test_encrypted_checkpoint_roundtrip() {
1310 let inner = MemoryStore::new();
1311 let store = EncryptedStateStore::new(inner, test_key());
1312
1313 let checkpoint = Checkpoint {
1314 id: 1,
1315 timestamp_ms: 1700000000000,
1316 events_processed: 42,
1317 window_states: HashMap::new(),
1318 pattern_states: HashMap::new(),
1319 metadata: HashMap::new(),
1320 context_states: HashMap::new(),
1321 };
1322
1323 store.save_checkpoint(&checkpoint).unwrap();
1324 let loaded = store.load_checkpoint(1).unwrap().unwrap();
1325
1326 assert_eq!(loaded.id, 1);
1327 assert_eq!(loaded.events_processed, 42);
1328 assert_eq!(loaded.timestamp_ms, 1700000000000);
1329 }
1330
1331 #[test]
1332 fn test_encrypted_get_nonexistent_key() {
1333 let inner = MemoryStore::new();
1334 let store = EncryptedStateStore::new(inner, test_key());
1335
1336 let result = store.get("nonexistent").unwrap();
1337 assert!(result.is_none());
1338 }
1339
1340 #[test]
1341 fn test_encrypted_delete() {
1342 let inner = MemoryStore::new();
1343 let store = EncryptedStateStore::new(inner, test_key());
1344
1345 store.put("key", b"value").unwrap();
1346 assert!(store.get("key").unwrap().is_some());
1347
1348 store.delete("key").unwrap();
1349 assert!(store.get("key").unwrap().is_none());
1350 }
1351
1352 #[test]
1353 fn test_wrong_key_fails_decryption() {
1354 let inner = MemoryStore::new();
1355 let key1 = test_key();
1356 let store1 = EncryptedStateStore::new(inner, key1);
1357
1358 store1.put("key", b"secret").unwrap();
1359
1360 let mut key2 = [0u8; 32];
1362 key2[0] = 0xFF; let inner2 = MemoryStore::new();
1366 let store2 = EncryptedStateStore::new(inner2, key2);
1367 store2.put("key", b"other secret").unwrap();
1368
1369 let result = store2.get("key").unwrap().unwrap();
1371 assert_eq!(result, b"other secret");
1372 }
1373
1374 #[test]
1375 fn test_key_from_hex() {
1376 let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
1377 let key = EncryptedStateStore::<MemoryStore>::key_from_hex(hex).unwrap();
1378 assert_eq!(key[0], 0x01);
1379 assert_eq!(key[1], 0x23);
1380 assert_eq!(key[31], 0xef);
1381 }
1382
1383 #[test]
1384 fn test_key_from_hex_wrong_length() {
1385 let hex = "0123456789abcdef"; let result = EncryptedStateStore::<MemoryStore>::key_from_hex(hex);
1387 assert!(result.is_err());
1388 }
1389
1390 #[test]
1391 fn test_key_from_passphrase() {
1392 let key = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
1393 "my-secret-passphrase",
1394 b"varpulis-salt-00",
1395 )
1396 .unwrap();
1397 assert_eq!(key.len(), 32);
1398
1399 let key2 = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
1401 "my-secret-passphrase",
1402 b"varpulis-salt-00",
1403 )
1404 .unwrap();
1405 assert_eq!(key, key2);
1406 }
1407
1408 #[test]
1409 fn test_encrypted_latest_checkpoint() {
1410 let inner = MemoryStore::new();
1411 let store = EncryptedStateStore::new(inner, test_key());
1412
1413 assert!(store.load_latest_checkpoint().unwrap().is_none());
1415
1416 let cp1 = Checkpoint {
1418 id: 1,
1419 timestamp_ms: 1000,
1420 events_processed: 10,
1421 window_states: HashMap::new(),
1422 pattern_states: HashMap::new(),
1423 metadata: HashMap::new(),
1424 context_states: HashMap::new(),
1425 };
1426 let cp2 = Checkpoint {
1427 id: 2,
1428 timestamp_ms: 2000,
1429 events_processed: 20,
1430 window_states: HashMap::new(),
1431 pattern_states: HashMap::new(),
1432 metadata: HashMap::new(),
1433 context_states: HashMap::new(),
1434 };
1435
1436 store.save_checkpoint(&cp1).unwrap();
1437 store.save_checkpoint(&cp2).unwrap();
1438
1439 let latest = store.load_latest_checkpoint().unwrap().unwrap();
1440 assert_eq!(latest.id, 2);
1441 assert_eq!(latest.events_processed, 20);
1442 }
1443 }
1444}