1use std::collections::HashMap;
2
3use thiserror::Error;
4
5use crate::crypto::compute_payload_hash;
6use crate::record::AuditRecord;
7use super::verify::{IngestError, IngestState};
8
/// Outcome recorded in the operation log for a single ingest attempt.
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to the
/// comparison traits; callers can pass it by value and use it as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IngestDecision {
    /// The record passed every check and was handed to the stores.
    Accepted,
    /// The record failed the payload-hash check or verification.
    Rejected,
}
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct OperationLogEntry {
17 pub decision: IngestDecision,
18 pub device_id: String,
19 pub sequence: u64,
20 pub message: String,
21}
22
/// Sink for raw payload bytes, addressed by an object reference string.
pub trait RawDataStore {
    /// Error produced when a payload cannot be stored.
    type Error: std::error::Error;

    /// Stores `payload` under `object_ref`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the backend fails to persist the payload.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error>;
}
28
29pub trait AuditLedger {
30 type Error: std::error::Error;
31
32 fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error>;
33}
34
35pub trait OperationLogStore {
36 type Error: std::error::Error;
37
38 fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error>;
39}
40
41#[derive(Debug, Error)]
42pub enum IngestServiceError {
43 #[error("ingest verification failed: {0}")]
44 Verify(#[from] IngestError),
45 #[error("payload hash mismatch for device={device_id} sequence={sequence}")]
46 PayloadHashMismatch { device_id: String, sequence: u64 },
47 #[error("raw data store error: {0}")]
48 RawDataStore(String),
49 #[error("audit ledger error: {0}")]
50 AuditLedger(String),
51 #[error("operation log error: {0}")]
52 OperationLog(String),
53}
54
55pub struct IngestService<R, L, O>
56where
57 R: RawDataStore,
58 L: AuditLedger,
59 O: OperationLogStore,
60{
61 verifier: IngestState,
62 raw_data_store: R,
63 audit_ledger: L,
64 operation_log: O,
65}
66
67impl<R, L, O> IngestService<R, L, O>
68where
69 R: RawDataStore,
70 L: AuditLedger,
71 O: OperationLogStore,
72{
73 pub fn new(verifier: IngestState, raw_data_store: R, audit_ledger: L, operation_log: O) -> Self {
74 Self {
75 verifier,
76 raw_data_store,
77 audit_ledger,
78 operation_log,
79 }
80 }
81
82 pub fn register_device(&mut self, device_id: impl Into<String>, key: ed25519_dalek::VerifyingKey) {
83 self.verifier.register_device(device_id, key);
84 }
85
86 pub fn ingest(&mut self, record: AuditRecord, raw_payload: &[u8]) -> Result<(), IngestServiceError> {
87 let payload_hash = compute_payload_hash(raw_payload);
88 if payload_hash != record.payload_hash {
89 self.log_rejection(&record, "payload hash mismatch");
90 return Err(IngestServiceError::PayloadHashMismatch {
91 device_id: record.device_id,
92 sequence: record.sequence,
93 });
94 }
95
96 if let Err(error) = self.verifier.verify_and_accept(&record) {
97 self.log_rejection(&record, &error.to_string());
98 return Err(IngestServiceError::Verify(error));
99 }
100
101 self.raw_data_store
102 .put(&record.object_ref, raw_payload)
103 .map_err(|e| IngestServiceError::RawDataStore(e.to_string()))?;
104
105 self.audit_ledger
106 .append(record.clone())
107 .map_err(|e| IngestServiceError::AuditLedger(e.to_string()))?;
108
109 self.operation_log
110 .write(OperationLogEntry {
111 decision: IngestDecision::Accepted,
112 device_id: record.device_id,
113 sequence: record.sequence,
114 message: "ingest accepted".to_string(),
115 })
116 .map_err(|e| IngestServiceError::OperationLog(e.to_string()))?;
117
118 Ok(())
119 }
120
121 pub fn raw_data_store(&self) -> &R {
122 &self.raw_data_store
123 }
124
125 pub fn audit_ledger(&self) -> &L {
126 &self.audit_ledger
127 }
128
129 pub fn operation_log(&self) -> &O {
130 &self.operation_log
131 }
132
133 fn log_rejection(&mut self, record: &AuditRecord, reason: &str) {
134 let _ = self.operation_log.write(OperationLogEntry {
135 decision: IngestDecision::Rejected,
136 device_id: record.device_id.clone(),
137 sequence: record.sequence,
138 message: reason.to_string(),
139 });
140 }
141}
142
143#[derive(Debug, Error)]
144#[error("in-memory store error: {message}")]
145pub struct InMemoryStoreError {
146 message: String,
147}
148
/// Raw data store that keeps payloads in a `HashMap` keyed by object reference.
///
/// Implements `Debug` so the store can be inspected in assertions and logs.
#[derive(Debug, Default)]
pub struct InMemoryRawDataStore {
    /// Stored payloads, keyed by object reference.
    objects: HashMap<String, Vec<u8>>,
}

impl InMemoryRawDataStore {
    /// Returns the bytes stored under `object_ref`, or `None` if nothing is stored there.
    pub fn get(&self, object_ref: &str) -> Option<&[u8]> {
        self.objects.get(object_ref).map(|bytes| bytes.as_slice())
    }
}
159
160impl RawDataStore for InMemoryRawDataStore {
161 type Error = InMemoryStoreError;
162
163 fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
164 self.objects.insert(object_ref.to_string(), payload.to_vec());
165 Ok(())
166 }
167}
168
169#[derive(Default)]
170pub struct InMemoryAuditLedger {
171 records: Vec<AuditRecord>,
172}
173
174impl InMemoryAuditLedger {
175 pub fn records(&self) -> &[AuditRecord] {
176 &self.records
177 }
178}
179
180impl AuditLedger for InMemoryAuditLedger {
181 type Error = InMemoryStoreError;
182
183 fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
184 self.records.push(record);
185 Ok(())
186 }
187}
188
189#[derive(Default)]
190pub struct InMemoryOperationLog {
191 entries: Vec<OperationLogEntry>,
192}
193
194impl InMemoryOperationLog {
195 pub fn entries(&self) -> &[OperationLogEntry] {
196 &self.entries
197 }
198}
199
200impl OperationLogStore for InMemoryOperationLog {
201 type Error = InMemoryStoreError;
202
203 fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
204 self.entries.push(entry);
205 Ok(())
206 }
207}
208
/// Which flavour of S3-compatible service a store talks to.
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to the
/// comparison traits.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum S3Backend {
    /// Amazon S3.
    AwsS3,
    /// A MinIO deployment reached through an explicit endpoint.
    Minio,
}
215
/// Connection settings for an S3-compatible raw data store.
///
/// `Debug` is implemented by hand so that `secret_access_key` is never
/// printed; a derived impl would leak the secret into logs and panic messages.
#[cfg(feature = "s3")]
#[derive(Clone)]
pub struct S3ObjectStoreConfig {
    /// Target service flavour.
    pub backend: S3Backend,
    /// Bucket that receives the objects.
    pub bucket: String,
    /// Region name passed to the SDK.
    pub region: String,
    /// Custom endpoint URL, if any.
    pub endpoint: Option<String>,
    /// Static access key id, if any.
    pub access_key_id: Option<String>,
    /// Static secret access key, if any. Redacted in `Debug` output.
    pub secret_access_key: Option<String>,
}

#[cfg(feature = "s3")]
impl std::fmt::Debug for S3ObjectStoreConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show whether a secret is present without revealing its value.
        let secret = self.secret_access_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("S3ObjectStoreConfig")
            .field("backend", &self.backend)
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &secret)
            .finish()
    }
}
226
#[cfg(feature = "s3")]
impl S3ObjectStoreConfig {
    /// Builds a configuration for the AWS S3 backend with no endpoint override
    /// and no static credentials.
    pub fn for_aws_s3(bucket: impl Into<String>, region: impl Into<String>) -> Self {
        let bucket = bucket.into();
        let region = region.into();
        Self {
            backend: S3Backend::AwsS3,
            bucket,
            region,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }

    /// Builds a configuration for the MinIO backend with an explicit endpoint
    /// and static credentials.
    pub fn for_minio(
        bucket: impl Into<String>,
        region: impl Into<String>,
        endpoint: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        let bucket = bucket.into();
        let region = region.into();
        let endpoint = Some(endpoint.into());
        let access_key_id = Some(access_key_id.into());
        let secret_access_key = Some(secret_access_key.into());
        Self {
            backend: S3Backend::Minio,
            bucket,
            region,
            endpoint,
            access_key_id,
            secret_access_key,
        }
    }
}
257
/// Errors produced by the S3-compatible raw data store.
#[cfg(feature = "s3")]
#[derive(Debug, Error)]
pub enum S3StoreError {
    /// The supplied configuration is missing a value the chosen backend needs.
    #[error("invalid config: {0}")]
    InvalidConfig(String),

    /// The Tokio runtime backing the store could not be created.
    #[error("runtime initialization failed: {0}")]
    Runtime(String),

    /// The `PutObject` request failed.
    #[error("s3 put failed: {0}")]
    Put(String),
}
268
/// Blocking raw data store backed by a bucket on an S3-compatible service.
///
/// Owns its own Tokio runtime and drives the async SDK client through
/// `block_on`, so it can be used from synchronous code.
#[cfg(feature = "s3")]
pub struct S3CompatibleRawDataStore {
    /// Runtime used to block on SDK futures.
    runtime: tokio::runtime::Runtime,
    /// Configured S3 client.
    client: aws_sdk_s3::Client,
    /// Bucket that receives every object.
    bucket: String,
}

#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Creates a store from `config`.
    ///
    /// * AWS S3 backend: static credentials are used when both `access_key_id`
    ///   and `secret_access_key` are set; when neither is set the SDK's
    ///   default configuration is used. `endpoint` is ignored.
    /// * MinIO backend: `endpoint`, `access_key_id` and `secret_access_key`
    ///   are all required, and path-style addressing is forced.
    ///
    /// # Errors
    ///
    /// * [`S3StoreError::Runtime`] if the Tokio runtime cannot be created.
    /// * [`S3StoreError::InvalidConfig`] if the MinIO backend is missing a
    ///   required value, or if the AWS backend is given only one half of the
    ///   static credential pair (previously this was silently ignored and the
    ///   ambient credentials were used instead).
    ///
    /// # Panics
    ///
    /// NOTE(review): this calls `Runtime::block_on`, which Tokio documents as
    /// panicking when invoked from inside an async execution context -
    /// construct the store from synchronous code.
    pub fn new(config: S3ObjectStoreConfig) -> Result<Self, S3StoreError> {
        use aws_config::BehaviorVersion;
        use aws_config::Region;
        use aws_credential_types::Credentials;

        // The config is owned, so take its fields apart instead of cloning them.
        let S3ObjectStoreConfig {
            backend,
            bucket,
            region,
            endpoint,
            access_key_id,
            secret_access_key,
        } = config;

        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| S3StoreError::Runtime(e.to_string()))?;

        let mut loader =
            aws_config::defaults(BehaviorVersion::latest()).region(Region::new(region));

        match backend {
            S3Backend::AwsS3 => match (access_key_id, secret_access_key) {
                (Some(access_key_id), Some(secret_access_key)) => {
                    let creds =
                        Credentials::new(access_key_id, secret_access_key, None, None, "static");
                    loader = loader.credentials_provider(creds);
                }
                // No static credentials: leave the loader's defaults untouched.
                (None, None) => {}
                // Half a credential pair is a misconfiguration; refusing it
                // avoids silently writing with unintended ambient credentials.
                _ => {
                    return Err(S3StoreError::InvalidConfig(
                        "access_key_id and secret_access_key must be set together for AWS S3 backend"
                            .to_string(),
                    ));
                }
            },
            S3Backend::Minio => {
                let endpoint = endpoint.ok_or_else(|| {
                    S3StoreError::InvalidConfig("endpoint is required for MinIO backend".to_string())
                })?;
                let access_key_id = access_key_id.ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "access_key_id is required for MinIO backend".to_string(),
                    )
                })?;
                let secret_access_key = secret_access_key.ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "secret_access_key is required for MinIO backend".to_string(),
                    )
                })?;

                let creds =
                    Credentials::new(access_key_id, secret_access_key, None, None, "static");
                loader = loader.endpoint_url(endpoint).credentials_provider(creds);
            }
        }

        let shared = runtime.block_on(loader.load());
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&shared);

        if backend == S3Backend::Minio {
            s3_config_builder = s3_config_builder.force_path_style(true);
        }

        let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

        Ok(Self {
            runtime,
            client,
            bucket,
        })
    }
}
332
#[cfg(feature = "s3")]
impl RawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Uploads `payload` to the configured bucket with `object_ref` as the key,
    /// blocking until the request completes.
    ///
    /// # Errors
    ///
    /// Returns [`S3StoreError::Put`] when the request fails. The message is
    /// rendered with `DisplayErrorContext` so it includes the underlying
    /// cause chain rather than only the SDK's short top-level description.
    ///
    /// # Panics
    ///
    /// NOTE(review): this calls `Runtime::block_on`, which Tokio documents as
    /// panicking when invoked from inside an async execution context.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        // The SDK body needs owned bytes, so the payload is copied once here.
        let body = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        let request = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(object_ref)
            .body(body)
            .send();

        self.runtime.block_on(request).map_err(|e| {
            S3StoreError::Put(aws_sdk_s3::error::DisplayErrorContext(&e).to_string())
        })?;
        Ok(())
    }
}