use std::io::{self, Write};
use crate::core::{Error, Event, Result};
use crate::sink::SinkAdapter;
/// Sink that writes each event to standard output as one JSON document
/// followed by a newline.
///
/// The derived `Default` yields an open sink with compact output, no capture
/// buffer, and a zeroed counter.
#[derive(Debug, Default)]
pub struct StdoutSink {
/// Set by `close`; once true, `send` and `flush` return a state error.
closed: bool,
/// Number of events whose bytes were written to stdout without error.
events_sent: u64,
/// When true, events are serialized with `serde_json::to_vec_pretty`
/// instead of `serde_json::to_vec`.
pretty: bool,
/// When `Some`, a clone of every successfully written event is appended
/// here and exposed through `exported_events`; `None` disables capture.
captured: Option<Vec<Event>>,
}
impl StdoutSink {
    /// Creates an open sink with compact JSON output, no capture buffer, and
    /// an event counter of zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a sink that serializes events as pretty-printed JSON.
    ///
    /// Every other field holds the same value the derived `Default` produces.
    pub fn with_pretty() -> Self {
        Self {
            closed: false,
            events_sent: 0,
            pretty: true,
            captured: None,
        }
    }

    /// Creates a sink that additionally keeps a copy of each event it sends.
    ///
    /// The capture buffer starts empty; every other field holds the same
    /// value the derived `Default` produces.
    pub fn with_capture() -> Self {
        Self {
            closed: false,
            events_sent: 0,
            pretty: false,
            captured: Some(Vec::new()),
        }
    }

    /// Returns how many events have been written successfully so far.
    pub fn events_sent(&self) -> u64 {
        self.events_sent
    }
}
impl SinkAdapter for StdoutSink {
async fn send(&mut self, event: &Event) -> Result<()> {
if self.closed {
return Err(Error::StateError("StdoutSink is closed".into()));
}
let mut bytes = if self.pretty {
serde_json::to_vec_pretty(event)?
} else {
serde_json::to_vec(event)?
};
bytes.push(b'\n');
let stdout = io::stdout();
let mut handle = stdout.lock();
handle
.write_all(&bytes)
.map_err(|e| Error::StateError(format!("StdoutSink write: {e}")))?;
self.events_sent += 1;
if let Some(ref mut buf) = self.captured {
buf.push(event.clone());
}
Ok(())
}
async fn flush(&mut self) -> Result<()> {
if self.closed {
return Err(Error::StateError("StdoutSink is closed".into()));
}
let stdout = io::stdout();
let mut handle = stdout.lock();
handle
.flush()
.map_err(|e| Error::StateError(format!("StdoutSink flush: {e}")))?;
Ok(())
}
async fn close(&mut self) -> Result<()> {
if !self.closed {
let stdout = io::stdout();
let mut handle = stdout.lock();
let _ = handle.flush();
}
self.closed = true;
Ok(())
}
fn name(&self) -> &str {
"stdout"
}
fn is_closed(&self) -> bool {
self.closed
}
fn delivery_metrics(&self) -> Option<crate::sink::SinkDeliveryMetrics> {
Some(crate::sink::SinkDeliveryMetrics {
events_sent: self.events_sent,
..Default::default()
})
}
fn exported_events(&self) -> Option<&[Event]> {
self.captured.as_deref()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sink on which `close` has already completed successfully.
    async fn closed_sink() -> StdoutSink {
        let mut sink = StdoutSink::new();
        sink.close().await.unwrap();
        sink
    }

    /// Sending through a closed sink must be rejected.
    #[tokio::test]
    async fn send_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.send(&Event::default()).await;
        assert!(outcome.is_err());
    }

    /// Flushing a closed sink must be rejected.
    #[tokio::test]
    async fn flush_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.flush().await;
        assert!(outcome.is_err());
    }

    /// The adapter reports its fixed name.
    #[test]
    fn name_is_stdout() {
        assert_eq!(StdoutSink::new().name(), "stdout");
    }

    /// A freshly constructed sink is open.
    #[test]
    fn is_closed_starts_false() {
        assert!(!StdoutSink::new().is_closed());
    }

    /// `close` flips the closed flag.
    #[tokio::test]
    async fn is_closed_true_after_close() {
        assert!(closed_sink().await.is_closed());
    }

    /// The counter starts at zero and grows by one per successful send.
    #[tokio::test]
    async fn events_sent_increments() {
        let mut sink = StdoutSink::new();
        assert_eq!(sink.events_sent(), 0);
        for expected in 1..=2u64 {
            sink.send(&Event::default()).await.unwrap();
            assert_eq!(sink.events_sent(), expected);
        }
    }
}