1use crate::error::{Mecha10Error, Result};
49use async_trait::async_trait;
50use serde::{de::DeserializeOwned, Deserialize, Serialize};
51use std::collections::HashMap;
52use std::path::{Path, PathBuf};
53use std::sync::Arc;
54use std::time::{Duration, SystemTime};
55use tokio::fs;
56use tokio::sync::RwLock;
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct VersionedState<T> {
65 pub version: u32,
67
68 pub data: T,
70
71 pub timestamp: u64,
73
74 pub ttl_seconds: Option<u64>,
76}
77
78impl<T> VersionedState<T> {
79 pub fn new(data: T, version: u32) -> Self {
81 Self {
82 version,
83 data,
84 timestamp: now_micros(),
85 ttl_seconds: None,
86 }
87 }
88
89 pub fn with_ttl(data: T, version: u32, ttl: Duration) -> Self {
91 Self {
92 version,
93 data,
94 timestamp: now_micros(),
95 ttl_seconds: Some(ttl.as_secs()),
96 }
97 }
98
99 pub fn is_expired(&self) -> bool {
101 if let Some(ttl_seconds) = self.ttl_seconds {
102 let age_seconds = (now_micros() - self.timestamp) / 1_000_000;
103 age_seconds > ttl_seconds
104 } else {
105 false
106 }
107 }
108
109 pub fn age_seconds(&self) -> u64 {
111 (now_micros() - self.timestamp) / 1_000_000
112 }
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct StateSnapshot {
118 pub id: String,
120
121 pub node_id: String,
123
124 pub timestamp: u64,
126
127 pub description: Option<String>,
129
130 pub size_bytes: usize,
132}
133
134#[derive(Debug, Clone)]
136pub struct StateOptions {
137 pub version: u32,
139
140 pub ttl: Option<Duration>,
142
143 pub create_backup: bool,
145
146 pub compress: bool,
148}
149
150impl Default for StateOptions {
151 fn default() -> Self {
152 Self {
153 version: 1,
154 ttl: None,
155 create_backup: false,
156 compress: false,
157 }
158 }
159}
160
161#[async_trait]
167pub trait StateManager: Send + Sync {
168 async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()>;
170
171 async fn save_with_options<T: Serialize + Send + Sync>(
173 &self,
174 node_id: &str,
175 state: &T,
176 options: StateOptions,
177 ) -> Result<()>;
178
179 async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>>;
181
182 async fn load_versioned<T: DeserializeOwned + Send + Sync>(
184 &self,
185 node_id: &str,
186 ) -> Result<Option<VersionedState<T>>>;
187
188 async fn delete(&self, node_id: &str) -> Result<()>;
190
191 async fn exists(&self, node_id: &str) -> Result<bool>;
193
194 async fn list_nodes(&self) -> Result<Vec<String>>;
196
197 async fn size(&self, node_id: &str) -> Result<Option<usize>>;
199
200 async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot>;
202
203 async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()>;
205
206 async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>>;
208
209 async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()>;
211
212 async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
214 &self,
215 node_id: &str,
216 expected: &T,
217 new: &T,
218 ) -> Result<bool>;
219
220 async fn clear_expired(&self) -> Result<usize>;
222}
223
224#[derive(Clone)]
230pub struct MemoryStateManager {
231 states: Arc<RwLock<HashMap<String, Vec<u8>>>>,
232 #[allow(clippy::type_complexity)]
233 snapshots: Arc<RwLock<HashMap<String, (Vec<u8>, StateSnapshot)>>>,
234}
235
236impl MemoryStateManager {
237 pub fn new() -> Self {
239 Self {
240 states: Arc::new(RwLock::new(HashMap::new())),
241 snapshots: Arc::new(RwLock::new(HashMap::new())),
242 }
243 }
244}
245
246impl Default for MemoryStateManager {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252#[async_trait]
253impl StateManager for MemoryStateManager {
254 async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()> {
255 self.save_with_options(node_id, state, StateOptions::default()).await
256 }
257
258 async fn save_with_options<T: Serialize + Send + Sync>(
259 &self,
260 node_id: &str,
261 state: &T,
262 options: StateOptions,
263 ) -> Result<()> {
264 let versioned = if options.ttl.is_some() {
265 VersionedState::with_ttl(state, options.version, options.ttl.unwrap())
266 } else {
267 VersionedState::new(state, options.version)
268 };
269
270 let data = serde_json::to_vec(&versioned).map_err(|e| Mecha10Error::SerializationError {
271 message: format!("Failed to serialize state: {}", e),
272 suggestion: "Ensure state type implements Serialize".to_string(),
273 })?;
274
275 let mut states = self.states.write().await;
276 states.insert(node_id.to_string(), data);
277
278 Ok(())
279 }
280
281 async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>> {
282 if let Some(versioned) = self.load_versioned::<T>(node_id).await? {
283 Ok(Some(versioned.data))
284 } else {
285 Ok(None)
286 }
287 }
288
289 async fn load_versioned<T: DeserializeOwned + Send + Sync>(
290 &self,
291 node_id: &str,
292 ) -> Result<Option<VersionedState<T>>> {
293 let states = self.states.read().await;
294
295 if let Some(data) = states.get(node_id) {
296 let versioned: VersionedState<T> =
297 serde_json::from_slice(data).map_err(|e| Mecha10Error::SerializationError {
298 message: format!("Failed to deserialize state: {}", e),
299 suggestion: "State may be corrupted or type mismatch".to_string(),
300 })?;
301
302 if versioned.is_expired() {
304 return Ok(None);
305 }
306
307 Ok(Some(versioned))
308 } else {
309 Ok(None)
310 }
311 }
312
313 async fn delete(&self, node_id: &str) -> Result<()> {
314 let mut states = self.states.write().await;
315 states.remove(node_id);
316 Ok(())
317 }
318
319 async fn exists(&self, node_id: &str) -> Result<bool> {
320 let states = self.states.read().await;
321 Ok(states.contains_key(node_id))
322 }
323
324 async fn list_nodes(&self) -> Result<Vec<String>> {
325 let states = self.states.read().await;
326 Ok(states.keys().cloned().collect())
327 }
328
329 async fn size(&self, node_id: &str) -> Result<Option<usize>> {
330 let states = self.states.read().await;
331 Ok(states.get(node_id).map(|data| data.len()))
332 }
333
334 async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot> {
335 let states = self.states.read().await;
336
337 let data = states
338 .get(node_id)
339 .ok_or_else(|| Mecha10Error::Configuration(format!("No state for node: {}", node_id)))?
340 .clone();
341
342 let snapshot = StateSnapshot {
343 id: uuid::Uuid::new_v4().to_string(),
344 node_id: node_id.to_string(),
345 timestamp: now_micros(),
346 description,
347 size_bytes: data.len(),
348 };
349
350 let mut snapshots = self.snapshots.write().await;
351 snapshots.insert(snapshot.id.clone(), (data, snapshot.clone()));
352
353 Ok(snapshot)
354 }
355
356 async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
357 let snapshots = self.snapshots.read().await;
358
359 let (data, snapshot) = snapshots
360 .get(snapshot_id)
361 .ok_or_else(|| Mecha10Error::Configuration(format!("Snapshot not found: {}", snapshot_id)))?;
362
363 let mut states = self.states.write().await;
364 states.insert(snapshot.node_id.clone(), data.clone());
365
366 Ok(())
367 }
368
369 async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>> {
370 let snapshots = self.snapshots.read().await;
371
372 let mut result: Vec<StateSnapshot> = snapshots
373 .values()
374 .filter_map(|(_, snapshot)| {
375 if snapshot.node_id == node_id {
376 Some(snapshot.clone())
377 } else {
378 None
379 }
380 })
381 .collect();
382
383 result.sort_by_key(|s| s.timestamp);
384 result.reverse();
385
386 Ok(result)
387 }
388
389 async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()> {
390 let mut snapshots = self.snapshots.write().await;
391 snapshots.remove(snapshot_id);
392 Ok(())
393 }
394
395 async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
396 &self,
397 node_id: &str,
398 expected: &T,
399 new: &T,
400 ) -> Result<bool> {
401 let mut states = self.states.write().await;
402
403 if let Some(data) = states.get(node_id) {
404 let current: VersionedState<T> =
405 serde_json::from_slice(data).map_err(|e| Mecha10Error::SerializationError {
406 message: format!("Failed to deserialize for CAS: {}", e),
407 suggestion: "State may be corrupted".to_string(),
408 })?;
409
410 let expected_json = serde_json::to_string(expected).unwrap();
411 let current_json = serde_json::to_string(¤t.data).unwrap();
412
413 if expected_json == current_json {
414 let versioned = VersionedState::new(new, current.version + 1);
416 let new_data = serde_json::to_vec(&versioned).unwrap();
417 states.insert(node_id.to_string(), new_data);
418 Ok(true)
419 } else {
420 Ok(false)
422 }
423 } else {
424 Ok(false)
426 }
427 }
428
429 async fn clear_expired(&self) -> Result<usize> {
430 let mut states = self.states.write().await;
431 let mut expired_keys = Vec::new();
432
433 for (node_id, data) in states.iter() {
434 if let Ok(versioned) = serde_json::from_slice::<VersionedState<serde_json::Value>>(data) {
435 if versioned.is_expired() {
436 expired_keys.push(node_id.clone());
437 }
438 }
439 }
440
441 let count = expired_keys.len();
442 for key in expired_keys {
443 states.remove(&key);
444 }
445
446 Ok(count)
447 }
448}
449
450#[derive(Clone)]
456pub struct FilesystemStateManager {
457 base_path: PathBuf,
458}
459
460impl FilesystemStateManager {
461 pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self> {
463 let base_path = base_path.as_ref().to_path_buf();
464
465 fs::create_dir_all(&base_path)
467 .await
468 .map_err(|e| Mecha10Error::IoError {
469 message: format!("Failed to create state directory: {}", e),
470 suggestion: "Check directory permissions".to_string(),
471 })?;
472
473 Ok(Self { base_path })
474 }
475
476 fn state_path(&self, node_id: &str) -> PathBuf {
477 self.base_path.join(format!("{}.state.json", node_id))
478 }
479
480 fn snapshot_path(&self, snapshot_id: &str) -> PathBuf {
481 self.base_path
482 .join("snapshots")
483 .join(format!("{}.snapshot.json", snapshot_id))
484 }
485
486 fn snapshot_meta_path(&self, snapshot_id: &str) -> PathBuf {
487 self.base_path
488 .join("snapshots")
489 .join(format!("{}.meta.json", snapshot_id))
490 }
491}
492
493#[async_trait]
494impl StateManager for FilesystemStateManager {
495 async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()> {
496 self.save_with_options(node_id, state, StateOptions::default()).await
497 }
498
499 async fn save_with_options<T: Serialize + Send + Sync>(
500 &self,
501 node_id: &str,
502 state: &T,
503 options: StateOptions,
504 ) -> Result<()> {
505 let versioned = if options.ttl.is_some() {
506 VersionedState::with_ttl(state, options.version, options.ttl.unwrap())
507 } else {
508 VersionedState::new(state, options.version)
509 };
510
511 let path = self.state_path(node_id);
512
513 if options.create_backup && path.exists() {
515 let backup_path = path.with_extension("state.json.backup");
516 fs::copy(&path, backup_path).await.ok();
517 }
518
519 let json = serde_json::to_string_pretty(&versioned).map_err(|e| Mecha10Error::SerializationError {
520 message: format!("Failed to serialize state: {}", e),
521 suggestion: "Ensure state type implements Serialize".to_string(),
522 })?;
523
524 fs::write(&path, json).await.map_err(|e| Mecha10Error::IoError {
525 message: format!("Failed to write state file: {}", e),
526 suggestion: "Check file permissions".to_string(),
527 })?;
528
529 Ok(())
530 }
531
532 async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>> {
533 if let Some(versioned) = self.load_versioned::<T>(node_id).await? {
534 Ok(Some(versioned.data))
535 } else {
536 Ok(None)
537 }
538 }
539
540 async fn load_versioned<T: DeserializeOwned + Send + Sync>(
541 &self,
542 node_id: &str,
543 ) -> Result<Option<VersionedState<T>>> {
544 let path = self.state_path(node_id);
545
546 if !path.exists() {
547 return Ok(None);
548 }
549
550 let json = fs::read_to_string(&path).await.map_err(|e| Mecha10Error::IoError {
551 message: format!("Failed to read state file: {}", e),
552 suggestion: "Check file permissions".to_string(),
553 })?;
554
555 let versioned: VersionedState<T> =
556 serde_json::from_str(&json).map_err(|e| Mecha10Error::SerializationError {
557 message: format!("Failed to deserialize state: {}", e),
558 suggestion: "State may be corrupted or type mismatch".to_string(),
559 })?;
560
561 if versioned.is_expired() {
563 return Ok(None);
564 }
565
566 Ok(Some(versioned))
567 }
568
569 async fn delete(&self, node_id: &str) -> Result<()> {
570 let path = self.state_path(node_id);
571
572 if path.exists() {
573 fs::remove_file(&path).await.map_err(|e| Mecha10Error::IoError {
574 message: format!("Failed to delete state file: {}", e),
575 suggestion: "Check file permissions".to_string(),
576 })?;
577 }
578
579 Ok(())
580 }
581
582 async fn exists(&self, node_id: &str) -> Result<bool> {
583 Ok(self.state_path(node_id).exists())
584 }
585
586 async fn list_nodes(&self) -> Result<Vec<String>> {
587 let mut nodes = Vec::new();
588 let mut entries = fs::read_dir(&self.base_path).await.map_err(|e| Mecha10Error::IoError {
589 message: format!("Failed to read state directory: {}", e),
590 suggestion: "Check directory permissions".to_string(),
591 })?;
592
593 while let Some(entry) = entries.next_entry().await.map_err(|e| Mecha10Error::IoError {
594 message: format!("Failed to read directory entry: {}", e),
595 suggestion: "Check directory permissions".to_string(),
596 })? {
597 let path = entry.path();
598 if let Some(filename) = path.file_name() {
599 let filename = filename.to_string_lossy();
600 if filename.ends_with(".state.json") {
601 let node_id = filename.trim_end_matches(".state.json").to_string();
602 nodes.push(node_id);
603 }
604 }
605 }
606
607 Ok(nodes)
608 }
609
610 async fn size(&self, node_id: &str) -> Result<Option<usize>> {
611 let path = self.state_path(node_id);
612
613 if !path.exists() {
614 return Ok(None);
615 }
616
617 let metadata = fs::metadata(&path).await.map_err(|e| Mecha10Error::IoError {
618 message: format!("Failed to get file metadata: {}", e),
619 suggestion: "Check file permissions".to_string(),
620 })?;
621
622 Ok(Some(metadata.len() as usize))
623 }
624
625 async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot> {
626 let state_path = self.state_path(node_id);
627
628 if !state_path.exists() {
629 return Err(Mecha10Error::Configuration(format!("No state for node: {}", node_id)));
630 }
631
632 let snapshot_id = uuid::Uuid::new_v4().to_string();
633
634 let snapshots_dir = self.base_path.join("snapshots");
636 fs::create_dir_all(&snapshots_dir)
637 .await
638 .map_err(|e| Mecha10Error::IoError {
639 message: format!("Failed to create snapshots directory: {}", e),
640 suggestion: "Check directory permissions".to_string(),
641 })?;
642
643 let snapshot_path = self.snapshot_path(&snapshot_id);
644 fs::copy(&state_path, &snapshot_path)
645 .await
646 .map_err(|e| Mecha10Error::IoError {
647 message: format!("Failed to create snapshot: {}", e),
648 suggestion: "Check file permissions".to_string(),
649 })?;
650
651 let size_bytes = self.size(node_id).await?.unwrap_or(0);
652
653 let snapshot = StateSnapshot {
654 id: snapshot_id.clone(),
655 node_id: node_id.to_string(),
656 timestamp: now_micros(),
657 description,
658 size_bytes,
659 };
660
661 let meta_path = self.snapshot_meta_path(&snapshot_id);
663 let meta_json = serde_json::to_string_pretty(&snapshot).unwrap();
664 fs::write(&meta_path, meta_json)
665 .await
666 .map_err(|e| Mecha10Error::IoError {
667 message: format!("Failed to write snapshot metadata: {}", e),
668 suggestion: "Check file permissions".to_string(),
669 })?;
670
671 Ok(snapshot)
672 }
673
674 async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
675 let snapshot_path = self.snapshot_path(snapshot_id);
676 let meta_path = self.snapshot_meta_path(snapshot_id);
677
678 if !snapshot_path.exists() || !meta_path.exists() {
679 return Err(Mecha10Error::Configuration(format!(
680 "Snapshot not found: {}",
681 snapshot_id
682 )));
683 }
684
685 let meta_json = fs::read_to_string(&meta_path)
686 .await
687 .map_err(|e| Mecha10Error::IoError {
688 message: format!("Failed to read snapshot metadata: {}", e),
689 suggestion: "Check file permissions".to_string(),
690 })?;
691
692 let snapshot: StateSnapshot =
693 serde_json::from_str(&meta_json).map_err(|e| Mecha10Error::SerializationError {
694 message: format!("Failed to deserialize snapshot metadata: {}", e),
695 suggestion: "Snapshot metadata may be corrupted".to_string(),
696 })?;
697
698 let state_path = self.state_path(&snapshot.node_id);
699 fs::copy(&snapshot_path, &state_path)
700 .await
701 .map_err(|e| Mecha10Error::IoError {
702 message: format!("Failed to restore snapshot: {}", e),
703 suggestion: "Check file permissions".to_string(),
704 })?;
705
706 Ok(())
707 }
708
709 async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>> {
710 let snapshots_dir = self.base_path.join("snapshots");
711
712 if !snapshots_dir.exists() {
713 return Ok(Vec::new());
714 }
715
716 let mut snapshots = Vec::new();
717 let mut entries = fs::read_dir(&snapshots_dir).await.map_err(|e| Mecha10Error::IoError {
718 message: format!("Failed to read snapshots directory: {}", e),
719 suggestion: "Check directory permissions".to_string(),
720 })?;
721
722 while let Some(entry) = entries.next_entry().await.map_err(|e| Mecha10Error::IoError {
723 message: format!("Failed to read directory entry: {}", e),
724 suggestion: "Check directory permissions".to_string(),
725 })? {
726 let path = entry.path();
727 if let Some(filename) = path.file_name() {
728 let filename = filename.to_string_lossy();
729 if filename.ends_with(".meta.json") {
730 let json = fs::read_to_string(&path).await.ok();
731 if let Some(json) = json {
732 if let Ok(snapshot) = serde_json::from_str::<StateSnapshot>(&json) {
733 if snapshot.node_id == node_id {
734 snapshots.push(snapshot);
735 }
736 }
737 }
738 }
739 }
740 }
741
742 snapshots.sort_by_key(|s| s.timestamp);
743 snapshots.reverse();
744
745 Ok(snapshots)
746 }
747
748 async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()> {
749 let snapshot_path = self.snapshot_path(snapshot_id);
750 let meta_path = self.snapshot_meta_path(snapshot_id);
751
752 if snapshot_path.exists() {
753 fs::remove_file(&snapshot_path).await.ok();
754 }
755
756 if meta_path.exists() {
757 fs::remove_file(&meta_path).await.ok();
758 }
759
760 Ok(())
761 }
762
763 async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
764 &self,
765 node_id: &str,
766 expected: &T,
767 new: &T,
768 ) -> Result<bool> {
769 let path = self.state_path(node_id);
770
771 if !path.exists() {
772 return Ok(false);
773 }
774
775 let json = fs::read_to_string(&path).await.map_err(|e| Mecha10Error::IoError {
776 message: format!("Failed to read state file: {}", e),
777 suggestion: "Check file permissions".to_string(),
778 })?;
779
780 let current: VersionedState<T> = serde_json::from_str(&json).map_err(|e| Mecha10Error::SerializationError {
781 message: format!("Failed to deserialize for CAS: {}", e),
782 suggestion: "State may be corrupted".to_string(),
783 })?;
784
785 let expected_json = serde_json::to_string(expected).unwrap();
786 let current_json = serde_json::to_string(¤t.data).unwrap();
787
788 if expected_json == current_json {
789 let versioned = VersionedState::new(new, current.version + 1);
791 let new_json = serde_json::to_string_pretty(&versioned).unwrap();
792 fs::write(&path, new_json).await.map_err(|e| Mecha10Error::IoError {
793 message: format!("Failed to write state file: {}", e),
794 suggestion: "Check file permissions".to_string(),
795 })?;
796 Ok(true)
797 } else {
798 Ok(false)
800 }
801 }
802
803 async fn clear_expired(&self) -> Result<usize> {
804 let nodes = self.list_nodes().await?;
805 let mut count = 0;
806
807 for node_id in nodes {
808 if let Ok(Some(versioned)) = self.load_versioned::<serde_json::Value>(&node_id).await {
809 if versioned.is_expired() {
810 self.delete(&node_id).await?;
811 count += 1;
812 }
813 }
814 }
815
816 Ok(count)
817 }
818}
819
820#[derive(Clone)]
827pub enum ConcreteStateManager {
828 Memory(MemoryStateManager),
829 Filesystem(FilesystemStateManager),
830}
831
832impl ConcreteStateManager {
833 pub async fn save<T: Serialize + Send + Sync>(&self, node_id: &str, state: &T) -> Result<()> {
834 match self {
835 Self::Memory(m) => m.save(node_id, state).await,
836 Self::Filesystem(f) => f.save(node_id, state).await,
837 }
838 }
839
840 pub async fn save_with_options<T: Serialize + Send + Sync>(
841 &self,
842 node_id: &str,
843 state: &T,
844 options: StateOptions,
845 ) -> Result<()> {
846 match self {
847 Self::Memory(m) => m.save_with_options(node_id, state, options).await,
848 Self::Filesystem(f) => f.save_with_options(node_id, state, options).await,
849 }
850 }
851
852 pub async fn load<T: DeserializeOwned + Send + Sync>(&self, node_id: &str) -> Result<Option<T>> {
853 match self {
854 Self::Memory(m) => m.load(node_id).await,
855 Self::Filesystem(f) => f.load(node_id).await,
856 }
857 }
858
859 pub async fn load_versioned<T: DeserializeOwned + Send + Sync>(
860 &self,
861 node_id: &str,
862 ) -> Result<Option<VersionedState<T>>> {
863 match self {
864 Self::Memory(m) => m.load_versioned(node_id).await,
865 Self::Filesystem(f) => f.load_versioned(node_id).await,
866 }
867 }
868
869 pub async fn delete(&self, node_id: &str) -> Result<()> {
870 match self {
871 Self::Memory(m) => m.delete(node_id).await,
872 Self::Filesystem(f) => f.delete(node_id).await,
873 }
874 }
875
876 pub async fn exists(&self, node_id: &str) -> Result<bool> {
877 match self {
878 Self::Memory(m) => m.exists(node_id).await,
879 Self::Filesystem(f) => f.exists(node_id).await,
880 }
881 }
882
883 pub async fn list_nodes(&self) -> Result<Vec<String>> {
884 match self {
885 Self::Memory(m) => m.list_nodes().await,
886 Self::Filesystem(f) => f.list_nodes().await,
887 }
888 }
889
890 pub async fn size(&self, node_id: &str) -> Result<Option<usize>> {
891 match self {
892 Self::Memory(m) => m.size(node_id).await,
893 Self::Filesystem(f) => f.size(node_id).await,
894 }
895 }
896
897 pub async fn create_snapshot(&self, node_id: &str, description: Option<String>) -> Result<StateSnapshot> {
898 match self {
899 Self::Memory(m) => m.create_snapshot(node_id, description).await,
900 Self::Filesystem(f) => f.create_snapshot(node_id, description).await,
901 }
902 }
903
904 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()> {
905 match self {
906 Self::Memory(m) => m.restore_snapshot(snapshot_id).await,
907 Self::Filesystem(f) => f.restore_snapshot(snapshot_id).await,
908 }
909 }
910
911 pub async fn list_snapshots(&self, node_id: &str) -> Result<Vec<StateSnapshot>> {
912 match self {
913 Self::Memory(m) => m.list_snapshots(node_id).await,
914 Self::Filesystem(f) => f.list_snapshots(node_id).await,
915 }
916 }
917
918 pub async fn delete_snapshot(&self, snapshot_id: &str) -> Result<()> {
919 match self {
920 Self::Memory(m) => m.delete_snapshot(snapshot_id).await,
921 Self::Filesystem(f) => f.delete_snapshot(snapshot_id).await,
922 }
923 }
924
925 pub async fn compare_and_swap<T: Serialize + DeserializeOwned + Send + Sync>(
926 &self,
927 node_id: &str,
928 expected: &T,
929 new_value: &T,
930 ) -> Result<bool> {
931 match self {
932 Self::Memory(m) => m.compare_and_swap(node_id, expected, new_value).await,
933 Self::Filesystem(f) => f.compare_and_swap(node_id, expected, new_value).await,
934 }
935 }
936}
937
938fn now_micros() -> u64 {
943 SystemTime::now()
944 .duration_since(SystemTime::UNIX_EPOCH)
945 .unwrap()
946 .as_micros() as u64
947}
948
949