taos_optin/
lib.rs

1use std::{
2    cell::UnsafeCell,
3    ffi::{c_char, CStr, CString},
4    sync::Arc,
5    time::Duration,
6};
7
8use anyhow::Context;
9use once_cell::sync::OnceCell;
10use raw::{ApiEntry, BlockState, RawRes, RawTaos};
11use tracing::warn;
12
13use taos_query::{
14    prelude::tokio::time,
15    prelude::{Field, Precision, RawBlock, RawMeta, RawResult},
16    util::Edition,
17};
18
/// Retry count handed to `connect_with_retries` when the DSN has no
/// `maxRetries` parameter or its value does not parse as a `u8`.
const MAX_CONNECT_RETRIES: u8 = 2;
20
21mod into_c_str;
22mod raw;
23mod stmt;
24
25#[allow(non_camel_case_types)]
26pub(crate) mod types;
27
28pub mod tmq;
29pub use stmt::Stmt;
30pub use tmq::{Consumer, TmqBuilder};
31
/// Re-exports this crate's public driver types together with everything in
/// `taos_query::prelude`.
pub mod prelude {
    pub use super::{Consumer, ResultSet, Stmt, Taos, TaosBuilder, TmqBuilder};

    pub use taos_query::prelude::*;

    /// The same driver types, paired with `taos_query::prelude::sync` instead.
    pub mod sync {
        pub use crate::{Consumer, ResultSet, Stmt, Taos, TaosBuilder, TmqBuilder};
        pub use taos_query::prelude::sync::*;
    }
}
42
/// Converts a status code expression into a `Result`.
///
/// Forms:
///
/// - `err_or!(res, code, ret)`: `Ok(ret)` when the code reports success,
///   otherwise a `RawError` built from the code and `res.err_as_str()`.
/// - `err_or!(res, code)`: same as above with `()` as the success value.
/// - `err_or!(code, ret)`: `Ok(ret)` on success, otherwise
///   `RawError::from_code(code)`.
/// - `err_or!(code)`: same as above with `()` as the success value.
///
/// The code expression (and `ret`) are expanded inside an `unsafe` block, so
/// the caller is responsible for the soundness of whatever it passes in.
///
/// Arms are tried in order: a two-argument call whose first argument is a
/// bare identifier is matched by the `(res, code)` arm, not `(code, ret)`.
///
/// Every arm names `Code` and `RawError` through `taos_query::prelude`, so
/// call sites do not need to import them.
#[macro_export(local_inner_macros)]
macro_rules! err_or {
    ($res:ident, $code:expr, $ret:expr) => {
        unsafe {
            let code: taos_query::prelude::Code = { $code }.into();
            if code.success() {
                Ok($ret)
            } else {
                Err(taos_query::prelude::RawError::new(code, $res.err_as_str()))
            }
        }
    };

    ($res:ident, $code:expr) => {{
        err_or!($res, $code, ())
    }};
    ($code:expr, $ret:expr) => {
        unsafe {
            let code: taos_query::prelude::Code = { $code }.into();
            if code.success() {
                Ok($ret)
            } else {
                Err(taos_query::prelude::RawError::from_code(code))
            }
        }
    };

    ($code:expr) => {
        err_or!($code, ())
    };
}
74
/// A connection handle wrapping a [`RawTaos`].
#[derive(Debug)]
pub struct Taos {
    // Raw connection that every query/write method delegates to.
    raw: RawTaos,
}

impl Drop for Taos {
    /// Closes the raw connection when the handle goes out of scope.
    fn drop(&mut self) {
        self.raw.close();
    }
}
85
86impl taos_query::Queryable for Taos {
87    type ResultSet = ResultSet;
88
89    fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet> {
90        tracing::trace!("Query with SQL: {}", sql.as_ref());
91        self.raw.query(sql.as_ref()).map(ResultSet::new)
92    }
93
94    fn query_with_req_id<T: AsRef<str>>(
95        &self,
96        _sql: T,
97        _req_id: u64,
98    ) -> RawResult<Self::ResultSet> {
99        tracing::trace!("Query with SQL: {}", _sql.as_ref());
100        self.raw
101            .query_with_req_id(_sql.as_ref(), _req_id)
102            .map(ResultSet::new)
103    }
104
105    fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()> {
106        let raw = meta.as_raw_data_t();
107        self.raw.write_raw_meta(raw)
108    }
109
110    fn write_raw_block(&self, raw: &RawBlock) -> RawResult<()> {
111        self.raw.write_raw_block(raw)
112    }
113
114    fn write_raw_block_with_req_id(&self, raw: &RawBlock, req_id: u64) -> RawResult<()> {
115        self.raw.write_raw_block_with_req_id(raw, req_id)
116    }
117
118    fn put(&self, data: &taos_query::common::SmlData) -> RawResult<()> {
119        self.raw.put(data)
120    }
121
122    fn table_vgroup_id(&self, db: &str, table: &str) -> Option<i32> {
123        self.raw.get_table_vgroup_id(db, table).ok()
124    }
125
126    fn tables_vgroup_ids<T: AsRef<str>>(&self, db: &str, tables: &[T]) -> Option<Vec<i32>> {
127        self.raw.get_tables_vgroup_ids(db, tables).ok()
128    }
129}
130
131#[async_trait::async_trait]
132impl taos_query::AsyncQueryable for Taos {
133    type AsyncResultSet = ResultSet;
134
135    async fn query<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<Self::AsyncResultSet> {
136        tracing::trace!("Async query with SQL: {}", sql.as_ref());
137
138        match self.raw.query_async(sql.as_ref()).await {
139            Err(err) if err.code() == 0x2603 => {
140                self.raw.query_async(sql.as_ref()).await.map(ResultSet::new)
141            }
142            Err(err) => Err(err),
143            Ok(raw) => Ok(ResultSet::new(raw)),
144        }
145    }
146
147    async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
148        &self,
149        _sql: T,
150        _req_id: u64,
151    ) -> RawResult<Self::AsyncResultSet> {
152        self.raw
153            .query_with_req_id(_sql.as_ref(), _req_id)
154            .map(ResultSet::new)
155    }
156
157    async fn write_raw_meta(&self, meta: &taos_query::common::RawMeta) -> RawResult<()> {
158        self.raw.write_raw_meta(meta.as_raw_data_t())
159    }
160
161    async fn write_raw_block(&self, block: &RawBlock) -> RawResult<()> {
162        self.raw.write_raw_block(block)
163    }
164
165    async fn write_raw_block_with_req_id(&self, block: &RawBlock, req_id: u64) -> RawResult<()> {
166        self.raw.write_raw_block_with_req_id(block, req_id)
167    }
168
169    async fn put(&self, data: &taos_query::common::SmlData) -> RawResult<()> {
170        self.raw.put(data)
171    }
172
173    async fn table_vgroup_id(&self, db: &str, table: &str) -> Option<i32> {
174        self.raw.get_table_vgroup_id(db, table).ok()
175    }
176
177    async fn tables_vgroup_ids<T: AsRef<str> + Sync>(
178        &self,
179        db: &str,
180        tables: &[T],
181    ) -> Option<Vec<i32>> {
182        self.raw.get_tables_vgroup_ids(db, tables).ok()
183    }
184}
185
/// Connection builder.
///
/// ## Examples
///
/// ### Synchronous
///
/// ```rust
/// use taos_optin::prelude::sync::*;
/// fn main() -> anyhow::Result<()> {
///     let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
///     let taos = builder.build()?;
///     let mut query = taos.query("show databases")?;
///     for row in query.rows() {
///         println!("{:?}", row?.into_values());
///     }
///     Ok(())
/// }
/// ```
///
/// ### Async
///
/// ```rust
/// use taos_optin::prelude::*;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
///     let taos = builder.build().await?;
///     let mut query = taos.query("show databases").await?;
///
///     while let Some(row) = query.rows().try_next().await? {
///         println!("{:?}", row.into_values());
///     }
///     Ok(())
/// }
/// #
/// ```
#[derive(Debug)]
pub struct TaosBuilder {
    // dsn: Dsn,
    // Host, port, credentials, database and retry count taken from the DSN.
    auth: Auth,
    // Shared handle to the dynamically loaded client library.
    lib: Arc<ApiEntry>,
    // Lazily created connection used by the builder's own metadata queries
    // (server version, edition); `build` does not hand this one out.
    inner_conn: OnceCell<Taos>,
    // Cached result of `select server_version()`.
    server_version: OnceCell<String>,
}
231impl TaosBuilder {
232    fn inner_connection(&self) -> RawResult<&Taos> {
233        if let Some(taos) = self.inner_conn.get() {
234            Ok(taos)
235        } else {
236            let ptr = self
237                .lib
238                .connect_with_retries(&self.auth, self.auth.max_retries())?;
239
240            let raw = RawTaos::new(self.lib.clone(), ptr)?;
241            let taos = Ok(Taos { raw });
242            self.inner_conn.get_or_try_init(|| taos)
243        }
244    }
245}
246
/// Connection parameters kept as C strings so they can be handed to the
/// native client library.
#[derive(Debug, Default)]
struct Auth {
    host: Option<CString>,
    user: Option<CString>,
    pass: Option<CString>,
    db: Option<CString>,
    port: u16,
    max_retries: u8,
}

impl Auth {
    /// Pointer to the string's bytes, or a null pointer when it is absent.
    fn raw_or_null(value: Option<&CStr>) -> *const c_char {
        match value {
            Some(text) => text.as_ptr(),
            None => std::ptr::null(),
        }
    }

    /// Host name, if one was set.
    pub(crate) fn host(&self) -> Option<&CStr> {
        self.host.as_ref().map(|s| s.as_c_str())
    }

    /// Host name as a C pointer; null when unset.
    pub(crate) fn host_as_ptr(&self) -> *const c_char {
        Self::raw_or_null(self.host())
    }

    /// User name, if one was set.
    pub(crate) fn user(&self) -> Option<&CStr> {
        self.user.as_ref().map(|s| s.as_c_str())
    }

    /// User name as a C pointer; null when unset.
    pub(crate) fn user_as_ptr(&self) -> *const c_char {
        Self::raw_or_null(self.user())
    }

    /// Password, if one was set.
    pub(crate) fn password(&self) -> Option<&CStr> {
        self.pass.as_ref().map(|s| s.as_c_str())
    }

    /// Password as a C pointer; null when unset.
    pub(crate) fn password_as_ptr(&self) -> *const c_char {
        Self::raw_or_null(self.password())
    }

    /// Database name, if one was set.
    pub(crate) fn database(&self) -> Option<&CStr> {
        self.db.as_ref().map(|s| s.as_c_str())
    }

    /// Database name as a C pointer; null when unset.
    pub(crate) fn database_as_ptr(&self) -> *const c_char {
        Self::raw_or_null(self.database())
    }

    /// Port number (`0` when never set, via `Default`).
    pub(crate) fn port(&self) -> u16 {
        self.port
    }

    /// Retry count to pass when connecting.
    pub(crate) fn max_retries(&self) -> u8 {
        self.max_retries
    }
}
289
290impl taos_query::TBuilder for TaosBuilder {
291    type Target = Taos;
292
293    fn available_params() -> &'static [&'static str] {
294        const PARAMS: &[&str] = &["configDir", "libraryPath"];
295        PARAMS
296    }
297
298    fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
299        let mut dsn = dsn.into_dsn()?;
300
301        let lib = if let Some(path) = dsn.params.remove("libraryPath") {
302            tracing::trace!("using library path: {path}");
303            ApiEntry::dlopen(path).map_err(|err| taos_query::RawError::any(err))?
304        } else {
305            tracing::trace!("using default library of taos");
306            ApiEntry::open_default().map_err(|err| taos_query::RawError::any(err))?
307        };
308        let mut auth = Auth::default();
309        // let mut builder = TaosBuilder::default();
310        if let Some(addr) = dsn.addresses.first() {
311            if let Some(host) = &addr.host {
312                auth.host.replace(CString::new(host.as_str()).unwrap());
313            }
314            if let Some(port) = addr.port {
315                auth.port = port;
316            }
317        }
318        if let Some(db) = dsn.subject.as_deref() {
319            auth.db.replace(CString::new(db).unwrap());
320        }
321        if let Some(user) = dsn.username.as_deref() {
322            auth.user.replace(CString::new(user).unwrap());
323        }
324        if let Some(pass) = dsn.password.as_deref() {
325            auth.pass.replace(CString::new(pass).unwrap());
326        }
327        let params = &dsn.params;
328        if let Some(dir) = params.get("configDir") {
329            lib.options(types::TSDB_OPTION::ConfigDir, dir);
330        }
331
332        lib.options(types::TSDB_OPTION::ShellActivityTimer, "3600");
333
334        if let Some(max_retries) = params.get("maxRetries") {
335            auth.max_retries = max_retries.parse().unwrap_or(MAX_CONNECT_RETRIES);
336        } else {
337            auth.max_retries = MAX_CONNECT_RETRIES;
338        }
339
340        Ok(Self {
341            // dsn,
342            auth,
343            lib: Arc::new(lib),
344            inner_conn: OnceCell::new(),
345            server_version: OnceCell::new(),
346        })
347    }
348
349    fn client_version() -> &'static str {
350        "dynamic"
351    }
352
353    fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
354        conn.raw.query("select server_status()")?;
355        Ok(())
356    }
357
358    fn ready(&self) -> bool {
359        true
360    }
361
362    fn build(&self) -> RawResult<Self::Target> {
363        let ptr = self
364            .lib
365            .connect_with_retries(&self.auth, self.auth.max_retries())?;
366
367        let raw = RawTaos::new(self.lib.clone(), ptr)?;
368        Ok(Taos { raw })
369    }
370
371    fn server_version(&self) -> RawResult<&str> {
372        if let Some(v) = self.server_version.get() {
373            Ok(v.as_str())
374        } else {
375            let conn = self.inner_connection()?;
376            use taos_query::prelude::sync::Queryable;
377            let v: String = Queryable::query_one(conn, "select server_version()")?.unwrap();
378            Ok(match self.server_version.try_insert(v) {
379                Ok(v) => v.as_str(),
380                Err((v, _)) => v.as_str(),
381            })
382        }
383    }
384
385    fn is_enterprise_edition(&self) -> RawResult<bool> {
386        let taos = self.inner_connection()?;
387        use taos_query::prelude::sync::Queryable;
388        let grant: RawResult<Option<(String, bool)>> = Queryable::query_one(
389            taos,
390            "select version, (expire_time < now) as valid from information_schema.ins_cluster",
391        );
392
393        let edition = if let Ok(Some((edition, expired))) = grant {
394            Edition::new(edition, expired)
395        } else {
396            let grant: RawResult<Option<(String, (), String)>> =
397                Queryable::query_one(taos, "show grants");
398
399            if let Ok(Some((edition, _, expired))) = grant {
400                Edition::new(
401                    edition.trim(),
402                    expired.trim() == "false" || expired.trim() == "unlimited",
403                )
404            } else {
405                warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
406                Edition::new("unknown", true)
407            }
408        };
409        Ok(edition.is_enterprise_edition())
410    }
411
412    fn get_edition(&self) -> RawResult<Edition> {
413        let taos = self.inner_connection()?;
414        use taos_query::prelude::sync::Queryable;
415        let grant: RawResult<Option<(String, bool)>> = Queryable::query_one(
416            taos,
417            "select version, (expire_time < now) as valid from information_schema.ins_cluster",
418        );
419
420        let edition = if let Ok(Some((edition, expired))) = grant {
421            Edition::new(edition, expired)
422        } else {
423            let grant: RawResult<Option<(String, (), String)>> =
424                Queryable::query_one(taos, "show grants");
425
426            if let Ok(Some((edition, _, expired))) = grant {
427                Edition::new(
428                    edition.trim(),
429                    !(expired.trim() == "false" || expired.trim() == "unlimited"),
430                )
431            } else {
432                warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
433                Edition::new("unknown", true)
434            }
435        };
436        Ok(edition)
437    }
438}
439
440#[async_trait::async_trait]
441impl taos_query::AsyncTBuilder for TaosBuilder {
442    type Target = Taos;
443
444    fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
445        let mut dsn = dsn.into_dsn()?;
446
447        let lib = if let Some(path) = dsn.params.remove("libraryPath") {
448            tracing::trace!("using library path: {path}");
449            ApiEntry::dlopen(path).map_err(|err| taos_query::RawError::any(err))?
450        } else {
451            tracing::trace!("using default library of taos");
452            ApiEntry::open_default().map_err(|err| taos_query::RawError::any(err))?
453        };
454        let mut auth = Auth::default();
455        // let mut builder = TaosBuilder::default();
456        if let Some(addr) = dsn.addresses.first() {
457            if let Some(host) = &addr.host {
458                auth.host.replace(CString::new(host.as_str()).unwrap());
459            }
460            if let Some(port) = addr.port {
461                auth.port = port;
462            }
463        }
464        if let Some(db) = dsn.subject.as_deref() {
465            auth.db.replace(CString::new(db).unwrap());
466        }
467        if let Some(user) = dsn.username.as_deref() {
468            auth.user.replace(CString::new(user).unwrap());
469        }
470        if let Some(pass) = dsn.password.as_deref() {
471            auth.pass.replace(CString::new(pass).unwrap());
472        }
473        let params = &dsn.params;
474        if let Some(dir) = params.get("configDir") {
475            lib.options(types::TSDB_OPTION::ConfigDir, dir);
476        }
477
478        lib.options(types::TSDB_OPTION::ShellActivityTimer, "3600");
479
480        if let Some(max_retries) = params.get("maxRetries") {
481            auth.max_retries = max_retries.parse().unwrap_or(MAX_CONNECT_RETRIES);
482        } else {
483            auth.max_retries = MAX_CONNECT_RETRIES;
484        }
485
486        Ok(Self {
487            // dsn,
488            auth,
489            lib: Arc::new(lib),
490            inner_conn: OnceCell::new(),
491            server_version: OnceCell::new(),
492        })
493    }
494
495    fn client_version() -> &'static str {
496        "dynamic"
497    }
498
499    async fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
500        // use taos_query::prelude::AsyncQueryable;
501        // conn.query("select server_status()").await?;
502        Ok(())
503    }
504
505    async fn ready(&self) -> bool {
506        true
507    }
508
509    async fn build(&self) -> RawResult<Self::Target> {
510        let ptr = self
511            .lib
512            .connect_with_retries(&self.auth, self.auth.max_retries())?;
513
514        let raw = RawTaos::new(self.lib.clone(), ptr)?;
515        Ok(Taos { raw })
516    }
517
518    async fn server_version(&self) -> RawResult<&str> {
519        if let Some(v) = self.server_version.get() {
520            Ok(v.as_str())
521        } else {
522            let conn = self.inner_connection()?;
523            use taos_query::prelude::AsyncQueryable;
524            let v: String = AsyncQueryable::query_one(conn, "select server_version()")
525                .await?
526                .unwrap();
527            Ok(match self.server_version.try_insert(v) {
528                Ok(v) => v.as_str(),
529                Err((v, _)) => v.as_str(),
530            })
531        }
532    }
533
534    async fn is_enterprise_edition(&self) -> RawResult<bool> {
535        let taos = self.inner_connection()?;
536        use taos_query::prelude::AsyncQueryable;
537
538        // the latest version of 3.x should work
539        let grant: RawResult<Option<(String, bool)>> = time::timeout(
540            Duration::from_secs(60),
541            AsyncQueryable::query_one(
542                taos,
543                "select version, (expire_time < now) as valid from information_schema.ins_cluster",
544            ),
545        )
546        .await
547        .context("Check cluster edition timeout")?;
548
549        let edition = if let Ok(Some((edition, expired))) = grant {
550            Edition::new(edition, expired)
551        } else {
552            let grant: RawResult<Option<(String, (), String)>> = time::timeout(
553                Duration::from_secs(60),
554                AsyncQueryable::query_one(taos, "show grants"),
555            )
556            .await
557            .context("Check legacy grants timeout")?;
558
559            if let Ok(Some((edition, _, expired))) = grant {
560                Edition::new(
561                    edition.trim(),
562                    expired.trim() == "false" || expired.trim() == "unlimited",
563                )
564            } else {
565                warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
566                Edition::new("unknown", true)
567            }
568        };
569        Ok(edition.is_enterprise_edition())
570    }
571
572    async fn get_edition(&self) -> RawResult<Edition> {
573        let taos = self.inner_connection()?;
574        use taos_query::prelude::AsyncQueryable;
575
576        // the latest version of 3.x should work
577        let grant: RawResult<Option<(String, bool)>> = time::timeout(
578            Duration::from_secs(60),
579            AsyncQueryable::query_one(
580                taos,
581                "select version, (expire_time < now) as valid from information_schema.ins_cluster",
582            ),
583        )
584        .await
585        .context("Check cluster edition timeout")?;
586
587        let edition = if let Ok(Some((edition, expired))) = grant {
588            Edition::new(edition, expired)
589        } else {
590            let grant: RawResult<Option<(String, (), String)>> = time::timeout(
591                Duration::from_secs(60),
592                AsyncQueryable::query_one(taos, "show grants"),
593            )
594            .await
595            .context("Check legacy grants timeout")?;
596
597            if let Ok(Some((edition, _, expired))) = grant {
598                Edition::new(
599                    edition.trim(),
600                    !(expired.trim() == "false" || expired.trim() == "unlimited"),
601                )
602            } else {
603                warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
604                Edition::new("unknown", true)
605            }
606        };
607        Ok(edition)
608    }
609}
610
611#[derive(Debug)]
612pub struct ResultSet {
613    raw: RawRes,
614    fields: OnceCell<Vec<Field>>,
615    summary: UnsafeCell<(usize, usize)>,
616    state: Arc<UnsafeCell<BlockState>>,
617}
618
619impl ResultSet {
620    fn new(raw: RawRes) -> Self {
621        Self {
622            raw,
623            fields: OnceCell::new(),
624            summary: UnsafeCell::new((0, 0)),
625            state: Arc::new(UnsafeCell::new(BlockState::default())),
626        }
627    }
628
629    fn precision(&self) -> Precision {
630        self.raw.precision()
631    }
632
633    fn fields(&self) -> &[Field] {
634        self.fields.get_or_init(|| self.raw.fetch_fields())
635    }
636
637    fn update_summary(&mut self, nrows: usize) {
638        let summary = self.summary.get_mut();
639        summary.0 += 1;
640        summary.1 += nrows;
641    }
642
643    pub(crate) fn summary(&self) -> &(usize, usize) {
644        unsafe { &*self.summary.get() }
645    }
646
647    pub(crate) fn affected_rows(&self) -> i32 {
648        self.raw.affected_rows() as _
649    }
650}
651
652impl taos_query::Fetchable for ResultSet {
653    fn affected_rows(&self) -> i32 {
654        self.affected_rows()
655    }
656
657    fn precision(&self) -> Precision {
658        self.precision()
659    }
660
661    fn fields(&self) -> &[Field] {
662        self.fields()
663    }
664
665    fn summary(&self) -> (usize, usize) {
666        *self.summary()
667    }
668
669    fn update_summary(&mut self, nrows: usize) {
670        self.update_summary(nrows)
671    }
672
673    fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>> {
674        self.raw.fetch_raw_block(self.fields())
675    }
676}
677
678impl taos_query::AsyncFetchable for ResultSet {
679    fn affected_rows(&self) -> i32 {
680        self.affected_rows()
681    }
682
683    fn precision(&self) -> Precision {
684        self.precision()
685    }
686
687    fn fields(&self) -> &[Field] {
688        self.fields()
689    }
690
691    fn summary(&self) -> (usize, usize) {
692        *self.summary()
693    }
694
695    fn fetch_raw_block(
696        &mut self,
697        cx: &mut std::task::Context<'_>,
698    ) -> std::task::Poll<RawResult<Option<RawBlock>>> {
699        self.raw
700            .fetch_raw_block_async(self.fields(), self.precision(), &self.state, cx)
701    }
702
703    fn update_summary(&mut self, nrows: usize) {
704        self.update_summary(nrows)
705    }
706}
707
impl Drop for ResultSet {
    /// Releases the raw result when the set is dropped.
    fn drop(&mut self) {
        self.raw.free_result();
    }
}

// `ResultSet` contains `UnsafeCell` fields, so it is not `Sync` automatically;
// these impls assert thread-safety by hand.
// NOTE(review): soundness depends on how `raw` uses the shared `state` cell,
// which is not visible in this file — confirm.
unsafe impl Send for ResultSet {}
unsafe impl Sync for ResultSet {}
716
/// DSNs used by the tests; both currently point at the same local server.
#[cfg(test)]
pub(crate) mod constants {
    // Alternative DSNs that pin a specific client library per server version:
    // pub const DSN_V2: &str = "taos://localhost:16030?libraryPath=tests/libs/libtaos.so.2.6.0.16";
    // pub const DSN_V3: &str = "taos://localhost:26030?libraryPath=tests/libs/libtaos.so.3.0.1.5";
    pub const DSN_V2: &str = "taos://localhost:6030";
    pub const DSN_V3: &str = "taos://localhost:6030";
}
724
725#[cfg(test)]
726mod tests {
727    use crate::constants::{DSN_V2, DSN_V3};
728
729    use super::*;
730
731    use taos_query::common::SchemalessPrecision;
732    use taos_query::common::SchemalessProtocol;
733    use taos_query::common::SmlDataBuilder;
734
735    #[test]
736    fn show_databases() -> RawResult<()> {
737        use taos_query::prelude::sync::*;
738        let builder = TaosBuilder::from_dsn(DSN_V3)?;
739        let taos = builder.build()?;
740        let mut set = taos.query("show databases")?;
741
742        for raw in &mut set.blocks() {
743            let raw = raw?;
744            for (col, view) in raw.columns().enumerate() {
745                for (row, value) in view.iter().enumerate().take(10) {
746                    println!("Value at (row: {}, col: {}) is: {}", row, col, value);
747                }
748            }
749
750            for (row, view) in raw.rows().enumerate().take(10) {
751                for (col, value) in view.enumerate() {
752                    println!("Value at (row: {}, col: {}) is: {:?}", row, col, value);
753                }
754            }
755        }
756
757        println!("summary: {:?}", set.summary());
758
759        Ok(())
760    }
761    #[test]
762    fn long_query() -> RawResult<()> {
763        use taos_query::prelude::sync::*;
764        let builder = TaosBuilder::from_dsn(DSN_V3)?;
765        let taos = builder.build()?;
766        let mut set = taos.query("show databases")?;
767
768        for raw in &mut set.blocks() {
769            let raw = raw?;
770            for (col, view) in raw.columns().enumerate() {
771                for (row, value) in view.iter().enumerate().take(10) {
772                    println!("Value at (row: {}, col: {}) is: {}", row, col, value);
773                }
774            }
775
776            for (row, view) in raw.rows().enumerate().take(10) {
777                for (col, value) in view.enumerate() {
778                    println!("Value at (row: {}, col: {}) is: {:?}", row, col, value);
779                }
780            }
781        }
782
783        println!("summary: {:?}", set.summary());
784
785        Ok(())
786    }
787
788    #[tokio::test]
789    #[ignore]
790    async fn builder_retry_once() -> RawResult<()> {
791        use taos_query::prelude::*;
792
793        let builder = TaosBuilder::from_dsn("taos://localhost:6041?maxRetries=1")?;
794        assert!(builder.ready().await);
795
796        let res = builder.build().await;
797        assert!(res.is_err());
798
799        Ok(())
800    }
801
802    #[tokio::test]
803    #[ignore]
804    async fn builder_retry_default() -> RawResult<()> {
805        use taos_query::prelude::*;
806
807        let builder = TaosBuilder::from_dsn("taos://localhost:6041")?;
808        assert!(builder.ready().await);
809
810        let res = builder.build().await;
811        assert!(res.is_err());
812
813        Ok(())
814    }
815
816    #[tokio::test]
817    async fn long_query_async() -> RawResult<()> {
818        use taos_query::prelude::*;
819        let builder = TaosBuilder::from_dsn(DSN_V3)?;
820        let taos = builder.build().await?;
821        let mut set = taos.query("show databases").await?;
822
823        set.blocks()
824            .try_for_each_concurrent(10, |block| async move {
825                println!("{}", block.pretty_format());
826                Ok(())
827            })
828            .await?;
829        println!("summary: {:?}", set.summary());
830
831        let mut set = taos.query("show databases").await?;
832
833        set.rows()
834            .try_for_each_concurrent(10, |row| async move {
835                println!(
836                    "{}",
837                    row.map(|(_, value)| value.to_string().unwrap()).join(",")
838                );
839                Ok(())
840            })
841            .await?;
842
843        println!("summary: {:?}", set.summary());
844
845        Ok(())
846    }
847    #[tokio::test]
848    async fn show_databases_async() -> RawResult<()> {
849        use taos_query::prelude::*;
850
851        std::env::set_var("RUST_LOG", "debug");
852        // let _ = pretty_env_logger::try_init();
853        let builder = TaosBuilder::from_dsn(DSN_V3)?;
854        let taos = builder.build().await?;
855        let mut set = taos.query("show databases").await?;
856
857        let mut rows = set.rows();
858        let mut nrows = 0;
859        while let Some(row) = rows.try_next().await? {
860            for (col, (name, value)) in row.enumerate() {
861                println!("[{}, {}] (named `{:>4}`): {}", nrows, col, name, value);
862            }
863            nrows += 1;
864        }
865
866        println!("summary: {:?}", set.summary());
867
868        Ok(())
869    }
870    #[tokio::test]
871    async fn error_async() -> RawResult<()> {
872        use taos_query::prelude::*;
873
874        std::env::set_var("RUST_LOG", "debug");
875        // let _ = pretty_env_logger::try_init();
876        let builder = TaosBuilder::from_dsn("taos:///")?;
877        let taos = builder.build().await?;
878        let err = taos
879            .exec("create table test.`abc.` (ts timestamp, val int)")
880            .await
881            .unwrap_err();
882        // dbg!(err);
883        println!("{:?}", err);
884        assert!(err.code() == 0x2617);
885        let err_str = err.to_string();
886        assert!(err_str.contains("0x2617"));
887        assert!(err_str.contains("The table name cannot contain '.'"));
888        Ok(())
889    }
890    #[tokio::test]
891    async fn error_fetch_async() -> RawResult<()> {
892        use taos_query::prelude::*;
893
894        std::env::set_var("RUST_LOG", "debug");
895        // let _ = pretty_env_logger::try_init();
896        let builder = TaosBuilder::from_dsn("taos:///")?;
897        let taos = builder.build().await?;
898        let err = taos
899            .query("select * from testxxxx.meters")
900            .await
901            .unwrap_err();
902
903        tracing::trace!("{:?}", err);
904
905        assert!(err.code() == 0x2662);
906        let err_str = err.to_string();
907        assert!(err_str.contains("0x2662"));
908        assert!(err_str.contains("Database not exist"));
909
910        Ok(())
911    }
912    #[tokio::test]
913    async fn error_sync() -> RawResult<()> {
914        use taos_query::prelude::sync::*;
915
916        std::env::set_var("RUST_LOG", "debug");
917        // let _ = pretty_env_logger::try_init();
918        let builder = TaosBuilder::from_dsn("taos:///")?;
919        let taos = builder.build()?;
920        let err = taos
921            .exec("create table test.`abc.` (ts timestamp, val int)")
922            .unwrap_err();
923        // dbg!(err);
924        assert!(err.code() == 0x2617);
925        let err_str = err.to_string();
926        assert!(err_str.contains("0x2617"));
927        assert!(err_str.contains("The table name cannot contain '.'"));
928        println!("{:?}", err);
929        Ok(())
930    }
931
932    #[test]
933    fn show_databases_v2() -> RawResult<()> {
934        use taos_query::prelude::sync::*;
935        let builder = TaosBuilder::from_dsn(crate::constants::DSN_V2)?;
936        let taos = builder.build()?;
937        let mut set = taos.query("show databases")?;
938
939        for raw in &mut set.blocks() {
940            let raw = raw?;
941            for (col, view) in raw.columns().enumerate() {
942                for (row, value) in view.iter().enumerate().take(10) {
943                    println!("Value at (row: {}, col: {}) is: {}", row, col, value);
944                }
945            }
946
947            for (row, view) in raw.rows().enumerate().take(10) {
948                for (col, value) in view.enumerate() {
949                    println!("Value at (row: {}, col: {}) is: {:?}", row, col, value);
950                }
951            }
952        }
953
954        println!("summary: {:?}", set.summary());
955
956        Ok(())
957    }
958
959    #[tokio::test]
960    async fn show_databases_async_v2() -> RawResult<()> {
961        use taos_query::prelude::*;
962        let builder = TaosBuilder::from_dsn(DSN_V2)?;
963        let taos = builder.build().await?;
964        let mut set = taos.query("show databases").await?;
965
966        let mut rows = set.rows();
967        let mut nrows = 0;
968        while let Some(row) = rows.try_next().await? {
969            for (col, (name, value)) in row.enumerate() {
970                println!("[{}, {}] (named `{:>4}`): {}", nrows, col, name, value);
971            }
972            nrows += 1;
973        }
974
975        println!("summary: {:?}", set.summary());
976        Ok(())
977    }
978
979    #[test]
980    fn test_put_line() -> anyhow::Result<()> {
981        // std::env::set_var("RUST_LOG", "taos=trace");
982        std::env::set_var("RUST_LOG", "taos=debug");
983        // let _ = pretty_env_logger::try_init();
984        use taos_query::prelude::sync::*;
985
986        let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
987        tracing::debug!("dsn: {:?}", &dsn);
988
989        let client = TaosBuilder::from_dsn(dsn)?.build()?;
990
991        let db = "test_schemaless_optin";
992
993        client.exec(format!("drop database if exists {db}"))?;
994
995        client.exec(format!("create database if not exists {db}"))?;
996
997        // should specify database before insert
998        client.exec(format!("use {db}"))?;
999
1000        let data = [
1001            "measurement,host=host1 field1=2i,field2=2.0 1577837300000",
1002            "measurement,host=host1 field1=2i,field2=2.0 1577837400000",
1003            "measurement,host=host1 field1=2i,field2=2.0 1577837500000",
1004            "measurement,host=host1 field1=2i,field2=2.0 1577837600000",
1005        ]
1006        .map(String::from)
1007        .to_vec();
1008
1009        let sml_data = SmlDataBuilder::default()
1010            .protocol(SchemalessProtocol::Line)
1011            .precision(SchemalessPrecision::Millisecond)
1012            .data(data.clone())
1013            .ttl(1000)
1014            .req_id(100u64)
1015            .build()?;
1016        assert_eq!(client.put(&sml_data)?, ());
1017
1018        let sml_data = SmlDataBuilder::default()
1019            .protocol(SchemalessProtocol::Line)
1020            .precision(SchemalessPrecision::Millisecond)
1021            .data(data.clone())
1022            .ttl(1000)
1023            .build()?;
1024        assert_eq!(client.put(&sml_data)?, ());
1025
1026        let sml_data = SmlDataBuilder::default()
1027            .protocol(SchemalessProtocol::Line)
1028            .precision(SchemalessPrecision::Millisecond)
1029            .data(data.clone())
1030            .build()?;
1031        assert_eq!(client.put(&sml_data)?, ());
1032
1033        let sml_data = SmlDataBuilder::default()
1034            .protocol(SchemalessProtocol::Line)
1035            .data(data)
1036            .req_id(103u64)
1037            .build()?;
1038        assert_eq!(client.put(&sml_data)?, ());
1039
1040        client.exec(format!("drop database if exists {db}"))?;
1041
1042        Ok(())
1043    }
1044
1045    #[test]
1046    fn test_put_telnet() -> anyhow::Result<()> {
1047        // std::env::set_var("RUST_LOG", "taos=trace");
1048        std::env::set_var("RUST_LOG", "taos=debug");
1049        // pretty_env_logger::init();
1050        use taos_query::prelude::sync::*;
1051
1052        let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1053        tracing::debug!("dsn: {:?}", &dsn);
1054
1055        let client = TaosBuilder::from_dsn(dsn)?.build()?;
1056
1057        let db = "test_schemaless_telnet_optin";
1058
1059        client.exec(format!("drop database if exists {db}"))?;
1060
1061        client.exec(format!("create database if not exists {db}"))?;
1062
1063        // should specify database before insert
1064        client.exec(format!("use {db}"))?;
1065
1066        let data = [
1067            "meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
1068            "meters.current 1648432611250 12.6 location=California.SanFrancisco group=2",
1069            "meters.current 1648432611249 10.8 location=California.LosAngeles group=3",
1070            "meters.current 1648432611250 11.3 location=California.LosAngeles group=3",
1071            "meters.voltage 1648432611249 219 location=California.SanFrancisco group=2",
1072            "meters.voltage 1648432611250 218 location=California.SanFrancisco group=2",
1073            "meters.voltage 1648432611249 221 location=California.LosAngeles group=3",
1074            "meters.voltage 1648432611250 217 location=California.LosAngeles group=3",
1075        ]
1076        .map(String::from)
1077        .to_vec();
1078
1079        let sml_data = SmlDataBuilder::default()
1080            .protocol(SchemalessProtocol::Telnet)
1081            .precision(SchemalessPrecision::Millisecond)
1082            .data(data.clone())
1083            .ttl(1000)
1084            .req_id(100u64)
1085            .build()?;
1086        assert_eq!(client.put(&sml_data)?, ());
1087
1088        let sml_data = SmlDataBuilder::default()
1089            .protocol(SchemalessProtocol::Telnet)
1090            .precision(SchemalessPrecision::Millisecond)
1091            .data(data.clone())
1092            .req_id(101u64)
1093            .build()?;
1094        assert_eq!(client.put(&sml_data)?, ());
1095
1096        let sml_data = SmlDataBuilder::default()
1097            .protocol(SchemalessProtocol::Telnet)
1098            .precision(SchemalessPrecision::Millisecond)
1099            .data(data.clone())
1100            .build()?;
1101        assert_eq!(client.put(&sml_data)?, ());
1102
1103        let sml_data = SmlDataBuilder::default()
1104            .protocol(SchemalessProtocol::Telnet)
1105            .data(data)
1106            .req_id(103u64)
1107            .build()?;
1108        assert_eq!(client.put(&sml_data)?, ());
1109
1110        client.exec(format!("drop database if exists {db}"))?;
1111
1112        Ok(())
1113    }
1114
1115    #[test]
1116    fn test_put_json() -> anyhow::Result<()> {
1117        // std::env::set_var("RUST_LOG", "taos=trace");
1118        std::env::set_var("RUST_LOG", "taos=debug");
1119        // pretty_env_logger::init();
1120        use taos_query::prelude::sync::*;
1121
1122        let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1123        tracing::debug!("dsn: {:?}", &dsn);
1124
1125        let client = TaosBuilder::from_dsn(dsn)?.build()?;
1126
1127        let db = "test_schemaless_json_optin";
1128
1129        client.exec(format!("drop database if exists {db}"))?;
1130
1131        client.exec(format!("create database if not exists {db}"))?;
1132
1133        // should specify database before insert
1134        client.exec(format!("use {db}"))?;
1135
1136        // SchemalessProtocol::Json
1137        let data = [
1138            r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
1139        ]
1140        .map(String::from)
1141        .to_vec();
1142
1143        let sml_data = SmlDataBuilder::default()
1144            .protocol(SchemalessProtocol::Json)
1145            .precision(SchemalessPrecision::Millisecond)
1146            .data(data.clone())
1147            .ttl(1000)
1148            .req_id(300u64)
1149            .build()?;
1150        assert_eq!(client.put(&sml_data)?, ());
1151
1152        let sml_data = SmlDataBuilder::default()
1153            .protocol(SchemalessProtocol::Json)
1154            .data(data.clone())
1155            .ttl(1000)
1156            .req_id(301u64)
1157            .build()?;
1158        assert_eq!(client.put(&sml_data)?, ());
1159
1160        let sml_data = SmlDataBuilder::default()
1161            .protocol(SchemalessProtocol::Json)
1162            .data(data.clone())
1163            .req_id(302u64)
1164            .build()?;
1165        assert_eq!(client.put(&sml_data)?, ());
1166
1167        let sml_data = SmlDataBuilder::default()
1168            .protocol(SchemalessProtocol::Json)
1169            .data(data.clone())
1170            .build()?;
1171        assert_eq!(client.put(&sml_data)?, ());
1172
1173        client.exec(format!("drop database if exists {db}"))?;
1174
1175        Ok(())
1176    }
1177
1178    #[test]
1179    fn test_error_details() -> anyhow::Result<()> {
1180        // std::env::set_var("RUST_LOG", "taos=trace");
1181        std::env::set_var("RUST_LOG", "taos=debug");
1182        // pretty_env_logger::init();
1183        use taos_query::prelude::sync::*;
1184
1185        let dsn = std::env::var("TEST_DSN").unwrap_or("taos://localhost:6030".to_string());
1186        tracing::debug!("dsn: {:?}", &dsn);
1187
1188        let client = TaosBuilder::from_dsn(dsn)?.build()?;
1189
1190        let db = "test_tmq_err_details";
1191
1192        client.exec(format!("drop database if exists {db}"))?;
1193
1194        client.exec(format!("create database if not exists {db}"))?;
1195
1196        // should specify database before insert
1197        client.exec(format!("use {db}"))?;
1198
1199        client.exec("create table t1 (ts timestamp, val int)")?;
1200
1201        let views = vec![
1202            ColumnView::from_millis_timestamp(vec![164000000000]),
1203            ColumnView::from_bools(vec![true]),
1204        ];
1205        let mut block = RawBlock::from_views(&views, Precision::Millisecond);
1206        block.with_table_name("t1");
1207
1208        let err = client.write_raw_block(&block).unwrap_err();
1209        dbg!(&err);
1210
1211        Ok(())
1212    }
1213}