1use chrono::Utc;
2use env_logger;
3use log::{debug, error, info, trace, warn};
4use serde::{Deserialize, Serialize};
5use std::fmt::Arguments;
6use tokio::io::AsyncWriteExt;
7use tokio::net::TcpStream;
8
9#[derive(Serialize, Deserialize, Debug)]
11pub struct Message {
12 pub timestamp: String,
13 pub application: String,
14 pub level: String,
15 pub message: String,
16}
17
18impl Message {
19 pub fn new(timestamp: String, application: &str, level: &str, message: &str) -> Self {
20 Message {
21 timestamp,
22 application: application.to_string(),
23 level: level.to_string(),
24 message: message.to_string(),
25 }
26 }
27}
28
29pub struct Logger {
32 stream: Option<TcpStream>,
33 application: String,
34 level: String,
35 host: String,
36 port: u16,
37}
38
39impl Logger {
40 pub fn init_logger() {
41 env_logger::init();
42 }
43
44 pub async fn new(
46 application: &str,
47 level: &str,
48 host: &str,
49 port: u16,
50 ) -> tokio::io::Result<Logger> {
51 let addr = format!("{}:{}", host, port);
52 let stream = TcpStream::connect(addr).await.ok();
53 Ok(Logger {
54 application: application.to_string(),
55 level: level.to_string(),
56 stream,
57 host: host.to_string(),
58 port,
59 })
60 }
61
62 pub async fn reconnect(&self) -> tokio::io::Result<Logger> {
63 let addr = format!("{}:{}", self.host, self.port);
64 let new_stream = TcpStream::connect(addr).await.ok();
65 Ok(Logger {
66 application: self.application.clone(),
67 level: self.level.clone(),
68 stream: new_stream,
69 host: self.host.clone(),
70 port: self.port,
71 })
72 }
73
74 pub async fn init(
76 application: &str,
77 level: &str,
78 host: &str,
79 port: u16,
80 ) -> tokio::io::Result<Logger> {
81 let logger = Logger::new(application, level, host, port).await?;
82 Ok(logger)
83 }
84
85 pub fn time_now() -> String {
86 let now = Utc::now();
87 now.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string()
88 }
89
90 async fn send(&mut self, message: &Message) -> Result<(), Box<dyn std::error::Error>> {
91 let json = serde_json::to_string(&message).unwrap();
92 loop {
93 if self.stream.is_none() {
94 let addr = format!("{}:{}", self.host, self.port);
95 self.stream = Some(TcpStream::connect(addr).await?);
96 }
97
98 if let Some(stream) = &mut self.stream {
99 match stream.write_all(json.as_bytes()).await {
100 Ok(_) => {
101 break;
102 }
103 Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
104 error!("Error sending message: {}", e);
105 self.stream = None;
106 }
107 Err(e) => return Err(Box::new(e)),
108 }
109 }
110 }
111
112 Ok(())
113 }
114
115 pub async fn info(&mut self, message: &str) {
116 if self.level.to_string().to_uppercase() == "ERROR"
117 || self.level.to_string().to_uppercase() == "WARN"
118 {
119 return;
120 }
121 let message = Message::new(Self::time_now(), &self.application, "INFO", message);
122 let result = self.send(&message).await;
123 match result {
124 Ok(_) => {
125 info!("{}", message.message);
126 }
127 Err(e) => {
128 error!("Error sending message: {}", e);
129 }
130 }
131 }
132
133 pub async fn infof(&mut self, fmt_str: Arguments<'_>) {
134 if self.level.to_string().to_uppercase() == "ERROR"
135 || self.level.to_string().to_uppercase() == "WARN"
136 {
137 return;
138 }
139 self.info(&fmt_str.to_string()).await;
140 }
141
142 pub async fn error(&mut self, message: &str) {
143 let message = Message::new(Self::time_now(), &self.application, "ERROR", message);
144 let result = self.send(&message).await;
145 match result {
146 Ok(_) => {
147 error!("{}", message.message);
148 }
149 Err(e) => {
150 error!("Error sending message: {}", e);
151 }
152 }
153 }
154
155 pub async fn errorf(&mut self, fmt_str: Arguments<'_>) {
156 self.error(&fmt_str.to_string()).await;
157 }
158
159 pub async fn warn(&mut self, message: &str) {
160 if self.level.to_string().to_uppercase() == "ERROR" {
161 return;
162 }
163 let message = Message::new(Self::time_now(), &self.application, "WARN", message);
164 let result = self.send(&message).await;
165 match result {
166 Ok(_) => {
167 warn!("{}", message.message);
168 }
169 Err(e) => {
170 error!("Error sending message: {}", e);
171 }
172 }
173 }
174
175 pub async fn warnf(&mut self, fmt_str: Arguments<'_>) {
176 if self.level.to_string().to_uppercase() == "ERROR" {
177 return;
178 }
179 self.warn(&fmt_str.to_string()).await;
180 }
181
182 pub async fn debug(&mut self, message: &str) {
183 if self.level.to_string().to_uppercase() != "DEBUG" {
184 return;
185 }
186 let message = Message::new(Self::time_now(), &self.application, "DEBUG", message);
187 let result = self.send(&message).await;
188 match result {
189 Ok(_) => {
190 debug!("{}", message.message);
191 }
192 Err(e) => {
193 error!("Error sending message: {}", e);
194 }
195 }
196 }
197
198 pub async fn debugf(&mut self, fmt_str: Arguments<'_>) {
199 if self.level.to_string().to_uppercase() != "DEBUG" {
200 return;
201 }
202 self.debug(&fmt_str.to_string()).await;
203 }
204
205 pub async fn trace(&mut self, message: &str) {
206 let message = Message::new(Self::time_now(), &self.application, "TRACE", message);
207 let result = self.send(&message).await;
208 match result {
209 Ok(_) => {
210 trace!("{}", message.message);
211 }
212 Err(e) => {
213 error!("Error sending message: {}", e);
214 }
215 }
216 }
217
218 pub async fn tracef(&mut self, fmt_str: Arguments<'_>) {
219 self.trace(&fmt_str.to_string()).await;
220 }
221}
222
223#[cfg(test)]
225mod tests {
226 use super::*;
227 use once_cell::sync::Lazy;
228 use rand::Rng;
229 use serde_json::Value;
230 use std::net::SocketAddr;
231 use std::sync::Mutex;
232 use tokio::io::AsyncReadExt;
233 use tokio::net::TcpListener;
234 use tokio::sync::{mpsc, oneshot};
235
236 static TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
237
238 async fn start_mock_server(
239 port: u16,
240 ) -> (SocketAddr, mpsc::Receiver<String>, oneshot::Sender<()>) {
241 let (tx, rx) = mpsc::channel(100);
242 let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
243 let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
244 .await
245 .unwrap();
246 let addr = listener.local_addr().unwrap();
247
248 tokio::spawn(async move {
249 loop {
250 tokio::select! {
251 accept_result = listener.accept() => {
252 if let Ok((mut socket, _)) = accept_result {
253 let mut buf = vec![0; 1024];
254 match socket.read(&mut buf).await {
255 Ok(n) => {
256 if n == 0 { continue; }
257 let msg = String::from_utf8_lossy(&buf[..n]).to_string();
258 tx.send(msg).await.unwrap();
259 }
260 Err(e) => {
261 eprintln!("Error reading from socket: {}", e);
262 }
263 }
264 }
265 },
266 _ = &mut stop_rx => {
267 println!("Stopping server");
268 break;
269 },
270 }
271 }
272 });
273 (addr, rx, stop_tx)
274 }
275
276 #[tokio::test]
277 async fn test_message_creation() {
278 let msg = Message::new(
279 "12345".to_string(),
280 "TestApp",
281 "INFO",
282 "This is a test message",
283 );
284 assert_eq!(msg.application, "TestApp");
285 assert_eq!(msg.level, "INFO");
286 assert_eq!(msg.message, "This is a test message");
287 }
288
289 fn _get_random_port() -> u16 {
290 let mut rng = rand::thread_rng();
291 rng.gen_range(12300..12400)
292 }
293
294 #[tokio::test]
295 async fn test_logger_initialization() {
296 let _guard = TEST_MUTEX.lock().unwrap();
297 let (local_addr, _, stop_server) = start_mock_server(_get_random_port()).await;
298 let level = "INFO";
299
300 let logger = Logger::new(
301 "TestApp",
302 &level,
303 &local_addr.ip().to_string(),
304 local_addr.port(),
305 )
306 .await;
307 assert!(logger.is_ok());
308 stop_server.send(()).unwrap();
309 }
310
311 #[tokio::test]
312 async fn test_logger_send() {
313 let _guard = TEST_MUTEX.lock().unwrap();
314 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
315 let level = "INFO";
316
317 let mut logger = Logger::new(
318 "TestApp",
319 &level,
320 &local_addr.ip().to_string(),
321 local_addr.port(),
322 )
323 .await
324 .unwrap();
325 let now = Logger::time_now();
326 logger
327 .send(&Message::new(
328 now.clone(),
329 "TestApp",
330 "INFO",
331 "This is a test message",
332 ))
333 .await
334 .unwrap();
335
336 let rx = receiver.recv().await.unwrap();
337 let received_message: Value = serde_json::from_str(&rx).unwrap();
338 let received_timestamp = received_message["timestamp"].as_str().unwrap();
339 assert_eq!(received_timestamp, now);
340 assert_eq!(received_message["application"], "TestApp");
341 assert_eq!(received_message["level"], "INFO");
342 assert_eq!(received_message["message"], "This is a test message");
343
344 stop_server.send(()).unwrap();
345 }
346
347 #[tokio::test]
348 async fn test_logger_info() {
349 let _guard = TEST_MUTEX.lock().unwrap();
350 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
351 let level = "INFO";
352
353 let mut logger = Logger::new(
354 "TestApp",
355 &level,
356 &local_addr.ip().to_string(),
357 local_addr.port(),
358 )
359 .await
360 .unwrap();
361 logger.info("This is a test message").await;
362
363 let rx = receiver.recv().await.unwrap();
364
365 let received_message: Value = serde_json::from_str(&rx).unwrap();
366 assert_eq!(received_message["application"], "TestApp");
367 assert_eq!(received_message["level"], "INFO");
368 assert_eq!(received_message["message"], "This is a test message");
369 stop_server.send(()).unwrap();
370 }
371
372 #[tokio::test]
373 async fn test_logger_infof() {
374 let _guard = TEST_MUTEX.lock().unwrap();
375 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
376 let level = "INFO";
377
378 let mut logger = Logger::new(
379 "TestApp",
380 &level,
381 &local_addr.ip().to_string(),
382 local_addr.port(),
383 )
384 .await
385 .unwrap();
386 let test_arg = "dear tester";
387 logger
388 .infof(format_args!("This is a test message {}", test_arg))
389 .await;
390
391 let rx = receiver.recv().await.unwrap();
392 let received_message: Value = serde_json::from_str(&rx).unwrap();
393 assert_eq!(received_message["application"], "TestApp");
394 assert_eq!(received_message["level"], "INFO");
395 assert_eq!(
396 received_message["message"],
397 "This is a test message dear tester"
398 );
399 stop_server.send(()).unwrap();
400 }
401
402 #[tokio::test]
403 async fn test_logger_error() {
404 let _guard = TEST_MUTEX.lock().unwrap();
405 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
406 let level = "ERROR";
407
408 let mut logger = Logger::new(
409 "TestApp",
410 &level,
411 &local_addr.ip().to_string(),
412 local_addr.port(),
413 )
414 .await
415 .unwrap();
416 logger.error("This is a test message").await;
417
418 let rx = receiver.recv().await.unwrap();
419 let received_message: Value = serde_json::from_str(&rx).unwrap();
420 assert_eq!(received_message["application"], "TestApp");
421 assert_eq!(received_message["level"], "ERROR");
422 assert_eq!(received_message["message"], "This is a test message");
423 stop_server.send(()).unwrap();
424 }
425
426 #[tokio::test]
427 async fn test_logger_errorf() {
428 let _guard = TEST_MUTEX.lock().unwrap();
429 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
430 let level = "ERROR";
431
432 let mut logger = Logger::new(
433 "TestApp",
434 &level,
435 &local_addr.ip().to_string(),
436 local_addr.port(),
437 )
438 .await
439 .unwrap();
440 let test_arg = "dear tester";
441 logger
442 .errorf(format_args!("This is a test message {}", test_arg))
443 .await;
444
445 let rx = receiver.recv().await.unwrap();
446 let received_message: Value = serde_json::from_str(&rx).unwrap();
447 assert_eq!(received_message["application"], "TestApp");
448 assert_eq!(received_message["level"], "ERROR");
449 assert_eq!(
450 received_message["message"],
451 "This is a test message dear tester"
452 );
453 stop_server.send(()).unwrap();
454 }
455
456 #[tokio::test]
457 async fn test_logger_warn() {
458 let _guard = TEST_MUTEX.lock().unwrap();
459 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
460 let level = "WARN";
461
462 let mut logger = Logger::new(
463 "TestApp",
464 &level,
465 &local_addr.ip().to_string(),
466 local_addr.port(),
467 )
468 .await
469 .unwrap();
470 logger.warn("This is a test message").await;
471
472 let rx = receiver.recv().await.unwrap();
473 let received_message: Value = serde_json::from_str(&rx).unwrap();
474 assert_eq!(received_message["application"], "TestApp");
475 assert_eq!(received_message["level"], "WARN");
476 assert_eq!(received_message["message"], "This is a test message");
477 stop_server.send(()).unwrap();
478 }
479
480 #[tokio::test]
481 async fn test_logger_warnf() {
482 let _guard = TEST_MUTEX.lock().unwrap();
483 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
484 let level = "WARN";
485
486 let mut logger = Logger::new(
487 "TestApp",
488 &level,
489 &local_addr.ip().to_string(),
490 local_addr.port(),
491 )
492 .await
493 .unwrap();
494 let test_arg = "dear tester";
495 logger
496 .warnf(format_args!("This is a test message {}", test_arg))
497 .await;
498
499 let rx = receiver.recv().await.unwrap();
500 let received_message: Value = serde_json::from_str(&rx).unwrap();
501 assert_eq!(received_message["application"], "TestApp");
502 assert_eq!(received_message["level"], "WARN");
503 assert_eq!(
504 received_message["message"],
505 "This is a test message dear tester"
506 );
507 stop_server.send(()).unwrap();
508 }
509
510 #[tokio::test]
511 async fn test_logger_debug() {
512 let _guard = TEST_MUTEX.lock().unwrap();
513 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
514 let level = "DEBUG";
515
516 let mut logger = Logger::new(
517 "TestApp",
518 &level,
519 &local_addr.ip().to_string(),
520 local_addr.port(),
521 )
522 .await
523 .unwrap();
524 logger.debug("This is a test message").await;
525
526 let rx = receiver.recv().await.unwrap();
527 let received_message: Value = serde_json::from_str(&rx).unwrap();
528 assert_eq!(received_message["application"], "TestApp");
529 assert_eq!(received_message["level"], "DEBUG");
530 assert_eq!(received_message["message"], "This is a test message");
531 stop_server.send(()).unwrap();
532 }
533
534 #[tokio::test]
535 async fn test_logger_debugf() {
536 let _guard = TEST_MUTEX.lock().unwrap();
537 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
538 let level = "DEBUG";
539
540 let mut logger = Logger::new(
541 "TestApp",
542 &level,
543 &local_addr.ip().to_string(),
544 local_addr.port(),
545 )
546 .await
547 .unwrap();
548 let test_arg = "dear tester";
549 logger
550 .debugf(format_args!("This is a test message {}", test_arg))
551 .await;
552
553 let rx = receiver.recv().await.unwrap();
554 let received_message: Value = serde_json::from_str(&rx).unwrap();
555 assert_eq!(received_message["application"], "TestApp");
556 assert_eq!(received_message["level"], "DEBUG");
557 assert_eq!(
558 received_message["message"],
559 "This is a test message dear tester"
560 );
561 stop_server.send(()).unwrap();
562 }
563
564 #[tokio::test]
565 async fn test_logger_trace() {
566 let _guard = TEST_MUTEX.lock().unwrap();
567 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
568 let level = "TRACE";
569
570 let mut logger = Logger::new(
571 "TestApp",
572 &level,
573 &local_addr.ip().to_string(),
574 local_addr.port(),
575 )
576 .await
577 .unwrap();
578 logger.trace("This is a test message").await;
579
580 let rx = receiver.recv().await.unwrap();
581 let received_message: Value = serde_json::from_str(&rx).unwrap();
582 assert_eq!(received_message["application"], "TestApp");
583 assert_eq!(received_message["level"], "TRACE");
584 assert_eq!(received_message["message"], "This is a test message");
585 stop_server.send(()).unwrap();
586 }
587
588 #[tokio::test]
589 async fn test_logger_tracef() {
590 let _guard = TEST_MUTEX.lock().unwrap();
591 let (local_addr, mut receiver, stop_server) = start_mock_server(_get_random_port()).await;
592 let level = "TRACE";
593
594 let mut logger = Logger::new(
595 "TestApp",
596 &level,
597 &local_addr.ip().to_string(),
598 local_addr.port(),
599 )
600 .await
601 .unwrap();
602 let test_arg = "dear tester";
603 logger
604 .tracef(format_args!("This is a test message {}", test_arg))
605 .await;
606
607 let rx = receiver.recv().await.unwrap();
608 let received_message: Value = serde_json::from_str(&rx).unwrap();
609 assert_eq!(received_message["application"], "TestApp");
610 assert_eq!(received_message["level"], "TRACE");
611 assert_eq!(
612 received_message["message"],
613 "This is a test message dear tester"
614 );
615 stop_server.send(()).unwrap();
616 }
617
618 #[tokio::test]
619 async fn test_logger_reconnect() {
620 let _guard = TEST_MUTEX.lock().unwrap();
621 let (local_addr, _, stop_server) = start_mock_server(_get_random_port()).await;
622 let level = "INFO";
623
624 let logger = Logger::new(
625 "TestApp",
626 &level,
627 &local_addr.ip().to_string(),
628 local_addr.port(),
629 )
630 .await
631 .unwrap();
632 let mut logger_reconnected = logger.reconnect().await.unwrap();
633 logger_reconnected.info("This is a test message").await;
634 stop_server.send(()).unwrap();
635 }
636}