rbdc_oracle/
connection.rs1use std::sync::Arc;
2
3use futures_core::future::BoxFuture;
4use oracle::Connection as OraConnect;
5use oracle::sql_type::OracleType;
6use rbdc::db::{Connection, ExecResult, Row};
7use rbdc::Error;
8use rbs::Value;
9
10use crate::{OracleColumn, OracleData, OracleRow};
11use crate::driver::OracleDriver;
12use crate::encode::Encode;
13use crate::options::OracleConnectOptions;
14
15#[derive(Clone)]
16pub struct OracleConnection {
17 pub conn: Arc<OraConnect>,
18 pub is_trans: bool
19}
20
21impl Connection for OracleConnection {
22 fn get_rows(
23 &mut self,
24 sql: &str,
25 params: Vec<Value>,
26 ) -> BoxFuture<Result<Vec<Box<dyn Row>>, rbdc::Error>> {
27 let sql: String = OracleDriver {}.pub_exchange(sql);
28 Box::pin(async move {
29 let builder = self.conn.statement(&sql);
30 let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
31
32 for (idx,x) in params.into_iter().enumerate() {
33 x.encode(idx,&mut stmt).map_err(|e| Error::from(e.to_string()))?
34 }
35
36 let rows = stmt.query(&[]).map_err(|e| Error::from(e.to_string()))?;
37 let col_infos = rows.column_info();
38 let col_count = col_infos.len();
39 let mut results = Vec::with_capacity(col_count);
40 let mut columns = Vec::with_capacity(col_count);
41 for info in col_infos.iter() {
42 columns.push(OracleColumn {
43 name: info.name().to_string().to_lowercase(),
44 column_type: info.oracle_type().clone(),
45 })
46 }
47 for row_result in rows {
48 let row = row_result.map_err(|e| Error::from(e.to_string()))?;
49 let mut datas = Vec::with_capacity(col_count);
50 for col in row.sql_values().iter() {
51 let t = col.oracle_type().map_err(|e| Error::from(e.to_string()))?;
52 let t = t.clone();
53 if let Ok(true) = col.is_null() {
54 datas.push(OracleData {
55 str: None,
56 bin: None,
57 column_type: t.clone(),
58 is_sql_null: true
59 })
60 } else {
61 if t == OracleType::BLOB {
62 match col.get::<Vec<u8>>() {
63 Ok(bin) => datas.push(OracleData {
64 str: None,
65 bin: Some(bin),
66 column_type: t.clone(),
67 is_sql_null: false
68 }),
69 Err(_) => datas.push(OracleData {
70 str: None,
71 bin: None,
72 column_type: t.clone(),
73 is_sql_null: false
74 }),
75 }
76 }else{
77 match col.get::<String>() {
78 Ok(str) => datas.push(OracleData {
79 str: Some(str),
80 bin: None,
81 column_type: t.clone(),
82 is_sql_null: false
83 }),
84 Err(_) => datas.push(OracleData {
85 str: None,
86 bin: None,
87 column_type: t.clone(),
88 is_sql_null: false
89 }),
90 }
91 }
92 }
93 }
94 let row = OracleRow {
95 columns: Arc::new(columns.clone()),
96 datas: datas,
97 };
98 results.push(Box::new(row) as Box<dyn Row>);
99 }
100 Ok(results)
101 })
102 }
103
104 fn exec(
105 &mut self,
106 sql: &str,
107 params: Vec<Value>,
108 ) -> BoxFuture<Result<ExecResult, rbdc::Error>> {
109 if sql == "begin" {
110 self.is_trans = true;
111 Box::pin(async move {
112 Ok(ExecResult {
113 rows_affected: 0,
114 last_insert_id: Value::Null,
115 })
116 })
117 } else if sql == "commit" {
118 self.is_trans = false;
119 Box::pin(async move {
120 self.conn.commit().unwrap();
121 Ok(ExecResult {
122 rows_affected: 0,
123 last_insert_id: Value::Null,
124 })
125 })
126 } else if sql == "rollback" {
127 self.is_trans = false;
128 Box::pin(async move {
129 self.conn.rollback().unwrap();
130 Ok(ExecResult {
131 rows_affected: 0,
132 last_insert_id: Value::Null,
133 })
134 })
135 } else {
136 let sql: String = OracleDriver {}.pub_exchange(sql);
137 Box::pin(async move {
138 let builder = self.conn.statement(&sql);
139 let mut stmt = builder.build().map_err(|e| Error::from(e.to_string()))?;
140 for (idx,x) in params.into_iter().enumerate() {
141 x.encode(idx,&mut stmt).map_err(|e| Error::from(e.to_string()))?
142 }
143 stmt
144 .execute(&[])
145 .map_err(|e| Error::from(e.to_string()))?;
146 if !(self.is_trans) {
147 self.conn.commit().map_err(|e| Error::from(e.to_string()))?;
148 }
149 let rows_affected = stmt.row_count().map_err(|e| Error::from(e.to_string()))?;
150 let mut ret = vec![];
151 for i in 1..=stmt.bind_count(){
152 let res:Result<String,_> = stmt.bind_value(i);
153 match res {
154 Ok(v)=>{
155 ret.push(Value::String(v))
156 },
157 Err(_)=>{
158 ret.push(Value::Null)
159 }
160 }
161 }
162 Ok(ExecResult {
163 rows_affected,
164 last_insert_id: Value::Array(ret),
165 })
166 })
167 }
168 }
169
170 fn close(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
171 Box::pin(async move {
172 self.conn.commit().map_err(|e| Error::from(e.to_string()))?;
173 self.conn.close().map_err(|e| Error::from(e.to_string()))?;
174 Ok(())
175 })
176 }
177
178 fn ping(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
179 Box::pin(async move {
180 self.conn.ping()
181 .map_err(|e| Error::from(e.to_string()))?;
182 Ok(())
183 })
184 }
185}
186
187impl OracleConnection {
188 pub async fn establish(opt: &OracleConnectOptions) -> Result<Self, Error> {
189 let conn = OraConnect::connect(opt.username.clone(), opt.password.clone(), opt.connect_string.clone())
190 .map_err(|e| Error::from(e.to_string()))?;
191 Ok(Self {
192 conn: Arc::new(conn),
193 is_trans: false
194 })
195 }
196}