1pub mod id;
2pub mod modes;
3pub mod normalize;
4pub mod rewire;
5pub mod validate;
6
7use indexmap::IndexMap;
8use serde_json::Value;
9use std::{fs, path::Path};
10
11use crate::{
12 component_catalog::ComponentCatalog,
13 component_catalog::ManifestCatalog,
14 config_flow::run_config_flow,
15 error::{FlowError, FlowErrorLocation, Result},
16 flow_ir::{ComponentRef, FlowIr, NodeIr, NodeKind, Route},
17 loader::load_ygtc_from_str,
18 model::FlowDoc,
19};
20
21use self::{
22 id::{generate_node_id, is_placeholder_value},
23 normalize::normalize_node_map,
24 rewire::{apply_threaded_routing, rewrite_placeholder_routes},
25 validate::validate_schema_and_flow,
26};
27
28#[derive(Debug, Clone)]
29pub struct AddStepSpec {
30 pub after: Option<String>,
31 pub node_id_hint: Option<String>,
32 pub node: Value,
33 pub allow_cycles: bool,
34}
35
36#[derive(Debug, Clone)]
37pub struct AddStepPlan {
38 pub anchor: String,
39 pub new_node: NodeIr,
40 pub anchor_old_routing: Vec<Route>,
41}
42
43#[derive(Debug, Clone)]
44pub struct Diagnostic {
45 pub code: &'static str,
46 pub message: String,
47 pub location: Option<String>,
48}
49
50pub fn plan_add_step(
51 flow: &FlowIr,
52 spec: AddStepSpec,
53 catalog: &dyn ComponentCatalog,
54) -> std::result::Result<AddStepPlan, Vec<Diagnostic>> {
55 let mut diags = Vec::new();
56
57 let anchor = match resolve_anchor(flow, spec.after.as_deref()) {
58 Ok(anchor) => anchor,
59 Err(msg) => {
60 diags.push(Diagnostic {
61 code: "ADD_STEP_ANCHOR_MISSING",
62 message: msg,
63 location: Some("nodes".to_string()),
64 });
65 return Err(diags);
66 }
67 };
68
69 if let Some(hint) = spec.node_id_hint.as_deref()
70 && is_placeholder_value(hint)
71 {
72 diags.push(Diagnostic {
73 code: "ADD_STEP_NODE_ID_PLACEHOLDER",
74 message: format!(
75 "Config flow emitted placeholder node id '{hint}'; update greentic-component to emit the component name."
76 ),
77 location: Some("add_step.node_id".to_string()),
78 });
79 return Err(diags);
80 }
81
82 let normalized = match normalize_node_map(spec.node.clone()) {
83 Ok(node) => node,
84 Err(e) => {
85 diags.push(Diagnostic {
86 code: "ADD_STEP_NODE_INVALID",
87 message: e.to_string(),
88 location: Some("add_step.node".to_string()),
89 });
90 return Err(diags);
91 }
92 };
93
94 if let Some(meta) = catalog.resolve(&normalized.component_id) {
95 for req in meta.required_fields {
96 if normalized.payload.get(&req).is_none() {
97 diags.push(Diagnostic {
98 code: "COMPONENT_CONFIG_REQUIRED",
99 message: format!(
100 "component '{}' missing required config '{}'",
101 normalized.component_id, req
102 ),
103 location: Some(format!("nodes.{}.{}", normalized.component_id, req)),
104 });
105 }
106 }
107 } else {
108 diags.push(Diagnostic {
109 code: "ADD_STEP_COMPONENT_UNKNOWN",
110 message: format!(
111 "component '{}' not found in catalog",
112 normalized.component_id
113 ),
114 location: Some("add_step.component".to_string()),
115 });
116 }
117
118 if !diags.is_empty() {
119 return Err(diags);
120 }
121
122 let anchor_node = flow.nodes.get(&anchor).expect("anchor exists");
123 let anchor_old_routing = anchor_node.routing.clone();
124
125 let new_node_id = generate_node_id(
126 spec.node_id_hint.as_deref(),
127 &normalized.component_id,
128 normalized.operation.as_deref(),
129 normalized.pack_alias.as_deref(),
130 &anchor,
131 flow.nodes.keys().map(|k| k.as_str()),
132 );
133
134 let routing = rewrite_placeholder_routes(
135 normalized.routing.clone(),
136 &anchor_old_routing,
137 spec.allow_cycles,
138 &anchor,
139 true,
140 )
141 .map_err(|msg| {
142 vec![Diagnostic {
143 code: "ADD_STEP_ROUTING_INVALID",
144 message: msg,
145 location: Some(format!("nodes.{new_node_id}.routing")),
146 }]
147 })?;
148
149 let new_node = NodeIr {
150 id: new_node_id.clone(),
151 kind: NodeKind::Component(ComponentRef {
152 component_id: normalized.component_id.clone(),
153 pack_alias: normalized.pack_alias.clone(),
154 operation: normalized.operation.clone(),
155 payload: normalized.payload.clone(),
156 }),
157 routing,
158 };
159
160 Ok(AddStepPlan {
161 anchor,
162 new_node,
163 anchor_old_routing,
164 })
165}
166
167pub fn apply_plan(flow: &FlowIr, plan: AddStepPlan, allow_cycles: bool) -> Result<FlowIr> {
168 let mut nodes: IndexMap<String, NodeIr> = flow.nodes.clone();
169 if nodes.contains_key(&plan.new_node.id) {
170 return Err(FlowError::Internal {
171 message: format!("node '{}' already exists", plan.new_node.id),
172 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.new_node.id)),
173 });
174 }
175
176 let mut anchor = nodes
177 .get(&plan.anchor)
178 .cloned()
179 .ok_or_else(|| FlowError::Internal {
180 message: format!("anchor '{}' not found", plan.anchor),
181 location: FlowErrorLocation::at_path(format!("nodes.{}", plan.anchor)),
182 })?;
183
184 let anchor_routing = apply_threaded_routing(
185 &plan.new_node.id,
186 &plan.anchor_old_routing,
187 allow_cycles,
188 &plan.anchor,
189 )?;
190 anchor.routing = anchor_routing;
191 nodes.insert(plan.anchor.clone(), anchor);
192 nodes.insert(plan.new_node.id.clone(), plan.new_node);
193
194 Ok(FlowIr {
195 id: flow.id.clone(),
196 kind: flow.kind.clone(),
197 entrypoints: flow.entrypoints.clone(),
198 nodes,
199 })
200}
201
202pub fn validate_flow(flow: &FlowIr, catalog: &dyn ComponentCatalog) -> Vec<Diagnostic> {
203 let mut diags = Vec::new();
204 if let Some((name, target)) = flow.entrypoints.get_index(0)
205 && !flow.nodes.contains_key(target)
206 {
207 diags.push(Diagnostic {
208 code: "ENTRYPOINT_MISSING",
209 message: format!("entrypoint '{}' targets unknown node '{}'", name, target),
210 location: Some(format!("entrypoints.{name}")),
211 });
212 }
213
214 for (id, node) in &flow.nodes {
215 for route in &node.routing {
216 if let Some(to) = &route.to
217 && !flow.nodes.contains_key(to)
218 {
219 diags.push(Diagnostic {
220 code: "ROUTE_TARGET_MISSING",
221 message: format!("node '{}' routes to unknown node '{}'", id, to),
222 location: Some(format!("nodes.{id}.routing")),
223 });
224 }
225 }
226
227 match &node.kind {
228 NodeKind::Component(comp) => {
229 if comp.payload.is_null() {
230 diags.push(Diagnostic {
231 code: "COMPONENT_PAYLOAD_REQUIRED",
232 message: format!(
233 "component '{}' payload must not be null",
234 comp.component_id
235 ),
236 location: Some(format!("nodes.{id}")),
237 });
238 }
239
240 if let Some(meta) = catalog.resolve(&comp.component_id) {
241 for req in meta.required_fields {
242 if comp.payload.get(&req).is_none() {
243 diags.push(Diagnostic {
244 code: "COMPONENT_CONFIG_REQUIRED",
245 message: format!(
246 "component '{}' missing required config '{}'",
247 comp.component_id, req
248 ),
249 location: Some(format!("nodes.{id}.{req}")),
250 });
251 }
252 }
253 } else {
254 diags.push(Diagnostic {
255 code: "COMPONENT_NOT_FOUND",
256 message: format!("component '{}' not found in catalog", comp.component_id),
257 location: Some(format!("nodes.{id}")),
258 });
259 }
260 }
261 NodeKind::Questions { fields } => {
262 if fields.get("fields").is_none() {
263 diags.push(Diagnostic {
264 code: "QUESTIONS_FIELDS_REQUIRED",
265 message: "questions node missing fields".to_string(),
266 location: Some(format!("nodes.{id}.questions.fields")),
267 });
268 }
269 }
270 NodeKind::Template { template } => {
271 if template.is_empty() {
272 diags.push(Diagnostic {
273 code: "TEMPLATE_EMPTY",
274 message: "template node payload is empty".to_string(),
275 location: Some(format!("nodes.{id}.template")),
276 });
277 }
278 }
279 NodeKind::Other { .. } => {}
280 }
281 }
282
283 diags
284}
285
286pub fn diagnostics_to_error(diags: Vec<Diagnostic>) -> Result<()> {
287 if diags.is_empty() {
288 return Ok(());
289 }
290 let combined = diags
291 .into_iter()
292 .map(|d| format!("{}: {}", d.code, d.message))
293 .collect::<Vec<_>>()
294 .join("; ");
295 Err(FlowError::Internal {
296 message: combined,
297 location: FlowErrorLocation::at_path("add_step".to_string()),
298 })
299}
300
301fn resolve_anchor(flow: &FlowIr, after: Option<&str>) -> std::result::Result<String, String> {
302 if let Some(id) = after {
303 if flow.nodes.contains_key(id) {
304 return Ok(id.to_string());
305 }
306 return Err(format!("anchor node '{}' not found", id));
307 }
308
309 if let Some(entry) = flow.entrypoints.get_index(0) {
310 return Ok(entry.1.clone());
311 }
312
313 if let Some(first) = flow.nodes.keys().next() {
314 return Ok(first.clone());
315 }
316
317 Err("flow has no nodes to anchor insertion".to_string())
318}
319
320pub fn apply_and_validate(
321 flow: &FlowIr,
322 plan: AddStepPlan,
323 catalog: &dyn ComponentCatalog,
324 allow_cycles: bool,
325) -> Result<FlowIr> {
326 let updated = apply_plan(flow, plan, allow_cycles)?;
327 validate_schema_and_flow(&updated, catalog)?;
328 Ok(updated)
329}
330
331pub fn anchor_candidates(flow: &FlowIr) -> Vec<String> {
333 let mut seen = IndexMap::new();
334 if let Some((_name, target)) = flow.entrypoints.get_index(0) {
335 seen.insert(target.clone(), ());
336 }
337 for id in flow.nodes.keys() {
338 seen.entry(id.clone()).or_insert(());
339 }
340 seen.keys().cloned().collect()
341}
342
343pub fn add_step_from_config_flow(
345 flow_yaml: &str,
346 config_flow_path: &Path,
347 schema_path: &Path,
348 manifests: &[impl AsRef<Path>],
349 after: Option<String>,
350 answers: &serde_json::Map<String, Value>,
351 allow_cycles: bool,
352) -> Result<FlowDoc> {
353 let flow_doc = load_ygtc_from_str(flow_yaml)?;
354 let flow_ir = FlowIr::from_doc(flow_doc)?;
355 let catalog = ManifestCatalog::load_from_paths(manifests);
356
357 let config_yaml = fs::read_to_string(config_flow_path).map_err(|e| FlowError::Internal {
358 message: format!("read config flow {}: {e}", config_flow_path.display()),
359 location: FlowErrorLocation::at_path(config_flow_path.display().to_string())
360 .with_source_path(Some(config_flow_path)),
361 })?;
362 let output = run_config_flow(&config_yaml, schema_path, answers)?;
363
364 let spec = AddStepSpec {
365 after,
366 node_id_hint: Some(output.node_id.clone()),
367 node: output.node.clone(),
368 allow_cycles,
369 };
370
371 let plan =
372 plan_add_step(&flow_ir, spec, &catalog).map_err(|diags| {
373 match diagnostics_to_error(diags) {
374 Ok(_) => FlowError::Internal {
375 message: "add_step diagnostics unexpectedly empty".to_string(),
376 location: FlowErrorLocation::at_path("add_step".to_string()),
377 },
378 Err(e) => e,
379 }
380 })?;
381 let updated = apply_and_validate(&flow_ir, plan, &catalog, allow_cycles)?;
382 updated.to_doc()
383}