1#![deny(missing_docs)]
2#![deny(rustdoc::missing_doc_code_examples)]
3use serde::Serialize;
108use std::{
109 collections::HashMap,
110 error::Error,
111 time::{SystemTime, UNIX_EPOCH},
112};
113
114use log::{
115 kv::{Source, Visitor},
116 LevelFilter, Metadata, Record, SetLoggerError,
117};
118
119pub use log;
121
122#[derive(Serialize)]
123struct LokiStream {
124 stream: HashMap<String, String>,
125 values: Vec<[String; 2]>,
126}
127
128#[derive(Serialize)]
129struct LokiRequest {
130 streams: Vec<LokiStream>,
131}
132
133#[cfg(not(feature = "blocking"))]
134struct LokiLogger {
135 url: String,
136 initial_labels: Option<HashMap<String, String>>,
137 client: reqwest::Client,
138}
139
140#[cfg(feature = "blocking")]
141struct LokiLogger {
142 url: String,
143 initial_labels: Option<HashMap<String, String>>,
144 client: reqwest::blocking::Client,
145}
146
147fn init_inner<S: AsRef<str>>(
148 url: S,
149 max_log_level: LevelFilter,
150 initial_labels: Option<HashMap<String, String>>,
151) -> Result<(), SetLoggerError> {
152 let logger = Box::new(LokiLogger::new(url, initial_labels));
153 log::set_boxed_logger(logger).map(|()| log::set_max_level(max_log_level))
154}
155
156pub fn init<S: AsRef<str>>(url: S, max_log_level: LevelFilter) -> Result<(), SetLoggerError> {
181 init_inner(url, max_log_level, None)
182}
183
184pub fn init_with_labels<S: AsRef<str>>(
217 url: S,
218 max_log_level: LevelFilter,
219 initial_labels: HashMap<String, String>,
220) -> Result<(), SetLoggerError> {
221 init_inner(url, max_log_level, Some(initial_labels))
222}
223
224struct LokiVisitor<'kvs> {
225 values: HashMap<log::kv::Key<'kvs>, log::kv::Value<'kvs>>,
226}
227
228impl<'kvs> LokiVisitor<'kvs> {
229 pub fn new(count: usize) -> Self {
230 Self {
231 values: HashMap::with_capacity(count),
232 }
233 }
234
235 pub fn read_kv(
236 &'kvs mut self,
237 source: &'kvs dyn Source,
238 ) -> Result<&HashMap<log::kv::Key<'kvs>, log::kv::Value<'kvs>>, log::kv::Error> {
239 for _ in 0..source.count() {
240 source.visit(self)?;
241 }
242 Ok(&self.values)
243 }
244}
245
246impl<'kvs> Visitor<'kvs> for LokiVisitor<'kvs> {
247 fn visit_pair(
248 &mut self,
249 key: log::kv::Key<'kvs>,
250 value: log::kv::Value<'kvs>,
251 ) -> Result<(), log::kv::Error> {
252 self.values.insert(key, value);
253 Ok(())
254 }
255}
256
257impl log::Log for LokiLogger {
258 fn enabled(&self, _: &Metadata) -> bool {
259 true
260 }
261
262 fn log(&self, record: &Record) {
263 if self.enabled(record.metadata()) {
264 if let Err(e) = self.log_event_record(record) {
265 eprintln!("Impossible to log event to loki: {:?}", e)
266 }
267 }
268 }
269
270 fn flush(&self) {}
271}
272
273impl LokiLogger {
274 #[cfg(not(feature = "blocking"))]
275 fn new<S: AsRef<str>>(url: S, initial_labels: Option<HashMap<String, String>>) -> Self {
276 Self {
277 url: url.as_ref().to_string(),
278 initial_labels,
279 client: reqwest::Client::new(),
280 }
281 }
282
283 #[cfg(feature = "blocking")]
284 fn new<S: AsRef<str>>(url: S, initial_labels: Option<HashMap<String, String>>) -> Self {
285 Self {
286 url: url.as_ref().to_string(),
287 initial_labels,
288 client: reqwest::blocking::Client::new(),
289 }
290 }
291
292 #[cfg(not(feature = "blocking"))]
293 fn log_to_loki(
294 &self,
295 message: String,
296 labels: HashMap<String, String>,
297 ) -> Result<(), Box<dyn Error>> {
298 let client = self.client.clone();
299 let url = self.url.clone();
300
301 let loki_request = make_request(message, labels)?;
302 tokio::spawn(async move {
303 if let Err(e) = client.post(url).json(&loki_request).send().await {
304 eprintln!("{:?}", e);
305 };
306 });
307 Ok(())
308 }
309
310 #[cfg(feature = "blocking")]
311 fn log_to_loki(
312 &self,
313 message: String,
314 labels: HashMap<String, String>,
315 ) -> Result<(), Box<dyn Error>> {
316 let url = self.url.clone();
317
318 let loki_request = make_request(message, labels)?;
319 self.client.post(url).json(&loki_request).send()?;
320 Ok(())
321 }
322
323 fn merge_loki_labels(
324 &self,
325 kv_labels: &HashMap<log::kv::Key, log::kv::Value>,
326 ) -> HashMap<String, String> {
327 merge_labels(self.initial_labels.as_ref(), kv_labels)
328 }
329
330 fn log_event_record(&self, record: &Record) -> Result<(), Box<dyn Error>> {
331 let kv = record.key_values();
332 let mut visitor = LokiVisitor::new(kv.count());
333 let values = visitor.read_kv(kv)?;
334 let message = format!("{:?}", record.args());
335 let mut labels = self.merge_loki_labels(values);
336 labels.insert(
337 "level".to_string(),
338 record.level().to_string().to_ascii_lowercase(),
339 );
340 self.log_to_loki(message, labels)
341 }
342}
343
344fn merge_labels(
345 initial_labels: Option<&HashMap<String, String>>,
346 kv_labels: &HashMap<log::kv::Key, log::kv::Value>,
347) -> HashMap<String, String> {
348 let mut labels = if let Some(initial_labels) = initial_labels {
349 initial_labels.clone()
350 } else {
351 HashMap::with_capacity(kv_labels.len())
352 };
353 labels.extend(
354 kv_labels
355 .iter()
356 .map(|(key, value)| (key.to_string(), value.to_string())),
357 );
358 labels
359}
360
361fn make_request(
362 message: String,
363 labels: HashMap<String, String>,
364) -> Result<LokiRequest, Box<dyn Error>> {
365 let start = SystemTime::now();
366 let time_ns = time_offset_since(start)?;
367 let loki_request = LokiRequest {
368 streams: vec![LokiStream {
369 stream: labels,
370 values: vec![[time_ns, message]],
371 }],
372 };
373 Ok(loki_request)
374}
375
/// Formats `start` as the number of nanoseconds elapsed since the Unix epoch.
///
/// # Errors
///
/// Fails when `start` lies before the Unix epoch.
fn time_offset_since(start: SystemTime) -> Result<String, Box<dyn Error>> {
    match start.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => Ok(elapsed.as_nanos().to_string()),
        Err(before_epoch) => Err(before_epoch.into()),
    }
}
381
#[cfg(test)]
mod tests {
    use log::kv::{Key, Value};

    use crate::{merge_labels, time_offset_since};
    use std::{
        collections::HashMap,
        time::{Duration, SystemTime},
    };

    /// Builds an owned label map from string pairs.
    fn owned_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect()
    }

    /// Builds a key-value map of the kind `merge_labels` takes as record labels.
    fn kv_map<'a>(pairs: &[(&'a str, &'a str)]) -> HashMap<Key<'a>, Value<'a>> {
        pairs
            .iter()
            .map(|&(name, value)| (Key::from_str(name), Value::from(value)))
            .collect()
    }

    #[test]
    fn time_offsets() {
        // The current time is after the epoch, so it must be representable.
        let now = time_offset_since(SystemTime::now());
        assert!(now.is_ok());

        // One second before the epoch has no offset and must be rejected.
        let before_epoch = SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(1));
        assert!(before_epoch.is_some());

        let rejected = time_offset_since(before_epoch.unwrap());
        assert!(rejected.is_err());
    }

    #[test]
    fn merge_no_initial_labels() {
        // Nothing in, nothing out.
        let no_kv = HashMap::new();
        assert_eq!(merge_labels(None, &no_kv), HashMap::new());

        // Record labels alone are converted to owned strings.
        let pairs = [
            ("application", "loki_logger"),
            ("environment", "development"),
        ];
        let merged = merge_labels(None, &kv_map(&pairs));

        assert_eq!(merged, owned_map(&pairs));
    }

    #[test]
    fn merge_initial_labels() {
        // Both sides empty.
        let no_kv = HashMap::new();
        let no_initial = HashMap::new();
        assert_eq!(merge_labels(Some(&no_initial), &no_kv), HashMap::new());

        // Initial labels only: returned unchanged.
        let initial = owned_map(&[
            ("application", "loki_logger"),
            ("environment", "development"),
        ]);
        let no_kv = HashMap::new();
        assert_eq!(merge_labels(Some(&initial), &no_kv), initial);

        // Disjoint initial and record labels: the result is their union.
        let initial = owned_map(&[
            ("application", "loki_logger"),
            ("environment", "development"),
        ]);
        let from_record = kv_map(&[
            ("event_name", "request"),
            ("handler", "/loki/api/v1/push"),
        ]);
        let merged = merge_labels(Some(&initial), &from_record);

        assert_eq!(
            merged,
            owned_map(&[
                ("application", "loki_logger"),
                ("environment", "development"),
                ("event_name", "request"),
                ("handler", "/loki/api/v1/push"),
            ])
        );
    }

    #[test]
    fn merge_overwrite_labels() {
        // A record label replaces the initial label of the same name.
        let initial = owned_map(&[
            ("application", "loki_logger"),
            ("environment", "development"),
        ]);
        let from_record = kv_map(&[("event_name", "request"), ("environment", "production")]);
        let merged = merge_labels(Some(&initial), &from_record);

        assert_eq!(
            merged,
            owned_map(&[
                ("application", "loki_logger"),
                ("environment", "production"),
                ("event_name", "request"),
            ])
        );
    }
}