1use crate::forwarder::{ForwarderHandle, TraceForwarder};
7use crate::protocol::{DetailLevel, Severity, TraceObject};
8use chrono::Utc;
9use std::sync::Arc;
10use tracing::Level;
11use tracing::field::{Field, Visit};
12use tracing_subscriber::layer::{Context, SubscriberExt};
13use tracing_subscriber::{Layer, Registry};
14
15struct JsonVisitor(serde_json::Map<String, serde_json::Value>);
17
18impl JsonVisitor {
19 fn new() -> Self {
20 Self(serde_json::Map::new())
21 }
22}
23
24impl Visit for JsonVisitor {
25 fn record_f64(&mut self, field: &Field, value: f64) {
26 self.0
27 .insert(field.name().to_string(), serde_json::json!(value));
28 }
29 fn record_i64(&mut self, field: &Field, value: i64) {
30 self.0
31 .insert(field.name().to_string(), serde_json::json!(value));
32 }
33 fn record_u64(&mut self, field: &Field, value: u64) {
34 self.0
35 .insert(field.name().to_string(), serde_json::json!(value));
36 }
37 fn record_bool(&mut self, field: &Field, value: bool) {
38 self.0
39 .insert(field.name().to_string(), serde_json::json!(value));
40 }
41 fn record_str(&mut self, field: &Field, value: &str) {
42 self.0
43 .insert(field.name().to_string(), serde_json::json!(value));
44 }
45 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
46 self.0.insert(
47 field.name().to_string(),
48 serde_json::json!(format!("{value:?}")),
49 );
50 }
51 fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
52 self.0.insert(
53 field.name().to_string(),
54 serde_json::json!(value.to_string()),
55 );
56 }
57}
58
59pub struct TracerBuilder {
61 forwarder: TraceForwarder,
62 hostname: String,
63 namespace_prefix: Vec<String>,
64}
65
66impl TracerBuilder {
67 pub fn new(forwarder: TraceForwarder) -> Self {
69 let hostname = hostname::get()
70 .ok()
71 .and_then(|h| h.into_string().ok())
72 .unwrap_or_else(|| "unknown".to_string());
73
74 Self {
75 forwarder,
76 hostname,
77 namespace_prefix: Vec::new(),
78 }
79 }
80
81 pub fn with_hostname(mut self, hostname: String) -> Self {
83 self.hostname = hostname;
84 self
85 }
86
87 pub fn with_namespace_prefix(mut self, prefix: Vec<String>) -> Self {
89 self.namespace_prefix = prefix;
90 self
91 }
92
93 pub fn build(self) -> (TraceForwarderLayer, tokio::task::JoinHandle<()>) {
99 let handle = self.forwarder.handle();
100 let layer = TraceForwarderLayer {
101 handle: handle.clone(),
102 hostname: Arc::new(self.hostname),
103 namespace_prefix: Arc::new(self.namespace_prefix),
104 };
105
106 let forwarder_handle = tokio::spawn(async move {
107 if let Err(e) = self.forwarder.run().await {
108 tracing::error!("Forwarder error: {}", e);
109 }
110 });
111
112 (layer, forwarder_handle)
113 }
114}
115
116pub struct TraceForwarderLayer {
118 handle: ForwarderHandle,
119 hostname: Arc<String>,
120 namespace_prefix: Arc<Vec<String>>,
121}
122
123impl TraceForwarderLayer {
124 fn level_to_severity(level: &Level) -> Severity {
126 match *level {
127 Level::TRACE => Severity::Debug,
128 Level::DEBUG => Severity::Debug,
129 Level::INFO => Severity::Info,
130 Level::WARN => Severity::Warning,
131 Level::ERROR => Severity::Error,
132 }
133 }
134
135 fn extract_namespace(&self, meta: &tracing::Metadata<'_>) -> Vec<String> {
140 let mut namespace = self.namespace_prefix.as_ref().clone();
141 let target = meta.target();
142 let segments: Vec<String> = if target.contains("::") {
143 target.split("::").map(|s| s.to_string()).collect()
144 } else {
145 target
146 .split('.')
147 .filter(|s| !s.is_empty())
148 .map(|s| s.to_string())
149 .collect()
150 };
151 namespace.extend(segments);
152 namespace
153 }
154}
155
156impl<S> Layer<S> for TraceForwarderLayer
157where
158 S: tracing::Subscriber,
159{
160 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
161 let metadata = event.metadata();
162
163 let mut visitor = JsonVisitor::new();
165 event.record(&mut visitor);
166
167 let human = visitor
169 .0
170 .get("message")
171 .and_then(|v| v.as_str())
172 .map(|s| s.to_string());
173
174 let to_machine = serde_json::to_string(&visitor.0).unwrap_or_else(|_| "{}".to_string());
175
176 let trace_obj = TraceObject {
177 to_human: human,
178 to_machine,
179 to_namespace: self.extract_namespace(metadata),
180 to_severity: Self::level_to_severity(metadata.level()),
181 to_details: DetailLevel::DNormal,
182 to_timestamp: Utc::now(),
183 to_hostname: self.hostname.to_string(),
184 to_thread_id: format!("{:?}", std::thread::current().id()),
185 };
186
187 let _ = self.handle.try_send(trace_obj);
189 }
190}
191
192pub fn init_tracing_with_forwarder(
194 forwarder: TraceForwarder,
195) -> (impl tracing::Subscriber, tokio::task::JoinHandle<()>) {
196 let builder = TracerBuilder::new(forwarder);
197 let (layer, handle) = builder.build();
198
199 let subscriber = Registry::default().with(layer);
200
201 (subscriber, handle)
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use crate::forwarder::ForwarderConfig;
208
209 #[test]
210 fn test_level_to_severity() {
211 assert_eq!(
212 TraceForwarderLayer::level_to_severity(&Level::INFO),
213 Severity::Info
214 );
215 assert_eq!(
216 TraceForwarderLayer::level_to_severity(&Level::ERROR),
217 Severity::Error
218 );
219 }
220
221 #[test]
222 fn test_tracer_builder() {
223 let config = ForwarderConfig::default();
224 let forwarder = TraceForwarder::new(config);
225 let builder = TracerBuilder::new(forwarder);
226
227 assert!(!builder.hostname.is_empty());
228 assert_eq!(builder.namespace_prefix.len(), 0);
229 }
230}