use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path as ObjectPath;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
use uuid::Uuid;
use crate::error::StorageError;
use crate::store::{Artifact, ArtifactStore, StoreResult};
/// Artifact store that keeps payloads and JSON metadata records on the local
/// filesystem through [`object_store::local::LocalFileSystem`].
pub struct DiskArtifactStore {
    /// Filesystem backend whose object keys are resolved relative to the root
    /// directory passed to `new`.
    store: object_store::local::LocalFileSystem,
    /// The root directory passed to `new`, converted lossily to a `String`.
    // Nothing reads this field at present; it is kept for reference, which is
    // why the `dead_code` lint is silenced here.
    #[allow(dead_code)]
    prefix: String,
}
impl DiskArtifactStore {
    /// Opens a disk-backed store rooted at `root`, creating the directory
    /// tree first when it does not exist yet.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::ConnectionFailed`] if the directory cannot be
    /// created or if `LocalFileSystem::new_with_prefix` rejects it.
    pub fn new(root: impl Into<std::path::PathBuf>) -> StoreResult<Self> {
        let root_dir: std::path::PathBuf = root.into();

        if let Err(io_err) = std::fs::create_dir_all(&root_dir) {
            return Err(StorageError::ConnectionFailed {
                backend: "disk".to_owned(),
                message: format!("failed to create directory {}: {}", root_dir.display(), io_err),
                source: Some(Box::new(io_err)),
            });
        }

        let backend = match object_store::local::LocalFileSystem::new_with_prefix(&root_dir) {
            Ok(fs) => fs,
            Err(os_err) => {
                return Err(StorageError::ConnectionFailed {
                    backend: "disk".to_owned(),
                    message: os_err.to_string(),
                    source: Some(Box::new(os_err)),
                });
            }
        };

        let prefix = root_dir.to_string_lossy().into_owned();
        Ok(Self {
            store: backend,
            prefix,
        })
    }

    /// Object key under which the raw payload bytes of artifact `id` are stored.
    fn artifact_path(id: &Uuid) -> ObjectPath {
        let key = format!("artifacts/{id}");
        ObjectPath::from(key)
    }

    /// Object key under which the JSON-serialised record of artifact `id` is stored.
    fn metadata_path(id: &Uuid) -> ObjectPath {
        let key = format!("metadata/{id}.json");
        ObjectPath::from(key)
    }
}
#[async_trait]
impl ArtifactStore for DiskArtifactStore {
async fn put(&self, artifact: Artifact) -> StoreResult<Artifact> {
let data_path = Self::artifact_path(&artifact.id);
let meta_path = Self::metadata_path(&artifact.id);
let payload = PutPayload::from_bytes(Bytes::from(artifact.data.clone()));
self.store
.put(&data_path, payload)
.await
.map_err(|e| StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
let meta_json =
serde_json::to_vec(&artifact).map_err(|e| StorageError::SerializationFailed {
message: e.to_string(),
source: Some(Box::new(e)),
})?;
let payload = PutPayload::from_bytes(Bytes::from(meta_json));
self.store
.put(&meta_path, payload)
.await
.map_err(|e| StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
Ok(artifact)
}
async fn get(&self, id: &Uuid) -> StoreResult<Option<Artifact>> {
let meta_path = Self::metadata_path(id);
let result = self.store.get(&meta_path).await;
match result {
Ok(meta_bytes) => {
let meta_data =
meta_bytes
.bytes()
.await
.map_err(|e| StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
let mut artifact: Artifact = serde_json::from_slice(&meta_data).map_err(|e| {
StorageError::SerializationFailed {
message: e.to_string(),
source: Some(Box::new(e)),
}
})?;
let data_path = Self::artifact_path(id);
let data_bytes =
self.store
.get(&data_path)
.await
.map_err(|e| StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
artifact.data = data_bytes
.bytes()
.await
.map_err(|e| StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?
.to_vec();
Ok(Some(artifact))
}
Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(e) => Err(StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
}),
}
}
async fn delete(&self, id: &Uuid) -> StoreResult<()> {
let data_path = Self::artifact_path(id);
let meta_path = Self::metadata_path(id);
let _ = self.store.delete(&data_path).await;
let _ = self.store.delete(&meta_path).await;
Ok(())
}
async fn list_by_session(&self, session_id: &Uuid) -> StoreResult<Vec<Artifact>> {
use futures_util::TryStreamExt;
let prefix = ObjectPath::from("metadata/");
let mut artifacts = Vec::new();
let stream = self.store.list(Some(&prefix));
let entries: Vec<_> =
stream
.try_collect()
.await
.map_err(|e| StorageError::BackendError {
backend: "disk".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
for entry in entries {
if let Ok(meta_bytes) = self.store.get(&entry.location).await {
if let Ok(data) = meta_bytes.bytes().await {
if let Ok(artifact) = serde_json::from_slice::<Artifact>(&data) {
if artifact.session_id == Some(*session_id) {
artifacts.push(artifact);
}
}
}
}
}
Ok(artifacts)
}
}
/// Artifact store backed by Amazon S3 through `object_store`.
///
/// Payload bytes live under `artifacts/{id}` and JSON metadata records under
/// `metadata/{id}.json`.
pub struct S3ArtifactStore {
    /// Pre-configured S3 client handed in through `new`.
    store: object_store::aws::AmazonS3,
}
impl S3ArtifactStore {
    /// Wraps an already-configured [`object_store::aws::AmazonS3`] client.
    ///
    /// No I/O is performed; bucket access is only exercised by later calls.
    #[must_use]
    pub fn new(store: object_store::aws::AmazonS3) -> Self {
        Self { store }
    }

    /// Object key under which the raw payload bytes of artifact `id` are stored.
    fn artifact_path(id: &Uuid) -> ObjectPath {
        format!("artifacts/{id}").into()
    }

    /// Object key under which the JSON-serialised record of artifact `id` is stored.
    fn metadata_path(id: &Uuid) -> ObjectPath {
        format!("metadata/{id}.json").into()
    }
}
#[async_trait]
impl ArtifactStore for S3ArtifactStore {
async fn put(&self, artifact: Artifact) -> StoreResult<Artifact> {
let data_path = Self::artifact_path(&artifact.id);
let meta_path = Self::metadata_path(&artifact.id);
let payload = PutPayload::from_bytes(Bytes::from(artifact.data.clone()));
self.store
.put(&data_path, payload)
.await
.map_err(|e| StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
let meta_json =
serde_json::to_vec(&artifact).map_err(|e| StorageError::SerializationFailed {
message: e.to_string(),
source: Some(Box::new(e)),
})?;
let payload = PutPayload::from_bytes(Bytes::from(meta_json));
self.store
.put(&meta_path, payload)
.await
.map_err(|e| StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
Ok(artifact)
}
async fn get(&self, id: &Uuid) -> StoreResult<Option<Artifact>> {
let meta_path = Self::metadata_path(id);
match self.store.get(&meta_path).await {
Ok(meta_bytes) => {
let meta_data =
meta_bytes
.bytes()
.await
.map_err(|e| StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
let mut artifact: Artifact = serde_json::from_slice(&meta_data).map_err(|e| {
StorageError::SerializationFailed {
message: e.to_string(),
source: Some(Box::new(e)),
}
})?;
let data_path = Self::artifact_path(id);
let data_bytes =
self.store
.get(&data_path)
.await
.map_err(|e| StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
artifact.data = data_bytes
.bytes()
.await
.map_err(|e| StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?
.to_vec();
Ok(Some(artifact))
}
Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(e) => Err(StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
}),
}
}
async fn delete(&self, id: &Uuid) -> StoreResult<()> {
let data_path = Self::artifact_path(id);
let meta_path = Self::metadata_path(id);
let _ = self.store.delete(&data_path).await;
let _ = self.store.delete(&meta_path).await;
Ok(())
}
async fn list_by_session(&self, session_id: &Uuid) -> StoreResult<Vec<Artifact>> {
use futures_util::TryStreamExt;
let prefix = ObjectPath::from("metadata/");
let mut artifacts = Vec::new();
let stream = self.store.list(Some(&prefix));
let entries: Vec<_> =
stream
.try_collect()
.await
.map_err(|e| StorageError::BackendError {
backend: "s3".to_owned(),
message: e.to_string(),
source: Some(Box::new(e)),
})?;
for entry in entries {
if let Ok(meta_bytes) = self.store.get(&entry.location).await {
if let Ok(data) = meta_bytes.bytes().await {
if let Ok(artifact) = serde_json::from_slice::<Artifact>(&data) {
if artifact.session_id == Some(*session_id) {
artifacts.push(artifact);
}
}
}
}
}
Ok(artifacts)
}
}