Skip to main content

net/adapter/net/behavior/
context.rs

1//! Context Fabric (CTXT-FABRIC) - Phase 4F
2//!
3//! Provides distributed context propagation across the Net network:
4//! - Request context with trace IDs and spans
5//! - Context inheritance and propagation
6//! - Distributed baggage (key-value propagation)
7//! - Context scopes with automatic cleanup
8//! - Sampling and rate limiting for tracing
9
10use dashmap::DashMap;
11use parking_lot::Mutex;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
16
17use super::NodeId;
18
19/// Generate random bytes using getrandom.
20///
21/// Aborts on `getrandom` failure rather than panic-unwinding
22/// through the FFI boundary. Trace IDs are not directly
23/// auth-bearing, but this function is reachable from hot paths
24/// called by `extern "C"` FFI consumers (Python / Node / Go
25/// bindings) — a `getrandom` failure (kernel rng exhaustion,
26/// container-restricted /dev/urandom) that unwound across the C
27/// ABI would be undefined behaviour. `process::abort` is
28/// `extern "C"`-safe (terminates rather than unwinds) and
29/// loss-of-availability is the only safe response when the
30/// system can't produce randomness.
31///
32/// The diagnostic uses a fallible `writeln!` rather than
33/// `eprintln!` because the latter panics if the underlying
34/// stderr write fails (closed fd, sandboxed process). A panic
35/// here would defeat the whole point of the abort path —
36/// unwinding across the FFI boundary that we're trying to
37/// protect — so we ignore any write error and proceed straight
38/// to `abort()`.
39fn random_u64() -> u64 {
40    let mut bytes = [0u8; 8];
41    if let Err(e) = getrandom::fill(&mut bytes) {
42        use std::io::Write;
43        let _ = writeln!(
44            std::io::stderr(),
45            "FATAL: behavior::context::random_u64 getrandom failure ({e:?}); \
46             aborting to avoid panic across the FFI boundary"
47        );
48        std::process::abort();
49    }
50    u64::from_le_bytes(bytes)
51}
52
53/// Generate random f64 between 0.0 and 1.0
54fn random_f64() -> f64 {
55    let r = random_u64();
56    (r as f64) / (u64::MAX as f64)
57}
58
/// Percent-encode `s` for baggage propagation.
///
/// Bytes in the RFC 3986 unreserved set (`A-Z a-z 0-9 - _ . ~`) pass
/// through unchanged. Every other byte of the UTF-8 encoding becomes `%XX`
/// with uppercase hex digits, so multi-byte characters are escaped byte by
/// byte (e.g. `"é"` → `"%C3%A9"`).
///
/// Hex digits come from a lookup table. Calling `format!` for every escaped
/// byte would allocate a temporary `String` per byte.
fn percent_encode(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    // Worst case: every byte expands to three characters.
    let mut result = String::with_capacity(s.len() * 3);
    for b in s.bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~') {
            result.push(char::from(b));
        } else {
            result.push('%');
            result.push(char::from(HEX[usize::from(b >> 4)]));
            result.push(char::from(HEX[usize::from(b & 0x0F)]));
        }
    }
    result
}
74
/// Percent-decode `s`; the inverse of `percent_encode`.
///
/// Each `%XX` sequence (hex digits in either case) becomes the byte `0xXX`.
/// All other bytes are copied through unchanged.
///
/// Returns `None` in two cases:
/// - a `%` is not followed by exactly two ASCII hex digits;
/// - the decoded bytes are not valid UTF-8.
///
/// Decoding operates on the UTF-8 bytes of the input, so literal non-ASCII
/// characters survive intact. Iterating `char`s and pushing `c as u8` would
/// keep only the low byte of each code point. Hex digits are matched
/// explicitly because `u8::from_str_radix` accepts a leading `+` sign.
fn percent_decode(s: &str) -> Option<String> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn hex_val(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let mut bytes = s.bytes();
    let mut result = Vec::with_capacity(s.len());
    while let Some(b) = bytes.next() {
        if b == b'%' {
            // A truncated escape (`%` or `%X` at the end) yields None here.
            let hi = hex_val(bytes.next()?)?;
            let lo = hex_val(bytes.next()?)?;
            result.push((hi << 4) | lo);
        } else {
            result.push(b);
        }
    }

    String::from_utf8(result).ok()
}
95
96/// Unique trace identifier (128-bit for W3C compatibility)
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
98pub struct TraceId {
99    /// High 64 bits of the 128-bit trace ID
100    pub high: u64,
101    /// Low 64 bits of the 128-bit trace ID
102    pub low: u64,
103}
104
105impl TraceId {
106    /// Generate a new random trace ID
107    pub fn generate() -> Self {
108        Self {
109            high: random_u64(),
110            low: random_u64(),
111        }
112    }
113
114    /// Create from hex string (32 characters)
115    pub fn from_hex(s: &str) -> Option<Self> {
116        if s.len() != 32 {
117            return None;
118        }
119        let high = u64::from_str_radix(&s[0..16], 16).ok()?;
120        let low = u64::from_str_radix(&s[16..32], 16).ok()?;
121        Some(Self { high, low })
122    }
123
124    /// Convert to hex string
125    pub fn to_hex(&self) -> String {
126        format!("{:016x}{:016x}", self.high, self.low)
127    }
128}
129
130impl Default for TraceId {
131    fn default() -> Self {
132        Self::generate()
133    }
134}
135
136/// Unique span identifier (64-bit)
137#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
138pub struct SpanId(pub u64);
139
140impl SpanId {
141    /// Generate a new random span ID
142    pub fn generate() -> Self {
143        Self(random_u64())
144    }
145
146    /// Parse a span ID from a 16-character hex string
147    pub fn from_hex(s: &str) -> Option<Self> {
148        u64::from_str_radix(s, 16).ok().map(Self)
149    }
150
151    /// Convert to hex string
152    pub fn to_hex(&self) -> String {
153        format!("{:016x}", self.0)
154    }
155}
156
157/// Trace flags (W3C compatible)
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159pub struct TraceFlags(pub u8);
160
161impl TraceFlags {
162    /// Bit flag indicating the trace is sampled
163    pub const SAMPLED: u8 = 0x01;
164
165    /// Create flags with the sampled bit set
166    pub fn sampled() -> Self {
167        Self(Self::SAMPLED)
168    }
169
170    /// Create flags with no bits set (not sampled)
171    pub fn not_sampled() -> Self {
172        Self(0)
173    }
174
175    /// Returns true if the sampled flag is set
176    pub fn is_sampled(&self) -> bool {
177        self.0 & Self::SAMPLED != 0
178    }
179}
180
181impl Default for TraceFlags {
182    fn default() -> Self {
183        Self::sampled()
184    }
185}
186
187/// Span kind (role in the trace)
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
189pub enum SpanKind {
190    /// Internal operation
191    #[default]
192    Internal,
193    /// Server handling a request
194    Server,
195    /// Client making a request
196    Client,
197    /// Producer sending a message
198    Producer,
199    /// Consumer receiving a message
200    Consumer,
201}
202
203/// Span status
204#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
205pub enum SpanStatus {
206    /// Status has not been set
207    #[default]
208    Unset,
209    /// Span completed successfully
210    Ok,
211    /// Span completed with an error
212    Error {
213        /// Human-readable error description
214        message: String,
215    },
216}
217
218/// A span represents a unit of work in a trace
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct Span {
221    /// Unique span ID
222    pub span_id: SpanId,
223    /// Parent span ID (None for root spans)
224    pub parent_span_id: Option<SpanId>,
225    /// Trace this span belongs to
226    pub trace_id: TraceId,
227    /// Human-readable name
228    pub name: String,
229    /// Kind of span
230    pub kind: SpanKind,
231    /// Start timestamp (microseconds since Unix epoch)
232    pub start_time_us: u64,
233    /// End timestamp (microseconds since Unix epoch)
234    pub end_time_us: Option<u64>,
235    /// Span attributes
236    pub attributes: HashMap<String, AttributeValue>,
237    /// Status
238    pub status: SpanStatus,
239    /// Events that occurred during the span
240    pub events: Vec<SpanEvent>,
241    /// Links to other spans
242    pub links: Vec<SpanLink>,
243    /// Node that created this span
244    pub node_id: NodeId,
245}
246
247impl Span {
248    /// Create a new root span within the given trace
249    pub fn new(trace_id: TraceId, name: impl Into<String>, node_id: NodeId) -> Self {
250        Self {
251            span_id: SpanId::generate(),
252            parent_span_id: None,
253            trace_id,
254            name: name.into(),
255            kind: SpanKind::Internal,
256            start_time_us: now_micros(),
257            end_time_us: None,
258            attributes: HashMap::new(),
259            status: SpanStatus::Unset,
260            events: Vec::new(),
261            links: Vec::new(),
262            node_id,
263        }
264    }
265
266    /// Set the parent span ID on this span
267    pub fn with_parent(mut self, parent: SpanId) -> Self {
268        self.parent_span_id = Some(parent);
269        self
270    }
271
272    /// Set the kind of this span
273    pub fn with_kind(mut self, kind: SpanKind) -> Self {
274        self.kind = kind;
275        self
276    }
277
278    /// Insert a key-value attribute on this span
279    pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
280        self.attributes.insert(key.into(), value.into());
281    }
282
283    /// Record a named event on this span at the current time
284    pub fn add_event(&mut self, name: impl Into<String>) {
285        self.events.push(SpanEvent {
286            name: name.into(),
287            timestamp_us: now_micros(),
288            attributes: HashMap::new(),
289        });
290    }
291
292    /// Record a named event with additional attributes on this span
293    pub fn add_event_with_attributes(
294        &mut self,
295        name: impl Into<String>,
296        attributes: HashMap<String, AttributeValue>,
297    ) {
298        self.events.push(SpanEvent {
299            name: name.into(),
300            timestamp_us: now_micros(),
301            attributes,
302        });
303    }
304
305    /// Add a causal link to another span in a different trace
306    pub fn add_link(&mut self, trace_id: TraceId, span_id: SpanId) {
307        self.links.push(SpanLink {
308            trace_id,
309            span_id,
310            attributes: HashMap::new(),
311        });
312    }
313
314    /// Mark this span as successfully completed
315    pub fn set_ok(&mut self) {
316        self.status = SpanStatus::Ok;
317    }
318
319    /// Mark this span as failed with the given error message
320    pub fn set_error(&mut self, message: impl Into<String>) {
321        self.status = SpanStatus::Error {
322            message: message.into(),
323        };
324    }
325
326    /// Record the end timestamp if not already set
327    pub fn end(&mut self) {
328        if self.end_time_us.is_none() {
329            self.end_time_us = Some(now_micros());
330        }
331    }
332
333    /// Return the elapsed duration in microseconds, if the span has ended
334    pub fn duration_us(&self) -> Option<u64> {
335        self.end_time_us
336            .map(|end| end.saturating_sub(self.start_time_us))
337    }
338}
339
340/// Attribute value types
341#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
342pub enum AttributeValue {
343    /// A UTF-8 string value
344    String(String),
345    /// A signed 64-bit integer value
346    Int(i64),
347    /// A 64-bit floating-point value
348    Float(f64),
349    /// A boolean value
350    Bool(bool),
351    /// An array of UTF-8 string values
352    StringArray(Vec<String>),
353    /// An array of signed 64-bit integer values
354    IntArray(Vec<i64>),
355    /// An array of 64-bit floating-point values
356    FloatArray(Vec<f64>),
357    /// An array of boolean values
358    BoolArray(Vec<bool>),
359}
360
361impl From<String> for AttributeValue {
362    fn from(s: String) -> Self {
363        Self::String(s)
364    }
365}
366
367impl From<&str> for AttributeValue {
368    fn from(s: &str) -> Self {
369        Self::String(s.to_string())
370    }
371}
372
373impl From<i64> for AttributeValue {
374    fn from(n: i64) -> Self {
375        Self::Int(n)
376    }
377}
378
379impl From<i32> for AttributeValue {
380    fn from(n: i32) -> Self {
381        Self::Int(n as i64)
382    }
383}
384
385impl From<f64> for AttributeValue {
386    fn from(n: f64) -> Self {
387        Self::Float(n)
388    }
389}
390
391impl From<bool> for AttributeValue {
392    fn from(b: bool) -> Self {
393        Self::Bool(b)
394    }
395}
396
397/// An event that occurred during a span
398#[derive(Debug, Clone, Serialize, Deserialize)]
399pub struct SpanEvent {
400    /// Human-readable event name
401    pub name: String,
402    /// Timestamp when the event occurred (microseconds since Unix epoch)
403    pub timestamp_us: u64,
404    /// Additional attributes describing the event
405    pub attributes: HashMap<String, AttributeValue>,
406}
407
408/// A link to another span (e.g., for batched operations)
409#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct SpanLink {
411    /// Trace ID of the linked span
412    pub trace_id: TraceId,
413    /// Span ID of the linked span
414    pub span_id: SpanId,
415    /// Optional attributes describing the relationship
416    pub attributes: HashMap<String, AttributeValue>,
417}
418
419/// Baggage is key-value pairs that propagate across the network
420#[derive(Debug, Clone, Default, Serialize, Deserialize)]
421pub struct Baggage {
422    items: HashMap<String, BaggageItem>,
423}
424
425/// A single key-value entry carried in distributed baggage
426#[derive(Debug, Clone, Serialize, Deserialize)]
427pub struct BaggageItem {
428    /// The propagated string value for this baggage entry
429    pub value: String,
430    /// Optional properties metadata associated with this entry
431    pub metadata: Option<String>,
432}
433
434impl Baggage {
435    /// Create a new empty baggage container
436    pub fn new() -> Self {
437        Self::default()
438    }
439
440    /// Insert or replace a baggage entry by key
441    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
442        self.items.insert(
443            key.into(),
444            BaggageItem {
445                value: value.into(),
446                metadata: None,
447            },
448        );
449    }
450
451    /// Insert or replace a baggage entry with associated metadata properties
452    pub fn set_with_metadata(
453        &mut self,
454        key: impl Into<String>,
455        value: impl Into<String>,
456        metadata: impl Into<String>,
457    ) {
458        self.items.insert(
459            key.into(),
460            BaggageItem {
461                value: value.into(),
462                metadata: Some(metadata.into()),
463            },
464        );
465    }
466
467    /// Look up a baggage value by key
468    pub fn get(&self, key: &str) -> Option<&str> {
469        self.items.get(key).map(|item| item.value.as_str())
470    }
471
472    /// Look up a baggage value and its optional metadata by key
473    pub fn get_with_metadata(&self, key: &str) -> Option<(&str, Option<&str>)> {
474        self.items
475            .get(key)
476            .map(|item| (item.value.as_str(), item.metadata.as_deref()))
477    }
478
479    /// Remove a baggage entry by key and return its value
480    pub fn remove(&mut self, key: &str) -> Option<String> {
481        self.items.remove(key).map(|item| item.value)
482    }
483
484    /// Iterate over all baggage entries as (key, value) pairs
485    pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
486        self.items
487            .iter()
488            .map(|(k, v)| (k.as_str(), v.value.as_str()))
489    }
490
491    /// Return the number of baggage entries
492    pub fn len(&self) -> usize {
493        self.items.len()
494    }
495
496    /// Return true if there are no baggage entries
497    pub fn is_empty(&self) -> bool {
498        self.items.is_empty()
499    }
500
501    /// Merge another baggage into this one (other takes precedence)
502    pub fn merge(&mut self, other: &Baggage) {
503        for (key, item) in &other.items {
504            self.items.insert(key.clone(), item.clone());
505        }
506    }
507}
508
509/// Request context that propagates across the network
510#[derive(Debug, Clone, Serialize, Deserialize)]
511pub struct Context {
512    /// Trace ID for distributed tracing
513    pub trace_id: TraceId,
514    /// Current span ID
515    pub span_id: SpanId,
516    /// Parent span ID (for hierarchy)
517    pub parent_span_id: Option<SpanId>,
518    /// Trace flags
519    pub trace_flags: TraceFlags,
520    /// Trace state (vendor-specific key-value pairs)
521    pub trace_state: HashMap<String, String>,
522    /// Baggage that propagates with the request
523    pub baggage: Baggage,
524    /// Deadline for this request (microseconds since Unix epoch)
525    pub deadline_us: Option<u64>,
526    /// Originating node
527    pub origin_node: NodeId,
528    /// Request ID (application-level)
529    pub request_id: Option<String>,
530    /// Correlation ID (for related requests)
531    pub correlation_id: Option<String>,
532    /// Hop count (increases with each network hop)
533    pub hop_count: u32,
534    /// Maximum allowed hops
535    pub max_hops: Option<u32>,
536}
537
538impl Context {
539    /// Create a new root context originating from the given node
540    pub fn new(origin_node: NodeId) -> Self {
541        Self {
542            trace_id: TraceId::generate(),
543            span_id: SpanId::generate(),
544            parent_span_id: None,
545            trace_flags: TraceFlags::sampled(),
546            trace_state: HashMap::new(),
547            baggage: Baggage::new(),
548            deadline_us: None,
549            origin_node,
550            request_id: None,
551            correlation_id: None,
552            hop_count: 0,
553            max_hops: None,
554        }
555    }
556
557    /// Create a child context for a new span
558    pub fn child(&self, new_span_name: &str) -> Self {
559        let _ = new_span_name; // Used for logging/tracing, not stored in context
560        Self {
561            trace_id: self.trace_id,
562            span_id: SpanId::generate(),
563            parent_span_id: Some(self.span_id),
564            trace_flags: self.trace_flags,
565            trace_state: self.trace_state.clone(),
566            baggage: self.baggage.clone(),
567            deadline_us: self.deadline_us,
568            origin_node: self.origin_node,
569            request_id: self.request_id.clone(),
570            correlation_id: self.correlation_id.clone(),
571            hop_count: self.hop_count,
572            max_hops: self.max_hops,
573        }
574    }
575
576    /// Create a context for sending to another node
577    pub fn for_remote(&self) -> Self {
578        Self {
579            trace_id: self.trace_id,
580            span_id: SpanId::generate(),
581            parent_span_id: Some(self.span_id),
582            trace_flags: self.trace_flags,
583            trace_state: self.trace_state.clone(),
584            baggage: self.baggage.clone(),
585            deadline_us: self.deadline_us,
586            origin_node: self.origin_node,
587            request_id: self.request_id.clone(),
588            correlation_id: self.correlation_id.clone(),
589            hop_count: self.hop_count.saturating_add(1),
590            max_hops: self.max_hops,
591        }
592    }
593
594    /// Set a timeout from now
595    pub fn with_timeout(mut self, timeout: Duration) -> Self {
596        self.deadline_us = Some(now_micros() + timeout.as_micros() as u64);
597        self
598    }
599
600    /// Set an absolute deadline
601    pub fn with_deadline(mut self, deadline_us: u64) -> Self {
602        self.deadline_us = Some(deadline_us);
603        self
604    }
605
606    /// Check if the context has expired
607    pub fn is_expired(&self) -> bool {
608        self.deadline_us
609            .map(|deadline| now_micros() > deadline)
610            .unwrap_or(false)
611    }
612
613    /// Get remaining time until deadline
614    pub fn remaining(&self) -> Option<Duration> {
615        self.deadline_us.and_then(|deadline| {
616            let now = now_micros();
617            if now >= deadline {
618                None
619            } else {
620                Some(Duration::from_micros(deadline - now))
621            }
622        })
623    }
624
625    /// Check if we've exceeded max hops
626    pub fn exceeded_hops(&self) -> bool {
627        self.max_hops
628            .map(|max| self.hop_count >= max)
629            .unwrap_or(false)
630    }
631
632    /// Set max hops
633    pub fn with_max_hops(mut self, max: u32) -> Self {
634        self.max_hops = Some(max);
635        self
636    }
637
638    /// Set request ID
639    pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
640        self.request_id = Some(id.into());
641        self
642    }
643
644    /// Set correlation ID
645    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
646        self.correlation_id = Some(id.into());
647        self
648    }
649
650    /// Encode to W3C traceparent header format
651    pub fn to_traceparent(&self) -> String {
652        format!(
653            "00-{}-{}-{:02x}",
654            self.trace_id.to_hex(),
655            self.span_id.to_hex(),
656            self.trace_flags.0
657        )
658    }
659
660    /// Parse from W3C traceparent header
661    pub fn from_traceparent(header: &str, origin_node: NodeId) -> Option<Self> {
662        let parts: Vec<&str> = header.split('-').collect();
663        if parts.len() != 4 || parts[0] != "00" {
664            return None;
665        }
666
667        let trace_id = TraceId::from_hex(parts[1])?;
668        let span_id = SpanId::from_hex(parts[2])?;
669        let flags = u8::from_str_radix(parts[3], 16).ok()?;
670
671        Some(Self {
672            trace_id,
673            span_id: SpanId::generate(),
674            parent_span_id: Some(span_id),
675            trace_flags: TraceFlags(flags),
676            trace_state: HashMap::new(),
677            baggage: Baggage::new(),
678            deadline_us: None,
679            origin_node,
680            request_id: None,
681            correlation_id: None,
682            hop_count: 1,
683            max_hops: None,
684        })
685    }
686}
687
688/// Sampling strategy for traces
689#[derive(Debug, Clone, Serialize, Deserialize)]
690pub enum SamplingStrategy {
691    /// Always sample
692    AlwaysOn,
693    /// Never sample
694    AlwaysOff,
695    /// Sample a fixed ratio (0.0 to 1.0)
696    Ratio(f64),
697    /// Sample based on rate limit (max per second)
698    RateLimited {
699        /// Maximum number of traces to sample per second
700        max_per_second: u32,
701    },
702    /// Parent-based sampling (inherit from parent)
703    ParentBased,
704    /// Custom sampler by name
705    Custom(String),
706}
707
708impl Default for SamplingStrategy {
709    fn default() -> Self {
710        Self::Ratio(0.1) // 10% default sampling
711    }
712}
713
714/// Sampler that decides whether to sample a trace
715#[derive(Debug)]
716pub struct Sampler {
717    strategy: SamplingStrategy,
718    count: AtomicU64,
719    last_reset: Mutex<Instant>,
720}
721
722impl Sampler {
723    /// Create a new sampler with the given strategy
724    pub fn new(strategy: SamplingStrategy) -> Self {
725        Self {
726            strategy,
727            count: AtomicU64::new(0),
728            last_reset: Mutex::new(Instant::now()),
729        }
730    }
731
732    /// Decide whether to sample a new trace, given the parent's sampling decision
733    pub fn should_sample(&self, parent_sampled: Option<bool>) -> bool {
734        match &self.strategy {
735            SamplingStrategy::AlwaysOn => true,
736            SamplingStrategy::AlwaysOff => false,
737            SamplingStrategy::Ratio(ratio) => random_f64() < *ratio,
738            SamplingStrategy::RateLimited { max_per_second } => {
739                let mut last_reset = self.last_reset.lock();
740                let now = Instant::now();
741
742                // Reset counter every second
743                if now.duration_since(*last_reset) >= Duration::from_secs(1) {
744                    self.count.store(0, Ordering::Relaxed);
745                    *last_reset = now;
746                }
747
748                let current = self.count.fetch_add(1, Ordering::Relaxed);
749                current < *max_per_second as u64
750            }
751            SamplingStrategy::ParentBased => parent_sampled.unwrap_or(true),
752            SamplingStrategy::Custom(_) => true, // Custom samplers default to true
753        }
754    }
755}
756
/// Errors raised by context handling and the context store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContextError {
    /// The context's deadline has passed.
    Expired,
    /// The hop limit was exceeded.
    MaxHopsExceeded,
    /// The requested context does not exist.
    NotFound,
    /// The trace ID is invalid.
    InvalidTraceId,
    /// The store is already holding as many traces as it may.
    CapacityExceeded,
}

impl std::fmt::Display for ContextError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match *self {
            ContextError::Expired => "context has expired",
            ContextError::MaxHopsExceeded => "maximum hops exceeded",
            ContextError::NotFound => "context not found",
            ContextError::InvalidTraceId => "invalid trace ID",
            ContextError::CapacityExceeded => "storage capacity exceeded",
        };
        f.write_str(text)
    }
}

/// No underlying source; the `Display` text is the whole message.
impl std::error::Error for ContextError {}
785
786/// Entry in the context store with metadata
787#[derive(Debug)]
788struct ContextEntry {
789    context: Context,
790    created_at: Instant,
791    spans: Vec<Span>,
792}
793
/// Snapshot of context-store counters.
#[derive(Debug, Default)]
#[derive(Clone)]
pub struct ContextStoreStats {
    /// Traces tracked at the moment of the snapshot.
    pub active_traces: u64,
    /// Spans held across all of those traces.
    pub total_spans: u64,
    /// Running total of traces chosen for sampling.
    pub sampled_traces: u64,
    /// Running total of traces rejected because the store was full.
    pub dropped_traces: u64,
    /// Running total of traces evicted after their TTL ran out.
    pub expired_traces: u64,
}
808
809/// Store for active contexts and traces
810pub struct ContextStore {
811    /// Active contexts by trace ID
812    contexts: DashMap<TraceId, ContextEntry>,
813    /// Maximum number of traces to store
814    max_traces: usize,
815    /// Maximum spans per trace
816    max_spans_per_trace: usize,
817    /// TTL for traces
818    trace_ttl: Duration,
819    /// Sampler
820    sampler: Sampler,
821    /// Stats
822    sampled_count: AtomicU64,
823    dropped_count: AtomicU64,
824    expired_count: AtomicU64,
825    /// Authoritative atomic counter so the "is the store full?"
826    /// check can be a CAS-with-cap rather than a
827    /// `dashmap.len() >= max` racy probe. Bumped on insert via
828    /// `try_reserve_slot` (CAS), decremented on eviction
829    /// (`cleanup_expired`, explicit removal). DashMap's own
830    /// `len()` is the source of truth for queries; this counter
831    /// exists only to gate admission atomically.
832    active_count: std::sync::atomic::AtomicUsize,
833}
834
835impl ContextStore {
836    /// Create a new store with the given capacity limits and TTL
837    pub fn new(max_traces: usize, max_spans_per_trace: usize, trace_ttl: Duration) -> Self {
838        Self {
839            contexts: DashMap::new(),
840            max_traces,
841            max_spans_per_trace,
842            trace_ttl,
843            sampler: Sampler::new(SamplingStrategy::default()),
844            sampled_count: AtomicU64::new(0),
845            dropped_count: AtomicU64::new(0),
846            expired_count: AtomicU64::new(0),
847            active_count: std::sync::atomic::AtomicUsize::new(0),
848        }
849    }
850
851    /// Replace the default sampler. Useful when an operator wants
852    /// deterministic capture (e.g. `AlwaysOn` for diagnostics) or
853    /// a custom ratio different from the default.
854    pub fn with_sampler(mut self, sampler: Sampler) -> Self {
855        self.sampler = sampler;
856        self
857    }
858
859    /// Atomically reserve a slot if `active_count < max_traces`.
860    ///
861    /// Returns an [`Option<SlotReservation<'_>>`]; the `Some` arm
862    /// carries an RAII guard whose `Drop` releases the reservation
863    /// automatically. The success-path caller MUST invoke
864    /// [`SlotReservation::commit`] to keep the slot once the
865    /// matching insert lands. Any other exit (early return, error,
866    /// panic between reserve and insert) drops the guard, which
867    /// undoes the reservation atomically. A `bool` return with a
868    /// manual `release_slot` call on every failure path would be
869    /// easy to miss and would leak a slot permanently across an
870    /// `active_count` underflow guard.
871    ///
872    /// This is the admission gate. A `dashmap.len() >= max` probe
873    /// would lose the race against concurrent inserters.
874    fn try_reserve_slot(&self) -> Option<SlotReservation<'_>> {
875        use std::sync::atomic::Ordering;
876        // Fetch-update CAS loop: only commit if `cur < max`.
877        let ok = self
878            .active_count
879            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
880                if cur < self.max_traces {
881                    Some(cur + 1)
882                } else {
883                    None
884                }
885            })
886            .is_ok();
887        if ok {
888            Some(SlotReservation { store: self })
889        } else {
890            None
891        }
892    }
893
894    /// Release a slot reserved by `try_reserve_slot` — used for the
895    /// post-insert duplicate-trace-id detection path in
896    /// `continue_context` (where the reservation is committed but
897    /// the matching insert turned out to be a no-op overwrite of
898    /// an existing entry). Most callers should rely on
899    /// [`SlotReservation`]'s automatic Drop release instead.
900    fn release_slot(&self) {
901        use std::sync::atomic::Ordering;
902        self.active_count
903            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
904                Some(cur.saturating_sub(1))
905            })
906            .ok();
907    }
908}
909
910/// RAII guard returned by [`ContextStore::try_reserve_slot`].
911/// The Drop impl decrements `active_count` UNLESS [`Self::commit`]
912/// was called first (which `mem::forget`-equivalent the guard, so
913/// no decrement runs).
914///
915/// Pattern:
916/// ```ignore
917/// let guard = self.try_reserve_slot()?;       // reserve
918/// // ... do work that may panic / early-return ...
919/// // success path:
920/// guard.commit();                              // keep the slot
921/// ```
922///
923/// On any non-commit exit (panic, error, early return) the slot
924/// reservation is rolled back automatically — the admission cap
925/// stays accurate even when the matching insert never lands.
926pub(super) struct SlotReservation<'a> {
927    store: &'a ContextStore,
928}
929
930impl<'a> SlotReservation<'a> {
931    /// Forget the guard so its Drop does NOT release the slot.
932    /// Call this only on the success path AFTER the matching
933    /// insert has landed.
934    fn commit(self) {
935        // `mem::forget` skips the Drop impl; the reserved slot
936        // stays counted against `active_count`, where it
937        // correctly reflects the live entry.
938        std::mem::forget(self);
939    }
940}
941
942impl<'a> Drop for SlotReservation<'a> {
943    fn drop(&mut self) {
944        // Roll back the reservation. `release_slot` is itself
945        // saturating so a double-Drop (which `mem::forget`
946        // already prevents structurally) would not underflow.
947        self.store.release_slot();
948    }
949}
950
951impl ContextStore {
952    /// Override the default sampling strategy for this store
953    pub fn with_sampling(mut self, strategy: SamplingStrategy) -> Self {
954        self.sampler = Sampler::new(strategy);
955        self
956    }
957
958    /// Create a new context and register it
959    ///
960    /// Capacity admission goes through the atomic
961    /// `try_reserve_slot` CAS rather than a `contexts.len() >= max`
962    /// probe. Two threads inserting at exactly capacity could
963    /// otherwise both observe `len < max` after a
964    /// `cleanup_expired` and both insert, growing the map past
965    /// `max_traces`. The slot is reserved atomically before the
966    /// insert; if the reserve fails after a `cleanup_expired`
967    /// retry, we surface `CapacityExceeded`.
968    pub fn create_context(&self, origin_node: NodeId) -> Result<Context, ContextError> {
969        // Hold the reservation as a RAII guard. Any path out
970        // before `guard.commit()` (early return, panic in
971        // `Context::new` / `should_sample`, future refactor that
972        // adds another fallible step) drops the guard and the
973        // slot is released automatically.
974        let guard = match self.try_reserve_slot() {
975            Some(g) => g,
976            None => {
977                self.cleanup_expired();
978                match self.try_reserve_slot() {
979                    Some(g) => g,
980                    None => {
981                        self.dropped_count.fetch_add(1, Ordering::Relaxed);
982                        return Err(ContextError::CapacityExceeded);
983                    }
984                }
985            }
986        };
987
988        let ctx = Context::new(origin_node);
989
990        // Check if we should sample this trace
991        if !self.sampler.should_sample(None) {
992            let mut unsampled = ctx.clone();
993            unsampled.trace_flags = TraceFlags::not_sampled();
994            // Sampling-skip path: no insert happens — `guard`'s
995            // Drop releases the slot. No manual release needed.
996            return Ok(unsampled);
997        }
998
999        self.sampled_count.fetch_add(1, Ordering::Relaxed);
1000
1001        self.contexts.insert(
1002            ctx.trace_id,
1003            ContextEntry {
1004                context: ctx.clone(),
1005                created_at: Instant::now(),
1006                spans: Vec::new(),
1007            },
1008        );
1009
1010        // Insert succeeded — commit the reservation so its slot
1011        // stays counted against `active_count`.
1012        guard.commit();
1013
1014        Ok(ctx)
1015    }
1016
1017    /// Continue a context from a remote node
1018    pub fn continue_context(&self, ctx: Context) -> Result<Context, ContextError> {
1019        // Check if expired
1020        if ctx.is_expired() {
1021            return Err(ContextError::Expired);
1022        }
1023
1024        // Check hop count
1025        if ctx.exceeded_hops() {
1026            return Err(ContextError::MaxHopsExceeded);
1027        }
1028
1029        // If already tracking this trace, just return
1030        if self.contexts.contains_key(&ctx.trace_id) {
1031            return Ok(ctx);
1032        }
1033
1034        // RAII reserve. Drop releases on any non-commit exit
1035        // (sampling-skip, panic, error).
1036        let guard = match self.try_reserve_slot() {
1037            Some(g) => g,
1038            None => {
1039                self.cleanup_expired();
1040                match self.try_reserve_slot() {
1041                    Some(g) => g,
1042                    None => {
1043                        self.dropped_count.fetch_add(1, Ordering::Relaxed);
1044                        return Err(ContextError::CapacityExceeded);
1045                    }
1046                }
1047            }
1048        };
1049
1050        // Check sampling (parent-based)
1051        if !self
1052            .sampler
1053            .should_sample(Some(ctx.trace_flags.is_sampled()))
1054        {
1055            // Sampling-skip path — `guard` Drop releases the slot.
1056            return Ok(ctx);
1057        }
1058
1059        self.sampled_count.fetch_add(1, Ordering::Relaxed);
1060
1061        // Two threads racing on the same `trace_id` both pass the
1062        // `contains_key` check above (TOCTOU) and both reserve a
1063        // slot via `try_reserve_slot`. `DashMap::insert` overwrites
1064        // the existing entry and returns the prior value; the
1065        // losing thread did not actually grow the map, so its
1066        // reservation is a leak. Commit the guard FIRST (so we
1067        // own the reservation) then inspect the return: when a
1068        // prior entry existed, manually release one slot to keep
1069        // `active_count` in lockstep with the map size.
1070        let prev = self.contexts.insert(
1071            ctx.trace_id,
1072            ContextEntry {
1073                context: ctx.clone(),
1074                created_at: Instant::now(),
1075                spans: Vec::new(),
1076            },
1077        );
1078        guard.commit();
1079        if prev.is_some() {
1080            // Insert was an overwrite, not a growth — undo the
1081            // reservation we just committed.
1082            self.release_slot();
1083        }
1084
1085        Ok(ctx)
1086    }
1087
1088    /// Add a span to a trace
1089    pub fn add_span(&self, span: Span) -> Result<(), ContextError> {
1090        if let Some(mut entry) = self.contexts.get_mut(&span.trace_id) {
1091            if entry.spans.len() < self.max_spans_per_trace {
1092                entry.spans.push(span);
1093            }
1094            Ok(())
1095        } else {
1096            Err(ContextError::NotFound)
1097        }
1098    }
1099
1100    /// Get a context by trace ID
1101    pub fn get_context(&self, trace_id: &TraceId) -> Option<Context> {
1102        self.contexts
1103            .get(trace_id)
1104            .map(|entry| entry.context.clone())
1105    }
1106
1107    /// Get all spans for a trace
1108    pub fn get_spans(&self, trace_id: &TraceId) -> Vec<Span> {
1109        self.contexts
1110            .get(trace_id)
1111            .map(|entry| entry.spans.clone())
1112            .unwrap_or_default()
1113    }
1114
1115    /// Complete a trace and return all spans
1116    ///
1117    /// Also releases the `active_count` slot so the
1118    /// `try_reserve_slot` admission gate can re-admit.
1119    pub fn complete_trace(&self, trace_id: &TraceId) -> Option<(Context, Vec<Span>)> {
1120        let removed = self
1121            .contexts
1122            .remove(trace_id)
1123            .map(|(_, entry)| (entry.context, entry.spans));
1124        if removed.is_some() {
1125            self.release_slot();
1126        }
1127        removed
1128    }
1129
1130    /// Cleanup expired traces
1131    ///
1132    /// Every successful removal also releases an `active_count`
1133    /// slot so the `try_reserve_slot` admission gate can re-admit
1134    /// work as soon as expired entries are reclaimed.
1135    pub fn cleanup_expired(&self) {
1136        let now = Instant::now();
1137        let mut expired = Vec::new();
1138
1139        for entry in self.contexts.iter() {
1140            if now.duration_since(entry.created_at) > self.trace_ttl {
1141                expired.push(*entry.key());
1142            }
1143        }
1144
1145        for trace_id in expired {
1146            if self.contexts.remove(&trace_id).is_some() {
1147                self.expired_count.fetch_add(1, Ordering::Relaxed);
1148                self.release_slot();
1149            }
1150        }
1151    }
1152
1153    /// Get statistics
1154    pub fn stats(&self) -> ContextStoreStats {
1155        let mut total_spans = 0;
1156        for entry in self.contexts.iter() {
1157            total_spans += entry.spans.len() as u64;
1158        }
1159
1160        ContextStoreStats {
1161            active_traces: self.contexts.len() as u64,
1162            total_spans,
1163            sampled_traces: self.sampled_count.load(Ordering::Relaxed),
1164            dropped_traces: self.dropped_count.load(Ordering::Relaxed),
1165            expired_traces: self.expired_count.load(Ordering::Relaxed),
1166        }
1167    }
1168}
1169
1170/// Context scope for automatic span management
1171pub struct ContextScope<'a> {
1172    store: &'a ContextStore,
1173    span: Span,
1174    finished: bool,
1175}
1176
1177impl<'a> ContextScope<'a> {
1178    /// Create a new scope that automatically records a span on drop
1179    pub fn new(store: &'a ContextStore, ctx: &Context, name: &str, node_id: NodeId) -> Self {
1180        let mut span = Span::new(ctx.trace_id, name, node_id);
1181        if let Some(parent) = ctx.parent_span_id {
1182            span = span.with_parent(parent);
1183        }
1184
1185        Self {
1186            store,
1187            span,
1188            finished: false,
1189        }
1190    }
1191
1192    /// Set the kind of the underlying span
1193    pub fn with_kind(mut self, kind: SpanKind) -> Self {
1194        self.span.kind = kind;
1195        self
1196    }
1197
1198    /// Set an attribute on the underlying span
1199    pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
1200        self.span.set_attribute(key, value);
1201    }
1202
1203    /// Record a named event on the underlying span
1204    pub fn add_event(&mut self, name: impl Into<String>) {
1205        self.span.add_event(name);
1206    }
1207
1208    /// Mark the underlying span as successfully completed
1209    pub fn set_ok(&mut self) {
1210        self.span.set_ok();
1211    }
1212
1213    /// Mark the underlying span as failed with the given message
1214    pub fn set_error(&mut self, message: impl Into<String>) {
1215        self.span.set_error(message);
1216    }
1217
1218    /// Explicitly finish the scope and submit the span to the store
1219    pub fn finish(mut self) {
1220        self.span.end();
1221        let _ = self.store.add_span(self.span.clone());
1222        self.finished = true;
1223    }
1224
1225    /// Return a reference to the underlying span
1226    pub fn span(&self) -> &Span {
1227        &self.span
1228    }
1229}
1230
1231impl<'a> Drop for ContextScope<'a> {
1232    fn drop(&mut self) {
1233        if !self.finished {
1234            self.span.end();
1235            let _ = self.store.add_span(self.span.clone());
1236        }
1237    }
1238}
1239
/// A lightweight propagation context for network transmission
///
/// Built from a [`Context`] by `PropagationContext::from_context` and turned
/// back into one by `PropagationContext::to_context`.
///
/// NOTE(review): for positional (non-self-describing) serde formats, field
/// order is part of the wire encoding. Do not reorder fields without
/// checking which format carries this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationContext {
    /// W3C traceparent, as produced by `Context::to_traceparent`
    pub traceparent: String,
    /// W3C tracestate (optional vendor-specific data): comma-separated
    /// `key=value` pairs; `None` when the context has no trace state
    pub tracestate: Option<String>,
    /// Serialized baggage: comma-separated `key=value` pairs whose values
    /// are percent-encoded; `None` when the context carries no baggage
    pub baggage: Option<String>,
    /// Deadline (microseconds since epoch), copied from `Context::deadline_us`
    pub deadline_us: Option<u64>,
    /// Hop count, copied verbatim from `Context::hop_count`
    pub hop_count: u32,
    /// Max hops, copied verbatim from `Context::max_hops`
    pub max_hops: Option<u32>,
}
1256
1257impl PropagationContext {
1258    /// Serialize a `Context` into a wire-ready propagation envelope
1259    pub fn from_context(ctx: &Context) -> Self {
1260        let tracestate = if ctx.trace_state.is_empty() {
1261            None
1262        } else {
1263            Some(
1264                ctx.trace_state
1265                    .iter()
1266                    .map(|(k, v)| format!("{}={}", k, v))
1267                    .collect::<Vec<_>>()
1268                    .join(","),
1269            )
1270        };
1271
1272        let baggage = if ctx.baggage.is_empty() {
1273            None
1274        } else {
1275            Some(
1276                ctx.baggage
1277                    .iter()
1278                    .map(|(k, v)| format!("{}={}", k, percent_encode(v)))
1279                    .collect::<Vec<_>>()
1280                    .join(","),
1281            )
1282        };
1283
1284        Self {
1285            traceparent: ctx.to_traceparent(),
1286            tracestate,
1287            baggage,
1288            deadline_us: ctx.deadline_us,
1289            hop_count: ctx.hop_count,
1290            max_hops: ctx.max_hops,
1291        }
1292    }
1293
1294    /// Deserialize this propagation envelope back into a `Context` for the given node
1295    pub fn to_context(&self, origin_node: NodeId) -> Option<Context> {
1296        let mut ctx = Context::from_traceparent(&self.traceparent, origin_node)?;
1297
1298        // Parse tracestate
1299        if let Some(ref ts) = self.tracestate {
1300            for pair in ts.split(',') {
1301                if let Some((k, v)) = pair.split_once('=') {
1302                    ctx.trace_state.insert(k.to_string(), v.to_string());
1303                }
1304            }
1305        }
1306
1307        // Parse baggage
1308        if let Some(ref bg) = self.baggage {
1309            for pair in bg.split(',') {
1310                if let Some((k, v)) = pair.split_once('=') {
1311                    if let Some(decoded) = percent_decode(v) {
1312                        ctx.baggage.set(k, decoded);
1313                    }
1314                }
1315            }
1316        }
1317
1318        ctx.deadline_us = self.deadline_us;
1319        ctx.hop_count = self.hop_count;
1320        ctx.max_hops = self.max_hops;
1321
1322        Some(ctx)
1323    }
1324}
1325
/// Current wall-clock time as whole microseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` rather than an error.
fn now_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
1333
1334#[cfg(test)]
1335mod tests {
1336    use super::*;
1337
1338    fn test_node_id() -> NodeId {
1339        [1u8; 32]
1340    }
1341
1342    #[test]
1343    fn test_trace_id() {
1344        let id = TraceId::generate();
1345        let hex = id.to_hex();
1346        assert_eq!(hex.len(), 32);
1347
1348        let parsed = TraceId::from_hex(&hex).unwrap();
1349        assert_eq!(id, parsed);
1350    }
1351
1352    #[test]
1353    fn test_span_id() {
1354        let id = SpanId::generate();
1355        let hex = id.to_hex();
1356        assert_eq!(hex.len(), 16);
1357
1358        let parsed = SpanId::from_hex(&hex).unwrap();
1359        assert_eq!(id, parsed);
1360    }
1361
1362    #[test]
1363    fn test_span_lifecycle() {
1364        let trace_id = TraceId::generate();
1365        let node_id = test_node_id();
1366
1367        let mut span = Span::new(trace_id, "test_operation", node_id);
1368        span.set_attribute("key", "value");
1369        span.add_event("started");
1370
1371        assert!(span.end_time_us.is_none());
1372        span.end();
1373        assert!(span.end_time_us.is_some());
1374        assert!(span.duration_us().is_some());
1375    }
1376
1377    #[test]
1378    fn test_baggage() {
1379        let mut baggage = Baggage::new();
1380        baggage.set("user_id", "12345");
1381        baggage.set_with_metadata("tenant", "acme", "priority=high");
1382
1383        assert_eq!(baggage.get("user_id"), Some("12345"));
1384        assert_eq!(
1385            baggage.get_with_metadata("tenant"),
1386            Some(("acme", Some("priority=high")))
1387        );
1388
1389        let mut other = Baggage::new();
1390        other.set("user_id", "67890");
1391        other.set("request_id", "abc");
1392
1393        baggage.merge(&other);
1394        assert_eq!(baggage.get("user_id"), Some("67890"));
1395        assert_eq!(baggage.get("request_id"), Some("abc"));
1396    }
1397
1398    #[test]
1399    fn test_context_creation() {
1400        let node_id = test_node_id();
1401        let ctx = Context::new(node_id);
1402
1403        assert!(!ctx.is_expired());
1404        assert!(!ctx.exceeded_hops());
1405        assert_eq!(ctx.hop_count, 0);
1406    }
1407
1408    #[test]
1409    fn test_context_child() {
1410        let node_id = test_node_id();
1411        let parent = Context::new(node_id);
1412        let child = parent.child("child_operation");
1413
1414        assert_eq!(child.trace_id, parent.trace_id);
1415        assert_eq!(child.parent_span_id, Some(parent.span_id));
1416        assert_eq!(child.hop_count, parent.hop_count);
1417    }
1418
1419    #[test]
1420    fn test_context_remote() {
1421        let node_id = test_node_id();
1422        let local = Context::new(node_id);
1423        let remote = local.for_remote();
1424
1425        assert_eq!(remote.trace_id, local.trace_id);
1426        assert_eq!(remote.parent_span_id, Some(local.span_id));
1427        assert_eq!(remote.hop_count, local.hop_count + 1);
1428    }
1429
1430    #[test]
1431    fn test_context_timeout() {
1432        let node_id = test_node_id();
1433        let ctx = Context::new(node_id).with_timeout(Duration::from_millis(100));
1434
1435        assert!(!ctx.is_expired());
1436        assert!(ctx.remaining().is_some());
1437
1438        let expired = Context::new(node_id).with_timeout(Duration::from_nanos(1));
1439        std::thread::sleep(Duration::from_millis(1));
1440        assert!(expired.is_expired());
1441    }
1442
1443    #[test]
1444    fn test_context_max_hops() {
1445        let node_id = test_node_id();
1446        let mut ctx = Context::new(node_id).with_max_hops(3);
1447
1448        assert!(!ctx.exceeded_hops());
1449
1450        ctx.hop_count = 3;
1451        assert!(ctx.exceeded_hops());
1452    }
1453
1454    #[test]
1455    fn test_traceparent() {
1456        let node_id = test_node_id();
1457        let ctx = Context::new(node_id);
1458        let traceparent = ctx.to_traceparent();
1459
1460        assert!(traceparent.starts_with("00-"));
1461
1462        let parsed = Context::from_traceparent(&traceparent, node_id).unwrap();
1463        assert_eq!(parsed.trace_id, ctx.trace_id);
1464        assert_eq!(parsed.parent_span_id, Some(ctx.span_id));
1465        assert_eq!(parsed.hop_count, 1);
1466    }
1467
1468    #[test]
1469    fn test_sampler_always_on() {
1470        let sampler = Sampler::new(SamplingStrategy::AlwaysOn);
1471        for _ in 0..100 {
1472            assert!(sampler.should_sample(None));
1473        }
1474    }
1475
1476    #[test]
1477    fn test_sampler_always_off() {
1478        let sampler = Sampler::new(SamplingStrategy::AlwaysOff);
1479        for _ in 0..100 {
1480            assert!(!sampler.should_sample(None));
1481        }
1482    }
1483
1484    #[test]
1485    fn test_sampler_parent_based() {
1486        let sampler = Sampler::new(SamplingStrategy::ParentBased);
1487        assert!(sampler.should_sample(Some(true)));
1488        assert!(!sampler.should_sample(Some(false)));
1489        assert!(sampler.should_sample(None)); // No parent defaults to true
1490    }
1491
1492    #[test]
1493    fn test_context_store() {
1494        let store = ContextStore::new(100, 1000, Duration::from_secs(60))
1495            .with_sampling(SamplingStrategy::AlwaysOn);
1496
1497        let node_id = test_node_id();
1498        let ctx = store.create_context(node_id).unwrap();
1499
1500        assert!(store.get_context(&ctx.trace_id).is_some());
1501
1502        let mut span = Span::new(ctx.trace_id, "test", node_id);
1503        span.end();
1504        store.add_span(span).unwrap();
1505
1506        let spans = store.get_spans(&ctx.trace_id);
1507        assert_eq!(spans.len(), 1);
1508
1509        let (completed_ctx, completed_spans) = store.complete_trace(&ctx.trace_id).unwrap();
1510        assert_eq!(completed_ctx.trace_id, ctx.trace_id);
1511        assert_eq!(completed_spans.len(), 1);
1512
1513        assert!(store.get_context(&ctx.trace_id).is_none());
1514    }
1515
1516    #[test]
1517    fn test_propagation_context() {
1518        let node_id = test_node_id();
1519        let mut ctx = Context::new(node_id)
1520            .with_timeout(Duration::from_secs(30))
1521            .with_max_hops(10);
1522
1523        ctx.baggage.set("user", "alice");
1524        ctx.trace_state.insert("vendor".into(), "data".into());
1525
1526        let prop = PropagationContext::from_context(&ctx);
1527        let restored = prop.to_context(node_id).unwrap();
1528
1529        assert_eq!(restored.trace_id, ctx.trace_id);
1530        assert_eq!(restored.baggage.get("user"), Some("alice"));
1531        assert_eq!(restored.max_hops, Some(10));
1532    }
1533
1534    #[test]
1535    fn test_context_store_capacity() {
1536        let store = ContextStore::new(2, 10, Duration::from_secs(60))
1537            .with_sampling(SamplingStrategy::AlwaysOn);
1538
1539        let node_id = test_node_id();
1540
1541        let ctx1 = store.create_context(node_id).unwrap();
1542        let ctx2 = store.create_context(node_id).unwrap();
1543
1544        // Third should fail due to capacity
1545        assert!(matches!(
1546            store.create_context(node_id),
1547            Err(ContextError::CapacityExceeded)
1548        ));
1549
1550        // Complete one to make room
1551        store.complete_trace(&ctx1.trace_id);
1552
1553        // Now should succeed
1554        assert!(store.create_context(node_id).is_ok());
1555
1556        // Cleanup the second one too
1557        store.complete_trace(&ctx2.trace_id);
1558    }
1559
1560    // ========================================================================
1561    // create_context capacity check must be atomic
1562    // ========================================================================
1563
1564    /// Concurrent `create_context` calls must not grow `contexts` past
1565    /// `max_traces`. Pre-fix, two threads could each observe
1566    /// `len < max` after a `cleanup_expired` and both insert,
1567    /// producing `len > max`. The new atomic `try_reserve_slot` CAS
1568    /// gate guarantees the cap is hard.
1569    #[test]
1570    fn create_context_concurrent_inserts_do_not_exceed_max_traces() {
1571        use std::sync::Arc;
1572        use std::thread;
1573
1574        const MAX_TRACES: usize = 32;
1575        let store = Arc::new(
1576            ContextStore::new(MAX_TRACES, 10, Duration::from_secs(60))
1577                .with_sampling(SamplingStrategy::AlwaysOn),
1578        );
1579
1580        let node_id = test_node_id();
1581        let n_threads = 16;
1582        let attempts_per_thread = 8; // 16 * 8 = 128 total attempts
1583
1584        let barrier = Arc::new(std::sync::Barrier::new(n_threads));
1585        let mut handles = Vec::new();
1586        for _ in 0..n_threads {
1587            let store = store.clone();
1588            let barrier = barrier.clone();
1589            handles.push(thread::spawn(move || {
1590                barrier.wait();
1591                for _ in 0..attempts_per_thread {
1592                    let _ = store.create_context(node_id);
1593                }
1594            }));
1595        }
1596        for h in handles {
1597            h.join().expect("thread panicked");
1598        }
1599
1600        let stats = store.stats();
1601        assert!(
1602            stats.active_traces <= MAX_TRACES as u64,
1603            "active_traces ({}) exceeded MAX_TRACES ({}) — admission gate \
1604             must hold under concurrent inserts",
1605            stats.active_traces,
1606            MAX_TRACES,
1607        );
1608        // Also verify dropped_traces accounts for at least some
1609        // attempts that were rejected at capacity.
1610        assert!(
1611            stats.dropped_traces > 0,
1612            "with 128 attempts and a cap of 32, some inserts must have been dropped",
1613        );
1614    }
1615
1616    /// Two threads calling `continue_context` with the SAME trace_id
1617    /// must not strand `active_count` slots when `DashMap::insert`
1618    /// overwrites a prior entry. Pre-fix the duplicate-insert path
1619    /// reserved a slot but never released it on overwrite, so each
1620    /// duplicate `continue_context` permanently consumed one slot
1621    /// of capacity even though the map size never grew past 1. After
1622    /// `n` duplicates against the same trace_id the store would
1623    /// refuse new admissions despite `contexts.len() == 1`.
1624    #[test]
1625    fn continue_context_duplicate_trace_id_does_not_leak_capacity() {
1626        const MAX_TRACES: usize = 4;
1627        let store = ContextStore::new(MAX_TRACES, 10, Duration::from_secs(60))
1628            .with_sampling(SamplingStrategy::AlwaysOn);
1629        let node_id = test_node_id();
1630
1631        // Build a single context once and replay it `MAX_TRACES * 4`
1632        // times. Pre-fix this stranded `MAX_TRACES * 4 - 1` slots and
1633        // the next fresh `create_context` would hit CapacityExceeded.
1634        let ctx = Context::new(node_id);
1635        for _ in 0..(MAX_TRACES * 4) {
1636            store
1637                .continue_context(ctx.clone())
1638                .expect("duplicate continue_context must succeed");
1639        }
1640
1641        // Map only ever holds the one entry.
1642        assert_eq!(
1643            store.stats().active_traces,
1644            1,
1645            "duplicate continue_context must not grow the map",
1646        );
1647
1648        // The store still has room for `MAX_TRACES - 1` brand-new
1649        // traces. Pre-fix this loop tripped CapacityExceeded on the
1650        // first iteration because every duplicate had silently
1651        // consumed a slot.
1652        for _ in 0..(MAX_TRACES - 1) {
1653            store
1654                .create_context(node_id)
1655                .expect("active_count must reflect map size, not duplicate-insert count");
1656        }
1657    }
1658
1659    /// `complete_trace` releases an `active_count` slot so the
1660    /// store can re-admit after a trace finishes. Without this,
1661    /// the atomic counter would leak slots and the
1662    /// admission gate would refuse new traces even after the
1663    /// `contexts` map shrinks.
1664    #[test]
1665    fn complete_trace_re_admits_capacity() {
1666        let store = ContextStore::new(2, 10, Duration::from_secs(60))
1667            .with_sampling(SamplingStrategy::AlwaysOn);
1668        let node_id = test_node_id();
1669
1670        let ctx1 = store.create_context(node_id).unwrap();
1671        let _ctx2 = store.create_context(node_id).unwrap();
1672        // At cap → next create must be rejected.
1673        assert!(matches!(
1674            store.create_context(node_id),
1675            Err(ContextError::CapacityExceeded)
1676        ));
1677
1678        // Complete one trace; the slot must be released and a new
1679        // create must succeed.
1680        store.complete_trace(&ctx1.trace_id);
1681        assert!(
1682            store.create_context(node_id).is_ok(),
1683            "complete_trace must release a slot for re-admission",
1684        );
1685    }
1686
1687    #[test]
1688    fn test_context_store_stats() {
1689        let store = ContextStore::new(100, 1000, Duration::from_secs(60))
1690            .with_sampling(SamplingStrategy::AlwaysOn);
1691
1692        let node_id = test_node_id();
1693
1694        let ctx = store.create_context(node_id).unwrap();
1695
1696        let mut span = Span::new(ctx.trace_id, "op1", node_id);
1697        span.end();
1698        store.add_span(span).unwrap();
1699
1700        let mut span2 = Span::new(ctx.trace_id, "op2", node_id);
1701        span2.end();
1702        store.add_span(span2).unwrap();
1703
1704        let stats = store.stats();
1705        assert_eq!(stats.active_traces, 1);
1706        assert_eq!(stats.total_spans, 2);
1707        assert_eq!(stats.sampled_traces, 1);
1708    }
1709
1710    /// CR-14: pin that an early-return path between
1711    /// `try_reserve_slot` and the matching insert correctly
1712    /// rolls back the reservation via the `SlotReservation` guard's
1713    /// Drop impl. Pre-CR-14 each early-return path had to manually
1714    /// call `release_slot()` — easy to miss, leaks a slot
1715    /// permanently across `active_count` underflow guard.
1716    ///
1717    /// We exercise the sampling-skip path which bails out BEFORE
1718    /// calling `commit()` on the guard. Pre-CR-14 the code had a
1719    /// manual `release_slot` here; with the guard it's automatic
1720    /// — the test ensures the release still happens.
1721    #[test]
1722    fn cr14_sampling_skip_releases_reservation_via_drop_guard() {
1723        // Sampler that ALWAYS skips — every reserved slot must
1724        // be released via the guard's Drop.
1725        let store = ContextStore::new(8, 100, std::time::Duration::from_secs(60))
1726            .with_sampling(SamplingStrategy::AlwaysOff);
1727
1728        let node = test_node_id();
1729        for _ in 0..50 {
1730            let _ = store.create_context(node).unwrap();
1731        }
1732
1733        let stats = store.stats();
1734        assert_eq!(
1735            stats.active_traces, 0,
1736            "all 50 contexts were sampling-skipped; the SlotReservation \
1737             Drop guard must have released every reservation. Got \
1738             active_traces = {} (CR-14 regression).",
1739            stats.active_traces
1740        );
1741    }
1742
1743    /// CR-14: pin that a panic between reserve and commit ALSO
1744    /// releases the slot via Drop. We use `catch_unwind` to
1745    /// observe the panic without poisoning the test harness, then
1746    /// verify `active_count` rolled back.
1747    ///
1748    /// Cubic P2: read `active_count` directly rather than
1749    /// `stats().active_traces`. The latter is `contexts.len()` —
1750    /// the DashMap size — and a leaked reservation would bump the
1751    /// atomic but never reach an insert, leaving the map size
1752    /// unchanged and silently masking the regression.
1753    #[test]
1754    fn cr14_panic_between_reserve_and_commit_releases_slot() {
1755        use std::panic::{catch_unwind, AssertUnwindSafe};
1756        use std::sync::atomic::Ordering;
1757
1758        let store = ContextStore::new(8, 100, std::time::Duration::from_secs(60));
1759        let initial_active = store.active_count.load(Ordering::Relaxed);
1760
1761        // Synthesize "reserve then panic before commit" via direct
1762        // guard manipulation — mirrors what would happen if a
1763        // future refactor added a fallible step between reserve
1764        // and `guard.commit()`.
1765        let result = catch_unwind(AssertUnwindSafe(|| {
1766            let _guard = store
1767                .try_reserve_slot()
1768                .expect("first reserve must succeed against an empty store");
1769            // Simulate a panic on the path between reserve and
1770            // commit. The guard's Drop runs as part of unwind.
1771            panic!("simulated mid-path failure");
1772        }));
1773
1774        assert!(result.is_err(), "the closure must have panicked");
1775        let after_active = store.active_count.load(Ordering::Relaxed);
1776        assert_eq!(
1777            after_active, initial_active,
1778            "CR-14 regression: panic between reserve and commit MUST roll \
1779             back the slot reservation via SlotReservation::drop. \
1780             Got active before={} after={}",
1781            initial_active, after_active
1782        );
1783    }
1784
1785    /// CR-21: pin that this module's `random_u64`
1786    /// uses the abort-on-fail pattern, NOT `expect()` or
1787    /// `.unwrap()`. A getrandom panic here would unwind across
1788    /// any `extern "C"` FFI frame that called into the trace
1789    /// context layer — undefined behaviour. Source-level
1790    /// tripwire (assemble forbidden token at runtime so the test
1791    /// file doesn't trigger itself).
1792    #[test]
1793    fn cr21_random_u64_must_not_panic_on_getrandom_failure() {
1794        // Forbidden shapes — assembled at runtime to prevent the
1795        // test's own source from triggering the scan.
1796        let needle_expect = format!("getrandom::fill({}{})", "&mut bytes).", "expect");
1797        let needle_unwrap = format!("getrandom::fill({}{})", "&mut bytes).", "unwrap");
1798
1799        let src = include_str!("context.rs");
1800        for (lineno, line) in src.lines().enumerate() {
1801            let trimmed = line.trim_start();
1802            if trimmed.starts_with("//") {
1803                continue;
1804            }
1805            assert!(
1806                !trimmed.contains(&needle_expect),
1807                "CR-21 regression: getrandom::fill(...).expect(...) reintroduced \
1808                 at context.rs:{}. Use the abort-on-fail pattern (fallible \
1809                 writeln to stderr + std::process::abort).\n  line: {}",
1810                lineno + 1,
1811                line
1812            );
1813            assert!(
1814                !trimmed.contains(&needle_unwrap),
1815                "CR-21 regression: getrandom::fill(...).unwrap() reintroduced \
1816                 at context.rs:{}. Use the abort-on-fail pattern (fallible \
1817                 writeln to stderr + std::process::abort).\n  line: {}",
1818                lineno + 1,
1819                line
1820            );
1821        }
1822    }
1823
1824    // ---------- Span builder / lifecycle coverage ----------
1825
1826    #[test]
1827    fn span_with_parent_and_kind_set_fields() {
1828        let parent = SpanId::generate();
1829        let span = Span::new(TraceId::generate(), "child", test_node_id())
1830            .with_parent(parent)
1831            .with_kind(SpanKind::Server);
1832        assert_eq!(span.parent_span_id, Some(parent));
1833        assert_eq!(span.kind, SpanKind::Server);
1834    }
1835
1836    #[test]
1837    fn span_set_ok_and_set_error_update_status() {
1838        let mut span = Span::new(TraceId::generate(), "op", test_node_id());
1839        span.set_ok();
1840        assert!(matches!(span.status, SpanStatus::Ok));
1841
1842        span.set_error("boom");
1843        match &span.status {
1844            SpanStatus::Error { message } => assert_eq!(message, "boom"),
1845            other => panic!("expected Error, got {:?}", other),
1846        }
1847    }
1848
1849    #[test]
1850    fn span_add_event_with_attributes_and_add_link_populate_collections() {
1851        let mut span = Span::new(TraceId::generate(), "op", test_node_id());
1852
1853        let mut attrs = HashMap::new();
1854        attrs.insert("k".into(), AttributeValue::from("v"));
1855        span.add_event_with_attributes("evt", attrs);
1856        assert_eq!(span.events.len(), 1);
1857        assert_eq!(span.events[0].name, "evt");
1858        assert!(span.events[0].attributes.contains_key("k"));
1859
1860        let other_trace = TraceId::generate();
1861        let other_span = SpanId::generate();
1862        span.add_link(other_trace, other_span);
1863        assert_eq!(span.links.len(), 1);
1864        assert_eq!(span.links[0].trace_id, other_trace);
1865        assert_eq!(span.links[0].span_id, other_span);
1866    }
1867
1868    // ---------- ContextError Display ----------
1869
1870    #[test]
1871    fn context_error_display_covers_every_variant() {
1872        assert_eq!(format!("{}", ContextError::Expired), "context has expired");
1873        assert_eq!(
1874            format!("{}", ContextError::MaxHopsExceeded),
1875            "maximum hops exceeded"
1876        );
1877        assert_eq!(format!("{}", ContextError::NotFound), "context not found");
1878        assert_eq!(
1879            format!("{}", ContextError::InvalidTraceId),
1880            "invalid trace ID"
1881        );
1882        assert_eq!(
1883            format!("{}", ContextError::CapacityExceeded),
1884            "storage capacity exceeded"
1885        );
1886    }
1887
1888    // ---------- percent_encode / percent_decode ----------
1889
1890    #[test]
1891    fn percent_codec_roundtrips_ascii_and_unicode_and_punctuation() {
1892        for input in [
1893            "",
1894            "plain",
1895            "with space",
1896            "weird/chars?&=",
1897            "trailing space ",
1898            "key=value;meta=other",
1899            // Unicode bytes get encoded byte-by-byte.
1900            "café",
1901        ] {
1902            let encoded = percent_encode(input);
1903            // Unreserved chars survive; everything else is %HH.
1904            assert!(!encoded.contains(' '));
1905            let decoded =
1906                percent_decode(&encoded).unwrap_or_else(|| panic!("decode failed: {}", encoded));
1907            assert_eq!(decoded, input, "roundtrip mismatch for {input:?}");
1908        }
1909    }
1910
1911    #[test]
1912    fn percent_decode_rejects_truncated_hex_escape() {
1913        // `%4` is missing the second hex digit — the decoder must
1914        // surface None rather than silently consuming a partial
1915        // escape (which would corrupt baggage propagation).
1916        assert_eq!(percent_decode("%4"), None);
1917        // Non-hex characters after `%` also fail.
1918        assert_eq!(percent_decode("%ZZ"), None);
1919    }
1920
1921    // ---------- ContextScope RAII + explicit finish ----------
1922
1923    /// Build a store whose sampler is forced to AlwaysOn so
1924    /// `create_context` deterministically inserts the trace.
1925    /// The default sampler is `Ratio(0.1)` — most contexts go
1926    /// unsampled (not stored), and `add_span` then returns
1927    /// `NotFound` regardless of the scope's behavior.
1928    fn store_with_always_on_sampler() -> ContextStore {
1929        ContextStore::new(64, 64, Duration::from_secs(60))
1930            .with_sampler(Sampler::new(SamplingStrategy::AlwaysOn))
1931    }
1932
1933    #[test]
1934    fn context_scope_drop_records_span_into_store() {
1935        let store = store_with_always_on_sampler();
1936        let ctx = store.create_context(test_node_id()).unwrap();
1937        let trace_id = ctx.trace_id;
1938
1939        // Pre: no spans yet.
1940        assert!(store.get_spans(&trace_id).is_empty());
1941
1942        // Drop the scope without calling finish — the Drop impl
1943        // must end the span and push it to the store.
1944        {
1945            let _scope = ContextScope::new(&store, &ctx, "auto", test_node_id());
1946        }
1947        let spans = store.get_spans(&trace_id);
1948        assert_eq!(spans.len(), 1, "Drop must push the span");
1949        assert!(spans[0].end_time_us.is_some(), "Drop must end() the span");
1950    }
1951
1952    #[test]
1953    fn context_scope_finish_records_span_and_suppresses_drop() {
1954        let store = store_with_always_on_sampler();
1955        let ctx = store.create_context(test_node_id()).unwrap();
1956        let trace_id = ctx.trace_id;
1957
1958        let mut scope = ContextScope::new(&store, &ctx, "explicit", test_node_id());
1959        scope.set_ok();
1960        scope.finish();
1961
1962        // Exactly one span — `finish` set finished=true so Drop
1963        // didn't push a duplicate.
1964        let spans = store.get_spans(&trace_id);
1965        assert_eq!(spans.len(), 1);
1966        assert!(matches!(spans[0].status, SpanStatus::Ok));
1967    }
1968}