grafbase_sdk/host_io/kafka/
consumer.rs1use std::{borrow::Cow, time::Duration};
2
3use chrono::{DateTime, Utc};
4
5use crate::{
6 SdkError, Subscription,
7 types::{Error, Response, SubscriptionItem},
8 wit,
9};
10
11use super::{KafkaAuthentication, KafkaTlsConfig};
12
13pub struct KafkaConsumer {
15 pub(super) inner: wit::KafkaConsumer,
16}
17
18impl KafkaConsumer {
19 pub fn next(&self) -> Result<Option<KafkaMessage>, SdkError> {
25 self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
26 }
27}
28
29impl Subscription for KafkaConsumer {
30 fn next(&mut self) -> Result<Option<SubscriptionItem>, Error> {
31 match KafkaConsumer::next(self) {
32 Ok(Some(msg)) => Ok(Some(
33 Response::json(msg.inner.value.unwrap_or_else(|| b"null".into())).into(),
34 )),
35 Ok(None) => Ok(None),
36 Err(err) => Err(format!("Error receiving Kafka message: {err}").into()),
37 }
38 }
39}
40
41pub struct KafkaMessage {
43 inner: wit::KafkaMessage,
44}
45
46impl KafkaMessage {
47 pub fn key(&self) -> Option<Cow<'_, str>> {
51 self.raw_key().map(|key| String::from_utf8_lossy(key))
52 }
53
54 pub fn raw_key(&self) -> Option<&[u8]> {
58 self.inner.key.as_deref()
59 }
60
61 pub fn value<S>(&self) -> Result<Option<S>, SdkError>
63 where
64 S: for<'de> serde::Deserialize<'de>,
65 {
66 match self.raw_value() {
67 Some(value) => serde_json::from_slice(value).map_err(Into::into),
68 None => Ok(None),
69 }
70 }
71
72 pub fn raw_value(&self) -> Option<&[u8]> {
76 self.inner.value.as_deref()
77 }
78
79 pub fn into_raw_value(self) -> Option<Vec<u8>> {
81 self.inner.value
82 }
83
84 pub fn offset(&self) -> i64 {
88 self.inner.offset
89 }
90
91 pub fn get_header_value<S>(&self, key: &str) -> Result<Option<S>, SdkError>
97 where
98 S: for<'de> serde::Deserialize<'de>,
99 {
100 match self.get_raw_header_value(key) {
101 Some(value) => {
102 let value = serde_json::from_slice(value)?;
103 Ok(Some(value))
104 }
105 None => Ok(None),
106 }
107 }
108
109 pub fn get_raw_header_value(&self, key: &str) -> Option<&[u8]> {
111 match self.inner.headers.binary_search_by(|item| item.0.as_str().cmp(key)) {
114 Ok(index) => Some(self.inner.headers[index].1.as_ref()),
115 Err(_) => None,
116 }
117 }
118
119 pub fn timestamp(&self) -> DateTime<Utc> {
123 DateTime::from_timestamp(self.inner.timestamp, 0)
124 .expect("we converted this from a datetime in the host, it must be valid")
125 }
126
127 pub fn high_watermark(&self) -> i64 {
131 self.inner.high_watermark
132 }
133}
134
135impl From<wit::KafkaMessage> for KafkaMessage {
136 fn from(inner: wit::KafkaMessage) -> Self {
137 Self { inner }
138 }
139}
140
141pub struct KafkaConsumerConfig {
146 min_batch_size: Option<i32>,
148 max_batch_size: Option<i32>,
150 max_wait_time: Option<Duration>,
152 client_config: wit::KafkaClientConfig,
154 start_offset: wit::KafkaConsumerStartOffset,
156}
157
158impl KafkaConsumerConfig {
159 pub fn min_batch_size(&mut self, min_batch_size: i32) {
161 self.min_batch_size = Some(min_batch_size);
162 }
163
164 pub fn max_batch_size(&mut self, max_batch_size: i32) {
166 self.max_batch_size = Some(max_batch_size);
167 }
168
169 pub fn max_wait_time(&mut self, max_wait_time: Duration) {
171 self.max_wait_time = Some(max_wait_time);
172 }
173
174 pub fn from_latest_offset(&mut self) {
178 self.start_offset = wit::KafkaConsumerStartOffset::Latest;
179 }
180
181 pub fn from_earliest_offset(&mut self) {
185 self.start_offset = wit::KafkaConsumerStartOffset::Earliest;
186 }
187
188 pub fn from_specific_offset(&mut self, offset: i64) {
190 self.start_offset = wit::KafkaConsumerStartOffset::Specific(offset);
191 }
192
193 pub fn tls(&mut self, tls: KafkaTlsConfig) {
195 self.client_config.tls = Some(tls.into());
196 }
197
198 pub fn authentication(&mut self, authentication: KafkaAuthentication) {
200 self.client_config.authentication = Some(authentication.into());
201 }
202
203 pub fn partitions(&mut self, partitions: Vec<i32>) {
207 self.client_config.partitions = Some(partitions);
208 }
209}
210
211impl Default for KafkaConsumerConfig {
212 fn default() -> Self {
213 Self {
214 min_batch_size: None,
215 max_batch_size: None,
216 max_wait_time: None,
217 client_config: wit::KafkaClientConfig {
218 partitions: None,
219 tls: None,
220 authentication: None,
221 },
222 start_offset: wit::KafkaConsumerStartOffset::Latest,
223 }
224 }
225}
226
227impl From<KafkaConsumerConfig> for wit::KafkaConsumerConfig {
228 fn from(value: KafkaConsumerConfig) -> Self {
229 Self {
230 min_batch_size: value.min_batch_size,
231 max_batch_size: value.max_batch_size,
232 max_wait_ms: value.max_wait_time.map(|ms| ms.as_millis() as i32),
233 client_config: value.client_config,
234 start_offset: value.start_offset,
235 }
236 }
237}