rust_vector_logger/
logger.rs

1use chrono::Utc;
2use env_logger;
3use log::{debug, error, info, trace, warn};
4use serde::{Deserialize, Serialize};
5use std::fmt::Arguments;
6use tokio::io::AsyncWriteExt;
7use tokio::net::TcpStream;
8
9// Message Structure for logging
10#[derive(Serialize, Deserialize, Debug)]
11pub struct Message {
12    pub timestamp: String,
13    pub application: String,
14    pub level: String,
15    pub message: String,
16}
17
18impl Message {
19    pub fn new(timestamp: String, application: &str, level: &str, message: &str) -> Self {
20        Message {
21            timestamp,
22            application: application.to_string(),
23            level: level.to_string(),
24            message: message.to_string(),
25        }
26    }
27}
28
29// Logger struct to handle connections
30//
31pub struct Logger {
32    stream: Option<TcpStream>,
33    application: String,
34    level: String,
35    host: String,
36    port: u16,
37}
38
39impl Logger {
40    pub fn init_logger() {
41        env_logger::init();
42    }
43
44    // Initialize a new Logger
45    pub async fn new(
46        application: &str,
47        level: &str,
48        host: &str,
49        port: u16,
50    ) -> tokio::io::Result<Logger> {
51        let addr = format!("{}:{}", host, port);
52        let stream = TcpStream::connect(addr).await.ok();
53        Ok(Logger {
54            application: application.to_string(),
55            level: level.to_string(),
56            stream,
57            host: host.to_string(),
58            port,
59        })
60    }
61
62    pub async fn reconnect(&self) -> tokio::io::Result<Logger> {
63        let addr = format!("{}:{}", self.host, self.port);
64        let new_stream = TcpStream::connect(addr).await.ok();
65        Ok(Logger {
66            application: self.application.clone(),
67            level: self.level.clone(),
68            stream: new_stream,
69            host: self.host.clone(),
70            port: self.port,
71        })
72    }
73
74    // Initialize a new Logger (backwards compatibility)
75    pub async fn init(
76        application: &str,
77        level: &str,
78        host: &str,
79        port: u16,
80    ) -> tokio::io::Result<Logger> {
81        let logger = Logger::new(application, level, host, port).await?;
82        Ok(logger)
83    }
84
85    pub fn time_now() -> String {
86        let now = Utc::now();
87        now.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string()
88    }
89
90    async fn send(&mut self, message: &Message) -> Result<(), Box<dyn std::error::Error>> {
91        let json = serde_json::to_string(&message).unwrap();
92        loop {
93            if self.stream.is_none() {
94                let addr = format!("{}:{}", self.host, self.port);
95                self.stream = Some(TcpStream::connect(addr).await?);
96            }
97
98            if let Some(stream) = &mut self.stream {
99                match stream.write_all(json.as_bytes()).await {
100                    Ok(_) => {
101                        break;
102                    }
103                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
104                        error!("Error sending message: {}", e);
105                        self.stream = None;
106                    }
107                    Err(e) => return Err(Box::new(e)),
108                }
109            }
110        }
111
112        Ok(())
113    }
114
115    pub async fn info(&mut self, message: &str) {
116        if self.level.to_string().to_uppercase() == "ERROR"
117            || self.level.to_string().to_uppercase() == "WARN"
118        {
119            return;
120        }
121        let message = Message::new(Self::time_now(), &self.application, "INFO", message);
122        let result = self.send(&message).await;
123        match result {
124            Ok(_) => {
125                info!("{}", message.message);
126            }
127            Err(e) => {
128                error!("Error sending message: {}", e);
129            }
130        }
131    }
132
133    pub async fn infof(&mut self, fmt_str: Arguments<'_>) {
134        if self.level.to_string().to_uppercase() == "ERROR"
135            || self.level.to_string().to_uppercase() == "WARN"
136        {
137            return;
138        }
139        self.info(&fmt_str.to_string()).await;
140    }
141
142    pub async fn error(&mut self, message: &str) {
143        let message = Message::new(Self::time_now(), &self.application, "ERROR", message);
144        let result = self.send(&message).await;
145        match result {
146            Ok(_) => {
147                error!("{}", message.message);
148            }
149            Err(e) => {
150                error!("Error sending message: {}", e);
151            }
152        }
153    }
154
155    pub async fn errorf(&mut self, fmt_str: Arguments<'_>) {
156        self.error(&fmt_str.to_string()).await;
157    }
158
159    pub async fn warn(&mut self, message: &str) {
160        if self.level.to_string().to_uppercase() == "ERROR" {
161            return;
162        }
163        let message = Message::new(Self::time_now(), &self.application, "WARN", message);
164        let result = self.send(&message).await;
165        match result {
166            Ok(_) => {
167                warn!("{}", message.message);
168            }
169            Err(e) => {
170                error!("Error sending message: {}", e);
171            }
172        }
173    }
174
175    pub async fn warnf(&mut self, fmt_str: Arguments<'_>) {
176        if self.level.to_string().to_uppercase() == "ERROR" {
177            return;
178        }
179        self.warn(&fmt_str.to_string()).await;
180    }
181
182    pub async fn debug(&mut self, message: &str) {
183        if self.level.to_string().to_uppercase() != "DEBUG" {
184            return;
185        }
186        let message = Message::new(Self::time_now(), &self.application, "DEBUG", message);
187        let result = self.send(&message).await;
188        match result {
189            Ok(_) => {
190                debug!("{}", message.message);
191            }
192            Err(e) => {
193                error!("Error sending message: {}", e);
194            }
195        }
196    }
197
198    pub async fn debugf(&mut self, fmt_str: Arguments<'_>) {
199        if self.level.to_string().to_uppercase() != "DEBUG" {
200            return;
201        }
202        self.debug(&fmt_str.to_string()).await;
203    }
204
205    pub async fn trace(&mut self, message: &str) {
206        let message = Message::new(Self::time_now(), &self.application, "TRACE", message);
207        let result = self.send(&message).await;
208        match result {
209            Ok(_) => {
210                trace!("{}", message.message);
211            }
212            Err(e) => {
213                error!("Error sending message: {}", e);
214            }
215        }
216    }
217
218    pub async fn tracef(&mut self, fmt_str: Arguments<'_>) {
219        self.trace(&fmt_str.to_string()).await;
220    }
221}
222
// Tests module
//
// Every test talks to its own in-process mock collector bound to an
// OS-assigned port, so the tests do not share state and can run in parallel.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;
    use std::net::SocketAddr;
    use tokio::io::AsyncReadExt;
    use tokio::net::TcpListener;
    use tokio::sync::{mpsc, oneshot};

    const TEST_APP: &str = "TestApp";
    const PLAIN_MESSAGE: &str = "This is a test message";
    const FORMATTED_MESSAGE: &str = "This is a test message dear tester";

    /// Starts a mock collector on an ephemeral localhost port.
    ///
    /// Connections are handled one at a time: the server accepts a
    /// connection, performs a single read of up to 1024 bytes, forwards
    /// whatever it read on the returned channel, and then accepts the next
    /// connection. Sending on the returned oneshot stops the accept loop.
    async fn start_mock_server() -> (SocketAddr, mpsc::Receiver<String>, oneshot::Sender<()>) {
        let (tx, rx) = mpsc::channel(100);
        let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
        // Port 0 lets the OS pick a free port, avoiding bind collisions.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("mock server should bind an ephemeral port");
        let addr = listener
            .local_addr()
            .expect("bound listener should report its address");

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    accept_result = listener.accept() => {
                        if let Ok((mut socket, _)) = accept_result {
                            let mut buf = vec![0; 1024];
                            match socket.read(&mut buf).await {
                                Ok(n) => {
                                    if n == 0 { continue; }
                                    let msg = String::from_utf8_lossy(&buf[..n]).to_string();
                                    // The test may already have dropped its
                                    // receiver; that is not a server failure.
                                    let _ = tx.send(msg).await;
                                }
                                Err(e) => {
                                    eprintln!("Error reading from socket: {}", e);
                                }
                            }
                        }
                    },
                    _ = &mut stop_rx => {
                        println!("Stopping server");
                        break;
                    },
                }
            }
        });
        (addr, rx, stop_tx)
    }

    /// Starts a mock collector and a logger at `level` connected to it.
    async fn connect_logger(level: &str) -> (Logger, mpsc::Receiver<String>, oneshot::Sender<()>) {
        let (addr, receiver, stop_server) = start_mock_server().await;
        let logger = Logger::new(TEST_APP, level, &addr.ip().to_string(), addr.port())
            .await
            .expect("Logger::new should succeed");
        (logger, receiver, stop_server)
    }

    /// Waits for the next payload, parses it as JSON and checks the fields
    /// shared by every record. Returns the parsed value for extra checks.
    async fn expect_record(
        receiver: &mut mpsc::Receiver<String>,
        level: &str,
        message: &str,
    ) -> Value {
        let raw = receiver
            .recv()
            .await
            .expect("mock server should forward a payload");
        let record: Value = serde_json::from_str(&raw).expect("payload should be valid JSON");
        assert_eq!(record["application"], TEST_APP);
        assert_eq!(record["level"], level);
        assert_eq!(record["message"], message);
        record
    }

    #[test]
    fn test_message_creation() {
        let msg = Message::new("12345".to_string(), TEST_APP, "INFO", PLAIN_MESSAGE);
        assert_eq!(msg.timestamp, "12345");
        assert_eq!(msg.application, "TestApp");
        assert_eq!(msg.level, "INFO");
        assert_eq!(msg.message, "This is a test message");
    }

    #[tokio::test]
    async fn test_logger_initialization() {
        let (logger, _receiver, stop_server) = connect_logger("INFO").await;
        assert!(
            logger.stream.is_some(),
            "logger should be connected to the running mock server"
        );
        assert_eq!(logger.application, TEST_APP);
        assert_eq!(logger.level, "INFO");
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_send() {
        let (mut logger, mut receiver, stop_server) = connect_logger("INFO").await;
        let now = Logger::time_now();
        logger
            .send(&Message::new(now.clone(), TEST_APP, "INFO", PLAIN_MESSAGE))
            .await
            .unwrap();

        let record = expect_record(&mut receiver, "INFO", PLAIN_MESSAGE).await;
        let received_timestamp = record["timestamp"].as_str().unwrap();
        assert_eq!(received_timestamp, now);

        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_info() {
        let (mut logger, mut receiver, stop_server) = connect_logger("INFO").await;
        logger.info(PLAIN_MESSAGE).await;
        expect_record(&mut receiver, "INFO", PLAIN_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_infof() {
        let (mut logger, mut receiver, stop_server) = connect_logger("INFO").await;
        let test_arg = "dear tester";
        logger
            .infof(format_args!("This is a test message {}", test_arg))
            .await;
        expect_record(&mut receiver, "INFO", FORMATTED_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_error() {
        let (mut logger, mut receiver, stop_server) = connect_logger("ERROR").await;
        logger.error(PLAIN_MESSAGE).await;
        expect_record(&mut receiver, "ERROR", PLAIN_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_errorf() {
        let (mut logger, mut receiver, stop_server) = connect_logger("ERROR").await;
        let test_arg = "dear tester";
        logger
            .errorf(format_args!("This is a test message {}", test_arg))
            .await;
        expect_record(&mut receiver, "ERROR", FORMATTED_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_warn() {
        let (mut logger, mut receiver, stop_server) = connect_logger("WARN").await;
        logger.warn(PLAIN_MESSAGE).await;
        expect_record(&mut receiver, "WARN", PLAIN_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_warnf() {
        let (mut logger, mut receiver, stop_server) = connect_logger("WARN").await;
        let test_arg = "dear tester";
        logger
            .warnf(format_args!("This is a test message {}", test_arg))
            .await;
        expect_record(&mut receiver, "WARN", FORMATTED_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_debug() {
        let (mut logger, mut receiver, stop_server) = connect_logger("DEBUG").await;
        logger.debug(PLAIN_MESSAGE).await;
        expect_record(&mut receiver, "DEBUG", PLAIN_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_debugf() {
        let (mut logger, mut receiver, stop_server) = connect_logger("DEBUG").await;
        let test_arg = "dear tester";
        logger
            .debugf(format_args!("This is a test message {}", test_arg))
            .await;
        expect_record(&mut receiver, "DEBUG", FORMATTED_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_trace() {
        let (mut logger, mut receiver, stop_server) = connect_logger("TRACE").await;
        logger.trace(PLAIN_MESSAGE).await;
        expect_record(&mut receiver, "TRACE", PLAIN_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_tracef() {
        let (mut logger, mut receiver, stop_server) = connect_logger("TRACE").await;
        let test_arg = "dear tester";
        logger
            .tracef(format_args!("This is a test message {}", test_arg))
            .await;
        expect_record(&mut receiver, "TRACE", FORMATTED_MESSAGE).await;
        stop_server.send(()).unwrap();
    }

    #[tokio::test]
    async fn test_logger_reconnect() {
        let (logger, mut receiver, stop_server) = connect_logger("INFO").await;
        let mut logger_reconnected = logger.reconnect().await.unwrap();
        assert!(
            logger_reconnected.stream.is_some(),
            "reconnect should open a new connection to the running mock server"
        );

        // The mock server handles one connection at a time; closing the first
        // connection lets it move on to the reconnected one.
        drop(logger);

        logger_reconnected.info(PLAIN_MESSAGE).await;
        expect_record(&mut receiver, "INFO", PLAIN_MESSAGE).await;
        stop_server.send(()).unwrap();
    }
}
634        stop_server.send(()).unwrap();
635    }
636}