use std::{borrow::Cow, time::Duration};
use chrono::{DateTime, Utc};
use crate::{
SdkError, Subscription,
types::{Error, Response, SubscriptionItem},
wit,
};
use super::{KafkaAuthentication, KafkaTlsConfig};
/// Handle for receiving messages from Kafka.
///
/// Wraps the `wit::KafkaConsumer` binding. Messages are fetched one at a time
/// with [`KafkaConsumer::next`].
pub struct KafkaConsumer {
// Underlying binding handle. `pub(super)` exposes it to the parent module only
// (presumably so that module can construct the wrapper — confirm).
pub(super) inner: wit::KafkaConsumer,
}
impl KafkaConsumer {
pub fn next(&self) -> Result<Option<KafkaMessage>, SdkError> {
self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
}
}
impl Subscription for KafkaConsumer {
    /// Produces the next subscription item from the next Kafka message.
    ///
    /// The raw message value is passed to `Response::json` as the payload. A
    /// message without a value is emitted as the JSON literal `null`.
    ///
    /// # Errors
    ///
    /// Returns an error with the text `Error receiving Kafka message: …` when
    /// receiving from the consumer fails.
    fn next(&mut self) -> Result<Option<SubscriptionItem>, Error> {
        // Call the inherent `next` by path: this trait method shares its name.
        let received = match KafkaConsumer::next(self) {
            Ok(received) => received,
            Err(err) => return Err(format!("Error receiving Kafka message: {err}").into()),
        };

        Ok(received.map(|message| {
            // A missing value is represented as JSON `null`.
            let payload = message.inner.value.unwrap_or_else(|| b"null".into());
            Response::json(payload).into()
        }))
    }
}
/// A single message received from Kafka.
///
/// Wraps the `wit::KafkaMessage` binding and exposes its key, value, headers,
/// offset, timestamp and high watermark through accessor methods.
pub struct KafkaMessage {
// Binding-level message data that all accessors read from.
inner: wit::KafkaMessage,
}
impl KafkaMessage {
    /// Returns the message key as text, or `None` when the message has no key.
    ///
    /// Decoding is lossy: invalid UTF-8 sequences are replaced with U+FFFD, so
    /// the result borrows when the key is valid UTF-8 and allocates otherwise.
    pub fn key(&self) -> Option<Cow<'_, str>> {
        self.raw_key().map(String::from_utf8_lossy)
    }

    /// Returns the message key as raw bytes, or `None` when the message has no key.
    pub fn raw_key(&self) -> Option<&[u8]> {
        self.inner.key.as_deref()
    }

    /// Deserializes the message value from JSON.
    ///
    /// Returns `Ok(None)` when the message has no value. When a value is
    /// present it is decoded as `Option<S>`, so a JSON `null` payload also
    /// yields `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns an error when the value is not valid JSON for `S`.
    pub fn value<S>(&self) -> Result<Option<S>, SdkError>
    where
        S: for<'de> serde::Deserialize<'de>,
    {
        if let Some(bytes) = self.raw_value() {
            serde_json::from_slice::<Option<S>>(bytes).map_err(Into::into)
        } else {
            Ok(None)
        }
    }

    /// Returns the message value as raw bytes, or `None` when the message has no value.
    pub fn raw_value(&self) -> Option<&[u8]> {
        self.inner.value.as_deref()
    }

    /// Consumes the message and returns its value as an owned byte vector,
    /// or `None` when the message has no value.
    pub fn into_raw_value(self) -> Option<Vec<u8>> {
        let Self { inner } = self;
        inner.value
    }

    /// Returns the offset recorded on the message.
    pub fn offset(&self) -> i64 {
        self.inner.offset
    }

    /// Looks up a header by name and deserializes its value from JSON.
    ///
    /// Returns `Ok(None)` when no header with that name is found.
    ///
    /// # Errors
    ///
    /// Returns an error when the header value is not valid JSON for `S`.
    pub fn get_header_value<S>(&self, key: &str) -> Result<Option<S>, SdkError>
    where
        S: for<'de> serde::Deserialize<'de>,
    {
        self.get_raw_header_value(key)
            .map(|bytes| serde_json::from_slice::<S>(bytes))
            .transpose()
            .map_err(Into::into)
    }

    /// Looks up a header by name and returns its raw bytes, or `None` when no
    /// header with that name is found.
    ///
    /// The lookup is a binary search, so it relies on the header list being
    /// sorted by name. NOTE(review): presumably the host delivers headers
    /// sorted — confirm; on an unsorted list a present header may not be found.
    /// If several headers share the name, any one of them may be returned.
    pub fn get_raw_header_value(&self, key: &str) -> Option<&[u8]> {
        let headers = &self.inner.headers;
        let index = headers
            .binary_search_by(|entry| entry.0.as_str().cmp(key))
            .ok()?;

        Some(headers[index].1.as_ref())
    }

    /// Returns the message timestamp as a UTC date-time.
    ///
    /// The stored value is interpreted as whole seconds since the Unix epoch.
    /// NOTE(review): Kafka record timestamps are commonly milliseconds —
    /// confirm that the host converts to seconds before handing them over.
    ///
    /// # Panics
    ///
    /// Panics when the stored value is outside the range `DateTime` can represent.
    pub fn timestamp(&self) -> DateTime<Utc> {
        let seconds = self.inner.timestamp;

        DateTime::from_timestamp(seconds, 0)
            .expect("we converted this from a datetime in the host, it must be valid")
    }

    /// Returns the high watermark value recorded on the message.
    pub fn high_watermark(&self) -> i64 {
        self.inner.high_watermark
    }
}
impl From<wit::KafkaMessage> for KafkaMessage {
fn from(inner: wit::KafkaMessage) -> Self {
Self { inner }
}
}
/// Settings for a Kafka consumer.
///
/// Create one with [`Default::default`] and adjust it through the setter
/// methods. The value converts into `wit::KafkaConsumerConfig` via `From`.
pub struct KafkaConsumerConfig {
/// Minimum batch size, if one was set; `None` by default.
min_batch_size: Option<i32>,
/// Maximum batch size, if one was set; `None` by default.
max_batch_size: Option<i32>,
/// Maximum wait time, if one was set; `None` by default. Converted to
/// milliseconds when the config is turned into the binding type.
max_wait_time: Option<Duration>,
/// Client settings: partitions, TLS and authentication.
client_config: wit::KafkaClientConfig,
/// Offset at which consumption starts; `Latest` by default.
start_offset: wit::KafkaConsumerStartOffset,
}
impl KafkaConsumerConfig {
    /// Sets the minimum batch size, replacing any previous setting.
    pub fn min_batch_size(&mut self, min_batch_size: i32) {
        self.min_batch_size = Some(min_batch_size);
    }

    /// Sets the maximum batch size, replacing any previous setting.
    pub fn max_batch_size(&mut self, max_batch_size: i32) {
        self.max_batch_size = Some(max_batch_size);
    }

    /// Sets the maximum wait time, replacing any previous setting.
    pub fn max_wait_time(&mut self, max_wait_time: Duration) {
        self.max_wait_time = Some(max_wait_time);
    }

    /// Starts consuming at the latest offset. This is also the default.
    pub fn from_latest_offset(&mut self) {
        let start = wit::KafkaConsumerStartOffset::Latest;
        self.start_offset = start;
    }

    /// Starts consuming at the earliest offset.
    pub fn from_earliest_offset(&mut self) {
        let start = wit::KafkaConsumerStartOffset::Earliest;
        self.start_offset = start;
    }

    /// Starts consuming at the given offset.
    pub fn from_specific_offset(&mut self, offset: i64) {
        let start = wit::KafkaConsumerStartOffset::Specific(offset);
        self.start_offset = start;
    }

    /// Sets the TLS configuration used by the client.
    pub fn tls(&mut self, tls: KafkaTlsConfig) {
        let client = &mut self.client_config;
        client.tls = Some(tls.into());
    }

    /// Sets the authentication configuration used by the client.
    pub fn authentication(&mut self, authentication: KafkaAuthentication) {
        let client = &mut self.client_config;
        client.authentication = Some(authentication.into());
    }

    /// Sets the list of partitions on the client configuration. When this is
    /// never called the list stays `None`.
    pub fn partitions(&mut self, partitions: Vec<i32>) {
        let client = &mut self.client_config;
        client.partitions = Some(partitions);
    }
}
impl Default for KafkaConsumerConfig {
fn default() -> Self {
Self {
min_batch_size: None,
max_batch_size: None,
max_wait_time: None,
client_config: wit::KafkaClientConfig {
partitions: None,
tls: None,
authentication: None,
},
start_offset: wit::KafkaConsumerStartOffset::Latest,
}
}
}
impl From<KafkaConsumerConfig> for wit::KafkaConsumerConfig {
    /// Converts the SDK configuration into the binding-level configuration.
    ///
    /// All fields are moved across unchanged except the maximum wait time,
    /// which is expressed in whole milliseconds. Waits longer than `i32::MAX`
    /// milliseconds (about 24.8 days) are clamped to `i32::MAX`.
    fn from(value: KafkaConsumerConfig) -> Self {
        // `Duration::as_millis` returns a `u128`. A bare `as i32` would
        // silently wrap large values (possibly to a negative wait), so clamp
        // first; after the clamp the cast is lossless.
        let max_wait_ms = value
            .max_wait_time
            .map(|wait| wait.as_millis().min(i32::MAX as u128) as i32);

        Self {
            min_batch_size: value.min_batch_size,
            max_batch_size: value.max_batch_size,
            max_wait_ms,
            client_config: value.client_config,
            start_offset: value.start_offset,
        }
    }
}