orchflow_core/
storage.rs

1use async_trait::async_trait;
2use serde_json::Value;
3
/// Transport-agnostic storage trait for state persistence.
///
/// Keys are plain strings and values are arbitrary JSON ([`Value`]). Every
/// operation is asynchronous and reports failure as a human-readable `String`.
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait StateStore: Send + Sync {
    /// Store a value with a key.
    ///
    /// NOTE(review): presumably replaces any value already stored under `key`
    /// (the in-memory `MemoryStore` does) — confirm for other backends.
    ///
    /// # Errors
    /// Returns `Err` with a message if the backend could not store the value.
    async fn set(&self, key: &str, value: Value) -> Result<(), String>;

    /// Get a value by key.
    ///
    /// Yields `Ok(Some(value))` when a value is available. `Ok(None)` presumably
    /// means "no such key", which is how `MemoryStore` behaves.
    ///
    /// # Errors
    /// Returns `Err` with a message if the lookup itself failed.
    async fn get(&self, key: &str) -> Result<Option<Value>, String>;

    /// Delete a value by key.
    ///
    /// NOTE(review): `MemoryStore` treats deleting a missing key as success;
    /// whether every backend must do so is not specified here.
    ///
    /// # Errors
    /// Returns `Err` with a message if the backend could not perform the delete.
    async fn delete(&self, key: &str) -> Result<(), String>;

    /// List all keys with optional prefix.
    ///
    /// With `Some(prefix)` the listing is meant to be restricted to that prefix;
    /// with `None` all keys are listed. No ordering is promised by this signature.
    ///
    /// # Errors
    /// Returns `Err` with a message if the keys could not be enumerated.
    async fn list_keys(&self, prefix: Option<&str>) -> Result<Vec<String>, String>;

    /// Batch get multiple values.
    ///
    /// NOTE(review): presumably returns one entry per element of `keys`, in the
    /// same order, with `None` for keys that have no value (this is what
    /// `MemoryStore` does) — confirm for other backends.
    ///
    /// # Errors
    /// Returns `Err` with a message if the batch lookup failed.
    async fn get_many(&self, keys: &[String]) -> Result<Vec<Option<Value>>, String>;

    /// Batch set multiple values.
    ///
    /// Each item is a `(key, value)` pair. Atomicity across items is not
    /// specified by this trait.
    ///
    /// # Errors
    /// Returns `Err` with a message if the batch could not be stored.
    async fn set_many(&self, items: &[(String, Value)]) -> Result<(), String>;

    /// Clear all data with optional prefix.
    ///
    /// With `Some(prefix)` only keys under that prefix are meant to be removed;
    /// with `None` everything is removed.
    ///
    /// # Errors
    /// Returns `Err` with a message if the backend could not clear the data.
    async fn clear(&self, prefix: Option<&str>) -> Result<(), String>;
}
28
29/// In-memory implementation for testing
30pub struct MemoryStore {
31    data: tokio::sync::RwLock<std::collections::HashMap<String, Value>>,
32}
33
34impl Default for MemoryStore {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl MemoryStore {
41    pub fn new() -> Self {
42        Self {
43            data: tokio::sync::RwLock::new(std::collections::HashMap::new()),
44        }
45    }
46}
47
48#[async_trait]
49impl StateStore for MemoryStore {
50    async fn set(&self, key: &str, value: Value) -> Result<(), String> {
51        let mut data = self.data.write().await;
52        data.insert(key.to_string(), value);
53        Ok(())
54    }
55
56    async fn get(&self, key: &str) -> Result<Option<Value>, String> {
57        let data = self.data.read().await;
58        Ok(data.get(key).cloned())
59    }
60
61    async fn delete(&self, key: &str) -> Result<(), String> {
62        let mut data = self.data.write().await;
63        data.remove(key);
64        Ok(())
65    }
66
67    async fn list_keys(&self, prefix: Option<&str>) -> Result<Vec<String>, String> {
68        let data = self.data.read().await;
69        let keys: Vec<String> = if let Some(prefix) = prefix {
70            data.keys()
71                .filter(|k| k.starts_with(prefix))
72                .cloned()
73                .collect()
74        } else {
75            data.keys().cloned().collect()
76        };
77        Ok(keys)
78    }
79
80    async fn get_many(&self, keys: &[String]) -> Result<Vec<Option<Value>>, String> {
81        let data = self.data.read().await;
82        let values = keys.iter().map(|k| data.get(k).cloned()).collect();
83        Ok(values)
84    }
85
86    async fn set_many(&self, items: &[(String, Value)]) -> Result<(), String> {
87        let mut data = self.data.write().await;
88        for (key, value) in items {
89            data.insert(key.clone(), value.clone());
90        }
91        Ok(())
92    }
93
94    async fn clear(&self, prefix: Option<&str>) -> Result<(), String> {
95        let mut data = self.data.write().await;
96        if let Some(prefix) = prefix {
97            data.retain(|k, _| !k.starts_with(prefix));
98        } else {
99            data.clear();
100        }
101        Ok(())
102    }
103}