use taos_query::{
prelude::{AsAsyncConsumer, RawMeta, Timeout},
tmq::{Assignment, VGroupId},
RawBlock, RawResult,
};
#[derive(Debug)]
enum TmqBuilderInner {
Native(crate::sys::TmqBuilder),
Ws(taos_ws::consumer::TmqBuilder),
}
#[derive(Debug)]
enum ConsumerInner {
Native(crate::sys::Consumer),
Ws(taos_ws::consumer::Consumer),
}
#[derive(Debug)]
enum OffsetInner {
Native(crate::sys::tmq::Offset),
Ws(taos_ws::consumer::Offset),
}
#[derive(Debug)]
enum MetaInner {
Native(crate::sys::tmq::Meta),
Ws(taos_ws::consumer::Meta),
}
#[derive(Debug)]
enum DataInner {
Native(crate::sys::tmq::Data),
Ws(taos_ws::consumer::Data),
}
#[derive(Debug)]
pub struct Offset(OffsetInner);
#[derive(Debug)]
pub struct Meta(MetaInner);
#[derive(Debug)]
pub struct Data(DataInner);
pub type MessageSet<Meta, Data> = taos_query::tmq::MessageSet<Meta, Data>;
#[derive(Debug)]
pub struct TmqBuilder(TmqBuilderInner);
#[derive(Debug)]
pub struct Consumer(ConsumerInner);
impl taos_query::TBuilder for TmqBuilder {
type Target = Consumer;
fn available_params() -> &'static [&'static str] {
&[]
}
fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
let dsn = dsn.into_dsn()?;
match (dsn.driver.as_str(), dsn.protocol.as_deref()) {
("ws" | "wss" | "http" | "https" | "taosws", _) => Ok(Self(TmqBuilderInner::Ws(
taos_ws::consumer::TmqBuilder::from_dsn(dsn)?,
))),
("taos" | "tmq", None) => Ok(Self(TmqBuilderInner::Native(
crate::sys::TmqBuilder::from_dsn(dsn)?,
))),
("taos" | "tmq", Some("ws" | "wss" | "http" | "https")) => Ok(Self(
TmqBuilderInner::Ws(taos_ws::consumer::TmqBuilder::from_dsn(dsn)?),
)),
(driver, _) => Err(taos_query::DsnError::InvalidDriver(driver.to_string()).into()),
}
}
fn client_version() -> &'static str {
""
}
fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
match &self.0 {
TmqBuilderInner::Native(b) => match &mut conn.0 {
ConsumerInner::Native(taos) => Ok(b.ping(taos)?),
_ => unreachable!(),
},
TmqBuilderInner::Ws(b) => match &mut conn.0 {
ConsumerInner::Ws(taos) => Ok(b.ping(taos)?),
_ => unreachable!(),
},
}
}
fn ready(&self) -> bool {
match &self.0 {
TmqBuilderInner::Native(b) => b.ready(),
TmqBuilderInner::Ws(b) => b.ready(),
}
}
fn build(&self) -> RawResult<Self::Target> {
match &self.0 {
TmqBuilderInner::Native(b) => Ok(Consumer(ConsumerInner::Native(b.build()?))),
TmqBuilderInner::Ws(b) => Ok(Consumer(ConsumerInner::Ws(b.build()?))),
}
}
fn server_version(&self) -> RawResult<&str> {
todo!()
}
fn is_enterprise_edition(&self) -> RawResult<bool> {
todo!()
}
fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
match &self.0 {
TmqBuilderInner::Native(b) => Ok(b.get_edition()?),
TmqBuilderInner::Ws(b) => Ok(b.get_edition()?),
}
}
}
#[async_trait::async_trait]
impl taos_query::AsyncTBuilder for TmqBuilder {
type Target = Consumer;
fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
let dsn = dsn.into_dsn()?;
match (dsn.driver.as_str(), dsn.protocol.as_deref()) {
("ws" | "wss" | "http" | "https" | "taosws", _) => Ok(Self(TmqBuilderInner::Ws(
taos_ws::consumer::TmqBuilder::from_dsn(dsn)?,
))),
("taos" | "tmq", None) => Ok(Self(TmqBuilderInner::Native(
crate::sys::TmqBuilder::from_dsn(dsn)?,
))),
("taos" | "tmq", Some("ws" | "wss" | "http" | "https")) => Ok(Self(
TmqBuilderInner::Ws(taos_ws::consumer::TmqBuilder::from_dsn(dsn)?),
)),
(driver, _) => Err(taos_query::DsnError::InvalidDriver(driver.to_string()).into()),
}
}
fn client_version() -> &'static str {
""
}
async fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
match &self.0 {
TmqBuilderInner::Native(b) => match &mut conn.0 {
ConsumerInner::Native(taos) => Ok(b.ping(taos).await?),
_ => unreachable!(),
},
TmqBuilderInner::Ws(b) => match &mut conn.0 {
ConsumerInner::Ws(taos) => Ok(b.ping(taos).await?),
_ => unreachable!(),
},
}
}
async fn ready(&self) -> bool {
match &self.0 {
TmqBuilderInner::Native(b) => b.ready().await,
TmqBuilderInner::Ws(b) => b.ready().await,
}
}
async fn build(&self) -> RawResult<Self::Target> {
match &self.0 {
TmqBuilderInner::Native(b) => Ok(Consumer(ConsumerInner::Native(b.build().await?))),
TmqBuilderInner::Ws(b) => Ok(Consumer(ConsumerInner::Ws(b.build().await?))),
}
}
async fn server_version(&self) -> RawResult<&str> {
match &self.0 {
TmqBuilderInner::Native(b) => Ok(b.server_version().await?),
TmqBuilderInner::Ws(b) => Ok(b.server_version().await?),
}
}
async fn is_enterprise_edition(&self) -> RawResult<bool> {
match &self.0 {
TmqBuilderInner::Native(b) => Ok(b.is_enterprise_edition().await?),
TmqBuilderInner::Ws(b) => Ok(b.is_enterprise_edition().await?),
}
}
async fn get_edition(&self) -> RawResult<taos_query::util::Edition> {
match &self.0 {
TmqBuilderInner::Native(b) => Ok(b.get_edition().await?),
TmqBuilderInner::Ws(b) => Ok(b.get_edition().await?),
}
}
}
impl taos_query::tmq::IsOffset for Offset {
fn database(&self) -> &str {
match &self.0 {
OffsetInner::Native(offset) => {
<crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::database(offset)
}
OffsetInner::Ws(offset) => {
<taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::database(offset)
}
}
}
fn topic(&self) -> &str {
match &self.0 {
OffsetInner::Native(offset) => {
<crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::topic(offset)
}
OffsetInner::Ws(offset) => {
<taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::topic(offset)
}
}
}
fn vgroup_id(&self) -> taos_query::tmq::VGroupId {
match &self.0 {
OffsetInner::Native(offset) => {
<crate::sys::tmq::Offset as taos_query::tmq::IsOffset>::vgroup_id(offset)
}
OffsetInner::Ws(offset) => {
<taos_ws::consumer::Offset as taos_query::tmq::IsOffset>::vgroup_id(offset)
}
}
}
}
#[async_trait::async_trait]
impl taos_query::tmq::IsAsyncMeta for Meta {
async fn as_raw_meta(&self) -> RawResult<RawMeta> {
match &self.0 {
MetaInner::Native(data) => {
<crate::sys::tmq::Meta as taos_query::tmq::IsAsyncMeta>::as_raw_meta(data)
.await
.map_err(Into::into)
}
MetaInner::Ws(data) => {
<taos_ws::consumer::Meta as taos_query::tmq::IsAsyncMeta>::as_raw_meta(data)
.await
.map_err(Into::into)
}
}
}
async fn as_json_meta(&self) -> RawResult<taos_query::common::JsonMeta> {
match &self.0 {
MetaInner::Native(data) => {
<crate::sys::tmq::Meta as taos_query::tmq::IsAsyncMeta>::as_json_meta(data)
.await
.map_err(Into::into)
}
MetaInner::Ws(data) => {
<taos_ws::consumer::Meta as taos_query::tmq::IsAsyncMeta>::as_json_meta(data)
.await
.map_err(Into::into)
}
}
}
}
#[async_trait::async_trait]
impl taos_query::tmq::IsAsyncData for Data {
async fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
match &self.0 {
DataInner::Native(data) => {
<crate::sys::tmq::Data as taos_query::tmq::IsAsyncData>::as_raw_data(data)
.await
.map_err(Into::into)
}
DataInner::Ws(data) => {
<taos_ws::consumer::Data as taos_query::tmq::IsAsyncData>::as_raw_data(data)
.await
.map_err(Into::into)
}
}
}
async fn fetch_raw_block(&self) -> RawResult<Option<taos_query::RawBlock>> {
match &self.0 {
DataInner::Native(data) => {
<crate::sys::tmq::Data as taos_query::tmq::IsAsyncData>::fetch_raw_block(data)
.await
.map_err(Into::into)
}
DataInner::Ws(data) => {
<taos_ws::consumer::Data as taos_query::tmq::IsAsyncData>::fetch_raw_block(data)
.await
.map_err(Into::into)
}
}
}
}
#[async_trait::async_trait]
impl AsAsyncConsumer for Consumer {
type Offset = Offset;
type Meta = Meta;
type Data = Data;
fn default_timeout(&self) -> Timeout {
match &self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::default_timeout(c)
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::default_timeout(c)
}
}
}
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> RawResult<()> {
match &mut self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::subscribe(c, topics)
.await
.map_err(Into::into)
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::subscribe(c, topics)
.await
.map_err(Into::into)
}
}
}
async fn unsubscribe(self) {
match self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::unsubscribe(c).await
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::unsubscribe(c).await
}
}
}
async fn recv_timeout(
&self,
timeout: Timeout,
) -> RawResult<Option<(Self::Offset, MessageSet<Self::Meta, Self::Data>)>> {
match &self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::recv_timeout(c, timeout)
.await
.map_err(Into::into)
.map(|msg| {
msg.map(|(offset, msg)| {
(
Offset(OffsetInner::Native(offset)),
match msg {
MessageSet::Meta(meta) => {
MessageSet::Meta(Meta(MetaInner::Native(meta)))
}
MessageSet::Data(data) => {
MessageSet::Data(Data(DataInner::Native(data)))
}
MessageSet::MetaData(meta, data) => MessageSet::MetaData(
Meta(MetaInner::Native(meta)),
Data(DataInner::Native(data)),
),
},
)
})
})
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::recv_timeout(c, timeout)
.await
.map_err(Into::into)
.map(|msg| {
msg.map(|(offset, msg)| {
(
Offset(OffsetInner::Ws(offset)),
match msg {
taos_query::tmq::MessageSet::Meta(meta) => {
MessageSet::Meta(Meta(MetaInner::Ws(meta)))
}
taos_query::tmq::MessageSet::Data(data) => {
MessageSet::Data(Data(DataInner::Ws(data)))
}
taos_query::tmq::MessageSet::MetaData(meta, data) => {
MessageSet::MetaData(
Meta(MetaInner::Ws(meta)),
Data(DataInner::Ws(data)),
)
}
},
)
})
})
}
}
}
async fn commit(&self, offset: Self::Offset) -> RawResult<()> {
match &self.0 {
ConsumerInner::Native(c) => match offset.0 {
OffsetInner::Native(offset) => {
<crate::sys::Consumer as AsAsyncConsumer>::commit(c, offset)
.await
.map_err(Into::into)
}
OffsetInner::Ws(_) => unreachable!(),
},
ConsumerInner::Ws(c) => match offset.0 {
OffsetInner::Ws(offset) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::commit(c, offset)
.await
.map_err(Into::into)
}
_ => unreachable!(),
},
}
}
async fn commit_offset(&self, topic: &str, vgroup_id: VGroupId, offset: i64) -> RawResult<()> {
match &self.0 {
ConsumerInner::Native(c) => <crate::sys::Consumer as AsAsyncConsumer>::commit_offset(
c, topic, vgroup_id, offset,
)
.await
.map_err(Into::into),
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::commit_offset(
c, topic, vgroup_id, offset,
)
.await
.map_err(Into::into)
}
}
}
async fn list_topics(&self) -> RawResult<Vec<String>> {
match &self.0 {
ConsumerInner::Native(c) => <crate::sys::Consumer as AsAsyncConsumer>::list_topics(c)
.await
.map_err(Into::into),
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::list_topics(c)
.await
.map_err(Into::into)
}
}
}
async fn assignments(&self) -> Option<Vec<(String, Vec<Assignment>)>> {
match &self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::assignments(c).await
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::assignments(c).await
}
}
}
async fn topic_assignment(&self, topic: &str) -> Vec<Assignment> {
match &self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::topic_assignment(c, topic).await
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::topic_assignment(c, topic).await
}
}
}
async fn offset_seek(
&mut self,
topic: &str,
vgroup_id: VGroupId,
offset: i64,
) -> RawResult<()> {
match &mut self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::offset_seek(c, topic, vgroup_id, offset)
.await
.map_err(Into::into)
}
ConsumerInner::Ws(c) => <taos_ws::consumer::Consumer as AsAsyncConsumer>::offset_seek(
c, topic, vgroup_id, offset,
)
.await
.map_err(Into::into),
}
}
async fn committed(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
match &self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::committed(c, topic, vgroup_id)
.await
.map_err(Into::into)
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::committed(c, topic, vgroup_id)
.await
.map_err(Into::into)
}
}
}
async fn position(&self, topic: &str, vgroup_id: VGroupId) -> RawResult<i64> {
match &self.0 {
ConsumerInner::Native(c) => {
<crate::sys::Consumer as AsAsyncConsumer>::position(c, topic, vgroup_id)
.await
.map_err(Into::into)
}
ConsumerInner::Ws(c) => {
<taos_ws::consumer::Consumer as AsAsyncConsumer>::position(c, topic, vgroup_id)
.await
.map_err(Into::into)
}
}
}
}
impl taos_query::tmq::SyncOnAsync for Consumer {}
impl taos_query::tmq::SyncOnAsync for Data {}
impl taos_query::tmq::SyncOnAsync for Meta {}
impl Iterator for Data {
type Item = RawResult<RawBlock>;
fn next(&mut self) -> Option<Self::Item> {
match &self.0 {
DataInner::Native(data) => {
<crate::sys::tmq::Data as taos_query::tmq::IsData>::fetch_raw_block(data)
.transpose()
}
DataInner::Ws(data) => {
<taos_ws::consumer::Data as taos_query::tmq::IsData>::fetch_raw_block(data)
.transpose()
}
}
}
}
#[cfg(test)]
mod tests {
use super::TmqBuilder;
#[test]
fn builder() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");
let _tmq = TmqBuilder::from_dsn(dsn)?;
Ok(())
}
}
#[cfg(test)]
mod async_tests {
use std::{str::FromStr, time::Duration};
use super::TmqBuilder;
use crate::TaosBuilder;
#[tokio::test]
async fn test_ws_tmq_meta() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = std::env::var("TEST_DSN").unwrap_or("taos+ws://localhost:6041".to_string());
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_abc1";
taos.exec(format!("drop topic if exists {db}")).await?;
taos.exec(format!("drop database if exists {db}")).await?;
std::thread::sleep(std::time::Duration::from_secs(3));
taos.exec(format!(
"create database if not exists {db} wal_retention_period 3600"
))
.await?;
std::thread::sleep(std::time::Duration::from_secs(3));
taos.exec_many([
"create topic ws_abc1 with meta as database ws_abc1",
"use ws_abc1",
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
"drop table `table`",
"drop table `tb2`, `tb1`",
"drop table `stb2`",
"drop table `stb1`",
])
.await?;
taos.exec_many([
"drop database if exists db2",
"create database if not exists db2 wal_retention_period 3600",
"use db2",
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe(["ws_abc1"]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let _ = offset.topic();
let _ = offset.database();
let _ = offset.vgroup_id();
match message {
MessageSet::Meta(meta) => {
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
while let Some(data) = data.fetch_raw_block().await? {
dbg!(data.table_name());
dbg!(data);
}
}
MessageSet::MetaData(meta, data) => {
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
dbg!(data.table_name());
dbg!(data);
}
}
}
consumer.commit(offset).await?;
}
}
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(2)).await;
taos.exec_many([
"drop database db2",
"drop topic ws_abc1",
"drop database ws_abc1",
])
.await?;
Ok(())
}
#[tokio::test]
#[ignore]
async fn test_tmq() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "taos://localhost:6030".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
taos.exec_many([
"drop topic if exists ws_abc1",
"drop database if exists ws_abc1",
"create database ws_abc1 wal_retention_period 3600",
"create topic ws_abc1 with meta as database ws_abc1",
"use ws_abc1",
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
])
.await?;
taos.exec_many([
"drop database if exists db2",
"create database if not exists db2 wal_retention_period 3600",
"use db2",
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe(["ws_abc1"]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
"drop database db2",
"drop topic ws_abc1",
"drop database ws_abc1",
])
.await?;
Ok(())
}
#[tokio::test]
#[ignore]
async fn test_tmq_offset() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "tmq://localhost:6030?offset=10:20,11:40".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
taos.exec_many([
"drop topic if exists ws_abc1",
"drop database if exists ws_abc1",
"create database ws_abc1 wal_retention_period 3600",
"create topic ws_abc1 with meta as database ws_abc1",
"use ws_abc1",
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
])
.await?;
taos.exec_many([
"drop database if exists db2",
"create database if not exists db2 wal_retention_period 3600",
"use db2",
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe(["ws_abc1"]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
"drop database db2",
"drop topic ws_abc1",
"drop database ws_abc1",
])
.await?;
Ok(())
}
#[tokio::test]
async fn test_ws_tmq() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "taosws://localhost:6041".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_tmq_1";
let db2 = "ws_tmq_1_dest";
taos.exec_many([
format!("drop topic if exists {db}").as_str(),
format!("drop database if exists {db}").as_str(),
format!("create database {db} wal_retention_period 1").as_str(),
format!("create topic {db} with meta as database {db}").as_str(),
format!("use {db}").as_str(),
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
])
.await?;
taos.exec_many([
format!("drop database if exists {db2}"),
format!("create database if not exists {db2} wal_retention_period 1"),
format!("use {db2}"),
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe([db]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
log::error!("meta error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("metadata error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::info!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::info!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database {db2}"),
format!("drop topic {db}"),
format!("drop database {db}"),
])
.await?;
Ok(())
}
#[tokio::test]
async fn test_ws_flush_db() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "taosws://localhost:6041".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_tmq_flush_1";
let db2 = "ws_tmq_flush_1_dest";
taos.exec_many([
format!("drop topic if exists {db}").as_str(),
format!("drop database if exists {db}").as_str(),
format!("create database {db} wal_retention_period 1").as_str(),
format!("create topic {db} with meta as database {db}").as_str(),
format!("use {db}").as_str(),
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
])
.await?;
for _ in 0..1000 {
taos.exec(
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
)
.await?;
}
taos.exec(format!("flush database {db}")).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database if exists {db2}"),
format!("create database if not exists {db2} wal_retention_period 1"),
format!("use {db2}"),
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
dsn.params.insert(
"experimental.snapshot.enable".to_string(),
"true".to_string(),
);
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe([db]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
log::error!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::info!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::info!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database {db2}"),
format!("drop topic {db}"),
format!("drop database {db}"),
])
.await?;
Ok(())
}
#[tokio::test]
async fn test_ws_tmq_snapshot() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "taosws://localhost:6041".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_abc1_snapshot";
let db2 = "ws_abc1_snapshot_dest";
taos.exec_many([
format!("drop topic if exists {db}").as_str(),
format!("drop database if exists {db}").as_str(),
format!("create database {db} wal_retention_period 3600").as_str(),
format!("create topic {db} with meta as database {db}").as_str(),
format!("use {db}").as_str(),
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
])
.await?;
for _ in 0..100 {
taos.exec(
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
)
.await?;
}
taos.exec_many([
format!("drop database if exists {db2}"),
format!("create database if not exists {db2} wal_retention_period 3600"),
format!("use {db2}"),
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe([db]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
log::debug!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
dbg!(&assignments);
log::info!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::info!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::info!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database {db2}"),
format!("drop topic {db}"),
format!("drop database {db}"),
])
.await?;
Ok(())
}
#[tokio::test]
async fn test_ws_tmq_offset() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "tmq+ws://localhost:6041?offset=10:20,11:40".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_tmq_abc2";
let db2 = "ws_tmq_abc2_dest";
taos.exec(format!("drop topic if exists {db}")).await?;
taos.exec(format!("drop database if exists {db}")).await?;
std::thread::sleep(std::time::Duration::from_secs(3));
taos.exec(format!(
"create database if not exists {db} wal_retention_period 3600"
))
.await?;
std::thread::sleep(std::time::Duration::from_secs(3));
taos.exec_many([
format!("create topic {db} with meta as database {db}").as_str(),
format!("use {db}").as_str(),
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
])
.await?;
taos.exec_many([
format!("drop database if exists {db2}"),
format!("create database if not exists {db2} wal_retention_period 3600"),
format!("use {db2}"),
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
consumer.subscribe([db]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database {db2}"),
format!("drop topic {db}"),
format!("drop database {db}"),
])
.await?;
Ok(())
}
#[tokio::test]
async fn test_ws_tmq_committed() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "tmq+ws://localhost:6041?".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_tmq_committed";
let db2 = "ws_tmq_committed_dest";
taos.exec(format!("drop topic if exists {db}")).await?;
taos.exec(format!("drop database if exists {db}")).await?;
std::thread::sleep(std::time::Duration::from_secs(1));
taos.exec(format!(
"create database if not exists {db} wal_retention_period 3600"
))
.await?;
std::thread::sleep(std::time::Duration::from_secs(1));
taos.exec_many([
format!("create topic {db} with meta as database {db}").as_str(),
format!("use {db}").as_str(),
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
"drop table `table`",
"drop table `tb2`, `tb1`",
"drop table `stb2`",
"drop table `stb1`",
])
.await?;
taos.exec_many([
format!("drop database if exists {db2}"),
format!("create database if not exists {db2} wal_retention_period 3600"),
format!("use {db2}"),
])
.await?;
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
let topics = consumer.list_topics().await?;
log::info!("topics: {:?}", topics);
consumer.subscribe([db]).await?;
let topics = consumer.list_topics().await?;
log::info!("topics: {:?}", topics);
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
log::trace!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::info!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let committed = consumer.committed(topic, vgroup_id).await?;
log::info!("committed: {:?}", committed);
let position = consumer.position(topic, vgroup_id).await?;
log::info!("position: {:?}", position);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
let committed = consumer.committed(topic, vgroup_id).await?;
log::info!("after seek committed: {:?}", committed);
let position = consumer.position(topic, vgroup_id).await?;
log::info!("after seek position: {:?}", position);
let res = consumer.commit_offset(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("commit offset response: {:?}", res);
}
let committed = consumer.committed(topic, vgroup_id).await?;
log::info!("after commit committed: {:?}", committed);
let position = consumer.position(topic, vgroup_id).await?;
log::info!("after commit position: {:?}", position);
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::info!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database {db2}"),
format!("drop topic {db}"),
format!("drop database {db}"),
])
.await?;
Ok(())
}
}
#[cfg(feature = "deflate")]
#[cfg(test)]
mod tmq_deflate_tests {
use crate::{
query::infra::{ToMessage, WsRecv, WsSend},
*,
};
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tracing::*;
use tracing_subscriber::util::SubscriberInitExt;
use ws_tool::frame::OpCode;
#[cfg(feature = "deflate")]
#[tokio::test]
async fn test_build_stream_with_deflate() -> Result<(), anyhow::Error> {
let _subscriber = tracing_subscriber::fmt::fmt()
.with_max_level(Level::DEBUG)
.with_file(true)
.with_line_number(true)
.finish();
let _ = _subscriber.try_init();
let dsn = std::env::var("TEST_CLOUD_DSN").unwrap_or("http://localhost:6041".to_string());
let builder = TaosBuilder::from_dsn(dsn).unwrap();
let url = builder.to_query_url();
let ws = builder.ws_tool_build_stream(url).await.unwrap();
let (mut sink, mut source) = ws.split();
let version = WsSend::Version;
source
.send(OpCode::Text, &serde_json::to_vec(&version)?)
.await?;
let _handle = tokio::spawn(async move {
loop {
let frame = sink.receive().await.unwrap();
let (header, payload) = frame;
trace!("header.code: {:?}, payload: {:?}", &header.code, &payload);
let code = header.code;
match code {
OpCode::Binary => {
println!("{:?}", payload);
}
OpCode::Text => {
let recv: crate::query::infra::WsRecv =
serde_json::from_slice(&payload).unwrap();
info!("recv: {:?}", recv);
assert_eq!(recv.code, 0);
}
_ => (),
}
}
});
tokio::time::sleep(Duration::from_millis(1000)).await;
Ok(())
}
#[cfg(feature = "deflate")]
#[tokio::test]
async fn test_ws_tmq_deflate() -> taos_query::RawResult<()> {
use taos_query::prelude::*;
let dsn = "tmq+ws://localhost:6041?".to_string();
log::trace!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
let db = "ws_tmq_deflate";
let db2 = "ws_tmq_deflate_dest";
taos.exec(format!("drop topic if exists {db}")).await?;
taos.exec(format!("drop database if exists {db}")).await?;
std::thread::sleep(std::time::Duration::from_secs(1));
taos.exec(format!(
"create database if not exists {db} wal_retention_period 3600"
))
.await?;
std::thread::sleep(std::time::Duration::from_secs(1));
taos.exec_many([
format!("create topic {db} with meta as database {db}").as_str(),
format!("use {db}").as_str(),
"create table stb1(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(16),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 json)",
"create table tb0 using stb1 tags('{\"name\":\"value\"}')",
"create table tb1 using stb1 tags(NULL)",
"insert into tb0 values(now, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)
tb1 values(now, true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table stb2(ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,\
c6 timestamp, c7 float, c8 double, c9 varchar(10), c10 nchar(10),\
c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned)\
tags(t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint,\
t6 timestamp, t7 float, t8 double, t9 varchar(10), t10 nchar(16),\
t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)",
"create table tb2 using stb2 tags(true, -2, -3, -4, -5, \
'2022-02-02 02:02:02.222', -0.1, -0.12345678910, 'abc 和我', 'Unicode + 涛思',\
254, 65534, 1, 1)",
"create table tb3 using stb2 tags( NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL)",
"create table `table` (ts timestamp, v int)",
"alter table stb1 add column new1 bool",
"alter table stb1 add column new2 tinyint",
"alter table stb1 add column new10 nchar(16)",
"alter table stb1 modify column new10 nchar(32)",
"alter table stb1 drop column new10",
"alter table stb1 drop column new2",
"alter table stb1 drop column new1",
"alter table `stb2` add tag new1 bool",
"alter table `stb2` rename tag new1 new1_new",
"alter table `stb2` modify tag t10 nchar(32)",
"alter table `stb2` drop tag new1_new",
"alter table `table` add column new1 bool",
"alter table `table` add column new2 tinyint",
"alter table `table` add column new10 nchar(16)",
"alter table `table` modify column new10 nchar(32)",
"alter table `table` rename column new10 new10_new",
"alter table `table` drop column new10_new",
"alter table `table` drop column new2",
"alter table `table` drop column new1",
"drop table `table`",
"drop table `tb2`, `tb1`",
"drop table `stb2`",
"drop table `stb1`",
])
.await?;
taos.exec_many([
format!("drop database if exists {db2}"),
format!("create database if not exists {db2} wal_retention_period 3600"),
format!("use {db2}"),
])
.await?;
dsn.params.insert("group.id".to_string(), "ws_tmq_deflate_1".to_string());
dsn.params
.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
let topics = consumer.list_topics().await?;
log::info!("topics: {:?}", topics);
consumer.subscribe([db]).await?;
let topics = consumer.list_topics().await?;
log::info!("topics: {:?}", topics);
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::debug!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
log::trace!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::debug!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::debug!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("table_name: {:?}", data.table_name());
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::info!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
let committed = consumer.committed(topic, vgroup_id).await?;
log::info!("committed: {:?}", committed);
let position = consumer.position(topic, vgroup_id).await?;
log::info!("position: {:?}", position);
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
let committed = consumer.committed(topic, vgroup_id).await?;
log::info!("after seek committed: {:?}", committed);
let position = consumer.position(topic, vgroup_id).await?;
log::info!("after seek position: {:?}", position);
let res = consumer.commit_offset(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("commit offset response: {:?}", res);
}
let committed = consumer.committed(topic, vgroup_id).await?;
log::info!("after commit committed: {:?}", committed);
let position = consumer.position(topic, vgroup_id).await?;
log::info!("after commit position: {:?}", position);
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::info!("topic assignment: {:?}", topic_assignment);
}
let assignments = consumer.assignments().await.unwrap();
log::debug!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
format!("drop database {db2}"),
format!("drop topic {db}"),
format!("drop database {db}"),
])
.await?;
Ok(())
}
}