1use std::io::{self, Write};
4
5use crate::core::{Error, Event, Result};
6use crate::sink::SinkAdapter;
7
8#[derive(Debug, Default)]
43pub struct StdoutSink {
44 closed: bool,
45 events_sent: u64,
46 pretty: bool,
47 captured: Option<Vec<Event>>,
49}
50
51impl StdoutSink {
52 pub fn new() -> Self {
54 Self::default()
55 }
56
57 pub fn with_pretty() -> Self {
62 Self {
63 pretty: true,
64 ..Self::default()
65 }
66 }
67
68 pub fn with_capture() -> Self {
74 Self {
75 captured: Some(Vec::new()),
76 ..Self::default()
77 }
78 }
79
80 pub fn events_sent(&self) -> u64 {
82 self.events_sent
83 }
84}
85
86impl SinkAdapter for StdoutSink {
87 async fn send(&mut self, event: &Event) -> Result<()> {
88 if self.closed {
89 return Err(Error::StateError("StdoutSink is closed".into()));
90 }
91 let mut bytes = if self.pretty {
92 serde_json::to_vec_pretty(event)?
93 } else {
94 serde_json::to_vec(event)?
95 };
96 bytes.push(b'\n');
97 let stdout = io::stdout();
98 let mut handle = stdout.lock();
99 handle
100 .write_all(&bytes)
101 .map_err(|e| Error::StateError(format!("StdoutSink write: {e}")))?;
102 self.events_sent += 1;
103 if let Some(ref mut buf) = self.captured {
104 buf.push(event.clone());
105 }
106 Ok(())
107 }
108
109 async fn flush(&mut self) -> Result<()> {
110 if self.closed {
111 return Err(Error::StateError("StdoutSink is closed".into()));
112 }
113 let stdout = io::stdout();
114 let mut handle = stdout.lock();
115 handle
116 .flush()
117 .map_err(|e| Error::StateError(format!("StdoutSink flush: {e}")))?;
118 Ok(())
119 }
120
121 async fn close(&mut self) -> Result<()> {
122 if !self.closed {
123 let stdout = io::stdout();
125 let mut handle = stdout.lock();
126 let _ = handle.flush();
127 }
128 self.closed = true;
129 Ok(())
130 }
131
132 fn name(&self) -> &str {
133 "stdout"
134 }
135
136 fn is_closed(&self) -> bool {
137 self.closed
138 }
139
140 fn delivery_metrics(&self) -> Option<crate::sink::SinkDeliveryMetrics> {
141 Some(crate::sink::SinkDeliveryMetrics {
142 events_sent: self.events_sent,
143 ..Default::default()
144 })
145 }
146
147 fn exported_events(&self) -> Option<&[Event]> {
148 self.captured.as_deref()
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh sink and closes it, panicking if `close` fails.
    async fn closed_sink() -> StdoutSink {
        let mut sink = StdoutSink::new();
        sink.close().await.unwrap();
        sink
    }

    /// Sending on a closed sink must be rejected.
    #[tokio::test]
    async fn send_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.send(&Event::default()).await;
        assert!(outcome.is_err());
    }

    /// Flushing a closed sink must be rejected.
    #[tokio::test]
    async fn flush_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.flush().await;
        assert!(outcome.is_err());
    }

    /// The adapter reports the fixed name "stdout".
    #[test]
    fn name_is_stdout() {
        assert_eq!(StdoutSink::new().name(), "stdout");
    }

    /// A newly constructed sink is open.
    #[test]
    fn is_closed_starts_false() {
        assert!(!StdoutSink::new().is_closed());
    }

    /// `close` flips the closed flag.
    #[tokio::test]
    async fn is_closed_true_after_close() {
        let sink = closed_sink().await;
        assert!(sink.is_closed());
    }

    /// Each successful send bumps the counter by exactly one.
    #[tokio::test]
    async fn events_sent_increments() {
        let mut sink = StdoutSink::new();
        assert_eq!(sink.events_sent(), 0);
        for expected in 1..=2u64 {
            sink.send(&Event::default()).await.unwrap();
            assert_eq!(sink.events_sent(), expected);
        }
    }
}