greentic_dev/
flow_cmd.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use crate::cli::{ConfigFlowModeArg, FlowAddStepArgs};
5use crate::component_add::run_component_add;
6use crate::pack_init::PackInitIntent;
7use crate::path_safety::normalize_under_root;
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::flow_bundle::load_and_validate_bundle;
10use serde_json::Value as JsonValue;
11use serde_yaml_bw as serde_yaml;
12use std::io::Write;
13use std::str::FromStr;
14use std::{io, io::IsTerminal};
15use tempfile::NamedTempFile;
16
17use greentic_types::FlowId;
18use greentic_types::component::ComponentManifest;
19
20pub fn validate(path: &Path, compact_json: bool) -> Result<()> {
21    let root = std::env::current_dir()
22        .context("failed to resolve workspace root")?
23        .canonicalize()
24        .context("failed to canonicalize workspace root")?;
25    let safe = normalize_under_root(&root, path)?;
26    let source = fs::read_to_string(&safe)
27        .with_context(|| format!("failed to read flow definition at {}", safe.display()))?;
28
29    let bundle = load_and_validate_bundle(&source, Some(&safe)).with_context(|| {
30        format!(
31            "flow validation failed for {} using greentic-flow",
32            safe.display()
33        )
34    })?;
35
36    let serialized = if compact_json {
37        serde_json::to_string(&bundle)?
38    } else {
39        serde_json::to_string_pretty(&bundle)?
40    };
41
42    println!("{serialized}");
43    Ok(())
44}
45
46pub fn run_add_step(args: FlowAddStepArgs) -> Result<()> {
47    let manifest_path = args
48        .manifest
49        .clone()
50        .unwrap_or_else(|| PathBuf::from("component.manifest.json"));
51    if !manifest_path.exists() {
52        bail!(
53            "component.manifest.json not found at {}. Use --manifest to point to the manifest file.",
54            manifest_path.display()
55        );
56    }
57    let manifest_raw = std::fs::read_to_string(&manifest_path)
58        .with_context(|| format!("failed to read {}", manifest_path.display()))?;
59    let manifest: ComponentManifest = serde_json::from_str(&manifest_raw).with_context(|| {
60        format!(
61            "failed to parse component manifest JSON at {}",
62            manifest_path.display()
63        )
64    })?;
65
66    let config_flow_id = match args.mode {
67        Some(ConfigFlowModeArg::Custom) => "custom".to_string(),
68        Some(ConfigFlowModeArg::Default) => "default".to_string(),
69        None => args.flow.clone(),
70    };
71    let config_flow_key = FlowId::from_str(&config_flow_id).map_err(|_| {
72        anyhow!(
73            "invalid flow identifier `{}`; flow ids must be valid FlowId strings",
74            config_flow_id
75        )
76    })?;
77    let Some(config_flow) = manifest.dev_flows.get(&config_flow_key) else {
78        bail!(
79            "Flow '{}' is missing from manifest.dev_flows. Run `greentic-component flow update` to regenerate config flows.",
80            config_flow_id
81        );
82    };
83    if !config_flow.graph.is_object() {
84        bail!("config flow `{config_flow_id}` graph is not an object");
85    }
86
87    let coord = args
88        .coordinate
89        .ok_or_else(|| anyhow!("component coordinate is required (pass --coordinate)"))?;
90
91    // Ensure the component is available locally (fetch if needed).
92    let _bundle_dir = resolve_component_bundle(&coord, args.profile.as_deref())?;
93
94    // Render the dev flow graph to YAML so the existing runner can consume it.
95    let config_flow_yaml = serde_yaml::to_string(&config_flow.graph)
96        .context("failed to render config flow graph to YAML")?;
97    let mut temp_flow =
98        NamedTempFile::new().context("failed to create temporary config flow file")?;
99    temp_flow
100        .write_all(config_flow_yaml.as_bytes())
101        .context("failed to write temporary config flow")?;
102    temp_flow.flush()?;
103
104    let pack_flow_path = PathBuf::from("flows").join(format!("{}.ygtc", args.flow_id));
105    if !pack_flow_path.exists() {
106        bail!(
107            "Pack flow '{}' not found at {}",
108            args.flow_id,
109            pack_flow_path.display()
110        );
111    }
112    let pack_flow_raw = std::fs::read_to_string(&pack_flow_path)
113        .with_context(|| format!("failed to read pack flow {}", pack_flow_path.display()))?;
114    let mut pack_flow_json: JsonValue = serde_yaml::from_str(&pack_flow_raw)
115        .with_context(|| format!("invalid YAML in {}", pack_flow_path.display()))?;
116
117    let output = crate::pack_run::run_config_flow(temp_flow.path())
118        .with_context(|| format!("failed to run config flow {}", config_flow_id))?;
119    let (node_id, mut node) = parse_config_flow_output(&output)?;
120    let after = args
121        .after
122        .clone()
123        .or_else(|| prompt_routing_target(&pack_flow_json));
124    if let Some(after) = after.as_deref() {
125        patch_placeholder_routing(&mut node, after);
126    }
127
128    let graph_obj = pack_flow_json
129        .as_object_mut()
130        .ok_or_else(|| anyhow!("pack flow {} is not a mapping", pack_flow_path.display()))?;
131
132    // Update target pack flow JSON
133    let nodes = graph_obj
134        .get_mut("nodes")
135        .and_then(|n| n.as_object_mut())
136        .ok_or_else(|| anyhow!("flow `{}` missing nodes map", args.flow_id))?;
137    nodes.insert(node_id.clone(), node);
138
139    if let Some(after) = args.after {
140        append_routing(graph_obj, &after, &node_id)?;
141    } else if let Some(after) = after.as_deref() {
142        append_routing(graph_obj, after, &node_id)?;
143    }
144
145    let rendered =
146        serde_yaml::to_string(&pack_flow_json).context("failed to render updated pack flow")?;
147    std::fs::write(&pack_flow_path, rendered)
148        .with_context(|| format!("failed to write {}", pack_flow_path.display()))?;
149
150    println!(
151        "Added node `{}` from config flow {} to {}",
152        node_id,
153        config_flow_id,
154        pack_flow_path.display()
155    );
156    Ok(())
157}
158
159fn prompt_routing_target(flow_doc: &JsonValue) -> Option<String> {
160    if !io::stdout().is_terminal() {
161        return None;
162    }
163    let nodes = flow_doc
164        .as_object()
165        .and_then(|m| m.get("nodes"))
166        .and_then(|n| n.as_object())?;
167    let mut keys: Vec<String> = nodes.keys().cloned().collect();
168    keys.sort();
169    if keys.is_empty() {
170        return None;
171    }
172
173    println!("Select where to route from (empty to skip):");
174    for (idx, key) in keys.iter().enumerate() {
175        println!("  {}) {}", idx + 1, key);
176    }
177    print!("Choice: ");
178    let _ = io::stdout().flush();
179    let mut buf = String::new();
180    if io::stdin().read_line(&mut buf).is_err() {
181        return None;
182    }
183    let choice = buf.trim();
184    if choice.is_empty() {
185        return None;
186    }
187    if let Ok(idx) = choice.parse::<usize>()
188        && idx >= 1
189        && idx <= keys.len()
190    {
191        return Some(keys[idx - 1].clone());
192    }
193    None
194}
195
196fn patch_placeholder_routing(node: &mut JsonValue, next: &str) {
197    let Some(map) = node.as_object_mut() else {
198        return;
199    };
200    let Some(routing) = map.get_mut("routing") else {
201        return;
202    };
203    let Some(routes) = routing.as_array_mut() else {
204        return;
205    };
206    for entry in routes.iter_mut() {
207        if let Some(JsonValue::String(to)) =
208            entry.as_object_mut().and_then(|route| route.get_mut("to"))
209            && to == "NEXT_NODE_PLACEHOLDER"
210        {
211            *to = next.to_string();
212        }
213    }
214}
215
216fn resolve_component_bundle(coordinate: &str, profile: Option<&str>) -> Result<PathBuf> {
217    let path = PathBuf::from_str(coordinate).unwrap_or_default();
218    if path.exists() {
219        return Ok(path);
220    }
221    let dir = run_component_add(coordinate, profile, PackInitIntent::Dev)?;
222    Ok(dir)
223}
224
225pub fn parse_config_flow_output(output: &str) -> Result<(String, JsonValue)> {
226    let value: JsonValue =
227        serde_json::from_str(output).context("config flow output is not valid JSON")?;
228    let obj = value
229        .as_object()
230        .ok_or_else(|| anyhow!("config flow output must be a JSON object"))?;
231    let node_id = obj
232        .get("node_id")
233        .and_then(|v| v.as_str())
234        .ok_or_else(|| anyhow!("config flow output missing node_id"))?
235        .to_string();
236    let node = obj
237        .get("node")
238        .ok_or_else(|| anyhow!("config flow output missing node"))?
239        .clone();
240    if !node.is_object() {
241        bail!("config flow output node must be an object");
242    }
243    Ok((node_id, node))
244}
245
246fn append_routing(
247    flow_doc: &mut serde_json::Map<String, JsonValue>,
248    from_node: &str,
249    to_node: &str,
250) -> Result<()> {
251    let nodes = flow_doc
252        .get_mut("nodes")
253        .and_then(|n| n.as_object_mut())
254        .ok_or_else(|| anyhow!("flow missing nodes map"))?;
255    let Some(node) = nodes.get_mut(from_node) else {
256        bail!("node `{from_node}` not found for routing update");
257    };
258    let mapping = node
259        .as_object_mut()
260        .ok_or_else(|| anyhow!("node `{from_node}` is not an object"))?;
261    let routing = mapping
262        .entry("routing")
263        .or_insert(JsonValue::Array(Vec::new()));
264    let seq = routing
265        .as_array_mut()
266        .ok_or_else(|| anyhow!("routing for `{from_node}` is not a list"))?;
267    let mut entry = serde_json::Map::new();
268    entry.insert("to".to_string(), JsonValue::String(to_node.to_string()));
269    seq.push(JsonValue::Object(entry));
270    Ok(())
271}