rbdc_oracle/
connection.rs1use std::sync::{Arc, Mutex};
2
3use futures_core::future::BoxFuture;
4use oracle::Connection as OraConnect;
5use oracle::sql_type::OracleType;
6use rbdc::db::{Connection, ExecResult, Row};
7use rbdc::Error;
8use rbs::Value;
9
10use crate::{OracleColumn, OracleData, OracleRow};
11use crate::driver::OracleDriver;
12use crate::encode::Encode;
13use crate::options::OracleConnectOptions;
14
15#[derive(Clone)]
16pub struct OracleConnection {
17 pub conn: Arc<OraConnect>,
18 pub is_trans: Arc<Mutex<bool>>,
19}
20
21impl Connection for OracleConnection {
22 fn get_rows(
23 &mut self,
24 sql: &str,
25 params: Vec<Value>,
26 ) -> BoxFuture<Result<Vec<Box<dyn Row>>, Error>> {
27 let sql: String = OracleDriver {}.pub_exchange(sql);
28 let oc = self.clone();
29 let task = tokio::task::spawn_blocking(move || {
30 let builder = oc.conn.statement(&sql);
31 let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
32
33 for (idx, x) in params.into_iter().enumerate() {
34 x.encode(idx, &mut stmt).map_err(|e| Error::from(e.to_string()))?
35 }
36
37 let rows = stmt.query(&[]).map_err(|e| Error::from(e.to_string()))?;
38 let col_infos = rows.column_info();
39 let col_count = col_infos.len();
40 let mut results = Vec::with_capacity(col_count);
41 let mut columns = Vec::with_capacity(col_count);
42 for info in col_infos.iter() {
43 columns.push(OracleColumn {
44 name: info.name().to_string().to_lowercase(),
45 column_type: info.oracle_type().clone(),
46 })
47 }
48 for row_result in rows {
49 let row = row_result.map_err(|e| Error::from(e.to_string()))?;
50 let mut datas = Vec::with_capacity(col_count);
51 for col in row.sql_values().iter() {
52 let t = col.oracle_type().map_err(|e| Error::from(e.to_string()))?;
53 let t = t.clone();
54 if let Ok(true) = col.is_null() {
55 datas.push(OracleData {
56 str: None,
57 bin: None,
58 column_type: t.clone(),
59 is_sql_null: true,
60 })
61 } else {
62 if t == OracleType::BLOB {
63 match col.get::<Vec<u8>>() {
64 Ok(bin) => datas.push(OracleData {
65 str: None,
66 bin: Some(bin),
67 column_type: t.clone(),
68 is_sql_null: false,
69 }),
70 Err(_) => datas.push(OracleData {
71 str: None,
72 bin: None,
73 column_type: t.clone(),
74 is_sql_null: false,
75 }),
76 }
77 } else {
78 match col.get::<String>() {
79 Ok(str) => datas.push(OracleData {
80 str: Some(str),
81 bin: None,
82 column_type: t.clone(),
83 is_sql_null: false,
84 }),
85 Err(_) => datas.push(OracleData {
86 str: None,
87 bin: None,
88 column_type: t.clone(),
89 is_sql_null: false,
90 }),
91 }
92 }
93 }
94 }
95 let row = OracleRow {
96 columns: Arc::new(columns.clone()),
97 datas: datas,
98 };
99 results.push(Box::new(row) as Box<dyn Row>);
100 }
101 Ok(results)
102 });
103 Box::pin(async move {
104 task.await.map_err(|e| Error::from(e.to_string()))?
105 })
106 }
107
108 fn exec(
109 &mut self,
110 sql: &str,
111 params: Vec<Value>,
112 ) -> BoxFuture<Result<ExecResult, Error>> {
113 let oc = self.clone();
114 let sql = sql.to_string();
115 let task = tokio::task::spawn_blocking(move || {
116 let mut trans = oc.is_trans.lock()
117 .map_err(|e| Error::from(e.to_string()))?;
118 if sql == "begin" {
119 *trans = true;
120 Ok(ExecResult {
121 rows_affected: 0,
122 last_insert_id: Value::Null,
123 })
124 } else if sql == "commit" {
125 oc.conn.commit().unwrap();
126 *trans = false;
127 Ok(ExecResult {
128 rows_affected: 0,
129 last_insert_id: Value::Null,
130 })
131 } else if sql == "rollback" {
132 oc.conn.rollback().unwrap();
133 *trans = false;
134 Ok(ExecResult {
135 rows_affected: 0,
136 last_insert_id: Value::Null,
137 })
138 } else {
139 let sql: String = OracleDriver {}.pub_exchange(&sql);
140 let builder = oc.conn.statement(&sql);
141 let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
142 for (idx, x) in params.into_iter().enumerate() {
143 x.encode(idx, &mut stmt).map_err(|e| Error::from(e.to_string()))?
144 }
145 stmt
146 .execute(&[])
147 .map_err(|e| Error::from(e.to_string()))?;
148 if !*trans {
149 oc.conn.commit().map_err(|e| Error::from(e.to_string()))?;
150 *trans = false;
151 }
152 let rows_affected = stmt.row_count().map_err(|e| Error::from(e.to_string()))?;
153 let mut ret = vec![];
154 for i in 1..=stmt.bind_count() {
155 let res: Result<String, _> = stmt.bind_value(i);
156 match res {
157 Ok(v) => {
158 ret.push(Value::String(v))
159 }
160 Err(_) => {
161 ret.push(Value::Null)
162 }
163 }
164 }
165 Ok(ExecResult {
166 rows_affected,
167 last_insert_id: Value::Array(ret),
168 })
169 }
170 });
171 Box::pin(async {
172 task.await.map_err(|e| Error::from(e.to_string()))?
173 })
174 }
175
176 fn ping(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
177 let oc = self.clone();
178 let task = tokio::task::spawn_blocking(move || {
179 oc.conn.ping()
180 .map_err(|e| Error::from(e.to_string()))?;
181 Ok(())
182 });
183 Box::pin(async {
184 task.await.map_err(|e| Error::from(e.to_string()))?
185 })
186 }
187
188 fn close(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
189 let oc = self.clone();
190 let task = tokio::task::spawn_blocking(move || {
191 oc.conn.commit().map_err(|e| Error::from(e.to_string()))?;
192 oc.conn.close().map_err(|e| Error::from(e.to_string()))?;
193 Ok(())
194 });
195 Box::pin(async {
196 task.await.map_err(|e| Error::from(e.to_string()))?
197 })
198 }
199}
200
201impl OracleConnection {
202 pub async fn establish(opt: &OracleConnectOptions) -> Result<Self, Error> {
203 let conn = OraConnect::connect(opt.username.clone(), opt.password.clone(), opt.connect_string.clone())
204 .map_err(|e| Error::from(e.to_string()))?;
205 Ok(Self {
206 conn: Arc::new(conn),
207 is_trans: Arc::new(Mutex::new(false)),
208 })
209 }
210}