use std::sync::Arc;
use std::time::Instant;
use folk_api::{CounterVec, GaugeVec, HistogramVec, MetricsRegistry};
/// Handles to the gRPC metric families registered by [`GrpcMetrics::new`].
///
/// Every field is an `Arc`, so the derived `Clone` only copies the handles;
/// clones refer to the same underlying metric families.
#[derive(Clone)]
pub struct GrpcMetrics {
/// Counter `folk_grpc_requests_total`, labelled by `service`, `method` and
/// `grpc_status`.
pub requests_total: Arc<dyn CounterVec>,
/// Histogram `folk_grpc_request_duration_seconds`, labelled by `service`
/// and `method`.
pub request_duration_seconds: Arc<dyn HistogramVec>,
/// Histogram `folk_grpc_message_sent_bytes`, labelled by `service` and
/// `method`.
pub message_sent_bytes: Arc<dyn HistogramVec>,
/// Histogram `folk_grpc_message_received_bytes`, labelled by `service` and
/// `method`.
pub message_received_bytes: Arc<dyn HistogramVec>,
/// Gauge `folk_grpc_active_streams`, registered without labels. It is
/// incremented by `track_start` and decremented again once the returned
/// tracker has been finished.
pub active_streams: Arc<dyn GaugeVec>,
/// Counter `folk_grpc_errors_total`, labelled by `service`, `method` and
/// `type`.
pub errors_total: Arc<dyn CounterVec>,
}
impl GrpcMetrics {
    /// Registers all gRPC metric families on `registry` and bundles the
    /// resulting handles.
    ///
    /// Families are registered in a fixed order: request counter, duration
    /// histogram, sent-bytes histogram, received-bytes histogram, active
    /// streams gauge, error counter.
    pub fn new(registry: &dyn MetricsRegistry) -> Self {
        let requests_total = registry.counter_vec(
            "folk_grpc_requests_total",
            "Total number of gRPC calls",
            &["service", "method", "grpc_status"],
        );
        let request_duration_seconds = registry.histogram_vec(
            "folk_grpc_request_duration_seconds",
            "gRPC request processing duration in seconds",
            &["service", "method"],
        );
        let message_sent_bytes = registry.histogram_vec(
            "folk_grpc_message_sent_bytes",
            "Size of sent gRPC messages in bytes",
            &["service", "method"],
        );
        let message_received_bytes = registry.histogram_vec(
            "folk_grpc_message_received_bytes",
            "Size of received gRPC messages in bytes",
            &["service", "method"],
        );
        // The gauge is a single unlabelled series.
        let active_streams = registry.gauge_vec(
            "folk_grpc_active_streams",
            "Number of active gRPC streams",
            &[],
        );
        let errors_total = registry.counter_vec(
            "folk_grpc_errors_total",
            "Total gRPC errors by type",
            &["service", "method", "type"],
        );

        Self {
            requests_total,
            request_duration_seconds,
            message_sent_bytes,
            message_received_bytes,
            active_streams,
            errors_total,
        }
    }

    /// Marks the beginning of a request.
    ///
    /// Bumps the active-streams gauge, then captures the current instant and
    /// a clone of the metric handles in a [`RequestTracker`]. Pass the
    /// tracker's outcome to [`RequestTracker::finish`] when the request ends.
    pub fn track_start(&self) -> RequestTracker {
        self.active_streams.with_labels(&[]).inc();

        let start = Instant::now();
        let metrics = self.clone();
        RequestTracker { start, metrics }
    }
}
/// State for a single in-flight request, created by
/// `GrpcMetrics::track_start` and consumed by `finish`.
pub struct RequestTracker {
/// Instant captured when tracking started; `finish` reports the time
/// elapsed since this point.
start: Instant,
/// Clone of the metric handles the request outcome is recorded on.
metrics: GrpcMetrics,
}
impl RequestTracker {
    /// Records the outcome of the tracked request and consumes the tracker.
    ///
    /// # Parameters
    /// - `service`, `method`: label values applied to every recorded series.
    /// - `grpc_status`: numeric gRPC status code. It is stringified for the
    ///   `grpc_status` label; any non-zero value additionally increments the
    ///   error counter with the code's symbolic name as the `type` label.
    /// - `recv_bytes`, `sent_bytes`: message sizes observed on the
    ///   received/sent histograms.
    ///
    /// The active-streams gauge is released by the `Drop` impl below when
    /// `self` goes out of scope at the end of this call.
    pub fn finish(
        self,
        service: &str,
        method: &str,
        grpc_status: u32,
        recv_bytes: usize,
        sent_bytes: usize,
    ) {
        let duration = self.start.elapsed().as_secs_f64();
        let status_str = grpc_status.to_string();

        self.metrics
            .requests_total
            .with_labels(&[service, method, &status_str])
            .inc();
        self.metrics
            .request_duration_seconds
            .with_labels(&[service, method])
            .observe(duration);
        self.metrics
            .message_received_bytes
            .with_labels(&[service, method])
            .observe(recv_bytes as f64);
        self.metrics
            .message_sent_bytes
            .with_labels(&[service, method])
            .observe(sent_bytes as f64);

        // Status 0 is the only code treated as success.
        if grpc_status != 0 {
            let error_type = grpc_status_name(grpc_status);
            self.metrics
                .errors_total
                .with_labels(&[service, method, error_type])
                .inc();
        }
        // `self` is dropped here, which decrements the active-streams gauge.
    }
}

/// Releases the active-streams slot taken in `GrpcMetrics::track_start`.
///
/// Doing this in `Drop` rather than at the end of `finish` guarantees the
/// gauge is decremented exactly once on every path, including trackers that
/// are dropped without `finish` being called (early return, cancelled
/// future, panic). Previously those paths left the gauge permanently
/// inflated.
impl Drop for RequestTracker {
    fn drop(&mut self) {
        self.metrics.active_streams.with_labels(&[]).dec();
    }
}
/// Maps a numeric gRPC status code to its canonical upper-case name.
///
/// Covers the full standard code set (0 through 16). Any value outside that
/// range yields `"OTHER"` so the label set stays bounded.
fn grpc_status_name(code: u32) -> &'static str {
    match code {
        0 => "OK",
        1 => "CANCELLED",
        2 => "UNKNOWN",
        3 => "INVALID_ARGUMENT",
        4 => "DEADLINE_EXCEEDED",
        5 => "NOT_FOUND",
        6 => "ALREADY_EXISTS",
        7 => "PERMISSION_DENIED",
        8 => "FAILED_PRECONDITION",
        9 => "ABORTED",
        10 => "OUT_OF_RANGE",
        11 => "RESOURCE_EXHAUSTED",
        12 => "UNIMPLEMENTED",
        13 => "INTERNAL",
        14 => "UNAVAILABLE",
        15 => "DATA_LOSS",
        16 => "UNAUTHENTICATED",
        _ => "OTHER",
    }
}