1use std::time::Duration;
2
3use log::{LevelFilter, Log};
4use rdkafka::{
5 producer::{BaseProducer, BaseRecord, Producer},
6 ClientConfig,
7};
8use serde::{Deserialize, Serialize};
9
10pub fn init_from_env() {
12 let server = std::env::var("KAFKA_BROKER").expect("Failed to load KAFKA_BROKER");
13 let topic = std::env::var("KAFKA_TOPIC_LOG").expect("Failed to load KAFKA_TOPIC_LOG");
14 let filter = std::env::var("RUST_LOG")
15 .map(|level| level.parse().unwrap_or(LevelFilter::Info))
16 .unwrap_or(LevelFilter::Info);
17
18 init(&server, filter, &topic);
19}
20
21pub fn init(server: &str, filter: LevelFilter, topic: &str) {
32 log::set_boxed_logger(Box::new(Logger::init(server, filter, topic)))
33 .expect("Failed to set Logger");
34 log::set_max_level(filter);
35}
36
37pub fn shutdown() {
39 log::logger().flush();
40}
41
42#[derive(Serialize, Deserialize, Debug)]
44pub struct Record {
45 pub level: String,
46 pub target: String,
47 pub message: String,
48}
49
50struct Logger {
51 producer: BaseProducer,
52 filter: LevelFilter,
53 topic: String,
54}
55
56impl Logger {
57 fn init(server: &str, filter: LevelFilter, topic: &str) -> Self {
58 let producer: BaseProducer = ClientConfig::new()
59 .set("bootstrap.servers", server)
60 .set("message.timeout.ms", "5000")
61 .create()
62 .expect("Failed to create logger");
63
64 Self {
65 producer,
66 filter,
67 topic: topic.into(),
68 }
69 }
70}
71
72impl Drop for Logger {
73 fn drop(&mut self) {
74 self.flush();
75 }
76}
77
78impl Log for Logger {
79 fn enabled(&self, metadata: &log::Metadata) -> bool {
80 self
81 .filter
82 .to_level()
83 .map(|level| metadata.level() <= level)
84 .unwrap_or(false)
85 }
86
87 fn flush(&self) {
88 self
89 .producer
90 .flush(Duration::from_secs(2))
91 .expect("Failed to flush");
92 }
93
94 fn log(&self, record: &log::Record) {
95 if self.enabled(record.metadata()) && !record.target().starts_with("rdkafka") {
96 let _ = self.producer.send(
97 BaseRecord::to(&self.topic)
98 .payload(
99 &serde_json::to_string(&Record {
100 level: record.level().to_string(),
101 target: record.target().to_string(),
102 message: record.args().to_string(),
103 })
104 .unwrap(),
105 )
106 .key(&()),
107 );
108 }
109 }
110}
111
#[cfg(test)]
mod test {
    use crate::{init_from_env, shutdown};

    /// Smoke test: the logger can be initialised from environment variables
    /// and flushed again without panicking.
    #[test]
    fn test_env_init() {
        let settings = [("KAFKA_BROKER", "localhost"), ("KAFKA_TOPIC_LOG", "logging")];
        for &(key, value) in settings.iter() {
            std::env::set_var(key, value);
        }

        init_from_env();
        shutdown();
    }
}