coil-storage 0.1.1

Object storage primitives for the Coil framework.
Documentation
use bytes::Bytes;
use object_store::ObjectStoreExt;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::path::Path as ObjectPath;
use object_store::signer::Signer;
use object_store::{Attribute, Attributes, ObjectStore, PutOptions};
use reqwest::Method;
use std::future::Future;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use super::{ObjectStoreClientConfig, ObjectStoreCredentials, StorageExecutionError};

#[derive(Debug, Clone)]
pub struct S3CompatibleObjectStoreClient {
    state: ObjectStoreClientState,
    signed_url_ttl: Duration,
}

impl S3CompatibleObjectStoreClient {
    pub fn new(config: ObjectStoreClientConfig) -> Self {
        let signed_url_ttl = Duration::from_secs(config.signed_url_ttl_secs.max(1));
        let state = match build_store(&config) {
            Ok(store) => ObjectStoreClientState::Ready { store },
            Err(message) => ObjectStoreClientState::Invalid { message },
        };

        Self {
            state,
            signed_url_ttl,
        }
    }

    pub fn put(
        &self,
        object_key: &str,
        bytes: &[u8],
        content_type: Option<&str>,
    ) -> Result<PathBuf, StorageExecutionError> {
        let path = object_path(object_key)?;
        let store = self.ready_store()?.clone();
        let payload = Bytes::copy_from_slice(bytes);
        let mut attributes = Attributes::new();
        if let Some(content_type) = content_type {
            attributes.insert(Attribute::ContentType, content_type.to_string().into());
        }
        let options = PutOptions {
            attributes,
            ..Default::default()
        };
        run_object_store_future(
            async move { store.put_opts(&path, payload.into(), options).await },
        )
        .map_err(|message| StorageExecutionError::WriteFailed {
            path: object_key.to_string(),
            message,
        })?;
        Ok(PathBuf::from(normalize_object_key(object_key)?))
    }

    pub fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError> {
        let path = object_path(object_key)?;
        let store = self.ready_store()?.clone();
        let bytes = run_object_store_future(async move {
            let result = store.get(&path).await?;
            result.bytes().await
        })
        .map_err(|message| StorageExecutionError::ReadFailed {
            path: object_key.to_string(),
            message,
        })?;
        Ok((
            PathBuf::from(normalize_object_key(object_key)?),
            bytes.to_vec(),
        ))
    }

    pub fn signed_get_url(
        &self,
        object_key: &str,
    ) -> Result<SignedObjectUrl, StorageExecutionError> {
        let path = object_path(object_key)?;
        let store = self.ready_store()?.clone();
        let signed_url_ttl = self.signed_url_ttl;
        let signed_url = run_object_store_future(async move {
            store.signed_url(Method::GET, &path, signed_url_ttl).await
        })
        .map_err(|message| StorageExecutionError::SignedUrlGenerationFailed {
            object_key: object_key.to_string(),
            message,
        })?;
        let expires_at_unix_seconds = SystemTime::now()
            .checked_add(signed_url_ttl)
            .and_then(|instant| instant.duration_since(UNIX_EPOCH).ok())
            .map(|duration| duration.as_secs())
            .unwrap_or(0);
        Ok(SignedObjectUrl {
            object_key: normalize_object_key(object_key)?,
            signed_url: signed_url.to_string(),
            expires_at_unix_seconds,
        })
    }

    fn ready_store(&self) -> Result<&AmazonS3, StorageExecutionError> {
        match &self.state {
            ObjectStoreClientState::Ready { store } => Ok(store),
            ObjectStoreClientState::Invalid { message } => {
                Err(StorageExecutionError::InvalidObjectStoreConfiguration {
                    detail: message.clone(),
                })
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SignedObjectUrl {
    pub object_key: String,
    pub signed_url: String,
    pub expires_at_unix_seconds: u64,
}

#[derive(Debug, Clone)]
enum ObjectStoreClientState {
    Ready { store: AmazonS3 },
    Invalid { message: String },
}

fn build_store(config: &ObjectStoreClientConfig) -> Result<AmazonS3, String> {
    validate_runtime_config(config)?;

    let mut builder = AmazonS3Builder::new()
        .with_bucket_name(config.bucket.clone())
        .with_region(config.region.clone())
        .with_virtual_hosted_style_request(config.virtual_hosted_style_request);
    if let Some(endpoint_url) = &config.endpoint_url {
        builder = builder
            .with_endpoint(endpoint_url.clone())
            .with_allow_http(config.allow_http);
    }
    let ObjectStoreCredentials::Static {
        access_key_id,
        secret_access_key,
        session_token,
    } = &config.credentials
    else {
        return Err(
            "runtime-backed object-store clients require explicit access_key_id and secret_access_key in the structured object-store secret"
                .to_string(),
        );
    };
    builder = builder
        .with_access_key_id(access_key_id.clone())
        .with_secret_access_key(secret_access_key.clone());
    if let Some(session_token) = session_token {
        builder = builder.with_token(session_token.clone());
    }
    builder.build().map_err(|error| error.to_string())
}

fn validate_runtime_config(config: &ObjectStoreClientConfig) -> Result<(), String> {
    if matches!(&config.credentials, ObjectStoreCredentials::Environment) {
        return Err(
            "runtime-backed object-store clients require explicit access_key_id and secret_access_key in the structured object-store secret"
                .to_string(),
        );
    }

    match config.endpoint_url.as_deref() {
        Some(endpoint_url) if endpoint_url.starts_with("http://") && !config.allow_http => {
            Err("object-store endpoint uses http but allow_http is not enabled".to_string())
        }
        None if config.allow_http => Err(
            "allow_http requires an explicit endpoint_url in the object-store config".to_string(),
        ),
        _ => Ok(()),
    }
}

fn run_object_store_future<T, F>(future: F) -> Result<T, String>
where
    T: Send + 'static,
    F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
{
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => match handle.runtime_flavor() {
            tokio::runtime::RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| {
                handle.block_on(future).map_err(|error| error.to_string())
            }),
            tokio::runtime::RuntimeFlavor::CurrentThread => run_future_on_dedicated_runtime(future),
            _ => run_future_on_dedicated_runtime(future),
        },
        Err(_) => run_future_on_ephemeral_runtime(future),
    }
}

fn run_future_on_dedicated_runtime<T, F>(future: F) -> Result<T, String>
where
    T: Send + 'static,
    F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
{
    std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|error| error.to_string())?;
        runtime.block_on(future).map_err(|error| error.to_string())
    })
    .join()
    .map_err(|_| "object-store worker thread panicked".to_string())?
}

fn run_future_on_ephemeral_runtime<T, F>(future: F) -> Result<T, String>
where
    T: Send + 'static,
    F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
{
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|error| error.to_string())?;
    runtime.block_on(future).map_err(|error| error.to_string())
}

fn object_path(object_key: &str) -> Result<ObjectPath, StorageExecutionError> {
    ObjectPath::parse(normalize_object_key(object_key)?).map_err(|error| {
        StorageExecutionError::InvalidTargetPath {
            path: error.to_string(),
        }
    })
}

fn normalize_object_key(object_key: &str) -> Result<String, StorageExecutionError> {
    let mut parts = Vec::new();
    for component in std::path::Path::new(object_key).components() {
        match component {
            std::path::Component::Normal(part) => {
                parts.push(part.to_string_lossy().into_owned());
            }
            _ => {
                return Err(StorageExecutionError::InvalidTargetPath {
                    path: object_key.to_string(),
                });
            }
        }
    }

    if parts.is_empty() {
        return Err(StorageExecutionError::InvalidTargetPath {
            path: object_key.to_string(),
        });
    }

    Ok(parts.join("/"))
}