1use std::collections::{HashMap, HashSet};
2
3use ed25519_dalek::VerifyingKey;
4use ledger_core::{verify_payload_signature, AuditRecord, Hash32};
5use thiserror::Error;
6
/// Reasons [`IngestState::verify_and_accept`] rejects a record.
///
/// Each variant carries the device id of the offending record so callers can
/// attribute the failure.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum IngestError {
    /// No verifying key has been registered for the record's device id.
    #[error("unknown device: {0}")]
    UnknownDevice(String),
    /// A record with this `(device_id, sequence)` pair was already accepted.
    #[error("duplicate record for device={device_id} sequence={sequence}")]
    Duplicate { device_id: String, sequence: u64 },
    /// The record's sequence is not the next one expected for the device
    /// (`1` for a device with no accepted records, otherwise last accepted + 1).
    #[error("invalid sequence for device={device_id}: expected={expected} actual={actual}")]
    InvalidSequence {
        device_id: String,
        expected: u64,
        actual: u64,
    },
    /// The record's `prev_record_hash` does not match the hash of the device's
    /// last accepted record (or the zero hash when none has been accepted).
    #[error("invalid previous hash for device={0}")]
    InvalidPrevHash(String),
    /// `verify_payload_signature` returned `false` for the record.
    #[error("invalid signature for device={0}")]
    InvalidSignature(String),
}
24
/// Per-device bookkeeping consulted and updated by
/// [`IngestState::verify_and_accept`].
///
/// The derived `Default` produces an empty state with no registered devices.
#[derive(Default)]
pub struct IngestState {
    /// Verifying key registered for each device id.
    public_keys: HashMap<String, VerifyingKey>,
    /// Every `(device_id, sequence)` pair accepted so far.
    // NOTE(review): nothing visible here ever removes entries, so this set grows
    // with every accepted record. Accepted sequences appear to be contiguous
    // from 1, which would make it derivable from `last_sequence` — confirm
    // before relying on that.
    seen: HashSet<(String, u64)>,
    /// Sequence number of the most recently accepted record, per device.
    last_sequence: HashMap<String, u64>,
    /// `hash()` of the most recently accepted record, per device.
    last_hash: HashMap<String, Hash32>,
}
32
33impl IngestState {
34 pub fn register_device(&mut self, device_id: impl Into<String>, key: VerifyingKey) {
35 self.public_keys.insert(device_id.into(), key);
36 }
37
38 pub fn verify_and_accept(&mut self, record: &AuditRecord) -> Result<(), IngestError> {
39 let device_id = &record.device_id;
40 let key = self
41 .public_keys
42 .get(device_id)
43 .ok_or_else(|| IngestError::UnknownDevice(device_id.clone()))?;
44
45 if !verify_payload_signature(key, &record.payload_hash, &record.signature) {
46 return Err(IngestError::InvalidSignature(device_id.clone()));
47 }
48
49 if self.seen.contains(&(device_id.clone(), record.sequence)) {
50 return Err(IngestError::Duplicate {
51 device_id: device_id.clone(),
52 sequence: record.sequence,
53 });
54 }
55
56 let expected_sequence = self
57 .last_sequence
58 .get(device_id)
59 .map_or(1, |prev| prev.saturating_add(1));
60 if record.sequence != expected_sequence {
61 return Err(IngestError::InvalidSequence {
62 device_id: device_id.clone(),
63 expected: expected_sequence,
64 actual: record.sequence,
65 });
66 }
67
68 let expected_prev_hash = self
69 .last_hash
70 .get(device_id)
71 .copied()
72 .unwrap_or_else(AuditRecord::zero_hash);
73
74 if record.prev_record_hash != expected_prev_hash {
75 return Err(IngestError::InvalidPrevHash(device_id.clone()));
76 }
77
78 self.seen.insert((device_id.clone(), record.sequence));
79 self.last_sequence.insert(device_id.clone(), record.sequence);
80 self.last_hash.insert(device_id.clone(), record.hash());
81
82 Ok(())
83 }
84}