// workflow_graph_queue/memory/logs.rs

1use std::collections::HashMap;
2
3use std::sync::Mutex as StdMutex;
4
5use tokio::sync::{Mutex, broadcast};
6
7use crate::error::LogError;
8use crate::traits::{LogChunk, LogSink};
9
10pub struct InMemoryLogSink {
11    /// Stored log chunks, keyed by (workflow_id, job_id).
12    store: Mutex<HashMap<(String, String), Vec<LogChunk>>>,
13    /// Per-job broadcast channels for live streaming.
14    /// Uses std::sync::Mutex since it's held only briefly and needs sync access in subscribe().
15    channels: StdMutex<HashMap<(String, String), broadcast::Sender<LogChunk>>>,
16}
17
18impl InMemoryLogSink {
19    pub fn new() -> Self {
20        Self {
21            store: Mutex::new(HashMap::new()),
22            channels: StdMutex::new(HashMap::new()),
23        }
24    }
25
26    fn get_or_create_channel(
27        &self,
28        workflow_id: &str,
29        job_id: &str,
30    ) -> broadcast::Sender<LogChunk> {
31        let key = (workflow_id.to_string(), job_id.to_string());
32        self.channels
33            .lock()
34            .unwrap()
35            .entry(key)
36            .or_insert_with(|| broadcast::channel(256).0)
37            .clone()
38    }
39}
40
41impl Default for InMemoryLogSink {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl LogSink for InMemoryLogSink {
48    async fn append(&self, chunk: LogChunk) -> Result<(), LogError> {
49        let key = (chunk.workflow_id.clone(), chunk.job_id.clone());
50
51        // Store the chunk
52        self.store
53            .lock()
54            .await
55            .entry(key)
56            .or_default()
57            .push(chunk.clone());
58
59        // Broadcast to live subscribers
60        let tx = self.get_or_create_channel(&chunk.workflow_id, &chunk.job_id);
61        tx.send(chunk).ok(); // ok if no subscribers
62
63        Ok(())
64    }
65
66    async fn get_all(&self, workflow_id: &str, job_id: &str) -> Result<Vec<LogChunk>, LogError> {
67        let store = self.store.lock().await;
68        let key = (workflow_id.to_string(), job_id.to_string());
69        Ok(store.get(&key).cloned().unwrap_or_default())
70    }
71
72    fn subscribe(&self, workflow_id: &str, job_id: &str) -> broadcast::Receiver<LogChunk> {
73        let key = (workflow_id.to_string(), job_id.to_string());
74        // Use try_lock for sync context; if contended, create a new channel
75        // (the append path will get_or_create the canonical one)
76        let mut channels = self.channels.lock().unwrap();
77        let tx = channels
78            .entry(key)
79            .or_insert_with(|| broadcast::channel(256).0);
80        tx.subscribe()
81    }
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traits::LogStream;

    /// Builds a stdout chunk with a zero timestamp for the given job.
    fn make_chunk(wf: &str, job: &str, seq: u64, data: &str) -> LogChunk {
        LogChunk {
            workflow_id: wf.into(),
            job_id: job.into(),
            sequence: seq,
            data: data.into(),
            timestamp_ms: 0,
            stream: LogStream::Stdout,
        }
    }

    /// Chunks appended to one job come back from `get_all` in append order.
    #[tokio::test]
    async fn test_append_and_get_all() {
        let sink = InMemoryLogSink::new();
        for (seq, line) in (0u64..).zip(["line 1\n", "line 2\n"]) {
            sink.append(make_chunk("wf1", "j1", seq, line))
                .await
                .unwrap();
        }

        let logs = sink.get_all("wf1", "j1").await.unwrap();
        assert_eq!(logs.len(), 2);
        assert_eq!(logs[0].data, "line 1\n");
        assert_eq!(logs[1].data, "line 2\n");
    }

    /// A receiver obtained before an append gets that chunk live.
    #[tokio::test]
    async fn test_subscribe_receives_live() {
        let sink = InMemoryLogSink::new();
        // Subscribe first so the receiver exists when the chunk is broadcast.
        let mut live = sink.subscribe("wf1", "j1");

        let sent = make_chunk("wf1", "j1", 0, "hello\n");
        sink.append(sent).await.unwrap();

        let received = live.recv().await.unwrap();
        assert_eq!(received.data, "hello\n");
    }

    /// Two jobs of the same workflow keep separate logs.
    #[tokio::test]
    async fn test_separate_jobs_isolated() {
        let sink = InMemoryLogSink::new();
        for (job, payload) in [("j1", "job1"), ("j2", "job2")] {
            sink.append(make_chunk("wf1", job, 0, payload))
                .await
                .unwrap();
        }

        let logs1 = sink.get_all("wf1", "j1").await.unwrap();
        let logs2 = sink.get_all("wf1", "j2").await.unwrap();
        assert_eq!(logs1.len(), 1);
        assert_eq!(logs2.len(), 1);
        assert_eq!(logs1[0].data, "job1");
        assert_eq!(logs2[0].data, "job2");
    }
}