1use once_cell::sync::OnceCell;
2use taos_query::common::SmlData;
3use taos_query::prelude::RawResult;
4use taos_query::{common::RawMeta, AsyncQueryable};
5
6pub mod asyn;
7pub(crate) mod infra;
8pub use asyn::Error;
11pub use asyn::ResultSet;
12pub(crate) use asyn::WsTaos;
13pub(crate) use infra::WsConnReq;
14
15use crate::TaosBuilder;
16
17#[derive(Debug)]
18pub struct Taos {
19 pub(crate) dsn: TaosBuilder,
20 pub(crate) async_client: OnceCell<WsTaos>,
21 pub(crate) async_sml: OnceCell<crate::schemaless::WsTaos>,
22}
23
24impl Taos {
25 pub fn version(&self) -> &str {
26 crate::block_in_place_or_global(self.client()).version()
27 }
28
29 async fn client(&self) -> &WsTaos {
30 if let Some(ws) = self.async_client.get() {
31 ws
32 } else {
33 let async_client = WsTaos::from_wsinfo(&self.dsn).await.unwrap();
34 self.async_client.get_or_init(|| async_client)
35 }
36 }
37}
38
39unsafe impl Send for Taos {}
40
41unsafe impl Sync for Taos {}
42
43#[async_trait::async_trait]
44impl taos_query::AsyncQueryable for Taos {
45 type AsyncResultSet = asyn::ResultSet;
46
47 async fn query<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<Self::AsyncResultSet> {
48 if let Some(ws) = self.async_client.get() {
49 ws.s_query(sql.as_ref()).await
50 } else {
51 let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
52 self.async_client
53 .get_or_init(|| async_client)
54 .s_query(sql.as_ref())
55 .await
56 }
57 }
58
59 async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
60 &self,
61 sql: T,
62 req_id: u64,
63 ) -> RawResult<Self::AsyncResultSet> {
64 if let Some(ws) = self.async_client.get() {
65 ws.s_query_with_req_id(sql.as_ref(), req_id).await
66 } else {
67 let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
68 self.async_client
69 .get_or_init(|| async_client)
70 .s_query_with_req_id(sql.as_ref(), req_id)
71 .await
72 }
73 }
74
75 async fn write_raw_meta(&self, raw: &RawMeta) -> RawResult<()> {
76 if let Some(ws) = self.async_client.get() {
77 ws.write_meta(raw).await
78 } else {
79 let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
80 self.async_client
81 .get_or_init(|| async_client)
82 .write_meta(raw)
83 .await
84 }
85 }
86
87 async fn write_raw_block(&self, block: &taos_query::RawBlock) -> RawResult<()> {
88 if let Some(ws) = self.async_client.get() {
89 ws.write_raw_block(block).await
90 } else {
91 let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
92 self.async_client
93 .get_or_init(|| async_client)
94 .write_raw_block(block)
95 .await
96 }
97 }
98
99 async fn write_raw_block_with_req_id(
100 &self,
101 block: &taos_query::RawBlock,
102 req_id: u64,
103 ) -> RawResult<()> {
104 if let Some(ws) = self.async_client.get() {
105 ws.write_raw_block_with_req_id(block, req_id).await
106 } else {
107 let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
108 self.async_client
109 .get_or_init(|| async_client)
110 .write_raw_block_with_req_id(block, req_id)
111 .await
112 }
113 }
114
115 async fn put(&self, data: &SmlData) -> RawResult<()> {
116 if let Some(ws) = self.async_sml.get() {
117 ws.s_put(data).await
118 } else {
119 let async_sml = crate::schemaless::WsTaos::from_wsinfo(&self.dsn).await?;
120 self.async_sml.get_or_init(|| async_sml).s_put(data).await
121 }
122 }
123}
124
125impl taos_query::Queryable for Taos {
126 type ResultSet = asyn::ResultSet;
127
128 fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet> {
129 let sql = sql.as_ref();
130 taos_query::block_in_place_or_global(<Self as AsyncQueryable>::query(self, sql))
131 }
132
133 fn query_with_req_id<T: AsRef<str>>(&self, sql: T, req_id: u64) -> RawResult<Self::ResultSet> {
134 let sql = sql.as_ref();
135 taos_query::block_in_place_or_global(<Self as AsyncQueryable>::query_with_req_id(
136 self, sql, req_id,
137 ))
138 }
139
140 fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()> {
141 crate::block_in_place_or_global(<Self as AsyncQueryable>::write_raw_meta(self, meta))
142 }
143
144 fn write_raw_block(&self, block: &taos_query::RawBlock) -> RawResult<()> {
145 crate::block_in_place_or_global(<Self as AsyncQueryable>::write_raw_block(self, block))
146 }
147
148 fn write_raw_block_with_req_id(
149 &self,
150 block: &taos_query::RawBlock,
151 req_id: u64,
152 ) -> RawResult<()> {
153 crate::block_in_place_or_global(<Self as AsyncQueryable>::write_raw_block_with_req_id(
154 self, block, req_id,
155 ))
156 }
157
158 fn put(&self, sml_data: &SmlData) -> RawResult<()> {
159 crate::block_in_place_or_global(<Self as AsyncQueryable>::put(self, sml_data))
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use crate::TaosBuilder;
166 use bytes::Bytes;
167 use taos_query::util::hex::*;
168
169 #[test]
170 fn ws_sync_json() -> anyhow::Result<()> {
171 std::env::set_var("RUST_LOG", "debug");
172 use taos_query::prelude::sync::*;
174 let client = TaosBuilder::from_dsn("taosws://localhost:6041/")?.build()?;
175 let db = "ws_sync_json";
176 assert_eq!(client.exec(format!("drop database if exists {db}"))?, 0);
177 assert_eq!(client.exec(format!("create database {db} keep 36500"))?, 0);
178 assert_eq!(
179 client.exec(
180 format!("create table {db}.stb1(ts timestamp,\
181 b1 bool, c8i1 tinyint, c16i1 smallint, c32i1 int, c64i1 bigint,\
182 c8u1 tinyint unsigned, c16u1 smallint unsigned, c32u1 int unsigned, c64u1 bigint unsigned,\
183 cb1 binary(100), cn1 nchar(10), cvb1 varbinary(50), cg1 geometry(50),
184
185 b2 bool, c8i2 tinyint, c16i2 smallint, c32i2 int, c64i2 bigint,\
186 c8u2 tinyint unsigned, c16u2 smallint unsigned, c32u2 int unsigned, c64u2 bigint unsigned,\
187 cb2 binary(10), cn2 nchar(16), cvb2 varbinary(50), cg2 geometry(50)) tags (jt json)")
188 )?,
189 0
190 );
191 assert_eq!(
192 client.exec(format!(
193 r#"insert into {db}.tb1 using {db}.stb1 tags('{{"key":"数据"}}')
194 values(0, true, -1, -2, -3, -4, 1, 2, 3, 4, 'abc', '涛思', '\x123456', 'POINT(1 2)',
195 false,-5, -6, -7, -8, 5, 6, 7, 8, 'def', '数据', '\x654321', 'POINT(3 4)')
196 (65535,NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL, NULL, NULL, NULL,
197 NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL, NULL, NULL, NULL)"#
198 ))?,
199 2
200 );
201 assert_eq!(
202 client.exec(format!(
203 r#"insert into {db}.tb2 using {db}.stb1 tags(NULL)
204 values(1, true, -1, -2, -3, -4, 1, 2, 3, 4, 'abc', '涛思', '\x123456', 'POINT(1 2)',
205 false,-5, -6, -7, -8, 5, 6, 7, 8, 'def', '数据', '\x654321', 'POINT(3 4)')
206 (65536,NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL, NULL, NULL, NULL,
207 NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL, NULL, NULL, NULL)"#
208 ))?,
209 2
210 );
211
212 let mut rs = client.query(format!("select * from {db}.tb1 order by ts limit 1"))?;
214
215 #[derive(Debug, serde::Deserialize, PartialEq, Eq)]
216 #[allow(dead_code)]
217 struct A {
218 ts: String,
219 b1: bool,
220 c8i1: i8,
221 c16i1: i16,
222 c32i1: i32,
223 c64i1: i64,
224 c8u1: u8,
225 c16u1: u16,
226 c32u1: u32,
227 c64u1: u64,
228
229 c8i2: i8,
230 c16i2: i16,
231 c32i2: i32,
232 c64i2: i64,
233 c8u2: u8,
234 c16u2: u16,
235 c32u2: u32,
236 c64u2: u64,
237
238 cb1: String,
239 cb2: String,
240 cn1: String,
241 cn2: String,
242
243 cvb1: Bytes,
244 cvb2: Bytes,
245 cg1: Bytes,
246 cg2: Bytes,
247 }
248
249 use itertools::Itertools;
250 let values: Vec<A> = rs.deserialize::<A>().try_collect()?;
251
252 dbg!(&values);
253
254 assert_eq!(
255 values[0],
256 A {
257 ts: "1970-01-01T08:00:00+08:00".to_string(),
258 b1: true,
259 c8i1: -1,
260 c16i1: -2,
261 c32i1: -3,
262 c64i1: -4,
263 c8u1: 1,
264 c16u1: 2,
265 c32u1: 3,
266 c64u1: 4,
267 c8i2: -5,
268 c16i2: -6,
269 c32i2: -7,
270 c64i2: -8,
271 c8u2: 5,
272 c16u2: 6,
273 c32u2: 7,
274 c64u2: 8,
275 cb1: "abc".to_string(),
276 cb2: "def".to_string(),
277 cn1: "涛思".to_string(),
278 cn2: "数据".to_string(),
279 cvb1: Bytes::from(vec![0x12, 0x34, 0x56]),
280 cvb2: Bytes::from(vec![0x65, 0x43, 0x21]),
281 cg1: hex_string_to_bytes("0101000000000000000000F03F0000000000000040"),
282 cg2: hex_string_to_bytes("010100000000000000000008400000000000001040"),
283 }
284 );
285
286 assert_eq!(client.exec(format!("drop database {db}"))?, 0);
287 Ok(())
288 }
289
290 #[test]
291 fn ws_sync() -> anyhow::Result<()> {
292 use bytes::Bytes;
293 use taos_query::prelude::sync::*;
294 use taos_query::util::hex::*;
295
296 let client = TaosBuilder::from_dsn("ws://localhost:6041/")?.build()?;
297 assert_eq!(client.exec("drop database if exists ws_sync")?, 0);
298 assert_eq!(client.exec("create database ws_sync keep 36500")?, 0);
299 assert_eq!(
300 client.exec(
301 "create table ws_sync.tb1(ts timestamp,\
302 c8i1 tinyint, c16i1 smallint, c32i1 int, c64i1 bigint,\
303 c8u1 tinyint unsigned, c16u1 smallint unsigned, c32u1 int unsigned, c64u1 bigint unsigned,\
304 cb1 binary(100), cn1 nchar(10), cvb1 varbinary(50), cg1 geometry(50),\
305 c8i2 tinyint, c16i2 smallint, c32i2 int, c64i2 bigint,\
306 c8u2 tinyint unsigned, c16u2 smallint unsigned, c32u2 int unsigned, c64u2 bigint unsigned,\
307 cb2 binary(10), cn2 nchar(16), cvb2 varbinary(50), cg2 geometry(50))"
308 )?,
309 0
310 );
311 assert_eq!(
312 client.exec(
313 r#"insert into ws_sync.tb1 values(65535,
314 -1,-2,-3,-4, 1,2,3,4, 'abc', '涛思', '\x123456', 'POINT(1 2)',
315 -5,-6,-7,-8, 5,6,7,8, 'def', '数据', '\x654321', 'POINT(3 4)')"#
316 )?,
317 1
318 );
319
320 let mut rs = client.query("select * from ws_sync.tb1")?;
321
322 #[derive(Debug, serde::Deserialize, PartialEq, Eq)]
323 #[allow(dead_code)]
324 struct A {
325 ts: String,
326 c8i1: i8,
327 c16i1: i16,
328 c32i1: i32,
329 c64i1: i64,
330 c8u1: u8,
331 c16u1: u16,
332 c32u1: u32,
333 c64u1: u64,
334
335 c8i2: i8,
336 c16i2: i16,
337 c32i2: i32,
338 c64i2: i64,
339 c8u2: u8,
340 c16u2: u16,
341 c32u2: u32,
342 c64u2: u64,
343
344 cb1: String,
345 cb2: String,
346 cn1: String,
347 cn2: String,
348
349 cvb1: Bytes,
350 cvb2: Bytes,
351 cg1: Bytes,
352 cg2: Bytes,
353 }
354
355 use itertools::Itertools;
356 let values: Vec<A> = rs.deserialize::<A>().try_collect()?;
357
358 dbg!(&values);
359
360 assert_eq!(
361 values[0],
362 A {
363 ts: "1970-01-01T08:01:05.535+08:00".to_string(),
364 c8i1: -1,
365 c16i1: -2,
366 c32i1: -3,
367 c64i1: -4,
368 c8u1: 1,
369 c16u1: 2,
370 c32u1: 3,
371 c64u1: 4,
372 c8i2: -5,
373 c16i2: -6,
374 c32i2: -7,
375 c64i2: -8,
376 c8u2: 5,
377 c16u2: 6,
378 c32u2: 7,
379 c64u2: 8,
380 cb1: "abc".to_string(),
381 cb2: "def".to_string(),
382 cn1: "涛思".to_string(),
383 cn2: "数据".to_string(),
384 cvb1: Bytes::from(vec![0x12, 0x34, 0x56]),
385 cvb2: Bytes::from(vec![0x65, 0x43, 0x21]),
386 cg1: hex_string_to_bytes("0101000000000000000000F03F0000000000000040"),
387 cg2: hex_string_to_bytes("010100000000000000000008400000000000001040"),
388 }
389 );
390
391 assert_eq!(client.exec("drop database ws_sync")?, 0);
392 Ok(())
393 }
394
395 #[test]
396 fn ws_show_databases() -> anyhow::Result<()> {
397 use taos_query::prelude::sync::*;
398 let dsn = std::env::var("TEST_DSN").unwrap_or("taos:///".to_string());
399
400 let client = TaosBuilder::from_dsn(dsn)?.build()?;
401 let mut rs = client.query("show databases")?;
402 let values = rs.to_rows_vec()?;
403
404 dbg!(values);
405 Ok(())
406 }
407
408 async fn _ws_select_from_meters() -> anyhow::Result<()> {
410 std::env::set_var("RUST_LOG", "info");
411 use taos_query::prelude::*;
413 let dsn = "taos+ws:///test";
414 let client = TaosBuilder::from_dsn(dsn)?.build().await?;
415
416 let mut rs = client.query("select * from meters").await?;
417
418 let mut blocks = rs.blocks();
419 let mut nb = 0;
420 let mut nr = 0;
421 while let Some(block) = blocks.try_next().await? {
422 nb += 1;
423 nr += block.nrows();
424 }
425 let summary = rs.summary();
426 dbg!(summary, (nb, nr));
427 Ok(())
428 }
429
430 #[cfg(feature = "async")]
431 #[tokio::test]
432 async fn test_client() -> anyhow::Result<()> {
433 std::env::set_var("RUST_LOG", "debug");
434 use futures::TryStreamExt;
436 use taos_query::{AsyncFetchable, AsyncQueryable};
437
438 let client = TaosBuilder::from_dsn("ws://localhost:6041/")?.build()?;
439 assert_eq!(
440 client
441 .exec("create database if not exists ws_test_client")
442 .await?,
443 0
444 );
445 assert_eq!(
446 client
447 .exec("create table if not exists ws_test_client.tb1(ts timestamp, v int)")
448 .await?,
449 0
450 );
451 assert_eq!(
452 client
453 .exec("insert into ws_test_client.tb1 values(1655793421375, 1)")
454 .await?,
455 1
456 );
457
458 let mut rs = client.query("select * from ws_test_client.tb1").await?;
460
461 #[derive(Debug, serde::Deserialize)]
462 #[allow(dead_code)]
463 struct A {
464 ts: String,
465 v: i32,
466 }
467
468 let values: Vec<A> = rs.deserialize_stream().try_collect().await?;
469
470 dbg!(values);
471
472 assert_eq!(client.exec("drop database ws_test_client").await?, 0);
473 Ok(())
474 }
475}