tracing_loki_layer/
lib.rs1use std::collections::HashMap;
2use std::sync::mpsc::{Receiver, Sender};
3use std::sync::Mutex;
4use std::thread;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use serde::Serialize;
8use tracing::field::Field;
9use tracing::Event;
10use tracing_subscriber::field::Visit;
11use tracing_subscriber::layer::Context;
12use tracing_subscriber::Layer;
13
14pub struct LokiLayer {
15 default_values: HashMap<String, String>,
16 sender: Mutex<Sender<LokiRequest>>,
17}
18
19#[derive(Debug, Serialize)]
20struct LokiStream {
21 stream: HashMap<String, String>,
22 values: Vec<[String; 2]>,
23}
24
25#[derive(Debug, Serialize)]
26struct LokiRequest {
27 streams: Vec<LokiStream>,
28}
29
30impl LokiLayer {
31 pub fn new<S: AsRef<str>>(url: S, default_values: HashMap<String, String>) -> Self {
32 let (sender, receiver) = std::sync::mpsc::channel();
33 let result = Self {
34 default_values,
35 sender: Mutex::new(sender),
36 };
37 Self::setup_loki_sender(url, receiver);
38 result
39 }
40
41 fn setup_loki_sender<S: AsRef<str>>(url: S, receiver: Receiver<LokiRequest>) {
42 let url = format!("{}/loki/api/v1/push", url.as_ref());
43 let client = reqwest::blocking::Client::new();
44 thread::spawn(move || loop {
45 match receiver.recv() {
46 Ok(request) => match client.post(url.clone()).json(&request).send() {
47 Ok(_) => {}
48 Err(e) => {
49 eprintln!("Error send to loki {:?}", e);
50 }
51 },
52 Err(e) => {
53 eprintln!("Shutdown loki sender {:?}", e);
54 break;
55 }
56 };
57 });
58 }
59}
60
/// Field visitor that gathers the fields of an event into a string map.
#[derive(Default)]
pub struct ValuesVisitor {
    /// Collected field name -> formatted value pairs; empty until fields are
    /// recorded.
    values: HashMap<String, String>,
}
65
66impl Visit for ValuesVisitor {
67 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
68 if field.name() != "message" {
69 self.values.insert(
70 field.name().to_string().replace('.', "_"),
71 format!("{:?}", value),
72 );
73 }
74 }
75}
76
/// Field visitor that captures the value of one field, selected by name.
#[derive(Default)]
pub struct ValueVisitor {
    /// Name of the field to capture.
    name: String,
    /// `Debug` rendering of the captured field, or `None` if it has not been
    /// seen.
    result: Option<String>,
}

impl ValueVisitor {
    /// Creates a visitor that looks for the field called `name` and has not
    /// captured anything yet.
    pub fn new<S: AsRef<str>>(name: S) -> Self {
        let name = name.as_ref().to_owned();
        Self { name, result: None }
    }
}
91
92impl Visit for ValueVisitor {
93 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
94 if field.name() == self.name {
95 self.result = Some(format!("{:?}", value));
96 }
97 }
98}
99
100impl<S: tracing::Subscriber> Layer<S> for LokiLayer {
101 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
102 let mut values_visitor = ValuesVisitor::default();
103 event.record(&mut values_visitor);
104 let mut message_visitor = ValueVisitor::new("message");
105 event.record(&mut message_visitor);
106 let mut values = values_visitor.values;
107 values.insert(
108 "level".to_string(),
109 event.metadata().level().to_string().to_lowercase(),
110 );
111 for (k, v) in self.default_values.iter() {
112 values.insert(k.to_owned(), v.to_owned());
113 }
114
115 if let Some(file) = event.metadata().file() {
116 values.insert("file".to_owned(), file.to_owned());
117 }
118
119 if let Some(line) = event.metadata().line() {
120 values.insert("line".to_owned(), line.to_string());
121 }
122
123 if let Some(module) = event.metadata().module_path() {
124 values.insert("module".to_owned(), module.to_string());
125 }
126
127 let start = SystemTime::now();
128 let since_start = start
129 .duration_since(UNIX_EPOCH)
130 .expect("cant get duration since");
131 let time_ns = since_start.as_nanos().to_string();
132
133 let request = LokiRequest {
134 streams: vec![LokiStream {
135 stream: values,
136 values: vec![[
137 time_ns,
138 message_visitor.result.unwrap_or_else(|| "".to_string()),
139 ]],
140 }],
141 };
142
143 if event.metadata().name().to_string().contains("hyper") {
145 return;
146 }
147
148 match self.sender.lock() {
149 Ok(sender) => match sender.send(request) {
150 Ok(_) => {}
151 Err(e) => {
152 eprintln!("Loki sender thread is closed {:?}", e);
153 }
154 },
155 Err(e) => {
156 eprintln!("Cannot lock mutex for send to loki {:?}", e);
157 }
158 }
159 }
160}
161
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use httpmock::MockServer;
    use tracing::error;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::Registry;

    use crate::LokiLayer;

    /// Emitting one event must result in a POST to the push endpoint of the
    /// mock server.
    #[test]
    pub fn should_send_log() {
        let mock_server = MockServer::start();
        let layer = LokiLayer::new(mock_server.base_url(), Default::default());
        let _guard = tracing::subscriber::set_default(Registry::default().with(layer));
        let push_endpoint = mock_server.mock(|when, _then| {
            when.method(httpmock::Method::POST)
                .path("/loki/api/v1/push");
        });

        error!("test");
        // The request is sent from a background thread, so wait before
        // checking that the mock was hit.
        std::thread::sleep(Duration::from_secs(1));
        push_endpoint.assert();
    }
}