1#[cfg(test)]
55mod tests;
56
57use std::cell::Cell;
58use std::io::{Read, Write};
59use std::net::TcpStream;
60use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
61use std::sync::{Mutex, OnceLock};
62use std::time::{Duration, SystemTime, UNIX_EPOCH};
63
64use crate::application::Application;
65use crate::middleware::Middleware;
66use crate::request::Request;
67use crate::response::Response;
68use crate::server::ConnectionInfo;
69
/// Process-wide sequence used for id generation.
///
/// Starts at 1; `new_trace_id` and `new_span_id` each advance it by 2.
static COUNTER: AtomicU64 = AtomicU64::new(1);

/// Unix time, in whole seconds, captured the first time `start_secs` runs.
static START_SECS: OnceLock<u64> = OnceLock::new();

/// Returns the cached process start timestamp in seconds since the Unix epoch.
///
/// The value is computed once on first use and is identical for every later
/// call. If the system clock reads earlier than the epoch the result is 0.
fn start_secs() -> u64 {
    let read_clock = || match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // A clock set before the epoch is treated as a zero-length duration.
        Err(_) => 0,
    };
    *START_SECS.get_or_init(read_clock)
}
83
84pub fn new_trace_id() -> [u8; 16] {
87 let start = start_secs();
88 let seq = COUNTER.fetch_add(2, Ordering::Relaxed);
89 let mut id = [0u8; 16];
90 id[..8].copy_from_slice(&start.to_be_bytes());
91 id[8..].copy_from_slice(&seq.to_be_bytes());
92 id
93}
94
95pub fn new_span_id() -> [u8; 8] {
97 let seq = COUNTER.fetch_add(2, Ordering::Relaxed) + 1;
98 seq.to_be_bytes()
99}
100
/// Encodes 16 bytes as 32 lowercase hexadecimal characters.
fn hex16(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(32);
    for &byte in b.iter() {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
104
/// Encodes 8 bytes as 16 lowercase hexadecimal characters.
fn hex8(b: &[u8; 8]) -> String {
    let mut encoded = String::with_capacity(16);
    for &byte in b.iter() {
        // High nibble first, then low nibble; each is always below 16.
        for nibble in [byte >> 4, byte & 0x0f] {
            let digit = char::from_digit(u32::from(nibble), 16).expect("nibble is below 16");
            encoded.push(digit);
        }
    }
    encoded
}
108
109#[derive(Copy, Clone, Debug, PartialEq, Eq)]
115pub struct TraceContext {
116 pub trace_id: [u8; 16],
117 pub parent_span_id: [u8; 8],
118 pub sampled: bool,
119}
120
121impl TraceContext {
122 pub fn parse(header: &str) -> Option<Self> {
124 let parts: Vec<&str> = header.trim().splitn(4, '-').collect();
125 if parts.len() != 4 || parts[0] != "00" {
126 return None;
127 }
128 let trace_id = parse_hex16(parts[1])?;
129 let parent_span_id = parse_hex8(parts[2])?;
130 let flags = u8::from_str_radix(parts[3], 16).ok()?;
131 Some(TraceContext { trace_id, parent_span_id, sampled: flags & 0x01 != 0 })
132 }
133
134 pub fn as_header(&self, span_id: &[u8; 8]) -> String {
136 format!("00-{}-{}-{:02x}", hex16(&self.trace_id), hex8(span_id), self.sampled as u8)
137 }
138}
139
/// Decodes exactly 32 hexadecimal characters into 16 bytes.
///
/// Upper- and lowercase digits are accepted. Returns `None` if the input is
/// not 32 bytes long or contains any character that is not a hex digit.
/// Each character is validated individually, so sign characters such as `+`
/// (which `u8::from_str_radix` would accept) are rejected.
fn parse_hex16(s: &str) -> Option<[u8; 16]> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let digits = s.as_bytes();
    if digits.len() != 32 {
        return None;
    }
    let mut out = [0u8; 16];
    for (slot, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        *slot = (nibble(pair[0])? << 4) | nibble(pair[1])?;
    }
    Some(out)
}
148
/// Decodes exactly 16 hexadecimal characters into 8 bytes.
///
/// Upper- and lowercase digits are accepted. Returns `None` if the input is
/// not 16 bytes long or contains any character that is not a hex digit.
/// Each character is validated individually, so sign characters such as `+`
/// (which `u8::from_str_radix` would accept) are rejected.
fn parse_hex8(s: &str) -> Option<[u8; 8]> {
    if s.len() != 16 {
        return None;
    }
    let mut out = [0u8; 8];
    for (slot, pair) in out.iter_mut().zip(s.as_bytes().chunks_exact(2)) {
        let high = char::from(pair[0]).to_digit(16)?;
        let low = char::from(pair[1]).to_digit(16)?;
        // Both nibbles are below 16, so the combined value always fits in a byte.
        *slot = (high << 4 | low) as u8;
    }
    Some(out)
}
157
/// Identity of the span currently being handled on a thread.
#[derive(Clone, Copy)]
struct ActiveSpan {
    /// Trace the span belongs to.
    trace_id: [u8; 16],
    /// Id of the span itself.
    span_id: [u8; 8],
    /// Sampling decision made for the span.
    sampled: bool,
}

thread_local! {
    /// Span active on the current thread, or `None` when no request is in flight.
    static ACTIVE: Cell<Option<ActiveSpan>> = Cell::new(None);
}
171
172pub fn current_traceparent() -> Option<String> {
178 ACTIVE.with(|cell| {
179 cell.get().map(|s| {
180 format!(
181 "00-{}-{}-{:02x}",
182 hex16(&s.trace_id),
183 hex8(&s.span_id),
184 s.sampled as u8,
185 )
186 })
187 })
188}
189
/// A finished span ready to be handed to an exporter.
#[derive(Clone, Debug)]
pub struct SpanData {
    /// Trace the span belongs to.
    pub trace_id: [u8; 16],
    /// Id of this span.
    pub span_id: [u8; 8],
    /// Id of the remote parent span, when one was supplied.
    pub parent_span_id: Option<[u8; 8]>,
    /// Display name of the span.
    pub name: String,
    /// Start time in nanoseconds.
    pub start_ns: u64,
    /// End time in nanoseconds.
    pub end_ns: u64,
    /// HTTP request method.
    pub http_method: String,
    /// HTTP request target.
    pub http_target: String,
    /// HTTP response status.
    pub http_status: i16,
    /// Span status code; the OTLP exporter labels the value 2 as "Error" and
    /// anything else as "Unset".
    pub status_code: u8,
}

impl SpanData {
    /// Span duration in milliseconds; 0 when `end_ns` precedes `start_ns`.
    fn duration_ms(&self) -> f64 {
        const NANOS_PER_MILLI: f64 = 1_000_000.0;
        let elapsed_ns = self.end_ns.saturating_sub(self.start_ns);
        elapsed_ns as f64 / NANOS_PER_MILLI
    }
}
214
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch. The nanosecond
/// count is narrowed from `u128` to `u64`.
fn now_ns() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos() as u64)
}
221
/// Returns the part of `uri` before the first `?`, or all of `uri` when it
/// contains no `?`.
fn strip_query(uri: &str) -> &str {
    uri.split_once('?').map_or(uri, |(path, _query)| path)
}
228
229pub trait Exporter: Send + Sync {
233 fn export(&self, spans: &[SpanData]);
234 fn shutdown(&self) {}
235}
236
237pub struct StdoutExporter;
240
241impl Exporter for StdoutExporter {
242 fn export(&self, spans: &[SpanData]) {
243 for span in spans {
244 println!(
245 "{{\"traceId\":\"{}\",\"spanId\":\"{}\",\"parentSpanId\":{},\
246 \"name\":\"{}\",\"startNs\":{},\"durationMs\":{:.3},\
247 \"httpMethod\":\"{}\",\"httpTarget\":\"{}\",\"httpStatus\":{}}}",
248 hex16(&span.trace_id),
249 hex8(&span.span_id),
250 span.parent_span_id
251 .as_ref()
252 .map(|p| format!("\"{}\"", hex8(p)))
253 .unwrap_or_else(|| "null".to_string()),
254 span.name,
255 span.start_ns,
256 span.duration_ms(),
257 span.http_method,
258 span.http_target,
259 span.http_status,
260 );
261 }
262 }
263}
264
265pub struct OtlpHttpExporter {
270 host: String,
271 port: u16,
272 timeout: Duration,
273 service_name: String,
274 service_version: String,
275}
276
277impl OtlpHttpExporter {
278 pub fn new(endpoint: &str, service_name: &str, service_version: &str) -> Self {
279 let stripped = endpoint
281 .trim_start_matches("http://")
282 .trim_start_matches("https://");
283 let (host, port) = if let Some(i) = stripped.rfind(':') {
284 let p = stripped[i + 1..].parse().unwrap_or(4318);
285 (stripped[..i].to_string(), p)
286 } else {
287 (stripped.to_string(), 4318)
288 };
289 OtlpHttpExporter {
290 host,
291 port,
292 timeout: Duration::from_secs(5),
293 service_name: service_name.to_string(),
294 service_version: service_version.to_string(),
295 }
296 }
297
298 pub fn build_body(&self, spans: &[SpanData]) -> String {
299 let span_jsons: Vec<String> = spans.iter().map(|s| {
300 let parent = s.parent_span_id
301 .as_ref()
302 .map(|p| format!(",\"parentSpanId\":\"{}\"", hex8(p)))
303 .unwrap_or_default();
304 let status_msg = if s.status_code == 2 { "Error" } else { "Unset" };
305 format!(
306 "{{\"traceId\":\"{trace}\",\"spanId\":\"{span}\"{parent},\
307 \"name\":\"{name}\",\"kind\":2,\
308 \"startTimeUnixNano\":\"{start}\",\"endTimeUnixNano\":\"{end}\",\
309 \"attributes\":[\
310 {{\"key\":\"http.method\",\"value\":{{\"stringValue\":\"{method}\"}} }},\
311 {{\"key\":\"http.target\",\"value\":{{\"stringValue\":\"{target}\"}} }},\
312 {{\"key\":\"http.status_code\",\"value\":{{\"intValue\":{status}}} }}\
313 ],\
314 \"status\":{{\"code\":{scode},\"message\":\"{smsg}\"}} }}",
315 trace = hex16(&s.trace_id),
316 span = hex8(&s.span_id),
317 name = s.name,
318 start = s.start_ns,
319 end = s.end_ns,
320 method = s.http_method,
321 target = s.http_target,
322 status = s.http_status,
323 scode = s.status_code,
324 smsg = status_msg,
325 )
326 }).collect();
327
328 format!(
329 "{{\"resourceSpans\":[{{\"resource\":{{\"attributes\":[\
330 {{\"key\":\"service.name\",\"value\":{{\"stringValue\":\"{svc}\"}} }},\
331 {{\"key\":\"service.version\",\"value\":{{\"stringValue\":\"{ver}\"}} }}\
332 ]}},\"scopeSpans\":[{{\"scope\":{{\"name\":\"rws\"}},\"spans\":[{spans}]}}]}}]}}",
333 svc = self.service_name,
334 ver = self.service_version,
335 spans = span_jsons.join(","),
336 )
337 }
338
339 fn post(&self, body: &str) {
340 use std::net::ToSocketAddrs;
341 let addr = format!("{}:{}", self.host, self.port);
342 let Some(socket_addr) = addr.to_socket_addrs().ok().and_then(|mut i| i.next()) else {
343 return;
344 };
345 let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, self.timeout) else {
346 return;
347 };
348 let _ = stream.set_write_timeout(Some(self.timeout));
349 let _ = stream.set_read_timeout(Some(self.timeout));
350 let request = format!(
351 "POST /v1/traces HTTP/1.1\r\n\
352 Host: {host}:{port}\r\n\
353 Content-Type: application/json\r\n\
354 Content-Length: {len}\r\n\
355 Connection: close\r\n\r\n\
356 {body}",
357 host = self.host,
358 port = self.port,
359 len = body.len(),
360 body = body,
361 );
362 if stream.write_all(request.as_bytes()).is_ok() {
363 let mut _buf = [0u8; 256];
364 let _ = stream.read(&mut _buf); }
366 }
367}
368
369impl Exporter for OtlpHttpExporter {
370 fn export(&self, spans: &[SpanData]) {
371 if spans.is_empty() { return; }
372 let body = self.build_body(spans);
373 self.post(&body);
374 }
375}
376
377struct GlobalTracer {
380 exporter: Box<dyn Exporter>,
381 batch: Mutex<Vec<SpanData>>,
382 batch_size: usize,
383 sample_rate: f64,
384 shutdown_flag: AtomicBool,
385}
386
387impl GlobalTracer {
388 fn should_sample(&self) -> bool {
389 if self.sample_rate >= 1.0 { return true; }
390 if self.sample_rate <= 0.0 { return false; }
391 let n = COUNTER.load(Ordering::Relaxed);
393 (n % 10000) < (self.sample_rate * 10000.0) as u64
394 }
395
396 fn record(&self, span: SpanData) {
397 let mut guard = self.batch.lock().unwrap();
398 guard.push(span);
399 if guard.len() >= self.batch_size {
400 let spans = std::mem::take(&mut *guard);
401 drop(guard);
402 self.exporter.export(&spans);
403 }
404 }
405
406 fn flush(&self) {
407 let spans = std::mem::take(&mut *self.batch.lock().unwrap());
408 if !spans.is_empty() {
409 self.exporter.export(&spans);
410 }
411 }
412}
413
414static TRACER: OnceLock<GlobalTracer> = OnceLock::new();
415
416fn tracer() -> Option<&'static GlobalTracer> {
417 TRACER.get()
418}
419
/// Selects which exporter `setup` installs.
#[derive(Debug, Clone)]
pub enum ExporterConfig {
    /// Print spans to stdout.
    Stdout,
    /// Send spans to an OTLP/HTTP collector.
    Otlp {
        /// Collector endpoint, e.g. `http://localhost:4318`.
        endpoint: String,
    },
    /// Drop every span.
    Discard,
}
435
436#[derive(Clone, Debug)]
438pub struct TracingConfig {
439 pub service_name: String,
441 pub service_version: String,
443 pub exporter: ExporterConfig,
445 pub sample_rate: f64,
447 pub batch_size: usize,
449}
450
451impl Default for TracingConfig {
452 fn default() -> Self {
453 TracingConfig {
454 service_name: "rws".to_string(),
455 service_version: env!("CARGO_PKG_VERSION").to_string(),
456 exporter: ExporterConfig::Stdout,
457 sample_rate: 1.0,
458 batch_size: 128,
459 }
460 }
461}
462
463pub fn setup(config: TracingConfig) {
468 let exporter: Box<dyn Exporter> = match &config.exporter {
469 ExporterConfig::Stdout => Box::new(StdoutExporter),
470 ExporterConfig::Otlp { endpoint } => Box::new(OtlpHttpExporter::new(
471 endpoint,
472 &config.service_name,
473 &config.service_version,
474 )),
475 ExporterConfig::Discard => Box::new(DiscardExporter),
476 };
477 let _ = TRACER.set(GlobalTracer {
478 exporter,
479 batch: Mutex::new(Vec::new()),
480 batch_size: config.batch_size.max(1),
481 sample_rate: config.sample_rate.clamp(0.0, 1.0),
482 shutdown_flag: AtomicBool::new(false),
483 });
484}
485
486pub fn setup_from_env() {
492 let service_name = std::env::var("OTEL_SERVICE_NAME")
493 .unwrap_or_else(|_| "rws".to_string());
494 let sample_rate: f64 = std::env::var("OTEL_TRACES_SAMPLER_ARG")
495 .ok()
496 .and_then(|v| v.parse().ok())
497 .unwrap_or(1.0);
498 let exporter = match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok() {
499 Some(ep) if !ep.is_empty() => ExporterConfig::Otlp { endpoint: ep },
500 _ => ExporterConfig::Stdout,
501 };
502 setup(TracingConfig {
503 service_name,
504 service_version: env!("CARGO_PKG_VERSION").to_string(),
505 exporter,
506 sample_rate,
507 batch_size: 128,
508 });
509}
510
511pub fn shutdown() {
514 if let Some(t) = tracer() {
515 t.shutdown_flag.store(true, Ordering::Relaxed);
516 t.flush();
517 t.exporter.shutdown();
518 }
519}
520
521pub fn flush() {
523 if let Some(t) = tracer() {
524 t.flush();
525 }
526}
527
528pub struct OtelLayer;
542
543impl Middleware for OtelLayer {
544 fn handle(
545 &self,
546 request: &Request,
547 connection: &ConnectionInfo,
548 next: &dyn Application,
549 ) -> Result<Response, String> {
550 let Some(t) = tracer() else {
551 return next.execute(request, connection);
552 };
553
554 let sampled = t.should_sample();
555
556 let incoming = request.headers.iter()
558 .find(|h| h.name.eq_ignore_ascii_case("traceparent"))
559 .and_then(|h| TraceContext::parse(&h.value));
560
561 let trace_id = incoming.map(|c| c.trace_id).unwrap_or_else(new_trace_id);
562 let parent_span_id = incoming.map(|c| c.parent_span_id);
563 let span_id = new_span_id();
564
565 ACTIVE.with(|cell| {
567 cell.set(Some(ActiveSpan { trace_id, span_id, sampled }));
568 });
569
570 let start_ns = now_ns();
571 let result = next.execute(request, connection);
572 let end_ns = now_ns();
573
574 ACTIVE.with(|cell| cell.set(None));
576
577 if sampled {
578 let status = match &result {
579 Ok(r) => r.status_code,
580 Err(_) => 500,
581 };
582 let path = strip_query(&request.request_uri).to_string();
583 t.record(SpanData {
584 trace_id,
585 span_id,
586 parent_span_id,
587 name: format!("{} {}", request.method, path),
588 start_ns,
589 end_ns,
590 http_method: request.method.clone(),
591 http_target: request.request_uri.clone(),
592 http_status: status,
593 status_code: if status >= 500 { 2 } else { 0 },
594 });
595 }
596
597 result
598 }
599}
600
601struct DiscardExporter;
604impl Exporter for DiscardExporter {
605 fn export(&self, _: &[SpanData]) {}
606}
607
608pub struct CapturingExporter {
620 pub spans: Mutex<Vec<SpanData>>,
621}
622
623impl CapturingExporter {
624 pub fn new() -> Self {
625 CapturingExporter { spans: Mutex::new(Vec::new()) }
626 }
627
628 pub fn take(&self) -> Vec<SpanData> {
629 std::mem::take(&mut *self.spans.lock().unwrap())
630 }
631}
632
633impl Default for CapturingExporter {
634 fn default() -> Self { Self::new() }
635}
636
637impl Exporter for CapturingExporter {
638 fn export(&self, spans: &[SpanData]) {
639 self.spans.lock().unwrap().extend_from_slice(spans);
640 }
641}