avantis_utils/
kafka.rs

1use bytes::Bytes;
2use rdkafka::producer::FutureRecord;
3use serde::Deserialize;
4use std::ops::Deref;
5
6pub mod consumer;
7pub mod producer;
8
9#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
10pub struct KafkaConfig {
11    pub brokers_csv: String,
12    pub flush_duration_millis: u64,
13    pub poll_duration_millis: u64,
14    pub security_protocol: Option<String>,
15}
16
17pub struct ProtobufKafkaRecord<'a> {
18    pub topic: &'a str,
19    pub message: ProtobufKafkaMessage,
20}
21
22pub struct ProtobufKafkaMessage {
23    pub key: String,
24    pub value: Bytes,
25}
26
27impl<'a> From<&'a ProtobufKafkaRecord<'a>> for FutureRecord<'a, String, [u8]> {
28    fn from(record: &'a ProtobufKafkaRecord<'a>) -> FutureRecord<'a, String, [u8]> {
29        FutureRecord::to(record.topic)
30            .key(&record.message.key)
31            .payload(record.message.value.deref())
32    }
33}