rapace_tracing/
lib.rs

1#![allow(clippy::type_complexity)]
2//! Tracing subscriber that forwards spans/events over rapace RPC.
3//!
4//! This crate enables plugins to use `tracing` normally while having all
5//! spans and events collected in the host process via rapace RPC.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────────────┐
11//! │                             PLUGIN PROCESS                              │
12//! │                                                                         │
13//! │   tracing::info!("hello") ──► RapaceTracingLayer ──► TracingSinkClient ─┤
14//! │                                      ▲                                  │
15//! │                                      │                                  │
16//! │                          TracingConfigServer ◄──────────────────────────┤
17//! │                          (applies host's filter)                        │
18//! └────────────────────────────────────────────────────────────────────────┬┘
19//!                                                                          │
20//!                              rapace transport (TCP/Unix/SHM)             │
21//!                                                                          │
22//! ┌────────────────────────────────────────────────────────────────────────┴┐
23//! │                              HOST PROCESS                               │
24//! │                                                                         │
25//! │   TracingSinkServer ──► HostTracingSink ──► tracing_subscriber / logs  │
26//! │                                                                         │
27//! │   TracingConfigClient ──► pushes filter changes to plugin              │
28//! │                                                                         │
29//! └─────────────────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! # Filter Flow
33//!
34//! The host is the single source of truth for log filtering:
35//! 1. Host decides what log levels/targets are enabled
36//! 2. Host pushes filter config to plugin via `TracingConfig::set_filter`
37//! 3. Plugin applies the filter locally (no spam over RPC)
38//! 4. When host changes filters dynamically, it pushes the update
39//!
40//! # Example
41//!
42//! ```ignore
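//! // Plugin side: install the layer. `new` also hands back the `SharedFilter`
//! // used to build a `TracingConfigImpl`, so the host can push filter updates.
//! let (layer, shared_filter) = RapaceTracingLayer::new(session, rt_handle);
//! tracing_subscriber::registry().with(layer).init();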
46//!
47//! // Now all tracing calls are forwarded to the host
48//! tracing::info!("hello from plugin");
49//! ```
50
51use std::collections::HashMap;
52use std::pin::Pin;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, Ordering};
55
56use parking_lot::Mutex;
57use rapace::{Frame, RpcError, RpcSession, Transport};
58use tracing::span::{Attributes, Record};
59use tracing::{Event, Id, Subscriber};
60use tracing_subscriber::Layer;
61use tracing_subscriber::layer::Context;
62use tracing_subscriber::registry::LookupSpan;
63
64// ============================================================================
65// Facet Types (transport-agnostic)
66// ============================================================================
67
68/// A field value captured from tracing.
69#[derive(Debug, Clone, facet::Facet)]
70pub struct Field {
71    /// Field name
72    pub name: String,
73    /// Field value (stringified for v1)
74    pub value: String,
75}
76
77/// Metadata about a span.
78#[derive(Debug, Clone, facet::Facet)]
79pub struct SpanMeta {
80    /// Span name
81    pub name: String,
82    /// Target (module path)
83    pub target: String,
84    /// Level as string ("TRACE", "DEBUG", "INFO", "WARN", "ERROR")
85    pub level: String,
86    /// Source file, if available
87    pub file: Option<String>,
88    /// Line number, if available
89    pub line: Option<u32>,
90    /// Fields recorded at span creation
91    pub fields: Vec<Field>,
92}
93
94/// Metadata about an event.
95#[derive(Debug, Clone, facet::Facet)]
96pub struct EventMeta {
97    /// Event message (from the `message` field if present)
98    pub message: String,
99    /// Target (module path)
100    pub target: String,
101    /// Level as string
102    pub level: String,
103    /// Source file, if available
104    pub file: Option<String>,
105    /// Line number, if available
106    pub line: Option<u32>,
107    /// All fields including message
108    pub fields: Vec<Field>,
109    /// Parent span ID if inside a span
110    pub parent_span_id: Option<u64>,
111}
112
113// ============================================================================
114// TracingSink Service (plugin calls host)
115// ============================================================================
116
117/// Service for receiving tracing data from plugins.
118///
119/// The host implements this, the plugin calls it via RPC.
120#[allow(async_fn_in_trait)]
121#[rapace::service]
122pub trait TracingSink {
123    /// Called when a new span is created.
124    /// Returns a span ID that the plugin should use for subsequent calls.
125    async fn new_span(&self, span: crate::SpanMeta) -> u64;
126
127    /// Called when fields are recorded on an existing span.
128    async fn record(&self, span_id: u64, fields: Vec<crate::Field>);
129
130    /// Called when an event is emitted.
131    async fn event(&self, event: crate::EventMeta);
132
133    /// Called when a span is entered.
134    async fn enter(&self, span_id: u64);
135
136    /// Called when a span is exited.
137    async fn exit(&self, span_id: u64);
138
139    /// Called when a span is dropped/closed.
140    async fn drop_span(&self, span_id: u64);
141}
142
143// ============================================================================
144// TracingConfig Service (host calls plugin)
145// ============================================================================
146
147/// Service for configuring tracing in plugins.
148///
149/// The plugin implements this, the host calls it via RPC to push filter config.
150/// This allows the host to be the single source of truth for log filtering.
151#[allow(async_fn_in_trait)]
152#[rapace::service]
153pub trait TracingConfig {
154    /// Set the tracing filter.
155    ///
156    /// The filter string uses the same format as RUST_LOG (e.g., "info,mymodule=debug").
157    /// The plugin should apply this filter to all subsequent tracing calls.
158    async fn set_filter(&self, filter: String);
159}
160
161// ============================================================================
162// Plugin Side: Shared Filter State
163// ============================================================================
164
165use tracing_subscriber::EnvFilter;
166
167/// Shared filter state that can be updated by the host.
168///
169/// This is wrapped in Arc and shared between `RapaceTracingLayer` and `TracingConfigImpl`.
170#[derive(Clone)]
171pub struct SharedFilter {
172    inner: Arc<parking_lot::RwLock<EnvFilter>>,
173}
174
175impl SharedFilter {
176    /// Create a new shared filter with default (allow all) settings.
177    pub fn new() -> Self {
178        // Default to allowing everything - host will push the real filter
179        // Note: EnvFilter::builder().parse("") returns a filter that blocks everything,
180        // so we explicitly use "trace" to allow all levels.
181        let filter = EnvFilter::new("trace");
182        Self {
183            inner: Arc::new(parking_lot::RwLock::new(filter)),
184        }
185    }
186
187    /// Update the filter from a filter string (RUST_LOG format).
188    pub fn set_filter(&self, filter_str: &str) {
189        match EnvFilter::builder().parse(filter_str) {
190            Ok(filter) => {
191                *self.inner.write() = filter;
192            }
193            Err(e) => {
194                // Log the error but don't crash - keep existing filter
195                eprintln!("rapace-tracing: invalid filter '{}': {}", filter_str, e);
196            }
197        }
198    }
199
200    /// Check if a given level is enabled (max level check).
201    ///
202    /// This is a quick check based on the filter's max level hint.
203    /// Target-specific filtering happens through the Layer's `enabled` method.
204    pub fn max_level_enabled(&self, level: tracing::level_filters::LevelFilter) -> bool {
205        let filter = self.inner.read();
206        if let Some(max) = filter.max_level_hint() {
207            level <= max
208        } else {
209            true
210        }
211    }
212
213    /// Get the current max level hint.
214    pub fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
215        self.inner.read().max_level_hint()
216    }
217}
218
219impl Default for SharedFilter {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225// ============================================================================
226// Plugin Side: TracingConfig Implementation
227// ============================================================================
228
229/// Plugin-side implementation of TracingConfig.
230///
231/// Host calls this to push filter updates to the plugin.
232#[derive(Clone)]
233pub struct TracingConfigImpl {
234    filter: SharedFilter,
235}
236
237impl TracingConfigImpl {
238    /// Create a new TracingConfig implementation with the given shared filter.
239    pub fn new(filter: SharedFilter) -> Self {
240        Self { filter }
241    }
242}
243
244impl TracingConfig for TracingConfigImpl {
245    async fn set_filter(&self, filter: String) {
246        self.filter.set_filter(&filter);
247    }
248}
249
250/// Create a dispatcher for TracingConfig service (plugin side).
251pub fn create_tracing_config_dispatcher(
252    config: TracingConfigImpl,
253) -> impl Fn(
254    u32,
255    u32,
256    Vec<u8>,
257) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
258+ Send
259+ Sync
260+ 'static {
261    move |_channel_id, method_id, payload| {
262        let config = config.clone();
263        Box::pin(async move {
264            let server = TracingConfigServer::new(config);
265            server.dispatch(method_id, &payload).await
266        })
267    }
268}
269
270// ============================================================================
271// Plugin Side: RapaceTracingLayer
272// ============================================================================
273
274/// A tracing Layer that forwards spans/events to a TracingSink via RPC.
275///
276/// Install this layer in the plugin's tracing_subscriber registry to have
277/// all tracing data forwarded to the host process.
278///
279/// The layer uses a `SharedFilter` to apply host-controlled filtering locally,
280/// avoiding unnecessary RPC calls for filtered events.
281pub struct RapaceTracingLayer<T: Transport + Send + Sync + 'static> {
282    session: Arc<RpcSession<T>>,
283    /// Maps local tracing span IDs to our u64 IDs used in RPC
284    span_ids: Mutex<HashMap<u64, u64>>,
285    /// Counter for generating local span IDs
286    next_span_id: AtomicU64,
287    /// Runtime handle for spawning async tasks
288    rt: tokio::runtime::Handle,
289    /// Shared filter state (updated by host via TracingConfig)
290    filter: SharedFilter,
291}
292
293impl<T: Transport + Send + Sync + 'static> RapaceTracingLayer<T> {
294    /// Create a new layer that forwards to the given RPC session.
295    ///
296    /// The session should be connected to a host that implements TracingSink.
297    /// Use the returned `SharedFilter` to create a `TracingConfigImpl` for the
298    /// host to push filter updates.
299    pub fn new(session: Arc<RpcSession<T>>, rt: tokio::runtime::Handle) -> (Self, SharedFilter) {
300        let filter = SharedFilter::new();
301        let layer = Self {
302            session,
303            span_ids: Mutex::new(HashMap::new()),
304            next_span_id: AtomicU64::new(1),
305            rt,
306            filter: filter.clone(),
307        };
308        (layer, filter)
309    }
310
311    /// Create a new layer with an existing shared filter.
312    pub fn with_filter(
313        session: Arc<RpcSession<T>>,
314        rt: tokio::runtime::Handle,
315        filter: SharedFilter,
316    ) -> Self {
317        Self {
318            session,
319            span_ids: Mutex::new(HashMap::new()),
320            next_span_id: AtomicU64::new(1),
321            rt,
322            filter,
323        }
324    }
325
326    /// Call TracingSink.new_span via RPC (fire-and-forget from sync context).
327    fn call_new_span(&self, meta: SpanMeta) -> u64 {
328        let client = TracingSinkClient::new(self.session.clone());
329        let local_id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
330
331        self.rt.spawn(async move {
332            let _ = client.new_span(meta).await;
333        });
334
335        local_id
336    }
337
338    /// Call TracingSink.record via RPC.
339    fn call_record(&self, span_id: u64, fields: Vec<Field>) {
340        let client = TracingSinkClient::new(self.session.clone());
341        self.rt.spawn(async move {
342            let _ = client.record(span_id, fields).await;
343        });
344    }
345
346    /// Call TracingSink.event via RPC.
347    fn call_event(&self, event: EventMeta) {
348        let client = TracingSinkClient::new(self.session.clone());
349        self.rt.spawn(async move {
350            let _ = client.event(event).await;
351        });
352    }
353
354    /// Call TracingSink.enter via RPC.
355    fn call_enter(&self, span_id: u64) {
356        let client = TracingSinkClient::new(self.session.clone());
357        self.rt.spawn(async move {
358            let _ = client.enter(span_id).await;
359        });
360    }
361
362    /// Call TracingSink.exit via RPC.
363    fn call_exit(&self, span_id: u64) {
364        let client = TracingSinkClient::new(self.session.clone());
365        self.rt.spawn(async move {
366            let _ = client.exit(span_id).await;
367        });
368    }
369
370    /// Call TracingSink.drop_span via RPC.
371    fn call_drop_span(&self, span_id: u64) {
372        let client = TracingSinkClient::new(self.session.clone());
373        self.rt.spawn(async move {
374            let _ = client.drop_span(span_id).await;
375        });
376    }
377}
378
379impl<S, T> Layer<S> for RapaceTracingLayer<T>
380where
381    S: Subscriber + for<'a> LookupSpan<'a>,
382    T: Transport + Send + Sync + 'static,
383{
384    fn enabled(&self, metadata: &tracing::Metadata<'_>, _ctx: Context<'_, S>) -> bool {
385        // Check against the host-controlled filter
386        let level = match *metadata.level() {
387            tracing::Level::ERROR => tracing::level_filters::LevelFilter::ERROR,
388            tracing::Level::WARN => tracing::level_filters::LevelFilter::WARN,
389            tracing::Level::INFO => tracing::level_filters::LevelFilter::INFO,
390            tracing::Level::DEBUG => tracing::level_filters::LevelFilter::DEBUG,
391            tracing::Level::TRACE => tracing::level_filters::LevelFilter::TRACE,
392        };
393        self.filter.max_level_enabled(level)
394    }
395
396    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
397        let meta = attrs.metadata();
398
399        // Collect fields
400        let mut visitor = FieldVisitor::new();
401        attrs.record(&mut visitor);
402
403        let span_meta = SpanMeta {
404            name: meta.name().to_string(),
405            target: meta.target().to_string(),
406            level: meta.level().to_string(),
407            file: meta.file().map(|s| s.to_string()),
408            line: meta.line(),
409            fields: visitor.fields,
410        };
411
412        let local_id = self.call_new_span(span_meta);
413
414        // Store mapping from tracing's Id to our local ID
415        self.span_ids.lock().insert(id.into_u64(), local_id);
416    }
417
418    fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
419        let span_id = match self.span_ids.lock().get(&id.into_u64()) {
420            Some(&id) => id,
421            None => return,
422        };
423
424        let mut visitor = FieldVisitor::new();
425        values.record(&mut visitor);
426
427        if !visitor.fields.is_empty() {
428            self.call_record(span_id, visitor.fields);
429        }
430    }
431
432    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
433        let meta = event.metadata();
434
435        // Collect fields
436        let mut visitor = FieldVisitor::new();
437        event.record(&mut visitor);
438
439        // Extract message from fields
440        let message = visitor
441            .fields
442            .iter()
443            .find(|f| f.name == "message")
444            .map(|f| f.value.clone())
445            .unwrap_or_default();
446
447        // Get parent span ID if any
448        let parent_span_id = ctx
449            .current_span()
450            .id()
451            .and_then(|id| self.span_ids.lock().get(&id.into_u64()).copied());
452
453        let event_meta = EventMeta {
454            message,
455            target: meta.target().to_string(),
456            level: meta.level().to_string(),
457            file: meta.file().map(|s| s.to_string()),
458            line: meta.line(),
459            fields: visitor.fields,
460            parent_span_id,
461        };
462
463        self.call_event(event_meta);
464    }
465
466    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
467        if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
468            self.call_enter(span_id);
469        }
470    }
471
472    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
473        if let Some(&span_id) = self.span_ids.lock().get(&id.into_u64()) {
474            self.call_exit(span_id);
475        }
476    }
477
478    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
479        if let Some(span_id) = self.span_ids.lock().remove(&id.into_u64()) {
480            self.call_drop_span(span_id);
481        }
482    }
483}
484
485/// Visitor for collecting tracing fields into our Field type.
486struct FieldVisitor {
487    fields: Vec<Field>,
488}
489
490impl FieldVisitor {
491    fn new() -> Self {
492        Self { fields: Vec::new() }
493    }
494}
495
496impl tracing::field::Visit for FieldVisitor {
497    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
498        self.fields.push(Field {
499            name: field.name().to_string(),
500            value: format!("{:?}", value),
501        });
502    }
503
504    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
505        self.fields.push(Field {
506            name: field.name().to_string(),
507            value: value.to_string(),
508        });
509    }
510
511    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
512        self.fields.push(Field {
513            name: field.name().to_string(),
514            value: value.to_string(),
515        });
516    }
517
518    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
519        self.fields.push(Field {
520            name: field.name().to_string(),
521            value: value.to_string(),
522        });
523    }
524
525    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
526        self.fields.push(Field {
527            name: field.name().to_string(),
528            value: value.to_string(),
529        });
530    }
531}
532
533// ============================================================================
534// Host Side: TracingSink Implementation
535// ============================================================================
536
537/// Collected trace data for inspection/testing.
538#[derive(Debug, Clone)]
539pub enum TraceRecord {
540    NewSpan { id: u64, meta: SpanMeta },
541    Record { span_id: u64, fields: Vec<Field> },
542    Event(EventMeta),
543    Enter { span_id: u64 },
544    Exit { span_id: u64 },
545    DropSpan { span_id: u64 },
546}
547
548/// Host-side implementation of TracingSink.
549///
550/// Collects all trace data into a buffer for inspection/testing.
551/// In a real application, you might forward to a real tracing subscriber.
552#[derive(Clone)]
553pub struct HostTracingSink {
554    records: Arc<Mutex<Vec<TraceRecord>>>,
555    next_span_id: Arc<AtomicU64>,
556}
557
558impl HostTracingSink {
559    /// Create a new sink that collects trace data.
560    pub fn new() -> Self {
561        Self {
562            records: Arc::new(Mutex::new(Vec::new())),
563            next_span_id: Arc::new(AtomicU64::new(1)),
564        }
565    }
566
567    /// Get all collected trace records.
568    pub fn records(&self) -> Vec<TraceRecord> {
569        self.records.lock().clone()
570    }
571
572    /// Clear all collected records.
573    pub fn clear(&self) {
574        self.records.lock().clear();
575    }
576}
577
578impl Default for HostTracingSink {
579    fn default() -> Self {
580        Self::new()
581    }
582}
583
584impl TracingSink for HostTracingSink {
585    async fn new_span(&self, span: SpanMeta) -> u64 {
586        let id = self.next_span_id.fetch_add(1, Ordering::Relaxed);
587        self.records
588            .lock()
589            .push(TraceRecord::NewSpan { id, meta: span });
590        id
591    }
592
593    async fn record(&self, span_id: u64, fields: Vec<Field>) {
594        self.records
595            .lock()
596            .push(TraceRecord::Record { span_id, fields });
597    }
598
599    async fn event(&self, event: EventMeta) {
600        self.records.lock().push(TraceRecord::Event(event));
601    }
602
603    async fn enter(&self, span_id: u64) {
604        self.records.lock().push(TraceRecord::Enter { span_id });
605    }
606
607    async fn exit(&self, span_id: u64) {
608        self.records.lock().push(TraceRecord::Exit { span_id });
609    }
610
611    async fn drop_span(&self, span_id: u64) {
612        self.records.lock().push(TraceRecord::DropSpan { span_id });
613    }
614}
615
616// ============================================================================
617// Dispatcher Helper
618// ============================================================================
619
620/// Create a dispatcher for TracingSink service.
621pub fn create_tracing_sink_dispatcher(
622    sink: HostTracingSink,
623) -> impl Fn(
624    u32,
625    u32,
626    Vec<u8>,
627) -> Pin<Box<dyn std::future::Future<Output = Result<Frame, RpcError>> + Send>>
628+ Send
629+ Sync
630+ 'static {
631    move |_channel_id, method_id, payload| {
632        let sink = sink.clone();
633        Box::pin(async move {
634            let server = TracingSinkServer::new(sink);
635            server.dispatch(method_id, &payload).await
636        })
637    }
638}
639
640// TracingSinkClient is generated by the rapace::service attribute.
641// Use TracingSinkClient::new(session) to create a client.