rbdc_mssql/
lib.rs

1pub extern crate tiberius;
2
3pub mod decode;
4pub mod driver;
5pub mod encode;
6
7pub use crate::driver::MssqlDriver;
8pub use crate::driver::MssqlDriver as Driver;
9
10use crate::decode::Decode;
11use crate::encode::Encode;
12use futures_core::future::BoxFuture;
13use futures_core::Stream;
14use rbdc::db::{ConnectOptions, Connection, ExecResult, MetaData, Placeholder, Row};
15use rbdc::Error;
16use rbs::Value;
17use std::sync::Arc;
18use tiberius::{Client, Column, ColumnData, Config, Query};
19use tokio::net::TcpStream;
20use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
21
22pub struct MssqlConnection {
23    inner: Option<Client<Compat<TcpStream>>>,
24}
25
26impl MssqlConnection {
27    /// let cfg = Config::from_jdbc_string(url).map_err(|e| Error::from(e.to_owned()))?;
28    pub async fn establish(cfg: &Config) -> Result<Self, Error> {
29        // let cfg = Config::from_jdbc_string(url).map_err(|e| Error::from(e.to_owned()))?;
30        let tcp = TcpStream::connect(cfg.get_addr())
31            .await
32            .map_err(|e| Error::from(e.to_string()))?;
33        tcp.set_nodelay(true)?;
34        let c = Client::connect(cfg.clone(), tcp.compat_write())
35            .await
36            .map_err(|e| Error::from(e.to_string()))?;
37        Ok(Self { inner: Some(c) })
38    }
39}
40
41#[derive(Debug)]
42pub struct MssqlConnectOptions(pub Config);
43
44impl ConnectOptions for MssqlConnectOptions {
45    fn connect(&self) -> BoxFuture<Result<Box<dyn Connection>, Error>> {
46        Box::pin(async move {
47            let v = MssqlConnection::establish(&self.0)
48                .await
49                .map_err(|e| Error::from(e.to_string()))?;
50            Ok(Box::new(v) as Box<dyn Connection>)
51        })
52    }
53
54    fn set_uri(&mut self, url: &str) -> Result<(), Error> {
55        if url.contains("jdbc"){
56            let mut config = Config::from_jdbc_string(url).map_err(|e| Error::from(e.to_string()))?;
57            config.trust_cert();
58            *self = MssqlConnectOptions(config);   
59        }else{
60            let mut config = Config::from_ado_string(url).map_err(|e| Error::from(e.to_string()))?;
61            config.trust_cert();
62            *self = MssqlConnectOptions(config);
63        }
64        Ok(())
65    }
66}
67
68#[derive(Debug)]
69pub struct MssqlRow {
70    pub columns: Arc<Vec<Column>>,
71    pub datas: Vec<ColumnData<'static>>,
72}
73
74#[derive(Debug)]
75pub struct MssqlMetaData(pub Arc<Vec<Column>>);
76
77impl MetaData for MssqlMetaData {
78    fn column_len(&self) -> usize {
79        self.0.len()
80    }
81
82    fn column_name(&self, i: usize) -> String {
83        self.0[i].name().to_string()
84    }
85
86    fn column_type(&self, i: usize) -> String {
87        format!("{:?}", self.0[i].column_type())
88    }
89}
90
91impl Row for MssqlRow {
92    fn meta_data(&self) -> Box<dyn MetaData> {
93        Box::new(MssqlMetaData(self.columns.clone()))
94    }
95
96    fn get(&mut self, i: usize) -> Result<Value, Error> {
97        Value::decode(&self.datas[i])
98    }
99}
100
101impl Connection for MssqlConnection {
102    fn get_rows(
103        &mut self,
104        sql: &str,
105        params: Vec<Value>,
106    ) -> BoxFuture<Result<Vec<Box<dyn Row>>, Error>> {
107        let sql = MssqlDriver {}.exchange(sql);
108        Box::pin(async move {
109            let mut q = Query::new(sql);
110            for x in params {
111                x.encode(&mut q)?;
112            }
113            let v = q
114                .query(
115                    self.inner
116                        .as_mut()
117                        .ok_or_else(|| Error::from("MssqlConnection is close"))?,
118                )
119                .await
120                .map_err(|e| Error::from(e.to_string()))?;
121            let mut results = Vec::with_capacity(v.size_hint().0);
122            let s = v
123                .into_results()
124                .await
125                .map_err(|e| Error::from(e.to_string()))?;
126            for item in s {
127                for r in item {
128                    let mut columns = Vec::with_capacity(r.columns().len());
129                    let mut row = MssqlRow {
130                        columns: Arc::new(vec![]),
131                        datas: Vec::with_capacity(r.columns().len()),
132                    };
133                    for x in r.columns() {
134                        columns.push(x.clone());
135                    }
136                    row.columns = Arc::new(columns);
137                    for x in r {
138                        row.datas.push(x);
139                    }
140                    results.push(Box::new(row) as Box<dyn Row>);
141                }
142            }
143            Ok(results)
144        })
145    }
146
147    fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<Result<ExecResult, Error>> {
148        let sql = MssqlDriver {}.exchange(sql);
149        Box::pin(async move {
150            let mut q = Query::new(sql);
151            for x in params {
152                x.encode(&mut q)?;
153            }
154            let v = q
155                .execute(
156                    self.inner
157                        .as_mut()
158                        .ok_or_else(|| Error::from("MssqlConnection is close"))?,
159                )
160                .await
161                .map_err(|e| Error::from(e.to_string()))?;
162            Ok(ExecResult {
163                rows_affected: {
164                    let mut rows_affected = 0;
165                    for x in v.rows_affected() {
166                        rows_affected += x.clone();
167                    }
168                    rows_affected
169                },
170                last_insert_id: Value::Null,
171            })
172        })
173    }
174
175    fn close(&mut self) -> BoxFuture<Result<(), Error>> {
176        Box::pin(async move {
177            //inner must be Option,so we can take owner and call close(self) method.
178            if let Some(v) = self.inner.take() {
179                v.close().await.map_err(|e| Error::from(e.to_string()))?;
180            }
181            Ok(())
182        })
183    }
184
185    fn ping(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
186        //TODO While 'select 1' can temporarily solve the problem of checking that the connection is valid, it looks ugly.Better replace it with something better way
187        Box::pin(async move {
188            self.inner
189                .as_mut()
190                .ok_or_else(|| Error::from("MssqlConnection is close"))?
191                .query("select 1", &[])
192                .await
193                .map_err(|e| Error::from(e.to_string()))?;
194            Ok(())
195        })
196    }
197
198    fn begin(&mut self) -> BoxFuture<Result<(), Error>> {
199        Box::pin(async move {
200            self.inner
201                .as_mut()
202                .ok_or_else(|| Error::from("MssqlConnection is close"))?
203                .simple_query("begin tran")
204                .await
205                .map_err(|e| Error::from(e.to_string()))?;
206            Ok(())
207        })
208    }
209
210    fn commit(&mut self) -> BoxFuture<Result<(), Error>> {
211        Box::pin(async move {
212            self.inner
213                .as_mut()
214                .ok_or_else(|| Error::from("MssqlConnection is close"))?
215                .simple_query("commit")
216                .await
217                .map_err(|e| Error::from(e.to_string()))?;
218            Ok(())
219        })
220    }
221
222    fn rollback(&mut self) -> BoxFuture<Result<(), Error>> {
223        Box::pin(async move {
224            self.inner
225                .as_mut()
226                .ok_or_else(|| Error::from("MssqlConnection is close"))?
227                .simple_query("rollback")
228                .await
229                .map_err(|e| Error::from(e.to_string()))?;
230            Ok(())
231        })
232    }
233}
234
235#[cfg(test)]
236mod test {
237    #[test]
238    fn test_datetime() {}
239}