Skip to main content

greentic_flow/
loader.rs

1use crate::{
2    component_schema::jsonschema_options_with_base,
3    error::{FlowError, FlowErrorLocation, Result, SchemaErrorDetail},
4    model::FlowDoc,
5    path_safety::normalize_under_root,
6};
7use serde::Deserialize;
8use serde_json::Value as JsonValue;
9use serde_json::Value;
10use serde_yaml_bw::Location as YamlLocation;
11use std::{
12    fs, io,
13    path::{Path, PathBuf},
14    sync::OnceLock,
15};
16
/// Source label reported in error locations when the YAML comes from an in-memory string
/// rather than a file.
const INLINE_SOURCE: &str = "<inline>";
/// Label used to identify the embedded schema in error messages and locations.
const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greenticai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
/// Text of the flow JSON schema, baked into the binary at compile time.
const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
20fn schema_file_valid(path: &Path) -> bool {
21    let Ok(text) = fs::read_to_string(path) else {
22        return false;
23    };
24    if text.trim().is_empty() {
25        return false;
26    }
27    serde_json::from_str::<JsonValue>(&text).is_ok()
28}
29
30fn write_schema_atomically(path: &Path) -> io::Result<()> {
31    let parent = path
32        .parent()
33        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "schema path has no parent"))?;
34    let unique = format!(
35        "greentic-flow-config-schema-{}.tmp-{}",
36        std::process::id(),
37        std::time::SystemTime::now()
38            .duration_since(std::time::UNIX_EPOCH)
39            .map(|d| d.as_nanos())
40            .unwrap_or(0)
41    );
42    let tmp = parent.join(unique);
43    fs::write(&tmp, EMBEDDED_SCHEMA)?;
44    match fs::rename(&tmp, path) {
45        Ok(()) => Ok(()),
46        Err(err) => {
47            let _ = fs::remove_file(&tmp);
48            if schema_file_valid(path) {
49                Ok(())
50            } else {
51                Err(err)
52            }
53        }
54    }
55}
56
57/// Ensure a temporary copy of the embedded flow schema exists and return its path.
58pub fn ensure_config_schema_path() -> io::Result<PathBuf> {
59    static CONFIG_SCHEMA_PATH: OnceLock<PathBuf> = OnceLock::new();
60    if let Some(path) = CONFIG_SCHEMA_PATH.get() {
61        if !schema_file_valid(path) {
62            write_schema_atomically(path)?;
63        }
64        return Ok(path.clone());
65    }
66    let mut path = std::env::temp_dir();
67    path.push(format!(
68        "greentic-flow-config-schema-{}.json",
69        env!("CARGO_PKG_VERSION")
70    ));
71    if let Some(parent) = path.parent() {
72        fs::create_dir_all(parent)?;
73    }
74    if !schema_file_valid(&path) {
75        write_schema_atomically(&path)?;
76    }
77    if !schema_file_valid(&path) {
78        return Err(io::Error::new(
79            io::ErrorKind::InvalidData,
80            format!("schema file is invalid after write: {}", path.display()),
81        ));
82    }
83    match CONFIG_SCHEMA_PATH.set(path.clone()) {
84        Ok(()) => Ok(path),
85        Err(_) => Ok(CONFIG_SCHEMA_PATH
86            .get()
87            .expect("config schema path set")
88            .clone()),
89    }
90}
91
92/// Load YGTC YAML from a string using the embedded schema.
93pub fn load_ygtc_from_str(yaml: &str) -> Result<FlowDoc> {
94    load_with_schema_text(
95        yaml,
96        EMBEDDED_SCHEMA,
97        DEFAULT_SCHEMA_LABEL.to_string(),
98        None,
99        INLINE_SOURCE,
100        None,
101    )
102}
103
104/// Load YGTC YAML from a file path using the embedded schema.
105pub fn load_ygtc_from_path(path: &Path) -> Result<FlowDoc> {
106    let safe_path = canonicalize_user_path(path).map_err(|err| FlowError::Internal {
107        message: format!("invalid flow path {}: {err}", path.display()),
108        location: FlowErrorLocation::at_path(path.display().to_string())
109            .with_source_path(Some(path)),
110    })?;
111    let content = fs::read_to_string(&safe_path).map_err(|e| FlowError::Internal {
112        message: format!("failed to read {}: {e}", safe_path.display()),
113        location: FlowErrorLocation::at_path(safe_path.display().to_string())
114            .with_source_path(Some(&safe_path)),
115    })?;
116    load_with_schema_text(
117        &content,
118        EMBEDDED_SCHEMA,
119        DEFAULT_SCHEMA_LABEL.to_string(),
120        None,
121        safe_path.display().to_string(),
122        Some(&safe_path),
123    )
124}
125
/// Resolve a caller-supplied path to a canonical path that refers to a regular file.
///
/// Relative paths are interpreted against the current working directory.
///
/// # Errors
///
/// Returns `InvalidInput` for an empty path or one that does not resolve to a regular
/// file, and propagates any error from reading the current directory or canonicalising.
fn canonicalize_user_path(path: &Path) -> io::Result<PathBuf> {
    let invalid_input = |reason: &'static str| io::Error::new(io::ErrorKind::InvalidInput, reason);

    if path.as_os_str().is_empty() {
        return Err(invalid_input("path is empty"));
    }
    let absolute = if path.is_relative() {
        std::env::current_dir()?.join(path)
    } else {
        path.to_path_buf()
    };
    let resolved = absolute.canonicalize()?;
    if resolved.is_file() {
        Ok(resolved)
    } else {
        Err(invalid_input("path does not reference a regular file"))
    }
}
144
145/// Load YGTC YAML from a string using a schema file on disk.
146pub fn load_ygtc_from_str_with_schema(yaml: &str, schema_path: &Path) -> Result<FlowDoc> {
147    load_ygtc_from_str_with_source(yaml, schema_path, INLINE_SOURCE)
148}
149
150pub fn load_ygtc_from_str_with_source(
151    yaml: &str,
152    schema_path: &Path,
153    source_label: impl Into<String>,
154) -> Result<FlowDoc> {
155    let schema_root = std::env::current_dir().map_err(|e| FlowError::Internal {
156        message: format!("resolve schema root: {e}"),
157        location: FlowErrorLocation::at_path(schema_path.display().to_string())
158            .with_source_path(Some(schema_path)),
159    })?;
160    let safe_schema_path = if schema_path.is_absolute() {
161        schema_path.to_path_buf()
162    } else {
163        normalize_under_root(&schema_root, schema_path).map_err(|e| FlowError::Internal {
164            message: format!("schema path validation for {}: {e}", schema_path.display()),
165            location: FlowErrorLocation::at_path(schema_path.display().to_string())
166                .with_source_path(Some(schema_path)),
167        })?
168    };
169    let schema_label = safe_schema_path.display().to_string();
170    let schema_text = fs::read_to_string(&safe_schema_path).map_err(|e| FlowError::Internal {
171        message: format!("schema read from {schema_label}: {e}"),
172        location: FlowErrorLocation::at_path(schema_label.clone())
173            .with_source_path(Some(&safe_schema_path)),
174    })?;
175    load_with_schema_text(
176        yaml,
177        &schema_text,
178        schema_label,
179        Some(&safe_schema_path),
180        source_label,
181        None,
182    )
183}
184
185pub(crate) fn load_with_schema_text(
186    yaml: &str,
187    schema_text: &str,
188    schema_label: impl Into<String>,
189    schema_path: Option<&Path>,
190    source_label: impl Into<String>,
191    source_path: Option<&Path>,
192) -> Result<FlowDoc> {
193    let schema_label = schema_label.into();
194    let source_label = source_label.into();
195    let mut v_yaml: serde_yaml_bw::Value =
196        serde_yaml_bw::from_str(yaml).map_err(|e| FlowError::Yaml {
197            message: e.to_string(),
198            location: yaml_error_location(&source_label, source_path, e.location()),
199        })?;
200    ensure_nodes_mapping(&mut v_yaml);
201    let v_json: Value = serde_json::to_value(&v_yaml).map_err(|e| FlowError::Internal {
202        message: format!("yaml->json: {e}"),
203        location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source_path),
204    })?;
205    let schema_version = v_json
206        .get("schema_version")
207        .and_then(Value::as_u64)
208        .unwrap_or(2);
209    let nodes_empty = v_json
210        .get("nodes")
211        .and_then(Value::as_object)
212        .map(|m| m.is_empty())
213        .unwrap_or(false);
214    let reserved_for_count = [
215        "routing",
216        "telemetry",
217        "output",
218        "in_map",
219        "out_map",
220        "err_map",
221        "retry",
222        "timeout",
223        "when",
224        "annotations",
225        "meta",
226        "operation",
227    ];
228    if v_json.get("type").is_none() {
229        return Err(FlowError::Schema {
230            message: format!("{source_label}/type: missing required property 'type'"),
231            details: vec![SchemaErrorDetail {
232                message: "Missing required property 'type'".to_string(),
233                location: FlowErrorLocation::at_path(format!("{source_label}/type"))
234                    .with_source_path(source_path)
235                    .with_json_pointer(Some("/type".to_string())),
236            }],
237            location: FlowErrorLocation::at_path(source_label.clone())
238                .with_source_path(source_path),
239        });
240    }
241
242    if let Some(nodes) = v_json.get("nodes").and_then(Value::as_object) {
243        for id in nodes.keys() {
244            let Some(node_val) = nodes.get(id) else {
245                continue;
246            };
247            let Some(obj) = node_val.as_object() else {
248                continue;
249            };
250            let op_count = obj
251                .keys()
252                .filter(|k| !reserved_for_count.contains(&k.as_str()))
253                .count();
254            let is_component_exec = obj.contains_key("component.exec");
255            let component_combo = is_component_exec && op_count == 2;
256            if op_count != 1 && !(component_combo || schema_version < 2) {
257                return Err(FlowError::NodeComponentShape {
258                    node_id: id.clone(),
259                    location: node_location(&source_label, source_path, id),
260                });
261            }
262        }
263    }
264
265    if !nodes_empty && schema_version >= 2 {
266        validate_json(
267            &v_json,
268            schema_text,
269            &schema_label,
270            schema_path,
271            &source_label,
272            source_path,
273        )?;
274    }
275
276    let mut flow: FlowDoc = match serde_yaml_bw::from_value(v_yaml) {
277        Ok(doc) => doc,
278        Err(e) => {
279            validate_json(
280                &v_json,
281                schema_text,
282                &schema_label,
283                schema_path,
284                &source_label,
285                source_path,
286            )?;
287            return Err(FlowError::Yaml {
288                message: e.to_string(),
289                location: yaml_error_location(&source_label, source_path, None),
290            });
291        }
292    };
293    if flow.schema_version.is_none() {
294        flow.schema_version = Some(2);
295    }
296
297    let node_ids: Vec<String> = flow.nodes.keys().cloned().collect();
298    for id in &node_ids {
299        let node = flow.nodes.get_mut(id).ok_or_else(|| FlowError::Internal {
300            message: format!("node '{id}' missing after load"),
301            location: node_location(&source_label, source_path, id),
302        })?;
303        let reserved = [
304            "routing",
305            "telemetry",
306            "output",
307            "in_map",
308            "out_map",
309            "err_map",
310            "retry",
311            "timeout",
312            "when",
313            "annotations",
314            "meta",
315            "operation",
316        ];
317        let op_count = node
318            .raw
319            .keys()
320            .filter(|k| !reserved.contains(&k.as_str()))
321            .count();
322        let is_component_exec = node.raw.contains_key("component.exec");
323        let component_combo = is_component_exec && op_count == 2;
324        if op_count != 1 && !(component_combo || flow.schema_version.unwrap_or(1) < 2) {
325            return Err(FlowError::NodeComponentShape {
326                node_id: id.clone(),
327                location: node_location(&source_label, source_path, id),
328            });
329        }
330
331        // Structurally validate MCP nodes (op key == "mcp"). This is
332        // offline-only: it checks the payload `server`/`tool`/`arguments`/
333        // `output` shape and never probes the server. The component key is the
334        // single non-reserved raw key; server and tool live in the payload, not
335        // the key, so the key stays a valid greentic_types::ComponentId.
336        if let Some((comp_key, config)) = node
337            .raw
338            .iter()
339            .find(|(k, _)| !reserved.contains(&k.as_str()))
340            && comp_key.as_str() == crate::ir::MCP_COMPONENT
341        {
342            crate::ir::validate_mcp_config(id, config).map_err(|err| match err {
343                FlowError::McpConfig {
344                    node_id, message, ..
345                } => FlowError::McpConfig {
346                    node_id,
347                    message,
348                    location: node_location(&source_label, source_path, id),
349                },
350                other => other,
351            })?;
352        }
353    }
354
355    for (from_id, node) in &flow.nodes {
356        for route in parse_routes(&node.routing, from_id, &source_label, source_path)? {
357            if let Some(to) = &route.to
358                && to != "out"
359                && !flow.nodes.contains_key(to)
360            {
361                return Err(FlowError::MissingNode {
362                    target: to.clone(),
363                    node_id: from_id.clone(),
364                    location: routing_location(&source_label, source_path, from_id),
365                });
366            }
367        }
368    }
369
370    if flow.start.is_none() && flow.nodes.contains_key("in") {
371        flow.start = Some("in".to_string());
372    }
373
374    Ok(flow)
375}
376
377fn parse_routes(
378    raw: &Value,
379    node_id: &str,
380    source_label: &str,
381    source_path: Option<&Path>,
382) -> Result<Vec<RouteDoc>> {
383    if raw.is_null() {
384        return Ok(Vec::new());
385    }
386    if let Some(shorthand) = raw.as_str() {
387        return match shorthand {
388            "out" => Ok(vec![RouteDoc {
389                to: Some("out".to_string()),
390                out: Some(true),
391                status: None,
392                reply: None,
393                condition: None,
394            }]),
395            "reply" => Ok(vec![RouteDoc {
396                to: None,
397                out: None,
398                status: None,
399                reply: Some(true),
400                condition: None,
401            }]),
402            other => Err(FlowError::Routing {
403                node_id: node_id.to_string(),
404                message: format!("invalid routing shorthand '{other}'"),
405                location: routing_location(source_label, source_path, node_id),
406            }),
407        };
408    }
409    serde_json::from_value::<Vec<RouteDoc>>(raw.clone()).map_err(|e| FlowError::Routing {
410        node_id: node_id.to_string(),
411        message: e.to_string(),
412        location: routing_location(source_label, source_path, node_id),
413    })
414}
415
416#[derive(Debug, Clone, Deserialize)]
417struct RouteDoc {
418    #[serde(default)]
419    pub to: Option<String>,
420    #[allow(dead_code)]
421    #[serde(default)]
422    pub out: Option<bool>,
423    #[allow(dead_code)]
424    #[serde(default)]
425    pub status: Option<String>,
426    #[allow(dead_code)]
427    #[serde(default)]
428    pub reply: Option<bool>,
429    #[allow(dead_code)]
430    #[serde(default)]
431    pub condition: Option<String>,
432}
433
434fn validate_json(
435    doc: &Value,
436    schema_text: &str,
437    schema_label: &str,
438    schema_path: Option<&Path>,
439    source_label: &str,
440    source_path: Option<&Path>,
441) -> Result<()> {
442    let validator = validator_for_schema(schema_text, schema_label, schema_path)?;
443    let details: Vec<SchemaErrorDetail> = validator
444        .iter_errors(doc)
445        .map(|e| {
446            let pointer = e.instance_path().to_string();
447            let pointer = if pointer.is_empty() {
448                "/".to_string()
449            } else {
450                pointer
451            };
452            SchemaErrorDetail {
453                message: e.to_string(),
454                location: FlowErrorLocation::at_path(format!("{source_label}{pointer}"))
455                    .with_source_path(source_path)
456                    .with_json_pointer(Some(pointer.clone())),
457            }
458        })
459        .collect();
460    if !details.is_empty() {
461        let message = details
462            .iter()
463            .map(|detail| {
464                let where_str = detail
465                    .location
466                    .describe()
467                    .unwrap_or_else(|| source_label.to_string());
468                format!("{where_str}: {}", detail.message)
469            })
470            .collect::<Vec<_>>()
471            .join("\n");
472        return Err(FlowError::Schema {
473            message,
474            details,
475            location: FlowErrorLocation::at_path(source_label.to_string())
476                .with_source_path(source_path),
477        });
478    }
479    Ok(())
480}
481
482fn validator_for_schema<'a>(
483    schema_text: &'a str,
484    schema_label: &str,
485    schema_path: Option<&Path>,
486) -> Result<&'a jsonschema::Validator> {
487    if schema_path.is_none() && schema_text == EMBEDDED_SCHEMA {
488        static EMBEDDED_VALIDATOR: OnceLock<std::result::Result<jsonschema::Validator, String>> =
489            OnceLock::new();
490        let validator = EMBEDDED_VALIDATOR
491            .get_or_init(|| {
492                let schema: Value = serde_json::from_str(EMBEDDED_SCHEMA)
493                    .map_err(|e| format!("schema parse for {DEFAULT_SCHEMA_LABEL}: {e}"))?;
494                jsonschema_options_with_base(None)
495                    .build(&schema)
496                    .map_err(|e| format!("schema compile for {DEFAULT_SCHEMA_LABEL}: {e}"))
497            })
498            .as_ref()
499            .map_err(|message| FlowError::Internal {
500                message: message.clone(),
501                location: FlowErrorLocation::at_path(schema_label.to_string())
502                    .with_source_path(schema_path),
503            })?;
504        return Ok(validator);
505    }
506
507    let schema: Value = serde_json::from_str(schema_text).map_err(|e| FlowError::Internal {
508        message: format!("schema parse for {schema_label}: {e}"),
509        location: FlowErrorLocation::at_path(schema_label.to_string())
510            .with_source_path(schema_path),
511    })?;
512    let validator = jsonschema_options_with_base(schema_path)
513        .build(&schema)
514        .map_err(|e| FlowError::Internal {
515            message: format!("schema compile for {schema_label}: {e}"),
516            location: FlowErrorLocation::at_path(schema_label.to_string())
517                .with_source_path(schema_path),
518        })?;
519    Ok(Box::leak(Box::new(validator)))
520}
521
522fn ensure_nodes_mapping(doc: &mut serde_yaml_bw::Value) {
523    let Some(mapping) = doc.as_mapping_mut() else {
524        return;
525    };
526    let nodes_key = serde_yaml_bw::Value::String("nodes".to_string(), None);
527    match mapping.get_mut(&nodes_key) {
528        Some(existing) => {
529            if existing.is_null() {
530                *existing = serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new());
531            }
532        }
533        None => {
534            mapping.insert(
535                nodes_key,
536                serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new()),
537            );
538        }
539    }
540}
541
542fn node_location(
543    source_label: &str,
544    source_path: Option<&Path>,
545    node_id: &str,
546) -> FlowErrorLocation {
547    FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}"))
548        .with_source_path(source_path)
549}
550
551fn routing_location(
552    source_label: &str,
553    source_path: Option<&Path>,
554    node_id: &str,
555) -> FlowErrorLocation {
556    FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}.routing"))
557        .with_source_path(source_path)
558}
559
560pub(crate) fn yaml_error_location(
561    source_label: &str,
562    source_path: Option<&Path>,
563    loc: Option<YamlLocation>,
564) -> FlowErrorLocation {
565    if let Some(loc) = loc {
566        FlowErrorLocation::at_path_with_position(
567            source_label.to_string(),
568            Some(loc.line()),
569            Some(loc.column()),
570        )
571        .with_source_path(source_path)
572    } else {
573        FlowErrorLocation::at_path(source_label.to_string()).with_source_path(source_path)
574    }
575}