greentic_dev/
tests_exec.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use anyhow::{Result, anyhow, bail};
5use async_trait::async_trait;
6use greentic_secrets::{Result as SecretResult, SecretError, SecretsManager};
7use greentic_types::flow::{Flow, Node, Routing};
8use serde_json::{Value as JsonValue, json};
9
10#[derive(Clone)]
11pub struct ExecOptions {
12    pub offline: bool,
13    pub external_enabled: bool,
14    pub mock_external: bool,
15    pub mock_external_payload: JsonValue,
16    pub secrets: Arc<dyn SecretsManager>,
17}
18
19impl ExecOptions {
20    pub fn builder() -> ExecOptionsBuilder {
21        ExecOptionsBuilder::default()
22    }
23}
24
25#[derive(Default)]
26pub struct ExecOptionsBuilder {
27    offline: bool,
28    external_enabled: bool,
29    mock_external: bool,
30    mock_external_payload: JsonValue,
31    secrets_env_prefix: String,
32}
33
34impl ExecOptionsBuilder {
35    pub fn offline(mut self, offline: bool) -> Self {
36        self.offline = offline;
37        self
38    }
39
40    pub fn external_enabled(mut self, enabled: bool) -> Self {
41        self.external_enabled = enabled;
42        self
43    }
44
45    pub fn mock_external(mut self, enabled: bool) -> Self {
46        self.mock_external = enabled;
47        self
48    }
49
50    pub fn mock_external_payload(mut self, payload: JsonValue) -> Self {
51        self.mock_external_payload = payload;
52        self
53    }
54
55    pub fn secrets_env_prefix(mut self, prefix: &str) -> Self {
56        self.secrets_env_prefix = prefix.to_string();
57        self
58    }
59
60    pub fn build(self) -> ExecOptions {
61        let secrets = MemorySecrets::from_env_prefix(&self.secrets_env_prefix);
62        ExecOptions {
63            offline: self.offline,
64            external_enabled: self.external_enabled,
65            mock_external: self.mock_external,
66            mock_external_payload: self.mock_external_payload,
67            secrets: Arc::new(secrets),
68        }
69    }
70}
71
/// In-memory secret store backed by a shared, mutex-guarded map.
///
/// Cloning is cheap: clones share the same underlying map through the `Arc`.
#[derive(Clone, Default)]
pub struct MemorySecrets {
    inner: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl MemorySecrets {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `value` (as its UTF-8 bytes) under `key`, replacing any
    /// previous entry.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn insert_str(&self, key: &str, value: &str) {
        let mut guard = self.inner.lock().expect("mutex poisoned");
        guard.insert(key.to_string(), value.as_bytes().to_vec());
    }

    /// Builds a store from every environment variable whose name starts with
    /// `prefix`; the prefix is stripped from the stored key.
    ///
    /// An empty `prefix` returns an empty store rather than importing the
    /// whole environment. Variables whose name or value is not valid Unicode
    /// are skipped.
    pub fn from_env_prefix(prefix: &str) -> Self {
        let mgr = Self::new();
        if prefix.is_empty() {
            return mgr;
        }
        // `vars_os` is used instead of `vars`: the latter panics as soon as
        // it meets any non-Unicode variable, even an unrelated one.
        for (key, value) in std::env::vars_os() {
            if let (Some(key), Some(value)) = (key.to_str(), value.to_str()) {
                if let Some(stripped) = key.strip_prefix(prefix) {
                    mgr.insert_str(stripped, value);
                }
            }
        }
        mgr
    }
}
100
101#[async_trait]
102impl SecretsManager for MemorySecrets {
103    async fn read(&self, path: &str) -> SecretResult<Vec<u8>> {
104        let guard = self.inner.lock().expect("mutex poisoned");
105        guard
106            .get(path)
107            .cloned()
108            .ok_or_else(|| SecretError::NotFound(path.to_string()))
109    }
110
111    async fn write(&self, path: &str, bytes: &[u8]) -> SecretResult<()> {
112        let mut guard = self.inner.lock().expect("mutex poisoned");
113        guard.insert(path.to_string(), bytes.to_vec());
114        Ok(())
115    }
116
117    async fn delete(&self, path: &str) -> SecretResult<()> {
118        let mut guard = self.inner.lock().expect("mutex poisoned");
119        guard.remove(path);
120        Ok(())
121    }
122}
123
124pub fn execute(flow: &Flow, input: &JsonValue) -> Result<JsonValue> {
125    let opts = ExecOptionsBuilder::default().build();
126    execute_with_options(flow, input, &opts)
127}
128
129pub fn execute_with_options(
130    flow: &Flow,
131    input: &JsonValue,
132    opts: &ExecOptions,
133) -> Result<JsonValue> {
134    let nodes: HashMap<_, _> = flow
135        .nodes
136        .iter()
137        .map(|(id, node)| (id.clone(), node.clone()))
138        .collect();
139    let mut current = flow
140        .ingress()
141        .map(|(id, _)| id.clone())
142        .ok_or_else(|| anyhow!("flow has no ingress"))?;
143    let mut payload = input.clone();
144    let mut trace = Vec::new();
145    let mut last_status = String::from("ok");
146
147    loop {
148        let Some(node) = nodes.get(&current) else {
149            bail!("node `{current}` missing");
150        };
151        let (status, next_payload) = exec_node(node, &payload, opts)?;
152        trace.push(json!({
153            "node_id": node.id.as_str(),
154            "component": node.component.id.as_str(),
155            "status": status,
156            "payload": next_payload
157        }));
158        payload = next_payload;
159        if status == "error" || last_status == "error" {
160            last_status = "error".to_string();
161        } else {
162            last_status = status.clone();
163        }
164        current = match &node.routing {
165            Routing::Next { node_id } => node_id.clone(),
166            Routing::Branch { on_status, default } => {
167                if let Some(dest) = on_status.get(&status) {
168                    dest.clone()
169                } else if let Some(def) = default {
170                    def.clone()
171                } else {
172                    bail!("no branch for status `{status}`");
173                }
174            }
175            Routing::End => break,
176            Routing::Reply => break,
177            Routing::Custom(_) => break,
178        };
179    }
180
181    Ok(json!({
182        "status": last_status,
183        "output": payload,
184        "trace": trace,
185    }))
186}
187
188fn exec_node(node: &Node, payload: &JsonValue, opts: &ExecOptions) -> Result<(String, JsonValue)> {
189    let component = node.component.id.as_str();
190    match component {
191        "component.start" => Ok(("ok".into(), payload.clone())),
192        "component.tool.fixed" => {
193            if payload
194                .get("fail")
195                .and_then(|v| v.as_bool())
196                .unwrap_or(false)
197            {
198                Ok((
199                    "error".into(),
200                    json!({
201                        "error": "tool_failed",
202                        "input": payload
203                    }),
204                ))
205            } else {
206                Ok((
207                    "ok".into(),
208                    json!({
209                        "query": payload.get("query").cloned().unwrap_or(JsonValue::Null),
210                        "result": "fixed",
211                        "constant": 42
212                    }),
213                ))
214            }
215        }
216        "component.template" => {
217            let result_value = payload.get("result").cloned().unwrap_or(JsonValue::Null);
218            let result = result_value
219                .as_str()
220                .map(|s| s.to_string())
221                .unwrap_or_else(|| serde_json::to_string(&result_value).unwrap_or_default());
222            Ok((
223                "ok".into(),
224                json!({
225                    "answer": format!("Result: {result}"),
226                    "source": "template",
227                    "input": payload
228                }),
229            ))
230        }
231        "component.error.map" => Ok((
232            "ok".into(),
233            json!({
234                "message": "A friendly error occurred",
235                "details": payload
236            }),
237        )),
238        "component.tool.secret" => {
239            let secret = read_secret(opts, "API_KEY")?;
240            match secret {
241                None => Ok((
242                    "error".into(),
243                    json!({
244                        "error": "missing_secret",
245                        "key": "API_KEY",
246                        "secret_lookup": {
247                            "key": "API_KEY",
248                            "status": "missing"
249                        }
250                    }),
251                )),
252                Some(bytes) => {
253                    let prefix = String::from_utf8_lossy(&bytes);
254                    let prefix = prefix.chars().take(3).collect::<String>();
255                    Ok((
256                        "ok".into(),
257                        json!({
258                            "has_key": true,
259                            "prefix": prefix,
260                            "secret_lookup": {
261                                "key": "API_KEY",
262                                "status": "found"
263                            }
264                        }),
265                    ))
266                }
267            }
268        }
269        "component.tool.external" => {
270            if opts.offline || !opts.external_enabled {
271                return Ok((
272                    "error".into(),
273                    json!({
274                        "error": "external_blocked",
275                        "policy": {
276                            "offline": opts.offline,
277                            "external_enabled": opts.external_enabled,
278                            "mock_external": opts.mock_external,
279                        },
280                        "policy_status": "blocked_by_policy"
281                    }),
282                ));
283            }
284            if opts.mock_external {
285                return Ok((
286                    "ok".into(),
287                    json!({
288                        "policy_status": "mocked_external",
289                        "policy": {
290                            "offline": opts.offline,
291                            "external_enabled": opts.external_enabled,
292                            "mock_external": opts.mock_external,
293                        },
294                        "result": opts.mock_external_payload,
295                    }),
296                ));
297            }
298            Ok((
299                "error".into(),
300                json!({
301                    "error": "real_external_not_supported_in_tests",
302                    "policy_status": "blocked_by_policy",
303                    "policy": {
304                        "offline": opts.offline,
305                        "external_enabled": opts.external_enabled,
306                        "mock_external": opts.mock_external,
307                    }
308                }),
309            ))
310        }
311        _ => bail!("unknown component `{component}`"),
312    }
313}
314
315fn read_secret(opts: &ExecOptions, key: &str) -> Result<Option<Vec<u8>>> {
316    let fut = opts.secrets.read(key);
317    let handle = tokio::runtime::Handle::try_current();
318    let outcome = match handle {
319        Ok(handle) => handle.block_on(fut),
320        Err(_) => {
321            let rt = tokio::runtime::Builder::new_current_thread()
322                .enable_all()
323                .build()?;
324            rt.block_on(fut)
325        }
326    };
327    match outcome {
328        Ok(bytes) => Ok(Some(bytes)),
329        Err(SecretError::NotFound(_)) => Ok(None),
330        Err(other) => Err(anyhow!(other)),
331    }
332}