greentic_flow/add_step/
mod.rs

1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12    component_catalog::ComponentCatalog,
13    component_catalog::ManifestCatalog,
14    config_flow::run_config_flow,
15    error::{FlowError, FlowErrorLocation, Result},
16    flow_ir::{ComponentRef, FlowIr, NodeIr, NodeKind, Route},
17    loader::load_ygtc_from_str,
18    model::FlowDoc,
19};
20
21use self::{
22    id::{generate_node_id, is_placeholder_value},
23    normalize::normalize_node_map,
24    rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25    validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30    pub after: Option<String>,
31    pub node_id_hint: Option<String>,
32    pub node: Value,
33    pub allow_cycles: bool,
34}
35
36#[derive(Debug, Clone)]
37pub struct AddStepPlan {
38    pub anchor: String,
39    pub new_node: NodeIr,
40    pub anchor_old_routing: Vec<Route>,
41}
42
/// A single validation or planning finding.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare diagnostics by
/// value (for example `assert_eq!` on an expected list).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Diagnostic {
    /// Stable, machine-readable identifier such as `"ROUTE_TARGET_MISSING"`.
    pub code: &'static str,
    /// Human-readable description of the problem.
    pub message: String,
    /// Dotted path pointing at the offending part of the flow, when known
    /// (for example `"nodes.<id>.routing"`).
    pub location: Option<String>,
}
49
50pub fn plan_add_step(
51    flow: &FlowIr,
52    spec: AddStepSpec,
53    catalog: &dyn ComponentCatalog,
54) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
55    let mut diags = Vec::new();
56
57    let anchor = match resolve_anchor(flow, spec.after.as_deref()) {
58        Ok(anchor) => anchor,
59        Err(msg) => {
60            diags.push(Diagnostic {
61                code: "ADD_STEP_ANCHOR_MISSING",
62                message: msg,
63                location: Some("nodes".to_string()),
64            });
65            return Err(diags);
66        }
67    };
68
69    if let Some(hint) = spec.node_id_hint.as_deref()
70        && is_placeholder_value(hint)
71    {
72        diags.push(Diagnostic {
73            code: "ADD_STEP_NODE_ID_PLACEHOLDER",
74            message: format!(
75                "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
76            ),
77            location: Some("add_step.node_id".to_string()),
78        });
79        return Err(diags);
80    }
81
82    let normalized = match normalize_node_map(spec.node.clone()) {
83        Ok(node) => node,
84        Err(e) => {
85            diags.push(Diagnostic {
86                code: "ADD_STEP_NODE_INVALID",
87                message: e.to_string(),
88                location: Some("add_step.node".to_string()),
89            });
90            return Err(diags);
91        }
92    };
93
94    if let Some(meta) = catalog.resolve(&normalized.component_id) {
95        for req in meta.required_fields {
96            if normalized.payload.get(&req).is_none() {
97                diags.push(Diagnostic {
98                    code: "COMPONENT_CONFIG_REQUIRED",
99                    message: format!(
100                        "component '{}' missing required config '{}'",
101                        normalized.component_id, req
102                    ),
103                    location: Some(format!("nodes.{}.{}", normalized.component_id, req)),
104                });
105            }
106        }
107    } else {
108        diags.push(Diagnostic {
109            code: "ADD_STEP_COMPONENT_UNKNOWN",
110            message: format!(
111                "component '{}' not found in catalog",
112                normalized.component_id
113            ),
114            location: Some("add_step.component".to_string()),
115        });
116    }
117
118    if !diags.is_empty() {
119        return Err(diags);
120    }
121
122    let anchor_node = flow.nodes.get(&anchor).expect("anchor exists");
123    let anchor_old_routing = anchor_node.routing.clone();
124
125    let new_node_id = generate_node_id(
126        spec.node_id_hint.as_deref(),
127        &normalized.component_id,
128        normalized.operation.as_deref(),
129        normalized.pack_alias.as_deref(),
130        &anchor,
131        flow.nodes.keys().map(|k| k.as_str()),
132    );
133
134    let routing = rewrite_placeholder_routes(
135        normalized.routing.clone(),
136        &anchor_old_routing,
137        spec.allow_cycles,
138        &anchor,
139        true,
140    )
141    .map_err(|msg| {
142        vec![Diagnostic {
143            code: "ADD_STEP_ROUTING_INVALID",
144            message: msg,
145            location: Some(format!("nodes.{new_node_id}.routing")),
146        }]
147    })?;
148
149    let new_node = NodeIr {
150        id: new_node_id.clone(),
151        kind: NodeKind::Component(ComponentRef {
152            component_id: normalized.component_id.clone(),
153            pack_alias: normalized.pack_alias.clone(),
154            operation: normalized.operation.clone(),
155            payload: normalized.payload.clone(),
156        }),
157        routing,
158    };
159
160    Ok(AddStepPlan {
161        anchor,
162        new_node,
163        anchor_old_routing,
164    })
165}
166
167pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
168    let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
169    if nodes.contains_key(&plan.new_node.id) {
170        return Err(FlowError::Internal {
171            message: format!("node '{}' already exists", plan.new_node.id),
172            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
173        });
174    }
175
176    let mut anchor = nodes
177        .get(&plan.anchor)
178        .cloned()
179        .ok_or_else(|| FlowError::Internal {
180            message: format!("anchor '{}' not found", plan.anchor),
181            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
182        })?;
183
184    let anchor_routing = apply_threaded_routing(
185        &plan.new_node.id,
186        &plan.anchor_old_routing,
187        allow_cycles,
188        &plan.anchor,
189    )?;
190    anchor.routing = anchor_routing;
191    nodes.insert(plan.anchor.clone(), anchor);
192    nodes.insert(plan.new_node.id.clone(), plan.new_node);
193
194    Ok(FlowIr {
195        id: flow.id.clone(),
196        kind: flow.kind.clone(),
197        entrypoints: flow.entrypoints.clone(),
198        nodes,
199    })
200}
201
202pub fn validate_flow(flow: &FlowIr, catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
203    let mut diags = Vec::new();
204    if let Some((name, target)) = flow.entrypoints.get_index(0)
205        && !flow.nodes.contains_key(target)
206    {
207        diags.push(Diagnostic {
208            code: "ENTRYPOINT_MISSING",
209            message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
210            location: Some(format!("entrypoints.{name}")),
211        });
212    }
213
214    for (id, node) in &flow.nodes {
215        for route in &node.routing {
216            if let Some(to) = &route.to
217                && !flow.nodes.contains_key(to)
218            {
219                diags.push(Diagnostic {
220                    code: "ROUTE_TARGET_MISSING",
221                    message: format!("node '{}' routes to unknown node '{}'", id, to),
222                    location: Some(format!("nodes.{id}.routing")),
223                });
224            }
225        }
226
227        match &node.kind {
228            NodeKind::Component(comp) => {
229                if comp.payload.is_null() {
230                    diags.push(Diagnostic {
231                        code: "COMPONENT_PAYLOAD_REQUIRED",
232                        message: format!(
233                            "component '{}' payload must not be null",
234                            comp.component_id
235                        ),
236                        location: Some(format!("nodes.{id}")),
237                    });
238                }
239
240                if let Some(meta) = catalog.resolve(&comp.component_id) {
241                    for req in meta.required_fields {
242                        if comp.payload.get(&req).is_none() {
243                            diags.push(Diagnostic {
244                                code: "COMPONENT_CONFIG_REQUIRED",
245                                message: format!(
246                                    "component '{}' missing required config '{}'",
247                                    comp.component_id, req
248                                ),
249                                location: Some(format!("nodes.{id}.{req}")),
250                            });
251                        }
252                    }
253                } else {
254                    diags.push(Diagnostic {
255                        code: "COMPONENT_NOT_FOUND",
256                        message: format!("component '{}' not found in catalog", comp.component_id),
257                        location: Some(format!("nodes.{id}")),
258                    });
259                }
260            }
261            NodeKind::Questions { fields } => {
262                if fields.get("fields").is_none() {
263                    diags.push(Diagnostic {
264                        code: "QUESTIONS_FIELDS_REQUIRED",
265                        message: "questions node missing fields".to_string(),
266                        location: Some(format!("nodes.{id}.questions.fields")),
267                    });
268                }
269            }
270            NodeKind::Template { template } => {
271                if template.is_empty() {
272                    diags.push(Diagnostic {
273                        code: "TEMPLATE_EMPTY",
274                        message: "template node payload is empty".to_string(),
275                        location: Some(format!("nodes.{id}.template")),
276                    });
277                }
278            }
279            NodeKind::Other { .. } => {}
280        }
281    }
282
283    diags
284}
285
286pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
287    if diags.is_empty() {
288        return Ok(());
289    }
290    let combined = diags
291        .into_iter()
292        .map(|d| format!("{}: {}", d.code, d.message))
293        .collect::<Vec<_>>()
294        .join("; ");
295    Err(FlowError::Internal {
296        message: combined,
297        location: FlowErrorLocation::at_path("add_step".to_string()),
298    })
299}
300
301fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
302    if let Some(id) = after {
303        if flow.nodes.contains_key(id) {
304            return Ok(id.to_string());
305        }
306        return Err(format!("anchor node '{}' not found", id));
307    }
308
309    if let Some(entry) = flow.entrypoints.get_index(0) {
310        return Ok(entry.1.clone());
311    }
312
313    if let Some(first) = flow.nodes.keys().next() {
314        return Ok(first.clone());
315    }
316
317    Err("flow has no nodes to anchor insertion".to_string())
318}
319
320pub fn apply_and_validate(
321    flow: &FlowIr,
322    plan: AddStepPlan,
323    catalog: &dyn ComponentCatalog,
324    allow_cycles: bool,
325) -> Result<FlowIr> {
326    let updated = apply_plan(flow, plan, allow_cycles)?;
327    validate_schema_and_flow(&updated, catalog)?;
328    Ok(updated)
329}
330
331/// Return ordered anchor candidates for UX: entrypoint target first (if present), then remaining nodes in insertion order.
332pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
333    let mut seen = IndexMap::new();
334    if let Some((_name, target)) = flow.entrypoints.get_index(0) {
335        seen.insert(target.clone(), ());
336    }
337    for id in flow.nodes.keys() {
338        seen.entry(id.clone()).or_insert(());
339    }
340    seen.keys().cloned().collect()
341}
342
343/// Execute a config flow and insert its emitted node into the target flow.
344pub fn add_step_from_config_flow(
345    flow_yaml: &str,
346    config_flow_path: &Path,
347    schema_path: &Path,
348    manifests: &[impl AsRef<Path>],
349    after: Option<String>,
350    answers: &serde_json::Map<String, Value>,
351    allow_cycles: bool,
352) -> Result<FlowDoc> {
353    let flow_doc = load_ygtc_from_str(flow_yaml)?;
354    let flow_ir = FlowIr::from_doc(flow_doc)?;
355    let catalog = ManifestCatalog::load_from_paths(manifests);
356
357    let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
358        message: format!("read config flow {}: {e}", config_flow_path.display()),
359        location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
360            .with_source_path(Some(config_flow_path)),
361    })?;
362    let output = run_config_flow(&config_yaml, schema_path, answers)?;
363
364    let spec = AddStepSpec {
365        after,
366        node_id_hint: Some(output.node_id.clone()),
367        node: output.node.clone(),
368        allow_cycles,
369    };
370
371    let plan =
372        plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
373            match diagnostics_to_error(diags) {
374                Ok(_) => FlowError::Internal {
375                    message: "add_step diagnostics unexpectedly empty".to_string(),
376                    location: FlowErrorLocation::at_path("add_step".to_string()),
377                },
378                Err(e) => e,
379            }
380        })?;
381    let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
382    updated.to_doc()
383}