agent_orchestrator/store/
mod.rs1mod command;
9mod file;
10mod local;
11mod validate;
12
13pub use command::CommandAdapter;
14pub use file::FileStoreBackend;
15pub use local::LocalStoreBackend;
16pub use validate::validate_schema;
17
18use crate::async_database::AsyncDatabase;
19use crate::config::{StoreBackendProviderConfig, WorkflowStoreConfig};
20use crate::crd::projection::CrdProjectable;
21use crate::crd::types::CustomResource;
22use anyhow::{Result, anyhow};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26
27#[derive(Debug, Clone)]
29pub enum StoreOp {
30 Get {
32 store_name: String,
34 project_id: String,
36 key: String,
38 },
39 Put {
41 store_name: String,
43 project_id: String,
45 key: String,
47 value: String,
49 task_id: String,
51 },
52 Delete {
54 store_name: String,
56 project_id: String,
58 key: String,
60 },
61 List {
63 store_name: String,
65 project_id: String,
67 limit: u64,
69 offset: u64,
71 },
72 Prune {
74 store_name: String,
76 project_id: String,
78 max_entries: Option<u64>,
80 ttl_days: Option<u64>,
82 },
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub enum StoreOpResult {
88 Value(Option<serde_json::Value>),
90 Entries(Vec<StoreEntry>),
92 Ok,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct StoreEntry {
99 pub key: String,
101 pub value: serde_json::Value,
103 pub updated_at: String,
105}
106
107pub struct StoreManager {
109 local_backend: LocalStoreBackend,
110 file_backend: FileStoreBackend,
111 command_adapter: CommandAdapter,
112}
113
114impl StoreManager {
115 pub fn new(async_db: Arc<AsyncDatabase>, data_dir: std::path::PathBuf) -> Self {
117 Self {
118 local_backend: LocalStoreBackend::new(async_db.clone()),
119 file_backend: FileStoreBackend::new(data_dir),
120 command_adapter: CommandAdapter,
121 }
122 }
123
124 pub async fn execute(
128 &self,
129 custom_resources: &HashMap<String, CustomResource>,
130 op: StoreOp,
131 ) -> Result<StoreOpResult> {
132 let store_name = match &op {
133 StoreOp::Get { store_name, .. }
134 | StoreOp::Put { store_name, .. }
135 | StoreOp::Delete { store_name, .. }
136 | StoreOp::List { store_name, .. }
137 | StoreOp::Prune { store_name, .. } => store_name.clone(),
138 };
139
140 let store_config = self.resolve_store_config(custom_resources, &store_name);
142
143 if let StoreOp::Put { ref value, .. } = op {
145 if let Some(ref schema) = store_config.schema {
146 let parsed: serde_json::Value = serde_json::from_str(value)
147 .map_err(|e| anyhow!("invalid JSON value for store put: {}", e))?;
148 validate_schema(&parsed, schema)?;
149 }
150 }
151
152 let provider_name = &store_config.provider;
153 self.dispatch(custom_resources, provider_name, op).await
154 }
155
156 fn resolve_store_config(
157 &self,
158 custom_resources: &HashMap<String, CustomResource>,
159 store_name: &str,
160 ) -> WorkflowStoreConfig {
161 let key = format!("WorkflowStore/{}", store_name);
162 custom_resources
163 .get(&key)
164 .and_then(|cr| WorkflowStoreConfig::from_cr_spec(&cr.spec).ok())
165 .unwrap_or_default()
166 }
167
168 async fn dispatch(
169 &self,
170 custom_resources: &HashMap<String, CustomResource>,
171 provider_name: &str,
172 op: StoreOp,
173 ) -> Result<StoreOpResult> {
174 let provider = self.resolve_provider(custom_resources, provider_name)?;
175
176 if provider.builtin {
177 match provider_name {
178 "local" => self.local_backend.execute(op).await,
179 "file" => self.file_backend.execute(op).await,
180 _ => Err(anyhow!("unknown builtin provider: {}", provider_name)),
181 }
182 } else {
183 let commands = provider
184 .commands
185 .as_ref()
186 .ok_or_else(|| anyhow!("provider '{}' has no commands defined", provider_name))?;
187 self.command_adapter.execute(commands, op).await
188 }
189 }
190
191 fn resolve_provider(
192 &self,
193 custom_resources: &HashMap<String, CustomResource>,
194 provider_name: &str,
195 ) -> Result<StoreBackendProviderConfig> {
196 match provider_name {
198 "local" | "file" => {
199 return Ok(StoreBackendProviderConfig {
200 builtin: true,
201 commands: None,
202 });
203 }
204 _ => {}
205 }
206
207 let key = format!("StoreBackendProvider/{}", provider_name);
209 custom_resources
210 .get(&key)
211 .and_then(|cr| StoreBackendProviderConfig::from_cr_spec(&cr.spec).ok())
212 .ok_or_else(|| anyhow!("store backend provider '{}' not found", provider_name))
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219
220 #[test]
221 fn store_op_debug() {
222 let op = StoreOp::Get {
223 store_name: "metrics".to_string(),
224 project_id: "proj1".to_string(),
225 key: "k1".to_string(),
226 };
227 let debug = format!("{:?}", op);
228 assert!(debug.contains("Get"));
229 assert!(debug.contains("metrics"));
230 }
231
232 #[test]
233 fn store_entry_serde_round_trip() {
234 let entry = StoreEntry {
235 key: "bench_001".to_string(),
236 value: serde_json::json!({"test_count": 42}),
237 updated_at: "2026-03-07T00:00:00Z".to_string(),
238 };
239 let json = serde_json::to_string(&entry).expect("serialize");
240 let back: StoreEntry = serde_json::from_str(&json).expect("deserialize");
241 assert_eq!(back.key, "bench_001");
242 }
243
244 #[test]
245 fn store_op_all_variants_debug() {
246 let variants: Vec<StoreOp> = vec![
247 StoreOp::Get {
248 store_name: "s".into(),
249 project_id: "p".into(),
250 key: "k".into(),
251 },
252 StoreOp::Put {
253 store_name: "s".into(),
254 project_id: "p".into(),
255 key: "k".into(),
256 value: "v".into(),
257 task_id: "t".into(),
258 },
259 StoreOp::Delete {
260 store_name: "s".into(),
261 project_id: "p".into(),
262 key: "k".into(),
263 },
264 StoreOp::List {
265 store_name: "s".into(),
266 project_id: "p".into(),
267 limit: 10,
268 offset: 0,
269 },
270 StoreOp::Prune {
271 store_name: "s".into(),
272 project_id: "p".into(),
273 max_entries: Some(100),
274 ttl_days: Some(30),
275 },
276 ];
277 for op in &variants {
278 let debug = format!("{:?}", op);
279 assert!(!debug.is_empty());
280 }
281 }
282
283 #[test]
284 fn store_op_result_serde_round_trip_value() {
285 let result = StoreOpResult::Value(Some(serde_json::json!("hello")));
286 let json = serde_json::to_string(&result).expect("serialize");
287 let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
288 match back {
289 StoreOpResult::Value(Some(v)) => assert_eq!(v, serde_json::json!("hello")),
290 _ => panic!("expected Value(Some)"),
291 }
292 }
293
294 #[test]
295 fn store_op_result_serde_round_trip_none() {
296 let result = StoreOpResult::Value(None);
297 let json = serde_json::to_string(&result).expect("serialize");
298 let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
299 match back {
300 StoreOpResult::Value(None) => {}
301 _ => panic!("expected Value(None)"),
302 }
303 }
304
305 #[test]
306 fn store_op_result_serde_round_trip_entries() {
307 let result = StoreOpResult::Entries(vec![StoreEntry {
308 key: "k1".to_string(),
309 value: serde_json::json!(42),
310 updated_at: "2026-01-01T00:00:00Z".to_string(),
311 }]);
312 let json = serde_json::to_string(&result).expect("serialize");
313 let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
314 match back {
315 StoreOpResult::Entries(entries) => {
316 assert_eq!(entries.len(), 1);
317 assert_eq!(entries[0].key, "k1");
318 }
319 _ => panic!("expected Entries"),
320 }
321 }
322
323 #[test]
324 fn store_op_result_serde_round_trip_ok() {
325 let result = StoreOpResult::Ok;
326 let json = serde_json::to_string(&result).expect("serialize");
327 let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
328 assert!(matches!(back, StoreOpResult::Ok));
329 }
330
331 use crate::test_utils::TestState;
334
335 fn make_store_manager() -> StoreManager {
336 let mut fixture = TestState::new();
337 let state = fixture.build();
338 StoreManager::new(
339 state.async_database.clone(),
340 std::path::PathBuf::from("/tmp"),
341 )
342 }
343
344 #[test]
345 fn resolve_store_config_returns_default_when_not_found() {
346 let mgr = make_store_manager();
347 let cr = HashMap::new();
348 let config = mgr.resolve_store_config(&cr, "nonexistent");
349 assert_eq!(config.provider, "local");
350 }
351
352 #[test]
355 fn resolve_provider_builtin_local() {
356 let mgr = make_store_manager();
357 let cr = HashMap::new();
358 let provider = mgr.resolve_provider(&cr, "local").unwrap();
359 assert!(provider.builtin);
360 assert!(provider.commands.is_none());
361 }
362
363 #[test]
364 fn resolve_provider_builtin_file() {
365 let mgr = make_store_manager();
366 let cr = HashMap::new();
367 let provider = mgr.resolve_provider(&cr, "file").unwrap();
368 assert!(provider.builtin);
369 }
370
371 #[test]
372 fn resolve_provider_unknown_custom_not_found() {
373 let mgr = make_store_manager();
374 let cr = HashMap::new();
375 let result = mgr.resolve_provider(&cr, "my_custom_provider");
376 assert!(result.is_err());
377 assert!(result.unwrap_err().to_string().contains("not found"));
378 }
379}