taos/
tmq.rs

1use taos_query::{
2    prelude::{AsAsyncConsumer, RawMeta, Timeout},
3    tmq::{Assignment, VGroupId},
4    RawBlock, RawResult,
5};
6
7#[derive(Debug)]
8enum TmqBuilderInner {
9    Native(crate::sys::TmqBuilder),
10    Ws(taos_ws::consumer::TmqBuilder),
11}
12
13#[derive(Debug)]
14enum ConsumerInner {
15    Native(crate::sys::Consumer),
16    Ws(taos_ws::consumer::Consumer),
17}
18
19#[derive(Debug)]
20enum OffsetInner {
21    Native(crate::sys::tmq::Offset),
22    Ws(taos_ws::consumer::Offset),
23}
24
25#[derive(Debug)]
26enum MetaInner {
27    Native(crate::sys::tmq::Meta),
28    Ws(taos_ws::consumer::Meta),
29}
30
31#[derive(Debug)]
32enum DataInner {
33    Native(crate::sys::tmq::Data),
34    Ws(taos_ws::consumer::Data),
35}
36
37#[derive(Debug)]
38pub struct Offset(OffsetInner);
39
40#[derive(Debug)]
41pub struct Meta(MetaInner);
42
43#[derive(Debug)]
44pub struct Data(DataInner);
45
46pub type MessageSet<Meta, Data> = taos_query::tmq::MessageSet<Meta, Data>;
47
48#[derive(Debug)]
49pub struct TmqBuilder(TmqBuilderInner);
50
51#[derive(Debug)]
52pub struct Consumer(ConsumerInner);
53
54impl taos_query::TBuilder for TmqBuilder {
55    type Target = Consumer;
56
57    fn available_params() -> &'static [&'static str] {
58        &[]
59    }
60
61    fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
62        let dsn = dsn.into_dsn()?;
63        // dbg!(&dsn);
64        match (dsn.driver.as_str(), dsn.protocol.as_deref()) {
65            ("ws" | "wss" | "http" | "https" | "taosws", _) => Ok(Self(TmqBuilderInner::Ws(
66                taos_ws::consumer::TmqBuilder::from_dsn(dsn)?,
67            ))),
68            ("taos" | "tmq", None) => Ok(Self(TmqBuilderInner::Native(
69                crate::sys::TmqBuilder::from_dsn(dsn)?,
70            ))),
71            ("taos" | "tmq", Some("ws" | "wss" | "http" | "https")) => Ok(Self(
72                TmqBuilderInner::Ws(taos_ws::consumer::TmqBuilder::from_dsn(dsn)?),
73            )),
74            (driver, _) => Err(taos_query::DsnError::InvalidDriver(driver.to_string()).into()),
75        }
76    }
77
78    fn client_version() -> &'static str {
79        ""
80    }
81
82    fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
83        match &self.0 {
84            TmqBuilderInner::Native(b) => match &mut conn.0 {
85                ConsumerInner::Native(taos) => Ok(b.ping(taos)?),
86                _ => unreachable!(),
87            },
88            TmqBuilderInner::Ws(b) => match &mut conn.0 {
89                ConsumerInner::Ws(taos) => Ok(b.ping(taos)?),
90                _ => unreachable!(),
91            },
92        }
93    }
94
95    fn ready(&self) -> bool {
96        match &self.0 {
97            TmqBuilderInner::Native(b) => b.ready(),
98            TmqBuilderInner::Ws(b) => b.ready(),
99        }
100    }
101
102    fn build(&self) -> RawResult<Self::Target> {
103        match &self.0 {
104            TmqBuilderInner::Native(b) => Ok(Consumer(ConsumerInner::Native(b.build()?))),
105            TmqBuilderInner::Ws(b) => Ok(Consumer(ConsumerInner::Ws(b.build()?))),
106        }
107    }
108
109    fn server_version(&self) -> RawResult<&str> {
110        todo!()
111    }
112
113    fn is_enterprise_edition(&self) -> RawResult<bool> {
114        todo!()
115    }
116
117    fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
118        match &self.0 {
119            TmqBuilderInner::Native(b) => Ok(b.get_edition()?),
120            TmqBuilderInner::Ws(b) => Ok(b.get_edition()?),
121        }
122    }
123}
124
125#[async_trait::async_trait]
126impl taos_query::AsyncTBuilder for TmqBuilder {
127    type Target = Consumer;
128
129    fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
130        let dsn = dsn.into_dsn()?;
131        // dbg!(&dsn);
132        match (dsn.driver.as_str(), dsn.protocol.as_deref()) {
133            ("ws" | "wss" | "http" | "https" | "taosws", _) => Ok(Self(TmqBuilderInner::Ws(
134                taos_ws::consumer::TmqBuilder::from_dsn(dsn)?,
135            ))),
136            ("taos" | "tmq", None) => Ok(Self(TmqBuilderInner::Native(
137                crate::sys::TmqBuilder::from_dsn(dsn)?,
138            ))),
139            ("taos" | "tmq", Some("ws" | "wss" | "http" | "https")) => Ok(Self(
140                TmqBuilderInner::Ws(taos_ws::consumer::TmqBuilder::from_dsn(dsn)?),
141            )),
142            (driver, _) => Err(taos_query::DsnError::InvalidDriver(driver.to_string()).into()),
143        }
144    }
145
146    fn client_version() -> &'static str {
147        ""
148    }
149
150    async fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
151        match &self.0 {
152            TmqBuilderInner::Native(b) => match &mut conn.0 {
153                ConsumerInner::Native(taos) => Ok(b.ping(taos).await?),
154                _ => unreachable!(),
155            },
156            TmqBuilderInner::Ws(b) => match &mut conn.0 {
157                ConsumerInner::Ws(taos) => Ok(b.ping(taos).await?),
158                _ => unreachable!(),
159            },
160        }
161    }
162
163    async fn ready(&self) -> bool {
164        match &self.0 {
165            TmqBuilderInner::Native(b) => b.ready().await,
166            TmqBuilderInner::Ws(b) => b.ready().await,
167        }
168    }
169
170    async fn build(&self) -> RawResult<Self::Target> {
171        match &self.0 {
172            TmqBuilderInner::Native(b) => Ok(Consumer(ConsumerInner::Native(b.build().await?))),
173            TmqBuilderInner::Ws(b) => Ok(Consumer(ConsumerInner::Ws(b.build().await?))),
174        }
175    }
176
177    async fn server_version(&self) -> RawResult<&str> {
178        match &self.0 {
179            TmqBuilderInner::Native(b) => Ok(b.server_version().await?),
180            TmqBuilderInner::Ws(b) => Ok(b.server_version().await?),
181        }
182    }
183
184    async fn is_enterprise_edition(&self) -> RawResult<bool> {
185        match &self.0 {
186            TmqBuilderInner::Native(b) => Ok(b.is_enterprise_edition().await?),
187            TmqBuilderInner::Ws(b) => Ok(b.is_enterprise_edition().await?),
188        }
189    }
190
191    async fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
192        match &self.0 {
193            TmqBuilderInner::Native(b) => Ok(b.get_edition().await?),
194            TmqBuilderInner::Ws(b) => Ok(b.get_edition().await?),
195        }
196    }
197}
198
199impl taos_query::tmq::IsOffset for Offset {
200    fn database(&self) -> &str {
201        match &self.0 {
202            OffsetInner::Native(offset) => {
203                <crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::database(offset)
204            }
205            OffsetInner::Ws(offset) => {
206                <taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::database(offset)
207            }
208        }
209    }
210
211    fn topic(&self) -> &str {
212        match &self.0 {
213            OffsetInner::Native(offset) => {
214                <crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::topic(offset)
215            }
216            OffsetInner::Ws(offset) => {
217                <taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::topic(offset)
218            }
219        }
220    }
221
222    fn vgroup_id(&self) -> taos_query::tmq::VGroupId {
223        match &self.0 {
224            OffsetInner::Native(offset) => {
225                <crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::vgroup_id(offset)
226            }
227            OffsetInner::Ws(offset) => {
228                <taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::vgroup_id(offset)
229            }
230        }
231    }
232}
233
234#[async_trait::async_trait]
235impl taos_query::tmq::IsAsyncMeta for Meta {
236    async fn as_raw_meta(&self) -> RawResult<RawMeta> {
237        match &self.0 {
238            MetaInner::Native(data) => {
239                <crate::sys::tmq::Meta as taos_query::tmq::IsAsyncMeta>::as_raw_meta(data)
240                    .await
241                    .map_err(Into::into)
242            }
243            MetaInner::Ws(data) => {
244                <taos_ws::consumer::Meta as taos_query::tmq::IsAsyncMeta>::as_raw_meta(data)
245                    .await
246                    .map_err(Into::into)
247            }
248        }
249    }
250
251    async fn as_json_meta(&self) -> RawResult<taos_query::common::JsonMeta> {
252        match &self.0 {
253            MetaInner::Native(data) => {
254                <crate::sys::tmq::Meta as taos_query::tmq::IsAsyncMeta>::as_json_meta(data)
255                    .await
256                    .map_err(Into::into)
257            }
258            MetaInner::Ws(data) => {
259                <taos_ws::consumer::Meta as taos_query::tmq::IsAsyncMeta>::as_json_meta(data)
260                    .await
261                    .map_err(Into::into)
262            }
263        }
264    }
265}
266
267#[async_trait::async_trait]
268impl taos_query::tmq::IsAsyncData for Data {
269    async fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
270        match &self.0 {
271            DataInner::Native(data) => {
272                <crate::sys::tmq::Data as taos_query::tmq::IsAsyncData>::as_raw_data(data)
273                    .await
274                    .map_err(Into::into)
275            }
276            DataInner::Ws(data) => {
277                <taos_ws::consumer::Data as taos_query::tmq::IsAsyncData>::as_raw_data(data)
278                    .await
279                    .map_err(Into::into)
280            }
281        }
282    }
283
284    async fn fetch_raw_block(&self) -> RawResult<Option<taos_query::RawBlock>> {
285        match &self.0 {
286            DataInner::Native(data) => {
287                <crate::sys::tmq::Data as taos_query::tmq::IsAsyncData>::fetch_raw_block(data)
288                    .await
289                    .map_err(Into::into)
290            }
291            DataInner::Ws(data) => {
292                <taos_ws::consumer::Data as taos_query::tmq::IsAsyncData>::fetch_raw_block(data)
293                    .await
294                    .map_err(Into::into)
295            }
296        }
297    }
298}
299
300#[async_trait::async_trait]
301impl AsAsyncConsumer for Consumer {
302    type Offset = Offset;
303
304    type Meta = Meta;
305
306    type Data = Data;
307
308    fn default_timeout(&self) -> Timeout {
309        match &self.0 {
310            ConsumerInner::Native(c) => {
311                <crate::sys::Consumer as AsAsyncConsumer>::default_timeout(c)
312            }
313            ConsumerInner::Ws(c) => {
314                <taos_ws::consumer::Consumer as AsAsyncConsumer>::default_timeout(c)
315            }
316        }
317    }
318
319    async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
320        &mut self,
321        topics: I,
322    ) -> RawResult<()> {
323        match &mut self.0 {
324            ConsumerInner::Native(c) => {
325                <crate::sys::Consumer as AsAsyncConsumer>::subscribe(c, topics)
326                    .await
327                    .map_err(Into::into)
328            }
329            ConsumerInner::Ws(c) => {
330                <taos_ws::consumer::Consumer as AsAsyncConsumer>::subscribe(c, topics)
331                    .await
332                    .map_err(Into::into)
333            }
334        }
335    }
336
337    async fn unsubscribe(self) {
338        match self.0 {
339            ConsumerInner::Native(c) => {
340                <crate::sys::Consumer as AsAsyncConsumer>::unsubscribe(c).await
341            }
342            ConsumerInner::Ws(c) => {
343                <taos_ws::consumer::Consumer as AsAsyncConsumer>::unsubscribe(c).await
344            }
345        }
346    }
347
348    async fn recv_timeout(
349        &self,
350        timeout: Timeout,
351    ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
352        match &self.0 {
353            ConsumerInner::Native(c) => {
354                <crate::sys::Consumer as AsAsyncConsumer>::recv_timeout(c, timeout)
355                    .await
356                    .map_err(Into::into)
357                    .map(|msg| {
358                        msg.map(|(offset, msg)| {
359                            (
360                                Offset(OffsetInner::Native(offset)),
361                                match msg {
362                                    MessageSet::Meta(meta) => {
363                                        MessageSet::Meta(Meta(MetaInner::Native(meta)))
364                                    }
365                                    MessageSet::Data(data) => {
366                                        MessageSet::Data(Data(DataInner::Native(data)))
367                                    }
368                                    MessageSet::MetaData(meta, data) => MessageSet::MetaData(
369                                        Meta(MetaInner::Native(meta)),
370                                        Data(DataInner::Native(data)),
371                                    ),
372                                },
373                            )
374                        })
375                    })
376            }
377            ConsumerInner::Ws(c) => {
378                <taos_ws::consumer::Consumer as AsAsyncConsumer>::recv_timeout(c, timeout)
379                    .await
380                    .map_err(Into::into)
381                    .map(|msg| {
382                        msg.map(|(offset, msg)| {
383                            (
384                                Offset(OffsetInner::Ws(offset)),
385                                match msg {
386                                    taos_query::tmq::MessageSet::Meta(meta) => {
387                                        MessageSet::Meta(Meta(MetaInner::Ws(meta)))
388                                    }
389                                    taos_query::tmq::MessageSet::Data(data) => {
390                                        MessageSet::Data(Data(DataInner::Ws(data)))
391                                    }
392                                    taos_query::tmq::MessageSet::MetaData(meta, data) => {
393                                        MessageSet::MetaData(
394                                            Meta(MetaInner::Ws(meta)),
395                                            Data(DataInner::Ws(data)),
396                                        )
397                                    }
398                                },
399                            )
400                        })
401                    })
402            }
403        }
404    }
405
406    async fn commit(&self, offset: Self::Offset) -> RawResult<()> {
407        match &self.0 {
408            ConsumerInner::Native(c) => match offset.0 {
409                OffsetInner::Native(offset) => {
410                    <crate::sys::Consumer as AsAsyncConsumer>::commit(c, offset)
411                        .await
412                        .map_err(Into::into)
413                }
414                OffsetInner::Ws(_) => unreachable!(),
415            },
416            ConsumerInner::Ws(c) => match offset.0 {
417                OffsetInner::Ws(offset) => {
418                    <taos_ws::consumer::Consumer as AsAsyncConsumer>::commit(c, offset)
419                        .await
420                        .map_err(Into::into)
421                }
422                _ => unreachable!(),
423            },
424        }
425    }
426
427    async fn commit_offset(&self, topic: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
428        match &self.0 {
429            ConsumerInner::Native(c) => <crate::sys::Consumer as AsAsyncConsumer>::commit_offset(
430                c, topic, vgroup_id, offset,
431            )
432            .await
433            .map_err(Into::into),
434            ConsumerInner::Ws(c) => {
435                <taos_ws::consumer::Consumer as AsAsyncConsumer>::commit_offset(
436                    c, topic, vgroup_id, offset,
437                )
438                .await
439                .map_err(Into::into)
440            }
441        }
442    }
443
444    async fn list_topics(&self) -> RawResult<Vec<String>> {
445        match &self.0 {
446            ConsumerInner::Native(c) => <crate::sys::Consumer as AsAsyncConsumer>::list_topics(c)
447                .await
448                .map_err(Into::into),
449            ConsumerInner::Ws(c) => {
450                <taos_ws::consumer::Consumer as AsAsyncConsumer>::list_topics(c)
451                    .await
452                    .map_err(Into::into)
453            }
454        }
455    }
456
457    async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
458        match &self.0 {
459            ConsumerInner::Native(c) => {
460                <crate::sys::Consumer as AsAsyncConsumer>::assignments(c).await
461            }
462            ConsumerInner::Ws(c) => {
463                <taos_ws::consumer::Consumer as AsAsyncConsumer>::assignments(c).await
464            }
465        }
466    }
467
468    async fn topic_assignment(&self, topic: &str) -> Vec<Assignment> {
469        match &self.0 {
470            ConsumerInner::Native(c) => {
471                <crate::sys::Consumer as AsAsyncConsumer>::topic_assignment(c, topic).await
472            }
473            ConsumerInner::Ws(c) => {
474                <taos_ws::consumer::Consumer as AsAsyncConsumer>::topic_assignment(c, topic).await
475            }
476        }
477    }
478
479    async fn offset_seek(
480        &mut self,
481        topic: &str,
482        vgroup_id: VGroupId,
483        offset: i64,
484    ) -> RawResult<()> {
485        match &mut self.0 {
486            ConsumerInner::Native(c) => {
487                <crate::sys::Consumer as AsAsyncConsumer>::offset_seek(c, topic, vgroup_id, offset)
488                    .await
489                    .map_err(Into::into)
490            }
491            ConsumerInner::Ws(c) => <taos_ws::consumer::Consumer as AsAsyncConsumer>::offset_seek(
492                c, topic, vgroup_id, offset,
493            )
494            .await
495            .map_err(Into::into),
496        }
497    }
498
499    async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
500        match &self.0 {
501            ConsumerInner::Native(c) => {
502                <crate::sys::Consumer as AsAsyncConsumer>::committed(c, topic, vgroup_id)
503                    .await
504                    .map_err(Into::into)
505            }
506            ConsumerInner::Ws(c) => {
507                <taos_ws::consumer::Consumer as AsAsyncConsumer>::committed(c, topic, vgroup_id)
508                    .await
509                    .map_err(Into::into)
510            }
511        }
512    }
513
514    async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
515        match &self.0 {
516            ConsumerInner::Native(c) => {
517                <crate::sys::Consumer as AsAsyncConsumer>::position(c, topic, vgroup_id)
518                    .await
519                    .map_err(Into::into)
520            }
521            ConsumerInner::Ws(c) => {
522                <taos_ws::consumer::Consumer as AsAsyncConsumer>::position(c, topic, vgroup_id)
523                    .await
524                    .map_err(Into::into)
525            }
526        }
527    }
528}
529
530impl taos_query::tmq::SyncOnAsync for Consumer {}
531impl taos_query::tmq::SyncOnAsync for Data {}
532impl taos_query::tmq::SyncOnAsync for Meta {}
533
534impl Iterator for Data {
535    type Item = RawResult<RawBlock>;
536
537    fn next(&mut self) -> Option<Self::Item> {
538        match &self.0 {
539            DataInner::Native(data) => {
540                <crate::sys::tmq::Data as taos_query::tmq::IsData>::fetch_raw_block(data)
541                    .transpose()
542            }
543            DataInner::Ws(data) => {
544                <taos_ws::consumer::Data as taos_query::tmq::IsData>::fetch_raw_block(data)
545                    .transpose()
546            }
547        }
548    }
549}
550// impl taos_query::tmq::AsConsumer for Consumer {}
551#[cfg(test)]
552mod tests {
553
554    use super::TmqBuilder;
555
556    #[test]
557    fn builder() -> taos_query::RawResult<()> {
558        use taos_query::prelude::*;
559        let mut dsn: Dsn = "taos://".parse()?;
560        dsn.set("group.id", "group1");
561        dsn.set("client.id", "test");
562        dsn.set("auto.offset.reset", "earliest");
563
564        let _tmq = TmqBuilder::from_dsn(dsn)?;
565        Ok(())
566    }
567}
568
569#[cfg(test)]
570mod async_tests {
571    use std::{str::FromStr, time::Duration};
572
573    use super::TmqBuilder;
574    use crate::TaosBuilder;
575
576    #[tokio::test]
577    async fn test_ws_tmq_meta() -> taos_query::RawResult<()> {
578        // pretty_env_logger::formatted_timed_builder()
579        //     .filter_level(log::LevelFilter::Debug)
580        //     .init();
581        use taos_query::prelude::*;
582        let dsn = std::env::var("TEST_DSN").unwrap_or("taos+ws://localhost:6041".to_string());
583        let mut dsn = Dsn::from_str(&dsn)?;
584
585        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
586
587        let db = "ws_abc1";
588
589        taos.exec(format!("drop topic if exists {db}")).await?;
590        taos.exec(format!("drop database if exists {db}")).await?;
591
592        std::thread::sleep(std::time::Duration::from_secs(3));
593
594        taos.exec(format!(
595            "create database if not exists {db} wal_retention_period 3600"
596        ))
597        .await?;
598
599        std::thread::sleep(std::time::Duration::from_secs(3));
600
601        taos.exec_many([
602            "create topic ws_abc1 with meta as database ws_abc1",
603            "use ws_abc1",
604            // kind 1: create super table using all types
605            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
606            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
607            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
608            tags(t1 json)",
609            // kind 2: create child table with json tag
610            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
611            "create table tb1 using stb1 tags(NULL)",
612            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
613            NULL, NULL, NULL, NULL, NULL,
614            NULL, NULL, NULL, NULL)
615            tb1 values(now, true, -2, -3, -4, -5, \
616            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
617            254, 65534, 1, 1)",
618            // kind 3: create super table with all types except json (especially for tags)
619            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
620            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
621            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
622            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
623            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
624            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
625            // kind 4: create child table with all types except json
626            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
627            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
628            254, 65534, 1, 1)",
629            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
630            NULL, NULL, NULL, NULL, NULL,
631            NULL, NULL, NULL, NULL)",
632            // kind 5: create common table
633            "create table `table` (ts timestamp, v int)",
634            // kind 6: column in super table
635            "alter table stb1 add column new1 bool",
636            "alter table stb1 add column new2 tinyint",
637            "alter table stb1 add column new10 nchar(16)",
638            "alter table stb1 modify column new10 nchar(32)",
639            "alter table stb1 drop column new10",
640            "alter table stb1 drop column new2",
641            "alter table stb1 drop column new1",
642            // kind 7: add tag in super table
643            "alter table `stb2` add tag new1 bool",
644            "alter table `stb2` rename tag new1 new1_new",
645            "alter table `stb2` modify tag t10 nchar(32)",
646            "alter table `stb2` drop tag new1_new",
647            // kind 8: column in common table
648            "alter table `table` add column new1 bool",
649            "alter table `table` add column new2 tinyint",
650            "alter table `table` add column new10 nchar(16)",
651            "alter table `table` modify column new10 nchar(32)",
652            "alter table `table` rename column new10 new10_new",
653            "alter table `table` drop column new10_new",
654            "alter table `table` drop column new2",
655            "alter table `table` drop column new1",
656            // kind 9: drop normal table
657            "drop table `table`",
658            // kind 10: drop child table
659            "drop table `tb2`, `tb1`",
660            // kind 11: drop super table
661            "drop table `stb2`",
662            "drop table `stb1`",
663        ])
664        .await?;
665
666        taos.exec_many([
667            "drop database if exists db2",
668            "create database if not exists db2 wal_retention_period 3600",
669            "use db2",
670        ])
671        .await?;
672
673        dsn.params.insert("group.id".to_string(), "abc".to_string());
674
675        dsn.params
676            .insert("auto.offset.reset".to_string(), "earliest".to_string());
677
678        let builder = TmqBuilder::from_dsn(&dsn)?;
679        let mut consumer = builder.build().await?;
680        consumer.subscribe(["ws_abc1"]).await?;
681
682        {
683            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
684
685            while let Some((offset, message)) = stream.try_next().await? {
686                // Offset contains information for topic name, database name and vgroup id,
687                //  similar to kafka topic/partition/offset.
688                let _ = offset.topic();
689                let _ = offset.database();
690                let _ = offset.vgroup_id();
691
692                // Different to kafka message, TDengine consumer would consume two kind of messages.
693                //
694                // 1. meta
695                // 2. data
696                match message {
697                    MessageSet::Meta(meta) => {
698                        let raw = meta.as_raw_meta().await?;
699                        taos.write_raw_meta(&raw).await?;
700
701                        // meta data can be write to an database seamlessly by raw or json (to sql).
702                        let json = meta.as_json_meta().await?;
703                        let sql = json.iter().next().unwrap().to_string();
704                        if let Err(err) = taos.exec(sql).await {
705                            println!("maybe error: {}", err);
706                        }
707                    }
708                    MessageSet::Data(data) => {
709                        // data message may have more than one data block for various tables.
710                        while let Some(data) = data.fetch_raw_block().await? {
711                            dbg!(data.table_name());
712                            dbg!(data);
713                        }
714                    }
715                    MessageSet::MetaData(meta, data) => {
716                        let raw = meta.as_raw_meta().await?;
717                        taos.write_raw_meta(&raw).await?;
718
719                        // meta data can be write to an database seamlessly by raw or json (to sql).
720                        let json = meta.as_json_meta().await?;
721                        let sql = json.iter().next().unwrap().to_string();
722                        if let Err(err) = taos.exec(sql).await {
723                            println!("maybe error: {}", err);
724                        }
725                        // data message may have more than one data block for various tables.
726                        while let Some(data) = data.fetch_raw_block().await? {
727                            dbg!(data.table_name());
728                            dbg!(data);
729                        }
730                    }
731                }
732                consumer.commit(offset).await?;
733            }
734        }
735        consumer.unsubscribe().await;
736
737        tokio::time::sleep(Duration::from_secs(2)).await;
738
739        taos.exec_many([
740            "drop database db2",
741            "drop topic ws_abc1",
742            "drop database ws_abc1",
743        ])
744        .await?;
745        Ok(())
746    }
747
748    #[tokio::test]
749    #[ignore]
750    async fn test_tmq() -> taos_query::RawResult<()> {
751        // pretty_env_logger::formatted_timed_builder()
752        //     .filter_level(log::LevelFilter::Info)
753        //     .init();
754
755        use taos_query::prelude::*;
756        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
757        let dsn = "taos://localhost:6030".to_string();
758        log::info!("dsn: {}", dsn);
759        let mut dsn = Dsn::from_str(&dsn)?;
760
761        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
762        taos.exec_many([
763            "drop topic if exists ws_abc1",
764            "drop database if exists ws_abc1",
765            "create database ws_abc1 wal_retention_period 3600",
766            "create topic ws_abc1 with meta as database ws_abc1",
767            "use ws_abc1",
768            // kind 1: create super table using all types
769            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
770            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
771            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
772            tags(t1 json)",
773            // kind 2: create child table with json tag
774            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
775            "create table tb1 using stb1 tags(NULL)",
776            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
777            NULL, NULL, NULL, NULL, NULL,
778            NULL, NULL, NULL, NULL)
779            tb1 values(now, true, -2, -3, -4, -5, \
780            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
781            254, 65534, 1, 1)",
782            // kind 3: create super table with all types except json (especially for tags)
783            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
784            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
785            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
786            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
787            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
788            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
789            // kind 4: create child table with all types except json
790            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
791            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
792            254, 65534, 1, 1)",
793            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
794            NULL, NULL, NULL, NULL, NULL,
795            NULL, NULL, NULL, NULL)",
796            // kind 5: create common table
797            "create table `table` (ts timestamp, v int)",
798            // kind 6: column in super table
799            "alter table stb1 add column new1 bool",
800            "alter table stb1 add column new2 tinyint",
801            "alter table stb1 add column new10 nchar(16)",
802            "alter table stb1 modify column new10 nchar(32)",
803            "alter table stb1 drop column new10",
804            "alter table stb1 drop column new2",
805            "alter table stb1 drop column new1",
806            // kind 7: add tag in super table
807            "alter table `stb2` add tag new1 bool",
808            "alter table `stb2` rename tag new1 new1_new",
809            "alter table `stb2` modify tag t10 nchar(32)",
810            "alter table `stb2` drop tag new1_new",
811            // kind 8: column in common table
812            "alter table `table` add column new1 bool",
813            "alter table `table` add column new2 tinyint",
814            "alter table `table` add column new10 nchar(16)",
815            "alter table `table` modify column new10 nchar(32)",
816            "alter table `table` rename column new10 new10_new",
817            "alter table `table` drop column new10_new",
818            "alter table `table` drop column new2",
819            "alter table `table` drop column new1",
820            // kind 9: drop normal table
821            // "drop table `table`",
822            // kind 10: drop child table
823            // "drop table `tb2`, `tb1`",
824            // kind 11: drop super table
825            // "drop table `stb2`",
826            // "drop table `stb1`",
827        ])
828        .await?;
829
830        taos.exec_many([
831            "drop database if exists db2",
832            "create database if not exists db2 wal_retention_period 3600",
833            "use db2",
834        ])
835        .await?;
836
837        dsn.params.insert("group.id".to_string(), "abc".to_string());
838
839        dsn.params
840            .insert("auto.offset.reset".to_string(), "earliest".to_string());
841
842        let builder = TmqBuilder::from_dsn(&dsn)?;
843        let mut consumer = builder.build().await?;
844        consumer.subscribe(["ws_abc1"]).await?;
845
846        {
847            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
848
849            while let Some((offset, message)) = stream.try_next().await? {
850                // Offset contains information for topic name, database name and vgroup id,
851                //  similar to kafka topic/partition/offset.
852                let topic: &str = offset.topic();
853                let database = offset.database();
854                let vgroup_id = offset.vgroup_id();
855                log::debug!(
856                    "topic: {}, database: {}, vgroup_id: {}",
857                    topic,
858                    database,
859                    vgroup_id
860                );
861
862                // Different to kafka message, TDengine consumer would consume two kind of messages.
863                //
864                // 1. meta
865                // 2. data
866                match message {
867                    MessageSet::Meta(meta) => {
868                        log::debug!("Meta");
869                        let raw = meta.as_raw_meta().await?;
870                        taos.write_raw_meta(&raw).await?;
871
872                        // meta data can be write to an database seamlessly by raw or json (to sql).
873                        let json = meta.as_json_meta().await?;
874                        let sql = json.iter().next().unwrap().to_string();
875                        if let Err(err) = taos.exec(sql).await {
876                            println!("maybe error: {}", err);
877                        }
878                    }
879                    MessageSet::Data(data) => {
880                        log::debug!("Data");
881                        // data message may have more than one data block for various tables.
882                        while let Some(data) = data.fetch_raw_block().await? {
883                            log::debug!("table_name: {:?}", data.table_name());
884                            log::debug!("data: {:?}", data);
885                        }
886                    }
887                    MessageSet::MetaData(meta, data) => {
888                        log::debug!("MetaData");
889                        let raw = meta.as_raw_meta().await?;
890                        taos.write_raw_meta(&raw).await?;
891
892                        // meta data can be write to an database seamlessly by raw or json (to sql).
893                        let json = meta.as_json_meta().await?;
894                        let sql = json.iter().next().unwrap().to_string();
895                        if let Err(err) = taos.exec(sql).await {
896                            println!("maybe error: {}", err);
897                        }
898                        // data message may have more than one data block for various tables.
899                        while let Some(data) = data.fetch_raw_block().await? {
900                            log::debug!("table_name: {:?}", data.table_name());
901                            log::debug!("data: {:?}", data);
902                        }
903                    }
904                }
905                consumer.commit(offset).await?;
906            }
907        }
908
909        let assignments = consumer.assignments().await.unwrap();
910        log::debug!("assignments: {:?}", assignments);
911
912        // seek offset
913        for topic_vec_assignment in assignments {
914            let topic = &topic_vec_assignment.0;
915            let vec_assignment = topic_vec_assignment.1;
916            for assignment in vec_assignment {
917                let vgroup_id = assignment.vgroup_id();
918                let current = assignment.current_offset();
919                let begin = assignment.begin();
920                let end = assignment.end();
921                log::debug!(
922                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
923                    topic,
924                    vgroup_id,
925                    current,
926                    begin,
927                    end
928                );
929                let res = consumer.offset_seek(topic, vgroup_id, end).await;
930                if res.is_err() {
931                    log::error!("seek offset error: {:?}", res);
932                    let a = consumer.assignments().await.unwrap();
933                    log::error!("assignments: {:?}", a);
934                    // panic!()
935                }
936            }
937
938            let topic_assignment = consumer.topic_assignment(topic).await;
939            log::debug!("topic assignment: {:?}", topic_assignment);
940        }
941
942        // after seek offset
943        let assignments = consumer.assignments().await.unwrap();
944        log::debug!("after seek offset assignments: {:?}", assignments);
945
946        consumer.unsubscribe().await;
947
948        tokio::time::sleep(Duration::from_secs(1)).await;
949
950        taos.exec_many([
951            "drop database db2",
952            "drop topic ws_abc1",
953            "drop database ws_abc1",
954        ])
955        .await?;
956        Ok(())
957    }
958
959    #[tokio::test]
960    #[ignore]
961    async fn test_tmq_offset() -> taos_query::RawResult<()> {
962        // pretty_env_logger::formatted_timed_builder()
963        //     .filter_level(log::LevelFilter::Info)
964        //     .init();
965
966        use taos_query::prelude::*;
967        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
968        let dsn = "tmq://localhost:6030?offset=10:20,11:40".to_string();
969        log::info!("dsn: {}", dsn);
970        let mut dsn = Dsn::from_str(&dsn)?;
971        // dbg!(&dsn);
972
973        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
974        taos.exec_many([
975            "drop topic if exists ws_abc1",
976            "drop database if exists ws_abc1",
977            "create database ws_abc1 wal_retention_period 3600",
978            "create topic ws_abc1 with meta as database ws_abc1",
979            "use ws_abc1",
980            // kind 1: create super table using all types
981            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
982            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
983            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
984            tags(t1 json)",
985            // kind 2: create child table with json tag
986            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
987            "create table tb1 using stb1 tags(NULL)",
988            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
989            NULL, NULL, NULL, NULL, NULL,
990            NULL, NULL, NULL, NULL)
991            tb1 values(now, true, -2, -3, -4, -5, \
992            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
993            254, 65534, 1, 1)",
994            // kind 3: create super table with all types except json (especially for tags)
995            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
996            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
997            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
998            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
999            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1000            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1001            // kind 4: create child table with all types except json
1002            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1003            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1004            254, 65534, 1, 1)",
1005            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1006            NULL, NULL, NULL, NULL, NULL,
1007            NULL, NULL, NULL, NULL)",
1008            // kind 5: create common table
1009            "create table `table` (ts timestamp, v int)",
1010            // kind 6: column in super table
1011            "alter table stb1 add column new1 bool",
1012            "alter table stb1 add column new2 tinyint",
1013            "alter table stb1 add column new10 nchar(16)",
1014            "alter table stb1 modify column new10 nchar(32)",
1015            "alter table stb1 drop column new10",
1016            "alter table stb1 drop column new2",
1017            "alter table stb1 drop column new1",
1018            // kind 7: add tag in super table
1019            "alter table `stb2` add tag new1 bool",
1020            "alter table `stb2` rename tag new1 new1_new",
1021            "alter table `stb2` modify tag t10 nchar(32)",
1022            "alter table `stb2` drop tag new1_new",
1023            // kind 8: column in common table
1024            "alter table `table` add column new1 bool",
1025            "alter table `table` add column new2 tinyint",
1026            "alter table `table` add column new10 nchar(16)",
1027            "alter table `table` modify column new10 nchar(32)",
1028            "alter table `table` rename column new10 new10_new",
1029            "alter table `table` drop column new10_new",
1030            "alter table `table` drop column new2",
1031            "alter table `table` drop column new1",
1032            // kind 9: drop normal table
1033            // "drop table `table`",
1034            // kind 10: drop child table
1035            // "drop table `tb2`, `tb1`",
1036            // kind 11: drop super table
1037            // "drop table `stb2`",
1038            // "drop table `stb1`",
1039        ])
1040        .await?;
1041
1042        taos.exec_many([
1043            "drop database if exists db2",
1044            "create database if not exists db2 wal_retention_period 3600",
1045            "use db2",
1046        ])
1047        .await?;
1048
1049        dsn.params.insert("group.id".to_string(), "abc".to_string());
1050
1051        dsn.params
1052            .insert("auto.offset.reset".to_string(), "earliest".to_string());
1053
1054        let builder = TmqBuilder::from_dsn(&dsn)?;
1055        // dbg!(&builder);
1056        let mut consumer = builder.build().await?;
1057
1058        consumer.subscribe(["ws_abc1"]).await?;
1059
1060        {
1061            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1062
1063            while let Some((offset, message)) = stream.try_next().await? {
1064                // Offset contains information for topic name, database name and vgroup id,
1065                //  similar to kafka topic/partition/offset.
1066                let topic: &str = offset.topic();
1067                let database = offset.database();
1068                let vgroup_id = offset.vgroup_id();
1069                log::debug!(
1070                    "topic: {}, database: {}, vgroup_id: {}",
1071                    topic,
1072                    database,
1073                    vgroup_id
1074                );
1075
1076                // Different to kafka message, TDengine consumer would consume two kind of messages.
1077                //
1078                // 1. meta
1079                // 2. data
1080                match message {
1081                    MessageSet::Meta(meta) => {
1082                        log::debug!("Meta");
1083                        let raw = meta.as_raw_meta().await?;
1084                        taos.write_raw_meta(&raw).await?;
1085
1086                        // meta data can be write to an database seamlessly by raw or json (to sql).
1087                        let json = meta.as_json_meta().await?;
1088                        let sql = json.iter().next().unwrap().to_string();
1089                        if let Err(err) = taos.exec(sql).await {
1090                            println!("maybe error: {}", err);
1091                        }
1092                    }
1093                    MessageSet::Data(data) => {
1094                        log::debug!("Data");
1095                        // data message may have more than one data block for various tables.
1096                        while let Some(data) = data.fetch_raw_block().await? {
1097                            log::debug!("table_name: {:?}", data.table_name());
1098                            log::debug!("data: {:?}", data);
1099                        }
1100                    }
1101                    MessageSet::MetaData(meta, data) => {
1102                        log::debug!("MetaData");
1103                        let raw = meta.as_raw_meta().await?;
1104                        taos.write_raw_meta(&raw).await?;
1105
1106                        // meta data can be write to an database seamlessly by raw or json (to sql).
1107                        let json = meta.as_json_meta().await?;
1108                        let sql = json.iter().next().unwrap().to_string();
1109                        if let Err(err) = taos.exec(sql).await {
1110                            println!("maybe error: {}", err);
1111                        }
1112                        // data message may have more than one data block for various tables.
1113                        while let Some(data) = data.fetch_raw_block().await? {
1114                            log::debug!("table_name: {:?}", data.table_name());
1115                            log::debug!("data: {:?}", data);
1116                        }
1117                    }
1118                }
1119                consumer.commit(offset).await?;
1120            }
1121        }
1122
1123        let assignments = consumer.assignments().await.unwrap();
1124        log::debug!("assignments: {:?}", assignments);
1125
1126        // seek offset
1127        for topic_vec_assignment in assignments {
1128            let topic = &topic_vec_assignment.0;
1129            let vec_assignment = topic_vec_assignment.1;
1130            for assignment in vec_assignment {
1131                let vgroup_id = assignment.vgroup_id();
1132                let current = assignment.current_offset();
1133                let begin = assignment.begin();
1134                let end = assignment.end();
1135                log::debug!(
1136                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1137                    topic,
1138                    vgroup_id,
1139                    current,
1140                    begin,
1141                    end
1142                );
1143                let res = consumer.offset_seek(topic, vgroup_id, end).await;
1144                if res.is_err() {
1145                    log::error!("seek offset error: {:?}", res);
1146                    let a = consumer.assignments().await.unwrap();
1147                    log::error!("assignments: {:?}", a);
1148                    // panic!()
1149                }
1150            }
1151
1152            let topic_assignment = consumer.topic_assignment(topic).await;
1153            log::debug!("topic assignment: {:?}", topic_assignment);
1154        }
1155
1156        // after seek offset
1157        let assignments = consumer.assignments().await.unwrap();
1158        log::debug!("after seek offset assignments: {:?}", assignments);
1159
1160        consumer.unsubscribe().await;
1161
1162        tokio::time::sleep(Duration::from_secs(1)).await;
1163
1164        taos.exec_many([
1165            "drop database db2",
1166            "drop topic ws_abc1",
1167            "drop database ws_abc1",
1168        ])
1169        .await?;
1170        Ok(())
1171    }
1172
1173    #[tokio::test]
1174    async fn test_ws_tmq() -> taos_query::RawResult<()> {
1175        // pretty_env_logger::formatted_timed_builder()
1176        // .filter_level(log::LevelFilter::Info)
1177        // .init();
1178
1179        use taos_query::prelude::*;
1180        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1181        let dsn = "taosws://localhost:6041".to_string();
1182        log::info!("dsn: {}", dsn);
1183        let mut dsn = Dsn::from_str(&dsn)?;
1184
1185        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1186
1187        let db = "ws_tmq_1";
1188        let db2 = "ws_tmq_1_dest";
1189
1190        taos.exec_many([
1191            format!("drop topic if exists {db}").as_str(),
1192            format!("drop database if exists {db}").as_str(),
1193            format!("create database {db} wal_retention_period 1").as_str(),
1194            format!("create topic {db} with meta as database {db}").as_str(),
1195            format!("use {db}").as_str(),
1196            // kind 1: create super table using all types
1197            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1198            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1199            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1200            tags(t1 json)",
1201            // kind 2: create child table with json tag
1202            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1203            "create table tb1 using stb1 tags(NULL)",
1204            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1205            NULL, NULL, NULL, NULL, NULL,
1206            NULL, NULL, NULL, NULL)
1207            tb1 values(now, true, -2, -3, -4, -5, \
1208            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1209            254, 65534, 1, 1)",
1210            // kind 3: create super table with all types except json (especially for tags)
1211            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1212            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1213            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1214            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1215            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1216            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1217            // kind 4: create child table with all types except json
1218            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1219            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1220            254, 65534, 1, 1)",
1221            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1222            NULL, NULL, NULL, NULL, NULL,
1223            NULL, NULL, NULL, NULL)",
1224            // kind 5: create common table
1225            "create table `table` (ts timestamp, v int)",
1226        ])
1227        .await?;
1228
1229        taos.exec_many([
1230            format!("drop database if exists {db2}"),
1231            format!("create database if not exists {db2} wal_retention_period 1"),
1232            format!("use {db2}"),
1233        ])
1234        .await?;
1235
1236        dsn.params.insert("group.id".to_string(), "abc".to_string());
1237
1238        dsn.params
1239            .insert("auto.offset.reset".to_string(), "earliest".to_string());
1240
1241        let builder = TmqBuilder::from_dsn(&dsn)?;
1242        let mut consumer = builder.build().await?;
1243        consumer.subscribe([db]).await?;
1244
1245        {
1246            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1247
1248            while let Some((offset, message)) = stream.try_next().await? {
1249                // Offset contains information for topic name, database name and vgroup id,
1250                //  similar to kafka topic/partition/offset.
1251                let topic: &str = offset.topic();
1252                let database = offset.database();
1253                let vgroup_id = offset.vgroup_id();
1254                log::debug!(
1255                    "topic: {}, database: {}, vgroup_id: {}",
1256                    topic,
1257                    database,
1258                    vgroup_id
1259                );
1260
1261                // Different to kafka message, TDengine consumer would consume two kind of messages.
1262                //
1263                // 1. meta
1264                // 2. data
1265                match message {
1266                    MessageSet::Meta(meta) => {
1267                        log::debug!("Meta");
1268                        let raw = meta.as_raw_meta().await?;
1269                        taos.write_raw_meta(&raw).await?;
1270
1271                        // meta data can be write to an database seamlessly by raw or json (to sql).
1272                        let json = meta.as_json_meta().await?;
1273                        let sql = json.iter().next().unwrap().to_string();
1274                        // dbg!(&sql);
1275                        if let Err(err) = taos.exec(sql).await {
1276                            log::error!("meta error: {}", err);
1277                        }
1278                    }
1279                    MessageSet::Data(data) => {
1280                        log::debug!("Data");
1281                        // data message may have more than one data block for various tables.
1282                        while let Some(data) = data.fetch_raw_block().await? {
1283                            log::debug!("table_name: {:?}", data.table_name());
1284                            log::debug!("data: {:?}", data);
1285                        }
1286                    }
1287                    MessageSet::MetaData(meta, data) => {
1288                        log::debug!("MetaData");
1289                        let raw = meta.as_raw_meta().await?;
1290                        taos.write_raw_meta(&raw).await?;
1291
1292                        // meta data can be write to an database seamlessly by raw or json (to sql).
1293                        let json = meta.as_json_meta().await?;
1294                        let sql = json.iter().next().unwrap().to_string();
1295                        if let Err(err) = taos.exec(sql).await {
1296                            println!("metadata error: {}", err);
1297                        }
1298                        // data message may have more than one data block for various tables.
1299                        while let Some(data) = data.fetch_raw_block().await? {
1300                            log::debug!("table_name: {:?}", data.table_name());
1301                            log::debug!("data: {:?}", data);
1302                        }
1303                    }
1304                }
1305                consumer.commit(offset).await?;
1306            }
1307        }
1308
1309        let assignments = consumer.assignments().await.unwrap();
1310        // dbg!(&assignments);
1311        log::info!("assignments: {:?}", assignments);
1312
1313        // seek offset
1314        for topic_vec_assignment in assignments {
1315            let topic = &topic_vec_assignment.0;
1316            let vec_assignment = topic_vec_assignment.1;
1317            for assignment in vec_assignment {
1318                let vgroup_id = assignment.vgroup_id();
1319                let current = assignment.current_offset();
1320                let begin = assignment.begin();
1321                let end = assignment.end();
1322                log::info!(
1323                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1324                    topic,
1325                    vgroup_id,
1326                    current,
1327                    begin,
1328                    end
1329                );
1330                let res = consumer.offset_seek(topic, vgroup_id, end).await;
1331                if res.is_err() {
1332                    log::error!("seek offset error: {:?}", res);
1333                    let a = consumer.assignments().await.unwrap();
1334                    log::error!("assignments: {:?}", a);
1335                    // panic!()
1336                }
1337            }
1338
1339            let topic_assignment = consumer.topic_assignment(topic).await;
1340            log::info!("topic assignment: {:?}", topic_assignment);
1341        }
1342
1343        // after seek offset
1344        let assignments = consumer.assignments().await.unwrap();
1345        log::info!("after seek offset assignments: {:?}", assignments);
1346
1347        consumer.unsubscribe().await;
1348
1349        tokio::time::sleep(Duration::from_secs(1)).await;
1350
1351        taos.exec_many([
1352            format!("drop database {db2}"),
1353            format!("drop topic {db}"),
1354            format!("drop database {db}"),
1355        ])
1356        .await?;
1357        Ok(())
1358    }
1359
1360    #[tokio::test]
1361    async fn test_ws_raw_block_table_name() -> taos_query::RawResult<()> {
1362        // pretty_env_logger::formatted_timed_builder()
1363        // .filter_level(log::LevelFilter::Info)
1364        // .init();
1365
1366        use taos_query::prelude::*;
1367        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1368        let dsn = "taosws://localhost:6041".to_string();
1369        log::info!("dsn: {}", dsn);
1370        let mut dsn = Dsn::from_str(&dsn)?;
1371
1372        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1373
1374        let db = "ws_tmq_block_1";
1375        let db2 = "ws_tmq_block_1_dest";
1376
1377        taos.exec_many([
1378            format!("drop topic if exists {db}").as_str(),
1379            format!("drop database if exists {db}").as_str(),
1380            format!("create database {db} wal_retention_period 1").as_str(),
1381            format!("create topic {db} with meta as database {db}").as_str(),
1382            format!("use {db}").as_str(),
1383            // kind 1: create super table using all types
1384            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1385            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1386            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1387            tags(t1 json)",
1388            // kind 2: create child table with json tag
1389            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1390            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1391            NULL, NULL, NULL, NULL, NULL,
1392            NULL, NULL, NULL, NULL)",
1393            // kind 3: create super table with all types except json (especially for tags)
1394            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1395            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1396            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1397            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1398            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1399            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1400            // kind 4: create child table with all types except json
1401            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1402            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1403            254, 65534, 1, 1)",
1404            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1405            NULL, NULL, NULL, NULL, NULL,
1406            NULL, NULL, NULL, NULL)",
1407            // kind 5: create common table
1408            "create table `table` (ts timestamp, v int)",
1409        ])
1410        .await?;
1411
1412        taos.exec_many([
1413            format!("drop database if exists {db2}"),
1414            format!("create database if not exists {db2} wal_retention_period 1"),
1415            format!("use {db2}"),
1416        ])
1417        .await?;
1418
1419        dsn.params.insert("group.id".to_string(), "abc".to_string());
1420
1421        dsn.params
1422            .insert("auto.offset.reset".to_string(), "earliest".to_string());
1423
1424        let builder = TmqBuilder::from_dsn(&dsn)?;
1425        let mut consumer = builder.build().await?;
1426        consumer.subscribe([db]).await?;
1427
1428        {
1429            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1430
1431            while let Some((offset, message)) = stream.try_next().await? {
1432                let topic: &str = offset.topic();
1433                let database = offset.database();
1434                let vgroup_id = offset.vgroup_id();
1435                log::debug!(
1436                    "topic: {}, database: {}, vgroup_id: {}",
1437                    topic,
1438                    database,
1439                    vgroup_id
1440                );
1441
1442                match message {
1443                    MessageSet::Meta(meta) => {
1444                        log::debug!("Meta");
1445                        let raw = meta.as_raw_meta().await?;
1446                        taos.write_raw_meta(&raw).await?;
1447
1448                        let json = meta.as_json_meta().await?;
1449                        let sql = json.iter().next().unwrap().to_string();
1450                        // dbg!(&sql);
1451                        if let Err(err) = taos.exec(sql).await {
1452                            log::error!("meta error: {}", err);
1453                        }
1454                    }
1455                    MessageSet::Data(data) => {
1456                        log::info!("Data");
1457                        // data message may have more than one data block for various tables.
1458                        while let Some(data) = data.fetch_raw_block().await? {
1459                            log::info!("table_name: {:?}", data.table_name());
1460                            assert_eq!(data.table_name(), Some("tb0"));
1461                            log::info!("data: {}", data.pretty_format());
1462                            assert!(data
1463                                .pretty_format()
1464                                .to_string()
1465                                .contains("table name \"tb0\""));
1466                        }
1467                    }
1468                    MessageSet::MetaData(meta, data) => {
1469                        log::info!("MetaData");
1470                        let raw = meta.as_raw_meta().await?;
1471                        taos.write_raw_meta(&raw).await?;
1472
1473                        // meta data can be write to an database seamlessly by raw or json (to sql).
1474                        let json = meta.as_json_meta().await?;
1475                        let sql = json.iter().next().unwrap().to_string();
1476                        if let Err(err) = taos.exec(sql).await {
1477                            println!("metadata error: {}", err);
1478                        }
1479                        // data message may have more than one data block for various tables.
1480                        while let Some(data) = data.fetch_raw_block().await? {
1481                            log::info!("MetaData table_name: {:?}", data.table_name());
1482                            log::info!("MetaData data: {:?}", data);
1483                        }
1484                    }
1485                }
1486                consumer.commit(offset).await?;
1487            }
1488        }
1489
1490        consumer.unsubscribe().await;
1491
1492        tokio::time::sleep(Duration::from_secs(1)).await;
1493
1494        taos.exec_many([
1495            format!("drop database {db2}"),
1496            format!("drop topic {db}"),
1497            format!("drop database {db}"),
1498        ])
1499        .await?;
1500        Ok(())
1501    }
1502
1503    #[tokio::test]
1504    async fn test_ws_flush_db() -> taos_query::RawResult<()> {
1505        // pretty_env_logger::formatted_timed_builder()
1506        // .filter_level(log::LevelFilter::Info)
1507        // .init();
1508
1509        use taos_query::prelude::*;
1510        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1511        let dsn = "taosws://localhost:6041".to_string();
1512        log::info!("dsn: {}", dsn);
1513        let mut dsn = Dsn::from_str(&dsn)?;
1514
1515        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1516
1517        let db = "ws_tmq_flush_1";
1518        let db2 = "ws_tmq_flush_1_dest";
1519
1520        taos.exec_many([
1521            format!("drop topic if exists {db}").as_str(),
1522            format!("drop database if exists {db}").as_str(),
1523            format!("create database {db} wal_retention_period 1").as_str(),
1524            format!("create topic {db} with meta as database {db}").as_str(),
1525            format!("use {db}").as_str(),
1526            // kind 1: create super table using all types
1527            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1528            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1529            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1530            tags(t1 json)",
1531            // kind 2: create child table with json tag
1532            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1533            "create table tb1 using stb1 tags(NULL)",
1534            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1535            NULL, NULL, NULL, NULL, NULL,
1536            NULL, NULL, NULL, NULL)
1537            tb1 values(now, true, -2, -3, -4, -5, \
1538            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1539            254, 65534, 1, 1)",
1540            // kind 3: create super table with all types except json (especially for tags)
1541            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1542            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1543            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1544            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1545            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1546            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1547            // kind 4: create child table with all types except json
1548            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1549            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1550            254, 65534, 1, 1)",
1551            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1552            NULL, NULL, NULL, NULL, NULL,
1553            NULL, NULL, NULL, NULL)",
1554            // kind 5: create common table
1555            "create table `table` (ts timestamp, v int)",
1556            // kind 6: column in super table
1557            "alter table stb1 add column new1 bool",
1558            "alter table stb1 add column new2 tinyint",
1559            "alter table stb1 add column new10 nchar(16)",
1560            "alter table stb1 modify column new10 nchar(32)",
1561            "alter table stb1 drop column new10",
1562            "alter table stb1 drop column new2",
1563            "alter table stb1 drop column new1",
1564            // kind 7: add tag in super table
1565            "alter table `stb2` add tag new1 bool",
1566            "alter table `stb2` rename tag new1 new1_new",
1567            "alter table `stb2` modify tag t10 nchar(32)",
1568            "alter table `stb2` drop tag new1_new",
1569            // kind 8: column in common table
1570            "alter table `table` add column new1 bool",
1571            "alter table `table` add column new2 tinyint",
1572            "alter table `table` add column new10 nchar(16)",
1573            "alter table `table` modify column new10 nchar(32)",
1574            "alter table `table` rename column new10 new10_new",
1575            "alter table `table` drop column new10_new",
1576            "alter table `table` drop column new2",
1577            "alter table `table` drop column new1",
1578        ])
1579        .await?;
1580
1581        for _ in 0..1000 {
1582            taos.exec(
1583                "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1584            NULL, NULL, NULL, NULL, NULL,
1585            NULL, NULL, NULL, NULL)
1586            tb1 values(now, true, -2, -3, -4, -5, \
1587            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1588            254, 65534, 1, 1)",
1589            )
1590            .await?;
1591        }
1592        taos.exec(format!("flush database {db}")).await?;
1593        tokio::time::sleep(Duration::from_secs(1)).await;
1594
1595        taos.exec_many([
1596            format!("drop database if exists {db2}"),
1597            format!("create database if not exists {db2} wal_retention_period 1"),
1598            format!("use {db2}"),
1599        ])
1600        .await?;
1601
1602        dsn.params.insert("group.id".to_string(), "abc".to_string());
1603
1604        dsn.params
1605            .insert("auto.offset.reset".to_string(), "earliest".to_string());
1606        dsn.params.insert(
1607            "experimental.snapshot.enable".to_string(),
1608            "true".to_string(),
1609        );
1610
1611        let builder = TmqBuilder::from_dsn(&dsn)?;
1612        let mut consumer = builder.build().await?;
1613        consumer.subscribe([db]).await?;
1614
1615        {
1616            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1617
1618            while let Some((offset, message)) = stream.try_next().await? {
1619                // Offset contains information for topic name, database name and vgroup id,
1620                //  similar to kafka topic/partition/offset.
1621                let topic: &str = offset.topic();
1622                let database = offset.database();
1623                let vgroup_id = offset.vgroup_id();
1624                log::debug!(
1625                    "topic: {}, database: {}, vgroup_id: {}",
1626                    topic,
1627                    database,
1628                    vgroup_id
1629                );
1630
1631                // Different to kafka message, TDengine consumer would consume two kind of messages.
1632                //
1633                // 1. meta
1634                // 2. data
1635                match message {
1636                    MessageSet::Meta(meta) => {
1637                        log::debug!("Meta");
1638                        let raw = meta.as_raw_meta().await?;
1639                        taos.write_raw_meta(&raw).await?;
1640
1641                        // meta data can be write to an database seamlessly by raw or json (to sql).
1642                        let json = meta.as_json_meta().await?;
1643                        let sql = json.iter().next().unwrap().to_string();
1644                        // dbg!(&sql);
1645                        if let Err(err) = taos.exec(sql).await {
1646                            log::error!("maybe error: {}", err);
1647                        }
1648                    }
1649                    MessageSet::Data(data) => {
1650                        log::debug!("Data");
1651                        // data message may have more than one data block for various tables.
1652                        while let Some(data) = data.fetch_raw_block().await? {
1653                            log::debug!("table_name: {:?}", data.table_name());
1654                            log::debug!("data: {:?}", data);
1655                        }
1656                    }
1657                    MessageSet::MetaData(meta, data) => {
1658                        log::debug!("MetaData");
1659                        let raw = meta.as_raw_meta().await?;
1660                        taos.write_raw_meta(&raw).await?;
1661
1662                        // meta data can be write to an database seamlessly by raw or json (to sql).
1663                        let json = meta.as_json_meta().await?;
1664                        let sql = json.iter().next().unwrap().to_string();
1665                        if let Err(err) = taos.exec(sql).await {
1666                            println!("maybe error: {}", err);
1667                        }
1668                        // data message may have more than one data block for various tables.
1669                        while let Some(data) = data.fetch_raw_block().await? {
1670                            log::debug!("table_name: {:?}", data.table_name());
1671                            log::debug!("data: {:?}", data);
1672                        }
1673                    }
1674                }
1675                consumer.commit(offset).await?;
1676            }
1677        }
1678
1679        let assignments = consumer.assignments().await.unwrap();
1680        // dbg!(&assignments);
1681        log::info!("assignments: {:?}", assignments);
1682
1683        // seek offset
1684        for topic_vec_assignment in assignments {
1685            let topic = &topic_vec_assignment.0;
1686            let vec_assignment = topic_vec_assignment.1;
1687            for assignment in vec_assignment {
1688                let vgroup_id = assignment.vgroup_id();
1689                let current = assignment.current_offset();
1690                let begin = assignment.begin();
1691                let end = assignment.end();
1692                log::info!(
1693                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1694                    topic,
1695                    vgroup_id,
1696                    current,
1697                    begin,
1698                    end
1699                );
1700                let res = consumer.offset_seek(topic, vgroup_id, end).await;
1701                if res.is_err() {
1702                    log::error!("seek offset error: {:?}", res);
1703                    let a = consumer.assignments().await.unwrap();
1704                    log::error!("assignments: {:?}", a);
1705                    // panic!()
1706                }
1707            }
1708
1709            let topic_assignment = consumer.topic_assignment(topic).await;
1710            log::info!("topic assignment: {:?}", topic_assignment);
1711        }
1712
1713        // after seek offset
1714        let assignments = consumer.assignments().await.unwrap();
1715        log::info!("after seek offset assignments: {:?}", assignments);
1716
1717        consumer.unsubscribe().await;
1718
1719        tokio::time::sleep(Duration::from_secs(1)).await;
1720
1721        taos.exec_many([
1722            format!("drop database {db2}"),
1723            format!("drop topic {db}"),
1724            format!("drop database {db}"),
1725        ])
1726        .await?;
1727        Ok(())
1728    }
1729
1730    #[tokio::test]
1731    async fn test_ws_tmq_snapshot() -> taos_query::RawResult<()> {
1732        // std::env::set_var("RUST_LOG", "tokio=warn,taos_ws=trace,info");
1733        // pretty_env_logger::init();
1734
1735        use taos_query::prelude::*;
1736        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1737        let dsn = "taosws://localhost:6041".to_string();
1738        log::info!("dsn: {}", dsn);
1739        let mut dsn = Dsn::from_str(&dsn)?;
1740
1741        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1742
1743        let db = "ws_abc1_snapshot";
1744        let db2 = "ws_abc1_snapshot_dest";
1745
1746        taos.exec_many([
1747            format!("drop topic if exists {db}").as_str(),
1748            format!("drop database if exists {db}").as_str(),
1749            format!("create database {db} wal_retention_period 3600").as_str(),
1750            format!("create topic {db} with meta as database {db}").as_str(),
1751            format!("use {db}").as_str(),
1752            // kind 1: create super table using all types
1753            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1754            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1755            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1756            tags(t1 json)",
1757            // kind 2: create child table with json tag
1758            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1759            "create table tb1 using stb1 tags(NULL)",
1760            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1761            NULL, NULL, NULL, NULL, NULL,
1762            NULL, NULL, NULL, NULL)
1763            tb1 values(now, true, -2, -3, -4, -5, \
1764            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1765            254, 65534, 1, 1)",
1766            // kind 3: create super table with all types except json (especially for tags)
1767            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1768            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1769            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1770            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1771            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1772            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1773            // kind 4: create child table with all types except json
1774            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1775            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1776            254, 65534, 1, 1)",
1777            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1778            NULL, NULL, NULL, NULL, NULL,
1779            NULL, NULL, NULL, NULL)",
1780            // kind 5: create common table
1781            "create table `table` (ts timestamp, v int)",
1782            // kind 6: column in super table
1783            "alter table stb1 add column new1 bool",
1784            "alter table stb1 add column new2 tinyint",
1785            "alter table stb1 add column new10 nchar(16)",
1786            "alter table stb1 modify column new10 nchar(32)",
1787            "alter table stb1 drop column new10",
1788            "alter table stb1 drop column new2",
1789            "alter table stb1 drop column new1",
1790            // kind 7: add tag in super table
1791            "alter table `stb2` add tag new1 bool",
1792            "alter table `stb2` rename tag new1 new1_new",
1793            "alter table `stb2` modify tag t10 nchar(32)",
1794            "alter table `stb2` drop tag new1_new",
1795            // kind 8: column in common table
1796            "alter table `table` add column new1 bool",
1797            "alter table `table` add column new2 tinyint",
1798            "alter table `table` add column new10 nchar(16)",
1799            "alter table `table` modify column new10 nchar(32)",
1800            "alter table `table` rename column new10 new10_new",
1801            "alter table `table` drop column new10_new",
1802            "alter table `table` drop column new2",
1803            "alter table `table` drop column new1",
1804        ])
1805        .await?;
1806
1807        for _ in 0..100 {
1808            taos.exec(
1809                "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1810            NULL, NULL, NULL, NULL, NULL,
1811            NULL, NULL, NULL, NULL)
1812            tb1 values(now, true, -2, -3, -4, -5, \
1813            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1814            254, 65534, 1, 1)",
1815            )
1816            .await?;
1817        }
1818
1819        taos.exec_many([
1820            format!("drop database if exists {db2}"),
1821            format!("create database if not exists {db2} wal_retention_period 3600"),
1822            format!("use {db2}"),
1823        ])
1824        .await?;
1825
1826        dsn.params.insert("group.id".to_string(), "abc".to_string());
1827
1828        dsn.params
1829            .insert("auto.offset.reset".to_string(), "earliest".to_string());
1830
1831        let builder = TmqBuilder::from_dsn(&dsn)?;
1832        let mut consumer = builder.build().await?;
1833        consumer.subscribe([db]).await?;
1834
1835        {
1836            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1837
1838            while let Some((offset, message)) = stream.try_next().await? {
1839                // Offset contains information for topic name, database name and vgroup id,
1840                //  similar to kafka topic/partition/offset.
1841                let topic: &str = offset.topic();
1842                let database = offset.database();
1843                let vgroup_id = offset.vgroup_id();
1844                log::debug!(
1845                    "topic: {}, database: {}, vgroup_id: {}",
1846                    topic,
1847                    database,
1848                    vgroup_id
1849                );
1850
1851                // Different to kafka message, TDengine consumer would consume two kind of messages.
1852                //
1853                // 1. meta
1854                // 2. data
1855                match message {
1856                    MessageSet::Meta(meta) => {
1857                        log::debug!("Meta");
1858                        let raw = meta.as_raw_meta().await?;
1859                        taos.write_raw_meta(&raw).await?;
1860
1861                        // meta data can be write to an database seamlessly by raw or json (to sql).
1862                        let json = meta.as_json_meta().await?;
1863                        let sql = json.iter().next().unwrap().to_string();
1864                        // dbg!(&sql);
1865                        if let Err(err) = taos.exec(sql).await {
1866                            log::debug!("maybe error: {}", err);
1867                        }
1868                    }
1869                    MessageSet::Data(data) => {
1870                        log::debug!("Data");
1871                        // data message may have more than one data block for various tables.
1872                        while let Some(data) = data.fetch_raw_block().await? {
1873                            log::debug!("table_name: {:?}", data.table_name());
1874                            log::debug!("data: {:?}", data);
1875                        }
1876                    }
1877                    MessageSet::MetaData(meta, data) => {
1878                        log::debug!("MetaData");
1879                        let raw = meta.as_raw_meta().await?;
1880                        taos.write_raw_meta(&raw).await?;
1881
1882                        // meta data can be write to an database seamlessly by raw or json (to sql).
1883                        let json = meta.as_json_meta().await?;
1884                        let sql = json.iter().next().unwrap().to_string();
1885                        if let Err(err) = taos.exec(sql).await {
1886                            println!("maybe error: {}", err);
1887                        }
1888                        // data message may have more than one data block for various tables.
1889                        while let Some(data) = data.fetch_raw_block().await? {
1890                            log::debug!("table_name: {:?}", data.table_name());
1891                            log::debug!("data: {:?}", data);
1892                        }
1893                    }
1894                }
1895                consumer.commit(offset).await?;
1896            }
1897        }
1898
1899        let assignments = consumer.assignments().await.unwrap();
1900        dbg!(&assignments);
1901        log::info!("assignments: {:?}", assignments);
1902
1903        // seek offset
1904        for topic_vec_assignment in assignments {
1905            let topic = &topic_vec_assignment.0;
1906            let vec_assignment = topic_vec_assignment.1;
1907            for assignment in vec_assignment {
1908                let vgroup_id = assignment.vgroup_id();
1909                let current = assignment.current_offset();
1910                let begin = assignment.begin();
1911                let end = assignment.end();
1912                log::info!(
1913                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1914                    topic,
1915                    vgroup_id,
1916                    current,
1917                    begin,
1918                    end
1919                );
1920                let res = consumer.offset_seek(topic, vgroup_id, end).await;
1921                if res.is_err() {
1922                    log::error!("seek offset error: {:?}", res);
1923                    let a = consumer.assignments().await.unwrap();
1924                    log::error!("assignments: {:?}", a);
1925                    // panic!()
1926                }
1927            }
1928
1929            let topic_assignment = consumer.topic_assignment(topic).await;
1930            log::info!("topic assignment: {:?}", topic_assignment);
1931        }
1932
1933        // after seek offset
1934        let assignments = consumer.assignments().await.unwrap();
1935        log::info!("after seek offset assignments: {:?}", assignments);
1936
1937        consumer.unsubscribe().await;
1938
1939        tokio::time::sleep(Duration::from_secs(1)).await;
1940
1941        taos.exec_many([
1942            format!("drop database {db2}"),
1943            format!("drop topic {db}"),
1944            format!("drop database {db}"),
1945        ])
1946        .await?;
1947        Ok(())
1948    }
1949    #[tokio::test]
1950    async fn test_ws_tmq_offset() -> taos_query::RawResult<()> {
1951        // pretty_env_logger::formatted_timed_builder()
1952        //     .filter_level(log::LevelFilter::Info)
1953        //     .init();
1954
1955        use taos_query::prelude::*;
1956        // let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1957        let dsn = "tmq+ws://localhost:6041?offset=10:20,11:40".to_string();
1958        log::info!("dsn: {}", dsn);
1959        let mut dsn = Dsn::from_str(&dsn)?;
1960
1961        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1962
1963        let db = "ws_tmq_abc2";
1964        let db2 = "ws_tmq_abc2_dest";
1965
1966        taos.exec(format!("drop topic if exists {db}")).await?;
1967        taos.exec(format!("drop database if exists {db}")).await?;
1968
1969        std::thread::sleep(std::time::Duration::from_secs(3));
1970
1971        taos.exec(format!(
1972            "create database if not exists {db} wal_retention_period 3600"
1973        ))
1974        .await?;
1975
1976        std::thread::sleep(std::time::Duration::from_secs(3));
1977
1978        taos.exec_many([
1979            format!("create topic {db} with meta as database {db}").as_str(),
1980            format!("use {db}").as_str(),
1981            // kind 1: create super table using all types
1982            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1983            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1984            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1985            tags(t1 json)",
1986            // kind 2: create child table with json tag
1987            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1988            "create table tb1 using stb1 tags(NULL)",
1989            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1990            NULL, NULL, NULL, NULL, NULL,
1991            NULL, NULL, NULL, NULL)
1992            tb1 values(now, true, -2, -3, -4, -5, \
1993            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1994            254, 65534, 1, 1)",
1995            // kind 3: create super table with all types except json (especially for tags)
1996            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1997            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1998            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1999            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2000            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2001            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2002            // kind 4: create child table with all types except json
2003            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2004            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2005            254, 65534, 1, 1)",
2006            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2007            NULL, NULL, NULL, NULL, NULL,
2008            NULL, NULL, NULL, NULL)",
2009            // kind 5: create common table
2010            "create table `table` (ts timestamp, v int)",
2011            // kind 6: column in super table
2012            "alter table stb1 add column new1 bool",
2013            "alter table stb1 add column new2 tinyint",
2014            "alter table stb1 add column new10 nchar(16)",
2015            "alter table stb1 modify column new10 nchar(32)",
2016            "alter table stb1 drop column new10",
2017            "alter table stb1 drop column new2",
2018            "alter table stb1 drop column new1",
2019            // kind 7: add tag in super table
2020            "alter table `stb2` add tag new1 bool",
2021            "alter table `stb2` rename tag new1 new1_new",
2022            "alter table `stb2` modify tag t10 nchar(32)",
2023            "alter table `stb2` drop tag new1_new",
2024            // kind 8: column in common table
2025            "alter table `table` add column new1 bool",
2026            "alter table `table` add column new2 tinyint",
2027            "alter table `table` add column new10 nchar(16)",
2028            "alter table `table` modify column new10 nchar(32)",
2029            "alter table `table` rename column new10 new10_new",
2030            "alter table `table` drop column new10_new",
2031            "alter table `table` drop column new2",
2032            "alter table `table` drop column new1",
2033            // kind 9: drop normal table
2034            // "drop table `table`",
2035            // kind 10: drop child table
2036            // "drop table `tb2`, `tb1`",
2037            // kind 11: drop super table
2038            // "drop table `stb2`",
2039            // "drop table `stb1`",
2040        ])
2041        .await?;
2042
2043        taos.exec_many([
2044            format!("drop database if exists {db2}"),
2045            format!("create database if not exists {db2} wal_retention_period 3600"),
2046            format!("use {db2}"),
2047        ])
2048        .await?;
2049
2050        dsn.params.insert("group.id".to_string(), "abc".to_string());
2051
2052        dsn.params
2053            .insert("auto.offset.reset".to_string(), "earliest".to_string());
2054
2055        let builder = TmqBuilder::from_dsn(&dsn)?;
2056        // dbg!(&builder);
2057        let mut consumer = builder.build().await?;
2058        consumer.subscribe([db]).await?;
2059
2060        {
2061            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
2062
2063            while let Some((offset, message)) = stream.try_next().await? {
2064                // Offset contains information for topic name, database name and vgroup id,
2065                //  similar to kafka topic/partition/offset.
2066                let topic: &str = offset.topic();
2067                let database = offset.database();
2068                let vgroup_id = offset.vgroup_id();
2069                log::debug!(
2070                    "topic: {}, database: {}, vgroup_id: {}",
2071                    topic,
2072                    database,
2073                    vgroup_id
2074                );
2075
2076                // Different to kafka message, TDengine consumer would consume two kind of messages.
2077                //
2078                // 1. meta
2079                // 2. data
2080                match message {
2081                    MessageSet::Meta(meta) => {
2082                        log::debug!("Meta");
2083                        let raw = meta.as_raw_meta().await?;
2084                        taos.write_raw_meta(&raw).await?;
2085
2086                        // meta data can be write to an database seamlessly by raw or json (to sql).
2087                        let json = meta.as_json_meta().await?;
2088                        let sql = json.iter().next().unwrap().to_string();
2089                        if let Err(err) = taos.exec(sql).await {
2090                            println!("maybe error: {}", err);
2091                        }
2092                    }
2093                    MessageSet::Data(data) => {
2094                        log::debug!("Data");
2095                        // data message may have more than one data block for various tables.
2096                        while let Some(data) = data.fetch_raw_block().await? {
2097                            log::debug!("table_name: {:?}", data.table_name());
2098                            log::debug!("data: {:?}", data);
2099                        }
2100                    }
2101                    MessageSet::MetaData(meta, data) => {
2102                        log::debug!("MetaData");
2103                        let raw = meta.as_raw_meta().await?;
2104                        taos.write_raw_meta(&raw).await?;
2105
2106                        // meta data can be write to an database seamlessly by raw or json (to sql).
2107                        let json = meta.as_json_meta().await?;
2108                        let sql = json.iter().next().unwrap().to_string();
2109                        if let Err(err) = taos.exec(sql).await {
2110                            println!("maybe error: {}", err);
2111                        }
2112                        // data message may have more than one data block for various tables.
2113                        while let Some(data) = data.fetch_raw_block().await? {
2114                            log::debug!("table_name: {:?}", data.table_name());
2115                            log::debug!("data: {:?}", data);
2116                        }
2117                    }
2118                }
2119                consumer.commit(offset).await?;
2120            }
2121        }
2122
2123        let assignments = consumer.assignments().await.unwrap();
2124        log::debug!("assignments: {:?}", assignments);
2125
2126        // seek offset
2127        for topic_vec_assignment in assignments {
2128            let topic = &topic_vec_assignment.0;
2129            let vec_assignment = topic_vec_assignment.1;
2130            for assignment in vec_assignment {
2131                let vgroup_id = assignment.vgroup_id();
2132                let current = assignment.current_offset();
2133                let begin = assignment.begin();
2134                let end = assignment.end();
2135                log::debug!(
2136                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
2137                    topic,
2138                    vgroup_id,
2139                    current,
2140                    begin,
2141                    end
2142                );
2143                let res = consumer.offset_seek(topic, vgroup_id, end).await;
2144                if res.is_err() {
2145                    log::error!("seek offset error: {:?}", res);
2146                    let a = consumer.assignments().await.unwrap();
2147                    log::error!("assignments: {:?}", a);
2148                    // panic!()
2149                }
2150            }
2151
2152            let topic_assignment = consumer.topic_assignment(topic).await;
2153            log::debug!("topic assignment: {:?}", topic_assignment);
2154        }
2155
2156        // after seek offset
2157        let assignments = consumer.assignments().await.unwrap();
2158        log::debug!("after seek offset assignments: {:?}", assignments);
2159
2160        consumer.unsubscribe().await;
2161
2162        tokio::time::sleep(Duration::from_secs(1)).await;
2163
2164        taos.exec_many([
2165            format!("drop database {db2}"),
2166            format!("drop topic {db}"),
2167            format!("drop database {db}"),
2168        ])
2169        .await?;
2170        Ok(())
2171    }
2172
2173    #[tokio::test]
2174    async fn test_ws_tmq_committed() -> taos_query::RawResult<()> {
2175        // pretty_env_logger::formatted_timed_builder()
2176        //     .filter_level(log::LevelFilter::Info)
2177        //     .init();
2178
2179        use taos_query::prelude::*;
2180
2181        let dsn = "tmq+ws://localhost:6041?".to_string();
2182        log::info!("dsn: {}", dsn);
2183        let mut dsn = Dsn::from_str(&dsn)?;
2184
2185        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
2186
2187        let db = "ws_tmq_committed";
2188        let db2 = "ws_tmq_committed_dest";
2189
2190        taos.exec(format!("drop topic if exists {db}")).await?;
2191        taos.exec(format!("drop database if exists {db}")).await?;
2192
2193        std::thread::sleep(std::time::Duration::from_secs(1));
2194
2195        taos.exec(format!(
2196            "create database if not exists {db} wal_retention_period 3600"
2197        ))
2198        .await?;
2199
2200        std::thread::sleep(std::time::Duration::from_secs(1));
2201
2202        taos.exec_many([
2203            format!("create topic {db} with meta as database {db}").as_str(),
2204            format!("use {db}").as_str(),
2205            // kind 1: create super table using all types
2206            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2207            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2208            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2209            tags(t1 json)",
2210            // kind 2: create child table with json tag
2211            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2212            "create table tb1 using stb1 tags(NULL)",
2213            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2214            NULL, NULL, NULL, NULL, NULL,
2215            NULL, NULL, NULL, NULL)
2216            tb1 values(now, true, -2, -3, -4, -5, \
2217            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2218            254, 65534, 1, 1)",
2219            // kind 3: create super table with all types except json (especially for tags)
2220            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2221            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2222            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2223            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2224            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2225            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2226            // kind 4: create child table with all types except json
2227            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2228            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2229            254, 65534, 1, 1)",
2230            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2231            NULL, NULL, NULL, NULL, NULL,
2232            NULL, NULL, NULL, NULL)",
2233            // kind 5: create common table
2234            "create table `table` (ts timestamp, v int)",
2235            // kind 6: column in super table
2236            "alter table stb1 add column new1 bool",
2237            "alter table stb1 add column new2 tinyint",
2238            "alter table stb1 add column new10 nchar(16)",
2239            "alter table stb1 modify column new10 nchar(32)",
2240            "alter table stb1 drop column new10",
2241            "alter table stb1 drop column new2",
2242            "alter table stb1 drop column new1",
2243            // kind 7: add tag in super table
2244            "alter table `stb2` add tag new1 bool",
2245            "alter table `stb2` rename tag new1 new1_new",
2246            "alter table `stb2` modify tag t10 nchar(32)",
2247            "alter table `stb2` drop tag new1_new",
2248            // kind 8: column in common table
2249            "alter table `table` add column new1 bool",
2250            "alter table `table` add column new2 tinyint",
2251            "alter table `table` add column new10 nchar(16)",
2252            "alter table `table` modify column new10 nchar(32)",
2253            "alter table `table` rename column new10 new10_new",
2254            "alter table `table` drop column new10_new",
2255            "alter table `table` drop column new2",
2256            "alter table `table` drop column new1",
2257            // kind 9: drop normal table
2258            "drop table `table`",
2259            // kind 10: drop child table
2260            "drop table `tb2`, `tb1`",
2261            // kind 11: drop super table
2262            "drop table `stb2`",
2263            "drop table `stb1`",
2264        ])
2265        .await?;
2266
2267        taos.exec_many([
2268            format!("drop database if exists {db2}"),
2269            format!("create database if not exists {db2} wal_retention_period 3600"),
2270            format!("use {db2}"),
2271        ])
2272        .await?;
2273
2274        dsn.params.insert("group.id".to_string(), "abc".to_string());
2275
2276        dsn.params
2277            .insert("auto.offset.reset".to_string(), "earliest".to_string());
2278        let builder = TmqBuilder::from_dsn(&dsn)?;
2279        // dbg!(&builder);
2280        let mut consumer = builder.build().await?;
2281
2282        let topics = consumer.list_topics().await?;
2283        log::info!("topics: {:?}", topics);
2284        consumer.subscribe([db]).await?;
2285        let topics = consumer.list_topics().await?;
2286        log::info!("topics: {:?}", topics);
2287
2288        {
2289            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
2290
2291            while let Some((offset, message)) = stream.try_next().await? {
2292                // Offset contains information for topic name, database name and vgroup id,
2293                //  similar to kafka topic/partition/offset.
2294                let topic: &str = offset.topic();
2295                let database = offset.database();
2296                let vgroup_id = offset.vgroup_id();
2297                log::debug!(
2298                    "topic: {}, database: {}, vgroup_id: {}",
2299                    topic,
2300                    database,
2301                    vgroup_id
2302                );
2303
2304                // Different to kafka message, TDengine consumer would consume two kind of messages.
2305                //
2306                // 1. meta
2307                // 2. data
2308                match message {
2309                    MessageSet::Meta(meta) => {
2310                        log::debug!("Meta");
2311                        let raw = meta.as_raw_meta().await?;
2312                        taos.write_raw_meta(&raw).await?;
2313
2314                        // meta data can be write to an database seamlessly by raw or json (to sql).
2315                        let json = meta.as_json_meta().await?;
2316                        let sql = json.iter().next().unwrap().to_string();
2317                        if let Err(err) = taos.exec(sql).await {
2318                            log::trace!("maybe error: {}", err);
2319                        }
2320                    }
2321                    MessageSet::Data(data) => {
2322                        log::debug!("Data");
2323                        // data message may have more than one data block for various tables.
2324                        while let Some(data) = data.fetch_raw_block().await? {
2325                            log::debug!("table_name: {:?}", data.table_name());
2326                            log::debug!("data: {:?}", data);
2327                        }
2328                    }
2329                    MessageSet::MetaData(meta, data) => {
2330                        log::debug!("MetaData");
2331                        let raw = meta.as_raw_meta().await?;
2332                        taos.write_raw_meta(&raw).await?;
2333
2334                        // meta data can be write to an database seamlessly by raw or json (to sql).
2335                        let json = meta.as_json_meta().await?;
2336                        let sql = json.iter().next().unwrap().to_string();
2337                        if let Err(err) = taos.exec(sql).await {
2338                            println!("maybe error: {}", err);
2339                        }
2340                        // data message may have more than one data block for various tables.
2341                        while let Some(data) = data.fetch_raw_block().await? {
2342                            log::debug!("table_name: {:?}", data.table_name());
2343                            log::debug!("data: {:?}", data);
2344                        }
2345                    }
2346                }
2347                consumer.commit(offset).await?;
2348            }
2349        }
2350
2351        let assignments = consumer.assignments().await.unwrap();
2352        log::info!("assignments: {:?}", assignments);
2353
2354        // seek offset
2355        for topic_vec_assignment in assignments {
2356            let topic = &topic_vec_assignment.0;
2357            let vec_assignment = topic_vec_assignment.1;
2358            for assignment in vec_assignment {
2359                let vgroup_id = assignment.vgroup_id();
2360                let current = assignment.current_offset();
2361                let begin = assignment.begin();
2362                let end = assignment.end();
2363                log::info!(
2364                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
2365                    topic,
2366                    vgroup_id,
2367                    current,
2368                    begin,
2369                    end
2370                );
2371
2372                let committed = consumer.committed(topic, vgroup_id).await?;
2373                log::info!("committed: {:?}", committed);
2374
2375                let position = consumer.position(topic, vgroup_id).await?;
2376                log::info!("position: {:?}", position);
2377
2378                let res = consumer.offset_seek(topic, vgroup_id, end).await;
2379                if res.is_err() {
2380                    log::error!("seek offset error: {:?}", res);
2381                    let a = consumer.assignments().await.unwrap();
2382                    log::error!("assignments: {:?}", a);
2383                }
2384
2385                let committed = consumer.committed(topic, vgroup_id).await?;
2386                log::info!("after seek committed: {:?}", committed);
2387
2388                let position = consumer.position(topic, vgroup_id).await?;
2389                log::info!("after seek position: {:?}", position);
2390
2391                let res = consumer.commit_offset(topic, vgroup_id, end).await;
2392                if res.is_err() {
2393                    log::error!("commit offset response: {:?}", res);
2394                }
2395
2396                let committed = consumer.committed(topic, vgroup_id).await?;
2397                log::info!("after commit committed: {:?}", committed);
2398
2399                let position = consumer.position(topic, vgroup_id).await?;
2400                log::info!("after commit position: {:?}", position);
2401            }
2402
2403            let topic_assignment = consumer.topic_assignment(topic).await;
2404            log::info!("topic assignment: {:?}", topic_assignment);
2405        }
2406
2407        // after seek offset
2408        let assignments = consumer.assignments().await.unwrap();
2409        log::debug!("after seek offset assignments: {:?}", assignments);
2410
2411        consumer.unsubscribe().await;
2412
2413        tokio::time::sleep(Duration::from_secs(1)).await;
2414
2415        taos.exec_many([
2416            format!("drop database {db2}"),
2417            format!("drop topic {db}"),
2418            format!("drop database {db}"),
2419        ])
2420        .await?;
2421        Ok(())
2422    }
2423}
2424
2425#[cfg(feature = "deflate")]
2426#[cfg(test)]
2427mod tmq_deflate_tests {
2428
2429    use crate::{
2430        query::infra::{ToMessage, WsRecv, WsSend},
2431        *,
2432    };
2433    use futures::{SinkExt, StreamExt};
2434    use std::time::Duration;
2435    use tracing::*;
2436    use tracing_subscriber::util::SubscriberInitExt;
2437    use ws_tool::frame::OpCode;
2438
2439    #[cfg(feature = "deflate")]
2440    #[tokio::test]
2441    async fn test_build_stream_with_deflate() -> Result<(), anyhow::Error> {
2442        let _subscriber = tracing_subscriber::fmt::fmt()
2443            .with_max_level(Level::DEBUG)
2444            .with_file(true)
2445            .with_line_number(true)
2446            .finish();
2447        let _ = _subscriber.try_init();
2448
2449        let dsn = std::env::var("TEST_CLOUD_DSN").unwrap_or("http://localhost:6041".to_string());
2450
2451        let builder = TaosBuilder::from_dsn(dsn).unwrap();
2452        let url = builder.to_query_url();
2453        let ws = builder.ws_tool_build_stream(url).await.unwrap();
2454
2455        let (mut sink, mut source) = ws.split();
2456
2457        let version = WsSend::Version;
2458        source
2459            .send(OpCode::Text, &serde_json::to_vec(&version)?)
2460            .await?;
2461
2462        let _handle = tokio::spawn(async move {
2463            loop {
2464                let frame = sink.receive().await.unwrap();
2465                let (header, payload) = frame;
2466                trace!("header.code: {:?}, payload: {:?}", &header.code, &payload);
2467                let code = header.code;
2468
2469                match code {
2470                    OpCode::Binary => {
2471                        println!("{:?}", payload);
2472                    }
2473                    OpCode::Text => {
2474                        let recv: crate::query::infra::WsRecv =
2475                            serde_json::from_slice(&payload).unwrap();
2476                        info!("recv: {:?}", recv);
2477                        assert_eq!(recv.code, 0);
2478                    }
2479                    _ => (),
2480                }
2481            }
2482        });
2483
2484        tokio::time::sleep(Duration::from_millis(1000)).await;
2485
2486        Ok(())
2487    }
2488
2489    #[cfg(feature = "deflate")]
2490    #[tokio::test]
2491    async fn test_ws_tmq_deflate() -> taos_query::RawResult<()> {
2492        // pretty_env_logger::formatted_timed_builder()
2493        //     .filter_level(log::LevelFilter::Info)
2494        //     .init();
2495
2496        use taos_query::prelude::*;
2497
2498        let dsn = "tmq+ws://localhost:6041?".to_string();
2499        log::trace!("dsn: {}", dsn);
2500        let mut dsn = Dsn::from_str(&dsn)?;
2501
2502        let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
2503
2504        let db = "ws_tmq_deflate";
2505        let db2 = "ws_tmq_deflate_dest";
2506
2507        taos.exec(format!("drop topic if exists {db}")).await?;
2508        taos.exec(format!("drop database if exists {db}")).await?;
2509
2510        std::thread::sleep(std::time::Duration::from_secs(1));
2511
2512        taos.exec(format!(
2513            "create database if not exists {db} wal_retention_period 3600"
2514        ))
2515        .await?;
2516
2517        std::thread::sleep(std::time::Duration::from_secs(1));
2518
2519        taos.exec_many([
2520            format!("create topic {db} with meta as database {db}").as_str(),
2521            format!("use {db}").as_str(),
2522            // kind 1: create super table using all types
2523            "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2524            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2525            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2526            tags(t1 json)",
2527            // kind 2: create child table with json tag
2528            "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2529            "create table tb1 using stb1 tags(NULL)",
2530            "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2531            NULL, NULL, NULL, NULL, NULL,
2532            NULL, NULL, NULL, NULL)
2533            tb1 values(now, true, -2, -3, -4, -5, \
2534            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2535            254, 65534, 1, 1)",
2536            // kind 3: create super table with all types except json (especially for tags)
2537            "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2538            c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2539            c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2540            tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2541            t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2542            t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2543            // kind 4: create child table with all types except json
2544            "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2545            '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2546            254, 65534, 1, 1)",
2547            "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2548            NULL, NULL, NULL, NULL, NULL,
2549            NULL, NULL, NULL, NULL)",
2550            // kind 5: create common table
2551            "create table `table` (ts timestamp, v int)",
2552            // kind 6: column in super table
2553            "alter table stb1 add column new1 bool",
2554            "alter table stb1 add column new2 tinyint",
2555            "alter table stb1 add column new10 nchar(16)",
2556            "alter table stb1 modify column new10 nchar(32)",
2557            "alter table stb1 drop column new10",
2558            "alter table stb1 drop column new2",
2559            "alter table stb1 drop column new1",
2560            // kind 7: add tag in super table
2561            "alter table `stb2` add tag new1 bool",
2562            "alter table `stb2` rename tag new1 new1_new",
2563            "alter table `stb2` modify tag t10 nchar(32)",
2564            "alter table `stb2` drop tag new1_new",
2565            // kind 8: column in common table
2566            "alter table `table` add column new1 bool",
2567            "alter table `table` add column new2 tinyint",
2568            "alter table `table` add column new10 nchar(16)",
2569            "alter table `table` modify column new10 nchar(32)",
2570            "alter table `table` rename column new10 new10_new",
2571            "alter table `table` drop column new10_new",
2572            "alter table `table` drop column new2",
2573            "alter table `table` drop column new1",
2574            // kind 9: drop normal table
2575            "drop table `table`",
2576            // kind 10: drop child table
2577            "drop table `tb2`, `tb1`",
2578            // kind 11: drop super table
2579            "drop table `stb2`",
2580            "drop table `stb1`",
2581        ])
2582        .await?;
2583
2584        taos.exec_many([
2585            format!("drop database if exists {db2}"),
2586            format!("create database if not exists {db2} wal_retention_period 3600"),
2587            format!("use {db2}"),
2588        ])
2589        .await?;
2590
2591        dsn.params
2592            .insert("group.id".to_string(), "ws_tmq_deflate_1".to_string());
2593
2594        dsn.params
2595            .insert("auto.offset.reset".to_string(), "earliest".to_string());
2596        let builder = TmqBuilder::from_dsn(&dsn)?;
2597        // dbg!(&builder);
2598        let mut consumer = builder.build().await?;
2599
2600        let topics = consumer.list_topics().await?;
2601        log::info!("topics: {:?}", topics);
2602        consumer.subscribe([db]).await?;
2603        let topics = consumer.list_topics().await?;
2604        log::info!("topics: {:?}", topics);
2605
2606        {
2607            let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
2608
2609            while let Some((offset, message)) = stream.try_next().await? {
2610                let topic: &str = offset.topic();
2611                let database = offset.database();
2612                let vgroup_id = offset.vgroup_id();
2613                log::debug!(
2614                    "topic: {}, database: {}, vgroup_id: {}",
2615                    topic,
2616                    database,
2617                    vgroup_id
2618                );
2619
2620                match message {
2621                    MessageSet::Meta(meta) => {
2622                        log::debug!("Meta");
2623                        let raw = meta.as_raw_meta().await?;
2624                        taos.write_raw_meta(&raw).await?;
2625
2626                        let json = meta.as_json_meta().await?;
2627                        let sql = json.to_string();
2628                        if let Err(err) = taos.exec(sql).await {
2629                            log::trace!("maybe error: {}", err);
2630                        }
2631                    }
2632                    MessageSet::Data(data) => {
2633                        log::debug!("Data");
2634
2635                        while let Some(data) = data.fetch_raw_block().await? {
2636                            log::debug!("table_name: {:?}", data.table_name());
2637                            log::debug!("data: {:?}", data);
2638                        }
2639                    }
2640                    MessageSet::MetaData(meta, data) => {
2641                        log::debug!("MetaData");
2642                        let raw = meta.as_raw_meta().await?;
2643                        taos.write_raw_meta(&raw).await?;
2644
2645                        let json = meta.as_json_meta().await?;
2646                        let sql = json.to_string();
2647                        if let Err(err) = taos.exec(sql).await {
2648                            println!("maybe error: {}", err);
2649                        }
2650
2651                        while let Some(data) = data.fetch_raw_block().await? {
2652                            log::debug!("table_name: {:?}", data.table_name());
2653                            log::debug!("data: {:?}", data);
2654                        }
2655                    }
2656                }
2657                consumer.commit(offset).await?;
2658            }
2659        }
2660
2661        let assignments = consumer.assignments().await.unwrap();
2662        log::info!("assignments: {:?}", assignments);
2663
2664        // seek offset
2665        for topic_vec_assignment in assignments {
2666            let topic = &topic_vec_assignment.0;
2667            let vec_assignment = topic_vec_assignment.1;
2668            for assignment in vec_assignment {
2669                let vgroup_id = assignment.vgroup_id();
2670                let current = assignment.current_offset();
2671                let begin = assignment.begin();
2672                let end = assignment.end();
2673                log::info!(
2674                    "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
2675                    topic,
2676                    vgroup_id,
2677                    current,
2678                    begin,
2679                    end
2680                );
2681
2682                let committed = consumer.committed(topic, vgroup_id).await?;
2683                log::info!("committed: {:?}", committed);
2684
2685                let position = consumer.position(topic, vgroup_id).await?;
2686                log::info!("position: {:?}", position);
2687
2688                let res = consumer.offset_seek(topic, vgroup_id, end).await;
2689                if res.is_err() {
2690                    log::error!("seek offset error: {:?}", res);
2691                    let a = consumer.assignments().await.unwrap();
2692                    log::error!("assignments: {:?}", a);
2693                }
2694
2695                let committed = consumer.committed(topic, vgroup_id).await?;
2696                log::info!("after seek committed: {:?}", committed);
2697
2698                let position = consumer.position(topic, vgroup_id).await?;
2699                log::info!("after seek position: {:?}", position);
2700
2701                let res = consumer.commit_offset(topic, vgroup_id, end).await;
2702                if res.is_err() {
2703                    log::error!("commit offset response: {:?}", res);
2704                }
2705
2706                let committed = consumer.committed(topic, vgroup_id).await?;
2707                log::info!("after commit committed: {:?}", committed);
2708
2709                let position = consumer.position(topic, vgroup_id).await?;
2710                log::info!("after commit position: {:?}", position);
2711            }
2712
2713            let topic_assignment = consumer.topic_assignment(topic).await;
2714            log::info!("topic assignment: {:?}", topic_assignment);
2715        }
2716
2717        // after seek offset
2718        let assignments = consumer.assignments().await.unwrap();
2719        log::debug!("after seek offset assignments: {:?}", assignments);
2720
2721        consumer.unsubscribe().await;
2722
2723        tokio::time::sleep(Duration::from_secs(1)).await;
2724
2725        taos.exec_many([
2726            format!("drop database {db2}"),
2727            format!("drop topic {db}"),
2728            format!("drop database {db}"),
2729        ])
2730        .await?;
2731        Ok(())
2732    }
2733}