1use std::{pin::Pin, time::Duration};
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Deserializer, Serialize};
14use serde_with::{base64::Base64, serde_as};
15use smol_str::SmolStr;
16use tokio_stream::Stream;
17
18pub type Offset = u64;
21
22pub type Key = u64;
26
27pub type Partition = u32;
30
31pub type Topic = SmolStr;
36
37pub type HeaderKey = SmolStr;
39
40#[serde_as]
43#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
44pub struct Header {
45 pub key: HeaderKey,
46 #[serde_as(as = "Base64")]
47 pub value: Vec<u8>,
48}
49
50#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
52pub struct ConsumerOffset {
53 pub topic: Topic,
54 pub partition: Partition,
55 pub offset: Offset,
56}
57
58#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
60pub struct Subscription {
61 pub topic: Topic,
62}
63
64#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
72pub struct Consumer {
73 #[serde(deserialize_with = "nullable_vec")]
74 pub offsets: Vec<ConsumerOffset>,
75 pub subscriptions: Vec<Subscription>,
76}
77
78#[serde_as]
80#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
81pub struct ConsumerRecord {
82 pub topic: Topic,
83 #[serde(deserialize_with = "nullable_vec")]
84 pub headers: Vec<Header>,
85 pub timestamp: Option<DateTime<Utc>>,
86 pub key: Key,
87 #[serde_as(as = "Base64")]
88 pub value: Vec<u8>,
89 pub partition: Partition,
90 pub offset: Offset,
91}
92
93#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
95pub struct PartitionOffsets {
96 pub beginning_offset: Offset,
97 pub end_offset: Offset,
98}
99
100#[serde_as]
102#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
103pub struct ProducerRecord {
104 pub topic: Topic,
105 #[serde(deserialize_with = "nullable_vec")]
106 pub headers: Vec<Header>,
107 pub timestamp: Option<DateTime<Utc>>,
108 pub key: Key,
109 #[serde_as(as = "Base64")]
110 pub value: Vec<u8>,
111 pub partition: Partition,
112}
113
114#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
116pub struct ProducedOffset {
117 pub offset: Offset,
118}
119
120#[derive(Clone, Debug, Eq, PartialEq)]
122pub enum ProducerError {
123 CannotProduce,
125}
126
127#[async_trait]
130pub trait CommitLog: Clone + Send + Sync {
131 async fn offsets(&self, topic: Topic, partition: Partition) -> Option<PartitionOffsets>;
133
134 async fn produce(&self, record: ProducerRecord) -> Result<ProducedOffset, ProducerError>;
136
137 fn scoped_subscribe<'a>(
143 &'a self,
144 consumer_group_name: &str,
145 offsets: Vec<ConsumerOffset>,
146 subscriptions: Vec<Subscription>,
147 idle_timeout: Option<Duration>,
148 ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>>;
149}
150
151fn nullable_vec<'de, D, T>(d: D) -> Result<Vec<T>, D::Error>
152where
153 D: Deserializer<'de>,
154 T: Deserialize<'de>,
155{
156 Deserialize::deserialize(d).map(|x: Option<_>| x.unwrap_or_default())
157}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162
163 #[test]
164 fn test_nullable_vec_handles_null() {
165 let json = r#"
166 {
167 "offsets": null,
168 "subscriptions": []
169 }
170 "#;
171 assert_eq!(
172 serde_json::from_str::<Consumer>(json).unwrap(),
173 Consumer {
174 offsets: vec![],
175 subscriptions: vec![]
176 }
177 );
178 }
179
180 #[test]
181 fn test_nullable_vec_handles_empty_vec() {
182 let json = r#"
183 {
184 "offsets": [],
185 "subscriptions": []
186 }
187 "#;
188 assert_eq!(
189 serde_json::from_str::<Consumer>(json).unwrap(),
190 Consumer {
191 offsets: vec![],
192 subscriptions: vec![]
193 }
194 );
195 }
196
197 #[test]
198 fn test_nullable_vec_handles_vec() {
199 let json = r#"
200 {
201 "offsets": [{"topic": "topic", "partition": 0, "offset": 0}],
202 "subscriptions": []
203 }
204 "#;
205 assert_eq!(
206 serde_json::from_str::<Consumer>(json).unwrap(),
207 Consumer {
208 offsets: vec![ConsumerOffset {
209 topic: Topic::from("topic"),
210 partition: 0,
211 offset: 0
212 }],
213 subscriptions: vec![]
214 }
215 );
216 }
217}