rdkafka/producer/mod.rs

//! Kafka producers.
//!
//! ## The C librdkafka producer
//!
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka,
//! so in order to understand how the Rust producers work it is important to
//! understand the basics of the C one as well.
//!
//! ### Async
//!
//! The librdkafka producer is completely asynchronous: it maintains a memory
//! buffer where messages waiting to be sent or currently in flight are stored.
//! Once a message is delivered, or has failed and the maximum number of
//! retries has been reached, the producer will enqueue a delivery event with
//! the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at
//! regular intervals to process those events; the thread calling `poll` will be
//! the one executing the user-specified delivery callback for every delivery
//! event. If `poll` is not called, or not called frequently enough, the
//! producer will return a [`RDKafkaErrorCode::QueueFull`] error and it won't be
//! able to send any other message until more delivery events are processed via
//! `poll`. The `QueueFull` error can also be returned if Kafka is not able to
//! receive the messages quickly enough.
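//!
//! As a rough sketch (not the only way to structure this), a [`BaseProducer`]
//! can be driven as follows; the broker address and topic name are
//! placeholders:
//!
//! ```no_run
//! use std::time::Duration;
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // Enqueue a message in the producer's memory buffer; this call does not
//! // wait for the broker to acknowledge it.
//! if let Err((e, _record)) = producer.send(
//!     BaseRecord::to("my-topic").payload("payload").key("key"),
//! ) {
//!     eprintln!("failed to enqueue message: {}", e);
//! }
//!
//! // Serve delivery events; without regular calls to `poll`, the internal
//! // queue eventually fills up and `send` starts returning `QueueFull`.
//! producer.poll(Duration::from_millis(100));
//!
//! // Before shutting down, wait for all outstanding deliveries.
//! producer.flush(Duration::from_secs(10)).expect("flush failed");
//! ```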
//!
//! ### Error reporting
//!
//! The C library will try to deal with all transient errors, such as broker
//! disconnections and timeouts. These errors, called global errors, are
//! automatically logged in rust-rdkafka, but they normally don't require any
//! handling as they are handled internally. To see the logs, make sure you
//! initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported
//! in the delivery callback.
//!
//! ### Buffering
//!
//! Buffering is done automatically by librdkafka. When `send` is called, the
//! message is enqueued internally and once enough messages have been enqueued,
//! or when enough time has passed, they will be sent to Kafka as a single
//! batch. You can control the behavior of the buffer by configuring the
//! `queue.buffering.max.*` parameters listed below.
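//!
//! For instance, a minimal sketch of tuning the buffering parameters through a
//! `ClientConfig` might look as follows (the broker address and the values are
//! placeholders, not recommendations):
//!
//! ```no_run
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::BaseProducer;
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     // Allow up to 100000 messages in the producer queue.
//!     .set("queue.buffering.max.messages", "100000")
//!     // Wait up to 5ms for a batch to fill before sending it.
//!     .set("queue.buffering.max.ms", "5")
//!     .create()
//!     .expect("producer creation failed");
//! ```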
//!
//! ## `rust-rdkafka` producers
//!
//! `rust-rdkafka` (rdkafka for brevity) provides two sets of producers: low
//! level and high level.
//!
//! ### Low-level producers
//!
//! The lowest level producer provided by rdkafka is called [`BaseProducer`].
//! The goal of the `BaseProducer` is to be as close as possible to the C one
//! while maintaining a safe Rust interface. In particular, the `BaseProducer`
//! needs to be polled at regular intervals to execute any delivery callback
//! that might be waiting and to make sure the queue doesn't fill up.
//!
//! Another low-level producer is the [`ThreadedProducer`], which is a
//! `BaseProducer` with a dedicated thread for polling.
//!
//! The delivery callback can be defined using a `ProducerContext`. See the
//! [`base_producer`] module for more information.
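//!
//! As an illustration only (the context type and its behavior here are made up
//! for the example), a delivery callback that simply logs the outcome of every
//! delivery could be wired up like this:
//!
//! ```no_run
//! use rdkafka::client::ClientContext;
//! use rdkafka::config::ClientConfig;
//! use rdkafka::message::Message;
//! use rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};
//!
//! struct LoggingContext;
//!
//! impl ClientContext for LoggingContext {}
//!
//! impl ProducerContext for LoggingContext {
//!     type DeliveryOpaque = ();
//!
//!     fn delivery(&self, result: &DeliveryResult<'_>, _opaque: ()) {
//!         match result {
//!             Ok(msg) => println!("delivered to partition {}", msg.partition()),
//!             Err((e, _msg)) => eprintln!("delivery failed: {}", e),
//!         }
//!     }
//! }
//!
//! let producer: BaseProducer<LoggingContext> = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create_with_context(LoggingContext)
//!     .expect("producer creation failed");
//! ```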
//!
//! ### High-level producer
//!
//! At the moment the only high-level producer implemented is the
//! [`FutureProducer`]. The `FutureProducer` doesn't rely on user-defined
//! callbacks to report the delivery or failure of a message; instead, this
//! information is returned in a future. The `FutureProducer` also uses an
//! internal thread for polling, so calling `poll` explicitly is not necessary.
//! The returned future will contain information about the delivered message in
//! case of success, or a copy of the original message in case of failure.
//! Additional computation can be chained to the returned future, and it will be
//! executed by the future executor once the value is available (for more
//! information, check the documentation of the futures crate).
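//!
//! As a minimal sketch (the broker address and topic name are placeholders,
//! and the example assumes it runs inside an async context):
//!
//! ```no_run
//! # async fn produce() {
//! use std::time::Duration;
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{FutureProducer, FutureRecord};
//!
//! let producer: FutureProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // The returned future resolves once the delivery result is known.
//! let delivery = producer
//!     .send(
//!         FutureRecord::to("my-topic").payload("payload").key("key"),
//!         Duration::from_secs(0),
//!     )
//!     .await;
//!
//! match delivery {
//!     Ok(_) => println!("message delivered"),
//!     Err((e, _original_message)) => eprintln!("delivery failed: {}", e),
//! }
//! # }
//! ```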
//!
//! ## Transactions
//!
//! All rust-rdkafka producers support transactions. Transactional producers
//! work together with transaction-aware consumers configured with the default
//! `isolation.level` of `read_committed`.
//!
//! To configure a producer for transactions, set `transactional.id` to an
//! identifier unique to the application when creating the producer. After
//! creating the producer, you must initialize it with
//! [`Producer::init_transactions`].
//!
//! To start a new transaction, use [`Producer::begin_transaction`]. There can be
//! **only one ongoing transaction** at a time per producer. All records sent
//! after starting a transaction and before committing or aborting it will
//! automatically be associated with that transaction.
//!
//! Once you have initialized transactions on a producer, you are not permitted
//! to produce messages outside of a transaction.
//!
//! Consumer offsets can be sent as part of the ongoing transaction using
//! `send_offsets_to_transaction` and will be committed atomically with the
//! other records sent in the transaction.
//!
//! The current transaction can be committed with
//! [`Producer::commit_transaction`] or aborted using
//! [`Producer::abort_transaction`]. Afterwards, a new transaction can begin.
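//!
//! As a sketch of the overall flow (broker address, `transactional.id`, and
//! topic name are placeholders, and error handling is reduced to `expect`):
//!
//! ```no_run
//! use std::time::Duration;
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .set("transactional.id", "example-transactional-id")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // Fence off previous incarnations of this transactional id.
//! producer
//!     .init_transactions(Duration::from_secs(30))
//!     .expect("init_transactions failed");
//!
//! producer.begin_transaction().expect("begin_transaction failed");
//!
//! // Records sent from now on belong to the open transaction.
//! producer
//!     .send(BaseRecord::to("my-topic").payload("payload").key("key"))
//!     .expect("failed to enqueue message");
//!
//! producer
//!     .commit_transaction(Duration::from_secs(30))
//!     .expect("commit_transaction failed");
//! ```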
//!
//! ### Errors
//!
//! Errors returned by transaction methods may (see the sketch below the list
//! for how to distinguish these cases):
//!
//! * be retriable ([`RDKafkaError::is_retriable`]), in which case the operation
//!   that encountered the error may be retried.
//! * require abort ([`RDKafkaError::txn_requires_abort`]), in which case the
//!   current transaction must be aborted and a new transaction begun.
//! * be fatal ([`RDKafkaError::is_fatal`]), in which case the producer must be
//!   stopped and the application terminated.
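//!
//! For example, a transactional error surfaces as the `KafkaError::Transaction`
//! variant, and its inner `RDKafkaError` can be inspected roughly like this
//! (producer setup elided, timeouts arbitrary):
//!
//! ```no_run
//! # use std::time::Duration;
//! # use rdkafka::config::ClientConfig;
//! # use rdkafka::producer::{BaseProducer, Producer};
//! use rdkafka::error::KafkaError;
//!
//! # let producer: BaseProducer = ClientConfig::new().create().unwrap();
//! if let Err(KafkaError::Transaction(e)) =
//!     producer.commit_transaction(Duration::from_secs(30))
//! {
//!     if e.txn_requires_abort() {
//!         // The transaction is in an abortable state: abort and start over.
//!         producer
//!             .abort_transaction(Duration::from_secs(30))
//!             .expect("abort_transaction failed");
//!     } else if e.is_retriable() {
//!         // The same operation may be retried.
//!     } else if e.is_fatal() {
//!         // The producer can no longer be used.
//!     }
//! }
//! ```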
//!
//! For more details about transactions, see the [Transactional Producer]
//! section of the librdkafka introduction.
//!
//! ## Configuration
//!
//! ### Producer configuration
//!
//! For the configuration parameters common to both producers and consumers,
//! refer to the documentation in the `config` module. The most commonly used
//! producer configuration parameters are listed below. Click
//! [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
//! for the full list.
//!
//! - `queue.buffering.max.messages`: Maximum number of messages allowed on the
//!   producer queue. Default: 100000.
//! - `queue.buffering.max.kbytes`: Maximum total message size sum allowed on
//!   the producer queue. This property has higher priority than
//!   `queue.buffering.max.messages`. Default: 4000000.
//! - `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in
//!   the producer queue to accumulate before sending a request to the brokers.
//!   A higher value allows larger and more effective (less overhead, improved
//!   compression) batches of messages to accumulate at the expense of increased
//!   message delivery latency. Default: 0.
//! - `message.send.max.retries`: How many times to retry sending a failing
//!   batch. Note: retrying may cause reordering. Default: 2.
//! - `compression.codec`: Compression codec to use for compressing message
//!   sets. Default: none.
//! - `request.required.acks`: This field indicates how many acknowledgements
//!   the leader broker must receive from ISR brokers before responding to the
//!   request: 0=Broker does not send any response/ack to client, 1=Only the
//!   leader broker will need to ack the message, -1 or all=broker will block
//!   until message is committed by all in sync replicas (ISRs) or broker's
//!   `in.sync.replicas` setting before sending response. Default: 1.
//! - `request.timeout.ms`: The ack timeout of the producer request in
//!   milliseconds. This value is only enforced by the broker and relies on
//!   `request.required.acks` being != 0. Default: 5000.
//! - `message.timeout.ms`: Local message timeout. This value is only enforced
//!   locally and limits the time a produced message waits for successful
//!   delivery. A time of 0 is infinite. Default: 300000.
//!
//! [`RDKafkaErrorCode::QueueFull`]: crate::error::RDKafkaErrorCode::QueueFull
//! [`RDKafkaError::is_retriable`]: crate::error::RDKafkaError::is_retriable
//! [`RDKafkaError::txn_requires_abort`]: crate::error::RDKafkaError::txn_requires_abort
//! [`RDKafkaError::is_fatal`]: crate::error::RDKafkaError::is_fatal
//! [Transactional Producer]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer

use std::sync::Arc;

use crate::client::{Client, ClientContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::KafkaResult;
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub mod base_producer;
pub mod future_producer;

#[doc(inline)]
pub use self::base_producer::{BaseProducer, BaseRecord, DeliveryResult, ThreadedProducer};
#[doc(inline)]
pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord};

//
// ********** PRODUCER CONTEXT **********
//

/// Producer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// producer events. Refer to the list of methods to check which callbacks can
/// be specified. It can also specify a custom partitioner to register, which
/// will be used to decide the partition each message is written to.
///
/// In particular, it can be used to specify the `delivery` callback that will
/// be called when the acknowledgement for a delivered message is received.
///
/// See also the [`ClientContext`] trait.
pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContext {
    /// A `DeliveryOpaque` is a user-defined structure that will be passed to
    /// the producer when producing a message, and returned to the `delivery`
    /// method once the message has been delivered, or failed to.
    type DeliveryOpaque: IntoOpaque;

    /// This method will be called once the message has been delivered (or
    /// failed to). The `DeliveryOpaque` will be the one provided by the user
    /// when calling `send`.
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);

    /// This method is called when creating the producer in order to optionally
    /// register a custom partitioner. If no custom partitioner is provided, the
    /// `partitioner` configuration property is used (or its default).
    ///
    /// `sticky.partitioning.linger.ms` must be 0 to run the custom partitioner
    /// for messages with a null key. See
    /// <https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196>.
    fn get_custom_partitioner(&self) -> Option<&Part> {
        None
    }
}

/// Unassigned partition.
/// See RD_KAFKA_PARTITION_UA from librdkafka.
pub const PARTITION_UA: i32 = -1;

/// Trait allowing to customize the partitioning of messages.
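///
/// As an illustration only (the type and the partitioning scheme below are
/// made up for the example), a partitioner could be implemented like this:
///
/// ```
/// use rdkafka::producer::{Partitioner, PARTITION_UA};
///
/// struct KeyLengthPartitioner;
///
/// impl Partitioner for KeyLengthPartitioner {
///     fn partition(
///         &self,
///         _topic_name: &str,
///         key: Option<&[u8]>,
///         partition_cnt: i32,
///         is_partition_available: impl Fn(i32) -> bool,
///     ) -> i32 {
///         // Toy scheme: derive the partition from the key length, falling
///         // back to partition 0 for unkeyed messages, and to PARTITION_UA
///         // when the chosen partition has no active leader.
///         let partition = key.map_or(0, |k| k.len() as i32 % partition_cnt.max(1));
///         if is_partition_available(partition) {
///             partition
///         } else {
///             PARTITION_UA
///         }
///     }
/// }
/// ```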
pub trait Partitioner {
    /// Returns the partition to use for `topic_name`.
    ///
    /// * `topic_name` is the name of the topic to which a message is being produced.
    /// * `partition_cnt` is the number of partitions for this topic.
    /// * `key` is the optional key of the message.
    /// * `is_partition_available` is a function that can be called to check if
    ///   a partition has an active leader broker.
    ///
    /// This method may be called in any thread at any time, and may be called
    /// multiple times for the same message/key. It MUST NOT block or execute
    /// for prolonged periods of time, and MUST return a value between 0 and
    /// `partition_cnt - 1`, or the special `RD_KAFKA_PARTITION_UA` value if
    /// partitioning could not be performed. See the documentation for
    /// `rd_kafka_topic_conf_set_partitioner_cb` in librdkafka for more info.
    fn partition(
        &self,
        topic_name: &str,
        key: Option<&[u8]>,
        partition_cnt: i32,
        is_partition_available: impl Fn(i32) -> bool,
    ) -> i32;
}

/// Placeholder used when no custom partitioner is needed.
#[derive(Clone)]
pub struct NoCustomPartitioner {}

impl Partitioner for NoCustomPartitioner {
    fn partition(
        &self,
        _topic_name: &str,
        _key: Option<&[u8]>,
        _partition_cnt: i32,
        _is_partition_available: impl Fn(i32) -> bool,
    ) -> i32 {
        panic!("NoCustomPartitioner should not be called");
    }
}

/// An inert producer context that can be used when customizations are not
/// required.
#[derive(Clone)]
pub struct DefaultProducerContext;

impl ClientContext for DefaultProducerContext {}
impl ProducerContext<NoCustomPartitioner> for DefaultProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
}

/// Common trait for all producers.
pub trait Producer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    /// Returns the [`Client`] underlying this producer.
    fn client(&self) -> &Client<C>;

    /// Returns a reference to the [`ProducerContext`] used to create this
    /// producer.
    fn context(&self) -> &Arc<C> {
        self.client().context()
    }

    /// Returns the number of messages that are either waiting to be sent or are
    /// sent but are waiting to be acknowledged.
    fn in_flight_count(&self) -> i32;

    /// Flushes any pending messages.
    ///
    /// This method should be called before termination to ensure delivery of
    /// all enqueued messages. It will call `poll()` internally.
    fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Purge messages currently handled by the producer instance.
    ///
    /// See the [`PurgeConfig`] documentation for the list of flags that may be provided.
    ///
    /// If providing an empty set of flags, nothing will be purged.
    ///
    /// The application will need to call `::poll()` or `::flush()`
    /// afterwards to serve the delivery report callbacks of the purged messages.
    ///
    /// Messages purged from internal queues fail with the delivery report
    /// error code set to
    /// [`KafkaError::MessageProduction(RDKafkaErrorCode::PurgeQueue)`](crate::error::RDKafkaErrorCode::PurgeQueue),
    /// while purged messages that are in-flight to or from the broker will fail
    /// with the error code set to
    /// [`KafkaError::MessageProduction(RDKafkaErrorCode::PurgeInflight)`](crate::error::RDKafkaErrorCode::PurgeInflight).
    ///
    /// This call may block for a short time while background thread queues are purged.
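    ///
    /// For example, a sketch of dropping the queued (but not in-flight)
    /// messages during shutdown might look as follows (producer setup elided):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # use rdkafka::config::ClientConfig;
    /// # use rdkafka::producer::{BaseProducer, Producer, PurgeConfig};
    /// # let producer: BaseProducer = ClientConfig::new().create().unwrap();
    /// producer.purge(PurgeConfig::default().queue());
    /// // Serve the delivery report callbacks of the purged messages.
    /// producer.flush(Duration::from_secs(1)).expect("flush failed");
    /// ```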
    fn purge(&self, flags: PurgeConfig);

    /// Enable sending transactions with this producer.
    ///
    /// # Prerequisites
    ///
    /// * The configuration used to create the producer must include a
    ///   `transactional.id` setting.
    /// * You must not have sent any messages or called any of the other
    ///   transaction-related functions.
    ///
    /// # Details
    ///
    /// This function ensures any transactions initiated by previous producers
    /// with the same `transactional.id` are completed. Any transactions left
    /// open by any such previous producers will be aborted.
    ///
    /// Once previous transactions have been fenced, this function acquires an
    /// internal producer ID and epoch that will be used by all transactional
    /// messages sent by this producer.
    ///
    /// If this function returns successfully, messages may only be sent to this
    /// producer when a transaction is active. See
    /// [`Producer::begin_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Begins a new transaction.
    ///
    /// # Prerequisites
    ///
    /// You must have successfully called [`Producer::init_transactions`].
    ///
    /// # Details
    ///
    /// This function begins a new transaction, and implicitly associates that
    /// open transaction with this producer.
    ///
    /// After a successful call to this function, any messages sent via this
    /// producer or any calls to [`Producer::send_offsets_to_transaction`] will
    /// be implicitly associated with this transaction, until the transaction is
    /// finished.
    ///
    /// Finish the transaction by calling [`Producer::commit_transaction`] or
    /// [`Producer::abort_transaction`].
    ///
    /// While a transaction is open, you must perform at least one transaction
    /// operation every `transaction.timeout.ms` to avoid timing out the
    /// transaction on the broker.
    fn begin_transaction(&self) -> KafkaResult<()>;

    /// Associates an offset commit operation with this transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Sends a list of topic partition offsets to the consumer group
    /// coordinator for `cgm`, and marks the offsets as part of the current
    /// transaction. These offsets will be considered committed only if the
    /// transaction is committed successfully.
    ///
    /// The offsets should be the next message your application will consume,
    /// i.e., one greater than the last processed message's offset for each
    /// partition.
    ///
    /// Use this method at the end of a consume-transform-produce loop, prior to
    /// committing the transaction with [`Producer::commit_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// # Hints
    ///
    /// To obtain the correct consumer group metadata, call
    /// [`Consumer::group_metadata`] on the consumer for which offsets are being
    /// committed.
    ///
    /// The consumer must not have automatic commits enabled.
    ///
    /// [`Consumer::group_metadata`]: crate::consumer::Consumer::group_metadata
    fn send_offsets_to_transaction<T: Into<Timeout>>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;

    /// Commits the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be flushed (i.e., delivered) before
    /// actually committing the transaction.
    ///
    /// If any of the outstanding messages fail permanently, the current
    /// transaction will enter an abortable error state and this function will
    /// return an abortable error. You must then call
    /// [`Producer::abort_transaction`] before attempting to create another
    /// transaction.
    ///
    /// This function may block for the specified `timeout`.
    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Aborts the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be purged and failed with
    /// [`RDKafkaErrorCode::PurgeInflight`] or [`RDKafkaErrorCode::PurgeQueue`].
    ///
    /// This function should also be used to recover from non-fatal abortable
    /// transaction errors.
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// [`RDKafkaErrorCode::PurgeInflight`]: crate::error::RDKafkaErrorCode::PurgeInflight
    /// [`RDKafkaErrorCode::PurgeQueue`]: crate::error::RDKafkaErrorCode::PurgeQueue
    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
}

/// Settings to provide to [`Producer::purge`] to parametrize the purge behavior.
///
/// `PurgeConfig::default()` corresponds to a setting where nothing is purged.
///
/// # Example
/// To purge both queued messages and in-flight messages:
/// ```
/// # use rdkafka::producer::PurgeConfig;
/// let settings = PurgeConfig::default().queue().inflight();
/// ```
#[derive(Default, Clone, Copy)]
pub struct PurgeConfig {
    flag_bits: i32,
}
impl PurgeConfig {
    /// Purge messages in internal queues. This does not purge inflight messages.
    #[inline]
    pub fn queue(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_QUEUE,
        }
    }
    /// Purge messages in-flight to or from the broker.
    /// Purging these messages will void any future acknowledgements from the
    /// broker, making it impossible for the application to know if these
    /// messages were successfully delivered or not.
    /// Retrying these messages may lead to duplicates.
    ///
    /// This does not purge messages in internal queues.
    #[inline]
    pub fn inflight(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_INFLIGHT,
        }
    }
    /// Don't wait for background thread queue purging to finish.
    #[inline]
    pub fn non_blocking(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_NON_BLOCKING,
        }
    }
}

macro_rules! negative_and_debug_impls {
    ($($f: ident -> !$set_fn: ident,)*) => {
        impl PurgeConfig {
            $(
                #[inline]
                #[doc = concat!("Unsets the flag set by [`", stringify!($set_fn), "`](PurgeConfig::", stringify!($set_fn),")")]
                pub fn $f(self) -> Self {
                    Self {
                        flag_bits: self.flag_bits & !PurgeConfig::default().$set_fn().flag_bits,
                    }
                }
            )*
        }
        impl std::fmt::Debug for PurgeConfig {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Simulate a struct that holds a set of booleans
                let mut d = f.debug_struct("PurgeConfig");
                $(
                    d.field(
                        stringify!($set_fn),
                        &((self.flag_bits & Self::default().$set_fn().flag_bits) != 0),
                    );
                )*
                d.finish()
            }
        }
    };
}
negative_and_debug_impls! {
    no_queue -> !queue,
    no_inflight -> !inflight,
    blocking -> !non_blocking,
}