agent_orchestrator/store/
local.rs1use crate::persistence::repository::{SqliteWorkflowStoreRepository, WorkflowStoreRepository};
4use crate::store::{StoreEntry, StoreOp, StoreOpResult};
5use anyhow::{Context, Result};
6use std::sync::Arc;
7
8pub struct LocalStoreBackend {
10 repository: Arc<dyn WorkflowStoreRepository>,
11}
12
13impl LocalStoreBackend {
14 pub fn new(async_db: Arc<crate::async_database::AsyncDatabase>) -> Self {
16 Self::with_repository(Arc::new(SqliteWorkflowStoreRepository::new(async_db)))
17 }
18
19 pub fn with_repository(repository: Arc<dyn WorkflowStoreRepository>) -> Self {
21 Self { repository }
22 }
23
24 pub async fn execute(&self, op: StoreOp) -> Result<StoreOpResult> {
26 match op {
27 StoreOp::Get {
28 store_name,
29 project_id,
30 key,
31 } => self.get(&store_name, &project_id, &key).await,
32 StoreOp::Put {
33 store_name,
34 project_id,
35 key,
36 value,
37 task_id,
38 } => {
39 self.put(&store_name, &project_id, &key, &value, &task_id)
40 .await
41 }
42 StoreOp::Delete {
43 store_name,
44 project_id,
45 key,
46 } => self.delete(&store_name, &project_id, &key).await,
47 StoreOp::List {
48 store_name,
49 project_id,
50 limit,
51 offset,
52 } => self.list(&store_name, &project_id, limit, offset).await,
53 StoreOp::Prune {
54 store_name,
55 project_id,
56 max_entries,
57 ttl_days,
58 } => {
59 self.prune(&store_name, &project_id, max_entries, ttl_days)
60 .await
61 }
62 }
63 }
64
65 async fn get(&self, store_name: &str, project_id: &str, key: &str) -> Result<StoreOpResult> {
66 let result = self.repository.get(store_name, project_id, key).await?;
67
68 let parsed = result
69 .map(|v| serde_json::from_str::<serde_json::Value>(&v))
70 .transpose()
71 .context("failed to parse stored JSON value")?;
72
73 Ok(StoreOpResult::Value(parsed))
74 }
75
76 async fn put(
77 &self,
78 store_name: &str,
79 project_id: &str,
80 key: &str,
81 value: &str,
82 task_id: &str,
83 ) -> Result<StoreOpResult> {
84 self.repository
85 .put(store_name, project_id, key, value, task_id)
86 .await?;
87
88 Ok(StoreOpResult::Ok)
89 }
90
91 async fn delete(&self, store_name: &str, project_id: &str, key: &str) -> Result<StoreOpResult> {
92 self.repository.delete(store_name, project_id, key).await?;
93
94 Ok(StoreOpResult::Ok)
95 }
96
97 async fn list(
98 &self,
99 store_name: &str,
100 project_id: &str,
101 limit: u64,
102 offset: u64,
103 ) -> Result<StoreOpResult> {
104 let entries = self
105 .repository
106 .list(store_name, project_id, limit, offset)
107 .await?;
108
109 let entries: Vec<StoreEntry> = entries
110 .into_iter()
111 .filter_map(|row| {
112 let value = serde_json::from_str(&row.value_json).ok()?;
113 Some(StoreEntry {
114 key: row.key,
115 value,
116 updated_at: row.updated_at,
117 })
118 })
119 .collect();
120
121 Ok(StoreOpResult::Entries(entries))
122 }
123
124 async fn prune(
125 &self,
126 store_name: &str,
127 project_id: &str,
128 max_entries: Option<u64>,
129 ttl_days: Option<u64>,
130 ) -> Result<StoreOpResult> {
131 self.repository
132 .prune(store_name, project_id, max_entries, ttl_days)
133 .await?;
134
135 Ok(StoreOpResult::Ok)
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestState;

    /// A value written with `put` is readable via `get` and is gone after `delete`.
    #[tokio::test]
    async fn put_get_delete_round_trip() {
        // The fixture stays bound for the whole test so it outlives the backend.
        let mut fixture = TestState::new();
        let state = fixture.build();

        let backend = LocalStoreBackend::new(state.async_database.clone());

        let put_outcome = backend
            .put(
                "metrics",
                "",
                "bench_001",
                r#"{"test_count": 42}"#,
                "task-1",
            )
            .await
            .expect("put should succeed");
        assert!(matches!(put_outcome, StoreOpResult::Ok));

        let fetched = backend
            .get("metrics", "", "bench_001")
            .await
            .expect("get should succeed");
        match fetched {
            StoreOpResult::Value(Some(stored)) => {
                assert_eq!(stored["test_count"], 42);
            }
            other => panic!("expected Value(Some(...)), got {:?}", other),
        }

        let delete_outcome = backend
            .delete("metrics", "", "bench_001")
            .await
            .expect("delete should succeed");
        assert!(matches!(delete_outcome, StoreOpResult::Ok));

        let after_delete = backend
            .get("metrics", "", "bench_001")
            .await
            .expect("get should succeed");
        assert!(matches!(after_delete, StoreOpResult::Value(None)));
    }

    /// Two keys written to the same store and project both show up in `list`.
    #[tokio::test]
    async fn list_returns_entries() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let backend = LocalStoreBackend::new(state.async_database.clone());

        backend
            .put("metrics", "p1", "k1", r#"{"v": 1}"#, "t1")
            .await
            .expect("put k1");
        backend
            .put("metrics", "p1", "k2", r#"{"v": 2}"#, "t1")
            .await
            .expect("put k2");

        let listing = backend
            .list("metrics", "p1", 100, 0)
            .await
            .expect("list should succeed");
        match listing {
            StoreOpResult::Entries(found) => {
                assert_eq!(found.len(), 2);
            }
            other => panic!("expected Entries, got {:?}", other),
        }
    }

    /// A second `put` to the same key replaces the value seen by `get`.
    #[tokio::test]
    async fn put_upserts_existing_key() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let backend = LocalStoreBackend::new(state.async_database.clone());

        backend
            .put("s", "", "k", r#"{"v": 1}"#, "t1")
            .await
            .expect("first put");
        backend
            .put("s", "", "k", r#"{"v": 2}"#, "t2")
            .await
            .expect("second put (upsert)");

        let latest = backend.get("s", "", "k").await.expect("get");
        match latest {
            StoreOpResult::Value(Some(stored)) => assert_eq!(stored["v"], 2),
            other => panic!("expected Value(Some(...)), got {:?}", other),
        }
    }
}