Skip to main content

greentic_flow/
flow_ir.rs

1use indexmap::IndexMap;
2use serde::{Deserialize, Serialize};
3use serde_json::{Map, Value};
4
5use crate::{
6    error::{FlowError, FlowErrorLocation, Result},
7    loader::load_ygtc_from_str,
8    model::{FlowDoc, NodeDoc},
9};
10
11/// Typed intermediate representation for flows, suitable for planning edits before
12/// rendering back into YGTC YAML.
13#[derive(Debug, Clone)]
14pub struct FlowIr {
15    pub id: String,
16    pub kind: String,
17    pub schema_version: Option<u32>,
18    pub entrypoints: IndexMap<String, String>,
19    pub nodes: IndexMap<String, NodeIr>,
20}
21
22#[derive(Debug, Clone)]
23pub struct NodeIr {
24    pub id: String,
25    pub operation: String,
26    pub payload: Value,
27    pub output: Value,
28    pub routing: Vec<Route>,
29    pub telemetry: Option<Value>,
30}
31
32#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
33pub struct Route {
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub to: Option<String>,
36    #[serde(default, skip_serializing_if = "is_false")]
37    pub out: bool,
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub status: Option<String>,
40    #[serde(default, skip_serializing_if = "is_false")]
41    pub reply: bool,
42}
43
/// Returns `true` when the flag is `false`.
///
/// Used as a serde `skip_serializing_if` predicate, which is why it takes the
/// flag by reference rather than by value.
fn is_false(value: &bool) -> bool {
    !value
}
47
48impl FlowIr {
49    pub fn from_doc(doc: FlowDoc) -> Result<Self> {
50        let schema_version = doc.schema_version;
51        let entrypoints = resolve_entrypoints(&doc);
52        let mut nodes = IndexMap::new();
53        for (id, node_doc) in doc.nodes {
54            let (operation, payload) = extract_operation(&node_doc, &id)?;
55            let routing = parse_routing(&node_doc, &id)?;
56            let output = node_doc
57                .raw
58                .get("output")
59                .cloned()
60                .unwrap_or_else(|| Value::Object(Map::new()));
61            nodes.insert(
62                id.clone(),
63                NodeIr {
64                    id: id.clone(),
65                    operation,
66                    payload,
67                    output,
68                    routing,
69                    telemetry: node_doc
70                        .telemetry
71                        .clone()
72                        .and_then(|t| serde_json::to_value(t).ok()),
73                },
74            );
75        }
76
77        Ok(FlowIr {
78            id: doc.id,
79            kind: doc.flow_type,
80            schema_version,
81            entrypoints,
82            nodes,
83        })
84    }
85
86    pub fn to_doc(&self) -> Result<FlowDoc> {
87        let mut nodes: IndexMap<String, NodeDoc> = IndexMap::new();
88        for (id, node_ir) in &self.nodes {
89            let mut raw = IndexMap::new();
90            raw.insert(node_ir.operation.clone(), node_ir.payload.clone());
91            if !node_ir.output.is_object()
92                || !node_ir
93                    .output
94                    .as_object()
95                    .map(|m| m.is_empty())
96                    .unwrap_or(false)
97            {
98                raw.insert("output".to_string(), node_ir.output.clone());
99            }
100            let routing_value =
101                serde_json::to_value(&node_ir.routing).map_err(|e| FlowError::Internal {
102                    message: format!("serialize routing for node '{id}': {e}"),
103                    location: FlowErrorLocation::at_path(format!("nodes.{id}.routing")),
104                })?;
105            let routing_yaml = if node_ir.routing.len() == 1
106                && node_ir.routing[0].out
107                && node_ir.routing[0].to.is_none()
108                && !node_ir.routing[0].reply
109                && node_ir.routing[0].status.is_none()
110            {
111                Value::String("out".to_string())
112            } else if node_ir.routing.len() == 1
113                && node_ir.routing[0].reply
114                && node_ir.routing[0].to.is_none()
115                && !node_ir.routing[0].out
116                && node_ir.routing[0].status.is_none()
117            {
118                Value::String("reply".to_string())
119            } else {
120                routing_value
121            };
122            nodes.insert(
123                id.clone(),
124                NodeDoc {
125                    routing: routing_yaml,
126                    telemetry: node_ir
127                        .telemetry
128                        .as_ref()
129                        .and_then(|t| serde_json::from_value(t.clone()).ok()),
130                    operation: Some(node_ir.operation.clone()),
131                    payload: node_ir.payload.clone(),
132                    raw,
133                },
134            );
135        }
136
137        Ok(FlowDoc {
138            id: self.id.clone(),
139            title: None,
140            description: None,
141            flow_type: self.kind.clone(),
142            start: self.entrypoints.get("default").cloned(),
143            parameters: Value::Object(Map::new()),
144            tags: Vec::new(),
145            schema_version: self.schema_version,
146            entrypoints: IndexMap::new(),
147            nodes,
148        })
149    }
150}
151
152fn resolve_entrypoints(doc: &FlowDoc) -> IndexMap<String, String> {
153    let mut entries = IndexMap::new();
154    if let Some(start) = &doc.start {
155        entries.insert("default".to_string(), start.clone());
156    } else if doc.nodes.contains_key("in") {
157        entries.insert("default".to_string(), "in".to_string());
158    } else if let Some(first) = doc.nodes.keys().next() {
159        entries.insert("default".to_string(), first.clone());
160    }
161    for (k, v) in &doc.entrypoints {
162        if let Some(target) = v.as_str() {
163            entries.insert(k.clone(), target.to_string());
164        }
165    }
166    entries
167}
168
169fn parse_routing(node: &NodeDoc, node_id: &str) -> Result<Vec<Route>> {
170    if node.routing.is_null() {
171        return Ok(Vec::new());
172    }
173    if let Some(s) = node.routing.as_str() {
174        return match s {
175            "out" => Ok(vec![Route {
176                out: true,
177                ..Route::default()
178            }]),
179            "reply" => Ok(vec![Route {
180                reply: true,
181                ..Route::default()
182            }]),
183            other => Err(FlowError::Routing {
184                node_id: node_id.to_string(),
185                message: format!("unsupported routing shorthand '{other}'"),
186                location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
187            }),
188        };
189    }
190    #[derive(serde::Deserialize)]
191    struct RouteDoc {
192        #[serde(default)]
193        to: Option<String>,
194        #[serde(default)]
195        out: Option<bool>,
196        #[serde(default)]
197        status: Option<String>,
198        #[serde(default)]
199        reply: Option<bool>,
200    }
201
202    let routes: Vec<RouteDoc> =
203        serde_json::from_value(node.routing.clone()).map_err(|e| FlowError::Internal {
204            message: format!("routing decode for node '{node_id}': {e}"),
205            location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
206        })?;
207
208    Ok(routes
209        .into_iter()
210        .map(|r| Route {
211            to: r.to,
212            out: r.out.unwrap_or(false),
213            status: r.status,
214            reply: r.reply.unwrap_or(false),
215        })
216        .collect())
217}
218
219/// Helper for tests: load YAML text straight into Flow IR.
220pub fn parse_flow_to_ir(yaml: &str) -> Result<FlowIr> {
221    let doc = load_ygtc_from_str(yaml)?;
222    FlowIr::from_doc(doc)
223}
224
225fn extract_operation(node: &NodeDoc, node_id: &str) -> Result<(String, Value)> {
226    let reserved = [
227        "routing",
228        "telemetry",
229        "output",
230        "retry",
231        "timeout",
232        "when",
233        "annotations",
234        "meta",
235    ];
236    if let Some(exec) = node.raw.get("component.exec") {
237        let op = node
238            .raw
239            .get("operation")
240            .and_then(Value::as_str)
241            .or(node.operation.as_deref())
242            .unwrap_or("");
243        if op.trim().is_empty() {
244            return Err(FlowError::Internal {
245                message: format!("node '{node_id}' missing operation key"),
246                location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
247            });
248        }
249        return Ok((op.to_string(), exec.clone()));
250    }
251    let mut op_key: Option<String> = None;
252    let mut payload: Option<Value> = None;
253    for (k, v) in &node.raw {
254        if reserved.contains(&k.as_str()) {
255            continue;
256        }
257        if op_key.is_some() {
258            return Err(FlowError::Internal {
259                message: format!(
260                    "node '{node_id}' must have exactly one operation key, found multiple"
261                ),
262                location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
263            });
264        }
265        op_key = Some(k.clone());
266        payload = Some(v.clone());
267    }
268    if let (Some(k), Some(v)) = (op_key, payload) {
269        return Ok((k, v));
270    }
271
272    if let Some(op) = &node.operation {
273        return Ok((op.clone(), node.payload.clone()));
274    }
275
276    Err(FlowError::Internal {
277        message: format!("node '{node_id}' missing operation key"),
278        location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
279    })
280}