// workflow_graph_queue/memory/logs.rs
use std::collections::HashMap;
2
3use std::sync::Mutex as StdMutex;
4
5use tokio::sync::{Mutex, broadcast};
6
7use crate::error::LogError;
8use crate::traits::{LogChunk, LogSink};
9
/// In-memory [`LogSink`] that keeps the full chunk history per
/// `(workflow_id, job_id)` pair and fans live chunks out to subscribers
/// through tokio broadcast channels. Data lives only for the process
/// lifetime — nothing is persisted.
pub struct InMemoryLogSink {
    // Chunk history keyed by (workflow_id, job_id). Guarded by a *tokio*
    // Mutex because it is locked inside async fns (append / get_all) and
    // an async-aware lock is safe to hold across `.await`.
    store: Mutex<HashMap<(String, String), Vec<LogChunk>>>,
    // Broadcast senders keyed by (workflow_id, job_id). Guarded by a *std*
    // Mutex: it is only ever locked from synchronous code paths and never
    // held across an `.await`, so the cheaper blocking lock suffices.
    channels: StdMutex<HashMap<(String, String), broadcast::Sender<LogChunk>>>,
}
17
18impl InMemoryLogSink {
19 pub fn new() -> Self {
20 Self {
21 store: Mutex::new(HashMap::new()),
22 channels: StdMutex::new(HashMap::new()),
23 }
24 }
25
26 fn get_or_create_channel(
27 &self,
28 workflow_id: &str,
29 job_id: &str,
30 ) -> broadcast::Sender<LogChunk> {
31 let key = (workflow_id.to_string(), job_id.to_string());
32 self.channels
33 .lock()
34 .unwrap()
35 .entry(key)
36 .or_insert_with(|| broadcast::channel(256).0)
37 .clone()
38 }
39}
40
41impl Default for InMemoryLogSink {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl LogSink for InMemoryLogSink {
48 async fn append(&self, chunk: LogChunk) -> Result<(), LogError> {
49 let key = (chunk.workflow_id.clone(), chunk.job_id.clone());
50
51 self.store
53 .lock()
54 .await
55 .entry(key)
56 .or_default()
57 .push(chunk.clone());
58
59 let tx = self.get_or_create_channel(&chunk.workflow_id, &chunk.job_id);
61 tx.send(chunk).ok(); Ok(())
64 }
65
66 async fn get_all(&self, workflow_id: &str, job_id: &str) -> Result<Vec<LogChunk>, LogError> {
67 let store = self.store.lock().await;
68 let key = (workflow_id.to_string(), job_id.to_string());
69 Ok(store.get(&key).cloned().unwrap_or_default())
70 }
71
72 fn subscribe(&self, workflow_id: &str, job_id: &str) -> broadcast::Receiver<LogChunk> {
73 let key = (workflow_id.to_string(), job_id.to_string());
74 let mut channels = self.channels.lock().unwrap();
77 let tx = channels
78 .entry(key)
79 .or_insert_with(|| broadcast::channel(256).0);
80 tx.subscribe()
81 }
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traits::LogStream;

    /// Builds a stdout chunk with a zero timestamp for the given ids.
    fn make_chunk(wf: &str, job: &str, seq: u64, data: &str) -> LogChunk {
        LogChunk {
            workflow_id: wf.to_string(),
            job_id: job.to_string(),
            sequence: seq,
            data: data.to_string(),
            timestamp_ms: 0,
            stream: LogStream::Stdout,
        }
    }

    #[tokio::test]
    async fn test_append_and_get_all() {
        let sink = InMemoryLogSink::new();
        for (seq, line) in [(0u64, "line 1\n"), (1u64, "line 2\n")] {
            sink.append(make_chunk("wf1", "j1", seq, line))
                .await
                .unwrap();
        }

        let logs = sink.get_all("wf1", "j1").await.unwrap();
        assert_eq!(logs.len(), 2);
        let data: Vec<&str> = logs.iter().map(|c| c.data.as_str()).collect();
        assert_eq!(data, vec!["line 1\n", "line 2\n"]);
    }

    #[tokio::test]
    async fn test_subscribe_receives_live() {
        let sink = InMemoryLogSink::new();
        // Subscribe before appending: only chunks appended after this
        // point are delivered on the receiver.
        let mut receiver = sink.subscribe("wf1", "j1");

        sink.append(make_chunk("wf1", "j1", 0, "hello\n"))
            .await
            .unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.data, "hello\n");
    }

    #[tokio::test]
    async fn test_separate_jobs_isolated() {
        let sink = InMemoryLogSink::new();
        for (job, payload) in [("j1", "job1"), ("j2", "job2")] {
            sink.append(make_chunk("wf1", job, 0, payload))
                .await
                .unwrap();
        }

        // Each job sees exactly its own chunk and nothing from the other.
        let logs1 = sink.get_all("wf1", "j1").await.unwrap();
        let logs2 = sink.get_all("wf1", "j2").await.unwrap();
        assert_eq!(logs1.len(), 1);
        assert_eq!(logs2.len(), 1);
        assert_eq!(logs1[0].data, "job1");
        assert_eq!(logs2[0].data, "job2");
    }
}