BaseProducer

Struct BaseProducer 

Source
pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where Part: Partitioner, C: ProducerContext<Part>,
{ /* private fields */ }
Available on non-madsim only.
Expand description

Lowest level Kafka producer.

The BaseProducer needs to be polled at regular intervals in order to serve queued delivery report callbacks (for more information, refer to the module-level documentation).

§Example usage

This code will send a message to Kafka. No custom ProducerContext is specified, so the DefaultProducerContext will be used. To see how to use a producer context, refer to the examples in the examples folder.

use madsim_rdkafka::config::ClientConfig;
use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "kafka:9092")
    .create()
    .await
    .expect("Producer creation error");

producer.send(
    BaseRecord::to("destination_topic")
        .payload("this is the payload")
        .key("and this is a key"),
).expect("Failed to enqueue");

// Poll at regular intervals to process all the asynchronous delivery events.
for _ in 0..10 {
    producer.poll(Duration::from_millis(100));
}

// And/or flush the producer before dropping it.
producer.flush(Duration::from_secs(1));

Implementations§

Source§

impl<C, Part> BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

Source

pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32

Polls the producer, returning the number of events served.

Regular calls to poll are required to process the events and execute the message delivery callbacks.

Source

pub fn send<'a, K, P>( &self, record: BaseRecord<'a, K, P, C::DeliveryOpaque>, ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
where K: ToBytes + ?Sized, P: ToBytes + ?Sized,

Sends a message to Kafka.

Message fields such as key, payload, partition, timestamp etc. are provided to this method via a BaseRecord. If the message is correctly enqueued in the producer’s memory buffer, the method will take ownership of the record and return immediately; in case of failure to enqueue, the original record is returned, alongside an error code. If the message fails to be produced after being enqueued in the buffer, the ProducerContext::delivery method will be called asynchronously, with the provided ProducerContext::DeliveryOpaque.

When no partition is specified the underlying Kafka library picks a partition based on a hash of the key. If no key is specified, a random partition will be used. To correctly handle errors, the delivery callback should be implemented.

Note that this method will never block.

Trait Implementations§

Source§

impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where C: ProducerContext<Part>,

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl FromClientConfig for BaseProducer<DefaultProducerContext>

Source§

fn from_config<'life0, 'async_trait>( config: &'life0 ClientConfig, ) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<DefaultProducerContext>>> + Send + 'async_trait>>
where 'life0: 'async_trait,

Creates a new BaseProducer starting from a configuration.

Source§

impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

Source§

fn from_config_and_context<'life0, 'async_trait>( config: &'life0 ClientConfig, context: C, ) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<C, Part>>> + Send + 'async_trait>>
where 'life0: 'async_trait,

Creates a new BaseProducer starting from a configuration and a context.

SAFETY: Raw pointer to custom partitioner is used as opaque. It’s comes from reference to field in producer context so it’s valid as the context is valid.

Source§

impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

Source§

fn client(&self) -> &Client<C>

Returns the Client underlying this producer.
Source§

fn flush<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Flushes any pending messages. Read more
Source§

fn purge(&self, flags: PurgeConfig)

Purge messages currently handled by the producer instance. Read more
Source§

fn in_flight_count(&self) -> i32

Returns the number of messages that are either waiting to be sent or are sent but are waiting to be acknowledged.
Source§

fn init_transactions<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Enable sending transactions with this producer. Read more
Source§

fn begin_transaction(&self) -> KafkaResult<()>

Begins a new transaction. Read more
Source§

fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>( &'life0 self, offsets: &'life1 TopicPartitionList, cgm: &'life2 ConsumerGroupMetadata, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Associates an offset commit operation with this transaction. Read more
Source§

fn commit_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Commits the current transaction. Read more
Source§

fn abort_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Aborts the current transaction. Read more
Source§

fn context(&self) -> &Arc<C>

Returns a reference to the ProducerContext used to create this producer.

Auto Trait Implementations§

§

impl<C, Part> Freeze for BaseProducer<C, Part>

§

impl<C, Part> RefUnwindSafe for BaseProducer<C, Part>
where Part: RefUnwindSafe, C: RefUnwindSafe,

§

impl<C, Part> Send for BaseProducer<C, Part>

§

impl<C, Part> Sync for BaseProducer<C, Part>

§

impl<C, Part> Unpin for BaseProducer<C, Part>
where Part: Unpin,

§

impl<C, Part> UnwindSafe for BaseProducer<C, Part>
where Part: UnwindSafe, C: RefUnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.