rapace_tracing/
lib.rs

1#![doc = include_str!("../README.md")]
2#![allow(clippy::type_complexity)]
3
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use parking_lot::Mutex;
10use rapace::{Frame, RpcError, RpcSession};
11use tracing::span::{Attributes, Record};
12use tracing::{Event, Id, Subscriber};
13use tracing_subscriber::Layer;
14use tracing_subscriber::layer::Context;
15use tracing_subscriber::registry::LookupSpan;
16
17// ============================================================================
18// Facet Types (transport-agnostic)
19// ============================================================================
20
21/// A field value captured from tracing.
22#[derive(Debug, Clone, facet::Facet)]
23pub struct Field {
24    /// Field name
25    pub name: String,
26    /// Field value (stringified for v1)
27    pub value: String,
28}
29
30/// Metadata about a span.
31#[derive(Debug, Clone, facet::Facet)]
32pub struct SpanMeta {
33    /// Span name
34    pub name: String,
35    /// Target (module path)
36    pub target: String,
37    /// Level as string ("TRACE", "DEBUG", "INFO", "WARN", "ERROR")
38    pub level: String,
39    /// Source file, if available
40    pub file: Option<String>,
41    /// Line number, if available
42    pub line: Option<u32>,
43    /// Fields recorded at span creation
44    pub fields: Vec<Field>,
45}
46
47/// Metadata about an event.
48#[derive(Debug, Clone, facet::Facet)]
49pub struct EventMeta {
50    /// Event message (from the `message` field if present)
51    pub message: String,
52    /// Target (module path)
53    pub target: String,
54    /// Level as string
55    pub level: String,
56    /// Source file, if available
57    pub file: Option<String>,
58    /// Line number, if available
59    pub line: Option<u32>,
60    /// All fields including message
61    pub fields: Vec<Field>,
62    /// Parent span ID if inside a span
63    pub parent_span_id: Option<u64>,
64}
65
66// ============================================================================
67// TracingSink Service (plugin calls host)
68// ============================================================================
69
70/// Service for receiving tracing data from plugins.
71///
72/// The host implements this, the plugin calls it via RPC.
73#[allow(async_fn_in_trait)]
74#[rapace::service]
75pub trait TracingSink {
76    /// Called when a new span is created.
77    /// Returns a span ID that the plugin should use for subsequent calls.
78    async fn new_span(&self, span: crate::SpanMeta) -> u64;
79
80    /// Called when fields are recorded on an existing span.
81    async fn record(&self, span_id: u64, fields: Vec<crate::Field>);
82
83    /// Called when an event is emitted.
84    async fn event(&self, event: crate::EventMeta);
85
86    /// Called when a span is entered.
87    async fn enter(&self, span_id: u64);
88
89    /// Called when a span is exited.
90    async fn exit(&self, span_id: u64);
91
92    /// Called when a span is dropped/closed.
93    async fn drop_span(&self, span_id: u64);
94}
95
96// ============================================================================
97// TracingConfig Service (host calls plugin)
98// ============================================================================
99
100/// Service for configuring tracing in plugins.
101///
102/// The plugin implements this, the host calls it via RPC to push filter config.
103/// This allows the host to be the single source of truth for log filtering.
104#[allow(async_fn_in_trait)]
105#[rapace::service]
106pub trait TracingConfig {
107    /// Set the tracing filter.
108    ///
109    /// The filter string uses the same format as RUST_LOG (e.g., "info,mymodule=debug").
110    /// The plugin should apply this filter to all subsequent tracing calls.
111    async fn set_filter(&self, filter: String);
112}
113
114// ============================================================================
115// Plugin Side: Shared Filter State
116// ============================================================================
117
118use tracing_subscriber::EnvFilter;
119
120/// Shared filter state that can be updated by the host.
121///
122/// This is wrapped in Arc and shared between `RapaceTracingLayer` and `TracingConfigImpl`.
123#[derive(Clone)]
124pub struct SharedFilter {
125    inner: Arc<parking_lot::RwLock<EnvFilter>>,
126}
127
128impl SharedFilter {
129    /// Create a new shared filter with default (allow all) settings.
130    pub fn new() -> Self {
131        // Default to a conservative filter to avoid flooding the transport during startup.
132        // Hosts that want more verbosity can push a filter update via TracingConfig.
133        let default_filter = std::env::var("RAPACE_TRACING_DEFAULT_FILTER")
134            .ok()
135            .filter(|s| !s.trim().is_empty())
136            .unwrap_or_else(|| "warn".to_string());
137
138        let filter = EnvFilter::new(default_filter);
139        Self {
140            inner: Arc::new(parking_lot::RwLock::new(filter)),
141        }
142    }
143
144    /// Update the filter from a filter string (RUST_LOG format).
145    pub fn set_filter(&self, filter_str: &str) {
146        match EnvFilter::builder().parse(filter_str) {
147            Ok(filter) => {
148                *self.inner.write() = filter;
149            }
150            Err(e) => {
151                // Log the error but don't crash - keep existing filter
152                eprintln!("rapace-tracing: invalid filter '{}': {}", filter_str, e);
153            }
154        }
155    }
156
157    /// Check if a given level is enabled (max level check).
158    ///
159    /// This is a quick check based on the filter's max level hint.
160    /// Target-specific filtering happens through the Layer's `enabled` method.
161    pub fn max_level_enabled(&self, level: tracing::level_filters::LevelFilter) -> bool {
162        let filter = self.inner.read();
163        if let Some(max) = filter.max_level_hint() {
164            level <= max
165        } else {
166            true
167        }
168    }
169
170    /// Get the current max level hint.
171    pub fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
172        self.inner.read().max_level_hint()
173    }
174}
175
176impl Default for SharedFilter {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182// ============================================================================
183// Plugin Side: TracingConfig Implementation
184// ============================================================================
185
186/// Plugin-side implementation of TracingConfig.
187///
188/// Host calls this to push filter updates to the plugin.
189#[derive(Clone)]
190pub struct TracingConfigImpl {
191    filter: SharedFilter,
192}
193
194impl TracingConfigImpl {
195    /// Create a new TracingConfig implementation with the given shared filter.
196    pub fn new(filter: SharedFilter) -> Self {
197        Self { filter }
198    }
199}
200
201impl TracingConfig for TracingConfigImpl {
202    async fn set_filter(&self, filter: String) {
203        self.filter.set_filter(&filter);
204    }
205}
206
207/// Create a dispatcher for TracingConfig service (plugin side).
208pub fn create_tracing_config_dispatcher(
209    config: TracingConfigImpl,
210) -> impl Fn(Frame) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
211+ Send
212+ Sync
213+ 'static {
214    move |request: Frame| {
215        let config = config.clone();
216        Box::pin(async move {
217            let server = TracingConfigServer::new(config);
218            let mut response = server
219                .dispatch(request.desc.method_id, request.payload_bytes())
220                .await?;
221            response.desc.channel_id = request.desc.channel_id;
222            response.desc.msg_id = request.desc.msg_id;
223            Ok(response)
224        })
225    }
226}
227
228// ============================================================================
229// Plugin Side: RapaceTracingLayer
230// ============================================================================
231
232/// A tracing Layer that forwards spans/events to a TracingSink via RPC.
233///
234/// Install this layer in the plugin's tracing_subscriber registry to have
235/// all tracing data forwarded to the host process.
236///
237/// The layer uses a `SharedFilter` to apply host-controlled filtering locally,
238/// avoiding unnecessary RPC calls for filtered events.
239pub struct RapaceTracingLayer {
240    session: Arc<RpcSession>,
241    /// Maps local tracing span IDs to our u64 IDs used in RPC
242    span_ids: Mutex<HashMap<u64, u64>>,
243    /// Counter for generating local span IDs
244    next_span_id: AtomicU64,
245    /// Runtime handle for spawning async tasks
246    rt: tokio::runtime::Handle,
247    /// Shared filter state (updated by host via TracingConfig)
248    filter: SharedFilter,
249}
250
251impl RapaceTracingLayer {
252    /// Create a new layer that forwards to the given RPC session.
253    ///
254    /// The session should be connected to a host that implements TracingSink.
255    /// Use the returned `SharedFilter` to create a `TracingConfigImpl` for the
256    /// host to push filter updates.
257    pub fn new(session: Arc<RpcSession>, rt: tokio::runtime::Handle) -> (Self, SharedFilter) {
258        let filter = SharedFilter::new();
259        let layer = Self {
260            session,
261            span_ids: Mutex::new(HashMap::new()),
262            next_span_id: AtomicU64::new(1),
263            rt,
264            filter: filter.clone(),
265        };
266        (layer, filter)
267    }
268
269    /// Create a new layer with an existing shared filter.
270    pub fn with_filter(
271        session: Arc<RpcSession>,
272        rt: tokio::runtime::Handle,
273        filter: SharedFilter,
274    ) -> Self {
275        Self {
276            session,
277            span_ids: Mutex::new(HashMap::new()),
278            next_span_id: AtomicU64::new(1),
279            rt,
280            filter,
281        }
282    }
283
284    /// Call TracingSink.new_span via RPC (fire-and-forget from sync context).
285    fn call_new_span(&self, meta: SpanMeta) -> u64 {
286        let local_id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
287        let session = self.session.clone();
288
289        self.rt.spawn(async move {
290            let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&meta).unwrap();
291            let _ = session
292                .notify(TRACING_SINK_METHOD_ID_NEW_SPAN, request_bytes)
293                .await;
294        });
295
296        local_id
297    }
298
299    /// Call TracingSink.record via RPC.
300    fn call_record(&self, span_id: u64, fields: Vec<Field>) {
301        let session = self.session.clone();
302        self.rt.spawn(async move {
303            let request_bytes: Vec<u8> =
304                rapace::facet_postcard::to_vec(&(span_id, fields)).unwrap();
305            let _ = session
306                .notify(TRACING_SINK_METHOD_ID_RECORD, request_bytes)
307                .await;
308        });
309    }
310
311    /// Call TracingSink.event via RPC.
312    fn call_event(&self, event: EventMeta) {
313        let session = self.session.clone();
314        self.rt.spawn(async move {
315            let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&event).unwrap();
316            let _ = session
317                .notify(TRACING_SINK_METHOD_ID_EVENT, request_bytes)
318                .await;
319        });
320    }
321
322    /// Call TracingSink.enter via RPC.
323    fn call_enter(&self, span_id: u64) {
324        let session = self.session.clone();
325        self.rt.spawn(async move {
326            let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&span_id).unwrap();
327            let _ = session
328                .notify(TRACING_SINK_METHOD_ID_ENTER, request_bytes)
329                .await;
330        });
331    }
332
333    /// Call TracingSink.exit via RPC.
334    fn call_exit(&self, span_id: u64) {
335        let session = self.session.clone();
336        self.rt.spawn(async move {
337            let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&span_id).unwrap();
338            let _ = session
339                .notify(TRACING_SINK_METHOD_ID_EXIT, request_bytes)
340                .await;
341        });
342    }
343
344    /// Call TracingSink.drop_span via RPC.
345    fn call_drop_span(&self, span_id: u64) {
346        let session = self.session.clone();
347        self.rt.spawn(async move {
348            let request_bytes: Vec<u8> = rapace::facet_postcard::to_vec(&span_id).unwrap();
349            let _ = session
350                .notify(TRACING_SINK_METHOD_ID_DROP_SPAN, request_bytes)
351                .await;
352        });
353    }
354}
355
356impl<S> Layer<S> for RapaceTracingLayer
357where
358    S: Subscriber + for<'a> LookupSpan<'a>,
359{
360    fn enabled(&self, metadata: &tracing::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
361        // Avoid infinite recursion: don't forward events about rapace_tracing itself
362        // or rapace_core (which handles the RPC session internals)
363        let target = metadata.target();
364        if target.starts_with("rapace_tracing")
365            || target.starts_with("rapace_core")
366            || target.starts_with("rapace_transport_shm")
367        {
368            return false;
369        }
370
371        // Check against the host-controlled filter
372        let level = match *metadata.level() {
373            tracing::Level::ERROR => tracing::level_filters::LevelFilter::ERROR,
374            tracing::Level::WARN => tracing::level_filters::LevelFilter::WARN,
375            tracing::Level::INFO => tracing::level_filters::LevelFilter::INFO,
376            tracing::Level::DEBUG => tracing::level_filters::LevelFilter::DEBUG,
377            tracing::Level::TRACE => tracing::level_filters::LevelFilter::TRACE,
378        };
379        self.filter.max_level_enabled(level)
380    }
381
382    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
383        let meta = attrs.metadata();
384
385        // Collect fields
386        let mut visitor = FieldVisitor::new();
387        attrs.record(&mut visitor);
388
389        let span_meta = SpanMeta {
390            name: meta.name().to_string(),
391            target: meta.target().to_string(),
392            level: meta.level().to_string(),
393            file: meta.file().map(|s| s.to_string()),
394            line: meta.line(),
395            fields: visitor.fields,
396        };
397
398        let local_id = self.call_new_span(span_meta);
399
400        // Store mapping from tracing's Id to our local ID
401        self.span_ids.lock().insert(id.into_u64(), local_id);
402    }
403
404    fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
405        let span_id = match self.span_ids.lock().get(&id.into_u64()) {
406            Some(&id) => id,
407            None => return,
408        };
409
410        let mut visitor = FieldVisitor::new();
411        values.record(&mut visitor);
412
413        if !visitor.fields.is_empty() {
414            self.call_record(span_id, visitor.fields);
415        }
416    }
417
418    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
419        let meta = event.metadata();
420
421        // Collect fields
422        let mut visitor = FieldVisitor::new();
423        event.record(&mut visitor);
424
425        // Extract message from fields
426        let message = visitor
427            .fields
428            .iter()
429            .find(|f| f.name == "message")
430            .map(|f| f.value.clone())
431            .unwrap_or_default();
432
433        // Get parent span ID if any
434        let parent_span_id = ctx
435            .current_span()
436            .id()
437            .and_then(|id| self.span_ids.lock().get(&id.into_u64()).copied());
438
439        let event_meta = EventMeta {
440            message,
441            target: meta.target().to_string(),
442            level: meta.level().to_string(),
443            file: meta.file().map(|s| s.to_string()),
444            line: meta.line(),
445            fields: visitor.fields,
446            parent_span_id,
447        };
448
449        self.call_event(event_meta);
450    }
451
452    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
453        if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
454            self.call_enter(span_id);
455        }
456    }
457
458    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
459        if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
460            self.call_exit(span_id);
461        }
462    }
463
464    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
465        if let Some(span_id) = self.span_ids.lock().remove(&id.into_u64()) {
466            self.call_drop_span(span_id);
467        }
468    }
469}
470
471/// Visitor for collecting tracing fields into our Field type.
472struct FieldVisitor {
473    fields: Vec<Field>,
474}
475
476impl FieldVisitor {
477    fn new() -> Self {
478        Self { fields: Vec::new() }
479    }
480}
481
482impl tracing::field::Visit for FieldVisitor {
483    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
484        self.fields.push(Field {
485            name: field.name().to_string(),
486            value: format!("{:?}", value),
487        });
488    }
489
490    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
491        self.fields.push(Field {
492            name: field.name().to_string(),
493            value: value.to_string(),
494        });
495    }
496
497    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
498        self.fields.push(Field {
499            name: field.name().to_string(),
500            value: value.to_string(),
501        });
502    }
503
504    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
505        self.fields.push(Field {
506            name: field.name().to_string(),
507            value: value.to_string(),
508        });
509    }
510
511    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
512        self.fields.push(Field {
513            name: field.name().to_string(),
514            value: value.to_string(),
515        });
516    }
517}
518
519// ============================================================================
520// Host Side: TracingSink Implementation
521// ============================================================================
522
523/// Collected trace data for inspection/testing.
524#[derive(Debug, Clone)]
525pub enum TraceRecord {
526    NewSpan { id: u64, meta: SpanMeta },
527    Record { span_id: u64, fields: Vec<Field> },
528    Event(EventMeta),
529    Enter { span_id: u64 },
530    Exit { span_id: u64 },
531    DropSpan { span_id: u64 },
532}
533
534/// Host-side implementation of TracingSink.
535///
536/// Collects all trace data into a buffer for inspection/testing.
537/// In a real application, you might forward to a real tracing subscriber.
538#[derive(Clone)]
539pub struct HostTracingSink {
540    records: Arc<Mutex<Vec<TraceRecord>>>,
541    next_span_id: Arc<AtomicU64>,
542}
543
544impl HostTracingSink {
545    /// Create a new sink that collects trace data.
546    pub fn new() -> Self {
547        Self {
548            records: Arc::new(Mutex::new(Vec::new())),
549            next_span_id: Arc::new(AtomicU64::new(1)),
550        }
551    }
552
553    /// Get all collected trace records.
554    pub fn records(&self) -> Vec<TraceRecord> {
555        self.records.lock().clone()
556    }
557
558    /// Clear all collected records.
559    pub fn clear(&self) {
560        self.records.lock().clear();
561    }
562}
563
564impl Default for HostTracingSink {
565    fn default() -> Self {
566        Self::new()
567    }
568}
569
570impl TracingSink for HostTracingSink {
571    async fn new_span(&self, span: SpanMeta) -> u64 {
572        let id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
573        self.records
574            .lock()
575            .push(TraceRecord::NewSpan { id, meta: span });
576        id
577    }
578
579    async fn record(&self, span_id: u64, fields: Vec<Field>) {
580        self.records
581            .lock()
582            .push(TraceRecord::Record { span_id, fields });
583    }
584
585    async fn event(&self, event: EventMeta) {
586        self.records.lock().push(TraceRecord::Event(event));
587    }
588
589    async fn enter(&self, span_id: u64) {
590        self.records.lock().push(TraceRecord::Enter { span_id });
591    }
592
593    async fn exit(&self, span_id: u64) {
594        self.records.lock().push(TraceRecord::Exit { span_id });
595    }
596
597    async fn drop_span(&self, span_id: u64) {
598        self.records.lock().push(TraceRecord::DropSpan { span_id });
599    }
600}
601
602// ============================================================================
603// Dispatcher Helper
604// ============================================================================
605
606/// Create a dispatcher for TracingSink service.
607pub fn create_tracing_sink_dispatcher(
608    sink: HostTracingSink,
609) -> impl Fn(Frame) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
610+ Send
611+ Sync
612+ 'static {
613    move |request: Frame| {
614        let sink = sink.clone();
615        Box::pin(async move {
616            let server = TracingSinkServer::new(sink);
617            let mut response = server
618                .dispatch(request.desc.method_id, request.payload_bytes())
619                .await?;
620            response.desc.channel_id = request.desc.channel_id;
621            response.desc.msg_id = request.desc.msg_id;
622            Ok(response)
623        })
624    }
625}
626
627// TracingSinkClient is generated by the rapace::service attribute.
628// Use TracingSinkClient::new(session) to create a client.