tracing_loki_layer/
lib.rs

1use std::collections::HashMap;
2use std::sync::mpsc::{Receiver, Sender};
3use std::sync::Mutex;
4use std::thread;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use serde::Serialize;
8use tracing::field::Field;
9use tracing::Event;
10use tracing_subscriber::field::Visit;
11use tracing_subscriber::layer::Context;
12use tracing_subscriber::Layer;
13
14pub struct LokiLayer {
15    default_values: HashMap<String, String>,
16    sender: Mutex<Sender<LokiRequest>>,
17}
18
19#[derive(Debug, Serialize)]
20struct LokiStream {
21    stream: HashMap<String, String>,
22    values: Vec<[String; 2]>,
23}
24
25#[derive(Debug, Serialize)]
26struct LokiRequest {
27    streams: Vec<LokiStream>,
28}
29
30impl LokiLayer {
31    pub fn new<S: AsRef<str>>(url: S, default_values: HashMap<String, String>) -> Self {
32        let (sender, receiver) = std::sync::mpsc::channel();
33        let result = Self {
34            default_values,
35            sender: Mutex::new(sender),
36        };
37        Self::setup_loki_sender(url, receiver);
38        result
39    }
40
41    fn setup_loki_sender<S: AsRef<str>>(url: S, receiver: Receiver<LokiRequest>) {
42        let url = format!("{}/loki/api/v1/push", url.as_ref());
43        let client = reqwest::blocking::Client::new();
44        thread::spawn(move || loop {
45            match receiver.recv() {
46                Ok(request) => match client.post(url.clone()).json(&request).send() {
47                    Ok(_) => {}
48                    Err(e) => {
49                        eprintln!("Error send to loki {:?}", e);
50                    }
51                },
52                Err(e) => {
53                    eprintln!("Shutdown loki sender {:?}", e);
54                    break;
55                }
56            };
57        });
58    }
59}
60
61#[derive(Default)]
62pub struct ValuesVisitor {
63    values: HashMap<String, String>,
64}
65
66impl Visit for ValuesVisitor {
67    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
68        if field.name() != "message" {
69            self.values.insert(
70                field.name().to_string().replace('.', "_"),
71                format!("{:?}", value),
72            );
73        }
74    }
75}
76
77#[derive(Default)]
78pub struct ValueVisitor {
79    name: String,
80    result: Option<String>,
81}
82
83impl ValueVisitor {
84    pub fn new<S: AsRef<str>>(name: S) -> Self {
85        Self {
86            name: name.as_ref().to_string(),
87            result: None,
88        }
89    }
90}
91
92impl Visit for ValueVisitor {
93    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
94        if field.name() == self.name {
95            self.result = Some(format!("{:?}", value));
96        }
97    }
98}
99
100impl<S: tracing::Subscriber> Layer<S> for LokiLayer {
101    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
102        let mut values_visitor = ValuesVisitor::default();
103        event.record(&mut values_visitor);
104        let mut message_visitor = ValueVisitor::new("message");
105        event.record(&mut message_visitor);
106        let mut values = values_visitor.values;
107        values.insert(
108            "level".to_string(),
109            event.metadata().level().to_string().to_lowercase(),
110        );
111        for (k, v) in self.default_values.iter() {
112            values.insert(k.to_owned(), v.to_owned());
113        }
114
115        if let Some(file) = event.metadata().file() {
116            values.insert("file".to_owned(), file.to_owned());
117        }
118
119        if let Some(line) = event.metadata().line() {
120            values.insert("line".to_owned(), line.to_string());
121        }
122
123        if let Some(module) = event.metadata().module_path() {
124            values.insert("module".to_owned(), module.to_string());
125        }
126
127        let start = SystemTime::now();
128        let since_start = start
129            .duration_since(UNIX_EPOCH)
130            .expect("cant get duration since");
131        let time_ns = since_start.as_nanos().to_string();
132
133        let request = LokiRequest {
134            streams: vec![LokiStream {
135                stream: values,
136                values: vec![[
137                    time_ns,
138                    message_visitor.result.unwrap_or_else(|| "".to_string()),
139                ]],
140            }],
141        };
142
143        // disable recursion
144        if event.metadata().name().to_string().contains("hyper") {
145            return;
146        }
147
148        match self.sender.lock() {
149            Ok(sender) => match sender.send(request) {
150                Ok(_) => {}
151                Err(e) => {
152                    eprintln!("Loki sender thread is closed {:?}", e);
153                }
154            },
155            Err(e) => {
156                eprintln!("Cannot lock mutex for send to loki {:?}", e);
157            }
158        }
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use std::time::Duration;
165
166    use httpmock::MockServer;
167    use tracing::error;
168    use tracing_subscriber::layer::SubscriberExt;
169    use tracing_subscriber::Registry;
170
171    use crate::LokiLayer;
172
173    #[test]
174    pub fn should_send_log() {
175        let server = MockServer::start();
176        let loki_layer = LokiLayer::new(server.base_url(), Default::default());
177        let subscriber = Registry::default().with(loki_layer);
178        let _guard = tracing::subscriber::set_default(subscriber);
179        let http_server_mock = server.mock(|when, _then| {
180            when.method(httpmock::Method::POST)
181                .path("/loki/api/v1/push");
182        });
183
184        error!("test");
185        std::thread::sleep(Duration::from_secs(1));
186        http_server_mock.assert();
187    }
188}