1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
//! `PushTelemetry` (`api_key=72`, KIP-714). Validates the push against the
//! client's subscription + throttle state, decompresses + decodes the OTLP
//! payload, and fans it out to the Prometheus + OTLP sinks.
use bytes::{Bytes, BytesMut};
use uuid::Uuid;
use crabka_compression::CompressionType;
use crabka_protocol::owned::push_telemetry_request::PushTelemetryRequest;
use crabka_protocol::owned::push_telemetry_response::PushTelemetryResponse;
use crabka_protocol::{Decode, Encode};
use crate::broker::Broker;
use crate::client_metrics::manager::PushDecision;
use crate::client_metrics::otlp;
use crate::client_metrics::prometheus_sink::DataPoint;
use crate::codes;
use crate::error::BrokerError;
use crate::handlers::context::TelemetryContext;
/// Handles one `PushTelemetry` request and returns the encoded response bytes.
///
/// The request is decoded from `req_bytes` at `version` and submitted to
/// `authorize_push`. A rejected push only echoes the error code and throttle
/// time chosen by the manager. An accepted push is decompressed, decoded as
/// OTLP metrics and handed to the Prometheus and OTLP sinks; a failure in
/// either of those steps is logged at debug level and the response still
/// carries `codes::NONE`.
///
/// # Errors
///
/// Propagates failures from decoding the request or encoding the response.
///
/// # Panics
///
/// Panics if the push is accepted although the compression type was not
/// recognised.
#[allow(clippy::unused_async)] // signature symmetry with other inline-intercept handlers
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &TelemetryContext<'_>,
) -> Result<Bytes, BrokerError> {
    let mut cursor: &[u8] = req_bytes;
    let request = PushTelemetryRequest::decode(&mut cursor, version)?;
    let instance = Uuid::from_bytes(request.client_instance_id.0);

    // Compression ids that do not fit in a u8 are replaced by 0xff before the
    // codec lookup (presumably never a known codec id).
    let codec_bits = u8::try_from(request.compression_type).unwrap_or(0xff);
    let codec = CompressionType::from_attribute_bits(codec_bits);

    let (error_code, throttle_time_ms) = match broker.client_metrics.manager.authorize_push(
        instance,
        request.subscription_id,
        request.terminating,
        codec.is_some(),
        request.metrics.len(),
    ) {
        PushDecision::Accept { .. } => {
            // authorize_push guarantees compression is supported on Accept.
            // A terminating push that later fails to decode still fences the
            // instance and drops those metrics (best-effort, matches Kafka).
            let ct = codec.expect("authorize_push guarantees a supported codec on Accept");

            // Decompression-bomb guard: allow at most 100x the compressed
            // size, but never less than 16 MiB nor more than 1 GiB.
            let floor = 16 * 1024 * 1024;
            let ceiling = 1024 * 1024 * 1024;
            let max_output = request
                .metrics
                .len()
                .saturating_mul(100)
                .clamp(floor, ceiling);

            match crabka_compression::decompress(ct, &request.metrics, max_output) {
                Err(e) => tracing::debug!(error = %e, "client-metrics decompress failed"),
                Ok(raw) => match otlp::decode_metrics(&raw) {
                    Err(e) => tracing::debug!(error = %e, "client-metrics OTLP decode failed"),
                    Ok(md) => {
                        let instance_label = instance.to_string();
                        let samples = flatten_for_prometheus(&md, &instance_label, ctx.client_id);
                        broker.client_metrics.prometheus.ingest(&samples);
                        broker.client_metrics.otlp.forward(md, &instance_label);
                    }
                },
            }

            (codes::NONE, 0i32)
        }
        PushDecision::Reject {
            error_code,
            throttle_ms,
        } => (error_code, throttle_ms),
    };

    let resp = PushTelemetryResponse {
        throttle_time_ms,
        error_code,
        ..Default::default()
    };
    let mut out = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut out, version)?;
    Ok(out.freeze())
}
/// Converts an OTLP `MetricsData` into a flat list of Prometheus data points.
///
/// Gauge and Sum data points that carry a value become one sample named after
/// the metric. Each Histogram data point becomes a `<name>_count` sample and,
/// when a sum is present, a `<name>_sum` sample. Metrics with any other data
/// shape, or with no data at all, are skipped. Every sample is labelled with
/// the given `instance` and `client_id`.
fn flatten_for_prometheus(
    md: &opentelemetry_proto::tonic::metrics::v1::MetricsData,
    instance: &str,
    client_id: &str,
) -> Vec<DataPoint> {
    use opentelemetry_proto::tonic::metrics::v1::{metric::Data, number_data_point::Value};

    // Single place where a sample is assembled, so the labels stay uniform.
    let sample = |metric: String, value: f64| DataPoint {
        metric,
        client_instance_id: instance.to_string(),
        client_id: client_id.to_string(),
        value,
    };

    #[allow(clippy::cast_precision_loss)]
    // i64→f64 is lossy for very large magnitudes; acceptable for telemetry display
    let as_f64 = |v: &Value| -> f64 {
        match v {
            Value::AsInt(i) => *i as f64,
            Value::AsDouble(d) => *d,
        }
    };

    let all_metrics = md
        .resource_metrics
        .iter()
        .flat_map(|rm| rm.scope_metrics.iter())
        .flat_map(|sm| sm.metrics.iter());

    let mut out = Vec::new();
    for m in all_metrics {
        match &m.data {
            Some(Data::Gauge(gauge)) => out.extend(
                gauge
                    .data_points
                    .iter()
                    .filter_map(|dp| dp.value.as_ref())
                    .map(|v| sample(m.name.clone(), as_f64(v))),
            ),
            Some(Data::Sum(sum)) => out.extend(
                sum.data_points
                    .iter()
                    .filter_map(|dp| dp.value.as_ref())
                    .map(|v| sample(m.name.clone(), as_f64(v))),
            ),
            Some(Data::Histogram(hist)) => {
                for dp in &hist.data_points {
                    #[allow(clippy::cast_precision_loss)]
                    // u64→f64 is lossy for very large counts; acceptable for telemetry display
                    let count = dp.count as f64;
                    out.push(sample(format!("{}_count", m.name), count));
                    if let Some(total) = dp.sum {
                        out.push(sample(format!("{}_sum", m.name), total));
                    }
                }
            }
            _ => {}
        }
    }
    out
}