1use std::collections::HashMap;
19#[cfg(feature = "persistence")]
20use std::path::Path;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use indexmap::IndexMap;
25use rustc_hash::FxBuildHasher;
26use serde::{Deserialize, Serialize};
27#[cfg(feature = "persistence")]
28use tracing::debug;
29use tracing::info;
30
31use crate::event::Event;
32
/// Tuning knobs for [`CheckpointManager`]: how often to checkpoint and how many
/// checkpoints to retain.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Minimum elapsed time between checkpoints (compared in `should_checkpoint`).
    pub interval: Duration,
    /// How many of the newest checkpoints survive pruning after each checkpoint.
    pub max_checkpoints: usize,
    /// Shutdown-checkpoint flag; not consulted in this module (presumably read by
    /// the engine's shutdown path — confirm).
    pub checkpoint_on_shutdown: bool,
    /// Key namespace; not consulted in this module (stores take their own prefix —
    /// confirm where this is wired up).
    pub key_prefix: String,
}

impl Default for CheckpointConfig {
    /// One-minute interval, three retained checkpoints, shutdown checkpointing
    /// enabled, and the `"varpulis"` key prefix.
    fn default() -> Self {
        const DEFAULT_INTERVAL_SECS: u64 = 60;

        let key_prefix = String::from("varpulis");
        Self {
            key_prefix,
            checkpoint_on_shutdown: true,
            max_checkpoints: 3,
            interval: Duration::from_secs(DEFAULT_INTERVAL_SECS),
        }
    }
}
56
/// Serde-friendly snapshot of an [`Event`], used inside checkpoints.
///
/// Produced by `From<&Event>` and turned back into an `Event` by
/// `From<SerializableEvent>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableEvent {
    /// The event's type name.
    pub event_type: String,
    /// Event time in milliseconds since the Unix epoch (taken from
    /// `timestamp_millis()`, so sub-millisecond precision is not kept).
    pub timestamp_ms: i64,
    /// Payload fields keyed by field name.
    pub fields: HashMap<String, SerializableValue>,
}

/// Serde-friendly mirror of `varpulis_core::Value`, one variant per value kind.
///
/// NOTE(review): if the active checkpoint codec is positional (e.g. bincode) —
/// TODO confirm — variant order is part of the on-disk format; append new
/// variants at the end instead of reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SerializableValue {
    Int(i64),
    Float(f64),
    Bool(bool),
    String(String),
    Null,
    /// Raw `Value::Timestamp` payload; units are not visible here (presumably
    /// nanoseconds, judging by the tests — confirm).
    Timestamp(i64),
    /// Raw `Value::Duration` payload; units are not visible here (presumably
    /// nanoseconds — confirm).
    Duration(u64),
    /// Ordered list of nested values.
    Array(Vec<Self>),
    /// Map entries as `(key, value)` pairs; a `Vec` preserves entry order.
    Map(Vec<(String, Self)>),
}
78
79fn value_to_serializable(v: &varpulis_core::Value) -> SerializableValue {
81 match v {
82 varpulis_core::Value::Int(i) => SerializableValue::Int(*i),
83 varpulis_core::Value::Float(f) => SerializableValue::Float(*f),
84 varpulis_core::Value::Bool(b) => SerializableValue::Bool(*b),
85 varpulis_core::Value::Str(s) => SerializableValue::String(s.to_string()),
86 varpulis_core::Value::Null => SerializableValue::Null,
87 varpulis_core::Value::Timestamp(ts) => SerializableValue::Timestamp(*ts),
88 varpulis_core::Value::Duration(d) => SerializableValue::Duration(*d),
89 varpulis_core::Value::Array(arr) => {
90 SerializableValue::Array(arr.iter().map(value_to_serializable).collect())
91 }
92 varpulis_core::Value::Map(map) => SerializableValue::Map(
93 map.iter()
94 .map(|(k, v)| (k.to_string(), value_to_serializable(v)))
95 .collect(),
96 ),
97 }
98}
99
100fn serializable_to_value(sv: SerializableValue) -> varpulis_core::Value {
102 match sv {
103 SerializableValue::Int(i) => varpulis_core::Value::Int(i),
104 SerializableValue::Float(f) => varpulis_core::Value::Float(f),
105 SerializableValue::Bool(b) => varpulis_core::Value::Bool(b),
106 SerializableValue::String(s) => varpulis_core::Value::Str(s.into()),
107 SerializableValue::Null => varpulis_core::Value::Null,
108 SerializableValue::Timestamp(ts) => varpulis_core::Value::Timestamp(ts),
109 SerializableValue::Duration(d) => varpulis_core::Value::Duration(d),
110 SerializableValue::Array(arr) => {
111 varpulis_core::Value::array(arr.into_iter().map(serializable_to_value).collect())
112 }
113 SerializableValue::Map(entries) => {
114 let mut map: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
115 IndexMap::with_hasher(FxBuildHasher);
116 for (k, v) in entries {
117 map.insert(k.into(), serializable_to_value(v));
118 }
119 varpulis_core::Value::map(map)
120 }
121 }
122}
123
124impl From<&Event> for SerializableEvent {
125 fn from(event: &Event) -> Self {
126 let mut fields = HashMap::new();
127 for (k, v) in &event.data {
128 fields.insert(k.to_string(), value_to_serializable(v));
129 }
130 Self {
131 event_type: event.event_type.to_string(),
132 timestamp_ms: event.timestamp.timestamp_millis(),
133 fields,
134 }
135 }
136}
137
138impl From<SerializableEvent> for Event {
139 fn from(se: SerializableEvent) -> Self {
140 let mut event = Self::new(se.event_type);
141 event.timestamp = chrono::DateTime::from_timestamp_millis(se.timestamp_ms)
142 .unwrap_or_else(chrono::Utc::now);
143 for (k, v) in se.fields {
144 event.data.insert(k.into(), serializable_to_value(v));
145 }
146 event
147 }
148}
149
/// Top-level checkpoint persisted through a [`StateStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Checkpoint id; `CheckpointManager::checkpoint` overwrites it with the next
    /// id in its sequence.
    pub id: u64,
    /// Creation time in Unix milliseconds; also overwritten by
    /// `CheckpointManager::checkpoint`.
    pub timestamp_ms: i64,
    /// Count of processed events at checkpoint time, as supplied by the caller.
    pub events_processed: u64,
    /// Window states keyed by name (presumably window/stream name — confirm).
    pub window_states: HashMap<String, WindowCheckpoint>,
    /// Pattern states keyed by name (presumably pattern name — confirm).
    pub pattern_states: HashMap<String, PatternCheckpoint>,
    /// Free-form string metadata.
    pub metadata: HashMap<String, String>,
    /// Per-context engine snapshots. `#[serde(default)]` lets payloads without
    /// this field decode to an empty map (for self-describing codecs — confirm
    /// for the active format).
    #[serde(default)]
    pub context_states: HashMap<String, EngineCheckpoint>,
}

/// Snapshot of a window's buffered state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowCheckpoint {
    /// Buffered events (presumably of the unpartitioned window — confirm).
    pub events: Vec<SerializableEvent>,
    /// Start of the current window in Unix ms, if one is open.
    pub window_start_ms: Option<i64>,
    /// Time of the last emission in Unix ms, if any.
    pub last_emit_ms: Option<i64>,
    /// Per-partition state keyed by stringified partition key (presumed — confirm).
    pub partitions: HashMap<String, PartitionedWindowCheckpoint>,
}

/// Buffered state of one window partition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionedWindowCheckpoint {
    pub events: Vec<SerializableEvent>,
    pub window_start_ms: Option<i64>,
}

/// Snapshot of a pattern matcher's in-flight partial matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternCheckpoint {
    pub partial_matches: Vec<PartialMatchCheckpoint>,
}

/// One in-flight partial match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialMatchCheckpoint {
    /// Matcher state, stored as a string label.
    pub state: String,
    /// Events matched so far.
    pub matched_events: Vec<SerializableEvent>,
    /// Start time of the match in Unix ms (presumed — confirm).
    pub start_ms: i64,
}
207
/// Errors returned by [`StateStore`] implementations and checkpoint validation.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Backend failure. In this module it also wraps `MemoryStore` lock
    /// poisoning, RocksDB errors, and key-derivation/encryption failures.
    #[error("I/O error: {0}")]
    IoError(String),
    /// Encoding/decoding failure (presumably raised by `crate::codec` — confirm).
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// Missing key. Not constructed in this module; the stores here return
    /// `Ok(None)` for absent keys instead.
    #[error("Key not found: {0}")]
    NotFound(String),
    /// Not constructed in this module.
    #[error("Store not initialized")]
    NotInitialized,
    /// Returned by `EngineCheckpoint::validate_and_migrate` when a checkpoint's
    /// version exceeds what this build supports.
    #[error(
        "Checkpoint version {checkpoint_version} is newer than supported version {current_version}"
    )]
    IncompatibleVersion {
        /// Version recorded in the checkpoint.
        checkpoint_version: u32,
        /// Highest version this build can read (`CHECKPOINT_VERSION`).
        current_version: u32,
    },
}
232
/// Pluggable persistence backend for checkpoints and arbitrary key/value state.
///
/// Implementations must be `Send + Sync`; every method takes `&self`.
pub trait StateStore: Send + Sync {
    /// Encodes and persists `checkpoint` under its `id`, replacing any existing
    /// checkpoint with the same id.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError>;

    /// Loads the most recent checkpoint, or `Ok(None)` if there is none.
    ///
    /// The in-tree stores treat the highest id as most recent (`RocksDbStore`
    /// consults a `latest` pointer).
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError>;

    /// Loads the checkpoint with the given `id`, or `Ok(None)` if it is absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError>;

    /// Returns the ids of all stored checkpoints in ascending order.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError>;

    /// Deletes all but the `keep` highest-id checkpoints and returns how many
    /// were deleted.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError>;

    /// Stores `value` under `key`, overwriting any previous value.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError>;

    /// Returns the value under `key`, or `Ok(None)` if it is absent.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError>;

    /// Removes `key`; the in-tree stores treat a missing key as success.
    fn delete(&self, key: &str) -> Result<(), StoreError>;

    /// Pushes buffered writes to the backend (a no-op for `MemoryStore` and
    /// `FileStore`).
    fn flush(&self) -> Result<(), StoreError>;
}
262
/// Volatile [`StateStore`] that keeps every key in a `RwLock`-guarded `HashMap`.
///
/// Contents live only as long as the store value itself.
#[derive(Debug, Default)]
pub struct MemoryStore {
    data: std::sync::RwLock<HashMap<String, Vec<u8>>>,
}

impl MemoryStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            data: std::sync::RwLock::default(),
        }
    }
}
274
275impl StateStore for MemoryStore {
276 fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
277 let key = format!("checkpoint:{}", checkpoint.id);
278 let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
279 self.put(&key, &data)
280 }
281
282 fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
283 let checkpoints = self.list_checkpoints()?;
284 if let Some(id) = checkpoints.last() {
285 self.load_checkpoint(*id)
286 } else {
287 Ok(None)
288 }
289 }
290
291 fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
292 let key = format!("checkpoint:{id}");
293 if let Some(data) = self.get(&key)? {
294 let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
295 Ok(Some(checkpoint))
296 } else {
297 Ok(None)
298 }
299 }
300
301 fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
302 let data = self
303 .data
304 .read()
305 .map_err(|e| StoreError::IoError(e.to_string()))?;
306 let mut ids: Vec<u64> = data
307 .keys()
308 .filter_map(|k| k.strip_prefix("checkpoint:").and_then(|s| s.parse().ok()))
309 .collect();
310 ids.sort_unstable();
311 Ok(ids)
312 }
313
314 fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
315 let checkpoints = self.list_checkpoints()?;
316 let to_delete = checkpoints.len().saturating_sub(keep);
317 for id in checkpoints.iter().take(to_delete) {
318 let key = format!("checkpoint:{id}");
319 self.delete(&key)?;
320 }
321 Ok(to_delete)
322 }
323
324 fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
325 let mut data = self
326 .data
327 .write()
328 .map_err(|e| StoreError::IoError(e.to_string()))?;
329 data.insert(key.to_string(), value.to_vec());
330 Ok(())
331 }
332
333 fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
334 let data = self
335 .data
336 .read()
337 .map_err(|e| StoreError::IoError(e.to_string()))?;
338 Ok(data.get(key).cloned())
339 }
340
341 fn delete(&self, key: &str) -> Result<(), StoreError> {
342 let mut data = self
343 .data
344 .write()
345 .map_err(|e| StoreError::IoError(e.to_string()))?;
346 data.remove(key);
347 Ok(())
348 }
349
350 fn flush(&self) -> Result<(), StoreError> {
351 Ok(()) }
353}
354
/// [`StateStore`] backed by a RocksDB database (requires the `persistence`
/// feature).
///
/// Every key is written as `"{prefix}:{key}"`.
#[cfg(feature = "persistence")]
pub struct RocksDbStore {
    /// Open database handle.
    db: rocksdb::DB,
    /// Namespace prepended (followed by `:`) to every key.
    prefix: String,
}
361
#[cfg(feature = "persistence")]
impl RocksDbStore {
    /// Opens (creating it if missing) a RocksDB store at `path` with the default
    /// `"varpulis"` key prefix.
    ///
    /// # Errors
    /// Same as [`RocksDbStore::open_with_prefix`].
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
        const DEFAULT_PREFIX: &str = "varpulis";
        Self::open_with_prefix(path, DEFAULT_PREFIX)
    }

    /// Opens (creating it if missing) a RocksDB store at `path`, namespacing
    /// every key as `"{prefix}:{key}"`.
    ///
    /// Uses LZ4 compression, 64 MiB write buffers (at most three), and a 64 MiB
    /// target file size base.
    ///
    /// # Errors
    /// [`StoreError::IoError`] if RocksDB cannot open the database.
    pub fn open_with_prefix<P: AsRef<Path>>(path: P, prefix: &str) -> Result<Self, StoreError> {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
        // 64 << 20 == 64 MiB.
        options.set_write_buffer_size(64 << 20);
        options.set_max_write_buffer_number(3);
        options.set_target_file_size_base(64 << 20);

        let db = match rocksdb::DB::open(&options, path) {
            Ok(handle) => handle,
            Err(err) => return Err(StoreError::IoError(err.to_string())),
        };

        info!("Opened RocksDB state store");
        Ok(Self {
            prefix: String::from(prefix),
            db,
        })
    }

    /// Qualifies `key` with this store's namespace: `"{prefix}:{key}"`.
    fn prefixed_key(&self, key: &str) -> String {
        [self.prefix.as_str(), key].join(":")
    }
}
393
#[cfg(feature = "persistence")]
impl StateStore for RocksDbStore {
    /// Writes the encoded checkpoint under `{prefix}:checkpoint:{id}`, then
    /// records its id (little-endian `u64`) under `{prefix}:checkpoint:latest`.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let key = self.prefixed_key(&format!("checkpoint:{}", checkpoint.id));
        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;

        self.db
            .put(key.as_bytes(), &data)
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        let latest_key = self.prefixed_key("checkpoint:latest");
        self.db
            .put(latest_key.as_bytes(), checkpoint.id.to_le_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        debug!("Saved checkpoint {} ({} bytes)", checkpoint.id, data.len());
        Ok(())
    }

    /// Loads the checkpoint named by the `checkpoint:latest` pointer.
    ///
    /// If the pointer is absent, malformed, or names a checkpoint that no longer
    /// exists (e.g. pruned with `keep == 0`), this falls back to the highest id
    /// reported by `list_checkpoints`, as the other stores do. Previously it
    /// reported "no checkpoint" even while checkpoints were present.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        let latest_key = self.prefixed_key("checkpoint:latest");
        let pointer = self
            .db
            .get(latest_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?
            .and_then(|id_bytes| <[u8; 8]>::try_from(id_bytes.as_ref()).ok())
            .map(u64::from_le_bytes);

        if let Some(id) = pointer {
            if let Some(checkpoint) = self.load_checkpoint(id)? {
                return Ok(Some(checkpoint));
            }
        }

        // Pointer missing, malformed, or dangling: use the newest stored id.
        match self.list_checkpoints()?.last() {
            Some(&id) => self.load_checkpoint(id),
            None => Ok(None),
        }
    }

    /// Loads and decodes `{prefix}:checkpoint:{id}`, or `Ok(None)` if absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let key = self.prefixed_key(&format!("checkpoint:{}", id));
        if let Some(data) = self
            .db
            .get(key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?
        {
            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
            debug!("Loaded checkpoint {}", id);
            Ok(Some(checkpoint))
        } else {
            Ok(None)
        }
    }

    /// Lists checkpoint ids (ascending) found under `{prefix}:checkpoint:`.
    ///
    /// `open_with_prefix` configures no prefix extractor, so `prefix_iterator`
    /// only *seeks* to the prefix and would otherwise walk to the end of the
    /// keyspace. Keys are ordered bytewise, so the scan stops at the first key
    /// outside the prefix.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        let prefix = self.prefixed_key("checkpoint:");
        let prefix_bytes = prefix.as_bytes();
        let mut ids = Vec::new();

        for item in self.db.prefix_iterator(prefix_bytes) {
            let (key, _) = item.map_err(|e| StoreError::IoError(e.to_string()))?;
            let Some(suffix) = key.strip_prefix(prefix_bytes) else {
                break;
            };
            // Non-numeric suffixes (such as the `latest` pointer) are skipped.
            if let Some(id) = std::str::from_utf8(suffix)
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
            {
                ids.push(id);
            }
        }

        ids.sort_unstable();
        Ok(ids)
    }

    /// Deletes all but the `keep` newest checkpoints; returns how many were removed.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        let checkpoints = self.list_checkpoints()?;
        let to_delete = checkpoints.len().saturating_sub(keep);

        for id in checkpoints.iter().take(to_delete) {
            let key = self.prefixed_key(&format!("checkpoint:{}", id));
            self.db
                .delete(key.as_bytes())
                .map_err(|e| StoreError::IoError(e.to_string()))?;
        }

        if to_delete > 0 {
            info!("Pruned {} old checkpoints", to_delete);
        }
        Ok(to_delete)
    }

    /// Stores `value` under `{prefix}:{key}`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .put(full_key.as_bytes(), value)
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Reads `{prefix}:{key}`, or `Ok(None)` if absent.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .get(full_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Deletes `{prefix}:{key}`.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .delete(full_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Flushes RocksDB memtables to disk.
    fn flush(&self) -> Result<(), StoreError> {
        self.db
            .flush()
            .map_err(|e| StoreError::IoError(e.to_string()))
    }
}
510
511#[derive(Debug)]
517pub struct FileStore {
518 dir: std::path::PathBuf,
519}
520
521impl FileStore {
522 pub fn open(dir: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
524 let dir = dir.as_ref().to_path_buf();
525 std::fs::create_dir_all(&dir).map_err(|e| StoreError::IoError(e.to_string()))?;
526 Ok(Self { dir })
527 }
528
529 fn key_to_path(&self, key: &str) -> std::path::PathBuf {
530 let path_str = key.replace(':', std::path::MAIN_SEPARATOR_STR);
532 self.dir.join(path_str)
533 }
534}
535
536impl StateStore for FileStore {
537 fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
538 let key = format!("checkpoint:{}", checkpoint.id);
539 let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
540 self.put(&key, &data)
541 }
542
543 fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
544 let checkpoints = self.list_checkpoints()?;
545 if let Some(id) = checkpoints.last() {
546 self.load_checkpoint(*id)
547 } else {
548 Ok(None)
549 }
550 }
551
552 fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
553 let key = format!("checkpoint:{id}");
554 if let Some(data) = self.get(&key)? {
555 let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
556 Ok(Some(checkpoint))
557 } else {
558 Ok(None)
559 }
560 }
561
562 fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
563 let checkpoint_dir = self.dir.join("checkpoint");
564 if !checkpoint_dir.exists() {
565 return Ok(Vec::new());
566 }
567
568 let mut ids: Vec<u64> = Vec::new();
569 let entries =
570 std::fs::read_dir(&checkpoint_dir).map_err(|e| StoreError::IoError(e.to_string()))?;
571 for entry in entries {
572 let entry = entry.map_err(|e| StoreError::IoError(e.to_string()))?;
573 if let Some(name) = entry.file_name().to_str() {
574 if name != "latest" {
575 if let Ok(id) = name.parse::<u64>() {
576 ids.push(id);
577 }
578 }
579 }
580 }
581 ids.sort_unstable();
582 Ok(ids)
583 }
584
585 fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
586 let checkpoints = self.list_checkpoints()?;
587 let to_delete = checkpoints.len().saturating_sub(keep);
588 for id in checkpoints.iter().take(to_delete) {
589 let key = format!("checkpoint:{id}");
590 self.delete(&key)?;
591 }
592 Ok(to_delete)
593 }
594
595 fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
596 let path = self.key_to_path(key);
597 if let Some(parent) = path.parent() {
598 std::fs::create_dir_all(parent).map_err(|e| StoreError::IoError(e.to_string()))?;
599 }
600
601 let tmp_path = path.with_extension("tmp");
603 std::fs::write(&tmp_path, value).map_err(|e| StoreError::IoError(e.to_string()))?;
604 std::fs::rename(&tmp_path, &path).map_err(|e| StoreError::IoError(e.to_string()))?;
605 Ok(())
606 }
607
608 fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
609 let path = self.key_to_path(key);
610 match std::fs::read(&path) {
611 Ok(data) => Ok(Some(data)),
612 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
613 Err(e) => Err(StoreError::IoError(e.to_string())),
614 }
615 }
616
617 fn delete(&self, key: &str) -> Result<(), StoreError> {
618 let path = self.key_to_path(key);
619 match std::fs::remove_file(&path) {
620 Ok(()) => Ok(()),
621 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
622 Err(e) => Err(StoreError::IoError(e.to_string())),
623 }
624 }
625
626 fn flush(&self) -> Result<(), StoreError> {
627 Ok(()) }
629}
630
/// Drives checkpointing: assigns ids and timestamps, saves through a
/// [`StateStore`], prunes old checkpoints, and exposes recovery.
pub struct CheckpointManager {
    /// Backend that checkpoints are written to and recovered from.
    store: Arc<dyn StateStore>,
    /// Interval and retention settings.
    config: CheckpointConfig,
    /// When the last successful checkpoint finished (or when the manager was built).
    last_checkpoint: Instant,
    /// Id that the next call to `checkpoint` will assign.
    next_checkpoint_id: u64,
}
638
639impl std::fmt::Debug for CheckpointManager {
640 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 f.debug_struct("CheckpointManager")
642 .field("config", &self.config)
643 .field("last_checkpoint", &self.last_checkpoint)
644 .field("next_checkpoint_id", &self.next_checkpoint_id)
645 .finish_non_exhaustive()
646 }
647}
648
649impl CheckpointManager {
650 pub fn new(store: Arc<dyn StateStore>, config: CheckpointConfig) -> Result<Self, StoreError> {
652 let next_id = store.load_latest_checkpoint()?.map_or(1, |c| c.id + 1);
654
655 Ok(Self {
656 store,
657 config,
658 last_checkpoint: Instant::now(),
659 next_checkpoint_id: next_id,
660 })
661 }
662
663 pub fn should_checkpoint(&self) -> bool {
665 self.last_checkpoint.elapsed() >= self.config.interval
666 }
667
668 pub fn checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), StoreError> {
670 let mut checkpoint = checkpoint;
671 checkpoint.id = self.next_checkpoint_id;
672 checkpoint.timestamp_ms = chrono::Utc::now().timestamp_millis();
673
674 self.store.save_checkpoint(&checkpoint)?;
675 self.store.prune_checkpoints(self.config.max_checkpoints)?;
676 self.store.flush()?;
677
678 self.last_checkpoint = Instant::now();
679 self.next_checkpoint_id += 1;
680
681 info!(
682 "Created checkpoint {} ({} events processed)",
683 checkpoint.id, checkpoint.events_processed
684 );
685 Ok(())
686 }
687
688 pub fn recover(&self) -> Result<Option<Checkpoint>, StoreError> {
690 self.store.load_latest_checkpoint()
691 }
692
693 pub fn store(&self) -> &Arc<dyn StateStore> {
695 &self.store
696 }
697}
698
/// Highest [`EngineCheckpoint`] format version this build understands; newer
/// versions are rejected by `EngineCheckpoint::validate_and_migrate`.
pub const CHECKPOINT_VERSION: u32 = 1;

/// Serde default for `EngineCheckpoint::version` when the field is absent.
///
/// Intentionally pinned to 1 rather than `CHECKPOINT_VERSION`. Presumably
/// version-less payloads predate the field, so they should keep decoding as
/// version 1 even after the current version is bumped (confirm).
const fn default_checkpoint_version() -> u32 {
    const UNVERSIONED_PAYLOAD_VERSION: u32 = 1;
    UNVERSIONED_PAYLOAD_VERSION
}
706
/// Snapshot of one engine's state, stored per context in
/// `Checkpoint::context_states`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineCheckpoint {
    /// Format version; when absent it deserializes via
    /// `default_checkpoint_version` (= 1).
    #[serde(default = "default_checkpoint_version")]
    pub version: u32,
    /// Window states keyed by name (presumably window/stream name — confirm).
    pub window_states: HashMap<String, WindowCheckpoint>,
    /// SASE pattern-engine states keyed by name (presumed — confirm).
    pub sase_states: HashMap<String, SaseCheckpoint>,
    /// Join operator states keyed by name (presumed — confirm).
    pub join_states: HashMap<String, JoinCheckpoint>,
    /// Engine variables by name.
    pub variables: HashMap<String, SerializableValue>,
    /// Count of input events processed at snapshot time.
    pub events_processed: u64,
    /// Count of output events emitted at snapshot time.
    pub output_events_emitted: u64,
    /// Watermark tracker state; `None` when absent from the payload.
    #[serde(default)]
    pub watermark_state: Option<WatermarkCheckpoint>,
    /// Distinct-operator states; empty when absent from the payload.
    #[serde(default)]
    pub distinct_states: HashMap<String, DistinctCheckpoint>,
    /// Limit-operator states; empty when absent from the payload.
    #[serde(default)]
    pub limit_states: HashMap<String, LimitCheckpoint>,
}
735
736impl EngineCheckpoint {
737 pub const fn validate_and_migrate(&mut self) -> Result<(), StoreError> {
742 if self.version > CHECKPOINT_VERSION {
743 return Err(StoreError::IncompatibleVersion {
744 checkpoint_version: self.version,
745 current_version: CHECKPOINT_VERSION,
746 });
747 }
748
749 Ok(())
758 }
759}
760
/// Snapshot of a SASE pattern-matching engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaseCheckpoint {
    /// In-flight runs (presumably those not tied to a partition — confirm).
    pub active_runs: Vec<RunCheckpoint>,
    /// In-flight runs grouped by stringified partition key (presumed — confirm).
    pub partitioned_runs: HashMap<String, Vec<RunCheckpoint>>,
    /// Engine watermark in Unix ms, if established.
    pub watermark_ms: Option<i64>,
    /// Largest event timestamp seen in Unix ms, if any.
    pub max_timestamp_ms: Option<i64>,
    /// Lifetime run counters.
    pub total_runs_created: u64,
    pub total_runs_completed: u64,
    pub total_runs_dropped: u64,
    pub total_runs_evicted: u64,
}

/// Snapshot of one in-flight run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunCheckpoint {
    /// Index of the run's current automaton state (presumed — confirm).
    pub current_state: usize,
    /// Matched events in order, each with its optional alias.
    pub stack: Vec<StackEntryCheckpoint>,
    /// Captured events keyed by name (presumably alias — confirm).
    pub captured: HashMap<String, SerializableEvent>,
    /// Event-time start of the run in Unix ms, if tracked.
    pub event_time_started_at_ms: Option<i64>,
    /// Event-time deadline in Unix ms, if the pattern has one.
    pub event_time_deadline_ms: Option<i64>,
    /// Partition key of the run, if partitioned.
    pub partition_key: Option<SerializableValue>,
    /// Whether the run has been invalidated.
    pub invalidated: bool,
    /// Number of pending negation conditions (presumed — confirm).
    pub pending_negation_count: usize,
    /// Events accumulated by a Kleene closure, if one is active (presumed — confirm).
    pub kleene_events: Option<Vec<SerializableEvent>>,
}

/// One entry of a run's match stack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StackEntryCheckpoint {
    pub event: SerializableEvent,
    pub alias: Option<String>,
}

/// Snapshot of a windowed join operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinCheckpoint {
    /// Buffered `(timestamp, event)` pairs, nested by two string keys
    /// (presumably source, then join-key value — confirm).
    pub buffers: HashMap<String, HashMap<String, Vec<(i64, SerializableEvent)>>>,
    /// Participating source names.
    pub sources: Vec<String>,
    /// Join key field per source (presumed mapping — confirm).
    pub join_keys: HashMap<String, String>,
    /// Join window length in milliseconds.
    pub window_duration_ms: i64,
}

/// Snapshot of the watermark tracker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkCheckpoint {
    /// Per-source watermark state keyed by source name.
    pub sources: HashMap<String, SourceWatermarkCheckpoint>,
    /// Combined watermark in Unix ms, if established.
    pub effective_watermark_ms: Option<i64>,
}

/// Watermark state of a single source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceWatermarkCheckpoint {
    pub watermark_ms: Option<i64>,
    pub max_timestamp_ms: Option<i64>,
    /// Allowed lateness in milliseconds.
    pub max_out_of_orderness_ms: i64,
}

/// Snapshot of a distinct operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistinctCheckpoint {
    /// Keys seen so far, stringified.
    pub keys: Vec<String>,
}

/// Snapshot of a limit operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LimitCheckpoint {
    /// Configured limit.
    pub max: usize,
    /// Events counted so far.
    pub count: usize,
}
860
861pub(crate) fn value_to_ser(v: &varpulis_core::Value) -> SerializableValue {
863 value_to_serializable(v)
864}
865
866pub(crate) fn ser_to_value(sv: SerializableValue) -> varpulis_core::Value {
868 serializable_to_value(sv)
869}
870
/// [`StateStore`] decorator that encrypts every stored value with AES-256-GCM
/// before passing it to the wrapped store `S`.
///
/// Keys are passed through in clear; only values are encrypted.
#[cfg(feature = "encryption")]
pub struct EncryptedStateStore<S: StateStore> {
    inner: S,
    /// Raw 256-bit AES key; deliberately omitted from `Debug` output below.
    key: [u8; 32],
}

/// `Debug` is implemented by hand rather than derived so the raw key bytes never
/// reach logs or panic messages; the key is shown as `<redacted>`.
#[cfg(feature = "encryption")]
impl<S: StateStore + std::fmt::Debug> std::fmt::Debug for EncryptedStateStore<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EncryptedStateStore")
            .field("inner", &self.inner)
            .field("key", &"<redacted>")
            .finish()
    }
}
897
#[cfg(feature = "encryption")]
impl<S: StateStore> EncryptedStateStore<S> {
    /// Wraps `inner`, encrypting every value it stores with the 256-bit `key`.
    pub fn new(inner: S, key: [u8; 32]) -> Self {
        Self { inner, key }
    }

    /// Parses a 32-byte key from hex (64 hex characters; surrounding whitespace
    /// is trimmed).
    ///
    /// # Errors
    /// [`StoreError::IoError`] if the input is not valid hex or does not decode
    /// to exactly 32 bytes.
    pub fn key_from_hex(hex_str: &str) -> Result<[u8; 32], StoreError> {
        let bytes = hex::decode(hex_str.trim())
            .map_err(|e| StoreError::IoError(format!("Invalid hex key: {}", e)))?;
        if bytes.len() != 32 {
            return Err(StoreError::IoError(format!(
                "Encryption key must be 32 bytes (64 hex chars), got {} bytes",
                bytes.len()
            )));
        }
        let mut key = [0u8; 32];
        key.copy_from_slice(&bytes);
        Ok(key)
    }

    /// Derives a 32-byte key from `passphrase` and `salt` with Argon2, using the
    /// `argon2` crate's default algorithm and parameters.
    ///
    /// The derivation is deterministic, so the same salt must be supplied again
    /// to re-derive the key.
    ///
    /// # Errors
    /// [`StoreError::IoError`] if Argon2 rejects the inputs (for example a salt
    /// shorter than the crate's minimum).
    pub fn key_from_passphrase(passphrase: &str, salt: &[u8]) -> Result<[u8; 32], StoreError> {
        use argon2::Argon2;

        let mut key = [0u8; 32];
        Argon2::default()
            .hash_password_into(passphrase.as_bytes(), salt, &mut key)
            .map_err(|e| StoreError::IoError(format!("Argon2 key derivation failed: {}", e)))?;
        Ok(key)
    }

    /// Encrypts `plaintext` with AES-256-GCM under a fresh random 12-byte nonce.
    ///
    /// Output layout: `nonce (12 bytes) || ciphertext`, where `ciphertext` is
    /// exactly what `Aead::encrypt` returns (for aes-gcm this includes the
    /// authentication tag). No associated data is supplied, so a ciphertext is
    /// not bound to the key it is stored under.
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::{Aead, OsRng};
        use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};

        let cipher = Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| StoreError::IoError(format!("AES key error: {}", e)))?;

        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
        let ciphertext = cipher
            .encrypt(&nonce, plaintext)
            .map_err(|e| StoreError::IoError(format!("Encryption failed: {}", e)))?;

        // Prepend the nonce so `decrypt` can recover it from the stored bytes.
        let mut result = Vec::with_capacity(12 + ciphertext.len());
        result.extend_from_slice(&nonce);
        result.extend_from_slice(&ciphertext);
        Ok(result)
    }

    /// Reverses `encrypt`: splits off the leading 12-byte nonce, then
    /// decrypts and authenticates the remainder.
    ///
    /// # Errors
    /// [`StoreError::IoError`] if `data` is shorter than a nonce, or if
    /// decryption fails (wrong key or tampered/corrupted data).
    fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::Aead;
        use aes_gcm::{Aes256Gcm, KeyInit, Nonce};

        if data.len() < 12 {
            return Err(StoreError::IoError(
                "Encrypted data too short (missing nonce)".to_string(),
            ));
        }

        let (nonce_bytes, ciphertext) = data.split_at(12);
        // Cannot fail after `split_at(12)`; kept as a checked conversion.
        let nonce_arr: [u8; 12] = nonce_bytes
            .try_into()
            .map_err(|_| StoreError::IoError("Invalid nonce length".to_string()))?;
        let nonce = Nonce::from(nonce_arr);

        let cipher = Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| StoreError::IoError(format!("AES key error: {}", e)))?;

        cipher
            .decrypt(&nonce, ciphertext)
            .map_err(|e| StoreError::IoError(format!("Decryption failed: {}", e)))
    }
}
976
#[cfg(feature = "encryption")]
impl<S: StateStore> StateStore for EncryptedStateStore<S> {
    /// Encodes the checkpoint, encrypts it, and stores it in the inner store
    /// under `checkpoint:{id}`.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let plaintext =
            crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
        let sealed = self.encrypt(&plaintext)?;
        self.inner
            .put(&format!("checkpoint:{}", checkpoint.id), &sealed)
    }

    /// Loads the checkpoint with the highest id reported by the inner store.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        match self.list_checkpoints()?.last() {
            Some(&newest) => self.load_checkpoint(newest),
            None => Ok(None),
        }
    }

    /// Reads, decrypts, and decodes `checkpoint:{id}`; `Ok(None)` if absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let Some(sealed) = self.inner.get(&format!("checkpoint:{}", id))? else {
            return Ok(None);
        };
        let plaintext = self.decrypt(&sealed)?;
        let checkpoint: Checkpoint = crate::codec::deserialize(&plaintext)?;
        Ok(Some(checkpoint))
    }

    /// Delegates to the inner store (ids are not encrypted).
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        self.inner.list_checkpoints()
    }

    /// Delegates to the inner store.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        self.inner.prune_checkpoints(keep)
    }

    /// Encrypts `value` and stores it under the clear-text `key`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let sealed = self.encrypt(value)?;
        self.inner.put(key, &sealed)
    }

    /// Reads and decrypts the value under `key`, if present.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        self.inner
            .get(key)?
            .map(|sealed| self.decrypt(&sealed))
            .transpose()
    }

    /// Delegates to the inner store.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        self.inner.delete(key)
    }

    /// Delegates to the inner store.
    fn flush(&self) -> Result<(), StoreError> {
        self.inner.flush()
    }
}
1041
1042#[cfg(test)]
1043mod tests {
1044 use super::*;
1045
1046 #[test]
1047 fn test_memory_store_checkpoint() {
1048 let store = MemoryStore::new();
1049
1050 let checkpoint = Checkpoint {
1051 id: 1,
1052 timestamp_ms: 1000,
1053 events_processed: 100,
1054 window_states: HashMap::new(),
1055 pattern_states: HashMap::new(),
1056 metadata: HashMap::new(),
1057 context_states: HashMap::new(),
1058 };
1059
1060 store.save_checkpoint(&checkpoint).unwrap();
1061
1062 let loaded = store.load_checkpoint(1).unwrap();
1063 assert!(loaded.is_some());
1064 let loaded = loaded.unwrap();
1065 assert_eq!(loaded.id, 1);
1066 assert_eq!(loaded.events_processed, 100);
1067 }
1068
1069 #[test]
1070 fn test_memory_store_prune() {
1071 let store = MemoryStore::new();
1072
1073 for i in 1..=5 {
1074 let checkpoint = Checkpoint {
1075 id: i,
1076 timestamp_ms: i as i64 * 1000,
1077 events_processed: i * 100,
1078 window_states: HashMap::new(),
1079 pattern_states: HashMap::new(),
1080 metadata: HashMap::new(),
1081 context_states: HashMap::new(),
1082 };
1083 store.save_checkpoint(&checkpoint).unwrap();
1084 }
1085
1086 let checkpoints = store.list_checkpoints().unwrap();
1087 assert_eq!(checkpoints.len(), 5);
1088
1089 let pruned = store.prune_checkpoints(2).unwrap();
1090 assert_eq!(pruned, 3);
1091
1092 let checkpoints = store.list_checkpoints().unwrap();
1093 assert_eq!(checkpoints.len(), 2);
1094 assert_eq!(checkpoints, vec![4, 5]);
1095 }
1096
1097 #[test]
1098 fn test_file_store_put_get_delete() {
1099 let dir = tempfile::tempdir().unwrap();
1100 let store = FileStore::open(dir.path()).unwrap();
1101
1102 store.put("test:key1", b"hello world").unwrap();
1104 let val = store.get("test:key1").unwrap();
1105 assert_eq!(val, Some(b"hello world".to_vec()));
1106
1107 let val = store.get("test:missing").unwrap();
1109 assert!(val.is_none());
1110
1111 store.delete("test:key1").unwrap();
1113 let val = store.get("test:key1").unwrap();
1114 assert!(val.is_none());
1115
1116 store.delete("test:missing").unwrap();
1118 }
1119
1120 #[test]
1121 fn test_file_store_atomic_write() {
1122 let dir = tempfile::tempdir().unwrap();
1123 let store = FileStore::open(dir.path()).unwrap();
1124
1125 store.put("data:file1", b"version 1").unwrap();
1127 assert_eq!(
1128 store.get("data:file1").unwrap(),
1129 Some(b"version 1".to_vec())
1130 );
1131
1132 store.put("data:file1", b"version 2").unwrap();
1134 assert_eq!(
1135 store.get("data:file1").unwrap(),
1136 Some(b"version 2".to_vec())
1137 );
1138
1139 let data_dir = dir.path().join("data");
1141 if data_dir.exists() {
1142 for entry in std::fs::read_dir(&data_dir).unwrap() {
1143 let entry = entry.unwrap();
1144 let name = entry.file_name().to_string_lossy().to_string();
1145 assert!(
1146 !std::path::Path::new(&name)
1147 .extension()
1148 .is_some_and(|ext| ext.eq_ignore_ascii_case("tmp")),
1149 "tmp file left behind: {name}"
1150 );
1151 }
1152 }
1153 }
1154
1155 #[test]
1156 fn test_serializable_event() {
1157 let mut event = Event::new("TestEvent");
1158 event
1159 .data
1160 .insert("count".into(), varpulis_core::Value::Int(42));
1161 event
1162 .data
1163 .insert("value".into(), varpulis_core::Value::Float(1.5));
1164 event
1165 .data
1166 .insert("name".into(), varpulis_core::Value::Str("test".into()));
1167
1168 let serializable: SerializableEvent = (&event).into();
1169 let restored: Event = serializable.into();
1170
1171 assert_eq!(&*restored.event_type, "TestEvent");
1172 assert_eq!(restored.get_int("count"), Some(42));
1173 assert_eq!(restored.get_float("value"), Some(1.5));
1174 assert_eq!(restored.get_str("name"), Some("test"));
1175 }
1176
1177 #[test]
1178 fn test_serializable_event_complex_values() {
1179 let mut event = Event::new("ComplexEvent");
1180
1181 event.data.insert(
1183 "ts".into(),
1184 varpulis_core::Value::Timestamp(1_700_000_000_000_000_000),
1185 );
1186
1187 event
1189 .data
1190 .insert("dur".into(), varpulis_core::Value::Duration(5_000_000_000));
1191
1192 event.data.insert(
1194 "tags".into(),
1195 varpulis_core::Value::array(vec![
1196 varpulis_core::Value::Str("a".into()),
1197 varpulis_core::Value::Int(1),
1198 ]),
1199 );
1200
1201 let mut inner_map = IndexMap::with_hasher(FxBuildHasher);
1203 inner_map.insert("nested_key".into(), varpulis_core::Value::Float(3.15));
1204 inner_map.insert("flag".into(), varpulis_core::Value::Bool(true));
1205 event
1206 .data
1207 .insert("meta".into(), varpulis_core::Value::map(inner_map));
1208
1209 let serializable: SerializableEvent = (&event).into();
1211
1212 assert!(matches!(
1214 serializable.fields.get("ts"),
1215 Some(SerializableValue::Timestamp(1_700_000_000_000_000_000))
1216 ));
1217 assert!(matches!(
1218 serializable.fields.get("dur"),
1219 Some(SerializableValue::Duration(5_000_000_000))
1220 ));
1221 assert!(matches!(
1222 serializable.fields.get("tags"),
1223 Some(SerializableValue::Array(_))
1224 ));
1225 assert!(matches!(
1226 serializable.fields.get("meta"),
1227 Some(SerializableValue::Map(_))
1228 ));
1229
1230 let restored: Event = serializable.into();
1231
1232 assert_eq!(
1234 restored.data.get("ts"),
1235 Some(&varpulis_core::Value::Timestamp(1_700_000_000_000_000_000))
1236 );
1237 assert_eq!(
1238 restored.data.get("dur"),
1239 Some(&varpulis_core::Value::Duration(5_000_000_000))
1240 );
1241
1242 match restored.data.get("tags") {
1244 Some(varpulis_core::Value::Array(arr)) => {
1245 assert_eq!(arr.len(), 2);
1246 assert_eq!(arr[0], varpulis_core::Value::Str("a".into()));
1247 assert_eq!(arr[1], varpulis_core::Value::Int(1));
1248 }
1249 other => panic!("Expected Array, got {other:?}"),
1250 }
1251
1252 match restored.data.get("meta") {
1254 Some(varpulis_core::Value::Map(m)) => {
1255 assert_eq!(m.len(), 2);
1256 assert_eq!(
1257 m.get("nested_key"),
1258 Some(&varpulis_core::Value::Float(3.15))
1259 );
1260 assert_eq!(m.get("flag"), Some(&varpulis_core::Value::Bool(true)));
1261 }
1262 other => panic!("Expected Map, got {other:?}"),
1263 }
1264 }
1265
/// Tests for [`EncryptedStateStore`] layered over an in-memory [`MemoryStore`]:
/// plain put/get/delete round-trips, properties of the stored ciphertext,
/// wrong-key behavior, key construction helpers, and checkpoint persistence.
#[cfg(feature = "encryption")]
mod encryption_tests {
    use super::*;

    /// Deterministic 32-byte test key: `[0, 1, 2, ..., 31]`.
    fn test_key() -> [u8; 32] {
        // Every index is < 32, so the `as u8` cast cannot truncate.
        std::array::from_fn(|i| i as u8)
    }

    /// A value written through the encrypted store reads back unchanged.
    #[test]
    fn test_encrypted_put_get_roundtrip() {
        let inner = MemoryStore::new();
        let store = EncryptedStateStore::new(inner, test_key());

        let data = b"hello, encrypted world!";
        store.put("test_key", data).unwrap();

        let retrieved = store.get("test_key").unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    /// The bytes handed to the inner store are not the plaintext. They are
    /// longer than the plaintext and do not embed it verbatim.
    #[test]
    fn test_encrypted_data_differs_from_plaintext() {
        let inner = MemoryStore::new();
        let store = EncryptedStateStore::new(inner, test_key());

        let data = b"sensitive data";
        store.put("test_key", data).unwrap();

        // Read the raw bytes directly from the backing store, bypassing decryption.
        let raw = store.inner.get("test_key").unwrap().unwrap();
        assert_ne!(raw, data.to_vec());
        assert!(raw.len() > data.len());
        // A real cipher must not leak the plaintext as a contiguous byte run.
        assert!(!raw.windows(data.len()).any(|w| w == &data[..]));
    }

    /// A checkpoint saved through the encrypted store loads back with the same
    /// id, timestamp and event count.
    #[test]
    fn test_encrypted_checkpoint_roundtrip() {
        let inner = MemoryStore::new();
        let store = EncryptedStateStore::new(inner, test_key());

        let checkpoint = Checkpoint {
            id: 1,
            timestamp_ms: 1700000000000,
            events_processed: 42,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: HashMap::new(),
            context_states: HashMap::new(),
        };

        store.save_checkpoint(&checkpoint).unwrap();
        let loaded = store.load_checkpoint(1).unwrap().unwrap();

        assert_eq!(loaded.id, 1);
        assert_eq!(loaded.events_processed, 42);
        assert_eq!(loaded.timestamp_ms, 1700000000000);
    }

    /// Reading a key that was never written yields `Ok(None)`, not an error.
    #[test]
    fn test_encrypted_get_nonexistent_key() {
        let inner = MemoryStore::new();
        let store = EncryptedStateStore::new(inner, test_key());

        let result = store.get("nonexistent").unwrap();
        assert!(result.is_none());
    }

    /// A deleted key is no longer readable.
    #[test]
    fn test_encrypted_delete() {
        let inner = MemoryStore::new();
        let store = EncryptedStateStore::new(inner, test_key());

        store.put("key", b"value").unwrap();
        assert!(store.get("key").unwrap().is_some());

        store.delete("key").unwrap();
        assert!(store.get("key").unwrap().is_none());
    }

    /// Ciphertext produced under one key must not decrypt under a different key.
    #[test]
    fn test_wrong_key_fails_decryption() {
        let key1 = test_key();
        let store1 = EncryptedStateStore::new(MemoryStore::new(), key1);
        store1.put("key", b"secret").unwrap();

        // Copy the exact ciphertext into a second backing store. The store
        // configured with the other key then sees the very same bytes.
        let ciphertext = store1.inner.get("key").unwrap().unwrap();
        let inner2 = MemoryStore::new();
        inner2.put("key", &ciphertext).unwrap();

        let mut key2 = key1;
        key2[0] ^= 0xFF; // guaranteed to differ from key1
        let store2 = EncryptedStateStore::new(inner2, key2);

        let result = store2.get("key");
        assert!(
            result.is_err(),
            "decrypting with the wrong key must fail, got {result:?}"
        );

        // The original key still decrypts the original value.
        assert_eq!(store1.get("key").unwrap().unwrap(), b"secret");
    }

    /// A 64-character hex string parses into the corresponding 32 key bytes.
    #[test]
    fn test_key_from_hex() {
        let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let key = EncryptedStateStore::<MemoryStore>::key_from_hex(hex).unwrap();
        assert_eq!(key[0], 0x01);
        assert_eq!(key[1], 0x23);
        assert_eq!(key[31], 0xef);
    }

    /// A hex string encoding fewer than 32 bytes is rejected.
    #[test]
    fn test_key_from_hex_wrong_length() {
        let hex = "0123456789abcdef";
        let result = EncryptedStateStore::<MemoryStore>::key_from_hex(hex);
        assert!(result.is_err());
    }

    /// Passphrase derivation is deterministic for a given passphrase and salt,
    /// and a different salt yields a different key.
    #[test]
    fn test_key_from_passphrase() {
        let key = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
            "my-secret-passphrase",
            b"varpulis-salt-00",
        )
        .unwrap();
        assert_eq!(key.len(), 32);

        let key2 = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
            "my-secret-passphrase",
            b"varpulis-salt-00",
        )
        .unwrap();
        assert_eq!(key, key2);

        // Same passphrase and salt length, but a different salt value.
        let other_salt = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
            "my-secret-passphrase",
            b"varpulis-salt-01",
        )
        .unwrap();
        assert_ne!(key, other_salt);
    }

    /// `load_latest_checkpoint` returns `None` on an empty store, then the
    /// checkpoint with the highest id once several have been saved.
    #[test]
    fn test_encrypted_latest_checkpoint() {
        let inner = MemoryStore::new();
        let store = EncryptedStateStore::new(inner, test_key());

        assert!(store.load_latest_checkpoint().unwrap().is_none());

        let cp1 = Checkpoint {
            id: 1,
            timestamp_ms: 1000,
            events_processed: 10,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: HashMap::new(),
            context_states: HashMap::new(),
        };
        let cp2 = Checkpoint {
            id: 2,
            timestamp_ms: 2000,
            events_processed: 20,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: HashMap::new(),
            context_states: HashMap::new(),
        };

        store.save_checkpoint(&cp1).unwrap();
        store.save_checkpoint(&cp2).unwrap();

        let latest = store.load_latest_checkpoint().unwrap().unwrap();
        assert_eq!(latest.id, 2);
        assert_eq!(latest.events_processed, 20);
    }
}
1445}