rust_loguru/handler/
network.rs1use std::fmt;
2use std::io::Write;
3use std::net::TcpStream;
4use std::sync::{Arc, Mutex};
5
6use crate::formatters::Formatter;
7use crate::level::LogLevel;
8use crate::record::Record;
9
10use super::{Handler, HandlerFilter};
11use std::fmt::Debug;
12
13pub struct NetworkHandler {
15 level: LogLevel,
16 enabled: bool,
17 formatter: Formatter,
18 stream: Arc<Mutex<TcpStream>>,
19 filter: Option<HandlerFilter>,
20 batch_buffer: Arc<Mutex<Vec<Record>>>,
21 batch_size: Option<usize>,
22}
23
24impl NetworkHandler {
25 pub fn new(stream: TcpStream, level: LogLevel) -> Self {
26 Self {
27 level,
28 enabled: true,
29 formatter: Formatter::text(),
30 stream: Arc::new(Mutex::new(stream)),
31 filter: None,
32 batch_buffer: Arc::new(Mutex::new(Vec::new())),
33 batch_size: None,
34 }
35 }
36
37 pub fn with_level(mut self, level: LogLevel) -> Self {
38 self.level = level;
39 self
40 }
41
42 pub fn with_formatter(mut self, formatter: Formatter) -> Self {
43 self.formatter = formatter;
44 self
45 }
46
47 pub fn with_colors(mut self, use_colors: bool) -> Self {
48 self.formatter = self.formatter.with_colors(use_colors);
49 self
50 }
51
52 pub fn with_pattern(mut self, pattern: impl Into<String>) -> Self {
53 self.formatter = self.formatter.with_pattern(pattern);
54 self
55 }
56
57 pub fn with_format<F>(mut self, format_fn: F) -> Self
58 where
59 F: Fn(&Record) -> String + Send + Sync + 'static,
60 {
61 self.formatter = self.formatter.with_format(format_fn);
62 self
63 }
64
65 pub fn with_filter(mut self, filter: HandlerFilter) -> Self {
66 self.filter = Some(filter);
67 self
68 }
69
70 pub fn with_batching(mut self, batch_size: usize) -> Self {
71 self.batch_size = Some(batch_size);
72 self
73 }
74}
75
76impl Handler for NetworkHandler {
77 fn handle(&self, record: &Record) -> Result<(), String> {
78 if !self.enabled || record.level() < self.level {
79 return Ok(());
80 }
81 if let Some(filter) = &self.filter {
82 if !(filter)(record) {
83 return Ok(());
84 }
85 }
86 if let Some(batch_size) = self.batch_size {
87 let mut buffer = self.batch_buffer.lock().unwrap();
88 buffer.push(record.clone());
89 if buffer.len() >= batch_size {
90 let batch = buffer.drain(..).collect::<Vec<_>>();
91 drop(buffer);
92 return self.handle_batch(&batch);
93 }
94 return Ok(());
95 }
96 let formatted = self.formatter.format(record);
97 let mut stream = self
98 .stream
99 .lock()
100 .map_err(|e| format!("Failed to lock stream: {}", e))?;
101 writeln!(stream, "{}", formatted)
102 .map_err(|e| format!("Failed to write to network: {}", e))?;
103 Ok(())
104 }
105
106 fn level(&self) -> LogLevel {
107 self.level
108 }
109
110 fn set_level(&mut self, level: LogLevel) {
111 self.level = level;
112 }
113
114 fn is_enabled(&self) -> bool {
115 self.enabled
116 }
117
118 fn set_enabled(&mut self, enabled: bool) {
119 self.enabled = enabled;
120 }
121
122 fn formatter(&self) -> &Formatter {
123 &self.formatter
124 }
125
126 fn set_formatter(&mut self, formatter: Formatter) {
127 self.formatter = formatter;
128 }
129
130 fn set_filter(&mut self, filter: Option<HandlerFilter>) {
131 self.filter = filter;
132 }
133
134 fn filter(&self) -> Option<&HandlerFilter> {
135 self.filter.as_ref()
136 }
137
138 fn handle_batch(&self, records: &[Record]) -> Result<(), String> {
139 let mut stream = self
140 .stream
141 .lock()
142 .map_err(|e| format!("Failed to lock stream: {}", e))?;
143 for record in records {
144 if !self.enabled || record.level() < self.level {
145 continue;
146 }
147 if let Some(filter) = &self.filter {
148 if !(filter)(record) {
149 continue;
150 }
151 }
152 let formatted = self.formatter.format(record);
153 if let Err(e) = writeln!(stream, "{}", formatted) {
154 return Err(format!("Failed to write to network: {}", e));
155 }
156 }
157 Ok(())
158 }
159
160 fn init(&mut self) -> Result<(), String> {
161 Ok(())
162 }
163
164 fn flush(&self) -> Result<(), String> {
165 Ok(())
166 }
167
168 fn shutdown(&mut self) -> Result<(), String> {
169 Ok(())
170 }
171}
172
173impl Debug for NetworkHandler {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.debug_struct("NetworkHandler")
176 .field("level", &self.level)
177 .field("enabled", &self.enabled)
178 .field("formatter", &self.formatter)
179 .field("batch_size", &self.batch_size)
180 .finish()
181 }
182}
183
184impl Clone for NetworkHandler {
185 fn clone(&self) -> Self {
186 Self {
187 level: self.level,
188 enabled: self.enabled,
189 formatter: self.formatter.clone(),
190 stream: self.stream.clone(),
191 filter: self.filter.clone(),
192 batch_buffer: self.batch_buffer.clone(),
193 batch_size: self.batch_size,
194 }
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{BufRead, BufReader};
    use std::net::{TcpListener, TcpStream};
    use std::sync::mpsc::channel;
    use std::thread;

    /// Builds an `Info` record carrying only a message.
    fn info_record(message: &'static str) -> Record {
        Record::new(
            LogLevel::Info,
            message,
            None::<String>,
            None::<String>,
            None,
        )
    }

    /// Sends three records through a filtering, batching handler and checks
    /// that only the records accepted by the filter reach the peer.
    #[test]
    fn test_network_handler_filtering_and_batching() {
        let (tx, rx) = channel();
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();

        // Read until the peer closes the connection instead of expecting a
        // fixed number of lines: a fixed count blocks forever when the
        // formatter emits fewer lines than anticipated.
        let reader_thread = thread::spawn(move || {
            let (stream, _) = listener.accept().unwrap();
            let lines: Vec<String> = BufReader::new(stream)
                .lines()
                .map(|line| line.unwrap())
                .collect();
            tx.send(lines).unwrap();
        });

        let stream = TcpStream::connect(addr).unwrap();
        let filter = std::sync::Arc::new(|record: &Record| record.message().contains("pass"));
        let handler = NetworkHandler::new(stream, LogLevel::Info)
            .with_filter(filter)
            .with_batching(2);

        assert!(handler.handle(&info_record("should pass")).is_ok());
        assert!(handler.handle(&info_record("should fail")).is_ok());
        assert!(handler.handle(&info_record("should pass2")).is_ok());
        handler.flush().unwrap();

        // Dropping the only handler closes the socket, which gives the
        // reader thread the EOF it is waiting for.
        drop(handler);

        let lines = rx.recv().unwrap();
        reader_thread.join().unwrap();

        assert!(lines.iter().any(|l| l.contains("should pass")));
        assert!(lines.iter().any(|l| l.contains("should pass2")));
        assert!(!lines.iter().any(|l| l.contains("should fail")));
    }
}