1use bytes::{Bytes, BytesMut};
6
7#[derive(Debug, Clone, Copy)]
9pub enum AutoOffsetReset {
10 Earliest,
12 Latest,
15 None,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum IsolationLevel {
29 ReadUncommitted,
32 ReadCommitted,
35}
36
37impl IsolationLevel {
38 pub(crate) fn wire(self) -> i8 {
40 match self {
41 IsolationLevel::ReadUncommitted => 0,
42 IsolationLevel::ReadCommitted => 1,
43 }
44 }
45}
46
47use crabka_protocol::owned::consumer_protocol_assignment::{
50 ConsumerProtocolAssignment, TopicPartition as AssignTopicPartition,
51};
52use crabka_protocol::owned::consumer_protocol_subscription::{
53 ConsumerProtocolSubscription, TopicPartition as SubTopicPartition,
54};
55use crabka_protocol::{Decode, Encode, UnknownTaggedFields};
56
57const SUBSCRIPTION_WIRE_VERSION: i16 = 3;
58const ASSIGNMENT_WIRE_VERSION: i16 = 3;
59
60pub(crate) struct DecodedSubscription {
61 pub topics: Vec<String>,
62 pub owned: Vec<(String, i32)>,
63 pub generation_id: i32,
64 #[allow(dead_code)]
68 pub rack_id: Option<String>,
69}
70
71fn group_by_topic(pairs: &[(String, i32)]) -> std::collections::BTreeMap<&str, Vec<i32>> {
72 let mut by_topic: std::collections::BTreeMap<&str, Vec<i32>> =
73 std::collections::BTreeMap::new();
74 for (t, p) in pairs {
75 by_topic.entry(t.as_str()).or_default().push(*p);
76 }
77 by_topic
78}
79
80fn peek_version(bytes: &[u8]) -> i16 {
81 if bytes.len() < 2 {
82 return 0;
83 }
84 i16::from_be_bytes([bytes[0], bytes[1]])
85}
86
87pub(crate) fn encode_subscription(
88 topics: &[String],
89 owned: &[(String, i32)],
90 generation_id: i32,
91 rack_id: Option<&str>,
92) -> Bytes {
93 use bytes::BufMut;
94 let owned_partitions: Vec<SubTopicPartition> = group_by_topic(owned)
95 .into_iter()
96 .map(|(topic, partitions)| SubTopicPartition {
97 topic: topic.to_string(),
98 partitions,
99 unknown_tagged_fields: UnknownTaggedFields::default(),
100 })
101 .collect();
102 let msg = ConsumerProtocolSubscription {
103 topics: topics.to_vec(),
104 user_data: None,
105 owned_partitions,
106 generation_id,
107 rack_id: rack_id.map(str::to_string),
108 unknown_tagged_fields: UnknownTaggedFields::default(),
109 };
110 let mut buf = BytesMut::with_capacity(2 + msg.encoded_len(SUBSCRIPTION_WIRE_VERSION));
111 buf.put_i16(SUBSCRIPTION_WIRE_VERSION);
112 msg.encode(&mut buf, SUBSCRIPTION_WIRE_VERSION)
113 .expect("ConsumerProtocolSubscription encode");
114 buf.freeze()
115}
116
117pub(crate) fn decode_subscription(bytes: &[u8]) -> DecodedSubscription {
118 if bytes.len() < 2 {
119 return DecodedSubscription {
120 topics: Vec::new(),
121 owned: Vec::new(),
122 generation_id: -1,
123 rack_id: None,
124 };
125 }
126 let version = peek_version(bytes).clamp(0, SUBSCRIPTION_WIRE_VERSION);
127 let mut cur = &bytes[2..];
128 let Ok(msg) = ConsumerProtocolSubscription::decode(&mut cur, version) else {
129 return DecodedSubscription {
130 topics: Vec::new(),
131 owned: Vec::new(),
132 generation_id: -1,
133 rack_id: None,
134 };
135 };
136 let mut owned = Vec::new();
137 for tp in msg.owned_partitions {
138 for p in tp.partitions {
139 owned.push((tp.topic.clone(), p));
140 }
141 }
142 DecodedSubscription {
143 topics: msg.topics,
144 owned,
145 generation_id: msg.generation_id,
146 rack_id: msg.rack_id,
147 }
148}
149
150pub(crate) fn encode_assignment(partitions: &[(String, i32)]) -> Bytes {
151 use bytes::BufMut;
152 let assigned_partitions: Vec<AssignTopicPartition> = group_by_topic(partitions)
153 .into_iter()
154 .map(|(topic, partitions)| AssignTopicPartition {
155 topic: topic.to_string(),
156 partitions,
157 unknown_tagged_fields: UnknownTaggedFields::default(),
158 })
159 .collect();
160 let msg = ConsumerProtocolAssignment {
161 assigned_partitions,
162 user_data: None,
163 unknown_tagged_fields: UnknownTaggedFields::default(),
164 };
165 let mut buf = BytesMut::with_capacity(2 + msg.encoded_len(ASSIGNMENT_WIRE_VERSION));
166 buf.put_i16(ASSIGNMENT_WIRE_VERSION);
167 msg.encode(&mut buf, ASSIGNMENT_WIRE_VERSION)
168 .expect("ConsumerProtocolAssignment encode");
169 buf.freeze()
170}
171
172pub(crate) fn decode_assignment(bytes: &[u8]) -> Vec<(String, i32)> {
173 if bytes.len() < 2 {
174 return Vec::new();
175 }
176 let version = peek_version(bytes).clamp(0, ASSIGNMENT_WIRE_VERSION);
177 let mut cur = &bytes[2..];
178 let Ok(msg) = ConsumerProtocolAssignment::decode(&mut cur, version) else {
179 return Vec::new();
180 };
181 let mut out = Vec::new();
182 for tp in msg.assigned_partitions {
183 for p in tp.partitions {
184 out.push((tp.topic.clone(), p));
185 }
186 }
187 out
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193 use assert2::assert;
194
195 #[test]
196 fn subscription_round_trip() {
197 let s = encode_subscription(&["t1".into(), "t2".into()], &[], -1, None);
198 let decoded = decode_subscription(&s);
199 assert!(decoded.topics == vec!["t1", "t2"]);
200 }
201
202 #[test]
203 fn subscription_empty_round_trip() {
204 let s = encode_subscription(&[], &[], -1, None);
205 let decoded = decode_subscription(&s);
206 assert!(decoded.topics.is_empty());
207 assert!(decoded.owned.is_empty());
208 assert!(decoded.generation_id == -1);
209 assert!(decoded.rack_id == None);
210 }
211
212 #[test]
213 fn subscription_v3_owned_partitions_round_trip() {
214 let owned = vec![("t".into(), 0), ("t".into(), 1), ("u".into(), 0)];
215 let s = encode_subscription(&["t".into(), "u".into()], &owned, -1, None);
216 let decoded = decode_subscription(&s);
217 let mut got = decoded.owned.clone();
218 got.sort();
219 let mut want = owned.clone();
220 want.sort();
221 assert!(got == want);
222 }
223
224 #[test]
225 fn subscription_v3_generation_and_rack_round_trip() {
226 let s = encode_subscription(&["t".into()], &[], 42, Some("rack-a"));
227 let decoded = decode_subscription(&s);
228 assert!(decoded.generation_id == 42);
229 assert!(decoded.rack_id.as_deref() == Some("rack-a"));
230 }
231
232 #[test]
233 fn subscription_decodes_v1_payload() {
234 use bytes::BufMut;
235 let mut buf = BytesMut::new();
236 buf.put_i16(1);
237 buf.put_i32(1);
238 let t = "t1";
239 buf.put_i16(i16::try_from(t.len()).unwrap());
240 buf.put_slice(t.as_bytes());
241 buf.put_i32(-1); buf.put_i32(0); let payload = buf.freeze();
244 let decoded = decode_subscription(&payload);
245 assert!(decoded.topics == vec!["t1"]);
246 assert!(decoded.owned.is_empty());
247 assert!(decoded.generation_id == -1);
248 assert!(decoded.rack_id == None);
249 }
250
251 #[test]
252 fn assignment_round_trip() {
253 let s = encode_assignment(&[("t".into(), 0), ("t".into(), 1), ("u".into(), 0)]);
254 let decoded = decode_assignment(&s);
255 assert!(decoded.contains(&("t".into(), 0)));
256 assert!(decoded.contains(&("t".into(), 1)));
257 assert!(decoded.contains(&("u".into(), 0)));
258 assert!(decoded.len() == 3);
259 }
260
261 #[test]
262 fn assignment_empty_round_trip() {
263 let s = encode_assignment(&[]);
264 let decoded = decode_assignment(&s);
265 assert!(decoded.is_empty());
266 }
267}