Skip to main content

greentic_flow/add_step/
mod.rs

1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12    component_catalog::ComponentCatalog,
13    component_catalog::ManifestCatalog,
14    config_flow::run_config_flow,
15    error::{FlowError, FlowErrorLocation, Result},
16    flow_ir::{FlowIr, NodeIr, Route},
17    loader::load_ygtc_from_str,
18    model::FlowDoc,
19};
20
21use self::{
22    id::{generate_node_id, is_placeholder_value},
23    normalize::normalize_node_map,
24    rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25    validate::validate_schema_and_flow,
26};
27
/// Caller-supplied request describing a node to insert into a flow.
#[derive(Debug, Clone)]
pub struct AddStepSpec {
    /// Id of the existing node to insert after. When `None`, the anchor is
    /// resolved from the flow itself (first entrypoint target, else first node).
    pub after: Option<String>,
    /// Preferred id for the new node; when absent the node's operation name is
    /// used as the hint for id generation.
    pub node_id_hint: Option<String>,
    /// Raw JSON description of the node; it is normalized before planning.
    pub node: Value,
    /// Forwarded to the routing rewrite step.
    /// NOTE(review): presumably permits routes that loop back to the anchor — confirm in `rewire`.
    pub allow_cycles: bool,
    /// Forwarded to the routing rewrite step.
    /// NOTE(review): presumably demands a placeholder route in `node` — confirm in `rewire`.
    pub require_placeholder: bool,
}
36
/// Result of planning an insertion: everything `apply_plan` needs to mutate a flow.
#[derive(Debug, Clone)]
pub struct AddStepPlan {
    /// Id of the node the insertion is anchored on (an empty string when the
    /// flow had no nodes at planning time).
    pub anchor: String,
    /// Fully built node to insert, including its generated id and routing.
    pub new_node: NodeIr,
    /// Copy of the anchor's routing taken at planning time.
    pub anchor_old_routing: Vec<Route>,
    /// `true` when no explicit anchor was requested and the anchor is the first
    /// entrypoint's target; the new node is then placed in front of the anchor.
    pub insert_before_entrypoint: bool,
}
44
/// A single problem found while planning or validating an add-step operation.
#[derive(Debug, Clone)]
pub struct Diagnostic {
    /// Stable identifier for the kind of problem, e.g. `"ADD_STEP_ANCHOR_MISSING"`.
    pub code: &'static str,
    /// Human-readable explanation.
    pub message: String,
    /// Dotted path to the offending element (e.g. `nodes.<id>.routing`), if known.
    pub location: Option<String>,
}
51
/// Heuristic: does `hint` look like a full component identifier rather than a
/// short node name?
///
/// After trimming, a hint qualifies when it contains a `.` or when it has at
/// least three non-empty `_`-separated segments.
fn looks_like_component_id(hint: &str) -> bool {
    let hint = hint.trim();
    let non_empty_segments = || hint.split('_').filter(|segment| !segment.is_empty()).count();
    hint.contains('.') || non_empty_segments() >= 3
}
60
61fn simplify_component_name(raw: &str) -> Option<String> {
62    let mut candidate = raw.trim();
63    if candidate.is_empty() {
64        return None;
65    }
66    if let Some(last) = candidate.rsplit(['/', '\\']).next() {
67        candidate = last;
68    }
69    if let Some((base, _)) = candidate.split_once('@') {
70        candidate = base;
71    }
72    if let Some((base, _)) = candidate.split_once(':') {
73        candidate = base;
74    }
75    if let Some(last) = candidate.rsplit('.').next() {
76        candidate = last;
77    }
78    let underscore_parts: Vec<&str> = candidate.split('_').filter(|p| !p.is_empty()).collect();
79    if underscore_parts.len() >= 3 {
80        candidate = underscore_parts[underscore_parts.len() - 1];
81    }
82    let normalized = candidate.replace('_', "-");
83    if normalized.trim().is_empty() {
84        None
85    } else {
86        Some(normalized)
87    }
88}
89
90fn component_name_from_node(node: &Value) -> Option<String> {
91    let obj = node.as_object()?;
92    if let Some(exec) = obj.get("component.exec")
93        && let Some(component) = exec.get("component").and_then(Value::as_str)
94    {
95        return simplify_component_name(component);
96    }
97    if let Some(component) = obj.get("component").and_then(Value::as_str) {
98        return simplify_component_name(component);
99    }
100    None
101}
102
103pub fn normalize_node_id_hint(hint: Option<String>, node: &Value) -> Option<String> {
104    let derived = component_name_from_node(node);
105    match (hint.as_deref(), derived) {
106        (_, None) => hint,
107        (None, Some(name)) => Some(name),
108        (Some(existing), Some(name)) => {
109            if existing.trim().is_empty()
110                || is_placeholder_value(existing)
111                || looks_like_component_id(existing)
112            {
113                return Some(name);
114            }
115            Some(existing.to_string())
116        }
117    }
118}
119
120pub fn plan_add_step(
121    flow: &FlowIr,
122    spec: AddStepSpec,
123    _catalog: &dyn ComponentCatalog,
124) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
125    let mut diags = Vec::new();
126
127    let anchor_source = match resolve_anchor(flow, spec.after.as_deref()) {
128        Ok(anchor) => anchor,
129        Err(msg) => {
130            diags.push(Diagnostic {
131                code: "ADD_STEP_ANCHOR_MISSING",
132                message: msg,
133                location: Some("nodes".to_string()),
134            });
135            return Err(diags);
136        }
137    };
138    let mut insert_before_entrypoint = false;
139    if spec.after.is_none()
140        && let Some((_, target)) = flow.entrypoints.get_index(0)
141        && target == &anchor_source
142    {
143        insert_before_entrypoint = true;
144    }
145    let anchor = anchor_source;
146
147    if let Some(hint) = spec.node_id_hint.as_deref()
148        && is_placeholder_value(hint)
149    {
150        diags.push(Diagnostic {
151            code: "ADD_STEP_NODE_ID_PLACEHOLDER",
152            message: format!(
153                "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
154            ),
155            location: Some("add_step.node_id".to_string()),
156        });
157        return Err(diags);
158    }
159
160    let normalized = match normalize_node_map(spec.node.clone()) {
161        Ok(node) => node,
162        Err(e) => {
163            diags.push(Diagnostic {
164                code: "ADD_STEP_NODE_INVALID",
165                message: e.to_string(),
166                location: Some("add_step.node".to_string()),
167            });
168            return Err(diags);
169        }
170    };
171
172    let anchor_old_routing = if let Some(anchor_node) = flow.nodes.get(&anchor) {
173        anchor_node.routing.clone()
174    } else if flow.nodes.is_empty() {
175        Vec::new()
176    } else {
177        return Err(vec![Diagnostic {
178            code: "ADD_STEP_ANCHOR_MISSING",
179            message: format!("anchor node '{}' not found", anchor),
180            location: Some("nodes".to_string()),
181        }]);
182    };
183
184    let hint = spec
185        .node_id_hint
186        .as_deref()
187        .or(Some(normalized.operation.as_str()));
188    let new_node_id = generate_node_id(hint, &anchor, flow.nodes.keys().map(|k| k.as_str()));
189
190    let routing = rewrite_placeholder_routes(
191        normalized.routing.clone(),
192        &anchor_old_routing,
193        spec.allow_cycles,
194        &anchor,
195        spec.require_placeholder,
196    )
197    .map_err(|msg| {
198        vec![Diagnostic {
199            code: "ADD_STEP_ROUTING_INVALID",
200            message: msg,
201            location: Some(format!("nodes.{new_node_id}.routing")),
202        }]
203    })?;
204
205    if routing.is_empty() {
206        return Err(vec![Diagnostic {
207            code: "ADD_STEP_ROUTING_MISSING",
208            message: "add-step requires at least one routing target; use --routing-* or include routing in config flow output".to_string(),
209            location: Some(format!("nodes.{new_node_id}.routing")),
210        }]);
211    }
212
213    let new_node = NodeIr {
214        id: new_node_id.clone(),
215        operation: normalized.operation.clone(),
216        payload: normalized.payload.clone(),
217        output: serde_json::Value::Object(Default::default()),
218        in_map: None,
219        out_map: None,
220        err_map: None,
221        routing,
222        telemetry: normalized.telemetry.clone(),
223    };
224
225    Ok(AddStepPlan {
226        anchor,
227        new_node,
228        anchor_old_routing,
229        insert_before_entrypoint,
230    })
231}
232
233pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
234    let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
235    if nodes.contains_key(&plan.new_node.id) {
236        return Err(FlowError::Internal {
237            message: format!("node '{}' already exists", plan.new_node.id),
238            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
239        });
240    }
241
242    if nodes.is_empty() {
243        let mut entrypoints = IndexMap::new();
244        entrypoints.insert("default".to_string(), plan.new_node.id.clone());
245        nodes.insert(plan.new_node.id.clone(), plan.new_node);
246        return Ok(FlowIr {
247            id: flow.id.clone(),
248            title: flow.title.clone(),
249            description: flow.description.clone(),
250            kind: flow.kind.clone(),
251            start: flow.start.clone(),
252            parameters: flow.parameters.clone(),
253            tags: flow.tags.clone(),
254            schema_version: flow.schema_version,
255            entrypoints,
256            meta: flow.meta.clone(),
257            slot_schema: flow.slot_schema.clone(),
258            nodes,
259        });
260    }
261
262    if plan.insert_before_entrypoint {
263        // Insert new node before the entrypoint target: keep anchor routing, retarget entrypoints.
264        let mut new_nodes = IndexMap::new();
265        for (id, node) in nodes.into_iter() {
266            if id == plan.anchor {
267                let mut new_node = plan.new_node.clone();
268                new_node.routing = vec![Route {
269                    to: Some(plan.anchor.clone()),
270                    ..Route::default()
271                }];
272                new_nodes.insert(new_node.id.clone(), new_node);
273            }
274            new_nodes.insert(id.clone(), node);
275        }
276
277        let mut entrypoints = flow.entrypoints.clone();
278        for (_name, target) in entrypoints.iter_mut() {
279            if target == &plan.anchor {
280                *target = plan.new_node.id.clone();
281            }
282        }
283
284        return Ok(FlowIr {
285            id: flow.id.clone(),
286            title: flow.title.clone(),
287            description: flow.description.clone(),
288            kind: flow.kind.clone(),
289            start: flow.start.clone(),
290            parameters: flow.parameters.clone(),
291            tags: flow.tags.clone(),
292            schema_version: flow.schema_version,
293            entrypoints,
294            meta: flow.meta.clone(),
295            slot_schema: flow.slot_schema.clone(),
296            nodes: new_nodes,
297        });
298    }
299
300    let mut reordered = IndexMap::new();
301    let mut anchor_found = false;
302    for (id, node) in nodes.into_iter() {
303        if id == plan.anchor {
304            anchor_found = true;
305            let mut anchor = node.clone();
306            anchor.routing = apply_threaded_routing(
307                &plan.new_node.id,
308                &plan.anchor_old_routing,
309                allow_cycles,
310                &plan.anchor,
311            )?;
312            reordered.insert(id.clone(), anchor);
313            reordered.insert(plan.new_node.id.clone(), plan.new_node.clone());
314        } else {
315            reordered.insert(id.clone(), node);
316        }
317    }
318
319    if !anchor_found {
320        return Err(FlowError::Internal {
321            message: format!("anchor '{}' not found", plan.anchor),
322            location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
323        });
324    }
325
326    Ok(FlowIr {
327        id: flow.id.clone(),
328        title: flow.title.clone(),
329        description: flow.description.clone(),
330        kind: flow.kind.clone(),
331        start: flow.start.clone(),
332        parameters: flow.parameters.clone(),
333        tags: flow.tags.clone(),
334        schema_version: flow.schema_version,
335        entrypoints: flow.entrypoints.clone(),
336        meta: flow.meta.clone(),
337        slot_schema: flow.slot_schema.clone(),
338        nodes: reordered,
339    })
340}
341
342pub fn validate_flow(flow: &FlowIr, _catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
343    let mut diags = Vec::new();
344    if let Some((name, target)) = flow.entrypoints.get_index(0)
345        && !flow.nodes.contains_key(target)
346    {
347        diags.push(Diagnostic {
348            code: "ENTRYPOINT_MISSING",
349            message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
350            location: Some(format!("entrypoints.{name}")),
351        });
352    }
353
354    for (id, node) in &flow.nodes {
355        for route in &node.routing {
356            if let Some(to) = &route.to
357                && !flow.nodes.contains_key(to)
358            {
359                diags.push(Diagnostic {
360                    code: "ROUTE_TARGET_MISSING",
361                    message: format!("node '{}' routes to unknown node '{}'", id, to),
362                    location: Some(format!("nodes.{id}.routing")),
363                });
364            }
365        }
366        if node.operation.trim().is_empty() {
367            diags.push(Diagnostic {
368                code: "OPERATION_REQUIRED",
369                message: format!("node '{}' missing operation name", id),
370                location: Some(format!("nodes.{id}")),
371            });
372        }
373        if node.payload.is_null() {
374            diags.push(Diagnostic {
375                code: "PAYLOAD_REQUIRED",
376                message: format!("node '{}' payload must not be null", id),
377                location: Some(format!("nodes.{id}")),
378            });
379        }
380    }
381
382    diags
383}
384
385pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
386    if diags.is_empty() {
387        return Ok(());
388    }
389    let combined = diags
390        .into_iter()
391        .map(|d| format!("{}: {}", d.code, d.message))
392        .collect::<Vec<_>>()
393        .join("; ");
394    Err(FlowError::Internal {
395        message: combined,
396        location: FlowErrorLocation::at_path("add_step".to_string()),
397    })
398}
399
400fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
401    if let Some(id) = after {
402        if flow.nodes.contains_key(id) {
403            return Ok(id.to_string());
404        }
405        return Err(format!("anchor node '{}' not found", id));
406    }
407
408    if flow.nodes.is_empty() {
409        // Empty flow: no anchor needed; apply_plan will insert the first node and set entrypoint.
410        return Ok(String::new());
411    }
412
413    if let Some(entry) = flow.entrypoints.get_index(0) {
414        return Ok(entry.1.clone());
415    }
416
417    if let Some(first) = flow.nodes.keys().next() {
418        return Ok(first.clone());
419    }
420
421    Err("flow has no nodes to anchor insertion".to_string())
422}
423
424pub fn apply_and_validate(
425    flow: &FlowIr,
426    plan: AddStepPlan,
427    catalog: &dyn ComponentCatalog,
428    allow_cycles: bool,
429) -> Result<FlowIr> {
430    let updated = apply_plan(flow, plan, allow_cycles)?;
431    validate_schema_and_flow(&updated, catalog)?;
432    Ok(updated)
433}
434
435/// Return ordered anchor candidates for UX: entrypoint target first (if present), then remaining nodes in insertion order.
436pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
437    let mut seen = IndexMap::new();
438    if let Some((_name, target)) = flow.entrypoints.get_index(0) {
439        seen.insert(target.clone(), ());
440    }
441    for id in flow.nodes.keys() {
442        seen.entry(id.clone()).or_insert(());
443    }
444    seen.keys().cloned().collect()
445}
446
447/// Execute a config flow and insert its emitted node into the target flow.
448pub fn add_step_from_config_flow(
449    flow_yaml: &str,
450    config_flow_path: &Path,
451    schema_path: &Path,
452    manifests: &[impl AsRef<Path>],
453    after: Option<String>,
454    answers: &serde_json::Map<String, Value>,
455    allow_cycles: bool,
456) -> Result<FlowDoc> {
457    let flow_doc = load_ygtc_from_str(flow_yaml)?;
458    let flow_ir = FlowIr::from_doc(flow_doc)?;
459    let catalog = ManifestCatalog::load_from_paths(manifests);
460
461    let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
462        message: format!("read config flow {}: {e}", config_flow_path.display()),
463        location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
464            .with_source_path(Some(config_flow_path)),
465    })?;
466    let output = run_config_flow(&config_yaml, schema_path, answers, None)?;
467    let node_id_hint = normalize_node_id_hint(Some(output.node_id.clone()), &output.node);
468
469    let spec = AddStepSpec {
470        after,
471        node_id_hint,
472        node: output.node.clone(),
473        allow_cycles,
474        require_placeholder: true,
475    };
476
477    let plan =
478        plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
479            match diagnostics_to_error(diags) {
480                Ok(_) => FlowError::Internal {
481                    message: "add_step diagnostics unexpectedly empty".to_string(),
482                    location: FlowErrorLocation::at_path("add_step".to_string()),
483                },
484                Err(e) => e,
485            }
486        })?;
487    let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
488    updated.to_doc()
489}