taos_query/
lib.rs

1//! This is the common query traits/types for TDengine connectors.
2//!
3#![allow(clippy::len_without_is_empty)]
4#![allow(clippy::type_complexity)]
5
6use std::{
7    collections::BTreeMap,
8    fmt::{Debug, Display},
9    ops::{Deref, DerefMut},
10    rc::Rc,
11};
12
13use async_trait::async_trait;
14pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
15pub use serde::de::value::Error as DeError;
16
17mod error;
18
19pub mod common;
20mod de;
21pub mod helpers;
22
23mod iter;
24pub mod util;
25
26use common::*;
27pub use iter::*;
28
29pub use common::RawBlock;
30
31pub mod stmt;
32pub mod tmq;
33
34pub mod prelude;
35
36pub use prelude::sync::{Fetchable, Queryable};
37pub use prelude::{AsyncFetchable, AsyncQueryable};
38
39pub use taos_error::Error as RawError;
40use util::Edition;
41pub type RawResult<T> = std::result::Result<T, RawError>;
42
43lazy_static::lazy_static! {
44    static ref GLOBAL_RT: tokio::runtime::Runtime = {
45        tokio::runtime::Builder::new_multi_thread()
46                    .enable_all()
47                    .build()
48                    .unwrap()
49    };
50}
51
52pub fn global_tokio_runtime() -> &'static tokio::runtime::Runtime {
53    &GLOBAL_RT
54}
55
56pub fn block_in_place_or_global<F: std::future::Future>(fut: F) -> F::Output {
57    use tokio::runtime::Handle;
58    use tokio::task;
59
60    match Handle::try_current() {
61        Ok(handle) => task::block_in_place(move || handle.block_on(fut)),
62        Err(_) => global_tokio_runtime().block_on(fut),
63    }
64}
65
/// Selects the byte format used by [`BlockCodec`] when encoding or decoding.
///
/// The type is a plain tag, so it is `Copy` and comparable; this lets callers
/// reuse one value for several `encode`/`decode` calls and print it in
/// diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecOpts {
    /// The `Raw` codec variant.
    Raw,
    /// The `Parquet` codec variant.
    Parquet,
}
70
71pub trait BlockCodec {
72    fn encode(&self, _codec: CodecOpts) -> Vec<u8>;
73    fn decode(from: &[u8], _codec: CodecOpts) -> Self;
74}
75
76#[derive(Debug, thiserror::Error)]
77pub struct PingError {
78    msg: String,
79}
80impl Display for PingError {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.write_str(&self.msg)
83    }
84}
85
86/// A struct is `Connectable` when it can be build from a `Dsn`.
87pub trait TBuilder: Sized + Send + Sync + 'static {
88    type Target: Send + Sync + 'static;
89
90    /// A list of parameters available in DSN.
91    fn available_params() -> &'static [&'static str];
92
93    /// Connect with dsn without connection checking.
94    fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self>;
95
96    /// Get client version.
97    fn client_version() -> &'static str;
98
99    /// Get server version.
100    #[doc(hidden)]
101    fn server_version(&self) -> RawResult<&str>;
102
103    /// Check if the server is an enterprise edition.
104    #[doc(hidden)]
105    fn is_enterprise_edition(&self) -> RawResult<bool> {
106        Ok(false)
107    }
108
109    /// Get the edition.
110    #[doc(hidden)]
111    fn get_edition(&self) -> RawResult<Edition>;
112
113    /// Assert the server is an enterprise edition.
114    #[doc(hidden)]
115    fn assert_enterprise_edition(&self) -> RawResult<()> {
116        if let Ok(edition) = self.get_edition() {
117            edition.assert_enterprise_edition()
118        } else {
119            Err(RawError::from_string("get edition failed"))
120        }
121    }
122
123    /// Check a connection is still alive.
124    fn ping(&self, _: &mut Self::Target) -> RawResult<()>;
125
126    /// Check if it's ready to connect.
127    ///
128    /// In most cases, just return true. `r2d2` will use this method to check if it's valid to create a connection.
129    /// Just check the address is ready to connect.
130    fn ready(&self) -> bool;
131
132    /// Create a new connection from this struct.
133    fn build(&self) -> RawResult<Self::Target>;
134
135    /// Build connection pool with [r2d2::Pool]
136    ///
137    /// Here we will use some default options with [r2d2::Builder]
138    ///
139    /// - max_lifetime: 12h,
140    /// - max_size: 500,
141    /// - min_idle: 2.
142    /// - connection_timeout: 60s.
143    #[cfg(feature = "r2d2")]
144    fn pool(self) -> RawResult<r2d2::Pool<Manager<Self>>, r2d2::Error> {
145        self.pool_builder().build(Manager::new(self))
146    }
147
148    /// [r2d2::Builder] generation from config.
149    #[cfg(feature = "r2d2")]
150    #[inline]
151    fn pool_builder(&self) -> r2d2::Builder<Manager<Self>> {
152        r2d2::Builder::new()
153            .max_lifetime(Some(std::time::Duration::from_secs(12 * 60 * 60)))
154            .min_idle(Some(0))
155            .max_size(200)
156            .connection_timeout(std::time::Duration::from_secs(60))
157    }
158
159    /// Build connection pool with [r2d2::Builder]
160    #[cfg(feature = "r2d2")]
161    #[inline]
162    fn with_pool_builder(
163        self,
164        builder: r2d2::Builder<Manager<Self>>,
165    ) -> RawResult<r2d2::Pool<Manager<Self>>, r2d2::Error> {
166        builder.build(Manager::new(self))
167    }
168}
169
/// Lets `r2d2` create and validate connections through the wrapped
/// [`TBuilder`].
#[cfg(feature = "r2d2")]
impl<T: TBuilder> r2d2::ManageConnection for Manager<T> {
    type Connection = T::Target;

    // `TBuilder` declares no associated `Error` type; every builder method
    // used below already reports failures as `RawError`.
    // NOTE(review): r2d2 requires this type to implement `std::error::Error`
    // — confirm `taos_error::Error` does.
    type Error = RawError;

    /// Opens a new connection via [`TBuilder::build`].
    fn connect(&self) -> RawResult<Self::Connection> {
        self.deref().build()
    }

    /// Validates a pooled connection via [`TBuilder::ping`].
    fn is_valid(&self, conn: &mut Self::Connection) -> RawResult<()> {
        self.deref().ping(conn)
    }

    /// Reports the connection as broken whenever the builder is not
    /// [`TBuilder::ready`]; the connection itself is not inspected.
    fn has_broken(&self, _: &mut Self::Connection) -> bool {
        !self.deref().ready()
    }
}
188
189/// A struct is `Connectable` when it can be build from a `Dsn`.
190#[async_trait]
191pub trait AsyncTBuilder: Sized + Send + Sync + 'static {
192    type Target: Send + Sync + 'static;
193
194    /// Connect with dsn without connection checking.
195    fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self>;
196
197    /// Get client version.
198    fn client_version() -> &'static str;
199
200    /// Get server version.
201    #[doc(hidden)]
202    async fn server_version(&self) -> RawResult<&str>;
203
204    /// Check if the server is an enterprise edition.
205    #[doc(hidden)]
206    async fn is_enterprise_edition(&self) -> RawResult<bool> {
207        Ok(false)
208    }
209
210    /// Get the edition.
211    #[doc(hidden)]
212    async fn get_edition(&self) -> RawResult<Edition>;
213
214    /// Assert the server is an enterprise edition.
215    #[doc(hidden)]
216    async fn assert_enterprise_edition(&self) -> RawResult<()> {
217        if let Ok(edition) = self.get_edition().await {
218            edition.assert_enterprise_edition()
219        } else {
220            Err(RawError::from_string("get edition failed"))
221        }
222    }
223
224    /// Check a connection is still alive.
225    async fn ping(&self, _: &mut Self::Target) -> RawResult<()>;
226
227    /// Check if it's ready to connect.
228    ///
229    /// In most cases, just return true. `r2d2` will use this method to check if it's valid to create a connection.
230    /// Just check the address is ready to connect.
231    async fn ready(&self) -> bool;
232
233    /// Create a new connection from this struct.
234    async fn build(&self) -> RawResult<Self::Target>;
235
236    /// Build connection pool with [deadpool::managed::Pool].
237    ///
238    /// Default:
239    /// - max_size: 500
240    #[cfg(feature = "deadpool")]
241    fn pool(self) -> RawResult<deadpool::managed::Pool<Manager<Self>>> {
242        let config = self.default_pool_config();
243        self.pool_builder()
244            .config(config)
245            .runtime(deadpool::Runtime::Tokio1)
246            .build()
247            .map_err(RawError::from_any)
248    }
249
250    /// [deadpool::managed::PoolBuilder] generation from config.
251    #[cfg(feature = "deadpool")]
252    #[inline]
253    fn pool_builder(self) -> deadpool::managed::PoolBuilder<Manager<Self>> {
254        deadpool::managed::Pool::builder(Manager { manager: self })
255    }
256
257    #[cfg(feature = "deadpool")]
258    #[inline]
259    fn default_pool_config(&self) -> deadpool::managed::PoolConfig {
260        deadpool::managed::PoolConfig {
261            max_size: 500,
262            timeouts: deadpool::managed::Timeouts::default(),
263            queue_mode: deadpool::managed::QueueMode::Fifo,
264        }
265    }
266
267    /// Build connection pool with [deadpool::managed::PoolBuilder]
268    #[cfg(feature = "deadpool")]
269    #[inline]
270    fn with_pool_config(
271        self,
272        config: deadpool::managed::PoolConfig,
273    ) -> RawResult<deadpool::managed::Pool<Manager<Self>>> {
274        deadpool::managed::Pool::builder(Manager { manager: self })
275            .config(config)
276            .runtime(deadpool::Runtime::Tokio1)
277            .build()
278            .map_err(RawError::from_any)
279    }
280}
281
/// Connection manager: a thin wrapper around a connection builder `T`.
pub struct Manager<T> {
    /// The wrapped builder.
    manager: T,
}
286
287impl<T> Deref for Manager<T> {
288    type Target = T;
289
290    fn deref(&self) -> &Self::Target {
291        &self.manager
292    }
293}
294impl<T> DerefMut for Manager<T> {
295    fn deref_mut(&mut self) -> &mut Self::Target {
296        &mut self.manager
297    }
298}
299
300impl<T: TBuilder> Default for Manager<T> {
301    fn default() -> Self {
302        Self {
303            manager: T::from_dsn("taos:///").expect("connect with empty default TDengine dsn"),
304        }
305    }
306}
307
308impl<T: TBuilder> Manager<T> {
309    pub fn new(builder: T) -> Self {
310        Self { manager: builder }
311    }
312    /// Build a connection manager from a DSN.
313    #[inline]
314    pub fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<(Self, BTreeMap<String, String>)> {
315        let mut dsn = dsn.into_dsn()?;
316
317        let params = T::available_params();
318        let (valid, not): (BTreeMap<_, _>, BTreeMap<_, _>) = dsn
319            .params
320            .into_iter()
321            .partition(|(key, _)| params.contains(&key.as_str()));
322
323        dsn.params = valid;
324
325        T::from_dsn(dsn).map(|builder| (Manager::new(builder), not))
326    }
327
328    #[cfg(feature = "r2d2")]
329    #[inline]
330    pub fn into_pool(self) -> RawResult<r2d2::Pool<Self>, r2d2::Error> {
331        r2d2::Pool::new(self)
332    }
333
334    #[cfg(feature = "r2d2")]
335    #[inline]
336    pub fn into_pool_with_builder(
337        self,
338        builder: r2d2::Builder<Self>,
339    ) -> RawResult<r2d2::Pool<Self>, r2d2::Error> {
340        builder.build(self)
341    }
342}
343
// The two pool backends are mutually exclusive: refuse to build with both.
#[cfg(all(feature = "r2d2", feature = "deadpool"))]
compile_error!("Use only ONE of r2d2 or deadpool");

/// Connection pool type when the `r2d2` feature is enabled.
#[cfg(feature = "r2d2")]
pub type Pool<T> = r2d2::Pool<Manager<T>>;

/// Connection pool type when `deadpool` is enabled and `r2d2` is not.
#[cfg(all(feature = "deadpool", not(feature = "r2d2")))]
pub type Pool<T> = deadpool::managed::Pool<Manager<T>>;

/// Pool builder type; only defined for the `r2d2` feature.
#[cfg(feature = "r2d2")]
pub type PoolBuilder<T> = r2d2::Builder<Manager<T>>;
355
356#[async_trait]
357impl<T: AsyncTBuilder> deadpool::managed::Manager for Manager<T> {
358    type Type = <T as AsyncTBuilder>::Target;
359    type Error = RawError;
360
361    async fn create(&self) -> RawResult<Self::Type> {
362        self.manager.build().await
363    }
364
365    async fn recycle(
366        &self,
367        conn: &mut Self::Type,
368        _: &deadpool::managed::Metrics,
369    ) -> deadpool::managed::RecycleResult<Self::Error> {
370        self.ping(conn).await.map_err(RawError::from_any)?;
371        Ok(())
372    }
373}
374
/// Smoke tests that drive the sync query traits with an in-memory mock.
///
/// The mock keeps its state in process-wide statics, so it is shared by every
/// test in this module: one block in total is produced through `Iterator`,
/// and four in total through `fetch_raw_block`.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use super::*;

    /// Mock connection / builder.
    #[derive(Debug)]
    struct Conn;

    /// Mock result set with a single `TinyInt` column named `a`.
    #[derive(Debug)]
    struct MyResultSet;

    impl Iterator for MyResultSet {
        type Item = RawResult<RawBlock>;

        fn next(&mut self) -> Option<Self::Item> {
            // Tests run on several threads; an atomic swap lets exactly one
            // caller observe `true` without `static mut`.
            static AVAILABLE: AtomicBool = AtomicBool::new(true);

            if AVAILABLE.swap(false, Ordering::SeqCst) {
                Some(Ok(RawBlock::parse_from_raw_block_v2(
                    [1].as_slice(),
                    &[Field::new("a", Ty::TinyInt, 1)],
                    &[1],
                    1,
                    Precision::Millisecond,
                )))
            } else {
                None
            }
        }
    }

    impl crate::Fetchable for MyResultSet {
        fn fields(&self) -> &[Field] {
            // Initialized once and shared immutably afterwards.
            lazy_static::lazy_static! {
                static ref FIELDS: Vec<Field> = vec![Field::new("a", Ty::TinyInt, 1)];
            }
            FIELDS.as_slice()
        }

        fn precision(&self) -> Precision {
            Precision::Millisecond
        }

        fn summary(&self) -> (usize, usize) {
            (0, 0)
        }

        fn affected_rows(&self) -> i32 {
            0
        }

        fn update_summary(&mut self, _rows: usize) {}

        fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>> {
            static REMAINING: AtomicUsize = AtomicUsize::new(4);

            // Claim one of the remaining blocks atomically so concurrent
            // callers can never push the counter below zero.
            let claimed =
                REMAINING.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
                    left.checked_sub(1)
                });
            if claimed.is_err() {
                return Ok(None);
            }

            Ok(Some(RawBlock::parse_from_raw_block_v2(
                [1].as_slice(),
                &[Field::new("a", Ty::TinyInt, 1)],
                &[1],
                1,
                Precision::Millisecond,
            )))
        }
    }

    /// Placeholder error type convertible from the crate's error types.
    #[derive(Debug)]
    struct Error;

    impl Display for Error {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("empty error")
        }
    }

    impl From<taos_error::Error> for Error {
        fn from(_: taos_error::Error) -> Self {
            Error
        }
    }

    impl std::error::Error for Error {}
    impl From<DsnError> for Error {
        fn from(_: DsnError) -> Self {
            Error
        }
    }

    impl TBuilder for Conn {
        type Target = MyResultSet;

        fn available_params() -> &'static [&'static str] {
            &[]
        }

        fn from_dsn<D: IntoDsn>(_dsn: D) -> RawResult<Self> {
            Ok(Self)
        }

        fn client_version() -> &'static str {
            "3"
        }

        fn ready(&self) -> bool {
            true
        }

        fn build(&self) -> RawResult<Self::Target> {
            Ok(MyResultSet)
        }

        fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
            Ok(())
        }

        fn server_version(&self) -> RawResult<&str> {
            todo!()
        }

        fn is_enterprise_edition(&self) -> RawResult<bool> {
            todo!()
        }

        fn get_edition(&self) -> RawResult<Edition> {
            Ok(Edition::new("community", false))
        }
    }

    impl Queryable for Conn {
        type ResultSet = MyResultSet;

        fn query<T: AsRef<str>>(&self, _sql: T) -> RawResult<MyResultSet> {
            Ok(MyResultSet)
        }

        fn query_with_req_id<T: AsRef<str>>(
            &self,
            _sql: T,
            _req_id: u64,
        ) -> RawResult<Self::ResultSet> {
            Ok(MyResultSet)
        }

        fn exec<T: AsRef<str>>(&self, _sql: T) -> RawResult<usize> {
            Ok(1)
        }

        fn write_raw_meta(&self, _: &RawMeta) -> RawResult<()> {
            Ok(())
        }

        fn write_raw_block(&self, _: &RawBlock) -> RawResult<()> {
            Ok(())
        }

        fn write_raw_block_with_req_id(&self, _: &RawBlock, _: u64) -> RawResult<()> {
            Ok(())
        }

        fn put(&self, _data: &SmlData) -> RawResult<()> {
            Ok(())
        }
    }

    #[test]
    fn query_deserialize() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut rs = conn.query("abc").unwrap();

        for record in rs.deserialize::<(i32, String, u8)>() {
            let _ = dbg!(record);
        }
    }

    #[test]
    fn block_deserialize_borrowed() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();
        for block in &mut set {
            let block = block.unwrap();
            for record in block.deserialize::<(i32,)>() {
                dbg!(record.unwrap());
            }
        }
    }

    #[test]
    fn block_deserialize_borrowed_bytes() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();

        for block in &mut set {
            let block = block.unwrap();
            for record in block.deserialize::<String>() {
                dbg!(record.unwrap());
            }
        }
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn block_deserialize_borrowed_bytes_stream() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();

        for row in set.deserialize::<u8>() {
            let row = row.unwrap();
            dbg!(row);
        }
    }

    #[test]
    fn with_iter() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();

        for block in set.blocks() {
            // todo
            for row in block.unwrap().rows() {
                for value in row {
                    println!("{:?}", value);
                }
            }
        }
    }
}