tiberius_mssql_broker/
lib.rs1use std::collections::HashMap;
2use std::fmt::{Debug, Display, Formatter};
3
4use futures_core::Stream;
5use kanal::Sender;
6use log::{info, trace, warn};
7use rayon::prelude::*;
8use tiberius::{Client, Query};
9use tokio::net::TcpStream;
10use tokio_util::compat::Compat;
11
12use crate::broker::{Broker, ListenEvent};
13use crate::config::SqlConfig;
14use crate::connection::LongPooling;
15use crate::decode::Decode;
16use crate::encode::Encode;
17use crate::error::Error;
18use crate::value::Value;
19
20pub mod connection;
21pub mod config;
22pub mod deadpool;
23pub mod decode;
24pub mod encode;
25pub mod error;
26pub mod broker;
27pub mod value;
28pub mod cnv;
29pub mod json_ext;
30
31#[derive(Debug)]
32pub struct ExecResult {
33 pub rows_affected: u64,
34 pub last_insert_id: Value,
35}
36
37impl Display for ExecResult {
38 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39 f.debug_map()
40 .key(&"rows_affected")
41 .value(&self.rows_affected)
42 .key(&"last_insert_id")
43 .value(&self.last_insert_id)
44 .finish()
45 }
46}
47
48pub struct MssqlConnection {
49 inner: Option<Client<Compat<TcpStream>>>,
50 pool: Option<LongPooling>,
51 cfg: SqlConfig,
52}
53
54impl MssqlConnection {
55 pub async fn establish(cfg: &SqlConfig) -> Result<Self, tiberius::error::Error> {
56 info!("connecting to - {:?}",&cfg);
57 let connect = connection::client(cfg).await?;
58 let pooling = LongPooling::new(cfg)?;
59 Ok(Self {
60 cfg: cfg.to_owned(),
61 inner: Some(connect),
62 pool: Some(pooling),
63 })
64 }
65
66 pub async fn select(
67 &mut self,
68 sql: &str,
69 params: Vec<Value>,
70 ) -> Result<Vec<HashMap<String, Value>>, Error> {
71 trace!("select query - {}",&sql);
72 let mut q = Query::new(sql);
73 for x in params {
74 x.encode(&mut q)?;
75 }
76 let v = q
77 .query(self.inner.as_mut().ok_or_else(|| Error::from("MssqlConnection is close"))?)
78 .await
79 .map_err(|e| Error::from(e.to_string()))?;
80 let mut results = Vec::with_capacity(v.size_hint().0);
81 let s = v
82 .into_results()
83 .await
84 .map_err(|e| Error::from(e.to_string()))?;
85 for item in s {
86 let (sx, rx) = kanal::unbounded();
87 item.into_par_iter().for_each(|r| {
88 let columns = r.columns().to_owned();
89 let mut row = HashMap::with_capacity(columns.len());
90 let mut count = 0;
91 for x in r {
92 let v = Value::decode(&x).unwrap();
93 match columns.get(count) {
94 None => {}
95 Some(col) => {
96 let name = col.name();
97 row.insert(name.to_string(), v);
98 }
99 }
100 count += 1;
101 }
102 sx.send(row).unwrap();
103 });
104 drop(sx);
105 while let Ok(row) = rx.recv() {
106 results.push(row);
107 }
108 }
109 Ok(results)
110 }
111
112 pub async fn exec(
113 &mut self,
114 sql: &str,
115 params: Vec<Value>,
116 ) -> Result<ExecResult, Error> {
117 trace!("executing query - {}",&sql);
118 let mut q = Query::new(sql);
119 for x in params {
120 x.encode(&mut q)?;
121 }
122 let v = q
123 .execute(self.inner.as_mut().ok_or_else(|| Error::from("MssqlConnection is close"))?)
124 .await
125 .map_err(|e| Error::from(e.to_string()))?;
126 Ok(ExecResult {
127 rows_affected: {
128 let mut rows_affected = 0;
129 for x in v.rows_affected() {
130 rows_affected += x.clone();
131 }
132 rows_affected
133 },
134 last_insert_id: Value::Int(None),
135 })
136 }
137
138 async fn ping(&mut self) -> Result<(), Error> {
139 trace!("ping...");
140 let ping = self.inner
141 .as_mut().expect("Mssql Connection is closed")
142 .query("SELECT 1", &[])
143 .await
144 .map_err(|e| Error::from(e));
145 match ping {
146 Ok(_) => Ok(()),
147 Err(err) => Err(err)
148 }
149 }
150
151 pub async fn close(&mut self) -> Result<(), Error> {
152 warn!("closing connection...");
153 if let Some(v) = self.inner.take() {
154 v.close().await.map_err(|e| Error::from(e))?;
155 }
156 Ok(())
157 }
158
159 pub async fn listen(self, id: u64, table: String, sx: Sender<Vec<ListenEvent>>) -> Result<(), tiberius::error::Error> {
160 info!("a new listener added to table - {}", &table);
161 let pool = self.pool
162 .expect("Mssql connection pool is not created");
163 let cfg = self.cfg.clone();
164 let mut broker = Broker::new(
165 pool,
166 cfg,
167 table,
168 id,
169 sx,
170 );
171 info!("starting sql");
172 broker.start().await
173 }
174}