workflow_graph_queue/memory/
artifacts.rs1use std::collections::HashMap;
2
3use tokio::sync::Mutex;
4
5use crate::error::ArtifactError;
6use crate::traits::ArtifactStore;
7
8pub struct InMemoryArtifactStore {
9 store: Mutex<HashMap<(String, String), HashMap<String, String>>>,
11}
12
13impl InMemoryArtifactStore {
14 pub fn new() -> Self {
15 Self {
16 store: Mutex::new(HashMap::new()),
17 }
18 }
19}
20
21impl Default for InMemoryArtifactStore {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl ArtifactStore for InMemoryArtifactStore {
28 async fn put_outputs(
29 &self,
30 workflow_id: &str,
31 job_id: &str,
32 outputs: HashMap<String, String>,
33 ) -> Result<(), ArtifactError> {
34 self.store
35 .lock()
36 .await
37 .insert((workflow_id.to_string(), job_id.to_string()), outputs);
38 Ok(())
39 }
40
41 async fn get_outputs(
42 &self,
43 workflow_id: &str,
44 job_id: &str,
45 ) -> Result<HashMap<String, String>, ArtifactError> {
46 let store = self.store.lock().await;
47 Ok(store
48 .get(&(workflow_id.to_string(), job_id.to_string()))
49 .cloned()
50 .unwrap_or_default())
51 }
52
53 async fn get_upstream_outputs(
54 &self,
55 workflow_id: &str,
56 job_ids: &[String],
57 ) -> Result<HashMap<String, HashMap<String, String>>, ArtifactError> {
58 let store = self.store.lock().await;
59 let mut result = HashMap::new();
60 for job_id in job_ids {
61 let key = (workflow_id.to_string(), job_id.clone());
62 if let Some(outputs) = store.get(&key) {
63 result.insert(job_id.clone(), outputs.clone());
64 }
65 }
66 Ok(result)
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;

    /// Outputs written for a job can be read back for that same job.
    #[tokio::test]
    async fn test_put_and_get() {
        let artifacts = InMemoryArtifactStore::new();
        let outputs = HashMap::from([("version".to_string(), "1.0".to_string())]);

        artifacts
            .put_outputs("wf1", "build", outputs)
            .await
            .unwrap();

        let fetched = artifacts.get_outputs("wf1", "build").await.unwrap();
        assert_eq!(fetched.get("version").map(String::as_str), Some("1.0"));
    }

    /// Only jobs that have stored outputs show up in the upstream result.
    #[tokio::test]
    async fn test_get_upstream_outputs() {
        let artifacts = InMemoryArtifactStore::new();

        let build_outputs = HashMap::from([("hash".to_string(), "abc123".to_string())]);
        artifacts
            .put_outputs("wf1", "build", build_outputs)
            .await
            .unwrap();

        let test_outputs = HashMap::from([("passed".to_string(), "true".to_string())]);
        artifacts
            .put_outputs("wf1", "test", test_outputs)
            .await
            .unwrap();

        // "missing" never had outputs stored, so it must not appear below.
        let requested = ["build", "test", "missing"].map(String::from);
        let upstream = artifacts
            .get_upstream_outputs("wf1", &requested)
            .await
            .unwrap();

        assert_eq!(upstream.len(), 2);
        assert_eq!(upstream["build"]["hash"], "abc123");
        assert_eq!(upstream["test"]["passed"], "true");
    }

    /// Reading a job that was never written yields an empty map, not an error.
    #[tokio::test]
    async fn test_missing_returns_empty() {
        let artifacts = InMemoryArtifactStore::new();
        let outputs = artifacts.get_outputs("wf1", "nonexistent").await.unwrap();
        assert!(outputs.is_empty());
    }
}