1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12 component_catalog::ComponentCatalog,
13 component_catalog::ManifestCatalog,
14 config_flow::run_config_flow,
15 error::{FlowError, FlowErrorLocation, Result},
16 flow_ir::{FlowIr, NodeIr, Route},
17 loader::load_ygtc_from_str,
18 model::FlowDoc,
19};
20
21use self::{
22 id::{generate_node_id, is_placeholder_value},
23 normalize::normalize_node_map,
24 rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25 validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30 pub after: Option<String>,
31 pub node_id_hint: Option<String>,
32 pub node: Value,
33 pub allow_cycles: bool,
34 pub require_placeholder: bool,
35}
36
37#[derive(Debug, Clone)]
38pub struct AddStepPlan {
39 pub anchor: String,
40 pub new_node: NodeIr,
41 pub anchor_old_routing: Vec<Route>,
42 pub insert_before_entrypoint: bool,
43}
44
45#[derive(Debug, Clone)]
46pub struct Diagnostic {
47 pub code: &'static str,
48 pub message: String,
49 pub location: Option<String>,
50}
51
52fn looks_like_component_id(hint: &str) -> bool {
53 let trimmed = hint.trim();
54 if trimmed.contains('.') {
55 return true;
56 }
57 let parts: Vec<&str> = trimmed.split('_').filter(|p| !p.is_empty()).collect();
58 parts.len() >= 3
59}
60
61fn simplify_component_name(raw: &str) -> Option<String> {
62 let mut candidate = raw.trim();
63 if candidate.is_empty() {
64 return None;
65 }
66 if let Some(last) = candidate.rsplit(['/', '\\']).next() {
67 candidate = last;
68 }
69 if let Some((base, _)) = candidate.split_once('@') {
70 candidate = base;
71 }
72 if let Some((base, _)) = candidate.split_once(':') {
73 candidate = base;
74 }
75 if let Some(last) = candidate.rsplit('.').next() {
76 candidate = last;
77 }
78 let underscore_parts: Vec<&str> = candidate.split('_').filter(|p| !p.is_empty()).collect();
79 if underscore_parts.len() >= 3 {
80 candidate = underscore_parts[underscore_parts.len() - 1];
81 }
82 let normalized = candidate.replace('_', "-");
83 if normalized.trim().is_empty() {
84 None
85 } else {
86 Some(normalized)
87 }
88}
89
90fn component_name_from_node(node: &Value) -> Option<String> {
91 let obj = node.as_object()?;
92 if let Some(exec) = obj.get("component.exec")
93 && let Some(component) = exec.get("component").and_then(Value::as_str)
94 {
95 return simplify_component_name(component);
96 }
97 if let Some(component) = obj.get("component").and_then(Value::as_str) {
98 return simplify_component_name(component);
99 }
100 None
101}
102
103pub fn normalize_node_id_hint(hint: Option<String>, node: &Value) -> Option<String> {
104 let derived = component_name_from_node(node);
105 match (hint.as_deref(), derived) {
106 (_, None) => hint,
107 (None, Some(name)) => Some(name),
108 (Some(existing), Some(name)) => {
109 if existing.trim().is_empty()
110 || is_placeholder_value(existing)
111 || looks_like_component_id(existing)
112 {
113 return Some(name);
114 }
115 Some(existing.to_string())
116 }
117 }
118}
119
120pub fn plan_add_step(
121 flow: &FlowIr,
122 spec: AddStepSpec,
123 _catalog: &dyn ComponentCatalog,
124) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
125 let mut diags = Vec::new();
126
127 let anchor_source = match resolve_anchor(flow, spec.after.as_deref()) {
128 Ok(anchor) => anchor,
129 Err(msg) => {
130 diags.push(Diagnostic {
131 code: "ADD_STEP_ANCHOR_MISSING",
132 message: msg,
133 location: Some("nodes".to_string()),
134 });
135 return Err(diags);
136 }
137 };
138 let mut insert_before_entrypoint = false;
139 if spec.after.is_none()
140 && let Some((_, target)) = flow.entrypoints.get_index(0)
141 && target == &anchor_source
142 {
143 insert_before_entrypoint = true;
144 }
145 let anchor = anchor_source;
146
147 if let Some(hint) = spec.node_id_hint.as_deref()
148 && is_placeholder_value(hint)
149 {
150 diags.push(Diagnostic {
151 code: "ADD_STEP_NODE_ID_PLACEHOLDER",
152 message: format!(
153 "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
154 ),
155 location: Some("add_step.node_id".to_string()),
156 });
157 return Err(diags);
158 }
159
160 let normalized = match normalize_node_map(spec.node.clone()) {
161 Ok(node) => node,
162 Err(e) => {
163 diags.push(Diagnostic {
164 code: "ADD_STEP_NODE_INVALID",
165 message: e.to_string(),
166 location: Some("add_step.node".to_string()),
167 });
168 return Err(diags);
169 }
170 };
171
172 let anchor_old_routing = if let Some(anchor_node) = flow.nodes.get(&anchor) {
173 anchor_node.routing.clone()
174 } else if flow.nodes.is_empty() {
175 Vec::new()
176 } else {
177 return Err(vec![Diagnostic {
178 code: "ADD_STEP_ANCHOR_MISSING",
179 message: format!("anchor node '{}' not found", anchor),
180 location: Some("nodes".to_string()),
181 }]);
182 };
183
184 let hint = spec
185 .node_id_hint
186 .as_deref()
187 .or(Some(normalized.operation.as_str()));
188 let new_node_id = generate_node_id(hint, &anchor, flow.nodes.keys().map(|k| k.as_str()));
189
190 let routing = rewrite_placeholder_routes(
191 normalized.routing.clone(),
192 &anchor_old_routing,
193 spec.allow_cycles,
194 &anchor,
195 spec.require_placeholder,
196 )
197 .map_err(|msg| {
198 vec![Diagnostic {
199 code: "ADD_STEP_ROUTING_INVALID",
200 message: msg,
201 location: Some(format!("nodes.{new_node_id}.routing")),
202 }]
203 })?;
204
205 if routing.is_empty() {
206 return Err(vec![Diagnostic {
207 code: "ADD_STEP_ROUTING_MISSING",
208 message: "add-step requires at least one routing target; use --routing-* or include routing in config flow output".to_string(),
209 location: Some(format!("nodes.{new_node_id}.routing")),
210 }]);
211 }
212
213 let new_node = NodeIr {
214 id: new_node_id.clone(),
215 operation: normalized.operation.clone(),
216 payload: normalized.payload.clone(),
217 output: serde_json::Value::Object(Default::default()),
218 in_map: None,
219 out_map: None,
220 err_map: None,
221 routing,
222 telemetry: normalized.telemetry.clone(),
223 };
224
225 Ok(AddStepPlan {
226 anchor,
227 new_node,
228 anchor_old_routing,
229 insert_before_entrypoint,
230 })
231}
232
233pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
234 let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
235 if nodes.contains_key(&plan.new_node.id) {
236 return Err(FlowError::Internal {
237 message: format!("node '{}' already exists", plan.new_node.id),
238 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
239 });
240 }
241
242 if nodes.is_empty() {
243 let mut entrypoints = IndexMap::new();
244 entrypoints.insert("default".to_string(), plan.new_node.id.clone());
245 nodes.insert(plan.new_node.id.clone(), plan.new_node);
246 return Ok(FlowIr {
247 id: flow.id.clone(),
248 title: flow.title.clone(),
249 description: flow.description.clone(),
250 kind: flow.kind.clone(),
251 start: flow.start.clone(),
252 parameters: flow.parameters.clone(),
253 tags: flow.tags.clone(),
254 schema_version: flow.schema_version,
255 entrypoints,
256 meta: flow.meta.clone(),
257 slot_schema: flow.slot_schema.clone(),
258 nodes,
259 });
260 }
261
262 if plan.insert_before_entrypoint {
263 let mut new_nodes = IndexMap::new();
265 for (id, node) in nodes.into_iter() {
266 if id == plan.anchor {
267 let mut new_node = plan.new_node.clone();
268 new_node.routing = vec![Route {
269 to: Some(plan.anchor.clone()),
270 ..Route::default()
271 }];
272 new_nodes.insert(new_node.id.clone(), new_node);
273 }
274 new_nodes.insert(id.clone(), node);
275 }
276
277 let mut entrypoints = flow.entrypoints.clone();
278 for (_name, target) in entrypoints.iter_mut() {
279 if target == &plan.anchor {
280 *target = plan.new_node.id.clone();
281 }
282 }
283
284 return Ok(FlowIr {
285 id: flow.id.clone(),
286 title: flow.title.clone(),
287 description: flow.description.clone(),
288 kind: flow.kind.clone(),
289 start: flow.start.clone(),
290 parameters: flow.parameters.clone(),
291 tags: flow.tags.clone(),
292 schema_version: flow.schema_version,
293 entrypoints,
294 meta: flow.meta.clone(),
295 slot_schema: flow.slot_schema.clone(),
296 nodes: new_nodes,
297 });
298 }
299
300 let mut reordered = IndexMap::new();
301 let mut anchor_found = false;
302 for (id, node) in nodes.into_iter() {
303 if id == plan.anchor {
304 anchor_found = true;
305 let mut anchor = node.clone();
306 anchor.routing = apply_threaded_routing(
307 &plan.new_node.id,
308 &plan.anchor_old_routing,
309 allow_cycles,
310 &plan.anchor,
311 )?;
312 reordered.insert(id.clone(), anchor);
313 reordered.insert(plan.new_node.id.clone(), plan.new_node.clone());
314 } else {
315 reordered.insert(id.clone(), node);
316 }
317 }
318
319 if !anchor_found {
320 return Err(FlowError::Internal {
321 message: format!("anchor '{}' not found", plan.anchor),
322 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
323 });
324 }
325
326 Ok(FlowIr {
327 id: flow.id.clone(),
328 title: flow.title.clone(),
329 description: flow.description.clone(),
330 kind: flow.kind.clone(),
331 start: flow.start.clone(),
332 parameters: flow.parameters.clone(),
333 tags: flow.tags.clone(),
334 schema_version: flow.schema_version,
335 entrypoints: flow.entrypoints.clone(),
336 meta: flow.meta.clone(),
337 slot_schema: flow.slot_schema.clone(),
338 nodes: reordered,
339 })
340}
341
342pub fn validate_flow(flow: &FlowIr, _catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
343 let mut diags = Vec::new();
344 if let Some((name, target)) = flow.entrypoints.get_index(0)
345 && !flow.nodes.contains_key(target)
346 {
347 diags.push(Diagnostic {
348 code: "ENTRYPOINT_MISSING",
349 message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
350 location: Some(format!("entrypoints.{name}")),
351 });
352 }
353
354 for (id, node) in &flow.nodes {
355 for route in &node.routing {
356 if let Some(to) = &route.to
357 && !flow.nodes.contains_key(to)
358 {
359 diags.push(Diagnostic {
360 code: "ROUTE_TARGET_MISSING",
361 message: format!("node '{}' routes to unknown node '{}'", id, to),
362 location: Some(format!("nodes.{id}.routing")),
363 });
364 }
365 }
366 if node.operation.trim().is_empty() {
367 diags.push(Diagnostic {
368 code: "OPERATION_REQUIRED",
369 message: format!("node '{}' missing operation name", id),
370 location: Some(format!("nodes.{id}")),
371 });
372 }
373 if node.payload.is_null() {
374 diags.push(Diagnostic {
375 code: "PAYLOAD_REQUIRED",
376 message: format!("node '{}' payload must not be null", id),
377 location: Some(format!("nodes.{id}")),
378 });
379 }
380 }
381
382 diags
383}
384
385pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
386 if diags.is_empty() {
387 return Ok(());
388 }
389 let combined = diags
390 .into_iter()
391 .map(|d| format!("{}: {}", d.code, d.message))
392 .collect::<Vec<_>>()
393 .join("; ");
394 Err(FlowError::Internal {
395 message: combined,
396 location: FlowErrorLocation::at_path("add_step".to_string()),
397 })
398}
399
400fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
401 if let Some(id) = after {
402 if flow.nodes.contains_key(id) {
403 return Ok(id.to_string());
404 }
405 return Err(format!("anchor node '{}' not found", id));
406 }
407
408 if flow.nodes.is_empty() {
409 return Ok(String::new());
411 }
412
413 if let Some(entry) = flow.entrypoints.get_index(0) {
414 return Ok(entry.1.clone());
415 }
416
417 if let Some(first) = flow.nodes.keys().next() {
418 return Ok(first.clone());
419 }
420
421 Err("flow has no nodes to anchor insertion".to_string())
422}
423
424pub fn apply_and_validate(
425 flow: &FlowIr,
426 plan: AddStepPlan,
427 catalog: &dyn ComponentCatalog,
428 allow_cycles: bool,
429) -> Result<FlowIr> {
430 let updated = apply_plan(flow, plan, allow_cycles)?;
431 validate_schema_and_flow(&updated, catalog)?;
432 Ok(updated)
433}
434
435pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
437 let mut seen = IndexMap::new();
438 if let Some((_name, target)) = flow.entrypoints.get_index(0) {
439 seen.insert(target.clone(), ());
440 }
441 for id in flow.nodes.keys() {
442 seen.entry(id.clone()).or_insert(());
443 }
444 seen.keys().cloned().collect()
445}
446
447pub fn add_step_from_config_flow(
449 flow_yaml: &str,
450 config_flow_path: &Path,
451 schema_path: &Path,
452 manifests: &[impl AsRef<Path>],
453 after: Option<String>,
454 answers: &serde_json::Map<String, Value>,
455 allow_cycles: bool,
456) -> Result<FlowDoc> {
457 let flow_doc = load_ygtc_from_str(flow_yaml)?;
458 let flow_ir = FlowIr::from_doc(flow_doc)?;
459 let catalog = ManifestCatalog::load_from_paths(manifests);
460
461 let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
462 message: format!("read config flow {}: {e}", config_flow_path.display()),
463 location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
464 .with_source_path(Some(config_flow_path)),
465 })?;
466 let output = run_config_flow(&config_yaml, schema_path, answers, None)?;
467 let node_id_hint = normalize_node_id_hint(Some(output.node_id.clone()), &output.node);
468
469 let spec = AddStepSpec {
470 after,
471 node_id_hint,
472 node: output.node.clone(),
473 allow_cycles,
474 require_placeholder: true,
475 };
476
477 let plan =
478 plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
479 match diagnostics_to_error(diags) {
480 Ok(_) => FlowError::Internal {
481 message: "add_step diagnostics unexpectedly empty".to_string(),
482 location: FlowErrorLocation::at_path("add_step".to_string()),
483 },
484 Err(e) => e,
485 }
486 })?;
487 let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
488 updated.to_doc()
489}