use bytes::{Buf, BufMut, Bytes};
use super::{VersionedDecode, VersionedEncode};
use crate::error::{ErrorCode, KrafkaError, ProtocolErrorKind, Result};
use crate::protocol::check_compact_array_len;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
#[derive(Debug, Clone)]
pub struct GetTelemetrySubscriptionsRequest {
pub client_instance_id: [u8; 16],
}
impl GetTelemetrySubscriptionsRequest {
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_slice(&self.client_instance_id);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetTelemetrySubscriptionsResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub client_instance_id: [u8; 16],
pub subscription_id: i32,
pub accepted_compression_types: Vec<i8>,
pub push_interval_ms: i32,
pub telemetry_max_bytes: i32,
pub delta_temporality: bool,
pub requested_metrics: Vec<String>,
}
impl GetTelemetrySubscriptionsResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let mut client_instance_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for client_instance_id UUID",
));
}
buf.copy_to_slice(&mut client_instance_id);
let subscription_id = i32::decode(buf)?;
let ct_count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut accepted_compression_types = Vec::with_capacity(ct_count);
for _ in 0..ct_count {
accepted_compression_types.push(i8::decode(buf)?);
}
let push_interval_ms = i32::decode(buf)?;
let telemetry_max_bytes = i32::decode(buf)?;
let delta_temporality = {
if buf.remaining() < 1 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for delta_temporality",
));
}
buf.get_u8() != 0
};
let rm_count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut requested_metrics = Vec::with_capacity(rm_count);
for _ in 0..rm_count {
let s = super::non_nullable_string(
"requested metric",
KafkaString::decode_compact(buf)?.0,
)?;
requested_metrics.push(s);
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
client_instance_id,
subscription_id,
accepted_compression_types,
push_interval_ms,
telemetry_max_bytes,
delta_temporality,
requested_metrics,
})
}
}
#[derive(Debug, Clone)]
pub struct PushTelemetryRequest {
pub client_instance_id: [u8; 16],
pub subscription_id: i32,
pub terminating: bool,
pub compression_type: i8,
pub metrics: Bytes,
}
impl PushTelemetryRequest {
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_slice(&self.client_instance_id);
self.subscription_id.encode(buf);
buf.put_u8(u8::from(self.terminating));
self.compression_type.encode(buf);
let metrics_len = u32::try_from(self.metrics.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidLength,
"metrics payload too large",
)
})?;
crate::util::varint::encode_unsigned_varint(metrics_len, buf);
buf.put_slice(&self.metrics);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct PushTelemetryResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
}
impl PushTelemetryResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
})
}
}
impl VersionedEncode for GetTelemetrySubscriptionsRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
0 => self.encode_v0(buf)?,
_ => return unsupported_encode!("GetTelemetrySubscriptionsRequest", version),
}
Ok(())
}
}
impl VersionedDecode for GetTelemetrySubscriptionsResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 => Self::decode_v0(buf),
_ => unsupported_decode!("GetTelemetrySubscriptionsResponse", version),
}
}
}
impl VersionedEncode for PushTelemetryRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
0 => self.encode_v0(buf)?,
_ => return unsupported_encode!("PushTelemetryRequest", version),
}
Ok(())
}
}
impl VersionedDecode for PushTelemetryResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 => Self::decode_v0(buf),
_ => unsupported_decode!("PushTelemetryResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::util::varint;
use bytes::BytesMut;
#[test]
fn test_get_telemetry_subscriptions_encode_v0() {
let req = GetTelemetrySubscriptionsRequest {
client_instance_id: [0u8; 16],
};
let mut buf = BytesMut::new();
req.encode_versioned(0, &mut buf).unwrap();
assert_eq!(buf.len(), 17);
}
#[test]
fn test_get_telemetry_subscriptions_response_decode_v0() {
let mut buf = BytesMut::new();
buf.put_i32(100); buf.put_i16(0); buf.put_slice(&[7u8; 16]); buf.put_i32(42); varint::encode_unsigned_varint(3, &mut buf); buf.put_i8(0);
buf.put_i8(1);
buf.put_i32(30_000); buf.put_i32(1_048_576); buf.put_u8(1); varint::encode_unsigned_varint(2, &mut buf); let metric = b"cpu.usage";
varint::encode_unsigned_varint((metric.len() + 1) as u32, &mut buf);
buf.put_slice(metric);
varint::encode_unsigned_varint(0, &mut buf);
let resp =
GetTelemetrySubscriptionsResponse::decode_versioned(0, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 100);
assert_eq!(resp.client_instance_id, [7u8; 16]);
assert_eq!(resp.subscription_id, 42);
assert_eq!(resp.accepted_compression_types, vec![0, 1]);
assert_eq!(resp.push_interval_ms, 30_000);
assert_eq!(resp.telemetry_max_bytes, 1_048_576);
assert!(resp.delta_temporality);
assert_eq!(resp.requested_metrics, vec!["cpu.usage"]);
}
#[test]
fn test_push_telemetry_encode_v0() {
let req = PushTelemetryRequest {
client_instance_id: [5u8; 16],
subscription_id: 99,
terminating: true,
compression_type: 1,
metrics: Bytes::from_static(b"binary-data"),
};
let mut buf = BytesMut::new();
req.encode_versioned(0, &mut buf).unwrap();
assert!(!buf.is_empty());
let encoded = buf.freeze();
assert!(
encoded.windows(11).any(|w| w == b"binary-data"),
"expected metrics payload in output"
);
}
#[test]
fn test_push_telemetry_response_decode_v0() {
let mut buf = BytesMut::new();
buf.put_i32(50); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
let resp = PushTelemetryResponse::decode_versioned(0, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 50);
assert_eq!(resp.error_code, ErrorCode::None);
}
}