crabka-client-consumer 0.3.5

Subscribe-style consumer client for Apache Kafka in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
//! `Consumer` — public lifecycle handle. Built via [`Consumer::builder`].
//! Subscribe-only — no `assign()`. Use `crabka-client-core` directly for
//! manual partition consumption.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crabka_client_core::Client;
use crabka_protocol::owned::join_group_request::{JoinGroupRequest, JoinGroupRequestProtocol};
use crabka_protocol::owned::join_group_response::JoinGroupResponse;
use crabka_protocol::owned::sync_group_request::{SyncGroupRequest, SyncGroupRequestAssignment};
use crabka_protocol::owned::sync_group_response::SyncGroupResponse;
use crabka_protocol::primitives::uuid::Uuid as WireUuid;

use crate::assignor::Assignor;
use crate::builder::{
    AutoOffsetReset, IsolationLevel, decode_assignment, decode_subscription, encode_assignment,
    encode_subscription,
};
use crate::coordinator::{COORDINATOR_RETRY_TIMEOUT, CoordinatorState, with_coordinator_retry};
use crate::error::ConsumerError;
use crate::group_metadata::ConsumerGroupMetadata;

/// Subscribe-style consumer handle. Construct via [`Consumer::builder`].
///
/// The handle keeps the client used for the initial group handshake together
/// with the partition state (`assigned`, `next_offsets`, `positions`,
/// `topic_ids`) that it shares, through `Arc<Mutex<_>>`, with the coordinator
/// task spawned at build time. That task runs on its own, separate client.
#[allow(dead_code)] // `session_timeout` / `heartbeat_interval` / `generation_id`
// are captured for diagnostics; the live values are owned
// by the coordinator task post-start.
pub struct Consumer {
    /// Client built from the bootstrap address; it performs the initial
    /// JoinGroup / SyncGroup / OffsetFetch handshake. The coordinator task
    /// does not use it — it gets a dedicated client.
    pub(crate) client: Client,
    /// Consumer group id supplied to the builder (never empty).
    pub(crate) group_id: String,
    /// Member id handed out by the broker during the initial join. The
    /// coordinator's live member id can differ after a from-scratch rejoin.
    pub(crate) member_id: String,
    /// Captured at start; not kept in sync as the coordinator rejoins.
    pub(crate) generation_id: i32,
    /// Topics passed to `subscribe` at build time (never empty).
    pub(crate) subscribed_topics: Vec<String>,
    /// Current assigned partitions: `(topic, partition_index)`.
    pub(crate) assigned: Arc<Mutex<Vec<(String, i32)>>>,
    /// Next offset to fetch per partition.
    pub(crate) next_offsets: Arc<Mutex<HashMap<(String, i32), i64>>>,
    /// KIP-320 per-partition leader-epoch metadata, keyed like `next_offsets`.
    pub(crate) positions: Arc<Mutex<HashMap<(String, i32), crate::position::PartitionPosition>>>,
    /// Topic UUIDs resolved at build time. Required by Fetch v ≥ 13
    /// (which carries `topic_id` instead of the topic name).
    pub(crate) topic_ids: Arc<Mutex<HashMap<String, WireUuid>>>,
    /// Session timeout as configured on the builder (default 45 s).
    pub(crate) session_timeout: Duration,
    /// Heartbeat interval as configured on the builder (default 3 s).
    pub(crate) heartbeat_interval: Duration,
    /// Partition assignment strategy selected at build time.
    #[allow(dead_code)]
    pub(crate) assignor: Assignor,
    /// Token cancelled by `close` to ask the coordinator task to stop.
    pub(crate) coordinator_shutdown: CancellationToken,
    /// Join handle of the spawned coordinator task; `close` takes and awaits it.
    pub(crate) coordinator_handle: Option<JoinHandle<()>>,
    /// Controls which records are returned by `poll`.
    pub(crate) isolation_level: IsolationLevel,
    /// What `poll` does on a missing offset / detected truncation. `None`
    /// surfaces `ConsumerError::LogTruncation`; otherwise the safe offset is
    /// applied (KIP-320).
    pub(crate) auto_offset_reset: AutoOffsetReset,
}

/// One record returned by `Consumer::poll`.
///
/// NOTE(review): `poll` is not defined in this file, so the field notes
/// below describe the apparent intent — confirm against the fetch path.
#[derive(Debug, Clone)]
pub struct ConsumerRecord {
    /// Topic the record belongs to.
    pub topic: String,
    /// Partition index within `topic`.
    pub partition: i32,
    /// Offset of the record within its partition.
    pub offset: i64,
    /// Leader epoch associated with the record (presumably the KIP-320
    /// epoch of the batch it was read from — TODO confirm).
    pub leader_epoch: i32,
    /// Record timestamp; presumably milliseconds since the Unix epoch as on
    /// the Kafka wire — TODO confirm units.
    pub timestamp: i64,
    /// Record key; `None` presumably mirrors a null key — TODO confirm.
    pub key: Option<Bytes>,
    /// Record value; `None` presumably mirrors a null value — TODO confirm.
    pub value: Option<Bytes>,
}

#[bon::bon]
impl Consumer {
    /// Build a [`Consumer`] subscribed to the given topics: resolve bootstrap,
    /// `JoinGroup` (twice), compute the assignment if we're the elected
    /// leader, `SyncGroup`, prime offsets, then spawn the coordinator task
    /// that owns the heartbeat + rebalance loop.
    ///
    /// Builder defaults: `client_id = "crabka-consumer"`, `session_timeout` =
    /// 45 s, `rebalance_timeout` = 1 min, `heartbeat_interval` = 3 s,
    /// `auto_offset_reset = Latest`, `isolation_level = ReadUncommitted`,
    /// `assignor = Range`. `bootstrap`, `group_id` and `subscribe` have no
    /// default.
    ///
    /// # Errors
    ///
    /// * [`ConsumerError::NotSubscribed`] if `subscribe` is empty.
    /// * [`ConsumerError::RebalanceFailed`] if `group_id` is empty, or if the
    ///   broker answers the first `JoinGroup` without a member id.
    /// * [`ConsumerError::Server`] carrying the broker error code when the
    ///   first `JoinGroup` returns anything other than 0 or 79, or when the
    ///   second `JoinGroup` or the `SyncGroup` returns a non-zero code.
    /// * Any error propagated from building either client, refreshing
    ///   metadata, the coordinator retry wrapper, or the `OffsetFetch` call.
    #[builder(start_fn = builder, finish_fn = build)]
    #[allow(clippy::too_many_lines)]
    pub async fn start(
        #[builder(into)] bootstrap: String,
        #[builder(into, default = "crabka-consumer".to_string())] client_id: String,
        #[builder(into)] group_id: String,
        #[builder(default = std::time::Duration::from_secs(45))]
        session_timeout: std::time::Duration,
        #[builder(default = std::time::Duration::from_mins(1))]
        rebalance_timeout: std::time::Duration,
        #[builder(default = std::time::Duration::from_secs(3))]
        heartbeat_interval: std::time::Duration,
        #[builder(into)] subscribe: Vec<String>,
        #[builder(default = AutoOffsetReset::Latest)] auto_offset_reset: AutoOffsetReset,
        #[builder(default = IsolationLevel::ReadUncommitted)] isolation_level: IsolationLevel,
        #[builder(default = Assignor::Range)] assignor: Assignor,
        #[builder(into)] client_rack: Option<String>,
        security: Option<crabka_client_core::security::ClientSecurity>,
    ) -> Result<Self, ConsumerError> {
        // Reject unusable configuration before any client is built.
        if subscribe.is_empty() {
            return Err(ConsumerError::NotSubscribed);
        }
        if group_id.is_empty() {
            return Err(ConsumerError::RebalanceFailed("group_id required".into()));
        }

        let client = Client::builder()
            .bootstrap(&bootstrap)
            .client_id(client_id.clone())
            .maybe_security(security.clone())
            .build()
            .await?;

        // The request fields are `i32` milliseconds; an oversized duration
        // saturates at `i32::MAX` instead of wrapping.
        let session_timeout_ms = i32::try_from(session_timeout.as_millis()).unwrap_or(i32::MAX);
        let rebalance_timeout_ms = i32::try_from(rebalance_timeout.as_millis()).unwrap_or(i32::MAX);

        // First JoinGroup uses empty `owned_partitions` + `generation_id=-1`:
        // we've never been in the group before, so we have nothing to claim
        // and no prior generation to defend against zombie ownership.
        let subscription_bytes = encode_subscription(&subscribe, &[], -1, client_rack.as_deref());
        let protocol_name = assignor.protocol_name().to_string();

        // 1. First JoinGroup — empty member_id, expect MEMBER_ID_REQUIRED (79)
        //    or a regular response; either way the broker hands us a member_id.
        //    Retry a cold/relocating coordinator (14/15/16) with backoff.
        let r1 = with_coordinator_retry(
            COORDINATOR_RETRY_TIMEOUT,
            |r: &JoinGroupResponse| r.error_code,
            || {
                let group_id = group_id.clone();
                let protocol_name = protocol_name.clone();
                let subscription_bytes = subscription_bytes.clone();
                let client = &client;
                async move {
                    client
                        .send(JoinGroupRequest {
                            group_id,
                            protocol_type: "consumer".into(),
                            member_id: String::new(),
                            session_timeout_ms,
                            rebalance_timeout_ms,
                            protocols: vec![JoinGroupRequestProtocol {
                                name: protocol_name,
                                metadata: subscription_bytes,
                                ..Default::default()
                            }],
                            ..Default::default()
                        })
                        .await
                        .map_err(ConsumerError::from)
                }
            },
        )
        .await?;
        // Codes 79 and 0 both carry a member_id; every other code aborts the
        // build with the raw broker error.
        let member_id = if r1.error_code == 79 || r1.error_code == 0 {
            r1.member_id.clone()
        } else {
            return Err(ConsumerError::Server(r1.error_code));
        };
        if member_id.is_empty() {
            return Err(ConsumerError::RebalanceFailed(
                "broker did not assign a member_id".into(),
            ));
        }

        // 2. Second JoinGroup with the assigned member_id.
        let r2 = with_coordinator_retry(
            COORDINATOR_RETRY_TIMEOUT,
            |r: &JoinGroupResponse| r.error_code,
            || {
                let group_id = group_id.clone();
                let protocol_name = protocol_name.clone();
                let subscription_bytes = subscription_bytes.clone();
                let member_id = member_id.clone();
                let client = &client;
                async move {
                    client
                        .send(JoinGroupRequest {
                            group_id,
                            protocol_type: "consumer".into(),
                            member_id,
                            session_timeout_ms,
                            rebalance_timeout_ms,
                            protocols: vec![JoinGroupRequestProtocol {
                                name: protocol_name,
                                metadata: subscription_bytes,
                                ..Default::default()
                            }],
                            ..Default::default()
                        })
                        .await
                        .map_err(ConsumerError::from)
                }
            },
        )
        .await?;
        if r2.error_code != 0 {
            return Err(ConsumerError::Server(r2.error_code));
        }

        // 3. Always issue a Metadata to resolve topic_ids (needed for
        //    Fetch v ≥ 13). If we are the leader, also use the partition
        //    counts to compute the assignment.
        //    `refresh_metadata` (not a bare `send`) so the main client's
        //    BrokerPool learns each broker's (id → addr) mapping up front,
        //    letting `poll`/`validate` route to partition leaders immediately
        //    rather than waiting for the first `refresh_leader_epochs` pass.
        let md = client.refresh_metadata().await?;
        let mut topic_ids: HashMap<String, WireUuid> = HashMap::new();
        let mut topic_partitions: HashMap<String, i32> = HashMap::new();
        // Keep only topics we subscribed to; unnamed metadata entries and
        // unrelated topics are skipped.
        for t in &md.topics {
            let Some(name) = &t.name else { continue };
            if subscribe.iter().any(|s| s == name) {
                let count = i32::try_from(t.partitions.len()).unwrap_or(i32::MAX);
                topic_partitions.insert(name.clone(), count);
                topic_ids.insert(name.clone(), t.topic_id);
            }
        }

        // Only the member named as leader in the JoinGroup response computes
        // assignments; every other member sends an empty list in SyncGroup.
        let is_leader = r2.leader == member_id;
        let assignments_for_sync: Vec<SyncGroupRequestAssignment> = if is_leader {
            let assignments = match assignor {
                Assignor::Range => {
                    let inputs: Vec<(String, Vec<String>)> = r2
                        .members
                        .iter()
                        .map(|m| {
                            let ds = decode_subscription(&m.metadata);
                            (m.member_id.clone(), ds.topics)
                        })
                        .collect();
                    crate::assignor::range::assign(inputs, &topic_partitions)
                }
                Assignor::CooperativeSticky => {
                    let inputs: Vec<crate::assignor::cooperative_sticky::MemberInput> = r2
                        .members
                        .iter()
                        .map(|m| {
                            let ds = decode_subscription(&m.metadata);
                            (m.member_id.clone(), ds.topics, ds.owned, ds.generation_id)
                        })
                        .collect();
                    crate::assignor::cooperative_sticky::assign(&inputs, &topic_partitions)
                }
            };
            assignments
                .into_iter()
                .map(|(m, partitions)| SyncGroupRequestAssignment {
                    member_id: m,
                    assignment: encode_assignment(&partitions),
                    ..Default::default()
                })
                .collect()
        } else {
            Vec::new()
        };

        // 4. SyncGroup — leader installs assignments; everyone receives
        //    their own assignment in the response. Retry a cold coordinator.
        let r3 = with_coordinator_retry(
            COORDINATOR_RETRY_TIMEOUT,
            |r: &SyncGroupResponse| r.error_code,
            || {
                let group_id = group_id.clone();
                let protocol_name = protocol_name.clone();
                let member_id = member_id.clone();
                let assignments_for_sync = assignments_for_sync.clone();
                let generation_id = r2.generation_id;
                let client = &client;
                async move {
                    client
                        .send(SyncGroupRequest {
                            group_id,
                            generation_id,
                            member_id,
                            protocol_type: Some("consumer".into()),
                            protocol_name: Some(protocol_name),
                            assignments: assignments_for_sync,
                            ..Default::default()
                        })
                        .await
                        .map_err(ConsumerError::from)
                }
            },
        )
        .await?;
        if r3.error_code != 0 {
            return Err(ConsumerError::Server(r3.error_code));
        }
        let assigned_partitions = decode_assignment(&r3.assignment);

        // 5. Fetch existing committed offsets so poll() resumes correctly.
        //    Skipped entirely when nothing was assigned to us.
        let mut next_offsets: HashMap<(String, i32), i64> = HashMap::new();
        let mut positions: HashMap<(String, i32), crate::position::PartitionPosition> =
            HashMap::new();
        if !assigned_partitions.is_empty() {
            // Group the assigned partitions per topic for the request builder.
            let mut by_topic: HashMap<String, Vec<i32>> = HashMap::new();
            for (t, p) in &assigned_partitions {
                by_topic.entry(t.clone()).or_default().push(*p);
            }
            let of = client
                .send(crate::offset_wire::build_offset_fetch(
                    &group_id, &by_topic, &topic_ids,
                ))
                .await?;
            let id_to_name = crate::offset_wire::id_to_name(&topic_ids);
            for (name, partition_index, committed, committed_epoch) in
                crate::offset_wire::parse_offset_fetch(&of, &id_to_name)
            {
                // A negative committed offset is treated as "nothing
                // committed"; the starting point then follows the reset policy.
                let starting = if committed >= 0 {
                    committed
                } else {
                    match auto_offset_reset {
                        AutoOffsetReset::Earliest => 0,
                        // Resolved by poll() on first call.
                        AutoOffsetReset::Latest | AutoOffsetReset::None => i64::MAX,
                    }
                };
                next_offsets.insert((name.clone(), partition_index), starting);
                positions.insert(
                    (name, partition_index),
                    crate::position::PartitionPosition {
                        offset_epoch: committed_epoch,
                        ..Default::default()
                    },
                );
            }
        }

        // 6. Spawn the coordinator task (heartbeat + rebalance loop) on its
        //    own connection.
        //
        //    The broker processes requests serially per TCP connection: a
        //    JoinGroup parked in the rebalance-join purgatory (up to
        //    INITIAL_REBALANCE_DELAY per round, and cooperative rounds
        //    cascade) blocks every later request on that same socket. If the
        //    coordinator shared `poll()`'s connection, a `Fetch` issued
        //    mid-rebalance would head-of-line-block behind the parked
        //    JoinGroup and stall until the client request timeout. A dedicated
        //    coordinator connection keeps the data path (`poll`/commit)
        //    independent of the group-protocol path. (The JVM client never
        //    hits this because real brokers serve a connection's requests
        //    concurrently.)
        let coordinator_client = Client::builder()
            .bootstrap(&bootstrap)
            .client_id(client_id.clone())
            .maybe_security(security.clone())
            .build()
            .await?;

        // Wrap the partition state once; the handle and the coordinator task
        // each hold an `Arc` to the same maps.
        let assigned = Arc::new(Mutex::new(assigned_partitions));
        let next_offsets = Arc::new(Mutex::new(next_offsets));
        let positions = Arc::new(Mutex::new(positions));
        let topic_ids = Arc::new(Mutex::new(topic_ids));

        // The returned handle keeps `shutdown`; the task receives a clone.
        let shutdown = CancellationToken::new();
        let state = CoordinatorState {
            client: coordinator_client,
            group_id: group_id.clone(),
            member_id: member_id.clone(),
            generation_id: r2.generation_id,
            assignor,
            subscribed_topics: subscribe.clone(),
            assigned: Arc::clone(&assigned),
            next_offsets: Arc::clone(&next_offsets),
            positions: Arc::clone(&positions),
            topic_ids: Arc::clone(&topic_ids),
            session_timeout,
            rebalance_timeout,
            heartbeat_interval,
            auto_offset_reset,
            client_rack: client_rack.clone(),
        };
        let coord_handle = tokio::spawn(crate::coordinator::run(state, shutdown.clone()));

        Ok(Consumer {
            client,
            group_id,
            member_id,
            generation_id: r2.generation_id,
            subscribed_topics: subscribe,
            assigned,
            next_offsets,
            positions,
            topic_ids,
            session_timeout,
            heartbeat_interval,
            assignor,
            coordinator_shutdown: shutdown,
            coordinator_handle: Some(coord_handle),
            isolation_level,
            auto_offset_reset,
        })
    }
}

impl Consumer {
    /// Group id this consumer joined with.
    #[must_use]
    pub fn group_id(&self) -> &str {
        self.group_id.as_str()
    }

    /// Member id the coordinator handed out during the initial join.
    #[must_use]
    pub fn member_id(&self) -> &str {
        self.member_id.as_str()
    }

    /// Generation id recorded when the consumer was built. The value is a
    /// snapshot: it is not updated when the coordinator task rejoins.
    #[must_use]
    pub fn generation_id(&self) -> i32 {
        self.generation_id
    }

    /// KIP-447 group metadata for a transactional producer's
    /// `send_offsets_to_transaction`.
    ///
    /// `generation_id` is the build-time snapshot exposed by
    /// [`Self::generation_id`], so it only matches the coordinator's live
    /// generation while the group has not rebalanced (e.g. a stable
    /// single-member group). `group_instance_id` is always `None` because
    /// static membership is not supported yet.
    #[must_use]
    pub fn group_metadata(&self) -> ConsumerGroupMetadata {
        let group_id = self.group_id.clone();
        let member_id = self.member_id.clone();
        ConsumerGroupMetadata {
            group_id,
            generation_id: self.generation_id,
            member_id,
            group_instance_id: None,
        }
    }

    /// Topics handed to `subscribe` when the consumer was built.
    #[must_use]
    pub fn subscribed_topics(&self) -> &[String] {
        self.subscribed_topics.as_slice()
    }

    /// Copy of the `(topic, partition)` pairs assigned right now.
    pub async fn assignment(&self) -> Vec<(String, i32)> {
        let current = self.assigned.lock().await;
        current.to_vec()
    }

    /// Shut the consumer down: cancel the coordinator task and wait for it
    /// to finish, so the broker can evict this member promptly.
    ///
    /// No `LeaveGroup` is sent from here on purpose. The coordinator task
    /// issues a best-effort `LeaveGroup` as its final action (see
    /// `crate::coordinator::run`) using its *live* `member_id`. After a
    /// from-scratch rejoin (`UNKNOWN_MEMBER_ID`) that id no longer matches
    /// `self.member_id`, so leaving with the build-time id would leave a
    /// stale member and orphan the real one until its session expires.
    /// Waiting on the task is quick because the coordinator races its
    /// in-tick RPCs against the shutdown token.
    ///
    /// The task's join result is deliberately ignored; this method always
    /// returns `Ok(())`.
    pub async fn close(mut self) -> Result<(), ConsumerError> {
        self.coordinator_shutdown.cancel();
        let Some(task) = self.coordinator_handle.take() else {
            return Ok(());
        };
        let _ = task.await;
        Ok(())
    }
}

#[cfg(test)]
mod security_arg_tests {
    use super::*;
    use assert2::assert;
    use crabka_client_core::security::{ClientSecurity, SaslCredentials};
    use crabka_security::ListenerProtocol;

    /// Type-level smoke test: the builder accepts a `security` argument.
    #[tokio::test]
    async fn consumer_builder_accepts_security() {
        let plain_login = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let sasl_plaintext = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(plain_login),
            sasl_host: None,
        };
        // Nothing is expected to listen on 127.0.0.1:1. The build talks to
        // the broker eagerly, so it must come back with an error; getting
        // that far at all shows the security argument is threaded through
        // the builder (i.e. this compiles).
        let outcome = Consumer::builder()
            .bootstrap("127.0.0.1:1")
            .group_id("g")
            .subscribe(vec!["t".to_string()])
            .security(sasl_plaintext)
            .build()
            .await;
        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}