rbdc_oracle/
connection.rs

1use std::sync::{Arc, Mutex};
2
3use futures_core::future::BoxFuture;
4use oracle::Connection as OraConnect;
5use oracle::sql_type::OracleType;
6use rbdc::db::{Connection, ExecResult, Row};
7use rbdc::Error;
8use rbs::Value;
9
10use crate::{OracleColumn, OracleData, OracleRow};
11use crate::driver::OracleDriver;
12use crate::encode::Encode;
13use crate::options::OracleConnectOptions;
14
15#[derive(Clone)]
16pub struct OracleConnection {
17    pub conn: Arc<OraConnect>,
18    pub is_trans: Arc<Mutex<bool>>,
19}
20
21impl Connection for OracleConnection {
22    fn get_rows(
23        &mut self,
24        sql: &str,
25        params: Vec<Value>,
26    ) -> BoxFuture<Result<Vec<Box<dyn Row>>, Error>> {
27        let sql: String = OracleDriver {}.pub_exchange(sql);
28        let oc = self.clone();
29        let task = tokio::task::spawn_blocking(move || {
30            let builder = oc.conn.statement(&sql);
31            let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
32
33            for (idx, x) in params.into_iter().enumerate() {
34                x.encode(idx, &mut stmt).map_err(|e| Error::from(e.to_string()))?
35            }
36
37            let rows = stmt.query(&[]).map_err(|e| Error::from(e.to_string()))?;
38            let col_infos = rows.column_info();
39            let col_count = col_infos.len();
40            let mut results = Vec::with_capacity(col_count);
41            let mut columns = Vec::with_capacity(col_count);
42            for info in col_infos.iter() {
43                columns.push(OracleColumn {
44                    name: info.name().to_string().to_lowercase(),
45                    column_type: info.oracle_type().clone(),
46                })
47            }
48            for row_result in rows {
49                let row = row_result.map_err(|e| Error::from(e.to_string()))?;
50                let mut datas = Vec::with_capacity(col_count);
51                for col in row.sql_values().iter() {
52                    let t = col.oracle_type().map_err(|e| Error::from(e.to_string()))?;
53                    let t = t.clone();
54                    if let Ok(true) = col.is_null() {
55                        datas.push(OracleData {
56                            str: None,
57                            bin: None,
58                            column_type: t.clone(),
59                            is_sql_null: true,
60                        })
61                    } else {
62                        if t == OracleType::BLOB {
63                            match col.get::<Vec<u8>>() {
64                                Ok(bin) => datas.push(OracleData {
65                                    str: None,
66                                    bin: Some(bin),
67                                    column_type: t.clone(),
68                                    is_sql_null: false,
69                                }),
70                                Err(_) => datas.push(OracleData {
71                                    str: None,
72                                    bin: None,
73                                    column_type: t.clone(),
74                                    is_sql_null: false,
75                                }),
76                            }
77                        } else {
78                            match col.get::<String>() {
79                                Ok(str) => datas.push(OracleData {
80                                    str: Some(str),
81                                    bin: None,
82                                    column_type: t.clone(),
83                                    is_sql_null: false,
84                                }),
85                                Err(_) => datas.push(OracleData {
86                                    str: None,
87                                    bin: None,
88                                    column_type: t.clone(),
89                                    is_sql_null: false,
90                                }),
91                            }
92                        }
93                    }
94                }
95                let row = OracleRow {
96                    columns: Arc::new(columns.clone()),
97                    datas: datas,
98                };
99                results.push(Box::new(row) as Box<dyn Row>);
100            }
101            Ok(results)
102        });
103        Box::pin(async move {
104            task.await.map_err(|e| Error::from(e.to_string()))?
105        })
106    }
107
108    fn exec(
109        &mut self,
110        sql: &str,
111        params: Vec<Value>,
112    ) -> BoxFuture<Result<ExecResult, Error>> {
113        let oc = self.clone();
114        let sql = sql.to_string();
115        let task = tokio::task::spawn_blocking(move || {
116            let mut trans = oc.is_trans.lock()
117                .map_err(|e| Error::from(e.to_string()))?;
118            if sql == "begin" {
119                *trans = true;
120                Ok(ExecResult {
121                    rows_affected: 0,
122                    last_insert_id: Value::Null,
123                })
124            } else if sql == "commit" {
125                oc.conn.commit().unwrap();
126                *trans = false;
127                Ok(ExecResult {
128                    rows_affected: 0,
129                    last_insert_id: Value::Null,
130                })
131            } else if sql == "rollback" {
132                oc.conn.rollback().unwrap();
133                *trans = false;
134                Ok(ExecResult {
135                    rows_affected: 0,
136                    last_insert_id: Value::Null,
137                })
138            } else {
139                let sql: String = OracleDriver {}.pub_exchange(&sql);
140                let builder = oc.conn.statement(&sql);
141                let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
142                for (idx, x) in params.into_iter().enumerate() {
143                    x.encode(idx, &mut stmt).map_err(|e| Error::from(e.to_string()))?
144                }
145                stmt
146                    .execute(&[])
147                    .map_err(|e| Error::from(e.to_string()))?;
148                if !*trans {
149                    oc.conn.commit().map_err(|e| Error::from(e.to_string()))?;
150                    *trans = false;
151                }
152                let rows_affected = stmt.row_count().map_err(|e| Error::from(e.to_string()))?;
153                let mut ret = vec![];
154                for i in 1..=stmt.bind_count() {
155                    let res: Result<String, _> = stmt.bind_value(i);
156                    match res {
157                        Ok(v) => {
158                            ret.push(Value::String(v))
159                        }
160                        Err(_) => {
161                            ret.push(Value::Null)
162                        }
163                    }
164                }
165                Ok(ExecResult {
166                    rows_affected,
167                    last_insert_id: Value::Array(ret),
168                })
169            }
170        });
171        Box::pin(async {
172            task.await.map_err(|e| Error::from(e.to_string()))?
173        })
174    }
175
176    fn ping(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
177        let oc = self.clone();
178        let task = tokio::task::spawn_blocking(move || {
179            oc.conn.ping()
180                .map_err(|e| Error::from(e.to_string()))?;
181            Ok(())
182        });
183        Box::pin(async {
184            task.await.map_err(|e| Error::from(e.to_string()))?
185        })
186    }
187
188    fn close(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
189        let oc = self.clone();
190        let task = tokio::task::spawn_blocking(move || {
191            oc.conn.commit().map_err(|e| Error::from(e.to_string()))?;
192            oc.conn.close().map_err(|e| Error::from(e.to_string()))?;
193            Ok(())
194        });
195        Box::pin(async {
196            task.await.map_err(|e| Error::from(e.to_string()))?
197        })
198    }
199}
200
201impl OracleConnection {
202    pub async fn establish(opt: &OracleConnectOptions) -> Result<Self, Error> {
203        let conn = OraConnect::connect(opt.username.clone(), opt.password.clone(), opt.connect_string.clone())
204            .map_err(|e| Error::from(e.to_string()))?;
205        Ok(Self {
206            conn: Arc::new(conn),
207            is_trans: Arc::new(Mutex::new(false)),
208        })
209    }
210}