tiberius_mssql_broker/
lib.rs

1use std::collections::HashMap;
2use std::fmt::{Debug, Display, Formatter};
3
4use futures_core::Stream;
5use kanal::Sender;
6use log::{info, trace, warn};
7use rayon::prelude::*;
8use tiberius::{Client, Query};
9use tokio::net::TcpStream;
10use tokio_util::compat::Compat;
11
12use crate::broker::{Broker, ListenEvent};
13use crate::config::SqlConfig;
14use crate::connection::LongPooling;
15use crate::decode::Decode;
16use crate::encode::Encode;
17use crate::error::Error;
18use crate::value::Value;
19
20pub mod connection;
21pub mod config;
22pub mod deadpool;
23pub mod decode;
24pub mod encode;
25pub mod error;
26pub mod broker;
27pub mod value;
28pub mod cnv;
29pub mod json_ext;
30
31#[derive(Debug)]
32pub struct ExecResult {
33    pub rows_affected: u64,
34    pub last_insert_id: Value,
35}
36
37impl Display for ExecResult {
38    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39        f.debug_map()
40            .key(&"rows_affected")
41            .value(&self.rows_affected)
42            .key(&"last_insert_id")
43            .value(&self.last_insert_id)
44            .finish()
45    }
46}
47
48pub struct MssqlConnection {
49    inner: Option<Client<Compat<TcpStream>>>,
50    pool: Option<LongPooling>,
51    cfg: SqlConfig,
52}
53
54impl MssqlConnection {
55    pub async fn establish(cfg: &SqlConfig) -> Result<Self, tiberius::error::Error> {
56        info!("connecting to - {:?}",&cfg);
57        let connect = connection::client(cfg).await?;
58        let pooling = LongPooling::new(cfg)?;
59        Ok(Self {
60            cfg: cfg.to_owned(),
61            inner: Some(connect),
62            pool: Some(pooling),
63		})
64    }
65
66    pub async fn select(
67        &mut self,
68        sql: &str,
69        params: Vec<Value>,
70    ) -> Result<Vec<HashMap<String, Value>>, Error> {
71        trace!("select query - {}",&sql);
72        let mut q = Query::new(sql);
73        for x in params {
74            x.encode(&mut q)?;
75        }
76        let v = q
77            .query(self.inner.as_mut().ok_or_else(|| Error::from("MssqlConnection is close"))?)
78            .await
79            .map_err(|e| Error::from(e.to_string()))?;
80        let mut results = Vec::with_capacity(v.size_hint().0);
81        let s = v
82            .into_results()
83            .await
84            .map_err(|e| Error::from(e.to_string()))?;
85        for item in s {
86            let (sx, rx) = kanal::unbounded();
87            item.into_par_iter().for_each(|r| {
88                let columns = r.columns().to_owned();
89                let mut row = HashMap::with_capacity(columns.len());
90                let mut count = 0;
91                for x in r {
92                    let v = Value::decode(&x).unwrap();
93                    match columns.get(count) {
94                        None => {}
95                        Some(col) => {
96                            let name = col.name();
97                            row.insert(name.to_string(), v);
98                        }
99                    }
100                    count += 1;
101                }
102                sx.send(row).unwrap();
103            });
104            drop(sx);
105            while let Ok(row) = rx.recv() {
106                results.push(row);
107            }
108        }
109        Ok(results)
110    }
111
112    pub async fn exec(
113        &mut self,
114        sql: &str,
115        params: Vec<Value>,
116    ) -> Result<ExecResult, Error> {
117        trace!("executing query - {}",&sql);
118        let mut q = Query::new(sql);
119        for x in params {
120            x.encode(&mut q)?;
121        }
122        let v = q
123            .execute(self.inner.as_mut().ok_or_else(|| Error::from("MssqlConnection is close"))?)
124            .await
125            .map_err(|e| Error::from(e.to_string()))?;
126        Ok(ExecResult {
127            rows_affected: {
128                let mut rows_affected = 0;
129                for x in v.rows_affected() {
130                    rows_affected += x.clone();
131                }
132                rows_affected
133            },
134            last_insert_id: Value::Int(None),
135        })
136    }
137
138    async fn ping(&mut self) -> Result<(), Error> {
139        trace!("ping...");
140        let ping = self.inner
141            .as_mut().expect("Mssql Connection is closed")
142            .query("SELECT 1", &[])
143            .await
144            .map_err(|e| Error::from(e));
145        match ping {
146            Ok(_) => Ok(()),
147            Err(err) => Err(err)
148        }
149    }
150
151    pub async fn close(&mut self) -> Result<(), Error> {
152        warn!("closing connection...");
153        if let Some(v) = self.inner.take() {
154            v.close().await.map_err(|e| Error::from(e))?;
155        }
156        Ok(())
157    }
158
159    pub async fn listen(self, id: u64, table: String, sx: Sender<Vec<ListenEvent>>) -> Result<(), tiberius::error::Error> {
160        info!("a new listener added to table - {}", &table);
161        let pool = self.pool
162            .expect("Mssql connection pool is not created");
163        let cfg = self.cfg.clone();
164        let mut broker = Broker::new(
165			pool,
166			cfg,
167			table,
168			id,
169			sx,
170		);
171        info!("starting sql");
172        broker.start().await
173    }
174}