Skip to main content

synwire_agent/vfs/
store.rs

1//! Persistent cross-conversation key-value store VFS provider.
2
3use std::collections::BTreeMap;
4use std::sync::RwLock;
5
6use synwire_core::BoxFuture;
7use synwire_core::vfs::error::VfsError;
8use synwire_core::vfs::grep_options::GrepOptions;
9use synwire_core::vfs::protocol::Vfs;
10use synwire_core::vfs::types::{
11    CpOptions, DirEntry, EditResult, FileContent, GlobEntry, GrepMatch, LsOptions, RmOptions,
12    TransferResult, VfsCapabilities, WriteResult,
13};
14
15/// Namespaced key-value store that delegates to a [`BaseStore`].
16///
17/// In production this wraps a `SQLite` checkpoint.  In tests it wraps an
18/// in-memory map.  All keys are namespaced by `namespace/key`.
19pub trait BaseStore: Send + Sync {
20    /// Read a value.
21    fn get(&self, namespace: &str, key: &str) -> Result<Option<Vec<u8>>, VfsError>;
22    /// Write a value.
23    fn set(&self, namespace: &str, key: &str, value: Vec<u8>) -> Result<(), VfsError>;
24    /// Delete a value.
25    fn delete(&self, namespace: &str, key: &str) -> Result<(), VfsError>;
26    /// List all keys in a namespace.
27    fn list(&self, namespace: &str) -> Result<Vec<String>, VfsError>;
28}
29
/// In-memory [`BaseStore`] implementation for tests.
///
/// All namespaces share one ordered map guarded by an [`RwLock`]; each entry
/// is stored under the composite string `namespace/key`.
#[derive(Debug, Default)]
pub struct InMemoryStore {
    /// Composite key (`namespace/key`) mapped to the stored bytes.
    data: RwLock<BTreeMap<String, Vec<u8>>>,
}

impl InMemoryStore {
    /// Build a store that holds no entries.
    #[must_use]
    pub fn new() -> Self {
        Self {
            data: RwLock::new(BTreeMap::new()),
        }
    }

    /// Compose the map key for an entry: `namespace`, a `/`, then `key`.
    fn full_key(namespace: &str, key: &str) -> String {
        let mut composite = String::with_capacity(namespace.len() + key.len() + 1);
        composite.push_str(namespace);
        composite.push('/');
        composite.push_str(key);
        composite
    }
}
47
48impl BaseStore for InMemoryStore {
49    fn get(&self, namespace: &str, key: &str) -> Result<Option<Vec<u8>>, VfsError> {
50        let data = self
51            .data
52            .read()
53            .map_err(|_| VfsError::Unsupported("rwlock poisoned".into()))?;
54        Ok(data.get(&Self::full_key(namespace, key)).cloned())
55    }
56
57    fn set(&self, namespace: &str, key: &str, value: Vec<u8>) -> Result<(), VfsError> {
58        let _ = self
59            .data
60            .write()
61            .map_err(|_| VfsError::Unsupported("rwlock poisoned".into()))?
62            .insert(Self::full_key(namespace, key), value);
63        Ok(())
64    }
65
66    fn delete(&self, namespace: &str, key: &str) -> Result<(), VfsError> {
67        let removed = self
68            .data
69            .write()
70            .map_err(|_| VfsError::Unsupported("rwlock poisoned".into()))?
71            .remove(&Self::full_key(namespace, key));
72        if removed.is_none() {
73            return Err(VfsError::NotFound(key.to_string()));
74        }
75        Ok(())
76    }
77
78    fn list(&self, namespace: &str) -> Result<Vec<String>, VfsError> {
79        let prefix = format!("{namespace}/");
80        let keys = self
81            .data
82            .read()
83            .map_err(|_| VfsError::Unsupported("rwlock poisoned".into()))?
84            .keys()
85            .filter(|k| k.starts_with(&prefix))
86            .map(|k| k[prefix.len()..].to_string())
87            .collect();
88        Ok(keys)
89    }
90}
91
92/// Backend that wraps a [`BaseStore`] and exposes it as a [`Vfs`].
93///
94/// Keys map to paths: `/<namespace>/<key>`.
95pub struct StoreProvider {
96    namespace: String,
97    store: Box<dyn BaseStore>,
98}
99
100impl std::fmt::Debug for StoreProvider {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("StoreProvider")
103            .field("namespace", &self.namespace)
104            .finish_non_exhaustive()
105    }
106}
107
108impl StoreProvider {
109    /// Create a new store backend.
110    pub fn new(namespace: impl Into<String>, store: impl BaseStore + 'static) -> Self {
111        Self {
112            namespace: namespace.into(),
113            store: Box::new(store),
114        }
115    }
116}
117
118impl Vfs for StoreProvider {
119    fn ls(&self, _path: &str, _opts: LsOptions) -> BoxFuture<'_, Result<Vec<DirEntry>, VfsError>> {
120        Box::pin(async move {
121            let keys = self.store.list(&self.namespace)?;
122            let entries = keys
123                .into_iter()
124                .map(|k| DirEntry {
125                    path: format!("/{}/{}", self.namespace, k),
126                    name: k,
127                    is_dir: false,
128                    size: None,
129                    modified: None,
130                    permissions: None,
131                    is_symlink: false,
132                })
133                .collect();
134            Ok(entries)
135        })
136    }
137
138    fn read(&self, path: &str) -> BoxFuture<'_, Result<FileContent, VfsError>> {
139        let key = strip_namespace(path, &self.namespace);
140        Box::pin(async move {
141            let content = self
142                .store
143                .get(&self.namespace, &key)?
144                .ok_or(VfsError::NotFound(key))?;
145            Ok(FileContent {
146                content,
147                mime_type: None,
148            })
149        })
150    }
151
152    fn write(&self, path: &str, content: &[u8]) -> BoxFuture<'_, Result<WriteResult, VfsError>> {
153        let key = strip_namespace(path, &self.namespace);
154        let content = content.to_vec();
155        Box::pin(async move {
156            let bytes = content.len() as u64;
157            self.store.set(&self.namespace, &key, content)?;
158            Ok(WriteResult {
159                path: format!("/{}/{}", self.namespace, key),
160                bytes_written: bytes,
161            })
162        })
163    }
164
165    fn edit(
166        &self,
167        path: &str,
168        old: &str,
169        new: &str,
170    ) -> BoxFuture<'_, Result<EditResult, VfsError>> {
171        let key = strip_namespace(path, &self.namespace);
172        let old = old.to_string();
173        let new = new.to_string();
174        Box::pin(async move {
175            let bytes = self
176                .store
177                .get(&self.namespace, &key)?
178                .ok_or_else(|| VfsError::NotFound(key.clone()))?;
179            let text = String::from_utf8(bytes)
180                .map_err(|_| VfsError::Unsupported("binary content".into()))?;
181            if !text.contains(&old) {
182                return Ok(EditResult {
183                    path: key,
184                    edits_applied: 0,
185                    content_after: Some(text),
186                });
187            }
188            let replaced = text.replacen(&old, &new, 1);
189            let after = replaced.clone();
190            self.store
191                .set(&self.namespace, &key, replaced.into_bytes())?;
192            Ok(EditResult {
193                path: key,
194                edits_applied: 1,
195                content_after: Some(after),
196            })
197        })
198    }
199
200    fn grep(
201        &self,
202        _pattern: &str,
203        _opts: GrepOptions,
204    ) -> BoxFuture<'_, Result<Vec<GrepMatch>, VfsError>> {
205        Box::pin(async {
206            Err(VfsError::Unsupported(
207                "grep not supported on StoreProvider".into(),
208            ))
209        })
210    }
211
212    fn glob(&self, _pattern: &str) -> BoxFuture<'_, Result<Vec<GlobEntry>, VfsError>> {
213        Box::pin(async {
214            Err(VfsError::Unsupported(
215                "glob not supported on StoreProvider".into(),
216            ))
217        })
218    }
219
220    fn upload(&self, _from: &str, _to: &str) -> BoxFuture<'_, Result<TransferResult, VfsError>> {
221        Box::pin(async {
222            Err(VfsError::Unsupported(
223                "upload not supported on StoreProvider".into(),
224            ))
225        })
226    }
227
228    fn download(&self, _from: &str, _to: &str) -> BoxFuture<'_, Result<TransferResult, VfsError>> {
229        Box::pin(async {
230            Err(VfsError::Unsupported(
231                "download not supported on StoreProvider".into(),
232            ))
233        })
234    }
235
236    fn pwd(&self) -> BoxFuture<'_, Result<String, VfsError>> {
237        let ns = self.namespace.clone();
238        Box::pin(async move { Ok(format!("/{ns}")) })
239    }
240
241    fn cd(&self, _path: &str) -> BoxFuture<'_, Result<(), VfsError>> {
242        Box::pin(async {
243            Err(VfsError::Unsupported(
244                "cd not supported on StoreProvider".into(),
245            ))
246        })
247    }
248
249    fn rm(&self, path: &str, _opts: RmOptions) -> BoxFuture<'_, Result<(), VfsError>> {
250        let key = strip_namespace(path, &self.namespace);
251        Box::pin(async move { self.store.delete(&self.namespace, &key) })
252    }
253
254    fn cp(
255        &self,
256        _from: &str,
257        _to: &str,
258        _opts: CpOptions,
259    ) -> BoxFuture<'_, Result<TransferResult, VfsError>> {
260        Box::pin(async {
261            Err(VfsError::Unsupported(
262                "cp not supported on StoreProvider".into(),
263            ))
264        })
265    }
266
267    fn mv_file(&self, _from: &str, _to: &str) -> BoxFuture<'_, Result<TransferResult, VfsError>> {
268        Box::pin(async {
269            Err(VfsError::Unsupported(
270                "mv not supported on StoreProvider".into(),
271            ))
272        })
273    }
274
275    fn capabilities(&self) -> VfsCapabilities {
276        VfsCapabilities::READ | VfsCapabilities::WRITE | VfsCapabilities::RM
277    }
278
279    fn provider_name(&self) -> &'static str {
280        "StoreProvider"
281    }
282}
283
/// Turn a VFS path into a store key.
///
/// A path that begins with `/<namespace>/` has that prefix removed.  Any
/// other path only has its leading `/` characters trimmed, so a path outside
/// the namespace (or a bare key) is used as the key as-is.
fn strip_namespace(path: &str, namespace: &str) -> String {
    let rooted = format!("/{namespace}/");
    let key = match path.strip_prefix(rooted.as_str()) {
        Some(rest) => rest,
        None => path.trim_start_matches('/'),
    };
    key.to_owned()
}
290
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// A value written through a provider can be read back through it.
    #[tokio::test]
    async fn test_cross_conversation_persistence() {
        let backend = StoreProvider::new("agent1", InMemoryStore::new());

        let _ = backend
            .write("/agent1/key1", b"value1")
            .await
            .expect("write");

        let fetched = backend.read("/agent1/key1").await.expect("read");
        assert_eq!(fetched.content, b"value1");
    }

    /// Two providers over one shared store, in different namespaces, do not
    /// see each other's keys.
    #[tokio::test]
    async fn test_namespace_isolation() {
        use std::sync::Arc;

        /// Forwards every call to a shared [`InMemoryStore`] so that both
        /// providers operate on the same underlying map.
        struct SharedStore(Arc<InMemoryStore>);

        impl BaseStore for SharedStore {
            fn get(&self, ns: &str, key: &str) -> Result<Option<Vec<u8>>, VfsError> {
                self.0.get(ns, key)
            }
            fn set(&self, ns: &str, key: &str, val: Vec<u8>) -> Result<(), VfsError> {
                self.0.set(ns, key, val)
            }
            fn delete(&self, ns: &str, key: &str) -> Result<(), VfsError> {
                self.0.delete(ns, key)
            }
            fn list(&self, ns: &str) -> Result<Vec<String>, VfsError> {
                self.0.list(ns)
            }
        }

        let shared = Arc::new(InMemoryStore::new());
        let first = StoreProvider::new("ns1", SharedStore(Arc::clone(&shared)));
        let second = StoreProvider::new("ns2", SharedStore(Arc::clone(&shared)));

        let _ = first.write("/ns1/k", b"from-ns1").await.expect("write");

        // The same key name under ns2 must not resolve to ns1's value.
        let outcome = second.read("/ns2/k").await;
        assert!(outcome.is_err());
    }
}
339}