1use std::collections::HashMap;
2
3use ledger_core::{compute_payload_hash, AuditRecord};
4use thiserror::Error;
5
6use crate::{IngestError, IngestState};
7
/// Outcome of a single ingest attempt, as recorded in an [`OperationLogEntry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision {
    /// The record passed every check and was handed to the storage backends.
    Accepted,
    /// The record failed the payload-hash check or verification and was not stored.
    Rejected,
}
13
/// One line of the operation log describing what happened to an ingest attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperationLogEntry {
    /// Whether the attempt was accepted or rejected.
    pub decision: IngestDecision,
    /// Device identifier copied from the ingested record.
    pub device_id: String,
    /// Sequence number copied from the ingested record.
    pub sequence: u64,
    /// Human-readable detail: a fixed acceptance message or the rejection reason.
    pub message: String,
}
21
/// Backend that persists raw payload bytes under an object reference.
pub trait RawDataStore {
    /// Error type reported by the backend.
    type Error: std::error::Error;

    /// Stores `payload` under `object_ref`.
    ///
    /// What happens when `object_ref` already exists is left to the implementation.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error>;
}
27
/// Backend that records accepted [`AuditRecord`]s.
pub trait AuditLedger {
    /// Error type reported by the backend.
    type Error: std::error::Error;

    /// Appends `record` to the ledger, taking ownership of it.
    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error>;
}
33
/// Backend that records accept/reject decisions made during ingest.
pub trait OperationLogStore {
    /// Error type reported by the backend.
    type Error: std::error::Error;

    /// Writes a single operation-log `entry`.
    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error>;
}
39
/// Errors returned by [`IngestService::ingest`].
///
/// Backend failures are carried as strings (the backend error's `to_string()`),
/// so this enum does not depend on the concrete backend error types.
#[derive(Debug, Error)]
pub enum IngestServiceError {
    /// The verifier refused the record.
    #[error("ingest verification failed: {0}")]
    Verify(#[from] IngestError),
    /// The hash computed from the raw payload differs from the record's `payload_hash`.
    #[error("payload hash mismatch for device={device_id} sequence={sequence}")]
    PayloadHashMismatch { device_id: String, sequence: u64 },
    /// The raw data store failed while storing the payload.
    #[error("raw data store error: {0}")]
    RawDataStore(String),
    /// The audit ledger failed while appending the record.
    #[error("audit ledger error: {0}")]
    AuditLedger(String),
    /// The operation log failed while writing the acceptance entry.
    #[error("operation log error: {0}")]
    OperationLog(String),
}
53
/// Ingest pipeline that verifies records and fans them out to three backends:
/// a raw payload store (`R`), an audit ledger (`L`) and an operation log (`O`).
pub struct IngestService<R, L, O>
where
    R: RawDataStore,
    L: AuditLedger,
    O: OperationLogStore,
{
    // Verification state; consulted (and mutably borrowed) on every ingest.
    verifier: IngestState,
    // Destination for raw payload bytes.
    raw_data_store: R,
    // Destination for accepted audit records.
    audit_ledger: L,
    // Destination for accept/reject decisions.
    operation_log: O,
}
65
66impl<R, L, O> IngestService<R, L, O>
67where
68 R: RawDataStore,
69 L: AuditLedger,
70 O: OperationLogStore,
71{
72 pub fn new(verifier: IngestState, raw_data_store: R, audit_ledger: L, operation_log: O) -> Self {
73 Self {
74 verifier,
75 raw_data_store,
76 audit_ledger,
77 operation_log,
78 }
79 }
80
81 pub fn register_device(&mut self, device_id: impl Into<String>, key: ed25519_dalek::VerifyingKey) {
82 self.verifier.register_device(device_id, key);
83 }
84
85 pub fn ingest(&mut self, record: AuditRecord, raw_payload: &[u8]) -> Result<(), IngestServiceError> {
86 let payload_hash = compute_payload_hash(raw_payload);
87 if payload_hash != record.payload_hash {
88 self.log_rejection(&record, "payload hash mismatch");
89 return Err(IngestServiceError::PayloadHashMismatch {
90 device_id: record.device_id,
91 sequence: record.sequence,
92 });
93 }
94
95 if let Err(error) = self.verifier.verify_and_accept(&record) {
96 self.log_rejection(&record, &error.to_string());
97 return Err(IngestServiceError::Verify(error));
98 }
99
100 self.raw_data_store
101 .put(&record.object_ref, raw_payload)
102 .map_err(|e| IngestServiceError::RawDataStore(e.to_string()))?;
103
104 self.audit_ledger
105 .append(record.clone())
106 .map_err(|e| IngestServiceError::AuditLedger(e.to_string()))?;
107
108 self.operation_log
109 .write(OperationLogEntry {
110 decision: IngestDecision::Accepted,
111 device_id: record.device_id,
112 sequence: record.sequence,
113 message: "ingest accepted".to_string(),
114 })
115 .map_err(|e| IngestServiceError::OperationLog(e.to_string()))?;
116
117 Ok(())
118 }
119
120 pub fn raw_data_store(&self) -> &R {
121 &self.raw_data_store
122 }
123
124 pub fn audit_ledger(&self) -> &L {
125 &self.audit_ledger
126 }
127
128 pub fn operation_log(&self) -> &O {
129 &self.operation_log
130 }
131
132 fn log_rejection(&mut self, record: &AuditRecord, reason: &str) {
133 let _ = self.operation_log.write(OperationLogEntry {
134 decision: IngestDecision::Rejected,
135 device_id: record.device_id.clone(),
136 sequence: record.sequence,
137 message: reason.to_string(),
138 });
139 }
140}
141
/// Error type shared by the in-memory backends.
///
/// None of the in-memory implementations visible in this file ever construct it;
/// it serves as their associated `Error` type.
#[derive(Debug, Error)]
#[error("in-memory store error: {message}")]
pub struct InMemoryStoreError {
    // Detail text interpolated into the `Display` output.
    message: String,
}
147
/// Raw data store that keeps payloads in a `HashMap` keyed by object reference.
#[derive(Default)]
pub struct InMemoryRawDataStore {
    objects: HashMap<String, Vec<u8>>,
}

impl InMemoryRawDataStore {
    /// Returns the bytes stored under `object_ref`, or `None` if nothing is stored there.
    pub fn get(&self, object_ref: &str) -> Option<&[u8]> {
        let stored = self.objects.get(object_ref)?;
        Some(stored.as_slice())
    }
}
158
159impl RawDataStore for InMemoryRawDataStore {
160 type Error = InMemoryStoreError;
161
162 fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
163 self.objects.insert(object_ref.to_string(), payload.to_vec());
164 Ok(())
165 }
166}
167
168#[derive(Default)]
169pub struct InMemoryAuditLedger {
170 records: Vec<AuditRecord>,
171}
172
173impl InMemoryAuditLedger {
174 pub fn records(&self) -> &[AuditRecord] {
175 &self.records
176 }
177}
178
179impl AuditLedger for InMemoryAuditLedger {
180 type Error = InMemoryStoreError;
181
182 fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
183 self.records.push(record);
184 Ok(())
185 }
186}
187
188#[derive(Default)]
189pub struct InMemoryOperationLog {
190 entries: Vec<OperationLogEntry>,
191}
192
193impl InMemoryOperationLog {
194 pub fn entries(&self) -> &[OperationLogEntry] {
195 &self.entries
196 }
197}
198
199impl OperationLogStore for InMemoryOperationLog {
200 type Error = InMemoryStoreError;
201
202 fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
203 self.entries.push(entry);
204 Ok(())
205 }
206}
207
/// Which S3-compatible service an [`S3ObjectStoreConfig`] targets.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Backend {
    /// Amazon S3; static credentials are optional.
    AwsS3,
    /// MinIO; requires an explicit endpoint and static credentials, and uses
    /// path-style addressing.
    Minio,
}
214
/// Connection settings for [`S3CompatibleRawDataStore`].
///
/// `Debug` is implemented by hand so that `secret_access_key` is never printed;
/// only whether a secret is present is shown.
#[cfg(feature = "s3")]
#[derive(Clone)]
pub struct S3ObjectStoreConfig {
    /// Target service flavour.
    pub backend: S3Backend,
    /// Bucket that objects are written to.
    pub bucket: String,
    /// Region passed to the AWS configuration loader.
    pub region: String,
    /// Custom endpoint URL; required for MinIO.
    pub endpoint: Option<String>,
    /// Static access key id; required for MinIO.
    pub access_key_id: Option<String>,
    /// Static secret access key; required for MinIO.
    pub secret_access_key: Option<String>,
}

#[cfg(feature = "s3")]
impl std::fmt::Debug for S3ObjectStoreConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the `Some`/`None` shape but never emit the secret itself.
        let secret = self.secret_access_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("S3ObjectStoreConfig")
            .field("backend", &self.backend)
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &secret)
            .finish()
    }
}
225
#[cfg(feature = "s3")]
impl S3ObjectStoreConfig {
    /// Configuration for Amazon S3 with no custom endpoint and no static credentials.
    pub fn for_aws_s3(bucket: impl Into<String>, region: impl Into<String>) -> Self {
        let bucket = bucket.into();
        let region = region.into();
        Self {
            backend: S3Backend::AwsS3,
            bucket,
            region,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }

    /// Configuration for a MinIO deployment reachable at `endpoint`, authenticated
    /// with the given static credentials.
    pub fn for_minio(
        bucket: impl Into<String>,
        region: impl Into<String>,
        endpoint: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        let bucket = bucket.into();
        let region = region.into();
        let endpoint = Some(endpoint.into());
        let access_key_id = Some(access_key_id.into());
        let secret_access_key = Some(secret_access_key.into());
        Self {
            backend: S3Backend::Minio,
            bucket,
            region,
            endpoint,
            access_key_id,
            secret_access_key,
        }
    }
}
256
/// Errors produced by [`S3CompatibleRawDataStore`].
#[cfg(feature = "s3")]
#[derive(Debug, Error)]
pub enum S3StoreError {
    /// A setting required by the selected backend is missing.
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    /// The Tokio runtime could not be created.
    #[error("runtime initialization failed: {0}")]
    Runtime(String),
    /// The `PutObject` request failed.
    #[error("s3 put failed: {0}")]
    Put(String),
}
267
/// Raw data store backed by an S3-compatible bucket.
///
/// It owns a Tokio runtime so that the synchronous [`RawDataStore::put`] can drive
/// the async AWS SDK client with `block_on`.
#[cfg(feature = "s3")]
pub struct S3CompatibleRawDataStore {
    // Runtime used to block on SDK futures.
    runtime: tokio::runtime::Runtime,
    // Configured S3 client.
    client: aws_sdk_s3::Client,
    // Bucket every object is written to.
    bucket: String,
}
274
#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Creates a store from `config`, building a dedicated Tokio runtime and an S3 client.
    ///
    /// * `AwsS3`: static credentials are used only when both `access_key_id` and
    ///   `secret_access_key` are set; otherwise the loader's defaults apply.
    /// * `Minio`: `endpoint`, `access_key_id` and `secret_access_key` are all required,
    ///   and path-style addressing is forced.
    ///
    /// # Errors
    ///
    /// * [`S3StoreError::Runtime`] if the Tokio runtime cannot be created.
    /// * [`S3StoreError::InvalidConfig`] if a setting required for MinIO is missing.
    ///
    /// # Panics
    ///
    /// Configuration is loaded with `Runtime::block_on`, which Tokio documents as
    /// panicking when called from inside an asynchronous execution context, so call
    /// this from synchronous code.
    pub fn new(config: S3ObjectStoreConfig) -> Result<Self, S3StoreError> {
        use aws_config::BehaviorVersion;
        use aws_config::Region;
        use aws_credential_types::Credentials;

        // `config` is owned, so take its fields apart once and move them where they
        // are needed instead of cloning each one (the credentials in particular).
        let S3ObjectStoreConfig {
            backend,
            bucket,
            region,
            endpoint,
            access_key_id,
            secret_access_key,
        } = config;

        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| S3StoreError::Runtime(e.to_string()))?;

        let mut loader = aws_config::defaults(BehaviorVersion::latest()).region(Region::new(region));

        // Each arm configures the loader and reports whether path-style addressing
        // must be forced on the S3 client afterwards.
        let force_path_style = match backend {
            S3Backend::AwsS3 => {
                if let (Some(access_key_id), Some(secret_access_key)) =
                    (access_key_id, secret_access_key)
                {
                    let creds =
                        Credentials::new(access_key_id, secret_access_key, None, None, "static");
                    loader = loader.credentials_provider(creds);
                }
                false
            }
            S3Backend::Minio => {
                let endpoint = endpoint.ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "endpoint is required for MinIO backend".to_string(),
                    )
                })?;
                let access_key_id = access_key_id.ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "access_key_id is required for MinIO backend".to_string(),
                    )
                })?;
                let secret_access_key = secret_access_key.ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "secret_access_key is required for MinIO backend".to_string(),
                    )
                })?;

                let creds =
                    Credentials::new(access_key_id, secret_access_key, None, None, "static");
                loader = loader.endpoint_url(endpoint).credentials_provider(creds);
                true
            }
        };

        let shared = runtime.block_on(loader.load());
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&shared);

        if force_path_style {
            s3_config_builder = s3_config_builder.force_path_style(true);
        }

        let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

        Ok(Self {
            runtime,
            client,
            bucket,
        })
    }
}
331
#[cfg(feature = "s3")]
impl RawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Uploads `payload` to the configured bucket under the key `object_ref`,
    /// blocking the calling thread until the request completes.
    ///
    /// # Errors
    ///
    /// Returns [`S3StoreError::Put`] carrying the SDK error together with its source
    /// chain when the request fails.
    ///
    /// # Panics
    ///
    /// Uses `Runtime::block_on`, which Tokio documents as panicking when called from
    /// inside an asynchronous execution context.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        // The SDK body needs owned bytes, so the payload is copied once here.
        let stream = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        let request = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(object_ref)
            .body(stream)
            .send();

        self.runtime.block_on(request).map_err(|e| {
            // `SdkError`'s plain `Display` omits the underlying cause;
            // `DisplayErrorContext` renders the whole source chain.
            S3StoreError::Put(aws_sdk_s3::error::DisplayErrorContext(&e).to_string())
        })?;
        Ok(())
    }
}