logstash_rs/
buffer.rs

1use log::Level;
2
3use crate::prelude::*;
4use std::{
5    sync::mpsc,
6    time::{Duration, Instant},
7};
8
9#[derive(Debug, Clone)]
10pub(crate) enum Command {
11    Send(LogStashRecord),
12    SendBatch(Vec<LogStashRecord>),
13    Flush,
14}
15
16pub struct BufferedSender {
17    sender: mpsc::SyncSender<Command>,
18}
19
20impl BufferedSender {
21    pub fn new<S: Sender>(
22        sender: S,
23        buffer_size: Option<usize>,
24        buffer_lifetime: Option<Duration>,
25        ignore_buffer: Level,
26        error_period: Duration,
27    ) -> Self {
28        let sender = BufferedSenderThread::new(
29            sender,
30            buffer_size,
31            buffer_lifetime,
32            ignore_buffer,
33            error_period,
34        )
35        .run();
36        Self { sender }
37    }
38}
39
40impl Sender for BufferedSender {
41    fn send(&self, event: LogStashRecord) -> Result<()> {
42        self.sender.send(Command::Send(event)).map_err(From::from)
43    }
44
45    fn send_batch(&self, events: Vec<LogStashRecord>) -> Result<()> {
46        self.sender.send(Command::SendBatch(events))?;
47        Ok(())
48    }
49
50    fn flush(&self) -> Result<()> {
51        self.sender.send(Command::Flush)?;
52        Ok(())
53    }
54}
55
56#[derive(Debug)]
57struct BufferedSenderThread<S: Sender> {
58    sender: S,
59    buffer: Vec<LogStashRecord>,
60    buffer_size: Option<usize>,
61    buffer_lifetime: Option<Duration>,
62    deadline: Option<Instant>,
63    ignore_buffer: Level,
64    error_period: Duration,
65}
66
67impl<S: Sender> BufferedSenderThread<S> {
68    fn new(
69        sender: S,
70        buffer_size: Option<usize>,
71        buffer_lifetime: Option<Duration>,
72        ignore_buffer: Level,
73        error_period: Duration,
74    ) -> Self {
75        Self {
76            sender,
77            buffer: Vec::with_capacity(buffer_size.unwrap_or(0)),
78            buffer_size,
79            buffer_lifetime,
80            deadline: None,
81            ignore_buffer,
82            error_period,
83        }
84    }
85
86    fn run(self) -> mpsc::SyncSender<Command> {
87        let (sender, receiver) = mpsc::sync_channel(1);
88        self.run_thread(receiver);
89        sender
90    }
91
92    fn next_deadline(&self) -> Option<Instant> {
93        if self.buffer.is_empty() && self.buffer_size.is_some() {
94            return self.buffer_lifetime.map(|lt| Instant::now() + lt);
95        }
96        None
97    }
98
99    fn run_thread(mut self, receiver: mpsc::Receiver<Command>) {
100        std::thread::spawn::<_, Result<()>>(move || {
101            {
102                let mut last_error: Option<Instant> = None;
103                loop {
104                    let cmd = match self.deadline {
105                        Some(deadline) => receiver
106                            .recv_timeout(deadline.saturating_duration_since(Instant::now())),
107                        None => receiver
108                            .recv()
109                            .map_err(|_| mpsc::RecvTimeoutError::Disconnected),
110                    };
111
112                    if let Ok(Command::SendBatch(_) | Command::Send(_)) = &cmd {
113                        self.deadline = self.next_deadline();
114                    }
115                    let _ = match cmd {
116                        Ok(Command::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => self.flush(),
117                        Ok(Command::Send(event)) => self.send(event),
118                        Ok(Command::SendBatch(events)) => self.send_batch(events),
119                        Err(mpsc::RecvTimeoutError::Disconnected) => break,
120                    }
121                    .or_else(|err| {
122                        if last_error
123                            .as_ref()
124                            .map(|x| x.elapsed() > self.error_period)
125                            .unwrap_or(true)
126                        {
127                            println!("logstash logger error: {}", err);
128                            last_error = Some(Instant::now());
129                        }
130                        if matches!(
131                            err,
132                            Error::FatalInternal(..) | Error::SenderThreadStopped(..)
133                        ) {
134                            Result::Err(err)
135                        } else {
136                            Result::Ok(())
137                        }
138                    })?;
139                }
140                Ok(())
141            }
142            .map_err(|err| {
143                println!("fatal logger error: {}", err);
144                err
145            })
146        });
147    }
148
149    fn send(&mut self, event: LogStashRecord) -> Result<()> {
150        if event.level >= self.ignore_buffer {
151            self.sender.send(event)?;
152        } else if let Some(max_size) = self.buffer_size {
153            self.buffer.push(event);
154            if self.buffer.len() >= max_size {
155                self.flush()?;
156            }
157        } else {
158            self.sender.send(event)?;
159        }
160        Ok(())
161    }
162
163    fn send_batch(&mut self, events: Vec<LogStashRecord>) -> Result<()> {
164        for event in events {
165            self.send(event)?;
166        }
167        Ok(())
168    }
169
170    fn flush(&mut self) -> Result<()> {
171        if !self.buffer.is_empty() {
172            let buffer = std::mem::replace(
173                &mut self.buffer,
174                Vec::with_capacity(self.buffer_size.unwrap_or_default()),
175            );
176            self.sender.send_batch(buffer)?;
177        }
178        self.sender.flush()?;
179        self.deadline = None;
180        Ok(())
181    }
182}
183
184impl log::Log for BufferedSender {
185    fn enabled(&self, _metadata: &log::Metadata) -> bool {
186        true
187    }
188
189    fn log(&self, record: &log::Record) {
190        let record = LogStashRecord::from_record(record);
191        let _ = self.send(record);
192    }
193
194    fn flush(&self) {
195        let _ = Sender::flush(self);
196    }
197}