Skip to main content

greentic_flow/
flow_ir.rs

1use indexmap::IndexMap;
2use serde::{Deserialize, Serialize};
3use serde_json::{Map, Value};
4
5use crate::{
6    error::{FlowError, FlowErrorLocation, Result},
7    loader::load_ygtc_from_str,
8    model::{FlowDoc, NodeDoc},
9};
10
11/// Typed intermediate representation for flows, suitable for planning edits before
12/// rendering back into YGTC YAML.
13#[derive(Debug, Clone)]
14pub struct FlowIr {
15    pub id: String,
16    pub title: Option<String>,
17    pub description: Option<String>,
18    pub kind: String,
19    pub start: Option<String>,
20    pub parameters: Value,
21    pub tags: Vec<String>,
22    pub schema_version: Option<u32>,
23    pub entrypoints: IndexMap<String, String>,
24    pub meta: Option<Value>,
25    pub slot_schema: Option<Value>,
26    pub nodes: IndexMap<String, NodeIr>,
27}
28
29#[derive(Debug, Clone)]
30pub struct NodeIr {
31    pub id: String,
32    pub operation: String,
33    pub payload: Value,
34    pub output: Value,
35    pub in_map: Option<Value>,
36    pub out_map: Option<Value>,
37    pub err_map: Option<Value>,
38    pub routing: Vec<Route>,
39    pub telemetry: Option<Value>,
40}
41
42#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
43pub struct Route {
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub to: Option<String>,
46    #[serde(default, skip_serializing_if = "is_false")]
47    pub out: bool,
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub status: Option<String>,
50    #[serde(default, skip_serializing_if = "is_false")]
51    pub reply: bool,
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub condition: Option<String>,
54}
55
56fn is_false(value: &bool) -> bool {
57    !*value
58}
59
60impl FlowIr {
61    pub fn from_doc(doc: FlowDoc) -> Result<Self> {
62        let schema_version = doc.schema_version;
63        let entrypoints = resolve_entrypoints(&doc);
64        let mut nodes = IndexMap::new();
65        for (id, node_doc) in doc.nodes {
66            let (operation, payload) = extract_operation(&node_doc, &id)?;
67            let routing = parse_routing(&node_doc, &id)?;
68            let output = node_doc
69                .raw
70                .get("output")
71                .cloned()
72                .unwrap_or_else(|| Value::Object(Map::new()));
73            let in_map = node_doc.raw.get("in_map").cloned();
74            let out_map = node_doc.raw.get("out_map").cloned();
75            let err_map = node_doc.raw.get("err_map").cloned();
76            nodes.insert(
77                id.clone(),
78                NodeIr {
79                    id: id.clone(),
80                    operation,
81                    payload,
82                    output,
83                    in_map,
84                    out_map,
85                    err_map,
86                    routing,
87                    telemetry: node_doc
88                        .telemetry
89                        .clone()
90                        .and_then(|t| serde_json::to_value(t).ok()),
91                },
92            );
93        }
94
95        Ok(FlowIr {
96            id: doc.id,
97            title: doc.title,
98            description: doc.description,
99            kind: doc.flow_type,
100            start: doc.start,
101            parameters: doc.parameters,
102            tags: doc.tags,
103            schema_version,
104            entrypoints,
105            meta: doc.meta,
106            slot_schema: doc.slot_schema,
107            nodes,
108        })
109    }
110
111    pub fn to_doc(&self) -> Result<FlowDoc> {
112        let mut nodes: IndexMap<String, NodeDoc> = IndexMap::new();
113        for (id, node_ir) in &self.nodes {
114            let mut raw = IndexMap::new();
115            raw.insert(node_ir.operation.clone(), node_ir.payload.clone());
116            if !node_ir.output.is_object()
117                || !node_ir
118                    .output
119                    .as_object()
120                    .map(|m| m.is_empty())
121                    .unwrap_or(false)
122            {
123                raw.insert("output".to_string(), node_ir.output.clone());
124            }
125            if let Some(in_map) = node_ir.in_map.as_ref() {
126                raw.insert("in_map".to_string(), in_map.clone());
127            }
128            if let Some(out_map) = node_ir.out_map.as_ref() {
129                raw.insert("out_map".to_string(), out_map.clone());
130            }
131            if let Some(err_map) = node_ir.err_map.as_ref() {
132                raw.insert("err_map".to_string(), err_map.clone());
133            }
134            let routing_value =
135                serde_json::to_value(&node_ir.routing).map_err(|e| FlowError::Internal {
136                    message: format!("serialize routing for node '{id}': {e}"),
137                    location: FlowErrorLocation::at_path(format!("nodes.{id}.routing")),
138                })?;
139            let routing_yaml = if node_ir.routing.len() == 1
140                && node_ir.routing[0].out
141                && node_ir.routing[0].to.is_none()
142                && !node_ir.routing[0].reply
143                && node_ir.routing[0].status.is_none()
144                && node_ir.routing[0].condition.is_none()
145            {
146                Value::String("out".to_string())
147            } else if node_ir.routing.len() == 1
148                && node_ir.routing[0].reply
149                && node_ir.routing[0].to.is_none()
150                && !node_ir.routing[0].out
151                && node_ir.routing[0].status.is_none()
152                && node_ir.routing[0].condition.is_none()
153            {
154                Value::String("reply".to_string())
155            } else {
156                routing_value
157            };
158            nodes.insert(
159                id.clone(),
160                NodeDoc {
161                    routing: routing_yaml,
162                    telemetry: node_ir
163                        .telemetry
164                        .as_ref()
165                        .and_then(|t| serde_json::from_value(t.clone()).ok()),
166                    operation: Some(node_ir.operation.clone()),
167                    payload: node_ir.payload.clone(),
168                    raw,
169                },
170            );
171        }
172
173        let mut entrypoints = IndexMap::new();
174        for (name, target) in &self.entrypoints {
175            if name == "default" {
176                continue;
177            }
178            entrypoints.insert(name.clone(), Value::String(target.clone()));
179        }
180
181        let start = self
182            .entrypoints
183            .get("default")
184            .cloned()
185            .or_else(|| self.start.clone());
186
187        Ok(FlowDoc {
188            id: self.id.clone(),
189            title: self.title.clone(),
190            description: self.description.clone(),
191            flow_type: self.kind.clone(),
192            start,
193            parameters: self.parameters.clone(),
194            tags: self.tags.clone(),
195            schema_version: self.schema_version,
196            entrypoints,
197            meta: self.meta.clone(),
198            slot_schema: self.slot_schema.clone(),
199            nodes,
200        })
201    }
202}
203
204fn resolve_entrypoints(doc: &FlowDoc) -> IndexMap<String, String> {
205    let mut entries = IndexMap::new();
206    if let Some(start) = &doc.start {
207        entries.insert("default".to_string(), start.clone());
208    } else if doc.nodes.contains_key("in") {
209        entries.insert("default".to_string(), "in".to_string());
210    } else if let Some(first) = doc.nodes.keys().next() {
211        entries.insert("default".to_string(), first.clone());
212    }
213    for (k, v) in &doc.entrypoints {
214        if let Some(target) = v.as_str() {
215            entries.insert(k.clone(), target.to_string());
216        }
217    }
218    entries
219}
220
221fn parse_routing(node: &NodeDoc, node_id: &str) -> Result<Vec<Route>> {
222    if node.routing.is_null() {
223        return Ok(Vec::new());
224    }
225    if let Some(s) = node.routing.as_str() {
226        return match s {
227            "out" => Ok(vec![Route {
228                out: true,
229                ..Route::default()
230            }]),
231            "reply" => Ok(vec![Route {
232                reply: true,
233                ..Route::default()
234            }]),
235            other => Err(FlowError::Routing {
236                node_id: node_id.to_string(),
237                message: format!("unsupported routing shorthand '{other}'"),
238                location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
239            }),
240        };
241    }
242    #[derive(serde::Deserialize)]
243    struct RouteDoc {
244        #[serde(default)]
245        to: Option<String>,
246        #[serde(default)]
247        out: Option<bool>,
248        #[serde(default)]
249        status: Option<String>,
250        #[serde(default)]
251        reply: Option<bool>,
252        #[serde(default)]
253        condition: Option<String>,
254    }
255
256    let routes: Vec<RouteDoc> =
257        serde_json::from_value(node.routing.clone()).map_err(|e| FlowError::Internal {
258            message: format!("routing decode for node '{node_id}': {e}"),
259            location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
260        })?;
261
262    Ok(routes
263        .into_iter()
264        .map(|r| Route {
265            to: r.to,
266            out: r.out.unwrap_or(false),
267            status: r.status,
268            reply: r.reply.unwrap_or(false),
269            condition: r.condition,
270        })
271        .collect())
272}
273
274/// Helper for tests: load YAML text straight into Flow IR.
275pub fn parse_flow_to_ir(yaml: &str) -> Result<FlowIr> {
276    let doc = load_ygtc_from_str(yaml)?;
277    FlowIr::from_doc(doc)
278}
279
280fn extract_operation(node: &NodeDoc, node_id: &str) -> Result<(String, Value)> {
281    let reserved = [
282        "routing",
283        "telemetry",
284        "output",
285        "in_map",
286        "out_map",
287        "err_map",
288        "retry",
289        "timeout",
290        "when",
291        "annotations",
292        "meta",
293    ];
294    if let Some(exec) = node.raw.get("component.exec") {
295        let op = node
296            .raw
297            .get("operation")
298            .and_then(Value::as_str)
299            .or(node.operation.as_deref())
300            .unwrap_or("");
301        if op.trim().is_empty() {
302            return Err(FlowError::Internal {
303                message: format!("node '{node_id}' missing operation key"),
304                location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
305            });
306        }
307        return Ok((op.to_string(), exec.clone()));
308    }
309    let mut op_key: Option<String> = None;
310    let mut payload: Option<Value> = None;
311    for (k, v) in &node.raw {
312        if reserved.contains(&k.as_str()) {
313            continue;
314        }
315        if op_key.is_some() {
316            return Err(FlowError::Internal {
317                message: format!(
318                    "node '{node_id}' must have exactly one operation key, found multiple"
319                ),
320                location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
321            });
322        }
323        op_key = Some(k.clone());
324        payload = Some(v.clone());
325    }
326    if let (Some(k), Some(v)) = (op_key, payload) {
327        return Ok((k, v));
328    }
329
330    if let Some(op) = &node.operation {
331        return Ok((op.clone(), node.payload.clone()));
332    }
333
334    Err(FlowError::Internal {
335        message: format!("node '{node_id}' missing operation key"),
336        location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
337    })
338}
339
340#[cfg(test)]
341mod tests {
342    use super::parse_flow_to_ir;
343    use serde_json::json;
344
345    #[test]
346    fn parse_and_roundtrip_preserves_alias_maps() {
347        let yaml = r#"
348id: alias_flow
349type: messaging
350schema_version: 2
351nodes:
352  start:
353    component.exec:
354      component: repo://demo/component
355      config:
356        greeting: hi
357    operation: run
358    in_map:
359      source: $.input
360    out_map:
361      target: $.output
362    err_map:
363      target: $.error
364    routing: out
365"#;
366
367        let flow = parse_flow_to_ir(yaml).expect("parse flow");
368        let node = flow.nodes.get("start").expect("start node");
369        assert_eq!(node.in_map.as_ref(), Some(&json!({ "source": "$.input" })));
370        assert_eq!(
371            node.out_map.as_ref(),
372            Some(&json!({ "target": "$.output" }))
373        );
374        assert_eq!(node.err_map.as_ref(), Some(&json!({ "target": "$.error" })));
375
376        let doc = flow.to_doc().expect("to doc");
377        let raw = &doc.nodes.get("start").expect("start doc node").raw;
378        assert_eq!(raw.get("in_map"), Some(&json!({ "source": "$.input" })));
379        assert_eq!(raw.get("out_map"), Some(&json!({ "target": "$.output" })));
380        assert_eq!(raw.get("err_map"), Some(&json!({ "target": "$.error" })));
381    }
382
383    #[test]
384    fn slot_schema_round_trips_through_ir() {
385        let yaml = r#"
386id: nda_intake
387type: messaging
388schema_version: 2
389slot_schema:
390  - name: counterparty
391    slot_type: string
392    pattern: "between\\s+([A-Z][\\w&. ]*)"
393    required: true
394  - name: due_date
395    slot_type: date
396    required: true
397nodes:
398  start:
399    component.exec:
400      component: repo://demo/component
401    operation: run
402    routing: out
403"#;
404
405        let flow = parse_flow_to_ir(yaml).expect("parse flow");
406        let schema = flow.slot_schema.as_ref().expect("slot_schema present");
407        assert_eq!(schema[0]["name"], "counterparty");
408        assert_eq!(schema[0]["slot_type"], "string");
409        assert_eq!(schema[1]["name"], "due_date");
410        assert_eq!(schema[1]["slot_type"], "date");
411
412        let doc = flow.to_doc().expect("to doc");
413        assert_eq!(doc.slot_schema.as_ref(), Some(schema));
414    }
415
416    #[test]
417    fn flow_without_slot_schema_round_trips_with_none() {
418        let yaml = r#"
419id: legacy
420type: messaging
421nodes:
422  start:
423    component.exec:
424      component: repo://demo/component
425    operation: run
426    routing: out
427"#;
428
429        let flow = parse_flow_to_ir(yaml).expect("parse flow");
430        assert!(flow.slot_schema.is_none());
431        let doc = flow.to_doc().expect("to doc");
432        assert!(doc.slot_schema.is_none());
433
434        // And the serialized YAML must NOT include a slot_schema key (skip-if-none).
435        let yaml_out = serde_yaml_bw::to_string(&doc).expect("serialize");
436        assert!(
437            !yaml_out.contains("slot_schema"),
438            "absent slot_schema must not serialize: {yaml_out}"
439        );
440    }
441}