1#![allow(clippy::type_complexity)]
2use std::collections::HashMap;
52use std::pin::Pin;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, Ordering};
55
56use parking_lot::Mutex;
57use rapace::{Frame, RpcError, RpcSession, Transport};
58use tracing::span::{Attributes, Record};
59use tracing::{Event, Id, Subscriber};
60use tracing_subscriber::Layer;
61use tracing_subscriber::layer::Context;
62use tracing_subscriber::registry::LookupSpan;
63
/// A single captured field: its name plus the value rendered as text.
#[derive(Debug, Clone, facet::Facet)]
pub struct Field {
    /// Name of the field.
    pub name: String,
    /// Value of the field, already converted to a string.
    pub value: String,
}
76
/// Description of a span, passed to `TracingSink::new_span`.
#[derive(Debug, Clone, facet::Facet)]
pub struct SpanMeta {
    /// Span name.
    pub name: String,
    /// Target the span was created under.
    pub target: String,
    /// Severity level, rendered as text.
    pub level: String,
    /// Source file of the span's callsite, when known.
    pub file: Option<String>,
    /// Source line of the span's callsite, when known.
    pub line: Option<u32>,
    /// Fields captured when the span was created.
    pub fields: Vec<Field>,
}
93
/// Description of an event, passed to `TracingSink::event`.
#[derive(Debug, Clone, facet::Facet)]
pub struct EventMeta {
    /// Event message. `RapaceTracingLayer` fills this from the field named
    /// `message`, or leaves it empty when the event has no such field.
    pub message: String,
    /// Target the event was emitted under.
    pub target: String,
    /// Severity level, rendered as text.
    pub level: String,
    /// Source file of the event's callsite, when known.
    pub file: Option<String>,
    /// Source line of the event's callsite, when known.
    pub line: Option<u32>,
    /// All fields captured from the event.
    pub fields: Vec<Field>,
    /// Identifier of the enclosing span, if there is one.
    pub parent_span_id: Option<u64>,
}
112
#[allow(async_fn_in_trait)]
/// RPC service through which span and event activity is reported.
///
/// In this file, `RapaceTracingLayer` is the caller (through the generated
/// client) and `HostTracingSink` is an implementation.
#[rapace::service]
pub trait TracingSink {
    /// Announces a new span described by `span` and returns an identifier.
    async fn new_span(&self, span: crate::SpanMeta) -> u64;

    /// Reports additional `fields` recorded on the span `span_id`.
    async fn record(&self, span_id: u64, fields: Vec<crate::Field>);

    /// Reports an event.
    async fn event(&self, event: crate::EventMeta);

    /// Reports that the span `span_id` was entered.
    async fn enter(&self, span_id: u64);

    /// Reports that the span `span_id` was exited.
    async fn exit(&self, span_id: u64);

    /// Reports that the span `span_id` was closed and will not be used again.
    async fn drop_span(&self, span_id: u64);
}
142
#[allow(async_fn_in_trait)]
/// RPC service for adjusting the tracing filter.
///
/// `TracingConfigImpl` is the implementation provided in this file.
#[rapace::service]
pub trait TracingConfig {
    /// Requests that the active filter be replaced by the one described by
    /// `filter` (a filter directive string).
    async fn set_filter(&self, filter: String);
}
160
161use tracing_subscriber::EnvFilter;
166
/// Cloneable handle to an `EnvFilter` stored behind `Arc<RwLock<..>>`.
///
/// Clones share the same underlying filter, so an update made through one
/// handle is visible through all of them.
#[derive(Clone)]
pub struct SharedFilter {
    /// The shared, lock-protected filter.
    inner: Arc<parking_lot::RwLock<EnvFilter>>,
}
174
175impl SharedFilter {
176 pub fn new() -> Self {
178 let filter = EnvFilter::new("trace");
182 Self {
183 inner: Arc::new(parking_lot::RwLock::new(filter)),
184 }
185 }
186
187 pub fn set_filter(&self, filter_str: &str) {
189 match EnvFilter::builder().parse(filter_str) {
190 Ok(filter) => {
191 *self.inner.write() = filter;
192 }
193 Err(e) => {
194 eprintln!("rapace-tracing: invalid filter '{}': {}", filter_str, e);
196 }
197 }
198 }
199
200 pub fn max_level_enabled(&self, level: tracing::level_filters::LevelFilter) -> bool {
205 let filter = self.inner.read();
206 if let Some(max) = filter.max_level_hint() {
207 level <= max
208 } else {
209 true
210 }
211 }
212
213 pub fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
215 self.inner.read().max_level_hint()
216 }
217}
218
219impl Default for SharedFilter {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
/// `TracingConfig` implementation that applies requests to a `SharedFilter`.
#[derive(Clone)]
pub struct TracingConfigImpl {
    /// Filter handle updated by `set_filter`.
    filter: SharedFilter,
}
236
237impl TracingConfigImpl {
238 pub fn new(filter: SharedFilter) -> Self {
240 Self { filter }
241 }
242}
243
244impl TracingConfig for TracingConfigImpl {
245 async fn set_filter(&self, filter: String) {
246 self.filter.set_filter(&filter);
247 }
248}
249
250pub fn create_tracing_config_dispatcher(
252 config: TracingConfigImpl,
253) -> impl Fn(
254 u32,
255 u32,
256 Vec<u8>,
257) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
258+ Send
259+ Sync
260+ 'static {
261 move |_channel_id, method_id, payload| {
262 let config = config.clone();
263 Box::pin(async move {
264 let server = TracingConfigServer::new(config);
265 server.dispatch(method_id, &payload).await
266 })
267 }
268}
269
/// `tracing_subscriber` layer that forwards span and event activity over an
/// RPC session via the `TracingSink` client.
pub struct RapaceTracingLayer<T: Transport + Send + Sync + 'static> {
    /// Session used to construct `TracingSinkClient`s.
    session: Arc<RpcSession<T>>,
    /// Maps a `tracing` span id (`Id::into_u64`) to the id allocated locally
    /// for that span.
    span_ids: Mutex<HashMap<u64, u64>>,
    /// Counter for locally allocated span ids; starts at 1.
    next_span_id: AtomicU64,
    /// Runtime handle on which the RPC calls are spawned.
    rt: tokio::runtime::Handle,
    /// Filter consulted by `enabled`.
    filter: SharedFilter,
}
292
293impl<T: Transport + Send + Sync + 'static> RapaceTracingLayer<T> {
294 pub fn new(session: Arc<RpcSession<T>>, rt: tokio::runtime::Handle) -> (Self, SharedFilter) {
300 let filter = SharedFilter::new();
301 let layer = Self {
302 session,
303 span_ids: Mutex::new(HashMap::new()),
304 next_span_id: AtomicU64::new(1),
305 rt,
306 filter: filter.clone(),
307 };
308 (layer, filter)
309 }
310
311 pub fn with_filter(
313 session: Arc<RpcSession<T>>,
314 rt: tokio::runtime::Handle,
315 filter: SharedFilter,
316 ) -> Self {
317 Self {
318 session,
319 span_ids: Mutex::new(HashMap::new()),
320 next_span_id: AtomicU64::new(1),
321 rt,
322 filter,
323 }
324 }
325
326 fn call_new_span(&self, meta: SpanMeta) -> u64 {
328 let client = TracingSinkClient::new(self.session.clone());
329 let local_id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
330
331 self.rt.spawn(async move {
332 let _ = client.new_span(meta).await;
333 });
334
335 local_id
336 }
337
338 fn call_record(&self, span_id: u64, fields: Vec<Field>) {
340 let client = TracingSinkClient::new(self.session.clone());
341 self.rt.spawn(async move {
342 let _ = client.record(span_id, fields).await;
343 });
344 }
345
346 fn call_event(&self, event: EventMeta) {
348 let client = TracingSinkClient::new(self.session.clone());
349 self.rt.spawn(async move {
350 let _ = client.event(event).await;
351 });
352 }
353
354 fn call_enter(&self, span_id: u64) {
356 let client = TracingSinkClient::new(self.session.clone());
357 self.rt.spawn(async move {
358 let _ = client.enter(span_id).await;
359 });
360 }
361
362 fn call_exit(&self, span_id: u64) {
364 let client = TracingSinkClient::new(self.session.clone());
365 self.rt.spawn(async move {
366 let _ = client.exit(span_id).await;
367 });
368 }
369
370 fn call_drop_span(&self, span_id: u64) {
372 let client = TracingSinkClient::new(self.session.clone());
373 self.rt.spawn(async move {
374 let _ = client.drop_span(span_id).await;
375 });
376 }
377}
378
379impl<S, T> Layer<S> for RapaceTracingLayer<T>
380where
381 S: Subscriber + for<'a> LookupSpan<'a>,
382 T: Transport + Send + Sync + 'static,
383{
384 fn enabled(&self, metadata: &tracing::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
385 let level = match *metadata.level() {
387 tracing::Level::ERROR => tracing::level_filters::LevelFilter::ERROR,
388 tracing::Level::WARN => tracing::level_filters::LevelFilter::WARN,
389 tracing::Level::INFO => tracing::level_filters::LevelFilter::INFO,
390 tracing::Level::DEBUG => tracing::level_filters::LevelFilter::DEBUG,
391 tracing::Level::TRACE => tracing::level_filters::LevelFilter::TRACE,
392 };
393 self.filter.max_level_enabled(level)
394 }
395
396 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
397 let meta = attrs.metadata();
398
399 let mut visitor = FieldVisitor::new();
401 attrs.record(&mut visitor);
402
403 let span_meta = SpanMeta {
404 name: meta.name().to_string(),
405 target: meta.target().to_string(),
406 level: meta.level().to_string(),
407 file: meta.file().map(|s| s.to_string()),
408 line: meta.line(),
409 fields: visitor.fields,
410 };
411
412 let local_id = self.call_new_span(span_meta);
413
414 self.span_ids.lock().insert(id.into_u64(), local_id);
416 }
417
418 fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
419 let span_id = match self.span_ids.lock().get(&id.into_u64()) {
420 Some(&id) => id,
421 None => return,
422 };
423
424 let mut visitor = FieldVisitor::new();
425 values.record(&mut visitor);
426
427 if !visitor.fields.is_empty() {
428 self.call_record(span_id, visitor.fields);
429 }
430 }
431
432 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
433 let meta = event.metadata();
434
435 let mut visitor = FieldVisitor::new();
437 event.record(&mut visitor);
438
439 let message = visitor
441 .fields
442 .iter()
443 .find(|f| f.name == "message")
444 .map(|f| f.value.clone())
445 .unwrap_or_default();
446
447 let parent_span_id = ctx
449 .current_span()
450 .id()
451 .and_then(|id| self.span_ids.lock().get(&id.into_u64()).copied());
452
453 let event_meta = EventMeta {
454 message,
455 target: meta.target().to_string(),
456 level: meta.level().to_string(),
457 file: meta.file().map(|s| s.to_string()),
458 line: meta.line(),
459 fields: visitor.fields,
460 parent_span_id,
461 };
462
463 self.call_event(event_meta);
464 }
465
466 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
467 if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
468 self.call_enter(span_id);
469 }
470 }
471
472 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
473 if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
474 self.call_exit(span_id);
475 }
476 }
477
478 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
479 if let Some(span_id) = self.span_ids.lock().remove(&id.into_u64()) {
480 self.call_drop_span(span_id);
481 }
482 }
483}
484
/// `tracing` field visitor that collects every visited field as a [`Field`].
struct FieldVisitor {
    /// Fields collected so far, in visit order.
    fields: Vec<Field>,
}
489
490impl FieldVisitor {
491 fn new() -> Self {
492 Self { fields: Vec::new() }
493 }
494}
495
496impl tracing::field::Visit for FieldVisitor {
497 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
498 self.fields.push(Field {
499 name: field.name().to_string(),
500 value: format!("{:?}", value),
501 });
502 }
503
504 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
505 self.fields.push(Field {
506 name: field.name().to_string(),
507 value: value.to_string(),
508 });
509 }
510
511 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
512 self.fields.push(Field {
513 name: field.name().to_string(),
514 value: value.to_string(),
515 });
516 }
517
518 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
519 self.fields.push(Field {
520 name: field.name().to_string(),
521 value: value.to_string(),
522 });
523 }
524
525 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
526 self.fields.push(Field {
527 name: field.name().to_string(),
528 value: value.to_string(),
529 });
530 }
531}
532
/// One entry stored by `HostTracingSink`; there is a variant for each
/// `TracingSink` method.
#[derive(Debug, Clone)]
pub enum TraceRecord {
    /// A span was announced; `id` is the identifier the sink assigned to it.
    NewSpan { id: u64, meta: SpanMeta },
    /// Fields were recorded on a span.
    Record { span_id: u64, fields: Vec<Field> },
    /// An event was reported.
    Event(EventMeta),
    /// A span was entered.
    Enter { span_id: u64 },
    /// A span was exited.
    Exit { span_id: u64 },
    /// A span was dropped.
    DropSpan { span_id: u64 },
}
547
/// `TracingSink` implementation that stores every call as a [`TraceRecord`].
///
/// Both fields are behind `Arc`, so clones share the same record list and
/// id counter.
#[derive(Clone)]
pub struct HostTracingSink {
    /// Records collected so far, in arrival order.
    records: Arc<Mutex<Vec<TraceRecord>>>,
    /// Counter used to assign span ids; starts at 1.
    next_span_id: Arc<AtomicU64>,
}
557
558impl HostTracingSink {
559 pub fn new() -> Self {
561 Self {
562 records: Arc::new(Mutex::new(Vec::new())),
563 next_span_id: Arc::new(AtomicU64::new(1)),
564 }
565 }
566
567 pub fn records(&self) -> Vec<TraceRecord> {
569 self.records.lock().clone()
570 }
571
572 pub fn clear(&self) {
574 self.records.lock().clear();
575 }
576}
577
578impl Default for HostTracingSink {
579 fn default() -> Self {
580 Self::new()
581 }
582}
583
584impl TracingSink for HostTracingSink {
585 async fn new_span(&self, span: SpanMeta) -> u64 {
586 let id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
587 self.records
588 .lock()
589 .push(TraceRecord::NewSpan { id, meta: span });
590 id
591 }
592
593 async fn record(&self, span_id: u64, fields: Vec<Field>) {
594 self.records
595 .lock()
596 .push(TraceRecord::Record { span_id, fields });
597 }
598
599 async fn event(&self, event: EventMeta) {
600 self.records.lock().push(TraceRecord::Event(event));
601 }
602
603 async fn enter(&self, span_id: u64) {
604 self.records.lock().push(TraceRecord::Enter { span_id });
605 }
606
607 async fn exit(&self, span_id: u64) {
608 self.records.lock().push(TraceRecord::Exit { span_id });
609 }
610
611 async fn drop_span(&self, span_id: u64) {
612 self.records.lock().push(TraceRecord::DropSpan { span_id });
613 }
614}
615
616pub fn create_tracing_sink_dispatcher(
622 sink: HostTracingSink,
623) -> impl Fn(
624 u32,
625 u32,
626 Vec<u8>,
627) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
628+ Send
629+ Sync
630+ 'static {
631 move |_channel_id, method_id, payload| {
632 let sink = sink.clone();
633 Box::pin(async move {
634 let server = TracingSinkServer::new(sink);
635 server.dispatch(method_id, &payload).await
636 })
637 }
638}
639
640