fake_opentelemetry_collector/
lib.rs1mod common;
2mod logs;
3mod metrics;
4mod trace;
5
6use logs::*;
7use metrics::*;
8use trace::*;
9
10pub use logs::ExportedLog;
11pub use metrics::ExportedMetric;
12pub use trace::ExportedSpan;
13
14use futures::StreamExt;
15use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
16use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
17use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
18use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
19use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
20use std::net::SocketAddr;
21use std::time::{Duration, Instant};
22use tokio::sync::mpsc;
23use tokio::sync::mpsc::Receiver;
24use tokio_stream::wrappers::TcpListenerStream;
25use tracing::debug;
26
27pub struct FakeCollectorServer {
28 address: SocketAddr,
29 req_rx: mpsc::Receiver<ExportedSpan>,
30 log_rx: mpsc::Receiver<ExportedLog>,
31 metrics_rx: mpsc::Receiver<ExportedMetric>,
32 handle: tokio::task::JoinHandle<()>,
33}
34
35impl FakeCollectorServer {
36 pub async fn start() -> Result<Self, Box<dyn std::error::Error>> {
37 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
38 let listener = tokio::net::TcpListener::bind(addr).await?;
39 let addr = listener.local_addr()?;
40 let stream = TcpListenerStream::new(listener).map(|s| {
41 if let Ok(ref s) = s {
42 debug!("Got new conn at {}", s.peer_addr()?);
43 }
44 s
45 });
46
47 let (req_tx, req_rx) = mpsc::channel::<ExportedSpan>(64);
48 let (log_tx, log_rx) = mpsc::channel::<ExportedLog>(64);
49 let (metrics_tx, metrics_rx) = mpsc::channel::<ExportedMetric>(64);
50 let trace_service = TraceServiceServer::new(FakeTraceService::new(req_tx));
51 let logs_service = LogsServiceServer::new(FakeLogsService::new(log_tx));
52 let metrics_service = MetricsServiceServer::new(FakeMetricsService::new(metrics_tx));
53 let handle = tokio::task::spawn(async move {
54 debug!("start FakeCollectorServer http://{addr}"); tonic::transport::Server::builder()
56 .add_service(trace_service)
57 .add_service(logs_service)
58 .add_service(metrics_service)
59 .serve_with_incoming(stream)
60 .await
61 .expect("Server failed");
62 debug!("stop FakeCollectorServer");
63 });
64 Ok(Self {
65 address: addr,
66 req_rx,
67 log_rx,
68 metrics_rx,
69 handle,
70 })
71 }
72
73 pub fn address(&self) -> SocketAddr {
74 self.address
75 }
76
77 pub fn endpoint(&self) -> String {
78 format!("http://{}", self.address()) }
80
81 pub async fn exported_spans(
82 &mut self,
83 at_least: usize,
84 timeout: Duration,
85 ) -> Vec<ExportedSpan> {
86 recv_many(&mut self.req_rx, at_least, timeout).await
87 }
88
89 pub async fn exported_logs(&mut self, at_least: usize, timeout: Duration) -> Vec<ExportedLog> {
90 recv_many(&mut self.log_rx, at_least, timeout).await
91 }
92
93 pub async fn exported_metrics(
94 &mut self,
95 at_least: usize,
96 timeout: Duration,
97 ) -> Vec<ExportedMetric> {
98 recv_many(&mut self.metrics_rx, at_least, timeout).await
99 }
100
101 pub fn abort(self) {
102 self.handle.abort()
103 }
104}
105
106async fn recv_many<T>(rx: &mut Receiver<T>, at_least: usize, timeout: Duration) -> Vec<T> {
107 let deadline = Instant::now();
108 let pause = (timeout / 10).min(Duration::from_millis(10));
109 while rx.len() < at_least && deadline.elapsed() < timeout {
110 tokio::time::sleep(pause).await;
111 }
112 std::iter::from_fn(|| rx.try_recv().ok()).collect::<Vec<_>>()
113}
114
115pub async fn setup_tracer_provider(
116 fake_server: &FakeCollectorServer,
117) -> opentelemetry_sdk::trace::SdkTracerProvider {
118 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT");
120
121 opentelemetry_sdk::trace::SdkTracerProvider::builder()
122 .with_batch_exporter(
123 SpanExporter::builder()
124 .with_tonic()
125 .with_endpoint(fake_server.endpoint())
126 .build()
127 .expect("failed to install tracer"),
128 )
129 .build()
130}
131
132pub async fn setup_logger_provider(
133 fake_server: &FakeCollectorServer,
134) -> opentelemetry_sdk::logs::SdkLoggerProvider {
135 opentelemetry_sdk::logs::SdkLoggerProvider::builder()
136 .with_simple_exporter(
138 LogExporter::builder()
139 .with_tonic()
140 .with_endpoint(fake_server.endpoint())
141 .build()
142 .expect("failed to install logging"),
143 )
144 .build()
145}
146
147pub async fn setup_meter_provider(
148 fake_server: &FakeCollectorServer,
149) -> opentelemetry_sdk::metrics::SdkMeterProvider {
150 let exporter = MetricExporter::builder()
151 .with_tonic()
152 .with_endpoint(fake_server.endpoint())
153 .build()
154 .expect("failed to install metrics");
155
156 let reader = PeriodicReader::builder(exporter).build();
157
158 SdkMeterProvider::builder().with_reader(reader).build()
159}