greentic_flow/
loader.rs

1use crate::{
2    error::{FlowError, FlowErrorLocation, Result, SchemaErrorDetail},
3    model::FlowDoc,
4    path_safety::normalize_under_root,
5};
6use jsonschema::Draft;
7use serde::Deserialize;
8use serde_json::Value;
9use serde_yaml_bw::Location as YamlLocation;
10use std::{fs, path::Path};
11
/// Source label used in error locations when the YAML is supplied as an in-memory string.
const INLINE_SOURCE: &str = "<inline>";
/// Label attached to the embedded schema in error locations (a raw GitHub URL to the schema file).
const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greentic-ai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
/// Schema text compiled into the crate from `../schemas/ygtc.flow.schema.json` at build time.
const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
15
16/// Load YGTC YAML from a string using the embedded schema.
17pub fn load_ygtc_from_str(yaml: &str) -> Result<FlowDoc> {
18    load_with_schema_text(
19        yaml,
20        EMBEDDED_SCHEMA,
21        DEFAULT_SCHEMA_LABEL.to_string(),
22        None,
23        INLINE_SOURCE,
24        None,
25    )
26}
27
28/// Load YGTC YAML from a file path using the embedded schema.
29pub fn load_ygtc_from_path(path: &Path) -> Result<FlowDoc> {
30    let content = fs::read_to_string(path).map_err(|e| FlowError::Internal {
31        message: format!("failed to read {}: {e}", path.display()),
32        location: FlowErrorLocation::at_path(path.display().to_string())
33            .with_source_path(Some(path)),
34    })?;
35    load_with_schema_text(
36        &content,
37        EMBEDDED_SCHEMA,
38        DEFAULT_SCHEMA_LABEL.to_string(),
39        None,
40        path.display().to_string(),
41        Some(path),
42    )
43}
44
45/// Load YGTC YAML from a string using a schema file on disk.
46pub fn load_ygtc_from_str_with_schema(yaml: &str, schema_path: &Path) -> Result<FlowDoc> {
47    load_ygtc_from_str_with_source(yaml, schema_path, INLINE_SOURCE)
48}
49
50pub fn load_ygtc_from_str_with_source(
51    yaml: &str,
52    schema_path: &Path,
53    source_label: impl Into<String>,
54) -> Result<FlowDoc> {
55    let schema_root = std::env::current_dir().map_err(|e| FlowError::Internal {
56        message: format!("resolve schema root: {e}"),
57        location: FlowErrorLocation::at_path(schema_path.display().to_string())
58            .with_source_path(Some(schema_path)),
59    })?;
60    let safe_schema_path = if schema_path.is_absolute() {
61        schema_path.to_path_buf()
62    } else {
63        normalize_under_root(&schema_root, schema_path).map_err(|e| FlowError::Internal {
64            message: format!("schema path validation for {}: {e}", schema_path.display()),
65            location: FlowErrorLocation::at_path(schema_path.display().to_string())
66                .with_source_path(Some(schema_path)),
67        })?
68    };
69    let schema_label = safe_schema_path.display().to_string();
70    let schema_text = fs::read_to_string(&safe_schema_path).map_err(|e| FlowError::Internal {
71        message: format!("schema read from {schema_label}: {e}"),
72        location: FlowErrorLocation::at_path(schema_label.clone())
73            .with_source_path(Some(&safe_schema_path)),
74    })?;
75    load_with_schema_text(
76        yaml,
77        &schema_text,
78        schema_label,
79        Some(&safe_schema_path),
80        source_label,
81        None,
82    )
83}
84
85pub(crate) fn load_with_schema_text(
86    yaml: &str,
87    schema_text: &str,
88    schema_label: impl Into<String>,
89    schema_path: Option<&Path>,
90    source_label: impl Into<String>,
91    source_path: Option<&Path>,
92) -> Result<FlowDoc> {
93    let schema_label = schema_label.into();
94    let source_label = source_label.into();
95    let mut v_yaml: serde_yaml_bw::Value =
96        serde_yaml_bw::from_str(yaml).map_err(|e| FlowError::Yaml {
97            message: e.to_string(),
98            location: yaml_error_location(&source_label, source_path, e.location()),
99        })?;
100    ensure_nodes_mapping(&mut v_yaml);
101    let v_json: Value = serde_json::to_value(&v_yaml).map_err(|e| FlowError::Internal {
102        message: format!("yaml->json: {e}"),
103        location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source_path),
104    })?;
105    let schema_version = v_json
106        .get("schema_version")
107        .and_then(Value::as_u64)
108        .unwrap_or(2);
109    let nodes_empty = v_json
110        .get("nodes")
111        .and_then(Value::as_object)
112        .map(|m| m.is_empty())
113        .unwrap_or(false);
114    let reserved_for_count = [
115        "routing",
116        "telemetry",
117        "output",
118        "retry",
119        "timeout",
120        "when",
121        "annotations",
122        "meta",
123        "operation",
124    ];
125    let looks_legacy = v_json.get("nodes").and_then(Value::as_object).map(|nodes| {
126        nodes.values().any(|n| {
127            let (op_count, has_dot_key) = n
128                .as_object()
129                .map(|obj| {
130                    let op_count = obj
131                        .keys()
132                        .filter(|k| !reserved_for_count.contains(&k.as_str()))
133                        .count();
134                    let has_dot_key = obj.keys().any(|k| k.contains('.'));
135                    (op_count, has_dot_key)
136                })
137                .unwrap_or((0, false));
138            op_count != 1
139                || has_dot_key
140                || n.get("component.exec").is_some()
141                || n.get("operation").is_some()
142                || n.get("pack_alias").is_some()
143        })
144    });
145    if v_json.get("type").is_none() {
146        return Err(FlowError::Schema {
147            message: format!("{source_label}/type: missing required property 'type'"),
148            details: vec![SchemaErrorDetail {
149                message: "Missing required property 'type'".to_string(),
150                location: FlowErrorLocation::at_path(format!("{source_label}/type"))
151                    .with_source_path(source_path)
152                    .with_json_pointer(Some("/type".to_string())),
153            }],
154            location: FlowErrorLocation::at_path(source_label.clone())
155                .with_source_path(source_path),
156        });
157    }
158
159    if let Some(nodes) = v_json.get("nodes").and_then(Value::as_object) {
160        for id in nodes.keys() {
161            let Some(node_val) = nodes.get(id) else {
162                continue;
163            };
164            let Some(obj) = node_val.as_object() else {
165                continue;
166            };
167            let op_count = obj
168                .keys()
169                .filter(|k| !reserved_for_count.contains(&k.as_str()))
170                .count();
171            let is_component_exec = obj.contains_key("component.exec");
172            let component_combo = is_component_exec && op_count == 2;
173            if op_count != 1 && !(component_combo || schema_version < 2) {
174                return Err(FlowError::NodeComponentShape {
175                    node_id: id.clone(),
176                    location: node_location(&source_label, source_path, id),
177                });
178            }
179        }
180    }
181
182    if !nodes_empty && schema_version >= 2 && !looks_legacy.unwrap_or(false) {
183        validate_json(
184            &v_json,
185            schema_text,
186            &schema_label,
187            schema_path,
188            &source_label,
189            source_path,
190        )?;
191    }
192
193    let mut flow: FlowDoc = match serde_yaml_bw::from_value(v_yaml.clone()) {
194        Ok(doc) => doc,
195        Err(e) => {
196            validate_json(
197                &v_json,
198                schema_text,
199                &schema_label,
200                schema_path,
201                &source_label,
202                source_path,
203            )?;
204            return Err(FlowError::Yaml {
205                message: e.to_string(),
206                location: yaml_error_location(&source_label, source_path, None),
207            });
208        }
209    };
210    if flow.schema_version.is_none() {
211        flow.schema_version = Some(2);
212    }
213
214    let node_ids: Vec<String> = flow.nodes.keys().cloned().collect();
215    for id in &node_ids {
216        let node = flow.nodes.get_mut(id).ok_or_else(|| FlowError::Internal {
217            message: format!("node '{id}' missing after load"),
218            location: node_location(&source_label, source_path, id),
219        })?;
220        let reserved = [
221            "routing",
222            "telemetry",
223            "output",
224            "retry",
225            "timeout",
226            "when",
227            "annotations",
228            "meta",
229            "operation",
230        ];
231        let op_count = node
232            .raw
233            .keys()
234            .filter(|k| !reserved.contains(&k.as_str()))
235            .count();
236        let is_component_exec = node.raw.contains_key("component.exec");
237        let component_combo = is_component_exec && op_count == 2;
238        if op_count != 1 && !(component_combo || flow.schema_version.unwrap_or(1) < 2) {
239            return Err(FlowError::NodeComponentShape {
240                node_id: id.clone(),
241                location: node_location(&source_label, source_path, id),
242            });
243        }
244    }
245
246    for (from_id, node) in &flow.nodes {
247        for route in parse_routes(&node.routing, from_id, &source_label, source_path)? {
248            if let Some(to) = &route.to
249                && to != "out"
250                && !flow.nodes.contains_key(to)
251            {
252                return Err(FlowError::MissingNode {
253                    target: to.clone(),
254                    node_id: from_id.clone(),
255                    location: routing_location(&source_label, source_path, from_id),
256                });
257            }
258        }
259    }
260
261    if flow.start.is_none() && flow.nodes.contains_key("in") {
262        flow.start = Some("in".to_string());
263    }
264
265    Ok(flow)
266}
267
268fn parse_routes(
269    raw: &Value,
270    node_id: &str,
271    source_label: &str,
272    source_path: Option<&Path>,
273) -> Result<Vec<RouteDoc>> {
274    if raw.is_null() {
275        return Ok(Vec::new());
276    }
277    if let Some(shorthand) = raw.as_str() {
278        return match shorthand {
279            "out" => Ok(vec![RouteDoc {
280                to: Some("out".to_string()),
281                out: Some(true),
282                status: None,
283                reply: None,
284            }]),
285            "reply" => Ok(vec![RouteDoc {
286                to: None,
287                out: None,
288                status: None,
289                reply: Some(true),
290            }]),
291            other => Err(FlowError::Routing {
292                node_id: node_id.to_string(),
293                message: format!("invalid routing shorthand '{other}'"),
294                location: routing_location(source_label, source_path, node_id),
295            }),
296        };
297    }
298    serde_json::from_value::<Vec<RouteDoc>>(raw.clone()).map_err(|e| FlowError::Routing {
299        node_id: node_id.to_string(),
300        message: e.to_string(),
301        location: routing_location(source_label, source_path, node_id),
302    })
303}
304
305#[derive(Debug, Clone, Deserialize)]
306struct RouteDoc {
307    #[serde(default)]
308    pub to: Option<String>,
309    #[allow(dead_code)]
310    #[serde(default)]
311    pub out: Option<bool>,
312    #[allow(dead_code)]
313    #[serde(default)]
314    pub status: Option<String>,
315    #[allow(dead_code)]
316    #[serde(default)]
317    pub reply: Option<bool>,
318}
319
320fn validate_json(
321    doc: &Value,
322    schema_text: &str,
323    schema_label: &str,
324    schema_path: Option<&Path>,
325    source_label: &str,
326    source_path: Option<&Path>,
327) -> Result<()> {
328    let schema: Value = serde_json::from_str(schema_text).map_err(|e| FlowError::Internal {
329        message: format!("schema parse for {schema_label}: {e}"),
330        location: FlowErrorLocation::at_path(schema_label.to_string())
331            .with_source_path(schema_path),
332    })?;
333    let validator = jsonschema::options()
334        .with_draft(Draft::Draft202012)
335        .build(&schema)
336        .map_err(|e| FlowError::Internal {
337            message: format!("schema compile for {schema_label}: {e}"),
338            location: FlowErrorLocation::at_path(schema_label.to_string())
339                .with_source_path(schema_path),
340        })?;
341    let details: Vec<SchemaErrorDetail> = validator
342        .iter_errors(doc)
343        .map(|e| {
344            let pointer = e.instance_path().to_string();
345            let pointer = if pointer.is_empty() {
346                "/".to_string()
347            } else {
348                pointer
349            };
350            SchemaErrorDetail {
351                message: e.to_string(),
352                location: FlowErrorLocation::at_path(format!("{source_label}{pointer}"))
353                    .with_source_path(source_path)
354                    .with_json_pointer(Some(pointer.clone())),
355            }
356        })
357        .collect();
358    if !details.is_empty() {
359        let message = details
360            .iter()
361            .map(|detail| {
362                let where_str = detail
363                    .location
364                    .describe()
365                    .unwrap_or_else(|| source_label.to_string());
366                format!("{where_str}: {}", detail.message)
367            })
368            .collect::<Vec<_>>()
369            .join("\n");
370        return Err(FlowError::Schema {
371            message,
372            details,
373            location: FlowErrorLocation::at_path(source_label.to_string())
374                .with_source_path(source_path),
375        });
376    }
377    Ok(())
378}
379
380fn ensure_nodes_mapping(doc: &mut serde_yaml_bw::Value) {
381    let Some(mapping) = doc.as_mapping_mut() else {
382        return;
383    };
384    let nodes_key = serde_yaml_bw::Value::String("nodes".to_string(), None);
385    match mapping.get_mut(&nodes_key) {
386        Some(existing) => {
387            if existing.is_null() {
388                *existing = serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new());
389            }
390        }
391        None => {
392            mapping.insert(
393                nodes_key,
394                serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new()),
395            );
396        }
397    }
398}
399
400fn node_location(
401    source_label: &str,
402    source_path: Option<&Path>,
403    node_id: &str,
404) -> FlowErrorLocation {
405    FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}"))
406        .with_source_path(source_path)
407}
408
409fn routing_location(
410    source_label: &str,
411    source_path: Option<&Path>,
412    node_id: &str,
413) -> FlowErrorLocation {
414    FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}.routing"))
415        .with_source_path(source_path)
416}
417
418pub(crate) fn yaml_error_location(
419    source_label: &str,
420    source_path: Option<&Path>,
421    loc: Option<YamlLocation>,
422) -> FlowErrorLocation {
423    if let Some(loc) = loc {
424        FlowErrorLocation::at_path_with_position(
425            source_label.to_string(),
426            Some(loc.line()),
427            Some(loc.column()),
428        )
429        .with_source_path(source_path)
430    } else {
431        FlowErrorLocation::at_path(source_label.to_string()).with_source_path(source_path)
432    }
433}