tiny_web/sys/dbs/
pgsql.rs

1use std::{
2    collections::{btree_map::Entry, BTreeMap},
3    future::Future,
4    io,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8};
9
10use chrono::{DateTime, Utc};
11use futures_util::{pin_mut, TryStreamExt};
12use postgres::{
13    tls::{ChannelBinding, MakeTlsConnect, TlsConnect},
14    types::{ToSql, Type},
15    Column, Error, NoTls, Row, Statement, ToStatement,
16};
17use ring::digest;
18use rustls::{pki_types::ServerName, ClientConfig, RootCertStore};
19use serde_json::Value;
20use tiny_web_macro::fnv1a_64;
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22use tokio_postgres::Client;
23use tokio_rustls::{client::TlsStream, TlsConnector};
24use x509_certificate::{
25    DigestAlgorithm::{Sha1, Sha256, Sha384, Sha512},
26    SignatureAlgorithm::{EcdsaSha256, EcdsaSha384, Ed25519, NoSignature, RsaSha1, RsaSha256, RsaSha384, RsaSha512},
27    X509Certificate,
28};
29
30use crate::sys::{data::Data, init::DBConfig, log::Log};
31
32use super::adapter::{KeyOrQuery, NoCertificateVerification, StrOrI64OrUSize};
33
34/// Response to the result of the query
35#[derive(Debug)]
36enum DBResult {
37    /// The request was completed successfully.
38    Vec(Vec<Row>),
39    /// The request was completed successfully without result.
40    Void,
41    /// Connection is empty.
42    NoClient,
43    /// Query execution error.
44    ErrQuery(String),
45    /// Connection is lost.
46    ErrConnect(String),
47    /// No prepare query
48    ErrPrepare,
49}
50
51/// Responsible for working with postgresql database
52pub(crate) struct PgSql {
53    /// Client for connection to database.
54    client: Option<Client>,
55    /// Connection config.
56    sql_conn: tokio_postgres::Config,
57    /// Use tls for connection when sslmode=require.
58    tls: Option<MakeRustlsConnect>,
59    /// Prepare statements to database.
60    prepare: BTreeMap<i64, PgStatement>,
61}
62
63/// Statement to database
64pub(crate) struct PgStatement {
65    /// Statement to database
66    statement: Statement,
67    /// Sql query to database
68    sql: String,
69}
70
71/// Names of columns
72type PgColumnName = (usize, fn(&Row, usize) -> Data);
73
74impl PgSql {
75    /// Initializes a new object `PgSql`
76    pub fn new(config: Arc<DBConfig>) -> Option<PgSql> {
77        let (sql_conn, tls) = match PgSql::create_connect_string(&config) {
78            Ok(v) => v,
79            Err(e) => {
80                Log::stop(609, Some(e.to_string()));
81                return None;
82            }
83        };
84
85        Some(PgSql {
86            client: None,
87            sql_conn,
88            tls,
89            prepare: BTreeMap::new(),
90        })
91    }
92
93    fn create_connect_string(config: &DBConfig) -> Result<(tokio_postgres::Config, Option<MakeRustlsConnect>), Error> {
94        let mut conn_str = String::with_capacity(512);
95        //host
96        conn_str.push_str("host='");
97        conn_str.push_str(&config.host);
98        conn_str.push_str("' ");
99        //port
100        if let Some(p) = &config.port {
101            conn_str.push_str("port='");
102            conn_str.push_str(&p.to_string());
103            conn_str.push_str("' ");
104        }
105        // Database name
106        conn_str.push_str("dbname='");
107        conn_str.push_str(&config.name);
108        conn_str.push_str("' ");
109        //user
110        if let Some(u) = &config.user {
111            conn_str.push_str("user='");
112            conn_str.push_str(u);
113            conn_str.push_str("' ");
114        }
115        //password
116        if let Some(p) = &config.pwd {
117            conn_str.push_str("password='");
118            conn_str.push_str(p);
119            conn_str.push_str("' ");
120        }
121        //sslmode
122        if config.sslmode {
123            conn_str.push_str("sslmode=require ");
124        }
125        //connect_timeout
126        conn_str.push_str("connect_timeout=1 ");
127        //application_name
128        conn_str.push_str("application_name='");
129        conn_str.push_str(env!("CARGO_PKG_NAME"));
130        conn_str.push(' ');
131        conn_str.push_str(env!("CARGO_PKG_VERSION"));
132        conn_str.push_str("' ");
133        //options
134        conn_str.push_str("options='--client_encoding=UTF8'");
135
136        let sql_conn: tokio_postgres::Config = match conn_str.parse() {
137            Ok(c) => c,
138            Err(e) => return Err(e),
139        };
140        let tls = if config.sslmode {
141            let mut config = ClientConfig::builder().with_root_certificates(RootCertStore::empty()).with_no_client_auth();
142            config.dangerous().set_certificate_verifier(Arc::new(NoCertificateVerification {}));
143            Some(MakeRustlsConnect::new(config))
144        } else {
145            None
146        };
147
148        Ok((sql_conn, tls))
149    }
150
151    pub(crate) async fn check_db(config: &DBConfig, sql: Option<Vec<String>>) -> Result<String, String> {
152        let (sql_conn, tls) = match PgSql::create_connect_string(config) {
153            Ok(v) => v,
154            Err(e) => return Err(e.to_string()),
155        };
156        let client = match tls {
157            Some(tls) => match sql_conn.connect(tls).await {
158                Ok((client, connection)) => {
159                    tokio::spawn(async move {
160                        let _ = connection.await;
161                    });
162                    client
163                }
164                Err(e) => return Err(e.to_string()),
165            },
166            None => match sql_conn.connect(NoTls).await {
167                Ok((client, connection)) => {
168                    tokio::spawn(async move {
169                        let _ = connection.await;
170                    });
171                    client
172                }
173                Err(e) => return Err(e.to_string()),
174            },
175        };
176        if let Some(sqls) = sql {
177            for q in sqls {
178                if let Err(e) = client.query(&q, &[]).await {
179                    return Err(e.to_string());
180                }
181            }
182        }
183        let row = match client.query_one("SELECT now()::text", &[]).await {
184            Ok(r) => r,
185            Err(e) => return Err(e.to_string()),
186        };
187        let time: &str = row.get(0);
188
189        Ok(time.to_owned())
190    }
191
192    /// Connect to the database
193    pub async fn connect(&mut self) -> bool {
194        match &self.client {
195            Some(c) => {
196                if c.is_closed() {
197                    self.try_connect().await
198                } else {
199                    true
200                }
201            }
202            None => self.try_connect().await,
203        }
204    }
205
206    /// Trying to connect to the database
207    async fn try_connect(&mut self) -> bool {
208        match self.tls.clone() {
209            Some(tls) => match self.sql_conn.connect(tls).await {
210                Ok((client, connection)) => {
211                    tokio::spawn(async move {
212                        if let Err(e) = connection.await {
213                            Log::stop(612, Some(e.to_string()));
214                        }
215                    });
216                    self.client = Some(client);
217                }
218                Err(e) => {
219                    Log::warning(601, Some(format!("Error: {} => {:?}", e, &self.sql_conn)));
220                    return false;
221                }
222            },
223            None => match self.sql_conn.connect(NoTls).await {
224                Ok((client, connection)) => {
225                    tokio::spawn(async move {
226                        if let Err(e) = connection.await {
227                            Log::warning(612, Some(e.to_string()));
228                        }
229                    });
230                    self.client = Some(client);
231                }
232                Err(e) => {
233                    Log::warning(601, Some(format!("Error: {} => {:?}", e, &self.sql_conn)));
234                    return false;
235                }
236            },
237        }
238        self.prepare().await
239    }
240
241    /// Prepare sql statement
242    async fn prepare(&mut self) -> bool {
243        self.prepare.clear();
244        match &self.client {
245            Some(client) => {
246                let mut map = BTreeMap::new();
247
248                // Get avaible lang 4156762777733340057
249                let sql = r#"
250                    SELECT lang_id, code, name, index
251                    FROM lang
252                    WHERE enable
253                    ORDER BY sort
254                "#;
255                map.insert(fnv1a_64!("lib_get_langs"), (client.prepare_typed(sql, &[]), sql.to_owned()));
256
257                // Get all lang 3367482389811013093
258                let sql = r#"
259                    SELECT lang_id, code, name, index
260                    FROM lang
261                    ORDER BY index, sort
262                "#;
263                map.insert(fnv1a_64!("lib_get_all_langs"), (client.prepare_typed(sql, &[]), sql.to_owned()));
264
265                // Get session 6716397077443474616
266                let sql = r#"
267                    WITH upd AS (
268                        UPDATE session
269                        SET 
270                            last = now()
271                        WHERE
272                            session=$1
273                        RETURNING session_id, user_id, data, lang_id
274                    )
275                    SELECT 
276                        s.session_id, s.user_id, u.role_id, s.data, s.lang_id 
277                    FROM 
278                        upd s
279                        INNER JOIN "user" u ON u.user_id=s.user_id
280                "#;
281                map.insert(fnv1a_64!("lib_get_session"), (client.prepare_typed(sql, &[Type::TEXT]), sql.to_owned()));
282
283                // Update session -400086351751991892
284                let sql = r#"
285                    UPDATE session
286                    SET 
287                        user_id=$1,
288                        lang_id=$2,
289                        data=$3,
290                        last=now(),
291                        ip=$4,
292                        user_agent=$5
293                    WHERE
294                        session_id=$6
295                "#;
296                map.insert(
297                    fnv1a_64!("lib_set_session"),
298                    (client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::BYTEA, Type::TEXT, Type::TEXT, Type::INT8]), sql.to_owned()),
299                );
300
301                // Insert session 8029853374838241583
302                let sql = r#"
303                    INSERT INTO session (user_id, lang_id, session, data, created, last, ip, user_agent)
304                    SELECT $1, $2, $3, $4, now(), now(), $5, $6
305                "#;
306                map.insert(
307                    fnv1a_64!("lib_add_session"),
308                    (client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::TEXT, Type::BYTEA, Type::TEXT, Type::TEXT]), sql.to_owned()),
309                );
310
311                // Get redirect -1566077906756142556
312                let sql = r#"
313                    SELECT redirect, permanently FROM redirect WHERE url=$1
314                "#;
315                map.insert(fnv1a_64!("lib_get_redirect"), (client.prepare_typed(sql, &[Type::TEXT]), sql.to_owned()));
316
317                // Get route 3077841024002823969
318                let sql = r#"
319                    SELECT 
320                        c.module, c.class, c.action,
321                        c.module_id, c.class_id, c.action_id,
322                        r.params, r.lang_id
323                    FROM 
324                        route r
325                        INNER JOIN controller c ON c.controller_id=r.controller_id
326                    WHERE r.url=$1
327                "#;
328                map.insert(fnv1a_64!("lib_get_route"), (client.prepare_typed(sql, &[Type::TEXT]), sql.to_owned()));
329
330                // Get route from module/class/action 8508883211214576597
331                let sql = r#"
332                    SELECT r.url 
333                    FROM 
334                        controller c
335                        INNER JOIN route r ON 
336                            r.controller_id=c.controller_id AND r.lang_id=$5 AND r.params = $4
337                    WHERE 
338                        c.module_id=$1 AND c.class_id=$2 AND c.action_id=$3
339                "#;
340                map.insert(
341                    fnv1a_64!("lib_get_url"),
342                    (client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::INT8, Type::TEXT, Type::INT8]), sql.to_owned()),
343                );
344
345                // Get auth permissions -4169186416014187350
346                let sql = r#"
347                    SELECT COALESCE(MAX(a.access::int), 0) AS access
348                    FROM
349                        access a
350                        INNER JOIN "user" u ON u.role_id=a.role_id
351                        INNER JOIN controller c ON a.controller_id=c.controller_id
352                    WHERE
353                        a.access AND a.role_id=$1 AND (
354                            (c.module_id=-3750763034362895579 AND c.class_id=-3750763034362895579 AND c.action_id=-3750763034362895579)
355                            OR (c.module_id=$2 AND c.class_id=-3750763034362895579 AND c.action_id=-3750763034362895579)
356                            OR (c.module_id=$3 AND c.class_id=$5 AND c.action_id=-3750763034362895579)
357                            OR (c.module_id=$4 AND c.class_id=$6 AND c.action_id=$7)
358                        )
359                "#;
360                map.insert(
361                    fnv1a_64!("lib_get_auth"),
362                    (
363                        client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::INT8, Type::INT8, Type::INT8, Type::INT8, Type::INT8]),
364                        sql.to_owned(),
365                    ),
366                );
367
368                // Get not found -8338028735031617838
369                let sql = r#"
370                    SELECT url
371                    FROM route
372                    WHERE controller_id=3 AND lang_id=$1
373                "#;
374                map.insert(fnv1a_64!("lib_get_not_found"), (client.prepare_typed(sql, &[Type::INT8]), sql.to_owned()));
375
376                // Get settings 2305043036426846632
377                let sql = r#"
378                    SELECT data FROM setting WHERE key=$1
379                "#;
380                map.insert(fnv1a_64!("lib_get_setting"), (client.prepare_typed(sql, &[Type::INT8]), sql.to_owned()));
381
382                // Insert email 5843182919945045895
383                let sql = r#"
384                    INSERT INTO mail(user_id, mail, "create", err, transport)
385                    VALUES ($1, $2, now(), false, $3)
386                    RETURNING mail_id;
387                "#;
388                map.insert(fnv1a_64!("lib_mail_new"), (client.prepare_typed(sql, &[Type::INT8, Type::JSON, Type::TEXT]), sql.to_owned()));
389
390                // Insert email without provider 4032848019693494130
391                let sql = r#"
392                    INSERT INTO mail(user_id, mail, "create", send, err, transport)
393                    VALUES ($1, $2, now(), now(), false, 'None')
394                "#;
395                map.insert(fnv1a_64!("lib_mail_add"), (client.prepare_typed(sql, &[Type::INT8, Type::JSON]), sql.to_owned()));
396
397                // Insert error send email 1423150573745747914
398                let sql = r#"
399                    UPDATE mail
400                    SET err=true, send=now(), err_text=$1
401                    WHERE mail_id=$2
402                "#;
403                map.insert(fnv1a_64!("lib_mail_err"), (client.prepare_typed(sql, &[Type::TEXT, Type::INT8]), sql.to_owned()));
404
405                // Insert success send email 2568591733020940649
406                let sql = r#"
407                    UPDATE mail
408                    SET err=false, send=now()
409                    WHERE mail_id=$1
410                "#;
411                map.insert(fnv1a_64!("lib_mail_ok"), (client.prepare_typed(sql, &[Type::INT8]), sql.to_owned()));
412
413                // Prepare statements
414                for (key, (prepare, sql)) in map {
415                    match prepare.await {
416                        Ok(s) => {
417                            self.prepare.insert(key, PgStatement { statement: s, sql });
418                        }
419                        Err(e) => {
420                            Log::stop(613, Some(format!("Error={}. sql={}", e, sql)));
421                            return false;
422                        }
423                    }
424                }
425                true
426            }
427            None => false,
428        }
429    }
430
431    /// Executes a statement in database, returning the results
432    async fn query_raw<T>(client: &Option<tokio_postgres::Client>, query: &T, params: &[&(dyn ToSql + Sync)]) -> DBResult
433    where
434        T: ?Sized + ToStatement,
435    {
436        match client {
437            Some(sql) => match sql.query_raw(query, PgSql::slice_iter(params)).await {
438                Ok(res) => {
439                    pin_mut!(res);
440                    match res.try_next().await {
441                        Ok(row) => match row {
442                            Some(r) => {
443                                let mut result = match res.rows_affected() {
444                                    Some(s) => Vec::with_capacity(s as usize),
445                                    None => Vec::new(),
446                                };
447                                result.push(r);
448                                loop {
449                                    match res.try_next().await {
450                                        Ok(row) => match row {
451                                            Some(r) => {
452                                                result.push(r);
453                                            }
454                                            None => break,
455                                        },
456                                        Err(e) => return DBResult::ErrQuery(e.to_string()),
457                                    }
458                                }
459                                DBResult::Vec(result)
460                            }
461                            None => DBResult::Void,
462                        },
463                        Err(e) => DBResult::ErrQuery(e.to_string()),
464                    }
465                }
466                Err(e) => {
467                    if e.is_closed() {
468                        DBResult::ErrConnect(e.to_string())
469                    } else {
470                        DBResult::ErrQuery(e.to_string())
471                    }
472                }
473            },
474            None => DBResult::NoClient,
475        }
476    }
477
478    /// Executes a statement in database, without results
479    async fn execute_raw<T>(client: &Option<tokio_postgres::Client>, query: &T, params: &[&(dyn ToSql + Sync)]) -> DBResult
480    where
481        T: ?Sized + ToStatement,
482    {
483        match client {
484            Some(sql) => match sql.execute_raw(query, PgSql::slice_iter(params)).await {
485                Ok(_) => DBResult::Void,
486                Err(e) => {
487                    if e.is_closed() {
488                        DBResult::ErrConnect(e.to_string())
489                    } else {
490                        DBResult::ErrQuery(e.to_string())
491                    }
492                }
493            },
494            None => DBResult::NoClient,
495        }
496    }
497
498    /// Slise to ToSql
499    fn slice_iter<'a>(s: &'a [&'a (dyn ToSql + Sync)]) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
500        s.iter().map(|s| *s as _)
501    }
502
503    /// Execute query to database and return a result
504    async fn query_statement(&self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)]) -> DBResult {
505        if query.is_key() {
506            let stat = match self.prepare.get(&query.to_i64()) {
507                Some(s) => s,
508                None => return DBResult::ErrPrepare,
509            };
510            PgSql::query_raw(&self.client, &stat.statement, params).await
511        } else {
512            PgSql::query_raw(&self.client, query.to_str(), params).await
513        }
514    }
515
516    /// Execute query to database without a result
517    async fn execute_statement(&self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)]) -> DBResult {
518        if query.is_key() {
519            let stat = match self.prepare.get(&query.to_i64()) {
520                Some(s) => s,
521                None => return DBResult::ErrPrepare,
522            };
523            PgSql::execute_raw(&self.client, &stat.statement, params).await
524        } else {
525            PgSql::execute_raw(&self.client, query.to_str(), params).await
526        }
527    }
528
529    /// Execute query to database without a result
530    pub async fn execute(&mut self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)]) -> Option<()> {
531        match self.execute_statement(query, params).await {
532            DBResult::Void | DBResult::Vec(_) => return Some(()),
533            DBResult::ErrQuery(e) => {
534                if query.is_key() {
535                    Log::warning(602, Some(format!("Statement key={} error={}", query.to_i64(), e)));
536                } else {
537                    Log::warning(602, Some(format!("{} error={}", query.to_str(), e)));
538                }
539                return None;
540            }
541            DBResult::ErrPrepare => {
542                Log::warning(615, Some(format!("{:?}", query.to_i64())));
543                return None;
544            }
545            DBResult::NoClient => Log::warning(604, None),
546            DBResult::ErrConnect(e) => Log::warning(603, Some(e)),
547        };
548        self.client = None;
549        if self.try_connect().await {
550            match self.execute_statement(query, params).await {
551                DBResult::Void | DBResult::Vec(_) => return Some(()),
552                _ => {}
553            }
554        }
555        None
556    }
557
558    /// Execute query to database and return a result
559    pub async fn query(&mut self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)], assoc: bool) -> Option<Vec<Data>> {
560        match self.query_statement(query, params).await {
561            DBResult::Vec(rows) => return Some(self.convert(rows, assoc)),
562            DBResult::Void => return Some(Vec::new()),
563            DBResult::ErrQuery(e) => {
564                if query.is_key() {
565                    Log::warning(602, Some(format!("Statement key={} error={}", query.to_i64(), e)));
566                } else {
567                    Log::warning(602, Some(format!("{} error={}", query.to_str(), e)));
568                }
569                return None;
570            }
571            DBResult::ErrPrepare => {
572                Log::warning(615, Some(format!("{:?}", query.to_i64())));
573                return None;
574            }
575            DBResult::NoClient => Log::warning(604, None),
576            DBResult::ErrConnect(e) => Log::warning(603, Some(e)),
577        };
578        self.client = None;
579        if self.try_connect().await {
580            match self.query_statement(query, params).await {
581                DBResult::Vec(rows) => return Some(self.convert(rows, assoc)),
582                DBResult::Void => return Some(Vec::new()),
583                _ => {}
584            }
585        }
586        None
587    }
588
589    /// Execute query to database and return a result,  
590    /// and grouping tabular data according to specified conditions.
591    pub async fn query_group(
592        &mut self,
593        query: &impl KeyOrQuery,
594        params: &[&(dyn ToSql + Sync)],
595        assoc: bool,
596        conds: &[&[impl StrOrI64OrUSize]],
597    ) -> Option<Data> {
598        if conds.is_empty() {
599            return None;
600        }
601        match self.query_statement(query, params).await {
602            DBResult::Vec(rows) => {
603                if rows.is_empty() {
604                    return Some(Data::Map(BTreeMap::new()));
605                }
606                if assoc {
607                    return Some(self.convert_map(rows, conds));
608                } else {
609                    return Some(self.convert_vec(rows, conds));
610                }
611            }
612            DBResult::Void => return Some(Data::Map(BTreeMap::new())),
613            DBResult::ErrQuery(e) => {
614                if query.is_key() {
615                    Log::warning(602, Some(format!("Statement key={} error={}", query.to_i64(), e)));
616                } else {
617                    Log::warning(602, Some(format!("{} error={}", query.to_str(), e)));
618                }
619                return None;
620            }
621            DBResult::ErrPrepare => {
622                Log::warning(615, Some(format!("{:?}", query.to_i64())));
623                return None;
624            }
625            DBResult::NoClient => Log::warning(604, None),
626            DBResult::ErrConnect(e) => Log::warning(603, Some(e)),
627        };
628        self.client = None;
629        if self.try_connect().await {
630            match self.query_statement(query, params).await {
631                DBResult::Vec(rows) => {
632                    if rows.is_empty() {
633                        return Some(Data::Map(BTreeMap::new()));
634                    }
635                    if assoc {
636                        return Some(self.convert_map(rows, conds));
637                    } else {
638                        return Some(self.convert_vec(rows, conds));
639                    }
640                }
641                DBResult::Void => return Some(Data::Map(BTreeMap::new())),
642                _ => {}
643            }
644        }
645        Some(Data::Map(BTreeMap::new()))
646    }
647
648    /// Convert Vec<Row> to Data::Map<Data::Map<...>>
649    fn convert_map(&self, rows: Vec<Row>, conds: &[&[impl StrOrI64OrUSize]]) -> Data {
650        let mut map = BTreeMap::new();
651        let cols = unsafe { rows.get_unchecked(0) }.columns();
652        let columns = self.get_column_type_name(cols);
653        for row in &rows {
654            let mut item = &mut map;
655            for row_conds in conds {
656                if row_conds.is_empty() {
657                    break;
658                }
659                item = match self.fill_map(row, &columns, row_conds, item) {
660                    Some(i) => i,
661                    None => break,
662                };
663            }
664        }
665        Data::Map(map)
666    }
667
668    /// Convert Vec<Row> to Data::Map<Data::Vec<...>>
669    fn convert_vec(&self, rows: Vec<Row>, conds: &[&[impl StrOrI64OrUSize]]) -> Data {
670        let mut map = BTreeMap::new();
671        let cols = unsafe { rows.get_unchecked(0) }.columns();
672        let columns = self.get_column_type(cols);
673        for row in &rows {
674            let mut item = &mut map;
675            for row_conds in conds {
676                if row_conds.is_empty() {
677                    break;
678                }
679                item = match self.fill_vec(row, &columns, row_conds, item) {
680                    Some(i) => i,
681                    None => break,
682                };
683            }
684        }
685        Data::Map(map)
686    }
687
688    /// Fill tree items in map
689    fn fill_map<'a>(
690        &self,
691        row: &Row,
692        columns: &BTreeMap<i64, PgColumnName>,
693        conds: &[impl StrOrI64OrUSize],
694        map: &'a mut BTreeMap<i64, Data>,
695    ) -> Option<&'a mut BTreeMap<i64, Data>> {
696        let mut index = unsafe { conds.get_unchecked(0) }.to_i64();
697        if index == 0 {
698            return None;
699        }
700        let (idx, func) = match columns.get(&index) {
701            Some(f) => f,
702            None => return None,
703        };
704        let val = if let Data::I64(val) = func(row, *idx) {
705            val
706        } else {
707            return None;
708        };
709        let res_map = match map.entry(val) {
710            Entry::Vacant(v) => {
711                let mut new_map = BTreeMap::new();
712                new_map.insert(index, Data::I64(val));
713                let mut turple;
714                for item in &conds[1..] {
715                    index = item.to_i64();
716                    if index == 0 {
717                        return None;
718                    }
719                    turple = match columns.get(&index) {
720                        Some(f) => f,
721                        None => return None,
722                    };
723                    new_map.insert(index, turple.1(row, turple.0));
724                }
725                new_map.insert(fnv1a_64!("sub"), Data::Map(BTreeMap::new()));
726                v.insert(Data::Map(new_map))
727            }
728            Entry::Occupied(o) => o.into_mut(),
729        };
730        if let Data::Map(found_map) = res_map {
731            if let Data::Map(submap) = found_map.get_mut(&fnv1a_64!("sub"))? {
732                Some(submap)
733            } else {
734                None
735            }
736        } else {
737            None
738        }
739    }
740
741    /// Fill tree items in vec
742    fn fill_vec<'a>(
743        &self,
744        row: &Row,
745        columns: &[fn(&Row, usize) -> Data],
746        conds: &[impl StrOrI64OrUSize],
747        map: &'a mut BTreeMap<i64, Data>,
748    ) -> Option<&'a mut BTreeMap<i64, Data>> {
749        let mut index = unsafe { conds.get_unchecked(0) }.to_usize();
750        if index == usize::MAX {
751            return None;
752        }
753        let mut func = unsafe { columns.get_unchecked(index) };
754        let val = if let Data::I64(val) = func(row, index) {
755            val
756        } else {
757            return None;
758        };
759        let res_map = match map.entry(val) {
760            Entry::Vacant(v) => {
761                let mut new_vec = Vec::with_capacity(conds.len() + 1);
762                new_vec.push(Data::I64(val));
763                for item in &conds[1..] {
764                    index = item.to_usize();
765                    if index == usize::MAX {
766                        return None;
767                    }
768                    func = unsafe { columns.get_unchecked(index) };
769                    new_vec.push(func(row, index));
770                }
771                new_vec.push(Data::Map(BTreeMap::new()));
772                v.insert(Data::Vec(new_vec))
773            }
774            Entry::Occupied(o) => o.into_mut(),
775        };
776        if let Data::Vec(found_vec) = res_map {
777            if let Data::Map(submap) = found_vec.last_mut()? {
778                Some(submap)
779            } else {
780                None
781            }
782        } else {
783            None
784        }
785    }
786
787    /// Convert Vec<Row> to Vec<Data>
788    fn convert(&self, rows: Vec<Row>, assoc: bool) -> Vec<Data> {
789        let mut vec = Vec::with_capacity(rows.len());
790        let cols = unsafe { rows.get_unchecked(0) }.columns();
791        if !assoc {
792            let columns = self.get_column_type(cols);
793            let mut func;
794            for row in &rows {
795                let mut v = Vec::with_capacity(columns.len());
796                for idx in 0..columns.len() {
797                    func = unsafe { columns.get_unchecked(idx) };
798                    v.push(func(row, idx))
799                }
800                vec.push(Data::Vec(v));
801            }
802        } else {
803            let columns = self.get_column_type_name(cols);
804            for row in &rows {
805                let mut t = BTreeMap::new();
806                for (name, turple) in &columns {
807                    t.insert(*name, turple.1(row, turple.0));
808                }
809                vec.push(Data::Map(t));
810            }
811        }
812        vec
813    }
814
815    /// Detect columns' type with columns' name
816    fn get_column_type<'a>(&self, cols: &'a [Column]) -> Vec<fn(&Row, usize) -> Data> {
817        let mut columns = Vec::with_capacity(cols.len());
818        for col in cols {
819            let func = match col.type_() {
820                &Type::BOOL => Self::get_bool,
821                &Type::BYTEA => Self::get_bytea,
822                &Type::TEXT => Self::get_string,
823                &Type::JSON => Self::get_json,
824                &Type::JSONB => Self::get_json,
825                &Type::UUID => Self::get_uuid,
826                &Type::VARCHAR => Self::get_string,
827                &Type::INT8 => Self::get_i64,
828                &Type::INT2 => Self::get_i16,
829                &Type::INT4 => Self::get_i32,
830                &Type::FLOAT4 => Self::get_f32,
831                &Type::FLOAT8 => Self::get_f64,
832                &Type::TIMESTAMPTZ => Self::get_date,
833                u => {
834                    Log::warning(614, Some(format!("Type: {}", u)));
835                    Self::get_unknown
836                }
837            };
838            columns.push(func);
839        }
840        columns
841    }
842
843    /// Detect columns' type with columns' name
844    fn get_column_type_name(&self, cols: &[Column]) -> BTreeMap<i64, PgColumnName> {
845        let mut columns = BTreeMap::new();
846        for (idx, col) in cols.iter().enumerate() {
847            let func = match col.type_() {
848                &Type::BOOL => Self::get_bool,
849                &Type::BYTEA => Self::get_bytea,
850                &Type::TEXT => Self::get_string,
851                &Type::JSON => Self::get_json,
852                &Type::JSONB => Self::get_json,
853                &Type::UUID => Self::get_uuid,
854                &Type::VARCHAR => Self::get_string,
855                &Type::INT8 => Self::get_i64,
856                &Type::INT2 => Self::get_i16,
857                &Type::INT4 => Self::get_i32,
858                &Type::FLOAT4 => Self::get_f32,
859                &Type::FLOAT8 => Self::get_f64,
860                &Type::TIMESTAMPTZ => Self::get_date,
861                u => {
862                    Log::warning(614, Some(format!("Type: {}", u)));
863                    Self::get_unknown
864                }
865            };
866            columns.insert(crate::fnv1a_64(col.name().as_bytes()), (idx, func));
867        }
868        columns
869    }
870
871    /// Unknown Row type to Data::None
872    #[inline]
873    fn get_unknown(_: &Row, _: usize) -> Data {
874        Data::None
875    }
876
877    /// Row::i16 to Data::I16
878    #[inline]
879    fn get_i16(row: &Row, idx: usize) -> Data {
880        let i: Option<i16> = row.get(idx);
881        match i {
882            Some(i) => Data::I16(i),
883            None => Data::None,
884        }
885    }
886
887    /// Row::i32 to Data::I32
888    #[inline]
889    fn get_i32(row: &Row, idx: usize) -> Data {
890        let i: Option<i32> = row.get(idx);
891        match i {
892            Some(i) => Data::I32(i),
893            None => Data::None,
894        }
895    }
896
897    /// Row::i64 to Data::I64
898    #[inline]
899    fn get_i64(row: &Row, idx: usize) -> Data {
900        let i: Option<i64> = row.get(idx);
901        match i {
902            Some(i) => Data::I64(i),
903            None => Data::None,
904        }
905    }
906
907    /// Row::f32 to Data::F32
908    #[inline]
909    fn get_f32(row: &Row, idx: usize) -> Data {
910        let f: Option<f32> = row.get(idx);
911        match f {
912            Some(f) => Data::F32(f),
913            None => Data::None,
914        }
915    }
916
917    /// Row::f64 to Data::F64
918    #[inline]
919    fn get_f64(row: &Row, idx: usize) -> Data {
920        let f: Option<f64> = row.get(idx);
921        match f {
922            Some(f) => Data::F64(f),
923            None => Data::None,
924        }
925    }
926
927    /// Row::DateTime<Utc> to Data::DateTime<Utc>
928    #[inline]
929    fn get_date(row: &Row, idx: usize) -> Data {
930        let d: Option<DateTime<Utc>> = row.get(idx);
931        match d {
932            Some(d) => Data::Date(d),
933            None => Data::None,
934        }
935    }
936
937    /// Row::Uuid to Data::String
938    #[inline]
939    fn get_uuid(row: &Row, idx: usize) -> Data {
940        let u: Option<uuid::Uuid> = row.get(idx);
941        match u {
942            Some(u) => Data::String(u.to_string()),
943            None => Data::None,
944        }
945    }
946
947    /// Row::Json to Data::Json
948    #[inline]
949    fn get_json(row: &Row, idx: usize) -> Data {
950        let j: Option<Value> = row.get(idx);
951        match j {
952            Some(j) => Data::Json(j),
953            None => Data::None,
954        }
955    }
956
957    /// Row::String to Data::String
958    #[inline]
959    fn get_string(row: &Row, idx: usize) -> Data {
960        let s: Option<String> = row.get(idx);
961        match s {
962            Some(s) => Data::String(s),
963            None => Data::None,
964        }
965    }
966
967    /// Row::Vec<u8> to Data::Raw
968    #[inline]
969    fn get_bytea(row: &Row, idx: usize) -> Data {
970        let r: Option<Vec<u8>> = row.get(idx);
971        match r {
972            Some(r) => Data::Raw(r),
973            None => Data::None,
974        }
975    }
976
977    /// Row::Bool to Data::Bool
978    #[inline]
979    fn get_bool(row: &Row, idx: usize) -> Data {
980        let b: Option<bool> = row.get(idx);
981        match b {
982            Some(b) => Data::Bool(b),
983            None => Data::None,
984        }
985    }
986}
987
988impl std::fmt::Debug for PgSql {
989    /// Formats the value using the given formatter.
990    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
991        let tls = self.tls.clone().map(|_| "TlsConnector");
992        let PgSql { client, sql_conn, tls: _, prepare } = self;
993        f.debug_struct("DB").field("client", &client).field("sql_conn", &sql_conn).field("tls", &tls).field("prepare", &prepare).finish()
994    }
995}
996
997impl std::fmt::Debug for PgStatement {
998    /// Formats the value using the given formatter.
999    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1000        let PgStatement { statement, sql } = self;
1001        f.debug_struct("PgStatement")
1002            .field("sql", &sql)
1003            .field("columns", &statement.columns())
1004            .field("params", &statement.params())
1005            .finish()
1006    }
1007}
1008
1009#[derive(Clone)]
1010pub(crate) struct MakeRustlsConnect {
1011    config: Arc<ClientConfig>,
1012}
1013
1014impl MakeRustlsConnect {
1015    pub fn new(config: ClientConfig) -> Self {
1016        Self { config: Arc::new(config) }
1017    }
1018}
1019
1020impl<S> MakeTlsConnect<S> for MakeRustlsConnect
1021where
1022    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1023{
1024    type Stream = RustlsStream<S>;
1025    type TlsConnect = RustlsConnect;
1026    type Error = rustls::pki_types::InvalidDnsNameError;
1027
1028    fn make_tls_connect(&mut self, hostname: &str) -> Result<RustlsConnect, Self::Error> {
1029        ServerName::try_from(hostname).map(|dns_name| {
1030            RustlsConnect(RustlsConnectData {
1031                hostname: dns_name.to_owned(),
1032                connector: Arc::clone(&self.config).into(),
1033            })
1034        })
1035    }
1036}
1037
1038pub(crate) struct RustlsConnect(RustlsConnectData);
1039
1040struct RustlsConnectData {
1041    hostname: ServerName<'static>,
1042    connector: TlsConnector,
1043}
1044
1045impl<S> TlsConnect<S> for RustlsConnect
1046where
1047    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1048{
1049    type Stream = RustlsStream<S>;
1050    type Error = io::Error;
1051    type Future = Pin<Box<dyn Future<Output = io::Result<RustlsStream<S>>> + Send>>;
1052
1053    fn connect(self, stream: S) -> Self::Future {
1054        Box::pin(async move { self.0.connector.connect(self.0.hostname, stream).await.map(|s| RustlsStream(Box::pin(s))) })
1055    }
1056}
1057
1058pub(crate) struct RustlsStream<S>(Pin<Box<TlsStream<S>>>);
1059
1060impl<S> tokio_postgres::tls::TlsStream for RustlsStream<S>
1061where
1062    S: AsyncRead + AsyncWrite + Unpin,
1063{
1064    fn channel_binding(&self) -> ChannelBinding {
1065        let (_, session) = self.0.get_ref();
1066        match session.peer_certificates() {
1067            Some(certs) if !certs.is_empty() => X509Certificate::from_der(&certs[0])
1068                .ok()
1069                .and_then(|cert| cert.signature_algorithm())
1070                .map(|algorithm| match algorithm {
1071                    RsaSha1 | RsaSha256 | EcdsaSha256 => &digest::SHA256,
1072                    RsaSha384 | EcdsaSha384 => &digest::SHA384,
1073                    RsaSha512 | Ed25519 => &digest::SHA512,
1074                    NoSignature(algo) => match algo {
1075                        Sha1 | Sha256 => &digest::SHA256,
1076                        Sha384 => &digest::SHA384,
1077                        Sha512 => &digest::SHA512,
1078                    },
1079                })
1080                .map(|algorithm| {
1081                    let hash = digest::digest(algorithm, certs[0].as_ref());
1082                    ChannelBinding::tls_server_end_point(hash.as_ref().into())
1083                })
1084                .unwrap_or_else(ChannelBinding::none),
1085            _ => ChannelBinding::none(),
1086        }
1087    }
1088}
1089
1090impl<S> AsyncRead for RustlsStream<S>
1091where
1092    S: AsyncRead + AsyncWrite + Unpin,
1093{
1094    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
1095        self.0.as_mut().poll_read(cx, buf)
1096    }
1097}
1098
1099impl<S> AsyncWrite for RustlsStream<S>
1100where
1101    S: AsyncRead + AsyncWrite + Unpin,
1102{
1103    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<tokio::io::Result<usize>> {
1104        self.0.as_mut().poll_write(cx, buf)
1105    }
1106
1107    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>> {
1108        self.0.as_mut().poll_flush(cx)
1109    }
1110
1111    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>> {
1112        self.0.as_mut().poll_shutdown(cx)
1113    }
1114}