immutable-trace 0.1.7

Tamper-evident immutable audit trace: signing, verification, ingestion and CLI
Documentation
use std::collections::HashMap;

use thiserror::Error;

use crate::crypto::compute_payload_hash;
use crate::record::AuditRecord;
use super::verify::{IngestError, IngestState};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision {
    Accepted,
    Rejected,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperationLogEntry {
    pub decision: IngestDecision,
    pub device_id: String,
    pub sequence: u64,
    pub message: String,
}

pub trait RawDataStore {
    type Error: std::error::Error;

    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error>;
}

pub trait AuditLedger {
    type Error: std::error::Error;

    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error>;
}

pub trait OperationLogStore {
    type Error: std::error::Error;

    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error>;
}

#[derive(Debug, Error)]
pub enum IngestServiceError {
    #[error("ingest verification failed: {0}")]
    Verify(#[from] IngestError),
    #[error("payload hash mismatch for device={device_id} sequence={sequence}")]
    PayloadHashMismatch { device_id: String, sequence: u64 },
    #[error("raw data store error: {0}")]
    RawDataStore(String),
    #[error("audit ledger error: {0}")]
    AuditLedger(String),
    #[error("operation log error: {0}")]
    OperationLog(String),
}

pub struct IngestService<R, L, O>
where
    R: RawDataStore,
    L: AuditLedger,
    O: OperationLogStore,
{
    verifier: IngestState,
    raw_data_store: R,
    audit_ledger: L,
    operation_log: O,
}

impl<R, L, O> IngestService<R, L, O>
where
    R: RawDataStore,
    L: AuditLedger,
    O: OperationLogStore,
{
    pub fn new(verifier: IngestState, raw_data_store: R, audit_ledger: L, operation_log: O) -> Self {
        Self {
            verifier,
            raw_data_store,
            audit_ledger,
            operation_log,
        }
    }

    pub fn register_device(&mut self, device_id: impl Into<String>, key: ed25519_dalek::VerifyingKey) {
        self.verifier.register_device(device_id, key);
    }

    pub fn ingest(&mut self, record: AuditRecord, raw_payload: &[u8]) -> Result<(), IngestServiceError> {
        let payload_hash = compute_payload_hash(raw_payload);
        if payload_hash != record.payload_hash {
            self.log_rejection(&record, "payload hash mismatch");
            return Err(IngestServiceError::PayloadHashMismatch {
                device_id: record.device_id,
                sequence: record.sequence,
            });
        }

        if let Err(error) = self.verifier.verify_and_accept(&record) {
            self.log_rejection(&record, &error.to_string());
            return Err(IngestServiceError::Verify(error));
        }

        self.raw_data_store
            .put(&record.object_ref, raw_payload)
            .map_err(|e| IngestServiceError::RawDataStore(e.to_string()))?;

        self.audit_ledger
            .append(record.clone())
            .map_err(|e| IngestServiceError::AuditLedger(e.to_string()))?;

        self.operation_log
            .write(OperationLogEntry {
                decision: IngestDecision::Accepted,
                device_id: record.device_id,
                sequence: record.sequence,
                message: "ingest accepted".to_string(),
            })
            .map_err(|e| IngestServiceError::OperationLog(e.to_string()))?;

        Ok(())
    }

    pub fn raw_data_store(&self) -> &R {
        &self.raw_data_store
    }

    pub fn audit_ledger(&self) -> &L {
        &self.audit_ledger
    }

    pub fn operation_log(&self) -> &O {
        &self.operation_log
    }

    fn log_rejection(&mut self, record: &AuditRecord, reason: &str) {
        let _ = self.operation_log.write(OperationLogEntry {
            decision: IngestDecision::Rejected,
            device_id: record.device_id.clone(),
            sequence: record.sequence,
            message: reason.to_string(),
        });
    }
}

#[derive(Debug, Error)]
#[error("in-memory store error: {message}")]
pub struct InMemoryStoreError {
    message: String,
}

#[derive(Default)]
pub struct InMemoryRawDataStore {
    objects: HashMap<String, Vec<u8>>,
}

impl InMemoryRawDataStore {
    pub fn get(&self, object_ref: &str) -> Option<&[u8]> {
        self.objects.get(object_ref).map(Vec::as_slice)
    }
}

impl RawDataStore for InMemoryRawDataStore {
    type Error = InMemoryStoreError;

    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        self.objects.insert(object_ref.to_string(), payload.to_vec());
        Ok(())
    }
}

#[derive(Default)]
pub struct InMemoryAuditLedger {
    records: Vec<AuditRecord>,
}

impl InMemoryAuditLedger {
    pub fn records(&self) -> &[AuditRecord] {
        &self.records
    }
}

impl AuditLedger for InMemoryAuditLedger {
    type Error = InMemoryStoreError;

    fn append(&mut self, record: AuditRecord) -> Result<(), Self::Error> {
        self.records.push(record);
        Ok(())
    }
}

#[derive(Default)]
pub struct InMemoryOperationLog {
    entries: Vec<OperationLogEntry>,
}

impl InMemoryOperationLog {
    pub fn entries(&self) -> &[OperationLogEntry] {
        &self.entries
    }
}

impl OperationLogStore for InMemoryOperationLog {
    type Error = InMemoryStoreError;

    fn write(&mut self, entry: OperationLogEntry) -> Result<(), Self::Error> {
        self.entries.push(entry);
        Ok(())
    }
}

#[cfg(feature = "s3")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Backend {
    AwsS3,
    Minio,
}

#[cfg(feature = "s3")]
#[derive(Debug, Clone)]
pub struct S3ObjectStoreConfig {
    pub backend: S3Backend,
    pub bucket: String,
    pub region: String,
    pub endpoint: Option<String>,
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
}

#[cfg(feature = "s3")]
impl S3ObjectStoreConfig {
    pub fn for_aws_s3(bucket: impl Into<String>, region: impl Into<String>) -> Self {
        Self {
            backend: S3Backend::AwsS3,
            bucket: bucket.into(),
            region: region.into(),
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }

    pub fn for_minio(
        bucket: impl Into<String>,
        region: impl Into<String>,
        endpoint: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        Self {
            backend: S3Backend::Minio,
            bucket: bucket.into(),
            region: region.into(),
            endpoint: Some(endpoint.into()),
            access_key_id: Some(access_key_id.into()),
            secret_access_key: Some(secret_access_key.into()),
        }
    }
}

#[cfg(feature = "s3")]
#[derive(Debug, Error)]
pub enum S3StoreError {
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    #[error("runtime initialization failed: {0}")]
    Runtime(String),
    #[error("s3 put failed: {0}")]
    Put(String),
}

#[cfg(feature = "s3")]
pub struct S3CompatibleRawDataStore {
    runtime: tokio::runtime::Runtime,
    client: aws_sdk_s3::Client,
    bucket: String,
}

#[cfg(feature = "s3")]
impl S3CompatibleRawDataStore {
    pub fn new(config: S3ObjectStoreConfig) -> Result<Self, S3StoreError> {
        use aws_config::BehaviorVersion;
        use aws_config::Region;
        use aws_credential_types::Credentials;

        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| S3StoreError::Runtime(e.to_string()))?;

        let mut loader = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(config.region.clone()));

        match config.backend {
            S3Backend::AwsS3 => {
                if let (Some(access_key_id), Some(secret_access_key)) =
                    (config.access_key_id.clone(), config.secret_access_key.clone())
                {
                    let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                    loader = loader.credentials_provider(creds);
                }
            }
            S3Backend::Minio => {
                let endpoint = config.endpoint.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig("endpoint is required for MinIO backend".to_string())
                })?;
                let access_key_id = config.access_key_id.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig("access_key_id is required for MinIO backend".to_string())
                })?;
                let secret_access_key = config.secret_access_key.clone().ok_or_else(|| {
                    S3StoreError::InvalidConfig(
                        "secret_access_key is required for MinIO backend".to_string(),
                    )
                })?;

                let creds = Credentials::new(access_key_id, secret_access_key, None, None, "static");
                loader = loader.endpoint_url(endpoint).credentials_provider(creds);
            }
        }

        let shared = runtime.block_on(loader.load());
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&shared);

        if config.backend == S3Backend::Minio {
            s3_config_builder = s3_config_builder.force_path_style(true);
        }

        let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

        Ok(Self {
            runtime,
            client,
            bucket: config.bucket,
        })
    }
}

#[cfg(feature = "s3")]
impl RawDataStore for S3CompatibleRawDataStore {
    type Error = S3StoreError;

    fn put(&mut self, object_ref: &str, payload: &[u8]) -> Result<(), Self::Error> {
        let stream = aws_sdk_s3::primitives::ByteStream::from(payload.to_vec());
        self.runtime
            .block_on(
                self.client
                    .put_object()
                    .bucket(&self.bucket)
                    .key(object_ref)
                    .body(stream)
                    .send(),
            )
            .map_err(|e| S3StoreError::Put(e.to_string()))?;
        Ok(())
    }
}