#![cfg_attr(nightly, feature(const_slice_index))]
use std::{
collections::BTreeMap,
fmt::{Debug, Display},
mem::MaybeUninit,
ops::{Deref, DerefMut},
rc::Rc,
};
use async_trait::async_trait;
pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
pub use serde::de::value::Error as DeError;
mod error;
pub use error::*;
pub mod common;
mod de;
pub mod helpers;
mod insert;
mod iter;
pub mod util;
use common::*;
pub use iter::*;
pub use common::RawBlock;
pub mod stmt;
pub mod tmq;
pub mod prelude;
pub use prelude::sync::{Fetchable, Queryable};
pub use prelude::{AsyncFetchable, AsyncQueryable};
static mut RT: MaybeUninit<tokio::runtime::Runtime> = MaybeUninit::uninit();
static INIT: std::sync::Once = std::sync::Once::new();
pub fn global_tokio_runtime() -> &'static tokio::runtime::Runtime {
unsafe {
INIT.call_once(|| {
RT.write(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
});
RT.assume_init_mut()
}
}
pub fn block_in_place_or_global<F: std::future::Future>(fut: F) -> F::Output {
use tokio::runtime::Handle;
use tokio::task;
match Handle::try_current() {
Ok(handle) => task::block_in_place(move || handle.block_on(fut)),
Err(_) => global_tokio_runtime().block_on(fut),
}
}
pub enum CodecOpts {
Raw,
Parquet,
}
pub trait BlockCodec {
fn encode(&self, _codec: CodecOpts) -> Vec<u8>;
fn decode(from: &[u8], _codec: CodecOpts) -> Self;
}
#[derive(Debug, thiserror::Error)]
pub struct PingError {
msg: String,
}
impl Display for PingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.msg)
}
}
pub trait TBuilder: Sized + Send + Sync + 'static {
type Target: Send + Sync + 'static;
type Error: std::error::Error + From<DsnError>;
fn available_params() -> &'static [&'static str];
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>;
fn client_version() -> &'static str;
#[doc(hidden)]
fn server_version(&self) -> Result<&str, Self::Error>;
#[doc(hidden)]
fn is_enterprise_edition(&self) -> Result<bool, Self::Error> {
Ok(false)
}
fn ping(&self, _: &mut Self::Target) -> Result<(), Self::Error>;
fn ready(&self) -> bool;
fn build(&self) -> Result<Self::Target, Self::Error>;
#[cfg(feature = "r2d2")]
fn pool(self) -> Result<r2d2::Pool<Manager<Self>>, r2d2::Error> {
self.pool_builder().build(Manager::new(self))
}
#[cfg(feature = "r2d2")]
#[inline]
fn pool_builder(&self) -> r2d2::Builder<Manager<Self>> {
r2d2::Builder::new()
.max_lifetime(Some(std::time::Duration::from_secs(12 * 60 * 60)))
.min_idle(Some(0))
.max_size(200)
.connection_timeout(std::time::Duration::from_secs(60))
}
#[cfg(feature = "r2d2")]
#[inline]
fn with_pool_builder(
self,
builder: r2d2::Builder<Manager<Self>>,
) -> Result<r2d2::Pool<Manager<Self>>, r2d2::Error> {
builder.build(Manager::new(self))
}
}
#[cfg(feature = "r2d2")]
impl<T: TBuilder> r2d2::ManageConnection for Manager<T> {
type Connection = T::Target;
type Error = T::Error;
fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.deref().build()
}
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
self.deref().ping(conn)
}
fn has_broken(&self, _: &mut Self::Connection) -> bool {
!self.deref().ready()
}
}
#[async_trait]
pub trait AsyncTBuilder: Sized + Send + Sync + 'static {
type Target: Send + Sync + 'static;
type Error: std::error::Error + From<DsnError>;
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>;
fn client_version() -> &'static str;
#[doc(hidden)]
async fn server_version(&self) -> Result<&str, Self::Error>;
#[doc(hidden)]
async fn is_enterprise_edition(&self) -> Result<bool, Self::Error> {
Ok(false)
}
async fn ping(&self, _: &mut Self::Target) -> Result<(), Self::Error>;
async fn ready(&self) -> bool;
async fn build(&self) -> Result<Self::Target, Self::Error>;
#[cfg(feature = "deadpool")]
fn pool(
self,
) -> Result<deadpool::managed::Pool<Manager<Self>>, deadpool::managed::BuildError<Self::Error>>
{
let config = self.default_pool_config();
self.pool_builder()
.config(config)
.runtime(deadpool::Runtime::Tokio1)
.build()
}
#[cfg(feature = "deadpool")]
#[inline]
fn pool_builder(self) -> deadpool::managed::PoolBuilder<Manager<Self>> {
deadpool::managed::Pool::builder(Manager { manager: self })
}
#[cfg(feature = "deadpool")]
#[inline]
fn default_pool_config(&self) -> deadpool::managed::PoolConfig {
deadpool::managed::PoolConfig {
max_size: 500,
timeouts: deadpool::managed::Timeouts::default(),
}
}
#[cfg(feature = "deadpool")]
#[inline]
fn with_pool_config(
self,
config: deadpool::managed::PoolConfig,
) -> Result<deadpool::managed::Pool<Manager<Self>>, deadpool::managed::BuildError<Self::Error>>
{
deadpool::managed::Pool::builder(Manager { manager: self })
.config(config)
.runtime(deadpool::Runtime::Tokio1)
.build()
}
}
pub struct Manager<T> {
manager: T,
}
impl<T> Deref for Manager<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.manager
}
}
impl<T> DerefMut for Manager<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.manager
}
}
impl<T: TBuilder> Default for Manager<T> {
fn default() -> Self {
Self {
manager: T::from_dsn("taos:///").expect("connect with empty default TDengine dsn"),
}
}
}
impl<T: TBuilder> Manager<T> {
pub fn new(builder: T) -> Self {
Self { manager: builder }
}
#[inline]
pub fn from_dsn<D: IntoDsn>(dsn: D) -> Result<(Self, BTreeMap<String, String>), T::Error> {
let mut dsn = dsn.into_dsn()?;
let params = T::available_params();
let (valid, not): (BTreeMap<_, _>, BTreeMap<_, _>) = dsn
.params
.into_iter()
.partition(|(key, _)| params.contains(&key.as_str()));
dsn.params = valid;
T::from_dsn(dsn).map(|builder| (Manager::new(builder), not))
}
#[cfg(feature = "r2d2")]
#[inline]
pub fn into_pool(self) -> Result<r2d2::Pool<Self>, r2d2::Error> {
r2d2::Pool::new(self)
}
#[cfg(feature = "r2d2")]
#[inline]
pub fn into_pool_with_builder(
self,
builder: r2d2::Builder<Self>,
) -> Result<r2d2::Pool<Self>, r2d2::Error> {
builder.build(self)
}
}
#[cfg(all(feature = "r2d2", feature = "deadpool"))]
compile_error!("Use only ONE of r2d2 or deadpool");
#[cfg(feature = "r2d2")]
pub type Pool<T> = r2d2::Pool<Manager<T>>;
#[cfg(all(feature = "deadpool", not(feature = "r2d2")))]
pub type Pool<T> = deadpool::managed::Pool<Manager<T>>;
#[cfg(feature = "r2d2")]
pub type PoolBuilder<T> = r2d2::Builder<Manager<T>>;
#[async_trait]
impl<T: AsyncTBuilder> deadpool::managed::Manager for Manager<T> {
type Type = <T as AsyncTBuilder>::Target;
type Error = <T as AsyncTBuilder>::Error;
async fn create(&self) -> Result<Self::Type, Self::Error> {
self.manager.build().await
}
async fn recycle(&self, _: &mut Self::Type) -> deadpool::managed::RecycleResult<Self::Error> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::{fmt::Display, sync::atomic::AtomicUsize};
use super::*;
#[derive(Debug)]
struct Conn;
#[derive(Debug)]
struct MyResultSet;
impl Iterator for MyResultSet {
type Item = Result<RawBlock, Error>;
fn next(&mut self) -> Option<Self::Item> {
static mut AVAILABLE: bool = true;
if unsafe { AVAILABLE } {
unsafe { AVAILABLE = false };
Some(Ok(RawBlock::parse_from_raw_block_v2(
[1].as_slice(),
&[Field::new("a", Ty::TinyInt, 1)],
&[1],
1,
Precision::Millisecond,
)))
} else {
None
}
}
}
impl<'q> crate::Fetchable for MyResultSet {
type Error = Error;
fn fields(&self) -> &[Field] {
static mut F: Option<Vec<Field>> = None;
unsafe { F.get_or_insert(vec![Field::new("a", Ty::TinyInt, 1)]) };
unsafe { F.as_ref().unwrap() }
}
fn precision(&self) -> Precision {
Precision::Millisecond
}
fn summary(&self) -> (usize, usize) {
(0, 0)
}
fn affected_rows(&self) -> i32 {
0
}
fn update_summary(&mut self, _rows: usize) {}
fn fetch_raw_block(&mut self) -> Result<Option<RawBlock>, Self::Error> {
static mut B: AtomicUsize = AtomicUsize::new(4);
unsafe {
if B.load(std::sync::atomic::Ordering::SeqCst) == 0 {
return Ok(None);
}
}
unsafe { B.fetch_sub(1, std::sync::atomic::Ordering::SeqCst) };
Ok(Some(RawBlock::parse_from_raw_block_v2(
[1].as_slice(),
&[Field::new("a", Ty::TinyInt, 1)],
&[1],
1,
Precision::Millisecond,
)))
}
}
#[derive(Debug)]
struct Error;
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("empty error")
}
}
impl From<taos_error::Error> for Error {
fn from(_: taos_error::Error) -> Self {
Error
}
}
impl std::error::Error for Error {}
impl From<DsnError> for Error {
fn from(_: DsnError) -> Self {
Error
}
}
impl TBuilder for Conn {
type Target = MyResultSet;
type Error = Error;
fn available_params() -> &'static [&'static str] {
&[]
}
fn from_dsn<D: IntoDsn>(_dsn: D) -> Result<Self, Self::Error> {
Ok(Self)
}
fn client_version() -> &'static str {
"3"
}
fn ready(&self) -> bool {
true
}
fn build(&self) -> Result<Self::Target, Self::Error> {
Ok(MyResultSet)
}
fn ping(&self, _: &mut Self::Target) -> Result<(), Self::Error> {
Ok(())
}
fn server_version(&self) -> Result<&str, Self::Error> {
todo!()
}
fn is_enterprise_edition(&self) -> Result<bool, Self::Error> {
todo!()
}
}
impl Queryable for Conn {
type Error = anyhow::Error;
type ResultSet = MyResultSet;
fn query<T: AsRef<str>>(&self, _sql: T) -> Result<MyResultSet, Self::Error> {
Ok(MyResultSet)
}
fn query_with_req_id<T: AsRef<str>>(
&self,
_sql: T,
_req_id: u64,
) -> Result<Self::ResultSet, Self::Error> {
Ok(MyResultSet)
}
fn exec<T: AsRef<str>>(&self, _sql: T) -> Result<usize, Self::Error> {
Ok(1)
}
fn write_raw_meta(&self, _: &RawMeta) -> Result<(), Self::Error> {
Ok(())
}
fn write_raw_block(&self, _: &RawBlock) -> Result<(), Self::Error> {
Ok(())
}
fn put(&self, _data: &SmlData) -> Result<(), Self::Error> {
Ok(())
}
}
#[test]
fn query_deserialize() {
let conn = Conn;
let aff = conn.exec("nothing").unwrap();
assert_eq!(aff, 1);
let mut rs = conn.query("abc").unwrap();
for record in rs.deserialize::<(i32, String, u8)>() {
let _ = dbg!(record);
}
}
#[test]
fn block_deserialize_borrowed() {
let conn = Conn;
let aff = conn.exec("nothing").unwrap();
assert_eq!(aff, 1);
let mut set = conn.query("abc").unwrap();
for block in &mut set {
let block = block.unwrap();
for record in block.deserialize::<(i32,)>() {
dbg!(record.unwrap());
}
}
}
#[test]
fn block_deserialize_borrowed_bytes() {
let conn = Conn;
let aff = conn.exec("nothing").unwrap();
assert_eq!(aff, 1);
let mut set = conn.query("abc").unwrap();
for block in &mut set {
let block = block.unwrap();
for record in block.deserialize::<String>() {
dbg!(record.unwrap());
}
}
}
#[cfg(feature = "async")]
#[tokio::test]
async fn block_deserialize_borrowed_bytes_stream() {
let conn = Conn;
let aff = conn.exec("nothing").unwrap();
assert_eq!(aff, 1);
let mut set = conn.query("abc").unwrap();
for row in set.deserialize::<u8>() {
let row = row.unwrap();
dbg!(row);
}
}
#[test]
fn with_iter() {
let conn = Conn;
let aff = conn.exec("nothing").unwrap();
assert_eq!(aff, 1);
let mut set = conn.query("abc").unwrap();
for block in set.blocks() {
for row in block.unwrap().rows() {
for value in row {
println!("{:?}", value);
}
}
}
}
}