log4rs_logstash/
appender.rs1use anyhow::Result as AnyResult;
2use log::Level as LogLevel;
3use log::Record;
4use log4rs::append::Append;
5use logstash_rs::LogStashRecord;
6use logstash_rs::Sender;
7use logstash_rs::{BufferedSender, TcpSender};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::time::Duration;
11
12pub struct Appender<S> {
13 sender: S,
14 extra_fields: HashMap<String, Value>,
15}
16
17impl<S> std::fmt::Debug for Appender<S> {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 write!(f, "{}::Appender", module_path!())
20 }
21}
22
23#[derive(Debug)]
24pub struct AppenderBuilder {
25 hostname: String,
26 port: u16,
27 buffer_size: Option<usize>,
28 buffer_lifetime: Option<Duration>,
29 connection_timeout: Option<Duration>,
30 ignore_buffer: LogLevel,
31 use_tls: bool,
32 error_period: Duration,
33 extra_fields: HashMap<String, Value>,
34}
35
36impl Default for AppenderBuilder {
37 fn default() -> AppenderBuilder {
38 AppenderBuilder {
39 hostname: "127.0.0.1".to_string(),
40 port: 5044,
41 buffer_size: Some(1024),
42 buffer_lifetime: Some(Duration::from_secs(1)),
43 connection_timeout: Some(Duration::from_secs(10)),
44 use_tls: false,
45 ignore_buffer: LogLevel::Error,
46 error_period: Duration::from_secs(10),
47 extra_fields: Default::default(),
48 }
49 }
50}
51
52impl AppenderBuilder {
53 pub fn with_ignore_buffer_level(&mut self, level: LogLevel) -> &mut AppenderBuilder {
55 self.ignore_buffer = level;
56 self
57 }
58
59 pub fn with_hostname(&mut self, hostname: impl Into<String>) -> &mut AppenderBuilder {
61 self.hostname = hostname.into();
62 self
63 }
64
65 pub fn with_port(&mut self, port: u16) -> &mut AppenderBuilder {
67 self.port = port;
68 self
69 }
70
71 pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut AppenderBuilder {
75 if buffer_size < 2 {
76 self.buffer_size = None;
77 } else {
78 self.buffer_size = Some(buffer_size);
79 }
80 self
81 }
82
83 pub fn with_buffer_lifetime(&mut self, buffer_duration: Duration) -> &mut AppenderBuilder {
85 self.buffer_lifetime = Some(buffer_duration);
86 self
87 }
88
89 pub fn with_connection_timeout(&mut self, timeout: Duration) -> &mut AppenderBuilder {
91 self.connection_timeout = Some(timeout);
92 self
93 }
94
95 pub fn with_use_tls(&mut self, use_tls: bool) -> &mut AppenderBuilder {
97 self.use_tls = use_tls;
98 self
99 }
100
101 pub fn with_error_period(&mut self, error_period: Duration) -> &mut AppenderBuilder {
103 self.error_period = error_period;
104 self
105 }
106
107 pub fn with_extra_fields(
109 &mut self,
110 extra_fields: HashMap<String, Value>,
111 ) -> &mut AppenderBuilder {
112 self.extra_fields = extra_fields;
113 self
114 }
115
116 pub fn build(self) -> AnyResult<Appender<BufferedSender>> {
118 Ok(Appender {
119 sender: BufferedSender::new(
120 TcpSender::new(
121 self.hostname,
122 self.port,
123 self.use_tls,
124 self.connection_timeout,
125 ),
126 self.buffer_size,
127 self.buffer_lifetime,
128 self.ignore_buffer,
129 self.error_period,
130 ),
131 extra_fields: self.extra_fields,
132 })
133 }
134}
135
136impl<S> Appender<S>
137where
138 S: Sender + Sync + Send + 'static,
139{
140 pub fn builder() -> AppenderBuilder {
141 AppenderBuilder::default()
142 }
143
144 fn try_flush(&self) -> AnyResult<()> {
145 self.sender.flush()?;
146 Ok(())
147 }
148}
149
150impl<S> Append for Appender<S>
151where
152 S: Sender + Sync + Send + 'static,
153{
154 fn append(&self, record: &Record) -> AnyResult<()> {
155 self.sender
156 .send(LogStashRecord::from_record(record).with_data_from_map(&self.extra_fields))?;
157 Ok(())
158 }
159 fn flush(&self) {
160 if let Err(err) = self.try_flush() {
161 eprintln!("Logstash appender failed to flush: {}", err);
162 }
163 }
164}