1use log::Level;
2
3use crate::prelude::*;
4use std::{
5 sync::mpsc,
6 time::{Duration, Instant},
7};
8
9#[derive(Debug, Clone)]
10pub(crate) enum Command {
11 Send(LogStashRecord),
12 SendBatch(Vec<LogStashRecord>),
13 Flush,
14}
15
16pub struct BufferedSender {
17 sender: mpsc::SyncSender<Command>,
18}
19
20impl BufferedSender {
21 pub fn new<S: Sender>(
22 sender: S,
23 buffer_size: Option<usize>,
24 buffer_lifetime: Option<Duration>,
25 ignore_buffer: Level,
26 error_period: Duration,
27 ) -> Self {
28 let sender = BufferedSenderThread::new(
29 sender,
30 buffer_size,
31 buffer_lifetime,
32 ignore_buffer,
33 error_period,
34 )
35 .run();
36 Self { sender }
37 }
38}
39
40impl Sender for BufferedSender {
41 fn send(&self, event: LogStashRecord) -> Result<()> {
42 self.sender.send(Command::Send(event)).map_err(From::from)
43 }
44
45 fn send_batch(&self, events: Vec<LogStashRecord>) -> Result<()> {
46 self.sender.send(Command::SendBatch(events))?;
47 Ok(())
48 }
49
50 fn flush(&self) -> Result<()> {
51 self.sender.send(Command::Flush)?;
52 Ok(())
53 }
54}
55
56#[derive(Debug)]
57struct BufferedSenderThread<S: Sender> {
58 sender: S,
59 buffer: Vec<LogStashRecord>,
60 buffer_size: Option<usize>,
61 buffer_lifetime: Option<Duration>,
62 deadline: Option<Instant>,
63 ignore_buffer: Level,
64 error_period: Duration,
65}
66
67impl<S: Sender> BufferedSenderThread<S> {
68 fn new(
69 sender: S,
70 buffer_size: Option<usize>,
71 buffer_lifetime: Option<Duration>,
72 ignore_buffer: Level,
73 error_period: Duration,
74 ) -> Self {
75 Self {
76 sender,
77 buffer: Vec::with_capacity(buffer_size.unwrap_or(0)),
78 buffer_size,
79 buffer_lifetime,
80 deadline: None,
81 ignore_buffer,
82 error_period,
83 }
84 }
85
86 fn run(self) -> mpsc::SyncSender<Command> {
87 let (sender, receiver) = mpsc::sync_channel(1);
88 self.run_thread(receiver);
89 sender
90 }
91
92 fn next_deadline(&self) -> Option<Instant> {
93 if self.buffer.is_empty() && self.buffer_size.is_some() {
94 return self.buffer_lifetime.map(|lt| Instant::now() + lt);
95 }
96 None
97 }
98
99 fn run_thread(mut self, receiver: mpsc::Receiver<Command>) {
100 std::thread::spawn::<_, Result<()>>(move || {
101 {
102 let mut last_error: Option<Instant> = None;
103 loop {
104 let cmd = match self.deadline {
105 Some(deadline) => receiver
106 .recv_timeout(deadline.saturating_duration_since(Instant::now())),
107 None => receiver
108 .recv()
109 .map_err(|_| mpsc::RecvTimeoutError::Disconnected),
110 };
111
112 if let Ok(Command::SendBatch(_) | Command::Send(_)) = &cmd {
113 self.deadline = self.next_deadline();
114 }
115 let _ = match cmd {
116 Ok(Command::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => self.flush(),
117 Ok(Command::Send(event)) => self.send(event),
118 Ok(Command::SendBatch(events)) => self.send_batch(events),
119 Err(mpsc::RecvTimeoutError::Disconnected) => break,
120 }
121 .or_else(|err| {
122 if last_error
123 .as_ref()
124 .map(|x| x.elapsed() > self.error_period)
125 .unwrap_or(true)
126 {
127 println!("logstash logger error: {}", err);
128 last_error = Some(Instant::now());
129 }
130 if matches!(
131 err,
132 Error::FatalInternal(..) | Error::SenderThreadStopped(..)
133 ) {
134 Result::Err(err)
135 } else {
136 Result::Ok(())
137 }
138 })?;
139 }
140 Ok(())
141 }
142 .map_err(|err| {
143 println!("fatal logger error: {}", err);
144 err
145 })
146 });
147 }
148
149 fn send(&mut self, event: LogStashRecord) -> Result<()> {
150 if event.level >= self.ignore_buffer {
151 self.sender.send(event)?;
152 } else if let Some(max_size) = self.buffer_size {
153 self.buffer.push(event);
154 if self.buffer.len() >= max_size {
155 self.flush()?;
156 }
157 } else {
158 self.sender.send(event)?;
159 }
160 Ok(())
161 }
162
163 fn send_batch(&mut self, events: Vec<LogStashRecord>) -> Result<()> {
164 for event in events {
165 self.send(event)?;
166 }
167 Ok(())
168 }
169
170 fn flush(&mut self) -> Result<()> {
171 if !self.buffer.is_empty() {
172 let buffer = std::mem::replace(
173 &mut self.buffer,
174 Vec::with_capacity(self.buffer_size.unwrap_or_default()),
175 );
176 self.sender.send_batch(buffer)?;
177 }
178 self.sender.flush()?;
179 self.deadline = None;
180 Ok(())
181 }
182}
183
184impl log::Log for BufferedSender {
185 fn enabled(&self, _metadata: &log::Metadata) -> bool {
186 true
187 }
188
189 fn log(&self, record: &log::Record) {
190 let record = LogStashRecord::from_record(record);
191 let _ = self.send(record);
192 }
193
194 fn flush(&self) {
195 let _ = Sender::flush(self);
196 }
197}