Skip to main content

deno_telemetry/
lib.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2
3#![allow(clippy::too_many_arguments)]
4#![expect(unexpected_cfgs)]
5
6use std::borrow::Cow;
7use std::cell::RefCell;
8use std::collections::HashMap;
9use std::env;
10use std::ffi::c_void;
11use std::fmt::Debug;
12use std::pin::Pin;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicU64;
17use std::task::Context;
18use std::task::Poll;
19use std::thread;
20use std::time::Duration;
21use std::time::Instant;
22use std::time::SystemTime;
23
24use deno_core::GarbageCollected;
25use deno_core::OpState;
26use deno_core::futures::FutureExt;
27use deno_core::futures::Stream;
28use deno_core::futures::StreamExt;
29use deno_core::futures::channel::mpsc;
30use deno_core::futures::channel::mpsc::UnboundedSender;
31use deno_core::futures::future::BoxFuture;
32use deno_core::futures::stream;
33use deno_core::op2;
34use deno_core::v8;
35use deno_core::v8::DataError;
36use deno_error::JsError;
37use deno_error::JsErrorBox;
38use once_cell::sync::Lazy;
39use once_cell::sync::OnceCell;
40use opentelemetry::InstrumentationScope;
41pub use opentelemetry::Key;
42pub use opentelemetry::KeyValue;
43pub use opentelemetry::StringValue;
44pub use opentelemetry::Value;
45use opentelemetry::logs::AnyValue;
46use opentelemetry::logs::LogRecord as LogRecordTrait;
47use opentelemetry::logs::Severity;
48use opentelemetry::metrics::AsyncInstrumentBuilder;
49pub use opentelemetry::metrics::Gauge;
50pub use opentelemetry::metrics::Histogram;
51use opentelemetry::metrics::InstrumentBuilder;
52pub use opentelemetry::metrics::MeterProvider;
53pub use opentelemetry::metrics::UpDownCounter;
54use opentelemetry::otel_debug;
55use opentelemetry::otel_error;
56use opentelemetry::trace::Event;
57use opentelemetry::trace::Link;
58use opentelemetry::trace::SpanContext;
59use opentelemetry::trace::SpanId;
60use opentelemetry::trace::SpanKind;
61use opentelemetry::trace::Status as SpanStatus;
62use opentelemetry::trace::TraceFlags;
63use opentelemetry::trace::TraceId;
64use opentelemetry::trace::TraceState;
65use opentelemetry_otlp::HttpExporterBuilder;
66use opentelemetry_otlp::Protocol;
67use opentelemetry_otlp::WithExportConfig;
68use opentelemetry_otlp::WithHttpConfig;
69use opentelemetry_sdk::Resource;
70use opentelemetry_sdk::export::trace::SpanData;
71use opentelemetry_sdk::logs::BatchLogProcessor;
72use opentelemetry_sdk::logs::LogProcessor;
73use opentelemetry_sdk::logs::LogRecord;
74use opentelemetry_sdk::metrics::ManualReader;
75use opentelemetry_sdk::metrics::MetricResult;
76use opentelemetry_sdk::metrics::SdkMeterProvider;
77use opentelemetry_sdk::metrics::Temporality;
78use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
79use opentelemetry_sdk::metrics::reader::MetricReader;
80use opentelemetry_sdk::trace::BatchSpanProcessor;
81use opentelemetry_sdk::trace::IdGenerator;
82use opentelemetry_sdk::trace::RandomIdGenerator;
83use opentelemetry_sdk::trace::SpanEvents;
84use opentelemetry_sdk::trace::SpanLinks;
85use opentelemetry_sdk::trace::SpanProcessor as _;
86use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_NAME;
87use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_VERSION;
88use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_LANGUAGE;
89use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_NAME;
90use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION;
91use serde::Deserialize;
92use serde::Serialize;
93use thiserror::Error;
94use tokio::sync::oneshot;
95use tokio::task::JoinSet;
96
// Declares the `deno_telemetry` extension for `deno_core`: the ops, the
// objects (`OtelTracer`, `OtelMeter`, `OtelSpan`) and the ESM sources
// (`telemetry.ts`, `util.ts`) that make it up.
deno_core::extension!(
  deno_telemetry,
  ops = [
    op_otel_collect_isolate_metrics,
    op_otel_enable_isolate_metrics,
    op_otel_log,
    op_otel_log_foreign,
    op_otel_span_attribute1,
    op_otel_span_attribute2,
    op_otel_span_attribute3,
    op_otel_span_add_link,
    op_otel_span_update_name,
    op_otel_metric_attribute3,
    op_otel_metric_record0,
    op_otel_metric_record1,
    op_otel_metric_record2,
    op_otel_metric_record3,
    op_otel_metric_observable_record0,
    op_otel_metric_observable_record1,
    op_otel_metric_observable_record2,
    op_otel_metric_observable_record3,
    op_otel_metric_wait_to_observe,
    op_otel_metric_observation_done,
  ],
  objects = [OtelTracer, OtelMeter, OtelSpan],
  esm = ["telemetry.ts", "util.ts"],
);
124
/// Identity of the embedding runtime.
///
/// `init` publishes both values as resource attributes
/// (`PROCESS_RUNTIME_NAME` / `PROCESS_RUNTIME_VERSION`). The version is also
/// used for the `TELEMETRY_SDK_VERSION` attribute and for the built-in
/// instrumentation scope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtelRuntimeConfig {
  /// Runtime name, reported as `PROCESS_RUNTIME_NAME`.
  pub runtime_name: Cow<'static, str>,
  /// Runtime version, reported as `PROCESS_RUNTIME_VERSION`.
  pub runtime_version: Cow<'static, str>,
}
130
/// Telemetry settings handed to `init`.
///
/// `init` does nothing when tracing and metrics are both disabled and
/// `console` is `OtelConsoleConfig::Ignore`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct OtelConfig {
  /// Whether tracing is enabled (reported by `OtelGlobals::has_tracing`).
  pub tracing_enabled: bool,
  /// Whether metrics are enabled (reported by `OtelGlobals::has_metrics`).
  pub metrics_enabled: bool,
  /// Console handling mode; see `OtelConsoleConfig`.
  pub console: OtelConsoleConfig,
  /// When set, `init` builds a deterministic id generator from this prefix
  /// instead of a random one.
  pub deterministic_prefix: Option<u8>,
  /// Selected propagators; `as_v8` appends one byte per entry.
  pub propagators: std::collections::HashSet<OtelPropagators>,
}
139
140impl OtelConfig {
141  pub fn as_v8(&self) -> Box<[u8]> {
142    let mut data = vec![
143      self.tracing_enabled as u8,
144      self.metrics_enabled as u8,
145      self.console as u8,
146    ];
147
148    data.extend(self.propagators.iter().map(|propagator| *propagator as u8));
149
150    data.into_boxed_slice()
151  }
152}
153
/// Context propagators that can be selected in `OtelConfig::propagators`.
///
/// The explicit `u8` discriminants are the values written by
/// `OtelConfig::as_v8`, so they must stay stable for whoever decodes that
/// buffer.
#[derive(
  Default, Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash,
)]
#[repr(u8)]
pub enum OtelPropagators {
  TraceContext = 0,
  Baggage = 1,
  /// Default value.
  #[default]
  None = 2,
}
164
/// Console handling mode stored in `OtelConfig::console`.
///
/// The `u8` discriminant is the third byte written by `OtelConfig::as_v8`.
/// NOTE(review): the exact behavior of `Capture` vs `Replace` is implemented
/// outside this section; confirm before documenting it further.
#[derive(
  Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize,
)]
#[repr(u8)]
pub enum OtelConsoleConfig {
  /// Default. With tracing and metrics also disabled, `init` returns early.
  #[default]
  Ignore = 0,
  Capture = 1,
  Replace = 2,
}
175
// Sending half of the queue that feeds futures to the shared OpenTelemetry
// runtime thread. The thread is started lazily, on first access, by
// `otel_create_shared_runtime`.
static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy<
  UnboundedSender<BoxFuture<'static, ()>>,
> = Lazy::new(otel_create_shared_runtime);
179
// One-shot callbacks that the periodic metric reader drains before a
// timer-triggered export: each registered sender receives a fresh
// `oneshot::Sender<()>`, and the reader waits until the matching receiver
// resolves (or is dropped) before collecting.
static OTEL_PRE_COLLECT_CALLBACKS: Lazy<
  Mutex<Vec<oneshot::Sender<oneshot::Sender<()>>>>,
> = Lazy::new(Default::default);
183
184fn otel_create_shared_runtime() -> UnboundedSender<BoxFuture<'static, ()>> {
185  let (spawn_task_tx, mut spawn_task_rx) =
186    mpsc::unbounded::<BoxFuture<'static, ()>>();
187
188  thread::spawn(move || {
189    let rt = tokio::runtime::Builder::new_current_thread()
190      .enable_io()
191      .enable_time()
192      // This limits the number of threads for blocking operations (like for
193      // synchronous fs ops) or CPU bound tasks like when we run dprint in
194      // parallel for deno fmt.
195      // The default value is 512, which is an unhelpfully large thread pool. We
196      // don't ever want to have more than a couple dozen threads.
197      .max_blocking_threads(if cfg!(windows) {
198        // on windows, tokio uses blocking tasks for child process IO, make sure
199        // we have enough available threads for other tasks to run
200        4 * std::thread::available_parallelism()
201          .map(|n| n.get())
202          .unwrap_or(8)
203      } else {
204        32
205      })
206      .build()
207      .unwrap();
208
209    rt.block_on(async move {
210      while let Some(task) = spawn_task_rx.next().await {
211        tokio::spawn(task);
212      }
213    });
214  });
215
216  spawn_task_tx
217}
218
/// Zero-sized handle to the shared OpenTelemetry runtime thread.
///
/// Its trait implementations forward futures through
/// `OTEL_SHARED_RUNTIME_SPAWN_TASK_TX`.
#[derive(Clone, Copy)]
pub struct OtelSharedRuntime;
221
222impl hyper::rt::Executor<BoxFuture<'static, ()>> for OtelSharedRuntime {
223  fn execute(&self, fut: BoxFuture<'static, ()>) {
224    (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
225      .unbounded_send(fut)
226      .expect("failed to send task to shared OpenTelemetry runtime");
227  }
228}
229
230impl opentelemetry_sdk::runtime::Runtime for OtelSharedRuntime {
231  type Interval = Pin<Box<dyn Stream<Item = ()> + Send + 'static>>;
232  type Delay = Pin<Box<tokio::time::Sleep>>;
233
234  fn interval(&self, period: Duration) -> Self::Interval {
235    stream::repeat(())
236      .then(move |_| tokio::time::sleep(period))
237      .boxed()
238  }
239
240  fn spawn(&self, future: BoxFuture<'static, ()>) {
241    (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
242      .unbounded_send(future)
243      .expect("failed to send task to shared OpenTelemetry runtime");
244  }
245
246  fn delay(&self, duration: Duration) -> Self::Delay {
247    Box::pin(tokio::time::sleep(duration))
248  }
249}
250
251impl opentelemetry_sdk::runtime::RuntimeChannel for OtelSharedRuntime {
252  type Receiver<T: Debug + Send> = BatchMessageChannelReceiver<T>;
253  type Sender<T: Debug + Send> = BatchMessageChannelSender<T>;
254
255  fn batch_message_channel<T: Debug + Send>(
256    &self,
257    capacity: usize,
258  ) -> (Self::Sender<T>, Self::Receiver<T>) {
259    let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<T>(capacity);
260    (batch_tx.into(), batch_rx.into())
261  }
262}
263
264#[derive(Debug)]
265pub struct BatchMessageChannelSender<T: Send> {
266  sender: tokio::sync::mpsc::Sender<T>,
267}
268
269impl<T: Send> From<tokio::sync::mpsc::Sender<T>>
270  for BatchMessageChannelSender<T>
271{
272  fn from(sender: tokio::sync::mpsc::Sender<T>) -> Self {
273    Self { sender }
274  }
275}
276
277impl<T: Send> opentelemetry_sdk::runtime::TrySend
278  for BatchMessageChannelSender<T>
279{
280  type Message = T;
281
282  fn try_send(
283    &self,
284    item: Self::Message,
285  ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
286    self.sender.try_send(item).map_err(|err| match err {
287      tokio::sync::mpsc::error::TrySendError::Full(_) => {
288        opentelemetry_sdk::runtime::TrySendError::ChannelFull
289      }
290      tokio::sync::mpsc::error::TrySendError::Closed(_) => {
291        opentelemetry_sdk::runtime::TrySendError::ChannelClosed
292      }
293    })
294  }
295}
296
297pub struct BatchMessageChannelReceiver<T> {
298  receiver: tokio::sync::mpsc::Receiver<T>,
299}
300
301impl<T> From<tokio::sync::mpsc::Receiver<T>>
302  for BatchMessageChannelReceiver<T>
303{
304  fn from(receiver: tokio::sync::mpsc::Receiver<T>) -> Self {
305    Self { receiver }
306  }
307}
308
309impl<T> Stream for BatchMessageChannelReceiver<T> {
310  type Item = T;
311
312  fn poll_next(
313    mut self: Pin<&mut Self>,
314    cx: &mut Context<'_>,
315  ) -> Poll<Option<Self::Item>> {
316    self.receiver.poll_recv(cx)
317  }
318}
319
/// Requests handled by the `DenoPeriodicReader` worker task.
enum DenoPeriodicReaderMessage {
  /// Register a metrics pipeline with the worker's inner reader.
  Register(std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>),
  /// Collect and export now; the worker produces this itself on every timer
  /// tick.
  Export,
  /// Collect and export, then send the outcome through the given channel.
  ForceFlush(oneshot::Sender<MetricResult<()>>),
  /// Collect and export one last time, shut the exporter down, send the
  /// outcome through the given channel and stop the worker.
  Shutdown(oneshot::Sender<MetricResult<()>>),
}
326
/// Metric reader whose work happens in a task on the shared OpenTelemetry
/// runtime; this handle only forwards messages to that task.
#[derive(Debug)]
struct DenoPeriodicReader {
  /// Bounded queue to the worker task created in `DenoPeriodicReader::new`.
  tx: tokio::sync::mpsc::Sender<DenoPeriodicReaderMessage>,
  /// Temporality of the exporter, returned for every instrument kind.
  temporality: Temporality,
}
332
333impl MetricReader for DenoPeriodicReader {
334  fn register_pipeline(
335    &self,
336    pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>,
337  ) {
338    let _ = self
339      .tx
340      .try_send(DenoPeriodicReaderMessage::Register(pipeline));
341  }
342
343  fn collect(
344    &self,
345    _rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
346  ) -> opentelemetry_sdk::metrics::MetricResult<()> {
347    unreachable!("collect should not be called on DenoPeriodicReader");
348  }
349
350  fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
351    let (tx, rx) = oneshot::channel();
352    let _ = self.tx.try_send(DenoPeriodicReaderMessage::ForceFlush(tx));
353    deno_core::futures::executor::block_on(rx).unwrap()?;
354    Ok(())
355  }
356
357  fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
358    let (tx, rx) = oneshot::channel();
359    let _ = self.tx.try_send(DenoPeriodicReaderMessage::Shutdown(tx));
360    deno_core::futures::executor::block_on(rx).unwrap()?;
361    Ok(())
362  }
363
364  fn temporality(
365    &self,
366    _kind: opentelemetry_sdk::metrics::InstrumentKind,
367  ) -> Temporality {
368    self.temporality
369  }
370}
371
/// Environment variable holding the metric export interval in milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Export interval used when the variable is unset or cannot be parsed.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
374
375impl DenoPeriodicReader {
376  fn new(exporter: opentelemetry_otlp::MetricExporter) -> Self {
377    let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
378      .ok()
379      .and_then(|v| v.parse().map(Duration::from_millis).ok())
380      .unwrap_or(DEFAULT_INTERVAL);
381
382    let (tx, mut rx) = tokio::sync::mpsc::channel(256);
383
384    let temporality = PushMetricExporter::temporality(&exporter);
385
386    let worker = async move {
387      let inner = ManualReader::builder()
388        .with_temporality(PushMetricExporter::temporality(&exporter))
389        .build();
390
391      let collect_and_export = |collect_observed: bool| {
392        let inner = &inner;
393        let exporter = &exporter;
394        async move {
395          let mut resource_metrics =
396            opentelemetry_sdk::metrics::data::ResourceMetrics {
397              resource: Default::default(),
398              scope_metrics: Default::default(),
399            };
400          if collect_observed {
401            let callbacks = {
402              let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS.lock().unwrap();
403              std::mem::take(&mut *callbacks)
404            };
405            let mut futures = JoinSet::new();
406            for callback in callbacks {
407              let (tx, rx) = oneshot::channel();
408              if let Ok(()) = callback.send(tx) {
409                futures.spawn(rx);
410              }
411            }
412            while futures.join_next().await.is_some() {}
413          }
414          inner.collect(&mut resource_metrics)?;
415          if resource_metrics.scope_metrics.is_empty() {
416            return Ok(());
417          }
418          exporter.export(&mut resource_metrics).await?;
419          Ok(())
420        }
421      };
422
423      let mut ticker = tokio::time::interval(interval);
424      ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
425      ticker.tick().await;
426
427      loop {
428        let message = tokio::select! {
429          _ = ticker.tick() => DenoPeriodicReaderMessage::Export,
430          message = rx.recv() => if let Some(message) = message {
431            message
432          } else {
433            break;
434          },
435        };
436
437        match message {
438          DenoPeriodicReaderMessage::Register(new_pipeline) => {
439            inner.register_pipeline(new_pipeline);
440          }
441          DenoPeriodicReaderMessage::Export => {
442            otel_debug!(
443                name: "DenoPeriodicReader.ExportTriggered",
444                message = "Export message received.",
445            );
446            if let Err(err) = collect_and_export(true).await {
447              otel_error!(
448                name: "DenoPeriodicReader.ExportFailed",
449                message = "Failed to export metrics",
450                reason = format!("{}", err));
451            }
452          }
453          DenoPeriodicReaderMessage::ForceFlush(sender) => {
454            otel_debug!(
455                name: "DenoPeriodicReader.ForceFlushCalled",
456                message = "Flush message received.",
457            );
458            let res = collect_and_export(false).await;
459            if let Err(send_error) = sender.send(res) {
460              otel_debug!(
461                  name: "DenoPeriodicReader.Flush.SendResultError",
462                  message = "Failed to send flush result.",
463                  reason = format!("{:?}", send_error),
464              );
465            }
466          }
467          DenoPeriodicReaderMessage::Shutdown(sender) => {
468            otel_debug!(
469                name: "DenoPeriodicReader.ShutdownCalled",
470                message = "Shutdown message received",
471            );
472            let res = collect_and_export(false).await;
473            let _ = exporter.shutdown();
474            if let Err(send_error) = sender.send(res) {
475              otel_debug!(
476                  name: "DenoPeriodicReader.Shutdown.SendResultError",
477                  message = "Failed to send shutdown result",
478                  reason = format!("{:?}", send_error),
479              );
480            }
481            break;
482          }
483        }
484      }
485    };
486
487    (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
488      .unbounded_send(worker.boxed())
489      .expect("failed to send task to shared OpenTelemetry runtime");
490
491    DenoPeriodicReader { tx, temporality }
492  }
493}
494
495mod hyper_client {
496  use std::fmt::Debug;
497  use std::pin::Pin;
498  use std::task::Poll;
499
500  use deno_net::tunnel::TunnelConnection;
501  use deno_net::tunnel::TunnelStream;
502  use deno_net::tunnel::get_tunnel;
503  use deno_tls::SocketUse;
504  use deno_tls::TlsKey;
505  use deno_tls::TlsKeys;
506  use deno_tls::create_client_config;
507  use deno_tls::load_certs;
508  use deno_tls::load_private_keys;
509  use http_body_util::BodyExt;
510  use http_body_util::Full;
511  use hyper::Uri;
512  use hyper_rustls::HttpsConnector;
513  use hyper_rustls::MaybeHttpsStream;
514  use hyper_util::client::legacy::Client;
515  use hyper_util::client::legacy::connect::Connected;
516  use hyper_util::client::legacy::connect::HttpConnector;
517  use hyper_util::rt::TokioIo;
518  use opentelemetry_http::Bytes;
519  use opentelemetry_http::HttpError;
520  use opentelemetry_http::Request;
521  use opentelemetry_http::Response;
522  use opentelemetry_http::ResponseExt;
523  use tokio::net::TcpStream;
524  #[cfg(any(
525    target_os = "android",
526    target_os = "linux",
527    target_os = "macos"
528  ))]
529  use tokio_vsock::VsockAddr;
530  #[cfg(any(
531    target_os = "android",
532    target_os = "linux",
533    target_os = "macos"
534  ))]
535  use tokio_vsock::VsockStream;
536
537  use super::OtelSharedRuntime;
538
  /// Error returned by `Connector` when establishing a connection fails.
  ///
  /// Every variant is transparent: it displays as the wrapped error.
  #[derive(Debug, thiserror::Error)]
  enum Error {
    /// I/O failure.
    #[error(transparent)]
    StdIo(#[from] std::io::Error),
    /// Boxed error, as produced by the HTTPS connector.
    #[error(transparent)]
    Box(#[from] Box<dyn std::error::Error + Send + Sync>),
    /// Failure reported by the tunnel.
    #[error(transparent)]
    Tunnel(#[from] deno_net::tunnel::Error),
  }
548
  /// Transport used by `HyperClient` to reach the collector; chosen once in
  /// `HyperClient::new`.
  #[derive(Debug, Clone)]
  enum Connector {
    /// TCP connection through hyper's connector, wrapped by the rustls HTTPS
    /// connector.
    Http(HttpsConnector<HttpConnector>),
    /// Stream opened on an existing tunnel connection.
    Tunnel(TunnelConnection),
    /// vsock connection to the given address (Android, Linux and macOS only).
    #[cfg(any(
      target_os = "android",
      target_os = "linux",
      target_os = "macos"
    ))]
    Vsock(VsockAddr),
  }
560
  /// Established connection produced by `Connector`, one variant per
  /// transport.
  ///
  /// The enum is pin-projected (`IOProj`) so the `AsyncRead`/`AsyncWrite`
  /// implementations can poll the wrapped stream in place.
  #[allow(clippy::large_enum_variant)]
  #[pin_project::pin_project(project = IOProj)]
  enum IO {
    /// Plain or TLS stream over TCP.
    Tls(#[pin] TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>),
    /// Stream opened over the tunnel.
    Tunnel(#[pin] TunnelStream),
    /// vsock stream (Android, Linux and macOS only).
    #[cfg(any(
      target_os = "android",
      target_os = "linux",
      target_os = "macos"
    ))]
    Vsock(#[pin] VsockStream),
  }
573
574  impl tokio::io::AsyncRead for IO {
575    fn poll_read(
576      self: std::pin::Pin<&mut Self>,
577      cx: &mut std::task::Context<'_>,
578      buf: &mut tokio::io::ReadBuf<'_>,
579    ) -> Poll<std::io::Result<()>> {
580      match self.project() {
581        IOProj::Tls(stream) => stream.poll_read(cx, buf),
582        IOProj::Tunnel(stream) => stream.poll_read(cx, buf),
583        #[cfg(any(
584          target_os = "android",
585          target_os = "linux",
586          target_os = "macos"
587        ))]
588        IOProj::Vsock(stream) => stream.poll_read(cx, buf),
589      }
590    }
591  }
592
593  impl tokio::io::AsyncWrite for IO {
594    fn poll_write(
595      self: std::pin::Pin<&mut Self>,
596      cx: &mut std::task::Context<'_>,
597      buf: &[u8],
598    ) -> Poll<Result<usize, std::io::Error>> {
599      match self.project() {
600        IOProj::Tls(stream) => stream.poll_write(cx, buf),
601        IOProj::Tunnel(stream) => stream.poll_write(cx, buf),
602        #[cfg(any(
603          target_os = "android",
604          target_os = "linux",
605          target_os = "macos"
606        ))]
607        IOProj::Vsock(stream) => stream.poll_write(cx, buf),
608      }
609    }
610
611    fn poll_flush(
612      self: std::pin::Pin<&mut Self>,
613      cx: &mut std::task::Context<'_>,
614    ) -> Poll<Result<(), std::io::Error>> {
615      match self.project() {
616        IOProj::Tls(stream) => stream.poll_flush(cx),
617        IOProj::Tunnel(stream) => stream.poll_flush(cx),
618        #[cfg(any(
619          target_os = "android",
620          target_os = "linux",
621          target_os = "macos"
622        ))]
623        IOProj::Vsock(stream) => stream.poll_flush(cx),
624      }
625    }
626
627    fn poll_shutdown(
628      self: std::pin::Pin<&mut Self>,
629      cx: &mut std::task::Context<'_>,
630    ) -> Poll<Result<(), std::io::Error>> {
631      match self.project() {
632        IOProj::Tls(stream) => stream.poll_shutdown(cx),
633        IOProj::Tunnel(stream) => stream.poll_shutdown(cx),
634        #[cfg(any(
635          target_os = "android",
636          target_os = "linux",
637          target_os = "macos"
638        ))]
639        IOProj::Vsock(stream) => stream.poll_shutdown(cx),
640      }
641    }
642
643    fn is_write_vectored(&self) -> bool {
644      match self {
645        IO::Tls(stream) => stream.is_write_vectored(),
646        IO::Tunnel(stream) => stream.is_write_vectored(),
647        #[cfg(any(
648          target_os = "android",
649          target_os = "linux",
650          target_os = "macos"
651        ))]
652        IO::Vsock(stream) => stream.is_write_vectored(),
653      }
654    }
655
656    fn poll_write_vectored(
657      self: std::pin::Pin<&mut Self>,
658      cx: &mut std::task::Context<'_>,
659      bufs: &[std::io::IoSlice<'_>],
660    ) -> Poll<Result<usize, std::io::Error>> {
661      match self.project() {
662        IOProj::Tls(stream) => stream.poll_write_vectored(cx, bufs),
663        IOProj::Tunnel(stream) => stream.poll_write_vectored(cx, bufs),
664        #[cfg(any(
665          target_os = "android",
666          target_os = "linux",
667          target_os = "macos"
668        ))]
669        IOProj::Vsock(stream) => stream.poll_write_vectored(cx, bufs),
670      }
671    }
672  }
673
674  impl hyper_util::client::legacy::connect::Connection for IO {
675    fn connected(&self) -> Connected {
676      match self {
677        Self::Tls(stream) => stream.connected(),
678        Self::Tunnel(_) => Connected::new().proxy(true),
679        #[cfg(any(
680          target_os = "android",
681          target_os = "linux",
682          target_os = "macos"
683        ))]
684        Self::Vsock(_) => Connected::new().proxy(true),
685      }
686    }
687  }
688
689  impl tower_service::Service<Uri> for Connector {
690    type Response = TokioIo<IO>;
691    type Error = Error;
692    type Future = Pin<
693      Box<
694        dyn std::future::Future<Output = Result<Self::Response, Self::Error>>
695          + Send,
696      >,
697    >;
698
699    fn poll_ready(
700      &mut self,
701      cx: &mut std::task::Context<'_>,
702    ) -> Poll<Result<(), Self::Error>> {
703      match self {
704        Self::Http(c) => c.poll_ready(cx).map_err(Into::into),
705        Self::Tunnel(_) => Poll::Ready(Ok(())),
706        #[cfg(any(
707          target_os = "android",
708          target_os = "linux",
709          target_os = "macos"
710        ))]
711        Self::Vsock(_) => Poll::Ready(Ok(())),
712      }
713    }
714
715    fn call(&mut self, dst: Uri) -> Self::Future {
716      let this = self.clone();
717      Box::pin(async move {
718        match this {
719          Self::Http(mut connector) => {
720            let stream = connector.call(dst).await?;
721            Ok(TokioIo::new(IO::Tls(TokioIo::new(stream))))
722          }
723          Self::Tunnel(listener) => {
724            let stream = listener.create_agent_stream().await?;
725            Ok(TokioIo::new(IO::Tunnel(stream)))
726          }
727          #[cfg(any(
728            target_os = "android",
729            target_os = "linux",
730            target_os = "macos"
731          ))]
732          Self::Vsock(addr) => {
733            let stream = VsockStream::connect(addr).await?;
734            Ok(TokioIo::new(IO::Vsock(stream)))
735          }
736        }
737      })
738    }
739  }
740
  /// HTTP client handed to the OTLP exporters; a hyper client that connects
  /// through `Connector` and runs its background work on `OtelSharedRuntime`.
  #[derive(Debug, Clone)]
  pub struct HyperClient {
    inner: Client<Connector, Full<Bytes>>,
  }
745
746  impl HyperClient {
747    pub fn new() -> deno_core::anyhow::Result<Self> {
748      let connector = if let Some(tunnel) = get_tunnel() {
749        Connector::Tunnel(tunnel.clone())
750      } else if let Ok(addr) = std::env::var("OTEL_DENO_VSOCK") {
751        #[cfg(not(any(
752          target_os = "android",
753          target_os = "linux",
754          target_os = "macos"
755        )))]
756        {
757          let _ = addr;
758          deno_core::anyhow::bail!("vsock is not supported on this platform")
759        }
760
761        #[cfg(any(
762          target_os = "android",
763          target_os = "linux",
764          target_os = "macos"
765        ))]
766        {
767          let Some((cid, port)) = addr.split_once(':') else {
768            deno_core::anyhow::bail!("invalid vsock addr");
769          };
770          let cid = if cid == "-1" { u32::MAX } else { cid.parse()? };
771          let port = port.parse()?;
772          let addr = VsockAddr::new(cid, port);
773          Connector::Vsock(addr)
774        }
775      } else {
776        let ca_certs = match std::env::var("OTEL_EXPORTER_OTLP_CERTIFICATE") {
777          Ok(path) => vec![std::fs::read(path)?],
778          _ => vec![],
779        };
780
781        let keys = match (
782          std::env::var("OTEL_EXPORTER_OTLP_CLIENT_KEY"),
783          std::env::var("OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE"),
784        ) {
785          (Ok(key_path), Ok(cert_path)) => {
786            let key = std::fs::read(key_path)?;
787            let cert = std::fs::read(cert_path)?;
788
789            let certs = load_certs(&mut std::io::Cursor::new(cert))?;
790            let key = load_private_keys(&key)?.into_iter().next().unwrap();
791
792            TlsKeys::Static(TlsKey(certs, key))
793          }
794          _ => TlsKeys::Null,
795        };
796
797        let tls_config =
798          create_client_config(deno_tls::TlsClientConfigOptions {
799            root_cert_store: None,
800            ca_certs,
801            unsafely_ignore_certificate_errors: None,
802            unsafely_disable_hostname_verification: false,
803            cert_chain_and_key: keys,
804            socket_use: SocketUse::Http,
805          })?;
806        let mut http_connector = HttpConnector::new();
807        http_connector.enforce_http(false);
808        let connector = HttpsConnector::from((http_connector, tls_config));
809        Connector::Http(connector)
810      };
811
812      Ok(Self {
813        inner: Client::builder(OtelSharedRuntime).build(connector),
814      })
815    }
816  }
817
818  #[async_trait::async_trait]
819  impl opentelemetry_http::HttpClient for HyperClient {
820    async fn send(
821      &self,
822      request: Request<Vec<u8>>,
823    ) -> Result<Response<Bytes>, HttpError> {
824      let (parts, body) = request.into_parts();
825      let request = Request::from_parts(parts, Full::from(body));
826      let response = self.inner.request(request).await?;
827      let (parts, body) = response.into_parts();
828      let body = body.collect().await?.to_bytes();
829      let response = Response::from_parts(parts, body);
830      Ok(response.error_for_status()?)
831    }
832  }
833}
834
/// Process-wide telemetry state created by `init` and stored in
/// `OTEL_GLOBALS`.
#[derive(Debug)]
pub struct OtelGlobals {
  /// Batches spans and exports them over OTLP.
  pub span_processor: BatchSpanProcessor<OtelSharedRuntime>,
  /// Batches log records and exports them over OTLP.
  pub log_processor: BatchLogProcessor<OtelSharedRuntime>,
  /// Deterministic or random id generator, depending on
  /// `OtelConfig::deterministic_prefix`.
  pub id_generator: DenoIdGenerator,
  /// Meter provider backed by `DenoPeriodicReader`.
  pub meter_provider: SdkMeterProvider,
  /// Instrumentation scope named `deno`, versioned with the runtime version.
  pub builtin_instrumentation_scope: InstrumentationScope,
  /// The configuration `init` was called with.
  pub config: OtelConfig,
}
844
845impl OtelGlobals {
846  pub fn has_tracing(&self) -> bool {
847    self.config.tracing_enabled
848  }
849
850  pub fn has_metrics(&self) -> bool {
851    self.config.metrics_enabled
852  }
853}
854
/// Global telemetry state. Set at most once by `init`; it stays empty when
/// `init` returns early because telemetry is disabled.
pub static OTEL_GLOBALS: OnceCell<OtelGlobals> = OnceCell::new();
856
857pub fn init(
858  rt_config: OtelRuntimeConfig,
859  config: OtelConfig,
860) -> deno_core::anyhow::Result<()> {
861  if !config.metrics_enabled
862    && !config.tracing_enabled
863    && config.console == OtelConsoleConfig::Ignore
864  {
865    return Ok(());
866  }
867
868  // Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_*
869  // crates don't do this automatically.
870  // TODO(piscisaureus): enable GRPC support.
871  let protocol = match env::var("OTEL_EXPORTER_OTLP_PROTOCOL").as_deref() {
872    Ok("http/protobuf") => Protocol::HttpBinary,
873    Ok("http/json") => Protocol::HttpJson,
874    Ok("") | Err(env::VarError::NotPresent) => Protocol::HttpBinary,
875    Ok(protocol) => {
876      return Err(deno_core::anyhow::anyhow!(
877        "Env var OTEL_EXPORTER_OTLP_PROTOCOL specifies an unsupported protocol: {}",
878        protocol
879      ));
880    }
881    Err(err) => {
882      return Err(deno_core::anyhow::anyhow!(
883        "Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}",
884        err
885      ));
886    }
887  };
888
889  // Define the resource attributes that will be attached to all log records.
890  // These attributes are sourced as follows (in order of precedence):
891  //   * The `service.name` attribute from the `OTEL_SERVICE_NAME` env var.
892  //   * Additional attributes from the `OTEL_RESOURCE_ATTRIBUTES` env var.
893  //   * Default attribute values defined here.
894  // TODO(piscisaureus): add more default attributes (e.g. script path).
895  let mut resource = Resource::default();
896
897  // Add the runtime name and version to the resource attributes. Also override
898  // the `telemetry.sdk` attributes to include the Deno runtime.
899  resource = resource.merge(&Resource::new(vec![
900    KeyValue::new(PROCESS_RUNTIME_NAME, rt_config.runtime_name),
901    KeyValue::new(PROCESS_RUNTIME_VERSION, rt_config.runtime_version.clone()),
902    KeyValue::new(
903      TELEMETRY_SDK_LANGUAGE,
904      format!(
905        "deno-{}",
906        resource.get(Key::new(TELEMETRY_SDK_LANGUAGE)).unwrap()
907      ),
908    ),
909    KeyValue::new(
910      TELEMETRY_SDK_NAME,
911      format!(
912        "deno-{}",
913        resource.get(Key::new(TELEMETRY_SDK_NAME)).unwrap()
914      ),
915    ),
916    KeyValue::new(
917      TELEMETRY_SDK_VERSION,
918      format!(
919        "{}-{}",
920        rt_config.runtime_version,
921        resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap()
922      ),
923    ),
924  ]));
925
926  // The OTLP endpoint is automatically picked up from the
927  // `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. Additional headers can
928  // be specified using `OTEL_EXPORTER_OTLP_HEADERS`.
929
930  let client = hyper_client::HyperClient::new()?;
931
932  let span_exporter = HttpExporterBuilder::default()
933    .with_http_client(client.clone())
934    .with_protocol(protocol)
935    .build_span_exporter()?;
936  let mut span_processor =
937    BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build();
938  span_processor.set_resource(&resource);
939
940  let temporality_preference =
941    env::var("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE")
942      .ok()
943      .map(|s| s.to_lowercase());
944  let temporality = match temporality_preference.as_deref() {
945    None | Some("cumulative") => Temporality::Cumulative,
946    Some("delta") => Temporality::Delta,
947    Some("lowmemory") => Temporality::LowMemory,
948    Some(other) => {
949      return Err(deno_core::anyhow::anyhow!(
950        "Invalid value for OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: {}",
951        other
952      ));
953    }
954  };
955  let metric_exporter = HttpExporterBuilder::default()
956    .with_http_client(client.clone())
957    .with_protocol(protocol)
958    .build_metrics_exporter(temporality)?;
959  let metric_reader = DenoPeriodicReader::new(metric_exporter);
960  let meter_provider = SdkMeterProvider::builder()
961    .with_reader(metric_reader)
962    .with_resource(resource.clone())
963    .build();
964
965  let log_exporter = HttpExporterBuilder::default()
966    .with_http_client(client)
967    .with_protocol(protocol)
968    .build_log_exporter()?;
969  let log_processor =
970    BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build();
971  log_processor.set_resource(&resource);
972
973  let builtin_instrumentation_scope =
974    opentelemetry::InstrumentationScope::builder("deno")
975      .with_version(rt_config.runtime_version.clone())
976      .build();
977
978  let id_generator = if let Some(prefix) = config.deterministic_prefix {
979    DenoIdGenerator::deterministic(prefix)
980  } else {
981    DenoIdGenerator::random()
982  };
983
984  OTEL_GLOBALS
985    .set(OtelGlobals {
986      log_processor,
987      span_processor,
988      id_generator,
989      meter_provider,
990      builtin_instrumentation_scope,
991      config,
992    })
993    .map_err(|_| deno_core::anyhow::anyhow!("failed to set otel globals"))?;
994
995  deno_signals::before_exit(before_exit);
996  deno_net::tunnel::disable_before_exit();
997
998  Ok(())
999}
1000
1001fn before_exit() {
1002  log::trace!("deno_telemetry::before_exit");
1003
1004  let Some(OtelGlobals {
1005    span_processor: spans,
1006    log_processor: logs,
1007    meter_provider,
1008    ..
1009  }) = OTEL_GLOBALS.get()
1010  else {
1011    return;
1012  };
1013
1014  let r = spans.shutdown();
1015  log::trace!("spans={:?}", r);
1016
1017  let r = logs.shutdown();
1018  log::trace!("logs={:?}", r);
1019
1020  let r = meter_provider.shutdown();
1021  log::trace!("meters={:?}", r);
1022
1023  deno_net::tunnel::before_exit();
1024}
1025
1026pub fn handle_log(record: &log::Record) {
1027  use log::Level;
1028
1029  let Some(OtelGlobals {
1030    log_processor: logs,
1031    builtin_instrumentation_scope,
1032    ..
1033  }) = OTEL_GLOBALS.get()
1034  else {
1035    return;
1036  };
1037
1038  let mut log_record = LogRecord::default();
1039
1040  let now = SystemTime::now();
1041  log_record.set_timestamp(now);
1042  log_record.set_observed_timestamp(now);
1043  log_record.set_severity_number(match record.level() {
1044    Level::Error => Severity::Error,
1045    Level::Warn => Severity::Warn,
1046    Level::Info => Severity::Info,
1047    Level::Debug => Severity::Debug,
1048    Level::Trace => Severity::Trace,
1049  });
1050  log_record.set_severity_text(record.level().as_str());
1051  log_record.set_body(record.args().to_string().into());
1052  log_record.set_target(record.metadata().target().to_string());
1053
1054  struct Visitor<'s>(&'s mut LogRecord);
1055
1056  impl<'kvs> log::kv::VisitSource<'kvs> for Visitor<'_> {
1057    fn visit_pair(
1058      &mut self,
1059      key: log::kv::Key<'kvs>,
1060      value: log::kv::Value<'kvs>,
1061    ) -> Result<(), log::kv::Error> {
1062      #[allow(clippy::manual_map)]
1063      let value = if let Some(v) = value.to_bool() {
1064        Some(AnyValue::Boolean(v))
1065      } else if let Some(v) = value.to_borrowed_str() {
1066        Some(AnyValue::String(v.to_owned().into()))
1067      } else if let Some(v) = value.to_f64() {
1068        Some(AnyValue::Double(v))
1069      } else if let Some(v) = value.to_i64() {
1070        Some(AnyValue::Int(v))
1071      } else {
1072        None
1073      };
1074
1075      if let Some(value) = value {
1076        let key = Key::from(key.as_str().to_owned());
1077        self.0.add_attribute(key, value);
1078      }
1079
1080      Ok(())
1081    }
1082  }
1083
1084  let _ = record.key_values().visit(&mut Visitor(&mut log_record));
1085
1086  logs.emit(&mut log_record, builtin_instrumentation_scope);
1087}
1088
1089#[derive(Debug)]
1090pub enum DenoIdGenerator {
1091  Random(RandomIdGenerator),
1092  Deterministic {
1093    next_trace_id: AtomicU64,
1094    next_span_id: AtomicU64,
1095  },
1096}
1097
1098impl IdGenerator for DenoIdGenerator {
1099  fn new_trace_id(&self) -> TraceId {
1100    match self {
1101      Self::Random(generator) => generator.new_trace_id(),
1102      Self::Deterministic { next_trace_id, .. } => {
1103        let id =
1104          next_trace_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1105        let bytes = id.to_be_bytes();
1106        let bytes = [
1107          0, 0, 0, 0, 0, 0, 0, 0, bytes[0], bytes[1], bytes[2], bytes[3],
1108          bytes[4], bytes[5], bytes[6], bytes[7],
1109        ];
1110        TraceId::from_bytes(bytes)
1111      }
1112    }
1113  }
1114
1115  fn new_span_id(&self) -> SpanId {
1116    match self {
1117      Self::Random(generator) => generator.new_span_id(),
1118      Self::Deterministic { next_span_id, .. } => {
1119        let id =
1120          next_span_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1121        SpanId::from_bytes(id.to_be_bytes())
1122      }
1123    }
1124  }
1125}
1126
1127impl DenoIdGenerator {
1128  fn random() -> Self {
1129    Self::Random(RandomIdGenerator::default())
1130  }
1131
1132  fn deterministic(prefix: u8) -> Self {
1133    let prefix = u64::from(prefix) << 56;
1134    Self::Deterministic {
1135      next_trace_id: AtomicU64::new(prefix + 1),
1136      next_span_id: AtomicU64::new(prefix + 1),
1137    }
1138  }
1139}
1140
1141fn parse_trace_id(
1142  scope: &mut v8::PinScope<'_, '_>,
1143  trace_id: v8::Local<'_, v8::Value>,
1144) -> TraceId {
1145  if let Ok(string) = trace_id.try_cast() {
1146    let value_view = v8::ValueView::new(scope, string);
1147    match value_view.data() {
1148      v8::ValueViewData::OneByte(bytes) => {
1149        TraceId::from_hex(&String::from_utf8_lossy(bytes))
1150          .unwrap_or(TraceId::INVALID)
1151      }
1152
1153      _ => TraceId::INVALID,
1154    }
1155  } else if let Ok(uint8array) = trace_id.try_cast::<v8::Uint8Array>() {
1156    let data = uint8array.data();
1157    let byte_length = uint8array.byte_length();
1158    if byte_length != 16 {
1159      return TraceId::INVALID;
1160    }
1161    // SAFETY: We have ensured that the byte length is 16, so it is safe to
1162    // cast the data to an array of 16 bytes.
1163    let bytes = unsafe { &*(data as *const u8 as *const [u8; 16]) };
1164    TraceId::from_bytes(*bytes)
1165  } else {
1166    TraceId::INVALID
1167  }
1168}
1169
1170fn parse_span_id(
1171  scope: &mut v8::PinScope<'_, '_>,
1172  span_id: v8::Local<'_, v8::Value>,
1173) -> SpanId {
1174  if let Ok(string) = span_id.try_cast() {
1175    let value_view = v8::ValueView::new(scope, string);
1176    match value_view.data() {
1177      v8::ValueViewData::OneByte(bytes) => {
1178        SpanId::from_hex(&String::from_utf8_lossy(bytes))
1179          .unwrap_or(SpanId::INVALID)
1180      }
1181      _ => SpanId::INVALID,
1182    }
1183  } else if let Ok(uint8array) = span_id.try_cast::<v8::Uint8Array>() {
1184    let data = uint8array.data();
1185    let byte_length = uint8array.byte_length();
1186    if byte_length != 8 {
1187      return SpanId::INVALID;
1188    }
1189    // SAFETY: We have ensured that the byte length is 8, so it is safe to
1190    // cast the data to an array of 8 bytes.
1191    let bytes = unsafe { &*(data as *const u8 as *const [u8; 8]) };
1192    SpanId::from_bytes(*bytes)
1193  } else {
1194    SpanId::INVALID
1195  }
1196}
1197
// Converts one JavaScript key/value pair into an `Option<KeyValue>`.
//
// `$scope` is the identifier of the V8 scope, `$name` the attribute key and
// `$value` the attribute value (both V8 values). The result is `None` when the
// key is not a string or when the value is not a string, number, boolean or
// bigint. Arrays are recognised but not supported yet.
macro_rules! attr_raw {
  ($scope:ident, $name:expr, $value:expr) => {{
    let name = if let Ok(name) = $name.try_cast() {
      let view = v8::ValueView::new($scope, name);
      match view.data() {
        // One-byte V8 string data is Latin-1, not UTF-8: every byte is the
        // code point of one character. Decoding it as UTF-8 would replace
        // characters in U+0080..=U+00FF with U+FFFD.
        v8::ValueViewData::OneByte(bytes) => {
          Some(bytes.iter().copied().map(char::from).collect::<String>())
        }
        v8::ValueViewData::TwoByte(units) => {
          Some(String::from_utf16_lossy(units))
        }
      }
    } else {
      None
    };
    let value = if let Ok(string) = $value.try_cast::<v8::String>() {
      Some(Value::String(StringValue::from({
        let view = v8::ValueView::new($scope, string);
        match view.data() {
          // Latin-1 decoding, see the note on the key above.
          v8::ValueViewData::OneByte(bytes) => {
            bytes.iter().copied().map(char::from).collect::<String>()
          }
          v8::ValueViewData::TwoByte(units) => String::from_utf16_lossy(units),
        }
      })))
    } else if let Ok(number) = $value.try_cast::<v8::Number>() {
      Some(Value::F64(number.value()))
    } else if let Ok(boolean) = $value.try_cast::<v8::Boolean>() {
      Some(Value::Bool(boolean.is_true()))
    } else if let Ok(bigint) = $value.try_cast::<v8::BigInt>() {
      // The second tuple element (whether the conversion was lossless) is
      // deliberately ignored; the i64 value is used as returned.
      let (i64_value, _lossless) = bigint.i64_value();
      Some(Value::I64(i64_value))
    } else if let Ok(_array) = $value.try_cast::<v8::Array>() {
      // TODO: implement array attributes
      None
    } else {
      None
    };
    match (name, value) {
      (Some(name), Some(value)) => Some(KeyValue::new(name, value)),
      _ => None,
    }
  }};
}
1243
// Converts a JavaScript key/value pair with `attr_raw!` and appends the result
// to `$attributes`. When the pair cannot be converted and the optional
// `=> $dropped_attributes_count` place is supplied, that counter is
// incremented; without it the pair is silently ignored.
macro_rules! attr {
  ($scope:ident, $attributes:expr $(=> $dropped_attributes_count:expr)?, $name:expr, $value:expr) => {
    match attr_raw!($scope, $name, $value) {
      Some(kv) => {
        $attributes.push(kv);
      }
      None => {
        $(
          $dropped_attributes_count += 1;
        )?
      }
    }
  };
}
1257
1258#[op2(fast)]
1259fn op_otel_log<'s>(
1260  scope: &mut v8::PinScope<'s, '_>,
1261  message: v8::Local<'s, v8::Value>,
1262  #[smi] level: i32,
1263  span: v8::Local<'s, v8::Value>,
1264) {
1265  let Some(OtelGlobals {
1266    log_processor,
1267    builtin_instrumentation_scope,
1268    ..
1269  }) = OTEL_GLOBALS.get()
1270  else {
1271    return;
1272  };
1273
1274  // Convert the integer log level that ext/console uses to the corresponding
1275  // OpenTelemetry log severity.
1276  let severity = match level {
1277    ..=0 => Severity::Debug,
1278    1 => Severity::Info,
1279    2 => Severity::Warn,
1280    3 | 5.. => Severity::Error,
1281    4 => Severity::Trace,
1282  };
1283
1284  let mut log_record = LogRecord::default();
1285  let now = SystemTime::now();
1286  log_record.set_timestamp(now);
1287  log_record.set_observed_timestamp(now);
1288  let Ok(message) = message.try_cast() else {
1289    return;
1290  };
1291  log_record.set_body(owned_string(scope, message).into());
1292  log_record.set_severity_number(severity);
1293  log_record.set_severity_text(severity.name());
1294  if let Some(span) =
1295    deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1296  {
1297    let state = span.0.borrow();
1298    match &**state {
1299      OtelSpanState::Recording(span) => {
1300        log_record.set_trace_context(
1301          span.span_context.trace_id(),
1302          span.span_context.span_id(),
1303          Some(span.span_context.trace_flags()),
1304        );
1305      }
1306      OtelSpanState::Done(span_context) => {
1307        log_record.set_trace_context(
1308          span_context.trace_id(),
1309          span_context.span_id(),
1310          Some(span_context.trace_flags()),
1311        );
1312      }
1313    }
1314  }
1315
1316  log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1317}
1318
1319#[op2(fast)]
1320fn op_otel_log_foreign(
1321  scope: &mut v8::PinScope<'_, '_>,
1322  #[string] message: String,
1323  #[smi] level: i32,
1324  trace_id: v8::Local<'_, v8::Value>,
1325  span_id: v8::Local<'_, v8::Value>,
1326  #[smi] trace_flags: u8,
1327) {
1328  let Some(OtelGlobals {
1329    log_processor,
1330    builtin_instrumentation_scope,
1331    ..
1332  }) = OTEL_GLOBALS.get()
1333  else {
1334    return;
1335  };
1336
1337  // Convert the integer log level that ext/console uses to the corresponding
1338  // OpenTelemetry log severity.
1339  let severity = match level {
1340    ..=0 => Severity::Debug,
1341    1 => Severity::Info,
1342    2 => Severity::Warn,
1343    3 | 5.. => Severity::Error,
1344    4 => Severity::Trace,
1345  };
1346
1347  let trace_id = parse_trace_id(scope, trace_id);
1348  let span_id = parse_span_id(scope, span_id);
1349
1350  let mut log_record = LogRecord::default();
1351
1352  let now = SystemTime::now();
1353  log_record.set_timestamp(now);
1354  log_record.set_observed_timestamp(now);
1355  log_record.set_body(message.into());
1356  log_record.set_severity_number(severity);
1357  log_record.set_severity_text(severity.name());
1358  if trace_id != TraceId::INVALID && span_id != SpanId::INVALID {
1359    log_record.set_trace_context(
1360      trace_id,
1361      span_id,
1362      Some(TraceFlags::new(trace_flags)),
1363    );
1364  }
1365
1366  log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1367}
1368
1369pub fn report_event(name: &'static str, data: impl std::fmt::Display) {
1370  let Some(OtelGlobals {
1371    log_processor,
1372    builtin_instrumentation_scope,
1373    ..
1374  }) = OTEL_GLOBALS.get()
1375  else {
1376    return;
1377  };
1378
1379  let mut log_record = LogRecord::default();
1380
1381  log_record.set_observed_timestamp(SystemTime::now());
1382  log_record.set_event_name(name);
1383  log_record.set_severity_number(Severity::Trace);
1384  log_record.set_severity_text(Severity::Trace.name());
1385  log_record.set_body(format!("{data}").into());
1386
1387  log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1388}
1389
1390fn owned_string<'s>(
1391  scope: &mut v8::PinScope<'s, '_>,
1392  string: v8::Local<'s, v8::String>,
1393) -> String {
1394  let x = v8::ValueView::new(scope, string);
1395  match x.data() {
1396    v8::ValueViewData::OneByte(bytes) => {
1397      String::from_utf8_lossy(bytes).into_owned()
1398    }
1399    v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
1400  }
1401}
1402
1403struct OtelTracer(InstrumentationScope);
1404
1405// SAFETY: we're sure this can be GCed
1406unsafe impl deno_core::GarbageCollected for OtelTracer {
1407  fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
1408
1409  fn get_name(&self) -> &'static std::ffi::CStr {
1410    c"OtelTracer"
1411  }
1412}
1413
1414#[op2]
1415impl OtelTracer {
1416  #[constructor]
1417  #[cppgc]
1418  fn new(
1419    #[string] name: String,
1420    #[string] version: Option<String>,
1421    #[string] schema_url: Option<String>,
1422  ) -> OtelTracer {
1423    let mut builder = opentelemetry::InstrumentationScope::builder(name);
1424    if let Some(version) = version {
1425      builder = builder.with_version(version);
1426    }
1427    if let Some(schema_url) = schema_url {
1428      builder = builder.with_schema_url(schema_url);
1429    }
1430    let scope = builder.build();
1431    OtelTracer(scope)
1432  }
1433
1434  #[static_method]
1435  #[cppgc]
1436  fn builtin() -> OtelTracer {
1437    let OtelGlobals {
1438      builtin_instrumentation_scope,
1439      ..
1440    } = OTEL_GLOBALS.get().unwrap();
1441    OtelTracer(builtin_instrumentation_scope.clone())
1442  }
1443
1444  #[cppgc]
1445  fn start_span<'s>(
1446    &self,
1447    scope: &mut v8::PinScope<'s, '_>,
1448    #[cppgc] parent: Option<&OtelSpan>,
1449    name: v8::Local<'s, v8::Value>,
1450    #[smi] span_kind: u8,
1451    start_time: Option<f64>,
1452    #[smi] attribute_count: usize,
1453  ) -> Result<OtelSpan, JsErrorBox> {
1454    let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
1455    let span_context;
1456    let parent_span_id;
1457    match parent {
1458      Some(parent) => {
1459        let parent = parent.0.borrow();
1460        let parent_span_context = match &**parent {
1461          OtelSpanState::Recording(span) => &span.span_context,
1462          OtelSpanState::Done(span_context) => span_context,
1463        };
1464        span_context = SpanContext::new(
1465          parent_span_context.trace_id(),
1466          id_generator.new_span_id(),
1467          TraceFlags::SAMPLED,
1468          false,
1469          parent_span_context.trace_state().clone(),
1470        );
1471        parent_span_id = parent_span_context.span_id();
1472      }
1473      None => {
1474        span_context = SpanContext::new(
1475          id_generator.new_trace_id(),
1476          id_generator.new_span_id(),
1477          TraceFlags::SAMPLED,
1478          false,
1479          TraceState::NONE,
1480        );
1481        parent_span_id = SpanId::INVALID;
1482      }
1483    }
1484    let name = owned_string(
1485      scope,
1486      name
1487        .try_cast()
1488        .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1489    );
1490    let span_kind = match span_kind {
1491      0 => SpanKind::Internal,
1492      1 => SpanKind::Server,
1493      2 => SpanKind::Client,
1494      3 => SpanKind::Producer,
1495      4 => SpanKind::Consumer,
1496      _ => return Err(JsErrorBox::generic("invalid span kind")),
1497    };
1498    let start_time = start_time
1499      .map(|start_time| {
1500        SystemTime::UNIX_EPOCH
1501          .checked_add(std::time::Duration::from_secs_f64(start_time / 1000.0))
1502          .ok_or_else(|| JsErrorBox::generic("invalid start time"))
1503      })
1504      .unwrap_or_else(|| Ok(SystemTime::now()))?;
1505    let span_data = SpanData {
1506      span_context,
1507      parent_span_id,
1508      span_kind,
1509      name: Cow::Owned(name),
1510      start_time,
1511      end_time: SystemTime::UNIX_EPOCH,
1512      attributes: Vec::with_capacity(attribute_count),
1513      dropped_attributes_count: 0,
1514      status: SpanStatus::Unset,
1515      events: SpanEvents::default(),
1516      links: SpanLinks::default(),
1517      instrumentation_scope: self.0.clone(),
1518    };
1519    Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
1520      span_data,
1521    )))))
1522  }
1523
1524  #[cppgc]
1525  fn start_span_foreign<'s>(
1526    &self,
1527    scope: &mut v8::PinScope<'s, '_>,
1528    parent_trace_id: v8::Local<'s, v8::Value>,
1529    parent_span_id: v8::Local<'s, v8::Value>,
1530    name: v8::Local<'s, v8::Value>,
1531    #[smi] span_kind: u8,
1532    start_time: Option<f64>,
1533    #[smi] attribute_count: usize,
1534  ) -> Result<OtelSpan, JsErrorBox> {
1535    let parent_trace_id = parse_trace_id(scope, parent_trace_id);
1536    if parent_trace_id == TraceId::INVALID {
1537      return Err(JsErrorBox::generic("invalid trace id"));
1538    };
1539    let parent_span_id = parse_span_id(scope, parent_span_id);
1540    if parent_span_id == SpanId::INVALID {
1541      return Err(JsErrorBox::generic("invalid span id"));
1542    };
1543    let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
1544    let span_context = SpanContext::new(
1545      parent_trace_id,
1546      id_generator.new_span_id(),
1547      TraceFlags::SAMPLED,
1548      false,
1549      TraceState::NONE,
1550    );
1551    let name = owned_string(
1552      scope,
1553      name
1554        .try_cast()
1555        .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1556    );
1557    let span_kind = match span_kind {
1558      0 => SpanKind::Internal,
1559      1 => SpanKind::Server,
1560      2 => SpanKind::Client,
1561      3 => SpanKind::Producer,
1562      4 => SpanKind::Consumer,
1563      _ => return Err(JsErrorBox::generic("invalid span kind")),
1564    };
1565    let start_time = start_time
1566      .map(|start_time| {
1567        SystemTime::UNIX_EPOCH
1568          .checked_add(std::time::Duration::from_secs_f64(start_time / 1000.0))
1569          .ok_or_else(|| JsErrorBox::generic("invalid start time"))
1570      })
1571      .unwrap_or_else(|| Ok(SystemTime::now()))?;
1572    let span_data = SpanData {
1573      span_context,
1574      parent_span_id,
1575      span_kind,
1576      name: Cow::Owned(name),
1577      start_time,
1578      end_time: SystemTime::UNIX_EPOCH,
1579      attributes: Vec::with_capacity(attribute_count),
1580      dropped_attributes_count: 0,
1581      status: SpanStatus::Unset,
1582      events: SpanEvents::default(),
1583      links: SpanLinks::default(),
1584      instrumentation_scope: self.0.clone(),
1585    };
1586    Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
1587      span_data,
1588    )))))
1589  }
1590}
1591
1592#[derive(Serialize)]
1593#[serde(rename_all = "camelCase")]
1594struct JsSpanContext {
1595  trace_id: Box<str>,
1596  span_id: Box<str>,
1597  trace_flags: u8,
1598}
1599
1600#[derive(Debug, Error, JsError)]
1601#[error("OtelSpan cannot be constructed.")]
1602#[class(type)]
1603struct OtelSpanCannotBeConstructedError;
1604
1605#[derive(Debug, Error, JsError)]
1606#[error("invalid span status code")]
1607#[class(type)]
1608struct InvalidSpanStatusCodeError;
1609
1610// boxed because of https://github.com/denoland/rusty_v8/issues/1676
1611#[derive(Debug)]
1612struct OtelSpan(RefCell<Box<OtelSpanState>>);
1613
1614#[derive(Debug)]
1615#[allow(clippy::large_enum_variant)]
1616enum OtelSpanState {
1617  Recording(SpanData),
1618  Done(SpanContext),
1619}
1620
1621// SAFETY: we're sure this can be GCed
1622unsafe impl deno_core::GarbageCollected for OtelSpan {
1623  fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
1624
1625  fn get_name(&self) -> &'static std::ffi::CStr {
1626    c"OtelSpan"
1627  }
1628}
1629
1630#[op2]
1631impl OtelSpan {
1632  #[constructor]
1633  #[cppgc]
1634  fn new() -> Result<OtelSpan, OtelSpanCannotBeConstructedError> {
1635    Err(OtelSpanCannotBeConstructedError)
1636  }
1637
1638  #[serde]
1639  fn span_context(&self) -> JsSpanContext {
1640    let state = self.0.borrow();
1641    let span_context = match &**state {
1642      OtelSpanState::Recording(span) => &span.span_context,
1643      OtelSpanState::Done(span_context) => span_context,
1644    };
1645    JsSpanContext {
1646      trace_id: format!("{:?}", span_context.trace_id()).into(),
1647      span_id: format!("{:?}", span_context.span_id()).into(),
1648      trace_flags: span_context.trace_flags().to_u8(),
1649    }
1650  }
1651
1652  #[fast]
1653  fn set_status<'s>(
1654    &self,
1655    #[smi] status: u8,
1656    #[string] error_description: String,
1657  ) -> Result<(), InvalidSpanStatusCodeError> {
1658    let mut state = self.0.borrow_mut();
1659    let OtelSpanState::Recording(span) = &mut **state else {
1660      return Ok(());
1661    };
1662    span.status = match status {
1663      0 => SpanStatus::Unset,
1664      1 => SpanStatus::Ok,
1665      2 => SpanStatus::Error {
1666        description: Cow::Owned(error_description),
1667      },
1668      _ => return Err(InvalidSpanStatusCodeError),
1669    };
1670    Ok(())
1671  }
1672
1673  #[fast]
1674  fn add_event(&self, #[string] name: String, start_time: f64) {
1675    let start_time = if start_time.is_nan() {
1676      SystemTime::now()
1677    } else {
1678      SystemTime::UNIX_EPOCH
1679        .checked_add(Duration::from_secs_f64(start_time / 1000.0))
1680        .unwrap()
1681    };
1682    let mut state = self.0.borrow_mut();
1683    let OtelSpanState::Recording(span) = &mut **state else {
1684      return;
1685    };
1686    span
1687      .events
1688      .events
1689      .push(Event::new(name, start_time, vec![], 0));
1690  }
1691
1692  #[fast]
1693  fn drop_event(&self) {
1694    let mut state = self.0.borrow_mut();
1695    match &mut **state {
1696      OtelSpanState::Recording(span) => {
1697        span.events.dropped_count += 1;
1698      }
1699      OtelSpanState::Done(_) => {}
1700    }
1701  }
1702
1703  #[fast]
1704  fn end(&self, end_time: f64) {
1705    let end_time = if end_time.is_nan() {
1706      SystemTime::now()
1707    } else {
1708      SystemTime::UNIX_EPOCH
1709        .checked_add(Duration::from_secs_f64(end_time / 1000.0))
1710        .unwrap()
1711    };
1712
1713    let mut state = self.0.borrow_mut();
1714    if let OtelSpanState::Recording(span) = &mut **state {
1715      let span_context = span.span_context.clone();
1716      if let OtelSpanState::Recording(mut span) = *std::mem::replace(
1717        &mut *state,
1718        Box::new(OtelSpanState::Done(span_context)),
1719      ) {
1720        span.end_time = end_time;
1721        let Some(OtelGlobals { span_processor, .. }) = OTEL_GLOBALS.get()
1722        else {
1723          return;
1724        };
1725        span_processor.on_end(span);
1726      }
1727    }
1728  }
1729}
1730
1731fn span_attributes(
1732  span: &mut SpanData,
1733  location: u32,
1734) -> Option<(&mut Vec<KeyValue>, &mut u32)> {
1735  match location {
1736    // SELF
1737    0 => Some((&mut span.attributes, &mut span.dropped_attributes_count)),
1738    // LAST_EVENT
1739    1 => span
1740      .events
1741      .events
1742      .last_mut()
1743      .map(|e| (&mut e.attributes, &mut e.dropped_attributes_count)),
1744    // LAST_LINK
1745    2 => span
1746      .links
1747      .links
1748      .last_mut()
1749      .map(|e| (&mut e.attributes, &mut e.dropped_attributes_count)),
1750    _ => None,
1751  }
1752}
1753
1754#[op2(fast)]
1755fn op_otel_span_attribute1<'s>(
1756  scope: &mut v8::PinScope<'s, '_>,
1757  span: v8::Local<'_, v8::Value>,
1758  #[smi] location: u32,
1759  key: v8::Local<'s, v8::Value>,
1760  value: v8::Local<'s, v8::Value>,
1761) {
1762  let Some(span) =
1763    deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1764  else {
1765    return;
1766  };
1767  let mut state = span.0.borrow_mut();
1768  if let OtelSpanState::Recording(span) = &mut **state {
1769    let Some((attributes, dropped_attributes_count)) =
1770      span_attributes(span, location)
1771    else {
1772      return;
1773    };
1774    attr!(scope, attributes => *dropped_attributes_count, key, value);
1775  }
1776}
1777
1778#[op2(fast)]
1779fn op_otel_span_attribute2<'s>(
1780  scope: &mut v8::PinScope<'s, '_>,
1781  span: v8::Local<'_, v8::Value>,
1782  #[smi] location: u32,
1783  key1: v8::Local<'s, v8::Value>,
1784  value1: v8::Local<'s, v8::Value>,
1785  key2: v8::Local<'s, v8::Value>,
1786  value2: v8::Local<'s, v8::Value>,
1787) {
1788  let Some(span) =
1789    deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1790  else {
1791    return;
1792  };
1793  let mut state = span.0.borrow_mut();
1794  if let OtelSpanState::Recording(span) = &mut **state {
1795    let Some((attributes, dropped_attributes_count)) =
1796      span_attributes(span, location)
1797    else {
1798      return;
1799    };
1800    attr!(scope, attributes => *dropped_attributes_count, key1, value1);
1801    attr!(scope, attributes => *dropped_attributes_count, key2, value2);
1802  }
1803}
1804
1805#[allow(clippy::too_many_arguments)]
1806#[op2(fast)]
1807fn op_otel_span_attribute3<'s>(
1808  scope: &mut v8::PinScope<'s, '_>,
1809  span: v8::Local<'_, v8::Value>,
1810  #[smi] location: u32,
1811  key1: v8::Local<'s, v8::Value>,
1812  value1: v8::Local<'s, v8::Value>,
1813  key2: v8::Local<'s, v8::Value>,
1814  value2: v8::Local<'s, v8::Value>,
1815  key3: v8::Local<'s, v8::Value>,
1816  value3: v8::Local<'s, v8::Value>,
1817) {
1818  let Some(span) =
1819    deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1820  else {
1821    return;
1822  };
1823  let mut state = span.0.borrow_mut();
1824  if let OtelSpanState::Recording(span) = &mut **state {
1825    let Some((attributes, dropped_attributes_count)) =
1826      span_attributes(span, location)
1827    else {
1828      return;
1829    };
1830    attr!(scope, attributes => *dropped_attributes_count, key1, value1);
1831    attr!(scope, attributes => *dropped_attributes_count, key2, value2);
1832    attr!(scope, attributes => *dropped_attributes_count, key3, value3);
1833  }
1834}
1835
1836#[op2(fast)]
1837fn op_otel_span_update_name<'s>(
1838  scope: &mut v8::PinScope<'s, '_>,
1839  span: v8::Local<'s, v8::Value>,
1840  name: v8::Local<'s, v8::Value>,
1841) {
1842  let Ok(name) = name.try_cast() else {
1843    return;
1844  };
1845  let name = owned_string(scope, name);
1846  let Some(span) =
1847    deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1848  else {
1849    return;
1850  };
1851  let mut state = span.0.borrow_mut();
1852  if let OtelSpanState::Recording(span) = &mut **state {
1853    span.name = Cow::Owned(name)
1854  }
1855}
1856
1857#[op2(fast)]
1858fn op_otel_span_add_link<'s>(
1859  scope: &mut v8::PinScope<'s, '_>,
1860  span: v8::Local<'s, v8::Value>,
1861  trace_id: v8::Local<'s, v8::Value>,
1862  span_id: v8::Local<'s, v8::Value>,
1863  #[smi] trace_flags: u8,
1864  is_remote: bool,
1865  #[smi] dropped_attributes_count: u32,
1866) -> bool {
1867  let trace_id = parse_trace_id(scope, trace_id);
1868  if trace_id == TraceId::INVALID {
1869    return false;
1870  };
1871  let span_id = parse_span_id(scope, span_id);
1872  if span_id == SpanId::INVALID {
1873    return false;
1874  };
1875  let span_context = SpanContext::new(
1876    trace_id,
1877    span_id,
1878    TraceFlags::new(trace_flags),
1879    is_remote,
1880    TraceState::NONE,
1881  );
1882
1883  let Some(span) =
1884    deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1885  else {
1886    return true;
1887  };
1888  let mut state = span.0.borrow_mut();
1889  if let OtelSpanState::Recording(span) = &mut **state {
1890    span.links.links.push(Link::new(
1891      span_context,
1892      vec![],
1893      dropped_attributes_count,
1894    ));
1895  }
1896  true
1897}
1898
1899struct OtelMeter(opentelemetry::metrics::Meter);
1900
1901// SAFETY: we're sure this can be GCed
1902unsafe impl deno_core::GarbageCollected for OtelMeter {
1903  fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
1904
1905  fn get_name(&self) -> &'static std::ffi::CStr {
1906    c"OtelMeter"
1907  }
1908}
1909
1910#[op2]
1911impl OtelMeter {
1912  #[constructor]
1913  #[cppgc]
1914  fn new(
1915    #[string] name: String,
1916    #[string] version: Option<String>,
1917    #[string] schema_url: Option<String>,
1918  ) -> OtelMeter {
1919    let mut builder = opentelemetry::InstrumentationScope::builder(name);
1920    if let Some(version) = version {
1921      builder = builder.with_version(version);
1922    }
1923    if let Some(schema_url) = schema_url {
1924      builder = builder.with_schema_url(schema_url);
1925    }
1926    let scope = builder.build();
1927    let meter = OTEL_GLOBALS
1928      .get()
1929      .unwrap()
1930      .meter_provider
1931      .meter_with_scope(scope);
1932    OtelMeter(meter)
1933  }
1934
1935  #[cppgc]
1936  fn create_counter<'s>(
1937    &self,
1938    scope: &mut v8::PinScope<'s, '_>,
1939    name: v8::Local<'s, v8::Value>,
1940    description: v8::Local<'s, v8::Value>,
1941    unit: v8::Local<'s, v8::Value>,
1942  ) -> Result<Instrument, JsErrorBox> {
1943    create_instrument(
1944      |name| self.0.f64_counter(name),
1945      |i| Instrument::Counter(i.build()),
1946      scope,
1947      name,
1948      description,
1949      unit,
1950    )
1951    .map_err(|e| JsErrorBox::generic(e.to_string()))
1952  }
1953
1954  #[cppgc]
1955  fn create_up_down_counter<'s>(
1956    &self,
1957    scope: &mut v8::PinScope<'s, '_>,
1958    name: v8::Local<'s, v8::Value>,
1959    description: v8::Local<'s, v8::Value>,
1960    unit: v8::Local<'s, v8::Value>,
1961  ) -> Result<Instrument, JsErrorBox> {
1962    create_instrument(
1963      |name| self.0.f64_up_down_counter(name),
1964      |i| Instrument::UpDownCounter(i.build()),
1965      scope,
1966      name,
1967      description,
1968      unit,
1969    )
1970    .map_err(|e| JsErrorBox::generic(e.to_string()))
1971  }
1972
1973  #[cppgc]
1974  fn create_gauge<'s>(
1975    &self,
1976    scope: &mut v8::PinScope<'s, '_>,
1977    name: v8::Local<'s, v8::Value>,
1978    description: v8::Local<'s, v8::Value>,
1979    unit: v8::Local<'s, v8::Value>,
1980  ) -> Result<Instrument, JsErrorBox> {
1981    create_instrument(
1982      |name| self.0.f64_gauge(name),
1983      |i| Instrument::Gauge(i.build()),
1984      scope,
1985      name,
1986      description,
1987      unit,
1988    )
1989    .map_err(|e| JsErrorBox::generic(e.to_string()))
1990  }
1991
1992  #[cppgc]
1993  fn create_histogram<'s>(
1994    &self,
1995    scope: &mut v8::PinScope<'s, '_>,
1996    name: v8::Local<'s, v8::Value>,
1997    description: v8::Local<'s, v8::Value>,
1998    unit: v8::Local<'s, v8::Value>,
1999    #[serde] boundaries: Option<Vec<f64>>,
2000  ) -> Result<Instrument, JsErrorBox> {
2001    let name = owned_string(
2002      scope,
2003      name
2004        .try_cast()
2005        .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
2006    );
2007    let mut builder = self.0.f64_histogram(name);
2008    if !description.is_null_or_undefined() {
2009      let description = owned_string(
2010        scope,
2011        description
2012          .try_cast()
2013          .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
2014      );
2015      builder = builder.with_description(description);
2016    };
2017    if !unit.is_null_or_undefined() {
2018      let unit = owned_string(
2019        scope,
2020        unit
2021          .try_cast()
2022          .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
2023      );
2024      builder = builder.with_unit(unit);
2025    };
2026    if let Some(boundaries) = boundaries {
2027      builder = builder.with_boundaries(boundaries);
2028    }
2029
2030    Ok(Instrument::Histogram(builder.build()))
2031  }
2032
2033  #[cppgc]
2034  fn create_observable_counter<'s>(
2035    &self,
2036    scope: &mut v8::PinScope<'s, '_>,
2037    name: v8::Local<'s, v8::Value>,
2038    description: v8::Local<'s, v8::Value>,
2039    unit: v8::Local<'s, v8::Value>,
2040  ) -> Result<Instrument, JsErrorBox> {
2041    create_async_instrument(
2042      |name| self.0.f64_observable_counter(name),
2043      |i| {
2044        i.build();
2045      },
2046      scope,
2047      name,
2048      description,
2049      unit,
2050    )
2051    .map_err(|e| JsErrorBox::generic(e.to_string()))
2052  }
2053
2054  #[cppgc]
2055  fn create_observable_up_down_counter<'s>(
2056    &self,
2057    scope: &mut v8::PinScope<'s, '_>,
2058    name: v8::Local<'s, v8::Value>,
2059    description: v8::Local<'s, v8::Value>,
2060    unit: v8::Local<'s, v8::Value>,
2061  ) -> Result<Instrument, JsErrorBox> {
2062    create_async_instrument(
2063      |name| self.0.f64_observable_up_down_counter(name),
2064      |i| {
2065        i.build();
2066      },
2067      scope,
2068      name,
2069      description,
2070      unit,
2071    )
2072    .map_err(|e| JsErrorBox::generic(e.to_string()))
2073  }
2074
2075  #[cppgc]
2076  fn create_observable_gauge<'s>(
2077    &self,
2078    scope: &mut v8::PinScope<'s, '_>,
2079    name: v8::Local<'s, v8::Value>,
2080    description: v8::Local<'s, v8::Value>,
2081    unit: v8::Local<'s, v8::Value>,
2082  ) -> Result<Instrument, JsErrorBox> {
2083    create_async_instrument(
2084      |name| self.0.f64_observable_gauge(name),
2085      |i| {
2086        i.build();
2087      },
2088      scope,
2089      name,
2090      description,
2091      unit,
2092    )
2093    .map_err(|e| JsErrorBox::generic(e.to_string()))
2094  }
2095}
2096
2097enum Instrument {
2098  Counter(opentelemetry::metrics::Counter<f64>),
2099  UpDownCounter(UpDownCounter<f64>),
2100  Gauge(opentelemetry::metrics::Gauge<f64>),
2101  Histogram(Histogram<f64>),
2102  Observable(Arc<Mutex<HashMap<Vec<KeyValue>, f64>>>),
2103}
2104
2105// SAFETY: we're sure this can be GCed
2106unsafe impl GarbageCollected for Instrument {
2107  fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
2108
2109  fn get_name(&self) -> &'static std::ffi::CStr {
2110    c"Instrument"
2111  }
2112}
2113
2114fn create_instrument<'a, 'b, T>(
2115  cb: impl FnOnce(String) -> InstrumentBuilder<'b, T>,
2116  cb2: impl FnOnce(InstrumentBuilder<'b, T>) -> Instrument,
2117  scope: &mut v8::PinScope<'a, '_>,
2118  name: v8::Local<'a, v8::Value>,
2119  description: v8::Local<'a, v8::Value>,
2120  unit: v8::Local<'a, v8::Value>,
2121) -> Result<Instrument, v8::DataError> {
2122  let name = owned_string(scope, name.try_cast()?);
2123  let mut builder = cb(name);
2124  if !description.is_null_or_undefined() {
2125    let description = owned_string(scope, description.try_cast()?);
2126    builder = builder.with_description(description);
2127  };
2128  if !unit.is_null_or_undefined() {
2129    let unit = owned_string(scope, unit.try_cast()?);
2130    builder = builder.with_unit(unit);
2131  };
2132
2133  Ok(cb2(builder))
2134}
2135
2136fn create_async_instrument<'a, 'b, T>(
2137  cb: impl FnOnce(String) -> AsyncInstrumentBuilder<'b, T, f64>,
2138  cb2: impl FnOnce(AsyncInstrumentBuilder<'b, T, f64>),
2139  scope: &mut v8::PinScope<'a, '_>,
2140  name: v8::Local<'a, v8::Value>,
2141  description: v8::Local<'a, v8::Value>,
2142  unit: v8::Local<'a, v8::Value>,
2143) -> Result<Instrument, DataError> {
2144  let name = owned_string(scope, name.try_cast()?);
2145  let mut builder = cb(name);
2146  if !description.is_null_or_undefined() {
2147    let description = owned_string(scope, description.try_cast()?);
2148    builder = builder.with_description(description);
2149  };
2150  if !unit.is_null_or_undefined() {
2151    let unit = owned_string(scope, unit.try_cast()?);
2152    builder = builder.with_unit(unit);
2153  };
2154
2155  let data_share = Arc::new(Mutex::new(HashMap::new()));
2156  let data_share_: Arc<Mutex<HashMap<Vec<KeyValue>, f64>>> = data_share.clone();
2157  builder = builder.with_callback(move |i| {
2158    let data = {
2159      let mut data = data_share_.lock().unwrap();
2160      std::mem::take(&mut *data)
2161    };
2162    for (attributes, value) in data {
2163      i.observe(value, &attributes);
2164    }
2165  });
2166  cb2(builder);
2167
2168  Ok(Instrument::Observable(data_share))
2169}
2170
2171struct MetricAttributes {
2172  attributes: Vec<KeyValue>,
2173}
2174
2175#[op2(fast)]
2176fn op_otel_metric_record0(
2177  state: &mut OpState,
2178  #[cppgc] instrument: &Instrument,
2179  value: f64,
2180) {
2181  let values = state.try_take::<MetricAttributes>();
2182  let attributes = match &values {
2183    Some(values) => &*values.attributes,
2184    None => &[],
2185  };
2186  match instrument {
2187    Instrument::Counter(counter) => counter.add(value, attributes),
2188    Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2189    Instrument::Gauge(gauge) => gauge.record(value, attributes),
2190    Instrument::Histogram(histogram) => histogram.record(value, attributes),
2191    _ => {}
2192  }
2193}
2194
2195#[op2(fast)]
2196fn op_otel_metric_record1(
2197  state: &mut OpState,
2198  scope: &mut v8::PinScope<'_, '_>,
2199  instrument: v8::Local<'_, v8::Value>,
2200  value: f64,
2201  key1: v8::Local<'_, v8::Value>,
2202  value1: v8::Local<'_, v8::Value>,
2203) {
2204  let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2205    &mut *scope,
2206    instrument,
2207  ) else {
2208    return;
2209  };
2210  let mut values = state.try_take::<MetricAttributes>();
2211  let attr1 = attr_raw!(scope, key1, value1);
2212  let attributes = match &mut values {
2213    Some(values) => {
2214      if let Some(kv) = attr1 {
2215        values.attributes.reserve_exact(1);
2216        values.attributes.push(kv);
2217      }
2218      &*values.attributes
2219    }
2220    None => match attr1 {
2221      Some(kv1) => &[kv1] as &[KeyValue],
2222      None => &[],
2223    },
2224  };
2225  match &*instrument {
2226    Instrument::Counter(counter) => counter.add(value, attributes),
2227    Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2228    Instrument::Gauge(gauge) => gauge.record(value, attributes),
2229    Instrument::Histogram(histogram) => histogram.record(value, attributes),
2230    _ => {}
2231  }
2232}
2233
2234#[allow(clippy::too_many_arguments)]
2235#[op2(fast)]
2236fn op_otel_metric_record2(
2237  state: &mut OpState,
2238  scope: &mut v8::PinScope<'_, '_>,
2239  instrument: v8::Local<'_, v8::Value>,
2240  value: f64,
2241  key1: v8::Local<'_, v8::Value>,
2242  value1: v8::Local<'_, v8::Value>,
2243  key2: v8::Local<'_, v8::Value>,
2244  value2: v8::Local<'_, v8::Value>,
2245) {
2246  let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2247    &mut *scope,
2248    instrument,
2249  ) else {
2250    return;
2251  };
2252  let mut values = state.try_take::<MetricAttributes>();
2253  let attr1 = attr_raw!(scope, key1, value1);
2254  let attr2 = attr_raw!(scope, key2, value2);
2255  let attributes = match &mut values {
2256    Some(values) => {
2257      values.attributes.reserve_exact(2);
2258      if let Some(kv1) = attr1 {
2259        values.attributes.push(kv1);
2260      }
2261      if let Some(kv2) = attr2 {
2262        values.attributes.push(kv2);
2263      }
2264      &*values.attributes
2265    }
2266    None => match (attr1, attr2) {
2267      (Some(kv1), Some(kv2)) => &[kv1, kv2] as &[KeyValue],
2268      (Some(kv1), None) => &[kv1],
2269      (None, Some(kv2)) => &[kv2],
2270      (None, None) => &[],
2271    },
2272  };
2273  match &*instrument {
2274    Instrument::Counter(counter) => counter.add(value, attributes),
2275    Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2276    Instrument::Gauge(gauge) => gauge.record(value, attributes),
2277    Instrument::Histogram(histogram) => histogram.record(value, attributes),
2278    _ => {}
2279  }
2280}
2281
2282#[allow(clippy::too_many_arguments)]
2283#[op2(fast)]
2284fn op_otel_metric_record3(
2285  state: &mut OpState,
2286  scope: &mut v8::PinScope<'_, '_>,
2287  instrument: v8::Local<'_, v8::Value>,
2288  value: f64,
2289  key1: v8::Local<'_, v8::Value>,
2290  value1: v8::Local<'_, v8::Value>,
2291  key2: v8::Local<'_, v8::Value>,
2292  value2: v8::Local<'_, v8::Value>,
2293  key3: v8::Local<'_, v8::Value>,
2294  value3: v8::Local<'_, v8::Value>,
2295) {
2296  let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2297    &mut *scope,
2298    instrument,
2299  ) else {
2300    return;
2301  };
2302  let mut values = state.try_take::<MetricAttributes>();
2303  let attr1 = attr_raw!(scope, key1, value1);
2304  let attr2 = attr_raw!(scope, key2, value2);
2305  let attr3 = attr_raw!(scope, key3, value3);
2306  let attributes = match &mut values {
2307    Some(values) => {
2308      values.attributes.reserve_exact(3);
2309      if let Some(kv1) = attr1 {
2310        values.attributes.push(kv1);
2311      }
2312      if let Some(kv2) = attr2 {
2313        values.attributes.push(kv2);
2314      }
2315      if let Some(kv3) = attr3 {
2316        values.attributes.push(kv3);
2317      }
2318      &*values.attributes
2319    }
2320    None => match (attr1, attr2, attr3) {
2321      (Some(kv1), Some(kv2), Some(kv3)) => &[kv1, kv2, kv3] as &[KeyValue],
2322      (Some(kv1), Some(kv2), None) => &[kv1, kv2],
2323      (Some(kv1), None, Some(kv3)) => &[kv1, kv3],
2324      (None, Some(kv2), Some(kv3)) => &[kv2, kv3],
2325      (Some(kv1), None, None) => &[kv1],
2326      (None, Some(kv2), None) => &[kv2],
2327      (None, None, Some(kv3)) => &[kv3],
2328      (None, None, None) => &[],
2329    },
2330  };
2331  match &*instrument {
2332    Instrument::Counter(counter) => counter.add(value, attributes),
2333    Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2334    Instrument::Gauge(gauge) => gauge.record(value, attributes),
2335    Instrument::Histogram(histogram) => histogram.record(value, attributes),
2336    _ => {}
2337  }
2338}
2339
2340#[op2(fast)]
2341fn op_otel_metric_observable_record0(
2342  state: &mut OpState,
2343  #[cppgc] instrument: &Instrument,
2344  value: f64,
2345) {
2346  let values = state.try_take::<MetricAttributes>();
2347  let attributes = values.map(|attr| attr.attributes).unwrap_or_default();
2348  if let Instrument::Observable(data_share) = instrument {
2349    let mut data = data_share.lock().unwrap();
2350    data.insert(attributes, value);
2351  }
2352}
2353
2354#[op2(fast)]
2355fn op_otel_metric_observable_record1(
2356  state: &mut OpState,
2357  scope: &mut v8::PinScope<'_, '_>,
2358  instrument: v8::Local<'_, v8::Value>,
2359  value: f64,
2360  key1: v8::Local<'_, v8::Value>,
2361  value1: v8::Local<'_, v8::Value>,
2362) {
2363  let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2364    &mut *scope,
2365    instrument,
2366  ) else {
2367    return;
2368  };
2369  let values = state.try_take::<MetricAttributes>();
2370  let attr1 = attr_raw!(scope, key1, value1);
2371  let mut attributes = values
2372    .map(|mut attr| {
2373      attr.attributes.reserve_exact(1);
2374      attr.attributes
2375    })
2376    .unwrap_or_else(|| Vec::with_capacity(1));
2377  if let Some(kv1) = attr1 {
2378    attributes.push(kv1);
2379  }
2380  if let Instrument::Observable(data_share) = &*instrument {
2381    let mut data = data_share.lock().unwrap();
2382    data.insert(attributes, value);
2383  }
2384}
2385
2386#[allow(clippy::too_many_arguments)]
2387#[op2(fast)]
2388fn op_otel_metric_observable_record2(
2389  state: &mut OpState,
2390  scope: &mut v8::PinScope<'_, '_>,
2391  instrument: v8::Local<'_, v8::Value>,
2392  value: f64,
2393  key1: v8::Local<'_, v8::Value>,
2394  value1: v8::Local<'_, v8::Value>,
2395  key2: v8::Local<'_, v8::Value>,
2396  value2: v8::Local<'_, v8::Value>,
2397) {
2398  let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2399    &mut *scope,
2400    instrument,
2401  ) else {
2402    return;
2403  };
2404  let values = state.try_take::<MetricAttributes>();
2405  let mut attributes = values
2406    .map(|mut attr| {
2407      attr.attributes.reserve_exact(2);
2408      attr.attributes
2409    })
2410    .unwrap_or_else(|| Vec::with_capacity(2));
2411  let attr1 = attr_raw!(scope, key1, value1);
2412  let attr2 = attr_raw!(scope, key2, value2);
2413  if let Some(kv1) = attr1 {
2414    attributes.push(kv1);
2415  }
2416  if let Some(kv2) = attr2 {
2417    attributes.push(kv2);
2418  }
2419  if let Instrument::Observable(data_share) = &*instrument {
2420    let mut data = data_share.lock().unwrap();
2421    data.insert(attributes, value);
2422  }
2423}
2424
2425#[allow(clippy::too_many_arguments)]
2426#[op2(fast)]
2427fn op_otel_metric_observable_record3(
2428  state: &mut OpState,
2429  scope: &mut v8::PinScope<'_, '_>,
2430  instrument: v8::Local<'_, v8::Value>,
2431  value: f64,
2432  key1: v8::Local<'_, v8::Value>,
2433  value1: v8::Local<'_, v8::Value>,
2434  key2: v8::Local<'_, v8::Value>,
2435  value2: v8::Local<'_, v8::Value>,
2436  key3: v8::Local<'_, v8::Value>,
2437  value3: v8::Local<'_, v8::Value>,
2438) {
2439  let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2440    &mut *scope,
2441    instrument,
2442  ) else {
2443    return;
2444  };
2445  let values = state.try_take::<MetricAttributes>();
2446  let mut attributes = values
2447    .map(|mut attr| {
2448      attr.attributes.reserve_exact(3);
2449      attr.attributes
2450    })
2451    .unwrap_or_else(|| Vec::with_capacity(3));
2452  let attr1 = attr_raw!(scope, key1, value1);
2453  let attr2 = attr_raw!(scope, key2, value2);
2454  let attr3 = attr_raw!(scope, key3, value3);
2455  if let Some(kv1) = attr1 {
2456    attributes.push(kv1);
2457  }
2458  if let Some(kv2) = attr2 {
2459    attributes.push(kv2);
2460  }
2461  if let Some(kv3) = attr3 {
2462    attributes.push(kv3);
2463  }
2464  if let Instrument::Observable(data_share) = &*instrument {
2465    let mut data = data_share.lock().unwrap();
2466    data.insert(attributes, value);
2467  }
2468}
2469
2470#[allow(clippy::too_many_arguments)]
2471#[op2(fast)]
2472fn op_otel_metric_attribute3<'s>(
2473  scope: &mut v8::PinScope<'s, '_>,
2474  state: &mut OpState,
2475  #[smi] capacity: u32,
2476  key1: v8::Local<'s, v8::Value>,
2477  value1: v8::Local<'s, v8::Value>,
2478  key2: v8::Local<'s, v8::Value>,
2479  value2: v8::Local<'s, v8::Value>,
2480  key3: v8::Local<'s, v8::Value>,
2481  value3: v8::Local<'s, v8::Value>,
2482) {
2483  let mut values = state.try_borrow_mut::<MetricAttributes>();
2484  let attr1 = attr_raw!(scope, key1, value1);
2485  let attr2 = attr_raw!(scope, key2, value2);
2486  let attr3 = attr_raw!(scope, key3, value3);
2487  if let Some(values) = &mut values {
2488    values.attributes.reserve_exact(
2489      (capacity as usize).saturating_sub(values.attributes.capacity()),
2490    );
2491    if let Some(kv1) = attr1 {
2492      values.attributes.push(kv1);
2493    }
2494    if let Some(kv2) = attr2 {
2495      values.attributes.push(kv2);
2496    }
2497    if let Some(kv3) = attr3 {
2498      values.attributes.push(kv3);
2499    }
2500  } else {
2501    let mut attributes = Vec::with_capacity(capacity as usize);
2502    if let Some(kv1) = attr1 {
2503      attributes.push(kv1);
2504    }
2505    if let Some(kv2) = attr2 {
2506      attributes.push(kv2);
2507    }
2508    if let Some(kv3) = attr3 {
2509      attributes.push(kv3);
2510    }
2511    state.put(MetricAttributes { attributes });
2512  }
2513}
2514
2515struct ObservationDone(oneshot::Sender<()>);
2516
2517#[op2]
2518async fn op_otel_metric_wait_to_observe(state: Rc<RefCell<OpState>>) -> bool {
2519  let (tx, rx) = oneshot::channel();
2520  {
2521    OTEL_PRE_COLLECT_CALLBACKS
2522      .lock()
2523      .expect("mutex poisoned")
2524      .push(tx);
2525  }
2526  match rx.await {
2527    Ok(done) => {
2528      state.borrow_mut().put(ObservationDone(done));
2529      true
2530    }
2531    _ => false,
2532  }
2533}
2534
2535#[op2(fast)]
2536fn op_otel_metric_observation_done(state: &mut OpState) {
2537  if let Some(ObservationDone(done)) = state.try_take::<ObservationDone>() {
2538    let _ = done.send(());
2539  }
2540}
2541
2542struct GcMetricDataInner {
2543  start: Instant,
2544  duration: Histogram<f64>,
2545}
2546
2547struct GcMetricData(RefCell<GcMetricDataInner>);
2548
2549impl GcMetricData {
2550  extern "C" fn prologue_callback(
2551    isolate: v8::UnsafeRawIsolatePtr,
2552    _gc_type: v8::GCType,
2553    _flags: v8::GCCallbackFlags,
2554    _data: *mut c_void,
2555  ) {
2556    // SAFETY: Isolate is valid during callback
2557    let isolate =
2558      unsafe { v8::Isolate::from_raw_isolate_ptr_unchecked(isolate) };
2559    let this = isolate.get_slot::<Self>().unwrap();
2560    this.0.borrow_mut().start = Instant::now();
2561  }
2562
2563  extern "C" fn epilogue_callback(
2564    isolate: v8::UnsafeRawIsolatePtr,
2565    gc_type: v8::GCType,
2566    _flags: v8::GCCallbackFlags,
2567    _data: *mut c_void,
2568  ) {
2569    // SAFETY: Isolate is valid during callback
2570    let isolate =
2571      unsafe { v8::Isolate::from_raw_isolate_ptr_unchecked(isolate) };
2572    let this = isolate.get_slot::<Self>().unwrap();
2573    let this = this.0.borrow_mut();
2574
2575    let elapsed = this.start.elapsed();
2576
2577    // https://opentelemetry.io/docs/specs/semconv/runtime/v8js-metrics/#metric-v8jsgcduration
2578    let gc_type = KeyValue::new(
2579      "v8js.gc.type",
2580      match gc_type {
2581        v8::GCType::kGCTypeScavenge => "minor",
2582        v8::GCType::kGCTypeMinorMarkSweep => "minor",
2583        v8::GCType::kGCTypeMarkSweepCompact => "major",
2584        v8::GCType::kGCTypeIncrementalMarking => "incremental",
2585        v8::GCType::kGCTypeProcessWeakCallbacks => "weakcb",
2586        _ => return,
2587      },
2588    );
2589
2590    this.duration.record(elapsed.as_secs_f64(), &[gc_type]);
2591  }
2592}
2593
2594#[derive(Clone)]
2595struct HeapMetricData {
2596  heap_limit: Gauge<u64>,
2597  heap_size: Gauge<u64>,
2598  available_size: Gauge<u64>,
2599  physical_size: Gauge<u64>,
2600}
2601
2602#[op2(fast)]
2603fn op_otel_enable_isolate_metrics(scope: &mut v8::PinScope<'_, '_>) {
2604  if scope.get_slot::<GcMetricData>().is_some() {
2605    return;
2606  }
2607
2608  let meter = OTEL_GLOBALS.get().unwrap().meter_provider.meter("v8js");
2609
2610  // https://opentelemetry.io/docs/specs/semconv/runtime/v8js-metrics/#metric-v8jsgcduration
2611  let duration = meter
2612    .f64_histogram("v8js.gc.duration")
2613    .with_unit("S")
2614    .with_description("Garbage collection duration")
2615    .with_boundaries(vec![0.01, 0.1, 1.0, 10.0])
2616    .build();
2617
2618  scope.set_slot(GcMetricData(RefCell::new(GcMetricDataInner {
2619    start: Instant::now(),
2620    duration,
2621  })));
2622
2623  scope.add_gc_prologue_callback(
2624    GcMetricData::prologue_callback,
2625    std::ptr::null_mut(),
2626    v8::GCType::kGCTypeAll,
2627  );
2628
2629  scope.add_gc_epilogue_callback(
2630    GcMetricData::epilogue_callback,
2631    std::ptr::null_mut(),
2632    v8::GCType::kGCTypeAll,
2633  );
2634
2635  let heap_limit = meter
2636    .u64_gauge("v8js.memory.heap.limit")
2637    .with_unit("By")
2638    .with_description("Total heap memory size pre-allocated.")
2639    .build();
2640  let heap_size = meter
2641    .u64_gauge("v8js.memory.heap.size")
2642    .with_unit("By")
2643    .with_description("Heap Memory size allocated.")
2644    .build();
2645  let available_size = meter
2646    .u64_gauge("v8js.memory.space.available_size")
2647    .with_unit("By")
2648    .with_description("Heap space available size.")
2649    .build();
2650  let physical_size = meter
2651    .u64_gauge("v8js.memory.space.physical_size")
2652    .with_unit("By")
2653    .with_description("Committed size of a heap space.")
2654    .build();
2655
2656  scope.set_slot(HeapMetricData {
2657    heap_limit,
2658    heap_size,
2659    available_size,
2660    physical_size,
2661  });
2662}
2663
2664#[op2(fast)]
2665fn op_otel_collect_isolate_metrics(scope: &mut v8::PinScope<'_, '_>) {
2666  let data = scope.get_slot::<HeapMetricData>().unwrap().clone();
2667  for i in 0..scope.get_number_of_data_slots() {
2668    let Some(space) = scope.get_heap_space_statistics(i as _) else {
2669      continue;
2670    };
2671    // SAFETY: api has wrong lifetime, 'static is correct:
2672    // https://github.com/denoland/rusty_v8/pull/1744
2673    let space_name: &'static std::ffi::CStr =
2674      unsafe { std::ffi::CStr::from_ptr(space.space_name().as_ptr()) };
2675    let Ok(space_name) = space_name.to_str() else {
2676      continue;
2677    };
2678    let attributes = [KeyValue::new("v8js.heap.space.name", space_name)];
2679    data.heap_limit.record(space.space_size() as _, &attributes);
2680    data
2681      .heap_size
2682      .record(space.space_used_size() as _, &attributes);
2683    data
2684      .available_size
2685      .record(space.space_available_size() as _, &attributes);
2686    data
2687      .physical_size
2688      .record(space.physical_space_size() as _, &attributes);
2689  }
2690}