//! Span exporter that POSTs JSON payloads over HTTP to a `/v1/traces` endpoint.
#![deny(missing_docs)]
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use http_body_util::Full;
use hyper::{Method, Request};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use osproxy_observe::SpanExporter;
use serde_json::Value;
use tokio::sync::Semaphore;
/// Longest time an export task waits for `client.request` to resolve before
/// the request is abandoned.
const EXPORT_TIMEOUT: Duration = Duration::from_secs(10);
/// Number of permits in the in-flight semaphore, i.e. the maximum number of
/// export tasks that may be running at the same time.
const MAX_INFLIGHT: usize = 256;
/// Span exporter that sends each payload as a JSON `POST` to an HTTP endpoint
/// ending in `/v1/traces`.
///
/// Exports are fire-and-forget: each one runs as a task spawned on the Tokio
/// runtime captured at construction, and the number of concurrently running
/// tasks is capped by a semaphore. Clones share that semaphore (it is held in
/// an `Arc`), so the cap applies across all clones of one exporter.
#[derive(Clone, Debug)]
pub struct OtlpHttpExporter {
/// Full URL requests are posted to (`<base>/v1/traces`).
endpoint: String,
/// HTTP client used to send the export requests.
client: Client<HttpConnector, Full<Bytes>>,
/// Runtime handle captured when the exporter was created; `None` if no
/// Tokio runtime was current at that point, in which case exports are
/// skipped.
handle: Option<tokio::runtime::Handle>,
/// Bounds the number of export tasks in flight; a payload is dropped when no
/// permit is available.
inflight: Arc<Semaphore>,
}
impl OtlpHttpExporter {
    /// Creates an exporter that posts to `{endpoint_base}/v1/traces`.
    ///
    /// Any trailing `/` characters on `endpoint_base` are stripped before the
    /// path is appended, so `http://host:4318` and `http://host:4318/` produce
    /// the same endpoint. The resulting URL is not validated here.
    ///
    /// If this is called from within a Tokio runtime, a handle to that runtime
    /// is captured and later used to spawn export tasks. If no runtime is
    /// current, no handle is stored and every export is skipped.
    #[must_use]
    pub fn new(endpoint_base: &str) -> Self {
        let base = endpoint_base.trim_end_matches('/');
        Self {
            endpoint: format!("{base}/v1/traces"),
            client: Client::builder(TokioExecutor::new()).build_http(),
            handle: tokio::runtime::Handle::try_current().ok(),
            inflight: Arc::new(Semaphore::new(MAX_INFLIGHT)),
        }
    }

    /// Returns the full URL that export requests are sent to.
    #[must_use]
    pub fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }
}
impl SpanExporter for OtlpHttpExporter {
fn export(&self, payload: Value) {
let Some(handle) = self.handle.clone() else {
return;
};
let Ok(permit) = Arc::clone(&self.inflight).try_acquire_owned() else {
return;
};
let client = self.client.clone();
let endpoint = self.endpoint.clone();
handle.spawn(async move {
let _permit = permit; let Ok(body) = serde_json::to_vec(&payload) else {
return;
};
let Ok(req) = Request::builder()
.method(Method::POST)
.uri(&endpoint)
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
else {
return;
};
let _ = tokio::time::timeout(EXPORT_TIMEOUT, client.request(req)).await;
});
}
}