1use indexmap::IndexMap;
2use serde::{Deserialize, Serialize};
3use serde_json::{Map, Value};
4
5use crate::{
6 error::{FlowError, FlowErrorLocation, Result},
7 loader::load_ygtc_from_str,
8 model::{FlowDoc, NodeDoc},
9};
10
11#[derive(Debug, Clone)]
14pub struct FlowIr {
15 pub id: String,
16 pub title: Option<String>,
17 pub description: Option<String>,
18 pub kind: String,
19 pub start: Option<String>,
20 pub parameters: Value,
21 pub tags: Vec<String>,
22 pub schema_version: Option<u32>,
23 pub entrypoints: IndexMap<String, String>,
24 pub meta: Option<Value>,
25 pub slot_schema: Option<Value>,
26 pub nodes: IndexMap<String, NodeIr>,
27}
28
29#[derive(Debug, Clone)]
30pub struct NodeIr {
31 pub id: String,
32 pub operation: String,
33 pub payload: Value,
34 pub output: Value,
35 pub in_map: Option<Value>,
36 pub out_map: Option<Value>,
37 pub err_map: Option<Value>,
38 pub routing: Vec<Route>,
39 pub telemetry: Option<Value>,
40}
41
42#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
43pub struct Route {
44 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub to: Option<String>,
46 #[serde(default, skip_serializing_if = "is_false")]
47 pub out: bool,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub status: Option<String>,
50 #[serde(default, skip_serializing_if = "is_false")]
51 pub reply: bool,
52 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub condition: Option<String>,
54}
55
56fn is_false(value: &bool) -> bool {
57 !*value
58}
59
60impl FlowIr {
61 pub fn from_doc(doc: FlowDoc) -> Result<Self> {
62 let schema_version = doc.schema_version;
63 let entrypoints = resolve_entrypoints(&doc);
64 let mut nodes = IndexMap::new();
65 for (id, node_doc) in doc.nodes {
66 let (operation, payload) = extract_operation(&node_doc, &id)?;
67 let routing = parse_routing(&node_doc, &id)?;
68 let output = node_doc
69 .raw
70 .get("output")
71 .cloned()
72 .unwrap_or_else(|| Value::Object(Map::new()));
73 let in_map = node_doc.raw.get("in_map").cloned();
74 let out_map = node_doc.raw.get("out_map").cloned();
75 let err_map = node_doc.raw.get("err_map").cloned();
76 nodes.insert(
77 id.clone(),
78 NodeIr {
79 id: id.clone(),
80 operation,
81 payload,
82 output,
83 in_map,
84 out_map,
85 err_map,
86 routing,
87 telemetry: node_doc
88 .telemetry
89 .clone()
90 .and_then(|t| serde_json::to_value(t).ok()),
91 },
92 );
93 }
94
95 Ok(FlowIr {
96 id: doc.id,
97 title: doc.title,
98 description: doc.description,
99 kind: doc.flow_type,
100 start: doc.start,
101 parameters: doc.parameters,
102 tags: doc.tags,
103 schema_version,
104 entrypoints,
105 meta: doc.meta,
106 slot_schema: doc.slot_schema,
107 nodes,
108 })
109 }
110
111 pub fn to_doc(&self) -> Result<FlowDoc> {
112 let mut nodes: IndexMap<String, NodeDoc> = IndexMap::new();
113 for (id, node_ir) in &self.nodes {
114 let mut raw = IndexMap::new();
115 raw.insert(node_ir.operation.clone(), node_ir.payload.clone());
116 if !node_ir.output.is_object()
117 || !node_ir
118 .output
119 .as_object()
120 .map(|m| m.is_empty())
121 .unwrap_or(false)
122 {
123 raw.insert("output".to_string(), node_ir.output.clone());
124 }
125 if let Some(in_map) = node_ir.in_map.as_ref() {
126 raw.insert("in_map".to_string(), in_map.clone());
127 }
128 if let Some(out_map) = node_ir.out_map.as_ref() {
129 raw.insert("out_map".to_string(), out_map.clone());
130 }
131 if let Some(err_map) = node_ir.err_map.as_ref() {
132 raw.insert("err_map".to_string(), err_map.clone());
133 }
134 let routing_value =
135 serde_json::to_value(&node_ir.routing).map_err(|e| FlowError::Internal {
136 message: format!("serialize routing for node '{id}': {e}"),
137 location: FlowErrorLocation::at_path(format!("nodes.{id}.routing")),
138 })?;
139 let routing_yaml = if node_ir.routing.len() == 1
140 && node_ir.routing[0].out
141 && node_ir.routing[0].to.is_none()
142 && !node_ir.routing[0].reply
143 && node_ir.routing[0].status.is_none()
144 && node_ir.routing[0].condition.is_none()
145 {
146 Value::String("out".to_string())
147 } else if node_ir.routing.len() == 1
148 && node_ir.routing[0].reply
149 && node_ir.routing[0].to.is_none()
150 && !node_ir.routing[0].out
151 && node_ir.routing[0].status.is_none()
152 && node_ir.routing[0].condition.is_none()
153 {
154 Value::String("reply".to_string())
155 } else {
156 routing_value
157 };
158 nodes.insert(
159 id.clone(),
160 NodeDoc {
161 routing: routing_yaml,
162 telemetry: node_ir
163 .telemetry
164 .as_ref()
165 .and_then(|t| serde_json::from_value(t.clone()).ok()),
166 operation: Some(node_ir.operation.clone()),
167 payload: node_ir.payload.clone(),
168 raw,
169 },
170 );
171 }
172
173 let mut entrypoints = IndexMap::new();
174 for (name, target) in &self.entrypoints {
175 if name == "default" {
176 continue;
177 }
178 entrypoints.insert(name.clone(), Value::String(target.clone()));
179 }
180
181 let start = self
182 .entrypoints
183 .get("default")
184 .cloned()
185 .or_else(|| self.start.clone());
186
187 Ok(FlowDoc {
188 id: self.id.clone(),
189 title: self.title.clone(),
190 description: self.description.clone(),
191 flow_type: self.kind.clone(),
192 start,
193 parameters: self.parameters.clone(),
194 tags: self.tags.clone(),
195 schema_version: self.schema_version,
196 entrypoints,
197 meta: self.meta.clone(),
198 slot_schema: self.slot_schema.clone(),
199 nodes,
200 })
201 }
202}
203
204fn resolve_entrypoints(doc: &FlowDoc) -> IndexMap<String, String> {
205 let mut entries = IndexMap::new();
206 if let Some(start) = &doc.start {
207 entries.insert("default".to_string(), start.clone());
208 } else if doc.nodes.contains_key("in") {
209 entries.insert("default".to_string(), "in".to_string());
210 } else if let Some(first) = doc.nodes.keys().next() {
211 entries.insert("default".to_string(), first.clone());
212 }
213 for (k, v) in &doc.entrypoints {
214 if let Some(target) = v.as_str() {
215 entries.insert(k.clone(), target.to_string());
216 }
217 }
218 entries
219}
220
221fn parse_routing(node: &NodeDoc, node_id: &str) -> Result<Vec<Route>> {
222 if node.routing.is_null() {
223 return Ok(Vec::new());
224 }
225 if let Some(s) = node.routing.as_str() {
226 return match s {
227 "out" => Ok(vec![Route {
228 out: true,
229 ..Route::default()
230 }]),
231 "reply" => Ok(vec![Route {
232 reply: true,
233 ..Route::default()
234 }]),
235 other => Err(FlowError::Routing {
236 node_id: node_id.to_string(),
237 message: format!("unsupported routing shorthand '{other}'"),
238 location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
239 }),
240 };
241 }
242 #[derive(serde::Deserialize)]
243 struct RouteDoc {
244 #[serde(default)]
245 to: Option<String>,
246 #[serde(default)]
247 out: Option<bool>,
248 #[serde(default)]
249 status: Option<String>,
250 #[serde(default)]
251 reply: Option<bool>,
252 #[serde(default)]
253 condition: Option<String>,
254 }
255
256 let routes: Vec<RouteDoc> =
257 serde_json::from_value(node.routing.clone()).map_err(|e| FlowError::Internal {
258 message: format!("routing decode for node '{node_id}': {e}"),
259 location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
260 })?;
261
262 Ok(routes
263 .into_iter()
264 .map(|r| Route {
265 to: r.to,
266 out: r.out.unwrap_or(false),
267 status: r.status,
268 reply: r.reply.unwrap_or(false),
269 condition: r.condition,
270 })
271 .collect())
272}
273
274pub fn parse_flow_to_ir(yaml: &str) -> Result<FlowIr> {
276 let doc = load_ygtc_from_str(yaml)?;
277 FlowIr::from_doc(doc)
278}
279
280fn extract_operation(node: &NodeDoc, node_id: &str) -> Result<(String, Value)> {
281 let reserved = [
282 "routing",
283 "telemetry",
284 "output",
285 "in_map",
286 "out_map",
287 "err_map",
288 "retry",
289 "timeout",
290 "when",
291 "annotations",
292 "meta",
293 ];
294 if let Some(exec) = node.raw.get("component.exec") {
295 let op = node
296 .raw
297 .get("operation")
298 .and_then(Value::as_str)
299 .or(node.operation.as_deref())
300 .unwrap_or("");
301 if op.trim().is_empty() {
302 return Err(FlowError::Internal {
303 message: format!("node '{node_id}' missing operation key"),
304 location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
305 });
306 }
307 return Ok((op.to_string(), exec.clone()));
308 }
309 let mut op_key: Option<String> = None;
310 let mut payload: Option<Value> = None;
311 for (k, v) in &node.raw {
312 if reserved.contains(&k.as_str()) {
313 continue;
314 }
315 if op_key.is_some() {
316 return Err(FlowError::Internal {
317 message: format!(
318 "node '{node_id}' must have exactly one operation key, found multiple"
319 ),
320 location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
321 });
322 }
323 op_key = Some(k.clone());
324 payload = Some(v.clone());
325 }
326 if let (Some(k), Some(v)) = (op_key, payload) {
327 return Ok((k, v));
328 }
329
330 if let Some(op) = &node.operation {
331 return Ok((op.clone(), node.payload.clone()));
332 }
333
334 Err(FlowError::Internal {
335 message: format!("node '{node_id}' missing operation key"),
336 location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
337 })
338}
339
340#[cfg(test)]
341mod tests {
342 use super::parse_flow_to_ir;
343 use serde_json::json;
344
345 #[test]
346 fn parse_and_roundtrip_preserves_alias_maps() {
347 let yaml = r#"
348id: alias_flow
349type: messaging
350schema_version: 2
351nodes:
352 start:
353 component.exec:
354 component: repo://demo/component
355 config:
356 greeting: hi
357 operation: run
358 in_map:
359 source: $.input
360 out_map:
361 target: $.output
362 err_map:
363 target: $.error
364 routing: out
365"#;
366
367 let flow = parse_flow_to_ir(yaml).expect("parse flow");
368 let node = flow.nodes.get("start").expect("start node");
369 assert_eq!(node.in_map.as_ref(), Some(&json!({ "source": "$.input" })));
370 assert_eq!(
371 node.out_map.as_ref(),
372 Some(&json!({ "target": "$.output" }))
373 );
374 assert_eq!(node.err_map.as_ref(), Some(&json!({ "target": "$.error" })));
375
376 let doc = flow.to_doc().expect("to doc");
377 let raw = &doc.nodes.get("start").expect("start doc node").raw;
378 assert_eq!(raw.get("in_map"), Some(&json!({ "source": "$.input" })));
379 assert_eq!(raw.get("out_map"), Some(&json!({ "target": "$.output" })));
380 assert_eq!(raw.get("err_map"), Some(&json!({ "target": "$.error" })));
381 }
382
383 #[test]
384 fn slot_schema_round_trips_through_ir() {
385 let yaml = r#"
386id: nda_intake
387type: messaging
388schema_version: 2
389slot_schema:
390 - name: counterparty
391 slot_type: string
392 pattern: "between\\s+([A-Z][\\w&. ]*)"
393 required: true
394 - name: due_date
395 slot_type: date
396 required: true
397nodes:
398 start:
399 component.exec:
400 component: repo://demo/component
401 operation: run
402 routing: out
403"#;
404
405 let flow = parse_flow_to_ir(yaml).expect("parse flow");
406 let schema = flow.slot_schema.as_ref().expect("slot_schema present");
407 assert_eq!(schema[0]["name"], "counterparty");
408 assert_eq!(schema[0]["slot_type"], "string");
409 assert_eq!(schema[1]["name"], "due_date");
410 assert_eq!(schema[1]["slot_type"], "date");
411
412 let doc = flow.to_doc().expect("to doc");
413 assert_eq!(doc.slot_schema.as_ref(), Some(schema));
414 }
415
416 #[test]
417 fn flow_without_slot_schema_round_trips_with_none() {
418 let yaml = r#"
419id: legacy
420type: messaging
421nodes:
422 start:
423 component.exec:
424 component: repo://demo/component
425 operation: run
426 routing: out
427"#;
428
429 let flow = parse_flow_to_ir(yaml).expect("parse flow");
430 assert!(flow.slot_schema.is_none());
431 let doc = flow.to_doc().expect("to doc");
432 assert!(doc.slot_schema.is_none());
433
434 let yaml_out = serde_yaml_bw::to_string(&doc).expect("serialize");
436 assert!(
437 !yaml_out.contains("slot_schema"),
438 "absent slot_schema must not serialize: {yaml_out}"
439 );
440 }
441}