grafbase_sdk/host_io/kafka/
consumer.rs1use std::{borrow::Cow, time::Duration};
2
3use chrono::{DateTime, Utc};
4
5use crate::{
6 SdkError, Subscription,
7 types::{Error, SubscriptionOutput},
8 wit,
9};
10
11use super::{KafkaAuthentication, KafkaTlsConfig};
12
13pub struct KafkaConsumer {
15 pub(super) inner: wit::KafkaConsumer,
16}
17
18impl KafkaConsumer {
19 pub fn next(&self) -> Result<Option<KafkaMessage>, SdkError> {
25 self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
26 }
27}
28
29impl Subscription for KafkaConsumer {
30 fn next(&mut self) -> Result<Option<SubscriptionOutput>, Error> {
31 let item = match KafkaConsumer::next(self) {
32 Ok(Some(item)) => item,
33 Ok(None) => return Ok(None),
34 Err(e) => return Err(format!("Error receiving Kafka message: {e}").into()),
35 };
36
37 let mut builder = SubscriptionOutput::builder();
38
39 let message: Option<serde_json::Value> = item
40 .value()
41 .map_err(|e| format!("Error parsing Kafka message as JSON: {e}"))?;
42
43 match message {
44 Some(message) => {
45 builder.push(message)?;
46 }
47 None => {
48 builder.push(serde_json::Value::Null)?;
49 }
50 }
51
52 Ok(Some(builder.build()))
53 }
54}
55
56pub struct KafkaMessage {
58 inner: wit::KafkaMessage,
59}
60
61impl KafkaMessage {
62 pub fn key(&self) -> Option<Cow<'_, str>> {
66 self.raw_key().map(|key| String::from_utf8_lossy(key))
67 }
68
69 pub fn raw_key(&self) -> Option<&[u8]> {
73 self.inner.key.as_deref()
74 }
75
76 pub fn value<S>(&self) -> Result<Option<S>, SdkError>
78 where
79 S: for<'de> serde::Deserialize<'de>,
80 {
81 match self.raw_value() {
82 Some(value) => serde_json::from_slice(value).map_err(Into::into),
83 None => Ok(None),
84 }
85 }
86
87 pub fn raw_value(&self) -> Option<&[u8]> {
91 self.inner.value.as_deref()
92 }
93
94 pub fn offset(&self) -> i64 {
98 self.inner.offset
99 }
100
101 pub fn get_header_value<S>(&self, key: &str) -> Result<Option<S>, SdkError>
107 where
108 S: for<'de> serde::Deserialize<'de>,
109 {
110 match self.get_raw_header_value(key) {
111 Some(value) => {
112 let value = serde_json::from_slice(value)?;
113 Ok(Some(value))
114 }
115 None => Ok(None),
116 }
117 }
118
119 pub fn get_raw_header_value(&self, key: &str) -> Option<&[u8]> {
121 match self.inner.headers.binary_search_by(|item| item.0.as_str().cmp(key)) {
124 Ok(index) => Some(self.inner.headers[index].1.as_ref()),
125 Err(_) => None,
126 }
127 }
128
129 pub fn timestamp(&self) -> DateTime<Utc> {
133 DateTime::from_timestamp(self.inner.timestamp, 0)
134 .expect("we converted this from a datetime in the host, it must be valid")
135 }
136
137 pub fn high_watermark(&self) -> i64 {
141 self.inner.high_watermark
142 }
143}
144
145impl From<wit::KafkaMessage> for KafkaMessage {
146 fn from(inner: wit::KafkaMessage) -> Self {
147 Self { inner }
148 }
149}
150
151pub struct KafkaConsumerConfig {
156 min_batch_size: Option<i32>,
158 max_batch_size: Option<i32>,
160 max_wait_time: Option<Duration>,
162 client_config: wit::KafkaClientConfig,
164 start_offset: wit::KafkaConsumerStartOffset,
166}
167
168impl KafkaConsumerConfig {
169 pub fn min_batch_size(&mut self, min_batch_size: i32) {
171 self.min_batch_size = Some(min_batch_size);
172 }
173
174 pub fn max_batch_size(&mut self, max_batch_size: i32) {
176 self.max_batch_size = Some(max_batch_size);
177 }
178
179 pub fn max_wait_time(&mut self, max_wait_time: Duration) {
181 self.max_wait_time = Some(max_wait_time);
182 }
183
184 pub fn from_latest_offset(&mut self) {
188 self.start_offset = wit::KafkaConsumerStartOffset::Latest;
189 }
190
191 pub fn from_earliest_offset(&mut self) {
195 self.start_offset = wit::KafkaConsumerStartOffset::Earliest;
196 }
197
198 pub fn from_specific_offset(&mut self, offset: i64) {
200 self.start_offset = wit::KafkaConsumerStartOffset::Specific(offset);
201 }
202
203 pub fn tls(&mut self, tls: KafkaTlsConfig) {
205 self.client_config.tls = Some(tls.into());
206 }
207
208 pub fn authentication(&mut self, authentication: KafkaAuthentication) {
210 self.client_config.authentication = Some(authentication.into());
211 }
212
213 pub fn partitions(&mut self, partitions: Vec<i32>) {
217 self.client_config.partitions = Some(partitions);
218 }
219}
220
221impl Default for KafkaConsumerConfig {
222 fn default() -> Self {
223 Self {
224 min_batch_size: None,
225 max_batch_size: None,
226 max_wait_time: None,
227 client_config: wit::KafkaClientConfig {
228 partitions: None,
229 tls: None,
230 authentication: None,
231 },
232 start_offset: wit::KafkaConsumerStartOffset::Latest,
233 }
234 }
235}
236
237impl From<KafkaConsumerConfig> for wit::KafkaConsumerConfig {
238 fn from(value: KafkaConsumerConfig) -> Self {
239 Self {
240 min_batch_size: value.min_batch_size,
241 max_batch_size: value.max_batch_size,
242 max_wait_ms: value.max_wait_time.map(|ms| ms.as_millis() as i32),
243 client_config: value.client_config,
244 start_offset: value.start_offset,
245 }
246 }
247}