use std::sync::Arc;
use opentelemetry_proto::tonic::collector::{
logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
logs_service_server::{LogsService, LogsServiceServer},
},
metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
metrics_service_server::{MetricsService, MetricsServiceServer},
},
trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
trace_service_server::{TraceService, TraceServiceServer},
},
};
use parking_lot::Mutex;
use tokio::sync::oneshot;
use tonic::{Request, Response, Status, transport::Server};
#[derive(Default)]
struct Captured {
logs: Vec<ExportLogsServiceRequest>,
metrics: Vec<ExportMetricsServiceRequest>,
traces: Vec<ExportTraceServiceRequest>,
traceparents: Vec<Option<String>>,
}
#[derive(Default, Clone)]
pub struct MockCollectorState {
captured: Arc<Mutex<Captured>>,
}
impl MockCollectorState {
#[must_use]
pub fn take_logs(&self) -> Vec<ExportLogsServiceRequest> {
std::mem::take(&mut self.captured.lock().logs)
}
#[must_use]
pub fn take_metrics(&self) -> Vec<ExportMetricsServiceRequest> {
std::mem::take(&mut self.captured.lock().metrics)
}
#[must_use]
pub fn take_traces(&self) -> Vec<ExportTraceServiceRequest> {
std::mem::take(&mut self.captured.lock().traces)
}
#[must_use]
pub fn take_traceparents(&self) -> Vec<Option<String>> {
std::mem::take(&mut self.captured.lock().traceparents)
}
}
impl std::fmt::Debug for MockCollectorState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let g = self.captured.lock();
f.debug_struct("MockCollectorState")
.field("logs", &g.logs.len())
.field("metrics", &g.metrics.len())
.field("traces", &g.traces.len())
.finish()
}
}
fn extract_traceparent<T>(request: &Request<T>) -> Option<String> {
request
.metadata()
.get("traceparent")
.and_then(|v| v.to_str().ok().map(ToString::to_string))
}
#[tonic::async_trait]
impl LogsService for MockCollectorState {
async fn export(
&self,
request: Request<ExportLogsServiceRequest>,
) -> Result<Response<ExportLogsServiceResponse>, Status> {
let tp = extract_traceparent(&request);
let mut g = self.captured.lock();
g.logs.push(request.into_inner());
g.traceparents.push(tp);
drop(g);
Ok(Response::new(ExportLogsServiceResponse {
partial_success: None,
}))
}
}
#[tonic::async_trait]
impl MetricsService for MockCollectorState {
async fn export(
&self,
request: Request<ExportMetricsServiceRequest>,
) -> Result<Response<ExportMetricsServiceResponse>, Status> {
let tp = extract_traceparent(&request);
let mut g = self.captured.lock();
g.metrics.push(request.into_inner());
g.traceparents.push(tp);
drop(g);
Ok(Response::new(ExportMetricsServiceResponse {
partial_success: None,
}))
}
}
#[tonic::async_trait]
impl TraceService for MockCollectorState {
async fn export(
&self,
request: Request<ExportTraceServiceRequest>,
) -> Result<Response<ExportTraceServiceResponse>, Status> {
let tp = extract_traceparent(&request);
let mut g = self.captured.lock();
g.traces.push(request.into_inner());
g.traceparents.push(tp);
drop(g);
Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
}))
}
}
pub struct MockOtelCollector {
endpoint: String,
state: MockCollectorState,
shutdown: Option<oneshot::Sender<()>>,
runtime: Option<tokio::runtime::Runtime>,
join: Option<tokio::task::JoinHandle<()>>,
}
impl std::fmt::Debug for MockOtelCollector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MockOtelCollector")
.field("endpoint", &self.endpoint)
.field("state", &self.state)
.finish()
}
}
impl MockOtelCollector {
pub fn start() -> std::io::Result<Self> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.thread_name("obs-otlp-mock")
.build()?;
let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
std_listener.set_nonblocking(true)?;
let local_addr = std_listener.local_addr()?;
let endpoint = format!("http://{local_addr}");
let state = MockCollectorState::default();
let (tx, rx) = oneshot::channel::<()>();
let serve_state = state.clone();
let join = runtime.spawn(async move {
let listener = match tokio::net::TcpListener::from_std(std_listener) {
Ok(l) => l,
Err(e) => {
tracing::error!(error = %e, "mock collector listener bind failed");
return;
}
};
let incoming = tonic::service::Routes::default();
let _ = incoming;
let server = Server::builder()
.add_service(LogsServiceServer::new(serve_state.clone()))
.add_service(MetricsServiceServer::new(serve_state.clone()))
.add_service(TraceServiceServer::new(serve_state));
let listener_stream = tokio_stream::wrappers::TcpListenerStream::new(listener);
if let Err(e) = server
.serve_with_incoming_shutdown(listener_stream, async move {
let _ = rx.await;
})
.await
{
tracing::error!(error = %e, "mock collector exited with error");
}
});
Ok(Self {
endpoint,
state,
shutdown: Some(tx),
runtime: Some(runtime),
join: Some(join),
})
}
#[must_use]
pub fn endpoint(&self) -> &str {
&self.endpoint
}
#[must_use]
pub fn state(&self) -> MockCollectorState {
self.state.clone()
}
}
impl Drop for MockOtelCollector {
fn drop(&mut self) {
if let Some(tx) = self.shutdown.take() {
let _ = tx.send(());
}
if let (Some(rt), Some(join)) = (self.runtime.take(), self.join.take()) {
rt.block_on(async move {
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), join).await;
});
}
}
}