[−][src]Crate cloudevents_sdk_rdkafka
This library provides Kafka protocol bindings for CloudEvents using the rust-rdkafka library.
To produce Cloudevents:
use cloudevents::Event; use rdkafka::producer::{FutureProducer, FutureRecord}; use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt}; let message_record = MessageRecord::from_event(event) .expect("error while serializing the event"); producer.send( FutureRecord::to("topic") .key("some_event") .message_record(&message_record), 0 ).await;
To consume Cloudevents:
use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode}; use cloudevents_sdk_rdkafka::MessageExt; use futures::StreamExt; let mut message_stream = consumer.start(); while let Some(message) = message_stream.next().await { match message { Err(e) => println!("Kafka error: {}", e), Ok(m) => { let event = m.to_event().expect("error while deserializing record to CloudEvent"); println!("Received Event: {:#?}", event); consumer.commit_message(&m, CommitMode::Async).unwrap(); } }; }
Structs
ConsumerRecordDeserializer | Wrapper for |
MessageRecord | This struct contains a serialized CloudEvent message in the Kafka shape.
Implements [ |
Traits
BaseRecordExt | Extension Trait for |
FutureRecordExt | Extension Trait for |
MessageExt | Extension Trait for |
Functions
record_to_event | Method to transform a |