Skip to main content

harn_vm/provenance/
mod.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use base64::Engine;
5use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use time::OffsetDateTime;
9use uuid::Uuid;
10
11use crate::event_log::{AnyEventLog, EventId, EventLog, LogError, LogEvent};
12use crate::secrets::{SecretBytes, SecretError, SecretId, SecretProvider};
13
14pub const EVENT_PROVENANCE_SCHEMA: &str = "harn-eventlog-provenance-v1";
15pub const RECEIPT_SCHEMA: &str = "harn-provenance-receipt-v1";
16pub const HEADER_SCHEMA: &str = "harn.provenance.schema";
17pub const HEADER_PREV_HASH: &str = "harn.provenance.prev_hash";
18pub const HEADER_RECORD_HASH: &str = "harn.provenance.record_hash";
19const SIGNATURE_DOMAIN: &[u8] = b"harn provenance receipt v1\n";
20const DEFAULT_AGENT_ID: &str = "harn-cli";
21
22#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
23pub struct ProvenanceReceipt {
24    pub schema: String,
25    pub receipt_id: String,
26    #[serde(with = "time::serde::rfc3339")]
27    pub issued_at: OffsetDateTime,
28    pub producer: ReceiptProducer,
29    pub run: ReceiptRun,
30    pub event_log: ReceiptEventLog,
31    pub chain: ReceiptChain,
32    #[serde(default)]
33    pub signatures: Vec<ReceiptSignature>,
34}
35
36#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
37pub struct ReceiptProducer {
38    pub name: String,
39    pub version: String,
40}
41
42#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
43pub struct ReceiptRun {
44    pub pipeline: String,
45    pub status: String,
46    pub started_at_ms: i64,
47    pub finished_at_ms: i64,
48    pub exit_code: i32,
49}
50
51#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
52pub struct ReceiptEventLog {
53    pub backend: String,
54    pub topics: Vec<String>,
55    pub events: Vec<ReceiptEvent>,
56}
57
58#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
59pub struct ReceiptEvent {
60    pub topic: String,
61    pub event_id: EventId,
62    pub kind: String,
63    pub payload: serde_json::Value,
64    pub headers: BTreeMap<String, String>,
65    pub occurred_at_ms: i64,
66    pub prev_hash: Option<String>,
67    pub record_hash: String,
68}
69
70#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
71pub struct ReceiptChain {
72    pub algorithm: String,
73    pub event_root_hash: String,
74    pub receipt_hash: String,
75}
76
77#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
78pub struct ReceiptSignature {
79    pub algorithm: String,
80    pub key_id: String,
81    pub public_key_base64: String,
82    pub signature_base64: String,
83    #[serde(with = "time::serde::rfc3339")]
84    pub signed_at: OffsetDateTime,
85}
86
87#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
88pub struct ReceiptVerificationReport {
89    pub verified: bool,
90    pub receipt_id: Option<String>,
91    pub receipt_hash: Option<String>,
92    pub event_root_hash: Option<String>,
93    pub event_count: usize,
94    pub signature_count: usize,
95    pub errors: Vec<String>,
96}
97
/// Caller-supplied run metadata consumed by [`build_signed_receipt`].
///
/// The values are copied unchanged into the receipt's `run` and `producer` sections.
#[derive(Debug, Clone)]
pub struct ReceiptBuildOptions {
    /// Pipeline the run executed.
    pub pipeline: String,
    /// Final status string for the run.
    pub status: String,
    /// Run start time in milliseconds.
    pub started_at_ms: i64,
    /// Run finish time in milliseconds.
    pub finished_at_ms: i64,
    /// Exit code reported for the run.
    pub exit_code: i32,
    /// Name recorded as the receipt producer.
    pub producer_name: String,
    /// Version recorded for the receipt producer.
    pub producer_version: String,
}
108
109pub fn prepare_event_for_append(
110    topic: &str,
111    event_id: EventId,
112    previous_hash: Option<String>,
113    mut event: LogEvent,
114) -> Result<LogEvent, LogError> {
115    event.headers.insert(
116        HEADER_SCHEMA.to_string(),
117        EVENT_PROVENANCE_SCHEMA.to_string(),
118    );
119    match previous_hash {
120        Some(previous_hash) => {
121            event
122                .headers
123                .insert(HEADER_PREV_HASH.to_string(), previous_hash);
124        }
125        None => {
126            event.headers.remove(HEADER_PREV_HASH);
127        }
128    }
129    event.headers.remove(HEADER_RECORD_HASH);
130    let record_hash = compute_event_record_hash(topic, event_id, &event)?;
131    event
132        .headers
133        .insert(HEADER_RECORD_HASH.to_string(), record_hash);
134    Ok(event)
135}
136
137pub fn event_record_hash_from_headers(
138    topic: &str,
139    event_id: EventId,
140    event: &LogEvent,
141) -> Result<String, LogError> {
142    match event.headers.get(HEADER_RECORD_HASH) {
143        Some(hash) if !hash.trim().is_empty() => Ok(hash.clone()),
144        _ => compute_event_record_hash(topic, event_id, event),
145    }
146}
147
148pub fn compute_event_record_hash(
149    topic: &str,
150    event_id: EventId,
151    event: &LogEvent,
152) -> Result<String, LogError> {
153    let mut headers = event.headers.clone();
154    headers.remove(HEADER_RECORD_HASH);
155    let value = serde_json::json!({
156        "topic": topic,
157        "event_id": event_id,
158        "kind": event.kind,
159        "payload": event.payload,
160        "headers": headers,
161        "occurred_at_ms": event.occurred_at_ms,
162    });
163    sha256_json("event log record hash", &value)
164}
165
166pub async fn load_or_generate_agent_signing_key(
167    provider: &dyn SecretProvider,
168    agent_id: Option<&str>,
169) -> Result<(SigningKey, String), SecretError> {
170    let agent_id = agent_id
171        .filter(|value| !value.trim().is_empty())
172        .unwrap_or(DEFAULT_AGENT_ID);
173    let id = SecretId::new("provenance", format!("{agent_id}.ed25519.seed"));
174    match provider.get(&id).await {
175        Ok(secret) => {
176            let seed = secret.with_exposed(decode_seed_secret)?;
177            let signing_key = SigningKey::from_bytes(&seed);
178            let key_id = key_id_for_verifying_key(&signing_key.verifying_key());
179            Ok((signing_key, key_id))
180        }
181        Err(error) if secret_error_is_not_found(&error) => {
182            let seed: [u8; 32] = rand::random();
183            let signing_key = SigningKey::from_bytes(&seed);
184            let encoded = base64::engine::general_purpose::STANDARD.encode(seed);
185            provider.put(&id, SecretBytes::from(encoded)).await?;
186            let key_id = key_id_for_verifying_key(&signing_key.verifying_key());
187            Ok((signing_key, key_id))
188        }
189        Err(error) => Err(error),
190    }
191}
192
193pub async fn build_signed_receipt(
194    log: &Arc<AnyEventLog>,
195    options: ReceiptBuildOptions,
196    signing_key: &SigningKey,
197    key_id: String,
198) -> Result<ProvenanceReceipt, LogError> {
199    let description = log.describe();
200    let mut topics = log.topics().await?;
201    topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
202
203    let mut topic_names = Vec::with_capacity(topics.len());
204    let mut events = Vec::new();
205    for topic in topics {
206        topic_names.push(topic.as_str().to_string());
207        for (event_id, event) in log.read_range(&topic, None, usize::MAX).await? {
208            let record_hash = event_record_hash_from_headers(topic.as_str(), event_id, &event)?;
209            let prev_hash = event.headers.get(HEADER_PREV_HASH).cloned();
210            events.push(ReceiptEvent {
211                topic: topic.as_str().to_string(),
212                event_id,
213                kind: event.kind,
214                payload: event.payload,
215                headers: event.headers,
216                occurred_at_ms: event.occurred_at_ms,
217                prev_hash,
218                record_hash,
219            });
220        }
221    }
222    events.sort_by(|left, right| {
223        left.topic
224            .cmp(&right.topic)
225            .then(left.event_id.cmp(&right.event_id))
226    });
227    let event_root_hash = merkle_root(events.iter().map(|event| event.record_hash.as_str()));
228    let mut receipt = ProvenanceReceipt {
229        schema: RECEIPT_SCHEMA.to_string(),
230        receipt_id: format!("receipt-{}", Uuid::now_v7()),
231        issued_at: OffsetDateTime::now_utc(),
232        producer: ReceiptProducer {
233            name: options.producer_name,
234            version: options.producer_version,
235        },
236        run: ReceiptRun {
237            pipeline: options.pipeline,
238            status: options.status,
239            started_at_ms: options.started_at_ms,
240            finished_at_ms: options.finished_at_ms,
241            exit_code: options.exit_code,
242        },
243        event_log: ReceiptEventLog {
244            backend: description.backend.to_string(),
245            topics: topic_names,
246            events,
247        },
248        chain: ReceiptChain {
249            algorithm: "sha256/ed25519".to_string(),
250            event_root_hash,
251            receipt_hash: String::new(),
252        },
253        signatures: Vec::new(),
254    };
255    let canonical = canonical_unsigned_receipt(&receipt)?;
256    let receipt_hash = sha256_bytes_prefixed(&canonical);
257    let mut message = Vec::with_capacity(SIGNATURE_DOMAIN.len() + canonical.len());
258    message.extend_from_slice(SIGNATURE_DOMAIN);
259    message.extend_from_slice(canonical.as_bytes());
260    let signature = signing_key.sign(&message);
261    let verifying_key = signing_key.verifying_key();
262    receipt.chain.receipt_hash = receipt_hash;
263    receipt.signatures.push(ReceiptSignature {
264        algorithm: "ed25519".to_string(),
265        key_id,
266        public_key_base64: base64::engine::general_purpose::STANDARD
267            .encode(verifying_key.to_bytes()),
268        signature_base64: base64::engine::general_purpose::STANDARD.encode(signature.to_bytes()),
269        signed_at: OffsetDateTime::now_utc(),
270    });
271    Ok(receipt)
272}
273
274pub fn verify_receipt(receipt: &ProvenanceReceipt) -> ReceiptVerificationReport {
275    let mut report = ReceiptVerificationReport {
276        receipt_id: Some(receipt.receipt_id.clone()),
277        receipt_hash: Some(receipt.chain.receipt_hash.clone()),
278        event_root_hash: Some(receipt.chain.event_root_hash.clone()),
279        event_count: receipt.event_log.events.len(),
280        signature_count: receipt.signatures.len(),
281        ..ReceiptVerificationReport::default()
282    };
283
284    if receipt.schema != RECEIPT_SCHEMA {
285        report
286            .errors
287            .push(format!("unsupported receipt schema '{}'", receipt.schema));
288    }
289    verify_receipt_events(receipt, &mut report);
290    verify_receipt_hash(receipt, &mut report);
291    verify_receipt_signatures(receipt, &mut report);
292    report.verified = report.errors.is_empty();
293    report
294}
295
296fn verify_receipt_events(receipt: &ProvenanceReceipt, report: &mut ReceiptVerificationReport) {
297    let mut by_topic: HashMap<&str, Vec<&ReceiptEvent>> = HashMap::new();
298    for event in &receipt.event_log.events {
299        by_topic
300            .entry(event.topic.as_str())
301            .or_default()
302            .push(event);
303    }
304
305    let mut event_hashes = Vec::with_capacity(receipt.event_log.events.len());
306    for (topic, events) in by_topic.iter_mut() {
307        events.sort_by_key(|event| event.event_id);
308        let mut previous_hash: Option<String> = None;
309        for event in events.iter() {
310            if event.prev_hash != previous_hash {
311                report.errors.push(format!(
312                    "topic {topic} event {} prev_hash mismatch; expected {:?}, found {:?}",
313                    event.event_id, previous_hash, event.prev_hash
314                ));
315            }
316            let header_prev = event.headers.get(HEADER_PREV_HASH).cloned();
317            if header_prev != event.prev_hash {
318                report.errors.push(format!(
319                    "topic {topic} event {} prev_hash does not match provenance header",
320                    event.event_id
321                ));
322            }
323            let header_hash = event.headers.get(HEADER_RECORD_HASH);
324            if header_hash != Some(&event.record_hash) {
325                report.errors.push(format!(
326                    "topic {topic} event {} record_hash does not match provenance header",
327                    event.event_id
328                ));
329            }
330            let log_event = LogEvent {
331                kind: event.kind.clone(),
332                payload: event.payload.clone(),
333                headers: event.headers.clone(),
334                occurred_at_ms: event.occurred_at_ms,
335            };
336            match compute_event_record_hash(topic, event.event_id, &log_event) {
337                Ok(expected) if expected == event.record_hash => {}
338                Ok(expected) => report.errors.push(format!(
339                    "topic {topic} event {} record_hash mismatch; expected {expected}, found {}",
340                    event.event_id, event.record_hash
341                )),
342                Err(error) => report.errors.push(format!(
343                    "topic {topic} event {} hash error: {error}",
344                    event.event_id
345                )),
346            }
347            previous_hash = Some(event.record_hash.clone());
348            event_hashes.push((
349                event.topic.as_str(),
350                event.event_id,
351                event.record_hash.as_str(),
352            ));
353        }
354    }
355
356    event_hashes.sort_by(|left, right| left.0.cmp(right.0).then(left.1.cmp(&right.1)));
357    let expected_root = merkle_root(event_hashes.iter().map(|(_, _, hash)| *hash));
358    if expected_root != receipt.chain.event_root_hash {
359        report.errors.push(format!(
360            "event_root_hash mismatch; expected {expected_root}, found {}",
361            receipt.chain.event_root_hash
362        ));
363    }
364}
365
366fn verify_receipt_hash(receipt: &ProvenanceReceipt, report: &mut ReceiptVerificationReport) {
367    match canonical_unsigned_receipt(receipt) {
368        Ok(canonical) => {
369            let expected = sha256_bytes_prefixed(&canonical);
370            if expected != receipt.chain.receipt_hash {
371                report.errors.push(format!(
372                    "receipt_hash mismatch; expected {expected}, found {}",
373                    receipt.chain.receipt_hash
374                ));
375            }
376        }
377        Err(error) => report
378            .errors
379            .push(format!("receipt canonicalization failed: {error}")),
380    }
381}
382
383fn verify_receipt_signatures(receipt: &ProvenanceReceipt, report: &mut ReceiptVerificationReport) {
384    if receipt.signatures.is_empty() {
385        report.errors.push("receipt has no signatures".to_string());
386        return;
387    }
388    let canonical = match canonical_unsigned_receipt(receipt) {
389        Ok(canonical) => canonical,
390        Err(error) => {
391            report
392                .errors
393                .push(format!("receipt canonicalization failed: {error}"));
394            return;
395        }
396    };
397    let mut message = Vec::with_capacity(SIGNATURE_DOMAIN.len() + canonical.len());
398    message.extend_from_slice(SIGNATURE_DOMAIN);
399    message.extend_from_slice(canonical.as_bytes());
400    let mut any_valid = false;
401    for signature in &receipt.signatures {
402        if signature.algorithm != "ed25519" {
403            report.errors.push(format!(
404                "signature {} uses unsupported algorithm '{}'",
405                signature.key_id, signature.algorithm
406            ));
407            continue;
408        }
409        let public_key = match base64::engine::general_purpose::STANDARD
410            .decode(signature.public_key_base64.as_bytes())
411        {
412            Ok(bytes) => bytes,
413            Err(error) => {
414                report.errors.push(format!(
415                    "signature {} public key is not base64: {error}",
416                    signature.key_id
417                ));
418                continue;
419            }
420        };
421        let Ok(public_key) = <[u8; 32]>::try_from(public_key.as_slice()) else {
422            report.errors.push(format!(
423                "signature {} public key must be 32 bytes",
424                signature.key_id
425            ));
426            continue;
427        };
428        let verifying_key = match VerifyingKey::from_bytes(&public_key) {
429            Ok(key) => key,
430            Err(error) => {
431                report.errors.push(format!(
432                    "signature {} public key is invalid: {error}",
433                    signature.key_id
434                ));
435                continue;
436            }
437        };
438        let signature_bytes = match base64::engine::general_purpose::STANDARD
439            .decode(signature.signature_base64.as_bytes())
440        {
441            Ok(bytes) => bytes,
442            Err(error) => {
443                report.errors.push(format!(
444                    "signature {} bytes are not base64: {error}",
445                    signature.key_id
446                ));
447                continue;
448            }
449        };
450        let parsed_signature = match Signature::from_slice(&signature_bytes) {
451            Ok(signature) => signature,
452            Err(error) => {
453                report.errors.push(format!(
454                    "signature {} bytes are invalid: {error}",
455                    signature.key_id
456                ));
457                continue;
458            }
459        };
460        match verifying_key.verify(&message, &parsed_signature) {
461            Ok(()) => any_valid = true,
462            Err(error) => report.errors.push(format!(
463                "signature {} failed verification: {error}",
464                signature.key_id
465            )),
466        }
467    }
468    if !any_valid {
469        report
470            .errors
471            .push("no valid receipt signature found".to_string());
472    }
473}
474
475fn canonical_unsigned_receipt(receipt: &ProvenanceReceipt) -> Result<String, LogError> {
476    let mut unsigned = receipt.clone();
477    unsigned.chain.receipt_hash.clear();
478    unsigned.signatures.clear();
479    serde_json::to_string(&unsigned)
480        .map_err(|error| LogError::Serde(format!("receipt canonicalize error: {error}")))
481}
482
483fn sha256_json(context: &str, value: &serde_json::Value) -> Result<String, LogError> {
484    let canonical = serde_json::to_string(value)
485        .map_err(|error| LogError::Serde(format!("{context} canonicalize error: {error}")))?;
486    Ok(sha256_bytes_prefixed(&canonical))
487}
488
489fn sha256_bytes_prefixed(bytes: &str) -> String {
490    let digest = Sha256::digest(bytes.as_bytes());
491    format!("sha256:{}", hex::encode(digest))
492}
493
494fn merkle_root<'a>(hashes: impl Iterator<Item = &'a str>) -> String {
495    let mut level = hashes.map(str::to_string).collect::<Vec<_>>();
496    if level.is_empty() {
497        return sha256_bytes_prefixed("");
498    }
499    while level.len() > 1 {
500        let mut next = Vec::with_capacity(level.len().div_ceil(2));
501        for pair in level.chunks(2) {
502            let right = pair.get(1).unwrap_or(&pair[0]);
503            next.push(sha256_bytes_prefixed(&format!("{}{}", pair[0], right)));
504        }
505        level = next;
506    }
507    level.pop().expect("non-empty merkle level")
508}
509
510fn decode_seed_secret(bytes: &[u8]) -> Result<[u8; 32], SecretError> {
511    let text = std::str::from_utf8(bytes).map_err(|error| SecretError::Backend {
512        provider: "provenance".to_string(),
513        message: format!("stored Ed25519 seed is not UTF-8: {error}"),
514    })?;
515    let decoded = base64::engine::general_purpose::STANDARD
516        .decode(text.trim().as_bytes())
517        .map_err(|error| SecretError::Backend {
518            provider: "provenance".to_string(),
519            message: format!("stored Ed25519 seed is not base64: {error}"),
520        })?;
521    <[u8; 32]>::try_from(decoded.as_slice()).map_err(|_| SecretError::Backend {
522        provider: "provenance".to_string(),
523        message: "stored Ed25519 seed must decode to 32 bytes".to_string(),
524    })
525}
526
527fn key_id_for_verifying_key(key: &VerifyingKey) -> String {
528    let digest = Sha256::digest(key.to_bytes());
529    format!("ed25519:{}", hex::encode(&digest[..16]))
530}
531
532fn secret_error_is_not_found(error: &SecretError) -> bool {
533    match error {
534        SecretError::NotFound { .. } => true,
535        SecretError::All(errors) => errors.iter().all(secret_error_is_not_found),
536        _ => false,
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{EventLog, MemoryEventLog, Topic};

    /// Run metadata used when building the receipt under test.
    fn sample_options() -> ReceiptBuildOptions {
        ReceiptBuildOptions {
            pipeline: "main.harn".to_string(),
            status: "ok".to_string(),
            started_at_ms: 1,
            finished_at_ms: 2,
            exit_code: 0,
            producer_name: "harn".to_string(),
            producer_version: "test".to_string(),
        }
    }

    /// A freshly built receipt verifies, and changing an event payload afterwards is
    /// reported as a record hash mismatch.
    #[tokio::test]
    async fn receipt_verifies_and_detects_event_tamper() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
        let topic = Topic::new("run.provenance").unwrap();
        let entries = [
            ("started", serde_json::json!({"pipeline": "main"})),
            ("finished", serde_json::json!({"status": "ok"})),
        ];
        for (kind, payload) in entries {
            log.append(&topic, LogEvent::new(kind, payload))
                .await
                .unwrap();
        }

        let signing_key = SigningKey::from_bytes(&[7; 32]);
        let original = build_signed_receipt(
            &log,
            sample_options(),
            &signing_key,
            "test-key".to_string(),
        )
        .await
        .unwrap();
        assert!(verify_receipt(&original).verified);

        let mut tampered = original;
        tampered.event_log.events[0].payload = serde_json::json!({"pipeline": "other"});
        let report = verify_receipt(&tampered);
        assert!(!report.verified);
        let mentions_mismatch = report
            .errors
            .iter()
            .any(|error| error.contains("record_hash mismatch"));
        assert!(mentions_mismatch);
    }
}