// enact_core/kernel/artifact/filesystem.rs

use super::metadata::{ArtifactMetadata, CompressionType};
use super::store::{
    ArtifactStore, ArtifactStoreError, GetArtifactResponse, ListArtifactsQuery, PutArtifactRequest,
    PutArtifactResponse,
};
use crate::kernel::ids::{ArtifactId, ExecutionId};
use async_trait::async_trait;
use sha2::{Digest, Sha256};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use tokio::fs;
28
/// Artifact store that persists artifact content and metadata on the local
/// filesystem, one subdirectory per execution.
///
/// Content files are optionally compressed; metadata is stored alongside the
/// content as a pretty-printed JSON sidecar (`<artifact_id>.meta.json`).
pub struct FilesystemArtifactStore {
    /// Root directory under which per-execution subdirectories are created.
    base_path: PathBuf,
    /// Compression level; `with_compression_level` clamps it to 1..=22.
    compression_level: i32,
    /// When false, content is written verbatim and without a `.zst` extension.
    compression_enabled: bool,
}
41
impl FilesystemArtifactStore {
    /// Create a store rooted at `base_path`, with compression enabled at the
    /// default level (3).
    pub fn new(base_path: impl Into<PathBuf>) -> Self {
        Self {
            base_path: base_path.into(),
            compression_level: 3,
            compression_enabled: true,
        }
    }

    /// Builder: set the compression level, clamped to 1..=22 (the zstd range).
    pub fn with_compression_level(mut self, level: i32) -> Self {
        self.compression_level = level.clamp(1, 22);
        self
    }

    /// Builder: disable compression entirely.
    ///
    /// NOTE(review): `artifact_content_path` derives the file extension from
    /// the *current* `compression_enabled` flag, so artifacts written with
    /// compression on cannot be located by a store configured with it off
    /// (and vice versa) — confirm a store is never reconfigured mid-lifetime.
    pub fn without_compression(mut self) -> Self {
        self.compression_enabled = false;
        self
    }

    /// Directory holding all artifacts for one execution.
    fn execution_path(&self, execution_id: &ExecutionId) -> PathBuf {
        self.base_path.join(execution_id.as_str())
    }

    /// Path of an artifact's content file (`.zst` suffix when compression is on).
    fn artifact_content_path(
        &self,
        execution_id: &ExecutionId,
        artifact_id: &ArtifactId,
    ) -> PathBuf {
        let ext = if self.compression_enabled { ".zst" } else { "" };
        self.execution_path(execution_id)
            .join(format!("{}{}", artifact_id.as_str(), ext))
    }

    /// Path of an artifact's JSON metadata sidecar file.
    fn artifact_metadata_path(
        &self,
        execution_id: &ExecutionId,
        artifact_id: &ArtifactId,
    ) -> PathBuf {
        self.execution_path(execution_id)
            .join(format!("{}.meta.json", artifact_id.as_str()))
    }

    /// Compress `data` through the encoder; returns an uncompressed copy when
    /// compression is disabled.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArtifactStoreError> {
        if !self.compression_enabled {
            return Ok(data.to_vec());
        }

        let mut encoder = zstd_encoder(self.compression_level)?;
        encoder.write_all(data).map_err(|e| {
            ArtifactStoreError::Compression(format!("Failed to write to encoder: {}", e))
        })?;
        encoder.finish().map_err(|e| {
            ArtifactStoreError::Compression(format!("Failed to finish compression: {}", e))
        })
    }

    /// Inverse of `compress`; returns `data` unchanged (copied) when
    /// compression is disabled.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, ArtifactStoreError> {
        if !self.compression_enabled {
            return Ok(data.to_vec());
        }

        let mut decoder = zstd_decoder(data)?;
        let mut result = Vec::new();
        decoder
            .read_to_end(&mut result)
            .map_err(|e| ArtifactStoreError::Compression(format!("Failed to decompress: {}", e)))?;
        Ok(result)
    }

    /// Lowercase-hex SHA-256 digest of `data`, used as the content hash.
    fn hash_content(data: &[u8]) -> String {
        let mut hasher = Sha256::new();
        hasher.update(data);
        format!("{:x}", hasher.finalize())
    }

    /// Read and deserialize an `ArtifactMetadata` sidecar from `path`.
    async fn load_metadata(&self, path: &Path) -> Result<ArtifactMetadata, ArtifactStoreError> {
        let content = fs::read_to_string(path).await?;
        let metadata: ArtifactMetadata = serde_json::from_str(&content)?;
        Ok(metadata)
    }

    /// Serialize `metadata` as pretty-printed JSON and write it to `path`.
    async fn save_metadata(
        &self,
        path: &Path,
        metadata: &ArtifactMetadata,
    ) -> Result<(), ArtifactStoreError> {
        let content = serde_json::to_string_pretty(metadata)?;
        fs::write(path, content).await?;
        Ok(())
    }
}
145
146fn zstd_encoder(level: i32) -> Result<ZstdEncoder, ArtifactStoreError> {
154 Ok(ZstdEncoder::new(level))
155}
156
157fn zstd_decoder(data: &[u8]) -> Result<ZstdDecoder, ArtifactStoreError> {
159 Ok(ZstdDecoder::new(data))
160}
161
/// Write-style encoder that buffers all written bytes and compresses them in
/// one shot on `finish`.
///
/// NOTE(review): despite the name, `finish` compresses with DEFLATE via
/// `miniz_oxide`, not zstd — the `.zst` extension and `CompressionType::Zstd`
/// label used elsewhere in this file are therefore misleading; confirm intent.
struct ZstdEncoder {
    /// Compression level forwarded to the backend (narrowed to `u8` on finish).
    level: i32,
    /// Accumulates all bytes written before `finish`.
    buffer: Vec<u8>,
}
168
169impl ZstdEncoder {
170 fn new(level: i32) -> Self {
171 Self {
172 level,
173 buffer: Vec::new(),
174 }
175 }
176
177 fn finish(self) -> Result<Vec<u8>, std::io::Error> {
178 let original_len = self.buffer.len() as u32;
184 let compressed = miniz_oxide::deflate::compress_to_vec(&self.buffer, self.level as u8);
185
186 let mut result = Vec::with_capacity(4 + compressed.len());
187 result.extend_from_slice(&original_len.to_le_bytes());
188 result.extend_from_slice(&compressed);
189 Ok(result)
190 }
191}
192
193impl Write for ZstdEncoder {
194 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
195 self.buffer.extend_from_slice(buf);
196 Ok(buf.len())
197 }
198
199 fn flush(&mut self) -> std::io::Result<()> {
200 Ok(())
201 }
202}
203
/// Reader that lazily inflates a frame produced by `ZstdEncoder` on the first
/// call to `read`.
struct ZstdDecoder {
    /// Initially the framed compressed bytes; replaced in place by the
    /// inflated bytes on the first `read`.
    data: Vec<u8>,
    /// Read cursor into `data` (advances only after inflation).
    position: usize,
}
209
210impl ZstdDecoder {
211 fn new(data: &[u8]) -> Self {
212 Self {
213 data: data.to_vec(),
214 position: 0,
215 }
216 }
217}
218
219impl Read for ZstdDecoder {
220 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
221 if self.position == 0 && self.data.len() > 4 {
222 let _original_len =
224 u32::from_le_bytes([self.data[0], self.data[1], self.data[2], self.data[3]])
225 as usize;
226
227 let decompressed =
228 miniz_oxide::inflate::decompress_to_vec(&self.data[4..]).map_err(|e| {
229 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}", e))
230 })?;
231
232 self.data = decompressed;
233 }
234
235 let remaining = self.data.len() - self.position;
236 let to_read = std::cmp::min(remaining, buf.len());
237
238 if to_read > 0 {
239 buf[..to_read].copy_from_slice(&self.data[self.position..self.position + to_read]);
240 self.position += to_read;
241 }
242
243 Ok(to_read)
244 }
245}
246
#[async_trait]
impl ArtifactStore for FilesystemArtifactStore {
    /// Store an artifact: compress the content, write it plus a JSON metadata
    /// sidecar under `<base>/<execution_id>/`, and return the freshly minted id.
    async fn put(
        &self,
        request: PutArtifactRequest,
    ) -> Result<PutArtifactResponse, ArtifactStoreError> {
        let artifact_id = ArtifactId::new();
        let original_size = request.content.len() as u64;

        let exec_path = self.execution_path(&request.execution_id);
        fs::create_dir_all(&exec_path).await?;

        let compressed = self.compress(&request.content)?;
        let compressed_size = compressed.len() as u64;

        // Hash the *uncompressed* content so the hash is stable regardless of
        // the compression setting.
        let content_hash = Self::hash_content(&request.content);

        let content_path = self.artifact_content_path(&request.execution_id, &artifact_id);

        let metadata = ArtifactMetadata::new(
            artifact_id.clone(),
            request.execution_id.clone(),
            request.step_id,
            request.name,
            request.artifact_type,
        )
        .with_original_size(original_size)
        .with_compressed_size(compressed_size)
        .with_compression(if self.compression_enabled {
            CompressionType::Zstd
        } else {
            CompressionType::None
        })
        .with_content_hash(content_hash)
        .with_storage_uri(content_path.to_string_lossy().to_string())
        .with_content_type(
            request
                .content_type
                .unwrap_or_else(|| request.artifact_type.default_content_type().to_string()),
        );

        // Content is written before metadata: a crash in between leaves an
        // orphaned content file rather than metadata pointing at nothing.
        fs::write(&content_path, &compressed).await?;

        let metadata_path = self.artifact_metadata_path(&request.execution_id, &artifact_id);
        self.save_metadata(&metadata_path, &metadata).await?;

        Ok(PutArtifactResponse {
            artifact_id,
            metadata,
            compressed_size,
            original_size,
        })
    }

    /// Fetch an artifact by id. The id alone does not encode its execution,
    /// so every execution directory is scanned (O(#executions) per call).
    async fn get(
        &self,
        artifact_id: &ArtifactId,
    ) -> Result<GetArtifactResponse, ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);

                // NOTE(review): `Path::exists` is a blocking syscall inside an
                // async fn (here and in the methods below); consider
                // `tokio::fs::try_exists` if these paths run hot.
                if metadata_path.exists() {
                    let metadata = self.load_metadata(&metadata_path).await?;
                    let content_path = self.artifact_content_path(&exec_id, artifact_id);

                    let compressed = fs::read(&content_path).await?;
                    let content = self.decompress(&compressed)?;

                    return Ok(GetArtifactResponse { metadata, content });
                }
            }
        }

        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
    }

    /// True when an artifact's metadata sidecar exists in any execution
    /// directory; same full scan as `get`.
    async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);

                if metadata_path.exists() {
                    return Ok(true);
                }
            }
        }

        Ok(false)
    }

    /// Remove an artifact's content and metadata files; `NotFound` if no
    /// execution directory holds its metadata sidecar.
    async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);

                if metadata_path.exists() {
                    let content_path = self.artifact_content_path(&exec_id, artifact_id);

                    // Content file may be missing (e.g. partially written
                    // artifact); still remove the metadata sidecar.
                    if content_path.exists() {
                        fs::remove_file(&content_path).await?;
                    }
                    fs::remove_file(&metadata_path).await?;

                    return Ok(());
                }
            }
        }

        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
    }

    /// List artifact metadata matching `query`, sorted oldest-first by
    /// creation time, with optional offset/limit applied after sorting.
    async fn list(
        &self,
        query: ListArtifactsQuery,
    ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError> {
        let mut results = Vec::new();

        // Restrict the scan to one execution directory when the query names
        // one; otherwise walk every execution under the base path.
        let exec_dirs = if let Some(ref exec_id) = query.execution_id {
            vec![self.execution_path(exec_id)]
        } else {
            let mut dirs = Vec::new();
            let mut entries = fs::read_dir(&self.base_path).await?;
            while let Some(entry) = entries.next_entry().await? {
                if entry.file_type().await?.is_dir() {
                    dirs.push(entry.path());
                }
            }
            dirs
        };

        for exec_path in exec_dirs {
            if !exec_path.exists() {
                continue;
            }

            let mut entries = fs::read_dir(&exec_path).await?;
            while let Some(entry) = entries.next_entry().await? {
                let path = entry.path();
                // Metadata sidecars are named `<id>.meta.json`.
                if path.extension().map(|e| e == "json").unwrap_or(false)
                    && path.to_string_lossy().contains(".meta.")
                {
                    // Unreadable/corrupt sidecars are skipped rather than
                    // failing the whole listing.
                    if let Ok(metadata) = self.load_metadata(&path).await {
                        if let Some(ref step_id) = query.step_id {
                            if metadata.step_id != *step_id {
                                continue;
                            }
                        }
                        if let Some(ref artifact_type) = query.artifact_type {
                            if metadata.artifact_type != *artifact_type {
                                continue;
                            }
                        }
                        results.push(metadata);
                    }
                }
            }
        }

        // Oldest first; the pagination below assumes this order.
        results.sort_by(|a, b| a.created_at.cmp(&b.created_at));

        if let Some(offset) = query.offset {
            results = results.into_iter().skip(offset).collect();
        }
        if let Some(limit) = query.limit {
            results.truncate(limit);
        }

        Ok(results)
    }

    /// Fetch only an artifact's metadata; same directory scan as `get`.
    async fn get_metadata(
        &self,
        artifact_id: &ArtifactId,
    ) -> Result<ArtifactMetadata, ArtifactStoreError> {
        let mut entries = fs::read_dir(&self.base_path).await?;

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                let exec_id = ExecutionId::from(entry.file_name().to_string_lossy().as_ref());
                let metadata_path = self.artifact_metadata_path(&exec_id, artifact_id);

                if metadata_path.exists() {
                    return self.load_metadata(&metadata_path).await;
                }
            }
        }

        Err(ArtifactStoreError::NotFound(artifact_id.clone()))
    }

    /// Total on-disk size in bytes of one execution's directory (content and
    /// metadata files alike); 0 when the execution has no artifacts.
    async fn get_execution_size(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<u64, ArtifactStoreError> {
        let exec_path = self.execution_path(execution_id);

        if !exec_path.exists() {
            return Ok(0);
        }

        let mut total: u64 = 0;
        let mut entries = fs::read_dir(&exec_path).await?;

        while let Some(entry) = entries.next_entry().await? {
            // Entries whose filesystem metadata cannot be read are ignored.
            if let Ok(metadata) = entry.metadata().await {
                total += metadata.len();
            }
        }

        Ok(total)
    }
}
489
#[cfg(test)]
mod tests {
    use super::super::metadata::ArtifactType;
    use super::*;
    use crate::kernel::ids::StepId;
    use tempfile::TempDir;

    /// Round-trip: put an artifact, then get it back byte-for-byte.
    #[tokio::test]
    async fn test_filesystem_store_put_get() {
        let dir = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(dir.path());

        let execution = ExecutionId::new();
        let step = StepId::new();
        let payload = b"Hello, World! This is a test artifact.".to_vec();

        let put_response = store
            .put(PutArtifactRequest::new(
                execution.clone(),
                step,
                "test.txt",
                ArtifactType::Text,
                payload.clone(),
            ))
            .await
            .unwrap();

        assert!(put_response.artifact_id.as_str().starts_with("artifact_"));
        assert!(put_response.compressed_size > 0);
        assert_eq!(put_response.original_size, payload.len() as u64);

        let fetched = store.get(&put_response.artifact_id).await.unwrap();
        assert_eq!(fetched.content, payload);
        assert_eq!(fetched.metadata.name, "test.txt");
    }

    /// Highly repetitive content must shrink under compression and still
    /// round-trip exactly.
    #[tokio::test]
    async fn test_filesystem_store_compression() {
        let dir = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(dir.path());

        let payload = "Hello, World! ".repeat(1000).into_bytes();

        let put_response = store
            .put(PutArtifactRequest::new(
                ExecutionId::new(),
                StepId::new(),
                "repetitive.txt",
                ArtifactType::Text,
                payload.clone(),
            ))
            .await
            .unwrap();

        assert!(put_response.compressed_size < put_response.original_size);

        let fetched = store.get(&put_response.artifact_id).await.unwrap();
        assert_eq!(fetched.content, payload);
    }

    /// Listing an execution returns every artifact stored under it.
    #[tokio::test]
    async fn test_filesystem_store_list() {
        let dir = TempDir::new().unwrap();
        let store = FilesystemArtifactStore::new(dir.path());

        let execution = ExecutionId::new();
        let step = StepId::new();

        for i in 0..3 {
            store
                .put(PutArtifactRequest::new(
                    execution.clone(),
                    step.clone(),
                    format!("file{}.txt", i),
                    ArtifactType::Text,
                    format!("Content {}", i).into_bytes(),
                ))
                .await
                .unwrap();
        }

        let listed = store
            .list(ListArtifactsQuery::for_execution(execution))
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);
    }
}