kafka/client/
mod.rs

1//! Kafka Client - A mid-level abstraction for a kafka cluster
2//! allowing building higher level constructs.
3//!
4//! The entry point into this module is `KafkaClient` obtained by a
5//! call to `KafkaClient::new()`.
6
7use std;
8use std::collections::hash_map;
9use std::collections::hash_map::HashMap;
10use std::io::Cursor;
11use std::iter::Iterator;
12use std::mem;
13use std::thread;
14use std::time::{Duration, Instant};
15
16// pub re-export
17pub use crate::compression::Compression;
18pub use crate::utils::PartitionOffset;
19
20#[cfg(feature = "security")]
21pub use self::network::SecurityConfig;
22
23use crate::codecs::{FromByte, ToByte};
24use crate::error::{Error, KafkaCode, Result};
25use crate::protocol::{self, ResponseParser};
26
27use crate::client_internals::KafkaClientInternals;
28
29pub mod metadata;
30mod network;
31mod state;
32
// ~ re-export (only) selected types of the `protocol::fetch` module
// under the public path `client::fetch`.
pub mod fetch {
    //! A representation of fetched messages from Kafka.
    //!
    //! All items in this module are re-exports from the crate's
    //! `protocol::fetch` module.

    pub use crate::protocol::fetch::{Data, Message, Partition, Response, Topic};
}
40
const DEFAULT_CONNECTION_RW_TIMEOUT_SECS: u64 = 120;

/// Derives the default connection read/write timeout from
/// `DEFAULT_CONNECTION_RW_TIMEOUT_SECS`; a value of zero seconds
/// translates to `None`.
fn default_conn_rw_timeout() -> Option<Duration> {
    Some(DEFAULT_CONNECTION_RW_TIMEOUT_SECS)
        .filter(|&secs| secs != 0)
        .map(Duration::from_secs)
}
49
/// The default value for `KafkaClient::set_compression(..)`
pub const DEFAULT_COMPRESSION: Compression = Compression::NONE;

/// The default value for `KafkaClient::set_fetch_max_wait_time(..)`,
/// specified in milliseconds.
pub const DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS: u64 = 100;

/// The default value for `KafkaClient::set_fetch_min_bytes(..)`
pub const DEFAULT_FETCH_MIN_BYTES: i32 = 4096;

/// The default value for
/// `KafkaClient::set_fetch_max_bytes_per_partition(..)`
pub const DEFAULT_FETCH_MAX_BYTES_PER_PARTITION: i32 = 32 * 1024;

/// The default value for `KafkaClient::set_fetch_crc_validation(..)`
pub const DEFAULT_FETCH_CRC_VALIDATION: bool = true;

/// The default value for `KafkaClient::set_group_offset_storage(..)`
pub const DEFAULT_GROUP_OFFSET_STORAGE: Option<GroupOffsetStorage> = None;

/// The default value for `KafkaClient::set_retry_backoff_time(..)`,
/// specified in milliseconds.
pub const DEFAULT_RETRY_BACKOFF_TIME_MILLIS: u64 = 100;

/// The default value for `KafkaClient::set_retry_max_attempts(..)`
// ~ re-attempt a repeatable operation for approximately up to two
// minutes: 120_000ms divided by the default backoff time between
// two attempts
pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME_MILLIS as u32;

/// The default value for `KafkaClient::set_connection_idle_timeout(..)`,
/// specified in milliseconds.
pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS: u64 = 540_000;
78
/// Client struct keeping track of brokers and topic metadata.
///
/// Implements methods described by the [Kafka Protocol](http://kafka.apache.org/protocol.html).
///
/// You will have to load metadata (see `KafkaClient::load_metadata_all`
/// and `KafkaClient::load_metadata`) before making any other request.
#[derive(Debug)]
pub struct KafkaClient {
    // ~ this kafka client's configuration; manipulated through the
    // various `set_*` methods
    config: ClientConfig,

    // ~ a pool of re-usable connections to kafka brokers
    conn_pool: network::Connections,

    // ~ the current state of this client; this is where the loaded
    // metadata is handed over to
    state: state::ClientState,
}
95
// ~ the user adjustable settings of a `KafkaClient`
#[derive(Debug)]
struct ClientConfig {
    // ~ the client id sent along with requests; empty by default
    client_id: String,
    // ~ the hosts used to discover the kafka cluster
    hosts: Vec<String>,
    // ~ compression to use when sending messages
    compression: Compression,
    // ~ these are the defaults when fetching messages; for details
    // refer to the kafka wire protocol.  the max wait time is kept
    // as a number of milliseconds.
    fetch_max_wait_time: i32,
    fetch_min_bytes: i32,
    fetch_max_bytes_per_partition: i32,
    fetch_crc_validation: bool,
    // ~ the storage type kafka shall use for group offsets
    // (zookeeper or __consumer_offsets).  the storage type implies
    // the versions of the offset-fetch and offset-commit APIs (see
    // `GroupOffsetStorage::offset_fetch_version` and
    // `GroupOffsetStorage::offset_commit_version`); keeping just the
    // storage type here ensures that both API calls target the same
    // storage.
    offset_storage: Option<GroupOffsetStorage>,
    // ~ the duration to wait before retrying a failed
    // operation like refreshing group coordinators; this avoids
    // operation retries in a tight loop.
    retry_backoff_time: Duration,
    // ~ the number of repeated retry attempts; prevents endless
    // repetition of a retry attempt
    retry_max_attempts: u32,
}
124
125// --------------------------------------------------------------------
126
/// The kinds of offsets which can be asked for when querying a
/// topic's offset.
/// See `KafkaClient::fetch_offsets`.
#[derive(Debug, Copy, Clone)]
pub enum FetchOffset {
    /// Receive the earliest available offset.
    Earliest,
    /// Receive the latest offset.
    Latest,
    /// Used to ask for all messages before a certain time (ms); unix
    /// timestamp in milliseconds.
    /// See <https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka#WritingaDriverforKafka-Offsets>
    ByTime(i64),
}

impl FetchOffset {
    /// Translates this choice into the numeric value handed to
    /// Kafka: the two symbolic variants map onto negative marker
    /// values while `ByTime` passes its timestamp through unchanged.
    fn to_kafka_value(self) -> i64 {
        const EARLIEST: i64 = -2;
        const LATEST: i64 = -1;

        match self {
            Self::ByTime(millis) => millis,
            Self::Latest => LATEST,
            Self::Earliest => EARLIEST,
        }
    }
}
150
151// --------------------------------------------------------------------
152
153/// Defines the available storage types to utilize when fetching or
154/// committing group offsets.  See also `KafkaClient::set_group_offset_storage`.
155#[derive(Debug, Copy, Clone, PartialEq, Eq)]
156pub enum GroupOffsetStorage {
157    /// Zookeeper based storage (available as of kafka 0.8.1)
158    Zookeeper,
159    /// Kafka based storage (available as of Kafka 0.8.2). This is the
160    /// preferred method for groups to store their offsets at.
161    Kafka,
162}
163
164impl GroupOffsetStorage {
165    fn offset_fetch_version(self) -> protocol::OffsetFetchVersion {
166        match self {
167            GroupOffsetStorage::Zookeeper => protocol::OffsetFetchVersion::V0,
168            GroupOffsetStorage::Kafka => protocol::OffsetFetchVersion::V1,
169        }
170    }
171    fn offset_commit_version(self) -> protocol::OffsetCommitVersion {
172        match self {
173            GroupOffsetStorage::Zookeeper => protocol::OffsetCommitVersion::V0,
174            // ~ if we knew we'll be communicating with a kafka 0.9+
175            // broker we could set the commit-version to V2; however,
176            // since we still want to support Kafka 0.8.2 versions,
177            // we'll go with the less efficient but safe option V1.
178            GroupOffsetStorage::Kafka => protocol::OffsetCommitVersion::V1,
179        }
180    }
181}
182
/// Identifies the topic partition for which a group's offset is to
/// be fetched.  See `KafkaClient::fetch_group_offsets`.
#[derive(Debug)]
pub struct FetchGroupOffset<'a> {
    /// The topic to fetch the group offset for
    pub topic: &'a str,
    /// The partition to fetch the group offset for
    pub partition: i32,
}

impl<'a> FetchGroupOffset<'a> {
    /// Creates a new data point for the given topic partition.
    #[inline]
    pub fn new(topic: &'a str, partition: i32) -> Self {
        Self { topic, partition }
    }
}

// ~ allows passing both, owned values and references, to methods
// generic over `AsRef<FetchGroupOffset>`
impl<'a> AsRef<FetchGroupOffset<'a>> for FetchGroupOffset<'a> {
    fn as_ref(&self) -> &FetchGroupOffset<'a> {
        self
    }
}
205
206// --------------------------------------------------------------------
207
/// Identifies a particular offset of a topic partition which is to
/// be committed.
/// See `KafkaClient::commit_offsets`.
#[derive(Debug)]
pub struct CommitOffset<'a> {
    /// The offset to be committed
    pub offset: i64,
    /// The topic to commit the offset for
    pub topic: &'a str,
    /// The partition to commit the offset for
    pub partition: i32,
}

impl<'a> CommitOffset<'a> {
    /// Creates a new data point from the given topic, partition, and
    /// offset.
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            offset,
            topic,
            partition,
        }
    }
}

// ~ allows passing both, owned values and references, to methods
// generic over `AsRef<CommitOffset>`
impl<'a> AsRef<CommitOffset<'a>> for CommitOffset<'a> {
    fn as_ref(&self) -> &CommitOffset<'a> {
        self
    }
}
236
237// --------------------------------------------------------------------
238
/// Possible choices on acknowledgement requirements when
/// producing/sending messages to Kafka. See
/// `KafkaClient::produce_messages`.
// NOTE(review): the explicit discriminants (0, 1, -1) look like the
// "required acks" values of Kafka's produce request - confirm
// against the code building that request before changing them.
#[derive(Debug, Copy, Clone)]
pub enum RequiredAcks {
    /// Indicates to the receiving Kafka broker not to acknowledge
    /// messages sent to it at all. Sending messages with this
    /// acknowledgement requirement translates into a fire-and-forget
    /// scenario which - of course - is very fast but not reliable.
    None = 0,
    /// Requires the receiving Kafka broker to wait until the sent
    /// messages are written to local disk.  Such messages can be
    /// regarded as acknowledged by one broker in the cluster.
    One = 1,
    /// Requires the sent messages to be acknowledged by all in-sync
    /// replicas of the targeted topic partitions.
    All = -1,
}
257
258// --------------------------------------------------------------------
259
/// A message to be sent/produced to a particular topic partition.
/// See `KafkaClient::produce_messages` and `Producer::send`.
#[derive(Debug)]
pub struct ProduceMessage<'a, 'b> {
    /// The "key" data of this message.
    pub key: Option<&'b [u8]>,

    /// The "value" data of this message.
    pub value: Option<&'b [u8]>,

    /// The topic to produce this message to.
    pub topic: &'a str,

    /// The partition (of the corresponding topic) to produce this
    /// message to.
    pub partition: i32,
}

impl<'a, 'b> ProduceMessage<'a, 'b> {
    /// Convenience constructor building a produce message with all
    /// of its attributes specified.
    pub fn new(
        topic: &'a str,
        partition: i32,
        key: Option<&'b [u8]>,
        value: Option<&'b [u8]>,
    ) -> Self {
        Self {
            topic,
            partition,
            key,
            value,
        }
    }
}

// ~ allows passing both, owned values and references, to methods
// generic over `AsRef<ProduceMessage>`
impl<'a, 'b> AsRef<ProduceMessage<'a, 'b>> for ProduceMessage<'a, 'b> {
    fn as_ref(&self) -> &ProduceMessage<'a, 'b> {
        self
    }
}
301
302// --------------------------------------------------------------------
303
/// Request data describing the partition to fetch messages from.
/// See `KafkaClient::fetch_messages`.
#[derive(Debug)]
pub struct FetchPartition<'a> {
    /// The topic to fetch messages from.
    pub topic: &'a str,

    /// The offset as of which to fetch messages.
    pub offset: i64,

    /// The partition to fetch messages from.
    pub partition: i32,

    /// Specifies the max. amount of data to fetch (for this
    /// partition.)  This implicitly defines the biggest message the
    /// client can accept.  If this value is too small, no messages
    /// can be delivered.  Setting this size should be in sync with
    /// the producers to the partition.
    ///
    /// Zero or negative values are treated as "unspecified".
    pub max_bytes: i32,
}

impl<'a> FetchPartition<'a> {
    /// Creates a new "fetch messages" request structure leaving
    /// `max_bytes` unspecified (a negative value.)
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            topic,
            offset,
            partition,
            max_bytes: -1,
        }
    }

    /// Consumes this request structure and hands it back with the
    /// given `max_bytes` value set.
    pub fn with_max_bytes(self, max_bytes: i32) -> Self {
        Self { max_bytes, ..self }
    }
}

// ~ allows passing both, owned values and references, to methods
// generic over `AsRef<FetchPartition>`
impl<'a> AsRef<FetchPartition<'a>> for FetchPartition<'a> {
    fn as_ref(&self) -> &FetchPartition<'a> {
        self
    }
}
351
/// A confirmation of messages sent back by the Kafka broker
/// to confirm delivery of producer messages to a particular topic.
#[derive(Debug)]
pub struct ProduceConfirm {
    /// The topic the messages were sent to.
    pub topic: String,

    /// The list of individual confirmations, one entry per
    /// partition (see `ProducePartitionConfirm`.)
    pub partition_confirms: Vec<ProducePartitionConfirm>,
}
362
/// A confirmation of messages sent back by the Kafka broker
/// to confirm delivery of producer messages for a particular
/// partition (of the topic named by the enclosing `ProduceConfirm`.)
#[derive(Debug)]
pub struct ProducePartitionConfirm {
    /// The offset assigned to the first message in the message set appended
    /// to this partition, or an error if one occurred.
    pub offset: std::result::Result<i64, KafkaCode>,

    /// The partition to which the message(s) were appended.
    pub partition: i32,
}
374
375// --------------------------------------------------------------------
376
377impl KafkaClient {
378    /// Creates a new instance of KafkaClient. Before being able to
379    /// successfully use the new client, you'll have to load metadata.
380    ///
381    /// # Examples
382    ///
383    /// ```no_run
384    /// let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
385    /// client.load_metadata_all().unwrap();
386    /// ```
387    pub fn new(hosts: Vec<String>) -> KafkaClient {
388        KafkaClient {
389            config: ClientConfig {
390                client_id: String::new(),
391                hosts,
392                compression: DEFAULT_COMPRESSION,
393                fetch_max_wait_time: protocol::to_millis_i32(Duration::from_millis(
394                    DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
395                ))
396                .expect("invalid default-fetch-max-time-millis"),
397                fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
398                fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
399                fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
400                offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
401                retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
402                retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
403            },
404            conn_pool: network::Connections::new(
405                default_conn_rw_timeout(),
406                Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
407            ),
408            state: state::ClientState::new(),
409        }
410    }
411
412    /// Creates a new secure instance of KafkaClient. Before being able to
413    /// successfully use the new client, you'll have to load metadata.
414    ///
415    /// # Examples
416    ///
417    /// ```no_run
418    /// extern crate openssl;
419    /// extern crate kafka;
420    ///
421    /// use openssl::ssl::{SslConnector, SslMethod, SslFiletype, SslVerifyMode};
422    /// use kafka::client::{KafkaClient, SecurityConfig};
423    ///
424    /// fn main() {
425    ///     let (key, cert) = ("client.key".to_string(), "client.crt".to_string());
426    ///
427    ///     // OpenSSL offers a variety of complex configurations. Here is an example:
428    ///     let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
429    ///     builder.set_cipher_list("DEFAULT").unwrap();
430    ///     builder
431    ///         .set_certificate_file(cert, SslFiletype::PEM)
432    ///         .unwrap();
433    ///     builder
434    ///         .set_private_key_file(key, SslFiletype::PEM)
435    ///         .unwrap();
436    ///     builder.check_private_key().unwrap();
437    ///     builder.set_default_verify_paths().unwrap();
438    ///     builder.set_verify(SslVerifyMode::PEER);
439    ///     let connector = builder.build();
440    ///
441    ///     let mut client = KafkaClient::new_secure(vec!("localhost:9092".to_owned()),
442    ///                                              SecurityConfig::new(connector));
443    ///     client.load_metadata_all().unwrap();
444    /// }
445    /// ```
446    /// See also `SecurityConfig#with_hostname_verification` to disable hostname verification.
447    ///
448    /// See also `KafkaClient::load_metadatata_all` and
449    /// `KafkaClient::load_metadata` methods, the creates
450    /// [openssl](https://crates.io/crates/openssl)
451    /// and [openssl_verify](https://crates.io/crates/openssl-verify),
452    /// as well as
453    /// [Kafka's documentation](https://kafka.apache.org/documentation.html#security_ssl).
454    #[cfg(feature = "security")]
455    pub fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient {
456        KafkaClient {
457            config: ClientConfig {
458                client_id: String::new(),
459                hosts,
460                compression: DEFAULT_COMPRESSION,
461                fetch_max_wait_time: protocol::to_millis_i32(Duration::from_millis(
462                    DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
463                ))
464                .expect("invalid default-fetch-max-time-millis"),
465                fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
466                fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
467                fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
468                offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
469                retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
470                retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
471            },
472            conn_pool: network::Connections::new_with_security(
473                default_conn_rw_timeout(),
474                Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
475                Some(security),
476            ),
477            state: state::ClientState::new(),
478        }
479    }
480
    /// Exposes the hosts used for discovery of the target kafka
    /// cluster.  This set of hosts corresponds to the values supplied
    /// to `KafkaClient::new` (or `KafkaClient::new_secure`
    /// respectively.)
    #[inline]
    pub fn hosts(&self) -> &[String] {
        &self.config.hosts
    }
488
    /// Sets the client_id to be sent along with every request to the
    /// remote Kafka brokers.  By default, this value is the empty
    /// string.
    ///
    /// Kafka brokers write out this client id to their
    /// request/response trace log - if configured appropriately.
    pub fn set_client_id(&mut self, client_id: String) {
        self.config.client_id = client_id;
    }
498
    /// Retrieves the current `KafkaClient::set_client_id` setting;
    /// this is the empty string unless a client id has been set.
    pub fn client_id(&self) -> &str {
        &self.config.client_id
    }
503
    /// Sets the compression algorithm to use when sending out messages.
    /// The default is `DEFAULT_COMPRESSION`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use kafka::client::{Compression, KafkaClient};
    ///
    /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    /// client.load_metadata_all().unwrap();
    /// client.set_compression(Compression::NONE);
    /// ```
    #[inline]
    pub fn set_compression(&mut self, compression: Compression) {
        self.config.compression = compression;
    }
519
    /// Retrieves the current `KafkaClient::set_compression` setting.
    /// The value is handed out as a copy.
    #[inline]
    pub fn compression(&self) -> Compression {
        self.config.compression
    }
525
    /// Sets the maximum time to wait for insufficient data to become
    /// available when fetching messages.  The given duration is
    /// stored as a number of milliseconds.
    ///
    /// # Errors
    ///
    /// Fails if the given duration cannot be converted by
    /// `protocol::to_millis_i32`; in this case the current setting
    /// is left untouched.
    ///
    /// See also `KafkaClient::set_fetch_min_bytes(..)` and
    /// `KafkaClient::set_fetch_max_bytes_per_partition(..)`.
    #[inline]
    pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()> {
        self.config.fetch_max_wait_time = protocol::to_millis_i32(max_wait_time)?;
        Ok(())
    }
536
    /// Retrieves the current `KafkaClient::set_fetch_max_wait_time`
    /// setting.
    #[inline]
    pub fn fetch_max_wait_time(&self) -> Duration {
        // ~ the setting is kept as an `i32` number of milliseconds.
        // NOTE(review): the cast assumes the stored value is never
        // negative - the writes visible here go through
        // `protocol::to_millis_i32`; confirm it never yields a
        // negative value.
        Duration::from_millis(self.config.fetch_max_wait_time as u64)
    }
543
    /// Sets the minimum number of bytes of available data to wait for
    /// as long as specified by `KafkaClient::set_fetch_max_wait_time`
    /// when fetching messages.
    ///
    /// By setting higher values in combination with the timeout the
    /// consumer can tune for throughput and trade a little additional
    /// latency for reading only large chunks of data (e.g. setting
    /// MaxWaitTime to 100 ms and setting MinBytes to 64k would allow
    /// the server to wait up to 100ms to try to accumulate 64k of
    /// data before responding).
    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use kafka::client::{KafkaClient, FetchPartition};
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// client.set_fetch_max_wait_time(Duration::from_millis(100)).unwrap();
    /// client.set_fetch_min_bytes(64 * 1024);
    /// let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);
    /// ```
    ///
    /// See also `KafkaClient::set_fetch_max_wait_time(..)` and
    /// `KafkaClient::set_fetch_max_bytes_per_partition(..)`.
    #[inline]
    pub fn set_fetch_min_bytes(&mut self, min_bytes: i32) {
        // ~ the value is stored as is; no validation takes place here
        self.config.fetch_min_bytes = min_bytes;
    }
574
    /// Retrieves the current `KafkaClient::set_fetch_min_bytes`
    /// setting (`DEFAULT_FETCH_MIN_BYTES` unless changed.)
    #[inline]
    pub fn fetch_min_bytes(&self) -> i32 {
        self.config.fetch_min_bytes
    }
581
    /// Sets the default maximum number of bytes to obtain from _a
    /// single kafka partition_ when fetching messages.
    ///
    /// This basically determines the maximum message size this client
    /// will be able to fetch.  If a topic partition contains a
    /// message larger than this specified number of bytes, the server
    /// will not deliver it.
    ///
    /// Note that this setting is related to a single partition.  The
    /// overall potential data size in a fetch messages response will
    /// thus be determined by the number of partitions in the fetch
    /// messages request times this "max bytes per partition."
    ///
    /// This client will use this setting by default for all queried
    /// partitions; however, `fetch_messages` does allow you to
    /// override this setting for a particular partition being
    /// queried (see `FetchPartition::with_max_bytes`.)
    ///
    /// See also `KafkaClient::set_fetch_max_wait_time`,
    /// `KafkaClient::set_fetch_min_bytes`, and `KafkaClient::fetch_messages`.
    #[inline]
    pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32) {
        // ~ the value is stored as is; no validation takes place here
        self.config.fetch_max_bytes_per_partition = max_bytes;
    }
606
    /// Retrieves the current
    /// `KafkaClient::set_fetch_max_bytes_per_partition` setting
    /// (`DEFAULT_FETCH_MAX_BYTES_PER_PARTITION` unless changed.)
    #[inline]
    pub fn fetch_max_bytes_per_partition(&self) -> i32 {
        self.config.fetch_max_bytes_per_partition
    }
613
    /// Specifies whether to perform CRC validation on fetched
    /// messages.
    ///
    /// This ensures detection of on-the-wire or on-disk corruption to
    /// fetched messages.  This check adds some overhead, so it may be
    /// disabled in cases seeking extreme performance.
    #[inline]
    pub fn set_fetch_crc_validation(&mut self, validate_crc: bool) {
        self.config.fetch_crc_validation = validate_crc;
    }
624
    /// Retrieves the current `KafkaClient::set_fetch_crc_validation`
    /// setting (`DEFAULT_FETCH_CRC_VALIDATION` unless changed.)
    #[inline]
    pub fn fetch_crc_validation(&self) -> bool {
        self.config.fetch_crc_validation
    }
631
    /// Specifies the group offset storage to address when fetching or
    /// committing group offsets.
    ///
    /// In addition to Zookeeper, Kafka 0.8.2 brokers or later offer a
    /// more performant (and scalable) way to manage group offsets
    /// directly by themselves. Note that the remote storages are
    /// separate and independent of each other. Hence, you typically
    /// want to consistently hard-code your choice in your program.
    ///
    /// Unless you have a 0.8.1 broker or want to participate in a
    /// group which is already based on Zookeeper, you generally want
    /// to choose `GroupOffsetStorage::Kafka` here.
    ///
    /// See also `KafkaClient::fetch_group_offsets` and
    /// `KafkaClient::commit_offsets`.
    #[inline]
    pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>) {
        self.config.offset_storage = storage;
    }
651
    /// Retrieves the current `KafkaClient::set_group_offset_storage`
    /// setting; `None` unless a storage has been chosen.
    pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage> {
        self.config.offset_storage
    }
657
    /// Specifies the time to wait before retrying a failed,
    /// repeatable operation against Kafka.  This avoids retrying such
    /// operations in a tight loop.
    ///
    /// See also `KafkaClient::set_retry_max_attempts`.
    #[inline]
    pub fn set_retry_backoff_time(&mut self, time: Duration) {
        self.config.retry_backoff_time = time;
    }
665
    /// Retrieves the current `KafkaClient::set_retry_backoff_time`
    /// setting (`DEFAULT_RETRY_BACKOFF_TIME_MILLIS` milliseconds
    /// unless changed.)
    pub fn retry_backoff_time(&self) -> Duration {
        self.config.retry_backoff_time
    }
671
    /// Specifies the upper limit of retry attempts for failed,
    /// repeatable operations against kafka.  This avoids retrying
    /// them forever.
    ///
    /// See also `KafkaClient::set_retry_backoff_time`.
    #[inline]
    pub fn set_retry_max_attempts(&mut self, attempts: u32) {
        self.config.retry_max_attempts = attempts;
    }
679
    /// Retrieves the current `KafkaClient::set_retry_max_attempts`
    /// setting (`DEFAULT_RETRY_MAX_ATTEMPTS` unless changed.)
    #[inline]
    pub fn retry_max_attempts(&self) -> u32 {
        self.config.retry_max_attempts
    }
686
    /// Specifies the timeout after which idle connections will
    /// transparently be closed/re-established by `KafkaClient`.
    ///
    /// To be effective this value must be smaller than the [remote
    /// broker's `connections.max.idle.ms`
    /// setting](https://kafka.apache.org/documentation.html#brokerconfigs).
    #[inline]
    pub fn set_connection_idle_timeout(&mut self, timeout: Duration) {
        // ~ the setting is maintained by the connection pool itself
        self.conn_pool.set_idle_timeout(timeout);
    }
697
    /// Retrieves the current
    /// `KafkaClient::set_connection_idle_timeout` setting as
    /// reported by the underlying connection pool.
    #[inline]
    pub fn connection_idle_timeout(&self) -> Duration {
        self.conn_pool.idle_timeout()
    }
704
    /// Provides a view onto the currently loaded metadata of known
    /// topics.
    ///
    /// # Examples
    /// ```no_run
    /// use kafka::client::KafkaClient;
    /// use kafka::client::metadata::Broker;
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// for topic in client.topics() {
    ///   for partition in topic.partitions() {
    ///     println!("{} #{} => {}", topic.name(), partition.id(),
    ///              partition.leader()
    ///                       .map(Broker::host)
    ///                       .unwrap_or("no-leader!"));
    ///   }
    /// }
    /// ```
    #[inline]
    pub fn topics(&self) -> metadata::Topics<'_> {
        metadata::Topics::new(self)
    }
727
    /// Resets and loads metadata for all topics from the underlying
    /// brokers.
    ///
    /// Note that the currently held metadata is reset _before_ the
    /// new metadata is requested; it is not restored if loading
    /// fails.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
    /// client.load_metadata_all().unwrap();
    /// for topic in client.topics().names() {
    ///   println!("topic: {}", topic);
    /// }
    /// ```
    ///
    /// Returns nothing on success; the loaded metadata is accessible
    /// through `KafkaClient::topics`.
    #[inline]
    pub fn load_metadata_all(&mut self) -> Result<()> {
        self.reset_metadata();
        // ~ an empty list of topics is passed on to request the
        // metadata for all topics
        self.load_metadata::<&str>(&[])
    }
748
    /// Reloads metadata for a list of supplied topics.
    ///
    /// Note: if any of the specified topics does not exist yet on the
    /// underlying brokers and these have the [configuration for "auto
    /// create topics"
    /// enabled](https://kafka.apache.org/documentation.html#configuration),
    /// the remote kafka instance will create the yet missing topics
    /// on the fly as a result of explicitly loading their metadata.
    /// This is in contrast to other methods of this `KafkaClient`
    /// which will silently filter out requests to
    /// not-yet-loaded/not-yet-known topics and, thus, not cause
    /// topics to be automatically created.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
    /// let _ = client.load_metadata(&["my-topic"]).unwrap();
    /// ```
    ///
    /// Returns nothing on success; the fetched metadata is handed
    /// over to the client's internal state and is accessible through
    /// `KafkaClient::topics`.
    #[inline]
    pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
        // ~ the state is left untouched if fetching the metadata fails
        let resp = self.fetch_metadata(topics)?;
        self.state.update_metadata(resp)
    }
777
    /// Clears metadata stored in the client.  You must load metadata
    /// after this call if you want to use the client (see
    /// `KafkaClient::load_metadata_all` and
    /// `KafkaClient::load_metadata`.)
    #[inline]
    pub fn reset_metadata(&mut self) {
        self.state.clear_metadata();
    }
784
785    /// Fetches metadata about the specified topics from all of the
786    /// underlying brokers (`self.hosts`).
787    fn fetch_metadata<T: AsRef<str>>(
788        &mut self,
789        topics: &[T],
790    ) -> Result<protocol::MetadataResponse> {
791        let correlation = self.state.next_correlation_id();
792        let now = Instant::now();
793
794        for host in &self.config.hosts {
795            debug!("fetch_metadata: requesting metadata from {}", host);
796            match self.conn_pool.get_conn(host, now) {
797                Ok(conn) => {
798                    let req =
799                        protocol::MetadataRequest::new(correlation, &self.config.client_id, topics);
800                    match __send_request(conn, req) {
801                        Ok(_) => return __get_response::<protocol::MetadataResponse>(conn),
802                        Err(e) => debug!(
803                            "fetch_metadata: failed to request metadata from {}: {}",
804                            host, e
805                        ),
806                    }
807                }
808                Err(e) => {
809                    debug!("fetch_metadata: failed to connect to {}: {}", host, e);
810                }
811            }
812        }
813        Err(Error::NoHostReachable)
814    }
815
816    /// Fetch offsets for a list of topics
817    ///
818    /// # Examples
819    ///
820    /// ```no_run
821    /// use kafka::client::KafkaClient;
822    ///
823    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
824    /// client.load_metadata_all().unwrap();
825    /// let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
826    /// let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();
827    /// ```
828    ///
829    /// Returns a mapping of topic name to `PartitionOffset`s for each
830    /// currently available partition of the corresponding topic.
831    pub fn fetch_offsets<T: AsRef<str>>(
832        &mut self,
833        topics: &[T],
834        offset: FetchOffset,
835    ) -> Result<HashMap<String, Vec<PartitionOffset>>> {
836        let time = offset.to_kafka_value();
837        let n_topics = topics.len();
838
839        let state = &mut self.state;
840        let correlation = state.next_correlation_id();
841
842        // Map topic and partition to the corresponding broker
843        let config = &self.config;
844        let mut reqs: HashMap<&str, protocol::OffsetRequest<'_>> = HashMap::with_capacity(n_topics);
845        for topic in topics {
846            let topic = topic.as_ref();
847            if let Some(ps) = state.partitions_for(topic) {
848                for (id, host) in ps
849                    .iter()
850                    .filter_map(|(id, p)| p.broker(state).map(|b| (id, b.host())))
851                {
852                    let entry = reqs.entry(host).or_insert_with(|| {
853                        protocol::OffsetRequest::new(correlation, &config.client_id)
854                    });
855                    entry.add(topic, id, time);
856                }
857            }
858        }
859
860        // Call each broker with the request formed earlier
861        let now = Instant::now();
862        let mut res: HashMap<String, Vec<PartitionOffset>> = HashMap::with_capacity(n_topics);
863        for (host, req) in reqs {
864            let resp =
865                __send_receive::<_, protocol::OffsetResponse>(&mut self.conn_pool, host, now, req)?;
866            for tp in resp.topic_partitions {
867                let mut entry = res.entry(tp.topic);
868                let mut new_resp_offsets = None;
869                let mut err = None;
870                // Use an explicit scope here to allow insertion into a vacant entry
871                // below
872                {
873                    // Return a &mut to the response we will be collecting into to
874                    // return from this function. If there are some responses we have
875                    // already prepared, keep collecting into that; otherwise, make a
876                    // new collection to return.
877                    let resp_offsets = match entry {
878                        hash_map::Entry::Occupied(ref mut e) => e.get_mut(),
879                        hash_map::Entry::Vacant(_) => {
880                            new_resp_offsets = Some(Vec::new());
881                            new_resp_offsets.as_mut().unwrap()
882                        }
883                    };
884                    for p in tp.partitions {
885                        let partition_offset = match p.to_offset() {
886                            Ok(po) => po,
887                            Err(code) => {
888                                err = Some((p.partition, code));
889                                break;
890                            }
891                        };
892                        resp_offsets.push(partition_offset);
893                    }
894                }
895                if let Some((partition, code)) = err {
896                    let topic = KafkaClient::get_key_from_entry(entry);
897                    return Err(Error::TopicPartitionError {
898                        topic_name: topic,
899                        partition_id: partition,
900                        error_code: code,
901                    });
902                }
903                if let hash_map::Entry::Vacant(e) = entry {
904                    // unwrap is ok because if it is Vacant, it would have
905                    // been made into a Some above
906                    e.insert(new_resp_offsets.unwrap());
907                }
908            }
909        }
910
911        Ok(res)
912    }
913
914    /// Takes ownership back from the given HashMap Entry.
915    fn get_key_from_entry<'a, K: 'a, V: 'a>(entry: hash_map::Entry<'a, K, V>) -> K {
916        match entry {
917            hash_map::Entry::Occupied(e) => e.remove_entry().0,
918            hash_map::Entry::Vacant(e) => e.into_key(),
919        }
920    }
921
922    /// Fetch offset for a single topic.
923    ///
924    /// # Examples
925    ///
926    /// ```no_run
927    /// use kafka::client::{KafkaClient, FetchOffset};
928    ///
929    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
930    /// client.load_metadata_all().unwrap();
931    /// let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();
932    /// ```
933    ///
934    /// Returns a vector of the offset data for each available partition.
935    /// See also `KafkaClient::fetch_offsets`.
936    pub fn fetch_topic_offsets<T: AsRef<str>>(
937        &mut self,
938        topic: T,
939        offset: FetchOffset,
940    ) -> Result<Vec<PartitionOffset>> {
941        let topic = topic.as_ref();
942
943        let mut m = self.fetch_offsets(&[topic], offset)?;
944        let offs = m.remove(topic).unwrap_or_default();
945        if offs.is_empty() {
946            Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
947        } else {
948            Ok(offs)
949        }
950    }
951
952    /// Fetch messages from Kafka (multiple topic, partitions).
953    ///
954    /// It takes a vector specifying the topic partitions and their
955    /// offsets as of which to fetch messages.  Additionally, the
956    /// default "max fetch size per partition" can be explicitly
957    /// overridden if it is "defined" - this is, if `max_bytes` is
958    /// greater than zero.
959    ///
960    /// The result is exposed in a raw, complicated manner but allows
961    /// for very efficient consumption possibilities. All of the data
962    /// available through the returned fetch responses is bound to
963    /// their lifetime as that data is merely a "view" into parts of
964    /// the response structs.  If you need to keep individual messages
965    /// for a longer time than the whole fetch responses, you'll need
966    /// to make a copy of the message data.
967    ///
968    /// * This method transparently uncompresses messages (while Kafka
969    /// might sent them in compressed format.)
970    ///
971    /// * This method ensures to skip messages with a lower offset
972    /// than requested (while Kafka might for efficiency reasons sent
973    /// messages with a lower offset.)
974    ///
975    /// Note: before using this method consider using
976    /// `kafka::consumer::Consumer` instead which provides an easier
977    /// to use API for the regular use-case of fetching messesage from
978    /// Kafka.
979    ///
980    /// # Example
981    ///
982    /// This example demonstrates iterating all fetched messages from
983    /// two topic partitions.  From one partition we allow Kafka to
984    /// deliver to us the default number bytes as defined by
985    /// `KafkaClient::set_fetch_max_bytes_per_partition`, from the
986    /// other partition we allow Kafka to deliver up to 1MiB of
987    /// messages.
988    ///
989    /// ```no_run
990    /// use kafka::client::{KafkaClient, FetchPartition};
991    ///
992    /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
993    /// client.load_metadata_all().unwrap();
994    /// let reqs = &[FetchPartition::new("my-topic", 0, 0),
995    ///              FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)];
996    /// let resps = client.fetch_messages(reqs).unwrap();
997    /// for resp in resps {
998    ///   for t in resp.topics() {
999    ///     for p in t.partitions() {
1000    ///       match p.data() {
1001    ///         Err(ref e) => {
1002    ///           println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
1003    ///         }
1004    ///         Ok(ref data) => {
1005    ///           println!("topic: {} / partition: {} / latest available message offset: {}",
1006    ///                    t.topic(), p.partition(), data.highwatermark_offset());
1007    ///           for msg in data.messages() {
1008    ///             println!("topic: {} / partition: {} / message.offset: {} / message.len: {}",
1009    ///                      t.topic(), p.partition(), msg.offset, msg.value.len());
1010    ///           }
1011    ///         }
1012    ///       }
1013    ///     }
1014    ///   }
1015    /// }
1016    /// ```
1017    /// See also `kafka::consumer`.
1018    /// See also `KafkaClient::set_fetch_max_bytes_per_partition`.
1019    pub fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<fetch::Response>>
1020    where
1021        J: AsRef<FetchPartition<'a>>,
1022        I: IntoIterator<Item = J>,
1023    {
1024        let state = &mut self.state;
1025        let config = &self.config;
1026
1027        let correlation = state.next_correlation_id();
1028
1029        // Map topic and partition to the corresponding broker
1030        let mut reqs: HashMap<&str, protocol::FetchRequest<'_, '_>> = HashMap::new();
1031        for inp in input {
1032            let inp = inp.as_ref();
1033            if let Some(broker) = state.find_broker(inp.topic, inp.partition) {
1034                reqs.entry(broker)
1035                    .or_insert_with(|| {
1036                        protocol::FetchRequest::new(
1037                            correlation,
1038                            &config.client_id,
1039                            config.fetch_max_wait_time,
1040                            config.fetch_min_bytes,
1041                        )
1042                    })
1043                    .add(
1044                        inp.topic,
1045                        inp.partition,
1046                        inp.offset,
1047                        if inp.max_bytes > 0 {
1048                            inp.max_bytes
1049                        } else {
1050                            config.fetch_max_bytes_per_partition
1051                        },
1052                    );
1053            }
1054        }
1055
1056        __fetch_messages(&mut self.conn_pool, config, reqs)
1057    }
1058
1059    /// Fetch messages from a single kafka partition.
1060    ///
1061    /// See `KafkaClient::fetch_messages`.
1062    pub fn fetch_messages_for_partition<'a>(
1063        &mut self,
1064        req: &FetchPartition<'a>,
1065    ) -> Result<Vec<fetch::Response>> {
1066        self.fetch_messages(&[req])
1067    }
1068
1069    /// Send a message to Kafka
1070    ///
1071    /// `required_acks` - indicates how many acknowledgements the
1072    /// servers should receive before responding to the request
1073    ///
1074    /// `ack_timeout` - provides a maximum time in milliseconds the
1075    /// server can await the receipt of the number of acknowledgements
1076    /// in `required_acks`
1077    ///
1078    /// `input` - the set of `ProduceMessage`s to send
1079    ///
1080    /// Note: Unlike the higher-level `Producer` API, this method will
1081    /// *not* automatically determine the partition to deliver the
1082    /// message to.  It will strictly try to send the message to the
1083    /// specified partition.
1084    ///
1085    /// Note: Trying to send messages to non-existing topics or
1086    /// non-existing partitions will result in an error.
1087    ///
1088    /// # Example
1089    ///
1090    /// ```no_run
1091    /// use std::time::Duration;
1092    /// use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks};
1093    ///
1094    /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
1095    /// client.load_metadata_all().unwrap();
1096    /// let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())),
1097    ///                ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))];
1098    /// let resp = client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req);
1099    /// println!("{:?}", resp);
1100    /// ```
1101    ///
1102    /// The return value will contain a vector of topic, partition,
1103    /// offset and error if any OR error:Error.
1104
1105    // XXX rework signaling an error; note that we need to either return the
1106    // messages which kafka failed to accept or otherwise tell the client about them
1107
1108    pub fn produce_messages<'a, 'b, I, J>(
1109        &mut self,
1110        acks: RequiredAcks,
1111        ack_timeout: Duration,
1112        messages: I,
1113    ) -> Result<Vec<ProduceConfirm>>
1114    where
1115        J: AsRef<ProduceMessage<'a, 'b>>,
1116        I: IntoIterator<Item = J>,
1117    {
1118        self.internal_produce_messages(acks as i16, protocol::to_millis_i32(ack_timeout)?, messages)
1119    }
1120
1121    /// Commit offset for a topic partitions on behalf of a consumer group.
1122    ///
1123    /// # Examples
1124    ///
1125    /// ```no_run
1126    /// use kafka::client::{KafkaClient, CommitOffset};
1127    ///
1128    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1129    /// client.load_metadata_all().unwrap();
1130    /// client.commit_offsets("my-group",
1131    ///     &[CommitOffset::new("my-topic", 0, 100),
1132    ///       CommitOffset::new("my-topic", 1, 99)])
1133    ///    .unwrap();
1134    /// ```
1135    ///
1136    /// In this example, we commit the offset 100 for the topic
1137    /// partition "my-topic:0" and 99 for the topic partition
1138    /// "my-topic:1".  Once successfully committed, these can then be
1139    /// retrieved using `fetch_group_offsets` even from another
1140    /// process or at much later point in time to resume comusing the
1141    /// topic partitions as of these offsets.
1142    pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
1143    where
1144        J: AsRef<CommitOffset<'a>>,
1145        I: IntoIterator<Item = J>,
1146    {
1147        match self.config.offset_storage {
1148            Some(offset_storage) => {
1149                let mut req = protocol::OffsetCommitRequest::new(
1150                    group,
1151                    offset_storage.offset_commit_version(),
1152                    self.state.next_correlation_id(),
1153                    &self.config.client_id,
1154                );
1155                for o in offsets {
1156                    let o = o.as_ref();
1157                    if self.state.contains_topic_partition(o.topic, o.partition) {
1158                        req.add(o.topic, o.partition, o.offset, "");
1159                    } else {
1160                        return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
1161                    }
1162                }
1163                if req.topic_partitions.is_empty() {
1164                    debug!("commit_offsets: no offsets provided");
1165                    Ok(())
1166                } else {
1167                    __commit_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)
1168                }
1169            }
1170            None => Err(Error::UnsetOffsetStorage),
1171        }
1172    }
1173
1174    /// Commit offset of a particular topic partition on behalf of a
1175    /// consumer group.
1176    ///
1177    /// # Examples
1178    ///
1179    /// ```no_run
1180    /// use kafka::client::KafkaClient;
1181    ///
1182    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1183    /// client.load_metadata_all().unwrap();
1184    /// client.commit_offset("my-group", "my-topic", 0, 100).unwrap();
1185    /// ```
1186    ///
1187    /// See also `KafkaClient::commit_offsets`.
1188    pub fn commit_offset(
1189        &mut self,
1190        group: &str,
1191        topic: &str,
1192        partition: i32,
1193        offset: i64,
1194    ) -> Result<()> {
1195        self.commit_offsets(group, &[CommitOffset::new(topic, partition, offset)])
1196    }
1197
1198    /// Fetch offset for a specified list of topic partitions of a consumer group
1199    ///
1200    /// # Examples
1201    ///
1202    /// ```no_run
1203    /// use kafka::client::{KafkaClient, FetchGroupOffset};
1204    ///
1205    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1206    /// client.load_metadata_all().unwrap();
1207    ///
1208    /// let offsets =
1209    ///      client.fetch_group_offsets("my-group",
1210    ///             &[FetchGroupOffset::new("my-topic", 0),
1211    ///               FetchGroupOffset::new("my-topic", 1)])
1212    ///             .unwrap();
1213    /// ```
1214    ///
1215    /// See also `KafkaClient::fetch_group_topic_offset`.
1216    pub fn fetch_group_offsets<'a, J, I>(
1217        &mut self,
1218        group: &str,
1219        partitions: I,
1220    ) -> Result<HashMap<String, Vec<PartitionOffset>>>
1221    where
1222        J: AsRef<FetchGroupOffset<'a>>,
1223        I: IntoIterator<Item = J>,
1224    {
1225        match self.config.offset_storage {
1226            Some(offset_storage) => {
1227                let mut req = protocol::OffsetFetchRequest::new(
1228                    group,
1229                    offset_storage.offset_fetch_version(),
1230                    self.state.next_correlation_id(),
1231                    &self.config.client_id,
1232                );
1233                for p in partitions {
1234                    let p = p.as_ref();
1235                    if self.state.contains_topic_partition(p.topic, p.partition) {
1236                        req.add(p.topic, p.partition);
1237                    } else {
1238                        return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
1239                    }
1240                }
1241                __fetch_group_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)
1242            }
1243            None => Err(Error::UnsetOffsetStorage),
1244        }
1245    }
1246
1247    /// Fetch offset for all partitions of a particular topic of a consumer group
1248    ///
1249    /// # Examples
1250    ///
1251    /// ```no_run
1252    /// use kafka::client::KafkaClient;
1253    ///
1254    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1255    /// client.load_metadata_all().unwrap();
1256    /// let offsets = client.fetch_group_topic_offset("my-group", "my-topic").unwrap();
1257    /// ```
1258    pub fn fetch_group_topic_offset(
1259        &mut self,
1260        group: &str,
1261        topic: &str,
1262    ) -> Result<Vec<PartitionOffset>> {
1263        match self.config.offset_storage {
1264            Some(offset_storage) => {
1265                let mut req = protocol::OffsetFetchRequest::new(
1266                    group,
1267                    offset_storage.offset_fetch_version(),
1268                    self.state.next_correlation_id(),
1269                    &self.config.client_id,
1270                );
1271
1272                match self.state.partitions_for(topic) {
1273                    None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
1274                    Some(tp) => {
1275                        for (id, _) in tp {
1276                            req.add(topic, id);
1277                        }
1278                    }
1279                }
1280
1281                Ok(
1282                    __fetch_group_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)?
1283                        .remove(topic)
1284                        .unwrap_or_default(),
1285                )
1286            }
1287            None => Err(Error::UnsetOffsetStorage),
1288        }
1289
1290        // Ok(
1291        //     __fetch_group_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)?
1292        //         .remove(topic)
1293        //         .unwrap_or_default(),
1294        // )
1295    }
1296}
1297
1298impl KafkaClientInternals for KafkaClient {
1299    fn internal_produce_messages<'a, 'b, I, J>(
1300        &mut self,
1301        required_acks: i16,
1302        ack_timeout: i32,
1303        messages: I,
1304    ) -> Result<Vec<ProduceConfirm>>
1305    where
1306        J: AsRef<ProduceMessage<'a, 'b>>,
1307        I: IntoIterator<Item = J>,
1308    {
1309        let state = &mut self.state;
1310        let correlation = state.next_correlation_id();
1311
1312        // ~ map topic and partition to the corresponding brokers
1313        let config = &self.config;
1314        let mut reqs: HashMap<&str, protocol::ProduceRequest<'_, '_>> = HashMap::new();
1315        for msg in messages {
1316            let msg = msg.as_ref();
1317            match state.find_broker(msg.topic, msg.partition) {
1318                None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
1319                Some(broker) => reqs
1320                    .entry(broker)
1321                    .or_insert_with(|| {
1322                        protocol::ProduceRequest::new(
1323                            required_acks,
1324                            ack_timeout,
1325                            correlation,
1326                            &config.client_id,
1327                            config.compression,
1328                        )
1329                    })
1330                    .add(msg.topic, msg.partition, msg.key, msg.value),
1331            }
1332        }
1333        __produce_messages(&mut self.conn_pool, reqs, required_acks == 0)
1334    }
1335}
1336
1337fn __get_group_coordinator<'a>(
1338    group: &str,
1339    state: &'a mut state::ClientState,
1340    conn_pool: &mut network::Connections,
1341    config: &ClientConfig,
1342    now: Instant,
1343) -> Result<&'a str> {
1344    if let Some(host) = state.group_coordinator(group) {
1345        // ~ decouple the lifetimes to make borrowck happy;
1346        // this is actually safe since we're immediately
1347        // returning this, so the follow up code is not
1348        // affected here
1349        return Ok(unsafe { mem::transmute(host) });
1350    }
1351    let correlation_id = state.next_correlation_id();
1352    let req = protocol::GroupCoordinatorRequest::new(group, correlation_id, &config.client_id);
1353    let mut attempt = 1;
1354    loop {
1355        // ~ idealy we'd make this work even if `load_metadata` has not
1356        // been called yet; if there are no connections available we can
1357        // try connecting to the user specified bootstrap server similar
1358        // to the way `load_metadata` works.
1359        let conn = conn_pool.get_conn_any(now).expect("available connection");
1360        debug!(
1361            "get_group_coordinator: asking for coordinator of '{}' on: {:?}",
1362            group, conn
1363        );
1364        let r = __send_receive_conn::<_, protocol::GroupCoordinatorResponse>(conn, &req)?;
1365        let retry_code = match r.into_result() {
1366            Ok(r) => {
1367                return Ok(state.set_group_coordinator(group, &r));
1368            }
1369            Err(Error::Kafka(e @ KafkaCode::GroupCoordinatorNotAvailable)) => e,
1370            Err(e) => {
1371                return Err(e);
1372            }
1373        };
1374        if attempt < config.retry_max_attempts {
1375            debug!(
1376                "get_group_coordinator: will retry request (c: {}) due to: {:?}",
1377                req.header.correlation_id, retry_code
1378            );
1379            attempt += 1;
1380            __retry_sleep(config);
1381        } else {
1382            return Err(Error::Kafka(retry_code));
1383        }
1384    }
1385}
1386
1387fn __commit_offsets(
1388    req: protocol::OffsetCommitRequest<'_, '_>,
1389    state: &mut state::ClientState,
1390    conn_pool: &mut network::Connections,
1391    config: &ClientConfig,
1392) -> Result<()> {
1393    let mut attempt = 1;
1394    loop {
1395        let now = Instant::now();
1396
1397        let tps = {
1398            let host = __get_group_coordinator(req.group, state, conn_pool, config, now)?;
1399            debug!(
1400                "__commit_offsets: sending offset commit request '{:?}' to: {}",
1401                req, host
1402            );
1403            __send_receive::<_, protocol::OffsetCommitResponse>(conn_pool, host, now, &req)?
1404                .topic_partitions
1405        };
1406
1407        let mut retry_code = None;
1408
1409        'rproc: for tp in tps {
1410            for p in tp.partitions {
1411                match p.to_error() {
1412                    None => {}
1413                    Some(e @ KafkaCode::GroupLoadInProgress) => {
1414                        retry_code = Some(e);
1415                        break 'rproc;
1416                    }
1417                    Some(e @ KafkaCode::NotCoordinatorForGroup) => {
1418                        debug!(
1419                            "commit_offsets: resetting group coordinator for '{}'",
1420                            req.group
1421                        );
1422                        state.remove_group_coordinator(req.group);
1423                        retry_code = Some(e);
1424                        break 'rproc;
1425                    }
1426                    Some(code) => {
1427                        // ~ immediately abort with the error
1428                        return Err(Error::Kafka(code));
1429                    }
1430                }
1431            }
1432        }
1433        match retry_code {
1434            Some(e) => {
1435                if attempt < config.retry_max_attempts {
1436                    debug!(
1437                        "commit_offsets: will retry request (c: {}) due to: {:?}",
1438                        req.header.correlation_id, e
1439                    );
1440                    attempt += 1;
1441                    __retry_sleep(config);
1442                }
1443            }
1444            None => {
1445                return Ok(());
1446            }
1447        }
1448    }
1449}
1450
1451fn __fetch_group_offsets(
1452    req: protocol::OffsetFetchRequest<'_, '_, '_>,
1453    state: &mut state::ClientState,
1454    conn_pool: &mut network::Connections,
1455    config: &ClientConfig,
1456) -> Result<HashMap<String, Vec<PartitionOffset>>> {
1457    let mut attempt = 1;
1458    loop {
1459        let now = Instant::now();
1460
1461        let r = {
1462            let host = __get_group_coordinator(req.group, state, conn_pool, config, now)?;
1463            debug!(
1464                "fetch_group_offsets: sending request {:?} to: {}",
1465                req, host
1466            );
1467            __send_receive::<_, protocol::OffsetFetchResponse>(conn_pool, host, now, &req)?
1468        };
1469
1470        debug!("fetch_group_offsets: received response: {:#?}", r);
1471
1472        let mut retry_code = None;
1473        let mut topic_map = HashMap::with_capacity(r.topic_partitions.len());
1474
1475        'rproc: for tp in r.topic_partitions {
1476            let mut partition_offsets = Vec::with_capacity(tp.partitions.len());
1477
1478            for p in tp.partitions {
1479                match p.get_offsets() {
1480                    Ok(o) => {
1481                        partition_offsets.push(o);
1482                    }
1483                    Err(Error::Kafka(e @ KafkaCode::GroupLoadInProgress)) => {
1484                        retry_code = Some(e);
1485                        break 'rproc;
1486                    }
1487                    Err(Error::Kafka(e @ KafkaCode::NotCoordinatorForGroup)) => {
1488                        debug!(
1489                            "fetch_group_offsets: resetting group coordinator for '{}'",
1490                            req.group
1491                        );
1492                        state.remove_group_coordinator(req.group);
1493                        retry_code = Some(e);
1494                        break 'rproc;
1495                    }
1496                    Err(e) => {
1497                        // ~ immediately abort with the error
1498                        return Err(e);
1499                    }
1500                }
1501            }
1502
1503            topic_map.insert(tp.topic, partition_offsets);
1504        }
1505
1506        // ~ have we processed the result successfully or shall we
1507        // retry once more?
1508        match retry_code {
1509            Some(e) => {
1510                if attempt < config.retry_max_attempts {
1511                    debug!(
1512                        "fetch_group_offsets: will retry request (c: {}) due to: {:?}",
1513                        req.header.correlation_id, e
1514                    );
1515                    attempt += 1;
1516                    __retry_sleep(config)
1517                } else {
1518                    return Err(Error::Kafka(e));
1519                }
1520            }
1521            None => {
1522                return Ok(topic_map);
1523            }
1524        }
1525    }
1526}
1527
1528/// ~ carries out the given fetch requests and returns the response
1529fn __fetch_messages(
1530    conn_pool: &mut network::Connections,
1531    config: &ClientConfig,
1532    reqs: HashMap<&str, protocol::FetchRequest<'_, '_>>,
1533) -> Result<Vec<fetch::Response>> {
1534    let now = Instant::now();
1535    let mut res = Vec::with_capacity(reqs.len());
1536    for (host, req) in reqs {
1537        let p = protocol::fetch::ResponseParser {
1538            validate_crc: config.fetch_crc_validation,
1539            requests: Some(&req),
1540        };
1541        res.push(__z_send_receive(conn_pool, host, now, &req, &p)?);
1542    }
1543    Ok(res)
1544}
1545
1546/// ~ carries out the given produce requests and returns the response
1547fn __produce_messages(
1548    conn_pool: &mut network::Connections,
1549    reqs: HashMap<&str, protocol::ProduceRequest<'_, '_>>,
1550    no_acks: bool,
1551) -> Result<Vec<ProduceConfirm>> {
1552    let now = Instant::now();
1553    if no_acks {
1554        for (host, req) in reqs {
1555            __send_noack::<_, protocol::ProduceResponse>(conn_pool, host, now, req)?;
1556        }
1557        Ok(vec![])
1558    } else {
1559        let mut res: Vec<ProduceConfirm> = vec![];
1560        for (host, req) in reqs {
1561            let resp = __send_receive::<_, protocol::ProduceResponse>(conn_pool, host, now, req)?;
1562            for tpo in resp.get_response() {
1563                res.push(tpo);
1564            }
1565        }
1566        Ok(res)
1567    }
1568}
1569
1570fn __send_receive<T, V>(
1571    conn_pool: &mut network::Connections,
1572    host: &str,
1573    now: Instant,
1574    req: T,
1575) -> Result<V::R>
1576where
1577    T: ToByte,
1578    V: FromByte,
1579{
1580    __send_receive_conn::<T, V>(conn_pool.get_conn(host, now)?, req)
1581}
1582
1583fn __send_receive_conn<T, V>(conn: &mut network::KafkaConnection, req: T) -> Result<V::R>
1584where
1585    T: ToByte,
1586    V: FromByte,
1587{
1588    __send_request(conn, req)?;
1589    __get_response::<V>(conn)
1590}
1591
1592fn __send_noack<T, V>(
1593    conn_pool: &mut network::Connections,
1594    host: &str,
1595    now: Instant,
1596    req: T,
1597) -> Result<usize>
1598where
1599    T: ToByte,
1600    V: FromByte,
1601{
1602    let conn = conn_pool.get_conn(host, now)?;
1603    __send_request(conn, req)
1604}
1605
1606fn __send_request<T: ToByte>(conn: &mut network::KafkaConnection, request: T) -> Result<usize> {
1607    // ~ buffer to receive data to be sent
1608    let mut buffer = Vec::with_capacity(4);
1609    // ~ reserve bytes for the actual request size (we'll fill in that later)
1610    buffer.extend_from_slice(&[0, 0, 0, 0]);
1611    // ~ encode the request data
1612    request.encode(&mut buffer)?;
1613    // ~ put the size of the request data into the reserved area
1614    let size = buffer.len() as i32 - 4;
1615    size.encode(&mut &mut buffer[..])?;
1616
1617    trace!("__send_request: Sending bytes: {:?}", &buffer);
1618
1619    // ~ send the prepared buffer
1620    conn.send(&buffer)
1621}
1622
1623fn __get_response<T: FromByte>(conn: &mut network::KafkaConnection) -> Result<T::R> {
1624    let size = __get_response_size(conn)?;
1625    let resp = conn.read_exact_alloc(size as u64)?;
1626
1627    trace!("__get_response: received bytes: {:?}", &resp);
1628
1629    // {
1630    //     use std::fs::OpenOptions;
1631    //     use std::io::Write;
1632    //     let mut f = OpenOptions::new()
1633    //         .write(true)
1634    //         .truncate(true)
1635    //         .create(true)
1636    //         .open("/tmp/dump.dat")
1637    //         .unwrap();
1638    //     f.write_all(&resp[..]).unwrap();
1639    // }
1640    // ::super::protocol::
1641    // let thing = ::super::error::KafkaCode::from_protocol(self.error); // KafkaCode::decode_new::<T>(&mut Cursor::new(resp));
1642
1643    T::decode_new(&mut Cursor::new(resp))
1644}
1645
1646fn __z_send_receive<R, P>(
1647    conn_pool: &mut network::Connections,
1648    host: &str,
1649    now: Instant,
1650    req: R,
1651    parser: &P,
1652) -> Result<P::T>
1653where
1654    R: ToByte,
1655    P: ResponseParser,
1656{
1657    let conn = conn_pool.get_conn(host, now)?;
1658    __send_request(conn, req)?;
1659    __z_get_response(conn, parser)
1660}
1661
1662fn __z_get_response<P>(conn: &mut network::KafkaConnection, parser: &P) -> Result<P::T>
1663where
1664    P: ResponseParser,
1665{
1666    let size = __get_response_size(conn)?;
1667    let resp = conn.read_exact_alloc(size as u64)?;
1668
1669    // {
1670    //     use std::fs::OpenOptions;
1671    //     use std::io::Write;
1672    //     let mut f = OpenOptions::new()
1673    //         .write(true)
1674    //         .truncate(true)
1675    //         .create(true)
1676    //         .open("/tmp/dump.dat")
1677    //         .unwrap();
1678    //     f.write_all(&resp[..]).unwrap();
1679    // }
1680
1681    parser.parse(resp)
1682}
1683
1684fn __get_response_size(conn: &mut network::KafkaConnection) -> Result<i32> {
1685    let mut buf = [0u8; 4];
1686    conn.read_exact(&mut buf)?;
1687    i32::decode_new(&mut Cursor::new(&buf))
1688}
1689
1690/// Suspends the calling thread for the configured "retry" time. This
1691/// method should be called _only_ as part of a retry attempt.
1692fn __retry_sleep(cfg: &ClientConfig) {
1693    thread::sleep(cfg.retry_backoff_time)
1694}