fake_opentelemetry_collector/
lib.rs

1mod common;
2mod logs;
3mod metrics;
4mod trace;
5
6use logs::*;
7use metrics::*;
8use trace::*;
9
10pub use logs::ExportedLog;
11pub use metrics::ExportedMetric;
12pub use trace::ExportedSpan;
13
14use futures::StreamExt;
15use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
16use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
17use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
18use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
19use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
20use std::net::SocketAddr;
21use std::time::{Duration, Instant};
22use tokio::sync::mpsc;
23use tokio::sync::mpsc::Receiver;
24use tokio_stream::wrappers::TcpListenerStream;
25use tracing::debug;
26
27pub struct FakeCollectorServer {
28    address: SocketAddr,
29    req_rx: mpsc::Receiver<ExportedSpan>,
30    log_rx: mpsc::Receiver<ExportedLog>,
31    metrics_rx: mpsc::Receiver<ExportedMetric>,
32    handle: tokio::task::JoinHandle<()>,
33}
34
35impl FakeCollectorServer {
36    pub async fn start() -> Result<Self, Box<dyn std::error::Error>> {
37        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
38        let listener = tokio::net::TcpListener::bind(addr).await?;
39        let addr = listener.local_addr()?;
40        let stream = TcpListenerStream::new(listener).map(|s| {
41            if let Ok(ref s) = s {
42                debug!("Got new conn at {}", s.peer_addr()?);
43            }
44            s
45        });
46
47        let (req_tx, req_rx) = mpsc::channel::<ExportedSpan>(64);
48        let (log_tx, log_rx) = mpsc::channel::<ExportedLog>(64);
49        let (metrics_tx, metrics_rx) = mpsc::channel::<ExportedMetric>(64);
50        let trace_service = TraceServiceServer::new(FakeTraceService::new(req_tx));
51        let logs_service = LogsServiceServer::new(FakeLogsService::new(log_tx));
52        let metrics_service = MetricsServiceServer::new(FakeMetricsService::new(metrics_tx));
53        let handle = tokio::task::spawn(async move {
54            debug!("start FakeCollectorServer http://{addr}"); //Devskim: ignore DS137138)
55            tonic::transport::Server::builder()
56                .add_service(trace_service)
57                .add_service(logs_service)
58                .add_service(metrics_service)
59                .serve_with_incoming(stream)
60                .await
61                .expect("Server failed");
62            debug!("stop FakeCollectorServer");
63        });
64        Ok(Self {
65            address: addr,
66            req_rx,
67            log_rx,
68            metrics_rx,
69            handle,
70        })
71    }
72
73    pub fn address(&self) -> SocketAddr {
74        self.address
75    }
76
77    pub fn endpoint(&self) -> String {
78        format!("http://{}", self.address()) //Devskim: ignore DS137138)
79    }
80
81    pub async fn exported_spans(
82        &mut self,
83        at_least: usize,
84        timeout: Duration,
85    ) -> Vec<ExportedSpan> {
86        recv_many(&mut self.req_rx, at_least, timeout).await
87    }
88
89    pub async fn exported_logs(&mut self, at_least: usize, timeout: Duration) -> Vec<ExportedLog> {
90        recv_many(&mut self.log_rx, at_least, timeout).await
91    }
92
93    pub async fn exported_metrics(
94        &mut self,
95        at_least: usize,
96        timeout: Duration,
97    ) -> Vec<ExportedMetric> {
98        recv_many(&mut self.metrics_rx, at_least, timeout).await
99    }
100
101    pub fn abort(self) {
102        self.handle.abort()
103    }
104}
105
106async fn recv_many<T>(rx: &mut Receiver<T>, at_least: usize, timeout: Duration) -> Vec<T> {
107    let deadline = Instant::now();
108    let pause = (timeout / 10).min(Duration::from_millis(10));
109    while rx.len() < at_least && deadline.elapsed() < timeout {
110        tokio::time::sleep(pause).await;
111    }
112    std::iter::from_fn(|| rx.try_recv().ok()).collect::<Vec<_>>()
113}
114
115pub async fn setup_tracer_provider(
116    fake_server: &FakeCollectorServer,
117) -> opentelemetry_sdk::trace::SdkTracerProvider {
118    // if the environment variable is set (in test or in caller), `with_endpoint` value is ignored
119    std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT");
120
121    opentelemetry_sdk::trace::SdkTracerProvider::builder()
122        .with_batch_exporter(
123            SpanExporter::builder()
124                .with_tonic()
125                .with_endpoint(fake_server.endpoint())
126                .build()
127                .expect("failed to install tracer"),
128        )
129        .build()
130}
131
132pub async fn setup_logger_provider(
133    fake_server: &FakeCollectorServer,
134) -> opentelemetry_sdk::logs::SdkLoggerProvider {
135    opentelemetry_sdk::logs::SdkLoggerProvider::builder()
136        //Install simple so we don't have to wait for batching in tests
137        .with_simple_exporter(
138            LogExporter::builder()
139                .with_tonic()
140                .with_endpoint(fake_server.endpoint())
141                .build()
142                .expect("failed to install logging"),
143        )
144        .build()
145}
146
147pub async fn setup_meter_provider(
148    fake_server: &FakeCollectorServer,
149) -> opentelemetry_sdk::metrics::SdkMeterProvider {
150    let exporter = MetricExporter::builder()
151        .with_tonic()
152        .with_endpoint(fake_server.endpoint())
153        .build()
154        .expect("failed to install metrics");
155
156    let reader = PeriodicReader::builder(exporter).build();
157
158    SdkMeterProvider::builder().with_reader(reader).build()
159}