// File: enact_core/kernel/artifact/filesystem.rs
//! Filesystem Artifact Store - Local storage with zstd compression
//!
//! This implementation stores artifacts on the local filesystem with:
//! - zstd compression for space efficiency
//! - SHA-256 content hashing for integrity
//! - Metadata stored as JSON sidecar files
//!
//! ## Directory Structure
//!
//! ```text
//! base_path/
//!   {execution_id}/
//!     {artifact_id}.zst          # Compressed content
//!     {artifact_id}.meta.json    # Metadata
//! ```
use super::metadata::{ArtifactMetadata, CompressionType};
use super::store::{
    ArtifactStore, ArtifactStoreError, GetArtifactResponse, ListArtifactsQuery, PutArtifactRequest,
    PutArtifactResponse,
};
use crate::kernel::ids::{ArtifactId, ExecutionId};
use async_trait::async_trait;
use sha2::{Digest, Sha256};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use tokio::fs;
/// Filesystem-based artifact store
///
/// Stores artifacts on local disk with optional compression.
/// Uses zstd for compression by default.
///
/// NOTE(review): content file names carry a `.zst` extension only while
/// `compression_enabled` is true, so a store reconfigured with
/// `without_compression()` cannot find content written while compression
/// was on (and vice versa) — confirm stores are never reconfigured over an
/// existing `base_path`.
pub struct FilesystemArtifactStore {
    /// Base directory for artifact storage; one subdirectory per execution
    base_path: PathBuf,
    /// Compression level (clamped to 1-22 by `with_compression_level`; default 3)
    compression_level: i32,
    /// Whether content is compressed on write and decompressed on read
    compression_enabled: bool,
}
41
42impl FilesystemArtifactStore {
43    /// Create a new filesystem artifact store
44    pub fn new(base_path: impl Into<PathBuf>) -> Self {
45        Self {
46            base_path: base_path.into(),
47            compression_level: 3, // Default zstd level
48            compression_enabled: true,
49        }
50    }
51
52    /// Set compression level (1-22, higher = better compression but slower)
53    pub fn with_compression_level(mut self, level: i32) -> Self {
54        self.compression_level = level.clamp(1, 22);
55        self
56    }
57
58    /// Disable compression
59    pub fn without_compression(mut self) -> Self {
60        self.compression_enabled = false;
61        self
62    }
63
64    /// Get the path for an execution's artifact directory
65    fn execution_path(&self, execution_id: &ExecutionId) -> PathBuf {
66        self.base_path.join(execution_id.as_str())
67    }
68
69    /// Get the path for an artifact's content file
70    fn artifact_content_path(
71        &self,
72        execution_id: &ExecutionId,
73        artifact_id: &ArtifactId,
74    ) -> PathBuf {
75        let ext = if self.compression_enabled { ".zst" } else { "" };
76        self.execution_path(execution_id)
77            .join(format!("{}{}", artifact_id.as_str(), ext))
78    }
79
80    /// Get the path for an artifact's metadata file
81    fn artifact_metadata_path(
82        &self,
83        execution_id: &ExecutionId,
84        artifact_id: &ArtifactId,
85    ) -> PathBuf {
86        self.execution_path(execution_id)
87            .join(format!("{}.meta.json", artifact_id.as_str()))
88    }
89
90    /// Compress content using zstd
91    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArtifactStoreError> {
92        if !self.compression_enabled {
93            return Ok(data.to_vec());
94        }
95
96        // Use pure Rust zstd encoder
97        let mut encoder = zstd_encoder(self.compression_level)?;
98        encoder.write_all(data).map_err(|e| {
99            ArtifactStoreError::Compression(format!("Failed to write to encoder: {}", e))
100        })?;
101        encoder.finish().map_err(|e| {
102            ArtifactStoreError::Compression(format!("Failed to finish compression: {}", e))
103        })
104    }
105
106    /// Decompress content using zstd
107    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, ArtifactStoreError> {
108        if !self.compression_enabled {
109            return Ok(data.to_vec());
110        }
111
112        let mut decoder = zstd_decoder(data)?;
113        let mut result = Vec::new();
114        decoder
115            .read_to_end(&mut result)
116            .map_err(|e| ArtifactStoreError::Compression(format!("Failed to decompress: {}", e)))?;
117        Ok(result)
118    }
119
120    /// Calculate SHA-256 hash of content
121    fn hash_content(data: &[u8]) -> String {
122        let mut hasher = Sha256::new();
123        hasher.update(data);
124        format!("{:x}", hasher.finalize())
125    }
126
127    /// Load metadata from file
128    async fn load_metadata(&self, path: &Path) -> Result<ArtifactMetadata, ArtifactStoreError> {
129        let content = fs::read_to_string(path).await?;
130        let metadata: ArtifactMetadata = serde_json::from_str(&content)?;
131        Ok(metadata)
132    }
133
134    /// Save metadata to file
135    async fn save_metadata(
136        &self,
137        path: &Path,
138        metadata: &ArtifactMetadata,
139    ) -> Result<(), ArtifactStoreError> {
140        let content = serde_json::to_string_pretty(metadata)?;
141        fs::write(path, content).await?;
142        Ok(())
143    }
144}
145
146// =============================================================================
147// Compression Helpers (Simple implementation without external zstd crate)
148// =============================================================================
149
150/// Create a zstd encoder
151/// This is a simple wrapper that uses the flate2 crate as a fallback
152/// In production, you would use the zstd crate directly
153fn zstd_encoder(level: i32) -> Result<ZstdEncoder, ArtifactStoreError> {
154    Ok(ZstdEncoder::new(level))
155}
156
157/// Create a zstd decoder
158fn zstd_decoder(data: &[u8]) -> Result<ZstdDecoder, ArtifactStoreError> {
159    Ok(ZstdDecoder::new(data))
160}
161
/// Simple zstd-like encoder using miniz_oxide for compression.
/// Despite the name, `finish` emits DEFLATE data in a custom
/// `[4-byte LE original length][deflate payload]` frame, not real zstd.
/// In production, replace with the actual zstd crate.
struct ZstdEncoder {
    /// Compression level handed to miniz_oxide's deflate in `finish`
    level: i32,
    /// Accumulates all bytes written via the `Write` impl until `finish`
    buffer: Vec<u8>,
}
168
169impl ZstdEncoder {
170    fn new(level: i32) -> Self {
171        Self {
172            level,
173            buffer: Vec::new(),
174        }
175    }
176
177    fn finish(self) -> Result<Vec<u8>, std::io::Error> {
178        // Simple compression using miniz_oxide (deflate)
179        // In production, use the zstd crate for actual zstd compression
180
181        // For now, we use a simple framing format:
182        // [4 bytes: original length][compressed data]
183        let original_len = self.buffer.len() as u32;
184        let compressed = miniz_oxide::deflate::compress_to_vec(&self.buffer, self.level as u8);
185
186        let mut result = Vec::with_capacity(4 + compressed.len());
187        result.extend_from_slice(&original_len.to_le_bytes());
188        result.extend_from_slice(&compressed);
189        Ok(result)
190    }
191}
192
193impl Write for ZstdEncoder {
194    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
195        self.buffer.extend_from_slice(buf);
196        Ok(buf.len())
197    }
198
199    fn flush(&mut self) -> std::io::Result<()> {
200        Ok(())
201    }
202}
203
/// Simple zstd-like decoder for the framing produced by `ZstdEncoder`
/// (`[4-byte LE original length][deflate payload]`).
struct ZstdDecoder {
    /// Initially the raw framed bytes; replaced in place by the
    /// decompressed payload on the first call to `read`
    data: Vec<u8>,
    /// Read cursor into `data`; `position == 0` doubles as the
    /// "not yet decompressed" flag in the `Read` impl
    position: usize,
}
209
210impl ZstdDecoder {
211    fn new(data: &[u8]) -> Self {
212        Self {
213            data: data.to_vec(),
214            position: 0,
215        }
216    }
217}
218
219impl Read for ZstdDecoder {
220    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
221        if self.position == 0 && self.data.len() > 4 {
222            // First read - decompress
223            let _original_len =
224                u32::from_le_bytes([self.data[0], self.data[1], self.data[2], self.data[3]])
225                    as usize;
226
227            let decompressed =
228                miniz_oxide::inflate::decompress_to_vec(&self.data[4..]).map_err(|e| {
229                    std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}", e))
230                })?;
231
232            self.data = decompressed;
233        }
234
235        let remaining = self.data.len() - self.position;
236        let to_read = std::cmp::min(remaining, buf.len());
237
238        if to_read > 0 {
239            buf[..to_read].copy_from_slice(&self.data[self.position..self.position + to_read]);
240            self.position += to_read;
241        }
242
243        Ok(to_read)
244    }
245}
246
247// =============================================================================
248// ArtifactStore Implementation
249// =============================================================================
250
251#[async_trait]
252impl ArtifactStore for FilesystemArtifactStore {
253    async fn put(
254        &self,
255        request: PutArtifactRequest,
256    ) -> Result<PutArtifactResponse, ArtifactStoreError> {
257        let artifact_id = ArtifactId::new();
258        let original_size = request.content.len() as u64;
259
260        // Ensure execution directory exists
261        let exec_path = self.execution_path(&request.execution_id);
262        fs::create_dir_all(&exec_path).await?;
263
264        // Compress content
265        let compressed = self.compress(&request.content)?;
266        let compressed_size = compressed.len() as u64;
267
268        // Calculate content hash
269        let content_hash = Self::hash_content(&request.content);
270
271        // Create metadata
272        let content_path = self.artifact_content_path(&request.execution_id, &artifact_id);
273        let metadata = ArtifactMetadata::new(
274            artifact_id.clone(),
275            request.execution_id.clone(),
276            request.step_id,
277            request.name,
278            request.artifact_type,
279        )
280        .with_original_size(original_size)
281        .with_compressed_size(compressed_size)
282        .with_compression(if self.compression_enabled {
283            CompressionType::Zstd
284        } else {
285            CompressionType::None
286        })
287        .with_content_hash(content_hash)
288        .with_storage_uri(content_path.to_string_lossy().to_string())
289        .with_content_type(
290            request
291                .content_type
292                .unwrap_or_else(|| request.artifact_type.default_content_type().to_string()),
293        );
294
295        // Write content file
296        fs::write(&content_path, &compressed).await?;
297
298        // Write metadata file
299        let metadata_path = self.artifact_metadata_path(&request.execution_id, &artifact_id);
300        self.save_metadata(&metadata_path, &metadata).await?;
301
302        Ok(PutArtifactResponse {
303            artifact_id,
304            metadata,
305            compressed_size,
306            original_size,
307        })
308    }
309
310    async fn get(
311        &self,
312        artifact_id: &ArtifactId,
313    ) -> Result<GetArtifactResponse, ArtifactStoreError> {
314        // We need to find the execution ID from metadata
315        // Search all execution directories
316        let mut entries = fs::read_dir(&self.base_path).await?;
317
318        while let Some(entry) = entries.next_entry().await? {
319            if entry.file_type().await?.is_dir() {
320                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
321                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
322
323                if metadata_path.exists() {
324                    let metadata = self.load_metadata(&metadata_path).await?;
325                    let content_path = self.artifact_content_path(&exec_id, artifact_id);
326
327                    let compressed = fs::read(&content_path).await?;
328                    let content = self.decompress(&compressed)?;
329
330                    return Ok(GetArtifactResponse { metadata, content });
331                }
332            }
333        }
334
335        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
336    }
337
338    async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError> {
339        // Search all execution directories
340        let mut entries = fs::read_dir(&self.base_path).await?;
341
342        while let Some(entry) = entries.next_entry().await? {
343            if entry.file_type().await?.is_dir() {
344                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
345                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
346
347                if metadata_path.exists() {
348                    return Ok(true);
349                }
350            }
351        }
352
353        Ok(false)
354    }
355
356    async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError> {
357        // Search all execution directories
358        let mut entries = fs::read_dir(&self.base_path).await?;
359
360        while let Some(entry) = entries.next_entry().await? {
361            if entry.file_type().await?.is_dir() {
362                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
363                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
364
365                if metadata_path.exists() {
366                    let content_path = self.artifact_content_path(&exec_id, artifact_id);
367
368                    // Delete both files
369                    if content_path.exists() {
370                        fs::remove_file(&content_path).await?;
371                    }
372                    fs::remove_file(&metadata_path).await?;
373
374                    return Ok(());
375                }
376            }
377        }
378
379        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
380    }
381
382    async fn list(
383        &self,
384        query: ListArtifactsQuery,
385    ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError> {
386        let mut results = Vec::new();
387
388        // If execution_id is specified, only search that directory
389        let exec_dirs = if let Some(ref exec_id) = query.execution_id {
390            vec![self.execution_path(exec_id)]
391        } else {
392            // Search all execution directories
393            let mut dirs = Vec::new();
394            let mut entries = fs::read_dir(&self.base_path).await?;
395            while let Some(entry) = entries.next_entry().await? {
396                if entry.file_type().await?.is_dir() {
397                    dirs.push(entry.path());
398                }
399            }
400            dirs
401        };
402
403        for exec_path in exec_dirs {
404            if !exec_path.exists() {
405                continue;
406            }
407
408            let mut entries = fs::read_dir(&exec_path).await?;
409            while let Some(entry) = entries.next_entry().await? {
410                let path = entry.path();
411                if path.extension().map(|e| e == "json").unwrap_or(false)
412                    && path.to_string_lossy().contains(".meta.")
413                {
414                    if let Ok(metadata) = self.load_metadata(&path).await {
415                        // Apply filters
416                        if let Some(ref step_id) = query.step_id {
417                            if metadata.step_id != *step_id {
418                                continue;
419                            }
420                        }
421                        if let Some(ref artifact_type) = query.artifact_type {
422                            if metadata.artifact_type != *artifact_type {
423                                continue;
424                            }
425                        }
426                        results.push(metadata);
427                    }
428                }
429            }
430        }
431
432        // Sort by creation time
433        results.sort_by(|a, b| a.created_at.cmp(&b.created_at));
434
435        // Apply pagination
436        if let Some(offset) = query.offset {
437            results = results.into_iter().skip(offset).collect();
438        }
439        if let Some(limit) = query.limit {
440            results.truncate(limit);
441        }
442
443        Ok(results)
444    }
445
446    async fn get_metadata(
447        &self,
448        artifact_id: &ArtifactId,
449    ) -> Result<ArtifactMetadata, ArtifactStoreError> {
450        // Search all execution directories
451        let mut entries = fs::read_dir(&self.base_path).await?;
452
453        while let Some(entry) = entries.next_entry().await? {
454            if entry.file_type().await?.is_dir() {
455                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
456                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);
457
458                if metadata_path.exists() {
459                    return self.load_metadata(&metadata_path).await;
460                }
461            }
462        }
463
464        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
465    }
466
467    async fn get_execution_size(
468        &self,
469        execution_id: &ExecutionId,
470    ) -> Result<u64, ArtifactStoreError> {
471        let exec_path = self.execution_path(execution_id);
472
473        if !exec_path.exists() {
474            return Ok(0);
475        }
476
477        let mut total: u64 = 0;
478        let mut entries = fs::read_dir(&exec_path).await?;
479
480        while let Some(entry) = entries.next_entry().await? {
481            if let Ok(metadata) = entry.metadata().await {
482                total += metadata.len();
483            }
484        }
485
486        Ok(total)
487    }
488}
489
490// =============================================================================
491// Tests
492// =============================================================================
493
#[cfg(test)]
mod tests {
    use super::super::metadata::ArtifactType;
    use super::*;
    use crate::kernel::ids::StepId;
    use tempfile::TempDir;

    /// Round-trip: a stored artifact comes back byte-identical, with its
    /// name preserved and plausible size accounting in the put response.
    #[tokio::test]
    async fn test_filesystem_store_put_get() {
        let dir = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(dir.path());

        let execution = ExecutionId::new();
        let payload = b"Hello, World! This is a test artifact.".to_vec();

        let put_response = store
            .put(PutArtifactRequest::new(
                execution.clone(),
                StepId::new(),
                "test.txt",
                ArtifactType::Text,
                payload.clone(),
            ))
            .await
            .unwrap();

        assert!(put_response.artifact_id.as_str().starts_with("artifact_"));
        assert!(put_response.compressed_size > 0);
        assert_eq!(put_response.original_size, payload.len() as u64);

        let fetched = store.get(&put_response.artifact_id).await.unwrap();
        assert_eq!(fetched.content, payload);
        assert_eq!(fetched.metadata.name, "test.txt");
    }

    /// Highly repetitive content must shrink under compression and still
    /// round-trip exactly.
    #[tokio::test]
    async fn test_filesystem_store_compression() {
        let dir = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(dir.path());

        let payload = "Hello, World! ".repeat(1000).into_bytes();

        let put_response = store
            .put(PutArtifactRequest::new(
                ExecutionId::new(),
                StepId::new(),
                "repetitive.txt",
                ArtifactType::Text,
                payload.clone(),
            ))
            .await
            .unwrap();

        assert!(put_response.compressed_size < put_response.original_size);

        let fetched = store.get(&put_response.artifact_id).await.unwrap();
        assert_eq!(fetched.content, payload);
    }

    /// Listing by execution returns every artifact stored under it.
    #[tokio::test]
    async fn test_filesystem_store_list() {
        let dir = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(dir.path());

        let execution = ExecutionId::new();
        let step = StepId::new();

        for i in 0..3 {
            store
                .put(PutArtifactRequest::new(
                    execution.clone(),
                    step.clone(),
                    format!("file{}.txt", i),
                    ArtifactType::Text,
                    format!("Content {}", i).into_bytes(),
                ))
                .await
                .unwrap();
        }

        let listed = store
            .list(ListArtifactsQuery::for_execution(execution))
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);
    }
}