Skip to main content

greentic_flow/
flow_bundle.rs

1use crate::{
2    error::{FlowError, FlowErrorLocation, Result},
3    loader,
4};
5use blake3::Hasher;
6use greentic_types::Flow;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::path::Path;
10
11const INLINE_SOURCE_LABEL: &str = "<inline>";
12const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
13const DEFAULT_SCHEMA_LABEL: &str = "https://raw.githubusercontent.com/greenticai/greentic-flow/refs/heads/master/schemas/ygtc.flow.schema.json";
14
15pub type NodeId = String;
16
17#[derive(Clone, Debug, Serialize, Deserialize)]
18pub struct ComponentPin {
19    pub name: String,
20    pub version_req: String,
21}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub struct NodeRef {
25    pub node_id: String,
26    pub component: ComponentPin,
27    pub schema_id: Option<String>,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowBundle {
32    pub id: String,
33    pub kind: String,
34    pub entry: String,
35    pub yaml: String,
36    pub json: Value,
37    pub hash_blake3: String,
38    pub nodes: Vec<NodeRef>,
39}
40
41/// Canonicalize a JSON value by sorting object keys recursively.
42pub fn canonicalize_json(value: &Value) -> Value {
43    match value {
44        Value::Object(map) => {
45            let mut keys: Vec<_> = map.keys().collect();
46            keys.sort();
47            let mut ordered = serde_json::Map::with_capacity(map.len());
48            for key in keys {
49                ordered.insert(key.clone(), canonicalize_json(&map[key]));
50            }
51            Value::Object(ordered)
52        }
53        Value::Array(items) => Value::Array(items.iter().map(canonicalize_json).collect()),
54        _ => value.clone(),
55    }
56}
57
58/// Compute a lowercase hex-encoded BLAKE3 hash for the provided bytes.
59pub fn blake3_hex(bytes: impl AsRef<[u8]>) -> String {
60    let mut hasher = Hasher::new();
61    hasher.update(bytes.as_ref());
62    let hash = hasher.finalize();
63    hash.to_hex().to_string()
64}
65
66/// Extract component pins from the IR.
67pub fn extract_component_pins(flow: &Flow) -> Vec<(NodeId, ComponentPin)> {
68    flow.nodes
69        .iter()
70        .map(|(node_id, node)| {
71            let component_name = if node.component.id.as_str() == "component.exec" {
72                node.component
73                    .operation
74                    .clone()
75                    .unwrap_or_else(|| "component.exec".to_string())
76            } else {
77                node.component.id.as_str().to_string()
78            };
79            (
80                node_id.to_string(),
81                ComponentPin {
82                    name: component_name,
83                    version_req: "*".to_string(),
84                },
85            )
86        })
87        .collect()
88}
89
90/// Load YAML into a canonical [`FlowBundle`] using the embedded schema.
91pub fn load_and_validate_bundle(yaml: &str, source: Option<&Path>) -> Result<FlowBundle> {
92    load_and_validate_bundle_with_schema_text(
93        yaml,
94        EMBEDDED_SCHEMA,
95        DEFAULT_SCHEMA_LABEL.to_string(),
96        None,
97        source,
98    )
99    .map(|(bundle, _)| bundle)
100}
101
102pub fn load_and_validate_bundle_with_flow(
103    yaml: &str,
104    source: Option<&Path>,
105) -> Result<(FlowBundle, Flow)> {
106    load_and_validate_bundle_with_schema_text(
107        yaml,
108        EMBEDDED_SCHEMA,
109        DEFAULT_SCHEMA_LABEL.to_string(),
110        None,
111        source,
112    )
113}
114
115pub fn load_and_validate_bundle_with_schema_text(
116    yaml: &str,
117    schema_text: &str,
118    schema_label: impl Into<String>,
119    schema_path: Option<&Path>,
120    source: Option<&Path>,
121) -> Result<(FlowBundle, Flow)> {
122    let schema_label = schema_label.into();
123    let source_label = source
124        .map(|p| p.display().to_string())
125        .unwrap_or_else(|| INLINE_SOURCE_LABEL.to_string());
126
127    let flow_doc = loader::load_with_schema_text(
128        yaml,
129        schema_text,
130        schema_label,
131        schema_path,
132        source_label.clone(),
133        source,
134    )?;
135
136    let flow_json = serde_json::to_value(&flow_doc).map_err(|e| FlowError::Internal {
137        message: format!("flow serialization: {e}"),
138        location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
139    })?;
140    let canonical_json = canonicalize_json(&flow_json);
141    let json_bytes = serde_json::to_vec(&canonical_json).map_err(|e| FlowError::Internal {
142        message: format!("canonical json encode: {e}"),
143        location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
144    })?;
145    let hash_blake3 = blake3_hex(&json_bytes);
146
147    let bundle_id = flow_doc.id.clone();
148    let bundle_kind = flow_doc.flow_type.clone();
149    let bundle_entry = resolve_entry(&flow_doc);
150    let flow = crate::compile_flow(flow_doc)?;
151    let bundle = build_bundle_from_parts(
152        bundle_id,
153        bundle_kind,
154        bundle_entry,
155        &flow,
156        yaml,
157        canonical_json,
158        hash_blake3,
159    );
160
161    Ok((bundle, flow))
162}
163
164fn build_bundle_from_parts(
165    id: String,
166    kind: String,
167    entry: String,
168    flow: &Flow,
169    yaml: &str,
170    canonical_json: Value,
171    hash_blake3: String,
172) -> FlowBundle {
173    let nodes = extract_component_pins(flow)
174        .into_iter()
175        .map(|(node_id, component)| NodeRef {
176            node_id,
177            component,
178            schema_id: None,
179        })
180        .collect();
181
182    FlowBundle {
183        id,
184        kind,
185        entry,
186        yaml: yaml.to_string(),
187        json: canonical_json,
188        hash_blake3,
189        nodes,
190    }
191}
192
193fn resolve_entry(doc: &crate::model::FlowDoc) -> String {
194    if let Some(entry) = &doc.start {
195        return entry.clone();
196    }
197    if doc.nodes.contains_key("in") {
198        return "in".to_string();
199    }
200    doc.nodes.keys().next().cloned().unwrap_or_default()
201}