1use std::{
2 cell::UnsafeCell,
3 ffi::{c_char, CStr, CString},
4 sync::Arc,
5 time::Duration,
6};
7
8use anyhow::Context;
9use once_cell::sync::OnceCell;
10use raw::{ApiEntry, BlockState, RawRes, RawTaos};
11use tracing::warn;
12
13use taos_query::{
14 prelude::tokio::time,
15 prelude::{Field, Precision, RawBlock, RawMeta, RawResult},
16 util::Edition,
17};
18
/// Retry budget handed to `connect_with_retries` when the DSN carries no
/// `maxRetries` parameter, or when that parameter does not parse as a `u8`.
const MAX_CONNECT_RETRIES: u8 = 2;
20
21mod into_c_str;
22mod raw;
23mod stmt;
24
25#[allow(non_camel_case_types)]
26pub(crate) mod types;
27
28pub mod tmq;
29pub use stmt::Stmt;
30pub use tmq::{Consumer, TmqBuilder};
31
32pub mod prelude {
33 pub use super::{Consumer, ResultSet, Stmt, Taos, TaosBuilder, TmqBuilder};
34
35 pub use taos_query::prelude::*;
36
37 pub mod sync {
38 pub use crate::{Consumer, ResultSet, Stmt, Taos, TaosBuilder, TmqBuilder};
39 pub use taos_query::prelude::sync::*;
40 }
41}
42
/// Converts a status code into a `Result`, evaluating the code expression
/// (and the success value) inside an `unsafe` block.
///
/// Accepted forms:
///
/// * `err_or!(res, code, ret)` - `Ok(ret)` on success, otherwise a `RawError`
///   built from the code and `res.err_as_str()`.
/// * `err_or!(res, code)` - same, with `()` as the success value.
/// * `err_or!(code, ret)` - `Ok(ret)` on success, otherwise
///   `RawError::from_code(code)`.
/// * `err_or!(code)` - same, with `()` as the success value.
///
/// Arms are tried in order, so a two-argument call whose first argument is a
/// bare identifier is always treated as `(res, code)`.
///
/// Every arm names `Code` and `RawError` through `taos_query::prelude`, so the
/// call site does not need to import them.
#[macro_export(local_inner_macros)]
macro_rules! err_or {
    ($res:ident, $code:expr, $ret:expr) => {
        unsafe {
            let code: taos_query::prelude::Code = { $code }.into();
            if code.success() {
                Ok($ret)
            } else {
                Err(taos_query::prelude::RawError::new(code, $res.err_as_str()))
            }
        }
    };

    ($res:ident, $code:expr) => {{
        err_or!($res, $code, ())
    }};

    ($code:expr, $ret:expr) => {
        unsafe {
            let code: taos_query::prelude::Code = { $code }.into();
            if code.success() {
                Ok($ret)
            } else {
                Err(taos_query::prelude::RawError::from_code(code))
            }
        }
    };

    ($code:expr) => {
        err_or!($code, ())
    };
}
74
/// A connection to the server, wrapping the low-level [`RawTaos`] handle.
///
/// The handle is closed when the value is dropped.
#[derive(Debug)]
pub struct Taos {
    /// Low-level handle every query and write call is forwarded to.
    raw: RawTaos,
}
79
/// Closes the underlying raw connection when the `Taos` value goes away.
impl Drop for Taos {
    fn drop(&mut self) {
        self.raw.close();
    }
}
85
86impl taos_query::Queryable for Taos {
87 type ResultSet = ResultSet;
88
89 fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet> {
90 tracing::trace!("Query with SQL: {}", sql.as_ref());
91 self.raw.query(sql.as_ref()).map(ResultSet::new)
92 }
93
94 fn query_with_req_id<T: AsRef<str>>(
95 &self,
96 _sql: T,
97 _req_id: u64,
98 ) -> RawResult<Self::ResultSet> {
99 tracing::trace!("Query with SQL: {}", _sql.as_ref());
100 self.raw
101 .query_with_req_id(_sql.as_ref(), _req_id)
102 .map(ResultSet::new)
103 }
104
105 fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()> {
106 let raw = meta.as_raw_data_t();
107 self.raw.write_raw_meta(raw)
108 }
109
110 fn write_raw_block(&self, raw: &RawBlock) -> RawResult<()> {
111 self.raw.write_raw_block(raw)
112 }
113
114 fn write_raw_block_with_req_id(&self, raw: &RawBlock, req_id: u64) -> RawResult<()> {
115 self.raw.write_raw_block_with_req_id(raw, req_id)
116 }
117
118 fn put(&self, data: &taos_query::common::SmlData) -> RawResult<()> {
119 self.raw.put(data)
120 }
121
122 fn table_vgroup_id(&self, db: &str, table: &str) -> Option<i32> {
123 self.raw.get_table_vgroup_id(db, table).ok()
124 }
125
126 fn tables_vgroup_ids<T: AsRef<str>>(&self, db: &str, tables: &[T]) -> Option<Vec<i32>> {
127 self.raw.get_tables_vgroup_ids(db, tables).ok()
128 }
129}
130
131#[async_trait::async_trait]
132impl taos_query::AsyncQueryable for Taos {
133 type AsyncResultSet = ResultSet;
134
135 async fn query<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<Self::AsyncResultSet> {
136 tracing::trace!("Async query with SQL: {}", sql.as_ref());
137
138 match self.raw.query_async(sql.as_ref()).await {
139 Err(err) if err.code() == 0x2603 => {
140 self.raw.query_async(sql.as_ref()).await.map(ResultSet::new)
141 }
142 Err(err) => Err(err),
143 Ok(raw) => Ok(ResultSet::new(raw)),
144 }
145 }
146
147 async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
148 &self,
149 _sql: T,
150 _req_id: u64,
151 ) -> RawResult<Self::AsyncResultSet> {
152 self.raw
153 .query_with_req_id(_sql.as_ref(), _req_id)
154 .map(ResultSet::new)
155 }
156
157 async fn write_raw_meta(&self, meta: &taos_query::common::RawMeta) -> RawResult<()> {
158 self.raw.write_raw_meta(meta.as_raw_data_t())
159 }
160
161 async fn write_raw_block(&self, block: &RawBlock) -> RawResult<()> {
162 self.raw.write_raw_block(block)
163 }
164
165 async fn write_raw_block_with_req_id(&self, block: &RawBlock, req_id: u64) -> RawResult<()> {
166 self.raw.write_raw_block_with_req_id(block, req_id)
167 }
168
169 async fn put(&self, data: &taos_query::common::SmlData) -> RawResult<()> {
170 self.raw.put(data)
171 }
172
173 async fn table_vgroup_id(&self, db: &str, table: &str) -> Option<i32> {
174 self.raw.get_table_vgroup_id(db, table).ok()
175 }
176
177 async fn tables_vgroup_ids<T: AsRef<str> + Sync>(
178 &self,
179 db: &str,
180 tables: &[T],
181 ) -> Option<Vec<i32>> {
182 self.raw.get_tables_vgroup_ids(db, tables).ok()
183 }
184}
185
186#[derive(Debug)]
224pub struct TaosBuilder {
225 auth: Auth,
227 lib: Arc<ApiEntry>,
228 inner_conn: OnceCell<Taos>,
229 server_version: OnceCell<String>,
230}
231impl TaosBuilder {
232 fn inner_connection(&self) -> RawResult<&Taos> {
233 if let Some(taos) = self.inner_conn.get() {
234 Ok(taos)
235 } else {
236 let ptr = self
237 .lib
238 .connect_with_retries(&self.auth, self.auth.max_retries())?;
239
240 let raw = RawTaos::new(self.lib.clone(), ptr)?;
241 let taos = Ok(Taos { raw });
242 self.inner_conn.get_or_try_init(|| taos)
243 }
244 }
245}
246
/// Connection parameters in the C-string form the client library expects.
///
/// Every string field is optional; a missing field is handed to the C side as
/// a null pointer by the `*_as_ptr` accessors.
#[derive(Debug, Default)]
struct Auth {
    host: Option<CString>,
    user: Option<CString>,
    pass: Option<CString>,
    db: Option<CString>,
    port: u16,
    max_retries: u8,
}

impl Auth {
    /// Pointer to the string's bytes, or null when there is no string.
    ///
    /// The pointer borrows from the `CString` stored in `self`, so it is only
    /// valid while this `Auth` is alive and unmodified.
    fn ptr_or_null(value: Option<&CStr>) -> *const c_char {
        match value {
            Some(s) => s.as_ptr(),
            None => std::ptr::null(),
        }
    }

    /// Host name, if one was configured.
    pub(crate) fn host(&self) -> Option<&CStr> {
        self.host.as_ref().map(|s| s.as_c_str())
    }

    /// Host name as a C pointer (null when unset).
    pub(crate) fn host_as_ptr(&self) -> *const c_char {
        Self::ptr_or_null(self.host())
    }

    /// User name, if one was configured.
    pub(crate) fn user(&self) -> Option<&CStr> {
        self.user.as_ref().map(|s| s.as_c_str())
    }

    /// User name as a C pointer (null when unset).
    pub(crate) fn user_as_ptr(&self) -> *const c_char {
        Self::ptr_or_null(self.user())
    }

    /// Password, if one was configured.
    pub(crate) fn password(&self) -> Option<&CStr> {
        self.pass.as_ref().map(|s| s.as_c_str())
    }

    /// Password as a C pointer (null when unset).
    pub(crate) fn password_as_ptr(&self) -> *const c_char {
        Self::ptr_or_null(self.password())
    }

    /// Database name, if one was configured.
    pub(crate) fn database(&self) -> Option<&CStr> {
        self.db.as_ref().map(|s| s.as_c_str())
    }

    /// Database name as a C pointer (null when unset).
    pub(crate) fn database_as_ptr(&self) -> *const c_char {
        Self::ptr_or_null(self.database())
    }

    /// Configured port; `0` when the DSN did not specify one.
    pub(crate) fn port(&self) -> u16 {
        self.port
    }

    /// Retry budget for establishing a connection.
    pub(crate) fn max_retries(&self) -> u8 {
        self.max_retries
    }
}
289
290impl taos_query::TBuilder for TaosBuilder {
291 type Target = Taos;
292
293 fn available_params() -> &'static [&'static str] {
294 const PARAMS: &[&str] = &["configDir", "libraryPath"];
295 PARAMS
296 }
297
298 fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
299 let mut dsn = dsn.into_dsn()?;
300
301 let lib = if let Some(path) = dsn.params.remove("libraryPath") {
302 tracing::trace!("using library path: {path}");
303 ApiEntry::dlopen(path).map_err(|err| taos_query::RawError::any(err))?
304 } else {
305 tracing::trace!("using default library of taos");
306 ApiEntry::open_default().map_err(|err| taos_query::RawError::any(err))?
307 };
308 let mut auth = Auth::default();
309 if let Some(addr) = dsn.addresses.first() {
311 if let Some(host) = &addr.host {
312 auth.host.replace(CString::new(host.as_str()).unwrap());
313 }
314 if let Some(port) = addr.port {
315 auth.port = port;
316 }
317 }
318 if let Some(db) = dsn.subject.as_deref() {
319 auth.db.replace(CString::new(db).unwrap());
320 }
321 if let Some(user) = dsn.username.as_deref() {
322 auth.user.replace(CString::new(user).unwrap());
323 }
324 if let Some(pass) = dsn.password.as_deref() {
325 auth.pass.replace(CString::new(pass).unwrap());
326 }
327 let params = &dsn.params;
328 if let Some(dir) = params.get("configDir") {
329 lib.options(types::TSDB_OPTION::ConfigDir, dir);
330 }
331
332 lib.options(types::TSDB_OPTION::ShellActivityTimer, "3600");
333
334 if let Some(max_retries) = params.get("maxRetries") {
335 auth.max_retries = max_retries.parse().unwrap_or(MAX_CONNECT_RETRIES);
336 } else {
337 auth.max_retries = MAX_CONNECT_RETRIES;
338 }
339
340 Ok(Self {
341 auth,
343 lib: Arc::new(lib),
344 inner_conn: OnceCell::new(),
345 server_version: OnceCell::new(),
346 })
347 }
348
349 fn client_version() -> &'static str {
350 "dynamic"
351 }
352
353 fn ping(&self, conn: &mut Self::Target) -> RawResult<()> {
354 conn.raw.query("select server_status()")?;
355 Ok(())
356 }
357
358 fn ready(&self) -> bool {
359 true
360 }
361
362 fn build(&self) -> RawResult<Self::Target> {
363 let ptr = self
364 .lib
365 .connect_with_retries(&self.auth, self.auth.max_retries())?;
366
367 let raw = RawTaos::new(self.lib.clone(), ptr)?;
368 Ok(Taos { raw })
369 }
370
371 fn server_version(&self) -> RawResult<&str> {
372 if let Some(v) = self.server_version.get() {
373 Ok(v.as_str())
374 } else {
375 let conn = self.inner_connection()?;
376 use taos_query::prelude::sync::Queryable;
377 let v: String = Queryable::query_one(conn, "select server_version()")?.unwrap();
378 Ok(match self.server_version.try_insert(v) {
379 Ok(v) => v.as_str(),
380 Err((v, _)) => v.as_str(),
381 })
382 }
383 }
384
385 fn is_enterprise_edition(&self) -> RawResult<bool> {
386 let taos = self.inner_connection()?;
387 use taos_query::prelude::sync::Queryable;
388 let grant: RawResult<Option<(String, bool)>> = Queryable::query_one(
389 taos,
390 "select version, (expire_time < now) as valid from information_schema.ins_cluster",
391 );
392
393 let edition = if let Ok(Some((edition, expired))) = grant {
394 Edition::new(edition, expired)
395 } else {
396 let grant: RawResult<Option<(String, (), String)>> =
397 Queryable::query_one(taos, "show grants");
398
399 if let Ok(Some((edition, _, expired))) = grant {
400 Edition::new(
401 edition.trim(),
402 expired.trim() == "false" || expired.trim() == "unlimited",
403 )
404 } else {
405 warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
406 Edition::new("unknown", true)
407 }
408 };
409 Ok(edition.is_enterprise_edition())
410 }
411
412 fn get_edition(&self) -> RawResult<Edition> {
413 let taos = self.inner_connection()?;
414 use taos_query::prelude::sync::Queryable;
415 let grant: RawResult<Option<(String, bool)>> = Queryable::query_one(
416 taos,
417 "select version, (expire_time < now) as valid from information_schema.ins_cluster",
418 );
419
420 let edition = if let Ok(Some((edition, expired))) = grant {
421 Edition::new(edition, expired)
422 } else {
423 let grant: RawResult<Option<(String, (), String)>> =
424 Queryable::query_one(taos, "show grants");
425
426 if let Ok(Some((edition, _, expired))) = grant {
427 Edition::new(
428 edition.trim(),
429 !(expired.trim() == "false" || expired.trim() == "unlimited"),
430 )
431 } else {
432 warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
433 Edition::new("unknown", true)
434 }
435 };
436 Ok(edition)
437 }
438}
439
440#[async_trait::async_trait]
441impl taos_query::AsyncTBuilder for TaosBuilder {
442 type Target = Taos;
443
444 fn from_dsn<D: taos_query::IntoDsn>(dsn: D) -> RawResult<Self> {
445 let mut dsn = dsn.into_dsn()?;
446
447 let lib = if let Some(path) = dsn.params.remove("libraryPath") {
448 tracing::trace!("using library path: {path}");
449 ApiEntry::dlopen(path).map_err(|err| taos_query::RawError::any(err))?
450 } else {
451 tracing::trace!("using default library of taos");
452 ApiEntry::open_default().map_err(|err| taos_query::RawError::any(err))?
453 };
454 let mut auth = Auth::default();
455 if let Some(addr) = dsn.addresses.first() {
457 if let Some(host) = &addr.host {
458 auth.host.replace(CString::new(host.as_str()).unwrap());
459 }
460 if let Some(port) = addr.port {
461 auth.port = port;
462 }
463 }
464 if let Some(db) = dsn.subject.as_deref() {
465 auth.db.replace(CString::new(db).unwrap());
466 }
467 if let Some(user) = dsn.username.as_deref() {
468 auth.user.replace(CString::new(user).unwrap());
469 }
470 if let Some(pass) = dsn.password.as_deref() {
471 auth.pass.replace(CString::new(pass).unwrap());
472 }
473 let params = &dsn.params;
474 if let Some(dir) = params.get("configDir") {
475 lib.options(types::TSDB_OPTION::ConfigDir, dir);
476 }
477
478 lib.options(types::TSDB_OPTION::ShellActivityTimer, "3600");
479
480 if let Some(max_retries) = params.get("maxRetries") {
481 auth.max_retries = max_retries.parse().unwrap_or(MAX_CONNECT_RETRIES);
482 } else {
483 auth.max_retries = MAX_CONNECT_RETRIES;
484 }
485
486 Ok(Self {
487 auth,
489 lib: Arc::new(lib),
490 inner_conn: OnceCell::new(),
491 server_version: OnceCell::new(),
492 })
493 }
494
495 fn client_version() -> &'static str {
496 "dynamic"
497 }
498
499 async fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
500 Ok(())
503 }
504
505 async fn ready(&self) -> bool {
506 true
507 }
508
509 async fn build(&self) -> RawResult<Self::Target> {
510 let ptr = self
511 .lib
512 .connect_with_retries(&self.auth, self.auth.max_retries())?;
513
514 let raw = RawTaos::new(self.lib.clone(), ptr)?;
515 Ok(Taos { raw })
516 }
517
518 async fn server_version(&self) -> RawResult<&str> {
519 if let Some(v) = self.server_version.get() {
520 Ok(v.as_str())
521 } else {
522 let conn = self.inner_connection()?;
523 use taos_query::prelude::AsyncQueryable;
524 let v: String = AsyncQueryable::query_one(conn, "select server_version()")
525 .await?
526 .unwrap();
527 Ok(match self.server_version.try_insert(v) {
528 Ok(v) => v.as_str(),
529 Err((v, _)) => v.as_str(),
530 })
531 }
532 }
533
534 async fn is_enterprise_edition(&self) -> RawResult<bool> {
535 let taos = self.inner_connection()?;
536 use taos_query::prelude::AsyncQueryable;
537
538 let grant: RawResult<Option<(String, bool)>> = time::timeout(
540 Duration::from_secs(60),
541 AsyncQueryable::query_one(
542 taos,
543 "select version, (expire_time < now) as valid from information_schema.ins_cluster",
544 ),
545 )
546 .await
547 .context("Check cluster edition timeout")?;
548
549 let edition = if let Ok(Some((edition, expired))) = grant {
550 Edition::new(edition, expired)
551 } else {
552 let grant: RawResult<Option<(String, (), String)>> = time::timeout(
553 Duration::from_secs(60),
554 AsyncQueryable::query_one(taos, "show grants"),
555 )
556 .await
557 .context("Check legacy grants timeout")?;
558
559 if let Ok(Some((edition, _, expired))) = grant {
560 Edition::new(
561 edition.trim(),
562 expired.trim() == "false" || expired.trim() == "unlimited",
563 )
564 } else {
565 warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
566 Edition::new("unknown", true)
567 }
568 };
569 Ok(edition.is_enterprise_edition())
570 }
571
572 async fn get_edition(&self) -> RawResult<Edition> {
573 let taos = self.inner_connection()?;
574 use taos_query::prelude::AsyncQueryable;
575
576 let grant: RawResult<Option<(String, bool)>> = time::timeout(
578 Duration::from_secs(60),
579 AsyncQueryable::query_one(
580 taos,
581 "select version, (expire_time < now) as valid from information_schema.ins_cluster",
582 ),
583 )
584 .await
585 .context("Check cluster edition timeout")?;
586
587 let edition = if let Ok(Some((edition, expired))) = grant {
588 Edition::new(edition, expired)
589 } else {
590 let grant: RawResult<Option<(String, (), String)>> = time::timeout(
591 Duration::from_secs(60),
592 AsyncQueryable::query_one(taos, "show grants"),
593 )
594 .await
595 .context("Check legacy grants timeout")?;
596
597 if let Ok(Some((edition, _, expired))) = grant {
598 Edition::new(
599 edition.trim(),
600 !(expired.trim() == "false" || expired.trim() == "unlimited"),
601 )
602 } else {
603 warn!("Can't check enterprise edition with either \"show cluster\" or \"show grants\"");
604 Edition::new("unknown", true)
605 }
606 };
607 Ok(edition)
608 }
609}
610
/// The result of a query, usable through both the blocking and async fetch
/// traits. The underlying raw result is freed on drop.
#[derive(Debug)]
pub struct ResultSet {
    /// Raw result handle that all fetch calls go through.
    raw: RawRes,
    /// Column descriptions, fetched from `raw` on first access.
    fields: OnceCell<Vec<Field>>,
    /// Running totals: `.0` counts `update_summary` calls, `.1` sums the row
    /// counts passed to them.
    summary: UnsafeCell<(usize, usize)>,
    /// Block state handed to `fetch_raw_block_async` on every poll.
    state: Arc<UnsafeCell<BlockState>>,
}
618
619impl ResultSet {
620 fn new(raw: RawRes) -> Self {
621 Self {
622 raw,
623 fields: OnceCell::new(),
624 summary: UnsafeCell::new((0, 0)),
625 state: Arc::new(UnsafeCell::new(BlockState::default())),
626 }
627 }
628
629 fn precision(&self) -> Precision {
630 self.raw.precision()
631 }
632
633 fn fields(&self) -> &[Field] {
634 self.fields.get_or_init(|| self.raw.fetch_fields())
635 }
636
637 fn update_summary(&mut self, nrows: usize) {
638 let summary = self.summary.get_mut();
639 summary.0 += 1;
640 summary.1 += nrows;
641 }
642
643 pub(crate) fn summary(&self) -> &(usize, usize) {
644 unsafe { &*self.summary.get() }
645 }
646
647 pub(crate) fn affected_rows(&self) -> i32 {
648 self.raw.affected_rows() as _
649 }
650}
651
652impl taos_query::Fetchable for ResultSet {
653 fn affected_rows(&self) -> i32 {
654 self.affected_rows()
655 }
656
657 fn precision(&self) -> Precision {
658 self.precision()
659 }
660
661 fn fields(&self) -> &[Field] {
662 self.fields()
663 }
664
665 fn summary(&self) -> (usize, usize) {
666 *self.summary()
667 }
668
669 fn update_summary(&mut self, nrows: usize) {
670 self.update_summary(nrows)
671 }
672
673 fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>> {
674 self.raw.fetch_raw_block(self.fields())
675 }
676}
677
678impl taos_query::AsyncFetchable for ResultSet {
679 fn affected_rows(&self) -> i32 {
680 self.affected_rows()
681 }
682
683 fn precision(&self) -> Precision {
684 self.precision()
685 }
686
687 fn fields(&self) -> &[Field] {
688 self.fields()
689 }
690
691 fn summary(&self) -> (usize, usize) {
692 *self.summary()
693 }
694
695 fn fetch_raw_block(
696 &mut self,
697 cx: &mut std::task::Context<'_>,
698 ) -> std::task::Poll<RawResult<Option<RawBlock>>> {
699 self.raw
700 .fetch_raw_block_async(self.fields(), self.precision(), &self.state, cx)
701 }
702
703 fn update_summary(&mut self, nrows: usize) {
704 self.update_summary(nrows)
705 }
706}
707
/// Releases the underlying raw result (`RawRes::free_result`) when the result
/// set is dropped.
impl Drop for ResultSet {
    fn drop(&mut self) {
        self.raw.free_result();
    }
}
713
// SAFETY: NOTE(review): nothing in this file establishes why these impls are
// sound. `ResultSet` contains `UnsafeCell` fields (`summary`, `state`), which
// are not `Sync` on their own; presumably the raw result handle and block
// state are only touched by one task at a time - confirm against `raw`.
unsafe impl Send for ResultSet {}
unsafe impl Sync for ResultSet {}
716
/// DSNs shared by the tests in this crate.
#[cfg(test)]
pub(crate) mod constants {
    /// DSN used by the `*_v2` tests; currently the same server as `DSN_V3`.
    pub const DSN_V2: &str = "taos://localhost:6030";
    /// DSN used by the remaining tests.
    pub const DSN_V3: &str = "taos://localhost:6030";
}
724
/// Integration tests. Apart from the two `#[ignore]`d retry tests, they expect
/// a server reachable at the DSNs in `constants` (or at `TEST_DSN` for the
/// schemaless and error-detail tests).
#[cfg(test)]
mod tests {
    use crate::constants::{DSN_V2, DSN_V3};

    use super::*;

    use taos_query::common::SchemalessPrecision;
    use taos_query::common::SchemalessProtocol;
    use taos_query::common::SmlDataBuilder;

    /// DSN used when `TEST_DSN` is not set.
    const DEFAULT_TEST_DSN: &str = "taos://localhost:6030";

    /// DSN for the schemaless / error-detail tests, overridable via `TEST_DSN`.
    fn test_dsn() -> String {
        std::env::var("TEST_DSN").unwrap_or_else(|_| DEFAULT_TEST_DSN.to_string())
    }

    /// Connects with the blocking API, runs `show databases` and prints the
    /// first ten values per column and per row of every block.
    fn dump_databases(dsn: &'static str) -> RawResult<()> {
        use taos_query::prelude::sync::*;
        let builder = TaosBuilder::from_dsn(dsn)?;
        let taos = builder.build()?;
        let mut set = taos.query("show databases")?;

        for raw in &mut set.blocks() {
            let raw = raw?;
            for (col, view) in raw.columns().enumerate() {
                for (row, value) in view.iter().enumerate().take(10) {
                    println!("Value at (row: {}, col: {}) is: {}", row, col, value);
                }
            }

            for (row, view) in raw.rows().enumerate().take(10) {
                for (col, value) in view.enumerate() {
                    println!("Value at (row: {}, col: {}) is: {:?}", row, col, value);
                }
            }
        }

        println!("summary: {:?}", set.summary());

        Ok(())
    }

    /// Connects with the async API, runs `show databases` and prints every
    /// named value row by row.
    async fn dump_databases_async(dsn: &'static str) -> RawResult<()> {
        use taos_query::prelude::*;
        let builder = TaosBuilder::from_dsn(dsn)?;
        let taos = builder.build().await?;
        let mut set = taos.query("show databases").await?;

        let mut rows = set.rows();
        let mut nrows = 0;
        while let Some(row) = rows.try_next().await? {
            for (col, (name, value)) in row.enumerate() {
                println!("[{}, {}] (named `{:>4}`): {}", nrows, col, name, value);
            }
            nrows += 1;
        }

        println!("summary: {:?}", set.summary());

        Ok(())
    }

    /// Connects to `test_dsn()` and leaves the connection inside a freshly
    /// re-created database named `db`.
    fn connect_with_fresh_db(db: &str) -> anyhow::Result<Taos> {
        use taos_query::prelude::sync::*;

        let dsn = test_dsn();
        tracing::debug!("dsn: {:?}", &dsn);

        let client = TaosBuilder::from_dsn(dsn)?.build()?;

        client.exec(format!("drop database if exists {db}"))?;
        client.exec(format!("create database if not exists {db}"))?;
        client.exec(format!("use {db}"))?;

        Ok(client)
    }

    #[test]
    fn show_databases() -> RawResult<()> {
        dump_databases(DSN_V3)
    }

    #[test]
    fn long_query() -> RawResult<()> {
        dump_databases(DSN_V3)
    }

    #[tokio::test]
    #[ignore]
    async fn builder_retry_once() -> RawResult<()> {
        use taos_query::prelude::*;

        let builder = TaosBuilder::from_dsn("taos://localhost:6041?maxRetries=1")?;
        assert!(builder.ready().await);

        let res = builder.build().await;
        assert!(res.is_err());

        Ok(())
    }

    #[tokio::test]
    #[ignore]
    async fn builder_retry_default() -> RawResult<()> {
        use taos_query::prelude::*;

        let builder = TaosBuilder::from_dsn("taos://localhost:6041")?;
        assert!(builder.ready().await);

        let res = builder.build().await;
        assert!(res.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn long_query_async() -> RawResult<()> {
        use taos_query::prelude::*;
        let builder = TaosBuilder::from_dsn(DSN_V3)?;
        let taos = builder.build().await?;
        let mut set = taos.query("show databases").await?;

        set.blocks()
            .try_for_each_concurrent(10, |block| async move {
                println!("{}", block.pretty_format());
                Ok(())
            })
            .await?;
        println!("summary: {:?}", set.summary());

        let mut set = taos.query("show databases").await?;

        set.rows()
            .try_for_each_concurrent(10, |row| async move {
                println!(
                    "{}",
                    row.map(|(_, value)| value.to_string().unwrap()).join(",")
                );
                Ok(())
            })
            .await?;

        println!("summary: {:?}", set.summary());

        Ok(())
    }

    #[tokio::test]
    async fn show_databases_async() -> RawResult<()> {
        std::env::set_var("RUST_LOG", "debug");
        dump_databases_async(DSN_V3).await
    }

    #[tokio::test]
    async fn error_async() -> RawResult<()> {
        use taos_query::prelude::*;

        std::env::set_var("RUST_LOG", "debug");
        let builder = TaosBuilder::from_dsn("taos:///")?;
        let taos = builder.build().await?;
        let err = taos
            .exec("create table test.`abc.` (ts timestamp, val int)")
            .await
            .unwrap_err();
        println!("{:?}", err);
        assert!(err.code() == 0x2617);
        let err_str = err.to_string();
        assert!(err_str.contains("0x2617"));
        assert!(err_str.contains("The table name cannot contain '.'"));
        Ok(())
    }

    #[tokio::test]
    async fn error_fetch_async() -> RawResult<()> {
        use taos_query::prelude::*;

        std::env::set_var("RUST_LOG", "debug");
        let builder = TaosBuilder::from_dsn("taos:///")?;
        let taos = builder.build().await?;
        let err = taos
            .query("select * from testxxxx.meters")
            .await
            .unwrap_err();

        tracing::trace!("{:?}", err);

        assert!(err.code() == 0x2662);
        let err_str = err.to_string();
        assert!(err_str.contains("0x2662"));
        assert!(err_str.contains("Database not exist"));

        Ok(())
    }

    // Uses only the blocking API, so it runs as a plain test rather than
    // inside a tokio runtime.
    #[test]
    fn error_sync() -> RawResult<()> {
        use taos_query::prelude::sync::*;

        std::env::set_var("RUST_LOG", "debug");
        let builder = TaosBuilder::from_dsn("taos:///")?;
        let taos = builder.build()?;
        let err = taos
            .exec("create table test.`abc.` (ts timestamp, val int)")
            .unwrap_err();
        assert!(err.code() == 0x2617);
        let err_str = err.to_string();
        assert!(err_str.contains("0x2617"));
        assert!(err_str.contains("The table name cannot contain '.'"));
        println!("{:?}", err);
        Ok(())
    }

    #[test]
    fn show_databases_v2() -> RawResult<()> {
        dump_databases(DSN_V2)
    }

    #[tokio::test]
    async fn show_databases_async_v2() -> RawResult<()> {
        dump_databases_async(DSN_V2).await
    }

    #[test]
    fn test_put_line() -> anyhow::Result<()> {
        std::env::set_var("RUST_LOG", "taos=debug");
        use taos_query::prelude::sync::*;

        let db = "test_schemaless_optin";
        let client = connect_with_fresh_db(db)?;

        let data = [
            "measurement,host=host1 field1=2i,field2=2.0 1577837300000",
            "measurement,host=host1 field1=2i,field2=2.0 1577837400000",
            "measurement,host=host1 field1=2i,field2=2.0 1577837500000",
            "measurement,host=host1 field1=2i,field2=2.0 1577837600000",
        ]
        .map(String::from)
        .to_vec();

        // Precision, ttl and request id all set.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Line)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .ttl(1000)
            .req_id(100u64)
            .build()?;
        client.put(&sml_data)?;

        // No request id.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Line)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .ttl(1000)
            .build()?;
        client.put(&sml_data)?;

        // No ttl, no request id.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Line)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .build()?;
        client.put(&sml_data)?;

        // No precision, no ttl.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Line)
            .data(data)
            .req_id(103u64)
            .build()?;
        client.put(&sml_data)?;

        client.exec(format!("drop database if exists {db}"))?;

        Ok(())
    }

    #[test]
    fn test_put_telnet() -> anyhow::Result<()> {
        std::env::set_var("RUST_LOG", "taos=debug");
        use taos_query::prelude::sync::*;

        let db = "test_schemaless_telnet_optin";
        let client = connect_with_fresh_db(db)?;

        let data = [
            "meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
            "meters.current 1648432611250 12.6 location=California.SanFrancisco group=2",
            "meters.current 1648432611249 10.8 location=California.LosAngeles group=3",
            "meters.current 1648432611250 11.3 location=California.LosAngeles group=3",
            "meters.voltage 1648432611249 219 location=California.SanFrancisco group=2",
            "meters.voltage 1648432611250 218 location=California.SanFrancisco group=2",
            "meters.voltage 1648432611249 221 location=California.LosAngeles group=3",
            "meters.voltage 1648432611250 217 location=California.LosAngeles group=3",
        ]
        .map(String::from)
        .to_vec();

        // Precision, ttl and request id all set.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Telnet)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .ttl(1000)
            .req_id(100u64)
            .build()?;
        client.put(&sml_data)?;

        // No ttl.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Telnet)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .req_id(101u64)
            .build()?;
        client.put(&sml_data)?;

        // No ttl, no request id.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Telnet)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .build()?;
        client.put(&sml_data)?;

        // No precision, no ttl.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Telnet)
            .data(data)
            .req_id(103u64)
            .build()?;
        client.put(&sml_data)?;

        client.exec(format!("drop database if exists {db}"))?;

        Ok(())
    }

    #[test]
    fn test_put_json() -> anyhow::Result<()> {
        std::env::set_var("RUST_LOG", "taos=debug");
        use taos_query::prelude::sync::*;

        let db = "test_schemaless_json_optin";
        let client = connect_with_fresh_db(db)?;

        let data = [
            r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
        ]
        .map(String::from)
        .to_vec();

        // Precision, ttl and request id all set.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Json)
            .precision(SchemalessPrecision::Millisecond)
            .data(data.clone())
            .ttl(1000)
            .req_id(300u64)
            .build()?;
        client.put(&sml_data)?;

        // No precision.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Json)
            .data(data.clone())
            .ttl(1000)
            .req_id(301u64)
            .build()?;
        client.put(&sml_data)?;

        // No precision, no ttl.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Json)
            .data(data.clone())
            .req_id(302u64)
            .build()?;
        client.put(&sml_data)?;

        // Protocol and data only.
        let sml_data = SmlDataBuilder::default()
            .protocol(SchemalessProtocol::Json)
            .data(data)
            .build()?;
        client.put(&sml_data)?;

        client.exec(format!("drop database if exists {db}"))?;

        Ok(())
    }

    #[test]
    fn test_error_details() -> anyhow::Result<()> {
        std::env::set_var("RUST_LOG", "taos=debug");
        use taos_query::prelude::sync::*;

        let db = "test_tmq_err_details";
        let client = connect_with_fresh_db(db)?;

        client.exec("create table t1 (ts timestamp, val int)")?;

        // `t1.val` is an int column, but the block carries a bool column, so
        // the write is expected to be rejected.
        let views = vec![
            ColumnView::from_millis_timestamp(vec![164000000000]),
            ColumnView::from_bools(vec![true]),
        ];
        let mut block = RawBlock::from_views(&views, Precision::Millisecond);
        block.with_table_name("t1");

        let err = client.write_raw_block(&block).unwrap_err();
        println!("{:?}", err);

        Ok(())
    }
}