1use crate::{
2 error::{FlowError, FlowErrorLocation, Result, SchemaErrorDetail},
3 model::FlowDoc,
4 path_safety::normalize_under_root,
5};
6use jsonschema::Draft;
7use serde::Deserialize;
8use serde_json::Value;
9use serde_yaml_bw::Location as YamlLocation;
10use std::{fs, path::Path};
11
/// Source label used when the YAML comes from an in-memory string rather than a file.
const INLINE_SOURCE: &str = "<inline>";
/// Label reported for the built-in schema in schema-related errors.
const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greentic-ai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
/// Text of the flow JSON schema, included at compile time from `../schemas/ygtc.flow.schema.json`.
const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
15
/// Loads a flow document from YAML text, validating it against the embedded schema.
///
/// The document is reported under the `<inline>` source label and no source or
/// schema file path is attached to error locations.
///
/// # Errors
///
/// Returns whatever error `load_with_schema_text` produces for the given YAML.
pub fn load_ygtc_from_str(yaml: &str) -> Result<FlowDoc> {
    load_with_schema_text(
        yaml,
        EMBEDDED_SCHEMA,
        DEFAULT_SCHEMA_LABEL.to_string(),
        None,
        INLINE_SOURCE,
        None,
    )
}
27
28pub fn load_ygtc_from_path(path: &Path) -> Result<FlowDoc> {
30 let content = fs::read_to_string(path).map_err(|e| FlowError::Internal {
31 message: format!("failed to read {}: {e}", path.display()),
32 location: FlowErrorLocation::at_path(path.display().to_string())
33 .with_source_path(Some(path)),
34 })?;
35 load_with_schema_text(
36 &content,
37 EMBEDDED_SCHEMA,
38 DEFAULT_SCHEMA_LABEL.to_string(),
39 None,
40 path.display().to_string(),
41 Some(path),
42 )
43}
44
/// Loads a flow document from YAML text, validating it against the schema file at
/// `schema_path` instead of the embedded schema.
///
/// Delegates to [`load_ygtc_from_str_with_source`] with the `<inline>` source label.
///
/// # Errors
///
/// Returns whatever error [`load_ygtc_from_str_with_source`] produces.
pub fn load_ygtc_from_str_with_schema(yaml: &str, schema_path: &Path) -> Result<FlowDoc> {
    load_ygtc_from_str_with_source(yaml, schema_path, INLINE_SOURCE)
}
49
50pub fn load_ygtc_from_str_with_source(
51 yaml: &str,
52 schema_path: &Path,
53 source_label: impl Into<String>,
54) -> Result<FlowDoc> {
55 let schema_root = std::env::current_dir().map_err(|e| FlowError::Internal {
56 message: format!("resolve schema root: {e}"),
57 location: FlowErrorLocation::at_path(schema_path.display().to_string())
58 .with_source_path(Some(schema_path)),
59 })?;
60 let safe_schema_path = if schema_path.is_absolute() {
61 schema_path.to_path_buf()
62 } else {
63 normalize_under_root(&schema_root, schema_path).map_err(|e| FlowError::Internal {
64 message: format!("schema path validation for {}: {e}", schema_path.display()),
65 location: FlowErrorLocation::at_path(schema_path.display().to_string())
66 .with_source_path(Some(schema_path)),
67 })?
68 };
69 let schema_label = safe_schema_path.display().to_string();
70 let schema_text = fs::read_to_string(&safe_schema_path).map_err(|e| FlowError::Internal {
71 message: format!("schema read from {schema_label}: {e}"),
72 location: FlowErrorLocation::at_path(schema_label.clone())
73 .with_source_path(Some(&safe_schema_path)),
74 })?;
75 load_with_schema_text(
76 yaml,
77 &schema_text,
78 schema_label,
79 Some(&safe_schema_path),
80 source_label,
81 None,
82 )
83}
84
85pub(crate) fn load_with_schema_text(
86 yaml: &str,
87 schema_text: &str,
88 schema_label: impl Into<String>,
89 schema_path: Option<&Path>,
90 source_label: impl Into<String>,
91 source_path: Option<&Path>,
92) -> Result<FlowDoc> {
93 let schema_label = schema_label.into();
94 let source_label = source_label.into();
95 let mut v_yaml: serde_yaml_bw::Value =
96 serde_yaml_bw::from_str(yaml).map_err(|e| FlowError::Yaml {
97 message: e.to_string(),
98 location: yaml_error_location(&source_label, source_path, e.location()),
99 })?;
100 ensure_nodes_mapping(&mut v_yaml);
101 let v_json: Value = serde_json::to_value(&v_yaml).map_err(|e| FlowError::Internal {
102 message: format!("yaml->json: {e}"),
103 location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source_path),
104 })?;
105 let schema_version = v_json
106 .get("schema_version")
107 .and_then(Value::as_u64)
108 .unwrap_or(2);
109 let nodes_empty = v_json
110 .get("nodes")
111 .and_then(Value::as_object)
112 .map(|m| m.is_empty())
113 .unwrap_or(false);
114 let reserved_for_count = [
115 "routing",
116 "telemetry",
117 "output",
118 "retry",
119 "timeout",
120 "when",
121 "annotations",
122 "meta",
123 "operation",
124 ];
125 let looks_legacy = v_json.get("nodes").and_then(Value::as_object).map(|nodes| {
126 nodes.values().any(|n| {
127 let (op_count, has_dot_key) = n
128 .as_object()
129 .map(|obj| {
130 let op_count = obj
131 .keys()
132 .filter(|k| !reserved_for_count.contains(&k.as_str()))
133 .count();
134 let has_dot_key = obj.keys().any(|k| k.contains('.'));
135 (op_count, has_dot_key)
136 })
137 .unwrap_or((0, false));
138 op_count != 1
139 || has_dot_key
140 || n.get("component.exec").is_some()
141 || n.get("operation").is_some()
142 || n.get("pack_alias").is_some()
143 })
144 });
145 if v_json.get("type").is_none() {
146 return Err(FlowError::Schema {
147 message: format!("{source_label}/type: missing required property 'type'"),
148 details: vec![SchemaErrorDetail {
149 message: "Missing required property 'type'".to_string(),
150 location: FlowErrorLocation::at_path(format!("{source_label}/type"))
151 .with_source_path(source_path)
152 .with_json_pointer(Some("/type".to_string())),
153 }],
154 location: FlowErrorLocation::at_path(source_label.clone())
155 .with_source_path(source_path),
156 });
157 }
158
159 if let Some(nodes) = v_json.get("nodes").and_then(Value::as_object) {
160 for id in nodes.keys() {
161 let Some(node_val) = nodes.get(id) else {
162 continue;
163 };
164 let Some(obj) = node_val.as_object() else {
165 continue;
166 };
167 let op_count = obj
168 .keys()
169 .filter(|k| !reserved_for_count.contains(&k.as_str()))
170 .count();
171 let is_component_exec = obj.contains_key("component.exec");
172 let component_combo = is_component_exec && op_count == 2;
173 if op_count != 1 && !(component_combo || schema_version < 2) {
174 return Err(FlowError::NodeComponentShape {
175 node_id: id.clone(),
176 location: node_location(&source_label, source_path, id),
177 });
178 }
179 }
180 }
181
182 if !nodes_empty && schema_version >= 2 && !looks_legacy.unwrap_or(false) {
183 validate_json(
184 &v_json,
185 schema_text,
186 &schema_label,
187 schema_path,
188 &source_label,
189 source_path,
190 )?;
191 }
192
193 let mut flow: FlowDoc = match serde_yaml_bw::from_value(v_yaml.clone()) {
194 Ok(doc) => doc,
195 Err(e) => {
196 validate_json(
197 &v_json,
198 schema_text,
199 &schema_label,
200 schema_path,
201 &source_label,
202 source_path,
203 )?;
204 return Err(FlowError::Yaml {
205 message: e.to_string(),
206 location: yaml_error_location(&source_label, source_path, None),
207 });
208 }
209 };
210 if flow.schema_version.is_none() {
211 flow.schema_version = Some(2);
212 }
213
214 let node_ids: Vec<String> = flow.nodes.keys().cloned().collect();
215 for id in &node_ids {
216 let node = flow.nodes.get_mut(id).ok_or_else(|| FlowError::Internal {
217 message: format!("node '{id}' missing after load"),
218 location: node_location(&source_label, source_path, id),
219 })?;
220 let reserved = [
221 "routing",
222 "telemetry",
223 "output",
224 "retry",
225 "timeout",
226 "when",
227 "annotations",
228 "meta",
229 "operation",
230 ];
231 let op_count = node
232 .raw
233 .keys()
234 .filter(|k| !reserved.contains(&k.as_str()))
235 .count();
236 let is_component_exec = node.raw.contains_key("component.exec");
237 let component_combo = is_component_exec && op_count == 2;
238 if op_count != 1 && !(component_combo || flow.schema_version.unwrap_or(1) < 2) {
239 return Err(FlowError::NodeComponentShape {
240 node_id: id.clone(),
241 location: node_location(&source_label, source_path, id),
242 });
243 }
244 }
245
246 for (from_id, node) in &flow.nodes {
247 for route in parse_routes(&node.routing, from_id, &source_label, source_path)? {
248 if let Some(to) = &route.to
249 && to != "out"
250 && !flow.nodes.contains_key(to)
251 {
252 return Err(FlowError::MissingNode {
253 target: to.clone(),
254 node_id: from_id.clone(),
255 location: routing_location(&source_label, source_path, from_id),
256 });
257 }
258 }
259 }
260
261 if flow.start.is_none() && flow.nodes.contains_key("in") {
262 flow.start = Some("in".to_string());
263 }
264
265 Ok(flow)
266}
267
268fn parse_routes(
269 raw: &Value,
270 node_id: &str,
271 source_label: &str,
272 source_path: Option<&Path>,
273) -> Result<Vec<RouteDoc>> {
274 if raw.is_null() {
275 return Ok(Vec::new());
276 }
277 if let Some(shorthand) = raw.as_str() {
278 return match shorthand {
279 "out" => Ok(vec![RouteDoc {
280 to: Some("out".to_string()),
281 out: Some(true),
282 status: None,
283 reply: None,
284 }]),
285 "reply" => Ok(vec![RouteDoc {
286 to: None,
287 out: None,
288 status: None,
289 reply: Some(true),
290 }]),
291 other => Err(FlowError::Routing {
292 node_id: node_id.to_string(),
293 message: format!("invalid routing shorthand '{other}'"),
294 location: routing_location(source_label, source_path, node_id),
295 }),
296 };
297 }
298 serde_json::from_value::<Vec<RouteDoc>>(raw.clone()).map_err(|e| FlowError::Routing {
299 node_id: node_id.to_string(),
300 message: e.to_string(),
301 location: routing_location(source_label, source_path, node_id),
302 })
303}
304
/// One routing entry of a node, as deserialized from the node's `routing` value.
///
/// Every field is optional and defaults to `None` when absent from the input.
#[derive(Debug, Clone, Deserialize)]
struct RouteDoc {
    /// Id of the node this route points to; `"out"` is accepted without a matching node.
    #[serde(default)]
    pub to: Option<String>,
    /// Set to `true` by the `"out"` routing shorthand.
    // Deserialized but not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    #[serde(default)]
    pub out: Option<bool>,
    /// Optional `status` value of the route.
    // Deserialized but not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    #[serde(default)]
    pub status: Option<String>,
    /// Set to `true` by the `"reply"` routing shorthand.
    // Deserialized but not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    #[serde(default)]
    pub reply: Option<bool>,
}
319
320fn validate_json(
321 doc: &Value,
322 schema_text: &str,
323 schema_label: &str,
324 schema_path: Option<&Path>,
325 source_label: &str,
326 source_path: Option<&Path>,
327) -> Result<()> {
328 let schema: Value = serde_json::from_str(schema_text).map_err(|e| FlowError::Internal {
329 message: format!("schema parse for {schema_label}: {e}"),
330 location: FlowErrorLocation::at_path(schema_label.to_string())
331 .with_source_path(schema_path),
332 })?;
333 let validator = jsonschema::options()
334 .with_draft(Draft::Draft202012)
335 .build(&schema)
336 .map_err(|e| FlowError::Internal {
337 message: format!("schema compile for {schema_label}: {e}"),
338 location: FlowErrorLocation::at_path(schema_label.to_string())
339 .with_source_path(schema_path),
340 })?;
341 let details: Vec<SchemaErrorDetail> = validator
342 .iter_errors(doc)
343 .map(|e| {
344 let pointer = e.instance_path().to_string();
345 let pointer = if pointer.is_empty() {
346 "/".to_string()
347 } else {
348 pointer
349 };
350 SchemaErrorDetail {
351 message: e.to_string(),
352 location: FlowErrorLocation::at_path(format!("{source_label}{pointer}"))
353 .with_source_path(source_path)
354 .with_json_pointer(Some(pointer.clone())),
355 }
356 })
357 .collect();
358 if !details.is_empty() {
359 let message = details
360 .iter()
361 .map(|detail| {
362 let where_str = detail
363 .location
364 .describe()
365 .unwrap_or_else(|| source_label.to_string());
366 format!("{where_str}: {}", detail.message)
367 })
368 .collect::<Vec<_>>()
369 .join("\n");
370 return Err(FlowError::Schema {
371 message,
372 details,
373 location: FlowErrorLocation::at_path(source_label.to_string())
374 .with_source_path(source_path),
375 });
376 }
377 Ok(())
378}
379
380fn ensure_nodes_mapping(doc: &mut serde_yaml_bw::Value) {
381 let Some(mapping) = doc.as_mapping_mut() else {
382 return;
383 };
384 let nodes_key = serde_yaml_bw::Value::String("nodes".to_string(), None);
385 match mapping.get_mut(&nodes_key) {
386 Some(existing) => {
387 if existing.is_null() {
388 *existing = serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new());
389 }
390 }
391 None => {
392 mapping.insert(
393 nodes_key,
394 serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new()),
395 );
396 }
397 }
398}
399
400fn node_location(
401 source_label: &str,
402 source_path: Option<&Path>,
403 node_id: &str,
404) -> FlowErrorLocation {
405 FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}"))
406 .with_source_path(source_path)
407}
408
409fn routing_location(
410 source_label: &str,
411 source_path: Option<&Path>,
412 node_id: &str,
413) -> FlowErrorLocation {
414 FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}.routing"))
415 .with_source_path(source_path)
416}
417
418pub(crate) fn yaml_error_location(
419 source_label: &str,
420 source_path: Option<&Path>,
421 loc: Option<YamlLocation>,
422) -> FlowErrorLocation {
423 if let Some(loc) = loc {
424 FlowErrorLocation::at_path_with_position(
425 source_label.to_string(),
426 Some(loc.line()),
427 Some(loc.column()),
428 )
429 .with_source_path(source_path)
430 } else {
431 FlowErrorLocation::at_path(source_label.to_string()).with_source_path(source_path)
432 }
433}