emit_otlp 1.18.0

Emit diagnostic events to an OpenTelemetry-compatible collector.
Documentation
#[cfg(not(all(
    target_arch = "wasm32",
    target_vendor = "unknown",
    target_os = "unknown"
)))]
#[path = "http/tokio.rs"]
mod imp;

#[cfg(all(
    feature = "web",
    target_arch = "wasm32",
    target_vendor = "unknown",
    target_os = "unknown"
))]
#[path = "http/web.rs"]
mod imp;

#[cfg(all(
    not(feature = "web"),
    target_arch = "wasm32",
    target_vendor = "unknown",
    target_os = "unknown"
))]
#[path = "http/stub.rs"]
mod imp;

pub(crate) use self::imp::*;

use std::{borrow::Cow, fmt, io::Cursor};

use bytes::Buf;
use emit::{Ctxt as _, Props as _};

use crate::{
    client::Encoding,
    data::{EncodedPayload, PreEncodedCursor},
    internal_metrics::InternalMetrics,
    Error,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HttpVersion {
    Http1,
    Http2,
}

pub(crate) struct HttpUri(http::Uri);

impl fmt::Display for HttpUri {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl HttpUri {
    fn new(uri: impl AsRef<str>) -> Result<Self, Error> {
        let uri = uri.as_ref();

        Ok(HttpUri(uri.parse().map_err(|e| {
            Error::new(format_args!("failed to parse {uri}"), e)
        })?))
    }

    #[cfg(not(all(
        target_arch = "wasm32",
        target_vendor = "unknown",
        target_os = "unknown"
    )))]
    pub fn is_https(&self) -> bool {
        self.0.scheme().unwrap() == &http::uri::Scheme::HTTPS
    }
}

#[cfg(not(all(
    target_arch = "wasm32",
    target_vendor = "unknown",
    target_os = "unknown"
)))]
impl HttpUri {
    pub fn host(&self) -> &str {
        self.0.host().unwrap()
    }

    pub fn authority(&self) -> &str {
        self.0.authority().unwrap().as_str()
    }

    pub fn port(&self) -> u16 {
        self.0
            .port_u16()
            .unwrap_or_else(|| if self.is_https() { 443 } else { 80 })
    }
}

#[derive(Clone)]
pub(crate) struct HttpContent {
    custom_headers: &'static [(&'static str, &'static str)],
    content_frame: Option<HttpContentHeader>,
    content_payload: Option<HttpContentPayload>,
    content_type_header: &'static str,
    content_encoding_header: Option<&'static str>,
}

fn content_type_of(payload: &EncodedPayload) -> &'static str {
    match Encoding::of(payload) {
        Encoding::Proto => "application/x-protobuf",
        Encoding::Json => "application/json",
    }
}

impl HttpContent {
    fn new(
        allow_compression: bool,
        configure: impl Fn(HttpContent) -> Result<HttpContent, Error>,
        metrics: &InternalMetrics,
        payload: EncodedPayload,
    ) -> Result<HttpContent, Error> {
        let body = {
            #[cfg(feature = "gzip")]
            {
                if allow_compression {
                    metrics.transport_request_compress_gzip.increment();

                    HttpContent::gzip(payload)?
                } else {
                    HttpContent::raw(payload)
                }
            }
            #[cfg(not(feature = "gzip"))]
            {
                let _ = allow_compression;
                let _ = metrics;

                HttpContent::raw(payload)
            }
        };

        configure(body)
    }

    fn raw(payload: EncodedPayload) -> Self {
        HttpContent {
            content_frame: None,
            content_type_header: content_type_of(&payload),
            content_encoding_header: None,
            custom_headers: &[],
            content_payload: Some(HttpContentPayload::PreEncoded(payload)),
        }
    }

    #[cfg(feature = "gzip")]
    fn gzip(payload: EncodedPayload) -> Result<Self, Error> {
        use std::io::Write as _;

        let content_type = content_type_of(&payload);

        let mut enc = flate2::write::GzEncoder::new(
            Vec::with_capacity(payload.len()),
            flate2::Compression::fast(),
        );

        let mut payload = payload.into_cursor();
        loop {
            let chunk = payload.chunk();
            if chunk.len() == 0 {
                break;
            }

            enc.write_all(chunk)
                .map_err(|e| Error::new("failed to compress a chunk of bytes", e))?;
            payload.advance(chunk.len());
        }

        let buf = enc
            .finish()
            .map_err(|e| Error::new("failed to finalize compression", e))?;

        Ok(HttpContent {
            content_type_header: content_type,
            content_encoding_header: Some("gzip"),
            custom_headers: &[],
            content_frame: None,
            content_payload: Some(HttpContentPayload::Bytes(buf.into_boxed_slice())),
        })
    }

    pub fn with_content_frame(mut self, header: [u8; 5]) -> Self {
        self.content_frame = Some(HttpContentHeader::SmallBytes(header));
        self
    }

    pub fn content_type_header(&self) -> &'static str {
        self.content_type_header
    }

    pub fn with_content_type_header(mut self, content_type: &'static str) -> Self {
        self.content_type_header = content_type;
        self
    }

    pub fn with_content_encoding_header(mut self, content_encoding: Option<&'static str>) -> Self {
        self.content_encoding_header = content_encoding;
        self
    }

    pub fn content_encoding_header(&self) -> Option<&'static str> {
        self.content_encoding_header
    }

    pub fn with_headers(mut self, headers: &'static [(&'static str, &'static str)]) -> Self {
        self.custom_headers = headers;
        self
    }

    pub fn iter_headers(&self) -> impl Iterator<Item = (&str, Cow<'_, str>)> {
        Some(("content-type", Cow::Borrowed(self.content_type_header)))
            .into_iter()
            .chain(Some((
                "content-length",
                Cow::Owned(self.content_len().to_string()),
            )))
            .chain(
                self.content_encoding_header
                    .map(|v| ("content-encoding", Cow::Borrowed(v))),
            )
            .chain(
                self.custom_headers
                    .iter()
                    .map(|(k, v)| (*k, Cow::Borrowed(*v))),
            )
    }

    pub fn content_len(&self) -> usize {
        self.content_frame_len() + self.content_payload_len()
    }

    pub fn content_frame_len(&self) -> usize {
        self.content_frame
            .as_ref()
            .map(|header| header.len())
            .unwrap_or(0)
    }

    pub fn content_payload_len(&self) -> usize {
        self.content_payload
            .as_ref()
            .map(|payload| payload.len())
            .unwrap_or(0)
    }

    fn next_content_cursor(&mut self) -> Option<HttpContentCursor> {
        if let Some(header) = self.content_frame.take() {
            return Some(header.into_cursor());
        }

        if let Some(payload) = self.content_payload.take() {
            return Some(payload.into_cursor());
        };

        None
    }
}

#[cfg(not(all(
    target_arch = "wasm32",
    target_vendor = "unknown",
    target_os = "unknown"
)))]
impl HttpContent {
    fn has_next_content_cursor(&self) -> bool {
        self.content_frame.is_some() || self.content_payload.is_some()
    }
}

#[derive(Clone)]
enum HttpContentHeader {
    // NOTE: Basically hardcodes gRPC header, but could be generalized if it was worth it
    SmallBytes([u8; 5]),
}

#[derive(Clone)]
enum HttpContentPayload {
    PreEncoded(EncodedPayload),
    #[allow(dead_code)]
    Bytes(Box<[u8]>),
}

impl HttpContentHeader {
    fn len(&self) -> usize {
        match self {
            HttpContentHeader::SmallBytes(header) => header.len(),
        }
    }

    fn into_cursor(self) -> HttpContentCursor {
        match self {
            HttpContentHeader::SmallBytes(header) => {
                HttpContentCursor::SmallBytes(Cursor::new(header))
            }
        }
    }
}

impl HttpContentPayload {
    fn len(&self) -> usize {
        match self {
            HttpContentPayload::PreEncoded(payload) => payload.len(),
            HttpContentPayload::Bytes(payload) => payload.len(),
        }
    }

    fn into_cursor(self) -> HttpContentCursor {
        match self {
            HttpContentPayload::PreEncoded(payload) => {
                HttpContentCursor::PreEncoded(payload.into_cursor())
            }
            HttpContentPayload::Bytes(payload) => HttpContentCursor::Bytes(Cursor::new(payload)),
        }
    }
}

pub(crate) enum HttpContentCursor {
    PreEncoded(PreEncodedCursor),
    Bytes(Cursor<Box<[u8]>>),
    SmallBytes(Cursor<[u8; 5]>),
}

impl Buf for HttpContentCursor {
    fn remaining(&self) -> usize {
        match self {
            HttpContentCursor::PreEncoded(buf) => buf.remaining(),
            HttpContentCursor::Bytes(buf) => buf.remaining(),
            HttpContentCursor::SmallBytes(buf) => buf.remaining(),
        }
    }

    fn chunk(&self) -> &[u8] {
        match self {
            HttpContentCursor::PreEncoded(buf) => buf.chunk(),
            HttpContentCursor::Bytes(buf) => buf.chunk(),
            HttpContentCursor::SmallBytes(buf) => buf.chunk(),
        }
    }

    fn advance(&mut self, cnt: usize) {
        match self {
            HttpContentCursor::PreEncoded(buf) => buf.advance(cnt),
            HttpContentCursor::Bytes(buf) => buf.advance(cnt),
            HttpContentCursor::SmallBytes(buf) => buf.advance(cnt),
        }
    }
}

fn outgoing_traceparent_header() -> Option<(&'static str, String)> {
    let (trace_id, span_id) = emit::runtime::internal().ctxt().with_current(|props| {
        (
            props.pull::<emit::TraceId, _>(emit::well_known::KEY_TRACE_ID),
            props.pull::<emit::SpanId, _>(emit::well_known::KEY_SPAN_ID),
        )
    });

    if let (Some(trace_id), Some(span_id)) = (trace_id, span_id) {
        Some(("traceparent", format!("00-{trace_id}-{span_id}-00")))
    } else {
        None
    }
}