Skip to main content

agent_orchestrator/store/
command.rs

1//! Command adapter — shell-based generic backend for user-defined providers.
2
3use crate::config::StoreBackendCommands;
4use crate::store::{StoreEntry, StoreOp, StoreOpResult};
5use anyhow::{Result, anyhow};
6
/// Adapter for store backends implemented as shell commands.
///
/// The adapter carries no state of its own: everything it needs (the command
/// templates and the operation to perform) is passed to `execute` on each call.
/// Because it is a zero-sized unit struct it is cheap to copy, and the common
/// traits are derived so it can be embedded in `Debug`/`Default` containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct CommandAdapter;
9
10impl CommandAdapter {
11    /// Executes a store operation by expanding provider command templates and parsing stdout.
12    pub async fn execute(
13        &self,
14        commands: &StoreBackendCommands,
15        op: StoreOp,
16    ) -> Result<StoreOpResult> {
17        let (cmd_template, env_vars, parse_mode) = match &op {
18            StoreOp::Get {
19                store_name,
20                project_id,
21                key,
22            } => (
23                &commands.get,
24                vec![
25                    ("STORE_NAME", store_name.as_str()),
26                    ("PROJECT_ID", project_id.as_str()),
27                    ("KEY", key.as_str()),
28                ],
29                ParseMode::Value,
30            ),
31            StoreOp::Put {
32                store_name,
33                project_id,
34                key,
35                value,
36                task_id,
37            } => (
38                &commands.put,
39                vec![
40                    ("STORE_NAME", store_name.as_str()),
41                    ("PROJECT_ID", project_id.as_str()),
42                    ("KEY", key.as_str()),
43                    ("VALUE", value.as_str()),
44                    ("TASK_ID", task_id.as_str()),
45                ],
46                ParseMode::None,
47            ),
48            StoreOp::Delete {
49                store_name,
50                project_id,
51                key,
52            } => (
53                &commands.delete,
54                vec![
55                    ("STORE_NAME", store_name.as_str()),
56                    ("PROJECT_ID", project_id.as_str()),
57                    ("KEY", key.as_str()),
58                ],
59                ParseMode::None,
60            ),
61            StoreOp::List {
62                store_name,
63                project_id,
64                ..
65            } => (
66                &commands.list,
67                vec![
68                    ("STORE_NAME", store_name.as_str()),
69                    ("PROJECT_ID", project_id.as_str()),
70                ],
71                ParseMode::Entries,
72            ),
73            StoreOp::Prune {
74                store_name,
75                project_id,
76                ..
77            } => {
78                let cmd = commands
79                    .prune
80                    .as_ref()
81                    .ok_or_else(|| anyhow!("provider does not support prune operation"))?;
82                (
83                    cmd,
84                    vec![
85                        ("STORE_NAME", store_name.as_str()),
86                        ("PROJECT_ID", project_id.as_str()),
87                    ],
88                    ParseMode::None,
89                )
90            }
91        };
92
93        // Build env vars, handling the limit/offset/max_entries/ttl_days cases
94        let mut envs: Vec<(String, String)> = env_vars
95            .iter()
96            .map(|(k, v)| (k.to_string(), v.to_string()))
97            .collect();
98
99        // Add numeric env vars for list/prune
100        match &op {
101            StoreOp::List { limit, offset, .. } => {
102                envs.retain(|e| e.0 != "LIMIT" && e.0 != "OFFSET");
103                envs.push(("LIMIT".to_string(), limit.to_string()));
104                envs.push(("OFFSET".to_string(), offset.to_string()));
105            }
106            StoreOp::Prune {
107                max_entries,
108                ttl_days,
109                ..
110            } => {
111                if let Some(max) = max_entries {
112                    envs.push(("MAX_ENTRIES".to_string(), max.to_string()));
113                }
114                if let Some(ttl) = ttl_days {
115                    envs.push(("TTL_DAYS".to_string(), ttl.to_string()));
116                }
117            }
118            _ => {}
119        }
120
121        let output = tokio::process::Command::new("sh")
122            .arg("-c")
123            .arg(cmd_template)
124            .envs(envs)
125            .output()
126            .await
127            .map_err(|e| anyhow!("failed to execute provider command: {}", e))?;
128
129        if !output.status.success() {
130            let stderr = String::from_utf8_lossy(&output.stderr);
131            return Err(anyhow!(
132                "provider command failed (exit {}): {}",
133                output.status.code().unwrap_or(-1),
134                stderr.trim()
135            ));
136        }
137
138        let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
139
140        match parse_mode {
141            ParseMode::Value => {
142                if stdout.is_empty() {
143                    Ok(StoreOpResult::Value(None))
144                } else {
145                    let value: serde_json::Value = serde_json::from_str(&stdout)
146                        .map_err(|e| anyhow!("failed to parse provider get output: {}", e))?;
147                    Ok(StoreOpResult::Value(Some(value)))
148                }
149            }
150            ParseMode::Entries => {
151                if stdout.is_empty() {
152                    Ok(StoreOpResult::Entries(vec![]))
153                } else {
154                    let entries: Vec<StoreEntry> = serde_json::from_str(&stdout)
155                        .map_err(|e| anyhow!("failed to parse provider list output: {}", e))?;
156                    Ok(StoreOpResult::Entries(entries))
157                }
158            }
159            ParseMode::None => Ok(StoreOpResult::Ok),
160        }
161    }
162}
163
/// How the stdout of a provider command is turned into a `StoreOpResult`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParseMode {
    /// Stdout holds a single JSON value, or is empty when there is no value.
    Value,
    /// Stdout holds a JSON array of store entries, or is empty for no entries.
    Entries,
    /// Stdout is ignored; only the exit status matters.
    None,
}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StoreBackendCommands;

    /// Command set in which `get`, `put` and `delete` are `true`, `list` prints an
    /// empty JSON array, and prune is not configured. Tests override single fields
    /// with struct-update syntax.
    fn stub_commands() -> StoreBackendCommands {
        StoreBackendCommands {
            get: "true".to_string(),
            put: "true".to_string(),
            delete: "true".to_string(),
            list: "echo '[]'".to_string(),
            prune: None,
        }
    }

    /// File-backed command set that keeps each value in
    /// `<base>/<STORE_NAME>-<KEY>.json`.
    fn file_commands(base: &str) -> StoreBackendCommands {
        StoreBackendCommands {
            get: format!(
                "cat {}/\"$STORE_NAME\"-\"$KEY\".json 2>/dev/null || true",
                base
            ),
            put: format!("echo \"$VALUE\" > {}/\"$STORE_NAME\"-\"$KEY\".json", base),
            delete: format!("rm -f {}/\"$STORE_NAME\"-\"$KEY\".json", base),
            list: "echo '[]'".to_string(),
            prune: None,
        }
    }

    /// `Get` operation with an empty project id.
    fn get_op(store_name: &str, key: &str) -> StoreOp {
        StoreOp::Get {
            store_name: store_name.to_string(),
            project_id: "".to_string(),
            key: key.to_string(),
        }
    }

    /// `Put` operation with an empty project id and task id `t1`.
    fn put_op(store_name: &str, key: &str, value: &str) -> StoreOp {
        StoreOp::Put {
            store_name: store_name.to_string(),
            project_id: "".to_string(),
            key: key.to_string(),
            value: value.to_string(),
            task_id: "t1".to_string(),
        }
    }

    /// Runs `op` against `commands` through a fresh adapter.
    async fn run(commands: &StoreBackendCommands, op: StoreOp) -> Result<StoreOpResult> {
        let adapter = CommandAdapter;
        adapter.execute(commands, op).await
    }

    #[tokio::test]
    async fn command_adapter_put_get() {
        let temp = tempfile::tempdir().expect("tempdir");
        let commands = file_commands(temp.path().to_str().expect("path"));

        // Store a value...
        let stored = run(&commands, put_op("test", "k1", r#"{"hello": "world"}"#))
            .await
            .expect("put");
        assert!(matches!(stored, StoreOpResult::Ok));

        // ...and read it back as parsed JSON.
        let fetched = run(&commands, get_op("test", "k1")).await.expect("get");
        match fetched {
            StoreOpResult::Value(Some(v)) => assert_eq!(v["hello"], "world"),
            other => panic!("expected Value(Some), got {:?}", other),
        }
    }

    #[tokio::test]
    async fn command_adapter_get_missing_returns_none() {
        // A get command that prints only a newline counts as "no value".
        let commands = StoreBackendCommands {
            get: "echo ''".to_string(),
            ..stub_commands()
        };

        let fetched = run(&commands, get_op("s", "missing")).await.expect("get");
        assert!(matches!(fetched, StoreOpResult::Value(None)));
    }

    #[tokio::test]
    async fn command_adapter_prune_unsupported() {
        let commands = stub_commands();

        let outcome = run(
            &commands,
            StoreOp::Prune {
                store_name: "s".to_string(),
                project_id: "".to_string(),
                max_entries: None,
                ttl_days: None,
            },
        )
        .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn command_adapter_delete() {
        let temp = tempfile::tempdir().expect("tempdir");
        let commands = file_commands(temp.path().to_str().expect("path"));

        // Seed a value so there is something to delete.
        run(&commands, put_op("test", "k1", r#"{"a":1}"#))
            .await
            .expect("put");

        let deleted = run(
            &commands,
            StoreOp::Delete {
                store_name: "test".to_string(),
                project_id: "".to_string(),
                key: "k1".to_string(),
            },
        )
        .await
        .expect("delete");
        assert!(matches!(deleted, StoreOpResult::Ok));

        // The key must no longer resolve to a value.
        let fetched = run(&commands, get_op("test", "k1"))
            .await
            .expect("get after delete");
        assert!(matches!(fetched, StoreOpResult::Value(None)));
    }

    #[tokio::test]
    async fn command_adapter_list_empty() {
        let commands = stub_commands();

        let listed = run(
            &commands,
            StoreOp::List {
                store_name: "s".to_string(),
                project_id: "p".to_string(),
                limit: 10,
                offset: 0,
            },
        )
        .await
        .expect("list");
        match listed {
            StoreOpResult::Entries(entries) => assert!(entries.is_empty()),
            other => panic!("expected Entries([]), got {:?}", other),
        }
    }

    #[tokio::test]
    async fn command_adapter_list_with_entries() {
        let json = r#"[{"key":"k1","value":{"x":1},"updated_at":"2025-01-01T00:00:00Z"},{"key":"k2","value":"hello","updated_at":"2025-01-02T00:00:00Z"}]"#;
        let commands = StoreBackendCommands {
            list: format!("echo '{}'", json),
            ..stub_commands()
        };

        let listed = run(
            &commands,
            StoreOp::List {
                store_name: "s".to_string(),
                project_id: "p".to_string(),
                limit: 50,
                offset: 0,
            },
        )
        .await
        .expect("list");
        match listed {
            StoreOpResult::Entries(entries) => {
                assert_eq!(entries.len(), 2);
                assert_eq!(entries[0].key, "k1");
                assert_eq!(entries[0].value, serde_json::json!({"x": 1}));
                assert_eq!(entries[0].updated_at, "2025-01-01T00:00:00Z");
                assert_eq!(entries[1].key, "k2");
                assert_eq!(entries[1].value, serde_json::json!("hello"));
            }
            other => panic!("expected Entries, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn command_adapter_prune_supported() {
        let commands = StoreBackendCommands {
            prune: Some("true".to_string()),
            ..stub_commands()
        };

        let pruned = run(
            &commands,
            StoreOp::Prune {
                store_name: "s".to_string(),
                project_id: "p".to_string(),
                max_entries: None,
                ttl_days: None,
            },
        )
        .await
        .expect("prune");
        assert!(matches!(pruned, StoreOpResult::Ok));
    }

    #[tokio::test]
    async fn command_adapter_prune_with_env_vars() {
        // The prune command exits 0 only when MAX_ENTRIES and TTL_DAYS hold the
        // expected values, so a successful run proves the adapter exported them.
        let commands = StoreBackendCommands {
            prune: Some(
                "test \"$MAX_ENTRIES\" = \"100\" && test \"$TTL_DAYS\" = \"30\"".to_string(),
            ),
            ..stub_commands()
        };

        let pruned = run(
            &commands,
            StoreOp::Prune {
                store_name: "s".to_string(),
                project_id: "p".to_string(),
                max_entries: Some(100),
                ttl_days: Some(30),
            },
        )
        .await
        .expect("prune with env vars");
        assert!(matches!(pruned, StoreOpResult::Ok));
    }

    #[tokio::test]
    async fn command_adapter_failed_command() {
        let commands = StoreBackendCommands {
            get: "echo 'something went wrong' >&2; exit 1".to_string(),
            ..stub_commands()
        };

        let outcome = run(&commands, get_op("s", "k")).await;
        assert!(outcome.is_err());

        // The error must name the failure and relay the command's stderr.
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("provider command failed"));
        assert!(message.contains("something went wrong"));
    }

    #[tokio::test]
    async fn command_adapter_list_with_limit_offset() {
        // The list command only succeeds when LIMIT and OFFSET carry the requested values.
        let commands = StoreBackendCommands {
            list: "test \"$LIMIT\" = \"5\" && test \"$OFFSET\" = \"10\" && echo '[]'".to_string(),
            ..stub_commands()
        };

        let listed = run(
            &commands,
            StoreOp::List {
                store_name: "s".to_string(),
                project_id: "p".to_string(),
                limit: 5,
                offset: 10,
            },
        )
        .await
        .expect("list with limit/offset");
        match listed {
            StoreOpResult::Entries(entries) => assert!(entries.is_empty()),
            other => panic!("expected Entries([]), got {:?}", other),
        }
    }
}