qoollo_logstash_rs/
buffer.rs

1use log::Level;
2
3use crate::prelude::*;
4use std::{
5    sync::mpsc::{self, TrySendError},
6    time::{Duration, Instant},
7};
8
9#[derive(Debug, Clone)]
10pub(crate) enum Command {
11    Send(LogStashRecord),
12    SendBatch(Vec<LogStashRecord>),
13    Flush,
14}
15
16pub struct BufferedSender {
17    sender: mpsc::SyncSender<Command>,
18}
19
20impl BufferedSender {
21    pub fn new<S: Sender>(
22        sender: S,
23        buffer_size: Option<usize>,
24        buffer_lifetime: Option<Duration>,
25        ignore_buffer: Level,
26        error_period: Duration,
27        log_queue_len: usize,
28    ) -> Self {
29        let sender = BufferedSenderThread::new(
30            sender,
31            buffer_size,
32            buffer_lifetime,
33            ignore_buffer,
34            error_period,
35            log_queue_len,
36        )
37        .run();
38        Self { sender }
39    }
40}
41
42impl Sender for BufferedSender {
43    fn send(&self, event: LogStashRecord) -> Result<()> {
44        let important = event.level <= Level::Warn;
45        let result = self.sender.try_send(Command::Send(event));
46        process_result(result, important)
47    }
48
49    fn send_batch(&self, events: Vec<LogStashRecord>) -> Result<()> {
50        let important = events.iter().any(|e| e.level <= Level::Warn);
51        let result = self.sender.try_send(Command::SendBatch(events));
52        process_result(result, important)
53    }
54
55    fn flush(&self) -> Result<()> {
56        let result = self.sender.try_send(Command::Flush);
57        process_result(result, false)
58    }
59}
60
61fn process_result<T>(r: std::result::Result<(), TrySendError<T>>, log_full: bool) -> Result<()> {
62    match r {
63        Err(TrySendError::Disconnected(..)) => {
64            Err(Error::SenderThreadStopped(r.unwrap_err().to_string()))
65        }
66        Err(TrySendError::Full(..)) if log_full => Err(Error::BufferFull()),
67        _ => Ok(()),
68    }
69}
70
71#[derive(Debug)]
72struct BufferedSenderThread<S: Sender> {
73    sender: S,
74    buffer: Vec<LogStashRecord>,
75    buffer_size: Option<usize>,
76    buffer_lifetime: Option<Duration>,
77    deadline: Option<Instant>,
78    ignore_buffer: Level,
79    error_period: Duration,
80    log_queue_len: usize,
81}
82
83impl<S: Sender> BufferedSenderThread<S> {
84    fn new(
85        sender: S,
86        buffer_size: Option<usize>,
87        buffer_lifetime: Option<Duration>,
88        ignore_buffer: Level,
89        error_period: Duration,
90        log_queue_len: usize,
91    ) -> Self {
92        Self {
93            sender,
94            buffer: Vec::with_capacity(buffer_size.unwrap_or(0)),
95            buffer_size,
96            buffer_lifetime,
97            deadline: None,
98            ignore_buffer,
99            error_period,
100            log_queue_len,
101        }
102    }
103
104    fn run(self) -> mpsc::SyncSender<Command> {
105        let (sender, receiver) = mpsc::sync_channel(self.log_queue_len);
106        self.run_thread(receiver);
107        sender
108    }
109
110    fn next_deadline(&self) -> Option<Instant> {
111        if self.buffer.is_empty() && self.buffer_size.is_some() {
112            return self.buffer_lifetime.map(|lt| Instant::now() + lt);
113        }
114        None
115    }
116
117    fn run_thread(mut self, receiver: mpsc::Receiver<Command>) {
118        std::thread::spawn::<_, Result<()>>(move || {
119            {
120                let mut last_error: Option<Instant> = None;
121                loop {
122                    let cmd = match self.deadline {
123                        Some(deadline) => receiver
124                            .recv_timeout(deadline.saturating_duration_since(Instant::now())),
125                        None => receiver
126                            .recv()
127                            .map_err(|_| mpsc::RecvTimeoutError::Disconnected),
128                    };
129
130                    if let Ok(Command::SendBatch(_) | Command::Send(_)) = &cmd {
131                        self.deadline = self.next_deadline();
132                    }
133                    let _ = match cmd {
134                        Ok(Command::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => self.flush(),
135                        Ok(Command::Send(event)) => self.send(event),
136                        Ok(Command::SendBatch(events)) => self.send_batch(events),
137                        Err(mpsc::RecvTimeoutError::Disconnected) => break,
138                    }
139                    .or_else(|err| {
140                        if last_error
141                            .as_ref()
142                            .map(|x| x.elapsed() > self.error_period)
143                            .unwrap_or(true)
144                        {
145                            println!("logstash logger error: {}", err);
146                            last_error = Some(Instant::now());
147                        }
148                        if matches!(
149                            err,
150                            Error::FatalInternal(..) | Error::SenderThreadStopped(..)
151                        ) {
152                            Result::Err(err)
153                        } else {
154                            Result::Ok(())
155                        }
156                    })?;
157                }
158                Ok(())
159            }
160            .map_err(|err| {
161                println!("fatal logger error: {}", err);
162                err
163            })
164        });
165    }
166
167    fn send(&mut self, event: LogStashRecord) -> Result<()> {
168        if event.level >= self.ignore_buffer {
169            self.sender.send(event)?;
170        } else if let Some(max_size) = self.buffer_size {
171            self.buffer.push(event);
172            if self.buffer.len() >= max_size {
173                self.flush()?;
174            }
175        } else {
176            self.sender.send(event)?;
177        }
178        Ok(())
179    }
180
181    fn send_batch(&mut self, events: Vec<LogStashRecord>) -> Result<()> {
182        for event in events {
183            self.send(event)?;
184        }
185        Ok(())
186    }
187
188    fn flush(&mut self) -> Result<()> {
189        if !self.buffer.is_empty() {
190            let buffer = std::mem::replace(
191                &mut self.buffer,
192                Vec::with_capacity(self.buffer_size.unwrap_or_default()),
193            );
194            self.sender.send_batch(buffer)?;
195        }
196        self.sender.flush()?;
197        self.deadline = None;
198        Ok(())
199    }
200}
201
202impl log::Log for BufferedSender {
203    fn enabled(&self, _metadata: &log::Metadata) -> bool {
204        true
205    }
206
207    fn log(&self, record: &log::Record) {
208        let record = LogStashRecord::from_record(record);
209        let _ = self.send(record);
210    }
211
212    fn flush(&self) {
213        let _ = Sender::flush(self);
214    }
215}