1pub extern crate tiberius;
2
3pub mod decode;
4pub mod driver;
5pub mod encode;
6
7pub use crate::driver::MssqlDriver;
8pub use crate::driver::MssqlDriver as Driver;
9
10use crate::decode::Decode;
11use crate::encode::Encode;
12use futures_core::future::BoxFuture;
13use futures_core::Stream;
14use rbdc::db::{ConnectOptions, Connection, ExecResult, MetaData, Placeholder, Row};
15use rbdc::Error;
16use rbs::Value;
17use std::sync::Arc;
18use tiberius::{Client, Column, ColumnData, Config, Query};
19use tokio::net::TcpStream;
20use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
21
22pub struct MssqlConnection {
23 inner: Option<Client<Compat<TcpStream>>>,
24}
25
26impl MssqlConnection {
27 pub async fn establish(cfg: &Config) -> Result<Self, Error> {
29 let tcp = TcpStream::connect(cfg.get_addr())
31 .await
32 .map_err(|e| Error::from(e.to_string()))?;
33 tcp.set_nodelay(true)?;
34 let c = Client::connect(cfg.clone(), tcp.compat_write())
35 .await
36 .map_err(|e| Error::from(e.to_string()))?;
37 Ok(Self { inner: Some(c) })
38 }
39}
40
41#[derive(Debug)]
42pub struct MssqlConnectOptions(pub Config);
43
44impl ConnectOptions for MssqlConnectOptions {
45 fn connect(&self) -> BoxFuture<Result<Box<dyn Connection>, Error>> {
46 Box::pin(async move {
47 let v = MssqlConnection::establish(&self.0)
48 .await
49 .map_err(|e| Error::from(e.to_string()))?;
50 Ok(Box::new(v) as Box<dyn Connection>)
51 })
52 }
53
54 fn set_uri(&mut self, url: &str) -> Result<(), Error> {
55 if url.contains("jdbc"){
56 let mut config = Config::from_jdbc_string(url).map_err(|e| Error::from(e.to_string()))?;
57 config.trust_cert();
58 *self = MssqlConnectOptions(config);
59 }else{
60 let mut config = Config::from_ado_string(url).map_err(|e| Error::from(e.to_string()))?;
61 config.trust_cert();
62 *self = MssqlConnectOptions(config);
63 }
64 Ok(())
65 }
66}
67
68#[derive(Debug)]
69pub struct MssqlRow {
70 pub columns: Arc<Vec<Column>>,
71 pub datas: Vec<ColumnData<'static>>,
72}
73
74#[derive(Debug)]
75pub struct MssqlMetaData(pub Arc<Vec<Column>>);
76
77impl MetaData for MssqlMetaData {
78 fn column_len(&self) -> usize {
79 self.0.len()
80 }
81
82 fn column_name(&self, i: usize) -> String {
83 self.0[i].name().to_string()
84 }
85
86 fn column_type(&self, i: usize) -> String {
87 format!("{:?}", self.0[i].column_type())
88 }
89}
90
91impl Row for MssqlRow {
92 fn meta_data(&self) -> Box<dyn MetaData> {
93 Box::new(MssqlMetaData(self.columns.clone()))
94 }
95
96 fn get(&mut self, i: usize) -> Result<Value, Error> {
97 Value::decode(&self.datas[i])
98 }
99}
100
101impl Connection for MssqlConnection {
102 fn get_rows(
103 &mut self,
104 sql: &str,
105 params: Vec<Value>,
106 ) -> BoxFuture<Result<Vec<Box<dyn Row>>, Error>> {
107 let sql = MssqlDriver {}.exchange(sql);
108 Box::pin(async move {
109 let mut q = Query::new(sql);
110 for x in params {
111 x.encode(&mut q)?;
112 }
113 let v = q
114 .query(
115 self.inner
116 .as_mut()
117 .ok_or_else(|| Error::from("MssqlConnection is close"))?,
118 )
119 .await
120 .map_err(|e| Error::from(e.to_string()))?;
121 let mut results = Vec::with_capacity(v.size_hint().0);
122 let s = v
123 .into_results()
124 .await
125 .map_err(|e| Error::from(e.to_string()))?;
126 for item in s {
127 for r in item {
128 let mut columns = Vec::with_capacity(r.columns().len());
129 let mut row = MssqlRow {
130 columns: Arc::new(vec![]),
131 datas: Vec::with_capacity(r.columns().len()),
132 };
133 for x in r.columns() {
134 columns.push(x.clone());
135 }
136 row.columns = Arc::new(columns);
137 for x in r {
138 row.datas.push(x);
139 }
140 results.push(Box::new(row) as Box<dyn Row>);
141 }
142 }
143 Ok(results)
144 })
145 }
146
147 fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<Result<ExecResult, Error>> {
148 let sql = MssqlDriver {}.exchange(sql);
149 Box::pin(async move {
150 let mut q = Query::new(sql);
151 for x in params {
152 x.encode(&mut q)?;
153 }
154 let v = q
155 .execute(
156 self.inner
157 .as_mut()
158 .ok_or_else(|| Error::from("MssqlConnection is close"))?,
159 )
160 .await
161 .map_err(|e| Error::from(e.to_string()))?;
162 Ok(ExecResult {
163 rows_affected: {
164 let mut rows_affected = 0;
165 for x in v.rows_affected() {
166 rows_affected += x.clone();
167 }
168 rows_affected
169 },
170 last_insert_id: Value::Null,
171 })
172 })
173 }
174
175 fn close(&mut self) -> BoxFuture<Result<(), Error>> {
176 Box::pin(async move {
177 if let Some(v) = self.inner.take() {
179 v.close().await.map_err(|e| Error::from(e.to_string()))?;
180 }
181 Ok(())
182 })
183 }
184
185 fn ping(&mut self) -> BoxFuture<Result<(), rbdc::Error>> {
186 Box::pin(async move {
188 self.inner
189 .as_mut()
190 .ok_or_else(|| Error::from("MssqlConnection is close"))?
191 .query("select 1", &[])
192 .await
193 .map_err(|e| Error::from(e.to_string()))?;
194 Ok(())
195 })
196 }
197
198 fn begin(&mut self) -> BoxFuture<Result<(), Error>> {
199 Box::pin(async move {
200 self.inner
201 .as_mut()
202 .ok_or_else(|| Error::from("MssqlConnection is close"))?
203 .simple_query("begin tran")
204 .await
205 .map_err(|e| Error::from(e.to_string()))?;
206 Ok(())
207 })
208 }
209
210 fn commit(&mut self) -> BoxFuture<Result<(), Error>> {
211 Box::pin(async move {
212 self.inner
213 .as_mut()
214 .ok_or_else(|| Error::from("MssqlConnection is close"))?
215 .simple_query("commit")
216 .await
217 .map_err(|e| Error::from(e.to_string()))?;
218 Ok(())
219 })
220 }
221
222 fn rollback(&mut self) -> BoxFuture<Result<(), Error>> {
223 Box::pin(async move {
224 self.inner
225 .as_mut()
226 .ok_or_else(|| Error::from("MssqlConnection is close"))?
227 .simple_query("rollback")
228 .await
229 .map_err(|e| Error::from(e.to_string()))?;
230 Ok(())
231 })
232 }
233}
234
235#[cfg(test)]
236mod test {
237 #[test]
238 fn test_datetime() {}
239}