use bytes::{Buf, BufMut, Bytes, BytesMut};
use tokio::sync::oneshot;
use crate::error::RaftError;
use crate::kraft::event::Event;
use crate::kraft::types::{LeaderEpoch, LogOffsetMetadata, NodeId};
/// An inbound peer request paired with a channel for its reply.
///
/// Every variant carries the raw request body (`req`) and a one-shot sender of
/// `Bytes` (`reply`) for the response body.
#[derive(Debug)]
pub enum Inbound {
/// A Vote request.
Vote {
req: Bytes,
reply: oneshot::Sender<Bytes>,
},
/// A BeginQuorumEpoch request.
BeginQuorumEpoch {
req: Bytes,
reply: oneshot::Sender<Bytes>,
},
/// An EndQuorumEpoch request.
EndQuorumEpoch {
req: Bytes,
reply: oneshot::Sender<Bytes>,
},
/// A Fetch request.
Fetch {
req: Bytes,
reply: oneshot::Sender<Bytes>,
},
/// A FetchSnapshot request.
FetchSnapshot {
req: Bytes,
reply: oneshot::Sender<Bytes>,
},
}
/// Messages accepted by the component that consumes this enum.
///
/// NOTE(review): presumably this is the command queue of the Raft driver
/// task; the consumer is not visible in this file.
pub enum Command {
/// An inbound peer request together with its reply channel.
Inbound(Inbound),
/// An `Event` from `crate::kraft::event`.
Event(Event),
/// Raw body of a fetch response, tagged with the node it came from.
FetchResponse {
from: NodeId,
body: Bytes,
},
/// Raw body of a fetch-snapshot response, tagged with the node it came from.
FetchSnapshotResponse { from: NodeId, body: Bytes },
/// A timer tick of the given kind.
Timer(TimerTick),
/// Metadata records to submit; the outcome is reported on `reply`.
SubmitChange {
records: Vec<crabka_metadata::MetadataRecord>,
reply: oneshot::Sender<Result<(), RaftError>>,
},
/// Request for a snapshot; the outcome is reported on `reply`.
TriggerSnapshot {
reply: oneshot::Sender<Result<(), RaftError>>,
},
/// Request for a `QuorumStateSnapshot`, delivered on `reply`.
QuorumStateSnapshot {
reply: oneshot::Sender<QuorumStateSnapshot>,
},
/// Request for a `MetadataFetchSlice` starting at `fetch_offset`, with a
/// `max_bytes` budget (presumably a cap on the returned record bytes).
MetadataFetch {
fetch_offset: i64,
max_bytes: usize,
reply: oneshot::Sender<MetadataFetchSlice>,
},
/// Test-only command (compiled under `cfg(test)`); replies with an `i64`,
/// presumably the resulting offset.
#[cfg(test)]
TestAppendAndCommit {
records: Vec<crabka_metadata::MetadataRecord>,
reply: oneshot::Sender<i64>,
},
/// Shutdown request; carries no payload.
Shutdown,
}
/// Reply payload for `Command::MetadataFetch`.
#[derive(Debug, Clone)]
pub struct MetadataFetchSlice {
/// Record bytes for the requested range (encoding is not shown in this file).
pub records: bytes::Bytes,
/// Log start offset reported alongside the records.
pub log_start_offset: i64,
/// High watermark reported alongside the records.
pub high_watermark: i64,
}
/// Kind of timer tick carried by `Command::Timer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerTick {
/// Election timer tick.
Election,
/// Fetch timer tick.
Fetch,
/// Heartbeat timer tick.
Heartbeat,
}
/// Point-in-time view of quorum state, sent in reply to
/// `Command::QuorumStateSnapshot`.
#[derive(Debug, Clone)]
pub struct QuorumStateSnapshot {
/// Known leader, if any.
pub leader_id: Option<NodeId>,
/// Leader epoch.
pub leader_epoch: LeaderEpoch,
/// High watermark offset.
pub high_watermark: i64,
/// Log end offset.
pub log_end_offset: i64,
/// Log start offset.
pub log_start_offset: i64,
/// Voter node ids.
pub voters: Vec<NodeId>,
/// Fetch offset recorded per voter, keyed by node id.
pub per_voter_fetch_offset: std::collections::BTreeMap<NodeId, i64>,
}
/// Transport abstraction for sending an encoded request to a peer node.
#[async_trait::async_trait]
pub trait PeerSender: Send + Sync {
/// Sends `body` to `peer` under the given `api_key`, resolving to the raw
/// response bytes or a `RaftError`.
async fn send(&self, peer: NodeId, api_key: i16, body: Bytes) -> Result<Bytes, RaftError>;
}
/// A `PeerSender` that never sends anything.
pub struct NullPeerSender;

#[async_trait::async_trait]
impl PeerSender for NullPeerSender {
    /// Always fails with `RaftError::NotLeader`, reporting `peer` as the
    /// current leader; the API key and body are ignored.
    async fn send(&self, peer: NodeId, _api_key: i16, _body: Bytes) -> Result<Bytes, RaftError> {
        let current_leader = Some(peer);
        Err(RaftError::NotLeader { current_leader })
    }
}
/// Numeric API keys for the peer RPCs, as passed to `PeerSender::send`.
///
/// NOTE(review): the values appear to match the Apache Kafka protocol API
/// keys of the same names — confirm against the protocol guide.
pub mod api_key {
/// API key for Fetch.
pub const FETCH: i16 = 1;
/// API key for Vote.
pub const VOTE: i16 = 52;
/// API key for BeginQuorumEpoch.
pub const BEGIN_QUORUM_EPOCH: i16 = 53;
/// API key for EndQuorumEpoch.
pub const END_QUORUM_EPOCH: i16 = 54;
/// API key for FetchSnapshot.
pub const FETCH_SNAPSHOT: i16 = 59;
}
pub mod wire {
use bytes::{Buf, Bytes, BytesMut};
use crabka_protocol::owned::begin_quorum_epoch_request::{
self as bqe_req, BeginQuorumEpochRequest,
};
use crabka_protocol::owned::begin_quorum_epoch_response::BeginQuorumEpochResponse;
use crabka_protocol::owned::end_quorum_epoch_request::{
self as eqe_req, EndQuorumEpochRequest,
};
use crabka_protocol::owned::end_quorum_epoch_response::EndQuorumEpochResponse;
use crabka_protocol::owned::fetch_request::{self as fetch_req, FetchRequest};
use crabka_protocol::owned::fetch_response::{self as fetch_resp, FetchResponse};
use crabka_protocol::owned::fetch_snapshot_request::{self as fs_req, FetchSnapshotRequest};
use crabka_protocol::owned::fetch_snapshot_response::{self as fs_resp, FetchSnapshotResponse};
use crabka_protocol::owned::vote_request::{self as vote_req, VoteRequest};
use crabka_protocol::owned::vote_response::{self as vote_resp, VoteResponse};
use crabka_protocol::records::RecordsPayload;
use crabka_protocol::{Decode, Encode};
use super::{LeaderEpoch, LogOffsetMetadata, NodeId};
use crabka_protocol::primitives::uuid::Uuid as MetaUuid;
/// Topic name written into every encoded request and response.
const METADATA_TOPIC: &str = "__cluster_metadata";
/// Partition index written into every encoded request and response.
const METADATA_PARTITION: i32 = 0;
/// Topic id written into Fetch requests and responses (all zero bytes except a
/// trailing 1).
const METADATA_TOPIC_ID: MetaUuid = MetaUuid([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]);
/// Message version used to encode and decode Vote requests and responses.
const VOTE_VERSION: i16 = 2;
/// Message version used for BeginQuorumEpoch / EndQuorumEpoch messages.
const QUORUM_EPOCH_VERSION: i16 = 1;
/// Message version used for Fetch requests and responses.
const FETCH_VERSION: i16 = 17;
/// Message version used for FetchSnapshot requests and responses.
const FETCH_SNAPSHOT_VERSION: i16 = 1;
/// Simplified view of the peer requests this module can encode and decode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerRequest {
/// Vote request.
Vote {
/// Encoded as the request-level `voter_id`.
voter_id: NodeId,
/// Encoded as the partition's `replica_epoch`.
candidate_epoch: LeaderEpoch,
/// Encoded as the partition's `replica_id`.
candidate: NodeId,
/// Encoded as the partition's `last_offset_epoch`.
last_epoch: LeaderEpoch,
/// Encoded as the partition's `last_offset`.
last_offset: i64,
/// Encoded as the partition's `pre_vote` flag.
pre_vote: bool,
},
/// BeginQuorumEpoch request.
BeginQuorumEpoch {
leader_id: NodeId,
leader_epoch: LeaderEpoch,
},
/// EndQuorumEpoch request.
EndQuorumEpoch {
leader_id: NodeId,
leader_epoch: LeaderEpoch,
},
/// Fetch request.
Fetch {
/// Encoded as the replica id in the request's replica state.
from: NodeId,
/// Encoded into both `current_leader_epoch` and `last_fetched_epoch`;
/// decoded from `last_fetched_epoch`.
fetch_epoch: LeaderEpoch,
fetch_offset: i64,
},
/// FetchSnapshot request.
FetchSnapshot {
/// Encoded as the request-level `replica_id`.
from: NodeId,
/// `(end_offset, epoch)` of the snapshot.
snapshot_id: (i64, i32),
position: i64,
max_bytes: i32,
},
}
/// Simplified view of the peer responses this module can encode and decode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerResponse {
/// Vote response.
Vote {
/// Encoded as the partition's `leader_epoch`.
epoch: LeaderEpoch,
/// Encoded as the partition's `vote_granted` flag.
granted: bool,
},
/// Acknowledgement, encoded and decoded as a `BeginQuorumEpochResponse`.
Ack {
/// Encoded as the partition's `leader_epoch`.
epoch: LeaderEpoch,
},
/// Fetch response.
Fetch {
leader_id: NodeId,
leader_epoch: LeaderEpoch,
/// Diverging epoch/offset, if any; decoded as `Some` only when the wire
/// `end_offset` is non-negative.
diverging: Option<LogOffsetMetadata>,
/// `(end_offset, epoch)` of a snapshot, if any; decoded as `Some` only
/// when the wire `end_offset` is non-negative.
snapshot_id: Option<(i64, i32)>,
/// High watermark.
hwm: i64,
/// Raw record bytes; when empty, the wire `records` field is left unset.
records: Bytes,
},
/// FetchSnapshot response.
FetchSnapshot {
/// `(end_offset, epoch)` of the snapshot.
snapshot_id: (i64, i32),
size: i64,
position: i64,
/// Raw bytes carried in the `unaligned_records` field.
bytes: Bytes,
error_code: i16,
},
}
/// Converts a leader epoch to its `i32` wire form.
///
/// Values that do not fit in an `i32` saturate to `i32::MAX`.
#[allow(clippy::cast_possible_wrap)]
fn epoch_to_wire(e: LeaderEpoch) -> i32 {
    let converted: Result<i32, _> = e.try_into();
    converted.unwrap_or(i32::MAX)
}
#[allow(clippy::cast_sign_loss)]
fn epoch_from_wire(e: i32) -> LeaderEpoch {
u32::try_from(e).unwrap_or(0)
}
/// Converts a node id to its `i32` wire form.
///
/// Ids that do not fit in an `i32` saturate to `i32::MAX`.
fn node_to_wire(n: NodeId) -> i32 {
    let converted: Result<i32, _> = n.try_into();
    converted.unwrap_or(i32::MAX)
}
#[allow(clippy::cast_sign_loss)]
fn node_from_wire(n: i32) -> NodeId {
u64::try_from(n).unwrap_or(0)
}
/// Serializes `msg` at protocol `version` into a freshly allocated buffer.
///
/// The value returned by `Encode::encode` is discarded, so whatever was
/// written to the buffer is returned even if encoding reported a failure.
fn encode_body<T>(msg: &T, version: i16) -> Bytes
where
    T: Encode,
{
    let mut out = BytesMut::new();
    // NOTE(review): the encode result is intentionally ignored here; confirm
    // that a partially written body is acceptable to callers.
    let _ = msg.encode(&mut out, version);
    out.freeze()
}
impl PeerRequest {
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn encode(&self) -> Bytes {
match *self {
PeerRequest::Vote {
voter_id,
candidate_epoch,
candidate,
last_epoch,
last_offset,
pre_vote,
} => {
let req = VoteRequest {
cluster_id: None,
voter_id: node_to_wire(voter_id),
topics: vec![vote_req::TopicData {
topic_name: METADATA_TOPIC.to_string(),
partitions: vec![vote_req::PartitionData {
partition_index: METADATA_PARTITION,
replica_epoch: epoch_to_wire(candidate_epoch),
replica_id: node_to_wire(candidate),
last_offset_epoch: epoch_to_wire(last_epoch),
last_offset,
pre_vote,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
encode_body(&req, VOTE_VERSION)
}
PeerRequest::BeginQuorumEpoch {
leader_id,
leader_epoch,
} => {
let req = BeginQuorumEpochRequest {
cluster_id: None,
voter_id: -1,
topics: vec![bqe_req::TopicData {
topic_name: METADATA_TOPIC.to_string(),
partitions: vec![bqe_req::PartitionData {
partition_index: METADATA_PARTITION,
leader_id: node_to_wire(leader_id),
leader_epoch: epoch_to_wire(leader_epoch),
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
encode_body(&req, QUORUM_EPOCH_VERSION)
}
PeerRequest::EndQuorumEpoch {
leader_id,
leader_epoch,
} => {
let req = EndQuorumEpochRequest {
cluster_id: None,
topics: vec![eqe_req::TopicData {
topic_name: METADATA_TOPIC.to_string(),
partitions: vec![eqe_req::PartitionData {
partition_index: METADATA_PARTITION,
leader_id: node_to_wire(leader_id),
leader_epoch: epoch_to_wire(leader_epoch),
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
encode_body(&req, QUORUM_EPOCH_VERSION)
}
PeerRequest::Fetch {
from,
fetch_epoch,
fetch_offset,
} => {
let req = FetchRequest {
replica_state: fetch_req::ReplicaState {
replica_id: node_to_wire(from),
replica_epoch: -1,
..Default::default()
},
topics: vec![fetch_req::FetchTopic {
topic: METADATA_TOPIC.to_string(),
topic_id: METADATA_TOPIC_ID,
partitions: vec![fetch_req::FetchPartition {
partition: METADATA_PARTITION,
current_leader_epoch: epoch_to_wire(fetch_epoch),
fetch_offset,
last_fetched_epoch: epoch_to_wire(fetch_epoch),
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
encode_body(&req, FETCH_VERSION)
}
PeerRequest::FetchSnapshot {
from,
snapshot_id,
position,
max_bytes,
} => encode_fetch_snapshot_request(from, snapshot_id, position, max_bytes),
}
}
#[must_use]
pub fn decode(buf: &[u8]) -> Option<Self> {
let mut cur = buf;
if let Ok(req) = VoteRequest::decode(&mut cur, VOTE_VERSION)
&& cur.is_empty()
&& let Some(p) = req.topics.first().and_then(|t| t.partitions.first())
{
return Some(PeerRequest::Vote {
voter_id: node_from_wire(req.voter_id),
candidate_epoch: epoch_from_wire(p.replica_epoch),
candidate: node_from_wire(p.replica_id),
last_epoch: epoch_from_wire(p.last_offset_epoch),
last_offset: p.last_offset,
pre_vote: p.pre_vote,
});
}
None
}
}
/// Decodes `buf` as a Vote request (version `VOTE_VERSION`).
///
/// Returns `None` when decoding fails or the first topic has no partition.
/// Unlike `PeerRequest::decode`, trailing bytes are not checked.
#[must_use]
pub fn decode_vote(buf: &[u8]) -> Option<PeerRequest> {
    let mut rest = buf;
    let decoded = VoteRequest::decode(&mut rest, VOTE_VERSION).ok()?;
    let part = decoded
        .topics
        .first()
        .and_then(|topic| topic.partitions.first())?;
    let request = PeerRequest::Vote {
        voter_id: node_from_wire(decoded.voter_id),
        candidate_epoch: epoch_from_wire(part.replica_epoch),
        candidate: node_from_wire(part.replica_id),
        last_epoch: epoch_from_wire(part.last_offset_epoch),
        last_offset: part.last_offset,
        pre_vote: part.pre_vote,
    };
    Some(request)
}
/// Decodes `buf` as a BeginQuorumEpoch request (version
/// `QUORUM_EPOCH_VERSION`).
///
/// Returns `None` when decoding fails or the first topic has no partition.
#[must_use]
pub fn decode_begin(buf: &[u8]) -> Option<PeerRequest> {
    let mut rest = buf;
    let decoded = BeginQuorumEpochRequest::decode(&mut rest, QUORUM_EPOCH_VERSION).ok()?;
    let part = decoded
        .topics
        .first()
        .and_then(|topic| topic.partitions.first())?;
    let request = PeerRequest::BeginQuorumEpoch {
        leader_id: node_from_wire(part.leader_id),
        leader_epoch: epoch_from_wire(part.leader_epoch),
    };
    Some(request)
}
/// Decodes `buf` as an EndQuorumEpoch request (version
/// `QUORUM_EPOCH_VERSION`).
///
/// Returns `None` when decoding fails or the first topic has no partition.
#[must_use]
pub fn decode_end(buf: &[u8]) -> Option<PeerRequest> {
    let mut rest = buf;
    let decoded = EndQuorumEpochRequest::decode(&mut rest, QUORUM_EPOCH_VERSION).ok()?;
    let part = decoded
        .topics
        .first()
        .and_then(|topic| topic.partitions.first())?;
    let request = PeerRequest::EndQuorumEpoch {
        leader_id: node_from_wire(part.leader_id),
        leader_epoch: epoch_from_wire(part.leader_epoch),
    };
    Some(request)
}
/// Decodes `buf` as a Fetch request (version `FETCH_VERSION`).
///
/// The sender comes from the request's replica state and the epoch from the
/// partition's `last_fetched_epoch`. Returns `None` when decoding fails or
/// the first topic has no partition.
#[must_use]
pub fn decode_fetch(buf: &[u8]) -> Option<PeerRequest> {
    let mut rest = buf;
    let decoded = FetchRequest::decode(&mut rest, FETCH_VERSION).ok()?;
    let part = decoded
        .topics
        .first()
        .and_then(|topic| topic.partitions.first())?;
    let request = PeerRequest::Fetch {
        from: node_from_wire(decoded.replica_state.replica_id),
        fetch_epoch: epoch_from_wire(part.last_fetched_epoch),
        fetch_offset: part.fetch_offset,
    };
    Some(request)
}
/// Builds and serializes a FetchSnapshot request for the metadata partition.
///
/// `snapshot_id` is `(end_offset, epoch)`; the snapshot epoch is also written
/// as the partition's `current_leader_epoch`.
fn encode_fetch_snapshot_request(
    from: NodeId,
    snapshot_id: (i64, i32),
    position: i64,
    max_bytes: i32,
) -> Bytes {
    let (snap_end_offset, snap_epoch) = snapshot_id;
    let wire_snapshot_id = fs_req::SnapshotId {
        end_offset: snap_end_offset,
        epoch: snap_epoch,
        ..Default::default()
    };
    let partition = fs_req::PartitionSnapshot {
        partition: METADATA_PARTITION,
        current_leader_epoch: snap_epoch,
        snapshot_id: wire_snapshot_id,
        position,
        ..Default::default()
    };
    let topic = fs_req::TopicSnapshot {
        name: METADATA_TOPIC.to_owned(),
        partitions: vec![partition],
        ..Default::default()
    };
    let request = FetchSnapshotRequest {
        replica_id: node_to_wire(from),
        max_bytes,
        topics: vec![topic],
        cluster_id: None,
        ..Default::default()
    };
    encode_body(&request, FETCH_SNAPSHOT_VERSION)
}
/// Builds and serializes a FetchSnapshot response for the metadata partition.
///
/// `snapshot_id` is `(end_offset, epoch)`; `bytes` is carried as a raw
/// `unaligned_records` payload.
fn encode_fetch_snapshot_response(
    snapshot_id: (i64, i32),
    size: i64,
    position: i64,
    bytes: &Bytes,
    error_code: i16,
) -> Bytes {
    let (snap_end_offset, snap_epoch) = snapshot_id;
    let wire_snapshot_id = fs_resp::SnapshotId {
        end_offset: snap_end_offset,
        epoch: snap_epoch,
        ..Default::default()
    };
    let partition = fs_resp::PartitionSnapshot {
        index: METADATA_PARTITION,
        error_code,
        snapshot_id: wire_snapshot_id,
        size,
        position,
        unaligned_records: RecordsPayload::Raw(bytes.clone()),
        ..Default::default()
    };
    let topic = fs_resp::TopicSnapshot {
        name: METADATA_TOPIC.to_owned(),
        partitions: vec![partition],
        ..Default::default()
    };
    let response = FetchSnapshotResponse {
        topics: vec![topic],
        ..Default::default()
    };
    encode_body(&response, FETCH_SNAPSHOT_VERSION)
}
/// Decodes `buf` as a FetchSnapshot request (version
/// `FETCH_SNAPSHOT_VERSION`).
///
/// Returns `None` when decoding fails or the first topic has no partition.
#[must_use]
pub fn decode_fetch_snapshot(buf: &[u8]) -> Option<PeerRequest> {
    let mut rest = buf;
    let decoded = FetchSnapshotRequest::decode(&mut rest, FETCH_SNAPSHOT_VERSION).ok()?;
    let part = decoded
        .topics
        .first()
        .and_then(|topic| topic.partitions.first())?;
    let request = PeerRequest::FetchSnapshot {
        from: node_from_wire(decoded.replica_id),
        snapshot_id: (part.snapshot_id.end_offset, part.snapshot_id.epoch),
        position: part.position,
        max_bytes: decoded.max_bytes,
    };
    Some(request)
}
impl PeerResponse {
    /// Serializes this response into the body bytes of the matching protocol
    /// message.
    ///
    /// Every message addresses the single `METADATA_TOPIC` /
    /// `METADATA_PARTITION` pair; fields not set explicitly keep their
    /// `Default` values.
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn encode(&self) -> Bytes {
        use crabka_protocol::owned::begin_quorum_epoch_response as bqe_resp;

        match self {
            Self::Vote { epoch, granted } => {
                let partition = vote_resp::PartitionData {
                    partition_index: METADATA_PARTITION,
                    error_code: 0,
                    leader_id: -1,
                    leader_epoch: epoch_to_wire(*epoch),
                    vote_granted: *granted,
                    ..Default::default()
                };
                let topic = vote_resp::TopicData {
                    topic_name: METADATA_TOPIC.to_owned(),
                    partitions: vec![partition],
                    ..Default::default()
                };
                let response = VoteResponse {
                    error_code: 0,
                    topics: vec![topic],
                    ..Default::default()
                };
                encode_body(&response, VOTE_VERSION)
            }
            Self::Ack { epoch } => {
                // Acks are sent on the wire as BeginQuorumEpoch responses.
                let partition = bqe_resp::PartitionData {
                    partition_index: METADATA_PARTITION,
                    error_code: 0,
                    leader_id: -1,
                    leader_epoch: epoch_to_wire(*epoch),
                    ..Default::default()
                };
                let topic = bqe_resp::TopicData {
                    topic_name: METADATA_TOPIC.to_owned(),
                    partitions: vec![partition],
                    ..Default::default()
                };
                let response = BeginQuorumEpochResponse {
                    error_code: 0,
                    topics: vec![topic],
                    ..Default::default()
                };
                encode_body(&response, QUORUM_EPOCH_VERSION)
            }
            Self::Fetch {
                leader_id,
                leader_epoch,
                diverging,
                snapshot_id,
                hwm,
                records,
            } => {
                let current_leader = fetch_resp::LeaderIdAndEpoch {
                    leader_id: node_to_wire(*leader_id),
                    leader_epoch: epoch_to_wire(*leader_epoch),
                    ..Default::default()
                };
                let mut partition = fetch_resp::PartitionData {
                    partition_index: METADATA_PARTITION,
                    error_code: 0,
                    high_watermark: *hwm,
                    current_leader,
                    ..Default::default()
                };
                // Optional parts are only overwritten when present; otherwise
                // the partition keeps whatever its `Default` provides.
                if let Some(point) = diverging.as_ref() {
                    partition.diverging_epoch = fetch_resp::EpochEndOffset {
                        epoch: epoch_to_wire(point.epoch),
                        end_offset: point.offset,
                        ..Default::default()
                    };
                }
                if let Some((snap_end_offset, snap_epoch)) = snapshot_id.as_ref() {
                    partition.snapshot_id = fetch_resp::SnapshotId {
                        end_offset: *snap_end_offset,
                        epoch: *snap_epoch,
                        ..Default::default()
                    };
                }
                if !records.is_empty() {
                    partition.records = Some(RecordsPayload::Raw(records.clone()));
                }
                let topic = fetch_resp::FetchableTopicResponse {
                    topic: METADATA_TOPIC.to_owned(),
                    topic_id: METADATA_TOPIC_ID,
                    partitions: vec![partition],
                    ..Default::default()
                };
                let response = FetchResponse {
                    responses: vec![topic],
                    ..Default::default()
                };
                encode_body(&response, FETCH_VERSION)
            }
            Self::FetchSnapshot {
                snapshot_id,
                size,
                position,
                bytes,
                error_code,
            } => encode_fetch_snapshot_response(
                *snapshot_id,
                *size,
                *position,
                bytes,
                *error_code,
            ),
        }
    }

    /// Decodes `buf` as a Vote response (version `VOTE_VERSION`).
    ///
    /// Returns `None` when decoding fails or the first topic has no partition.
    #[must_use]
    pub fn decode_vote(buf: &[u8]) -> Option<Self> {
        let mut rest = buf;
        let decoded = VoteResponse::decode(&mut rest, VOTE_VERSION).ok()?;
        let part = decoded
            .topics
            .first()
            .and_then(|topic| topic.partitions.first())?;
        let response = Self::Vote {
            epoch: epoch_from_wire(part.leader_epoch),
            granted: part.vote_granted,
        };
        Some(response)
    }

    /// Decodes `buf` as a BeginQuorumEpoch response (version
    /// `QUORUM_EPOCH_VERSION`) and returns it as an `Ack`.
    ///
    /// Returns `None` when decoding fails or the first topic has no partition.
    #[must_use]
    pub fn decode_ack(buf: &[u8]) -> Option<Self> {
        let mut rest = buf;
        let decoded = BeginQuorumEpochResponse::decode(&mut rest, QUORUM_EPOCH_VERSION).ok()?;
        let part = decoded
            .topics
            .first()
            .and_then(|topic| topic.partitions.first())?;
        let response = Self::Ack {
            epoch: epoch_from_wire(part.leader_epoch),
        };
        Some(response)
    }

    /// Decodes `buf` as a Fetch response (version `FETCH_VERSION`).
    ///
    /// `diverging` and `snapshot_id` are reported only when their wire
    /// `end_offset` is non-negative. Non-raw record payloads are re-encoded
    /// into bytes; a missing payload yields empty bytes. Returns `None` when
    /// decoding fails or the first topic has no partition.
    #[must_use]
    pub fn decode_fetch(buf: &[u8]) -> Option<Self> {
        let mut rest = buf;
        let decoded = FetchResponse::decode(&mut rest, FETCH_VERSION).ok()?;
        let part = decoded
            .responses
            .first()
            .and_then(|topic| topic.partitions.first())?;

        let diverging = (part.diverging_epoch.end_offset >= 0).then(|| LogOffsetMetadata {
            offset: part.diverging_epoch.end_offset,
            epoch: epoch_from_wire(part.diverging_epoch.epoch),
        });
        let snapshot_id = (part.snapshot_id.end_offset >= 0)
            .then_some((part.snapshot_id.end_offset, part.snapshot_id.epoch));
        let records = match part.records.as_ref() {
            None => Bytes::new(),
            Some(RecordsPayload::Raw(raw)) => raw.clone(),
            Some(other) => {
                // The result of re-encoding is ignored, as with message bodies.
                let mut out = BytesMut::new();
                let _ = other.encode_to(&mut out);
                out.freeze()
            }
        };

        Some(Self::Fetch {
            leader_id: node_from_wire(part.current_leader.leader_id),
            leader_epoch: epoch_from_wire(part.current_leader.leader_epoch),
            diverging,
            snapshot_id,
            hwm: part.high_watermark,
            records,
        })
    }

    /// Decodes `buf` as a FetchSnapshot response (version
    /// `FETCH_SNAPSHOT_VERSION`).
    ///
    /// Non-raw payloads are re-encoded into bytes. Returns `None` when
    /// decoding fails or the first topic has no partition.
    #[must_use]
    pub fn decode_fetch_snapshot(buf: &[u8]) -> Option<Self> {
        let mut rest = buf;
        let decoded = FetchSnapshotResponse::decode(&mut rest, FETCH_SNAPSHOT_VERSION).ok()?;
        let part = decoded
            .topics
            .first()
            .and_then(|topic| topic.partitions.first())?;

        let payload = &part.unaligned_records;
        let bytes = if let RecordsPayload::Raw(raw) = payload {
            raw.clone()
        } else {
            // The result of re-encoding is ignored, as with message bodies.
            let mut out = BytesMut::new();
            let _ = payload.encode_to(&mut out);
            out.freeze()
        };

        Some(Self::FetchSnapshot {
            snapshot_id: (part.snapshot_id.end_offset, part.snapshot_id.epoch),
            size: part.size,
            position: part.position,
            bytes,
            error_code: part.error_code,
        })
    }
}
// Round-trip tests for the wire codec: each test encodes a `PeerRequest` or
// `PeerResponse` and checks that the matching decoder returns an equal value.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
// A Vote request survives encode -> decode_vote unchanged.
#[test]
fn vote_request_round_trips() {
let req = PeerRequest::Vote {
voter_id: 9,
candidate_epoch: 3,
candidate: 7,
last_epoch: 2,
last_offset: 42,
pre_vote: true,
};
assert!(decode_vote(&req.encode()) == Some(req));
}
// BeginQuorumEpoch and EndQuorumEpoch requests survive their round trips.
#[test]
fn begin_end_round_trip() {
let begin = PeerRequest::BeginQuorumEpoch {
leader_id: 5,
leader_epoch: 9,
};
assert!(decode_begin(&begin.encode()) == Some(begin));
let end = PeerRequest::EndQuorumEpoch {
leader_id: 1,
leader_epoch: 4,
};
assert!(decode_end(&end.encode()) == Some(end));
}
// A Fetch request survives encode -> decode_fetch unchanged.
#[test]
fn fetch_request_round_trips() {
let req = PeerRequest::Fetch {
from: 2,
fetch_epoch: 1,
fetch_offset: 11,
};
assert!(decode_fetch(&req.encode()) == Some(req));
}
// A Vote response survives encode -> decode_vote unchanged.
#[test]
fn vote_response_round_trips() {
let resp = PeerResponse::Vote {
epoch: 3,
granted: true,
};
assert!(PeerResponse::decode_vote(&resp.encode()) == Some(resp));
}
// A VoteResponse built directly from the protocol structs (rather than via
// `PeerResponse::encode`) decodes to the expected `PeerResponse::Vote`.
#[test]
fn decodes_jvm_style_response_without_echo_tag() {
let resp = VoteResponse {
error_code: 0,
topics: vec![vote_resp::TopicData {
topic_name: METADATA_TOPIC.to_string(),
partitions: vec![vote_resp::PartitionData {
partition_index: METADATA_PARTITION,
error_code: 0,
leader_id: -1,
leader_epoch: epoch_to_wire(7),
vote_granted: true,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
let bytes = encode_body(&resp, VOTE_VERSION);
let decoded = PeerResponse::decode_vote(&bytes).unwrap();
assert!(
decoded
== PeerResponse::Vote {
epoch: 7,
granted: true
}
);
}
// An Ack survives encode -> decode_ack unchanged.
#[test]
fn ack_round_trips() {
let resp = PeerResponse::Ack { epoch: 8 };
assert!(PeerResponse::decode_ack(&resp.encode()) == Some(resp));
}
// A Fetch response carrying only a snapshot id round-trips, including the id.
#[test]
fn fetch_response_carries_snapshot_id() {
let resp = PeerResponse::Fetch {
leader_id: 1,
leader_epoch: 4,
diverging: None,
snapshot_id: Some((42, 3)),
hwm: 0,
records: Bytes::new(),
};
assert!(PeerResponse::decode_fetch(&resp.encode()) == Some(resp));
}
// A FetchSnapshot request survives encode -> decode_fetch_snapshot unchanged.
#[test]
fn fetch_snapshot_request_round_trips() {
let req = PeerRequest::FetchSnapshot {
from: 2,
snapshot_id: (42, 3),
position: 128,
max_bytes: 4096,
};
assert!(decode_fetch_snapshot(&req.encode()) == Some(req));
}
// A FetchSnapshot response, including its raw bytes, survives the round trip.
#[test]
fn fetch_snapshot_response_round_trips() {
let resp = PeerResponse::FetchSnapshot {
snapshot_id: (42, 3),
size: 9,
position: 0,
bytes: Bytes::from_static(b"snapshotX"),
error_code: 0,
};
assert!(PeerResponse::decode_fetch_snapshot(&resp.encode()) == Some(resp));
}
// Fetch responses round-trip both with raw records and with a diverging epoch.
#[test]
fn fetch_response_round_trips() {
let with_records = PeerResponse::Fetch {
leader_id: 2,
leader_epoch: 5,
diverging: None,
snapshot_id: None,
hwm: 7,
records: Bytes::from_static(b"\x01\x02\x03"),
};
assert!(PeerResponse::decode_fetch(&with_records.encode()) == Some(with_records));
let diverged = PeerResponse::Fetch {
leader_id: 2,
leader_epoch: 5,
diverging: Some(LogOffsetMetadata {
offset: 5,
epoch: 1,
}),
snapshot_id: None,
hwm: 0,
records: Bytes::new(),
};
assert!(PeerResponse::decode_fetch(&diverged.encode()) == Some(diverged));
}
// Encoded Fetch requests and responses both carry METADATA_TOPIC_ID, checked by
// decoding with the raw protocol structs.
#[test]
fn fetch_wire_carries_metadata_topic_id() {
use crabka_protocol::Decode;
use crabka_protocol::owned::fetch_request::FetchRequest;
use crabka_protocol::owned::fetch_response::FetchResponse;
let req = PeerRequest::Fetch {
from: 2,
fetch_epoch: 1,
fetch_offset: 5,
};
let mut c = &req.encode()[..];
let dreq = FetchRequest::decode(&mut c, FETCH_VERSION).unwrap();
assert!(dreq.topics[0].topic_id == METADATA_TOPIC_ID);
let resp = PeerResponse::Fetch {
leader_id: 1,
leader_epoch: 4,
diverging: None,
snapshot_id: None,
hwm: 0,
records: Bytes::new(),
};
let mut c2 = &resp.encode()[..];
let dresp = FetchResponse::decode(&mut c2, FETCH_VERSION).unwrap();
assert!(dresp.responses[0].topic_id == METADATA_TOPIC_ID);
}
}
}