Skip to main content

varpulis_runtime/
persistence.rs

1//! State Persistence for Varpulis Engine
2//!
3//! Provides persistent storage for engine state including:
4//! - Window contents (events in active windows)
5//! - Aggregation state
6//! - Pattern matcher state
7//! - Checkpointing and recovery
8//!
9//! # Example
10//! ```text
11//! use varpulis_runtime::persistence::{StateStore, RocksDbStore, CheckpointConfig};
12//!
13//! let store = RocksDbStore::open("/tmp/varpulis-state")?;
14//! let config = CheckpointConfig::default();
15//! engine.enable_persistence(store, config);
16//! ```
17
18use crate::event::Event;
19use indexmap::IndexMap;
20use rustc_hash::FxBuildHasher;
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tracing::info;
26
27#[cfg(feature = "persistence")]
28use std::path::Path;
29#[cfg(feature = "persistence")]
30use tracing::debug;
31
32/// Configuration for checkpointing
33#[derive(Debug, Clone)]
34pub struct CheckpointConfig {
35    /// Interval between checkpoints
36    pub interval: Duration,
37    /// Maximum number of checkpoints to retain
38    pub max_checkpoints: usize,
39    /// Whether to checkpoint on shutdown
40    pub checkpoint_on_shutdown: bool,
41    /// Prefix for checkpoint keys
42    pub key_prefix: String,
43}
44
45impl Default for CheckpointConfig {
46    fn default() -> Self {
47        Self {
48            interval: Duration::from_secs(60),
49            max_checkpoints: 3,
50            checkpoint_on_shutdown: true,
51            key_prefix: "varpulis".to_string(),
52        }
53    }
54}
55
56/// Serializable representation of an event for persistence
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct SerializableEvent {
59    pub event_type: String,
60    pub timestamp_ms: i64,
61    pub fields: HashMap<String, SerializableValue>,
62}
63
64/// Serializable value type
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
66pub enum SerializableValue {
67    Int(i64),
68    Float(f64),
69    Bool(bool),
70    String(String),
71    Null,
72    Timestamp(i64),
73    Duration(u64),
74    Array(Vec<Self>),
75    Map(Vec<(String, Self)>),
76}
77
78/// Convert a `varpulis_core::Value` to a `SerializableValue`
79fn value_to_serializable(v: &varpulis_core::Value) -> SerializableValue {
80    match v {
81        varpulis_core::Value::Int(i) => SerializableValue::Int(*i),
82        varpulis_core::Value::Float(f) => SerializableValue::Float(*f),
83        varpulis_core::Value::Bool(b) => SerializableValue::Bool(*b),
84        varpulis_core::Value::Str(s) => SerializableValue::String(s.to_string()),
85        varpulis_core::Value::Null => SerializableValue::Null,
86        varpulis_core::Value::Timestamp(ts) => SerializableValue::Timestamp(*ts),
87        varpulis_core::Value::Duration(d) => SerializableValue::Duration(*d),
88        varpulis_core::Value::Array(arr) => {
89            SerializableValue::Array(arr.iter().map(value_to_serializable).collect())
90        }
91        varpulis_core::Value::Map(map) => SerializableValue::Map(
92            map.iter()
93                .map(|(k, v)| (k.to_string(), value_to_serializable(v)))
94                .collect(),
95        ),
96    }
97}
98
99/// Convert a `SerializableValue` back to a `varpulis_core::Value`
100fn serializable_to_value(sv: SerializableValue) -> varpulis_core::Value {
101    match sv {
102        SerializableValue::Int(i) => varpulis_core::Value::Int(i),
103        SerializableValue::Float(f) => varpulis_core::Value::Float(f),
104        SerializableValue::Bool(b) => varpulis_core::Value::Bool(b),
105        SerializableValue::String(s) => varpulis_core::Value::Str(s.into()),
106        SerializableValue::Null => varpulis_core::Value::Null,
107        SerializableValue::Timestamp(ts) => varpulis_core::Value::Timestamp(ts),
108        SerializableValue::Duration(d) => varpulis_core::Value::Duration(d),
109        SerializableValue::Array(arr) => {
110            varpulis_core::Value::array(arr.into_iter().map(serializable_to_value).collect())
111        }
112        SerializableValue::Map(entries) => {
113            let mut map: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
114                IndexMap::with_hasher(FxBuildHasher);
115            for (k, v) in entries {
116                map.insert(k.into(), serializable_to_value(v));
117            }
118            varpulis_core::Value::map(map)
119        }
120    }
121}
122
123impl From<&Event> for SerializableEvent {
124    fn from(event: &Event) -> Self {
125        let mut fields = HashMap::new();
126        for (k, v) in &event.data {
127            fields.insert(k.to_string(), value_to_serializable(v));
128        }
129        Self {
130            event_type: event.event_type.to_string(),
131            timestamp_ms: event.timestamp.timestamp_millis(),
132            fields,
133        }
134    }
135}
136
137impl From<SerializableEvent> for Event {
138    fn from(se: SerializableEvent) -> Self {
139        let mut event = Self::new(se.event_type);
140        event.timestamp = chrono::DateTime::from_timestamp_millis(se.timestamp_ms)
141            .unwrap_or_else(chrono::Utc::now);
142        for (k, v) in se.fields {
143            event.data.insert(k.into(), serializable_to_value(v));
144        }
145        event
146    }
147}
148
149/// Checkpoint containing all engine state
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct Checkpoint {
152    /// Checkpoint ID (monotonically increasing)
153    pub id: u64,
154    /// Timestamp when checkpoint was created
155    pub timestamp_ms: i64,
156    /// Number of events processed at checkpoint time
157    pub events_processed: u64,
158    /// Window states by stream name
159    pub window_states: HashMap<String, WindowCheckpoint>,
160    /// Pattern matcher states
161    pub pattern_states: HashMap<String, PatternCheckpoint>,
162    /// Custom metadata
163    pub metadata: HashMap<String, String>,
164    /// Engine states per context (for coordinated checkpointing)
165    #[serde(default)]
166    pub context_states: HashMap<String, EngineCheckpoint>,
167}
168
169/// Checkpoint for window state
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct WindowCheckpoint {
172    /// Events currently in the window
173    pub events: Vec<SerializableEvent>,
174    /// Window start timestamp (if applicable)
175    pub window_start_ms: Option<i64>,
176    /// Last emit timestamp (for sliding windows)
177    pub last_emit_ms: Option<i64>,
178    /// Partitioned window states
179    pub partitions: HashMap<String, PartitionedWindowCheckpoint>,
180}
181
182/// Checkpoint for partitioned window
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct PartitionedWindowCheckpoint {
185    pub events: Vec<SerializableEvent>,
186    pub window_start_ms: Option<i64>,
187}
188
189/// Checkpoint for pattern matcher state
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct PatternCheckpoint {
192    /// Active partial matches
193    pub partial_matches: Vec<PartialMatchCheckpoint>,
194}
195
196/// A partial match in progress
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct PartialMatchCheckpoint {
199    /// Current state in the pattern automaton
200    pub state: String,
201    /// Events matched so far
202    pub matched_events: Vec<SerializableEvent>,
203    /// Start timestamp
204    pub start_ms: i64,
205}
206
207/// Error type for state store operations
208#[derive(Debug, thiserror::Error)]
209pub enum StoreError {
210    /// I/O or storage error
211    #[error("I/O error: {0}")]
212    IoError(String),
213    /// Serialization error
214    #[error("Serialization error: {0}")]
215    SerializationError(String),
216    /// Key not found
217    #[error("Key not found: {0}")]
218    NotFound(String),
219    /// Store not initialized
220    #[error("Store not initialized")]
221    NotInitialized,
222    /// Checkpoint version is newer than this binary supports
223    #[error(
224        "Checkpoint version {checkpoint_version} is newer than supported version {current_version}"
225    )]
226    IncompatibleVersion {
227        checkpoint_version: u32,
228        current_version: u32,
229    },
230}
231
232/// Trait for state storage backends
233pub trait StateStore: Send + Sync {
234    /// Store a checkpoint
235    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError>;
236
237    /// Load the latest checkpoint
238    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError>;
239
240    /// Load a specific checkpoint by ID
241    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError>;
242
243    /// List all checkpoint IDs
244    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError>;
245
246    /// Delete old checkpoints, keeping only the most recent N
247    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError>;
248
249    /// Store arbitrary key-value data
250    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError>;
251
252    /// Retrieve arbitrary key-value data
253    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError>;
254
255    /// Delete a key
256    fn delete(&self, key: &str) -> Result<(), StoreError>;
257
258    /// Flush all pending writes to disk
259    fn flush(&self) -> Result<(), StoreError>;
260}
261
262/// In-memory state store for testing
263#[derive(Debug, Default)]
264pub struct MemoryStore {
265    data: std::sync::RwLock<HashMap<String, Vec<u8>>>,
266}
267
268impl MemoryStore {
269    pub fn new() -> Self {
270        Self::default()
271    }
272}
273
274impl StateStore for MemoryStore {
275    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
276        let key = format!("checkpoint:{}", checkpoint.id);
277        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
278        self.put(&key, &data)
279    }
280
281    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
282        let checkpoints = self.list_checkpoints()?;
283        if let Some(id) = checkpoints.last() {
284            self.load_checkpoint(*id)
285        } else {
286            Ok(None)
287        }
288    }
289
290    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
291        let key = format!("checkpoint:{id}");
292        if let Some(data) = self.get(&key)? {
293            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
294            Ok(Some(checkpoint))
295        } else {
296            Ok(None)
297        }
298    }
299
300    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
301        let data = self
302            .data
303            .read()
304            .map_err(|e| StoreError::IoError(e.to_string()))?;
305        let mut ids: Vec<u64> = data
306            .keys()
307            .filter_map(|k| k.strip_prefix("checkpoint:").and_then(|s| s.parse().ok()))
308            .collect();
309        ids.sort_unstable();
310        Ok(ids)
311    }
312
313    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
314        let checkpoints = self.list_checkpoints()?;
315        let to_delete = checkpoints.len().saturating_sub(keep);
316        for id in checkpoints.iter().take(to_delete) {
317            let key = format!("checkpoint:{id}");
318            self.delete(&key)?;
319        }
320        Ok(to_delete)
321    }
322
323    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
324        let mut data = self
325            .data
326            .write()
327            .map_err(|e| StoreError::IoError(e.to_string()))?;
328        data.insert(key.to_string(), value.to_vec());
329        Ok(())
330    }
331
332    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
333        let data = self
334            .data
335            .read()
336            .map_err(|e| StoreError::IoError(e.to_string()))?;
337        Ok(data.get(key).cloned())
338    }
339
340    fn delete(&self, key: &str) -> Result<(), StoreError> {
341        let mut data = self
342            .data
343            .write()
344            .map_err(|e| StoreError::IoError(e.to_string()))?;
345        data.remove(key);
346        Ok(())
347    }
348
349    fn flush(&self) -> Result<(), StoreError> {
350        Ok(()) // No-op for memory store
351    }
352}
353
354/// RocksDB-based state store
355#[cfg(feature = "persistence")]
356pub struct RocksDbStore {
357    db: rocksdb::DB,
358    prefix: String,
359}
360
361#[cfg(feature = "persistence")]
362impl RocksDbStore {
363    /// Open or create a RocksDB store at the given path
364    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
365        Self::open_with_prefix(path, "varpulis")
366    }
367
368    /// Open with a custom key prefix
369    pub fn open_with_prefix<P: AsRef<Path>>(path: P, prefix: &str) -> Result<Self, StoreError> {
370        let mut opts = rocksdb::Options::default();
371        opts.create_if_missing(true);
372        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
373
374        // Optimize for write-heavy workload
375        opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
376        opts.set_max_write_buffer_number(3);
377        opts.set_target_file_size_base(64 * 1024 * 1024);
378
379        let db = rocksdb::DB::open(&opts, path).map_err(|e| StoreError::IoError(e.to_string()))?;
380
381        info!("Opened RocksDB state store");
382        Ok(Self {
383            db,
384            prefix: prefix.to_string(),
385        })
386    }
387
388    fn prefixed_key(&self, key: &str) -> String {
389        format!("{}:{}", self.prefix, key)
390    }
391}
392
393#[cfg(feature = "persistence")]
394impl StateStore for RocksDbStore {
395    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
396        let key = self.prefixed_key(&format!("checkpoint:{}", checkpoint.id));
397        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
398
399        self.db
400            .put(key.as_bytes(), &data)
401            .map_err(|e| StoreError::IoError(e.to_string()))?;
402
403        // Also update the "latest" pointer
404        let latest_key = self.prefixed_key("checkpoint:latest");
405        self.db
406            .put(latest_key.as_bytes(), checkpoint.id.to_le_bytes())
407            .map_err(|e| StoreError::IoError(e.to_string()))?;
408
409        debug!("Saved checkpoint {} ({} bytes)", checkpoint.id, data.len());
410        Ok(())
411    }
412
413    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
414        let latest_key = self.prefixed_key("checkpoint:latest");
415        if let Some(id_bytes) = self
416            .db
417            .get(latest_key.as_bytes())
418            .map_err(|e| StoreError::IoError(e.to_string()))?
419        {
420            let Ok(bytes) = <[u8; 8]>::try_from(id_bytes.as_ref()) else {
421                return Ok(None);
422            };
423            let id = u64::from_le_bytes(bytes);
424            return self.load_checkpoint(id);
425        }
426        Ok(None)
427    }
428
429    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
430        let key = self.prefixed_key(&format!("checkpoint:{}", id));
431        if let Some(data) = self
432            .db
433            .get(key.as_bytes())
434            .map_err(|e| StoreError::IoError(e.to_string()))?
435        {
436            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
437            debug!("Loaded checkpoint {}", id);
438            Ok(Some(checkpoint))
439        } else {
440            Ok(None)
441        }
442    }
443
444    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
445        let prefix = self.prefixed_key("checkpoint:");
446        let mut ids = Vec::new();
447
448        let iter = self.db.prefix_iterator(prefix.as_bytes());
449        for item in iter {
450            let (key, _) = item.map_err(|e| StoreError::IoError(e.to_string()))?;
451            let key_str = String::from_utf8_lossy(&key);
452            if let Some(suffix) = key_str.strip_prefix(&prefix) {
453                if suffix != "latest" {
454                    if let Ok(id) = suffix.parse::<u64>() {
455                        ids.push(id);
456                    }
457                }
458            }
459        }
460
461        ids.sort();
462        Ok(ids)
463    }
464
465    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
466        let checkpoints = self.list_checkpoints()?;
467        let to_delete = checkpoints.len().saturating_sub(keep);
468
469        for id in checkpoints.iter().take(to_delete) {
470            let key = self.prefixed_key(&format!("checkpoint:{}", id));
471            self.db
472                .delete(key.as_bytes())
473                .map_err(|e| StoreError::IoError(e.to_string()))?;
474        }
475
476        if to_delete > 0 {
477            info!("Pruned {} old checkpoints", to_delete);
478        }
479        Ok(to_delete)
480    }
481
482    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
483        let full_key = self.prefixed_key(key);
484        self.db
485            .put(full_key.as_bytes(), value)
486            .map_err(|e| StoreError::IoError(e.to_string()))
487    }
488
489    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
490        let full_key = self.prefixed_key(key);
491        self.db
492            .get(full_key.as_bytes())
493            .map_err(|e| StoreError::IoError(e.to_string()))
494    }
495
496    fn delete(&self, key: &str) -> Result<(), StoreError> {
497        let full_key = self.prefixed_key(key);
498        self.db
499            .delete(full_key.as_bytes())
500            .map_err(|e| StoreError::IoError(e.to_string()))
501    }
502
503    fn flush(&self) -> Result<(), StoreError> {
504        self.db
505            .flush()
506            .map_err(|e| StoreError::IoError(e.to_string()))
507    }
508}
509
510/// File-system based state store
511///
512/// Stores key-value pairs as files in a directory. Keys containing ":"
513/// are mapped to subdirectories (e.g., "tenant:abc" → "tenant/abc").
514/// Writes are atomic via temp file + rename.
515#[derive(Debug)]
516pub struct FileStore {
517    dir: std::path::PathBuf,
518}
519
520impl FileStore {
521    /// Open or create a file-based store at the given directory
522    pub fn open(dir: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
523        let dir = dir.as_ref().to_path_buf();
524        std::fs::create_dir_all(&dir).map_err(|e| StoreError::IoError(e.to_string()))?;
525        Ok(Self { dir })
526    }
527
528    fn key_to_path(&self, key: &str) -> std::path::PathBuf {
529        // Map ":" separators to directory separators
530        let path_str = key.replace(':', std::path::MAIN_SEPARATOR_STR);
531        self.dir.join(path_str)
532    }
533}
534
535impl StateStore for FileStore {
536    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
537        let key = format!("checkpoint:{}", checkpoint.id);
538        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
539        self.put(&key, &data)
540    }
541
542    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
543        let checkpoints = self.list_checkpoints()?;
544        if let Some(id) = checkpoints.last() {
545            self.load_checkpoint(*id)
546        } else {
547            Ok(None)
548        }
549    }
550
551    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
552        let key = format!("checkpoint:{id}");
553        if let Some(data) = self.get(&key)? {
554            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
555            Ok(Some(checkpoint))
556        } else {
557            Ok(None)
558        }
559    }
560
561    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
562        let checkpoint_dir = self.dir.join("checkpoint");
563        if !checkpoint_dir.exists() {
564            return Ok(Vec::new());
565        }
566
567        let mut ids: Vec<u64> = Vec::new();
568        let entries =
569            std::fs::read_dir(&checkpoint_dir).map_err(|e| StoreError::IoError(e.to_string()))?;
570        for entry in entries {
571            let entry = entry.map_err(|e| StoreError::IoError(e.to_string()))?;
572            if let Some(name) = entry.file_name().to_str() {
573                if name != "latest" {
574                    if let Ok(id) = name.parse::<u64>() {
575                        ids.push(id);
576                    }
577                }
578            }
579        }
580        ids.sort_unstable();
581        Ok(ids)
582    }
583
584    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
585        let checkpoints = self.list_checkpoints()?;
586        let to_delete = checkpoints.len().saturating_sub(keep);
587        for id in checkpoints.iter().take(to_delete) {
588            let key = format!("checkpoint:{id}");
589            self.delete(&key)?;
590        }
591        Ok(to_delete)
592    }
593
594    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
595        let path = self.key_to_path(key);
596        if let Some(parent) = path.parent() {
597            std::fs::create_dir_all(parent).map_err(|e| StoreError::IoError(e.to_string()))?;
598        }
599
600        // Atomic write: write to temp file, then rename
601        let tmp_path = path.with_extension("tmp");
602        std::fs::write(&tmp_path, value).map_err(|e| StoreError::IoError(e.to_string()))?;
603        std::fs::rename(&tmp_path, &path).map_err(|e| StoreError::IoError(e.to_string()))?;
604        Ok(())
605    }
606
607    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
608        let path = self.key_to_path(key);
609        match std::fs::read(&path) {
610            Ok(data) => Ok(Some(data)),
611            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
612            Err(e) => Err(StoreError::IoError(e.to_string())),
613        }
614    }
615
616    fn delete(&self, key: &str) -> Result<(), StoreError> {
617        let path = self.key_to_path(key);
618        match std::fs::remove_file(&path) {
619            Ok(()) => Ok(()),
620            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
621            Err(e) => Err(StoreError::IoError(e.to_string())),
622        }
623    }
624
625    fn flush(&self) -> Result<(), StoreError> {
626        Ok(()) // File writes are already flushed on close
627    }
628}
629
630/// Checkpoint manager that handles periodic checkpointing
631pub struct CheckpointManager {
632    store: Arc<dyn StateStore>,
633    config: CheckpointConfig,
634    last_checkpoint: Instant,
635    next_checkpoint_id: u64,
636}
637
638impl std::fmt::Debug for CheckpointManager {
639    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640        f.debug_struct("CheckpointManager")
641            .field("config", &self.config)
642            .field("last_checkpoint", &self.last_checkpoint)
643            .field("next_checkpoint_id", &self.next_checkpoint_id)
644            .finish_non_exhaustive()
645    }
646}
647
648impl CheckpointManager {
649    /// Create a new checkpoint manager
650    pub fn new(store: Arc<dyn StateStore>, config: CheckpointConfig) -> Result<Self, StoreError> {
651        // Load the latest checkpoint ID
652        let next_id = store.load_latest_checkpoint()?.map_or(1, |c| c.id + 1);
653
654        Ok(Self {
655            store,
656            config,
657            last_checkpoint: Instant::now(),
658            next_checkpoint_id: next_id,
659        })
660    }
661
662    /// Check if it's time to create a checkpoint
663    pub fn should_checkpoint(&self) -> bool {
664        self.last_checkpoint.elapsed() >= self.config.interval
665    }
666
667    /// Create a checkpoint with the given state
668    pub fn checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), StoreError> {
669        let mut checkpoint = checkpoint;
670        checkpoint.id = self.next_checkpoint_id;
671        checkpoint.timestamp_ms = chrono::Utc::now().timestamp_millis();
672
673        self.store.save_checkpoint(&checkpoint)?;
674        self.store.prune_checkpoints(self.config.max_checkpoints)?;
675        self.store.flush()?;
676
677        self.last_checkpoint = Instant::now();
678        self.next_checkpoint_id += 1;
679
680        info!(
681            "Created checkpoint {} ({} events processed)",
682            checkpoint.id, checkpoint.events_processed
683        );
684        Ok(())
685    }
686
687    /// Load the latest checkpoint for recovery
688    pub fn recover(&self) -> Result<Option<Checkpoint>, StoreError> {
689        self.store.load_latest_checkpoint()
690    }
691
692    /// Get the underlying store
693    pub fn store(&self) -> &Arc<dyn StateStore> {
694        &self.store
695    }
696}
697
698/// Current checkpoint schema version.
699pub const CHECKPOINT_VERSION: u32 = 1;
700
701/// Default version for deserialized checkpoints that lack a version field (pre-versioning).
702const fn default_checkpoint_version() -> u32 {
703    1
704}
705
706/// Checkpoint for a single engine instance (one context).
707#[derive(Debug, Clone, Serialize, Deserialize)]
708pub struct EngineCheckpoint {
709    /// Schema version for forward/backward compatibility.
710    #[serde(default = "default_checkpoint_version")]
711    pub version: u32,
712    /// Window states by stream name
713    pub window_states: HashMap<String, WindowCheckpoint>,
714    /// SASE+ pattern engine states by stream name
715    pub sase_states: HashMap<String, SaseCheckpoint>,
716    /// Join buffer states by stream name
717    pub join_states: HashMap<String, JoinCheckpoint>,
718    /// Engine variables
719    pub variables: HashMap<String, SerializableValue>,
720    /// Events processed counter
721    pub events_processed: u64,
722    /// Output events emitted counter
723    pub output_events_emitted: u64,
724    /// Watermark tracker state
725    #[serde(default)]
726    pub watermark_state: Option<WatermarkCheckpoint>,
727    /// Distinct operator states by stream name (LRU keys snapshot)
728    #[serde(default)]
729    pub distinct_states: HashMap<String, DistinctCheckpoint>,
730    /// Limit operator states by stream name (counter snapshot)
731    #[serde(default)]
732    pub limit_states: HashMap<String, LimitCheckpoint>,
733}
734
735impl EngineCheckpoint {
736    /// Validate and migrate a checkpoint to the current schema version.
737    ///
738    /// Returns `Err` if the checkpoint is from a future version that this
739    /// binary does not understand (forward-incompatible).
740    pub const fn validate_and_migrate(&mut self) -> Result<(), StoreError> {
741        if self.version > CHECKPOINT_VERSION {
742            return Err(StoreError::IncompatibleVersion {
743                checkpoint_version: self.version,
744                current_version: CHECKPOINT_VERSION,
745            });
746        }
747
748        // Apply sequential migrations: v1 → v2 → … → CHECKPOINT_VERSION
749        // Currently at v1 — no migrations needed yet.
750        // Future example:
751        // if self.version < 2 {
752        //     migrate_v1_to_v2(self);
753        //     self.version = 2;
754        // }
755
756        Ok(())
757    }
758}
759
760/// Checkpoint for SASE+ pattern matching engine state.
761#[derive(Debug, Clone, Serialize, Deserialize)]
762pub struct SaseCheckpoint {
763    /// Non-partitioned active runs
764    pub active_runs: Vec<RunCheckpoint>,
765    /// Partitioned active runs
766    pub partitioned_runs: HashMap<String, Vec<RunCheckpoint>>,
767    /// Current watermark in milliseconds
768    pub watermark_ms: Option<i64>,
769    /// Maximum observed timestamp in milliseconds
770    pub max_timestamp_ms: Option<i64>,
771    /// Cumulative runs created
772    pub total_runs_created: u64,
773    /// Cumulative runs completed
774    pub total_runs_completed: u64,
775    /// Cumulative runs dropped
776    pub total_runs_dropped: u64,
777    /// Cumulative runs evicted
778    pub total_runs_evicted: u64,
779}
780
781/// Checkpoint for a single SASE+ run (partial match).
782#[derive(Debug, Clone, Serialize, Deserialize)]
783pub struct RunCheckpoint {
784    /// Current NFA state index
785    pub current_state: usize,
786    /// Stack of matched events
787    pub stack: Vec<StackEntryCheckpoint>,
788    /// Captured events by alias
789    pub captured: HashMap<String, SerializableEvent>,
790    /// Event-time when this run started (ms since epoch)
791    pub event_time_started_at_ms: Option<i64>,
792    /// Event-time deadline (ms since epoch)
793    pub event_time_deadline_ms: Option<i64>,
794    /// Partition key value
795    pub partition_key: Option<SerializableValue>,
796    /// Whether the run is invalidated
797    pub invalidated: bool,
798    /// Number of pending negation constraints
799    pub pending_negation_count: usize,
800    /// Kleene capture events (ZDD rebuilt on restore)
801    pub kleene_events: Option<Vec<SerializableEvent>>,
802}
803
804/// Checkpoint for a stack entry in a SASE+ run.
805#[derive(Debug, Clone, Serialize, Deserialize)]
806pub struct StackEntryCheckpoint {
807    pub event: SerializableEvent,
808    pub alias: Option<String>,
809}
810
811/// Checkpoint for a join buffer.
812#[derive(Debug, Clone, Serialize, Deserialize)]
813pub struct JoinCheckpoint {
814    /// Buffered events: source_name -> (key_value -> Vec<(timestamp_ms, event)>)
815    pub buffers: HashMap<String, HashMap<String, Vec<(i64, SerializableEvent)>>>,
816    /// Source stream names
817    pub sources: Vec<String>,
818    /// Join key field per source
819    pub join_keys: HashMap<String, String>,
820    /// Window duration in milliseconds
821    pub window_duration_ms: i64,
822}
823
824/// Checkpoint for per-source watermark tracking.
825#[derive(Debug, Clone, Serialize, Deserialize)]
826pub struct WatermarkCheckpoint {
827    /// Per-source watermark state
828    pub sources: HashMap<String, SourceWatermarkCheckpoint>,
829    /// Effective (minimum) watermark in milliseconds
830    pub effective_watermark_ms: Option<i64>,
831}
832
833/// Checkpoint for a single source's watermark state.
834#[derive(Debug, Clone, Serialize, Deserialize)]
835pub struct SourceWatermarkCheckpoint {
836    /// Current watermark in milliseconds
837    pub watermark_ms: Option<i64>,
838    /// Maximum observed timestamp in milliseconds
839    pub max_timestamp_ms: Option<i64>,
840    /// Maximum out-of-orderness tolerance in milliseconds
841    pub max_out_of_orderness_ms: i64,
842}
843
844/// Checkpoint for a `.distinct()` operator (LRU key snapshot).
845#[derive(Debug, Clone, Serialize, Deserialize)]
846pub struct DistinctCheckpoint {
847    /// Most-recently-used keys (ordered from most-recent to least-recent)
848    pub keys: Vec<String>,
849}
850
851/// Checkpoint for a `.limit(n)` operator (counter snapshot).
852#[derive(Debug, Clone, Serialize, Deserialize)]
853pub struct LimitCheckpoint {
854    /// Maximum number of events
855    pub max: usize,
856    /// Number of events already passed
857    pub count: usize,
858}
859
860/// Convert Value to SerializableValue (pub(crate) re-export)
861pub(crate) fn value_to_ser(v: &varpulis_core::Value) -> SerializableValue {
862    value_to_serializable(v)
863}
864
865/// Convert SerializableValue to Value (pub(crate) re-export)
866pub(crate) fn ser_to_value(sv: SerializableValue) -> varpulis_core::Value {
867    serializable_to_value(sv)
868}
869
870// =============================================================================
871// Encrypted State Store (AES-256-GCM)
872// =============================================================================
873
874/// Encrypted wrapper around any `StateStore` implementation.
875///
876/// Uses AES-256-GCM (authenticated encryption) to encrypt all data at rest.
877/// Each value gets a random 96-bit nonce prepended to the ciphertext.
878///
879/// # Usage
880///
881/// ```rust,no_run
882/// use varpulis_runtime::persistence::{EncryptedStateStore, MemoryStore};
883///
884/// let key = EncryptedStateStore::key_from_hex(
885///     &std::env::var("VARPULIS_ENCRYPTION_KEY").unwrap()
886/// ).unwrap();
887/// let inner = MemoryStore::new();
888/// let store = EncryptedStateStore::new(inner, key);
889/// ```
890#[cfg(feature = "encryption")]
891#[derive(Debug)]
892pub struct EncryptedStateStore<S: StateStore> {
893    inner: S,
894    key: [u8; 32],
895}
896
897#[cfg(feature = "encryption")]
898impl<S: StateStore> EncryptedStateStore<S> {
899    /// Create a new encrypted store wrapping an inner store.
900    pub fn new(inner: S, key: [u8; 32]) -> Self {
901        Self { inner, key }
902    }
903
904    /// Derive a 256-bit key from a hex-encoded string (64 hex chars).
905    pub fn key_from_hex(hex_str: &str) -> Result<[u8; 32], StoreError> {
906        let bytes = hex::decode(hex_str.trim())
907            .map_err(|e| StoreError::IoError(format!("Invalid hex key: {}", e)))?;
908        if bytes.len() != 32 {
909            return Err(StoreError::IoError(format!(
910                "Encryption key must be 32 bytes (64 hex chars), got {} bytes",
911                bytes.len()
912            )));
913        }
914        let mut key = [0u8; 32];
915        key.copy_from_slice(&bytes);
916        Ok(key)
917    }
918
919    /// Derive a 256-bit key from a passphrase using Argon2id.
920    pub fn key_from_passphrase(passphrase: &str, salt: &[u8]) -> Result<[u8; 32], StoreError> {
921        use argon2::Argon2;
922
923        let mut key = [0u8; 32];
924        Argon2::default()
925            .hash_password_into(passphrase.as_bytes(), salt, &mut key)
926            .map_err(|e| StoreError::IoError(format!("Argon2 key derivation failed: {}", e)))?;
927        Ok(key)
928    }
929
930    /// Encrypt data: returns nonce (12 bytes) || ciphertext
931    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, StoreError> {
932        use aes_gcm::aead::{Aead, OsRng};
933        use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
934
935        let cipher = Aes256Gcm::new_from_slice(&self.key)
936            .map_err(|e| StoreError::IoError(format!("AES key error: {}", e)))?;
937
938        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
939        let ciphertext = cipher
940            .encrypt(&nonce, plaintext)
941            .map_err(|e| StoreError::IoError(format!("Encryption failed: {}", e)))?;
942
943        // Prepend nonce to ciphertext
944        let mut result = Vec::with_capacity(12 + ciphertext.len());
945        result.extend_from_slice(&nonce);
946        result.extend_from_slice(&ciphertext);
947        Ok(result)
948    }
949
950    /// Decrypt data: expects nonce (12 bytes) || ciphertext
951    fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, StoreError> {
952        use aes_gcm::aead::Aead;
953        use aes_gcm::{Aes256Gcm, KeyInit, Nonce};
954
955        if data.len() < 12 {
956            return Err(StoreError::IoError(
957                "Encrypted data too short (missing nonce)".to_string(),
958            ));
959        }
960
961        let (nonce_bytes, ciphertext) = data.split_at(12);
962        let nonce_arr: [u8; 12] = nonce_bytes
963            .try_into()
964            .map_err(|_| StoreError::IoError("Invalid nonce length".to_string()))?;
965        let nonce = Nonce::from(nonce_arr);
966
967        let cipher = Aes256Gcm::new_from_slice(&self.key)
968            .map_err(|e| StoreError::IoError(format!("AES key error: {}", e)))?;
969
970        cipher
971            .decrypt(&nonce, ciphertext)
972            .map_err(|e| StoreError::IoError(format!("Decryption failed: {}", e)))
973    }
974}
975
976#[cfg(feature = "encryption")]
977impl<S: StateStore> StateStore for EncryptedStateStore<S> {
978    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
979        // Serialize, encrypt, then save via inner store
980        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
981        let encrypted = self.encrypt(&data)?;
982
983        // Store encrypted checkpoint using inner's put
984        let key = format!("checkpoint:{}", checkpoint.id);
985        self.inner.put(&key, &encrypted)?;
986
987        // Also maintain the checkpoint list metadata (unencrypted)
988        // by delegating to inner's save_checkpoint for metadata consistency
989        Ok(())
990    }
991
992    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
993        let checkpoints = self.list_checkpoints()?;
994        if let Some(id) = checkpoints.last() {
995            self.load_checkpoint(*id)
996        } else {
997            Ok(None)
998        }
999    }
1000
1001    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
1002        let key = format!("checkpoint:{}", id);
1003        if let Some(encrypted) = self.inner.get(&key)? {
1004            let data = self.decrypt(&encrypted)?;
1005            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
1006            Ok(Some(checkpoint))
1007        } else {
1008            Ok(None)
1009        }
1010    }
1011
1012    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
1013        self.inner.list_checkpoints()
1014    }
1015
1016    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
1017        self.inner.prune_checkpoints(keep)
1018    }
1019
1020    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
1021        let encrypted = self.encrypt(value)?;
1022        self.inner.put(key, &encrypted)
1023    }
1024
1025    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
1026        match self.inner.get(key)? {
1027            Some(encrypted) => Ok(Some(self.decrypt(&encrypted)?)),
1028            None => Ok(None),
1029        }
1030    }
1031
1032    fn delete(&self, key: &str) -> Result<(), StoreError> {
1033        self.inner.delete(key)
1034    }
1035
1036    fn flush(&self) -> Result<(), StoreError> {
1037        self.inner.flush()
1038    }
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043    use super::*;
1044
1045    #[test]
1046    fn test_memory_store_checkpoint() {
1047        let store = MemoryStore::new();
1048
1049        let checkpoint = Checkpoint {
1050            id: 1,
1051            timestamp_ms: 1000,
1052            events_processed: 100,
1053            window_states: HashMap::new(),
1054            pattern_states: HashMap::new(),
1055            metadata: HashMap::new(),
1056            context_states: HashMap::new(),
1057        };
1058
1059        store.save_checkpoint(&checkpoint).unwrap();
1060
1061        let loaded = store.load_checkpoint(1).unwrap();
1062        assert!(loaded.is_some());
1063        let loaded = loaded.unwrap();
1064        assert_eq!(loaded.id, 1);
1065        assert_eq!(loaded.events_processed, 100);
1066    }
1067
1068    #[test]
1069    fn test_memory_store_prune() {
1070        let store = MemoryStore::new();
1071
1072        for i in 1..=5 {
1073            let checkpoint = Checkpoint {
1074                id: i,
1075                timestamp_ms: i as i64 * 1000,
1076                events_processed: i * 100,
1077                window_states: HashMap::new(),
1078                pattern_states: HashMap::new(),
1079                metadata: HashMap::new(),
1080                context_states: HashMap::new(),
1081            };
1082            store.save_checkpoint(&checkpoint).unwrap();
1083        }
1084
1085        let checkpoints = store.list_checkpoints().unwrap();
1086        assert_eq!(checkpoints.len(), 5);
1087
1088        let pruned = store.prune_checkpoints(2).unwrap();
1089        assert_eq!(pruned, 3);
1090
1091        let checkpoints = store.list_checkpoints().unwrap();
1092        assert_eq!(checkpoints.len(), 2);
1093        assert_eq!(checkpoints, vec![4, 5]);
1094    }
1095
1096    #[test]
1097    fn test_file_store_put_get_delete() {
1098        let dir = tempfile::tempdir().unwrap();
1099        let store = FileStore::open(dir.path()).unwrap();
1100
1101        // Put and get
1102        store.put("test:key1", b"hello world").unwrap();
1103        let val = store.get("test:key1").unwrap();
1104        assert_eq!(val, Some(b"hello world".to_vec()));
1105
1106        // Get missing key
1107        let val = store.get("test:missing").unwrap();
1108        assert!(val.is_none());
1109
1110        // Delete
1111        store.delete("test:key1").unwrap();
1112        let val = store.get("test:key1").unwrap();
1113        assert!(val.is_none());
1114
1115        // Delete missing key (should not error)
1116        store.delete("test:missing").unwrap();
1117    }
1118
1119    #[test]
1120    fn test_file_store_atomic_write() {
1121        let dir = tempfile::tempdir().unwrap();
1122        let store = FileStore::open(dir.path()).unwrap();
1123
1124        // Write data
1125        store.put("data:file1", b"version 1").unwrap();
1126        assert_eq!(
1127            store.get("data:file1").unwrap(),
1128            Some(b"version 1".to_vec())
1129        );
1130
1131        // Overwrite (atomic via temp+rename)
1132        store.put("data:file1", b"version 2").unwrap();
1133        assert_eq!(
1134            store.get("data:file1").unwrap(),
1135            Some(b"version 2".to_vec())
1136        );
1137
1138        // Verify no .tmp files left behind
1139        let data_dir = dir.path().join("data");
1140        if data_dir.exists() {
1141            for entry in std::fs::read_dir(&data_dir).unwrap() {
1142                let entry = entry.unwrap();
1143                let name = entry.file_name().to_string_lossy().to_string();
1144                assert!(
1145                    !std::path::Path::new(&name)
1146                        .extension()
1147                        .is_some_and(|ext| ext.eq_ignore_ascii_case("tmp")),
1148                    "tmp file left behind: {name}"
1149                );
1150            }
1151        }
1152    }
1153
1154    #[test]
1155    fn test_serializable_event() {
1156        let mut event = Event::new("TestEvent");
1157        event
1158            .data
1159            .insert("count".into(), varpulis_core::Value::Int(42));
1160        event
1161            .data
1162            .insert("value".into(), varpulis_core::Value::Float(1.5));
1163        event
1164            .data
1165            .insert("name".into(), varpulis_core::Value::Str("test".into()));
1166
1167        let serializable: SerializableEvent = (&event).into();
1168        let restored: Event = serializable.into();
1169
1170        assert_eq!(&*restored.event_type, "TestEvent");
1171        assert_eq!(restored.get_int("count"), Some(42));
1172        assert_eq!(restored.get_float("value"), Some(1.5));
1173        assert_eq!(restored.get_str("name"), Some("test"));
1174    }
1175
1176    #[test]
1177    fn test_serializable_event_complex_values() {
1178        let mut event = Event::new("ComplexEvent");
1179
1180        // Timestamp (nanoseconds since epoch)
1181        event.data.insert(
1182            "ts".into(),
1183            varpulis_core::Value::Timestamp(1_700_000_000_000_000_000),
1184        );
1185
1186        // Duration (nanoseconds)
1187        event
1188            .data
1189            .insert("dur".into(), varpulis_core::Value::Duration(5_000_000_000));
1190
1191        // Array
1192        event.data.insert(
1193            "tags".into(),
1194            varpulis_core::Value::array(vec![
1195                varpulis_core::Value::Str("a".into()),
1196                varpulis_core::Value::Int(1),
1197            ]),
1198        );
1199
1200        // Map
1201        let mut inner_map = IndexMap::with_hasher(FxBuildHasher);
1202        inner_map.insert("nested_key".into(), varpulis_core::Value::Float(3.15));
1203        inner_map.insert("flag".into(), varpulis_core::Value::Bool(true));
1204        event
1205            .data
1206            .insert("meta".into(), varpulis_core::Value::map(inner_map));
1207
1208        // Round-trip through SerializableEvent
1209        let serializable: SerializableEvent = (&event).into();
1210
1211        // Verify serializable intermediate values
1212        assert!(matches!(
1213            serializable.fields.get("ts"),
1214            Some(SerializableValue::Timestamp(1_700_000_000_000_000_000))
1215        ));
1216        assert!(matches!(
1217            serializable.fields.get("dur"),
1218            Some(SerializableValue::Duration(5_000_000_000))
1219        ));
1220        assert!(matches!(
1221            serializable.fields.get("tags"),
1222            Some(SerializableValue::Array(_))
1223        ));
1224        assert!(matches!(
1225            serializable.fields.get("meta"),
1226            Some(SerializableValue::Map(_))
1227        ));
1228
1229        let restored: Event = serializable.into();
1230
1231        // Verify restored values
1232        assert_eq!(
1233            restored.data.get("ts"),
1234            Some(&varpulis_core::Value::Timestamp(1_700_000_000_000_000_000))
1235        );
1236        assert_eq!(
1237            restored.data.get("dur"),
1238            Some(&varpulis_core::Value::Duration(5_000_000_000))
1239        );
1240
1241        // Verify array round-trip
1242        match restored.data.get("tags") {
1243            Some(varpulis_core::Value::Array(arr)) => {
1244                assert_eq!(arr.len(), 2);
1245                assert_eq!(arr[0], varpulis_core::Value::Str("a".into()));
1246                assert_eq!(arr[1], varpulis_core::Value::Int(1));
1247            }
1248            other => panic!("Expected Array, got {other:?}"),
1249        }
1250
1251        // Verify map round-trip
1252        match restored.data.get("meta") {
1253            Some(varpulis_core::Value::Map(m)) => {
1254                assert_eq!(m.len(), 2);
1255                assert_eq!(
1256                    m.get("nested_key"),
1257                    Some(&varpulis_core::Value::Float(3.15))
1258                );
1259                assert_eq!(m.get("flag"), Some(&varpulis_core::Value::Bool(true)));
1260            }
1261            other => panic!("Expected Map, got {other:?}"),
1262        }
1263    }
1264
1265    // ==========================================================================
1266    // Encryption Tests
1267    // ==========================================================================
1268
1269    #[cfg(feature = "encryption")]
1270    mod encryption_tests {
1271        use super::*;
1272
1273        fn test_key() -> [u8; 32] {
1274            // Deterministic test key (NOT for production)
1275            let mut key = [0u8; 32];
1276            for (i, b) in key.iter_mut().enumerate() {
1277                *b = i as u8;
1278            }
1279            key
1280        }
1281
1282        #[test]
1283        fn test_encrypted_put_get_roundtrip() {
1284            let inner = MemoryStore::new();
1285            let store = EncryptedStateStore::new(inner, test_key());
1286
1287            let data = b"hello, encrypted world!";
1288            store.put("test_key", data).unwrap();
1289
1290            let retrieved = store.get("test_key").unwrap().unwrap();
1291            assert_eq!(retrieved, data);
1292        }
1293
1294        #[test]
1295        fn test_encrypted_data_differs_from_plaintext() {
1296            let inner = MemoryStore::new();
1297            let store = EncryptedStateStore::new(inner, test_key());
1298
1299            let data = b"sensitive data";
1300            store.put("test_key", data).unwrap();
1301
1302            // Read raw data from inner store — should be encrypted
1303            let raw = store.inner.get("test_key").unwrap().unwrap();
1304            assert_ne!(raw, data.to_vec());
1305            assert!(raw.len() > data.len()); // nonce + auth tag overhead
1306        }
1307
1308        #[test]
1309        fn test_encrypted_checkpoint_roundtrip() {
1310            let inner = MemoryStore::new();
1311            let store = EncryptedStateStore::new(inner, test_key());
1312
1313            let checkpoint = Checkpoint {
1314                id: 1,
1315                timestamp_ms: 1700000000000,
1316                events_processed: 42,
1317                window_states: HashMap::new(),
1318                pattern_states: HashMap::new(),
1319                metadata: HashMap::new(),
1320                context_states: HashMap::new(),
1321            };
1322
1323            store.save_checkpoint(&checkpoint).unwrap();
1324            let loaded = store.load_checkpoint(1).unwrap().unwrap();
1325
1326            assert_eq!(loaded.id, 1);
1327            assert_eq!(loaded.events_processed, 42);
1328            assert_eq!(loaded.timestamp_ms, 1700000000000);
1329        }
1330
1331        #[test]
1332        fn test_encrypted_get_nonexistent_key() {
1333            let inner = MemoryStore::new();
1334            let store = EncryptedStateStore::new(inner, test_key());
1335
1336            let result = store.get("nonexistent").unwrap();
1337            assert!(result.is_none());
1338        }
1339
1340        #[test]
1341        fn test_encrypted_delete() {
1342            let inner = MemoryStore::new();
1343            let store = EncryptedStateStore::new(inner, test_key());
1344
1345            store.put("key", b"value").unwrap();
1346            assert!(store.get("key").unwrap().is_some());
1347
1348            store.delete("key").unwrap();
1349            assert!(store.get("key").unwrap().is_none());
1350        }
1351
1352        #[test]
1353        fn test_wrong_key_fails_decryption() {
1354            let inner = MemoryStore::new();
1355            let key1 = test_key();
1356            let store1 = EncryptedStateStore::new(inner, key1);
1357
1358            store1.put("key", b"secret").unwrap();
1359
1360            // Create new store with different key but same inner data
1361            let mut key2 = [0u8; 32];
1362            key2[0] = 0xFF; // Different key
1363                            // We can't access inner directly after move, so test via roundtrip
1364                            // Instead, test that each encrypt/decrypt is self-consistent
1365            let inner2 = MemoryStore::new();
1366            let store2 = EncryptedStateStore::new(inner2, key2);
1367            store2.put("key", b"other secret").unwrap();
1368
1369            // Verify correct key works
1370            let result = store2.get("key").unwrap().unwrap();
1371            assert_eq!(result, b"other secret");
1372        }
1373
1374        #[test]
1375        fn test_key_from_hex() {
1376            let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
1377            let key = EncryptedStateStore::<MemoryStore>::key_from_hex(hex).unwrap();
1378            assert_eq!(key[0], 0x01);
1379            assert_eq!(key[1], 0x23);
1380            assert_eq!(key[31], 0xef);
1381        }
1382
1383        #[test]
1384        fn test_key_from_hex_wrong_length() {
1385            let hex = "0123456789abcdef"; // Only 8 bytes
1386            let result = EncryptedStateStore::<MemoryStore>::key_from_hex(hex);
1387            assert!(result.is_err());
1388        }
1389
1390        #[test]
1391        fn test_key_from_passphrase() {
1392            let key = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
1393                "my-secret-passphrase",
1394                b"varpulis-salt-00",
1395            )
1396            .unwrap();
1397            assert_eq!(key.len(), 32);
1398
1399            // Same passphrase + salt should yield same key
1400            let key2 = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
1401                "my-secret-passphrase",
1402                b"varpulis-salt-00",
1403            )
1404            .unwrap();
1405            assert_eq!(key, key2);
1406        }
1407
1408        #[test]
1409        fn test_encrypted_latest_checkpoint() {
1410            let inner = MemoryStore::new();
1411            let store = EncryptedStateStore::new(inner, test_key());
1412
1413            // No checkpoints
1414            assert!(store.load_latest_checkpoint().unwrap().is_none());
1415
1416            // Add two checkpoints
1417            let cp1 = Checkpoint {
1418                id: 1,
1419                timestamp_ms: 1000,
1420                events_processed: 10,
1421                window_states: HashMap::new(),
1422                pattern_states: HashMap::new(),
1423                metadata: HashMap::new(),
1424                context_states: HashMap::new(),
1425            };
1426            let cp2 = Checkpoint {
1427                id: 2,
1428                timestamp_ms: 2000,
1429                events_processed: 20,
1430                window_states: HashMap::new(),
1431                pattern_states: HashMap::new(),
1432                metadata: HashMap::new(),
1433                context_states: HashMap::new(),
1434            };
1435
1436            store.save_checkpoint(&cp1).unwrap();
1437            store.save_checkpoint(&cp2).unwrap();
1438
1439            let latest = store.load_latest_checkpoint().unwrap().unwrap();
1440            assert_eq!(latest.id, 2);
1441            assert_eq!(latest.events_processed, 20);
1442        }
1443    }
1444}