Skip to main content

greentic_flow/add_step/
mod.rs

1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12    component_catalog::ComponentCatalog,
13    component_catalog::ManifestCatalog,
14    config_flow::run_config_flow,
15    error::{FlowError, FlowErrorLocation, Result},
16    flow_ir::{FlowIr, NodeIr, Route},
17    loader::load_ygtc_from_str,
18    model::FlowDoc,
19};
20
21use self::{
22    id::{generate_node_id, is_placeholder_value},
23    normalize::normalize_node_map,
24    rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25    validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30    pub after: Option<String>,
31    pub node_id_hint: Option<String>,
32    pub node: Value,
33    pub allow_cycles: bool,
34    pub require_placeholder: bool,
35}
36
37#[derive(Debug, Clone)]
38pub struct AddStepPlan {
39    pub anchor: String,
40    pub new_node: NodeIr,
41    pub anchor_old_routing: Vec<Route>,
42    pub insert_before_entrypoint: bool,
43}
44
45#[derive(Debug, Clone)]
46pub struct Diagnostic {
47    pub code: &'static str,
48    pub message: String,
49    pub location: Option<String>,
50}
51
52fn looks_like_component_id(hint: &str) -> bool {
53    let trimmed = hint.trim();
54    if trimmed.contains('.') {
55        return true;
56    }
57    let parts: Vec<&str> = trimmed.split('_').filter(|p| !p.is_empty()).collect();
58    parts.len() >= 3
59}
60
61fn simplify_component_name(raw: &str) -> Option<String> {
62    let mut candidate = raw.trim();
63    if candidate.is_empty() {
64        return None;
65    }
66    if let Some(last) = candidate.rsplit(['/', '\\']).next() {
67        candidate = last;
68    }
69    if let Some((base, _)) = candidate.split_once('@') {
70        candidate = base;
71    }
72    if let Some((base, _)) = candidate.split_once(':') {
73        candidate = base;
74    }
75    if let Some(last) = candidate.rsplit('.').next() {
76        candidate = last;
77    }
78    let underscore_parts: Vec<&str> = candidate.split('_').filter(|p| !p.is_empty()).collect();
79    if underscore_parts.len() >= 3 {
80        candidate = underscore_parts[underscore_parts.len() - 1];
81    }
82    let normalized = candidate.replace('_', "-");
83    if normalized.trim().is_empty() {
84        None
85    } else {
86        Some(normalized)
87    }
88}
89
90fn component_name_from_node(node: &Value) -> Option<String> {
91    let obj = node.as_object()?;
92    if let Some(exec) = obj.get("component.exec")
93        && let Some(component) = exec.get("component").and_then(Value::as_str)
94    {
95        return simplify_component_name(component);
96    }
97    if let Some(component) = obj.get("component").and_then(Value::as_str) {
98        return simplify_component_name(component);
99    }
100    None
101}
102
103pub fn normalize_node_id_hint(hint: Option<String>, node: &Value) -> Option<String> {
104    let derived = component_name_from_node(node);
105    match (hint.as_deref(), derived) {
106        (_, None) => hint,
107        (None, Some(name)) => Some(name),
108        (Some(existing), Some(name)) => {
109            if existing.trim().is_empty()
110                || is_placeholder_value(existing)
111                || looks_like_component_id(existing)
112            {
113                return Some(name);
114            }
115            Some(existing.to_string())
116        }
117    }
118}
119
120pub fn plan_add_step(
121    flow: &FlowIr,
122    spec: AddStepSpec,
123    _catalog: &dyn ComponentCatalog,
124) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
125    let mut diags = Vec::new();
126
127    let anchor_source = match resolve_anchor(flow, spec.after.as_deref()) {
128        Ok(anchor) => anchor,
129        Err(msg) => {
130            diags.push(Diagnostic {
131                code: "ADD_STEP_ANCHOR_MISSING",
132                message: msg,
133                location: Some("nodes".to_string()),
134            });
135            return Err(diags);
136        }
137    };
138    let mut insert_before_entrypoint = false;
139    if spec.after.is_none()
140        && let Some((_, target)) = flow.entrypoints.get_index(0)
141        && target == &anchor_source
142    {
143        insert_before_entrypoint = true;
144    }
145    let anchor = anchor_source;
146
147    if let Some(hint) = spec.node_id_hint.as_deref()
148        && is_placeholder_value(hint)
149    {
150        diags.push(Diagnostic {
151            code: "ADD_STEP_NODE_ID_PLACEHOLDER",
152            message: format!(
153                "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
154            ),
155            location: Some("add_step.node_id".to_string()),
156        });
157        return Err(diags);
158    }
159
160    let normalized = match normalize_node_map(spec.node.clone()) {
161        Ok(node) => node,
162        Err(e) => {
163            diags.push(Diagnostic {
164                code: "ADD_STEP_NODE_INVALID",
165                message: e.to_string(),
166                location: Some("add_step.node".to_string()),
167            });
168            return Err(diags);
169        }
170    };
171
172    let anchor_old_routing = if let Some(anchor_node) = flow.nodes.get(&anchor) {
173        anchor_node.routing.clone()
174    } else if flow.nodes.is_empty() {
175        Vec::new()
176    } else {
177        return Err(vec![Diagnostic {
178            code: "ADD_STEP_ANCHOR_MISSING",
179            message: format!("anchor node '{}' not found", anchor),
180            location: Some("nodes".to_string()),
181        }]);
182    };
183
184    let hint = spec
185        .node_id_hint
186        .as_deref()
187        .or(Some(normalized.operation.as_str()));
188    let new_node_id = generate_node_id(hint, &anchor, flow.nodes.keys().map(|k| k.as_str()));
189
190    let routing = rewrite_placeholder_routes(
191        normalized.routing.clone(),
192        &anchor_old_routing,
193        spec.allow_cycles,
194        &anchor,
195        spec.require_placeholder,
196    )
197    .map_err(|msg| {
198        vec![Diagnostic {
199            code: "ADD_STEP_ROUTING_INVALID",
200            message: msg,
201            location: Some(format!("nodes.{new_node_id}.routing")),
202        }]
203    })?;
204
205    if routing.is_empty() {
206        return Err(vec![Diagnostic {
207            code: "ADD_STEP_ROUTING_MISSING",
208            message: "add-step requires at least one routing target; use --routing-* or include routing in config flow output".to_string(),
209            location: Some(format!("nodes.{new_node_id}.routing")),
210        }]);
211    }
212
213    let new_node = NodeIr {
214        id: new_node_id.clone(),
215        operation: normalized.operation.clone(),
216        payload: normalized.payload.clone(),
217        output: serde_json::Value::Object(Default::default()),
218        routing,
219        telemetry: normalized.telemetry.clone(),
220    };
221
222    Ok(AddStepPlan {
223        anchor,
224        new_node,
225        anchor_old_routing,
226        insert_before_entrypoint,
227    })
228}
229
230pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
231    let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
232    if nodes.contains_key(&plan.new_node.id) {
233        return Err(FlowError::Internal {
234            message: format!("node '{}' already exists", plan.new_node.id),
235            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
236        });
237    }
238
239    if nodes.is_empty() {
240        let mut entrypoints = IndexMap::new();
241        entrypoints.insert("default".to_string(), plan.new_node.id.clone());
242        nodes.insert(plan.new_node.id.clone(), plan.new_node);
243        return Ok(FlowIr {
244            id: flow.id.clone(),
245            kind: flow.kind.clone(),
246            schema_version: flow.schema_version,
247            entrypoints,
248            nodes,
249        });
250    }
251
252    if plan.insert_before_entrypoint {
253        // Insert new node before the entrypoint target: keep anchor routing, retarget entrypoints.
254        let mut new_nodes = IndexMap::new();
255        for (id, node) in nodes.into_iter() {
256            if id == plan.anchor {
257                let mut new_node = plan.new_node.clone();
258                new_node.routing = vec![Route {
259                    to: Some(plan.anchor.clone()),
260                    ..Route::default()
261                }];
262                new_nodes.insert(new_node.id.clone(), new_node);
263            }
264            new_nodes.insert(id.clone(), node);
265        }
266
267        let mut entrypoints = flow.entrypoints.clone();
268        for (_name, target) in entrypoints.iter_mut() {
269            if target == &plan.anchor {
270                *target = plan.new_node.id.clone();
271            }
272        }
273
274        return Ok(FlowIr {
275            id: flow.id.clone(),
276            kind: flow.kind.clone(),
277            schema_version: flow.schema_version,
278            entrypoints,
279            nodes: new_nodes,
280        });
281    }
282
283    let mut reordered = IndexMap::new();
284    let mut anchor_found = false;
285    for (id, node) in nodes.into_iter() {
286        if id == plan.anchor {
287            anchor_found = true;
288            let mut anchor = node.clone();
289            anchor.routing = apply_threaded_routing(
290                &plan.new_node.id,
291                &plan.anchor_old_routing,
292                allow_cycles,
293                &plan.anchor,
294            )?;
295            reordered.insert(id.clone(), anchor);
296            reordered.insert(plan.new_node.id.clone(), plan.new_node.clone());
297        } else {
298            reordered.insert(id.clone(), node);
299        }
300    }
301
302    if !anchor_found {
303        return Err(FlowError::Internal {
304            message: format!("anchor '{}' not found", plan.anchor),
305            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
306        });
307    }
308
309    Ok(FlowIr {
310        id: flow.id.clone(),
311        kind: flow.kind.clone(),
312        schema_version: flow.schema_version,
313        entrypoints: flow.entrypoints.clone(),
314        nodes: reordered,
315    })
316}
317
318pub fn validate_flow(flow: &FlowIr, _catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
319    let mut diags = Vec::new();
320    if let Some((name, target)) = flow.entrypoints.get_index(0)
321        && !flow.nodes.contains_key(target)
322    {
323        diags.push(Diagnostic {
324            code: "ENTRYPOINT_MISSING",
325            message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
326            location: Some(format!("entrypoints.{name}")),
327        });
328    }
329
330    for (id, node) in &flow.nodes {
331        for route in &node.routing {
332            if let Some(to) = &route.to
333                && !flow.nodes.contains_key(to)
334            {
335                diags.push(Diagnostic {
336                    code: "ROUTE_TARGET_MISSING",
337                    message: format!("node '{}' routes to unknown node '{}'", id, to),
338                    location: Some(format!("nodes.{id}.routing")),
339                });
340            }
341        }
342        if node.operation.trim().is_empty() {
343            diags.push(Diagnostic {
344                code: "OPERATION_REQUIRED",
345                message: format!("node '{}' missing operation name", id),
346                location: Some(format!("nodes.{id}")),
347            });
348        }
349        if node.payload.is_null() {
350            diags.push(Diagnostic {
351                code: "PAYLOAD_REQUIRED",
352                message: format!("node '{}' payload must not be null", id),
353                location: Some(format!("nodes.{id}")),
354            });
355        }
356    }
357
358    diags
359}
360
361pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
362    if diags.is_empty() {
363        return Ok(());
364    }
365    let combined = diags
366        .into_iter()
367        .map(|d| format!("{}: {}", d.code, d.message))
368        .collect::<Vec<_>>()
369        .join("; ");
370    Err(FlowError::Internal {
371        message: combined,
372        location: FlowErrorLocation::at_path("add_step".to_string()),
373    })
374}
375
376fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
377    if let Some(id) = after {
378        if flow.nodes.contains_key(id) {
379            return Ok(id.to_string());
380        }
381        return Err(format!("anchor node '{}' not found", id));
382    }
383
384    if flow.nodes.is_empty() {
385        // Empty flow: no anchor needed; apply_plan will insert the first node and set entrypoint.
386        return Ok(String::new());
387    }
388
389    if let Some(entry) = flow.entrypoints.get_index(0) {
390        return Ok(entry.1.clone());
391    }
392
393    if let Some(first) = flow.nodes.keys().next() {
394        return Ok(first.clone());
395    }
396
397    Err("flow has no nodes to anchor insertion".to_string())
398}
399
400pub fn apply_and_validate(
401    flow: &FlowIr,
402    plan: AddStepPlan,
403    catalog: &dyn ComponentCatalog,
404    allow_cycles: bool,
405) -> Result<FlowIr> {
406    let updated = apply_plan(flow, plan, allow_cycles)?;
407    validate_schema_and_flow(&updated, catalog)?;
408    Ok(updated)
409}
410
411/// Return ordered anchor candidates for UX: entrypoint target first (if present), then remaining nodes in insertion order.
412pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
413    let mut seen = IndexMap::new();
414    if let Some((_name, target)) = flow.entrypoints.get_index(0) {
415        seen.insert(target.clone(), ());
416    }
417    for id in flow.nodes.keys() {
418        seen.entry(id.clone()).or_insert(());
419    }
420    seen.keys().cloned().collect()
421}
422
423/// Execute a config flow and insert its emitted node into the target flow.
424pub fn add_step_from_config_flow(
425    flow_yaml: &str,
426    config_flow_path: &Path,
427    schema_path: &Path,
428    manifests: &[impl AsRef<Path>],
429    after: Option<String>,
430    answers: &serde_json::Map<String, Value>,
431    allow_cycles: bool,
432) -> Result<FlowDoc> {
433    let flow_doc = load_ygtc_from_str(flow_yaml)?;
434    let flow_ir = FlowIr::from_doc(flow_doc)?;
435    let catalog = ManifestCatalog::load_from_paths(manifests);
436
437    let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
438        message: format!("read config flow {}: {e}", config_flow_path.display()),
439        location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
440            .with_source_path(Some(config_flow_path)),
441    })?;
442    let output = run_config_flow(&config_yaml, schema_path, answers, None)?;
443    let node_id_hint = normalize_node_id_hint(Some(output.node_id.clone()), &output.node);
444
445    let spec = AddStepSpec {
446        after,
447        node_id_hint,
448        node: output.node.clone(),
449        allow_cycles,
450        require_placeholder: true,
451    };
452
453    let plan =
454        plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
455            match diagnostics_to_error(diags) {
456                Ok(_) => FlowError::Internal {
457                    message: "add_step diagnostics unexpectedly empty".to_string(),
458                    location: FlowErrorLocation::at_path("add_step".to_string()),
459                },
460                Err(e) => e,
461            }
462        })?;
463    let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
464    updated.to_doc()
465}