pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>where
Part: Partitioner,
C: ProducerContext<Part>,{ /* private fields */ }madsim only.Expand description
Lowest level Kafka producer.
The BaseProducer needs to be polled at regular intervals in order to serve
queued delivery report callbacks (for more information, refer to the
module-level documentation).
§Example usage
This code will send a message to Kafka. No custom ProducerContext is
specified, so the DefaultProducerContext will be used. To see how to use
a producer context, refer to the examples in the examples folder.
use madsim_rdkafka::config::ClientConfig;
use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;
let producer: BaseProducer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.create()
.await
.expect("Producer creation error");
producer.send(
BaseRecord::to("destination_topic")
.payload("this is the payload")
.key("and this is a key"),
).expect("Failed to enqueue");
// Poll at regular intervals to process all the asynchronous delivery events.
for _ in 0..10 {
producer.poll(Duration::from_millis(100));
}
// And/or flush the producer before dropping it.
producer.flush(Duration::from_secs(1));Implementations§
Source§impl<C, Part> BaseProducer<C, Part>where
Part: Partitioner,
C: ProducerContext<Part>,
impl<C, Part> BaseProducer<C, Part>where
Part: Partitioner,
C: ProducerContext<Part>,
Sourcepub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32
pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32
Polls the producer, returning the number of events served.
Regular calls to poll are required to process the events and execute
the message delivery callbacks.
Sourcepub fn send<'a, K, P>(
&self,
record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
pub fn send<'a, K, P>( &self, record: BaseRecord<'a, K, P, C::DeliveryOpaque>, ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
Sends a message to Kafka.
Message fields such as key, payload, partition, timestamp etc. are
provided to this method via a BaseRecord. If the message is
correctly enqueued in the producer’s memory buffer, the method will take
ownership of the record and return immediately; in case of failure to
enqueue, the original record is returned, alongside an error code. If
the message fails to be produced after being enqueued in the buffer, the
ProducerContext::delivery method will be called asynchronously, with
the provided ProducerContext::DeliveryOpaque.
When no partition is specified the underlying Kafka library picks a partition based on a hash of the key. If no key is specified, a random partition will be used. To correctly handle errors, the delivery callback should be implemented.
Note that this method will never block.
Trait Implementations§
Source§impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>where
C: ProducerContext<Part>,
impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>where
C: ProducerContext<Part>,
Source§impl FromClientConfig for BaseProducer<DefaultProducerContext>
impl FromClientConfig for BaseProducer<DefaultProducerContext>
Source§fn from_config<'life0, 'async_trait>(
config: &'life0 ClientConfig,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<DefaultProducerContext>>> + Send + 'async_trait>>where
'life0: 'async_trait,
fn from_config<'life0, 'async_trait>(
config: &'life0 ClientConfig,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<DefaultProducerContext>>> + Send + 'async_trait>>where
'life0: 'async_trait,
Creates a new BaseProducer starting from a configuration.
Source§impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>where
Part: Partitioner,
C: ProducerContext<Part>,
impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>where
Part: Partitioner,
C: ProducerContext<Part>,
Source§fn from_config_and_context<'life0, 'async_trait>(
config: &'life0 ClientConfig,
context: C,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<C, Part>>> + Send + 'async_trait>>where
'life0: 'async_trait,
fn from_config_and_context<'life0, 'async_trait>(
config: &'life0 ClientConfig,
context: C,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<C, Part>>> + Send + 'async_trait>>where
'life0: 'async_trait,
Creates a new BaseProducer starting from a configuration and a
context.
SAFETY: Raw pointer to custom partitioner is used as opaque. It’s comes from reference to field in producer context so it’s valid as the context is valid.
Source§impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>where
Part: Partitioner,
C: ProducerContext<Part>,
impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>where
Part: Partitioner,
C: ProducerContext<Part>,
Source§fn flush<'life0, 'async_trait, T>(
&'life0 self,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn flush<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Source§fn purge(&self, flags: PurgeConfig)
fn purge(&self, flags: PurgeConfig)
Source§fn in_flight_count(&self) -> i32
fn in_flight_count(&self) -> i32
Source§fn init_transactions<'life0, 'async_trait, T>(
&'life0 self,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn init_transactions<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Source§fn begin_transaction(&self) -> KafkaResult<()>
fn begin_transaction(&self) -> KafkaResult<()>
Source§fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>(
&'life0 self,
offsets: &'life1 TopicPartitionList,
cgm: &'life2 ConsumerGroupMetadata,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>( &'life0 self, offsets: &'life1 TopicPartitionList, cgm: &'life2 ConsumerGroupMetadata, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Source§fn commit_transaction<'life0, 'async_trait, T>(
&'life0 self,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn commit_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Source§fn abort_transaction<'life0, 'async_trait, T>(
&'life0 self,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn abort_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Source§fn context(&self) -> &Arc<C>
fn context(&self) -> &Arc<C>
ProducerContext used to create this
producer.