use std::sync::Arc;
use chrono::{DateTime, Utc};
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::notify::LogStream;
/// One line of log output, tagged with the run and step it belongs to.
///
/// Values of this type are what travels over the channel created by
/// [`channel`]; [`StepLogSender::emit`] is the constructor used in this file.
#[derive(Debug, Clone)]
pub struct LogLine {
    /// Identifier of the run this line is attributed to.
    pub run_id: Uuid,
    /// Identifier of the step this line is attributed to.
    pub step_id: Uuid,
    /// Name of the step; reference-counted so cloning a line does not copy the text.
    pub step_name: Arc<str>,
    /// Which stream the line is tagged with (see [`LogStream`]).
    pub stream: LogStream,
    /// The text of the log line.
    pub line: String,
    /// UTC timestamp attached to the line; `emit` sets it to the current time.
    pub at: DateTime<Utc>,
}
/// Sending half of the unbounded channel that carries [`LogLine`] values.
pub type LogSender = mpsc::UnboundedSender<LogLine>;
/// Receiving half of the unbounded channel that carries [`LogLine`] values.
pub type LogReceiver = mpsc::UnboundedReceiver<LogLine>;
pub fn channel() -> (LogSender, LogReceiver) {
mpsc::unbounded_channel()
}
/// A [`LogSender`] bundled with the run/step identity to stamp on each line.
///
/// Clones share the same underlying channel, so lines emitted through any
/// clone arrive at the same receiver.
///
/// `Debug` is derived so this public type can be logged and embedded in other
/// `Debug` types, matching [`LogLine`].
#[derive(Debug, Clone)]
pub struct StepLogSender {
    /// Sending half of the log channel that lines are pushed into.
    inner: LogSender,
    /// Run identifier copied onto every emitted line.
    run_id: Uuid,
    /// Step identifier copied onto every emitted line.
    step_id: Uuid,
    /// Step name; kept as `Arc<str>` so each emitted line shares the same text.
    step_name: Arc<str>,
}
impl StepLogSender {
    /// Wraps `inner` so that every line sent through it carries the given
    /// run id, step id and step name.
    ///
    /// The name is converted to an `Arc<str>` once here, so later emits only
    /// bump a reference count instead of copying it.
    pub fn new(inner: LogSender, run_id: Uuid, step_id: Uuid, step_name: String) -> Self {
        let step_name: Arc<str> = step_name.into();
        Self {
            inner,
            run_id,
            step_id,
            step_name,
        }
    }

    /// Sends `line` on the channel, tagged with `stream`, the stored
    /// run/step identity and the current UTC time.
    ///
    /// Delivery is best-effort: if the send fails (for example because the
    /// receiver has been dropped) the error is discarded and the line is lost.
    pub fn emit(&self, stream: LogStream, line: &str) {
        let entry = LogLine {
            run_id: self.run_id,
            step_id: self.step_id,
            step_name: Arc::clone(&self.step_name),
            stream,
            line: line.to_owned(),
            at: Utc::now(),
        };
        // Logging must never take the step down, so a failed send is ignored on purpose.
        let _ = self.inner.send(entry);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh channel and a step sender named `name` bound to it.
    fn step_with_channel(name: &str) -> (StepLogSender, LogReceiver) {
        let (tx, rx) = channel();
        let step = StepLogSender::new(tx, Uuid::now_v7(), Uuid::now_v7(), name.to_string());
        (step, rx)
    }

    /// Lines come out in the order they were emitted, carrying stream and step name.
    #[test]
    fn channel_send_and_receive() {
        let (step, mut rx) = step_with_channel("build");

        step.emit(LogStream::Stdout, "line 1");
        step.emit(LogStream::Stderr, "error!");

        let first = rx.try_recv().unwrap();
        assert_eq!(first.line, "line 1");
        assert_eq!(first.stream, LogStream::Stdout);
        assert_eq!(&*first.step_name, "build");

        let second = rx.try_recv().unwrap();
        assert_eq!(second.line, "error!");
        assert_eq!(second.stream, LogStream::Stderr);
    }

    /// Emitting into a channel whose receiver is gone is silently ignored.
    #[test]
    fn emit_after_receiver_dropped_does_not_panic() {
        let (step, rx) = step_with_channel("test");
        drop(rx);
        step.emit(LogStream::System, "should not panic");
    }

    /// A cloned sender feeds the same receiver as the original.
    #[test]
    fn clone_step_sender_shares_channel() {
        let (original, mut rx) = step_with_channel("test");
        let duplicate = original.clone();

        original.emit(LogStream::Stdout, "from original");
        duplicate.emit(LogStream::Stdout, "from clone");

        assert_eq!(rx.try_recv().unwrap().line, "from original");
        assert_eq!(rx.try_recv().unwrap().line, "from clone");
    }
}