Skip to main content

rustfs_kafka_async/
consumer.rs

1//! Async consumer for fetching messages from Kafka.
2
3use bytes::{Bytes, BytesMut};
4use kafka_protocol::messages::{
5    ApiKey, BrokerId, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
6    GroupId, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
7    OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse,
8    RequestHeader, ResponseHeader, TopicName, fetch_request::FetchPartition as KpFetchPartition,
9    fetch_request::FetchTopic as KpFetchTopic, list_offsets_request::ListOffsetsPartition,
10    list_offsets_request::ListOffsetsTopic, metadata_request::MetadataRequestTopic,
11    offset_commit_request::OffsetCommitRequestPartition,
12    offset_commit_request::OffsetCommitRequestTopic, offset_fetch_request::OffsetFetchRequestTopic,
13};
14use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes};
15use kafka_protocol::records::RecordBatchDecoder;
16use rustfs_kafka::client::SecurityConfig;
17use rustfs_kafka::consumer::{FetchOffset, MessageSets};
18use rustfs_kafka::error::{ConsumerError, Error, KafkaCode, ProtocolError, Result};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::time::Duration;
22use std::time::{SystemTime, UNIX_EPOCH};
23use tracing::debug;
24
25use crate::AsyncKafkaClient;
26use crate::connection::AsyncConnection;
27
/// Metadata API version used to discover partition leaders.
const API_VERSION_METADATA: i16 = 1;
/// Fetch API version used to consume records.
const API_VERSION_FETCH: i16 = 12;
/// FindCoordinator API version used to locate the group coordinator.
const API_VERSION_FIND_COORDINATOR: i16 = 3;
/// OffsetCommit API version used to commit consumed offsets.
const API_VERSION_OFFSET_COMMIT: i16 = 2;
/// OffsetFetch API version used to read the group's committed offsets.
const API_VERSION_OFFSET_FETCH: i16 = 2;
/// ListOffsets API version used to resolve the fallback start offset.
const API_VERSION_LIST_OFFSETS: i16 = 1;

/// Default number of attempts for native poll/commit operations.
const DEFAULT_NATIVE_RETRY_ATTEMPTS: usize = 3;
/// Default pause between native retry attempts, in milliseconds.
const DEFAULT_NATIVE_RETRY_BACKOFF_MS: u64 = 100;
/// Default number of recent error snapshots kept in memory.
const DEFAULT_NATIVE_RECENT_ERROR_LIMIT: usize = 32;

/// Value sent as the fetch request's `min_bytes`.
const FETCH_MIN_BYTES: i32 = 1;
/// Value sent as the fetch request's `max_wait_ms`.
const FETCH_MAX_WAIT_MS: i32 = 100;
/// Per-partition byte cap sent with every fetched partition (1 MiB).
const FETCH_PARTITION_MAX_BYTES: i32 = 1 << 20;
40
41struct NativeConsumer {
42    client: AsyncKafkaClient,
43    group: String,
44    topics: Vec<String>,
45    fallback_offset: FetchOffset,
46    offsets: HashMap<(String, i32), i64>,
47    dirty_offsets: HashMap<(String, i32), i64>,
48    leaders: HashMap<(String, i32), String>,
49    coordinator: Option<String>,
50    correlation: i32,
51    retry_attempts: usize,
52    retry_backoff: Duration,
53    observability: NativeConsumerObservability,
54}
55
56enum AsyncConsumerMode {
57    Native(Box<NativeConsumer>),
58}
59
60/// Native consumer error snapshot for diagnostics.
61#[derive(Debug, Clone)]
62pub struct NativeConsumerErrorSnapshot {
63    pub phase: String,
64    pub class: String,
65    pub kafka_code: Option<KafkaCode>,
66    pub message: String,
67    pub timestamp_unix_ms: u128,
68}
69
70/// Native consumer error statistics.
71#[derive(Debug, Clone)]
72pub struct NativeConsumerErrorStats {
73    pub total_errors: u64,
74    pub kafka_code_counts: HashMap<String, u64>,
75    pub class_counts: HashMap<String, u64>,
76    pub last_error: Option<NativeConsumerErrorSnapshot>,
77    pub recent_errors: Vec<NativeConsumerErrorSnapshot>,
78}
79
80#[derive(Debug, Clone)]
81struct NativeConsumerObservability {
82    total_errors: u64,
83    kafka_code_counts: HashMap<String, u64>,
84    class_counts: HashMap<String, u64>,
85    last_error: Option<NativeConsumerErrorSnapshot>,
86    recent_errors: VecDeque<NativeConsumerErrorSnapshot>,
87    recent_error_limit: usize,
88}
89
90impl Default for NativeConsumerObservability {
91    fn default() -> Self {
92        Self::new(DEFAULT_NATIVE_RECENT_ERROR_LIMIT)
93    }
94}
95
96impl NativeConsumerObservability {
97    fn new(recent_error_limit: usize) -> Self {
98        Self {
99            total_errors: 0,
100            kafka_code_counts: HashMap::new(),
101            class_counts: HashMap::new(),
102            last_error: None,
103            recent_errors: VecDeque::new(),
104            recent_error_limit: recent_error_limit.max(1),
105        }
106    }
107
108    fn clear(&mut self) {
109        self.total_errors = 0;
110        self.kafka_code_counts.clear();
111        self.class_counts.clear();
112        self.last_error = None;
113        self.recent_errors.clear();
114    }
115}
116
117/// An async Kafka consumer.
118pub struct AsyncConsumer {
119    mode: AsyncConsumerMode,
120}
121
122/// Builder for constructing an [`AsyncConsumer`] asynchronously.
123pub struct AsyncConsumerBuilder {
124    hosts: Vec<String>,
125    group: Option<String>,
126    topics: Vec<String>,
127    security: Option<SecurityConfig>,
128    channel_capacity: usize,
129    native_async: bool,
130    fallback_offset: FetchOffset,
131    native_retry_attempts: usize,
132    native_retry_backoff: Duration,
133    native_recent_error_limit: usize,
134}
135
136impl AsyncConsumerBuilder {
137    /// Creates a new async consumer builder from bootstrap hosts.
138    #[must_use]
139    pub fn new(hosts: Vec<String>) -> Self {
140        Self {
141            hosts,
142            group: None,
143            topics: Vec::new(),
144            security: None,
145            channel_capacity: 64,
146            native_async: true,
147            fallback_offset: FetchOffset::Latest,
148            native_retry_attempts: DEFAULT_NATIVE_RETRY_ATTEMPTS,
149            native_retry_backoff: Duration::from_millis(DEFAULT_NATIVE_RETRY_BACKOFF_MS),
150            native_recent_error_limit: DEFAULT_NATIVE_RECENT_ERROR_LIMIT,
151        }
152    }
153
154    /// Sets the consumer group.
155    #[must_use]
156    pub fn with_group(mut self, group: String) -> Self {
157        self.group = Some(group);
158        self
159    }
160
161    /// Adds a topic subscription.
162    #[must_use]
163    pub fn with_topic(mut self, topic: String) -> Self {
164        self.topics.push(topic);
165        self
166    }
167
168    /// Adds multiple topic subscriptions.
169    #[must_use]
170    pub fn with_topics(mut self, topics: Vec<String>) -> Self {
171        self.topics.extend(topics);
172        self
173    }
174
175    /// Sets optional TLS security configuration for broker connections.
176    #[must_use]
177    pub fn with_security(mut self, security: SecurityConfig) -> Self {
178        self.security = Some(security);
179        self
180    }
181
182    /// Backward-compatible no-op kept for API compatibility.
183    #[must_use]
184    pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self {
185        self.channel_capacity = channel_capacity.max(1);
186        self
187    }
188
189    /// Backward-compatible setting kept for API compatibility.
190    #[must_use]
191    pub fn with_native_async(mut self, native_async: bool) -> Self {
192        self.native_async = native_async;
193        self
194    }
195
196    /// Sets fallback offset used when there is no committed group offset.
197    #[must_use]
198    pub fn with_fallback_offset(mut self, fallback_offset: FetchOffset) -> Self {
199        self.fallback_offset = fallback_offset;
200        self
201    }
202
203    /// Sets retry attempts for native async poll/commit recoverable errors.
204    #[must_use]
205    pub fn with_native_retry_attempts(mut self, attempts: usize) -> Self {
206        self.native_retry_attempts = attempts.max(1);
207        self
208    }
209
210    /// Sets retry backoff for native async poll/commit recoverable errors.
211    #[must_use]
212    pub fn with_native_retry_backoff(mut self, backoff: Duration) -> Self {
213        self.native_retry_backoff = backoff;
214        self
215    }
216
217    /// Sets the max number of native recent error snapshots retained in memory.
218    #[must_use]
219    pub fn with_native_recent_error_limit(mut self, limit: usize) -> Self {
220        self.native_recent_error_limit = limit.max(1);
221        self
222    }
223
224    /// Builds an async consumer.
225    pub async fn build(self) -> Result<AsyncConsumer> {
226        let AsyncConsumerBuilder {
227            hosts,
228            group,
229            topics,
230            security,
231            channel_capacity,
232            native_async,
233            fallback_offset,
234            native_retry_attempts,
235            native_retry_backoff,
236            native_recent_error_limit,
237        } = self;
238
239        let group = group.ok_or(Error::Consumer(ConsumerError::UnsetGroupId))?;
240        if topics.is_empty() {
241            return Err(Error::Consumer(ConsumerError::NoTopicsAssigned));
242        }
243
244        if !native_async {
245            debug!(
246                "AsyncConsumerBuilder::with_native_async(false) is ignored: consumer always uses native async I/O"
247            );
248        }
249        let _ = channel_capacity;
250        let client = AsyncKafkaClient::with_client_id_and_security(
251            hosts,
252            "rustfs-kafka-async".to_owned(),
253            security,
254        )
255        .await?;
256
257        Ok(AsyncConsumer {
258            mode: AsyncConsumerMode::Native(Box::new(NativeConsumer {
259                client,
260                group,
261                topics,
262                fallback_offset,
263                offsets: HashMap::new(),
264                dirty_offsets: HashMap::new(),
265                leaders: HashMap::new(),
266                coordinator: None,
267                correlation: 1,
268                retry_attempts: native_retry_attempts,
269                retry_backoff: native_retry_backoff,
270                observability: NativeConsumerObservability::new(native_recent_error_limit),
271            })),
272        })
273    }
274}
275
276impl AsyncConsumer {
277    /// Starts building a new async consumer from bootstrap hosts.
278    #[must_use]
279    pub fn builder(hosts: Vec<String>) -> AsyncConsumerBuilder {
280        AsyncConsumerBuilder::new(hosts)
281    }
282
283    /// Creates a new async consumer from bootstrap hosts.
284    pub async fn from_hosts(
285        hosts: Vec<String>,
286        group: String,
287        topics: Vec<String>,
288    ) -> Result<Self> {
289        Self::builder(hosts)
290            .with_group(group)
291            .with_topics(topics)
292            .build()
293            .await
294    }
295
296    /// Creates a new async consumer from an [`AsyncKafkaClient`].
297    pub async fn from_client(
298        client: AsyncKafkaClient,
299        group: String,
300        topics: Vec<String>,
301    ) -> Result<Self> {
302        if group.is_empty() {
303            return Err(Error::Consumer(ConsumerError::UnsetGroupId));
304        }
305        if topics.is_empty() {
306            return Err(Error::Consumer(ConsumerError::NoTopicsAssigned));
307        }
308        Ok(Self {
309            mode: AsyncConsumerMode::Native(Box::new(NativeConsumer {
310                client,
311                group,
312                topics,
313                fallback_offset: FetchOffset::Latest,
314                offsets: HashMap::new(),
315                dirty_offsets: HashMap::new(),
316                leaders: HashMap::new(),
317                coordinator: None,
318                correlation: 1,
319                retry_attempts: DEFAULT_NATIVE_RETRY_ATTEMPTS,
320                retry_backoff: Duration::from_millis(DEFAULT_NATIVE_RETRY_BACKOFF_MS),
321                observability: NativeConsumerObservability::default(),
322            })),
323        })
324    }
325
326    /// Polls for new messages and returns fetched message sets.
327    pub async fn poll(&mut self) -> Result<MessageSets> {
328        match &mut self.mode {
329            AsyncConsumerMode::Native(native) => native.poll().await,
330        }
331    }
332
333    /// Commits the current consumed offsets.
334    pub async fn commit(&mut self) -> Result<()> {
335        match &mut self.mode {
336            AsyncConsumerMode::Native(native) => native.commit().await,
337        }
338    }
339
340    /// Gracefully closes the consumer.
341    pub async fn close(self) -> Result<()> {
342        Ok(())
343    }
344
345    /// Returns native consumer error statistics when running in native mode.
346    #[must_use]
347    pub fn native_error_stats(&self) -> Option<NativeConsumerErrorStats> {
348        match &self.mode {
349            AsyncConsumerMode::Native(native) => Some(native.error_stats()),
350        }
351    }
352
353    /// Resets native consumer error statistics.
354    ///
355    /// Returns `true` when reset was performed (native mode), otherwise `false`.
356    pub fn reset_native_error_stats(&mut self) -> bool {
357        match &mut self.mode {
358            AsyncConsumerMode::Native(native) => {
359                native.reset_error_stats();
360                true
361            }
362        }
363    }
364}
365
366impl NativeConsumer {
367    async fn poll(&mut self) -> Result<MessageSets> {
368        for attempt in 1..=self.retry_attempts {
369            match self.poll_once().await {
370                Ok(data) => return Ok(data),
371                Err(err) if attempt < self.retry_attempts && should_retry_poll(&err) => {
372                    self.record_error("poll", &err);
373                    self.leaders.clear();
374                    self.refresh_metadata().await?;
375                    tokio::time::sleep(self.retry_backoff).await;
376                    continue;
377                }
378                Err(err) => {
379                    self.record_error("poll", &err);
380                    return Err(err);
381                }
382            }
383        }
384        Err(Error::Kafka(KafkaCode::Unknown))
385    }
386
387    async fn poll_once(&mut self) -> Result<MessageSets> {
388        self.client.ensure_connected().await?;
389        if self.leaders.is_empty() {
390            self.refresh_metadata().await?;
391        }
392        self.ensure_start_offsets().await?;
393
394        let mut by_broker: HashMap<String, Vec<(String, i32, i64)>> = HashMap::new();
395        for (tp, leader_host) in &self.leaders {
396            let offset = *self.offsets.get(tp).unwrap_or(&0);
397            by_broker
398                .entry(leader_host.clone())
399                .or_default()
400                .push((tp.0.clone(), tp.1, offset));
401        }
402
403        let correlation = self.next_correlation();
404        let client_id = self.client.client_id().to_owned();
405        let mut owned_responses = Vec::new();
406
407        for (broker, tps) in by_broker {
408            let parts: Vec<(&str, i32, i64, i32)> = tps
409                .iter()
410                .map(|(topic, partition, offset)| {
411                    (
412                        topic.as_str(),
413                        *partition,
414                        *offset,
415                        FETCH_PARTITION_MAX_BYTES,
416                    )
417                })
418                .collect();
419
420            let conn = self.client.get_connection(&broker).await?;
421            let (header, request) = build_fetch_request(correlation, &client_id, &parts);
422            send_kp_request(conn, &header, &request, API_VERSION_FETCH).await?;
423            let response = get_fetch_response(conn, API_VERSION_FETCH).await?;
424            let owned = convert_fetch_response(response, correlation);
425            if let Some(code) = first_fetch_error_code(&owned) {
426                return Err(Error::Kafka(code));
427            }
428
429            self.advance_offsets(&owned);
430            owned_responses.push(owned);
431        }
432
433        Ok(MessageSets::from_fetch_responses(owned_responses))
434    }
435
436    fn next_correlation(&mut self) -> i32 {
437        let cid = self.correlation;
438        self.correlation = self.correlation.wrapping_add(1);
439        cid
440    }
441
442    fn advance_offsets(&mut self, resp: &rustfs_kafka::client::fetch_kp::OwnedFetchResponse) {
443        for topic in &resp.topics {
444            for partition in &topic.partitions {
445                if let Ok(data) = partition.data()
446                    && let Some(last) = data.messages.last()
447                {
448                    let next_offset = last.offset + 1;
449                    let tp = (topic.topic.clone(), partition.partition);
450                    self.offsets.insert(tp.clone(), next_offset);
451                    self.dirty_offsets.insert(tp, next_offset);
452                }
453            }
454        }
455    }
456
457    async fn commit(&mut self) -> Result<()> {
458        for attempt in 1..=self.retry_attempts {
459            match self.commit_once().await {
460                Ok(()) => return Ok(()),
461                Err(err) if attempt < self.retry_attempts && should_retry_commit(&err) => {
462                    self.record_error("commit", &err);
463                    self.coordinator = None;
464                    self.refresh_coordinator().await?;
465                    tokio::time::sleep(self.retry_backoff).await;
466                    continue;
467                }
468                Err(err) => {
469                    self.record_error("commit", &err);
470                    return Err(err);
471                }
472            }
473        }
474        Err(Error::Kafka(KafkaCode::Unknown))
475    }
476
477    async fn commit_once(&mut self) -> Result<()> {
478        if self.dirty_offsets.is_empty() {
479            return Ok(());
480        }
481
482        self.client.ensure_connected().await?;
483        if self.coordinator.is_none() {
484            self.refresh_coordinator().await?;
485        }
486        let Some(coordinator) = self.coordinator.clone() else {
487            return Err(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable));
488        };
489
490        let client_id = self.client.client_id().to_owned();
491        let correlation = self.next_correlation();
492        let payload: Vec<(&str, i32, i64)> = self
493            .dirty_offsets
494            .iter()
495            .map(|((topic, partition), offset)| (topic.as_str(), *partition, *offset))
496            .collect();
497
498        let conn = self.client.get_connection(&coordinator).await?;
499        let (header, request) =
500            build_offset_commit_request(correlation, &client_id, &self.group, &payload);
501        send_kp_request(conn, &header, &request, API_VERSION_OFFSET_COMMIT).await?;
502        let response =
503            get_kp_response::<OffsetCommitResponse>(conn, API_VERSION_OFFSET_COMMIT).await?;
504
505        for topic in response.topics {
506            for partition in topic.partitions {
507                if partition.error_code != 0 {
508                    if let Some(code) = map_kafka_code(partition.error_code) {
509                        return Err(Error::Kafka(code));
510                    }
511                    return Err(Error::Kafka(KafkaCode::Unknown));
512                }
513            }
514        }
515
516        self.dirty_offsets.clear();
517        Ok(())
518    }
519
520    async fn refresh_metadata(&mut self) -> Result<()> {
521        let request_host = if let Some(connected) = self.client.connected_hosts().first() {
522            (*connected).to_owned()
523        } else {
524            self.client
525                .bootstrap_hosts()
526                .first()
527                .cloned()
528                .ok_or_else(no_host_reachable_error)?
529        };
530
531        let correlation = self.next_correlation();
532        let client_id = self.client.client_id().to_owned();
533        let conn = self.client.get_connection(&request_host).await?;
534        let (header, request) = build_metadata_request(correlation, &client_id, &self.topics);
535        send_kp_request(conn, &header, &request, API_VERSION_METADATA).await?;
536        let response = get_kp_response::<MetadataResponse>(conn, API_VERSION_METADATA).await?;
537
538        let mut brokers: HashMap<i32, String> = HashMap::new();
539        for broker in response.brokers {
540            brokers.insert(
541                i32::from(broker.node_id),
542                format!("{}:{}", broker.host, broker.port),
543            );
544        }
545
546        self.leaders.clear();
547        for topic in response.topics {
548            let Some(topic_name) = topic.name else {
549                continue;
550            };
551            for partition in topic.partitions {
552                let leader = i32::from(partition.leader_id);
553                if leader < 0 {
554                    continue;
555                }
556                if let Some(host) = brokers.get(&leader) {
557                    let tp = (topic_name.to_string(), partition.partition_index);
558                    self.leaders.insert(tp.clone(), host.clone());
559                    self.offsets.entry(tp).or_insert(0);
560                }
561            }
562        }
563
564        if self.leaders.is_empty() {
565            return Err(Error::Kafka(KafkaCode::LeaderNotAvailable));
566        }
567
568        Ok(())
569    }
570
571    async fn refresh_coordinator(&mut self) -> Result<()> {
572        let request_host = if let Some(connected) = self.client.connected_hosts().first() {
573            (*connected).to_owned()
574        } else {
575            self.client
576                .bootstrap_hosts()
577                .first()
578                .cloned()
579                .ok_or_else(no_host_reachable_error)?
580        };
581
582        let correlation = self.next_correlation();
583        let client_id = self.client.client_id().to_owned();
584        let conn = self.client.get_connection(&request_host).await?;
585        let (header, request) =
586            build_find_coordinator_request(correlation, &client_id, &self.group);
587        send_kp_request(conn, &header, &request, API_VERSION_FIND_COORDINATOR).await?;
588        let response =
589            get_kp_response::<FindCoordinatorResponse>(conn, API_VERSION_FIND_COORDINATOR).await?;
590
591        let (error_code, host, port) = if let Some(c) = response.coordinators.first() {
592            (c.error_code, c.host.to_string(), c.port)
593        } else {
594            (
595                response.error_code,
596                response.host.to_string(),
597                response.port,
598            )
599        };
600
601        if error_code != 0 {
602            if let Some(code) = map_kafka_code(error_code) {
603                return Err(Error::Kafka(code));
604            }
605            return Err(Error::Kafka(KafkaCode::Unknown));
606        }
607
608        self.coordinator = Some(format!("{host}:{port}"));
609        Ok(())
610    }
611
612    async fn ensure_start_offsets(&mut self) -> Result<()> {
613        let missing: Vec<(String, i32)> = self
614            .leaders
615            .keys()
616            .filter(|tp| !self.offsets.contains_key(*tp))
617            .cloned()
618            .collect();
619        if missing.is_empty() {
620            return Ok(());
621        }
622
623        self.client.ensure_connected().await?;
624        if self.coordinator.is_none() {
625            self.refresh_coordinator().await?;
626        }
627
628        let committed = self.fetch_committed_offsets(&missing).await?;
629        for tp in missing {
630            if let Some(offset) = committed.get(&tp)
631                && *offset >= 0
632            {
633                self.offsets.insert(tp.clone(), *offset);
634                continue;
635            }
636
637            let fallback = self.resolve_fallback_offset(&tp).await?;
638            self.offsets.insert(tp, fallback);
639        }
640
641        Ok(())
642    }
643
644    async fn fetch_committed_offsets(
645        &mut self,
646        partitions: &[(String, i32)],
647    ) -> Result<HashMap<(String, i32), i64>> {
648        let Some(coordinator) = self.coordinator.clone() else {
649            return Err(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable));
650        };
651
652        let client_id = self.client.client_id().to_owned();
653        let correlation = self.next_correlation();
654        let req_parts: Vec<(&str, i32)> = partitions
655            .iter()
656            .map(|(topic, partition)| (topic.as_str(), *partition))
657            .collect();
658
659        let conn = self.client.get_connection(&coordinator).await?;
660        let (header, request) =
661            build_offset_fetch_request(correlation, &client_id, &self.group, &req_parts);
662        send_kp_request(conn, &header, &request, API_VERSION_OFFSET_FETCH).await?;
663        let response =
664            get_kp_response::<OffsetFetchResponse>(conn, API_VERSION_OFFSET_FETCH).await?;
665
666        let mut committed = HashMap::new();
667        for topic in response.topics {
668            for partition in topic.partitions {
669                if partition.error_code != 0 {
670                    if let Some(code) = map_kafka_code(partition.error_code) {
671                        return Err(Error::Kafka(code));
672                    }
673                    return Err(Error::Kafka(KafkaCode::Unknown));
674                }
675                committed.insert(
676                    (topic.name.to_string(), partition.partition_index),
677                    partition.committed_offset,
678                );
679            }
680        }
681        Ok(committed)
682    }
683
684    async fn resolve_fallback_offset(&mut self, tp: &(String, i32)) -> Result<i64> {
685        let Some(leader) = self.leaders.get(tp).cloned() else {
686            return Err(Error::Kafka(KafkaCode::LeaderNotAvailable));
687        };
688
689        let timestamp = match self.fallback_offset {
690            FetchOffset::Earliest => -2,
691            FetchOffset::Latest => -1,
692            FetchOffset::ByTime(t) => t,
693        };
694
695        let correlation = self.next_correlation();
696        let client_id = self.client.client_id().to_owned();
697        let conn = self.client.get_connection(&leader).await?;
698        let (header, request) = build_list_offsets_request(
699            correlation,
700            &client_id,
701            &[(tp.0.as_str(), tp.1, timestamp)],
702        );
703        send_kp_request(conn, &header, &request, API_VERSION_LIST_OFFSETS).await?;
704        let response =
705            get_kp_response::<ListOffsetsResponse>(conn, API_VERSION_LIST_OFFSETS).await?;
706
707        for topic in response.topics {
708            if topic.name.as_str() != tp.0.as_str() {
709                continue;
710            }
711            for partition in topic.partitions {
712                if partition.partition_index != tp.1 {
713                    continue;
714                }
715                if partition.error_code != 0 {
716                    if let Some(code) = map_kafka_code(partition.error_code) {
717                        return Err(Error::Kafka(code));
718                    }
719                    return Err(Error::Kafka(KafkaCode::Unknown));
720                }
721                return Ok(partition.offset);
722            }
723        }
724
725        Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
726    }
727
728    fn error_stats(&self) -> NativeConsumerErrorStats {
729        NativeConsumerErrorStats {
730            total_errors: self.observability.total_errors,
731            kafka_code_counts: self.observability.kafka_code_counts.clone(),
732            class_counts: self.observability.class_counts.clone(),
733            last_error: self.observability.last_error.clone(),
734            recent_errors: self.observability.recent_errors.iter().cloned().collect(),
735        }
736    }
737
738    fn reset_error_stats(&mut self) {
739        self.observability.clear();
740    }
741
742    fn record_error(&mut self, phase: &str, err: &Error) {
743        self.observability.total_errors = self.observability.total_errors.saturating_add(1);
744        let class = error_class(err);
745        let kafka_code = kafka_code_from_error(err).map(kafka_code_to_i16);
746        let kafka_code_label = kafka_code.map(|code| code.to_string());
747
748        *self
749            .observability
750            .class_counts
751            .entry(class.clone())
752            .or_insert(0) += 1;
753        if let Some(code) = &kafka_code_label {
754            *self
755                .observability
756                .kafka_code_counts
757                .entry(code.clone())
758                .or_insert(0) += 1;
759        }
760        let snapshot = NativeConsumerErrorSnapshot {
761            phase: phase.to_owned(),
762            class,
763            kafka_code: kafka_code.and_then(map_kafka_code),
764            message: err.to_string(),
765            timestamp_unix_ms: now_unix_ms(),
766        };
767        self.observability.last_error = Some(snapshot.clone());
768        self.observability.recent_errors.push_back(snapshot.clone());
769        while self.observability.recent_errors.len() > self.observability.recent_error_limit {
770            let _ = self.observability.recent_errors.pop_front();
771        }
772        crate::metrics::record_native_consumer_error(
773            phase,
774            &snapshot.class,
775            kafka_code_label.as_deref(),
776            self.observability.recent_errors.len(),
777            snapshot.timestamp_unix_ms,
778        );
779    }
780}
781
782fn build_metadata_request(
783    correlation_id: i32,
784    client_id: &str,
785    topics: &[String],
786) -> (RequestHeader, MetadataRequest) {
787    let header = RequestHeader::default()
788        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
789        .with_request_api_key(ApiKey::Metadata as i16)
790        .with_request_api_version(API_VERSION_METADATA)
791        .with_correlation_id(correlation_id);
792
793    let request_topics: Vec<MetadataRequestTopic> = topics
794        .iter()
795        .map(|topic| {
796            MetadataRequestTopic::default()
797                .with_name(Some(TopicName::from(StrBytes::from_string(topic.clone()))))
798        })
799        .collect();
800
801    let request = MetadataRequest::default().with_topics(Some(request_topics));
802    (header, request)
803}
804
805fn build_fetch_request(
806    correlation_id: i32,
807    client_id: &str,
808    partitions: &[(&str, i32, i64, i32)],
809) -> (RequestHeader, FetchRequest) {
810    let header = RequestHeader::default()
811        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
812        .with_request_api_key(ApiKey::Fetch as i16)
813        .with_request_api_version(API_VERSION_FETCH)
814        .with_correlation_id(correlation_id);
815
816    let mut topic_map: HashMap<&str, Vec<KpFetchPartition>> = HashMap::new();
817    for (topic, partition, offset, partition_max_bytes) in partitions {
818        topic_map.entry(topic).or_default().push(
819            KpFetchPartition::default()
820                .with_partition(*partition)
821                .with_fetch_offset(*offset)
822                .with_partition_max_bytes(*partition_max_bytes),
823        );
824    }
825
826    let topics: Vec<KpFetchTopic> = topic_map
827        .into_iter()
828        .map(|(topic_name, fetch_partitions)| {
829            KpFetchTopic::default()
830                .with_topic(TopicName::from(StrBytes::from_string(
831                    topic_name.to_string(),
832                )))
833                .with_partitions(fetch_partitions)
834        })
835        .collect();
836
837    let request = FetchRequest::default()
838        .with_replica_id(kafka_protocol::messages::BrokerId::from(-1))
839        .with_max_wait_ms(FETCH_MAX_WAIT_MS)
840        .with_min_bytes(FETCH_MIN_BYTES)
841        .with_max_bytes(i32::MAX)
842        .with_isolation_level(0)
843        .with_topics(topics);
844
845    (header, request)
846}
847
848fn build_find_coordinator_request(
849    correlation_id: i32,
850    client_id: &str,
851    group_id: &str,
852) -> (RequestHeader, FindCoordinatorRequest) {
853    let header = RequestHeader::default()
854        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
855        .with_request_api_key(ApiKey::FindCoordinator as i16)
856        .with_request_api_version(API_VERSION_FIND_COORDINATOR)
857        .with_correlation_id(correlation_id);
858
859    let request = FindCoordinatorRequest::default()
860        .with_key(StrBytes::from_string(group_id.to_owned()))
861        .with_key_type(0);
862
863    (header, request)
864}
865
866fn build_offset_commit_request(
867    correlation_id: i32,
868    client_id: &str,
869    group_id: &str,
870    offsets: &[(&str, i32, i64)],
871) -> (RequestHeader, OffsetCommitRequest) {
872    let header = RequestHeader::default()
873        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
874        .with_request_api_key(ApiKey::OffsetCommit as i16)
875        .with_request_api_version(API_VERSION_OFFSET_COMMIT)
876        .with_correlation_id(correlation_id);
877
878    let mut topic_map: HashMap<&str, Vec<OffsetCommitRequestPartition>> = HashMap::new();
879    for (topic, partition, offset) in offsets {
880        topic_map.entry(topic).or_default().push(
881            OffsetCommitRequestPartition::default()
882                .with_partition_index(*partition)
883                .with_committed_offset(*offset)
884                .with_committed_metadata(None),
885        );
886    }
887
888    let topics: Vec<OffsetCommitRequestTopic> = topic_map
889        .into_iter()
890        .map(|(name, partitions)| {
891            OffsetCommitRequestTopic::default()
892                .with_name(TopicName::from(StrBytes::from_string(name.to_string())))
893                .with_partitions(partitions)
894        })
895        .collect();
896
897    let request = OffsetCommitRequest::default()
898        .with_group_id(GroupId::from(StrBytes::from_string(group_id.to_owned())))
899        .with_generation_id_or_member_epoch(-1)
900        .with_member_id(StrBytes::from_string(String::new()))
901        .with_retention_time_ms(-1)
902        .with_topics(topics);
903
904    (header, request)
905}
906
907fn build_offset_fetch_request(
908    correlation_id: i32,
909    client_id: &str,
910    group_id: &str,
911    partitions: &[(&str, i32)],
912) -> (RequestHeader, OffsetFetchRequest) {
913    let header = RequestHeader::default()
914        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
915        .with_request_api_key(ApiKey::OffsetFetch as i16)
916        .with_request_api_version(API_VERSION_OFFSET_FETCH)
917        .with_correlation_id(correlation_id);
918
919    let mut topic_map: HashMap<&str, Vec<i32>> = HashMap::new();
920    for (topic, partition) in partitions {
921        topic_map.entry(topic).or_default().push(*partition);
922    }
923
924    let topics: Vec<OffsetFetchRequestTopic> = topic_map
925        .into_iter()
926        .map(|(topic, partition_indexes)| {
927            OffsetFetchRequestTopic::default()
928                .with_name(TopicName::from(StrBytes::from_string(topic.to_owned())))
929                .with_partition_indexes(partition_indexes)
930        })
931        .collect();
932
933    let request = OffsetFetchRequest::default()
934        .with_group_id(GroupId::from(StrBytes::from_string(group_id.to_owned())))
935        .with_topics(Some(topics));
936    (header, request)
937}
938
939fn build_list_offsets_request(
940    correlation_id: i32,
941    client_id: &str,
942    partitions: &[(&str, i32, i64)],
943) -> (RequestHeader, ListOffsetsRequest) {
944    let header = RequestHeader::default()
945        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
946        .with_request_api_key(ApiKey::ListOffsets as i16)
947        .with_request_api_version(API_VERSION_LIST_OFFSETS)
948        .with_correlation_id(correlation_id);
949
950    let mut topic_map: HashMap<&str, Vec<ListOffsetsPartition>> = HashMap::new();
951    for (topic, partition, timestamp) in partitions {
952        topic_map.entry(topic).or_default().push(
953            ListOffsetsPartition::default()
954                .with_partition_index(*partition)
955                .with_timestamp(*timestamp),
956        );
957    }
958
959    let topics: Vec<ListOffsetsTopic> = topic_map
960        .into_iter()
961        .map(|(topic, parts)| {
962            ListOffsetsTopic::default()
963                .with_name(TopicName::from(StrBytes::from_string(topic.to_owned())))
964                .with_partitions(parts)
965        })
966        .collect();
967
968    let request = ListOffsetsRequest::default()
969        .with_replica_id(BrokerId::from(-1))
970        .with_isolation_level(0)
971        .with_topics(topics);
972    (header, request)
973}
974
975async fn send_kp_request<T>(
976    conn: &mut AsyncConnection,
977    header: &RequestHeader,
978    body: &T,
979    api_version: i16,
980) -> Result<()>
981where
982    T: Encodable + kafka_protocol::protocol::HeaderVersion,
983{
984    let header_version = T::header_version(api_version);
985
986    let mut header_buf = BytesMut::new();
987    header
988        .encode(&mut header_buf, header_version)
989        .map_err(|_| Error::Protocol(ProtocolError::Codec))?;
990
991    let mut body_buf = BytesMut::new();
992    body.encode(&mut body_buf, api_version)
993        .map_err(|_| Error::Protocol(ProtocolError::Codec))?;
994
995    let total_len = usize_to_i32(header_buf.len() + body_buf.len())?;
996    let mut out = BytesMut::with_capacity(4 + non_negative_i32_to_usize(total_len)?);
997    out.extend_from_slice(&total_len.to_be_bytes());
998    out.extend_from_slice(&header_buf);
999    out.extend_from_slice(&body_buf);
1000
1001    conn.send(&out).await
1002}
1003
1004async fn get_kp_response<R>(conn: &mut AsyncConnection, api_version: i16) -> Result<R>
1005where
1006    R: Decodable + kafka_protocol::protocol::HeaderVersion,
1007{
1008    let size_bytes = conn.read_exact(4).await?;
1009    let size = i32::from_be_bytes(
1010        <[u8; 4]>::try_from(size_bytes.as_ref())
1011            .map_err(|_| Error::Protocol(ProtocolError::Codec))?,
1012    );
1013    let mut bytes = conn.read_exact(non_negative_i32_to_u64(size)?).await?;
1014
1015    let response_header_version = R::header_version(api_version);
1016    let _resp_header = ResponseHeader::decode(&mut bytes, response_header_version)
1017        .map_err(|_| Error::Protocol(ProtocolError::Codec))?;
1018
1019    R::decode(&mut bytes, api_version).map_err(|_| Error::Protocol(ProtocolError::Codec))
1020}
1021
1022async fn get_fetch_response(
1023    conn: &mut AsyncConnection,
1024    requested_version: i16,
1025) -> Result<FetchResponse> {
1026    let size_bytes = conn.read_exact(4).await?;
1027    let size = i32::from_be_bytes(
1028        <[u8; 4]>::try_from(size_bytes.as_ref())
1029            .map_err(|_| Error::Protocol(ProtocolError::Codec))?,
1030    );
1031    let resp_bytes = conn.read_exact(non_negative_i32_to_u64(size)?).await?;
1032
1033    let mut candidates = Vec::with_capacity(1 + 18);
1034    candidates.push(requested_version);
1035    for v in (0..=17).rev() {
1036        if v != requested_version {
1037            candidates.push(v);
1038        }
1039    }
1040
1041    for version in candidates {
1042        let mut bytes = resp_bytes.clone();
1043        let header_version = FetchResponse::header_version(version);
1044        if ResponseHeader::decode(&mut bytes, header_version).is_err() {
1045            continue;
1046        }
1047        if let Ok(resp) = FetchResponse::decode(&mut bytes, version) {
1048            return Ok(resp);
1049        }
1050    }
1051
1052    Err(Error::Protocol(ProtocolError::Codec))
1053}
1054
1055fn convert_fetch_response(
1056    kp_resp: FetchResponse,
1057    correlation_id: i32,
1058) -> rustfs_kafka::client::fetch_kp::OwnedFetchResponse {
1059    use rustfs_kafka::client::fetch_kp::{OwnedFetchResponse, OwnedPartition, OwnedTopic};
1060
1061    let topics = kp_resp
1062        .responses
1063        .into_iter()
1064        .map(|t| {
1065            let topic_name = t.topic.to_string();
1066            let partitions: Vec<OwnedPartition> = t
1067                .partitions
1068                .into_iter()
1069                .map(|p| {
1070                    let data = if p.error_code != 0 {
1071                        Err(Arc::new(Error::TopicPartitionError {
1072                            topic_name: topic_name.clone(),
1073                            partition_id: p.partition_index,
1074                            error_code: map_kafka_code(p.error_code).unwrap_or(KafkaCode::Unknown),
1075                        }))
1076                    } else {
1077                        decode_partition_records(p.records, p.high_watermark)
1078                    };
1079                    OwnedPartition {
1080                        partition: p.partition_index,
1081                        data,
1082                        highwatermark: p.high_watermark,
1083                    }
1084                })
1085                .collect();
1086            OwnedTopic {
1087                topic: topic_name,
1088                partitions,
1089            }
1090        })
1091        .collect();
1092
1093    OwnedFetchResponse {
1094        correlation_id,
1095        topics,
1096    }
1097}
1098
1099fn first_fetch_error_code(
1100    resp: &rustfs_kafka::client::fetch_kp::OwnedFetchResponse,
1101) -> Option<KafkaCode> {
1102    for topic in &resp.topics {
1103        for partition in &topic.partitions {
1104            if let Err(err) = partition.data()
1105                && let Error::TopicPartitionError { error_code, .. } = &**err
1106            {
1107                return Some(*error_code);
1108            }
1109        }
1110    }
1111    None
1112}
1113
1114fn should_retry_poll(err: &Error) -> bool {
1115    match err {
1116        Error::Kafka(code) => matches!(
1117            code,
1118            KafkaCode::LeaderNotAvailable
1119                | KafkaCode::NotLeaderForPartition
1120                | KafkaCode::RequestTimedOut
1121                | KafkaCode::NetworkException
1122        ),
1123        Error::Connection(_) => true,
1124        _ => false,
1125    }
1126}
1127
1128fn should_retry_commit(err: &Error) -> bool {
1129    match err {
1130        Error::Kafka(code) => matches!(
1131            code,
1132            KafkaCode::GroupCoordinatorNotAvailable
1133                | KafkaCode::NotCoordinatorForGroup
1134                | KafkaCode::GroupLoadInProgress
1135                | KafkaCode::RequestTimedOut
1136                | KafkaCode::NetworkException
1137        ),
1138        Error::Connection(_) => true,
1139        _ => false,
1140    }
1141}
1142
1143fn kafka_code_from_error(err: &Error) -> Option<KafkaCode> {
1144    match err {
1145        Error::Kafka(code) => Some(*code),
1146        Error::TopicPartitionError { error_code, .. } => Some(*error_code),
1147        Error::BrokerRequestError { source, .. } => kafka_code_from_error(source),
1148        _ => None,
1149    }
1150}
1151
1152fn error_class(err: &Error) -> String {
1153    match err {
1154        Error::Kafka(_) => "kafka".to_owned(),
1155        Error::Connection(_) => "connection".to_owned(),
1156        Error::Protocol(_) => "protocol".to_owned(),
1157        Error::Config(_) => "config".to_owned(),
1158        Error::Consumer(_) => "consumer".to_owned(),
1159        Error::TopicPartitionError { .. } => "topic_partition".to_owned(),
1160        Error::BrokerRequestError { .. } => "broker_request".to_owned(),
1161    }
1162}
1163
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
1169
1170fn decode_partition_records(
1171    records: Option<Bytes>,
1172    high_watermark: i64,
1173) -> std::result::Result<rustfs_kafka::client::fetch_kp::OwnedData, Arc<Error>> {
1174    use rustfs_kafka::client::fetch_kp::{OwnedData, OwnedMessage};
1175
1176    let Some(mut records_bytes) = records else {
1177        return Ok(OwnedData {
1178            highwatermark_offset: high_watermark,
1179            messages: vec![],
1180        });
1181    };
1182    if records_bytes.is_empty() {
1183        return Ok(OwnedData {
1184            highwatermark_offset: high_watermark,
1185            messages: vec![],
1186        });
1187    }
1188
1189    let Ok(record_set) = RecordBatchDecoder::decode(&mut records_bytes) else {
1190        return Err(Arc::new(Error::Protocol(ProtocolError::Codec)));
1191    };
1192
1193    let mut messages: Vec<OwnedMessage> = Vec::new();
1194    for record in &record_set.records {
1195        messages.push(OwnedMessage {
1196            offset: record.offset,
1197            key: record.key.clone().unwrap_or_default(),
1198            value: record.value.clone().unwrap_or_default(),
1199        });
1200    }
1201
1202    Ok(OwnedData {
1203        highwatermark_offset: high_watermark,
1204        messages,
1205    })
1206}
1207
1208fn map_kafka_code(code: i16) -> Option<KafkaCode> {
1209    match code {
1210        0 => None,
1211        1 => Some(KafkaCode::OffsetOutOfRange),
1212        2 => Some(KafkaCode::CorruptMessage),
1213        3 => Some(KafkaCode::UnknownTopicOrPartition),
1214        4 => Some(KafkaCode::InvalidMessageSize),
1215        5 => Some(KafkaCode::LeaderNotAvailable),
1216        6 => Some(KafkaCode::NotLeaderForPartition),
1217        7 => Some(KafkaCode::RequestTimedOut),
1218        8 => Some(KafkaCode::BrokerNotAvailable),
1219        9 => Some(KafkaCode::ReplicaNotAvailable),
1220        10 => Some(KafkaCode::MessageSizeTooLarge),
1221        11 => Some(KafkaCode::StaleControllerEpoch),
1222        12 => Some(KafkaCode::OffsetMetadataTooLarge),
1223        13 => Some(KafkaCode::NetworkException),
1224        14 => Some(KafkaCode::GroupLoadInProgress),
1225        15 => Some(KafkaCode::GroupCoordinatorNotAvailable),
1226        16 => Some(KafkaCode::NotCoordinatorForGroup),
1227        17 => Some(KafkaCode::InvalidTopic),
1228        18 => Some(KafkaCode::RecordListTooLarge),
1229        19 => Some(KafkaCode::NotEnoughReplicas),
1230        20 => Some(KafkaCode::NotEnoughReplicasAfterAppend),
1231        21 => Some(KafkaCode::InvalidRequiredAcks),
1232        22 => Some(KafkaCode::IllegalGeneration),
1233        23 => Some(KafkaCode::InconsistentGroupProtocol),
1234        24 => Some(KafkaCode::InvalidGroupId),
1235        25 => Some(KafkaCode::UnknownMemberId),
1236        26 => Some(KafkaCode::InvalidSessionTimeout),
1237        27 => Some(KafkaCode::RebalanceInProgress),
1238        28 => Some(KafkaCode::InvalidCommitOffsetSize),
1239        29 => Some(KafkaCode::TopicAuthorizationFailed),
1240        30 => Some(KafkaCode::GroupAuthorizationFailed),
1241        31 => Some(KafkaCode::ClusterAuthorizationFailed),
1242        32 => Some(KafkaCode::InvalidTimestamp),
1243        33 => Some(KafkaCode::UnsupportedSaslMechanism),
1244        34 => Some(KafkaCode::IllegalSaslState),
1245        35 => Some(KafkaCode::UnsupportedVersion),
1246        _ => Some(KafkaCode::Unknown),
1247    }
1248}
1249
1250fn kafka_code_to_i16(code: KafkaCode) -> i16 {
1251    match code {
1252        KafkaCode::OffsetOutOfRange => 1,
1253        KafkaCode::CorruptMessage => 2,
1254        KafkaCode::UnknownTopicOrPartition => 3,
1255        KafkaCode::InvalidMessageSize => 4,
1256        KafkaCode::LeaderNotAvailable => 5,
1257        KafkaCode::NotLeaderForPartition => 6,
1258        KafkaCode::RequestTimedOut => 7,
1259        KafkaCode::BrokerNotAvailable => 8,
1260        KafkaCode::ReplicaNotAvailable => 9,
1261        KafkaCode::MessageSizeTooLarge => 10,
1262        KafkaCode::StaleControllerEpoch => 11,
1263        KafkaCode::OffsetMetadataTooLarge => 12,
1264        KafkaCode::NetworkException => 13,
1265        KafkaCode::GroupLoadInProgress => 14,
1266        KafkaCode::GroupCoordinatorNotAvailable => 15,
1267        KafkaCode::NotCoordinatorForGroup => 16,
1268        KafkaCode::InvalidTopic => 17,
1269        KafkaCode::RecordListTooLarge => 18,
1270        KafkaCode::NotEnoughReplicas => 19,
1271        KafkaCode::NotEnoughReplicasAfterAppend => 20,
1272        KafkaCode::InvalidRequiredAcks => 21,
1273        KafkaCode::IllegalGeneration => 22,
1274        KafkaCode::InconsistentGroupProtocol => 23,
1275        KafkaCode::InvalidGroupId => 24,
1276        KafkaCode::UnknownMemberId => 25,
1277        KafkaCode::InvalidSessionTimeout => 26,
1278        KafkaCode::RebalanceInProgress => 27,
1279        KafkaCode::InvalidCommitOffsetSize => 28,
1280        KafkaCode::TopicAuthorizationFailed => 29,
1281        KafkaCode::GroupAuthorizationFailed => 30,
1282        KafkaCode::ClusterAuthorizationFailed => 31,
1283        KafkaCode::InvalidTimestamp => 32,
1284        KafkaCode::UnsupportedSaslMechanism => 33,
1285        KafkaCode::IllegalSaslState => 34,
1286        KafkaCode::UnsupportedVersion => 35,
1287        KafkaCode::Unknown => -1,
1288    }
1289}
1290
1291fn usize_to_i32(value: usize) -> Result<i32> {
1292    i32::try_from(value).map_err(|_| Error::Protocol(ProtocolError::Codec))
1293}
1294
1295fn non_negative_i32_to_usize(value: i32) -> Result<usize> {
1296    usize::try_from(value).map_err(|_| Error::Protocol(ProtocolError::Codec))
1297}
1298
1299fn non_negative_i32_to_u64(value: i32) -> Result<u64> {
1300    u64::try_from(value).map_err(|_| Error::Protocol(ProtocolError::Codec))
1301}
1302
1303fn no_host_reachable_error() -> Error {
1304    Error::Connection(rustfs_kafka::error::ConnectionError::NoHostReachable)
1305}
1306
#[cfg(test)]
mod tests {
    use rustfs_kafka::error::{ConnectionError, Error};

    use super::*;

    /// `from_hosts` against an address nothing listens on reports `NoHostReachable`.
    #[tokio::test]
    async fn from_hosts_fails_with_unreachable_hosts() {
        let result = AsyncConsumer::from_hosts(
            vec!["127.0.0.1:1".to_owned()],
            "test-group".to_owned(),
            vec!["test-topic".to_owned()],
        )
        .await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// `from_client` succeeds when given a client built from an empty host list.
    #[tokio::test]
    async fn from_client_with_empty_host_list_succeeds() {
        let client = AsyncKafkaClient::new(vec![]).await.unwrap();
        let result = AsyncConsumer::from_client(
            client,
            "test-group".to_owned(),
            vec!["test-topic".to_owned()],
        )
        .await;
        assert!(result.is_ok());
    }

    // NOTE(review): `from_hosts` fails against the unreachable address, so no consumer
    // is constructed or dropped here. This only shows that the failed-construction
    // path completes without panicking.
    #[tokio::test]
    async fn drop_consumer_without_close_does_not_panic() {
        let result = AsyncConsumer::from_hosts(
            vec!["127.0.0.1:1".to_owned()],
            "test-drop-group".to_owned(),
            vec!["test-drop-topic".to_owned()],
        )
        .await;
        assert!(result.is_err());
    }

    /// Building without a group id is rejected with `UnsetGroupId`.
    #[tokio::test]
    async fn builder_without_group_returns_error() {
        let result = AsyncConsumer::builder(vec![])
            .with_topic("t".to_owned())
            .build()
            .await;
        assert!(matches!(
            result,
            Err(Error::Consumer(ConsumerError::UnsetGroupId))
        ));
    }

    /// Building without any topic is rejected with `NoTopicsAssigned`.
    #[tokio::test]
    async fn builder_without_topics_returns_error() {
        let result = AsyncConsumer::builder(vec![])
            .with_group("g".to_owned())
            .build()
            .await;
        assert!(matches!(
            result,
            Err(Error::Consumer(ConsumerError::NoTopicsAssigned))
        ));
    }

    /// Error stats count every recorded error by numeric Kafka code. They keep only
    /// the most recent `N` snapshots (here 2), and `reset_error_stats` clears everything.
    #[tokio::test]
    async fn native_observability_tracks_recent_snapshots_and_numeric_codes() {
        let client = AsyncKafkaClient::new(vec![]).await.unwrap();
        let mut native = NativeConsumer {
            client,
            group: "g".to_owned(),
            topics: vec!["t".to_owned()],
            fallback_offset: FetchOffset::Latest,
            offsets: HashMap::new(),
            dirty_offsets: HashMap::new(),
            leaders: HashMap::new(),
            coordinator: None,
            correlation: 1,
            retry_attempts: DEFAULT_NATIVE_RETRY_ATTEMPTS,
            retry_backoff: Duration::from_millis(DEFAULT_NATIVE_RETRY_BACKOFF_MS),
            observability: NativeConsumerObservability::new(2),
        };

        native.record_error("poll", &Error::Kafka(KafkaCode::LeaderNotAvailable));
        native.record_error("poll", &Error::Kafka(KafkaCode::NotLeaderForPartition));
        native.record_error(
            "commit",
            &Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable),
        );

        let stats = native.error_stats();
        assert_eq!(stats.total_errors, 3);
        assert_eq!(stats.kafka_code_counts.get("5").copied(), Some(1));
        assert_eq!(stats.kafka_code_counts.get("6").copied(), Some(1));
        assert_eq!(stats.kafka_code_counts.get("15").copied(), Some(1));
        // Only the two most recent snapshots survive the limit of 2.
        assert_eq!(stats.recent_errors.len(), 2);
        assert_eq!(stats.recent_errors[0].phase, "poll");
        assert_eq!(stats.recent_errors[1].phase, "commit");

        native.reset_error_stats();
        let reset = native.error_stats();
        assert_eq!(reset.total_errors, 0);
        assert!(reset.kafka_code_counts.is_empty());
        assert!(reset.recent_errors.is_empty());
    }
}