Skip to main content

runmat_logging/
lib.rs

1use once_cell::sync::OnceCell;
2use serde::Serialize;
3use serde_json::Value as JsonValue;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use tracing::subscriber::DefaultGuard;
7use tracing::Subscriber;
8use tracing_log::LogTracer;
9#[cfg(feature = "otlp")]
10use tracing_opentelemetry::OpenTelemetrySpanExt;
11use tracing_subscriber::filter::EnvFilter;
12use tracing_subscriber::layer::SubscriberExt;
13use tracing_subscriber::Layer;
14
15#[cfg(not(target_arch = "wasm32"))]
16use tracing_subscriber::reload;
17
18#[cfg(not(target_arch = "wasm32"))]
19use std::time::SystemTime;
20
21#[cfg(feature = "otlp")]
22use opentelemetry::trace::TraceContextExt;
23#[cfg(feature = "otlp")]
24use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
25#[cfg(feature = "otlp")]
26use opentelemetry::Context as OtelContext;
27
/// Filter directive used by `init_logging` when neither the environment nor
/// `LoggingOptions::default_filter` provides one.
const DEFAULT_LOG_FILTER: &str = "info";
29
30#[derive(Debug, Clone, Serialize)]
31#[serde(rename_all = "camelCase")]
32pub struct RuntimeLogRecord {
33    pub ts: String,
34    pub level: String,
35    pub target: String,
36    pub message: String,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub trace_id: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub span_id: Option<String>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub fields: Option<JsonValue>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46#[serde(rename_all = "camelCase")]
47pub struct TraceEvent {
48    pub name: String,
49    pub cat: String,
50    pub ph: String,
51    pub ts: i64,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub dur: Option<i64>,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub pid: Option<i64>,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub tid: Option<i64>,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub trace_id: Option<String>,
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub span_id: Option<String>,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub parent_span_id: Option<String>,
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub args: Option<JsonValue>,
66}
67
68type LogHook = Arc<dyn Fn(&RuntimeLogRecord) + Send + Sync>;
69type TraceHook = Arc<dyn Fn(&[TraceEvent]) + Send + Sync>;
70
71static LOG_HOOK: OnceCell<LogHook> = OnceCell::new();
72static TRACE_HOOK: OnceCell<TraceHook> = OnceCell::new();
73static FALLBACK_TRACE_ID: OnceCell<String> = OnceCell::new();
74static EVENT_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
75#[cfg(not(target_arch = "wasm32"))]
76static LOG_FILTER_HANDLE: OnceCell<
77    reload::Handle<EnvFilter, tracing_subscriber::registry::Registry>,
78> = OnceCell::new();
79
80pub struct LoggingGuard {
81    _guard: Option<DefaultGuard>,
82}
83
/// Settings consumed by `init_logging`.
#[derive(Debug, Clone, Default)]
pub struct LoggingOptions {
    /// Attach the OpenTelemetry layer; only consulted when the crate is built
    /// with the `otlp` feature.
    pub enable_otlp: bool,
    /// Attach the layer that forwards span and event data to the trace hook.
    pub enable_traces: bool,
    /// Process identifier stamped onto every emitted `TraceEvent`.
    pub pid: i64,
    /// Filter directive used when no filter can be read from the environment;
    /// `None` falls back to the crate default.
    pub default_filter: Option<String>,
}
91
92pub fn set_runtime_log_hook<F>(hook: F)
93where
94    F: Fn(&RuntimeLogRecord) + Send + Sync + 'static,
95{
96    let _ = LOG_HOOK.set(Arc::new(hook));
97}
98
99pub fn set_trace_hook<F>(hook: F)
100where
101    F: Fn(&[TraceEvent]) + Send + Sync + 'static,
102{
103    let _ = TRACE_HOOK.set(Arc::new(hook));
104}
105
106pub fn init_logging(opts: LoggingOptions) -> LoggingGuard {
107    // Install LogTracer so log:: macros flow into tracing
108    let _ = LogTracer::init();
109
110    let fallback_filter = opts.default_filter.as_deref().unwrap_or(DEFAULT_LOG_FILTER);
111
112    let env_filter = EnvFilter::try_from_default_env()
113        .or_else(|_| EnvFilter::try_from_env("RUNMAT_LOG"))
114        .or_else(|_| EnvFilter::try_new(fallback_filter))
115        .unwrap_or_else(|_| EnvFilter::new(fallback_filter));
116
117    let build_subscriber = || {
118        let bridge_layer = LogBridgeLayer;
119        let trace_layer = if opts.enable_traces {
120            Some(TraceBridgeLayer { pid: opts.pid })
121        } else {
122            None
123        };
124
125        #[cfg(not(target_arch = "wasm32"))]
126        {
127            let (filter_layer, filter_handle) = reload::Layer::new(env_filter.clone());
128            if LOG_FILTER_HANDLE.get().is_none() {
129                let _ = LOG_FILTER_HANDLE.set(filter_handle.clone());
130            }
131            tracing_subscriber::registry()
132                .with(filter_layer)
133                .with(bridge_layer)
134                .with(trace_layer.clone())
135        }
136
137        #[cfg(target_arch = "wasm32")]
138        {
139            tracing_subscriber::registry()
140                .with(env_filter.clone())
141                .with(bridge_layer)
142                .with(trace_layer.clone())
143        }
144    };
145
146    let subscriber = build_subscriber();
147
148    #[cfg(feature = "otlp")]
149    let subscriber = {
150        let otel_layer = opts.enable_otlp.then(otel_layer);
151        subscriber.with(otel_layer)
152    };
153
154    let guard = match tracing::subscriber::set_global_default(subscriber) {
155        Ok(()) => None,
156        Err(_) => Some(tracing::subscriber::set_default(build_subscriber())),
157    };
158
159    LoggingGuard { _guard: guard }
160}
161
162#[cfg(not(target_arch = "wasm32"))]
163pub fn update_log_filter(spec: &str) -> Result<(), String> {
164    let handle = LOG_FILTER_HANDLE
165        .get()
166        .ok_or_else(|| "log filter handle not initialised".to_string())?;
167    let filter = EnvFilter::try_new(spec).map_err(|err| err.to_string())?;
168    handle.reload(filter).map_err(|err| err.to_string())
169}
170
171#[cfg(target_arch = "wasm32")]
172pub fn update_log_filter(_spec: &str) -> Result<(), String> {
173    Err("runtime log filtering is not yet supported in wasm builds".to_string())
174}
175
176pub fn with_signal_trace<T>(trace_id: Option<&str>, name: &str, f: impl FnOnce() -> T) -> T {
177    let Some(trace_id) = trace_id else {
178        return f();
179    };
180    if let Some(span) = build_signal_span(trace_id, name) {
181        let _guard = span.enter();
182        return f();
183    }
184    f()
185}
186
187fn build_signal_span(trace_id: &str, name: &str) -> Option<tracing::Span> {
188    let span = tracing::span!(
189        tracing::Level::INFO,
190        "signal",
191        signal = name,
192        trace_id = trace_id
193    );
194    #[cfg(feature = "otlp")]
195    {
196        if let Some((trace_id, span_id)) = parse_trace_parent(trace_id) {
197            let context = SpanContext::new(
198                trace_id,
199                span_id,
200                TraceFlags::SAMPLED,
201                true,
202                TraceState::default(),
203            );
204            span.set_parent(OtelContext::new().with_remote_span_context(context));
205        }
206    }
207    Some(span)
208}
209
/// Parses a 32-character hex trace id and derives a span id from its tail.
///
/// Returns `None` when the input is not exactly 32 bytes long or when either
/// hex conversion fails. The span id is built from the trailing 16 hex digits
/// of the parsed trace id.
#[cfg(feature = "otlp")]
fn parse_trace_parent(trace_id: &str) -> Option<(TraceId, SpanId)> {
    const TRACE_ID_HEX_LEN: usize = 32;

    if trace_id.len() == TRACE_ID_HEX_LEN {
        let parsed_trace = TraceId::from_hex(trace_id).ok()?;
        let derived_span = SpanId::from_hex(&trace_id_hex_tail(parsed_trace)).ok()?;
        Some((parsed_trace, derived_span))
    } else {
        None
    }
}
219
/// Returns the last 16 characters of the trace id's textual form
/// (the whole string when it is shorter than that).
#[cfg(feature = "otlp")]
fn trace_id_hex_tail(trace_id: TraceId) -> String {
    const TAIL_LEN: usize = 16;

    let rendered = trace_id.to_string();
    // Skip everything except the trailing `TAIL_LEN` characters.
    let leading = rendered.chars().count().saturating_sub(TAIL_LEN);
    rendered.chars().skip(leading).collect()
}
231
/// Layer that converts tracing events into `RuntimeLogRecord`s for the log hook.
struct LogBridgeLayer;

/// Layer that converts span enter/exit and events into `TraceEvent`s for the trace hook.
#[derive(Clone)]
struct TraceBridgeLayer {
    /// Process identifier copied into every emitted trace event.
    pid: i64,
}
237
238#[cfg(target_arch = "wasm32")]
239fn now_rfc3339() -> String {
240    js_sys::Date::new_0()
241        .to_iso_string()
242        .as_string()
243        .unwrap_or_else(|| "1970-01-01T00:00:00.000Z".to_string())
244}
245
246#[cfg(not(target_arch = "wasm32"))]
247fn now_rfc3339() -> String {
248    chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
249}
250
251#[cfg(target_arch = "wasm32")]
252fn now_timestamp_micros() -> i64 {
253    // Date::now returns milliseconds since epoch as f64
254    (js_sys::Date::now() * 1000.0).round() as i64
255}
256
257#[cfg(not(target_arch = "wasm32"))]
258fn now_timestamp_micros() -> i64 {
259    chrono::Utc::now().timestamp_micros()
260}
261
262impl<S> Layer<S> for LogBridgeLayer
263where
264    S: Subscriber,
265{
266    fn on_event(
267        &self,
268        event: &tracing::Event<'_>,
269        _ctx: tracing_subscriber::layer::Context<'_, S>,
270    ) {
271        let mut visitor = JsonVisitor::default();
272        event.record(&mut visitor);
273
274        let record = RuntimeLogRecord {
275            ts: now_rfc3339(),
276            level: event.metadata().level().to_string(),
277            target: event.metadata().target().to_string(),
278            message: visitor
279                .message
280                .unwrap_or_else(|| event.metadata().name().to_string()),
281            trace_id: current_trace_id(),
282            span_id: current_span_id(),
283            fields: visitor
284                .fields
285                .and_then(|v| v.as_object().cloned().map(JsonValue::Object))
286                .filter(|obj| obj.as_object().map(|m| !m.is_empty()).unwrap_or(false)),
287        };
288
289        if let Some(hook) = LOG_HOOK.get() {
290            hook(&record);
291        }
292    }
293}
294
295impl<S> Layer<S> for TraceBridgeLayer
296where
297    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
298{
299    fn on_event(
300        &self,
301        event: &tracing::Event<'_>,
302        _ctx: tracing_subscriber::layer::Context<'_, S>,
303    ) {
304        // Only emit trace events if a hook is set
305        let hook = match TRACE_HOOK.get() {
306            Some(h) => h,
307            None => return,
308        };
309
310        let meta = event.metadata();
311        let ts = now_timestamp_micros();
312
313        let trace_id = current_trace_id();
314        let parent_span_id = current_span_id();
315        let span_id = Some(next_event_span_id());
316
317        let mut visitor = JsonVisitor::default();
318        event.record(&mut visitor);
319
320        let args = visitor
321            .fields
322            .as_ref()
323            .and_then(|v| v.as_object())
324            .cloned()
325            .map(JsonValue::Object);
326
327        let ev = TraceEvent {
328            name: visitor.message.unwrap_or_else(|| meta.name().to_string()),
329            cat: meta.target().to_string(),
330            ph: "i".to_string(), // instant event
331            ts,
332            dur: None,
333            pid: Some(self.pid),
334            tid: None,
335            trace_id,
336            span_id,
337            parent_span_id,
338            args,
339        };
340
341        hook(&[ev]);
342    }
343
344    fn on_enter(&self, id: &tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
345        if TRACE_HOOK.get().is_none() {
346            return;
347        }
348        if let Some(span) = ctx.span(id) {
349            emit_span_event(span, "B", self.pid);
350        }
351    }
352
353    fn on_exit(&self, id: &tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
354        if TRACE_HOOK.get().is_none() {
355            return;
356        }
357        if let Some(span) = ctx.span(id) {
358            emit_span_event(span, "E", self.pid);
359        }
360    }
361}
362
363fn emit_span_event<S>(span: tracing_subscriber::registry::SpanRef<'_, S>, phase: &str, pid: i64)
364where
365    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
366{
367    let hook = match TRACE_HOOK.get() {
368        Some(h) => h,
369        None => return,
370    };
371    let meta = span.metadata();
372    let ts = now_timestamp_micros();
373
374    let trace_id = current_trace_id();
375    let span_id = Some(span.id().clone().into_u64().to_string());
376    let parent_span_id = span
377        .parent()
378        .map(|parent| parent.id().clone().into_u64().to_string());
379
380    let ev = TraceEvent {
381        name: meta.name().to_string(),
382        cat: meta.target().to_string(),
383        ph: phase.to_string(),
384        ts,
385        dur: None,
386        pid: Some(pid),
387        tid: None,
388        trace_id,
389        span_id,
390        parent_span_id,
391        args: None,
392    };
393    hook(&[ev]);
394}
395
396fn current_trace_id() -> Option<String> {
397    current_trace_span_ids().0
398}
399
400fn current_span_id() -> Option<String> {
401    current_trace_span_ids().1
402}
403
404fn current_trace_span_ids() -> (Option<String>, Option<String>) {
405    #[cfg(feature = "otlp")]
406    {
407        use opentelemetry::trace::TraceContextExt;
408        use tracing_opentelemetry::OpenTelemetrySpanExt;
409        let span = tracing::Span::current();
410        let ctx = span.context();
411        let span = ctx.span();
412        let sc = span.span_context();
413        if sc.is_valid() {
414            return (
415                Some(sc.trace_id().to_string()),
416                Some(sc.span_id().to_string()),
417            );
418        }
419    }
420    let span_id = tracing::Span::current()
421        .id()
422        .map(|id| id.into_u64().to_string());
423    let trace_id = Some(fallback_trace_id());
424    (trace_id, span_id)
425}
426
427fn fallback_trace_id() -> String {
428    FALLBACK_TRACE_ID
429        .get_or_init(|| {
430            #[cfg(target_arch = "wasm32")]
431            {
432                let micros = now_timestamp_micros() as u128;
433                let rand = (js_sys::Math::random() * 1_000_000.0) as u128;
434                format!("{:x}-{:x}", micros, rand)
435            }
436            #[cfg(not(target_arch = "wasm32"))]
437            {
438                let nanos = SystemTime::now()
439                    .duration_since(std::time::UNIX_EPOCH)
440                    .map(|d| d.as_nanos())
441                    .unwrap_or_default();
442                let tid = format!("{:?}", std::thread::current().id());
443                format!("{:x}-{tid}", nanos)
444            }
445        })
446        .clone()
447}
448
449fn next_event_span_id() -> String {
450    let id = EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
451    format!("ev-{id}")
452}
453
454#[derive(Default)]
455struct JsonVisitor {
456    message: Option<String>,
457    fields: Option<JsonValue>,
458}
459
460impl tracing::field::Visit for JsonVisitor {
461    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
462        let entry = serde_json::json!(format!("{value:?}"));
463        if field.name() == "message" {
464            self.message = Some(entry.as_str().unwrap_or_default().to_string());
465        } else {
466            let obj = self
467                .fields
468                .get_or_insert_with(|| JsonValue::Object(Default::default()));
469            if let JsonValue::Object(map) = obj {
470                map.insert(field.name().to_string(), entry);
471            }
472        }
473    }
474
475    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
476        let entry = JsonValue::String(value.to_string());
477        if field.name() == "message" {
478            self.message = Some(value.to_string());
479        } else {
480            let obj = self
481                .fields
482                .get_or_insert_with(|| JsonValue::Object(Default::default()));
483            if let JsonValue::Object(map) = obj {
484                map.insert(field.name().to_string(), entry);
485            }
486        }
487    }
488}
489
/// Builds the OpenTelemetry tracing layer backed by an OTLP/HTTP batch exporter.
///
/// The exporter endpoint is read from `RUNMAT_OTEL_ENDPOINT`; an unset or
/// unreadable variable results in an empty endpoint string being passed on.
/// Spans are tagged with the resource attribute `service.name = "runmat"`.
///
/// # Panics
///
/// Panics if the OTLP pipeline cannot be installed.
// NOTE(review): `install_batch(Tokio)` presumably requires an active Tokio
// runtime at call time — confirm against the callers.
#[cfg(feature = "otlp")]
fn otel_layer<S>() -> tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::Tracer>
where
    S: tracing::Subscriber,
    for<'span> S: tracing_subscriber::registry::LookupSpan<'span>,
{
    use opentelemetry::KeyValue;
    use opentelemetry_otlp::WithExportConfig;
    use opentelemetry_sdk::{runtime::Tokio, trace, Resource};

    let collector_endpoint = std::env::var("RUNMAT_OTEL_ENDPOINT").unwrap_or_default();
    let exporter = opentelemetry_otlp::new_exporter()
        .http()
        .with_endpoint(collector_endpoint);

    let service_resource = Resource::new(vec![KeyValue::new("service.name", "runmat")]);
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(trace::config().with_resource(service_resource))
        .install_batch(Tokio)
        .expect("failed to install OTEL pipeline");

    tracing_opentelemetry::OpenTelemetryLayer::new(tracer)
}
516
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use tracing::info;

    /// An `info!` event must reach the registered log hook with its message intact.
    #[test]
    fn log_hook_receives_record() {
        let captured: Arc<Mutex<Vec<RuntimeLogRecord>>> = Arc::new(Mutex::new(Vec::new()));
        let hook = {
            let c = captured.clone();
            move |rec: &RuntimeLogRecord| {
                c.lock().unwrap().push(rec.clone());
            }
        };
        set_runtime_log_hook(hook);
        let _guard = init_logging(LoggingOptions {
            enable_otlp: false,
            enable_traces: false,
            pid: 1,
            default_filter: None,
        });

        info!("hello world");

        let items = captured.lock().unwrap();
        assert!(!items.is_empty());
        assert!(items.iter().any(|r| r.message.contains("hello world")));
    }

    /// Entering a span and logging inside it must produce trace events.
    #[test]
    fn trace_hook_receives_events() {
        let captured: Arc<Mutex<Vec<TraceEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let hook = {
            let c = captured.clone();
            move |events: &[TraceEvent]| {
                c.lock().unwrap().extend_from_slice(events);
            }
        };
        set_trace_hook(hook);
        let _guard = init_logging(LoggingOptions {
            enable_otlp: false,
            enable_traces: true,
            pid: 1,
            default_filter: None,
        });

        let span = tracing::info_span!("test_span");
        let _enter = span.enter();
        info!("inside span");

        let items = captured.lock().unwrap();
        assert!(!items.is_empty());
        // The bridge stores an event's message in `name` (never in `args`), so
        // both the span event and the instant event are matched by name.
        assert!(items
            .iter()
            .any(|e| e.name == "test_span" || e.name.contains("inside span")));
    }
}