use std::collections::HashMap;
use std::sync::Mutex as StdMutex;
use tokio::sync::{Mutex, broadcast};
use crate::error::LogError;
use crate::traits::{LogChunk, LogSink};
pub struct InMemoryLogSink {
store: Mutex<HashMap<(String, String), Vec<LogChunk>>>,
channels: StdMutex<HashMap<(String, String), broadcast::Sender<LogChunk>>>,
}
impl InMemoryLogSink {
pub fn new() -> Self {
Self {
store: Mutex::new(HashMap::new()),
channels: StdMutex::new(HashMap::new()),
}
}
fn get_or_create_channel(
&self,
workflow_id: &str,
job_id: &str,
) -> broadcast::Sender<LogChunk> {
let key = (workflow_id.to_string(), job_id.to_string());
self.channels
.lock()
.unwrap()
.entry(key)
.or_insert_with(|| broadcast::channel(256).0)
.clone()
}
}
impl Default for InMemoryLogSink {
fn default() -> Self {
Self::new()
}
}
impl LogSink for InMemoryLogSink {
async fn append(&self, chunk: LogChunk) -> Result<(), LogError> {
let key = (chunk.workflow_id.clone(), chunk.job_id.clone());
self.store
.lock()
.await
.entry(key)
.or_default()
.push(chunk.clone());
let tx = self.get_or_create_channel(&chunk.workflow_id, &chunk.job_id);
tx.send(chunk).ok();
Ok(())
}
async fn get_all(&self, workflow_id: &str, job_id: &str) -> Result<Vec<LogChunk>, LogError> {
let store = self.store.lock().await;
let key = (workflow_id.to_string(), job_id.to_string());
Ok(store.get(&key).cloned().unwrap_or_default())
}
fn subscribe(&self, workflow_id: &str, job_id: &str) -> broadcast::Receiver<LogChunk> {
let key = (workflow_id.to_string(), job_id.to_string());
let mut channels = self.channels.lock().unwrap();
let tx = channels
.entry(key)
.or_insert_with(|| broadcast::channel(256).0);
tx.subscribe()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::traits::LogStream;
fn make_chunk(wf: &str, job: &str, seq: u64, data: &str) -> LogChunk {
LogChunk {
workflow_id: wf.into(),
job_id: job.into(),
sequence: seq,
data: data.into(),
timestamp_ms: 0,
stream: LogStream::Stdout,
}
}
#[tokio::test]
async fn test_append_and_get_all() {
let sink = InMemoryLogSink::new();
sink.append(make_chunk("wf1", "j1", 0, "line 1\n"))
.await
.unwrap();
sink.append(make_chunk("wf1", "j1", 1, "line 2\n"))
.await
.unwrap();
let logs = sink.get_all("wf1", "j1").await.unwrap();
assert_eq!(logs.len(), 2);
assert_eq!(logs[0].data, "line 1\n");
assert_eq!(logs[1].data, "line 2\n");
}
#[tokio::test]
async fn test_subscribe_receives_live() {
let sink = InMemoryLogSink::new();
let mut rx = sink.subscribe("wf1", "j1");
sink.append(make_chunk("wf1", "j1", 0, "hello\n"))
.await
.unwrap();
let chunk = rx.recv().await.unwrap();
assert_eq!(chunk.data, "hello\n");
}
#[tokio::test]
async fn test_separate_jobs_isolated() {
let sink = InMemoryLogSink::new();
sink.append(make_chunk("wf1", "j1", 0, "job1"))
.await
.unwrap();
sink.append(make_chunk("wf1", "j2", 0, "job2"))
.await
.unwrap();
let logs1 = sink.get_all("wf1", "j1").await.unwrap();
let logs2 = sink.get_all("wf1", "j2").await.unwrap();
assert_eq!(logs1.len(), 1);
assert_eq!(logs2.len(), 1);
assert_eq!(logs1[0].data, "job1");
assert_eq!(logs2[0].data, "job2");
}
}