kafka/client/mod.rs
//! Kafka Client - A mid-level abstraction for a Kafka cluster
//! allowing the building of higher-level constructs.
3//!
4//! The entry point into this module is `KafkaClient` obtained by a
5//! call to `KafkaClient::new()`.
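//!
//! A minimal usage sketch (assuming a Kafka broker reachable at
//! `localhost:9092`):
//!
//! ```no_run
//! // ~ bootstrap the client and load the cluster metadata
//! let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
//! client.load_metadata_all().unwrap();
//! ```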
6
7use std;
8use std::collections::hash_map;
9use std::collections::hash_map::HashMap;
10use std::io::Cursor;
11use std::iter::Iterator;
12use std::mem;
13use std::thread;
14use std::time::{Duration, Instant};
15
16// pub re-export
17pub use crate::compression::Compression;
18pub use crate::utils::PartitionOffset;
19
20#[cfg(feature = "security")]
21pub use self::network::SecurityConfig;
22
23use crate::codecs::{FromByte, ToByte};
24use crate::error::{Error, KafkaCode, Result};
25use crate::protocol::{self, ResponseParser};
26
27use crate::client_internals::KafkaClientInternals;
28
29pub mod metadata;
30mod network;
31mod state;
32
33// ~ re-export (only) certain types from the protocol::fetch module as
34// 'client::fetch'.
35pub mod fetch {
36 //! A representation of fetched messages from Kafka.
37
38 pub use crate::protocol::fetch::{Data, Message, Partition, Response, Topic};
39}
40
41const DEFAULT_CONNECTION_RW_TIMEOUT_SECS: u64 = 120;
42
43fn default_conn_rw_timeout() -> Option<Duration> {
44 match DEFAULT_CONNECTION_RW_TIMEOUT_SECS {
45 0 => None,
46 n => Some(Duration::from_secs(n)),
47 }
48}
49
50/// The default value for `KafkaClient::set_compression(..)`
51pub const DEFAULT_COMPRESSION: Compression = Compression::NONE;
52
53/// The default value for `KafkaClient::set_fetch_max_wait_time(..)`
54pub const DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS: u64 = 100;
55
56/// The default value for `KafkaClient::set_fetch_min_bytes(..)`
57pub const DEFAULT_FETCH_MIN_BYTES: i32 = 4096;
58
59/// The default value for `KafkaClient::set_fetch_max_bytes(..)`
60pub const DEFAULT_FETCH_MAX_BYTES_PER_PARTITION: i32 = 32 * 1024;
61
62/// The default value for `KafkaClient::set_fetch_crc_validation(..)`
63pub const DEFAULT_FETCH_CRC_VALIDATION: bool = true;
64
65/// The default value for `KafkaClient::set_group_offset_storage(..)`
66pub const DEFAULT_GROUP_OFFSET_STORAGE: Option<GroupOffsetStorage> = None;
67
68/// The default value for `KafkaClient::set_retry_backoff_time(..)`
69pub const DEFAULT_RETRY_BACKOFF_TIME_MILLIS: u64 = 100;
70
71/// The default value for `KafkaClient::set_retry_max_attempts(..)`
72// the default value: re-attempt a repeatable operation for
// approximately up to two minutes
74pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME_MILLIS as u32;
75
76/// The default value for `KafkaClient::set_connection_idle_timeout(..)`
77pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS: u64 = 540_000;
78
79/// Client struct keeping track of brokers and topic metadata.
80///
81/// Implements methods described by the [Kafka Protocol](http://kafka.apache.org/protocol.html).
82///
83/// You will have to load metadata before making any other request.
84#[derive(Debug)]
85pub struct KafkaClient {
86 // ~ this kafka client configuration
87 config: ClientConfig,
88
89 // ~ a pool of re-usable connections to kafka brokers
90 conn_pool: network::Connections,
91
92 // ~ the current state of this client
93 state: state::ClientState,
94}
95
96#[derive(Debug)]
97struct ClientConfig {
98 client_id: String,
99 hosts: Vec<String>,
100 // ~ compression to use when sending messages
101 compression: Compression,
    // ~ these are the defaults when fetching messages; for details
    // refer to the kafka wire protocol
104 fetch_max_wait_time: i32,
105 fetch_min_bytes: i32,
106 fetch_max_bytes_per_partition: i32,
107 fetch_crc_validation: bool,
108 // ~ the version of the API to use for the corresponding kafka
109 // calls; note that this might have an effect on the storage type
110 // kafka will then use (zookeeper or __consumer_offsets). it is
    // important to use versions for both of them which target the
    // same storage type.
113 // offset_fetch_version: protocol::OffsetFetchVersion,
114 // offset_commit_version: protocol::OffsetCommitVersion,
115 offset_storage: Option<GroupOffsetStorage>,
116 // ~ the duration to wait before retrying a failed
117 // operation like refreshing group coordinators; this avoids
118 // operation retries in a tight loop.
119 retry_backoff_time: Duration,
120 // ~ the number of repeated retry attempts; prevents endless
121 // repetition of a retry attempt
122 retry_max_attempts: u32,
123}
124
125// --------------------------------------------------------------------
126
127/// Possible values when querying a topic's offset.
128/// See `KafkaClient::fetch_offsets`.
129#[derive(Debug, Copy, Clone)]
130pub enum FetchOffset {
131 /// Receive the earliest available offset.
132 Earliest,
133 /// Receive the latest offset.
134 Latest,
    /// Used to ask for all messages before a certain time; the value
    /// is a unix timestamp in milliseconds.
137 /// See https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka#WritingaDriverforKafka-Offsets
138 ByTime(i64),
139}
140
141impl FetchOffset {
142 fn to_kafka_value(self) -> i64 {
143 match self {
144 FetchOffset::Earliest => -2,
145 FetchOffset::Latest => -1,
146 FetchOffset::ByTime(n) => n,
147 }
148 }
149}
150
151// --------------------------------------------------------------------
152
153/// Defines the available storage types to utilize when fetching or
154/// committing group offsets. See also `KafkaClient::set_group_offset_storage`.
155#[derive(Debug, Copy, Clone, PartialEq, Eq)]
156pub enum GroupOffsetStorage {
157 /// Zookeeper based storage (available as of kafka 0.8.1)
158 Zookeeper,
159 /// Kafka based storage (available as of Kafka 0.8.2). This is the
160 /// preferred method for groups to store their offsets at.
161 Kafka,
162}
163
164impl GroupOffsetStorage {
165 fn offset_fetch_version(self) -> protocol::OffsetFetchVersion {
166 match self {
167 GroupOffsetStorage::Zookeeper => protocol::OffsetFetchVersion::V0,
168 GroupOffsetStorage::Kafka => protocol::OffsetFetchVersion::V1,
169 }
170 }
171 fn offset_commit_version(self) -> protocol::OffsetCommitVersion {
172 match self {
173 GroupOffsetStorage::Zookeeper => protocol::OffsetCommitVersion::V0,
174 // ~ if we knew we'll be communicating with a kafka 0.9+
175 // broker we could set the commit-version to V2; however,
176 // since we still want to support Kafka 0.8.2 versions,
177 // we'll go with the less efficient but safe option V1.
178 GroupOffsetStorage::Kafka => protocol::OffsetCommitVersion::V1,
179 }
180 }
181}
182
183/// Data point identifying a topic partition to fetch a group's offset
184/// for. See `KafkaClient::fetch_group_offsets`.
185#[derive(Debug)]
186pub struct FetchGroupOffset<'a> {
187 /// The topic to fetch the group offset for
188 pub topic: &'a str,
189 /// The partition to fetch the group offset for
190 pub partition: i32,
191}
192
193impl<'a> FetchGroupOffset<'a> {
194 #[inline]
195 pub fn new(topic: &'a str, partition: i32) -> Self {
196 FetchGroupOffset { topic, partition }
197 }
198}
199
200impl<'a> AsRef<FetchGroupOffset<'a>> for FetchGroupOffset<'a> {
201 fn as_ref(&self) -> &Self {
202 self
203 }
204}
205
206// --------------------------------------------------------------------
207
208/// Data point identifying a particular topic partition offset to be
209/// committed.
210/// See `KafkaClient::commit_offsets`.
211#[derive(Debug)]
212pub struct CommitOffset<'a> {
213 /// The offset to be committed
214 pub offset: i64,
215 /// The topic to commit the offset for
216 pub topic: &'a str,
217 /// The partition to commit the offset for
218 pub partition: i32,
219}
220
221impl<'a> CommitOffset<'a> {
222 pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
223 CommitOffset {
224 topic,
225 partition,
226 offset,
227 }
228 }
229}
230
231impl<'a> AsRef<CommitOffset<'a>> for CommitOffset<'a> {
232 fn as_ref(&self) -> &Self {
233 self
234 }
235}
236
237// --------------------------------------------------------------------
238
239/// Possible choices on acknowledgement requirements when
240/// producing/sending messages to Kafka. See
241/// `KafkaClient::produce_messages`.
242#[derive(Debug, Copy, Clone)]
243pub enum RequiredAcks {
244 /// Indicates to the receiving Kafka broker not to acknowledge
245 /// messages sent to it at all. Sending messages with this
246 /// acknowledgement requirement translates into a fire-and-forget
247 /// scenario which - of course - is very fast but not reliable.
248 None = 0,
249 /// Requires the receiving Kafka broker to wait until the sent
250 /// messages are written to local disk. Such messages can be
251 /// regarded as acknowledged by one broker in the cluster.
252 One = 1,
253 /// Requires the sent messages to be acknowledged by all in-sync
254 /// replicas of the targeted topic partitions.
255 All = -1,
256}
257
258// --------------------------------------------------------------------
259
260/// Message data to be sent/produced to a particular topic partition.
261/// See `KafkaClient::produce_messages` and `Producer::send`.
262#[derive(Debug)]
263pub struct ProduceMessage<'a, 'b> {
264 /// The "key" data of this message.
265 pub key: Option<&'b [u8]>,
266
267 /// The "value" data of this message.
268 pub value: Option<&'b [u8]>,
269
270 /// The topic to produce this message to.
271 pub topic: &'a str,
272
273 /// The partition (of the corresponding topic) to produce this
274 /// message to.
275 pub partition: i32,
276}
277
278impl<'a, 'b> AsRef<ProduceMessage<'a, 'b>> for ProduceMessage<'a, 'b> {
279 fn as_ref(&self) -> &Self {
280 self
281 }
282}
283
284impl<'a, 'b> ProduceMessage<'a, 'b> {
285 /// A convenient constructor method to create a new produce
286 /// message with all attributes specified.
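    ///
    /// A small sketch (the topic name is just a placeholder):
    ///
    /// ```no_run
    /// use kafka::client::ProduceMessage;
    ///
    /// // ~ a key-less message destined for partition 0 of "my-topic"
    /// let _msg = ProduceMessage::new("my-topic", 0, None, Some("payload".as_bytes()));
    /// ```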
287 pub fn new(
288 topic: &'a str,
289 partition: i32,
290 key: Option<&'b [u8]>,
291 value: Option<&'b [u8]>,
292 ) -> Self {
293 ProduceMessage {
294 key,
295 value,
296 topic,
297 partition,
298 }
299 }
300}
301
302// --------------------------------------------------------------------
303
304/// Partition related request data for fetching messages.
305/// See `KafkaClient::fetch_messages`.
306#[derive(Debug)]
307pub struct FetchPartition<'a> {
308 /// The topic to fetch messages from.
309 pub topic: &'a str,
310
311 /// The offset as of which to fetch messages.
312 pub offset: i64,
313
    /// The partition to fetch messages from.
315 pub partition: i32,
316
    /// Specifies the max. amount of data to fetch (for this
    /// partition). This implicitly defines the biggest message the
319 /// client can accept. If this value is too small, no messages
320 /// can be delivered. Setting this size should be in sync with
321 /// the producers to the partition.
322 ///
323 /// Zero or negative values are treated as "unspecified".
324 pub max_bytes: i32,
325}
326
327impl<'a> FetchPartition<'a> {
328 /// Creates a new "fetch messages" request structure with an
329 /// unspecified `max_bytes`.
330 pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
331 FetchPartition {
332 topic,
333 partition,
334 offset,
335 max_bytes: -1,
336 }
337 }
338
339 /// Sets the `max_bytes` value for the "fetch messages" request.
340 pub fn with_max_bytes(mut self, max_bytes: i32) -> Self {
341 self.max_bytes = max_bytes;
342 self
343 }
344}
345
346impl<'a> AsRef<FetchPartition<'a>> for FetchPartition<'a> {
347 fn as_ref(&self) -> &Self {
348 self
349 }
350}
351
/// A confirmation sent back by the Kafka broker acknowledging
/// the delivery of producer messages.
354#[derive(Debug)]
355pub struct ProduceConfirm {
356 /// The topic the messages were sent to.
357 pub topic: String,
358
359 /// The list of individual confirmations for each offset and partition.
360 pub partition_confirms: Vec<ProducePartitionConfirm>,
361}
362
/// A confirmation sent back by the Kafka broker acknowledging
/// the delivery of producer messages to a particular partition.
365#[derive(Debug)]
366pub struct ProducePartitionConfirm {
367 /// The offset assigned to the first message in the message set appended
368 /// to this partition, or an error if one occurred.
369 pub offset: std::result::Result<i64, KafkaCode>,
370
371 /// The partition to which the message(s) were appended.
372 pub partition: i32,
373}
374
375// --------------------------------------------------------------------
376
377impl KafkaClient {
378 /// Creates a new instance of KafkaClient. Before being able to
379 /// successfully use the new client, you'll have to load metadata.
380 ///
381 /// # Examples
382 ///
383 /// ```no_run
384 /// let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
385 /// client.load_metadata_all().unwrap();
386 /// ```
387 pub fn new(hosts: Vec<String>) -> KafkaClient {
388 KafkaClient {
389 config: ClientConfig {
390 client_id: String::new(),
391 hosts,
392 compression: DEFAULT_COMPRESSION,
393 fetch_max_wait_time: protocol::to_millis_i32(Duration::from_millis(
394 DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
395 ))
396 .expect("invalid default-fetch-max-time-millis"),
397 fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
398 fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
399 fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
400 offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
401 retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
402 retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
403 },
404 conn_pool: network::Connections::new(
405 default_conn_rw_timeout(),
406 Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
407 ),
408 state: state::ClientState::new(),
409 }
410 }
411
412 /// Creates a new secure instance of KafkaClient. Before being able to
413 /// successfully use the new client, you'll have to load metadata.
414 ///
415 /// # Examples
416 ///
417 /// ```no_run
418 /// extern crate openssl;
419 /// extern crate kafka;
420 ///
421 /// use openssl::ssl::{SslConnector, SslMethod, SslFiletype, SslVerifyMode};
422 /// use kafka::client::{KafkaClient, SecurityConfig};
423 ///
424 /// fn main() {
425 /// let (key, cert) = ("client.key".to_string(), "client.crt".to_string());
426 ///
427 /// // OpenSSL offers a variety of complex configurations. Here is an example:
428 /// let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
429 /// builder.set_cipher_list("DEFAULT").unwrap();
430 /// builder
431 /// .set_certificate_file(cert, SslFiletype::PEM)
432 /// .unwrap();
433 /// builder
434 /// .set_private_key_file(key, SslFiletype::PEM)
435 /// .unwrap();
436 /// builder.check_private_key().unwrap();
437 /// builder.set_default_verify_paths().unwrap();
438 /// builder.set_verify(SslVerifyMode::PEER);
439 /// let connector = builder.build();
440 ///
441 /// let mut client = KafkaClient::new_secure(vec!("localhost:9092".to_owned()),
442 /// SecurityConfig::new(connector));
443 /// client.load_metadata_all().unwrap();
444 /// }
445 /// ```
446 /// See also `SecurityConfig#with_hostname_verification` to disable hostname verification.
447 ///
    /// See also the `KafkaClient::load_metadata_all` and
    /// `KafkaClient::load_metadata` methods, the crates
450 /// [openssl](https://crates.io/crates/openssl)
451 /// and [openssl_verify](https://crates.io/crates/openssl-verify),
452 /// as well as
453 /// [Kafka's documentation](https://kafka.apache.org/documentation.html#security_ssl).
454 #[cfg(feature = "security")]
455 pub fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient {
456 KafkaClient {
457 config: ClientConfig {
458 client_id: String::new(),
459 hosts,
460 compression: DEFAULT_COMPRESSION,
461 fetch_max_wait_time: protocol::to_millis_i32(Duration::from_millis(
462 DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
463 ))
464 .expect("invalid default-fetch-max-time-millis"),
465 fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
466 fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
467 fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
468 offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
469 retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
470 retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
471 },
472 conn_pool: network::Connections::new_with_security(
473 default_conn_rw_timeout(),
474 Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
475 Some(security),
476 ),
477 state: state::ClientState::new(),
478 }
479 }
480
481 /// Exposes the hosts used for discovery of the target kafka
482 /// cluster. This set of hosts corresponds to the values supplied
483 /// to `KafkaClient::new`.
484 #[inline]
485 pub fn hosts(&self) -> &[String] {
486 &self.config.hosts
487 }
488
489 /// Sets the client_id to be sent along every request to the
490 /// remote Kafka brokers. By default, this value is the empty
491 /// string.
492 ///
493 /// Kafka brokers write out this client id to their
494 /// request/response trace log - if configured appropriately.
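    ///
    /// # Examples
    ///
    /// A small sketch (the client id shown is arbitrary):
    ///
    /// ```no_run
    /// let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.set_client_id("my-app".to_owned());
    /// ```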
495 pub fn set_client_id(&mut self, client_id: String) {
496 self.config.client_id = client_id;
497 }
498
499 /// Retrieves the current `KafkaClient::set_client_id` setting.
500 pub fn client_id(&self) -> &str {
501 &self.config.client_id
502 }
503
504 /// Sets the compression algorithm to use when sending out messages.
505 ///
506 /// # Example
507 ///
508 /// ```no_run
509 /// use kafka::client::{Compression, KafkaClient};
510 ///
511 /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
512 /// client.load_metadata_all().unwrap();
513 /// client.set_compression(Compression::NONE);
514 /// ```
515 #[inline]
516 pub fn set_compression(&mut self, compression: Compression) {
517 self.config.compression = compression;
518 }
519
520 /// Retrieves the current `KafkaClient::set_compression` setting.
521 #[inline]
522 pub fn compression(&self) -> Compression {
523 self.config.compression
524 }
525
    /// Sets the maximum time to wait for sufficient data to become
    /// available when fetching messages.
528 ///
529 /// See also `KafkaClient::set_fetch_min_bytes(..)` and
530 /// `KafkaClient::set_fetch_max_bytes_per_partition(..)`.
531 #[inline]
532 pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()> {
533 self.config.fetch_max_wait_time = protocol::to_millis_i32(max_wait_time)?;
534 Ok(())
535 }
536
537 /// Retrieves the current `KafkaClient::set_fetch_max_wait_time`
538 /// setting.
539 #[inline]
540 pub fn fetch_max_wait_time(&self) -> Duration {
541 Duration::from_millis(self.config.fetch_max_wait_time as u64)
542 }
543
544 /// Sets the minimum number of bytes of available data to wait for
545 /// as long as specified by `KafkaClient::set_fetch_max_wait_time`
546 /// when fetching messages.
547 ///
548 /// By setting higher values in combination with the timeout the
549 /// consumer can tune for throughput and trade a little additional
550 /// latency for reading only large chunks of data (e.g. setting
551 /// MaxWaitTime to 100 ms and setting MinBytes to 64k would allow
552 /// the server to wait up to 100ms to try to accumulate 64k of
553 /// data before responding).
554 ///
555 /// # Example
556 ///
557 /// ```no_run
558 /// use std::time::Duration;
559 /// use kafka::client::{KafkaClient, FetchPartition};
560 ///
561 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
562 /// client.load_metadata_all().unwrap();
563 /// client.set_fetch_max_wait_time(Duration::from_millis(100));
564 /// client.set_fetch_min_bytes(64 * 1024);
565 /// let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);
566 /// ```
567 ///
568 /// See also `KafkaClient::set_fetch_max_wait_time(..)` and
569 /// `KafkaClient::set_fetch_max_bytes_per_partition(..)`.
570 #[inline]
571 pub fn set_fetch_min_bytes(&mut self, min_bytes: i32) {
572 self.config.fetch_min_bytes = min_bytes;
573 }
574
575 /// Retrieves the current `KafkaClient::set_fetch_min_bytes`
576 /// setting.
577 #[inline]
578 pub fn fetch_min_bytes(&self) -> i32 {
579 self.config.fetch_min_bytes
580 }
581
582 /// Sets the default maximum number of bytes to obtain from _a
583 /// single kafka partition_ when fetching messages.
584 ///
585 /// This basically determines the maximum message size this client
586 /// will be able to fetch. If a topic partition contains a
587 /// message larger than this specified number of bytes, the server
588 /// will not deliver it.
589 ///
590 /// Note that this setting is related to a single partition. The
591 /// overall potential data size in a fetch messages response will
592 /// thus be determined by the number of partitions in the fetch
    /// messages request times this "max bytes per partition."
594 ///
595 /// This client will use this setting by default for all queried
596 /// partitions, however, `fetch_messages` does allow you to
597 /// override this setting for a particular partition being
598 /// queried.
599 ///
600 /// See also `KafkaClient::set_fetch_max_wait_time`,
601 /// `KafkaClient::set_fetch_min_bytes`, and `KafkaClient::fetch_messages`.
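    ///
    /// # Example
    ///
    /// A sketch combining the client-wide default with a per-request
    /// override (topic names and sizes are merely illustrative):
    ///
    /// ```no_run
    /// use kafka::client::{KafkaClient, FetchPartition};
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// // ~ default limit applied to every queried partition
    /// client.set_fetch_max_bytes_per_partition(1024 * 1024);
    /// // ~ an explicit override for one particular partition
    /// let resps = client.fetch_messages(
    ///     &[FetchPartition::new("my-topic", 0, 0).with_max_bytes(4 * 1024 * 1024)]);
    /// ```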
602 #[inline]
603 pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32) {
604 self.config.fetch_max_bytes_per_partition = max_bytes;
605 }
606
607 /// Retrieves the current
608 /// `KafkaClient::set_fetch_max_bytes_per_partition` setting.
609 #[inline]
610 pub fn fetch_max_bytes_per_partition(&self) -> i32 {
611 self.config.fetch_max_bytes_per_partition
612 }
613
    /// Specifies whether to perform CRC validation on fetched
    /// messages.
616 ///
617 /// This ensures detection of on-the-wire or on-disk corruption to
618 /// fetched messages. This check adds some overhead, so it may be
619 /// disabled in cases seeking extreme performance.
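    ///
    /// # Example
    ///
    /// A minimal sketch of disabling the check:
    ///
    /// ```no_run
    /// let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.set_fetch_crc_validation(false);
    /// ```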
620 #[inline]
621 pub fn set_fetch_crc_validation(&mut self, validate_crc: bool) {
622 self.config.fetch_crc_validation = validate_crc;
623 }
624
625 /// Retrieves the current `KafkaClient::set_fetch_crc_validation`
626 /// setting.
627 #[inline]
628 pub fn fetch_crc_validation(&self) -> bool {
629 self.config.fetch_crc_validation
630 }
631
632 /// Specifies the group offset storage to address when fetching or
633 /// committing group offsets.
634 ///
    /// In addition to Zookeeper, Kafka 0.8.2 brokers or later offer a
    /// more performant (and scalable) way to manage group offsets
    /// directly by themselves. Note that the remote storages are
    /// separate and independent of each other. Hence, you typically
    /// want to consistently hard-code your choice in your program.
640 ///
641 /// Unless you have a 0.8.1 broker or want to participate in a
642 /// group which is already based on Zookeeper, you generally want
643 /// to choose `GroupOffsetStorage::Kafka` here.
644 ///
645 /// See also `KafkaClient::fetch_group_offsets` and
646 /// `KafkaClient::commit_offsets`.
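    ///
    /// # Example
    ///
    /// A minimal sketch (group and topic names are merely placeholders):
    ///
    /// ```no_run
    /// use kafka::client::{GroupOffsetStorage, KafkaClient};
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// // ~ store group offsets in Kafka itself (requires brokers >= 0.8.2)
    /// client.set_group_offset_storage(Some(GroupOffsetStorage::Kafka));
    /// client.commit_offset("my-group", "my-topic", 0, 100).unwrap();
    /// ```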
647 #[inline]
648 pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>) {
649 self.config.offset_storage = storage;
650 }
651
652 /// Retrieves the current `KafkaClient::set_group_offset_storage`
653 /// settings.
654 pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage> {
655 self.config.offset_storage
656 }
657
658 /// Specifies the time to wait before retrying a failed,
659 /// repeatable operation against Kafka. This avoids retrying such
660 /// operations in a tight loop.
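    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// // ~ back off for 200ms between retry attempts
    /// client.set_retry_backoff_time(Duration::from_millis(200));
    /// ```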
661 #[inline]
662 pub fn set_retry_backoff_time(&mut self, time: Duration) {
663 self.config.retry_backoff_time = time;
664 }
665
666 /// Retrieves the current `KafkaClient::set_retry_backoff_time`
667 /// setting.
668 pub fn retry_backoff_time(&self) -> Duration {
669 self.config.retry_backoff_time
670 }
671
672 /// Specifies the upper limit of retry attempts for failed,
673 /// repeatable operations against kafka. This avoids retrying
674 /// them forever.
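    ///
    /// # Example
    ///
    /// A sketch deriving the attempt limit from a desired overall retry
    /// window (the 30 second budget is merely an example):
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// // ~ with a 100ms backoff, roughly 30 seconds worth of retries
    /// client.set_retry_backoff_time(Duration::from_millis(100));
    /// client.set_retry_max_attempts(30_000 / 100);
    /// ```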
675 #[inline]
676 pub fn set_retry_max_attempts(&mut self, attempts: u32) {
677 self.config.retry_max_attempts = attempts;
678 }
679
680 /// Retrieves the current `KafkaClient::set_retry_max_attempts`
681 /// setting.
682 #[inline]
683 pub fn retry_max_attempts(&self) -> u32 {
684 self.config.retry_max_attempts
685 }
686
687 /// Specifies the timeout after which idle connections will
688 /// transparently be closed/re-established by `KafkaClient`.
689 ///
690 /// To be effective this value must be smaller than the [remote
691 /// broker's `connections.max.idle.ms`
692 /// setting](https://kafka.apache.org/documentation.html#brokerconfigs).
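    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// // ~ must stay below the brokers' `connections.max.idle.ms`
    /// client.set_connection_idle_timeout(Duration::from_secs(30));
    /// ```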
693 #[inline]
694 pub fn set_connection_idle_timeout(&mut self, timeout: Duration) {
695 self.conn_pool.set_idle_timeout(timeout);
696 }
697
698 /// Retrieves the current
699 /// `KafkaClient::set_connection_idle_timeout` setting.
700 #[inline]
701 pub fn connection_idle_timeout(&self) -> Duration {
702 self.conn_pool.idle_timeout()
703 }
704
    /// Provides a view onto the currently loaded metadata of known topics.
706 ///
707 /// # Examples
708 /// ```no_run
709 /// use kafka::client::KafkaClient;
710 /// use kafka::client::metadata::Broker;
711 ///
712 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
713 /// client.load_metadata_all().unwrap();
714 /// for topic in client.topics() {
715 /// for partition in topic.partitions() {
716 /// println!("{} #{} => {}", topic.name(), partition.id(),
717 /// partition.leader()
718 /// .map(Broker::host)
719 /// .unwrap_or("no-leader!"));
720 /// }
721 /// }
722 /// ```
723 #[inline]
724 pub fn topics(&self) -> metadata::Topics<'_> {
725 metadata::Topics::new(self)
726 }
727
728 /// Resets and loads metadata for all topics from the underlying
729 /// brokers.
730 ///
731 /// # Examples
732 ///
733 /// ```no_run
734 /// let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
735 /// client.load_metadata_all().unwrap();
736 /// for topic in client.topics().names() {
737 /// println!("topic: {}", topic);
738 /// }
739 /// ```
740 ///
741 /// Returns the metadata for all loaded topics underlying this
742 /// client.
743 #[inline]
744 pub fn load_metadata_all(&mut self) -> Result<()> {
745 self.reset_metadata();
746 self.load_metadata::<&str>(&[])
747 }
748
749 /// Reloads metadata for a list of supplied topics.
750 ///
751 /// Note: if any of the specified topics does not exist yet on the
752 /// underlying brokers and these have the [configuration for "auto
753 /// create topics"
754 /// enabled](https://kafka.apache.org/documentation.html#configuration),
755 /// the remote kafka instance will create the yet missing topics
756 /// on the fly as a result of explicitly loading their metadata.
757 /// This is in contrast to other methods of this `KafkaClient`
758 /// which will silently filter out requests to
759 /// not-yet-loaded/not-yet-known topics and, thus, not cause
760 /// topics to be automatically created.
761 ///
762 /// # Examples
763 ///
764 /// ```no_run
765 /// let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
766 /// let _ = client.load_metadata(&["my-topic"]).unwrap();
767 /// ```
768 ///
769 /// Returns the metadata for _all_ loaded topics underlying this
    /// client (this might be more topics than were specified in this
    /// method call).
772 #[inline]
773 pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
774 let resp = self.fetch_metadata(topics)?;
775 self.state.update_metadata(resp)
776 }
777
778 /// Clears metadata stored in the client. You must load metadata
779 /// after this call if you want to use the client.
780 #[inline]
781 pub fn reset_metadata(&mut self) {
782 self.state.clear_metadata();
783 }
784
785 /// Fetches metadata about the specified topics from all of the
786 /// underlying brokers (`self.hosts`).
787 fn fetch_metadata<T: AsRef<str>>(
788 &mut self,
789 topics: &[T],
790 ) -> Result<protocol::MetadataResponse> {
791 let correlation = self.state.next_correlation_id();
792 let now = Instant::now();
793
794 for host in &self.config.hosts {
795 debug!("fetch_metadata: requesting metadata from {}", host);
796 match self.conn_pool.get_conn(host, now) {
797 Ok(conn) => {
798 let req =
799 protocol::MetadataRequest::new(correlation, &self.config.client_id, topics);
800 match __send_request(conn, req) {
801 Ok(_) => return __get_response::<protocol::MetadataResponse>(conn),
802 Err(e) => debug!(
803 "fetch_metadata: failed to request metadata from {}: {}",
804 host, e
805 ),
806 }
807 }
808 Err(e) => {
809 debug!("fetch_metadata: failed to connect to {}: {}", host, e);
810 }
811 }
812 }
813 Err(Error::NoHostReachable)
814 }
815
816 /// Fetch offsets for a list of topics
817 ///
818 /// # Examples
819 ///
820 /// ```no_run
821 /// use kafka::client::KafkaClient;
822 ///
823 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
824 /// client.load_metadata_all().unwrap();
825 /// let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
826 /// let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();
827 /// ```
828 ///
829 /// Returns a mapping of topic name to `PartitionOffset`s for each
830 /// currently available partition of the corresponding topic.
831 pub fn fetch_offsets<T: AsRef<str>>(
832 &mut self,
833 topics: &[T],
834 offset: FetchOffset,
835 ) -> Result<HashMap<String, Vec<PartitionOffset>>> {
836 let time = offset.to_kafka_value();
837 let n_topics = topics.len();
838
839 let state = &mut self.state;
840 let correlation = state.next_correlation_id();
841
842 // Map topic and partition to the corresponding broker
843 let config = &self.config;
844 let mut reqs: HashMap<&str, protocol::OffsetRequest<'_>> = HashMap::with_capacity(n_topics);
845 for topic in topics {
846 let topic = topic.as_ref();
847 if let Some(ps) = state.partitions_for(topic) {
848 for (id, host) in ps
849 .iter()
850 .filter_map(|(id, p)| p.broker(state).map(|b| (id, b.host())))
851 {
852 let entry = reqs.entry(host).or_insert_with(|| {
853 protocol::OffsetRequest::new(correlation, &config.client_id)
854 });
855 entry.add(topic, id, time);
856 }
857 }
858 }
859
860 // Call each broker with the request formed earlier
861 let now = Instant::now();
862 let mut res: HashMap<String, Vec<PartitionOffset>> = HashMap::with_capacity(n_topics);
863 for (host, req) in reqs {
864 let resp =
865 __send_receive::<_, protocol::OffsetResponse>(&mut self.conn_pool, host, now, req)?;
866 for tp in resp.topic_partitions {
867 let mut entry = res.entry(tp.topic);
868 let mut new_resp_offsets = None;
869 let mut err = None;
870 // Use an explicit scope here to allow insertion into a vacant entry
871 // below
872 {
873 // Return a &mut to the response we will be collecting into to
874 // return from this function. If there are some responses we have
875 // already prepared, keep collecting into that; otherwise, make a
876 // new collection to return.
877 let resp_offsets = match entry {
878 hash_map::Entry::Occupied(ref mut e) => e.get_mut(),
879 hash_map::Entry::Vacant(_) => {
880 new_resp_offsets = Some(Vec::new());
881 new_resp_offsets.as_mut().unwrap()
882 }
883 };
884 for p in tp.partitions {
885 let partition_offset = match p.to_offset() {
886 Ok(po) => po,
887 Err(code) => {
888 err = Some((p.partition, code));
889 break;
890 }
891 };
892 resp_offsets.push(partition_offset);
893 }
894 }
895 if let Some((partition, code)) = err {
896 let topic = KafkaClient::get_key_from_entry(entry);
897 return Err(Error::TopicPartitionError {
898 topic_name: topic,
899 partition_id: partition,
900 error_code: code,
901 });
902 }
903 if let hash_map::Entry::Vacant(e) = entry {
904 // unwrap is ok because if it is Vacant, it would have
905 // been made into a Some above
906 e.insert(new_resp_offsets.unwrap());
907 }
908 }
909 }
910
911 Ok(res)
912 }
913
914 /// Takes ownership back from the given HashMap Entry.
915 fn get_key_from_entry<'a, K: 'a, V: 'a>(entry: hash_map::Entry<'a, K, V>) -> K {
916 match entry {
917 hash_map::Entry::Occupied(e) => e.remove_entry().0,
918 hash_map::Entry::Vacant(e) => e.into_key(),
919 }
920 }
921
    /// Fetch offsets for a single topic.
923 ///
924 /// # Examples
925 ///
926 /// ```no_run
927 /// use kafka::client::{KafkaClient, FetchOffset};
928 ///
929 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
930 /// client.load_metadata_all().unwrap();
931 /// let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();
932 /// ```
933 ///
934 /// Returns a vector of the offset data for each available partition.
935 /// See also `KafkaClient::fetch_offsets`.
936 pub fn fetch_topic_offsets<T: AsRef<str>>(
937 &mut self,
938 topic: T,
939 offset: FetchOffset,
940 ) -> Result<Vec<PartitionOffset>> {
941 let topic = topic.as_ref();
942
943 let mut m = self.fetch_offsets(&[topic], offset)?;
944 let offs = m.remove(topic).unwrap_or_default();
945 if offs.is_empty() {
946 Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
947 } else {
948 Ok(offs)
949 }
950 }
951
    /// Fetch messages from Kafka (multiple topics / partitions).
953 ///
954 /// It takes a vector specifying the topic partitions and their
955 /// offsets as of which to fetch messages. Additionally, the
956 /// default "max fetch size per partition" can be explicitly
    /// overridden if it is "defined" - that is, if `max_bytes` is
958 /// greater than zero.
959 ///
960 /// The result is exposed in a raw, complicated manner but allows
961 /// for very efficient consumption possibilities. All of the data
    /// available through the returned fetch responses is bound to the
    /// lifetime of those responses, as that data is merely a "view"
    /// into parts of the response structs. If you need to keep individual messages
965 /// for a longer time than the whole fetch responses, you'll need
966 /// to make a copy of the message data.
967 ///
    /// * This method transparently uncompresses messages (while Kafka
    /// might send them in compressed format).
    ///
    /// * This method makes sure to skip messages with a lower offset
    /// than requested (while Kafka might, for efficiency reasons, send
    /// messages with a lower offset).
974 ///
975 /// Note: before using this method consider using
    /// `kafka::consumer::Consumer` instead, which provides an
    /// easier-to-use API for the regular use-case of fetching messages
    /// from Kafka.
979 ///
980 /// # Example
981 ///
982 /// This example demonstrates iterating all fetched messages from
983 /// two topic partitions. From one partition we allow Kafka to
    /// deliver to us the default number of bytes as defined by
985 /// `KafkaClient::set_fetch_max_bytes_per_partition`, from the
986 /// other partition we allow Kafka to deliver up to 1MiB of
987 /// messages.
988 ///
989 /// ```no_run
990 /// use kafka::client::{KafkaClient, FetchPartition};
991 ///
992 /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
993 /// client.load_metadata_all().unwrap();
994 /// let reqs = &[FetchPartition::new("my-topic", 0, 0),
995 /// FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)];
996 /// let resps = client.fetch_messages(reqs).unwrap();
997 /// for resp in resps {
998 /// for t in resp.topics() {
999 /// for p in t.partitions() {
1000 /// match p.data() {
1001 /// Err(ref e) => {
1002 /// println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
1003 /// }
1004 /// Ok(ref data) => {
1005 /// println!("topic: {} / partition: {} / latest available message offset: {}",
1006 /// t.topic(), p.partition(), data.highwatermark_offset());
1007 /// for msg in data.messages() {
1008 /// println!("topic: {} / partition: {} / message.offset: {} / message.len: {}",
1009 /// t.topic(), p.partition(), msg.offset, msg.value.len());
1010 /// }
1011 /// }
1012 /// }
1013 /// }
1014 /// }
1015 /// }
1016 /// ```
1017 /// See also `kafka::consumer`.
1018 /// See also `KafkaClient::set_fetch_max_bytes_per_partition`.
1019 pub fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<fetch::Response>>
1020 where
1021 J: AsRef<FetchPartition<'a>>,
1022 I: IntoIterator<Item = J>,
1023 {
1024 let state = &mut self.state;
1025 let config = &self.config;
1026
1027 let correlation = state.next_correlation_id();
1028
1029 // Map topic and partition to the corresponding broker
1030 let mut reqs: HashMap<&str, protocol::FetchRequest<'_, '_>> = HashMap::new();
1031 for inp in input {
1032 let inp = inp.as_ref();
1033 if let Some(broker) = state.find_broker(inp.topic, inp.partition) {
1034 reqs.entry(broker)
1035 .or_insert_with(|| {
1036 protocol::FetchRequest::new(
1037 correlation,
1038 &config.client_id,
1039 config.fetch_max_wait_time,
1040 config.fetch_min_bytes,
1041 )
1042 })
1043 .add(
1044 inp.topic,
1045 inp.partition,
1046 inp.offset,
1047 if inp.max_bytes > 0 {
1048 inp.max_bytes
1049 } else {
1050 config.fetch_max_bytes_per_partition
1051 },
1052 );
1053 }
1054 }
1055
1056 __fetch_messages(&mut self.conn_pool, config, reqs)
1057 }
1058
1059 /// Fetch messages from a single kafka partition.
1060 ///
1061 /// See `KafkaClient::fetch_messages`.
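    ///
    /// # Example
    ///
    /// A minimal sketch (topic, partition, and offset are placeholders):
    ///
    /// ```no_run
    /// use kafka::client::{KafkaClient, FetchPartition};
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// let req = FetchPartition::new("my-topic", 0, 0);
    /// let resps = client.fetch_messages_for_partition(&req).unwrap();
    /// ```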
1062 pub fn fetch_messages_for_partition<'a>(
1063 &mut self,
1064 req: &FetchPartition<'a>,
1065 ) -> Result<Vec<fetch::Response>> {
1066 self.fetch_messages(&[req])
1067 }
1068
    /// Send messages to Kafka
1070 ///
1071 /// `required_acks` - indicates how many acknowledgements the
1072 /// servers should receive before responding to the request
1073 ///
1074 /// `ack_timeout` - provides a maximum time in milliseconds the
1075 /// server can await the receipt of the number of acknowledgements
1076 /// in `required_acks`
1077 ///
1078 /// `input` - the set of `ProduceMessage`s to send
1079 ///
1080 /// Note: Unlike the higher-level `Producer` API, this method will
1081 /// *not* automatically determine the partition to deliver the
1082 /// message to. It will strictly try to send the message to the
1083 /// specified partition.
1084 ///
1085 /// Note: Trying to send messages to non-existing topics or
1086 /// non-existing partitions will result in an error.
1087 ///
1088 /// # Example
1089 ///
1090 /// ```no_run
1091 /// use std::time::Duration;
1092 /// use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks};
1093 ///
1094 /// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
1095 /// client.load_metadata_all().unwrap();
1096 /// let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())),
1097 /// ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))];
1098 /// let resp = client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req);
1099 /// println!("{:?}", resp);
1100 /// ```
1101 ///
    /// On success, the result is a vector of `ProduceConfirm`s carrying,
    /// per topic and partition, the assigned offset or an error code;
    /// on failure an `Error` is returned.
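    ///
    /// A sketch of inspecting the returned confirmations (the topic name
    /// is merely a placeholder):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks};
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes()))];
    /// let confirms = client
    ///     .produce_messages(RequiredAcks::One, Duration::from_millis(100), req)
    ///     .unwrap();
    /// for confirm in confirms {
    ///     for p in confirm.partition_confirms {
    ///         match p.offset {
    ///             Ok(o) => println!("{}:{} accepted at offset {}", confirm.topic, p.partition, o),
    ///             Err(code) => println!("{}:{} rejected: {:?}", confirm.topic, p.partition, code),
    ///         }
    ///     }
    /// }
    /// ```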
1104
1105 // XXX rework signaling an error; note that we need to either return the
1106 // messages which kafka failed to accept or otherwise tell the client about them
1107
1108 pub fn produce_messages<'a, 'b, I, J>(
1109 &mut self,
1110 acks: RequiredAcks,
1111 ack_timeout: Duration,
1112 messages: I,
1113 ) -> Result<Vec<ProduceConfirm>>
1114 where
1115 J: AsRef<ProduceMessage<'a, 'b>>,
1116 I: IntoIterator<Item = J>,
1117 {
1118 self.internal_produce_messages(acks as i16, protocol::to_millis_i32(ack_timeout)?, messages)
1119 }
1120
    /// Commit offsets for topic partitions on behalf of a consumer group.
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```no_run
1126 /// use kafka::client::{KafkaClient, CommitOffset};
1127 ///
1128 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1129 /// client.load_metadata_all().unwrap();
1130 /// client.commit_offsets("my-group",
1131 /// &[CommitOffset::new("my-topic", 0, 100),
1132 /// CommitOffset::new("my-topic", 1, 99)])
1133 /// .unwrap();
1134 /// ```
1135 ///
1136 /// In this example, we commit the offset 100 for the topic
1137 /// partition "my-topic:0" and 99 for the topic partition
1138 /// "my-topic:1". Once successfully committed, these can then be
1139 /// retrieved using `fetch_group_offsets` even from another
    /// process or at a much later point in time to resume consuming the
1141 /// topic partitions as of these offsets.
1142 pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
1143 where
1144 J: AsRef<CommitOffset<'a>>,
1145 I: IntoIterator<Item = J>,
1146 {
1147 match self.config.offset_storage {
1148 Some(offset_storage) => {
1149 let mut req = protocol::OffsetCommitRequest::new(
1150 group,
1151 offset_storage.offset_commit_version(),
1152 self.state.next_correlation_id(),
1153 &self.config.client_id,
1154 );
1155 for o in offsets {
1156 let o = o.as_ref();
1157 if self.state.contains_topic_partition(o.topic, o.partition) {
1158 req.add(o.topic, o.partition, o.offset, "");
1159 } else {
1160 return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
1161 }
1162 }
1163 if req.topic_partitions.is_empty() {
1164 debug!("commit_offsets: no offsets provided");
1165 Ok(())
1166 } else {
1167 __commit_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)
1168 }
1169 }
1170 None => Err(Error::UnsetOffsetStorage),
1171 }
1172 }
1173
1174 /// Commit offset of a particular topic partition on behalf of a
1175 /// consumer group.
1176 ///
1177 /// # Examples
1178 ///
1179 /// ```no_run
1180 /// use kafka::client::KafkaClient;
1181 ///
1182 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1183 /// client.load_metadata_all().unwrap();
1184 /// client.commit_offset("my-group", "my-topic", 0, 100).unwrap();
1185 /// ```
1186 ///
1187 /// See also `KafkaClient::commit_offsets`.
1188 pub fn commit_offset(
1189 &mut self,
1190 group: &str,
1191 topic: &str,
1192 partition: i32,
1193 offset: i64,
1194 ) -> Result<()> {
1195 self.commit_offsets(group, &[CommitOffset::new(topic, partition, offset)])
1196 }
1197
    /// Fetch offsets for a specified list of topic partitions of a consumer group
1199 ///
1200 /// # Examples
1201 ///
1202 /// ```no_run
1203 /// use kafka::client::{KafkaClient, FetchGroupOffset};
1204 ///
1205 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1206 /// client.load_metadata_all().unwrap();
1207 ///
1208 /// let offsets =
1209 /// client.fetch_group_offsets("my-group",
1210 /// &[FetchGroupOffset::new("my-topic", 0),
1211 /// FetchGroupOffset::new("my-topic", 1)])
1212 /// .unwrap();
1213 /// ```
1214 ///
1215 /// See also `KafkaClient::fetch_group_topic_offset`.
1216 pub fn fetch_group_offsets<'a, J, I>(
1217 &mut self,
1218 group: &str,
1219 partitions: I,
1220 ) -> Result<HashMap<String, Vec<PartitionOffset>>>
1221 where
1222 J: AsRef<FetchGroupOffset<'a>>,
1223 I: IntoIterator<Item = J>,
1224 {
1225 match self.config.offset_storage {
1226 Some(offset_storage) => {
1227 let mut req = protocol::OffsetFetchRequest::new(
1228 group,
1229 offset_storage.offset_fetch_version(),
1230 self.state.next_correlation_id(),
1231 &self.config.client_id,
1232 );
1233 for p in partitions {
1234 let p = p.as_ref();
1235 if self.state.contains_topic_partition(p.topic, p.partition) {
1236 req.add(p.topic, p.partition);
1237 } else {
1238 return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
1239 }
1240 }
1241 __fetch_group_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)
1242 }
1243 None => Err(Error::UnsetOffsetStorage),
1244 }
1245 }
1246
    /// Fetch offsets for all partitions of a particular topic of a consumer group
1248 ///
1249 /// # Examples
1250 ///
1251 /// ```no_run
1252 /// use kafka::client::KafkaClient;
1253 ///
1254 /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
1255 /// client.load_metadata_all().unwrap();
1256 /// let offsets = client.fetch_group_topic_offset("my-group", "my-topic").unwrap();
1257 /// ```
1258 pub fn fetch_group_topic_offset(
1259 &mut self,
1260 group: &str,
1261 topic: &str,
1262 ) -> Result<Vec<PartitionOffset>> {
1263 match self.config.offset_storage {
1264 Some(offset_storage) => {
1265 let mut req = protocol::OffsetFetchRequest::new(
1266 group,
1267 offset_storage.offset_fetch_version(),
1268 self.state.next_correlation_id(),
1269 &self.config.client_id,
1270 );
1271
1272 match self.state.partitions_for(topic) {
1273 None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
1274 Some(tp) => {
1275 for (id, _) in tp {
1276 req.add(topic, id);
1277 }
1278 }
1279 }
1280
1281 Ok(
1282 __fetch_group_offsets(req, &mut self.state, &mut self.conn_pool, &self.config)?
1283 .remove(topic)
1284 .unwrap_or_default(),
1285 )
1286 }
1287 None => Err(Error::UnsetOffsetStorage),
1288 }
1295 }
1296}
1297
1298impl KafkaClientInternals for KafkaClient {
1299 fn internal_produce_messages<'a, 'b, I, J>(
1300 &mut self,
1301 required_acks: i16,
1302 ack_timeout: i32,
1303 messages: I,
1304 ) -> Result<Vec<ProduceConfirm>>
1305 where
1306 J: AsRef<ProduceMessage<'a, 'b>>,
1307 I: IntoIterator<Item = J>,
1308 {
1309 let state = &mut self.state;
1310 let correlation = state.next_correlation_id();
1311
1312 // ~ map topic and partition to the corresponding brokers
1313 let config = &self.config;
1314 let mut reqs: HashMap<&str, protocol::ProduceRequest<'_, '_>> = HashMap::new();
1315 for msg in messages {
1316 let msg = msg.as_ref();
1317 match state.find_broker(msg.topic, msg.partition) {
1318 None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
1319 Some(broker) => reqs
1320 .entry(broker)
1321 .or_insert_with(|| {
1322 protocol::ProduceRequest::new(
1323 required_acks,
1324 ack_timeout,
1325 correlation,
1326 &config.client_id,
1327 config.compression,
1328 )
1329 })
1330 .add(msg.topic, msg.partition, msg.key, msg.value),
1331 }
1332 }
1333 __produce_messages(&mut self.conn_pool, reqs, required_acks == 0)
1334 }
1335}
1336
1337fn __get_group_coordinator<'a>(
1338 group: &str,
1339 state: &'a mut state::ClientState,
1340 conn_pool: &mut network::Connections,
1341 config: &ClientConfig,
1342 now: Instant,
1343) -> Result<&'a str> {
1344 if let Some(host) = state.group_coordinator(group) {
1345 // ~ decouple the lifetimes to make borrowck happy;
1346 // this is actually safe since we're immediately
1347 // returning this, so the follow up code is not
1348 // affected here
1349 return Ok(unsafe { mem::transmute(host) });
1350 }
1351 let correlation_id = state.next_correlation_id();
1352 let req = protocol::GroupCoordinatorRequest::new(group, correlation_id, &config.client_id);
1353 let mut attempt = 1;
1354 loop {
        // ~ ideally we'd make this work even if `load_metadata` has not
1356 // been called yet; if there are no connections available we can
1357 // try connecting to the user specified bootstrap server similar
1358 // to the way `load_metadata` works.
1359 let conn = conn_pool.get_conn_any(now).expect("available connection");
1360 debug!(
1361 "get_group_coordinator: asking for coordinator of '{}' on: {:?}",
1362 group, conn
1363 );
1364 let r = __send_receive_conn::<_, protocol::GroupCoordinatorResponse>(conn, &req)?;
1365 let retry_code = match r.into_result() {
1366 Ok(r) => {
1367 return Ok(state.set_group_coordinator(group, &r));
1368 }
1369 Err(Error::Kafka(e @ KafkaCode::GroupCoordinatorNotAvailable)) => e,
1370 Err(e) => {
1371 return Err(e);
1372 }
1373 };
1374 if attempt < config.retry_max_attempts {
1375 debug!(
1376 "get_group_coordinator: will retry request (c: {}) due to: {:?}",
1377 req.header.correlation_id, retry_code
1378 );
1379 attempt += 1;
1380 __retry_sleep(config);
1381 } else {
1382 return Err(Error::Kafka(retry_code));
1383 }
1384 }
1385}
1386
1387fn __commit_offsets(
1388 req: protocol::OffsetCommitRequest<'_, '_>,
1389 state: &mut state::ClientState,
1390 conn_pool: &mut network::Connections,
1391 config: &ClientConfig,
1392) -> Result<()> {
1393 let mut attempt = 1;
1394 loop {
1395 let now = Instant::now();
1396
1397 let tps = {
1398 let host = __get_group_coordinator(req.group, state, conn_pool, config, now)?;
1399 debug!(
1400 "__commit_offsets: sending offset commit request '{:?}' to: {}",
1401 req, host
1402 );
1403 __send_receive::<_, protocol::OffsetCommitResponse>(conn_pool, host, now, &req)?
1404 .topic_partitions
1405 };
1406
1407 let mut retry_code = None;
1408
1409 'rproc: for tp in tps {
1410 for p in tp.partitions {
1411 match p.to_error() {
1412 None => {}
1413 Some(e @ KafkaCode::GroupLoadInProgress) => {
1414 retry_code = Some(e);
1415 break 'rproc;
1416 }
1417 Some(e @ KafkaCode::NotCoordinatorForGroup) => {
1418 debug!(
1419 "commit_offsets: resetting group coordinator for '{}'",
1420 req.group
1421 );
1422 state.remove_group_coordinator(req.group);
1423 retry_code = Some(e);
1424 break 'rproc;
1425 }
1426 Some(code) => {
1427 // ~ immediately abort with the error
1428 return Err(Error::Kafka(code));
1429 }
1430 }
1431 }
1432 }
1433 match retry_code {
1434 Some(e) => {
1435 if attempt < config.retry_max_attempts {
1436 debug!(
1437 "commit_offsets: will retry request (c: {}) due to: {:?}",
1438 req.header.correlation_id, e
1439 );
1440 attempt += 1;
1441 __retry_sleep(config);
1442 }
1443 }
1444 None => {
1445 return Ok(());
1446 }
1447 }
1448 }
1449}
1450
1451fn __fetch_group_offsets(
1452 req: protocol::OffsetFetchRequest<'_, '_, '_>,
1453 state: &mut state::ClientState,
1454 conn_pool: &mut network::Connections,
1455 config: &ClientConfig,
1456) -> Result<HashMap<String, Vec<PartitionOffset>>> {
1457 let mut attempt = 1;
1458 loop {
1459 let now = Instant::now();
1460
1461 let r = {
1462 let host = __get_group_coordinator(req.group, state, conn_pool, config, now)?;
1463 debug!(
1464 "fetch_group_offsets: sending request {:?} to: {}",
1465 req, host
1466 );
1467 __send_receive::<_, protocol::OffsetFetchResponse>(conn_pool, host, now, &req)?
1468 };
1469
1470 debug!("fetch_group_offsets: received response: {:#?}", r);
1471
1472 let mut retry_code = None;
1473 let mut topic_map = HashMap::with_capacity(r.topic_partitions.len());
1474
1475 'rproc: for tp in r.topic_partitions {
1476 let mut partition_offsets = Vec::with_capacity(tp.partitions.len());
1477
1478 for p in tp.partitions {
1479 match p.get_offsets() {
1480 Ok(o) => {
1481 partition_offsets.push(o);
1482 }
1483 Err(Error::Kafka(e @ KafkaCode::GroupLoadInProgress)) => {
1484 retry_code = Some(e);
1485 break 'rproc;
1486 }
1487 Err(Error::Kafka(e @ KafkaCode::NotCoordinatorForGroup)) => {
1488 debug!(
1489 "fetch_group_offsets: resetting group coordinator for '{}'",
1490 req.group
1491 );
1492 state.remove_group_coordinator(req.group);
1493 retry_code = Some(e);
1494 break 'rproc;
1495 }
1496 Err(e) => {
1497 // ~ immediately abort with the error
1498 return Err(e);
1499 }
1500 }
1501 }
1502
1503 topic_map.insert(tp.topic, partition_offsets);
1504 }
1505
1506 // ~ have we processed the result successfully or shall we
1507 // retry once more?
1508 match retry_code {
1509 Some(e) => {
1510 if attempt < config.retry_max_attempts {
1511 debug!(
1512 "fetch_group_offsets: will retry request (c: {}) due to: {:?}",
1513 req.header.correlation_id, e
1514 );
1515 attempt += 1;
1516 __retry_sleep(config)
1517 } else {
1518 return Err(Error::Kafka(e));
1519 }
1520 }
1521 None => {
1522 return Ok(topic_map);
1523 }
1524 }
1525 }
1526}
1527
1528/// ~ carries out the given fetch requests and returns the response
1529fn __fetch_messages(
1530 conn_pool: &mut network::Connections,
1531 config: &ClientConfig,
1532 reqs: HashMap<&str, protocol::FetchRequest<'_, '_>>,
1533) -> Result<Vec<fetch::Response>> {
1534 let now = Instant::now();
1535 let mut res = Vec::with_capacity(reqs.len());
1536 for (host, req) in reqs {
1537 let p = protocol::fetch::ResponseParser {
1538 validate_crc: config.fetch_crc_validation,
1539 requests: Some(&req),
1540 };
1541 res.push(__z_send_receive(conn_pool, host, now, &req, &p)?);
1542 }
1543 Ok(res)
1544}
1545
1546/// ~ carries out the given produce requests and returns the response
1547fn __produce_messages(
1548 conn_pool: &mut network::Connections,
1549 reqs: HashMap<&str, protocol::ProduceRequest<'_, '_>>,
1550 no_acks: bool,
1551) -> Result<Vec<ProduceConfirm>> {
1552 let now = Instant::now();
1553 if no_acks {
1554 for (host, req) in reqs {
1555 __send_noack::<_, protocol::ProduceResponse>(conn_pool, host, now, req)?;
1556 }
1557 Ok(vec![])
1558 } else {
1559 let mut res: Vec<ProduceConfirm> = vec![];
1560 for (host, req) in reqs {
1561 let resp = __send_receive::<_, protocol::ProduceResponse>(conn_pool, host, now, req)?;
1562 for tpo in resp.get_response() {
1563 res.push(tpo);
1564 }
1565 }
1566 Ok(res)
1567 }
1568}
1569
1570fn __send_receive<T, V>(
1571 conn_pool: &mut network::Connections,
1572 host: &str,
1573 now: Instant,
1574 req: T,
1575) -> Result<V::R>
1576where
1577 T: ToByte,
1578 V: FromByte,
1579{
1580 __send_receive_conn::<T, V>(conn_pool.get_conn(host, now)?, req)
1581}
1582
1583fn __send_receive_conn<T, V>(conn: &mut network::KafkaConnection, req: T) -> Result<V::R>
1584where
1585 T: ToByte,
1586 V: FromByte,
1587{
1588 __send_request(conn, req)?;
1589 __get_response::<V>(conn)
1590}
1591
1592fn __send_noack<T, V>(
1593 conn_pool: &mut network::Connections,
1594 host: &str,
1595 now: Instant,
1596 req: T,
1597) -> Result<usize>
1598where
1599 T: ToByte,
1600 V: FromByte,
1601{
1602 let conn = conn_pool.get_conn(host, now)?;
1603 __send_request(conn, req)
1604}
1605
1606fn __send_request<T: ToByte>(conn: &mut network::KafkaConnection, request: T) -> Result<usize> {
1607 // ~ buffer to receive data to be sent
1608 let mut buffer = Vec::with_capacity(4);
1609 // ~ reserve bytes for the actual request size (we'll fill in that later)
1610 buffer.extend_from_slice(&[0, 0, 0, 0]);
1611 // ~ encode the request data
1612 request.encode(&mut buffer)?;
1613 // ~ put the size of the request data into the reserved area
1614 let size = buffer.len() as i32 - 4;
1615 size.encode(&mut &mut buffer[..])?;
1616
1617 trace!("__send_request: Sending bytes: {:?}", &buffer);
1618
1619 // ~ send the prepared buffer
1620 conn.send(&buffer)
1621}
1622
1623fn __get_response<T: FromByte>(conn: &mut network::KafkaConnection) -> Result<T::R> {
1624 let size = __get_response_size(conn)?;
1625 let resp = conn.read_exact_alloc(size as u64)?;
1626
1627 trace!("__get_response: received bytes: {:?}", &resp);
1628
1629 // {
1630 // use std::fs::OpenOptions;
1631 // use std::io::Write;
1632 // let mut f = OpenOptions::new()
1633 // .write(true)
1634 // .truncate(true)
1635 // .create(true)
1636 // .open("/tmp/dump.dat")
1637 // .unwrap();
1638 // f.write_all(&resp[..]).unwrap();
1639 // }
1642
1643 T::decode_new(&mut Cursor::new(resp))
1644}
1645
1646fn __z_send_receive<R, P>(
1647 conn_pool: &mut network::Connections,
1648 host: &str,
1649 now: Instant,
1650 req: R,
1651 parser: &P,
1652) -> Result<P::T>
1653where
1654 R: ToByte,
1655 P: ResponseParser,
1656{
1657 let conn = conn_pool.get_conn(host, now)?;
1658 __send_request(conn, req)?;
1659 __z_get_response(conn, parser)
1660}
1661
1662fn __z_get_response<P>(conn: &mut network::KafkaConnection, parser: &P) -> Result<P::T>
1663where
1664 P: ResponseParser,
1665{
1666 let size = __get_response_size(conn)?;
1667 let resp = conn.read_exact_alloc(size as u64)?;
1668
1669 // {
1670 // use std::fs::OpenOptions;
1671 // use std::io::Write;
1672 // let mut f = OpenOptions::new()
1673 // .write(true)
1674 // .truncate(true)
1675 // .create(true)
1676 // .open("/tmp/dump.dat")
1677 // .unwrap();
1678 // f.write_all(&resp[..]).unwrap();
1679 // }
1680
1681 parser.parse(resp)
1682}
1683
1684fn __get_response_size(conn: &mut network::KafkaConnection) -> Result<i32> {
1685 let mut buf = [0u8; 4];
1686 conn.read_exact(&mut buf)?;
1687 i32::decode_new(&mut Cursor::new(&buf))
1688}
1689
1690/// Suspends the calling thread for the configured "retry" time. This
1691/// method should be called _only_ as part of a retry attempt.
1692fn __retry_sleep(cfg: &ClientConfig) {
1693 thread::sleep(cfg.retry_backoff_time)
1694}