greentic_flow/add_step/
mod.rs

1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12    component_catalog::ComponentCatalog,
13    component_catalog::ManifestCatalog,
14    config_flow::run_config_flow,
15    error::{FlowError, FlowErrorLocation, Result},
16    flow_ir::{FlowIr, NodeIr, Route},
17    loader::load_ygtc_from_str,
18    model::FlowDoc,
19};
20
21use self::{
22    id::{generate_node_id, is_placeholder_value},
23    normalize::normalize_node_map,
24    rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25    validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30    pub after: Option<String>,
31    pub node_id_hint: Option<String>,
32    pub node: Value,
33    pub allow_cycles: bool,
34    pub require_placeholder: bool,
35}
36
37#[derive(Debug, Clone)]
38pub struct AddStepPlan {
39    pub anchor: String,
40    pub new_node: NodeIr,
41    pub anchor_old_routing: Vec<Route>,
42    pub insert_before_entrypoint: bool,
43}
44
/// A single problem found while planning or validating an add-step operation.
#[derive(Clone, Debug)]
pub struct Diagnostic {
    /// Stable, machine-readable identifier such as `"ADD_STEP_ANCHOR_MISSING"`.
    pub code: &'static str,
    /// Human-readable explanation of the problem.
    pub message: String,
    /// Optional dotted path to the offending element (for example `"nodes"`).
    pub location: Option<String>,
}
51
52pub fn plan_add_step(
53    flow: &FlowIr,
54    spec: AddStepSpec,
55    _catalog: &dyn ComponentCatalog,
56) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
57    let mut diags = Vec::new();
58
59    let anchor_source = match resolve_anchor(flow, spec.after.as_deref()) {
60        Ok(anchor) => anchor,
61        Err(msg) => {
62            diags.push(Diagnostic {
63                code: "ADD_STEP_ANCHOR_MISSING",
64                message: msg,
65                location: Some("nodes".to_string()),
66            });
67            return Err(diags);
68        }
69    };
70    let mut insert_before_entrypoint = false;
71    if spec.after.is_none()
72        && let Some((_, target)) = flow.entrypoints.get_index(0)
73        && target == &anchor_source
74    {
75        insert_before_entrypoint = true;
76    }
77    let anchor = anchor_source;
78
79    if let Some(hint) = spec.node_id_hint.as_deref()
80        && is_placeholder_value(hint)
81    {
82        diags.push(Diagnostic {
83            code: "ADD_STEP_NODE_ID_PLACEHOLDER",
84            message: format!(
85                "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
86            ),
87            location: Some("add_step.node_id".to_string()),
88        });
89        return Err(diags);
90    }
91
92    let normalized = match normalize_node_map(spec.node.clone()) {
93        Ok(node) => node,
94        Err(e) => {
95            diags.push(Diagnostic {
96                code: "ADD_STEP_NODE_INVALID",
97                message: e.to_string(),
98                location: Some("add_step.node".to_string()),
99            });
100            return Err(diags);
101        }
102    };
103
104    let anchor_old_routing = if let Some(anchor_node) = flow.nodes.get(&anchor) {
105        anchor_node.routing.clone()
106    } else if flow.nodes.is_empty() {
107        Vec::new()
108    } else {
109        return Err(vec![Diagnostic {
110            code: "ADD_STEP_ANCHOR_MISSING",
111            message: format!("anchor node '{}' not found", anchor),
112            location: Some("nodes".to_string()),
113        }]);
114    };
115
116    let hint = spec
117        .node_id_hint
118        .as_deref()
119        .or(Some(normalized.operation.as_str()));
120    let new_node_id = generate_node_id(hint, &anchor, flow.nodes.keys().map(|k| k.as_str()));
121
122    let routing = rewrite_placeholder_routes(
123        normalized.routing.clone(),
124        &anchor_old_routing,
125        spec.allow_cycles,
126        &anchor,
127        spec.require_placeholder,
128    )
129    .map_err(|msg| {
130        vec![Diagnostic {
131            code: "ADD_STEP_ROUTING_INVALID",
132            message: msg,
133            location: Some(format!("nodes.{new_node_id}.routing")),
134        }]
135    })?;
136
137    let new_node = NodeIr {
138        id: new_node_id.clone(),
139        operation: normalized.operation.clone(),
140        payload: normalized.payload.clone(),
141        output: serde_json::Value::Object(Default::default()),
142        routing,
143        telemetry: normalized.telemetry.clone(),
144    };
145
146    Ok(AddStepPlan {
147        anchor,
148        new_node,
149        anchor_old_routing,
150        insert_before_entrypoint,
151    })
152}
153
154pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
155    let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
156    if nodes.contains_key(&plan.new_node.id) {
157        return Err(FlowError::Internal {
158            message: format!("node '{}' already exists", plan.new_node.id),
159            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
160        });
161    }
162
163    if nodes.is_empty() {
164        let mut entrypoints = IndexMap::new();
165        entrypoints.insert("default".to_string(), plan.new_node.id.clone());
166        nodes.insert(plan.new_node.id.clone(), plan.new_node);
167        return Ok(FlowIr {
168            id: flow.id.clone(),
169            kind: flow.kind.clone(),
170            schema_version: flow.schema_version,
171            entrypoints,
172            nodes,
173        });
174    }
175
176    if plan.insert_before_entrypoint {
177        // Insert new node before the entrypoint target: keep anchor routing, retarget entrypoints.
178        let mut new_nodes = IndexMap::new();
179        for (id, node) in nodes.into_iter() {
180            if id == plan.anchor {
181                let mut new_node = plan.new_node.clone();
182                new_node.routing = vec![Route {
183                    to: Some(plan.anchor.clone()),
184                    ..Route::default()
185                }];
186                new_nodes.insert(new_node.id.clone(), new_node);
187            }
188            new_nodes.insert(id.clone(), node);
189        }
190
191        let mut entrypoints = flow.entrypoints.clone();
192        for (_name, target) in entrypoints.iter_mut() {
193            if target == &plan.anchor {
194                *target = plan.new_node.id.clone();
195            }
196        }
197
198        return Ok(FlowIr {
199            id: flow.id.clone(),
200            kind: flow.kind.clone(),
201            schema_version: flow.schema_version,
202            entrypoints,
203            nodes: new_nodes,
204        });
205    }
206
207    let mut reordered = IndexMap::new();
208    let mut anchor_found = false;
209    for (id, node) in nodes.into_iter() {
210        if id == plan.anchor {
211            anchor_found = true;
212            let mut anchor = node.clone();
213            anchor.routing = apply_threaded_routing(
214                &plan.new_node.id,
215                &plan.anchor_old_routing,
216                allow_cycles,
217                &plan.anchor,
218            )?;
219            reordered.insert(id.clone(), anchor);
220            reordered.insert(plan.new_node.id.clone(), plan.new_node.clone());
221        } else {
222            reordered.insert(id.clone(), node);
223        }
224    }
225
226    if !anchor_found {
227        return Err(FlowError::Internal {
228            message: format!("anchor '{}' not found", plan.anchor),
229            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
230        });
231    }
232
233    Ok(FlowIr {
234        id: flow.id.clone(),
235        kind: flow.kind.clone(),
236        schema_version: flow.schema_version,
237        entrypoints: flow.entrypoints.clone(),
238        nodes: reordered,
239    })
240}
241
242pub fn validate_flow(flow: &FlowIr, _catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
243    let mut diags = Vec::new();
244    if let Some((name, target)) = flow.entrypoints.get_index(0)
245        && !flow.nodes.contains_key(target)
246    {
247        diags.push(Diagnostic {
248            code: "ENTRYPOINT_MISSING",
249            message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
250            location: Some(format!("entrypoints.{name}")),
251        });
252    }
253
254    for (id, node) in &flow.nodes {
255        for route in &node.routing {
256            if let Some(to) = &route.to
257                && !flow.nodes.contains_key(to)
258            {
259                diags.push(Diagnostic {
260                    code: "ROUTE_TARGET_MISSING",
261                    message: format!("node '{}' routes to unknown node '{}'", id, to),
262                    location: Some(format!("nodes.{id}.routing")),
263                });
264            }
265        }
266        if node.operation.trim().is_empty() {
267            diags.push(Diagnostic {
268                code: "OPERATION_REQUIRED",
269                message: format!("node '{}' missing operation name", id),
270                location: Some(format!("nodes.{id}")),
271            });
272        }
273        if node.payload.is_null() {
274            diags.push(Diagnostic {
275                code: "PAYLOAD_REQUIRED",
276                message: format!("node '{}' payload must not be null", id),
277                location: Some(format!("nodes.{id}")),
278            });
279        }
280    }
281
282    diags
283}
284
285pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
286    if diags.is_empty() {
287        return Ok(());
288    }
289    let combined = diags
290        .into_iter()
291        .map(|d| format!("{}: {}", d.code, d.message))
292        .collect::<Vec<_>>()
293        .join("; ");
294    Err(FlowError::Internal {
295        message: combined,
296        location: FlowErrorLocation::at_path("add_step".to_string()),
297    })
298}
299
300fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
301    if let Some(id) = after {
302        if flow.nodes.contains_key(id) {
303            return Ok(id.to_string());
304        }
305        return Err(format!("anchor node '{}' not found", id));
306    }
307
308    if let Some(entry) = flow.entrypoints.get_index(0) {
309        return Ok(entry.1.clone());
310    }
311
312    if let Some(first) = flow.nodes.keys().next() {
313        return Ok(first.clone());
314    }
315
316    Err("flow has no nodes to anchor insertion".to_string())
317}
318
319pub fn apply_and_validate(
320    flow: &FlowIr,
321    plan: AddStepPlan,
322    catalog: &dyn ComponentCatalog,
323    allow_cycles: bool,
324) -> Result<FlowIr> {
325    let updated = apply_plan(flow, plan, allow_cycles)?;
326    validate_schema_and_flow(&updated, catalog)?;
327    Ok(updated)
328}
329
330/// Return ordered anchor candidates for UX: entrypoint target first (if present), then remaining nodes in insertion order.
331pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
332    let mut seen = IndexMap::new();
333    if let Some((_name, target)) = flow.entrypoints.get_index(0) {
334        seen.insert(target.clone(), ());
335    }
336    for id in flow.nodes.keys() {
337        seen.entry(id.clone()).or_insert(());
338    }
339    seen.keys().cloned().collect()
340}
341
342/// Execute a config flow and insert its emitted node into the target flow.
343pub fn add_step_from_config_flow(
344    flow_yaml: &str,
345    config_flow_path: &Path,
346    schema_path: &Path,
347    manifests: &[impl AsRef<Path>],
348    after: Option<String>,
349    answers: &serde_json::Map<String, Value>,
350    allow_cycles: bool,
351) -> Result<FlowDoc> {
352    let flow_doc = load_ygtc_from_str(flow_yaml)?;
353    let flow_ir = FlowIr::from_doc(flow_doc)?;
354    let catalog = ManifestCatalog::load_from_paths(manifests);
355
356    let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
357        message: format!("read config flow {}: {e}", config_flow_path.display()),
358        location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
359            .with_source_path(Some(config_flow_path)),
360    })?;
361    let output = run_config_flow(&config_yaml, schema_path, answers)?;
362
363    let spec = AddStepSpec {
364        after,
365        node_id_hint: Some(output.node_id.clone()),
366        node: output.node.clone(),
367        allow_cycles,
368        require_placeholder: true,
369    };
370
371    let plan =
372        plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
373            match diagnostics_to_error(diags) {
374                Ok(_) => FlowError::Internal {
375                    message: "add_step diagnostics unexpectedly empty".to_string(),
376                    location: FlowErrorLocation::at_path("add_step".to_string()),
377                },
378                Err(e) => e,
379            }
380        })?;
381    let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
382    updated.to_doc()
383}