log4rs_logstash/
appender.rs

1use anyhow::Result as AnyResult;
2use log::Level as LogLevel;
3use log::Record;
4use log4rs::append::Append;
5use logstash_rs::LogStashRecord;
6use logstash_rs::Sender;
7use logstash_rs::{BufferedSender, TcpSender};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::time::Duration;
11
12pub struct Appender<S> {
13    sender: S,
14    extra_fields: HashMap<String, Value>,
15}
16
17impl<S> std::fmt::Debug for Appender<S> {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        write!(f, "{}::Appender", module_path!())
20    }
21}
22
23#[derive(Debug)]
24pub struct AppenderBuilder {
25    hostname: String,
26    port: u16,
27    buffer_size: Option<usize>,
28    buffer_lifetime: Option<Duration>,
29    connection_timeout: Option<Duration>,
30    ignore_buffer: LogLevel,
31    use_tls: bool,
32    error_period: Duration,
33    extra_fields: HashMap<String, Value>,
34}
35
36impl Default for AppenderBuilder {
37    fn default() -> AppenderBuilder {
38        AppenderBuilder {
39            hostname: "127.0.0.1".to_string(),
40            port: 5044,
41            buffer_size: Some(1024),
42            buffer_lifetime: Some(Duration::from_secs(1)),
43            connection_timeout: Some(Duration::from_secs(10)),
44            use_tls: false,
45            ignore_buffer: LogLevel::Error,
46            error_period: Duration::from_secs(10),
47            extra_fields: Default::default(),
48        }
49    }
50}
51
52impl AppenderBuilder {
53    /// Sets threshold for this logger to level.
54    pub fn with_ignore_buffer_level(&mut self, level: LogLevel) -> &mut AppenderBuilder {
55        self.ignore_buffer = level;
56        self
57    }
58
59    /// Sets the hostname of the remote server.
60    pub fn with_hostname(&mut self, hostname: impl Into<String>) -> &mut AppenderBuilder {
61        self.hostname = hostname.into();
62        self
63    }
64
65    /// Sets the port of the remote server.
66    pub fn with_port(&mut self, port: u16) -> &mut AppenderBuilder {
67        self.port = port;
68        self
69    }
70
71    /// Sets the upperbound limit on the number of records that can be placed in the buffer, once
72    /// this size has been reached, the buffer will be sent to the remote server.
73    /// If buffer size is 0 or 1 then buffer is not used
74    pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut AppenderBuilder {
75        if buffer_size < 2 {
76            self.buffer_size = None;
77        } else {
78            self.buffer_size = Some(buffer_size);
79        }
80        self
81    }
82
83    /// Sets the maximum lifetime of the buffer before send it to the remote server.
84    pub fn with_buffer_lifetime(&mut self, buffer_duration: Duration) -> &mut AppenderBuilder {
85        self.buffer_lifetime = Some(buffer_duration);
86        self
87    }
88
89    /// Sets the timeout for network connections.
90    pub fn with_connection_timeout(&mut self, timeout: Duration) -> &mut AppenderBuilder {
91        self.connection_timeout = Some(timeout);
92        self
93    }
94
95    /// Use tls connection.
96    pub fn with_use_tls(&mut self, use_tls: bool) -> &mut AppenderBuilder {
97        self.use_tls = use_tls;
98        self
99    }
100
101    /// Print period for internal logstash errors.
102    pub fn with_error_period(&mut self, error_period: Duration) -> &mut AppenderBuilder {
103        self.error_period = error_period;
104        self
105    }
106
107    /// Additional fields to send to logstash
108    pub fn with_extra_fields(
109        &mut self,
110        extra_fields: HashMap<String, Value>,
111    ) -> &mut AppenderBuilder {
112        self.extra_fields = extra_fields;
113        self
114    }
115
116    /// Invoke the builder and return a [`Appender`](struct.Appender.html).
117    pub fn build(self) -> AnyResult<Appender<BufferedSender>> {
118        Ok(Appender {
119            sender: BufferedSender::new(
120                TcpSender::new(
121                    self.hostname,
122                    self.port,
123                    self.use_tls,
124                    self.connection_timeout,
125                ),
126                self.buffer_size,
127                self.buffer_lifetime,
128                self.ignore_buffer,
129                self.error_period,
130            ),
131            extra_fields: self.extra_fields,
132        })
133    }
134}
135
136impl<S> Appender<S>
137where
138    S: Sender + Sync + Send + 'static,
139{
140    pub fn builder() -> AppenderBuilder {
141        AppenderBuilder::default()
142    }
143
144    fn try_flush(&self) -> AnyResult<()> {
145        self.sender.flush()?;
146        Ok(())
147    }
148}
149
150impl<S> Append for Appender<S>
151where
152    S: Sender + Sync + Send + 'static,
153{
154    fn append(&self, record: &Record) -> AnyResult<()> {
155        self.sender
156            .send(LogStashRecord::from_record(record).with_data_from_map(&self.extra_fields))?;
157        Ok(())
158    }
159    fn flush(&self) {
160        if let Err(err) = self.try_flush() {
161            eprintln!("Logstash appender failed to flush: {}", err);
162        }
163    }
164}