1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use anyhow::{Result, anyhow, bail};
5use async_trait::async_trait;
6use greentic_secrets::{Result as SecretResult, SecretError, SecretsManager};
7use greentic_types::flow::{Flow, Node, Routing};
8use serde_json::{Value as JsonValue, json};
9
10#[derive(Clone)]
11pub struct ExecOptions {
12 pub offline: bool,
13 pub external_enabled: bool,
14 pub mock_external: bool,
15 pub mock_external_payload: JsonValue,
16 pub secrets: Arc<dyn SecretsManager>,
17}
18
19impl ExecOptions {
20 pub fn builder() -> ExecOptionsBuilder {
21 ExecOptionsBuilder::default()
22 }
23}
24
25#[derive(Default)]
26pub struct ExecOptionsBuilder {
27 offline: bool,
28 external_enabled: bool,
29 mock_external: bool,
30 mock_external_payload: JsonValue,
31 seeds: HashMap<String, Vec<u8>>,
32}
33
34impl ExecOptionsBuilder {
35 pub fn offline(mut self, offline: bool) -> Self {
36 self.offline = offline;
37 self
38 }
39
40 pub fn external_enabled(mut self, enabled: bool) -> Self {
41 self.external_enabled = enabled;
42 self
43 }
44
45 pub fn mock_external(mut self, enabled: bool) -> Self {
46 self.mock_external = enabled;
47 self
48 }
49
50 pub fn mock_external_payload(mut self, payload: JsonValue) -> Self {
51 self.mock_external_payload = payload;
52 self
53 }
54
55 pub fn load_seed_file(mut self, path: &std::path::Path) -> Result<Self> {
56 let seeds = crate::secrets_seed::load_seed_file(path)?;
57 self.seeds.extend(seeds);
58 Ok(self)
59 }
60
61 pub fn seed_entry(mut self, key: &str, bytes: &[u8]) -> Self {
62 self.seeds.insert(key.to_string(), bytes.to_vec());
63 self
64 }
65
66 pub fn build(self) -> Result<ExecOptions> {
67 let secrets = MemorySecrets::from_seed_map(&self.seeds);
68 Ok(ExecOptions {
69 offline: self.offline,
70 external_enabled: self.external_enabled,
71 mock_external: self.mock_external,
72 mock_external_payload: self.mock_external_payload,
73 secrets: Arc::new(secrets),
74 })
75 }
76}
77
/// In-memory secret store backed by a mutex-guarded map.
///
/// Clones share the same underlying map because the map lives behind an `Arc`.
#[derive(Clone, Default)]
pub struct MemorySecrets {
    inner: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl MemorySecrets {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Creates a store pre-populated with a copy of every entry in `map`.
    pub fn from_seed_map(map: &HashMap<String, Vec<u8>>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(map.clone())),
        }
    }

    /// Stores the UTF-8 bytes of `value` under `key`, replacing any previous value.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn insert_str(&self, key: &str, value: &str) {
        self.inner
            .lock()
            .expect("mutex poisoned")
            .insert(key.to_owned(), value.as_bytes().to_owned());
    }
}
104
105#[async_trait]
106impl SecretsManager for MemorySecrets {
107 async fn read(&self, path: &str) -> SecretResult<Vec<u8>> {
108 let guard = self.inner.lock().expect("mutex poisoned");
109 guard
110 .get(path)
111 .cloned()
112 .ok_or_else(|| SecretError::NotFound(path.to_string()))
113 }
114
115 async fn write(&self, path: &str, bytes: &[u8]) -> SecretResult<()> {
116 let mut guard = self.inner.lock().expect("mutex poisoned");
117 guard.insert(path.to_string(), bytes.to_vec());
118 Ok(())
119 }
120
121 async fn delete(&self, path: &str) -> SecretResult<()> {
122 let mut guard = self.inner.lock().expect("mutex poisoned");
123 guard.remove(path);
124 Ok(())
125 }
126}
127
128pub fn execute(flow: &Flow, input: &JsonValue) -> Result<JsonValue> {
129 let opts = ExecOptionsBuilder::default().build()?;
130 execute_with_options(flow, input, &opts)
131}
132
133pub fn execute_with_options(
134 flow: &Flow,
135 input: &JsonValue,
136 opts: &ExecOptions,
137) -> Result<JsonValue> {
138 let nodes: HashMap<_, _> = flow
139 .nodes
140 .iter()
141 .map(|(id, node)| (id.clone(), node.clone()))
142 .collect();
143 let mut current = flow
144 .ingress()
145 .map(|(id, _)| id.clone())
146 .ok_or_else(|| anyhow!("flow has no ingress"))?;
147 let mut payload = input.clone();
148 let mut trace = Vec::new();
149 let mut last_status = String::from("ok");
150
151 loop {
152 let Some(node) = nodes.get(¤t) else {
153 bail!("node `{current}` missing");
154 };
155 let (status, next_payload) = exec_node(node, &payload, opts)?;
156 trace.push(json!({
157 "node_id": node.id.as_str(),
158 "component": node.component.id.as_str(),
159 "status": status,
160 "payload": next_payload
161 }));
162 payload = next_payload;
163 if status == "error" || last_status == "error" {
164 last_status = "error".to_string();
165 } else {
166 last_status = status.clone();
167 }
168 current = match &node.routing {
169 Routing::Next { node_id } => node_id.clone(),
170 Routing::Branch { on_status, default } => {
171 if let Some(dest) = on_status.get(&status) {
172 dest.clone()
173 } else if let Some(def) = default {
174 def.clone()
175 } else {
176 bail!("no branch for status `{status}`");
177 }
178 }
179 Routing::End => break,
180 Routing::Reply => break,
181 Routing::Custom(_) => break,
182 };
183 }
184
185 Ok(json!({
186 "status": last_status,
187 "output": payload,
188 "trace": trace,
189 }))
190}
191
192fn exec_node(node: &Node, payload: &JsonValue, opts: &ExecOptions) -> Result<(String, JsonValue)> {
193 let component = node.component.id.as_str();
194 match component {
195 "component.start" => Ok(("ok".into(), payload.clone())),
196 "component.tool.fixed" => {
197 if payload
198 .get("fail")
199 .and_then(|v| v.as_bool())
200 .unwrap_or(false)
201 {
202 Ok((
203 "error".into(),
204 json!({
205 "error": "tool_failed",
206 "input": payload
207 }),
208 ))
209 } else {
210 Ok((
211 "ok".into(),
212 json!({
213 "query": payload.get("query").cloned().unwrap_or(JsonValue::Null),
214 "result": "fixed",
215 "constant": 42
216 }),
217 ))
218 }
219 }
220 "component.template" => {
221 let result_value = payload.get("result").cloned().unwrap_or(JsonValue::Null);
222 let result = result_value
223 .as_str()
224 .map(|s| s.to_string())
225 .unwrap_or_else(|| serde_json::to_string(&result_value).unwrap_or_default());
226 Ok((
227 "ok".into(),
228 json!({
229 "answer": format!("Result: {result}"),
230 "source": "template",
231 "input": payload
232 }),
233 ))
234 }
235 "component.error.map" => Ok((
236 "ok".into(),
237 json!({
238 "message": "A friendly error occurred",
239 "details": payload
240 }),
241 )),
242 "component.tool.secret" => {
243 let secret = read_secret(opts, "API_KEY")?;
244 match secret {
245 None => Ok((
246 "error".into(),
247 json!({
248 "error": "missing_secret",
249 "key": "API_KEY",
250 "secret_lookup": {
251 "key": "API_KEY",
252 "status": "missing"
253 }
254 }),
255 )),
256 Some(bytes) => {
257 let prefix = String::from_utf8_lossy(&bytes);
258 let prefix = prefix.chars().take(3).collect::<String>();
259 Ok((
260 "ok".into(),
261 json!({
262 "has_key": true,
263 "prefix": prefix,
264 "secret_lookup": {
265 "key": "API_KEY",
266 "status": "found"
267 }
268 }),
269 ))
270 }
271 }
272 }
273 "component.tool.external" => {
274 if opts.offline || !opts.external_enabled {
275 return Ok((
276 "error".into(),
277 json!({
278 "error": "external_blocked",
279 "policy": {
280 "offline": opts.offline,
281 "external_enabled": opts.external_enabled,
282 "mock_external": opts.mock_external,
283 },
284 "policy_status": "blocked_by_policy"
285 }),
286 ));
287 }
288 if opts.mock_external {
289 return Ok((
290 "ok".into(),
291 json!({
292 "policy_status": "mocked_external",
293 "policy": {
294 "offline": opts.offline,
295 "external_enabled": opts.external_enabled,
296 "mock_external": opts.mock_external,
297 },
298 "result": opts.mock_external_payload,
299 }),
300 ));
301 }
302 Ok((
303 "error".into(),
304 json!({
305 "error": "real_external_not_supported_in_tests",
306 "policy_status": "blocked_by_policy",
307 "policy": {
308 "offline": opts.offline,
309 "external_enabled": opts.external_enabled,
310 "mock_external": opts.mock_external,
311 }
312 }),
313 ))
314 }
315 _ => bail!("unknown component `{component}`"),
316 }
317}
318
319fn read_secret(opts: &ExecOptions, key: &str) -> Result<Option<Vec<u8>>> {
320 let fut = opts.secrets.read(key);
321 let handle = tokio::runtime::Handle::try_current();
322 let outcome = match handle {
323 Ok(handle) => handle.block_on(fut),
324 Err(_) => {
325 let rt = tokio::runtime::Builder::new_current_thread()
326 .enable_all()
327 .build()?;
328 rt.block_on(fut)
329 }
330 };
331 match outcome {
332 Ok(bytes) => Ok(Some(bytes)),
333 Err(SecretError::NotFound(_)) => Ok(None),
334 Err(other) => Err(anyhow!(other)),
335 }
336}