//! edgesentry_rs/ingest/storage.rs
//!
//! Storage backends and orchestration services for tamper-evident record
//! ingestion (sync and async variants; in-memory, Postgres, and S3 backends).
1use std::collections::HashMap;
2
3use thiserror::Error;
4use tracing::{debug, error, info, instrument, warn};
5
6use crate::integrity::compute_payload_hash;
7use crate::record::AuditRecord;
8use super::policy::IntegrityPolicyGate;
9use super::verify::IngestError;
10
/// Outcome of a single ingest attempt, as recorded in the operation log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision {
    Accepted,
    Rejected,
}
16
/// One operation-log entry: the decision taken for a single
/// (device, sequence) ingest attempt plus a human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperationLogEntry {
    /// Whether the record was accepted or rejected.
    pub decision: IngestDecision,
    /// Identifier of the device that produced the record.
    pub device_id: String,
    /// Per-device sequence number of the record.
    pub sequence: u64,
    /// Free-form description (e.g. "ingest accepted" or a rejection reason).
    pub message: String,
}
24
/// Destination for raw record payloads, keyed by object reference.
pub trait RawDataStore {
    type Error: std::error::Error;

    /// Persists `payload` under `object_ref`.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error>;
}
30
/// Append-only ledger of accepted audit records.
pub trait AuditLedger {
    type Error: std::error::Error;

    /// Appends one record to the ledger.
    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error>;
}
36
/// Sink for operation-log entries (accepted and rejected decisions alike).
pub trait OperationLogStore {
    type Error: std::error::Error;

    /// Persists one operation-log entry.
    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error>;
}
42
/// Errors surfaced by the ingest services.
///
/// Backend errors are stringified rather than wrapped so this enum does not
/// need to be generic over the three storage backends' error types.
#[derive(Debug, Error)]
pub enum IngestServiceError {
    /// The integrity policy gate rejected the record.
    #[error("ingest verification failed: {0}")]
    Verify(#[from] IngestError),
    /// The recomputed payload hash did not match the hash claimed by the record.
    #[error("payload hash mismatch for device={device_id} sequence={sequence}")]
    PayloadHashMismatch { device_id: String, sequence: u64 },
    /// The raw data store failed to persist the payload.
    #[error("raw data store error: {0}")]
    RawDataStore(String),
    /// The audit ledger failed to append the record.
    #[error("audit ledger error: {0}")]
    AuditLedger(String),
    /// The operation log failed to record the decision.
    #[error("operation log error: {0}")]
    OperationLog(String),
}
56
/// Synchronous orchestration service for tamper-evident record ingestion.
///
/// Generic over the three storage backends so in-memory implementations can
/// be swapped in for tests.
pub struct IngestService<R, L, O>
where
    R: RawDataStore,
    L: AuditLedger,
    O: OperationLogStore,
{
    // Consulted before anything is persisted (see `super::policy`).
    policy: IntegrityPolicyGate,
    raw_data_store: R,
    audit_ledger: L,
    operation_log: O,
}
68
69impl<R, L, O> IngestService<R, L, O>
70where
71    R: RawDataStore,
72    L: AuditLedger,
73    O: OperationLogStore,
74{
75    pub fn new(policy: IntegrityPolicyGate, raw_data_store: R, audit_ledger: L, operation_log: O) -> Self {
76        Self {
77            policy,
78            raw_data_store,
79            audit_ledger,
80            operation_log,
81        }
82    }
83
84    pub fn register_device(&mut self, device_id: impl Into<String>, key: ed25519_dalek::VerifyingKey) {
85        self.policy.register_device(device_id, key);
86    }
87
88    #[instrument(skip(self, raw_payload), fields(
89        device_id = %record.device_id,
90        sequence  = record.sequence,
91        object_ref = %record.object_ref,
92    ))]
93    pub fn ingest(&mut self, record: AuditRecord, raw_payload: &[u8], cert_identity: Option<&str>) -> Result<(), IngestServiceError> {
94        debug!(payload_bytes = raw_payload.len(), "ingest started");
95
96        let payload_hash = compute_payload_hash(raw_payload);
97        if payload_hash != record.payload_hash {
98            warn!("payload hash mismatch — record rejected");
99            self.log_rejection(&record, "payload hash mismatch");
100            return Err(IngestServiceError::PayloadHashMismatch {
101                device_id: record.device_id,
102                sequence: record.sequence,
103            });
104        }
105
106        if let Err(error) = self.policy.enforce(&record, cert_identity) {
107            warn!(reason = %error, "integrity policy rejected record");
108            self.log_rejection(&record, &error.to_string());
109            return Err(IngestServiceError::Verify(error));
110        }
111
112        self.raw_data_store
113            .put(&record.object_ref, raw_payload)
114            .map_err(|e| {
115                error!(error = %e, "raw data store write failed");
116                IngestServiceError::RawDataStore(e.to_string())
117            })?;
118
119        self.audit_ledger
120            .append(record.clone())
121            .map_err(|e| {
122                error!(error = %e, "audit ledger append failed");
123                IngestServiceError::AuditLedger(e.to_string())
124            })?;
125
126        self.operation_log
127            .write(OperationLogEntry {
128                decision: IngestDecision::Accepted,
129                device_id: record.device_id,
130                sequence: record.sequence,
131                message: "ingest accepted".to_string(),
132            })
133            .map_err(|e| {
134                error!(error = %e, "operation log write failed");
135                IngestServiceError::OperationLog(e.to_string())
136            })?;
137
138        info!("record accepted");
139        Ok(())
140    }
141
142    pub fn raw_data_store(&self) -> &R {
143        &self.raw_data_store
144    }
145
146    pub fn audit_ledger(&self) -> &L {
147        &self.audit_ledger
148    }
149
150    pub fn operation_log(&self) -> &O {
151        &self.operation_log
152    }
153
154    fn log_rejection(&mut self, record: &AuditRecord, reason: &str) {
155        let _ = self.operation_log.write(OperationLogEntry {
156            decision: IngestDecision::Rejected,
157            device_id: record.device_id.clone(),
158            sequence: record.sequence,
159            message: reason.to_string(),
160        });
161    }
162}
163
164#[derive(Debug, Error)]
165#[error("in-memory store error: {message}")]
166pub struct InMemoryStoreError {
167    message: String,
168}
169
/// In-memory [`RawDataStore`] backed by a `HashMap`, intended for tests.
#[derive(Default)]
pub struct InMemoryRawDataStore {
    objects: HashMap<String, Vec<u8>>,
}

impl InMemoryRawDataStore {
    /// Returns the stored payload for `object_ref`, if any.
    pub fn get(&self, object_ref: &str) -> Option<&[u8]> {
        self.objects.get(object_ref).map(Vec::as_slice)
    }
}
180
181impl RawDataStore for InMemoryRawDataStore {
182    type Error = InMemoryStoreError;
183
184    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
185        self.objects.insert(object_ref.to_string(), payload.to_vec());
186        Ok(())
187    }
188}
189
/// In-memory [`AuditLedger`] backed by a `Vec`, intended for tests.
#[derive(Default)]
pub struct InMemoryAuditLedger {
    records: Vec<AuditRecord>,
}

impl InMemoryAuditLedger {
    /// Returns all appended records in insertion order.
    pub fn records(&self) -> &[AuditRecord] {
        &self.records
    }
}
200
impl AuditLedger for InMemoryAuditLedger {
    type Error = InMemoryStoreError;

    /// Appends the record to the in-memory vector; never fails.
    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
        self.records.push(record);
        Ok(())
    }
}
209
/// In-memory [`OperationLogStore`] backed by a `Vec`, intended for tests.
#[derive(Default)]
pub struct InMemoryOperationLog {
    entries: Vec<OperationLogEntry>,
}

impl InMemoryOperationLog {
    /// Returns all written entries in insertion order.
    pub fn entries(&self) -> &[OperationLogEntry] {
        &self.entries
    }
}
220
impl OperationLogStore for InMemoryOperationLog {
    type Error = InMemoryStoreError;

    /// Appends the entry to the in-memory vector; never fails.
    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
        self.entries.push(entry);
        Ok(())
    }
}
229
#[cfg(feature = "postgres")]
/// Error type for the Postgres-backed stores.
#[derive(Debug, Error)]
pub enum PostgresStoreError {
    /// Underlying driver error (connection, SQL, type conversion, ...).
    #[error("postgres error: {0}")]
    Postgres(#[from] ::postgres::Error),
}
236
#[cfg(feature = "postgres")]
/// [`AuditLedger`] persisted in a Postgres `audit_records` table.
pub struct PostgresAuditLedger {
    client: ::postgres::Client,
}

#[cfg(feature = "postgres")]
impl PostgresAuditLedger {
    /// Connects to Postgres at `url` and wraps the client.
    ///
    /// NOTE(review): the connection is unencrypted (`NoTls`) — confirm this
    /// is only used on trusted networks.
    pub fn connect(url: &str) -> Result<Self, ::postgres::Error> {
        let client = ::postgres::Client::connect(url, ::postgres::NoTls)?;
        Ok(Self { client })
    }
}
249
#[cfg(feature = "postgres")]
impl AuditLedger for PostgresAuditLedger {
    type Error = PostgresStoreError;

    /// Inserts one record into `audit_records`.
    ///
    /// NOTE(review): `sequence` and `timestamp_ms` are cast `u64 -> i64`
    /// with `as`, which wraps for values above `i64::MAX` — confirm those
    /// ranges are never exceeded in practice.
    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
        self.client.execute(
            "INSERT INTO audit_records \
             (device_id, sequence, timestamp_ms, payload_hash, signature, prev_record_hash, object_ref) \
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
            &[
                &record.device_id,
                &(record.sequence as i64),
                &(record.timestamp_ms as i64),
                &record.payload_hash.as_ref(),
                &record.signature.as_ref(),
                &record.prev_record_hash.as_ref(),
                &record.object_ref,
            ],
        )?;
        Ok(())
    }
}
272
#[cfg(feature = "postgres")]
/// [`OperationLogStore`] persisted in a Postgres `operation_logs` table.
pub struct PostgresOperationLog {
    client: ::postgres::Client,
}

#[cfg(feature = "postgres")]
impl PostgresOperationLog {
    /// Connects to Postgres at `url` and wraps the client.
    ///
    /// NOTE(review): the connection is unencrypted (`NoTls`) — confirm this
    /// is only used on trusted networks.
    pub fn connect(url: &str) -> Result<Self, ::postgres::Error> {
        let client = ::postgres::Client::connect(url, ::postgres::NoTls)?;
        Ok(Self { client })
    }

    /// Clears all rows and restarts identity sequences.
    ///
    /// WARNING: this truncates BOTH `operation_logs` AND `audit_records`,
    /// even though it lives on the operation-log type — destructive; keep it
    /// to test/bench resets only.
    pub fn reset(&mut self) -> Result<(), ::postgres::Error> {
        self.client.batch_execute(
            "TRUNCATE TABLE operation_logs, audit_records RESTART IDENTITY;",
        )
    }
}
291
#[cfg(feature = "postgres")]
impl OperationLogStore for PostgresOperationLog {
    // NOTE(review): this uses the raw driver error, while `PostgresAuditLedger`
    // wraps it in `PostgresStoreError` — consider unifying the two.
    type Error = ::postgres::Error;

    /// Inserts one entry into `operation_logs`, storing the decision as the
    /// string "Accepted" or "Rejected".
    ///
    /// NOTE(review): `sequence` is cast `u64 -> i64` with `as`, which wraps
    /// for values above `i64::MAX`.
    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
        let decision = match entry.decision {
            IngestDecision::Accepted => "Accepted",
            IngestDecision::Rejected => "Rejected",
        };
        self.client.execute(
            "INSERT INTO operation_logs (decision, device_id, sequence, message) \
             VALUES ($1, $2, $3, $4)",
            &[
                &decision,
                &entry.device_id,
                &(entry.sequence as i64),
                &entry.message,
            ],
        )?;
        Ok(())
    }
}
314
#[cfg(feature = "s3")]
/// Which S3-compatible backend a store talks to; affects credential handling
/// and addressing style (see `S3CompatibleRawDataStore::new`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Backend {
    AwsS3,
    Minio,
}
321
#[cfg(feature = "s3")]
/// Connection settings for an S3-compatible object store.
///
/// `endpoint` and the static credentials are optional for AWS S3 (the SDK's
/// default provider chain applies) but required for MinIO.
#[derive(Debug, Clone)]
pub struct S3ObjectStoreConfig {
    pub backend: S3Backend,
    pub bucket: String,
    pub region: String,
    pub endpoint: Option<String>,
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
}
332
#[cfg(feature = "s3")]
impl S3ObjectStoreConfig {
    /// Configuration for plain AWS S3: no custom endpoint, no static
    /// credentials (the SDK's default provider chain applies).
    pub fn for_aws_s3(bucket: impl Into<String>, region: impl Into<String>) -> Self {
        Self {
            backend: S3Backend::AwsS3,
            bucket: bucket.into(),
            region: region.into(),
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }

    /// Configuration for a MinIO deployment: custom endpoint plus static
    /// credentials are all mandatory.
    pub fn for_minio(
        bucket: impl Into<String>,
        region: impl Into<String>,
        endpoint: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        let mut config = Self::for_aws_s3(bucket, region);
        config.backend = S3Backend::Minio;
        config.endpoint = Some(endpoint.into());
        config.access_key_id = Some(access_key_id.into());
        config.secret_access_key = Some(secret_access_key.into());
        config
    }
}
363
#[cfg(feature = "s3")]
/// Errors from the S3-compatible raw data store.
#[derive(Debug, Error)]
pub enum S3StoreError {
    /// Required MinIO settings were missing from the config.
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    /// The embedded tokio runtime could not be created.
    #[error("runtime initialization failed: {0}")]
    Runtime(String),
    /// The `PutObject` call failed (stringified SDK error).
    #[error("s3 put failed: {0}")]
    Put(String),
}
374
#[cfg(feature = "s3")]
/// [`RawDataStore`] backed by an S3-compatible object store (AWS S3 or MinIO).
pub struct S3CompatibleRawDataStore {
    // Dedicated runtime used to block on SDK futures from the sync `put` API.
    runtime: tokio::runtime::Runtime,
    client: aws_sdk_s3::Client,
    // Target bucket for all writes.
    bucket: String,
}
381
#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Builds a client for the backend described by `config`.
    ///
    /// Creates a dedicated tokio runtime (used by the sync [`RawDataStore`]
    /// impl to block on SDK futures), resolves credentials, and — for MinIO —
    /// requires an explicit endpoint plus static credentials and enables
    /// path-style addressing.
    ///
    /// # Errors
    /// - `InvalidConfig` if a required MinIO field is missing;
    /// - `Runtime` if the tokio runtime cannot be created.
    pub fn new(config: S3ObjectStoreConfig) -> Result<Self, S3StoreError> {
        use aws_config::BehaviorVersion;
        use aws_config::Region;
        use aws_credential_types::Credentials;

        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| S3StoreError::Runtime(e.to_string()))?;

        let mut loader = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(config.region.clone()));

        match config.backend {
            S3Backend::AwsS3 => {
                // Static credentials are optional for AWS: without them the
                // SDK's default provider chain applies.
                if let (Some(access_key_id), Some(secret_access_key)) =
                    (config.access_key_id.clone(), config.secret_access_key.clone())
                {
                    let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                    loader = loader.credentials_provider(creds);
                }
            }
            S3Backend::Minio => {
                // MinIO has no ambient provider chain: the endpoint and both
                // credential halves are mandatory.
                let endpoint = config.endpoint.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig("endpoint is required for MinIO backend".to_string())
                })?;
                let access_key_id = config.access_key_id.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig("access_key_id is required for MinIO backend".to_string())
                })?;
                let secret_access_key = config.secret_access_key.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "secret_access_key is required for MinIO backend".to_string(),
                    )
                })?;

                let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                loader = loader.endpoint_url(endpoint).credentials_provider(creds);
            }
        }

        let shared = runtime.block_on(loader.load());
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&shared);

        // MinIO serves buckets at the path level, not as virtual hosts.
        if config.backend == S3Backend::Minio {
            s3_config_builder = s3_config_builder.force_path_style(true);
        }

        let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

        Ok(Self {
            runtime,
            client,
            bucket: config.bucket,
        })
    }
}
438
#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    /// Extracts the object key from an `object_ref`, which may be a full
    /// `s3://bucket/key` URI or a plain key path.  For example:
    ///   `s3://bucket/lift-01/1.bin`  →  `lift-01/1.bin`
    ///   `lift-01/1.bin`              →  `lift-01/1.bin`
    /// A URI with no key component (e.g. `s3://bucket`) is returned unchanged.
    fn object_key<'a>(&self, object_ref: &'a str) -> &'a str {
        object_ref
            .strip_prefix("s3://")
            .and_then(|rest| rest.split_once('/'))
            .map(|(_bucket, key)| key)
            .unwrap_or(object_ref)
    }
}
454
#[cfg(feature = "s3")]
impl RawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Uploads `payload` with a blocking `PutObject`, copying it into an
    /// owned `ByteStream` first.
    ///
    /// NOTE(review): this blocks on the embedded runtime; `Runtime::block_on`
    /// panics when called from within another tokio runtime — use the
    /// `AsyncRawDataStore` impl in async contexts. Confirm no sync callers
    /// run inside a runtime.
    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let key = self.object_key(object_ref);
        let stream = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        self.runtime
            .block_on(
                self.client
                    .put_object()
                    .bucket(&self.bucket)
                    .key(key)
                    .body(stream)
                    .send(),
            )
            .map_err(|e| S3StoreError::Put(e.to_string()))?;
        Ok(())
    }
}
475
476// ── Async ingest traits and implementations ───────────────────────────────────
477
478#[cfg(feature = "async-ingest")]
479use std::sync::Arc;
480#[cfg(feature = "async-ingest")]
481use tokio::sync::Mutex as AsyncMutex;
482
/// Async variant of [`RawDataStore`] for use in async ingest pipelines.
///
/// Implementations must be `Send + Sync` so they can be shared across tasks.
/// The `+ Send` bound on the returned future is required for use with
/// multi-threaded tokio runtimes.
#[cfg(feature = "async-ingest")]
pub trait AsyncRawDataStore: Send + Sync {
    type Error: std::error::Error + Send;

    /// Persists `payload` under `object_ref`. Takes `&self` (not `&mut self`),
    /// so implementations provide their own interior mutability.
    fn put(
        &self,
        object_ref: &str,
        payload: &[u8],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
498
/// Async variant of [`AuditLedger`] for use in async ingest pipelines.
#[cfg(feature = "async-ingest")]
pub trait AsyncAuditLedger: Send + Sync {
    type Error: std::error::Error + Send;

    /// Appends one record to the ledger.
    fn append(
        &self,
        record: AuditRecord,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
509
/// Async variant of [`OperationLogStore`] for use in async ingest pipelines.
#[cfg(feature = "async-ingest")]
pub trait AsyncOperationLogStore: Send + Sync {
    type Error: std::error::Error + Send;

    /// Persists one operation-log entry.
    fn write(
        &self,
        entry: OperationLogEntry,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
520
521// ── In-memory async implementations (for testing) ────────────────────────────
522
/// Async in-memory raw data store backed by a `tokio::sync::Mutex`.
///
/// `Clone` is shallow: clones share the same underlying map via `Arc`.
#[cfg(feature = "async-ingest")]
#[derive(Default, Clone)]
pub struct AsyncInMemoryRawDataStore {
    objects: Arc<AsyncMutex<HashMap<String, Vec<u8>>>>,
}

#[cfg(feature = "async-ingest")]
impl AsyncInMemoryRawDataStore {
    /// Returns a copy of the stored payload for `object_ref`, if any.
    pub async fn get(&self, object_ref: &str) -> Option<Vec<u8>> {
        self.objects.lock().await.get(object_ref).cloned()
    }
}
536
#[cfg(feature = "async-ingest")]
impl AsyncRawDataStore for AsyncInMemoryRawDataStore {
    type Error = InMemoryStoreError;

    /// Stores (or overwrites) the payload under `object_ref`; never fails.
    async fn put(&self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        self.objects
            .lock()
            .await
            .insert(object_ref.to_string(), payload.to_vec());
        Ok(())
    }
}
549
/// Async in-memory audit ledger backed by a `tokio::sync::Mutex`.
///
/// `Clone` is shallow: clones share the same underlying vector via `Arc`.
#[cfg(feature = "async-ingest")]
#[derive(Default, Clone)]
pub struct AsyncInMemoryAuditLedger {
    records: Arc<AsyncMutex<Vec<AuditRecord>>>,
}

#[cfg(feature = "async-ingest")]
impl AsyncInMemoryAuditLedger {
    /// Returns a snapshot copy of all appended records in insertion order.
    pub async fn records(&self) -> Vec<AuditRecord> {
        self.records.lock().await.clone()
    }
}
563
#[cfg(feature = "async-ingest")]
impl AsyncAuditLedger for AsyncInMemoryAuditLedger {
    type Error = InMemoryStoreError;

    /// Appends the record to the in-memory vector; never fails.
    async fn append(&self, record: AuditRecord) -> Result<(), Self::Error> {
        self.records.lock().await.push(record);
        Ok(())
    }
}
573
/// Async in-memory operation log backed by a `tokio::sync::Mutex`.
///
/// `Clone` is shallow: clones share the same underlying vector via `Arc`.
#[cfg(feature = "async-ingest")]
#[derive(Default, Clone)]
pub struct AsyncInMemoryOperationLog {
    entries: Arc<AsyncMutex<Vec<OperationLogEntry>>>,
}

#[cfg(feature = "async-ingest")]
impl AsyncInMemoryOperationLog {
    /// Returns a snapshot copy of all written entries in insertion order.
    pub async fn entries(&self) -> Vec<OperationLogEntry> {
        self.entries.lock().await.clone()
    }
}
587
#[cfg(feature = "async-ingest")]
impl AsyncOperationLogStore for AsyncInMemoryOperationLog {
    type Error = InMemoryStoreError;

    /// Appends the entry to the in-memory vector; never fails.
    async fn write(&self, entry: OperationLogEntry) -> Result<(), Self::Error> {
        self.entries.lock().await.push(entry);
        Ok(())
    }
}
597
598// ── S3 async implementation ───────────────────────────────────────────────────
599
/// `S3CompatibleRawDataStore` also implements `AsyncRawDataStore` when both
/// `s3` and `async-ingest` features are active.  The async path calls `.await`
/// directly on the SDK future, bypassing the embedded synchronous runtime, so
/// it is safe to use from within a tokio runtime.
#[cfg(all(feature = "s3", feature = "async-ingest"))]
impl AsyncRawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    /// Uploads `payload` via `PutObject`, copying it into an owned
    /// `ByteStream` first.
    async fn put(&self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let key = self.object_key(object_ref);
        let stream = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(stream)
            .send()
            .await
            .map_err(|e| S3StoreError::Put(e.to_string()))?;
        Ok(())
    }
}
621
622// ── AsyncIngestService ────────────────────────────────────────────────────────
623
/// Async orchestration service for tamper-evident record ingestion.
///
/// Drop-in async counterpart to [`IngestService`].  All storage calls are
/// `await`-ed so the calling thread is never blocked.  The policy gate is
/// wrapped in a `tokio::sync::Mutex` to allow `&self` on `ingest`, enabling
/// the service to be shared across tasks via `Arc`.
#[cfg(feature = "async-ingest")]
pub struct AsyncIngestService<R, L, O>
where
    R: AsyncRawDataStore,
    L: AsyncAuditLedger,
    O: AsyncOperationLogStore,
{
    // Locked once per ingest call and released before any storage I/O.
    policy: AsyncMutex<super::policy::IntegrityPolicyGate>,
    raw_data_store: R,
    audit_ledger: L,
    operation_log: O,
}
642
#[cfg(feature = "async-ingest")]
impl<R, L, O> AsyncIngestService<R, L, O>
where
    R: AsyncRawDataStore,
    L: AsyncAuditLedger,
    O: AsyncOperationLogStore,
{
    /// Creates a service wired to the given policy gate and async backends.
    pub fn new(
        policy: super::policy::IntegrityPolicyGate,
        raw_data_store: R,
        audit_ledger: L,
        operation_log: O,
    ) -> Self {
        Self {
            policy: AsyncMutex::new(policy),
            raw_data_store,
            audit_ledger,
            operation_log,
        }
    }

    /// Registers a device's Ed25519 verifying key with the integrity policy gate.
    pub async fn register_device(
        &self,
        device_id: impl Into<String>,
        key: ed25519_dalek::VerifyingKey,
    ) {
        self.policy.lock().await.register_device(device_id, key);
    }

    /// Verifies and persists a single audit record together with its raw
    /// payload — the async counterpart of [`IngestService::ingest`].
    ///
    /// Pipeline: recompute and compare the payload hash, run the policy gate
    /// (lock released before any I/O), then write the payload, append the
    /// record, and log an `Accepted` entry. Rejections are recorded in the
    /// operation log before the error is returned; a raw-data write that
    /// succeeded before a later storage failure is not rolled back.
    #[tracing::instrument(skip(self, raw_payload), fields(
        device_id = %record.device_id,
        sequence  = record.sequence,
        object_ref = %record.object_ref,
    ))]
    pub async fn ingest(
        &self,
        record: AuditRecord,
        raw_payload: &[u8],
        cert_identity: Option<&str>,
    ) -> Result<(), IngestServiceError> {
        debug!(payload_bytes = raw_payload.len(), "async ingest started");

        let payload_hash = compute_payload_hash(raw_payload);
        if payload_hash != record.payload_hash {
            warn!("payload hash mismatch — record rejected");
            self.log_rejection(&record, "payload hash mismatch").await;
            return Err(IngestServiceError::PayloadHashMismatch {
                device_id: record.device_id,
                sequence: record.sequence,
            });
        }

        // Acquire the policy lock, run the check, then release before any I/O.
        let policy_result = {
            let mut policy = self.policy.lock().await;
            policy.enforce(&record, cert_identity)
        };
        if let Err(error) = policy_result {
            warn!(reason = %error, "integrity policy rejected record");
            self.log_rejection(&record, &error.to_string()).await;
            return Err(IngestServiceError::Verify(error));
        }

        self.raw_data_store
            .put(&record.object_ref, raw_payload)
            .await
            .map_err(|e| {
                error!(error = %e, "raw data store write failed");
                IngestServiceError::RawDataStore(e.to_string())
            })?;

        self.audit_ledger
            .append(record.clone())
            .await
            .map_err(|e| {
                error!(error = %e, "audit ledger append failed");
                IngestServiceError::AuditLedger(e.to_string())
            })?;

        self.operation_log
            .write(OperationLogEntry {
                decision: IngestDecision::Accepted,
                device_id: record.device_id,
                sequence: record.sequence,
                message: "ingest accepted".to_string(),
            })
            .await
            .map_err(|e| {
                error!(error = %e, "operation log write failed");
                IngestServiceError::OperationLog(e.to_string())
            })?;

        info!("record accepted");
        Ok(())
    }

    /// Read-only access to the raw data store (useful in tests).
    pub fn raw_data_store(&self) -> &R {
        &self.raw_data_store
    }

    /// Read-only access to the audit ledger (useful in tests).
    pub fn audit_ledger(&self) -> &L {
        &self.audit_ledger
    }

    /// Read-only access to the operation log (useful in tests).
    pub fn operation_log(&self) -> &O {
        &self.operation_log
    }

    /// Best-effort write of a `Rejected` operation-log entry, factored out of
    /// `ingest`, which previously duplicated it inline for both rejection
    /// paths. A failure here must not mask the original rejection error, so
    /// it is logged at `warn` level instead of being propagated (previously
    /// the write result was silently discarded).
    async fn log_rejection(&self, record: &AuditRecord, reason: &str) {
        if let Err(e) = self
            .operation_log
            .write(OperationLogEntry {
                decision: IngestDecision::Rejected,
                device_id: record.device_id.clone(),
                sequence: record.sequence,
                message: reason.to_string(),
            })
            .await
        {
            warn!(error = %e, "failed to record rejection in operation log");
        }
    }
}