qoollo_logstash_rs/
buffer.rs1use log::Level;
2
3use crate::prelude::*;
4use std::{
5 sync::mpsc::{self, TrySendError},
6 time::{Duration, Instant},
7};
8
9#[derive(Debug, Clone)]
10pub(crate) enum Command {
11 Send(LogStashRecord),
12 SendBatch(Vec<LogStashRecord>),
13 Flush,
14}
15
16pub struct BufferedSender {
17 sender: mpsc::SyncSender<Command>,
18}
19
20impl BufferedSender {
21 pub fn new<S: Sender>(
22 sender: S,
23 buffer_size: Option<usize>,
24 buffer_lifetime: Option<Duration>,
25 ignore_buffer: Level,
26 error_period: Duration,
27 log_queue_len: usize,
28 ) -> Self {
29 let sender = BufferedSenderThread::new(
30 sender,
31 buffer_size,
32 buffer_lifetime,
33 ignore_buffer,
34 error_period,
35 log_queue_len,
36 )
37 .run();
38 Self { sender }
39 }
40}
41
42impl Sender for BufferedSender {
43 fn send(&self, event: LogStashRecord) -> Result<()> {
44 let important = event.level <= Level::Warn;
45 let result = self.sender.try_send(Command::Send(event));
46 process_result(result, important)
47 }
48
49 fn send_batch(&self, events: Vec<LogStashRecord>) -> Result<()> {
50 let important = events.iter().any(|e| e.level <= Level::Warn);
51 let result = self.sender.try_send(Command::SendBatch(events));
52 process_result(result, important)
53 }
54
55 fn flush(&self) -> Result<()> {
56 let result = self.sender.try_send(Command::Flush);
57 process_result(result, false)
58 }
59}
60
61fn process_result<T>(r: std::result::Result<(), TrySendError<T>>, log_full: bool) -> Result<()> {
62 match r {
63 Err(TrySendError::Disconnected(..)) => {
64 Err(Error::SenderThreadStopped(r.unwrap_err().to_string()))
65 }
66 Err(TrySendError::Full(..)) if log_full => Err(Error::BufferFull()),
67 _ => Ok(()),
68 }
69}
70
71#[derive(Debug)]
72struct BufferedSenderThread<S: Sender> {
73 sender: S,
74 buffer: Vec<LogStashRecord>,
75 buffer_size: Option<usize>,
76 buffer_lifetime: Option<Duration>,
77 deadline: Option<Instant>,
78 ignore_buffer: Level,
79 error_period: Duration,
80 log_queue_len: usize,
81}
82
83impl<S: Sender> BufferedSenderThread<S> {
84 fn new(
85 sender: S,
86 buffer_size: Option<usize>,
87 buffer_lifetime: Option<Duration>,
88 ignore_buffer: Level,
89 error_period: Duration,
90 log_queue_len: usize,
91 ) -> Self {
92 Self {
93 sender,
94 buffer: Vec::with_capacity(buffer_size.unwrap_or(0)),
95 buffer_size,
96 buffer_lifetime,
97 deadline: None,
98 ignore_buffer,
99 error_period,
100 log_queue_len,
101 }
102 }
103
104 fn run(self) -> mpsc::SyncSender<Command> {
105 let (sender, receiver) = mpsc::sync_channel(self.log_queue_len);
106 self.run_thread(receiver);
107 sender
108 }
109
110 fn next_deadline(&self) -> Option<Instant> {
111 if self.buffer.is_empty() && self.buffer_size.is_some() {
112 return self.buffer_lifetime.map(|lt| Instant::now() + lt);
113 }
114 None
115 }
116
117 fn run_thread(mut self, receiver: mpsc::Receiver<Command>) {
118 std::thread::spawn::<_, Result<()>>(move || {
119 {
120 let mut last_error: Option<Instant> = None;
121 loop {
122 let cmd = match self.deadline {
123 Some(deadline) => receiver
124 .recv_timeout(deadline.saturating_duration_since(Instant::now())),
125 None => receiver
126 .recv()
127 .map_err(|_| mpsc::RecvTimeoutError::Disconnected),
128 };
129
130 if let Ok(Command::SendBatch(_) | Command::Send(_)) = &cmd {
131 self.deadline = self.next_deadline();
132 }
133 let _ = match cmd {
134 Ok(Command::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => self.flush(),
135 Ok(Command::Send(event)) => self.send(event),
136 Ok(Command::SendBatch(events)) => self.send_batch(events),
137 Err(mpsc::RecvTimeoutError::Disconnected) => break,
138 }
139 .or_else(|err| {
140 if last_error
141 .as_ref()
142 .map(|x| x.elapsed() > self.error_period)
143 .unwrap_or(true)
144 {
145 println!("logstash logger error: {}", err);
146 last_error = Some(Instant::now());
147 }
148 if matches!(
149 err,
150 Error::FatalInternal(..) | Error::SenderThreadStopped(..)
151 ) {
152 Result::Err(err)
153 } else {
154 Result::Ok(())
155 }
156 })?;
157 }
158 Ok(())
159 }
160 .map_err(|err| {
161 println!("fatal logger error: {}", err);
162 err
163 })
164 });
165 }
166
167 fn send(&mut self, event: LogStashRecord) -> Result<()> {
168 if event.level >= self.ignore_buffer {
169 self.sender.send(event)?;
170 } else if let Some(max_size) = self.buffer_size {
171 self.buffer.push(event);
172 if self.buffer.len() >= max_size {
173 self.flush()?;
174 }
175 } else {
176 self.sender.send(event)?;
177 }
178 Ok(())
179 }
180
181 fn send_batch(&mut self, events: Vec<LogStashRecord>) -> Result<()> {
182 for event in events {
183 self.send(event)?;
184 }
185 Ok(())
186 }
187
188 fn flush(&mut self) -> Result<()> {
189 if !self.buffer.is_empty() {
190 let buffer = std::mem::replace(
191 &mut self.buffer,
192 Vec::with_capacity(self.buffer_size.unwrap_or_default()),
193 );
194 self.sender.send_batch(buffer)?;
195 }
196 self.sender.flush()?;
197 self.deadline = None;
198 Ok(())
199 }
200}
201
202impl log::Log for BufferedSender {
203 fn enabled(&self, _metadata: &log::Metadata) -> bool {
204 true
205 }
206
207 fn log(&self, record: &log::Record) {
208 let record = LogStashRecord::from_record(record);
209 let _ = self.send(record);
210 }
211
212 fn flush(&self) {
213 let _ = Sender::flush(self);
214 }
215}