Skip to main content

agent_orchestrator/store/
local.rs

1//! Local store backend — SQLite-based persistent store via repository boundary.
2
3use crate::persistence::repository::{SqliteWorkflowStoreRepository, WorkflowStoreRepository};
4use crate::store::{StoreEntry, StoreOp, StoreOpResult};
5use anyhow::{Context, Result};
6use std::sync::Arc;
7
8/// SQLite-backed workflow store implementation.
9pub struct LocalStoreBackend {
10    repository: Arc<dyn WorkflowStoreRepository>,
11}
12
13impl LocalStoreBackend {
14    /// Creates a backend backed by the default SQLite repository.
15    pub fn new(async_db: Arc<crate::async_database::AsyncDatabase>) -> Self {
16        Self::with_repository(Arc::new(SqliteWorkflowStoreRepository::new(async_db)))
17    }
18
19    /// Creates a backend from an injected repository implementation.
20    pub fn with_repository(repository: Arc<dyn WorkflowStoreRepository>) -> Self {
21        Self { repository }
22    }
23
24    /// Executes a store operation using the repository boundary.
25    pub async fn execute(&self, op: StoreOp) -> Result<StoreOpResult> {
26        match op {
27            StoreOp::Get {
28                store_name,
29                project_id,
30                key,
31            } => self.get(&store_name, &project_id, &key).await,
32            StoreOp::Put {
33                store_name,
34                project_id,
35                key,
36                value,
37                task_id,
38            } => {
39                self.put(&store_name, &project_id, &key, &value, &task_id)
40                    .await
41            }
42            StoreOp::Delete {
43                store_name,
44                project_id,
45                key,
46            } => self.delete(&store_name, &project_id, &key).await,
47            StoreOp::List {
48                store_name,
49                project_id,
50                limit,
51                offset,
52            } => self.list(&store_name, &project_id, limit, offset).await,
53            StoreOp::Prune {
54                store_name,
55                project_id,
56                max_entries,
57                ttl_days,
58            } => {
59                self.prune(&store_name, &project_id, max_entries, ttl_days)
60                    .await
61            }
62        }
63    }
64
65    async fn get(&self, store_name: &str, project_id: &str, key: &str) -> Result<StoreOpResult> {
66        let result = self.repository.get(store_name, project_id, key).await?;
67
68        let parsed = result
69            .map(|v| serde_json::from_str::<serde_json::Value>(&v))
70            .transpose()
71            .context("failed to parse stored JSON value")?;
72
73        Ok(StoreOpResult::Value(parsed))
74    }
75
76    async fn put(
77        &self,
78        store_name: &str,
79        project_id: &str,
80        key: &str,
81        value: &str,
82        task_id: &str,
83    ) -> Result<StoreOpResult> {
84        self.repository
85            .put(store_name, project_id, key, value, task_id)
86            .await?;
87
88        Ok(StoreOpResult::Ok)
89    }
90
91    async fn delete(&self, store_name: &str, project_id: &str, key: &str) -> Result<StoreOpResult> {
92        self.repository.delete(store_name, project_id, key).await?;
93
94        Ok(StoreOpResult::Ok)
95    }
96
97    async fn list(
98        &self,
99        store_name: &str,
100        project_id: &str,
101        limit: u64,
102        offset: u64,
103    ) -> Result<StoreOpResult> {
104        let entries = self
105            .repository
106            .list(store_name, project_id, limit, offset)
107            .await?;
108
109        let entries: Vec<StoreEntry> = entries
110            .into_iter()
111            .filter_map(|row| {
112                let value = serde_json::from_str(&row.value_json).ok()?;
113                Some(StoreEntry {
114                    key: row.key,
115                    value,
116                    updated_at: row.updated_at,
117                })
118            })
119            .collect();
120
121        Ok(StoreOpResult::Entries(entries))
122    }
123
124    async fn prune(
125        &self,
126        store_name: &str,
127        project_id: &str,
128        max_entries: Option<u64>,
129        ttl_days: Option<u64>,
130    ) -> Result<StoreOpResult> {
131        self.repository
132            .prune(store_name, project_id, max_entries, ttl_days)
133            .await?;
134
135        Ok(StoreOpResult::Ok)
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142    use crate::test_utils::TestState;
143
144    #[tokio::test]
145    async fn put_get_delete_round_trip() {
146        let mut fixture = TestState::new();
147        let state = fixture.build();
148
149        let backend = LocalStoreBackend::new(state.async_database.clone());
150
151        // Put
152        let result = backend
153            .put(
154                "metrics",
155                "",
156                "bench_001",
157                r#"{"test_count": 42}"#,
158                "task-1",
159            )
160            .await
161            .expect("put should succeed");
162        assert!(matches!(result, StoreOpResult::Ok));
163
164        // Get
165        let result = backend
166            .get("metrics", "", "bench_001")
167            .await
168            .expect("get should succeed");
169        match result {
170            StoreOpResult::Value(Some(v)) => {
171                assert_eq!(v["test_count"], 42);
172            }
173            other => panic!("expected Value(Some(...)), got {:?}", other),
174        }
175
176        // Delete
177        let result = backend
178            .delete("metrics", "", "bench_001")
179            .await
180            .expect("delete should succeed");
181        assert!(matches!(result, StoreOpResult::Ok));
182
183        // Get after delete
184        let result = backend
185            .get("metrics", "", "bench_001")
186            .await
187            .expect("get should succeed");
188        assert!(matches!(result, StoreOpResult::Value(None)));
189    }
190
191    #[tokio::test]
192    async fn list_returns_entries() {
193        let mut fixture = TestState::new();
194        let state = fixture.build();
195
196        let backend = LocalStoreBackend::new(state.async_database.clone());
197
198        backend
199            .put("metrics", "p1", "k1", r#"{"v": 1}"#, "t1")
200            .await
201            .expect("put k1");
202        backend
203            .put("metrics", "p1", "k2", r#"{"v": 2}"#, "t1")
204            .await
205            .expect("put k2");
206
207        let result = backend
208            .list("metrics", "p1", 100, 0)
209            .await
210            .expect("list should succeed");
211        match result {
212            StoreOpResult::Entries(entries) => {
213                assert_eq!(entries.len(), 2);
214            }
215            other => panic!("expected Entries, got {:?}", other),
216        }
217    }
218
219    #[tokio::test]
220    async fn put_upserts_existing_key() {
221        let mut fixture = TestState::new();
222        let state = fixture.build();
223
224        let backend = LocalStoreBackend::new(state.async_database.clone());
225
226        backend
227            .put("s", "", "k", r#"{"v": 1}"#, "t1")
228            .await
229            .expect("first put");
230        backend
231            .put("s", "", "k", r#"{"v": 2}"#, "t2")
232            .await
233            .expect("second put (upsert)");
234
235        let result = backend.get("s", "", "k").await.expect("get");
236        match result {
237            StoreOpResult::Value(Some(v)) => assert_eq!(v["v"], 2),
238            other => panic!("expected Value(Some(...)), got {:?}", other),
239        }
240    }
241}