1#![allow(clippy::len_without_is_empty)]
4#![allow(clippy::type_complexity)]
5
6use std::{
7 collections::BTreeMap,
8 fmt::{Debug, Display},
9 ops::{Deref, DerefMut},
10 rc::Rc,
11};
12
13use async_trait::async_trait;
14pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
15pub use serde::de::value::Error as DeError;
16
17mod error;
18
19pub mod common;
20mod de;
21pub mod helpers;
22
23mod iter;
24pub mod util;
25
26use common::*;
27pub use iter::*;
28
29pub use common::RawBlock;
30
31pub mod stmt;
32pub mod tmq;
33
34pub mod prelude;
35
36pub use prelude::sync::{Fetchable, Queryable};
37pub use prelude::{AsyncFetchable, AsyncQueryable};
38
39pub use taos_error::Error as RawError;
40use util::Edition;
41pub type RawResult<T> = std::result::Result<T, RawError>;
42
43lazy_static::lazy_static! {
44 static ref GLOBAL_RT: tokio::runtime::Runtime = {
45 tokio::runtime::Builder::new_multi_thread()
46 .enable_all()
47 .build()
48 .unwrap()
49 };
50}
51
52pub fn global_tokio_runtime() -> &'static tokio::runtime::Runtime {
53 &GLOBAL_RT
54}
55
56pub fn block_in_place_or_global<F: std::future::Future>(fut: F) -> F::Output {
57 use tokio::runtime::Handle;
58 use tokio::task;
59
60 match Handle::try_current() {
61 Ok(handle) => task::block_in_place(move || handle.block_on(fut)),
62 Err(_) => global_tokio_runtime().block_on(fut),
63 }
64}
65
/// Selects the codec passed to [`BlockCodec::encode`] and [`BlockCodec::decode`].
///
/// The enum is field-less, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecOpts {
    /// The `Raw` codec option.
    Raw,
    /// The `Parquet` codec option.
    Parquet,
}
70
71pub trait BlockCodec {
72 fn encode(&self, _codec: CodecOpts) -> Vec<u8>;
73 fn decode(from: &[u8], _codec: CodecOpts) -> Self;
74}
75
76#[derive(Debug, thiserror::Error)]
77pub struct PingError {
78 msg: String,
79}
80impl Display for PingError {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.write_str(&self.msg)
83 }
84}
85
86pub trait TBuilder: Sized + Send + Sync + 'static {
88 type Target: Send + Sync + 'static;
89
90 fn available_params() -> &'static [&'static str];
92
93 fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self>;
95
96 fn client_version() -> &'static str;
98
99 #[doc(hidden)]
101 fn server_version(&self) -> RawResult<&str>;
102
103 #[doc(hidden)]
105 fn is_enterprise_edition(&self) -> RawResult<bool> {
106 Ok(false)
107 }
108
109 #[doc(hidden)]
111 fn get_edition(&self) -> RawResult<Edition>;
112
113 #[doc(hidden)]
115 fn assert_enterprise_edition(&self) -> RawResult<()> {
116 if let Ok(edition) = self.get_edition() {
117 edition.assert_enterprise_edition()
118 } else {
119 Err(RawError::from_string("get edition failed"))
120 }
121 }
122
123 fn ping(&self, _: &mut Self::Target) -> RawResult<()>;
125
126 fn ready(&self) -> bool;
131
132 fn build(&self) -> RawResult<Self::Target>;
134
135 #[cfg(feature = "r2d2")]
144 fn pool(self) -> RawResult<r2d2::Pool<Manager<Self>>, r2d2::Error> {
145 self.pool_builder().build(Manager::new(self))
146 }
147
148 #[cfg(feature = "r2d2")]
150 #[inline]
151 fn pool_builder(&self) -> r2d2::Builder<Manager<Self>> {
152 r2d2::Builder::new()
153 .max_lifetime(Some(std::time::Duration::from_secs(12 * 60 * 60)))
154 .min_idle(Some(0))
155 .max_size(200)
156 .connection_timeout(std::time::Duration::from_secs(60))
157 }
158
159 #[cfg(feature = "r2d2")]
161 #[inline]
162 fn with_pool_builder(
163 self,
164 builder: r2d2::Builder<Manager<Self>>,
165 ) -> RawResult<r2d2::Pool<Manager<Self>>, r2d2::Error> {
166 builder.build(Manager::new(self))
167 }
168}
169
/// `r2d2` integration: a [`Manager`] wrapping a [`TBuilder`] acts as the
/// pool's connection manager.
#[cfg(feature = "r2d2")]
impl<T: TBuilder> r2d2::ManageConnection for Manager<T> {
    type Connection = T::Target;

    // `TBuilder` declares no `Error` associated type; every method used below
    // returns `RawResult`, so the manager's error type is `RawError`.
    type Error = RawError;

    /// Opens a new connection through the wrapped builder.
    fn connect(&self) -> RawResult<Self::Connection> {
        self.deref().build()
    }

    /// Validates a pooled connection by pinging it.
    fn is_valid(&self, conn: &mut Self::Connection) -> RawResult<()> {
        self.deref().ping(conn)
    }

    /// Reports connections as broken while the wrapped builder is not ready.
    fn has_broken(&self, _: &mut Self::Connection) -> bool {
        !self.deref().ready()
    }
}
188
189#[async_trait]
191pub trait AsyncTBuilder: Sized + Send + Sync + 'static {
192 type Target: Send + Sync + 'static;
193
194 fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<Self>;
196
197 fn client_version() -> &'static str;
199
200 #[doc(hidden)]
202 async fn server_version(&self) -> RawResult<&str>;
203
204 #[doc(hidden)]
206 async fn is_enterprise_edition(&self) -> RawResult<bool> {
207 Ok(false)
208 }
209
210 #[doc(hidden)]
212 async fn get_edition(&self) -> RawResult<Edition>;
213
214 #[doc(hidden)]
216 async fn assert_enterprise_edition(&self) -> RawResult<()> {
217 if let Ok(edition) = self.get_edition().await {
218 edition.assert_enterprise_edition()
219 } else {
220 Err(RawError::from_string("get edition failed"))
221 }
222 }
223
224 async fn ping(&self, _: &mut Self::Target) -> RawResult<()>;
226
227 async fn ready(&self) -> bool;
232
233 async fn build(&self) -> RawResult<Self::Target>;
235
236 #[cfg(feature = "deadpool")]
241 fn pool(self) -> RawResult<deadpool::managed::Pool<Manager<Self>>> {
242 let config = self.default_pool_config();
243 self.pool_builder()
244 .config(config)
245 .runtime(deadpool::Runtime::Tokio1)
246 .build()
247 .map_err(RawError::from_any)
248 }
249
250 #[cfg(feature = "deadpool")]
252 #[inline]
253 fn pool_builder(self) -> deadpool::managed::PoolBuilder<Manager<Self>> {
254 deadpool::managed::Pool::builder(Manager { manager: self })
255 }
256
257 #[cfg(feature = "deadpool")]
258 #[inline]
259 fn default_pool_config(&self) -> deadpool::managed::PoolConfig {
260 deadpool::managed::PoolConfig {
261 max_size: 500,
262 timeouts: deadpool::managed::Timeouts::default(),
263 queue_mode: deadpool::managed::QueueMode::Fifo,
264 }
265 }
266
267 #[cfg(feature = "deadpool")]
269 #[inline]
270 fn with_pool_config(
271 self,
272 config: deadpool::managed::PoolConfig,
273 ) -> RawResult<deadpool::managed::Pool<Manager<Self>>> {
274 deadpool::managed::Pool::builder(Manager { manager: self })
275 .config(config)
276 .runtime(deadpool::Runtime::Tokio1)
277 .build()
278 .map_err(RawError::from_any)
279 }
280}
281
/// Thin wrapper around a builder `T`, used as the connection manager handed
/// to the pool implementations.
pub struct Manager<T> {
    /// The wrapped builder.
    manager: T,
}
286
287impl<T> Deref for Manager<T> {
288 type Target = T;
289
290 fn deref(&self) -> &Self::Target {
291 &self.manager
292 }
293}
294impl<T> DerefMut for Manager<T> {
295 fn deref_mut(&mut self) -> &mut Self::Target {
296 &mut self.manager
297 }
298}
299
300impl<T: TBuilder> Default for Manager<T> {
301 fn default() -> Self {
302 Self {
303 manager: T::from_dsn("taos:///").expect("connect with empty default TDengine dsn"),
304 }
305 }
306}
307
308impl<T: TBuilder> Manager<T> {
309 pub fn new(builder: T) -> Self {
310 Self { manager: builder }
311 }
312 #[inline]
314 pub fn from_dsn<D: IntoDsn>(dsn: D) -> RawResult<(Self, BTreeMap<String, String>)> {
315 let mut dsn = dsn.into_dsn()?;
316
317 let params = T::available_params();
318 let (valid, not): (BTreeMap<_, _>, BTreeMap<_, _>) = dsn
319 .params
320 .into_iter()
321 .partition(|(key, _)| params.contains(&key.as_str()));
322
323 dsn.params = valid;
324
325 T::from_dsn(dsn).map(|builder| (Manager::new(builder), not))
326 }
327
328 #[cfg(feature = "r2d2")]
329 #[inline]
330 pub fn into_pool(self) -> RawResult<r2d2::Pool<Self>, r2d2::Error> {
331 r2d2::Pool::new(self)
332 }
333
334 #[cfg(feature = "r2d2")]
335 #[inline]
336 pub fn into_pool_with_builder(
337 self,
338 builder: r2d2::Builder<Self>,
339 ) -> RawResult<r2d2::Pool<Self>, r2d2::Error> {
340 builder.build(self)
341 }
342}
343
// The two pool backends are mutually exclusive; fail the build early when
// both features are requested.
#[cfg(all(feature = "r2d2", feature = "deadpool"))]
compile_error!("Use only ONE of r2d2 or deadpool");

/// Connection pool type for the `r2d2` backend.
#[cfg(feature = "r2d2")]
pub type Pool<T> = r2d2::Pool<Manager<T>>;

/// Pool builder type for the `r2d2` backend.
#[cfg(feature = "r2d2")]
pub type PoolBuilder<T> = r2d2::Builder<Manager<T>>;

/// Connection pool type for the `deadpool` backend (only when `r2d2` is off).
#[cfg(all(feature = "deadpool", not(feature = "r2d2")))]
pub type Pool<T> = deadpool::managed::Pool<Manager<T>>;
355
/// `deadpool` integration: a [`Manager`] wrapping an [`AsyncTBuilder`] acts
/// as the pool's connection manager.
// Gated like every other deadpool item in this file so that builds without
// the `deadpool` feature do not reference the `deadpool` crate.
#[cfg(feature = "deadpool")]
#[async_trait]
impl<T: AsyncTBuilder> deadpool::managed::Manager for Manager<T> {
    type Type = <T as AsyncTBuilder>::Target;
    type Error = RawError;

    /// Opens a new connection through the wrapped builder.
    async fn create(&self) -> RawResult<Self::Type> {
        self.manager.build().await
    }

    /// Pings the connection before it is handed out again; a failed ping
    /// makes the recycle fail.
    async fn recycle(
        &self,
        conn: &mut Self::Type,
        _: &deadpool::managed::Metrics,
    ) -> deadpool::managed::RecycleResult<Self::Error> {
        self.ping(conn).await.map_err(RawError::from_any)?;
        Ok(())
    }
}
374
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use super::*;

    /// Mock connection/builder used by the tests below.
    #[derive(Debug)]
    struct Conn;

    /// Mock result set.
    ///
    /// Its state lives in process-wide statics, so the blocks it yields are
    /// shared by every instance (and every test) in the process. The statics
    /// are atomics so that tests running in parallel cannot race on them.
    #[derive(Debug)]
    struct MyResultSet;

    /// Builds the single block handed out by the mock result set.
    fn sample_block() -> RawBlock {
        RawBlock::parse_from_raw_block_v2(
            [1].as_slice(),
            &[Field::new("a", Ty::TinyInt, 1)],
            &[1],
            1,
            Precision::Millisecond,
        )
    }

    impl Iterator for MyResultSet {
        type Item = RawResult<RawBlock>;

        /// Yields exactly one block per process, then `None` forever.
        fn next(&mut self) -> Option<Self::Item> {
            static AVAILABLE: AtomicBool = AtomicBool::new(true);

            // `swap` reads and clears the flag in one atomic step, so only a
            // single caller ever observes `true`.
            if AVAILABLE.swap(false, Ordering::SeqCst) {
                Some(Ok(sample_block()))
            } else {
                None
            }
        }
    }

    impl crate::Fetchable for MyResultSet {
        fn fields(&self) -> &[Field] {
            lazy_static::lazy_static! {
                static ref FIELDS: Vec<Field> = vec![Field::new("a", Ty::TinyInt, 1)];
            }
            FIELDS.as_slice()
        }

        fn precision(&self) -> Precision {
            Precision::Millisecond
        }

        fn summary(&self) -> (usize, usize) {
            (0, 0)
        }

        fn affected_rows(&self) -> i32 {
            0
        }

        fn update_summary(&mut self, _rows: usize) {}

        /// Hands out at most four blocks per process.
        fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>> {
            static REMAINING: AtomicUsize = AtomicUsize::new(4);

            // Decrement only while the counter is non-zero. Doing the check and
            // the decrement in one atomic update prevents an underflow when
            // several tests fetch concurrently.
            let claimed = REMAINING
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
                    left.checked_sub(1)
                })
                .is_ok();
            if !claimed {
                return Ok(None);
            }

            Ok(Some(sample_block()))
        }
    }

    #[derive(Debug)]
    struct Error;

    impl Display for Error {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("empty error")
        }
    }

    impl From<taos_error::Error> for Error {
        fn from(_: taos_error::Error) -> Self {
            Error
        }
    }

    impl std::error::Error for Error {}

    impl From<DsnError> for Error {
        fn from(_: DsnError) -> Self {
            Error
        }
    }

    impl TBuilder for Conn {
        type Target = MyResultSet;

        fn available_params() -> &'static [&'static str] {
            &[]
        }

        fn from_dsn<D: IntoDsn>(_dsn: D) -> RawResult<Self> {
            Ok(Self)
        }

        fn client_version() -> &'static str {
            "3"
        }

        fn ready(&self) -> bool {
            true
        }

        fn build(&self) -> RawResult<Self::Target> {
            Ok(MyResultSet)
        }

        fn ping(&self, _: &mut Self::Target) -> RawResult<()> {
            Ok(())
        }

        fn server_version(&self) -> RawResult<&str> {
            todo!()
        }

        fn is_enterprise_edition(&self) -> RawResult<bool> {
            todo!()
        }

        fn get_edition(&self) -> RawResult<Edition> {
            Ok(Edition::new("community", false))
        }
    }

    impl Queryable for Conn {
        type ResultSet = MyResultSet;

        fn query<T: AsRef<str>>(&self, _sql: T) -> RawResult<MyResultSet> {
            Ok(MyResultSet)
        }

        fn query_with_req_id<T: AsRef<str>>(
            &self,
            _sql: T,
            _req_id: u64,
        ) -> RawResult<Self::ResultSet> {
            Ok(MyResultSet)
        }

        fn exec<T: AsRef<str>>(&self, _sql: T) -> RawResult<usize> {
            Ok(1)
        }

        fn write_raw_meta(&self, _: &RawMeta) -> RawResult<()> {
            Ok(())
        }

        fn write_raw_block(&self, _: &RawBlock) -> RawResult<()> {
            Ok(())
        }

        fn write_raw_block_with_req_id(&self, _: &RawBlock, _: u64) -> RawResult<()> {
            Ok(())
        }

        fn put(&self, _data: &SmlData) -> RawResult<()> {
            Ok(())
        }
    }

    #[test]
    fn query_deserialize() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut rs = conn.query("abc").unwrap();

        for record in rs.deserialize::<(i32, String, u8)>() {
            let _ = dbg!(record);
        }
    }

    #[test]
    fn block_deserialize_borrowed() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();
        for block in &mut set {
            let block = block.unwrap();
            for record in block.deserialize::<(i32,)>() {
                dbg!(record.unwrap());
            }
        }
    }

    #[test]
    fn block_deserialize_borrowed_bytes() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();

        for block in &mut set {
            let block = block.unwrap();
            for record in block.deserialize::<String>() {
                dbg!(record.unwrap());
            }
        }
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn block_deserialize_borrowed_bytes_stream() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();

        for row in set.deserialize::<u8>() {
            let row = row.unwrap();
            dbg!(row);
        }
    }

    #[test]
    fn with_iter() {
        let conn = Conn;

        let aff = conn.exec("nothing").unwrap();
        assert_eq!(aff, 1);

        let mut set = conn.query("abc").unwrap();

        for block in set.blocks() {
            for row in block.unwrap().rows() {
                for value in row {
                    println!("{:?}", value);
                }
            }
        }
    }
}