1#[cfg(test)]
55mod tests;
56
57use std::cell::{Cell, RefCell};
58use std::io::{Read, Write};
59use std::net::TcpStream;
60use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
61use std::sync::{Mutex, OnceLock};
62use std::time::{Duration, SystemTime, UNIX_EPOCH};
63
64use crate::application::Application;
65use crate::middleware::Middleware;
66use crate::request::Request;
67use crate::response::Response;
68use crate::server::ConnectionInfo;
69
/// Process-wide sequence shared by trace-id and span-id allocation.
///
/// It starts at 1 and every allocation advances it by 2, so each call
/// reserves a pair of values: the odd one is used by [`new_trace_id`], the
/// following even one by [`new_span_id`].
static COUNTER: AtomicU64 = AtomicU64::new(1);

/// Wall-clock second captured the first time an ID is requested.
static START_SECS: OnceLock<u64> = OnceLock::new();

/// Returns the cached start timestamp in whole seconds since the Unix epoch.
///
/// The value is captured once, on first use. A system clock set before the
/// epoch yields 0.
fn start_secs() -> u64 {
    fn seconds_since_epoch() -> u64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        }
    }
    *START_SECS.get_or_init(seconds_since_epoch)
}

/// Allocates a new 16-byte trace id.
///
/// Layout (big-endian): bytes 0..8 hold the process start second, bytes
/// 8..16 hold the next value of the shared sequence counter.
pub fn new_trace_id() -> [u8; 16] {
    let high = u128::from(start_secs());
    let low = u128::from(COUNTER.fetch_add(2, Ordering::Relaxed));
    // Packing both halves into one u128 produces exactly the two
    // concatenated big-endian u64 encodings.
    ((high << 64) | low).to_be_bytes()
}

/// Allocates a new 8-byte span id from the shared sequence counter.
pub fn new_span_id() -> [u8; 8] {
    let reserved = COUNTER.fetch_add(2, Ordering::Relaxed);
    // Use the second value of the reserved pair so span ids never collide
    // with the sequence numbers embedded in trace ids.
    (reserved + 1).to_be_bytes()
}
100
/// Renders a 16-byte identifier as 32 lowercase hexadecimal characters.
fn hex16(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut text = String::with_capacity(32);
    for &byte in b {
        text.push(DIGITS[usize::from(byte >> 4)] as char);
        text.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    text
}
104
/// Renders an 8-byte identifier as 16 lowercase hexadecimal characters.
fn hex8(b: &[u8; 8]) -> String {
    b.iter()
        .flat_map(|&byte| [byte >> 4, byte & 0x0f])
        .map(|nibble| {
            char::from_digit(u32::from(nibble), 16).expect("a nibble is always below 16")
        })
        .collect()
}
108
109#[derive(Copy, Clone, Debug, PartialEq, Eq)]
115pub struct TraceContext {
116 pub trace_id: [u8; 16],
117 pub parent_span_id: [u8; 8],
118 pub sampled: bool,
119}
120
121impl TraceContext {
122 pub fn parse(header: &str) -> Option<Self> {
124 let parts: Vec<&str> = header.trim().splitn(4, '-').collect();
125 if parts.len() != 4 || parts[0] != "00" {
126 return None;
127 }
128 let trace_id = parse_hex16(parts[1])?;
129 let parent_span_id = parse_hex8(parts[2])?;
130 let flags = u8::from_str_radix(parts[3], 16).ok()?;
131 Some(TraceContext { trace_id, parent_span_id, sampled: flags & 0x01 != 0 })
132 }
133
134 pub fn as_header(&self, span_id: &[u8; 8]) -> String {
136 format!("00-{}-{}-{:02x}", hex16(&self.trace_id), hex8(span_id), self.sampled as u8)
137 }
138}
139
/// Decodes exactly 32 hexadecimal characters into 16 bytes.
///
/// Upper- and lowercase digits are accepted. Returns `None` if the input has
/// the wrong length or contains any character that is not a hex digit.
/// Signs such as `+` are rejected, unlike with `u8::from_str_radix`.
fn parse_hex16(s: &str) -> Option<[u8; 16]> {
    let raw = s.as_bytes();
    if raw.len() != 32 {
        return None;
    }
    let mut out = [0u8; 16];
    for (slot, pair) in out.iter_mut().zip(raw.chunks_exact(2)) {
        // Non-ASCII bytes map to non-digit chars and fail `to_digit`.
        let hi = char::from(pair[0]).to_digit(16)?;
        let lo = char::from(pair[1]).to_digit(16)?;
        // Both nibbles are < 16, so the combined value always fits in a u8.
        *slot = ((hi << 4) | lo) as u8;
    }
    Some(out)
}
148
/// Decodes exactly 16 hexadecimal characters into 8 bytes.
///
/// Upper- and lowercase digits are accepted. Returns `None` if the input has
/// the wrong length or contains any character that is not a hex digit.
/// Signs such as `+` are rejected, unlike with `u8::from_str_radix`.
fn parse_hex8(s: &str) -> Option<[u8; 8]> {
    let raw = s.as_bytes();
    if raw.len() != 16 {
        return None;
    }
    let mut out = [0u8; 8];
    for (slot, pair) in out.iter_mut().zip(raw.chunks_exact(2)) {
        // Non-ASCII bytes map to non-digit chars and fail `to_digit`.
        let hi = char::from(pair[0]).to_digit(16)?;
        let lo = char::from(pair[1]).to_digit(16)?;
        // Both nibbles are < 16, so the combined value always fits in a u8.
        *slot = ((hi << 4) | lo) as u8;
    }
    Some(out)
}
157
/// Role a span plays in a trace.
///
/// The explicit discriminants matter: exporters in this file emit the kind
/// as `kind as i32`. NOTE(review): the values look chosen to match OTLP's
/// `SpanKind` numbering; confirm before renumbering.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum SpanKind {
    /// Work internal to the process; this is the default.
    #[default]
    Internal = 1,
    /// Handling of an inbound request.
    Server = 2,
    /// An outbound call.
    Client = 3,
}
175
/// Value of a span attribute.
#[derive(Clone, Debug, PartialEq)]
pub enum AttributeValue {
    /// Text value.
    String(String),
    /// Signed 64-bit integer value.
    Int(i64),
    /// 64-bit floating-point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
}

impl From<&str> for AttributeValue {
    /// Copies the slice into an owned [`AttributeValue::String`].
    fn from(s: &str) -> Self {
        Self::String(String::from(s))
    }
}

impl From<String> for AttributeValue {
    /// Wraps the string without copying it.
    fn from(s: String) -> Self {
        Self::String(s)
    }
}

impl From<i64> for AttributeValue {
    fn from(v: i64) -> Self {
        Self::Int(v)
    }
}

impl From<i32> for AttributeValue {
    /// Widens losslessly to `i64`.
    fn from(v: i32) -> Self {
        Self::Int(i64::from(v))
    }
}

impl From<u32> for AttributeValue {
    /// Widens losslessly to `i64`.
    fn from(v: u32) -> Self {
        Self::Int(i64::from(v))
    }
}

impl From<f64> for AttributeValue {
    fn from(v: f64) -> Self {
        Self::Float(v)
    }
}

impl From<bool> for AttributeValue {
    fn from(v: bool) -> Self {
        Self::Bool(v)
    }
}
206
207#[derive(Debug, Clone, Default)]
211pub struct SpanData {
212 pub trace_id: [u8; 16],
213 pub span_id: [u8; 8],
214 pub parent_span_id: Option<[u8; 8]>,
215 pub name: String,
217 pub start_ns: u64,
218 pub end_ns: u64,
219 pub http_method: String,
222 pub http_target: String,
223 pub http_status: i16,
224 pub status_code: u8,
226 pub kind: SpanKind,
227 pub attributes: Vec<(String, AttributeValue)>,
231}
232
233impl SpanData {
234 fn duration_ms(&self) -> f64 {
235 (self.end_ns.saturating_sub(self.start_ns)) as f64 / 1_000_000.0
236 }
237}
238
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // The u128 nanosecond count is narrowed to u64, which is wide enough
        // for dates several centuries past the epoch.
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
245
/// Returns the part of `uri` before the first `?`, or all of `uri` when it
/// has no query string.
fn strip_query(uri: &str) -> &str {
    uri.split_once('?').map_or(uri, |(path, _query)| path)
}
252
/// Minimal identity of a span that is currently open on this thread.
#[derive(Clone, Copy)]
struct ActiveSpanCtx {
    /// Trace the open span belongs to.
    trace_id: [u8; 16],
    /// Identifier of the open span.
    span_id: [u8; 8],
    /// Whether the open span is being recorded.
    sampled: bool,
}

thread_local! {
    /// Per-thread stack of open spans; the last element is the innermost.
    static ACTIVE_STACK: RefCell<Vec<ActiveSpanCtx>> = const { RefCell::new(Vec::new()) };
}
271
272pub fn current_traceparent() -> Option<String> {
279 ACTIVE_STACK.with(|stack| {
280 stack.borrow().last().map(|s| {
281 format!(
282 "00-{}-{}-{:02x}",
283 hex16(&s.trace_id),
284 hex8(&s.span_id),
285 s.sampled as u8,
286 )
287 })
288 })
289}
290
291pub struct Span {
318 trace_id: [u8; 16],
319 span_id: [u8; 8],
320 parent_span_id: Option<[u8; 8]>,
321 sampled: bool,
322 kind: SpanKind,
323 name: String,
324 start_ns: u64,
325 attributes: RefCell<Vec<(String, AttributeValue)>>,
326 status_code: Cell<u8>,
327 http_method: RefCell<Option<String>>,
328 http_target: RefCell<Option<String>>,
329 http_status: Cell<Option<i16>>,
330 _not_send: std::marker::PhantomData<std::rc::Rc<()>>,
333}
334
335fn new_span(name: &str, kind: SpanKind, incoming: Option<TraceContext>) -> Span {
336 let (trace_id, parent_span_id, sampled) = ACTIVE_STACK.with(|stack| {
337 let stack = stack.borrow();
338 if let Some(top) = stack.last() {
339 (top.trace_id, Some(top.span_id), top.sampled)
344 } else {
345 let sampled = tracer().map(|t| t.should_sample()).unwrap_or(false);
348 match incoming {
349 Some(ctx) => (ctx.trace_id, Some(ctx.parent_span_id), sampled),
350 None => (new_trace_id(), None, sampled),
351 }
352 }
353 });
354 let span_id = new_span_id();
355 ACTIVE_STACK.with(|stack| stack.borrow_mut().push(ActiveSpanCtx { trace_id, span_id, sampled }));
356
357 Span {
358 trace_id,
359 span_id,
360 parent_span_id,
361 sampled,
362 kind,
363 name: name.to_string(),
364 start_ns: now_ns(),
365 attributes: RefCell::new(Vec::new()),
366 status_code: Cell::new(0),
367 http_method: RefCell::new(None),
368 http_target: RefCell::new(None),
369 http_status: Cell::new(None),
370 _not_send: std::marker::PhantomData,
371 }
372}
373
374impl Span {
375 pub fn new(name: &str, kind: SpanKind) -> Span {
378 new_span(name, kind, None)
379 }
380
381 pub(crate) fn start_root(name: &str, kind: SpanKind, incoming: Option<TraceContext>) -> Span {
384 new_span(name, kind, incoming)
385 }
386
387 pub fn set_attribute(&self, key: &str, value: impl Into<AttributeValue>) {
390 self.attributes.borrow_mut().push((key.to_string(), value.into()));
391 }
392
393 pub fn set_error(&self) {
395 self.status_code.set(2);
396 }
397
398 pub fn record_error(&self, message: &str) {
400 self.set_error();
401 self.set_attribute("error.message", message);
402 }
403
404 pub fn trace_id(&self) -> [u8; 16] {
405 self.trace_id
406 }
407
408 pub fn span_id(&self) -> [u8; 8] {
409 self.span_id
410 }
411
412 pub fn parent_span_id(&self) -> Option<[u8; 8]> {
413 self.parent_span_id
414 }
415
416 pub fn end(self) {}
421
422 pub(crate) fn set_http(&self, method: &str, target: &str) {
423 *self.http_method.borrow_mut() = Some(method.to_string());
424 *self.http_target.borrow_mut() = Some(target.to_string());
425 }
426
427 pub(crate) fn set_http_status(&self, status: i16) {
429 self.http_status.set(Some(status));
430 if status >= 500 {
431 self.set_error();
432 }
433 }
434
435 fn finish(&mut self, end_ns: u64) -> SpanData {
436 SpanData {
437 trace_id: self.trace_id,
438 span_id: self.span_id,
439 parent_span_id: self.parent_span_id,
440 name: std::mem::take(&mut self.name),
441 start_ns: self.start_ns,
442 end_ns,
443 http_method: self.http_method.get_mut().take().unwrap_or_default(),
444 http_target: self.http_target.get_mut().take().unwrap_or_default(),
445 http_status: self.http_status.get().unwrap_or(0),
446 status_code: self.status_code.get(),
447 kind: self.kind,
448 attributes: std::mem::take(self.attributes.get_mut()),
449 }
450 }
451}
452
453impl Drop for Span {
454 fn drop(&mut self) {
455 ACTIVE_STACK.with(|stack| {
456 let mut stack = stack.borrow_mut();
457 if let Some(pos) = stack.iter().rposition(|c| c.span_id == self.span_id) {
460 stack.truncate(pos);
461 }
462 });
463
464 if !self.sampled {
465 return;
466 }
467 let Some(t) = tracer() else { return };
468 let end_ns = now_ns();
469 t.record(self.finish(end_ns));
470 }
471}
472
473pub fn span(name: &str) -> Span {
484 Span::new(name, SpanKind::Internal)
485}
486
487pub fn client_span(name: &str) -> Span {
490 Span::new(name, SpanKind::Client)
491}
492
493pub trait Exporter: Send + Sync {
497 fn export(&self, spans: &[SpanData]);
498 fn shutdown(&self) {}
499}
500
501fn attr_value_json(v: &AttributeValue) -> String {
504 match v {
505 AttributeValue::String(s) => format!("{{\"stringValue\":\"{s}\"}}"),
506 AttributeValue::Int(i) => format!("{{\"intValue\":\"{i}\"}}"),
507 AttributeValue::Float(f) => format!("{{\"doubleValue\":{f}}}"),
508 AttributeValue::Bool(b) => format!("{{\"boolValue\":{b}}}"),
509 }
510}
511
512fn attr_json(key: &str, value: &AttributeValue) -> String {
514 format!("{{\"key\":\"{key}\",\"value\":{}}}", attr_value_json(value))
515}
516
517pub struct StdoutExporter;
520
521impl StdoutExporter {
522 fn format_span(span: &SpanData) -> String {
523 let http_attrs = if span.http_method.is_empty() {
524 String::new()
525 } else {
526 format!(
527 ",\"httpMethod\":\"{}\",\"httpTarget\":\"{}\",\"httpStatus\":{}",
528 span.http_method, span.http_target, span.http_status,
529 )
530 };
531 let extra_attrs: String = span.attributes.iter()
532 .map(|(k, v)| format!(",\"{k}\":{}", attr_value_json(v)))
533 .collect();
534 format!(
535 "{{\"traceId\":\"{}\",\"spanId\":\"{}\",\"parentSpanId\":{},\
536 \"name\":\"{}\",\"kind\":{},\"startNs\":{},\"durationMs\":{:.3}{http_attrs}{extra_attrs}}}",
537 hex16(&span.trace_id),
538 hex8(&span.span_id),
539 span.parent_span_id
540 .as_ref()
541 .map(|p| format!("\"{}\"", hex8(p)))
542 .unwrap_or_else(|| "null".to_string()),
543 span.name,
544 span.kind as i32,
545 span.start_ns,
546 span.duration_ms(),
547 )
548 }
549}
550
551impl Exporter for StdoutExporter {
552 fn export(&self, spans: &[SpanData]) {
553 for span in spans {
554 println!("{}", Self::format_span(span));
555 }
556 }
557}
558
559pub struct OtlpHttpExporter {
564 host: String,
565 port: u16,
566 timeout: Duration,
567 service_name: String,
568 service_version: String,
569}
570
571impl OtlpHttpExporter {
572 pub fn new(endpoint: &str, service_name: &str, service_version: &str) -> Self {
573 let stripped = endpoint
575 .trim_start_matches("http://")
576 .trim_start_matches("https://");
577 let (host, port) = if let Some(i) = stripped.rfind(':') {
578 let p = stripped[i + 1..].parse().unwrap_or(4318);
579 (stripped[..i].to_string(), p)
580 } else {
581 (stripped.to_string(), 4318)
582 };
583 OtlpHttpExporter {
584 host,
585 port,
586 timeout: Duration::from_secs(5),
587 service_name: service_name.to_string(),
588 service_version: service_version.to_string(),
589 }
590 }
591
592 pub fn build_body(&self, spans: &[SpanData]) -> String {
593 let span_jsons: Vec<String> = spans.iter().map(|s| {
594 let parent = s.parent_span_id
595 .as_ref()
596 .map(|p| format!(",\"parentSpanId\":\"{}\"", hex8(p)))
597 .unwrap_or_default();
598 let status_msg = if s.status_code == 2 { "Error" } else { "Unset" };
599
600 let mut attrs: Vec<String> = Vec::new();
601 if !s.http_method.is_empty() {
602 attrs.push(format!("{{\"key\":\"http.method\",\"value\":{{\"stringValue\":\"{}\"}} }}", s.http_method));
603 attrs.push(format!("{{\"key\":\"http.target\",\"value\":{{\"stringValue\":\"{}\"}} }}", s.http_target));
604 attrs.push(format!("{{\"key\":\"http.status_code\",\"value\":{{\"intValue\":\"{}\"}} }}", s.http_status));
605 }
606 attrs.extend(s.attributes.iter().map(|(k, v)| attr_json(k, v)));
607
608 format!(
609 "{{\"traceId\":\"{trace}\",\"spanId\":\"{span}\"{parent},\
610 \"name\":\"{name}\",\"kind\":{kind},\
611 \"startTimeUnixNano\":\"{start}\",\"endTimeUnixNano\":\"{end}\",\
612 \"attributes\":[{attrs}],\
613 \"status\":{{\"code\":{scode},\"message\":\"{smsg}\"}} }}",
614 trace = hex16(&s.trace_id),
615 span = hex8(&s.span_id),
616 name = s.name,
617 kind = s.kind as i32,
618 start = s.start_ns,
619 end = s.end_ns,
620 attrs = attrs.join(","),
621 scode = s.status_code,
622 smsg = status_msg,
623 )
624 }).collect();
625
626 format!(
627 "{{\"resourceSpans\":[{{\"resource\":{{\"attributes\":[\
628 {{\"key\":\"service.name\",\"value\":{{\"stringValue\":\"{svc}\"}} }},\
629 {{\"key\":\"service.version\",\"value\":{{\"stringValue\":\"{ver}\"}} }}\
630 ]}},\"scopeSpans\":[{{\"scope\":{{\"name\":\"rws\"}},\"spans\":[{spans}]}}]}}]}}",
631 svc = self.service_name,
632 ver = self.service_version,
633 spans = span_jsons.join(","),
634 )
635 }
636
637 fn post(&self, body: &str) {
638 use std::net::ToSocketAddrs;
639 let addr = format!("{}:{}", self.host, self.port);
640 let Some(socket_addr) = addr.to_socket_addrs().ok().and_then(|mut i| i.next()) else {
641 return;
642 };
643 let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, self.timeout) else {
644 return;
645 };
646 let _ = stream.set_write_timeout(Some(self.timeout));
647 let _ = stream.set_read_timeout(Some(self.timeout));
648 let request = format!(
649 "POST /v1/traces HTTP/1.1\r\n\
650 Host: {host}:{port}\r\n\
651 Content-Type: application/json\r\n\
652 Content-Length: {len}\r\n\
653 Connection: close\r\n\r\n\
654 {body}",
655 host = self.host,
656 port = self.port,
657 len = body.len(),
658 body = body,
659 );
660 if stream.write_all(request.as_bytes()).is_ok() {
661 let mut _buf = [0u8; 256];
662 let _ = stream.read(&mut _buf); }
664 }
665}
666
667impl Exporter for OtlpHttpExporter {
668 fn export(&self, spans: &[SpanData]) {
669 if spans.is_empty() { return; }
670 let body = self.build_body(spans);
671 self.post(&body);
672 }
673}
674
675struct GlobalTracer {
678 exporter: Box<dyn Exporter>,
679 batch: Mutex<Vec<SpanData>>,
680 batch_size: usize,
681 sample_rate: f64,
682 shutdown_flag: AtomicBool,
683}
684
685impl GlobalTracer {
686 fn should_sample(&self) -> bool {
687 if self.sample_rate >= 1.0 { return true; }
688 if self.sample_rate <= 0.0 { return false; }
689 let n = COUNTER.load(Ordering::Relaxed);
691 (n % 10000) < (self.sample_rate * 10000.0) as u64
692 }
693
694 fn record(&self, span: SpanData) {
695 let mut guard = self.batch.lock().unwrap();
696 guard.push(span);
697 if guard.len() >= self.batch_size {
698 let spans = std::mem::take(&mut *guard);
699 drop(guard);
700 self.exporter.export(&spans);
701 }
702 }
703
704 fn flush(&self) {
705 let spans = std::mem::take(&mut *self.batch.lock().unwrap());
706 if !spans.is_empty() {
707 self.exporter.export(&spans);
708 }
709 }
710}
711
712static TRACER: OnceLock<GlobalTracer> = OnceLock::new();
713
714fn tracer() -> Option<&'static GlobalTracer> {
715 TRACER.get()
716}
717
/// Selects which exporter `setup` installs.
#[derive(Clone, Debug)]
pub enum ExporterConfig {
    /// Print spans to standard output as JSON lines.
    Stdout,
    /// Send spans over HTTP to an OTLP collector.
    Otlp {
        /// Collector address, e.g. `http://localhost:4318`.
        endpoint: String,
    },
    /// Drop all spans.
    Discard,
}
733
734#[derive(Clone, Debug)]
736pub struct TracingConfig {
737 pub service_name: String,
739 pub service_version: String,
741 pub exporter: ExporterConfig,
743 pub sample_rate: f64,
745 pub batch_size: usize,
747}
748
749impl Default for TracingConfig {
750 fn default() -> Self {
751 TracingConfig {
752 service_name: "rws".to_string(),
753 service_version: env!("CARGO_PKG_VERSION").to_string(),
754 exporter: ExporterConfig::Stdout,
755 sample_rate: 1.0,
756 batch_size: 128,
757 }
758 }
759}
760
761pub fn setup(config: TracingConfig) {
766 let exporter: Box<dyn Exporter> = match &config.exporter {
767 ExporterConfig::Stdout => Box::new(StdoutExporter),
768 ExporterConfig::Otlp { endpoint } => Box::new(OtlpHttpExporter::new(
769 endpoint,
770 &config.service_name,
771 &config.service_version,
772 )),
773 ExporterConfig::Discard => Box::new(DiscardExporter),
774 };
775 setup_with_exporter(config, exporter);
776}
777
778pub fn setup_with_exporter(config: TracingConfig, exporter: Box<dyn Exporter>) {
785 let _ = TRACER.set(GlobalTracer {
786 exporter,
787 batch: Mutex::new(Vec::new()),
788 batch_size: config.batch_size.max(1),
789 sample_rate: config.sample_rate.clamp(0.0, 1.0),
790 shutdown_flag: AtomicBool::new(false),
791 });
792}
793
794pub fn setup_from_env() {
800 let service_name = std::env::var("OTEL_SERVICE_NAME")
801 .unwrap_or_else(|_| "rws".to_string());
802 let sample_rate: f64 = std::env::var("OTEL_TRACES_SAMPLER_ARG")
803 .ok()
804 .and_then(|v| v.parse().ok())
805 .unwrap_or(1.0);
806 let exporter = match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok() {
807 Some(ep) if !ep.is_empty() => ExporterConfig::Otlp { endpoint: ep },
808 _ => ExporterConfig::Stdout,
809 };
810 setup(TracingConfig {
811 service_name,
812 service_version: env!("CARGO_PKG_VERSION").to_string(),
813 exporter,
814 sample_rate,
815 batch_size: 128,
816 });
817}
818
819pub fn shutdown() {
822 if let Some(t) = tracer() {
823 t.shutdown_flag.store(true, Ordering::Relaxed);
824 t.flush();
825 t.exporter.shutdown();
826 }
827}
828
829pub fn flush() {
831 if let Some(t) = tracer() {
832 t.flush();
833 }
834}
835
836pub struct OtelLayer;
850
851impl Middleware for OtelLayer {
852 fn handle(
853 &self,
854 request: &Request,
855 connection: &ConnectionInfo,
856 next: &dyn Application,
857 ) -> Result<Response, String> {
858 if tracer().is_none() {
859 return next.execute(request, connection);
860 }
861
862 let incoming = request.headers.iter()
864 .find(|h| h.name.eq_ignore_ascii_case("traceparent"))
865 .and_then(|h| TraceContext::parse(&h.value));
866
867 let path = strip_query(&request.request_uri).to_string();
868 let root = Span::start_root(&format!("{} {}", request.method, path), SpanKind::Server, incoming);
869 root.set_http(&request.method, &request.request_uri);
870
871 let result = next.execute(request, connection);
872
873 let status = match &result {
874 Ok(r) => r.status_code,
875 Err(_) => 500,
876 };
877 root.set_http_status(status);
878
879 result
880 }
883}
884
885struct DiscardExporter;
888impl Exporter for DiscardExporter {
889 fn export(&self, _: &[SpanData]) {}
890}
891
892pub struct CapturingExporter {
913 pub spans: Mutex<Vec<SpanData>>,
914}
915
916impl CapturingExporter {
917 pub fn new() -> Self {
918 CapturingExporter { spans: Mutex::new(Vec::new()) }
919 }
920
921 pub fn take(&self) -> Vec<SpanData> {
922 std::mem::take(&mut *self.spans.lock().unwrap())
923 }
924}
925
926impl Default for CapturingExporter {
927 fn default() -> Self { Self::new() }
928}
929
930impl Exporter for CapturingExporter {
931 fn export(&self, spans: &[SpanData]) {
932 self.spans.lock().unwrap().extend_from_slice(spans);
933 }
934}
935
936impl Exporter for std::sync::Arc<CapturingExporter> {
941 fn export(&self, spans: &[SpanData]) {
942 self.spans.lock().unwrap().extend_from_slice(spans);
943 }
944}