1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 pool::PoolConnection,
8 postgres::{PgConnectOptions, PgQueryResult, PgRow},
9 Connection, Executor, PgPool, Postgres,
10};
11
12use sea_query_binder::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection,
17 DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError,
18};
19
20use super::sqlx_common::*;
21
/// Unit type that namespaces the Postgres connector functions defined in this
/// file: checking whether a connection string is accepted, opening a pooled
/// connection, and wrapping an existing `PgPool`.
#[derive(Debug)]
pub struct SqlxPostgresConnector;
25
/// A sqlx Postgres connection pool paired with an optional metric callback.
///
/// `Debug` is implemented by hand elsewhere in this file and prints only the
/// pool, not the callback.
#[derive(Clone)]
pub struct SqlxPostgresPoolConnection {
    /// The sqlx pool that every method acquires its connection from.
    pub(crate) pool: PgPool,
    /// Callback handed to `crate::metric::metric!` around statement execution;
    /// `None` unless one has been installed via `set_metric_callback`.
    metric_callback: Option<crate::metric::Callback>,
}
32
33impl std::fmt::Debug for SqlxPostgresPoolConnection {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
36 }
37}
38
39impl From<PgPool> for SqlxPostgresPoolConnection {
40 fn from(pool: PgPool) -> Self {
41 SqlxPostgresPoolConnection {
42 pool,
43 metric_callback: None,
44 }
45 }
46}
47
48impl From<PgPool> for DatabaseConnection {
49 fn from(pool: PgPool) -> Self {
50 DatabaseConnection::SqlxPostgresPoolConnection(pool.into())
51 }
52}
53
54impl SqlxPostgresConnector {
55 pub fn accepts(string: &str) -> bool {
57 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
58 }
59
60 #[instrument(level = "trace")]
62 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
63 let mut sqlx_opts = options
64 .url
65 .parse::<PgConnectOptions>()
66 .map_err(sqlx_error_to_conn_err)?;
67 use sqlx::ConnectOptions;
68 if !options.sqlx_logging {
69 sqlx_opts = sqlx_opts.disable_statement_logging();
70 } else {
71 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
72 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
73 sqlx_opts = sqlx_opts.log_slow_statements(
74 options.sqlx_slow_statements_logging_level,
75 options.sqlx_slow_statements_logging_threshold,
76 );
77 }
78 }
79
80 if let Some(f) = &options.pg_opts_fn {
81 sqlx_opts = f(sqlx_opts);
82 }
83
84 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
85 let mut string = "SET search_path = ".to_owned();
86 if schema.starts_with('"') {
87 write!(&mut string, "{schema}").unwrap();
88 } else {
89 for (i, schema) in schema.split(',').enumerate() {
90 if i > 0 {
91 write!(&mut string, ",").unwrap();
92 }
93 if schema.starts_with('"') {
94 write!(&mut string, "{schema}").unwrap();
95 } else {
96 write!(&mut string, "\"{schema}\"").unwrap();
97 }
98 }
99 }
100 string
101 });
102 let lazy = options.connect_lazy;
103 let mut pool_options = options.sqlx_pool_options();
104 if let Some(sql) = set_search_path_sql {
105 pool_options = pool_options.after_connect(move |conn, _| {
106 let sql = sql.clone();
107 Box::pin(async move {
108 sqlx::Executor::execute(conn, sql.as_str())
109 .await
110 .map(|_| ())
111 })
112 });
113 }
114 let pool = if lazy {
115 pool_options.connect_lazy_with(sqlx_opts)
116 } else {
117 pool_options
118 .connect_with(sqlx_opts)
119 .await
120 .map_err(sqlx_error_to_conn_err)?
121 };
122 Ok(DatabaseConnection::SqlxPostgresPoolConnection(
123 SqlxPostgresPoolConnection {
124 pool,
125 metric_callback: None,
126 },
127 ))
128 }
129}
130
131impl SqlxPostgresConnector {
132 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
134 DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
135 pool,
136 metric_callback: None,
137 })
138 }
139}
140
141impl SqlxPostgresPoolConnection {
142 #[instrument(level = "trace")]
144 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
145 debug_print!("{}", stmt);
146
147 let query = sqlx_query(&stmt);
148 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
149 crate::metric::metric!(self.metric_callback, &stmt, {
150 match query.execute(&mut *conn).await {
151 Ok(res) => Ok(res.into()),
152 Err(err) => Err(sqlx_error_to_exec_err(err)),
153 }
154 })
155 }
156
157 #[instrument(level = "trace")]
159 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
160 debug_print!("{}", sql);
161
162 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
163 match conn.execute(sql).await {
164 Ok(res) => Ok(res.into()),
165 Err(err) => Err(sqlx_error_to_exec_err(err)),
166 }
167 }
168
169 #[instrument(level = "trace")]
171 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
172 debug_print!("{}", stmt);
173
174 let query = sqlx_query(&stmt);
175 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
176 crate::metric::metric!(self.metric_callback, &stmt, {
177 match query.fetch_one(&mut *conn).await {
178 Ok(row) => Ok(Some(row.into())),
179 Err(err) => match err {
180 sqlx::Error::RowNotFound => Ok(None),
181 _ => Err(sqlx_error_to_query_err(err)),
182 },
183 }
184 })
185 }
186
187 #[instrument(level = "trace")]
189 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
190 debug_print!("{}", stmt);
191
192 let query = sqlx_query(&stmt);
193 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
194 crate::metric::metric!(self.metric_callback, &stmt, {
195 match query.fetch_all(&mut *conn).await {
196 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
197 Err(err) => Err(sqlx_error_to_query_err(err)),
198 }
199 })
200 }
201
202 #[instrument(level = "trace")]
204 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
205 debug_print!("{}", stmt);
206
207 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
208 Ok(QueryStream::from((
209 conn,
210 stmt,
211 self.metric_callback.clone(),
212 )))
213 }
214
215 #[instrument(level = "trace")]
217 pub async fn begin(
218 &self,
219 isolation_level: Option<IsolationLevel>,
220 access_mode: Option<AccessMode>,
221 ) -> Result<DatabaseTransaction, DbErr> {
222 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
223 DatabaseTransaction::new_postgres(
224 conn,
225 self.metric_callback.clone(),
226 isolation_level,
227 access_mode,
228 )
229 .await
230 }
231
232 #[instrument(level = "trace", skip(callback))]
234 pub async fn transaction<F, T, E>(
235 &self,
236 callback: F,
237 isolation_level: Option<IsolationLevel>,
238 access_mode: Option<AccessMode>,
239 ) -> Result<T, TransactionError<E>>
240 where
241 F: for<'b> FnOnce(
242 &'b DatabaseTransaction,
243 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
244 + Send,
245 T: Send,
246 E: std::fmt::Display + std::fmt::Debug + Send,
247 {
248 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
249 let transaction = DatabaseTransaction::new_postgres(
250 conn,
251 self.metric_callback.clone(),
252 isolation_level,
253 access_mode,
254 )
255 .await
256 .map_err(|e| TransactionError::Connection(e))?;
257 transaction.run(callback).await
258 }
259
260 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
261 where
262 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
263 {
264 self.metric_callback = Some(Arc::new(callback));
265 }
266
267 pub async fn ping(&self) -> Result<(), DbErr> {
269 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
270 match conn.ping().await {
271 Ok(_) => Ok(()),
272 Err(err) => Err(sqlx_error_to_conn_err(err)),
273 }
274 }
275
276 pub async fn close(self) -> Result<(), DbErr> {
279 self.close_by_ref().await
280 }
281
282 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
284 self.pool.close().await;
285 Ok(())
286 }
287}
288
289impl From<PgRow> for QueryResult {
290 fn from(row: PgRow) -> QueryResult {
291 QueryResult {
292 row: QueryResultRow::SqlxPostgres(row),
293 }
294 }
295}
296
297impl From<PgQueryResult> for ExecResult {
298 fn from(result: PgQueryResult) -> ExecResult {
299 ExecResult {
300 result: ExecResultHolder::SqlxPostgres(result),
301 }
302 }
303}
304
305pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
306 let values = stmt
307 .values
308 .as_ref()
309 .map_or(Values(Vec::new()), |values| values.clone());
310 sqlx::query_with(&stmt.sql, SqlxValues(values))
311}
312
313pub(crate) async fn set_transaction_config(
314 conn: &mut PoolConnection<Postgres>,
315 isolation_level: Option<IsolationLevel>,
316 access_mode: Option<AccessMode>,
317) -> Result<(), DbErr> {
318 if let Some(isolation_level) = isolation_level {
319 let stmt = Statement {
320 sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
321 values: None,
322 db_backend: DbBackend::Postgres,
323 };
324 let query = sqlx_query(&stmt);
325 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
326 }
327 if let Some(access_mode) = access_mode {
328 let stmt = Statement {
329 sql: format!("SET TRANSACTION {access_mode}"),
330 values: None,
331 db_backend: DbBackend::Postgres,
332 };
333 let query = sqlx_query(&stmt);
334 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
335 }
336 Ok(())
337}
338
339impl
340 From<(
341 PoolConnection<sqlx::Postgres>,
342 Statement,
343 Option<crate::metric::Callback>,
344 )> for crate::QueryStream
345{
346 fn from(
347 (conn, stmt, metric_callback): (
348 PoolConnection<sqlx::Postgres>,
349 Statement,
350 Option<crate::metric::Callback>,
351 ),
352 ) -> Self {
353 crate::QueryStream::build(
354 stmt,
355 crate::InnerConnection::Postgres(conn),
356 metric_callback,
357 )
358 }
359}
360
361impl crate::DatabaseTransaction {
362 pub(crate) async fn new_postgres(
363 inner: PoolConnection<sqlx::Postgres>,
364 metric_callback: Option<crate::metric::Callback>,
365 isolation_level: Option<IsolationLevel>,
366 access_mode: Option<AccessMode>,
367 ) -> Result<crate::DatabaseTransaction, DbErr> {
368 Self::begin(
369 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
370 crate::DbBackend::Postgres,
371 metric_callback,
372 isolation_level,
373 access_mode,
374 )
375 .await
376 }
377}
378
/// Converts a sqlx Postgres row into a `crate::ProxyRow`.
///
/// Each column is decoded into a `sea_query::Value` chosen from the column's
/// Postgres type name, and stored under the column's name. Array types and
/// the numeric / network / date-time / UUID types are only handled when the
/// corresponding cargo features are enabled.
///
/// # Panics
///
/// Panics if a column fails to decode as the Rust type chosen for its type
/// name, or if the type name matches none of the (feature-enabled) arms.
#[cfg(feature = "proxy")]
pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
    use sea_query::Value;
    use sqlx::{Column, Row, TypeInfo};

    crate::ProxyRow {
        values: row
            .columns()
            .iter()
            .map(|c| {
                // The helper macros below are declared inside the closure so
                // that they can refer to the local `row` and `c` bindings.

                // Scalar whose `Value` variant holds `Option<T>` directly; the
                // decode type is inferred from the variant.
                macro_rules! plain {
                    ($variant:ident, $msg:literal) => {
                        Value::$variant(row.try_get(c.ordinal()).expect($msg))
                    };
                }

                // Scalar whose `Value` variant holds `Option<Box<T>>`. The
                // decode type is spelled out because `.map(Box::new)` cannot
                // be resolved on a not-yet-inferred type.
                macro_rules! boxed {
                    ($variant:ident, $ty:ty, $msg:literal) => {
                        Value::$variant(
                            row.try_get::<Option<$ty>, _>(c.ordinal())
                                .expect($msg)
                                .map(Box::new),
                        )
                    };
                }

                // Array of unboxed elements. Unused when no array-enabling
                // feature is active, hence the lint allowance.
                #[allow(unused_macros)]
                macro_rules! plain_array {
                    ($variant:ident, $ty:ty, $msg:literal) => {
                        Value::Array(
                            sea_query::ArrayType::$variant,
                            row.try_get::<Option<Vec<$ty>>, _>(c.ordinal())
                                .expect($msg)
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::$variant(Some(val)))
                                            .collect(),
                                    )
                                }),
                        )
                    };
                }

                // Array of boxed elements. Unused when no array-enabling
                // feature is active, hence the lint allowance.
                #[allow(unused_macros)]
                macro_rules! boxed_array {
                    ($variant:ident, $ty:ty, $msg:literal) => {
                        Value::Array(
                            sea_query::ArrayType::$variant,
                            row.try_get::<Option<Vec<$ty>>, _>(c.ordinal())
                                .expect($msg)
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::$variant(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        )
                    };
                }

                let name = c.name().to_string();
                let value = match c.type_info().name() {
                    "BOOL" => plain!(Bool, "Failed to get boolean"),
                    #[cfg(feature = "postgres-array")]
                    "BOOL[]" => plain_array!(Bool, bool, "Failed to get boolean array"),

                    "\"CHAR\"" => plain!(TinyInt, "Failed to get small integer"),
                    #[cfg(feature = "postgres-array")]
                    "\"CHAR\"[]" => {
                        plain_array!(TinyInt, i8, "Failed to get small integer array")
                    }

                    "SMALLINT" | "SMALLSERIAL" | "INT2" => {
                        plain!(SmallInt, "Failed to get small integer")
                    }
                    #[cfg(feature = "postgres-array")]
                    "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => {
                        plain_array!(SmallInt, i16, "Failed to get small integer array")
                    }

                    "INT" | "SERIAL" | "INT4" => plain!(Int, "Failed to get integer"),
                    #[cfg(feature = "postgres-array")]
                    "INT[]" | "SERIAL[]" | "INT4[]" => {
                        plain_array!(Int, i32, "Failed to get integer array")
                    }

                    "BIGINT" | "BIGSERIAL" | "INT8" => {
                        plain!(BigInt, "Failed to get big integer")
                    }
                    #[cfg(feature = "postgres-array")]
                    "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => {
                        plain_array!(BigInt, i64, "Failed to get big integer array")
                    }

                    "FLOAT4" | "REAL" => plain!(Float, "Failed to get float"),
                    #[cfg(feature = "postgres-array")]
                    "FLOAT4[]" | "REAL[]" => {
                        plain_array!(Float, f32, "Failed to get float array")
                    }

                    "FLOAT8" | "DOUBLE PRECISION" => plain!(Double, "Failed to get double"),
                    #[cfg(feature = "postgres-array")]
                    "FLOAT8[]" | "DOUBLE PRECISION[]" => {
                        plain_array!(Double, f64, "Failed to get double array")
                    }

                    "VARCHAR" | "CHAR" | "TEXT" | "NAME" => {
                        boxed!(String, String, "Failed to get string")
                    }
                    #[cfg(feature = "postgres-array")]
                    "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => {
                        boxed_array!(String, String, "Failed to get string array")
                    }

                    "BYTEA" => boxed!(Bytes, Vec<u8>, "Failed to get bytes"),
                    #[cfg(feature = "postgres-array")]
                    "BYTEA[]" => boxed_array!(Bytes, Vec<u8>, "Failed to get bytes array"),

                    #[cfg(feature = "with-bigdecimal")]
                    "NUMERIC" => {
                        boxed!(BigDecimal, bigdecimal::BigDecimal, "Failed to get numeric")
                    }
                    #[cfg(all(
                        feature = "with-rust_decimal",
                        not(feature = "with-bigdecimal")
                    ))]
                    "NUMERIC" => {
                        boxed!(Decimal, rust_decimal::Decimal, "Failed to get numeric")
                    }

                    #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
                    "NUMERIC[]" => boxed_array!(
                        BigDecimal,
                        bigdecimal::BigDecimal,
                        "Failed to get numeric array"
                    ),
                    #[cfg(all(
                        feature = "with-rust_decimal",
                        not(feature = "with-bigdecimal"),
                        feature = "postgres-array"
                    ))]
                    "NUMERIC[]" => boxed_array!(
                        Decimal,
                        rust_decimal::Decimal,
                        "Failed to get numeric array"
                    ),

                    // NOTE(review): OID columns are decoded as `i64` here;
                    // confirm that sqlx accepts this decode type for OID.
                    "OID" => plain!(BigInt, "Failed to get oid"),
                    #[cfg(feature = "postgres-array")]
                    "OID[]" => plain_array!(BigInt, i64, "Failed to get oid array"),

                    "JSON" | "JSONB" => boxed!(Json, serde_json::Value, "Failed to get json"),
                    #[cfg(any(feature = "json-array", feature = "postgres-array"))]
                    "JSON[]" | "JSONB[]" => {
                        boxed_array!(Json, serde_json::Value, "Failed to get json array")
                    }

                    #[cfg(feature = "with-ipnetwork")]
                    "INET" | "CIDR" => {
                        boxed!(IpNetwork, ipnetwork::IpNetwork, "Failed to get ip address")
                    }
                    #[cfg(feature = "with-ipnetwork")]
                    "INET[]" | "CIDR[]" => boxed_array!(
                        IpNetwork,
                        ipnetwork::IpNetwork,
                        "Failed to get ip address array"
                    ),

                    #[cfg(feature = "with-mac_address")]
                    "MACADDR" | "MACADDR8" => boxed!(
                        MacAddress,
                        mac_address::MacAddress,
                        "Failed to get mac address"
                    ),
                    #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
                    "MACADDR[]" | "MACADDR8[]" => boxed_array!(
                        MacAddress,
                        mac_address::MacAddress,
                        "Failed to get mac address array"
                    ),

                    #[cfg(feature = "with-chrono")]
                    "TIMESTAMP" => boxed!(
                        ChronoDateTime,
                        chrono::NaiveDateTime,
                        "Failed to get timestamp"
                    ),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIMESTAMP" => boxed!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamp"
                    ),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIMESTAMP[]" => boxed_array!(
                        ChronoDateTime,
                        chrono::NaiveDateTime,
                        "Failed to get timestamp array"
                    ),
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIMESTAMP[]" => boxed_array!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamp array"
                    ),

                    #[cfg(feature = "with-chrono")]
                    "DATE" => boxed!(ChronoDate, chrono::NaiveDate, "Failed to get date"),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "DATE" => boxed!(TimeDate, time::Date, "Failed to get date"),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "DATE[]" => {
                        boxed_array!(ChronoDate, chrono::NaiveDate, "Failed to get date array")
                    }
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "DATE[]" => boxed_array!(TimeDate, time::Date, "Failed to get date array"),

                    #[cfg(feature = "with-chrono")]
                    "TIME" => boxed!(ChronoTime, chrono::NaiveTime, "Failed to get time"),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIME" => boxed!(TimeTime, time::Time, "Failed to get time"),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIME[]" => {
                        boxed_array!(ChronoTime, chrono::NaiveTime, "Failed to get time array")
                    }
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIME[]" => boxed_array!(TimeTime, time::Time, "Failed to get time array"),

                    #[cfg(feature = "with-chrono")]
                    "TIMESTAMPTZ" => boxed!(
                        ChronoDateTimeUtc,
                        chrono::DateTime<chrono::Utc>,
                        "Failed to get timestamptz"
                    ),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIMESTAMPTZ" => boxed!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamptz"
                    ),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIMESTAMPTZ[]" => boxed_array!(
                        ChronoDateTimeUtc,
                        chrono::DateTime<chrono::Utc>,
                        "Failed to get timestamptz array"
                    ),
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIMESTAMPTZ[]" => boxed_array!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamptz array"
                    ),

                    #[cfg(feature = "with-chrono")]
                    "TIMETZ" => boxed!(ChronoTime, chrono::NaiveTime, "Failed to get timetz"),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIMETZ" => boxed!(TimeTime, time::Time, "Failed to get timetz"),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIMETZ[]" => {
                        boxed_array!(ChronoTime, chrono::NaiveTime, "Failed to get timetz array")
                    }
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIMETZ[]" => {
                        boxed_array!(TimeTime, time::Time, "Failed to get timetz array")
                    }

                    #[cfg(feature = "with-uuid")]
                    "UUID" => boxed!(Uuid, uuid::Uuid, "Failed to get uuid"),

                    #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
                    "UUID[]" => boxed_array!(Uuid, uuid::Uuid, "Failed to get uuid array"),

                    _ => unreachable!("Unknown column type: {}", c.type_info().name()),
                };
                (name, value)
            })
            .collect(),
    }
}