taos_query/tmq/
mod.rs

1use std::{fmt::Debug, pin::Pin, str::FromStr, time::Duration};
2
3use itertools::Itertools;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    common::{RawData, RawMeta},
8    JsonMeta, RawBlock, RawResult,
9};
10
11#[derive(Debug, Clone, Copy)]
12pub enum Timeout {
13    /// Wait forever.
14    Never,
15    /// Try not block, will directly return when set timeout as `None`.
16    None,
17    /// Wait for a duration of time.
18    Duration(Duration),
19}
20
21impl Timeout {
22    pub fn from_secs(secs: u64) -> Self {
23        Self::Duration(Duration::from_secs(secs))
24    }
25
26    pub fn from_millis(millis: u64) -> Self {
27        Self::Duration(Duration::from_millis(millis))
28    }
29
30    pub fn never() -> Self {
31        Self::Never
32    }
33
34    pub fn none() -> Self {
35        Self::None
36    }
37    pub fn as_raw_timeout(&self) -> i64 {
38        match self {
39            Timeout::Never => -1,
40            Timeout::None => 0,
41            Timeout::Duration(t) => t.as_millis() as _,
42        }
43    }
44
45    pub fn as_duration(&self) -> Duration {
46        match self {
47            Timeout::Never => Duration::from_secs(i64::MAX as u64 / 1000),
48            Timeout::None => Duration::from_secs(0),
49            Timeout::Duration(t) => *t,
50        }
51    }
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum TimeoutError {
56    #[error("empty timeout value")]
57    Empty,
58    #[error("invalid timeout expression `{0}`: {1}")]
59    Invalid(String, String),
60}
61
62impl FromStr for Timeout {
63    type Err = TimeoutError;
64
65    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
66        if s.is_empty() {
67            return Err(TimeoutError::Empty);
68        }
69        match s.to_lowercase().as_str() {
70            "never" => Ok(Timeout::Never),
71            "none" => Ok(Timeout::None),
72            _ => parse_duration::parse(s)
73                .map(Timeout::Duration)
74                .map_err(|err| TimeoutError::Invalid(s.to_string(), err.to_string())),
75        }
76    }
77}
78
79pub enum MessageSet<M, D> {
80    Meta(M),
81    Data(D),
82    MetaData(M, D),
83}
84
85impl<M, D> Debug for MessageSet<M, D>
86where
87    M: Debug,
88    D: Debug,
89{
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            Self::Meta(m) => f.debug_tuple("Meta").field(m).finish(),
93            Self::Data(d) => f.debug_tuple("Data").field(d).finish(),
94            Self::MetaData(m, d) => f.debug_tuple("MetaData").field(m).field(d).finish(),
95        }
96    }
97}
98
99impl<M, D> MessageSet<M, D> {
100    pub fn into_meta(self) -> Option<M> {
101        match self {
102            MessageSet::Meta(m) => Some(m),
103            MessageSet::Data(_) => None,
104            MessageSet::MetaData(m, _) => Some(m),
105        }
106    }
107    pub fn into_data(self) -> Option<D> {
108        match self {
109            MessageSet::Meta(_) => None,
110            MessageSet::Data(d) => Some(d),
111            MessageSet::MetaData(_, d) => Some(d),
112        }
113    }
114
115    pub fn has_meta(&self) -> bool {
116        matches!(self, &MessageSet::Meta(_) | &MessageSet::MetaData(_, _))
117    }
118    pub fn has_data(&self) -> bool {
119        matches!(self, &MessageSet::Data(_) | &MessageSet::MetaData(_, _))
120    }
121
122    pub fn meta(&self) -> Option<&M> {
123        match self {
124            MessageSet::Meta(m) => Some(m),
125            MessageSet::Data(_) => None,
126            MessageSet::MetaData(m, _) => Some(m),
127        }
128    }
129    pub fn data(&mut self) -> Option<&mut D> {
130        match self {
131            MessageSet::Meta(_) => None,
132            MessageSet::Data(d) => Some(d),
133            MessageSet::MetaData(_, d) => Some(d),
134        }
135    }
136}
137
138#[async_trait::async_trait]
139pub trait IsAsyncMeta {
140    async fn as_raw_meta(&self) -> RawResult<RawMeta>;
141
142    async fn as_json_meta(&self) -> RawResult<JsonMeta>;
143}
144
145impl<T> IsMeta for T
146where
147    T: IsAsyncMeta + SyncOnAsync,
148{
149    fn as_raw_meta(&self) -> RawResult<RawMeta> {
150        crate::block_in_place_or_global(T::as_raw_meta(self))
151    }
152
153    fn as_json_meta(&self) -> RawResult<JsonMeta> {
154        crate::block_in_place_or_global(T::as_json_meta(self))
155    }
156}
157
158#[async_trait::async_trait]
159impl<T> IsAsyncMeta for T
160where
161    T: IsMeta + AsyncOnSync + Send + Sync,
162{
163    async fn as_raw_meta(&self) -> RawResult<RawMeta> {
164        <T as IsMeta>::as_raw_meta(self)
165    }
166
167    async fn as_json_meta(&self) -> RawResult<JsonMeta> {
168        <T as IsMeta>::as_json_meta(self)
169    }
170}
171
172pub trait IsMeta {
173    fn as_raw_meta(&self) -> RawResult<RawMeta>;
174
175    fn as_json_meta(&self) -> RawResult<JsonMeta>;
176}
177
178#[async_trait::async_trait]
179pub trait IsAsyncData {
180    async fn as_raw_data(&self) -> RawResult<RawData>;
181    async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>>;
182}
183
184pub trait IsData {
185    fn as_raw_data(&self) -> RawResult<RawData>;
186    fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>>;
187}
188
189#[async_trait::async_trait]
190pub trait AsyncMessage {
191    /// Check if the message contains meta.
192    fn has_meta(&self) -> bool;
193    /// Check if the message contains data.
194    fn has_data(&self) -> bool;
195
196    /// Return raw data as bytes.
197    async fn as_raw_data(&self) -> RawResult<RawData>;
198
199    /// Extract meta message.
200    async fn get_meta(&self) -> RawResult<Option<RawMeta>>;
201    async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>>;
202}
203
204pub type VGroupId = i32;
205
206/// Extract offset information.
207pub trait IsOffset {
208    /// Database name for current message
209    fn database(&self) -> &str;
210
211    /// Topic name for current message.
212    fn topic(&self) -> &str;
213
214    /// VGroup id for current message.
215    fn vgroup_id(&self) -> VGroupId;
216}
217
218#[repr(C)]
219#[derive(Debug, Default, Copy, Clone, Deserialize, Serialize)]
220pub struct Assignment {
221    vgroup_id: VGroupId,
222    offset: i64,
223    begin: i64,
224    end: i64,
225}
226
227impl Assignment {
228    pub fn new(vgroup_id: VGroupId, offset: i64, begin: i64, end: i64) -> Self {
229        Self {
230            vgroup_id,
231            offset,
232            begin,
233            end,
234        }
235    }
236
237    pub fn vgroup_id(&self) -> VGroupId {
238        self.vgroup_id
239    }
240
241    pub fn current_offset(&self) -> i64 {
242        self.offset
243    }
244
245    pub fn begin(&self) -> i64 {
246        self.begin
247    }
248
249    pub fn end(&self) -> i64 {
250        self.end
251    }
252}
253
254pub trait AsConsumer: Sized {
255    type Offset: IsOffset;
256    type Meta: IsMeta;
257    type Data: IntoIterator<Item = RawResult<RawBlock>>;
258
259    /// Default timeout getter for message stream.
260    fn default_timeout(&self) -> Timeout {
261        Timeout::Never
262    }
263
264    fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
265        &mut self,
266        topics: I,
267    ) -> RawResult<()>;
268
269    /// None means wait until next message come.
270    fn recv_timeout(
271        &self,
272        timeout: Timeout,
273    ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>>;
274
275    fn recv(&self) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
276        self.recv_timeout(self.default_timeout())
277    }
278
279    fn iter_data_only(
280        &self,
281        timeout: Timeout,
282    ) -> Box<dyn '_ + Iterator<Item = RawResult<(Self::Offset, Self::Data)>>> {
283        Box::new(
284            self.iter_with_timeout(timeout)
285                .filter_map_ok(|m| m.1.into_data().map(|data| (m.0, data))),
286        )
287    }
288
289    fn iter_with_timeout(&self, timeout: Timeout) -> MessageSetsIter<'_, Self> {
290        MessageSetsIter {
291            consumer: self,
292            timeout,
293        }
294    }
295
296    fn iter(&self) -> MessageSetsIter<'_, Self> {
297        self.iter_with_timeout(self.default_timeout())
298    }
299
300    fn commit(&self, offset: Self::Offset) -> RawResult<()>;
301
302    fn commit_offset(&self, topic_name: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()>;
303
304    fn unsubscribe(self) {
305        drop(self)
306    }
307
308    fn list_topics(&self) -> RawResult<Vec<String>>;
309
310    fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>>;
311
312    fn offset_seek(&mut self, topic: &str, vg_id: VGroupId, offset: i64) -> RawResult<()>;
313
314    fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
315
316    fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
317}
318
319pub struct MessageSetsIter<'a, C> {
320    consumer: &'a C,
321    timeout: Timeout,
322}
323
324impl<'a, C> Iterator for MessageSetsIter<'a, C>
325where
326    C: AsConsumer,
327{
328    type Item = RawResult<(C::Offset, MessageSet<C::Meta, C::Data>)>;
329
330    fn next(&mut self) -> Option<Self::Item> {
331        self.consumer.recv_timeout(self.timeout).transpose()
332    }
333}
334
335#[async_trait::async_trait]
336pub trait AsAsyncConsumer: Sized + Send + Sync {
337    type Offset: IsOffset;
338    type Meta: IsAsyncMeta;
339    type Data: IsAsyncData;
340
341    fn default_timeout(&self) -> Timeout;
342
343    async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
344        &mut self,
345        topics: I,
346    ) -> RawResult<()>;
347
348    /// None means wait until next message come.
349    async fn recv_timeout(
350        &self,
351        timeout: Timeout,
352    ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>>;
353
354    fn stream_with_timeout(
355        &self,
356        timeout: Timeout,
357    ) -> Pin<
358        Box<
359            dyn '_
360                + Send
361                + futures::Stream<
362                    Item = RawResult<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>,
363                >,
364        >,
365    > {
366        Box::pin(futures::stream::unfold((), move |_| async move {
367            let weather = self.recv_timeout(timeout).await.transpose();
368            weather.map(|res| (res, ()))
369        }))
370    }
371
372    fn stream(
373        &self,
374    ) -> Pin<
375        Box<
376            dyn '_
377                + Send
378                + futures::Stream<
379                    Item = RawResult<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>,
380                >,
381        >,
382    > {
383        self.stream_with_timeout(self.default_timeout())
384    }
385
386    async fn commit(&self, offset: Self::Offset) -> RawResult<()>;
387
388    async fn commit_offset(
389        &self,
390        topic_name: &str,
391        vgroup_id: VGroupId,
392        offset: i64,
393    ) -> RawResult<()>;
394
395    async fn unsubscribe(self) {
396        drop(self)
397    }
398
399    async fn list_topics(&self) -> RawResult<Vec<String>>;
400
401    async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>>;
402
403    async fn topic_assignment(&self, topic: &str) -> Vec<Assignment>;
404
405    async fn offset_seek(&mut self, topic: &str, vgroup_id: VGroupId, offset: i64)
406        -> RawResult<()>;
407
408    async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
409
410    async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64>;
411}
412
413/// Marker trait to impl sync on async impl.
414pub trait SyncOnAsync {}
415
416pub trait AsyncOnSync {}
417
418impl<C> AsConsumer for C
419where
420    C: AsAsyncConsumer + SyncOnAsync,
421    C::Meta: IsMeta,
422    C::Data: IntoIterator<Item = RawResult<RawBlock>>,
423{
424    type Offset = C::Offset;
425
426    type Meta = C::Meta;
427
428    type Data = C::Data;
429
430    fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
431        &mut self,
432        topics: I,
433    ) -> RawResult<()> {
434        crate::block_in_place_or_global(<C as AsAsyncConsumer>::subscribe(self, topics))
435    }
436
437    fn recv_timeout(
438        &self,
439        timeout: Timeout,
440    ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
441        crate::block_in_place_or_global(<C as AsAsyncConsumer>::recv_timeout(self, timeout))
442    }
443
444    fn commit(&self, offset: Self::Offset) -> RawResult<()> {
445        crate::block_in_place_or_global(<C as AsAsyncConsumer>::commit(self, offset))
446    }
447
448    fn commit_offset(&self, topic_name: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
449        crate::block_in_place_or_global(<C as AsAsyncConsumer>::commit_offset(
450            self, topic_name, vgroup_id, offset,
451        ))
452    }
453
454    fn list_topics(&self) -> RawResult<Vec<String>> {
455        crate::block_in_place_or_global(<C as AsAsyncConsumer>::list_topics(self))
456    }
457
458    fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
459        crate::block_in_place_or_global(<C as AsAsyncConsumer>::assignments(self))
460    }
461
462    fn offset_seek(&mut self, topic: &str, vg_id: VGroupId, offset: i64) -> RawResult<()> {
463        crate::block_in_place_or_global(<C as AsAsyncConsumer>::offset_seek(
464            self, topic, vg_id, offset,
465        ))
466    }
467
468    fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
469        crate::block_in_place_or_global(<C as AsAsyncConsumer>::committed(self, topic, vgroup_id))
470    }
471
472    fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
473        crate::block_in_place_or_global(<C as AsAsyncConsumer>::position(self, topic, vgroup_id))
474    }
475}
476
477// #[async_trait::async_trait]
478// impl<C> AsAsyncConsumer for C
479// where
480//     C: AsConsumer + AsyncOnSync + Send + Sync + 'static,
481//     C::Error: 'static + Sync + Send,
482//     C::Meta: IsAsyncMeta + Send,
483//     C::Offset: 'static + Sync + Send,
484//     C::Data: 'static + Send + Sync,
485// {
486//     type Error = C::Error;
487
488//     type Offset = C::Offset;
489
490//     type Meta = C::Meta;
491
492//     type Data = C::Data;
493
494//     async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
495//         &mut self,
496//         topics: I,
497//     ) -> Result<()> {
498//         <C as AsConsumer>::subscribe(self, topics)
499//     }
500
501//     async fn recv_timeout(
502//         &self,
503//         timeout: Timeout,
504//     ) -> Result<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
505//         <C as AsConsumer>::recv_timeout(self, timeout)
506//     }
507
508//     async fn commit(&self, offset: Self::Offset) -> Result<()> {
509//         <C as AsConsumer>::commit(self, offset)
510//     }
511// }