Skip to main content

enact_core/kernel/artifact/
store.rs

//! ArtifactStore - Trait for artifact storage backends
//!
//! The `ArtifactStore` trait defines the interface for storing and retrieving
//! artifacts. Implementations must handle compression, content addressing,
//! and lifecycle events.
6
7use super::metadata::{ArtifactMetadata, ArtifactType};
8use crate::kernel::ids::{ArtifactId, ExecutionId, StepId};
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::io;
12use thiserror::Error;
13
14// =============================================================================
15// Error Types
16// =============================================================================
17
18/// Errors that can occur during artifact operations
19#[derive(Debug, Error)]
20pub enum ArtifactStoreError {
21    /// Artifact not found
22    #[error("Artifact not found: {0}")]
23    NotFound(ArtifactId),
24
25    /// IO error during storage operation
26    #[error("IO error: {0}")]
27    Io(#[from] io::Error),
28
29    /// Serialization/deserialization error
30    #[error("Serialization error: {0}")]
31    Serialization(#[from] serde_json::Error),
32
33    /// Compression/decompression error
34    #[error("Compression error: {0}")]
35    Compression(String),
36
37    /// Invalid artifact data
38    #[error("Invalid artifact: {0}")]
39    Invalid(String),
40
41    /// Storage backend error
42    #[error("Storage error: {0}")]
43    Storage(String),
44
45    /// Artifact already exists (for put operations)
46    #[error("Artifact already exists: {0}")]
47    AlreadyExists(ArtifactId),
48}
49
50// =============================================================================
51// Artifact Data
52// =============================================================================
53
54/// Request to store an artifact
55#[derive(Debug, Clone)]
56pub struct PutArtifactRequest {
57    /// Execution that produced this artifact
58    pub execution_id: ExecutionId,
59    /// Step that produced this artifact
60    pub step_id: StepId,
61    /// Name of the artifact
62    pub name: String,
63    /// Type of artifact
64    pub artifact_type: ArtifactType,
65    /// Content type (MIME type)
66    pub content_type: Option<String>,
67    /// Raw content bytes
68    pub content: Vec<u8>,
69    /// Additional metadata
70    pub metadata: Option<serde_json::Value>,
71}
72
73impl PutArtifactRequest {
74    /// Create a new put request
75    pub fn new(
76        execution_id: ExecutionId,
77        step_id: StepId,
78        name: impl Into<String>,
79        artifact_type: ArtifactType,
80        content: Vec<u8>,
81    ) -> Self {
82        Self {
83            execution_id,
84            step_id,
85            name: name.into(),
86            artifact_type,
87            content_type: None,
88            content,
89            metadata: None,
90        }
91    }
92
93    /// Set content type
94    pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
95        self.content_type = Some(content_type.into());
96        self
97    }
98
99    /// Set metadata
100    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
101        self.metadata = Some(metadata);
102        self
103    }
104}
105
106/// Response from storing an artifact
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct PutArtifactResponse {
109    /// Generated artifact ID
110    pub artifact_id: ArtifactId,
111    /// Full metadata of the stored artifact
112    pub metadata: ArtifactMetadata,
113    /// Size of the compressed content (bytes)
114    pub compressed_size: u64,
115    /// Size of the original content (bytes)
116    pub original_size: u64,
117}
118
119/// Request to retrieve an artifact
120#[allow(dead_code)]
121#[derive(Debug, Clone)]
122pub struct GetArtifactRequest {
123    /// Artifact ID to retrieve
124    pub artifact_id: ArtifactId,
125}
126
127/// Response from retrieving an artifact
128#[derive(Debug, Clone)]
129pub struct GetArtifactResponse {
130    /// Artifact metadata
131    pub metadata: ArtifactMetadata,
132    /// Decompressed content bytes
133    pub content: Vec<u8>,
134}
135
136/// Query for listing artifacts
137#[derive(Debug, Clone, Default)]
138pub struct ListArtifactsQuery {
139    /// Filter by execution ID
140    pub execution_id: Option<ExecutionId>,
141    /// Filter by step ID
142    pub step_id: Option<StepId>,
143    /// Filter by artifact type
144    pub artifact_type: Option<ArtifactType>,
145    /// Maximum number of results
146    pub limit: Option<usize>,
147    /// Offset for pagination
148    pub offset: Option<usize>,
149}
150
151impl ListArtifactsQuery {
152    /// Create a query for a specific execution
153    pub fn for_execution(execution_id: ExecutionId) -> Self {
154        Self {
155            execution_id: Some(execution_id),
156            ..Default::default()
157        }
158    }
159
160    /// Create a query for a specific step
161    pub fn for_step(step_id: StepId) -> Self {
162        Self {
163            step_id: Some(step_id),
164            ..Default::default()
165        }
166    }
167}
168
169// =============================================================================
170// ArtifactStore Trait
171// =============================================================================
172
173/// Trait for artifact storage backends
174///
175/// Implementations must:
176/// - Generate deterministic artifact IDs
177/// - Compress content before storage (recommended: zstd)
178/// - Handle concurrent access safely
179/// - Support listing and filtering
180#[async_trait]
181pub trait ArtifactStore: Send + Sync {
182    /// Store an artifact
183    ///
184    /// Returns the generated artifact ID and metadata
185    async fn put(
186        &self,
187        request: PutArtifactRequest,
188    ) -> Result<PutArtifactResponse, ArtifactStoreError>;
189
190    /// Retrieve an artifact by ID
191    async fn get(
192        &self,
193        artifact_id: &ArtifactId,
194    ) -> Result<GetArtifactResponse, ArtifactStoreError>;
195
196    /// Check if an artifact exists
197    async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError>;
198
199    /// Delete an artifact
200    async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError>;
201
202    /// List artifacts matching a query
203    async fn list(
204        &self,
205        query: ListArtifactsQuery,
206    ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError>;
207
208    /// Get metadata for an artifact without retrieving content
209    async fn get_metadata(
210        &self,
211        artifact_id: &ArtifactId,
212    ) -> Result<ArtifactMetadata, ArtifactStoreError>;
213
214    /// Get total storage size for an execution
215    async fn get_execution_size(
216        &self,
217        execution_id: &ExecutionId,
218    ) -> Result<u64, ArtifactStoreError>;
219}
220
221// =============================================================================
222// In-Memory Store (for testing)
223// =============================================================================
224
225use std::collections::HashMap;
226use tokio::sync::RwLock;
227
228/// In-memory artifact store for testing
229pub struct InMemoryArtifactStore {
230    artifacts: RwLock<HashMap<ArtifactId, (ArtifactMetadata, Vec<u8>)>>,
231}
232
233impl InMemoryArtifactStore {
234    /// Create a new in-memory store
235    pub fn new() -> Self {
236        Self {
237            artifacts: RwLock::new(HashMap::new()),
238        }
239    }
240}
241
242impl Default for InMemoryArtifactStore {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248#[async_trait]
249impl ArtifactStore for InMemoryArtifactStore {
250    async fn put(
251        &self,
252        request: PutArtifactRequest,
253    ) -> Result<PutArtifactResponse, ArtifactStoreError> {
254        let artifact_id = ArtifactId::new();
255        let original_size = request.content.len() as u64;
256
257        let metadata = ArtifactMetadata::new(
258            artifact_id.clone(),
259            request.execution_id,
260            request.step_id,
261            request.name,
262            request.artifact_type,
263        )
264        .with_original_size(original_size)
265        .with_compressed_size(original_size) // No compression in memory
266        .with_content_type(
267            request
268                .content_type
269                .unwrap_or_else(|| "application/octet-stream".to_string()),
270        );
271
272        {
273            let mut artifacts = self.artifacts.write().await;
274            artifacts.insert(artifact_id.clone(), (metadata.clone(), request.content));
275        }
276
277        Ok(PutArtifactResponse {
278            artifact_id,
279            metadata,
280            compressed_size: original_size,
281            original_size,
282        })
283    }
284
285    async fn get(
286        &self,
287        artifact_id: &ArtifactId,
288    ) -> Result<GetArtifactResponse, ArtifactStoreError> {
289        let artifacts = self.artifacts.read().await;
290        match artifacts.get(artifact_id) {
291            Some((metadata, content)) => Ok(GetArtifactResponse {
292                metadata: metadata.clone(),
293                content: content.clone(),
294            }),
295            None => Err(ArtifactStoreError::NotFound(artifact_id.clone())),
296        }
297    }
298
299    async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError> {
300        let artifacts = self.artifacts.read().await;
301        Ok(artifacts.contains_key(artifact_id))
302    }
303
304    async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError> {
305        let mut artifacts = self.artifacts.write().await;
306        artifacts
307            .remove(artifact_id)
308            .ok_or_else(|| ArtifactStoreError::NotFound(artifact_id.clone()))?;
309        Ok(())
310    }
311
312    async fn list(
313        &self,
314        query: ListArtifactsQuery,
315    ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError> {
316        let artifacts = self.artifacts.read().await;
317        let mut results: Vec<ArtifactMetadata> = artifacts
318            .values()
319            .filter_map(|(metadata, _)| {
320                // Apply filters
321                if let Some(ref exec_id) = query.execution_id {
322                    if metadata.execution_id != *exec_id {
323                        return None;
324                    }
325                }
326                if let Some(ref step_id) = query.step_id {
327                    if metadata.step_id != *step_id {
328                        return None;
329                    }
330                }
331                if let Some(ref artifact_type) = query.artifact_type {
332                    if metadata.artifact_type != *artifact_type {
333                        return None;
334                    }
335                }
336                Some(metadata.clone())
337            })
338            .collect();
339
340        // Sort by creation time
341        results.sort_by(|a, b| a.created_at.cmp(&b.created_at));
342
343        // Apply pagination
344        if let Some(offset) = query.offset {
345            results = results.into_iter().skip(offset).collect();
346        }
347        if let Some(limit) = query.limit {
348            results.truncate(limit);
349        }
350
351        Ok(results)
352    }
353
354    async fn get_metadata(
355        &self,
356        artifact_id: &ArtifactId,
357    ) -> Result<ArtifactMetadata, ArtifactStoreError> {
358        let artifacts = self.artifacts.read().await;
359        match artifacts.get(artifact_id) {
360            Some((metadata, _)) => Ok(metadata.clone()),
361            None => Err(ArtifactStoreError::NotFound(artifact_id.clone())),
362        }
363    }
364
365    async fn get_execution_size(
366        &self,
367        execution_id: &ExecutionId,
368    ) -> Result<u64, ArtifactStoreError> {
369        let artifacts = self.artifacts.read().await;
370        let total: u64 = artifacts
371            .values()
372            .filter(|(m, _)| m.execution_id == *execution_id)
373            .map(|(_, content)| content.len() as u64)
374            .sum();
375        Ok(total)
376    }
377}
378
379// =============================================================================
380// Tests
381// =============================================================================
382
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Text` artifact request carrying `body` as its content.
    fn text_request(
        execution_id: &ExecutionId,
        step_id: &StepId,
        name: &str,
        body: &str,
    ) -> PutArtifactRequest {
        PutArtifactRequest::new(
            execution_id.clone(),
            step_id.clone(),
            name,
            ArtifactType::Text,
            body.as_bytes().to_vec(),
        )
    }

    /// A stored artifact can be read back with its content and name intact.
    #[tokio::test]
    async fn test_in_memory_store_put_get() {
        let store = InMemoryArtifactStore::new();
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();

        let stored = store
            .put(text_request(&exec_id, &step_id, "test.txt", "Hello, World!"))
            .await
            .unwrap();
        assert!(stored.artifact_id.as_str().starts_with("artifact_"));

        let fetched = store.get(&stored.artifact_id).await.unwrap();
        assert_eq!(fetched.content, b"Hello, World!");
        assert_eq!(fetched.metadata.name, "test.txt");
    }

    /// Listing honours the execution filter and the `limit` setting.
    #[tokio::test]
    async fn test_in_memory_store_list() {
        let store = InMemoryArtifactStore::new();
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();

        // Store multiple artifacts under the same execution and step.
        for i in 0..5 {
            let name = format!("file{}.txt", i);
            let body = format!("Content {}", i);
            store
                .put(text_request(&exec_id, &step_id, &name, &body))
                .await
                .unwrap();
        }

        // Without a limit, every artifact of the execution is returned.
        let everything = store
            .list(ListArtifactsQuery::for_execution(exec_id.clone()))
            .await
            .unwrap();
        assert_eq!(everything.len(), 5);

        // With a limit, the result is capped.
        let capped = store
            .list(ListArtifactsQuery {
                execution_id: Some(exec_id),
                limit: Some(3),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(capped.len(), 3);
    }

    /// Deleting an artifact makes `exists` report `false` afterwards.
    #[tokio::test]
    async fn test_in_memory_store_delete() {
        let store = InMemoryArtifactStore::new();
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();

        let stored = store
            .put(text_request(&exec_id, &step_id, "test.txt", "Hello"))
            .await
            .unwrap();
        assert!(store.exists(&stored.artifact_id).await.unwrap());

        store.delete(&stored.artifact_id).await.unwrap();
        assert!(!store.exists(&stored.artifact_id).await.unwrap());
    }
}