Skip to main content

crabka_protocol/records/metadata/
envelope.rs

1//! The `KRaft` metadata record-value envelope (`MetadataRecordSerde` /
2//! `ApiMessageAndVersion`): a record value is
3//! `frameVersion (uvarint, 1) + apiKey (uvarint) + apiVersion (uvarint) + body`.
4
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6
7use crate::primitives::varint::{get_uvarint, put_uvarint, uvarint_len};
8
/// Current `KRaft` metadata frame version. Kafka writes 1 (verified against
/// apache/kafka:4.0.0 `__cluster_metadata` record values — the leading byte is
/// `0x01`).
///
/// [`encode_value`] writes this constant as the first unsigned varint of every
/// record value it produces.
pub const FRAME_VERSION: u32 = 1;
13
/// Decoded envelope header (everything before the message body).
///
/// Fields are listed in wire order: [`decode_value_header`] reads one unsigned
/// varint per field, top to bottom.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ValueHeader {
    /// First varint of the value: the envelope frame version.
    pub frame_version: u32,
    /// Second varint of the value: the API key of the message in the body.
    pub api_key: u32,
    /// Third varint of the value: the version of the message in the body.
    pub api_version: u32,
}
21
/// Error decoding a metadata record value envelope.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeError {
    /// One of the three header varints could not be read.
    ///
    /// [`decode_value_header`] maps every varint read failure to this variant,
    /// so the underlying cause is not preserved.
    #[error("truncated metadata record value envelope")]
    Truncated,
}
28
29/// Encode a record value: envelope header + the already-encoded `body` bytes.
30#[must_use]
31pub fn encode_value(api_key: u32, api_version: u32, body: &[u8]) -> Bytes {
32    let mut out = BytesMut::with_capacity(
33        uvarint_len(FRAME_VERSION) + uvarint_len(api_key) + uvarint_len(api_version) + body.len(),
34    );
35    put_uvarint(&mut out, FRAME_VERSION);
36    put_uvarint(&mut out, api_key);
37    put_uvarint(&mut out, api_version);
38    out.put_slice(body);
39    out.freeze()
40}
41
42/// Decode the envelope header, leaving `buf` positioned at the message body.
43///
44/// # Errors
45/// Returns [`EnvelopeError::Truncated`] if any varint cannot be read.
46pub fn decode_value_header<B: Buf>(buf: &mut B) -> Result<ValueHeader, EnvelopeError> {
47    let frame_version = get_uvarint(buf).map_err(|_| EnvelopeError::Truncated)?;
48    let api_key = get_uvarint(buf).map_err(|_| EnvelopeError::Truncated)?;
49    let api_version = get_uvarint(buf).map_err(|_| EnvelopeError::Truncated)?;
50    Ok(ValueHeader {
51        frame_version,
52        api_key,
53        api_version,
54    })
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Encoding then decoding yields the same header and leaves the cursor on
    /// the untouched body bytes.
    #[test]
    fn frame_one_apikey_apiversion_roundtrip() {
        // FeatureLevelRecord apiKey=12 apiVersion=0, body bytes b"\x01\x02".
        let body: &[u8] = &[0x01, 0x02];
        let value = encode_value(12, 0, body);
        // frameVersion(1)=1 byte, apiKey(12)=1 byte, apiVersion(0)=1 byte, +body.
        assert!(value.len() == 3 + body.len());
        // The frame version is written first; Kafka's values start with 0x01.
        assert!(value[0] == 0x01);
        let mut cur: &[u8] = &value;
        let hdr = decode_value_header(&mut cur).expect("decode header");
        assert!(hdr.frame_version == FRAME_VERSION);
        assert!(hdr.api_key == 12);
        assert!(hdr.api_version == 0);
        assert!(cur == body);
    }

    /// An empty value fails on the very first varint.
    #[test]
    fn truncated_value_errors() {
        let mut cur: &[u8] = &[]; // no frameVersion byte
        assert!(matches!(
            decode_value_header(&mut cur),
            Err(EnvelopeError::Truncated)
        ));
    }

    /// A value that ends after the frame version fails on the apiKey read.
    #[test]
    fn truncated_after_frame_version_errors() {
        let mut cur: &[u8] = &[0x01]; // frameVersion only, no apiKey/apiVersion
        assert!(matches!(
            decode_value_header(&mut cur),
            Err(EnvelopeError::Truncated)
        ));
    }
}