grafbase_sdk/host_io/kafka/
consumer.rs

1use std::{borrow::Cow, time::Duration};
2
3use chrono::{DateTime, Utc};
4
5use crate::{
6    SdkError, Subscription,
7    types::{Error, SubscriptionOutput},
8    wit,
9};
10
11use super::{KafkaAuthentication, KafkaTlsConfig};
12
/// A Kafka consumer that can receive messages from Kafka topics.
pub struct KafkaConsumer {
    // Handle to the host-side consumer resource; visible only within this module
    // so the parent module can construct it from the WIT bindings.
    pub(super) inner: wit::KafkaConsumer,
}
17
18impl KafkaConsumer {
19    /// Retrieves the next message from the Kafka consumer.
20    ///
21    /// Returns `Ok(Some(message))` if a message is available,
22    /// `Ok(None)` if no message is currently available,
23    /// or `Err` if an error occurred while consuming.
24    pub fn next(&self) -> Result<Option<KafkaMessage>, SdkError> {
25        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
26    }
27}
28
29impl Subscription for KafkaConsumer {
30    fn next(&mut self) -> Result<Option<SubscriptionOutput>, Error> {
31        let item = match KafkaConsumer::next(self) {
32            Ok(Some(item)) => item,
33            Ok(None) => return Ok(None),
34            Err(e) => return Err(format!("Error receiving Kafka message: {e}").into()),
35        };
36
37        let mut builder = SubscriptionOutput::builder();
38
39        let message: Option<serde_json::Value> = item
40            .value()
41            .map_err(|e| format!("Error parsing Kafka message as JSON: {e}"))?;
42
43        match message {
44            Some(message) => {
45                builder.push(message)?;
46            }
47            None => {
48                builder.push(serde_json::Value::Null)?;
49            }
50        }
51
52        Ok(Some(builder.build()))
53    }
54}
55
/// A Kafka message containing key, value, headers, and metadata.
pub struct KafkaMessage {
    // Raw message as delivered by the host bindings; the accessor methods
    // below decode key/value/headers on demand.
    inner: wit::KafkaMessage,
}
60
61impl KafkaMessage {
62    /// Returns the message key as a UTF-8 string, if present.
63    ///
64    /// The key is used for partitioning and message ordering within a partition.
65    pub fn key(&self) -> Option<Cow<'_, str>> {
66        self.raw_key().map(|key| String::from_utf8_lossy(key))
67    }
68
69    /// Returns the raw message key as bytes, if present.
70    ///
71    /// This provides access to the unprocessed message key without UTF-8 conversion.
72    pub fn raw_key(&self) -> Option<&[u8]> {
73        self.inner.key.as_deref()
74    }
75
76    /// Deserializes the message value from JSON.
77    pub fn value<S>(&self) -> Result<Option<S>, SdkError>
78    where
79        S: for<'de> serde::Deserialize<'de>,
80    {
81        match self.raw_value() {
82            Some(value) => serde_json::from_slice(value).map_err(Into::into),
83            None => Ok(None),
84        }
85    }
86
87    /// Returns the raw message value as bytes, if present.
88    ///
89    /// This provides access to the unprocessed message value without any JSON deserialization.
90    pub fn raw_value(&self) -> Option<&[u8]> {
91        self.inner.value.as_deref()
92    }
93
94    /// Returns the message offset within the partition.
95    ///
96    /// The offset is a unique identifier for the message within its partition.
97    pub fn offset(&self) -> i64 {
98        self.inner.offset
99    }
100
101    /// Gets a header value by key and deserializes it from JSON.
102    ///
103    /// Returns `Ok(Some(value))` if the header exists and can be deserialized,
104    /// `Ok(None)` if the header doesn't exist,
105    /// or `Err` if deserialization fails.
106    pub fn get_header_value<S>(&self, key: &str) -> Result<Option<S>, SdkError>
107    where
108        S: for<'de> serde::Deserialize<'de>,
109    {
110        match self.get_raw_header_value(key) {
111            Some(value) => {
112                let value = serde_json::from_slice(value)?;
113                Ok(Some(value))
114            }
115            None => Ok(None),
116        }
117    }
118
119    /// Gets a raw header value by key as bytes.
120    pub fn get_raw_header_value(&self, key: &str) -> Option<&[u8]> {
121        // The kafka headers come as btree, which means they are sorted by the key.
122        // This means we can use binary search to find the header value.
123        match self.inner.headers.binary_search_by(|item| item.0.as_str().cmp(key)) {
124            Ok(index) => Some(self.inner.headers[index].1.as_ref()),
125            Err(_) => None,
126        }
127    }
128
129    /// Returns the timestamp when the message was produced.
130    ///
131    /// The timestamp is in UTC and represents when the message was created.
132    pub fn timestamp(&self) -> DateTime<Utc> {
133        DateTime::from_timestamp(self.inner.timestamp, 0)
134            .expect("we converted this from a datetime in the host, it must be valid")
135    }
136
137    /// Returns the high watermark for the partition.
138    ///
139    /// The high watermark indicates the offset of the last message + 1 in the partition.
140    pub fn high_watermark(&self) -> i64 {
141        self.inner.high_watermark
142    }
143}
144
145impl From<wit::KafkaMessage> for KafkaMessage {
146    fn from(inner: wit::KafkaMessage) -> Self {
147        Self { inner }
148    }
149}
150
/// Configuration for a Kafka consumer.
///
/// This struct contains settings that control how the consumer behaves,
/// including batch sizes, wait times, client configuration, and starting offset.
pub struct KafkaConsumerConfig {
    /// Minimum number of messages to batch together before returning.
    min_batch_size: Option<i32>,
    /// Maximum number of messages to batch together.
    max_batch_size: Option<i32>,
    /// Maximum time to wait for messages before returning a partial batch.
    /// Converted to whole milliseconds when handed to the host.
    max_wait_time: Option<Duration>,
    /// Client configuration including partitions, TLS, and authentication.
    client_config: wit::KafkaClientConfig,
    /// The offset position to start consuming from. Defaults to `Latest`.
    start_offset: wit::KafkaConsumerStartOffset,
}
167
168impl KafkaConsumerConfig {
169    /// Sets the minimum number of messages to batch together before returning.
170    pub fn min_batch_size(&mut self, min_batch_size: i32) {
171        self.min_batch_size = Some(min_batch_size);
172    }
173
174    /// Sets the maximum number of messages to batch together.
175    pub fn max_batch_size(&mut self, max_batch_size: i32) {
176        self.max_batch_size = Some(max_batch_size);
177    }
178
179    /// Sets the maximum time to wait for messages before returning a partial batch.
180    pub fn max_wait_time(&mut self, max_wait_time: Duration) {
181        self.max_wait_time = Some(max_wait_time);
182    }
183
184    /// Configures the consumer to start consuming from the latest offset.
185    ///
186    /// This means the consumer will only receive new messages that arrive after it starts.
187    pub fn from_latest_offset(&mut self) {
188        self.start_offset = wit::KafkaConsumerStartOffset::Latest;
189    }
190
191    /// Configures the consumer to start consuming from the earliest offset.
192    ///
193    /// This means the consumer will receive all available messages in the topic from the beginning.
194    pub fn from_earliest_offset(&mut self) {
195        self.start_offset = wit::KafkaConsumerStartOffset::Earliest;
196    }
197
198    /// Configures the consumer to start consuming from a specific offset.
199    pub fn from_specific_offset(&mut self, offset: i64) {
200        self.start_offset = wit::KafkaConsumerStartOffset::Specific(offset);
201    }
202
203    /// Sets the TLS configuration for secure connections to Kafka brokers.
204    pub fn tls(&mut self, tls: KafkaTlsConfig) {
205        self.client_config.tls = Some(tls.into());
206    }
207
208    /// Sets the authentication configuration for connecting to Kafka brokers.
209    pub fn authentication(&mut self, authentication: KafkaAuthentication) {
210        self.client_config.authentication = Some(authentication.into());
211    }
212
213    /// Sets the specific partitions to consume from.
214    ///
215    /// If not specified, the consumer will consume from all partitions of the topic.
216    pub fn partitions(&mut self, partitions: Vec<i32>) {
217        self.client_config.partitions = Some(partitions);
218    }
219}
220
221impl Default for KafkaConsumerConfig {
222    fn default() -> Self {
223        Self {
224            min_batch_size: None,
225            max_batch_size: None,
226            max_wait_time: None,
227            client_config: wit::KafkaClientConfig {
228                partitions: None,
229                tls: None,
230                authentication: None,
231            },
232            start_offset: wit::KafkaConsumerStartOffset::Latest,
233        }
234    }
235}
236
237impl From<KafkaConsumerConfig> for wit::KafkaConsumerConfig {
238    fn from(value: KafkaConsumerConfig) -> Self {
239        Self {
240            min_batch_size: value.min_batch_size,
241            max_batch_size: value.max_batch_size,
242            max_wait_ms: value.max_wait_time.map(|ms| ms.as_millis() as i32),
243            client_config: value.client_config,
244            start_offset: value.start_offset,
245        }
246    }
247}