1use std::collections::BTreeSet;
2use std::fmt;
3
4use serde::Serialize;
5use sha2::{Digest, Sha256};
6
7use crate::cluster::{
8 Cardinality, ExpandedEndpoint, ExpandedGraph, ExpandedNode, ParameterType, ParameterValue,
9 PrimitiveCatalog, PrimitiveKind, ValueType,
10};
11
/// Versioned scheme used to derive a runtime provenance identifier.
///
/// The scheme's name is embedded at the front of every identifier it produces
/// (see [`RuntimeProvenanceScheme::prefix`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeProvenanceScheme {
    /// Version 1: SHA-256 over the JSON serialization of the canonical graph description.
    Rpv1,
}

impl RuntimeProvenanceScheme {
    /// Short textual tag written in front of identifiers produced by this scheme (`rpv1`).
    pub fn prefix(self) -> &'static str {
        const RPV1_PREFIX: &str = "rpv1";
        match self {
            RuntimeProvenanceScheme::Rpv1 => RPV1_PREFIX,
        }
    }
}
24
/// Errors raised while computing a runtime provenance identifier.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeProvenanceError {
    /// The primitive catalog has no entry for an `(impl_id, version)` pair used by a node.
    MissingPrimitiveMetadata { impl_id: String, version: String },
    /// A numeric parameter value (bound or default) was NaN or infinite; `context` says which.
    NonFiniteFloat { context: String },
    /// Serializing the canonical provenance input failed; carries the serializer's message.
    Serialize(String),
}

impl fmt::Display for RuntimeProvenanceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MissingPrimitiveMetadata { impl_id, version } => {
                write!(f, "missing primitive metadata for '{impl_id}@{version}'")
            }
            Self::NonFiniteFloat { context } => {
                write!(f, "non-finite float in runtime provenance ({context})")
            }
            Self::Serialize(msg) => {
                write!(f, "runtime provenance serialization failed: {msg}")
            }
        }
    }
}

// No underlying `source()`: every variant carries its detail as an owned string.
impl std::error::Error for RuntimeProvenanceError {}
52
53pub fn compute_runtime_provenance<C: PrimitiveCatalog>(
54 scheme: RuntimeProvenanceScheme,
55 graph_id: &str,
56 graph: &ExpandedGraph,
57 catalog: &C,
58) -> Result<String, RuntimeProvenanceError> {
59 match scheme {
60 RuntimeProvenanceScheme::Rpv1 => compute_rpv1(graph_id, graph, catalog),
61 }
62}
63
64fn compute_rpv1<C: PrimitiveCatalog>(
65 graph_id: &str,
66 graph: &ExpandedGraph,
67 catalog: &C,
68) -> Result<String, RuntimeProvenanceError> {
69 let input = RuntimeProvenanceV1Input::from_graph(graph_id, graph, catalog)?;
70 let bytes = serde_json::to_vec(&input)
71 .map_err(|err| RuntimeProvenanceError::Serialize(err.to_string()))?;
72 let mut hasher = Sha256::new();
73 hasher.update(&bytes);
74 let digest = hasher.finalize();
75 Ok(format!(
76 "{}:sha256:{}",
77 RuntimeProvenanceScheme::Rpv1.prefix(),
78 to_hex(&digest)
79 ))
80}
81
/// Renders `bytes` as lowercase hexadecimal: two digits per byte, no separators.
fn to_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&byte| {
            [
                HEX_DIGITS[usize::from(byte >> 4)],
                HEX_DIGITS[usize::from(byte & 0x0f)],
            ]
        })
        .map(char::from)
        .collect()
}
90
/// Canonical rpv1 hashing input: the graph id, its nodes and edges, and the catalog metadata
/// of every primitive implementation the nodes use.
///
/// This value is serialized with `serde_json` and the bytes are hashed, so field names,
/// field order and the ordering of every `Vec` are part of the rpv1 identifier. Do not
/// rename or reorder fields without introducing a new scheme.
#[derive(Debug, Serialize)]
struct RuntimeProvenanceV1Input {
    /// Caller-supplied graph identifier, hashed verbatim.
    graph_id: String,
    /// Canonicalized nodes, sorted by `runtime_id`.
    nodes: Vec<ProvenanceNode>,
    /// Canonicalized edges, sorted by their endpoint sort key.
    edges: Vec<ProvenanceEdge>,
    /// Metadata for each distinct `(impl_id, resolved_version)` used by a node, ascending.
    primitives: Vec<ProvenancePrimitiveMeta>,
}
98
99impl RuntimeProvenanceV1Input {
100 fn from_graph<C: PrimitiveCatalog>(
101 graph_id: &str,
102 graph: &ExpandedGraph,
103 catalog: &C,
104 ) -> Result<Self, RuntimeProvenanceError> {
105 let mut nodes = graph.nodes.values().collect::<Vec<_>>();
106 nodes.sort_by(|a, b| a.runtime_id.cmp(&b.runtime_id));
107 let nodes = nodes
108 .into_iter()
109 .map(ProvenanceNode::from_expanded_node)
110 .collect::<Result<Vec<_>, _>>()?;
111
112 let mut edges = graph
113 .edges
114 .iter()
115 .map(ProvenanceEdge::from_expanded_edge)
116 .collect::<Vec<_>>();
117 edges.sort_by_key(|a| a.sort_key());
118
119 let mut used = BTreeSet::<(String, String)>::new();
120 for node in graph.nodes.values() {
121 used.insert((
122 node.implementation.impl_id.clone(),
123 node.implementation.version.clone(),
124 ));
125 }
126 let mut primitives = Vec::with_capacity(used.len());
127 for (impl_id, resolved_version) in used {
128 let meta = catalog.get(&impl_id, &resolved_version).ok_or_else(|| {
129 RuntimeProvenanceError::MissingPrimitiveMetadata {
130 impl_id: impl_id.clone(),
131 version: resolved_version.clone(),
132 }
133 })?;
134 primitives.push(ProvenancePrimitiveMeta::from_meta(
135 impl_id,
136 resolved_version,
137 &meta,
138 )?);
139 }
140
141 Ok(Self {
142 graph_id: graph_id.to_string(),
143 nodes,
144 edges,
145 primitives,
146 })
147 }
148}
149
/// Canonical description of one expanded node within the rpv1 hashing input.
///
/// Field names and order are part of the serialized hash preimage.
#[derive(Debug, Serialize)]
struct ProvenanceNode {
    /// The node's `runtime_id` in the expanded graph.
    runtime_id: String,
    /// Primitive implementation id (`implementation.impl_id`).
    impl_id: String,
    /// Version as requested (`implementation.requested_version`).
    requested_version: String,
    /// Version actually resolved (`implementation.version`).
    resolved_version: String,
    /// Bound parameter values, sorted by name.
    parameters: Vec<ProvenanceBoundParam>,
}
158
159impl ProvenanceNode {
160 fn from_expanded_node(node: &ExpandedNode) -> Result<Self, RuntimeProvenanceError> {
161 let mut params = node
162 .parameters
163 .iter()
164 .map(|(name, value)| ProvenanceBoundParam::new(name, value))
165 .collect::<Result<Vec<_>, _>>()?;
166 params.sort_by(|a, b| a.name.cmp(&b.name));
167 Ok(Self {
168 runtime_id: node.runtime_id.clone(),
169 impl_id: node.implementation.impl_id.clone(),
170 requested_version: node.implementation.requested_version.clone(),
171 resolved_version: node.implementation.version.clone(),
172 parameters: params,
173 })
174 }
175}
176
/// One bound parameter of a node, as it appears in the rpv1 hashing input.
///
/// Field names and order are part of the serialized hash preimage.
#[derive(Debug, Serialize)]
struct ProvenanceBoundParam {
    /// Parameter name.
    name: String,
    /// Canonicalized parameter value.
    value: CanonicalParameterValue,
}
182
183impl ProvenanceBoundParam {
184 fn new(name: &str, value: &ParameterValue) -> Result<Self, RuntimeProvenanceError> {
185 Ok(Self {
186 name: name.to_string(),
187 value: CanonicalParameterValue::from_parameter_value(
188 value,
189 format!("bound parameter '{name}'"),
190 )?,
191 })
192 }
193}
194
/// Canonical, hashable form of a parameter value.
///
/// Serialized adjacently tagged, e.g. `{"kind":"Int","value":3}`. Variant names and
/// payloads are part of the rpv1 hash preimage. Values built through
/// `CanonicalParameterValue::from_parameter_value` never hold a non-finite `Number`.
#[derive(Debug, Serialize)]
#[serde(tag = "kind", content = "value")]
enum CanonicalParameterValue {
    Int(i64),
    Number(f64),
    Bool(bool),
    String(String),
    Enum(String),
}
204
205impl CanonicalParameterValue {
206 fn from_parameter_value(
207 value: &ParameterValue,
208 context: String,
209 ) -> Result<Self, RuntimeProvenanceError> {
210 Ok(match value {
211 ParameterValue::Int(v) => Self::Int(*v),
212 ParameterValue::Number(v) => {
213 if !v.is_finite() {
214 return Err(RuntimeProvenanceError::NonFiniteFloat { context });
215 }
216 Self::Number(*v)
217 }
218 ParameterValue::Bool(v) => Self::Bool(*v),
219 ParameterValue::String(v) => Self::String(v.clone()),
220 ParameterValue::Enum(v) => Self::Enum(v.clone()),
221 })
222 }
223}
224
/// An edge in the rpv1 hashing input, described by its `from` and `to` endpoints.
///
/// Field names and order are part of the serialized hash preimage.
#[derive(Debug, Serialize)]
struct ProvenanceEdge {
    from: ProvenanceEndpoint,
    to: ProvenanceEndpoint,
}
230
231impl ProvenanceEdge {
232 fn from_expanded_edge(edge: &crate::cluster::ExpandedEdge) -> Self {
233 Self {
234 from: ProvenanceEndpoint::from_expanded_endpoint(&edge.from),
235 to: ProvenanceEndpoint::from_expanded_endpoint(&edge.to),
236 }
237 }
238
239 fn sort_key(&self) -> (String, String) {
240 (self.from.sort_key(), self.to.sort_key())
241 }
242}
243
/// One end of an edge in the rpv1 hashing input.
///
/// Serialized internally tagged, e.g. `{"kind":"NodePort","node_id":"n1","port_name":"out"}`
/// or `{"kind":"ExternalInput","name":"x"}`. Variant and field names are part of the hash
/// preimage.
#[derive(Debug, Serialize)]
#[serde(tag = "kind")]
enum ProvenanceEndpoint {
    /// A named port on a node of the expanded graph.
    NodePort { node_id: String, port_name: String },
    /// A named external input of the graph.
    ExternalInput { name: String },
}
250
251impl ProvenanceEndpoint {
252 fn from_expanded_endpoint(endpoint: &ExpandedEndpoint) -> Self {
253 match endpoint {
254 ExpandedEndpoint::NodePort { node_id, port_name } => Self::NodePort {
255 node_id: node_id.clone(),
256 port_name: port_name.clone(),
257 },
258 ExpandedEndpoint::ExternalInput { name } => Self::ExternalInput { name: name.clone() },
259 }
260 }
261
262 fn sort_key(&self) -> String {
263 match self {
264 Self::NodePort { node_id, port_name } => format!("node:{node_id}.{port_name}"),
265 Self::ExternalInput { name } => format!("ext:{name}"),
266 }
267 }
268}
269
/// Catalog metadata for one primitive implementation at its resolved version, as it
/// appears in the rpv1 hashing input.
///
/// Enum-valued attributes are stored as the fixed strings returned by the `*_name` helpers
/// in this module. Field names and order are part of the serialized hash preimage.
#[derive(Debug, Serialize)]
struct ProvenancePrimitiveMeta {
    /// Primitive implementation id.
    impl_id: String,
    /// Resolved version the metadata was looked up with.
    resolved_version: String,
    /// Primitive kind name (e.g. `"Source"`, `"Compute"`).
    kind: String,
    /// Declared inputs, sorted by name.
    inputs: Vec<ProvenanceInputMeta>,
    /// Declared outputs, sorted by name.
    outputs: Vec<ProvenanceOutputMeta>,
    /// Declared parameters, sorted by name.
    parameters: Vec<ProvenanceParameterMeta>,
}
279
280impl ProvenancePrimitiveMeta {
281 fn from_meta(
282 impl_id: String,
283 resolved_version: String,
284 meta: &crate::cluster::PrimitiveMetadata,
285 ) -> Result<Self, RuntimeProvenanceError> {
286 let mut inputs = meta
287 .inputs
288 .iter()
289 .map(|input| ProvenanceInputMeta {
290 name: input.name.clone(),
291 value_type: value_type_name(&input.value_type).to_string(),
292 required: input.required,
293 })
294 .collect::<Vec<_>>();
295 inputs.sort_by(|a, b| a.name.cmp(&b.name));
296
297 let mut outputs = meta
298 .outputs
299 .iter()
300 .map(|(name, output)| ProvenanceOutputMeta {
301 name: name.clone(),
302 value_type: value_type_name(&output.value_type).to_string(),
303 cardinality: cardinality_name(&output.cardinality).to_string(),
304 })
305 .collect::<Vec<_>>();
306 outputs.sort_by(|a, b| a.name.cmp(&b.name));
307
308 let mut parameters = meta
309 .parameters
310 .iter()
311 .map(|param| {
312 Ok(ProvenanceParameterMeta {
313 name: param.name.clone(),
314 ty: parameter_type_name(¶m.ty).to_string(),
315 required: param.required,
316 default: match ¶m.default {
317 Some(value) => Some(CanonicalParameterValue::from_parameter_value(
318 value,
319 format!("default parameter '{}'", param.name),
320 )?),
321 None => None,
322 },
323 })
324 })
325 .collect::<Result<Vec<_>, RuntimeProvenanceError>>()?;
326 parameters.sort_by(|a, b| a.name.cmp(&b.name));
327
328 Ok(Self {
329 impl_id,
330 resolved_version,
331 kind: primitive_kind_name(&meta.kind).to_string(),
332 inputs,
333 outputs,
334 parameters,
335 })
336 }
337}
338
/// A primitive's declared input, as it appears in the rpv1 hashing input.
#[derive(Debug, Serialize)]
struct ProvenanceInputMeta {
    /// Input name.
    name: String,
    /// Value type name, as returned by `value_type_name`.
    value_type: String,
    /// Whether the input is marked required in the catalog metadata.
    required: bool,
}
345
/// A primitive's declared output, as it appears in the rpv1 hashing input.
#[derive(Debug, Serialize)]
struct ProvenanceOutputMeta {
    /// Output name.
    name: String,
    /// Value type name, as returned by `value_type_name`.
    value_type: String,
    /// Cardinality name, as returned by `cardinality_name`.
    cardinality: String,
}
352
/// A primitive's declared parameter, as it appears in the rpv1 hashing input.
#[derive(Debug, Serialize)]
struct ProvenanceParameterMeta {
    /// Parameter name.
    name: String,
    /// Parameter type name, as returned by `parameter_type_name`.
    ty: String,
    /// Whether the parameter is marked required in the catalog metadata.
    required: bool,
    /// Canonicalized default value. `None` serializes as JSON `null`.
    default: Option<CanonicalParameterValue>,
}
360
361fn value_type_name(value: &ValueType) -> &'static str {
362 match value {
363 ValueType::Number => "Number",
364 ValueType::Series => "Series",
365 ValueType::Bool => "Bool",
366 ValueType::Event => "Event",
367 ValueType::String => "String",
368 }
369}
370
371fn cardinality_name(cardinality: &Cardinality) -> &'static str {
372 match cardinality {
373 Cardinality::Single => "Single",
374 Cardinality::Multiple => "Multiple",
375 }
376}
377
378fn parameter_type_name(ty: &ParameterType) -> &'static str {
379 match ty {
380 ParameterType::Int => "Int",
381 ParameterType::Number => "Number",
382 ParameterType::Bool => "Bool",
383 ParameterType::String => "String",
384 ParameterType::Enum => "Enum",
385 }
386}
387
388fn primitive_kind_name(kind: &PrimitiveKind) -> &'static str {
389 match kind {
390 PrimitiveKind::Source => "Source",
391 PrimitiveKind::Compute => "Compute",
392 PrimitiveKind::Trigger => "Trigger",
393 PrimitiveKind::Action => "Action",
394 }
395}
396
// Unit tests are declared out of line (`mod tests;`), so they live in a separate file of
// this module and are compiled only for test builds.
#[cfg(test)]
mod tests;