1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12 component_catalog::ComponentCatalog,
13 component_catalog::ManifestCatalog,
14 config_flow::run_config_flow,
15 error::{FlowError, FlowErrorLocation, Result},
16 flow_ir::{FlowIr, NodeIr, Route},
17 loader::load_ygtc_from_str,
18 model::FlowDoc,
19};
20
21use self::{
22 id::{generate_node_id, is_placeholder_value},
23 normalize::normalize_node_map,
24 rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25 validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30 pub after: Option<String>,
31 pub node_id_hint: Option<String>,
32 pub node: Value,
33 pub allow_cycles: bool,
34 pub require_placeholder: bool,
35}
36
37#[derive(Debug, Clone)]
38pub struct AddStepPlan {
39 pub anchor: String,
40 pub new_node: NodeIr,
41 pub anchor_old_routing: Vec<Route>,
42 pub insert_before_entrypoint: bool,
43}
44
45#[derive(Debug, Clone)]
46pub struct Diagnostic {
47 pub code: &'static str,
48 pub message: String,
49 pub location: Option<String>,
50}
51
52fn looks_like_component_id(hint: &str) -> bool {
53 let trimmed = hint.trim();
54 if trimmed.contains('.') {
55 return true;
56 }
57 let parts: Vec<&str> = trimmed.split('_').filter(|p| !p.is_empty()).collect();
58 parts.len() >= 3
59}
60
61fn simplify_component_name(raw: &str) -> Option<String> {
62 let mut candidate = raw.trim();
63 if candidate.is_empty() {
64 return None;
65 }
66 if let Some(last) = candidate.rsplit(['/', '\\']).next() {
67 candidate = last;
68 }
69 if let Some((base, _)) = candidate.split_once('@') {
70 candidate = base;
71 }
72 if let Some((base, _)) = candidate.split_once(':') {
73 candidate = base;
74 }
75 if let Some(last) = candidate.rsplit('.').next() {
76 candidate = last;
77 }
78 let underscore_parts: Vec<&str> = candidate.split('_').filter(|p| !p.is_empty()).collect();
79 if underscore_parts.len() >= 3 {
80 candidate = underscore_parts[underscore_parts.len() - 1];
81 }
82 let normalized = candidate.replace('_', "-");
83 if normalized.trim().is_empty() {
84 None
85 } else {
86 Some(normalized)
87 }
88}
89
90fn component_name_from_node(node: &Value) -> Option<String> {
91 let obj = node.as_object()?;
92 if let Some(exec) = obj.get("component.exec")
93 && let Some(component) = exec.get("component").and_then(Value::as_str)
94 {
95 return simplify_component_name(component);
96 }
97 if let Some(component) = obj.get("component").and_then(Value::as_str) {
98 return simplify_component_name(component);
99 }
100 None
101}
102
103pub fn normalize_node_id_hint(hint: Option<String>, node: &Value) -> Option<String> {
104 let derived = component_name_from_node(node);
105 match (hint.as_deref(), derived) {
106 (_, None) => hint,
107 (None, Some(name)) => Some(name),
108 (Some(existing), Some(name)) => {
109 if existing.trim().is_empty()
110 || is_placeholder_value(existing)
111 || looks_like_component_id(existing)
112 {
113 return Some(name);
114 }
115 Some(existing.to_string())
116 }
117 }
118}
119
120pub fn plan_add_step(
121 flow: &FlowIr,
122 spec: AddStepSpec,
123 _catalog: &dyn ComponentCatalog,
124) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
125 let mut diags = Vec::new();
126
127 let anchor_source = match resolve_anchor(flow, spec.after.as_deref()) {
128 Ok(anchor) => anchor,
129 Err(msg) => {
130 diags.push(Diagnostic {
131 code: "ADD_STEP_ANCHOR_MISSING",
132 message: msg,
133 location: Some("nodes".to_string()),
134 });
135 return Err(diags);
136 }
137 };
138 let mut insert_before_entrypoint = false;
139 if spec.after.is_none()
140 && let Some((_, target)) = flow.entrypoints.get_index(0)
141 && target == &anchor_source
142 {
143 insert_before_entrypoint = true;
144 }
145 let anchor = anchor_source;
146
147 if let Some(hint) = spec.node_id_hint.as_deref()
148 && is_placeholder_value(hint)
149 {
150 diags.push(Diagnostic {
151 code: "ADD_STEP_NODE_ID_PLACEHOLDER",
152 message: format!(
153 "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
154 ),
155 location: Some("add_step.node_id".to_string()),
156 });
157 return Err(diags);
158 }
159
160 let normalized = match normalize_node_map(spec.node.clone()) {
161 Ok(node) => node,
162 Err(e) => {
163 diags.push(Diagnostic {
164 code: "ADD_STEP_NODE_INVALID",
165 message: e.to_string(),
166 location: Some("add_step.node".to_string()),
167 });
168 return Err(diags);
169 }
170 };
171
172 let anchor_old_routing = if let Some(anchor_node) = flow.nodes.get(&anchor) {
173 anchor_node.routing.clone()
174 } else if flow.nodes.is_empty() {
175 Vec::new()
176 } else {
177 return Err(vec![Diagnostic {
178 code: "ADD_STEP_ANCHOR_MISSING",
179 message: format!("anchor node '{}' not found", anchor),
180 location: Some("nodes".to_string()),
181 }]);
182 };
183
184 let hint = spec
185 .node_id_hint
186 .as_deref()
187 .or(Some(normalized.operation.as_str()));
188 let new_node_id = generate_node_id(hint, &anchor, flow.nodes.keys().map(|k| k.as_str()));
189
190 let routing = rewrite_placeholder_routes(
191 normalized.routing.clone(),
192 &anchor_old_routing,
193 spec.allow_cycles,
194 &anchor,
195 spec.require_placeholder,
196 )
197 .map_err(|msg| {
198 vec![Diagnostic {
199 code: "ADD_STEP_ROUTING_INVALID",
200 message: msg,
201 location: Some(format!("nodes.{new_node_id}.routing")),
202 }]
203 })?;
204
205 if routing.is_empty() {
206 return Err(vec![Diagnostic {
207 code: "ADD_STEP_ROUTING_MISSING",
208 message: "add-step requires at least one routing target; use --routing-* or include routing in config flow output".to_string(),
209 location: Some(format!("nodes.{new_node_id}.routing")),
210 }]);
211 }
212
213 let new_node = NodeIr {
214 id: new_node_id.clone(),
215 operation: normalized.operation.clone(),
216 payload: normalized.payload.clone(),
217 output: serde_json::Value::Object(Default::default()),
218 routing,
219 telemetry: normalized.telemetry.clone(),
220 };
221
222 Ok(AddStepPlan {
223 anchor,
224 new_node,
225 anchor_old_routing,
226 insert_before_entrypoint,
227 })
228}
229
230pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
231 let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
232 if nodes.contains_key(&plan.new_node.id) {
233 return Err(FlowError::Internal {
234 message: format!("node '{}' already exists", plan.new_node.id),
235 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
236 });
237 }
238
239 if nodes.is_empty() {
240 let mut entrypoints = IndexMap::new();
241 entrypoints.insert("default".to_string(), plan.new_node.id.clone());
242 nodes.insert(plan.new_node.id.clone(), plan.new_node);
243 return Ok(FlowIr {
244 id: flow.id.clone(),
245 kind: flow.kind.clone(),
246 schema_version: flow.schema_version,
247 entrypoints,
248 nodes,
249 });
250 }
251
252 if plan.insert_before_entrypoint {
253 let mut new_nodes = IndexMap::new();
255 for (id, node) in nodes.into_iter() {
256 if id == plan.anchor {
257 let mut new_node = plan.new_node.clone();
258 new_node.routing = vec![Route {
259 to: Some(plan.anchor.clone()),
260 ..Route::default()
261 }];
262 new_nodes.insert(new_node.id.clone(), new_node);
263 }
264 new_nodes.insert(id.clone(), node);
265 }
266
267 let mut entrypoints = flow.entrypoints.clone();
268 for (_name, target) in entrypoints.iter_mut() {
269 if target == &plan.anchor {
270 *target = plan.new_node.id.clone();
271 }
272 }
273
274 return Ok(FlowIr {
275 id: flow.id.clone(),
276 kind: flow.kind.clone(),
277 schema_version: flow.schema_version,
278 entrypoints,
279 nodes: new_nodes,
280 });
281 }
282
283 let mut reordered = IndexMap::new();
284 let mut anchor_found = false;
285 for (id, node) in nodes.into_iter() {
286 if id == plan.anchor {
287 anchor_found = true;
288 let mut anchor = node.clone();
289 anchor.routing = apply_threaded_routing(
290 &plan.new_node.id,
291 &plan.anchor_old_routing,
292 allow_cycles,
293 &plan.anchor,
294 )?;
295 reordered.insert(id.clone(), anchor);
296 reordered.insert(plan.new_node.id.clone(), plan.new_node.clone());
297 } else {
298 reordered.insert(id.clone(), node);
299 }
300 }
301
302 if !anchor_found {
303 return Err(FlowError::Internal {
304 message: format!("anchor '{}' not found", plan.anchor),
305 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
306 });
307 }
308
309 Ok(FlowIr {
310 id: flow.id.clone(),
311 kind: flow.kind.clone(),
312 schema_version: flow.schema_version,
313 entrypoints: flow.entrypoints.clone(),
314 nodes: reordered,
315 })
316}
317
318pub fn validate_flow(flow: &FlowIr, _catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
319 let mut diags = Vec::new();
320 if let Some((name, target)) = flow.entrypoints.get_index(0)
321 && !flow.nodes.contains_key(target)
322 {
323 diags.push(Diagnostic {
324 code: "ENTRYPOINT_MISSING",
325 message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
326 location: Some(format!("entrypoints.{name}")),
327 });
328 }
329
330 for (id, node) in &flow.nodes {
331 for route in &node.routing {
332 if let Some(to) = &route.to
333 && !flow.nodes.contains_key(to)
334 {
335 diags.push(Diagnostic {
336 code: "ROUTE_TARGET_MISSING",
337 message: format!("node '{}' routes to unknown node '{}'", id, to),
338 location: Some(format!("nodes.{id}.routing")),
339 });
340 }
341 }
342 if node.operation.trim().is_empty() {
343 diags.push(Diagnostic {
344 code: "OPERATION_REQUIRED",
345 message: format!("node '{}' missing operation name", id),
346 location: Some(format!("nodes.{id}")),
347 });
348 }
349 if node.payload.is_null() {
350 diags.push(Diagnostic {
351 code: "PAYLOAD_REQUIRED",
352 message: format!("node '{}' payload must not be null", id),
353 location: Some(format!("nodes.{id}")),
354 });
355 }
356 }
357
358 diags
359}
360
361pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
362 if diags.is_empty() {
363 return Ok(());
364 }
365 let combined = diags
366 .into_iter()
367 .map(|d| format!("{}: {}", d.code, d.message))
368 .collect::<Vec<_>>()
369 .join("; ");
370 Err(FlowError::Internal {
371 message: combined,
372 location: FlowErrorLocation::at_path("add_step".to_string()),
373 })
374}
375
376fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
377 if let Some(id) = after {
378 if flow.nodes.contains_key(id) {
379 return Ok(id.to_string());
380 }
381 return Err(format!("anchor node '{}' not found", id));
382 }
383
384 if flow.nodes.is_empty() {
385 return Ok(String::new());
387 }
388
389 if let Some(entry) = flow.entrypoints.get_index(0) {
390 return Ok(entry.1.clone());
391 }
392
393 if let Some(first) = flow.nodes.keys().next() {
394 return Ok(first.clone());
395 }
396
397 Err("flow has no nodes to anchor insertion".to_string())
398}
399
400pub fn apply_and_validate(
401 flow: &FlowIr,
402 plan: AddStepPlan,
403 catalog: &dyn ComponentCatalog,
404 allow_cycles: bool,
405) -> Result<FlowIr> {
406 let updated = apply_plan(flow, plan, allow_cycles)?;
407 validate_schema_and_flow(&updated, catalog)?;
408 Ok(updated)
409}
410
411pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
413 let mut seen = IndexMap::new();
414 if let Some((_name, target)) = flow.entrypoints.get_index(0) {
415 seen.insert(target.clone(), ());
416 }
417 for id in flow.nodes.keys() {
418 seen.entry(id.clone()).or_insert(());
419 }
420 seen.keys().cloned().collect()
421}
422
423pub fn add_step_from_config_flow(
425 flow_yaml: &str,
426 config_flow_path: &Path,
427 schema_path: &Path,
428 manifests: &[impl AsRef<Path>],
429 after: Option<String>,
430 answers: &serde_json::Map<String, Value>,
431 allow_cycles: bool,
432) -> Result<FlowDoc> {
433 let flow_doc = load_ygtc_from_str(flow_yaml)?;
434 let flow_ir = FlowIr::from_doc(flow_doc)?;
435 let catalog = ManifestCatalog::load_from_paths(manifests);
436
437 let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
438 message: format!("read config flow {}: {e}", config_flow_path.display()),
439 location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
440 .with_source_path(Some(config_flow_path)),
441 })?;
442 let output = run_config_flow(&config_yaml, schema_path, answers, None)?;
443 let node_id_hint = normalize_node_id_hint(Some(output.node_id.clone()), &output.node);
444
445 let spec = AddStepSpec {
446 after,
447 node_id_hint,
448 node: output.node.clone(),
449 allow_cycles,
450 require_placeholder: true,
451 };
452
453 let plan =
454 plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
455 match diagnostics_to_error(diags) {
456 Ok(_) => FlowError::Internal {
457 message: "add_step diagnostics unexpectedly empty".to_string(),
458 location: FlowErrorLocation::at_path("add_step".to_string()),
459 },
460 Err(e) => e,
461 }
462 })?;
463 let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
464 updated.to_doc()
465}