Skip to main content

agent_orchestrator/store/
file.rs

1//! File store backend — filesystem-based persistent store.
2
3use crate::store::{StoreEntry, StoreOp, StoreOpResult};
4use anyhow::{Context, Result};
5use std::path::PathBuf;
6
7/// Filesystem-backed workflow store implementation.
8pub struct FileStoreBackend {
9    data_dir: PathBuf,
10}
11
12impl FileStoreBackend {
13    /// Creates a file-store backend rooted at the orchestrator app directory.
14    pub fn new(data_dir: PathBuf) -> Self {
15        Self { data_dir }
16    }
17
18    /// Executes a store operation against JSON files on disk.
19    pub async fn execute(&self, op: StoreOp) -> Result<StoreOpResult> {
20        match op {
21            StoreOp::Get {
22                store_name,
23                project_id,
24                key,
25            } => self.get(&store_name, &project_id, &key),
26            StoreOp::Put {
27                store_name,
28                project_id,
29                key,
30                value,
31                ..
32            } => self.put(&store_name, &project_id, &key, &value),
33            StoreOp::Delete {
34                store_name,
35                project_id,
36                key,
37            } => self.delete(&store_name, &project_id, &key),
38            StoreOp::List {
39                store_name,
40                project_id,
41                limit,
42                offset,
43            } => self.list(&store_name, &project_id, limit, offset),
44            StoreOp::Prune {
45                store_name,
46                project_id,
47                max_entries,
48                ..
49            } => self.prune(&store_name, &project_id, max_entries),
50        }
51    }
52
53    fn store_dir(&self, store_name: &str, project_id: &str) -> PathBuf {
54        let pid = if project_id.trim().is_empty() {
55            crate::config::DEFAULT_PROJECT_ID
56        } else {
57            project_id
58        };
59        self.data_dir
60            .join("data")
61            .join("stores")
62            .join(store_name)
63            .join(pid)
64    }
65
66    fn entry_path(&self, store_name: &str, project_id: &str, key: &str) -> PathBuf {
67        self.store_dir(store_name, project_id)
68            .join(format!("{}.json", key))
69    }
70
71    fn get(&self, store_name: &str, project_id: &str, key: &str) -> Result<StoreOpResult> {
72        let path = self.entry_path(store_name, project_id, key);
73        if !path.exists() {
74            return Ok(StoreOpResult::Value(None));
75        }
76        let content = std::fs::read_to_string(&path).context("failed to read store entry file")?;
77        let value: serde_json::Value =
78            serde_json::from_str(&content).context("failed to parse store entry JSON")?;
79        Ok(StoreOpResult::Value(Some(value)))
80    }
81
82    fn put(
83        &self,
84        store_name: &str,
85        project_id: &str,
86        key: &str,
87        value: &str,
88    ) -> Result<StoreOpResult> {
89        let path = self.entry_path(store_name, project_id, key);
90        if let Some(parent) = path.parent() {
91            std::fs::create_dir_all(parent).context("failed to create store directory")?;
92        }
93        std::fs::write(&path, value).context("failed to write store entry file")?;
94        Ok(StoreOpResult::Ok)
95    }
96
97    fn delete(&self, store_name: &str, project_id: &str, key: &str) -> Result<StoreOpResult> {
98        let path = self.entry_path(store_name, project_id, key);
99        if path.exists() {
100            std::fs::remove_file(&path).context("failed to delete store entry file")?;
101        }
102        Ok(StoreOpResult::Ok)
103    }
104
105    fn list(
106        &self,
107        store_name: &str,
108        project_id: &str,
109        limit: u64,
110        offset: u64,
111    ) -> Result<StoreOpResult> {
112        let dir = self.store_dir(store_name, project_id);
113        if !dir.exists() {
114            return Ok(StoreOpResult::Entries(vec![]));
115        }
116
117        let mut entries: Vec<StoreEntry> = Vec::new();
118        let read_dir = std::fs::read_dir(&dir).context("failed to read store directory")?;
119
120        let mut files: Vec<_> = read_dir
121            .filter_map(|e| e.ok())
122            .filter(|e| e.path().extension().is_some_and(|ext| ext == "json"))
123            .collect();
124
125        // Sort by modification time (newest first)
126        files.sort_by(|a, b| {
127            let t_a = a.metadata().and_then(|m| m.modified()).ok();
128            let t_b = b.metadata().and_then(|m| m.modified()).ok();
129            t_b.cmp(&t_a)
130        });
131
132        for entry in files.into_iter().skip(offset as usize).take(limit as usize) {
133            let path = entry.path();
134            let key = path
135                .file_stem()
136                .and_then(|s| s.to_str())
137                .unwrap_or("")
138                .to_string();
139            let content = match std::fs::read_to_string(&path) {
140                Ok(c) => c,
141                Err(_) => continue,
142            };
143            let value = match serde_json::from_str(&content) {
144                Ok(v) => v,
145                Err(_) => continue,
146            };
147            let updated_at = entry
148                .metadata()
149                .and_then(|m| m.modified())
150                .ok()
151                .and_then(|t| {
152                    let duration = t.duration_since(std::time::UNIX_EPOCH).ok()?;
153                    Some(
154                        chrono::DateTime::from_timestamp(duration.as_secs() as i64, 0)?
155                            .to_rfc3339(),
156                    )
157                })
158                .unwrap_or_default();
159
160            entries.push(StoreEntry {
161                key,
162                value,
163                updated_at,
164            });
165        }
166
167        Ok(StoreOpResult::Entries(entries))
168    }
169
170    fn prune(
171        &self,
172        store_name: &str,
173        project_id: &str,
174        max_entries: Option<u64>,
175    ) -> Result<StoreOpResult> {
176        let dir = self.store_dir(store_name, project_id);
177        if !dir.exists() {
178            return Ok(StoreOpResult::Ok);
179        }
180
181        if let Some(max) = max_entries {
182            let read_dir = std::fs::read_dir(&dir).context("failed to read store directory")?;
183            let mut files: Vec<_> = read_dir
184                .filter_map(|e| e.ok())
185                .filter(|e| e.path().extension().is_some_and(|ext| ext == "json"))
186                .collect();
187
188            files.sort_by(|a, b| {
189                let t_a = a.metadata().and_then(|m| m.modified()).ok();
190                let t_b = b.metadata().and_then(|m| m.modified()).ok();
191                t_b.cmp(&t_a)
192            });
193
194            // Remove entries beyond max
195            for entry in files.into_iter().skip(max as usize) {
196                let _ = std::fs::remove_file(entry.path());
197            }
198        }
199
200        Ok(StoreOpResult::Ok)
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use super::*;
207
208    #[tokio::test]
209    async fn file_put_get_delete() {
210        let temp = tempfile::tempdir().expect("tempdir");
211        let backend = FileStoreBackend::new(temp.path().to_path_buf());
212
213        // Put
214        let result = backend
215            .execute(StoreOp::Put {
216                store_name: "metrics".to_string(),
217                project_id: "".to_string(),
218                key: "k1".to_string(),
219                value: r#"{"count": 10}"#.to_string(),
220                task_id: "t1".to_string(),
221            })
222            .await
223            .expect("put");
224        assert!(matches!(result, StoreOpResult::Ok));
225
226        // Get
227        let result = backend
228            .execute(StoreOp::Get {
229                store_name: "metrics".to_string(),
230                project_id: "".to_string(),
231                key: "k1".to_string(),
232            })
233            .await
234            .expect("get");
235        match result {
236            StoreOpResult::Value(Some(v)) => assert_eq!(v["count"], 10),
237            other => panic!("expected Value(Some), got {:?}", other),
238        }
239
240        // Delete
241        let result = backend
242            .execute(StoreOp::Delete {
243                store_name: "metrics".to_string(),
244                project_id: "".to_string(),
245                key: "k1".to_string(),
246            })
247            .await
248            .expect("delete");
249        assert!(matches!(result, StoreOpResult::Ok));
250
251        // Get after delete
252        let result = backend
253            .execute(StoreOp::Get {
254                store_name: "metrics".to_string(),
255                project_id: "".to_string(),
256                key: "k1".to_string(),
257            })
258            .await
259            .expect("get after delete");
260        assert!(matches!(result, StoreOpResult::Value(None)));
261    }
262
263    #[tokio::test]
264    async fn file_list_entries() {
265        let temp = tempfile::tempdir().expect("tempdir");
266        let backend = FileStoreBackend::new(temp.path().to_path_buf());
267
268        backend
269            .execute(StoreOp::Put {
270                store_name: "s".to_string(),
271                project_id: "p".to_string(),
272                key: "a".to_string(),
273                value: r#"{"v": 1}"#.to_string(),
274                task_id: "".to_string(),
275            })
276            .await
277            .expect("put a");
278        backend
279            .execute(StoreOp::Put {
280                store_name: "s".to_string(),
281                project_id: "p".to_string(),
282                key: "b".to_string(),
283                value: r#"{"v": 2}"#.to_string(),
284                task_id: "".to_string(),
285            })
286            .await
287            .expect("put b");
288
289        let result = backend
290            .execute(StoreOp::List {
291                store_name: "s".to_string(),
292                project_id: "p".to_string(),
293                limit: 100,
294                offset: 0,
295            })
296            .await
297            .expect("list");
298        match result {
299            StoreOpResult::Entries(entries) => assert_eq!(entries.len(), 2),
300            other => panic!("expected Entries, got {:?}", other),
301        }
302    }
303
304    #[tokio::test]
305    async fn list_with_offset_skips_entries() {
306        let temp = tempfile::tempdir().expect("tempdir");
307        let backend = FileStoreBackend::new(temp.path().to_path_buf());
308
309        for i in 0..5 {
310            backend
311                .execute(StoreOp::Put {
312                    store_name: "s".to_string(),
313                    project_id: "p".to_string(),
314                    key: format!("k{}", i),
315                    value: format!(r#"{{"v": {}}}"#, i),
316                    task_id: "".to_string(),
317                })
318                .await
319                .expect("put");
320        }
321
322        let result = backend
323            .execute(StoreOp::List {
324                store_name: "s".to_string(),
325                project_id: "p".to_string(),
326                limit: 100,
327                offset: 3,
328            })
329            .await
330            .expect("list with offset");
331        match result {
332            StoreOpResult::Entries(entries) => assert_eq!(entries.len(), 2),
333            other => panic!("expected Entries, got {:?}", other),
334        }
335    }
336
337    #[tokio::test]
338    async fn list_on_nonexistent_directory() {
339        let temp = tempfile::tempdir().expect("tempdir");
340        let backend = FileStoreBackend::new(temp.path().to_path_buf());
341
342        let result = backend
343            .execute(StoreOp::List {
344                store_name: "no_such_store".to_string(),
345                project_id: "no_such_project".to_string(),
346                limit: 10,
347                offset: 0,
348            })
349            .await
350            .expect("list nonexistent");
351        match result {
352            StoreOpResult::Entries(entries) => assert!(entries.is_empty()),
353            other => panic!("expected empty Entries, got {:?}", other),
354        }
355    }
356
357    #[tokio::test]
358    async fn prune_with_max_entries() {
359        let temp = tempfile::tempdir().expect("tempdir");
360        let backend = FileStoreBackend::new(temp.path().to_path_buf());
361
362        for i in 0..4 {
363            backend
364                .execute(StoreOp::Put {
365                    store_name: "s".to_string(),
366                    project_id: "p".to_string(),
367                    key: format!("k{}", i),
368                    value: format!(r#"{{"v": {}}}"#, i),
369                    task_id: "".to_string(),
370                })
371                .await
372                .expect("put");
373        }
374
375        let result = backend
376            .execute(StoreOp::Prune {
377                store_name: "s".to_string(),
378                project_id: "p".to_string(),
379                max_entries: Some(2),
380                ttl_days: None,
381            })
382            .await
383            .expect("prune");
384        assert!(matches!(result, StoreOpResult::Ok));
385
386        let result = backend
387            .execute(StoreOp::List {
388                store_name: "s".to_string(),
389                project_id: "p".to_string(),
390                limit: 100,
391                offset: 0,
392            })
393            .await
394            .expect("list after prune");
395        match result {
396            StoreOpResult::Entries(entries) => assert_eq!(entries.len(), 2),
397            other => panic!("expected 2 entries after prune, got {:?}", other),
398        }
399    }
400
401    #[tokio::test]
402    async fn prune_on_nonexistent_directory() {
403        let temp = tempfile::tempdir().expect("tempdir");
404        let backend = FileStoreBackend::new(temp.path().to_path_buf());
405
406        let result = backend
407            .execute(StoreOp::Prune {
408                store_name: "no_such".to_string(),
409                project_id: "no_such".to_string(),
410                max_entries: Some(5),
411                ttl_days: None,
412            })
413            .await
414            .expect("prune nonexistent");
415        assert!(matches!(result, StoreOpResult::Ok));
416    }
417
418    #[tokio::test]
419    async fn prune_with_none_max_entries_is_noop() {
420        let temp = tempfile::tempdir().expect("tempdir");
421        let backend = FileStoreBackend::new(temp.path().to_path_buf());
422
423        for i in 0..3 {
424            backend
425                .execute(StoreOp::Put {
426                    store_name: "s".to_string(),
427                    project_id: "p".to_string(),
428                    key: format!("k{}", i),
429                    value: format!(r#"{{"v": {}}}"#, i),
430                    task_id: "".to_string(),
431                })
432                .await
433                .expect("put");
434        }
435
436        let result = backend
437            .execute(StoreOp::Prune {
438                store_name: "s".to_string(),
439                project_id: "p".to_string(),
440                max_entries: None,
441                ttl_days: None,
442            })
443            .await
444            .expect("prune none");
445        assert!(matches!(result, StoreOpResult::Ok));
446
447        // All 3 entries should still be present.
448        let result = backend
449            .execute(StoreOp::List {
450                store_name: "s".to_string(),
451                project_id: "p".to_string(),
452                limit: 100,
453                offset: 0,
454            })
455            .await
456            .expect("list after no-op prune");
457        match result {
458            StoreOpResult::Entries(entries) => assert_eq!(entries.len(), 3),
459            other => panic!("expected 3 entries, got {:?}", other),
460        }
461    }
462
463    #[tokio::test]
464    async fn delete_nonexistent_key_returns_ok() {
465        let temp = tempfile::tempdir().expect("tempdir");
466        let backend = FileStoreBackend::new(temp.path().to_path_buf());
467
468        let result = backend
469            .execute(StoreOp::Delete {
470                store_name: "s".to_string(),
471                project_id: "p".to_string(),
472                key: "nonexistent".to_string(),
473            })
474            .await
475            .expect("delete nonexistent");
476        assert!(matches!(result, StoreOpResult::Ok));
477    }
478
479    #[tokio::test]
480    async fn get_nonexistent_key_returns_none() {
481        let temp = tempfile::tempdir().expect("tempdir");
482        let backend = FileStoreBackend::new(temp.path().to_path_buf());
483
484        let result = backend
485            .execute(StoreOp::Get {
486                store_name: "s".to_string(),
487                project_id: "p".to_string(),
488                key: "nonexistent".to_string(),
489            })
490            .await
491            .expect("get nonexistent");
492        assert!(matches!(result, StoreOpResult::Value(None)));
493    }
494
495    #[tokio::test]
496    async fn put_with_nonempty_project_id() {
497        let temp = tempfile::tempdir().expect("tempdir");
498        let backend = FileStoreBackend::new(temp.path().to_path_buf());
499
500        let result = backend
501            .execute(StoreOp::Put {
502                store_name: "metrics".to_string(),
503                project_id: "my-project".to_string(),
504                key: "k1".to_string(),
505                value: r#"{"x": 42}"#.to_string(),
506                task_id: "t1".to_string(),
507            })
508            .await
509            .expect("put with project_id");
510        assert!(matches!(result, StoreOpResult::Ok));
511
512        // Verify the file lands under the correct project directory.
513        let expected = temp.path().join("data/stores/metrics/my-project/k1.json");
514        assert!(
515            expected.exists(),
516            "entry should exist at project-specific path"
517        );
518
519        // Confirm retrieval works with the same project_id.
520        let result = backend
521            .execute(StoreOp::Get {
522                store_name: "metrics".to_string(),
523                project_id: "my-project".to_string(),
524                key: "k1".to_string(),
525            })
526            .await
527            .expect("get with project_id");
528        match result {
529            StoreOpResult::Value(Some(v)) => assert_eq!(v["x"], 42),
530            other => panic!("expected Value(Some), got {:?}", other),
531        }
532    }
533
534    #[tokio::test]
535    async fn empty_project_id_uses_default() {
536        let temp = tempfile::tempdir().expect("tempdir");
537        let backend = FileStoreBackend::new(temp.path().to_path_buf());
538
539        // Put with empty project_id — should fall back to DEFAULT_PROJECT_ID.
540        backend
541            .execute(StoreOp::Put {
542                store_name: "s".to_string(),
543                project_id: "".to_string(),
544                key: "k1".to_string(),
545                value: r#"{"v": 1}"#.to_string(),
546                task_id: "".to_string(),
547            })
548            .await
549            .expect("put empty pid");
550
551        let default_path = temp
552            .path()
553            .join("data/stores/s")
554            .join(crate::config::DEFAULT_PROJECT_ID)
555            .join("k1.json");
556        assert!(
557            default_path.exists(),
558            "empty project_id should map to DEFAULT_PROJECT_ID directory"
559        );
560
561        // Put with explicit project_id — should use that path instead.
562        backend
563            .execute(StoreOp::Put {
564                store_name: "s".to_string(),
565                project_id: "custom".to_string(),
566                key: "k1".to_string(),
567                value: r#"{"v": 2}"#.to_string(),
568                task_id: "".to_string(),
569            })
570            .await
571            .expect("put custom pid");
572
573        let custom_path = temp.path().join("data/stores/s/custom/k1.json");
574        assert!(
575            custom_path.exists(),
576            "explicit project_id should use its own directory"
577        );
578
579        // The two entries are isolated — listing empty pid should return only 1.
580        let result = backend
581            .execute(StoreOp::List {
582                store_name: "s".to_string(),
583                project_id: "".to_string(),
584                limit: 100,
585                offset: 0,
586            })
587            .await
588            .expect("list empty pid");
589        match result {
590            StoreOpResult::Entries(entries) => assert_eq!(entries.len(), 1),
591            other => panic!("expected 1 entry, got {:?}", other),
592        }
593    }
594}