rust_loguru/handler/
network.rs

1use std::fmt;
2use std::io::Write;
3use std::net::TcpStream;
4use std::sync::{Arc, Mutex};
5
6use crate::formatters::Formatter;
7use crate::level::LogLevel;
8use crate::record::Record;
9
10use super::{Handler, HandlerFilter};
11use std::fmt::Debug;
12
13/// A handler that writes log records to a network socket
14pub struct NetworkHandler {
15    level: LogLevel,
16    enabled: bool,
17    formatter: Formatter,
18    stream: Arc<Mutex<TcpStream>>,
19    filter: Option<HandlerFilter>,
20    batch_buffer: Arc<Mutex<Vec<Record>>>,
21    batch_size: Option<usize>,
22}
23
24impl NetworkHandler {
25    pub fn new(stream: TcpStream, level: LogLevel) -> Self {
26        Self {
27            level,
28            enabled: true,
29            formatter: Formatter::text(),
30            stream: Arc::new(Mutex::new(stream)),
31            filter: None,
32            batch_buffer: Arc::new(Mutex::new(Vec::new())),
33            batch_size: None,
34        }
35    }
36
37    pub fn with_level(mut self, level: LogLevel) -> Self {
38        self.level = level;
39        self
40    }
41
42    pub fn with_formatter(mut self, formatter: Formatter) -> Self {
43        self.formatter = formatter;
44        self
45    }
46
47    pub fn with_colors(mut self, use_colors: bool) -> Self {
48        self.formatter = self.formatter.with_colors(use_colors);
49        self
50    }
51
52    pub fn with_pattern(mut self, pattern: impl Into<String>) -> Self {
53        self.formatter = self.formatter.with_pattern(pattern);
54        self
55    }
56
57    pub fn with_format<F>(mut self, format_fn: F) -> Self
58    where
59        F: Fn(&Record) -> String + Send + Sync + 'static,
60    {
61        self.formatter = self.formatter.with_format(format_fn);
62        self
63    }
64
65    pub fn with_filter(mut self, filter: HandlerFilter) -> Self {
66        self.filter = Some(filter);
67        self
68    }
69
70    pub fn with_batching(mut self, batch_size: usize) -> Self {
71        self.batch_size = Some(batch_size);
72        self
73    }
74}
75
76impl Handler for NetworkHandler {
77    fn handle(&self, record: &Record) -> Result<(), String> {
78        if !self.enabled || record.level() < self.level {
79            return Ok(());
80        }
81        if let Some(filter) = &self.filter {
82            if !(filter)(record) {
83                return Ok(());
84            }
85        }
86        if let Some(batch_size) = self.batch_size {
87            let mut buffer = self.batch_buffer.lock().unwrap();
88            buffer.push(record.clone());
89            if buffer.len() >= batch_size {
90                let batch = buffer.drain(..).collect::<Vec<_>>();
91                drop(buffer);
92                return self.handle_batch(&batch);
93            }
94            return Ok(());
95        }
96        let formatted = self.formatter.format(record);
97        let mut stream = self
98            .stream
99            .lock()
100            .map_err(|e| format!("Failed to lock stream: {}", e))?;
101        writeln!(stream, "{}", formatted)
102            .map_err(|e| format!("Failed to write to network: {}", e))?;
103        Ok(())
104    }
105
106    fn level(&self) -> LogLevel {
107        self.level
108    }
109
110    fn set_level(&mut self, level: LogLevel) {
111        self.level = level;
112    }
113
114    fn is_enabled(&self) -> bool {
115        self.enabled
116    }
117
118    fn set_enabled(&mut self, enabled: bool) {
119        self.enabled = enabled;
120    }
121
122    fn formatter(&self) -> &Formatter {
123        &self.formatter
124    }
125
126    fn set_formatter(&mut self, formatter: Formatter) {
127        self.formatter = formatter;
128    }
129
130    fn set_filter(&mut self, filter: Option<HandlerFilter>) {
131        self.filter = filter;
132    }
133
134    fn filter(&self) -> Option<&HandlerFilter> {
135        self.filter.as_ref()
136    }
137
138    fn handle_batch(&self, records: &[Record]) -> Result<(), String> {
139        let mut stream = self
140            .stream
141            .lock()
142            .map_err(|e| format!("Failed to lock stream: {}", e))?;
143        for record in records {
144            if !self.enabled || record.level() < self.level {
145                continue;
146            }
147            if let Some(filter) = &self.filter {
148                if !(filter)(record) {
149                    continue;
150                }
151            }
152            let formatted = self.formatter.format(record);
153            if let Err(e) = writeln!(stream, "{}", formatted) {
154                return Err(format!("Failed to write to network: {}", e));
155            }
156        }
157        Ok(())
158    }
159
160    fn init(&mut self) -> Result<(), String> {
161        Ok(())
162    }
163
164    fn flush(&self) -> Result<(), String> {
165        Ok(())
166    }
167
168    fn shutdown(&mut self) -> Result<(), String> {
169        Ok(())
170    }
171}
172
173impl Debug for NetworkHandler {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        f.debug_struct("NetworkHandler")
176            .field("level", &self.level)
177            .field("enabled", &self.enabled)
178            .field("formatter", &self.formatter)
179            .field("batch_size", &self.batch_size)
180            .finish()
181    }
182}
183
184impl Clone for NetworkHandler {
185    fn clone(&self) -> Self {
186        Self {
187            level: self.level,
188            enabled: self.enabled,
189            formatter: self.formatter.clone(),
190            stream: self.stream.clone(),
191            filter: self.filter.clone(),
192            batch_buffer: self.batch_buffer.clone(),
193            batch_size: self.batch_size,
194        }
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201    use std::io::{BufRead, BufReader};
202    use std::net::{TcpListener, TcpStream};
203    use std::sync::mpsc::channel;
204    use std::thread;
205
206    #[test]
207    fn test_network_handler_filtering_and_batching() {
208        let (tx, rx) = channel();
209        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
210        let addr = listener.local_addr().unwrap();
211        thread::spawn(move || {
212            let (mut stream, _) = listener.accept().unwrap();
213            let mut reader = BufReader::new(&mut stream);
214            let mut lines = Vec::new();
215            for _ in 0..3 {
216                let mut line = String::new();
217                reader.read_line(&mut line).unwrap();
218                lines.push(line);
219            }
220            tx.send(lines).unwrap();
221        });
222        let stream = TcpStream::connect(addr).unwrap();
223        let filter = std::sync::Arc::new(|record: &Record| record.message().contains("pass"));
224        let handler = NetworkHandler::new(stream, LogLevel::Info)
225            .with_filter(filter)
226            .with_batching(2);
227        let record1 = Record::new(
228            LogLevel::Info,
229            "should pass",
230            None::<String>,
231            None::<String>,
232            None,
233        );
234        let record2 = Record::new(
235            LogLevel::Info,
236            "should fail",
237            None::<String>,
238            None::<String>,
239            None,
240        );
241        let record3 = Record::new(
242            LogLevel::Info,
243            "should pass2",
244            None::<String>,
245            None::<String>,
246            None,
247        );
248        assert!(handler.handle(&record1).is_ok());
249        assert!(handler.handle(&record2).is_ok());
250        assert!(handler.handle(&record3).is_ok());
251        handler.flush().unwrap();
252        let lines = rx.recv().unwrap();
253        assert!(lines.iter().any(|l| l.contains("should pass")));
254        assert!(lines.iter().any(|l| l.contains("should pass2")));
255        assert!(!lines.iter().any(|l| l.contains("should fail")));
256    }
257}