Skip to main content

greentic_flow/add_step/
normalize.rs

1use serde_json::Value;
2
3use crate::{
4    error::{FlowError, FlowErrorLocation, Result},
5    flow_ir::Route,
6};
7
8#[derive(Debug, Clone)]
9pub struct NormalizedNode {
10    pub operation: String,
11    pub payload: Value,
12    pub routing: Vec<Route>,
13    pub telemetry: Option<Value>,
14}
15
16pub fn normalize_node_map(value: Value) -> Result<NormalizedNode> {
17    let mut map = value
18        .as_object()
19        .cloned()
20        .ok_or_else(|| FlowError::Internal {
21            message: "node must be an object".to_string(),
22            location: FlowErrorLocation::at_path("node".to_string()),
23        })?;
24
25    if map.contains_key("tool") {
26        return Err(FlowError::Internal {
27            message: "Legacy tool emission is not supported. Update greentic-component to emit component.exec nodes without tool."
28                .to_string(),
29            location: FlowErrorLocation::at_path("node.tool".to_string()),
30        });
31    }
32
33    let mut op_key: Option<String> = None;
34    let mut op_value: Option<Value> = None;
35    let mut routing: Option<Value> = None;
36    let mut telemetry: Option<Value> = None;
37
38    for (key, val) in map.clone() {
39        match key.as_str() {
40            "routing" => {
41                routing = Some(val.clone());
42                map.remove(&key);
43            }
44            "telemetry" => {
45                telemetry = Some(val.clone());
46                map.remove(&key);
47            }
48            _ => {}
49        }
50    }
51
52    // Legacy path: component.exec + operation
53    if map.contains_key("component.exec") {
54        let mut op = value
55            .get("operation")
56            .and_then(Value::as_str)
57            .unwrap_or("")
58            .to_string();
59        if op.trim().is_empty()
60            && let Some(payload_op) = map
61                .get("component.exec")
62                .and_then(|v| v.get("operation"))
63                .and_then(Value::as_str)
64        {
65            op = payload_op.to_string();
66        }
67        if op.trim().is_empty() {
68            return Err(FlowError::Internal {
69                message: "component.exec requires a non-empty operation".to_string(),
70                location: FlowErrorLocation::at_path("node.operation".to_string()),
71            });
72        }
73        let payload = map
74            .remove("component.exec")
75            .unwrap_or(Value::Object(Default::default()));
76        let payload = if let Some(obj) = payload.as_object()
77            && obj.contains_key("operation")
78        {
79            let mut obj = obj.clone();
80            obj.remove("operation");
81            Value::Object(obj)
82        } else {
83            payload
84        };
85        let routes = parse_routes(routing.unwrap_or(Value::Array(Vec::new())))?;
86        return Ok(NormalizedNode {
87            operation: op,
88            payload,
89            routing: routes,
90            telemetry,
91        });
92    }
93
94    for (key, val) in map {
95        if op_key.is_some() {
96            return Err(FlowError::Internal {
97                message: "node must have exactly one operation key".to_string(),
98                location: FlowErrorLocation::at_path("node".to_string()),
99            });
100        }
101        op_key = Some(key);
102        op_value = Some(val);
103    }
104
105    // Legacy path: component.exec + operation
106    if op_key.is_none() && value.get("component.exec").is_some() {
107        let op = value
108            .get("operation")
109            .and_then(Value::as_str)
110            .unwrap_or("")
111            .to_string();
112        if op.trim().is_empty() {
113            return Err(FlowError::Internal {
114                message: "component.exec requires a non-empty operation".to_string(),
115                location: FlowErrorLocation::at_path("node.operation".to_string()),
116            });
117        }
118        let payload = value
119            .get("component.exec")
120            .cloned()
121            .unwrap_or(Value::Object(Default::default()));
122        op_key = Some(op);
123        op_value = Some(payload);
124    }
125
126    let operation = op_key.ok_or_else(|| FlowError::Internal {
127        message: "node must contain exactly one operation key".to_string(),
128        location: FlowErrorLocation::at_path("node".to_string()),
129    })?;
130
131    let payload = op_value.unwrap_or(Value::Object(Default::default()));
132
133    let routes = parse_routes(routing.unwrap_or(Value::Array(Vec::new())))?;
134
135    Ok(NormalizedNode {
136        operation,
137        payload,
138        routing: routes,
139        telemetry,
140    })
141}
142
143fn parse_routes(raw: Value) -> Result<Vec<Route>> {
144    if raw.is_null() {
145        return Ok(Vec::new());
146    }
147
148    if let Some(shorthand) = raw.as_str() {
149        return match shorthand {
150            "out" => Ok(vec![Route {
151                out: true,
152                ..Route::default()
153            }]),
154            "reply" => Ok(vec![Route {
155                reply: true,
156                ..Route::default()
157            }]),
158            other => Err(FlowError::Internal {
159                message: format!("unsupported routing shorthand '{other}'"),
160                location: FlowErrorLocation::at_path("routing".to_string()),
161            }),
162        };
163    }
164
165    let arr = raw.as_array().ok_or_else(|| FlowError::Internal {
166        message: "routing must be an array".to_string(),
167        location: FlowErrorLocation::at_path("routing".to_string()),
168    })?;
169
170    let mut routes = Vec::new();
171    for entry in arr {
172        let obj = entry.as_object().ok_or_else(|| FlowError::Internal {
173            message: "routing entries must be objects".to_string(),
174            location: FlowErrorLocation::at_path("routing".to_string()),
175        })?;
176        for key in obj.keys() {
177            match key.as_str() {
178                "to" | "out" | "status" | "reply" => {}
179                other => {
180                    return Err(FlowError::Internal {
181                        message: format!("unsupported routing key '{other}'"),
182                        location: FlowErrorLocation::at_path("routing".to_string()),
183                    });
184                }
185            }
186        }
187        routes.push(Route {
188            to: obj.get("to").and_then(Value::as_str).map(|s| s.to_string()),
189            out: obj.get("out").and_then(Value::as_bool).unwrap_or(false),
190            status: obj
191                .get("status")
192                .and_then(Value::as_str)
193                .map(|s| s.to_string()),
194            reply: obj.get("reply").and_then(Value::as_bool).unwrap_or(false),
195        });
196    }
197
198    Ok(routes)
199}