1use crate::{
2 component_schema::jsonschema_options_with_base,
3 error::{FlowError, FlowErrorLocation, Result, SchemaErrorDetail},
4 model::FlowDoc,
5 path_safety::normalize_under_root,
6};
7use serde::Deserialize;
8use serde_json::Value as JsonValue;
9use serde_json::Value;
10use serde_yaml_bw::Location as YamlLocation;
11use std::{
12 fs, io,
13 path::{Path, PathBuf},
14 sync::OnceLock,
15};
16
/// Source label used in error locations when flow YAML is loaded from a string rather
/// than from a file.
const INLINE_SOURCE: &str = "<inline>";
/// Schema label reported in errors when the embedded schema is used.
/// NOTE(review): this looks like the upstream location of the same schema file — confirm
/// it stays in sync with `EMBEDDED_SCHEMA`.
const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greenticai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
/// Flow-document JSON Schema compiled into the binary from
/// `schemas/ygtc.flow.schema.json`; also the content written to disk by
/// `write_schema_atomically`.
const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
20fn schema_file_valid(path: &Path) -> bool {
21 let Ok(text) = fs::read_to_string(path) else {
22 return false;
23 };
24 if text.trim().is_empty() {
25 return false;
26 }
27 serde_json::from_str::<JsonValue>(&text).is_ok()
28}
29
30fn write_schema_atomically(path: &Path) -> io::Result<()> {
31 let parent = path
32 .parent()
33 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "schema path has no parent"))?;
34 let unique = format!(
35 "greentic-flow-config-schema-{}.tmp-{}",
36 std::process::id(),
37 std::time::SystemTime::now()
38 .duration_since(std::time::UNIX_EPOCH)
39 .map(|d| d.as_nanos())
40 .unwrap_or(0)
41 );
42 let tmp = parent.join(unique);
43 fs::write(&tmp, EMBEDDED_SCHEMA)?;
44 match fs::rename(&tmp, path) {
45 Ok(()) => Ok(()),
46 Err(err) => {
47 let _ = fs::remove_file(&tmp);
48 if schema_file_valid(path) {
49 Ok(())
50 } else {
51 Err(err)
52 }
53 }
54 }
55}
56
57pub fn ensure_config_schema_path() -> io::Result<PathBuf> {
59 static CONFIG_SCHEMA_PATH: OnceLock<PathBuf> = OnceLock::new();
60 if let Some(path) = CONFIG_SCHEMA_PATH.get() {
61 if !schema_file_valid(path) {
62 write_schema_atomically(path)?;
63 }
64 return Ok(path.clone());
65 }
66 let mut path = std::env::temp_dir();
67 path.push(format!(
68 "greentic-flow-config-schema-{}.json",
69 env!("CARGO_PKG_VERSION")
70 ));
71 if let Some(parent) = path.parent() {
72 fs::create_dir_all(parent)?;
73 }
74 if !schema_file_valid(&path) {
75 write_schema_atomically(&path)?;
76 }
77 if !schema_file_valid(&path) {
78 return Err(io::Error::new(
79 io::ErrorKind::InvalidData,
80 format!("schema file is invalid after write: {}", path.display()),
81 ));
82 }
83 match CONFIG_SCHEMA_PATH.set(path.clone()) {
84 Ok(()) => Ok(path),
85 Err(_) => Ok(CONFIG_SCHEMA_PATH
86 .get()
87 .expect("config schema path set")
88 .clone()),
89 }
90}
91
92pub fn load_ygtc_from_str(yaml: &str) -> Result<FlowDoc> {
94 load_with_schema_text(
95 yaml,
96 EMBEDDED_SCHEMA,
97 DEFAULT_SCHEMA_LABEL.to_string(),
98 None,
99 INLINE_SOURCE,
100 None,
101 )
102}
103
104pub fn load_ygtc_from_path(path: &Path) -> Result<FlowDoc> {
106 let safe_path = canonicalize_user_path(path).map_err(|err| FlowError::Internal {
107 message: format!("invalid flow path {}: {err}", path.display()),
108 location: FlowErrorLocation::at_path(path.display().to_string())
109 .with_source_path(Some(path)),
110 })?;
111 let content = fs::read_to_string(&safe_path).map_err(|e| FlowError::Internal {
112 message: format!("failed to read {}: {e}", safe_path.display()),
113 location: FlowErrorLocation::at_path(safe_path.display().to_string())
114 .with_source_path(Some(&safe_path)),
115 })?;
116 load_with_schema_text(
117 &content,
118 EMBEDDED_SCHEMA,
119 DEFAULT_SCHEMA_LABEL.to_string(),
120 None,
121 safe_path.display().to_string(),
122 Some(&safe_path),
123 )
124}
125
/// Resolves a user-supplied path to an absolute, symlink-resolved path naming a regular file.
///
/// Relative paths are interpreted against the current working directory.
///
/// # Errors
/// `InvalidInput` for an empty path, or for one that does not resolve to a regular file.
/// Errors from reading the working directory or from [`Path::canonicalize`] (e.g. the path
/// does not exist) are passed through unchanged.
fn canonicalize_user_path(path: &Path) -> io::Result<PathBuf> {
    let invalid = |msg: &str| io::Error::new(io::ErrorKind::InvalidInput, msg);

    if path.as_os_str().is_empty() {
        return Err(invalid("path is empty"));
    }
    let absolute = if path.is_relative() {
        std::env::current_dir()?.join(path)
    } else {
        path.to_path_buf()
    };
    let resolved = absolute.canonicalize()?;
    if !resolved.is_file() {
        return Err(invalid("path does not reference a regular file"));
    }
    Ok(resolved)
}
144
145pub fn load_ygtc_from_str_with_schema(yaml: &str, schema_path: &Path) -> Result<FlowDoc> {
147 load_ygtc_from_str_with_source(yaml, schema_path, INLINE_SOURCE)
148}
149
150pub fn load_ygtc_from_str_with_source(
151 yaml: &str,
152 schema_path: &Path,
153 source_label: impl Into<String>,
154) -> Result<FlowDoc> {
155 let schema_root = std::env::current_dir().map_err(|e| FlowError::Internal {
156 message: format!("resolve schema root: {e}"),
157 location: FlowErrorLocation::at_path(schema_path.display().to_string())
158 .with_source_path(Some(schema_path)),
159 })?;
160 let safe_schema_path = if schema_path.is_absolute() {
161 schema_path.to_path_buf()
162 } else {
163 normalize_under_root(&schema_root, schema_path).map_err(|e| FlowError::Internal {
164 message: format!("schema path validation for {}: {e}", schema_path.display()),
165 location: FlowErrorLocation::at_path(schema_path.display().to_string())
166 .with_source_path(Some(schema_path)),
167 })?
168 };
169 let schema_label = safe_schema_path.display().to_string();
170 let schema_text = fs::read_to_string(&safe_schema_path).map_err(|e| FlowError::Internal {
171 message: format!("schema read from {schema_label}: {e}"),
172 location: FlowErrorLocation::at_path(schema_label.clone())
173 .with_source_path(Some(&safe_schema_path)),
174 })?;
175 load_with_schema_text(
176 yaml,
177 &schema_text,
178 schema_label,
179 Some(&safe_schema_path),
180 source_label,
181 None,
182 )
183}
184
185pub(crate) fn load_with_schema_text(
186 yaml: &str,
187 schema_text: &str,
188 schema_label: impl Into<String>,
189 schema_path: Option<&Path>,
190 source_label: impl Into<String>,
191 source_path: Option<&Path>,
192) -> Result<FlowDoc> {
193 let schema_label = schema_label.into();
194 let source_label = source_label.into();
195 let mut v_yaml: serde_yaml_bw::Value =
196 serde_yaml_bw::from_str(yaml).map_err(|e| FlowError::Yaml {
197 message: e.to_string(),
198 location: yaml_error_location(&source_label, source_path, e.location()),
199 })?;
200 ensure_nodes_mapping(&mut v_yaml);
201 let v_json: Value = serde_json::to_value(&v_yaml).map_err(|e| FlowError::Internal {
202 message: format!("yaml->json: {e}"),
203 location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source_path),
204 })?;
205 let schema_version = v_json
206 .get("schema_version")
207 .and_then(Value::as_u64)
208 .unwrap_or(2);
209 let nodes_empty = v_json
210 .get("nodes")
211 .and_then(Value::as_object)
212 .map(|m| m.is_empty())
213 .unwrap_or(false);
214 let reserved_for_count = [
215 "routing",
216 "telemetry",
217 "output",
218 "in_map",
219 "out_map",
220 "err_map",
221 "retry",
222 "timeout",
223 "when",
224 "annotations",
225 "meta",
226 "operation",
227 ];
228 if v_json.get("type").is_none() {
229 return Err(FlowError::Schema {
230 message: format!("{source_label}/type: missing required property 'type'"),
231 details: vec![SchemaErrorDetail {
232 message: "Missing required property 'type'".to_string(),
233 location: FlowErrorLocation::at_path(format!("{source_label}/type"))
234 .with_source_path(source_path)
235 .with_json_pointer(Some("/type".to_string())),
236 }],
237 location: FlowErrorLocation::at_path(source_label.clone())
238 .with_source_path(source_path),
239 });
240 }
241
242 if let Some(nodes) = v_json.get("nodes").and_then(Value::as_object) {
243 for id in nodes.keys() {
244 let Some(node_val) = nodes.get(id) else {
245 continue;
246 };
247 let Some(obj) = node_val.as_object() else {
248 continue;
249 };
250 let op_count = obj
251 .keys()
252 .filter(|k| !reserved_for_count.contains(&k.as_str()))
253 .count();
254 let is_component_exec = obj.contains_key("component.exec");
255 let component_combo = is_component_exec && op_count == 2;
256 if op_count != 1 && !(component_combo || schema_version < 2) {
257 return Err(FlowError::NodeComponentShape {
258 node_id: id.clone(),
259 location: node_location(&source_label, source_path, id),
260 });
261 }
262 }
263 }
264
265 if !nodes_empty && schema_version >= 2 {
266 validate_json(
267 &v_json,
268 schema_text,
269 &schema_label,
270 schema_path,
271 &source_label,
272 source_path,
273 )?;
274 }
275
276 let mut flow: FlowDoc = match serde_yaml_bw::from_value(v_yaml) {
277 Ok(doc) => doc,
278 Err(e) => {
279 validate_json(
280 &v_json,
281 schema_text,
282 &schema_label,
283 schema_path,
284 &source_label,
285 source_path,
286 )?;
287 return Err(FlowError::Yaml {
288 message: e.to_string(),
289 location: yaml_error_location(&source_label, source_path, None),
290 });
291 }
292 };
293 if flow.schema_version.is_none() {
294 flow.schema_version = Some(2);
295 }
296
297 let node_ids: Vec<String> = flow.nodes.keys().cloned().collect();
298 for id in &node_ids {
299 let node = flow.nodes.get_mut(id).ok_or_else(|| FlowError::Internal {
300 message: format!("node '{id}' missing after load"),
301 location: node_location(&source_label, source_path, id),
302 })?;
303 let reserved = [
304 "routing",
305 "telemetry",
306 "output",
307 "in_map",
308 "out_map",
309 "err_map",
310 "retry",
311 "timeout",
312 "when",
313 "annotations",
314 "meta",
315 "operation",
316 ];
317 let op_count = node
318 .raw
319 .keys()
320 .filter(|k| !reserved.contains(&k.as_str()))
321 .count();
322 let is_component_exec = node.raw.contains_key("component.exec");
323 let component_combo = is_component_exec && op_count == 2;
324 if op_count != 1 && !(component_combo || flow.schema_version.unwrap_or(1) < 2) {
325 return Err(FlowError::NodeComponentShape {
326 node_id: id.clone(),
327 location: node_location(&source_label, source_path, id),
328 });
329 }
330
331 if let Some((comp_key, config)) = node
337 .raw
338 .iter()
339 .find(|(k, _)| !reserved.contains(&k.as_str()))
340 && comp_key.as_str() == crate::ir::MCP_COMPONENT
341 {
342 crate::ir::validate_mcp_config(id, config).map_err(|err| match err {
343 FlowError::McpConfig {
344 node_id, message, ..
345 } => FlowError::McpConfig {
346 node_id,
347 message,
348 location: node_location(&source_label, source_path, id),
349 },
350 other => other,
351 })?;
352 }
353 }
354
355 for (from_id, node) in &flow.nodes {
356 for route in parse_routes(&node.routing, from_id, &source_label, source_path)? {
357 if let Some(to) = &route.to
358 && to != "out"
359 && !flow.nodes.contains_key(to)
360 {
361 return Err(FlowError::MissingNode {
362 target: to.clone(),
363 node_id: from_id.clone(),
364 location: routing_location(&source_label, source_path, from_id),
365 });
366 }
367 }
368 }
369
370 if flow.start.is_none() && flow.nodes.contains_key("in") {
371 flow.start = Some("in".to_string());
372 }
373
374 Ok(flow)
375}
376
377fn parse_routes(
378 raw: &Value,
379 node_id: &str,
380 source_label: &str,
381 source_path: Option<&Path>,
382) -> Result<Vec<RouteDoc>> {
383 if raw.is_null() {
384 return Ok(Vec::new());
385 }
386 if let Some(shorthand) = raw.as_str() {
387 return match shorthand {
388 "out" => Ok(vec![RouteDoc {
389 to: Some("out".to_string()),
390 out: Some(true),
391 status: None,
392 reply: None,
393 condition: None,
394 }]),
395 "reply" => Ok(vec![RouteDoc {
396 to: None,
397 out: None,
398 status: None,
399 reply: Some(true),
400 condition: None,
401 }]),
402 other => Err(FlowError::Routing {
403 node_id: node_id.to_string(),
404 message: format!("invalid routing shorthand '{other}'"),
405 location: routing_location(source_label, source_path, node_id),
406 }),
407 };
408 }
409 serde_json::from_value::<Vec<RouteDoc>>(raw.clone()).map_err(|e| FlowError::Routing {
410 node_id: node_id.to_string(),
411 message: e.to_string(),
412 location: routing_location(source_label, source_path, node_id),
413 })
414}
415
416#[derive(Debug, Clone, Deserialize)]
417struct RouteDoc {
418 #[serde(default)]
419 pub to: Option<String>,
420 #[allow(dead_code)]
421 #[serde(default)]
422 pub out: Option<bool>,
423 #[allow(dead_code)]
424 #[serde(default)]
425 pub status: Option<String>,
426 #[allow(dead_code)]
427 #[serde(default)]
428 pub reply: Option<bool>,
429 #[allow(dead_code)]
430 #[serde(default)]
431 pub condition: Option<String>,
432}
433
434fn validate_json(
435 doc: &Value,
436 schema_text: &str,
437 schema_label: &str,
438 schema_path: Option<&Path>,
439 source_label: &str,
440 source_path: Option<&Path>,
441) -> Result<()> {
442 let validator = validator_for_schema(schema_text, schema_label, schema_path)?;
443 let details: Vec<SchemaErrorDetail> = validator
444 .iter_errors(doc)
445 .map(|e| {
446 let pointer = e.instance_path().to_string();
447 let pointer = if pointer.is_empty() {
448 "/".to_string()
449 } else {
450 pointer
451 };
452 SchemaErrorDetail {
453 message: e.to_string(),
454 location: FlowErrorLocation::at_path(format!("{source_label}{pointer}"))
455 .with_source_path(source_path)
456 .with_json_pointer(Some(pointer.clone())),
457 }
458 })
459 .collect();
460 if !details.is_empty() {
461 let message = details
462 .iter()
463 .map(|detail| {
464 let where_str = detail
465 .location
466 .describe()
467 .unwrap_or_else(|| source_label.to_string());
468 format!("{where_str}: {}", detail.message)
469 })
470 .collect::<Vec<_>>()
471 .join("\n");
472 return Err(FlowError::Schema {
473 message,
474 details,
475 location: FlowErrorLocation::at_path(source_label.to_string())
476 .with_source_path(source_path),
477 });
478 }
479 Ok(())
480}
481
482fn validator_for_schema<'a>(
483 schema_text: &'a str,
484 schema_label: &str,
485 schema_path: Option<&Path>,
486) -> Result<&'a jsonschema::Validator> {
487 if schema_path.is_none() && schema_text == EMBEDDED_SCHEMA {
488 static EMBEDDED_VALIDATOR: OnceLock<std::result::Result<jsonschema::Validator, String>> =
489 OnceLock::new();
490 let validator = EMBEDDED_VALIDATOR
491 .get_or_init(|| {
492 let schema: Value = serde_json::from_str(EMBEDDED_SCHEMA)
493 .map_err(|e| format!("schema parse for {DEFAULT_SCHEMA_LABEL}: {e}"))?;
494 jsonschema_options_with_base(None)
495 .build(&schema)
496 .map_err(|e| format!("schema compile for {DEFAULT_SCHEMA_LABEL}: {e}"))
497 })
498 .as_ref()
499 .map_err(|message| FlowError::Internal {
500 message: message.clone(),
501 location: FlowErrorLocation::at_path(schema_label.to_string())
502 .with_source_path(schema_path),
503 })?;
504 return Ok(validator);
505 }
506
507 let schema: Value = serde_json::from_str(schema_text).map_err(|e| FlowError::Internal {
508 message: format!("schema parse for {schema_label}: {e}"),
509 location: FlowErrorLocation::at_path(schema_label.to_string())
510 .with_source_path(schema_path),
511 })?;
512 let validator = jsonschema_options_with_base(schema_path)
513 .build(&schema)
514 .map_err(|e| FlowError::Internal {
515 message: format!("schema compile for {schema_label}: {e}"),
516 location: FlowErrorLocation::at_path(schema_label.to_string())
517 .with_source_path(schema_path),
518 })?;
519 Ok(Box::leak(Box::new(validator)))
520}
521
522fn ensure_nodes_mapping(doc: &mut serde_yaml_bw::Value) {
523 let Some(mapping) = doc.as_mapping_mut() else {
524 return;
525 };
526 let nodes_key = serde_yaml_bw::Value::String("nodes".to_string(), None);
527 match mapping.get_mut(&nodes_key) {
528 Some(existing) => {
529 if existing.is_null() {
530 *existing = serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new());
531 }
532 }
533 None => {
534 mapping.insert(
535 nodes_key,
536 serde_yaml_bw::Value::Mapping(serde_yaml_bw::Mapping::new()),
537 );
538 }
539 }
540}
541
542fn node_location(
543 source_label: &str,
544 source_path: Option<&Path>,
545 node_id: &str,
546) -> FlowErrorLocation {
547 FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}"))
548 .with_source_path(source_path)
549}
550
551fn routing_location(
552 source_label: &str,
553 source_path: Option<&Path>,
554 node_id: &str,
555) -> FlowErrorLocation {
556 FlowErrorLocation::at_path(format!("{source_label}::nodes.{node_id}.routing"))
557 .with_source_path(source_path)
558}
559
560pub(crate) fn yaml_error_location(
561 source_label: &str,
562 source_path: Option<&Path>,
563 loc: Option<YamlLocation>,
564) -> FlowErrorLocation {
565 if let Some(loc) = loc {
566 FlowErrorLocation::at_path_with_position(
567 source_label.to_string(),
568 Some(loc.line()),
569 Some(loc.column()),
570 )
571 .with_source_path(source_path)
572 } else {
573 FlowErrorLocation::at_path(source_label.to_string()).with_source_path(source_path)
574 }
575}