coil-storage 0.1.0

Object storage primitives for the Coil framework.
Documentation
use std::fmt;
use std::path::{Path, PathBuf};

use thiserror::Error;

use crate::{StorageBackendKind, StoragePlan, StorageTopology, WriteTarget};

mod config;
mod local;
mod object_store;

#[cfg(test)]
mod tests;

pub use config::{ObjectStoreClientConfig, ObjectStoreClientConfigError, ObjectStoreCredentials};
use local::LocalDiskStorageClient;
pub use object_store::{S3CompatibleObjectStoreClient, SignedObjectUrl};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageDeliveryLocation {
    PublicCdn {
        public_url: String,
        object_key: String,
    },
    SignedObject {
        object_key: String,
        signed_url: String,
        expires_at_unix_seconds: u64,
    },
    AppProxy {
        path: String,
    },
    LocalPath {
        path: PathBuf,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageWriteReceipt {
    pub target: WriteTarget,
    pub path: PathBuf,
    pub bytes_written: u64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageReadReceipt {
    pub target: WriteTarget,
    pub path: PathBuf,
    pub bytes: Vec<u8>,
    pub bytes_read: u64,
}

pub trait ObjectStoreClient: fmt::Debug + Send + Sync {
    fn put(
        &self,
        object_key: &str,
        bytes: &[u8],
        content_type: Option<&str>,
    ) -> Result<PathBuf, StorageExecutionError>;
    fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError>;
    fn signed_get_url(&self, object_key: &str) -> Result<SignedObjectUrl, StorageExecutionError>;
}

impl ObjectStoreClient for S3CompatibleObjectStoreClient {
    fn put(
        &self,
        object_key: &str,
        bytes: &[u8],
        content_type: Option<&str>,
    ) -> Result<PathBuf, StorageExecutionError> {
        self.put(object_key, bytes, content_type)
    }

    fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError> {
        self.get(object_key)
    }

    fn signed_get_url(&self, object_key: &str) -> Result<SignedObjectUrl, StorageExecutionError> {
        self.signed_get_url(object_key)
    }
}

#[derive(Debug, Clone)]
pub struct StorageExecutor {
    local_client: LocalDiskStorageClient,
    object_store_required: bool,
    object_store: Option<S3CompatibleObjectStoreClient>,
}

impl StorageExecutor {
    pub fn from_topology(topology: &StorageTopology) -> Self {
        Self::from_topology_and_object_store(topology, None)
    }

    pub fn from_topology_and_object_store(
        topology: &StorageTopology,
        object_store: Option<ObjectStoreClientConfig>,
    ) -> Self {
        let local_root = PathBuf::from(&topology.local_root);
        let object_store_required = topology.object_store.is_some();
        let object_store = topology
            .object_store
            .as_ref()
            .and_then(|_| object_store.map(S3CompatibleObjectStoreClient::new));

        Self {
            local_client: LocalDiskStorageClient::new(local_root),
            object_store_required,
            object_store,
        }
    }

    pub fn execute_write(
        &self,
        plan: &StoragePlan,
        bytes: impl AsRef<[u8]>,
    ) -> Result<StorageWriteReceipt, StorageExecutionError> {
        self.execute_write_with_content_type(plan, bytes, None)
    }

    pub fn execute_write_with_content_type(
        &self,
        plan: &StoragePlan,
        bytes: impl AsRef<[u8]>,
        content_type: Option<&str>,
    ) -> Result<StorageWriteReceipt, StorageExecutionError> {
        let bytes = bytes.as_ref();
        let target = plan.primary_write_target().cloned().ok_or_else(|| {
            StorageExecutionError::MissingPrimaryWriteTarget {
                logical_path: plan.logical_path.clone(),
            }
        })?;

        let path = match target.backend {
            StorageBackendKind::LocalDisk => {
                let path = plan.local_path.as_ref().ok_or_else(|| {
                    StorageExecutionError::MissingLocalPath {
                        logical_path: plan.logical_path.clone(),
                    }
                })?;
                self.local_client.write(Path::new(path), bytes)?
            }
            StorageBackendKind::S3Compatible => {
                let object_key = plan.object_key.as_deref().ok_or_else(|| {
                    StorageExecutionError::MissingObjectKey {
                        logical_path: plan.logical_path.clone(),
                    }
                })?;
                self.object_store_client(&plan.logical_path)?.put(
                    object_key,
                    bytes,
                    content_type,
                )?
            }
        };

        Ok(StorageWriteReceipt {
            target,
            path,
            bytes_written: bytes.len() as u64,
        })
    }

    pub fn execute_read(
        &self,
        plan: &StoragePlan,
    ) -> Result<StorageReadReceipt, StorageExecutionError> {
        let target = plan.primary_write_target().cloned().ok_or_else(|| {
            StorageExecutionError::MissingPrimaryWriteTarget {
                logical_path: plan.logical_path.clone(),
            }
        })?;

        let (path, bytes) = match target.backend {
            StorageBackendKind::LocalDisk => {
                let path = plan.local_path.as_ref().ok_or_else(|| {
                    StorageExecutionError::MissingLocalPath {
                        logical_path: plan.logical_path.clone(),
                    }
                })?;
                self.local_client.read(Path::new(path))?
            }
            StorageBackendKind::S3Compatible => {
                let object_key = plan.object_key.as_deref().ok_or_else(|| {
                    StorageExecutionError::MissingObjectKey {
                        logical_path: plan.logical_path.clone(),
                    }
                })?;
                self.object_store_client(&plan.logical_path)?
                    .get(object_key)?
            }
        };

        Ok(StorageReadReceipt {
            target,
            path,
            bytes_read: bytes.len() as u64,
            bytes,
        })
    }

    pub fn delivery_location(
        &self,
        plan: &StoragePlan,
        cdn_base_url: Option<&str>,
    ) -> Result<StorageDeliveryLocation, StorageExecutionError> {
        match plan.policy.delivery_mode {
            crate::DeliveryMode::PublicCdn => {
                let object_key = plan.object_key.as_deref().ok_or_else(|| {
                    StorageExecutionError::MissingObjectKey {
                        logical_path: plan.logical_path.clone(),
                    }
                })?;
                let cdn_base_url =
                    cdn_base_url.ok_or_else(|| StorageExecutionError::MissingCdnBaseUrl {
                        logical_path: plan.logical_path.clone(),
                    })?;
                Ok(StorageDeliveryLocation::PublicCdn {
                    public_url: join_base_url(cdn_base_url, object_key),
                    object_key: object_key.to_string(),
                })
            }
            crate::DeliveryMode::SignedUrl => {
                let object_key = plan.object_key.as_deref().ok_or_else(|| {
                    StorageExecutionError::MissingObjectKey {
                        logical_path: plan.logical_path.clone(),
                    }
                })?;
                let signed = self
                    .object_store_client(&plan.logical_path)?
                    .signed_get_url(object_key)?;
                Ok(StorageDeliveryLocation::SignedObject {
                    object_key: signed.object_key,
                    signed_url: signed.signed_url,
                    expires_at_unix_seconds: signed.expires_at_unix_seconds,
                })
            }
            crate::DeliveryMode::AppProxy => Ok(StorageDeliveryLocation::AppProxy {
                path: plan.logical_path.clone(),
            }),
            crate::DeliveryMode::LocalOnly => {
                let path = plan
                    .local_path
                    .as_ref()
                    .ok_or_else(|| StorageExecutionError::MissingLocalPath {
                        logical_path: plan.logical_path.clone(),
                    })?
                    .clone();
                Ok(StorageDeliveryLocation::LocalPath {
                    path: PathBuf::from(path),
                })
            }
        }
    }

    fn object_store_client(
        &self,
        logical_path: &str,
    ) -> Result<&dyn ObjectStoreClient, StorageExecutionError> {
        self.object_store
            .as_ref()
            .map(|client| client as &dyn ObjectStoreClient)
            .ok_or_else(|| {
                if self.object_store_required {
                    StorageExecutionError::MissingObjectStoreConfiguration {
                        logical_path: logical_path.to_string(),
                    }
                } else {
                    StorageExecutionError::MissingObjectStoreBackend {
                        logical_path: logical_path.to_string(),
                    }
                }
            })
    }
}

fn join_base_url(base_url: &str, object_key: &str) -> String {
    format!(
        "{}/{}",
        base_url.trim_end_matches('/'),
        object_key.trim_start_matches('/')
    )
}

#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StorageExecutionError {
    #[error("storage plan for `{logical_path}` does not provide a primary write target")]
    MissingPrimaryWriteTarget { logical_path: String },
    #[error("storage plan for `{logical_path}` requires a local path")]
    MissingLocalPath { logical_path: String },
    #[error("storage plan for `{logical_path}` requires an object key")]
    MissingObjectKey { logical_path: String },
    #[error("storage plan for `{logical_path}` requires an object-store backend")]
    MissingObjectStoreBackend { logical_path: String },
    #[error("storage plan for `{logical_path}` requires object-store client configuration")]
    MissingObjectStoreConfiguration { logical_path: String },
    #[error("storage plan for `{logical_path}` requires a configured object-store endpoint")]
    MissingObjectStoreEndpoint { logical_path: String },
    #[error("storage plan for `{logical_path}` requires `cdn_base_url` to resolve public delivery")]
    MissingCdnBaseUrl { logical_path: String },
    #[error("object-store configuration is invalid: {detail}")]
    InvalidObjectStoreConfiguration { detail: String },
    #[error("storage path `{path}` is outside the configured storage root")]
    InvalidTargetPath { path: String },
    #[error("failed to read storage path `{path}`: {message}")]
    ReadFailed { path: String, message: String },
    #[error("failed to generate a signed URL for `{object_key}`: {message}")]
    SignedUrlGenerationFailed { object_key: String, message: String },
    #[error("failed to write storage path `{path}`: {message}")]
    WriteFailed { path: String, message: String },
}