1use std::collections::HashMap;
2
3use thiserror::Error;
4use tracing::{debug, error, info, instrument, warn};
5
6use crate::integrity::compute_payload_hash;
7use crate::record::AuditRecord;
8use super::policy::IntegrityPolicyGate;
9use super::verify::IngestError;
10
/// Outcome of a single ingest attempt, as recorded in the operation log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision {
    /// The record passed the payload-hash check and the integrity policy
    /// and was persisted.
    Accepted,
    /// The record failed the payload-hash check or the integrity policy.
    Rejected,
}
16
/// One operational audit line describing how an ingest attempt was resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperationLogEntry {
    /// Whether the record was accepted or rejected.
    pub decision: IngestDecision,
    /// Device the record claims to originate from.
    pub device_id: String,
    /// Per-device sequence number of the record.
    pub sequence: u64,
    /// Human-readable explanation ("ingest accepted" or the rejection reason).
    pub message: String,
}
24
/// Destination for raw payload bytes, addressed by an opaque object reference.
pub trait RawDataStore {
    /// Backend-specific failure type.
    type Error: std::error::Error;

    /// Store `payload` under `object_ref`.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error>;
}
30
/// Append-only sink for verified audit records.
pub trait AuditLedger {
    /// Backend-specific failure type.
    type Error: std::error::Error;

    /// Append `record` to the ledger.
    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error>;
}
36
/// Sink for operational decision entries ([`OperationLogEntry`]).
pub trait OperationLogStore {
    /// Backend-specific failure type.
    type Error: std::error::Error;

    /// Record one ingest decision.
    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error>;
}
42
/// Errors returned by the ingest pipelines in this module.
#[derive(Debug, Error)]
pub enum IngestServiceError {
    /// The integrity policy gate rejected the record.
    #[error("ingest verification failed: {0}")]
    Verify(#[from] IngestError),
    /// The hash recomputed from the raw payload did not match the hash the
    /// record carries.
    #[error("payload hash mismatch for device={device_id} sequence={sequence}")]
    PayloadHashMismatch { device_id: String, sequence: u64 },
    /// Raw-payload storage failed. The backend error is carried as text so
    /// this type stays independent of the concrete store's error type.
    #[error("raw data store error: {0}")]
    RawDataStore(String),
    /// Audit-ledger append failed (stringified backend error).
    #[error("audit ledger error: {0}")]
    AuditLedger(String),
    /// Operation-log write failed on the accept path (stringified backend
    /// error). Rejection-path log failures are swallowed, not surfaced here.
    #[error("operation log error: {0}")]
    OperationLog(String),
}
56
/// Synchronous ingest pipeline: verifies a record, persists its payload,
/// appends it to the audit ledger, and records the decision in the
/// operation log.
pub struct IngestService<R, L, O>
where
    R: RawDataStore,
    L: AuditLedger,
    O: OperationLogStore,
{
    /// Gate that validates records and holds registered device keys.
    policy: IntegrityPolicyGate,
    /// Destination for raw payload bytes.
    raw_data_store: R,
    /// Append-only ledger of accepted records.
    audit_ledger: L,
    /// Sink for accepted/rejected decisions.
    operation_log: O,
}
68
69impl<R, L, O> IngestService<R, L, O>
70where
71 R: RawDataStore,
72 L: AuditLedger,
73 O: OperationLogStore,
74{
75 pub fn new(policy: IntegrityPolicyGate, raw_data_store: R, audit_ledger: L, operation_log: O) -> Self {
76 Self {
77 policy,
78 raw_data_store,
79 audit_ledger,
80 operation_log,
81 }
82 }
83
84 pub fn register_device(&mut self, device_id: impl Into<String>, key: ed25519_dalek::VerifyingKey) {
85 self.policy.register_device(device_id, key);
86 }
87
88 #[instrument(skip(self, raw_payload), fields(
89 device_id = %record.device_id,
90 sequence = record.sequence,
91 object_ref = %record.object_ref,
92 ))]
93 pub fn ingest(&mut self, record: AuditRecord, raw_payload: &[u8], cert_identity: Option<&str>) -> Result<(), IngestServiceError> {
94 debug!(payload_bytes = raw_payload.len(), "ingest started");
95
96 let payload_hash = compute_payload_hash(raw_payload);
97 if payload_hash != record.payload_hash {
98 warn!("payload hash mismatch — record rejected");
99 self.log_rejection(&record, "payload hash mismatch");
100 return Err(IngestServiceError::PayloadHashMismatch {
101 device_id: record.device_id,
102 sequence: record.sequence,
103 });
104 }
105
106 if let Err(error) = self.policy.enforce(&record, cert_identity) {
107 warn!(reason = %error, "integrity policy rejected record");
108 self.log_rejection(&record, &error.to_string());
109 return Err(IngestServiceError::Verify(error));
110 }
111
112 self.raw_data_store
113 .put(&record.object_ref, raw_payload)
114 .map_err(|e| {
115 error!(error = %e, "raw data store write failed");
116 IngestServiceError::RawDataStore(e.to_string())
117 })?;
118
119 self.audit_ledger
120 .append(record.clone())
121 .map_err(|e| {
122 error!(error = %e, "audit ledger append failed");
123 IngestServiceError::AuditLedger(e.to_string())
124 })?;
125
126 self.operation_log
127 .write(OperationLogEntry {
128 decision: IngestDecision::Accepted,
129 device_id: record.device_id,
130 sequence: record.sequence,
131 message: "ingest accepted".to_string(),
132 })
133 .map_err(|e| {
134 error!(error = %e, "operation log write failed");
135 IngestServiceError::OperationLog(e.to_string())
136 })?;
137
138 info!("record accepted");
139 Ok(())
140 }
141
142 pub fn raw_data_store(&self) -> &R {
143 &self.raw_data_store
144 }
145
146 pub fn audit_ledger(&self) -> &L {
147 &self.audit_ledger
148 }
149
150 pub fn operation_log(&self) -> &O {
151 &self.operation_log
152 }
153
154 fn log_rejection(&mut self, record: &AuditRecord, reason: &str) {
155 let _ = self.operation_log.write(OperationLogEntry {
156 decision: IngestDecision::Rejected,
157 device_id: record.device_id.clone(),
158 sequence: record.sequence,
159 message: reason.to_string(),
160 });
161 }
162}
163
/// Error type shared by the in-memory store implementations.
///
/// NOTE(review): every in-memory store in this file is infallible and never
/// constructs this error; it exists only to satisfy the traits' associated
/// `Error` type. Confirm nothing else relies on building it.
#[derive(Debug, Error)]
#[error("in-memory store error: {message}")]
pub struct InMemoryStoreError {
    message: String,
}
169
170#[derive(Default)]
171pub struct InMemoryRawDataStore {
172 objects: HashMap<String, Vec<u8>>,
173}
174
175impl InMemoryRawDataStore {
176 pub fn get(&self, object_ref: &str) -> Option<&[u8]> {
177 self.objects.get(object_ref).map(Vec::as_slice)
178 }
179}
180
181impl RawDataStore for InMemoryRawDataStore {
182 type Error = InMemoryStoreError;
183
184 fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
185 self.objects.insert(object_ref.to_string(), payload.to_vec());
186 Ok(())
187 }
188}
189
190#[derive(Default)]
191pub struct InMemoryAuditLedger {
192 records: Vec<AuditRecord>,
193}
194
195impl InMemoryAuditLedger {
196 pub fn records(&self) -> &[AuditRecord] {
197 &self.records
198 }
199}
200
201impl AuditLedger for InMemoryAuditLedger {
202 type Error = InMemoryStoreError;
203
204 fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
205 self.records.push(record);
206 Ok(())
207 }
208}
209
210#[derive(Default)]
211pub struct InMemoryOperationLog {
212 entries: Vec<OperationLogEntry>,
213}
214
215impl InMemoryOperationLog {
216 pub fn entries(&self) -> &[OperationLogEntry] {
217 &self.entries
218 }
219}
220
221impl OperationLogStore for InMemoryOperationLog {
222 type Error = InMemoryStoreError;
223
224 fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
225 self.entries.push(entry);
226 Ok(())
227 }
228}
229
/// Errors from the Postgres-backed stores.
#[cfg(feature = "postgres")]
#[derive(Debug, Error)]
pub enum PostgresStoreError {
    /// Any error raised by the underlying `postgres` client.
    #[error("postgres error: {0}")]
    Postgres(#[from] ::postgres::Error),
}
236
/// [`AuditLedger`] backed by a synchronous Postgres connection.
#[cfg(feature = "postgres")]
pub struct PostgresAuditLedger {
    client: ::postgres::Client,
}

#[cfg(feature = "postgres")]
impl PostgresAuditLedger {
    /// Connect to `url` without TLS.
    ///
    /// NOTE(review): `NoTls` sends credentials and data in the clear —
    /// confirm this is only used on trusted networks.
    pub fn connect(url: &str) -> Result<Self, ::postgres::Error> {
        let client = ::postgres::Client::connect(url, ::postgres::NoTls)?;
        Ok(Self { client })
    }
}

#[cfg(feature = "postgres")]
impl AuditLedger for PostgresAuditLedger {
    type Error = PostgresStoreError;

    /// Insert the record as one row of `audit_records`.
    ///
    /// NOTE(review): `sequence` and `timestamp_ms` are `u64` but stored via
    /// `as i64`, so values above `i64::MAX` would wrap negative — confirm
    /// the inputs make that unreachable.
    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
        self.client.execute(
            "INSERT INTO audit_records \
             (device_id, sequence, timestamp_ms, payload_hash, signature, prev_record_hash, object_ref) \
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
            &[
                &record.device_id,
                &(record.sequence as i64),
                &(record.timestamp_ms as i64),
                &record.payload_hash.as_ref(),
                &record.signature.as_ref(),
                &record.prev_record_hash.as_ref(),
                &record.object_ref,
            ],
        )?;
        Ok(())
    }
}
272
/// Operation log backed by a synchronous Postgres connection.
#[cfg(feature = "postgres")]
pub struct PostgresOperationLog {
    client: ::postgres::Client,
}

#[cfg(feature = "postgres")]
impl PostgresOperationLog {
    /// Connect to `url` without TLS.
    ///
    /// NOTE(review): `NoTls` sends credentials and data in the clear —
    /// confirm this is only used on trusted networks.
    pub fn connect(url: &str) -> Result<Self, ::postgres::Error> {
        let client = ::postgres::Client::connect(url, ::postgres::NoTls)?;
        Ok(Self { client })
    }

    /// Truncate both log tables and reset their identity sequences.
    ///
    /// WARNING(review): although this lives on the *operation log*, it also
    /// truncates `audit_records` — i.e. it destroys the audit ledger. If
    /// this is a test-environment reset helper, confirm no production code
    /// path can reach it.
    pub fn reset(&mut self) -> Result<(), ::postgres::Error> {
        self.client.batch_execute(
            "TRUNCATE TABLE operation_logs, audit_records RESTART IDENTITY;",
        )
    }
}
291
#[cfg(feature = "postgres")]
impl OperationLogStore for PostgresOperationLog {
    // Wrap the driver error in `PostgresStoreError`, matching
    // `PostgresAuditLedger`, so both Postgres stores expose the same error
    // type; the `#[from]` conversion keeps the `?` below working unchanged.
    type Error = PostgresStoreError;

    /// Insert one decision row into `operation_logs`.
    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
        // Persist the decision as its enum-variant name.
        let decision = match entry.decision {
            IngestDecision::Accepted => "Accepted",
            IngestDecision::Rejected => "Rejected",
        };
        self.client.execute(
            "INSERT INTO operation_logs (decision, device_id, sequence, message) \
             VALUES ($1, $2, $3, $4)",
            &[
                &decision,
                &entry.device_id,
                // NOTE(review): u64 -> i64 cast wraps above i64::MAX.
                &(entry.sequence as i64),
                &entry.message,
            ],
        )?;
        Ok(())
    }
}
314
/// Which S3-compatible backend an [`S3CompatibleRawDataStore`] talks to.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Backend {
    /// Amazon S3 itself.
    AwsS3,
    /// A MinIO (or other S3-compatible) server reached via a custom endpoint.
    Minio,
}

/// Connection settings for an S3-compatible raw data store.
#[cfg(feature = "s3")]
#[derive(Debug, Clone)]
pub struct S3ObjectStoreConfig {
    /// Backend flavour; controls which of the optional fields are required.
    pub backend: S3Backend,
    /// Bucket all payloads are written to.
    pub bucket: String,
    /// Region name passed to the SDK.
    pub region: String,
    /// Custom endpoint URL; required for [`S3Backend::Minio`].
    pub endpoint: Option<String>,
    /// Static access key. Optional for AWS (the SDK's default credential
    /// chain is used when absent), required for MinIO.
    pub access_key_id: Option<String>,
    /// Static secret key; same rules as `access_key_id`.
    pub secret_access_key: Option<String>,
}

#[cfg(feature = "s3")]
impl S3ObjectStoreConfig {
    /// Config for Amazon S3; with no explicit keys the SDK's default
    /// credential chain applies.
    pub fn for_aws_s3(bucket: impl Into<String>, region: impl Into<String>) -> Self {
        Self {
            backend: S3Backend::AwsS3,
            bucket: bucket.into(),
            region: region.into(),
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }

    /// Config for a MinIO server with an explicit endpoint and static
    /// credentials (all mandatory for this backend).
    pub fn for_minio(
        bucket: impl Into<String>,
        region: impl Into<String>,
        endpoint: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        Self {
            backend: S3Backend::Minio,
            bucket: bucket.into(),
            region: region.into(),
            endpoint: Some(endpoint.into()),
            access_key_id: Some(access_key_id.into()),
            secret_access_key: Some(secret_access_key.into()),
        }
    }
}
363
/// Errors from the S3-compatible raw data store.
#[cfg(feature = "s3")]
#[derive(Debug, Error)]
pub enum S3StoreError {
    /// Required settings were missing or inconsistent for the chosen backend.
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    /// The internal Tokio runtime could not be created.
    #[error("runtime initialization failed: {0}")]
    Runtime(String),
    /// The `PutObject` call failed (stringified SDK error).
    #[error("s3 put failed: {0}")]
    Put(String),
}
374
/// [`RawDataStore`] writing to Amazon S3 or an S3-compatible service.
///
/// Owns a dedicated Tokio runtime so the synchronous `RawDataStore::put`
/// can drive the async AWS SDK via `block_on`.
#[cfg(feature = "s3")]
pub struct S3CompatibleRawDataStore {
    /// Runtime used to block on SDK futures from sync code.
    runtime: tokio::runtime::Runtime,
    /// Configured SDK client.
    client: aws_sdk_s3::Client,
    /// Target bucket for all writes.
    bucket: String,
}
381
#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Build a store from `config`.
    ///
    /// For [`S3Backend::AwsS3`], explicit static credentials are used only
    /// when both keys are present; otherwise the SDK's default credential
    /// chain applies. For [`S3Backend::Minio`], the endpoint and both
    /// credential fields are mandatory and path-style addressing is forced.
    ///
    /// # Errors
    /// [`S3StoreError::Runtime`] if the Tokio runtime cannot start, or
    /// [`S3StoreError::InvalidConfig`] when required MinIO settings are
    /// missing.
    pub fn new(config: S3ObjectStoreConfig) -> Result<Self, S3StoreError> {
        use aws_config::BehaviorVersion;
        use aws_config::Region;
        use aws_credential_types::Credentials;

        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| S3StoreError::Runtime(e.to_string()))?;

        let mut loader = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(config.region.clone()));

        match config.backend {
            S3Backend::AwsS3 => {
                // Static credentials are optional: only applied when both
                // halves are present, otherwise the default chain is used.
                if let (Some(access_key_id), Some(secret_access_key)) =
                    (config.access_key_id.clone(), config.secret_access_key.clone())
                {
                    let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                    loader = loader.credentials_provider(creds);
                }
            }
            S3Backend::Minio => {
                // MinIO needs an explicit endpoint plus static credentials.
                let endpoint = config.endpoint.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig("endpoint is required for MinIO backend".to_string())
                })?;
                let access_key_id = config.access_key_id.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig("access_key_id is required for MinIO backend".to_string())
                })?;
                let secret_access_key = config.secret_access_key.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "secret_access_key is required for MinIO backend".to_string(),
                    )
                })?;

                let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                loader = loader.endpoint_url(endpoint).credentials_provider(creds);
            }
        }

        // Loading the shared config is async, so drive it on our runtime.
        let shared = runtime.block_on(loader.load());
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&shared);

        // MinIO serves buckets at path-style URLs rather than subdomains.
        if config.backend == S3Backend::Minio {
            s3_config_builder = s3_config_builder.force_path_style(true);
        }

        let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

        Ok(Self {
            runtime,
            client,
            bucket: config.bucket,
        })
    }
}
438
#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Resolve the storage key for `object_ref`.
    ///
    /// A fully-qualified `s3://bucket/key` URI is reduced to its `key`
    /// portion; any other form (including `s3://bucket` with no key part)
    /// is used verbatim.
    fn object_key<'a>(&self, object_ref: &'a str) -> &'a str {
        object_ref
            .strip_prefix("s3://")
            .and_then(|remainder| remainder.split_once('/'))
            .map_or(object_ref, |(_bucket, key)| key)
    }
}
454
#[cfg(feature = "s3")]
impl RawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Upload `payload` to `self.bucket` under the key derived from
    /// `object_ref`, blocking the current thread on the owned runtime.
    ///
    /// NOTE(review): `Runtime::block_on` panics when invoked from inside an
    /// async context — call this sync API only from blocking code.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let key = self.object_key(object_ref);
        let stream = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        self.runtime
            .block_on(
                self.client
                    .put_object()
                    .bucket(&self.bucket)
                    .key(key)
                    .body(stream)
                    .send(),
            )
            .map_err(|e| S3StoreError::Put(e.to_string()))?;
        Ok(())
    }
}
475
476#[cfg(feature = "async-ingest")]
479use std::sync::Arc;
480#[cfg(feature = "async-ingest")]
481use tokio::sync::Mutex as AsyncMutex;
482
/// Async counterpart of [`RawDataStore`]; the `impl Future` return lets
/// implementors write plain `async fn`s while keeping the future `Send`.
#[cfg(feature = "async-ingest")]
pub trait AsyncRawDataStore: Send + Sync {
    /// Backend-specific failure type.
    type Error: std::error::Error + Send;

    /// Store `payload` under `object_ref`.
    fn put(
        &self,
        object_ref: &str,
        payload: &[u8],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
498
/// Async counterpart of [`AuditLedger`].
#[cfg(feature = "async-ingest")]
pub trait AsyncAuditLedger: Send + Sync {
    /// Backend-specific failure type.
    type Error: std::error::Error + Send;

    /// Append `record` to the ledger.
    fn append(
        &self,
        record: AuditRecord,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
509
/// Async counterpart of [`OperationLogStore`].
#[cfg(feature = "async-ingest")]
pub trait AsyncOperationLogStore: Send + Sync {
    /// Backend-specific failure type.
    type Error: std::error::Error + Send;

    /// Record one ingest decision.
    fn write(
        &self,
        entry: OperationLogEntry,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
520
/// Async, `Clone`-able in-memory raw data store; clones share one map.
#[cfg(feature = "async-ingest")]
#[derive(Default, Clone)]
pub struct AsyncInMemoryRawDataStore {
    objects: Arc<AsyncMutex<HashMap<String, Vec<u8>>>>,
}

#[cfg(feature = "async-ingest")]
impl AsyncInMemoryRawDataStore {
    /// Return an owned copy of the payload stored under `object_ref`, if any.
    pub async fn get(&self, object_ref: &str) -> Option<Vec<u8>> {
        let guard = self.objects.lock().await;
        guard.get(object_ref).cloned()
    }
}

#[cfg(feature = "async-ingest")]
impl AsyncRawDataStore for AsyncInMemoryRawDataStore {
    type Error = InMemoryStoreError;

    /// Infallible insert; replaces any payload already stored under the
    /// reference.
    async fn put(&self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let mut guard = self.objects.lock().await;
        guard.insert(object_ref.to_owned(), payload.to_owned());
        Ok(())
    }
}
549
/// Async, `Clone`-able in-memory audit ledger; clones share one record list.
#[cfg(feature = "async-ingest")]
#[derive(Default, Clone)]
pub struct AsyncInMemoryAuditLedger {
    records: Arc<AsyncMutex<Vec<AuditRecord>>>,
}

#[cfg(feature = "async-ingest")]
impl AsyncInMemoryAuditLedger {
    /// Snapshot of all appended records, in insertion order.
    pub async fn records(&self) -> Vec<AuditRecord> {
        let guard = self.records.lock().await;
        guard.clone()
    }
}

#[cfg(feature = "async-ingest")]
impl AsyncAuditLedger for AsyncInMemoryAuditLedger {
    type Error = InMemoryStoreError;

    /// Infallible append.
    async fn append(&self, record: AuditRecord) -> Result<(), Self::Error> {
        let mut guard = self.records.lock().await;
        guard.push(record);
        Ok(())
    }
}
573
/// Async, `Clone`-able in-memory operation log; clones share one entry list.
#[cfg(feature = "async-ingest")]
#[derive(Default, Clone)]
pub struct AsyncInMemoryOperationLog {
    entries: Arc<AsyncMutex<Vec<OperationLogEntry>>>,
}

#[cfg(feature = "async-ingest")]
impl AsyncInMemoryOperationLog {
    /// Snapshot of all entries written so far, in insertion order.
    pub async fn entries(&self) -> Vec<OperationLogEntry> {
        let guard = self.entries.lock().await;
        guard.clone()
    }
}

#[cfg(feature = "async-ingest")]
impl AsyncOperationLogStore for AsyncInMemoryOperationLog {
    type Error = InMemoryStoreError;

    /// Infallible append.
    async fn write(&self, entry: OperationLogEntry) -> Result<(), Self::Error> {
        let mut guard = self.entries.lock().await;
        guard.push(entry);
        Ok(())
    }
}
597
#[cfg(all(feature = "s3", feature = "async-ingest"))]
impl AsyncRawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Upload `payload` to `self.bucket` under the key derived from
    /// `object_ref`, awaiting the SDK call directly (the store's internal
    /// runtime is not used on this path).
    async fn put(&self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let key = self.object_key(object_ref);
        let stream = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(stream)
            .send()
            .await
            .map_err(|e| S3StoreError::Put(e.to_string()))?;
        Ok(())
    }
}
621
/// Async ingest pipeline mirroring [`IngestService`].
///
/// All methods take `&self`; the policy gate sits behind an async mutex so
/// only the policy check is serialized between concurrent ingests.
#[cfg(feature = "async-ingest")]
pub struct AsyncIngestService<R, L, O>
where
    R: AsyncRawDataStore,
    L: AsyncAuditLedger,
    O: AsyncOperationLogStore,
{
    /// Policy gate; locked briefly per `ingest`/`register_device` call.
    policy: AsyncMutex<super::policy::IntegrityPolicyGate>,
    /// Destination for raw payload bytes.
    raw_data_store: R,
    /// Append-only ledger of accepted records.
    audit_ledger: L,
    /// Sink for accepted/rejected decisions.
    operation_log: O,
}
642
#[cfg(feature = "async-ingest")]
impl<R, L, O> AsyncIngestService<R, L, O>
where
    R: AsyncRawDataStore,
    L: AsyncAuditLedger,
    O: AsyncOperationLogStore,
{
    /// Build a service from a policy gate and the three async backends.
    pub fn new(
        policy: super::policy::IntegrityPolicyGate,
        raw_data_store: R,
        audit_ledger: L,
        operation_log: O,
    ) -> Self {
        Self {
            policy: AsyncMutex::new(policy),
            raw_data_store,
            audit_ledger,
            operation_log,
        }
    }

    /// Register a device's verification key with the integrity policy gate.
    pub async fn register_device(
        &self,
        device_id: impl Into<String>,
        key: ed25519_dalek::VerifyingKey,
    ) {
        self.policy.lock().await.register_device(device_id, key);
    }

    /// Ingest one record together with its raw payload.
    ///
    /// Pipeline order (deliberate — each step runs only if the previous one
    /// succeeded): payload-hash check, integrity policy, raw-payload write,
    /// ledger append, `Accepted` log entry. Rejections are logged
    /// best-effort so an operation-log failure never masks the rejection
    /// error; an operation-log failure on the accept path is reported.
    ///
    /// # Errors
    /// Returns the [`IngestServiceError`] describing the first failing step.
    #[tracing::instrument(skip(self, raw_payload), fields(
        device_id = %record.device_id,
        sequence = record.sequence,
        object_ref = %record.object_ref,
    ))]
    pub async fn ingest(
        &self,
        record: AuditRecord,
        raw_payload: &[u8],
        cert_identity: Option<&str>,
    ) -> Result<(), IngestServiceError> {
        debug!(payload_bytes = raw_payload.len(), "async ingest started");

        let payload_hash = compute_payload_hash(raw_payload);
        if payload_hash != record.payload_hash {
            warn!("payload hash mismatch — record rejected");
            self.log_rejection(&record, "payload hash mismatch").await;
            return Err(IngestServiceError::PayloadHashMismatch {
                device_id: record.device_id,
                sequence: record.sequence,
            });
        }

        // Scope the lock so the policy mutex is not held across the
        // storage awaits below.
        let policy_result = {
            let mut policy = self.policy.lock().await;
            policy.enforce(&record, cert_identity)
        };
        if let Err(error) = policy_result {
            warn!(reason = %error, "integrity policy rejected record");
            self.log_rejection(&record, &error.to_string()).await;
            return Err(IngestServiceError::Verify(error));
        }

        self.raw_data_store
            .put(&record.object_ref, raw_payload)
            .await
            .map_err(|e| {
                error!(error = %e, "raw data store write failed");
                IngestServiceError::RawDataStore(e.to_string())
            })?;

        // Keep only the two fields needed for the operation-log entry so the
        // full record (hashes, signature, ...) can move into the ledger
        // without cloning the whole struct.
        let device_id = record.device_id.clone();
        let sequence = record.sequence;
        self.audit_ledger
            .append(record)
            .await
            .map_err(|e| {
                error!(error = %e, "audit ledger append failed");
                IngestServiceError::AuditLedger(e.to_string())
            })?;

        self.operation_log
            .write(OperationLogEntry {
                decision: IngestDecision::Accepted,
                device_id,
                sequence,
                message: "ingest accepted".to_string(),
            })
            .await
            .map_err(|e| {
                error!(error = %e, "operation log write failed");
                IngestServiceError::OperationLog(e.to_string())
            })?;

        info!("record accepted");
        Ok(())
    }

    /// Borrow the raw data store.
    pub fn raw_data_store(&self) -> &R {
        &self.raw_data_store
    }

    /// Borrow the audit ledger.
    pub fn audit_ledger(&self) -> &L {
        &self.audit_ledger
    }

    /// Borrow the operation log.
    pub fn operation_log(&self) -> &O {
        &self.operation_log
    }

    /// Best-effort `Rejected` entry; write failures are deliberately ignored
    /// so the rejection reason itself is what the caller sees. Mirrors
    /// `IngestService::log_rejection`.
    async fn log_rejection(&self, record: &AuditRecord, reason: &str) {
        let _ = self
            .operation_log
            .write(OperationLogEntry {
                decision: IngestDecision::Rejected,
                device_id: record.device_id.clone(),
                sequence: record.sequence,
                message: reason.to_string(),
            })
            .await;
    }
}