1use std::collections::HashMap;
29#[cfg(feature = "persistence")]
30use std::path::Path;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33
34use indexmap::IndexMap;
35use rustc_hash::FxBuildHasher;
36use serde::{Deserialize, Serialize};
37#[cfg(feature = "persistence")]
38use tracing::debug;
39use tracing::info;
40
41use crate::event::Event;
42
/// Tunables that control when checkpoints are taken and how many are retained.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Minimum elapsed time between two checkpoints.
    pub interval: Duration,
    /// Number of checkpoints to keep when old ones are pruned.
    pub max_checkpoints: usize,
    /// Whether a checkpoint should be taken at shutdown.
    /// NOTE(review): not read anywhere in the visible part of this file — confirm the consumer.
    pub checkpoint_on_shutdown: bool,
    /// Key prefix for stored entries.
    /// NOTE(review): not read anywhere in the visible part of this file — confirm the consumer.
    pub key_prefix: String,
}

impl Default for CheckpointConfig {
    /// Returns a 60-second interval, 3 retained checkpoints, checkpoint-on-shutdown
    /// enabled, and the key prefix `"varpulis"`.
    fn default() -> Self {
        let key_prefix = String::from("varpulis");
        let interval = Duration::from_secs(60);
        CheckpointConfig {
            interval,
            max_checkpoints: 3,
            checkpoint_on_shutdown: true,
            key_prefix,
        }
    }
}
66
/// Serde-friendly snapshot of an [`Event`], used inside checkpoint structures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableEvent {
    /// Event type name, copied from `Event::event_type`.
    pub event_type: String,
    /// Event timestamp in milliseconds, taken from `Event::timestamp.timestamp_millis()`.
    pub timestamp_ms: i64,
    /// Event payload keyed by field name; values are converted to [`SerializableValue`].
    pub fields: HashMap<String, SerializableValue>,
}
74
/// Serde-friendly mirror of `varpulis_core::Value`.
///
/// Each variant corresponds to one `Value` variant; conversion in both directions is
/// done by `value_to_serializable` and `serializable_to_value`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SerializableValue {
    /// Mirrors `Value::Int`.
    Int(i64),
    /// Mirrors `Value::Float`.
    Float(f64),
    /// Mirrors `Value::Bool`.
    Bool(bool),
    /// Mirrors `Value::Str`, stored as an owned `String`.
    String(String),
    /// Mirrors `Value::Null`.
    Null,
    /// Mirrors `Value::Timestamp`; the raw integer is copied as-is (unit not shown here).
    Timestamp(i64),
    /// Mirrors `Value::Duration`; the raw integer is copied as-is (unit not shown here).
    Duration(u64),
    /// Mirrors `Value::Array`; elements are converted recursively.
    Array(Vec<Self>),
    /// Mirrors `Value::Map` as a list of `(key, value)` pairs in the map's iteration order.
    Map(Vec<(String, Self)>),
}
88
89fn value_to_serializable(v: &varpulis_core::Value) -> SerializableValue {
91 match v {
92 varpulis_core::Value::Int(i) => SerializableValue::Int(*i),
93 varpulis_core::Value::Float(f) => SerializableValue::Float(*f),
94 varpulis_core::Value::Bool(b) => SerializableValue::Bool(*b),
95 varpulis_core::Value::Str(s) => SerializableValue::String(s.to_string()),
96 varpulis_core::Value::Null => SerializableValue::Null,
97 varpulis_core::Value::Timestamp(ts) => SerializableValue::Timestamp(*ts),
98 varpulis_core::Value::Duration(d) => SerializableValue::Duration(*d),
99 varpulis_core::Value::Array(arr) => {
100 SerializableValue::Array(arr.iter().map(value_to_serializable).collect())
101 }
102 varpulis_core::Value::Map(map) => SerializableValue::Map(
103 map.iter()
104 .map(|(k, v)| (k.to_string(), value_to_serializable(v)))
105 .collect(),
106 ),
107 }
108}
109
110fn serializable_to_value(sv: SerializableValue) -> varpulis_core::Value {
112 match sv {
113 SerializableValue::Int(i) => varpulis_core::Value::Int(i),
114 SerializableValue::Float(f) => varpulis_core::Value::Float(f),
115 SerializableValue::Bool(b) => varpulis_core::Value::Bool(b),
116 SerializableValue::String(s) => varpulis_core::Value::Str(s.into()),
117 SerializableValue::Null => varpulis_core::Value::Null,
118 SerializableValue::Timestamp(ts) => varpulis_core::Value::Timestamp(ts),
119 SerializableValue::Duration(d) => varpulis_core::Value::Duration(d),
120 SerializableValue::Array(arr) => {
121 varpulis_core::Value::array(arr.into_iter().map(serializable_to_value).collect())
122 }
123 SerializableValue::Map(entries) => {
124 let mut map: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
125 IndexMap::with_hasher(FxBuildHasher);
126 for (k, v) in entries {
127 map.insert(k.into(), serializable_to_value(v));
128 }
129 varpulis_core::Value::map(map)
130 }
131 }
132}
133
134impl From<&Event> for SerializableEvent {
135 fn from(event: &Event) -> Self {
136 let mut fields = HashMap::new();
137 for (k, v) in &event.data {
138 fields.insert(k.to_string(), value_to_serializable(v));
139 }
140 Self {
141 event_type: event.event_type.to_string(),
142 timestamp_ms: event.timestamp.timestamp_millis(),
143 fields,
144 }
145 }
146}
147
148impl From<SerializableEvent> for Event {
149 fn from(se: SerializableEvent) -> Self {
150 let mut event = Self::new(se.event_type);
151 event.timestamp = chrono::DateTime::from_timestamp_millis(se.timestamp_ms)
152 .unwrap_or_else(chrono::Utc::now);
153 for (k, v) in se.fields {
154 event.data.insert(k.into(), serializable_to_value(v));
155 }
156 event
157 }
158}
159
/// A persisted snapshot written and read through a [`StateStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Checkpoint id; `CheckpointManager::checkpoint` overwrites it with its next sequential id.
    pub id: u64,
    /// Creation time in milliseconds; `CheckpointManager::checkpoint` sets it from `Utc::now()`.
    pub timestamp_ms: i64,
    /// Events-processed counter, reported in the log line emitted when a checkpoint is created.
    pub events_processed: u64,
    /// Window snapshots by string key (presumably the window/stream name — confirm with callers).
    pub window_states: HashMap<String, WindowCheckpoint>,
    /// Pattern snapshots by string key (presumably the pattern name — confirm with callers).
    pub pattern_states: HashMap<String, PatternCheckpoint>,
    /// Free-form string metadata.
    pub metadata: HashMap<String, String>,
    /// Engine snapshots by string key (presumably the context name — confirm with callers).
    /// Deserializes to an empty map when the field is absent.
    #[serde(default)]
    pub context_states: HashMap<String, EngineCheckpoint>,
}
179
/// Serialized state of a window, including any per-partition sub-windows.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowCheckpoint {
    /// Events captured for the window (presumably the currently buffered events).
    pub events: Vec<SerializableEvent>,
    /// Window start in milliseconds, if recorded.
    pub window_start_ms: Option<i64>,
    /// Time of the last emission in milliseconds, if recorded.
    pub last_emit_ms: Option<i64>,
    /// Per-partition state by string key (presumably the partition key — confirm with callers).
    pub partitions: HashMap<String, PartitionedWindowCheckpoint>,
}
192
/// Serialized state of one partition inside a [`WindowCheckpoint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionedWindowCheckpoint {
    /// Events captured for this partition.
    pub events: Vec<SerializableEvent>,
    /// Partition window start in milliseconds, if recorded.
    pub window_start_ms: Option<i64>,
}
199
/// Serialized state of a pattern matcher: the list of its in-progress matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternCheckpoint {
    /// Partial matches captured at checkpoint time.
    pub partial_matches: Vec<PartialMatchCheckpoint>,
}
206
/// One in-progress pattern match stored inside a [`PatternCheckpoint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialMatchCheckpoint {
    /// Matcher state, stored as a string (encoding not shown in this file).
    pub state: String,
    /// Events matched so far.
    pub matched_events: Vec<SerializableEvent>,
    /// Start time of the match in milliseconds.
    pub start_ms: i64,
}
217
/// Errors returned by [`StateStore`] implementations and checkpoint validation.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Backend failure carrying its message as text. The stores in this file also use
    /// it for poisoned locks and encryption/decryption failures.
    #[error("I/O error: {0}")]
    IoError(String),
    /// A value could not be serialized or deserialized.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// The named key does not exist.
    #[error("Key not found: {0}")]
    NotFound(String),
    /// The store was used before being initialized.
    #[error("Store not initialized")]
    NotInitialized,
    /// A checkpoint declares a version newer than this build supports; returned by
    /// `EngineCheckpoint::validate_and_migrate`.
    #[error(
        "Checkpoint version {checkpoint_version} is newer than supported version {current_version}"
    )]
    IncompatibleVersion {
        /// Version recorded in the checkpoint.
        checkpoint_version: u32,
        /// Highest version this build understands.
        current_version: u32,
    },
}
242
/// Storage backend for checkpoints, plus a raw key/value interface.
///
/// Implementors must be `Send + Sync`; every method takes `&self`.
pub trait StateStore: Send + Sync {
    /// Persists `checkpoint` so that it can later be loaded by its `id`.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError>;

    /// Loads the most recent checkpoint, or `Ok(None)` when none is available.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError>;

    /// Loads the checkpoint stored under `id`, or `Ok(None)` when it is absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError>;

    /// Lists the ids of stored checkpoints. The implementations in this file return
    /// them sorted in ascending order.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError>;

    /// Removes old checkpoints so that at most `keep` remain and returns how many were
    /// removed. The implementations in this file remove the lowest ids first.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError>;

    /// Stores `value` under `key`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError>;

    /// Reads the value stored under `key`, or `Ok(None)` when the key is absent.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError>;

    /// Removes `key`. The memory and file implementations in this file succeed even
    /// when the key does not exist.
    fn delete(&self, key: &str) -> Result<(), StoreError>;

    /// Flushes pending writes. This is a no-op for the memory and file implementations
    /// in this file.
    fn flush(&self) -> Result<(), StoreError>;
}
272
/// In-memory key/value store: a `HashMap` guarded by an `RwLock`.
#[derive(Debug, Default)]
pub struct MemoryStore {
    /// Stored values by key.
    data: std::sync::RwLock<HashMap<String, Vec<u8>>>,
}

impl MemoryStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            data: std::sync::RwLock::new(HashMap::new()),
        }
    }
}
284
285impl StateStore for MemoryStore {
286 fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
287 let key = format!("checkpoint:{}", checkpoint.id);
288 let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
289 self.put(&key, &data)
290 }
291
292 fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
293 let checkpoints = self.list_checkpoints()?;
294 if let Some(id) = checkpoints.last() {
295 self.load_checkpoint(*id)
296 } else {
297 Ok(None)
298 }
299 }
300
301 fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
302 let key = format!("checkpoint:{id}");
303 if let Some(data) = self.get(&key)? {
304 let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
305 Ok(Some(checkpoint))
306 } else {
307 Ok(None)
308 }
309 }
310
311 fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
312 let data = self
313 .data
314 .read()
315 .map_err(|e| StoreError::IoError(e.to_string()))?;
316 let mut ids: Vec<u64> = data
317 .keys()
318 .filter_map(|k| k.strip_prefix("checkpoint:").and_then(|s| s.parse().ok()))
319 .collect();
320 ids.sort_unstable();
321 Ok(ids)
322 }
323
324 fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
325 let checkpoints = self.list_checkpoints()?;
326 let to_delete = checkpoints.len().saturating_sub(keep);
327 for id in checkpoints.iter().take(to_delete) {
328 let key = format!("checkpoint:{id}");
329 self.delete(&key)?;
330 }
331 Ok(to_delete)
332 }
333
334 fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
335 let mut data = self
336 .data
337 .write()
338 .map_err(|e| StoreError::IoError(e.to_string()))?;
339 data.insert(key.to_string(), value.to_vec());
340 Ok(())
341 }
342
343 fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
344 let data = self
345 .data
346 .read()
347 .map_err(|e| StoreError::IoError(e.to_string()))?;
348 Ok(data.get(key).cloned())
349 }
350
351 fn delete(&self, key: &str) -> Result<(), StoreError> {
352 let mut data = self
353 .data
354 .write()
355 .map_err(|e| StoreError::IoError(e.to_string()))?;
356 data.remove(key);
357 Ok(())
358 }
359
360 fn flush(&self) -> Result<(), StoreError> {
361 Ok(()) }
363}
364
/// [`StateStore`] backed by a RocksDB database; every key is namespaced with `prefix`.
#[cfg(feature = "persistence")]
pub struct RocksDbStore {
    /// Open database handle.
    db: rocksdb::DB,
    /// Namespace prepended to every key as `<prefix>:<key>`.
    prefix: String,
}

#[cfg(feature = "persistence")]
impl RocksDbStore {
    /// Opens (creating if missing) the database at `path` with the key prefix `"varpulis"`.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
        Self::open_with_prefix(path, "varpulis")
    }

    /// Opens (creating if missing) the database at `path`, namespacing keys with `prefix`.
    ///
    /// The database is configured with LZ4 compression, 64 MiB write buffers (up to 3)
    /// and a 64 MiB target file size. Open failures are returned as `StoreError::IoError`.
    pub fn open_with_prefix<P: AsRef<Path>>(path: P, prefix: &str) -> Result<Self, StoreError> {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);

        options.set_write_buffer_size(64 * 1024 * 1024);
        options.set_max_write_buffer_number(3);
        options.set_target_file_size_base(64 * 1024 * 1024);

        match rocksdb::DB::open(&options, path) {
            Ok(db) => {
                info!("Opened RocksDB state store");
                Ok(Self {
                    db,
                    prefix: prefix.to_string(),
                })
            }
            Err(e) => Err(StoreError::IoError(e.to_string())),
        }
    }

    /// Builds the namespaced key `<prefix>:<key>`.
    fn prefixed_key(&self, key: &str) -> String {
        let namespace = &self.prefix;
        format!("{namespace}:{key}")
    }
}
403
#[cfg(feature = "persistence")]
impl StateStore for RocksDbStore {
    /// Serializes `checkpoint` and stores it under `<prefix>:checkpoint:<id>`, updating
    /// the `<prefix>:checkpoint:latest` pointer in the same atomic write batch.
    ///
    /// Writing both keys in one batch guarantees the pointer never refers to a
    /// checkpoint that was not written, and a written checkpoint is never left
    /// without the pointer being advanced.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let key = self.prefixed_key(&format!("checkpoint:{}", checkpoint.id));
        let latest_key = self.prefixed_key("checkpoint:latest");
        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;

        let mut batch = rocksdb::WriteBatch::default();
        batch.put(key.as_bytes(), &data);
        // The pointer holds the id as 8 little-endian bytes.
        batch.put(latest_key.as_bytes(), checkpoint.id.to_le_bytes());
        self.db
            .write(batch)
            .map_err(|e| StoreError::IoError(e.to_string()))?;

        debug!("Saved checkpoint {} ({} bytes)", checkpoint.id, data.len());
        Ok(())
    }

    /// Follows the `checkpoint:latest` pointer and loads the checkpoint it names.
    ///
    /// Returns `Ok(None)` when the pointer is missing, is not exactly 8 bytes, or
    /// names a checkpoint that no longer exists.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        let latest_key = self.prefixed_key("checkpoint:latest");
        let Some(id_bytes) = self
            .db
            .get(latest_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?
        else {
            return Ok(None);
        };
        let Ok(bytes) = <[u8; 8]>::try_from(id_bytes.as_ref()) else {
            return Ok(None);
        };
        self.load_checkpoint(u64::from_le_bytes(bytes))
    }

    /// Reads and deserializes `<prefix>:checkpoint:<id>`; `Ok(None)` when absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let key = self.prefixed_key(&format!("checkpoint:{id}"));
        let Some(data) = self
            .db
            .get(key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))?
        else {
            return Ok(None);
        };
        let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
        debug!("Loaded checkpoint {}", id);
        Ok(Some(checkpoint))
    }

    /// Returns, in ascending order, every id stored under `<prefix>:checkpoint:<u64>`.
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        let prefix = self.prefixed_key("checkpoint:");
        let mut ids = Vec::new();

        for item in self.db.prefix_iterator(prefix.as_bytes()) {
            let (key, _) = item.map_err(|e| StoreError::IoError(e.to_string()))?;
            // No prefix extractor is configured when the database is opened, so the
            // iterator may run past the prefix. Keys are ordered bytewise, so the
            // first non-matching key means no later key can match either.
            let Some(suffix) = key.strip_prefix(prefix.as_bytes()) else {
                break;
            };
            // Non-numeric suffixes (such as the `latest` pointer) are skipped.
            if let Some(id) = std::str::from_utf8(suffix)
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
            {
                ids.push(id);
            }
        }

        // Keys iterate in byte order ("10" < "2"), so sort numerically.
        ids.sort_unstable();
        Ok(ids)
    }

    /// Deletes the lowest-id checkpoints until at most `keep` remain; returns the
    /// number deleted. The `latest` pointer is left untouched.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        let checkpoints = self.list_checkpoints()?;
        let to_delete = checkpoints.len().saturating_sub(keep);

        for id in checkpoints.iter().take(to_delete) {
            let key = self.prefixed_key(&format!("checkpoint:{id}"));
            self.db
                .delete(key.as_bytes())
                .map_err(|e| StoreError::IoError(e.to_string()))?;
        }

        if to_delete > 0 {
            info!("Pruned {} old checkpoints", to_delete);
        }
        Ok(to_delete)
    }

    /// Stores `value` under `<prefix>:<key>`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .put(full_key.as_bytes(), value)
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Reads the value stored under `<prefix>:<key>`, if any.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .get(full_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Deletes `<prefix>:<key>`.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        let full_key = self.prefixed_key(key);
        self.db
            .delete(full_key.as_bytes())
            .map_err(|e| StoreError::IoError(e.to_string()))
    }

    /// Flushes the database via `DB::flush`.
    fn flush(&self) -> Result<(), StoreError> {
        self.db
            .flush()
            .map_err(|e| StoreError::IoError(e.to_string()))
    }
}
520
521#[derive(Debug)]
527pub struct FileStore {
528 dir: std::path::PathBuf,
529}
530
531impl FileStore {
532 pub fn open(dir: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
534 let dir = dir.as_ref().to_path_buf();
535 std::fs::create_dir_all(&dir).map_err(|e| StoreError::IoError(e.to_string()))?;
536 Ok(Self { dir })
537 }
538
539 fn key_to_path(&self, key: &str) -> std::path::PathBuf {
540 let path_str = key.replace(':', std::path::MAIN_SEPARATOR_STR);
542 self.dir.join(path_str)
543 }
544}
545
546impl StateStore for FileStore {
547 fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
548 let key = format!("checkpoint:{}", checkpoint.id);
549 let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
550 self.put(&key, &data)
551 }
552
553 fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
554 let checkpoints = self.list_checkpoints()?;
555 if let Some(id) = checkpoints.last() {
556 self.load_checkpoint(*id)
557 } else {
558 Ok(None)
559 }
560 }
561
562 fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
563 let key = format!("checkpoint:{id}");
564 if let Some(data) = self.get(&key)? {
565 let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
566 Ok(Some(checkpoint))
567 } else {
568 Ok(None)
569 }
570 }
571
572 fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
573 let checkpoint_dir = self.dir.join("checkpoint");
574 if !checkpoint_dir.exists() {
575 return Ok(Vec::new());
576 }
577
578 let mut ids: Vec<u64> = Vec::new();
579 let entries =
580 std::fs::read_dir(&checkpoint_dir).map_err(|e| StoreError::IoError(e.to_string()))?;
581 for entry in entries {
582 let entry = entry.map_err(|e| StoreError::IoError(e.to_string()))?;
583 if let Some(name) = entry.file_name().to_str() {
584 if name != "latest" {
585 if let Ok(id) = name.parse::<u64>() {
586 ids.push(id);
587 }
588 }
589 }
590 }
591 ids.sort_unstable();
592 Ok(ids)
593 }
594
595 fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
596 let checkpoints = self.list_checkpoints()?;
597 let to_delete = checkpoints.len().saturating_sub(keep);
598 for id in checkpoints.iter().take(to_delete) {
599 let key = format!("checkpoint:{id}");
600 self.delete(&key)?;
601 }
602 Ok(to_delete)
603 }
604
605 fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
606 let path = self.key_to_path(key);
607 if let Some(parent) = path.parent() {
608 std::fs::create_dir_all(parent).map_err(|e| StoreError::IoError(e.to_string()))?;
609 }
610
611 let tmp_path = path.with_extension("tmp");
613 std::fs::write(&tmp_path, value).map_err(|e| StoreError::IoError(e.to_string()))?;
614 std::fs::rename(&tmp_path, &path).map_err(|e| StoreError::IoError(e.to_string()))?;
615 Ok(())
616 }
617
618 fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
619 let path = self.key_to_path(key);
620 match std::fs::read(&path) {
621 Ok(data) => Ok(Some(data)),
622 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
623 Err(e) => Err(StoreError::IoError(e.to_string())),
624 }
625 }
626
627 fn delete(&self, key: &str) -> Result<(), StoreError> {
628 let path = self.key_to_path(key);
629 match std::fs::remove_file(&path) {
630 Ok(()) => Ok(()),
631 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
632 Err(e) => Err(StoreError::IoError(e.to_string())),
633 }
634 }
635
636 fn flush(&self) -> Result<(), StoreError> {
637 Ok(()) }
639}
640
641pub struct CheckpointManager {
643 store: Arc<dyn StateStore>,
644 config: CheckpointConfig,
645 last_checkpoint: Instant,
646 next_checkpoint_id: u64,
647}
648
649impl std::fmt::Debug for CheckpointManager {
650 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651 f.debug_struct("CheckpointManager")
652 .field("config", &self.config)
653 .field("last_checkpoint", &self.last_checkpoint)
654 .field("next_checkpoint_id", &self.next_checkpoint_id)
655 .finish_non_exhaustive()
656 }
657}
658
659impl CheckpointManager {
660 pub fn new(store: Arc<dyn StateStore>, config: CheckpointConfig) -> Result<Self, StoreError> {
662 let next_id = store.load_latest_checkpoint()?.map_or(1, |c| c.id + 1);
664
665 Ok(Self {
666 store,
667 config,
668 last_checkpoint: Instant::now(),
669 next_checkpoint_id: next_id,
670 })
671 }
672
673 pub fn should_checkpoint(&self) -> bool {
675 self.last_checkpoint.elapsed() >= self.config.interval
676 }
677
678 pub fn checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), StoreError> {
680 let mut checkpoint = checkpoint;
681 checkpoint.id = self.next_checkpoint_id;
682 checkpoint.timestamp_ms = chrono::Utc::now().timestamp_millis();
683
684 self.store.save_checkpoint(&checkpoint)?;
685 self.store.prune_checkpoints(self.config.max_checkpoints)?;
686 self.store.flush()?;
687
688 self.last_checkpoint = Instant::now();
689 self.next_checkpoint_id += 1;
690
691 info!(
692 "Created checkpoint {} ({} events processed)",
693 checkpoint.id, checkpoint.events_processed
694 );
695 Ok(())
696 }
697
698 pub fn recover(&self) -> Result<Option<Checkpoint>, StoreError> {
700 self.store.load_latest_checkpoint()
701 }
702
703 pub fn store(&self) -> &Arc<dyn StateStore> {
705 &self.store
706 }
707}
708
/// Highest engine-checkpoint version this build accepts;
/// `EngineCheckpoint::validate_and_migrate` rejects anything newer.
pub const CHECKPOINT_VERSION: u32 = 1;

/// Serde default for `EngineCheckpoint::version`: data that carries no version field
/// is treated as version 1.
const fn default_checkpoint_version() -> u32 {
    const UNVERSIONED: u32 = 1;
    UNVERSIONED
}
716
/// Versioned snapshot of one engine's state; stored per key in `Checkpoint::context_states`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineCheckpoint {
    /// Schema version; deserializes to 1 when absent (see `default_checkpoint_version`).
    #[serde(default = "default_checkpoint_version")]
    pub version: u32,
    /// Window snapshots by string key (key meaning not shown in this file).
    pub window_states: HashMap<String, WindowCheckpoint>,
    /// SASE pattern-engine snapshots by string key (key meaning not shown in this file).
    pub sase_states: HashMap<String, SaseCheckpoint>,
    /// Join snapshots by string key (key meaning not shown in this file).
    pub join_states: HashMap<String, JoinCheckpoint>,
    /// Variable values by name.
    pub variables: HashMap<String, SerializableValue>,
    /// Events-processed counter.
    pub events_processed: u64,
    /// Output-events-emitted counter.
    pub output_events_emitted: u64,
    /// Watermark snapshot; `None` when absent from the serialized data.
    #[serde(default)]
    pub watermark_state: Option<WatermarkCheckpoint>,
    /// Distinct-operator snapshots; empty when absent from the serialized data.
    #[serde(default)]
    pub distinct_states: HashMap<String, DistinctCheckpoint>,
    /// Limit-operator snapshots; empty when absent from the serialized data.
    #[serde(default)]
    pub limit_states: HashMap<String, LimitCheckpoint>,
}
745
746impl EngineCheckpoint {
747 pub const fn validate_and_migrate(&mut self) -> Result<(), StoreError> {
752 if self.version > CHECKPOINT_VERSION {
753 return Err(StoreError::IncompatibleVersion {
754 checkpoint_version: self.version,
755 current_version: CHECKPOINT_VERSION,
756 });
757 }
758
759 Ok(())
768 }
769}
770
/// Serialized state of a SASE pattern engine: its runs, watermark and run counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaseCheckpoint {
    /// Runs not associated with a partition (presumably — confirm with the engine).
    pub active_runs: Vec<RunCheckpoint>,
    /// Runs grouped by string key (presumably the partition key — confirm with the engine).
    pub partitioned_runs: HashMap<String, Vec<RunCheckpoint>>,
    /// Watermark in milliseconds, if recorded.
    pub watermark_ms: Option<i64>,
    /// Largest timestamp in milliseconds, if recorded.
    pub max_timestamp_ms: Option<i64>,
    /// Counter: runs created.
    pub total_runs_created: u64,
    /// Counter: runs completed.
    pub total_runs_completed: u64,
    /// Counter: runs dropped.
    pub total_runs_dropped: u64,
    /// Counter: runs evicted.
    pub total_runs_evicted: u64,
}
791
/// Serialized state of a single pattern-matching run inside a [`SaseCheckpoint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunCheckpoint {
    /// Current state, stored as an index (presumably into the engine's state table —
    /// confirm with the engine).
    pub current_state: usize,
    /// Stack of matched entries.
    pub stack: Vec<StackEntryCheckpoint>,
    /// Captured events by string key (presumably the capture alias — confirm with the engine).
    pub captured: HashMap<String, SerializableEvent>,
    /// Event-time start of the run in milliseconds, if recorded.
    pub event_time_started_at_ms: Option<i64>,
    /// Event-time deadline of the run in milliseconds, if recorded.
    pub event_time_deadline_ms: Option<i64>,
    /// Partition key value, if the run has one.
    pub partition_key: Option<SerializableValue>,
    /// Whether the run is marked invalidated.
    pub invalidated: bool,
    /// Count of pending negations.
    pub pending_negation_count: usize,
    /// Kleene events, if any were recorded.
    pub kleene_events: Option<Vec<SerializableEvent>>,
}
814
/// One entry of a run's stack: an event and its optional alias.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StackEntryCheckpoint {
    /// The stacked event.
    pub event: SerializableEvent,
    /// Alias attached to the event, if any.
    pub alias: Option<String>,
}
821
/// Serialized state of a join operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinCheckpoint {
    /// Buffered `(i64, event)` pairs in a two-level map. Presumably the outer key is the
    /// source, the inner key is the join-key value and the integer is a timestamp —
    /// confirm with the join implementation.
    pub buffers: HashMap<String, HashMap<String, Vec<(i64, SerializableEvent)>>>,
    /// Names of the joined sources.
    pub sources: Vec<String>,
    /// String-to-string map of join keys (presumably source name to join field — confirm).
    pub join_keys: HashMap<String, String>,
    /// Join window duration in milliseconds.
    pub window_duration_ms: i64,
}
834
/// Serialized watermark-tracker state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkCheckpoint {
    /// Per-source watermark state by string key (presumably the source name — confirm).
    pub sources: HashMap<String, SourceWatermarkCheckpoint>,
    /// Effective watermark in milliseconds, if recorded.
    pub effective_watermark_ms: Option<i64>,
}
843
/// Serialized watermark state of a single source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceWatermarkCheckpoint {
    /// Source watermark in milliseconds, if recorded.
    pub watermark_ms: Option<i64>,
    /// Largest timestamp in milliseconds, if recorded.
    pub max_timestamp_ms: Option<i64>,
    /// Maximum out-of-orderness in milliseconds.
    pub max_out_of_orderness_ms: i64,
}
854
/// Serialized state of a distinct operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistinctCheckpoint {
    /// Keys recorded by the operator (presumably the values already seen — confirm).
    pub keys: Vec<String>,
}
861
/// Serialized state of a limit operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LimitCheckpoint {
    /// Configured maximum.
    pub max: usize,
    /// Current count (presumably items let through so far — confirm).
    pub count: usize,
}
870
/// Crate-visible entry point for `value_to_serializable`; forwards `v` and returns
/// its result unchanged.
pub(crate) fn value_to_ser(v: &varpulis_core::Value) -> SerializableValue {
    value_to_serializable(v)
}
875
/// Crate-visible entry point for `serializable_to_value`; forwards `sv` and returns
/// its result unchanged.
pub(crate) fn ser_to_value(sv: SerializableValue) -> varpulis_core::Value {
    serializable_to_value(sv)
}
880
/// [`StateStore`] wrapper that encrypts values with AES-256-GCM before handing them
/// to the inner store, and decrypts them on the way back.
///
/// `Debug` is implemented by hand so that the key never appears in logs or panic
/// messages.
#[cfg(feature = "encryption")]
pub struct EncryptedStateStore<S: StateStore> {
    /// Store that holds the encrypted bytes.
    inner: S,
    /// 256-bit AES key.
    key: [u8; 32],
}

#[cfg(feature = "encryption")]
impl<S: StateStore + std::fmt::Debug> std::fmt::Debug for EncryptedStateStore<S> {
    /// Formats the inner store and prints a placeholder in place of the key.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EncryptedStateStore")
            .field("inner", &self.inner)
            .field("key", &"<redacted>")
            .finish()
    }
}
907
#[cfg(feature = "encryption")]
impl<S: StateStore> EncryptedStateStore<S> {
    /// Wraps `inner`, encrypting values with the given 32-byte key.
    pub fn new(inner: S, key: [u8; 32]) -> Self {
        Self { inner, key }
    }

    /// Parses a 64-character hex string (surrounding whitespace ignored) into a key.
    ///
    /// # Errors
    /// Returns `StoreError::IoError` when the input is not valid hex or does not
    /// decode to exactly 32 bytes.
    pub fn key_from_hex(hex_str: &str) -> Result<[u8; 32], StoreError> {
        let decoded = hex::decode(hex_str.trim())
            .map_err(|e| StoreError::IoError(format!("Invalid hex key: {e}")))?;

        <[u8; 32]>::try_from(decoded.as_slice()).map_err(|_| {
            StoreError::IoError(format!(
                "Encryption key must be 32 bytes (64 hex chars), got {} bytes",
                decoded.len()
            ))
        })
    }

    /// Derives a 32-byte key from `passphrase` and `salt` using Argon2 with its
    /// default parameters.
    ///
    /// # Errors
    /// Returns `StoreError::IoError` when Argon2 rejects the inputs.
    pub fn key_from_passphrase(passphrase: &str, salt: &[u8]) -> Result<[u8; 32], StoreError> {
        let mut derived = [0u8; 32];
        match argon2::Argon2::default().hash_password_into(
            passphrase.as_bytes(),
            salt,
            &mut derived,
        ) {
            Ok(()) => Ok(derived),
            Err(e) => Err(StoreError::IoError(format!(
                "Argon2 key derivation failed: {e}"
            ))),
        }
    }

    /// Encrypts `plaintext` under a fresh random nonce.
    ///
    /// Output layout: the 12-byte nonce followed by the ciphertext produced by the cipher.
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::{Aead, OsRng};
        use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};

        const NONCE_LEN: usize = 12;

        let cipher = Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| StoreError::IoError(format!("AES key error: {e}")))?;

        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
        let sealed = cipher
            .encrypt(&nonce, plaintext)
            .map_err(|e| StoreError::IoError(format!("Encryption failed: {e}")))?;

        let mut framed = Vec::with_capacity(NONCE_LEN + sealed.len());
        framed.extend_from_slice(&nonce);
        framed.extend_from_slice(&sealed);
        Ok(framed)
    }

    /// Reverses [`Self::encrypt`]: splits off the leading 12-byte nonce and decrypts
    /// the remainder.
    ///
    /// # Errors
    /// Returns `StoreError::IoError` when `data` is shorter than the nonce or the
    /// cipher rejects it.
    fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::Aead;
        use aes_gcm::{Aes256Gcm, KeyInit, Nonce};

        const NONCE_LEN: usize = 12;

        if data.len() < NONCE_LEN {
            return Err(StoreError::IoError(
                "Encrypted data too short (missing nonce)".to_string(),
            ));
        }

        let (head, body) = data.split_at(NONCE_LEN);
        let nonce_arr: [u8; 12] = match head.try_into() {
            Ok(arr) => arr,
            Err(_) => return Err(StoreError::IoError("Invalid nonce length".to_string())),
        };
        let nonce = Nonce::from(nonce_arr);

        let cipher = Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| StoreError::IoError(format!("AES key error: {e}")))?;

        cipher
            .decrypt(&nonce, body)
            .map_err(|e| StoreError::IoError(format!("Decryption failed: {e}")))
    }
}
986
#[cfg(feature = "encryption")]
impl<S: StateStore> StateStore for EncryptedStateStore<S> {
    /// Serializes `checkpoint`, encrypts the bytes, and stores them in the inner
    /// store under `checkpoint:<id>`.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let format = crate::codec::CheckpointFormat::active();
        let plain = crate::codec::serialize(checkpoint, format)?;
        let sealed = self.encrypt(&plain)?;

        self.inner
            .put(&format!("checkpoint:{}", checkpoint.id), &sealed)?;

        Ok(())
    }

    /// Loads the checkpoint with the highest id listed by the inner store, or
    /// `Ok(None)` if none exist.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        match self.list_checkpoints()?.last() {
            Some(&newest) => self.load_checkpoint(newest),
            None => Ok(None),
        }
    }

    /// Reads `checkpoint:<id>` from the inner store, decrypts and deserializes it;
    /// `Ok(None)` when the key is absent.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let Some(sealed) = self.inner.get(&format!("checkpoint:{id}"))? else {
            return Ok(None);
        };
        let plain = self.decrypt(&sealed)?;
        let checkpoint: Checkpoint = crate::codec::deserialize(&plain)?;
        Ok(Some(checkpoint))
    }

    /// Delegates to the inner store (ids are not encrypted).
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        self.inner.list_checkpoints()
    }

    /// Delegates to the inner store.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        self.inner.prune_checkpoints(keep)
    }

    /// Encrypts `value` and stores it under `key` in the inner store.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let sealed = self.encrypt(value)?;
        self.inner.put(key, &sealed)
    }

    /// Reads `key` from the inner store and decrypts it; `Ok(None)` when absent.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        let Some(sealed) = self.inner.get(key)? else {
            return Ok(None);
        };
        let plain = self.decrypt(&sealed)?;
        Ok(Some(plain))
    }

    /// Delegates to the inner store.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        self.inner.delete(key)
    }

    /// Delegates to the inner store.
    fn flush(&self) -> Result<(), StoreError> {
        self.inner.flush()
    }
}
1051
1052#[cfg(test)]
1053mod tests {
1054 use super::*;
1055
1056 #[test]
1057 fn test_memory_store_checkpoint() {
1058 let store = MemoryStore::new();
1059
1060 let checkpoint = Checkpoint {
1061 id: 1,
1062 timestamp_ms: 1000,
1063 events_processed: 100,
1064 window_states: HashMap::new(),
1065 pattern_states: HashMap::new(),
1066 metadata: HashMap::new(),
1067 context_states: HashMap::new(),
1068 };
1069
1070 store.save_checkpoint(&checkpoint).unwrap();
1071
1072 let loaded = store.load_checkpoint(1).unwrap();
1073 assert!(loaded.is_some());
1074 let loaded = loaded.unwrap();
1075 assert_eq!(loaded.id, 1);
1076 assert_eq!(loaded.events_processed, 100);
1077 }
1078
1079 #[test]
1080 fn test_memory_store_prune() {
1081 let store = MemoryStore::new();
1082
1083 for i in 1..=5 {
1084 let checkpoint = Checkpoint {
1085 id: i,
1086 timestamp_ms: i as i64 * 1000,
1087 events_processed: i * 100,
1088 window_states: HashMap::new(),
1089 pattern_states: HashMap::new(),
1090 metadata: HashMap::new(),
1091 context_states: HashMap::new(),
1092 };
1093 store.save_checkpoint(&checkpoint).unwrap();
1094 }
1095
1096 let checkpoints = store.list_checkpoints().unwrap();
1097 assert_eq!(checkpoints.len(), 5);
1098
1099 let pruned = store.prune_checkpoints(2).unwrap();
1100 assert_eq!(pruned, 3);
1101
1102 let checkpoints = store.list_checkpoints().unwrap();
1103 assert_eq!(checkpoints.len(), 2);
1104 assert_eq!(checkpoints, vec![4, 5]);
1105 }
1106
1107 #[test]
1108 fn test_file_store_put_get_delete() {
1109 let dir = tempfile::tempdir().unwrap();
1110 let store = FileStore::open(dir.path()).unwrap();
1111
1112 store.put("test:key1", b"hello world").unwrap();
1114 let val = store.get("test:key1").unwrap();
1115 assert_eq!(val, Some(b"hello world".to_vec()));
1116
1117 let val = store.get("test:missing").unwrap();
1119 assert!(val.is_none());
1120
1121 store.delete("test:key1").unwrap();
1123 let val = store.get("test:key1").unwrap();
1124 assert!(val.is_none());
1125
1126 store.delete("test:missing").unwrap();
1128 }
1129
1130 #[test]
1131 fn test_file_store_atomic_write() {
1132 let dir = tempfile::tempdir().unwrap();
1133 let store = FileStore::open(dir.path()).unwrap();
1134
1135 store.put("data:file1", b"version 1").unwrap();
1137 assert_eq!(
1138 store.get("data:file1").unwrap(),
1139 Some(b"version 1".to_vec())
1140 );
1141
1142 store.put("data:file1", b"version 2").unwrap();
1144 assert_eq!(
1145 store.get("data:file1").unwrap(),
1146 Some(b"version 2".to_vec())
1147 );
1148
1149 let data_dir = dir.path().join("data");
1151 if data_dir.exists() {
1152 for entry in std::fs::read_dir(&data_dir).unwrap() {
1153 let entry = entry.unwrap();
1154 let name = entry.file_name().to_string_lossy().to_string();
1155 assert!(
1156 !std::path::Path::new(&name)
1157 .extension()
1158 .is_some_and(|ext| ext.eq_ignore_ascii_case("tmp")),
1159 "tmp file left behind: {name}"
1160 );
1161 }
1162 }
1163 }
1164
1165 #[test]
1166 fn test_serializable_event() {
1167 let mut event = Event::new("TestEvent");
1168 event
1169 .data
1170 .insert("count".into(), varpulis_core::Value::Int(42));
1171 event
1172 .data
1173 .insert("value".into(), varpulis_core::Value::Float(1.5));
1174 event
1175 .data
1176 .insert("name".into(), varpulis_core::Value::Str("test".into()));
1177
1178 let serializable: SerializableEvent = (&event).into();
1179 let restored: Event = serializable.into();
1180
1181 assert_eq!(&*restored.event_type, "TestEvent");
1182 assert_eq!(restored.get_int("count"), Some(42));
1183 assert_eq!(restored.get_float("value"), Some(1.5));
1184 assert_eq!(restored.get_str("name"), Some("test"));
1185 }
1186
1187 #[test]
1188 fn test_serializable_event_complex_values() {
1189 let mut event = Event::new("ComplexEvent");
1190
1191 event.data.insert(
1193 "ts".into(),
1194 varpulis_core::Value::Timestamp(1_700_000_000_000_000_000),
1195 );
1196
1197 event
1199 .data
1200 .insert("dur".into(), varpulis_core::Value::Duration(5_000_000_000));
1201
1202 event.data.insert(
1204 "tags".into(),
1205 varpulis_core::Value::array(vec![
1206 varpulis_core::Value::Str("a".into()),
1207 varpulis_core::Value::Int(1),
1208 ]),
1209 );
1210
1211 let mut inner_map = IndexMap::with_hasher(FxBuildHasher);
1213 inner_map.insert("nested_key".into(), varpulis_core::Value::Float(3.15));
1214 inner_map.insert("flag".into(), varpulis_core::Value::Bool(true));
1215 event
1216 .data
1217 .insert("meta".into(), varpulis_core::Value::map(inner_map));
1218
1219 let serializable: SerializableEvent = (&event).into();
1221
1222 assert!(matches!(
1224 serializable.fields.get("ts"),
1225 Some(SerializableValue::Timestamp(1_700_000_000_000_000_000))
1226 ));
1227 assert!(matches!(
1228 serializable.fields.get("dur"),
1229 Some(SerializableValue::Duration(5_000_000_000))
1230 ));
1231 assert!(matches!(
1232 serializable.fields.get("tags"),
1233 Some(SerializableValue::Array(_))
1234 ));
1235 assert!(matches!(
1236 serializable.fields.get("meta"),
1237 Some(SerializableValue::Map(_))
1238 ));
1239
1240 let restored: Event = serializable.into();
1241
1242 assert_eq!(
1244 restored.data.get("ts"),
1245 Some(&varpulis_core::Value::Timestamp(1_700_000_000_000_000_000))
1246 );
1247 assert_eq!(
1248 restored.data.get("dur"),
1249 Some(&varpulis_core::Value::Duration(5_000_000_000))
1250 );
1251
1252 match restored.data.get("tags") {
1254 Some(varpulis_core::Value::Array(arr)) => {
1255 assert_eq!(arr.len(), 2);
1256 assert_eq!(arr[0], varpulis_core::Value::Str("a".into()));
1257 assert_eq!(arr[1], varpulis_core::Value::Int(1));
1258 }
1259 other => panic!("Expected Array, got {other:?}"),
1260 }
1261
1262 match restored.data.get("meta") {
1264 Some(varpulis_core::Value::Map(m)) => {
1265 assert_eq!(m.len(), 2);
1266 assert_eq!(
1267 m.get("nested_key"),
1268 Some(&varpulis_core::Value::Float(3.15))
1269 );
1270 assert_eq!(m.get("flag"), Some(&varpulis_core::Value::Bool(true)));
1271 }
1272 other => panic!("Expected Map, got {other:?}"),
1273 }
1274 }
1275
1276 #[cfg(feature = "encryption")]
1281 mod encryption_tests {
1282 use super::*;
1283
/// Builds the deterministic 32-byte key shared by the encryption tests.
///
/// Byte `i` of the returned array holds the value `i` (`0x00, 0x01, …, 0x1f`).
fn test_key() -> [u8; 32] {
    let mut bytes = [0u8; 32];
    // Pair every slot with an ascending counter. `zip` stops once the 32
    // slots are exhausted, so the open-ended `u8` range never overflows.
    for (slot, value) in bytes.iter_mut().zip(0u8..) {
        *slot = value;
    }
    bytes
}
1292
/// A value written through `EncryptedStateStore::put` must come back
/// unchanged from `EncryptedStateStore::get`.
#[test]
fn test_encrypted_put_get_roundtrip() {
    let store = EncryptedStateStore::new(MemoryStore::new(), test_key());
    let plaintext = b"hello, encrypted world!";

    store.put("test_key", plaintext).unwrap();

    let roundtripped = store.get("test_key").unwrap().unwrap();
    assert_eq!(roundtripped, plaintext);
}
1304
/// The bytes held by the wrapped store must not equal the plaintext that was
/// written through the encrypting wrapper.
#[test]
fn test_encrypted_data_differs_from_plaintext() {
    let store = EncryptedStateStore::new(MemoryStore::new(), test_key());
    let plaintext = b"sensitive data";

    store.put("test_key", plaintext).unwrap();

    // Read straight from the wrapped store, bypassing the wrapper's `get`.
    let stored = store.inner.get("test_key").unwrap().unwrap();
    assert_ne!(stored, plaintext.to_vec());
    // NOTE(review): the extra length is presumably per-record encryption
    // overhead (e.g. nonce/tag) — confirm against `EncryptedStateStore`.
    assert!(stored.len() > plaintext.len());
}
1318
/// A checkpoint saved through the encrypted store must load back with the
/// same id, event count and timestamp.
#[test]
fn test_encrypted_checkpoint_roundtrip() {
    let store = EncryptedStateStore::new(MemoryStore::new(), test_key());

    store
        .save_checkpoint(&Checkpoint {
            id: 1,
            timestamp_ms: 1_700_000_000_000,
            events_processed: 42,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: HashMap::new(),
            context_states: HashMap::new(),
        })
        .unwrap();

    let restored = store.load_checkpoint(1).unwrap().unwrap();

    // Compare the scalar fields in one shot: (id, events_processed, timestamp_ms).
    assert_eq!(
        (restored.id, restored.events_processed, restored.timestamp_ms),
        (1, 42, 1_700_000_000_000)
    );
}
1341
/// Looking up a key that was never written must succeed and yield `None`.
#[test]
fn test_encrypted_get_nonexistent_key() {
    let store = EncryptedStateStore::new(MemoryStore::new(), test_key());

    let lookup = store.get("nonexistent");

    assert!(lookup.unwrap().is_none());
}
1350
/// A key is readable after `put` and gone after `delete`.
#[test]
fn test_encrypted_delete() {
    let store = EncryptedStateStore::new(MemoryStore::new(), test_key());

    store.put("key", b"value").unwrap();
    let before_delete = store.get("key").unwrap();
    assert!(before_delete.is_some());

    store.delete("key").unwrap();
    let after_delete = store.get("key").unwrap();
    assert!(after_delete.is_none());
}
1362
/// Ciphertext written under one key must not be readable under another key.
///
/// The stored bytes are copied verbatim from the first store's backing
/// `MemoryStore` into a second one, which is then wrapped with a different
/// key, so the second store really attempts to decrypt foreign ciphertext.
#[test]
fn test_wrong_key_fails_decryption() {
    let store1 = EncryptedStateStore::new(MemoryStore::new(), test_key());
    store1.put("key", b"secret").unwrap();

    // Sanity check: the writer can still read its own record.
    assert_eq!(store1.get("key").unwrap().unwrap(), b"secret");

    // Transplant the raw stored bytes, bypassing store1's decryption.
    let ciphertext = store1.inner.get("key").unwrap().unwrap();
    let inner2 = MemoryStore::new();
    inner2.put("key", &ciphertext).unwrap();

    // A key that differs from `test_key()`.
    let mut key2 = [0u8; 32];
    key2[0] = 0xFF;
    let store2 = EncryptedStateStore::new(inner2, key2);

    // NOTE(review): assumes `get` surfaces a failed decryption as `Err`
    // (authenticated encryption) rather than `Ok(None)` — confirm against
    // `EncryptedStateStore::get`.
    assert!(
        store2.get("key").is_err(),
        "reading ciphertext with the wrong key must fail"
    );
}
1384
/// A 64-digit hex string parses into a key whose bytes follow the digit
/// pairs; the first two bytes and the last byte are spot-checked.
#[test]
fn test_key_from_hex() {
    let key = EncryptedStateStore::<MemoryStore>::key_from_hex(
        "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
    )
    .unwrap();

    // (byte index, expected value) pairs.
    for (index, expected) in [(0usize, 0x01), (1, 0x23), (31, 0xef)] {
        assert_eq!(key[index], expected, "unexpected byte at index {index}");
    }
}
1393
/// A hex string of only 16 digits must be rejected by `key_from_hex`.
#[test]
fn test_key_from_hex_wrong_length() {
    let too_short = "0123456789abcdef";

    assert!(EncryptedStateStore::<MemoryStore>::key_from_hex(too_short).is_err());
}
1400
/// Deriving a key from a passphrase yields 32 bytes, and repeating the
/// derivation with the same passphrase and salt yields the same key.
#[test]
fn test_key_from_passphrase() {
    // Both derivations use identical inputs, so share one closure.
    let derive = || {
        EncryptedStateStore::<MemoryStore>::key_from_passphrase(
            "my-secret-passphrase",
            b"varpulis-salt-00",
        )
        .unwrap()
    };

    let first = derive();
    assert_eq!(first.len(), 32);

    let second = derive();
    assert_eq!(first, second);
}
1418
/// `load_latest_checkpoint` returns `None` on an empty store and, after two
/// checkpoints (ids 1 and 2) are saved in that order, returns the one with id 2.
#[test]
fn test_encrypted_latest_checkpoint() {
    let store = EncryptedStateStore::new(MemoryStore::new(), test_key());

    // Nothing has been saved yet.
    assert!(store.load_latest_checkpoint().unwrap().is_none());

    // (id, timestamp_ms, events_processed) for each checkpoint, in save order.
    for (id, timestamp_ms, events_processed) in [(1, 1000, 10), (2, 2000, 20)] {
        let checkpoint = Checkpoint {
            id,
            timestamp_ms,
            events_processed,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: HashMap::new(),
            context_states: HashMap::new(),
        };
        store.save_checkpoint(&checkpoint).unwrap();
    }

    let newest = store.load_latest_checkpoint().unwrap().unwrap();
    assert_eq!(newest.id, 2);
    assert_eq!(newest.events_processed, 20);
}
1454 }
1455}