1#![allow(clippy::type_complexity)]
2use std::collections::HashMap;
52use std::pin::Pin;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, Ordering};
55
56use parking_lot::Mutex;
57use rapace::{Frame, RpcError, RpcSession, Transport};
58use tracing::span::{Attributes, Record};
59use tracing::{Event, Id, Subscriber};
60use tracing_subscriber::Layer;
61use tracing_subscriber::layer::Context;
62use tracing_subscriber::registry::LookupSpan;
63
64#[derive(Debug, Clone, facet::Facet)]
70pub struct Field {
71 pub name: String,
73 pub value: String,
75}
76
77#[derive(Debug, Clone, facet::Facet)]
79pub struct SpanMeta {
80 pub name: String,
82 pub target: String,
84 pub level: String,
86 pub file: Option<String>,
88 pub line: Option<u32>,
90 pub fields: Vec<Field>,
92}
93
94#[derive(Debug, Clone, facet::Facet)]
96pub struct EventMeta {
97 pub message: String,
99 pub target: String,
101 pub level: String,
103 pub file: Option<String>,
105 pub line: Option<u32>,
107 pub fields: Vec<Field>,
109 pub parent_span_id: Option<u64>,
111}
112
113#[allow(async_fn_in_trait)]
121#[rapace::service]
122pub trait TracingSink {
123 async fn new_span(&self, span: crate::SpanMeta) -> u64;
126
127 async fn record(&self, span_id: u64, fields: Vec<crate::Field>);
129
130 async fn event(&self, event: crate::EventMeta);
132
133 async fn enter(&self, span_id: u64);
135
136 async fn exit(&self, span_id: u64);
138
139 async fn drop_span(&self, span_id: u64);
141}
142
143#[allow(async_fn_in_trait)]
152#[rapace::service]
153pub trait TracingConfig {
154 async fn set_filter(&self, filter: String);
159}
160
161use tracing_subscriber::EnvFilter;
166
167#[derive(Clone)]
171pub struct SharedFilter {
172 inner: Arc<parking_lot::RwLock<EnvFilter>>,
173}
174
175impl SharedFilter {
176 pub fn new() -> Self {
178 let filter = EnvFilter::new("trace");
182 Self {
183 inner: Arc::new(parking_lot::RwLock::new(filter)),
184 }
185 }
186
187 pub fn set_filter(&self, filter_str: &str) {
189 match EnvFilter::builder().parse(filter_str) {
190 Ok(filter) => {
191 *self.inner.write() = filter;
192 }
193 Err(e) => {
194 eprintln!("rapace-tracing: invalid filter '{}': {}", filter_str, e);
196 }
197 }
198 }
199
200 pub fn max_level_enabled(&self, level: tracing::level_filters::LevelFilter) -> bool {
205 let filter = self.inner.read();
206 if let Some(max) = filter.max_level_hint() {
207 level <= max
208 } else {
209 true
210 }
211 }
212
213 pub fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
215 self.inner.read().max_level_hint()
216 }
217}
218
219impl Default for SharedFilter {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
225#[derive(Clone)]
233pub struct TracingConfigImpl {
234 filter: SharedFilter,
235}
236
237impl TracingConfigImpl {
238 pub fn new(filter: SharedFilter) -> Self {
240 Self { filter }
241 }
242}
243
244impl TracingConfig for TracingConfigImpl {
245 async fn set_filter(&self, filter: String) {
246 self.filter.set_filter(&filter);
247 }
248}
249
250pub fn create_tracing_config_dispatcher(
252 config: TracingConfigImpl,
253) -> impl Fn(
254 u32,
255 u32,
256 Vec<u8>,
257) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
258+ Send
259+ Sync
260+ 'static {
261 move |_channel_id, method_id, payload| {
262 let config = config.clone();
263 Box::pin(async move {
264 let server = TracingConfigServer::new(config);
265 server.dispatch(method_id, &payload).await
266 })
267 }
268}
269
270pub struct RapaceTracingLayer<T: Transport + Send + Sync + 'static> {
282 session: Arc<RpcSession<T>>,
283 span_ids: Mutex<HashMap<u64, u64>>,
285 next_span_id: AtomicU64,
287 rt: tokio::runtime::Handle,
289 filter: SharedFilter,
291}
292
293impl<T: Transport + Send + Sync + 'static> RapaceTracingLayer<T> {
294 pub fn new(session: Arc<RpcSession<T>>, rt: tokio::runtime::Handle) -> (Self, SharedFilter) {
300 let filter = SharedFilter::new();
301 let layer = Self {
302 session,
303 span_ids: Mutex::new(HashMap::new()),
304 next_span_id: AtomicU64::new(1),
305 rt,
306 filter: filter.clone(),
307 };
308 (layer, filter)
309 }
310
311 pub fn with_filter(
313 session: Arc<RpcSession<T>>,
314 rt: tokio::runtime::Handle,
315 filter: SharedFilter,
316 ) -> Self {
317 Self {
318 session,
319 span_ids: Mutex::new(HashMap::new()),
320 next_span_id: AtomicU64::new(1),
321 rt,
322 filter,
323 }
324 }
325
326 fn call_new_span(&self, meta: SpanMeta) -> u64 {
328 let client = TracingSinkClient::new(self.session.clone());
329 let local_id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
330
331 self.rt.spawn(async move {
332 let _ = client.new_span(meta).await;
333 });
334
335 local_id
336 }
337
338 fn call_record(&self, span_id: u64, fields: Vec<Field>) {
340 let client = TracingSinkClient::new(self.session.clone());
341 self.rt.spawn(async move {
342 let _ = client.record(span_id, fields).await;
343 });
344 }
345
346 fn call_event(&self, event: EventMeta) {
348 let client = TracingSinkClient::new(self.session.clone());
349 self.rt.spawn(async move {
350 let _ = client.event(event).await;
351 });
352 }
353
354 fn call_enter(&self, span_id: u64) {
356 let client = TracingSinkClient::new(self.session.clone());
357 self.rt.spawn(async move {
358 let _ = client.enter(span_id).await;
359 });
360 }
361
362 fn call_exit(&self, span_id: u64) {
364 let client = TracingSinkClient::new(self.session.clone());
365 self.rt.spawn(async move {
366 let _ = client.exit(span_id).await;
367 });
368 }
369
370 fn call_drop_span(&self, span_id: u64) {
372 let client = TracingSinkClient::new(self.session.clone());
373 self.rt.spawn(async move {
374 let _ = client.drop_span(span_id).await;
375 });
376 }
377}
378
379impl<S, T> Layer<S> for RapaceTracingLayer<T>
380where
381 S: Subscriber + for<'a> LookupSpan<'a>,
382 T: Transport + Send + Sync + 'static,
383{
384 fn enabled(&self, metadata: &tracing::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
385 let target = metadata.target();
388 if target.starts_with("rapace_tracing")
389 || target.starts_with("rapace_core")
390 || target.starts_with("rapace_transport_shm")
391 {
392 return false;
393 }
394
395 let level = match *metadata.level() {
397 tracing::Level::ERROR => tracing::level_filters::LevelFilter::ERROR,
398 tracing::Level::WARN => tracing::level_filters::LevelFilter::WARN,
399 tracing::Level::INFO => tracing::level_filters::LevelFilter::INFO,
400 tracing::Level::DEBUG => tracing::level_filters::LevelFilter::DEBUG,
401 tracing::Level::TRACE => tracing::level_filters::LevelFilter::TRACE,
402 };
403 self.filter.max_level_enabled(level)
404 }
405
406 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
407 let meta = attrs.metadata();
408
409 let mut visitor = FieldVisitor::new();
411 attrs.record(&mut visitor);
412
413 let span_meta = SpanMeta {
414 name: meta.name().to_string(),
415 target: meta.target().to_string(),
416 level: meta.level().to_string(),
417 file: meta.file().map(|s| s.to_string()),
418 line: meta.line(),
419 fields: visitor.fields,
420 };
421
422 let local_id = self.call_new_span(span_meta);
423
424 self.span_ids.lock().insert(id.into_u64(), local_id);
426 }
427
428 fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
429 let span_id = match self.span_ids.lock().get(&id.into_u64()) {
430 Some(&id) => id,
431 None => return,
432 };
433
434 let mut visitor = FieldVisitor::new();
435 values.record(&mut visitor);
436
437 if !visitor.fields.is_empty() {
438 self.call_record(span_id, visitor.fields);
439 }
440 }
441
442 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
443 let meta = event.metadata();
444
445 let mut visitor = FieldVisitor::new();
447 event.record(&mut visitor);
448
449 let message = visitor
451 .fields
452 .iter()
453 .find(|f| f.name == "message")
454 .map(|f| f.value.clone())
455 .unwrap_or_default();
456
457 let parent_span_id = ctx
459 .current_span()
460 .id()
461 .and_then(|id| self.span_ids.lock().get(&id.into_u64()).copied());
462
463 let event_meta = EventMeta {
464 message,
465 target: meta.target().to_string(),
466 level: meta.level().to_string(),
467 file: meta.file().map(|s| s.to_string()),
468 line: meta.line(),
469 fields: visitor.fields,
470 parent_span_id,
471 };
472
473 self.call_event(event_meta);
474 }
475
476 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
477 if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
478 self.call_enter(span_id);
479 }
480 }
481
482 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
483 if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
484 self.call_exit(span_id);
485 }
486 }
487
488 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
489 if let Some(span_id) = self.span_ids.lock().remove(&id.into_u64()) {
490 self.call_drop_span(span_id);
491 }
492 }
493}
494
495struct FieldVisitor {
497 fields: Vec<Field>,
498}
499
500impl FieldVisitor {
501 fn new() -> Self {
502 Self { fields: Vec::new() }
503 }
504}
505
506impl tracing::field::Visit for FieldVisitor {
507 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
508 self.fields.push(Field {
509 name: field.name().to_string(),
510 value: format!("{:?}", value),
511 });
512 }
513
514 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
515 self.fields.push(Field {
516 name: field.name().to_string(),
517 value: value.to_string(),
518 });
519 }
520
521 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
522 self.fields.push(Field {
523 name: field.name().to_string(),
524 value: value.to_string(),
525 });
526 }
527
528 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
529 self.fields.push(Field {
530 name: field.name().to_string(),
531 value: value.to_string(),
532 });
533 }
534
535 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
536 self.fields.push(Field {
537 name: field.name().to_string(),
538 value: value.to_string(),
539 });
540 }
541}
542
543#[derive(Debug, Clone)]
549pub enum TraceRecord {
550 NewSpan { id: u64, meta: SpanMeta },
551 Record { span_id: u64, fields: Vec<Field> },
552 Event(EventMeta),
553 Enter { span_id: u64 },
554 Exit { span_id: u64 },
555 DropSpan { span_id: u64 },
556}
557
558#[derive(Clone)]
563pub struct HostTracingSink {
564 records: Arc<Mutex<Vec<TraceRecord>>>,
565 next_span_id: Arc<AtomicU64>,
566}
567
568impl HostTracingSink {
569 pub fn new() -> Self {
571 Self {
572 records: Arc::new(Mutex::new(Vec::new())),
573 next_span_id: Arc::new(AtomicU64::new(1)),
574 }
575 }
576
577 pub fn records(&self) -> Vec<TraceRecord> {
579 self.records.lock().clone()
580 }
581
582 pub fn clear(&self) {
584 self.records.lock().clear();
585 }
586}
587
588impl Default for HostTracingSink {
589 fn default() -> Self {
590 Self::new()
591 }
592}
593
594impl TracingSink for HostTracingSink {
595 async fn new_span(&self, span: SpanMeta) -> u64 {
596 let id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
597 self.records
598 .lock()
599 .push(TraceRecord::NewSpan { id, meta: span });
600 id
601 }
602
603 async fn record(&self, span_id: u64, fields: Vec<Field>) {
604 self.records
605 .lock()
606 .push(TraceRecord::Record { span_id, fields });
607 }
608
609 async fn event(&self, event: EventMeta) {
610 self.records.lock().push(TraceRecord::Event(event));
611 }
612
613 async fn enter(&self, span_id: u64) {
614 self.records.lock().push(TraceRecord::Enter { span_id });
615 }
616
617 async fn exit(&self, span_id: u64) {
618 self.records.lock().push(TraceRecord::Exit { span_id });
619 }
620
621 async fn drop_span(&self, span_id: u64) {
622 self.records.lock().push(TraceRecord::DropSpan { span_id });
623 }
624}
625
626pub fn create_tracing_sink_dispatcher(
632 sink: HostTracingSink,
633) -> impl Fn(
634 u32,
635 u32,
636 Vec<u8>,
637) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
638+ Send
639+ Sync
640+ 'static {
641 move |_channel_id, method_id, payload| {
642 let sink = sink.clone();
643 Box::pin(async move {
644 let server = TracingSinkServer::new(sink);
645 server.dispatch(method_id, &payload).await
646 })
647 }
648}
649
650