taos_ws/consumer/
mod.rs

1//! TMQ consumer.
2//!
3use bytes::Bytes;
4use futures::{SinkExt, StreamExt};
5use itertools::Itertools;
6// use scc::HashMap;
7use dashmap::DashMap as HashMap;
8
9use log::warn;
10use taos_query::common::{JsonMeta, RawMeta};
11use taos_query::prelude::{Code, RawError};
12use taos_query::tmq::{
13    AsAsyncConsumer, AsConsumer, Assignment, IsAsyncData, IsAsyncMeta, IsData, IsOffset,
14    MessageSet, SyncOnAsync, Timeout, VGroupId,
15};
16use taos_query::util::{Edition, InlinableRead};
17use taos_query::RawResult;
18use taos_query::{DeError, DsnError, IntoDsn, RawBlock, TBuilder};
19use thiserror::Error;
20
21use tokio::sync::{oneshot, watch};
22
23use tokio::time;
24use tokio_tungstenite::tungstenite::Error as WsError;
25use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
26
27use crate::query::asyn::WS_ERROR_NO;
28use crate::query::infra::{ToMessage, WsConnReq};
29use crate::TaosBuilder;
30use messages::*;
31
32use ws_tool::{errors::WsError as WsErrorWst, frame::OpCode, Message as WsMessage};
33
34use std::fmt::Debug;
35use std::str::FromStr;
36use std::sync::atomic::AtomicU64;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40mod messages;
41
42type WsSender = tokio::sync::mpsc::Sender<WsMessage<bytes::Bytes>>;
43type WsTmqAgent = Arc<HashMap<ReqId, oneshot::Sender<RawResult<TmqRecvData>>>>;
44
45#[derive(Debug, Clone)]
46struct WsTmqSender {
47    req_id: Arc<AtomicU64>,
48    sender: WsSender,
49    queries: WsTmqAgent,
50    #[allow(dead_code)]
51    timeout: Timeout,
52}
53
54impl WsTmqSender {
55    fn req_id(&self) -> ReqId {
56        self.req_id
57            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
58    }
59    async fn send_recv(&self, msg: TmqSend) -> RawResult<TmqRecvData> {
60        self.send_recv_timeout(msg, Duration::MAX).await
61    }
62    async fn send_recv_timeout(&self, msg: TmqSend, timeout: Duration) -> RawResult<TmqRecvData> {
63        let send_timeout = Duration::from_millis(5000);
64        let req_id = msg.req_id();
65        let (tx, rx) = oneshot::channel();
66
67        self.queries.insert(req_id, tx);
68
69        self.sender
70            .send_timeout(msg.to_msg(), send_timeout)
71            .await
72            .map_err(WsTmqError::from)?;
73
74        let sleep = tokio::time::sleep(timeout);
75        tokio::pin!(sleep);
76        let data = tokio::select! {
77            _ = &mut sleep, if !sleep.is_elapsed() => {
78               log::trace!("poll timed out");
79               Err(WsTmqError::QueryTimeout("poll".to_string()))?
80            }
81            message = rx => {
82                message.map_err(WsTmqError::from)??
83            }
84        };
85        Ok(data)
86    }
87}
88
89#[derive(Debug)]
90pub struct TmqBuilder {
91    info: TaosBuilder,
92    conf: TmqInit,
93    timeout: Timeout,
94}
95
96impl TBuilder for TmqBuilder {
97    type Target = Consumer;
98
99    fn available_params() -> &'static [&'static str] {
100        &["token", "timeout", "group.id", "client.id"]
101    }
102
103    fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self> {
104        Self::new(dsn)
105    }
106
107    fn client_version() -> &'static str {
108        "0"
109    }
110
111    fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
112        Ok(())
113    }
114
115    fn ready(&self) -> bool {
116        true
117    }
118
119    fn build(&self) -> RawResult<Self::Target> {
120        taos_query::block_in_place_or_global(self.build_consumer())
121    }
122
123    fn server_version(&self) -> RawResult<&str> {
124        todo!()
125    }
126
127    fn is_enterprise_edition(&self) -> RawResult<bool> {
128        todo!()
129    }
130
131    fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
132        if self
133            .info
134            .addr
135            .matches(".cloud.tdengine.com")
136            .next()
137            .is_some()
138            || self
139                .info
140                .addr
141                .matches(".cloud.taosdata.com")
142                .next()
143                .is_some()
144        {
145            let edition = Edition::new("cloud", false);
146            return Ok(edition);
147        }
148
149        let taos = TBuilder::build(&self.info)?;
150
151        use taos_query::prelude::sync::Queryable;
152        let grant: RawResult<Option<(String, bool)>> = Queryable::query_one(
153            &taos,
154            "select version, (expire_time < now) from information_schema.ins_cluster",
155        );
156
157        let edition = if let Ok(Some((edition, expired))) = grant {
158            Edition::new(edition, expired)
159        } else {
160            let grant: RawResult<Option<(String, (), String)>> =
161                Queryable::query_one(&taos, "show grants");
162
163            if let Ok(Some((edition, _, expired))) = grant {
164                Edition::new(
165                    edition.trim(),
166                    expired.trim() == "false" || expired.trim() == "unlimited",
167                )
168            } else {
169                warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
170                Edition::new("unknown", true)
171            }
172        };
173        Ok(edition)
174    }
175}
176
177#[async_trait::async_trait]
178impl taos_query::AsyncTBuilder for TmqBuilder {
179    type Target = Consumer;
180
181    fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self> {
182        Self::new(dsn)
183    }
184
185    fn client_version() -> &'static str {
186        "0"
187    }
188
189    async fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
190        Ok(())
191    }
192
193    async fn ready(&self) -> bool {
194        true
195    }
196
197    async fn build(&self) -> RawResult<Self::Target> {
198        self.build_consumer().await
199    }
200
201    async fn server_version(&self) -> RawResult<&str> {
202        todo!()
203    }
204
205    async fn is_enterprise_edition(&self) -> RawResult<bool> {
206        todo!()
207    }
208
209    async fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
210        use taos_query::prelude::AsyncQueryable;
211
212        let taos = taos_query::AsyncTBuilder::build(&self.info).await?;
213        // Ensure server is ready.
214        taos.exec("select server_status()").await?;
215
216        match self
217            .info
218            .addr
219            .matches(".cloud.tdengine.com")
220            .next()
221            .is_some()
222            || self
223                .info
224                .addr
225                .matches(".cloud.taosdata.com")
226                .next()
227                .is_some()
228        {
229            true => {
230                let edition = Edition::new("cloud", false);
231                return Ok(edition);
232            }
233            false => (),
234        }
235
236        let grant: RawResult<Option<(String, bool)>> = AsyncQueryable::query_one(
237            &taos,
238            "select version, (expire_time < now) from information_schema.ins_cluster",
239        )
240        .await;
241
242        let edition = if let Ok(Some((edition, expired))) = grant {
243            Edition::new(edition, expired)
244        } else {
245            let grant: RawResult<Option<(String, (), String)>> =
246                AsyncQueryable::query_one(&taos, "show grants").await;
247
248            if let Ok(Some((edition, _, expired))) = grant {
249                Edition::new(
250                    edition.trim(),
251                    expired.trim() == "false" || expired.trim() == "unlimited",
252                )
253            } else {
254                warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
255                Edition::new("unknown", true)
256            }
257        };
258        Ok(edition)
259    }
260}
261#[derive(Debug)]
262struct WsMessageBase {
263    sender: WsTmqSender,
264    message_id: MessageId,
265}
266
267impl WsMessageBase {
268    async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
269        let req_id = self.sender.req_id();
270
271        let msg = TmqSend::Fetch(MessageArgs {
272            req_id,
273            message_id: self.message_id,
274        });
275        let data = self.sender.send_recv(msg).await?;
276        let fetch = if let TmqRecvData::Fetch(fetch) = data {
277            fetch
278        } else {
279            unreachable!()
280        };
281
282        if fetch.completed {
283            return Ok(None);
284        }
285
286        let msg = TmqSend::FetchBlock(MessageArgs {
287            req_id,
288            message_id: self.message_id,
289        });
290        let data = self.sender.send_recv(msg).await?;
291        if let TmqRecvData::Bytes(bytes) = data {
292            let mut raw = RawBlock::parse_from_raw_block(bytes, fetch.precision);
293
294            // for row in 0..raw.nrows() {
295            //     for col in 0..raw.ncols() {
296            //         log::trace!("at ({}, {})", row, col);
297            //         let v = unsafe { raw.get_ref_unchecked(row, col) };
298            //         println!("({}, {}): {:?}", row, col, v);
299            //     }
300            // }
301            raw.with_field_names(fetch.fields().iter().map(|f| f.name()));
302            if let Some(name) = fetch.table_name {
303                raw.with_table_name(name);
304            }
305            return Ok(Some(raw));
306        }
307        todo!()
308    }
309    async fn fetch_json_meta(&self) -> RawResult<JsonMeta> {
310        let req_id = self.sender.req_id();
311        let msg = TmqSend::FetchJsonMeta(MessageArgs {
312            req_id,
313            message_id: self.message_id,
314        });
315        let data = self.sender.send_recv(msg).await?;
316        if let TmqRecvData::FetchJsonMeta { data } = data {
317            let json: JsonMeta = serde_json::from_value(data).map_err(WsTmqError::from)?;
318            return Ok(json);
319        }
320        unreachable!()
321    }
322    async fn fetch_raw_meta(&self) -> RawResult<RawMeta> {
323        let req_id = self.sender.req_id();
324        let msg = TmqSend::FetchRaw(MessageArgs {
325            req_id,
326            message_id: self.message_id,
327        });
328        let data = self.sender.send_recv(msg).await?;
329        if let TmqRecvData::Bytes(bytes) = data {
330            let message_type = bytes.as_ref().read_u64().unwrap();
331            debug_assert_eq!(message_type, 3, "should be meta message type");
332            let raw = RawMeta::new(bytes.slice(8..)); // first u64 is message type.
333            return Ok(raw);
334        }
335        unreachable!()
336    }
337}
338
339#[derive(Debug)]
340pub struct Meta(WsMessageBase);
341
342// impl WsMetaMessage {
343//     pub async fn as_raw_meta(&self) -> Result<RawMeta> {
344//         self.0.fetch_raw_meta().await
345//     }
346//     pub async fn as_json_meta(&self) -> Result<JsonMeta> {
347//         self.0.fetch_json_meta().await
348//     }
349// }
350
351#[async_trait::async_trait]
352impl IsAsyncMeta for Meta {
353    async fn as_raw_meta(&self) -> RawResult<RawMeta> {
354        self.0.fetch_raw_meta().await
355    }
356
357    async fn as_json_meta(&self) -> RawResult<JsonMeta> {
358        self.0.fetch_json_meta().await
359    }
360}
361
362impl SyncOnAsync for Meta {}
363
364#[derive(Debug)]
365pub struct Data(WsMessageBase);
366
367impl Data {
368    pub async fn fetch_block(&self) -> RawResult<Option<RawBlock>> {
369        self.0.fetch_raw_block().await
370    }
371}
372
373impl Iterator for Data {
374    type Item = RawResult<RawBlock>;
375
376    fn next(&mut self) -> Option<Self::Item> {
377        taos_query::block_in_place_or_global(self.fetch_block()).transpose()
378    }
379}
380
381#[async_trait::async_trait]
382impl IsAsyncData for Data {
383    async fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
384        self.0
385            .fetch_raw_meta()
386            .await
387            .map(|raw| unsafe { std::mem::transmute(raw) })
388    }
389
390    async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
391        self.fetch_block().await
392    }
393}
394
395impl IsData for Data {
396    fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
397        taos_query::block_in_place_or_global(self.0.fetch_raw_meta())
398            .map(|raw| unsafe { std::mem::transmute(raw) })
399    }
400
401    fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
402        taos_query::block_in_place_or_global(self.fetch_block())
403    }
404}
405pub enum WsMessageSet {
406    Meta(Meta),
407    Data(Data),
408}
409
410impl WsMessageSet {
411    pub const fn message_type(&self) -> MessageType {
412        match self {
413            WsMessageSet::Meta(_) => MessageType::Meta,
414            WsMessageSet::Data(_) => MessageType::Data,
415        }
416    }
417}
418
419impl Consumer {
420    // async fn init_poll(&self, timeout: Duration) -> Result<()> {
421    //     let req_id = self.sender.req_id();
422    //     let action = TmqSend::Poll {
423    //         req_id,
424    //         blocking_time: 0,
425    //     };
426
427    //     let data = self.sender.send_recv_timeout(action, timeout).await?;
428    //     match data {
429    //         TmqRecvData::Poll(TmqPoll {
430    //             message_id,
431    //             database,
432    //             have_message,
433    //             topic,
434    //             vgroup_id,
435    //             message_type,
436    //         }) => {
437    //             assert!(!have_message);
438    //         }
439    //         _ => unreachable!(),
440    //     }
441    //     Ok(())
442    // }
443    async fn poll_wait(&self) -> RawResult<(Offset, MessageSet<Meta, Data>)> {
444        let elapsed = tokio::time::Instant::now();
445        loop {
446            let req_id = self.sender.req_id();
447            let action = TmqSend::Poll {
448                req_id,
449                blocking_time: 0,
450            };
451
452            let data = self.sender.send_recv(action).await?;
453
454            match data {
455                TmqRecvData::Poll(TmqPoll {
456                    message_id,
457                    database,
458                    have_message,
459                    topic,
460                    vgroup_id,
461                    message_type,
462                }) => {
463                    if have_message {
464                        let dur = elapsed.elapsed();
465                        let offset = Offset {
466                            message_id,
467                            database,
468                            topic,
469                            vgroup_id,
470                        };
471                        let message = WsMessageBase {
472                            sender: self.sender.clone(),
473                            message_id,
474                        };
475                        log::trace!("Got message in {}ms", dur.as_millis());
476                        break match message_type {
477                            MessageType::Meta => Ok((offset, MessageSet::Meta(Meta(message)))),
478                            MessageType::Data => Ok((offset, MessageSet::Data(Data(message)))),
479                            MessageType::MetaData => Ok((
480                                offset,
481                                MessageSet::MetaData(
482                                    Meta(message),
483                                    Data(WsMessageBase {
484                                        sender: self.sender.clone(),
485                                        message_id,
486                                    }),
487                                ),
488                            )),
489                            MessageType::Invalid => unreachable!(),
490                            // _ => unreachable!(),
491                        };
492                    } else {
493                        tokio::time::sleep(Duration::from_millis(500)).await;
494                        continue;
495                    }
496                }
497                _ => unreachable!(),
498            }
499        }
500    }
501    pub(crate) async fn poll_timeout(
502        &self,
503        timeout: Duration,
504    ) -> RawResult<Option<(Offset, MessageSet<Meta, Data>)>> {
505        let sleep = tokio::time::sleep(timeout);
506        tokio::pin!(sleep);
507        tokio::select! {
508            _ = &mut sleep, if !sleep.is_elapsed() => {
509               Ok(None)
510            }
511            message = self.poll_wait() => {
512                Ok(Some(message?))
513            }
514        }
515    }
516}
517
518#[async_trait::async_trait]
519impl AsAsyncConsumer for Consumer {
520    type Offset = Offset;
521
522    type Meta = Meta;
523
524    type Data = Data;
525
526    async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
527        &mut self,
528        topics: I,
529    ) -> RawResult<()> {
530        self.topics = topics.into_iter().map(Into::into).collect_vec();
531        let req_id = self.sender.req_id();
532        let action = TmqSend::Subscribe {
533            req_id,
534            req: self.tmq_conf.clone(),
535            topics: self.topics.clone(),
536            conn: self.conn.clone(),
537        };
538        self.sender.send_recv(action).await?;
539
540        // dbg!(&self.tmq_conf);
541
542        if let Some(offset) = self.tmq_conf.offset_seek.clone() {
543            // dbg!(offset);
544            let offsets = offset
545                .split(',')
546                .map(|s| {
547                    s.split(':')
548                        .map(|i| i.parse::<i64>().unwrap())
549                        .collect_vec()
550                })
551                .collect_vec();
552            let topic_name = &self.topics[0];
553            for offset in offsets {
554                let vgroup_id = offset[0] as i32;
555                let offset = offset[1];
556                log::trace!(
557                    "topic {} seeking to offset {} for vgroup {}",
558                    &topic_name,
559                    offset,
560                    vgroup_id
561                );
562
563                let req_id = self.sender.req_id();
564                let action = TmqSend::Seek(OffsetSeekArgs {
565                    req_id,
566                    topic: topic_name.to_string(),
567                    vgroup_id,
568                    offset,
569                });
570
571                let _ = self
572                    .sender
573                    .send_recv(action)
574                    .await
575                    .unwrap_or(crate::consumer::messages::TmqRecvData::Seek { timing: 0 });
576            }
577        }
578
579        Ok(())
580    }
581
582    async fn unsubscribe(self) {
583        let req_id = self.sender.req_id();
584        log::trace!("unsubscribe {} start", req_id);
585        let action = TmqSend::Unsubscribe { req_id };
586        self.sender.send_recv(action).await.unwrap();
587        drop(self)
588    }
589
590    async fn recv_timeout(
591        &self,
592        timeout: taos_query::tmq::Timeout,
593    ) -> RawResult<
594        Option<(
595            Self::Offset,
596            taos_query::tmq::MessageSet<Self::Meta, Self::Data>,
597        )>,
598    > {
599        match timeout {
600            Timeout::Never | Timeout::None => self.poll_timeout(Duration::MAX).await,
601            Timeout::Duration(timeout) => self.poll_timeout(timeout).await,
602        }
603    }
604
605    async fn commit(&self, offset: Self::Offset) -> RawResult<()> {
606        let req_id = self.sender.req_id();
607        let action = TmqSend::Commit(MessageArgs {
608            req_id,
609            message_id: offset.message_id,
610        });
611
612        let _ = self.sender.send_recv(action).await?;
613        Ok(())
614    }
615
616    async fn commit_offset(
617        &self,
618        topic_name: &str,
619        vgroup_id: VGroupId,
620        offset: i64,
621    ) -> RawResult<()> {
622        let req_id = self.sender.req_id();
623        let action = TmqSend::CommitOffset(OffsetSeekArgs {
624            req_id,
625            topic: topic_name.to_string(),
626            vgroup_id,
627            offset,
628        });
629
630        let _ = self.sender.send_recv(action).await?;
631        Ok(())
632    }
633
634    async fn list_topics(&self) -> RawResult<Vec<String>> {
635        let topics = self.topics.clone();
636        Ok(topics)
637    }
638
639    async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
640        let topics = self.topics.clone();
641        log::trace!("topics: {:?}", topics);
642
643        let mut ret = Vec::new();
644        for topic in topics {
645            let assignments = self.topic_assignment(&topic).await;
646            ret.push((topic, assignments));
647        }
648
649        Some(ret)
650    }
651
652    async fn topic_assignment(&self, topic: &str) -> Vec<Assignment> {
653        let req_id = self.sender.req_id();
654        let action = TmqSend::Assignment(TopicAssignmentArgs {
655            req_id,
656            topic: topic.to_string(),
657        });
658
659        let recv = self.sender.send_recv(action).await.ok();
660        match recv {
661            Some(TmqRecvData::Assignment(TopicAssignment { assignment, timing })) => {
662                // assert_eq!(topic, topic);
663                log::trace!("timing: {:?}", timing);
664                log::trace!("assignment: {:?}", assignment);
665                assignment
666            }
667            _ => {
668                vec![]
669            }
670        }
671    }
672
673    async fn offset_seek(
674        &mut self,
675        topic: &str,
676        vgroup_id: VGroupId,
677        offset: i64,
678    ) -> RawResult<()> {
679        let req_id = self.sender.req_id();
680        let action = TmqSend::Seek(OffsetSeekArgs {
681            req_id,
682            topic: topic.to_string(),
683            vgroup_id,
684            offset,
685        });
686
687        let _ = self.sender.send_recv(action).await?;
688        Ok(())
689    }
690
691    async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
692        let req_id = self.sender.req_id();
693        let action = TmqSend::Committed(OffsetArgs {
694            req_id,
695            topic_vgroup_ids: vec![OffsetInnerArgs {
696                topic: topic.to_string(),
697                vgroup_id,
698            }],
699        });
700
701        let data = self.sender.send_recv(action).await?;
702        if let TmqRecvData::Committed { committed } = data {
703            let offset = committed[0];
704            Ok(offset)
705        } else {
706            Ok(0)
707        }
708    }
709
710    async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
711        let req_id = self.sender.req_id();
712        let action = TmqSend::Position(OffsetArgs {
713            req_id,
714            topic_vgroup_ids: vec![OffsetInnerArgs {
715                topic: topic.to_string(),
716                vgroup_id,
717            }],
718        });
719
720        let data = self.sender.send_recv(action).await?;
721        if let TmqRecvData::Position { position } = data {
722            let offset = position[0];
723            Ok(offset)
724        } else {
725            Ok(0)
726        }
727    }
728
729    fn default_timeout(&self) -> Timeout {
730        self.timeout
731    }
732}
733
734impl AsConsumer for Consumer {
735    type Offset = Offset;
736
737    type Meta = Meta;
738
739    type Data = Data;
740
741    fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
742        &mut self,
743        topics: I,
744    ) -> RawResult<()> {
745        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::subscribe(self, topics))
746    }
747
748    fn recv_timeout(
749        &self,
750        timeout: Timeout,
751    ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
752        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::recv_timeout(
753            self, timeout,
754        ))
755    }
756
757    fn commit(&self, offset: Self::Offset) -> RawResult<()> {
758        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::commit(self, offset))
759    }
760
761    fn commit_offset(&self, topic_name: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
762        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::commit_offset(
763            self, topic_name, vgroup_id, offset,
764        ))
765    }
766
767    fn list_topics(&self) -> RawResult<Vec<String>> {
768        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::list_topics(self))
769    }
770
771    fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
772        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::assignments(self))
773    }
774
775    fn offset_seek(&mut self, topic: &str, vg_id: VGroupId, offset: i64) -> RawResult<()> {
776        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::offset_seek(
777            self, topic, vg_id, offset,
778        ))
779    }
780
781    fn committed(&self, topic: &str, vg_id: VGroupId) -> RawResult<i64> {
782        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::committed(
783            self, topic, vg_id,
784        ))
785    }
786
787    fn position(&self, topic: &str, vg_id: VGroupId) -> RawResult<i64> {
788        taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::position(
789            self, topic, vg_id,
790        ))
791    }
792}
793
794impl TmqBuilder {
795    pub fn new<D: IntoDsn>(dsn: D) -> RawResult<Self> {
796        let dsn = dsn.into_dsn()?;
797        let info = TaosBuilder::from_dsn(&dsn)?;
798        let group_id = dsn
799            .params
800            .get("group.id")
801            .map(ToString::to_string)
802            .ok_or_else(|| DsnError::RequireParam("group.id".to_string()))?;
803        let client_id = dsn.params.get("client.id").map(ToString::to_string);
804        let offset_reset = dsn.params.get("auto.offset.reset").map(ToString::to_string);
805        let auto_commit = dsn
806            .params
807            .get("enable.auto.commit")
808            .map(|s| {
809                if s.is_empty() {
810                    "false".to_string()
811                } else {
812                    s.to_string()
813                }
814            })
815            .unwrap_or("false".to_string());
816        let auto_commit_interval_ms = dsn.params.get("auto.commit.interval.ms").and_then(|s| {
817            if s.is_empty() {
818                None
819            } else {
820                Some(s.to_string())
821            }
822        });
823        let snapshot_enable = dsn
824            .params
825            .get("experimental.snapshot.enable")
826            .and_then(|s| {
827                if s.is_empty() {
828                    None
829                } else {
830                    Some(s.to_string())
831                }
832            })
833            .unwrap_or("false".to_string());
834        let with_table_name = dsn
835            .params
836            .get("with.table.name")
837            .and_then(|s| {
838                if s.is_empty() {
839                    None
840                } else {
841                    Some(s.to_string())
842                }
843            })
844            .unwrap_or("true".to_string());
845        let timeout = if let Some(timeout) = dsn.get("timeout") {
846            Timeout::from_str(timeout).map_err(RawError::from_any)?
847        } else {
848            Timeout::Duration(Duration::from_secs(5))
849        };
850        let offset_seek = dsn.params.get("offset").and_then(|s| {
851            if s.is_empty() {
852                None
853            } else {
854                Some(s.to_string())
855            }
856        });
857        let conf = TmqInit {
858            group_id,
859            client_id,
860            offset_reset,
861            auto_commit,
862            auto_commit_interval_ms,
863            snapshot_enable,
864            with_table_name,
865            offset_seek,
866        };
867
868        Ok(Self {
869            info,
870            conf,
871            timeout,
872        })
873    }
874
875    #[allow(dead_code)]
876    async fn tung_build_consumer(&self) -> RawResult<Consumer> {
877        let url = self.info.to_tmq_url();
878        // let (ws, _) = taos_query::block_in_place_or_global(connect_async(url))?;
879        let (ws, _) = connect_async(&url).await.map_err(WsTmqError::from)?;
880        let (mut sender, mut reader) = ws.split();
881
882        let queries = Arc::new(HashMap::<ReqId, tokio::sync::oneshot::Sender<_>>::new());
883
884        let queries_sender = queries.clone();
885        let msg_handler = queries.clone();
886
887        let (ws, mut msg_recv) = tokio::sync::mpsc::channel::<Message>(100);
888        let ws2 = ws.clone();
889
890        // Connection watcher
891        let (tx, mut rx) = watch::channel(false);
892        let mut close_listener = rx.clone();
893
894        let sending_url = url.clone();
895        static PING_INTERVAL: u64 = 29;
896        const PING: &[u8] = b"TAOSX";
897
898        tokio::spawn(async move {
899            let mut interval = time::interval(Duration::from_secs(PING_INTERVAL));
900
901            loop {
902                tokio::select! {
903                    _ = interval.tick() => {
904                        log::trace!("Check websocket message sender alive");
905                        if let Err(err) = sender.send(Message::Ping(PING.to_vec())).await {
906                            log::trace!("sending ping message to {sending_url} error: {err:?}");
907                            // let mut keys = Vec::new();
908                            let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
909
910                            // msg_handler.for_each_async(|k, _| {
911                            //     keys.push(*k);
912                            // }).await;
913                            for k in keys {
914                                if let Some((_, sender)) = msg_handler.remove(&k) {
915                                    let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
916                                        format!("WebSocket internal error: {err}"))));
917                                }
918                            }
919                        }
920                    }
921                    Some(msg) = msg_recv.recv() => {
922                        if msg.is_close() {
923                            let _ = sender.send(msg).await;
924                            let _ = sender.close().await;
925                            break;
926                        }
927                        log::trace!("send message {msg:?}");
928                        if let Err(err) = sender.send(msg).await {
929                            log::trace!("sending message to {sending_url} error: {err:?}");
930                            let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
931                            for k in keys {
932                                if let Some((_, sender)) = msg_handler.remove(&k) {
933                                    let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
934                                        format!("WebSocket internal error: {err}"))));
935                                }
936                            }
937                        }
938                        log::trace!("send message done");
939                    }
940                    _ = rx.changed() => {
941                        let _= sender.send(Message::Close(None)).await;
942                        let _ = sender.close().await;
943                        log::trace!("close tmq sender");
944                        break;
945                    }
946                }
947            }
948        });
949
950        tokio::spawn(async move {
951            let instant = Instant::now();
952            'ws: loop {
953                tokio::select! {
954                    Some(message) = reader.next() => {
955                        match message {
956                            Ok(message) => match message {
957                                Message::Text(text) => {
958                                    log::trace!("json response: {}", text);
959                                    let v: TmqRecv = serde_json::from_str(&text).expect(&text);
960                                    let (req_id, recv, ok) = v.ok();
961                                    match &recv {
962                                        TmqRecvData::Subscribe => {
963                                            log::trace!("subscribe with: {:?}", req_id);
964
965                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
966                                            {
967                                                let _ = sender.send(ok.map(|_|recv));
968                                            }  else {
969                                                log::warn!("subscribe message received but no receiver alive");
970                                            }
971                                        },
972                                        TmqRecvData::Unsubscribe => {
973                                            log::trace!("unsubscribe with: {:?} successed", req_id);
974                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
975                                            {
976                                                let _ = sender.send(ok.map(|_|recv));
977                                            }  else {
978                                                log::warn!("unsubscribe message received but no receiver alive");
979                                            }
980                                        },
981                                        TmqRecvData::Poll(_) => {
982                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
983                                            {
984                                                let _ = sender.send(ok.map(|_|recv));
985                                            }  else {
986                                                log::warn!("poll message received but no receiver alive");
987                                            }
988                                        },
989                                        TmqRecvData::FetchJsonMeta { data }=> {
990                                            log::trace!("fetch json meta data: {:?}", data);
991                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
992                                            {
993                                                let _ = sender.send(ok.map(|_|recv));
994                                            }  else {
995                                                log::warn!("poll message received but no receiver alive");
996                                            }
997                                        }
998                                        TmqRecvData::FetchRaw { meta: _ }=> {
999                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1000                                            {
1001                                                let _ = sender.send(ok.map(|_|recv));
1002                                            }  else {
1003                                                log::warn!("poll message received but no receiver alive");
1004                                            }
1005                                        }
1006                                        TmqRecvData::Commit=> {
1007                                            log::trace!("commit done: {:?}", recv);
1008                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1009                                            {
1010                                                let _ = sender.send(ok.map(|_|recv));
1011                                            }  else {
1012                                                log::warn!("poll message received but no receiver alive");
1013                                            }
1014                                        }
1015                                        TmqRecvData::Fetch(fetch)=> {
1016                                            log::trace!("fetch done: {:?}", fetch);
1017                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1018                                            {
1019                                                let _ = sender.send(ok.map(|_|recv));
1020                                            }  else {
1021                                                log::warn!("poll message received but no receiver alive");
1022                                            }
1023                                        }
1024                                        TmqRecvData::FetchBlock{ data: _ }=> {
1025                                            if let Some((_, sender)) = queries_sender.remove(&req_id) {
1026                                                let _ = sender.send(Err(RawError::new(
1027                                                    WS_ERROR_NO::WEBSOCKET_ERROR.as_code(),
1028                                                    format!("WebSocket internal error: {:?}", &text)
1029                                                )));
1030                                            }
1031                                            break 'ws;
1032                                        }
1033                                        TmqRecvData::Assignment(assignment)=> {
1034                                            log::trace!("assignment done: {:?}", assignment);
1035                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1036                                            {
1037                                                let _ = sender.send(ok.map(|_|recv));
1038                                            }  else {
1039                                                log::warn!("assignment message received but no receiver alive");
1040                                            }
1041                                        }
1042                                        TmqRecvData::Seek { timing }=> {
1043                                            log::trace!("seek done: req_id {:?} timing {:?}", &req_id, timing);
1044                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1045                                            {
1046                                                let _ = sender.send(ok.map(|_|recv));
1047                                            }  else {
1048                                                log::warn!("seek message received but no receiver alive");
1049                                            }
1050                                        }
1051                                        TmqRecvData::Committed { committed }=> {
1052                                            log::trace!("committed done: {:?}", committed);
1053                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1054                                            {
1055                                                let _ = sender.send(ok.map(|_|recv));
1056                                            }  else {
1057                                                log::warn!("committed message received but no receiver alive");
1058                                            }
1059                                        }
1060                                        TmqRecvData::Position { position }=> {
1061                                            log::trace!("position done: {:?}", position);
1062                                            if let Some((_, sender)) = queries_sender.remove(&req_id)
1063                                            {
1064                                                let _ = sender.send(ok.map(|_|recv));
1065                                            }  else {
1066                                                log::warn!("position message received but no receiver alive");
1067                                            }
1068                                        }
1069                                        TmqRecvData::CommitOffset { timing }=> {
1070                                            log::trace!("commit offset done: {:?}", timing);
1071                                            if let Some((_, sender)) = queries_sender.remove(&req_id) {
1072                                                let _ = sender.send(ok.map(|_|recv));
1073                                            } else {
1074                                                log::warn!("commit offset message received but no receiver alive");
1075                                            }
1076                                        }
1077
1078                                        _ => unreachable!("unknown tmq response"),
1079                                    }
1080                                }
1081                                Message::Binary(data) => {
1082                                    // writeUint64(message.buffer, req.ReqID)
1083                                    // writeUint64(message.buffer, req.MessageID)
1084                                    // writeUint64(message.buffer, TMQRawMetaMessage)
1085                                    // writeUint32(message.buffer, length)
1086                                    // writeUint16(message.buffer, metaType)
1087                                    let mut bytes = Bytes::from(data);
1088                                    let part = bytes.slice(24..);
1089                                    // dbg!(&bytes);
1090                                    use bytes::Buf;
1091                                    let timing = bytes.get_u64_le();
1092                                    let req_id = bytes.get_u64_le();
1093                                    let message_id = bytes.get_u64_le();
1094
1095
1096                                    log::trace!("[{:.2}ms] receive binary message with req_id {} message_id {}",
1097                                        Duration::from_nanos(timing).as_secs_f64() / 1000.,
1098                                        req_id, message_id);
1099
1100                                    if let Some((_, sender)) = queries_sender.remove(&req_id)
1101                                    {
1102                                        sender.send(Ok(TmqRecvData::Bytes(part))).unwrap();
1103                                    }  else {
1104                                        log::warn!("poll message received but no receiver alive");
1105                                    }
1106
1107
1108                                }
1109                                Message::Close(close) => {
1110                                    log::warn!("websocket connection is closed (unexpected?)");
1111
1112                                    let keys = queries_sender.iter().map(|r| *r.key()).collect_vec();
1113                                    let err = if let Some(close) = close {
1114                                        format!("WebSocket internal error: {}", close)
1115                                    } else {
1116                                        "WebSocket internal error, connection is reset by server".to_string()
1117                                    };
1118                                    for k in keys {
1119                                        if let Some((_, sender)) = queries_sender.remove(&k) {
1120                                            let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), err.clone())));
1121                                        }
1122                                    }
1123                                    break 'ws;
1124                                }
1125                                Message::Ping(bytes) => {
1126                                    ws2.send(Message::Pong(bytes)).await.unwrap();
1127                                }
1128                                Message::Pong(bytes) => {
1129                                    if bytes == PING {
1130                                        log::trace!("ping/pong handshake success");
1131                                    } else {
1132                                        // do nothing
1133                                        log::warn!("received (unexpected) pong message, do nothing");
1134                                    }
1135                                }
1136                                Message::Frame(frame) => {
1137                                    // do no`thing
1138                                    log::warn!("received (unexpected) frame message, do nothing");
1139                                    log::trace!("* frame data: {frame:?}");
1140                                }
1141                            },
1142                            Err(err) => {
1143                                let keys = queries_sender.iter().map(|r| *r.key()).collect_vec();
1144                                for k in keys {
1145                                    if let Some((_, sender)) = queries_sender.remove(&k) {
1146                                        let _ = sender.send(Err(RawError::new(
1147                                            WS_ERROR_NO::CONN_CLOSED.as_code(),
1148                                            format!("WebSocket internal error: {err}")
1149                                        )));
1150                                    }
1151                                }
1152                                break 'ws;
1153                            }
1154                        }
1155                    }
1156                    _ = close_listener.changed() => {
1157                        log::trace!("close reader task");
1158                        break 'ws;
1159                    }
1160                }
1161            }
1162            log::trace!("Consuming done in {:?}", instant.elapsed());
1163        });
1164        let (ws, mut _msg_recv) = tokio::sync::mpsc::channel(100);
1165        let ws_cloned: tokio::sync::mpsc::Sender<WsMessage<bytes::Bytes>> = ws.clone();
1166        let consumer = Consumer {
1167            conn: self.info.to_conn_request(),
1168            tmq_conf: self.conf.clone(),
1169            sender: WsTmqSender {
1170                req_id: Arc::new(AtomicU64::new(1)),
1171                queries,
1172                sender: ws_cloned,
1173                timeout: Timeout::Duration(Duration::MAX),
1174            },
1175            // fetches,
1176            close_signal: tx,
1177            timeout: self.timeout,
1178            topics: vec![],
1179        };
1180
1181        Ok(consumer)
1182    }
1183
1184    async fn build_consumer(&self) -> RawResult<Consumer> {
1185        let url = self.info.to_tmq_url();
1186        let sending_url = url.clone();
1187
1188        let ws = self.info.build_tmq_stream(url).await?;
1189        let (mut reader, mut sender) = ws.split();
1190
1191        let queries = Arc::new(HashMap::<ReqId, tokio::sync::oneshot::Sender<_>>::new());
1192
1193        let queries_sender = queries.clone();
1194        let msg_handler = queries.clone();
1195
1196        let (ws, mut msg_recv) = tokio::sync::mpsc::channel::<WsMessage<bytes::Bytes>>(100);
1197        let ws2 = ws.clone();
1198
1199        // Connection watcher
1200        let (tx, mut rx) = watch::channel(false);
1201        let mut close_listener = rx.clone();
1202
1203        static PING_INTERVAL: u64 = 29;
1204        const PING: &[u8] = b"TAOS";
1205
1206        tokio::spawn(async move {
1207            let mut interval = time::interval(Duration::from_secs(PING_INTERVAL));
1208
1209            loop {
1210                tokio::select! {
1211                    _ = interval.tick() => {
1212                        log::trace!("Check websocket message sender alive");
1213                        if let Err(err) = sender.send(OpCode::Ping, &serde_json::to_vec(&PING).unwrap()).await {
1214
1215                            log::trace!("sending ping message to {sending_url} error: {err:?}");
1216                            let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
1217
1218                            for k in keys {
1219                                if let Some((_, sender)) = msg_handler.remove(&k) {
1220                                    let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
1221                                        format!("WebSocket internal error: {err}"))));
1222                                }
1223                            }
1224                        }
1225                    }
1226                    Some(msg) = msg_recv.recv() => {
1227
1228                        log::trace!("send message {msg:?}");
1229                        let opcode = msg.code;
1230                        let msg = msg.data;
1231                        if let Err(err) = sender.send(opcode, &msg).await {
1232                            log::trace!("sending message to {sending_url} error: {err:?}");
1233                            let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
1234                            for k in keys {
1235                                if let Some((_, sender)) = msg_handler.remove(&k) {
1236                                    let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
1237                                        format!("WebSocket internal error: {err}"))));
1238                                }
1239                            }
1240                        }
1241                        log::trace!("send message done");
1242                    }
1243                    _ = rx.changed() => {
1244                        let _ = sender.send(OpCode::Close, b"").await;
1245                        log::trace!("close tmq sender");
1246                        break;
1247                    }
1248                }
1249            }
1250        });
1251
1252        tokio::spawn(async move {
1253            let instant = Instant::now();
1254            'ws: loop {
1255                tokio::select! {
1256                    Ok(frame) = reader.receive() => {
1257                        let (header, payload) = frame;
1258                        let code = header.code;
1259                        match code {
1260                            OpCode::Text => {
1261                                log::trace!("received json response: {payload}", payload = String::from_utf8_lossy(&payload));
1262                                let v: TmqRecv = serde_json::from_slice(&payload).unwrap();
1263                                let (req_id, recv, ok) = v.ok();
1264                                match &recv {
1265                                    TmqRecvData::Subscribe => {
1266                                        log::trace!("subscribe with: {:?}", req_id);
1267
1268                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1269                                        {
1270                                            let _ = sender.send(ok.map(|_|recv));
1271                                        }  else {
1272                                            log::warn!("subscribe message received but no receiver alive");
1273                                        }
1274                                    },
1275                                    TmqRecvData::Unsubscribe => {
1276                                        log::trace!("unsubscribe with: {:?} successed", req_id);
1277                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1278                                        {
1279                                            let _ = sender.send(ok.map(|_|recv));
1280                                        }  else {
1281                                            log::warn!("unsubscribe message received but no receiver alive");
1282                                        }
1283                                    },
1284                                    TmqRecvData::Poll(_) => {
1285                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1286                                        {
1287                                            let _ = sender.send(ok.map(|_|recv));
1288                                        }  else {
1289                                            log::warn!("poll message received but no receiver alive");
1290                                        }
1291                                    },
1292                                    TmqRecvData::FetchJsonMeta { data }=> {
1293                                        log::trace!("fetch json meta data: {:?}", data);
1294                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1295                                        {
1296                                            let _ = sender.send(ok.map(|_|recv));
1297                                        }  else {
1298                                            log::warn!("poll message received but no receiver alive");
1299                                        }
1300                                    }
1301                                    TmqRecvData::FetchRaw { meta: _ }=> {
1302                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1303                                        {
1304                                            let _ = sender.send(ok.map(|_|recv));
1305                                        }  else {
1306                                            log::warn!("poll message received but no receiver alive");
1307                                        }
1308                                    }
1309                                    TmqRecvData::Commit=> {
1310                                        log::trace!("commit done: {:?}", recv);
1311                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1312                                        {
1313                                            let _ = sender.send(ok.map(|_|recv));
1314                                        }  else {
1315                                            log::warn!("poll message received but no receiver alive");
1316                                        }
1317                                    }
1318                                    TmqRecvData::Fetch(fetch)=> {
1319                                        log::trace!("fetch done: {:?}", fetch);
1320                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1321                                        {
1322                                            let _ = sender.send(ok.map(|_|recv));
1323                                        }  else {
1324                                            log::warn!("poll message received but no receiver alive");
1325                                        }
1326                                    }
1327                                    TmqRecvData::FetchBlock{ data: _ }=> {
1328                                        if let Some((_, sender)) = queries_sender.remove(&req_id) {
1329                                            let _ = sender.send(Err(RawError::new(
1330                                                WS_ERROR_NO::WEBSOCKET_ERROR.as_code(),
1331                                                format!("WebSocket internal error")
1332                                            )));
1333                                        }
1334                                        break 'ws;
1335                                    }
1336                                    TmqRecvData::Assignment(assignment)=> {
1337                                        log::trace!("assignment done: {:?}", assignment);
1338                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1339                                        {
1340                                            let _ = sender.send(ok.map(|_|recv));
1341                                        }  else {
1342                                            log::warn!("assignment message received but no receiver alive");
1343                                        }
1344                                    }
1345                                    TmqRecvData::Seek { timing }=> {
1346                                        log::trace!("seek done: req_id {:?} timing {:?}", &req_id, timing);
1347                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1348                                        {
1349                                            let _ = sender.send(ok.map(|_|recv));
1350                                        }  else {
1351                                            log::warn!("seek message received but no receiver alive");
1352                                        }
1353                                    }
1354                                    TmqRecvData::Committed { committed }=> {
1355                                        log::trace!("committed done: {:?}", committed);
1356                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1357                                        {
1358                                            let _ = sender.send(ok.map(|_|recv));
1359                                        }  else {
1360                                            log::warn!("committed message received but no receiver alive");
1361                                        }
1362                                    }
1363                                    TmqRecvData::Position { position }=> {
1364                                        log::trace!("position done: {:?}", position);
1365                                        if let Some((_, sender)) = queries_sender.remove(&req_id)
1366                                        {
1367                                            let _ = sender.send(ok.map(|_|recv));
1368                                        }  else {
1369                                            log::warn!("position message received but no receiver alive");
1370                                        }
1371                                    }
1372                                    TmqRecvData::CommitOffset { timing }=> {
1373                                        log::trace!("commit offset done: {:?}", timing);
1374                                        if let Some((_, sender)) = queries_sender.remove(&req_id) {
1375                                            let _ = sender.send(ok.map(|_|recv));
1376                                        } else {
1377                                            log::warn!("commit offset message received but no receiver alive");
1378                                        }
1379                                    }
1380                                    _ => unreachable!("unknown tmq response"),
1381                                }
1382                            }
1383                            OpCode::Binary => {
1384                                let block = payload.to_vec();
1385                                let mut slice = block.as_slice();
1386                                use taos_query::util::InlinableRead;
1387                                let offset = 24;
1388                                let part = slice[offset..].to_vec();
1389
1390                                let _timing = {
1391                                    let timing = slice.read_u64().unwrap();
1392                                    Duration::from_nanos(timing as _)
1393                                };
1394
1395                                let req_id = slice.read_u64().unwrap();
1396
1397                                if let Some((_, sender)) = queries_sender.remove(&req_id) {
1398                                    log::trace!("send data to fetches with id {}", req_id);
1399                                    sender.send(Ok(TmqRecvData::Bytes(part.into()))).unwrap();
1400                                } else {
1401                                    log::warn!("req_id {req_id} not detected, message might be lost");
1402                                }
1403                            }
1404                            OpCode::Close => {
1405                                log::warn!("websocket connection is closed normally");
1406                                let mut keys = Vec::new();
1407                                for e in queries_sender.iter() {
1408                                    keys.push(*e.key());
1409                                }
1410                                for k in keys {
1411                                    if let Some((_, sender)) = queries_sender.remove(&k) {
1412                                        let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), "received close message")));
1413                                    }
1414                                }
1415                                break 'ws;
1416                            }
1417                            OpCode::Ping => {
1418                                let bytes = payload.to_vec();
1419                                ws2.send(WsMessage{
1420                                    code: OpCode::Pong,
1421                                    data: bytes.into(),
1422                                    close_code: None
1423                                }).await.unwrap();
1424                            }
1425                            OpCode::Pong => {
1426                                // do nothing
1427                                log::trace!("received pong message, do nothing");
1428                            }
1429                            _ => {
1430                                let frame = payload;
1431                                // do nothing
1432                                log::warn!("received (unexpected) frame message, do nothing");
1433                                log::trace!("* frame data: {frame:?}");
1434                            }
1435                        }
1436                    }
1437                    _ = close_listener.changed() => {
1438                        log::trace!("close reader task");
1439                        break 'ws;
1440                    }
1441                }
1442            }
1443            log::trace!("Consuming done in {:?}", instant.elapsed());
1444        });
1445        let consumer = Consumer {
1446            conn: self.info.to_conn_request(),
1447            tmq_conf: self.conf.clone(),
1448            sender: WsTmqSender {
1449                req_id: Arc::new(AtomicU64::new(1)),
1450                queries,
1451                sender: ws,
1452                timeout: Timeout::Duration(Duration::MAX),
1453            },
1454            // fetches,
1455            close_signal: tx,
1456            timeout: self.timeout,
1457            topics: vec![],
1458        };
1459
1460        Ok(consumer)
1461    }
1462}
1463
1464#[derive(Debug)]
1465pub struct Consumer {
1466    conn: WsConnReq,
1467    tmq_conf: TmqInit,
1468    sender: WsTmqSender,
1469    close_signal: watch::Sender<bool>,
1470    timeout: Timeout,
1471    topics: Vec<String>,
1472}
1473
1474impl Drop for Consumer {
1475    fn drop(&mut self) {
1476        let _ = self.close_signal.send(true);
1477    }
1478}
1479#[derive(Debug)]
1480pub struct Offset {
1481    message_id: MessageId,
1482    database: String,
1483    topic: String,
1484    vgroup_id: i32,
1485}
1486
1487impl IsOffset for Offset {
1488    fn database(&self) -> &str {
1489        &self.database
1490    }
1491
1492    fn topic(&self) -> &str {
1493        &self.topic
1494    }
1495
1496    fn vgroup_id(&self) -> i32 {
1497        self.vgroup_id
1498    }
1499}
1500
1501#[derive(Debug, Error)]
1502pub enum WsTmqError {
1503    #[error("{0}")]
1504    Dsn(#[from] DsnError),
1505    #[error("{0}")]
1506    FetchError(#[from] oneshot::error::RecvError),
1507    #[error("{0}")]
1508    SendError(#[from] tokio::sync::mpsc::error::SendError<WsMessage<bytes::Bytes>>),
1509    #[error(transparent)]
1510    SendTimeoutError(#[from] tokio::sync::mpsc::error::SendTimeoutError<WsMessage<bytes::Bytes>>),
1511    #[error("{0}")]
1512    DeError(#[from] DeError),
1513    #[error("Deserialize json error: {0}")]
1514    JsonError(#[from] serde_json::Error),
1515    #[error("WebSocket internal[ws-tool] error: {0}")]
1516    WsErrorWst(#[from] WsErrorWst),
1517    #[error("{0}")]
1518    WsError(#[from] WsError),
1519    #[error("{0}")]
1520    TaosError(#[from] RawError),
1521    #[error("Receive timeout in {0}")]
1522    QueryTimeout(String),
1523}
1524
1525unsafe impl Send for WsTmqError {}
1526
1527unsafe impl Sync for WsTmqError {}
1528
1529impl WsTmqError {
1530    pub const fn errno(&self) -> Code {
1531        match self {
1532            WsTmqError::TaosError(error) => error.code(),
1533            _ => Code::FAILED,
1534        }
1535    }
1536    pub fn errstr(&self) -> String {
1537        match self {
1538            WsTmqError::TaosError(error) => error.message().to_string(),
1539            _ => format!("{}", self),
1540        }
1541    }
1542}
1543
1544impl From<WsTmqError> for RawError {
1545    fn from(value: WsTmqError) -> Self {
1546        match value {
1547            WsTmqError::TaosError(error) => error,
1548            error => {
1549                let code = error.errno();
1550                if code == Code::FAILED {
1551                    RawError::from_any(error)
1552                } else {
1553                    RawError::new(code, error.to_string())
1554                }
1555            }
1556        }
1557    }
1558}
1559
1560#[cfg(test)]
1561mod tests {
1562    use std::time::Duration;
1563
1564    use super::{TaosBuilder, TmqBuilder};
1565
1566    #[tokio::test]
1567    async fn test_ws_tmq_meta_batch() -> anyhow::Result<()> {
1568        use taos_query::prelude::*;
1569        let _ = tracing_subscriber::fmt()
1570            .with_file(true)
1571            .with_line_number(true)
1572            .with_max_level(tracing::Level::DEBUG)
1573            .compact()
1574            .try_init();
1575
1576        let taos = TaosBuilder::from_dsn("taos://localhost:6041")?
1577            .build()
1578            .await?;
1579        taos.exec_many([
1580            "drop topic if exists ws_tmq_meta_batch",
1581            "drop database if exists ws_tmq_meta_batch",
1582            "create database ws_tmq_meta_batch wal_retention_period 3600",
1583            "create topic ws_tmq_meta_batch with meta as database ws_tmq_meta_batch",
1584            "use ws_tmq_meta_batch",
1585            // kind 1: create super table using all types
1586            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1587            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1588            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1589            tags(t1 json)",
1590            // kind 2: create child table with json tag
1591            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1592            "create table tb1 using stb1 tags(NULL)",
1593            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1594            NULL, NULL, NULL, NULL, NULL,
1595            NULL, NULL, NULL, NULL)
1596            tb1 values(now, true, -2, -3, -4, -5, \
1597            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1598            254, 65534, 1, 1)",
1599            // kind 3: create super table with all types except json (especially for tags)
1600            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1601            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1602            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1603            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1604            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1605            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1606            // kind 4: create child table with all types except json
1607            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1608            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1609            254, 65534, 1, 1)",
1610            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1611            NULL, NULL, NULL, NULL, NULL,
1612            NULL, NULL, NULL, NULL)",
1613            // kind 5: create common table
1614            "create table `table` (ts timestamp, v int)",
1615            // kind 6: column in super table
1616            "alter table stb1 add column new1 bool",
1617            "alter table stb1 add column new2 tinyint",
1618            "alter table stb1 add column new10 nchar(16)",
1619            "alter table stb1 modify column new10 nchar(32)",
1620            "alter table stb1 drop column new10",
1621            "alter table stb1 drop column new2",
1622            "alter table stb1 drop column new1",
1623            // kind 7: add tag in super table
1624            "alter table `stb2` add tag new1 bool",
1625            "alter table `stb2` rename tag new1 new1_new",
1626            "alter table `stb2` modify tag t10 nchar(32)",
1627            "alter table `stb2` drop tag new1_new",
1628            // kind 8: column in common table
1629            "alter table `table` add column new1 bool",
1630            "alter table `table` add column new2 tinyint",
1631            "alter table `table` add column new10 nchar(16)",
1632            "alter table `table` modify column new10 nchar(32)",
1633            "alter table `table` rename column new10 new10_new",
1634            "alter table `table` drop column new10_new",
1635            "alter table `table` drop column new2",
1636            "alter table `table` drop column new1",
1637            // // kind 9: drop normal table
1638            // "drop table `table`",
1639            // // kind 10: drop child table
1640            // "drop table `tb2` `tb1`",
1641            // // kind 11: drop super table
1642            // "drop table `stb2`",
1643            // "drop table `stb1`",
1644        ])
1645        .await?;
1646
1647        taos.exec_many([
1648            "drop database if exists ws_tmq_meta_batch2",
1649            "create database if not exists ws_tmq_meta_batch2 wal_retention_period 3600",
1650            "use ws_tmq_meta_batch2",
1651        ])
1652        .await?;
1653
1654        let builder = TmqBuilder::new(
1655            "taos://localhost:6041?group.id=10&timeout=5s&auto.offset.reset=earliest&msg.enable.batchmeta=true&experimental.snapshot.enable=true",
1656        )?;
1657        let mut consumer = builder.build_consumer().await?;
1658        consumer.subscribe(["ws_tmq_meta_batch"]).await?;
1659
1660        {
1661            let mut stream = consumer.stream();
1662
1663            let mut count = 0;
1664            while let Some((offset, message)) = stream.try_next().await? {
1665                // Offset contains information for topic name, database name and vgroup id,
1666                //  similar to kafka topic/partition/offset.
1667                let _ = offset.topic();
1668                let _ = offset.database();
1669                let _ = offset.vgroup_id();
1670                count += 1;
1671
1672                // Different to kafka message, TDengine consumer would consume two kind of messages.
1673                //
1674                // 1. meta
1675                // 2. data
1676                // 3. meta + data
1677                match message {
1678                    MessageSet::Meta(meta) => {
1679                        let _raw = meta.as_raw_meta().await?;
1680                        // taos.write_meta(raw).await?;
1681
1682                        // meta data can be write to an database seamlessly by raw or json (to sql).
1683                        let json = meta.as_json_meta().await?;
1684                        tracing::info!(count, batch.size = json.iter().len(), "{json:?}");
1685                        // assert!(json.iter().len() > 1, "json meta is batch");
1686                        for meta in &json {
1687                            let sql = meta.to_string();
1688                            tracing::debug!(count, "sql: {}", sql);
1689                            if let Err(err) = taos.exec(sql).await {
1690                                match err.code() {
1691                                    Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
1692                                    Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
1693                                    Code::COLUMN_EXISTS => log::trace!("column already exists"),
1694                                    Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
1695                                    Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
1696                                    Code::MODIFIED_ALREADY => log::trace!("modified already done"),
1697                                    Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
1698                                    Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
1699                                    _ => {
1700                                        tracing::error!(count, "{}", err);
1701                                    }
1702                                }
1703                            }
1704                        }
1705                    }
1706                    MessageSet::Data(data) => {
1707                        // data message may have more than one data block for various tables.
1708                        while let Some(_data) = data.fetch_block().await? {
1709                            // dbg!(data.table_name());
1710                            // dbg!(data);
1711                        }
1712                    }
1713                    _ => unreachable!(),
1714                }
1715                consumer.commit(offset).await?;
1716            }
1717        }
1718        consumer.unsubscribe().await;
1719
1720        tokio::time::sleep(Duration::from_secs(2)).await;
1721
1722        taos.exec_many([
1723            "drop database ws_tmq_meta_batch2",
1724            "drop topic ws_tmq_meta_batch",
1725            "drop database ws_tmq_meta_batch",
1726        ])
1727        .await?;
1728        Ok(())
1729    }
1730
1731    #[tokio::test]
1732    async fn test_ws_tmq_meta() -> anyhow::Result<()> {
1733        use taos_query::prelude::*;
1734        let _ = tracing_subscriber::fmt()
1735            .with_file(true)
1736            .with_line_number(true)
1737            .with_max_level(tracing::Level::DEBUG)
1738            .compact()
1739            .try_init();
1740
1741        let taos = TaosBuilder::from_dsn("taos://localhost:6041")?
1742            .build()
1743            .await?;
1744        taos.exec_many([
1745            "drop topic if exists ws_tmq_meta",
1746            "drop database if exists ws_tmq_meta",
1747            "create database ws_tmq_meta wal_retention_period 3600",
1748            "create topic ws_tmq_meta with meta as database ws_tmq_meta",
1749            "use ws_tmq_meta",
1750            // kind 1: create super table using all types
1751            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1752            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1753            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1754            tags(t1 json)",
1755            // kind 2: create child table with json tag
1756            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1757            "create table tb1 using stb1 tags(NULL)",
1758            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1759            NULL, NULL, NULL, NULL, NULL,
1760            NULL, NULL, NULL, NULL)
1761            tb1 values(now, true, -2, -3, -4, -5, \
1762            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1763            254, 65534, 1, 1)",
1764            // kind 3: create super table with all types except json (especially for tags)
1765            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1766            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1767            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1768            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1769            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1770            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1771            // kind 4: create child table with all types except json
1772            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1773            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1774            254, 65534, 1, 1)",
1775            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1776            NULL, NULL, NULL, NULL, NULL,
1777            NULL, NULL, NULL, NULL)",
1778            // kind 5: create common table
1779            "create table `table` (ts timestamp, v int)",
1780            // kind 6: column in super table
1781            "alter table stb1 add column new1 bool",
1782            "alter table stb1 add column new2 tinyint",
1783            "alter table stb1 add column new10 nchar(16)",
1784            "alter table stb1 modify column new10 nchar(32)",
1785            "alter table stb1 drop column new10",
1786            "alter table stb1 drop column new2",
1787            "alter table stb1 drop column new1",
1788            // kind 7: add tag in super table
1789            "alter table `stb2` add tag new1 bool",
1790            "alter table `stb2` rename tag new1 new1_new",
1791            "alter table `stb2` modify tag t10 nchar(32)",
1792            "alter table `stb2` drop tag new1_new",
1793            // kind 8: column in common table
1794            "alter table `table` add column new1 bool",
1795            "alter table `table` add column new2 tinyint",
1796            "alter table `table` add column new10 nchar(16)",
1797            "alter table `table` modify column new10 nchar(32)",
1798            "alter table `table` rename column new10 new10_new",
1799            "alter table `table` drop column new10_new",
1800            "alter table `table` drop column new2",
1801            "alter table `table` drop column new1",
1802            // // kind 9: drop normal table
1803            // "drop table `table`",
1804            // // kind 10: drop child table
1805            // "drop table `tb2` `tb1`",
1806            // // kind 11: drop super table
1807            // "drop table `stb2`",
1808            // "drop table `stb1`",
1809        ])
1810        .await?;
1811
1812        taos.exec_many([
1813            "drop database if exists ws_tmq_meta2",
1814            "create database if not exists ws_tmq_meta2 wal_retention_period 3600",
1815            "use ws_tmq_meta2",
1816        ])
1817        .await?;
1818
1819        let builder = TmqBuilder::new(
1820            "taos://localhost:6041?group.id=10&timeout=5s&auto.offset.reset=earliest",
1821        )?;
1822        let mut consumer = builder.build_consumer().await?;
1823        consumer.subscribe(["ws_tmq_meta"]).await?;
1824
1825        {
1826            let mut stream = consumer.stream();
1827
1828            let mut count = 0;
1829            while let Some((offset, message)) = stream.try_next().await? {
1830                // Offset contains information for topic name, database name and vgroup id,
1831                //  similar to kafka topic/partition/offset.
1832                let _ = offset.topic();
1833                let _ = offset.database();
1834                let _ = offset.vgroup_id();
1835                count += 1;
1836
1837                // Different to kafka message, TDengine consumer would consume two kind of messages.
1838                //
1839                // 1. meta
1840                // 2. data
1841                // 3. meta + data
1842                match message {
1843                    MessageSet::Meta(meta) => {
1844                        let _raw = meta.as_raw_meta().await?;
1845                        // taos.write_meta(raw).await?;
1846
1847                        // meta data can be write to an database seamlessly by raw or json (to sql).
1848                        let json = meta.as_json_meta().await?;
1849                        for meta in &json {
1850                            let sql = meta.to_string();
1851                            log::debug!("sql: {}", sql);
1852                            if let Err(err) = taos.exec(sql).await {
1853                                match err.code() {
1854                                    Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
1855                                    Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
1856                                    Code::COLUMN_EXISTS => log::trace!("column already exists"),
1857                                    Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
1858                                    Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
1859                                    Code::MODIFIED_ALREADY => log::trace!("modified already done"),
1860                                    Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
1861                                    Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
1862                                    _ => {
1863                                        tracing::error!(count, "{}", err);
1864                                    }
1865                                }
1866                            }
1867                        }
1868                    }
1869                    MessageSet::Data(data) => {
1870                        // data message may have more than one data block for various tables.
1871                        while let Some(_data) = data.fetch_block().await? {
1872                            // dbg!(data.table_name());
1873                            // dbg!(data);
1874                        }
1875                    }
1876                    _ => unreachable!(),
1877                }
1878                consumer.commit(offset).await?;
1879            }
1880        }
1881        consumer.unsubscribe().await;
1882
1883        tokio::time::sleep(Duration::from_secs(2)).await;
1884
1885        taos.exec_many([
1886            "drop database ws_tmq_meta2",
1887            "drop topic ws_tmq_meta",
1888            "drop database ws_tmq_meta",
1889        ])
1890        .await?;
1891        Ok(())
1892    }
1893
1894    #[test]
1895    fn test_ws_tmq_meta_sync() -> anyhow::Result<()> {
1896        use taos_query::prelude::sync::*;
1897        // pretty_env_logger::formatted_builder()
1898        //     .filter_level(log::LevelFilter::Info)
1899        //     .init();
1900
1901        let taos = TaosBuilder::from_dsn("ws://localhost:6041")?.build()?;
1902        taos.exec_many([
1903            "drop topic if exists ws_tmq_meta_sync",
1904            "drop database if exists ws_tmq_meta_sync",
1905            "create database ws_tmq_meta_sync wal_retention_period 3600",
1906            "create topic ws_tmq_meta_sync with meta as database ws_tmq_meta_sync",
1907            "use ws_tmq_meta_sync",
1908            // kind 1: create super table using all types
1909            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1910            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1911            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1912            tags(t1 json)",
1913            // kind 2: create child table with json tag
1914            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1915            "create table tb1 using stb1 tags(NULL)",
1916            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1917            NULL, NULL, NULL, NULL, NULL,
1918            NULL, NULL, NULL, NULL)
1919            tb1 values(now, true, -2, -3, -4, -5, \
1920            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1921            254, 65534, 1, 1)",
1922            // kind 3: create super table with all types except json (especially for tags)
1923            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1924            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1925            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1926            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1927            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1928            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1929            // kind 4: create child table with all types except json
1930            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1931            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1932            254, 65534, 1, 1)",
1933            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1934            NULL, NULL, NULL, NULL, NULL,
1935            NULL, NULL, NULL, NULL)",
1936            // kind 5: create common table
1937            "create table `table` (ts timestamp, v int)",
1938            // kind 6: column in super table
1939            "alter table stb1 add column new1 bool",
1940            "alter table stb1 add column new2 tinyint",
1941            "alter table stb1 add column new10 nchar(16)",
1942            "alter table stb1 modify column new10 nchar(32)",
1943            "alter table stb1 drop column new10",
1944            "alter table stb1 drop column new2",
1945            "alter table stb1 drop column new1",
1946            // kind 7: add tag in super table
1947            "alter table `stb2` add tag new1 bool",
1948            "alter table `stb2` rename tag new1 new1_new",
1949            "alter table `stb2` modify tag t10 nchar(32)",
1950            "alter table `stb2` drop tag new1_new",
1951            // kind 8: column in common table
1952            "alter table `table` add column new1 bool",
1953            "alter table `table` add column new2 tinyint",
1954            "alter table `table` add column new10 nchar(16)",
1955            "alter table `table` modify column new10 nchar(32)",
1956            "alter table `table` rename column new10 new10_new",
1957            "alter table `table` drop column new10_new",
1958            "alter table `table` drop column new2",
1959            "alter table `table` drop column new1",
1960            // kind 9: drop normal table
1961            "drop table `table`",
1962            // kind 10: drop child table
1963            "drop table `tb2`, `tb1`",
1964            // kind 11: drop super table
1965            "drop table `stb2`",
1966            "drop table `stb1`",
1967        ])?;
1968
1969        taos.exec_many([
1970            "drop database if exists ws_tmq_meta_sync2",
1971            "create database if not exists ws_tmq_meta_sync2 wal_retention_period 3600",
1972            "use ws_tmq_meta_sync2",
1973        ])?;
1974
1975        let builder = TmqBuilder::new(
1976            "taos://localhost:6041?group.id=10&timeout=1000ms&auto.offset.reset=earliest",
1977        )?;
1978        let mut consumer = builder.build()?;
1979        consumer.subscribe(["ws_tmq_meta_sync"])?;
1980
1981        let topics = consumer.list_topics();
1982        log::debug!("topics: {:?}", topics);
1983
1984        let iter = consumer.iter_with_timeout(Timeout::from_secs(1));
1985
1986        for msg in iter {
1987            let (offset, message) = msg?;
1988            // Offset contains information for topic name, database name and vgroup id,
1989            //  similar to kafka topic/partition/offset.
1990            let _ = offset.topic();
1991            let _ = offset.database();
1992            let _ = offset.vgroup_id();
1993
1994            // Different to kafka message, TDengine consumer would consume two kind of messages.
1995            //
1996            // 1. meta
1997            // 2. data
1998            match message {
1999                MessageSet::Meta(meta) => {
2000                    let _raw = meta.as_raw_meta()?;
2001                    // taos.write_meta(raw)?;
2002
2003                    // meta data can be write to an database seamlessly by raw or json (to sql).
2004                    let json = meta.as_json_meta()?;
2005                    for meta in &json {
2006                        let sql = meta.to_string();
2007                        log::debug!("sql: {}", sql);
2008                        if let Err(err) = taos.exec(sql) {
2009                            match err.code() {
2010                                Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
2011                                Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
2012                                Code::COLUMN_EXISTS => log::trace!("column already exists"),
2013                                Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
2014                                Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
2015                                Code::MODIFIED_ALREADY => log::trace!("modified already done"),
2016                                Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
2017                                Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
2018                                _ => {
2019                                    log::error!("{}", err);
2020                                }
2021                            }
2022                        }
2023                    }
2024                }
2025                MessageSet::Data(data) => {
2026                    // data message may have more than one data block for various tables.
2027                    for block in data {
2028                        let _block = block?;
2029                        // dbg!(block.table_name());
2030                        // dbg!(block);
2031                    }
2032                }
2033                _ => unreachable!(),
2034            }
2035            consumer.commit(offset)?;
2036        }
2037
2038        // get assignments
2039        let assignments = consumer.assignments();
2040        log::debug!("assignments all: {:?}", assignments);
2041
2042        if let Some(assignments) = assignments {
2043            for (topic, assignment_vec) in assignments {
2044                for assignment in assignment_vec {
2045                    log::debug!("assignment: {:?} {:?}", topic, assignment);
2046                    let vgroup_id = assignment.vgroup_id();
2047                    let end = assignment.end();
2048
2049                    let position = consumer.position(&topic, vgroup_id);
2050                    log::debug!("position: {:?}", position);
2051                    let committed = consumer.committed(&topic, vgroup_id);
2052                    log::debug!("committed: {:?}", committed);
2053
2054                    let res = consumer.offset_seek(&topic, vgroup_id, end);
2055                    log::debug!("seek: {:?}", res);
2056
2057                    let position = consumer.position(&topic, vgroup_id);
2058                    log::debug!("after seek position: {:?}", position);
2059                    let committed = consumer.committed(&topic, vgroup_id);
2060                    log::debug!("after seek committed: {:?}", committed);
2061
2062                    let res = consumer.commit_offset(&topic, vgroup_id, end);
2063                    log::debug!("commit offset: {:?}", res);
2064
2065                    let position = consumer.position(&topic, vgroup_id);
2066                    log::debug!("after commit offset position: {:?}", position);
2067                    let committed = consumer.committed(&topic, vgroup_id);
2068                    log::debug!("after commit offset committed: {:?}", committed);
2069                }
2070            }
2071        }
2072
2073        consumer.unsubscribe();
2074
2075        std::thread::sleep(Duration::from_secs(5));
2076
2077        taos.exec_many([
2078            "drop database ws_tmq_meta_sync2",
2079            "drop topic ws_tmq_meta_sync",
2080            "drop database ws_tmq_meta_sync",
2081        ])?;
2082        Ok(())
2083    }
2084
2085    #[test]
2086    fn test_ws_tmq_metadata() -> anyhow::Result<()> {
2087        use taos_query::prelude::sync::*;
2088        // pretty_env_logger::formatted_builder()
2089        //     .filter_level(log::LevelFilter::Debug)
2090        //     .init();
2091
2092        let taos = TaosBuilder::from_dsn("ws://localhost:6041")?.build()?;
2093        taos.exec_many([
2094            "drop topic if exists ws_tmq_meta_sync3",
2095            "drop database if exists ws_tmq_meta_sync3",
2096            "create database ws_tmq_meta_sync3 wal_retention_period 3600",
2097            "create topic ws_tmq_meta_sync3 with meta as database ws_tmq_meta_sync3",
2098            "use ws_tmq_meta_sync3",
2099            // kind 1: create super table using all types
2100            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2101            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2102            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2103            tags(t1 json)",
2104            // kind 2: create child table with json tag
2105            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2106            "create table tb1 using stb1 tags(NULL)",
2107            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2108            NULL, NULL, NULL, NULL, NULL,
2109            NULL, NULL, NULL, NULL)
2110            tb1 values(now, true, -2, -3, -4, -5, \
2111            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2112            254, 65534, 1, 1)",
2113            // kind 3: create super table with all types except json (especially for tags)
2114            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2115            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2116            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2117            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2118            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2119            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2120            // kind 4: create child table with all types except json
2121            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2122            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2123            254, 65534, 1, 1)",
2124            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2125            NULL, NULL, NULL, NULL, NULL,
2126            NULL, NULL, NULL, NULL)",
2127            // kind 5: create common table
2128            "create table `table` (ts timestamp, v int)",
2129            // kind 6: column in super table
2130            "alter table stb1 add column new1 bool",
2131            "alter table stb1 add column new2 tinyint",
2132            "alter table stb1 add column new10 nchar(16)",
2133            "alter table stb1 modify column new10 nchar(32)",
2134            "alter table stb1 drop column new10",
2135            "alter table stb1 drop column new2",
2136            "alter table stb1 drop column new1",
2137            // kind 7: add tag in super table
2138            "alter table `stb2` add tag new1 bool",
2139            "alter table `stb2` rename tag new1 new1_new",
2140            "alter table `stb2` modify tag t10 nchar(32)",
2141            "alter table `stb2` drop tag new1_new",
2142            // kind 8: column in common table
2143            "alter table `table` add column new1 bool",
2144            "alter table `table` add column new2 tinyint",
2145            "alter table `table` add column new10 nchar(16)",
2146            "alter table `table` modify column new10 nchar(32)",
2147            "alter table `table` rename column new10 new10_new",
2148            "alter table `table` drop column new10_new",
2149            "alter table `table` drop column new2",
2150            "alter table `table` drop column new1",
2151            // kind 9: drop normal table
2152            "drop table `table`",
2153            // kind 10: drop child table
2154            "drop table `tb2`, `tb1`",
2155            // kind 11: drop super table
2156            "drop table `stb2`",
2157            "drop table `stb1`",
2158        ])?;
2159
2160        taos.exec_many([
2161            "drop database if exists ws_tmq_meta_sync32",
2162            "create database if not exists ws_tmq_meta_sync32 wal_retention_period 3600",
2163            "use ws_tmq_meta_sync32",
2164        ])?;
2165
2166        let builder = TmqBuilder::new(
2167            "taos://localhost:6041?group.id=10&timeout=1000ms&auto.offset.reset=earliest",
2168        )?;
2169        let mut consumer = builder.build()?;
2170        consumer.subscribe(["ws_tmq_meta_sync3"])?;
2171
2172        let iter = consumer.iter_with_timeout(Timeout::from_secs(1));
2173
2174        for msg in iter {
2175            let (offset, message) = msg?;
2176            // Offset contains information for topic name, database name and vgroup id,
2177            //  similar to kafka topic/partition/offset.
2178            let _ = offset.topic();
2179            let _ = offset.database();
2180            let _ = offset.vgroup_id();
2181
2182            // Different to kafka message, TDengine consumer would consume two kind of messages.
2183            //
2184            // 1. meta
2185            // 2. data
2186            match message {
2187                MessageSet::Meta(meta) => {
2188                    let _raw = meta.as_raw_meta()?;
2189                    // taos.write_meta(raw)?;
2190
2191                    // meta data can be write to an database seamlessly by raw or json (to sql).
2192                    let json = meta.as_json_meta()?;
2193                    for json in &json {
2194                        let sql = json.to_string();
2195                        log::debug!("sql: {}", sql);
2196                        if let Err(err) = taos.exec(sql) {
2197                            match err.code() {
2198                                Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
2199                                Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
2200                                Code::COLUMN_EXISTS => log::trace!("column already exists"),
2201                                Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
2202                                Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
2203                                Code::MODIFIED_ALREADY => log::trace!("modified already done"),
2204                                Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
2205                                Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
2206                                _ => {
2207                                    log::error!("{}", err);
2208                                }
2209                            }
2210                        }
2211                    }
2212                }
2213                MessageSet::Data(data) => {
2214                    // data message may have more than one data block for various tables.
2215                    for block in data {
2216                        let _block = block?;
2217                        // dbg!(block.table_name());
2218                        // dbg!(block);
2219                    }
2220                }
2221                _ => unreachable!(),
2222            }
2223            consumer.commit(offset)?;
2224        }
2225        consumer.unsubscribe();
2226
2227        std::thread::sleep(Duration::from_secs(5));
2228
2229        taos.exec_many([
2230            "drop database ws_tmq_meta_sync32",
2231            "drop topic ws_tmq_meta_sync3",
2232            "drop database ws_tmq_meta_sync3",
2233        ])?;
2234        Ok(())
2235    }
2236
2237    #[cfg(feature = "rustls")]
2238    #[tokio::test]
2239    async fn test_consumer_cloud() -> anyhow::Result<()> {
2240        use taos_query::prelude::*;
2241        std::env::set_var("RUST_LOG", "debug");
2242        // let _ = pretty_env_logger::formatted_builder()
2243        //     .filter_level(log::LevelFilter::Debug)
2244        //     .try_init();
2245        let dsn = std::env::var("TDENGINE_ClOUD_DSN");
2246        if dsn.is_err() {
2247            println!("Skip test when not in cloud");
2248            return Ok(());
2249        }
2250        let dsn = dsn.unwrap();
2251
2252        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
2253        taos.exec_many([
2254            "drop topic if exists ws_tmq_meta",
2255            "drop database if exists ws_tmq_meta",
2256            "create database ws_tmq_meta wal_retention_period 3600",
2257            "create topic ws_tmq_meta with meta as database ws_tmq_meta",
2258            "use ws_tmq_meta",
2259            // kind 1: create super table using all types
2260            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2261            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2262            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2263            tags(t1 json)",
2264            // kind 2: create child table with json tag
2265            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2266            "create table tb1 using stb1 tags(NULL)",
2267            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2268            NULL, NULL, NULL, NULL, NULL,
2269            NULL, NULL, NULL, NULL)
2270            tb1 values(now, true, -2, -3, -4, -5, \
2271            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2272            254, 65534, 1, 1)",
2273            // kind 3: create super table with all types except json (especially for tags)
2274            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2275            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2276            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2277            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2278            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2279            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2280            // kind 4: create child table with all types except json
2281            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2282            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2283            254, 65534, 1, 1)",
2284            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2285            NULL, NULL, NULL, NULL, NULL,
2286            NULL, NULL, NULL, NULL)",
2287            // kind 5: create common table
2288            "create table `table` (ts timestamp, v int)",
2289            // kind 6: column in super table
2290            "alter table stb1 add column new1 bool",
2291            "alter table stb1 add column new2 tinyint",
2292            "alter table stb1 add column new10 nchar(16)",
2293            "alter table stb1 modify column new10 nchar(32)",
2294            "alter table stb1 drop column new10",
2295            "alter table stb1 drop column new2",
2296            "alter table stb1 drop column new1",
2297            // kind 7: add tag in super table
2298            "alter table `stb2` add tag new1 bool",
2299            "alter table `stb2` rename tag new1 new1_new",
2300            "alter table `stb2` modify tag t10 nchar(32)",
2301            "alter table `stb2` drop tag new1_new",
2302            // kind 8: column in common table
2303            "alter table `table` add column new1 bool",
2304            "alter table `table` add column new2 tinyint",
2305            "alter table `table` add column new10 nchar(16)",
2306            "alter table `table` modify column new10 nchar(32)",
2307            "alter table `table` rename column new10 new10_new",
2308            "alter table `table` drop column new10_new",
2309            "alter table `table` drop column new2",
2310            "alter table `table` drop column new1",
2311        ])
2312        .await?;
2313
2314        taos.exec_many([
2315            "drop database if exists ws_tmq_meta2",
2316            "create database if not exists ws_tmq_meta2 wal_retention_period 3600",
2317            "use ws_tmq_meta2",
2318        ])
2319        .await?;
2320
2321        let builder = TmqBuilder::new(&dsn)?;
2322        let mut consumer = builder.build_consumer().await?;
2323        consumer.subscribe(["ws_tmq_meta"]).await?;
2324
2325        {
2326            let mut stream = consumer.stream();
2327
2328            while let Some((offset, message)) = stream.try_next().await? {
2329                // Offset contains information for topic name, database name and vgroup id,
2330                //  similar to kafka topic/partition/offset.
2331                let _ = offset.topic();
2332                let _ = offset.database();
2333                let _ = offset.vgroup_id();
2334
2335                match message {
2336                    MessageSet::Meta(meta) => {
2337                        let _raw = meta.as_raw_meta().await?;
2338
2339                        // meta data can be write to an database seamlessly by raw or json (to sql).
2340                        let json = meta.as_json_meta().await?;
2341                        for meta in &json {
2342                            let sql = meta.to_string();
2343                            log::debug!("sql: {}", sql);
2344                            if let Err(err) = taos.exec(sql).await {
2345                                match err.code() {
2346                                    Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
2347                                    Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
2348                                    Code::COLUMN_EXISTS => log::trace!("column already exists"),
2349                                    Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
2350                                    Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
2351                                    Code::MODIFIED_ALREADY => log::trace!("modified already done"),
2352                                    Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
2353                                    Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
2354                                    _ => {
2355                                        log::error!("{}", err);
2356                                    }
2357                                }
2358                            }
2359                        }
2360                    }
2361                    MessageSet::Data(data) => {
2362                        // data message may have more than one data block for various tables.
2363                        while let Some(_data) = data.fetch_block().await? {
2364                            // dbg!(data);
2365                        }
2366                    }
2367                    _ => unreachable!(),
2368                }
2369                consumer.commit(offset).await?;
2370            }
2371        }
2372        consumer.unsubscribe().await;
2373
2374        tokio::time::sleep(Duration::from_secs(2)).await;
2375
2376        taos.exec_many([
2377            "drop database ws_tmq_meta2",
2378            "drop topic ws_tmq_meta",
2379            "drop database ws_tmq_meta",
2380        ])
2381        .await?;
2382        Ok(())
2383    }
2384}