1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12 component_catalog::ComponentCatalog,
13 component_catalog::ManifestCatalog,
14 config_flow::run_config_flow,
15 error::{FlowError, FlowErrorLocation, Result},
16 flow_ir::{FlowIr, NodeIr, Route},
17 loader::load_ygtc_from_str,
18 model::FlowDoc,
19};
20
21use self::{
22 id::{generate_node_id, is_placeholder_value},
23 normalize::normalize_node_map,
24 rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25 validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30 pub after: Option<String>,
31 pub node_id_hint: Option<String>,
32 pub node: Value,
33 pub allow_cycles: bool,
34 pub require_placeholder: bool,
35}
36
37#[derive(Debug, Clone)]
38pub struct AddStepPlan {
39 pub anchor: String,
40 pub new_node: NodeIr,
41 pub anchor_old_routing: Vec<Route>,
42 pub insert_before_entrypoint: bool,
43}
44
45#[derive(Debug, Clone)]
46pub struct Diagnostic {
47 pub code: &'static str,
48 pub message: String,
49 pub location: Option<String>,
50}
51
52pub fn plan_add_step(
53 flow: &FlowIr,
54 spec: AddStepSpec,
55 _catalog: &dyn ComponentCatalog,
56) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
57 let mut diags = Vec::new();
58
59 let anchor_source = match resolve_anchor(flow, spec.after.as_deref()) {
60 Ok(anchor) => anchor,
61 Err(msg) => {
62 diags.push(Diagnostic {
63 code: "ADD_STEP_ANCHOR_MISSING",
64 message: msg,
65 location: Some("nodes".to_string()),
66 });
67 return Err(diags);
68 }
69 };
70 let mut insert_before_entrypoint = false;
71 if spec.after.is_none()
72 && let Some((_, target)) = flow.entrypoints.get_index(0)
73 && target == &anchor_source
74 {
75 insert_before_entrypoint = true;
76 }
77 let anchor = anchor_source;
78
79 if let Some(hint) = spec.node_id_hint.as_deref()
80 && is_placeholder_value(hint)
81 {
82 diags.push(Diagnostic {
83 code: "ADD_STEP_NODE_ID_PLACEHOLDER",
84 message: format!(
85 "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
86 ),
87 location: Some("add_step.node_id".to_string()),
88 });
89 return Err(diags);
90 }
91
92 let normalized = match normalize_node_map(spec.node.clone()) {
93 Ok(node) => node,
94 Err(e) => {
95 diags.push(Diagnostic {
96 code: "ADD_STEP_NODE_INVALID",
97 message: e.to_string(),
98 location: Some("add_step.node".to_string()),
99 });
100 return Err(diags);
101 }
102 };
103
104 let anchor_old_routing = if let Some(anchor_node) = flow.nodes.get(&anchor) {
105 anchor_node.routing.clone()
106 } else if flow.nodes.is_empty() {
107 Vec::new()
108 } else {
109 return Err(vec![Diagnostic {
110 code: "ADD_STEP_ANCHOR_MISSING",
111 message: format!("anchor node '{}' not found", anchor),
112 location: Some("nodes".to_string()),
113 }]);
114 };
115
116 let hint = spec
117 .node_id_hint
118 .as_deref()
119 .or(Some(normalized.operation.as_str()));
120 let new_node_id = generate_node_id(hint, &anchor, flow.nodes.keys().map(|k| k.as_str()));
121
122 let routing = rewrite_placeholder_routes(
123 normalized.routing.clone(),
124 &anchor_old_routing,
125 spec.allow_cycles,
126 &anchor,
127 spec.require_placeholder,
128 )
129 .map_err(|msg| {
130 vec![Diagnostic {
131 code: "ADD_STEP_ROUTING_INVALID",
132 message: msg,
133 location: Some(format!("nodes.{new_node_id}.routing")),
134 }]
135 })?;
136
137 let new_node = NodeIr {
138 id: new_node_id.clone(),
139 operation: normalized.operation.clone(),
140 payload: normalized.payload.clone(),
141 output: serde_json::Value::Object(Default::default()),
142 routing,
143 telemetry: normalized.telemetry.clone(),
144 };
145
146 Ok(AddStepPlan {
147 anchor,
148 new_node,
149 anchor_old_routing,
150 insert_before_entrypoint,
151 })
152}
153
154pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
155 let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
156 if nodes.contains_key(&plan.new_node.id) {
157 return Err(FlowError::Internal {
158 message: format!("node '{}' already exists", plan.new_node.id),
159 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
160 });
161 }
162
163 if nodes.is_empty() {
164 let mut entrypoints = IndexMap::new();
165 entrypoints.insert("default".to_string(), plan.new_node.id.clone());
166 nodes.insert(plan.new_node.id.clone(), plan.new_node);
167 return Ok(FlowIr {
168 id: flow.id.clone(),
169 kind: flow.kind.clone(),
170 schema_version: flow.schema_version,
171 entrypoints,
172 nodes,
173 });
174 }
175
176 if plan.insert_before_entrypoint {
177 let mut new_nodes = IndexMap::new();
179 for (id, node) in nodes.into_iter() {
180 if id == plan.anchor {
181 let mut new_node = plan.new_node.clone();
182 new_node.routing = vec![Route {
183 to: Some(plan.anchor.clone()),
184 ..Route::default()
185 }];
186 new_nodes.insert(new_node.id.clone(), new_node);
187 }
188 new_nodes.insert(id.clone(), node);
189 }
190
191 let mut entrypoints = flow.entrypoints.clone();
192 for (_name, target) in entrypoints.iter_mut() {
193 if target == &plan.anchor {
194 *target = plan.new_node.id.clone();
195 }
196 }
197
198 return Ok(FlowIr {
199 id: flow.id.clone(),
200 kind: flow.kind.clone(),
201 schema_version: flow.schema_version,
202 entrypoints,
203 nodes: new_nodes,
204 });
205 }
206
207 let mut reordered = IndexMap::new();
208 let mut anchor_found = false;
209 for (id, node) in nodes.into_iter() {
210 if id == plan.anchor {
211 anchor_found = true;
212 let mut anchor = node.clone();
213 anchor.routing = apply_threaded_routing(
214 &plan.new_node.id,
215 &plan.anchor_old_routing,
216 allow_cycles,
217 &plan.anchor,
218 )?;
219 reordered.insert(id.clone(), anchor);
220 reordered.insert(plan.new_node.id.clone(), plan.new_node.clone());
221 } else {
222 reordered.insert(id.clone(), node);
223 }
224 }
225
226 if !anchor_found {
227 return Err(FlowError::Internal {
228 message: format!("anchor '{}' not found", plan.anchor),
229 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
230 });
231 }
232
233 Ok(FlowIr {
234 id: flow.id.clone(),
235 kind: flow.kind.clone(),
236 schema_version: flow.schema_version,
237 entrypoints: flow.entrypoints.clone(),
238 nodes: reordered,
239 })
240}
241
242pub fn validate_flow(flow: &FlowIr, _catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
243 let mut diags = Vec::new();
244 if let Some((name, target)) = flow.entrypoints.get_index(0)
245 && !flow.nodes.contains_key(target)
246 {
247 diags.push(Diagnostic {
248 code: "ENTRYPOINT_MISSING",
249 message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
250 location: Some(format!("entrypoints.{name}")),
251 });
252 }
253
254 for (id, node) in &flow.nodes {
255 for route in &node.routing {
256 if let Some(to) = &route.to
257 && !flow.nodes.contains_key(to)
258 {
259 diags.push(Diagnostic {
260 code: "ROUTE_TARGET_MISSING",
261 message: format!("node '{}' routes to unknown node '{}'", id, to),
262 location: Some(format!("nodes.{id}.routing")),
263 });
264 }
265 }
266 if node.operation.trim().is_empty() {
267 diags.push(Diagnostic {
268 code: "OPERATION_REQUIRED",
269 message: format!("node '{}' missing operation name", id),
270 location: Some(format!("nodes.{id}")),
271 });
272 }
273 if node.payload.is_null() {
274 diags.push(Diagnostic {
275 code: "PAYLOAD_REQUIRED",
276 message: format!("node '{}' payload must not be null", id),
277 location: Some(format!("nodes.{id}")),
278 });
279 }
280 }
281
282 diags
283}
284
285pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
286 if diags.is_empty() {
287 return Ok(());
288 }
289 let combined = diags
290 .into_iter()
291 .map(|d| format!("{}: {}", d.code, d.message))
292 .collect::<Vec<_>>()
293 .join("; ");
294 Err(FlowError::Internal {
295 message: combined,
296 location: FlowErrorLocation::at_path("add_step".to_string()),
297 })
298}
299
300fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
301 if let Some(id) = after {
302 if flow.nodes.contains_key(id) {
303 return Ok(id.to_string());
304 }
305 return Err(format!("anchor node '{}' not found", id));
306 }
307
308 if let Some(entry) = flow.entrypoints.get_index(0) {
309 return Ok(entry.1.clone());
310 }
311
312 if let Some(first) = flow.nodes.keys().next() {
313 return Ok(first.clone());
314 }
315
316 Err("flow has no nodes to anchor insertion".to_string())
317}
318
319pub fn apply_and_validate(
320 flow: &FlowIr,
321 plan: AddStepPlan,
322 catalog: &dyn ComponentCatalog,
323 allow_cycles: bool,
324) -> Result<FlowIr> {
325 let updated = apply_plan(flow, plan, allow_cycles)?;
326 validate_schema_and_flow(&updated, catalog)?;
327 Ok(updated)
328}
329
330pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
332 let mut seen = IndexMap::new();
333 if let Some((_name, target)) = flow.entrypoints.get_index(0) {
334 seen.insert(target.clone(), ());
335 }
336 for id in flow.nodes.keys() {
337 seen.entry(id.clone()).or_insert(());
338 }
339 seen.keys().cloned().collect()
340}
341
342pub fn add_step_from_config_flow(
344 flow_yaml: &str,
345 config_flow_path: &Path,
346 schema_path: &Path,
347 manifests: &[impl AsRef<Path>],
348 after: Option<String>,
349 answers: &serde_json::Map<String, Value>,
350 allow_cycles: bool,
351) -> Result<FlowDoc> {
352 let flow_doc = load_ygtc_from_str(flow_yaml)?;
353 let flow_ir = FlowIr::from_doc(flow_doc)?;
354 let catalog = ManifestCatalog::load_from_paths(manifests);
355
356 let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
357 message: format!("read config flow {}: {e}", config_flow_path.display()),
358 location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
359 .with_source_path(Some(config_flow_path)),
360 })?;
361 let output = run_config_flow(&config_yaml, schema_path, answers)?;
362
363 let spec = AddStepSpec {
364 after,
365 node_id_hint: Some(output.node_id.clone()),
366 node: output.node.clone(),
367 allow_cycles,
368 require_placeholder: true,
369 };
370
371 let plan =
372 plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
373 match diagnostics_to_error(diags) {
374 Ok(_) => FlowError::Internal {
375 message: "add_step diagnostics unexpectedly empty".to_string(),
376 location: FlowErrorLocation::at_path("add_step".to_string()),
377 },
378 Err(e) => e,
379 }
380 })?;
381 let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
382 updated.to_doc()
383}