1#![doc = include_str!("../README.md")]
2#![allow(clippy::type_complexity)]
3
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use parking_lot::Mutex;
10use rapace::{Frame, RpcError, RpcSession};
11use tracing::span::{Attributes, Record};
12use tracing::{Event, Id, Subscriber};
13use tracing_subscriber::Layer;
14use tracing_subscriber::layer::Context;
15use tracing_subscriber::registry::LookupSpan;
16
/// A single named field captured from a span or an event.
///
/// Values are carried in string form; the field visitor in this file fills
/// them in either through `Debug` formatting or through `to_string()`.
#[derive(Debug, Clone, facet::Facet)]
pub struct Field {
    /// Name of the field, as reported by the `tracing` field's `name()`.
    pub name: String,
    /// Stringified value of the field.
    pub value: String,
}
29
/// Description of a span, built by the layer when a new span is created.
///
/// NOTE(review): this struct is serialized with `facet_postcard` before being
/// sent; the encoding is presumably positional, so field order likely matters
/// for wire compatibility. Confirm before reordering.
#[derive(Debug, Clone, facet::Facet)]
pub struct SpanMeta {
    /// Span name from the span's metadata.
    pub name: String,
    /// Target from the span's metadata.
    pub target: String,
    /// Level from the span's metadata, rendered with `to_string()`.
    pub level: String,
    /// Source file from the span's metadata, when available.
    pub file: Option<String>,
    /// Source line from the span's metadata, when available.
    pub line: Option<u32>,
    /// Fields recorded on the span's attributes at creation time.
    pub fields: Vec<Field>,
}
46
/// Description of an event, built by the layer for every event it forwards.
///
/// NOTE(review): this struct is serialized with `facet_postcard` before being
/// sent; the encoding is presumably positional, so field order likely matters
/// for wire compatibility. Confirm before reordering.
#[derive(Debug, Clone, facet::Facet)]
pub struct EventMeta {
    /// Value of the event field named `message`, or an empty string when the
    /// event has no such field.
    pub message: String,
    /// Target from the event's metadata.
    pub target: String,
    /// Level from the event's metadata, rendered with `to_string()`.
    pub level: String,
    /// Source file from the event's metadata, when available.
    pub file: Option<String>,
    /// Source line from the event's metadata, when available.
    pub line: Option<u32>,
    /// Every field recorded on the event (the `message` field is included
    /// here as well).
    pub fields: Vec<Field>,
    /// Layer-assigned id of the span the event is attributed to, if any.
    pub parent_span_id: Option<u64>,
}
65
/// RPC service that receives tracing data (spans, events and span lifecycle
/// notifications).
///
/// In this file the service is implemented by `HostTracingSink` and fed by
/// `RapaceTracingLayer`. The layer sends every call as a notification and
/// never reads a return value, including the `u64` returned by `new_span`.
#[allow(async_fn_in_trait)]
#[rapace::service]
pub trait TracingSink {
    /// Reports a newly created span and returns a span id.
    ///
    /// `HostTracingSink` returns an id taken from its own counter.
    async fn new_span(&self, span: crate::SpanMeta) -> u64;

    /// Reports additional field values recorded on the span `span_id`.
    async fn record(&self, span_id: u64, fields: Vec<crate::Field>);

    /// Reports an event.
    async fn event(&self, event: crate::EventMeta);

    /// Reports that the span `span_id` was entered.
    async fn enter(&self, span_id: u64);

    /// Reports that the span `span_id` was exited.
    async fn exit(&self, span_id: u64);

    /// Reports that the span `span_id` was closed.
    async fn drop_span(&self, span_id: u64);
}
95
/// RPC service used to reconfigure tracing remotely.
///
/// In this file the service is implemented by `TracingConfigImpl`, which
/// forwards the request to a `SharedFilter`.
#[allow(async_fn_in_trait)]
#[rapace::service]
pub trait TracingConfig {
    /// Requests that the active filter be replaced by the one described by
    /// `filter`.
    ///
    /// NOTE(review): the string is presumably in `EnvFilter` directive syntax
    /// (that is how `SharedFilter::set_filter` parses it) — confirm for other
    /// implementors.
    async fn set_filter(&self, filter: String);
}
113
114use tracing_subscriber::EnvFilter;
119
/// A cloneable handle to an `EnvFilter` that can be replaced at runtime.
///
/// Clones share the same underlying filter: the filter lives behind an
/// `Arc<RwLock<..>>`, so a replacement made through one handle is observed
/// by all of them.
#[derive(Clone)]
pub struct SharedFilter {
    /// The current filter, shared between all clones of this handle.
    inner: Arc<parking_lot::RwLock<EnvFilter>>,
}
127
128impl SharedFilter {
129 pub fn new() -> Self {
131 let default_filter = std::env::var("RAPACE_TRACING_DEFAULT_FILTER")
134 .ok()
135 .filter(|s| !s.trim().is_empty())
136 .unwrap_or_else(|| "warn".to_string());
137
138 let filter = EnvFilter::new(default_filter);
139 Self {
140 inner: Arc::new(parking_lot::RwLock::new(filter)),
141 }
142 }
143
144 pub fn set_filter(&self, filter_str: &str) {
146 match EnvFilter::builder().parse(filter_str) {
147 Ok(filter) => {
148 *self.inner.write() = filter;
149 }
150 Err(e) => {
151 eprintln!("rapace-tracing: invalid filter '{}': {}", filter_str, e);
153 }
154 }
155 }
156
157 pub fn max_level_enabled(&self, level: tracing::level_filters::LevelFilter) -> bool {
162 let filter = self.inner.read();
163 if let Some(max) = filter.max_level_hint() {
164 level <= max
165 } else {
166 true
167 }
168 }
169
170 pub fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
172 self.inner.read().max_level_hint()
173 }
174}
175
176impl Default for SharedFilter {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
/// Implementation of the `TracingConfig` service backed by a `SharedFilter`.
#[derive(Clone)]
pub struct TracingConfigImpl {
    /// Filter handle that receives the `set_filter` requests.
    filter: SharedFilter,
}
193
194impl TracingConfigImpl {
195 pub fn new(filter: SharedFilter) -> Self {
197 Self { filter }
198 }
199}
200
201impl TracingConfig for TracingConfigImpl {
202 async fn set_filter(&self, filter: String) {
203 self.filter.set_filter(&filter);
204 }
205}
206
207pub fn create_tracing_config_dispatcher(
209 config: TracingConfigImpl,
210) -> impl Fn(Frame) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
211+ Send
212+ Sync
213+ 'static {
214 move |request: Frame| {
215 let config = config.clone();
216 Box::pin(async move {
217 let server = TracingConfigServer::new(config);
218 let mut response = server
219 .dispatch(request.desc.method_id, request.payload_bytes())
220 .await?;
221 response.desc.channel_id = request.desc.channel_id;
222 response.desc.msg_id = request.desc.msg_id;
223 Ok(response)
224 })
225 }
226}
227
/// A `tracing_subscriber` layer that forwards spans and events over a rapace
/// RPC session.
///
/// Every forwarded item is sent as a notification from a task spawned on
/// `rt`.
///
/// NOTE(review): because each notification runs in its own spawned task, this
/// code does not by itself guarantee that notifications reach the peer in the
/// order they were produced — confirm whether the session or runtime gives
/// that guarantee.
pub struct RapaceTracingLayer {
    /// Session used to send the notifications.
    session: Arc<RpcSession>,
    /// Maps a `tracing` span id (`Id::into_u64`) to the id this layer
    /// assigned to the span.
    span_ids: Mutex<HashMap<u64, u64>>,
    /// Counter for layer-assigned span ids; the constructors start it at 1.
    next_span_id: AtomicU64,
    /// Runtime handle on which the notification tasks are spawned.
    rt: tokio::runtime::Handle,
    /// Filter consulted by `enabled()`.
    filter: SharedFilter,
}
250
251impl RapaceTracingLayer {
252 pub fn new(session: Arc<RpcSession>, rt: tokio::runtime::Handle) -> (Self, SharedFilter) {
258 let filter = SharedFilter::new();
259 let layer = Self {
260 session,
261 span_ids: Mutex::new(HashMap::new()),
262 next_span_id: AtomicU64::new(1),
263 rt,
264 filter: filter.clone(),
265 };
266 (layer, filter)
267 }
268
269 pub fn with_filter(
271 session: Arc<RpcSession>,
272 rt: tokio::runtime::Handle,
273 filter: SharedFilter,
274 ) -> Self {
275 Self {
276 session,
277 span_ids: Mutex::new(HashMap::new()),
278 next_span_id: AtomicU64::new(1),
279 rt,
280 filter,
281 }
282 }
283
284 fn call_new_span(&self, meta: SpanMeta) -> u64 {
286 let local_id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
287 let session = self.session.clone();
288
289 self.rt.spawn(async move {
290 let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&meta).unwrap();
291 let _ = session
292 .notify(TRACING_SINK_METHOD_ID_NEW_SPAN, request_bytes)
293 .await;
294 });
295
296 local_id
297 }
298
299 fn call_record(&self, span_id: u64, fields: Vec<Field>) {
301 let session = self.session.clone();
302 self.rt.spawn(async move {
303 let request_bytes: Vec<u8> =
304 rapace::facet_postcard::to_vec(&(span_id, fields)).unwrap();
305 let _ = session
306 .notify(TRACING_SINK_METHOD_ID_RECORD, request_bytes)
307 .await;
308 });
309 }
310
311 fn call_event(&self, event: EventMeta) {
313 let session = self.session.clone();
314 self.rt.spawn(async move {
315 let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&event).unwrap();
316 let _ = session
317 .notify(TRACING_SINK_METHOD_ID_EVENT, request_bytes)
318 .await;
319 });
320 }
321
322 fn call_enter(&self, span_id: u64) {
324 let session = self.session.clone();
325 self.rt.spawn(async move {
326 let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&span_id).unwrap();
327 let _ = session
328 .notify(TRACING_SINK_METHOD_ID_ENTER, request_bytes)
329 .await;
330 });
331 }
332
333 fn call_exit(&self, span_id: u64) {
335 let session = self.session.clone();
336 self.rt.spawn(async move {
337 let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&span_id).unwrap();
338 let _ = session
339 .notify(TRACING_SINK_METHOD_ID_EXIT, request_bytes)
340 .await;
341 });
342 }
343
344 fn call_drop_span(&self, span_id: u64) {
346 let session = self.session.clone();
347 self.rt.spawn(async move {
348 let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&span_id).unwrap();
349 let _ = session
350 .notify(TRACING_SINK_METHOD_ID_DROP_SPAN, request_bytes)
351 .await;
352 });
353 }
354}
355
356impl<S> Layer<S> for RapaceTracingLayer
357where
358 S: Subscriber + for<'a> LookupSpan<'a>,
359{
360 fn enabled(&self, metadata: &tracing::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
361 let target = metadata.target();
364 if target.starts_with("rapace_tracing")
365 || target.starts_with("rapace_core")
366 || target.starts_with("rapace_transport_shm")
367 {
368 return false;
369 }
370
371 let level = match *metadata.level() {
373 tracing::Level::ERROR => tracing::level_filters::LevelFilter::ERROR,
374 tracing::Level::WARN => tracing::level_filters::LevelFilter::WARN,
375 tracing::Level::INFO => tracing::level_filters::LevelFilter::INFO,
376 tracing::Level::DEBUG => tracing::level_filters::LevelFilter::DEBUG,
377 tracing::Level::TRACE => tracing::level_filters::LevelFilter::TRACE,
378 };
379 self.filter.max_level_enabled(level)
380 }
381
382 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
383 let meta = attrs.metadata();
384
385 let mut visitor = FieldVisitor::new();
387 attrs.record(&mut visitor);
388
389 let span_meta = SpanMeta {
390 name: meta.name().to_string(),
391 target: meta.target().to_string(),
392 level: meta.level().to_string(),
393 file: meta.file().map(|s| s.to_string()),
394 line: meta.line(),
395 fields: visitor.fields,
396 };
397
398 let local_id = self.call_new_span(span_meta);
399
400 self.span_ids.lock().insert(id.into_u64(), local_id);
402 }
403
404 fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
405 let span_id = match self.span_ids.lock().get(&id.into_u64()) {
406 Some(&id) => id,
407 None => return,
408 };
409
410 let mut visitor = FieldVisitor::new();
411 values.record(&mut visitor);
412
413 if !visitor.fields.is_empty() {
414 self.call_record(span_id, visitor.fields);
415 }
416 }
417
418 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
419 let meta = event.metadata();
420
421 let mut visitor = FieldVisitor::new();
423 event.record(&mut visitor);
424
425 let message = visitor
427 .fields
428 .iter()
429 .find(|f| f.name == "message")
430 .map(|f| f.value.clone())
431 .unwrap_or_default();
432
433 let parent_span_id = ctx
435 .current_span()
436 .id()
437 .and_then(|id| self.span_ids.lock().get(&id.into_u64()).copied());
438
439 let event_meta = EventMeta {
440 message,
441 target: meta.target().to_string(),
442 level: meta.level().to_string(),
443 file: meta.file().map(|s| s.to_string()),
444 line: meta.line(),
445 fields: visitor.fields,
446 parent_span_id,
447 };
448
449 self.call_event(event_meta);
450 }
451
452 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
453 if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
454 self.call_enter(span_id);
455 }
456 }
457
458 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
459 if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
460 self.call_exit(span_id);
461 }
462 }
463
464 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
465 if let Some(span_id) = self.span_ids.lock().remove(&id.into_u64()) {
466 self.call_drop_span(span_id);
467 }
468 }
469}
470
/// `tracing` field visitor that collects every visited field as a
/// name/string-value pair.
struct FieldVisitor {
    /// Fields collected so far, in the order they were visited.
    fields: Vec<Field>,
}
475
476impl FieldVisitor {
477 fn new() -> Self {
478 Self { fields: Vec::new() }
479 }
480}
481
482impl tracing::field::Visit for FieldVisitor {
483 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
484 self.fields.push(Field {
485 name: field.name().to_string(),
486 value: format!("{:?}", value),
487 });
488 }
489
490 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
491 self.fields.push(Field {
492 name: field.name().to_string(),
493 value: value.to_string(),
494 });
495 }
496
497 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
498 self.fields.push(Field {
499 name: field.name().to_string(),
500 value: value.to_string(),
501 });
502 }
503
504 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
505 self.fields.push(Field {
506 name: field.name().to_string(),
507 value: value.to_string(),
508 });
509 }
510
511 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
512 self.fields.push(Field {
513 name: field.name().to_string(),
514 value: value.to_string(),
515 });
516 }
517}
518
/// One item received by `HostTracingSink`; there is one variant per
/// `TracingSink` method.
#[derive(Debug, Clone)]
pub enum TraceRecord {
    /// A span was reported; `id` is the id the sink assigned to it.
    NewSpan { id: u64, meta: SpanMeta },
    /// Field values were recorded on the span `span_id`.
    Record { span_id: u64, fields: Vec<Field> },
    /// An event was reported.
    Event(EventMeta),
    /// The span `span_id` was entered.
    Enter { span_id: u64 },
    /// The span `span_id` was exited.
    Exit { span_id: u64 },
    /// The span `span_id` was closed.
    DropSpan { span_id: u64 },
}
533
/// `TracingSink` implementation that stores everything it receives in memory.
///
/// Clones share the same record list and the same id counter, since both live
/// behind `Arc`s.
#[derive(Clone)]
pub struct HostTracingSink {
    /// Every record received so far, in arrival order.
    records: Arc<Mutex<Vec<TraceRecord>>>,
    /// Counter for the ids returned by `new_span`; `new()` starts it at 1.
    next_span_id: Arc<AtomicU64>,
}
543
544impl HostTracingSink {
545 pub fn new() -> Self {
547 Self {
548 records: Arc::new(Mutex::new(Vec::new())),
549 next_span_id: Arc::new(AtomicU64::new(1)),
550 }
551 }
552
553 pub fn records(&self) -> Vec<TraceRecord> {
555 self.records.lock().clone()
556 }
557
558 pub fn clear(&self) {
560 self.records.lock().clear();
561 }
562}
563
564impl Default for HostTracingSink {
565 fn default() -> Self {
566 Self::new()
567 }
568}
569
570impl TracingSink for HostTracingSink {
571 async fn new_span(&self, span: SpanMeta) -> u64 {
572 let id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
573 self.records
574 .lock()
575 .push(TraceRecord::NewSpan { id, meta: span });
576 id
577 }
578
579 async fn record(&self, span_id: u64, fields: Vec<Field>) {
580 self.records
581 .lock()
582 .push(TraceRecord::Record { span_id, fields });
583 }
584
585 async fn event(&self, event: EventMeta) {
586 self.records.lock().push(TraceRecord::Event(event));
587 }
588
589 async fn enter(&self, span_id: u64) {
590 self.records.lock().push(TraceRecord::Enter { span_id });
591 }
592
593 async fn exit(&self, span_id: u64) {
594 self.records.lock().push(TraceRecord::Exit { span_id });
595 }
596
597 async fn drop_span(&self, span_id: u64) {
598 self.records.lock().push(TraceRecord::DropSpan { span_id });
599 }
600}
601
602pub fn create_tracing_sink_dispatcher(
608 sink: HostTracingSink,
609) -> impl Fn(Frame) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
610+ Send
611+ Sync
612+ 'static {
613 move |request: Frame| {
614 let sink = sink.clone();
615 Box::pin(async move {
616 let server = TracingSinkServer::new(sink);
617 let mut response = server
618 .dispatch(request.desc.method_id, request.payload_bytes())
619 .await?;
620 response.desc.channel_id = request.desc.channel_id;
621 response.desc.msg_id = request.desc.msg_id;
622 Ok(response)
623 })
624 }
625}
626
627