rustfs-kafka 1.2.0

Rust client for Apache Kafka
Documentation
//! Low-level Kafka protocol transport utilities.
//!
//! Provides functions for sending requests and receiving responses over
//! a [`KafkaConnection`], handling the Kafka wire protocol frame format
//! (4-byte length prefix + header + body).

use tracing::trace;

use crate::error::{Error, Result};

use crate::network::KafkaConnection;

pub(crate) fn kp_send_request<T>(
    conn: &mut KafkaConnection,
    header: &kafka_protocol::messages::RequestHeader,
    body: &T,
    api_version: i16,
) -> Result<()>
where
    T: kafka_protocol::protocol::Encodable + kafka_protocol::protocol::HeaderVersion,
{
    use bytes::BytesMut;
    use kafka_protocol::protocol::Encodable;

    let header_version = T::header_version(api_version);

    let mut header_buf = BytesMut::new();
    header
        .encode(&mut header_buf, header_version)
        .map_err(|_| Error::codec())?;

    let mut body_buf = BytesMut::new();
    body.encode(&mut body_buf, api_version)
        .map_err(|_| Error::codec())?;

    let total_len = crate::protocol::usize_to_i32(header_buf.len() + body_buf.len())?;
    let out_len = crate::protocol::non_negative_i32_to_usize(total_len)?;
    let mut out = BytesMut::with_capacity(4 + out_len);
    out.extend_from_slice(&total_len.to_be_bytes());
    out.extend_from_slice(&header_buf);
    out.extend_from_slice(&body_buf);

    trace!("kp_send_request: sending {} bytes", out.len());
    conn.send(&out)?;
    Ok(())
}

pub(crate) fn kp_get_response<
    R: kafka_protocol::protocol::Decodable + kafka_protocol::protocol::HeaderVersion,
>(
    conn: &mut KafkaConnection,
    api_version: i16,
) -> Result<R> {
    use kafka_protocol::messages::ResponseHeader;
    use kafka_protocol::protocol::Decodable;

    let size = get_response_size(conn)?;
    let mut bytes = conn.read_exact_alloc(crate::protocol::non_negative_i32_to_u64(size)?)?;
    let response_header_version = R::header_version(api_version);
    let _resp_header =
        ResponseHeader::decode(&mut bytes, response_header_version).map_err(|_| Error::codec())?;

    R::decode(&mut bytes, api_version).map_err(|_| Error::codec())
}

pub(crate) fn get_response_size(conn: &mut KafkaConnection) -> Result<i32> {
    let mut buf = [0u8; 4];
    conn.read_exact(&mut buf)?;
    Ok(i32::from_be_bytes(buf))
}