1use bytes::{Bytes, BytesMut};
4use kafka_protocol::messages::{
5 ApiKey, BrokerId, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
6 GroupId, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
7 OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse,
8 RequestHeader, ResponseHeader, TopicName, fetch_request::FetchPartition as KpFetchPartition,
9 fetch_request::FetchTopic as KpFetchTopic, list_offsets_request::ListOffsetsPartition,
10 list_offsets_request::ListOffsetsTopic, metadata_request::MetadataRequestTopic,
11 offset_commit_request::OffsetCommitRequestPartition,
12 offset_commit_request::OffsetCommitRequestTopic, offset_fetch_request::OffsetFetchRequestTopic,
13};
14use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes};
15use kafka_protocol::records::RecordBatchDecoder;
16use rustfs_kafka::client::SecurityConfig;
17use rustfs_kafka::consumer::{FetchOffset, MessageSets};
18use rustfs_kafka::error::{ConsumerError, Error, KafkaCode, ProtocolError, Result};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::time::Duration;
22use std::time::{SystemTime, UNIX_EPOCH};
23use tracing::debug;
24
25use crate::AsyncKafkaClient;
26use crate::connection::AsyncConnection;
27
// Kafka API versions used when encoding requests and decoding responses.

/// Version of the `Metadata` API spoken by the native consumer.
const API_VERSION_METADATA: i16 = 1;
/// Version of the `ListOffsets` API spoken by the native consumer.
const API_VERSION_LIST_OFFSETS: i16 = 1;
/// Version of the `OffsetCommit` API spoken by the native consumer.
const API_VERSION_OFFSET_COMMIT: i16 = 2;
/// Version of the `OffsetFetch` API spoken by the native consumer.
const API_VERSION_OFFSET_FETCH: i16 = 2;
/// Version of the `FindCoordinator` API spoken by the native consumer.
const API_VERSION_FIND_COORDINATOR: i16 = 3;
/// Version of the `Fetch` API requested by the native consumer.
const API_VERSION_FETCH: i16 = 12;

// Fetch request tuning.

/// `min_bytes` value placed in every fetch request.
const FETCH_MIN_BYTES: i32 = 1;
/// `max_wait_ms` value placed in every fetch request.
const FETCH_MAX_WAIT_MS: i32 = 100;
/// Per-partition byte cap placed in every fetch request (1 MiB).
const FETCH_PARTITION_MAX_BYTES: i32 = 1024 * 1024;

// Defaults applied when the builder is not given explicit values.

/// Default number of attempts for `poll` / `commit`.
const DEFAULT_NATIVE_RETRY_ATTEMPTS: usize = 3;
/// Default pause between attempts, in milliseconds.
const DEFAULT_NATIVE_RETRY_BACKOFF_MS: u64 = 100;
/// Default number of error snapshots kept in the recent-error history.
const DEFAULT_NATIVE_RECENT_ERROR_LIMIT: usize = 32;
40
41struct NativeConsumer {
42 client: AsyncKafkaClient,
43 group: String,
44 topics: Vec<String>,
45 fallback_offset: FetchOffset,
46 offsets: HashMap<(String, i32), i64>,
47 dirty_offsets: HashMap<(String, i32), i64>,
48 leaders: HashMap<(String, i32), String>,
49 coordinator: Option<String>,
50 correlation: i32,
51 retry_attempts: usize,
52 retry_backoff: Duration,
53 observability: NativeConsumerObservability,
54}
55
56enum AsyncConsumerMode {
57 Native(Box<NativeConsumer>),
58}
59
60#[derive(Debug, Clone)]
62pub struct NativeConsumerErrorSnapshot {
63 pub phase: String,
64 pub class: String,
65 pub kafka_code: Option<KafkaCode>,
66 pub message: String,
67 pub timestamp_unix_ms: u128,
68}
69
70#[derive(Debug, Clone)]
72pub struct NativeConsumerErrorStats {
73 pub total_errors: u64,
74 pub kafka_code_counts: HashMap<String, u64>,
75 pub class_counts: HashMap<String, u64>,
76 pub last_error: Option<NativeConsumerErrorSnapshot>,
77 pub recent_errors: Vec<NativeConsumerErrorSnapshot>,
78}
79
80#[derive(Debug, Clone)]
81struct NativeConsumerObservability {
82 total_errors: u64,
83 kafka_code_counts: HashMap<String, u64>,
84 class_counts: HashMap<String, u64>,
85 last_error: Option<NativeConsumerErrorSnapshot>,
86 recent_errors: VecDeque<NativeConsumerErrorSnapshot>,
87 recent_error_limit: usize,
88}
89
90impl Default for NativeConsumerObservability {
91 fn default() -> Self {
92 Self::new(DEFAULT_NATIVE_RECENT_ERROR_LIMIT)
93 }
94}
95
96impl NativeConsumerObservability {
97 fn new(recent_error_limit: usize) -> Self {
98 Self {
99 total_errors: 0,
100 kafka_code_counts: HashMap::new(),
101 class_counts: HashMap::new(),
102 last_error: None,
103 recent_errors: VecDeque::new(),
104 recent_error_limit: recent_error_limit.max(1),
105 }
106 }
107
108 fn clear(&mut self) {
109 self.total_errors = 0;
110 self.kafka_code_counts.clear();
111 self.class_counts.clear();
112 self.last_error = None;
113 self.recent_errors.clear();
114 }
115}
116
117pub struct AsyncConsumer {
119 mode: AsyncConsumerMode,
120}
121
122pub struct AsyncConsumerBuilder {
124 hosts: Vec<String>,
125 group: Option<String>,
126 topics: Vec<String>,
127 security: Option<SecurityConfig>,
128 channel_capacity: usize,
129 native_async: bool,
130 fallback_offset: FetchOffset,
131 native_retry_attempts: usize,
132 native_retry_backoff: Duration,
133 native_recent_error_limit: usize,
134}
135
136impl AsyncConsumerBuilder {
137 #[must_use]
139 pub fn new(hosts: Vec<String>) -> Self {
140 Self {
141 hosts,
142 group: None,
143 topics: Vec::new(),
144 security: None,
145 channel_capacity: 64,
146 native_async: true,
147 fallback_offset: FetchOffset::Latest,
148 native_retry_attempts: DEFAULT_NATIVE_RETRY_ATTEMPTS,
149 native_retry_backoff: Duration::from_millis(DEFAULT_NATIVE_RETRY_BACKOFF_MS),
150 native_recent_error_limit: DEFAULT_NATIVE_RECENT_ERROR_LIMIT,
151 }
152 }
153
154 #[must_use]
156 pub fn with_group(mut self, group: String) -> Self {
157 self.group = Some(group);
158 self
159 }
160
161 #[must_use]
163 pub fn with_topic(mut self, topic: String) -> Self {
164 self.topics.push(topic);
165 self
166 }
167
168 #[must_use]
170 pub fn with_topics(mut self, topics: Vec<String>) -> Self {
171 self.topics.extend(topics);
172 self
173 }
174
175 #[must_use]
177 pub fn with_security(mut self, security: SecurityConfig) -> Self {
178 self.security = Some(security);
179 self
180 }
181
182 #[must_use]
184 pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self {
185 self.channel_capacity = channel_capacity.max(1);
186 self
187 }
188
189 #[must_use]
191 pub fn with_native_async(mut self, native_async: bool) -> Self {
192 self.native_async = native_async;
193 self
194 }
195
196 #[must_use]
198 pub fn with_fallback_offset(mut self, fallback_offset: FetchOffset) -> Self {
199 self.fallback_offset = fallback_offset;
200 self
201 }
202
203 #[must_use]
205 pub fn with_native_retry_attempts(mut self, attempts: usize) -> Self {
206 self.native_retry_attempts = attempts.max(1);
207 self
208 }
209
210 #[must_use]
212 pub fn with_native_retry_backoff(mut self, backoff: Duration) -> Self {
213 self.native_retry_backoff = backoff;
214 self
215 }
216
217 #[must_use]
219 pub fn with_native_recent_error_limit(mut self, limit: usize) -> Self {
220 self.native_recent_error_limit = limit.max(1);
221 self
222 }
223
224 pub async fn build(self) -> Result<AsyncConsumer> {
226 let AsyncConsumerBuilder {
227 hosts,
228 group,
229 topics,
230 security,
231 channel_capacity,
232 native_async,
233 fallback_offset,
234 native_retry_attempts,
235 native_retry_backoff,
236 native_recent_error_limit,
237 } = self;
238
239 let group = group.ok_or(Error::Consumer(ConsumerError::UnsetGroupId))?;
240 if topics.is_empty() {
241 return Err(Error::Consumer(ConsumerError::NoTopicsAssigned));
242 }
243
244 if !native_async {
245 debug!(
246 "AsyncConsumerBuilder::with_native_async(false) is ignored: consumer always uses native async I/O"
247 );
248 }
249 let _ = channel_capacity;
250 let client = AsyncKafkaClient::with_client_id_and_security(
251 hosts,
252 "rustfs-kafka-async".to_owned(),
253 security,
254 )
255 .await?;
256
257 Ok(AsyncConsumer {
258 mode: AsyncConsumerMode::Native(Box::new(NativeConsumer {
259 client,
260 group,
261 topics,
262 fallback_offset,
263 offsets: HashMap::new(),
264 dirty_offsets: HashMap::new(),
265 leaders: HashMap::new(),
266 coordinator: None,
267 correlation: 1,
268 retry_attempts: native_retry_attempts,
269 retry_backoff: native_retry_backoff,
270 observability: NativeConsumerObservability::new(native_recent_error_limit),
271 })),
272 })
273 }
274}
275
276impl AsyncConsumer {
277 #[must_use]
279 pub fn builder(hosts: Vec<String>) -> AsyncConsumerBuilder {
280 AsyncConsumerBuilder::new(hosts)
281 }
282
283 pub async fn from_hosts(
285 hosts: Vec<String>,
286 group: String,
287 topics: Vec<String>,
288 ) -> Result<Self> {
289 Self::builder(hosts)
290 .with_group(group)
291 .with_topics(topics)
292 .build()
293 .await
294 }
295
296 pub async fn from_client(
298 client: AsyncKafkaClient,
299 group: String,
300 topics: Vec<String>,
301 ) -> Result<Self> {
302 if group.is_empty() {
303 return Err(Error::Consumer(ConsumerError::UnsetGroupId));
304 }
305 if topics.is_empty() {
306 return Err(Error::Consumer(ConsumerError::NoTopicsAssigned));
307 }
308 Ok(Self {
309 mode: AsyncConsumerMode::Native(Box::new(NativeConsumer {
310 client,
311 group,
312 topics,
313 fallback_offset: FetchOffset::Latest,
314 offsets: HashMap::new(),
315 dirty_offsets: HashMap::new(),
316 leaders: HashMap::new(),
317 coordinator: None,
318 correlation: 1,
319 retry_attempts: DEFAULT_NATIVE_RETRY_ATTEMPTS,
320 retry_backoff: Duration::from_millis(DEFAULT_NATIVE_RETRY_BACKOFF_MS),
321 observability: NativeConsumerObservability::default(),
322 })),
323 })
324 }
325
326 pub async fn poll(&mut self) -> Result<MessageSets> {
328 match &mut self.mode {
329 AsyncConsumerMode::Native(native) => native.poll().await,
330 }
331 }
332
333 pub async fn commit(&mut self) -> Result<()> {
335 match &mut self.mode {
336 AsyncConsumerMode::Native(native) => native.commit().await,
337 }
338 }
339
340 pub async fn close(self) -> Result<()> {
342 Ok(())
343 }
344
345 #[must_use]
347 pub fn native_error_stats(&self) -> Option<NativeConsumerErrorStats> {
348 match &self.mode {
349 AsyncConsumerMode::Native(native) => Some(native.error_stats()),
350 }
351 }
352
353 pub fn reset_native_error_stats(&mut self) -> bool {
357 match &mut self.mode {
358 AsyncConsumerMode::Native(native) => {
359 native.reset_error_stats();
360 true
361 }
362 }
363 }
364}
365
366impl NativeConsumer {
367 async fn poll(&mut self) -> Result<MessageSets> {
368 for attempt in 1..=self.retry_attempts {
369 match self.poll_once().await {
370 Ok(data) => return Ok(data),
371 Err(err) if attempt < self.retry_attempts && should_retry_poll(&err) => {
372 self.record_error("poll", &err);
373 self.leaders.clear();
374 self.refresh_metadata().await?;
375 tokio::time::sleep(self.retry_backoff).await;
376 continue;
377 }
378 Err(err) => {
379 self.record_error("poll", &err);
380 return Err(err);
381 }
382 }
383 }
384 Err(Error::Kafka(KafkaCode::Unknown))
385 }
386
387 async fn poll_once(&mut self) -> Result<MessageSets> {
388 self.client.ensure_connected().await?;
389 if self.leaders.is_empty() {
390 self.refresh_metadata().await?;
391 }
392 self.ensure_start_offsets().await?;
393
394 let mut by_broker: HashMap<String, Vec<(String, i32, i64)>> = HashMap::new();
395 for (tp, leader_host) in &self.leaders {
396 let offset = *self.offsets.get(tp).unwrap_or(&0);
397 by_broker
398 .entry(leader_host.clone())
399 .or_default()
400 .push((tp.0.clone(), tp.1, offset));
401 }
402
403 let correlation = self.next_correlation();
404 let client_id = self.client.client_id().to_owned();
405 let mut owned_responses = Vec::new();
406
407 for (broker, tps) in by_broker {
408 let parts: Vec<(&str, i32, i64, i32)> = tps
409 .iter()
410 .map(|(topic, partition, offset)| {
411 (
412 topic.as_str(),
413 *partition,
414 *offset,
415 FETCH_PARTITION_MAX_BYTES,
416 )
417 })
418 .collect();
419
420 let conn = self.client.get_connection(&broker).await?;
421 let (header, request) = build_fetch_request(correlation, &client_id, &parts);
422 send_kp_request(conn, &header, &request, API_VERSION_FETCH).await?;
423 let response = get_fetch_response(conn, API_VERSION_FETCH).await?;
424 let owned = convert_fetch_response(response, correlation);
425 if let Some(code) = first_fetch_error_code(&owned) {
426 return Err(Error::Kafka(code));
427 }
428
429 self.advance_offsets(&owned);
430 owned_responses.push(owned);
431 }
432
433 Ok(MessageSets::from_fetch_responses(owned_responses))
434 }
435
436 fn next_correlation(&mut self) -> i32 {
437 let cid = self.correlation;
438 self.correlation = self.correlation.wrapping_add(1);
439 cid
440 }
441
442 fn advance_offsets(&mut self, resp: &rustfs_kafka::client::fetch_kp::OwnedFetchResponse) {
443 for topic in &resp.topics {
444 for partition in &topic.partitions {
445 if let Ok(data) = partition.data()
446 && let Some(last) = data.messages.last()
447 {
448 let next_offset = last.offset + 1;
449 let tp = (topic.topic.clone(), partition.partition);
450 self.offsets.insert(tp.clone(), next_offset);
451 self.dirty_offsets.insert(tp, next_offset);
452 }
453 }
454 }
455 }
456
457 async fn commit(&mut self) -> Result<()> {
458 for attempt in 1..=self.retry_attempts {
459 match self.commit_once().await {
460 Ok(()) => return Ok(()),
461 Err(err) if attempt < self.retry_attempts && should_retry_commit(&err) => {
462 self.record_error("commit", &err);
463 self.coordinator = None;
464 self.refresh_coordinator().await?;
465 tokio::time::sleep(self.retry_backoff).await;
466 continue;
467 }
468 Err(err) => {
469 self.record_error("commit", &err);
470 return Err(err);
471 }
472 }
473 }
474 Err(Error::Kafka(KafkaCode::Unknown))
475 }
476
477 async fn commit_once(&mut self) -> Result<()> {
478 if self.dirty_offsets.is_empty() {
479 return Ok(());
480 }
481
482 self.client.ensure_connected().await?;
483 if self.coordinator.is_none() {
484 self.refresh_coordinator().await?;
485 }
486 let Some(coordinator) = self.coordinator.clone() else {
487 return Err(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable));
488 };
489
490 let client_id = self.client.client_id().to_owned();
491 let correlation = self.next_correlation();
492 let payload: Vec<(&str, i32, i64)> = self
493 .dirty_offsets
494 .iter()
495 .map(|((topic, partition), offset)| (topic.as_str(), *partition, *offset))
496 .collect();
497
498 let conn = self.client.get_connection(&coordinator).await?;
499 let (header, request) =
500 build_offset_commit_request(correlation, &client_id, &self.group, &payload);
501 send_kp_request(conn, &header, &request, API_VERSION_OFFSET_COMMIT).await?;
502 let response =
503 get_kp_response::<OffsetCommitResponse>(conn, API_VERSION_OFFSET_COMMIT).await?;
504
505 for topic in response.topics {
506 for partition in topic.partitions {
507 if partition.error_code != 0 {
508 if let Some(code) = map_kafka_code(partition.error_code) {
509 return Err(Error::Kafka(code));
510 }
511 return Err(Error::Kafka(KafkaCode::Unknown));
512 }
513 }
514 }
515
516 self.dirty_offsets.clear();
517 Ok(())
518 }
519
520 async fn refresh_metadata(&mut self) -> Result<()> {
521 let request_host = if let Some(connected) = self.client.connected_hosts().first() {
522 (*connected).to_owned()
523 } else {
524 self.client
525 .bootstrap_hosts()
526 .first()
527 .cloned()
528 .ok_or_else(no_host_reachable_error)?
529 };
530
531 let correlation = self.next_correlation();
532 let client_id = self.client.client_id().to_owned();
533 let conn = self.client.get_connection(&request_host).await?;
534 let (header, request) = build_metadata_request(correlation, &client_id, &self.topics);
535 send_kp_request(conn, &header, &request, API_VERSION_METADATA).await?;
536 let response = get_kp_response::<MetadataResponse>(conn, API_VERSION_METADATA).await?;
537
538 let mut brokers: HashMap<i32, String> = HashMap::new();
539 for broker in response.brokers {
540 brokers.insert(
541 i32::from(broker.node_id),
542 format!("{}:{}", broker.host, broker.port),
543 );
544 }
545
546 self.leaders.clear();
547 for topic in response.topics {
548 let Some(topic_name) = topic.name else {
549 continue;
550 };
551 for partition in topic.partitions {
552 let leader = i32::from(partition.leader_id);
553 if leader < 0 {
554 continue;
555 }
556 if let Some(host) = brokers.get(&leader) {
557 let tp = (topic_name.to_string(), partition.partition_index);
558 self.leaders.insert(tp.clone(), host.clone());
559 self.offsets.entry(tp).or_insert(0);
560 }
561 }
562 }
563
564 if self.leaders.is_empty() {
565 return Err(Error::Kafka(KafkaCode::LeaderNotAvailable));
566 }
567
568 Ok(())
569 }
570
571 async fn refresh_coordinator(&mut self) -> Result<()> {
572 let request_host = if let Some(connected) = self.client.connected_hosts().first() {
573 (*connected).to_owned()
574 } else {
575 self.client
576 .bootstrap_hosts()
577 .first()
578 .cloned()
579 .ok_or_else(no_host_reachable_error)?
580 };
581
582 let correlation = self.next_correlation();
583 let client_id = self.client.client_id().to_owned();
584 let conn = self.client.get_connection(&request_host).await?;
585 let (header, request) =
586 build_find_coordinator_request(correlation, &client_id, &self.group);
587 send_kp_request(conn, &header, &request, API_VERSION_FIND_COORDINATOR).await?;
588 let response =
589 get_kp_response::<FindCoordinatorResponse>(conn, API_VERSION_FIND_COORDINATOR).await?;
590
591 let (error_code, host, port) = if let Some(c) = response.coordinators.first() {
592 (c.error_code, c.host.to_string(), c.port)
593 } else {
594 (
595 response.error_code,
596 response.host.to_string(),
597 response.port,
598 )
599 };
600
601 if error_code != 0 {
602 if let Some(code) = map_kafka_code(error_code) {
603 return Err(Error::Kafka(code));
604 }
605 return Err(Error::Kafka(KafkaCode::Unknown));
606 }
607
608 self.coordinator = Some(format!("{host}:{port}"));
609 Ok(())
610 }
611
612 async fn ensure_start_offsets(&mut self) -> Result<()> {
613 let missing: Vec<(String, i32)> = self
614 .leaders
615 .keys()
616 .filter(|tp| !self.offsets.contains_key(*tp))
617 .cloned()
618 .collect();
619 if missing.is_empty() {
620 return Ok(());
621 }
622
623 self.client.ensure_connected().await?;
624 if self.coordinator.is_none() {
625 self.refresh_coordinator().await?;
626 }
627
628 let committed = self.fetch_committed_offsets(&missing).await?;
629 for tp in missing {
630 if let Some(offset) = committed.get(&tp)
631 && *offset >= 0
632 {
633 self.offsets.insert(tp.clone(), *offset);
634 continue;
635 }
636
637 let fallback = self.resolve_fallback_offset(&tp).await?;
638 self.offsets.insert(tp, fallback);
639 }
640
641 Ok(())
642 }
643
644 async fn fetch_committed_offsets(
645 &mut self,
646 partitions: &[(String, i32)],
647 ) -> Result<HashMap<(String, i32), i64>> {
648 let Some(coordinator) = self.coordinator.clone() else {
649 return Err(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable));
650 };
651
652 let client_id = self.client.client_id().to_owned();
653 let correlation = self.next_correlation();
654 let req_parts: Vec<(&str, i32)> = partitions
655 .iter()
656 .map(|(topic, partition)| (topic.as_str(), *partition))
657 .collect();
658
659 let conn = self.client.get_connection(&coordinator).await?;
660 let (header, request) =
661 build_offset_fetch_request(correlation, &client_id, &self.group, &req_parts);
662 send_kp_request(conn, &header, &request, API_VERSION_OFFSET_FETCH).await?;
663 let response =
664 get_kp_response::<OffsetFetchResponse>(conn, API_VERSION_OFFSET_FETCH).await?;
665
666 let mut committed = HashMap::new();
667 for topic in response.topics {
668 for partition in topic.partitions {
669 if partition.error_code != 0 {
670 if let Some(code) = map_kafka_code(partition.error_code) {
671 return Err(Error::Kafka(code));
672 }
673 return Err(Error::Kafka(KafkaCode::Unknown));
674 }
675 committed.insert(
676 (topic.name.to_string(), partition.partition_index),
677 partition.committed_offset,
678 );
679 }
680 }
681 Ok(committed)
682 }
683
684 async fn resolve_fallback_offset(&mut self, tp: &(String, i32)) -> Result<i64> {
685 let Some(leader) = self.leaders.get(tp).cloned() else {
686 return Err(Error::Kafka(KafkaCode::LeaderNotAvailable));
687 };
688
689 let timestamp = match self.fallback_offset {
690 FetchOffset::Earliest => -2,
691 FetchOffset::Latest => -1,
692 FetchOffset::ByTime(t) => t,
693 };
694
695 let correlation = self.next_correlation();
696 let client_id = self.client.client_id().to_owned();
697 let conn = self.client.get_connection(&leader).await?;
698 let (header, request) = build_list_offsets_request(
699 correlation,
700 &client_id,
701 &[(tp.0.as_str(), tp.1, timestamp)],
702 );
703 send_kp_request(conn, &header, &request, API_VERSION_LIST_OFFSETS).await?;
704 let response =
705 get_kp_response::<ListOffsetsResponse>(conn, API_VERSION_LIST_OFFSETS).await?;
706
707 for topic in response.topics {
708 if topic.name.as_str() != tp.0.as_str() {
709 continue;
710 }
711 for partition in topic.partitions {
712 if partition.partition_index != tp.1 {
713 continue;
714 }
715 if partition.error_code != 0 {
716 if let Some(code) = map_kafka_code(partition.error_code) {
717 return Err(Error::Kafka(code));
718 }
719 return Err(Error::Kafka(KafkaCode::Unknown));
720 }
721 return Ok(partition.offset);
722 }
723 }
724
725 Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
726 }
727
728 fn error_stats(&self) -> NativeConsumerErrorStats {
729 NativeConsumerErrorStats {
730 total_errors: self.observability.total_errors,
731 kafka_code_counts: self.observability.kafka_code_counts.clone(),
732 class_counts: self.observability.class_counts.clone(),
733 last_error: self.observability.last_error.clone(),
734 recent_errors: self.observability.recent_errors.iter().cloned().collect(),
735 }
736 }
737
738 fn reset_error_stats(&mut self) {
739 self.observability.clear();
740 }
741
742 fn record_error(&mut self, phase: &str, err: &Error) {
743 self.observability.total_errors = self.observability.total_errors.saturating_add(1);
744 let class = error_class(err);
745 let kafka_code = kafka_code_from_error(err).map(kafka_code_to_i16);
746 let kafka_code_label = kafka_code.map(|code| code.to_string());
747
748 *self
749 .observability
750 .class_counts
751 .entry(class.clone())
752 .or_insert(0) += 1;
753 if let Some(code) = &kafka_code_label {
754 *self
755 .observability
756 .kafka_code_counts
757 .entry(code.clone())
758 .or_insert(0) += 1;
759 }
760 let snapshot = NativeConsumerErrorSnapshot {
761 phase: phase.to_owned(),
762 class,
763 kafka_code: kafka_code.and_then(map_kafka_code),
764 message: err.to_string(),
765 timestamp_unix_ms: now_unix_ms(),
766 };
767 self.observability.last_error = Some(snapshot.clone());
768 self.observability.recent_errors.push_back(snapshot.clone());
769 while self.observability.recent_errors.len() > self.observability.recent_error_limit {
770 let _ = self.observability.recent_errors.pop_front();
771 }
772 crate::metrics::record_native_consumer_error(
773 phase,
774 &snapshot.class,
775 kafka_code_label.as_deref(),
776 self.observability.recent_errors.len(),
777 snapshot.timestamp_unix_ms,
778 );
779 }
780}
781
782fn build_metadata_request(
783 correlation_id: i32,
784 client_id: &str,
785 topics: &[String],
786) -> (RequestHeader, MetadataRequest) {
787 let header = RequestHeader::default()
788 .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
789 .with_request_api_key(ApiKey::Metadata as i16)
790 .with_request_api_version(API_VERSION_METADATA)
791 .with_correlation_id(correlation_id);
792
793 let request_topics: Vec<MetadataRequestTopic> = topics
794 .iter()
795 .map(|topic| {
796 MetadataRequestTopic::default()
797 .with_name(Some(TopicName::from(StrBytes::from_string(topic.clone()))))
798 })
799 .collect();
800
801 let request = MetadataRequest::default().with_topics(Some(request_topics));
802 (header, request)
803}
804
805fn build_fetch_request(
806 correlation_id: i32,
807 client_id: &str,
808 partitions: &[(&str, i32, i64, i32)],
809) -> (RequestHeader, FetchRequest) {
810 let header = RequestHeader::default()
811 .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
812 .with_request_api_key(ApiKey::Fetch as i16)
813 .with_request_api_version(API_VERSION_FETCH)
814 .with_correlation_id(correlation_id);
815
816 let mut topic_map: HashMap<&str, Vec<KpFetchPartition>> = HashMap::new();
817 for (topic, partition, offset, partition_max_bytes) in partitions {
818 topic_map.entry(topic).or_default().push(
819 KpFetchPartition::default()
820 .with_partition(*partition)
821 .with_fetch_offset(*offset)
822 .with_partition_max_bytes(*partition_max_bytes),
823 );
824 }
825
826 let topics: Vec<KpFetchTopic> = topic_map
827 .into_iter()
828 .map(|(topic_name, fetch_partitions)| {
829 KpFetchTopic::default()
830 .with_topic(TopicName::from(StrBytes::from_string(
831 topic_name.to_string(),
832 )))
833 .with_partitions(fetch_partitions)
834 })
835 .collect();
836
837 let request = FetchRequest::default()
838 .with_replica_id(kafka_protocol::messages::BrokerId::from(-1))
839 .with_max_wait_ms(FETCH_MAX_WAIT_MS)
840 .with_min_bytes(FETCH_MIN_BYTES)
841 .with_max_bytes(i32::MAX)
842 .with_isolation_level(0)
843 .with_topics(topics);
844
845 (header, request)
846}
847
848fn build_find_coordinator_request(
849 correlation_id: i32,
850 client_id: &str,
851 group_id: &str,
852) -> (RequestHeader, FindCoordinatorRequest) {
853 let header = RequestHeader::default()
854 .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
855 .with_request_api_key(ApiKey::FindCoordinator as i16)
856 .with_request_api_version(API_VERSION_FIND_COORDINATOR)
857 .with_correlation_id(correlation_id);
858
859 let request = FindCoordinatorRequest::default()
860 .with_key(StrBytes::from_string(group_id.to_owned()))
861 .with_key_type(0);
862
863 (header, request)
864}
865
866fn build_offset_commit_request(
867 correlation_id: i32,
868 client_id: &str,
869 group_id: &str,
870 offsets: &[(&str, i32, i64)],
871) -> (RequestHeader, OffsetCommitRequest) {
872 let header = RequestHeader::default()
873 .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
874 .with_request_api_key(ApiKey::OffsetCommit as i16)
875 .with_request_api_version(API_VERSION_OFFSET_COMMIT)
876 .with_correlation_id(correlation_id);
877
878 let mut topic_map: HashMap<&str, Vec<OffsetCommitRequestPartition>> = HashMap::new();
879 for (topic, partition, offset) in offsets {
880 topic_map.entry(topic).or_default().push(
881 OffsetCommitRequestPartition::default()
882 .with_partition_index(*partition)
883 .with_committed_offset(*offset)
884 .with_committed_metadata(None),
885 );
886 }
887
888 let topics: Vec<OffsetCommitRequestTopic> = topic_map
889 .into_iter()
890 .map(|(name, partitions)| {
891 OffsetCommitRequestTopic::default()
892 .with_name(TopicName::from(StrBytes::from_string(name.to_string())))
893 .with_partitions(partitions)
894 })
895 .collect();
896
897 let request = OffsetCommitRequest::default()
898 .with_group_id(GroupId::from(StrBytes::from_string(group_id.to_owned())))
899 .with_generation_id_or_member_epoch(-1)
900 .with_member_id(StrBytes::from_string(String::new()))
901 .with_retention_time_ms(-1)
902 .with_topics(topics);
903
904 (header, request)
905}
906
907fn build_offset_fetch_request(
908 correlation_id: i32,
909 client_id: &str,
910 group_id: &str,
911 partitions: &[(&str, i32)],
912) -> (RequestHeader, OffsetFetchRequest) {
913 let header = RequestHeader::default()
914 .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
915 .with_request_api_key(ApiKey::OffsetFetch as i16)
916 .with_request_api_version(API_VERSION_OFFSET_FETCH)
917 .with_correlation_id(correlation_id);
918
919 let mut topic_map: HashMap<&str, Vec<i32>> = HashMap::new();
920 for (topic, partition) in partitions {
921 topic_map.entry(topic).or_default().push(*partition);
922 }
923
924 let topics: Vec<OffsetFetchRequestTopic> = topic_map
925 .into_iter()
926 .map(|(topic, partition_indexes)| {
927 OffsetFetchRequestTopic::default()
928 .with_name(TopicName::from(StrBytes::from_string(topic.to_owned())))
929 .with_partition_indexes(partition_indexes)
930 })
931 .collect();
932
933 let request = OffsetFetchRequest::default()
934 .with_group_id(GroupId::from(StrBytes::from_string(group_id.to_owned())))
935 .with_topics(Some(topics));
936 (header, request)
937}
938
939fn build_list_offsets_request(
940 correlation_id: i32,
941 client_id: &str,
942 partitions: &[(&str, i32, i64)],
943) -> (RequestHeader, ListOffsetsRequest) {
944 let header = RequestHeader::default()
945 .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
946 .with_request_api_key(ApiKey::ListOffsets as i16)
947 .with_request_api_version(API_VERSION_LIST_OFFSETS)
948 .with_correlation_id(correlation_id);
949
950 let mut topic_map: HashMap<&str, Vec<ListOffsetsPartition>> = HashMap::new();
951 for (topic, partition, timestamp) in partitions {
952 topic_map.entry(topic).or_default().push(
953 ListOffsetsPartition::default()
954 .with_partition_index(*partition)
955 .with_timestamp(*timestamp),
956 );
957 }
958
959 let topics: Vec<ListOffsetsTopic> = topic_map
960 .into_iter()
961 .map(|(topic, parts)| {
962 ListOffsetsTopic::default()
963 .with_name(TopicName::from(StrBytes::from_string(topic.to_owned())))
964 .with_partitions(parts)
965 })
966 .collect();
967
968 let request = ListOffsetsRequest::default()
969 .with_replica_id(BrokerId::from(-1))
970 .with_isolation_level(0)
971 .with_topics(topics);
972 (header, request)
973}
974
975async fn send_kp_request<T>(
976 conn: &mut AsyncConnection,
977 header: &RequestHeader,
978 body: &T,
979 api_version: i16,
980) -> Result<()>
981where
982 T: Encodable + kafka_protocol::protocol::HeaderVersion,
983{
984 let header_version = T::header_version(api_version);
985
986 let mut header_buf = BytesMut::new();
987 header
988 .encode(&mut header_buf, header_version)
989 .map_err(|_| Error::Protocol(ProtocolError::Codec))?;
990
991 let mut body_buf = BytesMut::new();
992 body.encode(&mut body_buf, api_version)
993 .map_err(|_| Error::Protocol(ProtocolError::Codec))?;
994
995 let total_len = usize_to_i32(header_buf.len() + body_buf.len())?;
996 let mut out = BytesMut::with_capacity(4 + non_negative_i32_to_usize(total_len)?);
997 out.extend_from_slice(&total_len.to_be_bytes());
998 out.extend_from_slice(&header_buf);
999 out.extend_from_slice(&body_buf);
1000
1001 conn.send(&out).await
1002}
1003
1004async fn get_kp_response<R>(conn: &mut AsyncConnection, api_version: i16) -> Result<R>
1005where
1006 R: Decodable + kafka_protocol::protocol::HeaderVersion,
1007{
1008 let size_bytes = conn.read_exact(4).await?;
1009 let size = i32::from_be_bytes(
1010 <[u8; 4]>::try_from(size_bytes.as_ref())
1011 .map_err(|_| Error::Protocol(ProtocolError::Codec))?,
1012 );
1013 let mut bytes = conn.read_exact(non_negative_i32_to_u64(size)?).await?;
1014
1015 let response_header_version = R::header_version(api_version);
1016 let _resp_header = ResponseHeader::decode(&mut bytes, response_header_version)
1017 .map_err(|_| Error::Protocol(ProtocolError::Codec))?;
1018
1019 R::decode(&mut bytes, api_version).map_err(|_| Error::Protocol(ProtocolError::Codec))
1020}
1021
1022async fn get_fetch_response(
1023 conn: &mut AsyncConnection,
1024 requested_version: i16,
1025) -> Result<FetchResponse> {
1026 let size_bytes = conn.read_exact(4).await?;
1027 let size = i32::from_be_bytes(
1028 <[u8; 4]>::try_from(size_bytes.as_ref())
1029 .map_err(|_| Error::Protocol(ProtocolError::Codec))?,
1030 );
1031 let resp_bytes = conn.read_exact(non_negative_i32_to_u64(size)?).await?;
1032
1033 let mut candidates = Vec::with_capacity(1 + 18);
1034 candidates.push(requested_version);
1035 for v in (0..=17).rev() {
1036 if v != requested_version {
1037 candidates.push(v);
1038 }
1039 }
1040
1041 for version in candidates {
1042 let mut bytes = resp_bytes.clone();
1043 let header_version = FetchResponse::header_version(version);
1044 if ResponseHeader::decode(&mut bytes, header_version).is_err() {
1045 continue;
1046 }
1047 if let Ok(resp) = FetchResponse::decode(&mut bytes, version) {
1048 return Ok(resp);
1049 }
1050 }
1051
1052 Err(Error::Protocol(ProtocolError::Codec))
1053}
1054
1055fn convert_fetch_response(
1056 kp_resp: FetchResponse,
1057 correlation_id: i32,
1058) -> rustfs_kafka::client::fetch_kp::OwnedFetchResponse {
1059 use rustfs_kafka::client::fetch_kp::{OwnedFetchResponse, OwnedPartition, OwnedTopic};
1060
1061 let topics = kp_resp
1062 .responses
1063 .into_iter()
1064 .map(|t| {
1065 let topic_name = t.topic.to_string();
1066 let partitions: Vec<OwnedPartition> = t
1067 .partitions
1068 .into_iter()
1069 .map(|p| {
1070 let data = if p.error_code != 0 {
1071 Err(Arc::new(Error::TopicPartitionError {
1072 topic_name: topic_name.clone(),
1073 partition_id: p.partition_index,
1074 error_code: map_kafka_code(p.error_code).unwrap_or(KafkaCode::Unknown),
1075 }))
1076 } else {
1077 decode_partition_records(p.records, p.high_watermark)
1078 };
1079 OwnedPartition {
1080 partition: p.partition_index,
1081 data,
1082 highwatermark: p.high_watermark,
1083 }
1084 })
1085 .collect();
1086 OwnedTopic {
1087 topic: topic_name,
1088 partitions,
1089 }
1090 })
1091 .collect();
1092
1093 OwnedFetchResponse {
1094 correlation_id,
1095 topics,
1096 }
1097}
1098
1099fn first_fetch_error_code(
1100 resp: &rustfs_kafka::client::fetch_kp::OwnedFetchResponse,
1101) -> Option<KafkaCode> {
1102 for topic in &resp.topics {
1103 for partition in &topic.partitions {
1104 if let Err(err) = partition.data()
1105 && let Error::TopicPartitionError { error_code, .. } = &**err
1106 {
1107 return Some(*error_code);
1108 }
1109 }
1110 }
1111 None
1112}
1113
1114fn should_retry_poll(err: &Error) -> bool {
1115 match err {
1116 Error::Kafka(code) => matches!(
1117 code,
1118 KafkaCode::LeaderNotAvailable
1119 | KafkaCode::NotLeaderForPartition
1120 | KafkaCode::RequestTimedOut
1121 | KafkaCode::NetworkException
1122 ),
1123 Error::Connection(_) => true,
1124 _ => false,
1125 }
1126}
1127
1128fn should_retry_commit(err: &Error) -> bool {
1129 match err {
1130 Error::Kafka(code) => matches!(
1131 code,
1132 KafkaCode::GroupCoordinatorNotAvailable
1133 | KafkaCode::NotCoordinatorForGroup
1134 | KafkaCode::GroupLoadInProgress
1135 | KafkaCode::RequestTimedOut
1136 | KafkaCode::NetworkException
1137 ),
1138 Error::Connection(_) => true,
1139 _ => false,
1140 }
1141}
1142
1143fn kafka_code_from_error(err: &Error) -> Option<KafkaCode> {
1144 match err {
1145 Error::Kafka(code) => Some(*code),
1146 Error::TopicPartitionError { error_code, .. } => Some(*error_code),
1147 Error::BrokerRequestError { source, .. } => kafka_code_from_error(source),
1148 _ => None,
1149 }
1150}
1151
1152fn error_class(err: &Error) -> String {
1153 match err {
1154 Error::Kafka(_) => "kafka".to_owned(),
1155 Error::Connection(_) => "connection".to_owned(),
1156 Error::Protocol(_) => "protocol".to_owned(),
1157 Error::Config(_) => "config".to_owned(),
1158 Error::Consumer(_) => "consumer".to_owned(),
1159 Error::TopicPartitionError { .. } => "topic_partition".to_owned(),
1160 Error::BrokerRequestError { .. } => "broker_request".to_owned(),
1161 }
1162}
1163
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
1169
1170fn decode_partition_records(
1171 records: Option<Bytes>,
1172 high_watermark: i64,
1173) -> std::result::Result<rustfs_kafka::client::fetch_kp::OwnedData, Arc<Error>> {
1174 use rustfs_kafka::client::fetch_kp::{OwnedData, OwnedMessage};
1175
1176 let Some(mut records_bytes) = records else {
1177 return Ok(OwnedData {
1178 highwatermark_offset: high_watermark,
1179 messages: vec![],
1180 });
1181 };
1182 if records_bytes.is_empty() {
1183 return Ok(OwnedData {
1184 highwatermark_offset: high_watermark,
1185 messages: vec![],
1186 });
1187 }
1188
1189 let Ok(record_set) = RecordBatchDecoder::decode(&mut records_bytes) else {
1190 return Err(Arc::new(Error::Protocol(ProtocolError::Codec)));
1191 };
1192
1193 let mut messages: Vec<OwnedMessage> = Vec::new();
1194 for record in &record_set.records {
1195 messages.push(OwnedMessage {
1196 offset: record.offset,
1197 key: record.key.clone().unwrap_or_default(),
1198 value: record.value.clone().unwrap_or_default(),
1199 });
1200 }
1201
1202 Ok(OwnedData {
1203 highwatermark_offset: high_watermark,
1204 messages,
1205 })
1206}
1207
1208fn map_kafka_code(code: i16) -> Option<KafkaCode> {
1209 match code {
1210 0 => None,
1211 1 => Some(KafkaCode::OffsetOutOfRange),
1212 2 => Some(KafkaCode::CorruptMessage),
1213 3 => Some(KafkaCode::UnknownTopicOrPartition),
1214 4 => Some(KafkaCode::InvalidMessageSize),
1215 5 => Some(KafkaCode::LeaderNotAvailable),
1216 6 => Some(KafkaCode::NotLeaderForPartition),
1217 7 => Some(KafkaCode::RequestTimedOut),
1218 8 => Some(KafkaCode::BrokerNotAvailable),
1219 9 => Some(KafkaCode::ReplicaNotAvailable),
1220 10 => Some(KafkaCode::MessageSizeTooLarge),
1221 11 => Some(KafkaCode::StaleControllerEpoch),
1222 12 => Some(KafkaCode::OffsetMetadataTooLarge),
1223 13 => Some(KafkaCode::NetworkException),
1224 14 => Some(KafkaCode::GroupLoadInProgress),
1225 15 => Some(KafkaCode::GroupCoordinatorNotAvailable),
1226 16 => Some(KafkaCode::NotCoordinatorForGroup),
1227 17 => Some(KafkaCode::InvalidTopic),
1228 18 => Some(KafkaCode::RecordListTooLarge),
1229 19 => Some(KafkaCode::NotEnoughReplicas),
1230 20 => Some(KafkaCode::NotEnoughReplicasAfterAppend),
1231 21 => Some(KafkaCode::InvalidRequiredAcks),
1232 22 => Some(KafkaCode::IllegalGeneration),
1233 23 => Some(KafkaCode::InconsistentGroupProtocol),
1234 24 => Some(KafkaCode::InvalidGroupId),
1235 25 => Some(KafkaCode::UnknownMemberId),
1236 26 => Some(KafkaCode::InvalidSessionTimeout),
1237 27 => Some(KafkaCode::RebalanceInProgress),
1238 28 => Some(KafkaCode::InvalidCommitOffsetSize),
1239 29 => Some(KafkaCode::TopicAuthorizationFailed),
1240 30 => Some(KafkaCode::GroupAuthorizationFailed),
1241 31 => Some(KafkaCode::ClusterAuthorizationFailed),
1242 32 => Some(KafkaCode::InvalidTimestamp),
1243 33 => Some(KafkaCode::UnsupportedSaslMechanism),
1244 34 => Some(KafkaCode::IllegalSaslState),
1245 35 => Some(KafkaCode::UnsupportedVersion),
1246 _ => Some(KafkaCode::Unknown),
1247 }
1248}
1249
1250fn kafka_code_to_i16(code: KafkaCode) -> i16 {
1251 match code {
1252 KafkaCode::OffsetOutOfRange => 1,
1253 KafkaCode::CorruptMessage => 2,
1254 KafkaCode::UnknownTopicOrPartition => 3,
1255 KafkaCode::InvalidMessageSize => 4,
1256 KafkaCode::LeaderNotAvailable => 5,
1257 KafkaCode::NotLeaderForPartition => 6,
1258 KafkaCode::RequestTimedOut => 7,
1259 KafkaCode::BrokerNotAvailable => 8,
1260 KafkaCode::ReplicaNotAvailable => 9,
1261 KafkaCode::MessageSizeTooLarge => 10,
1262 KafkaCode::StaleControllerEpoch => 11,
1263 KafkaCode::OffsetMetadataTooLarge => 12,
1264 KafkaCode::NetworkException => 13,
1265 KafkaCode::GroupLoadInProgress => 14,
1266 KafkaCode::GroupCoordinatorNotAvailable => 15,
1267 KafkaCode::NotCoordinatorForGroup => 16,
1268 KafkaCode::InvalidTopic => 17,
1269 KafkaCode::RecordListTooLarge => 18,
1270 KafkaCode::NotEnoughReplicas => 19,
1271 KafkaCode::NotEnoughReplicasAfterAppend => 20,
1272 KafkaCode::InvalidRequiredAcks => 21,
1273 KafkaCode::IllegalGeneration => 22,
1274 KafkaCode::InconsistentGroupProtocol => 23,
1275 KafkaCode::InvalidGroupId => 24,
1276 KafkaCode::UnknownMemberId => 25,
1277 KafkaCode::InvalidSessionTimeout => 26,
1278 KafkaCode::RebalanceInProgress => 27,
1279 KafkaCode::InvalidCommitOffsetSize => 28,
1280 KafkaCode::TopicAuthorizationFailed => 29,
1281 KafkaCode::GroupAuthorizationFailed => 30,
1282 KafkaCode::ClusterAuthorizationFailed => 31,
1283 KafkaCode::InvalidTimestamp => 32,
1284 KafkaCode::UnsupportedSaslMechanism => 33,
1285 KafkaCode::IllegalSaslState => 34,
1286 KafkaCode::UnsupportedVersion => 35,
1287 KafkaCode::Unknown => -1,
1288 }
1289}
1290
1291fn usize_to_i32(value: usize) -> Result<i32> {
1292 i32::try_from(value).map_err(|_| Error::Protocol(ProtocolError::Codec))
1293}
1294
1295fn non_negative_i32_to_usize(value: i32) -> Result<usize> {
1296 usize::try_from(value).map_err(|_| Error::Protocol(ProtocolError::Codec))
1297}
1298
1299fn non_negative_i32_to_u64(value: i32) -> Result<u64> {
1300 u64::try_from(value).map_err(|_| Error::Protocol(ProtocolError::Codec))
1301}
1302
1303fn no_host_reachable_error() -> Error {
1304 Error::Connection(rustfs_kafka::error::ConnectionError::NoHostReachable)
1305}
1306
#[cfg(test)]
mod tests {
    use rustfs_kafka::error::{ConnectionError, Error};

    use super::*;

    /// Building a consumer from a host that cannot be reached reports `NoHostReachable`.
    #[tokio::test]
    async fn from_hosts_fails_with_unreachable_hosts() {
        let hosts = vec!["127.0.0.1:1".to_owned()];
        let topics = vec!["test-topic".to_owned()];

        let outcome = AsyncConsumer::from_hosts(hosts, "test-group".to_owned(), topics).await;

        assert!(matches!(
            outcome,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// NOTE(review): despite its name, this test asserts that `from_client`
    /// succeeds when the client was created with an empty host list.
    #[tokio::test]
    async fn from_client_fails_with_unreachable_hosts() {
        let client = AsyncKafkaClient::new(vec![]).await.unwrap();
        let topics = vec!["test-topic".to_owned()];

        let outcome = AsyncConsumer::from_client(client, "test-group".to_owned(), topics).await;

        assert!(outcome.is_ok());
    }

    /// NOTE(review): construction fails here, so only the failed-construction
    /// path is exercised; no successfully built consumer is dropped.
    #[tokio::test]
    async fn drop_consumer_without_close_does_not_panic() {
        let hosts = vec!["127.0.0.1:1".to_owned()];
        let topics = vec!["test-drop-topic".to_owned()];

        let outcome =
            AsyncConsumer::from_hosts(hosts, "test-drop-group".to_owned(), topics).await;

        assert!(outcome.is_err());
    }

    /// A builder with a topic but no group id is rejected with `UnsetGroupId`.
    #[tokio::test]
    async fn builder_without_group_returns_error() {
        let builder = AsyncConsumer::builder(vec![]).with_topic("t".to_owned());

        let outcome = builder.build().await;

        assert!(matches!(
            outcome,
            Err(Error::Consumer(ConsumerError::UnsetGroupId))
        ));
    }

    /// A builder with a group id but no topics is rejected with `NoTopicsAssigned`.
    #[tokio::test]
    async fn builder_without_topics_returns_error() {
        let builder = AsyncConsumer::builder(vec![]).with_group("g".to_owned());

        let outcome = builder.build().await;

        assert!(matches!(
            outcome,
            Err(Error::Consumer(ConsumerError::NoTopicsAssigned))
        ));
    }

    /// Recorded errors are counted per numeric Kafka code, the recent-error list
    /// is capped at the configured limit (2 here), and a reset clears everything.
    #[tokio::test]
    async fn native_observability_tracks_recent_snapshots_and_numeric_codes() {
        let client = AsyncKafkaClient::new(vec![]).await.unwrap();
        let mut native = NativeConsumer {
            client,
            group: "g".to_owned(),
            topics: vec!["t".to_owned()],
            fallback_offset: FetchOffset::Latest,
            offsets: HashMap::new(),
            dirty_offsets: HashMap::new(),
            leaders: HashMap::new(),
            coordinator: None,
            correlation: 1,
            retry_attempts: DEFAULT_NATIVE_RETRY_ATTEMPTS,
            retry_backoff: Duration::from_millis(DEFAULT_NATIVE_RETRY_BACKOFF_MS),
            observability: NativeConsumerObservability::new(2),
        };

        let leader_unavailable = Error::Kafka(KafkaCode::LeaderNotAvailable);
        let not_leader = Error::Kafka(KafkaCode::NotLeaderForPartition);
        let coordinator_unavailable = Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable);
        native.record_error("poll", &leader_unavailable);
        native.record_error("poll", &not_leader);
        native.record_error("commit", &coordinator_unavailable);

        let stats = native.error_stats();
        assert_eq!(stats.total_errors, 3);
        for numeric_code in ["5", "6", "15"] {
            assert_eq!(stats.kafka_code_counts.get(numeric_code).copied(), Some(1));
        }
        // Only the two most recent of the three errors are retained.
        assert_eq!(stats.recent_errors.len(), 2);
        assert_eq!(stats.recent_errors[0].phase, "poll");
        assert_eq!(stats.recent_errors[1].phase, "commit");

        native.reset_error_stats();
        let cleared = native.error_stats();
        assert_eq!(cleared.total_errors, 0);
        assert!(cleared.kafka_code_counts.is_empty());
        assert!(cleared.recent_errors.is_empty());
    }
}