crabka-client-consumer 0.3.2

Subscribe-style consumer client for Apache Kafka in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
//! Background coordinator task — owns the join/sync/heartbeat/rebalance
//! lifecycle for a [`Consumer`](crate::consumer::Consumer).
//!
//! On each tick we either send a `Heartbeat` (steady-state) or run a
//! full `JoinGroup` + `SyncGroup` round (`needs_rejoin`). The broker
//! signals a rebalance via `error_code = 27 (REBALANCE_IN_PROGRESS)`
//! on heartbeat; `25 (UNKNOWN_MEMBER_ID)` forces a from-scratch
//! handshake (clear `member_id`, `generation_id = -1`).
//!
//! Cooperative rebalance (KIP-429) runs phase-1 + phase-2 in place:
//! phase 1 reduces the owned set to the partitions we kept, then we
//! immediately re-Join + re-Sync so the leader can place the freshly
//! freed partitions onto whoever needs them. Eager (`range`) drops the
//! whole assignment and reinstalls in a single round.
//!
//! During a rejoin in flight we deliberately do *not* heartbeat —
//! `JoinGroup` resets the broker-side session timer.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

use crabka_client_core::Client;
use crabka_protocol::owned::heartbeat_request::HeartbeatRequest;
use crabka_protocol::owned::join_group_request::{JoinGroupRequest, JoinGroupRequestProtocol};
use crabka_protocol::owned::join_group_response::JoinGroupResponse;
use crabka_protocol::owned::leave_group_request::{LeaveGroupRequest, MemberIdentity};
use crabka_protocol::owned::metadata_request::MetadataRequest;
use crabka_protocol::owned::offset_commit_request::OffsetCommitRequest;
use crabka_protocol::owned::sync_group_request::{SyncGroupRequest, SyncGroupRequestAssignment};
use crabka_protocol::owned::sync_group_response::SyncGroupResponse;
use crabka_protocol::primitives::uuid::Uuid as WireUuid;

use crate::assignor::{Assignor, RebalanceProtocol};
use crate::builder::{
    AutoOffsetReset, decode_assignment, decode_subscription, encode_assignment, encode_subscription,
};
use crate::error::ConsumerError;
use crate::offset_wire::{build_commit_topics, build_offset_fetch, id_to_name, parse_offset_fetch};

/// Group-coordinator error code `14`: the coordinator is still loading its
/// state. Retriable — Kafka clients back off and retry rather than failing.
pub(crate) const COORDINATOR_LOAD_IN_PROGRESS: i16 = 14;
/// Group-coordinator error code `15`: the coordinator is not yet available.
/// Retriable.
pub(crate) const COORDINATOR_NOT_AVAILABLE: i16 = 15;
/// Group-coordinator error code `16`: the coordinator has moved to another
/// broker. Retriable.
pub(crate) const NOT_COORDINATOR: i16 = 16;

/// Upper bound on how long `with_coordinator_retry` keeps retrying a cold
/// coordinator before it surfaces the last error. Matches a typical client
/// `request.timeout.ms`.
pub(crate) const COORDINATOR_RETRY_TIMEOUT: Duration = Duration::from_secs(30);

/// Returns `true` when `code` is one of the retriable cold-coordinator codes
/// (`14`, `15` or `16`) and `false` for every other value, including `0`.
fn is_retriable_coordinator_code(code: i16) -> bool {
    [
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR,
    ]
    .contains(&code)
}

/// Drive a group-coordinator RPC to completion, retrying while the
/// coordinator is cold.
///
/// `make` builds and sends a fresh request on every attempt; `code` extracts
/// the `error_code` from a response. An attempt is retried when the response
/// carries a cold-coordinator code (14/15/16) or when the send fails with a
/// `Disconnected` transport error. Retries are spaced by an exponential
/// backoff that starts at 100 ms and is capped at one second.
///
/// The deadline is only checked *after* a retriable attempt, so at least one
/// attempt is always made. Once `timeout` has elapsed:
///
/// * a retriable response is returned as `Ok`, so the caller's own
///   `error_code` handling runs;
/// * a disconnect is reported as `ConsumerError::CoordinatorUnavailable`.
///
/// # Errors
///
/// Any error other than `Disconnected` is returned immediately, unretried.
pub(crate) async fn with_coordinator_retry<R, F, Fut>(
    timeout: Duration,
    code: impl Fn(&R) -> i16,
    make: F,
) -> Result<R, ConsumerError>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<R, ConsumerError>>,
{
    const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
    const MAX_BACKOFF: Duration = Duration::from_secs(1);

    let started_at = tokio::time::Instant::now();
    let mut delay = INITIAL_BACKOFF;
    loop {
        // Scoped so the attempt's outcome is dropped before we sleep, rather
        // than being held across the backoff await.
        {
            // What to hand back if this retriable attempt turns out to be
            // the last one the deadline allows.
            let on_deadline = match make().await {
                Ok(resp) => {
                    if !is_retriable_coordinator_code(code(&resp)) {
                        return Ok(resp);
                    }
                    Ok(resp)
                }
                Err(ConsumerError::Client(crabka_client_core::ClientError::Disconnected)) => {
                    Err(ConsumerError::CoordinatorUnavailable)
                }
                Err(other) => return Err(other),
            };
            if started_at.elapsed() >= timeout {
                return on_deadline;
            }
        }
        tokio::time::sleep(delay).await;
        delay = (delay * 2).min(MAX_BACKOFF);
    }
}

/// Mutable state owned exclusively by the coordinator task.
///
/// The `Arc<Mutex<...>>` fields are shared with the parent `Consumer`
/// so that `poll()` / `assignment()` see live updates as rebalances
/// land. Plain (non-`Arc`) fields are exclusive to the coordinator
/// and may be mutated freely — `member_id` and `generation_id` change
/// on a from-scratch rejoin.
pub(crate) struct CoordinatorState {
    /// Client used to send every group RPC issued by this module
    /// (Heartbeat, JoinGroup, SyncGroup, LeaveGroup, Metadata,
    /// OffsetCommit, OffsetFetch).
    pub client: Client,
    /// Consumer group id, copied into each group request.
    pub group_id: String,
    /// Current member id. Cleared by `run` when a heartbeat reports
    /// `UNKNOWN_MEMBER_ID`, and overwritten by `join_and_sync` with the id
    /// the broker returns.
    pub member_id: String,
    /// Generation last adopted from a successful Join+Sync; reset to `-1`
    /// alongside a cleared `member_id`.
    pub generation_id: i32,
    /// Selects the assignment strategy: supplies the protocol name sent in
    /// `JoinGroup` and decides between the eager and cooperative paths.
    pub assignor: Assignor,
    /// Topic names encoded into the subscription metadata; the group leader
    /// also uses them to filter the Metadata response.
    pub subscribed_topics: Vec<String>,
    /// Shared snapshot of the currently owned `(topic, partition)` pairs.
    pub assigned: Arc<Mutex<Vec<(String, i32)>>>,
    /// Shared next-fetch offset per partition. `0` and `i64::MAX` are the
    /// `Earliest` / `Latest` reset baselines written by `prime_offsets`.
    pub next_offsets: Arc<Mutex<HashMap<(String, i32), i64>>>,
    /// Shared KIP-320 position sidecar (carries `offset_epoch`), pruned in
    /// lockstep with `next_offsets`.
    pub positions: Arc<Mutex<HashMap<(String, i32), crate::position::PartitionPosition>>>,
    /// Shared topic-name → topic-id map; refreshed from Metadata when this
    /// member is elected group leader.
    pub topic_ids: Arc<Mutex<HashMap<String, WireUuid>>>,
    /// Sent as `session_timeout_ms` in `JoinGroup` (saturating at `i32::MAX`).
    pub session_timeout: Duration,
    /// Sent as `rebalance_timeout_ms` in `JoinGroup` (saturating at `i32::MAX`).
    pub rebalance_timeout: Duration,
    /// Period of the tick that drives heartbeats and rejoins in `run`.
    pub heartbeat_interval: Duration,
    /// Starting-offset policy applied when a partition has no committed offset.
    pub auto_offset_reset: AutoOffsetReset,
    /// Optional rack id, passed through to the subscription encoder.
    pub client_rack: Option<String>,
}

/// Outcome of a single heartbeat RPC.
///
/// Produced by `heartbeat_once` and consumed by `run`, which turns it into
/// the `needs_rejoin` flag (and, for [`Self::RejoinFromScratch`], a reset of
/// `member_id` / `generation_id`).
enum HeartbeatOutcome {
    /// `error_code == 0`.
    Ok,
    /// `REBALANCE_IN_PROGRESS (27)` or `ILLEGAL_GENERATION (22)` — rejoin
    /// with the current `member_id`. `ILLEGAL_GENERATION` fires when our
    /// heartbeat tick lands after the broker has already advanced past
    /// the generation we last synced on (e.g. a rebalance completed
    /// while we were between heartbeat windows); without a rejoin we'd
    /// keep heartbeating the dead generation forever and never pick up
    /// the new assignment.
    NeedRejoin,
    /// `UNKNOWN_MEMBER_ID (25)` — clear `member_id` + rejoin from scratch.
    RejoinFromScratch,
    /// Transport error or unexpected non-fatal broker code; retry on next tick.
    ///
    /// Every `error_code` other than `0`, `22`, `25` and `27` is mapped
    /// here; it is logged and otherwise ignored.
    Transient,
}

/// Drive the heartbeat + rebalance loop until `shutdown` fires.
///
/// On entry the caller has already done one initial Join+Sync, so we
/// begin in steady-state heartbeating. `needs_rejoin` becomes `true`
/// as soon as the broker signals a rebalance; the next tick performs
/// the rejoin in place of heartbeating.
///
/// A broker answer of `UNKNOWN_MEMBER_ID (25)` forces a from-scratch
/// handshake (`member_id` cleared, `generation_id = -1`) no matter which
/// RPC reported it: a heartbeat, or the `JoinGroup` / `SyncGroup` issued by
/// a rejoin. Without the reset on the rejoin path every retry would re-send
/// the same dead member id and be rejected the same way on every tick.
///
/// When the loop exits, a best-effort `LeaveGroup` is sent for the live
/// member id before the task returns.
pub(crate) async fn run(mut state: CoordinatorState, shutdown: CancellationToken) {
    // Kafka error code: the coordinator no longer knows our member id.
    const UNKNOWN_MEMBER_ID: i16 = 25;

    let mut ticker = tokio::time::interval(state.heartbeat_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    let mut needs_rejoin = false;

    loop {
        tokio::select! {
            () = shutdown.cancelled() => break,
            _ = ticker.tick() => {}
        }

        // Race the per-tick RPCs against shutdown so `close()` returns
        // promptly even when we're mid-rebalance and the broker is holding
        // a JoinGroup / SyncGroup open. Without this, cancellation is only
        // observed *between* ticks, so a `rejoin()` in flight against an
        // open broker call would stall `close()` for up to session_timeout.
        // The RPC futures are cancellation-safe: `Client` multiplexes on
        // correlation ids, so dropping an in-flight send only abandons its
        // pending response — it can't corrupt the connection.
        if needs_rejoin {
            tokio::select! {
                () = shutdown.cancelled() => break,
                result = rejoin(&mut state) => match result {
                    Ok(()) => needs_rejoin = false,
                    Err(e) => {
                        // The broker forgot us mid-rebalance: retrying with
                        // the same member id can never succeed, so restart
                        // the handshake exactly as the heartbeat path does.
                        if matches!(e, ConsumerError::Server(UNKNOWN_MEMBER_ID)) {
                            state.member_id.clear();
                            state.generation_id = -1;
                        }
                        tracing::warn!(error = %e, "rejoin failed; will retry on next tick");
                    }
                },
            }
        } else {
            tokio::select! {
                () = shutdown.cancelled() => break,
                outcome = heartbeat_once(&state) => match outcome {
                    HeartbeatOutcome::Ok | HeartbeatOutcome::Transient => {}
                    HeartbeatOutcome::NeedRejoin => needs_rejoin = true,
                    HeartbeatOutcome::RejoinFromScratch => {
                        state.member_id.clear();
                        state.generation_id = -1;
                        needs_rejoin = true;
                    }
                },
            }
        }
    }

    // Graceful departure: tell the broker to evict us *now* rather than
    // waiting out `session_timeout`. This MUST use `state.member_id`, which
    // is the live id — a from-scratch rejoin (`UNKNOWN_MEMBER_ID`) replaces
    // it mid-life, so the `Consumer`'s build-time copy can be stale. Leaving
    // with a stale id is a silent no-op that orphans the real member until
    // its session expires, stalling the rest of the group's rebalance.
    // Best-effort and bounded: a hung broker must not block `close()`.
    leave_group(&state).await;
}

/// Tell the broker this member is leaving, using the coordinator's *current*
/// member id.
///
/// Called once on shutdown. The send is capped at five seconds and its
/// outcome is discarded: if it fails the broker simply falls back to
/// session-timeout eviction, whereas an unbounded send would hang `close()`
/// (which awaits this task). Mirrors the Java client, which leaves the group
/// on close for dynamic members.
///
/// Does nothing when `member_id` is empty (a from-scratch rejoin that never
/// re-completed) — the broker would not recognize such an id anyway.
async fn leave_group(state: &CoordinatorState) {
    if !state.member_id.is_empty() {
        // Fill in both the top-level `member_id` (v0–v2 wire shape) and the
        // `members` array (v3+) so whichever version is negotiated has the
        // id it serializes.
        let request = LeaveGroupRequest {
            group_id: state.group_id.clone(),
            member_id: state.member_id.clone(),
            members: vec![MemberIdentity {
                member_id: state.member_id.clone(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bounded = tokio::time::timeout(Duration::from_secs(5), state.client.send(request));
        let _ = bounded.await;
    }
}

/// Send one `Heartbeat` and translate the response into a directive.
async fn heartbeat_once(state: &CoordinatorState) -> HeartbeatOutcome {
    let result = state
        .client
        .send(HeartbeatRequest {
            group_id: state.group_id.clone(),
            generation_id: state.generation_id,
            member_id: state.member_id.clone(),
            ..Default::default()
        })
        .await;
    match result {
        Ok(r) if r.error_code == 0 => HeartbeatOutcome::Ok,
        Ok(r) if r.error_code == 27 || r.error_code == 22 => HeartbeatOutcome::NeedRejoin,
        Ok(r) if r.error_code == 25 => HeartbeatOutcome::RejoinFromScratch,
        Ok(r) => {
            tracing::warn!(error_code = r.error_code, "unexpected heartbeat error");
            HeartbeatOutcome::Transient
        }
        Err(e) => {
            tracing::warn!(error = %e, "heartbeat send failed");
            HeartbeatOutcome::Transient
        }
    }
}

/// Run one complete rebalance round (Join + Sync), then mutate the
/// shared `assigned` / `next_offsets` snapshots in place.
///
/// For [`RebalanceProtocol::Cooperative`] this may issue *two* Join+Sync
/// rounds back-to-back: the first to install the kept partitions only,
/// the second (phase 2) to receive the freshly placed ones. See KIP-429.
///
/// `state.generation_id` is updated to the generation returned by the last
/// successful round.
///
/// # Errors
///
/// Propagates any error from `join_and_sync` or `prime_offsets`. State is
/// not rolled back on failure: in the cooperative revoke path, a failed
/// phase-2 round leaves `assigned` already reduced to the kept partitions
/// and `generation_id` already set to the phase-1 generation.
async fn rejoin(state: &mut CoordinatorState) -> Result<(), ConsumerError> {
    // Snapshot what we own right now; it is advertised in the subscription
    // and is the baseline for the revoked/added diff below.
    let owned: Vec<(String, i32)> = state.assigned.lock().await.clone();
    let (new_assignment, new_generation, _protocol_name) = join_and_sync(state, &owned).await?;

    // Set difference in both directions: what we lost and what we gained.
    let old_set: HashSet<(String, i32)> = owned.iter().cloned().collect();
    let new_set: HashSet<(String, i32)> = new_assignment.iter().cloned().collect();
    let revoked: Vec<(String, i32)> = old_set.difference(&new_set).cloned().collect();
    let added: Vec<(String, i32)> = new_set.difference(&old_set).cloned().collect();

    match state.assignor.rebalance_protocol() {
        RebalanceProtocol::Eager => {
            // Drop everything and reinstall in a single round. Prime the
            // added partitions' fetch offsets *before* publishing the new
            // assignment: `poll()` defaults an assigned-but-unprimed
            // partition to offset 0 (poll.rs's `unwrap_or(0)`), so a poll
            // racing between the `assigned` publish and the prime would
            // re-fetch from 0 and re-deliver already-consumed records. Prime
            // first → a partition is only visible in `assigned` once its
            // next_offset is established.
            prime_offsets(state, &added).await?;
            {
                let mut a = state.assigned.lock().await;
                a.clone_from(&new_assignment);
            }
            {
                // Both guards are held together here, taken in the order
                // next_offsets → positions.
                let mut off = state.next_offsets.lock().await;
                off.retain(|k, _| new_set.contains(k));
                // Prune the KIP-320 position sidecar in lockstep so stale
                // epoch metadata for dropped partitions doesn't accumulate.
                let mut pos = state.positions.lock().await;
                pos.retain(|k, _| new_set.contains(k));
            }
            state.generation_id = new_generation;
        }
        RebalanceProtocol::Cooperative => {
            if revoked.is_empty() {
                // Pure additions: merge into the existing assigned set.
                // No phase 2 needed because no member needed to revoke.
                //
                // Prime the added partitions' fetch offsets *before*
                // publishing them into `assigned`: a `poll()` racing the
                // rebalance would otherwise see an assigned-but-unprimed
                // partition and fetch it from offset 0 (poll.rs's
                // `unwrap_or(0)`), re-delivering records the previous owner
                // already committed past at revoke time.
                prime_offsets(state, &added).await?;
                {
                    let mut a = state.assigned.lock().await;
                    for p in &added {
                        if !a.contains(p) {
                            a.push(p.clone());
                        }
                    }
                }
                state.generation_id = new_generation;
            } else {
                // Phase 1: drop the partitions we're losing, then
                // immediately rejoin so the leader can place them on
                // whoever needs them in phase 2. Keeping kept partitions
                // active throughout is the whole point of KIP-429.
                {
                    let mut a = state.assigned.lock().await;
                    a.retain(|p| !revoked.contains(p));
                }
                // Adopt the generation from round 1 *before* committing: the
                // broker advanced the group epoch when we rejoined above, so
                // an OffsetCommit carrying the pre-rebalance generation is
                // rejected with ILLEGAL_GENERATION. Commit the revoked
                // partitions' positions under the current generation so the
                // member that picks them up in phase 2 primes from the offset
                // we'd consumed to, rather than re-delivering records we
                // already saw (KIP-429 onPartitionsRevoked semantics).
                state.generation_id = new_generation;
                commit_revoked(state, &revoked).await;
                {
                    // Forget the revoked partitions only after the commit
                    // above has read their offsets.
                    let mut off = state.next_offsets.lock().await;
                    let mut pos = state.positions.lock().await;
                    for p in &revoked {
                        off.remove(p);
                        // Prune the KIP-320 position sidecar in lockstep.
                        pos.remove(p);
                    }
                }

                // Phase 2: rejoin with the reduced owned-set.
                let owned_after_revoke: Vec<(String, i32)> = state.assigned.lock().await.clone();
                let (assignment2, gen2, _) = join_and_sync(state, &owned_after_revoke).await?;
                let owned_after_revoke_set: HashSet<(String, i32)> =
                    owned_after_revoke.iter().cloned().collect();
                // Anything in the phase-2 assignment that we did not keep
                // through phase 1 is new to us and needs a primed offset.
                let added2: Vec<(String, i32)> = assignment2
                    .iter()
                    .filter(|p| !owned_after_revoke_set.contains(*p))
                    .cloned()
                    .collect();
                // Prime the freshly placed partitions *before* publishing the
                // phase-2 assignment, so a poll racing the rebalance can't
                // observe them in `assigned` without a primed next_offset and
                // fetch from 0 (poll.rs). That primed value is the offset the
                // revoking member committed at revoke time; fetching from 0
                // instead would re-deliver the records it already consumed.
                prime_offsets(state, &added2).await?;
                {
                    let mut a = state.assigned.lock().await;
                    *a = assignment2;
                }
                state.generation_id = gen2;
            }
        }
    }
    Ok(())
}

/// Best-effort `OffsetCommit` for partitions being revoked in a
/// cooperative rebalance, using the current (pre-rebalance) generation.
///
/// Failures are logged and swallowed: a revoke-time commit racing the
/// generation bump can return `ILLEGAL_GENERATION`, and surfacing that
/// into `poll()` would break the KIP-429 transparency guarantee. Worst
/// case the new owner re-delivers a few records (at-least-once).
async fn commit_revoked(state: &CoordinatorState, revoked: &[(String, i32)]) {
    let revoked_set: HashSet<&(String, i32)> = revoked.iter().collect();
    let offsets: HashMap<(String, i32), (i64, i32)> = {
        let off = state.next_offsets.lock().await;
        let pos = state.positions.lock().await;
        off.iter()
            // Only commit partitions where we actually consumed something. A
            // next_offset still at its reset baseline (0 = Earliest, i64::MAX =
            // Latest) means no records were polled, so there is no progress to
            // preserve — committing it just adds a blocking round-trip that
            // widens the mid-rebalance generation-race window.
            .filter(|(k, v)| revoked_set.contains(k) && **v > 0 && **v != i64::MAX)
            .map(|(k, v)| {
                let epoch = pos.get(k).map_or(-1, |p| p.offset_epoch);
                (k.clone(), (*v, epoch))
            })
            .collect()
    };
    if offsets.is_empty() {
        return;
    }
    let topic_ids = state.topic_ids.lock().await.clone();
    let topics = build_commit_topics(offsets, &topic_ids);
    let res = state
        .client
        .send(OffsetCommitRequest {
            group_id: state.group_id.clone(),
            generation_id_or_member_epoch: state.generation_id,
            member_id: state.member_id.clone(),
            topics,
            ..Default::default()
        })
        .await;
    match res {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(error = %e, "revoke-time offset commit failed; partitions may re-deliver");
        }
    }
}

/// Issue `JoinGroup` (handling the `MEMBER_ID_REQUIRED` two-step when
/// our `member_id` is empty), assign as leader if we won the election,
/// then `SyncGroup`. Returns `(assignment, generation_id, protocol_name)`.
///
/// `owned` is the set of partitions advertised as currently owned in the
/// subscription metadata. `state.member_id` is updated in place with
/// whatever id the broker hands back; `state.generation_id` is *not*
/// touched here — the caller decides when to adopt the returned generation.
///
/// # Errors
///
/// * `ConsumerError::RebalanceFailed` if the broker answers
///   `MEMBER_ID_REQUIRED` without supplying a member id.
/// * `ConsumerError::Server(code)` for any non-zero `JoinGroup` /
///   `SyncGroup` error code left after coordinator retries.
/// * Any error surfaced by `with_coordinator_retry` or by the leader's
///   `Metadata` request.
// Sequential join/sync state machine; splitting fragments the linear
// MEMBER_ID_REQUIRED → leader-assign → SyncGroup flow.
#[allow(clippy::too_many_lines)]
async fn join_and_sync(
    state: &mut CoordinatorState,
    owned: &[(String, i32)],
) -> Result<(Vec<(String, i32)>, i32, String), ConsumerError> {
    // The wire fields are i32 milliseconds; saturate rather than wrap on an
    // oversized Duration.
    let session_timeout_ms = i32::try_from(state.session_timeout.as_millis()).unwrap_or(i32::MAX);
    let rebalance_timeout_ms =
        i32::try_from(state.rebalance_timeout.as_millis()).unwrap_or(i32::MAX);

    let subscription_bytes = encode_subscription(
        &state.subscribed_topics,
        owned,
        state.generation_id,
        state.client_rack.as_deref(),
    );
    let protocol_name = state.assignor.protocol_name().to_string();

    // First join: if we have no member_id, expect MEMBER_ID_REQUIRED (79) and
    // capture the broker-assigned id, then issue a second join. Retry a cold or
    // relocating coordinator (14/15/16) with backoff on each send.
    let r1 = with_coordinator_retry(
        COORDINATOR_RETRY_TIMEOUT,
        |r: &JoinGroupResponse| r.error_code,
        || {
            // Clone per attempt: the request is consumed by `send`, and the
            // retry helper may call this closure several times.
            let group_id = state.group_id.clone();
            let member_id = state.member_id.clone();
            let protocol_name = protocol_name.clone();
            let subscription_bytes = subscription_bytes.clone();
            let client = &state.client;
            async move {
                client
                    .send(JoinGroupRequest {
                        group_id,
                        protocol_type: "consumer".into(),
                        member_id,
                        session_timeout_ms,
                        rebalance_timeout_ms,
                        protocols: vec![JoinGroupRequestProtocol {
                            name: protocol_name,
                            metadata: subscription_bytes,
                            ..Default::default()
                        }],
                        ..Default::default()
                    })
                    .await
                    .map_err(ConsumerError::from)
            }
        },
    )
    .await?;
    let join_resp = if r1.error_code == 0 {
        r1
    } else if r1.error_code == 79 {
        // MEMBER_ID_REQUIRED: adopt the id the broker assigned and join again.
        let assigned_id = r1.member_id.clone();
        if assigned_id.is_empty() {
            return Err(ConsumerError::RebalanceFailed(
                "broker did not assign a member_id".into(),
            ));
        }
        state.member_id.clone_from(&assigned_id);
        let r2 = with_coordinator_retry(
            COORDINATOR_RETRY_TIMEOUT,
            |r: &JoinGroupResponse| r.error_code,
            || {
                let group_id = state.group_id.clone();
                let assigned_id = assigned_id.clone();
                let protocol_name = protocol_name.clone();
                let subscription_bytes = subscription_bytes.clone();
                let client = &state.client;
                async move {
                    client
                        .send(JoinGroupRequest {
                            group_id,
                            protocol_type: "consumer".into(),
                            member_id: assigned_id,
                            session_timeout_ms,
                            rebalance_timeout_ms,
                            protocols: vec![JoinGroupRequestProtocol {
                                name: protocol_name,
                                metadata: subscription_bytes,
                                ..Default::default()
                            }],
                            ..Default::default()
                        })
                        .await
                        .map_err(ConsumerError::from)
                }
            },
        )
        .await?;
        if r2.error_code != 0 {
            return Err(ConsumerError::Server(r2.error_code));
        }
        r2
    } else {
        return Err(ConsumerError::Server(r1.error_code));
    };

    // The broker may have refreshed our member_id on this join too.
    if !join_resp.member_id.is_empty() {
        state.member_id.clone_from(&join_resp.member_id);
    }
    // Fall back to the protocol we offered when the response names none.
    let chosen_protocol = join_resp
        .protocol_name
        .clone()
        .unwrap_or_else(|| protocol_name.clone());
    let generation_id = join_resp.generation_id;

    // Leader: resolve partition counts via Metadata and run the assignor.
    let is_leader = join_resp.leader == state.member_id;
    let assignments_for_sync: Vec<SyncGroupRequestAssignment> = if is_leader {
        let md = state.client.send(MetadataRequest::default()).await?;
        let mut topic_partitions: HashMap<String, i32> = HashMap::new();
        let mut resolved_ids: HashMap<String, WireUuid> = HashMap::new();
        for t in &md.topics {
            // Skip unnamed topics and topics we are not subscribed to.
            let Some(name) = &t.name else { continue };
            if state.subscribed_topics.iter().any(|s| s == name) {
                let count = i32::try_from(t.partitions.len()).unwrap_or(i32::MAX);
                topic_partitions.insert(name.clone(), count);
                resolved_ids.insert(name.clone(), t.topic_id);
            }
        }
        // Push the freshly resolved topic_ids into the shared map so
        // poll() can satisfy newly added partitions on Fetch v ≥ 13.
        {
            let mut ids = state.topic_ids.lock().await;
            for (k, v) in resolved_ids {
                ids.insert(k, v);
            }
        }

        // Decode every member's subscription metadata for the assignor.
        let decoded: Vec<(String, crate::builder::DecodedSubscription)> = join_resp
            .members
            .iter()
            .map(|m| (m.member_id.clone(), decode_subscription(&m.metadata)))
            .collect();

        let assignments = match state.assignor {
            Assignor::Range => {
                // Range only needs each member's topic list.
                let inputs: Vec<(String, Vec<String>)> = decoded
                    .into_iter()
                    .map(|(id, sub)| (id, sub.topics))
                    .collect();
                crate::assignor::range::assign(inputs, &topic_partitions)
            }
            Assignor::CooperativeSticky => {
                // Cooperative-sticky additionally receives each member's
                // owned partitions and generation.
                let inputs: Vec<crate::assignor::cooperative_sticky::MemberInput> = decoded
                    .into_iter()
                    .map(|(id, sub)| (id, sub.topics, sub.owned, sub.generation_id))
                    .collect();
                crate::assignor::cooperative_sticky::assign(&inputs, &topic_partitions)
            }
        };
        assignments
            .into_iter()
            .map(|(m, partitions)| SyncGroupRequestAssignment {
                member_id: m,
                assignment: encode_assignment(&partitions),
                ..Default::default()
            })
            .collect()
    } else {
        // Followers send an empty assignment list in SyncGroup.
        Vec::new()
    };

    let sync_resp = with_coordinator_retry(
        COORDINATOR_RETRY_TIMEOUT,
        |r: &SyncGroupResponse| r.error_code,
        || {
            let group_id = state.group_id.clone();
            let member_id = state.member_id.clone();
            let chosen_protocol = chosen_protocol.clone();
            let assignments_for_sync = assignments_for_sync.clone();
            let client = &state.client;
            async move {
                client
                    .send(SyncGroupRequest {
                        group_id,
                        generation_id,
                        member_id,
                        protocol_type: Some("consumer".into()),
                        protocol_name: Some(chosen_protocol),
                        assignments: assignments_for_sync,
                        ..Default::default()
                    })
                    .await
                    .map_err(ConsumerError::from)
            }
        },
    )
    .await?;
    if sync_resp.error_code != 0 {
        return Err(ConsumerError::Server(sync_resp.error_code));
    }
    let my_assignment = decode_assignment(&sync_resp.assignment);
    Ok((my_assignment, generation_id, chosen_protocol))
}

/// Establish a starting fetch offset for each of `partitions`.
///
/// Committed offsets are fetched in a single batched `OffsetFetch`. A
/// partition with a committed offset (`>= 0`) starts there; one without
/// falls back to the `auto.offset.reset` baseline: `0` for `Earliest`,
/// `i64::MAX` for `Latest` / `None` (a sentinel resolved lazily by
/// `poll::resolve_latest_sentinels`). Results are written into the shared
/// `next_offsets` map, and the committed epoch into the `positions` sidecar.
/// Mirrors the initial prime in `consumer.rs::start` step 5.
///
/// Returns immediately, without any RPC, when `partitions` is empty.
///
/// # Errors
///
/// Propagates a failed `OffsetFetch` send; nothing is written in that case.
async fn prime_offsets(
    state: &CoordinatorState,
    partitions: &[(String, i32)],
) -> Result<(), ConsumerError> {
    if partitions.is_empty() {
        return Ok(());
    }

    // Group the requested partitions by topic for the batched request.
    let mut by_topic: HashMap<String, Vec<i32>> = HashMap::new();
    for (topic, partition) in partitions {
        by_topic.entry(topic.clone()).or_default().push(*partition);
    }

    let topic_ids = state.topic_ids.lock().await.clone();
    let request = build_offset_fetch(&state.group_id, &by_topic, &topic_ids);
    let fetched = state.client.send(request).await?;

    // Starting offset for any partition that has no usable commit.
    let reset_baseline: i64 = match state.auto_offset_reset {
        AutoOffsetReset::Earliest => 0,
        // Resolved lazily by poll::resolve_latest_sentinels.
        AutoOffsetReset::Latest | AutoOffsetReset::None => i64::MAX,
    };

    let names_by_id = id_to_name(&topic_ids);
    // Lock order: next_offsets, then positions.
    let mut offsets = state.next_offsets.lock().await;
    let mut positions = state.positions.lock().await;
    let mut answered: HashSet<(String, i32)> = HashSet::new();
    for (name, partition_index, committed, committed_epoch) in
        parse_offset_fetch(&fetched, &names_by_id)
    {
        let key = (name, partition_index);
        let starting = if committed >= 0 {
            committed
        } else {
            reset_baseline
        };
        offsets.insert(key.clone(), starting);
        positions.entry(key.clone()).or_default().offset_epoch = committed_epoch;
        answered.insert(key);
    }

    // The broker may omit partitions that have no commit record at all;
    // give each of those an entry too so poll() can find it.
    for tp in partitions.iter().filter(|tp| !answered.contains(*tp)) {
        offsets.insert(tp.clone(), reset_baseline);
        positions.entry(tp.clone()).or_default();
    }
    Ok(())
}

// Unit tests for `with_coordinator_retry`. Every test starts with the tokio
// clock paused (`start_paused = true`).
#[cfg(test)]
mod retry_tests {
    use super::*;
    use assert2::assert;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Minimal stand-in response: the retry helper only ever reads the
    // error code through the `code` closure.
    struct Resp {
        error_code: i16,
    }

    // A retriable code is retried until the coordinator answers with success.
    #[tokio::test(start_paused = true)]
    async fn retries_until_coordinator_finishes_loading() {
        let calls = AtomicUsize::new(0);
        let r = with_coordinator_retry(
            Duration::from_secs(30),
            |r: &Resp| r.error_code,
            || {
                let n = calls.fetch_add(1, Ordering::SeqCst);
                async move {
                    // COORDINATOR_LOAD_IN_PROGRESS (14) thrice, then success.
                    Ok::<_, ConsumerError>(Resp {
                        error_code: if n < 3 { 14 } else { 0 },
                    })
                }
            },
        )
        .await
        .unwrap();
        assert!(r.error_code == 0);
        // Three retriable attempts plus the successful one.
        assert!(calls.load(Ordering::SeqCst) == 4);
    }

    // A coordinator that never warms up yields the last response as `Ok`.
    #[tokio::test(start_paused = true)]
    async fn surfaces_last_response_after_deadline() {
        let r = with_coordinator_retry(
            Duration::from_secs(1),
            |r: &Resp| r.error_code,
            || async { Ok::<_, ConsumerError>(Resp { error_code: 15 }) },
        )
        .await
        .unwrap();
        // Deadline hit while still retriable: return the last response so the
        // caller's `error_code != 0` handling surfaces it.
        assert!(r.error_code == 15);
    }

    // Codes outside 14/15/16 are handed back after a single attempt.
    #[tokio::test(start_paused = true)]
    async fn non_retriable_code_returns_immediately() {
        let calls = AtomicUsize::new(0);
        let r = with_coordinator_retry(
            Duration::from_secs(30),
            |r: &Resp| r.error_code,
            || {
                calls.fetch_add(1, Ordering::SeqCst);
                async move { Ok::<_, ConsumerError>(Resp { error_code: 25 }) } // UNKNOWN_MEMBER_ID
            },
        )
        .await
        .unwrap();
        assert!(r.error_code == 25);
        assert!(calls.load(Ordering::SeqCst) == 1);
    }

    // Persistent disconnects become `CoordinatorUnavailable` once the
    // deadline has passed.
    #[tokio::test(start_paused = true)]
    async fn disconnect_past_deadline_surfaces_coordinator_unavailable() {
        let r = with_coordinator_retry(
            Duration::from_secs(1),
            |r: &Resp| r.error_code,
            || async {
                Err::<Resp, _>(ConsumerError::Client(
                    crabka_client_core::ClientError::Disconnected,
                ))
            },
        )
        .await;
        assert!(matches!(r, Err(ConsumerError::CoordinatorUnavailable)));
    }
}