tracing_layer_axiom/
lib.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use axiom_rs::Client;
6use chrono::Utc;
7use serde_json::Value;
8use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
9use tracing::field::Field;
10use tracing::Subscriber;
11use tracing_subscriber::Layer;
12use typed_builder::TypedBuilder;
13
14/// maxium retry times for sending
15const MAX_RETRIES: usize = 10;
16
17#[derive(TypedBuilder)]
18pub struct ConfigBuilder {
19    pub token: String,
20    pub org_id: String,
21    pub dataset: String,
22    pub application: String,
23    pub environment: String,
24}
25
26impl ConfigBuilder {
27    pub fn into_layer(self) -> AxiomLoggingLayer {
28        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
29        let client = Arc::new(
30            Client::builder()
31                .with_token(self.token)
32                .with_org_id(self.org_id)
33                .build()
34                .unwrap(),
35        );
36        tokio::spawn(axiom_backend_worker(
37            rx,
38            client.clone(),
39            self.dataset.clone(),
40        ));
41        AxiomLoggingLayer {
42            application: self.application,
43            environment: self.environment,
44            tx,
45        }
46    }
47}
48
49pub(crate) async fn axiom_backend_worker(
50    mut rx: UnboundedReceiver<LogEvent>,
51    client: Arc<Client>,
52    dataset: String,
53) {
54    let mut buf = Vec::with_capacity(10);
55
56    while rx.recv_many(&mut buf, 10).await > 0 {
57        let mut retries = 0;
58        while retries < MAX_RETRIES {
59            let res = client.ingest(dataset.clone(), &buf).await;
60            if let Err(e) = res {
61                retries += 1;
62                println!("fail to send logs to axiom: {}", e);
63            } else {
64                break;
65            }
66        }
67
68        buf.clear();
69    }
70}
71#[derive(Debug)]
72pub struct AxiomLoggingLayer {
73    application: String,
74    environment: String,
75    tx: UnboundedSender<LogEvent>,
76}
77
78impl<S> Layer<S> for AxiomLoggingLayer
79where
80    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
81{
82    fn on_event(
83        &self,
84        event: &tracing::Event<'_>,
85        _ctx: tracing_subscriber::layer::Context<'_, S>,
86    ) {
87        let mut visitor = JsonVisitor::default();
88        event.record(&mut visitor);
89
90        let log_event = LogEvent {
91            _time: Utc::now().timestamp_millis(),
92            application: self.application.to_owned(),
93            environment: self.environment.to_owned(),
94            level: event.metadata().level().to_string(),
95            target: visitor
96                .log_target
97                .map(|it| it.to_owned())
98                .unwrap_or_else(|| event.metadata().target().to_string()),
99            message: visitor.message.unwrap_or_default(),
100            fields: serde_json::to_value(visitor.fields)
101                .expect("cannot serde a hashmap, it's a bug"),
102        };
103
104        if let Err(e) = self.tx.send(log_event) {
105            tracing::error!(err=%e, "fail to send log event to given channel");
106        }
107    }
108}
109
110#[derive(Default)]
111pub struct JsonVisitor<'a> {
112    log_target: Option<String>,
113    message: Option<String>,
114    fields: HashMap<&'a str, serde_json::Value>,
115}
116
117impl<'a> tracing::field::Visit for JsonVisitor<'a> {
118    fn record_f64(&mut self, field: &Field, value: f64) {
119        self.record_value(field.name(), Value::from(value));
120    }
121
122    /// Visit a signed 64-bit integer value.
123    fn record_i64(&mut self, field: &Field, value: i64) {
124        self.record_value(field.name(), Value::from(value));
125    }
126
127    /// Visit an unsigned 64-bit integer value.
128    fn record_u64(&mut self, field: &Field, value: u64) {
129        self.record_value(field.name(), Value::from(value));
130    }
131
132    /// Visit a boolean value.
133    fn record_bool(&mut self, field: &Field, value: bool) {
134        self.record_value(field.name(), Value::from(value));
135    }
136
137    /// Visit a string value.
138    fn record_str(&mut self, field: &Field, value: &str) {
139        let field_name = field.name();
140        match field_name {
141            "log.target" => {
142                self.log_target = Some(value.to_owned());
143            }
144            "message" => {
145                self.message = Some(value.to_owned());
146            }
147            n if n.starts_with("log.") => {}
148            n => {
149                self.record_value(n, Value::from(value));
150            }
151        }
152    }
153
154    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
155        self.record_value(
156            field.name(),
157            serde_json::Value::from(format!("{:?}", value)),
158        );
159    }
160}
161
162impl<'a> JsonVisitor<'a> {
163    fn record_value(&mut self, name: &'a str, value: Value) {
164        match name {
165            "message" => {
166                self.message = value.as_str().map(|it| it.to_owned());
167            }
168            n if n.starts_with("r#") => {
169                self.fields.insert(&n[2..], value);
170            }
171            n => {
172                self.fields.insert(n, value);
173            }
174        }
175    }
176}
177
178#[derive(serde::Serialize, Debug)]
179pub struct LogEvent {
180    _time: i64,
181    application: String,
182    environment: String,
183    level: String,
184    target: String,
185    message: String,
186    fields: Value,
187}