1use dashmap::DashMap;
11use parking_lot::Mutex;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
16
17use super::NodeId;
18
19fn random_u64() -> u64 {
40 let mut bytes = [0u8; 8];
41 if let Err(e) = getrandom::fill(&mut bytes) {
42 use std::io::Write;
43 let _ = writeln!(
44 std::io::stderr(),
45 "FATAL: behavior::context::random_u64 getrandom failure ({e:?}); \
46 aborting to avoid panic across the FFI boundary"
47 );
48 std::process::abort();
49 }
50 u64::from_le_bytes(bytes)
51}
52
53fn random_f64() -> f64 {
55 let r = random_u64();
56 (r as f64) / (u64::MAX as f64)
57}
58
/// Percent-encodes `s`, byte by byte.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through; every
/// other byte (including each byte of a multi-byte UTF-8 sequence) becomes
/// `%XX` with upper-case hexadecimal digits.
fn percent_encode(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    // Worst case: every input byte expands to three output bytes.
    let mut encoded = String::with_capacity(s.len() * 3);
    for &byte in s.as_bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
74
/// Decodes a percent-encoded string.
///
/// Every `%` must be followed by exactly two hexadecimal digits (either
/// case); all other bytes are copied through unchanged, so literal non-ASCII
/// text survives intact.
///
/// Returns `None` when an escape is truncated or malformed, or when the
/// decoded bytes are not valid UTF-8.
fn percent_decode(s: &str) -> Option<String> {
    /// Value of a single ASCII hex digit, or `None` for anything else
    /// (notably `+`, which `u8::from_str_radix` would accept as a sign).
    fn hex_value(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    let mut decoded = Vec::with_capacity(s.len());
    // Work on bytes, not chars: casting a `char` to `u8` silently truncates
    // code points above U+00FF and corrupts anything above U+007F.
    let mut bytes = s.bytes();
    while let Some(byte) = bytes.next() {
        if byte == b'%' {
            let high = hex_value(bytes.next()?)?;
            let low = hex_value(bytes.next()?)?;
            decoded.push((high << 4) | low);
        } else {
            decoded.push(byte);
        }
    }

    String::from_utf8(decoded).ok()
}
95
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
98pub struct TraceId {
99 pub high: u64,
101 pub low: u64,
103}
104
105impl TraceId {
106 pub fn generate() -> Self {
108 Self {
109 high: random_u64(),
110 low: random_u64(),
111 }
112 }
113
114 pub fn from_hex(s: &str) -> Option<Self> {
116 if s.len() != 32 {
117 return None;
118 }
119 let high = u64::from_str_radix(&s[0..16], 16).ok()?;
120 let low = u64::from_str_radix(&s[16..32], 16).ok()?;
121 Some(Self { high, low })
122 }
123
124 pub fn to_hex(&self) -> String {
126 format!("{:016x}{:016x}", self.high, self.low)
127 }
128}
129
130impl Default for TraceId {
131 fn default() -> Self {
132 Self::generate()
133 }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
138pub struct SpanId(pub u64);
139
140impl SpanId {
141 pub fn generate() -> Self {
143 Self(random_u64())
144 }
145
146 pub fn from_hex(s: &str) -> Option<Self> {
148 u64::from_str_radix(s, 16).ok().map(Self)
149 }
150
151 pub fn to_hex(&self) -> String {
153 format!("{:016x}", self.0)
154 }
155}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159pub struct TraceFlags(pub u8);
160
161impl TraceFlags {
162 pub const SAMPLED: u8 = 0x01;
164
165 pub fn sampled() -> Self {
167 Self(Self::SAMPLED)
168 }
169
170 pub fn not_sampled() -> Self {
172 Self(0)
173 }
174
175 pub fn is_sampled(&self) -> bool {
177 self.0 & Self::SAMPLED != 0
178 }
179}
180
181impl Default for TraceFlags {
182 fn default() -> Self {
183 Self::sampled()
184 }
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
189pub enum SpanKind {
190 #[default]
192 Internal,
193 Server,
195 Client,
197 Producer,
199 Consumer,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
205pub enum SpanStatus {
206 #[default]
208 Unset,
209 Ok,
211 Error {
213 message: String,
215 },
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct Span {
221 pub span_id: SpanId,
223 pub parent_span_id: Option<SpanId>,
225 pub trace_id: TraceId,
227 pub name: String,
229 pub kind: SpanKind,
231 pub start_time_us: u64,
233 pub end_time_us: Option<u64>,
235 pub attributes: HashMap<String, AttributeValue>,
237 pub status: SpanStatus,
239 pub events: Vec<SpanEvent>,
241 pub links: Vec<SpanLink>,
243 pub node_id: NodeId,
245}
246
247impl Span {
248 pub fn new(trace_id: TraceId, name: impl Into<String>, node_id: NodeId) -> Self {
250 Self {
251 span_id: SpanId::generate(),
252 parent_span_id: None,
253 trace_id,
254 name: name.into(),
255 kind: SpanKind::Internal,
256 start_time_us: now_micros(),
257 end_time_us: None,
258 attributes: HashMap::new(),
259 status: SpanStatus::Unset,
260 events: Vec::new(),
261 links: Vec::new(),
262 node_id,
263 }
264 }
265
266 pub fn with_parent(mut self, parent: SpanId) -> Self {
268 self.parent_span_id = Some(parent);
269 self
270 }
271
272 pub fn with_kind(mut self, kind: SpanKind) -> Self {
274 self.kind = kind;
275 self
276 }
277
278 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
280 self.attributes.insert(key.into(), value.into());
281 }
282
283 pub fn add_event(&mut self, name: impl Into<String>) {
285 self.events.push(SpanEvent {
286 name: name.into(),
287 timestamp_us: now_micros(),
288 attributes: HashMap::new(),
289 });
290 }
291
292 pub fn add_event_with_attributes(
294 &mut self,
295 name: impl Into<String>,
296 attributes: HashMap<String, AttributeValue>,
297 ) {
298 self.events.push(SpanEvent {
299 name: name.into(),
300 timestamp_us: now_micros(),
301 attributes,
302 });
303 }
304
305 pub fn add_link(&mut self, trace_id: TraceId, span_id: SpanId) {
307 self.links.push(SpanLink {
308 trace_id,
309 span_id,
310 attributes: HashMap::new(),
311 });
312 }
313
314 pub fn set_ok(&mut self) {
316 self.status = SpanStatus::Ok;
317 }
318
319 pub fn set_error(&mut self, message: impl Into<String>) {
321 self.status = SpanStatus::Error {
322 message: message.into(),
323 };
324 }
325
326 pub fn end(&mut self) {
328 if self.end_time_us.is_none() {
329 self.end_time_us = Some(now_micros());
330 }
331 }
332
333 pub fn duration_us(&self) -> Option<u64> {
335 self.end_time_us
336 .map(|end| end.saturating_sub(self.start_time_us))
337 }
338}
339
340#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
342pub enum AttributeValue {
343 String(String),
345 Int(i64),
347 Float(f64),
349 Bool(bool),
351 StringArray(Vec<String>),
353 IntArray(Vec<i64>),
355 FloatArray(Vec<f64>),
357 BoolArray(Vec<bool>),
359}
360
361impl From<String> for AttributeValue {
362 fn from(s: String) -> Self {
363 Self::String(s)
364 }
365}
366
367impl From<&str> for AttributeValue {
368 fn from(s: &str) -> Self {
369 Self::String(s.to_string())
370 }
371}
372
373impl From<i64> for AttributeValue {
374 fn from(n: i64) -> Self {
375 Self::Int(n)
376 }
377}
378
379impl From<i32> for AttributeValue {
380 fn from(n: i32) -> Self {
381 Self::Int(n as i64)
382 }
383}
384
385impl From<f64> for AttributeValue {
386 fn from(n: f64) -> Self {
387 Self::Float(n)
388 }
389}
390
391impl From<bool> for AttributeValue {
392 fn from(b: bool) -> Self {
393 Self::Bool(b)
394 }
395}
396
397#[derive(Debug, Clone, Serialize, Deserialize)]
399pub struct SpanEvent {
400 pub name: String,
402 pub timestamp_us: u64,
404 pub attributes: HashMap<String, AttributeValue>,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct SpanLink {
411 pub trace_id: TraceId,
413 pub span_id: SpanId,
415 pub attributes: HashMap<String, AttributeValue>,
417}
418
419#[derive(Debug, Clone, Default, Serialize, Deserialize)]
421pub struct Baggage {
422 items: HashMap<String, BaggageItem>,
423}
424
425#[derive(Debug, Clone, Serialize, Deserialize)]
427pub struct BaggageItem {
428 pub value: String,
430 pub metadata: Option<String>,
432}
433
434impl Baggage {
435 pub fn new() -> Self {
437 Self::default()
438 }
439
440 pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
442 self.items.insert(
443 key.into(),
444 BaggageItem {
445 value: value.into(),
446 metadata: None,
447 },
448 );
449 }
450
451 pub fn set_with_metadata(
453 &mut self,
454 key: impl Into<String>,
455 value: impl Into<String>,
456 metadata: impl Into<String>,
457 ) {
458 self.items.insert(
459 key.into(),
460 BaggageItem {
461 value: value.into(),
462 metadata: Some(metadata.into()),
463 },
464 );
465 }
466
467 pub fn get(&self, key: &str) -> Option<&str> {
469 self.items.get(key).map(|item| item.value.as_str())
470 }
471
472 pub fn get_with_metadata(&self, key: &str) -> Option<(&str, Option<&str>)> {
474 self.items
475 .get(key)
476 .map(|item| (item.value.as_str(), item.metadata.as_deref()))
477 }
478
479 pub fn remove(&mut self, key: &str) -> Option<String> {
481 self.items.remove(key).map(|item| item.value)
482 }
483
484 pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
486 self.items
487 .iter()
488 .map(|(k, v)| (k.as_str(), v.value.as_str()))
489 }
490
491 pub fn len(&self) -> usize {
493 self.items.len()
494 }
495
496 pub fn is_empty(&self) -> bool {
498 self.items.is_empty()
499 }
500
501 pub fn merge(&mut self, other: &Baggage) {
503 for (key, item) in &other.items {
504 self.items.insert(key.clone(), item.clone());
505 }
506 }
507}
508
509#[derive(Debug, Clone, Serialize, Deserialize)]
511pub struct Context {
512 pub trace_id: TraceId,
514 pub span_id: SpanId,
516 pub parent_span_id: Option<SpanId>,
518 pub trace_flags: TraceFlags,
520 pub trace_state: HashMap<String, String>,
522 pub baggage: Baggage,
524 pub deadline_us: Option<u64>,
526 pub origin_node: NodeId,
528 pub request_id: Option<String>,
530 pub correlation_id: Option<String>,
532 pub hop_count: u32,
534 pub max_hops: Option<u32>,
536}
537
538impl Context {
539 pub fn new(origin_node: NodeId) -> Self {
541 Self {
542 trace_id: TraceId::generate(),
543 span_id: SpanId::generate(),
544 parent_span_id: None,
545 trace_flags: TraceFlags::sampled(),
546 trace_state: HashMap::new(),
547 baggage: Baggage::new(),
548 deadline_us: None,
549 origin_node,
550 request_id: None,
551 correlation_id: None,
552 hop_count: 0,
553 max_hops: None,
554 }
555 }
556
557 pub fn child(&self, new_span_name: &str) -> Self {
559 let _ = new_span_name; Self {
561 trace_id: self.trace_id,
562 span_id: SpanId::generate(),
563 parent_span_id: Some(self.span_id),
564 trace_flags: self.trace_flags,
565 trace_state: self.trace_state.clone(),
566 baggage: self.baggage.clone(),
567 deadline_us: self.deadline_us,
568 origin_node: self.origin_node,
569 request_id: self.request_id.clone(),
570 correlation_id: self.correlation_id.clone(),
571 hop_count: self.hop_count,
572 max_hops: self.max_hops,
573 }
574 }
575
576 pub fn for_remote(&self) -> Self {
578 Self {
579 trace_id: self.trace_id,
580 span_id: SpanId::generate(),
581 parent_span_id: Some(self.span_id),
582 trace_flags: self.trace_flags,
583 trace_state: self.trace_state.clone(),
584 baggage: self.baggage.clone(),
585 deadline_us: self.deadline_us,
586 origin_node: self.origin_node,
587 request_id: self.request_id.clone(),
588 correlation_id: self.correlation_id.clone(),
589 hop_count: self.hop_count.saturating_add(1),
590 max_hops: self.max_hops,
591 }
592 }
593
594 pub fn with_timeout(mut self, timeout: Duration) -> Self {
596 self.deadline_us = Some(now_micros() + timeout.as_micros() as u64);
597 self
598 }
599
600 pub fn with_deadline(mut self, deadline_us: u64) -> Self {
602 self.deadline_us = Some(deadline_us);
603 self
604 }
605
606 pub fn is_expired(&self) -> bool {
608 self.deadline_us
609 .map(|deadline| now_micros() > deadline)
610 .unwrap_or(false)
611 }
612
613 pub fn remaining(&self) -> Option<Duration> {
615 self.deadline_us.and_then(|deadline| {
616 let now = now_micros();
617 if now >= deadline {
618 None
619 } else {
620 Some(Duration::from_micros(deadline - now))
621 }
622 })
623 }
624
625 pub fn exceeded_hops(&self) -> bool {
627 self.max_hops
628 .map(|max| self.hop_count >= max)
629 .unwrap_or(false)
630 }
631
632 pub fn with_max_hops(mut self, max: u32) -> Self {
634 self.max_hops = Some(max);
635 self
636 }
637
638 pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
640 self.request_id = Some(id.into());
641 self
642 }
643
644 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
646 self.correlation_id = Some(id.into());
647 self
648 }
649
650 pub fn to_traceparent(&self) -> String {
652 format!(
653 "00-{}-{}-{:02x}",
654 self.trace_id.to_hex(),
655 self.span_id.to_hex(),
656 self.trace_flags.0
657 )
658 }
659
660 pub fn from_traceparent(header: &str, origin_node: NodeId) -> Option<Self> {
662 let parts: Vec<&str> = header.split('-').collect();
663 if parts.len() != 4 || parts[0] != "00" {
664 return None;
665 }
666
667 let trace_id = TraceId::from_hex(parts[1])?;
668 let span_id = SpanId::from_hex(parts[2])?;
669 let flags = u8::from_str_radix(parts[3], 16).ok()?;
670
671 Some(Self {
672 trace_id,
673 span_id: SpanId::generate(),
674 parent_span_id: Some(span_id),
675 trace_flags: TraceFlags(flags),
676 trace_state: HashMap::new(),
677 baggage: Baggage::new(),
678 deadline_us: None,
679 origin_node,
680 request_id: None,
681 correlation_id: None,
682 hop_count: 1,
683 max_hops: None,
684 })
685 }
686}
687
688#[derive(Debug, Clone, Serialize, Deserialize)]
690pub enum SamplingStrategy {
691 AlwaysOn,
693 AlwaysOff,
695 Ratio(f64),
697 RateLimited {
699 max_per_second: u32,
701 },
702 ParentBased,
704 Custom(String),
706}
707
708impl Default for SamplingStrategy {
709 fn default() -> Self {
710 Self::Ratio(0.1) }
712}
713
714#[derive(Debug)]
716pub struct Sampler {
717 strategy: SamplingStrategy,
718 count: AtomicU64,
719 last_reset: Mutex<Instant>,
720}
721
722impl Sampler {
723 pub fn new(strategy: SamplingStrategy) -> Self {
725 Self {
726 strategy,
727 count: AtomicU64::new(0),
728 last_reset: Mutex::new(Instant::now()),
729 }
730 }
731
732 pub fn should_sample(&self, parent_sampled: Option<bool>) -> bool {
734 match &self.strategy {
735 SamplingStrategy::AlwaysOn => true,
736 SamplingStrategy::AlwaysOff => false,
737 SamplingStrategy::Ratio(ratio) => random_f64() < *ratio,
738 SamplingStrategy::RateLimited { max_per_second } => {
739 let mut last_reset = self.last_reset.lock();
740 let now = Instant::now();
741
742 if now.duration_since(*last_reset) >= Duration::from_secs(1) {
744 self.count.store(0, Ordering::Relaxed);
745 *last_reset = now;
746 }
747
748 let current = self.count.fetch_add(1, Ordering::Relaxed);
749 current < *max_per_second as u64
750 }
751 SamplingStrategy::ParentBased => parent_sampled.unwrap_or(true),
752 SamplingStrategy::Custom(_) => true, }
754 }
755}
756
/// Errors reported by context handling and the context store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContextError {
    /// The context's deadline has passed.
    Expired,
    /// The context has reached its hop limit.
    MaxHopsExceeded,
    /// No entry exists for the requested trace.
    NotFound,
    /// A trace id could not be interpreted.
    InvalidTraceId,
    /// The store has no room for another trace.
    CapacityExceeded,
}

impl std::fmt::Display for ContextError {
    /// Writes a short, lower-case description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            ContextError::Expired => "context has expired",
            ContextError::MaxHopsExceeded => "maximum hops exceeded",
            ContextError::NotFound => "context not found",
            ContextError::InvalidTraceId => "invalid trace ID",
            ContextError::CapacityExceeded => "storage capacity exceeded",
        };
        f.write_str(description)
    }
}

impl std::error::Error for ContextError {}
785
786#[derive(Debug)]
788struct ContextEntry {
789 context: Context,
790 created_at: Instant,
791 spans: Vec<Span>,
792}
793
/// Point-in-time counters reported by `ContextStore::stats`.
#[derive(Debug, Clone, Default)]
pub struct ContextStoreStats {
    /// Traces currently held in the store.
    pub active_traces: u64,
    /// Spans currently held across all active traces.
    pub total_spans: u64,
    /// Traces admitted as sampled since the store was created.
    pub sampled_traces: u64,
    /// Requests rejected because the store was full.
    pub dropped_traces: u64,
    /// Traces evicted by TTL cleanup.
    pub expired_traces: u64,
}
808
809pub struct ContextStore {
811 contexts: DashMap<TraceId, ContextEntry>,
813 max_traces: usize,
815 max_spans_per_trace: usize,
817 trace_ttl: Duration,
819 sampler: Sampler,
821 sampled_count: AtomicU64,
823 dropped_count: AtomicU64,
824 expired_count: AtomicU64,
825 active_count: std::sync::atomic::AtomicUsize,
833}
834
835impl ContextStore {
836 pub fn new(max_traces: usize, max_spans_per_trace: usize, trace_ttl: Duration) -> Self {
838 Self {
839 contexts: DashMap::new(),
840 max_traces,
841 max_spans_per_trace,
842 trace_ttl,
843 sampler: Sampler::new(SamplingStrategy::default()),
844 sampled_count: AtomicU64::new(0),
845 dropped_count: AtomicU64::new(0),
846 expired_count: AtomicU64::new(0),
847 active_count: std::sync::atomic::AtomicUsize::new(0),
848 }
849 }
850
851 pub fn with_sampler(mut self, sampler: Sampler) -> Self {
855 self.sampler = sampler;
856 self
857 }
858
859 fn try_reserve_slot(&self) -> Option<SlotReservation<'_>> {
875 use std::sync::atomic::Ordering;
876 let ok = self
878 .active_count
879 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
880 if cur < self.max_traces {
881 Some(cur + 1)
882 } else {
883 None
884 }
885 })
886 .is_ok();
887 if ok {
888 Some(SlotReservation { store: self })
889 } else {
890 None
891 }
892 }
893
894 fn release_slot(&self) {
901 use std::sync::atomic::Ordering;
902 self.active_count
903 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
904 Some(cur.saturating_sub(1))
905 })
906 .ok();
907 }
908}
909
910pub(super) struct SlotReservation<'a> {
927 store: &'a ContextStore,
928}
929
930impl<'a> SlotReservation<'a> {
931 fn commit(self) {
935 std::mem::forget(self);
939 }
940}
941
942impl<'a> Drop for SlotReservation<'a> {
943 fn drop(&mut self) {
944 self.store.release_slot();
948 }
949}
950
951impl ContextStore {
952 pub fn with_sampling(mut self, strategy: SamplingStrategy) -> Self {
954 self.sampler = Sampler::new(strategy);
955 self
956 }
957
958 pub fn create_context(&self, origin_node: NodeId) -> Result<Context, ContextError> {
969 let guard = match self.try_reserve_slot() {
975 Some(g) => g,
976 None => {
977 self.cleanup_expired();
978 match self.try_reserve_slot() {
979 Some(g) => g,
980 None => {
981 self.dropped_count.fetch_add(1, Ordering::Relaxed);
982 return Err(ContextError::CapacityExceeded);
983 }
984 }
985 }
986 };
987
988 let ctx = Context::new(origin_node);
989
990 if !self.sampler.should_sample(None) {
992 let mut unsampled = ctx.clone();
993 unsampled.trace_flags = TraceFlags::not_sampled();
994 return Ok(unsampled);
997 }
998
999 self.sampled_count.fetch_add(1, Ordering::Relaxed);
1000
1001 self.contexts.insert(
1002 ctx.trace_id,
1003 ContextEntry {
1004 context: ctx.clone(),
1005 created_at: Instant::now(),
1006 spans: Vec::new(),
1007 },
1008 );
1009
1010 guard.commit();
1013
1014 Ok(ctx)
1015 }
1016
1017 pub fn continue_context(&self, ctx: Context) -> Result<Context, ContextError> {
1019 if ctx.is_expired() {
1021 return Err(ContextError::Expired);
1022 }
1023
1024 if ctx.exceeded_hops() {
1026 return Err(ContextError::MaxHopsExceeded);
1027 }
1028
1029 if self.contexts.contains_key(&ctx.trace_id) {
1031 return Ok(ctx);
1032 }
1033
1034 let guard = match self.try_reserve_slot() {
1037 Some(g) => g,
1038 None => {
1039 self.cleanup_expired();
1040 match self.try_reserve_slot() {
1041 Some(g) => g,
1042 None => {
1043 self.dropped_count.fetch_add(1, Ordering::Relaxed);
1044 return Err(ContextError::CapacityExceeded);
1045 }
1046 }
1047 }
1048 };
1049
1050 if !self
1052 .sampler
1053 .should_sample(Some(ctx.trace_flags.is_sampled()))
1054 {
1055 return Ok(ctx);
1057 }
1058
1059 self.sampled_count.fetch_add(1, Ordering::Relaxed);
1060
1061 let prev = self.contexts.insert(
1071 ctx.trace_id,
1072 ContextEntry {
1073 context: ctx.clone(),
1074 created_at: Instant::now(),
1075 spans: Vec::new(),
1076 },
1077 );
1078 guard.commit();
1079 if prev.is_some() {
1080 self.release_slot();
1083 }
1084
1085 Ok(ctx)
1086 }
1087
1088 pub fn add_span(&self, span: Span) -> Result<(), ContextError> {
1090 if let Some(mut entry) = self.contexts.get_mut(&span.trace_id) {
1091 if entry.spans.len() < self.max_spans_per_trace {
1092 entry.spans.push(span);
1093 }
1094 Ok(())
1095 } else {
1096 Err(ContextError::NotFound)
1097 }
1098 }
1099
1100 pub fn get_context(&self, trace_id: &TraceId) -> Option<Context> {
1102 self.contexts
1103 .get(trace_id)
1104 .map(|entry| entry.context.clone())
1105 }
1106
1107 pub fn get_spans(&self, trace_id: &TraceId) -> Vec<Span> {
1109 self.contexts
1110 .get(trace_id)
1111 .map(|entry| entry.spans.clone())
1112 .unwrap_or_default()
1113 }
1114
1115 pub fn complete_trace(&self, trace_id: &TraceId) -> Option<(Context, Vec<Span>)> {
1120 let removed = self
1121 .contexts
1122 .remove(trace_id)
1123 .map(|(_, entry)| (entry.context, entry.spans));
1124 if removed.is_some() {
1125 self.release_slot();
1126 }
1127 removed
1128 }
1129
1130 pub fn cleanup_expired(&self) {
1136 let now = Instant::now();
1137 let mut expired = Vec::new();
1138
1139 for entry in self.contexts.iter() {
1140 if now.duration_since(entry.created_at) > self.trace_ttl {
1141 expired.push(*entry.key());
1142 }
1143 }
1144
1145 for trace_id in expired {
1146 if self.contexts.remove(&trace_id).is_some() {
1147 self.expired_count.fetch_add(1, Ordering::Relaxed);
1148 self.release_slot();
1149 }
1150 }
1151 }
1152
1153 pub fn stats(&self) -> ContextStoreStats {
1155 let mut total_spans = 0;
1156 for entry in self.contexts.iter() {
1157 total_spans += entry.spans.len() as u64;
1158 }
1159
1160 ContextStoreStats {
1161 active_traces: self.contexts.len() as u64,
1162 total_spans,
1163 sampled_traces: self.sampled_count.load(Ordering::Relaxed),
1164 dropped_traces: self.dropped_count.load(Ordering::Relaxed),
1165 expired_traces: self.expired_count.load(Ordering::Relaxed),
1166 }
1167 }
1168}
1169
1170pub struct ContextScope<'a> {
1172 store: &'a ContextStore,
1173 span: Span,
1174 finished: bool,
1175}
1176
1177impl<'a> ContextScope<'a> {
1178 pub fn new(store: &'a ContextStore, ctx: &Context, name: &str, node_id: NodeId) -> Self {
1180 let mut span = Span::new(ctx.trace_id, name, node_id);
1181 if let Some(parent) = ctx.parent_span_id {
1182 span = span.with_parent(parent);
1183 }
1184
1185 Self {
1186 store,
1187 span,
1188 finished: false,
1189 }
1190 }
1191
1192 pub fn with_kind(mut self, kind: SpanKind) -> Self {
1194 self.span.kind = kind;
1195 self
1196 }
1197
1198 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
1200 self.span.set_attribute(key, value);
1201 }
1202
1203 pub fn add_event(&mut self, name: impl Into<String>) {
1205 self.span.add_event(name);
1206 }
1207
1208 pub fn set_ok(&mut self) {
1210 self.span.set_ok();
1211 }
1212
1213 pub fn set_error(&mut self, message: impl Into<String>) {
1215 self.span.set_error(message);
1216 }
1217
1218 pub fn finish(mut self) {
1220 self.span.end();
1221 let _ = self.store.add_span(self.span.clone());
1222 self.finished = true;
1223 }
1224
1225 pub fn span(&self) -> &Span {
1227 &self.span
1228 }
1229}
1230
1231impl<'a> Drop for ContextScope<'a> {
1232 fn drop(&mut self) {
1233 if !self.finished {
1234 self.span.end();
1235 let _ = self.store.add_span(self.span.clone());
1236 }
1237 }
1238}
1239
1240#[derive(Debug, Clone, Serialize, Deserialize)]
1242pub struct PropagationContext {
1243 pub traceparent: String,
1245 pub tracestate: Option<String>,
1247 pub baggage: Option<String>,
1249 pub deadline_us: Option<u64>,
1251 pub hop_count: u32,
1253 pub max_hops: Option<u32>,
1255}
1256
1257impl PropagationContext {
1258 pub fn from_context(ctx: &Context) -> Self {
1260 let tracestate = if ctx.trace_state.is_empty() {
1261 None
1262 } else {
1263 Some(
1264 ctx.trace_state
1265 .iter()
1266 .map(|(k, v)| format!("{}={}", k, v))
1267 .collect::<Vec<_>>()
1268 .join(","),
1269 )
1270 };
1271
1272 let baggage = if ctx.baggage.is_empty() {
1273 None
1274 } else {
1275 Some(
1276 ctx.baggage
1277 .iter()
1278 .map(|(k, v)| format!("{}={}", k, percent_encode(v)))
1279 .collect::<Vec<_>>()
1280 .join(","),
1281 )
1282 };
1283
1284 Self {
1285 traceparent: ctx.to_traceparent(),
1286 tracestate,
1287 baggage,
1288 deadline_us: ctx.deadline_us,
1289 hop_count: ctx.hop_count,
1290 max_hops: ctx.max_hops,
1291 }
1292 }
1293
1294 pub fn to_context(&self, origin_node: NodeId) -> Option<Context> {
1296 let mut ctx = Context::from_traceparent(&self.traceparent, origin_node)?;
1297
1298 if let Some(ref ts) = self.tracestate {
1300 for pair in ts.split(',') {
1301 if let Some((k, v)) = pair.split_once('=') {
1302 ctx.trace_state.insert(k.to_string(), v.to_string());
1303 }
1304 }
1305 }
1306
1307 if let Some(ref bg) = self.baggage {
1309 for pair in bg.split(',') {
1310 if let Some((k, v)) = pair.split_once('=') {
1311 if let Some(decoded) = percent_decode(v) {
1312 ctx.baggage.set(k, decoded);
1313 }
1314 }
1315 }
1316 }
1317
1318 ctx.deadline_us = self.deadline_us;
1319 ctx.hop_count = self.hop_count;
1320 ctx.max_hops = self.max_hops;
1321
1322 Some(ctx)
1323 }
1324}
1325
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
1333
1334#[cfg(test)]
1335mod tests {
1336 use super::*;
1337
1338 fn test_node_id() -> NodeId {
1339 [1u8; 32]
1340 }
1341
/// A generated trace id survives a hex round trip and renders as 32 digits.
#[test]
fn test_trace_id() {
    let original = TraceId::generate();
    let encoded = original.to_hex();
    assert_eq!(encoded.len(), 32);

    let decoded = TraceId::from_hex(&encoded).unwrap();
    assert_eq!(original, decoded);
}
1351
/// A generated span id survives a hex round trip and renders as 16 digits.
#[test]
fn test_span_id() {
    let original = SpanId::generate();
    let encoded = original.to_hex();
    assert_eq!(encoded.len(), 16);

    let decoded = SpanId::from_hex(&encoded).unwrap();
    assert_eq!(original, decoded);
}
1361
/// A span has no end time until `end` is called; afterwards it has both an
/// end time and a duration.
#[test]
fn test_span_lifecycle() {
    let mut span = Span::new(TraceId::generate(), "test_operation", test_node_id());
    span.set_attribute("key", "value");
    span.add_event("started");

    assert!(span.end_time_us.is_none());

    span.end();

    assert!(span.end_time_us.is_some());
    assert!(span.duration_us().is_some());
}
1376
/// Baggage stores values with optional metadata, and `merge` lets the other
/// side win on key collisions.
#[test]
fn test_baggage() {
    let mut primary = Baggage::new();
    primary.set("user_id", "12345");
    primary.set_with_metadata("tenant", "acme", "priority=high");

    assert_eq!(primary.get("user_id"), Some("12345"));
    assert_eq!(
        primary.get_with_metadata("tenant"),
        Some(("acme", Some("priority=high")))
    );

    let mut incoming = Baggage::new();
    incoming.set("user_id", "67890");
    incoming.set("request_id", "abc");

    primary.merge(&incoming);

    assert_eq!(primary.get("user_id"), Some("67890"));
    assert_eq!(primary.get("request_id"), Some("abc"));
}
1397
/// A fresh context is neither expired nor over its hop limit and starts at
/// hop zero.
#[test]
fn test_context_creation() {
    let fresh = Context::new(test_node_id());

    assert!(!fresh.is_expired());
    assert!(!fresh.exceeded_hops());
    assert_eq!(fresh.hop_count, 0);
}
1407
/// A child keeps the trace and hop count and points at the parent's span.
#[test]
fn test_context_child() {
    let root = Context::new(test_node_id());
    let derived = root.child("child_operation");

    assert_eq!(derived.trace_id, root.trace_id);
    assert_eq!(derived.parent_span_id, Some(root.span_id));
    assert_eq!(derived.hop_count, root.hop_count);
}
1418
/// A remote context keeps the trace, points at the local span and adds one
/// hop.
#[test]
fn test_context_remote() {
    let here = Context::new(test_node_id());
    let there = here.for_remote();

    assert_eq!(there.trace_id, here.trace_id);
    assert_eq!(there.parent_span_id, Some(here.span_id));
    assert_eq!(there.hop_count, here.hop_count + 1);
}
1429
/// A generous timeout leaves time remaining; a one-nanosecond timeout has
/// expired after a short sleep.
#[test]
fn test_context_timeout() {
    let node_id = test_node_id();

    let live = Context::new(node_id).with_timeout(Duration::from_millis(100));
    assert!(!live.is_expired());
    assert!(live.remaining().is_some());

    let stale = Context::new(node_id).with_timeout(Duration::from_nanos(1));
    std::thread::sleep(Duration::from_millis(1));
    assert!(stale.is_expired());
}
1442
/// The hop limit trips once the hop count reaches it.
#[test]
fn test_context_max_hops() {
    let mut limited = Context::new(test_node_id()).with_max_hops(3);
    assert!(!limited.exceeded_hops());

    limited.hop_count = 3;
    assert!(limited.exceeded_hops());
}
1453
/// `to_traceparent` / `from_traceparent` round trip: same trace, the sender's
/// span becomes the parent, and the hop count starts at 1.
#[test]
fn test_traceparent() {
    let node_id = test_node_id();
    let sender = Context::new(node_id);

    let header = sender.to_traceparent();
    assert!(header.starts_with("00-"));

    let receiver = Context::from_traceparent(&header, node_id).unwrap();
    assert_eq!(receiver.trace_id, sender.trace_id);
    assert_eq!(receiver.parent_span_id, Some(sender.span_id));
    assert_eq!(receiver.hop_count, 1);
}
1467
/// `AlwaysOn` samples every one of 100 decisions.
#[test]
fn test_sampler_always_on() {
    let sampler = Sampler::new(SamplingStrategy::AlwaysOn);
    assert!((0..100).all(|_| sampler.should_sample(None)));
}
1475
/// `AlwaysOff` samples none of 100 decisions.
#[test]
fn test_sampler_always_off() {
    let sampler = Sampler::new(SamplingStrategy::AlwaysOff);
    assert!(!(0..100).any(|_| sampler.should_sample(None)));
}
1483
/// `ParentBased` follows the parent's decision and samples when there is no
/// parent.
#[test]
fn test_sampler_parent_based() {
    let sampler = Sampler::new(SamplingStrategy::ParentBased);

    let with_sampled_parent = sampler.should_sample(Some(true));
    assert!(with_sampled_parent);

    let with_unsampled_parent = sampler.should_sample(Some(false));
    assert!(!with_unsampled_parent);

    let without_parent = sampler.should_sample(None);
    assert!(without_parent);
}
1491
/// End-to-end store flow: create a context, record a span, complete the
/// trace, and confirm it is no longer tracked.
#[test]
fn test_context_store() {
    let node_id = test_node_id();
    let store = ContextStore::new(100, 1000, Duration::from_secs(60))
        .with_sampling(SamplingStrategy::AlwaysOn);

    let ctx = store.create_context(node_id).unwrap();
    let trace_id = ctx.trace_id;
    assert!(store.get_context(&trace_id).is_some());

    let mut recorded = Span::new(trace_id, "test", node_id);
    recorded.end();
    store.add_span(recorded).unwrap();

    assert_eq!(store.get_spans(&trace_id).len(), 1);

    let (final_ctx, final_spans) = store.complete_trace(&trace_id).unwrap();
    assert_eq!(final_ctx.trace_id, trace_id);
    assert_eq!(final_spans.len(), 1);

    assert!(store.get_context(&trace_id).is_none());
}
1515
/// A context flattened to a `PropagationContext` and restored keeps its
/// trace id, baggage and hop limit.
#[test]
fn test_propagation_context() {
    let node_id = test_node_id();

    let mut outgoing = Context::new(node_id)
        .with_timeout(Duration::from_secs(30))
        .with_max_hops(10);
    outgoing.baggage.set("user", "alice");
    outgoing
        .trace_state
        .insert("vendor".into(), "data".into());

    let wire = PropagationContext::from_context(&outgoing);
    let incoming = wire.to_context(node_id).unwrap();

    assert_eq!(incoming.trace_id, outgoing.trace_id);
    assert_eq!(incoming.baggage.get("user"), Some("alice"));
    assert_eq!(incoming.max_hops, Some(10));
}
1533
/// With room for two traces, a third is rejected until one is completed.
#[test]
fn test_context_store_capacity() {
    let node_id = test_node_id();
    let store = ContextStore::new(2, 10, Duration::from_secs(60))
        .with_sampling(SamplingStrategy::AlwaysOn);

    let first = store.create_context(node_id).unwrap();
    let second = store.create_context(node_id).unwrap();

    // Both slots are taken, so the next request must be turned away.
    let overflow = store.create_context(node_id);
    assert!(matches!(overflow, Err(ContextError::CapacityExceeded)));

    // Completing a trace frees its slot for a newcomer.
    store.complete_trace(&first.trace_id);
    assert!(store.create_context(node_id).is_ok());

    store.complete_trace(&second.trace_id);
}
1559
/// Sixteen threads each attempt eight inserts against a cap of 32: the store
/// must never hold more than the cap and must report some drops.
#[test]
fn create_context_concurrent_inserts_do_not_exceed_max_traces() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    const MAX_TRACES: usize = 32;
    const THREADS: usize = 16;
    const ATTEMPTS_PER_THREAD: usize = 8;

    let store = Arc::new(
        ContextStore::new(MAX_TRACES, 10, Duration::from_secs(60))
            .with_sampling(SamplingStrategy::AlwaysOn),
    );
    let node_id = test_node_id();

    // The barrier lines every worker up so the inserts contend.
    let start_line = Arc::new(Barrier::new(THREADS));
    let workers: Vec<_> = (0..THREADS)
        .map(|_| {
            let store = Arc::clone(&store);
            let start_line = Arc::clone(&start_line);
            thread::spawn(move || {
                start_line.wait();
                for _ in 0..ATTEMPTS_PER_THREAD {
                    let _ = store.create_context(node_id);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("thread panicked");
    }

    let stats = store.stats();
    assert!(
        stats.active_traces <= MAX_TRACES as u64,
        "active_traces ({}) exceeded MAX_TRACES ({}) — admission gate \
         must hold under concurrent inserts",
        stats.active_traces,
        MAX_TRACES,
    );
    assert!(
        stats.dropped_traces > 0,
        "with 128 attempts and a cap of 32, some inserts must have been dropped",
    );
}
1615
/// Continuing the same context many times must leave the store reporting a
/// single active trace, and the remaining `MAX_TRACES - 1` slots must still
/// be available to `create_context` afterwards.
#[test]
fn continue_context_duplicate_trace_id_does_not_leak_capacity() {
    const MAX_TRACES: usize = 4;
    const DUPLICATE_CONTINUES: usize = MAX_TRACES * 4;
    const REMAINING_SLOTS: usize = MAX_TRACES - 1;

    let store = ContextStore::new(MAX_TRACES, 10, Duration::from_secs(60))
        .with_sampling(SamplingStrategy::AlwaysOn);
    let node_id = test_node_id();

    // Re-submit one and the same context far more often than the cap allows.
    let ctx = Context::new(node_id);
    (0..DUPLICATE_CONTINUES).for_each(|_| {
        store
            .continue_context(ctx.clone())
            .expect("duplicate continue_context must succeed");
    });

    let active = store.stats().active_traces;
    assert_eq!(
        active, 1,
        "duplicate continue_context must not grow the map",
    );

    // With one trace active, every other slot must still be free.
    (0..REMAINING_SLOTS).for_each(|_| {
        store
            .create_context(node_id)
            .expect("active_count must reflect map size, not duplicate-insert count");
    });
}
1658
/// Once a full store has one of its traces completed, the next
/// `create_context` call must succeed again.
#[test]
fn complete_trace_re_admits_capacity() {
    let full_store = ContextStore::new(2, 10, Duration::from_secs(60))
        .with_sampling(SamplingStrategy::AlwaysOn);
    let node = test_node_id();

    // Two admissions exhaust the cap of two.
    let finished_later = full_store.create_context(node).unwrap();
    let _kept_open = full_store.create_context(node).unwrap();

    let refused = matches!(
        full_store.create_context(node),
        Err(ContextError::CapacityExceeded)
    );
    assert!(refused);

    full_store.complete_trace(&finished_later.trace_id);

    let admitted_again = full_store.create_context(node).is_ok();
    assert!(
        admitted_again,
        "complete_trace must release a slot for re-admission",
    );
}
1686
/// After creating one context and adding two ended spans to its trace, the
/// store's stats must report one active trace, two spans in total and one
/// sampled trace.
#[test]
fn test_context_store_stats() {
    let store = ContextStore::new(100, 1000, Duration::from_secs(60))
        .with_sampling(SamplingStrategy::AlwaysOn);

    let node_id = test_node_id();

    let ctx = store.create_context(node_id).unwrap();

    // Record two ended spans, one after the other, on the same trace.
    for operation in ["op1", "op2"] {
        let mut finished = Span::new(ctx.trace_id, operation, node_id);
        finished.end();
        store.add_span(finished).unwrap();
    }

    let snapshot = store.stats();
    assert_eq!(snapshot.active_traces, 1);
    assert_eq!(snapshot.total_spans, 2);
    assert_eq!(snapshot.sampled_traces, 1);
}
1709
/// CR-14: with the `AlwaysOff` sampling strategy, creating 50 contexts on a
/// store capped at 8 must succeed every time and leave the reported
/// active-trace count at zero.
#[test]
fn cr14_sampling_skip_releases_reservation_via_drop_guard() {
    let store = ContextStore::new(8, 100, std::time::Duration::from_secs(60))
        .with_sampling(SamplingStrategy::AlwaysOff);

    let node = test_node_id();
    // Far more creations than the cap; each one is unwrapped, so a single
    // rejection fails the test right here.
    (0..50).for_each(|_| {
        let _ = store.create_context(node).unwrap();
    });

    let active_after = store.stats().active_traces;
    assert_eq!(
        active_after, 0,
        "all 50 contexts were sampling-skipped; the SlotReservation \
         Drop guard must have released every reservation. Got \
         active_traces = {} (CR-14 regression).",
        active_after
    );
}
1742
/// CR-14: reserves a slot, panics while the reservation is still held, and
/// checks that `active_count` is back at its starting value once the panic
/// has been caught.
#[test]
fn cr14_panic_between_reserve_and_commit_releases_slot() {
    use std::panic::{catch_unwind, AssertUnwindSafe};
    use std::sync::atomic::Ordering;

    let store = ContextStore::new(8, 100, std::time::Duration::from_secs(60));
    let initial_active = store.active_count.load(Ordering::Relaxed);

    // The reservation is bound to a named local so that it stays alive until
    // the panic unwinds through this closure.
    let reserve_then_panic = || {
        let _reservation = store
            .try_reserve_slot()
            .expect("first reserve must succeed against an empty store");
        panic!("simulated mid-path failure");
    };
    let outcome = catch_unwind(AssertUnwindSafe(reserve_then_panic));

    assert!(outcome.is_err(), "the closure must have panicked");
    let after_active = store.active_count.load(Ordering::Relaxed);
    assert_eq!(
        after_active, initial_active,
        "CR-14 regression: panic between reserve and commit MUST roll \
         back the slot reservation via SlotReservation::drop. \
         Got active before={} after={}",
        initial_active, after_active
    );
}
1784
/// CR-21 regression guard: scans this file's own source text and fails if the
/// `getrandom::fill` call on `bytes` is directly followed by a panicking
/// `expect` or `unwrap` call.
///
/// Lines whose first non-blank characters are `//` are skipped, so comments
/// may mention the forbidden pattern. Only occurrences that sit on a single
/// line are detected; a call chain wrapped across lines is not.
#[test]
fn cr21_random_u64_must_not_panic_on_getrandom_failure() {
    // The needles are assembled from fragments so that no line of this test
    // contains the forbidden text and trips the scan itself. Each needle ends
    // with the opening parenthesis of the method call. Ending it with a
    // closing parenthesis instead yields text that cannot appear in valid
    // source, which silently turns the whole scan into a no-op.
    const FILL_CALL_HEAD: &str = "getrandom::fill(";
    const FILL_CALL_TAIL: &str = "&mut bytes).";
    let needle_expect = format!("{FILL_CALL_HEAD}{FILL_CALL_TAIL}expect(");
    let needle_unwrap = format!("{FILL_CALL_HEAD}{FILL_CALL_TAIL}unwrap(");

    let src = include_str!("context.rs");
    for (lineno, line) in src.lines().enumerate() {
        let trimmed = line.trim_start();
        // Comment lines are allowed to talk about the pattern.
        if trimmed.starts_with("//") {
            continue;
        }
        assert!(
            !trimmed.contains(needle_expect.as_str()),
            "CR-21 regression: getrandom::fill(...).expect(...) reintroduced \
             at context.rs:{}. Use the abort-on-fail pattern (fallible \
             writeln to stderr + std::process::abort).\n line: {}",
            lineno + 1,
            line
        );
        assert!(
            !trimmed.contains(needle_unwrap.as_str()),
            "CR-21 regression: getrandom::fill(...).unwrap() reintroduced \
             at context.rs:{}. Use the abort-on-fail pattern (fallible \
             writeln to stderr + std::process::abort).\n line: {}",
            lineno + 1,
            line
        );
    }
}
1823
/// `with_parent` and `with_kind` must store their arguments in the span's
/// `parent_span_id` and `kind` fields.
#[test]
fn span_with_parent_and_kind_set_fields() {
    let parent_id = SpanId::generate();

    // Apply the builder calls one step at a time.
    let bare = Span::new(TraceId::generate(), "child", test_node_id());
    let with_parent = bare.with_parent(parent_id);
    let child = with_parent.with_kind(SpanKind::Server);

    assert_eq!(child.parent_span_id, Some(parent_id));
    assert_eq!(child.kind, SpanKind::Server);
}
1835
/// `set_ok` must leave the span in `SpanStatus::Ok`; a following `set_error`
/// must switch it to `SpanStatus::Error` carrying the given message.
#[test]
fn span_set_ok_and_set_error_update_status() {
    let mut span = Span::new(TraceId::generate(), "op", test_node_id());

    span.set_ok();
    assert!(matches!(span.status, SpanStatus::Ok));

    span.set_error("boom");
    if let SpanStatus::Error { message } = &span.status {
        assert_eq!(message, "boom");
    } else {
        panic!("expected Error, got {:?}", &span.status);
    }
}
1848
/// `add_event_with_attributes` must append one event that keeps its name and
/// attributes, and `add_link` must append one link that keeps the given trace
/// and span ids.
#[test]
fn span_add_event_with_attributes_and_add_link_populate_collections() {
    let mut span = Span::new(TraceId::generate(), "op", test_node_id());

    // Event with a single attribute.
    let mut attributes = HashMap::new();
    attributes.insert("k".into(), AttributeValue::from("v"));
    span.add_event_with_attributes("evt", attributes);

    assert_eq!(span.events.len(), 1);
    let recorded_event = &span.events[0];
    assert_eq!(recorded_event.name, "evt");
    assert!(recorded_event.attributes.contains_key("k"));

    // Link pointing at an unrelated trace/span pair.
    let linked_trace = TraceId::generate();
    let linked_span = SpanId::generate();
    span.add_link(linked_trace, linked_span);

    assert_eq!(span.links.len(), 1);
    let recorded_link = &span.links[0];
    assert_eq!(recorded_link.trace_id, linked_trace);
    assert_eq!(recorded_link.span_id, linked_span);
}
1867
/// Pins the `Display` text of each `ContextError` variant listed below.
#[test]
fn context_error_display_covers_every_variant() {
    // (variant, expected rendered text), checked in this order.
    let expectations = [
        (ContextError::Expired, "context has expired"),
        (ContextError::MaxHopsExceeded, "maximum hops exceeded"),
        (ContextError::NotFound, "context not found"),
        (ContextError::InvalidTraceId, "invalid trace ID"),
        (ContextError::CapacityExceeded, "storage capacity exceeded"),
    ];

    for (variant, rendered) in expectations {
        assert_eq!(variant.to_string(), rendered);
    }
}
1887
/// Every sample must pass through `percent_encode` followed by
/// `percent_decode` unchanged, and the encoded form must never contain a
/// literal space.
#[test]
fn percent_codec_roundtrips_ascii_and_unicode_and_punctuation() {
    let samples = [
        "",
        "plain",
        "with space",
        "weird/chars?&=",
        "trailing space ",
        "key=value;meta=other",
        "café",
    ];

    for input in samples {
        let encoded = percent_encode(input);
        assert!(!encoded.contains(' '));

        let decoded = match percent_decode(&encoded) {
            Some(text) => text,
            None => panic!("decode failed: {}", encoded),
        };
        assert_eq!(decoded, input, "roundtrip mismatch for {input:?}");
    }
}
1910
/// `percent_decode` must return `None` both for an escape cut short after one
/// hex digit and for an escape whose two characters are not hex digits.
#[test]
fn percent_decode_rejects_truncated_hex_escape() {
    for malformed in ["%4", "%ZZ"] {
        assert_eq!(percent_decode(malformed), None);
    }
}
1920
1921 fn store_with_always_on_sampler() -> ContextStore {
1929 ContextStore::new(64, 64, Duration::from_secs(60))
1930 .with_sampler(Sampler::new(SamplingStrategy::AlwaysOn))
1931 }
1932
/// Dropping a `ContextScope` without any explicit call on it must leave
/// exactly one span with an end time in the store for that trace.
#[test]
fn context_scope_drop_records_span_into_store() {
    let store = store_with_always_on_sampler();
    let ctx = store.create_context(test_node_id()).unwrap();
    let trace_id = ctx.trace_id;

    // Nothing has been recorded for this trace yet.
    let before = store.get_spans(&trace_id);
    assert!(before.is_empty());

    // Create the scope and let it go out of use immediately.
    drop(ContextScope::new(&store, &ctx, "auto", test_node_id()));

    let recorded = store.get_spans(&trace_id);
    assert_eq!(recorded.len(), 1, "Drop must push the span");
    assert!(recorded[0].end_time_us.is_some(), "Drop must end() the span");
}
1951
/// After `set_ok` and an explicit `finish` on a `ContextScope`, the store
/// must hold exactly one span for the trace, and its status must be
/// `SpanStatus::Ok`.
#[test]
fn context_scope_finish_records_span_and_suppresses_drop() {
    let store = store_with_always_on_sampler();
    let ctx = store.create_context(test_node_id()).unwrap();
    let trace_id = ctx.trace_id;

    let mut explicit_scope = ContextScope::new(&store, &ctx, "explicit", test_node_id());
    explicit_scope.set_ok();
    explicit_scope.finish();

    // One span only: a second entry here would mean the span was recorded twice.
    let recorded = store.get_spans(&trace_id);
    assert_eq!(recorded.len(), 1);
    let only_span = &recorded[0];
    assert!(matches!(only_span.status, SpanStatus::Ok));
}
1968}