use bytes::{Buf, BufMut, Bytes};
use crabka_protocol::ProtocolError;
pub const API_KEY_SUBMIT_CHANGE: i16 = 1003;
pub const API_KEY_METADATA_FETCH: i16 = 1004;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrabkaSubmitChangeRequest {
pub records: Bytes,
}
impl CrabkaSubmitChangeRequest {
pub fn encode_v0(&self, out: &mut Vec<u8>) -> Result<(), ProtocolError> {
out.put_i32(
i32::try_from(self.records.len())
.map_err(|_| ProtocolError::InvalidValue("records length exceeds i32::MAX"))?,
);
out.put_slice(&self.records);
Ok(())
}
pub fn decode_v0(buf: &mut &[u8]) -> Result<Self, ProtocolError> {
if buf.remaining() < 4 {
return Err(ProtocolError::UnexpectedEof {
needed: 4 - buf.remaining(),
});
}
let len = buf.get_i32();
let len = usize::try_from(len)
.map_err(|_| ProtocolError::InvalidValue("negative records length"))?;
if buf.remaining() < len {
return Err(ProtocolError::UnexpectedEof {
needed: len - buf.remaining(),
});
}
let records = Bytes::copy_from_slice(&buf[..len]);
buf.advance(len);
Ok(Self { records })
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrabkaSubmitChangeResponse {
pub error_code: i16,
pub leader_hint: i64,
}
impl CrabkaSubmitChangeResponse {
pub fn encode_v0(&self, out: &mut Vec<u8>) {
out.put_i16(self.error_code);
out.put_i64(self.leader_hint);
}
pub fn decode_v0(buf: &mut &[u8]) -> Result<Self, ProtocolError> {
const LEN: usize = 2 + 8;
if buf.remaining() < LEN {
return Err(ProtocolError::UnexpectedEof {
needed: LEN - buf.remaining(),
});
}
Ok(Self {
error_code: buf.get_i16(),
leader_hint: buf.get_i64(),
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrabkaMetadataFetchRequest {
pub fetch_offset: i64,
pub max_bytes: i32,
}
impl CrabkaMetadataFetchRequest {
pub fn encode_v0(&self, out: &mut Vec<u8>) {
out.put_i64(self.fetch_offset);
out.put_i32(self.max_bytes);
}
pub fn decode_v0(buf: &mut &[u8]) -> Result<Self, ProtocolError> {
const LEN: usize = 8 + 4;
if buf.remaining() < LEN {
return Err(ProtocolError::UnexpectedEof {
needed: LEN - buf.remaining(),
});
}
Ok(Self {
fetch_offset: buf.get_i64(),
max_bytes: buf.get_i32(),
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrabkaMetadataFetchResponse {
pub error_code: i16,
pub leader_hint: i64,
pub log_start_offset: i64,
pub high_watermark: i64,
pub records: Bytes,
}
impl CrabkaMetadataFetchResponse {
pub fn encode_v0(&self, out: &mut Vec<u8>) -> Result<(), ProtocolError> {
out.put_i16(self.error_code);
out.put_i64(self.leader_hint);
out.put_i64(self.log_start_offset);
out.put_i64(self.high_watermark);
out.put_i32(
i32::try_from(self.records.len())
.map_err(|_| ProtocolError::InvalidValue("records length exceeds i32::MAX"))?,
);
out.put_slice(&self.records);
Ok(())
}
pub fn decode_v0(buf: &mut &[u8]) -> Result<Self, ProtocolError> {
const FIXED: usize = 2 + 8 + 8 + 8 + 4;
if buf.remaining() < FIXED {
return Err(ProtocolError::UnexpectedEof {
needed: FIXED - buf.remaining(),
});
}
let error_code = buf.get_i16();
let leader_hint = buf.get_i64();
let log_start_offset = buf.get_i64();
let high_watermark = buf.get_i64();
let len = buf.get_i32();
let len = usize::try_from(len)
.map_err(|_| ProtocolError::InvalidValue("negative records length"))?;
if buf.remaining() < len {
return Err(ProtocolError::UnexpectedEof {
needed: len - buf.remaining(),
});
}
let records = Bytes::copy_from_slice(&buf[..len]);
buf.advance(len);
Ok(Self {
error_code,
leader_hint,
log_start_offset,
high_watermark,
records,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn submit_change_round_trips() {
let req = CrabkaSubmitChangeRequest {
records: Bytes::from_static(b"\x01\x02\x03"),
};
let mut out = Vec::new();
req.encode_v0(&mut out).unwrap();
let mut cur: &[u8] = &out;
assert!(CrabkaSubmitChangeRequest::decode_v0(&mut cur).unwrap() == req);
let resp = CrabkaSubmitChangeResponse {
error_code: 1,
leader_hint: 3,
};
let mut out = Vec::new();
resp.encode_v0(&mut out);
let mut cur: &[u8] = &out;
assert!(CrabkaSubmitChangeResponse::decode_v0(&mut cur).unwrap() == resp);
}
#[test]
fn metadata_fetch_round_trips() {
let req = CrabkaMetadataFetchRequest {
fetch_offset: 42,
max_bytes: 1_048_576,
};
let mut out = Vec::new();
req.encode_v0(&mut out);
let mut cur: &[u8] = &out;
assert!(CrabkaMetadataFetchRequest::decode_v0(&mut cur).unwrap() == req);
let resp = CrabkaMetadataFetchResponse {
error_code: 0,
leader_hint: 3,
log_start_offset: 1,
high_watermark: 99,
records: Bytes::from_static(b"\x01\x02\x03"),
};
let mut out = Vec::new();
resp.encode_v0(&mut out).unwrap();
let mut cur: &[u8] = &out;
assert!(CrabkaMetadataFetchResponse::decode_v0(&mut cur).unwrap() == resp);
}
}