1use std::{
2 collections::{btree_map::Entry, BTreeMap},
3 future::Future,
4 io,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9
10use chrono::{DateTime, Utc};
11use futures_util::{pin_mut, TryStreamExt};
12use postgres::{
13 tls::{ChannelBinding, MakeTlsConnect, TlsConnect},
14 types::{ToSql, Type},
15 Column, Error, NoTls, Row, Statement, ToStatement,
16};
17use ring::digest;
18use rustls::{pki_types::ServerName, ClientConfig, RootCertStore};
19use serde_json::Value;
20use tiny_web_macro::fnv1a_64;
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22use tokio_postgres::Client;
23use tokio_rustls::{client::TlsStream, TlsConnector};
24use x509_certificate::{
25 DigestAlgorithm::{Sha1, Sha256, Sha384, Sha512},
26 SignatureAlgorithm::{EcdsaSha256, EcdsaSha384, Ed25519, NoSignature, RsaSha1, RsaSha256, RsaSha384, RsaSha512},
27 X509Certificate,
28};
29
30use crate::sys::{data::Data, init::DBConfig, log::Log};
31
32use super::adapter::{KeyOrQuery, NoCertificateVerification, StrOrI64OrUSize};
33
34#[derive(Debug)]
36enum DBResult {
37 Vec(Vec<Row>),
39 Void,
41 NoClient,
43 ErrQuery(String),
45 ErrConnect(String),
47 ErrPrepare,
49}
50
51pub(crate) struct PgSql {
53 client: Option<Client>,
55 sql_conn: tokio_postgres::Config,
57 tls: Option<MakeRustlsConnect>,
59 prepare: BTreeMap<i64, PgStatement>,
61}
62
63pub(crate) struct PgStatement {
65 statement: Statement,
67 sql: String,
69}
70
71type PgColumnName = (usize, fn(&Row, usize) -> Data);
73
74impl PgSql {
75 pub fn new(config: Arc<DBConfig>) -> Option<PgSql> {
77 let (sql_conn, tls) = match PgSql::create_connect_string(&config) {
78 Ok(v) => v,
79 Err(e) => {
80 Log::stop(609, Some(e.to_string()));
81 return None;
82 }
83 };
84
85 Some(PgSql {
86 client: None,
87 sql_conn,
88 tls,
89 prepare: BTreeMap::new(),
90 })
91 }
92
93 fn create_connect_string(config: &DBConfig) -> Result<(tokio_postgres::Config, Option<MakeRustlsConnect>), Error> {
94 let mut conn_str = String::with_capacity(512);
95 conn_str.push_str("host='");
97 conn_str.push_str(&config.host);
98 conn_str.push_str("' ");
99 if let Some(p) = &config.port {
101 conn_str.push_str("port='");
102 conn_str.push_str(&p.to_string());
103 conn_str.push_str("' ");
104 }
105 conn_str.push_str("dbname='");
107 conn_str.push_str(&config.name);
108 conn_str.push_str("' ");
109 if let Some(u) = &config.user {
111 conn_str.push_str("user='");
112 conn_str.push_str(u);
113 conn_str.push_str("' ");
114 }
115 if let Some(p) = &config.pwd {
117 conn_str.push_str("password='");
118 conn_str.push_str(p);
119 conn_str.push_str("' ");
120 }
121 if config.sslmode {
123 conn_str.push_str("sslmode=require ");
124 }
125 conn_str.push_str("connect_timeout=1 ");
127 conn_str.push_str("application_name='");
129 conn_str.push_str(env!("CARGO_PKG_NAME"));
130 conn_str.push(' ');
131 conn_str.push_str(env!("CARGO_PKG_VERSION"));
132 conn_str.push_str("' ");
133 conn_str.push_str("options='--client_encoding=UTF8'");
135
136 let sql_conn: tokio_postgres::Config = match conn_str.parse() {
137 Ok(c) => c,
138 Err(e) => return Err(e),
139 };
140 let tls = if config.sslmode {
141 let mut config = ClientConfig::builder().with_root_certificates(RootCertStore::empty()).with_no_client_auth();
142 config.dangerous().set_certificate_verifier(Arc::new(NoCertificateVerification {}));
143 Some(MakeRustlsConnect::new(config))
144 } else {
145 None
146 };
147
148 Ok((sql_conn, tls))
149 }
150
151 pub(crate) async fn check_db(config: &DBConfig, sql: Option<Vec<String>>) -> Result<String, String> {
152 let (sql_conn, tls) = match PgSql::create_connect_string(config) {
153 Ok(v) => v,
154 Err(e) => return Err(e.to_string()),
155 };
156 let client = match tls {
157 Some(tls) => match sql_conn.connect(tls).await {
158 Ok((client, connection)) => {
159 tokio::spawn(async move {
160 let _ = connection.await;
161 });
162 client
163 }
164 Err(e) => return Err(e.to_string()),
165 },
166 None => match sql_conn.connect(NoTls).await {
167 Ok((client, connection)) => {
168 tokio::spawn(async move {
169 let _ = connection.await;
170 });
171 client
172 }
173 Err(e) => return Err(e.to_string()),
174 },
175 };
176 if let Some(sqls) = sql {
177 for q in sqls {
178 if let Err(e) = client.query(&q, &[]).await {
179 return Err(e.to_string());
180 }
181 }
182 }
183 let row = match client.query_one("SELECT now()::text", &[]).await {
184 Ok(r) => r,
185 Err(e) => return Err(e.to_string()),
186 };
187 let time: &str = row.get(0);
188
189 Ok(time.to_owned())
190 }
191
192 pub async fn connect(&mut self) -> bool {
194 match &self.client {
195 Some(c) => {
196 if c.is_closed() {
197 self.try_connect().await
198 } else {
199 true
200 }
201 }
202 None => self.try_connect().await,
203 }
204 }
205
206 async fn try_connect(&mut self) -> bool {
208 match self.tls.clone() {
209 Some(tls) => match self.sql_conn.connect(tls).await {
210 Ok((client, connection)) => {
211 tokio::spawn(async move {
212 if let Err(e) = connection.await {
213 Log::stop(612, Some(e.to_string()));
214 }
215 });
216 self.client = Some(client);
217 }
218 Err(e) => {
219 Log::warning(601, Some(format!("Error: {} => {:?}", e, &self.sql_conn)));
220 return false;
221 }
222 },
223 None => match self.sql_conn.connect(NoTls).await {
224 Ok((client, connection)) => {
225 tokio::spawn(async move {
226 if let Err(e) = connection.await {
227 Log::warning(612, Some(e.to_string()));
228 }
229 });
230 self.client = Some(client);
231 }
232 Err(e) => {
233 Log::warning(601, Some(format!("Error: {} => {:?}", e, &self.sql_conn)));
234 return false;
235 }
236 },
237 }
238 self.prepare().await
239 }
240
241 async fn prepare(&mut self) -> bool {
243 self.prepare.clear();
244 match &self.client {
245 Some(client) => {
246 let mut map = BTreeMap::new();
247
248 let sql = r#"
250 SELECT lang_id, code, name, index
251 FROM lang
252 WHERE enable
253 ORDER BY sort
254 "#;
255 map.insert(fnv1a_64!("lib_get_langs"), (client.prepare_typed(sql, &[]), sql.to_owned()));
256
257 let sql = r#"
259 SELECT lang_id, code, name, index
260 FROM lang
261 ORDER BY index, sort
262 "#;
263 map.insert(fnv1a_64!("lib_get_all_langs"), (client.prepare_typed(sql, &[]), sql.to_owned()));
264
265 let sql = r#"
267 WITH upd AS (
268 UPDATE session
269 SET
270 last = now()
271 WHERE
272 session=$1
273 RETURNING session_id, user_id, data, lang_id
274 )
275 SELECT
276 s.session_id, s.user_id, u.role_id, s.data, s.lang_id
277 FROM
278 upd s
279 INNER JOIN "user" u ON u.user_id=s.user_id
280 "#;
281 map.insert(fnv1a_64!("lib_get_session"), (client.prepare_typed(sql, &[Type::TEXT]), sql.to_owned()));
282
283 let sql = r#"
285 UPDATE session
286 SET
287 user_id=$1,
288 lang_id=$2,
289 data=$3,
290 last=now(),
291 ip=$4,
292 user_agent=$5
293 WHERE
294 session_id=$6
295 "#;
296 map.insert(
297 fnv1a_64!("lib_set_session"),
298 (client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::BYTEA, Type::TEXT, Type::TEXT, Type::INT8]), sql.to_owned()),
299 );
300
301 let sql = r#"
303 INSERT INTO session (user_id, lang_id, session, data, created, last, ip, user_agent)
304 SELECT $1, $2, $3, $4, now(), now(), $5, $6
305 "#;
306 map.insert(
307 fnv1a_64!("lib_add_session"),
308 (client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::TEXT, Type::BYTEA, Type::TEXT, Type::TEXT]), sql.to_owned()),
309 );
310
311 let sql = r#"
313 SELECT redirect, permanently FROM redirect WHERE url=$1
314 "#;
315 map.insert(fnv1a_64!("lib_get_redirect"), (client.prepare_typed(sql, &[Type::TEXT]), sql.to_owned()));
316
317 let sql = r#"
319 SELECT
320 c.module, c.class, c.action,
321 c.module_id, c.class_id, c.action_id,
322 r.params, r.lang_id
323 FROM
324 route r
325 INNER JOIN controller c ON c.controller_id=r.controller_id
326 WHERE r.url=$1
327 "#;
328 map.insert(fnv1a_64!("lib_get_route"), (client.prepare_typed(sql, &[Type::TEXT]), sql.to_owned()));
329
330 let sql = r#"
332 SELECT r.url
333 FROM
334 controller c
335 INNER JOIN route r ON
336 r.controller_id=c.controller_id AND r.lang_id=$5 AND r.params = $4
337 WHERE
338 c.module_id=$1 AND c.class_id=$2 AND c.action_id=$3
339 "#;
340 map.insert(
341 fnv1a_64!("lib_get_url"),
342 (client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::INT8, Type::TEXT, Type::INT8]), sql.to_owned()),
343 );
344
345 let sql = r#"
347 SELECT COALESCE(MAX(a.access::int), 0) AS access
348 FROM
349 access a
350 INNER JOIN "user" u ON u.role_id=a.role_id
351 INNER JOIN controller c ON a.controller_id=c.controller_id
352 WHERE
353 a.access AND a.role_id=$1 AND (
354 (c.module_id=-3750763034362895579 AND c.class_id=-3750763034362895579 AND c.action_id=-3750763034362895579)
355 OR (c.module_id=$2 AND c.class_id=-3750763034362895579 AND c.action_id=-3750763034362895579)
356 OR (c.module_id=$3 AND c.class_id=$5 AND c.action_id=-3750763034362895579)
357 OR (c.module_id=$4 AND c.class_id=$6 AND c.action_id=$7)
358 )
359 "#;
360 map.insert(
361 fnv1a_64!("lib_get_auth"),
362 (
363 client.prepare_typed(sql, &[Type::INT8, Type::INT8, Type::INT8, Type::INT8, Type::INT8, Type::INT8, Type::INT8]),
364 sql.to_owned(),
365 ),
366 );
367
368 let sql = r#"
370 SELECT url
371 FROM route
372 WHERE controller_id=3 AND lang_id=$1
373 "#;
374 map.insert(fnv1a_64!("lib_get_not_found"), (client.prepare_typed(sql, &[Type::INT8]), sql.to_owned()));
375
376 let sql = r#"
378 SELECT data FROM setting WHERE key=$1
379 "#;
380 map.insert(fnv1a_64!("lib_get_setting"), (client.prepare_typed(sql, &[Type::INT8]), sql.to_owned()));
381
382 let sql = r#"
384 INSERT INTO mail(user_id, mail, "create", err, transport)
385 VALUES ($1, $2, now(), false, $3)
386 RETURNING mail_id;
387 "#;
388 map.insert(fnv1a_64!("lib_mail_new"), (client.prepare_typed(sql, &[Type::INT8, Type::JSON, Type::TEXT]), sql.to_owned()));
389
390 let sql = r#"
392 INSERT INTO mail(user_id, mail, "create", send, err, transport)
393 VALUES ($1, $2, now(), now(), false, 'None')
394 "#;
395 map.insert(fnv1a_64!("lib_mail_add"), (client.prepare_typed(sql, &[Type::INT8, Type::JSON]), sql.to_owned()));
396
397 let sql = r#"
399 UPDATE mail
400 SET err=true, send=now(), err_text=$1
401 WHERE mail_id=$2
402 "#;
403 map.insert(fnv1a_64!("lib_mail_err"), (client.prepare_typed(sql, &[Type::TEXT, Type::INT8]), sql.to_owned()));
404
405 let sql = r#"
407 UPDATE mail
408 SET err=false, send=now()
409 WHERE mail_id=$1
410 "#;
411 map.insert(fnv1a_64!("lib_mail_ok"), (client.prepare_typed(sql, &[Type::INT8]), sql.to_owned()));
412
413 for (key, (prepare, sql)) in map {
415 match prepare.await {
416 Ok(s) => {
417 self.prepare.insert(key, PgStatement { statement: s, sql });
418 }
419 Err(e) => {
420 Log::stop(613, Some(format!("Error={}. sql={}", e, sql)));
421 return false;
422 }
423 }
424 }
425 true
426 }
427 None => false,
428 }
429 }
430
431 async fn query_raw<T>(client: &Option<tokio_postgres::Client>, query: &T, params: &[&(dyn ToSql + Sync)]) -> DBResult
433 where
434 T: ?Sized + ToStatement,
435 {
436 match client {
437 Some(sql) => match sql.query_raw(query, PgSql::slice_iter(params)).await {
438 Ok(res) => {
439 pin_mut!(res);
440 match res.try_next().await {
441 Ok(row) => match row {
442 Some(r) => {
443 let mut result = match res.rows_affected() {
444 Some(s) => Vec::with_capacity(s as usize),
445 None => Vec::new(),
446 };
447 result.push(r);
448 loop {
449 match res.try_next().await {
450 Ok(row) => match row {
451 Some(r) => {
452 result.push(r);
453 }
454 None => break,
455 },
456 Err(e) => return DBResult::ErrQuery(e.to_string()),
457 }
458 }
459 DBResult::Vec(result)
460 }
461 None => DBResult::Void,
462 },
463 Err(e) => DBResult::ErrQuery(e.to_string()),
464 }
465 }
466 Err(e) => {
467 if e.is_closed() {
468 DBResult::ErrConnect(e.to_string())
469 } else {
470 DBResult::ErrQuery(e.to_string())
471 }
472 }
473 },
474 None => DBResult::NoClient,
475 }
476 }
477
478 async fn execute_raw<T>(client: &Option<tokio_postgres::Client>, query: &T, params: &[&(dyn ToSql + Sync)]) -> DBResult
480 where
481 T: ?Sized + ToStatement,
482 {
483 match client {
484 Some(sql) => match sql.execute_raw(query, PgSql::slice_iter(params)).await {
485 Ok(_) => DBResult::Void,
486 Err(e) => {
487 if e.is_closed() {
488 DBResult::ErrConnect(e.to_string())
489 } else {
490 DBResult::ErrQuery(e.to_string())
491 }
492 }
493 },
494 None => DBResult::NoClient,
495 }
496 }
497
498 fn slice_iter<'a>(s: &'a [&'a (dyn ToSql + Sync)]) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
500 s.iter().map(|s| *s as _)
501 }
502
503 async fn query_statement(&self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)]) -> DBResult {
505 if query.is_key() {
506 let stat = match self.prepare.get(&query.to_i64()) {
507 Some(s) => s,
508 None => return DBResult::ErrPrepare,
509 };
510 PgSql::query_raw(&self.client, &stat.statement, params).await
511 } else {
512 PgSql::query_raw(&self.client, query.to_str(), params).await
513 }
514 }
515
516 async fn execute_statement(&self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)]) -> DBResult {
518 if query.is_key() {
519 let stat = match self.prepare.get(&query.to_i64()) {
520 Some(s) => s,
521 None => return DBResult::ErrPrepare,
522 };
523 PgSql::execute_raw(&self.client, &stat.statement, params).await
524 } else {
525 PgSql::execute_raw(&self.client, query.to_str(), params).await
526 }
527 }
528
529 pub async fn execute(&mut self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)]) -> Option<()> {
531 match self.execute_statement(query, params).await {
532 DBResult::Void | DBResult::Vec(_) => return Some(()),
533 DBResult::ErrQuery(e) => {
534 if query.is_key() {
535 Log::warning(602, Some(format!("Statement key={} error={}", query.to_i64(), e)));
536 } else {
537 Log::warning(602, Some(format!("{} error={}", query.to_str(), e)));
538 }
539 return None;
540 }
541 DBResult::ErrPrepare => {
542 Log::warning(615, Some(format!("{:?}", query.to_i64())));
543 return None;
544 }
545 DBResult::NoClient => Log::warning(604, None),
546 DBResult::ErrConnect(e) => Log::warning(603, Some(e)),
547 };
548 self.client = None;
549 if self.try_connect().await {
550 match self.execute_statement(query, params).await {
551 DBResult::Void | DBResult::Vec(_) => return Some(()),
552 _ => {}
553 }
554 }
555 None
556 }
557
558 pub async fn query(&mut self, query: &impl KeyOrQuery, params: &[&(dyn ToSql + Sync)], assoc: bool) -> Option<Vec<Data>> {
560 match self.query_statement(query, params).await {
561 DBResult::Vec(rows) => return Some(self.convert(rows, assoc)),
562 DBResult::Void => return Some(Vec::new()),
563 DBResult::ErrQuery(e) => {
564 if query.is_key() {
565 Log::warning(602, Some(format!("Statement key={} error={}", query.to_i64(), e)));
566 } else {
567 Log::warning(602, Some(format!("{} error={}", query.to_str(), e)));
568 }
569 return None;
570 }
571 DBResult::ErrPrepare => {
572 Log::warning(615, Some(format!("{:?}", query.to_i64())));
573 return None;
574 }
575 DBResult::NoClient => Log::warning(604, None),
576 DBResult::ErrConnect(e) => Log::warning(603, Some(e)),
577 };
578 self.client = None;
579 if self.try_connect().await {
580 match self.query_statement(query, params).await {
581 DBResult::Vec(rows) => return Some(self.convert(rows, assoc)),
582 DBResult::Void => return Some(Vec::new()),
583 _ => {}
584 }
585 }
586 None
587 }
588
589 pub async fn query_group(
592 &mut self,
593 query: &impl KeyOrQuery,
594 params: &[&(dyn ToSql + Sync)],
595 assoc: bool,
596 conds: &[&[impl StrOrI64OrUSize]],
597 ) -> Option<Data> {
598 if conds.is_empty() {
599 return None;
600 }
601 match self.query_statement(query, params).await {
602 DBResult::Vec(rows) => {
603 if rows.is_empty() {
604 return Some(Data::Map(BTreeMap::new()));
605 }
606 if assoc {
607 return Some(self.convert_map(rows, conds));
608 } else {
609 return Some(self.convert_vec(rows, conds));
610 }
611 }
612 DBResult::Void => return Some(Data::Map(BTreeMap::new())),
613 DBResult::ErrQuery(e) => {
614 if query.is_key() {
615 Log::warning(602, Some(format!("Statement key={} error={}", query.to_i64(), e)));
616 } else {
617 Log::warning(602, Some(format!("{} error={}", query.to_str(), e)));
618 }
619 return None;
620 }
621 DBResult::ErrPrepare => {
622 Log::warning(615, Some(format!("{:?}", query.to_i64())));
623 return None;
624 }
625 DBResult::NoClient => Log::warning(604, None),
626 DBResult::ErrConnect(e) => Log::warning(603, Some(e)),
627 };
628 self.client = None;
629 if self.try_connect().await {
630 match self.query_statement(query, params).await {
631 DBResult::Vec(rows) => {
632 if rows.is_empty() {
633 return Some(Data::Map(BTreeMap::new()));
634 }
635 if assoc {
636 return Some(self.convert_map(rows, conds));
637 } else {
638 return Some(self.convert_vec(rows, conds));
639 }
640 }
641 DBResult::Void => return Some(Data::Map(BTreeMap::new())),
642 _ => {}
643 }
644 }
645 Some(Data::Map(BTreeMap::new()))
646 }
647
648 fn convert_map(&self, rows: Vec<Row>, conds: &[&[impl StrOrI64OrUSize]]) -> Data {
650 let mut map = BTreeMap::new();
651 let cols = unsafe { rows.get_unchecked(0) }.columns();
652 let columns = self.get_column_type_name(cols);
653 for row in &rows {
654 let mut item = &mut map;
655 for row_conds in conds {
656 if row_conds.is_empty() {
657 break;
658 }
659 item = match self.fill_map(row, &columns, row_conds, item) {
660 Some(i) => i,
661 None => break,
662 };
663 }
664 }
665 Data::Map(map)
666 }
667
668 fn convert_vec(&self, rows: Vec<Row>, conds: &[&[impl StrOrI64OrUSize]]) -> Data {
670 let mut map = BTreeMap::new();
671 let cols = unsafe { rows.get_unchecked(0) }.columns();
672 let columns = self.get_column_type(cols);
673 for row in &rows {
674 let mut item = &mut map;
675 for row_conds in conds {
676 if row_conds.is_empty() {
677 break;
678 }
679 item = match self.fill_vec(row, &columns, row_conds, item) {
680 Some(i) => i,
681 None => break,
682 };
683 }
684 }
685 Data::Map(map)
686 }
687
688 fn fill_map<'a>(
690 &self,
691 row: &Row,
692 columns: &BTreeMap<i64, PgColumnName>,
693 conds: &[impl StrOrI64OrUSize],
694 map: &'a mut BTreeMap<i64, Data>,
695 ) -> Option<&'a mut BTreeMap<i64, Data>> {
696 let mut index = unsafe { conds.get_unchecked(0) }.to_i64();
697 if index == 0 {
698 return None;
699 }
700 let (idx, func) = match columns.get(&index) {
701 Some(f) => f,
702 None => return None,
703 };
704 let val = if let Data::I64(val) = func(row, *idx) {
705 val
706 } else {
707 return None;
708 };
709 let res_map = match map.entry(val) {
710 Entry::Vacant(v) => {
711 let mut new_map = BTreeMap::new();
712 new_map.insert(index, Data::I64(val));
713 let mut turple;
714 for item in &conds[1..] {
715 index = item.to_i64();
716 if index == 0 {
717 return None;
718 }
719 turple = match columns.get(&index) {
720 Some(f) => f,
721 None => return None,
722 };
723 new_map.insert(index, turple.1(row, turple.0));
724 }
725 new_map.insert(fnv1a_64!("sub"), Data::Map(BTreeMap::new()));
726 v.insert(Data::Map(new_map))
727 }
728 Entry::Occupied(o) => o.into_mut(),
729 };
730 if let Data::Map(found_map) = res_map {
731 if let Data::Map(submap) = found_map.get_mut(&fnv1a_64!("sub"))? {
732 Some(submap)
733 } else {
734 None
735 }
736 } else {
737 None
738 }
739 }
740
741 fn fill_vec<'a>(
743 &self,
744 row: &Row,
745 columns: &[fn(&Row, usize) -> Data],
746 conds: &[impl StrOrI64OrUSize],
747 map: &'a mut BTreeMap<i64, Data>,
748 ) -> Option<&'a mut BTreeMap<i64, Data>> {
749 let mut index = unsafe { conds.get_unchecked(0) }.to_usize();
750 if index == usize::MAX {
751 return None;
752 }
753 let mut func = unsafe { columns.get_unchecked(index) };
754 let val = if let Data::I64(val) = func(row, index) {
755 val
756 } else {
757 return None;
758 };
759 let res_map = match map.entry(val) {
760 Entry::Vacant(v) => {
761 let mut new_vec = Vec::with_capacity(conds.len() + 1);
762 new_vec.push(Data::I64(val));
763 for item in &conds[1..] {
764 index = item.to_usize();
765 if index == usize::MAX {
766 return None;
767 }
768 func = unsafe { columns.get_unchecked(index) };
769 new_vec.push(func(row, index));
770 }
771 new_vec.push(Data::Map(BTreeMap::new()));
772 v.insert(Data::Vec(new_vec))
773 }
774 Entry::Occupied(o) => o.into_mut(),
775 };
776 if let Data::Vec(found_vec) = res_map {
777 if let Data::Map(submap) = found_vec.last_mut()? {
778 Some(submap)
779 } else {
780 None
781 }
782 } else {
783 None
784 }
785 }
786
787 fn convert(&self, rows: Vec<Row>, assoc: bool) -> Vec<Data> {
789 let mut vec = Vec::with_capacity(rows.len());
790 let cols = unsafe { rows.get_unchecked(0) }.columns();
791 if !assoc {
792 let columns = self.get_column_type(cols);
793 let mut func;
794 for row in &rows {
795 let mut v = Vec::with_capacity(columns.len());
796 for idx in 0..columns.len() {
797 func = unsafe { columns.get_unchecked(idx) };
798 v.push(func(row, idx))
799 }
800 vec.push(Data::Vec(v));
801 }
802 } else {
803 let columns = self.get_column_type_name(cols);
804 for row in &rows {
805 let mut t = BTreeMap::new();
806 for (name, turple) in &columns {
807 t.insert(*name, turple.1(row, turple.0));
808 }
809 vec.push(Data::Map(t));
810 }
811 }
812 vec
813 }
814
815 fn get_column_type<'a>(&self, cols: &'a [Column]) -> Vec<fn(&Row, usize) -> Data> {
817 let mut columns = Vec::with_capacity(cols.len());
818 for col in cols {
819 let func = match col.type_() {
820 &Type::BOOL => Self::get_bool,
821 &Type::BYTEA => Self::get_bytea,
822 &Type::TEXT => Self::get_string,
823 &Type::JSON => Self::get_json,
824 &Type::JSONB => Self::get_json,
825 &Type::UUID => Self::get_uuid,
826 &Type::VARCHAR => Self::get_string,
827 &Type::INT8 => Self::get_i64,
828 &Type::INT2 => Self::get_i16,
829 &Type::INT4 => Self::get_i32,
830 &Type::FLOAT4 => Self::get_f32,
831 &Type::FLOAT8 => Self::get_f64,
832 &Type::TIMESTAMPTZ => Self::get_date,
833 u => {
834 Log::warning(614, Some(format!("Type: {}", u)));
835 Self::get_unknown
836 }
837 };
838 columns.push(func);
839 }
840 columns
841 }
842
843 fn get_column_type_name(&self, cols: &[Column]) -> BTreeMap<i64, PgColumnName> {
845 let mut columns = BTreeMap::new();
846 for (idx, col) in cols.iter().enumerate() {
847 let func = match col.type_() {
848 &Type::BOOL => Self::get_bool,
849 &Type::BYTEA => Self::get_bytea,
850 &Type::TEXT => Self::get_string,
851 &Type::JSON => Self::get_json,
852 &Type::JSONB => Self::get_json,
853 &Type::UUID => Self::get_uuid,
854 &Type::VARCHAR => Self::get_string,
855 &Type::INT8 => Self::get_i64,
856 &Type::INT2 => Self::get_i16,
857 &Type::INT4 => Self::get_i32,
858 &Type::FLOAT4 => Self::get_f32,
859 &Type::FLOAT8 => Self::get_f64,
860 &Type::TIMESTAMPTZ => Self::get_date,
861 u => {
862 Log::warning(614, Some(format!("Type: {}", u)));
863 Self::get_unknown
864 }
865 };
866 columns.insert(crate::fnv1a_64(col.name().as_bytes()), (idx, func));
867 }
868 columns
869 }
870
871 #[inline]
873 fn get_unknown(_: &Row, _: usize) -> Data {
874 Data::None
875 }
876
877 #[inline]
879 fn get_i16(row: &Row, idx: usize) -> Data {
880 let i: Option<i16> = row.get(idx);
881 match i {
882 Some(i) => Data::I16(i),
883 None => Data::None,
884 }
885 }
886
887 #[inline]
889 fn get_i32(row: &Row, idx: usize) -> Data {
890 let i: Option<i32> = row.get(idx);
891 match i {
892 Some(i) => Data::I32(i),
893 None => Data::None,
894 }
895 }
896
897 #[inline]
899 fn get_i64(row: &Row, idx: usize) -> Data {
900 let i: Option<i64> = row.get(idx);
901 match i {
902 Some(i) => Data::I64(i),
903 None => Data::None,
904 }
905 }
906
907 #[inline]
909 fn get_f32(row: &Row, idx: usize) -> Data {
910 let f: Option<f32> = row.get(idx);
911 match f {
912 Some(f) => Data::F32(f),
913 None => Data::None,
914 }
915 }
916
917 #[inline]
919 fn get_f64(row: &Row, idx: usize) -> Data {
920 let f: Option<f64> = row.get(idx);
921 match f {
922 Some(f) => Data::F64(f),
923 None => Data::None,
924 }
925 }
926
927 #[inline]
929 fn get_date(row: &Row, idx: usize) -> Data {
930 let d: Option<DateTime<Utc>> = row.get(idx);
931 match d {
932 Some(d) => Data::Date(d),
933 None => Data::None,
934 }
935 }
936
937 #[inline]
939 fn get_uuid(row: &Row, idx: usize) -> Data {
940 let u: Option<uuid::Uuid> = row.get(idx);
941 match u {
942 Some(u) => Data::String(u.to_string()),
943 None => Data::None,
944 }
945 }
946
947 #[inline]
949 fn get_json(row: &Row, idx: usize) -> Data {
950 let j: Option<Value> = row.get(idx);
951 match j {
952 Some(j) => Data::Json(j),
953 None => Data::None,
954 }
955 }
956
957 #[inline]
959 fn get_string(row: &Row, idx: usize) -> Data {
960 let s: Option<String> = row.get(idx);
961 match s {
962 Some(s) => Data::String(s),
963 None => Data::None,
964 }
965 }
966
967 #[inline]
969 fn get_bytea(row: &Row, idx: usize) -> Data {
970 let r: Option<Vec<u8>> = row.get(idx);
971 match r {
972 Some(r) => Data::Raw(r),
973 None => Data::None,
974 }
975 }
976
977 #[inline]
979 fn get_bool(row: &Row, idx: usize) -> Data {
980 let b: Option<bool> = row.get(idx);
981 match b {
982 Some(b) => Data::Bool(b),
983 None => Data::None,
984 }
985 }
986}
987
988impl std::fmt::Debug for PgSql {
989 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
991 let tls = self.tls.clone().map(|_| "TlsConnector");
992 let PgSql { client, sql_conn, tls: _, prepare } = self;
993 f.debug_struct("DB").field("client", &client).field("sql_conn", &sql_conn).field("tls", &tls).field("prepare", &prepare).finish()
994 }
995}
996
997impl std::fmt::Debug for PgStatement {
998 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1000 let PgStatement { statement, sql } = self;
1001 f.debug_struct("PgStatement")
1002 .field("sql", &sql)
1003 .field("columns", &statement.columns())
1004 .field("params", &statement.params())
1005 .finish()
1006 }
1007}
1008
1009#[derive(Clone)]
1010pub(crate) struct MakeRustlsConnect {
1011 config: Arc<ClientConfig>,
1012}
1013
1014impl MakeRustlsConnect {
1015 pub fn new(config: ClientConfig) -> Self {
1016 Self { config: Arc::new(config) }
1017 }
1018}
1019
1020impl<S> MakeTlsConnect<S> for MakeRustlsConnect
1021where
1022 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1023{
1024 type Stream = RustlsStream<S>;
1025 type TlsConnect = RustlsConnect;
1026 type Error = rustls::pki_types::InvalidDnsNameError;
1027
1028 fn make_tls_connect(&mut self, hostname: &str) -> Result<RustlsConnect, Self::Error> {
1029 ServerName::try_from(hostname).map(|dns_name| {
1030 RustlsConnect(RustlsConnectData {
1031 hostname: dns_name.to_owned(),
1032 connector: Arc::clone(&self.config).into(),
1033 })
1034 })
1035 }
1036}
1037
1038pub(crate) struct RustlsConnect(RustlsConnectData);
1039
1040struct RustlsConnectData {
1041 hostname: ServerName<'static>,
1042 connector: TlsConnector,
1043}
1044
1045impl<S> TlsConnect<S> for RustlsConnect
1046where
1047 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1048{
1049 type Stream = RustlsStream<S>;
1050 type Error = io::Error;
1051 type Future = Pin<Box<dyn Future<Output = io::Result<RustlsStream<S>>> + Send>>;
1052
1053 fn connect(self, stream: S) -> Self::Future {
1054 Box::pin(async move { self.0.connector.connect(self.0.hostname, stream).await.map(|s| RustlsStream(Box::pin(s))) })
1055 }
1056}
1057
1058pub(crate) struct RustlsStream<S>(Pin<Box<TlsStream<S>>>);
1059
1060impl<S> tokio_postgres::tls::TlsStream for RustlsStream<S>
1061where
1062 S: AsyncRead + AsyncWrite + Unpin,
1063{
1064 fn channel_binding(&self) -> ChannelBinding {
1065 let (_, session) = self.0.get_ref();
1066 match session.peer_certificates() {
1067 Some(certs) if !certs.is_empty() => X509Certificate::from_der(&certs[0])
1068 .ok()
1069 .and_then(|cert| cert.signature_algorithm())
1070 .map(|algorithm| match algorithm {
1071 RsaSha1 | RsaSha256 | EcdsaSha256 => &digest::SHA256,
1072 RsaSha384 | EcdsaSha384 => &digest::SHA384,
1073 RsaSha512 | Ed25519 => &digest::SHA512,
1074 NoSignature(algo) => match algo {
1075 Sha1 | Sha256 => &digest::SHA256,
1076 Sha384 => &digest::SHA384,
1077 Sha512 => &digest::SHA512,
1078 },
1079 })
1080 .map(|algorithm| {
1081 let hash = digest::digest(algorithm, certs[0].as_ref());
1082 ChannelBinding::tls_server_end_point(hash.as_ref().into())
1083 })
1084 .unwrap_or_else(ChannelBinding::none),
1085 _ => ChannelBinding::none(),
1086 }
1087 }
1088}
1089
1090impl<S> AsyncRead for RustlsStream<S>
1091where
1092 S: AsyncRead + AsyncWrite + Unpin,
1093{
1094 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
1095 self.0.as_mut().poll_read(cx, buf)
1096 }
1097}
1098
1099impl<S> AsyncWrite for RustlsStream<S>
1100where
1101 S: AsyncRead + AsyncWrite + Unpin,
1102{
1103 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<tokio::io::Result<usize>> {
1104 self.0.as_mut().poll_write(cx, buf)
1105 }
1106
1107 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>> {
1108 self.0.as_mut().poll_flush(cx)
1109 }
1110
1111 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>> {
1112 self.0.as_mut().poll_shutdown(cx)
1113 }
1114}