1use crate::{
2 error::{FlowError, FlowErrorLocation, Result},
3 loader,
4};
5use blake3::Hasher;
6use greentic_types::Flow;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::path::Path;
10
11const INLINE_SOURCE_LABEL: &str = "<inline>";
12const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
13const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greenticai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
14
15pub type NodeId = String;
16
17#[derive(Clone, Debug, Serialize, Deserialize)]
18pub struct ComponentPin {
19 pub name: String,
20 pub version_req: String,
21}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub struct NodeRef {
25 pub node_id: String,
26 pub component: ComponentPin,
27 pub schema_id: Option<String>,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowBundle {
32 pub id: String,
33 pub kind: String,
34 pub entry: String,
35 pub yaml: String,
36 pub json: Value,
37 pub hash_blake3: String,
38 pub nodes: Vec<NodeRef>,
39}
40
41pub fn canonicalize_json(value: &Value) -> Value {
43 match value {
44 Value::Object(map) => {
45 let mut keys: Vec<_> = map.keys().collect();
46 keys.sort();
47 let mut ordered = serde_json::Map::with_capacity(map.len());
48 for key in keys {
49 ordered.insert(key.clone(), canonicalize_json(&map[key]));
50 }
51 Value::Object(ordered)
52 }
53 Value::Array(items) => Value::Array(items.iter().map(canonicalize_json).collect()),
54 _ => value.clone(),
55 }
56}
57
58pub fn blake3_hex(bytes: impl AsRef<[u8]>) -> String {
60 let mut hasher = Hasher::new();
61 hasher.update(bytes.as_ref());
62 let hash = hasher.finalize();
63 hash.to_hex().to_string()
64}
65
66pub fn extract_component_pins(flow: &Flow) -> Vec<(NodeId, ComponentPin)> {
68 flow.nodes
69 .iter()
70 .map(|(node_id, node)| {
71 let component_name = if node.component.id.as_str() == "component.exec" {
72 node.component
73 .operation
74 .clone()
75 .unwrap_or_else(|| "component.exec".to_string())
76 } else {
77 node.component.id.as_str().to_string()
78 };
79 (
80 node_id.to_string(),
81 ComponentPin {
82 name: component_name,
83 version_req: "*".to_string(),
84 },
85 )
86 })
87 .collect()
88}
89
90pub fn load_and_validate_bundle(yaml: &str, source: Option<&Path>) -> Result<FlowBundle> {
92 load_and_validate_bundle_with_schema_text(
93 yaml,
94 EMBEDDED_SCHEMA,
95 DEFAULT_SCHEMA_LABEL.to_string(),
96 None,
97 source,
98 )
99 .map(|(bundle, _)| bundle)
100}
101
102pub fn load_and_validate_bundle_with_flow(
103 yaml: &str,
104 source: Option<&Path>,
105) -> Result<(FlowBundle, Flow)> {
106 load_and_validate_bundle_with_schema_text(
107 yaml,
108 EMBEDDED_SCHEMA,
109 DEFAULT_SCHEMA_LABEL.to_string(),
110 None,
111 source,
112 )
113}
114
115pub fn load_and_validate_bundle_with_schema_text(
116 yaml: &str,
117 schema_text: &str,
118 schema_label: impl Into<String>,
119 schema_path: Option<&Path>,
120 source: Option<&Path>,
121) -> Result<(FlowBundle, Flow)> {
122 let schema_label = schema_label.into();
123 let source_label = source
124 .map(|p| p.display().to_string())
125 .unwrap_or_else(|| INLINE_SOURCE_LABEL.to_string());
126
127 let flow_doc = loader::load_with_schema_text(
128 yaml,
129 schema_text,
130 schema_label,
131 schema_path,
132 source_label.clone(),
133 source,
134 )?;
135
136 let flow_json = serde_json::to_value(&flow_doc).map_err(|e| FlowError::Internal {
137 message: format!("flow serialization: {e}"),
138 location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
139 })?;
140 let canonical_json = canonicalize_json(&flow_json);
141 let json_bytes = serde_json::to_vec(&canonical_json).map_err(|e| FlowError::Internal {
142 message: format!("canonical json encode: {e}"),
143 location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
144 })?;
145 let hash_blake3 = blake3_hex(&json_bytes);
146
147 let bundle_id = flow_doc.id.clone();
148 let bundle_kind = flow_doc.flow_type.clone();
149 let bundle_entry = resolve_entry(&flow_doc);
150 let flow = crate::compile_flow(flow_doc)?;
151 let bundle = build_bundle_from_parts(
152 bundle_id,
153 bundle_kind,
154 bundle_entry,
155 &flow,
156 yaml,
157 canonical_json,
158 hash_blake3,
159 );
160
161 Ok((bundle, flow))
162}
163
164fn build_bundle_from_parts(
165 id: String,
166 kind: String,
167 entry: String,
168 flow: &Flow,
169 yaml: &str,
170 canonical_json: Value,
171 hash_blake3: String,
172) -> FlowBundle {
173 let nodes = extract_component_pins(flow)
174 .into_iter()
175 .map(|(node_id, component)| NodeRef {
176 node_id,
177 component,
178 schema_id: None,
179 })
180 .collect();
181
182 FlowBundle {
183 id,
184 kind,
185 entry,
186 yaml: yaml.to_string(),
187 json: canonical_json,
188 hash_blake3,
189 nodes,
190 }
191}
192
193fn resolve_entry(doc: &crate::model::FlowDoc) -> String {
194 if let Some(entry) = &doc.start {
195 return entry.clone();
196 }
197 if doc.nodes.contains_key("in") {
198 return "in".to_string();
199 }
200 doc.nodes.keys().next().cloned().unwrap_or_default()
201}