use bytes::{Bytes, BytesMut};
use futures_util::future::BoxFuture;
use crabka_protocol::owned::fetch_snapshot_request::FetchSnapshotRequest;
use crabka_protocol::owned::fetch_snapshot_response::{
FetchSnapshotResponse, LeaderIdAndEpoch, PartitionSnapshot, SnapshotId, TopicSnapshot,
};
use crabka_protocol::records::RecordsPayload;
use crabka_protocol::{Decode, Encode};
use crabka_raft::SnapshotRange;
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
const CLUSTER_METADATA_TOPIC: &str = "__cluster_metadata";
fn err_partition(index: i32, error_code: i16) -> PartitionSnapshot {
PartitionSnapshot {
index,
error_code,
snapshot_id: SnapshotId::default(),
size: 0,
position: 0,
unaligned_records: RecordsPayload::default(),
current_leader: LeaderIdAndEpoch::default(),
..Default::default()
}
}
pub(crate) fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
) -> BoxFuture<'static, Result<Bytes, BrokerError>> {
let req_bytes = req_bytes.to_vec();
let controller = broker.controller.clone();
Box::pin(async move {
let mut cur: &[u8] = &req_bytes;
let req = FetchSnapshotRequest::decode(&mut cur, version)?;
let max_bytes = req.max_bytes;
let resolve =
|position: i64, _max: i32| controller.read_snapshot_range(position, max_bytes);
let local_cluster_id = controller.current_image().cluster_id();
let resp = build_response(local_cluster_id, &req, &resolve);
let mut out = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut out, version)?;
Ok(out.freeze())
})
}
fn build_response(
local_cluster_id: uuid::Uuid,
req: &FetchSnapshotRequest,
resolve: &dyn Fn(i64, i32) -> SnapshotRange,
) -> FetchSnapshotResponse {
if let Some(s) = req.cluster_id.as_deref()
&& s != local_cluster_id.to_string()
{
return FetchSnapshotResponse {
throttle_time_ms: 0,
error_code: codes::INCONSISTENT_CLUSTER_ID,
topics: Vec::new(),
node_endpoints: Vec::new(),
..Default::default()
};
}
let topics = req
.topics
.iter()
.map(|topic| {
let is_metadata = topic.name == CLUSTER_METADATA_TOPIC;
let partitions = topic
.partitions
.iter()
.map(|part| {
if !is_metadata || part.partition != 0 {
return err_partition(part.partition, codes::INVALID_TOPIC_EXCEPTION);
}
match resolve(part.position, req.max_bytes) {
SnapshotRange::NoSnapshot => err_partition(0, codes::SNAPSHOT_NOT_FOUND),
SnapshotRange::OutOfRange => err_partition(0, codes::POSITION_OUT_OF_RANGE),
SnapshotRange::Slice(slice) => PartitionSnapshot {
index: 0,
error_code: codes::NONE,
snapshot_id: SnapshotId {
end_offset: slice.end_offset,
epoch: slice.epoch,
..Default::default()
},
size: slice.total_size,
position: part.position,
unaligned_records: RecordsPayload::Legacy(slice.bytes),
current_leader: LeaderIdAndEpoch::default(),
..Default::default()
},
}
})
.collect();
TopicSnapshot {
name: topic.name.clone(),
partitions,
..Default::default()
}
})
.collect();
FetchSnapshotResponse {
throttle_time_ms: 0,
error_code: codes::NONE,
topics,
node_endpoints: Vec::new(),
..Default::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crabka_raft::SnapshotSlice;
#[test]
fn build_response_serves_requested_range() {
use crabka_protocol::owned::fetch_snapshot_request::{
FetchSnapshotRequest, PartitionSnapshot as ReqPartition, SnapshotId as ReqSnapshotId,
TopicSnapshot as ReqTopic,
};
use uuid::Uuid;
let cid = Uuid::from_u128(7);
let req = FetchSnapshotRequest {
replica_id: -1,
max_bytes: 1024,
topics: vec![ReqTopic {
name: CLUSTER_METADATA_TOPIC.into(),
partitions: vec![ReqPartition {
partition: 0,
current_leader_epoch: 0,
snapshot_id: ReqSnapshotId {
end_offset: 6,
epoch: 1,
..Default::default()
},
position: 0,
..Default::default()
}],
..Default::default()
}],
cluster_id: None,
..Default::default()
};
let resolve = |_pos: i64, _max: i32| {
SnapshotRange::Slice(SnapshotSlice {
end_offset: 6,
epoch: 1,
total_size: 100,
bytes: bytes::Bytes::from_static(b"abc"),
})
};
let resp = build_response(cid, &req, &resolve);
let part = &resp.topics[0].partitions[0];
assert!(resp.error_code == 0);
assert!(part.error_code == 0);
assert!(part.snapshot_id.end_offset == 6);
assert!(part.snapshot_id.epoch == 1);
assert!(part.size == 100);
assert!(part.position == 0);
let mut buf = bytes::BytesMut::new();
part.unaligned_records.encode_to(&mut buf).unwrap();
assert!(&buf[..] == b"abc");
}
#[test]
fn build_response_rejects_cluster_id_mismatch() {
use crabka_protocol::owned::fetch_snapshot_request::{
FetchSnapshotRequest, PartitionSnapshot as ReqPartition, SnapshotId as ReqSnapshotId,
TopicSnapshot as ReqTopic,
};
use uuid::Uuid;
let cid = Uuid::from_u128(7);
let req = FetchSnapshotRequest {
replica_id: -1,
max_bytes: 1024,
topics: vec![ReqTopic {
name: CLUSTER_METADATA_TOPIC.into(),
partitions: vec![ReqPartition {
partition: 0,
current_leader_epoch: 0,
snapshot_id: ReqSnapshotId {
end_offset: 6,
epoch: 1,
..Default::default()
},
position: 0,
..Default::default()
}],
..Default::default()
}],
cluster_id: Some("different".into()),
..Default::default()
};
let resolve = |_pos: i64, _max: i32| SnapshotRange::NoSnapshot;
let resp = build_response(cid, &req, &resolve);
assert!(resp.error_code == codes::INCONSISTENT_CLUSTER_ID);
assert!(resp.topics.is_empty());
}
#[test]
fn build_response_position_past_end_returns_out_of_range() {
use crabka_protocol::owned::fetch_snapshot_request::{
FetchSnapshotRequest, PartitionSnapshot as ReqPartition, TopicSnapshot as ReqTopic,
};
use uuid::Uuid;
let cid = Uuid::from_u128(7);
let req = FetchSnapshotRequest {
replica_id: -1,
max_bytes: 1024,
topics: vec![ReqTopic {
name: CLUSTER_METADATA_TOPIC.into(),
partitions: vec![ReqPartition {
partition: 0,
position: 9_999,
..Default::default()
}],
..Default::default()
}],
cluster_id: None,
..Default::default()
};
let resolve = |_pos: i64, _max: i32| SnapshotRange::OutOfRange;
let resp = build_response(cid, &req, &resolve);
let part = &resp.topics[0].partitions[0];
assert!(resp.error_code == codes::NONE);
assert!(part.error_code == codes::POSITION_OUT_OF_RANGE);
}
}