use std::{pin::Pin, time::Duration};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize};
use serde_with::{base64::Base64, serde_as};
use smol_str::SmolStr;
use tokio_stream::Stream;
/// Offset type used by [`ConsumerOffset`], [`ConsumerRecord`],
/// [`PartitionOffsets`] and [`ProducedOffset`].
pub type Offset = u64;
/// Key type carried by [`ConsumerRecord`] and [`ProducerRecord`].
pub type Key = u64;
/// Partition identifier type.
pub type Partition = u32;
/// Topic name type, stored as a [`SmolStr`].
pub type Topic = SmolStr;
/// Key type of a record [`Header`], stored as a [`SmolStr`].
pub type HeaderKey = SmolStr;
/// A single record header: a key paired with a byte value.
///
/// When (de)serialized with serde, `value` is represented as a Base64 string
/// (through `serde_with`) rather than as a sequence of numbers.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Header {
    /// Key of the header.
    pub key: HeaderKey,
    /// Raw header bytes; Base64-encoded in serialized form.
    #[serde_as(as = "Base64")]
    pub value: Vec<u8>,
}
/// An offset associated with a specific topic and partition.
///
/// Values of this type are held in [`Consumer::offsets`] and are passed to
/// [`CommitLog::scoped_subscribe`] through its `offsets` argument.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsumerOffset {
    /// Topic the offset belongs to.
    pub topic: Topic,
    /// Partition of `topic` the offset belongs to.
    pub partition: Partition,
    /// The offset value itself.
    pub offset: Offset,
}
/// A subscription to a single topic.
///
/// Values of this type are held in [`Consumer::subscriptions`] and are passed
/// to [`CommitLog::scoped_subscribe`] through its `subscriptions` argument.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Subscription {
    /// Topic being subscribed to.
    pub topic: Topic,
}
/// A set of offsets together with a set of topic subscriptions.
///
/// On deserialization, `offsets` goes through `nullable_vec`, so an explicit
/// null (e.g. JSON `null`) is accepted and becomes an empty vector.
/// `subscriptions` has no such handling and is deserialized as a plain `Vec`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Consumer {
    /// Per-topic/partition offsets; a serialized null is read as empty.
    #[serde(deserialize_with = "nullable_vec")]
    pub offsets: Vec<ConsumerOffset>,
    /// Topics subscribed to.
    pub subscriptions: Vec<Subscription>,
}
/// A record as yielded by the stream returned from
/// [`CommitLog::scoped_subscribe`].
///
/// Carries the same data as a [`ProducerRecord`] plus the `offset` of the
/// record. In serialized form `value` is a Base64 string, and a null
/// `headers` value is read as an empty vector.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsumerRecord {
    /// Topic the record belongs to.
    pub topic: Topic,
    /// Record headers; a serialized null is read as empty.
    #[serde(deserialize_with = "nullable_vec")]
    pub headers: Vec<Header>,
    /// Optional UTC timestamp of the record.
    pub timestamp: Option<DateTime<Utc>>,
    /// Key of the record.
    pub key: Key,
    /// Raw record payload; Base64-encoded in serialized form.
    #[serde_as(as = "Base64")]
    pub value: Vec<u8>,
    /// Partition the record belongs to.
    pub partition: Partition,
    /// Offset of the record.
    pub offset: Offset,
}
/// The beginning and end offsets of a topic partition, as returned by
/// [`CommitLog::offsets`].
///
/// NOTE(review): whether `end_offset` is inclusive or exclusive is not
/// determined by this file — confirm against the implementations.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PartitionOffsets {
    /// Offset at the beginning of the partition.
    pub beginning_offset: Offset,
    /// Offset at the end of the partition.
    pub end_offset: Offset,
}
/// A record handed to [`CommitLog::produce`].
///
/// In serialized form `value` is a Base64 string, and a null `headers` value
/// is read as an empty vector.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducerRecord {
    /// Topic the record is for.
    pub topic: Topic,
    /// Record headers; a serialized null is read as empty.
    #[serde(deserialize_with = "nullable_vec")]
    pub headers: Vec<Header>,
    /// Optional UTC timestamp of the record.
    pub timestamp: Option<DateTime<Utc>>,
    /// Key of the record.
    pub key: Key,
    /// Raw record payload; Base64-encoded in serialized form.
    #[serde_as(as = "Base64")]
    pub value: Vec<u8>,
    /// Partition the record is for.
    pub partition: Partition,
}
/// The successful result of [`CommitLog::produce`].
///
/// NOTE(review): presumably the offset assigned to the produced record —
/// confirm against the implementations.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducedOffset {
    /// Offset reported for the produced record.
    pub offset: Offset,
}
/// Error returned by [`CommitLog::produce`] when a record cannot be produced.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so that it can
/// be propagated with `?` into `Box<dyn std::error::Error>` and similar
/// general-purpose error types.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ProducerError {
    /// The record could not be produced; no further detail is carried.
    CannotProduce,
}

impl std::fmt::Display for ProducerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match on purpose: adding a variant must force a message.
        match self {
            Self::CannotProduce => f.write_str("cannot produce record"),
        }
    }
}

impl std::error::Error for ProducerError {}
/// Interface to a commit log: query partition offsets, produce records and
/// subscribe to a stream of records.
///
/// Implementors must be `Clone + Send + Sync`. `#[async_trait]` is what
/// permits the `async fn` methods declared here.
#[async_trait]
pub trait CommitLog: Clone + Send + Sync {
/// Retrieves the [`PartitionOffsets`] for `partition` of `topic`.
///
/// NOTE(review): presumably `None` means no offsets are available for the
/// given topic/partition — confirm against the implementations.
async fn offsets(&self, topic: Topic, partition: Partition) -> Option<PartitionOffsets>;
/// Produces `record`, yielding a [`ProducedOffset`] on success or a
/// [`ProducerError`] on failure.
async fn produce(&self, record: ProducerRecord) -> Result<ProducedOffset, ProducerError>;
/// Subscribes and returns a boxed, pinned, `Send` stream of
/// [`ConsumerRecord`]s.
///
/// The returned stream borrows `self` for `'a`, so it cannot outlive the
/// commit log it was created from.
///
/// * `consumer_group_name` - name of the consumer group subscribing.
/// * `offsets` - topic/partition offsets supplied by the caller.
///   NOTE(review): presumably the positions to resume from — confirm.
/// * `subscriptions` - the topics to subscribe to.
/// * `idle_timeout` - optional duration. NOTE(review): presumably ends the
///   stream after this long without a record — confirm against the
///   implementations.
fn scoped_subscribe<'a>(
&'a self,
consumer_group_name: &str,
offsets: Vec<ConsumerOffset>,
subscriptions: Vec<Subscription>,
idle_timeout: Option<Duration>,
) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>>;
}
fn nullable_vec<'de, D, T>(d: D) -> Result<Vec<T>, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de>,
{
Deserialize::deserialize(d).map(|x: Option<_>| x.unwrap_or_default())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json` into a [`Consumer`], panicking if it is not valid.
    fn parse_consumer(json: &str) -> Consumer {
        serde_json::from_str::<Consumer>(json).unwrap()
    }

    /// A JSON `null` for `offsets` is read as an empty vector.
    #[test]
    fn test_nullable_vec_handles_null() {
        let json = r#"
{
"offsets": null,
"subscriptions": []
}
"#;
        let expected = Consumer {
            offsets: Vec::new(),
            subscriptions: Vec::new(),
        };
        assert_eq!(parse_consumer(json), expected);
    }

    /// An empty JSON array for `offsets` is read as an empty vector.
    #[test]
    fn test_nullable_vec_handles_empty_vec() {
        let json = r#"
{
"offsets": [],
"subscriptions": []
}
"#;
        let expected = Consumer {
            offsets: Vec::new(),
            subscriptions: Vec::new(),
        };
        assert_eq!(parse_consumer(json), expected);
    }

    /// A populated JSON array for `offsets` is read element by element.
    #[test]
    fn test_nullable_vec_handles_vec() {
        let json = r#"
{
"offsets": [{"topic": "topic", "partition": 0, "offset": 0}],
"subscriptions": []
}
"#;
        let expected = Consumer {
            offsets: vec![ConsumerOffset {
                topic: Topic::from("topic"),
                partition: 0,
                offset: 0,
            }],
            subscriptions: Vec::new(),
        };
        assert_eq!(parse_consumer(json), expected);
    }
}