tracing_layer_axiom/
lib.rs1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use axiom_rs::Client;
6use chrono::Utc;
7use serde_json::Value;
8use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
9use tracing::field::Field;
10use tracing::Subscriber;
11use tracing_subscriber::Layer;
12use typed_builder::TypedBuilder;
13
14const MAX_RETRIES: usize = 10;
16
17#[derive(TypedBuilder)]
18pub struct ConfigBuilder {
19 pub token: String,
20 pub org_id: String,
21 pub dataset: String,
22 pub application: String,
23 pub environment: String,
24}
25
26impl ConfigBuilder {
27 pub fn into_layer(self) -> AxiomLoggingLayer {
28 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
29 let client = Arc::new(
30 Client::builder()
31 .with_token(self.token)
32 .with_org_id(self.org_id)
33 .build()
34 .unwrap(),
35 );
36 tokio::spawn(axiom_backend_worker(
37 rx,
38 client.clone(),
39 self.dataset.clone(),
40 ));
41 AxiomLoggingLayer {
42 application: self.application,
43 environment: self.environment,
44 tx,
45 }
46 }
47}
48
49pub(crate) async fn axiom_backend_worker(
50 mut rx: UnboundedReceiver<LogEvent>,
51 client: Arc<Client>,
52 dataset: String,
53) {
54 let mut buf = Vec::with_capacity(10);
55
56 while rx.recv_many(&mut buf, 10).await > 0 {
57 let mut retries = 0;
58 while retries < MAX_RETRIES {
59 let res = client.ingest(dataset.clone(), &buf).await;
60 if let Err(e) = res {
61 retries += 1;
62 println!("fail to send logs to axiom: {}", e);
63 } else {
64 break;
65 }
66 }
67
68 buf.clear();
69 }
70}
71#[derive(Debug)]
72pub struct AxiomLoggingLayer {
73 application: String,
74 environment: String,
75 tx: UnboundedSender<LogEvent>,
76}
77
78impl<S> Layer<S> for AxiomLoggingLayer
79where
80 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
81{
82 fn on_event(
83 &self,
84 event: &tracing::Event<'_>,
85 _ctx: tracing_subscriber::layer::Context<'_, S>,
86 ) {
87 let mut visitor = JsonVisitor::default();
88 event.record(&mut visitor);
89
90 let log_event = LogEvent {
91 _time: Utc::now().timestamp_millis(),
92 application: self.application.to_owned(),
93 environment: self.environment.to_owned(),
94 level: event.metadata().level().to_string(),
95 target: visitor
96 .log_target
97 .map(|it| it.to_owned())
98 .unwrap_or_else(|| event.metadata().target().to_string()),
99 message: visitor.message.unwrap_or_default(),
100 fields: serde_json::to_value(visitor.fields)
101 .expect("cannot serde a hashmap, it's a bug"),
102 };
103
104 if let Err(e) = self.tx.send(log_event) {
105 tracing::error!(err=%e, "fail to send log event to given channel");
106 }
107 }
108}
109
110#[derive(Default)]
111pub struct JsonVisitor<'a> {
112 log_target: Option<String>,
113 message: Option<String>,
114 fields: HashMap<&'a str, serde_json::Value>,
115}
116
117impl<'a> tracing::field::Visit for JsonVisitor<'a> {
118 fn record_f64(&mut self, field: &Field, value: f64) {
119 self.record_value(field.name(), Value::from(value));
120 }
121
122 fn record_i64(&mut self, field: &Field, value: i64) {
124 self.record_value(field.name(), Value::from(value));
125 }
126
127 fn record_u64(&mut self, field: &Field, value: u64) {
129 self.record_value(field.name(), Value::from(value));
130 }
131
132 fn record_bool(&mut self, field: &Field, value: bool) {
134 self.record_value(field.name(), Value::from(value));
135 }
136
137 fn record_str(&mut self, field: &Field, value: &str) {
139 let field_name = field.name();
140 match field_name {
141 "log.target" => {
142 self.log_target = Some(value.to_owned());
143 }
144 "message" => {
145 self.message = Some(value.to_owned());
146 }
147 n if n.starts_with("log.") => {}
148 n => {
149 self.record_value(n, Value::from(value));
150 }
151 }
152 }
153
154 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
155 self.record_value(
156 field.name(),
157 serde_json::Value::from(format!("{:?}", value)),
158 );
159 }
160}
161
162impl<'a> JsonVisitor<'a> {
163 fn record_value(&mut self, name: &'a str, value: Value) {
164 match name {
165 "message" => {
166 self.message = value.as_str().map(|it| it.to_owned());
167 }
168 n if n.starts_with("r#") => {
169 self.fields.insert(&n[2..], value);
170 }
171 n => {
172 self.fields.insert(n, value);
173 }
174 }
175 }
176}
177
178#[derive(serde::Serialize, Debug)]
179pub struct LogEvent {
180 _time: i64,
181 application: String,
182 environment: String,
183 level: String,
184 target: String,
185 message: String,
186 fields: Value,
187}