greentic_flow/add_step/
normalize.rs1use serde_json::Value;
2
3use crate::{
4 error::{FlowError, FlowErrorLocation, Result},
5 flow_ir::Route,
6};
7
8#[derive(Debug, Clone)]
9pub struct NormalizedNode {
10 pub operation: String,
11 pub payload: Value,
12 pub routing: Vec<Route>,
13 pub telemetry: Option<Value>,
14}
15
16pub fn normalize_node_map(value: Value) -> Result<NormalizedNode> {
17 let mut map = value
18 .as_object()
19 .cloned()
20 .ok_or_else(|| FlowError::Internal {
21 message: "node must be an object".to_string(),
22 location: FlowErrorLocation::at_path("node".to_string()),
23 })?;
24
25 if map.contains_key("tool") {
26 return Err(FlowError::Internal {
27 message: "Legacy tool emission is not supported. Update greentic-component to emit component.exec nodes without tool."
28 .to_string(),
29 location: FlowErrorLocation::at_path("node.tool".to_string()),
30 });
31 }
32
33 let mut op_key: Option<String> = None;
34 let mut op_value: Option<Value> = None;
35 let mut routing: Option<Value> = None;
36 let mut telemetry: Option<Value> = None;
37
38 for (key, val) in map.clone() {
39 match key.as_str() {
40 "routing" => {
41 routing = Some(val.clone());
42 map.remove(&key);
43 }
44 "telemetry" => {
45 telemetry = Some(val.clone());
46 map.remove(&key);
47 }
48 _ => {}
49 }
50 }
51
52 if map.contains_key("component.exec") {
54 let mut op = value
55 .get("operation")
56 .and_then(Value::as_str)
57 .unwrap_or("")
58 .to_string();
59 if op.trim().is_empty()
60 && let Some(payload_op) = map
61 .get("component.exec")
62 .and_then(|v| v.get("operation"))
63 .and_then(Value::as_str)
64 {
65 op = payload_op.to_string();
66 }
67 if op.trim().is_empty() {
68 return Err(FlowError::Internal {
69 message: "component.exec requires a non-empty operation".to_string(),
70 location: FlowErrorLocation::at_path("node.operation".to_string()),
71 });
72 }
73 let payload = map
74 .remove("component.exec")
75 .unwrap_or(Value::Object(Default::default()));
76 let payload = if let Some(obj) = payload.as_object()
77 && obj.contains_key("operation")
78 {
79 let mut obj = obj.clone();
80 obj.remove("operation");
81 Value::Object(obj)
82 } else {
83 payload
84 };
85 let routes = parse_routes(routing.unwrap_or(Value::Array(Vec::new())))?;
86 return Ok(NormalizedNode {
87 operation: op,
88 payload,
89 routing: routes,
90 telemetry,
91 });
92 }
93
94 for (key, val) in map {
95 if op_key.is_some() {
96 return Err(FlowError::Internal {
97 message: "node must have exactly one operation key".to_string(),
98 location: FlowErrorLocation::at_path("node".to_string()),
99 });
100 }
101 op_key = Some(key);
102 op_value = Some(val);
103 }
104
105 if op_key.is_none() && value.get("component.exec").is_some() {
107 let op = value
108 .get("operation")
109 .and_then(Value::as_str)
110 .unwrap_or("")
111 .to_string();
112 if op.trim().is_empty() {
113 return Err(FlowError::Internal {
114 message: "component.exec requires a non-empty operation".to_string(),
115 location: FlowErrorLocation::at_path("node.operation".to_string()),
116 });
117 }
118 let payload = value
119 .get("component.exec")
120 .cloned()
121 .unwrap_or(Value::Object(Default::default()));
122 op_key = Some(op);
123 op_value = Some(payload);
124 }
125
126 let operation = op_key.ok_or_else(|| FlowError::Internal {
127 message: "node must contain exactly one operation key".to_string(),
128 location: FlowErrorLocation::at_path("node".to_string()),
129 })?;
130
131 let payload = op_value.unwrap_or(Value::Object(Default::default()));
132
133 let routes = parse_routes(routing.unwrap_or(Value::Array(Vec::new())))?;
134
135 Ok(NormalizedNode {
136 operation,
137 payload,
138 routing: routes,
139 telemetry,
140 })
141}
142
143fn parse_routes(raw: Value) -> Result<Vec<Route>> {
144 if raw.is_null() {
145 return Ok(Vec::new());
146 }
147
148 if let Some(shorthand) = raw.as_str() {
149 return match shorthand {
150 "out" => Ok(vec![Route {
151 out: true,
152 ..Route::default()
153 }]),
154 "reply" => Ok(vec![Route {
155 reply: true,
156 ..Route::default()
157 }]),
158 other => Err(FlowError::Internal {
159 message: format!("unsupported routing shorthand '{other}'"),
160 location: FlowErrorLocation::at_path("routing".to_string()),
161 }),
162 };
163 }
164
165 let arr = raw.as_array().ok_or_else(|| FlowError::Internal {
166 message: "routing must be an array".to_string(),
167 location: FlowErrorLocation::at_path("routing".to_string()),
168 })?;
169
170 let mut routes = Vec::new();
171 for entry in arr {
172 let obj = entry.as_object().ok_or_else(|| FlowError::Internal {
173 message: "routing entries must be objects".to_string(),
174 location: FlowErrorLocation::at_path("routing".to_string()),
175 })?;
176 for key in obj.keys() {
177 match key.as_str() {
178 "to" | "out" | "status" | "reply" => {}
179 other => {
180 return Err(FlowError::Internal {
181 message: format!("unsupported routing key '{other}'"),
182 location: FlowErrorLocation::at_path("routing".to_string()),
183 });
184 }
185 }
186 }
187 routes.push(Route {
188 to: obj.get("to").and_then(Value::as_str).map(|s| s.to_string()),
189 out: obj.get("out").and_then(Value::as_bool).unwrap_or(false),
190 status: obj
191 .get("status")
192 .and_then(Value::as_str)
193 .map(|s| s.to_string()),
194 reply: obj.get("reply").and_then(Value::as_bool).unwrap_or(false),
195 });
196 }
197
198 Ok(routes)
199}