crabka-client-consumer 0.3.6

Subscribe-style consumer client for Apache Kafka in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
//! `ShareConsumer::poll` + acknowledgement (`ShareFetch` / `ShareAcknowledge`).
//!
//! `poll()` issues one `ShareFetch` over the live assignment. Acquired records
//! are paired with the `acquired_records` ranges the broker returns (so each
//! carries the broker's `delivery_count`), and the ranges are remembered for the
//! next poll's implicit auto-`Accept`.
//!
//! ## Acknowledgement
//!
//! - [`Implicit`](super::types::ShareAckMode::Implicit) (default): the *next*
//!   `poll()` (and `close()`) implicitly `Accept`s every range returned by the
//!   previous `poll()`. Nothing is required of the application.
//! - [`Explicit`](super::types::ShareAckMode::Explicit): the application calls
//!   [`acknowledge`](ShareConsumer::acknowledge) per record; staged acks are
//!   flushed on the next `poll()` (piggybacked) or via
//!   [`commit`](ShareConsumer::commit) (standalone `ShareAcknowledge`).
//!
//! ## Session epoch
//!
//! The broker's share-session cache opens at epoch 0 (storing 1) and then
//! expects each subsequent `ShareFetch` *or* `ShareAcknowledge` to carry the
//! stored epoch, incrementing on each accepted request. We mirror that exactly:
//! send `self.share_session_epoch`, and advance it by one after every successful
//! `ShareFetch` / `ShareAcknowledge` (sequence 0 → 1 → 2 → …). Getting this wrong
//! makes the broker drop the session (`INVALID_SHARE_SESSION_EPOCH`).

use std::collections::HashMap;
use std::time::Duration;

use crabka_protocol::owned::share_acknowledge_request::{
    AcknowledgePartition, AcknowledgeTopic, AcknowledgementBatch as AckAckBatch,
    ShareAcknowledgeRequest,
};
use crabka_protocol::owned::share_fetch_request::{
    AcknowledgementBatch as FetchAckBatch, FetchPartition, FetchTopic, ShareFetchRequest,
};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;

use super::consumer::ShareConsumer;
use super::types::{ShareAckMode, ShareAckType, ShareConsumerRecord};
use crate::error::ConsumerError;

/// Response-wide `max_bytes` budget for a `ShareFetch` (50 MiB, mirroring the
/// classic consumer's fetch budget).
const MAX_BYTES: i32 = 50 * 1024 * 1024;
/// Per-partition `partition_max_bytes` budget for a `ShareFetch` (1 MiB).
const PARTITION_MAX_BYTES: i32 = 1 << 20;
/// Cap on records returned per fetch; also sent as the request's `batch_size`.
const MAX_RECORDS: i32 = 500;

impl ShareConsumer {
    /// Acquire and return the next batch of records.
    ///
    /// Carries acknowledgements for the previous `poll()` (implicit auto-`Accept`
    /// or drained explicit `acknowledge()` calls) piggybacked on the
    /// `ShareFetch`, then decodes the acquired records — each paired with the
    /// broker-reported `delivery_count` from its `acquired_records` range.
    ///
    /// Only records whose offset lies inside one of the partition's
    /// `acquired_records` ranges are returned. The broker ships whole record
    /// batches, so a batch may also contain offsets this member did *not*
    /// acquire; those are skipped rather than handed to the application.
    ///
    /// Acknowledgements staged for partitions that are not in the current
    /// assignment cannot be piggybacked and are dropped with a warning.
    ///
    /// With no assignment yet, sleeps for `timeout` and returns empty (mirroring
    /// the classic [`Consumer::poll`](crate::Consumer::poll)).
    ///
    /// # Errors
    ///
    /// Propagates errors from sending the `ShareFetch`, and returns
    /// [`ConsumerError::Server`] when the response carries a non-zero top-level
    /// error code. In both cases the acknowledgements drained for piggybacking
    /// are not re-staged, and the session epoch is not advanced.
    #[allow(clippy::too_many_lines)]
    pub async fn poll(
        &mut self,
        timeout: Duration,
    ) -> Result<Vec<ShareConsumerRecord>, ConsumerError> {
        // Snapshot the live assignment; with nothing assigned there is nothing
        // to fetch — sleep out the timeout and return empty (matches classic).
        let assignment = self.assignment.lock().await.clone();
        if assignment.is_empty() {
            tokio::time::sleep(timeout).await;
            return Ok(Vec::new());
        }

        // Build the piggyback acknowledgement batches per (topic_id, partition)
        // from the previous poll, draining the source so each ack is sent once.
        let mut acks = self.take_piggyback_acks();

        // Group assigned partitions by topic id, moving the (topic_id,
        // partition) acks into the matching partition entry.
        let mut by_topic: HashMap<WireUuid, Vec<(i32, Vec<FetchAckBatch>)>> = HashMap::new();
        for (tid, _name, partition) in &assignment {
            let packs = acks.remove(&(*tid, *partition)).unwrap_or_default();
            by_topic.entry(*tid).or_default().push((*partition, packs));
        }

        // Whatever is left targets a partition outside the assignment (revoked
        // since the ack was staged, or an explicit ack whose topic resolved to
        // the zero uuid). It has no slot in this request; make the loss visible.
        if !acks.is_empty() {
            let batches: usize = acks.values().map(Vec::len).sum();
            tracing::warn!(
                partitions = acks.len(),
                batches,
                "dropping share acknowledgements for partitions outside the current assignment"
            );
        }

        let topics: Vec<FetchTopic> = by_topic
            .into_iter()
            .map(|(topic_id, parts)| FetchTopic {
                topic_id,
                partitions: parts
                    .into_iter()
                    .map(
                        |(partition_index, acknowledgement_batches)| FetchPartition {
                            partition_index,
                            partition_max_bytes: PARTITION_MAX_BYTES,
                            acknowledgement_batches,
                            ..Default::default()
                        },
                    )
                    .collect(),
                ..Default::default()
            })
            .collect();

        let timeout_ms = i32::try_from(timeout.as_millis()).unwrap_or(i32::MAX);
        let resp = self
            .client
            .send(ShareFetchRequest {
                group_id: Some(self.group_id.clone()),
                member_id: Some(self.member_id.clone()),
                share_session_epoch: self.share_session_epoch,
                max_wait_ms: timeout_ms,
                min_bytes: 1,
                max_bytes: MAX_BYTES,
                max_records: MAX_RECORDS,
                batch_size: MAX_RECORDS,
                share_acquire_mode: 0,
                is_renew_ack: false,
                topics,
                forgotten_topics_data: vec![],
                ..Default::default()
            })
            .await?;

        if resp.error_code != 0 {
            return Err(ConsumerError::Server(resp.error_code));
        }
        // A successful ShareFetch consumes one session epoch; advance to the
        // value the broker now expects.
        self.share_session_epoch = Self::next_session_epoch(self.share_session_epoch);

        // Reverse-map topic id → name for the returned rows (the fetch response
        // carries only topic_id). Every topic in the response was requested from
        // this assignment snapshot; an unknown id yields an empty topic name.
        let name_for: HashMap<WireUuid, String> = assignment
            .iter()
            .map(|(tid, name, _)| (*tid, name.clone()))
            .collect();

        let mut out: Vec<ShareConsumerRecord> = Vec::new();
        let mut delivered: Vec<(WireUuid, i32, i64, i64)> = Vec::new();
        for topic in &resp.responses {
            let topic_name = name_for.get(&topic.topic_id).cloned().unwrap_or_default();
            for part in &topic.partitions {
                if part.acknowledge_error_code != 0 {
                    tracing::warn!(
                        topic = %topic_name,
                        partition = part.partition_index,
                        acknowledge_error_code = part.acknowledge_error_code,
                        "share fetch piggyback acknowledge error"
                    );
                }
                if part.error_code != 0 {
                    tracing::warn!(
                        topic = %topic_name,
                        partition = part.partition_index,
                        error_code = part.error_code,
                        "share fetch partition error"
                    );
                    continue;
                }

                // Remember the acquired ranges for the next implicit auto-Accept.
                for ar in &part.acquired_records {
                    delivered.push((
                        topic.topic_id,
                        part.partition_index,
                        ar.first_offset,
                        ar.last_offset,
                    ));
                }

                let Some(payload) = &part.records else {
                    continue;
                };
                let Some(batches) = payload.as_v2() else {
                    continue;
                };
                for batch in batches {
                    if batch.attributes.is_control_batch() {
                        continue;
                    }
                    for r in &batch.records {
                        let offset = batch.base_offset + i64::from(r.offset_delta);
                        // Only acquired offsets belong to this member. A record
                        // outside every acquired range was shipped only because
                        // it shares a batch with acquired records: skip it.
                        let Some(acquired) = part
                            .acquired_records
                            .iter()
                            .find(|ar| ar.first_offset <= offset && offset <= ar.last_offset)
                        else {
                            continue;
                        };
                        out.push(ShareConsumerRecord {
                            topic: topic_name.clone(),
                            partition: part.partition_index,
                            offset,
                            timestamp: batch.base_timestamp + r.timestamp_delta,
                            key: r.key.clone(),
                            value: r.value.clone(),
                            delivery_count: acquired.delivery_count,
                        });
                    }
                }
            }
        }

        // Implicit mode auto-Accepts these ranges on the next poll/close; explicit
        // mode acknowledges per record and discards them in take_piggyback_acks.
        self.prev_delivered = delivered;
        Ok(out)
    }

    /// Stage an explicit acknowledgement for `record` (Explicit mode).
    ///
    /// The ack is flushed on the next [`poll`](ShareConsumer::poll) (piggybacked)
    /// or [`commit`](ShareConsumer::commit) (standalone `ShareAcknowledge`).
    ///
    /// # Errors
    ///
    /// Returns [`ConsumerError::IllegalState`] in
    /// [`Implicit`](super::types::ShareAckMode::Implicit) mode: there, every
    /// delivered record is auto-`Accept`ed on the next poll/close, so an explicit
    /// `acknowledge()` cannot be honored (staging it would silently leak into
    /// `pending_acks`, which the implicit path never flushes). This mirrors the
    /// JVM `KafkaShareConsumer`, which raises `IllegalStateException` if you
    /// explicitly acknowledge while in implicit acknowledgement mode.
    pub fn acknowledge(
        &mut self,
        record: &ShareConsumerRecord,
        ack: ShareAckType,
    ) -> Result<(), ConsumerError> {
        if self.ack_mode == ShareAckMode::Implicit {
            return Err(ConsumerError::IllegalState(
                "acknowledge() is not allowed in implicit ack mode; \
                 records are auto-accepted on the next poll/close"
                    .into(),
            ));
        }
        let topic_id = self.topic_id_for(&record.topic);
        self.pending_acks.push((
            topic_id,
            record.partition,
            record.offset,
            record.offset,
            ack.wire(),
        ));
        Ok(())
    }

    /// Renew the acquisition lock on a single delivered `record` (KIP-932
    /// RENEW). Sends a standalone `ShareAcknowledge` with `is_renew_ack = true`
    /// and an empty `acknowledge_types` for the record's offset, which extends
    /// the broker-side lock deadline without changing the record's state. Like
    /// [`acknowledge`](ShareConsumer::acknowledge), this is only valid in
    /// explicit ack mode. Advances the session epoch whenever the request is
    /// accepted at the session level.
    ///
    /// # Errors
    ///
    /// Returns [`ConsumerError::IllegalState`] in
    /// [`Implicit`](super::types::ShareAckMode::Implicit) mode (records are
    /// auto-accepted on the next poll/close, so renewing a lock is meaningless),
    /// and [`ConsumerError::Server`] if the broker rejects the renew — either
    /// with a top-level error code or with an error on the record's partition.
    pub async fn renew(&mut self, record: &ShareConsumerRecord) -> Result<(), ConsumerError> {
        if self.ack_mode == ShareAckMode::Implicit {
            return Err(ConsumerError::IllegalState(
                "renew() is not allowed in implicit ack mode; \
                 records are auto-accepted on the next poll/close"
                    .into(),
            ));
        }
        let topic_id = self.topic_id_for(&record.topic);
        let topics = vec![AcknowledgeTopic {
            topic_id,
            partitions: vec![AcknowledgePartition {
                partition_index: record.partition,
                acknowledgement_batches: vec![AckAckBatch {
                    first_offset: record.offset,
                    last_offset: record.offset,
                    acknowledge_types: vec![],
                    ..Default::default()
                }],
                ..Default::default()
            }],
            ..Default::default()
        }];

        let resp = self
            .client
            .send(ShareAcknowledgeRequest {
                group_id: Some(self.group_id.clone()),
                member_id: Some(self.member_id.clone()),
                share_session_epoch: self.share_session_epoch,
                is_renew_ack: true,
                topics,
                ..Default::default()
            })
            .await?;
        if resp.error_code != 0 {
            return Err(ConsumerError::Server(resp.error_code));
        }
        // The session accepted the request, so the epoch is consumed even if the
        // partition-level renew below was refused.
        self.share_session_epoch = Self::next_session_epoch(self.share_session_epoch);

        // The request names exactly one partition, so any partition-level error
        // is the broker's verdict on this record's lock.
        if let Some(code) = resp
            .responses
            .iter()
            .flat_map(|topic| topic.partitions.iter())
            .map(|part| part.error_code)
            .find(|&code| code != 0)
        {
            return Err(ConsumerError::Server(code));
        }
        Ok(())
    }

    /// Flush staged explicit acknowledgements via a standalone
    /// `ShareAcknowledge`. No-op when nothing is staged.
    ///
    /// # Errors
    ///
    /// Propagates errors from sending the request and returns
    /// [`ConsumerError::Server`] for a non-zero top-level error code.
    pub async fn commit(&mut self) -> Result<(), ConsumerError> {
        self.flush_pending_acks().await
    }

    /// Drain `pending_acks` into a `ShareAcknowledge`. Advances the session epoch
    /// on success (an accepted `ShareAcknowledge` consumes one epoch, exactly
    /// like a `ShareFetch`). No-op (and no epoch advance) when nothing is staged.
    ///
    /// The staged acks are drained before sending; if the request fails they are
    /// not re-staged. Partition-level errors in an otherwise successful response
    /// are logged, not returned.
    pub(crate) async fn flush_pending_acks(&mut self) -> Result<(), ConsumerError> {
        if self.pending_acks.is_empty() {
            return Ok(());
        }
        let drained = std::mem::take(&mut self.pending_acks);
        let topics = build_ack_topics(drained);

        let resp = self
            .client
            .send(ShareAcknowledgeRequest {
                group_id: Some(self.group_id.clone()),
                member_id: Some(self.member_id.clone()),
                share_session_epoch: self.share_session_epoch,
                is_renew_ack: false,
                topics,
                ..Default::default()
            })
            .await?;
        if resp.error_code != 0 {
            return Err(ConsumerError::Server(resp.error_code));
        }
        self.share_session_epoch = Self::next_session_epoch(self.share_session_epoch);

        // Surface rejected acknowledgements instead of dropping them silently.
        for topic in &resp.responses {
            for part in &topic.partitions {
                if part.error_code != 0 {
                    tracing::warn!(
                        partition = part.partition_index,
                        error_code = part.error_code,
                        "share acknowledge partition error"
                    );
                }
            }
        }
        Ok(())
    }

    /// Build the piggyback acknowledgement batches for the next `ShareFetch`,
    /// keyed by `(topic_id, partition)`, consuming the source state.
    ///
    /// - Implicit: one `Accept` batch per previously-delivered range.
    /// - Explicit: the drained `pending_acks`, one batch per staged entry.
    fn take_piggyback_acks(&mut self) -> HashMap<(WireUuid, i32), Vec<FetchAckBatch>> {
        // Normalise both modes to (topic_id, partition, first, last, ack_wire).
        let staged: Vec<(WireUuid, i32, i64, i64, i8)> = match self.ack_mode {
            ShareAckMode::Implicit => {
                let accept = ShareAckType::Accept.wire();
                std::mem::take(&mut self.prev_delivered)
                    .into_iter()
                    .map(|(tid, partition, first, last)| (tid, partition, first, last, accept))
                    .collect()
            }
            ShareAckMode::Explicit => {
                // Explicit mode never auto-accepts; clear any stale ranges.
                self.prev_delivered.clear();
                std::mem::take(&mut self.pending_acks)
            }
        };

        let mut out: HashMap<(WireUuid, i32), Vec<FetchAckBatch>> = HashMap::new();
        for (tid, partition, first, last, ack) in staged {
            // One ack type per offset in first..=last (none if the range is empty).
            let count = usize::try_from(last - first + 1).unwrap_or(0);
            out.entry((tid, partition))
                .or_default()
                .push(FetchAckBatch {
                    first_offset: first,
                    last_offset: last,
                    acknowledge_types: vec![ack; count],
                    ..Default::default()
                });
        }
        out
    }

    /// Session epoch to send after a request carrying `epoch` was accepted.
    ///
    /// Each accepted `ShareFetch` / `ShareAcknowledge` advances the epoch by one,
    /// but the sequence wraps from `i32::MAX` back to `1` (epoch `0` opens a new
    /// session). A plain `wrapping_add(1)` would send `i32::MIN`, which does not
    /// match the broker's expected epoch.
    fn next_session_epoch(epoch: i32) -> i32 {
        if epoch == i32::MAX { 1 } else { epoch + 1 }
    }

    /// Resolve a topic id from a topic name via the live assignment / cached
    /// `topic_names`. Returns the zero uuid if unknown. Via `commit` such an ack
    /// reaches the broker, which is expected to reject it; on the piggyback path
    /// it matches no assigned partition and is dropped with a warning by `poll`.
    fn topic_id_for(&self, name: &str) -> WireUuid {
        // The assignment carries (topic_id, name, partition); use it first since
        // it is the set the application is acking against. `try_lock` keeps this
        // sync (acknowledge() takes &mut self, not async).
        if let Ok(assignment) = self.assignment.try_lock()
            && let Some((tid, _, _)) = assignment.iter().find(|(_, n, _)| n == name)
        {
            return *tid;
        }
        if let Ok(names) = self.topic_names.try_lock()
            && let Some((tid, _)) = names.iter().find(|(_, n)| n.as_str() == name)
        {
            return *tid;
        }
        WireUuid::default()
    }
}

/// Group `(topic_id, partition, first, last, ack_wire)` acks into
/// `ShareAcknowledge` topic/partition/batch shape, coalescing by topic and
/// partition.
fn build_ack_topics(acks: Vec<(WireUuid, i32, i64, i64, i8)>) -> Vec<AcknowledgeTopic> {
    let mut by_topic: HashMap<WireUuid, HashMap<i32, Vec<AckAckBatch>>> = HashMap::new();
    for (tid, partition, first, last, ack) in acks {
        let count = usize::try_from(last - first + 1).unwrap_or(0);
        by_topic
            .entry(tid)
            .or_default()
            .entry(partition)
            .or_default()
            .push(AckAckBatch {
                first_offset: first,
                last_offset: last,
                acknowledge_types: vec![ack; count],
                ..Default::default()
            });
    }
    by_topic
        .into_iter()
        .map(|(topic_id, parts)| AcknowledgeTopic {
            topic_id,
            partitions: parts
                .into_iter()
                .map(
                    |(partition_index, acknowledgement_batches)| AcknowledgePartition {
                        partition_index,
                        acknowledgement_batches,
                        ..Default::default()
                    },
                )
                .collect(),
            ..Default::default()
        })
        .collect()
}