Skip to main content

varpulis_runtime/
persistence.rs

1//! State Persistence for Varpulis Engine
2//!
3//! Provides persistent storage for engine state including:
4//! - Window contents (events in active windows)
5//! - Aggregation state
6//! - Pattern matcher state
7//! - Checkpointing and recovery
8//!
9//! # Example
10//! ```text
11//! use varpulis_runtime::persistence::{StateStore, RocksDbStore, CheckpointConfig};
12//!
13//! let store = RocksDbStore::open("/tmp/varpulis-state")?;
14//! let config = CheckpointConfig::default();
15//! engine.enable_persistence(store, config);
16//! ```
17
18use std::collections::HashMap;
19#[cfg(feature = "persistence")]
20use std::path::Path;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use indexmap::IndexMap;
25use rustc_hash::FxBuildHasher;
26use serde::{Deserialize, Serialize};
27#[cfg(feature = "persistence")]
28use tracing::debug;
29use tracing::info;
30
31use crate::event::Event;
32
33/// Configuration for checkpointing
34#[derive(Debug, Clone)]
35pub struct CheckpointConfig {
36    /// Interval between checkpoints
37    pub interval: Duration,
38    /// Maximum number of checkpoints to retain
39    pub max_checkpoints: usize,
40    /// Whether to checkpoint on shutdown
41    pub checkpoint_on_shutdown: bool,
42    /// Prefix for checkpoint keys
43    pub key_prefix: String,
44}
45
46impl Default for CheckpointConfig {
47    fn default() -> Self {
48        Self {
49            interval: Duration::from_secs(60),
50            max_checkpoints: 3,
51            checkpoint_on_shutdown: true,
52            key_prefix: "varpulis".to_string(),
53        }
54    }
55}
56
57/// Serializable representation of an event for persistence
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct SerializableEvent {
60    pub event_type: String,
61    pub timestamp_ms: i64,
62    pub fields: HashMap<String, SerializableValue>,
63}
64
65/// Serializable value type
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67pub enum SerializableValue {
68    Int(i64),
69    Float(f64),
70    Bool(bool),
71    String(String),
72    Null,
73    Timestamp(i64),
74    Duration(u64),
75    Array(Vec<Self>),
76    Map(Vec<(String, Self)>),
77}
78
79/// Convert a `varpulis_core::Value` to a `SerializableValue`
80fn value_to_serializable(v: &varpulis_core::Value) -> SerializableValue {
81    match v {
82        varpulis_core::Value::Int(i) => SerializableValue::Int(*i),
83        varpulis_core::Value::Float(f) => SerializableValue::Float(*f),
84        varpulis_core::Value::Bool(b) => SerializableValue::Bool(*b),
85        varpulis_core::Value::Str(s) => SerializableValue::String(s.to_string()),
86        varpulis_core::Value::Null => SerializableValue::Null,
87        varpulis_core::Value::Timestamp(ts) => SerializableValue::Timestamp(*ts),
88        varpulis_core::Value::Duration(d) => SerializableValue::Duration(*d),
89        varpulis_core::Value::Array(arr) => {
90            SerializableValue::Array(arr.iter().map(value_to_serializable).collect())
91        }
92        varpulis_core::Value::Map(map) => SerializableValue::Map(
93            map.iter()
94                .map(|(k, v)| (k.to_string(), value_to_serializable(v)))
95                .collect(),
96        ),
97    }
98}
99
100/// Convert a `SerializableValue` back to a `varpulis_core::Value`
101fn serializable_to_value(sv: SerializableValue) -> varpulis_core::Value {
102    match sv {
103        SerializableValue::Int(i) => varpulis_core::Value::Int(i),
104        SerializableValue::Float(f) => varpulis_core::Value::Float(f),
105        SerializableValue::Bool(b) => varpulis_core::Value::Bool(b),
106        SerializableValue::String(s) => varpulis_core::Value::Str(s.into()),
107        SerializableValue::Null => varpulis_core::Value::Null,
108        SerializableValue::Timestamp(ts) => varpulis_core::Value::Timestamp(ts),
109        SerializableValue::Duration(d) => varpulis_core::Value::Duration(d),
110        SerializableValue::Array(arr) => {
111            varpulis_core::Value::array(arr.into_iter().map(serializable_to_value).collect())
112        }
113        SerializableValue::Map(entries) => {
114            let mut map: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
115                IndexMap::with_hasher(FxBuildHasher);
116            for (k, v) in entries {
117                map.insert(k.into(), serializable_to_value(v));
118            }
119            varpulis_core::Value::map(map)
120        }
121    }
122}
123
124impl From<&Event> for SerializableEvent {
125    fn from(event: &Event) -> Self {
126        let mut fields = HashMap::new();
127        for (k, v) in &event.data {
128            fields.insert(k.to_string(), value_to_serializable(v));
129        }
130        Self {
131            event_type: event.event_type.to_string(),
132            timestamp_ms: event.timestamp.timestamp_millis(),
133            fields,
134        }
135    }
136}
137
138impl From<SerializableEvent> for Event {
139    fn from(se: SerializableEvent) -> Self {
140        let mut event = Self::new(se.event_type);
141        event.timestamp = chrono::DateTime::from_timestamp_millis(se.timestamp_ms)
142            .unwrap_or_else(chrono::Utc::now);
143        for (k, v) in se.fields {
144            event.data.insert(k.into(), serializable_to_value(v));
145        }
146        event
147    }
148}
149
150/// Checkpoint containing all engine state
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct Checkpoint {
153    /// Checkpoint ID (monotonically increasing)
154    pub id: u64,
155    /// Timestamp when checkpoint was created
156    pub timestamp_ms: i64,
157    /// Number of events processed at checkpoint time
158    pub events_processed: u64,
159    /// Window states by stream name
160    pub window_states: HashMap<String, WindowCheckpoint>,
161    /// Pattern matcher states
162    pub pattern_states: HashMap<String, PatternCheckpoint>,
163    /// Custom metadata
164    pub metadata: HashMap<String, String>,
165    /// Engine states per context (for coordinated checkpointing)
166    #[serde(default)]
167    pub context_states: HashMap<String, EngineCheckpoint>,
168}
169
170/// Checkpoint for window state
171#[derive(Debug, Clone, Serialize, Deserialize)]
172pub struct WindowCheckpoint {
173    /// Events currently in the window
174    pub events: Vec<SerializableEvent>,
175    /// Window start timestamp (if applicable)
176    pub window_start_ms: Option<i64>,
177    /// Last emit timestamp (for sliding windows)
178    pub last_emit_ms: Option<i64>,
179    /// Partitioned window states
180    pub partitions: HashMap<String, PartitionedWindowCheckpoint>,
181}
182
183/// Checkpoint for partitioned window
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct PartitionedWindowCheckpoint {
186    pub events: Vec<SerializableEvent>,
187    pub window_start_ms: Option<i64>,
188}
189
190/// Checkpoint for pattern matcher state
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct PatternCheckpoint {
193    /// Active partial matches
194    pub partial_matches: Vec<PartialMatchCheckpoint>,
195}
196
197/// A partial match in progress
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct PartialMatchCheckpoint {
200    /// Current state in the pattern automaton
201    pub state: String,
202    /// Events matched so far
203    pub matched_events: Vec<SerializableEvent>,
204    /// Start timestamp
205    pub start_ms: i64,
206}
207
208/// Error type for state store operations
209#[derive(Debug, thiserror::Error)]
210pub enum StoreError {
211    /// I/O or storage error
212    #[error("I/O error: {0}")]
213    IoError(String),
214    /// Serialization error
215    #[error("Serialization error: {0}")]
216    SerializationError(String),
217    /// Key not found
218    #[error("Key not found: {0}")]
219    NotFound(String),
220    /// Store not initialized
221    #[error("Store not initialized")]
222    NotInitialized,
223    /// Checkpoint version is newer than this binary supports
224    #[error(
225        "Checkpoint version {checkpoint_version} is newer than supported version {current_version}"
226    )]
227    IncompatibleVersion {
228        checkpoint_version: u32,
229        current_version: u32,
230    },
231}
232
233/// Trait for state storage backends
234pub trait StateStore: Send + Sync {
235    /// Store a checkpoint
236    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError>;
237
238    /// Load the latest checkpoint
239    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError>;
240
241    /// Load a specific checkpoint by ID
242    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError>;
243
244    /// List all checkpoint IDs
245    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError>;
246
247    /// Delete old checkpoints, keeping only the most recent N
248    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError>;
249
250    /// Store arbitrary key-value data
251    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError>;
252
253    /// Retrieve arbitrary key-value data
254    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError>;
255
256    /// Delete a key
257    fn delete(&self, key: &str) -> Result<(), StoreError>;
258
259    /// Flush all pending writes to disk
260    fn flush(&self) -> Result<(), StoreError>;
261}
262
263/// In-memory state store for testing
264#[derive(Debug, Default)]
265pub struct MemoryStore {
266    data: std::sync::RwLock<HashMap<String, Vec<u8>>>,
267}
268
269impl MemoryStore {
270    pub fn new() -> Self {
271        Self::default()
272    }
273}
274
275impl StateStore for MemoryStore {
276    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
277        let key = format!("checkpoint:{}", checkpoint.id);
278        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
279        self.put(&key, &data)
280    }
281
282    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
283        let checkpoints = self.list_checkpoints()?;
284        if let Some(id) = checkpoints.last() {
285            self.load_checkpoint(*id)
286        } else {
287            Ok(None)
288        }
289    }
290
291    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
292        let key = format!("checkpoint:{id}");
293        if let Some(data) = self.get(&key)? {
294            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
295            Ok(Some(checkpoint))
296        } else {
297            Ok(None)
298        }
299    }
300
301    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
302        let data = self
303            .data
304            .read()
305            .map_err(|e| StoreError::IoError(e.to_string()))?;
306        let mut ids: Vec<u64> = data
307            .keys()
308            .filter_map(|k| k.strip_prefix("checkpoint:").and_then(|s| s.parse().ok()))
309            .collect();
310        ids.sort_unstable();
311        Ok(ids)
312    }
313
314    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
315        let checkpoints = self.list_checkpoints()?;
316        let to_delete = checkpoints.len().saturating_sub(keep);
317        for id in checkpoints.iter().take(to_delete) {
318            let key = format!("checkpoint:{id}");
319            self.delete(&key)?;
320        }
321        Ok(to_delete)
322    }
323
324    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
325        let mut data = self
326            .data
327            .write()
328            .map_err(|e| StoreError::IoError(e.to_string()))?;
329        data.insert(key.to_string(), value.to_vec());
330        Ok(())
331    }
332
333    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
334        let data = self
335            .data
336            .read()
337            .map_err(|e| StoreError::IoError(e.to_string()))?;
338        Ok(data.get(key).cloned())
339    }
340
341    fn delete(&self, key: &str) -> Result<(), StoreError> {
342        let mut data = self
343            .data
344            .write()
345            .map_err(|e| StoreError::IoError(e.to_string()))?;
346        data.remove(key);
347        Ok(())
348    }
349
350    fn flush(&self) -> Result<(), StoreError> {
351        Ok(()) // No-op for memory store
352    }
353}
354
355/// RocksDB-based state store
356#[cfg(feature = "persistence")]
357pub struct RocksDbStore {
358    db: rocksdb::DB,
359    prefix: String,
360}
361
362#[cfg(feature = "persistence")]
363impl RocksDbStore {
364    /// Open or create a RocksDB store at the given path
365    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
366        Self::open_with_prefix(path, "varpulis")
367    }
368
369    /// Open with a custom key prefix
370    pub fn open_with_prefix<P: AsRef<Path>>(path: P, prefix: &str) -> Result<Self, StoreError> {
371        let mut opts = rocksdb::Options::default();
372        opts.create_if_missing(true);
373        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
374
375        // Optimize for write-heavy workload
376        opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
377        opts.set_max_write_buffer_number(3);
378        opts.set_target_file_size_base(64 * 1024 * 1024);
379
380        let db = rocksdb::DB::open(&opts, path).map_err(|e| StoreError::IoError(e.to_string()))?;
381
382        info!("Opened RocksDB state store");
383        Ok(Self {
384            db,
385            prefix: prefix.to_string(),
386        })
387    }
388
389    fn prefixed_key(&self, key: &str) -> String {
390        format!("{}:{}", self.prefix, key)
391    }
392}
393
394#[cfg(feature = "persistence")]
395impl StateStore for RocksDbStore {
396    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
397        let key = self.prefixed_key(&format!("checkpoint:{}", checkpoint.id));
398        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
399
400        self.db
401            .put(key.as_bytes(), &data)
402            .map_err(|e| StoreError::IoError(e.to_string()))?;
403
404        // Also update the "latest" pointer
405        let latest_key = self.prefixed_key("checkpoint:latest");
406        self.db
407            .put(latest_key.as_bytes(), checkpoint.id.to_le_bytes())
408            .map_err(|e| StoreError::IoError(e.to_string()))?;
409
410        debug!("Saved checkpoint {} ({} bytes)", checkpoint.id, data.len());
411        Ok(())
412    }
413
414    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
415        let latest_key = self.prefixed_key("checkpoint:latest");
416        if let Some(id_bytes) = self
417            .db
418            .get(latest_key.as_bytes())
419            .map_err(|e| StoreError::IoError(e.to_string()))?
420        {
421            let Ok(bytes) = <[u8; 8]>::try_from(id_bytes.as_ref()) else {
422                return Ok(None);
423            };
424            let id = u64::from_le_bytes(bytes);
425            return self.load_checkpoint(id);
426        }
427        Ok(None)
428    }
429
430    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
431        let key = self.prefixed_key(&format!("checkpoint:{}", id));
432        if let Some(data) = self
433            .db
434            .get(key.as_bytes())
435            .map_err(|e| StoreError::IoError(e.to_string()))?
436        {
437            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
438            debug!("Loaded checkpoint {}", id);
439            Ok(Some(checkpoint))
440        } else {
441            Ok(None)
442        }
443    }
444
445    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
446        let prefix = self.prefixed_key("checkpoint:");
447        let mut ids = Vec::new();
448
449        let iter = self.db.prefix_iterator(prefix.as_bytes());
450        for item in iter {
451            let (key, _) = item.map_err(|e| StoreError::IoError(e.to_string()))?;
452            let key_str = String::from_utf8_lossy(&key);
453            if let Some(suffix) = key_str.strip_prefix(&prefix) {
454                if suffix != "latest" {
455                    if let Ok(id) = suffix.parse::<u64>() {
456                        ids.push(id);
457                    }
458                }
459            }
460        }
461
462        ids.sort();
463        Ok(ids)
464    }
465
466    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
467        let checkpoints = self.list_checkpoints()?;
468        let to_delete = checkpoints.len().saturating_sub(keep);
469
470        for id in checkpoints.iter().take(to_delete) {
471            let key = self.prefixed_key(&format!("checkpoint:{}", id));
472            self.db
473                .delete(key.as_bytes())
474                .map_err(|e| StoreError::IoError(e.to_string()))?;
475        }
476
477        if to_delete > 0 {
478            info!("Pruned {} old checkpoints", to_delete);
479        }
480        Ok(to_delete)
481    }
482
483    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
484        let full_key = self.prefixed_key(key);
485        self.db
486            .put(full_key.as_bytes(), value)
487            .map_err(|e| StoreError::IoError(e.to_string()))
488    }
489
490    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
491        let full_key = self.prefixed_key(key);
492        self.db
493            .get(full_key.as_bytes())
494            .map_err(|e| StoreError::IoError(e.to_string()))
495    }
496
497    fn delete(&self, key: &str) -> Result<(), StoreError> {
498        let full_key = self.prefixed_key(key);
499        self.db
500            .delete(full_key.as_bytes())
501            .map_err(|e| StoreError::IoError(e.to_string()))
502    }
503
504    fn flush(&self) -> Result<(), StoreError> {
505        self.db
506            .flush()
507            .map_err(|e| StoreError::IoError(e.to_string()))
508    }
509}
510
511/// File-system based state store
512///
513/// Stores key-value pairs as files in a directory. Keys containing ":"
514/// are mapped to subdirectories (e.g., "tenant:abc" → "tenant/abc").
515/// Writes are atomic via temp file + rename.
516#[derive(Debug)]
517pub struct FileStore {
518    dir: std::path::PathBuf,
519}
520
521impl FileStore {
522    /// Open or create a file-based store at the given directory
523    pub fn open(dir: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
524        let dir = dir.as_ref().to_path_buf();
525        std::fs::create_dir_all(&dir).map_err(|e| StoreError::IoError(e.to_string()))?;
526        Ok(Self { dir })
527    }
528
529    fn key_to_path(&self, key: &str) -> std::path::PathBuf {
530        // Map ":" separators to directory separators
531        let path_str = key.replace(':', std::path::MAIN_SEPARATOR_STR);
532        self.dir.join(path_str)
533    }
534}
535
536impl StateStore for FileStore {
537    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
538        let key = format!("checkpoint:{}", checkpoint.id);
539        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
540        self.put(&key, &data)
541    }
542
543    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
544        let checkpoints = self.list_checkpoints()?;
545        if let Some(id) = checkpoints.last() {
546            self.load_checkpoint(*id)
547        } else {
548            Ok(None)
549        }
550    }
551
552    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
553        let key = format!("checkpoint:{id}");
554        if let Some(data) = self.get(&key)? {
555            let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
556            Ok(Some(checkpoint))
557        } else {
558            Ok(None)
559        }
560    }
561
562    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
563        let checkpoint_dir = self.dir.join("checkpoint");
564        if !checkpoint_dir.exists() {
565            return Ok(Vec::new());
566        }
567
568        let mut ids: Vec<u64> = Vec::new();
569        let entries =
570            std::fs::read_dir(&checkpoint_dir).map_err(|e| StoreError::IoError(e.to_string()))?;
571        for entry in entries {
572            let entry = entry.map_err(|e| StoreError::IoError(e.to_string()))?;
573            if let Some(name) = entry.file_name().to_str() {
574                if name != "latest" {
575                    if let Ok(id) = name.parse::<u64>() {
576                        ids.push(id);
577                    }
578                }
579            }
580        }
581        ids.sort_unstable();
582        Ok(ids)
583    }
584
585    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
586        let checkpoints = self.list_checkpoints()?;
587        let to_delete = checkpoints.len().saturating_sub(keep);
588        for id in checkpoints.iter().take(to_delete) {
589            let key = format!("checkpoint:{id}");
590            self.delete(&key)?;
591        }
592        Ok(to_delete)
593    }
594
595    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
596        let path = self.key_to_path(key);
597        if let Some(parent) = path.parent() {
598            std::fs::create_dir_all(parent).map_err(|e| StoreError::IoError(e.to_string()))?;
599        }
600
601        // Atomic write: write to temp file, then rename
602        let tmp_path = path.with_extension("tmp");
603        std::fs::write(&tmp_path, value).map_err(|e| StoreError::IoError(e.to_string()))?;
604        std::fs::rename(&tmp_path, &path).map_err(|e| StoreError::IoError(e.to_string()))?;
605        Ok(())
606    }
607
608    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
609        let path = self.key_to_path(key);
610        match std::fs::read(&path) {
611            Ok(data) => Ok(Some(data)),
612            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
613            Err(e) => Err(StoreError::IoError(e.to_string())),
614        }
615    }
616
617    fn delete(&self, key: &str) -> Result<(), StoreError> {
618        let path = self.key_to_path(key);
619        match std::fs::remove_file(&path) {
620            Ok(()) => Ok(()),
621            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
622            Err(e) => Err(StoreError::IoError(e.to_string())),
623        }
624    }
625
626    fn flush(&self) -> Result<(), StoreError> {
627        Ok(()) // File writes are already flushed on close
628    }
629}
630
631/// Checkpoint manager that handles periodic checkpointing
632pub struct CheckpointManager {
633    store: Arc<dyn StateStore>,
634    config: CheckpointConfig,
635    last_checkpoint: Instant,
636    next_checkpoint_id: u64,
637}
638
639impl std::fmt::Debug for CheckpointManager {
640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641        f.debug_struct("CheckpointManager")
642            .field("config", &self.config)
643            .field("last_checkpoint", &self.last_checkpoint)
644            .field("next_checkpoint_id", &self.next_checkpoint_id)
645            .finish_non_exhaustive()
646    }
647}
648
649impl CheckpointManager {
650    /// Create a new checkpoint manager
651    pub fn new(store: Arc<dyn StateStore>, config: CheckpointConfig) -> Result<Self, StoreError> {
652        // Load the latest checkpoint ID
653        let next_id = store.load_latest_checkpoint()?.map_or(1, |c| c.id + 1);
654
655        Ok(Self {
656            store,
657            config,
658            last_checkpoint: Instant::now(),
659            next_checkpoint_id: next_id,
660        })
661    }
662
663    /// Check if it's time to create a checkpoint
664    pub fn should_checkpoint(&self) -> bool {
665        self.last_checkpoint.elapsed() >= self.config.interval
666    }
667
668    /// Create a checkpoint with the given state
669    pub fn checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), StoreError> {
670        let mut checkpoint = checkpoint;
671        checkpoint.id = self.next_checkpoint_id;
672        checkpoint.timestamp_ms = chrono::Utc::now().timestamp_millis();
673
674        self.store.save_checkpoint(&checkpoint)?;
675        self.store.prune_checkpoints(self.config.max_checkpoints)?;
676        self.store.flush()?;
677
678        self.last_checkpoint = Instant::now();
679        self.next_checkpoint_id += 1;
680
681        info!(
682            "Created checkpoint {} ({} events processed)",
683            checkpoint.id, checkpoint.events_processed
684        );
685        Ok(())
686    }
687
688    /// Load the latest checkpoint for recovery
689    pub fn recover(&self) -> Result<Option<Checkpoint>, StoreError> {
690        self.store.load_latest_checkpoint()
691    }
692
693    /// Get the underlying store
694    pub fn store(&self) -> &Arc<dyn StateStore> {
695        &self.store
696    }
697}
698
699/// Current checkpoint schema version.
700pub const CHECKPOINT_VERSION: u32 = 1;
701
702/// Default version for deserialized checkpoints that lack a version field (pre-versioning).
703const fn default_checkpoint_version() -> u32 {
704    1
705}
706
707/// Checkpoint for a single engine instance (one context).
708#[derive(Debug, Clone, Serialize, Deserialize)]
709pub struct EngineCheckpoint {
710    /// Schema version for forward/backward compatibility.
711    #[serde(default = "default_checkpoint_version")]
712    pub version: u32,
713    /// Window states by stream name
714    pub window_states: HashMap<String, WindowCheckpoint>,
715    /// SASE+ pattern engine states by stream name
716    pub sase_states: HashMap<String, SaseCheckpoint>,
717    /// Join buffer states by stream name
718    pub join_states: HashMap<String, JoinCheckpoint>,
719    /// Engine variables
720    pub variables: HashMap<String, SerializableValue>,
721    /// Events processed counter
722    pub events_processed: u64,
723    /// Output events emitted counter
724    pub output_events_emitted: u64,
725    /// Watermark tracker state
726    #[serde(default)]
727    pub watermark_state: Option<WatermarkCheckpoint>,
728    /// Distinct operator states by stream name (LRU keys snapshot)
729    #[serde(default)]
730    pub distinct_states: HashMap<String, DistinctCheckpoint>,
731    /// Limit operator states by stream name (counter snapshot)
732    #[serde(default)]
733    pub limit_states: HashMap<String, LimitCheckpoint>,
734}
735
736impl EngineCheckpoint {
737    /// Validate and migrate a checkpoint to the current schema version.
738    ///
739    /// Returns `Err` if the checkpoint is from a future version that this
740    /// binary does not understand (forward-incompatible).
741    pub const fn validate_and_migrate(&mut self) -> Result<(), StoreError> {
742        if self.version > CHECKPOINT_VERSION {
743            return Err(StoreError::IncompatibleVersion {
744                checkpoint_version: self.version,
745                current_version: CHECKPOINT_VERSION,
746            });
747        }
748
749        // Apply sequential migrations: v1 → v2 → … → CHECKPOINT_VERSION
750        // Currently at v1 — no migrations needed yet.
751        // Future example:
752        // if self.version < 2 {
753        //     migrate_v1_to_v2(self);
754        //     self.version = 2;
755        // }
756
757        Ok(())
758    }
759}
760
761/// Checkpoint for SASE+ pattern matching engine state.
762#[derive(Debug, Clone, Serialize, Deserialize)]
763pub struct SaseCheckpoint {
764    /// Non-partitioned active runs
765    pub active_runs: Vec<RunCheckpoint>,
766    /// Partitioned active runs
767    pub partitioned_runs: HashMap<String, Vec<RunCheckpoint>>,
768    /// Current watermark in milliseconds
769    pub watermark_ms: Option<i64>,
770    /// Maximum observed timestamp in milliseconds
771    pub max_timestamp_ms: Option<i64>,
772    /// Cumulative runs created
773    pub total_runs_created: u64,
774    /// Cumulative runs completed
775    pub total_runs_completed: u64,
776    /// Cumulative runs dropped
777    pub total_runs_dropped: u64,
778    /// Cumulative runs evicted
779    pub total_runs_evicted: u64,
780}
781
782/// Checkpoint for a single SASE+ run (partial match).
783#[derive(Debug, Clone, Serialize, Deserialize)]
784pub struct RunCheckpoint {
785    /// Current NFA state index
786    pub current_state: usize,
787    /// Stack of matched events
788    pub stack: Vec<StackEntryCheckpoint>,
789    /// Captured events by alias
790    pub captured: HashMap<String, SerializableEvent>,
791    /// Event-time when this run started (ms since epoch)
792    pub event_time_started_at_ms: Option<i64>,
793    /// Event-time deadline (ms since epoch)
794    pub event_time_deadline_ms: Option<i64>,
795    /// Partition key value
796    pub partition_key: Option<SerializableValue>,
797    /// Whether the run is invalidated
798    pub invalidated: bool,
799    /// Number of pending negation constraints
800    pub pending_negation_count: usize,
801    /// Kleene capture events (ZDD rebuilt on restore)
802    pub kleene_events: Option<Vec<SerializableEvent>>,
803}
804
805/// Checkpoint for a stack entry in a SASE+ run.
806#[derive(Debug, Clone, Serialize, Deserialize)]
807pub struct StackEntryCheckpoint {
808    pub event: SerializableEvent,
809    pub alias: Option<String>,
810}
811
812/// Checkpoint for a join buffer.
813#[derive(Debug, Clone, Serialize, Deserialize)]
814pub struct JoinCheckpoint {
815    /// Buffered events: source_name -> (key_value -> Vec<(timestamp_ms, event)>)
816    pub buffers: HashMap<String, HashMap<String, Vec<(i64, SerializableEvent)>>>,
817    /// Source stream names
818    pub sources: Vec<String>,
819    /// Join key field per source
820    pub join_keys: HashMap<String, String>,
821    /// Window duration in milliseconds
822    pub window_duration_ms: i64,
823}
824
825/// Checkpoint for per-source watermark tracking.
826#[derive(Debug, Clone, Serialize, Deserialize)]
827pub struct WatermarkCheckpoint {
828    /// Per-source watermark state
829    pub sources: HashMap<String, SourceWatermarkCheckpoint>,
830    /// Effective (minimum) watermark in milliseconds
831    pub effective_watermark_ms: Option<i64>,
832}
833
834/// Checkpoint for a single source's watermark state.
835#[derive(Debug, Clone, Serialize, Deserialize)]
836pub struct SourceWatermarkCheckpoint {
837    /// Current watermark in milliseconds
838    pub watermark_ms: Option<i64>,
839    /// Maximum observed timestamp in milliseconds
840    pub max_timestamp_ms: Option<i64>,
841    /// Maximum out-of-orderness tolerance in milliseconds
842    pub max_out_of_orderness_ms: i64,
843}
844
845/// Checkpoint for a `.distinct()` operator (LRU key snapshot).
846#[derive(Debug, Clone, Serialize, Deserialize)]
847pub struct DistinctCheckpoint {
848    /// Most-recently-used keys (ordered from most-recent to least-recent)
849    pub keys: Vec<String>,
850}
851
852/// Checkpoint for a `.limit(n)` operator (counter snapshot).
853#[derive(Debug, Clone, Serialize, Deserialize)]
854pub struct LimitCheckpoint {
855    /// Maximum number of events
856    pub max: usize,
857    /// Number of events already passed
858    pub count: usize,
859}
860
861/// Convert Value to SerializableValue (pub(crate) re-export)
862pub(crate) fn value_to_ser(v: &varpulis_core::Value) -> SerializableValue {
863    value_to_serializable(v)
864}
865
866/// Convert SerializableValue to Value (pub(crate) re-export)
867pub(crate) fn ser_to_value(sv: SerializableValue) -> varpulis_core::Value {
868    serializable_to_value(sv)
869}
870
871// =============================================================================
872// Encrypted State Store (AES-256-GCM)
873// =============================================================================
874
/// Encrypted wrapper around any `StateStore` implementation.
///
/// Uses AES-256-GCM (authenticated encryption) to encrypt all values at rest.
/// Each value gets a fresh random 96-bit nonce prepended to the ciphertext.
/// Only values are encrypted: keys (e.g. `checkpoint:{id}`) are handed to the
/// inner store unchanged and therefore remain readable in plaintext.
///
/// The `Debug` representation never includes the encryption key.
///
/// # Usage
///
/// ```rust,no_run
/// use varpulis_runtime::persistence::{EncryptedStateStore, MemoryStore};
///
/// // The key helpers do not use the inner store, but its type parameter
/// // cannot be inferred here, so it must be named explicitly.
/// let key = EncryptedStateStore::<MemoryStore>::key_from_hex(
///     &std::env::var("VARPULIS_ENCRYPTION_KEY").unwrap()
/// ).unwrap();
/// let inner = MemoryStore::new();
/// let store = EncryptedStateStore::new(inner, key);
/// ```
#[cfg(feature = "encryption")]
pub struct EncryptedStateStore<S: StateStore> {
    /// Backing store that receives the encrypted bytes.
    inner: S,
    /// Raw 256-bit AES key; deliberately omitted from `Debug` output.
    key: [u8; 32],
}

#[cfg(feature = "encryption")]
impl<S: StateStore + std::fmt::Debug> std::fmt::Debug for EncryptedStateStore<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A derived `Debug` would print the raw key bytes into logs and panic
        // messages, so the key is redacted here.
        f.debug_struct("EncryptedStateStore")
            .field("inner", &self.inner)
            .field("key", &"<redacted>")
            .finish()
    }
}
897
/// Size in bytes of the random AES-GCM nonce stored in front of each ciphertext.
#[cfg(feature = "encryption")]
const ENCRYPTION_NONCE_LEN: usize = 12;

#[cfg(feature = "encryption")]
impl<S: StateStore> EncryptedStateStore<S> {
    /// Wrap `inner` so every value written through this store is encrypted with `key`.
    pub fn new(inner: S, key: [u8; 32]) -> Self {
        Self { inner, key }
    }

    /// Decode a 256-bit key from a string of 64 hex digits.
    ///
    /// Surrounding whitespace is trimmed before decoding.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::IoError` if the input is not valid hex or does not
    /// decode to exactly 32 bytes.
    pub fn key_from_hex(hex_str: &str) -> Result<[u8; 32], StoreError> {
        let decoded = hex::decode(hex_str.trim())
            .map_err(|e| StoreError::IoError(format!("Invalid hex key: {}", e)))?;
        <[u8; 32]>::try_from(decoded.as_slice()).map_err(|_| {
            StoreError::IoError(format!(
                "Encryption key must be 32 bytes (64 hex chars), got {} bytes",
                decoded.len()
            ))
        })
    }

    /// Derive a 256-bit key from `passphrase` and `salt` using Argon2id
    /// with the crate's default parameters.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::IoError` if Argon2 rejects the inputs.
    pub fn key_from_passphrase(passphrase: &str, salt: &[u8]) -> Result<[u8; 32], StoreError> {
        let kdf = argon2::Argon2::default();
        let mut derived = [0u8; 32];
        kdf.hash_password_into(passphrase.as_bytes(), salt, &mut derived)
            .map_err(|e| StoreError::IoError(format!("Argon2 key derivation failed: {}", e)))?;
        Ok(derived)
    }

    /// Encrypt `plaintext` under a freshly generated random nonce.
    ///
    /// Output layout: `nonce (12 bytes) || ciphertext`.
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::{Aead, OsRng};
        use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};

        let cipher = Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| StoreError::IoError(format!("AES key error: {}", e)))?;
        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
        let sealed = cipher
            .encrypt(&nonce, plaintext)
            .map_err(|e| StoreError::IoError(format!("Encryption failed: {}", e)))?;

        // Frame the output so `decrypt` can split the nonce back off the front.
        let mut framed = Vec::with_capacity(ENCRYPTION_NONCE_LEN + sealed.len());
        framed.extend_from_slice(&nonce);
        framed.extend(sealed);
        Ok(framed)
    }

    /// Inverse of `encrypt`: split off the leading nonce, then authenticate
    /// and decrypt the remainder.
    fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, StoreError> {
        use aes_gcm::aead::Aead;
        use aes_gcm::{Aes256Gcm, KeyInit, Nonce};

        if data.len() < ENCRYPTION_NONCE_LEN {
            return Err(StoreError::IoError(
                "Encrypted data too short (missing nonce)".to_string(),
            ));
        }

        let (head, body) = data.split_at(ENCRYPTION_NONCE_LEN);
        let nonce_arr: [u8; ENCRYPTION_NONCE_LEN] = head
            .try_into()
            .map_err(|_| StoreError::IoError("Invalid nonce length".to_string()))?;
        let nonce = Nonce::from(nonce_arr);

        Aes256Gcm::new_from_slice(&self.key)
            .map_err(|e| StoreError::IoError(format!("AES key error: {}", e)))?
            .decrypt(&nonce, body)
            .map_err(|e| StoreError::IoError(format!("Decryption failed: {}", e)))
    }
}
976
#[cfg(feature = "encryption")]
impl<S: StateStore> StateStore for EncryptedStateStore<S> {
    /// Serialize `checkpoint`, encrypt it, and store it under `checkpoint:{id}`.
    ///
    /// The encrypted bytes are written through the inner store's `put`, not its
    /// `save_checkpoint` (which would persist plaintext). No separate checkpoint
    /// index is written: listing and pruning are delegated to the inner store,
    /// so this relies on the inner store discovering checkpoints through the
    /// `checkpoint:{id}` key layout. NOTE(review): confirm this holds for every
    /// inner store that maintains its own checkpoint index.
    fn save_checkpoint(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        let data = crate::codec::serialize(checkpoint, crate::codec::CheckpointFormat::active())?;
        let encrypted = self.encrypt(&data)?;
        let key = format!("checkpoint:{}", checkpoint.id);
        self.inner.put(&key, &encrypted)
    }

    /// Load the checkpoint with the highest id reported by the inner store.
    ///
    /// Taking the maximum id (rather than the last listed one) keeps this
    /// correct even if the inner store does not return ids in ascending order.
    fn load_latest_checkpoint(&self) -> Result<Option<Checkpoint>, StoreError> {
        match self.list_checkpoints()?.into_iter().max() {
            Some(id) => self.load_checkpoint(id),
            None => Ok(None),
        }
    }

    /// Fetch and decrypt the checkpoint stored under `checkpoint:{id}`.
    ///
    /// Returns `Ok(None)` when the key is absent. Fails if the stored bytes
    /// cannot be decrypted with this store's key or cannot be decoded.
    fn load_checkpoint(&self, id: u64) -> Result<Option<Checkpoint>, StoreError> {
        let key = format!("checkpoint:{}", id);
        match self.inner.get(&key)? {
            Some(encrypted) => {
                let data = self.decrypt(&encrypted)?;
                let checkpoint: Checkpoint = crate::codec::deserialize(&data)?;
                Ok(Some(checkpoint))
            }
            None => Ok(None),
        }
    }

    /// Checkpoint ids as reported by the inner store (ids are not encrypted).
    fn list_checkpoints(&self) -> Result<Vec<u64>, StoreError> {
        self.inner.list_checkpoints()
    }

    /// Delegates pruning to the inner store.
    fn prune_checkpoints(&self, keep: usize) -> Result<usize, StoreError> {
        self.inner.prune_checkpoints(keep)
    }

    /// Encrypt `value` and store it under the (plaintext) `key`.
    fn put(&self, key: &str, value: &[u8]) -> Result<(), StoreError> {
        let encrypted = self.encrypt(value)?;
        self.inner.put(key, &encrypted)
    }

    /// Fetch and decrypt the value under `key`; `Ok(None)` when absent.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
        match self.inner.get(key)? {
            Some(encrypted) => Ok(Some(self.decrypt(&encrypted)?)),
            None => Ok(None),
        }
    }

    /// Delete `key` from the inner store.
    fn delete(&self, key: &str) -> Result<(), StoreError> {
        self.inner.delete(key)
    }

    /// Flush the inner store.
    fn flush(&self) -> Result<(), StoreError> {
        self.inner.flush()
    }
}
1041
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_store_checkpoint() {
        let store = MemoryStore::new();

        let checkpoint = Checkpoint {
            id: 1,
            timestamp_ms: 1000,
            events_processed: 100,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: HashMap::new(),
            context_states: HashMap::new(),
        };

        store.save_checkpoint(&checkpoint).unwrap();

        let loaded = store
            .load_checkpoint(1)
            .unwrap()
            .expect("checkpoint 1 should have been saved");
        assert_eq!(loaded.id, 1);
        assert_eq!(loaded.events_processed, 100);
    }

    #[test]
    fn test_memory_store_prune() {
        let store = MemoryStore::new();

        for i in 1..=5 {
            let checkpoint = Checkpoint {
                id: i,
                timestamp_ms: i as i64 * 1000,
                events_processed: i * 100,
                window_states: HashMap::new(),
                pattern_states: HashMap::new(),
                metadata: HashMap::new(),
                context_states: HashMap::new(),
            };
            store.save_checkpoint(&checkpoint).unwrap();
        }

        let checkpoints = store.list_checkpoints().unwrap();
        assert_eq!(checkpoints.len(), 5);

        let pruned = store.prune_checkpoints(2).unwrap();
        assert_eq!(pruned, 3);

        let checkpoints = store.list_checkpoints().unwrap();
        assert_eq!(checkpoints.len(), 2);
        assert_eq!(checkpoints, vec![4, 5]);
    }

    #[test]
    fn test_file_store_put_get_delete() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileStore::open(dir.path()).unwrap();

        // Put and get
        store.put("test:key1", b"hello world").unwrap();
        let val = store.get("test:key1").unwrap();
        assert_eq!(val, Some(b"hello world".to_vec()));

        // Get missing key
        let val = store.get("test:missing").unwrap();
        assert!(val.is_none());

        // Delete
        store.delete("test:key1").unwrap();
        let val = store.get("test:key1").unwrap();
        assert!(val.is_none());

        // Delete missing key (should not error)
        store.delete("test:missing").unwrap();
    }

    #[test]
    fn test_file_store_atomic_write() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileStore::open(dir.path()).unwrap();

        // Write data
        store.put("data:file1", b"version 1").unwrap();
        assert_eq!(
            store.get("data:file1").unwrap(),
            Some(b"version 1".to_vec())
        );

        // Overwrite (atomic via temp+rename)
        store.put("data:file1", b"version 2").unwrap();
        assert_eq!(
            store.get("data:file1").unwrap(),
            Some(b"version 2".to_vec())
        );

        // Verify no .tmp files left behind
        let data_dir = dir.path().join("data");
        if data_dir.exists() {
            for entry in std::fs::read_dir(&data_dir).unwrap() {
                let entry = entry.unwrap();
                let name = entry.file_name().to_string_lossy().to_string();
                assert!(
                    !std::path::Path::new(&name)
                        .extension()
                        .is_some_and(|ext| ext.eq_ignore_ascii_case("tmp")),
                    "tmp file left behind: {name}"
                );
            }
        }
    }

    #[test]
    fn test_serializable_event() {
        let mut event = Event::new("TestEvent");
        event
            .data
            .insert("count".into(), varpulis_core::Value::Int(42));
        event
            .data
            .insert("value".into(), varpulis_core::Value::Float(1.5));
        event
            .data
            .insert("name".into(), varpulis_core::Value::Str("test".into()));

        let serializable: SerializableEvent = (&event).into();
        let restored: Event = serializable.into();

        assert_eq!(&*restored.event_type, "TestEvent");
        assert_eq!(restored.get_int("count"), Some(42));
        assert_eq!(restored.get_float("value"), Some(1.5));
        assert_eq!(restored.get_str("name"), Some("test"));
    }

    #[test]
    fn test_serializable_event_complex_values() {
        let mut event = Event::new("ComplexEvent");

        // Timestamp (nanoseconds since epoch)
        event.data.insert(
            "ts".into(),
            varpulis_core::Value::Timestamp(1_700_000_000_000_000_000),
        );

        // Duration (nanoseconds)
        event
            .data
            .insert("dur".into(), varpulis_core::Value::Duration(5_000_000_000));

        // Array
        event.data.insert(
            "tags".into(),
            varpulis_core::Value::array(vec![
                varpulis_core::Value::Str("a".into()),
                varpulis_core::Value::Int(1),
            ]),
        );

        // Map
        let mut inner_map = IndexMap::with_hasher(FxBuildHasher);
        inner_map.insert("nested_key".into(), varpulis_core::Value::Float(3.15));
        inner_map.insert("flag".into(), varpulis_core::Value::Bool(true));
        event
            .data
            .insert("meta".into(), varpulis_core::Value::map(inner_map));

        // Round-trip through SerializableEvent
        let serializable: SerializableEvent = (&event).into();

        // Verify serializable intermediate values
        assert!(matches!(
            serializable.fields.get("ts"),
            Some(SerializableValue::Timestamp(1_700_000_000_000_000_000))
        ));
        assert!(matches!(
            serializable.fields.get("dur"),
            Some(SerializableValue::Duration(5_000_000_000))
        ));
        assert!(matches!(
            serializable.fields.get("tags"),
            Some(SerializableValue::Array(_))
        ));
        assert!(matches!(
            serializable.fields.get("meta"),
            Some(SerializableValue::Map(_))
        ));

        let restored: Event = serializable.into();

        // Verify restored values
        assert_eq!(
            restored.data.get("ts"),
            Some(&varpulis_core::Value::Timestamp(1_700_000_000_000_000_000))
        );
        assert_eq!(
            restored.data.get("dur"),
            Some(&varpulis_core::Value::Duration(5_000_000_000))
        );

        // Verify array round-trip
        match restored.data.get("tags") {
            Some(varpulis_core::Value::Array(arr)) => {
                assert_eq!(arr.len(), 2);
                assert_eq!(arr[0], varpulis_core::Value::Str("a".into()));
                assert_eq!(arr[1], varpulis_core::Value::Int(1));
            }
            other => panic!("Expected Array, got {other:?}"),
        }

        // Verify map round-trip
        match restored.data.get("meta") {
            Some(varpulis_core::Value::Map(m)) => {
                assert_eq!(m.len(), 2);
                assert_eq!(
                    m.get("nested_key"),
                    Some(&varpulis_core::Value::Float(3.15))
                );
                assert_eq!(m.get("flag"), Some(&varpulis_core::Value::Bool(true)));
            }
            other => panic!("Expected Map, got {other:?}"),
        }
    }

    // ==========================================================================
    // Encryption Tests
    // ==========================================================================

    #[cfg(feature = "encryption")]
    mod encryption_tests {
        use super::*;

        fn test_key() -> [u8; 32] {
            // Deterministic test key (NOT for production)
            let mut key = [0u8; 32];
            for (i, b) in key.iter_mut().enumerate() {
                *b = i as u8;
            }
            key
        }

        #[test]
        fn test_encrypted_put_get_roundtrip() {
            let inner = MemoryStore::new();
            let store = EncryptedStateStore::new(inner, test_key());

            let data = b"hello, encrypted world!";
            store.put("test_key", data).unwrap();

            let retrieved = store.get("test_key").unwrap().unwrap();
            assert_eq!(retrieved, data);
        }

        #[test]
        fn test_encrypted_data_differs_from_plaintext() {
            let inner = MemoryStore::new();
            let store = EncryptedStateStore::new(inner, test_key());

            let data = b"sensitive data";
            store.put("test_key", data).unwrap();

            // Read raw data from inner store — should be encrypted
            let raw = store.inner.get("test_key").unwrap().unwrap();
            assert_ne!(raw, data.to_vec());
            assert!(raw.len() > data.len()); // nonce + auth tag overhead
        }

        #[test]
        fn test_encrypted_checkpoint_roundtrip() {
            let inner = MemoryStore::new();
            let store = EncryptedStateStore::new(inner, test_key());

            let checkpoint = Checkpoint {
                id: 1,
                timestamp_ms: 1700000000000,
                events_processed: 42,
                window_states: HashMap::new(),
                pattern_states: HashMap::new(),
                metadata: HashMap::new(),
                context_states: HashMap::new(),
            };

            store.save_checkpoint(&checkpoint).unwrap();
            let loaded = store.load_checkpoint(1).unwrap().unwrap();

            assert_eq!(loaded.id, 1);
            assert_eq!(loaded.events_processed, 42);
            assert_eq!(loaded.timestamp_ms, 1700000000000);
        }

        #[test]
        fn test_encrypted_get_nonexistent_key() {
            let inner = MemoryStore::new();
            let store = EncryptedStateStore::new(inner, test_key());

            let result = store.get("nonexistent").unwrap();
            assert!(result.is_none());
        }

        #[test]
        fn test_encrypted_delete() {
            let inner = MemoryStore::new();
            let store = EncryptedStateStore::new(inner, test_key());

            store.put("key", b"value").unwrap();
            assert!(store.get("key").unwrap().is_some());

            store.delete("key").unwrap();
            assert!(store.get("key").unwrap().is_none());
        }

        #[test]
        fn test_wrong_key_fails_decryption() {
            let store1 = EncryptedStateStore::new(MemoryStore::new(), test_key());
            store1.put("key", b"secret").unwrap();

            // Copy the raw ciphertext into a second backing store, then try to
            // read it back through a wrapper holding a different key.
            let ciphertext = store1.inner.get("key").unwrap().unwrap();
            let inner2 = MemoryStore::new();
            inner2.put("key", &ciphertext).unwrap();

            let mut key2 = test_key();
            key2[0] ^= 0xFF; // Differs from the first key in byte 0
            let store2 = EncryptedStateStore::new(inner2, key2);
            assert!(
                store2.get("key").is_err(),
                "decryption with the wrong key must fail"
            );

            // The correct key still decrypts the same data.
            assert_eq!(store1.get("key").unwrap().unwrap(), b"secret");
        }

        #[test]
        fn test_tampered_ciphertext_fails_decryption() {
            let store = EncryptedStateStore::new(MemoryStore::new(), test_key());
            store.put("key", b"secret").unwrap();

            // Flip one bit in the stored bytes; GCM authentication must reject it.
            let mut raw = store.inner.get("key").unwrap().unwrap();
            let last = raw.len() - 1;
            raw[last] ^= 0x01;
            store.inner.put("key", &raw).unwrap();

            assert!(store.get("key").is_err());
        }

        #[test]
        fn test_truncated_data_fails_decryption() {
            let store = EncryptedStateStore::new(MemoryStore::new(), test_key());

            // Fewer bytes than a nonce: must be reported as an error, not panic.
            store.inner.put("key", b"short").unwrap();
            assert!(store.get("key").is_err());
        }

        #[test]
        fn test_key_from_hex() {
            let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
            let key = EncryptedStateStore::<MemoryStore>::key_from_hex(hex).unwrap();
            assert_eq!(key[0], 0x01);
            assert_eq!(key[1], 0x23);
            assert_eq!(key[31], 0xef);
        }

        #[test]
        fn test_key_from_hex_wrong_length() {
            let hex = "0123456789abcdef"; // Only 8 bytes
            let result = EncryptedStateStore::<MemoryStore>::key_from_hex(hex);
            assert!(result.is_err());
        }

        #[test]
        fn test_key_from_passphrase() {
            let key = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
                "my-secret-passphrase",
                b"varpulis-salt-00",
            )
            .unwrap();
            assert_eq!(key.len(), 32);

            // Same passphrase + salt should yield same key
            let key2 = EncryptedStateStore::<MemoryStore>::key_from_passphrase(
                "my-secret-passphrase",
                b"varpulis-salt-00",
            )
            .unwrap();
            assert_eq!(key, key2);
        }

        #[test]
        fn test_encrypted_latest_checkpoint() {
            let inner = MemoryStore::new();
            let store = EncryptedStateStore::new(inner, test_key());

            // No checkpoints
            assert!(store.load_latest_checkpoint().unwrap().is_none());

            // Add two checkpoints
            let cp1 = Checkpoint {
                id: 1,
                timestamp_ms: 1000,
                events_processed: 10,
                window_states: HashMap::new(),
                pattern_states: HashMap::new(),
                metadata: HashMap::new(),
                context_states: HashMap::new(),
            };
            let cp2 = Checkpoint {
                id: 2,
                timestamp_ms: 2000,
                events_processed: 20,
                window_states: HashMap::new(),
                pattern_states: HashMap::new(),
                metadata: HashMap::new(),
                context_states: HashMap::new(),
            };

            store.save_checkpoint(&cp1).unwrap();
            store.save_checkpoint(&cp2).unwrap();

            let latest = store.load_latest_checkpoint().unwrap().unwrap();
            assert_eq!(latest.id, 2);
            assert_eq!(latest.events_processed, 20);
        }
    }
}