Skip to main content

workflow_graph_queue/memory/
artifacts.rs

1use std::collections::HashMap;
2
3use tokio::sync::Mutex;
4
5use crate::error::ArtifactError;
6use crate::traits::ArtifactStore;
7
8pub struct InMemoryArtifactStore {
9    /// Keyed by (workflow_id, job_id).
10    store: Mutex<HashMap<(String, String), HashMap<String, String>>>,
11}
12
13impl InMemoryArtifactStore {
14    pub fn new() -> Self {
15        Self {
16            store: Mutex::new(HashMap::new()),
17        }
18    }
19}
20
21impl Default for InMemoryArtifactStore {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl ArtifactStore for InMemoryArtifactStore {
28    async fn put_outputs(
29        &self,
30        workflow_id: &str,
31        job_id: &str,
32        outputs: HashMap<String, String>,
33    ) -> Result<(), ArtifactError> {
34        self.store
35            .lock()
36            .await
37            .insert((workflow_id.to_string(), job_id.to_string()), outputs);
38        Ok(())
39    }
40
41    async fn get_outputs(
42        &self,
43        workflow_id: &str,
44        job_id: &str,
45    ) -> Result<HashMap<String, String>, ArtifactError> {
46        let store = self.store.lock().await;
47        Ok(store
48            .get(&(workflow_id.to_string(), job_id.to_string()))
49            .cloned()
50            .unwrap_or_default())
51    }
52
53    async fn get_upstream_outputs(
54        &self,
55        workflow_id: &str,
56        job_ids: &[String],
57    ) -> Result<HashMap<String, HashMap<String, String>>, ArtifactError> {
58        let store = self.store.lock().await;
59        let mut result = HashMap::new();
60        for job_id in job_ids {
61            let key = (workflow_id.to_string(), job_id.clone());
62            if let Some(outputs) = store.get(&key) {
63                result.insert(job_id.clone(), outputs.clone());
64            }
65        }
66        Ok(result)
67    }
68}
69
#[cfg(test)]
mod tests {
    use super::*;

    /// Outputs written for a job can be read back for the same job.
    #[tokio::test]
    async fn test_put_and_get() {
        let store = InMemoryArtifactStore::new();
        let outputs = HashMap::from([("version".to_string(), "1.0".to_string())]);

        store.put_outputs("wf1", "build", outputs).await.unwrap();

        let retrieved = store.get_outputs("wf1", "build").await.unwrap();
        assert_eq!(retrieved.get("version").map(String::as_str), Some("1.0"));
    }

    /// Only jobs that have stored outputs show up in the upstream lookup.
    #[tokio::test]
    async fn test_get_upstream_outputs() {
        let store = InMemoryArtifactStore::new();

        let build_outputs = HashMap::from([("hash".to_string(), "abc123".to_string())]);
        store
            .put_outputs("wf1", "build", build_outputs)
            .await
            .unwrap();

        let test_outputs = HashMap::from([("passed".to_string(), "true".to_string())]);
        store.put_outputs("wf1", "test", test_outputs).await.unwrap();

        let requested: Vec<String> = ["build", "test", "missing"]
            .iter()
            .map(|id| id.to_string())
            .collect();
        let upstream = store
            .get_upstream_outputs("wf1", &requested)
            .await
            .unwrap();

        assert_eq!(upstream.len(), 2);
        assert_eq!(upstream["build"]["hash"], "abc123");
        assert_eq!(upstream["test"]["passed"], "true");
    }

    /// Reading a job that was never written yields an empty map.
    #[tokio::test]
    async fn test_missing_returns_empty() {
        let store = InMemoryArtifactStore::new();
        let result = store.get_outputs("wf1", "nonexistent").await.unwrap();
        assert!(result.is_empty());
    }
}