use super::metadata::{ArtifactMetadata, ArtifactType};
use crate::kernel::ids::{ArtifactId, ExecutionId, StepId};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ArtifactStoreError {
#[error("Artifact not found: {0}")]
NotFound(ArtifactId),
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("Compression error: {0}")]
Compression(String),
#[error("Invalid artifact: {0}")]
Invalid(String),
#[error("Storage error: {0}")]
Storage(String),
#[error("Artifact already exists: {0}")]
AlreadyExists(ArtifactId),
}
#[derive(Debug, Clone)]
pub struct PutArtifactRequest {
pub execution_id: ExecutionId,
pub step_id: StepId,
pub name: String,
pub artifact_type: ArtifactType,
pub content_type: Option<String>,
pub content: Vec<u8>,
pub metadata: Option<serde_json::Value>,
}
impl PutArtifactRequest {
pub fn new(
execution_id: ExecutionId,
step_id: StepId,
name: impl Into<String>,
artifact_type: ArtifactType,
content: Vec<u8>,
) -> Self {
Self {
execution_id,
step_id,
name: name.into(),
artifact_type,
content_type: None,
content,
metadata: None,
}
}
pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
self.content_type = Some(content_type.into());
self
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = Some(metadata);
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutArtifactResponse {
pub artifact_id: ArtifactId,
pub metadata: ArtifactMetadata,
pub compressed_size: u64,
pub original_size: u64,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct GetArtifactRequest {
pub artifact_id: ArtifactId,
}
#[derive(Debug, Clone)]
pub struct GetArtifactResponse {
pub metadata: ArtifactMetadata,
pub content: Vec<u8>,
}
#[derive(Debug, Clone, Default)]
pub struct ListArtifactsQuery {
pub execution_id: Option<ExecutionId>,
pub step_id: Option<StepId>,
pub artifact_type: Option<ArtifactType>,
pub limit: Option<usize>,
pub offset: Option<usize>,
}
impl ListArtifactsQuery {
pub fn for_execution(execution_id: ExecutionId) -> Self {
Self {
execution_id: Some(execution_id),
..Default::default()
}
}
pub fn for_step(step_id: StepId) -> Self {
Self {
step_id: Some(step_id),
..Default::default()
}
}
}
#[async_trait]
pub trait ArtifactStore: Send + Sync {
async fn put(
&self,
request: PutArtifactRequest,
) -> Result<PutArtifactResponse, ArtifactStoreError>;
async fn get(
&self,
artifact_id: &ArtifactId,
) -> Result<GetArtifactResponse, ArtifactStoreError>;
async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError>;
async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError>;
async fn list(
&self,
query: ListArtifactsQuery,
) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError>;
async fn get_metadata(
&self,
artifact_id: &ArtifactId,
) -> Result<ArtifactMetadata, ArtifactStoreError>;
async fn get_execution_size(
&self,
execution_id: &ExecutionId,
) -> Result<u64, ArtifactStoreError>;
}
use std::collections::HashMap;
use tokio::sync::RwLock;
pub struct InMemoryArtifactStore {
artifacts: RwLock<HashMap<ArtifactId, (ArtifactMetadata, Vec<u8>)>>,
}
impl InMemoryArtifactStore {
pub fn new() -> Self {
Self {
artifacts: RwLock::new(HashMap::new()),
}
}
}
impl Default for InMemoryArtifactStore {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ArtifactStore for InMemoryArtifactStore {
async fn put(
&self,
request: PutArtifactRequest,
) -> Result<PutArtifactResponse, ArtifactStoreError> {
let artifact_id = ArtifactId::new();
let original_size = request.content.len() as u64;
let metadata = ArtifactMetadata::new(
artifact_id.clone(),
request.execution_id,
request.step_id,
request.name,
request.artifact_type,
)
.with_original_size(original_size)
.with_compressed_size(original_size) .with_content_type(
request
.content_type
.unwrap_or_else(|| "application/octet-stream".to_string()),
);
{
let mut artifacts = self.artifacts.write().await;
artifacts.insert(artifact_id.clone(), (metadata.clone(), request.content));
}
Ok(PutArtifactResponse {
artifact_id,
metadata,
compressed_size: original_size,
original_size,
})
}
async fn get(
&self,
artifact_id: &ArtifactId,
) -> Result<GetArtifactResponse, ArtifactStoreError> {
let artifacts = self.artifacts.read().await;
match artifacts.get(artifact_id) {
Some((metadata, content)) => Ok(GetArtifactResponse {
metadata: metadata.clone(),
content: content.clone(),
}),
None => Err(ArtifactStoreError::NotFound(artifact_id.clone())),
}
}
async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError> {
let artifacts = self.artifacts.read().await;
Ok(artifacts.contains_key(artifact_id))
}
async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError> {
let mut artifacts = self.artifacts.write().await;
artifacts
.remove(artifact_id)
.ok_or_else(|| ArtifactStoreError::NotFound(artifact_id.clone()))?;
Ok(())
}
async fn list(
&self,
query: ListArtifactsQuery,
) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError> {
let artifacts = self.artifacts.read().await;
let mut results: Vec<ArtifactMetadata> = artifacts
.values()
.filter_map(|(metadata, _)| {
if let Some(ref exec_id) = query.execution_id {
if metadata.execution_id != *exec_id {
return None;
}
}
if let Some(ref step_id) = query.step_id {
if metadata.step_id != *step_id {
return None;
}
}
if let Some(ref artifact_type) = query.artifact_type {
if metadata.artifact_type != *artifact_type {
return None;
}
}
Some(metadata.clone())
})
.collect();
results.sort_by(|a, b| a.created_at.cmp(&b.created_at));
if let Some(offset) = query.offset {
results = results.into_iter().skip(offset).collect();
}
if let Some(limit) = query.limit {
results.truncate(limit);
}
Ok(results)
}
async fn get_metadata(
&self,
artifact_id: &ArtifactId,
) -> Result<ArtifactMetadata, ArtifactStoreError> {
let artifacts = self.artifacts.read().await;
match artifacts.get(artifact_id) {
Some((metadata, _)) => Ok(metadata.clone()),
None => Err(ArtifactStoreError::NotFound(artifact_id.clone())),
}
}
async fn get_execution_size(
&self,
execution_id: &ExecutionId,
) -> Result<u64, ArtifactStoreError> {
let artifacts = self.artifacts.read().await;
let total: u64 = artifacts
.values()
.filter(|(m, _)| m.execution_id == *execution_id)
.map(|(_, content)| content.len() as u64)
.sum();
Ok(total)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_in_memory_store_put_get() {
let store = InMemoryArtifactStore::new();
let exec_id = ExecutionId::new();
let step_id = StepId::new();
let request = PutArtifactRequest::new(
exec_id.clone(),
step_id,
"test.txt",
ArtifactType::Text,
b"Hello, World!".to_vec(),
);
let response = store.put(request).await.unwrap();
assert!(response.artifact_id.as_str().starts_with("artifact_"));
let get_response = store.get(&response.artifact_id).await.unwrap();
assert_eq!(get_response.content, b"Hello, World!");
assert_eq!(get_response.metadata.name, "test.txt");
}
#[tokio::test]
async fn test_in_memory_store_list() {
let store = InMemoryArtifactStore::new();
let exec_id = ExecutionId::new();
let step_id = StepId::new();
for i in 0..5 {
let request = PutArtifactRequest::new(
exec_id.clone(),
step_id.clone(),
format!("file{}.txt", i),
ArtifactType::Text,
format!("Content {}", i).into_bytes(),
);
store.put(request).await.unwrap();
}
let query = ListArtifactsQuery::for_execution(exec_id.clone());
let results = store.list(query).await.unwrap();
assert_eq!(results.len(), 5);
let query = ListArtifactsQuery {
execution_id: Some(exec_id),
limit: Some(3),
..Default::default()
};
let results = store.list(query).await.unwrap();
assert_eq!(results.len(), 3);
}
#[tokio::test]
async fn test_in_memory_store_delete() {
let store = InMemoryArtifactStore::new();
let exec_id = ExecutionId::new();
let step_id = StepId::new();
let request = PutArtifactRequest::new(
exec_id,
step_id,
"test.txt",
ArtifactType::Text,
b"Hello".to_vec(),
);
let response = store.put(request).await.unwrap();
assert!(store.exists(&response.artifact_id).await.unwrap());
store.delete(&response.artifact_id).await.unwrap();
assert!(!store.exists(&response.artifact_id).await.unwrap());
}
}