grafbase_sdk/host_io/kafka/producer.rs

1use std::time::Duration;
2
3use crate::{SdkError, wit};
4
5use super::{KafkaAuthentication, KafkaTlsConfig};
6
/// A Kafka producer client for publishing messages to Kafka topics.
///
/// Wraps the host-provided producer resource; messages are sent with
/// [`KafkaProducer::produce`].
pub struct KafkaProducer {
    // Host-side (wit) producer resource backing this client; created by the
    // host I/O layer elsewhere in this module (constructor not visible here).
    pub(super) inner: wit::KafkaProducer,
}
11
12impl KafkaProducer {
13    /// Produces a message to the Kafka topic.
14    ///
15    /// # Arguments
16    ///
17    /// * `key` - Optional message key for partitioning and message ordering
18    /// * `value` - The message payload as bytes
19    pub fn produce(&self, key: Option<&str>, value: &[u8]) -> Result<(), SdkError> {
20        self.inner.produce(key, value)?;
21        Ok(())
22    }
23}
24
/// Configuration options for a Kafka producer.
///
/// Start from [`KafkaProducerConfig::default`] (no compression, no explicit
/// partitions, no batching, no TLS, no authentication) and adjust individual
/// options through the setter methods.
pub struct KafkaProducerConfig {
    /// Compression algorithm to use for message batches
    compression: wit::KafkaProducerCompression,
    /// Optional list of specific partitions to produce to
    partitions: Option<Vec<i32>>,
    /// Optional batching configuration for message processing
    batching: Option<KafkaBatchConfig>,
    /// Optional TLS configuration for secure connections
    tls: Option<wit::KafkaTlsConfig>,
    /// Optional authentication configuration
    authentication: Option<wit::KafkaAuthentication>,
}
38
39impl KafkaProducerConfig {
40    /// Sets the compression algorithm to use for message batches.
41    ///
42    /// # Arguments
43    ///
44    /// * `compression` - The compression algorithm to apply to message batches
45    pub fn compression(&mut self, compression: KafkaProducerCompression) {
46        self.compression = compression.into();
47    }
48
49    /// Sets the specific partitions that this producer should send messages to.
50    ///
51    /// # Arguments
52    ///
53    /// * `partitions` - A list of partition IDs to produce messages to
54    pub fn partitions(&mut self, partitions: Vec<i32>) {
55        self.partitions = Some(partitions);
56    }
57
58    /// Sets the batching configuration for message processing.
59    ///
60    /// # Arguments
61    ///
62    /// * `batching` - The batching configuration settings to use
63    pub fn batching(&mut self, batching: KafkaBatchConfig) {
64        self.batching = Some(batching);
65    }
66
67    /// Sets the TLS configuration for secure connections to Kafka brokers.
68    pub fn tls(&mut self, tls: KafkaTlsConfig) {
69        self.tls = Some(tls.into());
70    }
71
72    /// Sets the authentication configuration for connecting to Kafka brokers.
73    ///
74    /// # Arguments
75    ///
76    /// * `authentication` - The authentication settings to use
77    pub fn authentication(&mut self, authentication: KafkaAuthentication) {
78        self.authentication = Some(authentication.into());
79    }
80}
81
/// Configuration options for Kafka message batching.
///
/// A plain-old-data type (both fields are `Copy`), so it derives the standard
/// comparison and debugging traits for ergonomic use by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct KafkaBatchConfig {
    /// Maximum number of bytes to include in a single batch
    pub max_size_bytes: u64,
    /// Time to wait before sending a batch of messages
    pub linger: Duration,
}
89
/// Compression algorithms available for Kafka message batches.
///
/// Fieldless public enum: derives the standard traits so callers can copy,
/// compare, and debug-print the selected algorithm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KafkaProducerCompression {
    /// No compression applied to messages
    None,
    /// Gzip compression algorithm
    Gzip,
    /// Snappy compression algorithm
    Snappy,
    /// LZ4 compression algorithm
    Lz4,
    /// Zstandard compression algorithm
    Zstd,
}
103
104impl From<KafkaProducerCompression> for wit::KafkaProducerCompression {
105    fn from(value: KafkaProducerCompression) -> Self {
106        match value {
107            KafkaProducerCompression::None => wit::KafkaProducerCompression::None,
108            KafkaProducerCompression::Gzip => wit::KafkaProducerCompression::Gzip,
109            KafkaProducerCompression::Snappy => wit::KafkaProducerCompression::Snappy,
110            KafkaProducerCompression::Lz4 => wit::KafkaProducerCompression::Lz4,
111            KafkaProducerCompression::Zstd => wit::KafkaProducerCompression::Zstd,
112        }
113    }
114}
115
116impl From<KafkaProducerConfig> for wit::KafkaProducerConfig {
117    fn from(value: KafkaProducerConfig) -> Self {
118        Self {
119            compression: value.compression,
120            client_config: wit::KafkaClientConfig {
121                partitions: value.partitions,
122                tls: value.tls,
123                authentication: value.authentication,
124            },
125            batching: value.batching.map(Into::into),
126        }
127    }
128}
129
130impl From<KafkaBatchConfig> for wit::KafkaBatchConfig {
131    fn from(value: KafkaBatchConfig) -> Self {
132        Self {
133            linger_ms: value.linger.as_millis() as u64,
134            batch_size_bytes: value.max_size_bytes,
135        }
136    }
137}
138
139impl Default for KafkaProducerConfig {
140    fn default() -> Self {
141        Self {
142            compression: wit::KafkaProducerCompression::None,
143            partitions: None,
144            batching: None,
145            tls: None,
146            authentication: None,
147        }
148    }
149}