Skip to main content

crabka_client_consumer/
consumer.rs

1//! `Consumer` — public lifecycle handle. Built via [`Consumer::builder`].
2//! Subscribe-only — no `assign()`. Use `crabka-client-core` directly for
3//! manual partition consumption.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8
9use bytes::Bytes;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use crabka_client_core::Client;
15use crabka_protocol::owned::join_group_request::{JoinGroupRequest, JoinGroupRequestProtocol};
16use crabka_protocol::owned::join_group_response::JoinGroupResponse;
17use crabka_protocol::owned::sync_group_request::{SyncGroupRequest, SyncGroupRequestAssignment};
18use crabka_protocol::owned::sync_group_response::SyncGroupResponse;
19use crabka_protocol::primitives::uuid::Uuid as WireUuid;
20
21use crate::assignor::Assignor;
22use crate::builder::{
23    AutoOffsetReset, IsolationLevel, decode_assignment, decode_subscription, encode_assignment,
24    encode_subscription,
25};
26use crate::coordinator::{COORDINATOR_RETRY_TIMEOUT, CoordinatorState, with_coordinator_retry};
27use crate::error::ConsumerError;
28use crate::group_metadata::ConsumerGroupMetadata;
29
30/// Subscribe-style consumer handle. Construct via [`Consumer::builder`].
31#[allow(dead_code)] // `session_timeout` / `heartbeat_interval` / `generation_id`
32// are captured for diagnostics; the live values are owned
33// by the coordinator task post-start.
34pub struct Consumer {
35    pub(crate) client: Client,
36    pub(crate) group_id: String,
37    pub(crate) member_id: String,
38    /// Captured at start; not kept in sync as the coordinator rejoins.
39    pub(crate) generation_id: i32,
40    pub(crate) subscribed_topics: Vec<String>,
41    /// Current assigned partitions: `(topic, partition_index)`.
42    pub(crate) assigned: Arc<Mutex<Vec<(String, i32)>>>,
43    /// Next offset to fetch per partition.
44    pub(crate) next_offsets: Arc<Mutex<HashMap<(String, i32), i64>>>,
45    /// KIP-320 per-partition leader-epoch metadata, keyed like `next_offsets`.
46    pub(crate) positions: Arc<Mutex<HashMap<(String, i32), crate::position::PartitionPosition>>>,
47    /// Topic UUIDs resolved at build time. Required by Fetch v ≥ 13
48    /// (which carries `topic_id` instead of the topic name).
49    pub(crate) topic_ids: Arc<Mutex<HashMap<String, WireUuid>>>,
50    pub(crate) session_timeout: Duration,
51    pub(crate) heartbeat_interval: Duration,
52    #[allow(dead_code)]
53    pub(crate) assignor: Assignor,
54    pub(crate) coordinator_shutdown: CancellationToken,
55    pub(crate) coordinator_handle: Option<JoinHandle<()>>,
56    /// Controls which records are returned by `poll`.
57    pub(crate) isolation_level: IsolationLevel,
58    /// What `poll` does on a missing offset / detected truncation. `None`
59    /// surfaces `ConsumerError::LogTruncation`; otherwise the safe offset is
60    /// applied (KIP-320).
61    pub(crate) auto_offset_reset: AutoOffsetReset,
62}
63
64/// One record returned by `Consumer::poll`.
65#[derive(Debug, Clone)]
66pub struct ConsumerRecord {
67    pub topic: String,
68    pub partition: i32,
69    pub offset: i64,
70    pub leader_epoch: i32,
71    pub timestamp: i64,
72    pub key: Option<Bytes>,
73    pub value: Option<Bytes>,
74}
75
76#[bon::bon]
77impl Consumer {
78    /// Build a [`Consumer`] subscribed to the given topics: resolve bootstrap,
79    /// `JoinGroup` (twice), compute the assignment if we're the elected
80    /// leader, `SyncGroup`, prime offsets, then spawn the coordinator task
81    /// that owns the heartbeat + rebalance loop.
82    #[builder(start_fn = builder, finish_fn = build)]
83    #[allow(clippy::too_many_lines)]
84    pub async fn start(
85        #[builder(into)] bootstrap: String,
86        #[builder(into, default = "crabka-consumer".to_string())] client_id: String,
87        #[builder(into)] group_id: String,
88        #[builder(default = std::time::Duration::from_secs(45))]
89        session_timeout: std::time::Duration,
90        #[builder(default = std::time::Duration::from_mins(1))]
91        rebalance_timeout: std::time::Duration,
92        #[builder(default = std::time::Duration::from_secs(3))]
93        heartbeat_interval: std::time::Duration,
94        #[builder(into)] subscribe: Vec<String>,
95        #[builder(default = AutoOffsetReset::Latest)] auto_offset_reset: AutoOffsetReset,
96        #[builder(default = IsolationLevel::ReadUncommitted)] isolation_level: IsolationLevel,
97        #[builder(default = Assignor::Range)] assignor: Assignor,
98        #[builder(into)] client_rack: Option<String>,
99        security: Option<crabka_client_core::security::ClientSecurity>,
100    ) -> Result<Self, ConsumerError> {
101        if subscribe.is_empty() {
102            return Err(ConsumerError::NotSubscribed);
103        }
104        if group_id.is_empty() {
105            return Err(ConsumerError::RebalanceFailed("group_id required".into()));
106        }
107
108        let client = Client::builder()
109            .bootstrap(&bootstrap)
110            .client_id(client_id.clone())
111            .maybe_security(security.clone())
112            .build()
113            .await?;
114
115        let session_timeout_ms = i32::try_from(session_timeout.as_millis()).unwrap_or(i32::MAX);
116        let rebalance_timeout_ms = i32::try_from(rebalance_timeout.as_millis()).unwrap_or(i32::MAX);
117
118        // First JoinGroup uses empty `owned_partitions` + `generation_id=-1`:
119        // we've never been in the group before, so we have nothing to claim
120        // and no prior generation to defend against zombie ownership.
121        let subscription_bytes = encode_subscription(&subscribe, &[], -1, client_rack.as_deref());
122        let protocol_name = assignor.protocol_name().to_string();
123
124        // 1. First JoinGroup — empty member_id, expect MEMBER_ID_REQUIRED (79)
125        //    or a regular response; either way the broker hands us a member_id.
126        //    Retry a cold/relocating coordinator (14/15/16) with backoff.
127        let r1 = with_coordinator_retry(
128            COORDINATOR_RETRY_TIMEOUT,
129            |r: &JoinGroupResponse| r.error_code,
130            || {
131                let group_id = group_id.clone();
132                let protocol_name = protocol_name.clone();
133                let subscription_bytes = subscription_bytes.clone();
134                let client = &client;
135                async move {
136                    client
137                        .send(JoinGroupRequest {
138                            group_id,
139                            protocol_type: "consumer".into(),
140                            member_id: String::new(),
141                            session_timeout_ms,
142                            rebalance_timeout_ms,
143                            protocols: vec![JoinGroupRequestProtocol {
144                                name: protocol_name,
145                                metadata: subscription_bytes,
146                                ..Default::default()
147                            }],
148                            ..Default::default()
149                        })
150                        .await
151                        .map_err(ConsumerError::from)
152                }
153            },
154        )
155        .await?;
156        let member_id = if r1.error_code == 79 || r1.error_code == 0 {
157            r1.member_id.clone()
158        } else {
159            return Err(ConsumerError::Server(r1.error_code));
160        };
161        if member_id.is_empty() {
162            return Err(ConsumerError::RebalanceFailed(
163                "broker did not assign a member_id".into(),
164            ));
165        }
166
167        // 2. Second JoinGroup with the assigned member_id.
168        let r2 = with_coordinator_retry(
169            COORDINATOR_RETRY_TIMEOUT,
170            |r: &JoinGroupResponse| r.error_code,
171            || {
172                let group_id = group_id.clone();
173                let protocol_name = protocol_name.clone();
174                let subscription_bytes = subscription_bytes.clone();
175                let member_id = member_id.clone();
176                let client = &client;
177                async move {
178                    client
179                        .send(JoinGroupRequest {
180                            group_id,
181                            protocol_type: "consumer".into(),
182                            member_id,
183                            session_timeout_ms,
184                            rebalance_timeout_ms,
185                            protocols: vec![JoinGroupRequestProtocol {
186                                name: protocol_name,
187                                metadata: subscription_bytes,
188                                ..Default::default()
189                            }],
190                            ..Default::default()
191                        })
192                        .await
193                        .map_err(ConsumerError::from)
194                }
195            },
196        )
197        .await?;
198        if r2.error_code != 0 {
199            return Err(ConsumerError::Server(r2.error_code));
200        }
201
202        // 3. Always issue a Metadata to resolve topic_ids (needed for
203        //    Fetch v ≥ 13). If we are the leader, also use the partition
204        //    counts to compute the assignment.
205        //    `refresh_metadata` (not a bare `send`) so the main client's
206        //    BrokerPool learns each broker's (id → addr) mapping up front,
207        //    letting `poll`/`validate` route to partition leaders immediately
208        //    rather than waiting for the first `refresh_leader_epochs` pass.
209        let md = client.refresh_metadata().await?;
210        let mut topic_ids: HashMap<String, WireUuid> = HashMap::new();
211        let mut topic_partitions: HashMap<String, i32> = HashMap::new();
212        for t in &md.topics {
213            let Some(name) = &t.name else { continue };
214            if subscribe.iter().any(|s| s == name) {
215                let count = i32::try_from(t.partitions.len()).unwrap_or(i32::MAX);
216                topic_partitions.insert(name.clone(), count);
217                topic_ids.insert(name.clone(), t.topic_id);
218            }
219        }
220
221        let is_leader = r2.leader == member_id;
222        let assignments_for_sync: Vec<SyncGroupRequestAssignment> = if is_leader {
223            let assignments = match assignor {
224                Assignor::Range => {
225                    let inputs: Vec<(String, Vec<String>)> = r2
226                        .members
227                        .iter()
228                        .map(|m| {
229                            let ds = decode_subscription(&m.metadata);
230                            (m.member_id.clone(), ds.topics)
231                        })
232                        .collect();
233                    crate::assignor::range::assign(inputs, &topic_partitions)
234                }
235                Assignor::CooperativeSticky => {
236                    let inputs: Vec<crate::assignor::cooperative_sticky::MemberInput> = r2
237                        .members
238                        .iter()
239                        .map(|m| {
240                            let ds = decode_subscription(&m.metadata);
241                            (m.member_id.clone(), ds.topics, ds.owned, ds.generation_id)
242                        })
243                        .collect();
244                    crate::assignor::cooperative_sticky::assign(&inputs, &topic_partitions)
245                }
246            };
247            assignments
248                .into_iter()
249                .map(|(m, partitions)| SyncGroupRequestAssignment {
250                    member_id: m,
251                    assignment: encode_assignment(&partitions),
252                    ..Default::default()
253                })
254                .collect()
255        } else {
256            Vec::new()
257        };
258
259        // 4. SyncGroup — leader installs assignments; everyone receives
260        //    their own assignment in the response. Retry a cold coordinator.
261        let r3 = with_coordinator_retry(
262            COORDINATOR_RETRY_TIMEOUT,
263            |r: &SyncGroupResponse| r.error_code,
264            || {
265                let group_id = group_id.clone();
266                let protocol_name = protocol_name.clone();
267                let member_id = member_id.clone();
268                let assignments_for_sync = assignments_for_sync.clone();
269                let generation_id = r2.generation_id;
270                let client = &client;
271                async move {
272                    client
273                        .send(SyncGroupRequest {
274                            group_id,
275                            generation_id,
276                            member_id,
277                            protocol_type: Some("consumer".into()),
278                            protocol_name: Some(protocol_name),
279                            assignments: assignments_for_sync,
280                            ..Default::default()
281                        })
282                        .await
283                        .map_err(ConsumerError::from)
284                }
285            },
286        )
287        .await?;
288        if r3.error_code != 0 {
289            return Err(ConsumerError::Server(r3.error_code));
290        }
291        let assigned_partitions = decode_assignment(&r3.assignment);
292
293        // 5. Fetch existing committed offsets so poll() resumes correctly.
294        let mut next_offsets: HashMap<(String, i32), i64> = HashMap::new();
295        let mut positions: HashMap<(String, i32), crate::position::PartitionPosition> =
296            HashMap::new();
297        if !assigned_partitions.is_empty() {
298            let mut by_topic: HashMap<String, Vec<i32>> = HashMap::new();
299            for (t, p) in &assigned_partitions {
300                by_topic.entry(t.clone()).or_default().push(*p);
301            }
302            let of = client
303                .send(crate::offset_wire::build_offset_fetch(
304                    &group_id, &by_topic, &topic_ids,
305                ))
306                .await?;
307            let id_to_name = crate::offset_wire::id_to_name(&topic_ids);
308            for (name, partition_index, committed, committed_epoch) in
309                crate::offset_wire::parse_offset_fetch(&of, &id_to_name)
310            {
311                let starting = if committed >= 0 {
312                    committed
313                } else {
314                    match auto_offset_reset {
315                        AutoOffsetReset::Earliest => 0,
316                        // Resolved by poll() on first call.
317                        AutoOffsetReset::Latest | AutoOffsetReset::None => i64::MAX,
318                    }
319                };
320                next_offsets.insert((name.clone(), partition_index), starting);
321                positions.insert(
322                    (name, partition_index),
323                    crate::position::PartitionPosition {
324                        offset_epoch: committed_epoch,
325                        ..Default::default()
326                    },
327                );
328            }
329        }
330
331        // 6. Spawn the coordinator task (heartbeat + rebalance loop) on its
332        //    own connection.
333        //
334        //    The broker processes requests serially per TCP connection: a
335        //    JoinGroup parked in the rebalance-join purgatory (up to
336        //    INITIAL_REBALANCE_DELAY per round, and cooperative rounds
337        //    cascade) blocks every later request on that same socket. If the
338        //    coordinator shared `poll()`'s connection, a `Fetch` issued
339        //    mid-rebalance would head-of-line-block behind the parked
340        //    JoinGroup and stall until the client request timeout. A dedicated
341        //    coordinator connection keeps the data path (`poll`/commit)
342        //    independent of the group-protocol path. (The JVM client never
343        //    hits this because real brokers serve a connection's requests
344        //    concurrently.)
345        let coordinator_client = Client::builder()
346            .bootstrap(&bootstrap)
347            .client_id(client_id.clone())
348            .maybe_security(security.clone())
349            .build()
350            .await?;
351
352        let assigned = Arc::new(Mutex::new(assigned_partitions));
353        let next_offsets = Arc::new(Mutex::new(next_offsets));
354        let positions = Arc::new(Mutex::new(positions));
355        let topic_ids = Arc::new(Mutex::new(topic_ids));
356
357        let shutdown = CancellationToken::new();
358        let state = CoordinatorState {
359            client: coordinator_client,
360            group_id: group_id.clone(),
361            member_id: member_id.clone(),
362            generation_id: r2.generation_id,
363            assignor,
364            subscribed_topics: subscribe.clone(),
365            assigned: Arc::clone(&assigned),
366            next_offsets: Arc::clone(&next_offsets),
367            positions: Arc::clone(&positions),
368            topic_ids: Arc::clone(&topic_ids),
369            session_timeout,
370            rebalance_timeout,
371            heartbeat_interval,
372            auto_offset_reset,
373            client_rack: client_rack.clone(),
374        };
375        let coord_handle = tokio::spawn(crate::coordinator::run(state, shutdown.clone()));
376
377        Ok(Consumer {
378            client,
379            group_id,
380            member_id,
381            generation_id: r2.generation_id,
382            subscribed_topics: subscribe,
383            assigned,
384            next_offsets,
385            positions,
386            topic_ids,
387            session_timeout,
388            heartbeat_interval,
389            assignor,
390            coordinator_shutdown: shutdown,
391            coordinator_handle: Some(coord_handle),
392            isolation_level,
393            auto_offset_reset,
394        })
395    }
396}
397
398impl Consumer {
399    /// The consumer's group id.
400    #[must_use]
401    pub fn group_id(&self) -> &str {
402        &self.group_id
403    }
404
405    /// The member id assigned by the coordinator at join time.
406    #[must_use]
407    pub fn member_id(&self) -> &str {
408        &self.member_id
409    }
410
411    /// The generation id captured at the most recent successful join.
412    #[must_use]
413    pub fn generation_id(&self) -> i32 {
414        self.generation_id
415    }
416
417    /// KIP-447 group metadata to hand to a transactional producer's
418    /// `send_offsets_to_transaction`. The generation id is the value captured
419    /// at the most recent successful join (the field is not kept in sync as
420    /// the coordinator rejoins — see [`Self::generation_id`]); for a stable
421    /// single-member group this equals the coordinator's live generation.
422    /// `group_instance_id` is always `None` — the consumer has no
423    /// static-membership support yet.
424    #[must_use]
425    pub fn group_metadata(&self) -> ConsumerGroupMetadata {
426        ConsumerGroupMetadata {
427            group_id: self.group_id.clone(),
428            generation_id: self.generation_id,
429            member_id: self.member_id.clone(),
430            group_instance_id: None,
431        }
432    }
433
434    /// Topics this consumer subscribed to at build time.
435    #[must_use]
436    pub fn subscribed_topics(&self) -> &[String] {
437        &self.subscribed_topics
438    }
439
440    /// Snapshot of currently assigned `(topic, partition)` pairs.
441    pub async fn assignment(&self) -> Vec<(String, i32)> {
442        self.assigned.lock().await.clone()
443    }
444
445    /// Stop the coordinator task so the broker evicts this member promptly.
446    ///
447    /// The coordinator itself sends a best-effort `LeaveGroup` as the last
448    /// thing it does on shutdown (see `crate::coordinator::run`), using its
449    /// *live* `member_id`. That id can differ from the one captured at build
450    /// time — a from-scratch rejoin (`UNKNOWN_MEMBER_ID`) replaces it — so the
451    /// leave must come from the coordinator, which owns the current value;
452    /// sending it here with `self.member_id` would silently leave a stale id
453    /// and orphan the real member until its session expires. Cancel + join is
454    /// prompt because the coordinator races its in-tick RPCs against the
455    /// shutdown token.
456    pub async fn close(mut self) -> Result<(), ConsumerError> {
457        self.coordinator_shutdown.cancel();
458        if let Some(h) = self.coordinator_handle.take() {
459            let _ = h.await;
460        }
461        Ok(())
462    }
463}
464
#[cfg(test)]
mod security_arg_tests {
    use super::*;
    use assert2::assert;
    use crabka_client_core::security::{ClientSecurity, SaslCredentials};
    use crabka_security::ListenerProtocol;

    /// The builder must accept a `security` argument and thread it through
    /// to the build; compiling this test is half of the check.
    #[tokio::test]
    async fn consumer_builder_accepts_security() {
        let credentials = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let security = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(credentials),
            sasl_host: None,
        };
        // Port 1 on loopback is expected to be closed. Building the consumer
        // has to reach the broker, so the build must come back with an error
        // rather than a consumer.
        let outcome = Consumer::builder()
            .bootstrap("127.0.0.1:1")
            .group_id("g")
            .subscribe(vec!["t".to_string()])
            .security(security)
            .build()
            .await;
        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}