greentic_dev/
flow_cmd.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use crate::cli::{ConfigFlowModeArg, FlowAddStepArgs};
5use crate::component_add::run_component_add;
6use crate::pack_init::PackInitIntent;
7use crate::path_safety::normalize_under_root;
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::flow_bundle::load_and_validate_bundle;
10use serde_json::Value as JsonValue;
11use serde_yaml_bw as serde_yaml;
12use std::io::Write;
13use std::str::FromStr;
14use std::{io, io::IsTerminal};
15use tempfile::NamedTempFile;
16
17use greentic_types::FlowId;
18use greentic_types::component::ComponentManifest;
19
20const DEFAULT_CONFIG_FLOW_TYPE: &str = "component-config";
21
22pub fn validate(path: &Path, compact_json: bool) -> Result<()> {
23    let root = std::env::current_dir()
24        .context("failed to resolve workspace root")?
25        .canonicalize()
26        .context("failed to canonicalize workspace root")?;
27    let safe = normalize_under_root(&root, path)?;
28    let source = fs::read_to_string(&safe)
29        .with_context(|| format!("failed to read flow definition at {}", safe.display()))?;
30
31    let bundle = load_and_validate_bundle(&source, Some(&safe)).with_context(|| {
32        format!(
33            "flow validation failed for {} using greentic-flow",
34            safe.display()
35        )
36    })?;
37
38    let serialized = if compact_json {
39        serde_json::to_string(&bundle)?
40    } else {
41        serde_json::to_string_pretty(&bundle)?
42    };
43
44    println!("{serialized}");
45    Ok(())
46}
47
48pub fn render_config_flow_yaml(config_flow_id: &str, graph: &JsonValue) -> Result<String> {
49    let mut graph_obj = graph
50        .as_object()
51        .cloned()
52        .ok_or_else(|| anyhow!("config flow `{config_flow_id}` graph is not an object"))?;
53    let ty = graph_obj
54        .entry("type".to_string())
55        .or_insert(JsonValue::String(DEFAULT_CONFIG_FLOW_TYPE.to_string()));
56    if !ty.is_string() {
57        *ty = JsonValue::String(DEFAULT_CONFIG_FLOW_TYPE.to_string());
58    }
59    let normalized = JsonValue::Object(graph_obj);
60    serde_yaml::to_string(&normalized).context("failed to render config flow graph to YAML")
61}
62
63pub fn run_add_step(args: FlowAddStepArgs) -> Result<()> {
64    let manifest_path = args
65        .manifest
66        .clone()
67        .unwrap_or_else(|| PathBuf::from("component.manifest.json"));
68    if !manifest_path.exists() {
69        bail!(
70            "component.manifest.json not found at {}. Use --manifest to point to the manifest file.",
71            manifest_path.display()
72        );
73    }
74    let manifest_raw = std::fs::read_to_string(&manifest_path)
75        .with_context(|| format!("failed to read {}", manifest_path.display()))?;
76    let mut manifest_value: JsonValue = serde_json::from_str(&manifest_raw).with_context(|| {
77        format!(
78            "failed to parse component manifest JSON at {}",
79            manifest_path.display()
80        )
81    })?;
82    normalize_manifest(&mut manifest_value);
83    let manifest: ComponentManifest =
84        serde_json::from_value(manifest_value).with_context(|| {
85            format!(
86                "failed to parse component manifest JSON at {}",
87                manifest_path.display()
88            )
89        })?;
90
91    let config_flow_id = match args.mode {
92        Some(ConfigFlowModeArg::Custom) => "custom".to_string(),
93        Some(ConfigFlowModeArg::Default) => "default".to_string(),
94        None => args.flow.clone(),
95    };
96    let config_flow_key = FlowId::from_str(&config_flow_id).map_err(|_| {
97        anyhow!(
98            "invalid flow identifier `{}`; flow ids must be valid FlowId strings",
99            config_flow_id
100        )
101    })?;
102    let Some(config_flow) = manifest.dev_flows.get(&config_flow_key) else {
103        bail!(
104            "Flow '{}' is missing from manifest.dev_flows. Run `greentic-component flow update` to regenerate config flows.",
105            config_flow_id
106        );
107    };
108
109    let coord = args
110        .coordinate
111        .ok_or_else(|| anyhow!("component coordinate is required (pass --coordinate)"))?;
112
113    // Ensure the component is available locally (fetch if needed).
114    let _bundle_dir = resolve_component_bundle(&coord, args.profile.as_deref())?;
115
116    // Render the dev flow graph to YAML so the existing runner can consume it.
117    let config_flow_yaml = render_config_flow_yaml(&config_flow_id, &config_flow.graph)?;
118    let mut temp_flow =
119        NamedTempFile::new().context("failed to create temporary config flow file")?;
120    temp_flow
121        .write_all(config_flow_yaml.as_bytes())
122        .context("failed to write temporary config flow")?;
123    temp_flow.flush()?;
124
125    let pack_flow_path = PathBuf::from("flows").join(format!("{}.ygtc", args.flow_id));
126    if !pack_flow_path.exists() {
127        bail!(
128            "Pack flow '{}' not found at {}",
129            args.flow_id,
130            pack_flow_path.display()
131        );
132    }
133    let pack_flow_raw = std::fs::read_to_string(&pack_flow_path)
134        .with_context(|| format!("failed to read pack flow {}", pack_flow_path.display()))?;
135    let mut pack_flow_json: JsonValue = serde_yaml::from_str(&pack_flow_raw)
136        .with_context(|| format!("invalid YAML in {}", pack_flow_path.display()))?;
137
138    let output = crate::pack_run::run_config_flow(temp_flow.path())
139        .with_context(|| format!("failed to run config flow {}", config_flow_id))?;
140    let (node_id, mut node) = parse_config_flow_output(&output)?;
141    normalize_node(&node_id, &mut node, &manifest)?;
142    let after = args
143        .after
144        .clone()
145        .or_else(|| prompt_routing_target(&pack_flow_json));
146    if let Some(after) = after.as_deref() {
147        patch_placeholder_routing(&mut node, after);
148    }
149
150    let graph_obj = pack_flow_json
151        .as_object_mut()
152        .ok_or_else(|| anyhow!("pack flow {} is not a mapping", pack_flow_path.display()))?;
153
154    // Update target pack flow JSON
155    let nodes = graph_obj
156        .get_mut("nodes")
157        .and_then(|n| n.as_object_mut())
158        .ok_or_else(|| anyhow!("flow `{}` missing nodes map", args.flow_id))?;
159    nodes.insert(node_id.clone(), node);
160
161    if let Some(after) = args.after {
162        append_routing(graph_obj, &after, &node_id)?;
163    } else if let Some(after) = after.as_deref() {
164        append_routing(graph_obj, after, &node_id)?;
165    }
166
167    let rendered =
168        serde_yaml::to_string(&pack_flow_json).context("failed to render updated pack flow")?;
169    std::fs::write(&pack_flow_path, rendered)
170        .with_context(|| format!("failed to write {}", pack_flow_path.display()))?;
171
172    println!(
173        "Added node `{}` from config flow {} to {}",
174        node_id,
175        config_flow_id,
176        pack_flow_path.display()
177    );
178    Ok(())
179}
180
181fn normalize_node(node_id: &str, node: &mut JsonValue, manifest: &ComponentManifest) -> Result<()> {
182    let Some(obj) = node.as_object_mut() else {
183        return Ok(());
184    };
185
186    // If the config flow forgot to populate component.exec details, inject sensible defaults so the
187    // generated pack flow can execute.
188    if let Some(exec) = obj
189        .get_mut("component.exec")
190        .and_then(|v| v.as_object_mut())
191    {
192        exec.entry("component")
193            .or_insert_with(|| JsonValue::String(manifest.id.to_string()));
194
195        let op_missing = match exec.get("operation") {
196            Some(JsonValue::String(s)) => s.trim().is_empty(),
197            None => true,
198            _ => false,
199        };
200        if op_missing {
201            let op = manifest
202                .operations
203                .first()
204                .map(|o| o.name.clone())
205                .ok_or_else(|| {
206                    anyhow!(
207                        "config flow emitted component.exec without operation and manifest has none"
208                    )
209                })?;
210            exec.insert("operation".to_string(), JsonValue::String(op.clone()));
211            exec.entry("op").or_insert(JsonValue::String(op));
212        } else if let Some(JsonValue::String(op)) = exec.get("operation").cloned() {
213            exec.entry("op").or_insert(JsonValue::String(op));
214        }
215    }
216
217    if let Some(tool) = obj.get("tool").and_then(|v| v.as_object()) {
218        let Some(comp) = tool.get("component").and_then(|v| v.as_str()) else {
219            bail!("config flow output tool is missing `component`");
220        };
221        // Without an operation, we cannot produce a valid component.exec node.
222        // Surface a clear error so users know to regenerate config flows or add an operation.
223        bail!(
224            "config flow emitted invalid step `{}`: component `{}` has no operations; regenerate config flows with a component version that declares operations",
225            node_id,
226            comp
227        );
228    }
229
230    Ok(())
231}
232
233fn normalize_manifest(manifest: &mut JsonValue) {
234    let Some(obj) = manifest.as_object_mut() else {
235        return;
236    };
237    if let Some(ops) = obj.get_mut("operations")
238        && let Some(arr) = ops.as_array_mut()
239    {
240        let mut normalized = Vec::with_capacity(arr.len());
241        for entry in arr.drain(..) {
242            if let Some(s) = entry.as_str() {
243                let mut map = serde_json::Map::new();
244                map.insert("name".to_string(), JsonValue::String(s.to_string()));
245                normalized.push(JsonValue::Object(map));
246            } else {
247                normalized.push(entry);
248            }
249        }
250        *arr = normalized;
251    }
252}
253
254fn prompt_routing_target(flow_doc: &JsonValue) -> Option<String> {
255    if !io::stdout().is_terminal() {
256        return None;
257    }
258    let nodes = flow_doc
259        .as_object()
260        .and_then(|m| m.get("nodes"))
261        .and_then(|n| n.as_object())?;
262    let mut keys: Vec<String> = nodes.keys().cloned().collect();
263    keys.sort();
264    if keys.is_empty() {
265        return None;
266    }
267
268    println!("Select where to route from (empty to skip):");
269    for (idx, key) in keys.iter().enumerate() {
270        println!("  {}) {}", idx + 1, key);
271    }
272    print!("Choice: ");
273    let _ = io::stdout().flush();
274    let mut buf = String::new();
275    if io::stdin().read_line(&mut buf).is_err() {
276        return None;
277    }
278    let choice = buf.trim();
279    if choice.is_empty() {
280        return None;
281    }
282    if let Ok(idx) = choice.parse::<usize>()
283        && idx >= 1
284        && idx <= keys.len()
285    {
286        return Some(keys[idx - 1].clone());
287    }
288    None
289}
290
291fn patch_placeholder_routing(node: &mut JsonValue, next: &str) {
292    let Some(map) = node.as_object_mut() else {
293        return;
294    };
295    let Some(routing) = map.get_mut("routing") else {
296        return;
297    };
298    let Some(routes) = routing.as_array_mut() else {
299        return;
300    };
301    for entry in routes.iter_mut() {
302        if let Some(JsonValue::String(to)) =
303            entry.as_object_mut().and_then(|route| route.get_mut("to"))
304            && to == "NEXT_NODE_PLACEHOLDER"
305        {
306            *to = next.to_string();
307        }
308    }
309}
310
311fn resolve_component_bundle(coordinate: &str, profile: Option<&str>) -> Result<PathBuf> {
312    let path = PathBuf::from_str(coordinate).unwrap_or_default();
313    if path.exists() {
314        return Ok(path);
315    }
316    let dir = run_component_add(coordinate, profile, PackInitIntent::Dev)?;
317    Ok(dir)
318}
319
320pub fn parse_config_flow_output(output: &str) -> Result<(String, JsonValue)> {
321    let value: JsonValue =
322        serde_json::from_str(output).context("config flow output is not valid JSON")?;
323    let obj = value
324        .as_object()
325        .ok_or_else(|| anyhow!("config flow output must be a JSON object"))?;
326    let node_id = obj
327        .get("node_id")
328        .and_then(|v| v.as_str())
329        .ok_or_else(|| anyhow!("config flow output missing node_id"))?
330        .to_string();
331    let node = obj
332        .get("node")
333        .ok_or_else(|| anyhow!("config flow output missing node"))?
334        .clone();
335    if !node.is_object() {
336        bail!("config flow output node must be an object");
337    }
338    Ok((node_id, node))
339}
340
341fn append_routing(
342    flow_doc: &mut serde_json::Map<String, JsonValue>,
343    from_node: &str,
344    to_node: &str,
345) -> Result<()> {
346    let nodes = flow_doc
347        .get_mut("nodes")
348        .and_then(|n| n.as_object_mut())
349        .ok_or_else(|| anyhow!("flow missing nodes map"))?;
350    let Some(node) = nodes.get_mut(from_node) else {
351        bail!("node `{from_node}` not found for routing update");
352    };
353    let mapping = node
354        .as_object_mut()
355        .ok_or_else(|| anyhow!("node `{from_node}` is not an object"))?;
356    let routing = mapping
357        .entry("routing")
358        .or_insert(JsonValue::Array(Vec::new()));
359    let seq = routing
360        .as_array_mut()
361        .ok_or_else(|| anyhow!("routing for `{from_node}` is not a list"))?;
362    let mut entry = serde_json::Map::new();
363    entry.insert("to".to_string(), JsonValue::String(to_node.to_string()));
364    seq.push(JsonValue::Object(entry));
365    Ok(())
366}