#![allow(clippy::unwrap_used)]
use std::sync::{Arc, Mutex};
use std::time::Duration;
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use osproxy_observe::SpanExporter;
use osproxy_otlp::OtlpHttpExporter;
use tokio::net::TcpListener;
#[derive(Clone, Debug)]
struct Captured {
uri: String,
content_type: String,
body: String,
}
async fn start_collector() -> (String, Arc<Mutex<Option<Captured>>>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let captured = Arc::new(Mutex::new(None));
let captured_for_task = Arc::clone(&captured);
tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);
let service = service_fn(move |req: Request<Incoming>| {
let captured = Arc::clone(&captured_for_task);
async move {
let uri = req.uri().to_string();
let content_type = req
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_owned();
let body = req.into_body().collect().await.unwrap().to_bytes();
*captured.lock().unwrap() = Some(Captured {
uri,
content_type,
body: String::from_utf8_lossy(&body).into_owned(),
});
Ok::<_, std::convert::Infallible>(Response::new(Full::new(Bytes::from_static(
b"{}",
))))
}
});
let _ = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service)
.await;
});
(format!("http://{addr}"), captured)
}
async fn await_captured(slot: &Arc<Mutex<Option<Captured>>>) -> Option<Captured> {
for _ in 0..100 {
if let Some(c) = slot.lock().unwrap().clone() {
return Some(c);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
None
}
#[tokio::test]
async fn posts_otlp_json_spans_to_v1_traces() {
let (base, captured) = start_collector().await;
let exporter = OtlpHttpExporter::new(&base);
assert!(exporter.endpoint().ends_with("/v1/traces"));
assert!(exporter.enabled());
let payload = serde_json::json!({
"resourceSpans": [{ "scopeSpans": [{ "spans": [{ "traceId": "4bf9" }] }] }]
});
exporter.export(payload);
let got = await_captured(&captured)
.await
.expect("collector should receive the exported span");
assert!(got.uri.ends_with("/v1/traces"), "uri: {}", got.uri);
assert_eq!(got.content_type, "application/json");
assert!(got.body.contains("resourceSpans"), "body: {}", got.body);
assert!(
got.body.contains("4bf9"),
"body carries the span: {}",
got.body
);
}
#[tokio::test]
async fn a_trailing_slash_on_the_base_url_is_not_doubled() {
let exporter = OtlpHttpExporter::new("http://collector:4318/");
assert_eq!(exporter.endpoint(), "http://collector:4318/v1/traces");
}
#[tokio::test]
async fn exporting_to_a_down_collector_neither_blocks_nor_panics() {
let exporter = OtlpHttpExporter::new("http://127.0.0.1:9");
let payload = serde_json::json!({ "resourceSpans": [] });
for _ in 0..10 {
exporter.export(payload.clone());
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}