celerix_store/engine/
memstore.rs

1use std::collections::HashMap;
2use std::sync::{Arc, RwLock};
3use async_trait::async_trait;
4use crate::{Result, Error, KVReader, KVWriter, AppEnumeration, BatchExporter, GlobalSearcher, Orchestrator, CelerixStore, AppScope, VaultScope};
5use crate::engine::{Persistence, vault};
6
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9type StoreData = HashMap<String, HashMap<String, HashMap<String, serde_json::Value>>>;
10
11pub struct MemStore {
12    data: RwLock<StoreData>,
13    persistence: Option<Arc<Persistence>>,
14    pending_tasks: Arc<AtomicUsize>,
15}
16
17impl MemStore {
18    pub fn new(initial_data: StoreData, persistence: Option<Arc<Persistence>>) -> Self {
19        Self {
20            data: RwLock::new(initial_data),
21            persistence,
22            pending_tasks: Arc::new(AtomicUsize::new(0)),
23        }
24    }
25
26    pub async fn wait(&self) {
27        while self.pending_tasks.load(Ordering::SeqCst) > 0 {
28            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
29        }
30    }
31
32    fn copy_persona_data(&self, persona_id: &str) -> Option<HashMap<String, HashMap<String, serde_json::Value>>> {
33        let data = self.data.read().unwrap();
34        data.get(persona_id).cloned()
35    }
36
37    async fn persist(&self, persona_id: String) {
38        if let Some(p) = &self.persistence {
39            if let Some(persona_data) = self.copy_persona_data(&persona_id) {
40                let p = p.clone();
41                let pending = self.pending_tasks.clone();
42                pending.fetch_add(1, Ordering::SeqCst);
43                tokio::task::spawn_blocking(move || {
44                    if let Err(e) = p.save_persona(&persona_id, &persona_data) {
45                        log::error!("Failed to persist persona {}: {}", persona_id, e);
46                    }
47                    pending.fetch_sub(1, Ordering::SeqCst);
48                });
49            }
50        }
51    }
52}
53
54#[async_trait]
55impl KVReader for MemStore {
56    async fn get(&self, persona_id: &str, app_id: &str, key: &str) -> Result<serde_json::Value> {
57        let data = self.data.read().unwrap();
58        let persona = data.get(persona_id);
59        
60        if persona.is_none() {
61            // Log for debugging if needed
62            // log::debug!("Persona {} not found, checking legacy or empty", persona_id);
63            return Err(Error::PersonaNotFound);
64        }
65
66        persona.unwrap()
67            .get(app_id)
68            .ok_or(Error::AppNotFound)?
69            .get(key)
70            .cloned()
71            .ok_or(Error::KeyNotFound)
72    }
73}
74
75#[async_trait]
76impl KVWriter for MemStore {
77    async fn set(&self, persona_id: &str, app_id: &str, key: &str, value: serde_json::Value) -> Result<()> {
78        {
79            let mut data = self.data.write().unwrap();
80            let persona = data.entry(persona_id.to_string()).or_default();
81            let app = persona.entry(app_id.to_string()).or_default();
82            app.insert(key.to_string(), value);
83        }
84        self.persist(persona_id.to_string()).await;
85        Ok(())
86    }
87
88    async fn delete(&self, persona_id: &str, app_id: &str, key: &str) -> Result<()> {
89        {
90            let mut data = self.data.write().unwrap();
91            if let Some(persona) = data.get_mut(persona_id) {
92                if let Some(app) = persona.get_mut(app_id) {
93                    app.remove(key);
94                }
95            }
96        }
97        self.persist(persona_id.to_string()).await;
98        Ok(())
99    }
100}
101
102#[async_trait]
103impl AppEnumeration for MemStore {
104    async fn get_personas(&self) -> Result<Vec<String>> {
105        let data = self.data.read().unwrap();
106        Ok(data.keys().cloned().collect())
107    }
108
109    async fn get_apps(&self, persona_id: &str) -> Result<Vec<String>> {
110        let data = self.data.read().unwrap();
111        Ok(data.get(persona_id)
112            .map(|p| p.keys().cloned().collect())
113            .unwrap_or_default())
114    }
115}
116
117#[async_trait]
118impl BatchExporter for MemStore {
119    async fn get_app_store(&self, persona_id: &str, app_id: &str) -> Result<HashMap<String, serde_json::Value>> {
120        let data = self.data.read().unwrap();
121        data.get(persona_id)
122            .ok_or(Error::PersonaNotFound)?
123            .get(app_id)
124            .cloned()
125            .ok_or(Error::AppNotFound)
126    }
127
128    async fn dump_app(&self, app_id: &str) -> Result<HashMap<String, HashMap<String, serde_json::Value>>> {
129        let data = self.data.read().unwrap();
130        let mut result = HashMap::new();
131        for (persona_id, apps) in data.iter() {
132            if let Some(app_data) = apps.get(app_id) {
133                result.insert(persona_id.clone(), app_data.clone());
134            }
135        }
136        Ok(result)
137    }
138}
139
140#[async_trait]
141impl GlobalSearcher for MemStore {
142    async fn get_global(&self, app_id: &str, key: &str) -> Result<(serde_json::Value, String)> {
143        let data = self.data.read().unwrap();
144        for (persona_id, apps) in data.iter() {
145            if let Some(app_data) = apps.get(app_id) {
146                if let Some(val) = app_data.get(key) {
147                    return Ok((val.clone(), persona_id.clone()));
148                }
149            }
150        }
151        Err(Error::KeyNotFound)
152    }
153}
154
155#[async_trait]
156impl Orchestrator for MemStore {
157    async fn move_key(&self, src_persona: &str, dst_persona: &str, app_id: &str, key: &str) -> Result<()> {
158        let val = {
159            let mut data = self.data.write().unwrap();
160            let src_persona_data = data.get_mut(src_persona).ok_or(Error::PersonaNotFound)?;
161            let src_app_data = src_persona_data.get_mut(app_id).ok_or(Error::AppNotFound)?;
162            src_app_data.remove(key).ok_or(Error::KeyNotFound)?
163        };
164
165        self.set(dst_persona, app_id, key, val).await?;
166        self.persist(src_persona.to_string()).await;
167        
168        Ok(())
169    }
170}
171
172impl CelerixStore for MemStore {
173    fn app(&self, persona_id: &str, app_id: &str) -> Box<dyn AppScope + '_> {
174        Box::new(MemAppScope {
175            store: self,
176            persona_id: persona_id.to_string(),
177            app_id: app_id.to_string(),
178        })
179    }
180}
181
182pub struct MemAppScope<'a> {
183    store: &'a MemStore,
184    persona_id: String,
185    app_id: String,
186}
187
188#[async_trait]
189impl<'a> AppScope for MemAppScope<'a> {
190    async fn get(&self, key: &str) -> Result<serde_json::Value> {
191        self.store.get(&self.persona_id, &self.app_id, key).await
192    }
193
194    async fn set(&self, key: &str, value: serde_json::Value) -> Result<()> {
195        self.store.set(&self.persona_id, &self.app_id, key, value).await
196    }
197
198    async fn delete(&self, key: &str) -> Result<()> {
199        self.store.delete(&self.persona_id, &self.app_id, key).await
200    }
201
202    fn vault(&self, master_key: &[u8]) -> Box<dyn VaultScope + '_> {
203        Box::new(MemVaultScope {
204            app: self,
205            master_key: master_key.to_vec(),
206        })
207    }
208}
209
210pub struct MemVaultScope<'a> {
211    app: &'a MemAppScope<'a>,
212    master_key: Vec<u8>,
213}
214
215#[async_trait]
216impl<'a> VaultScope for MemVaultScope<'a> {
217    async fn get(&self, key: &str) -> Result<String> {
218        let val = self.app.get(key).await?;
219        let cipher_hex = val.as_str().ok_or_else(|| Error::Internal("Vault data is not a string".to_string()))?;
220        vault::decrypt(cipher_hex, &self.master_key)
221    }
222
223    async fn set(&self, key: &str, plaintext: &str) -> Result<()> {
224        let cipher_hex = vault::encrypt(plaintext, &self.master_key)?;
225        self.app.set(key, serde_json::Value::String(cipher_hex)).await
226    }
227}
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232    use serde_json::json;
233
234    #[tokio::test]
235    async fn test_memstore_get_set() {
236        let store = MemStore::new(HashMap::new(), None);
237        store.set("p1", "app1", "k1", json!("v1")).await.unwrap();
238        
239        let val = store.get("p1", "app1", "k1").await.unwrap();
240        assert_eq!(val, json!("v1"));
241    }
242
243    #[tokio::test]
244    async fn test_memstore_delete() {
245        let store = MemStore::new(HashMap::new(), None);
246        store.set("p1", "app1", "k1", json!("v1")).await.unwrap();
247        store.delete("p1", "app1", "k1").await.unwrap();
248        
249        let res = store.get("p1", "app1", "k1").await;
250        assert!(matches!(res, Err(Error::KeyNotFound)));
251    }
252
253    #[tokio::test]
254    async fn test_move_key() {
255        let store = MemStore::new(HashMap::new(), None);
256        store.set("p1", "app1", "k1", json!("v1")).await.unwrap();
257        store.move_key("p1", "p2", "app1", "k1").await.unwrap();
258        
259        assert!(matches!(store.get("p1", "app1", "k1").await, Err(Error::KeyNotFound)));
260        assert_eq!(store.get("p2", "app1", "k1").await.unwrap(), json!("v1"));
261    }
262
263    #[tokio::test]
264    async fn test_app_scope_and_vault() {
265        let store = MemStore::new(HashMap::new(), None);
266        let master_key = b"thisis32byteslongsecretkey123456";
267
268        let scope = store.app("p1", "a1");
269        scope.set("secret", json!("hidden")).await.unwrap();
270
271        let val = scope.get("secret").await.unwrap();
272        assert_eq!(val, json!("hidden"));
273
274        let v = scope.vault(master_key);
275        v.set("password", "topsecret").await.unwrap();
276
277        let pass = v.get("password").await.unwrap();
278        assert_eq!(pass, "topsecret");
279
280        // Check that it's encrypted in the underlying store
281        let raw = scope.get("password").await.unwrap();
282        assert_ne!(raw, json!("topsecret"));
283        assert!(raw.is_string());
284    }
285}