use crate::PartitionId;
use sha2::{Digest, Sha256};
use tracing::{Level, Span};
// Span field names shared by the span builders and `record_*` helpers in this
// file. The `messaging.*` names appear to follow the OpenTelemetry messaging
// semantic conventions (TODO confirm the targeted semconv version); the
// `krafka.*` names are crate-specific.

/// Field naming the messaging system; the builders in this file set it to `"kafka"`.
pub const MESSAGING_SYSTEM: &str = "messaging.system";
/// Field carrying the destination name; the builders in this file set it to the topic.
pub const MESSAGING_DESTINATION: &str = "messaging.destination.name";
/// Field carrying the operation kind (`"publish"`, `"receive"` or `"settle"` in this file).
pub const MESSAGING_OPERATION: &str = "messaging.operation";
/// Field carrying the partition id of the topic.
pub const MESSAGING_KAFKA_PARTITION: &str = "messaging.kafka.destination.partition";
/// Field carrying a message offset.
pub const MESSAGING_KAFKA_OFFSET: &str = "messaging.kafka.message.offset";
/// Field carrying the consumer group id.
pub const MESSAGING_KAFKA_CONSUMER_GROUP: &str = "messaging.kafka.consumer.group";
/// Not used by any span builder visible in this file.
///
/// NOTE(review): the constant is named `..._KEY` but its value is
/// `"messaging.message.id"` — confirm whether that mapping is intentional.
pub const MESSAGING_MESSAGE_KEY: &str = "messaging.message.id";
/// Field carrying the length of the message key in bytes.
pub const KRAFKA_MESSAGE_KEY_SIZE: &str = "krafka.message.key.size";
/// Field carrying the hex-encoded SHA-256 digest of the message key bytes.
pub const KRAFKA_MESSAGE_KEY_SHA256: &str = "krafka.message.key.sha256";
/// Field written by `record_message_size`.
pub const MESSAGING_MESSAGE_BODY_SIZE: &str = "messaging.message.body.size";
/// Field written by `record_batch_count`.
pub const MESSAGING_BATCH_MESSAGE_COUNT: &str = "messaging.batch.message_count";
/// Field carrying the correlation id of a protocol request.
pub const KRAFKA_CORRELATION_ID: &str = "krafka.correlation_id";
/// Not used in this file; presumably the compression codec name — verify against callers.
pub const KRAFKA_COMPRESSION: &str = "krafka.compression";
/// Not used in this file; presumably the producer `acks` setting — verify against callers.
pub const KRAFKA_ACKS: &str = "krafka.acks";
/// Creates the INFO-level `kafka.produce` span for publishing one record to `topic`.
///
/// Returns [`Span::none`] without building anything when INFO-level tracing is
/// not enabled for this module's target.
///
/// The raw key is never attached to the span: only its length in bytes and the
/// lowercase hex SHA-256 digest of its bytes are recorded.
///
/// `Span::record` silently ignores fields that were not declared when the span
/// was created, so every field that may be filled in later is declared empty
/// here:
/// - the partition (`record_partition`) when `partition` is `None`,
/// - the offset (`record_offset`) and the body size (`record_message_size`),
/// - `otel.status_code` / `error.message` (`record_success`, `record_error`,
///   `record_error_message`).
#[inline]
pub fn kafka_producer_span(
    topic: &str,
    partition: Option<PartitionId>,
    key: Option<&[u8]>,
) -> Span {
    if !tracing::enabled!(target: module_path!(), Level::INFO) {
        return Span::none();
    }
    let span = tracing::span!(
        Level::INFO,
        "kafka.produce",
        { MESSAGING_SYSTEM } = tracing::field::Empty,
        { MESSAGING_OPERATION } = tracing::field::Empty,
        { MESSAGING_DESTINATION } = tracing::field::Empty,
        { MESSAGING_KAFKA_PARTITION } = tracing::field::Empty,
        // Declared so that `record_offset` is not a no-op inside this span.
        { MESSAGING_KAFKA_OFFSET } = tracing::field::Empty,
        { KRAFKA_MESSAGE_KEY_SIZE } = tracing::field::Empty,
        { KRAFKA_MESSAGE_KEY_SHA256 } = tracing::field::Empty,
        // Declared so that `record_message_size` is not a no-op inside this span.
        { MESSAGING_MESSAGE_BODY_SIZE } = tracing::field::Empty,
        otel.status_code = tracing::field::Empty,
        error.message = tracing::field::Empty,
    );
    span.record(MESSAGING_SYSTEM, "kafka");
    span.record(MESSAGING_OPERATION, "publish");
    span.record(MESSAGING_DESTINATION, topic);
    if let Some(p) = partition {
        span.record(MESSAGING_KAFKA_PARTITION, p);
    }
    // Skip the hash entirely when the subscriber disabled this span.
    if let Some(key_bytes) = key
        && !span.is_disabled()
    {
        span.record(KRAFKA_MESSAGE_KEY_SIZE, key_bytes.len() as u64);
        let key_hash = sha256_hex(key_bytes);
        span.record(KRAFKA_MESSAGE_KEY_SHA256, key_hash.as_str());
    }
    span
}
/// Returns the SHA-256 digest of `bytes` as a lowercase hexadecimal string
/// (two characters per digest byte).
fn sha256_hex(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let digest = Sha256::digest(bytes);
    let mut encoded = String::with_capacity(digest.len() * 2);
    // Each byte yields its high nibble first, then its low nibble.
    encoded.extend(digest.iter().flat_map(|&byte| {
        let high = HEX[usize::from(byte >> 4)];
        let low = HEX[usize::from(byte & 0x0f)];
        [char::from(high), char::from(low)]
    }));
    encoded
}
/// Creates the INFO-level `kafka.poll` span for a consumer poll over `topics`.
///
/// Returns [`Span::none`] (and skips joining the topic names) when INFO-level
/// tracing is not enabled for this module's target.
///
/// The topic names are recorded comma-separated in the `topics` field. The
/// consumer group is recorded only when `group_id` is `Some`.
///
/// `Span::record` silently ignores fields that were not declared when the span
/// was created, so the batch message count (`record_batch_count`) and the
/// status fields (`record_success`, `record_error`, `record_error_message`)
/// are declared empty here to be filled in later.
#[inline]
pub fn kafka_consumer_poll_span(group_id: Option<&str>, topics: &[String]) -> Span {
    if !tracing::enabled!(target: module_path!(), Level::INFO) {
        return Span::none();
    }
    let topics_str = topics.join(",");
    let span = tracing::span!(
        Level::INFO,
        "kafka.poll",
        { MESSAGING_SYSTEM } = "kafka",
        { MESSAGING_OPERATION } = "receive",
        topics = %topics_str,
        { MESSAGING_KAFKA_CONSUMER_GROUP } = tracing::field::Empty,
        // Declared so that `record_batch_count` is not a no-op inside this span.
        { MESSAGING_BATCH_MESSAGE_COUNT } = tracing::field::Empty,
        otel.status_code = tracing::field::Empty,
        error.message = tracing::field::Empty,
    );
    if let Some(gid) = group_id {
        span.record(MESSAGING_KAFKA_CONSUMER_GROUP, gid);
    }
    span
}
/// Creates the DEBUG-level `kafka.fetch` span for fetching from one
/// topic-partition starting at `offset`.
///
/// `otel.status_code` and `error.message` are declared empty so they can be
/// recorded later from inside the span.
#[inline]
pub fn kafka_fetch_span(topic: &str, partition: PartitionId, offset: i64) -> Span {
    tracing::debug_span!(
        "kafka.fetch",
        { MESSAGING_SYSTEM } = "kafka",
        { MESSAGING_OPERATION } = "receive",
        { MESSAGING_DESTINATION } = topic,
        { MESSAGING_KAFKA_PARTITION } = partition,
        { MESSAGING_KAFKA_OFFSET } = offset,
        otel.status_code = tracing::field::Empty,
        error.message = tracing::field::Empty,
    )
}
/// Creates the DEBUG-level `kafka.commit` span for committing `offset` on one
/// topic-partition.
///
/// The consumer group field is filled in only when `group_id` is `Some`;
/// `otel.status_code` and `error.message` are declared empty so they can be
/// recorded later from inside the span.
#[inline]
pub fn kafka_commit_span(
    group_id: Option<&str>,
    topic: &str,
    partition: PartitionId,
    offset: i64,
) -> Span {
    let commit_span = tracing::debug_span!(
        "kafka.commit",
        { MESSAGING_SYSTEM } = "kafka",
        { MESSAGING_OPERATION } = "settle",
        { MESSAGING_DESTINATION } = topic,
        { MESSAGING_KAFKA_PARTITION } = partition,
        { MESSAGING_KAFKA_OFFSET } = offset,
        { MESSAGING_KAFKA_CONSUMER_GROUP } = tracing::field::Empty,
        otel.status_code = tracing::field::Empty,
        error.message = tracing::field::Empty,
    );

    if let Some(group) = group_id {
        commit_span.record(MESSAGING_KAFKA_CONSUMER_GROUP, group);
    }

    commit_span
}
/// Creates the INFO-level `kafka.admin` span for an admin `operation`.
///
/// Returns [`Span::none`] when INFO-level tracing is not enabled for this
/// module's target. The `resource` field is filled in only when `resource` is
/// `Some`; `otel.status_code` and `error.message` are declared empty so they
/// can be recorded later from inside the span.
#[inline]
pub fn kafka_admin_span(operation: &str, resource: Option<&str>) -> Span {
    if !tracing::enabled!(target: module_path!(), Level::INFO) {
        return Span::none();
    }

    let admin_span = tracing::info_span!(
        "kafka.admin",
        { MESSAGING_SYSTEM } = "kafka",
        operation = operation,
        resource = tracing::field::Empty,
        otel.status_code = tracing::field::Empty,
        error.message = tracing::field::Empty,
    );

    if let Some(target_resource) = resource {
        admin_span.record("resource", target_resource);
    }

    admin_span
}
/// Creates the DEBUG-level `kafka.connection` span for a connection-level
/// `operation` against the broker at `broker_address`.
#[inline]
pub fn kafka_connection_span(broker_address: &str, operation: &str) -> Span {
    tracing::debug_span!(
        "kafka.connection",
        { MESSAGING_SYSTEM } = "kafka",
        broker = broker_address,
        operation = operation,
    )
}
/// Creates the DEBUG-level `kafka.request` span for one protocol request,
/// tagged with the request's `api_key` and `correlation_id`.
#[inline]
pub fn kafka_request_span(api_key: &str, correlation_id: i32) -> Span {
    tracing::debug_span!(
        "kafka.request",
        { MESSAGING_SYSTEM } = "kafka",
        api_key = api_key,
        { KRAFKA_CORRELATION_ID } = correlation_id,
    )
}
/// Creates the DEBUG-level `kafka.group` span for a consumer-group
/// `operation` performed on behalf of `group_id`.
#[inline]
pub fn kafka_group_span(group_id: &str, operation: &str) -> Span {
    tracing::debug_span!(
        "kafka.group",
        { MESSAGING_SYSTEM } = "kafka",
        { MESSAGING_KAFKA_CONSUMER_GROUP } = group_id,
        operation = operation,
    )
}
/// Creates the INFO-level `kafka.rebalance` span describing a rebalance
/// `event` for `group_id` that involves `partition_count` partitions.
///
/// Returns [`Span::none`] when INFO-level tracing is not enabled for this
/// module's target.
#[inline]
pub fn kafka_rebalance_span(group_id: &str, event: &str, partition_count: usize) -> Span {
    if tracing::enabled!(target: module_path!(), Level::INFO) {
        tracing::info_span!(
            "kafka.rebalance",
            { MESSAGING_SYSTEM } = "kafka",
            { MESSAGING_KAFKA_CONSUMER_GROUP } = group_id,
            event = event,
            partition_count = partition_count,
        )
    } else {
        Span::none()
    }
}
/// Marks the current span as failed: sets `otel.status_code` to `"ERROR"` and
/// `error.message` to the error's `Display` text.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so this only has an effect inside
/// spans that declare both fields.
#[inline]
pub fn record_error(error: &dyn std::error::Error) {
    let current = Span::current();
    current.record("otel.status_code", "ERROR");
    let rendered = error.to_string();
    current.record("error.message", rendered.as_str());
}
/// Marks the current span as failed: sets `otel.status_code` to `"ERROR"` and
/// `error.message` to `message`.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so this only has an effect inside
/// spans that declare both fields.
#[inline]
pub fn record_error_message(message: &str) {
    Span::current()
        .record("otel.status_code", "ERROR")
        .record("error.message", message);
}
/// Marks the current span as successful by setting `otel.status_code` to `"OK"`.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so this only has an effect inside
/// spans that declare `otel.status_code`.
#[inline]
pub fn record_success() {
    Span::current().record("otel.status_code", "OK");
}
/// Records `count` in the current span's [`MESSAGING_BATCH_MESSAGE_COUNT`] field.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so the current span must declare
/// this field for the value to be kept.
#[inline]
pub fn record_batch_count(count: usize) {
    Span::current().record(MESSAGING_BATCH_MESSAGE_COUNT, count);
}
/// Records `size` in the current span's [`MESSAGING_MESSAGE_BODY_SIZE`] field.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so the current span must declare
/// this field for the value to be kept.
#[inline]
pub fn record_message_size(size: usize) {
    Span::current().record(MESSAGING_MESSAGE_BODY_SIZE, size);
}
/// Records `offset` in the current span's [`MESSAGING_KAFKA_OFFSET`] field.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so the current span must declare
/// this field for the value to be kept.
#[inline]
pub fn record_offset(offset: i64) {
    Span::current().record(MESSAGING_KAFKA_OFFSET, offset);
}
/// Records `partition` in the current span's [`MESSAGING_KAFKA_PARTITION`] field.
///
/// Per the `tracing` documentation, `Span::record` does nothing for a field
/// the span did not declare at creation, so the current span must declare
/// this field for the value to be kept.
#[inline]
pub fn record_partition(partition: PartitionId) {
    Span::current().record(MESSAGING_KAFKA_PARTITION, partition);
}
/// Unit tests for the span builders and `record_*` helpers.
///
/// Most tests are smoke tests: they only check that the call completes
/// without panicking and do not inspect the fields a subscriber would see.
#[cfg(test)]
// Test code is allowed to unwrap/expect/panic; a failure should abort the test.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    #[test]
    fn test_kafka_producer_span() {
        let _span = kafka_producer_span("test-topic", Some(0), Some(b"key"));
        // Key bytes that are not valid UTF-8 must be accepted as well.
        let _binary_key_span =
            kafka_producer_span("test-topic", Some(0), Some(&[0, 159, 146, 150]));
        let _span2 = kafka_producer_span("test-topic", None, None);
    }

    #[test]
    fn test_sha256_hex_for_message_key() {
        assert_eq!(
            sha256_hex(b"key"),
            "2c70e12b7a0646f92279f427c7b38e7334d8e5389cff167a1dc30e73f826b683"
        );
    }

    #[test]
    fn test_sha256_hex_for_empty_input() {
        // Well-known SHA-256 digest of the empty byte string.
        let digest = sha256_hex(b"");
        assert_eq!(
            digest,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(digest.len(), 64);
    }

    #[test]
    fn test_kafka_consumer_poll_span() {
        let topics = vec!["topic1".to_string(), "topic2".to_string()];
        let _span = kafka_consumer_poll_span(Some("my-group"), &topics);
        let _span2 = kafka_consumer_poll_span(None, &topics);
    }

    #[test]
    fn test_kafka_fetch_span() {
        let _span = kafka_fetch_span("test-topic", 0, 100);
    }

    #[test]
    fn test_kafka_commit_span() {
        let _span = kafka_commit_span(Some("my-group"), "test-topic", 0, 100);
        let _span2 = kafka_commit_span(None, "test-topic", 1, 200);
    }

    #[test]
    fn test_kafka_admin_span() {
        let _span = kafka_admin_span("create_topic", Some("new-topic"));
        let _span2 = kafka_admin_span("list_topics", None);
    }

    #[test]
    fn test_kafka_connection_span() {
        let _span = kafka_connection_span("localhost:9092", "connect");
    }

    #[test]
    fn test_kafka_request_span() {
        let _span = kafka_request_span("Produce", 42);
    }

    #[test]
    fn test_kafka_group_span() {
        let _span = kafka_group_span("my-group", "join");
    }

    #[test]
    fn test_kafka_rebalance_span() {
        let _span = kafka_rebalance_span("my-group", "assigned", 3);
    }

    #[test]
    fn test_record_helpers() {
        record_batch_count(10);
        record_message_size(1024);
        record_offset(12345);
        record_partition(0);
        record_success();
        record_error_message("test error");
    }

    #[test]
    fn test_record_error_accepts_std_error() {
        // Any `std::error::Error` implementor must be accepted via `&dyn Error`.
        record_error(&std::fmt::Error);
    }

    #[test]
    fn test_semantic_conventions() {
        assert_eq!(MESSAGING_SYSTEM, "messaging.system");
        assert_eq!(MESSAGING_DESTINATION, "messaging.destination.name");
        assert_eq!(MESSAGING_OPERATION, "messaging.operation");
        assert_eq!(
            MESSAGING_KAFKA_PARTITION,
            "messaging.kafka.destination.partition"
        );
        assert_eq!(MESSAGING_KAFKA_OFFSET, "messaging.kafka.message.offset");
        assert_eq!(
            MESSAGING_KAFKA_CONSUMER_GROUP,
            "messaging.kafka.consumer.group"
        );
    }
}