1use chrono::{DateTime, Utc};
113use colored::Colorize;
114use lazy_static::lazy_static;
115use rdkafka::config::ClientConfig;
116use rdkafka::producer::{FutureProducer, FutureRecord};
117use serde::{Deserialize, Serialize};
118use serde_json::Value;
119use std::{
120 collections::HashMap,
121 fmt,
122 io::{self, Write},
123 sync::{Arc, Mutex},
124 time::Duration,
125};
126
127pub use chrono;
129pub use colored;
130pub use serde;
131pub use serde_json;
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
135#[serde(rename_all = "lowercase")]
136pub enum Level {
137 Trace,
138 Debug,
139 Info,
140 Warn,
141 Error,
142 Fatal,
143 Panic,
144}
145
146impl fmt::Display for Level {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 match self {
149 Level::Trace => write!(f, "TRACE"),
150 Level::Debug => write!(f, "DEBUG"),
151 Level::Info => write!(f, "INFO"),
152 Level::Warn => write!(f, "WARN"),
153 Level::Error => write!(f, "ERROR"),
154 Level::Fatal => write!(f, "FATAL"),
155 Level::Panic => write!(f, "PANIC"),
156 }
157 }
158}
159
160impl Level {
161 pub fn from_str(level: &str) -> Option<Level> {
163 match level.to_lowercase().as_str() {
164 "trace" => Some(Level::Trace),
165 "debug" => Some(Level::Debug),
166 "info" => Some(Level::Info),
167 "warn" | "warning" => Some(Level::Warn),
168 "error" => Some(Level::Error),
169 "fatal" => Some(Level::Fatal),
170 "panic" => Some(Level::Panic),
171 _ => None,
172 }
173 }
174}
175
176pub type Fields = HashMap<String, Value>;
178
179#[derive(Debug, Clone, Serialize)]
181pub struct Entry<'a> {
182 pub timestamp: DateTime<Utc>,
183 pub level: Level,
184 pub message: String,
185 pub fields: Fields,
186 #[serde(skip)]
187 pub logger: &'a Logger,
188}
189
190pub trait Hook: Send + Sync {
192 fn levels(&self) -> Vec<Level>;
194
195 fn fire(&self, entry: &Entry) -> Result<(), Box<dyn std::error::Error>>;
197
198 #[allow(unused_variables)]
200 fn fire_async<'a>(
201 &'a self,
202 entry: &'a Entry,
203 ) -> std::pin::Pin<
204 Box<dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error>>> + Send + 'a>,
205 > {
206 Box::pin(async move { self.fire(entry) })
207 }
208}
209
210pub trait Formatter: Send + Sync {
212 fn format(&self, entry: &Entry) -> Result<Vec<u8>, Box<dyn std::error::Error>>;
213}
214
215#[derive(Debug, Clone)]
217pub struct TextFormatter {
218 timestamp_format: String,
219 colors: bool,
220 full_timestamp: bool,
221}
222
223impl Default for TextFormatter {
224 fn default() -> Self {
225 Self {
226 timestamp_format: "%Y-%m-%dT%H:%M:%S%.3fZ".to_string(),
227 colors: true,
228 full_timestamp: true,
229 }
230 }
231}
232
233impl TextFormatter {
234 pub fn new() -> Self {
235 Self::default()
236 }
237
238 pub fn timestamp_format(mut self, format: &str) -> Self {
239 self.timestamp_format = format.to_string();
240 self
241 }
242
243 pub fn colors(mut self, enabled: bool) -> Self {
244 self.colors = enabled;
245 self
246 }
247
248 pub fn full_timestamp(mut self, enabled: bool) -> Self {
249 self.full_timestamp = enabled;
250 self
251 }
252
253 pub fn build(self) -> Self {
254 self
255 }
256}
257
258impl Formatter for TextFormatter {
259 fn format(&self, entry: &Entry) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
260 let mut output = Vec::new();
261
262 let timestamp = if self.full_timestamp {
264 entry.timestamp.format(&self.timestamp_format).to_string()
265 } else {
266 entry.timestamp.format("%H:%M:%S").to_string()
267 };
268
269 let level = if self.colors {
271 match entry.level {
272 Level::Trace => entry.level.to_string().white(),
273 Level::Debug => entry.level.to_string().blue(),
274 Level::Info => entry.level.to_string().green(),
275 Level::Warn => entry.level.to_string().yellow(),
276 Level::Error => entry.level.to_string().red(),
277 Level::Fatal => entry.level.to_string().red().bold(),
278 Level::Panic => entry.level.to_string().red().bold(),
279 }
280 .to_string()
281 } else {
282 entry.level.to_string()
283 };
284
285 write!(output, "[{}] [{}] {}", timestamp, level, entry.message)?;
287
288 if !entry.fields.is_empty() {
290 for (key, value) in &entry.fields {
291 write!(output, " {}={}", key, value)?;
292 }
293 }
294
295 write!(output, "\n")?;
296 Ok(output)
297 }
298}
299
300#[derive(Debug, Clone)]
302pub struct JSONFormatter {
303 pretty: bool,
304}
305
306impl JSONFormatter {
307 pub fn new() -> Self {
308 Self { pretty: false }
309 }
310
311 pub fn pretty(mut self, enabled: bool) -> Self {
312 self.pretty = enabled;
313 self
314 }
315}
316
317impl Default for JSONFormatter {
318 fn default() -> Self {
319 Self { pretty: false }
320 }
321}
322
323impl Formatter for JSONFormatter {
324 fn format(&self, entry: &Entry) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
325 let mut output = Vec::new();
326 if self.pretty {
327 serde_json::to_writer_pretty(&mut output, &entry)?;
328 } else {
329 serde_json::to_writer(&mut output, &entry)?;
330 }
331 output.extend_from_slice(b"\n");
332 Ok(output)
333 }
334}
335
336pub struct KafkaHook {
338 producer: FutureProducer,
339 topic: String,
340 key_field: Option<String>,
341}
342
343impl KafkaHook {
344 pub fn new(bootstrap_servers: &str, topic: String) -> Result<Self, Box<dyn std::error::Error>> {
346 let producer: FutureProducer = ClientConfig::new()
347 .set("bootstrap.servers", bootstrap_servers)
348 .create()?;
349
350 Ok(KafkaHook {
351 producer,
352 topic,
353 key_field: None,
354 })
355 }
356
357 pub fn with_key_field(mut self, key_field: String) -> Self {
359 self.key_field = Some(key_field);
360 self
361 }
362
363 fn get_key_from_fields(&self, fields: &Fields) -> Option<String> {
364 self.key_field.as_ref().and_then(|key_field| {
365 fields
366 .get(key_field)
367 .and_then(|value| value.as_str().map(|s| s.to_string()))
368 })
369 }
370}
371
372impl Hook for KafkaHook {
373 fn levels(&self) -> Vec<Level> {
374 vec![
375 Level::Trace,
376 Level::Debug,
377 Level::Info,
378 Level::Warn,
379 Level::Error,
380 Level::Fatal,
381 Level::Panic,
382 ]
383 }
384
385 fn fire(&self, _entry: &Entry) -> Result<(), Box<dyn std::error::Error>> {
386 Err("KafkaHook requires an async runtime. Please use fire_async or ensure you're in an async context.".into())
388 }
389
390 fn fire_async<'a>(
391 &'a self,
392 entry: &'a Entry,
393 ) -> std::pin::Pin<
394 Box<dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error>>> + Send + 'a>,
395 > {
396 Box::pin(async move {
397 let payload = serde_json::to_string(&entry)?;
398 let key = self.get_key_from_fields(&entry.fields);
399
400 let mut record = FutureRecord::to(&self.topic).payload(payload.as_bytes());
401
402 if let Some(ref key) = key {
403 record = record.key(key);
404 }
405
406 self.producer
407 .send(record, Duration::from_secs(0))
408 .await
409 .map_err(|(err, _)| err)?;
410 Ok(())
411 })
412 }
413}
414
415pub struct Logger {
417 level: Level,
418 formatter: Box<dyn Formatter>,
419 hooks: Vec<Box<dyn Hook>>,
420 output: Arc<Mutex<Box<dyn Write + Send>>>,
421}
422
423impl fmt::Debug for Logger {
424 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
425 f.debug_struct("Logger")
426 .field("level", &self.level)
427 .field("hooks_count", &self.hooks.len())
428 .finish()
429 }
430}
431
432impl Clone for Logger {
433 fn clone(&self) -> Self {
434 Self {
435 level: self.level,
436 formatter: Box::new(TextFormatter::default()),
437 hooks: Vec::new(),
438 output: Arc::clone(&self.output),
439 }
440 }
441}
442
443impl Logger {
444 pub fn new() -> Self {
445 Self {
446 level: Level::Info,
447 formatter: Box::new(TextFormatter::default()),
448 hooks: Vec::new(),
449 output: Arc::new(Mutex::new(Box::new(io::stdout()))),
450 }
451 }
452
453 pub fn level(mut self, level: Level) -> Self {
454 self.level = level;
455 self
456 }
457
458 pub fn formatter<F: Formatter + 'static>(mut self, formatter: F) -> Self {
459 self.formatter = Box::new(formatter);
460 self
461 }
462
463 pub fn add_hook<H: Hook + 'static>(mut self, hook: H) -> Self {
464 self.hooks.push(Box::new(hook));
465 self
466 }
467
468 pub fn output<W: Write + Send + 'static>(mut self, output: W) -> Self {
469 self.output = Arc::new(Mutex::new(Box::new(output)));
470 self
471 }
472
473 pub fn build(self) -> Arc<Self> {
474 Arc::new(self)
475 }
476
477 pub async fn log_async(
479 &self,
480 level: Level,
481 msg: &str,
482 fields: Fields,
483 ) -> Result<(), Box<dyn std::error::Error>> {
484 if level < self.level {
485 return Ok(());
486 }
487
488 let entry = Entry {
489 message: msg.to_string(),
490 level,
491 timestamp: chrono::Utc::now(),
492 fields,
493 logger: self,
494 };
495
496 let formatted = self.formatter.format(&entry)?;
498 {
499 let mut output = self.output.lock().unwrap();
500 output.write_all(&formatted)?;
501 output.flush()?;
502 }
503
504 for hook in &self.hooks {
506 if hook.levels().contains(&level) {
507 if let Err(e) = hook.fire_async(&entry).await {
508 eprintln!("Hook failed: {}", e);
509 }
510 }
511 }
512
513 Ok(())
514 }
515
516 pub fn log(
518 &self,
519 level: Level,
520 msg: &str,
521 fields: Fields,
522 ) -> Result<(), Box<dyn std::error::Error>> {
523 if level < self.level {
524 return Ok(());
525 }
526
527 let entry = Entry {
528 message: msg.to_string(),
529 level,
530 timestamp: chrono::Utc::now(),
531 fields,
532 logger: self,
533 };
534
535 let formatted = self.formatter.format(&entry)?;
537 {
538 let mut output = self.output.lock().unwrap();
539 output.write_all(&formatted)?;
540 output.flush()?;
541 }
542
543 for hook in &self.hooks {
545 if hook.levels().contains(&level) {
546 if let Ok(rt) = tokio::runtime::Runtime::new() {
548 rt.block_on(async {
549 if let Err(e) = hook.fire_async(&entry).await {
550 eprintln!("Hook failed: {}", e);
551 }
552 });
553 } else if let Err(e) = hook.fire(&entry) {
554 eprintln!("Hook failed: {}", e);
555 }
556 }
557 }
558
559 Ok(())
560 }
561
562 pub fn with_fields(&self, fields: Fields) -> EntryBuilder {
563 EntryBuilder {
564 logger: self,
565 fields,
566 }
567 }
568}
569
570pub struct EntryBuilder<'a> {
572 logger: &'a Logger,
573 fields: Fields,
574}
575
576impl<'a> Clone for EntryBuilder<'a> {
577 fn clone(&self) -> Self {
578 Self {
579 logger: self.logger,
580 fields: self.fields.clone(),
581 }
582 }
583}
584
585impl<'a> EntryBuilder<'a> {
586 pub fn with_field<K, V>(mut self, key: K, value: V) -> Self
587 where
588 K: Into<String>,
589 V: Serialize,
590 {
591 self.fields.insert(
592 key.into(),
593 serde_json::to_value(value).unwrap_or(Value::Null),
594 );
595 self
596 }
597
598 pub fn with_time(self, time: DateTime<Utc>) -> Self {
599 self.with_field("time", time.to_rfc3339())
600 }
601
602 pub fn with_error<E: std::error::Error>(self, err: &E) -> Self {
603 self.with_field("error", err.to_string())
604 }
605
606 pub fn with_fields_map<K, V>(mut self, fields: impl IntoIterator<Item = (K, V)>) -> Self
607 where
608 K: Into<String>,
609 V: serde::Serialize,
610 {
611 for (key, value) in fields {
612 if let Ok(value) = serde_json::to_value(value) {
613 self.fields.insert(key.into(), value);
614 }
615 }
616 self
617 }
618
619 pub fn trace<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
620 self.logger.log(Level::Trace, &msg.into(), self.fields)
621 }
622
623 pub fn debug<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
624 self.logger.log(Level::Debug, &msg.into(), self.fields)
625 }
626
627 pub fn info<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
628 self.logger.log(Level::Info, &msg.into(), self.fields)
629 }
630
631 pub fn warn<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
632 self.logger.log(Level::Warn, &msg.into(), self.fields)
633 }
634
635 pub fn error<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
636 self.logger.log(Level::Error, &msg.into(), self.fields)
637 }
638
639 pub fn fatal<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
640 self.logger.log(Level::Fatal, &msg.into(), self.fields)
641 }
642
643 pub fn panic<M: Into<String>>(self, msg: M) -> Result<(), Box<dyn std::error::Error>> {
644 self.logger.log(Level::Panic, &msg.into(), self.fields)
645 }
646}
647
648lazy_static! {
650 static ref GLOBAL_LOGGER: Arc<Logger> = Logger::new().build();
651}
652
653pub fn set_level(level: Level) {
655 let logger = GLOBAL_LOGGER.clone();
656 if let Some(logger) = Arc::get_mut(&mut logger.clone()) {
657 logger.level = level;
658 }
659}
660
661pub fn with_fields<'a>(fields: Fields) -> EntryBuilder<'a> {
662 GLOBAL_LOGGER.with_fields(fields)
663}
664
665pub fn with_error<E: std::error::Error>(err: &E) -> EntryBuilder<'static> {
666 GLOBAL_LOGGER.with_fields(Fields::new()).with_error(err)
667}
668
669pub fn with_time(time: DateTime<Utc>) -> EntryBuilder<'static> {
670 GLOBAL_LOGGER.with_fields(Fields::new()).with_time(time)
671}
672
673pub fn parse_level(level: &str) -> Option<Level> {
674 Level::from_str(level)
675}
676
677#[macro_export]
679macro_rules! with_fields {
680 ($($key:expr => $value:expr),* $(,)?) => {{
681 let mut fields = ::std::collections::HashMap::new();
682 $(
683 fields.insert($key.to_string(), $crate::serde_json::to_value($value).unwrap_or($crate::serde_json::Value::Null));
684 )*
685 $crate::with_fields(fields)
686 }};
687}
688
689#[macro_export]
690macro_rules! trace {
691 ($msg:expr) => {
692 $crate::with_fields!()
693 .trace($msg)
694 .expect("Failed to log trace message")
695 };
696}
697
698#[macro_export]
699macro_rules! debug {
700 ($msg:expr) => {
701 $crate::with_fields!()
702 .debug($msg)
703 .expect("Failed to log debug message")
704 };
705}
706
707#[macro_export]
708macro_rules! info {
709 ($msg:expr) => {
710 $crate::with_fields!()
711 .info($msg)
712 .expect("Failed to log info message")
713 };
714}
715
716#[macro_export]
717macro_rules! warn {
718 ($msg:expr) => {
719 $crate::with_fields!()
720 .warn($msg)
721 .expect("Failed to log warning message")
722 };
723}
724
725#[macro_export]
726macro_rules! error {
727 ($msg:expr) => {
728 $crate::with_fields!()
729 .error($msg)
730 .expect("Failed to log error message")
731 };
732}
733
734#[macro_export]
735macro_rules! fatal {
736 ($msg:expr) => {
737 $crate::with_fields!()
738 .fatal($msg)
739 .expect("Failed to log fatal message")
740 };
741}
742
743#[macro_export]
744macro_rules! panic {
745 ($msg:expr) => {
746 $crate::with_fields!()
747 .panic($msg)
748 .expect("Failed to log panic message")
749 };
750}
751
752#[cfg(test)]
754mod test {
755 use super::*;
756 use std::io;
757
758 #[derive(Debug)]
760 struct TestWriter {
761 buffer: Arc<Mutex<Vec<u8>>>,
762 }
763
764 impl Default for TestWriter {
765 fn default() -> Self {
766 Self {
767 buffer: Arc::new(Mutex::new(Vec::new())),
768 }
769 }
770 }
771
772 impl Write for TestWriter {
773 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
774 self.buffer.lock().unwrap().extend_from_slice(buf);
775 Ok(buf.len())
776 }
777
778 fn flush(&mut self) -> io::Result<()> {
779 Ok(())
780 }
781 }
782
783 impl Clone for TestWriter {
784 fn clone(&self) -> Self {
785 Self {
786 buffer: Arc::clone(&self.buffer),
787 }
788 }
789 }
790
791 struct TestHook {
793 called: Arc<Mutex<bool>>,
794 }
795
796 impl TestHook {
797 fn new() -> (Self, Arc<Mutex<bool>>) {
798 let called = Arc::new(Mutex::new(false));
799 (
800 Self {
801 called: Arc::clone(&called),
802 },
803 called,
804 )
805 }
806 }
807
808 impl Hook for TestHook {
809 fn levels(&self) -> Vec<Level> {
810 vec![Level::Info]
811 }
812
813 fn fire(&self, _entry: &Entry) -> Result<(), Box<dyn std::error::Error>> {
814 *self.called.lock().unwrap() = true;
815 Ok(())
816 }
817
818 fn fire_async<'a>(
819 &'a self,
820 entry: &'a Entry,
821 ) -> std::pin::Pin<
822 Box<
823 dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>
824 + Send
825 + 'a,
826 >,
827 > {
828 Box::pin(async move { self.fire(entry) })
829 }
830 }
831
832 fn create_test_logger() -> (Arc<Logger>, TestWriter) {
834 let writer = TestWriter::default();
835 let logger = Logger::new()
836 .formatter(TextFormatter::default().colors(false))
837 .output(Box::new(writer.clone()))
838 .build();
839 (logger, writer)
840 }
841
842 #[test]
843 fn test_basic_logging() {
844 let (logger, writer) = create_test_logger();
845 logger
846 .log(Level::Info, "test message", Fields::new())
847 .unwrap();
848 let output = String::from_utf8(writer.buffer.lock().unwrap().clone()).unwrap();
849 assert!(output.contains("test message"));
850 }
851
852 #[test]
853 fn test_json_formatter() {
854 let writer = TestWriter::default();
855 let logger = Logger::new()
856 .formatter(JSONFormatter::default())
857 .output(Box::new(writer.clone()))
858 .build();
859
860 let mut fields = Fields::new();
861 fields.insert("key".to_string(), serde_json::json!("value"));
862 logger.log(Level::Info, "test message", fields).unwrap();
863
864 let output = String::from_utf8(writer.buffer.lock().unwrap().clone()).unwrap();
865 let parsed: serde_json::Value = serde_json::from_str(&output).unwrap();
866 assert_eq!(parsed["message"], "test message");
867 assert_eq!(parsed["fields"]["key"], "value");
868 }
869
870 #[test]
871 fn test_hooks() {
872 let (hook, called) = TestHook::new();
873 let logger = Logger::new().add_hook(hook).build();
874
875 logger
876 .log(Level::Info, "test message", Fields::new())
877 .unwrap();
878 assert!(*called.lock().unwrap());
879 }
880
881 #[test]
882 fn test_with_error_and_time() {
883 let (logger, writer) = create_test_logger();
884 let error = io::Error::new(io::ErrorKind::Other, "test error");
885 let time = chrono::Utc::now();
886
887 logger
888 .with_fields(Fields::new())
889 .with_error(&error)
890 .with_time(time)
891 .info("test message")
892 .unwrap();
893
894 let output = String::from_utf8(writer.buffer.lock().unwrap().clone()).unwrap();
895 assert!(output.contains("test message"));
896 assert!(output.contains("test error"));
897 }
898
899 #[test]
900 fn test_with_fields_map() {
901 let (logger, writer) = create_test_logger();
902 let mut fields = std::collections::HashMap::new();
903 fields.insert("key".to_string(), "value");
904
905 logger
906 .with_fields(Fields::new())
907 .with_fields_map(fields)
908 .info("test message")
909 .unwrap();
910
911 let output = String::from_utf8(writer.buffer.lock().unwrap().clone()).unwrap();
912 assert!(output.contains("test message"));
913 assert!(output.contains("key"));
914 assert!(output.contains("value"));
915 }
916
917 #[test]
918 fn test_level_parsing() {
919 assert_eq!(Level::from_str("INFO"), Some(Level::Info));
920 assert_eq!(Level::from_str("invalid"), None);
921 }
922
923 #[test]
924 fn test_all_log_levels() {
925 let (_logger, writer) = create_test_logger();
926 let logger = Logger::new()
928 .level(Level::Trace)
929 .formatter(TextFormatter::default().colors(false))
930 .output(Box::new(writer.clone()))
931 .build();
932
933 logger
935 .log(Level::Trace, "trace message", Fields::new())
936 .unwrap();
937 logger
938 .log(Level::Debug, "debug message", Fields::new())
939 .unwrap();
940 logger
941 .log(Level::Info, "info message", Fields::new())
942 .unwrap();
943 logger
944 .log(Level::Warn, "warn message", Fields::new())
945 .unwrap();
946 logger
947 .log(Level::Error, "error message", Fields::new())
948 .unwrap();
949 logger
950 .log(Level::Fatal, "fatal message", Fields::new())
951 .unwrap();
952 logger
953 .log(Level::Panic, "panic message", Fields::new())
954 .unwrap();
955
956 let output = String::from_utf8(writer.buffer.lock().unwrap().clone()).unwrap();
957
958 assert!(output.contains("trace message"));
960 assert!(output.contains("debug message"));
961 assert!(output.contains("info message"));
962 assert!(output.contains("warn message"));
963 assert!(output.contains("error message"));
964 assert!(output.contains("fatal message"));
965 assert!(output.contains("panic message"));
966
967 assert!(output.contains("TRACE"));
969 assert!(output.contains("DEBUG"));
970 assert!(output.contains("INFO"));
971 assert!(output.contains("WARN"));
972 assert!(output.contains("ERROR"));
973 assert!(output.contains("FATAL"));
974 assert!(output.contains("PANIC"));
975 }
976}