1use std::{fmt::Debug, pin::Pin, str::FromStr, time::Duration};
2
3use itertools::Itertools;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 common::{RawData, RawMeta},
8 JsonMeta, RawBlock, RawResult,
9};
10
/// How long a consumer is willing to wait for a message.
///
/// The variants map onto the raw millisecond values produced by
/// [`Timeout::as_raw_timeout`] and the durations produced by
/// [`Timeout::as_duration`].
#[derive(Debug, Clone, Copy)]
pub enum Timeout {
    /// No practical limit: raw value `-1`, or a very large [`Duration`].
    Never,
    /// No waiting: raw value `0`, or a zero [`Duration`].
    None,
    /// Bounded wait of the wrapped duration.
    Duration(Duration),
}

impl Timeout {
    /// Builds a bounded timeout of `secs` seconds.
    pub fn from_secs(secs: u64) -> Self {
        Self::Duration(Duration::from_secs(secs))
    }

    /// Builds a bounded timeout of `millis` milliseconds.
    pub fn from_millis(millis: u64) -> Self {
        Self::Duration(Duration::from_millis(millis))
    }

    /// Shorthand for [`Timeout::Never`].
    pub fn never() -> Self {
        Self::Never
    }

    /// Shorthand for [`Timeout::None`].
    pub fn none() -> Self {
        Self::None
    }

    /// Converts the timeout into a raw millisecond count.
    ///
    /// `Never` becomes `-1`, `None` becomes `0`, and a bounded duration becomes
    /// its length in whole milliseconds.
    ///
    /// Durations longer than `i64::MAX` milliseconds saturate at `i64::MAX`.
    /// A plain narrowing cast would wrap instead, turning very long timeouts
    /// into negative values (possibly even the `-1` sentinel used for `Never`).
    pub fn as_raw_timeout(&self) -> i64 {
        match self {
            Timeout::Never => -1,
            Timeout::None => 0,
            // `as_millis` yields a `u128`; clamp before narrowing so the cast is lossless.
            Timeout::Duration(t) => t.as_millis().min(i64::MAX as u128) as i64,
        }
    }

    /// Converts the timeout into a [`Duration`].
    ///
    /// `Never` is represented by `i64::MAX` milliseconds expressed in whole
    /// seconds, `None` by a zero duration, and a bounded timeout by its own
    /// duration.
    pub fn as_duration(&self) -> Duration {
        match self {
            Timeout::Never => Duration::from_secs(i64::MAX as u64 / 1000),
            Timeout::None => Duration::from_secs(0),
            Timeout::Duration(t) => *t,
        }
    }
}
53
/// Errors produced when a [`Timeout`] cannot be parsed from a string.
#[derive(Debug, thiserror::Error)]
pub enum TimeoutError {
    /// The input string was empty.
    #[error("empty timeout value")]
    Empty,
    /// The input was neither a recognised keyword nor a parsable duration.
    ///
    /// The first field is the offending input, the second the parser's message.
    #[error("invalid timeout expression `{0}`: {1}")]
    Invalid(String, String),
}
61
62impl FromStr for Timeout {
63 type Err = TimeoutError;
64
65 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
66 if s.is_empty() {
67 return Err(TimeoutError::Empty);
68 }
69 match s.to_lowercase().as_str() {
70 "never" => Ok(Timeout::Never),
71 "none" => Ok(Timeout::None),
72 _ => parse_duration::parse(s)
73 .map(Timeout::Duration)
74 .map_err(|err| TimeoutError::Invalid(s.to_string(), err.to_string())),
75 }
76 }
77}
78
/// A received message holding metadata, data, or both.
pub enum MessageSet<M, D> {
    /// Metadata only.
    Meta(M),
    /// Data only.
    Data(D),
    /// Metadata together with data.
    MetaData(M, D),
}

impl<M, D> Debug for MessageSet<M, D>
where
    M: Debug,
    D: Debug,
{
    /// Formats the value as a tuple named after the variant, listing the
    /// metadata (if any) before the data (if any).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (name, meta, data) = match self {
            MessageSet::Meta(meta) => ("Meta", Some(meta), None),
            MessageSet::Data(data) => ("Data", None, Some(data)),
            MessageSet::MetaData(meta, data) => ("MetaData", Some(meta), Some(data)),
        };

        let mut tuple = f.debug_tuple(name);
        if let Some(meta) = meta {
            tuple.field(meta);
        }
        if let Some(data) = data {
            tuple.field(data);
        }
        tuple.finish()
    }
}

impl<M, D> MessageSet<M, D> {
    /// Consumes the set and returns its metadata, or `None` for a data-only set.
    pub fn into_meta(self) -> Option<M> {
        match self {
            MessageSet::Meta(meta) | MessageSet::MetaData(meta, _) => Some(meta),
            MessageSet::Data(_) => None,
        }
    }

    /// Consumes the set and returns its data, or `None` for a meta-only set.
    pub fn into_data(self) -> Option<D> {
        match self {
            MessageSet::Data(data) | MessageSet::MetaData(_, data) => Some(data),
            MessageSet::Meta(_) => None,
        }
    }

    /// Returns `true` when the set carries metadata.
    pub fn has_meta(&self) -> bool {
        // Every variant except `Data` holds metadata.
        !matches!(self, MessageSet::Data(_))
    }

    /// Returns `true` when the set carries data.
    pub fn has_data(&self) -> bool {
        // Every variant except `Meta` holds data.
        !matches!(self, MessageSet::Meta(_))
    }

    /// Borrows the metadata, or returns `None` for a data-only set.
    pub fn meta(&self) -> Option<&M> {
        match self {
            MessageSet::Meta(meta) | MessageSet::MetaData(meta, _) => Some(meta),
            MessageSet::Data(_) => None,
        }
    }

    /// Mutably borrows the data, or returns `None` for a meta-only set.
    pub fn data(&mut self) -> Option<&mut D> {
        match self {
            MessageSet::Data(data) | MessageSet::MetaData(_, data) => Some(data),
            MessageSet::Meta(_) => None,
        }
    }
}
137
/// Asynchronous access to message metadata in raw and JSON form.
#[async_trait::async_trait]
pub trait IsAsyncMeta {
    /// Asynchronously produces the metadata as a [`RawMeta`].
    async fn as_raw_meta(&self) -> RawResult<RawMeta>;

    /// Asynchronously produces the metadata as a [`JsonMeta`].
    async fn as_json_meta(&self) -> RawResult<JsonMeta>;
}
144
145impl<T> IsMeta for T
146where
147 T: IsAsyncMeta + SyncOnAsync,
148{
149 fn as_raw_meta(&self) -> RawResult<RawMeta> {
150 crate::block_in_place_or_global(T::as_raw_meta(self))
151 }
152
153 fn as_json_meta(&self) -> RawResult<JsonMeta> {
154 crate::block_in_place_or_global(T::as_json_meta(self))
155 }
156}
157
158#[async_trait::async_trait]
159impl<T> IsAsyncMeta for T
160where
161 T: IsMeta + AsyncOnSync + Send + Sync,
162{
163 async fn as_raw_meta(&self) -> RawResult<RawMeta> {
164 <T as IsMeta>::as_raw_meta(self)
165 }
166
167 async fn as_json_meta(&self) -> RawResult<JsonMeta> {
168 <T as IsMeta>::as_json_meta(self)
169 }
170}
171
/// Synchronous access to message metadata in raw and JSON form.
pub trait IsMeta {
    /// Produces the metadata as a [`RawMeta`].
    fn as_raw_meta(&self) -> RawResult<RawMeta>;

    /// Produces the metadata as a [`JsonMeta`].
    fn as_json_meta(&self) -> RawResult<JsonMeta>;
}
177
/// Asynchronous access to the data part of a message.
#[async_trait::async_trait]
pub trait IsAsyncData {
    /// Asynchronously produces the data as [`RawData`].
    async fn as_raw_data(&self) -> RawResult<RawData>;
    /// Asynchronously fetches a [`RawBlock`].
    ///
    /// NOTE(review): `Ok(None)` presumably means no further block is
    /// available — confirm against implementors.
    async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>>;
}
183
/// Synchronous access to the data part of a message.
pub trait IsData {
    /// Produces the data as [`RawData`].
    fn as_raw_data(&self) -> RawResult<RawData>;
    /// Fetches a [`RawBlock`].
    ///
    /// NOTE(review): `Ok(None)` presumably means no further block is
    /// available — confirm against implementors.
    fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>>;
}
188
/// Asynchronous view over a received message, exposing its metadata and data.
#[async_trait::async_trait]
pub trait AsyncMessage {
    /// Reports whether the message has metadata.
    fn has_meta(&self) -> bool;
    /// Reports whether the message has data.
    fn has_data(&self) -> bool;

    /// Asynchronously produces the message as [`RawData`].
    async fn as_raw_data(&self) -> RawResult<RawData>;

    /// Asynchronously retrieves the metadata as [`RawMeta`].
    ///
    /// NOTE(review): `Ok(None)` presumably means the message has no
    /// metadata — confirm against implementors.
    async fn get_meta(&self) -> RawResult<Option<RawMeta>>;
    /// Asynchronously fetches a [`RawBlock`].
    ///
    /// NOTE(review): `Ok(None)` presumably means no further block is
    /// available — confirm against implementors.
    async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>>;
}
203
/// Identifier of a vgroup, represented as a signed 32-bit integer.
pub type VGroupId = i32;
205
/// Describes where a received message came from: database, topic and vgroup.
pub trait IsOffset {
    /// Name of the database associated with this offset.
    fn database(&self) -> &str;

    /// Name of the topic associated with this offset.
    fn topic(&self) -> &str;

    /// Identifier of the vgroup associated with this offset.
    fn vgroup_id(&self) -> VGroupId;
}
217
/// Offset bookkeeping for a single vgroup.
///
/// The struct is `#[repr(C)]`, so the field order below is part of its layout
/// and must not be changed.
#[repr(C)]
#[derive(Debug, Default, Copy, Clone, Deserialize, Serialize)]
pub struct Assignment {
    /// Identifier of the vgroup, returned by `vgroup_id()`.
    vgroup_id: VGroupId,
    /// Offset returned by `current_offset()`.
    offset: i64,
    /// Value returned by `begin()`; presumably the first available offset — confirm.
    begin: i64,
    /// Value returned by `end()`; presumably the last available offset — confirm.
    end: i64,
}
226
227impl Assignment {
228 pub fn new(vgroup_id: VGroupId, offset: i64, begin: i64, end: i64) -> Self {
229 Self {
230 vgroup_id,
231 offset,
232 begin,
233 end,
234 }
235 }
236
237 pub fn vgroup_id(&self) -> VGroupId {
238 self.vgroup_id
239 }
240
241 pub fn current_offset(&self) -> i64 {
242 self.offset
243 }
244
245 pub fn begin(&self) -> i64 {
246 self.begin
247 }
248
249 pub fn end(&self) -> i64 {
250 self.end
251 }
252}
253
254pub trait AsConsumer: Sized {
255 type Offset: IsOffset;
256 type Meta: IsMeta;
257 type Data: IntoIterator<Item = RawResult<RawBlock>>;
258
259 fn default_timeout(&self) -> Timeout {
261 Timeout::Never
262 }
263
264 fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
265 &mut self,
266 topics: I,
267 ) -> RawResult<()>;
268
269 fn recv_timeout(
271 &self,
272 timeout: Timeout,
273 ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>>;
274
275 fn recv(&self) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
276 self.recv_timeout(self.default_timeout())
277 }
278
279 fn iter_data_only(
280 &self,
281 timeout: Timeout,
282 ) -> Box<dyn '_ + Iterator<Item = RawResult<(Self::Offset, Self::Data)>>> {
283 Box::new(
284 self.iter_with_timeout(timeout)
285 .filter_map_ok(|m| m.1.into_data().map(|data| (m.0, data))),
286 )
287 }
288
289 fn iter_with_timeout(&self, timeout: Timeout) -> MessageSetsIter<'_, Self> {
290 MessageSetsIter {
291 consumer: self,
292 timeout,
293 }
294 }
295
296 fn iter(&self) -> MessageSetsIter<'_, Self> {
297 self.iter_with_timeout(self.default_timeout())
298 }
299
300 fn commit(&self, offset: Self::Offset) -> RawResult<()>;
301
302 fn commit_offset(&self, topic_name: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()>;
303
304 fn unsubscribe(self) {
305 drop(self)
306 }
307
308 fn list_topics(&self) -> RawResult<Vec<String>>;
309
310 fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>>;
311
312 fn offset_seek(&mut self, topic: &str, vg_id: VGroupId, offset: i64) -> RawResult<()>;
313
314 fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
315
316 fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
317}
318
319pub struct MessageSetsIter<'a, C> {
320 consumer: &'a C,
321 timeout: Timeout,
322}
323
324impl<'a, C> Iterator for MessageSetsIter<'a, C>
325where
326 C: AsConsumer,
327{
328 type Item = RawResult<(C::Offset, MessageSet<C::Meta, C::Data>)>;
329
330 fn next(&mut self) -> Option<Self::Item> {
331 self.consumer.recv_timeout(self.timeout).transpose()
332 }
333}
334
335#[async_trait::async_trait]
336pub trait AsAsyncConsumer: Sized + Send + Sync {
337 type Offset: IsOffset;
338 type Meta: IsAsyncMeta;
339 type Data: IsAsyncData;
340
341 fn default_timeout(&self) -> Timeout;
342
343 async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
344 &mut self,
345 topics: I,
346 ) -> RawResult<()>;
347
348 async fn recv_timeout(
350 &self,
351 timeout: Timeout,
352 ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>>;
353
354 fn stream_with_timeout(
355 &self,
356 timeout: Timeout,
357 ) -> Pin<
358 Box<
359 dyn '_
360 + Send
361 + futures::Stream<
362 Item = RawResult<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>,
363 >,
364 >,
365 > {
366 Box::pin(futures::stream::unfold((), move |_| async move {
367 let weather = self.recv_timeout(timeout).await.transpose();
368 weather.map(|res| (res, ()))
369 }))
370 }
371
372 fn stream(
373 &self,
374 ) -> Pin<
375 Box<
376 dyn '_
377 + Send
378 + futures::Stream<
379 Item = RawResult<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>,
380 >,
381 >,
382 > {
383 self.stream_with_timeout(self.default_timeout())
384 }
385
386 async fn commit(&self, offset: Self::Offset) -> RawResult<()>;
387
388 async fn commit_offset(
389 &self,
390 topic_name: &str,
391 vgroup_id: VGroupId,
392 offset: i64,
393 ) -> RawResult<()>;
394
395 async fn unsubscribe(self) {
396 drop(self)
397 }
398
399 async fn list_topics(&self) -> RawResult<Vec<String>>;
400
401 async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>>;
402
403 async fn topic_assignment(&self, topic: &str) -> Vec<Assignment>;
404
405 async fn offset_seek(&mut self, topic: &str, vgroup_id: VGroupId, offset: i64)
406 -> RawResult<()>;
407
408 async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
409
410 async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
411}
412
/// Marker trait for async types that should also get the blocking interfaces.
///
/// Implementing it enables the blanket `IsMeta` impl for `IsAsyncMeta` types
/// and the blanket `AsConsumer` impl for `AsAsyncConsumer` types.
pub trait SyncOnAsync {}
415
/// Marker trait for synchronous types that should also get the async interfaces.
///
/// Implementing it (together with `Send + Sync`) enables the blanket
/// `IsAsyncMeta` impl for `IsMeta` types.
pub trait AsyncOnSync {}
417
418impl<C> AsConsumer for C
419where
420 C: AsAsyncConsumer + SyncOnAsync,
421 C::Meta: IsMeta,
422 C::Data: IntoIterator<Item = RawResult<RawBlock>>,
423{
424 type Offset = C::Offset;
425
426 type Meta = C::Meta;
427
428 type Data = C::Data;
429
430 fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
431 &mut self,
432 topics: I,
433 ) -> RawResult<()> {
434 crate::block_in_place_or_global(<C as AsAsyncConsumer>::subscribe(self, topics))
435 }
436
437 fn recv_timeout(
438 &self,
439 timeout: Timeout,
440 ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
441 crate::block_in_place_or_global(<C as AsAsyncConsumer>::recv_timeout(self, timeout))
442 }
443
444 fn commit(&self, offset: Self::Offset) -> RawResult<()> {
445 crate::block_in_place_or_global(<C as AsAsyncConsumer>::commit(self, offset))
446 }
447
448 fn commit_offset(&self, topic_name: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
449 crate::block_in_place_or_global(<C as AsAsyncConsumer>::commit_offset(
450 self, topic_name, vgroup_id, offset,
451 ))
452 }
453
454 fn list_topics(&self) -> RawResult<Vec<String>> {
455 crate::block_in_place_or_global(<C as AsAsyncConsumer>::list_topics(self))
456 }
457
458 fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
459 crate::block_in_place_or_global(<C as AsAsyncConsumer>::assignments(self))
460 }
461
462 fn offset_seek(&mut self, topic: &str, vg_id: VGroupId, offset: i64) -> RawResult<()> {
463 crate::block_in_place_or_global(<C as AsAsyncConsumer>::offset_seek(
464 self, topic, vg_id, offset,
465 ))
466 }
467
468 fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
469 crate::block_in_place_or_global(<C as AsAsyncConsumer>::committed(self, topic, vgroup_id))
470 }
471
472 fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
473 crate::block_in_place_or_global(<C as AsAsyncConsumer>::position(self, topic, vgroup_id))
474 }
475}
476
477