use std::sync::{
Arc,
atomic::{AtomicU8, Ordering},
};
#[cfg(feature = "storage-aws")]
use object_store::aws::AmazonS3;
#[cfg(feature = "storage-azure")]
use object_store::azure::MicrosoftAzure;
#[cfg(feature = "storage-gcp")]
use object_store::gcp::GoogleCloudStorage;
#[cfg(feature = "storage-http")]
use object_store::http::HttpStore;
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
use object_store::local::LocalFileSystem;
use object_store::{ObjectStore, memory::InMemory};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use crate::component::Component;
mod builder;
#[cfg(feature = "storage-azure")]
pub use builder::AzureObjectStorageBuilder;
#[cfg(feature = "storage-gcp")]
pub use builder::GcpObjectStorageBuilder;
#[cfg(feature = "storage-http")]
pub use builder::HttpObjectStorageBuilder;
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
pub use builder::LocalObjectStorageBuilder;
pub use builder::MemoryObjectStorageBuilder;
#[cfg(feature = "storage-aws")]
pub use builder::S3ObjectStorageBuilder;
/// Lifecycle phase of an object storage component.
///
/// The phase is persisted in an `AtomicU8`, so every variant carries a fixed
/// `u8` encoding.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum LifecycleState {
    /// Constructed, but `run` has not claimed the component yet.
    Uninitialized = 0,
    /// `run` has claimed the component and is waiting for cancellation.
    Running = 1,
    /// `run` observed cancellation and finished.
    Stopped = 2,
}

impl LifecycleState {
    /// Returns the byte used to represent this phase in the atomic state field.
    const fn encoded(self) -> u8 {
        // Kept as an exhaustive match so that adding a variant fails to compile
        // until it is given an encoding here.
        match self {
            Self::Uninitialized => 0,
            Self::Running => 1,
            Self::Stopped => 2,
        }
    }
}

impl From<LifecycleState> for u8 {
    /// Converts a lifecycle phase into its raw `u8` encoding.
    fn from(state: LifecycleState) -> Self {
        state.encoded()
    }
}
/// A named component that owns an object store of type `S`.
///
/// Besides the store itself it tracks a lifecycle state, which the
/// `Component::run` implementation moves from uninitialized to running and
/// finally to stopped.
#[derive(Debug)]
pub struct ObjectStorage<S> {
/// Component name, returned by `Component::name`.
name: String,
/// The wrapped store, borrowed out through `store()`.
store: S,
/// Current `LifecycleState`, held as its `u8` encoding.
state: AtomicU8,
}
impl ObjectStorage<InMemory> {
/// Creates a `MemoryObjectStorageBuilder` for a component called `name`.
///
/// This only constructs the builder; `name` is forwarded to it unchanged.
#[must_use]
pub fn memory(name: impl Into<String>) -> MemoryObjectStorageBuilder {
MemoryObjectStorageBuilder::new(name)
}
}
// Compiled only with the `storage` feature on non-wasm32 targets.
#[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
impl ObjectStorage<LocalFileSystem> {
/// Creates a `LocalObjectStorageBuilder` for a component called `name`.
///
/// `name` and `root` are forwarded to the builder unchanged.
/// NOTE(review): `root` is presumably the directory the local store is rooted
/// at — confirm against `LocalObjectStorageBuilder`.
#[must_use]
pub fn local(
name: impl Into<String>,
root: impl AsRef<std::path::Path>,
) -> LocalObjectStorageBuilder {
LocalObjectStorageBuilder::new(name, root)
}
}
// Compiled only with the `storage-aws` feature.
#[cfg(feature = "storage-aws")]
impl ObjectStorage<AmazonS3> {
/// Creates an `S3ObjectStorageBuilder` for a component called `name`.
///
/// This only constructs the builder; `name` is forwarded to it unchanged.
#[must_use]
pub fn s3(name: impl Into<String>) -> S3ObjectStorageBuilder {
S3ObjectStorageBuilder::new(name)
}
}
// Compiled only with the `storage-gcp` feature.
#[cfg(feature = "storage-gcp")]
impl ObjectStorage<GoogleCloudStorage> {
/// Creates a `GcpObjectStorageBuilder` for a component called `name`.
///
/// This only constructs the builder; `name` is forwarded to it unchanged.
#[must_use]
pub fn gcs(name: impl Into<String>) -> GcpObjectStorageBuilder {
GcpObjectStorageBuilder::new(name)
}
}
// Compiled only with the `storage-azure` feature.
#[cfg(feature = "storage-azure")]
impl ObjectStorage<MicrosoftAzure> {
/// Creates an `AzureObjectStorageBuilder` for a component called `name`.
///
/// This only constructs the builder; `name` is forwarded to it unchanged.
#[must_use]
pub fn azure(name: impl Into<String>) -> AzureObjectStorageBuilder {
AzureObjectStorageBuilder::new(name)
}
}
// Compiled only with the `storage-http` feature.
#[cfg(feature = "storage-http")]
impl ObjectStorage<HttpStore> {
/// Creates an `HttpObjectStorageBuilder` for a component called `name`.
///
/// `name` and `url` are forwarded to the builder unchanged.
/// NOTE(review): `url` is presumably the base URL of the HTTP store — confirm
/// against `HttpObjectStorageBuilder`.
#[must_use]
pub fn http(name: impl Into<String>, url: impl Into<String>) -> HttpObjectStorageBuilder {
HttpObjectStorageBuilder::new(name, url)
}
}
impl<S> ObjectStorage<S> {
    /// Wraps an already-constructed `store` in a component called `name`.
    ///
    /// The returned component starts in the uninitialized lifecycle state.
    pub(crate) fn from_store(name: impl Into<String>, store: S) -> Self {
        let name = name.into();
        let state = AtomicU8::new(u8::from(LifecycleState::Uninitialized));
        Self { name, store, state }
    }

    /// Borrows the wrapped store.
    #[must_use]
    pub const fn store(&self) -> &S {
        &self.store
    }

    /// Records `state` as the component's lifecycle state using `Release` ordering.
    fn set_state(&self, state: LifecycleState) {
        let encoded = u8::from(state);
        self.state.store(encoded, Ordering::Release);
    }
}
/// Error returned by the `Component::run` implementation of `ObjectStorage`.
#[derive(Debug, Error)]
pub enum ObjectStorageRunError {
/// `run` was called on a component that had already left the uninitialized
/// state, i.e. one that is currently running or has already stopped.
#[error("object storage component is already running")]
AlreadyRunning,
}
impl<S> Component for ObjectStorage<S>
where
    S: ObjectStore,
{
    type RunError = ObjectStorageRunError;

    /// Returns the name the component was constructed with.
    fn name(&self) -> &str {
        &self.name
    }

    /// Runs the component until `cancel` is triggered.
    ///
    /// The component is claimed by atomically moving its state from
    /// uninitialized to running. Once `cancel` fires, the state is set to
    /// stopped and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns [`ObjectStorageRunError::AlreadyRunning`] when the state is not
    /// uninitialized on entry. Because the state is never reset, this also
    /// covers a component whose earlier run has already stopped.
    async fn run(self: Arc<Self>, cancel: CancellationToken) -> Result<(), Self::RunError> {
        {
            // Only one caller can win this transition; everyone else is rejected.
            let claim = self.state.compare_exchange(
                u8::from(LifecycleState::Uninitialized),
                u8::from(LifecycleState::Running),
                Ordering::AcqRel,
                Ordering::Acquire,
            );
            if claim.is_err() {
                return Err(ObjectStorageRunError::AlreadyRunning);
            }
        }

        cancel.cancelled().await;
        self.set_state(LifecycleState::Stopped);
        Ok(())
    }
}
/// Unit tests for the `ObjectStorage` builders and its `Component` lifecycle.
#[cfg(test)]
mod tests {
    // Only the local-filesystem test and its helpers use these, so they share
    // that test's cfg and do not become unused imports when it is compiled out.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    use std::{env, fs, io::ErrorKind, process};
    use std::{sync::Arc, time::Duration};

    use object_store::{ObjectStoreExt as _, PutPayload, path::Path};
    use tokio_util::sync::CancellationToken;

    use crate::component::Component as _;

    use super::{ObjectStorage, ObjectStorageRunError};

    /// A memory-backed storage must return exactly the bytes that were written.
    #[test]
    fn memory_builder_builds_usable_object_storage() {
        let runtime = runtime();
        let storage = ObjectStorage::memory("objects").build();
        let path = Path::from("documents/report.txt");

        runtime.block_on(async {
            match storage
                .store()
                .put(&path, PutPayload::from_static(b"hello storage"))
                .await
            {
                Ok(_) => {}
                Err(error) => panic!("memory object must be written: {error}"),
            }

            let bytes = match storage.store().get(&path).await {
                Ok(result) => match result.bytes().await {
                    Ok(bytes) => bytes,
                    Err(error) => panic!("memory object bytes must be read: {error}"),
                },
                Err(error) => panic!("memory object must be read: {error}"),
            };

            assert_eq!(
                bytes.as_ref(),
                b"hello storage",
                "read bytes must match written bytes"
            );
        });
    }

    /// A local-filesystem storage must return exactly the bytes that were written.
    ///
    /// Gated exactly like `ObjectStorage::local`, which only exists with the
    /// `storage` feature on non-wasm32 targets.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    #[test]
    fn local_builder_builds_usable_object_storage() {
        let runtime = runtime();
        let root = test_directory("local_builder_builds_usable_object_storage");
        create_test_directory(&root);

        let storage = match ObjectStorage::local("objects", &root).build() {
            Ok(storage) => storage,
            Err(error) => panic!("local object storage must build: {error}"),
        };
        let path = Path::from("documents/report.txt");

        runtime.block_on(async {
            match storage
                .store()
                .put(&path, PutPayload::from_static(b"hello local storage"))
                .await
            {
                Ok(_) => {}
                Err(error) => panic!("local object must be written: {error}"),
            }

            let bytes = match storage.store().get(&path).await {
                Ok(result) => match result.bytes().await {
                    Ok(bytes) => bytes,
                    Err(error) => panic!("local object bytes must be read: {error}"),
                },
                Err(error) => panic!("local object must be read: {error}"),
            };

            assert_eq!(
                bytes.as_ref(),
                b"hello local storage",
                "read bytes must match written local bytes"
            );
        });

        remove_test_directory(&root);
    }

    /// `run` must return `Ok(())` promptly once its token is cancelled.
    #[test]
    fn component_run_stops_after_cancellation() {
        let runtime = runtime();
        let storage = Arc::new(ObjectStorage::memory("objects").build());
        let cancel = CancellationToken::new();

        runtime.block_on(async {
            let run = {
                let storage = Arc::clone(&storage);
                let cancel = cancel.clone();
                tokio::spawn(storage.run(cancel))
            };

            cancel.cancel();

            // Layers, outermost first: timeout, task join, run result.
            match tokio::time::timeout(Duration::from_secs(1), run).await {
                Ok(Ok(Ok(()))) => {}
                Ok(Ok(Err(error))) => panic!("component must stop cleanly: {error}"),
                Ok(Err(error)) => panic!("component task must not panic: {error}"),
                Err(error) => panic!("component must stop before timeout: {error}"),
            }
        });
    }

    /// A component that has already completed one run must reject a second one.
    #[test]
    fn component_rejects_second_run() {
        let runtime = runtime();
        let storage = Arc::new(ObjectStorage::memory("objects").build());

        runtime.block_on(async {
            // Tokens are cancelled up front so each `run` returns immediately.
            let first_cancel = CancellationToken::new();
            first_cancel.cancel();
            match Arc::clone(&storage).run(first_cancel).await {
                Ok(()) => {}
                Err(error) => panic!("first component run must stop cleanly: {error}"),
            }

            let second_cancel = CancellationToken::new();
            second_cancel.cancel();
            let error = match Arc::clone(&storage).run(second_cancel).await {
                Ok(()) => panic!("second component run must be rejected"),
                Err(error) => error,
            };

            assert!(
                matches!(error, ObjectStorageRunError::AlreadyRunning),
                "second component run must return the already-running error"
            );
        });
    }

    /// Builds a current-thread Tokio runtime with the time driver enabled.
    fn runtime() -> tokio::runtime::Runtime {
        match tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
        {
            Ok(runtime) => runtime,
            Err(error) => panic!("test runtime must build: {error}"),
        }
    }

    /// Returns a per-test, per-process path under the system temp directory.
    ///
    /// Any directory already at that path is removed first; the directory
    /// itself is not created here.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    fn test_directory(name: &str) -> std::path::PathBuf {
        let directory = env::temp_dir().join(format!("recomp-storage-{name}-{}", process::id()));
        remove_test_directory(&directory);
        directory
    }

    /// Creates `directory` and any missing parents, panicking on failure.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    fn create_test_directory(directory: &std::path::Path) {
        match fs::create_dir_all(directory) {
            Ok(()) => {}
            Err(error) => panic!("test storage directory must be created: {error}"),
        }
    }

    /// Recursively removes `directory`; a directory that does not exist is not an error.
    #[cfg(all(feature = "storage", not(target_arch = "wasm32")))]
    fn remove_test_directory(directory: &std::path::Path) {
        match fs::remove_dir_all(directory) {
            Ok(()) => {}
            Err(error) if error.kind() == ErrorKind::NotFound => {}
            Err(error) => panic!("test storage directory cleanup failed: {error}"),
        }
    }
}