use bytes::{Bytes, BytesMut};
/// Policy selecting where consumption starts when no usable offset is available.
///
/// NOTE(review): the variant names presumably mirror Kafka's `auto.offset.reset`
/// consumer setting (`earliest` / `latest` / `none`); the code that interprets
/// them is not in this file — confirm there.
///
/// `PartialEq`/`Eq` are derived so values can be compared directly, matching the
/// sibling [`IsolationLevel`] enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Presumably: start from the earliest available offset — confirm.
    Earliest,
    /// Presumably: start from the latest offset — confirm.
    Latest,
    /// Presumably: do not reset automatically — confirm.
    None,
}
/// Isolation level selector with a stable one-byte numeric encoding.
///
/// NOTE(review): presumably the value sent in the Kafka `isolation_level`
/// request field — the call sites are not in this file; confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Encoded as `0` by [`IsolationLevel::wire`].
    ReadUncommitted,
    /// Encoded as `1` by [`IsolationLevel::wire`].
    ReadCommitted,
}

impl IsolationLevel {
    /// Returns the numeric code for this level: `0` for `ReadUncommitted`,
    /// `1` for `ReadCommitted`.
    ///
    /// The match is kept exhaustive (no wildcard arm) so that adding a variant
    /// forces this mapping to be revisited.
    pub(crate) fn wire(self) -> i8 {
        match self {
            Self::ReadUncommitted => 0,
            Self::ReadCommitted => 1,
        }
    }
}
use crabka_protocol::owned::consumer_protocol_assignment::{
ConsumerProtocolAssignment, TopicPartition as AssignTopicPartition,
};
use crabka_protocol::owned::consumer_protocol_subscription::{
ConsumerProtocolSubscription, TopicPartition as SubTopicPartition,
};
use crabka_protocol::{Decode, Encode, UnknownTaggedFields};
/// Version used for subscription payloads: written as the 2-byte big-endian
/// prefix by `encode_subscription`, and the upper bound that `decode_subscription`
/// clamps an incoming version prefix to.
const SUBSCRIPTION_WIRE_VERSION: i16 = 3;
/// Version used for assignment payloads: written as the 2-byte big-endian
/// prefix by `encode_assignment`, and the upper bound that `decode_assignment`
/// clamps an incoming version prefix to.
const ASSIGNMENT_WIRE_VERSION: i16 = 3;
/// Flattened result of `decode_subscription`.
///
/// When the input is too short or fails to decode, `decode_subscription`
/// returns this with empty vectors, `generation_id == -1` and `rack_id == None`.
pub(crate) struct DecodedSubscription {
/// Topic names taken from the decoded message's `topics` field.
pub topics: Vec<String>,
/// Owned partitions, flattened to one `(topic, partition)` pair per partition.
pub owned: Vec<(String, i32)>,
/// Generation id taken from the decoded message (`-1` on the fallback path).
pub generation_id: i32,
/// Rack id taken from the decoded message, if any.
// NOTE(review): outside the tests no reader of this field is visible in this
// file; the `allow` presumably silences the resulting dead-code warning — confirm.
#[allow(dead_code)]
pub rack_id: Option<String>,
}
/// Buckets `(topic, partition)` pairs by topic name.
///
/// The returned map borrows the topic names from `pairs`. Because it is a
/// `BTreeMap`, iteration is in ascending topic order; within a topic the
/// partitions keep the order in which they appeared in `pairs`, and duplicates
/// are kept.
fn group_by_topic(pairs: &[(String, i32)]) -> std::collections::BTreeMap<&str, Vec<i32>> {
    pairs.iter().fold(
        std::collections::BTreeMap::<&str, Vec<i32>>::new(),
        |mut grouped, (topic, partition)| {
            grouped.entry(topic.as_str()).or_default().push(*partition);
            grouped
        },
    )
}
/// Reads the leading two bytes of `bytes` as a big-endian `i16` without
/// consuming them.
///
/// Returns `0` when fewer than two bytes are available.
fn peek_version(bytes: &[u8]) -> i16 {
    match bytes {
        [hi, lo, ..] => i16::from_be_bytes([*hi, *lo]),
        _ => 0,
    }
}
/// Serializes a consumer subscription as a 2-byte big-endian version prefix
/// (`SUBSCRIPTION_WIRE_VERSION`) followed by the encoded
/// `ConsumerProtocolSubscription` body.
///
/// * `topics` – topic names, copied into the message as given.
/// * `owned` – `(topic, partition)` pairs; grouped per topic via
///   `group_by_topic` before encoding.
/// * `generation_id` – stored in the message unchanged.
/// * `rack_id` – optional rack id, stored in the message when present.
///
/// `user_data` is always encoded as `None`.
///
/// # Panics
///
/// Panics if the underlying message encoder reports an error.
pub(crate) fn encode_subscription(
    topics: &[String],
    owned: &[(String, i32)],
    generation_id: i32,
    rack_id: Option<&str>,
) -> Bytes {
    use bytes::BufMut;

    // One wire entry per distinct topic, in the map's (sorted) topic order.
    let mut owned_partitions = Vec::new();
    for (name, partition_ids) in group_by_topic(owned) {
        owned_partitions.push(SubTopicPartition {
            topic: name.to_owned(),
            partitions: partition_ids,
            unknown_tagged_fields: UnknownTaggedFields::default(),
        });
    }

    let subscription = ConsumerProtocolSubscription {
        topics: topics.to_vec(),
        user_data: None,
        owned_partitions,
        generation_id,
        rack_id: rack_id.map(String::from),
        unknown_tagged_fields: UnknownTaggedFields::default(),
    };

    // Reserve room for the version prefix plus the body up front.
    let body_len = subscription.encoded_len(SUBSCRIPTION_WIRE_VERSION);
    let mut out = BytesMut::with_capacity(2 + body_len);
    out.put_i16(SUBSCRIPTION_WIRE_VERSION);
    subscription
        .encode(&mut out, SUBSCRIPTION_WIRE_VERSION)
        .expect("ConsumerProtocolSubscription encode");
    out.freeze()
}
/// Parses a subscription payload: a 2-byte big-endian version prefix followed
/// by a `ConsumerProtocolSubscription` body.
///
/// The version prefix is clamped into `0..=SUBSCRIPTION_WIRE_VERSION` before
/// being handed to the decoder. Owned partitions are flattened into
/// `(topic, partition)` pairs.
///
/// This function never fails: if `bytes` is shorter than two bytes, or the
/// body does not decode, it returns an empty subscription with
/// `generation_id == -1` and no rack id.
pub(crate) fn decode_subscription(bytes: &[u8]) -> DecodedSubscription {
    /// Fallback value returned for input that cannot be decoded.
    fn undecodable() -> DecodedSubscription {
        DecodedSubscription {
            topics: Vec::new(),
            owned: Vec::new(),
            generation_id: -1,
            rack_id: None,
        }
    }

    // `get(2..)` is `None` exactly when the version prefix is incomplete.
    let Some(mut payload) = bytes.get(2..) else {
        return undecodable();
    };
    let version = peek_version(bytes).clamp(0, SUBSCRIPTION_WIRE_VERSION);

    match ConsumerProtocolSubscription::decode(&mut payload, version) {
        Ok(msg) => {
            let owned = msg
                .owned_partitions
                .into_iter()
                .flat_map(|tp| {
                    let topic = tp.topic;
                    tp.partitions
                        .into_iter()
                        .map(move |partition| (topic.clone(), partition))
                })
                .collect();
            DecodedSubscription {
                topics: msg.topics,
                owned,
                generation_id: msg.generation_id,
                rack_id: msg.rack_id,
            }
        }
        Err(_) => undecodable(),
    }
}
/// Serializes a partition assignment as a 2-byte big-endian version prefix
/// (`ASSIGNMENT_WIRE_VERSION`) followed by the encoded
/// `ConsumerProtocolAssignment` body.
///
/// `partitions` is a list of `(topic, partition)` pairs; they are grouped per
/// topic via `group_by_topic` before encoding. `user_data` is always encoded
/// as `None`.
///
/// # Panics
///
/// Panics if the underlying message encoder reports an error.
pub(crate) fn encode_assignment(partitions: &[(String, i32)]) -> Bytes {
    use bytes::BufMut;

    // One wire entry per distinct topic, in the map's (sorted) topic order.
    let mut assigned_partitions = Vec::new();
    for (name, partition_ids) in group_by_topic(partitions) {
        assigned_partitions.push(AssignTopicPartition {
            topic: name.to_owned(),
            partitions: partition_ids,
            unknown_tagged_fields: UnknownTaggedFields::default(),
        });
    }

    let assignment = ConsumerProtocolAssignment {
        assigned_partitions,
        user_data: None,
        unknown_tagged_fields: UnknownTaggedFields::default(),
    };

    // Reserve room for the version prefix plus the body up front.
    let body_len = assignment.encoded_len(ASSIGNMENT_WIRE_VERSION);
    let mut out = BytesMut::with_capacity(2 + body_len);
    out.put_i16(ASSIGNMENT_WIRE_VERSION);
    assignment
        .encode(&mut out, ASSIGNMENT_WIRE_VERSION)
        .expect("ConsumerProtocolAssignment encode");
    out.freeze()
}
/// Parses an assignment payload: a 2-byte big-endian version prefix followed
/// by a `ConsumerProtocolAssignment` body.
///
/// The version prefix is clamped into `0..=ASSIGNMENT_WIRE_VERSION` before
/// being handed to the decoder. The result is one `(topic, partition)` pair per
/// assigned partition, in the order the message lists them.
///
/// Returns an empty vector if `bytes` is shorter than two bytes or the body
/// does not decode.
pub(crate) fn decode_assignment(bytes: &[u8]) -> Vec<(String, i32)> {
    // `get(2..)` is `None` exactly when the version prefix is incomplete.
    let Some(mut payload) = bytes.get(2..) else {
        return Vec::new();
    };
    let version = peek_version(bytes).clamp(0, ASSIGNMENT_WIRE_VERSION);

    match ConsumerProtocolAssignment::decode(&mut payload, version) {
        Ok(assignment) => assignment
            .assigned_partitions
            .into_iter()
            .flat_map(|tp| {
                let topic = tp.topic;
                tp.partitions
                    .into_iter()
                    .map(move |partition| (topic.clone(), partition))
            })
            .collect(),
        Err(_) => Vec::new(),
    }
}
// Round-trip and compatibility tests for the subscription/assignment codecs.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
// Topic names survive an encode/decode round trip in order.
#[test]
fn subscription_round_trip() {
let s = encode_subscription(&["t1".into(), "t2".into()], &[], -1, None);
let decoded = decode_subscription(&s);
assert!(decoded.topics == vec!["t1", "t2"]);
}
// A subscription with no topics and no owned partitions decodes to empty
// collections, with the generation id and rack id passed in (-1 / None).
#[test]
fn subscription_empty_round_trip() {
let s = encode_subscription(&[], &[], -1, None);
let decoded = decode_subscription(&s);
assert!(decoded.topics.is_empty());
assert!(decoded.owned.is_empty());
assert!(decoded.generation_id == -1);
assert!(decoded.rack_id == None);
}
// Owned partitions round-trip as a set; both sides are sorted before comparing
// because encoding groups pairs per topic and may reorder them.
#[test]
fn subscription_v3_owned_partitions_round_trip() {
let owned = vec![("t".into(), 0), ("t".into(), 1), ("u".into(), 0)];
let s = encode_subscription(&["t".into(), "u".into()], &owned, -1, None);
let decoded = decode_subscription(&s);
let mut got = decoded.owned.clone();
got.sort();
let mut want = owned.clone();
want.sort();
assert!(got == want);
}
// Generation id and rack id survive a round trip.
#[test]
fn subscription_v3_generation_and_rack_round_trip() {
let s = encode_subscription(&["t".into()], &[], 42, Some("rack-a"));
let decoded = decode_subscription(&s);
assert!(decoded.generation_id == 42);
assert!(decoded.rack_id.as_deref() == Some("rack-a"));
}
// A hand-built payload with version prefix 1 still decodes; fields it does not
// carry come back as generation_id == -1 and rack_id == None.
#[test]
fn subscription_decodes_v1_payload() {
use bytes::BufMut;
let mut buf = BytesMut::new();
// Version prefix.
buf.put_i16(1);
// Topic array: element count, then one i16-length-prefixed string.
buf.put_i32(1);
let t = "t1";
buf.put_i16(i16::try_from(t.len()).unwrap());
buf.put_slice(t.as_bytes());
// NOTE(review): the trailing -1 and 0 are presumably a null `user_data` and an
// empty `owned_partitions` array per the v1 subscription schema — confirm.
buf.put_i32(-1); buf.put_i32(0); let payload = buf.freeze();
let decoded = decode_subscription(&payload);
assert!(decoded.topics == vec!["t1"]);
assert!(decoded.owned.is_empty());
assert!(decoded.generation_id == -1);
assert!(decoded.rack_id == None);
}
// Every assigned (topic, partition) pair survives a round trip; membership is
// checked rather than order.
#[test]
fn assignment_round_trip() {
let s = encode_assignment(&[("t".into(), 0), ("t".into(), 1), ("u".into(), 0)]);
let decoded = decode_assignment(&s);
assert!(decoded.contains(&("t".into(), 0)));
assert!(decoded.contains(&("t".into(), 1)));
assert!(decoded.contains(&("u".into(), 0)));
assert!(decoded.len() == 3);
}
// An empty assignment round-trips to an empty list.
#[test]
fn assignment_empty_round_trip() {
let s = encode_assignment(&[]);
let decoded = decode_assignment(&s);
assert!(decoded.is_empty());
}
}