taos_query/
prelude.rs

1mod _priv {
2    pub use crate::common::{
3        AlterType, BorrowedValue, ColumnView, Field, JsonMeta, MetaAlter, MetaCreate, MetaDrop,
4        MetaUnit, Precision, RawBlock, RawMeta, TagWithValue, Ty, Value,
5    };
6    pub use crate::util::{Inlinable, InlinableRead, InlinableWrite};
7
8    pub use itertools::Itertools;
9    pub use mdsn::{Dsn, DsnError, IntoDsn};
10    pub use taos_error::{Code, Error as RawError};
11
12    pub use crate::tmq::{IsOffset, MessageSet, Timeout};
13}
14
15pub use crate::tmq::{AsAsyncConsumer, IsAsyncData, IsAsyncMeta};
16pub use crate::AsyncTBuilder;
17#[cfg(feature = "deadpool")]
18pub use crate::Pool;
19pub use crate::RawResult;
20pub use _priv::*;
21pub use futures::stream::{Stream, StreamExt, TryStreamExt};
22pub use r#async::*;
23pub use tokio;
24
/// Optional table-to-vgroup-id lookups.
///
/// Both methods ship with a default body that ignores its arguments and
/// returns `None`, so an implementor only overrides the lookups it can
/// actually answer.
pub trait Helpers {
    /// Returns the vgroup id for table `_table` in database `_db`.
    ///
    /// The default implementation always returns `None`.
    fn table_vgroup_id(&self, _db: &str, _table: &str) -> Option<i32> {
        Option::None
    }

    /// Returns the vgroup ids for every table in `_tables` of database `_db`.
    ///
    /// The default implementation always returns `None`.
    fn tables_vgroup_ids<T>(&self, _db: &str, _tables: &[T]) -> Option<Vec<i32>>
    where
        T: AsRef<str>,
    {
        Option::None
    }
}
34
35pub mod sync {
36    pub use crate::RawResult;
37    pub use crate::TBuilder;
38    #[cfg(feature = "r2d2")]
39    pub use crate::{Pool, PoolBuilder};
40    #[cfg(feature = "r2d2")]
41    pub use r2d2::ManageConnection;
42    use std::borrow::Cow;
43
44    pub use super::_priv::*;
45
46    pub use crate::stmt::Bindable;
47    pub use crate::tmq::{AsConsumer, IsData, IsMeta};
48
49    use serde::de::DeserializeOwned;
50
51    pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
52    pub use serde::de::value::Error as DeError;
53
54    use crate::common::*;
55    use crate::helpers::*;
56
57    pub struct IRowsIter<'a, T>
58    where
59        T: Fetchable,
60    {
61        iter: IBlockIter<'a, T>,
62        block: Option<RawBlock>,
63        // row: usize,
64        rows: Option<RowsIter<'a>>,
65    }
66
67    impl<'a, T> IRowsIter<'a, T>
68    where
69        T: Fetchable,
70    {
71        fn fetch(&mut self) -> RawResult<Option<RowView<'a>>> {
72            if let Some(block) = self.iter.next().transpose()? {
73                self.block = Some(block);
74                self.rows = self.block.as_mut().map(|raw| raw.rows());
75                let row = self.rows.as_mut().unwrap().next();
76                Ok(row)
77            } else {
78                Ok(None)
79            }
80        }
81        fn next_row(&mut self) -> RawResult<Option<RowView<'a>>> {
82            // has block
83            if let Some(rows) = self.rows.as_mut() {
84                // check if block over.
85                if let Some(row) = rows.next() {
86                    Ok(Some(row))
87                } else {
88                    self.fetch()
89                }
90            } else {
91                // no data, start fetching.
92                self.fetch()
93            }
94        }
95    }
96
97    impl<'a, T> Iterator for IRowsIter<'a, T>
98    where
99        T: Fetchable,
100    {
101        type Item = RawResult<RowView<'a>>;
102
103        fn next(&mut self) -> Option<Self::Item> {
104            self.next_row().transpose()
105        }
106    }
107
108    pub struct IBlockIter<'a, T>
109    where
110        T: Fetchable,
111    {
112        query: &'a mut T,
113    }
114
115    impl<'a, T> Iterator for IBlockIter<'a, T>
116    where
117        T: Fetchable,
118    {
119        type Item = RawResult<RawBlock>;
120
121        fn next(&mut self) -> Option<Self::Item> {
122            self.query
123                .fetch_raw_block()
124                .map(|raw| {
125                    if let Some(raw) = raw {
126                        self.query.update_summary(raw.nrows());
127                        Some(raw)
128                    } else {
129                        None
130                    }
131                })
132                .transpose()
133        }
134    }
135
136    pub trait Fetchable: Sized {
137        fn affected_rows(&self) -> i32;
138
139        fn precision(&self) -> Precision;
140
141        fn fields(&self) -> &[Field];
142
143        fn num_of_fields(&self) -> usize {
144            self.fields().len()
145        }
146
147        fn summary(&self) -> (usize, usize);
148
149        #[doc(hidden)]
150        fn update_summary(&mut self, nrows: usize);
151
152        #[doc(hidden)]
153        fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>>;
154
155        /// Iterator for raw data blocks.
156        fn blocks(&mut self) -> IBlockIter<'_, Self> {
157            IBlockIter { query: self }
158        }
159
160        /// Iterator for querying by rows.
161        fn rows(&mut self) -> IRowsIter<'_, Self> {
162            IRowsIter {
163                iter: self.blocks(),
164                block: None,
165                // row: 0,
166                rows: None,
167            }
168        }
169
170        fn deserialize<T: DeserializeOwned>(
171            &mut self,
172        ) -> std::iter::Map<IRowsIter<'_, Self>, fn(RawResult<RowView>) -> RawResult<T>> {
173            self.rows().map(|row| T::deserialize(&mut row?))
174        }
175
176        fn to_rows_vec(&mut self) -> RawResult<Vec<Vec<Value>>> {
177            self.blocks()
178                .map_ok(|raw| raw.to_values())
179                .flatten_ok()
180                .try_collect()
181        }
182    }
183
184    /// The synchronous query trait for TDengine connection.
185    pub trait Queryable // where
186    //     Self::ResultSet: Iterator<Item = Result<RawData>>,
187    {
188        type ResultSet: Fetchable;
189
190        fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet>;
191
192        fn query_with_req_id<T: AsRef<str>>(
193            &self,
194            sql: T,
195            req_id: u64,
196        ) -> RawResult<Self::ResultSet>;
197
198        fn exec<T: AsRef<str>>(&self, sql: T) -> RawResult<usize> {
199            self.query(sql).map(|res| res.affected_rows() as _)
200        }
201
202        fn write_raw_meta(&self, _: &RawMeta) -> RawResult<()>;
203
204        fn write_raw_block(&self, _: &RawBlock) -> RawResult<()>;
205
206        fn write_raw_block_with_req_id(&self, _: &RawBlock, _: u64) -> RawResult<()>;
207
208        fn exec_many<T: AsRef<str>, I: IntoIterator<Item = T>>(
209            &self,
210            input: I,
211        ) -> RawResult<usize> {
212            input
213                .into_iter()
214                .map(|sql| self.exec(sql))
215                .try_fold(0, |mut acc, aff| {
216                    acc += aff?;
217                    Ok(acc)
218                })
219        }
220
221        fn query_one<T: AsRef<str>, O: DeserializeOwned>(&self, sql: T) -> RawResult<Option<O>> {
222            self.query(sql)?
223                .deserialize::<O>()
224                .next()
225                .map_or(Ok(None), |v| v.map(Some).map_err(Into::into))
226        }
227
228        /// Short for `SELECT server_version()` as [String].
229        fn server_version(&self) -> RawResult<Cow<str>> {
230            Ok(self
231                .query_one::<_, String>("SELECT server_version()")?
232                .expect("should always has result")
233                .into())
234        }
235
236        fn create_topic(&self, name: impl AsRef<str>, sql: impl AsRef<str>) -> RawResult<()> {
237            let (name, sql) = (name.as_ref(), sql.as_ref());
238            let query = format!("create topic if not exists `{name}` as {sql}");
239
240            self.query(query)?;
241            Ok(())
242        }
243
244        fn create_topic_as_database(
245            &self,
246            name: impl AsRef<str>,
247            db: impl std::fmt::Display,
248        ) -> RawResult<()> {
249            let name = name.as_ref();
250            let query = format!("create topic if not exists `{name}` as database `{db}`");
251
252            self.exec(query)?;
253            Ok(())
254        }
255
256        fn databases(&self) -> RawResult<Vec<ShowDatabase>> {
257            self.query("show databases")?
258                .deserialize()
259                .try_collect()
260                .map_err(Into::into)
261        }
262
263        /// Topics information by `SELECT * FROM information_schema.ins_topics` sql.
264        ///
265        /// ## Compatibility
266        ///
267        /// This is a 3.x-only API.
268        fn topics(&self) -> RawResult<Vec<Topic>> {
269            self.query("SELECT * FROM information_schema.ins_topics")?
270                .deserialize()
271                .try_collect()
272                .map_err(Into::into)
273        }
274
275        fn describe(&self, table: &str) -> RawResult<Describe> {
276            Ok(Describe(
277                self.query(format!("describe `{table}`"))?
278                    .deserialize()
279                    .try_collect()?,
280            ))
281        }
282
283        /// Check if database exists
284        fn database_exists(&self, name: &str) -> RawResult<bool> {
285            Ok(self.exec(format!("show `{name}`.stables")).is_ok())
286        }
287
288        fn put(&self, data: &SmlData) -> RawResult<()>;
289
290        fn table_vgroup_id(&self, _db: &str, _table: &str) -> Option<i32> {
291            None
292        }
293
294        fn tables_vgroup_ids<T: AsRef<str>>(&self, _db: &str, _tables: &[T]) -> Option<Vec<i32>> {
295            None
296        }
297    }
298}
299
300mod r#async {
301    use serde::de::DeserializeOwned;
302    use std::borrow::Cow;
303    use std::marker::PhantomData;
304    use std::pin::Pin;
305    use std::task::{Context, Poll};
306
307    use crate::common::*;
308    use crate::helpers::*;
309    pub use crate::stmt::AsyncBindable;
310    pub use crate::RawResult;
311
312    pub use super::_priv::*;
313    pub use crate::util::AsyncInlinable;
314    pub use crate::util::AsyncInlinableRead;
315    pub use crate::util::AsyncInlinableWrite;
316    pub use mdsn::Address;
317    pub use serde::de::value::Error as DeError;
318
319    pub use futures::stream::{Stream, StreamExt, TryStreamExt};
320
321    // use crate::iter::*;
322    #[cfg(feature = "async")]
323    use async_trait::async_trait;
324
325    pub struct AsyncBlocks<'a, T> {
326        query: &'a mut T,
327    }
328
329    impl<'a, T> Stream for AsyncBlocks<'a, T>
330    where
331        T: AsyncFetchable,
332    {
333        type Item = RawResult<RawBlock>;
334
335        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
336            self.query.fetch_raw_block(cx).map(|raw| {
337                raw.map(|raw| {
338                    raw.map(|raw| {
339                        self.query.update_summary(raw.nrows());
340                        raw
341                    })
342                })
343                .transpose()
344            })
345        }
346    }
347
348    pub struct AsyncRows<'a, T> {
349        blocks: AsyncBlocks<'a, T>,
350        block: Option<RawBlock>,
351        rows: Option<RowsIter<'a>>,
352    }
353
354    impl<'a, T> AsyncRows<'a, T>
355    where
356        T: AsyncFetchable,
357    {
358        fn fetch(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RowView<'a>>>> {
359            let poll = self.blocks.try_poll_next_unpin(cx);
360            match poll {
361                Poll::Ready(block) => match block.transpose() {
362                    Ok(Some(block)) => {
363                        self.block = Some(block);
364                        self.rows = self.block.as_mut().map(|raw| raw.rows());
365                        let row = self.rows.as_mut().unwrap().next();
366                        Poll::Ready(Ok(row))
367                    }
368                    Ok(None) => Poll::Ready(Ok(None)),
369                    Err(err) => Poll::Ready(Err(err)),
370                },
371                Poll::Pending => Poll::Pending,
372            }
373        }
374        fn next_row(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RowView<'a>>>> {
375            // has block
376            if let Some(rows) = self.rows.as_mut() {
377                // check if block over.
378                if let Some(row) = rows.next() {
379                    Poll::Ready(Ok(Some(row)))
380                } else {
381                    self.fetch(cx)
382                }
383            } else {
384                // no data, start fetching.
385                self.fetch(cx)
386            }
387        }
388    }
389
390    impl<'a, T> Stream for AsyncRows<'a, T>
391    where
392        T: AsyncFetchable,
393    {
394        type Item = RawResult<RowView<'a>>;
395
396        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
397            self.next_row(cx).map(|row| row.transpose())
398        }
399    }
400
401    pub struct AsyncDeserialized<'a, T, V> {
402        rows: AsyncRows<'a, T>,
403        _marker: PhantomData<V>,
404    }
405
406    impl<'a, T, V> Unpin for AsyncDeserialized<'a, T, V> {}
407
408    impl<'a, T, V> Stream for AsyncDeserialized<'a, T, V>
409    where
410        T: AsyncFetchable,
411        V: DeserializeOwned,
412    {
413        type Item = RawResult<V>;
414
415        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
416            use futures::stream::*;
417            Pin::get_mut(self).rows.poll_next_unpin(cx).map(|row| {
418                row.map(|row| row.and_then(|mut row| V::deserialize(&mut row).map_err(Into::into)))
419            })
420        }
421    }
422
423    #[cfg(feature = "async")]
424    #[async_trait]
425    pub trait AsyncFetchable: Sized + Send + Sync {
426        fn affected_rows(&self) -> i32;
427
428        fn precision(&self) -> Precision;
429
430        fn fields(&self) -> &[Field];
431
432        fn filed_names(&self) -> Vec<&str> {
433            self.fields().iter().map(|f| f.name()).collect_vec()
434        }
435
436        fn num_of_fields(&self) -> usize {
437            self.fields().len()
438        }
439
440        fn summary(&self) -> (usize, usize);
441
442        #[doc(hidden)]
443        fn update_summary(&mut self, nrows: usize);
444
445        #[doc(hidden)]
446        fn fetch_raw_block(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RawBlock>>>;
447
448        fn blocks(&mut self) -> AsyncBlocks<'_, Self> {
449            AsyncBlocks { query: self }
450        }
451
452        fn rows(&mut self) -> AsyncRows<'_, Self> {
453            AsyncRows {
454                blocks: self.blocks(),
455                block: None,
456                rows: None,
457            }
458        }
459
460        /// Records is a row-based 2-dimension matrix of values.
461        async fn to_records(&mut self) -> RawResult<Vec<Vec<Value>>> {
462            let future = self.rows().map_ok(RowView::into_values).try_collect();
463            future.await
464        }
465
466        fn deserialize<R>(&mut self) -> AsyncDeserialized<'_, Self, R>
467        where
468            R: serde::de::DeserializeOwned,
469        {
470            AsyncDeserialized {
471                rows: self.rows(),
472                _marker: PhantomData,
473            }
474        }
475    }
476
477    #[cfg(feature = "async")]
478    /// The synchronous query trait for TDengine connection.
479    #[async_trait]
480    pub trait AsyncQueryable: Send + Sync + Sized {
481        // type B: for<'b> BlockExt<'b, 'b>;
482        type AsyncResultSet: AsyncFetchable;
483
484        async fn query<T: AsRef<str> + Send + Sync>(
485            &self,
486            sql: T,
487        ) -> RawResult<Self::AsyncResultSet>;
488
489        async fn put(&self, schemaless_data: &SmlData) -> RawResult<()>;
490
491        // async fn put_line_protocol;
492        // async fn put_opentsdb_lines;
493        // async fn put_json()
494
495        async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
496            &self,
497            sql: T,
498            req_id: u64,
499        ) -> RawResult<Self::AsyncResultSet>;
500
501        async fn exec<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<usize> {
502            let sql = sql.as_ref();
503            // log::trace!("exec sql: {sql}");
504            self.query(sql).await.map(|res| res.affected_rows() as _)
505        }
506
507        async fn exec_with_req_id<T: AsRef<str> + Send + Sync>(
508            &self,
509            sql: T,
510            req_id: u64,
511        ) -> RawResult<usize> {
512            let sql = sql.as_ref();
513            // log::trace!("exec sql: {sql}");
514            self.query_with_req_id(sql, req_id)
515                .await
516                .map(|res| res.affected_rows() as _)
517        }
518
519        async fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()>;
520
521        async fn write_raw_block(&self, block: &RawBlock) -> RawResult<()>;
522
523        async fn write_raw_block_with_req_id(&self, block: &RawBlock, req_id: u64)
524            -> RawResult<()>;
525
526        async fn exec_many<T, I>(&self, input: I) -> RawResult<usize>
527        where
528            T: AsRef<str> + Send + Sync,
529            I::IntoIter: Send,
530            I: IntoIterator<Item = T> + Send,
531        {
532            let mut aff = 0;
533            for sql in input {
534                aff += self.exec(sql).await?;
535            }
536            Ok(aff)
537        }
538
539        /// To conveniently get first row of the result, useful for queries like
540        ///
541        /// - `select count(*) from ...`
542        /// - `select last(*) from ...`
543        ///
544        /// Type `T` could be `Vec<Value>`, a tuple, or a struct with serde support.
545        ///
546        /// ## Example
547        ///
548        /// ```rust,ignore
549        /// let count: u32 = taos.query_one("select count(*) from table1")?.unwrap_or(0);
550        ///
551        /// let one: (i32, String, Timestamp) =
552        ///    taos.query_one("select c1,c2,c3 from table1 limit 1")?.unwrap_or_default();
553        /// ```
554        async fn query_one<T: AsRef<str> + Send + Sync, O: DeserializeOwned + Send>(
555            &self,
556            sql: T,
557        ) -> RawResult<Option<O>> {
558            use futures::StreamExt;
559            // log::trace!("query one with sql: {}", sql.as_ref());
560            self.query(sql)
561                .await?
562                .deserialize::<O>()
563                .take(1)
564                .collect::<Vec<_>>()
565                .await
566                .into_iter()
567                .next()
568                .map_or(Ok(None), |v| v.map(Some).map_err(Into::into))
569        }
570
571        /// Short for `SELECT server_version()` as [String].
572        async fn server_version(&self) -> RawResult<Cow<str>> {
573            Ok(self
574                .query_one::<_, String>("SELECT server_version()")
575                .await?
576                .expect("should always has result")
577                .into())
578        }
579
580        /// Short for `CREATE DATABASE IF NOT EXISTS {name}`.
581        async fn create_database<N: AsRef<str> + Send>(&self, name: N) -> RawResult<()> {
582            let query = format!("CREATE DATABASE IF NOT EXISTS {}", name.as_ref());
583
584            self.query(query).await?;
585            Ok(())
586        }
587
588        /// Short for `USE {name}`.
589        async fn use_database<N: AsRef<str> + Send>(&self, name: N) -> RawResult<()> {
590            let query = format!("USE `{}`", name.as_ref());
591
592            self.query(query).await?;
593            Ok(())
594        }
595
596        /// Short for `CREATE TOPIC IF NOT EXISTS {name} AS {sql}`.
597        async fn create_topic<N: AsRef<str> + Send + Sync, S: AsRef<str> + Send>(
598            &self,
599            name: N,
600            sql: S,
601        ) -> RawResult<()> {
602            let (name, sql) = (name.as_ref(), sql.as_ref());
603            let query = format!("CREATE TOPIC IF NOT EXISTS `{name}` AS {sql}");
604
605            self.query(query).await?;
606            Ok(())
607        }
608
609        /// Short for `CREATE TOPIC IF NOT EXISTS {name} WITH META AS DATABASE {db}`.
610        async fn create_topic_as_database(
611            &self,
612            name: impl AsRef<str> + Send + Sync + 'async_trait,
613            db: impl std::fmt::Display + Send + 'async_trait,
614        ) -> RawResult<()> {
615            let name = name.as_ref();
616            let query = format!("create topic if not exists `{name}` with meta as database `{db}`");
617            self.exec(&query).await?;
618            Ok(())
619        }
620
621        /// Short for `SHOW DATABASES`.
622        async fn databases(&self) -> RawResult<Vec<ShowDatabase>> {
623            use futures::stream::TryStreamExt;
624            Ok(self
625                .query("SHOW DATABASES")
626                .await?
627                .deserialize()
628                .try_collect()
629                .await?)
630        }
631
632        /// Topics information by `SELECT * FROM information_schema.ins_topics` sql.
633        ///
634        /// ## Compatibility
635        ///
636        /// This is a 3.x-only API.
637        async fn topics(&self) -> RawResult<Vec<Topic>> {
638            let sql = "SELECT * FROM information_schema.ins_topics";
639            log::trace!("query one with sql: {sql}");
640            Ok(self.query(sql).await?.deserialize().try_collect().await?)
641        }
642
643        /// Get table meta information.
644        async fn describe(&self, table: &str) -> RawResult<Describe> {
645            Ok(Describe(
646                self.query(format!("DESCRIBE `{table}`"))
647                    .await?
648                    .deserialize()
649                    .try_collect()
650                    .await?,
651            ))
652        }
653
654        /// Check if database exists
655        async fn database_exists(&self, name: &str) -> RawResult<bool> {
656            Ok(self.exec(format!("show `{name}`.stables")).await.is_ok())
657        }
658
659        /// Sync version of `exec`.
660        fn exec_sync<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<usize> {
661            crate::block_in_place_or_global(self.exec(sql))
662        }
663
664        /// Sync version of `query`.
665        fn query_sync<T: AsRef<str> + Send + Sync>(
666            &self,
667            sql: T,
668        ) -> RawResult<Self::AsyncResultSet> {
669            crate::block_in_place_or_global(self.query(sql))
670        }
671
672        async fn table_vgroup_id(&self, _db: &str, _table: &str) -> Option<i32> {
673            None
674        }
675
676        async fn tables_vgroup_ids<T: AsRef<str> + Sync>(
677            &self,
678            _db: &str,
679            _tables: &[T],
680        ) -> Option<Vec<i32>> {
681            None
682        }
683    }
684
685    #[test]
686    fn test() {
687        assert!(true);
688    }
689}