use crate::Effect;
use crate::collections::EffectHashMap;
use crate::collections::hash_map;
use crate::concurrency::fiber_ref::FiberRef;
use crate::effect;
use crate::kernel::box_future;
use crate::kernel::effect::{ProgramOp, SyncStep, start_async_operation, start_effect};
use crate::runtime::{Never, run_blocking};
use crate::scheduling::Clock;
use std::borrow::Cow;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};
// Private marker traits used as supertraits of the public `AnnotateCurrentSpan*`
// traits below, so code outside this module cannot add implementations.
mod annotate_current_span_seal {
pub(super) trait Success {}
pub(super) trait Error {}
}
/// Success types accepted by `annotate_current_span*`; must be constructible from `()`.
/// In this file it is implemented for `()` only.
#[allow(private_bounds)] pub trait AnnotateCurrentSpanSuccess: From<()> + annotate_current_span_seal::Success {}
/// Error types accepted by `annotate_current_span*`; must be constructible from `Never`.
/// In this file it is implemented for `Never` only.
#[allow(private_bounds)]
pub trait AnnotateCurrentSpanErr: From<Never> + annotate_current_span_seal::Error {}
impl annotate_current_span_seal::Success for () {}
impl AnnotateCurrentSpanSuccess for () {}
impl annotate_current_span_seal::Error for Never {}
impl AnnotateCurrentSpanErr for Never {}
/// Switches that decide whether spans are recorded and where they go.
#[derive(Clone, Debug, PartialEq)]
pub struct TracingConfig {
    /// Master switch; when `false` the layer resolves to the disabled mode.
    pub enabled: bool,
    /// Also create `tracing` crate spans for each effect span.
    pub bridge_to_tracing: bool,
    /// Keep span records and events in the in-process snapshot buffers.
    pub record_in_memory: bool,
    /// Sampling rate applied when a span does not carry its own override.
    pub default_sample_rate: f64,
}

impl Default for TracingConfig {
    /// Everything off, with a sample rate of `1.0`.
    fn default() -> Self {
        Self {
            enabled: false,
            bridge_to_tracing: false,
            record_in_memory: false,
            default_sample_rate: 1.0,
        }
    }
}

impl TracingConfig {
    /// In-memory recording only, no bridge to the `tracing` crate.
    #[inline]
    pub fn enabled() -> Self {
        Self {
            enabled: true,
            record_in_memory: true,
            ..Self::default()
        }
    }

    /// In-memory recording plus the `tracing` bridge.
    #[inline]
    pub fn enabled_with_tracing_bridge() -> Self {
        Self {
            bridge_to_tracing: true,
            ..Self::enabled()
        }
    }

    /// `tracing` bridge only; nothing is kept in memory.
    #[inline]
    pub fn tracing_bridge_only() -> Self {
        Self {
            enabled: true,
            bridge_to_tracing: true,
            ..Self::default()
        }
    }
}
/// Resolved operating mode of the tracing layer, stored as a `u64` in `TRACING_MODE`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub(crate) enum TracingMode {
    Disabled = 0,
    BridgeOnly = 1,
    BridgeOnlyNoSubscriber = 2,
    Snapshot = 3,
    SnapshotBridge = 4,
}

impl TracingMode {
    /// Derives the mode from `config` and whether a `tracing` subscriber is present.
    ///
    /// Bridge-only operation needs a subscriber; without one the result is
    /// `Disabled`. This function never yields `BridgeOnlyNoSubscriber`.
    const fn from_config(config: &TracingConfig, subscriber_available: bool) -> Self {
        match (
            config.enabled,
            config.record_in_memory,
            config.bridge_to_tracing,
        ) {
            (false, _, _) => Self::Disabled,
            (true, true, true) => Self::SnapshotBridge,
            (true, true, false) => Self::Snapshot,
            (true, false, true) if subscriber_available => Self::BridgeOnly,
            (true, false, _) => Self::Disabled,
        }
    }

    /// `true` only for `Disabled`.
    #[inline]
    const fn is_disabled(self) -> bool {
        matches!(self, Self::Disabled)
    }

    /// `true` only for `BridgeOnly`.
    #[inline]
    const fn is_bridge_only(self) -> bool {
        matches!(self, Self::BridgeOnly)
    }
}
/// 16-byte trace identifier.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TraceId([u8; 16]);

impl TraceId {
    /// Wraps raw bytes without validation.
    #[inline]
    pub const fn from_bytes(bytes: [u8; 16]) -> Self {
        TraceId(bytes)
    }

    /// Returns the raw bytes.
    #[inline]
    pub const fn to_bytes(self) -> [u8; 16] {
        let Self(raw) = self;
        raw
    }
}
/// Renders the id through `fmt_hex`: two lowercase hex digits per byte, 32 in total.
impl fmt::Display for TraceId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt_hex(&self.0, f)
}
}
/// 8-byte span identifier.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SpanId([u8; 8]);

impl SpanId {
    /// Wraps raw bytes without validation.
    #[inline]
    pub const fn from_bytes(bytes: [u8; 8]) -> Self {
        SpanId(bytes)
    }

    /// Returns the raw bytes.
    #[inline]
    pub const fn to_bytes(self) -> [u8; 8] {
        let Self(raw) = self;
        raw
    }
}
/// Renders the id through `fmt_hex`: two lowercase hex digits per byte, 16 in total.
impl fmt::Display for SpanId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt_hex(&self.0, f)
}
}
/// One byte of trace flags; bit 0 is the "sampled" flag.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct TraceFlags {
    bits: u8,
}

impl TraceFlags {
    /// No flags set.
    pub const DEFAULT: Self = Self { bits: 0 };
    /// Only the sampled bit set.
    pub const SAMPLED: Self = Self { bits: 1 };

    /// Builds flags from a raw byte; unknown bits are kept as-is.
    #[inline]
    pub const fn from_bits(bits: u8) -> Self {
        Self { bits }
    }

    /// Returns the raw flag byte.
    #[inline]
    pub const fn bits(self) -> u8 {
        self.bits
    }

    /// Whether the sampled bit is set.
    #[inline]
    pub const fn sampled(self) -> bool {
        // `SAMPLED` is a single bit, so a non-zero intersection means it is set.
        (self.bits & Self::SAMPLED.bits) != 0
    }
}

impl fmt::Display for TraceFlags {
    /// Writes the flag byte as two lowercase hex digits.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { bits } = *self;
        write!(f, "{bits:02x}")
    }
}
/// Reasons `SpanContext::from_traceparent` rejects a header value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TraceParentParseError {
/// The value does not split into exactly four `-`-separated fields.
InvalidFieldCount,
/// The version field is not `"00"`.
UnsupportedVersion,
/// A hex field does not have the expected number of bytes.
InvalidLength,
/// A hex field contains a byte outside `0-9`, `a-f`, `A-F`.
InvalidHex,
/// The trace id or the span id consists solely of zero bytes.
AllZeroId,
}
/// Identity of a span: the trace it belongs to, its own id, and the trace flags.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SpanContext {
/// Trace this span belongs to; child contexts copy it from their parent.
pub trace_id: TraceId,
/// Identifier of this span.
pub span_id: SpanId,
/// Flag byte carried with the context.
pub trace_flags: TraceFlags,
}
impl SpanContext {
    /// Renders the context as `00-<trace id>-<span id>-<flags>`, all fields in hex.
    #[inline]
    pub fn to_traceparent(self) -> String {
        let Self {
            trace_id,
            span_id,
            trace_flags,
        } = self;
        format!("00-{trace_id}-{span_id}-{trace_flags}")
    }

    /// Parses a `version-traceid-spanid-flags` string.
    ///
    /// # Errors
    ///
    /// Checks run in this order: field count, version (`"00"` only), trace id
    /// hex, span id hex, all-zero ids, flags hex. The first failure is returned.
    pub fn from_traceparent(value: &str) -> Result<Self, TraceParentParseError> {
        let mut fields = value.split('-');
        // Exactly four fields: a missing one or a fifth one is a count error.
        let (Some(version), Some(trace_hex), Some(span_hex), Some(flags_hex), None) = (
            fields.next(),
            fields.next(),
            fields.next(),
            fields.next(),
            fields.next(),
        ) else {
            return Err(TraceParentParseError::InvalidFieldCount);
        };
        if version != "00" {
            return Err(TraceParentParseError::UnsupportedVersion);
        }

        let trace_id = TraceId::from_bytes(parse_hex_array::<16>(trace_hex)?);
        let span_id = SpanId::from_bytes(parse_hex_array::<8>(span_hex)?);
        let zero_trace = trace_id.to_bytes() == [0u8; 16];
        let zero_span = span_id.to_bytes() == [0u8; 8];
        if zero_trace || zero_span {
            return Err(TraceParentParseError::AllZeroId);
        }

        let [flag_bits] = parse_hex_array::<1>(flags_hex)?;
        Ok(Self {
            trace_id,
            span_id,
            trace_flags: TraceFlags::from_bits(flag_bits),
        })
    }
}
/// Source of span contexts; the installed provider is consulted for every new span.
pub trait TraceContextProvider: Send + Sync + 'static {
/// Produces the context for a span that has no parent.
fn root_context(&self) -> SpanContext;
/// Produces the context for a span nested under `parent`.
fn child_context(&self, parent: &SpanContext) -> SpanContext;
}
/// Provider that hands out ids from two atomic counters; the first id of each kind is 1.
#[derive(Debug, Default)]
pub struct SequentialTraceContextProvider {
// Number of trace ids handed out so far.
next_trace: AtomicU64,
// Number of span ids handed out so far.
next_span: AtomicU64,
}
impl SequentialTraceContextProvider {
/// Creates a provider with both counters at zero.
#[inline]
pub fn new() -> Self {
Self::default()
}
// `fetch_add` returns the previous value, so `+ 1` makes the first id 1.
// The counter lands in the low 8 bytes (see `counter_to_trace_bytes`).
fn next_trace_id(&self) -> TraceId {
TraceId::from_bytes(counter_to_trace_bytes(
self.next_trace.fetch_add(1, Ordering::Relaxed) + 1,
))
}
// Same scheme as `next_trace_id`; the counter is encoded big-endian.
fn next_span_id(&self) -> SpanId {
SpanId::from_bytes((self.next_span.fetch_add(1, Ordering::Relaxed) + 1).to_be_bytes())
}
}
impl TraceContextProvider for SequentialTraceContextProvider {
// A root gets a fresh trace id, a fresh span id and `TraceFlags::DEFAULT`.
fn root_context(&self) -> SpanContext {
SpanContext {
trace_id: self.next_trace_id(),
span_id: self.next_span_id(),
trace_flags: TraceFlags::DEFAULT,
}
}
// A child keeps the parent's trace id and flags and gets a fresh span id.
fn child_context(&self, parent: &SpanContext) -> SpanContext {
SpanContext {
trace_id: parent.trace_id,
span_id: self.next_span_id(),
trace_flags: parent.trace_flags,
}
}
}
/// Verbosity level of a span; defaults to `Info`.
/// Each variant maps to the same-named `tracing::Level` in `level_to_tracing_level`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum SpanLevel {
Trace,
Debug,
#[default]
Info,
Warn,
Error,
}
/// Outcome recorded on a span; defaults to `Unset`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum SpanStatus {
/// Status of a span record from its start until an end is recorded.
#[default]
Unset,
/// The wrapped effect returned `Ok`.
Ok,
/// The wrapped effect returned `Err`.
Error,
}
/// Value attached to a span or span event under a string key.
#[derive(Clone, Debug, PartialEq)]
pub enum SpanAttributeValue {
    String(String),
    Bool(bool),
    I64(i64),
    F64(f64),
}

impl From<String> for SpanAttributeValue {
    fn from(value: String) -> Self {
        SpanAttributeValue::String(value)
    }
}

impl From<&str> for SpanAttributeValue {
    /// Copies the slice into an owned `String` variant.
    fn from(value: &str) -> Self {
        String::from(value).into()
    }
}

impl From<bool> for SpanAttributeValue {
    fn from(value: bool) -> Self {
        SpanAttributeValue::Bool(value)
    }
}

impl From<i64> for SpanAttributeValue {
    fn from(value: i64) -> Self {
        SpanAttributeValue::I64(value)
    }
}

impl From<i32> for SpanAttributeValue {
    /// Widens losslessly to the `I64` variant.
    fn from(value: i32) -> Self {
        Self::from(i64::from(value))
    }
}

impl From<f64> for SpanAttributeValue {
    fn from(value: f64) -> Self {
        SpanAttributeValue::F64(value)
    }
}
/// A named, timestamped event attached to a span record.
#[derive(Clone, Debug, PartialEq)]
pub struct SpanEvent {
/// Event name supplied by the caller.
pub name: String,
/// Attributes supplied with the event.
pub attributes: EffectHashMap<String, SpanAttributeValue>,
/// Timestamp taken from the installed tracing clock when the event was emitted.
pub occurred_at: Instant,
}
/// Parameters for a span created by `with_span_options`.
#[derive(Clone, Debug, PartialEq)]
pub struct SpanOptions {
/// Span name.
pub name: Cow<'static, str>,
/// Span level; `new` sets `SpanLevel::Info`.
pub level: SpanLevel,
/// Attributes the span starts with.
pub attributes: EffectHashMap<String, SpanAttributeValue>,
/// Per-span sampling rate; `None` falls back to the installed default.
pub sample_rate: Option<f64>,
}
impl SpanOptions {
/// Creates options with the given name, `Info` level, no attributes and no
/// sampling override.
#[inline]
pub fn new(name: impl Into<Cow<'static, str>>) -> Self {
Self {
name: name.into(),
level: SpanLevel::Info,
attributes: EffectHashMap::new(),
sample_rate: None,
}
}
/// Replaces the level and returns the updated options.
#[inline]
pub fn with_level(mut self, level: SpanLevel) -> Self {
self.level = level;
self
}
/// Stores `value` under `key` via `hash_map::set` and returns the updated options.
#[inline]
pub fn with_attribute(
mut self,
key: impl Into<String>,
value: impl Into<SpanAttributeValue>,
) -> Self {
self.attributes = hash_map::set(&self.attributes, key.into(), value.into());
self
}
}
/// Lifecycle event of a span-wrapped effect, identified by span name.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EffectEvent {
/// Recorded right after the span record is created.
Start {
span: String,
},
/// Recorded when the wrapped effect returns `Ok`.
Success {
span: String,
},
/// Recorded when the wrapped effect returns `Err`.
Failure {
span: String,
},
}
/// Fiber lifecycle event, identified by a string fiber id.
/// In this file these are only recorded through `emit_fiber_event`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FiberEvent {
Spawn {
fiber_id: String,
},
Complete {
fiber_id: String,
},
Interrupt {
fiber_id: String,
},
}
/// In-memory record of one span, as returned in `TracingSnapshot::spans`.
#[derive(Clone, Debug, PartialEq)]
pub struct SpanRecord {
/// Span name copied from the options.
pub name: String,
/// Identity assigned by the context provider.
pub context: SpanContext,
/// Span id of the enclosing span, if there was one on the stack.
pub parent_span_id: Option<SpanId>,
/// Level copied from the options.
pub level: SpanLevel,
/// `Unset` until the span ends, then `Ok` or `Error`.
pub status: SpanStatus,
/// Starts as the option attributes; replaced by the annotation map when the span ends.
pub attributes: EffectHashMap<String, SpanAttributeValue>,
/// Events emitted while this span was current.
pub events: Vec<SpanEvent>,
/// Start timestamp from the tracing clock.
pub started_at: Instant,
/// End timestamp; `None` while the span is still open.
pub ended_at: Option<Instant>,
}
impl SpanRecord {
    /// Elapsed time between start and end, or `None` while the span is open.
    ///
    /// Both timestamps come from an injectable clock and the fields are public,
    /// so `ended_at` may precede `started_at`. The subtraction saturates to
    /// zero in that case; `Instant::duration_since` is documented as allowed
    /// to panic there in future Rust versions.
    #[inline]
    pub fn duration(&self) -> Option<Duration> {
        self.ended_at
            .map(|ended_at| ended_at.saturating_duration_since(self.started_at))
    }
}
/// Entry on the per-fiber span stack describing one open span.
#[derive(Clone, Debug)]
pub struct LogSpan {
/// Span name.
pub name: Cow<'static, str>,
/// Identity of the open span.
pub context: SpanContext,
/// The bridged `tracing` span, when one was created.
pub tracing_span: Option<::tracing::Span>,
}
/// Copy of the in-memory tracing buffers, produced by `snapshot_tracing`.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct TracingSnapshot {
/// Effect events in recording order.
pub effect_events: Vec<EffectEvent>,
/// Fiber events in recording order.
pub fiber_events: Vec<FiberEvent>,
/// Span records in start order.
pub spans: Vec<SpanRecord>,
}
// Process-wide tracing state, kept behind the `TRACE_STATE` mutex.
struct TraceState {
// Configuration from the most recent install.
config: TracingConfig,
// Recorded buffers; cleared on every install.
effect_events: Vec<EffectEvent>,
fiber_events: Vec<FiberEvent>,
spans: Vec<SpanRecord>,
// Source of span contexts for new spans.
context_provider: Arc<dyn TraceContextProvider>,
// Timestamp source used for span start/end and events.
clock_now: Arc<dyn Fn() -> Instant + Send + Sync>,
}
impl Default for TraceState {
// Default config (tracing off), empty buffers, sequential ids, `Instant::now` as clock.
fn default() -> Self {
Self {
config: TracingConfig::default(),
effect_events: Vec::new(),
fiber_events: Vec::new(),
spans: Vec::new(),
context_provider: Arc::new(SequentialTraceContextProvider::new()),
clock_now: Arc::new(Instant::now),
}
}
}
// Lazily created global state; access it through `trace_state()`.
static TRACE_STATE: OnceLock<Mutex<TraceState>> = OnceLock::new();
// Current `TracingMode` discriminant, readable without taking the state lock.
static TRACING_MODE: AtomicU64 = AtomicU64::new(TracingMode::Disabled as u64);
// Default sampling mask: 0 samples every span, `SAMPLING_NEVER` samples none.
static SAMPLING_MASK: AtomicU64 = AtomicU64::new(0);
// Sentinel mask meaning "never sample".
const SAMPLING_NEVER: u64 = u64::MAX;
std::thread_local! {
// Per-thread count of sampling decisions made with a partial mask.
static LOCAL_SPAN_COUNTER: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
}
/// Fiber-local handles used by the tracing layer; created once on first install.
#[derive(Clone)]
pub struct TracingFiberRefs {
/// Stack of open spans; the last entry is the current span.
pub span_stack: FiberRef<Vec<LogSpan>>,
/// Attribute map of the current span, flushed into its record when the span ends.
pub span_annotations: FiberRef<EffectHashMap<String, SpanAttributeValue>>,
}
// Pops the top entry of the span stack exactly once: either through an explicit
// `pop()` or, if that never happened, when the guard is dropped.
struct SpanStackPopGuard {
    refs: TracingFiberRefs,
    // `true` while the pop is still owed.
    active: bool,
}

impl SpanStackPopGuard {
    fn new(refs: TracingFiberRefs) -> Self {
        Self { refs, active: true }
    }

    // Explicit pop; panics if the stack update fails.
    fn pop(&mut self) {
        if self.active {
            let outcome = run_blocking(
                self.refs.span_stack.update(|mut stack| {
                    stack.pop();
                    stack
                }),
                (),
            );
            outcome.expect("span_stack pop");
            self.active = false;
        }
    }
}

impl Drop for SpanStackPopGuard {
    // Best-effort pop; a failed update is ignored rather than panicking in `drop`.
    fn drop(&mut self) {
        if self.active {
            let _ = run_blocking(
                self.refs.span_stack.update(|mut stack| {
                    stack.pop();
                    stack
                }),
                (),
            );
            self.active = false;
        }
    }
}
// Fiber refs shared by all spans; set on the first install and never replaced.
static TRACING_FIBER_REFS: OnceLock<TracingFiberRefs> = OnceLock::new();
// Returns the global state mutex, creating a default `TraceState` on first use.
fn trace_state() -> &'static Mutex<TraceState> {
TRACE_STATE.get_or_init(|| Mutex::new(TraceState::default()))
}
// Decodes the `TRACING_MODE` atomic back into a `TracingMode`.
#[inline]
fn tracing_mode() -> TracingMode {
    match TRACING_MODE.load(Ordering::Relaxed) {
        1 => TracingMode::BridgeOnly,
        2 => TracingMode::BridgeOnlyNoSubscriber,
        3 => TracingMode::Snapshot,
        4 => TracingMode::SnapshotBridge,
        // 0 is `Disabled`; any unrecognised value is treated the same way.
        _ => TracingMode::Disabled,
    }
}
#[inline]
fn should_sample(sample_rate_override: Option<f64>) -> bool {
let mask = if let Some(rate) = sample_rate_override {
sampling_mask(rate)
} else {
SAMPLING_MASK.load(Ordering::Relaxed)
};
if mask == 0 {
return true;
}
if mask == SAMPLING_NEVER {
return false;
}
LOCAL_SPAN_COUNTER.with(|cell| {
let counter = cell.get();
cell.set(counter.wrapping_add(1));
(counter & mask) == 0
})
}
/// Returns `true` unless the current tracing mode is `Disabled`.
#[doc(hidden)]
pub fn tracing_enabled() -> bool {
!tracing_mode().is_disabled()
}
// Reports whether the current default `tracing` dispatcher is something other
// than `NoSubscriber`.
fn current_tracing_subscriber_available() -> bool {
::tracing::dispatcher::get_default(|dispatch| {
!dispatch.is::<::tracing::subscriber::NoSubscriber>()
})
}
fn sampling_mask(rate: f64) -> u64 {
if rate >= 1.0 {
return 0;
}
if rate <= 0.0 {
return SAMPLING_NEVER;
}
let interval = (1.0 / rate).ceil() as u64;
interval.next_power_of_two().saturating_sub(1)
}
// Maps each `SpanLevel` to the same-named `tracing::Level`.
fn level_to_tracing_level(level: SpanLevel) -> ::tracing::Level {
match level {
SpanLevel::Trace => ::tracing::Level::TRACE,
SpanLevel::Debug => ::tracing::Level::DEBUG,
SpanLevel::Info => ::tracing::Level::INFO,
SpanLevel::Warn => ::tracing::Level::WARN,
SpanLevel::Error => ::tracing::Level::ERROR,
}
}
// Builds the bridged `tracing` span named "effectful.span" for a recorded span.
//
// The five arms differ only in the level macro used; every arm declares the
// same fields. `effectful_status` and `effectful_duration_ns` start empty and
// are recorded when the span ends. A missing parent id is written as "".
// Always returns `Some` in the visible code.
fn make_tracing_span(
options: &SpanOptions,
context: SpanContext,
parent_span_id: Option<SpanId>,
) -> Option<::tracing::Span> {
let parent = parent_span_id.map(|id| id.to_string());
let span = match level_to_tracing_level(options.level) {
::tracing::Level::TRACE => ::tracing::trace_span!(
"effectful.span",
otel_name = %options.name,
otel_trace_id = %context.trace_id,
otel_span_id = %context.span_id,
otel_parent_span_id = parent.as_deref().unwrap_or(""),
otel_trace_flags = %context.trace_flags,
effectful_status = ::tracing::field::Empty,
effectful_duration_ns = ::tracing::field::Empty,
effectful_attributes = ?options.attributes,
),
::tracing::Level::DEBUG => ::tracing::debug_span!(
"effectful.span",
otel_name = %options.name,
otel_trace_id = %context.trace_id,
otel_span_id = %context.span_id,
otel_parent_span_id = parent.as_deref().unwrap_or(""),
otel_trace_flags = %context.trace_flags,
effectful_status = ::tracing::field::Empty,
effectful_duration_ns = ::tracing::field::Empty,
effectful_attributes = ?options.attributes,
),
::tracing::Level::INFO => ::tracing::info_span!(
"effectful.span",
otel_name = %options.name,
otel_trace_id = %context.trace_id,
otel_span_id = %context.span_id,
otel_parent_span_id = parent.as_deref().unwrap_or(""),
otel_trace_flags = %context.trace_flags,
effectful_status = ::tracing::field::Empty,
effectful_duration_ns = ::tracing::field::Empty,
effectful_attributes = ?options.attributes,
),
::tracing::Level::WARN => ::tracing::warn_span!(
"effectful.span",
otel_name = %options.name,
otel_trace_id = %context.trace_id,
otel_span_id = %context.span_id,
otel_parent_span_id = parent.as_deref().unwrap_or(""),
otel_trace_flags = %context.trace_flags,
effectful_status = ::tracing::field::Empty,
effectful_duration_ns = ::tracing::field::Empty,
effectful_attributes = ?options.attributes,
),
::tracing::Level::ERROR => ::tracing::error_span!(
"effectful.span",
otel_name = %options.name,
otel_trace_id = %context.trace_id,
otel_span_id = %context.span_id,
otel_parent_span_id = parent.as_deref().unwrap_or(""),
otel_trace_flags = %context.trace_flags,
effectful_status = ::tracing::field::Empty,
effectful_duration_ns = ::tracing::field::Empty,
effectful_attributes = ?options.attributes,
),
};
Some(span)
}
fn make_bridge_only_span(options: &SpanOptions) -> Option<::tracing::Span> {
if !matches!(tracing_mode(), TracingMode::BridgeOnly) {
return None;
}
let span = match level_to_tracing_level(options.level) {
::tracing::Level::TRACE => ::tracing::trace_span!("effectful.span", otel_name = %options.name),
::tracing::Level::DEBUG => ::tracing::debug_span!("effectful.span", otel_name = %options.name),
::tracing::Level::INFO => ::tracing::info_span!("effectful.span", otel_name = %options.name),
::tracing::Level::WARN => ::tracing::warn_span!("effectful.span", otel_name = %options.name),
::tracing::Level::ERROR => ::tracing::error_span!("effectful.span", otel_name = %options.name),
};
Some(span)
}
// Emits an INFO-level `tracing` event inside `span`, carrying the event name
// and its attributes. Does nothing when there is no bridged span.
fn emit_tracing_span_event(
    span: &Option<::tracing::Span>,
    name: &str,
    attributes: &EffectHashMap<String, SpanAttributeValue>,
) {
    let Some(bridged) = span.as_ref() else {
        return;
    };
    bridged.in_scope(|| {
        ::tracing::event!(
            ::tracing::Level::INFO,
            effectful_event = %name,
            effectful_event_attributes = ?attributes,
        );
    });
}
// Writes every byte as two lowercase hex digits, stopping at the first write error.
fn fmt_hex(bytes: &[u8], f: &mut fmt::Formatter<'_>) -> fmt::Result {
    bytes.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
}
// Decodes exactly `2 * N` hex digits into `N` bytes.
//
// Fails with `InvalidLength` when the byte length of `value` is not `2 * N`,
// and with `InvalidHex` at the first non-hex byte.
fn parse_hex_array<const N: usize>(value: &str) -> Result<[u8; N], TraceParentParseError> {
    let digits = value.as_bytes();
    if digits.len() != N * 2 {
        return Err(TraceParentParseError::InvalidLength);
    }
    let mut decoded = [0u8; N];
    for (slot, pair) in decoded.iter_mut().zip(digits.chunks_exact(2)) {
        let high = hex_nibble(pair[0])?;
        let low = hex_nibble(pair[1])?;
        *slot = (high << 4) | low;
    }
    Ok(decoded)
}
// Decodes one ASCII hex digit (either case) into its value 0..=15.
fn hex_nibble(byte: u8) -> Result<u8, TraceParentParseError> {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
        .ok_or(TraceParentParseError::InvalidHex)
}
// Places `counter` big-endian in the low 8 bytes of a 16-byte id; the high 8
// bytes stay zero. Widening to `u128` produces exactly that layout.
fn counter_to_trace_bytes(counter: u64) -> [u8; 16] {
    u128::from(counter).to_be_bytes()
}
// Returns the tracing fiber refs, or `None` if no install has created them yet.
pub(crate) fn fiber_refs() -> Option<&'static TracingFiberRefs> {
TRACING_FIBER_REFS.get()
}
// Runs `f` on the locked global state, but only while in-memory recording is
// active (`enabled` and `record_in_memory` both set). Otherwise `f` is dropped
// without being called.
fn with_state_mut<F>(f: F)
where
    F: FnOnce(&mut TraceState),
{
    let mut state = trace_state().lock().expect("trace state mutex poisoned");
    let recording = state.config.enabled && state.config.record_in_memory;
    if recording {
        f(&mut state);
    }
}
fn next_span_context(parent: Option<SpanContext>) -> Option<SpanContext> {
let guard = trace_state().lock().expect("trace state mutex poisoned");
if !guard.config.enabled {
return None;
}
Some(match parent {
Some(parent) => guard.context_provider.child_context(&parent),
None => guard.context_provider.root_context(),
})
}
// Reads the current time from the installed clock. The clock handle is cloned
// out of the state first, so the lock is no longer held while the clock runs.
fn tracing_now() -> Instant {
    // The temporary guard is released at the end of this statement.
    let now_fn = Arc::clone(
        &trace_state()
            .lock()
            .expect("trace state mutex poisoned")
            .clock_now,
    );
    now_fn()
}
// Appends a fresh, still-open record for a span that has just started.
// The record is only built when in-memory recording is active.
fn record_span_start(
    options: &SpanOptions,
    context: SpanContext,
    parent_span_id: Option<SpanId>,
    started_at: Instant,
) {
    with_state_mut(|state| {
        let record = SpanRecord {
            name: (*options.name).to_owned(),
            context,
            parent_span_id,
            level: options.level,
            status: SpanStatus::Unset,
            attributes: options.attributes.clone(),
            events: Vec::new(),
            started_at,
            ended_at: None,
        };
        state.spans.push(record);
    });
}
// Closes the most recently started record with the given span id: replaces its
// attributes, sets the final status and stores the end timestamp. Does nothing
// if no record matches.
fn record_span_end(
    span_id: SpanId,
    attributes: EffectHashMap<String, SpanAttributeValue>,
    status: SpanStatus,
    ended_at: Instant,
) {
    with_state_mut(|state| {
        // Search from the back so the newest matching record wins.
        let newest = state
            .spans
            .iter_mut()
            .rfind(|record| record.context.span_id == span_id);
        if let Some(record) = newest {
            record.attributes = attributes;
            record.status = status;
            record.ended_at = Some(ended_at);
        }
    });
}
// Appends an event to the most recently started record with the given span id.
// Does nothing if no record matches.
fn record_span_event(
    span_id: SpanId,
    name: String,
    attributes: EffectHashMap<String, SpanAttributeValue>,
    occurred_at: Instant,
) {
    with_state_mut(|state| {
        // Search from the back so the newest matching record wins.
        let newest = state
            .spans
            .iter_mut()
            .rfind(|record| record.context.span_id == span_id);
        if let Some(record) = newest {
            let event = SpanEvent {
                name,
                attributes,
                occurred_at,
            };
            record.events.push(event);
        }
    });
}
// Appends `event` to the effect-event buffer; a no-op unless in-memory
// recording is active (see `with_state_mut`).
fn record_effect_event(event: EffectEvent) {
with_state_mut(|state| {
state.effect_events.push(event);
});
}
/// Installs the tracing layer with a fresh `SequentialTraceContextProvider` and
/// `Instant::now` as the clock.
pub fn install_tracing_layer(config: TracingConfig) -> Effect<(), Never, ()> {
install_tracing_layer_with_context_provider(
config,
Arc::new(SequentialTraceContextProvider::new()),
)
}
/// Installs the tracing layer with a fresh `SequentialTraceContextProvider` and
/// timestamps taken from `clock`.
pub fn install_tracing_layer_with_clock<C>(config: TracingConfig, clock: C) -> Effect<(), Never, ()>
where
C: Clock + Send + Sync + 'static,
{
install_tracing_layer_with_context_provider_and_clock(
config,
Arc::new(SequentialTraceContextProvider::new()),
clock,
)
}
/// Installs the tracing layer with a caller-supplied context provider and
/// `Instant::now` as the clock.
pub fn install_tracing_layer_with_context_provider(
config: TracingConfig,
context_provider: Arc<dyn TraceContextProvider>,
) -> Effect<(), Never, ()> {
install_tracing_layer_with_context_provider_and_clock_fn(
config,
context_provider,
Arc::new(Instant::now),
)
}
/// Installs the tracing layer with a caller-supplied context provider and
/// timestamps taken from `clock.now()`.
pub fn install_tracing_layer_with_context_provider_and_clock<C>(
config: TracingConfig,
context_provider: Arc<dyn TraceContextProvider>,
clock: C,
) -> Effect<(), Never, ()>
where
C: Clock + Send + Sync + 'static,
{
install_tracing_layer_with_context_provider_and_clock_fn(
config,
context_provider,
Arc::new(move || clock.now()),
)
}
// Shared implementation behind all `install_tracing_layer*` entry points.
//
// When the returned effect runs it:
// 1. creates the tracing fiber refs if this is the first install,
// 2. stores the config, provider and clock in the global state,
// 3. publishes the derived mode and default sampling mask to the atomics,
// 4. clears all recorded events and spans.
fn install_tracing_layer_with_context_provider_and_clock_fn(
config: TracingConfig,
context_provider: Arc<dyn TraceContextProvider>,
clock_now: Arc<dyn Fn() -> Instant + Send + Sync>,
) -> Effect<(), Never, ()> {
Effect::new(move |_env| {
// One-time creation; later installs reuse the existing refs.
TRACING_FIBER_REFS.get_or_init(|| {
let span_stack = run_blocking(
FiberRef::make_with(
Vec::<LogSpan>::new,
|_parent| Vec::new(),
|parent, _child| parent.clone(),
),
(),
)
.expect("tracing span_stack FiberRef");
let span_annotations = run_blocking(
FiberRef::make_with(
hash_map::empty::<String, SpanAttributeValue>,
|_parent| hash_map::empty(),
|parent, _child| parent.clone(),
),
(),
)
.expect("tracing span_annotations FiberRef");
TracingFiberRefs {
span_stack,
span_annotations,
}
});
// Everything below happens while holding the state lock.
let mut guard = trace_state().lock().expect("trace state mutex poisoned");
guard.config = config.clone();
// The subscriber is only probed when the bridge was actually requested.
let subscriber_available =
config.enabled && config.bridge_to_tracing && current_tracing_subscriber_available();
let mode = TracingMode::from_config(&config, subscriber_available);
TRACING_MODE.store(mode as u64, Ordering::Relaxed);
let mask = sampling_mask(config.default_sample_rate);
SAMPLING_MASK.store(mask, Ordering::Relaxed);
guard.context_provider = Arc::clone(&context_provider);
guard.clock_now = Arc::clone(&clock_now);
// A (re)install starts from empty buffers.
guard.effect_events.clear();
guard.fiber_events.clear();
guard.spans.clear();
Ok(())
})
}
/// Returns an effect that records `event` in the effect-event buffer.
/// Nothing is recorded unless in-memory recording is active.
pub fn emit_effect_event(event: EffectEvent) -> Effect<(), Never, ()> {
Effect::new(move |_env| {
record_effect_event(event.clone());
Ok(())
})
}
/// Returns an effect that records `event` in the fiber-event buffer.
/// Nothing is recorded unless in-memory recording is active.
pub fn emit_fiber_event(event: FiberEvent) -> Effect<(), Never, ()> {
Effect::new(move |_env| {
with_state_mut(|state| {
state.fiber_events.push(event.clone());
});
Ok(())
})
}
/// Sets a string attribute on the current span.
/// Shorthand for `annotate_current_span_attribute` with a `String` value.
pub fn annotate_current_span<A, E, R>(
key: &'static str,
value: impl Into<String>,
) -> Effect<A, E, R>
where
A: AnnotateCurrentSpanSuccess + 'static,
E: AnnotateCurrentSpanErr + 'static,
R: 'static,
{
annotate_current_span_attribute(key, SpanAttributeValue::String(value.into()))
}
/// Sets attribute `key` to `value` on the span at the top of the span stack.
///
/// The effect succeeds without doing anything when tracing is disabled, the
/// fiber refs have not been installed, or no span is open. Otherwise the
/// attribute is written both to the fiber-local annotation map and to the
/// in-memory span record.
pub fn annotate_current_span_attribute<A, E, R>(
key: &'static str,
value: impl Into<SpanAttributeValue>,
) -> Effect<A, E, R>
where
A: AnnotateCurrentSpanSuccess + 'static,
E: AnnotateCurrentSpanErr + 'static,
R: 'static,
{
let value = value.into();
// NOTE(review): the body mixes `return Ok(..)` with a bare tail value;
// presumably `effect!` wraps the tail in `Ok` — confirm against the macro.
effect!(|_r: &mut R| {
if !tracing_enabled() {
return Ok(A::from(()));
}
let Some(refs) = fiber_refs() else {
return Ok(A::from(()));
};
let stack = run_blocking(refs.span_stack.get(), ()).expect("span_stack get");
if stack.is_empty() {
return Ok(A::from(()));
}
let span_id = stack.last().expect("non-empty span stack").context.span_id;
let val = value.clone();
// Fiber-local copy: this map is what gets flushed into the record at span end.
run_blocking(
refs
.span_annotations
.update(move |m| hash_map::set(&m, key.to_string(), val)),
(),
)
.expect("span_annotations update");
// Also update the record now so snapshots taken mid-span see the attribute.
with_state_mut(|state| {
if let Some(span) = state
.spans
.iter_mut()
.rev()
.find(|span| span.context.span_id == span_id)
{
span.attributes = hash_map::set(&span.attributes, key.to_string(), value.clone());
}
});
A::from(())
})
}
/// Emits an event with no attributes on the current span.
/// Shorthand for `emit_current_span_event_with_attributes` with an empty map.
pub fn emit_current_span_event<R>(name: impl Into<String>) -> Effect<(), Never, R>
where
R: 'static,
{
emit_current_span_event_with_attributes(name, EffectHashMap::new())
}
/// Emits a named event with attributes on the span at the top of the span stack.
///
/// The effect succeeds without doing anything when tracing is disabled, the
/// fiber refs have not been installed, or no span is open. Otherwise the event
/// is forwarded to the bridged `tracing` span (if any) and appended to the
/// in-memory span record, timestamped by the tracing clock.
pub fn emit_current_span_event_with_attributes<R>(
name: impl Into<String>,
attributes: EffectHashMap<String, SpanAttributeValue>,
) -> Effect<(), Never, R>
where
R: 'static,
{
let name = name.into();
// NOTE(review): early exits use `return Ok(())` while the tail is a bare `()`;
// presumably `effect!` wraps the tail in `Ok` — confirm against the macro.
effect!(|_r: &mut R| {
if !tracing_enabled() {
return Ok(());
}
let Some(refs) = fiber_refs() else {
return Ok(());
};
let stack = run_blocking(refs.span_stack.get(), ()).expect("span_stack get");
let Some(span) = stack.last() else {
return Ok(());
};
emit_tracing_span_event(&span.tracing_span, &name, &attributes);
record_span_event(
span.context.span_id,
name.clone(),
attributes.clone(),
tracing_now(),
);
()
})
}
/// Wraps `effect` in a span called `name`, using the defaults from `SpanOptions::new`.
pub fn with_span<A, E, R>(
effect: Effect<A, E, R>,
name: impl Into<Cow<'static, str>>,
) -> Effect<A, E, R>
where
A: 'static,
E: 'static,
R: 'static,
{
with_span_options(effect, SpanOptions::new(name))
}
/// Wraps `effect` in a span described by `options`.
///
/// The tracing mode and the sampling decision are evaluated when the returned
/// effect starts. If tracing is disabled or the span is not sampled, `effect`
/// runs unwrapped. Otherwise it runs under the bridge-only wrapper or the full
/// recording wrapper, depending on the mode.
pub fn with_span_options<A, E, R>(effect: Effect<A, E, R>, options: SpanOptions) -> Effect<A, E, R>
where
    A: 'static,
    E: 'static,
    R: 'static,
{
    Effect::new_step(move |env| {
        let mode = tracing_mode();
        if mode.is_disabled() || !should_sample(options.sample_rate) {
            return start_effect(effect, env);
        }
        let wrapped = if mode.is_bridge_only() {
            with_span_options_bridge_only(effect, options)
        } else {
            with_span_options_enabled(effect, options)
        };
        start_effect(wrapped, env)
    })
}
/// Span wrapper that defers building the `SpanOptions` until a span is needed.
///
/// `body` is invoked immediately to obtain the effect. `options` is invoked
/// only when the returned effect starts with tracing enabled and the span
/// sampled.
#[doc(hidden)]
pub fn __effectful_span_lazy<A, E, R, F, O>(
    body: F,
    options: O,
    sample_rate: Option<f64>,
) -> Effect<A, E, R>
where
    A: 'static,
    E: 'static,
    R: 'static,
    F: FnOnce() -> Effect<A, E, R> + 'static,
    O: FnOnce() -> SpanOptions + 'static,
{
    let effect = body();
    Effect::new_step(move |env| {
        let mode = tracing_mode();
        if mode.is_disabled() || !should_sample(sample_rate) {
            return start_effect(effect, env);
        }
        // Only now pay for constructing the options.
        let span_options = options();
        let wrapped = if mode.is_bridge_only() {
            with_span_options_bridge_only(effect, span_options)
        } else {
            with_span_options_enabled(effect, span_options)
        };
        start_effect(wrapped, env)
    })
}
/// Span wrapper where a single closure builds both the options and the effect.
///
/// When the returned effect starts, `make` is called with a flag saying whether
/// the span should be instrumented (tracing enabled and sampled). If `make`
/// returns no options the effect runs unwrapped; otherwise it is wrapped
/// according to the current mode.
#[doc(hidden)]
pub fn __effectful_span_lazy_scoped<A, E, R, F>(
    make: F,
    sample_rate: Option<f64>,
) -> Effect<A, E, R>
where
    A: 'static,
    E: 'static,
    R: 'static,
    F: FnOnce(bool) -> (Option<SpanOptions>, Effect<A, E, R>) + 'static,
{
    Effect::new_step(move |env| {
        let mode = tracing_mode();
        let instrument = !mode.is_disabled() && should_sample(sample_rate);
        let (maybe_options, effect) = make(instrument);
        let runnable = match maybe_options {
            None => effect,
            Some(span_options) if mode.is_bridge_only() => {
                with_span_options_bridge_only(effect, span_options)
            }
            Some(span_options) => with_span_options_enabled(effect, span_options),
        };
        start_effect(runnable, env)
    })
}
// Wraps `effect` in a `BridgeOnlySpanProgram`, which only enters a `tracing`
// span around it and keeps no in-memory record.
fn with_span_options_bridge_only<A, E, R>(
effect: Effect<A, E, R>,
options: SpanOptions,
) -> Effect<A, E, R>
where
A: 'static,
E: 'static,
R: 'static,
{
Effect::new_program(BridgeOnlySpanProgram {
source: effect,
options,
})
}
// Program node for bridge-only mode: the wrapped effect plus the span options.
struct BridgeOnlySpanProgram<A, E, R>
where
A: 'static,
E: 'static,
R: 'static,
{
// Effect to run inside the span.
source: Effect<A, E, R>,
// Name and level for the bridged span.
options: SpanOptions,
}
impl<A, E, R> ProgramOp<A, E, R> for BridgeOnlySpanProgram<A, E, R>
where
A: 'static,
E: 'static,
R: 'static,
{
// Starts the wrapped effect inside a bridged `tracing` span.
//
// The effect runs unwrapped when tracing is disabled, when the mode is no
// longer `BridgeOnly` (no span is produced), or when the span is disabled.
// Otherwise the first synchronous step runs inside the span and any
// asynchronous continuation is kept inside the same span.
fn start(self: Box<Self>, env: &mut R) -> SyncStep<A, E, R> {
let Self { source, options } = *self;
if tracing_mode().is_disabled() {
return start_effect(source, env);
}
let Some(span) = make_bridge_only_span(&options) else {
return start_effect(source, env);
};
if span.is_disabled() {
return start_effect(source, env);
}
match span.in_scope(|| start_effect(source, env)) {
SyncStep::Ready(output) => SyncStep::Ready(output),
// Borrowing async step: instrument the future built from the operation.
SyncStep::AsyncBorrow(f) => SyncStep::AsyncBorrow(Box::new(move |env| {
let span = span.clone();
box_future(async move {
use ::tracing::Instrument as _;
start_async_operation(f, env).instrument(span).await
})
})),
// Owned future: instrument it directly.
SyncStep::AsyncStatic(fut) => SyncStep::AsyncStatic(box_future(async move {
use ::tracing::Instrument as _;
fut.instrument(span).await
})),
// Manual poller: enter the span for the duration of each poll call.
SyncStep::AsyncPoll(mut poller) => SyncStep::AsyncPoll(Box::new(move |env, cx| {
let entered = span.enter();
match poller(env, cx) {
std::task::Poll::Ready(output) => {
drop(entered);
std::task::Poll::Ready(output)
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
})),
}
}
}
// Full span wrapper used in the snapshot modes.
//
// When run it:
// 1. falls back to running `effect` directly if tracing is disabled, the fiber
//    refs are missing, or no span context can be obtained;
// 2. derives the new context from the span at the top of the stack (if any);
// 3. creates the bridged span, records the span start and a `Start` event;
// 4. pushes a `LogSpan` onto the span stack, guarded so it is popped again;
// 5. runs `effect`, then records status, duration and final attributes
//    followed by a `Success` or `Failure` event;
// 6. pops the span stack and returns the effect's result unchanged.
fn with_span_options_enabled<A, E, R>(
effect: Effect<A, E, R>,
options: SpanOptions,
) -> Effect<A, E, R>
where
A: 'static,
E: 'static,
R: 'static,
{
Effect::new_async(move |env: &mut R| {
let options = options.clone();
box_future(async move {
if tracing_mode().is_disabled() {
return effect.run(env).await;
}
let Some(refs) = fiber_refs().cloned() else {
return effect.run(env).await;
};
// The current top of the stack, if any, becomes the parent.
let stack = run_blocking(refs.span_stack.get(), ()).expect("span_stack get");
let parent_context = stack.last().map(|span| span.context);
let Some(context) = next_span_context(parent_context) else {
return effect.run(env).await;
};
let started_at = tracing_now();
let parent_span_id = parent_context.map(|parent| parent.span_id);
let tracing_span = make_tracing_span(&options, context, parent_span_id);
record_span_start(&options, context, parent_span_id, started_at);
record_effect_event(EffectEvent::Start {
span: options.name.to_string(),
});
let span_name_for_push = options.name.clone();
let tracing_span_for_push = tracing_span.clone();
// Make this span the current one for everything the effect does.
run_blocking(
refs.span_stack.update(move |mut v| {
v.push(LogSpan {
name: span_name_for_push,
context,
tracing_span: tracing_span_for_push,
});
v
}),
(),
)
.expect("span_stack push");
// From here on the guard pops the entry, even if this future is dropped early.
let mut span_stack_guard = SpanStackPopGuard::new(refs.clone());
let initial_attributes = options.attributes.clone();
let refs_for_inner = refs.clone();
let span_name_inner = options.name.to_string();
let tracing_span_for_inner = tracing_span.clone();
// `inner` runs the effect and then flushes the span's final state.
let inner = Effect::new_async(move |env: &mut R| {
let span_name = span_name_inner.clone();
let refs = refs_for_inner.clone();
let tracing_span = tracing_span_for_inner.clone();
box_future(async move {
let out = effect.run(env).await;
// Tracing may have been switched off while the effect ran.
if tracing_enabled() {
let attributes = run_blocking(refs.span_annotations.get(), ())
.expect("span_annotations get for flush");
let status = match &out {
Ok(_) => SpanStatus::Ok,
Err(_) => SpanStatus::Error,
};
let ended_at = tracing_now();
if let Some(span) = &tracing_span {
let status_label = match status {
SpanStatus::Unset => "unset",
SpanStatus::Ok => "ok",
SpanStatus::Error => "error",
};
// Nanoseconds are clamped to `u64::MAX` if they do not fit.
let duration_ns =
u64::try_from(ended_at.duration_since(started_at).as_nanos()).unwrap_or(u64::MAX);
span.record("effectful_status", status_label);
span.record("effectful_duration_ns", duration_ns);
}
record_span_end(context.span_id, attributes, status, ended_at);
}
let event = match &out {
Ok(_) => EffectEvent::Success {
span: span_name.clone(),
},
Err(_) => EffectEvent::Failure {
span: span_name.clone(),
},
};
record_effect_event(event);
out
})
});
// NOTE(review): `locally` presumably scopes the annotation map to `inner`,
// seeded with the option attributes — confirm against `FiberRef::locally`.
let instrumented = refs
.span_annotations
.locally(initial_attributes, inner)
.run(env);
// `make_tracing_span` currently always returns `Some`, so the `else` arm is
// only a fallback.
let out = if let Some(span) = tracing_span {
use ::tracing::Instrument as _;
instrumented.instrument(span).await
} else {
instrumented.await
};
span_stack_guard.pop();
out
})
})
}
// Method-style versions of the free functions `with_span` and `with_span_options`.
impl<A, E, R> Effect<A, E, R>
where
A: 'static,
E: 'static,
R: 'static,
{
/// Wraps this effect in a span called `name`; see the free function `with_span`.
#[inline]
pub fn with_span(self, name: impl Into<Cow<'static, str>>) -> Effect<A, E, R> {
with_span(self, name)
}
/// Wraps this effect in a span described by `options`; see the free function
/// `with_span_options`.
#[inline]
pub fn with_span_options(self, options: SpanOptions) -> Effect<A, E, R> {
with_span_options(self, options)
}
}
pub fn snapshot_tracing() -> TracingSnapshot {
let guard = trace_state().lock().expect("trace state mutex poisoned");
TracingSnapshot {
effect_events: guard.effect_events.clone(),
fiber_events: guard.fiber_events.clone(),
spans: guard.spans.clone(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::collections::hash_map;
use crate::scheduling::TestClock;
use crate::{fail, runtime::run_blocking, succeed};
use rstest::rstest;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
fn test_lock() -> std::sync::MutexGuard<'static, ()> {
TEST_LOCK
.get_or_init(|| Mutex::new(()))
.lock()
.expect("test lock mutex poisoned")
}
mod with_span_events {
use super::*;
#[test]
fn with_span_when_effect_succeeds_records_start_and_success_events() {
let _guard = test_lock();
let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());
let eff = with_span(succeed::<u32, (), ()>(7), "test.span");
let out = run_blocking(eff, ());
assert_eq!(out, Ok(7));
let snapshot = snapshot_tracing();
let span = snapshot
.spans
.iter()
.find(|span| span.name == "test.span")
.expect("span record");
assert_eq!(span.status, SpanStatus::Ok);
assert_eq!(span.level, SpanLevel::Info);
assert!(span.ended_at.is_some());
assert!(span.duration().is_some());
assert_eq!(
snapshot.effect_events,
vec![
EffectEvent::Start {
span: "test.span".to_string()
},
EffectEvent::Success {
span: "test.span".to_string()
}
]
);
}
#[test]
fn with_span_when_effect_fails_records_start_and_failure_events() {
let _guard = test_lock();
let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());
let eff = with_span(fail::<(), &'static str, ()>("boom"), "failure.span");
let out = run_blocking(eff, ());
assert_eq!(out, Err("boom"));
let snapshot = snapshot_tracing();
let span = snapshot
.spans
.iter()
.find(|span| span.name == "failure.span")
.expect("span record");
assert_eq!(span.status, SpanStatus::Error);
assert_eq!(
snapshot.effect_events,
vec![
EffectEvent::Start {
span: "failure.span".to_string()
},
EffectEvent::Failure {
span: "failure.span".to_string()
}
]
);
}
/// Nesting `with_span` links the inner span to the outer one: same trace id,
/// and the inner span's parent is the outer span's id.
#[test]
fn nested_spans_record_parent_child_context() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let innermost = with_span(succeed::<(), (), ()>(()), "inner");
    let result = run_blocking(with_span(innermost, "outer"), ());
    assert_eq!(result, Ok(()));

    let snapshot = snapshot_tracing();
    let parent = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "outer")
        .expect("outer span");
    let child = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "inner")
        .expect("inner span");

    // The outer span is a root; the inner span hangs off it within one trace.
    assert_eq!(parent.parent_span_id, None);
    assert_eq!(child.parent_span_id, Some(parent.context.span_id));
    assert_eq!(child.context.trace_id, parent.context.trace_id);
}
/// The method-style `with_span_options` records a span carrying the level and
/// static attribute configured on the `SpanOptions`.
#[test]
fn method_with_span_records_span() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let options = SpanOptions::new("method.span")
        .with_level(SpanLevel::Debug)
        .with_attribute("static", "yes");
    let result = run_blocking(succeed::<u8, (), ()>(1).with_span_options(options), ());
    assert_eq!(result, Ok(1));

    let snapshot = snapshot_tracing();
    let recorded = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "method.span")
        .expect("method span");
    assert_eq!(recorded.level, SpanLevel::Debug);

    let expected_attribute = SpanAttributeValue::String(String::from("yes"));
    assert_eq!(recorded.attributes.get("static"), Some(&expected_attribute));
}
/// Span timestamps come from the clock installed with the tracing layer:
/// advancing the test clock inside the effect is reflected in the duration.
#[test]
fn installed_clock_controls_span_timing() {
    let _guard = test_lock();

    let origin = Instant::now();
    let installed_clock = TestClock::new(origin);
    // Keep a second handle so the effect body can move time forward.
    let clock_handle = installed_clock.clone();
    let _ = run_blocking(
        install_tracing_layer_with_clock(TracingConfig::enabled(), installed_clock),
        (),
    );

    let tick = Effect::new(move |_env: &mut ()| {
        clock_handle.advance(Duration::from_millis(5));
        Ok::<(), ()>(())
    });
    let result = run_blocking(with_span(tick, "clock.span"), ());
    assert_eq!(result, Ok(()));

    let snapshot = snapshot_tracing();
    let recorded = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "clock.span")
        .expect("clock span");
    assert_eq!(recorded.started_at, origin);
    assert_eq!(recorded.duration(), Some(Duration::from_millis(5)));
}
}
mod traceparent {
use super::*;
/// A `SpanContext` renders to the expected `traceparent` header and parsing
/// that header gives back the same context.
#[test]
fn span_context_renders_and_parses_traceparent() {
    let original = SpanContext {
        trace_id: TraceId::from_bytes([1; 16]),
        span_id: SpanId::from_bytes([2; 8]),
        trace_flags: TraceFlags::SAMPLED,
    };

    let rendered = original.to_traceparent();
    assert_eq!(
        rendered,
        "00-01010101010101010101010101010101-0202020202020202-01"
    );

    let reparsed = SpanContext::from_traceparent(&rendered);
    assert_eq!(reparsed, Ok(original));
}
/// A header whose trace id is all zeros is rejected with `AllZeroId`.
#[test]
fn traceparent_rejects_all_zero_ids() {
    let zero_trace_header = "00-00000000000000000000000000000000-0202020202020202-01";
    let failure = SpanContext::from_traceparent(zero_trace_header)
        .expect_err("all-zero trace id rejected");
    assert_eq!(failure, TraceParentParseError::AllZeroId);
}
}
mod tracing_bridge {
use super::*;
use std::sync::{Arc, Mutex};
use tracing::Subscriber;
use tracing::field::{Field, Visit};
use tracing::span::{Attributes, Id};
use tracing_subscriber::layer::{Context, SubscriberExt};
use tracing_subscriber::{Layer, Registry};
/// Test-only `tracing_subscriber` layer that collects the `otel_name` of each
/// new span and the `effectful_event` of each event it observes.
///
/// Clones share the same buffers through the `Arc`s, so a test can keep one
/// clone for assertions while the other is attached to the subscriber.
#[derive(Clone, Default)]
struct RecordingLayer {
    spans: Arc<Mutex<Vec<String>>>,
    events: Arc<Mutex<Vec<String>>>,
}

impl<S: Subscriber> Layer<S> for RecordingLayer {
    /// Stores the span's `otel_name` field, if it has one.
    fn on_new_span(&self, attrs: &Attributes<'_>, _id: &Id, _ctx: Context<'_, S>) {
        let mut fields = BridgeVisitor::default();
        attrs.record(&mut fields);
        let Some(span_name) = fields.otel_name else {
            return;
        };
        self.spans
            .lock()
            .expect("spans mutex poisoned")
            .push(span_name);
    }

    /// Stores the event's `effectful_event` field, if it has one.
    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
        let mut fields = BridgeVisitor::default();
        event.record(&mut fields);
        let Some(event_name) = fields.effectful_event else {
            return;
        };
        self.events
            .lock()
            .expect("events mutex poisoned")
            .push(event_name);
    }
}
/// Field visitor that picks the `otel_name` and `effectful_event` fields out
/// of a span or event; all other fields are ignored.
#[derive(Default)]
struct BridgeVisitor {
    otel_name: Option<String>,
    effectful_event: Option<String>,
}

impl Visit for BridgeVisitor {
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        let rendered = format!("{value:?}");
        let slot = match field.name() {
            "otel_name" => &mut self.otel_name,
            "effectful_event" => &mut self.effectful_event,
            _ => return,
        };
        // Debug-formatting a string wraps it in double quotes; strip them so
        // the stored name is the bare text.
        *slot = Some(rendered.trim_matches('"').to_string());
    }
}
/// With the tracing bridge enabled, a span and a span event emitted through
/// the effect API reach an installed `tracing` subscriber.
#[test]
fn bridge_emits_rust_tracing_spans_and_events() {
    let _guard = test_lock();

    let recorder = RecordingLayer::default();
    let subscriber = Registry::default().with(recorder.clone());
    // The subscriber is only the default for the duration of the closure.
    tracing::subscriber::with_default(subscriber, || {
        let _ = run_blocking(
            install_tracing_layer(TracingConfig::enabled_with_tracing_bridge()),
            (),
        );
        let emit_event = emit_current_span_event::<()>("domain.loaded");
        let result = run_blocking(with_span(emit_event, "bridge.span"), ());
        assert_eq!(result, Ok(()));
    });

    let seen_spans = recorder
        .spans
        .lock()
        .expect("spans mutex poisoned")
        .clone();
    assert_eq!(seen_spans, vec!["bridge.span".to_string()]);

    let seen_events = recorder
        .events
        .lock()
        .expect("events mutex poisoned")
        .clone();
    assert_eq!(seen_events, vec!["domain.loaded".to_string()]);
}
/// In bridge-only mode the span reaches the `tracing` subscriber but nothing
/// is kept in the in-memory snapshot.
#[test]
fn bridge_only_emits_without_snapshotting() {
    let _guard = test_lock();

    let recorder = RecordingLayer::default();
    let subscriber = Registry::default().with(recorder.clone());
    tracing::subscriber::with_default(subscriber, || {
        let _ = run_blocking(
            install_tracing_layer(TracingConfig::tracing_bridge_only()),
            (),
        );
        let traced = with_span(succeed::<u8, (), ()>(1), "bridge.only");
        let result = run_blocking(traced, ());
        assert_eq!(result, Ok(1));
    });

    let seen_spans = recorder
        .spans
        .lock()
        .expect("spans mutex poisoned")
        .clone();
    assert_eq!(seen_spans, vec!["bridge.only".to_string()]);

    let snapshot = snapshot_tracing();
    assert!(snapshot.spans.is_empty());
    assert!(snapshot.effect_events.is_empty());
}
}
mod hooks_and_config {
use super::*;
/// With tracing enabled, annotating the current span stores the attribute on
/// that span and an emitted fiber event shows up in the snapshot.
#[test]
fn annotation_and_fiber_event_hooks_when_enabled_record_data() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let annotate = annotate_current_span::<(), Never, ()>("market", "SOL-PERP");
    let annotate_then_spawn = annotate.flat_map(|_| {
        emit_fiber_event(FiberEvent::Spawn {
            fiber_id: "fiber-1".to_string(),
        })
    });
    let _ = run_blocking(with_span(annotate_then_spawn, "annotated.span"), ());

    let snapshot = snapshot_tracing();
    assert_eq!(snapshot.fiber_events.len(), 1);

    let recorded = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "annotated.span")
        .expect("span should be present");
    let expected_market = SpanAttributeValue::String(String::from("SOL-PERP"));
    assert_eq!(recorded.attributes.get("market"), Some(&expected_market));
}
/// Non-string attribute values keep their type: an `i32` is stored as `I64`
/// and a `bool` as `Bool`.
#[test]
fn typed_attributes_are_recorded() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let set_retry = annotate_current_span_attribute::<(), Never, ()>("retry", 2_i32);
    let set_both = set_retry
        .flat_map(|_| annotate_current_span_attribute::<(), Never, ()>("cached", true));
    let _ = run_blocking(with_span(set_both, "typed.span"), ());

    let snapshot = snapshot_tracing();
    let recorded = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "typed.span")
        .expect("span should be present");

    let expected_retry = SpanAttributeValue::I64(2);
    let expected_cached = SpanAttributeValue::Bool(true);
    assert_eq!(recorded.attributes.get("retry"), Some(&expected_retry));
    assert_eq!(recorded.attributes.get("cached"), Some(&expected_cached));
}
/// An event emitted with attributes inside a span is attached to that span,
/// name and attributes included.
#[test]
fn current_span_events_are_recorded() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let no_attributes = hash_map::empty();
    let event_attributes = hash_map::set(
        &no_attributes,
        "rows".to_string(),
        SpanAttributeValue::I64(3),
    );
    let emit = emit_current_span_event_with_attributes::<()>("loaded", event_attributes);
    let _ = run_blocking(with_span(emit, "event.span"), ());

    let snapshot = snapshot_tracing();
    let recorded = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "event.span")
        .expect("span should be present");
    assert_eq!(recorded.events.len(), 1);

    let only_event = &recorded.events[0];
    assert_eq!(only_event.name, "loaded");
    assert_eq!(
        only_event.attributes.get("rows"),
        Some(&SpanAttributeValue::I64(3))
    );
}
/// With the default (disabled) config, neither emit hook records anything.
///
/// `mode` selects the hook under test: `0` emits an effect event, any other
/// value emits a fiber event.
#[rstest]
#[case::effect_event(0)]
#[case::fiber_event(1)]
fn emit_hooks_when_tracing_disabled_do_not_record_events(#[case] mode: u8) {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::default()), ());

    match mode {
        0 => {
            let effect_event = EffectEvent::Start {
                span: "disabled.span".to_string(),
            };
            let _ = run_blocking(emit_effect_event(effect_event), ());
        }
        _ => {
            let fiber_event = FiberEvent::Spawn {
                fiber_id: "fiber-disabled".to_string(),
            };
            let _ = run_blocking(emit_fiber_event(fiber_event), ());
        }
    }

    let snapshot = snapshot_tracing();
    assert!(snapshot.effect_events.is_empty());
    assert!(snapshot.fiber_events.is_empty());
    assert!(snapshot.spans.is_empty());
}
/// Annotating outside of any span leaves the snapshot completely empty.
#[test]
fn annotate_current_span_when_no_active_span_is_present_is_noop() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let orphan_annotation = annotate_current_span::<(), Never, ()>("k", "v");
    let _ = run_blocking(orphan_annotation, ());

    let after = snapshot_tracing();
    assert!(after.spans.is_empty());
    assert!(after.effect_events.is_empty());
    assert!(after.fiber_events.is_empty());
}
/// `TracingConfig::enabled()` turns on both tracing and in-memory recording.
#[test]
fn tracing_config_enabled_constructor_sets_enabled_true() {
    let TracingConfig {
        enabled,
        record_in_memory,
        ..
    } = TracingConfig::enabled();
    assert!(enabled);
    assert!(record_in_memory);
}
/// A span built while tracing is disabled is still recorded if tracing has
/// been enabled by the time the effect runs.
#[test]
fn span_decides_enabled_state_when_run_not_when_constructed() {
    let _guard = test_lock();

    // Build the effect under a disabled layer...
    let _ = run_blocking(install_tracing_layer(TracingConfig::default()), ());
    let built_while_disabled = with_span(succeed::<u8, (), ()>(1), "late.enabled");

    // ...then enable tracing before running it.
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());
    let result = run_blocking(built_while_disabled, ());
    assert_eq!(result, Ok(1));

    let snapshot = snapshot_tracing();
    let was_recorded = snapshot
        .spans
        .iter()
        .any(|candidate| candidate.name == "late.enabled");
    assert!(was_recorded);
}
/// A span built while tracing is enabled is not recorded if tracing has been
/// disabled by the time the effect runs.
#[test]
fn span_decides_disabled_state_when_run_not_when_constructed() {
    let _guard = test_lock();

    // Build the effect under an enabled layer...
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());
    let built_while_enabled = with_span(succeed::<u8, (), ()>(1), "late.disabled");

    // ...then disable tracing before running it.
    let _ = run_blocking(install_tracing_layer(TracingConfig::default()), ());
    let result = run_blocking(built_while_enabled, ());
    assert_eq!(result, Ok(1));

    let after = snapshot_tracing();
    assert!(after.spans.is_empty());
}
/// With in-memory recording on but a sample rate of `0.0`, no span is kept.
#[test]
fn zero_sample_rate_records_no_spans() {
    let _guard = test_lock();

    // Same as `TracingConfig::enabled()` except for the sample rate.
    let never_sample = TracingConfig {
        default_sample_rate: 0.0,
        ..TracingConfig::enabled()
    };
    let _ = run_blocking(install_tracing_layer(never_sample), ());

    let traced = with_span(succeed::<u8, (), ()>(1), "never.sampled");
    let result = run_blocking(traced, ());
    assert_eq!(result, Ok(1));

    let after = snapshot_tracing();
    assert!(after.spans.is_empty());
}
/// Pins `sampling_mask` for a handful of rates, including a rate (`0.3`) that
/// maps to the same mask as `0.25`, and `0.0`, which maps to `SAMPLING_NEVER`.
#[test]
fn sampling_mask_rounds_down_to_power_of_two_cadence() {
    let expectations = [
        (1.0, 0),
        (0.5, 1),
        (0.3, 3),
        (0.25, 3),
        (0.0, SAMPLING_NEVER),
    ];
    for (rate, expected_mask) in expectations {
        assert_eq!(sampling_mask(rate), expected_mask);
    }
}
/// Editing a span's attributes in a cloned snapshot must not change the
/// snapshot it was cloned from.
#[test]
fn tracing_snapshot_attributes_preserved_across_clone() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let annotate = annotate_current_span::<(), Never, ()>("market", "SOL-PERP");
    let _ = run_blocking(with_span(annotate, "clone.span"), ());

    let original = snapshot_tracing();
    let mut edited = original.clone();

    // Overwrite the attribute in the clone only.
    {
        let target = edited
            .spans
            .iter_mut()
            .find(|candidate| candidate.name == "clone.span")
            .expect("span recorded");
        target.attributes = hash_map::set(
            &target.attributes,
            "market".to_string(),
            SpanAttributeValue::String("edited".to_string()),
        );
    }

    let untouched = original
        .spans
        .iter()
        .find(|candidate| candidate.name == "clone.span")
        .expect("span in original snapshot");
    assert_eq!(
        untouched.attributes.get("market"),
        Some(&SpanAttributeValue::String("SOL-PERP".to_string()))
    );

    let modified = edited
        .spans
        .iter()
        .find(|candidate| candidate.name == "clone.span")
        .expect("span in clone");
    assert_eq!(
        modified.attributes.get("market"),
        Some(&SpanAttributeValue::String("edited".to_string()))
    );
}
}
mod fiber_local_tracing {
use super::*;
use crate::concurrency::fiber_ref::with_fiber_id;
use crate::runtime::FiberId;
/// Two spans run under different fiber ids each keep their own value for the
/// same attribute key.
#[test]
fn annotation_isolated_per_fiber() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let first_effect = with_span(
        annotate_current_span::<(), Never, ()>("k", "fiber-a"),
        "span.a",
    );
    let second_effect = with_span(
        annotate_current_span::<(), Never, ()>("k", "fiber-b"),
        "span.b",
    );

    // Run each effect under its own freshly allocated fiber id.
    with_fiber_id(FiberId::fresh(), || {
        let _ = run_blocking(first_effect, ());
    });
    with_fiber_id(FiberId::fresh(), || {
        let _ = run_blocking(second_effect, ());
    });

    let snapshot = snapshot_tracing();
    let span_a = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "span.a")
        .expect("span.a");
    let span_b = snapshot
        .spans
        .iter()
        .find(|candidate| candidate.name == "span.b")
        .expect("span.b");

    let expected_a = SpanAttributeValue::String(String::from("fiber-a"));
    let expected_b = SpanAttributeValue::String(String::from("fiber-b"));
    assert_eq!(span_a.attributes.get("k"), Some(&expected_a));
    assert_eq!(span_b.attributes.get("k"), Some(&expected_b));
}
/// A span pushed onto the span stack under one fiber id is invisible under a
/// different fiber id and still present under the first.
#[test]
fn span_stack_not_shared_between_fibers() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let refs = fiber_refs().expect("refs").clone();
    let id_a = FiberId::fresh();
    let id_b = FiberId::fresh();

    // Push a single span while running as fiber A.
    with_fiber_id(id_a, || {
        let push_only_a = refs.span_stack.update(|mut stack| {
            stack.push(LogSpan {
                name: "only-a".into(),
                context: SpanContext {
                    trace_id: TraceId::from_bytes([0; 16]),
                    span_id: SpanId::from_bytes([1; 8]),
                    trace_flags: TraceFlags::DEFAULT,
                },
                tracing_span: None,
            });
            stack
        });
        run_blocking(push_only_a, ()).expect("push stack");
    });

    // Fiber B sees an empty stack.
    with_fiber_id(id_b, || {
        let stack_b = run_blocking(refs.span_stack.get(), ()).expect("get stack");
        assert_eq!(stack_b.len(), 0, "B should not see A's stack");
    });

    // Fiber A still sees its own entry.
    with_fiber_id(id_a, || {
        let stack_a = run_blocking(refs.span_stack.get(), ()).expect("get stack a");
        assert_eq!(stack_a.len(), 1);
    });
}
/// After nested `with_span` effects finish, the span stack is empty again.
#[test]
fn with_span_pushes_then_pops() {
    let _guard = test_lock();
    let _ = run_blocking(install_tracing_layer(TracingConfig::enabled()), ());

    let refs = fiber_refs().expect("refs").clone();
    let innermost = with_span(succeed::<(), (), ()>(()), "inner");
    let _ = run_blocking(with_span(innermost, "outer"), ());

    let remaining = run_blocking(refs.span_stack.get(), ()).expect("stack len");
    assert_eq!(remaining.len(), 0);
}
}
}