rust_loguru/handler/
network.rs

1use std::fmt;
2use std::io::Write;
3use std::net::TcpStream;
4use std::sync::{Arc, Mutex};
5
6use crate::formatters::Formatter;
7use crate::level::LogLevel;
8use crate::record::Record;
9
10use super::{Handler, HandlerError, HandlerFilter};
11use std::fmt::Debug;
12
13/// A handler that writes log records to a network socket
14pub struct NetworkHandler {
15    level: LogLevel,
16    enabled: bool,
17    formatter: Formatter,
18    stream: Arc<Mutex<Option<TcpStream>>>,
19    filter: Option<HandlerFilter>,
20    batch_buffer: Arc<Mutex<Vec<Record>>>,
21    batch_size: Option<usize>,
22    addr: String,
23}
24
25impl NetworkHandler {
26    pub fn new(stream: TcpStream, level: LogLevel) -> Self {
27        Self {
28            level,
29            enabled: true,
30            formatter: Formatter::text(),
31            stream: Arc::new(Mutex::new(Some(stream))),
32            filter: None,
33            batch_buffer: Arc::new(Mutex::new(Vec::new())),
34            batch_size: None,
35            addr: String::new(),
36        }
37    }
38
39    pub fn with_level(mut self, level: LogLevel) -> Self {
40        self.level = level;
41        self
42    }
43
44    pub fn with_formatter(mut self, formatter: Formatter) -> Self {
45        self.formatter = formatter;
46        self
47    }
48
49    pub fn with_colors(mut self, use_colors: bool) -> Self {
50        self.formatter = self.formatter.with_colors(use_colors);
51        self
52    }
53
54    pub fn with_pattern(mut self, pattern: impl Into<String>) -> Self {
55        self.formatter = self.formatter.with_pattern(pattern);
56        self
57    }
58
59    pub fn with_format<F>(mut self, format_fn: F) -> Self
60    where
61        F: Fn(&Record) -> String + Send + Sync + 'static,
62    {
63        self.formatter = self.formatter.with_format(format_fn);
64        self
65    }
66
67    pub fn with_filter(mut self, filter: HandlerFilter) -> Self {
68        self.filter = Some(filter);
69        self
70    }
71
72    pub fn with_batching(mut self, batch_size: usize) -> Self {
73        self.batch_size = Some(batch_size);
74        self
75    }
76}
77
78impl Handler for NetworkHandler {
79    fn handle(&self, record: &Record) -> Result<(), HandlerError> {
80        if !self.enabled || record.level() < self.level {
81            return Ok(());
82        }
83        if let Some(filter) = &self.filter {
84            if !(filter)(record) {
85                return Ok(());
86            }
87        }
88        if let Some(batch_size) = self.batch_size {
89            let mut buffer = self.batch_buffer.lock().unwrap();
90            buffer.push(record.clone());
91            if buffer.len() >= batch_size {
92                let batch = buffer.drain(..).collect::<Vec<_>>();
93                drop(buffer);
94                return self.handle_batch(&batch);
95            }
96            return Ok(());
97        }
98        let formatted = self.formatter.format(record);
99        if let Some(ref mut stream) = self.stream.lock().unwrap().as_mut() {
100            write!(stream, "{}", formatted).map_err(HandlerError::IoError)?;
101            stream.flush().map_err(HandlerError::IoError)?;
102            Ok(())
103        } else {
104            Err(HandlerError::NotInitialized)
105        }
106    }
107
108    fn level(&self) -> LogLevel {
109        self.level
110    }
111
112    fn set_level(&mut self, level: LogLevel) {
113        self.level = level;
114    }
115
116    fn is_enabled(&self) -> bool {
117        self.enabled
118    }
119
120    fn set_enabled(&mut self, enabled: bool) {
121        self.enabled = enabled;
122    }
123
124    fn formatter(&self) -> &Formatter {
125        &self.formatter
126    }
127
128    fn set_formatter(&mut self, formatter: Formatter) {
129        self.formatter = formatter;
130    }
131
132    fn set_filter(&mut self, filter: Option<HandlerFilter>) {
133        self.filter = filter;
134    }
135
136    fn filter(&self) -> Option<&HandlerFilter> {
137        self.filter.as_ref()
138    }
139
140    fn handle_batch(&self, records: &[Record]) -> Result<(), HandlerError> {
141        if !self.enabled {
142            return Ok(());
143        }
144
145        if let Some(ref mut stream) = self.stream.lock().unwrap().as_mut() {
146            for record in records {
147                if record.level() < self.level {
148                    continue;
149                }
150                if let Some(filter) = &self.filter {
151                    if !(filter)(record) {
152                        continue;
153                    }
154                }
155                let formatted = self.formatter.format(record);
156                write!(stream, "{}", formatted).map_err(HandlerError::IoError)?;
157            }
158            stream.flush().map_err(HandlerError::IoError)?;
159            Ok(())
160        } else {
161            Err(HandlerError::NotInitialized)
162        }
163    }
164
165    fn init(&mut self) -> Result<(), HandlerError> {
166        let stream = TcpStream::connect(&self.addr).map_err(HandlerError::IoError)?;
167        *self.stream.lock().unwrap() = Some(stream);
168        Ok(())
169    }
170
171    fn flush(&self) -> Result<(), HandlerError> {
172        if let Some(ref mut stream) = self.stream.lock().unwrap().as_mut() {
173            stream.flush().map_err(HandlerError::IoError)?;
174            Ok(())
175        } else {
176            Err(HandlerError::NotInitialized)
177        }
178    }
179
180    fn shutdown(&mut self) -> Result<(), HandlerError> {
181        self.flush()
182    }
183}
184
185impl Debug for NetworkHandler {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        f.debug_struct("NetworkHandler")
188            .field("level", &self.level)
189            .field("enabled", &self.enabled)
190            .field("formatter", &self.formatter)
191            .field("batch_size", &self.batch_size)
192            .finish()
193    }
194}
195
196impl Clone for NetworkHandler {
197    fn clone(&self) -> Self {
198        Self {
199            level: self.level,
200            enabled: self.enabled,
201            formatter: self.formatter.clone(),
202            stream: self.stream.clone(),
203            filter: self.filter.clone(),
204            batch_buffer: self.batch_buffer.clone(),
205            batch_size: self.batch_size,
206            addr: self.addr.clone(),
207        }
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214    use std::io::{BufRead, BufReader};
215    use std::net::{TcpListener, TcpStream};
216    use std::sync::mpsc::channel;
217    use std::thread;
218
219    #[test]
220    fn test_network_handler_filtering_and_batching() {
221        let (tx, rx) = channel();
222        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
223        let addr = listener.local_addr().unwrap();
224
225        // Start server thread that collects all lines until connection closes
226        thread::spawn(move || {
227            let (mut stream, _) = listener.accept().unwrap();
228            let mut reader = BufReader::new(&mut stream);
229            let mut lines = Vec::new();
230            loop {
231                let mut line = String::new();
232                match reader.read_line(&mut line) {
233                    Ok(0) | Err(_) => break, // Connection closed or error
234                    Ok(_) => {
235                        if !line.is_empty() {
236                            lines.push(line);
237                        }
238                    }
239                }
240            }
241            tx.send(lines).unwrap();
242        });
243
244        // Create and configure handler
245        let stream = TcpStream::connect(addr).unwrap();
246        let filter = std::sync::Arc::new(|record: &Record| record.message().contains("pass"));
247        let handler = NetworkHandler::new(stream, LogLevel::Info)
248            .with_filter(filter)
249            .with_batching(2);
250
251        // Send test records
252        let record1 = Record::new(
253            LogLevel::Info,
254            "should pass",
255            None::<String>,
256            None::<String>,
257            None,
258        );
259        let record2 = Record::new(
260            LogLevel::Info,
261            "should fail",
262            None::<String>,
263            None::<String>,
264            None,
265        );
266        let record3 = Record::new(
267            LogLevel::Info,
268            "should pass again",
269            None::<String>,
270            None::<String>,
271            None,
272        );
273
274        // Handle records and flush
275        assert!(handler.handle(&record1).is_ok());
276        assert!(handler.handle(&record2).is_ok());
277        assert!(handler.handle(&record3).is_ok());
278        handler.flush().unwrap();
279
280        // Close the connection to signal server thread
281        drop(handler);
282
283        // Verify results
284        let lines = rx.recv().unwrap();
285        assert!(lines.iter().any(|l| l.contains("should pass")));
286        assert!(lines.iter().any(|l| l.contains("should pass again")));
287        assert!(!lines.iter().any(|l| l.contains("should fail")));
288    }
289}