use std::io::{self, Write};
use crate::core::{Error, Event, Result};
use crate::sink::SinkAdapter;
/// Sink that writes each event to the process's standard output as one
/// newline-terminated JSON document.
///
/// The `Default` value is an open sink with a zeroed counter and compact
/// (non-pretty) output.
#[derive(Debug, Default)]
pub struct StdoutSink {
    /// Set by `close`; once `true`, `send` and `flush` return an error.
    closed: bool,
    /// Count of events whose bytes were written to stdout without error.
    events_sent: u64,
    /// When `true`, events are serialized with `serde_json::to_vec_pretty`
    /// instead of `serde_json::to_vec`.
    pretty: bool,
}
impl StdoutSink {
    /// Creates an open sink with a zeroed event counter and pretty-printing
    /// turned off (the `Default` state).
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a sink identical to [`StdoutSink::new`] except that the
    /// `pretty` flag is set, so events are serialized as pretty-printed JSON.
    pub fn with_pretty() -> Self {
        let mut sink = Self::new();
        sink.pretty = true;
        sink
    }

    /// Returns how many events have been written to stdout successfully
    /// since this sink was created.
    pub fn events_sent(&self) -> u64 {
        self.events_sent
    }
}
impl SinkAdapter for StdoutSink {
async fn send(&mut self, event: &Event) -> Result<()> {
if self.closed {
return Err(Error::StateError("StdoutSink is closed".into()));
}
let mut bytes = if self.pretty {
serde_json::to_vec_pretty(event)?
} else {
serde_json::to_vec(event)?
};
bytes.push(b'\n');
let stdout = io::stdout();
let mut handle = stdout.lock();
handle
.write_all(&bytes)
.map_err(|e| Error::StateError(format!("StdoutSink write: {e}")))?;
self.events_sent += 1;
Ok(())
}
async fn flush(&mut self) -> Result<()> {
if self.closed {
return Err(Error::StateError("StdoutSink is closed".into()));
}
let stdout = io::stdout();
let mut handle = stdout.lock();
handle
.flush()
.map_err(|e| Error::StateError(format!("StdoutSink flush: {e}")))?;
Ok(())
}
async fn close(&mut self) -> Result<()> {
if !self.closed {
let stdout = io::stdout();
let mut handle = stdout.lock();
let _ = handle.flush();
}
self.closed = true;
Ok(())
}
fn name(&self) -> &str {
"stdout"
}
fn is_closed(&self) -> Option<bool> {
Some(self.closed)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sink and closes it, panicking if `close` reports an error.
    async fn closed_sink() -> StdoutSink {
        let mut sink = StdoutSink::new();
        sink.close().await.unwrap();
        sink
    }

    /// Sending on a closed sink must be rejected.
    #[tokio::test]
    async fn send_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.send(&Event::default()).await;
        assert!(outcome.is_err());
    }

    /// Flushing a closed sink must be rejected.
    #[tokio::test]
    async fn flush_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.flush().await;
        assert!(outcome.is_err());
    }

    /// The sink identifies itself as "stdout".
    #[test]
    fn name_is_stdout() {
        assert_eq!(StdoutSink::new().name(), "stdout");
    }

    /// A freshly created sink reports itself as open.
    #[test]
    fn is_closed_starts_false() {
        assert_eq!(StdoutSink::new().is_closed(), Some(false));
    }

    /// After `close`, the sink reports itself as closed.
    #[tokio::test]
    async fn is_closed_true_after_close() {
        let sink = closed_sink().await;
        assert_eq!(sink.is_closed(), Some(true));
    }

    /// Each successful `send` bumps the counter by exactly one.
    #[tokio::test]
    async fn events_sent_increments() {
        let mut sink = StdoutSink::new();
        assert_eq!(sink.events_sent(), 0);
        for expected in 1..=2u64 {
            sink.send(&Event::default()).await.unwrap();
            assert_eq!(sink.events_sent(), expected);
        }
    }
}