1use bytes::Bytes;
4use futures::{SinkExt, StreamExt};
5use itertools::Itertools;
6use dashmap::DashMap as HashMap;
8
9use log::warn;
10use taos_query::common::{JsonMeta, RawMeta};
11use taos_query::prelude::{Code, RawError};
12use taos_query::tmq::{
13 AsAsyncConsumer, AsConsumer, Assignment, IsAsyncData, IsAsyncMeta, IsData, IsOffset,
14 MessageSet, SyncOnAsync, Timeout, VGroupId,
15};
16use taos_query::util::{Edition, InlinableRead};
17use taos_query::RawResult;
18use taos_query::{DeError, DsnError, IntoDsn, RawBlock, TBuilder};
19use thiserror::Error;
20
21use tokio::sync::{oneshot, watch};
22
23use tokio::time;
24use tokio_tungstenite::tungstenite::Error as WsError;
25use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
26
27use crate::query::asyn::WS_ERROR_NO;
28use crate::query::infra::{ToMessage, WsConnReq};
29use crate::TaosBuilder;
30use messages::*;
31
32use ws_tool::{errors::WsError as WsErrorWst, frame::OpCode, Message as WsMessage};
33
34use std::fmt::Debug;
35use std::str::FromStr;
36use std::sync::atomic::AtomicU64;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40mod messages;
41
42type WsSender = tokio::sync::mpsc::Sender<WsMessage<bytes::Bytes>>;
43type WsTmqAgent = Arc<HashMap<ReqId, oneshot::Sender<RawResult<TmqRecvData>>>>;
44
45#[derive(Debug, Clone)]
46struct WsTmqSender {
47 req_id: Arc<AtomicU64>,
48 sender: WsSender,
49 queries: WsTmqAgent,
50 #[allow(dead_code)]
51 timeout: Timeout,
52}
53
54impl WsTmqSender {
55 fn req_id(&self) -> ReqId {
56 self.req_id
57 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
58 }
59 async fn send_recv(&self, msg: TmqSend) -> RawResult<TmqRecvData> {
60 self.send_recv_timeout(msg, Duration::MAX).await
61 }
62 async fn send_recv_timeout(&self, msg: TmqSend, timeout: Duration) -> RawResult<TmqRecvData> {
63 let send_timeout = Duration::from_millis(5000);
64 let req_id = msg.req_id();
65 let (tx, rx) = oneshot::channel();
66
67 self.queries.insert(req_id, tx);
68
69 self.sender
70 .send_timeout(msg.to_msg(), send_timeout)
71 .await
72 .map_err(WsTmqError::from)?;
73
74 let sleep = tokio::time::sleep(timeout);
75 tokio::pin!(sleep);
76 let data = tokio::select! {
77 _ = &mut sleep, if !sleep.is_elapsed() => {
78 log::trace!("poll timed out");
79 Err(WsTmqError::QueryTimeout("poll".to_string()))?
80 }
81 message = rx => {
82 message.map_err(WsTmqError::from)??
83 }
84 };
85 Ok(data)
86 }
87}
88
89#[derive(Debug)]
90pub struct TmqBuilder {
91 info: TaosBuilder,
92 conf: TmqInit,
93 timeout: Timeout,
94}
95
96impl TBuilder for TmqBuilder {
97 type Target = Consumer;
98
99 fn available_params() -> &'static [&'static str] {
100 &["token", "timeout", "group.id", "client.id"]
101 }
102
103 fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self> {
104 Self::new(dsn)
105 }
106
107 fn client_version() -> &'static str {
108 "0"
109 }
110
111 fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
112 Ok(())
113 }
114
115 fn ready(&self) -> bool {
116 true
117 }
118
119 fn build(&self) -> RawResult<Self::Target> {
120 taos_query::block_in_place_or_global(self.build_consumer())
121 }
122
123 fn server_version(&self) -> RawResult<&str> {
124 todo!()
125 }
126
127 fn is_enterprise_edition(&self) -> RawResult<bool> {
128 todo!()
129 }
130
131 fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
132 if self
133 .info
134 .addr
135 .matches(".cloud.tdengine.com")
136 .next()
137 .is_some()
138 || self
139 .info
140 .addr
141 .matches(".cloud.taosdata.com")
142 .next()
143 .is_some()
144 {
145 let edition = Edition::new("cloud", false);
146 return Ok(edition);
147 }
148
149 let taos = TBuilder::build(&self.info)?;
150
151 use taos_query::prelude::sync::Queryable;
152 let grant: RawResult<Option<(String, bool)>> = Queryable::query_one(
153 &taos,
154 "select version, (expire_time < now) from information_schema.ins_cluster",
155 );
156
157 let edition = if let Ok(Some((edition, expired))) = grant {
158 Edition::new(edition, expired)
159 } else {
160 let grant: RawResult<Option<(String, (), String)>> =
161 Queryable::query_one(&taos, "show grants");
162
163 if let Ok(Some((edition, _, expired))) = grant {
164 Edition::new(
165 edition.trim(),
166 expired.trim() == "false" || expired.trim() == "unlimited",
167 )
168 } else {
169 warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
170 Edition::new("unknown", true)
171 }
172 };
173 Ok(edition)
174 }
175}
176
177#[async_trait::async_trait]
178impl taos_query::AsyncTBuilder for TmqBuilder {
179 type Target = Consumer;
180
181 fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self> {
182 Self::new(dsn)
183 }
184
185 fn client_version() -> &'static str {
186 "0"
187 }
188
189 async fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
190 Ok(())
191 }
192
193 async fn ready(&self) -> bool {
194 true
195 }
196
197 async fn build(&self) -> RawResult<Self::Target> {
198 self.build_consumer().await
199 }
200
201 async fn server_version(&self) -> RawResult<&str> {
202 todo!()
203 }
204
205 async fn is_enterprise_edition(&self) -> RawResult<bool> {
206 todo!()
207 }
208
209 async fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
210 use taos_query::prelude::AsyncQueryable;
211
212 let taos = taos_query::AsyncTBuilder::build(&self.info).await?;
213 taos.exec("select server_status()").await?;
215
216 match self
217 .info
218 .addr
219 .matches(".cloud.tdengine.com")
220 .next()
221 .is_some()
222 || self
223 .info
224 .addr
225 .matches(".cloud.taosdata.com")
226 .next()
227 .is_some()
228 {
229 true => {
230 let edition = Edition::new("cloud", false);
231 return Ok(edition);
232 }
233 false => (),
234 }
235
236 let grant: RawResult<Option<(String, bool)>> = AsyncQueryable::query_one(
237 &taos,
238 "select version, (expire_time < now) from information_schema.ins_cluster",
239 )
240 .await;
241
242 let edition = if let Ok(Some((edition, expired))) = grant {
243 Edition::new(edition, expired)
244 } else {
245 let grant: RawResult<Option<(String, (), String)>> =
246 AsyncQueryable::query_one(&taos, "show grants").await;
247
248 if let Ok(Some((edition, _, expired))) = grant {
249 Edition::new(
250 edition.trim(),
251 expired.trim() == "false" || expired.trim() == "unlimited",
252 )
253 } else {
254 warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
255 Edition::new("unknown", true)
256 }
257 };
258 Ok(edition)
259 }
260}
261#[derive(Debug)]
262struct WsMessageBase {
263 sender: WsTmqSender,
264 message_id: MessageId,
265}
266
267impl WsMessageBase {
268 async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
269 let req_id = self.sender.req_id();
270
271 let msg = TmqSend::Fetch(MessageArgs {
272 req_id,
273 message_id: self.message_id,
274 });
275 let data = self.sender.send_recv(msg).await?;
276 let fetch = if let TmqRecvData::Fetch(fetch) = data {
277 fetch
278 } else {
279 unreachable!()
280 };
281
282 if fetch.completed {
283 return Ok(None);
284 }
285
286 let msg = TmqSend::FetchBlock(MessageArgs {
287 req_id,
288 message_id: self.message_id,
289 });
290 let data = self.sender.send_recv(msg).await?;
291 if let TmqRecvData::Bytes(bytes) = data {
292 let mut raw = RawBlock::parse_from_raw_block(bytes, fetch.precision);
293
294 raw.with_field_names(fetch.fields().iter().map(|f| f.name()));
302 if let Some(name) = fetch.table_name {
303 raw.with_table_name(name);
304 }
305 return Ok(Some(raw));
306 }
307 todo!()
308 }
309 async fn fetch_json_meta(&self) -> RawResult<JsonMeta> {
310 let req_id = self.sender.req_id();
311 let msg = TmqSend::FetchJsonMeta(MessageArgs {
312 req_id,
313 message_id: self.message_id,
314 });
315 let data = self.sender.send_recv(msg).await?;
316 if let TmqRecvData::FetchJsonMeta { data } = data {
317 let json: JsonMeta = serde_json::from_value(data).map_err(WsTmqError::from)?;
318 return Ok(json);
319 }
320 unreachable!()
321 }
322 async fn fetch_raw_meta(&self) -> RawResult<RawMeta> {
323 let req_id = self.sender.req_id();
324 let msg = TmqSend::FetchRaw(MessageArgs {
325 req_id,
326 message_id: self.message_id,
327 });
328 let data = self.sender.send_recv(msg).await?;
329 if let TmqRecvData::Bytes(bytes) = data {
330 let message_type = bytes.as_ref().read_u64().unwrap();
331 debug_assert_eq!(message_type, 3, "should be meta message type");
332 let raw = RawMeta::new(bytes.slice(8..)); return Ok(raw);
334 }
335 unreachable!()
336 }
337}
338
339#[derive(Debug)]
340pub struct Meta(WsMessageBase);
341
342#[async_trait::async_trait]
352impl IsAsyncMeta for Meta {
353 async fn as_raw_meta(&self) -> RawResult<RawMeta> {
354 self.0.fetch_raw_meta().await
355 }
356
357 async fn as_json_meta(&self) -> RawResult<JsonMeta> {
358 self.0.fetch_json_meta().await
359 }
360}
361
362impl SyncOnAsync for Meta {}
363
364#[derive(Debug)]
365pub struct Data(WsMessageBase);
366
367impl Data {
368 pub async fn fetch_block(&self) -> RawResult<Option<RawBlock>> {
369 self.0.fetch_raw_block().await
370 }
371}
372
373impl Iterator for Data {
374 type Item = RawResult<RawBlock>;
375
376 fn next(&mut self) -> Option<Self::Item> {
377 taos_query::block_in_place_or_global(self.fetch_block()).transpose()
378 }
379}
380
381#[async_trait::async_trait]
382impl IsAsyncData for Data {
383 async fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
384 self.0
385 .fetch_raw_meta()
386 .await
387 .map(|raw| unsafe { std::mem::transmute(raw) })
388 }
389
390 async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
391 self.fetch_block().await
392 }
393}
394
395impl IsData for Data {
396 fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
397 taos_query::block_in_place_or_global(self.0.fetch_raw_meta())
398 .map(|raw| unsafe { std::mem::transmute(raw) })
399 }
400
401 fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
402 taos_query::block_in_place_or_global(self.fetch_block())
403 }
404}
405pub enum WsMessageSet {
406 Meta(Meta),
407 Data(Data),
408}
409
410impl WsMessageSet {
411 pub const fn message_type(&self) -> MessageType {
412 match self {
413 WsMessageSet::Meta(_) => MessageType::Meta,
414 WsMessageSet::Data(_) => MessageType::Data,
415 }
416 }
417}
418
419impl Consumer {
420 async fn poll_wait(&self) -> RawResult<(Offset, MessageSet<Meta, Data>)> {
444 let elapsed = tokio::time::Instant::now();
445 loop {
446 let req_id = self.sender.req_id();
447 let action = TmqSend::Poll {
448 req_id,
449 blocking_time: 0,
450 };
451
452 let data = self.sender.send_recv(action).await?;
453
454 match data {
455 TmqRecvData::Poll(TmqPoll {
456 message_id,
457 database,
458 have_message,
459 topic,
460 vgroup_id,
461 message_type,
462 }) => {
463 if have_message {
464 let dur = elapsed.elapsed();
465 let offset = Offset {
466 message_id,
467 database,
468 topic,
469 vgroup_id,
470 };
471 let message = WsMessageBase {
472 sender: self.sender.clone(),
473 message_id,
474 };
475 log::trace!("Got message in {}ms", dur.as_millis());
476 break match message_type {
477 MessageType::Meta => Ok((offset, MessageSet::Meta(Meta(message)))),
478 MessageType::Data => Ok((offset, MessageSet::Data(Data(message)))),
479 MessageType::MetaData => Ok((
480 offset,
481 MessageSet::MetaData(
482 Meta(message),
483 Data(WsMessageBase {
484 sender: self.sender.clone(),
485 message_id,
486 }),
487 ),
488 )),
489 MessageType::Invalid => unreachable!(),
490 };
492 } else {
493 tokio::time::sleep(Duration::from_millis(500)).await;
494 continue;
495 }
496 }
497 _ => unreachable!(),
498 }
499 }
500 }
501 pub(crate) async fn poll_timeout(
502 &self,
503 timeout: Duration,
504 ) -> RawResult<Option<(Offset, MessageSet<Meta, Data>)>> {
505 let sleep = tokio::time::sleep(timeout);
506 tokio::pin!(sleep);
507 tokio::select! {
508 _ = &mut sleep, if !sleep.is_elapsed() => {
509 Ok(None)
510 }
511 message = self.poll_wait() => {
512 Ok(Some(message?))
513 }
514 }
515 }
516}
517
518#[async_trait::async_trait]
519impl AsAsyncConsumer for Consumer {
520 type Offset = Offset;
521
522 type Meta = Meta;
523
524 type Data = Data;
525
526 async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
527 &mut self,
528 topics: I,
529 ) -> RawResult<()> {
530 self.topics = topics.into_iter().map(Into::into).collect_vec();
531 let req_id = self.sender.req_id();
532 let action = TmqSend::Subscribe {
533 req_id,
534 req: self.tmq_conf.clone(),
535 topics: self.topics.clone(),
536 conn: self.conn.clone(),
537 };
538 self.sender.send_recv(action).await?;
539
540 if let Some(offset) = self.tmq_conf.offset_seek.clone() {
543 let offsets = offset
545 .split(',')
546 .map(|s| {
547 s.split(':')
548 .map(|i| i.parse::<i64>().unwrap())
549 .collect_vec()
550 })
551 .collect_vec();
552 let topic_name = &self.topics[0];
553 for offset in offsets {
554 let vgroup_id = offset[0] as i32;
555 let offset = offset[1];
556 log::trace!(
557 "topic {} seeking to offset {} for vgroup {}",
558 &topic_name,
559 offset,
560 vgroup_id
561 );
562
563 let req_id = self.sender.req_id();
564 let action = TmqSend::Seek(OffsetSeekArgs {
565 req_id,
566 topic: topic_name.to_string(),
567 vgroup_id,
568 offset,
569 });
570
571 let _ = self
572 .sender
573 .send_recv(action)
574 .await
575 .unwrap_or(crate::consumer::messages::TmqRecvData::Seek { timing: 0 });
576 }
577 }
578
579 Ok(())
580 }
581
582 async fn unsubscribe(self) {
583 let req_id = self.sender.req_id();
584 log::trace!("unsubscribe {} start", req_id);
585 let action = TmqSend::Unsubscribe { req_id };
586 self.sender.send_recv(action).await.unwrap();
587 drop(self)
588 }
589
590 async fn recv_timeout(
591 &self,
592 timeout: taos_query::tmq::Timeout,
593 ) -> RawResult<
594 Option<(
595 Self::Offset,
596 taos_query::tmq::MessageSet<Self::Meta, Self::Data>,
597 )>,
598 > {
599 match timeout {
600 Timeout::Never | Timeout::None => self.poll_timeout(Duration::MAX).await,
601 Timeout::Duration(timeout) => self.poll_timeout(timeout).await,
602 }
603 }
604
605 async fn commit(&self, offset: Self::Offset) -> RawResult<()> {
606 let req_id = self.sender.req_id();
607 let action = TmqSend::Commit(MessageArgs {
608 req_id,
609 message_id: offset.message_id,
610 });
611
612 let _ = self.sender.send_recv(action).await?;
613 Ok(())
614 }
615
616 async fn commit_offset(
617 &self,
618 topic_name: &str,
619 vgroup_id: VGroupId,
620 offset: i64,
621 ) -> RawResult<()> {
622 let req_id = self.sender.req_id();
623 let action = TmqSend::CommitOffset(OffsetSeekArgs {
624 req_id,
625 topic: topic_name.to_string(),
626 vgroup_id,
627 offset,
628 });
629
630 let _ = self.sender.send_recv(action).await?;
631 Ok(())
632 }
633
634 async fn list_topics(&self) -> RawResult<Vec<String>> {
635 let topics = self.topics.clone();
636 Ok(topics)
637 }
638
639 async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
640 let topics = self.topics.clone();
641 log::trace!("topics: {:?}", topics);
642
643 let mut ret = Vec::new();
644 for topic in topics {
645 let assignments = self.topic_assignment(&topic).await;
646 ret.push((topic, assignments));
647 }
648
649 Some(ret)
650 }
651
652 async fn topic_assignment(&self, topic: &str) -> Vec<Assignment> {
653 let req_id = self.sender.req_id();
654 let action = TmqSend::Assignment(TopicAssignmentArgs {
655 req_id,
656 topic: topic.to_string(),
657 });
658
659 let recv = self.sender.send_recv(action).await.ok();
660 match recv {
661 Some(TmqRecvData::Assignment(TopicAssignment { assignment, timing })) => {
662 log::trace!("timing: {:?}", timing);
664 log::trace!("assignment: {:?}", assignment);
665 assignment
666 }
667 _ => {
668 vec![]
669 }
670 }
671 }
672
673 async fn offset_seek(
674 &mut self,
675 topic: &str,
676 vgroup_id: VGroupId,
677 offset: i64,
678 ) -> RawResult<()> {
679 let req_id = self.sender.req_id();
680 let action = TmqSend::Seek(OffsetSeekArgs {
681 req_id,
682 topic: topic.to_string(),
683 vgroup_id,
684 offset,
685 });
686
687 let _ = self.sender.send_recv(action).await?;
688 Ok(())
689 }
690
691 async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
692 let req_id = self.sender.req_id();
693 let action = TmqSend::Committed(OffsetArgs {
694 req_id,
695 topic_vgroup_ids: vec![OffsetInnerArgs {
696 topic: topic.to_string(),
697 vgroup_id,
698 }],
699 });
700
701 let data = self.sender.send_recv(action).await?;
702 if let TmqRecvData::Committed { committed } = data {
703 let offset = committed[0];
704 Ok(offset)
705 } else {
706 Ok(0)
707 }
708 }
709
710 async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
711 let req_id = self.sender.req_id();
712 let action = TmqSend::Position(OffsetArgs {
713 req_id,
714 topic_vgroup_ids: vec![OffsetInnerArgs {
715 topic: topic.to_string(),
716 vgroup_id,
717 }],
718 });
719
720 let data = self.sender.send_recv(action).await?;
721 if let TmqRecvData::Position { position } = data {
722 let offset = position[0];
723 Ok(offset)
724 } else {
725 Ok(0)
726 }
727 }
728
729 fn default_timeout(&self) -> Timeout {
730 self.timeout
731 }
732}
733
734impl AsConsumer for Consumer {
735 type Offset = Offset;
736
737 type Meta = Meta;
738
739 type Data = Data;
740
741 fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
742 &mut self,
743 topics: I,
744 ) -> RawResult<()> {
745 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::subscribe(self, topics))
746 }
747
748 fn recv_timeout(
749 &self,
750 timeout: Timeout,
751 ) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
752 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::recv_timeout(
753 self, timeout,
754 ))
755 }
756
757 fn commit(&self, offset: Self::Offset) -> RawResult<()> {
758 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::commit(self, offset))
759 }
760
761 fn commit_offset(&self, topic_name: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
762 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::commit_offset(
763 self, topic_name, vgroup_id, offset,
764 ))
765 }
766
767 fn list_topics(&self) -> RawResult<Vec<String>> {
768 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::list_topics(self))
769 }
770
771 fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
772 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::assignments(self))
773 }
774
775 fn offset_seek(&mut self, topic: &str, vg_id: VGroupId, offset: i64) -> RawResult<()> {
776 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::offset_seek(
777 self, topic, vg_id, offset,
778 ))
779 }
780
781 fn committed(&self, topic: &str, vg_id: VGroupId) -> RawResult<i64> {
782 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::committed(
783 self, topic, vg_id,
784 ))
785 }
786
787 fn position(&self, topic: &str, vg_id: VGroupId) -> RawResult<i64> {
788 taos_query::block_in_place_or_global(<Consumer as AsAsyncConsumer>::position(
789 self, topic, vg_id,
790 ))
791 }
792}
793
794impl TmqBuilder {
795 pub fn new<D: IntoDsn>(dsn: D) -> RawResult<Self> {
796 let dsn = dsn.into_dsn()?;
797 let info = TaosBuilder::from_dsn(&dsn)?;
798 let group_id = dsn
799 .params
800 .get("group.id")
801 .map(ToString::to_string)
802 .ok_or_else(|| DsnError::RequireParam("group.id".to_string()))?;
803 let client_id = dsn.params.get("client.id").map(ToString::to_string);
804 let offset_reset = dsn.params.get("auto.offset.reset").map(ToString::to_string);
805 let auto_commit = dsn
806 .params
807 .get("enable.auto.commit")
808 .map(|s| {
809 if s.is_empty() {
810 "false".to_string()
811 } else {
812 s.to_string()
813 }
814 })
815 .unwrap_or("false".to_string());
816 let auto_commit_interval_ms = dsn.params.get("auto.commit.interval.ms").and_then(|s| {
817 if s.is_empty() {
818 None
819 } else {
820 Some(s.to_string())
821 }
822 });
823 let snapshot_enable = dsn
824 .params
825 .get("experimental.snapshot.enable")
826 .and_then(|s| {
827 if s.is_empty() {
828 None
829 } else {
830 Some(s.to_string())
831 }
832 })
833 .unwrap_or("false".to_string());
834 let with_table_name = dsn
835 .params
836 .get("with.table.name")
837 .and_then(|s| {
838 if s.is_empty() {
839 None
840 } else {
841 Some(s.to_string())
842 }
843 })
844 .unwrap_or("true".to_string());
845 let timeout = if let Some(timeout) = dsn.get("timeout") {
846 Timeout::from_str(timeout).map_err(RawError::from_any)?
847 } else {
848 Timeout::Duration(Duration::from_secs(5))
849 };
850 let offset_seek = dsn.params.get("offset").and_then(|s| {
851 if s.is_empty() {
852 None
853 } else {
854 Some(s.to_string())
855 }
856 });
857 let conf = TmqInit {
858 group_id,
859 client_id,
860 offset_reset,
861 auto_commit,
862 auto_commit_interval_ms,
863 snapshot_enable,
864 with_table_name,
865 offset_seek,
866 };
867
868 Ok(Self {
869 info,
870 conf,
871 timeout,
872 })
873 }
874
875 #[allow(dead_code)]
876 async fn tung_build_consumer(&self) -> RawResult<Consumer> {
877 let url = self.info.to_tmq_url();
878 let (ws, _) = connect_async(&url).await.map_err(WsTmqError::from)?;
880 let (mut sender, mut reader) = ws.split();
881
882 let queries = Arc::new(HashMap::<ReqId, tokio::sync::oneshot::Sender<_>>::new());
883
884 let queries_sender = queries.clone();
885 let msg_handler = queries.clone();
886
887 let (ws, mut msg_recv) = tokio::sync::mpsc::channel::<Message>(100);
888 let ws2 = ws.clone();
889
890 let (tx, mut rx) = watch::channel(false);
892 let mut close_listener = rx.clone();
893
894 let sending_url = url.clone();
895 static PING_INTERVAL: u64 = 29;
896 const PING: &[u8] = b"TAOSX";
897
898 tokio::spawn(async move {
899 let mut interval = time::interval(Duration::from_secs(PING_INTERVAL));
900
901 loop {
902 tokio::select! {
903 _ = interval.tick() => {
904 log::trace!("Check websocket message sender alive");
905 if let Err(err) = sender.send(Message::Ping(PING.to_vec())).await {
906 log::trace!("sending ping message to {sending_url} error: {err:?}");
907 let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
909
910 for k in keys {
914 if let Some((_, sender)) = msg_handler.remove(&k) {
915 let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
916 format!("WebSocket internal error: {err}"))));
917 }
918 }
919 }
920 }
921 Some(msg) = msg_recv.recv() => {
922 if msg.is_close() {
923 let _ = sender.send(msg).await;
924 let _ = sender.close().await;
925 break;
926 }
927 log::trace!("send message {msg:?}");
928 if let Err(err) = sender.send(msg).await {
929 log::trace!("sending message to {sending_url} error: {err:?}");
930 let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
931 for k in keys {
932 if let Some((_, sender)) = msg_handler.remove(&k) {
933 let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
934 format!("WebSocket internal error: {err}"))));
935 }
936 }
937 }
938 log::trace!("send message done");
939 }
940 _ = rx.changed() => {
941 let _= sender.send(Message::Close(None)).await;
942 let _ = sender.close().await;
943 log::trace!("close tmq sender");
944 break;
945 }
946 }
947 }
948 });
949
950 tokio::spawn(async move {
951 let instant = Instant::now();
952 'ws: loop {
953 tokio::select! {
954 Some(message) = reader.next() => {
955 match message {
956 Ok(message) => match message {
957 Message::Text(text) => {
958 log::trace!("json response: {}", text);
959 let v: TmqRecv = serde_json::from_str(&text).expect(&text);
960 let (req_id, recv, ok) = v.ok();
961 match &recv {
962 TmqRecvData::Subscribe => {
963 log::trace!("subscribe with: {:?}", req_id);
964
965 if let Some((_, sender)) = queries_sender.remove(&req_id)
966 {
967 let _ = sender.send(ok.map(|_|recv));
968 } else {
969 log::warn!("subscribe message received but no receiver alive");
970 }
971 },
972 TmqRecvData::Unsubscribe => {
973 log::trace!("unsubscribe with: {:?} successed", req_id);
974 if let Some((_, sender)) = queries_sender.remove(&req_id)
975 {
976 let _ = sender.send(ok.map(|_|recv));
977 } else {
978 log::warn!("unsubscribe message received but no receiver alive");
979 }
980 },
981 TmqRecvData::Poll(_) => {
982 if let Some((_, sender)) = queries_sender.remove(&req_id)
983 {
984 let _ = sender.send(ok.map(|_|recv));
985 } else {
986 log::warn!("poll message received but no receiver alive");
987 }
988 },
989 TmqRecvData::FetchJsonMeta { data }=> {
990 log::trace!("fetch json meta data: {:?}", data);
991 if let Some((_, sender)) = queries_sender.remove(&req_id)
992 {
993 let _ = sender.send(ok.map(|_|recv));
994 } else {
995 log::warn!("poll message received but no receiver alive");
996 }
997 }
998 TmqRecvData::FetchRaw { meta: _ }=> {
999 if let Some((_, sender)) = queries_sender.remove(&req_id)
1000 {
1001 let _ = sender.send(ok.map(|_|recv));
1002 } else {
1003 log::warn!("poll message received but no receiver alive");
1004 }
1005 }
1006 TmqRecvData::Commit=> {
1007 log::trace!("commit done: {:?}", recv);
1008 if let Some((_, sender)) = queries_sender.remove(&req_id)
1009 {
1010 let _ = sender.send(ok.map(|_|recv));
1011 } else {
1012 log::warn!("poll message received but no receiver alive");
1013 }
1014 }
1015 TmqRecvData::Fetch(fetch)=> {
1016 log::trace!("fetch done: {:?}", fetch);
1017 if let Some((_, sender)) = queries_sender.remove(&req_id)
1018 {
1019 let _ = sender.send(ok.map(|_|recv));
1020 } else {
1021 log::warn!("poll message received but no receiver alive");
1022 }
1023 }
1024 TmqRecvData::FetchBlock{ data: _ }=> {
1025 if let Some((_, sender)) = queries_sender.remove(&req_id) {
1026 let _ = sender.send(Err(RawError::new(
1027 WS_ERROR_NO::WEBSOCKET_ERROR.as_code(),
1028 format!("WebSocket internal error: {:?}", &text)
1029 )));
1030 }
1031 break 'ws;
1032 }
1033 TmqRecvData::Assignment(assignment)=> {
1034 log::trace!("assignment done: {:?}", assignment);
1035 if let Some((_, sender)) = queries_sender.remove(&req_id)
1036 {
1037 let _ = sender.send(ok.map(|_|recv));
1038 } else {
1039 log::warn!("assignment message received but no receiver alive");
1040 }
1041 }
1042 TmqRecvData::Seek { timing }=> {
1043 log::trace!("seek done: req_id {:?} timing {:?}", &req_id, timing);
1044 if let Some((_, sender)) = queries_sender.remove(&req_id)
1045 {
1046 let _ = sender.send(ok.map(|_|recv));
1047 } else {
1048 log::warn!("seek message received but no receiver alive");
1049 }
1050 }
1051 TmqRecvData::Committed { committed }=> {
1052 log::trace!("committed done: {:?}", committed);
1053 if let Some((_, sender)) = queries_sender.remove(&req_id)
1054 {
1055 let _ = sender.send(ok.map(|_|recv));
1056 } else {
1057 log::warn!("committed message received but no receiver alive");
1058 }
1059 }
1060 TmqRecvData::Position { position }=> {
1061 log::trace!("position done: {:?}", position);
1062 if let Some((_, sender)) = queries_sender.remove(&req_id)
1063 {
1064 let _ = sender.send(ok.map(|_|recv));
1065 } else {
1066 log::warn!("position message received but no receiver alive");
1067 }
1068 }
1069 TmqRecvData::CommitOffset { timing }=> {
1070 log::trace!("commit offset done: {:?}", timing);
1071 if let Some((_, sender)) = queries_sender.remove(&req_id) {
1072 let _ = sender.send(ok.map(|_|recv));
1073 } else {
1074 log::warn!("commit offset message received but no receiver alive");
1075 }
1076 }
1077
1078 _ => unreachable!("unknown tmq response"),
1079 }
1080 }
1081 Message::Binary(data) => {
1082 let mut bytes = Bytes::from(data);
1088 let part = bytes.slice(24..);
1089 use bytes::Buf;
1091 let timing = bytes.get_u64_le();
1092 let req_id = bytes.get_u64_le();
1093 let message_id = bytes.get_u64_le();
1094
1095
1096 log::trace!("[{:.2}ms] receive binary message with req_id {} message_id {}",
1097 Duration::from_nanos(timing).as_secs_f64() / 1000.,
1098 req_id, message_id);
1099
1100 if let Some((_, sender)) = queries_sender.remove(&req_id)
1101 {
1102 sender.send(Ok(TmqRecvData::Bytes(part))).unwrap();
1103 } else {
1104 log::warn!("poll message received but no receiver alive");
1105 }
1106
1107
1108 }
1109 Message::Close(close) => {
1110 log::warn!("websocket connection is closed (unexpected?)");
1111
1112 let keys = queries_sender.iter().map(|r| *r.key()).collect_vec();
1113 let err = if let Some(close) = close {
1114 format!("WebSocket internal error: {}", close)
1115 } else {
1116 "WebSocket internal error, connection is reset by server".to_string()
1117 };
1118 for k in keys {
1119 if let Some((_, sender)) = queries_sender.remove(&k) {
1120 let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), err.clone())));
1121 }
1122 }
1123 break 'ws;
1124 }
1125 Message::Ping(bytes) => {
1126 ws2.send(Message::Pong(bytes)).await.unwrap();
1127 }
1128 Message::Pong(bytes) => {
1129 if bytes == PING {
1130 log::trace!("ping/pong handshake success");
1131 } else {
1132 log::warn!("received (unexpected) pong message, do nothing");
1134 }
1135 }
1136 Message::Frame(frame) => {
1137 log::warn!("received (unexpected) frame message, do nothing");
1139 log::trace!("* frame data: {frame:?}");
1140 }
1141 },
1142 Err(err) => {
1143 let keys = queries_sender.iter().map(|r| *r.key()).collect_vec();
1144 for k in keys {
1145 if let Some((_, sender)) = queries_sender.remove(&k) {
1146 let _ = sender.send(Err(RawError::new(
1147 WS_ERROR_NO::CONN_CLOSED.as_code(),
1148 format!("WebSocket internal error: {err}")
1149 )));
1150 }
1151 }
1152 break 'ws;
1153 }
1154 }
1155 }
1156 _ = close_listener.changed() => {
1157 log::trace!("close reader task");
1158 break 'ws;
1159 }
1160 }
1161 }
1162 log::trace!("Consuming done in {:?}", instant.elapsed());
1163 });
1164 let (ws, mut _msg_recv) = tokio::sync::mpsc::channel(100);
1165 let ws_cloned: tokio::sync::mpsc::Sender<WsMessage<bytes::Bytes>> = ws.clone();
1166 let consumer = Consumer {
1167 conn: self.info.to_conn_request(),
1168 tmq_conf: self.conf.clone(),
1169 sender: WsTmqSender {
1170 req_id: Arc::new(AtomicU64::new(1)),
1171 queries,
1172 sender: ws_cloned,
1173 timeout: Timeout::Duration(Duration::MAX),
1174 },
1175 close_signal: tx,
1177 timeout: self.timeout,
1178 topics: vec![],
1179 };
1180
1181 Ok(consumer)
1182 }
1183
1184 async fn build_consumer(&self) -> RawResult<Consumer> {
1185 let url = self.info.to_tmq_url();
1186 let sending_url = url.clone();
1187
1188 let ws = self.info.build_tmq_stream(url).await?;
1189 let (mut reader, mut sender) = ws.split();
1190
1191 let queries = Arc::new(HashMap::<ReqId, tokio::sync::oneshot::Sender<_>>::new());
1192
1193 let queries_sender = queries.clone();
1194 let msg_handler = queries.clone();
1195
1196 let (ws, mut msg_recv) = tokio::sync::mpsc::channel::<WsMessage<bytes::Bytes>>(100);
1197 let ws2 = ws.clone();
1198
1199 let (tx, mut rx) = watch::channel(false);
1201 let mut close_listener = rx.clone();
1202
1203 static PING_INTERVAL: u64 = 29;
1204 const PING: &[u8] = b"TAOS";
1205
1206 tokio::spawn(async move {
1207 let mut interval = time::interval(Duration::from_secs(PING_INTERVAL));
1208
1209 loop {
1210 tokio::select! {
1211 _ = interval.tick() => {
1212 log::trace!("Check websocket message sender alive");
1213 if let Err(err) = sender.send(OpCode::Ping, &serde_json::to_vec(&PING).unwrap()).await {
1214
1215 log::trace!("sending ping message to {sending_url} error: {err:?}");
1216 let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
1217
1218 for k in keys {
1219 if let Some((_, sender)) = msg_handler.remove(&k) {
1220 let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
1221 format!("WebSocket internal error: {err}"))));
1222 }
1223 }
1224 }
1225 }
1226 Some(msg) = msg_recv.recv() => {
1227
1228 log::trace!("send message {msg:?}");
1229 let opcode = msg.code;
1230 let msg = msg.data;
1231 if let Err(err) = sender.send(opcode, &msg).await {
1232 log::trace!("sending message to {sending_url} error: {err:?}");
1233 let keys = msg_handler.iter().map(|r| *r.key()).collect_vec();
1234 for k in keys {
1235 if let Some((_, sender)) = msg_handler.remove(&k) {
1236 let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(),
1237 format!("WebSocket internal error: {err}"))));
1238 }
1239 }
1240 }
1241 log::trace!("send message done");
1242 }
1243 _ = rx.changed() => {
1244 let _ = sender.send(OpCode::Close, b"").await;
1245 log::trace!("close tmq sender");
1246 break;
1247 }
1248 }
1249 }
1250 });
1251
1252 tokio::spawn(async move {
1253 let instant = Instant::now();
1254 'ws: loop {
1255 tokio::select! {
1256 Ok(frame) = reader.receive() => {
1257 let (header, payload) = frame;
1258 let code = header.code;
1259 match code {
1260 OpCode::Text => {
1261 log::trace!("received json response: {payload}", payload = String::from_utf8_lossy(&payload));
1262 let v: TmqRecv = serde_json::from_slice(&payload).unwrap();
1263 let (req_id, recv, ok) = v.ok();
1264 match &recv {
1265 TmqRecvData::Subscribe => {
1266 log::trace!("subscribe with: {:?}", req_id);
1267
1268 if let Some((_, sender)) = queries_sender.remove(&req_id)
1269 {
1270 let _ = sender.send(ok.map(|_|recv));
1271 } else {
1272 log::warn!("subscribe message received but no receiver alive");
1273 }
1274 },
1275 TmqRecvData::Unsubscribe => {
1276 log::trace!("unsubscribe with: {:?} successed", req_id);
1277 if let Some((_, sender)) = queries_sender.remove(&req_id)
1278 {
1279 let _ = sender.send(ok.map(|_|recv));
1280 } else {
1281 log::warn!("unsubscribe message received but no receiver alive");
1282 }
1283 },
1284 TmqRecvData::Poll(_) => {
1285 if let Some((_, sender)) = queries_sender.remove(&req_id)
1286 {
1287 let _ = sender.send(ok.map(|_|recv));
1288 } else {
1289 log::warn!("poll message received but no receiver alive");
1290 }
1291 },
1292 TmqRecvData::FetchJsonMeta { data }=> {
1293 log::trace!("fetch json meta data: {:?}", data);
1294 if let Some((_, sender)) = queries_sender.remove(&req_id)
1295 {
1296 let _ = sender.send(ok.map(|_|recv));
1297 } else {
1298 log::warn!("poll message received but no receiver alive");
1299 }
1300 }
1301 TmqRecvData::FetchRaw { meta: _ }=> {
1302 if let Some((_, sender)) = queries_sender.remove(&req_id)
1303 {
1304 let _ = sender.send(ok.map(|_|recv));
1305 } else {
1306 log::warn!("poll message received but no receiver alive");
1307 }
1308 }
1309 TmqRecvData::Commit=> {
1310 log::trace!("commit done: {:?}", recv);
1311 if let Some((_, sender)) = queries_sender.remove(&req_id)
1312 {
1313 let _ = sender.send(ok.map(|_|recv));
1314 } else {
1315 log::warn!("poll message received but no receiver alive");
1316 }
1317 }
1318 TmqRecvData::Fetch(fetch)=> {
1319 log::trace!("fetch done: {:?}", fetch);
1320 if let Some((_, sender)) = queries_sender.remove(&req_id)
1321 {
1322 let _ = sender.send(ok.map(|_|recv));
1323 } else {
1324 log::warn!("poll message received but no receiver alive");
1325 }
1326 }
1327 TmqRecvData::FetchBlock{ data: _ }=> {
1328 if let Some((_, sender)) = queries_sender.remove(&req_id) {
1329 let _ = sender.send(Err(RawError::new(
1330 WS_ERROR_NO::WEBSOCKET_ERROR.as_code(),
1331 format!("WebSocket internal error")
1332 )));
1333 }
1334 break 'ws;
1335 }
1336 TmqRecvData::Assignment(assignment)=> {
1337 log::trace!("assignment done: {:?}", assignment);
1338 if let Some((_, sender)) = queries_sender.remove(&req_id)
1339 {
1340 let _ = sender.send(ok.map(|_|recv));
1341 } else {
1342 log::warn!("assignment message received but no receiver alive");
1343 }
1344 }
1345 TmqRecvData::Seek { timing }=> {
1346 log::trace!("seek done: req_id {:?} timing {:?}", &req_id, timing);
1347 if let Some((_, sender)) = queries_sender.remove(&req_id)
1348 {
1349 let _ = sender.send(ok.map(|_|recv));
1350 } else {
1351 log::warn!("seek message received but no receiver alive");
1352 }
1353 }
1354 TmqRecvData::Committed { committed }=> {
1355 log::trace!("committed done: {:?}", committed);
1356 if let Some((_, sender)) = queries_sender.remove(&req_id)
1357 {
1358 let _ = sender.send(ok.map(|_|recv));
1359 } else {
1360 log::warn!("committed message received but no receiver alive");
1361 }
1362 }
1363 TmqRecvData::Position { position }=> {
1364 log::trace!("position done: {:?}", position);
1365 if let Some((_, sender)) = queries_sender.remove(&req_id)
1366 {
1367 let _ = sender.send(ok.map(|_|recv));
1368 } else {
1369 log::warn!("position message received but no receiver alive");
1370 }
1371 }
1372 TmqRecvData::CommitOffset { timing }=> {
1373 log::trace!("commit offset done: {:?}", timing);
1374 if let Some((_, sender)) = queries_sender.remove(&req_id) {
1375 let _ = sender.send(ok.map(|_|recv));
1376 } else {
1377 log::warn!("commit offset message received but no receiver alive");
1378 }
1379 }
1380 _ => unreachable!("unknown tmq response"),
1381 }
1382 }
1383 OpCode::Binary => {
1384 let block = payload.to_vec();
1385 let mut slice = block.as_slice();
1386 use taos_query::util::InlinableRead;
1387 let offset = 24;
1388 let part = slice[offset..].to_vec();
1389
1390 let _timing = {
1391 let timing = slice.read_u64().unwrap();
1392 Duration::from_nanos(timing as _)
1393 };
1394
1395 let req_id = slice.read_u64().unwrap();
1396
1397 if let Some((_, sender)) = queries_sender.remove(&req_id) {
1398 log::trace!("send data to fetches with id {}", req_id);
1399 sender.send(Ok(TmqRecvData::Bytes(part.into()))).unwrap();
1400 } else {
1401 log::warn!("req_id {req_id} not detected, message might be lost");
1402 }
1403 }
1404 OpCode::Close => {
1405 log::warn!("websocket connection is closed normally");
1406 let mut keys = Vec::new();
1407 for e in queries_sender.iter() {
1408 keys.push(*e.key());
1409 }
1410 for k in keys {
1411 if let Some((_, sender)) = queries_sender.remove(&k) {
1412 let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), "received close message")));
1413 }
1414 }
1415 break 'ws;
1416 }
1417 OpCode::Ping => {
1418 let bytes = payload.to_vec();
1419 ws2.send(WsMessage{
1420 code: OpCode::Pong,
1421 data: bytes.into(),
1422 close_code: None
1423 }).await.unwrap();
1424 }
1425 OpCode::Pong => {
1426 log::trace!("received pong message, do nothing");
1428 }
1429 _ => {
1430 let frame = payload;
1431 log::warn!("received (unexpected) frame message, do nothing");
1433 log::trace!("* frame data: {frame:?}");
1434 }
1435 }
1436 }
1437 _ = close_listener.changed() => {
1438 log::trace!("close reader task");
1439 break 'ws;
1440 }
1441 }
1442 }
1443 log::trace!("Consuming done in {:?}", instant.elapsed());
1444 });
1445 let consumer = Consumer {
1446 conn: self.info.to_conn_request(),
1447 tmq_conf: self.conf.clone(),
1448 sender: WsTmqSender {
1449 req_id: Arc::new(AtomicU64::new(1)),
1450 queries,
1451 sender: ws,
1452 timeout: Timeout::Duration(Duration::MAX),
1453 },
1454 close_signal: tx,
1456 timeout: self.timeout,
1457 topics: vec![],
1458 };
1459
1460 Ok(consumer)
1461 }
1462}
1463
1464#[derive(Debug)]
1465pub struct Consumer {
1466 conn: WsConnReq,
1467 tmq_conf: TmqInit,
1468 sender: WsTmqSender,
1469 close_signal: watch::Sender<bool>,
1470 timeout: Timeout,
1471 topics: Vec<String>,
1472}
1473
1474impl Drop for Consumer {
1475 fn drop(&mut self) {
1476 let _ = self.close_signal.send(true);
1477 }
1478}
1479#[derive(Debug)]
1480pub struct Offset {
1481 message_id: MessageId,
1482 database: String,
1483 topic: String,
1484 vgroup_id: i32,
1485}
1486
1487impl IsOffset for Offset {
1488 fn database(&self) -> &str {
1489 &self.database
1490 }
1491
1492 fn topic(&self) -> &str {
1493 &self.topic
1494 }
1495
1496 fn vgroup_id(&self) -> i32 {
1497 self.vgroup_id
1498 }
1499}
1500
1501#[derive(Debug, Error)]
1502pub enum WsTmqError {
1503 #[error("{0}")]
1504 Dsn(#[from] DsnError),
1505 #[error("{0}")]
1506 FetchError(#[from] oneshot::error::RecvError),
1507 #[error("{0}")]
1508 SendError(#[from] tokio::sync::mpsc::error::SendError<WsMessage<bytes::Bytes>>),
1509 #[error(transparent)]
1510 SendTimeoutError(#[from] tokio::sync::mpsc::error::SendTimeoutError<WsMessage<bytes::Bytes>>),
1511 #[error("{0}")]
1512 DeError(#[from] DeError),
1513 #[error("Deserialize json error: {0}")]
1514 JsonError(#[from] serde_json::Error),
1515 #[error("WebSocket internal[ws-tool] error: {0}")]
1516 WsErrorWst(#[from] WsErrorWst),
1517 #[error("{0}")]
1518 WsError(#[from] WsError),
1519 #[error("{0}")]
1520 TaosError(#[from] RawError),
1521 #[error("Receive timeout in {0}")]
1522 QueryTimeout(String),
1523}
1524
1525unsafe impl Send for WsTmqError {}
1526
1527unsafe impl Sync for WsTmqError {}
1528
1529impl WsTmqError {
1530 pub const fn errno(&self) -> Code {
1531 match self {
1532 WsTmqError::TaosError(error) => error.code(),
1533 _ => Code::FAILED,
1534 }
1535 }
1536 pub fn errstr(&self) -> String {
1537 match self {
1538 WsTmqError::TaosError(error) => error.message().to_string(),
1539 _ => format!("{}", self),
1540 }
1541 }
1542}
1543
1544impl From<WsTmqError> for RawError {
1545 fn from(value: WsTmqError) -> Self {
1546 match value {
1547 WsTmqError::TaosError(error) => error,
1548 error => {
1549 let code = error.errno();
1550 if code == Code::FAILED {
1551 RawError::from_any(error)
1552 } else {
1553 RawError::new(code, error.to_string())
1554 }
1555 }
1556 }
1557 }
1558}
1559
1560#[cfg(test)]
1561mod tests {
1562 use std::time::Duration;
1563
1564 use super::{TaosBuilder, TmqBuilder};
1565
1566 #[tokio::test]
1567 async fn test_ws_tmq_meta_batch() -> anyhow::Result<()> {
1568 use taos_query::prelude::*;
1569 let _ = tracing_subscriber::fmt()
1570 .with_file(true)
1571 .with_line_number(true)
1572 .with_max_level(tracing::Level::DEBUG)
1573 .compact()
1574 .try_init();
1575
1576 let taos = TaosBuilder::from_dsn("taos://localhost:6041")?
1577 .build()
1578 .await?;
1579 taos.exec_many([
1580 "drop topic if exists ws_tmq_meta_batch",
1581 "drop database if exists ws_tmq_meta_batch",
1582 "create database ws_tmq_meta_batch wal_retention_period 3600",
1583 "create topic ws_tmq_meta_batch with meta as database ws_tmq_meta_batch",
1584 "use ws_tmq_meta_batch",
1585 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1587 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1588 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1589 tags(t1 json)",
1590 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1592 "create table tb1 using stb1 tags(NULL)",
1593 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1594 NULL, NULL, NULL, NULL, NULL,
1595 NULL, NULL, NULL, NULL)
1596 tb1 values(now, true, -2, -3, -4, -5, \
1597 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1598 254, 65534, 1, 1)",
1599 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1601 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1602 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1603 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1604 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1605 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1606 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1608 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1609 254, 65534, 1, 1)",
1610 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1611 NULL, NULL, NULL, NULL, NULL,
1612 NULL, NULL, NULL, NULL)",
1613 "create table `table` (ts timestamp, v int)",
1615 "alter table stb1 add column new1 bool",
1617 "alter table stb1 add column new2 tinyint",
1618 "alter table stb1 add column new10 nchar(16)",
1619 "alter table stb1 modify column new10 nchar(32)",
1620 "alter table stb1 drop column new10",
1621 "alter table stb1 drop column new2",
1622 "alter table stb1 drop column new1",
1623 "alter table `stb2` add tag new1 bool",
1625 "alter table `stb2` rename tag new1 new1_new",
1626 "alter table `stb2` modify tag t10 nchar(32)",
1627 "alter table `stb2` drop tag new1_new",
1628 "alter table `table` add column new1 bool",
1630 "alter table `table` add column new2 tinyint",
1631 "alter table `table` add column new10 nchar(16)",
1632 "alter table `table` modify column new10 nchar(32)",
1633 "alter table `table` rename column new10 new10_new",
1634 "alter table `table` drop column new10_new",
1635 "alter table `table` drop column new2",
1636 "alter table `table` drop column new1",
1637 ])
1645 .await?;
1646
1647 taos.exec_many([
1648 "drop database if exists ws_tmq_meta_batch2",
1649 "create database if not exists ws_tmq_meta_batch2 wal_retention_period 3600",
1650 "use ws_tmq_meta_batch2",
1651 ])
1652 .await?;
1653
1654 let builder = TmqBuilder::new(
1655 "taos://localhost:6041?group.id=10&timeout=5s&auto.offset.reset=earliest&msg.enable.batchmeta=true&experimental.snapshot.enable=true",
1656 )?;
1657 let mut consumer = builder.build_consumer().await?;
1658 consumer.subscribe(["ws_tmq_meta_batch"]).await?;
1659
1660 {
1661 let mut stream = consumer.stream();
1662
1663 let mut count = 0;
1664 while let Some((offset, message)) = stream.try_next().await? {
1665 let _ = offset.topic();
1668 let _ = offset.database();
1669 let _ = offset.vgroup_id();
1670 count += 1;
1671
1672 match message {
1678 MessageSet::Meta(meta) => {
1679 let _raw = meta.as_raw_meta().await?;
1680 let json = meta.as_json_meta().await?;
1684 tracing::info!(count, batch.size = json.iter().len(), "{json:?}");
1685 for meta in &json {
1687 let sql = meta.to_string();
1688 tracing::debug!(count, "sql: {}", sql);
1689 if let Err(err) = taos.exec(sql).await {
1690 match err.code() {
1691 Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
1692 Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
1693 Code::COLUMN_EXISTS => log::trace!("column already exists"),
1694 Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
1695 Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
1696 Code::MODIFIED_ALREADY => log::trace!("modified already done"),
1697 Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
1698 Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
1699 _ => {
1700 tracing::error!(count, "{}", err);
1701 }
1702 }
1703 }
1704 }
1705 }
1706 MessageSet::Data(data) => {
1707 while let Some(_data) = data.fetch_block().await? {
1709 }
1712 }
1713 _ => unreachable!(),
1714 }
1715 consumer.commit(offset).await?;
1716 }
1717 }
1718 consumer.unsubscribe().await;
1719
1720 tokio::time::sleep(Duration::from_secs(2)).await;
1721
1722 taos.exec_many([
1723 "drop database ws_tmq_meta_batch2",
1724 "drop topic ws_tmq_meta_batch",
1725 "drop database ws_tmq_meta_batch",
1726 ])
1727 .await?;
1728 Ok(())
1729 }
1730
1731 #[tokio::test]
1732 async fn test_ws_tmq_meta() -> anyhow::Result<()> {
1733 use taos_query::prelude::*;
1734 let _ = tracing_subscriber::fmt()
1735 .with_file(true)
1736 .with_line_number(true)
1737 .with_max_level(tracing::Level::DEBUG)
1738 .compact()
1739 .try_init();
1740
1741 let taos = TaosBuilder::from_dsn("taos://localhost:6041")?
1742 .build()
1743 .await?;
1744 taos.exec_many([
1745 "drop topic if exists ws_tmq_meta",
1746 "drop database if exists ws_tmq_meta",
1747 "create database ws_tmq_meta wal_retention_period 3600",
1748 "create topic ws_tmq_meta with meta as database ws_tmq_meta",
1749 "use ws_tmq_meta",
1750 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1752 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1753 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1754 tags(t1 json)",
1755 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1757 "create table tb1 using stb1 tags(NULL)",
1758 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1759 NULL, NULL, NULL, NULL, NULL,
1760 NULL, NULL, NULL, NULL)
1761 tb1 values(now, true, -2, -3, -4, -5, \
1762 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1763 254, 65534, 1, 1)",
1764 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1766 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1767 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1768 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1769 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1770 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1771 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1773 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1774 254, 65534, 1, 1)",
1775 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1776 NULL, NULL, NULL, NULL, NULL,
1777 NULL, NULL, NULL, NULL)",
1778 "create table `table` (ts timestamp, v int)",
1780 "alter table stb1 add column new1 bool",
1782 "alter table stb1 add column new2 tinyint",
1783 "alter table stb1 add column new10 nchar(16)",
1784 "alter table stb1 modify column new10 nchar(32)",
1785 "alter table stb1 drop column new10",
1786 "alter table stb1 drop column new2",
1787 "alter table stb1 drop column new1",
1788 "alter table `stb2` add tag new1 bool",
1790 "alter table `stb2` rename tag new1 new1_new",
1791 "alter table `stb2` modify tag t10 nchar(32)",
1792 "alter table `stb2` drop tag new1_new",
1793 "alter table `table` add column new1 bool",
1795 "alter table `table` add column new2 tinyint",
1796 "alter table `table` add column new10 nchar(16)",
1797 "alter table `table` modify column new10 nchar(32)",
1798 "alter table `table` rename column new10 new10_new",
1799 "alter table `table` drop column new10_new",
1800 "alter table `table` drop column new2",
1801 "alter table `table` drop column new1",
1802 ])
1810 .await?;
1811
1812 taos.exec_many([
1813 "drop database if exists ws_tmq_meta2",
1814 "create database if not exists ws_tmq_meta2 wal_retention_period 3600",
1815 "use ws_tmq_meta2",
1816 ])
1817 .await?;
1818
1819 let builder = TmqBuilder::new(
1820 "taos://localhost:6041?group.id=10&timeout=5s&auto.offset.reset=earliest",
1821 )?;
1822 let mut consumer = builder.build_consumer().await?;
1823 consumer.subscribe(["ws_tmq_meta"]).await?;
1824
1825 {
1826 let mut stream = consumer.stream();
1827
1828 let mut count = 0;
1829 while let Some((offset, message)) = stream.try_next().await? {
1830 let _ = offset.topic();
1833 let _ = offset.database();
1834 let _ = offset.vgroup_id();
1835 count += 1;
1836
1837 match message {
1843 MessageSet::Meta(meta) => {
1844 let _raw = meta.as_raw_meta().await?;
1845 let json = meta.as_json_meta().await?;
1849 for meta in &json {
1850 let sql = meta.to_string();
1851 log::debug!("sql: {}", sql);
1852 if let Err(err) = taos.exec(sql).await {
1853 match err.code() {
1854 Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
1855 Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
1856 Code::COLUMN_EXISTS => log::trace!("column already exists"),
1857 Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
1858 Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
1859 Code::MODIFIED_ALREADY => log::trace!("modified already done"),
1860 Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
1861 Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
1862 _ => {
1863 tracing::error!(count, "{}", err);
1864 }
1865 }
1866 }
1867 }
1868 }
1869 MessageSet::Data(data) => {
1870 while let Some(_data) = data.fetch_block().await? {
1872 }
1875 }
1876 _ => unreachable!(),
1877 }
1878 consumer.commit(offset).await?;
1879 }
1880 }
1881 consumer.unsubscribe().await;
1882
1883 tokio::time::sleep(Duration::from_secs(2)).await;
1884
1885 taos.exec_many([
1886 "drop database ws_tmq_meta2",
1887 "drop topic ws_tmq_meta",
1888 "drop database ws_tmq_meta",
1889 ])
1890 .await?;
1891 Ok(())
1892 }
1893
1894 #[test]
1895 fn test_ws_tmq_meta_sync() -> anyhow::Result<()> {
1896 use taos_query::prelude::sync::*;
1897 let taos = TaosBuilder::from_dsn("ws://localhost:6041")?.build()?;
1902 taos.exec_many([
1903 "drop topic if exists ws_tmq_meta_sync",
1904 "drop database if exists ws_tmq_meta_sync",
1905 "create database ws_tmq_meta_sync wal_retention_period 3600",
1906 "create topic ws_tmq_meta_sync with meta as database ws_tmq_meta_sync",
1907 "use ws_tmq_meta_sync",
1908 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1910 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
1911 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1912 tags(t1 json)",
1913 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
1915 "create table tb1 using stb1 tags(NULL)",
1916 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
1917 NULL, NULL, NULL, NULL, NULL,
1918 NULL, NULL, NULL, NULL)
1919 tb1 values(now, true, -2, -3, -4, -5, \
1920 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1921 254, 65534, 1, 1)",
1922 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
1924 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
1925 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
1926 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
1927 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
1928 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
1929 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
1931 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
1932 254, 65534, 1, 1)",
1933 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
1934 NULL, NULL, NULL, NULL, NULL,
1935 NULL, NULL, NULL, NULL)",
1936 "create table `table` (ts timestamp, v int)",
1938 "alter table stb1 add column new1 bool",
1940 "alter table stb1 add column new2 tinyint",
1941 "alter table stb1 add column new10 nchar(16)",
1942 "alter table stb1 modify column new10 nchar(32)",
1943 "alter table stb1 drop column new10",
1944 "alter table stb1 drop column new2",
1945 "alter table stb1 drop column new1",
1946 "alter table `stb2` add tag new1 bool",
1948 "alter table `stb2` rename tag new1 new1_new",
1949 "alter table `stb2` modify tag t10 nchar(32)",
1950 "alter table `stb2` drop tag new1_new",
1951 "alter table `table` add column new1 bool",
1953 "alter table `table` add column new2 tinyint",
1954 "alter table `table` add column new10 nchar(16)",
1955 "alter table `table` modify column new10 nchar(32)",
1956 "alter table `table` rename column new10 new10_new",
1957 "alter table `table` drop column new10_new",
1958 "alter table `table` drop column new2",
1959 "alter table `table` drop column new1",
1960 "drop table `table`",
1962 "drop table `tb2`, `tb1`",
1964 "drop table `stb2`",
1966 "drop table `stb1`",
1967 ])?;
1968
1969 taos.exec_many([
1970 "drop database if exists ws_tmq_meta_sync2",
1971 "create database if not exists ws_tmq_meta_sync2 wal_retention_period 3600",
1972 "use ws_tmq_meta_sync2",
1973 ])?;
1974
1975 let builder = TmqBuilder::new(
1976 "taos://localhost:6041?group.id=10&timeout=1000ms&auto.offset.reset=earliest",
1977 )?;
1978 let mut consumer = builder.build()?;
1979 consumer.subscribe(["ws_tmq_meta_sync"])?;
1980
1981 let topics = consumer.list_topics();
1982 log::debug!("topics: {:?}", topics);
1983
1984 let iter = consumer.iter_with_timeout(Timeout::from_secs(1));
1985
1986 for msg in iter {
1987 let (offset, message) = msg?;
1988 let _ = offset.topic();
1991 let _ = offset.database();
1992 let _ = offset.vgroup_id();
1993
1994 match message {
1999 MessageSet::Meta(meta) => {
2000 let _raw = meta.as_raw_meta()?;
2001 let json = meta.as_json_meta()?;
2005 for meta in &json {
2006 let sql = meta.to_string();
2007 log::debug!("sql: {}", sql);
2008 if let Err(err) = taos.exec(sql) {
2009 match err.code() {
2010 Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
2011 Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
2012 Code::COLUMN_EXISTS => log::trace!("column already exists"),
2013 Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
2014 Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
2015 Code::MODIFIED_ALREADY => log::trace!("modified already done"),
2016 Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
2017 Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
2018 _ => {
2019 log::error!("{}", err);
2020 }
2021 }
2022 }
2023 }
2024 }
2025 MessageSet::Data(data) => {
2026 for block in data {
2028 let _block = block?;
2029 }
2032 }
2033 _ => unreachable!(),
2034 }
2035 consumer.commit(offset)?;
2036 }
2037
2038 let assignments = consumer.assignments();
2040 log::debug!("assignments all: {:?}", assignments);
2041
2042 if let Some(assignments) = assignments {
2043 for (topic, assignment_vec) in assignments {
2044 for assignment in assignment_vec {
2045 log::debug!("assignment: {:?} {:?}", topic, assignment);
2046 let vgroup_id = assignment.vgroup_id();
2047 let end = assignment.end();
2048
2049 let position = consumer.position(&topic, vgroup_id);
2050 log::debug!("position: {:?}", position);
2051 let committed = consumer.committed(&topic, vgroup_id);
2052 log::debug!("committed: {:?}", committed);
2053
2054 let res = consumer.offset_seek(&topic, vgroup_id, end);
2055 log::debug!("seek: {:?}", res);
2056
2057 let position = consumer.position(&topic, vgroup_id);
2058 log::debug!("after seek position: {:?}", position);
2059 let committed = consumer.committed(&topic, vgroup_id);
2060 log::debug!("after seek committed: {:?}", committed);
2061
2062 let res = consumer.commit_offset(&topic, vgroup_id, end);
2063 log::debug!("commit offset: {:?}", res);
2064
2065 let position = consumer.position(&topic, vgroup_id);
2066 log::debug!("after commit offset position: {:?}", position);
2067 let committed = consumer.committed(&topic, vgroup_id);
2068 log::debug!("after commit offset committed: {:?}", committed);
2069 }
2070 }
2071 }
2072
2073 consumer.unsubscribe();
2074
2075 std::thread::sleep(Duration::from_secs(5));
2076
2077 taos.exec_many([
2078 "drop database ws_tmq_meta_sync2",
2079 "drop topic ws_tmq_meta_sync",
2080 "drop database ws_tmq_meta_sync",
2081 ])?;
2082 Ok(())
2083 }
2084
2085 #[test]
2086 fn test_ws_tmq_metadata() -> anyhow::Result<()> {
2087 use taos_query::prelude::sync::*;
2088 let taos = TaosBuilder::from_dsn("ws://localhost:6041")?.build()?;
2093 taos.exec_many([
2094 "drop topic if exists ws_tmq_meta_sync3",
2095 "drop database if exists ws_tmq_meta_sync3",
2096 "create database ws_tmq_meta_sync3 wal_retention_period 3600",
2097 "create topic ws_tmq_meta_sync3 with meta as database ws_tmq_meta_sync3",
2098 "use ws_tmq_meta_sync3",
2099 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2101 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2102 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2103 tags(t1 json)",
2104 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2106 "create table tb1 using stb1 tags(NULL)",
2107 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2108 NULL, NULL, NULL, NULL, NULL,
2109 NULL, NULL, NULL, NULL)
2110 tb1 values(now, true, -2, -3, -4, -5, \
2111 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2112 254, 65534, 1, 1)",
2113 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2115 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2116 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2117 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2118 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2119 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2120 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2122 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2123 254, 65534, 1, 1)",
2124 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2125 NULL, NULL, NULL, NULL, NULL,
2126 NULL, NULL, NULL, NULL)",
2127 "create table `table` (ts timestamp, v int)",
2129 "alter table stb1 add column new1 bool",
2131 "alter table stb1 add column new2 tinyint",
2132 "alter table stb1 add column new10 nchar(16)",
2133 "alter table stb1 modify column new10 nchar(32)",
2134 "alter table stb1 drop column new10",
2135 "alter table stb1 drop column new2",
2136 "alter table stb1 drop column new1",
2137 "alter table `stb2` add tag new1 bool",
2139 "alter table `stb2` rename tag new1 new1_new",
2140 "alter table `stb2` modify tag t10 nchar(32)",
2141 "alter table `stb2` drop tag new1_new",
2142 "alter table `table` add column new1 bool",
2144 "alter table `table` add column new2 tinyint",
2145 "alter table `table` add column new10 nchar(16)",
2146 "alter table `table` modify column new10 nchar(32)",
2147 "alter table `table` rename column new10 new10_new",
2148 "alter table `table` drop column new10_new",
2149 "alter table `table` drop column new2",
2150 "alter table `table` drop column new1",
2151 "drop table `table`",
2153 "drop table `tb2`, `tb1`",
2155 "drop table `stb2`",
2157 "drop table `stb1`",
2158 ])?;
2159
2160 taos.exec_many([
2161 "drop database if exists ws_tmq_meta_sync32",
2162 "create database if not exists ws_tmq_meta_sync32 wal_retention_period 3600",
2163 "use ws_tmq_meta_sync32",
2164 ])?;
2165
2166 let builder = TmqBuilder::new(
2167 "taos://localhost:6041?group.id=10&timeout=1000ms&auto.offset.reset=earliest",
2168 )?;
2169 let mut consumer = builder.build()?;
2170 consumer.subscribe(["ws_tmq_meta_sync3"])?;
2171
2172 let iter = consumer.iter_with_timeout(Timeout::from_secs(1));
2173
2174 for msg in iter {
2175 let (offset, message) = msg?;
2176 let _ = offset.topic();
2179 let _ = offset.database();
2180 let _ = offset.vgroup_id();
2181
2182 match message {
2187 MessageSet::Meta(meta) => {
2188 let _raw = meta.as_raw_meta()?;
2189 let json = meta.as_json_meta()?;
2193 for json in &json {
2194 let sql = json.to_string();
2195 log::debug!("sql: {}", sql);
2196 if let Err(err) = taos.exec(sql) {
2197 match err.code() {
2198 Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
2199 Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
2200 Code::COLUMN_EXISTS => log::trace!("column already exists"),
2201 Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
2202 Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
2203 Code::MODIFIED_ALREADY => log::trace!("modified already done"),
2204 Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
2205 Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
2206 _ => {
2207 log::error!("{}", err);
2208 }
2209 }
2210 }
2211 }
2212 }
2213 MessageSet::Data(data) => {
2214 for block in data {
2216 let _block = block?;
2217 }
2220 }
2221 _ => unreachable!(),
2222 }
2223 consumer.commit(offset)?;
2224 }
2225 consumer.unsubscribe();
2226
2227 std::thread::sleep(Duration::from_secs(5));
2228
2229 taos.exec_many([
2230 "drop database ws_tmq_meta_sync32",
2231 "drop topic ws_tmq_meta_sync3",
2232 "drop database ws_tmq_meta_sync3",
2233 ])?;
2234 Ok(())
2235 }
2236
2237 #[cfg(feature = "rustls")]
2238 #[tokio::test]
2239 async fn test_consumer_cloud() -> anyhow::Result<()> {
2240 use taos_query::prelude::*;
2241 std::env::set_var("RUST_LOG", "debug");
2242 let dsn = std::env::var("TDENGINE_ClOUD_DSN");
2246 if dsn.is_err() {
2247 println!("Skip test when not in cloud");
2248 return Ok(());
2249 }
2250 let dsn = dsn.unwrap();
2251
2252 let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
2253 taos.exec_many([
2254 "drop topic if exists ws_tmq_meta",
2255 "drop database if exists ws_tmq_meta",
2256 "create database ws_tmq_meta wal_retention_period 3600",
2257 "create topic ws_tmq_meta with meta as database ws_tmq_meta",
2258 "use ws_tmq_meta",
2259 "create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2261 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
2262 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2263 tags(t1 json)",
2264 "create table tb0 using stb1 tags('{\"name\":\"value\"}')",
2266 "create table tb1 using stb1 tags(NULL)",
2267 "insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
2268 NULL, NULL, NULL, NULL, NULL,
2269 NULL, NULL, NULL, NULL)
2270 tb1 values(now, true, -2, -3, -4, -5, \
2271 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2272 254, 65534, 1, 1)",
2273 "create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
2275 c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
2276 c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
2277 tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
2278 t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
2279 t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
2280 "create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
2282 '2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
2283 254, 65534, 1, 1)",
2284 "create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
2285 NULL, NULL, NULL, NULL, NULL,
2286 NULL, NULL, NULL, NULL)",
2287 "create table `table` (ts timestamp, v int)",
2289 "alter table stb1 add column new1 bool",
2291 "alter table stb1 add column new2 tinyint",
2292 "alter table stb1 add column new10 nchar(16)",
2293 "alter table stb1 modify column new10 nchar(32)",
2294 "alter table stb1 drop column new10",
2295 "alter table stb1 drop column new2",
2296 "alter table stb1 drop column new1",
2297 "alter table `stb2` add tag new1 bool",
2299 "alter table `stb2` rename tag new1 new1_new",
2300 "alter table `stb2` modify tag t10 nchar(32)",
2301 "alter table `stb2` drop tag new1_new",
2302 "alter table `table` add column new1 bool",
2304 "alter table `table` add column new2 tinyint",
2305 "alter table `table` add column new10 nchar(16)",
2306 "alter table `table` modify column new10 nchar(32)",
2307 "alter table `table` rename column new10 new10_new",
2308 "alter table `table` drop column new10_new",
2309 "alter table `table` drop column new2",
2310 "alter table `table` drop column new1",
2311 ])
2312 .await?;
2313
2314 taos.exec_many([
2315 "drop database if exists ws_tmq_meta2",
2316 "create database if not exists ws_tmq_meta2 wal_retention_period 3600",
2317 "use ws_tmq_meta2",
2318 ])
2319 .await?;
2320
2321 let builder = TmqBuilder::new(&dsn)?;
2322 let mut consumer = builder.build_consumer().await?;
2323 consumer.subscribe(["ws_tmq_meta"]).await?;
2324
2325 {
2326 let mut stream = consumer.stream();
2327
2328 while let Some((offset, message)) = stream.try_next().await? {
2329 let _ = offset.topic();
2332 let _ = offset.database();
2333 let _ = offset.vgroup_id();
2334
2335 match message {
2336 MessageSet::Meta(meta) => {
2337 let _raw = meta.as_raw_meta().await?;
2338
2339 let json = meta.as_json_meta().await?;
2341 for meta in &json {
2342 let sql = meta.to_string();
2343 log::debug!("sql: {}", sql);
2344 if let Err(err) = taos.exec(sql).await {
2345 match err.code() {
2346 Code::TAG_ALREADY_EXIST => log::trace!("tag already exists"),
2347 Code::TAG_NOT_EXIST => log::trace!("tag not exist"),
2348 Code::COLUMN_EXISTS => log::trace!("column already exists"),
2349 Code::COLUMN_NOT_EXIST => log::trace!("column not exists"),
2350 Code::INVALID_COLUMN_NAME => log::trace!("invalid column name"),
2351 Code::MODIFIED_ALREADY => log::trace!("modified already done"),
2352 Code::TABLE_NOT_EXIST => log::trace!("table does not exists"),
2353 Code::STABLE_NOT_EXIST => log::trace!("stable does not exists"),
2354 _ => {
2355 log::error!("{}", err);
2356 }
2357 }
2358 }
2359 }
2360 }
2361 MessageSet::Data(data) => {
2362 while let Some(_data) = data.fetch_block().await? {
2364 }
2366 }
2367 _ => unreachable!(),
2368 }
2369 consumer.commit(offset).await?;
2370 }
2371 }
2372 consumer.unsubscribe().await;
2373
2374 tokio::time::sleep(Duration::from_secs(2)).await;
2375
2376 taos.exec_many([
2377 "drop database ws_tmq_meta2",
2378 "drop topic ws_tmq_meta",
2379 "drop database ws_tmq_meta",
2380 ])
2381 .await?;
2382 Ok(())
2383 }
2384}