use super::metadata::{ArtifactMetadata, CompressionType};
use super::store::{
ArtifactStore, ArtifactStoreError, GetArtifactResponse, ListArtifactsQuery, PutArtifactRequest,
PutArtifactResponse,
};
use crate::kernel::ids::{ArtifactId, ExecutionId};
use async_trait::async_trait;
use sha2::{Digest, Sha256};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use tokio::fs;
/// Artifact store backed by the local filesystem.
///
/// On-disk layout, relative to `base_path`:
/// - content:  `<execution_id>/<artifact_id>[.zst]` (extension only when compression is on)
/// - metadata: `<execution_id>/<artifact_id>.meta.json`
pub struct FilesystemArtifactStore {
    // Root directory; one subdirectory is created per execution.
    base_path: PathBuf,
    // Requested compression level; `with_compression_level` clamps it to 1..=22.
    compression_level: i32,
    // When false, content files are written verbatim and without the `.zst` extension.
    compression_enabled: bool,
}
impl FilesystemArtifactStore {
    /// Creates a store rooted at `base_path` with compression enabled at level 3.
    pub fn new(base_path: impl Into<PathBuf>) -> Self {
        Self {
            base_path: base_path.into(),
            compression_level: 3,
            compression_enabled: true,
        }
    }

    /// Builder-style setter for the compression level, clamped to 1..=22.
    pub fn with_compression_level(mut self, level: i32) -> Self {
        self.compression_level = level.clamp(1, 22);
        self
    }

    /// Builder-style switch that turns compression off entirely.
    pub fn without_compression(mut self) -> Self {
        self.compression_enabled = false;
        self
    }

    /// Directory that holds every file belonging to one execution.
    fn execution_path(&self, execution_id: &ExecutionId) -> PathBuf {
        self.base_path.join(execution_id.as_str())
    }

    /// Path of the (possibly compressed) content file for one artifact.
    fn artifact_content_path(
        &self,
        execution_id: &ExecutionId,
        artifact_id: &ArtifactId,
    ) -> PathBuf {
        let suffix = match self.compression_enabled {
            true => ".zst",
            false => "",
        };
        let file_name = format!("{}{}", artifact_id.as_str(), suffix);
        self.execution_path(execution_id).join(file_name)
    }

    /// Path of the JSON metadata sidecar for one artifact.
    fn artifact_metadata_path(
        &self,
        execution_id: &ExecutionId,
        artifact_id: &ArtifactId,
    ) -> PathBuf {
        let file_name = format!("{}.meta.json", artifact_id.as_str());
        self.execution_path(execution_id).join(file_name)
    }

    /// Compresses `data`, or returns a verbatim copy when compression is off.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArtifactStoreError> {
        if !self.compression_enabled {
            return Ok(data.to_vec());
        }
        let mut encoder = zstd_encoder(self.compression_level)?;
        if let Err(e) = encoder.write_all(data) {
            return Err(ArtifactStoreError::Compression(format!(
                "Failed to write to encoder: {}",
                e
            )));
        }
        encoder.finish().map_err(|e| {
            ArtifactStoreError::Compression(format!("Failed to finish compression: {}", e))
        })
    }

    /// Inverse of `compress`; a verbatim copy when compression is off.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, ArtifactStoreError> {
        if !self.compression_enabled {
            return Ok(data.to_vec());
        }
        let mut decoder = zstd_decoder(data)?;
        let mut plain = Vec::new();
        decoder
            .read_to_end(&mut plain)
            .map_err(|e| ArtifactStoreError::Compression(format!("Failed to decompress: {}", e)))?;
        Ok(plain)
    }

    /// Lowercase hex SHA-256 digest of `data`.
    fn hash_content(data: &[u8]) -> String {
        let mut sha = Sha256::new();
        sha.update(data);
        format!("{:x}", sha.finalize())
    }

    /// Reads and deserializes an `ArtifactMetadata` JSON file from `path`.
    async fn load_metadata(&self, path: &Path) -> Result<ArtifactMetadata, ArtifactStoreError> {
        let raw = fs::read_to_string(path).await?;
        Ok(serde_json::from_str::<ArtifactMetadata>(&raw)?)
    }

    /// Serializes `metadata` as pretty JSON and writes it to `path`.
    async fn save_metadata(
        &self,
        path: &Path,
        metadata: &ArtifactMetadata,
    ) -> Result<(), ArtifactStoreError> {
        let json = serde_json::to_string_pretty(metadata)?;
        fs::write(path, json).await?;
        Ok(())
    }
}
/// Builds a fresh encoder for the given level (infallible today; the
/// `Result` keeps the signature stable for a real codec later).
///
/// NOTE(review): despite the name, `ZstdEncoder` below is a deflate-based
/// stand-in (miniz_oxide), not real zstd — the `.zst` extension and the
/// `CompressionType::Zstd` label are therefore misleading to external tools.
fn zstd_encoder(level: i32) -> Result<ZstdEncoder, ArtifactStoreError> {
    let encoder = ZstdEncoder::new(level);
    Ok(encoder)
}
/// Builds a decoder over a compressed frame (infallible today; the `Result`
/// keeps the signature stable for a real codec later).
///
/// NOTE(review): deflate-based stand-in, not real zstd — see `ZstdDecoder`.
fn zstd_decoder(data: &[u8]) -> Result<ZstdDecoder, ArtifactStoreError> {
    let decoder = ZstdDecoder::new(data);
    Ok(decoder)
}
/// In-memory encoder that buffers all writes and compresses on `finish`.
///
/// NOTE(review): despite the name this uses miniz_oxide's deflate, not real
/// zstd. The output is a private framing format — a 4-byte little-endian
/// original length followed by the deflate stream — readable only by
/// `ZstdDecoder` in this module.
struct ZstdEncoder {
    // Requested level; mapped into miniz's 0..=10 range in `finish`.
    level: i32,
    // Accumulated plaintext awaiting compression.
    buffer: Vec<u8>,
}
impl ZstdEncoder {
    fn new(level: i32) -> Self {
        Self {
            level,
            buffer: Vec::new(),
        }
    }
    /// Compresses the buffered bytes and returns the framed output.
    ///
    /// # Errors
    /// Fails if the buffered input exceeds `u32::MAX` bytes, which the 4-byte
    /// length header cannot represent (the previous `as u32` cast silently
    /// truncated instead, producing a corrupt header).
    fn finish(self) -> Result<Vec<u8>, std::io::Error> {
        let original_len = u32::try_from(self.buffer.len()).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "artifact exceeds u32::MAX bytes; length header would overflow",
            )
        })?;
        // miniz deflate levels are 0..=10, but the store clamps its level to
        // zstd's 1..=22 — map anything above 10 to miniz's maximum instead of
        // passing an out-of-range value through.
        let level = self.level.clamp(0, 10) as u8;
        let compressed = miniz_oxide::deflate::compress_to_vec(&self.buffer, level);
        let mut result = Vec::with_capacity(4 + compressed.len());
        result.extend_from_slice(&original_len.to_le_bytes());
        result.extend_from_slice(&compressed);
        Ok(result)
    }
}
impl Write for ZstdEncoder {
    // Writes only accumulate plaintext; compression is deferred to `finish`.
    // Always accepts the whole slice, so callers never see a short write.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.buffer.extend_from_slice(buf);
        Ok(accepted)
    }
    // Nothing is held outside `self.buffer`, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
/// Decoder for the framing format produced by `ZstdEncoder`.
///
/// NOTE(review): deflate-based (miniz_oxide) despite the name — not real zstd.
struct ZstdDecoder {
    // Holds the compressed frame until the first read, the decompressed
    // plaintext afterwards.
    data: Vec<u8>,
    // Read cursor into the decompressed plaintext.
    position: usize,
    // Whether the frame has been inflated yet. The previous code used
    // `position == 0` as this flag, so a zero-length first `read` (or an
    // empty payload) could re-inflate already-inflated data on a later call.
    decoded: bool,
}
impl ZstdDecoder {
    fn new(data: &[u8]) -> Self {
        Self {
            data: data.to_vec(),
            position: 0,
            decoded: false,
        }
    }
}
impl Read for ZstdDecoder {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if !self.decoded {
            self.decoded = true;
            // A frame is a 4-byte little-endian original length followed by
            // the deflate stream; anything shorter is truncated or corrupt.
            // (The previous code silently returned such bytes verbatim.)
            if self.data.len() < 4 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "compressed frame shorter than its 4-byte length header",
                ));
            }
            let original_len =
                u32::from_le_bytes([self.data[0], self.data[1], self.data[2], self.data[3]])
                    as usize;
            let decompressed =
                miniz_oxide::inflate::decompress_to_vec(&self.data[4..]).map_err(|e| {
                    std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}", e))
                })?;
            // Validate against the recorded length instead of discarding it.
            if decompressed.len() != original_len {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "decompressed length does not match frame header",
                ));
            }
            self.data = decompressed;
        }
        let remaining = self.data.len() - self.position;
        let to_read = std::cmp::min(remaining, buf.len());
        if to_read > 0 {
            buf[..to_read].copy_from_slice(&self.data[self.position..self.position + to_read]);
            self.position += to_read;
        }
        Ok(to_read)
    }
}
#[async_trait]
impl ArtifactStore for FilesystemArtifactStore {
    /// Stores a new artifact: compresses the content, writes it under the
    /// execution's directory, and writes a `.meta.json` sidecar next to it.
    ///
    /// A fresh `ArtifactId` is generated per call; callers get it back in the
    /// response. The content file and metadata are written sequentially, so a
    /// failure between the two writes can leave a content file without its
    /// sidecar (it will then be invisible to `get`/`list`, which key off the
    /// metadata file).
    async fn put(
        &self,
        request: PutArtifactRequest,
    ) -> Result<PutArtifactResponse, ArtifactStoreError> {
        let artifact_id = ArtifactId::new();
        let original_size = request.content.len() as u64;
        let exec_path = self.execution_path(&request.execution_id);
        fs::create_dir_all(&exec_path).await?;
        let compressed = self.compress(&request.content)?;
        let compressed_size = compressed.len() as u64;
        // Hash the *uncompressed* content so the digest is stable regardless
        // of the store's compression setting.
        let content_hash = Self::hash_content(&request.content);
        let content_path = self.artifact_content_path(&request.execution_id, &artifact_id);
        // NOTE(review): the metadata records CompressionType::Zstd, but the
        // actual codec is a deflate-based stand-in (see ZstdEncoder) — the
        // label and the `.zst` extension are misleading to external tooling.
        let metadata = ArtifactMetadata::new(
            artifact_id.clone(),
            request.execution_id.clone(),
            request.step_id,
            request.name,
            request.artifact_type,
        )
        .with_original_size(original_size)
        .with_compressed_size(compressed_size)
        .with_compression(if self.compression_enabled {
            CompressionType::Zstd
        } else {
            CompressionType::None
        })
        .with_content_hash(content_hash)
        .with_storage_uri(content_path.to_string_lossy().to_string())
        .with_content_type(
            request
                .content_type
                .unwrap_or_else(|| request.artifact_type.default_content_type().to_string()),
        );
        fs::write(&content_path, &compressed).await?;
        let metadata_path = self.artifact_metadata_path(&request.execution_id, &artifact_id);
        self.save_metadata(&metadata_path, &metadata).await?;
        Ok(PutArtifactResponse {
            artifact_id,
            metadata,
            compressed_size,
            original_size,
        })
    }
    /// Fetches an artifact's metadata and (decompressed) content.
    ///
    /// The execution is not known from the id alone, so this linearly scans
    /// every execution directory under `base_path` until the artifact's
    /// metadata file is found — O(number of executions).
    async fn get(
        &self,
        artifact_id: &ArtifactId,
    ) -> Result<GetArtifactResponse, ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                // Directory names double as execution ids (see execution_path).
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
                // NOTE(review): Path::exists is synchronous (blocking) I/O
                // inside an async fn — consider tokio::fs::try_exists.
                if metadata_path.exists() {
                    let metadata = self.load_metadata(&metadata_path).await?;
                    let content_path = self.artifact_content_path(&exec_id, artifact_id);
                    let compressed = fs::read(&content_path).await?;
                    let content = self.decompress(&compressed)?;
                    return Ok(GetArtifactResponse { metadata, content });
                }
            }
        }
        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
    }
    /// Returns whether the artifact's metadata file exists anywhere under
    /// `base_path`. Same linear scan as `get`; never returns `NotFound`.
    async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
                // NOTE(review): blocking Path::exists inside async fn.
                if metadata_path.exists() {
                    return Ok(true);
                }
            }
        }
        Ok(false)
    }
    /// Deletes an artifact's content file (if present) and its metadata
    /// sidecar. Errors with `NotFound` when no metadata file exists.
    async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
                if metadata_path.exists() {
                    let content_path = self.artifact_content_path(&exec_id, artifact_id);
                    // Content may be missing (e.g. a partially-failed put);
                    // still remove the metadata so the artifact disappears.
                    if content_path.exists() {
                        fs::remove_file(&content_path).await?;
                    }
                    fs::remove_file(&metadata_path).await?;
                    return Ok(());
                }
            }
        }
        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
    }
    /// Lists artifact metadata, optionally filtered by execution, step and
    /// artifact type, sorted by creation time, with offset/limit paging.
    async fn list(
        &self,
        query: ListArtifactsQuery,
    ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError> {
        let mut results = Vec::new();
        // Either scan the single requested execution directory or all of them.
        let exec_dirs = if let Some(ref exec_id) = query.execution_id {
            vec![self.execution_path(exec_id)]
        } else {
            let mut dirs = Vec::new();
            let mut entries = fs::read_dir(&self.base_path).await?;
            while let Some(entry) = entries.next_entry().await? {
                if entry.file_type().await?.is_dir() {
                    dirs.push(entry.path());
                }
            }
            dirs
        };
        for exec_path in exec_dirs {
            // A queried execution with no artifacts yet simply has no dir.
            if !exec_path.exists() {
                continue;
            }
            let mut entries = fs::read_dir(&exec_path).await?;
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                // Select only `*.meta.json` sidecars, skipping content files.
                if path.extension().map(|e| e == "json").unwrap_or(false)
                    && path.to_string_lossy().contains(".meta.")
                {
                    // Unreadable/corrupt metadata files are silently skipped
                    // (best-effort listing).
                    if let Ok(metadata) = self.load_metadata(&path).await {
                        if let Some(ref step_id) = query.step_id {
                            if metadata.step_id != *step_id {
                                continue;
                            }
                        }
                        if let Some(ref artifact_type) = query.artifact_type {
                            if metadata.artifact_type != *artifact_type {
                                continue;
                            }
                        }
                        results.push(metadata);
                    }
                }
            }
        }
        // Oldest first; offset is applied before limit.
        results.sort_by(|a, b| a.created_at.cmp(&b.created_at));
        if let Some(offset) = query.offset {
            results = results.into_iter().skip(offset).collect();
        }
        if let Some(limit) = query.limit {
            results.truncate(limit);
        }
        Ok(results)
    }
    /// Loads only an artifact's metadata sidecar. Same linear scan as `get`.
    async fn get_metadata(
        &self,
        artifact_id: &ArtifactId,
    ) -> Result<ArtifactMetadata, ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
                // NOTE(review): blocking Path::exists inside async fn.
                if metadata_path.exists() {
                    return self.load_metadata(&metadata_path).await;
                }
            }
        }
        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
    }
    /// Total on-disk bytes for one execution; 0 when its directory is absent.
    ///
    /// NOTE(review): this sums *every* file in the directory, so it includes
    /// the `.meta.json` sidecars, not just artifact content — confirm that is
    /// the intended definition of "execution size".
    async fn get_execution_size(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<u64, ArtifactStoreError> {
        let exec_path = self.execution_path(execution_id);
        if !exec_path.exists() {
            return Ok(0);
        }
        let mut total: u64 = 0;
        let mut entries = fs::read_dir(&exec_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            // Entries whose metadata cannot be read are skipped (best-effort).
            if let Ok(metadata) = entry.metadata().await {
                total += metadata.len();
            }
        }
        Ok(total)
    }
}
#[cfg(test)]
mod tests {
    use super::super::metadata::ArtifactType;
    use super::*;
    use crate::kernel::ids::StepId;
    use tempfile::TempDir;

    /// A small text artifact must round-trip through put/get unchanged.
    #[tokio::test]
    async fn test_filesystem_store_put_get() {
        let tmp = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(tmp.path());
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();
        let payload = b"Hello, World! This is a test artifact.".to_vec();
        let put = store
            .put(PutArtifactRequest::new(
                exec_id.clone(),
                step_id,
                "test.txt",
                ArtifactType::Text,
                payload.clone(),
            ))
            .await
            .unwrap();
        assert!(put.artifact_id.as_str().starts_with("artifact_"));
        assert!(put.compressed_size > 0);
        assert_eq!(put.original_size, payload.len() as u64);
        let fetched = store.get(&put.artifact_id).await.unwrap();
        assert_eq!(fetched.content, payload);
        assert_eq!(fetched.metadata.name, "test.txt");
    }

    /// Highly repetitive content must shrink under compression and still
    /// decompress back to the original bytes.
    #[tokio::test]
    async fn test_filesystem_store_compression() {
        let tmp = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(tmp.path());
        let payload = "Hello, World! ".repeat(1000).into_bytes();
        let put = store
            .put(PutArtifactRequest::new(
                ExecutionId::new(),
                StepId::new(),
                "repetitive.txt",
                ArtifactType::Text,
                payload.clone(),
            ))
            .await
            .unwrap();
        assert!(put.compressed_size < put.original_size);
        let fetched = store.get(&put.artifact_id).await.unwrap();
        assert_eq!(fetched.content, payload);
    }

    /// Listing by execution returns every artifact stored under it.
    #[tokio::test]
    async fn test_filesystem_store_list() {
        let tmp = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(tmp.path());
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();
        for i in 0..3 {
            store
                .put(PutArtifactRequest::new(
                    exec_id.clone(),
                    step_id.clone(),
                    format!("file{}.txt", i),
                    ArtifactType::Text,
                    format!("Content {}", i).into_bytes(),
                ))
                .await
                .unwrap();
        }
        let listed = store
            .list(ListArtifactsQuery::for_execution(exec_id))
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);
    }
}