Skip to main content

agent_orchestrator/store/
mod.rs

1//! Persistent Store — cross-task workflow memory with pluggable backends.
2//!
3//! Three-layer architecture (analogous to K8s StorageClass pattern):
4//! - `StoreBackendProvider` CRD: defines HOW a backend works
5//! - `WorkflowStore` CRD: defines WHAT store to use
6//! - Store entries: actual data managed by the provider
7
8mod command;
9mod file;
10mod local;
11mod validate;
12
13pub use command::CommandAdapter;
14pub use file::FileStoreBackend;
15pub use local::LocalStoreBackend;
16pub use validate::validate_schema;
17
18use crate::async_database::AsyncDatabase;
19use crate::config::{StoreBackendProviderConfig, WorkflowStoreConfig};
20use crate::crd::projection::CrdProjectable;
21use crate::crd::types::CustomResource;
22use anyhow::{Result, anyhow};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26
27/// Operations that can be performed on a store.
28#[derive(Debug, Clone)]
29pub enum StoreOp {
30    /// Load a single value by key.
31    Get {
32        /// Logical workflow-store name.
33        store_name: String,
34        /// Project scope used to namespace entries.
35        project_id: String,
36        /// Entry key to load.
37        key: String,
38    },
39    /// Upsert a single value by key.
40    Put {
41        /// Logical workflow-store name.
42        store_name: String,
43        /// Project scope used to namespace entries.
44        project_id: String,
45        /// Entry key to write.
46        key: String,
47        /// JSON payload to persist.
48        value: String,
49        /// Task responsible for the write.
50        task_id: String,
51    },
52    /// Remove a single value by key.
53    Delete {
54        /// Logical workflow-store name.
55        store_name: String,
56        /// Project scope used to namespace entries.
57        project_id: String,
58        /// Entry key to delete.
59        key: String,
60    },
61    /// List entries in a store.
62    List {
63        /// Logical workflow-store name.
64        store_name: String,
65        /// Project scope used to namespace entries.
66        project_id: String,
67        /// Maximum number of rows to return.
68        limit: u64,
69        /// Zero-based row offset.
70        offset: u64,
71    },
72    /// Apply retention pruning to a store.
73    Prune {
74        /// Logical workflow-store name.
75        store_name: String,
76        /// Project scope used to namespace entries.
77        project_id: String,
78        /// Optional maximum number of retained entries.
79        max_entries: Option<u64>,
80        /// Optional retention window in days.
81        ttl_days: Option<u64>,
82    },
83}
84
85/// Result of a store operation.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub enum StoreOpResult {
88    /// Value retrieved (None if key not found).
89    Value(Option<serde_json::Value>),
90    /// List of entries.
91    Entries(Vec<StoreEntry>),
92    /// Operation succeeded with no return value.
93    Ok,
94}
95
96/// A single entry in a workflow store.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct StoreEntry {
99    /// Entry key.
100    pub key: String,
101    /// JSON payload stored under `key`.
102    pub value: serde_json::Value,
103    /// RFC 3339 timestamp of the latest update.
104    pub updated_at: String,
105}
106
107/// Manages store operations, dispatching to the appropriate backend.
108pub struct StoreManager {
109    local_backend: LocalStoreBackend,
110    file_backend: FileStoreBackend,
111    command_adapter: CommandAdapter,
112}
113
114impl StoreManager {
115    /// Creates a store manager with built-in local and file backends.
116    pub fn new(async_db: Arc<AsyncDatabase>, data_dir: std::path::PathBuf) -> Self {
117        Self {
118            local_backend: LocalStoreBackend::new(async_db.clone()),
119            file_backend: FileStoreBackend::new(data_dir),
120            command_adapter: CommandAdapter,
121        }
122    }
123
124    /// Execute a store operation, dispatching to the correct backend.
125    ///
126    /// `custom_resources` is the CRD instance map from `OrchestratorConfig.custom_resources`.
127    pub async fn execute(
128        &self,
129        custom_resources: &HashMap<String, CustomResource>,
130        op: StoreOp,
131    ) -> Result<StoreOpResult> {
132        let store_name = match &op {
133            StoreOp::Get { store_name, .. }
134            | StoreOp::Put { store_name, .. }
135            | StoreOp::Delete { store_name, .. }
136            | StoreOp::List { store_name, .. }
137            | StoreOp::Prune { store_name, .. } => store_name.clone(),
138        };
139
140        // Resolve WorkflowStore config (auto-provision with defaults if not declared)
141        let store_config = self.resolve_store_config(custom_resources, &store_name);
142
143        // Validate schema on put
144        if let StoreOp::Put { ref value, .. } = op {
145            if let Some(ref schema) = store_config.schema {
146                let parsed: serde_json::Value = serde_json::from_str(value)
147                    .map_err(|e| anyhow!("invalid JSON value for store put: {}", e))?;
148                validate_schema(&parsed, schema)?;
149            }
150        }
151
152        let provider_name = &store_config.provider;
153        self.dispatch(custom_resources, provider_name, op).await
154    }
155
156    fn resolve_store_config(
157        &self,
158        custom_resources: &HashMap<String, CustomResource>,
159        store_name: &str,
160    ) -> WorkflowStoreConfig {
161        let key = format!("WorkflowStore/{}", store_name);
162        custom_resources
163            .get(&key)
164            .and_then(|cr| WorkflowStoreConfig::from_cr_spec(&cr.spec).ok())
165            .unwrap_or_default()
166    }
167
168    async fn dispatch(
169        &self,
170        custom_resources: &HashMap<String, CustomResource>,
171        provider_name: &str,
172        op: StoreOp,
173    ) -> Result<StoreOpResult> {
174        let provider = self.resolve_provider(custom_resources, provider_name)?;
175
176        if provider.builtin {
177            match provider_name {
178                "local" => self.local_backend.execute(op).await,
179                "file" => self.file_backend.execute(op).await,
180                _ => Err(anyhow!("unknown builtin provider: {}", provider_name)),
181            }
182        } else {
183            let commands = provider
184                .commands
185                .as_ref()
186                .ok_or_else(|| anyhow!("provider '{}' has no commands defined", provider_name))?;
187            self.command_adapter.execute(commands, op).await
188        }
189    }
190
191    fn resolve_provider(
192        &self,
193        custom_resources: &HashMap<String, CustomResource>,
194        provider_name: &str,
195    ) -> Result<StoreBackendProviderConfig> {
196        // Built-in providers don't need a CRD instance
197        match provider_name {
198            "local" | "file" => {
199                return Ok(StoreBackendProviderConfig {
200                    builtin: true,
201                    commands: None,
202                });
203            }
204            _ => {}
205        }
206
207        // Look up user-defined provider from custom_resources
208        let key = format!("StoreBackendProvider/{}", provider_name);
209        custom_resources
210            .get(&key)
211            .and_then(|cr| StoreBackendProviderConfig::from_cr_spec(&cr.spec).ok())
212            .ok_or_else(|| anyhow!("store backend provider '{}' not found", provider_name))
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    #[test]
221    fn store_op_debug() {
222        let op = StoreOp::Get {
223            store_name: "metrics".to_string(),
224            project_id: "proj1".to_string(),
225            key: "k1".to_string(),
226        };
227        let debug = format!("{:?}", op);
228        assert!(debug.contains("Get"));
229        assert!(debug.contains("metrics"));
230    }
231
232    #[test]
233    fn store_entry_serde_round_trip() {
234        let entry = StoreEntry {
235            key: "bench_001".to_string(),
236            value: serde_json::json!({"test_count": 42}),
237            updated_at: "2026-03-07T00:00:00Z".to_string(),
238        };
239        let json = serde_json::to_string(&entry).expect("serialize");
240        let back: StoreEntry = serde_json::from_str(&json).expect("deserialize");
241        assert_eq!(back.key, "bench_001");
242    }
243
244    #[test]
245    fn store_op_all_variants_debug() {
246        let variants: Vec<StoreOp> = vec![
247            StoreOp::Get {
248                store_name: "s".into(),
249                project_id: "p".into(),
250                key: "k".into(),
251            },
252            StoreOp::Put {
253                store_name: "s".into(),
254                project_id: "p".into(),
255                key: "k".into(),
256                value: "v".into(),
257                task_id: "t".into(),
258            },
259            StoreOp::Delete {
260                store_name: "s".into(),
261                project_id: "p".into(),
262                key: "k".into(),
263            },
264            StoreOp::List {
265                store_name: "s".into(),
266                project_id: "p".into(),
267                limit: 10,
268                offset: 0,
269            },
270            StoreOp::Prune {
271                store_name: "s".into(),
272                project_id: "p".into(),
273                max_entries: Some(100),
274                ttl_days: Some(30),
275            },
276        ];
277        for op in &variants {
278            let debug = format!("{:?}", op);
279            assert!(!debug.is_empty());
280        }
281    }
282
283    #[test]
284    fn store_op_result_serde_round_trip_value() {
285        let result = StoreOpResult::Value(Some(serde_json::json!("hello")));
286        let json = serde_json::to_string(&result).expect("serialize");
287        let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
288        match back {
289            StoreOpResult::Value(Some(v)) => assert_eq!(v, serde_json::json!("hello")),
290            _ => panic!("expected Value(Some)"),
291        }
292    }
293
294    #[test]
295    fn store_op_result_serde_round_trip_none() {
296        let result = StoreOpResult::Value(None);
297        let json = serde_json::to_string(&result).expect("serialize");
298        let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
299        match back {
300            StoreOpResult::Value(None) => {}
301            _ => panic!("expected Value(None)"),
302        }
303    }
304
305    #[test]
306    fn store_op_result_serde_round_trip_entries() {
307        let result = StoreOpResult::Entries(vec![StoreEntry {
308            key: "k1".to_string(),
309            value: serde_json::json!(42),
310            updated_at: "2026-01-01T00:00:00Z".to_string(),
311        }]);
312        let json = serde_json::to_string(&result).expect("serialize");
313        let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
314        match back {
315            StoreOpResult::Entries(entries) => {
316                assert_eq!(entries.len(), 1);
317                assert_eq!(entries[0].key, "k1");
318            }
319            _ => panic!("expected Entries"),
320        }
321    }
322
323    #[test]
324    fn store_op_result_serde_round_trip_ok() {
325        let result = StoreOpResult::Ok;
326        let json = serde_json::to_string(&result).expect("serialize");
327        let back: StoreOpResult = serde_json::from_str(&json).expect("deserialize");
328        assert!(matches!(back, StoreOpResult::Ok));
329    }
330
331    // ── resolve_store_config tests ──
332
333    use crate::test_utils::TestState;
334
335    fn make_store_manager() -> StoreManager {
336        let mut fixture = TestState::new();
337        let state = fixture.build();
338        StoreManager::new(
339            state.async_database.clone(),
340            std::path::PathBuf::from("/tmp"),
341        )
342    }
343
344    #[test]
345    fn resolve_store_config_returns_default_when_not_found() {
346        let mgr = make_store_manager();
347        let cr = HashMap::new();
348        let config = mgr.resolve_store_config(&cr, "nonexistent");
349        assert_eq!(config.provider, "local");
350    }
351
352    // ── resolve_provider tests ──
353
354    #[test]
355    fn resolve_provider_builtin_local() {
356        let mgr = make_store_manager();
357        let cr = HashMap::new();
358        let provider = mgr.resolve_provider(&cr, "local").unwrap();
359        assert!(provider.builtin);
360        assert!(provider.commands.is_none());
361    }
362
363    #[test]
364    fn resolve_provider_builtin_file() {
365        let mgr = make_store_manager();
366        let cr = HashMap::new();
367        let provider = mgr.resolve_provider(&cr, "file").unwrap();
368        assert!(provider.builtin);
369    }
370
371    #[test]
372    fn resolve_provider_unknown_custom_not_found() {
373        let mgr = make_store_manager();
374        let cr = HashMap::new();
375        let result = mgr.resolve_provider(&cr, "my_custom_provider");
376        assert!(result.is_err());
377        assert!(result.unwrap_err().to_string().contains("not found"));
378    }
379}