rbdc_oracle/
connection.rs

1use std::sync::Arc;
2
3use futures_core::future::BoxFuture;
4use oracle::Connection as OraConnect;
5use oracle::sql_type::OracleType;
6use rbdc::db::{Connection, ExecResult, Row};
7use rbdc::Error;
8use rbs::Value;
9
10use crate::{OracleColumn, OracleData, OracleRow};
11use crate::driver::OracleDriver;
12use crate::encode::Encode;
13use crate::options::OracleConnectOptions;
14
15#[derive(Clone)]
16pub struct OracleConnection {
17    pub conn: Arc<OraConnect>,
18    pub is_trans: bool
19}
20
21impl Connection for OracleConnection {
22    fn get_rows(
23        &mut self,
24        sql: &str,
25        params: Vec<Value>,
26    ) -> BoxFuture<Result<Vec<Box<dyn Row>>, rbdc::Error>> {
27        let sql: String = OracleDriver {}.pub_exchange(sql);
28        Box::pin(async move {
29            let builder = self.conn.statement(&sql);
30            let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
31
32            for (idx,x) in params.into_iter().enumerate() {
33                x.encode(idx,&mut stmt).map_err(|e| Error::from(e.to_string()))?
34            }
35
36            let rows = stmt.query(&[]).map_err(|e| Error::from(e.to_string()))?;
37            let col_infos = rows.column_info();
38            let col_count = col_infos.len();
39            let mut results = Vec::with_capacity(col_count);
40            let mut columns = Vec::with_capacity(col_count);
41            for info in col_infos.iter() {
42                columns.push(OracleColumn {
43                    name: info.name().to_string().to_lowercase(),
44                    column_type: info.oracle_type().clone(),
45                })
46            }
47            for row_result in rows {
48                let row = row_result.map_err(|e| Error::from(e.to_string()))?;
49                let mut datas = Vec::with_capacity(col_count);
50                for col in row.sql_values().iter() {
51                    let t = col.oracle_type().map_err(|e| Error::from(e.to_string()))?;
52                    let t = t.clone();
53                    if let Ok(true) = col.is_null() {
54                        datas.push(OracleData {
55                            str: None,
56                            bin: None,
57                            column_type: t.clone(),
58                            is_sql_null: true
59                        })
60                    } else {
61                        if t == OracleType::BLOB {
62                            match col.get::<Vec<u8>>() {
63                                Ok(bin) => datas.push(OracleData {
64                                    str: None,
65                                    bin: Some(bin),
66                                    column_type: t.clone(),
67                                    is_sql_null: false
68                                }),
69                                Err(_) => datas.push(OracleData {
70                                    str: None,
71                                    bin: None,
72                                    column_type: t.clone(),
73                                    is_sql_null: false
74                                }),
75                            }
76                        }else{
77                            match col.get::<String>() {
78                                Ok(str) => datas.push(OracleData {
79                                    str: Some(str),
80                                    bin: None,
81                                    column_type: t.clone(),
82                                    is_sql_null: false
83                                }),
84                                Err(_) => datas.push(OracleData {
85                                    str: None,
86                                    bin: None,
87                                    column_type: t.clone(),
88                                    is_sql_null: false
89                                }),
90                            }
91                        }
92                    }
93                }
94                let row = OracleRow {
95                    columns: Arc::new(columns.clone()),
96                    datas: datas,
97                };
98                results.push(Box::new(row) as Box<dyn Row>);
99            }
100            Ok(results)
101        })
102    }
103
104    fn exec(
105        &mut self,
106        sql: &str,
107        params: Vec<Value>,
108    ) -> BoxFuture<Result<ExecResult, rbdc::Error>> {
109        if sql == "begin" {
110            self.is_trans = true;
111            Box::pin(async move {
112                Ok(ExecResult {
113                    rows_affected: 0,
114                    last_insert_id: Value::Null,
115                })
116            })
117        } else if sql == "commit" {
118            self.is_trans = false;
119            Box::pin(async move {
120                self.conn.commit().unwrap();
121                Ok(ExecResult {
122                    rows_affected: 0,
123                    last_insert_id: Value::Null,
124                })
125            })
126        } else if sql == "rollback" {
127            self.is_trans = false;
128            Box::pin(async move {
129                self.conn.rollback().unwrap();
130                Ok(ExecResult {
131                    rows_affected: 0,
132                    last_insert_id: Value::Null,
133                })
134            })
135        } else {
136            let sql: String = OracleDriver {}.pub_exchange(sql);
137            Box::pin(async move {
138                let builder = self.conn.statement(&sql);
139                let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
140                for (idx,x) in params.into_iter().enumerate() {
141                    x.encode(idx,&mut stmt).map_err(|e| Error::from(e.to_string()))?
142                }
143                stmt
144                    .execute(&[])
145                    .map_err(|e| Error::from(e.to_string()))?;
146                if !(self.is_trans) {
147                    self.conn.commit().map_err(|e| Error::from(e.to_string()))?;
148                }
149                let rows_affected = stmt.row_count().map_err(|e| Error::from(e.to_string()))?;
150                let mut ret = vec![];
151                for i in 1..=stmt.bind_count(){
152                    let res:Result<String,_> = stmt.bind_value(i);
153                    match res {
154                        Ok(v)=>{
155                            ret.push(Value::String(v))
156                        },
157                        Err(_)=>{
158                            ret.push(Value::Null)
159                        }
160                    }
161                }
162                Ok(ExecResult {
163                    rows_affected,
164                    last_insert_id: Value::Array(ret),
165                })
166            })
167        }
168    }
169
170    fn close(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
171        Box::pin(async move {
172            self.conn.commit().map_err(|e| Error::from(e.to_string()))?;
173            self.conn.close().map_err(|e| Error::from(e.to_string()))?;
174            Ok(())
175        })
176    }
177
178    fn ping(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
179        Box::pin(async move {
180            self.conn.ping()
181                .map_err(|e| Error::from(e.to_string()))?;
182            Ok(())
183        })
184    }
185}
186
187impl OracleConnection {
188    pub async fn establish(opt: &OracleConnectOptions) -> Result<Self, Error> {
189        let conn = OraConnect::connect(opt.username.clone(), opt.password.clone(), opt.connect_string.clone())
190            .map_err(|e| Error::from(e.to_string()))?;
191        Ok(Self {
192            conn: Arc::new(conn),
193            is_trans: false
194        })
195    }
196}