1use crate::{
2 error::{FlowError, FlowErrorLocation, Result},
3 ir::FlowIR,
4 loader, to_ir,
5};
6use blake3::Hasher;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::path::Path;
10
11const INLINE_SOURCE_LABEL: &str = "<inline>";
12const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
13const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greentic-ai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
14
15pub type NodeId = String;
16
17#[derive(Clone, Debug, Serialize, Deserialize)]
18pub struct ComponentPin {
19 pub name: String,
20 pub version_req: String,
21}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub struct NodeRef {
25 pub node_id: String,
26 pub component: ComponentPin,
27 pub schema_id: Option<String>,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowBundle {
32 pub id: String,
33 pub kind: String,
34 pub entry: String,
35 pub yaml: String,
36 pub json: Value,
37 pub hash_blake3: String,
38 pub nodes: Vec<NodeRef>,
39}
40
41pub fn canonicalize_json(value: &Value) -> Value {
43 match value {
44 Value::Object(map) => {
45 let mut keys: Vec<_> = map.keys().collect();
46 keys.sort();
47 let mut ordered = serde_json::Map::with_capacity(map.len());
48 for key in keys {
49 ordered.insert(key.clone(), canonicalize_json(&map[key]));
50 }
51 Value::Object(ordered)
52 }
53 Value::Array(items) => Value::Array(items.iter().map(canonicalize_json).collect()),
54 _ => value.clone(),
55 }
56}
57
58pub fn blake3_hex(bytes: impl AsRef<[u8]>) -> String {
60 let mut hasher = Hasher::new();
61 hasher.update(bytes.as_ref());
62 let hash = hasher.finalize();
63 hash.to_hex().to_string()
64}
65
66pub fn extract_component_pins(ir: &FlowIR) -> Vec<(NodeId, ComponentPin)> {
68 ir.nodes
69 .iter()
70 .map(|(node_id, node)| {
71 (
72 node_id.clone(),
73 ComponentPin {
74 name: node.component.clone(),
75 version_req: "*".to_string(),
76 },
77 )
78 })
79 .collect()
80}
81
82pub fn load_and_validate_bundle(yaml: &str, source: Option<&Path>) -> Result<FlowBundle> {
84 load_and_validate_bundle_with_schema_text(
85 yaml,
86 EMBEDDED_SCHEMA,
87 DEFAULT_SCHEMA_LABEL.to_string(),
88 None,
89 source,
90 )
91 .map(|(bundle, _)| bundle)
92}
93
94pub fn load_and_validate_bundle_with_ir(
95 yaml: &str,
96 source: Option<&Path>,
97) -> Result<(FlowBundle, FlowIR)> {
98 load_and_validate_bundle_with_schema_text(
99 yaml,
100 EMBEDDED_SCHEMA,
101 DEFAULT_SCHEMA_LABEL.to_string(),
102 None,
103 source,
104 )
105}
106
107pub fn load_and_validate_bundle_with_schema_text(
108 yaml: &str,
109 schema_text: &str,
110 schema_label: impl Into<String>,
111 schema_path: Option<&Path>,
112 source: Option<&Path>,
113) -> Result<(FlowBundle, FlowIR)> {
114 let schema_label = schema_label.into();
115 let source_label = source
116 .map(|p| p.display().to_string())
117 .unwrap_or_else(|| INLINE_SOURCE_LABEL.to_string());
118
119 let flow = loader::load_with_schema_text(
120 yaml,
121 schema_text,
122 schema_label,
123 schema_path,
124 source_label.clone(),
125 source,
126 )?;
127
128 let flow_json = serde_json::to_value(&flow).map_err(|e| FlowError::Internal {
129 message: format!("flow serialization: {e}"),
130 location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
131 })?;
132 let canonical_json = canonicalize_json(&flow_json);
133 let json_bytes = serde_json::to_vec(&canonical_json).map_err(|e| FlowError::Internal {
134 message: format!("canonical json encode: {e}"),
135 location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
136 })?;
137 let hash_blake3 = blake3_hex(&json_bytes);
138
139 let ir = to_ir(flow)?;
140 let bundle = build_bundle_from_parts(&ir, yaml, canonical_json, hash_blake3);
141
142 Ok((bundle, ir))
143}
144
145fn build_bundle_from_parts(
146 ir: &FlowIR,
147 yaml: &str,
148 canonical_json: Value,
149 hash_blake3: String,
150) -> FlowBundle {
151 let entry = resolve_entry(ir);
152 let nodes = extract_component_pins(ir)
153 .into_iter()
154 .map(|(node_id, component)| NodeRef {
155 node_id,
156 component,
157 schema_id: None,
158 })
159 .collect();
160
161 FlowBundle {
162 id: ir.id.clone(),
163 kind: ir.flow_type.clone(),
164 entry,
165 yaml: yaml.to_string(),
166 json: canonical_json,
167 hash_blake3,
168 nodes,
169 }
170}
171
172fn resolve_entry(ir: &FlowIR) -> String {
173 if let Some(entry) = &ir.start {
174 return entry.clone();
175 }
176 if ir.nodes.contains_key("in") {
177 return "in".to_string();
178 }
179 ir.nodes.keys().next().cloned().unwrap_or_default()
180}