mecha10_core/
state.rs

1//! Persistent State Storage Abstraction for Nodes
2//!
3//! This module provides a unified interface for nodes to persist and restore
4//! state across restarts, crashes, or deployments.
5//!
6//! # Features
7//!
8//! - Multiple storage backends (in-memory, filesystem, Redis, PostgreSQL)
9//! - Type-safe state serialization/deserialization
10//! - Versioned state with migration support
11//! - Atomic updates with compare-and-swap
12//! - State snapshots and restoration
13//! - TTL support for temporary state
14//! - Namespace isolation per node
15//!
16//! # Usage
17//!
18//! ```rust
19//! use mecha10::prelude::*;
20//!
21//! #[derive(Debug, Clone, Serialize, Deserialize)]
22//! struct MyNodeState {
23//!     counter: u64,
24//!     last_position: (f64, f64),
25//!     calibration_data: Vec<f32>,
26//! }
27//!
28//! async fn example(ctx: &Context) -> Result<()> {
29//!     let state_mgr = ctx.state_manager();
30//!
31//!     // Save state
32//!     let state = MyNodeState {
33//!         counter: 42,
34//!         last_position: (1.5, 2.3),
35//!         calibration_data: vec![0.1, 0.2, 0.3],
36//!     };
37//!     state_mgr.save("my_node", &state).await?;
38//!
39//!     // Load state
40//!     if let Some(restored) = state_mgr.load::<MyNodeState>("my_node").await? {
41//!         println!("Restored state: {:?}", restored);
42//!     }
43//!
44//!     Ok(())
45//! }
46//! ```
47
48use crate::error::{Mecha10Error, Result};
49use async_trait::async_trait;
50use serde::{de::DeserializeOwned, Deserialize, Serialize};
51use std::collections::HashMap;
52use std::path::{Path, PathBuf};
53use std::sync::Arc;
54use std::time::{Duration, SystemTime};
55use tokio::fs;
56use tokio::sync::RwLock;
57
58// ============================================================================
59// Core Types
60// ============================================================================
61
62/// Versioned state wrapper for migration support
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct VersionedState<T> {
65    /// Schema version for migration
66    pub version: u32,
67
68    /// Actual state data
69    pub data: T,
70
71    /// Timestamp when state was saved
72    pub timestamp: u64,
73
74    /// Optional TTL in seconds
75    pub ttl_seconds: Option<u64>,
76}
77
78impl<T> VersionedState<T> {
79    /// Create new versioned state
80    pub fn new(data: T, version: u32) -> Self {
81        Self {
82            version,
83            data,
84            timestamp: now_micros(),
85            ttl_seconds: None,
86        }
87    }
88
89    /// Create new versioned state with TTL
90    pub fn with_ttl(data: T, version: u32, ttl: Duration) -> Self {
91        Self {
92            version,
93            data,
94            timestamp: now_micros(),
95            ttl_seconds: Some(ttl.as_secs()),
96        }
97    }
98
99    /// Check if state has expired based on TTL
100    pub fn is_expired(&self) -> bool {
101        if let Some(ttl_seconds) = self.ttl_seconds {
102            let age_seconds = (now_micros() - self.timestamp) / 1_000_000;
103            age_seconds > ttl_seconds
104        } else {
105            false
106        }
107    }
108
109    /// Get age of state in seconds
110    pub fn age_seconds(&self) -> u64 {
111        (now_micros() - self.timestamp) / 1_000_000
112    }
113}
114
115/// State snapshot metadata
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct StateSnapshot {
118    /// Snapshot identifier
119    pub id: String,
120
121    /// Node ID this snapshot belongs to
122    pub node_id: String,
123
124    /// Timestamp when snapshot was taken
125    pub timestamp: u64,
126
127    /// Optional description
128    pub description: Option<String>,
129
130    /// Size in bytes
131    pub size_bytes: usize,
132}
133
134/// Options for state operations
135#[derive(Debug, Clone)]
136pub struct StateOptions {
137    /// Version for schema migration
138    pub version: u32,
139
140    /// Optional TTL
141    pub ttl: Option<Duration>,
142
143    /// Whether to create backup before overwrite
144    pub create_backup: bool,
145
146    /// Whether to compress state
147    pub compress: bool,
148}
149
150impl Default for StateOptions {
151    fn default() -> Self {
152        Self {
153            version: 1,
154            ttl: None,
155            create_backup: false,
156            compress: false,
157        }
158    }
159}
160
161// ============================================================================
162// StateManager Trait
163// ============================================================================
164
165/// Persistent state storage abstraction
166#[async_trait]
167pub trait StateManager: Send + Sync {
168    /// Save state for a node
169    async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()>;
170
171    /// Save state with options
172    async fn save_with_options<T: Serialize + Send + Sync>(
173        &self,
174        node_id: &str,
175        state: &T,
176        options: StateOptions,
177    ) -> Result<()>;
178
179    /// Load state for a node
180    async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>>;
181
182    /// Load versioned state with migration support
183    async fn load_versioned<T: DeserializeOwned + Send + Sync>(
184        &self,
185        node_id: &str,
186    ) -> Result<Option<VersionedState<T>>>;
187
188    /// Delete state for a node
189    async fn delete(&self, node_id: &str) -> Result<()>;
190
191    /// Check if state exists for a node
192    async fn exists(&self, node_id: &str) -> Result<bool>;
193
194    /// List all node IDs with stored state
195    async fn list_nodes(&self) -> Result<Vec<String>>;
196
197    /// Get state size in bytes
198    async fn size(&self, node_id: &str) -> Result<Option<usize>>;
199
200    /// Create a snapshot of current state
201    async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot>;
202
203    /// Restore from a snapshot
204    async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()>;
205
206    /// List snapshots for a node
207    async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>>;
208
209    /// Delete a snapshot
210    async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()>;
211
212    /// Atomic compare-and-swap operation
213    async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
214        &self,
215        node_id: &str,
216        expected: &T,
217        new: &T,
218    ) -> Result<bool>;
219
220    /// Clear all expired states (based on TTL)
221    async fn clear_expired(&self) -> Result<usize>;
222}
223
224// ============================================================================
225// In-Memory Backend
226// ============================================================================
227
228/// In-memory state storage (not persistent across restarts)
229#[derive(Clone)]
230pub struct MemoryStateManager {
231    states: Arc<RwLock<HashMap<String, Vec<u8>>>>,
232    #[allow(clippy::type_complexity)]
233    snapshots: Arc<RwLock<HashMap<String, (Vec<u8>, StateSnapshot)>>>,
234}
235
236impl MemoryStateManager {
237    /// Create new in-memory state manager
238    pub fn new() -> Self {
239        Self {
240            states: Arc::new(RwLock::new(HashMap::new())),
241            snapshots: Arc::new(RwLock::new(HashMap::new())),
242        }
243    }
244}
245
246impl Default for MemoryStateManager {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252#[async_trait]
253impl StateManager for MemoryStateManager {
254    async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()> {
255        self.save_with_options(node_id, state, StateOptions::default()).await
256    }
257
258    async fn save_with_options<T: Serialize + Send + Sync>(
259        &self,
260        node_id: &str,
261        state: &T,
262        options: StateOptions,
263    ) -> Result<()> {
264        let versioned = if options.ttl.is_some() {
265            VersionedState::with_ttl(state, options.version, options.ttl.unwrap())
266        } else {
267            VersionedState::new(state, options.version)
268        };
269
270        let data = serde_json::to_vec(&versioned).map_err(|e| Mecha10Error::SerializationError {
271            message: format!("Failed to serialize state: {}", e),
272            suggestion: "Ensure state type implements Serialize".to_string(),
273        })?;
274
275        let mut states = self.states.write().await;
276        states.insert(node_id.to_string(), data);
277
278        Ok(())
279    }
280
281    async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>> {
282        if let Some(versioned) = self.load_versioned::<T>(node_id).await? {
283            Ok(Some(versioned.data))
284        } else {
285            Ok(None)
286        }
287    }
288
289    async fn load_versioned<T: DeserializeOwned + Send + Sync>(
290        &self,
291        node_id: &str,
292    ) -> Result<Option<VersionedState<T>>> {
293        let states = self.states.read().await;
294
295        if let Some(data) = states.get(node_id) {
296            let versioned: VersionedState<T> =
297                serde_json::from_slice(data).map_err(|e| Mecha10Error::SerializationError {
298                    message: format!("Failed to deserialize state: {}", e),
299                    suggestion: "State may be corrupted or type mismatch".to_string(),
300                })?;
301
302            // Check TTL expiration
303            if versioned.is_expired() {
304                return Ok(None);
305            }
306
307            Ok(Some(versioned))
308        } else {
309            Ok(None)
310        }
311    }
312
313    async fn delete(&self, node_id: &str) -> Result<()> {
314        let mut states = self.states.write().await;
315        states.remove(node_id);
316        Ok(())
317    }
318
319    async fn exists(&self, node_id: &str) -> Result<bool> {
320        let states = self.states.read().await;
321        Ok(states.contains_key(node_id))
322    }
323
324    async fn list_nodes(&self) -> Result<Vec<String>> {
325        let states = self.states.read().await;
326        Ok(states.keys().cloned().collect())
327    }
328
329    async fn size(&self, node_id: &str) -> Result<Option<usize>> {
330        let states = self.states.read().await;
331        Ok(states.get(node_id).map(|data| data.len()))
332    }
333
334    async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot> {
335        let states = self.states.read().await;
336
337        let data = states
338            .get(node_id)
339            .ok_or_else(|| Mecha10Error::Configuration(format!("No state for node: {}", node_id)))?
340            .clone();
341
342        let snapshot = StateSnapshot {
343            id: uuid::Uuid::new_v4().to_string(),
344            node_id: node_id.to_string(),
345            timestamp: now_micros(),
346            description,
347            size_bytes: data.len(),
348        };
349
350        let mut snapshots = self.snapshots.write().await;
351        snapshots.insert(snapshot.id.clone(), (data, snapshot.clone()));
352
353        Ok(snapshot)
354    }
355
356    async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
357        let snapshots = self.snapshots.read().await;
358
359        let (data, snapshot) = snapshots
360            .get(snapshot_id)
361            .ok_or_else(|| Mecha10Error::Configuration(format!("Snapshot not found: {}", snapshot_id)))?;
362
363        let mut states = self.states.write().await;
364        states.insert(snapshot.node_id.clone(), data.clone());
365
366        Ok(())
367    }
368
369    async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>> {
370        let snapshots = self.snapshots.read().await;
371
372        let mut result: Vec<StateSnapshot> = snapshots
373            .values()
374            .filter_map(|(_, snapshot)| {
375                if snapshot.node_id == node_id {
376                    Some(snapshot.clone())
377                } else {
378                    None
379                }
380            })
381            .collect();
382
383        result.sort_by_key(|s| s.timestamp);
384        result.reverse();
385
386        Ok(result)
387    }
388
389    async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()> {
390        let mut snapshots = self.snapshots.write().await;
391        snapshots.remove(snapshot_id);
392        Ok(())
393    }
394
395    async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
396        &self,
397        node_id: &str,
398        expected: &T,
399        new: &T,
400    ) -> Result<bool> {
401        let mut states = self.states.write().await;
402
403        if let Some(data) = states.get(node_id) {
404            let current: VersionedState<T> =
405                serde_json::from_slice(data).map_err(|e| Mecha10Error::SerializationError {
406                    message: format!("Failed to deserialize for CAS: {}", e),
407                    suggestion: "State may be corrupted".to_string(),
408                })?;
409
410            let expected_json = serde_json::to_string(expected).unwrap();
411            let current_json = serde_json::to_string(&current.data).unwrap();
412
413            if expected_json == current_json {
414                // Match - perform swap
415                let versioned = VersionedState::new(new, current.version + 1);
416                let new_data = serde_json::to_vec(&versioned).unwrap();
417                states.insert(node_id.to_string(), new_data);
418                Ok(true)
419            } else {
420                // No match
421                Ok(false)
422            }
423        } else {
424            // No existing state
425            Ok(false)
426        }
427    }
428
429    async fn clear_expired(&self) -> Result<usize> {
430        let mut states = self.states.write().await;
431        let mut expired_keys = Vec::new();
432
433        for (node_id, data) in states.iter() {
434            if let Ok(versioned) = serde_json::from_slice::<VersionedState<serde_json::Value>>(data) {
435                if versioned.is_expired() {
436                    expired_keys.push(node_id.clone());
437                }
438            }
439        }
440
441        let count = expired_keys.len();
442        for key in expired_keys {
443            states.remove(&key);
444        }
445
446        Ok(count)
447    }
448}
449
450// ============================================================================
451// Filesystem Backend
452// ============================================================================
453
454/// Filesystem-based state storage
455#[derive(Clone)]
456pub struct FilesystemStateManager {
457    base_path: PathBuf,
458}
459
460impl FilesystemStateManager {
461    /// Create new filesystem state manager
462    pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self> {
463        let base_path = base_path.as_ref().to_path_buf();
464
465        // Create directory if it doesn't exist
466        fs::create_dir_all(&base_path)
467            .await
468            .map_err(|e| Mecha10Error::IoError {
469                message: format!("Failed to create state directory: {}", e),
470                suggestion: "Check directory permissions".to_string(),
471            })?;
472
473        Ok(Self { base_path })
474    }
475
476    fn state_path(&self, node_id: &str) -> PathBuf {
477        self.base_path.join(format!("{}.state.json", node_id))
478    }
479
480    fn snapshot_path(&self, snapshot_id: &str) -> PathBuf {
481        self.base_path
482            .join("snapshots")
483            .join(format!("{}.snapshot.json", snapshot_id))
484    }
485
486    fn snapshot_meta_path(&self, snapshot_id: &str) -> PathBuf {
487        self.base_path
488            .join("snapshots")
489            .join(format!("{}.meta.json", snapshot_id))
490    }
491}
492
493#[async_trait]
494impl StateManager for FilesystemStateManager {
495    async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()> {
496        self.save_with_options(node_id, state, StateOptions::default()).await
497    }
498
499    async fn save_with_options<T: Serialize + Send + Sync>(
500        &self,
501        node_id: &str,
502        state: &T,
503        options: StateOptions,
504    ) -> Result<()> {
505        let versioned = if options.ttl.is_some() {
506            VersionedState::with_ttl(state, options.version, options.ttl.unwrap())
507        } else {
508            VersionedState::new(state, options.version)
509        };
510
511        let path = self.state_path(node_id);
512
513        // Create backup if requested
514        if options.create_backup && path.exists() {
515            let backup_path = path.with_extension("state.json.backup");
516            fs::copy(&path, backup_path).await.ok();
517        }
518
519        let json = serde_json::to_string_pretty(&versioned).map_err(|e| Mecha10Error::SerializationError {
520            message: format!("Failed to serialize state: {}", e),
521            suggestion: "Ensure state type implements Serialize".to_string(),
522        })?;
523
524        fs::write(&path, json).await.map_err(|e| Mecha10Error::IoError {
525            message: format!("Failed to write state file: {}", e),
526            suggestion: "Check file permissions".to_string(),
527        })?;
528
529        Ok(())
530    }
531
532    async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>> {
533        if let Some(versioned) = self.load_versioned::<T>(node_id).await? {
534            Ok(Some(versioned.data))
535        } else {
536            Ok(None)
537        }
538    }
539
540    async fn load_versioned<T: DeserializeOwned + Send + Sync>(
541        &self,
542        node_id: &str,
543    ) -> Result<Option<VersionedState<T>>> {
544        let path = self.state_path(node_id);
545
546        if !path.exists() {
547            return Ok(None);
548        }
549
550        let json = fs::read_to_string(&path).await.map_err(|e| Mecha10Error::IoError {
551            message: format!("Failed to read state file: {}", e),
552            suggestion: "Check file permissions".to_string(),
553        })?;
554
555        let versioned: VersionedState<T> =
556            serde_json::from_str(&json).map_err(|e| Mecha10Error::SerializationError {
557                message: format!("Failed to deserialize state: {}", e),
558                suggestion: "State may be corrupted or type mismatch".to_string(),
559            })?;
560
561        // Check TTL expiration
562        if versioned.is_expired() {
563            return Ok(None);
564        }
565
566        Ok(Some(versioned))
567    }
568
569    async fn delete(&self, node_id: &str) -> Result<()> {
570        let path = self.state_path(node_id);
571
572        if path.exists() {
573            fs::remove_file(&path).await.map_err(|e| Mecha10Error::IoError {
574                message: format!("Failed to delete state file: {}", e),
575                suggestion: "Check file permissions".to_string(),
576            })?;
577        }
578
579        Ok(())
580    }
581
582    async fn exists(&self, node_id: &str) -> Result<bool> {
583        Ok(self.state_path(node_id).exists())
584    }
585
586    async fn list_nodes(&self) -> Result<Vec<String>> {
587        let mut nodes = Vec::new();
588        let mut entries = fs::read_dir(&self.base_path).await.map_err(|e| Mecha10Error::IoError {
589            message: format!("Failed to read state directory: {}", e),
590            suggestion: "Check directory permissions".to_string(),
591        })?;
592
593        while let Some(entry) = entries.next_entry().await.map_err(|e| Mecha10Error::IoError {
594            message: format!("Failed to read directory entry: {}", e),
595            suggestion: "Check directory permissions".to_string(),
596        })? {
597            let path = entry.path();
598            if let Some(filename) = path.file_name() {
599                let filename = filename.to_string_lossy();
600                if filename.ends_with(".state.json") {
601                    let node_id = filename.trim_end_matches(".state.json").to_string();
602                    nodes.push(node_id);
603                }
604            }
605        }
606
607        Ok(nodes)
608    }
609
610    async fn size(&self, node_id: &str) -> Result<Option<usize>> {
611        let path = self.state_path(node_id);
612
613        if !path.exists() {
614            return Ok(None);
615        }
616
617        let metadata = fs::metadata(&path).await.map_err(|e| Mecha10Error::IoError {
618            message: format!("Failed to get file metadata: {}", e),
619            suggestion: "Check file permissions".to_string(),
620        })?;
621
622        Ok(Some(metadata.len() as usize))
623    }
624
625    async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot> {
626        let state_path = self.state_path(node_id);
627
628        if !state_path.exists() {
629            return Err(Mecha10Error::Configuration(format!("No state for node: {}", node_id)));
630        }
631
632        let snapshot_id = uuid::Uuid::new_v4().to_string();
633
634        // Ensure snapshots directory exists
635        let snapshots_dir = self.base_path.join("snapshots");
636        fs::create_dir_all(&snapshots_dir)
637            .await
638            .map_err(|e| Mecha10Error::IoError {
639                message: format!("Failed to create snapshots directory: {}", e),
640                suggestion: "Check directory permissions".to_string(),
641            })?;
642
643        let snapshot_path = self.snapshot_path(&snapshot_id);
644        fs::copy(&state_path, &snapshot_path)
645            .await
646            .map_err(|e| Mecha10Error::IoError {
647                message: format!("Failed to create snapshot: {}", e),
648                suggestion: "Check file permissions".to_string(),
649            })?;
650
651        let size_bytes = self.size(node_id).await?.unwrap_or(0);
652
653        let snapshot = StateSnapshot {
654            id: snapshot_id.clone(),
655            node_id: node_id.to_string(),
656            timestamp: now_micros(),
657            description,
658            size_bytes,
659        };
660
661        // Save snapshot metadata
662        let meta_path = self.snapshot_meta_path(&snapshot_id);
663        let meta_json = serde_json::to_string_pretty(&snapshot).unwrap();
664        fs::write(&meta_path, meta_json)
665            .await
666            .map_err(|e| Mecha10Error::IoError {
667                message: format!("Failed to write snapshot metadata: {}", e),
668                suggestion: "Check file permissions".to_string(),
669            })?;
670
671        Ok(snapshot)
672    }
673
674    async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
675        let snapshot_path = self.snapshot_path(snapshot_id);
676        let meta_path = self.snapshot_meta_path(snapshot_id);
677
678        if !snapshot_path.exists() || !meta_path.exists() {
679            return Err(Mecha10Error::Configuration(format!(
680                "Snapshot not found: {}",
681                snapshot_id
682            )));
683        }
684
685        let meta_json = fs::read_to_string(&meta_path)
686            .await
687            .map_err(|e| Mecha10Error::IoError {
688                message: format!("Failed to read snapshot metadata: {}", e),
689                suggestion: "Check file permissions".to_string(),
690            })?;
691
692        let snapshot: StateSnapshot =
693            serde_json::from_str(&meta_json).map_err(|e| Mecha10Error::SerializationError {
694                message: format!("Failed to deserialize snapshot metadata: {}", e),
695                suggestion: "Snapshot metadata may be corrupted".to_string(),
696            })?;
697
698        let state_path = self.state_path(&snapshot.node_id);
699        fs::copy(&snapshot_path, &state_path)
700            .await
701            .map_err(|e| Mecha10Error::IoError {
702                message: format!("Failed to restore snapshot: {}", e),
703                suggestion: "Check file permissions".to_string(),
704            })?;
705
706        Ok(())
707    }
708
709    async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>> {
710        let snapshots_dir = self.base_path.join("snapshots");
711
712        if !snapshots_dir.exists() {
713            return Ok(Vec::new());
714        }
715
716        let mut snapshots = Vec::new();
717        let mut entries = fs::read_dir(&snapshots_dir).await.map_err(|e| Mecha10Error::IoError {
718            message: format!("Failed to read snapshots directory: {}", e),
719            suggestion: "Check directory permissions".to_string(),
720        })?;
721
722        while let Some(entry) = entries.next_entry().await.map_err(|e| Mecha10Error::IoError {
723            message: format!("Failed to read directory entry: {}", e),
724            suggestion: "Check directory permissions".to_string(),
725        })? {
726            let path = entry.path();
727            if let Some(filename) = path.file_name() {
728                let filename = filename.to_string_lossy();
729                if filename.ends_with(".meta.json") {
730                    let json = fs::read_to_string(&path).await.ok();
731                    if let Some(json) = json {
732                        if let Ok(snapshot) = serde_json::from_str::<StateSnapshot>(&json) {
733                            if snapshot.node_id == node_id {
734                                snapshots.push(snapshot);
735                            }
736                        }
737                    }
738                }
739            }
740        }
741
742        snapshots.sort_by_key(|s| s.timestamp);
743        snapshots.reverse();
744
745        Ok(snapshots)
746    }
747
748    async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()> {
749        let snapshot_path = self.snapshot_path(snapshot_id);
750        let meta_path = self.snapshot_meta_path(snapshot_id);
751
752        if snapshot_path.exists() {
753            fs::remove_file(&snapshot_path).await.ok();
754        }
755
756        if meta_path.exists() {
757            fs::remove_file(&meta_path).await.ok();
758        }
759
760        Ok(())
761    }
762
763    async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
764        &self,
765        node_id: &str,
766        expected: &T,
767        new: &T,
768    ) -> Result<bool> {
769        let path = self.state_path(node_id);
770
771        if !path.exists() {
772            return Ok(false);
773        }
774
775        let json = fs::read_to_string(&path).await.map_err(|e| Mecha10Error::IoError {
776            message: format!("Failed to read state file: {}", e),
777            suggestion: "Check file permissions".to_string(),
778        })?;
779
780        let current: VersionedState<T> = serde_json::from_str(&json).map_err(|e| Mecha10Error::SerializationError {
781            message: format!("Failed to deserialize for CAS: {}", e),
782            suggestion: "State may be corrupted".to_string(),
783        })?;
784
785        let expected_json = serde_json::to_string(expected).unwrap();
786        let current_json = serde_json::to_string(&current.data).unwrap();
787
788        if expected_json == current_json {
789            // Match - perform swap
790            let versioned = VersionedState::new(new, current.version + 1);
791            let new_json = serde_json::to_string_pretty(&versioned).unwrap();
792            fs::write(&path, new_json).await.map_err(|e| Mecha10Error::IoError {
793                message: format!("Failed to write state file: {}", e),
794                suggestion: "Check file permissions".to_string(),
795            })?;
796            Ok(true)
797        } else {
798            // No match
799            Ok(false)
800        }
801    }
802
803    async fn clear_expired(&self) -> Result<usize> {
804        let nodes = self.list_nodes().await?;
805        let mut count = 0;
806
807        for node_id in nodes {
808            if let Ok(Some(versioned)) = self.load_versioned::<serde_json::Value>(&node_id).await {
809                if versioned.is_expired() {
810                    self.delete(&node_id).await?;
811                    count += 1;
812                }
813            }
814        }
815
816        Ok(count)
817    }
818}
819
820// ============================================================================
821// Concrete StateManager enum for trait object compatibility
822// ============================================================================
823
824/// Concrete enum to wrap StateManager implementations
825/// This allows using StateManager without dyn (which isn't object-safe due to generic methods)
826#[derive(Clone)]
827pub enum ConcreteStateManager {
828    Memory(MemoryStateManager),
829    Filesystem(FilesystemStateManager),
830}
831
832impl ConcreteStateManager {
833    pub async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()> {
834        match self {
835            Self::Memory(m) => m.save(node_id, state).await,
836            Self::Filesystem(f) => f.save(node_id, state).await,
837        }
838    }
839
840    pub async fn save_with_options<T: Serialize + Send + Sync>(
841        &self,
842        node_id: &str,
843        state: &T,
844        options: StateOptions,
845    ) -> Result<()> {
846        match self {
847            Self::Memory(m) => m.save_with_options(node_id, state, options).await,
848            Self::Filesystem(f) => f.save_with_options(node_id, state, options).await,
849        }
850    }
851
852    pub async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>> {
853        match self {
854            Self::Memory(m) => m.load(node_id).await,
855            Self::Filesystem(f) => f.load(node_id).await,
856        }
857    }
858
859    pub async fn load_versioned<T: DeserializeOwned + Send + Sync>(
860        &self,
861        node_id: &str,
862    ) -> Result<Option<VersionedState<T>>> {
863        match self {
864            Self::Memory(m) => m.load_versioned(node_id).await,
865            Self::Filesystem(f) => f.load_versioned(node_id).await,
866        }
867    }
868
869    pub async fn delete(&self, node_id: &str) -> Result<()> {
870        match self {
871            Self::Memory(m) => m.delete(node_id).await,
872            Self::Filesystem(f) => f.delete(node_id).await,
873        }
874    }
875
876    pub async fn exists(&self, node_id: &str) -> Result<bool> {
877        match self {
878            Self::Memory(m) => m.exists(node_id).await,
879            Self::Filesystem(f) => f.exists(node_id).await,
880        }
881    }
882
883    pub async fn list_nodes(&self) -> Result<Vec<String>> {
884        match self {
885            Self::Memory(m) => m.list_nodes().await,
886            Self::Filesystem(f) => f.list_nodes().await,
887        }
888    }
889
890    pub async fn size(&self, node_id: &str) -> Result<Option<usize>> {
891        match self {
892            Self::Memory(m) => m.size(node_id).await,
893            Self::Filesystem(f) => f.size(node_id).await,
894        }
895    }
896
897    pub async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot> {
898        match self {
899            Self::Memory(m) => m.create_snapshot(node_id, description).await,
900            Self::Filesystem(f) => f.create_snapshot(node_id, description).await,
901        }
902    }
903
904    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
905        match self {
906            Self::Memory(m) => m.restore_snapshot(snapshot_id).await,
907            Self::Filesystem(f) => f.restore_snapshot(snapshot_id).await,
908        }
909    }
910
911    pub async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>> {
912        match self {
913            Self::Memory(m) => m.list_snapshots(node_id).await,
914            Self::Filesystem(f) => f.list_snapshots(node_id).await,
915        }
916    }
917
918    pub async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()> {
919        match self {
920            Self::Memory(m) => m.delete_snapshot(snapshot_id).await,
921            Self::Filesystem(f) => f.delete_snapshot(snapshot_id).await,
922        }
923    }
924
925    pub async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
926        &self,
927        node_id: &str,
928        expected: &T,
929        new_value: &T,
930    ) -> Result<bool> {
931        match self {
932            Self::Memory(m) => m.compare_and_swap(node_id, expected, new_value).await,
933            Self::Filesystem(f) => f.compare_and_swap(node_id, expected, new_value).await,
934        }
935    }
936}
937
938// ============================================================================
939// Helper Functions
940// ============================================================================
941
942fn now_micros() -> u64 {
943    SystemTime::now()
944        .duration_since(SystemTime::UNIX_EPOCH)
945        .unwrap()
946        .as_micros() as u64
947}
948
949// ============================================================================
950// Tests
951// ============================================================================