use std::collections::HashMap;
use crabka_protocol::owned::offset_commit_request::{
OffsetCommitRequestPartition, OffsetCommitRequestTopic,
};
use crabka_protocol::owned::offset_fetch_request::{
OffsetFetchRequest, OffsetFetchRequestGroup, OffsetFetchRequestTopic, OffsetFetchRequestTopics,
};
use crabka_protocol::owned::offset_fetch_response::OffsetFetchResponse;
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
pub(crate) fn build_offset_fetch(
group_id: &str,
by_topic: &HashMap<String, Vec<i32>>,
topic_ids: &HashMap<String, WireUuid>,
) -> OffsetFetchRequest {
let legacy_topics: Vec<OffsetFetchRequestTopic> = by_topic
.iter()
.map(|(name, parts)| OffsetFetchRequestTopic {
name: name.clone(),
partition_indexes: parts.clone(),
..Default::default()
})
.collect();
let group_topics: Vec<OffsetFetchRequestTopics> = by_topic
.iter()
.map(|(name, parts)| OffsetFetchRequestTopics {
name: name.clone(),
topic_id: topic_ids.get(name).copied().unwrap_or_default(),
partition_indexes: parts.clone(),
..Default::default()
})
.collect();
OffsetFetchRequest {
group_id: group_id.to_string(),
topics: Some(legacy_topics),
groups: vec![OffsetFetchRequestGroup {
group_id: group_id.to_string(),
topics: Some(group_topics),
..Default::default()
}],
..Default::default()
}
}
pub(crate) fn parse_offset_fetch(
resp: &OffsetFetchResponse,
id_to_name: &HashMap<WireUuid, String>,
) -> Vec<(String, i32, i64, i32)> {
let mut out = Vec::new();
if resp.groups.is_empty() {
for t in &resp.topics {
for p in &t.partitions {
out.push((
t.name.clone(),
p.partition_index,
p.committed_offset,
p.committed_leader_epoch,
));
}
}
} else {
for g in &resp.groups {
for t in &g.topics {
let name = if t.name.is_empty() {
id_to_name.get(&t.topic_id).cloned().unwrap_or_default()
} else {
t.name.clone()
};
for p in &t.partitions {
out.push((
name.clone(),
p.partition_index,
p.committed_offset,
p.committed_leader_epoch,
));
}
}
}
}
out
}
pub(crate) fn build_commit_topics(
offsets: HashMap<(String, i32), (i64, i32)>,
topic_ids: &HashMap<String, WireUuid>,
) -> Vec<OffsetCommitRequestTopic> {
let mut by_topic: HashMap<String, Vec<(i32, i64, i32)>> = HashMap::new();
for ((t, p), (off, epoch)) in offsets {
by_topic.entry(t).or_default().push((p, off, epoch));
}
by_topic
.into_iter()
.map(|(name, parts)| OffsetCommitRequestTopic {
topic_id: topic_ids.get(&name).copied().unwrap_or_default(),
name,
partitions: parts
.into_iter()
.map(|(p, off, epoch)| OffsetCommitRequestPartition {
partition_index: p,
committed_offset: off,
committed_leader_epoch: epoch,
committed_metadata: Some(String::new()),
..Default::default()
})
.collect(),
..Default::default()
})
.collect()
}
pub(crate) fn id_to_name(topic_ids: &HashMap<String, WireUuid>) -> HashMap<WireUuid, String> {
topic_ids.iter().map(|(n, id)| (*id, n.clone())).collect()
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crabka_protocol::owned::offset_fetch_response::{
OffsetFetchResponse, OffsetFetchResponseGroup, OffsetFetchResponsePartition,
OffsetFetchResponsePartitions, OffsetFetchResponseTopic, OffsetFetchResponseTopics,
};
fn id(n: u8) -> WireUuid {
let mut b = [0u8; 16];
b[15] = n;
WireUuid(b)
}
#[test]
fn build_offset_fetch_populates_legacy_and_groups() {
let mut by_topic = HashMap::new();
by_topic.insert("t".to_string(), vec![0, 1]);
let mut ids = HashMap::new();
ids.insert("t".to_string(), id(7));
let req = build_offset_fetch("g", &by_topic, &ids);
assert!(req.group_id == "g");
let legacy = req.topics.as_ref().expect("legacy topics");
assert!(legacy.len() == 1 && legacy[0].name == "t");
assert!(req.groups.len() == 1 && req.groups[0].group_id == "g");
let gtops = req.groups[0].topics.as_ref().expect("group topics");
assert!(gtops[0].name == "t" && gtops[0].topic_id == id(7));
}
#[test]
fn build_offset_fetch_defaults_topic_id_when_unknown() {
let mut by_topic = HashMap::new();
by_topic.insert("t".to_string(), vec![0]);
let req = build_offset_fetch("g", &by_topic, &HashMap::new());
let gtops = req.groups[0].topics.as_ref().unwrap();
assert!(gtops[0].topic_id == WireUuid::ZERO);
}
#[test]
fn parse_offset_fetch_reads_groups_with_name() {
let resp = OffsetFetchResponse {
groups: vec![OffsetFetchResponseGroup {
group_id: "g".into(),
topics: vec![OffsetFetchResponseTopics {
name: "t".into(),
topic_id: id(7),
partitions: vec![OffsetFetchResponsePartitions {
partition_index: 3,
committed_offset: 42,
..Default::default()
}],
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
let out = parse_offset_fetch(&resp, &HashMap::new());
assert!(out == vec![("t".to_string(), 3, 42, -1)]);
}
#[test]
fn parse_offset_fetch_resolves_topic_id_when_name_empty() {
let resp = OffsetFetchResponse {
groups: vec![OffsetFetchResponseGroup {
group_id: "g".into(),
topics: vec![OffsetFetchResponseTopics {
name: String::new(),
topic_id: id(9),
partitions: vec![OffsetFetchResponsePartitions {
partition_index: 0,
committed_offset: 5,
..Default::default()
}],
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
let mut id_map = HashMap::new();
id_map.insert(id(9), "named".to_string());
let out = parse_offset_fetch(&resp, &id_map);
assert!(out == vec![("named".to_string(), 0, 5, -1)]);
}
#[test]
fn parse_offset_fetch_falls_back_to_legacy_topics() {
let resp = OffsetFetchResponse {
topics: vec![OffsetFetchResponseTopic {
name: "legacy".into(),
partitions: vec![OffsetFetchResponsePartition {
partition_index: 1,
committed_offset: 11,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
let out = parse_offset_fetch(&resp, &HashMap::new());
assert!(out == vec![("legacy".to_string(), 1, 11, -1)]);
}
#[test]
fn build_commit_topics_tags_topic_id() {
let mut offsets = HashMap::new();
offsets.insert(("t".to_string(), 0), (100, 5));
let mut ids = HashMap::new();
ids.insert("t".to_string(), id(7));
let topics = build_commit_topics(offsets, &ids);
assert!(topics.len() == 1);
assert!(topics[0].name == "t" && topics[0].topic_id == id(7));
assert!(topics[0].partitions[0].committed_offset == 100);
assert!(topics[0].partitions[0].committed_leader_epoch == 5);
let mut o2 = HashMap::new();
o2.insert(("u".to_string(), 0), (1, -1));
let t2 = build_commit_topics(o2, &HashMap::new());
assert!(t2[0].topic_id == WireUuid::ZERO);
}
#[test]
fn id_to_name_inverts_the_map() {
let mut ids = HashMap::new();
ids.insert("t".to_string(), id(7));
let inv = id_to_name(&ids);
assert!(inv.get(&id(7)) == Some(&"t".to_string()));
}
}