grafbase_sdk/host_io/
kafka.rs

//! # Kafka Module
//!
//!
//! This module provides a high-level Rust API for creating and configuring Kafka producers
//! and consumers. It wraps the lower-level WIT (WebAssembly Interface Types) bindings to
//! offer a more ergonomic interface for working with Kafka topics.
7//!
8//! ## Quick Start
9//!
10//! ```rust,no_run
11//! # use grafbase_sdk::{SdkError, host_io::kafka::{self, KafkaProducerConfig}};
12//!
13//! # fn main() -> Result<(), SdkError> {
14//! // Create a basic producer configuration
15//! let config = KafkaProducerConfig::default();
16//!
17//! // Connect to Kafka
18//! let producer = kafka::producer(
19//!     "my-producer",
20//!     ["localhost:9092"],
21//!     "my-topic",
22//!     config
23//! )?;
24//! # Ok(())
25//! # }
26//! ```
27//!
28//! ## Authentication
29//!
30//! The module supports multiple authentication methods:
31//!
32//! - **SASL/PLAIN**: Username and password authentication
33//! - **SASL/SCRAM**: SHA256 and SHA512 SCRAM mechanisms
34//! - **mTLS**: Mutual TLS with client certificates
35//!
36//! ## TLS Configuration
37//!
38//! TLS connections can be configured to use either the system's certificate authority
39//! store or a custom CA certificate file.
40
41mod consumer;
42mod producer;
43
44pub use consumer::{KafkaConsumer, KafkaConsumerConfig, KafkaMessage};
45pub use producer::{KafkaBatchConfig, KafkaProducer, KafkaProducerCompression, KafkaProducerConfig};
46
47use crate::{SdkError, wit};
48use std::path::{Path, PathBuf};
49
50/// Connects to Kafka servers and creates a new Kafka producer.
51pub fn producer(
52    name: &str,
53    servers: impl IntoIterator<Item = impl ToString>,
54    topic: &str,
55    config: KafkaProducerConfig,
56) -> Result<KafkaProducer, SdkError> {
57    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
58    let config = config.into();
59    let producer = wit::KafkaProducer::connect(name, &servers, topic, &config)?;
60
61    Ok(KafkaProducer { inner: producer })
62}
63
64/// Connects to Kafka servers and creates a new Kafka consumer.
65pub fn consumer(
66    servers: impl IntoIterator<Item = impl ToString>,
67    topic: &str,
68    config: KafkaConsumerConfig,
69) -> Result<KafkaConsumer, SdkError> {
70    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
71    let config = config.into();
72    let consumer = wit::KafkaConsumer::connect(&servers, topic, &config)?;
73
74    Ok(KafkaConsumer { inner: consumer })
75}
76
/// TLS configuration options for Kafka connections.
///
/// Selects which certificate authority is used for the TLS connection: the
/// system store, or a custom CA certificate read from a file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KafkaTlsConfig {
    /// Use the system's default certificate authority store
    SystemCa,
    /// Use a custom certificate authority from the specified file path
    CustomCa(PathBuf),
}
84
85impl KafkaTlsConfig {
86    /// Creates a TLS configuration that uses the system's default certificate authority store.
87    pub fn system_ca() -> Self {
88        Self::SystemCa
89    }
90
91    /// Creates a TLS configuration that uses a custom certificate authority from a file.
92    ///
93    /// # Arguments
94    ///
95    /// * `ca_cert_path` - Path to the custom certificate authority file
96    pub fn custom_ca(ca_cert_path: impl AsRef<Path>) -> Self {
97        Self::CustomCa(ca_cert_path.as_ref().to_path_buf())
98    }
99}
100
101impl From<KafkaTlsConfig> for wit::KafkaTlsConfig {
102    fn from(value: KafkaTlsConfig) -> Self {
103        match value {
104            KafkaTlsConfig::SystemCa => wit::KafkaTlsConfig::SystemCa,
105            KafkaTlsConfig::CustomCa(path) => wit::KafkaTlsConfig::CustomCa(path.to_string_lossy().to_string()),
106        }
107    }
108}
109
/// Private representation of the authentication mechanism selected for a connection.
///
/// Each variant stores the corresponding WIT payload type directly.
enum KafkaAuthenticationInner {
    /// SASL/PLAIN payload.
    SaslPlain(wit::KafkaSaslPlainAuth),
    /// SASL/SCRAM payload.
    SaslScram(wit::KafkaSaslScramAuth),
    /// Mutual TLS payload.
    Mtls(wit::KafkaMtlsAuth),
}
115
/// Authentication configuration for Kafka connections.
///
/// The selected mechanism is kept in a private field, so values are created
/// through the constructor functions (`sasl_plain`, `sasl_scram_sha256`,
/// `sasl_scram_sha512`, `mtls`).
pub struct KafkaAuthentication {
    // NOTE(review): may hold a password — keep that in mind before exposing it via `Debug`.
    inner: KafkaAuthenticationInner,
}
120
121impl KafkaAuthentication {
122    /// Creates a SASL/PLAIN authentication configuration.
123    ///
124    /// # Arguments
125    ///
126    /// * `username` - Username for authentication
127    /// * `password` - Password for authentication
128    pub fn sasl_plain(username: impl ToString, password: impl ToString) -> Self {
129        Self {
130            inner: KafkaAuthenticationInner::SaslPlain(wit::KafkaSaslPlainAuth {
131                username: username.to_string(),
132                password: password.to_string(),
133            }),
134        }
135    }
136
137    /// Creates a SASL/SCRAM SHA256 authentication configuration.
138    ///
139    /// # Arguments
140    ///
141    /// * `username` - Username for authentication
142    /// * `password` - Password for authentication
143    pub fn sasl_scram_sha256(username: impl ToString, password: impl ToString) -> Self {
144        Self {
145            inner: KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
146                username: username.to_string(),
147                password: password.to_string(),
148                mechanism: wit::KafkaScramMechanism::Sha256,
149            }),
150        }
151    }
152
153    /// Creates a SASL/SCRAM SHA256 authentication configuration.
154    ///
155    /// # Arguments
156    ///
157    /// * `username` - Username for authentication
158    /// * `password` - Password for authentication
159    pub fn sasl_scram_sha512(username: impl ToString, password: impl ToString) -> Self {
160        Self {
161            inner: KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
162                username: username.to_string(),
163                password: password.to_string(),
164                mechanism: wit::KafkaScramMechanism::Sha512,
165            }),
166        }
167    }
168
169    /// Creates a mTLS authentication configuration.
170    ///
171    /// # Arguments
172    ///
173    /// * `cert_path` - Path to the client certificate file
174    /// * `key_path` - Path to the client key file
175    pub fn mtls(cert_path: impl AsRef<Path>, key_path: impl AsRef<Path>) -> Self {
176        Self {
177            inner: KafkaAuthenticationInner::Mtls(wit::KafkaMtlsAuth {
178                client_cert_path: cert_path.as_ref().to_string_lossy().to_string(),
179                client_key_path: key_path.as_ref().to_string_lossy().to_string(),
180            }),
181        }
182    }
183}
184
185impl From<KafkaAuthentication> for wit::KafkaAuthentication {
186    fn from(value: KafkaAuthentication) -> Self {
187        match value.inner {
188            KafkaAuthenticationInner::SaslPlain(wit::KafkaSaslPlainAuth { username, password }) => {
189                wit::KafkaAuthentication::SaslPlain(wit::KafkaSaslPlainAuth { username, password })
190            }
191            KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
192                username,
193                password,
194                mechanism,
195            }) => wit::KafkaAuthentication::SaslScram(wit::KafkaSaslScramAuth {
196                username,
197                password,
198                mechanism,
199            }),
200            KafkaAuthenticationInner::Mtls(wit::KafkaMtlsAuth {
201                client_cert_path,
202                client_key_path,
203            }) => wit::KafkaAuthentication::Mtls(wit::KafkaMtlsAuth {
204                client_cert_path,
205                client_key_path,
206            }),
207        }
208    }
209}