Skip to main content

immutable_trace/ingest/
storage.rs

1use std::collections::HashMap;
2
3use thiserror::Error;
4
5use crate::crypto::compute_payload_hash;
6use crate::record::AuditRecord;
7use super::verify::{IngestError, IngestState};
8
/// Outcome of a single ingest attempt, as recorded in an [`OperationLogEntry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision {
    /// The record was taken in by the service.
    Accepted,
    /// The record was refused by the service.
    Rejected,
}
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct OperationLogEntry {
17    pub decision: IngestDecision,
18    pub device_id: String,
19    pub sequence: u64,
20    pub message: String,
21}
22
/// Destination for raw payload bytes, addressed by an object reference.
pub trait RawDataStore {
    /// Failure type reported by [`RawDataStore::put`].
    type Error: std::error::Error;

    /// Stores `payload` under `object_ref`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the backend cannot store the payload.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error>;
}
28
29pub trait AuditLedger {
30    type Error: std::error::Error;
31
32    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error>;
33}
34
35pub trait OperationLogStore {
36    type Error: std::error::Error;
37
38    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error>;
39}
40
41#[derive(Debug, Error)]
42pub enum IngestServiceError {
43    #[error("ingest verification failed: {0}")]
44    Verify(#[from] IngestError),
45    #[error("payload hash mismatch for device={device_id} sequence={sequence}")]
46    PayloadHashMismatch { device_id: String, sequence: u64 },
47    #[error("raw data store error: {0}")]
48    RawDataStore(String),
49    #[error("audit ledger error: {0}")]
50    AuditLedger(String),
51    #[error("operation log error: {0}")]
52    OperationLog(String),
53}
54
55pub struct IngestService<R, L, O>
56where
57    R: RawDataStore,
58    L: AuditLedger,
59    O: OperationLogStore,
60{
61    verifier: IngestState,
62    raw_data_store: R,
63    audit_ledger: L,
64    operation_log: O,
65}
66
67impl<R, L, O> IngestService<R, L, O>
68where
69    R: RawDataStore,
70    L: AuditLedger,
71    O: OperationLogStore,
72{
73    pub fn new(verifier: IngestState, raw_data_store: R, audit_ledger: L, operation_log: O) -> Self {
74        Self {
75            verifier,
76            raw_data_store,
77            audit_ledger,
78            operation_log,
79        }
80    }
81
82    pub fn register_device(&mut self, device_id: impl Into<String>, key: ed25519_dalek::VerifyingKey) {
83        self.verifier.register_device(device_id, key);
84    }
85
86    pub fn ingest(&mut self, record: AuditRecord, raw_payload: &[u8]) -> Result<(), IngestServiceError> {
87        let payload_hash = compute_payload_hash(raw_payload);
88        if payload_hash != record.payload_hash {
89            self.log_rejection(&record, "payload hash mismatch");
90            return Err(IngestServiceError::PayloadHashMismatch {
91                device_id: record.device_id,
92                sequence: record.sequence,
93            });
94        }
95
96        if let Err(error) = self.verifier.verify_and_accept(&record) {
97            self.log_rejection(&record, &error.to_string());
98            return Err(IngestServiceError::Verify(error));
99        }
100
101        self.raw_data_store
102            .put(&record.object_ref, raw_payload)
103            .map_err(|e| IngestServiceError::RawDataStore(e.to_string()))?;
104
105        self.audit_ledger
106            .append(record.clone())
107            .map_err(|e| IngestServiceError::AuditLedger(e.to_string()))?;
108
109        self.operation_log
110            .write(OperationLogEntry {
111                decision: IngestDecision::Accepted,
112                device_id: record.device_id,
113                sequence: record.sequence,
114                message: "ingest accepted".to_string(),
115            })
116            .map_err(|e| IngestServiceError::OperationLog(e.to_string()))?;
117
118        Ok(())
119    }
120
121    pub fn raw_data_store(&self) -> &R {
122        &self.raw_data_store
123    }
124
125    pub fn audit_ledger(&self) -> &L {
126        &self.audit_ledger
127    }
128
129    pub fn operation_log(&self) -> &O {
130        &self.operation_log
131    }
132
133    fn log_rejection(&mut self, record: &AuditRecord, reason: &str) {
134        let _ = self.operation_log.write(OperationLogEntry {
135            decision: IngestDecision::Rejected,
136            device_id: record.device_id.clone(),
137            sequence: record.sequence,
138            message: reason.to_string(),
139        });
140    }
141}
142
143#[derive(Debug, Error)]
144#[error("in-memory store error: {message}")]
145pub struct InMemoryStoreError {
146    message: String,
147}
148
149#[derive(Default)]
150pub struct InMemoryRawDataStore {
151    objects: HashMap<String, Vec<u8>>,
152}
153
154impl InMemoryRawDataStore {
155    pub fn get(&self, object_ref: &str) -> Option<&[u8]> {
156        self.objects.get(object_ref).map(Vec::as_slice)
157    }
158}
159
160impl RawDataStore for InMemoryRawDataStore {
161    type Error = InMemoryStoreError;
162
163    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
164        self.objects.insert(object_ref.to_string(), payload.to_vec());
165        Ok(())
166    }
167}
168
169#[derive(Default)]
170pub struct InMemoryAuditLedger {
171    records: Vec<AuditRecord>,
172}
173
174impl InMemoryAuditLedger {
175    pub fn records(&self) -> &[AuditRecord] {
176        &self.records
177    }
178}
179
180impl AuditLedger for InMemoryAuditLedger {
181    type Error = InMemoryStoreError;
182
183    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
184        self.records.push(record);
185        Ok(())
186    }
187}
188
189#[derive(Default)]
190pub struct InMemoryOperationLog {
191    entries: Vec<OperationLogEntry>,
192}
193
194impl InMemoryOperationLog {
195    pub fn entries(&self) -> &[OperationLogEntry] {
196        &self.entries
197    }
198}
199
200impl OperationLogStore for InMemoryOperationLog {
201    type Error = InMemoryStoreError;
202
203    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
204        self.entries.push(entry);
205        Ok(())
206    }
207}
208
/// Which flavour of S3-compatible service an [`S3ObjectStoreConfig`] targets.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Backend {
    /// Amazon S3.
    AwsS3,
    /// A MinIO deployment reached through an explicit endpoint.
    Minio,
}
215
/// Connection settings for an S3-compatible raw data store.
///
/// `Debug` is implemented by hand so that `secret_access_key` is never
/// written to logs or panic messages.
#[cfg(feature = "s3")]
#[derive(Clone)]
pub struct S3ObjectStoreConfig {
    /// Service flavour to talk to.
    pub backend: S3Backend,
    /// Bucket that receives the objects.
    pub bucket: String,
    /// Region name handed to the AWS SDK.
    pub region: String,
    /// Custom endpoint URL, if any.
    pub endpoint: Option<String>,
    /// Static access key id, if any.
    pub access_key_id: Option<String>,
    /// Static secret access key, if any. Redacted in `Debug` output.
    pub secret_access_key: Option<String>,
}

#[cfg(feature = "s3")]
impl std::fmt::Debug for S3ObjectStoreConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show whether a secret is present, but never its value.
        let secret = self.secret_access_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("S3ObjectStoreConfig")
            .field("backend", &self.backend)
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &secret)
            .finish()
    }
}
226
#[cfg(feature = "s3")]
impl S3ObjectStoreConfig {
    /// Configuration for Amazon S3 with no endpoint override and no static
    /// credentials set.
    pub fn for_aws_s3(bucket: impl Into<String>, region: impl Into<String>) -> Self {
        let bucket = bucket.into();
        let region = region.into();
        Self {
            backend: S3Backend::AwsS3,
            bucket,
            region,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }

    /// Configuration for MinIO, which needs an explicit endpoint and a static
    /// access key pair.
    pub fn for_minio(
        bucket: impl Into<String>,
        region: impl Into<String>,
        endpoint: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        let bucket = bucket.into();
        let region = region.into();
        let endpoint = Some(endpoint.into());
        let access_key_id = Some(access_key_id.into());
        let secret_access_key = Some(secret_access_key.into());
        Self {
            backend: S3Backend::Minio,
            bucket,
            region,
            endpoint,
            access_key_id,
            secret_access_key,
        }
    }
}
257
/// Failures reported by [`S3CompatibleRawDataStore`].
#[cfg(feature = "s3")]
#[derive(Debug, Error)]
pub enum S3StoreError {
    /// The supplied configuration is missing a required setting.
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    /// The Tokio runtime used to drive the SDK could not be created.
    #[error("runtime initialization failed: {0}")]
    Runtime(String),
    /// The `put_object` request failed; holds the SDK error's message.
    #[error("s3 put failed: {0}")]
    Put(String),
}
268
/// Raw data store that uploads payloads to a bucket on an S3-compatible service.
///
/// It owns a Tokio runtime so the async SDK client can be driven from the
/// synchronous [`RawDataStore`] interface.
#[cfg(feature = "s3")]
pub struct S3CompatibleRawDataStore {
    /// Runtime on which SDK futures are blocked.
    runtime: tokio::runtime::Runtime,
    /// Configured S3 client.
    client: aws_sdk_s3::Client,
    /// Target bucket name.
    bucket: String,
}
275
#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Builds a store from `config`, creating a dedicated Tokio runtime and an
    /// S3 client.
    ///
    /// * `AwsS3`: static credentials are used when both `access_key_id` and
    ///   `secret_access_key` are set; with neither set the SDK's default
    ///   configuration is left untouched. `endpoint` is not consulted.
    /// * `Minio`: `endpoint`, `access_key_id` and `secret_access_key` are all
    ///   required, and path-style addressing is forced.
    ///
    /// NOTE(review): SDK configuration is loaded with `Runtime::block_on`, which
    /// blocks the calling thread; tokio documents that `block_on` panics when
    /// called from inside an async context. Confirm callers are synchronous.
    ///
    /// # Errors
    ///
    /// * [`S3StoreError::Runtime`] when the Tokio runtime cannot be created.
    /// * [`S3StoreError::InvalidConfig`] when a required MinIO setting is
    ///   missing, or when only one half of the AWS credential pair is given.
    pub fn new(config: S3ObjectStoreConfig) -> Result<Self, S3StoreError> {
        use aws_config::BehaviorVersion;
        use aws_config::Region;
        use aws_credential_types::Credentials;

        // The config is owned, so take it apart and move the fields instead of
        // cloning them (this also avoids extra in-memory copies of the secret).
        let S3ObjectStoreConfig {
            backend,
            bucket,
            region,
            endpoint,
            access_key_id,
            secret_access_key,
        } = config;
        let is_minio = matches!(backend, S3Backend::Minio);

        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| S3StoreError::Runtime(e.to_string()))?;

        let mut loader = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(region));

        match backend {
            S3Backend::AwsS3 => match (access_key_id, secret_access_key) {
                (Some(access_key_id), Some(secret_access_key)) => {
                    let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                    loader = loader.credentials_provider(creds);
                }
                // No static credentials: leave the SDK defaults in place.
                (None, None) => {}
                // Half a credential pair is a misconfiguration. Falling back to
                // ambient credentials silently could target the wrong account.
                (Some(_), None) => {
                    return Err(S3StoreError::InvalidConfig(
                        "secret_access_key is required when access_key_id is set".to_string(),
                    ));
                }
                (None, Some(_)) => {
                    return Err(S3StoreError::InvalidConfig(
                        "access_key_id is required when secret_access_key is set".to_string(),
                    ));
                }
            },
            S3Backend::Minio => {
                let endpoint = endpoint.ok_or_else(|| {
                    S3StoreError::InvalidConfig("endpoint is required for MinIO backend".to_string())
                })?;
                let access_key_id = access_key_id.ok_or_else(|| {
                    S3StoreError::InvalidConfig("access_key_id is required for MinIO backend".to_string())
                })?;
                let secret_access_key = secret_access_key.ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "secret_access_key is required for MinIO backend".to_string(),
                    )
                })?;

                let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                loader = loader.endpoint_url(endpoint).credentials_provider(creds);
            }
        }

        let shared = runtime.block_on(loader.load());
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&shared);

        if is_minio {
            s3_config_builder = s3_config_builder.force_path_style(true);
        }

        let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

        Ok(Self {
            runtime,
            client,
            bucket,
        })
    }
}
332
#[cfg(feature = "s3")]
impl RawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Uploads a copy of `payload` to the configured bucket with `object_ref`
    /// as the object key, blocking the calling thread until the request ends.
    ///
    /// # Errors
    ///
    /// Returns [`S3StoreError::Put`] with the SDK error's message when the
    /// request fails.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let body = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        let request = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(object_ref)
            .body(body)
            .send();

        match self.runtime.block_on(request) {
            Ok(_) => Ok(()),
            Err(error) => Err(S3StoreError::Put(error.to_string())),
        }
    }
}
351}