Skip to main content

ergo_runtime/
provenance.rs

1use std::collections::BTreeSet;
2use std::fmt;
3
4use serde::Serialize;
5use sha2::{Digest, Sha256};
6
7use crate::cluster::{
8    Cardinality, ExpandedEndpoint, ExpandedGraph, ExpandedNode, ParameterType, ParameterValue,
9    PrimitiveCatalog, PrimitiveKind, ValueType,
10};
11
/// Selects the fingerprinting scheme used to derive a runtime provenance string.
///
/// [`RuntimeProvenanceScheme::Rpv1`] is the only scheme defined here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeProvenanceScheme {
    /// Runtime provenance scheme, version 1.
    Rpv1,
}

impl RuntimeProvenanceScheme {
    /// Returns the short lowercase tag that identifies this scheme.
    ///
    /// The match is kept exhaustive so that adding a scheme without assigning
    /// it a tag is a compile error.
    pub fn prefix(self) -> &'static str {
        match self {
            RuntimeProvenanceScheme::Rpv1 => "rpv1",
        }
    }
}
24
/// Failures that can occur while building or hashing the runtime provenance input.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeProvenanceError {
    /// The primitive catalog had no metadata for an implementation the graph uses.
    MissingPrimitiveMetadata { impl_id: String, version: String },
    /// A numeric parameter value was NaN or infinite; `context` describes which one.
    NonFiniteFloat { context: String },
    /// Serializing the provenance input failed; carries the serializer's message.
    Serialize(String),
}

impl fmt::Display for RuntimeProvenanceError {
    /// Writes a single-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Serialize(msg) => write!(f, "runtime provenance serialization failed: {msg}"),
            Self::NonFiniteFloat { context } => {
                write!(f, "non-finite float in runtime provenance ({context})")
            }
            Self::MissingPrimitiveMetadata { impl_id, version } => write!(
                f,
                "missing primitive metadata for '{}@{}'",
                impl_id, version
            ),
        }
    }
}

impl std::error::Error for RuntimeProvenanceError {}
52
53pub fn compute_runtime_provenance<C: PrimitiveCatalog>(
54    scheme: RuntimeProvenanceScheme,
55    graph_id: &str,
56    graph: &ExpandedGraph,
57    catalog: &C,
58) -> Result<String, RuntimeProvenanceError> {
59    match scheme {
60        RuntimeProvenanceScheme::Rpv1 => compute_rpv1(graph_id, graph, catalog),
61    }
62}
63
64fn compute_rpv1<C: PrimitiveCatalog>(
65    graph_id: &str,
66    graph: &ExpandedGraph,
67    catalog: &C,
68) -> Result<String, RuntimeProvenanceError> {
69    let input = RuntimeProvenanceV1Input::from_graph(graph_id, graph, catalog)?;
70    let bytes = serde_json::to_vec(&input)
71        .map_err(|err| RuntimeProvenanceError::Serialize(err.to_string()))?;
72    let mut hasher = Sha256::new();
73    hasher.update(&bytes);
74    let digest = hasher.finalize();
75    Ok(format!(
76        "{}:sha256:{}",
77        RuntimeProvenanceScheme::Rpv1.prefix(),
78        to_hex(&digest)
79    ))
80}
81
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        // High nibble first, then low nibble; each is always in 0..16.
        hex.push(char::from(DIGITS[usize::from(b >> 4)]));
        hex.push(char::from(DIGITS[usize::from(b & 0x0f)]));
    }
    hex
}
90
91#[derive(Debug, Serialize)]
92struct RuntimeProvenanceV1Input {
93    graph_id: String,
94    nodes: Vec<ProvenanceNode>,
95    edges: Vec<ProvenanceEdge>,
96    primitives: Vec<ProvenancePrimitiveMeta>,
97}
98
99impl RuntimeProvenanceV1Input {
100    fn from_graph<C: PrimitiveCatalog>(
101        graph_id: &str,
102        graph: &ExpandedGraph,
103        catalog: &C,
104    ) -> Result<Self, RuntimeProvenanceError> {
105        let mut nodes = graph.nodes.values().collect::<Vec<_>>();
106        nodes.sort_by(|a, b| a.runtime_id.cmp(&b.runtime_id));
107        let nodes = nodes
108            .into_iter()
109            .map(ProvenanceNode::from_expanded_node)
110            .collect::<Result<Vec<_>, _>>()?;
111
112        let mut edges = graph
113            .edges
114            .iter()
115            .map(ProvenanceEdge::from_expanded_edge)
116            .collect::<Vec<_>>();
117        edges.sort_by_key(|a| a.sort_key());
118
119        let mut used = BTreeSet::<(String, String)>::new();
120        for node in graph.nodes.values() {
121            used.insert((
122                node.implementation.impl_id.clone(),
123                node.implementation.version.clone(),
124            ));
125        }
126        let mut primitives = Vec::with_capacity(used.len());
127        for (impl_id, resolved_version) in used {
128            let meta = catalog.get(&impl_id, &resolved_version).ok_or_else(|| {
129                RuntimeProvenanceError::MissingPrimitiveMetadata {
130                    impl_id: impl_id.clone(),
131                    version: resolved_version.clone(),
132                }
133            })?;
134            primitives.push(ProvenancePrimitiveMeta::from_meta(
135                impl_id,
136                resolved_version,
137                &meta,
138            )?);
139        }
140
141        Ok(Self {
142            graph_id: graph_id.to_string(),
143            nodes,
144            edges,
145            primitives,
146        })
147    }
148}
149
150#[derive(Debug, Serialize)]
151struct ProvenanceNode {
152    runtime_id: String,
153    impl_id: String,
154    requested_version: String,
155    resolved_version: String,
156    parameters: Vec<ProvenanceBoundParam>,
157}
158
159impl ProvenanceNode {
160    fn from_expanded_node(node: &ExpandedNode) -> Result<Self, RuntimeProvenanceError> {
161        let mut params = node
162            .parameters
163            .iter()
164            .map(|(name, value)| ProvenanceBoundParam::new(name, value))
165            .collect::<Result<Vec<_>, _>>()?;
166        params.sort_by(|a, b| a.name.cmp(&b.name));
167        Ok(Self {
168            runtime_id: node.runtime_id.clone(),
169            impl_id: node.implementation.impl_id.clone(),
170            requested_version: node.implementation.requested_version.clone(),
171            resolved_version: node.implementation.version.clone(),
172            parameters: params,
173        })
174    }
175}
176
177#[derive(Debug, Serialize)]
178struct ProvenanceBoundParam {
179    name: String,
180    value: CanonicalParameterValue,
181}
182
183impl ProvenanceBoundParam {
184    fn new(name: &str, value: &ParameterValue) -> Result<Self, RuntimeProvenanceError> {
185        Ok(Self {
186            name: name.to_string(),
187            value: CanonicalParameterValue::from_parameter_value(
188                value,
189                format!("bound parameter '{name}'"),
190            )?,
191        })
192    }
193}
194
195#[derive(Debug, Serialize)]
196#[serde(tag = "kind", content = "value")]
197enum CanonicalParameterValue {
198    Int(i64),
199    Number(f64),
200    Bool(bool),
201    String(String),
202    Enum(String),
203}
204
205impl CanonicalParameterValue {
206    fn from_parameter_value(
207        value: &ParameterValue,
208        context: String,
209    ) -> Result<Self, RuntimeProvenanceError> {
210        Ok(match value {
211            ParameterValue::Int(v) => Self::Int(*v),
212            ParameterValue::Number(v) => {
213                if !v.is_finite() {
214                    return Err(RuntimeProvenanceError::NonFiniteFloat { context });
215                }
216                Self::Number(*v)
217            }
218            ParameterValue::Bool(v) => Self::Bool(*v),
219            ParameterValue::String(v) => Self::String(v.clone()),
220            ParameterValue::Enum(v) => Self::Enum(v.clone()),
221        })
222    }
223}
224
225#[derive(Debug, Serialize)]
226struct ProvenanceEdge {
227    from: ProvenanceEndpoint,
228    to: ProvenanceEndpoint,
229}
230
231impl ProvenanceEdge {
232    fn from_expanded_edge(edge: &crate::cluster::ExpandedEdge) -> Self {
233        Self {
234            from: ProvenanceEndpoint::from_expanded_endpoint(&edge.from),
235            to: ProvenanceEndpoint::from_expanded_endpoint(&edge.to),
236        }
237    }
238
239    fn sort_key(&self) -> (String, String) {
240        (self.from.sort_key(), self.to.sort_key())
241    }
242}
243
244#[derive(Debug, Serialize)]
245#[serde(tag = "kind")]
246enum ProvenanceEndpoint {
247    NodePort { node_id: String, port_name: String },
248    ExternalInput { name: String },
249}
250
251impl ProvenanceEndpoint {
252    fn from_expanded_endpoint(endpoint: &ExpandedEndpoint) -> Self {
253        match endpoint {
254            ExpandedEndpoint::NodePort { node_id, port_name } => Self::NodePort {
255                node_id: node_id.clone(),
256                port_name: port_name.clone(),
257            },
258            ExpandedEndpoint::ExternalInput { name } => Self::ExternalInput { name: name.clone() },
259        }
260    }
261
262    fn sort_key(&self) -> String {
263        match self {
264            Self::NodePort { node_id, port_name } => format!("node:{node_id}.{port_name}"),
265            Self::ExternalInput { name } => format!("ext:{name}"),
266        }
267    }
268}
269
270#[derive(Debug, Serialize)]
271struct ProvenancePrimitiveMeta {
272    impl_id: String,
273    resolved_version: String,
274    kind: String,
275    inputs: Vec<ProvenanceInputMeta>,
276    outputs: Vec<ProvenanceOutputMeta>,
277    parameters: Vec<ProvenanceParameterMeta>,
278}
279
280impl ProvenancePrimitiveMeta {
281    fn from_meta(
282        impl_id: String,
283        resolved_version: String,
284        meta: &crate::cluster::PrimitiveMetadata,
285    ) -> Result<Self, RuntimeProvenanceError> {
286        let mut inputs = meta
287            .inputs
288            .iter()
289            .map(|input| ProvenanceInputMeta {
290                name: input.name.clone(),
291                value_type: value_type_name(&input.value_type).to_string(),
292                required: input.required,
293            })
294            .collect::<Vec<_>>();
295        inputs.sort_by(|a, b| a.name.cmp(&b.name));
296
297        let mut outputs = meta
298            .outputs
299            .iter()
300            .map(|(name, output)| ProvenanceOutputMeta {
301                name: name.clone(),
302                value_type: value_type_name(&output.value_type).to_string(),
303                cardinality: cardinality_name(&output.cardinality).to_string(),
304            })
305            .collect::<Vec<_>>();
306        outputs.sort_by(|a, b| a.name.cmp(&b.name));
307
308        let mut parameters = meta
309            .parameters
310            .iter()
311            .map(|param| {
312                Ok(ProvenanceParameterMeta {
313                    name: param.name.clone(),
314                    ty: parameter_type_name(&param.ty).to_string(),
315                    required: param.required,
316                    default: match &param.default {
317                        Some(value) => Some(CanonicalParameterValue::from_parameter_value(
318                            value,
319                            format!("default parameter '{}'", param.name),
320                        )?),
321                        None => None,
322                    },
323                })
324            })
325            .collect::<Result<Vec<_>, RuntimeProvenanceError>>()?;
326        parameters.sort_by(|a, b| a.name.cmp(&b.name));
327
328        Ok(Self {
329            impl_id,
330            resolved_version,
331            kind: primitive_kind_name(&meta.kind).to_string(),
332            inputs,
333            outputs,
334            parameters,
335        })
336    }
337}
338
/// Serialized description of one declared input of a primitive.
///
/// `value_type` holds the textual name of the input's value type.
#[derive(Debug, Serialize)]
struct ProvenanceInputMeta {
    name: String,
    value_type: String,
    required: bool,
}
345
/// Serialized description of one declared output of a primitive.
///
/// `value_type` and `cardinality` hold the textual names of the output's
/// value type and cardinality.
#[derive(Debug, Serialize)]
struct ProvenanceOutputMeta {
    name: String,
    value_type: String,
    cardinality: String,
}
352
/// Serialized description of one declared parameter of a primitive.
///
/// `ty` holds the textual name of the parameter type; `default` is the
/// canonicalized default value, or `None` when the parameter declares none.
#[derive(Debug, Serialize)]
struct ProvenanceParameterMeta {
    name: String,
    ty: String,
    required: bool,
    default: Option<CanonicalParameterValue>,
}
360
361fn value_type_name(value: &ValueType) -> &'static str {
362    match value {
363        ValueType::Number => "Number",
364        ValueType::Series => "Series",
365        ValueType::Bool => "Bool",
366        ValueType::Event => "Event",
367        ValueType::String => "String",
368    }
369}
370
371fn cardinality_name(cardinality: &Cardinality) -> &'static str {
372    match cardinality {
373        Cardinality::Single => "Single",
374        Cardinality::Multiple => "Multiple",
375    }
376}
377
378fn parameter_type_name(ty: &ParameterType) -> &'static str {
379    match ty {
380        ParameterType::Int => "Int",
381        ParameterType::Number => "Number",
382        ParameterType::Bool => "Bool",
383        ParameterType::String => "String",
384        ParameterType::Enum => "Enum",
385    }
386}
387
388fn primitive_kind_name(kind: &PrimitiveKind) -> &'static str {
389    match kind {
390        PrimitiveKind::Source => "Source",
391        PrimitiveKind::Compute => "Compute",
392        PrimitiveKind::Trigger => "Trigger",
393        PrimitiveKind::Action => "Action",
394    }
395}
396
// Unit tests live in the separate `tests` submodule file and are compiled
// only for test builds.
#[cfg(test)]
mod tests;