greentic_flow/
config_flow.rs1use std::path::Path;
2
3use lazy_static::lazy_static;
4use regex::Regex;
5use serde_json::{Map, Value};
6
7use crate::{
8 add_step::normalize::normalize_node_map,
9 compile_flow,
10 error::{FlowError, FlowErrorLocation, Result},
11 loader::load_ygtc_from_str_with_schema,
12};
13
14#[derive(Debug, Clone, PartialEq)]
16pub struct ConfigFlowOutput {
17 pub node_id: String,
18 pub node: Value,
19}
20
21pub fn run_config_flow(
30 yaml: &str,
31 schema_path: &Path,
32 answers: &Map<String, Value>,
33) -> Result<ConfigFlowOutput> {
34 let normalized_yaml = normalize_config_flow_yaml(yaml)?;
35 let doc = load_ygtc_from_str_with_schema(&normalized_yaml, schema_path)?;
36 let flow = compile_flow(doc.clone())?;
37 let mut state = answers.clone();
38
39 let mut current = resolve_entry(&doc);
40 let mut visited = 0usize;
41 while visited < flow.nodes.len().saturating_add(4) {
42 visited += 1;
43 let node_id = greentic_types::NodeId::new(current.as_str()).map_err(|e| {
44 FlowError::InvalidIdentifier {
45 kind: "node",
46 value: current.clone(),
47 detail: e.to_string(),
48 location: FlowErrorLocation::at_path(format!("nodes.{current}")),
49 }
50 })?;
51 let node = flow
52 .nodes
53 .get(&node_id)
54 .ok_or_else(|| FlowError::Internal {
55 message: format!("node '{current}' missing during config flow execution"),
56 location: FlowErrorLocation::at_path(format!("nodes.{current}")),
57 })?;
58
59 match node.component.id.as_str() {
60 "questions" => {
61 apply_questions(&node.input.mapping, &mut state)?;
62 }
63 "template" => {
64 let payload = render_template(&node.input.mapping, &state)?;
65 return extract_config_output(payload);
66 }
67 other => {
68 return Err(FlowError::Internal {
69 message: format!("unsupported component '{other}' in config flow"),
70 location: FlowErrorLocation::at_path(format!("nodes.{current}")),
71 });
72 }
73 }
74
75 current = match &node.routing {
76 greentic_types::Routing::Next { node_id } => node_id.as_str().to_string(),
77 greentic_types::Routing::End | greentic_types::Routing::Reply => {
78 return Err(FlowError::Internal {
79 message: "config flow terminated without reaching template node".to_string(),
80 location: FlowErrorLocation::at_path("nodes".to_string()),
81 });
82 }
83 greentic_types::Routing::Branch { .. } | greentic_types::Routing::Custom(_) => {
84 return Err(FlowError::Internal {
85 message: "unsupported routing shape in config flow".to_string(),
86 location: FlowErrorLocation::at_path(format!("nodes.{current}.routing")),
87 });
88 }
89 }
90 }
91
92 Err(FlowError::Internal {
93 message: "config flow exceeded traversal limit".to_string(),
94 location: FlowErrorLocation::at_path("nodes".to_string()),
95 })
96}
97
98pub fn run_config_flow_from_path(
100 path: &Path,
101 schema_path: &Path,
102 answers: &Map<String, Value>,
103) -> Result<ConfigFlowOutput> {
104 let text = std::fs::read_to_string(path).map_err(|e| FlowError::Internal {
105 message: format!("read config flow {}: {e}", path.display()),
106 location: FlowErrorLocation::at_path(path.display().to_string())
107 .with_source_path(Some(path)),
108 })?;
109 run_config_flow(&text, schema_path, answers)
110}
111
112fn resolve_entry(doc: &crate::model::FlowDoc) -> String {
113 if let Some(start) = &doc.start {
114 return start.clone();
115 }
116 if doc.nodes.contains_key("in") {
117 return "in".to_string();
118 }
119 doc.nodes
120 .keys()
121 .next()
122 .cloned()
123 .unwrap_or_else(|| "in".to_string())
124}
125
126fn apply_questions(payload: &Value, state: &mut Map<String, Value>) -> Result<()> {
127 let fields = payload
128 .get("fields")
129 .and_then(Value::as_array)
130 .ok_or_else(|| FlowError::Internal {
131 message: "questions node missing fields array".to_string(),
132 location: FlowErrorLocation::at_path("questions.fields".to_string()),
133 })?;
134
135 for field in fields {
136 let id = field
137 .get("id")
138 .and_then(Value::as_str)
139 .ok_or_else(|| FlowError::Internal {
140 message: "questions field missing id".to_string(),
141 location: FlowErrorLocation::at_path("questions.fields".to_string()),
142 })?;
143 if state.contains_key(id) {
144 continue;
145 }
146 if let Some(default) = field.get("default") {
147 state.insert(id.to_string(), default.clone());
148 } else {
149 return Err(FlowError::Internal {
150 message: format!("missing answer for '{id}'"),
151 location: FlowErrorLocation::at_path(format!("questions.fields.{id}")),
152 });
153 }
154 }
155 Ok(())
156}
157
158fn render_template(payload: &Value, state: &Map<String, Value>) -> Result<Value> {
159 let template_str = payload.as_str().ok_or_else(|| FlowError::Internal {
160 message: "template node payload must be a string".to_string(),
161 location: FlowErrorLocation::at_path("template".to_string()),
162 })?;
163 let mut value: Value = serde_json::from_str(template_str).map_err(|e| FlowError::Internal {
164 message: format!("template JSON parse error: {e}"),
165 location: FlowErrorLocation::at_path("template".to_string()),
166 })?;
167 substitute_state(&mut value, state)?;
168 Ok(value)
169}
170
171lazy_static! {
172 static ref STATE_RE: Regex = Regex::new(r"^\{\{\s*state\.([A-Za-z_]\w*)\s*\}\}$").unwrap();
173}
174
175fn substitute_state(target: &mut Value, state: &Map<String, Value>) -> Result<()> {
176 match target {
177 Value::String(s) => {
178 if let Some(caps) = STATE_RE.captures(s) {
179 let key = caps.get(1).unwrap().as_str();
180 let val = state.get(key).ok_or_else(|| FlowError::Internal {
181 message: format!("state value for '{key}' not found"),
182 location: FlowErrorLocation::at_path(format!("state.{key}")),
183 })?;
184 *target = val.clone();
185 }
186 Ok(())
187 }
188 Value::Array(items) => {
189 for item in items {
190 substitute_state(item, state)?;
191 }
192 Ok(())
193 }
194 Value::Object(map) => {
195 for value in map.values_mut() {
196 substitute_state(value, state)?;
197 }
198 Ok(())
199 }
200 _ => Ok(()),
201 }
202}
203
204fn extract_config_output(value: Value) -> Result<ConfigFlowOutput> {
205 let node_id = value
206 .get("node_id")
207 .and_then(Value::as_str)
208 .ok_or_else(|| FlowError::Internal {
209 message: "config flow output missing node_id".to_string(),
210 location: FlowErrorLocation::at_path("node_id".to_string()),
211 })?
212 .to_string();
213 let node = value
214 .get("node")
215 .cloned()
216 .ok_or_else(|| FlowError::Internal {
217 message: "config flow output missing node".to_string(),
218 location: FlowErrorLocation::at_path("node".to_string()),
219 })
220 .and_then(normalize_node_shape)?;
221 Ok(ConfigFlowOutput { node_id, node })
222}
223
224fn normalize_node_shape(node: Value) -> Result<Value> {
225 let normalized = normalize_node_map(node)?;
226 let mut map = Map::new();
227 map.insert(normalized.component_id.clone(), normalized.payload.clone());
228 if let Some(alias) = normalized.pack_alias {
229 map.insert("pack_alias".to_string(), Value::String(alias));
230 }
231 if let Some(op) = normalized.operation {
232 map.insert("operation".to_string(), Value::String(op));
233 }
234 let routing_value =
235 serde_json::to_value(&normalized.routing).map_err(|e| FlowError::Internal {
236 message: format!("serialize routing: {e}"),
237 location: FlowErrorLocation::at_path("node.routing".to_string()),
238 })?;
239 map.insert("routing".to_string(), routing_value);
240
241 Ok(Value::Object(map))
242}
243
244fn normalize_config_flow_yaml(yaml: &str) -> Result<String> {
245 let mut value: Value = serde_yaml_bw::from_str(yaml).map_err(|e| FlowError::Yaml {
246 message: e.to_string(),
247 location: FlowErrorLocation::at_path("config_flow".to_string()),
248 })?;
249 if let Some(map) = value.as_object_mut() {
250 match map.get("type") {
251 Some(Value::String(_)) => {}
252 _ => {
253 map.insert(
254 "type".to_string(),
255 Value::String("component-config".to_string()),
256 );
257 }
258 }
259 }
260 serde_yaml_bw::to_string(&value).map_err(|e| FlowError::Internal {
261 message: format!("normalize config flow: {e}"),
262 location: FlowErrorLocation::at_path("config_flow".to_string()),
263 })
264}