greentic_dev/
flow_cmd.rs

1use std::fs;
2use std::path::Path;
3
4use crate::cli::{ConfigFlowModeArg, FlowAddStepArgs};
5use crate::component_add::run_component_add;
6use crate::pack_init::PackInitIntent;
7use crate::path_safety::normalize_under_root;
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::flow_bundle::load_and_validate_bundle;
10use serde_json::Value as JsonValue;
11use serde_yaml_bw as serde_yaml;
12use serde_yaml_bw::Mapping as YamlMapping;
13use serde_yaml_bw::Sequence as YamlSequence;
14use serde_yaml_bw::Value as YamlValue;
15use std::fs::OpenOptions;
16use std::io::Write;
17use std::path::PathBuf;
18use std::str::FromStr;
19use std::{io, io::IsTerminal};
20
21pub fn validate(path: &Path, compact_json: bool) -> Result<()> {
22    let root = std::env::current_dir()
23        .context("failed to resolve workspace root")?
24        .canonicalize()
25        .context("failed to canonicalize workspace root")?;
26    let safe = normalize_under_root(&root, path)?;
27    let source = fs::read_to_string(&safe)
28        .with_context(|| format!("failed to read flow definition at {}", safe.display()))?;
29
30    let bundle = load_and_validate_bundle(&source, Some(&safe)).with_context(|| {
31        format!(
32            "flow validation failed for {} using greentic-flow",
33            safe.display()
34        )
35    })?;
36
37    let serialized = if compact_json {
38        serde_json::to_string(&bundle)?
39    } else {
40        serde_json::to_string_pretty(&bundle)?
41    };
42
43    println!("{serialized}");
44    Ok(())
45}
46
47pub fn run_add_step(args: FlowAddStepArgs) -> Result<()> {
48    let root = std::env::current_dir()
49        .context("failed to resolve workspace root")?
50        .canonicalize()
51        .context("failed to canonicalize workspace root")?;
52    let flow_path = root.join("flows").join(format!("{}.ygtc", args.flow_id));
53    if !flow_path.exists() {
54        bail!("flow file not found: {}", flow_path.display());
55    }
56    let flow_src = std::fs::read_to_string(&flow_path)
57        .with_context(|| format!("failed to read {}", flow_path.display()))?;
58    let mut flow_doc: YamlValue = serde_yaml::from_str(&flow_src)
59        .with_context(|| format!("failed to parse {}", flow_path.display()))?;
60
61    let coord = args
62        .coordinate
63        .ok_or_else(|| anyhow!("component coordinate is required (pass --coordinate)"))?;
64
65    // Fetch or use local component bundle
66    let bundle_dir = resolve_component_bundle(&coord, args.profile.as_deref())?;
67    let flows_dir = bundle_dir.join("flows");
68    let custom_flow = flows_dir.join("custom.ygtc");
69    let default_flow = flows_dir.join("default.ygtc");
70    let selected = match args.mode {
71        Some(ConfigFlowModeArg::Custom) => {
72            if custom_flow.exists() {
73                custom_flow
74            } else {
75                default_flow.clone()
76            }
77        }
78        Some(ConfigFlowModeArg::Default) => {
79            if default_flow.exists() {
80                default_flow
81            } else {
82                custom_flow.clone()
83            }
84        }
85        None => {
86            if default_flow.exists() {
87                default_flow
88            } else if custom_flow.exists() {
89                custom_flow
90            } else {
91                bail!("component bundle does not provide flows/default.ygtc or flows/custom.ygtc")
92            }
93        }
94    };
95    if !selected.exists() {
96        bail!("selected config flow missing at {}", selected.display());
97    }
98
99    let output = crate::pack_run::run_config_flow(&selected)
100        .with_context(|| format!("failed to run config flow {}", selected.display()))?;
101    let (node_id, mut node) = parse_config_flow_output(&output)?;
102    let after = args
103        .after
104        .clone()
105        .or_else(|| prompt_routing_target(&flow_doc));
106    if let Some(after) = after.as_deref() {
107        patch_placeholder_routing(&mut node, after);
108    }
109
110    // Update target flow YAML
111    let nodes = flow_doc
112        .as_mapping_mut()
113        .and_then(|m| m.get_mut(YamlValue::String("nodes".to_string(), None)))
114        .and_then(|n| n.as_mapping_mut())
115        .ok_or_else(|| anyhow!("flow missing nodes map"))?;
116    nodes.insert(
117        YamlValue::String(node_id.clone(), None),
118        node_to_yaml(node)?,
119    );
120
121    if let Some(after) = args.after {
122        append_routing(&mut flow_doc, &after, &node_id)?;
123    } else if let Some(after) = after.as_deref() {
124        append_routing(&mut flow_doc, after, &node_id)?;
125    }
126
127    let rendered = serde_yaml::to_string(&flow_doc).context("failed to render updated flow")?;
128    let mut file = OpenOptions::new()
129        .write(true)
130        .truncate(true)
131        .open(&flow_path)
132        .with_context(|| format!("failed to open {} for writing", flow_path.display()))?;
133    file.write_all(rendered.as_bytes())
134        .with_context(|| format!("failed to write {}", flow_path.display()))?;
135
136    println!(
137        "Added node `{}` from config flow {} to {}",
138        node_id,
139        selected.file_name().unwrap_or_default().to_string_lossy(),
140        flow_path.display()
141    );
142    Ok(())
143}
144
145fn prompt_routing_target(flow_doc: &YamlValue) -> Option<String> {
146    if !io::stdout().is_terminal() {
147        return None;
148    }
149    let nodes = flow_doc
150        .as_mapping()
151        .and_then(|m| m.get(YamlValue::String("nodes".to_string(), None)))
152        .and_then(|n| n.as_mapping())?;
153    let mut keys: Vec<String> = nodes
154        .keys()
155        .filter_map(|k| k.as_str().map(|s| s.to_string()))
156        .collect();
157    keys.sort();
158    if keys.is_empty() {
159        return None;
160    }
161
162    println!("Select where to route from (empty to skip):");
163    for (idx, key) in keys.iter().enumerate() {
164        println!("  {}) {}", idx + 1, key);
165    }
166    print!("Choice: ");
167    let _ = io::stdout().flush();
168    let mut buf = String::new();
169    if io::stdin().read_line(&mut buf).is_err() {
170        return None;
171    }
172    let choice = buf.trim();
173    if choice.is_empty() {
174        return None;
175    }
176    if let Ok(idx) = choice.parse::<usize>()
177        && idx >= 1
178        && idx <= keys.len()
179    {
180        return Some(keys[idx - 1].clone());
181    }
182    None
183}
184
185fn patch_placeholder_routing(node: &mut JsonValue, next: &str) {
186    let Some(map) = node.as_object_mut() else {
187        return;
188    };
189    let Some(routing) = map.get_mut("routing") else {
190        return;
191    };
192    let Some(routes) = routing.as_array_mut() else {
193        return;
194    };
195    for entry in routes.iter_mut() {
196        if let Some(JsonValue::String(to)) =
197            entry.as_object_mut().and_then(|route| route.get_mut("to"))
198            && to == "NEXT_NODE_PLACEHOLDER"
199        {
200            *to = next.to_string();
201        }
202    }
203}
204
205fn resolve_component_bundle(coordinate: &str, profile: Option<&str>) -> Result<PathBuf> {
206    let path = PathBuf::from_str(coordinate).unwrap_or_default();
207    if path.exists() {
208        return Ok(path);
209    }
210    let dir = run_component_add(coordinate, profile, PackInitIntent::Dev)?;
211    Ok(dir)
212}
213
214pub fn parse_config_flow_output(output: &str) -> Result<(String, JsonValue)> {
215    let value: JsonValue =
216        serde_json::from_str(output).context("config flow output is not valid JSON")?;
217    let obj = value
218        .as_object()
219        .ok_or_else(|| anyhow!("config flow output must be a JSON object"))?;
220    let node_id = obj
221        .get("node_id")
222        .and_then(|v| v.as_str())
223        .ok_or_else(|| anyhow!("config flow output missing node_id"))?
224        .to_string();
225    let node = obj
226        .get("node")
227        .ok_or_else(|| anyhow!("config flow output missing node"))?
228        .clone();
229    if !node.is_object() {
230        bail!("config flow output node must be an object");
231    }
232    Ok((node_id, node))
233}
234
235fn node_to_yaml(node: JsonValue) -> Result<YamlValue> {
236    let yaml_string = serde_yaml::to_string(&node).context("failed to render node to YAML")?;
237    let yaml_value: YamlValue =
238        serde_yaml::from_str(&yaml_string).context("failed to parse rendered YAML")?;
239    Ok(yaml_value)
240}
241
242fn append_routing(flow_doc: &mut YamlValue, from_node: &str, to_node: &str) -> Result<()> {
243    let nodes = flow_doc
244        .as_mapping_mut()
245        .and_then(|m| m.get_mut(YamlValue::String("nodes".to_string(), None)))
246        .and_then(|n| n.as_mapping_mut())
247        .ok_or_else(|| anyhow!("flow missing nodes map"))?;
248    let key = YamlValue::String(from_node.to_string(), None);
249    let Some(node) = nodes.get_mut(&key) else {
250        bail!("node `{from_node}` not found for routing update");
251    };
252    let mapping = node
253        .as_mapping_mut()
254        .ok_or_else(|| anyhow!("node `{from_node}` is not a mapping"))?;
255    let routes_key = YamlValue::String("routing".to_string(), None);
256    let routing = mapping
257        .entry(routes_key)
258        .or_insert(YamlValue::Sequence(YamlSequence::default()));
259    let seq = routing
260        .as_sequence_mut()
261        .ok_or_else(|| anyhow!("routing for `{from_node}` is not a list"))?;
262    let mut entry = YamlMapping::new();
263    entry.insert(
264        YamlValue::String("to".to_string(), None),
265        YamlValue::String(to_node.to_string(), None),
266    );
267    seq.elements.push(YamlValue::Mapping(entry));
268    Ok(())
269}