// recomp 0.3.1
//
// Reusable components
// Documentation
use std::sync::{
    Arc,
    atomic::{AtomicU8, Ordering},
};

#[cfg(feature = "storage-aws")]
use object_store::aws::AmazonS3;
#[cfg(feature = "storage-azure")]
use object_store::azure::MicrosoftAzure;
#[cfg(feature = "storage-gcp")]
use object_store::gcp::GoogleCloudStorage;
#[cfg(feature = "storage-http")]
use object_store::http::HttpStore;
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
use object_store::local::LocalFileSystem;
use object_store::{ObjectStore, memory::InMemory};
use thiserror::Error;
use tokio_util::sync::CancellationToken;

use crate::component::Component;

mod builder;

#[cfg(feature = "storage-azure")]
pub use builder::AzureObjectStorageBuilder;
#[cfg(feature = "storage-gcp")]
pub use builder::GcpObjectStorageBuilder;
#[cfg(feature = "storage-http")]
pub use builder::HttpObjectStorageBuilder;
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
pub use builder::LocalObjectStorageBuilder;
pub use builder::MemoryObjectStorageBuilder;
#[cfg(feature = "storage-aws")]
pub use builder::S3ObjectStorageBuilder;

/// Lifecycle phase of an object storage component.
///
/// The explicit `u8` discriminants are the values kept in the component's
/// atomic `state` field.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum LifecycleState {
    /// Initial phase set at construction; `run` has not claimed the component.
    Uninitialized = 0,
    /// `run` has claimed the component and is waiting for cancellation.
    Running = 1,
    /// `run` observed cancellation and is returning.
    Stopped = 2,
}

impl From<LifecycleState> for u8 {
    fn from(state: LifecycleState) -> Self {
        match state {
            LifecycleState::Uninitialized => 0,
            LifecycleState::Running => 1,
            LifecycleState::Stopped => 2,
        }
    }
}

/// Object storage component backed by an [`object_store::ObjectStore`].
///
/// Object storage clients do not usually own a persistent connection, so the
/// component lifecycle only records that the storage dependency has been made
/// available and keeps running until cancellation.
///
/// `S` is the concrete store type; it is exposed unchanged through
/// [`ObjectStorage::store`].
#[derive(Debug)]
pub struct ObjectStorage<S> {
    // Component name reported by `Component::name`.
    name: String,
    // The wrapped object store.
    store: S,
    // Current `LifecycleState`, encoded as its `u8` value.
    state: AtomicU8,
}

impl ObjectStorage<InMemory> {
    /// Creates a builder for in-memory object storage.
    ///
    /// `name` is the component name returned by [`Component::name`]; it is not
    /// part of the object store configuration.
    ///
    /// This function only forwards `name` to
    /// [`MemoryObjectStorageBuilder::new`]; call `build` on the returned
    /// builder to obtain the component.
    #[must_use]
    pub fn memory(name: impl Into<String>) -> MemoryObjectStorageBuilder {
        MemoryObjectStorageBuilder::new(name)
    }
}

#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
impl ObjectStorage<LocalFileSystem> {
    /// Creates a builder for local filesystem object storage.
    ///
    /// `name` is the component name returned by [`Component::name`]; it is not
    /// part of the object store configuration.
    ///
    /// `root` is forwarded unchanged to [`LocalObjectStorageBuilder::new`];
    /// presumably it is the directory under which objects are stored — confirm
    /// against the builder.
    ///
    /// Only available with the `storage` feature on targets other than
    /// `wasm32`.
    #[must_use]
    pub fn local(
        name: impl Into<String>,
        root: impl AsRef<std::path::Path>,
    ) -> LocalObjectStorageBuilder {
        LocalObjectStorageBuilder::new(name, root)
    }
}

#[cfg(feature = "storage-aws")]
impl ObjectStorage<AmazonS3> {
    /// Creates a builder for Amazon S3 compatible object storage.
    ///
    /// `name` is the component name returned by [`Component::name`]; it is not
    /// part of the object store configuration.
    ///
    /// The builder starts from [`object_store::aws::AmazonS3Builder::from_env`]
    /// so standard AWS environment variables are honored before falling back to
    /// provider defaults such as instance metadata credentials. Callers can
    /// override any provider setting before building.
    ///
    /// This function itself only forwards `name` to
    /// [`S3ObjectStorageBuilder::new`]. Only available when the `storage-aws`
    /// feature is enabled.
    #[must_use]
    pub fn s3(name: impl Into<String>) -> S3ObjectStorageBuilder {
        S3ObjectStorageBuilder::new(name)
    }
}

#[cfg(feature = "storage-gcp")]
impl ObjectStorage<GoogleCloudStorage> {
    /// Creates a builder for Google Cloud Storage.
    ///
    /// `name` is the component name returned by [`Component::name`]; it is not
    /// part of the object store configuration.
    ///
    /// The builder starts from
    /// [`object_store::gcp::GoogleCloudStorageBuilder::from_env`] so standard
    /// Google Cloud environment variables are honored before falling back to
    /// provider defaults such as application default credentials and metadata
    /// server credentials. Callers can override any provider setting before
    /// building.
    ///
    /// This function itself only forwards `name` to
    /// [`GcpObjectStorageBuilder::new`]. Only available when the `storage-gcp`
    /// feature is enabled.
    #[must_use]
    pub fn gcs(name: impl Into<String>) -> GcpObjectStorageBuilder {
        GcpObjectStorageBuilder::new(name)
    }
}

#[cfg(feature = "storage-azure")]
impl ObjectStorage<MicrosoftAzure> {
    /// Creates a builder for Microsoft Azure Blob Storage.
    ///
    /// `name` is the component name returned by [`Component::name`]; it is not
    /// part of the object store configuration.
    ///
    /// The builder starts from
    /// [`object_store::azure::MicrosoftAzureBuilder::from_env`] so standard
    /// Azure environment variables are honored before falling back to provider
    /// defaults such as managed identity credentials. Callers can override any
    /// provider setting before building.
    ///
    /// This function itself only forwards `name` to
    /// [`AzureObjectStorageBuilder::new`]. Only available when the
    /// `storage-azure` feature is enabled.
    #[must_use]
    pub fn azure(name: impl Into<String>) -> AzureObjectStorageBuilder {
        AzureObjectStorageBuilder::new(name)
    }
}

#[cfg(feature = "storage-http")]
impl ObjectStorage<HttpStore> {
    /// Creates a builder for HTTP/WebDAV object storage.
    ///
    /// `name` is the component name returned by [`Component::name`]; it is not
    /// part of the object store configuration.
    ///
    /// `url` is forwarded unchanged to [`HttpObjectStorageBuilder::new`];
    /// presumably it is the base URL of the remote store — confirm against the
    /// builder.
    ///
    /// Only available when the `storage-http` feature is enabled.
    #[must_use]
    pub fn http(name: impl Into<String>, url: impl Into<String>) -> HttpObjectStorageBuilder {
        HttpObjectStorageBuilder::new(name, url)
    }
}

impl<S> ObjectStorage<S> {
    /// Wraps an already-constructed `store` in a component called `name`.
    ///
    /// The lifecycle starts out as [`LifecycleState::Uninitialized`].
    pub(crate) fn from_store(name: impl Into<String>, store: S) -> Self {
        let initial_state = u8::from(LifecycleState::Uninitialized);

        Self {
            name: name.into(),
            store,
            state: AtomicU8::new(initial_state),
        }
    }

    /// Borrows the wrapped object store.
    ///
    /// Use this to reach provider-specific functionality that the component
    /// itself does not expose.
    #[must_use]
    pub const fn store(&self) -> &S {
        &self.store
    }

    /// Publishes `state` as the component's current lifecycle phase.
    fn set_state(&self, state: LifecycleState) {
        let encoded = u8::from(state);
        self.state.store(encoded, Ordering::Release);
    }
}

/// Error returned by [`ObjectStorage::run`](Component::run).
#[derive(Debug, Error)]
pub enum ObjectStorageRunError {
    /// `run` was called on a component that had already been started.
    ///
    /// A component can be run only once: `run` succeeds only while the
    /// lifecycle is still uninitialized, so this variant is also returned when
    /// an earlier run has already stopped, not just while one is in progress.
    #[error("object storage component is already running")]
    AlreadyRunning,
}

impl<S> Component for ObjectStorage<S>
where
    S: ObjectStore,
{
    type RunError = ObjectStorageRunError;

    /// Returns the component name supplied at construction.
    fn name(&self) -> &str {
        &self.name
    }

    /// Marks the component as running and waits until `cancel` is triggered.
    ///
    /// # Errors
    ///
    /// Returns [`ObjectStorageRunError::AlreadyRunning`] when the lifecycle is
    /// not in its initial state, i.e. `run` has been called before on this
    /// component.
    async fn run(self: Arc<Self>, cancel: CancellationToken) -> Result<(), Self::RunError> {
        // Right now this layer looks fairly useless because it is only a thin layer on top of
        // `object_store`. The goal is to be able to extend it later without too much
        // duplication, e.g. by adding a `watch` method that notices metadata/version changes
        // and pulls the file. Another idea is a file handle that abstracts away live updates:
        // `FileHandle<T: StorageDeserializer>` with a `get` method backed by `ArcSwap`. Every
        // time the file changes it would be downloaded, deserialized and then swapped in
        // atomically, without ever locking the hot path.

        // Only the caller that moves the state from `Uninitialized` to `Running` may proceed.
        let claim = self.state.compare_exchange(
            LifecycleState::Uninitialized.into(),
            LifecycleState::Running.into(),
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        if claim.is_err() {
            return Err(ObjectStorageRunError::AlreadyRunning);
        }

        cancel.cancelled().await;
        self.set_state(LifecycleState::Stopped);

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    // Filesystem helpers are only compiled together with the local storage test, so their
    // imports carry the same gate to avoid unused-import warnings on other configurations.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    use std::{env, fs, io::ErrorKind, process};
    use std::{sync::Arc, time::Duration};

    use object_store::{ObjectStoreExt as _, PutPayload, path::Path};
    use tokio_util::sync::CancellationToken;

    use crate::component::Component as _;

    use super::{ObjectStorage, ObjectStorageRunError};

    /// An in-memory storage built through the builder can write and read back an object.
    #[test]
    fn memory_builder_builds_usable_object_storage() {
        let runtime = runtime();
        let storage = ObjectStorage::memory("objects").build();
        let path = Path::from("documents/report.txt");

        runtime.block_on(async {
            match storage
                .store()
                .put(&path, PutPayload::from_static(b"hello storage"))
                .await
            {
                Ok(_) => {}
                Err(error) => panic!("memory object must be written: {error}"),
            }

            let bytes = match storage.store().get(&path).await {
                Ok(result) => match result.bytes().await {
                    Ok(bytes) => bytes,
                    Err(error) => panic!("memory object bytes must be read: {error}"),
                },
                Err(error) => panic!("memory object must be read: {error}"),
            };

            assert_eq!(
                bytes.as_ref(),
                b"hello storage",
                "read bytes must match written bytes"
            );
        });
    }

    /// A local filesystem storage built through the builder can write and read back an object.
    ///
    /// Gated exactly like `ObjectStorage::local`, which is the API under test.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    #[test]
    fn local_builder_builds_usable_object_storage() {
        let runtime = runtime();
        let root = test_directory("local_builder_builds_usable_object_storage");
        create_test_directory(&root);
        let storage = match ObjectStorage::local("objects", &root).build() {
            Ok(storage) => storage,
            Err(error) => panic!("local object storage must build: {error}"),
        };
        let path = Path::from("documents/report.txt");

        runtime.block_on(async {
            match storage
                .store()
                .put(&path, PutPayload::from_static(b"hello local storage"))
                .await
            {
                Ok(_) => {}
                Err(error) => panic!("local object must be written: {error}"),
            }

            let bytes = match storage.store().get(&path).await {
                Ok(result) => match result.bytes().await {
                    Ok(bytes) => bytes,
                    Err(error) => panic!("local object bytes must be read: {error}"),
                },
                Err(error) => panic!("local object must be read: {error}"),
            };

            assert_eq!(
                bytes.as_ref(),
                b"hello local storage",
                "read bytes must match written local bytes"
            );
        });

        remove_test_directory(&root);
    }

    /// `run` returns `Ok(())` within the timeout once its token is cancelled.
    #[test]
    fn component_run_stops_after_cancellation() {
        let runtime = runtime();
        let storage = Arc::new(ObjectStorage::memory("objects").build());
        let cancel = CancellationToken::new();

        runtime.block_on(async {
            let run = {
                let storage = Arc::clone(&storage);
                let cancel = cancel.clone();
                tokio::spawn(storage.run(cancel))
            };

            cancel.cancel();

            match tokio::time::timeout(Duration::from_secs(1), run).await {
                Ok(Ok(Ok(()))) => {}
                Ok(Ok(Err(error))) => panic!("component must stop cleanly: {error}"),
                Ok(Err(error)) => panic!("component task must not panic: {error}"),
                Err(error) => panic!("component must stop before timeout: {error}"),
            }
        });
    }

    /// A component that has already run once rejects a second `run`, even after it stopped.
    #[test]
    fn component_rejects_second_run() {
        let runtime = runtime();
        let storage = Arc::new(ObjectStorage::memory("objects").build());

        runtime.block_on(async {
            // Pre-cancelled token: the first run claims the component and returns immediately.
            let first_cancel = CancellationToken::new();
            first_cancel.cancel();
            match Arc::clone(&storage).run(first_cancel).await {
                Ok(()) => {}
                Err(error) => panic!("first component run must stop cleanly: {error}"),
            }

            let second_cancel = CancellationToken::new();
            second_cancel.cancel();
            let error = match Arc::clone(&storage).run(second_cancel).await {
                Ok(()) => panic!("second component run must be rejected"),
                Err(error) => error,
            };

            assert!(
                matches!(error, ObjectStorageRunError::AlreadyRunning),
                "second component run must return the already-running error"
            );
        });
    }

    /// Builds the current-thread Tokio runtime (with the time driver) shared by the tests.
    fn runtime() -> tokio::runtime::Runtime {
        match tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
        {
            Ok(runtime) => runtime,
            Err(error) => panic!("test runtime must build: {error}"),
        }
    }

    /// Returns a per-process temporary directory path for `name`, removing any stale copy first.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    fn test_directory(name: &str) -> std::path::PathBuf {
        let directory = env::temp_dir().join(format!("recomp-storage-{name}-{}", process::id()));
        remove_test_directory(&directory);

        directory
    }

    /// Creates `directory` and its parents, panicking on failure.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    fn create_test_directory(directory: &std::path::Path) {
        match fs::create_dir_all(directory) {
            Ok(()) => {}
            Err(error) => panic!("test storage directory must be created: {error}"),
        }
    }

    /// Removes `directory` recursively; a directory that does not exist is not an error.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    fn remove_test_directory(directory: &std::path::Path) {
        match fs::remove_dir_all(directory) {
            Ok(()) => {}
            Err(error) if error.kind() == ErrorKind::NotFound => {}
            Err(error) => panic!("test storage directory cleanup failed: {error}"),
        }
    }
}