use flume;
use std::any::type_name;
use std::fs::File;
use std::io::{self, Result as IoResult, Stdout, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use super::event::Event;
use crate::telemetry::{PlainFormatter, TelemetryFormatter};
pub trait EventSink: Sync + Send {
fn handle(&mut self, event: &Event) -> IoResult<()>;
fn name(&self) -> String {
type_name::<Self>().to_string()
}
}
pub struct StdOutSink<F: TelemetryFormatter = PlainFormatter> {
handle: Stdout,
formatter: F,
}
impl Default for StdOutSink {
fn default() -> Self {
Self {
handle: io::stdout(),
formatter: PlainFormatter::new(),
}
}
}
impl<F: TelemetryFormatter> StdOutSink<F> {
pub fn with_formatter(formatter: F) -> Self {
Self {
handle: io::stdout(),
formatter,
}
}
}
impl<F: TelemetryFormatter> EventSink for StdOutSink<F> {
fn handle(&mut self, event: &Event) -> IoResult<()> {
let text = self.formatter.render_event(event).join_lines();
self.handle.write_all(text.as_bytes())?;
self.handle.flush()
}
}
#[derive(Clone, Default)]
pub struct MemorySink {
entries: Arc<Mutex<Vec<Event>>>,
}
impl MemorySink {
pub fn new() -> Self {
Self::default()
}
pub fn snapshot(&self) -> Vec<Event> {
self.entries
.lock()
.expect("MemorySink mutex poisoned")
.clone()
}
pub fn clear(&self) {
self.entries
.lock()
.expect("MemorySink mutex poisoned")
.clear();
}
}
impl EventSink for MemorySink {
fn handle(&mut self, event: &Event) -> IoResult<()> {
self.entries
.lock()
.expect("MemorySink mutex poisoned")
.push(event.clone());
Ok(())
}
}
pub struct JsonLinesSink {
handle: Box<dyn Write + Send + Sync>,
pretty: bool,
}
impl JsonLinesSink {
pub fn new(handle: Box<dyn Write + Send + Sync>) -> Self {
Self {
handle,
pretty: false,
}
}
pub fn with_pretty_print(handle: Box<dyn Write + Send + Sync>) -> Self {
Self {
handle,
pretty: true,
}
}
pub fn to_stdout() -> Self {
Self::new(Box::new(io::stdout()))
}
pub fn to_file(path: impl AsRef<Path>) -> IoResult<Self> {
Ok(Self::new(Box::new(File::create(path)?)))
}
}
impl EventSink for JsonLinesSink {
fn handle(&mut self, event: &Event) -> IoResult<()> {
let json = if self.pretty {
event.to_json_pretty()
} else {
event.to_json_string()
}
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
writeln!(self.handle, "{json}")?;
self.handle.flush()
}
fn name(&self) -> String {
if self.pretty {
"JsonLinesSink(pretty)".to_string()
} else {
"JsonLinesSink".to_string()
}
}
}
pub struct ChannelSink {
tx: flume::Sender<Event>,
}
impl ChannelSink {
pub fn new(tx: flume::Sender<Event>) -> Self {
Self { tx }
}
}
impl EventSink for ChannelSink {
fn handle(&mut self, event: &Event) -> IoResult<()> {
self.tx
.send(event.clone())
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "channel receiver dropped"))
}
}