1use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8
9use bytes::Bytes;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use crabka_client_core::Client;
15use crabka_protocol::owned::join_group_request::{JoinGroupRequest, JoinGroupRequestProtocol};
16use crabka_protocol::owned::join_group_response::JoinGroupResponse;
17use crabka_protocol::owned::sync_group_request::{SyncGroupRequest, SyncGroupRequestAssignment};
18use crabka_protocol::owned::sync_group_response::SyncGroupResponse;
19use crabka_protocol::primitives::uuid::Uuid as WireUuid;
20
21use crate::assignor::Assignor;
22use crate::builder::{
23 AutoOffsetReset, IsolationLevel, decode_assignment, decode_subscription, encode_assignment,
24 encode_subscription,
25};
26use crate::coordinator::{COORDINATOR_RETRY_TIMEOUT, CoordinatorState, with_coordinator_retry};
27use crate::error::ConsumerError;
28use crate::group_metadata::ConsumerGroupMetadata;
29
/// A group-managed consumer handle produced by [`Consumer::start`] (via
/// `Consumer::builder()`).
///
/// The `Arc<Mutex<..>>` fields are shared with the background coordinator
/// task spawned in `start`: the same `Arc`s are cloned into its
/// `CoordinatorState`.
// NOTE(review): the blanket `dead_code` allowance presumably covers fields
// that are only read from other modules or not read yet — confirm it is still needed.
#[allow(dead_code)] pub struct Consumer {
    /// Connection built from the bootstrap address in `start`; used there for
    /// the join/sync/offset-fetch handshake.
    pub(crate) client: Client,
    /// Consumer group id supplied by the caller (validated non-empty in `start`).
    pub(crate) group_id: String,
    /// Member id returned by the broker in the first `JoinGroup` response.
    pub(crate) member_id: String,
    /// Generation id taken from the second `JoinGroup` response in `start`.
    pub(crate) generation_id: i32,
    /// Topic names passed to `subscribe` at build time.
    pub(crate) subscribed_topics: Vec<String>,
    /// `(topic, partition)` pairs decoded from the `SyncGroup` response.
    pub(crate) assigned: Arc<Mutex<Vec<(String, i32)>>>,
    /// Per-partition offset seeded in `start`: the committed offset when it is
    /// non-negative, otherwise a value chosen from `auto_offset_reset`.
    pub(crate) next_offsets: Arc<Mutex<HashMap<(String, i32), i64>>>,
    /// Per-partition position record; `start` seeds `offset_epoch` from the
    /// committed leader epoch and defaults every other field.
    pub(crate) positions: Arc<Mutex<HashMap<(String, i32), crate::position::PartitionPosition>>>,
    /// Topic name -> topic id, collected from cluster metadata for the
    /// subscribed topics.
    pub(crate) topic_ids: Arc<Mutex<HashMap<String, WireUuid>>>,
    /// Session timeout sent (as milliseconds) in `JoinGroup`.
    pub(crate) session_timeout: Duration,
    /// Heartbeat interval handed to the coordinator task.
    pub(crate) heartbeat_interval: Duration,
    /// Assignment strategy used when this member is elected group leader.
    #[allow(dead_code)]
    pub(crate) assignor: Assignor,
    /// Cancelled by `close` to ask the coordinator task to stop.
    pub(crate) coordinator_shutdown: CancellationToken,
    /// Join handle of the spawned coordinator task; `close` takes it out of
    /// the `Option` and awaits it.
    pub(crate) coordinator_handle: Option<JoinHandle<()>>,
    /// Isolation level chosen at build time (defaults to `ReadUncommitted`).
    pub(crate) isolation_level: IsolationLevel,
    /// Policy applied when a partition has no committed offset (defaults to `Latest`).
    pub(crate) auto_offset_reset: AutoOffsetReset,
}
63
/// A single record as handed to the application, together with the
/// coordinates (topic, partition, offset) it was read from.
#[derive(Debug, Clone)]
pub struct ConsumerRecord {
    /// Name of the topic the record belongs to.
    pub topic: String,
    /// Partition index within `topic`.
    pub partition: i32,
    /// Offset of the record within the partition.
    pub offset: i64,
    /// Leader epoch associated with the record.
    // NOTE(review): presumably -1 when the epoch is unknown — confirm against the fetch path.
    pub leader_epoch: i32,
    /// Record timestamp.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub timestamp: i64,
    /// Record key, if any.
    // NOTE(review): `None` presumably represents a null key — confirm.
    pub key: Option<Bytes>,
    /// Record value, if any.
    // NOTE(review): `None` presumably represents a null value (tombstone) — confirm.
    pub value: Option<Bytes>,
}
75
76#[bon::bon]
77impl Consumer {
78 #[builder(start_fn = builder, finish_fn = build)]
83 #[allow(clippy::too_many_lines)]
84 pub async fn start(
85 #[builder(into)] bootstrap: String,
86 #[builder(into, default = "crabka-consumer".to_string())] client_id: String,
87 #[builder(into)] group_id: String,
88 #[builder(default = std::time::Duration::from_secs(45))]
89 session_timeout: std::time::Duration,
90 #[builder(default = std::time::Duration::from_mins(1))]
91 rebalance_timeout: std::time::Duration,
92 #[builder(default = std::time::Duration::from_secs(3))]
93 heartbeat_interval: std::time::Duration,
94 #[builder(into)] subscribe: Vec<String>,
95 #[builder(default = AutoOffsetReset::Latest)] auto_offset_reset: AutoOffsetReset,
96 #[builder(default = IsolationLevel::ReadUncommitted)] isolation_level: IsolationLevel,
97 #[builder(default = Assignor::Range)] assignor: Assignor,
98 #[builder(into)] client_rack: Option<String>,
99 security: Option<crabka_client_core::security::ClientSecurity>,
100 ) -> Result<Self, ConsumerError> {
101 if subscribe.is_empty() {
102 return Err(ConsumerError::NotSubscribed);
103 }
104 if group_id.is_empty() {
105 return Err(ConsumerError::RebalanceFailed("group_id required".into()));
106 }
107
108 let client = Client::builder()
109 .bootstrap(&bootstrap)
110 .client_id(client_id.clone())
111 .maybe_security(security.clone())
112 .build()
113 .await?;
114
115 let session_timeout_ms = i32::try_from(session_timeout.as_millis()).unwrap_or(i32::MAX);
116 let rebalance_timeout_ms = i32::try_from(rebalance_timeout.as_millis()).unwrap_or(i32::MAX);
117
118 let subscription_bytes = encode_subscription(&subscribe, &[], -1, client_rack.as_deref());
122 let protocol_name = assignor.protocol_name().to_string();
123
124 let r1 = with_coordinator_retry(
128 COORDINATOR_RETRY_TIMEOUT,
129 |r: &JoinGroupResponse| r.error_code,
130 || {
131 let group_id = group_id.clone();
132 let protocol_name = protocol_name.clone();
133 let subscription_bytes = subscription_bytes.clone();
134 let client = &client;
135 async move {
136 client
137 .send(JoinGroupRequest {
138 group_id,
139 protocol_type: "consumer".into(),
140 member_id: String::new(),
141 session_timeout_ms,
142 rebalance_timeout_ms,
143 protocols: vec![JoinGroupRequestProtocol {
144 name: protocol_name,
145 metadata: subscription_bytes,
146 ..Default::default()
147 }],
148 ..Default::default()
149 })
150 .await
151 .map_err(ConsumerError::from)
152 }
153 },
154 )
155 .await?;
156 let member_id = if r1.error_code == 79 || r1.error_code == 0 {
157 r1.member_id.clone()
158 } else {
159 return Err(ConsumerError::Server(r1.error_code));
160 };
161 if member_id.is_empty() {
162 return Err(ConsumerError::RebalanceFailed(
163 "broker did not assign a member_id".into(),
164 ));
165 }
166
167 let r2 = with_coordinator_retry(
169 COORDINATOR_RETRY_TIMEOUT,
170 |r: &JoinGroupResponse| r.error_code,
171 || {
172 let group_id = group_id.clone();
173 let protocol_name = protocol_name.clone();
174 let subscription_bytes = subscription_bytes.clone();
175 let member_id = member_id.clone();
176 let client = &client;
177 async move {
178 client
179 .send(JoinGroupRequest {
180 group_id,
181 protocol_type: "consumer".into(),
182 member_id,
183 session_timeout_ms,
184 rebalance_timeout_ms,
185 protocols: vec![JoinGroupRequestProtocol {
186 name: protocol_name,
187 metadata: subscription_bytes,
188 ..Default::default()
189 }],
190 ..Default::default()
191 })
192 .await
193 .map_err(ConsumerError::from)
194 }
195 },
196 )
197 .await?;
198 if r2.error_code != 0 {
199 return Err(ConsumerError::Server(r2.error_code));
200 }
201
202 let md = client.refresh_metadata().await?;
210 let mut topic_ids: HashMap<String, WireUuid> = HashMap::new();
211 let mut topic_partitions: HashMap<String, i32> = HashMap::new();
212 for t in &md.topics {
213 let Some(name) = &t.name else { continue };
214 if subscribe.iter().any(|s| s == name) {
215 let count = i32::try_from(t.partitions.len()).unwrap_or(i32::MAX);
216 topic_partitions.insert(name.clone(), count);
217 topic_ids.insert(name.clone(), t.topic_id);
218 }
219 }
220
221 let is_leader = r2.leader == member_id;
222 let assignments_for_sync: Vec<SyncGroupRequestAssignment> = if is_leader {
223 let assignments = match assignor {
224 Assignor::Range => {
225 let inputs: Vec<(String, Vec<String>)> = r2
226 .members
227 .iter()
228 .map(|m| {
229 let ds = decode_subscription(&m.metadata);
230 (m.member_id.clone(), ds.topics)
231 })
232 .collect();
233 crate::assignor::range::assign(inputs, &topic_partitions)
234 }
235 Assignor::CooperativeSticky => {
236 let inputs: Vec<crate::assignor::cooperative_sticky::MemberInput> = r2
237 .members
238 .iter()
239 .map(|m| {
240 let ds = decode_subscription(&m.metadata);
241 (m.member_id.clone(), ds.topics, ds.owned, ds.generation_id)
242 })
243 .collect();
244 crate::assignor::cooperative_sticky::assign(&inputs, &topic_partitions)
245 }
246 };
247 assignments
248 .into_iter()
249 .map(|(m, partitions)| SyncGroupRequestAssignment {
250 member_id: m,
251 assignment: encode_assignment(&partitions),
252 ..Default::default()
253 })
254 .collect()
255 } else {
256 Vec::new()
257 };
258
259 let r3 = with_coordinator_retry(
262 COORDINATOR_RETRY_TIMEOUT,
263 |r: &SyncGroupResponse| r.error_code,
264 || {
265 let group_id = group_id.clone();
266 let protocol_name = protocol_name.clone();
267 let member_id = member_id.clone();
268 let assignments_for_sync = assignments_for_sync.clone();
269 let generation_id = r2.generation_id;
270 let client = &client;
271 async move {
272 client
273 .send(SyncGroupRequest {
274 group_id,
275 generation_id,
276 member_id,
277 protocol_type: Some("consumer".into()),
278 protocol_name: Some(protocol_name),
279 assignments: assignments_for_sync,
280 ..Default::default()
281 })
282 .await
283 .map_err(ConsumerError::from)
284 }
285 },
286 )
287 .await?;
288 if r3.error_code != 0 {
289 return Err(ConsumerError::Server(r3.error_code));
290 }
291 let assigned_partitions = decode_assignment(&r3.assignment);
292
293 let mut next_offsets: HashMap<(String, i32), i64> = HashMap::new();
295 let mut positions: HashMap<(String, i32), crate::position::PartitionPosition> =
296 HashMap::new();
297 if !assigned_partitions.is_empty() {
298 let mut by_topic: HashMap<String, Vec<i32>> = HashMap::new();
299 for (t, p) in &assigned_partitions {
300 by_topic.entry(t.clone()).or_default().push(*p);
301 }
302 let of = client
303 .send(crate::offset_wire::build_offset_fetch(
304 &group_id, &by_topic, &topic_ids,
305 ))
306 .await?;
307 let id_to_name = crate::offset_wire::id_to_name(&topic_ids);
308 for (name, partition_index, committed, committed_epoch) in
309 crate::offset_wire::parse_offset_fetch(&of, &id_to_name)
310 {
311 let starting = if committed >= 0 {
312 committed
313 } else {
314 match auto_offset_reset {
315 AutoOffsetReset::Earliest => 0,
316 AutoOffsetReset::Latest | AutoOffsetReset::None => i64::MAX,
318 }
319 };
320 next_offsets.insert((name.clone(), partition_index), starting);
321 positions.insert(
322 (name, partition_index),
323 crate::position::PartitionPosition {
324 offset_epoch: committed_epoch,
325 ..Default::default()
326 },
327 );
328 }
329 }
330
331 let coordinator_client = Client::builder()
346 .bootstrap(&bootstrap)
347 .client_id(client_id.clone())
348 .maybe_security(security.clone())
349 .build()
350 .await?;
351
352 let assigned = Arc::new(Mutex::new(assigned_partitions));
353 let next_offsets = Arc::new(Mutex::new(next_offsets));
354 let positions = Arc::new(Mutex::new(positions));
355 let topic_ids = Arc::new(Mutex::new(topic_ids));
356
357 let shutdown = CancellationToken::new();
358 let state = CoordinatorState {
359 client: coordinator_client,
360 group_id: group_id.clone(),
361 member_id: member_id.clone(),
362 generation_id: r2.generation_id,
363 assignor,
364 subscribed_topics: subscribe.clone(),
365 assigned: Arc::clone(&assigned),
366 next_offsets: Arc::clone(&next_offsets),
367 positions: Arc::clone(&positions),
368 topic_ids: Arc::clone(&topic_ids),
369 session_timeout,
370 rebalance_timeout,
371 heartbeat_interval,
372 auto_offset_reset,
373 client_rack: client_rack.clone(),
374 };
375 let coord_handle = tokio::spawn(crate::coordinator::run(state, shutdown.clone()));
376
377 Ok(Consumer {
378 client,
379 group_id,
380 member_id,
381 generation_id: r2.generation_id,
382 subscribed_topics: subscribe,
383 assigned,
384 next_offsets,
385 positions,
386 topic_ids,
387 session_timeout,
388 heartbeat_interval,
389 assignor,
390 coordinator_shutdown: shutdown,
391 coordinator_handle: Some(coord_handle),
392 isolation_level,
393 auto_offset_reset,
394 })
395 }
396}
397
398impl Consumer {
399 #[must_use]
401 pub fn group_id(&self) -> &str {
402 &self.group_id
403 }
404
405 #[must_use]
407 pub fn member_id(&self) -> &str {
408 &self.member_id
409 }
410
411 #[must_use]
413 pub fn generation_id(&self) -> i32 {
414 self.generation_id
415 }
416
417 #[must_use]
425 pub fn group_metadata(&self) -> ConsumerGroupMetadata {
426 ConsumerGroupMetadata {
427 group_id: self.group_id.clone(),
428 generation_id: self.generation_id,
429 member_id: self.member_id.clone(),
430 group_instance_id: None,
431 }
432 }
433
434 #[must_use]
436 pub fn subscribed_topics(&self) -> &[String] {
437 &self.subscribed_topics
438 }
439
440 pub async fn assignment(&self) -> Vec<(String, i32)> {
442 self.assigned.lock().await.clone()
443 }
444
445 pub async fn close(mut self) -> Result<(), ConsumerError> {
457 self.coordinator_shutdown.cancel();
458 if let Some(h) = self.coordinator_handle.take() {
459 let _ = h.await;
460 }
461 Ok(())
462 }
463}
464
#[cfg(test)]
mod security_arg_tests {
    use super::*;
    use assert2::assert;
    use crabka_client_core::security::{ClientSecurity, SaslCredentials};
    use crabka_security::ListenerProtocol;

    /// The builder must accept a `security` argument; the build itself is
    /// expected to fail because nothing should be listening on the target port.
    #[tokio::test]
    async fn consumer_builder_accepts_security() {
        let credentials = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let sasl_plaintext = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(credentials),
            sasl_host: None,
        };
        let topics = vec!["t".to_string()];

        let outcome = Consumer::builder()
            .bootstrap("127.0.0.1:1")
            .group_id("g")
            .subscribe(topics)
            .security(sasl_plaintext)
            .build()
            .await;

        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}