1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
//! This library provides Kafka protocol bindings for CloudEvents //! using the [rust-rdkafka](https://fede1024.github.io/rust-rdkafka) library. //! //! To produce Cloudevents: //! //! ``` //! //! use cloudevents::Event; //! use rdkafka::producer::{FutureProducer, FutureRecord}; //! use rdkafka::util::Timeout; //! use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt}; //! //! # async fn produce(producer: &FutureProducer, event: Event) -> Result<(), Box<dyn std::error::Error>> { //! let message_record = MessageRecord::from_event(event)?; //! //! producer.send( //! FutureRecord::to("topic") //! .key("some_event") //! .message_record(&message_record), //! Timeout::Never //! ).await; //! # Ok(()) //! # } //! //! ``` //! //! To consume Cloudevents: //! //! ``` //! use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode}; //! use cloudevents_sdk_rdkafka::MessageExt; //! use futures::StreamExt; //! //! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) -> Result<(), Box<dyn std::error::Error>> { //! let mut message_stream = consumer.start(); //! //! while let Some(message) = message_stream.next().await { //! match message { //! Err(e) => println!("Kafka error: {}", e), //! Ok(m) => { //! let event = m.to_event()?; //! println!("Received Event: {}", event); //! consumer.commit_message(&m, CommitMode::Async)?; //! } //! }; //! } //! # Ok(()) //! # } //! ``` #![doc(html_root_url = "https://docs.rs/cloudevents-sdk-rdkafka/0.3.0")] #![deny(broken_intra_doc_links)] #[macro_use] mod headers; mod kafka_consumer_record; mod kafka_producer_record; pub use kafka_consumer_record::record_to_event; pub use kafka_consumer_record::ConsumerRecordDeserializer; pub use kafka_consumer_record::MessageExt; pub use kafka_producer_record::BaseRecordExt; pub use kafka_producer_record::FutureRecordExt; pub use kafka_producer_record::MessageRecord;