This library provides Kafka protocol bindings for CloudEvents using the rust-rdkafka library.
To produce CloudEvents:
use cloudevents::Event;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt};

let message_record = MessageRecord::from_event(event)?;

producer.send(
    FutureRecord::to("topic")
        .key("some_event")
        .message_record(&message_record),
    Timeout::Never
).await;
To consume CloudEvents:
use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode};
use cloudevents_sdk_rdkafka::MessageExt;
use futures::StreamExt;

let mut message_stream = consumer.start();

while let Some(message) = message_stream.next().await {
    match message {
        Err(e) => println!("Kafka error: {}", e),
        Ok(m) => {
            let event = m.to_event()?;
            println!("Received Event: {}", event);
            consumer.commit_message(&m, CommitMode::Async)?;
        }
    };
}
Structs§
- ConsumerRecordDeserializer - Wrapper for Message that implements the MessageDeserializer trait.
- MessageRecord - This struct contains a serialized CloudEvent message in the Kafka shape. Implements the StructuredSerializer & BinarySerializer traits.
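To make "the Kafka shape" more concrete, here is a minimal, std-only sketch of binary content mode as described in the CloudEvents Kafka protocol binding: each context attribute becomes a `ce_`-prefixed header, `datacontenttype` maps to the `content-type` header, and the event data becomes the record payload. `SketchEvent`, `SketchRecord`, `to_binary_record` and `example_event` are hypothetical names used only for illustration; they are not the crate's types, and the crate's own serializers may differ in detail.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a CloudEvent (NOT `cloudevents::Event`):
/// context attributes plus optional data.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchEvent {
    pub attributes: BTreeMap<String, String>,
    pub data: Option<Vec<u8>>,
}

/// Hypothetical stand-in for a record "in the Kafka shape" (NOT `MessageRecord`):
/// a list of headers and an optional payload.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchRecord {
    pub headers: Vec<(String, Vec<u8>)>,
    pub payload: Option<Vec<u8>>,
}

/// Binary content mode: every attribute becomes a `ce_`-prefixed header,
/// except `datacontenttype`, which maps to the `content-type` header.
pub fn to_binary_record(event: &SketchEvent) -> SketchRecord {
    let headers = event
        .attributes
        .iter()
        .map(|(name, value)| {
            let key = if name == "datacontenttype" {
                "content-type".to_string()
            } else {
                format!("ce_{}", name)
            };
            (key, value.clone().into_bytes())
        })
        .collect();
    SketchRecord {
        headers,
        payload: event.data.clone(),
    }
}

/// Builds a small sample event for the usage note below.
pub fn example_event() -> SketchEvent {
    let mut attributes = BTreeMap::new();
    attributes.insert("specversion".to_string(), "1.0".to_string());
    attributes.insert("id".to_string(), "0001".to_string());
    attributes.insert("type".to_string(), "example.test".to_string());
    attributes.insert("source".to_string(), "http://localhost/".to_string());
    attributes.insert("datacontenttype".to_string(), "application/json".to_string());
    SketchEvent {
        attributes,
        data: Some(b"{}".to_vec()),
    }
}
```

Calling `to_binary_record(&example_event())` yields headers such as `ce_id` and `content-type`, with the JSON bytes as the payload. In structured mode, by contrast, the whole event would be encoded into the payload and `content-type` would carry a CloudEvents media type such as `application/cloudevents+json`.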
Traits§
- BaseRecordExt - Extension Trait for BaseRecord that fills the record with a MessageRecord.
- FutureRecordExt - Extension Trait for FutureRecord that fills the record with a MessageRecord.
- MessageExt - Extension Trait for Message which acts as a wrapper for the function record_to_event().
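The extension-trait pattern behind these traits can be sketched with std-only types. The idea is that a trait adds a `message_record` method to an existing builder, so the call chains naturally with the builder's own methods. Everything here (`SketchBuilder`, `SketchMessage`, `SketchBuilderExt`, `build_example`) is a hypothetical stand-in, not the rdkafka or crate API; in the real crate the builder type is defined in rdkafka, which is exactly why an extension trait is needed.

```rust
/// Hypothetical builder standing in for a record type such as `FutureRecord`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct SketchBuilder {
    pub topic: String,
    pub key: Option<String>,
    pub headers: Vec<(String, String)>,
    pub payload: Option<Vec<u8>>,
}

impl SketchBuilder {
    /// Starts a record addressed to `topic`.
    pub fn to(topic: &str) -> Self {
        SketchBuilder {
            topic: topic.to_string(),
            ..Default::default()
        }
    }

    /// Sets the record key.
    pub fn key(mut self, key: &str) -> Self {
        self.key = Some(key.to_string());
        self
    }
}

/// Hypothetical stand-in for an already serialized message (cf. `MessageRecord`).
#[derive(Debug, Clone, PartialEq)]
pub struct SketchMessage {
    pub headers: Vec<(String, String)>,
    pub payload: Option<Vec<u8>>,
}

/// Extension trait: adds one method to the builder without touching its definition.
pub trait SketchBuilderExt {
    fn message_record(self, message: &SketchMessage) -> Self;
}

impl SketchBuilderExt for SketchBuilder {
    /// Fills the record's headers and payload from the serialized message.
    fn message_record(mut self, message: &SketchMessage) -> Self {
        self.headers = message.headers.clone();
        self.payload = message.payload.clone();
        self
    }
}

/// Usage: the extension method chains like a native builder method.
pub fn build_example() -> SketchBuilder {
    let message = SketchMessage {
        headers: vec![("ce_id".to_string(), "0001".to_string())],
        payload: Some(b"hello".to_vec()),
    };
    SketchBuilder::to("topic")
        .key("some_event")
        .message_record(&message)
}
```

As with the real traits, the extension method is only available where the trait is in scope, which is why the producer example imports `FutureRecordExt` explicitly.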
Functions§
- record_to_event - Method to transform a Message to Event.
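As a rough illustration of what turning a Kafka message back into an event involves, the following std-only sketch follows the CloudEvents Kafka protocol binding: a `content-type` header starting with `application/cloudevents` signals structured mode, anything else is treated as binary mode, and in binary mode the `ce_`-prefixed headers are mapped back to attribute names. `ContentMode`, `detect_mode` and `attributes_from_headers` are hypothetical helpers, and headers are simplified to string pairs; this is not how `record_to_event` is necessarily implemented.

```rust
use std::collections::BTreeMap;

/// The two content modes a receiver has to distinguish.
#[derive(Debug, PartialEq)]
pub enum ContentMode {
    Structured,
    Binary,
}

/// Structured mode is signalled by a `content-type` header whose value starts
/// with the CloudEvents media type; otherwise binary mode is assumed.
pub fn detect_mode(headers: &[(&str, &str)]) -> ContentMode {
    let is_structured = headers.iter().any(|(name, value)| {
        *name == "content-type" && value.starts_with("application/cloudevents")
    });
    if is_structured {
        ContentMode::Structured
    } else {
        ContentMode::Binary
    }
}

/// Binary mode: strip the `ce_` prefix to recover attribute names, map
/// `content-type` back to `datacontenttype`, and ignore unrelated headers.
pub fn attributes_from_headers(headers: &[(&str, &str)]) -> BTreeMap<String, String> {
    let mut attributes = BTreeMap::new();
    for (name, value) in headers {
        if let Some(attribute) = name.strip_prefix("ce_") {
            attributes.insert(attribute.to_string(), value.to_string());
        } else if *name == "content-type" {
            attributes.insert("datacontenttype".to_string(), value.to_string());
        }
    }
    attributes
}
```

For example, headers `ce_id = 0001` and `content-type = application/json` would be detected as binary mode and yield the attributes `id` and `datacontenttype`, while a header unrelated to CloudEvents (say, a tracing header) is simply skipped.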