1use crate::{
2 err_or,
3 into_c_str::IntoCStr,
4 raw::{ApiEntry, StmtApi},
5 RawRes, RawTaos, ResultSet,
6};
7
8use std::{
9 ffi::{c_void, CStr, CString},
10 sync::Arc,
11};
12
13use itertools::Itertools;
14use taos_query::{
16 prelude::{
17 sync::{Bindable, Queryable, RawError as Error},
18 Code, RawResult,
19 },
20 stmt::AsyncBindable,
21};
22
23use crate::types::*;
24
25mod bind;
26mod multi;
27
28#[derive(Debug)]
29pub struct Stmt {
30 raw: RawStmt,
31}
32
33impl Bindable<super::Taos> for Stmt {
34 fn init(taos: &super::Taos) -> RawResult<Self> {
35 Ok(Self {
36 raw: RawStmt::from_raw_taos(&taos.raw),
37 })
38 }
39
40 fn init_with_req_id(taos: &super::Taos, req_id: u64) -> RawResult<Self> {
41 Ok(Self {
42 raw: RawStmt::from_raw_taos_with_req_id(&taos.raw, req_id),
43 })
44 }
45
46 fn prepare<S: AsRef<str>>(&mut self, sql: S) -> RawResult<&mut Self> {
47 self.raw.prepare(sql.as_ref())?;
48 Ok(self)
49 }
50
51 fn set_tbname<S: AsRef<str>>(&mut self, sql: S) -> RawResult<&mut Self> {
52 self.raw.set_tbname(sql.as_ref())?;
53 Ok(self)
54 }
55
56 fn set_tags(&mut self, tags: &[taos_query::common::Value]) -> RawResult<&mut Self> {
57 if self.raw.is_v3() {
58 let tags = tags.iter().map(TaosBindV3::from_value).collect_vec();
59 self.raw.set_tags(tags.as_ptr() as _)?;
60 } else {
61 let tags = tags.iter().map(TaosBindV2::from_value).collect_vec();
62 self.raw.set_tags(tags.as_ptr() as _)?;
63 }
64 Ok(self)
65 }
66
67 fn bind(&mut self, params: &[taos_query::common::ColumnView]) -> RawResult<&mut Self> {
68 let params: Vec<DropMultiBind> = params.iter().map(|c| c.into()).collect_vec();
69 self.raw
70 .bind_param_batch(unsafe { std::mem::transmute(params.as_slice()) })?;
71 Ok(self)
72 }
73
74 fn add_batch(&mut self) -> RawResult<&mut Self> {
75 self.raw.add_batch()?;
76 Ok(self)
77 }
78
79 fn execute(&mut self) -> RawResult<usize> {
80 self.raw.execute().map_err(Into::into)
81 }
82
83 fn result_set(&mut self) -> RawResult<<super::Taos as Queryable>::ResultSet> {
84 self.raw.use_result().map_err(Into::into)
85 }
86
87 fn affected_rows(&self) -> usize {
88 self.raw.affected_rows() as _
89 }
90}
91
92#[async_trait::async_trait]
93impl AsyncBindable<super::Taos> for Stmt {
94 async fn init(taos: &super::Taos) -> RawResult<Self> {
95 Ok(Self {
96 raw: RawStmt::from_raw_taos(&taos.raw),
97 })
98 }
99
100 async fn init_with_req_id(taos: &super::Taos, req_id: u64) -> RawResult<Self> {
101 Ok(Self {
102 raw: RawStmt::from_raw_taos_with_req_id(&taos.raw, req_id),
103 })
104 }
105
106 async fn prepare(&mut self, sql: &str) -> RawResult<&mut Self> {
107 self.raw.prepare(sql)?;
108 Ok(self)
109 }
110
111 async fn set_tbname(&mut self, sql: &str) -> RawResult<&mut Self> {
112 self.raw.set_tbname(sql)?;
113 Ok(self)
114 }
115
116 async fn set_tags(&mut self, tags: &[taos_query::common::Value]) -> RawResult<&mut Self> {
117 if self.raw.is_v3() {
118 let tags = tags.iter().map(TaosBindV3::from_value).collect_vec();
119 self.raw.set_tags(tags.as_ptr() as _)?;
120 } else {
121 let tags = tags.iter().map(TaosBindV2::from_value).collect_vec();
122 self.raw.set_tags(tags.as_ptr() as _)?;
123 }
124 Ok(self)
125 }
126
127 async fn bind(&mut self, params: &[taos_query::common::ColumnView]) -> RawResult<&mut Self> {
128 let params: Vec<DropMultiBind> = params.iter().map(|c| c.into()).collect_vec();
129 self.raw
130 .bind_param_batch(unsafe { std::mem::transmute(params.as_slice()) })?;
131 Ok(self)
132 }
133
134 async fn add_batch(&mut self) -> RawResult<&mut Self> {
135 self.raw.add_batch()?;
136 Ok(self)
137 }
138
139 async fn execute(&mut self) -> RawResult<usize> {
140 self.raw.execute().map_err(Into::into)
141 }
142
143 async fn result_set(&mut self) -> RawResult<<super::Taos as Queryable>::ResultSet> {
144 self.raw.use_result().map_err(Into::into)
145 }
146
147 async fn affected_rows(&self) -> usize {
148 self.raw.affected_rows() as _
149 }
150}
151
152#[derive(Debug)]
153pub(crate) struct RawStmt {
154 c: Arc<ApiEntry>,
155 api: StmtApi,
156 ptr: *mut TAOS_STMT,
157 tbname: Option<CString>,
158}
159
160unsafe impl Sync for RawStmt {}
161unsafe impl Send for RawStmt {}
162impl Drop for RawStmt {
163 fn drop(&mut self) {
164 let _ = self.close();
165 }
166}
167
168impl RawStmt {
169 fn is_v3(&self) -> bool {
170 self.c.version().starts_with('3')
171 }
172 #[inline(always)]
173 fn ok(&self, code: impl Into<Code>) -> RawResult<()> {
174 let code = code.into();
175
176 if code.success() {
177 Ok(())
178 } else {
179 Err(Error::from_string(self.err_as_str()))
180 }
181 }
182
183 #[inline]
184 pub unsafe fn as_ptr(&self) -> *mut TAOS_STMT {
185 self.ptr
186 }
187
188 #[inline]
194 pub fn err_as_str(&self) -> String {
195 unsafe {
196 match self.api.taos_stmt_errstr {
197 Some(f) => CStr::from_ptr(f(self.as_ptr()))
198 .to_string_lossy()
199 .to_string(),
200 None => todo!(),
201 }
202 }
203 }
204
205 #[inline]
206 pub fn from_raw_taos(taos: &RawTaos) -> RawStmt {
207 RawStmt {
208 c: taos.c.clone(),
209 api: taos.c.stmt,
210 ptr: unsafe { (taos.c.stmt.taos_stmt_init)(taos.as_ptr()) },
211 tbname: None,
212 }
213 }
214
215 #[inline]
216 pub fn from_raw_taos_with_req_id(taos: &RawTaos, req_id: u64) -> RawStmt {
217 RawStmt {
218 c: taos.c.clone(),
219 api: taos.c.stmt,
220 ptr: unsafe { (taos.c.stmt.taos_stmt_init_with_reqid.unwrap())(taos.as_ptr(), req_id) },
221 tbname: None,
222 }
223 }
224 #[inline]
225 pub fn close(&mut self) -> RawResult<()> {
226 err_or!(self, (self.api.taos_stmt_close)(self.as_ptr()))
227 }
228
229 #[inline]
230 pub fn prepare<'c>(&mut self, sql: impl IntoCStr<'c>) -> RawResult<()> {
231 let sql = sql.into_c_str();
232 tracing::trace!("prepare stmt with sql: {sql:?}");
233 self.ok(unsafe {
234 (self.api.taos_stmt_prepare)(self.as_ptr(), sql.as_ptr(), sql.to_bytes().len() as _)
235 })
236 }
237
238 #[inline]
253 pub fn set_tbname<'c>(&mut self, name: impl IntoCStr<'c>) -> RawResult<()> {
254 let name = name.into_c_str();
255 let res = self.ok(unsafe {
256 match self.api.taos_stmt_set_tbname {
257 Some(f) => f(self.as_ptr(), name.into_c_str().as_ptr()),
258 None => todo!(),
259 }
260 });
261 if !self.is_v3() {
262 self.tbname = Some(name.into_owned());
263 }
264 res
265 }
266
267 #[inline]
275 pub fn set_tags(&mut self, tags: *const c_void) -> RawResult<()> {
276 if self.is_v3() {
277 self.ok(unsafe { (self.api.taos_stmt_set_tags.unwrap())(self.as_ptr(), tags as _) })
278 } else {
279 self.ok(unsafe {
280 match self.api.taos_stmt_set_tbname_tags {
281 Some(f) => f(
282 self.as_ptr(),
283 self.tbname.as_deref().unwrap().as_ptr(),
284 tags as _,
285 ),
286 None => todo!(),
287 }
288 })
289 }
290 }
291
292 #[inline]
293 pub fn use_result(&mut self) -> RawResult<ResultSet> {
294 unsafe {
295 RawRes::from_ptr(
296 self.c.clone(),
297 (self.api.taos_stmt_use_result)(self.as_ptr()),
298 )
299 .map(ResultSet::new)
300 }
301 }
302
303 #[inline]
304 pub fn affected_rows(&self) -> i32 {
305 unsafe {
306 match self.api.taos_stmt_affected_rows {
307 Some(f) => f(self.as_ptr()),
308 None => todo!(),
309 }
310 }
311 }
312
313 #[inline]
314 pub fn execute(&self) -> RawResult<usize> {
315 let cur = self.affected_rows();
316 err_or!(self, (self.api.taos_stmt_execute)(self.as_ptr()))?;
317 let new = self.affected_rows();
318 Ok((new - cur) as usize)
319 }
320
321 #[inline]
322 pub fn add_batch(&self) -> RawResult<()> {
323 err_or!(self, (self.api.taos_stmt_add_batch)(self.as_ptr()))
324 }
325
326 #[inline]
361 pub fn bind_param_batch(&mut self, bind: &[TaosMultiBind]) -> RawResult<()> {
362 match self.api.taos_stmt_bind_param_batch {
363 Some(f) => err_or!(self, f(self.as_ptr(), bind.as_ptr())),
364 None => todo!(),
365 }
366 }
367
368 }
375
/// Blocking integration tests; each one connects through the `taos:///` DSN and
/// creates and drops its own database.
#[cfg(test)]
mod tests {

    use crate::{Stmt, TaosBuilder};
    use bytes::Bytes;
    use taos_query::util::hex::*;

    /// Inserts two rows into a child table created from table name plus scalar tags.
    #[test]
    fn test_tbname_tags() -> anyhow::Result<()> {
        use taos_query::prelude::sync::*;
        let builder = TaosBuilder::from_dsn("taos:///")?;
        let taos = builder.build()?;
        taos.query("drop database if exists stt1")?;
        taos.query("create database if not exists stt1 keep 36500")?;
        taos.query("use stt1")?;
        taos.query(
            "create stable if not exists st1(ts timestamp, v int) tags(jt int, t1 varchar(32))",
        )?;

        let mut stmt = Stmt::init(&taos)?;
        let sql = "insert into ? using st1 tags(?, ?) values(?, ?)";
        stmt.prepare(sql)?;

        let tbname = "tb1";
        stmt.set_tbname(tbname)?;

        let tags = vec![Value::Int(0), Value::VarChar(String::from("taos"))];
        stmt.set_tags(&tags)?;
        let params = vec![
            ColumnView::from_millis_timestamp(vec![0]),
            ColumnView::from_ints(vec![0]),
        ];
        stmt.bind(&params)?;
        println!("bind");

        let params = vec![
            ColumnView::from_millis_timestamp(vec![1]),
            ColumnView::from_ints(vec![0]),
        ];
        stmt.bind(&params)?;
        println!("add batch");

        stmt.add_batch()?;
        println!("execute");
        stmt.execute()?;

        assert_eq!(stmt.affected_rows(), 2);
        println!("done");

        taos.query("drop database stt1")?;
        Ok(())
    }

    /// Inserts through a JSON-tagged super table, then re-prepares the same
    /// statement for a plain insert and checks that both rows are visible.
    #[test]
    fn test_tbname_tags_json() -> anyhow::Result<()> {
        use taos_query::prelude::sync::*;
        let builder = TaosBuilder::from_dsn("taos:///")?;
        let taos = builder.build()?;
        taos.query("drop database if exists stt2")?;
        taos.query("create database if not exists stt2 keep 36500")?;
        taos.query("use stt2")?;
        taos.query("create stable if not exists st1(ts timestamp, v int) tags(jt json)")?;

        let mut stmt = Stmt::init(&taos)?;
        let sql = "insert into ? using st1 tags(?) values(?, ?)";
        stmt.prepare(sql)?;

        let tags = vec![Value::Json(serde_json::from_str(r#"{"name":"value"}"#)?)];
        println!("tags: {tags:#?}");
        let tbname = "tb1";
        stmt.set_tbname(tbname)?;

        stmt.set_tags(&tags)?;
        println!("bind");

        let params = vec![
            ColumnView::from_millis_timestamp(vec![0]),
            ColumnView::from_ints(vec![0]),
        ];
        stmt.bind(&params)?;
        println!("add batch");

        stmt.add_batch()?;
        stmt.execute()?;

        assert_eq!(stmt.affected_rows(), 1);

        stmt.prepare("insert into tb1 values(?,?)")?;
        let params = vec![
            ColumnView::from_millis_timestamp(vec![1]),
            ColumnView::from_ints(vec![0]),
        ];
        stmt.bind(&params)?;
        println!("add batch");

        stmt.add_batch()?;
        stmt.execute()?;

        let mut res = taos.query("select * from st1")?;

        if let Some(raw) = res.fetch_raw_block()? {
            assert_eq!(raw.nrows(), 2);
        } else {
            panic!("no data retrieved");
        }

        taos.query("drop database stt2")?;
        Ok(())
    }

    /// Binds one row covering sixteen column types and reads it back.
    #[test]
    fn test_bindable() -> anyhow::Result<()> {
        use taos_query::prelude::sync::*;
        let taos = TaosBuilder::from_dsn("taos:///")?.build()?;
        taos.exec_many([
            "drop database if exists test_bindable",
            "create database test_bindable keep 36500",
            "use test_bindable",
            "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
            c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
            c10 float, c11 double, c12 varchar(100), c13 nchar(100), c14 varbinary(50), c15 geometry(50))",
        ])?;
        let mut stmt = Stmt::init(&taos)?;
        stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?;
        let params = vec![
            ColumnView::from_millis_timestamp(vec![0]),
            ColumnView::from_bools(vec![true]),
            ColumnView::from_tiny_ints(vec![0]),
            ColumnView::from_small_ints(vec![0]),
            ColumnView::from_ints(vec![0]),
            ColumnView::from_big_ints(vec![0]),
            ColumnView::from_unsigned_tiny_ints(vec![0]),
            ColumnView::from_unsigned_small_ints(vec![0]),
            ColumnView::from_unsigned_ints(vec![0]),
            ColumnView::from_unsigned_big_ints(vec![0]),
            ColumnView::from_floats(vec![0.0]),
            ColumnView::from_doubles(vec![0.]),
            ColumnView::from_varchar(vec!["ABC"]),
            ColumnView::from_nchar(vec!["涛思数据"]),
            ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
            ColumnView::from_geobytes(vec![hex_string_to_bytes(
                "0101000000000000000000F03F0000000000000040",
            )
            .to_vec()]),
        ];
        let rows = stmt.bind(&params)?.add_batch()?.execute()?;
        assert_eq!(rows, 1);

        let rows: Vec<(
            String,
            bool,
            i8,
            i16,
            i32,
            i64,
            u8,
            u16,
            u32,
            u64,
            f32,
            f64,
            String,
            String,
            Bytes,
            Bytes,
        )> = taos
            .query("select * from tb1")?
            .deserialize()
            .try_collect()?;
        let row = &rows[0];
        assert_eq!(row.12, "ABC");
        assert_eq!(row.13, "涛思数据");
        assert_eq!(row.14, hex_string_to_bytes("123456"));
        assert_eq!(
            row.15,
            hex_string_to_bytes("0101000000000000000000F03F0000000000000040")
        );
        taos.query("drop database test_bindable")?;

        Ok(())
    }

    /// Same round trip as `test_bindable`, on a statement created with a request id.
    #[test]
    fn test_stmt_with_req_id() -> anyhow::Result<()> {
        use taos_query::prelude::sync::*;
        let taos = TaosBuilder::from_dsn("taos:///")?.build()?;
        taos.exec_many([
            "drop database if exists test_db_stmt",
            "create database test_db_stmt keep 36500",
            "use test_db_stmt",
            "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
            c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
            c10 float, c11 double, c12 varchar(100), c13 nchar(100))",
        ])?;
        let req_id = 1000;
        let mut stmt = Stmt::init_with_req_id(&taos, req_id)?;
        stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?;
        let params = vec![
            ColumnView::from_millis_timestamp(vec![0]),
            ColumnView::from_bools(vec![true]),
            ColumnView::from_tiny_ints(vec![0]),
            ColumnView::from_small_ints(vec![0]),
            ColumnView::from_ints(vec![0]),
            ColumnView::from_big_ints(vec![0]),
            ColumnView::from_unsigned_tiny_ints(vec![0]),
            ColumnView::from_unsigned_small_ints(vec![0]),
            ColumnView::from_unsigned_ints(vec![0]),
            ColumnView::from_unsigned_big_ints(vec![0]),
            ColumnView::from_floats(vec![0.0]),
            ColumnView::from_doubles(vec![0.]),
            ColumnView::from_varchar(vec!["ABC"]),
            ColumnView::from_nchar(vec!["涛思数据"]),
        ];
        let rows = stmt.bind(&params)?.add_batch()?.execute()?;
        assert_eq!(rows, 1);

        let rows: Vec<(
            String,
            bool,
            i8,
            i16,
            i32,
            i64,
            u8,
            u16,
            u32,
            u64,
            f32,
            f64,
            String,
            String,
        )> = taos
            .query("select * from tb1")?
            .deserialize()
            .try_collect()?;
        let row = &rows[0];
        assert_eq!(row.12, "ABC");
        assert_eq!(row.13, "涛思数据");
        taos.query("drop database test_db_stmt")?;

        Ok(())
    }
}
628
/// Async integration test; it connects through the `taos:///` DSN and creates
/// and drops its own database.
#[cfg(test)]
mod async_tests {
    use crate::prelude::*;

    /// Binds one row through the async API on a statement created with a request
    /// id, then reads it back.
    #[tokio::test]
    async fn test_stmt_with_req_id() -> anyhow::Result<()> {
        let taos = TaosBuilder::from_dsn("taos:///")?.build().await?;
        taos.exec_many([
            "drop database if exists test_db_stmt_async",
            "create database test_db_stmt_async keep 36500",
            "use test_db_stmt_async",
            "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
            c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
            c10 float, c11 double, c12 varchar(100), c13 nchar(100))",
        ])
        .await?;
        let req_id = 1000;
        let mut stmt = Stmt::init_with_req_id(&taos, req_id).await?;
        stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
            .await?;
        let params = vec![
            ColumnView::from_millis_timestamp(vec![0]),
            ColumnView::from_bools(vec![true]),
            ColumnView::from_tiny_ints(vec![0]),
            ColumnView::from_small_ints(vec![0]),
            ColumnView::from_ints(vec![0]),
            ColumnView::from_big_ints(vec![0]),
            ColumnView::from_unsigned_tiny_ints(vec![0]),
            ColumnView::from_unsigned_small_ints(vec![0]),
            ColumnView::from_unsigned_ints(vec![0]),
            ColumnView::from_unsigned_big_ints(vec![0]),
            ColumnView::from_floats(vec![0.0]),
            ColumnView::from_doubles(vec![0.]),
            ColumnView::from_varchar(vec!["ABC"]),
            ColumnView::from_nchar(vec!["涛思数据"]),
        ];
        let rows = stmt
            .bind(&params)
            .await?
            .add_batch()
            .await?
            .execute()
            .await?;
        assert_eq!(rows, 1);

        let rows: Vec<(
            String,
            bool,
            i8,
            i16,
            i32,
            i64,
            u8,
            u16,
            u32,
            u64,
            f32,
            f64,
            String,
            String,
        )> = taos
            .query("select * from tb1")
            .await?
            .deserialize()
            .try_collect()
            .await?;
        let row = &rows[0];
        assert_eq!(row.12, "ABC");
        assert_eq!(row.13, "涛思数据");
        taos.query("drop database test_db_stmt_async").await?;

        Ok(())
    }
}