/// Re-exports shared by the blocking (`sync`) and the async prelude.
///
/// The module itself is private; its items only become visible through the
/// glob re-exports (`pub use _priv::*` / `pub use super::_priv::*`) found
/// elsewhere in this file.
mod _priv {
    // Core data types of the crate.
    pub use crate::common::{
        AlterType, BorrowedValue, ColumnView, Field, JsonMeta, MetaAlter, MetaCreate, MetaDrop,
        Precision, RawBlock, RawMeta, TagWithValue, Ty, Value,
    };
    // Inline (de)serialization helper traits.
    pub use crate::util::{Inlinable, InlinableRead, InlinableWrite};
    pub use itertools::Itertools;
    // DSN parsing.
    pub use mdsn::{Dsn, DsnError, IntoDsn};
    // Error type, re-exported under the `RawError` alias.
    pub use taos_error::{Code, Error as RawError};
    // Message-queue (tmq) helper types.
    pub use crate::tmq::{IsOffset, MessageSet, Timeout};
}
pub use crate::tmq::{AsAsyncConsumer, IsAsyncData, IsAsyncMeta};
pub use crate::AsyncTBuilder;
#[cfg(feature = "deadpool")]
pub use crate::Pool;
pub use crate::RawResult;
pub use _priv::*;
pub use futures::stream::{Stream, StreamExt, TryStreamExt};
pub use r#async::*;
pub use tokio;
pub mod sync {
    pub use crate::RawResult;
    pub use crate::TBuilder;
    #[cfg(feature = "r2d2")]
    pub use crate::{Pool, PoolBuilder};
    #[cfg(feature = "r2d2")]
    pub use r2d2::ManageConnection;
    use std::borrow::Cow;
    pub use super::_priv::*;
    pub use crate::stmt::Bindable;
    pub use crate::tmq::{AsConsumer, IsMeta};
    use serde::de::DeserializeOwned;
    pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
    pub use serde::de::value::Error as DeError;
    use crate::common::*;
    use crate::helpers::*;
    /// Blocking row-by-row iterator over the result set of a [`Fetchable`].
    ///
    /// Blocks are pulled lazily from the inner [`IBlockIter`]; the rows of the
    /// current block are handed out one at a time before the next block is
    /// requested.
    pub struct IRowsIter<'a, T>
    where
        T: Fetchable,
    {
        /// Source of raw blocks.
        iter: IBlockIter<'a, T>,
        /// Block fetched most recently (`None` before the first fetch).
        /// NOTE(review): presumably stored here so it stays alive while `rows`
        /// walks it — confirm against `RawBlock::rows`.
        block: Option<RawBlock>,
        /// Row cursor created from `block` (`None` before the first fetch).
        rows: Option<RowsIter<'a>>,
    }
    impl<'a, T> IRowsIter<'a, T>
    where
        T: Fetchable,
    {
        /// Requests the next block, makes it the current one and returns its
        /// first row.
        ///
        /// Yields `Ok(None)` once the block iterator is exhausted (or when the
        /// freshly fetched block produces no row at all); a fetch error is
        /// returned as-is.
        fn fetch(&mut self) -> RawResult<Option<RowView<'a>>> {
            match self.iter.next().transpose()? {
                None => Ok(None),
                Some(fresh) => {
                    // Swap in the new block first, then rebuild the row cursor
                    // from the stored block.
                    let current = self.block.insert(fresh);
                    let cursor = self.rows.insert(current.rows());
                    Ok(cursor.next())
                }
            }
        }

        /// Returns the next row of the current block, moving on to the next
        /// block when the current cursor is missing or drained.
        fn next_row(&mut self) -> RawResult<Option<RowView<'a>>> {
            let buffered = self.rows.as_mut().and_then(|cursor| cursor.next());
            match buffered {
                Some(row) => Ok(Some(row)),
                None => self.fetch(),
            }
        }
    }
    impl<'a, T> Iterator for IRowsIter<'a, T>
    where
        T: Fetchable,
    {
        type Item = RawResult<RowView<'a>>;

        /// Yields rows until the result set is exhausted; an error from the
        /// underlying fetch is yielded as an `Err` item.
        fn next(&mut self) -> Option<Self::Item> {
            match self.next_row() {
                Ok(Some(row)) => Some(Ok(row)),
                Ok(None) => None,
                Err(err) => Some(Err(err)),
            }
        }
    }
    /// Blocking iterator over the raw blocks of a [`Fetchable`] result set.
    ///
    /// Holds the result set by mutable reference for the lifetime `'a`.
    pub struct IBlockIter<'a, T>
    where
        T: Fetchable,
    {
        /// Result set the blocks are fetched from.
        query: &'a mut T,
    }
    impl<'a, T> Iterator for IBlockIter<'a, T>
    where
        T: Fetchable,
    {
        type Item = RawResult<RawBlock>;

        /// Fetches the next raw block.
        ///
        /// Every block that is handed out is first reported to the result set
        /// through `update_summary` with its row count. Iteration ends when
        /// `fetch_raw_block` returns `Ok(None)`; errors are yielded as items.
        fn next(&mut self) -> Option<Self::Item> {
            match self.query.fetch_raw_block() {
                Ok(Some(block)) => {
                    self.query.update_summary(block.nrows());
                    Some(Ok(block))
                }
                Ok(None) => None,
                Err(err) => Some(Err(err)),
            }
        }
    }
    /// A blocking result set that can be consumed block by block, row by row,
    /// or deserialized into user types.
    ///
    /// Implementors provide the metadata accessors plus `fetch_raw_block` and
    /// `update_summary`; the iteration helpers are supplied as default methods.
    pub trait Fetchable: Sized {
        /// Affected-row count reported by the implementor.
        fn affected_rows(&self) -> i32;
        /// Timestamp precision of the result set.
        fn precision(&self) -> Precision;
        /// Field (column) descriptions of the result set.
        fn fields(&self) -> &[Field];
        /// Number of fields; defaults to `self.fields().len()`.
        fn num_of_fields(&self) -> usize {
            self.fields().len()
        }
        /// Summary counters maintained by the implementor.
        /// NOTE(review): the meaning of the two values is not visible here
        /// (presumably blocks and rows) — confirm with an implementor.
        fn summary(&self) -> (usize, usize);
        /// Called by [`IBlockIter`] with the row count of every block it yields.
        #[doc(hidden)]
        fn update_summary(&mut self, nrows: usize);
        /// Fetches the next raw block; `Ok(None)` signals that no block is left.
        #[doc(hidden)]
        fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>>;
        /// Iterates over the raw blocks of this result set.
        fn blocks(&mut self) -> IBlockIter<'_, Self> {
            IBlockIter { query: self }
        }
        /// Iterates over the rows of this result set, fetching blocks on demand.
        fn rows(&mut self) -> IRowsIter<'_, Self> {
            IRowsIter {
                iter: self.blocks(),
                block: None,
                rows: None,
            }
        }
        /// Iterates over the rows, deserializing each one into `T`.
        ///
        /// A fetch error is passed through as an `Err` item.
        fn deserialize<T: DeserializeOwned>(
            &mut self,
        ) -> std::iter::Map<IRowsIter<'_, Self>, fn(RawResult<RowView>) -> RawResult<T>> {
            // The non-capturing closure coerces to the `fn` pointer named in the return type.
            self.rows().map(|row| T::deserialize(&mut row?))
        }
        /// Collects every row of every block into a `Vec` of value rows,
        /// stopping at the first error.
        fn to_rows_vec(&mut self) -> RawResult<Vec<Vec<Value>>> {
            self.blocks()
                .map_ok(|raw| raw.to_values())
                .flatten_ok()
                .try_collect()
        }
    }
    /// Blocking query interface of a connection.
    ///
    /// Implementors provide `query`, `query_with_req_id`, `write_raw_meta`,
    /// `write_raw_block` and `put`; everything else is built on top of `query`
    /// and `exec`.
    ///
    /// Identifiers and SQL fragments passed to the helper methods are
    /// interpolated into the statement text as-is (some wrapped in backticks);
    /// they are not escaped.
    pub trait Queryable {
        /// Result-set type returned by queries.
        type ResultSet: Fetchable;

        /// Runs `sql` and returns its result set.
        fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet>;

        /// Same as [`Queryable::query`], tagging the request with `req_id`.
        fn query_with_req_id<T: AsRef<str>>(
            &self,
            sql: T,
            req_id: u64,
        ) -> RawResult<Self::ResultSet>;

        /// Runs `sql` and returns the affected-row count of its result set.
        ///
        /// The `i32` reported by `affected_rows` is converted with `as`, so a
        /// negative value would wrap to a large `usize`.
        fn exec<T: AsRef<str>>(&self, sql: T) -> RawResult<usize> {
            self.query(sql).map(|res| res.affected_rows() as _)
        }

        /// Writes a raw meta message.
        fn write_raw_meta(&self, _: &RawMeta) -> RawResult<()>;

        /// Writes a raw data block.
        fn write_raw_block(&self, _: &RawBlock) -> RawResult<()>;

        /// Executes every statement of `input` in order and returns the sum of
        /// the affected-row counts.
        ///
        /// Stops at the first failing statement and returns its error; the
        /// remaining statements are not executed.
        fn exec_many<T: AsRef<str>, I: IntoIterator<Item = T>>(
            &self,
            input: I,
        ) -> RawResult<usize> {
            // `Sum` for `Result` short-circuits on the first `Err`.
            input.into_iter().map(|sql| self.exec(sql)).sum()
        }

        /// Runs `sql` and deserializes the first row into `O`.
        ///
        /// Returns `Ok(None)` when the result set has no row.
        fn query_one<T: AsRef<str>, O: DeserializeOwned>(&self, sql: T) -> RawResult<Option<O>> {
            self.query(sql)?.deserialize::<O>().next().transpose()
        }

        /// Returns the version string reported by `SELECT server_version()`.
        ///
        /// # Panics
        ///
        /// Panics if the query succeeds but returns no row.
        fn server_version(&self) -> RawResult<Cow<str>> {
            Ok(self
                .query_one::<_, String>("SELECT server_version()")?
                .expect("should always has result")
                .into())
        }

        /// Creates topic `name` for the query `sql` unless it already exists.
        fn create_topic(&self, name: impl AsRef<str>, sql: impl AsRef<str>) -> RawResult<()> {
            let (name, sql) = (name.as_ref(), sql.as_ref());
            let query = format!("create topic if not exists {name} as {sql}");
            self.query(query)?;
            Ok(())
        }

        /// Creates topic `name` for the whole database `db` unless it already
        /// exists. The database name is wrapped in backticks.
        fn create_topic_as_database(
            &self,
            name: impl AsRef<str>,
            db: impl std::fmt::Display,
        ) -> RawResult<()> {
            let name = name.as_ref();
            let query = format!("create topic if not exists {name} as database `{db}`");
            self.exec(query)?;
            Ok(())
        }

        /// Lists the databases returned by `show databases`.
        fn databases(&self) -> RawResult<Vec<ShowDatabase>> {
            self.query("show databases")?
                .deserialize()
                .try_collect()
                .map_err(Into::into)
        }

        /// Lists the topics stored in `information_schema.ins_topics`.
        fn topics(&self) -> RawResult<Vec<Topic>> {
            self.query("SELECT * FROM information_schema.ins_topics")?
                .deserialize()
                .try_collect()
                .map_err(Into::into)
        }

        /// Describes `table`, collecting the rows of `describe` into a
        /// [`Describe`].
        fn describe(&self, table: &str) -> RawResult<Describe> {
            Ok(Describe(
                self.query(format!("describe `{table}`"))?
                    .deserialize()
                    .try_collect()?,
            ))
        }

        /// Probes for database `name` by listing its stables.
        ///
        /// Best effort: the result is `Ok(true)` when the probe statement
        /// succeeds and `Ok(false)` on *any* error, so this method never
        /// returns `Err`.
        fn database_exists(&self, name: &str) -> RawResult<bool> {
            Ok(self.exec(format!("show `{name}`.stables")).is_ok())
        }

        /// Writes the given `SmlData` payload.
        fn put(&self, data: &SmlData) -> RawResult<()>;
    }
}
mod r#async {
    use itertools::Itertools;
    use serde::de::DeserializeOwned;
    use std::borrow::Cow;
    use std::marker::PhantomData;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use crate::common::*;
    use crate::helpers::*;
    pub use crate::stmt::Bindable;
    pub use crate::RawResult;
    pub use super::_priv::*;
    pub use crate::util::AsyncInlinable;
    pub use crate::util::AsyncInlinableRead;
    pub use crate::util::AsyncInlinableWrite;
    pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
    pub use serde::de::value::Error as DeError;
    pub use futures::stream::{Stream, StreamExt, TryStreamExt};
    #[cfg(feature = "async")]
    use async_trait::async_trait;
    /// Stream over the raw blocks of an `AsyncFetchable` result set.
    ///
    /// Holds the result set by mutable reference for the lifetime `'a`.
    pub struct AsyncBlocks<'a, T> {
        /// Result set the blocks are polled from.
        query: &'a mut T,
    }
    impl<'a, T> Stream for AsyncBlocks<'a, T>
    where
        T: AsyncFetchable,
    {
        type Item = RawResult<RawBlock>;

        /// Polls the result set for its next raw block.
        ///
        /// Every block that is handed out is first reported through
        /// `update_summary` with its row count. The stream ends when
        /// `fetch_raw_block` resolves to `Ok(None)`; errors are yielded as items.
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let polled = self.query.fetch_raw_block(cx);
            match polled {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(Some(block))) => {
                    self.query.update_summary(block.nrows());
                    Poll::Ready(Some(Ok(block)))
                }
                Poll::Ready(Ok(None)) => Poll::Ready(None),
                Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
            }
        }
    }
    /// Stream over the rows of an `AsyncFetchable` result set.
    ///
    /// Blocks are polled lazily from the inner [`AsyncBlocks`]; the rows of the
    /// current block are handed out before the next block is requested.
    pub struct AsyncRows<'a, T> {
        /// Source of raw blocks.
        blocks: AsyncBlocks<'a, T>,
        /// Block fetched most recently (`None` before the first fetch).
        /// NOTE(review): presumably stored here so it stays alive while `rows`
        /// walks it — confirm against `RawBlock::rows`.
        block: Option<RawBlock>,
        /// Row cursor created from `block` (`None` before the first fetch).
        rows: Option<RowsIter<'a>>,
    }
    impl<'a, T> AsyncRows<'a, T>
    where
        T: AsyncFetchable,
    {
        /// Polls for the next block, makes it the current one and returns its
        /// first row.
        ///
        /// Resolves to `Ok(None)` once the block stream has ended (or when the
        /// freshly fetched block produces no row at all); a fetch error is
        /// returned as-is.
        fn fetch(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RowView<'a>>>> {
            let polled = self.blocks.try_poll_next_unpin(cx);
            match polled {
                Poll::Pending => Poll::Pending,
                Poll::Ready(None) => Poll::Ready(Ok(None)),
                Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(fresh))) => {
                    // Swap in the new block first, then rebuild the row cursor
                    // from the stored block.
                    let current = self.block.insert(fresh);
                    let cursor = self.rows.insert(current.rows());
                    Poll::Ready(Ok(cursor.next()))
                }
            }
        }

        /// Returns the next row of the current block, polling for the next
        /// block when the current cursor is missing or drained.
        fn next_row(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RowView<'a>>>> {
            let buffered = self.rows.as_mut().and_then(|cursor| cursor.next());
            match buffered {
                Some(row) => Poll::Ready(Ok(Some(row))),
                None => self.fetch(cx),
            }
        }
    }
    impl<'a, T> Stream for AsyncRows<'a, T>
    where
        T: AsyncFetchable,
    {
        type Item = RawResult<RowView<'a>>;

        /// Yields rows until the result set is exhausted; an error from the
        /// underlying fetch is yielded as an `Err` item.
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            match self.next_row(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(Some(row))) => Poll::Ready(Some(Ok(row))),
                Poll::Ready(Ok(None)) => Poll::Ready(None),
                Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
            }
        }
    }
    /// Stream that deserializes every row of an `AsyncFetchable` result set
    /// into `V`.
    pub struct AsyncDeserialized<'a, T, V> {
        /// Underlying row stream.
        rows: AsyncRows<'a, T>,
        /// Ties the target type `V` to the stream without storing a value of it.
        _marker: PhantomData<V>,
    }
    // Declared `Unpin` unconditionally (for any `T` and `V`), which lets
    // `poll_next` obtain `&mut Self` through `Pin::get_mut`.
    impl<'a, T, V> Unpin for AsyncDeserialized<'a, T, V> {}
    impl<'a, T, V> Stream for AsyncDeserialized<'a, T, V>
    where
        T: AsyncFetchable,
        V: DeserializeOwned,
    {
        type Item = RawResult<V>;

        /// Polls the row stream and deserializes the next row into `V`.
        ///
        /// Fetch errors are forwarded unchanged; deserialization errors are
        /// converted into the crate error type.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            use futures::stream::*;
            let this = Pin::get_mut(self);
            match this.rows.poll_next_unpin(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
                Poll::Ready(Some(Ok(mut row))) => {
                    let decoded = V::deserialize(&mut row).map_err(Into::into);
                    Poll::Ready(Some(decoded))
                }
            }
        }
    }
    /// An asynchronous result set that can be consumed block by block, row by
    /// row, or deserialized into user types.
    ///
    /// Implementors provide the metadata accessors plus `fetch_raw_block` and
    /// `update_summary`; the stream helpers are supplied as default methods.
    #[cfg(feature = "async")]
    #[async_trait]
    pub trait AsyncFetchable: Sized + Send + Sync {
        /// Affected-row count reported by the implementor.
        fn affected_rows(&self) -> i32;
        /// Timestamp precision of the result set.
        fn precision(&self) -> Precision;
        /// Field (column) descriptions of the result set.
        fn fields(&self) -> &[Field];
        /// Names of all fields, in field order.
        ///
        /// NOTE(review): the method name looks like a typo of `field_names`;
        /// it is public API, so it is left unchanged here.
        fn filed_names(&self) -> Vec<&str> {
            self.fields().iter().map(|f| f.name()).collect_vec()
        }
        /// Number of fields; defaults to `self.fields().len()`.
        fn num_of_fields(&self) -> usize {
            self.fields().len()
        }
        /// Summary counters maintained by the implementor.
        /// NOTE(review): the meaning of the two values is not visible here
        /// (presumably blocks and rows) — confirm with an implementor.
        fn summary(&self) -> (usize, usize);
        /// Called by [`AsyncBlocks`] with the row count of every block it yields.
        #[doc(hidden)]
        fn update_summary(&mut self, nrows: usize);
        /// Poll-style fetch of the next raw block; `Ok(None)` signals that no
        /// block is left.
        #[doc(hidden)]
        fn fetch_raw_block(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RawBlock>>>;
        /// Streams the raw blocks of this result set.
        fn blocks(&mut self) -> AsyncBlocks<'_, Self> {
            AsyncBlocks { query: self }
        }
        /// Streams the rows of this result set, fetching blocks on demand.
        fn rows(&mut self) -> AsyncRows<'_, Self> {
            AsyncRows {
                blocks: self.blocks(),
                block: None,
                rows: None,
            }
        }
        /// Collects every row into a `Vec` of value rows, stopping at the first
        /// error.
        async fn to_records(&mut self) -> RawResult<Vec<Vec<Value>>> {
            let future = self.rows().map_ok(RowView::into_values).try_collect();
            future.await
        }
        /// Streams the rows, deserializing each one into `R`.
        fn deserialize<R>(&mut self) -> AsyncDeserialized<'_, Self, R>
        where
            R: serde::de::DeserializeOwned,
        {
            AsyncDeserialized {
                rows: self.rows(),
                _marker: PhantomData,
            }
        }
    }
    /// Asynchronous query interface of a connection.
    ///
    /// Implementors provide `query`, `query_with_req_id`, `put`,
    /// `write_raw_meta` and `write_raw_block`; everything else is built on top
    /// of `query` and `exec`.
    ///
    /// Identifiers and SQL fragments passed to the helper methods are
    /// interpolated into the statement text as-is (some wrapped in backticks);
    /// they are not escaped.
    #[cfg(feature = "async")]
    #[async_trait]
    pub trait AsyncQueryable: Send + Sync + Sized {
        /// Result-set type returned by queries.
        type AsyncResultSet: AsyncFetchable;

        /// Runs `sql` and returns its result set.
        async fn query<T: AsRef<str> + Send + Sync>(
            &self,
            sql: T,
        ) -> RawResult<Self::AsyncResultSet>;

        /// Writes the given `SmlData` payload.
        async fn put(&self, schemaless_data: &SmlData) -> RawResult<()>;

        /// Same as [`AsyncQueryable::query`], tagging the request with `req_id`.
        async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
            &self,
            sql: T,
            req_id: u64,
        ) -> RawResult<Self::AsyncResultSet>;

        /// Runs `sql` and returns the affected-row count of its result set.
        ///
        /// The `i32` reported by `affected_rows` is converted with `as`, so a
        /// negative value would wrap to a large `usize`.
        async fn exec<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<usize> {
            let sql = sql.as_ref();
            self.query(sql).await.map(|res| res.affected_rows() as _)
        }

        /// Writes a raw meta message.
        async fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()>;

        /// Writes a raw data block.
        async fn write_raw_block(&self, block: &RawBlock) -> RawResult<()>;

        /// Executes every statement of `input` in order and returns the sum of
        /// the affected-row counts.
        ///
        /// Stops at the first failing statement and returns its error; the
        /// remaining statements are not executed.
        async fn exec_many<T, I>(&self, input: I) -> RawResult<usize>
        where
            T: AsRef<str> + Send + Sync,
            I::IntoIter: Send,
            I: IntoIterator<Item = T> + Send,
        {
            let mut aff = 0;
            for sql in input {
                aff += self.exec(sql).await?;
            }
            Ok(aff)
        }

        /// Runs `sql` and deserializes the first row into `O`.
        ///
        /// Returns `Ok(None)` when the result set has no row.
        async fn query_one<T: AsRef<str> + Send + Sync, O: DeserializeOwned + Send>(
            &self,
            sql: T,
        ) -> RawResult<Option<O>> {
            use futures::StreamExt;
            // Only the first item is needed, so poll the stream once instead of
            // collecting it into a temporary `Vec`.
            self.query(sql)
                .await?
                .deserialize::<O>()
                .next()
                .await
                .transpose()
        }

        /// Returns the version string reported by `SELECT server_version()`.
        ///
        /// # Panics
        ///
        /// Panics if the query succeeds but returns no row.
        async fn server_version(&self) -> RawResult<Cow<str>> {
            Ok(self
                .query_one::<_, String>("SELECT server_version()")
                .await?
                .expect("should always has result")
                .into())
        }

        /// Creates database `name` unless it already exists.
        ///
        /// Unlike [`AsyncQueryable::use_database`], the name is not wrapped in
        /// backticks.
        async fn create_database<N: AsRef<str> + Send>(&self, name: N) -> RawResult<()> {
            let query = format!("CREATE DATABASE IF NOT EXISTS {}", name.as_ref());
            self.query(query).await?;
            Ok(())
        }

        /// Switches the connection to database `name` (wrapped in backticks).
        async fn use_database<N: AsRef<str> + Send>(&self, name: N) -> RawResult<()> {
            let query = format!("USE `{}`", name.as_ref());
            self.query(query).await?;
            Ok(())
        }

        /// Creates topic `name` for the query `sql` unless it already exists.
        async fn create_topic<N: AsRef<str> + Send + Sync, S: AsRef<str> + Send>(
            &self,
            name: N,
            sql: S,
        ) -> RawResult<()> {
            let (name, sql) = (name.as_ref(), sql.as_ref());
            let query = format!("CREATE TOPIC IF NOT EXISTS {name} AS {sql}");
            self.query(query).await?;
            Ok(())
        }

        /// Creates topic `name` (with meta) for the whole database `db` unless
        /// it already exists.
        ///
        /// The statement is first sent with the database name in backticks; if
        /// that fails for any reason it is retried once without them, and only
        /// the error of the retry is returned.
        async fn create_topic_as_database(
            &self,
            name: impl AsRef<str> + Send + Sync + 'async_trait,
            db: impl std::fmt::Display + Send + 'async_trait,
        ) -> RawResult<()> {
            let name = name.as_ref();
            let query = format!("create topic if not exists {name} with meta as database `{db}`");
            if let Err(err) = self.exec(&query).await {
                log::trace!("Create topic error: {:?}, try without ``", err);
                let query = format!("create topic if not exists {name} with meta as database {db}");
                self.exec(query).await?;
            }
            Ok(())
        }

        /// Lists the databases returned by `SHOW DATABASES`.
        async fn databases(&self) -> RawResult<Vec<ShowDatabase>> {
            use futures::stream::TryStreamExt;
            Ok(self
                .query("SHOW DATABASES")
                .await?
                .deserialize()
                .try_collect()
                .await?)
        }

        /// Lists the topics stored in `information_schema.ins_topics`.
        async fn topics(&self) -> RawResult<Vec<Topic>> {
            let sql = "SELECT * FROM information_schema.ins_topics";
            log::trace!("query one with sql: {sql}");
            Ok(self.query(sql).await?.deserialize().try_collect().await?)
        }

        /// Describes `table`, collecting the rows of `DESCRIBE` into a
        /// [`Describe`].
        async fn describe(&self, table: &str) -> RawResult<Describe> {
            Ok(Describe(
                self.query(format!("DESCRIBE `{table}`"))
                    .await?
                    .deserialize()
                    .try_collect()
                    .await?,
            ))
        }

        /// Probes for database `name` by listing its stables.
        ///
        /// Best effort: the result is `Ok(true)` when the probe statement
        /// succeeds and `Ok(false)` on *any* error, so this method never
        /// returns `Err`.
        async fn database_exists(&self, name: &str) -> RawResult<bool> {
            Ok(self.exec(format!("show `{name}`.stables")).await.is_ok())
        }

        /// Blocking wrapper around [`AsyncQueryable::exec`].
        ///
        /// Drives the future on the current thread with
        /// `futures::executor::block_on`.
        /// NOTE(review): blocking a worker thread of an async runtime this way
        /// may stall it — confirm the intended call sites.
        fn exec_sync<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<usize> {
            futures::executor::block_on(self.exec(sql))
        }

        /// Blocking wrapper around [`AsyncQueryable::query`].
        ///
        /// Drives the future on the current thread with
        /// `futures::executor::block_on`; the note on
        /// [`AsyncQueryable::exec_sync`] applies here too.
        fn query_sync<T: AsRef<str> + Send + Sync>(
            &self,
            sql: T,
        ) -> RawResult<Self::AsyncResultSet> {
            futures::executor::block_on(self.query(sql))
        }
    }
    /// Placeholder test: it checks nothing at run time and passes as long as
    /// this module compiles under the test harness.
    #[test]
    fn test() {}
}