use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use protosocket::TcpSocketListener;
use protosocket_messagepack::{MessagePackDecoder, MessagePackSerializer};
use protosocket_rpc::Message;
use protosocket_rpc::server::{ConnectionService, RpcResponder, SocketRpcServer, SocketService};
use tokio::sync::watch;
use tracing::metadata::LevelFilter;
use tracing_cache::{ChanceHandle, EnabledPredicate, LevelHandle, SpanCache, SpanRecord};
use crate::protocol::{Request, RequestBody, Response, WireLevel, WireLevelFilter};
use crate::wire::{TimeBase, span_to_wire};
type ServerCodec = (MessagePackSerializer<Response>, MessagePackDecoder<Request>);
const STREAM_SUBSCRIBER_CAPACITY: u64 = 65_536;
#[derive(Debug, Default)]
struct StreamState {
streaming: bool,
min_level: Option<WireLevel>,
sampling_rate: f64,
}
impl StreamState {
fn new() -> Self {
Self {
streaming: false,
min_level: None,
sampling_rate: 1.0,
}
}
}
#[derive(Clone)]
pub(crate) struct CacheLevelBroadcast {
level_handle: LevelHandle,
level_tx: watch::Sender<WireLevelFilter>,
chance_handle: ChanceHandle,
chance_tx: watch::Sender<f64>,
active_streams: Arc<AtomicUsize>,
}
impl CacheLevelBroadcast {
pub fn new(level_handle: LevelHandle, chance_handle: ChanceHandle) -> Self {
let initial_level = WireLevelFilter::from_tracing(level_handle.get());
let initial_chance = chance_handle.get();
let (level_tx, _) = watch::channel(initial_level);
let (chance_tx, _) = watch::channel(initial_chance);
Self {
level_handle,
level_tx,
chance_handle,
chance_tx,
active_streams: Arc::new(AtomicUsize::new(0)),
}
}
fn set_level(&self, filter: WireLevelFilter) {
self.level_handle.set(filter.to_tracing());
let _ = self.level_tx.send(filter);
}
fn set_chance(&self, pct: f64) {
let pct = if pct.is_nan() {
0.0
} else {
pct.clamp(0.0, 100.0)
};
self.chance_handle.set(pct);
let _ = self.chance_tx.send(pct);
}
fn subscribe_level(&self) -> watch::Receiver<WireLevelFilter> {
self.level_tx.subscribe()
}
fn subscribe_chance(&self) -> watch::Receiver<f64> {
self.chance_tx.subscribe()
}
fn enter_stream(&self) -> StreamGuard {
self.active_streams.fetch_add(1, Ordering::SeqCst);
StreamGuard {
broadcast: self.clone(),
}
}
}
pub(crate) struct StreamGuard {
broadcast: CacheLevelBroadcast,
}
impl Drop for StreamGuard {
fn drop(&mut self) {
let prev = self.broadcast.active_streams.fetch_sub(1, Ordering::SeqCst);
if prev == 1 {
self.broadcast.level_handle.set(LevelFilter::OFF);
let _ = self.broadcast.level_tx.send(WireLevelFilter::Off);
self.broadcast.chance_handle.set(100.0);
let _ = self.broadcast.chance_tx.send(100.0);
}
}
}
pub(crate) struct ConnectionState<P: EnabledPredicate> {
cache: Arc<SpanCache<P>>,
base: TimeBase,
state: Arc<RwLock<StreamState>>,
level_bus: CacheLevelBroadcast,
stream_guard: Option<StreamGuard>,
}
impl<P: EnabledPredicate> ConnectionState<P> {
fn new(cache: Arc<SpanCache<P>>, base: TimeBase, level_bus: CacheLevelBroadcast) -> Self {
Self {
cache,
base,
state: Arc::new(RwLock::new(StreamState::new())),
level_bus,
stream_guard: None,
}
}
}
impl<P: EnabledPredicate> ConnectionService for ConnectionState<P> {
type Request = Request;
type Response = Response;
#[allow(clippy::expect_used, reason = "poisoned lock")]
fn new_rpc(&mut self, msg: Request, responder: RpcResponder<'_, Response>) {
let request_id = msg.message_id();
match msg.body {
RequestBody::StartStream => {
self.state
.write()
.expect("lock must not be poisoned")
.streaming = true;
if self.stream_guard.is_none() {
self.stream_guard = Some(self.level_bus.enter_stream());
}
let cache = Arc::clone(&self.cache);
let state = Arc::clone(&self.state);
let base = self.base;
let level_rx = self.level_bus.subscribe_level();
let chance_rx = self.level_bus.subscribe_chance();
tokio::spawn(responder.stream(span_stream(
cache, state, base, level_rx, chance_rx, request_id,
)));
}
RequestBody::StopStream => {
self.state
.write()
.expect("lock must not be poisoned")
.streaming = false;
responder.immediate(Response::ack().with_id(request_id));
}
RequestBody::SetLevel(level) => {
self.state
.write()
.expect("lock must not be poisoned")
.min_level = Some(level);
responder.immediate(Response::ack().with_id(request_id));
}
RequestBody::SetCacheLevel(filter) => {
self.level_bus.set_level(filter);
responder.immediate(Response::ack().with_id(request_id));
}
RequestBody::SetCacheChance(pct) => {
self.level_bus.set_chance(pct);
responder.immediate(Response::ack().with_id(request_id));
}
RequestBody::SetSamplingRate(rate) => {
if !(0.0..=1.0).contains(&rate) || rate.is_nan() {
responder.immediate(
Response::error(format!("sampling rate {rate} out of range [0.0, 1.0]"))
.with_id(request_id),
);
return;
}
self.state
.write()
.expect("lock must not be poisoned")
.sampling_rate = rate;
responder.immediate(Response::ack().with_id(request_id));
}
RequestBody::Noop => {}
}
}
}
fn span_stream<P: EnabledPredicate>(
cache: Arc<SpanCache<P>>,
state: Arc<RwLock<StreamState>>,
base: TimeBase,
mut level_rx: watch::Receiver<WireLevelFilter>,
mut chance_rx: watch::Receiver<f64>,
request_id: u64,
) -> impl futures_core::Stream<Item = Response> {
async_stream::stream! {
yield Response::server_info(env!("CARGO_PKG_VERSION")).with_id(request_id);
let initial_level = *level_rx.borrow_and_update();
yield Response::cache_level(initial_level).with_id(request_id);
let initial_chance = *chance_rx.borrow_and_update();
yield Response::cache_chance(initial_chance).with_id(request_id);
let mut span_rx = cache.subscribe(STREAM_SUBSCRIBER_CAPACITY);
loop {
tokio::select! {
changed = level_rx.changed() => {
if changed.is_err() { break; }
let lvl = *level_rx.borrow_and_update();
yield Response::cache_level(lvl).with_id(request_id);
}
changed = chance_rx.changed() => {
if changed.is_err() { break; }
let pct = *chance_rx.borrow_and_update();
yield Response::cache_chance(pct).with_id(request_id);
}
batch = span_rx.next_batch() => {
let Some(batch) = batch else { break };
let (streaming, min_level, sampling_rate) = {
#[allow(clippy::expect_used, reason = "poisoned lock")]
let s = state.read().expect("lock must not be poisoned");
(s.streaming, s.min_level, s.sampling_rate)
};
if !streaming {
drop(batch);
continue;
}
for record in batch {
if let Some(min) = min_level
&& !level_at_least(record.metadata.level(), min)
{
continue;
}
if !sampling_passes(&record, sampling_rate) {
continue;
}
yield Response::span(span_to_wire(&record, base)).with_id(request_id);
}
}
}
}
}
}
fn level_at_least(record_level: &tracing::Level, floor: WireLevel) -> bool {
record_level <= &floor.to_tracing()
}
fn sampling_passes(record: &SpanRecord, rate: f64) -> bool {
if rate >= 1.0 {
return true;
}
if rate <= 0.0 {
return false;
}
let bucket_id = record.parent_id.unwrap_or(record.id);
let mut x = bucket_id.wrapping_mul(0x9E37_79B9_7F4A_7C15);
x ^= x >> 33;
x = x.wrapping_mul(0xC2B2_AE3D_27D4_EB4F);
x ^= x >> 29;
let frac = (x as f64) / (u64::MAX as f64);
frac < rate
}
struct Service<P: EnabledPredicate> {
cache: Arc<SpanCache<P>>,
base: TimeBase,
level_bus: CacheLevelBroadcast,
}
impl<P: EnabledPredicate> SocketService for Service<P> {
type Codec = ServerCodec;
type ConnectionService = ConnectionState<P>;
type SocketListener = TcpSocketListener;
fn codec(&self) -> Self::Codec {
(
MessagePackSerializer::default(),
MessagePackDecoder::default(),
)
}
fn new_stream_service(
&self,
_stream: &<Self::SocketListener as protosocket::SocketListener>::Stream,
) -> Self::ConnectionService {
ConnectionState::new(Arc::clone(&self.cache), self.base, self.level_bus.clone())
}
}
#[derive(Debug)]
pub enum ServeError {
Io(std::io::Error),
Rpc(protosocket_rpc::Error),
}
impl std::fmt::Display for ServeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ServeError::Io(e) => write!(f, "io: {e}"),
ServeError::Rpc(e) => write!(f, "rpc: {e}"),
}
}
}
impl std::error::Error for ServeError {}
impl From<std::io::Error> for ServeError {
fn from(e: std::io::Error) -> Self {
ServeError::Io(e)
}
}
impl From<protosocket_rpc::Error> for ServeError {
fn from(e: protosocket_rpc::Error) -> Self {
ServeError::Rpc(e)
}
}
pub async fn serve<P: EnabledPredicate>(
cache: Arc<SpanCache<P>>,
level_handle: LevelHandle,
chance_handle: ChanceHandle,
addr: SocketAddr,
) -> Result<(), ServeError> {
let listener = TcpSocketListener::listen(addr, 1024, None)?;
let service = Service {
cache,
base: TimeBase::now(),
level_bus: CacheLevelBroadcast::new(level_handle, chance_handle),
};
let server: SocketRpcServer<Service<P>, _> = SocketRpcServer::new(
listener,
service,
16 * 1024 * 1024,
64 * 1024,
4096,
)?;
server.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::TcpListener as StdTcpListener;
use std::time::Duration;
use futures::StreamExt;
use protosocket_messagepack::{MessagePackDecoder, MessagePackSerializer};
use protosocket_rpc::client::{self, Configuration, RpcClient, TcpStreamConnector};
use tracing_cache::{ChancePredicate, SpanCache};
use crate::protocol::{ResponseBody, WireLevel};
type ClientCodec = (MessagePackSerializer<Request>, MessagePackDecoder<Response>);
fn pick_addr() -> SocketAddr {
let listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
format!("127.0.0.1:{port}").parse().unwrap()
}
fn prepare_cache() -> (
Arc<SpanCache<ChancePredicate<tracing_cache::LevelPredicate>>>,
LevelHandle,
ChanceHandle,
) {
let level =
tracing_cache::LevelPredicate::with_filter(tracing::metadata::LevelFilter::TRACE);
let level_handle = level.handle();
let predicate = ChancePredicate::new(level, 100.0);
let chance_handle = predicate.handle();
let (cache, driver) = SpanCache::with_predicate(1024, predicate);
let cache = Arc::new(cache);
tokio::spawn(driver.run());
(cache, level_handle, chance_handle)
}
fn emit_under<P: EnabledPredicate>(cache: &Arc<SpanCache<P>>, f: impl FnOnce()) {
tracing::subscriber::with_default(Arc::clone(cache), f);
cache.flush_pending();
}
async fn wait_for_initial(
stream: &mut (impl futures::Stream<Item = Result<Response, protosocket_rpc::Error>> + Unpin),
) {
let mut got_server_info = false;
let mut got_level = false;
let mut got_chance = false;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
while !(got_server_info && got_level && got_chance)
&& tokio::time::Instant::now() < deadline
{
match tokio::time::timeout(Duration::from_millis(200), stream.next()).await {
Ok(Some(Ok(resp))) => match resp.body {
ResponseBody::ServerInfo(_) => got_server_info = true,
ResponseBody::CacheLevel(_) => got_level = true,
ResponseBody::CacheChance(_) => got_chance = true,
_ => {}
},
_ => break,
}
}
assert!(
got_server_info && got_level && got_chance,
"stream did not yield initial ServerInfo/CacheLevel/CacheChance",
);
}
async fn spawn_server<P: EnabledPredicate>(
cache: Arc<SpanCache<P>>,
level_handle: LevelHandle,
chance_handle: ChanceHandle,
) -> (SocketAddr, tokio::task::JoinHandle<()>) {
let addr = pick_addr();
let server_cache = Arc::clone(&cache);
let serve_level = level_handle.clone();
let serve_chance = chance_handle.clone();
let handle = tokio::spawn(async move {
let _ = serve(server_cache, serve_level, serve_chance, addr).await;
});
for _ in 0..50 {
if std::net::TcpStream::connect(addr).is_ok() {
return (addr, handle);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
panic!("server never came up on {addr}");
}
async fn connect_client(addr: SocketAddr) -> RpcClient<Request, Response> {
let cfg = Configuration::new(TcpStreamConnector);
let (rpc_client, conn) = client::connect::<ClientCodec, _>(addr, &cfg).await.unwrap();
tokio::spawn(conn);
rpc_client
}
async fn collect_spans(
stream: &mut (impl futures::Stream<Item = Result<Response, protosocket_rpc::Error>> + Unpin),
n: usize,
total_timeout: Duration,
) -> Vec<crate::WireSpan> {
let mut out = Vec::with_capacity(n);
let deadline = tokio::time::Instant::now() + total_timeout;
while out.len() < n {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
match tokio::time::timeout(remaining, stream.next()).await {
Ok(Some(Ok(resp))) => {
if let ResponseBody::Span(s) = resp.body {
out.push(s);
}
}
Ok(Some(Err(_))) | Ok(None) => break,
Err(_) => break,
}
}
out
}
#[tokio::test]
async fn start_stream_delivers_closed_spans() {
let (cache, level_handle, chance_handle) = prepare_cache();
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
let client = connect_client(addr).await;
let mut stream = client
.send_streaming(Request::new(RequestBody::StartStream))
.unwrap();
wait_for_initial(&mut stream).await;
emit_under(&cache, || {
for _ in 0..3 {
let span = tracing::span!(parent: None, tracing::Level::INFO, "test_a");
let _g = span.enter();
}
});
let received = collect_spans(&mut stream, 3, Duration::from_secs(2)).await;
assert_eq!(received.len(), 3);
assert!(received.iter().all(|s| s.name == "test_a"));
assert!(received.iter().all(|s| s.closed_at_ns.is_some()));
server.abort();
}
#[tokio::test]
async fn stop_stream_halts_delivery() {
let (cache, level_handle, chance_handle) = prepare_cache();
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
let client = connect_client(addr).await;
let mut stream = client
.send_streaming(Request::new(RequestBody::StartStream))
.unwrap();
wait_for_initial(&mut stream).await;
emit_under(&cache, || {
let _g = tracing::span!(parent: None, tracing::Level::INFO, "test_b").entered();
});
let initial = collect_spans(&mut stream, 1, Duration::from_secs(2)).await;
assert_eq!(initial.len(), 1);
let ack = client
.send_unary(Request::new(RequestBody::StopStream))
.unwrap()
.await
.unwrap();
assert!(matches!(ack.body, ResponseBody::Ack));
tokio::time::sleep(Duration::from_millis(50)).await;
emit_under(&cache, || {
for _ in 0..5 {
let _g = tracing::span!(parent: None, tracing::Level::INFO, "test_b").entered();
}
});
let drained_after_stop = collect_spans(&mut stream, 5, Duration::from_millis(300)).await;
assert!(
drained_after_stop.len() < 5,
"stream did not stop: got {} more spans after StopStream",
drained_after_stop.len(),
);
server.abort();
}
#[tokio::test]
async fn set_level_filters_below_threshold() {
let (cache, level_handle, chance_handle) = prepare_cache();
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
let client = connect_client(addr).await;
let ack = client
.send_unary(Request::new(RequestBody::SetLevel(WireLevel::Info)))
.unwrap()
.await
.unwrap();
assert!(matches!(ack.body, ResponseBody::Ack));
let mut stream = client
.send_streaming(Request::new(RequestBody::StartStream))
.unwrap();
wait_for_initial(&mut stream).await;
emit_under(&cache, || {
drop(tracing::span!(parent: None, tracing::Level::INFO, "info_span"));
drop(tracing::span!(parent: None, tracing::Level::DEBUG, "debug_span"));
});
let received = collect_spans(&mut stream, 2, Duration::from_millis(500)).await;
let names: Vec<_> = received.iter().map(|s| s.name.as_str()).collect();
assert_eq!(names, vec!["info_span"], "got: {names:?}");
server.abort();
}
#[tokio::test]
async fn set_sampling_rate_zero_drops_all() {
let (cache, level_handle, chance_handle) = prepare_cache();
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
let client = connect_client(addr).await;
client
.send_unary(Request::new(RequestBody::SetSamplingRate(0.0)))
.unwrap()
.await
.unwrap();
let mut stream = client
.send_streaming(Request::new(RequestBody::StartStream))
.unwrap();
wait_for_initial(&mut stream).await;
emit_under(&cache, || {
for _ in 0..5 {
let _g = tracing::span!(parent: None, tracing::Level::INFO, "sampled").entered();
}
});
let received = collect_spans(&mut stream, 5, Duration::from_millis(400)).await;
assert!(
received.is_empty(),
"rate=0 should drop everything; got {received:?}",
);
server.abort();
}
#[tokio::test]
async fn set_cache_level_keeps_stream_open() {
let (cache, level_handle, chance_handle) = prepare_cache();
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
let client = connect_client(addr).await;
let mut start = Request::new(RequestBody::StartStream);
start.id = 100;
let mut stream = client.send_streaming(start).unwrap();
let first = tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
let server_info = match first.body {
ResponseBody::ServerInfo(info) => info,
other => panic!("first message should be ServerInfo, got {other:?}"),
};
assert_eq!(
server_info.version,
env!("CARGO_PKG_VERSION"),
"server should advertise its own CARGO_PKG_VERSION",
);
let mut drained_level = false;
let mut drained_chance = false;
while !(drained_level && drained_chance) {
let item = tokio::time::timeout(Duration::from_millis(500), stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
match item.body {
ResponseBody::CacheLevel(_) => drained_level = true,
ResponseBody::CacheChance(_) => drained_chance = true,
other => panic!("unexpected message during initial drain: {other:?}"),
}
}
let mut set = Request::new(RequestBody::SetCacheLevel(WireLevelFilter::Off));
set.id = 101;
let ack = client.send_unary(set).unwrap().await.unwrap();
assert!(matches!(ack.body, ResponseBody::Ack));
let mut next_level: Option<WireLevelFilter> = None;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
while tokio::time::Instant::now() < deadline && next_level.is_none() {
let item = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
let Ok(Some(Ok(resp))) = item else { continue };
match resp.body {
ResponseBody::CacheLevel(l) => next_level = Some(l),
ResponseBody::CacheChance(_) => continue,
ResponseBody::ServerInfo(_) => continue,
ResponseBody::Span(_) => continue,
other => panic!("unexpected stream item: {other:?}"),
}
}
assert_eq!(
next_level,
Some(WireLevelFilter::Off),
"stream did not yield the updated CacheLevel (probably ended)",
);
server.abort();
}
#[tokio::test]
async fn level_resets_to_off_when_last_console_disconnects() {
let (cache, level_handle, chance_handle) = prepare_cache();
level_handle.set(LevelFilter::INFO);
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
{
let client = connect_client(addr).await;
let mut start = Request::new(RequestBody::StartStream);
start.id = 200;
let _stream = client.send_streaming(start).unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
}
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(
level_handle.get(),
LevelFilter::OFF,
"level should have reset to OFF after last console disconnected",
);
server.abort();
}
use std::time::Instant;
use tracing::callsite::{Callsite, DefaultCallsite, Identifier};
use tracing::field::FieldSet;
use tracing::metadata::Kind;
use tracing_cache::{FieldList, SpanRecord};
static SAMPLING_CALLSITE: DefaultCallsite = {
static META: tracing::Metadata<'static> = tracing::Metadata::new(
"sampling_test",
"sampling::test",
tracing::Level::INFO,
None,
None,
None,
FieldSet::new(&[], Identifier(&SAMPLING_CALLSITE)),
Kind::SPAN,
);
DefaultCallsite::new(&META)
};
fn synth_span(id: u64, parent_id: Option<u64>) -> SpanRecord {
SpanRecord {
id,
parent_id,
metadata: SAMPLING_CALLSITE.metadata(),
fields: FieldList::default(),
events: Vec::new(),
opened_at: Instant::now(),
closed_at: Some(Instant::now()),
}
}
#[test]
fn sampling_passes_rate_one_short_circuits_true() {
for id in [0u64, 1, 17, u64::MAX, 0x9E37_79B9_7F4A_7C15] {
assert!(sampling_passes(&synth_span(id, None), 1.0));
}
}
#[test]
fn sampling_passes_rate_zero_short_circuits_false() {
for id in [0u64, 1, 17, u64::MAX] {
assert!(!sampling_passes(&synth_span(id, None), 0.0));
}
}
#[test]
fn sampling_passes_is_deterministic_per_root_id() {
for id in 1u64..=20 {
let r = synth_span(id, None);
let first = sampling_passes(&r, 0.5);
for _ in 0..3 {
assert_eq!(sampling_passes(&r, 0.5), first, "id={id}");
}
}
}
#[test]
fn sampling_passes_children_inherit_parents_root_id_bucket() {
let root = synth_span(7, None);
let want = sampling_passes(&root, 0.5);
for child_id in [100u64, 200, 300, u64::MAX] {
let child = synth_span(child_id, Some(7));
assert_eq!(sampling_passes(&child, 0.5), want);
}
}
#[test]
fn sampling_passes_partitions_population_near_target_rate() {
let rate = 0.3;
let n = 5_000u64;
let mut passed = 0usize;
for id in 1..=n {
if sampling_passes(&synth_span(id, None), rate) {
passed += 1;
}
}
let frac = passed as f64 / n as f64;
assert!(
(frac - rate).abs() < 0.03,
"frac={frac} rate={rate} — hash distribution drifted",
);
}
#[tokio::test]
async fn set_sampling_rate_rejects_out_of_range() {
let (cache, level_handle, chance_handle) = prepare_cache();
let (addr, server) = spawn_server(
Arc::clone(&cache),
level_handle.clone(),
chance_handle.clone(),
)
.await;
let client = connect_client(addr).await;
for bad in [1.5_f64, -0.1, f64::NAN] {
let resp = client
.send_unary(Request::new(RequestBody::SetSamplingRate(bad)))
.unwrap()
.await
.unwrap();
match resp.body {
ResponseBody::Error(msg) => {
assert!(
msg.contains("sampling rate"),
"unexpected error message for {bad}: {msg}",
);
}
other => panic!("expected Error for rate={bad}, got {other:?}"),
}
}
server.abort();
}
}