rbdc_pg/driver.rs
1#![allow(mismatched_lifetime_syntaxes)]
2
3use crate::options::PgConnectOptions;
4use futures_core::future::BoxFuture;
5use rbdc::db::{ConnectOptions, Connection, Driver, Placeholder};
6use rbdc::{impl_exchange, Error};
7
/// Driver entry point for PostgreSQL connections.
///
/// The type has no fields; all behavior comes from its `Driver` and
/// `Placeholder` trait implementations.
#[derive(Debug)]
pub struct PgDriver {}
10
11impl Driver for PgDriver {
12 fn name(&self) -> &str {
13 "postgres"
14 }
15
16 fn connect(&self, url: &str) -> BoxFuture<'_, Result<Box<dyn Connection>, Error>> {
17 let url = url.to_owned();
18 Box::pin(async move {
19 let mut opt = self.default_option();
20 opt.set_uri(&url)?;
21 if let Some(opt) = opt.downcast_ref::<PgConnectOptions>() {
22 let conn = opt.connect().await?;
23 Ok(Box::new(conn) as Box<dyn Connection>)
24 } else {
25 Err(Error::from("downcast_ref failure"))
26 }
27 })
28 }
29 fn connect_opt<'a>(
30 &'a self,
31 opt: &'a dyn ConnectOptions,
32 ) -> BoxFuture<'a, Result<Box<dyn Connection>, Error>> {
33 let opt: &PgConnectOptions = opt.downcast_ref().unwrap();
34 Box::pin(async move {
35 let conn = opt.connect().await?;
36 Ok(conn)
37 })
38 }
39 fn default_option(&self) -> Box<dyn ConnectOptions> {
40 Box::new(PgConnectOptions::default())
41 }
42}
43
44impl Placeholder for PgDriver {
45 fn exchange(&self, sql: &str) -> String {
46 impl_exchange("$", 1, sql)
47 }
48}
49
#[cfg(test)]
mod test {
    use crate::driver::PgDriver;
    use rbdc::db::Placeholder;

    /// Empty test kept as-is; it performs no assertions.
    #[test]
    fn test_default() {}

    /// Twelve `?` placeholders must be rewritten to `$1` through `$12`.
    #[test]
    fn test_exchange() {
        let input = "insert into biz_activity (id,name,pc_link,h5_link,pc_banner_img,h5_banner_img,sort,status,remark,create_time,version,delete_flag) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)";
        let expected = "insert into biz_activity (id,name,pc_link,h5_link,pc_banner_img,h5_banner_img,sort,status,remark,create_time,version,delete_flag) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)";
        let driver = PgDriver {};
        let actual = driver.exchange(input);
        assert_eq!(expected, actual);
    }
}
64
65// #[cfg(test)]
66// mod test2 {
67// use crate::driver::PgDriver;
68// use rbdc::block_on;
69// use rbdc::datetime::DateTime;
70// use rbdc::db::Driver;
71// use rbdc::db::Placeholder;
72// use rbdc::decimal::Decimal;
73// use rbdc::pool::Pool;
74// use rbdc::timestamp::Timestamp;
75// use rbs::Value;
76//
77// #[test]
78// fn test_pg_pool() {
79// let task = async move {
80// let pool = Pool::new_url(
81// PgDriver {},
82// "postgres://postgres:123456@localhost:5432/postgres",
83// )
84// .unwrap();
85// std::thread::sleep(std::time::Duration::from_secs(2));
86// let mut conn = pool.get().await.unwrap();
87// let data = conn
88// .get_values("select * from biz_activity", vec![])
89// .await
90// .unwrap();
91// for mut x in data {
92// println!("row: {}", x);
93// }
94// };
95// block_on!(task);
96// }
97//
98// #[test]
99// fn test_pg_param() {
100// let task = async move {
101// let mut d = PgDriver {};
102// let mut c = d
103// .connect("postgres://postgres:123456@localhost:5432/postgres")
104// .await
105// .unwrap();
106// let param = vec![
107// Value::String("http://www.test.com".to_string()),
108// DateTime::now().into(),
109// Decimal("1".to_string()).into(),
110// Value::String("1".to_string()),
111// ];
112// println!("param => {}", Value::Array(param.clone()));
113// let data = c
114// .exec(
115// "update biz_activity set pc_link = $1,create_time = $2,delete_flag=$3 where id = $4",
116// param,
117// )
118// .await
119// .unwrap();
120// println!("{}", data);
121// };
122// block_on!(task);
123// }
124// }