1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use anyhow::{Result, anyhow, bail};
5use async_trait::async_trait;
6use greentic_secrets::{Result as SecretResult, SecretError, SecretsManager};
7use greentic_types::flow::{Flow, Node, Routing};
8use serde_json::{Value as JsonValue, json};
9
10#[derive(Clone)]
11pub struct ExecOptions {
12 pub offline: bool,
13 pub external_enabled: bool,
14 pub mock_external: bool,
15 pub mock_external_payload: JsonValue,
16 pub secrets: Arc<dyn SecretsManager>,
17}
18
19impl ExecOptions {
20 pub fn builder() -> ExecOptionsBuilder {
21 ExecOptionsBuilder::default()
22 }
23}
24
25#[derive(Default)]
26pub struct ExecOptionsBuilder {
27 offline: bool,
28 external_enabled: bool,
29 mock_external: bool,
30 mock_external_payload: JsonValue,
31 secrets_env_prefix: String,
32}
33
34impl ExecOptionsBuilder {
35 pub fn offline(mut self, offline: bool) -> Self {
36 self.offline = offline;
37 self
38 }
39
40 pub fn external_enabled(mut self, enabled: bool) -> Self {
41 self.external_enabled = enabled;
42 self
43 }
44
45 pub fn mock_external(mut self, enabled: bool) -> Self {
46 self.mock_external = enabled;
47 self
48 }
49
50 pub fn mock_external_payload(mut self, payload: JsonValue) -> Self {
51 self.mock_external_payload = payload;
52 self
53 }
54
55 pub fn secrets_env_prefix(mut self, prefix: &str) -> Self {
56 self.secrets_env_prefix = prefix.to_string();
57 self
58 }
59
60 pub fn build(self) -> ExecOptions {
61 let secrets = MemorySecrets::from_env_prefix(&self.secrets_env_prefix);
62 ExecOptions {
63 offline: self.offline,
64 external_enabled: self.external_enabled,
65 mock_external: self.mock_external,
66 mock_external_payload: self.mock_external_payload,
67 secrets: Arc::new(secrets),
68 }
69 }
70}
71
72#[derive(Clone, Default)]
73pub struct MemorySecrets {
74 inner: Arc<Mutex<HashMap<String, Vec<u8>>>>,
75}
76
77impl MemorySecrets {
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 pub fn insert_str(&self, key: &str, value: &str) {
83 let mut guard = self.inner.lock().expect("mutex poisoned");
84 guard.insert(key.to_string(), value.as_bytes().to_vec());
85 }
86
87 pub fn from_env_prefix(prefix: &str) -> Self {
88 let mgr = Self::new();
89 if prefix.is_empty() {
90 return mgr;
91 }
92 for (k, v) in std::env::vars() {
93 if let Some(stripped) = k.strip_prefix(prefix) {
94 mgr.insert_str(stripped, &v);
95 }
96 }
97 mgr
98 }
99}
100
101#[async_trait]
102impl SecretsManager for MemorySecrets {
103 async fn read(&self, path: &str) -> SecretResult<Vec<u8>> {
104 let guard = self.inner.lock().expect("mutex poisoned");
105 guard
106 .get(path)
107 .cloned()
108 .ok_or_else(|| SecretError::NotFound(path.to_string()))
109 }
110
111 async fn write(&self, path: &str, bytes: &[u8]) -> SecretResult<()> {
112 let mut guard = self.inner.lock().expect("mutex poisoned");
113 guard.insert(path.to_string(), bytes.to_vec());
114 Ok(())
115 }
116
117 async fn delete(&self, path: &str) -> SecretResult<()> {
118 let mut guard = self.inner.lock().expect("mutex poisoned");
119 guard.remove(path);
120 Ok(())
121 }
122}
123
124pub fn execute(flow: &Flow, input: &JsonValue) -> Result<JsonValue> {
125 let opts = ExecOptionsBuilder::default().build();
126 execute_with_options(flow, input, &opts)
127}
128
129pub fn execute_with_options(
130 flow: &Flow,
131 input: &JsonValue,
132 opts: &ExecOptions,
133) -> Result<JsonValue> {
134 let nodes: HashMap<_, _> = flow
135 .nodes
136 .iter()
137 .map(|(id, node)| (id.clone(), node.clone()))
138 .collect();
139 let mut current = flow
140 .ingress()
141 .map(|(id, _)| id.clone())
142 .ok_or_else(|| anyhow!("flow has no ingress"))?;
143 let mut payload = input.clone();
144 let mut trace = Vec::new();
145 let mut last_status = String::from("ok");
146
147 loop {
148 let Some(node) = nodes.get(¤t) else {
149 bail!("node `{current}` missing");
150 };
151 let (status, next_payload) = exec_node(node, &payload, opts)?;
152 trace.push(json!({
153 "node_id": node.id.as_str(),
154 "component": node.component.id.as_str(),
155 "status": status,
156 "payload": next_payload
157 }));
158 payload = next_payload;
159 if status == "error" || last_status == "error" {
160 last_status = "error".to_string();
161 } else {
162 last_status = status.clone();
163 }
164 current = match &node.routing {
165 Routing::Next { node_id } => node_id.clone(),
166 Routing::Branch { on_status, default } => {
167 if let Some(dest) = on_status.get(&status) {
168 dest.clone()
169 } else if let Some(def) = default {
170 def.clone()
171 } else {
172 bail!("no branch for status `{status}`");
173 }
174 }
175 Routing::End => break,
176 Routing::Reply => break,
177 Routing::Custom(_) => break,
178 };
179 }
180
181 Ok(json!({
182 "status": last_status,
183 "output": payload,
184 "trace": trace,
185 }))
186}
187
188fn exec_node(node: &Node, payload: &JsonValue, opts: &ExecOptions) -> Result<(String, JsonValue)> {
189 let component = node.component.id.as_str();
190 match component {
191 "component.start" => Ok(("ok".into(), payload.clone())),
192 "component.tool.fixed" => {
193 if payload
194 .get("fail")
195 .and_then(|v| v.as_bool())
196 .unwrap_or(false)
197 {
198 Ok((
199 "error".into(),
200 json!({
201 "error": "tool_failed",
202 "input": payload
203 }),
204 ))
205 } else {
206 Ok((
207 "ok".into(),
208 json!({
209 "query": payload.get("query").cloned().unwrap_or(JsonValue::Null),
210 "result": "fixed",
211 "constant": 42
212 }),
213 ))
214 }
215 }
216 "component.template" => {
217 let result_value = payload.get("result").cloned().unwrap_or(JsonValue::Null);
218 let result = result_value
219 .as_str()
220 .map(|s| s.to_string())
221 .unwrap_or_else(|| serde_json::to_string(&result_value).unwrap_or_default());
222 Ok((
223 "ok".into(),
224 json!({
225 "answer": format!("Result: {result}"),
226 "source": "template",
227 "input": payload
228 }),
229 ))
230 }
231 "component.error.map" => Ok((
232 "ok".into(),
233 json!({
234 "message": "A friendly error occurred",
235 "details": payload
236 }),
237 )),
238 "component.tool.secret" => {
239 let secret = read_secret(opts, "API_KEY")?;
240 match secret {
241 None => Ok((
242 "error".into(),
243 json!({
244 "error": "missing_secret",
245 "key": "API_KEY",
246 "secret_lookup": {
247 "key": "API_KEY",
248 "status": "missing"
249 }
250 }),
251 )),
252 Some(bytes) => {
253 let prefix = String::from_utf8_lossy(&bytes);
254 let prefix = prefix.chars().take(3).collect::<String>();
255 Ok((
256 "ok".into(),
257 json!({
258 "has_key": true,
259 "prefix": prefix,
260 "secret_lookup": {
261 "key": "API_KEY",
262 "status": "found"
263 }
264 }),
265 ))
266 }
267 }
268 }
269 "component.tool.external" => {
270 if opts.offline || !opts.external_enabled {
271 return Ok((
272 "error".into(),
273 json!({
274 "error": "external_blocked",
275 "policy": {
276 "offline": opts.offline,
277 "external_enabled": opts.external_enabled,
278 "mock_external": opts.mock_external,
279 },
280 "policy_status": "blocked_by_policy"
281 }),
282 ));
283 }
284 if opts.mock_external {
285 return Ok((
286 "ok".into(),
287 json!({
288 "policy_status": "mocked_external",
289 "policy": {
290 "offline": opts.offline,
291 "external_enabled": opts.external_enabled,
292 "mock_external": opts.mock_external,
293 },
294 "result": opts.mock_external_payload,
295 }),
296 ));
297 }
298 Ok((
299 "error".into(),
300 json!({
301 "error": "real_external_not_supported_in_tests",
302 "policy_status": "blocked_by_policy",
303 "policy": {
304 "offline": opts.offline,
305 "external_enabled": opts.external_enabled,
306 "mock_external": opts.mock_external,
307 }
308 }),
309 ))
310 }
311 _ => bail!("unknown component `{component}`"),
312 }
313}
314
315fn read_secret(opts: &ExecOptions, key: &str) -> Result<Option<Vec<u8>>> {
316 let fut = opts.secrets.read(key);
317 let handle = tokio::runtime::Handle::try_current();
318 let outcome = match handle {
319 Ok(handle) => handle.block_on(fut),
320 Err(_) => {
321 let rt = tokio::runtime::Builder::new_current_thread()
322 .enable_all()
323 .build()?;
324 rt.block_on(fut)
325 }
326 };
327 match outcome {
328 Ok(bytes) => Ok(Some(bytes)),
329 Err(SecretError::NotFound(_)) => Ok(None),
330 Err(other) => Err(anyhow!(other)),
331 }
332}