Crate cloudevents_sdk_rdkafka

This library provides Kafka protocol bindings for CloudEvents using the rust-rdkafka library.

To produce CloudEvents:


use cloudevents::Event;
use rdkafka::producer::{FutureProducer, FutureRecord};
use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt};

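// `event` is a cloudevents::Event and `producer` is a configured FutureProducer;
// both are assumed to be created elsewhere.
let message_record = MessageRecord::from_event(event)
    .expect("error while serializing the event");

// Attach the serialized event to the outgoing record and send it.
producer.send(
    FutureRecord::to("topic")
        .key("some_event")
        .message_record(&message_record),
    0
).await;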

To consume CloudEvents:

use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode};
use cloudevents_sdk_rdkafka::MessageExt;
use futures::StreamExt;

// `consumer` is a StreamConsumer that is assumed to be configured and subscribed elsewhere.
let mut message_stream = consumer.start();

while let Some(message) = message_stream.next().await {
    match message {
        Err(e) => println!("Kafka error: {}", e),
        Ok(m) => {
            let event = m.to_event().expect("error while deserializing record to CloudEvent");
            println!("Received Event: {:#?}", event);
            consumer.commit_message(&m, CommitMode::Async).unwrap();
        }
    };
}

Structs

ConsumerRecordDeserializer

Wrapper for Message that implements the MessageDeserializer trait.

MessageRecord

A serialized CloudEvent message in the Kafka shape. Implements the StructuredSerializer and BinarySerializer traits.
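To make "the Kafka shape" more concrete, here is a minimal, std-only sketch of what a binary-mode record could look like under the CloudEvents Kafka protocol binding: context attributes become `ce_`-prefixed headers, the data content type becomes the `content-type` header, and the event data becomes the record value. `SimpleEvent`, `SimpleRecord`, and `to_binary_record` are hypothetical stand-ins invented for illustration. They are not types from this crate, and the sketch does not claim to mirror how MessageRecord is implemented internally.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a CloudEvent (not `cloudevents::Event`).
struct SimpleEvent {
    id: String,
    source: String,
    ty: String,
    datacontenttype: Option<String>,
    data: Option<Vec<u8>>,
}

/// Hypothetical stand-in for a serialized record: Kafka headers plus an optional value.
struct SimpleRecord {
    headers: BTreeMap<String, Vec<u8>>,
    payload: Option<Vec<u8>>,
}

/// Binary content mode: attributes go into `ce_`-prefixed headers,
/// the event data goes into the record value unchanged.
fn to_binary_record(event: SimpleEvent) -> SimpleRecord {
    let mut headers = BTreeMap::new();
    headers.insert("ce_specversion".to_string(), b"1.0".to_vec());
    headers.insert("ce_id".to_string(), event.id.into_bytes());
    headers.insert("ce_source".to_string(), event.source.into_bytes());
    headers.insert("ce_type".to_string(), event.ty.into_bytes());
    // The data content type is carried in a plain `content-type` header.
    if let Some(content_type) = event.datacontenttype {
        headers.insert("content-type".to_string(), content_type.into_bytes());
    }
    SimpleRecord {
        headers,
        payload: event.data,
    }
}
```

In structured mode the whole event would instead be encoded (for example as JSON) into the record value; that case is left out to keep the sketch short.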

Traits

BaseRecordExt

Extension trait for BaseRecord that fills the record with a MessageRecord.

FutureRecordExt

Extension trait for FutureRecord that fills the record with a MessageRecord.

MessageExt

Extension trait for Message that wraps the function record_to_event().
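The record-filling traits above follow the general extension-trait pattern: a trait adds a builder-style method to a type defined elsewhere. The sketch below illustrates that pattern with std-only stand-ins. `SerializedEvent`, `OutgoingRecord`, and `OutgoingRecordExt` are hypothetical names made up for this example; only the method name `message_record` is borrowed from the usage shown earlier, and the real signatures in this crate may differ.

```rust
/// Hypothetical stand-in for a serialized event (plays the role of MessageRecord).
struct SerializedEvent {
    headers: Vec<(String, Vec<u8>)>,
    payload: Option<Vec<u8>>,
}

/// Hypothetical stand-in for a producer record (plays the role of FutureRecord).
struct OutgoingRecord<'a> {
    topic: &'a str,
    headers: Vec<(String, Vec<u8>)>,
    payload: Option<&'a [u8]>,
}

impl<'a> OutgoingRecord<'a> {
    /// Start an empty record addressed to `topic`.
    fn to(topic: &'a str) -> Self {
        OutgoingRecord {
            topic,
            headers: Vec::new(),
            payload: None,
        }
    }
}

/// Extension trait: adds a builder-style method to `OutgoingRecord`.
trait OutgoingRecordExt<'a> {
    fn message_record(self, record: &'a SerializedEvent) -> Self;
}

impl<'a> OutgoingRecordExt<'a> for OutgoingRecord<'a> {
    fn message_record(mut self, record: &'a SerializedEvent) -> Self {
        // Copy the headers and borrow the payload from the serialized event.
        self.headers = record.headers.clone();
        self.payload = record.payload.as_deref();
        self
    }
}
```

Because the payload is borrowed, the serialized event has to outlive the record, which matches the by-reference call `.message_record(&message_record)` in the producer example.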

Functions

record_to_event

Function that transforms a Message into an Event.
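The relationship between a free conversion function and a `to_event()` method that wraps it can be sketched with std-only stand-ins. `IncomingMessage`, `DecodedEvent`, `message_to_event`, and `IncomingMessageExt` are hypothetical names for illustration only; the error type is simplified to `String`, and only binary-mode `ce_` headers are read. The real function in this crate works on rdkafka messages and may behave differently.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a consumed Kafka message.
struct IncomingMessage {
    headers: HashMap<String, String>,
    payload: Option<Vec<u8>>,
}

/// Hypothetical stand-in for a decoded CloudEvent.
#[derive(Debug, PartialEq)]
struct DecodedEvent {
    id: String,
    ty: String,
    data: Option<Vec<u8>>,
}

/// Free function that performs the conversion (plays the role of record_to_event).
fn message_to_event(msg: &IncomingMessage) -> Result<DecodedEvent, String> {
    // Required attributes are read from `ce_`-prefixed headers.
    let id = msg
        .headers
        .get("ce_id")
        .ok_or_else(|| "missing ce_id header".to_string())?
        .clone();
    let ty = msg
        .headers
        .get("ce_type")
        .ok_or_else(|| "missing ce_type header".to_string())?
        .clone();
    Ok(DecodedEvent {
        id,
        ty,
        data: msg.payload.clone(),
    })
}

/// Extension trait whose only job is to expose the function as a method.
trait IncomingMessageExt {
    fn to_event(&self) -> Result<DecodedEvent, String>;
}

impl IncomingMessageExt for IncomingMessage {
    fn to_event(&self) -> Result<DecodedEvent, String> {
        message_to_event(self)
    }
}
```

With this layout, `msg.to_event()` and `message_to_event(&msg)` return the same result; the trait only provides the method-call syntax used in the consumer example.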