rust_loguru/handler/
network.rs1use std::fmt;
2use std::io::Write;
3use std::net::TcpStream;
4use std::sync::{Arc, Mutex};
5
6use crate::formatters::Formatter;
7use crate::level::LogLevel;
8use crate::record::Record;
9
10use super::{Handler, HandlerError, HandlerFilter};
11use std::fmt::Debug;
12
13pub struct NetworkHandler {
15 level: LogLevel,
16 enabled: bool,
17 formatter: Formatter,
18 stream: Arc<Mutex<Option<TcpStream>>>,
19 filter: Option<HandlerFilter>,
20 batch_buffer: Arc<Mutex<Vec<Record>>>,
21 batch_size: Option<usize>,
22 addr: String,
23}
24
25impl NetworkHandler {
26 pub fn new(stream: TcpStream, level: LogLevel) -> Self {
27 Self {
28 level,
29 enabled: true,
30 formatter: Formatter::text(),
31 stream: Arc::new(Mutex::new(Some(stream))),
32 filter: None,
33 batch_buffer: Arc::new(Mutex::new(Vec::new())),
34 batch_size: None,
35 addr: String::new(),
36 }
37 }
38
39 pub fn with_level(mut self, level: LogLevel) -> Self {
40 self.level = level;
41 self
42 }
43
44 pub fn with_formatter(mut self, formatter: Formatter) -> Self {
45 self.formatter = formatter;
46 self
47 }
48
49 pub fn with_colors(mut self, use_colors: bool) -> Self {
50 self.formatter = self.formatter.with_colors(use_colors);
51 self
52 }
53
54 pub fn with_pattern(mut self, pattern: impl Into<String>) -> Self {
55 self.formatter = self.formatter.with_pattern(pattern);
56 self
57 }
58
59 pub fn with_format<F>(mut self, format_fn: F) -> Self
60 where
61 F: Fn(&Record) -> String + Send + Sync + 'static,
62 {
63 self.formatter = self.formatter.with_format(format_fn);
64 self
65 }
66
67 pub fn with_filter(mut self, filter: HandlerFilter) -> Self {
68 self.filter = Some(filter);
69 self
70 }
71
72 pub fn with_batching(mut self, batch_size: usize) -> Self {
73 self.batch_size = Some(batch_size);
74 self
75 }
76}
77
78impl Handler for NetworkHandler {
79 fn handle(&self, record: &Record) -> Result<(), HandlerError> {
80 if !self.enabled || record.level() < self.level {
81 return Ok(());
82 }
83 if let Some(filter) = &self.filter {
84 if !(filter)(record) {
85 return Ok(());
86 }
87 }
88 if let Some(batch_size) = self.batch_size {
89 let mut buffer = self.batch_buffer.lock().unwrap();
90 buffer.push(record.clone());
91 if buffer.len() >= batch_size {
92 let batch = buffer.drain(..).collect::<Vec<_>>();
93 drop(buffer);
94 return self.handle_batch(&batch);
95 }
96 return Ok(());
97 }
98 let formatted = self.formatter.format(record);
99 if let Some(ref mut stream) = self.stream.lock().unwrap().as_mut() {
100 write!(stream, "{}", formatted).map_err(HandlerError::IoError)?;
101 stream.flush().map_err(HandlerError::IoError)?;
102 Ok(())
103 } else {
104 Err(HandlerError::NotInitialized)
105 }
106 }
107
108 fn level(&self) -> LogLevel {
109 self.level
110 }
111
112 fn set_level(&mut self, level: LogLevel) {
113 self.level = level;
114 }
115
116 fn is_enabled(&self) -> bool {
117 self.enabled
118 }
119
120 fn set_enabled(&mut self, enabled: bool) {
121 self.enabled = enabled;
122 }
123
124 fn formatter(&self) -> &Formatter {
125 &self.formatter
126 }
127
128 fn set_formatter(&mut self, formatter: Formatter) {
129 self.formatter = formatter;
130 }
131
132 fn set_filter(&mut self, filter: Option<HandlerFilter>) {
133 self.filter = filter;
134 }
135
136 fn filter(&self) -> Option<&HandlerFilter> {
137 self.filter.as_ref()
138 }
139
140 fn handle_batch(&self, records: &[Record]) -> Result<(), HandlerError> {
141 if !self.enabled {
142 return Ok(());
143 }
144
145 if let Some(ref mut stream) = self.stream.lock().unwrap().as_mut() {
146 for record in records {
147 if record.level() < self.level {
148 continue;
149 }
150 if let Some(filter) = &self.filter {
151 if !(filter)(record) {
152 continue;
153 }
154 }
155 let formatted = self.formatter.format(record);
156 write!(stream, "{}", formatted).map_err(HandlerError::IoError)?;
157 }
158 stream.flush().map_err(HandlerError::IoError)?;
159 Ok(())
160 } else {
161 Err(HandlerError::NotInitialized)
162 }
163 }
164
165 fn init(&mut self) -> Result<(), HandlerError> {
166 let stream = TcpStream::connect(&self.addr).map_err(HandlerError::IoError)?;
167 *self.stream.lock().unwrap() = Some(stream);
168 Ok(())
169 }
170
171 fn flush(&self) -> Result<(), HandlerError> {
172 if let Some(ref mut stream) = self.stream.lock().unwrap().as_mut() {
173 stream.flush().map_err(HandlerError::IoError)?;
174 Ok(())
175 } else {
176 Err(HandlerError::NotInitialized)
177 }
178 }
179
180 fn shutdown(&mut self) -> Result<(), HandlerError> {
181 self.flush()
182 }
183}
184
185impl Debug for NetworkHandler {
186 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187 f.debug_struct("NetworkHandler")
188 .field("level", &self.level)
189 .field("enabled", &self.enabled)
190 .field("formatter", &self.formatter)
191 .field("batch_size", &self.batch_size)
192 .finish()
193 }
194}
195
196impl Clone for NetworkHandler {
197 fn clone(&self) -> Self {
198 Self {
199 level: self.level,
200 enabled: self.enabled,
201 formatter: self.formatter.clone(),
202 stream: self.stream.clone(),
203 filter: self.filter.clone(),
204 batch_buffer: self.batch_buffer.clone(),
205 batch_size: self.batch_size,
206 addr: self.addr.clone(),
207 }
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214 use std::io::{BufRead, BufReader};
215 use std::net::{TcpListener, TcpStream};
216 use std::sync::mpsc::channel;
217 use std::thread;
218
219 #[test]
220 fn test_network_handler_filtering_and_batching() {
221 let (tx, rx) = channel();
222 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
223 let addr = listener.local_addr().unwrap();
224
225 thread::spawn(move || {
227 let (mut stream, _) = listener.accept().unwrap();
228 let mut reader = BufReader::new(&mut stream);
229 let mut lines = Vec::new();
230 loop {
231 let mut line = String::new();
232 match reader.read_line(&mut line) {
233 Ok(0) | Err(_) => break, Ok(_) => {
235 if !line.is_empty() {
236 lines.push(line);
237 }
238 }
239 }
240 }
241 tx.send(lines).unwrap();
242 });
243
244 let stream = TcpStream::connect(addr).unwrap();
246 let filter = std::sync::Arc::new(|record: &Record| record.message().contains("pass"));
247 let handler = NetworkHandler::new(stream, LogLevel::Info)
248 .with_filter(filter)
249 .with_batching(2);
250
251 let record1 = Record::new(
253 LogLevel::Info,
254 "should pass",
255 None::<String>,
256 None::<String>,
257 None,
258 );
259 let record2 = Record::new(
260 LogLevel::Info,
261 "should fail",
262 None::<String>,
263 None::<String>,
264 None,
265 );
266 let record3 = Record::new(
267 LogLevel::Info,
268 "should pass again",
269 None::<String>,
270 None::<String>,
271 None,
272 );
273
274 assert!(handler.handle(&record1).is_ok());
276 assert!(handler.handle(&record2).is_ok());
277 assert!(handler.handle(&record3).is_ok());
278 handler.flush().unwrap();
279
280 drop(handler);
282
283 let lines = rx.recv().unwrap();
285 assert!(lines.iter().any(|l| l.contains("should pass")));
286 assert!(lines.iter().any(|l| l.contains("should pass again")));
287 assert!(!lines.iter().any(|l| l.contains("should fail")));
288 }
289}