1#![allow(clippy::too_many_arguments)]
4#![expect(unexpected_cfgs)]
5
6use std::borrow::Cow;
7use std::cell::RefCell;
8use std::collections::HashMap;
9use std::env;
10use std::ffi::c_void;
11use std::fmt::Debug;
12use std::pin::Pin;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicU64;
17use std::task::Context;
18use std::task::Poll;
19use std::thread;
20use std::time::Duration;
21use std::time::Instant;
22use std::time::SystemTime;
23
24use deno_core::GarbageCollected;
25use deno_core::OpState;
26use deno_core::futures::FutureExt;
27use deno_core::futures::Stream;
28use deno_core::futures::StreamExt;
29use deno_core::futures::channel::mpsc;
30use deno_core::futures::channel::mpsc::UnboundedSender;
31use deno_core::futures::future::BoxFuture;
32use deno_core::futures::stream;
33use deno_core::op2;
34use deno_core::v8;
35use deno_core::v8::DataError;
36use deno_error::JsError;
37use deno_error::JsErrorBox;
38use once_cell::sync::Lazy;
39use once_cell::sync::OnceCell;
40use opentelemetry::InstrumentationScope;
41pub use opentelemetry::Key;
42pub use opentelemetry::KeyValue;
43pub use opentelemetry::StringValue;
44pub use opentelemetry::Value;
45use opentelemetry::logs::AnyValue;
46use opentelemetry::logs::LogRecord as LogRecordTrait;
47use opentelemetry::logs::Severity;
48use opentelemetry::metrics::AsyncInstrumentBuilder;
49pub use opentelemetry::metrics::Gauge;
50pub use opentelemetry::metrics::Histogram;
51use opentelemetry::metrics::InstrumentBuilder;
52pub use opentelemetry::metrics::MeterProvider;
53pub use opentelemetry::metrics::UpDownCounter;
54use opentelemetry::otel_debug;
55use opentelemetry::otel_error;
56use opentelemetry::trace::Event;
57use opentelemetry::trace::Link;
58use opentelemetry::trace::SpanContext;
59use opentelemetry::trace::SpanId;
60use opentelemetry::trace::SpanKind;
61use opentelemetry::trace::Status as SpanStatus;
62use opentelemetry::trace::TraceFlags;
63use opentelemetry::trace::TraceId;
64use opentelemetry::trace::TraceState;
65use opentelemetry_otlp::HttpExporterBuilder;
66use opentelemetry_otlp::Protocol;
67use opentelemetry_otlp::WithExportConfig;
68use opentelemetry_otlp::WithHttpConfig;
69use opentelemetry_sdk::Resource;
70use opentelemetry_sdk::export::trace::SpanData;
71use opentelemetry_sdk::logs::BatchLogProcessor;
72use opentelemetry_sdk::logs::LogProcessor;
73use opentelemetry_sdk::logs::LogRecord;
74use opentelemetry_sdk::metrics::ManualReader;
75use opentelemetry_sdk::metrics::MetricResult;
76use opentelemetry_sdk::metrics::SdkMeterProvider;
77use opentelemetry_sdk::metrics::Temporality;
78use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
79use opentelemetry_sdk::metrics::reader::MetricReader;
80use opentelemetry_sdk::trace::BatchSpanProcessor;
81use opentelemetry_sdk::trace::IdGenerator;
82use opentelemetry_sdk::trace::RandomIdGenerator;
83use opentelemetry_sdk::trace::SpanEvents;
84use opentelemetry_sdk::trace::SpanLinks;
85use opentelemetry_sdk::trace::SpanProcessor as _;
86use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_NAME;
87use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_VERSION;
88use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_LANGUAGE;
89use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_NAME;
90use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION;
91use serde::Deserialize;
92use serde::Serialize;
93use thiserror::Error;
94use tokio::sync::oneshot;
95use tokio::task::JoinSet;
96
// Declares the `deno_telemetry` extension: the ops it registers, the object
// types it exposes (`OtelTracer`, `OtelMeter`, `OtelSpan`), and the ESM
// modules (`telemetry.ts`, `util.ts`) that ship with it.
deno_core::extension!(
  deno_telemetry,
  ops = [
    op_otel_collect_isolate_metrics,
    op_otel_enable_isolate_metrics,
    op_otel_log,
    op_otel_log_foreign,
    op_otel_span_attribute1,
    op_otel_span_attribute2,
    op_otel_span_attribute3,
    op_otel_span_add_link,
    op_otel_span_update_name,
    op_otel_metric_attribute3,
    op_otel_metric_record0,
    op_otel_metric_record1,
    op_otel_metric_record2,
    op_otel_metric_record3,
    op_otel_metric_observable_record0,
    op_otel_metric_observable_record1,
    op_otel_metric_observable_record2,
    op_otel_metric_observable_record3,
    op_otel_metric_wait_to_observe,
    op_otel_metric_observation_done,
  ],
  objects = [OtelTracer, OtelMeter, OtelSpan],
  esm = ["telemetry.ts", "util.ts"],
);
124
/// Identity of the embedding runtime.
///
/// `init` records these values as the `PROCESS_RUNTIME_NAME` and
/// `PROCESS_RUNTIME_VERSION` resource attributes, and also uses the version
/// for the built-in instrumentation scope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtelRuntimeConfig {
  /// Name reported as the runtime name resource attribute.
  pub runtime_name: Cow<'static, str>,
  /// Version reported as the runtime version resource attribute.
  pub runtime_version: Cow<'static, str>,
}
130
/// User-facing OpenTelemetry configuration.
///
/// `init` is a no-op when tracing and metrics are both disabled and
/// `console` is `OtelConsoleConfig::Ignore`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct OtelConfig {
  /// Whether tracing is enabled.
  pub tracing_enabled: bool,
  /// Whether metrics are enabled.
  pub metrics_enabled: bool,
  /// How console output interacts with telemetry.
  pub console: OtelConsoleConfig,
  /// When set, `init` selects the deterministic ID generator seeded with
  /// this prefix instead of the random one.
  pub deterministic_prefix: Option<u8>,
  /// Set of configured propagators.
  pub propagators: std::collections::HashSet<OtelPropagators>,
}
139
140impl OtelConfig {
141 pub fn as_v8(&self) -> Box<[u8]> {
142 let mut data = vec![
143 self.tracing_enabled as u8,
144 self.metrics_enabled as u8,
145 self.console as u8,
146 ];
147
148 data.extend(self.propagators.iter().map(|propagator| *propagator as u8));
149
150 data.into_boxed_slice()
151 }
152}
153
/// Propagator kinds that can be listed in `OtelConfig::propagators`.
///
/// The explicit `u8` discriminants are what `OtelConfig::as_v8` writes into
/// its byte buffer, so they must stay stable.
#[derive(
  Default, Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash,
)]
#[repr(u8)]
pub enum OtelPropagators {
  TraceContext = 0,
  Baggage = 1,
  /// Default variant.
  #[default]
  None = 2,
}
164
/// Console handling mode.
///
/// The explicit `u8` discriminants are what `OtelConfig::as_v8` writes into
/// its byte buffer, so they must stay stable.
// NOTE(review): the exact semantics of each mode are implemented outside this
// chunk (presumably in the ESM side) — confirm before documenting further.
#[derive(
  Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize,
)]
#[repr(u8)]
pub enum OtelConsoleConfig {
  /// Default variant; together with disabled tracing and metrics it makes
  /// `init` a no-op.
  #[default]
  Ignore = 0,
  Capture = 1,
  Replace = 2,
}
175
/// Sender used to hand futures to the shared OpenTelemetry runtime.
///
/// The runtime (and the thread that drives it) is created lazily, on first
/// access, by `otel_create_shared_runtime`.
static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy<
  UnboundedSender<BoxFuture<'static, ()>>,
> = Lazy::new(otel_create_shared_runtime);

/// Callbacks drained by the periodic metric reader before a timed export.
///
/// Each registered sender is handed a fresh completion sender; the reader
/// waits for every completion channel to resolve (or be dropped) before it
/// collects metrics.
static OTEL_PRE_COLLECT_CALLBACKS: Lazy<
  Mutex<Vec<oneshot::Sender<oneshot::Sender<()>>>>,
> = Lazy::new(Default::default);
183
184fn otel_create_shared_runtime() -> UnboundedSender<BoxFuture<'static, ()>> {
185 let (spawn_task_tx, mut spawn_task_rx) =
186 mpsc::unbounded::<BoxFuture<'static, ()>>();
187
188 thread::spawn(move || {
189 let rt = tokio::runtime::Builder::new_current_thread()
190 .enable_io()
191 .enable_time()
192 .max_blocking_threads(if cfg!(windows) {
198 4 * std::thread::available_parallelism()
201 .map(|n| n.get())
202 .unwrap_or(8)
203 } else {
204 32
205 })
206 .build()
207 .unwrap();
208
209 rt.block_on(async move {
210 while let Some(task) = spawn_task_rx.next().await {
211 tokio::spawn(task);
212 }
213 });
214 });
215
216 spawn_task_tx
217}
218
/// Zero-sized handle to the shared OpenTelemetry runtime.
///
/// All work submitted through this handle is forwarded over
/// `OTEL_SHARED_RUNTIME_SPAWN_TASK_TX`.
#[derive(Clone, Copy)]
pub struct OtelSharedRuntime;
221
222impl hyper::rt::Executor<BoxFuture<'static, ()>> for OtelSharedRuntime {
223 fn execute(&self, fut: BoxFuture<'static, ()>) {
224 (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
225 .unbounded_send(fut)
226 .expect("failed to send task to shared OpenTelemetry runtime");
227 }
228}
229
230impl opentelemetry_sdk::runtime::Runtime for OtelSharedRuntime {
231 type Interval = Pin<Box<dyn Stream<Item = ()> + Send + 'static>>;
232 type Delay = Pin<Box<tokio::time::Sleep>>;
233
234 fn interval(&self, period: Duration) -> Self::Interval {
235 stream::repeat(())
236 .then(move |_| tokio::time::sleep(period))
237 .boxed()
238 }
239
240 fn spawn(&self, future: BoxFuture<'static, ()>) {
241 (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
242 .unbounded_send(future)
243 .expect("failed to send task to shared OpenTelemetry runtime");
244 }
245
246 fn delay(&self, duration: Duration) -> Self::Delay {
247 Box::pin(tokio::time::sleep(duration))
248 }
249}
250
251impl opentelemetry_sdk::runtime::RuntimeChannel for OtelSharedRuntime {
252 type Receiver<T: Debug + Send> = BatchMessageChannelReceiver<T>;
253 type Sender<T: Debug + Send> = BatchMessageChannelSender<T>;
254
255 fn batch_message_channel<T: Debug + Send>(
256 &self,
257 capacity: usize,
258 ) -> (Self::Sender<T>, Self::Receiver<T>) {
259 let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<T>(capacity);
260 (batch_tx.into(), batch_rx.into())
261 }
262}
263
/// Sending half of a batch message channel; a thin wrapper around a bounded
/// tokio mpsc sender.
#[derive(Debug)]
pub struct BatchMessageChannelSender<T: Send> {
  // Underlying bounded tokio sender.
  sender: tokio::sync::mpsc::Sender<T>,
}
268
269impl<T: Send> From<tokio::sync::mpsc::Sender<T>>
270 for BatchMessageChannelSender<T>
271{
272 fn from(sender: tokio::sync::mpsc::Sender<T>) -> Self {
273 Self { sender }
274 }
275}
276
277impl<T: Send> opentelemetry_sdk::runtime::TrySend
278 for BatchMessageChannelSender<T>
279{
280 type Message = T;
281
282 fn try_send(
283 &self,
284 item: Self::Message,
285 ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
286 self.sender.try_send(item).map_err(|err| match err {
287 tokio::sync::mpsc::error::TrySendError::Full(_) => {
288 opentelemetry_sdk::runtime::TrySendError::ChannelFull
289 }
290 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
291 opentelemetry_sdk::runtime::TrySendError::ChannelClosed
292 }
293 })
294 }
295}
296
/// Receiving half of a batch message channel; a thin wrapper around a bounded
/// tokio mpsc receiver that is exposed as a `Stream`.
pub struct BatchMessageChannelReceiver<T> {
  // Underlying bounded tokio receiver.
  receiver: tokio::sync::mpsc::Receiver<T>,
}
300
301impl<T> From<tokio::sync::mpsc::Receiver<T>>
302 for BatchMessageChannelReceiver<T>
303{
304 fn from(receiver: tokio::sync::mpsc::Receiver<T>) -> Self {
305 Self { receiver }
306 }
307}
308
309impl<T> Stream for BatchMessageChannelReceiver<T> {
310 type Item = T;
311
312 fn poll_next(
313 mut self: Pin<&mut Self>,
314 cx: &mut Context<'_>,
315 ) -> Poll<Option<Self::Item>> {
316 self.receiver.poll_recv(cx)
317 }
318}
319
/// Messages processed by the `DenoPeriodicReader` worker task.
enum DenoPeriodicReaderMessage {
  /// Register a metrics pipeline with the worker's inner reader.
  Register(std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>),
  /// Timed export; produced by the worker's own ticker.
  Export,
  /// Collect and export now, then report the result on the given sender.
  ForceFlush(oneshot::Sender<MetricResult<()>>),
  /// Collect and export a final time, shut the exporter down, report the
  /// result on the given sender and stop the worker.
  Shutdown(oneshot::Sender<MetricResult<()>>),
}
326
/// Metric reader that delegates all work to a worker task running on the
/// shared OpenTelemetry runtime.
#[derive(Debug)]
struct DenoPeriodicReader {
  // Channel to the worker task.
  tx: tokio::sync::mpsc::Sender<DenoPeriodicReaderMessage>,
  // Temporality reported by the exporter when the reader was created.
  temporality: Temporality,
}
332
333impl MetricReader for DenoPeriodicReader {
334 fn register_pipeline(
335 &self,
336 pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>,
337 ) {
338 let _ = self
339 .tx
340 .try_send(DenoPeriodicReaderMessage::Register(pipeline));
341 }
342
343 fn collect(
344 &self,
345 _rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
346 ) -> opentelemetry_sdk::metrics::MetricResult<()> {
347 unreachable!("collect should not be called on DenoPeriodicReader");
348 }
349
350 fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
351 let (tx, rx) = oneshot::channel();
352 let _ = self.tx.try_send(DenoPeriodicReaderMessage::ForceFlush(tx));
353 deno_core::futures::executor::block_on(rx).unwrap()?;
354 Ok(())
355 }
356
357 fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
358 let (tx, rx) = oneshot::channel();
359 let _ = self.tx.try_send(DenoPeriodicReaderMessage::Shutdown(tx));
360 deno_core::futures::executor::block_on(rx).unwrap()?;
361 Ok(())
362 }
363
364 fn temporality(
365 &self,
366 _kind: opentelemetry_sdk::metrics::InstrumentKind,
367 ) -> Temporality {
368 self.temporality
369 }
370}
371
/// Environment variable holding the metric export interval in milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Export interval used when the environment variable is unset or cannot be
/// parsed.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
374
375impl DenoPeriodicReader {
376 fn new(exporter: opentelemetry_otlp::MetricExporter) -> Self {
377 let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
378 .ok()
379 .and_then(|v| v.parse().map(Duration::from_millis).ok())
380 .unwrap_or(DEFAULT_INTERVAL);
381
382 let (tx, mut rx) = tokio::sync::mpsc::channel(256);
383
384 let temporality = PushMetricExporter::temporality(&exporter);
385
386 let worker = async move {
387 let inner = ManualReader::builder()
388 .with_temporality(PushMetricExporter::temporality(&exporter))
389 .build();
390
391 let collect_and_export = |collect_observed: bool| {
392 let inner = &inner;
393 let exporter = &exporter;
394 async move {
395 let mut resource_metrics =
396 opentelemetry_sdk::metrics::data::ResourceMetrics {
397 resource: Default::default(),
398 scope_metrics: Default::default(),
399 };
400 if collect_observed {
401 let callbacks = {
402 let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS.lock().unwrap();
403 std::mem::take(&mut *callbacks)
404 };
405 let mut futures = JoinSet::new();
406 for callback in callbacks {
407 let (tx, rx) = oneshot::channel();
408 if let Ok(()) = callback.send(tx) {
409 futures.spawn(rx);
410 }
411 }
412 while futures.join_next().await.is_some() {}
413 }
414 inner.collect(&mut resource_metrics)?;
415 if resource_metrics.scope_metrics.is_empty() {
416 return Ok(());
417 }
418 exporter.export(&mut resource_metrics).await?;
419 Ok(())
420 }
421 };
422
423 let mut ticker = tokio::time::interval(interval);
424 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
425 ticker.tick().await;
426
427 loop {
428 let message = tokio::select! {
429 _ = ticker.tick() => DenoPeriodicReaderMessage::Export,
430 message = rx.recv() => if let Some(message) = message {
431 message
432 } else {
433 break;
434 },
435 };
436
437 match message {
438 DenoPeriodicReaderMessage::Register(new_pipeline) => {
439 inner.register_pipeline(new_pipeline);
440 }
441 DenoPeriodicReaderMessage::Export => {
442 otel_debug!(
443 name: "DenoPeriodicReader.ExportTriggered",
444 message = "Export message received.",
445 );
446 if let Err(err) = collect_and_export(true).await {
447 otel_error!(
448 name: "DenoPeriodicReader.ExportFailed",
449 message = "Failed to export metrics",
450 reason = format!("{}", err));
451 }
452 }
453 DenoPeriodicReaderMessage::ForceFlush(sender) => {
454 otel_debug!(
455 name: "DenoPeriodicReader.ForceFlushCalled",
456 message = "Flush message received.",
457 );
458 let res = collect_and_export(false).await;
459 if let Err(send_error) = sender.send(res) {
460 otel_debug!(
461 name: "DenoPeriodicReader.Flush.SendResultError",
462 message = "Failed to send flush result.",
463 reason = format!("{:?}", send_error),
464 );
465 }
466 }
467 DenoPeriodicReaderMessage::Shutdown(sender) => {
468 otel_debug!(
469 name: "DenoPeriodicReader.ShutdownCalled",
470 message = "Shutdown message received",
471 );
472 let res = collect_and_export(false).await;
473 let _ = exporter.shutdown();
474 if let Err(send_error) = sender.send(res) {
475 otel_debug!(
476 name: "DenoPeriodicReader.Shutdown.SendResultError",
477 message = "Failed to send shutdown result",
478 reason = format!("{:?}", send_error),
479 );
480 }
481 break;
482 }
483 }
484 }
485 };
486
487 (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
488 .unbounded_send(worker.boxed())
489 .expect("failed to send task to shared OpenTelemetry runtime");
490
491 DenoPeriodicReader { tx, temporality }
492 }
493}
494
495mod hyper_client {
496 use std::fmt::Debug;
497 use std::pin::Pin;
498 use std::task::Poll;
499
500 use deno_net::tunnel::TunnelConnection;
501 use deno_net::tunnel::TunnelStream;
502 use deno_net::tunnel::get_tunnel;
503 use deno_tls::SocketUse;
504 use deno_tls::TlsKey;
505 use deno_tls::TlsKeys;
506 use deno_tls::create_client_config;
507 use deno_tls::load_certs;
508 use deno_tls::load_private_keys;
509 use http_body_util::BodyExt;
510 use http_body_util::Full;
511 use hyper::Uri;
512 use hyper_rustls::HttpsConnector;
513 use hyper_rustls::MaybeHttpsStream;
514 use hyper_util::client::legacy::Client;
515 use hyper_util::client::legacy::connect::Connected;
516 use hyper_util::client::legacy::connect::HttpConnector;
517 use hyper_util::rt::TokioIo;
518 use opentelemetry_http::Bytes;
519 use opentelemetry_http::HttpError;
520 use opentelemetry_http::Request;
521 use opentelemetry_http::Response;
522 use opentelemetry_http::ResponseExt;
523 use tokio::net::TcpStream;
524 #[cfg(any(
525 target_os = "android",
526 target_os = "linux",
527 target_os = "macos"
528 ))]
529 use tokio_vsock::VsockAddr;
530 #[cfg(any(
531 target_os = "android",
532 target_os = "linux",
533 target_os = "macos"
534 ))]
535 use tokio_vsock::VsockStream;
536
537 use super::OtelSharedRuntime;
538
  /// Errors produced while establishing a connection; each variant forwards
  /// the wrapped error transparently.
  #[derive(Debug, thiserror::Error)]
  enum Error {
    #[error(transparent)]
    StdIo(#[from] std::io::Error),
    #[error(transparent)]
    Box(#[from] Box<dyn std::error::Error + Send + Sync>),
    #[error(transparent)]
    Tunnel(#[from] deno_net::tunnel::Error),
  }
548
  /// Transport used by `HyperClient` to reach the collector.
  #[derive(Debug, Clone)]
  enum Connector {
    /// Regular HTTP(S) connection.
    Http(HttpsConnector<HttpConnector>),
    /// Connection made through an existing tunnel.
    Tunnel(TunnelConnection),
    /// Connection to a fixed vsock address (android, linux and macos only).
    #[cfg(any(
      target_os = "android",
      target_os = "linux",
      target_os = "macos"
    ))]
    Vsock(VsockAddr),
  }
560
  /// Stream produced by `Connector`; one variant per transport. Every I/O
  /// trait implemented for this type delegates to the active variant.
  #[allow(clippy::large_enum_variant)]
  #[pin_project::pin_project(project = IOProj)]
  enum IO {
    Tls(#[pin] TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>),
    Tunnel(#[pin] TunnelStream),
    #[cfg(any(
      target_os = "android",
      target_os = "linux",
      target_os = "macos"
    ))]
    Vsock(#[pin] VsockStream),
  }
573
574 impl tokio::io::AsyncRead for IO {
575 fn poll_read(
576 self: std::pin::Pin<&mut Self>,
577 cx: &mut std::task::Context<'_>,
578 buf: &mut tokio::io::ReadBuf<'_>,
579 ) -> Poll<std::io::Result<()>> {
580 match self.project() {
581 IOProj::Tls(stream) => stream.poll_read(cx, buf),
582 IOProj::Tunnel(stream) => stream.poll_read(cx, buf),
583 #[cfg(any(
584 target_os = "android",
585 target_os = "linux",
586 target_os = "macos"
587 ))]
588 IOProj::Vsock(stream) => stream.poll_read(cx, buf),
589 }
590 }
591 }
592
593 impl tokio::io::AsyncWrite for IO {
594 fn poll_write(
595 self: std::pin::Pin<&mut Self>,
596 cx: &mut std::task::Context<'_>,
597 buf: &[u8],
598 ) -> Poll<Result<usize, std::io::Error>> {
599 match self.project() {
600 IOProj::Tls(stream) => stream.poll_write(cx, buf),
601 IOProj::Tunnel(stream) => stream.poll_write(cx, buf),
602 #[cfg(any(
603 target_os = "android",
604 target_os = "linux",
605 target_os = "macos"
606 ))]
607 IOProj::Vsock(stream) => stream.poll_write(cx, buf),
608 }
609 }
610
611 fn poll_flush(
612 self: std::pin::Pin<&mut Self>,
613 cx: &mut std::task::Context<'_>,
614 ) -> Poll<Result<(), std::io::Error>> {
615 match self.project() {
616 IOProj::Tls(stream) => stream.poll_flush(cx),
617 IOProj::Tunnel(stream) => stream.poll_flush(cx),
618 #[cfg(any(
619 target_os = "android",
620 target_os = "linux",
621 target_os = "macos"
622 ))]
623 IOProj::Vsock(stream) => stream.poll_flush(cx),
624 }
625 }
626
627 fn poll_shutdown(
628 self: std::pin::Pin<&mut Self>,
629 cx: &mut std::task::Context<'_>,
630 ) -> Poll<Result<(), std::io::Error>> {
631 match self.project() {
632 IOProj::Tls(stream) => stream.poll_shutdown(cx),
633 IOProj::Tunnel(stream) => stream.poll_shutdown(cx),
634 #[cfg(any(
635 target_os = "android",
636 target_os = "linux",
637 target_os = "macos"
638 ))]
639 IOProj::Vsock(stream) => stream.poll_shutdown(cx),
640 }
641 }
642
643 fn is_write_vectored(&self) -> bool {
644 match self {
645 IO::Tls(stream) => stream.is_write_vectored(),
646 IO::Tunnel(stream) => stream.is_write_vectored(),
647 #[cfg(any(
648 target_os = "android",
649 target_os = "linux",
650 target_os = "macos"
651 ))]
652 IO::Vsock(stream) => stream.is_write_vectored(),
653 }
654 }
655
656 fn poll_write_vectored(
657 self: std::pin::Pin<&mut Self>,
658 cx: &mut std::task::Context<'_>,
659 bufs: &[std::io::IoSlice<'_>],
660 ) -> Poll<Result<usize, std::io::Error>> {
661 match self.project() {
662 IOProj::Tls(stream) => stream.poll_write_vectored(cx, bufs),
663 IOProj::Tunnel(stream) => stream.poll_write_vectored(cx, bufs),
664 #[cfg(any(
665 target_os = "android",
666 target_os = "linux",
667 target_os = "macos"
668 ))]
669 IOProj::Vsock(stream) => stream.poll_write_vectored(cx, bufs),
670 }
671 }
672 }
673
674 impl hyper_util::client::legacy::connect::Connection for IO {
675 fn connected(&self) -> Connected {
676 match self {
677 Self::Tls(stream) => stream.connected(),
678 Self::Tunnel(_) => Connected::new().proxy(true),
679 #[cfg(any(
680 target_os = "android",
681 target_os = "linux",
682 target_os = "macos"
683 ))]
684 Self::Vsock(_) => Connected::new().proxy(true),
685 }
686 }
687 }
688
689 impl tower_service::Service<Uri> for Connector {
690 type Response = TokioIo<IO>;
691 type Error = Error;
692 type Future = Pin<
693 Box<
694 dyn std::future::Future<Output = Result<Self::Response, Self::Error>>
695 + Send,
696 >,
697 >;
698
699 fn poll_ready(
700 &mut self,
701 cx: &mut std::task::Context<'_>,
702 ) -> Poll<Result<(), Self::Error>> {
703 match self {
704 Self::Http(c) => c.poll_ready(cx).map_err(Into::into),
705 Self::Tunnel(_) => Poll::Ready(Ok(())),
706 #[cfg(any(
707 target_os = "android",
708 target_os = "linux",
709 target_os = "macos"
710 ))]
711 Self::Vsock(_) => Poll::Ready(Ok(())),
712 }
713 }
714
715 fn call(&mut self, dst: Uri) -> Self::Future {
716 let this = self.clone();
717 Box::pin(async move {
718 match this {
719 Self::Http(mut connector) => {
720 let stream = connector.call(dst).await?;
721 Ok(TokioIo::new(IO::Tls(TokioIo::new(stream))))
722 }
723 Self::Tunnel(listener) => {
724 let stream = listener.create_agent_stream().await?;
725 Ok(TokioIo::new(IO::Tunnel(stream)))
726 }
727 #[cfg(any(
728 target_os = "android",
729 target_os = "linux",
730 target_os = "macos"
731 ))]
732 Self::Vsock(addr) => {
733 let stream = VsockStream::connect(addr).await?;
734 Ok(TokioIo::new(IO::Vsock(stream)))
735 }
736 }
737 })
738 }
739 }
740
  /// HTTP client handed to the OTLP exporters; wraps a hyper client whose
  /// connections are made through `Connector`.
  #[derive(Debug, Clone)]
  pub struct HyperClient {
    // Underlying hyper client; its background tasks run on
    // `OtelSharedRuntime`.
    inner: Client<Connector, Full<Bytes>>,
  }
745
746 impl HyperClient {
747 pub fn new() -> deno_core::anyhow::Result<Self> {
748 let connector = if let Some(tunnel) = get_tunnel() {
749 Connector::Tunnel(tunnel.clone())
750 } else if let Ok(addr) = std::env::var("OTEL_DENO_VSOCK") {
751 #[cfg(not(any(
752 target_os = "android",
753 target_os = "linux",
754 target_os = "macos"
755 )))]
756 {
757 let _ = addr;
758 deno_core::anyhow::bail!("vsock is not supported on this platform")
759 }
760
761 #[cfg(any(
762 target_os = "android",
763 target_os = "linux",
764 target_os = "macos"
765 ))]
766 {
767 let Some((cid, port)) = addr.split_once(':') else {
768 deno_core::anyhow::bail!("invalid vsock addr");
769 };
770 let cid = if cid == "-1" { u32::MAX } else { cid.parse()? };
771 let port = port.parse()?;
772 let addr = VsockAddr::new(cid, port);
773 Connector::Vsock(addr)
774 }
775 } else {
776 let ca_certs = match std::env::var("OTEL_EXPORTER_OTLP_CERTIFICATE") {
777 Ok(path) => vec![std::fs::read(path)?],
778 _ => vec![],
779 };
780
781 let keys = match (
782 std::env::var("OTEL_EXPORTER_OTLP_CLIENT_KEY"),
783 std::env::var("OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE"),
784 ) {
785 (Ok(key_path), Ok(cert_path)) => {
786 let key = std::fs::read(key_path)?;
787 let cert = std::fs::read(cert_path)?;
788
789 let certs = load_certs(&mut std::io::Cursor::new(cert))?;
790 let key = load_private_keys(&key)?.into_iter().next().unwrap();
791
792 TlsKeys::Static(TlsKey(certs, key))
793 }
794 _ => TlsKeys::Null,
795 };
796
797 let tls_config =
798 create_client_config(deno_tls::TlsClientConfigOptions {
799 root_cert_store: None,
800 ca_certs,
801 unsafely_ignore_certificate_errors: None,
802 unsafely_disable_hostname_verification: false,
803 cert_chain_and_key: keys,
804 socket_use: SocketUse::Http,
805 })?;
806 let mut http_connector = HttpConnector::new();
807 http_connector.enforce_http(false);
808 let connector = HttpsConnector::from((http_connector, tls_config));
809 Connector::Http(connector)
810 };
811
812 Ok(Self {
813 inner: Client::builder(OtelSharedRuntime).build(connector),
814 })
815 }
816 }
817
818 #[async_trait::async_trait]
819 impl opentelemetry_http::HttpClient for HyperClient {
820 async fn send(
821 &self,
822 request: Request<Vec<u8>>,
823 ) -> Result<Response<Bytes>, HttpError> {
824 let (parts, body) = request.into_parts();
825 let request = Request::from_parts(parts, Full::from(body));
826 let response = self.inner.request(request).await?;
827 let (parts, body) = response.into_parts();
828 let body = body.collect().await?.to_bytes();
829 let response = Response::from_parts(parts, body);
830 Ok(response.error_for_status()?)
831 }
832 }
833}
834
/// Process-wide OpenTelemetry state built by `init` and stored in
/// `OTEL_GLOBALS`.
#[derive(Debug)]
pub struct OtelGlobals {
  /// Batch processor that exports spans.
  pub span_processor: BatchSpanProcessor<OtelSharedRuntime>,
  /// Batch processor that exports log records.
  pub log_processor: BatchLogProcessor<OtelSharedRuntime>,
  /// Generator for trace and span IDs (random or deterministic).
  pub id_generator: DenoIdGenerator,
  /// Meter provider backed by the periodic metric reader.
  pub meter_provider: SdkMeterProvider,
  /// Instrumentation scope attached to logs emitted by `handle_log`.
  pub builtin_instrumentation_scope: InstrumentationScope,
  /// Configuration the globals were initialized with.
  pub config: OtelConfig,
}
844
845impl OtelGlobals {
846 pub fn has_tracing(&self) -> bool {
847 self.config.tracing_enabled
848 }
849
850 pub fn has_metrics(&self) -> bool {
851 self.config.metrics_enabled
852 }
853}
854
/// Global OpenTelemetry state; set at most once, by `init`. Remains empty
/// when `init` was never called or returned early.
pub static OTEL_GLOBALS: OnceCell<OtelGlobals> = OnceCell::new();
856
/// Initializes the global OpenTelemetry state.
///
/// Builds the span, metric and log exporters on top of a shared HTTP client,
/// stores the resulting processors and providers in `OTEL_GLOBALS`, and
/// registers `before_exit` so they are shut down before the process exits.
///
/// Returns `Ok(())` without doing anything when tracing and metrics are both
/// disabled and console handling is `OtelConsoleConfig::Ignore`.
///
/// # Errors
///
/// Fails when `OTEL_EXPORTER_OTLP_PROTOCOL` or
/// `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` holds an unsupported
/// value, when the HTTP client or any exporter cannot be built, or when the
/// globals have already been set.
///
/// # Panics
///
/// Panics if the default resource lacks one of the telemetry SDK attributes.
pub fn init(
  rt_config: OtelRuntimeConfig,
  config: OtelConfig,
) -> deno_core::anyhow::Result<()> {
  // Nothing to set up when no signal is enabled and the console is ignored.
  if !config.metrics_enabled
    && !config.tracing_enabled
    && config.console == OtelConsoleConfig::Ignore
  {
    return Ok(());
  }

  // Only the HTTP protocols are accepted; an unset or empty variable selects
  // `http/protobuf`.
  let protocol = match env::var("OTEL_EXPORTER_OTLP_PROTOCOL").as_deref() {
    Ok("http/protobuf") => Protocol::HttpBinary,
    Ok("http/json") => Protocol::HttpJson,
    Ok("") | Err(env::VarError::NotPresent) => Protocol::HttpBinary,
    Ok(protocol) => {
      return Err(deno_core::anyhow::anyhow!(
        "Env var OTEL_EXPORTER_OTLP_PROTOCOL specifies an unsupported protocol: {}",
        protocol
      ));
    }
    Err(err) => {
      return Err(deno_core::anyhow::anyhow!(
        "Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}",
        err
      ));
    }
  };

  let mut resource = Resource::default();

  // Add the runtime identity and prefix the SDK attributes of the default
  // resource with Deno-specific values.
  resource = resource.merge(&Resource::new(vec![
    KeyValue::new(PROCESS_RUNTIME_NAME, rt_config.runtime_name),
    KeyValue::new(PROCESS_RUNTIME_VERSION, rt_config.runtime_version.clone()),
    KeyValue::new(
      TELEMETRY_SDK_LANGUAGE,
      format!(
        "deno-{}",
        resource.get(Key::new(TELEMETRY_SDK_LANGUAGE)).unwrap()
      ),
    ),
    KeyValue::new(
      TELEMETRY_SDK_NAME,
      format!(
        "deno-{}",
        resource.get(Key::new(TELEMETRY_SDK_NAME)).unwrap()
      ),
    ),
    KeyValue::new(
      TELEMETRY_SDK_VERSION,
      format!(
        "{}-{}",
        rt_config.runtime_version,
        resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap()
      ),
    ),
  ]));

  // One HTTP client is shared (by cloning) between all three exporters.
  let client = hyper_client::HyperClient::new()?;

  let span_exporter = HttpExporterBuilder::default()
    .with_http_client(client.clone())
    .with_protocol(protocol)
    .build_span_exporter()?;
  let mut span_processor =
    BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build();
  span_processor.set_resource(&resource);

  // The temporality preference is matched case-insensitively; an unset
  // variable selects cumulative temporality.
  let temporality_preference =
    env::var("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE")
      .ok()
      .map(|s| s.to_lowercase());
  let temporality = match temporality_preference.as_deref() {
    None | Some("cumulative") => Temporality::Cumulative,
    Some("delta") => Temporality::Delta,
    Some("lowmemory") => Temporality::LowMemory,
    Some(other) => {
      return Err(deno_core::anyhow::anyhow!(
        "Invalid value for OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: {}",
        other
      ));
    }
  };
  let metric_exporter = HttpExporterBuilder::default()
    .with_http_client(client.clone())
    .with_protocol(protocol)
    .build_metrics_exporter(temporality)?;
  let metric_reader = DenoPeriodicReader::new(metric_exporter);
  let meter_provider = SdkMeterProvider::builder()
    .with_reader(metric_reader)
    .with_resource(resource.clone())
    .build();

  let log_exporter = HttpExporterBuilder::default()
    .with_http_client(client)
    .with_protocol(protocol)
    .build_log_exporter()?;
  let log_processor =
    BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build();
  log_processor.set_resource(&resource);

  let builtin_instrumentation_scope =
    opentelemetry::InstrumentationScope::builder("deno")
      .with_version(rt_config.runtime_version.clone())
      .build();

  let id_generator = if let Some(prefix) = config.deterministic_prefix {
    DenoIdGenerator::deterministic(prefix)
  } else {
    DenoIdGenerator::random()
  };

  // Fails when the globals were already set by an earlier call.
  OTEL_GLOBALS
    .set(OtelGlobals {
      log_processor,
      span_processor,
      id_generator,
      meter_provider,
      builtin_instrumentation_scope,
      config,
    })
    .map_err(|_| deno_core::anyhow::anyhow!("failed to set otel globals"))?;

  // `before_exit` (registered here) calls `deno_net::tunnel::before_exit`
  // itself, after the telemetry shutdown.
  // NOTE(review): presumably `disable_before_exit` turns off the tunnel's own
  // exit hook for that reason — confirm against `deno_net::tunnel`.
  deno_signals::before_exit(before_exit);
  deno_net::tunnel::disable_before_exit();

  Ok(())
}
1000
1001fn before_exit() {
1002 log::trace!("deno_telemetry::before_exit");
1003
1004 let Some(OtelGlobals {
1005 span_processor: spans,
1006 log_processor: logs,
1007 meter_provider,
1008 ..
1009 }) = OTEL_GLOBALS.get()
1010 else {
1011 return;
1012 };
1013
1014 let r = spans.shutdown();
1015 log::trace!("spans={:?}", r);
1016
1017 let r = logs.shutdown();
1018 log::trace!("logs={:?}", r);
1019
1020 let r = meter_provider.shutdown();
1021 log::trace!("meters={:?}", r);
1022
1023 deno_net::tunnel::before_exit();
1024}
1025
1026pub fn handle_log(record: &log::Record) {
1027 use log::Level;
1028
1029 let Some(OtelGlobals {
1030 log_processor: logs,
1031 builtin_instrumentation_scope,
1032 ..
1033 }) = OTEL_GLOBALS.get()
1034 else {
1035 return;
1036 };
1037
1038 let mut log_record = LogRecord::default();
1039
1040 let now = SystemTime::now();
1041 log_record.set_timestamp(now);
1042 log_record.set_observed_timestamp(now);
1043 log_record.set_severity_number(match record.level() {
1044 Level::Error => Severity::Error,
1045 Level::Warn => Severity::Warn,
1046 Level::Info => Severity::Info,
1047 Level::Debug => Severity::Debug,
1048 Level::Trace => Severity::Trace,
1049 });
1050 log_record.set_severity_text(record.level().as_str());
1051 log_record.set_body(record.args().to_string().into());
1052 log_record.set_target(record.metadata().target().to_string());
1053
1054 struct Visitor<'s>(&'s mut LogRecord);
1055
1056 impl<'kvs> log::kv::VisitSource<'kvs> for Visitor<'_> {
1057 fn visit_pair(
1058 &mut self,
1059 key: log::kv::Key<'kvs>,
1060 value: log::kv::Value<'kvs>,
1061 ) -> Result<(), log::kv::Error> {
1062 #[allow(clippy::manual_map)]
1063 let value = if let Some(v) = value.to_bool() {
1064 Some(AnyValue::Boolean(v))
1065 } else if let Some(v) = value.to_borrowed_str() {
1066 Some(AnyValue::String(v.to_owned().into()))
1067 } else if let Some(v) = value.to_f64() {
1068 Some(AnyValue::Double(v))
1069 } else if let Some(v) = value.to_i64() {
1070 Some(AnyValue::Int(v))
1071 } else {
1072 None
1073 };
1074
1075 if let Some(value) = value {
1076 let key = Key::from(key.as_str().to_owned());
1077 self.0.add_attribute(key, value);
1078 }
1079
1080 Ok(())
1081 }
1082 }
1083
1084 let _ = record.key_values().visit(&mut Visitor(&mut log_record));
1085
1086 logs.emit(&mut log_record, builtin_instrumentation_scope);
1087}
1088
/// Source of trace and span IDs.
#[derive(Debug)]
pub enum DenoIdGenerator {
  /// Delegates to the SDK's random generator.
  Random(RandomIdGenerator),
  /// Hands out sequential IDs from two independent counters.
  Deterministic {
    /// Next trace ID to hand out.
    next_trace_id: AtomicU64,
    /// Next span ID to hand out.
    next_span_id: AtomicU64,
  },
}
1097
1098impl IdGenerator for DenoIdGenerator {
1099 fn new_trace_id(&self) -> TraceId {
1100 match self {
1101 Self::Random(generator) => generator.new_trace_id(),
1102 Self::Deterministic { next_trace_id, .. } => {
1103 let id =
1104 next_trace_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1105 let bytes = id.to_be_bytes();
1106 let bytes = [
1107 0, 0, 0, 0, 0, 0, 0, 0, bytes[0], bytes[1], bytes[2], bytes[3],
1108 bytes[4], bytes[5], bytes[6], bytes[7],
1109 ];
1110 TraceId::from_bytes(bytes)
1111 }
1112 }
1113 }
1114
1115 fn new_span_id(&self) -> SpanId {
1116 match self {
1117 Self::Random(generator) => generator.new_span_id(),
1118 Self::Deterministic { next_span_id, .. } => {
1119 let id =
1120 next_span_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1121 SpanId::from_bytes(id.to_be_bytes())
1122 }
1123 }
1124 }
1125}
1126
1127impl DenoIdGenerator {
1128 fn random() -> Self {
1129 Self::Random(RandomIdGenerator::default())
1130 }
1131
1132 fn deterministic(prefix: u8) -> Self {
1133 let prefix = u64::from(prefix) << 56;
1134 Self::Deterministic {
1135 next_trace_id: AtomicU64::new(prefix + 1),
1136 next_span_id: AtomicU64::new(prefix + 1),
1137 }
1138 }
1139}
1140
1141fn parse_trace_id(
1142 scope: &mut v8::PinScope<'_, '_>,
1143 trace_id: v8::Local<'_, v8::Value>,
1144) -> TraceId {
1145 if let Ok(string) = trace_id.try_cast() {
1146 let value_view = v8::ValueView::new(scope, string);
1147 match value_view.data() {
1148 v8::ValueViewData::OneByte(bytes) => {
1149 TraceId::from_hex(&String::from_utf8_lossy(bytes))
1150 .unwrap_or(TraceId::INVALID)
1151 }
1152
1153 _ => TraceId::INVALID,
1154 }
1155 } else if let Ok(uint8array) = trace_id.try_cast::<v8::Uint8Array>() {
1156 let data = uint8array.data();
1157 let byte_length = uint8array.byte_length();
1158 if byte_length != 16 {
1159 return TraceId::INVALID;
1160 }
1161 let bytes = unsafe { &*(data as *const u8 as *const [u8; 16]) };
1164 TraceId::from_bytes(*bytes)
1165 } else {
1166 TraceId::INVALID
1167 }
1168}
1169
1170fn parse_span_id(
1171 scope: &mut v8::PinScope<'_, '_>,
1172 span_id: v8::Local<'_, v8::Value>,
1173) -> SpanId {
1174 if let Ok(string) = span_id.try_cast() {
1175 let value_view = v8::ValueView::new(scope, string);
1176 match value_view.data() {
1177 v8::ValueViewData::OneByte(bytes) => {
1178 SpanId::from_hex(&String::from_utf8_lossy(bytes))
1179 .unwrap_or(SpanId::INVALID)
1180 }
1181 _ => SpanId::INVALID,
1182 }
1183 } else if let Ok(uint8array) = span_id.try_cast::<v8::Uint8Array>() {
1184 let data = uint8array.data();
1185 let byte_length = uint8array.byte_length();
1186 if byte_length != 8 {
1187 return SpanId::INVALID;
1188 }
1189 let bytes = unsafe { &*(data as *const u8 as *const [u8; 8]) };
1192 SpanId::from_bytes(*bytes)
1193 } else {
1194 SpanId::INVALID
1195 }
1196}
1197
// Converts a JavaScript `(name, value)` pair into an `Option<KeyValue>`.
//
// The name must be a string. The value may be a string, number, boolean or
// BigInt; arrays are recognised but currently produce no attribute. The
// expansion evaluates to `None` when either side cannot be converted.
macro_rules! attr_raw {
  ($scope:ident, $name:expr, $value:expr) => {{
    let name = match $name.try_cast() {
      Ok(name) => {
        let view = v8::ValueView::new($scope, name);
        let text = match view.data() {
          v8::ValueViewData::OneByte(bytes) => {
            String::from_utf8_lossy(bytes).into_owned()
          }
          v8::ValueViewData::TwoByte(units) => String::from_utf16_lossy(units),
        };
        Some(text)
      }
      Err(_) => None,
    };
    let value = if let Ok(string) = $value.try_cast::<v8::String>() {
      let view = v8::ValueView::new($scope, string);
      let text = match view.data() {
        v8::ValueViewData::OneByte(bytes) => {
          String::from_utf8_lossy(bytes).into_owned()
        }
        v8::ValueViewData::TwoByte(units) => String::from_utf16_lossy(units),
      };
      Some(Value::String(StringValue::from(text)))
    } else if let Ok(number) = $value.try_cast::<v8::Number>() {
      Some(Value::F64(number.value()))
    } else if let Ok(boolean) = $value.try_cast::<v8::Boolean>() {
      Some(Value::Bool(boolean.is_true()))
    } else if let Ok(bigint) = $value.try_cast::<v8::BigInt>() {
      // The second tuple element (presumably a "lossless" flag — verify
      // against the v8 bindings) is ignored, as before.
      let (as_i64, _lossless) = bigint.i64_value();
      Some(Value::I64(as_i64))
    } else if let Ok(_array) = $value.try_cast::<v8::Array>() {
      // Array values are detected but not converted into attributes.
      None
    } else {
      None
    };
    name.zip(value).map(|(name, value)| KeyValue::new(name, value))
  }};
}
1243
// Converts a JavaScript `(name, value)` pair with `attr_raw!` and pushes the
// result onto `$attributes`.
//
// When the optional `=> $dropped_attributes_count` argument is given, that
// place is incremented for every pair that could not be converted; without
// it such pairs are silently skipped.
macro_rules! attr {
  ($scope:ident, $attributes:expr $(=> $dropped_attributes_count:expr)?, $name:expr, $value:expr) => {
    match attr_raw!($scope, $name, $value) {
      Some(kv) => $attributes.push(kv),
      None => {
        $(
          $dropped_attributes_count += 1;
        )?
      }
    }
  };
}
1257
1258#[op2(fast)]
1259fn op_otel_log<'s>(
1260 scope: &mut v8::PinScope<'s, '_>,
1261 message: v8::Local<'s, v8::Value>,
1262 #[smi] level: i32,
1263 span: v8::Local<'s, v8::Value>,
1264) {
1265 let Some(OtelGlobals {
1266 log_processor,
1267 builtin_instrumentation_scope,
1268 ..
1269 }) = OTEL_GLOBALS.get()
1270 else {
1271 return;
1272 };
1273
1274 let severity = match level {
1277 ..=0 => Severity::Debug,
1278 1 => Severity::Info,
1279 2 => Severity::Warn,
1280 3 | 5.. => Severity::Error,
1281 4 => Severity::Trace,
1282 };
1283
1284 let mut log_record = LogRecord::default();
1285 let now = SystemTime::now();
1286 log_record.set_timestamp(now);
1287 log_record.set_observed_timestamp(now);
1288 let Ok(message) = message.try_cast() else {
1289 return;
1290 };
1291 log_record.set_body(owned_string(scope, message).into());
1292 log_record.set_severity_number(severity);
1293 log_record.set_severity_text(severity.name());
1294 if let Some(span) =
1295 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1296 {
1297 let state = span.0.borrow();
1298 match &**state {
1299 OtelSpanState::Recording(span) => {
1300 log_record.set_trace_context(
1301 span.span_context.trace_id(),
1302 span.span_context.span_id(),
1303 Some(span.span_context.trace_flags()),
1304 );
1305 }
1306 OtelSpanState::Done(span_context) => {
1307 log_record.set_trace_context(
1308 span_context.trace_id(),
1309 span_context.span_id(),
1310 Some(span_context.trace_flags()),
1311 );
1312 }
1313 }
1314 }
1315
1316 log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1317}
1318
1319#[op2(fast)]
1320fn op_otel_log_foreign(
1321 scope: &mut v8::PinScope<'_, '_>,
1322 #[string] message: String,
1323 #[smi] level: i32,
1324 trace_id: v8::Local<'_, v8::Value>,
1325 span_id: v8::Local<'_, v8::Value>,
1326 #[smi] trace_flags: u8,
1327) {
1328 let Some(OtelGlobals {
1329 log_processor,
1330 builtin_instrumentation_scope,
1331 ..
1332 }) = OTEL_GLOBALS.get()
1333 else {
1334 return;
1335 };
1336
1337 let severity = match level {
1340 ..=0 => Severity::Debug,
1341 1 => Severity::Info,
1342 2 => Severity::Warn,
1343 3 | 5.. => Severity::Error,
1344 4 => Severity::Trace,
1345 };
1346
1347 let trace_id = parse_trace_id(scope, trace_id);
1348 let span_id = parse_span_id(scope, span_id);
1349
1350 let mut log_record = LogRecord::default();
1351
1352 let now = SystemTime::now();
1353 log_record.set_timestamp(now);
1354 log_record.set_observed_timestamp(now);
1355 log_record.set_body(message.into());
1356 log_record.set_severity_number(severity);
1357 log_record.set_severity_text(severity.name());
1358 if trace_id != TraceId::INVALID && span_id != SpanId::INVALID {
1359 log_record.set_trace_context(
1360 trace_id,
1361 span_id,
1362 Some(TraceFlags::new(trace_flags)),
1363 );
1364 }
1365
1366 log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1367}
1368
1369pub fn report_event(name: &'static str, data: impl std::fmt::Display) {
1370 let Some(OtelGlobals {
1371 log_processor,
1372 builtin_instrumentation_scope,
1373 ..
1374 }) = OTEL_GLOBALS.get()
1375 else {
1376 return;
1377 };
1378
1379 let mut log_record = LogRecord::default();
1380
1381 log_record.set_observed_timestamp(SystemTime::now());
1382 log_record.set_event_name(name);
1383 log_record.set_severity_number(Severity::Trace);
1384 log_record.set_severity_text(Severity::Trace.name());
1385 log_record.set_body(format!("{data}").into());
1386
1387 log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1388}
1389
1390fn owned_string<'s>(
1391 scope: &mut v8::PinScope<'s, '_>,
1392 string: v8::Local<'s, v8::String>,
1393) -> String {
1394 let x = v8::ValueView::new(scope, string);
1395 match x.data() {
1396 v8::ValueViewData::OneByte(bytes) => {
1397 String::from_utf8_lossy(bytes).into_owned()
1398 }
1399 v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
1400 }
1401}
1402
1403struct OtelTracer(InstrumentationScope);
1404
1405unsafe impl deno_core::GarbageCollected for OtelTracer {
1407 fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
1408
1409 fn get_name(&self) -> &'static std::ffi::CStr {
1410 c"OtelTracer"
1411 }
1412}
1413
1414#[op2]
1415impl OtelTracer {
1416 #[constructor]
1417 #[cppgc]
1418 fn new(
1419 #[string] name: String,
1420 #[string] version: Option<String>,
1421 #[string] schema_url: Option<String>,
1422 ) -> OtelTracer {
1423 let mut builder = opentelemetry::InstrumentationScope::builder(name);
1424 if let Some(version) = version {
1425 builder = builder.with_version(version);
1426 }
1427 if let Some(schema_url) = schema_url {
1428 builder = builder.with_schema_url(schema_url);
1429 }
1430 let scope = builder.build();
1431 OtelTracer(scope)
1432 }
1433
1434 #[static_method]
1435 #[cppgc]
1436 fn builtin() -> OtelTracer {
1437 let OtelGlobals {
1438 builtin_instrumentation_scope,
1439 ..
1440 } = OTEL_GLOBALS.get().unwrap();
1441 OtelTracer(builtin_instrumentation_scope.clone())
1442 }
1443
1444 #[cppgc]
1445 fn start_span<'s>(
1446 &self,
1447 scope: &mut v8::PinScope<'s, '_>,
1448 #[cppgc] parent: Option<&OtelSpan>,
1449 name: v8::Local<'s, v8::Value>,
1450 #[smi] span_kind: u8,
1451 start_time: Option<f64>,
1452 #[smi] attribute_count: usize,
1453 ) -> Result<OtelSpan, JsErrorBox> {
1454 let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
1455 let span_context;
1456 let parent_span_id;
1457 match parent {
1458 Some(parent) => {
1459 let parent = parent.0.borrow();
1460 let parent_span_context = match &**parent {
1461 OtelSpanState::Recording(span) => &span.span_context,
1462 OtelSpanState::Done(span_context) => span_context,
1463 };
1464 span_context = SpanContext::new(
1465 parent_span_context.trace_id(),
1466 id_generator.new_span_id(),
1467 TraceFlags::SAMPLED,
1468 false,
1469 parent_span_context.trace_state().clone(),
1470 );
1471 parent_span_id = parent_span_context.span_id();
1472 }
1473 None => {
1474 span_context = SpanContext::new(
1475 id_generator.new_trace_id(),
1476 id_generator.new_span_id(),
1477 TraceFlags::SAMPLED,
1478 false,
1479 TraceState::NONE,
1480 );
1481 parent_span_id = SpanId::INVALID;
1482 }
1483 }
1484 let name = owned_string(
1485 scope,
1486 name
1487 .try_cast()
1488 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1489 );
1490 let span_kind = match span_kind {
1491 0 => SpanKind::Internal,
1492 1 => SpanKind::Server,
1493 2 => SpanKind::Client,
1494 3 => SpanKind::Producer,
1495 4 => SpanKind::Consumer,
1496 _ => return Err(JsErrorBox::generic("invalid span kind")),
1497 };
1498 let start_time = start_time
1499 .map(|start_time| {
1500 SystemTime::UNIX_EPOCH
1501 .checked_add(std::time::Duration::from_secs_f64(start_time / 1000.0))
1502 .ok_or_else(|| JsErrorBox::generic("invalid start time"))
1503 })
1504 .unwrap_or_else(|| Ok(SystemTime::now()))?;
1505 let span_data = SpanData {
1506 span_context,
1507 parent_span_id,
1508 span_kind,
1509 name: Cow::Owned(name),
1510 start_time,
1511 end_time: SystemTime::UNIX_EPOCH,
1512 attributes: Vec::with_capacity(attribute_count),
1513 dropped_attributes_count: 0,
1514 status: SpanStatus::Unset,
1515 events: SpanEvents::default(),
1516 links: SpanLinks::default(),
1517 instrumentation_scope: self.0.clone(),
1518 };
1519 Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
1520 span_data,
1521 )))))
1522 }
1523
1524 #[cppgc]
1525 fn start_span_foreign<'s>(
1526 &self,
1527 scope: &mut v8::PinScope<'s, '_>,
1528 parent_trace_id: v8::Local<'s, v8::Value>,
1529 parent_span_id: v8::Local<'s, v8::Value>,
1530 name: v8::Local<'s, v8::Value>,
1531 #[smi] span_kind: u8,
1532 start_time: Option<f64>,
1533 #[smi] attribute_count: usize,
1534 ) -> Result<OtelSpan, JsErrorBox> {
1535 let parent_trace_id = parse_trace_id(scope, parent_trace_id);
1536 if parent_trace_id == TraceId::INVALID {
1537 return Err(JsErrorBox::generic("invalid trace id"));
1538 };
1539 let parent_span_id = parse_span_id(scope, parent_span_id);
1540 if parent_span_id == SpanId::INVALID {
1541 return Err(JsErrorBox::generic("invalid span id"));
1542 };
1543 let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
1544 let span_context = SpanContext::new(
1545 parent_trace_id,
1546 id_generator.new_span_id(),
1547 TraceFlags::SAMPLED,
1548 false,
1549 TraceState::NONE,
1550 );
1551 let name = owned_string(
1552 scope,
1553 name
1554 .try_cast()
1555 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1556 );
1557 let span_kind = match span_kind {
1558 0 => SpanKind::Internal,
1559 1 => SpanKind::Server,
1560 2 => SpanKind::Client,
1561 3 => SpanKind::Producer,
1562 4 => SpanKind::Consumer,
1563 _ => return Err(JsErrorBox::generic("invalid span kind")),
1564 };
1565 let start_time = start_time
1566 .map(|start_time| {
1567 SystemTime::UNIX_EPOCH
1568 .checked_add(std::time::Duration::from_secs_f64(start_time / 1000.0))
1569 .ok_or_else(|| JsErrorBox::generic("invalid start time"))
1570 })
1571 .unwrap_or_else(|| Ok(SystemTime::now()))?;
1572 let span_data = SpanData {
1573 span_context,
1574 parent_span_id,
1575 span_kind,
1576 name: Cow::Owned(name),
1577 start_time,
1578 end_time: SystemTime::UNIX_EPOCH,
1579 attributes: Vec::with_capacity(attribute_count),
1580 dropped_attributes_count: 0,
1581 status: SpanStatus::Unset,
1582 events: SpanEvents::default(),
1583 links: SpanLinks::default(),
1584 instrumentation_scope: self.0.clone(),
1585 };
1586 Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
1587 span_data,
1588 )))))
1589 }
1590}
1591
1592#[derive(Serialize)]
1593#[serde(rename_all = "camelCase")]
1594struct JsSpanContext {
1595 trace_id: Box<str>,
1596 span_id: Box<str>,
1597 trace_flags: u8,
1598}
1599
1600#[derive(Debug, Error, JsError)]
1601#[error("OtelSpan cannot be constructed.")]
1602#[class(type)]
1603struct OtelSpanCannotBeConstructedError;
1604
1605#[derive(Debug, Error, JsError)]
1606#[error("invalid span status code")]
1607#[class(type)]
1608struct InvalidSpanStatusCodeError;
1609
1610#[derive(Debug)]
1612struct OtelSpan(RefCell<Box<OtelSpanState>>);
1613
1614#[derive(Debug)]
1615#[allow(clippy::large_enum_variant)]
1616enum OtelSpanState {
1617 Recording(SpanData),
1618 Done(SpanContext),
1619}
1620
1621unsafe impl deno_core::GarbageCollected for OtelSpan {
1623 fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
1624
1625 fn get_name(&self) -> &'static std::ffi::CStr {
1626 c"OtelSpan"
1627 }
1628}
1629
1630#[op2]
1631impl OtelSpan {
1632 #[constructor]
1633 #[cppgc]
1634 fn new() -> Result<OtelSpan, OtelSpanCannotBeConstructedError> {
1635 Err(OtelSpanCannotBeConstructedError)
1636 }
1637
1638 #[serde]
1639 fn span_context(&self) -> JsSpanContext {
1640 let state = self.0.borrow();
1641 let span_context = match &**state {
1642 OtelSpanState::Recording(span) => &span.span_context,
1643 OtelSpanState::Done(span_context) => span_context,
1644 };
1645 JsSpanContext {
1646 trace_id: format!("{:?}", span_context.trace_id()).into(),
1647 span_id: format!("{:?}", span_context.span_id()).into(),
1648 trace_flags: span_context.trace_flags().to_u8(),
1649 }
1650 }
1651
1652 #[fast]
1653 fn set_status<'s>(
1654 &self,
1655 #[smi] status: u8,
1656 #[string] error_description: String,
1657 ) -> Result<(), InvalidSpanStatusCodeError> {
1658 let mut state = self.0.borrow_mut();
1659 let OtelSpanState::Recording(span) = &mut **state else {
1660 return Ok(());
1661 };
1662 span.status = match status {
1663 0 => SpanStatus::Unset,
1664 1 => SpanStatus::Ok,
1665 2 => SpanStatus::Error {
1666 description: Cow::Owned(error_description),
1667 },
1668 _ => return Err(InvalidSpanStatusCodeError),
1669 };
1670 Ok(())
1671 }
1672
1673 #[fast]
1674 fn add_event(&self, #[string] name: String, start_time: f64) {
1675 let start_time = if start_time.is_nan() {
1676 SystemTime::now()
1677 } else {
1678 SystemTime::UNIX_EPOCH
1679 .checked_add(Duration::from_secs_f64(start_time / 1000.0))
1680 .unwrap()
1681 };
1682 let mut state = self.0.borrow_mut();
1683 let OtelSpanState::Recording(span) = &mut **state else {
1684 return;
1685 };
1686 span
1687 .events
1688 .events
1689 .push(Event::new(name, start_time, vec![], 0));
1690 }
1691
1692 #[fast]
1693 fn drop_event(&self) {
1694 let mut state = self.0.borrow_mut();
1695 match &mut **state {
1696 OtelSpanState::Recording(span) => {
1697 span.events.dropped_count += 1;
1698 }
1699 OtelSpanState::Done(_) => {}
1700 }
1701 }
1702
1703 #[fast]
1704 fn end(&self, end_time: f64) {
1705 let end_time = if end_time.is_nan() {
1706 SystemTime::now()
1707 } else {
1708 SystemTime::UNIX_EPOCH
1709 .checked_add(Duration::from_secs_f64(end_time / 1000.0))
1710 .unwrap()
1711 };
1712
1713 let mut state = self.0.borrow_mut();
1714 if let OtelSpanState::Recording(span) = &mut **state {
1715 let span_context = span.span_context.clone();
1716 if let OtelSpanState::Recording(mut span) = *std::mem::replace(
1717 &mut *state,
1718 Box::new(OtelSpanState::Done(span_context)),
1719 ) {
1720 span.end_time = end_time;
1721 let Some(OtelGlobals { span_processor, .. }) = OTEL_GLOBALS.get()
1722 else {
1723 return;
1724 };
1725 span_processor.on_end(span);
1726 }
1727 }
1728 }
1729}
1730
1731fn span_attributes(
1732 span: &mut SpanData,
1733 location: u32,
1734) -> Option<(&mut Vec<KeyValue>, &mut u32)> {
1735 match location {
1736 0 => Some((&mut span.attributes, &mut span.dropped_attributes_count)),
1738 1 => span
1740 .events
1741 .events
1742 .last_mut()
1743 .map(|e| (&mut e.attributes, &mut e.dropped_attributes_count)),
1744 2 => span
1746 .links
1747 .links
1748 .last_mut()
1749 .map(|e| (&mut e.attributes, &mut e.dropped_attributes_count)),
1750 _ => None,
1751 }
1752}
1753
1754#[op2(fast)]
1755fn op_otel_span_attribute1<'s>(
1756 scope: &mut v8::PinScope<'s, '_>,
1757 span: v8::Local<'_, v8::Value>,
1758 #[smi] location: u32,
1759 key: v8::Local<'s, v8::Value>,
1760 value: v8::Local<'s, v8::Value>,
1761) {
1762 let Some(span) =
1763 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1764 else {
1765 return;
1766 };
1767 let mut state = span.0.borrow_mut();
1768 if let OtelSpanState::Recording(span) = &mut **state {
1769 let Some((attributes, dropped_attributes_count)) =
1770 span_attributes(span, location)
1771 else {
1772 return;
1773 };
1774 attr!(scope, attributes => *dropped_attributes_count, key, value);
1775 }
1776}
1777
1778#[op2(fast)]
1779fn op_otel_span_attribute2<'s>(
1780 scope: &mut v8::PinScope<'s, '_>,
1781 span: v8::Local<'_, v8::Value>,
1782 #[smi] location: u32,
1783 key1: v8::Local<'s, v8::Value>,
1784 value1: v8::Local<'s, v8::Value>,
1785 key2: v8::Local<'s, v8::Value>,
1786 value2: v8::Local<'s, v8::Value>,
1787) {
1788 let Some(span) =
1789 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1790 else {
1791 return;
1792 };
1793 let mut state = span.0.borrow_mut();
1794 if let OtelSpanState::Recording(span) = &mut **state {
1795 let Some((attributes, dropped_attributes_count)) =
1796 span_attributes(span, location)
1797 else {
1798 return;
1799 };
1800 attr!(scope, attributes => *dropped_attributes_count, key1, value1);
1801 attr!(scope, attributes => *dropped_attributes_count, key2, value2);
1802 }
1803}
1804
1805#[allow(clippy::too_many_arguments)]
1806#[op2(fast)]
1807fn op_otel_span_attribute3<'s>(
1808 scope: &mut v8::PinScope<'s, '_>,
1809 span: v8::Local<'_, v8::Value>,
1810 #[smi] location: u32,
1811 key1: v8::Local<'s, v8::Value>,
1812 value1: v8::Local<'s, v8::Value>,
1813 key2: v8::Local<'s, v8::Value>,
1814 value2: v8::Local<'s, v8::Value>,
1815 key3: v8::Local<'s, v8::Value>,
1816 value3: v8::Local<'s, v8::Value>,
1817) {
1818 let Some(span) =
1819 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1820 else {
1821 return;
1822 };
1823 let mut state = span.0.borrow_mut();
1824 if let OtelSpanState::Recording(span) = &mut **state {
1825 let Some((attributes, dropped_attributes_count)) =
1826 span_attributes(span, location)
1827 else {
1828 return;
1829 };
1830 attr!(scope, attributes => *dropped_attributes_count, key1, value1);
1831 attr!(scope, attributes => *dropped_attributes_count, key2, value2);
1832 attr!(scope, attributes => *dropped_attributes_count, key3, value3);
1833 }
1834}
1835
1836#[op2(fast)]
1837fn op_otel_span_update_name<'s>(
1838 scope: &mut v8::PinScope<'s, '_>,
1839 span: v8::Local<'s, v8::Value>,
1840 name: v8::Local<'s, v8::Value>,
1841) {
1842 let Ok(name) = name.try_cast() else {
1843 return;
1844 };
1845 let name = owned_string(scope, name);
1846 let Some(span) =
1847 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1848 else {
1849 return;
1850 };
1851 let mut state = span.0.borrow_mut();
1852 if let OtelSpanState::Recording(span) = &mut **state {
1853 span.name = Cow::Owned(name)
1854 }
1855}
1856
1857#[op2(fast)]
1858fn op_otel_span_add_link<'s>(
1859 scope: &mut v8::PinScope<'s, '_>,
1860 span: v8::Local<'s, v8::Value>,
1861 trace_id: v8::Local<'s, v8::Value>,
1862 span_id: v8::Local<'s, v8::Value>,
1863 #[smi] trace_flags: u8,
1864 is_remote: bool,
1865 #[smi] dropped_attributes_count: u32,
1866) -> bool {
1867 let trace_id = parse_trace_id(scope, trace_id);
1868 if trace_id == TraceId::INVALID {
1869 return false;
1870 };
1871 let span_id = parse_span_id(scope, span_id);
1872 if span_id == SpanId::INVALID {
1873 return false;
1874 };
1875 let span_context = SpanContext::new(
1876 trace_id,
1877 span_id,
1878 TraceFlags::new(trace_flags),
1879 is_remote,
1880 TraceState::NONE,
1881 );
1882
1883 let Some(span) =
1884 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1885 else {
1886 return true;
1887 };
1888 let mut state = span.0.borrow_mut();
1889 if let OtelSpanState::Recording(span) = &mut **state {
1890 span.links.links.push(Link::new(
1891 span_context,
1892 vec![],
1893 dropped_attributes_count,
1894 ));
1895 }
1896 true
1897}
1898
1899struct OtelMeter(opentelemetry::metrics::Meter);
1900
1901unsafe impl deno_core::GarbageCollected for OtelMeter {
1903 fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
1904
1905 fn get_name(&self) -> &'static std::ffi::CStr {
1906 c"OtelMeter"
1907 }
1908}
1909
1910#[op2]
1911impl OtelMeter {
1912 #[constructor]
1913 #[cppgc]
1914 fn new(
1915 #[string] name: String,
1916 #[string] version: Option<String>,
1917 #[string] schema_url: Option<String>,
1918 ) -> OtelMeter {
1919 let mut builder = opentelemetry::InstrumentationScope::builder(name);
1920 if let Some(version) = version {
1921 builder = builder.with_version(version);
1922 }
1923 if let Some(schema_url) = schema_url {
1924 builder = builder.with_schema_url(schema_url);
1925 }
1926 let scope = builder.build();
1927 let meter = OTEL_GLOBALS
1928 .get()
1929 .unwrap()
1930 .meter_provider
1931 .meter_with_scope(scope);
1932 OtelMeter(meter)
1933 }
1934
1935 #[cppgc]
1936 fn create_counter<'s>(
1937 &self,
1938 scope: &mut v8::PinScope<'s, '_>,
1939 name: v8::Local<'s, v8::Value>,
1940 description: v8::Local<'s, v8::Value>,
1941 unit: v8::Local<'s, v8::Value>,
1942 ) -> Result<Instrument, JsErrorBox> {
1943 create_instrument(
1944 |name| self.0.f64_counter(name),
1945 |i| Instrument::Counter(i.build()),
1946 scope,
1947 name,
1948 description,
1949 unit,
1950 )
1951 .map_err(|e| JsErrorBox::generic(e.to_string()))
1952 }
1953
1954 #[cppgc]
1955 fn create_up_down_counter<'s>(
1956 &self,
1957 scope: &mut v8::PinScope<'s, '_>,
1958 name: v8::Local<'s, v8::Value>,
1959 description: v8::Local<'s, v8::Value>,
1960 unit: v8::Local<'s, v8::Value>,
1961 ) -> Result<Instrument, JsErrorBox> {
1962 create_instrument(
1963 |name| self.0.f64_up_down_counter(name),
1964 |i| Instrument::UpDownCounter(i.build()),
1965 scope,
1966 name,
1967 description,
1968 unit,
1969 )
1970 .map_err(|e| JsErrorBox::generic(e.to_string()))
1971 }
1972
1973 #[cppgc]
1974 fn create_gauge<'s>(
1975 &self,
1976 scope: &mut v8::PinScope<'s, '_>,
1977 name: v8::Local<'s, v8::Value>,
1978 description: v8::Local<'s, v8::Value>,
1979 unit: v8::Local<'s, v8::Value>,
1980 ) -> Result<Instrument, JsErrorBox> {
1981 create_instrument(
1982 |name| self.0.f64_gauge(name),
1983 |i| Instrument::Gauge(i.build()),
1984 scope,
1985 name,
1986 description,
1987 unit,
1988 )
1989 .map_err(|e| JsErrorBox::generic(e.to_string()))
1990 }
1991
1992 #[cppgc]
1993 fn create_histogram<'s>(
1994 &self,
1995 scope: &mut v8::PinScope<'s, '_>,
1996 name: v8::Local<'s, v8::Value>,
1997 description: v8::Local<'s, v8::Value>,
1998 unit: v8::Local<'s, v8::Value>,
1999 #[serde] boundaries: Option<Vec<f64>>,
2000 ) -> Result<Instrument, JsErrorBox> {
2001 let name = owned_string(
2002 scope,
2003 name
2004 .try_cast()
2005 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
2006 );
2007 let mut builder = self.0.f64_histogram(name);
2008 if !description.is_null_or_undefined() {
2009 let description = owned_string(
2010 scope,
2011 description
2012 .try_cast()
2013 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
2014 );
2015 builder = builder.with_description(description);
2016 };
2017 if !unit.is_null_or_undefined() {
2018 let unit = owned_string(
2019 scope,
2020 unit
2021 .try_cast()
2022 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
2023 );
2024 builder = builder.with_unit(unit);
2025 };
2026 if let Some(boundaries) = boundaries {
2027 builder = builder.with_boundaries(boundaries);
2028 }
2029
2030 Ok(Instrument::Histogram(builder.build()))
2031 }
2032
2033 #[cppgc]
2034 fn create_observable_counter<'s>(
2035 &self,
2036 scope: &mut v8::PinScope<'s, '_>,
2037 name: v8::Local<'s, v8::Value>,
2038 description: v8::Local<'s, v8::Value>,
2039 unit: v8::Local<'s, v8::Value>,
2040 ) -> Result<Instrument, JsErrorBox> {
2041 create_async_instrument(
2042 |name| self.0.f64_observable_counter(name),
2043 |i| {
2044 i.build();
2045 },
2046 scope,
2047 name,
2048 description,
2049 unit,
2050 )
2051 .map_err(|e| JsErrorBox::generic(e.to_string()))
2052 }
2053
2054 #[cppgc]
2055 fn create_observable_up_down_counter<'s>(
2056 &self,
2057 scope: &mut v8::PinScope<'s, '_>,
2058 name: v8::Local<'s, v8::Value>,
2059 description: v8::Local<'s, v8::Value>,
2060 unit: v8::Local<'s, v8::Value>,
2061 ) -> Result<Instrument, JsErrorBox> {
2062 create_async_instrument(
2063 |name| self.0.f64_observable_up_down_counter(name),
2064 |i| {
2065 i.build();
2066 },
2067 scope,
2068 name,
2069 description,
2070 unit,
2071 )
2072 .map_err(|e| JsErrorBox::generic(e.to_string()))
2073 }
2074
2075 #[cppgc]
2076 fn create_observable_gauge<'s>(
2077 &self,
2078 scope: &mut v8::PinScope<'s, '_>,
2079 name: v8::Local<'s, v8::Value>,
2080 description: v8::Local<'s, v8::Value>,
2081 unit: v8::Local<'s, v8::Value>,
2082 ) -> Result<Instrument, JsErrorBox> {
2083 create_async_instrument(
2084 |name| self.0.f64_observable_gauge(name),
2085 |i| {
2086 i.build();
2087 },
2088 scope,
2089 name,
2090 description,
2091 unit,
2092 )
2093 .map_err(|e| JsErrorBox::generic(e.to_string()))
2094 }
2095}
2096
2097enum Instrument {
2098 Counter(opentelemetry::metrics::Counter<f64>),
2099 UpDownCounter(UpDownCounter<f64>),
2100 Gauge(opentelemetry::metrics::Gauge<f64>),
2101 Histogram(Histogram<f64>),
2102 Observable(Arc<Mutex<HashMap<Vec<KeyValue>, f64>>>),
2103}
2104
2105unsafe impl GarbageCollected for Instrument {
2107 fn trace(&self, _visitor: &mut deno_core::v8::cppgc::Visitor) {}
2108
2109 fn get_name(&self) -> &'static std::ffi::CStr {
2110 c"Instrument"
2111 }
2112}
2113
2114fn create_instrument<'a, 'b, T>(
2115 cb: impl FnOnce(String) -> InstrumentBuilder<'b, T>,
2116 cb2: impl FnOnce(InstrumentBuilder<'b, T>) -> Instrument,
2117 scope: &mut v8::PinScope<'a, '_>,
2118 name: v8::Local<'a, v8::Value>,
2119 description: v8::Local<'a, v8::Value>,
2120 unit: v8::Local<'a, v8::Value>,
2121) -> Result<Instrument, v8::DataError> {
2122 let name = owned_string(scope, name.try_cast()?);
2123 let mut builder = cb(name);
2124 if !description.is_null_or_undefined() {
2125 let description = owned_string(scope, description.try_cast()?);
2126 builder = builder.with_description(description);
2127 };
2128 if !unit.is_null_or_undefined() {
2129 let unit = owned_string(scope, unit.try_cast()?);
2130 builder = builder.with_unit(unit);
2131 };
2132
2133 Ok(cb2(builder))
2134}
2135
2136fn create_async_instrument<'a, 'b, T>(
2137 cb: impl FnOnce(String) -> AsyncInstrumentBuilder<'b, T, f64>,
2138 cb2: impl FnOnce(AsyncInstrumentBuilder<'b, T, f64>),
2139 scope: &mut v8::PinScope<'a, '_>,
2140 name: v8::Local<'a, v8::Value>,
2141 description: v8::Local<'a, v8::Value>,
2142 unit: v8::Local<'a, v8::Value>,
2143) -> Result<Instrument, DataError> {
2144 let name = owned_string(scope, name.try_cast()?);
2145 let mut builder = cb(name);
2146 if !description.is_null_or_undefined() {
2147 let description = owned_string(scope, description.try_cast()?);
2148 builder = builder.with_description(description);
2149 };
2150 if !unit.is_null_or_undefined() {
2151 let unit = owned_string(scope, unit.try_cast()?);
2152 builder = builder.with_unit(unit);
2153 };
2154
2155 let data_share = Arc::new(Mutex::new(HashMap::new()));
2156 let data_share_: Arc<Mutex<HashMap<Vec<KeyValue>, f64>>> = data_share.clone();
2157 builder = builder.with_callback(move |i| {
2158 let data = {
2159 let mut data = data_share_.lock().unwrap();
2160 std::mem::take(&mut *data)
2161 };
2162 for (attributes, value) in data {
2163 i.observe(value, &attributes);
2164 }
2165 });
2166 cb2(builder);
2167
2168 Ok(Instrument::Observable(data_share))
2169}
2170
2171struct MetricAttributes {
2172 attributes: Vec<KeyValue>,
2173}
2174
2175#[op2(fast)]
2176fn op_otel_metric_record0(
2177 state: &mut OpState,
2178 #[cppgc] instrument: &Instrument,
2179 value: f64,
2180) {
2181 let values = state.try_take::<MetricAttributes>();
2182 let attributes = match &values {
2183 Some(values) => &*values.attributes,
2184 None => &[],
2185 };
2186 match instrument {
2187 Instrument::Counter(counter) => counter.add(value, attributes),
2188 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2189 Instrument::Gauge(gauge) => gauge.record(value, attributes),
2190 Instrument::Histogram(histogram) => histogram.record(value, attributes),
2191 _ => {}
2192 }
2193}
2194
2195#[op2(fast)]
2196fn op_otel_metric_record1(
2197 state: &mut OpState,
2198 scope: &mut v8::PinScope<'_, '_>,
2199 instrument: v8::Local<'_, v8::Value>,
2200 value: f64,
2201 key1: v8::Local<'_, v8::Value>,
2202 value1: v8::Local<'_, v8::Value>,
2203) {
2204 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2205 &mut *scope,
2206 instrument,
2207 ) else {
2208 return;
2209 };
2210 let mut values = state.try_take::<MetricAttributes>();
2211 let attr1 = attr_raw!(scope, key1, value1);
2212 let attributes = match &mut values {
2213 Some(values) => {
2214 if let Some(kv) = attr1 {
2215 values.attributes.reserve_exact(1);
2216 values.attributes.push(kv);
2217 }
2218 &*values.attributes
2219 }
2220 None => match attr1 {
2221 Some(kv1) => &[kv1] as &[KeyValue],
2222 None => &[],
2223 },
2224 };
2225 match &*instrument {
2226 Instrument::Counter(counter) => counter.add(value, attributes),
2227 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2228 Instrument::Gauge(gauge) => gauge.record(value, attributes),
2229 Instrument::Histogram(histogram) => histogram.record(value, attributes),
2230 _ => {}
2231 }
2232}
2233
2234#[allow(clippy::too_many_arguments)]
2235#[op2(fast)]
2236fn op_otel_metric_record2(
2237 state: &mut OpState,
2238 scope: &mut v8::PinScope<'_, '_>,
2239 instrument: v8::Local<'_, v8::Value>,
2240 value: f64,
2241 key1: v8::Local<'_, v8::Value>,
2242 value1: v8::Local<'_, v8::Value>,
2243 key2: v8::Local<'_, v8::Value>,
2244 value2: v8::Local<'_, v8::Value>,
2245) {
2246 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2247 &mut *scope,
2248 instrument,
2249 ) else {
2250 return;
2251 };
2252 let mut values = state.try_take::<MetricAttributes>();
2253 let attr1 = attr_raw!(scope, key1, value1);
2254 let attr2 = attr_raw!(scope, key2, value2);
2255 let attributes = match &mut values {
2256 Some(values) => {
2257 values.attributes.reserve_exact(2);
2258 if let Some(kv1) = attr1 {
2259 values.attributes.push(kv1);
2260 }
2261 if let Some(kv2) = attr2 {
2262 values.attributes.push(kv2);
2263 }
2264 &*values.attributes
2265 }
2266 None => match (attr1, attr2) {
2267 (Some(kv1), Some(kv2)) => &[kv1, kv2] as &[KeyValue],
2268 (Some(kv1), None) => &[kv1],
2269 (None, Some(kv2)) => &[kv2],
2270 (None, None) => &[],
2271 },
2272 };
2273 match &*instrument {
2274 Instrument::Counter(counter) => counter.add(value, attributes),
2275 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2276 Instrument::Gauge(gauge) => gauge.record(value, attributes),
2277 Instrument::Histogram(histogram) => histogram.record(value, attributes),
2278 _ => {}
2279 }
2280}
2281
2282#[allow(clippy::too_many_arguments)]
2283#[op2(fast)]
2284fn op_otel_metric_record3(
2285 state: &mut OpState,
2286 scope: &mut v8::PinScope<'_, '_>,
2287 instrument: v8::Local<'_, v8::Value>,
2288 value: f64,
2289 key1: v8::Local<'_, v8::Value>,
2290 value1: v8::Local<'_, v8::Value>,
2291 key2: v8::Local<'_, v8::Value>,
2292 value2: v8::Local<'_, v8::Value>,
2293 key3: v8::Local<'_, v8::Value>,
2294 value3: v8::Local<'_, v8::Value>,
2295) {
2296 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2297 &mut *scope,
2298 instrument,
2299 ) else {
2300 return;
2301 };
2302 let mut values = state.try_take::<MetricAttributes>();
2303 let attr1 = attr_raw!(scope, key1, value1);
2304 let attr2 = attr_raw!(scope, key2, value2);
2305 let attr3 = attr_raw!(scope, key3, value3);
2306 let attributes = match &mut values {
2307 Some(values) => {
2308 values.attributes.reserve_exact(3);
2309 if let Some(kv1) = attr1 {
2310 values.attributes.push(kv1);
2311 }
2312 if let Some(kv2) = attr2 {
2313 values.attributes.push(kv2);
2314 }
2315 if let Some(kv3) = attr3 {
2316 values.attributes.push(kv3);
2317 }
2318 &*values.attributes
2319 }
2320 None => match (attr1, attr2, attr3) {
2321 (Some(kv1), Some(kv2), Some(kv3)) => &[kv1, kv2, kv3] as &[KeyValue],
2322 (Some(kv1), Some(kv2), None) => &[kv1, kv2],
2323 (Some(kv1), None, Some(kv3)) => &[kv1, kv3],
2324 (None, Some(kv2), Some(kv3)) => &[kv2, kv3],
2325 (Some(kv1), None, None) => &[kv1],
2326 (None, Some(kv2), None) => &[kv2],
2327 (None, None, Some(kv3)) => &[kv3],
2328 (None, None, None) => &[],
2329 },
2330 };
2331 match &*instrument {
2332 Instrument::Counter(counter) => counter.add(value, attributes),
2333 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
2334 Instrument::Gauge(gauge) => gauge.record(value, attributes),
2335 Instrument::Histogram(histogram) => histogram.record(value, attributes),
2336 _ => {}
2337 }
2338}
2339
2340#[op2(fast)]
2341fn op_otel_metric_observable_record0(
2342 state: &mut OpState,
2343 #[cppgc] instrument: &Instrument,
2344 value: f64,
2345) {
2346 let values = state.try_take::<MetricAttributes>();
2347 let attributes = values.map(|attr| attr.attributes).unwrap_or_default();
2348 if let Instrument::Observable(data_share) = instrument {
2349 let mut data = data_share.lock().unwrap();
2350 data.insert(attributes, value);
2351 }
2352}
2353
2354#[op2(fast)]
2355fn op_otel_metric_observable_record1(
2356 state: &mut OpState,
2357 scope: &mut v8::PinScope<'_, '_>,
2358 instrument: v8::Local<'_, v8::Value>,
2359 value: f64,
2360 key1: v8::Local<'_, v8::Value>,
2361 value1: v8::Local<'_, v8::Value>,
2362) {
2363 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2364 &mut *scope,
2365 instrument,
2366 ) else {
2367 return;
2368 };
2369 let values = state.try_take::<MetricAttributes>();
2370 let attr1 = attr_raw!(scope, key1, value1);
2371 let mut attributes = values
2372 .map(|mut attr| {
2373 attr.attributes.reserve_exact(1);
2374 attr.attributes
2375 })
2376 .unwrap_or_else(|| Vec::with_capacity(1));
2377 if let Some(kv1) = attr1 {
2378 attributes.push(kv1);
2379 }
2380 if let Instrument::Observable(data_share) = &*instrument {
2381 let mut data = data_share.lock().unwrap();
2382 data.insert(attributes, value);
2383 }
2384}
2385
2386#[allow(clippy::too_many_arguments)]
2387#[op2(fast)]
2388fn op_otel_metric_observable_record2(
2389 state: &mut OpState,
2390 scope: &mut v8::PinScope<'_, '_>,
2391 instrument: v8::Local<'_, v8::Value>,
2392 value: f64,
2393 key1: v8::Local<'_, v8::Value>,
2394 value1: v8::Local<'_, v8::Value>,
2395 key2: v8::Local<'_, v8::Value>,
2396 value2: v8::Local<'_, v8::Value>,
2397) {
2398 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2399 &mut *scope,
2400 instrument,
2401 ) else {
2402 return;
2403 };
2404 let values = state.try_take::<MetricAttributes>();
2405 let mut attributes = values
2406 .map(|mut attr| {
2407 attr.attributes.reserve_exact(2);
2408 attr.attributes
2409 })
2410 .unwrap_or_else(|| Vec::with_capacity(2));
2411 let attr1 = attr_raw!(scope, key1, value1);
2412 let attr2 = attr_raw!(scope, key2, value2);
2413 if let Some(kv1) = attr1 {
2414 attributes.push(kv1);
2415 }
2416 if let Some(kv2) = attr2 {
2417 attributes.push(kv2);
2418 }
2419 if let Instrument::Observable(data_share) = &*instrument {
2420 let mut data = data_share.lock().unwrap();
2421 data.insert(attributes, value);
2422 }
2423}
2424
2425#[allow(clippy::too_many_arguments)]
2426#[op2(fast)]
2427fn op_otel_metric_observable_record3(
2428 state: &mut OpState,
2429 scope: &mut v8::PinScope<'_, '_>,
2430 instrument: v8::Local<'_, v8::Value>,
2431 value: f64,
2432 key1: v8::Local<'_, v8::Value>,
2433 value1: v8::Local<'_, v8::Value>,
2434 key2: v8::Local<'_, v8::Value>,
2435 value2: v8::Local<'_, v8::Value>,
2436 key3: v8::Local<'_, v8::Value>,
2437 value3: v8::Local<'_, v8::Value>,
2438) {
2439 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2440 &mut *scope,
2441 instrument,
2442 ) else {
2443 return;
2444 };
2445 let values = state.try_take::<MetricAttributes>();
2446 let mut attributes = values
2447 .map(|mut attr| {
2448 attr.attributes.reserve_exact(3);
2449 attr.attributes
2450 })
2451 .unwrap_or_else(|| Vec::with_capacity(3));
2452 let attr1 = attr_raw!(scope, key1, value1);
2453 let attr2 = attr_raw!(scope, key2, value2);
2454 let attr3 = attr_raw!(scope, key3, value3);
2455 if let Some(kv1) = attr1 {
2456 attributes.push(kv1);
2457 }
2458 if let Some(kv2) = attr2 {
2459 attributes.push(kv2);
2460 }
2461 if let Some(kv3) = attr3 {
2462 attributes.push(kv3);
2463 }
2464 if let Instrument::Observable(data_share) = &*instrument {
2465 let mut data = data_share.lock().unwrap();
2466 data.insert(attributes, value);
2467 }
2468}
2469
2470#[allow(clippy::too_many_arguments)]
2471#[op2(fast)]
2472fn op_otel_metric_attribute3<'s>(
2473 scope: &mut v8::PinScope<'s, '_>,
2474 state: &mut OpState,
2475 #[smi] capacity: u32,
2476 key1: v8::Local<'s, v8::Value>,
2477 value1: v8::Local<'s, v8::Value>,
2478 key2: v8::Local<'s, v8::Value>,
2479 value2: v8::Local<'s, v8::Value>,
2480 key3: v8::Local<'s, v8::Value>,
2481 value3: v8::Local<'s, v8::Value>,
2482) {
2483 let mut values = state.try_borrow_mut::<MetricAttributes>();
2484 let attr1 = attr_raw!(scope, key1, value1);
2485 let attr2 = attr_raw!(scope, key2, value2);
2486 let attr3 = attr_raw!(scope, key3, value3);
2487 if let Some(values) = &mut values {
2488 values.attributes.reserve_exact(
2489 (capacity as usize).saturating_sub(values.attributes.capacity()),
2490 );
2491 if let Some(kv1) = attr1 {
2492 values.attributes.push(kv1);
2493 }
2494 if let Some(kv2) = attr2 {
2495 values.attributes.push(kv2);
2496 }
2497 if let Some(kv3) = attr3 {
2498 values.attributes.push(kv3);
2499 }
2500 } else {
2501 let mut attributes = Vec::with_capacity(capacity as usize);
2502 if let Some(kv1) = attr1 {
2503 attributes.push(kv1);
2504 }
2505 if let Some(kv2) = attr2 {
2506 attributes.push(kv2);
2507 }
2508 if let Some(kv3) = attr3 {
2509 attributes.push(kv3);
2510 }
2511 state.put(MetricAttributes { attributes });
2512 }
2513}
2514
2515struct ObservationDone(oneshot::Sender<()>);
2516
2517#[op2]
2518async fn op_otel_metric_wait_to_observe(state: Rc<RefCell<OpState>>) -> bool {
2519 let (tx, rx) = oneshot::channel();
2520 {
2521 OTEL_PRE_COLLECT_CALLBACKS
2522 .lock()
2523 .expect("mutex poisoned")
2524 .push(tx);
2525 }
2526 match rx.await {
2527 Ok(done) => {
2528 state.borrow_mut().put(ObservationDone(done));
2529 true
2530 }
2531 _ => false,
2532 }
2533}
2534
2535#[op2(fast)]
2536fn op_otel_metric_observation_done(state: &mut OpState) {
2537 if let Some(ObservationDone(done)) = state.try_take::<ObservationDone>() {
2538 let _ = done.send(());
2539 }
2540}
2541
2542struct GcMetricDataInner {
2543 start: Instant,
2544 duration: Histogram<f64>,
2545}
2546
2547struct GcMetricData(RefCell<GcMetricDataInner>);
2548
2549impl GcMetricData {
2550 extern "C" fn prologue_callback(
2551 isolate: v8::UnsafeRawIsolatePtr,
2552 _gc_type: v8::GCType,
2553 _flags: v8::GCCallbackFlags,
2554 _data: *mut c_void,
2555 ) {
2556 let isolate =
2558 unsafe { v8::Isolate::from_raw_isolate_ptr_unchecked(isolate) };
2559 let this = isolate.get_slot::<Self>().unwrap();
2560 this.0.borrow_mut().start = Instant::now();
2561 }
2562
2563 extern "C" fn epilogue_callback(
2564 isolate: v8::UnsafeRawIsolatePtr,
2565 gc_type: v8::GCType,
2566 _flags: v8::GCCallbackFlags,
2567 _data: *mut c_void,
2568 ) {
2569 let isolate =
2571 unsafe { v8::Isolate::from_raw_isolate_ptr_unchecked(isolate) };
2572 let this = isolate.get_slot::<Self>().unwrap();
2573 let this = this.0.borrow_mut();
2574
2575 let elapsed = this.start.elapsed();
2576
2577 let gc_type = KeyValue::new(
2579 "v8js.gc.type",
2580 match gc_type {
2581 v8::GCType::kGCTypeScavenge => "minor",
2582 v8::GCType::kGCTypeMinorMarkSweep => "minor",
2583 v8::GCType::kGCTypeMarkSweepCompact => "major",
2584 v8::GCType::kGCTypeIncrementalMarking => "incremental",
2585 v8::GCType::kGCTypeProcessWeakCallbacks => "weakcb",
2586 _ => return,
2587 },
2588 );
2589
2590 this.duration.record(elapsed.as_secs_f64(), &[gc_type]);
2591 }
2592}
2593
2594#[derive(Clone)]
2595struct HeapMetricData {
2596 heap_limit: Gauge<u64>,
2597 heap_size: Gauge<u64>,
2598 available_size: Gauge<u64>,
2599 physical_size: Gauge<u64>,
2600}
2601
2602#[op2(fast)]
2603fn op_otel_enable_isolate_metrics(scope: &mut v8::PinScope<'_, '_>) {
2604 if scope.get_slot::<GcMetricData>().is_some() {
2605 return;
2606 }
2607
2608 let meter = OTEL_GLOBALS.get().unwrap().meter_provider.meter("v8js");
2609
2610 let duration = meter
2612 .f64_histogram("v8js.gc.duration")
2613 .with_unit("S")
2614 .with_description("Garbage collection duration")
2615 .with_boundaries(vec![0.01, 0.1, 1.0, 10.0])
2616 .build();
2617
2618 scope.set_slot(GcMetricData(RefCell::new(GcMetricDataInner {
2619 start: Instant::now(),
2620 duration,
2621 })));
2622
2623 scope.add_gc_prologue_callback(
2624 GcMetricData::prologue_callback,
2625 std::ptr::null_mut(),
2626 v8::GCType::kGCTypeAll,
2627 );
2628
2629 scope.add_gc_epilogue_callback(
2630 GcMetricData::epilogue_callback,
2631 std::ptr::null_mut(),
2632 v8::GCType::kGCTypeAll,
2633 );
2634
2635 let heap_limit = meter
2636 .u64_gauge("v8js.memory.heap.limit")
2637 .with_unit("By")
2638 .with_description("Total heap memory size pre-allocated.")
2639 .build();
2640 let heap_size = meter
2641 .u64_gauge("v8js.memory.heap.size")
2642 .with_unit("By")
2643 .with_description("Heap Memory size allocated.")
2644 .build();
2645 let available_size = meter
2646 .u64_gauge("v8js.memory.space.available_size")
2647 .with_unit("By")
2648 .with_description("Heap space available size.")
2649 .build();
2650 let physical_size = meter
2651 .u64_gauge("v8js.memory.space.physical_size")
2652 .with_unit("By")
2653 .with_description("Committed size of a heap space.")
2654 .build();
2655
2656 scope.set_slot(HeapMetricData {
2657 heap_limit,
2658 heap_size,
2659 available_size,
2660 physical_size,
2661 });
2662}
2663
2664#[op2(fast)]
2665fn op_otel_collect_isolate_metrics(scope: &mut v8::PinScope<'_, '_>) {
2666 let data = scope.get_slot::<HeapMetricData>().unwrap().clone();
2667 for i in 0..scope.get_number_of_data_slots() {
2668 let Some(space) = scope.get_heap_space_statistics(i as _) else {
2669 continue;
2670 };
2671 let space_name: &'static std::ffi::CStr =
2674 unsafe { std::ffi::CStr::from_ptr(space.space_name().as_ptr()) };
2675 let Ok(space_name) = space_name.to_str() else {
2676 continue;
2677 };
2678 let attributes = [KeyValue::new("v8js.heap.space.name", space_name)];
2679 data.heap_limit.record(space.space_size() as _, &attributes);
2680 data
2681 .heap_size
2682 .record(space.space_used_size() as _, &attributes);
2683 data
2684 .available_size
2685 .record(space.space_available_size() as _, &attributes);
2686 data
2687 .physical_size
2688 .record(space.physical_space_size() as _, &attributes);
2689 }
2690}