grafbase_sdk/host_io/kafka/
producer.rs1use std::time::Duration;
2
3use crate::{SdkError, wit};
4
5use super::{KafkaAuthentication, KafkaTlsConfig};
6
7pub struct KafkaProducer {
9 pub(super) inner: wit::KafkaProducer,
10}
11
12impl KafkaProducer {
13 pub fn produce(&self, key: Option<&str>, value: &[u8]) -> Result<(), SdkError> {
20 self.inner.produce(key, value)?;
21 Ok(())
22 }
23}
24
25pub struct KafkaProducerConfig {
27 compression: wit::KafkaProducerCompression,
29 partitions: Option<Vec<i32>>,
31 batching: Option<KafkaBatchConfig>,
33 tls: Option<wit::KafkaTlsConfig>,
35 authentication: Option<wit::KafkaAuthentication>,
37}
38
39impl KafkaProducerConfig {
40 pub fn compression(&mut self, compression: KafkaProducerCompression) {
46 self.compression = compression.into();
47 }
48
49 pub fn partitions(&mut self, partitions: Vec<i32>) {
55 self.partitions = Some(partitions);
56 }
57
58 pub fn batching(&mut self, batching: KafkaBatchConfig) {
64 self.batching = Some(batching);
65 }
66
67 pub fn tls(&mut self, tls: KafkaTlsConfig) {
69 self.tls = Some(tls.into());
70 }
71
72 pub fn authentication(&mut self, authentication: KafkaAuthentication) {
78 self.authentication = Some(authentication.into());
79 }
80}
81
82pub struct KafkaBatchConfig {
84 pub max_size_bytes: u64,
86 pub linger: Duration,
88}
89
90pub enum KafkaProducerCompression {
92 None,
94 Gzip,
96 Snappy,
98 Lz4,
100 Zstd,
102}
103
104impl From<KafkaProducerCompression> for wit::KafkaProducerCompression {
105 fn from(value: KafkaProducerCompression) -> Self {
106 match value {
107 KafkaProducerCompression::None => wit::KafkaProducerCompression::None,
108 KafkaProducerCompression::Gzip => wit::KafkaProducerCompression::Gzip,
109 KafkaProducerCompression::Snappy => wit::KafkaProducerCompression::Snappy,
110 KafkaProducerCompression::Lz4 => wit::KafkaProducerCompression::Lz4,
111 KafkaProducerCompression::Zstd => wit::KafkaProducerCompression::Zstd,
112 }
113 }
114}
115
116impl From<KafkaProducerConfig> for wit::KafkaProducerConfig {
117 fn from(value: KafkaProducerConfig) -> Self {
118 Self {
119 compression: value.compression,
120 client_config: wit::KafkaClientConfig {
121 partitions: value.partitions,
122 tls: value.tls,
123 authentication: value.authentication,
124 },
125 batching: value.batching.map(Into::into),
126 }
127 }
128}
129
130impl From<KafkaBatchConfig> for wit::KafkaBatchConfig {
131 fn from(value: KafkaBatchConfig) -> Self {
132 Self {
133 linger_ms: value.linger.as_millis() as u64,
134 batch_size_bytes: value.max_size_bytes,
135 }
136 }
137}
138
139impl Default for KafkaProducerConfig {
140 fn default() -> Self {
141 Self {
142 compression: wit::KafkaProducerCompression::None,
143 partitions: None,
144 batching: None,
145 tls: None,
146 authentication: None,
147 }
148 }
149}