1use once_cell::sync::OnceCell;
2use serde::Serialize;
3use serde_json::Value as JsonValue;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use tracing::subscriber::DefaultGuard;
7use tracing::Subscriber;
8use tracing_log::LogTracer;
9#[cfg(feature = "otlp")]
10use tracing_opentelemetry::OpenTelemetrySpanExt;
11use tracing_subscriber::filter::EnvFilter;
12use tracing_subscriber::layer::SubscriberExt;
13use tracing_subscriber::Layer;
14
15#[cfg(not(target_arch = "wasm32"))]
16use tracing_subscriber::reload;
17
18#[cfg(not(target_arch = "wasm32"))]
19use std::time::SystemTime;
20
21#[cfg(feature = "otlp")]
22use opentelemetry::trace::TraceContextExt;
23#[cfg(feature = "otlp")]
24use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
25#[cfg(feature = "otlp")]
26use opentelemetry::Context as OtelContext;
27
/// Filter directive used by `init_logging` when neither the environment nor
/// `LoggingOptions::default_filter` supplies one.
const DEFAULT_LOG_FILTER: &str = "info";
29
30#[derive(Debug, Clone, Serialize)]
31#[serde(rename_all = "camelCase")]
32pub struct RuntimeLogRecord {
33 pub ts: String,
34 pub level: String,
35 pub target: String,
36 pub message: String,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub trace_id: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub span_id: Option<String>,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub fields: Option<JsonValue>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46#[serde(rename_all = "camelCase")]
47pub struct TraceEvent {
48 pub name: String,
49 pub cat: String,
50 pub ph: String,
51 pub ts: i64,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub dur: Option<i64>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub pid: Option<i64>,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub tid: Option<i64>,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub trace_id: Option<String>,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 pub span_id: Option<String>,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub parent_span_id: Option<String>,
64 #[serde(skip_serializing_if = "Option::is_none")]
65 pub args: Option<JsonValue>,
66}
67
68type LogHook = Arc<dyn Fn(&RuntimeLogRecord) + Send + Sync>;
69type TraceHook = Arc<dyn Fn(&[TraceEvent]) + Send + Sync>;
70
71static LOG_HOOK: OnceCell<LogHook> = OnceCell::new();
72static TRACE_HOOK: OnceCell<TraceHook> = OnceCell::new();
73static FALLBACK_TRACE_ID: OnceCell<String> = OnceCell::new();
74static EVENT_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
75#[cfg(not(target_arch = "wasm32"))]
76static LOG_FILTER_HANDLE: OnceCell<
77 reload::Handle<EnvFilter, tracing_subscriber::registry::Registry>,
78> = OnceCell::new();
79
80pub struct LoggingGuard {
81 _guard: Option<DefaultGuard>,
82}
83
/// Configuration accepted by `init_logging`.
///
/// `Default` yields all flags off, `pid` 0 and no custom filter.
#[derive(Debug, Clone, Default)]
pub struct LoggingOptions {
    /// Request the OpenTelemetry layer; only consulted when the crate is built
    /// with the `otlp` feature.
    pub enable_otlp: bool,
    /// Install the layer that forwards span and event activity to the trace hook.
    pub enable_traces: bool,
    /// Process identifier stamped onto every emitted `TraceEvent`.
    pub pid: i64,
    /// Filter directive used when the environment does not provide one;
    /// `None` falls back to the built-in default.
    pub default_filter: Option<String>,
}
91
92pub fn set_runtime_log_hook<F>(hook: F)
93where
94 F: Fn(&RuntimeLogRecord) + Send + Sync + 'static,
95{
96 let _ = LOG_HOOK.set(Arc::new(hook));
97}
98
99pub fn set_trace_hook<F>(hook: F)
100where
101 F: Fn(&[TraceEvent]) + Send + Sync + 'static,
102{
103 let _ = TRACE_HOOK.set(Arc::new(hook));
104}
105
106pub fn init_logging(opts: LoggingOptions) -> LoggingGuard {
107 let _ = LogTracer::init();
109
110 let fallback_filter = opts.default_filter.as_deref().unwrap_or(DEFAULT_LOG_FILTER);
111
112 let env_filter = EnvFilter::try_from_default_env()
113 .or_else(|_| EnvFilter::try_from_env("RUNMAT_LOG"))
114 .or_else(|_| EnvFilter::try_new(fallback_filter))
115 .unwrap_or_else(|_| EnvFilter::new(fallback_filter));
116
117 let build_subscriber = || {
118 let bridge_layer = LogBridgeLayer;
119 let trace_layer = if opts.enable_traces {
120 Some(TraceBridgeLayer { pid: opts.pid })
121 } else {
122 None
123 };
124
125 #[cfg(not(target_arch = "wasm32"))]
126 {
127 let (filter_layer, filter_handle) = reload::Layer::new(env_filter.clone());
128 if LOG_FILTER_HANDLE.get().is_none() {
129 let _ = LOG_FILTER_HANDLE.set(filter_handle.clone());
130 }
131 tracing_subscriber::registry()
132 .with(filter_layer)
133 .with(bridge_layer)
134 .with(trace_layer.clone())
135 }
136
137 #[cfg(target_arch = "wasm32")]
138 {
139 tracing_subscriber::registry()
140 .with(env_filter.clone())
141 .with(bridge_layer)
142 .with(trace_layer.clone())
143 }
144 };
145
146 let subscriber = build_subscriber();
147
148 #[cfg(feature = "otlp")]
149 let subscriber = {
150 let otel_layer = opts.enable_otlp.then(otel_layer);
151 subscriber.with(otel_layer)
152 };
153
154 let guard = match tracing::subscriber::set_global_default(subscriber) {
155 Ok(()) => None,
156 Err(_) => Some(tracing::subscriber::set_default(build_subscriber())),
157 };
158
159 LoggingGuard { _guard: guard }
160}
161
162#[cfg(not(target_arch = "wasm32"))]
163pub fn update_log_filter(spec: &str) -> Result<(), String> {
164 let handle = LOG_FILTER_HANDLE
165 .get()
166 .ok_or_else(|| "log filter handle not initialised".to_string())?;
167 let filter = EnvFilter::try_new(spec).map_err(|err| err.to_string())?;
168 handle.reload(filter).map_err(|err| err.to_string())
169}
170
/// wasm counterpart of `update_log_filter`: always fails.
///
/// # Errors
///
/// Always returns an error stating that runtime filtering is unsupported.
#[cfg(target_arch = "wasm32")]
pub fn update_log_filter(_spec: &str) -> Result<(), String> {
    Err(String::from(
        "runtime log filtering is not yet supported in wasm builds",
    ))
}
175
176pub fn with_signal_trace<T>(trace_id: Option<&str>, name: &str, f: impl FnOnce() -> T) -> T {
177 let Some(trace_id) = trace_id else {
178 return f();
179 };
180 if let Some(span) = build_signal_span(trace_id, name) {
181 let _guard = span.enter();
182 return f();
183 }
184 f()
185}
186
187fn build_signal_span(trace_id: &str, name: &str) -> Option<tracing::Span> {
188 let span = tracing::span!(
189 tracing::Level::INFO,
190 "signal",
191 signal = name,
192 trace_id = trace_id
193 );
194 #[cfg(feature = "otlp")]
195 {
196 if let Some((trace_id, span_id)) = parse_trace_parent(trace_id) {
197 let context = SpanContext::new(
198 trace_id,
199 span_id,
200 TraceFlags::SAMPLED,
201 true,
202 TraceState::default(),
203 );
204 span.set_parent(OtelContext::new().with_remote_span_context(context));
205 }
206 }
207 Some(span)
208}
209
/// Parses a 32-character hex trace id and derives a span id from it.
///
/// The span id is taken from the last 16 hex digits of the trace id. Returns
/// `None` when the input is not exactly 32 bytes long or either id fails to
/// parse as hex.
#[cfg(feature = "otlp")]
fn parse_trace_parent(trace_id: &str) -> Option<(TraceId, SpanId)> {
    const TRACE_ID_HEX_LEN: usize = 32;
    if trace_id.len() != TRACE_ID_HEX_LEN {
        return None;
    }
    let parsed = TraceId::from_hex(trace_id).ok()?;
    let tail = trace_id_hex_tail(parsed);
    SpanId::from_hex(&tail)
        .ok()
        .map(|span_id| (parsed, span_id))
}
219
/// Returns the last 16 characters of the trace id's textual form.
///
/// If the text is shorter than 16 characters, the whole text is returned.
#[cfg(feature = "otlp")]
fn trace_id_hex_tail(trace_id: TraceId) -> String {
    const TAIL_LEN: usize = 16;
    let hex = trace_id.to_string();
    let leading = hex.chars().count().saturating_sub(TAIL_LEN);
    hex.chars().skip(leading).collect()
}
231
/// Subscriber layer that turns each `tracing` event into a `RuntimeLogRecord`
/// and passes it to the registered log hook.
struct LogBridgeLayer;
/// Subscriber layer that reports events and span enter/exit to the trace hook.
#[derive(Clone)]
struct TraceBridgeLayer {
    /// Process identifier stamped onto every emitted `TraceEvent`.
    pid: i64,
}
237
/// Current time as an ISO string obtained from the JavaScript `Date` object.
///
/// Falls back to the Unix epoch string if the value cannot be read as a string.
#[cfg(target_arch = "wasm32")]
fn now_rfc3339() -> String {
    let iso = js_sys::Date::new_0().to_iso_string();
    match iso.as_string() {
        Some(text) => text,
        None => "1970-01-01T00:00:00.000Z".to_string(),
    }
}
245
246#[cfg(not(target_arch = "wasm32"))]
247fn now_rfc3339() -> String {
248 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
249}
250
/// Current time in microseconds, derived from JavaScript's `Date.now()`
/// by scaling its result by 1000 and rounding.
#[cfg(target_arch = "wasm32")]
fn now_timestamp_micros() -> i64 {
    let date_now = js_sys::Date::now();
    (date_now * 1000.0).round() as i64
}
256
257#[cfg(not(target_arch = "wasm32"))]
258fn now_timestamp_micros() -> i64 {
259 chrono::Utc::now().timestamp_micros()
260}
261
262impl<S> Layer<S> for LogBridgeLayer
263where
264 S: Subscriber,
265{
266 fn on_event(
267 &self,
268 event: &tracing::Event<'_>,
269 _ctx: tracing_subscriber::layer::Context<'_, S>,
270 ) {
271 let mut visitor = JsonVisitor::default();
272 event.record(&mut visitor);
273
274 let record = RuntimeLogRecord {
275 ts: now_rfc3339(),
276 level: event.metadata().level().to_string(),
277 target: event.metadata().target().to_string(),
278 message: visitor
279 .message
280 .unwrap_or_else(|| event.metadata().name().to_string()),
281 trace_id: current_trace_id(),
282 span_id: current_span_id(),
283 fields: visitor
284 .fields
285 .and_then(|v| v.as_object().cloned().map(JsonValue::Object))
286 .filter(|obj| obj.as_object().map(|m| !m.is_empty()).unwrap_or(false)),
287 };
288
289 if let Some(hook) = LOG_HOOK.get() {
290 hook(&record);
291 }
292 }
293}
294
295impl<S> Layer<S> for TraceBridgeLayer
296where
297 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
298{
299 fn on_event(
300 &self,
301 event: &tracing::Event<'_>,
302 _ctx: tracing_subscriber::layer::Context<'_, S>,
303 ) {
304 let hook = match TRACE_HOOK.get() {
306 Some(h) => h,
307 None => return,
308 };
309
310 let meta = event.metadata();
311 let ts = now_timestamp_micros();
312
313 let trace_id = current_trace_id();
314 let parent_span_id = current_span_id();
315 let span_id = Some(next_event_span_id());
316
317 let mut visitor = JsonVisitor::default();
318 event.record(&mut visitor);
319
320 let args = visitor
321 .fields
322 .as_ref()
323 .and_then(|v| v.as_object())
324 .cloned()
325 .map(JsonValue::Object);
326
327 let ev = TraceEvent {
328 name: visitor.message.unwrap_or_else(|| meta.name().to_string()),
329 cat: meta.target().to_string(),
330 ph: "i".to_string(), ts,
332 dur: None,
333 pid: Some(self.pid),
334 tid: None,
335 trace_id,
336 span_id,
337 parent_span_id,
338 args,
339 };
340
341 hook(&[ev]);
342 }
343
344 fn on_enter(&self, id: &tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
345 if TRACE_HOOK.get().is_none() {
346 return;
347 }
348 if let Some(span) = ctx.span(id) {
349 emit_span_event(span, "B", self.pid);
350 }
351 }
352
353 fn on_exit(&self, id: &tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
354 if TRACE_HOOK.get().is_none() {
355 return;
356 }
357 if let Some(span) = ctx.span(id) {
358 emit_span_event(span, "E", self.pid);
359 }
360 }
361}
362
363fn emit_span_event<S>(span: tracing_subscriber::registry::SpanRef<'_, S>, phase: &str, pid: i64)
364where
365 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
366{
367 let hook = match TRACE_HOOK.get() {
368 Some(h) => h,
369 None => return,
370 };
371 let meta = span.metadata();
372 let ts = now_timestamp_micros();
373
374 let trace_id = current_trace_id();
375 let span_id = Some(span.id().clone().into_u64().to_string());
376 let parent_span_id = span
377 .parent()
378 .map(|parent| parent.id().clone().into_u64().to_string());
379
380 let ev = TraceEvent {
381 name: meta.name().to_string(),
382 cat: meta.target().to_string(),
383 ph: phase.to_string(),
384 ts,
385 dur: None,
386 pid: Some(pid),
387 tid: None,
388 trace_id,
389 span_id,
390 parent_span_id,
391 args: None,
392 };
393 hook(&[ev]);
394}
395
396fn current_trace_id() -> Option<String> {
397 current_trace_span_ids().0
398}
399
400fn current_span_id() -> Option<String> {
401 current_trace_span_ids().1
402}
403
404fn current_trace_span_ids() -> (Option<String>, Option<String>) {
405 #[cfg(feature = "otlp")]
406 {
407 use opentelemetry::trace::TraceContextExt;
408 use tracing_opentelemetry::OpenTelemetrySpanExt;
409 let span = tracing::Span::current();
410 let ctx = span.context();
411 let span = ctx.span();
412 let sc = span.span_context();
413 if sc.is_valid() {
414 return (
415 Some(sc.trace_id().to_string()),
416 Some(sc.span_id().to_string()),
417 );
418 }
419 }
420 let span_id = tracing::Span::current()
421 .id()
422 .map(|id| id.into_u64().to_string());
423 let trace_id = Some(fallback_trace_id());
424 (trace_id, span_id)
425}
426
427fn fallback_trace_id() -> String {
428 FALLBACK_TRACE_ID
429 .get_or_init(|| {
430 #[cfg(target_arch = "wasm32")]
431 {
432 let micros = now_timestamp_micros() as u128;
433 let rand = (js_sys::Math::random() * 1_000_000.0) as u128;
434 format!("{:x}-{:x}", micros, rand)
435 }
436 #[cfg(not(target_arch = "wasm32"))]
437 {
438 let nanos = SystemTime::now()
439 .duration_since(std::time::UNIX_EPOCH)
440 .map(|d| d.as_nanos())
441 .unwrap_or_default();
442 let tid = format!("{:?}", std::thread::current().id());
443 format!("{:x}-{tid}", nanos)
444 }
445 })
446 .clone()
447}
448
449fn next_event_span_id() -> String {
450 let id = EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
451 format!("ev-{id}")
452}
453
454#[derive(Default)]
455struct JsonVisitor {
456 message: Option<String>,
457 fields: Option<JsonValue>,
458}
459
460impl tracing::field::Visit for JsonVisitor {
461 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
462 let entry = serde_json::json!(format!("{value:?}"));
463 if field.name() == "message" {
464 self.message = Some(entry.as_str().unwrap_or_default().to_string());
465 } else {
466 let obj = self
467 .fields
468 .get_or_insert_with(|| JsonValue::Object(Default::default()));
469 if let JsonValue::Object(map) = obj {
470 map.insert(field.name().to_string(), entry);
471 }
472 }
473 }
474
475 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
476 let entry = JsonValue::String(value.to_string());
477 if field.name() == "message" {
478 self.message = Some(value.to_string());
479 } else {
480 let obj = self
481 .fields
482 .get_or_insert_with(|| JsonValue::Object(Default::default()));
483 if let JsonValue::Object(map) = obj {
484 map.insert(field.name().to_string(), entry);
485 }
486 }
487 }
488}
489
/// Builds the OpenTelemetry layer backed by an OTLP/HTTP batch exporter.
///
/// The exporter endpoint is taken from `RUNMAT_OTEL_ENDPOINT` when that
/// variable is set to a non-blank value. When it is unset or blank, the
/// exporter's own default endpoint is left in place rather than being
/// overridden with an empty string. Spans are tagged with the resource
/// attribute `service.name = "runmat"`.
///
/// # Panics
///
/// Panics if the OTLP pipeline cannot be installed.
///
/// NOTE(review): the batch pipeline is installed on the Tokio runtime flavour;
/// presumably this must be called from within a Tokio runtime — confirm.
#[cfg(feature = "otlp")]
fn otel_layer<S>() -> tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::Tracer>
where
    S: tracing::Subscriber,
    for<'span> S: tracing_subscriber::registry::LookupSpan<'span>,
{
    use opentelemetry::KeyValue;
    use opentelemetry_otlp::WithExportConfig;
    use opentelemetry_sdk::{runtime::Tokio, trace, Resource};

    let mut otel_exporter = opentelemetry_otlp::new_exporter().http();
    // Override the endpoint only when one is actually configured; passing an
    // empty string would replace the exporter's default with an unusable value.
    let configured_endpoint = std::env::var("RUNMAT_OTEL_ENDPOINT")
        .ok()
        .filter(|value| !value.trim().is_empty());
    if let Some(endpoint) = configured_endpoint {
        otel_exporter = otel_exporter.with_endpoint(endpoint);
    }

    let otel_tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(otel_exporter)
        .with_trace_config(
            trace::config()
                .with_resource(Resource::new(vec![KeyValue::new("service.name", "runmat")])),
        )
        .install_batch(Tokio)
        .expect("failed to install OTEL pipeline");

    tracing_opentelemetry::OpenTelemetryLayer::new(otel_tracer)
}
516
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use tracing::info;

    /// An `info!` event emitted after initialisation reaches the log hook.
    #[test]
    fn log_hook_receives_record() {
        let captured: Arc<Mutex<Vec<RuntimeLogRecord>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&captured);
        set_runtime_log_hook(move |rec: &RuntimeLogRecord| {
            sink.lock().unwrap().push(rec.clone());
        });
        let _guard = init_logging(LoggingOptions {
            enable_otlp: false,
            enable_traces: false,
            pid: 1,
            default_filter: None,
        });

        info!("hello world");

        let items = captured.lock().unwrap();
        assert!(!items.is_empty());
        assert!(items.iter().any(|r| r.message.contains("hello world")));
    }

    /// Entering a span and logging inside it produces trace events for the hook.
    #[test]
    fn trace_hook_receives_events() {
        let captured: Arc<Mutex<Vec<TraceEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&captured);
        set_trace_hook(move |events: &[TraceEvent]| {
            sink.lock().unwrap().extend_from_slice(events);
        });
        let _guard = init_logging(LoggingOptions {
            enable_otlp: false,
            enable_traces: true,
            pid: 1,
            default_filter: None,
        });

        let span = tracing::info_span!("test_span");
        let _enter = span.enter();
        info!("inside span");

        // `items` is declared last so the lock is released before the span guard
        // drops and triggers another hook call.
        let items = captured.lock().unwrap();
        assert!(!items.is_empty());
        let saw_expected = items.iter().any(|e| {
            e.name == "test_span"
                || e.message()
                    .map_or(false, |text| text.contains("inside span"))
        });
        assert!(saw_expected);
    }

    impl TraceEvent {
        /// Test helper: the string stored under `"message"` in `args`, if any.
        fn message(&self) -> Option<String> {
            let value = self.args.as_ref()?.as_object()?.get("message")?;
            value.as_str().map(str::to_owned)
        }
    }
}