// Names shared by every prelude flavour in this file: the module is glob re-exported
// at the root (`pub use _priv::*`) and again from both `sync` and `r#async`.
mod _priv {
    pub use crate::common::{
        AlterType, BorrowedValue, ColumnView, Field, JsonMeta, MetaAlter, MetaCreate, MetaDrop,
        MetaUnit, Precision, RawBlock, RawMeta, TagWithValue, Ty, Value,
    };
    pub use crate::util::{Inlinable, InlinableRead, InlinableWrite};

    pub use itertools::Itertools;
    pub use mdsn::{Dsn, DsnError, IntoDsn};
    pub use taos_error::{Code, Error as RawError};

    pub use crate::tmq::{IsOffset, MessageSet, Timeout};
}
14
15pub use crate::tmq::{AsAsyncConsumer, IsAsyncData, IsAsyncMeta};
16pub use crate::AsyncTBuilder;
17#[cfg(feature = "deadpool")]
18pub use crate::Pool;
19pub use crate::RawResult;
20pub use _priv::*;
21pub use futures::stream::{Stream, StreamExt, TryStreamExt};
22pub use r#async::*;
23pub use tokio;
24
/// Optional vgroup lookups.
///
/// Both methods ship with a default body that returns `None`, so an implementor
/// only overrides the ones it can actually answer.
///
/// NOTE(review): "vgroup" presumably refers to the server-side virtual group that
/// hosts a table — confirm against the implementors.
pub trait Helpers {
    /// Looks up the vgroup id for one table of a database.
    ///
    /// The default implementation ignores both arguments and returns `None`.
    fn table_vgroup_id(&self, _db: &str, _table: &str) -> Option<i32> {
        Option::None
    }

    /// Looks up the vgroup ids for several tables of a database.
    ///
    /// The default implementation ignores both arguments and returns `None`.
    fn tables_vgroup_ids<T>(&self, _db: &str, _tables: &[T]) -> Option<Vec<i32>>
    where
        T: AsRef<str>,
    {
        Option::None
    }
}
34
35pub mod sync {
36 pub use crate::RawResult;
37 pub use crate::TBuilder;
38 #[cfg(feature = "r2d2")]
39 pub use crate::{Pool, PoolBuilder};
40 #[cfg(feature = "r2d2")]
41 pub use r2d2::ManageConnection;
42 use std::borrow::Cow;
43
44 pub use super::_priv::*;
45
46 pub use crate::stmt::Bindable;
47 pub use crate::tmq::{AsConsumer, IsData, IsMeta};
48
49 use serde::de::DeserializeOwned;
50
51 pub use mdsn::{Address, Dsn, DsnError, IntoDsn};
52 pub use serde::de::value::Error as DeError;
53
54 use crate::common::*;
55 use crate::helpers::*;
56
57 pub struct IRowsIter<'a, T>
58 where
59 T: Fetchable,
60 {
61 iter: IBlockIter<'a, T>,
62 block: Option<RawBlock>,
63 rows: Option<RowsIter<'a>>,
65 }
66
67 impl<'a, T> IRowsIter<'a, T>
68 where
69 T: Fetchable,
70 {
71 fn fetch(&mut self) -> RawResult<Option<RowView<'a>>> {
72 if let Some(block) = self.iter.next().transpose()? {
73 self.block = Some(block);
74 self.rows = self.block.as_mut().map(|raw| raw.rows());
75 let row = self.rows.as_mut().unwrap().next();
76 Ok(row)
77 } else {
78 Ok(None)
79 }
80 }
81 fn next_row(&mut self) -> RawResult<Option<RowView<'a>>> {
82 if let Some(rows) = self.rows.as_mut() {
84 if let Some(row) = rows.next() {
86 Ok(Some(row))
87 } else {
88 self.fetch()
89 }
90 } else {
91 self.fetch()
93 }
94 }
95 }
96
97 impl<'a, T> Iterator for IRowsIter<'a, T>
98 where
99 T: Fetchable,
100 {
101 type Item = RawResult<RowView<'a>>;
102
103 fn next(&mut self) -> Option<Self::Item> {
104 self.next_row().transpose()
105 }
106 }
107
108 pub struct IBlockIter<'a, T>
109 where
110 T: Fetchable,
111 {
112 query: &'a mut T,
113 }
114
115 impl<'a, T> Iterator for IBlockIter<'a, T>
116 where
117 T: Fetchable,
118 {
119 type Item = RawResult<RawBlock>;
120
121 fn next(&mut self) -> Option<Self::Item> {
122 self.query
123 .fetch_raw_block()
124 .map(|raw| {
125 if let Some(raw) = raw {
126 self.query.update_summary(raw.nrows());
127 Some(raw)
128 } else {
129 None
130 }
131 })
132 .transpose()
133 }
134 }
135
136 pub trait Fetchable: Sized {
137 fn affected_rows(&self) -> i32;
138
139 fn precision(&self) -> Precision;
140
141 fn fields(&self) -> &[Field];
142
143 fn num_of_fields(&self) -> usize {
144 self.fields().len()
145 }
146
147 fn summary(&self) -> (usize, usize);
148
149 #[doc(hidden)]
150 fn update_summary(&mut self, nrows: usize);
151
152 #[doc(hidden)]
153 fn fetch_raw_block(&mut self) -> RawResult<Option<RawBlock>>;
154
155 fn blocks(&mut self) -> IBlockIter<'_, Self> {
157 IBlockIter { query: self }
158 }
159
160 fn rows(&mut self) -> IRowsIter<'_, Self> {
162 IRowsIter {
163 iter: self.blocks(),
164 block: None,
165 rows: None,
167 }
168 }
169
170 fn deserialize<T: DeserializeOwned>(
171 &mut self,
172 ) -> std::iter::Map<IRowsIter<'_, Self>, fn(RawResult<RowView>) -> RawResult<T>> {
173 self.rows().map(|row| T::deserialize(&mut row?))
174 }
175
176 fn to_rows_vec(&mut self) -> RawResult<Vec<Vec<Value>>> {
177 self.blocks()
178 .map_ok(|raw| raw.to_values())
179 .flatten_ok()
180 .try_collect()
181 }
182 }
183
184 pub trait Queryable {
188 type ResultSet: Fetchable;
189
190 fn query<T: AsRef<str>>(&self, sql: T) -> RawResult<Self::ResultSet>;
191
192 fn query_with_req_id<T: AsRef<str>>(
193 &self,
194 sql: T,
195 req_id: u64,
196 ) -> RawResult<Self::ResultSet>;
197
198 fn exec<T: AsRef<str>>(&self, sql: T) -> RawResult<usize> {
199 self.query(sql).map(|res| res.affected_rows() as _)
200 }
201
202 fn write_raw_meta(&self, _: &RawMeta) -> RawResult<()>;
203
204 fn write_raw_block(&self, _: &RawBlock) -> RawResult<()>;
205
206 fn write_raw_block_with_req_id(&self, _: &RawBlock, _: u64) -> RawResult<()>;
207
208 fn exec_many<T: AsRef<str>, I: IntoIterator<Item = T>>(
209 &self,
210 input: I,
211 ) -> RawResult<usize> {
212 input
213 .into_iter()
214 .map(|sql| self.exec(sql))
215 .try_fold(0, |mut acc, aff| {
216 acc += aff?;
217 Ok(acc)
218 })
219 }
220
221 fn query_one<T: AsRef<str>, O: DeserializeOwned>(&self, sql: T) -> RawResult<Option<O>> {
222 self.query(sql)?
223 .deserialize::<O>()
224 .next()
225 .map_or(Ok(None), |v| v.map(Some).map_err(Into::into))
226 }
227
228 fn server_version(&self) -> RawResult<Cow<str>> {
230 Ok(self
231 .query_one::<_, String>("SELECT server_version()")?
232 .expect("should always has result")
233 .into())
234 }
235
236 fn create_topic(&self, name: impl AsRef<str>, sql: impl AsRef<str>) -> RawResult<()> {
237 let (name, sql) = (name.as_ref(), sql.as_ref());
238 let query = format!("create topic if not exists `{name}` as {sql}");
239
240 self.query(query)?;
241 Ok(())
242 }
243
244 fn create_topic_as_database(
245 &self,
246 name: impl AsRef<str>,
247 db: impl std::fmt::Display,
248 ) -> RawResult<()> {
249 let name = name.as_ref();
250 let query = format!("create topic if not exists `{name}` as database `{db}`");
251
252 self.exec(query)?;
253 Ok(())
254 }
255
256 fn databases(&self) -> RawResult<Vec<ShowDatabase>> {
257 self.query("show databases")?
258 .deserialize()
259 .try_collect()
260 .map_err(Into::into)
261 }
262
263 fn topics(&self) -> RawResult<Vec<Topic>> {
269 self.query("SELECT * FROM information_schema.ins_topics")?
270 .deserialize()
271 .try_collect()
272 .map_err(Into::into)
273 }
274
275 fn describe(&self, table: &str) -> RawResult<Describe> {
276 Ok(Describe(
277 self.query(format!("describe `{table}`"))?
278 .deserialize()
279 .try_collect()?,
280 ))
281 }
282
283 fn database_exists(&self, name: &str) -> RawResult<bool> {
285 Ok(self.exec(format!("show `{name}`.stables")).is_ok())
286 }
287
288 fn put(&self, data: &SmlData) -> RawResult<()>;
289
290 fn table_vgroup_id(&self, _db: &str, _table: &str) -> Option<i32> {
291 None
292 }
293
294 fn tables_vgroup_ids<T: AsRef<str>>(&self, _db: &str, _tables: &[T]) -> Option<Vec<i32>> {
295 None
296 }
297 }
298}
299
300mod r#async {
301 use serde::de::DeserializeOwned;
302 use std::borrow::Cow;
303 use std::marker::PhantomData;
304 use std::pin::Pin;
305 use std::task::{Context, Poll};
306
307 use crate::common::*;
308 use crate::helpers::*;
309 pub use crate::stmt::AsyncBindable;
310 pub use crate::RawResult;
311
312 pub use super::_priv::*;
313 pub use crate::util::AsyncInlinable;
314 pub use crate::util::AsyncInlinableRead;
315 pub use crate::util::AsyncInlinableWrite;
316 pub use mdsn::Address;
317 pub use serde::de::value::Error as DeError;
318
319 pub use futures::stream::{Stream, StreamExt, TryStreamExt};
320
321 #[cfg(feature = "async")]
323 use async_trait::async_trait;
324
325 pub struct AsyncBlocks<'a, T> {
326 query: &'a mut T,
327 }
328
329 impl<'a, T> Stream for AsyncBlocks<'a, T>
330 where
331 T: AsyncFetchable,
332 {
333 type Item = RawResult<RawBlock>;
334
335 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
336 self.query.fetch_raw_block(cx).map(|raw| {
337 raw.map(|raw| {
338 raw.map(|raw| {
339 self.query.update_summary(raw.nrows());
340 raw
341 })
342 })
343 .transpose()
344 })
345 }
346 }
347
348 pub struct AsyncRows<'a, T> {
349 blocks: AsyncBlocks<'a, T>,
350 block: Option<RawBlock>,
351 rows: Option<RowsIter<'a>>,
352 }
353
354 impl<'a, T> AsyncRows<'a, T>
355 where
356 T: AsyncFetchable,
357 {
358 fn fetch(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RowView<'a>>>> {
359 let poll = self.blocks.try_poll_next_unpin(cx);
360 match poll {
361 Poll::Ready(block) => match block.transpose() {
362 Ok(Some(block)) => {
363 self.block = Some(block);
364 self.rows = self.block.as_mut().map(|raw| raw.rows());
365 let row = self.rows.as_mut().unwrap().next();
366 Poll::Ready(Ok(row))
367 }
368 Ok(None) => Poll::Ready(Ok(None)),
369 Err(err) => Poll::Ready(Err(err)),
370 },
371 Poll::Pending => Poll::Pending,
372 }
373 }
374 fn next_row(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RowView<'a>>>> {
375 if let Some(rows) = self.rows.as_mut() {
377 if let Some(row) = rows.next() {
379 Poll::Ready(Ok(Some(row)))
380 } else {
381 self.fetch(cx)
382 }
383 } else {
384 self.fetch(cx)
386 }
387 }
388 }
389
390 impl<'a, T> Stream for AsyncRows<'a, T>
391 where
392 T: AsyncFetchable,
393 {
394 type Item = RawResult<RowView<'a>>;
395
396 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
397 self.next_row(cx).map(|row| row.transpose())
398 }
399 }
400
401 pub struct AsyncDeserialized<'a, T, V> {
402 rows: AsyncRows<'a, T>,
403 _marker: PhantomData<V>,
404 }
405
406 impl<'a, T, V> Unpin for AsyncDeserialized<'a, T, V> {}
407
408 impl<'a, T, V> Stream for AsyncDeserialized<'a, T, V>
409 where
410 T: AsyncFetchable,
411 V: DeserializeOwned,
412 {
413 type Item = RawResult<V>;
414
415 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
416 use futures::stream::*;
417 Pin::get_mut(self).rows.poll_next_unpin(cx).map(|row| {
418 row.map(|row| row.and_then(|mut row| V::deserialize(&mut row).map_err(Into::into)))
419 })
420 }
421 }
422
423 #[cfg(feature = "async")]
424 #[async_trait]
425 pub trait AsyncFetchable: Sized + Send + Sync {
426 fn affected_rows(&self) -> i32;
427
428 fn precision(&self) -> Precision;
429
430 fn fields(&self) -> &[Field];
431
432 fn filed_names(&self) -> Vec<&str> {
433 self.fields().iter().map(|f| f.name()).collect_vec()
434 }
435
436 fn num_of_fields(&self) -> usize {
437 self.fields().len()
438 }
439
440 fn summary(&self) -> (usize, usize);
441
442 #[doc(hidden)]
443 fn update_summary(&mut self, nrows: usize);
444
445 #[doc(hidden)]
446 fn fetch_raw_block(&mut self, cx: &mut Context<'_>) -> Poll<RawResult<Option<RawBlock>>>;
447
448 fn blocks(&mut self) -> AsyncBlocks<'_, Self> {
449 AsyncBlocks { query: self }
450 }
451
452 fn rows(&mut self) -> AsyncRows<'_, Self> {
453 AsyncRows {
454 blocks: self.blocks(),
455 block: None,
456 rows: None,
457 }
458 }
459
460 async fn to_records(&mut self) -> RawResult<Vec<Vec<Value>>> {
462 let future = self.rows().map_ok(RowView::into_values).try_collect();
463 future.await
464 }
465
466 fn deserialize<R>(&mut self) -> AsyncDeserialized<'_, Self, R>
467 where
468 R: serde::de::DeserializeOwned,
469 {
470 AsyncDeserialized {
471 rows: self.rows(),
472 _marker: PhantomData,
473 }
474 }
475 }
476
477 #[cfg(feature = "async")]
478 #[async_trait]
480 pub trait AsyncQueryable: Send + Sync + Sized {
481 type AsyncResultSet: AsyncFetchable;
483
484 async fn query<T: AsRef<str> + Send + Sync>(
485 &self,
486 sql: T,
487 ) -> RawResult<Self::AsyncResultSet>;
488
489 async fn put(&self, schemaless_data: &SmlData) -> RawResult<()>;
490
491 async fn query_with_req_id<T: AsRef<str> + Send + Sync>(
496 &self,
497 sql: T,
498 req_id: u64,
499 ) -> RawResult<Self::AsyncResultSet>;
500
501 async fn exec<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<usize> {
502 let sql = sql.as_ref();
503 self.query(sql).await.map(|res| res.affected_rows() as _)
505 }
506
507 async fn exec_with_req_id<T: AsRef<str> + Send + Sync>(
508 &self,
509 sql: T,
510 req_id: u64,
511 ) -> RawResult<usize> {
512 let sql = sql.as_ref();
513 self.query_with_req_id(sql, req_id)
515 .await
516 .map(|res| res.affected_rows() as _)
517 }
518
519 async fn write_raw_meta(&self, meta: &RawMeta) -> RawResult<()>;
520
521 async fn write_raw_block(&self, block: &RawBlock) -> RawResult<()>;
522
523 async fn write_raw_block_with_req_id(&self, block: &RawBlock, req_id: u64)
524 -> RawResult<()>;
525
526 async fn exec_many<T, I>(&self, input: I) -> RawResult<usize>
527 where
528 T: AsRef<str> + Send + Sync,
529 I::IntoIter: Send,
530 I: IntoIterator<Item = T> + Send,
531 {
532 let mut aff = 0;
533 for sql in input {
534 aff += self.exec(sql).await?;
535 }
536 Ok(aff)
537 }
538
539 async fn query_one<T: AsRef<str> + Send + Sync, O: DeserializeOwned + Send>(
555 &self,
556 sql: T,
557 ) -> RawResult<Option<O>> {
558 use futures::StreamExt;
559 self.query(sql)
561 .await?
562 .deserialize::<O>()
563 .take(1)
564 .collect::<Vec<_>>()
565 .await
566 .into_iter()
567 .next()
568 .map_or(Ok(None), |v| v.map(Some).map_err(Into::into))
569 }
570
571 async fn server_version(&self) -> RawResult<Cow<str>> {
573 Ok(self
574 .query_one::<_, String>("SELECT server_version()")
575 .await?
576 .expect("should always has result")
577 .into())
578 }
579
580 async fn create_database<N: AsRef<str> + Send>(&self, name: N) -> RawResult<()> {
582 let query = format!("CREATE DATABASE IF NOT EXISTS {}", name.as_ref());
583
584 self.query(query).await?;
585 Ok(())
586 }
587
588 async fn use_database<N: AsRef<str> + Send>(&self, name: N) -> RawResult<()> {
590 let query = format!("USE `{}`", name.as_ref());
591
592 self.query(query).await?;
593 Ok(())
594 }
595
596 async fn create_topic<N: AsRef<str> + Send + Sync, S: AsRef<str> + Send>(
598 &self,
599 name: N,
600 sql: S,
601 ) -> RawResult<()> {
602 let (name, sql) = (name.as_ref(), sql.as_ref());
603 let query = format!("CREATE TOPIC IF NOT EXISTS `{name}` AS {sql}");
604
605 self.query(query).await?;
606 Ok(())
607 }
608
609 async fn create_topic_as_database(
611 &self,
612 name: impl AsRef<str> + Send + Sync + 'async_trait,
613 db: impl std::fmt::Display + Send + 'async_trait,
614 ) -> RawResult<()> {
615 let name = name.as_ref();
616 let query = format!("create topic if not exists `{name}` with meta as database `{db}`");
617 self.exec(&query).await?;
618 Ok(())
619 }
620
621 async fn databases(&self) -> RawResult<Vec<ShowDatabase>> {
623 use futures::stream::TryStreamExt;
624 Ok(self
625 .query("SHOW DATABASES")
626 .await?
627 .deserialize()
628 .try_collect()
629 .await?)
630 }
631
632 async fn topics(&self) -> RawResult<Vec<Topic>> {
638 let sql = "SELECT * FROM information_schema.ins_topics";
639 log::trace!("query one with sql: {sql}");
640 Ok(self.query(sql).await?.deserialize().try_collect().await?)
641 }
642
643 async fn describe(&self, table: &str) -> RawResult<Describe> {
645 Ok(Describe(
646 self.query(format!("DESCRIBE `{table}`"))
647 .await?
648 .deserialize()
649 .try_collect()
650 .await?,
651 ))
652 }
653
654 async fn database_exists(&self, name: &str) -> RawResult<bool> {
656 Ok(self.exec(format!("show `{name}`.stables")).await.is_ok())
657 }
658
659 fn exec_sync<T: AsRef<str> + Send + Sync>(&self, sql: T) -> RawResult<usize> {
661 crate::block_in_place_or_global(self.exec(sql))
662 }
663
664 fn query_sync<T: AsRef<str> + Send + Sync>(
666 &self,
667 sql: T,
668 ) -> RawResult<Self::AsyncResultSet> {
669 crate::block_in_place_or_global(self.query(sql))
670 }
671
672 async fn table_vgroup_id(&self, _db: &str, _table: &str) -> Option<i32> {
673 None
674 }
675
676 async fn tables_vgroup_ids<T: AsRef<str> + Sync>(
677 &self,
678 _db: &str,
679 _tables: &[T],
680 ) -> Option<Vec<i32>> {
681 None
682 }
683 }
684
    // Placeholder test: it asserts a constant only, so it exercises nothing beyond
    // this module building under the test harness.
    #[test]
    fn test() {
        assert!(true);
    }
689}