use std::time::Duration;
use kafka_protocol::protocol::Request;
pub(crate) struct ProducerRequestMetrics<'a> {
pub client_id: &'a str,
pub leader_id: i32,
pub transactional: bool,
pub expects_response: bool,
pub batch_count: usize,
pub record_count: usize,
pub batch_bytes: usize,
pub oldest_batch_age_ms: u128,
}
pub(crate) struct ProducerBatchMetrics<'a> {
pub client_id: &'a str,
pub topic: &'a str,
pub partition: i32,
pub record_count: usize,
pub duration: Duration,
}
pub(crate) struct ProducerAccumulatorMetrics<'a> {
pub client_id: &'a str,
pub batch_count: usize,
pub record_count: usize,
pub estimated_bytes: usize,
pub retrying_batch_count: usize,
pub oldest_batch_age_ms: u128,
}
pub(crate) fn record_broker_connection(
client_id: &str,
address: &str,
security_protocol: &str,
duration: Duration,
success: bool,
) {
let result = result_label(success);
metrics::counter!(
"kafkit.client.broker.connections.total",
"client_id" => client_id.to_owned(),
"address" => address.to_owned(),
"security_protocol" => security_protocol.to_owned(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.broker.connection.duration.ms",
"client_id" => client_id.to_owned(),
"address" => address.to_owned(),
"security_protocol" => security_protocol.to_owned(),
"result" => result
)
.record(duration_ms(duration));
}
pub(crate) fn record_kafka_request<Req>(
client_id: &str,
version: i16,
request_bytes: usize,
response_bytes: usize,
duration: Duration,
success: bool,
expects_response: bool,
) where
Req: Request,
{
let api_key = Req::KEY.to_string();
let api_version = version.to_string();
let expects_response_label = expects_response.to_string();
let result = result_label(success);
metrics::counter!(
"kafkit.client.kafka.requests.total",
"client_id" => client_id.to_owned(),
"api_key" => api_key.clone(),
"api_version" => api_version.clone(),
"expects_response" => expects_response_label.clone(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.kafka.request.duration.ms",
"client_id" => client_id.to_owned(),
"api_key" => api_key.clone(),
"api_version" => api_version.clone(),
"expects_response" => expects_response_label.clone(),
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.kafka.request.bytes",
"client_id" => client_id.to_owned(),
"api_key" => api_key.clone(),
"api_version" => api_version.clone(),
"expects_response" => expects_response_label.clone(),
"result" => result
)
.record(request_bytes as f64);
metrics::counter!(
"kafkit.client.kafka.request.bytes.total",
"client_id" => client_id.to_owned(),
"api_key" => api_key.clone(),
"api_version" => api_version.clone(),
"expects_response" => expects_response_label,
"result" => result
)
.increment(request_bytes as u64);
if expects_response {
metrics::histogram!(
"kafkit.client.kafka.response.bytes",
"client_id" => client_id.to_owned(),
"api_key" => api_key.clone(),
"api_version" => api_version.clone(),
"result" => result
)
.record(response_bytes as f64);
metrics::counter!(
"kafkit.client.kafka.response.bytes.total",
"client_id" => client_id.to_owned(),
"api_key" => api_key,
"api_version" => api_version,
"result" => result
)
.increment(response_bytes as u64);
}
}
pub(crate) fn record_producer_request_dispatched(metrics: &ProducerRequestMetrics<'_>) {
let leader_id = metrics.leader_id.to_string();
let transactional = metrics.transactional.to_string();
let expects_response = metrics.expects_response.to_string();
metrics::counter!(
"kafkit.client.producer.produce.requests.dispatched.total",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional.clone(),
"expects_response" => expects_response
)
.increment(1);
metrics::counter!(
"kafkit.client.producer.records.dispatched.total",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional.clone()
)
.increment(metrics.record_count as u64);
metrics::counter!(
"kafkit.client.producer.batches.dispatched.total",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional
)
.increment(metrics.batch_count as u64);
metrics::histogram!(
"kafkit.client.producer.batch.oldest.age.ms",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id,
"transactional" => metrics.transactional.to_string()
)
.record(metrics.oldest_batch_age_ms as f64);
}
pub(crate) fn record_producer_request_completed(
metrics: &ProducerRequestMetrics<'_>,
duration: Duration,
success: bool,
) {
let leader_id = metrics.leader_id.to_string();
let transactional = metrics.transactional.to_string();
let result = result_label(success);
metrics::counter!(
"kafkit.client.producer.produce.requests.completed.total",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional.clone(),
"expects_response" => metrics.expects_response.to_string(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.producer.produce.request.duration.ms",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional.clone(),
"expects_response" => metrics.expects_response.to_string(),
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.producer.produce.request.records",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional.clone(),
"result" => result
)
.record(metrics.record_count as f64);
metrics::histogram!(
"kafkit.client.producer.produce.request.batches",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id.clone(),
"transactional" => transactional.clone(),
"result" => result
)
.record(metrics.batch_count as f64);
metrics::histogram!(
"kafkit.client.producer.produce.request.bytes",
"client_id" => metrics.client_id.to_owned(),
"leader_id" => leader_id,
"transactional" => transactional,
"result" => result
)
.record(metrics.batch_bytes as f64);
}
pub(crate) fn record_producer_record_queued(client_id: &str, topic: &str, partition: i32) {
metrics::counter!(
"kafkit.client.producer.records.queued.total",
"client_id" => client_id.to_owned(),
"topic" => topic.to_owned(),
"partition" => partition.to_string()
)
.increment(1);
}
pub(crate) fn record_producer_batch_opened(client_id: &str, topic: &str, partition: i32) {
metrics::counter!(
"kafkit.client.producer.batches.opened.total",
"client_id" => client_id.to_owned(),
"topic" => topic.to_owned(),
"partition" => partition.to_string()
)
.increment(1);
}
pub(crate) fn record_producer_accumulator(metrics: &ProducerAccumulatorMetrics<'_>) {
metrics::gauge!(
"kafkit.client.producer.accumulator.batches",
"client_id" => metrics.client_id.to_owned()
)
.set(metrics.batch_count as f64);
metrics::gauge!(
"kafkit.client.producer.accumulator.records",
"client_id" => metrics.client_id.to_owned()
)
.set(metrics.record_count as f64);
metrics::gauge!(
"kafkit.client.producer.accumulator.bytes",
"client_id" => metrics.client_id.to_owned()
)
.set(metrics.estimated_bytes as f64);
metrics::gauge!(
"kafkit.client.producer.accumulator.retrying_batches",
"client_id" => metrics.client_id.to_owned()
)
.set(metrics.retrying_batch_count as f64);
metrics::histogram!(
"kafkit.client.producer.accumulator.oldest_batch.age.ms",
"client_id" => metrics.client_id.to_owned()
)
.record(metrics.oldest_batch_age_ms as f64);
}
pub(crate) fn record_producer_batch_succeeded(metrics: &ProducerBatchMetrics<'_>) {
metrics::counter!(
"kafkit.client.producer.batches.succeeded.total",
"client_id" => metrics.client_id.to_owned(),
"topic" => metrics.topic.to_owned(),
"partition" => metrics.partition.to_string()
)
.increment(1);
metrics::counter!(
"kafkit.client.producer.records.succeeded.total",
"client_id" => metrics.client_id.to_owned(),
"topic" => metrics.topic.to_owned(),
"partition" => metrics.partition.to_string()
)
.increment(metrics.record_count as u64);
record_producer_batch_duration(metrics, true, None);
}
pub(crate) fn record_producer_batch_failed(
metrics: &ProducerBatchMetrics<'_>,
reason: &'static str,
error: &str,
) {
metrics::counter!(
"kafkit.client.producer.batches.failed.total",
"client_id" => metrics.client_id.to_owned(),
"topic" => metrics.topic.to_owned(),
"partition" => metrics.partition.to_string(),
"reason" => reason,
"error" => error.to_owned()
)
.increment(1);
metrics::counter!(
"kafkit.client.producer.records.failed.total",
"client_id" => metrics.client_id.to_owned(),
"topic" => metrics.topic.to_owned(),
"partition" => metrics.partition.to_string(),
"reason" => reason,
"error" => error.to_owned()
)
.increment(metrics.record_count as u64);
record_producer_batch_duration(metrics, false, Some(reason));
}
fn record_producer_batch_duration(
metrics: &ProducerBatchMetrics<'_>,
success: bool,
reason: Option<&'static str>,
) {
let result = result_label(success);
if let Some(reason) = reason {
metrics::histogram!(
"kafkit.client.producer.batch.delivery.duration.ms",
"client_id" => metrics.client_id.to_owned(),
"topic" => metrics.topic.to_owned(),
"partition" => metrics.partition.to_string(),
"result" => result,
"reason" => reason
)
.record(duration_ms(metrics.duration));
} else {
metrics::histogram!(
"kafkit.client.producer.batch.delivery.duration.ms",
"client_id" => metrics.client_id.to_owned(),
"topic" => metrics.topic.to_owned(),
"partition" => metrics.partition.to_string(),
"result" => result
)
.record(duration_ms(metrics.duration));
}
}
pub(crate) fn record_consumer_poll_started(client_id: &str, group_id: &str) {
metrics::counter!(
"kafkit.client.consumer.polls.started.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.increment(1);
}
pub(crate) fn record_consumer_poll_completed(
client_id: &str,
group_id: &str,
duration: Duration,
record_count: usize,
result: &'static str,
) {
metrics::counter!(
"kafkit.client.consumer.polls.completed.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.consumer.poll.duration.ms",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.consumer.poll.records",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"result" => result
)
.record(record_count as f64);
if record_count > 0 {
metrics::counter!(
"kafkit.client.consumer.records.delivered.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.increment(record_count as u64);
}
}
pub(crate) fn record_consumer_records_fetched(
client_id: &str,
group_id: &str,
record_count: usize,
partition_count: usize,
buffered_records: usize,
) {
metrics::counter!(
"kafkit.client.consumer.records.fetched.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.increment(record_count as u64);
metrics::histogram!(
"kafkit.client.consumer.fetch.records",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.record(record_count as f64);
metrics::histogram!(
"kafkit.client.consumer.fetch.partitions",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.record(partition_count as f64);
metrics::gauge!(
"kafkit.client.consumer.buffered.records",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.set(buffered_records as f64);
}
pub(crate) fn record_consumer_offset_reset(
client_id: &str,
group_id: &str,
topic: &str,
partition: i32,
) {
metrics::counter!(
"kafkit.client.consumer.offset_resets.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"topic" => topic.to_owned(),
"partition" => partition.to_string()
)
.increment(1);
}
pub(crate) fn record_consumer_commit_completed(
client_id: &str,
group_id: &str,
kind: &'static str,
offset_count: usize,
duration: Duration,
success: bool,
) {
let result = result_label(success);
metrics::counter!(
"kafkit.client.consumer.commits.completed.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"kind" => kind,
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.consumer.commit.duration.ms",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"kind" => kind,
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.consumer.commit.offsets",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"kind" => kind,
"result" => result
)
.record(offset_count as f64);
}
pub(crate) fn record_share_poll_completed(
client_id: &str,
group_id: &str,
duration: Duration,
record_count: usize,
result: &'static str,
) {
metrics::counter!(
"kafkit.client.share.polls.completed.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.share.poll.duration.ms",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.share.poll.records",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"result" => result
)
.record(record_count as f64);
if record_count > 0 {
metrics::counter!(
"kafkit.client.share.records.delivered.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned()
)
.increment(record_count as u64);
}
}
pub(crate) fn record_share_acknowledgement_queued(
client_id: &str,
group_id: &str,
topic: &str,
partition: i32,
acknowledge_type: &'static str,
) {
metrics::counter!(
"kafkit.client.share.acknowledgements.queued.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"topic" => topic.to_owned(),
"partition" => partition.to_string(),
"acknowledge_type" => acknowledge_type
)
.increment(1);
}
pub(crate) fn record_share_fetch_completed(
client_id: &str,
group_id: &str,
leader_id: i32,
record_count: usize,
duration: Duration,
success: bool,
) {
let result = result_label(success);
metrics::counter!(
"kafkit.client.share.fetches.completed.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.share.fetch.duration.ms",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string(),
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.share.fetch.records",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string(),
"result" => result
)
.record(record_count as f64);
if record_count > 0 {
metrics::counter!(
"kafkit.client.share.records.fetched.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string()
)
.increment(record_count as u64);
}
}
pub(crate) fn record_share_acknowledge_completed(
client_id: &str,
group_id: &str,
leader_id: i32,
acknowledgement_count: usize,
close_session: bool,
duration: Duration,
success: bool,
) {
let result = result_label(success);
metrics::counter!(
"kafkit.client.share.acknowledge.requests.completed.total",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string(),
"close_session" => close_session.to_string(),
"result" => result
)
.increment(1);
metrics::histogram!(
"kafkit.client.share.acknowledge.request.duration.ms",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string(),
"close_session" => close_session.to_string(),
"result" => result
)
.record(duration_ms(duration));
metrics::histogram!(
"kafkit.client.share.acknowledge.request.records",
"client_id" => client_id.to_owned(),
"group_id" => group_id.to_owned(),
"leader_id" => leader_id.to_string(),
"close_session" => close_session.to_string(),
"result" => result
)
.record(acknowledgement_count as f64);
}
pub(crate) fn result_label(success: bool) -> &'static str {
if success { "success" } else { "error" }
}
pub(crate) fn duration_ms(duration: Duration) -> f64 {
duration.as_secs_f64() * 1_000.0
}