enact_core/kernel/artifact/
store.rs1use super::metadata::{ArtifactMetadata, ArtifactType};
8use crate::kernel::ids::{ArtifactId, ExecutionId, StepId};
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::io;
12use thiserror::Error;
13
14#[derive(Debug, Error)]
20pub enum ArtifactStoreError {
21 #[error("Artifact not found: {0}")]
23 NotFound(ArtifactId),
24
25 #[error("IO error: {0}")]
27 Io(#[from] io::Error),
28
29 #[error("Serialization error: {0}")]
31 Serialization(#[from] serde_json::Error),
32
33 #[error("Compression error: {0}")]
35 Compression(String),
36
37 #[error("Invalid artifact: {0}")]
39 Invalid(String),
40
41 #[error("Storage error: {0}")]
43 Storage(String),
44
45 #[error("Artifact already exists: {0}")]
47 AlreadyExists(ArtifactId),
48}
49
50#[derive(Debug, Clone)]
56pub struct PutArtifactRequest {
57 pub execution_id: ExecutionId,
59 pub step_id: StepId,
61 pub name: String,
63 pub artifact_type: ArtifactType,
65 pub content_type: Option<String>,
67 pub content: Vec<u8>,
69 pub metadata: Option<serde_json::Value>,
71}
72
73impl PutArtifactRequest {
74 pub fn new(
76 execution_id: ExecutionId,
77 step_id: StepId,
78 name: impl Into<String>,
79 artifact_type: ArtifactType,
80 content: Vec<u8>,
81 ) -> Self {
82 Self {
83 execution_id,
84 step_id,
85 name: name.into(),
86 artifact_type,
87 content_type: None,
88 content,
89 metadata: None,
90 }
91 }
92
93 pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
95 self.content_type = Some(content_type.into());
96 self
97 }
98
99 pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
101 self.metadata = Some(metadata);
102 self
103 }
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct PutArtifactResponse {
109 pub artifact_id: ArtifactId,
111 pub metadata: ArtifactMetadata,
113 pub compressed_size: u64,
115 pub original_size: u64,
117}
118
119#[allow(dead_code)]
121#[derive(Debug, Clone)]
122pub struct GetArtifactRequest {
123 pub artifact_id: ArtifactId,
125}
126
127#[derive(Debug, Clone)]
129pub struct GetArtifactResponse {
130 pub metadata: ArtifactMetadata,
132 pub content: Vec<u8>,
134}
135
136#[derive(Debug, Clone, Default)]
138pub struct ListArtifactsQuery {
139 pub execution_id: Option<ExecutionId>,
141 pub step_id: Option<StepId>,
143 pub artifact_type: Option<ArtifactType>,
145 pub limit: Option<usize>,
147 pub offset: Option<usize>,
149}
150
151impl ListArtifactsQuery {
152 pub fn for_execution(execution_id: ExecutionId) -> Self {
154 Self {
155 execution_id: Some(execution_id),
156 ..Default::default()
157 }
158 }
159
160 pub fn for_step(step_id: StepId) -> Self {
162 Self {
163 step_id: Some(step_id),
164 ..Default::default()
165 }
166 }
167}
168
169#[async_trait]
181pub trait ArtifactStore: Send + Sync {
182 async fn put(
186 &self,
187 request: PutArtifactRequest,
188 ) -> Result<PutArtifactResponse, ArtifactStoreError>;
189
190 async fn get(
192 &self,
193 artifact_id: &ArtifactId,
194 ) -> Result<GetArtifactResponse, ArtifactStoreError>;
195
196 async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError>;
198
199 async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError>;
201
202 async fn list(
204 &self,
205 query: ListArtifactsQuery,
206 ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError>;
207
208 async fn get_metadata(
210 &self,
211 artifact_id: &ArtifactId,
212 ) -> Result<ArtifactMetadata, ArtifactStoreError>;
213
214 async fn get_execution_size(
216 &self,
217 execution_id: &ExecutionId,
218 ) -> Result<u64, ArtifactStoreError>;
219}
220
221use std::collections::HashMap;
226use tokio::sync::RwLock;
227
228pub struct InMemoryArtifactStore {
230 artifacts: RwLock<HashMap<ArtifactId, (ArtifactMetadata, Vec<u8>)>>,
231}
232
233impl InMemoryArtifactStore {
234 pub fn new() -> Self {
236 Self {
237 artifacts: RwLock::new(HashMap::new()),
238 }
239 }
240}
241
242impl Default for InMemoryArtifactStore {
243 fn default() -> Self {
244 Self::new()
245 }
246}
247
248#[async_trait]
249impl ArtifactStore for InMemoryArtifactStore {
250 async fn put(
251 &self,
252 request: PutArtifactRequest,
253 ) -> Result<PutArtifactResponse, ArtifactStoreError> {
254 let artifact_id = ArtifactId::new();
255 let original_size = request.content.len() as u64;
256
257 let metadata = ArtifactMetadata::new(
258 artifact_id.clone(),
259 request.execution_id,
260 request.step_id,
261 request.name,
262 request.artifact_type,
263 )
264 .with_original_size(original_size)
265 .with_compressed_size(original_size) .with_content_type(
267 request
268 .content_type
269 .unwrap_or_else(|| "application/octet-stream".to_string()),
270 );
271
272 {
273 let mut artifacts = self.artifacts.write().await;
274 artifacts.insert(artifact_id.clone(), (metadata.clone(), request.content));
275 }
276
277 Ok(PutArtifactResponse {
278 artifact_id,
279 metadata,
280 compressed_size: original_size,
281 original_size,
282 })
283 }
284
285 async fn get(
286 &self,
287 artifact_id: &ArtifactId,
288 ) -> Result<GetArtifactResponse, ArtifactStoreError> {
289 let artifacts = self.artifacts.read().await;
290 match artifacts.get(artifact_id) {
291 Some((metadata, content)) => Ok(GetArtifactResponse {
292 metadata: metadata.clone(),
293 content: content.clone(),
294 }),
295 None => Err(ArtifactStoreError::NotFound(artifact_id.clone())),
296 }
297 }
298
299 async fn exists(&self, artifact_id: &ArtifactId) -> Result<bool, ArtifactStoreError> {
300 let artifacts = self.artifacts.read().await;
301 Ok(artifacts.contains_key(artifact_id))
302 }
303
304 async fn delete(&self, artifact_id: &ArtifactId) -> Result<(), ArtifactStoreError> {
305 let mut artifacts = self.artifacts.write().await;
306 artifacts
307 .remove(artifact_id)
308 .ok_or_else(|| ArtifactStoreError::NotFound(artifact_id.clone()))?;
309 Ok(())
310 }
311
312 async fn list(
313 &self,
314 query: ListArtifactsQuery,
315 ) -> Result<Vec<ArtifactMetadata>, ArtifactStoreError> {
316 let artifacts = self.artifacts.read().await;
317 let mut results: Vec<ArtifactMetadata> = artifacts
318 .values()
319 .filter_map(|(metadata, _)| {
320 if let Some(ref exec_id) = query.execution_id {
322 if metadata.execution_id != *exec_id {
323 return None;
324 }
325 }
326 if let Some(ref step_id) = query.step_id {
327 if metadata.step_id != *step_id {
328 return None;
329 }
330 }
331 if let Some(ref artifact_type) = query.artifact_type {
332 if metadata.artifact_type != *artifact_type {
333 return None;
334 }
335 }
336 Some(metadata.clone())
337 })
338 .collect();
339
340 results.sort_by(|a, b| a.created_at.cmp(&b.created_at));
342
343 if let Some(offset) = query.offset {
345 results = results.into_iter().skip(offset).collect();
346 }
347 if let Some(limit) = query.limit {
348 results.truncate(limit);
349 }
350
351 Ok(results)
352 }
353
354 async fn get_metadata(
355 &self,
356 artifact_id: &ArtifactId,
357 ) -> Result<ArtifactMetadata, ArtifactStoreError> {
358 let artifacts = self.artifacts.read().await;
359 match artifacts.get(artifact_id) {
360 Some((metadata, _)) => Ok(metadata.clone()),
361 None => Err(ArtifactStoreError::NotFound(artifact_id.clone())),
362 }
363 }
364
365 async fn get_execution_size(
366 &self,
367 execution_id: &ExecutionId,
368 ) -> Result<u64, ArtifactStoreError> {
369 let artifacts = self.artifacts.read().await;
370 let total: u64 = artifacts
371 .values()
372 .filter(|(m, _)| m.execution_id == *execution_id)
373 .map(|(_, content)| content.len() as u64)
374 .sum();
375 Ok(total)
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Text` artifact request for the given execution and step.
    fn text_request(
        exec_id: &ExecutionId,
        step_id: &StepId,
        name: impl Into<String>,
        content: Vec<u8>,
    ) -> PutArtifactRequest {
        PutArtifactRequest::new(
            exec_id.clone(),
            step_id.clone(),
            name,
            ArtifactType::Text,
            content,
        )
    }

    /// A stored artifact can be read back with its content and name intact.
    #[tokio::test]
    async fn test_in_memory_store_put_get() {
        let store = InMemoryArtifactStore::new();
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();

        let stored = store
            .put(text_request(
                &exec_id,
                &step_id,
                "test.txt",
                b"Hello, World!".to_vec(),
            ))
            .await
            .unwrap();
        assert!(stored.artifact_id.as_str().starts_with("artifact_"));

        let fetched = store.get(&stored.artifact_id).await.unwrap();
        assert_eq!(fetched.content, b"Hello, World!");
        assert_eq!(fetched.metadata.name, "test.txt");
    }

    /// Listing by execution returns every artifact, and `limit` caps the count.
    #[tokio::test]
    async fn test_in_memory_store_list() {
        let store = InMemoryArtifactStore::new();
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();

        for i in 0..5 {
            let request = text_request(
                &exec_id,
                &step_id,
                format!("file{}.txt", i),
                format!("Content {}", i).into_bytes(),
            );
            store.put(request).await.unwrap();
        }

        let everything = store
            .list(ListArtifactsQuery::for_execution(exec_id.clone()))
            .await
            .unwrap();
        assert_eq!(everything.len(), 5);

        let limited = store
            .list(ListArtifactsQuery {
                execution_id: Some(exec_id),
                limit: Some(3),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(limited.len(), 3);
    }

    /// Deleting an artifact makes `exists` report `false` for its id.
    #[tokio::test]
    async fn test_in_memory_store_delete() {
        let store = InMemoryArtifactStore::new();
        let exec_id = ExecutionId::new();
        let step_id = StepId::new();

        let stored = store
            .put(text_request(
                &exec_id,
                &step_id,
                "test.txt",
                b"Hello".to_vec(),
            ))
            .await
            .unwrap();
        assert!(store.exists(&stored.artifact_id).await.unwrap());

        store.delete(&stored.artifact_id).await.unwrap();
        assert!(!store.exists(&stored.artifact_id).await.unwrap());
    }
}