kafka_logger/
lib.rs

1use std::time::Duration;
2
3use log::{LevelFilter, Log};
4use rdkafka::{
5  producer::{BaseProducer, BaseRecord, Producer},
6  ClientConfig,
7};
8use serde::{Deserialize, Serialize};
9
10/// Same as init but uses KAFKA_BROKER, KAFKA_TOPIC_LOG and RUST_LOG environment variables as parameters
11pub fn init_from_env() {
12  let server = std::env::var("KAFKA_BROKER").expect("Failed to load KAFKA_BROKER");
13  let topic = std::env::var("KAFKA_TOPIC_LOG").expect("Failed to load KAFKA_TOPIC_LOG");
14  let filter = std::env::var("RUST_LOG")
15    .map(|level| level.parse().unwrap_or(LevelFilter::Info))
16    .unwrap_or(LevelFilter::Info);
17
18  init(&server, filter, &topic);
19}
20
21/// Init the kafka logger
22///
23/// # Examples
24/// ```
25/// kafka_logger::init("localhost:9093", log::LevelFilter::Debug, "logging");
26///
27/// log::info!("test");
28///
29/// kafka_logger::shutdown();  
30/// ```
31pub fn init(server: &str, filter: LevelFilter, topic: &str) {
32  log::set_boxed_logger(Box::new(Logger::init(server, filter, topic)))
33    .expect("Failed to set Logger");
34  log::set_max_level(filter);
35}
36
37/// Waits for all remaining log messages to be send to kafka
38pub fn shutdown() {
39  log::logger().flush();
40}
41
/// Format of log messages sent to kafka (serialized as JSON)
#[derive(Serialize, Deserialize, Debug)]
pub struct Record {
  /// Log level name, e.g. "INFO" or "ERROR" (from `log::Level`'s Display).
  pub level: String,
  /// Target of the record, typically the originating module path.
  pub target: String,
  /// The fully formatted log message text.
  pub message: String,
}
49
/// `log::Log` implementation that serializes each record to JSON and
/// publishes it to a Kafka topic via an rdkafka `BaseProducer`.
struct Logger {
  /// Producer used to enqueue messages; poll-driven, flushed on shutdown/drop.
  producer: BaseProducer,
  /// Maximum level forwarded to Kafka; anything above it is dropped.
  filter: LevelFilter,
  /// Kafka topic every record is published to.
  topic: String,
}
55
56impl Logger {
57  fn init(server: &str, filter: LevelFilter, topic: &str) -> Self {
58    let producer: BaseProducer = ClientConfig::new()
59      .set("bootstrap.servers", server)
60      .set("message.timeout.ms", "5000")
61      .create()
62      .expect("Failed to create logger");
63
64    Self {
65      producer,
66      filter,
67      topic: topic.into(),
68    }
69  }
70}
71
72impl Drop for Logger {
73  fn drop(&mut self) {
74    self.flush();
75  }
76}
77
78impl Log for Logger {
79  fn enabled(&self, metadata: &log::Metadata) -> bool {
80    self
81      .filter
82      .to_level()
83      .map(|level| metadata.level() <= level)
84      .unwrap_or(false)
85  }
86
87  fn flush(&self) {
88    self
89      .producer
90      .flush(Duration::from_secs(2))
91      .expect("Failed to flush");
92  }
93
94  fn log(&self, record: &log::Record) {
95    if self.enabled(record.metadata()) && !record.target().starts_with("rdkafka") {
96      let _ = self.producer.send(
97        BaseRecord::to(&self.topic)
98          .payload(
99            &serde_json::to_string(&Record {
100              level: record.level().to_string(),
101              target: record.target().to_string(),
102              message: record.args().to_string(),
103            })
104            .unwrap(),
105          )
106          .key(&()),
107      );
108    }
109  }
110}
111
#[cfg(test)]
mod test {
  use crate::{init_from_env, shutdown};

  /// Smoke test: the logger initializes from environment variables and
  /// shuts down without panicking (enqueueing needs no live broker).
  #[test]
  fn test_env_init() {
    for (key, value) in [("KAFKA_BROKER", "localhost"), ("KAFKA_TOPIC_LOG", "logging")] {
      std::env::set_var(key, value);
    }

    init_from_env();
    shutdown();
  }
}