grafbase_sdk/host_io/
kafka.rs1mod consumer;
42mod producer;
43
44pub use consumer::{KafkaConsumer, KafkaConsumerConfig, KafkaMessage};
45pub use producer::{KafkaBatchConfig, KafkaProducer, KafkaProducerCompression, KafkaProducerConfig};
46
47use crate::{SdkError, wit};
48use std::path::{Path, PathBuf};
49
50pub fn producer(
52 name: &str,
53 servers: impl IntoIterator<Item = impl ToString>,
54 topic: &str,
55 config: KafkaProducerConfig,
56) -> Result<KafkaProducer, SdkError> {
57 let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
58 let config = config.into();
59 let producer = wit::KafkaProducer::connect(name, &servers, topic, &config)?;
60
61 Ok(KafkaProducer { inner: producer })
62}
63
64pub fn consumer(
66 servers: impl IntoIterator<Item = impl ToString>,
67 topic: &str,
68 config: KafkaConsumerConfig,
69) -> Result<KafkaConsumer, SdkError> {
70 let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
71 let config = config.into();
72 let consumer = wit::KafkaConsumer::connect(&servers, topic, &config)?;
73
74 Ok(KafkaConsumer { inner: consumer })
75}
76
77pub enum KafkaTlsConfig {
79 SystemCa,
81 CustomCa(PathBuf),
83}
84
85impl KafkaTlsConfig {
86 pub fn system_ca() -> Self {
88 Self::SystemCa
89 }
90
91 pub fn custom_ca(ca_cert_path: impl AsRef<Path>) -> Self {
97 Self::CustomCa(ca_cert_path.as_ref().to_path_buf())
98 }
99}
100
101impl From<KafkaTlsConfig> for wit::KafkaTlsConfig {
102 fn from(value: KafkaTlsConfig) -> Self {
103 match value {
104 KafkaTlsConfig::SystemCa => wit::KafkaTlsConfig::SystemCa,
105 KafkaTlsConfig::CustomCa(path) => wit::KafkaTlsConfig::CustomCa(path.to_string_lossy().to_string()),
106 }
107 }
108}
109
110enum KafkaAuthenticationInner {
111 SaslPlain(wit::KafkaSaslPlainAuth),
112 SaslScram(wit::KafkaSaslScramAuth),
113 Mtls(wit::KafkaMtlsAuth),
114}
115
116pub struct KafkaAuthentication {
118 inner: KafkaAuthenticationInner,
119}
120
121impl KafkaAuthentication {
122 pub fn sasl_plain(username: impl ToString, password: impl ToString) -> Self {
129 Self {
130 inner: KafkaAuthenticationInner::SaslPlain(wit::KafkaSaslPlainAuth {
131 username: username.to_string(),
132 password: password.to_string(),
133 }),
134 }
135 }
136
137 pub fn sasl_scram_sha256(username: impl ToString, password: impl ToString) -> Self {
144 Self {
145 inner: KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
146 username: username.to_string(),
147 password: password.to_string(),
148 mechanism: wit::KafkaScramMechanism::Sha256,
149 }),
150 }
151 }
152
153 pub fn sasl_scram_sha512(username: impl ToString, password: impl ToString) -> Self {
160 Self {
161 inner: KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
162 username: username.to_string(),
163 password: password.to_string(),
164 mechanism: wit::KafkaScramMechanism::Sha512,
165 }),
166 }
167 }
168
169 pub fn mtls(cert_path: impl AsRef<Path>, key_path: impl AsRef<Path>) -> Self {
176 Self {
177 inner: KafkaAuthenticationInner::Mtls(wit::KafkaMtlsAuth {
178 client_cert_path: cert_path.as_ref().to_string_lossy().to_string(),
179 client_key_path: key_path.as_ref().to_string_lossy().to_string(),
180 }),
181 }
182 }
183}
184
185impl From<KafkaAuthentication> for wit::KafkaAuthentication {
186 fn from(value: KafkaAuthentication) -> Self {
187 match value.inner {
188 KafkaAuthenticationInner::SaslPlain(wit::KafkaSaslPlainAuth { username, password }) => {
189 wit::KafkaAuthentication::SaslPlain(wit::KafkaSaslPlainAuth { username, password })
190 }
191 KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
192 username,
193 password,
194 mechanism,
195 }) => wit::KafkaAuthentication::SaslScram(wit::KafkaSaslScramAuth {
196 username,
197 password,
198 mechanism,
199 }),
200 KafkaAuthenticationInner::Mtls(wit::KafkaMtlsAuth {
201 client_cert_path,
202 client_key_path,
203 }) => wit::KafkaAuthentication::Mtls(wit::KafkaMtlsAuth {
204 client_cert_path,
205 client_key_path,
206 }),
207 }
208 }
209}