1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use base64::Engine;
5use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use time::OffsetDateTime;
9use uuid::Uuid;
10
11use crate::event_log::{AnyEventLog, EventId, EventLog, LogError, LogEvent};
12use crate::secrets::{SecretBytes, SecretError, SecretId, SecretProvider};
13
/// Schema identifier written under [`HEADER_SCHEMA`] on every event prepared for append.
pub const EVENT_PROVENANCE_SCHEMA: &str = "harn-eventlog-provenance-v1";
/// Schema identifier stored in a receipt's `schema` field; verification rejects any other value.
pub const RECEIPT_SCHEMA: &str = "harn-provenance-receipt-v1";
/// Event header key that carries the provenance schema identifier.
pub const HEADER_SCHEMA: &str = "harn.provenance.schema";
/// Event header key that carries the record hash of the previous event; absent when there is none.
pub const HEADER_PREV_HASH: &str = "harn.provenance.prev_hash";
/// Event header key that carries the event's own record hash.
pub const HEADER_RECORD_HASH: &str = "harn.provenance.record_hash";
/// Domain-separation prefix placed in front of the canonical receipt JSON before signing/verifying.
const SIGNATURE_DOMAIN: &[u8] = b"harn provenance receipt v1\n";
/// Agent id used to name the signing-key secret when the caller passes none or a blank one.
const DEFAULT_AGENT_ID: &str = "harn-cli";
21
/// Signed receipt describing a run and the event-log records captured for it.
///
/// The form that is hashed and signed is the `serde_json` serialization of this
/// struct with `chain.receipt_hash` and `signatures` cleared, so the field
/// declaration order below is part of the wire format.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProvenanceReceipt {
    /// Receipt schema identifier; the verifier only accepts [`RECEIPT_SCHEMA`].
    pub schema: String,
    /// Identifier of this receipt; the builder generates `receipt-<uuid v7>`.
    pub receipt_id: String,
    /// Time the receipt was built, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub issued_at: OffsetDateTime,
    /// Name and version of the tool that produced the receipt.
    pub producer: ReceiptProducer,
    /// Summary of the run the receipt covers.
    pub run: ReceiptRun,
    /// Snapshot of the event log contents included in the receipt.
    pub event_log: ReceiptEventLog,
    /// Hashes tying the events and the receipt together.
    pub chain: ReceiptChain,
    /// Signatures over the canonical unsigned receipt; defaults to empty when
    /// the field is missing on deserialization.
    #[serde(default)]
    pub signatures: Vec<ReceiptSignature>,
}
35
/// Identifies the software that produced a receipt.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReceiptProducer {
    /// Producer name, copied from `ReceiptBuildOptions::producer_name`.
    pub name: String,
    /// Producer version, copied from `ReceiptBuildOptions::producer_version`.
    pub version: String,
}
41
/// Summary of the run a receipt covers; every field is copied verbatim from
/// the matching field of `ReceiptBuildOptions`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReceiptRun {
    /// Pipeline identifier supplied by the caller.
    pub pipeline: String,
    /// Final status string supplied by the caller.
    pub status: String,
    /// Run start timestamp in milliseconds (epoch not visible here — confirm with callers).
    pub started_at_ms: i64,
    /// Run end timestamp in milliseconds (same clock as `started_at_ms`, presumably).
    pub finished_at_ms: i64,
    /// Exit code supplied by the caller.
    pub exit_code: i32,
}
50
/// Snapshot of the event log embedded in a receipt.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReceiptEventLog {
    /// Backend name taken from the log's description.
    pub backend: String,
    /// Names of all topics in the log; the builder stores them sorted.
    pub topics: Vec<String>,
    /// Every event read from those topics; the builder stores them sorted by
    /// topic and then by event id.
    pub events: Vec<ReceiptEvent>,
}
57
/// One event-log record as captured in a receipt, together with its chain hashes.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReceiptEvent {
    /// Topic the event was read from.
    pub topic: String,
    /// Id of the event within its topic.
    pub event_id: EventId,
    /// Event kind, copied from the log event.
    pub kind: String,
    /// Event payload, copied from the log event.
    pub payload: serde_json::Value,
    /// Event headers, including the provenance headers the verifier cross-checks.
    pub headers: BTreeMap<String, String>,
    /// Event timestamp in milliseconds, copied from the log event.
    pub occurred_at_ms: i64,
    /// Copy of the `HEADER_PREV_HASH` header; the verifier requires it to equal
    /// the record hash of the preceding event in the same topic (`None` for the first).
    pub prev_hash: Option<String>,
    /// Record hash of this event; the verifier recomputes it and also compares
    /// it with the `HEADER_RECORD_HASH` header.
    pub record_hash: String,
}
69
/// Hashes that bind a receipt to its events and to its own contents.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReceiptChain {
    /// Algorithm label; the builder writes `"sha256/ed25519"`.
    pub algorithm: String,
    /// Merkle root over the events' record hashes, ordered by topic then event id.
    pub event_root_hash: String,
    /// `sha256:`-prefixed hash of the canonical unsigned receipt. It is blanked
    /// before canonicalization, so it does not cover itself.
    pub receipt_hash: String,
}
76
/// A detached signature over the canonical unsigned receipt.
///
/// The whole `signatures` list is cleared before canonicalization, so nothing
/// in this struct (including `signed_at`) is covered by the signature itself.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReceiptSignature {
    /// Signature algorithm; the verifier only accepts `"ed25519"`.
    pub algorithm: String,
    /// Caller-supplied label for the signing key; used in verification error messages.
    pub key_id: String,
    /// Standard-alphabet base64 of the 32-byte verifying key.
    pub public_key_base64: String,
    /// Standard-alphabet base64 of the signature bytes.
    pub signature_base64: String,
    /// Time the signature was produced, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub signed_at: OffsetDateTime,
}
86
/// Outcome of verifying a receipt.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReceiptVerificationReport {
    /// `true` exactly when `errors` is empty after all checks have run.
    pub verified: bool,
    /// Receipt id echoed from the receipt under verification.
    pub receipt_id: Option<String>,
    /// Receipt hash echoed from the receipt (as claimed, not recomputed).
    pub receipt_hash: Option<String>,
    /// Event root hash echoed from the receipt (as claimed, not recomputed).
    pub event_root_hash: Option<String>,
    /// Number of events contained in the receipt.
    pub event_count: usize,
    /// Number of signatures attached to the receipt.
    pub signature_count: usize,
    /// Human-readable description of every failed check.
    pub errors: Vec<String>,
}
97
/// Caller-supplied metadata for building a receipt; the builder copies these
/// values into the receipt's `run` and `producer` sections unchanged.
#[derive(Clone, Debug)]
pub struct ReceiptBuildOptions {
    /// Pipeline identifier recorded in the receipt's run section.
    pub pipeline: String,
    /// Final run status recorded in the receipt's run section.
    pub status: String,
    /// Run start timestamp in milliseconds.
    pub started_at_ms: i64,
    /// Run end timestamp in milliseconds.
    pub finished_at_ms: i64,
    /// Exit code of the run.
    pub exit_code: i32,
    /// Name recorded as the receipt producer.
    pub producer_name: String,
    /// Version recorded for the receipt producer.
    pub producer_version: String,
}
108
109pub fn prepare_event_for_append(
110 topic: &str,
111 event_id: EventId,
112 previous_hash: Option<String>,
113 mut event: LogEvent,
114) -> Result<LogEvent, LogError> {
115 event.headers.insert(
116 HEADER_SCHEMA.to_string(),
117 EVENT_PROVENANCE_SCHEMA.to_string(),
118 );
119 match previous_hash {
120 Some(previous_hash) => {
121 event
122 .headers
123 .insert(HEADER_PREV_HASH.to_string(), previous_hash);
124 }
125 None => {
126 event.headers.remove(HEADER_PREV_HASH);
127 }
128 }
129 event.headers.remove(HEADER_RECORD_HASH);
130 let record_hash = compute_event_record_hash(topic, event_id, &event)?;
131 event
132 .headers
133 .insert(HEADER_RECORD_HASH.to_string(), record_hash);
134 Ok(event)
135}
136
137pub fn event_record_hash_from_headers(
138 topic: &str,
139 event_id: EventId,
140 event: &LogEvent,
141) -> Result<String, LogError> {
142 match event.headers.get(HEADER_RECORD_HASH) {
143 Some(hash) if !hash.trim().is_empty() => Ok(hash.clone()),
144 _ => compute_event_record_hash(topic, event_id, event),
145 }
146}
147
148pub fn compute_event_record_hash(
149 topic: &str,
150 event_id: EventId,
151 event: &LogEvent,
152) -> Result<String, LogError> {
153 let mut headers = event.headers.clone();
154 headers.remove(HEADER_RECORD_HASH);
155 let value = serde_json::json!({
156 "topic": topic,
157 "event_id": event_id,
158 "kind": event.kind,
159 "payload": event.payload,
160 "headers": headers,
161 "occurred_at_ms": event.occurred_at_ms,
162 });
163 sha256_json("event log record hash", &value)
164}
165
166pub async fn load_or_generate_agent_signing_key(
167 provider: &dyn SecretProvider,
168 agent_id: Option<&str>,
169) -> Result<(SigningKey, String), SecretError> {
170 let agent_id = agent_id
171 .filter(|value| !value.trim().is_empty())
172 .unwrap_or(DEFAULT_AGENT_ID);
173 let id = SecretId::new("provenance", format!("{agent_id}.ed25519.seed"));
174 match provider.get(&id).await {
175 Ok(secret) => {
176 let seed = secret.with_exposed(decode_seed_secret)?;
177 let signing_key = SigningKey::from_bytes(&seed);
178 let key_id = key_id_for_verifying_key(&signing_key.verifying_key());
179 Ok((signing_key, key_id))
180 }
181 Err(error) if secret_error_is_not_found(&error) => {
182 let seed: [u8; 32] = rand::random();
183 let signing_key = SigningKey::from_bytes(&seed);
184 let encoded = base64::engine::general_purpose::STANDARD.encode(seed);
185 provider.put(&id, SecretBytes::from(encoded)).await?;
186 let key_id = key_id_for_verifying_key(&signing_key.verifying_key());
187 Ok((signing_key, key_id))
188 }
189 Err(error) => Err(error),
190 }
191}
192
193pub async fn build_signed_receipt(
194 log: &Arc<AnyEventLog>,
195 options: ReceiptBuildOptions,
196 signing_key: &SigningKey,
197 key_id: String,
198) -> Result<ProvenanceReceipt, LogError> {
199 let description = log.describe();
200 let mut topics = log.topics().await?;
201 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
202
203 let mut topic_names = Vec::with_capacity(topics.len());
204 let mut events = Vec::new();
205 for topic in topics {
206 topic_names.push(topic.as_str().to_string());
207 for (event_id, event) in log.read_range(&topic, None, usize::MAX).await? {
208 let record_hash = event_record_hash_from_headers(topic.as_str(), event_id, &event)?;
209 let prev_hash = event.headers.get(HEADER_PREV_HASH).cloned();
210 events.push(ReceiptEvent {
211 topic: topic.as_str().to_string(),
212 event_id,
213 kind: event.kind,
214 payload: event.payload,
215 headers: event.headers,
216 occurred_at_ms: event.occurred_at_ms,
217 prev_hash,
218 record_hash,
219 });
220 }
221 }
222 events.sort_by(|left, right| {
223 left.topic
224 .cmp(&right.topic)
225 .then(left.event_id.cmp(&right.event_id))
226 });
227 let event_root_hash = merkle_root(events.iter().map(|event| event.record_hash.as_str()));
228 let mut receipt = ProvenanceReceipt {
229 schema: RECEIPT_SCHEMA.to_string(),
230 receipt_id: format!("receipt-{}", Uuid::now_v7()),
231 issued_at: OffsetDateTime::now_utc(),
232 producer: ReceiptProducer {
233 name: options.producer_name,
234 version: options.producer_version,
235 },
236 run: ReceiptRun {
237 pipeline: options.pipeline,
238 status: options.status,
239 started_at_ms: options.started_at_ms,
240 finished_at_ms: options.finished_at_ms,
241 exit_code: options.exit_code,
242 },
243 event_log: ReceiptEventLog {
244 backend: description.backend.to_string(),
245 topics: topic_names,
246 events,
247 },
248 chain: ReceiptChain {
249 algorithm: "sha256/ed25519".to_string(),
250 event_root_hash,
251 receipt_hash: String::new(),
252 },
253 signatures: Vec::new(),
254 };
255 let canonical = canonical_unsigned_receipt(&receipt)?;
256 let receipt_hash = sha256_bytes_prefixed(&canonical);
257 let mut message = Vec::with_capacity(SIGNATURE_DOMAIN.len() + canonical.len());
258 message.extend_from_slice(SIGNATURE_DOMAIN);
259 message.extend_from_slice(canonical.as_bytes());
260 let signature = signing_key.sign(&message);
261 let verifying_key = signing_key.verifying_key();
262 receipt.chain.receipt_hash = receipt_hash;
263 receipt.signatures.push(ReceiptSignature {
264 algorithm: "ed25519".to_string(),
265 key_id,
266 public_key_base64: base64::engine::general_purpose::STANDARD
267 .encode(verifying_key.to_bytes()),
268 signature_base64: base64::engine::general_purpose::STANDARD.encode(signature.to_bytes()),
269 signed_at: OffsetDateTime::now_utc(),
270 });
271 Ok(receipt)
272}
273
274pub fn verify_receipt(receipt: &ProvenanceReceipt) -> ReceiptVerificationReport {
275 let mut report = ReceiptVerificationReport {
276 receipt_id: Some(receipt.receipt_id.clone()),
277 receipt_hash: Some(receipt.chain.receipt_hash.clone()),
278 event_root_hash: Some(receipt.chain.event_root_hash.clone()),
279 event_count: receipt.event_log.events.len(),
280 signature_count: receipt.signatures.len(),
281 ..ReceiptVerificationReport::default()
282 };
283
284 if receipt.schema != RECEIPT_SCHEMA {
285 report
286 .errors
287 .push(format!("unsupported receipt schema '{}'", receipt.schema));
288 }
289 verify_receipt_events(receipt, &mut report);
290 verify_receipt_hash(receipt, &mut report);
291 verify_receipt_signatures(receipt, &mut report);
292 report.verified = report.errors.is_empty();
293 report
294}
295
296fn verify_receipt_events(receipt: &ProvenanceReceipt, report: &mut ReceiptVerificationReport) {
297 let mut by_topic: HashMap<&str, Vec<&ReceiptEvent>> = HashMap::new();
298 for event in &receipt.event_log.events {
299 by_topic
300 .entry(event.topic.as_str())
301 .or_default()
302 .push(event);
303 }
304
305 let mut event_hashes = Vec::with_capacity(receipt.event_log.events.len());
306 for (topic, events) in by_topic.iter_mut() {
307 events.sort_by_key(|event| event.event_id);
308 let mut previous_hash: Option<String> = None;
309 for event in events.iter() {
310 if event.prev_hash != previous_hash {
311 report.errors.push(format!(
312 "topic {topic} event {} prev_hash mismatch; expected {:?}, found {:?}",
313 event.event_id, previous_hash, event.prev_hash
314 ));
315 }
316 let header_prev = event.headers.get(HEADER_PREV_HASH).cloned();
317 if header_prev != event.prev_hash {
318 report.errors.push(format!(
319 "topic {topic} event {} prev_hash does not match provenance header",
320 event.event_id
321 ));
322 }
323 let header_hash = event.headers.get(HEADER_RECORD_HASH);
324 if header_hash != Some(&event.record_hash) {
325 report.errors.push(format!(
326 "topic {topic} event {} record_hash does not match provenance header",
327 event.event_id
328 ));
329 }
330 let log_event = LogEvent {
331 kind: event.kind.clone(),
332 payload: event.payload.clone(),
333 headers: event.headers.clone(),
334 occurred_at_ms: event.occurred_at_ms,
335 };
336 match compute_event_record_hash(topic, event.event_id, &log_event) {
337 Ok(expected) if expected == event.record_hash => {}
338 Ok(expected) => report.errors.push(format!(
339 "topic {topic} event {} record_hash mismatch; expected {expected}, found {}",
340 event.event_id, event.record_hash
341 )),
342 Err(error) => report.errors.push(format!(
343 "topic {topic} event {} hash error: {error}",
344 event.event_id
345 )),
346 }
347 previous_hash = Some(event.record_hash.clone());
348 event_hashes.push((
349 event.topic.as_str(),
350 event.event_id,
351 event.record_hash.as_str(),
352 ));
353 }
354 }
355
356 event_hashes.sort_by(|left, right| left.0.cmp(right.0).then(left.1.cmp(&right.1)));
357 let expected_root = merkle_root(event_hashes.iter().map(|(_, _, hash)| *hash));
358 if expected_root != receipt.chain.event_root_hash {
359 report.errors.push(format!(
360 "event_root_hash mismatch; expected {expected_root}, found {}",
361 receipt.chain.event_root_hash
362 ));
363 }
364}
365
366fn verify_receipt_hash(receipt: &ProvenanceReceipt, report: &mut ReceiptVerificationReport) {
367 match canonical_unsigned_receipt(receipt) {
368 Ok(canonical) => {
369 let expected = sha256_bytes_prefixed(&canonical);
370 if expected != receipt.chain.receipt_hash {
371 report.errors.push(format!(
372 "receipt_hash mismatch; expected {expected}, found {}",
373 receipt.chain.receipt_hash
374 ));
375 }
376 }
377 Err(error) => report
378 .errors
379 .push(format!("receipt canonicalization failed: {error}")),
380 }
381}
382
383fn verify_receipt_signatures(receipt: &ProvenanceReceipt, report: &mut ReceiptVerificationReport) {
384 if receipt.signatures.is_empty() {
385 report.errors.push("receipt has no signatures".to_string());
386 return;
387 }
388 let canonical = match canonical_unsigned_receipt(receipt) {
389 Ok(canonical) => canonical,
390 Err(error) => {
391 report
392 .errors
393 .push(format!("receipt canonicalization failed: {error}"));
394 return;
395 }
396 };
397 let mut message = Vec::with_capacity(SIGNATURE_DOMAIN.len() + canonical.len());
398 message.extend_from_slice(SIGNATURE_DOMAIN);
399 message.extend_from_slice(canonical.as_bytes());
400 let mut any_valid = false;
401 for signature in &receipt.signatures {
402 if signature.algorithm != "ed25519" {
403 report.errors.push(format!(
404 "signature {} uses unsupported algorithm '{}'",
405 signature.key_id, signature.algorithm
406 ));
407 continue;
408 }
409 let public_key = match base64::engine::general_purpose::STANDARD
410 .decode(signature.public_key_base64.as_bytes())
411 {
412 Ok(bytes) => bytes,
413 Err(error) => {
414 report.errors.push(format!(
415 "signature {} public key is not base64: {error}",
416 signature.key_id
417 ));
418 continue;
419 }
420 };
421 let Ok(public_key) = <[u8; 32]>::try_from(public_key.as_slice()) else {
422 report.errors.push(format!(
423 "signature {} public key must be 32 bytes",
424 signature.key_id
425 ));
426 continue;
427 };
428 let verifying_key = match VerifyingKey::from_bytes(&public_key) {
429 Ok(key) => key,
430 Err(error) => {
431 report.errors.push(format!(
432 "signature {} public key is invalid: {error}",
433 signature.key_id
434 ));
435 continue;
436 }
437 };
438 let signature_bytes = match base64::engine::general_purpose::STANDARD
439 .decode(signature.signature_base64.as_bytes())
440 {
441 Ok(bytes) => bytes,
442 Err(error) => {
443 report.errors.push(format!(
444 "signature {} bytes are not base64: {error}",
445 signature.key_id
446 ));
447 continue;
448 }
449 };
450 let parsed_signature = match Signature::from_slice(&signature_bytes) {
451 Ok(signature) => signature,
452 Err(error) => {
453 report.errors.push(format!(
454 "signature {} bytes are invalid: {error}",
455 signature.key_id
456 ));
457 continue;
458 }
459 };
460 match verifying_key.verify(&message, &parsed_signature) {
461 Ok(()) => any_valid = true,
462 Err(error) => report.errors.push(format!(
463 "signature {} failed verification: {error}",
464 signature.key_id
465 )),
466 }
467 }
468 if !any_valid {
469 report
470 .errors
471 .push("no valid receipt signature found".to_string());
472 }
473}
474
475fn canonical_unsigned_receipt(receipt: &ProvenanceReceipt) -> Result<String, LogError> {
476 let mut unsigned = receipt.clone();
477 unsigned.chain.receipt_hash.clear();
478 unsigned.signatures.clear();
479 serde_json::to_string(&unsigned)
480 .map_err(|error| LogError::Serde(format!("receipt canonicalize error: {error}")))
481}
482
483fn sha256_json(context: &str, value: &serde_json::Value) -> Result<String, LogError> {
484 let canonical = serde_json::to_string(value)
485 .map_err(|error| LogError::Serde(format!("{context} canonicalize error: {error}")))?;
486 Ok(sha256_bytes_prefixed(&canonical))
487}
488
489fn sha256_bytes_prefixed(bytes: &str) -> String {
490 let digest = Sha256::digest(bytes.as_bytes());
491 format!("sha256:{}", hex::encode(digest))
492}
493
494fn merkle_root<'a>(hashes: impl Iterator<Item = &'a str>) -> String {
495 let mut level = hashes.map(str::to_string).collect::<Vec<_>>();
496 if level.is_empty() {
497 return sha256_bytes_prefixed("");
498 }
499 while level.len() > 1 {
500 let mut next = Vec::with_capacity(level.len().div_ceil(2));
501 for pair in level.chunks(2) {
502 let right = pair.get(1).unwrap_or(&pair[0]);
503 next.push(sha256_bytes_prefixed(&format!("{}{}", pair[0], right)));
504 }
505 level = next;
506 }
507 level.pop().expect("non-empty merkle level")
508}
509
510fn decode_seed_secret(bytes: &[u8]) -> Result<[u8; 32], SecretError> {
511 let text = std::str::from_utf8(bytes).map_err(|error| SecretError::Backend {
512 provider: "provenance".to_string(),
513 message: format!("stored Ed25519 seed is not UTF-8: {error}"),
514 })?;
515 let decoded = base64::engine::general_purpose::STANDARD
516 .decode(text.trim().as_bytes())
517 .map_err(|error| SecretError::Backend {
518 provider: "provenance".to_string(),
519 message: format!("stored Ed25519 seed is not base64: {error}"),
520 })?;
521 <[u8; 32]>::try_from(decoded.as_slice()).map_err(|_| SecretError::Backend {
522 provider: "provenance".to_string(),
523 message: "stored Ed25519 seed must decode to 32 bytes".to_string(),
524 })
525}
526
527fn key_id_for_verifying_key(key: &VerifyingKey) -> String {
528 let digest = Sha256::digest(key.to_bytes());
529 format!("ed25519:{}", hex::encode(&digest[..16]))
530}
531
532fn secret_error_is_not_found(error: &SecretError) -> bool {
533 match error {
534 SecretError::NotFound { .. } => true,
535 SecretError::All(errors) => errors.iter().all(secret_error_is_not_found),
536 _ => false,
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{EventLog, MemoryEventLog, Topic};

    /// A freshly built receipt verifies; changing an event payload afterwards
    /// is reported as a record hash mismatch.
    #[tokio::test]
    async fn receipt_verifies_and_detects_event_tamper() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
        let topic = Topic::new("run.provenance").unwrap();
        let fixtures = [
            ("started", serde_json::json!({"pipeline": "main"})),
            ("finished", serde_json::json!({"status": "ok"})),
        ];
        for (kind, payload) in fixtures {
            log.append(&topic, LogEvent::new(kind, payload))
                .await
                .unwrap();
        }

        let options = ReceiptBuildOptions {
            pipeline: "main.harn".to_string(),
            status: "ok".to_string(),
            started_at_ms: 1,
            finished_at_ms: 2,
            exit_code: 0,
            producer_name: "harn".to_string(),
            producer_version: "test".to_string(),
        };
        let signing_key = SigningKey::from_bytes(&[7; 32]);
        let receipt = build_signed_receipt(&log, options, &signing_key, "test-key".to_string())
            .await
            .unwrap();
        let pristine = verify_receipt(&receipt);
        assert!(pristine.verified);

        let mut forged = receipt;
        forged.event_log.events[0].payload = serde_json::json!({"pipeline": "other"});
        let report = verify_receipt(&forged);
        assert!(!report.verified);
        let flagged = report
            .errors
            .iter()
            .any(|error| error.contains("record_hash mismatch"));
        assert!(flagged);
    }
}