use tracing::trace;
use crate::error::{Error, Result};
use crate::network::KafkaConnection;
pub(crate) fn kp_send_request<T>(
conn: &mut KafkaConnection,
header: &kafka_protocol::messages::RequestHeader,
body: &T,
api_version: i16,
) -> Result<()>
where
T: kafka_protocol::protocol::Encodable + kafka_protocol::protocol::HeaderVersion,
{
use bytes::BytesMut;
use kafka_protocol::protocol::Encodable;
let header_version = T::header_version(api_version);
let mut header_buf = BytesMut::new();
header
.encode(&mut header_buf, header_version)
.map_err(|_| Error::codec())?;
let mut body_buf = BytesMut::new();
body.encode(&mut body_buf, api_version)
.map_err(|_| Error::codec())?;
let total_len = crate::protocol::usize_to_i32(header_buf.len() + body_buf.len())?;
let out_len = crate::protocol::non_negative_i32_to_usize(total_len)?;
let mut out = BytesMut::with_capacity(4 + out_len);
out.extend_from_slice(&total_len.to_be_bytes());
out.extend_from_slice(&header_buf);
out.extend_from_slice(&body_buf);
trace!("kp_send_request: sending {} bytes", out.len());
conn.send(&out)?;
Ok(())
}
pub(crate) fn kp_get_response<
R: kafka_protocol::protocol::Decodable + kafka_protocol::protocol::HeaderVersion,
>(
conn: &mut KafkaConnection,
api_version: i16,
) -> Result<R> {
use kafka_protocol::messages::ResponseHeader;
use kafka_protocol::protocol::Decodable;
let size = get_response_size(conn)?;
let mut bytes = conn.read_exact_alloc(crate::protocol::non_negative_i32_to_u64(size)?)?;
let response_header_version = R::header_version(api_version);
let _resp_header =
ResponseHeader::decode(&mut bytes, response_header_version).map_err(|_| Error::codec())?;
R::decode(&mut bytes, api_version).map_err(|_| Error::codec())
}
pub(crate) fn get_response_size(conn: &mut KafkaConnection) -> Result<i32> {
let mut buf = [0u8; 4];
conn.read_exact(&mut buf)?;
Ok(i32::from_be_bytes(buf))
}