greentic_flow/
flow_bundle.rs

1use crate::{
2    error::{FlowError, FlowErrorLocation, Result},
3    ir::FlowIR,
4    loader, to_ir,
5};
6use blake3::Hasher;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::path::Path;
10
11const INLINE_SOURCE_LABEL: &str = "<inline>";
12const EMBEDDED_SCHEMA: &str = include_str!("../schemas/ygtc.flow.schema.json");
13const DEFAULT_SCHEMA_LABEL: &str =
14    "https://greentic-ai.github.io/greentic-flow/schemas/ygtc.flow.schema.json";
15
16pub type NodeId = String;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct ComponentPin {
20    pub name: String,
21    pub version_req: String,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct NodeRef {
26    pub node_id: String,
27    pub component: ComponentPin,
28    pub schema_id: Option<String>,
29}
30
31#[derive(Clone, Debug, Serialize, Deserialize)]
32pub struct FlowBundle {
33    pub id: String,
34    pub kind: String,
35    pub entry: String,
36    pub yaml: String,
37    pub json: Value,
38    pub hash_blake3: String,
39    pub nodes: Vec<NodeRef>,
40}
41
42/// Canonicalize a JSON value by sorting object keys recursively.
43pub fn canonicalize_json(value: &Value) -> Value {
44    match value {
45        Value::Object(map) => {
46            let mut keys: Vec<_> = map.keys().collect();
47            keys.sort();
48            let mut ordered = serde_json::Map::with_capacity(map.len());
49            for key in keys {
50                ordered.insert(key.clone(), canonicalize_json(&map[key]));
51            }
52            Value::Object(ordered)
53        }
54        Value::Array(items) => Value::Array(items.iter().map(canonicalize_json).collect()),
55        _ => value.clone(),
56    }
57}
58
59/// Compute a lowercase hex-encoded BLAKE3 hash for the provided bytes.
60pub fn blake3_hex(bytes: impl AsRef<[u8]>) -> String {
61    let mut hasher = Hasher::new();
62    hasher.update(bytes.as_ref());
63    let hash = hasher.finalize();
64    hash.to_hex().to_string()
65}
66
67/// Extract component pins from the IR.
68pub fn extract_component_pins(ir: &FlowIR) -> Vec<(NodeId, ComponentPin)> {
69    ir.nodes
70        .iter()
71        .map(|(node_id, node)| {
72            (
73                node_id.clone(),
74                ComponentPin {
75                    name: node.component.clone(),
76                    version_req: "*".to_string(),
77                },
78            )
79        })
80        .collect()
81}
82
83/// Load YAML into a canonical [`FlowBundle`] using the embedded schema.
84pub fn load_and_validate_bundle(yaml: &str, source: Option<&Path>) -> Result<FlowBundle> {
85    load_and_validate_bundle_with_schema_text(
86        yaml,
87        EMBEDDED_SCHEMA,
88        DEFAULT_SCHEMA_LABEL.to_string(),
89        None,
90        source,
91    )
92    .map(|(bundle, _)| bundle)
93}
94
95pub fn load_and_validate_bundle_with_ir(
96    yaml: &str,
97    source: Option<&Path>,
98) -> Result<(FlowBundle, FlowIR)> {
99    load_and_validate_bundle_with_schema_text(
100        yaml,
101        EMBEDDED_SCHEMA,
102        DEFAULT_SCHEMA_LABEL.to_string(),
103        None,
104        source,
105    )
106}
107
108pub fn load_and_validate_bundle_with_schema_text(
109    yaml: &str,
110    schema_text: &str,
111    schema_label: impl Into<String>,
112    schema_path: Option<&Path>,
113    source: Option<&Path>,
114) -> Result<(FlowBundle, FlowIR)> {
115    let schema_label = schema_label.into();
116    let source_label = source
117        .map(|p| p.display().to_string())
118        .unwrap_or_else(|| INLINE_SOURCE_LABEL.to_string());
119
120    let flow = loader::load_with_schema_text(
121        yaml,
122        schema_text,
123        schema_label,
124        schema_path,
125        source_label.clone(),
126        source,
127    )?;
128
129    let flow_json = serde_json::to_value(&flow).map_err(|e| FlowError::Internal {
130        message: format!("flow serialization: {e}"),
131        location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
132    })?;
133    let canonical_json = canonicalize_json(&flow_json);
134    let json_bytes = serde_json::to_vec(&canonical_json).map_err(|e| FlowError::Internal {
135        message: format!("canonical json encode: {e}"),
136        location: FlowErrorLocation::at_path(source_label.clone()).with_source_path(source),
137    })?;
138    let hash_blake3 = blake3_hex(&json_bytes);
139
140    let ir = to_ir(flow)?;
141    let bundle = build_bundle_from_parts(&ir, yaml, canonical_json, hash_blake3);
142
143    Ok((bundle, ir))
144}
145
146fn build_bundle_from_parts(
147    ir: &FlowIR,
148    yaml: &str,
149    canonical_json: Value,
150    hash_blake3: String,
151) -> FlowBundle {
152    let entry = resolve_entry(ir);
153    let nodes = extract_component_pins(ir)
154        .into_iter()
155        .map(|(node_id, component)| NodeRef {
156            node_id,
157            component,
158            schema_id: None,
159        })
160        .collect();
161
162    FlowBundle {
163        id: ir.id.clone(),
164        kind: ir.flow_type.clone(),
165        entry,
166        yaml: yaml.to_string(),
167        json: canonical_json,
168        hash_blake3,
169        nodes,
170    }
171}
172
173fn resolve_entry(ir: &FlowIR) -> String {
174    if let Some(entry) = &ir.start {
175        return entry.clone();
176    }
177    if ir.nodes.contains_key("in") {
178        return "in".to_string();
179    }
180    ir.nodes.keys().next().cloned().unwrap_or_default()
181}