Skip to main content

crabka_client_consumer/
builder.rs

1//! Codec helpers for `ConsumerProtocol` subscription / assignment payloads,
2//! and the [`AutoOffsetReset`] / [`IsolationLevel`] enums used by
3//! [`Consumer::builder`].
4
5use bytes::{Bytes, BytesMut};
6
/// What to do when a partition has no committed offset.
///
/// The policy is a plain `Copy` value and implements `PartialEq`/`Eq`, so a
/// configured policy can be compared directly
/// (`policy == AutoOffsetReset::Latest`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Start from offset 0.
    Earliest,
    /// Start from the log-end offset. Resolved lazily by `Consumer::poll`
    /// using `ListOffsets(timestamp=-1)`.
    Latest,
    /// Do not reset automatically. On a missing offset or detected truncation,
    /// `poll` returns `ConsumerError::LogTruncation` / surfaces the error.
    None,
}
19
/// Selects which records this consumer is allowed to see.
///
/// Corresponds to Kafka's `isolation.level` configuration; the chosen level
/// is sent as the `isolation_level` field of the `Fetch` request, encoded as
/// an `i8` on the wire.
///
/// [`ReadUncommitted`](IsolationLevel::ReadUncommitted) is the default, kept
/// for backward compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// `isolation.level=read_uncommitted`: every record is visible, including
    /// records that belong to open or aborted transactions.
    ReadUncommitted,
    /// `isolation.level=read_committed`: only non-transactional records and
    /// records from committed transactions are visible.
    ReadCommitted,
}

impl IsolationLevel {
    /// Wire value of this level as carried in the `Fetch` request (`i8`):
    /// `0` for read-uncommitted, `1` for read-committed.
    pub(crate) fn wire(self) -> i8 {
        match self {
            Self::ReadCommitted => 1,
            Self::ReadUncommitted => 0,
        }
    }
}
46
47// ── subscription / assignment codec (ConsumerProtocol v3) ─────────────────
48
49use crabka_protocol::owned::consumer_protocol_assignment::{
50    ConsumerProtocolAssignment, TopicPartition as AssignTopicPartition,
51};
52use crabka_protocol::owned::consumer_protocol_subscription::{
53    ConsumerProtocolSubscription, TopicPartition as SubTopicPartition,
54};
55use crabka_protocol::{Decode, Encode, UnknownTaggedFields};
56
/// `ConsumerProtocolSubscription` version that `encode_subscription` writes
/// into the 2-byte header and encodes the body with. `decode_subscription`
/// clamps any incoming version to this value as its upper bound.
const SUBSCRIPTION_WIRE_VERSION: i16 = 3;
/// `ConsumerProtocolAssignment` version that `encode_assignment` writes into
/// the 2-byte header and encodes the body with. `decode_assignment` clamps
/// any incoming version to this value as its upper bound.
const ASSIGNMENT_WIRE_VERSION: i16 = 3;
59
/// Flattened, owned view of a `ConsumerProtocolSubscription` payload as
/// returned by `decode_subscription`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a decoded value can be
/// logged and compared as a whole (e.g. in round-trip tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct DecodedSubscription {
    /// Topic names carried by the subscription.
    pub topics: Vec<String>,
    /// Owned partitions, flattened to one `(topic, partition)` pair each.
    pub owned: Vec<(String, i32)>,
    /// Generation id carried by the subscription; `-1` in the fallback value
    /// produced for payloads that are too short or fail to decode.
    pub generation_id: i32,
    // Part of the ConsumerProtocolSubscription v3 wire surface; kept here
    // for symmetry with the wire form and round-trip tests, even though
    // the coordinator does not currently consult it for assignment.
    #[allow(dead_code)]
    pub rack_id: Option<String>,
}
70
/// Groups `(topic, partition)` pairs by topic name.
///
/// The returned map borrows its keys from `pairs` and iterates in sorted
/// topic order (it is a `BTreeMap`). Within a topic, partitions keep the
/// order in which they appear in `pairs`; duplicates are kept as-is.
fn group_by_topic(pairs: &[(String, i32)]) -> std::collections::BTreeMap<&str, Vec<i32>> {
    pairs.iter().fold(
        std::collections::BTreeMap::new(),
        |mut grouped, (topic, partition)| {
            grouped
                .entry(topic.as_str())
                .or_insert_with(Vec::new)
                .push(*partition);
            grouped
        },
    )
}
79
/// Reads the leading two bytes of `bytes` as a big-endian `i16` version.
///
/// Returns `0` when fewer than two bytes are available. Bytes after the
/// first two are ignored.
fn peek_version(bytes: &[u8]) -> i16 {
    match bytes {
        [hi, lo, ..] => i16::from_be_bytes([*hi, *lo]),
        _ => 0,
    }
}
86
87pub(crate) fn encode_subscription(
88    topics: &[String],
89    owned: &[(String, i32)],
90    generation_id: i32,
91    rack_id: Option<&str>,
92) -> Bytes {
93    use bytes::BufMut;
94    let owned_partitions: Vec<SubTopicPartition> = group_by_topic(owned)
95        .into_iter()
96        .map(|(topic, partitions)| SubTopicPartition {
97            topic: topic.to_string(),
98            partitions,
99            unknown_tagged_fields: UnknownTaggedFields::default(),
100        })
101        .collect();
102    let msg = ConsumerProtocolSubscription {
103        topics: topics.to_vec(),
104        user_data: None,
105        owned_partitions,
106        generation_id,
107        rack_id: rack_id.map(str::to_string),
108        unknown_tagged_fields: UnknownTaggedFields::default(),
109    };
110    let mut buf = BytesMut::with_capacity(2 + msg.encoded_len(SUBSCRIPTION_WIRE_VERSION));
111    buf.put_i16(SUBSCRIPTION_WIRE_VERSION);
112    msg.encode(&mut buf, SUBSCRIPTION_WIRE_VERSION)
113        .expect("ConsumerProtocolSubscription encode");
114    buf.freeze()
115}
116
117pub(crate) fn decode_subscription(bytes: &[u8]) -> DecodedSubscription {
118    if bytes.len() < 2 {
119        return DecodedSubscription {
120            topics: Vec::new(),
121            owned: Vec::new(),
122            generation_id: -1,
123            rack_id: None,
124        };
125    }
126    let version = peek_version(bytes).clamp(0, SUBSCRIPTION_WIRE_VERSION);
127    let mut cur = &bytes[2..];
128    let Ok(msg) = ConsumerProtocolSubscription::decode(&mut cur, version) else {
129        return DecodedSubscription {
130            topics: Vec::new(),
131            owned: Vec::new(),
132            generation_id: -1,
133            rack_id: None,
134        };
135    };
136    let mut owned = Vec::new();
137    for tp in msg.owned_partitions {
138        for p in tp.partitions {
139            owned.push((tp.topic.clone(), p));
140        }
141    }
142    DecodedSubscription {
143        topics: msg.topics,
144        owned,
145        generation_id: msg.generation_id,
146        rack_id: msg.rack_id,
147    }
148}
149
150pub(crate) fn encode_assignment(partitions: &[(String, i32)]) -> Bytes {
151    use bytes::BufMut;
152    let assigned_partitions: Vec<AssignTopicPartition> = group_by_topic(partitions)
153        .into_iter()
154        .map(|(topic, partitions)| AssignTopicPartition {
155            topic: topic.to_string(),
156            partitions,
157            unknown_tagged_fields: UnknownTaggedFields::default(),
158        })
159        .collect();
160    let msg = ConsumerProtocolAssignment {
161        assigned_partitions,
162        user_data: None,
163        unknown_tagged_fields: UnknownTaggedFields::default(),
164    };
165    let mut buf = BytesMut::with_capacity(2 + msg.encoded_len(ASSIGNMENT_WIRE_VERSION));
166    buf.put_i16(ASSIGNMENT_WIRE_VERSION);
167    msg.encode(&mut buf, ASSIGNMENT_WIRE_VERSION)
168        .expect("ConsumerProtocolAssignment encode");
169    buf.freeze()
170}
171
172pub(crate) fn decode_assignment(bytes: &[u8]) -> Vec<(String, i32)> {
173    if bytes.len() < 2 {
174        return Vec::new();
175    }
176    let version = peek_version(bytes).clamp(0, ASSIGNMENT_WIRE_VERSION);
177    let mut cur = &bytes[2..];
178    let Ok(msg) = ConsumerProtocolAssignment::decode(&mut cur, version) else {
179        return Vec::new();
180    };
181    let mut out = Vec::new();
182    for tp in msg.assigned_partitions {
183        for p in tp.partitions {
184            out.push((tp.topic.clone(), p));
185        }
186    }
187    out
188}
189
/// Round-trip and fallback tests for the subscription / assignment codec.
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    #[test]
    fn subscription_round_trip() {
        let s = encode_subscription(&["t1".into(), "t2".into()], &[], -1, None);
        let decoded = decode_subscription(&s);
        assert!(decoded.topics == vec!["t1", "t2"]);
    }

    #[test]
    fn subscription_empty_round_trip() {
        let s = encode_subscription(&[], &[], -1, None);
        let decoded = decode_subscription(&s);
        assert!(decoded.topics.is_empty());
        assert!(decoded.owned.is_empty());
        assert!(decoded.generation_id == -1);
        assert!(decoded.rack_id.is_none());
    }

    #[test]
    fn subscription_v3_owned_partitions_round_trip() {
        let owned: Vec<(String, i32)> = vec![("t".into(), 0), ("t".into(), 1), ("u".into(), 0)];
        let s = encode_subscription(&["t".into(), "u".into()], &owned, -1, None);
        let decoded = decode_subscription(&s);
        // Order is not part of the contract here; compare as sorted lists.
        let mut got = decoded.owned.clone();
        got.sort();
        let mut want = owned.clone();
        want.sort();
        assert!(got == want);
    }

    #[test]
    fn subscription_v3_generation_and_rack_round_trip() {
        let s = encode_subscription(&["t".into()], &[], 42, Some("rack-a"));
        let decoded = decode_subscription(&s);
        assert!(decoded.generation_id == 42);
        assert!(decoded.rack_id.as_deref() == Some("rack-a"));
    }

    #[test]
    fn subscription_decodes_v1_payload() {
        use bytes::BufMut;
        // Hand-built v1 payload: version header, topics array, null
        // user_data, empty owned_partitions.
        let mut buf = BytesMut::new();
        buf.put_i16(1);
        buf.put_i32(1);
        let t = "t1";
        buf.put_i16(i16::try_from(t.len()).unwrap());
        buf.put_slice(t.as_bytes());
        buf.put_i32(-1); // user_data null
        buf.put_i32(0); // owned_partitions empty (v1)
        let payload = buf.freeze();
        let decoded = decode_subscription(&payload);
        assert!(decoded.topics == vec!["t1"]);
        assert!(decoded.owned.is_empty());
        assert!(decoded.generation_id == -1);
        assert!(decoded.rack_id.is_none());
    }

    #[test]
    fn subscription_short_payload_falls_back_to_empty() {
        // Fewer than two bytes means there is no complete version header.
        let short_payloads: [&[u8]; 2] = [&[], &[0x00]];
        for payload in short_payloads {
            let decoded = decode_subscription(payload);
            assert!(decoded.topics.is_empty());
            assert!(decoded.owned.is_empty());
            assert!(decoded.generation_id == -1);
            assert!(decoded.rack_id.is_none());
        }
    }

    #[test]
    fn assignment_round_trip() {
        let s = encode_assignment(&[("t".into(), 0), ("t".into(), 1), ("u".into(), 0)]);
        let decoded = decode_assignment(&s);
        assert!(decoded.contains(&("t".into(), 0)));
        assert!(decoded.contains(&("t".into(), 1)));
        assert!(decoded.contains(&("u".into(), 0)));
        assert!(decoded.len() == 3);
    }

    #[test]
    fn assignment_empty_round_trip() {
        let s = encode_assignment(&[]);
        let decoded = decode_assignment(&s);
        assert!(decoded.is_empty());
    }

    #[test]
    fn assignment_short_payload_decodes_to_empty() {
        // Fewer than two bytes means there is no complete version header.
        let short_payloads: [&[u8]; 2] = [&[], &[0x00]];
        for payload in short_payloads {
            assert!(decode_assignment(payload).is_empty());
        }
    }

    #[test]
    fn peek_version_reads_big_endian_prefix() {
        assert!(peek_version(&[]) == 0);
        assert!(peek_version(&[0x01]) == 0);
        assert!(peek_version(&[0x00, 0x03]) == 3);
        assert!(peek_version(&[0x01, 0x02, 0xff]) == 0x0102);
        assert!(peek_version(&[0xff, 0xff]) == -1);
    }

    #[test]
    fn group_by_topic_groups_and_keeps_partition_order() {
        let pairs: Vec<(String, i32)> = vec![("u".into(), 2), ("t".into(), 1), ("t".into(), 0)];
        let grouped = group_by_topic(&pairs);
        assert!(grouped.len() == 2);
        assert!(grouped.get("t") == Some(&vec![1, 0]));
        assert!(grouped.get("u") == Some(&vec![2]));
        let keys: Vec<&str> = grouped.keys().copied().collect();
        assert!(keys == vec!["t", "u"]);
    }
}