taos_ws/query/
mod.rs

1use once_cell::sync::OnceCell;
2use taos_query::common::SmlData;
3use taos_query::prelude::RawResult;
4use taos_query::{common::RawMeta, AsyncQueryable};
5
6pub mod asyn;
7pub(crate) mod infra;
8// pub mod sync;
9
10pub use asyn::Error;
11pub use asyn::ResultSet;
12pub(crate) use asyn::WsTaos;
13pub(crate) use infra::WsConnReq;
14
15use crate::TaosBuilder;
16
17#[derive(Debug)]
18pub struct Taos {
19    pub(crate) dsn: TaosBuilder,
20    pub(crate) async_client: OnceCell<WsTaos>,
21    pub(crate) async_sml: OnceCell<crate::schemaless::WsTaos>,
22}
23
24impl Taos {
25    pub fn version(&self) -> &str {
26        crate::block_in_place_or_global(self.client()).version()
27    }
28
29    async fn client(&self) -> &WsTaos {
30        if let Some(ws) = self.async_client.get() {
31            ws
32        } else {
33            let async_client = WsTaos::from_wsinfo(&self.dsn).await.unwrap();
34            self.async_client.get_or_init(|| async_client)
35        }
36    }
37}
38
39unsafe impl Send for Taos {}
40
41unsafe impl Sync for Taos {}
42
43#[async_trait::async_trait]
44impl taos_query::AsyncQueryable for Taos {
45    type AsyncResultSet = asyn::ResultSet;
46
47    async fn query<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<Self::AsyncResultSet> {
48        if let Some(ws) = self.async_client.get() {
49            ws.s_query(sql.as_ref()).await
50        } else {
51            let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
52            self.async_client
53                .get_or_init(|| async_client)
54                .s_query(sql.as_ref())
55                .await
56        }
57    }
58
59    async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
60        &self,
61        sql: T,
62        req_id: u64,
63    ) -> RawResult<Self::AsyncResultSet> {
64        if let Some(ws) = self.async_client.get() {
65            ws.s_query_with_req_id(sql.as_ref(), req_id).await
66        } else {
67            let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
68            self.async_client
69                .get_or_init(|| async_client)
70                .s_query_with_req_id(sql.as_ref(), req_id)
71                .await
72        }
73    }
74
75    async fn write_raw_meta(&self, raw: &RawMeta) -> RawResult<()> {
76        if let Some(ws) = self.async_client.get() {
77            ws.write_meta(raw).await
78        } else {
79            let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
80            self.async_client
81                .get_or_init(|| async_client)
82                .write_meta(raw)
83                .await
84        }
85    }
86
87    async fn write_raw_block(&self, block: &taos_query::RawBlock) -> RawResult<()> {
88        if let Some(ws) = self.async_client.get() {
89            ws.write_raw_block(block).await
90        } else {
91            let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
92            self.async_client
93                .get_or_init(|| async_client)
94                .write_raw_block(block)
95                .await
96        }
97    }
98
99    async fn write_raw_block_with_req_id(
100        &self,
101        block: &taos_query::RawBlock,
102        req_id: u64,
103    ) -> RawResult<()> {
104        if let Some(ws) = self.async_client.get() {
105            ws.write_raw_block_with_req_id(block, req_id).await
106        } else {
107            let async_client = WsTaos::from_wsinfo(&self.dsn).await?;
108            self.async_client
109                .get_or_init(|| async_client)
110                .write_raw_block_with_req_id(block, req_id)
111                .await
112        }
113    }
114
115    async fn put(&self, data: &SmlData) -> RawResult<()> {
116        if let Some(ws) = self.async_sml.get() {
117            ws.s_put(data).await
118        } else {
119            let async_sml = crate::schemaless::WsTaos::from_wsinfo(&self.dsn).await?;
120            self.async_sml.get_or_init(|| async_sml).s_put(data).await
121        }
122    }
123}
124
125impl taos_query::Queryable for Taos {
126    type ResultSet = asyn::ResultSet;
127
128    fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet> {
129        let sql = sql.as_ref();
130        taos_query::block_in_place_or_global(<Self as AsyncQueryable>::query(self, sql))
131    }
132
133    fn query_with_req_id<T: AsRef<str>>(&self, sql: T, req_id: u64) -> RawResult<Self::ResultSet> {
134        let sql = sql.as_ref();
135        taos_query::block_in_place_or_global(<Self as AsyncQueryable>::query_with_req_id(
136            self, sql, req_id,
137        ))
138    }
139
140    fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()> {
141        crate::block_in_place_or_global(<Self as AsyncQueryable>::write_raw_meta(self, meta))
142    }
143
144    fn write_raw_block(&self, block: &taos_query::RawBlock) -> RawResult<()> {
145        crate::block_in_place_or_global(<Self as AsyncQueryable>::write_raw_block(self, block))
146    }
147
148    fn write_raw_block_with_req_id(
149        &self,
150        block: &taos_query::RawBlock,
151        req_id: u64,
152    ) -> RawResult<()> {
153        crate::block_in_place_or_global(<Self as AsyncQueryable>::write_raw_block_with_req_id(
154            self, block, req_id,
155        ))
156    }
157
158    fn put(&self, sml_data: &SmlData) -> RawResult<()> {
159        crate::block_in_place_or_global(<Self as AsyncQueryable>::put(self, sml_data))
160    }
161}
162
163#[cfg(test)]
164mod tests {
165    use crate::TaosBuilder;
166    use bytes::Bytes;
167    use taos_query::util::hex::*;
168
169    #[test]
170    fn ws_sync_json() -> anyhow::Result<()> {
171        std::env::set_var("RUST_LOG", "debug");
172        // pretty_env_logger::init();
173        use taos_query::prelude::sync::*;
174        let client = TaosBuilder::from_dsn("taosws://localhost:6041/")?.build()?;
175        let db = "ws_sync_json";
176        assert_eq!(client.exec(format!("drop database if exists {db}"))?, 0);
177        assert_eq!(client.exec(format!("create database {db} keep 36500"))?, 0);
178        assert_eq!(
179            client.exec(
180                format!("create table {db}.stb1(ts timestamp,\
181                    b1 bool, c8i1 tinyint, c16i1 smallint, c32i1 int, c64i1 bigint,\
182                    c8u1 tinyint unsigned, c16u1 smallint unsigned, c32u1 int unsigned, c64u1 bigint unsigned,\
183                    cb1 binary(100), cn1 nchar(10), cvb1 varbinary(50), cg1 geometry(50),
184
185                    b2 bool, c8i2 tinyint, c16i2 smallint, c32i2 int, c64i2 bigint,\
186                    c8u2 tinyint unsigned, c16u2 smallint unsigned, c32u2 int unsigned, c64u2 bigint unsigned,\
187                    cb2 binary(10), cn2 nchar(16), cvb2 varbinary(50), cg2 geometry(50)) tags (jt json)")
188            )?,
189            0
190        );
191        assert_eq!(
192            client.exec(format!(
193                r#"insert into {db}.tb1 using {db}.stb1 tags('{{"key":"数据"}}')
194                 values(0,    true, -1,  -2,  -3,  -4,   1,   2,   3,   4,   'abc', '涛思', '\x123456', 'POINT(1 2)',
195                              false,-5,  -6,  -7,  -8,   5,   6,   7,   8,   'def', '数据', '\x654321', 'POINT(3 4)')
196                       (65535,NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL,  NULL, NULL,  NULL,
197                              NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL,  NULL, NULL,  NULL)"#
198            ))?,
199            2
200        );
201        assert_eq!(
202            client.exec(format!(
203                r#"insert into {db}.tb2 using {db}.stb1 tags(NULL)
204                       values(1,    true, -1,  -2,  -3,  -4,   1,   2,   3,   4,   'abc', '涛思', '\x123456', 'POINT(1 2)',
205                                    false,-5,  -6,  -7,  -8,   5,   6,   7,   8,   'def', '数据', '\x654321', 'POINT(3 4)')
206                             (65536,NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL,  NULL, NULL,  NULL,
207                                    NULL, NULL,NULL,NULL,NULL, NULL,NULL,NULL,NULL, NULL,  NULL, NULL,  NULL)"#
208            ))?,
209            2
210        );
211
212        // let mut rs = client.s_query("select * from ws_sync.tb1").unwrap().unwrap();
213        let mut rs = client.query(format!("select * from {db}.tb1 order by ts limit 1"))?;
214
215        #[derive(Debug, serde::Deserialize, PartialEq, Eq)]
216        #[allow(dead_code)]
217        struct A {
218            ts: String,
219            b1: bool,
220            c8i1: i8,
221            c16i1: i16,
222            c32i1: i32,
223            c64i1: i64,
224            c8u1: u8,
225            c16u1: u16,
226            c32u1: u32,
227            c64u1: u64,
228
229            c8i2: i8,
230            c16i2: i16,
231            c32i2: i32,
232            c64i2: i64,
233            c8u2: u8,
234            c16u2: u16,
235            c32u2: u32,
236            c64u2: u64,
237
238            cb1: String,
239            cb2: String,
240            cn1: String,
241            cn2: String,
242
243            cvb1: Bytes,
244            cvb2: Bytes,
245            cg1: Bytes,
246            cg2: Bytes,
247        }
248
249        use itertools::Itertools;
250        let values: Vec<A> = rs.deserialize::<A>().try_collect()?;
251
252        dbg!(&values);
253
254        assert_eq!(
255            values[0],
256            A {
257                ts: "1970-01-01T08:00:00+08:00".to_string(),
258                b1: true,
259                c8i1: -1,
260                c16i1: -2,
261                c32i1: -3,
262                c64i1: -4,
263                c8u1: 1,
264                c16u1: 2,
265                c32u1: 3,
266                c64u1: 4,
267                c8i2: -5,
268                c16i2: -6,
269                c32i2: -7,
270                c64i2: -8,
271                c8u2: 5,
272                c16u2: 6,
273                c32u2: 7,
274                c64u2: 8,
275                cb1: "abc".to_string(),
276                cb2: "def".to_string(),
277                cn1: "涛思".to_string(),
278                cn2: "数据".to_string(),
279                cvb1: Bytes::from(vec![0x12, 0x34, 0x56]),
280                cvb2: Bytes::from(vec![0x65, 0x43, 0x21]),
281                cg1: hex_string_to_bytes("0101000000000000000000F03F0000000000000040"),
282                cg2: hex_string_to_bytes("010100000000000000000008400000000000001040"),
283            }
284        );
285
286        assert_eq!(client.exec(format!("drop database {db}"))?, 0);
287        Ok(())
288    }
289
290    #[test]
291    fn ws_sync() -> anyhow::Result<()> {
292        use bytes::Bytes;
293        use taos_query::prelude::sync::*;
294        use taos_query::util::hex::*;
295
296        let client = TaosBuilder::from_dsn("ws://localhost:6041/")?.build()?;
297        assert_eq!(client.exec("drop database if exists ws_sync")?, 0);
298        assert_eq!(client.exec("create database ws_sync keep 36500")?, 0);
299        assert_eq!(
300            client.exec(
301                "create table ws_sync.tb1(ts timestamp,\
302                    c8i1 tinyint, c16i1 smallint, c32i1 int, c64i1 bigint,\
303                    c8u1 tinyint unsigned, c16u1 smallint unsigned, c32u1 int unsigned, c64u1 bigint unsigned,\
304                    cb1 binary(100), cn1 nchar(10), cvb1 varbinary(50), cg1 geometry(50),\
305                    c8i2 tinyint, c16i2 smallint, c32i2 int, c64i2 bigint,\
306                    c8u2 tinyint unsigned, c16u2 smallint unsigned, c32u2 int unsigned, c64u2 bigint unsigned,\
307                    cb2 binary(10), cn2 nchar(16), cvb2 varbinary(50), cg2 geometry(50))"
308            )?,
309            0
310        );
311        assert_eq!(
312            client.exec(
313                r#"insert into ws_sync.tb1 values(65535,
314                -1,-2,-3,-4, 1,2,3,4, 'abc', '涛思', '\x123456', 'POINT(1 2)',
315                -5,-6,-7,-8, 5,6,7,8, 'def', '数据', '\x654321', 'POINT(3 4)')"#
316            )?,
317            1
318        );
319
320        let mut rs = client.query("select * from ws_sync.tb1")?;
321
322        #[derive(Debug, serde::Deserialize, PartialEq, Eq)]
323        #[allow(dead_code)]
324        struct A {
325            ts: String,
326            c8i1: i8,
327            c16i1: i16,
328            c32i1: i32,
329            c64i1: i64,
330            c8u1: u8,
331            c16u1: u16,
332            c32u1: u32,
333            c64u1: u64,
334
335            c8i2: i8,
336            c16i2: i16,
337            c32i2: i32,
338            c64i2: i64,
339            c8u2: u8,
340            c16u2: u16,
341            c32u2: u32,
342            c64u2: u64,
343
344            cb1: String,
345            cb2: String,
346            cn1: String,
347            cn2: String,
348
349            cvb1: Bytes,
350            cvb2: Bytes,
351            cg1: Bytes,
352            cg2: Bytes,
353        }
354
355        use itertools::Itertools;
356        let values: Vec<A> = rs.deserialize::<A>().try_collect()?;
357
358        dbg!(&values);
359
360        assert_eq!(
361            values[0],
362            A {
363                ts: "1970-01-01T08:01:05.535+08:00".to_string(),
364                c8i1: -1,
365                c16i1: -2,
366                c32i1: -3,
367                c64i1: -4,
368                c8u1: 1,
369                c16u1: 2,
370                c32u1: 3,
371                c64u1: 4,
372                c8i2: -5,
373                c16i2: -6,
374                c32i2: -7,
375                c64i2: -8,
376                c8u2: 5,
377                c16u2: 6,
378                c32u2: 7,
379                c64u2: 8,
380                cb1: "abc".to_string(),
381                cb2: "def".to_string(),
382                cn1: "涛思".to_string(),
383                cn2: "数据".to_string(),
384                cvb1: Bytes::from(vec![0x12, 0x34, 0x56]),
385                cvb2: Bytes::from(vec![0x65, 0x43, 0x21]),
386                cg1: hex_string_to_bytes("0101000000000000000000F03F0000000000000040"),
387                cg2: hex_string_to_bytes("010100000000000000000008400000000000001040"),
388            }
389        );
390
391        assert_eq!(client.exec("drop database ws_sync")?, 0);
392        Ok(())
393    }
394
395    #[test]
396    fn ws_show_databases() -> anyhow::Result<()> {
397        use taos_query::prelude::sync::*;
398        let dsn = std::env::var("TEST_DSN").unwrap_or("taos:///".to_string());
399
400        let client = TaosBuilder::from_dsn(dsn)?.build()?;
401        let mut rs = client.query("show databases")?;
402        let values = rs.to_rows_vec()?;
403
404        dbg!(values);
405        Ok(())
406    }
407
408    // #[tokio::test]
409    async fn _ws_select_from_meters() -> anyhow::Result<()> {
410        std::env::set_var("RUST_LOG", "info");
411        // pretty_env_logger::init_timed();
412        use taos_query::prelude::*;
413        let dsn = "taos+ws:///test";
414        let client = TaosBuilder::from_dsn(dsn)?.build().await?;
415
416        let mut rs = client.query("select * from meters").await?;
417
418        let mut blocks = rs.blocks();
419        let mut nb = 0;
420        let mut nr = 0;
421        while let Some(block) = blocks.try_next().await? {
422            nb += 1;
423            nr += block.nrows();
424        }
425        let summary = rs.summary();
426        dbg!(summary, (nb, nr));
427        Ok(())
428    }
429
430    #[cfg(feature = "async")]
431    #[tokio::test]
432    async fn test_client() -> anyhow::Result<()> {
433        std::env::set_var("RUST_LOG", "debug");
434        // pretty_env_logger::init();
435        use futures::TryStreamExt;
436        use taos_query::{AsyncFetchable, AsyncQueryable};
437
438        let client = TaosBuilder::from_dsn("ws://localhost:6041/")?.build()?;
439        assert_eq!(
440            client
441                .exec("create database if not exists ws_test_client")
442                .await?,
443            0
444        );
445        assert_eq!(
446            client
447                .exec("create table if not exists ws_test_client.tb1(ts timestamp, v int)")
448                .await?,
449            0
450        );
451        assert_eq!(
452            client
453                .exec("insert into ws_test_client.tb1 values(1655793421375, 1)")
454                .await?,
455            1
456        );
457
458        // let mut rs = client.s_query("select * from ws_test_client.tb1").unwrap().unwrap();
459        let mut rs = client.query("select * from ws_test_client.tb1").await?;
460
461        #[derive(Debug, serde::Deserialize)]
462        #[allow(dead_code)]
463        struct A {
464            ts: String,
465            v: i32,
466        }
467
468        let values: Vec<A> = rs.deserialize_stream().try_collect().await?;
469
470        dbg!(values);
471
472        assert_eq!(client.exec("drop database ws_test_client").await?, 0);
473        Ok(())
474    }
475}