#![doc = include_str!("../README.md")]
#![allow(clippy::type_complexity)]
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::Mutex;
use rapace::{Frame, RpcError, RpcSession};
use tracing::span::{Attributes, Record};
use tracing::{Event, Id, Subscriber};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
/// A single name/value pair recorded on a span or event.
///
/// Values are carried as strings: the layer's field visitor stringifies every
/// recorded value before it is sent.
#[derive(Debug, Clone, facet::Facet)]
pub struct Field {
/// Name of the field.
pub name: String,
/// Stringified value of the field.
pub value: String,
}
/// Description of a newly created span, as sent to a [`TracingSink`].
///
/// The layer fills this in from the span's `tracing` metadata and the fields
/// recorded at creation time.
#[derive(Debug, Clone, facet::Facet)]
pub struct SpanMeta {
/// Span name taken from the callsite metadata.
pub name: String,
/// Target taken from the callsite metadata.
pub target: String,
/// Level of the span, rendered as a string.
pub level: String,
/// Source file of the callsite, when the metadata provides one.
pub file: Option<String>,
/// Source line of the callsite, when the metadata provides one.
pub line: Option<u32>,
/// Fields recorded when the span was created.
pub fields: Vec<Field>,
}
/// Description of a single event, as sent to a [`TracingSink`].
#[derive(Debug, Clone, facet::Facet)]
pub struct EventMeta {
/// Value of the event's `message` field; empty when the event has none.
pub message: String,
/// Target taken from the callsite metadata.
pub target: String,
/// Level of the event, rendered as a string.
pub level: String,
/// Source file of the callsite, when the metadata provides one.
pub file: Option<String>,
/// Source line of the callsite, when the metadata provides one.
pub line: Option<u32>,
/// Every field recorded on the event (the `message` field included).
pub fields: Vec<Field>,
/// Identifier of the span this event is attributed to, if any.
pub parent_span_id: Option<u64>,
}
/// RPC service that receives span and event data.
///
/// Each method corresponds to one span/event lifecycle callback of the
/// forwarding layer.
#[allow(async_fn_in_trait)]
#[rapace::service]
pub trait TracingSink {
/// Announces a newly created span and returns an identifier for it.
async fn new_span(&self, span: crate::SpanMeta) -> u64;
/// Reports additional field values recorded on the span `span_id`.
async fn record(&self, span_id: u64, fields: Vec<crate::Field>);
/// Reports an event.
async fn event(&self, event: crate::EventMeta);
/// Reports that the span `span_id` was entered.
async fn enter(&self, span_id: u64);
/// Reports that the span `span_id` was exited.
async fn exit(&self, span_id: u64);
/// Reports that the span `span_id` was closed and will not be used again.
async fn drop_span(&self, span_id: u64);
}
/// RPC service used to reconfigure tracing on the receiving side.
#[allow(async_fn_in_trait)]
#[rapace::service]
pub trait TracingConfig {
/// Requests that the receiver switch to the filter described by `filter`.
async fn set_filter(&self, filter: String);
}
use tracing_subscriber::EnvFilter;
/// A cloneable handle to an [`EnvFilter`] that can be replaced at runtime.
///
/// Clones share the same underlying filter, so a replacement made through one
/// handle is seen by all of them.
#[derive(Clone)]
pub struct SharedFilter {
// Current filter behind a reader/writer lock, shared by every clone.
inner: Arc<parking_lot::RwLock<EnvFilter>>,
}
impl SharedFilter {
    /// Creates a filter seeded from the `RAPACE_TRACING_DEFAULT_FILTER`
    /// environment variable.
    ///
    /// When the variable is unset, unreadable, or blank, the directive
    /// `"warn"` is used instead.
    pub fn new() -> Self {
        let directives = std::env::var("RAPACE_TRACING_DEFAULT_FILTER")
            .ok()
            .filter(|s| !s.trim().is_empty())
            .unwrap_or_else(|| "warn".to_string());
        Self {
            inner: Arc::new(parking_lot::RwLock::new(EnvFilter::new(directives))),
        }
    }

    /// Replaces the current filter with one parsed from `filter_str`.
    ///
    /// If `filter_str` does not parse, the existing filter is kept and a
    /// diagnostic is written to stderr.
    ///
    /// After a successful replacement the `tracing` callsite interest cache is
    /// rebuilt. A layer that answers `enabled` from this filter has its answer
    /// cached per callsite, so without the rebuild already-registered
    /// callsites would keep following the old filter.
    pub fn set_filter(&self, filter_str: &str) {
        match EnvFilter::builder().parse(filter_str) {
            Ok(filter) => {
                // The write guard is a temporary and is released at the end of
                // this statement. It must not be held during the rebuild below,
                // which can call back into code that takes the read lock.
                *self.inner.write() = filter;
                tracing::callsite::rebuild_interest_cache();
            }
            Err(e) => {
                eprintln!("rapace-tracing: invalid filter '{}': {}", filter_str, e);
            }
        }
    }

    /// Returns whether `level` is within the filter's maximum level hint.
    ///
    /// A filter that gives no hint is treated as enabling every level.
    pub fn max_level_enabled(&self, level: tracing::level_filters::LevelFilter) -> bool {
        self.max_level_hint().map_or(true, |max| level <= max)
    }

    /// Returns the current filter's maximum level hint, if it has one.
    pub fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
        self.inner.read().max_level_hint()
    }
}
impl Default for SharedFilter {
fn default() -> Self {
Self::new()
}
}
/// [`TracingConfig`] implementation backed by a [`SharedFilter`].
#[derive(Clone)]
pub struct TracingConfigImpl {
// Filter handle that incoming `set_filter` requests are applied to.
filter: SharedFilter,
}
impl TracingConfigImpl {
pub fn new(filter: SharedFilter) -> Self {
Self { filter }
}
}
impl TracingConfig for TracingConfigImpl {
    /// Hands the requested directives to the wrapped [`SharedFilter`].
    async fn set_filter(&self, filter: String) {
        let directives = filter.as_str();
        self.filter.set_filter(directives);
    }
}
/// Builds a frame handler that serves [`TracingConfig`] requests using `config`.
///
/// For each incoming frame the handler dispatches on the frame's method id and
/// payload through a `TracingConfigServer` wrapping a clone of `config`, then
/// copies the request's channel id and message id onto the response.
/// Dispatch errors are returned to the caller unchanged.
pub fn create_tracing_config_dispatcher(
    config: TracingConfigImpl,
) -> impl Fn(Frame) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
+ Send
+ Sync
+ 'static {
    move |request: Frame| {
        let handler = config.clone();
        Box::pin(async move {
            let method_id = request.desc.method_id;
            let service = TracingConfigServer::new(handler);
            let mut reply = service.dispatch(method_id, request.payload_bytes()).await?;
            // Echo the routing identifiers so the reply is matched to its request.
            reply.desc.channel_id = request.desc.channel_id;
            reply.desc.msg_id = request.desc.msg_id;
            Ok(reply)
        })
    }
}
/// A `tracing` layer that forwards span and event activity over an RPC session.
pub struct RapaceTracingLayer {
// Session the notifications are sent on.
session: Arc<RpcSession>,
// Maps a `tracing` span id (as `u64`) to the id this layer allocated for it.
span_ids: Mutex<HashMap<u64, u64>>,
// Next layer-allocated span id; starts at 1.
next_span_id: AtomicU64,
// Runtime handle on which the sending tasks are spawned.
rt: tokio::runtime::Handle,
// Filter consulted when deciding whether a span or event is enabled.
filter: SharedFilter,
}
impl RapaceTracingLayer {
    /// Creates a layer with a fresh [`SharedFilter`] and returns both.
    ///
    /// The returned filter handle shares state with the one inside the layer.
    pub fn new(session: Arc<RpcSession>, rt: tokio::runtime::Handle) -> (Self, SharedFilter) {
        let filter = SharedFilter::new();
        let layer = Self::with_filter(session, rt, filter.clone());
        (layer, filter)
    }

    /// Creates a layer that uses an existing [`SharedFilter`].
    pub fn with_filter(
        session: Arc<RpcSession>,
        rt: tokio::runtime::Handle,
        filter: SharedFilter,
    ) -> Self {
        Self {
            session,
            span_ids: Mutex::new(HashMap::new()),
            next_span_id: AtomicU64::new(1),
            rt,
            filter,
        }
    }

    /// Reports a payload that could not be serialized.
    ///
    /// Sending is best-effort, so a failed encode drops the notification with
    /// a stderr diagnostic rather than panicking inside the spawned task.
    fn report_encode_failure(method: &str, err: &dyn std::fmt::Debug) {
        eprintln!(
            "rapace-tracing: dropping {} notification, serialization failed: {:?}",
            method, err
        );
    }

    /// Allocates a local span id and sends `meta` as a `new_span` notification.
    ///
    /// The call is sent as a notification, so the sink's return value is not
    /// observed; the id returned here is the locally allocated one.
    ///
    /// NOTE(review): `HostTracingSink` also numbers spans from 1 in arrival
    /// order, so the two id spaces agree only if `new_span` notifications
    /// arrive in allocation order. Confirm that is guaranteed.
    fn call_new_span(&self, meta: SpanMeta) -> u64 {
        let local_id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
        let session = self.session.clone();
        self.rt.spawn(async move {
            let request_bytes: Vec<u8> = match rapace::facet_postcard::to_vec(&meta) {
                Ok(bytes) => bytes,
                Err(e) => {
                    Self::report_encode_failure("new_span", &e);
                    return;
                }
            };
            let _ = session
                .notify(TRACING_SINK_METHOD_ID_NEW_SPAN, request_bytes)
                .await;
        });
        local_id
    }

    /// Sends a `record` notification carrying `fields` for `span_id`.
    fn call_record(&self, span_id: u64, fields: Vec<Field>) {
        let session = self.session.clone();
        self.rt.spawn(async move {
            let request_bytes: Vec<u8> = match rapace::facet_postcard::to_vec(&(span_id, fields))
            {
                Ok(bytes) => bytes,
                Err(e) => {
                    Self::report_encode_failure("record", &e);
                    return;
                }
            };
            let _ = session
                .notify(TRACING_SINK_METHOD_ID_RECORD, request_bytes)
                .await;
        });
    }

    /// Sends an `event` notification.
    fn call_event(&self, event: EventMeta) {
        let session = self.session.clone();
        self.rt.spawn(async move {
            let request_bytes: Vec<u8> = match rapace::facet_postcard::to_vec(&event) {
                Ok(bytes) => bytes,
                Err(e) => {
                    Self::report_encode_failure("event", &e);
                    return;
                }
            };
            let _ = session
                .notify(TRACING_SINK_METHOD_ID_EVENT, request_bytes)
                .await;
        });
    }

    /// Sends an `enter` notification for `span_id`.
    fn call_enter(&self, span_id: u64) {
        let session = self.session.clone();
        self.rt.spawn(async move {
            let request_bytes: Vec<u8> = match rapace::facet_postcard::to_vec(&span_id) {
                Ok(bytes) => bytes,
                Err(e) => {
                    Self::report_encode_failure("enter", &e);
                    return;
                }
            };
            let _ = session
                .notify(TRACING_SINK_METHOD_ID_ENTER, request_bytes)
                .await;
        });
    }

    /// Sends an `exit` notification for `span_id`.
    fn call_exit(&self, span_id: u64) {
        let session = self.session.clone();
        self.rt.spawn(async move {
            let request_bytes: Vec<u8> = match rapace::facet_postcard::to_vec(&span_id) {
                Ok(bytes) => bytes,
                Err(e) => {
                    Self::report_encode_failure("exit", &e);
                    return;
                }
            };
            let _ = session
                .notify(TRACING_SINK_METHOD_ID_EXIT, request_bytes)
                .await;
        });
    }

    /// Sends a `drop_span` notification for `span_id`.
    fn call_drop_span(&self, span_id: u64) {
        let session = self.session.clone();
        self.rt.spawn(async move {
            let request_bytes: Vec<u8> = match rapace::facet_postcard::to_vec(&span_id) {
                Ok(bytes) => bytes,
                Err(e) => {
                    Self::report_encode_failure("drop_span", &e);
                    return;
                }
            };
            let _ = session
                .notify(TRACING_SINK_METHOD_ID_DROP_SPAN, request_bytes)
                .await;
        });
    }
}
impl<S> Layer<S> for RapaceTracingLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn enabled(&self, metadata: &tracing::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
let target = metadata.target();
if target.starts_with("rapace_tracing")
|| target.starts_with("rapace_core")
|| target.starts_with("rapace_transport_shm")
{
return false;
}
let level = match *metadata.level() {
tracing::Level::ERROR => tracing::level_filters::LevelFilter::ERROR,
tracing::Level::WARN => tracing::level_filters::LevelFilter::WARN,
tracing::Level::INFO => tracing::level_filters::LevelFilter::INFO,
tracing::Level::DEBUG => tracing::level_filters::LevelFilter::DEBUG,
tracing::Level::TRACE => tracing::level_filters::LevelFilter::TRACE,
};
self.filter.max_level_enabled(level)
}
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
let meta = attrs.metadata();
let mut visitor = FieldVisitor::new();
attrs.record(&mut visitor);
let span_meta = SpanMeta {
name: meta.name().to_string(),
target: meta.target().to_string(),
level: meta.level().to_string(),
file: meta.file().map(|s| s.to_string()),
line: meta.line(),
fields: visitor.fields,
};
let local_id = self.call_new_span(span_meta);
self.span_ids.lock().insert(id.into_u64(), local_id);
}
fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
let span_id = match self.span_ids.lock().get(&id.into_u64()) {
Some(&id) => id,
None => return,
};
let mut visitor = FieldVisitor::new();
values.record(&mut visitor);
if !visitor.fields.is_empty() {
self.call_record(span_id, visitor.fields);
}
}
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
let meta = event.metadata();
let mut visitor = FieldVisitor::new();
event.record(&mut visitor);
let message = visitor
.fields
.iter()
.find(|f| f.name == "message")
.map(|f| f.value.clone())
.unwrap_or_default();
let parent_span_id = ctx
.current_span()
.id()
.and_then(|id| self.span_ids.lock().get(&id.into_u64()).copied());
let event_meta = EventMeta {
message,
target: meta.target().to_string(),
level: meta.level().to_string(),
file: meta.file().map(|s| s.to_string()),
line: meta.line(),
fields: visitor.fields,
parent_span_id,
};
self.call_event(event_meta);
}
fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
self.call_enter(span_id);
}
}
fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
self.call_exit(span_id);
}
}
fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
if let Some(span_id) = self.span_ids.lock().remove(&id.into_u64()) {
self.call_drop_span(span_id);
}
}
}
/// Collects recorded `tracing` fields as stringified [`Field`]s, in the order
/// they are visited.
struct FieldVisitor {
    fields: Vec<Field>,
}

impl FieldVisitor {
    /// Creates a visitor with no collected fields.
    fn new() -> Self {
        FieldVisitor { fields: Vec::new() }
    }

    /// Appends one entry named after `field` with the already-rendered `value`.
    fn push(&mut self, field: &tracing::field::Field, value: String) {
        let name = field.name().to_string();
        self.fields.push(Field { name, value });
    }
}

impl tracing::field::Visit for FieldVisitor {
    /// Renders the value with its `Debug` formatting.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        self.push(field, format!("{:?}", value));
    }

    /// Stores the string as-is (no surrounding quotes).
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.push(field, value.to_owned());
    }

    /// Renders the integer in decimal.
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.push(field, value.to_string());
    }

    /// Renders the integer in decimal.
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.push(field, value.to_string());
    }

    /// Renders the flag as `true` or `false`.
    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.push(field, value.to_string());
    }
}
/// One call received by [`HostTracingSink`], kept for later inspection.
///
/// Each variant corresponds to one [`TracingSink`] method.
#[derive(Debug, Clone)]
pub enum TraceRecord {
/// A `new_span` call, with the id the sink assigned to the span.
NewSpan { id: u64, meta: SpanMeta },
/// A `record` call.
Record { span_id: u64, fields: Vec<Field> },
/// An `event` call.
Event(EventMeta),
/// An `enter` call.
Enter { span_id: u64 },
/// An `exit` call.
Exit { span_id: u64 },
/// A `drop_span` call.
DropSpan { span_id: u64 },
}
/// A [`TracingSink`] that stores every call it receives in memory.
///
/// Clones share the same record list and id counter.
#[derive(Clone)]
pub struct HostTracingSink {
// Calls received so far, in arrival order.
records: Arc<Mutex<Vec<TraceRecord>>>,
// Next span id handed out by `new_span`; starts at 1.
next_span_id: Arc<AtomicU64>,
}
impl HostTracingSink {
pub fn new() -> Self {
Self {
records: Arc::new(Mutex::new(Vec::new())),
next_span_id: Arc::new(AtomicU64::new(1)),
}
}
pub fn records(&self) -> Vec<TraceRecord> {
self.records.lock().clone()
}
pub fn clear(&self) {
self.records.lock().clear();
}
}
impl Default for HostTracingSink {
fn default() -> Self {
Self::new()
}
}
impl TracingSink for HostTracingSink {
    /// Assigns the next span id, stores the span description, and returns the id.
    async fn new_span(&self, span: SpanMeta) -> u64 {
        let assigned = self.next_span_id.fetch_add(1, Ordering::Relaxed);
        let entry = TraceRecord::NewSpan {
            id: assigned,
            meta: span,
        };
        self.records.lock().push(entry);
        assigned
    }

    /// Stores the recorded fields for `span_id`.
    async fn record(&self, span_id: u64, fields: Vec<Field>) {
        let entry = TraceRecord::Record { span_id, fields };
        self.records.lock().push(entry);
    }

    /// Stores the event.
    async fn event(&self, event: EventMeta) {
        let entry = TraceRecord::Event(event);
        self.records.lock().push(entry);
    }

    /// Stores a span-enter for `span_id`.
    async fn enter(&self, span_id: u64) {
        let entry = TraceRecord::Enter { span_id };
        self.records.lock().push(entry);
    }

    /// Stores a span-exit for `span_id`.
    async fn exit(&self, span_id: u64) {
        let entry = TraceRecord::Exit { span_id };
        self.records.lock().push(entry);
    }

    /// Stores a span-drop for `span_id`.
    async fn drop_span(&self, span_id: u64) {
        let entry = TraceRecord::DropSpan { span_id };
        self.records.lock().push(entry);
    }
}
/// Builds a frame handler that serves [`TracingSink`] requests using `sink`.
///
/// For each incoming frame the handler dispatches on the frame's method id and
/// payload through a `TracingSinkServer` wrapping a clone of `sink`, then
/// copies the request's channel id and message id onto the response.
/// Dispatch errors are returned to the caller unchanged.
pub fn create_tracing_sink_dispatcher(
    sink: HostTracingSink,
) -> impl Fn(Frame) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
+ Send
+ Sync
+ 'static {
    move |request: Frame| {
        let handler = sink.clone();
        Box::pin(async move {
            let method_id = request.desc.method_id;
            let service = TracingSinkServer::new(handler);
            let mut reply = service.dispatch(method_id, request.payload_bytes()).await?;
            // Echo the routing identifiers so the reply is matched to its request.
            reply.desc.channel_id = request.desc.channel_id;
            reply.desc.msg_id = request.desc.msg_id;
            Ok(reply)
        })
    }
}