grafbase_sdk/host_io/kafka/consumer.rs

use std::{borrow::Cow, time::Duration};

use chrono::{DateTime, Utc};

use crate::{
    SdkError, Subscription,
    types::{Error, Response, SubscriptionItem},
    wit,
};

use super::{KafkaAuthentication, KafkaTlsConfig};

/// A Kafka consumer that can receive messages from Kafka topics.
pub struct KafkaConsumer {
    pub(super) inner: wit::KafkaConsumer,
}

impl KafkaConsumer {
    /// Retrieves the next message from the Kafka consumer.
    ///
    /// Returns `Ok(Some(message))` if a message is available,
    /// `Ok(None)` if no message is currently available,
    /// or `Err` if an error occurred while consuming.
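    ///
    /// A minimal polling sketch (not a doctest; the `consumer` value is assumed to
    /// have been constructed elsewhere in your setup):
    ///
    /// ```ignore
    /// // Drain whatever is currently available, stopping on the first empty poll.
    /// while let Some(message) = consumer.next()? {
    ///     println!("offset {}: key {:?}", message.offset(), message.key());
    /// }
    /// ```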
    pub fn next(&self) -> Result<Option<KafkaMessage>, SdkError> {
        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
    }
}

impl Subscription for KafkaConsumer {
    fn next(&mut self) -> Result<Option<SubscriptionItem>, Error> {
        match KafkaConsumer::next(self) {
            Ok(Some(msg)) => Ok(Some(
                Response::json(msg.inner.value.unwrap_or_else(|| b"null".into())).into(),
            )),
            Ok(None) => Ok(None),
            Err(err) => Err(format!("Error receiving Kafka message: {err}").into()),
        }
    }
}

/// A Kafka message containing key, value, headers, and metadata.
pub struct KafkaMessage {
    inner: wit::KafkaMessage,
}

impl KafkaMessage {
    /// Returns the message key as a UTF-8 string, if present.
    ///
    /// The key is used for partitioning and message ordering within a partition.
    pub fn key(&self) -> Option<Cow<'_, str>> {
        self.raw_key().map(|key| String::from_utf8_lossy(key))
    }

    /// Returns the raw message key as bytes, if present.
    ///
    /// This provides access to the unprocessed message key without UTF-8 conversion.
    pub fn raw_key(&self) -> Option<&[u8]> {
        self.inner.key.as_deref()
    }

    /// Deserializes the message value from JSON.
    ///
    /// Returns `Ok(None)` if the message has no value.
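    ///
    /// A sketch of deserializing into a caller-defined type (the `OrderEvent` shape
    /// is purely illustrative):
    ///
    /// ```ignore
    /// #[derive(serde::Deserialize)]
    /// struct OrderEvent {
    ///     id: u64,
    ///     status: String,
    /// }
    ///
    /// // `message` is a `KafkaMessage` received from a consumer; the payload is assumed
    /// // to be JSON matching the struct above.
    /// let event: Option<OrderEvent> = message.value()?;
    /// ```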
    pub fn value<S>(&self) -> Result<Option<S>, SdkError>
    where
        S: for<'de> serde::Deserialize<'de>,
    {
        match self.raw_value() {
            Some(value) => serde_json::from_slice(value).map_err(Into::into),
            None => Ok(None),
        }
    }

    /// Returns the raw message value as bytes, if present.
    ///
    /// This provides access to the unprocessed message value without any JSON deserialization.
    pub fn raw_value(&self) -> Option<&[u8]> {
        self.inner.value.as_deref()
    }

    /// Consumes the message and returns the raw value as a `Vec<u8>`, if present.
    pub fn into_raw_value(self) -> Option<Vec<u8>> {
        self.inner.value
    }

    /// Returns the message offset within the partition.
    ///
    /// The offset is a unique identifier for the message within its partition.
    pub fn offset(&self) -> i64 {
        self.inner.offset
    }

    /// Gets a header value by key and deserializes it from JSON.
    ///
    /// Returns `Ok(Some(value))` if the header exists and can be deserialized,
    /// `Ok(None)` if the header doesn't exist,
    /// or `Err` if deserialization fails.
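    ///
    /// A sketch, assuming the producer wrote a JSON-encoded `"trace-id"` header
    /// (the header name is illustrative):
    ///
    /// ```ignore
    /// let trace_id: Option<String> = message.get_header_value("trace-id")?;
    /// ```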
    pub fn get_header_value<S>(&self, key: &str) -> Result<Option<S>, SdkError>
    where
        S: for<'de> serde::Deserialize<'de>,
    {
        match self.get_raw_header_value(key) {
            Some(value) => {
                let value = serde_json::from_slice(value)?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Gets a raw header value by key as bytes.
    pub fn get_raw_header_value(&self, key: &str) -> Option<&[u8]> {
        // The Kafka headers come from a btree, which means they are sorted by key.
        // This lets us use binary search to find the header value.
        match self.inner.headers.binary_search_by(|item| item.0.as_str().cmp(key)) {
            Ok(index) => Some(self.inner.headers[index].1.as_ref()),
            Err(_) => None,
        }
    }

    /// Returns the timestamp when the message was produced.
    ///
    /// The timestamp is in UTC and represents when the message was created.
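    ///
    /// For example, to render it as an RFC 3339 string (a sketch):
    ///
    /// ```ignore
    /// let produced_at = message.timestamp().to_rfc3339();
    /// ```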
    pub fn timestamp(&self) -> DateTime<Utc> {
        DateTime::from_timestamp(self.inner.timestamp, 0)
            .expect("we converted this from a datetime in the host, it must be valid")
    }

    /// Returns the high watermark for the partition.
    ///
    /// The high watermark is the offset one past the last message in the partition.
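    ///
    /// One rough use is estimating how many messages remain after the current one
    /// (a sketch; exact lag accounting depends on your setup):
    ///
    /// ```ignore
    /// let remaining = message.high_watermark() - message.offset() - 1;
    /// ```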
    pub fn high_watermark(&self) -> i64 {
        self.inner.high_watermark
    }
}

impl From<wit::KafkaMessage> for KafkaMessage {
    fn from(inner: wit::KafkaMessage) -> Self {
        Self { inner }
    }
}

/// Configuration for a Kafka consumer.
///
/// This struct contains settings that control how the consumer behaves,
/// including batch sizes, wait times, client configuration, and starting offset.
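///
/// A sketch of building a configuration with the setters below (the values are
/// illustrative, not recommendations):
///
/// ```ignore
/// use std::time::Duration;
///
/// let mut config = KafkaConsumerConfig::default();
/// config.max_batch_size(100);
/// config.max_wait_time(Duration::from_millis(500));
/// config.from_earliest_offset();
/// config.partitions(vec![0, 1]);
/// ```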
pub struct KafkaConsumerConfig {
    /// Minimum number of messages to batch together before returning.
    min_batch_size: Option<i32>,
    /// Maximum number of messages to batch together.
    max_batch_size: Option<i32>,
    /// Maximum time to wait for messages before returning a partial batch.
    max_wait_time: Option<Duration>,
    /// Client configuration including partitions, TLS, and authentication.
    client_config: wit::KafkaClientConfig,
    /// The offset position to start consuming from.
    start_offset: wit::KafkaConsumerStartOffset,
}

impl KafkaConsumerConfig {
    /// Sets the minimum number of messages to batch together before returning.
    pub fn min_batch_size(&mut self, min_batch_size: i32) {
        self.min_batch_size = Some(min_batch_size);
    }

    /// Sets the maximum number of messages to batch together.
    pub fn max_batch_size(&mut self, max_batch_size: i32) {
        self.max_batch_size = Some(max_batch_size);
    }

    /// Sets the maximum time to wait for messages before returning a partial batch.
    pub fn max_wait_time(&mut self, max_wait_time: Duration) {
        self.max_wait_time = Some(max_wait_time);
    }

    /// Configures the consumer to start consuming from the latest offset.
    ///
    /// This means the consumer will only receive new messages that arrive after it starts.
    pub fn from_latest_offset(&mut self) {
        self.start_offset = wit::KafkaConsumerStartOffset::Latest;
    }

    /// Configures the consumer to start consuming from the earliest offset.
    ///
    /// This means the consumer will receive all available messages in the topic from the beginning.
    pub fn from_earliest_offset(&mut self) {
        self.start_offset = wit::KafkaConsumerStartOffset::Earliest;
    }

    /// Configures the consumer to start consuming from a specific offset.
    pub fn from_specific_offset(&mut self, offset: i64) {
        self.start_offset = wit::KafkaConsumerStartOffset::Specific(offset);
    }

    /// Sets the TLS configuration for secure connections to Kafka brokers.
    pub fn tls(&mut self, tls: KafkaTlsConfig) {
        self.client_config.tls = Some(tls.into());
    }

    /// Sets the authentication configuration for connecting to Kafka brokers.
    pub fn authentication(&mut self, authentication: KafkaAuthentication) {
        self.client_config.authentication = Some(authentication.into());
    }

    /// Sets the specific partitions to consume from.
    ///
    /// If not specified, the consumer will consume from all partitions of the topic.
    pub fn partitions(&mut self, partitions: Vec<i32>) {
        self.client_config.partitions = Some(partitions);
    }
}

impl Default for KafkaConsumerConfig {
    fn default() -> Self {
        Self {
            min_batch_size: None,
            max_batch_size: None,
            max_wait_time: None,
            client_config: wit::KafkaClientConfig {
                partitions: None,
                tls: None,
                authentication: None,
            },
            start_offset: wit::KafkaConsumerStartOffset::Latest,
        }
    }
}

impl From<KafkaConsumerConfig> for wit::KafkaConsumerConfig {
    fn from(value: KafkaConsumerConfig) -> Self {
        Self {
            min_batch_size: value.min_batch_size,
            max_batch_size: value.max_batch_size,
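            // The host interface takes the wait time in milliseconds as an i32, so very
            // long durations would be truncated by this cast.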
            max_wait_ms: value.max_wait_time.map(|duration| duration.as_millis() as i32),
            client_config: value.client_config,
            start_offset: value.start_offset,
        }
    }
}