// greentic_dev/tests_exec.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use anyhow::{Result, anyhow, bail};
5use async_trait::async_trait;
6use greentic_secrets::{Result as SecretResult, SecretError, SecretsManager};
7use greentic_types::flow::{Flow, Node, Routing};
8use serde_json::{Value as JsonValue, json};
9
10#[derive(Clone)]
11pub struct ExecOptions {
12    pub offline: bool,
13    pub external_enabled: bool,
14    pub mock_external: bool,
15    pub mock_external_payload: JsonValue,
16    pub secrets: Arc<dyn SecretsManager>,
17}
18
19impl ExecOptions {
20    pub fn builder() -> ExecOptionsBuilder {
21        ExecOptionsBuilder::default()
22    }
23}
24
25#[derive(Default)]
26pub struct ExecOptionsBuilder {
27    offline: bool,
28    external_enabled: bool,
29    mock_external: bool,
30    mock_external_payload: JsonValue,
31    seeds: HashMap<String, Vec<u8>>,
32}
33
34impl ExecOptionsBuilder {
35    pub fn offline(mut self, offline: bool) -> Self {
36        self.offline = offline;
37        self
38    }
39
40    pub fn external_enabled(mut self, enabled: bool) -> Self {
41        self.external_enabled = enabled;
42        self
43    }
44
45    pub fn mock_external(mut self, enabled: bool) -> Self {
46        self.mock_external = enabled;
47        self
48    }
49
50    pub fn mock_external_payload(mut self, payload: JsonValue) -> Self {
51        self.mock_external_payload = payload;
52        self
53    }
54
55    pub fn load_seed_file(mut self, path: &std::path::Path) -> Result<Self> {
56        let seeds = crate::secrets_seed::load_seed_file(path)?;
57        self.seeds.extend(seeds);
58        Ok(self)
59    }
60
61    pub fn seed_entry(mut self, key: &str, bytes: &[u8]) -> Self {
62        self.seeds.insert(key.to_string(), bytes.to_vec());
63        self
64    }
65
66    pub fn build(self) -> Result<ExecOptions> {
67        let secrets = MemorySecrets::from_seed_map(&self.seeds);
68        Ok(ExecOptions {
69            offline: self.offline,
70            external_enabled: self.external_enabled,
71            mock_external: self.mock_external,
72            mock_external_payload: self.mock_external_payload,
73            secrets: Arc::new(secrets),
74        })
75    }
76}
77
/// In-memory secret store backed by a mutex-guarded map.
///
/// Clones share the same underlying map because the map lives behind an `Arc`.
#[derive(Clone, Default)]
pub struct MemorySecrets {
    inner: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl MemorySecrets {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Creates a store pre-populated with a copy of every entry in `map`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn from_seed_map(map: &HashMap<String, Vec<u8>>) -> Self {
        let store = Self::new();
        store
            .inner
            .lock()
            .expect("mutex poisoned")
            .extend(map.iter().map(|(key, bytes)| (key.clone(), bytes.clone())));
        store
    }

    /// Stores the UTF-8 bytes of `value` under `key`, replacing any previous
    /// entry.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn insert_str(&self, key: &str, value: &str) {
        self.inner
            .lock()
            .expect("mutex poisoned")
            .insert(key.to_owned(), value.as_bytes().to_vec());
    }
}
104
105#[async_trait]
106impl SecretsManager for MemorySecrets {
107    async fn read(&self, path: &str) -> SecretResult<Vec<u8>> {
108        let guard = self.inner.lock().expect("mutex poisoned");
109        guard
110            .get(path)
111            .cloned()
112            .ok_or_else(|| SecretError::NotFound(path.to_string()))
113    }
114
115    async fn write(&self, path: &str, bytes: &[u8]) -> SecretResult<()> {
116        let mut guard = self.inner.lock().expect("mutex poisoned");
117        guard.insert(path.to_string(), bytes.to_vec());
118        Ok(())
119    }
120
121    async fn delete(&self, path: &str) -> SecretResult<()> {
122        let mut guard = self.inner.lock().expect("mutex poisoned");
123        guard.remove(path);
124        Ok(())
125    }
126}
127
128pub fn execute(flow: &Flow, input: &JsonValue) -> Result<JsonValue> {
129    let opts = ExecOptionsBuilder::default().build()?;
130    execute_with_options(flow, input, &opts)
131}
132
133pub fn execute_with_options(
134    flow: &Flow,
135    input: &JsonValue,
136    opts: &ExecOptions,
137) -> Result<JsonValue> {
138    let nodes: HashMap<_, _> = flow
139        .nodes
140        .iter()
141        .map(|(id, node)| (id.clone(), node.clone()))
142        .collect();
143    let mut current = flow
144        .ingress()
145        .map(|(id, _)| id.clone())
146        .ok_or_else(|| anyhow!("flow has no ingress"))?;
147    let mut payload = input.clone();
148    let mut trace = Vec::new();
149    let mut last_status = String::from("ok");
150
151    loop {
152        let Some(node) = nodes.get(&current) else {
153            bail!("node `{current}` missing");
154        };
155        let (status, next_payload) = exec_node(node, &payload, opts)?;
156        trace.push(json!({
157            "node_id": node.id.as_str(),
158            "component": node.component.id.as_str(),
159            "status": status,
160            "payload": next_payload
161        }));
162        payload = next_payload;
163        if status == "error" || last_status == "error" {
164            last_status = "error".to_string();
165        } else {
166            last_status = status.clone();
167        }
168        current = match &node.routing {
169            Routing::Next { node_id } => node_id.clone(),
170            Routing::Branch { on_status, default } => {
171                if let Some(dest) = on_status.get(&status) {
172                    dest.clone()
173                } else if let Some(def) = default {
174                    def.clone()
175                } else {
176                    bail!("no branch for status `{status}`");
177                }
178            }
179            Routing::End => break,
180            Routing::Reply => break,
181            Routing::Custom(_) => break,
182        };
183    }
184
185    Ok(json!({
186        "status": last_status,
187        "output": payload,
188        "trace": trace,
189    }))
190}
191
192fn exec_node(node: &Node, payload: &JsonValue, opts: &ExecOptions) -> Result<(String, JsonValue)> {
193    let component = node.component.id.as_str();
194    match component {
195        "component.start" => Ok(("ok".into(), payload.clone())),
196        "component.tool.fixed" => {
197            if payload
198                .get("fail")
199                .and_then(|v| v.as_bool())
200                .unwrap_or(false)
201            {
202                Ok((
203                    "error".into(),
204                    json!({
205                        "error": "tool_failed",
206                        "input": payload
207                    }),
208                ))
209            } else {
210                Ok((
211                    "ok".into(),
212                    json!({
213                        "query": payload.get("query").cloned().unwrap_or(JsonValue::Null),
214                        "result": "fixed",
215                        "constant": 42
216                    }),
217                ))
218            }
219        }
220        "component.template" => {
221            let result_value = payload.get("result").cloned().unwrap_or(JsonValue::Null);
222            let result = result_value
223                .as_str()
224                .map(|s| s.to_string())
225                .unwrap_or_else(|| serde_json::to_string(&result_value).unwrap_or_default());
226            Ok((
227                "ok".into(),
228                json!({
229                    "answer": format!("Result: {result}"),
230                    "source": "template",
231                    "input": payload
232                }),
233            ))
234        }
235        "component.error.map" => Ok((
236            "ok".into(),
237            json!({
238                "message": "A friendly error occurred",
239                "details": payload
240            }),
241        )),
242        "component.tool.secret" => {
243            let secret = read_secret(opts, "API_KEY")?;
244            match secret {
245                None => Ok((
246                    "error".into(),
247                    json!({
248                        "error": "missing_secret",
249                        "key": "API_KEY",
250                        "secret_lookup": {
251                            "key": "API_KEY",
252                            "status": "missing"
253                        }
254                    }),
255                )),
256                Some(bytes) => {
257                    let prefix = String::from_utf8_lossy(&bytes);
258                    let prefix = prefix.chars().take(3).collect::<String>();
259                    Ok((
260                        "ok".into(),
261                        json!({
262                            "has_key": true,
263                            "prefix": prefix,
264                            "secret_lookup": {
265                                "key": "API_KEY",
266                                "status": "found"
267                            }
268                        }),
269                    ))
270                }
271            }
272        }
273        "component.tool.external" => {
274            if opts.offline || !opts.external_enabled {
275                return Ok((
276                    "error".into(),
277                    json!({
278                        "error": "external_blocked",
279                        "policy": {
280                            "offline": opts.offline,
281                            "external_enabled": opts.external_enabled,
282                            "mock_external": opts.mock_external,
283                        },
284                        "policy_status": "blocked_by_policy"
285                    }),
286                ));
287            }
288            if opts.mock_external {
289                return Ok((
290                    "ok".into(),
291                    json!({
292                        "policy_status": "mocked_external",
293                        "policy": {
294                            "offline": opts.offline,
295                            "external_enabled": opts.external_enabled,
296                            "mock_external": opts.mock_external,
297                        },
298                        "result": opts.mock_external_payload,
299                    }),
300                ));
301            }
302            Ok((
303                "error".into(),
304                json!({
305                    "error": "real_external_not_supported_in_tests",
306                    "policy_status": "blocked_by_policy",
307                    "policy": {
308                        "offline": opts.offline,
309                        "external_enabled": opts.external_enabled,
310                        "mock_external": opts.mock_external,
311                    }
312                }),
313            ))
314        }
315        _ => bail!("unknown component `{component}`"),
316    }
317}
318
319fn read_secret(opts: &ExecOptions, key: &str) -> Result<Option<Vec<u8>>> {
320    let fut = opts.secrets.read(key);
321    let handle = tokio::runtime::Handle::try_current();
322    let outcome = match handle {
323        Ok(handle) => handle.block_on(fut),
324        Err(_) => {
325            let rt = tokio::runtime::Builder::new_current_thread()
326                .enable_all()
327                .build()?;
328            rt.block_on(fut)
329        }
330    };
331    match outcome {
332        Ok(bytes) => Ok(Some(bytes)),
333        Err(SecretError::NotFound(_)) => Ok(None),
334        Err(other) => Err(anyhow!(other)),
335    }
336}