laburnum 1.17.1

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  super::prefixer::PrefixingExporter,
  opentelemetry::{
    KeyValue,
    global,
  },
  opentelemetry_otlp::{
    Protocol,
    WithExportConfig,
    WithHttpConfig,
    WithTonicConfig,
  },
  opentelemetry_sdk::{
    Resource,
    propagation::TraceContextPropagator,
  },
  opentelemetry_stdout::SpanExporter as StdoutExporter,
};

/// Owner of the tracing pipeline's tracer provider.
///
/// The `Drop` implementation for this type force-flushes the provider and
/// then shuts it down, so the value should be kept alive for as long as spans
/// are expected to be exported.
pub struct Telemetry {
  /// The SDK tracer provider that is flushed and shut down on drop.
  pub(crate) tracer_provider: opentelemetry_sdk::trace::SdkTracerProvider,
}

impl Drop for Telemetry {
  /// Flushes the tracer provider and then shuts it down.
  ///
  /// Both steps are always attempted: a failed flush is reported through
  /// `otel::error!` and does not prevent the shutdown call that follows.
  fn drop(&mut self) {
    otel::event!("Starting telemetry flush and shutdown");

    let flush_outcome = self.tracer_provider.force_flush();
    match flush_outcome {
      | Ok(_) => {
        otel::event!("TracerProvider flushed successfully");
      },
      | Err(flush_err) => {
        otel::error!(
          "tracer_provider_flush_failed",
          format!("Failed to flush TracerProvider: {:?}", flush_err)
        );
      },
    }

    let shutdown_outcome = self.tracer_provider.shutdown();
    match shutdown_outcome {
      | Ok(_) => {
        otel::event!("TracerProvider shutdown successfully");
      },
      | Err(shutdown_err) => {
        otel::error!(
          "tracer_provider_shutdown_failed",
          format!("Failed to shutdown TracerProvider: {:?}", shutdown_err)
        );
      },
    }

    otel::event!("Telemetry shutdown complete");
  }
}

/// Builds a gRPC (tonic) OTLP span exporter on a dedicated, named thread.
///
/// Tonic requires Tokio, so the exporter is constructed inside a
/// current-thread Tokio runtime that lives on its own `otel-grpc-runtime`
/// thread, isolated from the main smol runtime. The calling thread blocks
/// until that thread reports the construction result over a channel.
///
/// `headers` are converted to tonic metadata; any pair whose key or value
/// does not parse as metadata is skipped.
///
/// # Errors
///
/// Returns an error if the thread cannot be spawned, the Tokio runtime cannot
/// be created, the exporter fails to build, or the thread ends without
/// sending a result.
fn create_grpc_exporter(
  endpoint: &str,
  headers: std::collections::HashMap<String, String>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
  let target = endpoint.to_owned();
  let (outcome_tx, outcome_rx) =
    std::sync::mpsc::sync_channel::<Result<opentelemetry_otlp::SpanExporter, String>>(1);

  // Everything that touches tonic runs inside this closure, on the new thread.
  let runtime_thread = move || {
    let built_runtime = tokio::runtime::Builder::new_current_thread()
      .enable_all()
      .build();
    let runtime = match built_runtime {
      | Ok(runtime) => runtime,
      | Err(e) => {
        let _ = outcome_tx.send(Err(format!("Failed to create Tokio runtime for gRPC: {e}")));
        return;
      },
    };

    let outcome = runtime.block_on(async {
      let base = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(&target)
        .with_timeout(std::time::Duration::from_secs(4));

      let configured = if headers.is_empty() {
        base
      } else {
        let mut metadata = tonic::metadata::MetadataMap::new();
        for (name, raw_value) in headers {
          let parsed = (
            name.parse::<tonic::metadata::MetadataKey<_>>(),
            raw_value.parse::<tonic::metadata::MetadataValue<_>>(),
          );
          // Pairs that are not valid metadata are skipped.
          if let (Ok(key), Ok(value)) = parsed {
            metadata.insert(key, value);
          }
        }
        base.with_metadata(metadata)
      };

      configured
        .build()
        .map_err(|e| format!("Failed to create gRPC OTLP exporter: {e}"))
    });

    let built_ok = outcome.is_ok();
    let _ = outcome_tx.send(outcome);

    // Keep runtime alive for the exporter's async operations
    if built_ok {
      runtime.block_on(std::future::pending::<()>());
    }
  };

  std::thread::Builder::new()
    .name("otel-grpc-runtime".to_string())
    .spawn(runtime_thread)
    .map_err(|e| anyhow::anyhow!("Failed to spawn gRPC runtime thread: {e}"))?;

  let received = outcome_rx
    .recv()
    .map_err(|e| anyhow::anyhow!("Failed to receive gRPC exporter: {e}"))?;

  received.map_err(|e| anyhow::anyhow!("{e}"))
}

pub fn setup_telemetry() -> anyhow::Result<Telemetry> {
  let resource = Resource::builder()
    // .with_detectors(&[Box::new(EnvResourceDetector::new())
    //   as Box<dyn opentelemetry_sdk::resource::ResourceDetector>])
    .with_service_name(env!("CARGO_PKG_NAME"))
    .with_attributes([
        KeyValue::new("service.namespace", "laburnum"),
        KeyValue::new("process.runtime.name", "rustc"),
        KeyValue::new(
          "process.runtime.version",
          env!("CARGO_PKG_RUST_VERSION"),
        ),
        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
    ])
    .with_schema_url(
      std::iter::empty::<KeyValue>(),
      format!(
        "https://opentelemetry.io/schemas/{}",
        otel::OTEL_SPEC_VERSION
      ),
    )
    .build();

  // Protocol selection: http/json (default), http/protobuf, or grpc
  let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL")
    .unwrap_or_else(|_| "http/json".to_string());

  // Local OTLP endpoint configuration
  let local_endpoint_default = if protocol == "grpc" {
    "http://localhost:4317"
  } else {
    "http://localhost:4318"
  };

  let local_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
    .unwrap_or_else(|_| local_endpoint_default.to_string());

  let mut builder = opentelemetry_sdk::trace::SdkTracerProvider::builder()
    .with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn);

  // Always add local OTLP exporter
  let local_exporter: PrefixingExporter<opentelemetry_otlp::SpanExporter> =
    match protocol.as_str() {
      | "grpc" => {
        PrefixingExporter::new(
          "laburnum",
          create_grpc_exporter(
            &local_endpoint,
            std::collections::HashMap::new(),
          )?,
        )
      },
      | "http/protobuf" => {
        PrefixingExporter::new(
          "laburnum",
          opentelemetry_otlp::SpanExporter::builder()
            .with_http()
            .with_http_client(reqwest::blocking::Client::new())
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(format!("{}/v1/traces", local_endpoint))
            .with_timeout(std::time::Duration::from_secs(4))
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP/Protobuf OTLP exporter: {e}"))?,
        )
      },
      | _ => {
        PrefixingExporter::new(
          "laburnum",
          opentelemetry_otlp::SpanExporter::builder()
            .with_http()
            .with_http_client(reqwest::blocking::Client::new())
            .with_protocol(Protocol::HttpJson)
            .with_endpoint(format!("{}/v1/traces", local_endpoint))
            .with_timeout(std::time::Duration::from_secs(4))
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP/JSON OTLP exporter: {e}"))?,
        )
      },
    };

  builder = builder.with_span_processor(
    opentelemetry_sdk::trace::BatchSpanProcessor::builder(local_exporter)
      .with_batch_config(
        opentelemetry_sdk::trace::BatchConfigBuilder::default()
          .with_max_queue_size(2048)
          .with_max_export_batch_size(512)
          .build(),
      )
      .build(),
  );

  if std::env::var("RUST_LOG").is_ok() {
    let stdout_exporter = StdoutExporter::default();
    builder = builder.with_span_processor(
      opentelemetry_sdk::trace::SimpleSpanProcessor::new(stdout_exporter),
    );
  }

  let tracer_provider = builder.with_resource(resource).build();

  global::set_tracer_provider(tracer_provider.clone());

  // Set up global trace context propagator
  global::set_text_map_propagator(TraceContextPropagator::new());

  Ok(Telemetry { tracer_provider })
}