1use taos_query::{
2 prelude::{AsAsyncConsumer, RawMeta, Timeout},
3 tmq::{Assignment, VGroupId},
4 RawBlock, RawResult,
5};
6
7#[derive(Debug)]
8enum TmqBuilderInner {
9 Native(crate::sys::TmqBuilder),
10 Ws(taos_ws::consumer::TmqBuilder),
11}
12
13#[derive(Debug)]
14enum ConsumerInner {
15 Native(crate::sys::Consumer),
16 Ws(taos_ws::consumer::Consumer),
17}
18
19#[derive(Debug)]
20enum OffsetInner {
21 Native(crate::sys::tmq::Offset),
22 Ws(taos_ws::consumer::Offset),
23}
24
25#[derive(Debug)]
26enum MetaInner {
27 Native(crate::sys::tmq::Meta),
28 Ws(taos_ws::consumer::Meta),
29}
30
31#[derive(Debug)]
32enum DataInner {
33 Native(crate::sys::tmq::Data),
34 Ws(taos_ws::consumer::Data),
35}
36
37#[derive(Debug)]
38pub struct Offset(OffsetInner);
39
40#[derive(Debug)]
41pub struct Meta(MetaInner);
42
43#[derive(Debug)]
44pub struct Data(DataInner);
45
46pub type MessageSet<Meta, Data> = taos_query::tmq::MessageSet<Meta, Data>;
47
48#[derive(Debug)]
49pub struct TmqBuilder(TmqBuilderInner);
50
51#[derive(Debug)]
52pub struct Consumer(ConsumerInner);
53
54impl taos_query::TBuilder for TmqBuilder {
55 type Target = Consumer;
56
57 fn available_params() -> &'static [&'static str] {
58 &[]
59 }
60
61 fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
62 let dsn = dsn.into_dsn()?;
63 match (dsn.driver.as_str(), dsn.protocol.as_deref()) {
65 ("ws" | "wss" | "http" | "https" | "taosws", _) => Ok(Self(TmqBuilderInner::Ws(
66 taos_ws::consumer::TmqBuilder::from_dsn(dsn)?,
67 ))),
68 ("taos" | "tmq", None) => Ok(Self(TmqBuilderInner::Native(
69 crate::sys::TmqBuilder::from_dsn(dsn)?,
70 ))),
71 ("taos" | "tmq", Some("ws" | "wss" | "http" | "https")) => Ok(Self(
72 TmqBuilderInner::Ws(taos_ws::consumer::TmqBuilder::from_dsn(dsn)?),
73 )),
74 (driver, _) => Err(taos_query::DsnError::InvalidDriver(driver.to_string()).into()),
75 }
76 }
77
78 fn client_version() -> &'static str {
79 ""
80 }
81
82 fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
83 match &self.0 {
84 TmqBuilderInner::Native(b) => match &mut conn.0 {
85 ConsumerInner::Native(taos) => Ok(b.ping(taos)?),
86 _ => unreachable!(),
87 },
88 TmqBuilderInner::Ws(b) => match &mut conn.0 {
89 ConsumerInner::Ws(taos) => Ok(b.ping(taos)?),
90 _ => unreachable!(),
91 },
92 }
93 }
94
95 fn ready(&self) -> bool {
96 match &self.0 {
97 TmqBuilderInner::Native(b) => b.ready(),
98 TmqBuilderInner::Ws(b) => b.ready(),
99 }
100 }
101
102 fn build(&self) -> RawResult<Self::Target> {
103 match &self.0 {
104 TmqBuilderInner::Native(b) => Ok(Consumer(ConsumerInner::Native(b.build()?))),
105 TmqBuilderInner::Ws(b) => Ok(Consumer(ConsumerInner::Ws(b.build()?))),
106 }
107 }
108
109 fn server_version(&self) -> RawResult<&str> {
110 todo!()
111 }
112
113 fn is_enterprise_edition(&self) -> RawResult<bool> {
114 todo!()
115 }
116
117 fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
118 match &self.0 {
119 TmqBuilderInner::Native(b) => Ok(b.get_edition()?),
120 TmqBuilderInner::Ws(b) => Ok(b.get_edition()?),
121 }
122 }
123}
124
125#[async_trait::async_trait]
126impl taos_query::AsyncTBuilder for TmqBuilder {
127 type Target = Consumer;
128
129 fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
130 let dsn = dsn.into_dsn()?;
131 match (dsn.driver.as_str(), dsn.protocol.as_deref()) {
133 ("ws" | "wss" | "http" | "https" | "taosws", _) => Ok(Self(TmqBuilderInner::Ws(
134 taos_ws::consumer::TmqBuilder::from_dsn(dsn)?,
135 ))),
136 ("taos" | "tmq", None) => Ok(Self(TmqBuilderInner::Native(
137 crate::sys::TmqBuilder::from_dsn(dsn)?,
138 ))),
139 ("taos" | "tmq", Some("ws" | "wss" | "http" | "https")) => Ok(Self(
140 TmqBuilderInner::Ws(taos_ws::consumer::TmqBuilder::from_dsn(dsn)?),
141 )),
142 (driver, _) => Err(taos_query::DsnError::InvalidDriver(driver.to_string()).into()),
143 }
144 }
145
146 fn client_version() -> &'static str {
147 ""
148 }
149
150 async fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
151 match &self.0 {
152 TmqBuilderInner::Native(b) => match &mut conn.0 {
153 ConsumerInner::Native(taos) => Ok(b.ping(taos).await?),
154 _ => unreachable!(),
155 },
156 TmqBuilderInner::Ws(b) => match &mut conn.0 {
157 ConsumerInner::Ws(taos) => Ok(b.ping(taos).await?),
158 _ => unreachable!(),
159 },
160 }
161 }
162
163 async fn ready(&self) -> bool {
164 match &self.0 {
165 TmqBuilderInner::Native(b) => b.ready().await,
166 TmqBuilderInner::Ws(b) => b.ready().await,
167 }
168 }
169
170 async fn build(&self) -> RawResult<Self::Target> {
171 match &self.0 {
172 TmqBuilderInner::Native(b) => Ok(Consumer(ConsumerInner::Native(b.build().await?))),
173 TmqBuilderInner::Ws(b) => Ok(Consumer(ConsumerInner::Ws(b.build().await?))),
174 }
175 }
176
177 async fn server_version(&self) -> RawResult<&str> {
178 match &self.0 {
179 TmqBuilderInner::Native(b) => Ok(b.server_version().await?),
180 TmqBuilderInner::Ws(b) => Ok(b.server_version().await?),
181 }
182 }
183
184 async fn is_enterprise_edition(&self) -> RawResult<bool> {
185 match &self.0 {
186 TmqBuilderInner::Native(b) => Ok(b.is_enterprise_edition().await?),
187 TmqBuilderInner::Ws(b) => Ok(b.is_enterprise_edition().await?),
188 }
189 }
190
191 async fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
192 match &self.0 {
193 TmqBuilderInner::Native(b) => Ok(b.get_edition().await?),
194 TmqBuilderInner::Ws(b) => Ok(b.get_edition().await?),
195 }
196 }
197}
198
199impl taos_query::tmq::IsOffset for Offset {
200 fn database(&self) -> &str {
201 match &self.0 {
202 OffsetInner::Native(offset) => {
203 <crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::database(offset)
204 }
205 OffsetInner::Ws(offset) => {
206 <taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::database(offset)
207 }
208 }
209 }
210
211 fn topic(&self) -> &str {
212 match &self.0 {
213 OffsetInner::Native(offset) => {
214 <crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::topic(offset)
215 }
216 OffsetInner::Ws(offset) => {
217 <taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::topic(offset)
218 }
219 }
220 }
221
222 fn vgroup_id(&self) -> taos_query::tmq::VGroupId {
223 match &self.0 {
224 OffsetInner::Native(offset) => {
225 <crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::vgroup_id(offset)
226 }
227 OffsetInner::Ws(offset) => {
228 <taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::vgroup_id(offset)
229 }
230 }
231 }
232}
233
234#[async_trait::async_trait]
235impl taos_query::tmq::IsAsyncMeta for Meta {
236 async fn as_raw_meta(&self) -> RawResult<RawMeta> {
237 match &self.0 {
238 MetaInner::Native(data) => {
239 <crate::sys::tmq::Meta as taos_query::tmq::IsAsyncMeta>::as_raw_meta(data)
240 .await
241 .map_err(Into::into)
242 }
243 MetaInner::Ws(data) => {
244 <taos_ws::consumer::Meta as taos_query::tmq::IsAsyncMeta>::as_raw_meta(data)
245 .await
246 .map_err(Into::into)
247 }
248 }
249 }
250
251 async fn as_json_meta(&self) -> RawResult<taos_query::common::JsonMeta> {
252 match &self.0 {
253 MetaInner::Native(data) => {
254 <crate::sys::tmq::Meta as taos_query::tmq::IsAsyncMeta>::as_json_meta(data)
255 .await
256 .map_err(Into::into)
257 }
258 MetaInner::Ws(data) => {
259 <taos_ws::consumer::Meta as taos_query::tmq::IsAsyncMeta>::as_json_meta(data)
260 .await
261 .map_err(Into::into)
262 }
263 }
264 }
265}
266
267#[async_trait::async_trait]
268impl taos_query::tmq::IsAsyncData for Data {
269 async fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
270 match &self.0 {
271 DataInner::Native(data) => {
272 <crate::sys::tmq::Data as taos_query::tmq::IsAsyncData>::as_raw_data(data)
273 .await
274 .map_err(Into::into)
275 }
276 DataInner::Ws(data) => {
277 <taos_ws::consumer::Data as taos_query::tmq::IsAsyncData>::as_raw_data(data)
278 .await
279 .map_err(Into::into)
280 }
281 }
282 }
283
284 async fn fetch_raw_block(&self) -> RawResult<Option<taos_query::RawBlock>> {
285 match &self.0 {
286 DataInner::Native(data) => {
287 <crate::sys::tmq::Data as taos_query::tmq::IsAsyncData>::fetch_raw_block(data)
288 .await
289 .map_err(Into::into)
290 }
291 DataInner::Ws(data) => {
292 <taos_ws::consumer::Data as taos_query::tmq::IsAsyncData>::fetch_raw_block(data)
293 .await
294 .map_err(Into::into)
295 }
296 }
297 }
298}
299
300#[async_trait::async_trait]
301impl AsAsyncConsumer for Consumer {
302 type Offset = Offset;
303
304 type Meta = Meta;
305
306 type Data = Data;
307
308 fn default_timeout(&self) -> Timeout {
309 match &self.0 {
310 ConsumerInner::Native(c) => {
311 <crate::sys::Consumer as AsAsyncConsumer>::default_timeout(c)
312 }
313 ConsumerInner::Ws(c) => {
314 <taos_ws::consumer::Consumer as AsAsyncConsumer>::default_timeout(c)
315 }
316 }
317 }
318
319 async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
320 &mut self,
321 topics: I,
322 ) -> RawResult<()> {
323 match &mut self.0 {
324 ConsumerInner::Native(c) => {
325 <crate::sys::Consumer as AsAsyncConsumer>::subscribe(c, topics)
326 .await
327 .map_err(Into::into)
328 }
329 ConsumerInner::Ws(c) => {
330 <taos_ws::consumer::Consumer as AsAsyncConsumer>::subscribe(c, topics)
331 .await
332 .map_err(Into::into)
333 }
334 }
335 }
336
337 async fn unsubscribe(self) {
338 match self.0 {
339 ConsumerInner::Native(c) => {
340 <crate::sys::Consumer as AsAsyncConsumer>::unsubscribe(c).await
341 }
342 ConsumerInner::Ws(c) => {
343 <taos_ws::consumer::Consumer as AsAsyncConsumer>::unsubscribe(c).await
344 }
345 }
346 }
347
348 async fn recv_timeout(
349 &self,
350 timeout: Timeout,
351 ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
352 match &self.0 {
353 ConsumerInner::Native(c) => {
354 <crate::sys::Consumer as AsAsyncConsumer>::recv_timeout(c, timeout)
355 .await
356 .map_err(Into::into)
357 .map(|msg| {
358 msg.map(|(offset, msg)| {
359 (
360 Offset(OffsetInner::Native(offset)),
361 match msg {
362 MessageSet::Meta(meta) => {
363 MessageSet::Meta(Meta(MetaInner::Native(meta)))
364 }
365 MessageSet::Data(data) => {
366 MessageSet::Data(Data(DataInner::Native(data)))
367 }
368 MessageSet::MetaData(meta, data) => MessageSet::MetaData(
369 Meta(MetaInner::Native(meta)),
370 Data(DataInner::Native(data)),
371 ),
372 },
373 )
374 })
375 })
376 }
377 ConsumerInner::Ws(c) => {
378 <taos_ws::consumer::Consumer as AsAsyncConsumer>::recv_timeout(c, timeout)
379 .await
380 .map_err(Into::into)
381 .map(|msg| {
382 msg.map(|(offset, msg)| {
383 (
384 Offset(OffsetInner::Ws(offset)),
385 match msg {
386 taos_query::tmq::MessageSet::Meta(meta) => {
387 MessageSet::Meta(Meta(MetaInner::Ws(meta)))
388 }
389 taos_query::tmq::MessageSet::Data(data) => {
390 MessageSet::Data(Data(DataInner::Ws(data)))
391 }
392 taos_query::tmq::MessageSet::MetaData(meta, data) => {
393 MessageSet::MetaData(
394 Meta(MetaInner::Ws(meta)),
395 Data(DataInner::Ws(data)),
396 )
397 }
398 },
399 )
400 })
401 })
402 }
403 }
404 }
405
406 async fn commit(&self, offset: Self::Offset) -> RawResult<()> {
407 match &self.0 {
408 ConsumerInner::Native(c) => match offset.0 {
409 OffsetInner::Native(offset) => {
410 <crate::sys::Consumer as AsAsyncConsumer>::commit(c, offset)
411 .await
412 .map_err(Into::into)
413 }
414 OffsetInner::Ws(_) => unreachable!(),
415 },
416 ConsumerInner::Ws(c) => match offset.0 {
417 OffsetInner::Ws(offset) => {
418 <taos_ws::consumer::Consumer as AsAsyncConsumer>::commit(c, offset)
419 .await
420 .map_err(Into::into)
421 }
422 _ => unreachable!(),
423 },
424 }
425 }
426
427 async fn commit_offset(&self, topic: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
428 match &self.0 {
429 ConsumerInner::Native(c) => <crate::sys::Consumer as AsAsyncConsumer>::commit_offset(
430 c, topic, vgroup_id, offset,
431 )
432 .await
433 .map_err(Into::into),
434 ConsumerInner::Ws(c) => {
435 <taos_ws::consumer::Consumer as AsAsyncConsumer>::commit_offset(
436 c, topic, vgroup_id, offset,
437 )
438 .await
439 .map_err(Into::into)
440 }
441 }
442 }
443
444 async fn list_topics(&self) -> RawResult<Vec<String>> {
445 match &self.0 {
446 ConsumerInner::Native(c) => <crate::sys::Consumer as AsAsyncConsumer>::list_topics(c)
447 .await
448 .map_err(Into::into),
449 ConsumerInner::Ws(c) => {
450 <taos_ws::consumer::Consumer as AsAsyncConsumer>::list_topics(c)
451 .await
452 .map_err(Into::into)
453 }
454 }
455 }
456
457 async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
458 match &self.0 {
459 ConsumerInner::Native(c) => {
460 <crate::sys::Consumer as AsAsyncConsumer>::assignments(c).await
461 }
462 ConsumerInner::Ws(c) => {
463 <taos_ws::consumer::Consumer as AsAsyncConsumer>::assignments(c).await
464 }
465 }
466 }
467
468 async fn topic_assignment(&self, topic: &str) -> Vec<Assignment> {
469 match &self.0 {
470 ConsumerInner::Native(c) => {
471 <crate::sys::Consumer as AsAsyncConsumer>::topic_assignment(c, topic).await
472 }
473 ConsumerInner::Ws(c) => {
474 <taos_ws::consumer::Consumer as AsAsyncConsumer>::topic_assignment(c, topic).await
475 }
476 }
477 }
478
479 async fn offset_seek(
480 &mut self,
481 topic: &str,
482 vgroup_id: VGroupId,
483 offset: i64,
484 ) -> RawResult<()> {
485 match &mut self.0 {
486 ConsumerInner::Native(c) => {
487 <crate::sys::Consumer as AsAsyncConsumer>::offset_seek(c, topic, vgroup_id, offset)
488 .await
489 .map_err(Into::into)
490 }
491 ConsumerInner::Ws(c) => <taos_ws::consumer::Consumer as AsAsyncConsumer>::offset_seek(
492 c, topic, vgroup_id, offset,
493 )
494 .await
495 .map_err(Into::into),
496 }
497 }
498
499 async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
500 match &self.0 {
501 ConsumerInner::Native(c) => {
502 <crate::sys::Consumer as AsAsyncConsumer>::committed(c, topic, vgroup_id)
503 .await
504 .map_err(Into::into)
505 }
506 ConsumerInner::Ws(c) => {
507 <taos_ws::consumer::Consumer as AsAsyncConsumer>::committed(c, topic, vgroup_id)
508 .await
509 .map_err(Into::into)
510 }
511 }
512 }
513
514 async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
515 match &self.0 {
516 ConsumerInner::Native(c) => {
517 <crate::sys::Consumer as AsAsyncConsumer>::position(c, topic, vgroup_id)
518 .await
519 .map_err(Into::into)
520 }
521 ConsumerInner::Ws(c) => {
522 <taos_ws::consumer::Consumer as AsAsyncConsumer>::position(c, topic, vgroup_id)
523 .await
524 .map_err(Into::into)
525 }
526 }
527 }
528}
529
530impl taos_query::tmq::SyncOnAsync for Consumer {}
531impl taos_query::tmq::SyncOnAsync for Data {}
532impl taos_query::tmq::SyncOnAsync for Meta {}
533
534impl Iterator for Data {
535 type Item = RawResult<RawBlock>;
536
537 fn next(&mut self) -> Option<Self::Item> {
538 match &self.0 {
539 DataInner::Native(data) => {
540 <crate::sys::tmq::Data as taos_query::tmq::IsData>::fetch_raw_block(data)
541 .transpose()
542 }
543 DataInner::Ws(data) => {
544 <taos_ws::consumer::Data as taos_query::tmq::IsData>::fetch_raw_block(data)
545 .transpose()
546 }
547 }
548 }
549}
550#[cfg(test)]
552mod tests {
553
554 use super::TmqBuilder;
555
556 #[test]
557 fn builder() -> taos_query::RawResult<()> {
558 use taos_query::prelude::*;
559 let mut dsn: Dsn = "taos://".parse()?;
560 dsn.set("group.id", "group1");
561 dsn.set("client.id", "test");
562 dsn.set("auto.offset.reset", "earliest");
563
564 let _tmq = TmqBuilder::from_dsn(dsn)?;
565 Ok(())
566 }
567}
568
569#[cfg(test)]
570mod async_tests {
571 use std::{str::FromStr, time::Duration};
572
573 use super::TmqBuilder;
574 use crate::TaosBuilder;
575
576 #[tokio::test]
577 async fn test_ws_tmq_meta() -> taos_query::RawResult<()> {
578 use taos_query::prelude::*;
582 let dsn = std::env::var("TEST_DSN").unwrap_or("taos+ws://localhost:6041".to_string());
583 let mut dsn = Dsn::from_str(&dsn)?;
584
585 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
586
587 let db = "ws_abc1";
588
589 taos.exec(format!("drop topic if exists {db}")).await?;
590 taos.exec(format!("drop database if exists {db}")).await?;
591
592 std::thread::sleep(std::time::Duration::from_secs(3));
593
594 taos.exec(format!(
595 "create database if not exists {db} wal_retention_period 3600"
596 ))
597 .await?;
598
599 std::thread::sleep(std::time::Duration::from_secs(3));
600
601 taos.exec_many([
602 "create topic ws_abc1 with meta as database ws_abc1",
603 "use ws_abc1",
604 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
606 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
607 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
608 tags(t1 json)",
609 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
611 "create table tb1 using stb1 tags(NULL)",
612 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
613 NULL, NULL, NULL, NULL, NULL,
614 NULL, NULL, NULL, NULL)
615 tb1 values(now, true, -2, -3, -4, -5, \
616 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
617 254, 65534, 1, 1)",
618 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
620 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
621 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
622 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
623 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
624 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
625 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
627 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
628 254, 65534, 1, 1)",
629 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
630 NULL, NULL, NULL, NULL, NULL,
631 NULL, NULL, NULL, NULL)",
632 "create table `table` (ts timestamp, v int)",
634 "alter table stb1 add column new1 bool",
636 "alter table stb1 add column new2 tinyint",
637 "alter table stb1 add column new10 nchar(16)",
638 "alter table stb1 modify column new10 nchar(32)",
639 "alter table stb1 drop column new10",
640 "alter table stb1 drop column new2",
641 "alter table stb1 drop column new1",
642 "alter table `stb2` add tag new1 bool",
644 "alter table `stb2` rename tag new1 new1_new",
645 "alter table `stb2` modify tag t10 nchar(32)",
646 "alter table `stb2` drop tag new1_new",
647 "alter table `table` add column new1 bool",
649 "alter table `table` add column new2 tinyint",
650 "alter table `table` add column new10 nchar(16)",
651 "alter table `table` modify column new10 nchar(32)",
652 "alter table `table` rename column new10 new10_new",
653 "alter table `table` drop column new10_new",
654 "alter table `table` drop column new2",
655 "alter table `table` drop column new1",
656 "drop table `table`",
658 "drop table `tb2`, `tb1`",
660 "drop table `stb2`",
662 "drop table `stb1`",
663 ])
664 .await?;
665
666 taos.exec_many([
667 "drop database if exists db2",
668 "create database if not exists db2 wal_retention_period 3600",
669 "use db2",
670 ])
671 .await?;
672
673 dsn.params.insert("group.id".to_string(), "abc".to_string());
674
675 dsn.params
676 .insert("auto.offset.reset".to_string(), "earliest".to_string());
677
678 let builder = TmqBuilder::from_dsn(&dsn)?;
679 let mut consumer = builder.build().await?;
680 consumer.subscribe(["ws_abc1"]).await?;
681
682 {
683 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
684
685 while let Some((offset, message)) = stream.try_next().await? {
686 let _ = offset.topic();
689 let _ = offset.database();
690 let _ = offset.vgroup_id();
691
692 match message {
697 MessageSet::Meta(meta) => {
698 let raw = meta.as_raw_meta().await?;
699 taos.write_raw_meta(&raw).await?;
700
701 let json = meta.as_json_meta().await?;
703 let sql = json.iter().next().unwrap().to_string();
704 if let Err(err) = taos.exec(sql).await {
705 println!("maybe error: {}", err);
706 }
707 }
708 MessageSet::Data(data) => {
709 while let Some(data) = data.fetch_raw_block().await? {
711 dbg!(data.table_name());
712 dbg!(data);
713 }
714 }
715 MessageSet::MetaData(meta, data) => {
716 let raw = meta.as_raw_meta().await?;
717 taos.write_raw_meta(&raw).await?;
718
719 let json = meta.as_json_meta().await?;
721 let sql = json.iter().next().unwrap().to_string();
722 if let Err(err) = taos.exec(sql).await {
723 println!("maybe error: {}", err);
724 }
725 while let Some(data) = data.fetch_raw_block().await? {
727 dbg!(data.table_name());
728 dbg!(data);
729 }
730 }
731 }
732 consumer.commit(offset).await?;
733 }
734 }
735 consumer.unsubscribe().await;
736
737 tokio::time::sleep(Duration::from_secs(2)).await;
738
739 taos.exec_many([
740 "drop database db2",
741 "drop topic ws_abc1",
742 "drop database ws_abc1",
743 ])
744 .await?;
745 Ok(())
746 }
747
748 #[tokio::test]
749 #[ignore]
750 async fn test_tmq() -> taos_query::RawResult<()> {
751 use taos_query::prelude::*;
756 let dsn = "taos://localhost:6030".to_string();
758 log::info!("dsn: {}", dsn);
759 let mut dsn = Dsn::from_str(&dsn)?;
760
761 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
762 taos.exec_many([
763 "drop topic if exists ws_abc1",
764 "drop database if exists ws_abc1",
765 "create database ws_abc1 wal_retention_period 3600",
766 "create topic ws_abc1 with meta as database ws_abc1",
767 "use ws_abc1",
768 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
770 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
771 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
772 tags(t1 json)",
773 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
775 "create table tb1 using stb1 tags(NULL)",
776 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
777 NULL, NULL, NULL, NULL, NULL,
778 NULL, NULL, NULL, NULL)
779 tb1 values(now, true, -2, -3, -4, -5, \
780 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
781 254, 65534, 1, 1)",
782 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
784 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
785 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
786 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
787 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
788 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
789 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
791 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
792 254, 65534, 1, 1)",
793 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
794 NULL, NULL, NULL, NULL, NULL,
795 NULL, NULL, NULL, NULL)",
796 "create table `table` (ts timestamp, v int)",
798 "alter table stb1 add column new1 bool",
800 "alter table stb1 add column new2 tinyint",
801 "alter table stb1 add column new10 nchar(16)",
802 "alter table stb1 modify column new10 nchar(32)",
803 "alter table stb1 drop column new10",
804 "alter table stb1 drop column new2",
805 "alter table stb1 drop column new1",
806 "alter table `stb2` add tag new1 bool",
808 "alter table `stb2` rename tag new1 new1_new",
809 "alter table `stb2` modify tag t10 nchar(32)",
810 "alter table `stb2` drop tag new1_new",
811 "alter table `table` add column new1 bool",
813 "alter table `table` add column new2 tinyint",
814 "alter table `table` add column new10 nchar(16)",
815 "alter table `table` modify column new10 nchar(32)",
816 "alter table `table` rename column new10 new10_new",
817 "alter table `table` drop column new10_new",
818 "alter table `table` drop column new2",
819 "alter table `table` drop column new1",
820 ])
828 .await?;
829
830 taos.exec_many([
831 "drop database if exists db2",
832 "create database if not exists db2 wal_retention_period 3600",
833 "use db2",
834 ])
835 .await?;
836
837 dsn.params.insert("group.id".to_string(), "abc".to_string());
838
839 dsn.params
840 .insert("auto.offset.reset".to_string(), "earliest".to_string());
841
842 let builder = TmqBuilder::from_dsn(&dsn)?;
843 let mut consumer = builder.build().await?;
844 consumer.subscribe(["ws_abc1"]).await?;
845
846 {
847 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
848
849 while let Some((offset, message)) = stream.try_next().await? {
850 let topic: &str = offset.topic();
853 let database = offset.database();
854 let vgroup_id = offset.vgroup_id();
855 log::debug!(
856 "topic: {}, database: {}, vgroup_id: {}",
857 topic,
858 database,
859 vgroup_id
860 );
861
862 match message {
867 MessageSet::Meta(meta) => {
868 log::debug!("Meta");
869 let raw = meta.as_raw_meta().await?;
870 taos.write_raw_meta(&raw).await?;
871
872 let json = meta.as_json_meta().await?;
874 let sql = json.iter().next().unwrap().to_string();
875 if let Err(err) = taos.exec(sql).await {
876 println!("maybe error: {}", err);
877 }
878 }
879 MessageSet::Data(data) => {
880 log::debug!("Data");
881 while let Some(data) = data.fetch_raw_block().await? {
883 log::debug!("table_name: {:?}", data.table_name());
884 log::debug!("data: {:?}", data);
885 }
886 }
887 MessageSet::MetaData(meta, data) => {
888 log::debug!("MetaData");
889 let raw = meta.as_raw_meta().await?;
890 taos.write_raw_meta(&raw).await?;
891
892 let json = meta.as_json_meta().await?;
894 let sql = json.iter().next().unwrap().to_string();
895 if let Err(err) = taos.exec(sql).await {
896 println!("maybe error: {}", err);
897 }
898 while let Some(data) = data.fetch_raw_block().await? {
900 log::debug!("table_name: {:?}", data.table_name());
901 log::debug!("data: {:?}", data);
902 }
903 }
904 }
905 consumer.commit(offset).await?;
906 }
907 }
908
909 let assignments = consumer.assignments().await.unwrap();
910 log::debug!("assignments: {:?}", assignments);
911
912 for topic_vec_assignment in assignments {
914 let topic = &topic_vec_assignment.0;
915 let vec_assignment = topic_vec_assignment.1;
916 for assignment in vec_assignment {
917 let vgroup_id = assignment.vgroup_id();
918 let current = assignment.current_offset();
919 let begin = assignment.begin();
920 let end = assignment.end();
921 log::debug!(
922 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
923 topic,
924 vgroup_id,
925 current,
926 begin,
927 end
928 );
929 let res = consumer.offset_seek(topic, vgroup_id, end).await;
930 if res.is_err() {
931 log::error!("seek offset error: {:?}", res);
932 let a = consumer.assignments().await.unwrap();
933 log::error!("assignments: {:?}", a);
934 }
936 }
937
938 let topic_assignment = consumer.topic_assignment(topic).await;
939 log::debug!("topic assignment: {:?}", topic_assignment);
940 }
941
942 let assignments = consumer.assignments().await.unwrap();
944 log::debug!("after seek offset assignments: {:?}", assignments);
945
946 consumer.unsubscribe().await;
947
948 tokio::time::sleep(Duration::from_secs(1)).await;
949
950 taos.exec_many([
951 "drop database db2",
952 "drop topic ws_abc1",
953 "drop database ws_abc1",
954 ])
955 .await?;
956 Ok(())
957 }
958
959 #[tokio::test]
960 #[ignore]
961 async fn test_tmq_offset() -> taos_query::RawResult<()> {
962 use taos_query::prelude::*;
967 let dsn = "tmq://localhost:6030?offset=10:20,11:40".to_string();
969 log::info!("dsn: {}", dsn);
970 let mut dsn = Dsn::from_str(&dsn)?;
971 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
974 taos.exec_many([
975 "drop topic if exists ws_abc1",
976 "drop database if exists ws_abc1",
977 "create database ws_abc1 wal_retention_period 3600",
978 "create topic ws_abc1 with meta as database ws_abc1",
979 "use ws_abc1",
980 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
982 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
983 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
984 tags(t1 json)",
985 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
987 "create table tb1 using stb1 tags(NULL)",
988 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
989 NULL, NULL, NULL, NULL, NULL,
990 NULL, NULL, NULL, NULL)
991 tb1 values(now, true, -2, -3, -4, -5, \
992 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
993 254, 65534, 1, 1)",
994 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
996 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
997 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
998 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
999 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1000 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1001 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1003 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1004 254, 65534, 1, 1)",
1005 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1006 NULL, NULL, NULL, NULL, NULL,
1007 NULL, NULL, NULL, NULL)",
1008 "create table `table` (ts timestamp, v int)",
1010 "alter table stb1 add column new1 bool",
1012 "alter table stb1 add column new2 tinyint",
1013 "alter table stb1 add column new10 nchar(16)",
1014 "alter table stb1 modify column new10 nchar(32)",
1015 "alter table stb1 drop column new10",
1016 "alter table stb1 drop column new2",
1017 "alter table stb1 drop column new1",
1018 "alter table `stb2` add tag new1 bool",
1020 "alter table `stb2` rename tag new1 new1_new",
1021 "alter table `stb2` modify tag t10 nchar(32)",
1022 "alter table `stb2` drop tag new1_new",
1023 "alter table `table` add column new1 bool",
1025 "alter table `table` add column new2 tinyint",
1026 "alter table `table` add column new10 nchar(16)",
1027 "alter table `table` modify column new10 nchar(32)",
1028 "alter table `table` rename column new10 new10_new",
1029 "alter table `table` drop column new10_new",
1030 "alter table `table` drop column new2",
1031 "alter table `table` drop column new1",
1032 ])
1040 .await?;
1041
1042 taos.exec_many([
1043 "drop database if exists db2",
1044 "create database if not exists db2 wal_retention_period 3600",
1045 "use db2",
1046 ])
1047 .await?;
1048
1049 dsn.params.insert("group.id".to_string(), "abc".to_string());
1050
1051 dsn.params
1052 .insert("auto.offset.reset".to_string(), "earliest".to_string());
1053
1054 let builder = TmqBuilder::from_dsn(&dsn)?;
1055 let mut consumer = builder.build().await?;
1057
1058 consumer.subscribe(["ws_abc1"]).await?;
1059
1060 {
1061 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1062
1063 while let Some((offset, message)) = stream.try_next().await? {
1064 let topic: &str = offset.topic();
1067 let database = offset.database();
1068 let vgroup_id = offset.vgroup_id();
1069 log::debug!(
1070 "topic: {}, database: {}, vgroup_id: {}",
1071 topic,
1072 database,
1073 vgroup_id
1074 );
1075
1076 match message {
1081 MessageSet::Meta(meta) => {
1082 log::debug!("Meta");
1083 let raw = meta.as_raw_meta().await?;
1084 taos.write_raw_meta(&raw).await?;
1085
1086 let json = meta.as_json_meta().await?;
1088 let sql = json.iter().next().unwrap().to_string();
1089 if let Err(err) = taos.exec(sql).await {
1090 println!("maybe error: {}", err);
1091 }
1092 }
1093 MessageSet::Data(data) => {
1094 log::debug!("Data");
1095 while let Some(data) = data.fetch_raw_block().await? {
1097 log::debug!("table_name: {:?}", data.table_name());
1098 log::debug!("data: {:?}", data);
1099 }
1100 }
1101 MessageSet::MetaData(meta, data) => {
1102 log::debug!("MetaData");
1103 let raw = meta.as_raw_meta().await?;
1104 taos.write_raw_meta(&raw).await?;
1105
1106 let json = meta.as_json_meta().await?;
1108 let sql = json.iter().next().unwrap().to_string();
1109 if let Err(err) = taos.exec(sql).await {
1110 println!("maybe error: {}", err);
1111 }
1112 while let Some(data) = data.fetch_raw_block().await? {
1114 log::debug!("table_name: {:?}", data.table_name());
1115 log::debug!("data: {:?}", data);
1116 }
1117 }
1118 }
1119 consumer.commit(offset).await?;
1120 }
1121 }
1122
1123 let assignments = consumer.assignments().await.unwrap();
1124 log::debug!("assignments: {:?}", assignments);
1125
1126 for topic_vec_assignment in assignments {
1128 let topic = &topic_vec_assignment.0;
1129 let vec_assignment = topic_vec_assignment.1;
1130 for assignment in vec_assignment {
1131 let vgroup_id = assignment.vgroup_id();
1132 let current = assignment.current_offset();
1133 let begin = assignment.begin();
1134 let end = assignment.end();
1135 log::debug!(
1136 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1137 topic,
1138 vgroup_id,
1139 current,
1140 begin,
1141 end
1142 );
1143 let res = consumer.offset_seek(topic, vgroup_id, end).await;
1144 if res.is_err() {
1145 log::error!("seek offset error: {:?}", res);
1146 let a = consumer.assignments().await.unwrap();
1147 log::error!("assignments: {:?}", a);
1148 }
1150 }
1151
1152 let topic_assignment = consumer.topic_assignment(topic).await;
1153 log::debug!("topic assignment: {:?}", topic_assignment);
1154 }
1155
1156 let assignments = consumer.assignments().await.unwrap();
1158 log::debug!("after seek offset assignments: {:?}", assignments);
1159
1160 consumer.unsubscribe().await;
1161
1162 tokio::time::sleep(Duration::from_secs(1)).await;
1163
1164 taos.exec_many([
1165 "drop database db2",
1166 "drop topic ws_abc1",
1167 "drop database ws_abc1",
1168 ])
1169 .await?;
1170 Ok(())
1171 }
1172
1173 #[tokio::test]
1174 async fn test_ws_tmq() -> taos_query::RawResult<()> {
1175 use taos_query::prelude::*;
1180 let dsn = "taosws://localhost:6041".to_string();
1182 log::info!("dsn: {}", dsn);
1183 let mut dsn = Dsn::from_str(&dsn)?;
1184
1185 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1186
1187 let db = "ws_tmq_1";
1188 let db2 = "ws_tmq_1_dest";
1189
1190 taos.exec_many([
1191 format!("drop topic if exists {db}").as_str(),
1192 format!("drop database if exists {db}").as_str(),
1193 format!("create database {db} wal_retention_period 1").as_str(),
1194 format!("create topic {db} with meta as database {db}").as_str(),
1195 format!("use {db}").as_str(),
1196 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1198 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1199 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1200 tags(t1 json)",
1201 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1203 "create table tb1 using stb1 tags(NULL)",
1204 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1205 NULL, NULL, NULL, NULL, NULL,
1206 NULL, NULL, NULL, NULL)
1207 tb1 values(now, true, -2, -3, -4, -5, \
1208 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1209 254, 65534, 1, 1)",
1210 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1212 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1213 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1214 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1215 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1216 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1217 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1219 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1220 254, 65534, 1, 1)",
1221 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1222 NULL, NULL, NULL, NULL, NULL,
1223 NULL, NULL, NULL, NULL)",
1224 "create table `table` (ts timestamp, v int)",
1226 ])
1227 .await?;
1228
1229 taos.exec_many([
1230 format!("drop database if exists {db2}"),
1231 format!("create database if not exists {db2} wal_retention_period 1"),
1232 format!("use {db2}"),
1233 ])
1234 .await?;
1235
1236 dsn.params.insert("group.id".to_string(), "abc".to_string());
1237
1238 dsn.params
1239 .insert("auto.offset.reset".to_string(), "earliest".to_string());
1240
1241 let builder = TmqBuilder::from_dsn(&dsn)?;
1242 let mut consumer = builder.build().await?;
1243 consumer.subscribe([db]).await?;
1244
1245 {
1246 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1247
1248 while let Some((offset, message)) = stream.try_next().await? {
1249 let topic: &str = offset.topic();
1252 let database = offset.database();
1253 let vgroup_id = offset.vgroup_id();
1254 log::debug!(
1255 "topic: {}, database: {}, vgroup_id: {}",
1256 topic,
1257 database,
1258 vgroup_id
1259 );
1260
1261 match message {
1266 MessageSet::Meta(meta) => {
1267 log::debug!("Meta");
1268 let raw = meta.as_raw_meta().await?;
1269 taos.write_raw_meta(&raw).await?;
1270
1271 let json = meta.as_json_meta().await?;
1273 let sql = json.iter().next().unwrap().to_string();
1274 if let Err(err) = taos.exec(sql).await {
1276 log::error!("meta error: {}", err);
1277 }
1278 }
1279 MessageSet::Data(data) => {
1280 log::debug!("Data");
1281 while let Some(data) = data.fetch_raw_block().await? {
1283 log::debug!("table_name: {:?}", data.table_name());
1284 log::debug!("data: {:?}", data);
1285 }
1286 }
1287 MessageSet::MetaData(meta, data) => {
1288 log::debug!("MetaData");
1289 let raw = meta.as_raw_meta().await?;
1290 taos.write_raw_meta(&raw).await?;
1291
1292 let json = meta.as_json_meta().await?;
1294 let sql = json.iter().next().unwrap().to_string();
1295 if let Err(err) = taos.exec(sql).await {
1296 println!("metadata error: {}", err);
1297 }
1298 while let Some(data) = data.fetch_raw_block().await? {
1300 log::debug!("table_name: {:?}", data.table_name());
1301 log::debug!("data: {:?}", data);
1302 }
1303 }
1304 }
1305 consumer.commit(offset).await?;
1306 }
1307 }
1308
1309 let assignments = consumer.assignments().await.unwrap();
1310 log::info!("assignments: {:?}", assignments);
1312
1313 for topic_vec_assignment in assignments {
1315 let topic = &topic_vec_assignment.0;
1316 let vec_assignment = topic_vec_assignment.1;
1317 for assignment in vec_assignment {
1318 let vgroup_id = assignment.vgroup_id();
1319 let current = assignment.current_offset();
1320 let begin = assignment.begin();
1321 let end = assignment.end();
1322 log::info!(
1323 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1324 topic,
1325 vgroup_id,
1326 current,
1327 begin,
1328 end
1329 );
1330 let res = consumer.offset_seek(topic, vgroup_id, end).await;
1331 if res.is_err() {
1332 log::error!("seek offset error: {:?}", res);
1333 let a = consumer.assignments().await.unwrap();
1334 log::error!("assignments: {:?}", a);
1335 }
1337 }
1338
1339 let topic_assignment = consumer.topic_assignment(topic).await;
1340 log::info!("topic assignment: {:?}", topic_assignment);
1341 }
1342
1343 let assignments = consumer.assignments().await.unwrap();
1345 log::info!("after seek offset assignments: {:?}", assignments);
1346
1347 consumer.unsubscribe().await;
1348
1349 tokio::time::sleep(Duration::from_secs(1)).await;
1350
1351 taos.exec_many([
1352 format!("drop database {db2}"),
1353 format!("drop topic {db}"),
1354 format!("drop database {db}"),
1355 ])
1356 .await?;
1357 Ok(())
1358 }
1359
1360 #[tokio::test]
1361 async fn test_ws_raw_block_table_name() -> taos_query::RawResult<()> {
1362 use taos_query::prelude::*;
1367 let dsn = "taosws://localhost:6041".to_string();
1369 log::info!("dsn: {}", dsn);
1370 let mut dsn = Dsn::from_str(&dsn)?;
1371
1372 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1373
1374 let db = "ws_tmq_block_1";
1375 let db2 = "ws_tmq_block_1_dest";
1376
1377 taos.exec_many([
1378 format!("drop topic if exists {db}").as_str(),
1379 format!("drop database if exists {db}").as_str(),
1380 format!("create database {db} wal_retention_period 1").as_str(),
1381 format!("create topic {db} with meta as database {db}").as_str(),
1382 format!("use {db}").as_str(),
1383 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1385 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1386 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1387 tags(t1 json)",
1388 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1390 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1391 NULL, NULL, NULL, NULL, NULL,
1392 NULL, NULL, NULL, NULL)",
1393 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1395 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1396 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1397 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1398 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1399 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1400 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1402 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1403 254, 65534, 1, 1)",
1404 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1405 NULL, NULL, NULL, NULL, NULL,
1406 NULL, NULL, NULL, NULL)",
1407 "create table `table` (ts timestamp, v int)",
1409 ])
1410 .await?;
1411
1412 taos.exec_many([
1413 format!("drop database if exists {db2}"),
1414 format!("create database if not exists {db2} wal_retention_period 1"),
1415 format!("use {db2}"),
1416 ])
1417 .await?;
1418
1419 dsn.params.insert("group.id".to_string(), "abc".to_string());
1420
1421 dsn.params
1422 .insert("auto.offset.reset".to_string(), "earliest".to_string());
1423
1424 let builder = TmqBuilder::from_dsn(&dsn)?;
1425 let mut consumer = builder.build().await?;
1426 consumer.subscribe([db]).await?;
1427
1428 {
1429 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1430
1431 while let Some((offset, message)) = stream.try_next().await? {
1432 let topic: &str = offset.topic();
1433 let database = offset.database();
1434 let vgroup_id = offset.vgroup_id();
1435 log::debug!(
1436 "topic: {}, database: {}, vgroup_id: {}",
1437 topic,
1438 database,
1439 vgroup_id
1440 );
1441
1442 match message {
1443 MessageSet::Meta(meta) => {
1444 log::debug!("Meta");
1445 let raw = meta.as_raw_meta().await?;
1446 taos.write_raw_meta(&raw).await?;
1447
1448 let json = meta.as_json_meta().await?;
1449 let sql = json.iter().next().unwrap().to_string();
1450 if let Err(err) = taos.exec(sql).await {
1452 log::error!("meta error: {}", err);
1453 }
1454 }
1455 MessageSet::Data(data) => {
1456 log::info!("Data");
1457 while let Some(data) = data.fetch_raw_block().await? {
1459 log::info!("table_name: {:?}", data.table_name());
1460 assert_eq!(data.table_name(), Some("tb0"));
1461 log::info!("data: {}", data.pretty_format());
1462 assert!(data
1463 .pretty_format()
1464 .to_string()
1465 .contains("table name \"tb0\""));
1466 }
1467 }
1468 MessageSet::MetaData(meta, data) => {
1469 log::info!("MetaData");
1470 let raw = meta.as_raw_meta().await?;
1471 taos.write_raw_meta(&raw).await?;
1472
1473 let json = meta.as_json_meta().await?;
1475 let sql = json.iter().next().unwrap().to_string();
1476 if let Err(err) = taos.exec(sql).await {
1477 println!("metadata error: {}", err);
1478 }
1479 while let Some(data) = data.fetch_raw_block().await? {
1481 log::info!("MetaData table_name: {:?}", data.table_name());
1482 log::info!("MetaData data: {:?}", data);
1483 }
1484 }
1485 }
1486 consumer.commit(offset).await?;
1487 }
1488 }
1489
1490 consumer.unsubscribe().await;
1491
1492 tokio::time::sleep(Duration::from_secs(1)).await;
1493
1494 taos.exec_many([
1495 format!("drop database {db2}"),
1496 format!("drop topic {db}"),
1497 format!("drop database {db}"),
1498 ])
1499 .await?;
1500 Ok(())
1501 }
1502
1503 #[tokio::test]
1504 async fn test_ws_flush_db() -> taos_query::RawResult<()> {
1505 use taos_query::prelude::*;
1510 let dsn = "taosws://localhost:6041".to_string();
1512 log::info!("dsn: {}", dsn);
1513 let mut dsn = Dsn::from_str(&dsn)?;
1514
1515 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1516
1517 let db = "ws_tmq_flush_1";
1518 let db2 = "ws_tmq_flush_1_dest";
1519
1520 taos.exec_many([
1521 format!("drop topic if exists {db}").as_str(),
1522 format!("drop database if exists {db}").as_str(),
1523 format!("create database {db} wal_retention_period 1").as_str(),
1524 format!("create topic {db} with meta as database {db}").as_str(),
1525 format!("use {db}").as_str(),
1526 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1528 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1529 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1530 tags(t1 json)",
1531 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1533 "create table tb1 using stb1 tags(NULL)",
1534 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1535 NULL, NULL, NULL, NULL, NULL,
1536 NULL, NULL, NULL, NULL)
1537 tb1 values(now, true, -2, -3, -4, -5, \
1538 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1539 254, 65534, 1, 1)",
1540 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1542 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1543 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1544 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1545 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1546 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1547 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1549 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1550 254, 65534, 1, 1)",
1551 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1552 NULL, NULL, NULL, NULL, NULL,
1553 NULL, NULL, NULL, NULL)",
1554 "create table `table` (ts timestamp, v int)",
1556 "alter table stb1 add column new1 bool",
1558 "alter table stb1 add column new2 tinyint",
1559 "alter table stb1 add column new10 nchar(16)",
1560 "alter table stb1 modify column new10 nchar(32)",
1561 "alter table stb1 drop column new10",
1562 "alter table stb1 drop column new2",
1563 "alter table stb1 drop column new1",
1564 "alter table `stb2` add tag new1 bool",
1566 "alter table `stb2` rename tag new1 new1_new",
1567 "alter table `stb2` modify tag t10 nchar(32)",
1568 "alter table `stb2` drop tag new1_new",
1569 "alter table `table` add column new1 bool",
1571 "alter table `table` add column new2 tinyint",
1572 "alter table `table` add column new10 nchar(16)",
1573 "alter table `table` modify column new10 nchar(32)",
1574 "alter table `table` rename column new10 new10_new",
1575 "alter table `table` drop column new10_new",
1576 "alter table `table` drop column new2",
1577 "alter table `table` drop column new1",
1578 ])
1579 .await?;
1580
1581 for _ in 0..1000 {
1582 taos.exec(
1583 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1584 NULL, NULL, NULL, NULL, NULL,
1585 NULL, NULL, NULL, NULL)
1586 tb1 values(now, true, -2, -3, -4, -5, \
1587 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1588 254, 65534, 1, 1)",
1589 )
1590 .await?;
1591 }
1592 taos.exec(format!("flush database {db}")).await?;
1593 tokio::time::sleep(Duration::from_secs(1)).await;
1594
1595 taos.exec_many([
1596 format!("drop database if exists {db2}"),
1597 format!("create database if not exists {db2} wal_retention_period 1"),
1598 format!("use {db2}"),
1599 ])
1600 .await?;
1601
1602 dsn.params.insert("group.id".to_string(), "abc".to_string());
1603
1604 dsn.params
1605 .insert("auto.offset.reset".to_string(), "earliest".to_string());
1606 dsn.params.insert(
1607 "experimental.snapshot.enable".to_string(),
1608 "true".to_string(),
1609 );
1610
1611 let builder = TmqBuilder::from_dsn(&dsn)?;
1612 let mut consumer = builder.build().await?;
1613 consumer.subscribe([db]).await?;
1614
1615 {
1616 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1617
1618 while let Some((offset, message)) = stream.try_next().await? {
1619 let topic: &str = offset.topic();
1622 let database = offset.database();
1623 let vgroup_id = offset.vgroup_id();
1624 log::debug!(
1625 "topic: {}, database: {}, vgroup_id: {}",
1626 topic,
1627 database,
1628 vgroup_id
1629 );
1630
1631 match message {
1636 MessageSet::Meta(meta) => {
1637 log::debug!("Meta");
1638 let raw = meta.as_raw_meta().await?;
1639 taos.write_raw_meta(&raw).await?;
1640
1641 let json = meta.as_json_meta().await?;
1643 let sql = json.iter().next().unwrap().to_string();
1644 if let Err(err) = taos.exec(sql).await {
1646 log::error!("maybe error: {}", err);
1647 }
1648 }
1649 MessageSet::Data(data) => {
1650 log::debug!("Data");
1651 while let Some(data) = data.fetch_raw_block().await? {
1653 log::debug!("table_name: {:?}", data.table_name());
1654 log::debug!("data: {:?}", data);
1655 }
1656 }
1657 MessageSet::MetaData(meta, data) => {
1658 log::debug!("MetaData");
1659 let raw = meta.as_raw_meta().await?;
1660 taos.write_raw_meta(&raw).await?;
1661
1662 let json = meta.as_json_meta().await?;
1664 let sql = json.iter().next().unwrap().to_string();
1665 if let Err(err) = taos.exec(sql).await {
1666 println!("maybe error: {}", err);
1667 }
1668 while let Some(data) = data.fetch_raw_block().await? {
1670 log::debug!("table_name: {:?}", data.table_name());
1671 log::debug!("data: {:?}", data);
1672 }
1673 }
1674 }
1675 consumer.commit(offset).await?;
1676 }
1677 }
1678
1679 let assignments = consumer.assignments().await.unwrap();
1680 log::info!("assignments: {:?}", assignments);
1682
1683 for topic_vec_assignment in assignments {
1685 let topic = &topic_vec_assignment.0;
1686 let vec_assignment = topic_vec_assignment.1;
1687 for assignment in vec_assignment {
1688 let vgroup_id = assignment.vgroup_id();
1689 let current = assignment.current_offset();
1690 let begin = assignment.begin();
1691 let end = assignment.end();
1692 log::info!(
1693 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1694 topic,
1695 vgroup_id,
1696 current,
1697 begin,
1698 end
1699 );
1700 let res = consumer.offset_seek(topic, vgroup_id, end).await;
1701 if res.is_err() {
1702 log::error!("seek offset error: {:?}", res);
1703 let a = consumer.assignments().await.unwrap();
1704 log::error!("assignments: {:?}", a);
1705 }
1707 }
1708
1709 let topic_assignment = consumer.topic_assignment(topic).await;
1710 log::info!("topic assignment: {:?}", topic_assignment);
1711 }
1712
1713 let assignments = consumer.assignments().await.unwrap();
1715 log::info!("after seek offset assignments: {:?}", assignments);
1716
1717 consumer.unsubscribe().await;
1718
1719 tokio::time::sleep(Duration::from_secs(1)).await;
1720
1721 taos.exec_many([
1722 format!("drop database {db2}"),
1723 format!("drop topic {db}"),
1724 format!("drop database {db}"),
1725 ])
1726 .await?;
1727 Ok(())
1728 }
1729
1730 #[tokio::test]
1731 async fn test_ws_tmq_snapshot() -> taos_query::RawResult<()> {
1732 use taos_query::prelude::*;
1736 let dsn = "taosws://localhost:6041".to_string();
1738 log::info!("dsn: {}", dsn);
1739 let mut dsn = Dsn::from_str(&dsn)?;
1740
1741 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1742
1743 let db = "ws_abc1_snapshot";
1744 let db2 = "ws_abc1_snapshot_dest";
1745
1746 taos.exec_many([
1747 format!("drop topic if exists {db}").as_str(),
1748 format!("drop database if exists {db}").as_str(),
1749 format!("create database {db} wal_retention_period 3600").as_str(),
1750 format!("create topic {db} with meta as database {db}").as_str(),
1751 format!("use {db}").as_str(),
1752 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1754 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1755 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1756 tags(t1 json)",
1757 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1759 "create table tb1 using stb1 tags(NULL)",
1760 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1761 NULL, NULL, NULL, NULL, NULL,
1762 NULL, NULL, NULL, NULL)
1763 tb1 values(now, true, -2, -3, -4, -5, \
1764 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1765 254, 65534, 1, 1)",
1766 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1768 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1769 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1770 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1771 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1772 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1773 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1775 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1776 254, 65534, 1, 1)",
1777 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1778 NULL, NULL, NULL, NULL, NULL,
1779 NULL, NULL, NULL, NULL)",
1780 "create table `table` (ts timestamp, v int)",
1782 "alter table stb1 add column new1 bool",
1784 "alter table stb1 add column new2 tinyint",
1785 "alter table stb1 add column new10 nchar(16)",
1786 "alter table stb1 modify column new10 nchar(32)",
1787 "alter table stb1 drop column new10",
1788 "alter table stb1 drop column new2",
1789 "alter table stb1 drop column new1",
1790 "alter table `stb2` add tag new1 bool",
1792 "alter table `stb2` rename tag new1 new1_new",
1793 "alter table `stb2` modify tag t10 nchar(32)",
1794 "alter table `stb2` drop tag new1_new",
1795 "alter table `table` add column new1 bool",
1797 "alter table `table` add column new2 tinyint",
1798 "alter table `table` add column new10 nchar(16)",
1799 "alter table `table` modify column new10 nchar(32)",
1800 "alter table `table` rename column new10 new10_new",
1801 "alter table `table` drop column new10_new",
1802 "alter table `table` drop column new2",
1803 "alter table `table` drop column new1",
1804 ])
1805 .await?;
1806
1807 for _ in 0..100 {
1808 taos.exec(
1809 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1810 NULL, NULL, NULL, NULL, NULL,
1811 NULL, NULL, NULL, NULL)
1812 tb1 values(now, true, -2, -3, -4, -5, \
1813 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1814 254, 65534, 1, 1)",
1815 )
1816 .await?;
1817 }
1818
1819 taos.exec_many([
1820 format!("drop database if exists {db2}"),
1821 format!("create database if not exists {db2} wal_retention_period 3600"),
1822 format!("use {db2}"),
1823 ])
1824 .await?;
1825
1826 dsn.params.insert("group.id".to_string(), "abc".to_string());
1827
1828 dsn.params
1829 .insert("auto.offset.reset".to_string(), "earliest".to_string());
1830
1831 let builder = TmqBuilder::from_dsn(&dsn)?;
1832 let mut consumer = builder.build().await?;
1833 consumer.subscribe([db]).await?;
1834
1835 {
1836 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
1837
1838 while let Some((offset, message)) = stream.try_next().await? {
1839 let topic: &str = offset.topic();
1842 let database = offset.database();
1843 let vgroup_id = offset.vgroup_id();
1844 log::debug!(
1845 "topic: {}, database: {}, vgroup_id: {}",
1846 topic,
1847 database,
1848 vgroup_id
1849 );
1850
1851 match message {
1856 MessageSet::Meta(meta) => {
1857 log::debug!("Meta");
1858 let raw = meta.as_raw_meta().await?;
1859 taos.write_raw_meta(&raw).await?;
1860
1861 let json = meta.as_json_meta().await?;
1863 let sql = json.iter().next().unwrap().to_string();
1864 if let Err(err) = taos.exec(sql).await {
1866 log::debug!("maybe error: {}", err);
1867 }
1868 }
1869 MessageSet::Data(data) => {
1870 log::debug!("Data");
1871 while let Some(data) = data.fetch_raw_block().await? {
1873 log::debug!("table_name: {:?}", data.table_name());
1874 log::debug!("data: {:?}", data);
1875 }
1876 }
1877 MessageSet::MetaData(meta, data) => {
1878 log::debug!("MetaData");
1879 let raw = meta.as_raw_meta().await?;
1880 taos.write_raw_meta(&raw).await?;
1881
1882 let json = meta.as_json_meta().await?;
1884 let sql = json.iter().next().unwrap().to_string();
1885 if let Err(err) = taos.exec(sql).await {
1886 println!("maybe error: {}", err);
1887 }
1888 while let Some(data) = data.fetch_raw_block().await? {
1890 log::debug!("table_name: {:?}", data.table_name());
1891 log::debug!("data: {:?}", data);
1892 }
1893 }
1894 }
1895 consumer.commit(offset).await?;
1896 }
1897 }
1898
1899 let assignments = consumer.assignments().await.unwrap();
1900 dbg!(&assignments);
1901 log::info!("assignments: {:?}", assignments);
1902
1903 for topic_vec_assignment in assignments {
1905 let topic = &topic_vec_assignment.0;
1906 let vec_assignment = topic_vec_assignment.1;
1907 for assignment in vec_assignment {
1908 let vgroup_id = assignment.vgroup_id();
1909 let current = assignment.current_offset();
1910 let begin = assignment.begin();
1911 let end = assignment.end();
1912 log::info!(
1913 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
1914 topic,
1915 vgroup_id,
1916 current,
1917 begin,
1918 end
1919 );
1920 let res = consumer.offset_seek(topic, vgroup_id, end).await;
1921 if res.is_err() {
1922 log::error!("seek offset error: {:?}", res);
1923 let a = consumer.assignments().await.unwrap();
1924 log::error!("assignments: {:?}", a);
1925 }
1927 }
1928
1929 let topic_assignment = consumer.topic_assignment(topic).await;
1930 log::info!("topic assignment: {:?}", topic_assignment);
1931 }
1932
1933 let assignments = consumer.assignments().await.unwrap();
1935 log::info!("after seek offset assignments: {:?}", assignments);
1936
1937 consumer.unsubscribe().await;
1938
1939 tokio::time::sleep(Duration::from_secs(1)).await;
1940
1941 taos.exec_many([
1942 format!("drop database {db2}"),
1943 format!("drop topic {db}"),
1944 format!("drop database {db}"),
1945 ])
1946 .await?;
1947 Ok(())
1948 }
1949 #[tokio::test]
1950 async fn test_ws_tmq_offset() -> taos_query::RawResult<()> {
1951 use taos_query::prelude::*;
1956 let dsn = "tmq+ws://localhost:6041?offset=10:20,11:40".to_string();
1958 log::info!("dsn: {}", dsn);
1959 let mut dsn = Dsn::from_str(&dsn)?;
1960
1961 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
1962
1963 let db = "ws_tmq_abc2";
1964 let db2 = "ws_tmq_abc2_dest";
1965
1966 taos.exec(format!("drop topic if exists {db}")).await?;
1967 taos.exec(format!("drop database if exists {db}")).await?;
1968
1969 std::thread::sleep(std::time::Duration::from_secs(3));
1970
1971 taos.exec(format!(
1972 "create database if not exists {db} wal_retention_period 3600"
1973 ))
1974 .await?;
1975
1976 std::thread::sleep(std::time::Duration::from_secs(3));
1977
1978 taos.exec_many([
1979 format!("create topic {db} with meta as database {db}").as_str(),
1980 format!("use {db}").as_str(),
1981 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1983 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1984 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1985 tags(t1 json)",
1986 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1988 "create table tb1 using stb1 tags(NULL)",
1989 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1990 NULL, NULL, NULL, NULL, NULL,
1991 NULL, NULL, NULL, NULL)
1992 tb1 values(now, true, -2, -3, -4, -5, \
1993 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1994 254, 65534, 1, 1)",
1995 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1997 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1998 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1999 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2000 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2001 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2002 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2004 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2005 254, 65534, 1, 1)",
2006 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2007 NULL, NULL, NULL, NULL, NULL,
2008 NULL, NULL, NULL, NULL)",
2009 "create table `table` (ts timestamp, v int)",
2011 "alter table stb1 add column new1 bool",
2013 "alter table stb1 add column new2 tinyint",
2014 "alter table stb1 add column new10 nchar(16)",
2015 "alter table stb1 modify column new10 nchar(32)",
2016 "alter table stb1 drop column new10",
2017 "alter table stb1 drop column new2",
2018 "alter table stb1 drop column new1",
2019 "alter table `stb2` add tag new1 bool",
2021 "alter table `stb2` rename tag new1 new1_new",
2022 "alter table `stb2` modify tag t10 nchar(32)",
2023 "alter table `stb2` drop tag new1_new",
2024 "alter table `table` add column new1 bool",
2026 "alter table `table` add column new2 tinyint",
2027 "alter table `table` add column new10 nchar(16)",
2028 "alter table `table` modify column new10 nchar(32)",
2029 "alter table `table` rename column new10 new10_new",
2030 "alter table `table` drop column new10_new",
2031 "alter table `table` drop column new2",
2032 "alter table `table` drop column new1",
2033 ])
2041 .await?;
2042
2043 taos.exec_many([
2044 format!("drop database if exists {db2}"),
2045 format!("create database if not exists {db2} wal_retention_period 3600"),
2046 format!("use {db2}"),
2047 ])
2048 .await?;
2049
2050 dsn.params.insert("group.id".to_string(), "abc".to_string());
2051
2052 dsn.params
2053 .insert("auto.offset.reset".to_string(), "earliest".to_string());
2054
2055 let builder = TmqBuilder::from_dsn(&dsn)?;
2056 let mut consumer = builder.build().await?;
2058 consumer.subscribe([db]).await?;
2059
2060 {
2061 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
2062
2063 while let Some((offset, message)) = stream.try_next().await? {
2064 let topic: &str = offset.topic();
2067 let database = offset.database();
2068 let vgroup_id = offset.vgroup_id();
2069 log::debug!(
2070 "topic: {}, database: {}, vgroup_id: {}",
2071 topic,
2072 database,
2073 vgroup_id
2074 );
2075
2076 match message {
2081 MessageSet::Meta(meta) => {
2082 log::debug!("Meta");
2083 let raw = meta.as_raw_meta().await?;
2084 taos.write_raw_meta(&raw).await?;
2085
2086 let json = meta.as_json_meta().await?;
2088 let sql = json.iter().next().unwrap().to_string();
2089 if let Err(err) = taos.exec(sql).await {
2090 println!("maybe error: {}", err);
2091 }
2092 }
2093 MessageSet::Data(data) => {
2094 log::debug!("Data");
2095 while let Some(data) = data.fetch_raw_block().await? {
2097 log::debug!("table_name: {:?}", data.table_name());
2098 log::debug!("data: {:?}", data);
2099 }
2100 }
2101 MessageSet::MetaData(meta, data) => {
2102 log::debug!("MetaData");
2103 let raw = meta.as_raw_meta().await?;
2104 taos.write_raw_meta(&raw).await?;
2105
2106 let json = meta.as_json_meta().await?;
2108 let sql = json.iter().next().unwrap().to_string();
2109 if let Err(err) = taos.exec(sql).await {
2110 println!("maybe error: {}", err);
2111 }
2112 while let Some(data) = data.fetch_raw_block().await? {
2114 log::debug!("table_name: {:?}", data.table_name());
2115 log::debug!("data: {:?}", data);
2116 }
2117 }
2118 }
2119 consumer.commit(offset).await?;
2120 }
2121 }
2122
2123 let assignments = consumer.assignments().await.unwrap();
2124 log::debug!("assignments: {:?}", assignments);
2125
2126 for topic_vec_assignment in assignments {
2128 let topic = &topic_vec_assignment.0;
2129 let vec_assignment = topic_vec_assignment.1;
2130 for assignment in vec_assignment {
2131 let vgroup_id = assignment.vgroup_id();
2132 let current = assignment.current_offset();
2133 let begin = assignment.begin();
2134 let end = assignment.end();
2135 log::debug!(
2136 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
2137 topic,
2138 vgroup_id,
2139 current,
2140 begin,
2141 end
2142 );
2143 let res = consumer.offset_seek(topic, vgroup_id, end).await;
2144 if res.is_err() {
2145 log::error!("seek offset error: {:?}", res);
2146 let a = consumer.assignments().await.unwrap();
2147 log::error!("assignments: {:?}", a);
2148 }
2150 }
2151
2152 let topic_assignment = consumer.topic_assignment(topic).await;
2153 log::debug!("topic assignment: {:?}", topic_assignment);
2154 }
2155
2156 let assignments = consumer.assignments().await.unwrap();
2158 log::debug!("after seek offset assignments: {:?}", assignments);
2159
2160 consumer.unsubscribe().await;
2161
2162 tokio::time::sleep(Duration::from_secs(1)).await;
2163
2164 taos.exec_many([
2165 format!("drop database {db2}"),
2166 format!("drop topic {db}"),
2167 format!("drop database {db}"),
2168 ])
2169 .await?;
2170 Ok(())
2171 }
2172
2173 #[tokio::test]
2174 async fn test_ws_tmq_committed() -> taos_query::RawResult<()> {
2175 use taos_query::prelude::*;
2180
2181 let dsn = "tmq+ws://localhost:6041?".to_string();
2182 log::info!("dsn: {}", dsn);
2183 let mut dsn = Dsn::from_str(&dsn)?;
2184
2185 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
2186
2187 let db = "ws_tmq_committed";
2188 let db2 = "ws_tmq_committed_dest";
2189
2190 taos.exec(format!("drop topic if exists {db}")).await?;
2191 taos.exec(format!("drop database if exists {db}")).await?;
2192
2193 std::thread::sleep(std::time::Duration::from_secs(1));
2194
2195 taos.exec(format!(
2196 "create database if not exists {db} wal_retention_period 3600"
2197 ))
2198 .await?;
2199
2200 std::thread::sleep(std::time::Duration::from_secs(1));
2201
2202 taos.exec_many([
2203 format!("create topic {db} with meta as database {db}").as_str(),
2204 format!("use {db}").as_str(),
2205 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2207 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2208 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2209 tags(t1 json)",
2210 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2212 "create table tb1 using stb1 tags(NULL)",
2213 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2214 NULL, NULL, NULL, NULL, NULL,
2215 NULL, NULL, NULL, NULL)
2216 tb1 values(now, true, -2, -3, -4, -5, \
2217 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2218 254, 65534, 1, 1)",
2219 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2221 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2222 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2223 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2224 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2225 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2226 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2228 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2229 254, 65534, 1, 1)",
2230 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2231 NULL, NULL, NULL, NULL, NULL,
2232 NULL, NULL, NULL, NULL)",
2233 "create table `table` (ts timestamp, v int)",
2235 "alter table stb1 add column new1 bool",
2237 "alter table stb1 add column new2 tinyint",
2238 "alter table stb1 add column new10 nchar(16)",
2239 "alter table stb1 modify column new10 nchar(32)",
2240 "alter table stb1 drop column new10",
2241 "alter table stb1 drop column new2",
2242 "alter table stb1 drop column new1",
2243 "alter table `stb2` add tag new1 bool",
2245 "alter table `stb2` rename tag new1 new1_new",
2246 "alter table `stb2` modify tag t10 nchar(32)",
2247 "alter table `stb2` drop tag new1_new",
2248 "alter table `table` add column new1 bool",
2250 "alter table `table` add column new2 tinyint",
2251 "alter table `table` add column new10 nchar(16)",
2252 "alter table `table` modify column new10 nchar(32)",
2253 "alter table `table` rename column new10 new10_new",
2254 "alter table `table` drop column new10_new",
2255 "alter table `table` drop column new2",
2256 "alter table `table` drop column new1",
2257 "drop table `table`",
2259 "drop table `tb2`, `tb1`",
2261 "drop table `stb2`",
2263 "drop table `stb1`",
2264 ])
2265 .await?;
2266
2267 taos.exec_many([
2268 format!("drop database if exists {db2}"),
2269 format!("create database if not exists {db2} wal_retention_period 3600"),
2270 format!("use {db2}"),
2271 ])
2272 .await?;
2273
2274 dsn.params.insert("group.id".to_string(), "abc".to_string());
2275
2276 dsn.params
2277 .insert("auto.offset.reset".to_string(), "earliest".to_string());
2278 let builder = TmqBuilder::from_dsn(&dsn)?;
2279 let mut consumer = builder.build().await?;
2281
2282 let topics = consumer.list_topics().await?;
2283 log::info!("topics: {:?}", topics);
2284 consumer.subscribe([db]).await?;
2285 let topics = consumer.list_topics().await?;
2286 log::info!("topics: {:?}", topics);
2287
2288 {
2289 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
2290
2291 while let Some((offset, message)) = stream.try_next().await? {
2292 let topic: &str = offset.topic();
2295 let database = offset.database();
2296 let vgroup_id = offset.vgroup_id();
2297 log::debug!(
2298 "topic: {}, database: {}, vgroup_id: {}",
2299 topic,
2300 database,
2301 vgroup_id
2302 );
2303
2304 match message {
2309 MessageSet::Meta(meta) => {
2310 log::debug!("Meta");
2311 let raw = meta.as_raw_meta().await?;
2312 taos.write_raw_meta(&raw).await?;
2313
2314 let json = meta.as_json_meta().await?;
2316 let sql = json.iter().next().unwrap().to_string();
2317 if let Err(err) = taos.exec(sql).await {
2318 log::trace!("maybe error: {}", err);
2319 }
2320 }
2321 MessageSet::Data(data) => {
2322 log::debug!("Data");
2323 while let Some(data) = data.fetch_raw_block().await? {
2325 log::debug!("table_name: {:?}", data.table_name());
2326 log::debug!("data: {:?}", data);
2327 }
2328 }
2329 MessageSet::MetaData(meta, data) => {
2330 log::debug!("MetaData");
2331 let raw = meta.as_raw_meta().await?;
2332 taos.write_raw_meta(&raw).await?;
2333
2334 let json = meta.as_json_meta().await?;
2336 let sql = json.iter().next().unwrap().to_string();
2337 if let Err(err) = taos.exec(sql).await {
2338 println!("maybe error: {}", err);
2339 }
2340 while let Some(data) = data.fetch_raw_block().await? {
2342 log::debug!("table_name: {:?}", data.table_name());
2343 log::debug!("data: {:?}", data);
2344 }
2345 }
2346 }
2347 consumer.commit(offset).await?;
2348 }
2349 }
2350
2351 let assignments = consumer.assignments().await.unwrap();
2352 log::info!("assignments: {:?}", assignments);
2353
2354 for topic_vec_assignment in assignments {
2356 let topic = &topic_vec_assignment.0;
2357 let vec_assignment = topic_vec_assignment.1;
2358 for assignment in vec_assignment {
2359 let vgroup_id = assignment.vgroup_id();
2360 let current = assignment.current_offset();
2361 let begin = assignment.begin();
2362 let end = assignment.end();
2363 log::info!(
2364 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
2365 topic,
2366 vgroup_id,
2367 current,
2368 begin,
2369 end
2370 );
2371
2372 let committed = consumer.committed(topic, vgroup_id).await?;
2373 log::info!("committed: {:?}", committed);
2374
2375 let position = consumer.position(topic, vgroup_id).await?;
2376 log::info!("position: {:?}", position);
2377
2378 let res = consumer.offset_seek(topic, vgroup_id, end).await;
2379 if res.is_err() {
2380 log::error!("seek offset error: {:?}", res);
2381 let a = consumer.assignments().await.unwrap();
2382 log::error!("assignments: {:?}", a);
2383 }
2384
2385 let committed = consumer.committed(topic, vgroup_id).await?;
2386 log::info!("after seek committed: {:?}", committed);
2387
2388 let position = consumer.position(topic, vgroup_id).await?;
2389 log::info!("after seek position: {:?}", position);
2390
2391 let res = consumer.commit_offset(topic, vgroup_id, end).await;
2392 if res.is_err() {
2393 log::error!("commit offset response: {:?}", res);
2394 }
2395
2396 let committed = consumer.committed(topic, vgroup_id).await?;
2397 log::info!("after commit committed: {:?}", committed);
2398
2399 let position = consumer.position(topic, vgroup_id).await?;
2400 log::info!("after commit position: {:?}", position);
2401 }
2402
2403 let topic_assignment = consumer.topic_assignment(topic).await;
2404 log::info!("topic assignment: {:?}", topic_assignment);
2405 }
2406
2407 let assignments = consumer.assignments().await.unwrap();
2409 log::debug!("after seek offset assignments: {:?}", assignments);
2410
2411 consumer.unsubscribe().await;
2412
2413 tokio::time::sleep(Duration::from_secs(1)).await;
2414
2415 taos.exec_many([
2416 format!("drop database {db2}"),
2417 format!("drop topic {db}"),
2418 format!("drop database {db}"),
2419 ])
2420 .await?;
2421 Ok(())
2422 }
2423}
2424
2425#[cfg(feature = "deflate")]
2426#[cfg(test)]
2427mod tmq_deflate_tests {
2428
2429 use crate::{
2430 query::infra::{ToMessage, WsRecv, WsSend},
2431 *,
2432 };
2433 use futures::{SinkExt, StreamExt};
2434 use std::time::Duration;
2435 use tracing::*;
2436 use tracing_subscriber::util::SubscriberInitExt;
2437 use ws_tool::frame::OpCode;
2438
2439 #[cfg(feature = "deflate")]
2440 #[tokio::test]
2441 async fn test_build_stream_with_deflate() -> Result<(), anyhow::Error> {
2442 let _subscriber = tracing_subscriber::fmt::fmt()
2443 .with_max_level(Level::DEBUG)
2444 .with_file(true)
2445 .with_line_number(true)
2446 .finish();
2447 let _ = _subscriber.try_init();
2448
2449 let dsn = std::env::var("TEST_CLOUD_DSN").unwrap_or("http://localhost:6041".to_string());
2450
2451 let builder = TaosBuilder::from_dsn(dsn).unwrap();
2452 let url = builder.to_query_url();
2453 let ws = builder.ws_tool_build_stream(url).await.unwrap();
2454
2455 let (mut sink, mut source) = ws.split();
2456
2457 let version = WsSend::Version;
2458 source
2459 .send(OpCode::Text, &serde_json::to_vec(&version)?)
2460 .await?;
2461
2462 let _handle = tokio::spawn(async move {
2463 loop {
2464 let frame = sink.receive().await.unwrap();
2465 let (header, payload) = frame;
2466 trace!("header.code: {:?}, payload: {:?}", &header.code, &payload);
2467 let code = header.code;
2468
2469 match code {
2470 OpCode::Binary => {
2471 println!("{:?}", payload);
2472 }
2473 OpCode::Text => {
2474 let recv: crate::query::infra::WsRecv =
2475 serde_json::from_slice(&payload).unwrap();
2476 info!("recv: {:?}", recv);
2477 assert_eq!(recv.code, 0);
2478 }
2479 _ => (),
2480 }
2481 }
2482 });
2483
2484 tokio::time::sleep(Duration::from_millis(1000)).await;
2485
2486 Ok(())
2487 }
2488
2489 #[cfg(feature = "deflate")]
2490 #[tokio::test]
2491 async fn test_ws_tmq_deflate() -> taos_query::RawResult<()> {
2492 use taos_query::prelude::*;
2497
2498 let dsn = "tmq+ws://localhost:6041?".to_string();
2499 log::trace!("dsn: {}", dsn);
2500 let mut dsn = Dsn::from_str(&dsn)?;
2501
2502 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
2503
2504 let db = "ws_tmq_deflate";
2505 let db2 = "ws_tmq_deflate_dest";
2506
2507 taos.exec(format!("drop topic if exists {db}")).await?;
2508 taos.exec(format!("drop database if exists {db}")).await?;
2509
2510 std::thread::sleep(std::time::Duration::from_secs(1));
2511
2512 taos.exec(format!(
2513 "create database if not exists {db} wal_retention_period 3600"
2514 ))
2515 .await?;
2516
2517 std::thread::sleep(std::time::Duration::from_secs(1));
2518
2519 taos.exec_many([
2520 format!("create topic {db} with meta as database {db}").as_str(),
2521 format!("use {db}").as_str(),
2522 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2524 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2525 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2526 tags(t1 json)",
2527 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2529 "create table tb1 using stb1 tags(NULL)",
2530 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2531 NULL, NULL, NULL, NULL, NULL,
2532 NULL, NULL, NULL, NULL)
2533 tb1 values(now, true, -2, -3, -4, -5, \
2534 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2535 254, 65534, 1, 1)",
2536 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2538 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2539 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2540 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2541 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2542 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2543 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2545 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2546 254, 65534, 1, 1)",
2547 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2548 NULL, NULL, NULL, NULL, NULL,
2549 NULL, NULL, NULL, NULL)",
2550 "create table `table` (ts timestamp, v int)",
2552 "alter table stb1 add column new1 bool",
2554 "alter table stb1 add column new2 tinyint",
2555 "alter table stb1 add column new10 nchar(16)",
2556 "alter table stb1 modify column new10 nchar(32)",
2557 "alter table stb1 drop column new10",
2558 "alter table stb1 drop column new2",
2559 "alter table stb1 drop column new1",
2560 "alter table `stb2` add tag new1 bool",
2562 "alter table `stb2` rename tag new1 new1_new",
2563 "alter table `stb2` modify tag t10 nchar(32)",
2564 "alter table `stb2` drop tag new1_new",
2565 "alter table `table` add column new1 bool",
2567 "alter table `table` add column new2 tinyint",
2568 "alter table `table` add column new10 nchar(16)",
2569 "alter table `table` modify column new10 nchar(32)",
2570 "alter table `table` rename column new10 new10_new",
2571 "alter table `table` drop column new10_new",
2572 "alter table `table` drop column new2",
2573 "alter table `table` drop column new1",
2574 "drop table `table`",
2576 "drop table `tb2`, `tb1`",
2578 "drop table `stb2`",
2580 "drop table `stb1`",
2581 ])
2582 .await?;
2583
2584 taos.exec_many([
2585 format!("drop database if exists {db2}"),
2586 format!("create database if not exists {db2} wal_retention_period 3600"),
2587 format!("use {db2}"),
2588 ])
2589 .await?;
2590
2591 dsn.params
2592 .insert("group.id".to_string(), "ws_tmq_deflate_1".to_string());
2593
2594 dsn.params
2595 .insert("auto.offset.reset".to_string(), "earliest".to_string());
2596 let builder = TmqBuilder::from_dsn(&dsn)?;
2597 let mut consumer = builder.build().await?;
2599
2600 let topics = consumer.list_topics().await?;
2601 log::info!("topics: {:?}", topics);
2602 consumer.subscribe([db]).await?;
2603 let topics = consumer.list_topics().await?;
2604 log::info!("topics: {:?}", topics);
2605
2606 {
2607 let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
2608
2609 while let Some((offset, message)) = stream.try_next().await? {
2610 let topic: &str = offset.topic();
2611 let database = offset.database();
2612 let vgroup_id = offset.vgroup_id();
2613 log::debug!(
2614 "topic: {}, database: {}, vgroup_id: {}",
2615 topic,
2616 database,
2617 vgroup_id
2618 );
2619
2620 match message {
2621 MessageSet::Meta(meta) => {
2622 log::debug!("Meta");
2623 let raw = meta.as_raw_meta().await?;
2624 taos.write_raw_meta(&raw).await?;
2625
2626 let json = meta.as_json_meta().await?;
2627 let sql = json.to_string();
2628 if let Err(err) = taos.exec(sql).await {
2629 log::trace!("maybe error: {}", err);
2630 }
2631 }
2632 MessageSet::Data(data) => {
2633 log::debug!("Data");
2634
2635 while let Some(data) = data.fetch_raw_block().await? {
2636 log::debug!("table_name: {:?}", data.table_name());
2637 log::debug!("data: {:?}", data);
2638 }
2639 }
2640 MessageSet::MetaData(meta, data) => {
2641 log::debug!("MetaData");
2642 let raw = meta.as_raw_meta().await?;
2643 taos.write_raw_meta(&raw).await?;
2644
2645 let json = meta.as_json_meta().await?;
2646 let sql = json.to_string();
2647 if let Err(err) = taos.exec(sql).await {
2648 println!("maybe error: {}", err);
2649 }
2650
2651 while let Some(data) = data.fetch_raw_block().await? {
2652 log::debug!("table_name: {:?}", data.table_name());
2653 log::debug!("data: {:?}", data);
2654 }
2655 }
2656 }
2657 consumer.commit(offset).await?;
2658 }
2659 }
2660
2661 let assignments = consumer.assignments().await.unwrap();
2662 log::info!("assignments: {:?}", assignments);
2663
2664 for topic_vec_assignment in assignments {
2666 let topic = &topic_vec_assignment.0;
2667 let vec_assignment = topic_vec_assignment.1;
2668 for assignment in vec_assignment {
2669 let vgroup_id = assignment.vgroup_id();
2670 let current = assignment.current_offset();
2671 let begin = assignment.begin();
2672 let end = assignment.end();
2673 log::info!(
2674 "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
2675 topic,
2676 vgroup_id,
2677 current,
2678 begin,
2679 end
2680 );
2681
2682 let committed = consumer.committed(topic, vgroup_id).await?;
2683 log::info!("committed: {:?}", committed);
2684
2685 let position = consumer.position(topic, vgroup_id).await?;
2686 log::info!("position: {:?}", position);
2687
2688 let res = consumer.offset_seek(topic, vgroup_id, end).await;
2689 if res.is_err() {
2690 log::error!("seek offset error: {:?}", res);
2691 let a = consumer.assignments().await.unwrap();
2692 log::error!("assignments: {:?}", a);
2693 }
2694
2695 let committed = consumer.committed(topic, vgroup_id).await?;
2696 log::info!("after seek committed: {:?}", committed);
2697
2698 let position = consumer.position(topic, vgroup_id).await?;
2699 log::info!("after seek position: {:?}", position);
2700
2701 let res = consumer.commit_offset(topic, vgroup_id, end).await;
2702 if res.is_err() {
2703 log::error!("commit offset response: {:?}", res);
2704 }
2705
2706 let committed = consumer.committed(topic, vgroup_id).await?;
2707 log::info!("after commit committed: {:?}", committed);
2708
2709 let position = consumer.position(topic, vgroup_id).await?;
2710 log::info!("after commit position: {:?}", position);
2711 }
2712
2713 let topic_assignment = consumer.topic_assignment(topic).await;
2714 log::info!("topic assignment: {:?}", topic_assignment);
2715 }
2716
2717 let assignments = consumer.assignments().await.unwrap();
2719 log::debug!("after seek offset assignments: {:?}", assignments);
2720
2721 consumer.unsubscribe().await;
2722
2723 tokio::time::sleep(Duration::from_secs(1)).await;
2724
2725 taos.exec_many([
2726 format!("drop database {db2}"),
2727 format!("drop topic {db}"),
2728 format!("drop database {db}"),
2729 ])
2730 .await?;
2731 Ok(())
2732 }
2733}