1#![allow(clippy::too_many_arguments)]
4
5use std::borrow::Cow;
6use std::cell::RefCell;
7use std::collections::HashMap;
8use std::env;
9use std::fmt::Debug;
10use std::pin::Pin;
11use std::rc::Rc;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::sync::Mutex;
15use std::task::Context;
16use std::task::Poll;
17use std::thread;
18use std::time::Duration;
19use std::time::SystemTime;
20
21use deno_core::futures::channel::mpsc;
22use deno_core::futures::channel::mpsc::UnboundedSender;
23use deno_core::futures::future::BoxFuture;
24use deno_core::futures::stream;
25use deno_core::futures::FutureExt;
26use deno_core::futures::Stream;
27use deno_core::futures::StreamExt;
28use deno_core::op2;
29use deno_core::v8;
30use deno_core::v8::DataError;
31use deno_core::GarbageCollected;
32use deno_core::OpState;
33use deno_error::JsError;
34use deno_error::JsErrorBox;
35use once_cell::sync::Lazy;
36use once_cell::sync::OnceCell;
37use opentelemetry::logs::AnyValue;
38use opentelemetry::logs::LogRecord as LogRecordTrait;
39use opentelemetry::logs::Severity;
40use opentelemetry::metrics::AsyncInstrumentBuilder;
41pub use opentelemetry::metrics::Histogram;
42use opentelemetry::metrics::InstrumentBuilder;
43pub use opentelemetry::metrics::MeterProvider;
44pub use opentelemetry::metrics::UpDownCounter;
45use opentelemetry::otel_debug;
46use opentelemetry::otel_error;
47use opentelemetry::trace::Link;
48use opentelemetry::trace::SpanContext;
49use opentelemetry::trace::SpanId;
50use opentelemetry::trace::SpanKind;
51use opentelemetry::trace::Status as SpanStatus;
52use opentelemetry::trace::TraceFlags;
53use opentelemetry::trace::TraceId;
54use opentelemetry::trace::TraceState;
55use opentelemetry::InstrumentationScope;
56pub use opentelemetry::Key;
57pub use opentelemetry::KeyValue;
58pub use opentelemetry::StringValue;
59pub use opentelemetry::Value;
60use opentelemetry_otlp::HttpExporterBuilder;
61use opentelemetry_otlp::Protocol;
62use opentelemetry_otlp::WithExportConfig;
63use opentelemetry_otlp::WithHttpConfig;
64use opentelemetry_sdk::export::trace::SpanData;
65use opentelemetry_sdk::logs::BatchLogProcessor;
66use opentelemetry_sdk::logs::LogProcessor;
67use opentelemetry_sdk::logs::LogRecord;
68use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
69use opentelemetry_sdk::metrics::reader::MetricReader;
70use opentelemetry_sdk::metrics::ManualReader;
71use opentelemetry_sdk::metrics::MetricResult;
72use opentelemetry_sdk::metrics::SdkMeterProvider;
73use opentelemetry_sdk::metrics::Temporality;
74use opentelemetry_sdk::trace::BatchSpanProcessor;
75use opentelemetry_sdk::trace::IdGenerator;
76use opentelemetry_sdk::trace::RandomIdGenerator;
77use opentelemetry_sdk::trace::SpanEvents;
78use opentelemetry_sdk::trace::SpanLinks;
79use opentelemetry_sdk::trace::SpanProcessor as _;
80use opentelemetry_sdk::Resource;
81use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_NAME;
82use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_VERSION;
83use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_LANGUAGE;
84use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_NAME;
85use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION;
86use serde::Deserialize;
87use serde::Serialize;
88use thiserror::Error;
89use tokio::sync::oneshot;
90use tokio::task::JoinSet;
91
92deno_core::extension!(
93 deno_telemetry,
94 ops = [
95 op_otel_log,
96 op_otel_log_foreign,
97 op_otel_span_attribute1,
98 op_otel_span_attribute2,
99 op_otel_span_attribute3,
100 op_otel_span_add_link,
101 op_otel_span_update_name,
102 op_otel_metric_attribute3,
103 op_otel_metric_record0,
104 op_otel_metric_record1,
105 op_otel_metric_record2,
106 op_otel_metric_record3,
107 op_otel_metric_observable_record0,
108 op_otel_metric_observable_record1,
109 op_otel_metric_observable_record2,
110 op_otel_metric_observable_record3,
111 op_otel_metric_wait_to_observe,
112 op_otel_metric_observation_done,
113 ],
114 objects = [OtelTracer, OtelMeter, OtelSpan],
115 esm = ["telemetry.ts", "util.ts"],
116);
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct OtelRuntimeConfig {
120 pub runtime_name: Cow<'static, str>,
121 pub runtime_version: Cow<'static, str>,
122}
123
124#[derive(Default, Debug, Clone, Serialize, Deserialize)]
125pub struct OtelConfig {
126 pub tracing_enabled: bool,
127 pub metrics_enabled: bool,
128 pub console: OtelConsoleConfig,
129 pub deterministic: bool,
130}
131
132impl OtelConfig {
133 pub fn as_v8(&self) -> Box<[u8]> {
134 Box::new([
135 self.tracing_enabled as u8,
136 self.metrics_enabled as u8,
137 self.console as u8,
138 self.deterministic as u8,
139 ])
140 }
141}
142
143#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
144#[repr(u8)]
145pub enum OtelConsoleConfig {
146 Ignore = 0,
147 Capture = 1,
148 Replace = 2,
149}
150
151impl Default for OtelConsoleConfig {
152 fn default() -> Self {
153 Self::Ignore
154 }
155}
156
157static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy<
158 UnboundedSender<BoxFuture<'static, ()>>,
159> = Lazy::new(otel_create_shared_runtime);
160
161static OTEL_PRE_COLLECT_CALLBACKS: Lazy<
162 Mutex<Vec<oneshot::Sender<oneshot::Sender<()>>>>,
163> = Lazy::new(Default::default);
164
165fn otel_create_shared_runtime() -> UnboundedSender<BoxFuture<'static, ()>> {
166 let (spawn_task_tx, mut spawn_task_rx) =
167 mpsc::unbounded::<BoxFuture<'static, ()>>();
168
169 thread::spawn(move || {
170 let rt = tokio::runtime::Builder::new_current_thread()
171 .enable_io()
172 .enable_time()
173 .max_blocking_threads(if cfg!(windows) {
179 4 * std::thread::available_parallelism()
182 .map(|n| n.get())
183 .unwrap_or(8)
184 } else {
185 32
186 })
187 .build()
188 .unwrap();
189
190 rt.block_on(async move {
191 while let Some(task) = spawn_task_rx.next().await {
192 tokio::spawn(task);
193 }
194 });
195 });
196
197 spawn_task_tx
198}
199
200#[derive(Clone, Copy)]
201pub struct OtelSharedRuntime;
202
203impl hyper::rt::Executor<BoxFuture<'static, ()>> for OtelSharedRuntime {
204 fn execute(&self, fut: BoxFuture<'static, ()>) {
205 (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
206 .unbounded_send(fut)
207 .expect("failed to send task to shared OpenTelemetry runtime");
208 }
209}
210
211impl opentelemetry_sdk::runtime::Runtime for OtelSharedRuntime {
212 type Interval = Pin<Box<dyn Stream<Item = ()> + Send + 'static>>;
213 type Delay = Pin<Box<tokio::time::Sleep>>;
214
215 fn interval(&self, period: Duration) -> Self::Interval {
216 stream::repeat(())
217 .then(move |_| tokio::time::sleep(period))
218 .boxed()
219 }
220
221 fn spawn(&self, future: BoxFuture<'static, ()>) {
222 (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
223 .unbounded_send(future)
224 .expect("failed to send task to shared OpenTelemetry runtime");
225 }
226
227 fn delay(&self, duration: Duration) -> Self::Delay {
228 Box::pin(tokio::time::sleep(duration))
229 }
230}
231
232impl opentelemetry_sdk::runtime::RuntimeChannel for OtelSharedRuntime {
233 type Receiver<T: Debug + Send> = BatchMessageChannelReceiver<T>;
234 type Sender<T: Debug + Send> = BatchMessageChannelSender<T>;
235
236 fn batch_message_channel<T: Debug + Send>(
237 &self,
238 capacity: usize,
239 ) -> (Self::Sender<T>, Self::Receiver<T>) {
240 let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<T>(capacity);
241 (batch_tx.into(), batch_rx.into())
242 }
243}
244
245#[derive(Debug)]
246pub struct BatchMessageChannelSender<T: Send> {
247 sender: tokio::sync::mpsc::Sender<T>,
248}
249
250impl<T: Send> From<tokio::sync::mpsc::Sender<T>>
251 for BatchMessageChannelSender<T>
252{
253 fn from(sender: tokio::sync::mpsc::Sender<T>) -> Self {
254 Self { sender }
255 }
256}
257
258impl<T: Send> opentelemetry_sdk::runtime::TrySend
259 for BatchMessageChannelSender<T>
260{
261 type Message = T;
262
263 fn try_send(
264 &self,
265 item: Self::Message,
266 ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
267 self.sender.try_send(item).map_err(|err| match err {
268 tokio::sync::mpsc::error::TrySendError::Full(_) => {
269 opentelemetry_sdk::runtime::TrySendError::ChannelFull
270 }
271 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
272 opentelemetry_sdk::runtime::TrySendError::ChannelClosed
273 }
274 })
275 }
276}
277
278pub struct BatchMessageChannelReceiver<T> {
279 receiver: tokio::sync::mpsc::Receiver<T>,
280}
281
282impl<T> From<tokio::sync::mpsc::Receiver<T>>
283 for BatchMessageChannelReceiver<T>
284{
285 fn from(receiver: tokio::sync::mpsc::Receiver<T>) -> Self {
286 Self { receiver }
287 }
288}
289
290impl<T> Stream for BatchMessageChannelReceiver<T> {
291 type Item = T;
292
293 fn poll_next(
294 mut self: Pin<&mut Self>,
295 cx: &mut Context<'_>,
296 ) -> Poll<Option<Self::Item>> {
297 self.receiver.poll_recv(cx)
298 }
299}
300
301enum DenoPeriodicReaderMessage {
302 Register(std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>),
303 Export,
304 ForceFlush(oneshot::Sender<MetricResult<()>>),
305 Shutdown(oneshot::Sender<MetricResult<()>>),
306}
307
308#[derive(Debug)]
309struct DenoPeriodicReader {
310 tx: tokio::sync::mpsc::Sender<DenoPeriodicReaderMessage>,
311 temporality: Temporality,
312}
313
314impl MetricReader for DenoPeriodicReader {
315 fn register_pipeline(
316 &self,
317 pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>,
318 ) {
319 let _ = self
320 .tx
321 .try_send(DenoPeriodicReaderMessage::Register(pipeline));
322 }
323
324 fn collect(
325 &self,
326 _rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
327 ) -> opentelemetry_sdk::metrics::MetricResult<()> {
328 unreachable!("collect should not be called on DenoPeriodicReader");
329 }
330
331 fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
332 let (tx, rx) = oneshot::channel();
333 let _ = self.tx.try_send(DenoPeriodicReaderMessage::ForceFlush(tx));
334 deno_core::futures::executor::block_on(rx).unwrap()?;
335 Ok(())
336 }
337
338 fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
339 let (tx, rx) = oneshot::channel();
340 let _ = self.tx.try_send(DenoPeriodicReaderMessage::Shutdown(tx));
341 deno_core::futures::executor::block_on(rx).unwrap()?;
342 Ok(())
343 }
344
345 fn temporality(
346 &self,
347 _kind: opentelemetry_sdk::metrics::InstrumentKind,
348 ) -> Temporality {
349 self.temporality
350 }
351}
352
353const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
354const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
355
356impl DenoPeriodicReader {
357 fn new(exporter: opentelemetry_otlp::MetricExporter) -> Self {
358 let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
359 .ok()
360 .and_then(|v| v.parse().map(Duration::from_millis).ok())
361 .unwrap_or(DEFAULT_INTERVAL);
362
363 let (tx, mut rx) = tokio::sync::mpsc::channel(256);
364
365 let temporality = PushMetricExporter::temporality(&exporter);
366
367 let worker = async move {
368 let inner = ManualReader::builder()
369 .with_temporality(PushMetricExporter::temporality(&exporter))
370 .build();
371
372 let collect_and_export = |collect_observed: bool| {
373 let inner = &inner;
374 let exporter = &exporter;
375 async move {
376 let mut resource_metrics =
377 opentelemetry_sdk::metrics::data::ResourceMetrics {
378 resource: Default::default(),
379 scope_metrics: Default::default(),
380 };
381 if collect_observed {
382 let callbacks = {
383 let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS.lock().unwrap();
384 std::mem::take(&mut *callbacks)
385 };
386 let mut futures = JoinSet::new();
387 for callback in callbacks {
388 let (tx, rx) = oneshot::channel();
389 if let Ok(()) = callback.send(tx) {
390 futures.spawn(rx);
391 }
392 }
393 while futures.join_next().await.is_some() {}
394 }
395 inner.collect(&mut resource_metrics)?;
396 if resource_metrics.scope_metrics.is_empty() {
397 return Ok(());
398 }
399 exporter.export(&mut resource_metrics).await?;
400 Ok(())
401 }
402 };
403
404 let mut ticker = tokio::time::interval(interval);
405 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
406 ticker.tick().await;
407
408 loop {
409 let message = tokio::select! {
410 _ = ticker.tick() => DenoPeriodicReaderMessage::Export,
411 message = rx.recv() => if let Some(message) = message {
412 message
413 } else {
414 break;
415 },
416 };
417
418 match message {
419 DenoPeriodicReaderMessage::Register(new_pipeline) => {
420 inner.register_pipeline(new_pipeline);
421 }
422 DenoPeriodicReaderMessage::Export => {
423 otel_debug!(
424 name: "DenoPeriodicReader.ExportTriggered",
425 message = "Export message received.",
426 );
427 if let Err(err) = collect_and_export(true).await {
428 otel_error!(
429 name: "DenoPeriodicReader.ExportFailed",
430 message = "Failed to export metrics",
431 reason = format!("{}", err));
432 }
433 }
434 DenoPeriodicReaderMessage::ForceFlush(sender) => {
435 otel_debug!(
436 name: "DenoPeriodicReader.ForceFlushCalled",
437 message = "Flush message received.",
438 );
439 let res = collect_and_export(false).await;
440 if let Err(send_error) = sender.send(res) {
441 otel_debug!(
442 name: "DenoPeriodicReader.Flush.SendResultError",
443 message = "Failed to send flush result.",
444 reason = format!("{:?}", send_error),
445 );
446 }
447 }
448 DenoPeriodicReaderMessage::Shutdown(sender) => {
449 otel_debug!(
450 name: "DenoPeriodicReader.ShutdownCalled",
451 message = "Shutdown message received",
452 );
453 let res = collect_and_export(false).await;
454 let _ = exporter.shutdown();
455 if let Err(send_error) = sender.send(res) {
456 otel_debug!(
457 name: "DenoPeriodicReader.Shutdown.SendResultError",
458 message = "Failed to send shutdown result",
459 reason = format!("{:?}", send_error),
460 );
461 }
462 break;
463 }
464 }
465 }
466 };
467
468 (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
469 .unbounded_send(worker.boxed())
470 .expect("failed to send task to shared OpenTelemetry runtime");
471
472 DenoPeriodicReader { tx, temporality }
473 }
474}
475
476mod hyper_client {
477 use std::fmt::Debug;
478 use std::pin::Pin;
479 use std::task::Poll;
480 use std::task::{self};
481
482 use deno_tls::create_client_config;
483 use deno_tls::load_certs;
484 use deno_tls::load_private_keys;
485 use deno_tls::SocketUse;
486 use deno_tls::TlsKey;
487 use deno_tls::TlsKeys;
488 use http_body_util::BodyExt;
489 use http_body_util::Full;
490 use hyper::body::Body as HttpBody;
491 use hyper::body::Frame;
492 use hyper_rustls::HttpsConnector;
493 use hyper_util::client::legacy::connect::HttpConnector;
494 use hyper_util::client::legacy::Client;
495 use opentelemetry_http::Bytes;
496 use opentelemetry_http::HttpError;
497 use opentelemetry_http::Request;
498 use opentelemetry_http::Response;
499 use opentelemetry_http::ResponseExt;
500
501 use super::OtelSharedRuntime;
502
503 #[derive(Debug, Clone)]
505 pub struct HyperClient {
506 inner: Client<HttpsConnector<HttpConnector>, Body>,
507 }
508
509 impl HyperClient {
510 pub fn new() -> deno_core::anyhow::Result<Self> {
511 let ca_certs = match std::env::var("OTEL_EXPORTER_OTLP_CERTIFICATE") {
512 Ok(path) => vec![std::fs::read(path)?],
513 _ => vec![],
514 };
515
516 let keys = match (
517 std::env::var("OTEL_EXPORTER_OTLP_CLIENT_KEY"),
518 std::env::var("OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE"),
519 ) {
520 (Ok(key_path), Ok(cert_path)) => {
521 let key = std::fs::read(key_path)?;
522 let cert = std::fs::read(cert_path)?;
523
524 let certs = load_certs(&mut std::io::Cursor::new(cert))?;
525 let key = load_private_keys(&key)?.into_iter().next().unwrap();
526
527 TlsKeys::Static(TlsKey(certs, key))
528 }
529 _ => TlsKeys::Null,
530 };
531
532 let tls_config =
533 create_client_config(None, ca_certs, None, keys, SocketUse::Http)?;
534 let mut http_connector = HttpConnector::new();
535 http_connector.enforce_http(false);
536 let connector = HttpsConnector::from((http_connector, tls_config));
537
538 Ok(Self {
539 inner: Client::builder(OtelSharedRuntime).build(connector),
540 })
541 }
542 }
543
544 #[async_trait::async_trait]
545 impl opentelemetry_http::HttpClient for HyperClient {
546 async fn send(
547 &self,
548 request: Request<Vec<u8>>,
549 ) -> Result<Response<Bytes>, HttpError> {
550 let (parts, body) = request.into_parts();
551 let request = Request::from_parts(parts, Body(Full::from(body)));
552 let mut response = self.inner.request(request).await?;
553 let headers = std::mem::take(response.headers_mut());
554
555 let mut http_response = Response::builder()
556 .status(response.status())
557 .body(response.into_body().collect().await?.to_bytes())?;
558 *http_response.headers_mut() = headers;
559
560 Ok(http_response.error_for_status()?)
561 }
562 }
563
564 #[pin_project::pin_project]
565 pub struct Body(#[pin] Full<Bytes>);
566
567 impl HttpBody for Body {
568 type Data = Bytes;
569 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
570
571 #[inline]
572 fn poll_frame(
573 self: Pin<&mut Self>,
574 cx: &mut task::Context<'_>,
575 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
576 self.project().0.poll_frame(cx).map_err(Into::into)
577 }
578
579 #[inline]
580 fn is_end_stream(&self) -> bool {
581 self.0.is_end_stream()
582 }
583
584 #[inline]
585 fn size_hint(&self) -> hyper::body::SizeHint {
586 self.0.size_hint()
587 }
588 }
589}
590
591#[derive(Debug)]
592pub struct OtelGlobals {
593 pub span_processor: BatchSpanProcessor<OtelSharedRuntime>,
594 pub log_processor: BatchLogProcessor<OtelSharedRuntime>,
595 pub id_generator: DenoIdGenerator,
596 pub meter_provider: SdkMeterProvider,
597 pub builtin_instrumentation_scope: InstrumentationScope,
598}
599
600pub static OTEL_GLOBALS: OnceCell<OtelGlobals> = OnceCell::new();
601
602pub fn init(
603 rt_config: OtelRuntimeConfig,
604 config: &OtelConfig,
605) -> deno_core::anyhow::Result<()> {
606 let protocol = match env::var("OTEL_EXPORTER_OTLP_PROTOCOL").as_deref() {
610 Ok("http/protobuf") => Protocol::HttpBinary,
611 Ok("http/json") => Protocol::HttpJson,
612 Ok("") | Err(env::VarError::NotPresent) => Protocol::HttpBinary,
613 Ok(protocol) => {
614 return Err(deno_core::anyhow::anyhow!(
615 "Env var OTEL_EXPORTER_OTLP_PROTOCOL specifies an unsupported protocol: {}",
616 protocol
617 ));
618 }
619 Err(err) => {
620 return Err(deno_core::anyhow::anyhow!(
621 "Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}",
622 err
623 ));
624 }
625 };
626
627 let mut resource = Resource::default();
634
635 resource = resource.merge(&Resource::new(vec![
638 KeyValue::new(PROCESS_RUNTIME_NAME, rt_config.runtime_name),
639 KeyValue::new(PROCESS_RUNTIME_VERSION, rt_config.runtime_version.clone()),
640 KeyValue::new(
641 TELEMETRY_SDK_LANGUAGE,
642 format!(
643 "deno-{}",
644 resource.get(Key::new(TELEMETRY_SDK_LANGUAGE)).unwrap()
645 ),
646 ),
647 KeyValue::new(
648 TELEMETRY_SDK_NAME,
649 format!(
650 "deno-{}",
651 resource.get(Key::new(TELEMETRY_SDK_NAME)).unwrap()
652 ),
653 ),
654 KeyValue::new(
655 TELEMETRY_SDK_VERSION,
656 format!(
657 "{}-{}",
658 rt_config.runtime_version,
659 resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap()
660 ),
661 ),
662 ]));
663
664 let client = hyper_client::HyperClient::new()?;
669
670 let span_exporter = HttpExporterBuilder::default()
671 .with_http_client(client.clone())
672 .with_protocol(protocol)
673 .build_span_exporter()?;
674 let mut span_processor =
675 BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build();
676 span_processor.set_resource(&resource);
677
678 let temporality_preference =
679 env::var("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE")
680 .ok()
681 .map(|s| s.to_lowercase());
682 let temporality = match temporality_preference.as_deref() {
683 None | Some("cumulative") => Temporality::Cumulative,
684 Some("delta") => Temporality::Delta,
685 Some("lowmemory") => Temporality::LowMemory,
686 Some(other) => {
687 return Err(deno_core::anyhow::anyhow!(
688 "Invalid value for OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: {}",
689 other
690 ));
691 }
692 };
693 let metric_exporter = HttpExporterBuilder::default()
694 .with_http_client(client.clone())
695 .with_protocol(protocol)
696 .build_metrics_exporter(temporality)?;
697 let metric_reader = DenoPeriodicReader::new(metric_exporter);
698 let meter_provider = SdkMeterProvider::builder()
699 .with_reader(metric_reader)
700 .with_resource(resource.clone())
701 .build();
702
703 let log_exporter = HttpExporterBuilder::default()
704 .with_http_client(client)
705 .with_protocol(protocol)
706 .build_log_exporter()?;
707 let log_processor =
708 BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build();
709 log_processor.set_resource(&resource);
710
711 let builtin_instrumentation_scope =
712 opentelemetry::InstrumentationScope::builder("deno")
713 .with_version(rt_config.runtime_version.clone())
714 .build();
715
716 let id_generator = if config.deterministic {
717 DenoIdGenerator::deterministic()
718 } else {
719 DenoIdGenerator::random()
720 };
721
722 OTEL_GLOBALS
723 .set(OtelGlobals {
724 log_processor,
725 span_processor,
726 id_generator,
727 meter_provider,
728 builtin_instrumentation_scope,
729 })
730 .map_err(|_| deno_core::anyhow::anyhow!("failed to set otel globals"))?;
731
732 Ok(())
733}
734
735pub fn flush() {
739 if let Some(OtelGlobals {
740 span_processor: spans,
741 log_processor: logs,
742 meter_provider,
743 ..
744 }) = OTEL_GLOBALS.get()
745 {
746 let _ = spans.force_flush();
747 let _ = logs.force_flush();
748 let _ = meter_provider.force_flush();
749 }
750}
751
752pub fn handle_log(record: &log::Record) {
753 use log::Level;
754
755 let Some(OtelGlobals {
756 log_processor: logs,
757 builtin_instrumentation_scope,
758 ..
759 }) = OTEL_GLOBALS.get()
760 else {
761 return;
762 };
763
764 let mut log_record = LogRecord::default();
765
766 log_record.set_observed_timestamp(SystemTime::now());
767 log_record.set_severity_number(match record.level() {
768 Level::Error => Severity::Error,
769 Level::Warn => Severity::Warn,
770 Level::Info => Severity::Info,
771 Level::Debug => Severity::Debug,
772 Level::Trace => Severity::Trace,
773 });
774 log_record.set_severity_text(record.level().as_str());
775 log_record.set_body(record.args().to_string().into());
776 log_record.set_target(record.metadata().target().to_string());
777
778 struct Visitor<'s>(&'s mut LogRecord);
779
780 impl<'s, 'kvs> log::kv::VisitSource<'kvs> for Visitor<'s> {
781 fn visit_pair(
782 &mut self,
783 key: log::kv::Key<'kvs>,
784 value: log::kv::Value<'kvs>,
785 ) -> Result<(), log::kv::Error> {
786 #[allow(clippy::manual_map)]
787 let value = if let Some(v) = value.to_bool() {
788 Some(AnyValue::Boolean(v))
789 } else if let Some(v) = value.to_borrowed_str() {
790 Some(AnyValue::String(v.to_owned().into()))
791 } else if let Some(v) = value.to_f64() {
792 Some(AnyValue::Double(v))
793 } else if let Some(v) = value.to_i64() {
794 Some(AnyValue::Int(v))
795 } else {
796 None
797 };
798
799 if let Some(value) = value {
800 let key = Key::from(key.as_str().to_owned());
801 self.0.add_attribute(key, value);
802 }
803
804 Ok(())
805 }
806 }
807
808 let _ = record.key_values().visit(&mut Visitor(&mut log_record));
809
810 logs.emit(&mut log_record, builtin_instrumentation_scope);
811}
812
813#[derive(Debug)]
814pub enum DenoIdGenerator {
815 Random(RandomIdGenerator),
816 Deterministic {
817 next_trace_id: AtomicU64,
818 next_span_id: AtomicU64,
819 },
820}
821
822impl IdGenerator for DenoIdGenerator {
823 fn new_trace_id(&self) -> TraceId {
824 match self {
825 Self::Random(generator) => generator.new_trace_id(),
826 Self::Deterministic { next_trace_id, .. } => {
827 let id =
828 next_trace_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
829 let bytes = id.to_be_bytes();
830 let bytes = [
831 0, 0, 0, 0, 0, 0, 0, 0, bytes[0], bytes[1], bytes[2], bytes[3],
832 bytes[4], bytes[5], bytes[6], bytes[7],
833 ];
834 TraceId::from_bytes(bytes)
835 }
836 }
837 }
838
839 fn new_span_id(&self) -> SpanId {
840 match self {
841 Self::Random(generator) => generator.new_span_id(),
842 Self::Deterministic { next_span_id, .. } => {
843 let id =
844 next_span_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
845 SpanId::from_bytes(id.to_be_bytes())
846 }
847 }
848 }
849}
850
851impl DenoIdGenerator {
852 fn random() -> Self {
853 Self::Random(RandomIdGenerator::default())
854 }
855
856 fn deterministic() -> Self {
857 Self::Deterministic {
858 next_trace_id: AtomicU64::new(1),
859 next_span_id: AtomicU64::new(1),
860 }
861 }
862}
863
864fn parse_trace_id(
865 scope: &mut v8::HandleScope<'_>,
866 trace_id: v8::Local<'_, v8::Value>,
867) -> TraceId {
868 if let Ok(string) = trace_id.try_cast() {
869 let value_view = v8::ValueView::new(scope, string);
870 match value_view.data() {
871 v8::ValueViewData::OneByte(bytes) => {
872 TraceId::from_hex(&String::from_utf8_lossy(bytes))
873 .unwrap_or(TraceId::INVALID)
874 }
875
876 _ => TraceId::INVALID,
877 }
878 } else if let Ok(uint8array) = trace_id.try_cast::<v8::Uint8Array>() {
879 let data = uint8array.data();
880 let byte_length = uint8array.byte_length();
881 if byte_length != 16 {
882 return TraceId::INVALID;
883 }
884 let bytes = unsafe { &*(data as *const u8 as *const [u8; 16]) };
887 TraceId::from_bytes(*bytes)
888 } else {
889 TraceId::INVALID
890 }
891}
892
893fn parse_span_id(
894 scope: &mut v8::HandleScope<'_>,
895 span_id: v8::Local<'_, v8::Value>,
896) -> SpanId {
897 if let Ok(string) = span_id.try_cast() {
898 let value_view = v8::ValueView::new(scope, string);
899 match value_view.data() {
900 v8::ValueViewData::OneByte(bytes) => {
901 SpanId::from_hex(&String::from_utf8_lossy(bytes))
902 .unwrap_or(SpanId::INVALID)
903 }
904 _ => SpanId::INVALID,
905 }
906 } else if let Ok(uint8array) = span_id.try_cast::<v8::Uint8Array>() {
907 let data = uint8array.data();
908 let byte_length = uint8array.byte_length();
909 if byte_length != 8 {
910 return SpanId::INVALID;
911 }
912 let bytes = unsafe { &*(data as *const u8 as *const [u8; 8]) };
915 SpanId::from_bytes(*bytes)
916 } else {
917 SpanId::INVALID
918 }
919}
920
921macro_rules! attr_raw {
922 ($scope:ident, $name:expr, $value:expr) => {{
923 let name = if let Ok(name) = $name.try_cast() {
924 let view = v8::ValueView::new($scope, name);
925 match view.data() {
926 v8::ValueViewData::OneByte(bytes) => {
927 Some(String::from_utf8_lossy(bytes).into_owned())
928 }
929 v8::ValueViewData::TwoByte(bytes) => {
930 Some(String::from_utf16_lossy(bytes))
931 }
932 }
933 } else {
934 None
935 };
936 let value = if let Ok(string) = $value.try_cast::<v8::String>() {
937 Some(Value::String(StringValue::from({
938 let x = v8::ValueView::new($scope, string);
939 match x.data() {
940 v8::ValueViewData::OneByte(bytes) => {
941 String::from_utf8_lossy(bytes).into_owned()
942 }
943 v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
944 }
945 })))
946 } else if let Ok(number) = $value.try_cast::<v8::Number>() {
947 Some(Value::F64(number.value()))
948 } else if let Ok(boolean) = $value.try_cast::<v8::Boolean>() {
949 Some(Value::Bool(boolean.is_true()))
950 } else if let Ok(bigint) = $value.try_cast::<v8::BigInt>() {
951 let (i64_value, _lossless) = bigint.i64_value();
952 Some(Value::I64(i64_value))
953 } else if let Ok(_array) = $value.try_cast::<v8::Array>() {
954 None
956 } else {
957 None
958 };
959 if let (Some(name), Some(value)) = (name, value) {
960 Some(KeyValue::new(name, value))
961 } else {
962 None
963 }
964 }};
965}
966
967macro_rules! attr {
968 ($scope:ident, $attributes:expr $(=> $dropped_attributes_count:expr)?, $name:expr, $value:expr) => {
969 let attr = attr_raw!($scope, $name, $value);
970 if let Some(kv) = attr {
971 $attributes.push(kv);
972 }
973 $(
974 else {
975 $dropped_attributes_count += 1;
976 }
977 )?
978 };
979}
980
981#[op2(fast)]
982fn op_otel_log<'s>(
983 scope: &mut v8::HandleScope<'s>,
984 message: v8::Local<'s, v8::Value>,
985 #[smi] level: i32,
986 span: v8::Local<'s, v8::Value>,
987) {
988 let Some(OtelGlobals {
989 log_processor,
990 builtin_instrumentation_scope,
991 ..
992 }) = OTEL_GLOBALS.get()
993 else {
994 return;
995 };
996
997 let severity = match level {
1000 ..=0 => Severity::Debug,
1001 1 => Severity::Info,
1002 2 => Severity::Warn,
1003 3.. => Severity::Error,
1004 };
1005
1006 let mut log_record = LogRecord::default();
1007 log_record.set_observed_timestamp(SystemTime::now());
1008 let Ok(message) = message.try_cast() else {
1009 return;
1010 };
1011 log_record.set_body(owned_string(scope, message).into());
1012 log_record.set_severity_number(severity);
1013 log_record.set_severity_text(severity.name());
1014 if let Some(span) =
1015 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1016 {
1017 let state = span.0.borrow();
1018 match &**state {
1019 OtelSpanState::Recording(span) => {
1020 log_record.set_trace_context(
1021 span.span_context.trace_id(),
1022 span.span_context.span_id(),
1023 Some(span.span_context.trace_flags()),
1024 );
1025 }
1026 OtelSpanState::Done(span_context) => {
1027 log_record.set_trace_context(
1028 span_context.trace_id(),
1029 span_context.span_id(),
1030 Some(span_context.trace_flags()),
1031 );
1032 }
1033 }
1034 }
1035
1036 log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1037}
1038
1039#[op2(fast)]
1040fn op_otel_log_foreign(
1041 scope: &mut v8::HandleScope<'_>,
1042 #[string] message: String,
1043 #[smi] level: i32,
1044 trace_id: v8::Local<'_, v8::Value>,
1045 span_id: v8::Local<'_, v8::Value>,
1046 #[smi] trace_flags: u8,
1047) {
1048 let Some(OtelGlobals {
1049 log_processor,
1050 builtin_instrumentation_scope,
1051 ..
1052 }) = OTEL_GLOBALS.get()
1053 else {
1054 return;
1055 };
1056
1057 let severity = match level {
1060 ..=0 => Severity::Debug,
1061 1 => Severity::Info,
1062 2 => Severity::Warn,
1063 3.. => Severity::Error,
1064 };
1065
1066 let trace_id = parse_trace_id(scope, trace_id);
1067 let span_id = parse_span_id(scope, span_id);
1068
1069 let mut log_record = LogRecord::default();
1070
1071 log_record.set_observed_timestamp(SystemTime::now());
1072 log_record.set_body(message.into());
1073 log_record.set_severity_number(severity);
1074 log_record.set_severity_text(severity.name());
1075 if trace_id != TraceId::INVALID && span_id != SpanId::INVALID {
1076 log_record.set_trace_context(
1077 trace_id,
1078 span_id,
1079 Some(TraceFlags::new(trace_flags)),
1080 );
1081 }
1082
1083 log_processor.emit(&mut log_record, builtin_instrumentation_scope);
1084}
1085
1086fn owned_string<'s>(
1087 scope: &mut v8::HandleScope<'s>,
1088 string: v8::Local<'s, v8::String>,
1089) -> String {
1090 let x = v8::ValueView::new(scope, string);
1091 match x.data() {
1092 v8::ValueViewData::OneByte(bytes) => {
1093 String::from_utf8_lossy(bytes).into_owned()
1094 }
1095 v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
1096 }
1097}
1098
1099struct OtelTracer(InstrumentationScope);
1100
1101impl deno_core::GarbageCollected for OtelTracer {}
1102
1103#[op2]
1104impl OtelTracer {
1105 #[constructor]
1106 #[cppgc]
1107 fn new(
1108 #[string] name: String,
1109 #[string] version: Option<String>,
1110 #[string] schema_url: Option<String>,
1111 ) -> OtelTracer {
1112 let mut builder = opentelemetry::InstrumentationScope::builder(name);
1113 if let Some(version) = version {
1114 builder = builder.with_version(version);
1115 }
1116 if let Some(schema_url) = schema_url {
1117 builder = builder.with_schema_url(schema_url);
1118 }
1119 let scope = builder.build();
1120 OtelTracer(scope)
1121 }
1122
1123 #[static_method]
1124 #[cppgc]
1125 fn builtin() -> OtelTracer {
1126 let OtelGlobals {
1127 builtin_instrumentation_scope,
1128 ..
1129 } = OTEL_GLOBALS.get().unwrap();
1130 OtelTracer(builtin_instrumentation_scope.clone())
1131 }
1132
1133 #[cppgc]
1134 fn start_span<'s>(
1135 &self,
1136 scope: &mut v8::HandleScope<'s>,
1137 #[cppgc] parent: Option<&OtelSpan>,
1138 name: v8::Local<'s, v8::Value>,
1139 #[smi] span_kind: u8,
1140 start_time: Option<f64>,
1141 #[smi] attribute_count: usize,
1142 ) -> Result<OtelSpan, JsErrorBox> {
1143 let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
1144 let span_context;
1145 let parent_span_id;
1146 match parent {
1147 Some(parent) => {
1148 let parent = parent.0.borrow();
1149 let parent_span_context = match &**parent {
1150 OtelSpanState::Recording(span) => &span.span_context,
1151 OtelSpanState::Done(span_context) => span_context,
1152 };
1153 span_context = SpanContext::new(
1154 parent_span_context.trace_id(),
1155 id_generator.new_span_id(),
1156 TraceFlags::SAMPLED,
1157 false,
1158 parent_span_context.trace_state().clone(),
1159 );
1160 parent_span_id = parent_span_context.span_id();
1161 }
1162 None => {
1163 span_context = SpanContext::new(
1164 id_generator.new_trace_id(),
1165 id_generator.new_span_id(),
1166 TraceFlags::SAMPLED,
1167 false,
1168 TraceState::NONE,
1169 );
1170 parent_span_id = SpanId::INVALID;
1171 }
1172 }
1173 let name = owned_string(
1174 scope,
1175 name
1176 .try_cast()
1177 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1178 );
1179 let span_kind = match span_kind {
1180 0 => SpanKind::Internal,
1181 1 => SpanKind::Server,
1182 2 => SpanKind::Client,
1183 3 => SpanKind::Producer,
1184 4 => SpanKind::Consumer,
1185 _ => return Err(JsErrorBox::generic("invalid span kind")),
1186 };
1187 let start_time = start_time
1188 .map(|start_time| {
1189 SystemTime::UNIX_EPOCH
1190 .checked_add(std::time::Duration::from_secs_f64(start_time / 1000.0))
1191 .ok_or_else(|| JsErrorBox::generic("invalid start time"))
1192 })
1193 .unwrap_or_else(|| Ok(SystemTime::now()))?;
1194 let span_data = SpanData {
1195 span_context,
1196 parent_span_id,
1197 span_kind,
1198 name: Cow::Owned(name),
1199 start_time,
1200 end_time: SystemTime::UNIX_EPOCH,
1201 attributes: Vec::with_capacity(attribute_count),
1202 dropped_attributes_count: 0,
1203 status: SpanStatus::Unset,
1204 events: SpanEvents::default(),
1205 links: SpanLinks::default(),
1206 instrumentation_scope: self.0.clone(),
1207 };
1208 Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
1209 span_data,
1210 )))))
1211 }
1212
1213 #[cppgc]
1214 fn start_span_foreign<'s>(
1215 &self,
1216 scope: &mut v8::HandleScope<'s>,
1217 parent_trace_id: v8::Local<'s, v8::Value>,
1218 parent_span_id: v8::Local<'s, v8::Value>,
1219 name: v8::Local<'s, v8::Value>,
1220 #[smi] span_kind: u8,
1221 start_time: Option<f64>,
1222 #[smi] attribute_count: usize,
1223 ) -> Result<OtelSpan, JsErrorBox> {
1224 let parent_trace_id = parse_trace_id(scope, parent_trace_id);
1225 if parent_trace_id == TraceId::INVALID {
1226 return Err(JsErrorBox::generic("invalid trace id"));
1227 };
1228 let parent_span_id = parse_span_id(scope, parent_span_id);
1229 if parent_span_id == SpanId::INVALID {
1230 return Err(JsErrorBox::generic("invalid span id"));
1231 };
1232 let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
1233 let span_context = SpanContext::new(
1234 parent_trace_id,
1235 id_generator.new_span_id(),
1236 TraceFlags::SAMPLED,
1237 false,
1238 TraceState::NONE,
1239 );
1240 let name = owned_string(
1241 scope,
1242 name
1243 .try_cast()
1244 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1245 );
1246 let span_kind = match span_kind {
1247 0 => SpanKind::Internal,
1248 1 => SpanKind::Server,
1249 2 => SpanKind::Client,
1250 3 => SpanKind::Producer,
1251 4 => SpanKind::Consumer,
1252 _ => return Err(JsErrorBox::generic("invalid span kind")),
1253 };
1254 let start_time = start_time
1255 .map(|start_time| {
1256 SystemTime::UNIX_EPOCH
1257 .checked_add(std::time::Duration::from_secs_f64(start_time / 1000.0))
1258 .ok_or_else(|| JsErrorBox::generic("invalid start time"))
1259 })
1260 .unwrap_or_else(|| Ok(SystemTime::now()))?;
1261 let span_data = SpanData {
1262 span_context,
1263 parent_span_id,
1264 span_kind,
1265 name: Cow::Owned(name),
1266 start_time,
1267 end_time: SystemTime::UNIX_EPOCH,
1268 attributes: Vec::with_capacity(attribute_count),
1269 dropped_attributes_count: 0,
1270 status: SpanStatus::Unset,
1271 events: SpanEvents::default(),
1272 links: SpanLinks::default(),
1273 instrumentation_scope: self.0.clone(),
1274 };
1275 Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
1276 span_data,
1277 )))))
1278 }
1279}
1280
1281#[derive(Serialize)]
1282#[serde(rename_all = "camelCase")]
1283struct JsSpanContext {
1284 trace_id: Box<str>,
1285 span_id: Box<str>,
1286 trace_flags: u8,
1287}
1288
1289#[derive(Debug, Error, JsError)]
1290#[error("OtelSpan cannot be constructed.")]
1291#[class(type)]
1292struct OtelSpanCannotBeConstructedError;
1293
1294#[derive(Debug, Error, JsError)]
1295#[error("invalid span status code")]
1296#[class(type)]
1297struct InvalidSpanStatusCodeError;
1298
1299#[derive(Debug)]
1301struct OtelSpan(RefCell<Box<OtelSpanState>>);
1302
1303#[derive(Debug)]
1304#[allow(clippy::large_enum_variant)]
1305enum OtelSpanState {
1306 Recording(SpanData),
1307 Done(SpanContext),
1308}
1309
1310impl deno_core::GarbageCollected for OtelSpan {}
1311
1312#[op2]
1313impl OtelSpan {
1314 #[constructor]
1315 #[cppgc]
1316 fn new() -> Result<OtelSpan, OtelSpanCannotBeConstructedError> {
1317 Err(OtelSpanCannotBeConstructedError)
1318 }
1319
1320 #[serde]
1321 fn span_context(&self) -> JsSpanContext {
1322 let state = self.0.borrow();
1323 let span_context = match &**state {
1324 OtelSpanState::Recording(span) => &span.span_context,
1325 OtelSpanState::Done(span_context) => span_context,
1326 };
1327 JsSpanContext {
1328 trace_id: format!("{:?}", span_context.trace_id()).into(),
1329 span_id: format!("{:?}", span_context.span_id()).into(),
1330 trace_flags: span_context.trace_flags().to_u8(),
1331 }
1332 }
1333
1334 #[fast]
1335 fn set_status<'s>(
1336 &self,
1337 #[smi] status: u8,
1338 #[string] error_description: String,
1339 ) -> Result<(), InvalidSpanStatusCodeError> {
1340 let mut state = self.0.borrow_mut();
1341 let OtelSpanState::Recording(span) = &mut **state else {
1342 return Ok(());
1343 };
1344 span.status = match status {
1345 0 => SpanStatus::Unset,
1346 1 => SpanStatus::Ok,
1347 2 => SpanStatus::Error {
1348 description: Cow::Owned(error_description),
1349 },
1350 _ => return Err(InvalidSpanStatusCodeError),
1351 };
1352 Ok(())
1353 }
1354
1355 #[fast]
1356 fn drop_event(&self) {
1357 let mut state = self.0.borrow_mut();
1358 match &mut **state {
1359 OtelSpanState::Recording(span) => {
1360 span.events.dropped_count += 1;
1361 }
1362 OtelSpanState::Done(_) => {}
1363 }
1364 }
1365
1366 #[fast]
1367 fn end(&self, end_time: f64) {
1368 let end_time = if end_time.is_nan() {
1369 SystemTime::now()
1370 } else {
1371 SystemTime::UNIX_EPOCH
1372 .checked_add(Duration::from_secs_f64(end_time / 1000.0))
1373 .unwrap()
1374 };
1375
1376 let mut state = self.0.borrow_mut();
1377 if let OtelSpanState::Recording(span) = &mut **state {
1378 let span_context = span.span_context.clone();
1379 if let OtelSpanState::Recording(mut span) = *std::mem::replace(
1380 &mut *state,
1381 Box::new(OtelSpanState::Done(span_context)),
1382 ) {
1383 span.end_time = end_time;
1384 let Some(OtelGlobals { span_processor, .. }) = OTEL_GLOBALS.get()
1385 else {
1386 return;
1387 };
1388 span_processor.on_end(span);
1389 }
1390 }
1391 }
1392}
1393
1394#[op2(fast)]
1395fn op_otel_span_attribute1<'s>(
1396 scope: &mut v8::HandleScope<'s>,
1397 span: v8::Local<'_, v8::Value>,
1398 key: v8::Local<'s, v8::Value>,
1399 value: v8::Local<'s, v8::Value>,
1400) {
1401 let Some(span) =
1402 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1403 else {
1404 return;
1405 };
1406 let mut state = span.0.borrow_mut();
1407 if let OtelSpanState::Recording(span) = &mut **state {
1408 attr!(scope, span.attributes => span.dropped_attributes_count, key, value);
1409 }
1410}
1411
1412#[op2(fast)]
1413fn op_otel_span_attribute2<'s>(
1414 scope: &mut v8::HandleScope<'s>,
1415 span: v8::Local<'_, v8::Value>,
1416 key1: v8::Local<'s, v8::Value>,
1417 value1: v8::Local<'s, v8::Value>,
1418 key2: v8::Local<'s, v8::Value>,
1419 value2: v8::Local<'s, v8::Value>,
1420) {
1421 let Some(span) =
1422 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1423 else {
1424 return;
1425 };
1426 let mut state = span.0.borrow_mut();
1427 if let OtelSpanState::Recording(span) = &mut **state {
1428 attr!(scope, span.attributes => span.dropped_attributes_count, key1, value1);
1429 attr!(scope, span.attributes => span.dropped_attributes_count, key2, value2);
1430 }
1431}
1432
1433#[allow(clippy::too_many_arguments)]
1434#[op2(fast)]
1435fn op_otel_span_attribute3<'s>(
1436 scope: &mut v8::HandleScope<'s>,
1437 span: v8::Local<'_, v8::Value>,
1438 key1: v8::Local<'s, v8::Value>,
1439 value1: v8::Local<'s, v8::Value>,
1440 key2: v8::Local<'s, v8::Value>,
1441 value2: v8::Local<'s, v8::Value>,
1442 key3: v8::Local<'s, v8::Value>,
1443 value3: v8::Local<'s, v8::Value>,
1444) {
1445 let Some(span) =
1446 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1447 else {
1448 return;
1449 };
1450 let mut state = span.0.borrow_mut();
1451 if let OtelSpanState::Recording(span) = &mut **state {
1452 attr!(scope, span.attributes => span.dropped_attributes_count, key1, value1);
1453 attr!(scope, span.attributes => span.dropped_attributes_count, key2, value2);
1454 attr!(scope, span.attributes => span.dropped_attributes_count, key3, value3);
1455 }
1456}
1457
1458#[op2(fast)]
1459fn op_otel_span_update_name<'s>(
1460 scope: &mut v8::HandleScope<'s>,
1461 span: v8::Local<'s, v8::Value>,
1462 name: v8::Local<'s, v8::Value>,
1463) {
1464 let Ok(name) = name.try_cast() else {
1465 return;
1466 };
1467 let name = owned_string(scope, name);
1468 let Some(span) =
1469 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1470 else {
1471 return;
1472 };
1473 let mut state = span.0.borrow_mut();
1474 if let OtelSpanState::Recording(span) = &mut **state {
1475 span.name = Cow::Owned(name)
1476 }
1477}
1478
1479#[op2(fast)]
1480fn op_otel_span_add_link<'s>(
1481 scope: &mut v8::HandleScope<'s>,
1482 span: v8::Local<'s, v8::Value>,
1483 trace_id: v8::Local<'s, v8::Value>,
1484 span_id: v8::Local<'s, v8::Value>,
1485 #[smi] trace_flags: u8,
1486 is_remote: bool,
1487 #[smi] dropped_attributes_count: u32,
1488) -> bool {
1489 let trace_id = parse_trace_id(scope, trace_id);
1490 if trace_id == TraceId::INVALID {
1491 return false;
1492 };
1493 let span_id = parse_span_id(scope, span_id);
1494 if span_id == SpanId::INVALID {
1495 return false;
1496 };
1497 let span_context = SpanContext::new(
1498 trace_id,
1499 span_id,
1500 TraceFlags::new(trace_flags),
1501 is_remote,
1502 TraceState::NONE,
1503 );
1504
1505 let Some(span) =
1506 deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
1507 else {
1508 return true;
1509 };
1510 let mut state = span.0.borrow_mut();
1511 if let OtelSpanState::Recording(span) = &mut **state {
1512 span.links.links.push(Link::new(
1513 span_context,
1514 vec![],
1515 dropped_attributes_count,
1516 ));
1517 }
1518 true
1519}
1520
1521struct OtelMeter(opentelemetry::metrics::Meter);
1522
1523impl deno_core::GarbageCollected for OtelMeter {}
1524
1525#[op2]
1526impl OtelMeter {
1527 #[constructor]
1528 #[cppgc]
1529 fn new(
1530 #[string] name: String,
1531 #[string] version: Option<String>,
1532 #[string] schema_url: Option<String>,
1533 ) -> OtelMeter {
1534 let mut builder = opentelemetry::InstrumentationScope::builder(name);
1535 if let Some(version) = version {
1536 builder = builder.with_version(version);
1537 }
1538 if let Some(schema_url) = schema_url {
1539 builder = builder.with_schema_url(schema_url);
1540 }
1541 let scope = builder.build();
1542 let meter = OTEL_GLOBALS
1543 .get()
1544 .unwrap()
1545 .meter_provider
1546 .meter_with_scope(scope);
1547 OtelMeter(meter)
1548 }
1549
1550 #[cppgc]
1551 fn create_counter<'s>(
1552 &self,
1553 scope: &mut v8::HandleScope<'s>,
1554 name: v8::Local<'s, v8::Value>,
1555 description: v8::Local<'s, v8::Value>,
1556 unit: v8::Local<'s, v8::Value>,
1557 ) -> Result<Instrument, JsErrorBox> {
1558 create_instrument(
1559 |name| self.0.f64_counter(name),
1560 |i| Instrument::Counter(i.build()),
1561 scope,
1562 name,
1563 description,
1564 unit,
1565 )
1566 .map_err(|e| JsErrorBox::generic(e.to_string()))
1567 }
1568
1569 #[cppgc]
1570 fn create_up_down_counter<'s>(
1571 &self,
1572 scope: &mut v8::HandleScope<'s>,
1573 name: v8::Local<'s, v8::Value>,
1574 description: v8::Local<'s, v8::Value>,
1575 unit: v8::Local<'s, v8::Value>,
1576 ) -> Result<Instrument, JsErrorBox> {
1577 create_instrument(
1578 |name| self.0.f64_up_down_counter(name),
1579 |i| Instrument::UpDownCounter(i.build()),
1580 scope,
1581 name,
1582 description,
1583 unit,
1584 )
1585 .map_err(|e| JsErrorBox::generic(e.to_string()))
1586 }
1587
1588 #[cppgc]
1589 fn create_gauge<'s>(
1590 &self,
1591 scope: &mut v8::HandleScope<'s>,
1592 name: v8::Local<'s, v8::Value>,
1593 description: v8::Local<'s, v8::Value>,
1594 unit: v8::Local<'s, v8::Value>,
1595 ) -> Result<Instrument, JsErrorBox> {
1596 create_instrument(
1597 |name| self.0.f64_gauge(name),
1598 |i| Instrument::Gauge(i.build()),
1599 scope,
1600 name,
1601 description,
1602 unit,
1603 )
1604 .map_err(|e| JsErrorBox::generic(e.to_string()))
1605 }
1606
1607 #[cppgc]
1608 fn create_histogram<'s>(
1609 &self,
1610 scope: &mut v8::HandleScope<'s>,
1611 name: v8::Local<'s, v8::Value>,
1612 description: v8::Local<'s, v8::Value>,
1613 unit: v8::Local<'s, v8::Value>,
1614 #[serde] boundaries: Option<Vec<f64>>,
1615 ) -> Result<Instrument, JsErrorBox> {
1616 let name = owned_string(
1617 scope,
1618 name
1619 .try_cast()
1620 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1621 );
1622 let mut builder = self.0.f64_histogram(name);
1623 if !description.is_null_or_undefined() {
1624 let description = owned_string(
1625 scope,
1626 description
1627 .try_cast()
1628 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1629 );
1630 builder = builder.with_description(description);
1631 };
1632 if !unit.is_null_or_undefined() {
1633 let unit = owned_string(
1634 scope,
1635 unit
1636 .try_cast()
1637 .map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
1638 );
1639 builder = builder.with_unit(unit);
1640 };
1641 if let Some(boundaries) = boundaries {
1642 builder = builder.with_boundaries(boundaries);
1643 }
1644
1645 Ok(Instrument::Histogram(builder.build()))
1646 }
1647
1648 #[cppgc]
1649 fn create_observable_counter<'s>(
1650 &self,
1651 scope: &mut v8::HandleScope<'s>,
1652 name: v8::Local<'s, v8::Value>,
1653 description: v8::Local<'s, v8::Value>,
1654 unit: v8::Local<'s, v8::Value>,
1655 ) -> Result<Instrument, JsErrorBox> {
1656 create_async_instrument(
1657 |name| self.0.f64_observable_counter(name),
1658 |i| {
1659 i.build();
1660 },
1661 scope,
1662 name,
1663 description,
1664 unit,
1665 )
1666 .map_err(|e| JsErrorBox::generic(e.to_string()))
1667 }
1668
1669 #[cppgc]
1670 fn create_observable_up_down_counter<'s>(
1671 &self,
1672 scope: &mut v8::HandleScope<'s>,
1673 name: v8::Local<'s, v8::Value>,
1674 description: v8::Local<'s, v8::Value>,
1675 unit: v8::Local<'s, v8::Value>,
1676 ) -> Result<Instrument, JsErrorBox> {
1677 create_async_instrument(
1678 |name| self.0.f64_observable_up_down_counter(name),
1679 |i| {
1680 i.build();
1681 },
1682 scope,
1683 name,
1684 description,
1685 unit,
1686 )
1687 .map_err(|e| JsErrorBox::generic(e.to_string()))
1688 }
1689
1690 #[cppgc]
1691 fn create_observable_gauge<'s>(
1692 &self,
1693 scope: &mut v8::HandleScope<'s>,
1694 name: v8::Local<'s, v8::Value>,
1695 description: v8::Local<'s, v8::Value>,
1696 unit: v8::Local<'s, v8::Value>,
1697 ) -> Result<Instrument, JsErrorBox> {
1698 create_async_instrument(
1699 |name| self.0.f64_observable_gauge(name),
1700 |i| {
1701 i.build();
1702 },
1703 scope,
1704 name,
1705 description,
1706 unit,
1707 )
1708 .map_err(|e| JsErrorBox::generic(e.to_string()))
1709 }
1710}
1711
1712enum Instrument {
1713 Counter(opentelemetry::metrics::Counter<f64>),
1714 UpDownCounter(UpDownCounter<f64>),
1715 Gauge(opentelemetry::metrics::Gauge<f64>),
1716 Histogram(Histogram<f64>),
1717 Observable(Arc<Mutex<HashMap<Vec<KeyValue>, f64>>>),
1718}
1719
1720impl GarbageCollected for Instrument {}
1721
1722fn create_instrument<'a, 'b, T>(
1723 cb: impl FnOnce(String) -> InstrumentBuilder<'b, T>,
1724 cb2: impl FnOnce(InstrumentBuilder<'b, T>) -> Instrument,
1725 scope: &mut v8::HandleScope<'a>,
1726 name: v8::Local<'a, v8::Value>,
1727 description: v8::Local<'a, v8::Value>,
1728 unit: v8::Local<'a, v8::Value>,
1729) -> Result<Instrument, v8::DataError> {
1730 let name = owned_string(scope, name.try_cast()?);
1731 let mut builder = cb(name);
1732 if !description.is_null_or_undefined() {
1733 let description = owned_string(scope, description.try_cast()?);
1734 builder = builder.with_description(description);
1735 };
1736 if !unit.is_null_or_undefined() {
1737 let unit = owned_string(scope, unit.try_cast()?);
1738 builder = builder.with_unit(unit);
1739 };
1740
1741 Ok(cb2(builder))
1742}
1743
1744fn create_async_instrument<'a, 'b, T>(
1745 cb: impl FnOnce(String) -> AsyncInstrumentBuilder<'b, T, f64>,
1746 cb2: impl FnOnce(AsyncInstrumentBuilder<'b, T, f64>),
1747 scope: &mut v8::HandleScope<'a>,
1748 name: v8::Local<'a, v8::Value>,
1749 description: v8::Local<'a, v8::Value>,
1750 unit: v8::Local<'a, v8::Value>,
1751) -> Result<Instrument, DataError> {
1752 let name = owned_string(scope, name.try_cast()?);
1753 let mut builder = cb(name);
1754 if !description.is_null_or_undefined() {
1755 let description = owned_string(scope, description.try_cast()?);
1756 builder = builder.with_description(description);
1757 };
1758 if !unit.is_null_or_undefined() {
1759 let unit = owned_string(scope, unit.try_cast()?);
1760 builder = builder.with_unit(unit);
1761 };
1762
1763 let data_share = Arc::new(Mutex::new(HashMap::new()));
1764 let data_share_: Arc<Mutex<HashMap<Vec<KeyValue>, f64>>> = data_share.clone();
1765 builder = builder.with_callback(move |i| {
1766 let data = {
1767 let mut data = data_share_.lock().unwrap();
1768 std::mem::take(&mut *data)
1769 };
1770 for (attributes, value) in data {
1771 i.observe(value, &attributes);
1772 }
1773 });
1774 cb2(builder);
1775
1776 Ok(Instrument::Observable(data_share))
1777}
1778
1779struct MetricAttributes {
1780 attributes: Vec<KeyValue>,
1781}
1782
1783#[op2(fast)]
1784fn op_otel_metric_record0(
1785 state: &mut OpState,
1786 #[cppgc] instrument: &Instrument,
1787 value: f64,
1788) {
1789 let values = state.try_take::<MetricAttributes>();
1790 let attributes = match &values {
1791 Some(values) => &*values.attributes,
1792 None => &[],
1793 };
1794 match instrument {
1795 Instrument::Counter(counter) => counter.add(value, attributes),
1796 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
1797 Instrument::Gauge(gauge) => gauge.record(value, attributes),
1798 Instrument::Histogram(histogram) => histogram.record(value, attributes),
1799 _ => {}
1800 }
1801}
1802
1803#[op2(fast)]
1804fn op_otel_metric_record1(
1805 state: &mut OpState,
1806 scope: &mut v8::HandleScope<'_>,
1807 instrument: v8::Local<'_, v8::Value>,
1808 value: f64,
1809 key1: v8::Local<'_, v8::Value>,
1810 value1: v8::Local<'_, v8::Value>,
1811) {
1812 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
1813 &mut *scope,
1814 instrument,
1815 ) else {
1816 return;
1817 };
1818 let mut values = state.try_take::<MetricAttributes>();
1819 let attr1 = attr_raw!(scope, key1, value1);
1820 let attributes = match &mut values {
1821 Some(values) => {
1822 if let Some(kv) = attr1 {
1823 values.attributes.reserve_exact(1);
1824 values.attributes.push(kv);
1825 }
1826 &*values.attributes
1827 }
1828 None => match attr1 {
1829 Some(kv1) => &[kv1] as &[KeyValue],
1830 None => &[],
1831 },
1832 };
1833 match &*instrument {
1834 Instrument::Counter(counter) => counter.add(value, attributes),
1835 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
1836 Instrument::Gauge(gauge) => gauge.record(value, attributes),
1837 Instrument::Histogram(histogram) => histogram.record(value, attributes),
1838 _ => {}
1839 }
1840}
1841
1842#[allow(clippy::too_many_arguments)]
1843#[op2(fast)]
1844fn op_otel_metric_record2(
1845 state: &mut OpState,
1846 scope: &mut v8::HandleScope<'_>,
1847 instrument: v8::Local<'_, v8::Value>,
1848 value: f64,
1849 key1: v8::Local<'_, v8::Value>,
1850 value1: v8::Local<'_, v8::Value>,
1851 key2: v8::Local<'_, v8::Value>,
1852 value2: v8::Local<'_, v8::Value>,
1853) {
1854 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
1855 &mut *scope,
1856 instrument,
1857 ) else {
1858 return;
1859 };
1860 let mut values = state.try_take::<MetricAttributes>();
1861 let attr1 = attr_raw!(scope, key1, value1);
1862 let attr2 = attr_raw!(scope, key2, value2);
1863 let attributes = match &mut values {
1864 Some(values) => {
1865 values.attributes.reserve_exact(2);
1866 if let Some(kv1) = attr1 {
1867 values.attributes.push(kv1);
1868 }
1869 if let Some(kv2) = attr2 {
1870 values.attributes.push(kv2);
1871 }
1872 &*values.attributes
1873 }
1874 None => match (attr1, attr2) {
1875 (Some(kv1), Some(kv2)) => &[kv1, kv2] as &[KeyValue],
1876 (Some(kv1), None) => &[kv1],
1877 (None, Some(kv2)) => &[kv2],
1878 (None, None) => &[],
1879 },
1880 };
1881 match &*instrument {
1882 Instrument::Counter(counter) => counter.add(value, attributes),
1883 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
1884 Instrument::Gauge(gauge) => gauge.record(value, attributes),
1885 Instrument::Histogram(histogram) => histogram.record(value, attributes),
1886 _ => {}
1887 }
1888}
1889
1890#[allow(clippy::too_many_arguments)]
1891#[op2(fast)]
1892fn op_otel_metric_record3(
1893 state: &mut OpState,
1894 scope: &mut v8::HandleScope<'_>,
1895 instrument: v8::Local<'_, v8::Value>,
1896 value: f64,
1897 key1: v8::Local<'_, v8::Value>,
1898 value1: v8::Local<'_, v8::Value>,
1899 key2: v8::Local<'_, v8::Value>,
1900 value2: v8::Local<'_, v8::Value>,
1901 key3: v8::Local<'_, v8::Value>,
1902 value3: v8::Local<'_, v8::Value>,
1903) {
1904 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
1905 &mut *scope,
1906 instrument,
1907 ) else {
1908 return;
1909 };
1910 let mut values = state.try_take::<MetricAttributes>();
1911 let attr1 = attr_raw!(scope, key1, value1);
1912 let attr2 = attr_raw!(scope, key2, value2);
1913 let attr3 = attr_raw!(scope, key3, value3);
1914 let attributes = match &mut values {
1915 Some(values) => {
1916 values.attributes.reserve_exact(3);
1917 if let Some(kv1) = attr1 {
1918 values.attributes.push(kv1);
1919 }
1920 if let Some(kv2) = attr2 {
1921 values.attributes.push(kv2);
1922 }
1923 if let Some(kv3) = attr3 {
1924 values.attributes.push(kv3);
1925 }
1926 &*values.attributes
1927 }
1928 None => match (attr1, attr2, attr3) {
1929 (Some(kv1), Some(kv2), Some(kv3)) => &[kv1, kv2, kv3] as &[KeyValue],
1930 (Some(kv1), Some(kv2), None) => &[kv1, kv2],
1931 (Some(kv1), None, Some(kv3)) => &[kv1, kv3],
1932 (None, Some(kv2), Some(kv3)) => &[kv2, kv3],
1933 (Some(kv1), None, None) => &[kv1],
1934 (None, Some(kv2), None) => &[kv2],
1935 (None, None, Some(kv3)) => &[kv3],
1936 (None, None, None) => &[],
1937 },
1938 };
1939 match &*instrument {
1940 Instrument::Counter(counter) => counter.add(value, attributes),
1941 Instrument::UpDownCounter(counter) => counter.add(value, attributes),
1942 Instrument::Gauge(gauge) => gauge.record(value, attributes),
1943 Instrument::Histogram(histogram) => histogram.record(value, attributes),
1944 _ => {}
1945 }
1946}
1947
1948#[op2(fast)]
1949fn op_otel_metric_observable_record0(
1950 state: &mut OpState,
1951 #[cppgc] instrument: &Instrument,
1952 value: f64,
1953) {
1954 let values = state.try_take::<MetricAttributes>();
1955 let attributes = values.map(|attr| attr.attributes).unwrap_or_default();
1956 if let Instrument::Observable(data_share) = instrument {
1957 let mut data = data_share.lock().unwrap();
1958 data.insert(attributes, value);
1959 }
1960}
1961
1962#[op2(fast)]
1963fn op_otel_metric_observable_record1(
1964 state: &mut OpState,
1965 scope: &mut v8::HandleScope<'_>,
1966 instrument: v8::Local<'_, v8::Value>,
1967 value: f64,
1968 key1: v8::Local<'_, v8::Value>,
1969 value1: v8::Local<'_, v8::Value>,
1970) {
1971 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
1972 &mut *scope,
1973 instrument,
1974 ) else {
1975 return;
1976 };
1977 let values = state.try_take::<MetricAttributes>();
1978 let attr1 = attr_raw!(scope, key1, value1);
1979 let mut attributes = values
1980 .map(|mut attr| {
1981 attr.attributes.reserve_exact(1);
1982 attr.attributes
1983 })
1984 .unwrap_or_else(|| Vec::with_capacity(1));
1985 if let Some(kv1) = attr1 {
1986 attributes.push(kv1);
1987 }
1988 if let Instrument::Observable(data_share) = &*instrument {
1989 let mut data = data_share.lock().unwrap();
1990 data.insert(attributes, value);
1991 }
1992}
1993
1994#[allow(clippy::too_many_arguments)]
1995#[op2(fast)]
1996fn op_otel_metric_observable_record2(
1997 state: &mut OpState,
1998 scope: &mut v8::HandleScope<'_>,
1999 instrument: v8::Local<'_, v8::Value>,
2000 value: f64,
2001 key1: v8::Local<'_, v8::Value>,
2002 value1: v8::Local<'_, v8::Value>,
2003 key2: v8::Local<'_, v8::Value>,
2004 value2: v8::Local<'_, v8::Value>,
2005) {
2006 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2007 &mut *scope,
2008 instrument,
2009 ) else {
2010 return;
2011 };
2012 let values = state.try_take::<MetricAttributes>();
2013 let mut attributes = values
2014 .map(|mut attr| {
2015 attr.attributes.reserve_exact(2);
2016 attr.attributes
2017 })
2018 .unwrap_or_else(|| Vec::with_capacity(2));
2019 let attr1 = attr_raw!(scope, key1, value1);
2020 let attr2 = attr_raw!(scope, key2, value2);
2021 if let Some(kv1) = attr1 {
2022 attributes.push(kv1);
2023 }
2024 if let Some(kv2) = attr2 {
2025 attributes.push(kv2);
2026 }
2027 if let Instrument::Observable(data_share) = &*instrument {
2028 let mut data = data_share.lock().unwrap();
2029 data.insert(attributes, value);
2030 }
2031}
2032
2033#[allow(clippy::too_many_arguments)]
2034#[op2(fast)]
2035fn op_otel_metric_observable_record3(
2036 state: &mut OpState,
2037 scope: &mut v8::HandleScope<'_>,
2038 instrument: v8::Local<'_, v8::Value>,
2039 value: f64,
2040 key1: v8::Local<'_, v8::Value>,
2041 value1: v8::Local<'_, v8::Value>,
2042 key2: v8::Local<'_, v8::Value>,
2043 value2: v8::Local<'_, v8::Value>,
2044 key3: v8::Local<'_, v8::Value>,
2045 value3: v8::Local<'_, v8::Value>,
2046) {
2047 let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
2048 &mut *scope,
2049 instrument,
2050 ) else {
2051 return;
2052 };
2053 let values = state.try_take::<MetricAttributes>();
2054 let mut attributes = values
2055 .map(|mut attr| {
2056 attr.attributes.reserve_exact(3);
2057 attr.attributes
2058 })
2059 .unwrap_or_else(|| Vec::with_capacity(3));
2060 let attr1 = attr_raw!(scope, key1, value1);
2061 let attr2 = attr_raw!(scope, key2, value2);
2062 let attr3 = attr_raw!(scope, key3, value3);
2063 if let Some(kv1) = attr1 {
2064 attributes.push(kv1);
2065 }
2066 if let Some(kv2) = attr2 {
2067 attributes.push(kv2);
2068 }
2069 if let Some(kv3) = attr3 {
2070 attributes.push(kv3);
2071 }
2072 if let Instrument::Observable(data_share) = &*instrument {
2073 let mut data = data_share.lock().unwrap();
2074 data.insert(attributes, value);
2075 }
2076}
2077
2078#[allow(clippy::too_many_arguments)]
2079#[op2(fast)]
2080fn op_otel_metric_attribute3<'s>(
2081 scope: &mut v8::HandleScope<'s>,
2082 state: &mut OpState,
2083 #[smi] capacity: u32,
2084 key1: v8::Local<'s, v8::Value>,
2085 value1: v8::Local<'s, v8::Value>,
2086 key2: v8::Local<'s, v8::Value>,
2087 value2: v8::Local<'s, v8::Value>,
2088 key3: v8::Local<'s, v8::Value>,
2089 value3: v8::Local<'s, v8::Value>,
2090) {
2091 let mut values = state.try_borrow_mut::<MetricAttributes>();
2092 let attr1 = attr_raw!(scope, key1, value1);
2093 let attr2 = attr_raw!(scope, key2, value2);
2094 let attr3 = attr_raw!(scope, key3, value3);
2095 if let Some(values) = &mut values {
2096 values.attributes.reserve_exact(
2097 (capacity as usize).saturating_sub(values.attributes.capacity()),
2098 );
2099 if let Some(kv1) = attr1 {
2100 values.attributes.push(kv1);
2101 }
2102 if let Some(kv2) = attr2 {
2103 values.attributes.push(kv2);
2104 }
2105 if let Some(kv3) = attr3 {
2106 values.attributes.push(kv3);
2107 }
2108 } else {
2109 let mut attributes = Vec::with_capacity(capacity as usize);
2110 if let Some(kv1) = attr1 {
2111 attributes.push(kv1);
2112 }
2113 if let Some(kv2) = attr2 {
2114 attributes.push(kv2);
2115 }
2116 if let Some(kv3) = attr3 {
2117 attributes.push(kv3);
2118 }
2119 state.put(MetricAttributes { attributes });
2120 }
2121}
2122
2123struct ObservationDone(oneshot::Sender<()>);
2124
2125#[op2(async)]
2126async fn op_otel_metric_wait_to_observe(state: Rc<RefCell<OpState>>) -> bool {
2127 let (tx, rx) = oneshot::channel();
2128 {
2129 OTEL_PRE_COLLECT_CALLBACKS
2130 .lock()
2131 .expect("mutex poisoned")
2132 .push(tx);
2133 }
2134 if let Ok(done) = rx.await {
2135 state.borrow_mut().put(ObservationDone(done));
2136 true
2137 } else {
2138 false
2139 }
2140}
2141
2142#[op2(fast)]
2143fn op_otel_metric_observation_done(state: &mut OpState) {
2144 if let Some(ObservationDone(done)) = state.try_take::<ObservationDone>() {
2145 let _ = done.send(());
2146 }
2147}