use std::net::SocketAddr;
pub use crabka_telemetry::{OtlpConfig, OtlpProtocol, TelemetryError, TelemetryGuard, init};
pub const REQUEST_TARGET: &str = "crabka_broker::request";
#[must_use]
pub fn request_span(
api_key: i16,
api_version: i16,
correlation_id: i32,
client_id: Option<&str>,
peer: &SocketAddr,
) -> tracing::Span {
let span = tracing::debug_span!(
target: REQUEST_TARGET,
"kafka.request",
otel.kind = "server",
otel.name = tracing::field::Empty,
messaging.system = "kafka",
kafka.api_key = api_key,
kafka.api_version = api_version,
kafka.correlation_id = correlation_id,
messaging.kafka.client_id = client_id.unwrap_or(""),
network.peer.address = %peer,
);
span.record("otel.name", api_name(api_key));
span
}
#[must_use]
pub fn api_name(api_key: i16) -> &'static str {
crabka_protocol::ApiKey::from_i16(api_key).map_or("Unknown", Into::into)
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn api_name_known_and_unknown() {
assert!(api_name(0) == "Produce");
assert!(api_name(1) == "Fetch");
assert!(api_name(18) == "ApiVersions");
assert!(api_name(51) == "AlterUserScramCredentials");
assert!(api_name(30) == "CreateAcls");
assert!(api_name(9999) == "Unknown");
}
#[test]
fn request_span_records_otel_name() {
use std::sync::{Arc, Mutex};
use tracing::field::{Field, Visit};
use tracing::span::{Attributes, Record};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::prelude::*;
#[derive(Default)]
struct Captured {
name: Option<String>,
api_key: Option<i64>,
kind: Option<String>,
}
struct V<'a>(&'a mut Captured);
impl Visit for V<'_> {
fn record_debug(&mut self, _f: &Field, _v: &dyn std::fmt::Debug) {}
fn record_str(&mut self, f: &Field, v: &str) {
match f.name() {
"otel.name" => self.0.name = Some(v.to_owned()),
"otel.kind" => self.0.kind = Some(v.to_owned()),
_ => {}
}
}
fn record_i64(&mut self, f: &Field, v: i64) {
if f.name() == "kafka.api_key" {
self.0.api_key = Some(v);
}
}
}
struct Cap(Arc<Mutex<Captured>>);
impl<S: tracing::Subscriber> Layer<S> for Cap {
fn on_new_span(&self, attrs: &Attributes<'_>, _id: &tracing::Id, _ctx: Context<'_, S>) {
attrs.record(&mut V(&mut self.0.lock().unwrap()));
}
fn on_record(&self, _id: &tracing::Id, values: &Record<'_>, _ctx: Context<'_, S>) {
values.record(&mut V(&mut self.0.lock().unwrap()));
}
}
let captured = Arc::new(Mutex::new(Captured::default()));
let subscriber = tracing_subscriber::registry().with(
Cap(captured.clone()).with_filter(tracing_subscriber::filter::LevelFilter::DEBUG),
);
let peer: SocketAddr = "127.0.0.1:9092".parse().unwrap();
tracing::subscriber::with_default(subscriber, || {
let _span = request_span(0, 9, 42, Some("my-client"), &peer);
});
let g = captured.lock().unwrap();
assert!(g.name.as_deref() == Some("Produce"));
assert!(g.kind.as_deref() == Some("server"));
assert!(g.api_key == Some(0));
}
}