1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 DbBackend, IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*,
18 executor::*,
19};
20
21use super::sqlx_common::*;
22
/// Connector for PostgreSQL databases backed by SQLx.
///
/// The type carries no state; its associated functions check connection
/// strings and build a [`DatabaseConnection`] around a [`PgPool`].
#[derive(Debug)]
pub struct SqlxPostgresConnector;
26
/// A PostgreSQL connection pool paired with an optional metric callback.
#[derive(Clone)]
pub struct SqlxPostgresPoolConnection {
    /// The underlying SQLx connection pool; every operation acquires a connection from it.
    pub(crate) pool: PgPool,
    /// Callback passed to `crate::metric::metric!` around statement execution.
    /// It is `None` until `set_metric_callback` is called.
    metric_callback: Option<crate::metric::Callback>,
}
33
34impl std::fmt::Debug for SqlxPostgresPoolConnection {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
37 }
38}
39
40impl From<PgPool> for SqlxPostgresPoolConnection {
41 fn from(pool: PgPool) -> Self {
42 SqlxPostgresPoolConnection {
43 pool,
44 metric_callback: None,
45 }
46 }
47}
48
49impl From<PgPool> for DatabaseConnection {
50 fn from(pool: PgPool) -> Self {
51 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
52 }
53}
54
55impl SqlxPostgresConnector {
56 pub fn accepts(string: &str) -> bool {
58 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
59 }
60
61 #[instrument(level = "trace")]
63 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
64 let mut sqlx_opts = options
65 .url
66 .parse::<PgConnectOptions>()
67 .map_err(sqlx_error_to_conn_err)?;
68 use sqlx::ConnectOptions;
69 if !options.sqlx_logging {
70 sqlx_opts = sqlx_opts.disable_statement_logging();
71 } else {
72 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
73 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
74 sqlx_opts = sqlx_opts.log_slow_statements(
75 options.sqlx_slow_statements_logging_level,
76 options.sqlx_slow_statements_logging_threshold,
77 );
78 }
79 }
80
81 if let Some(f) = &options.pg_opts_fn {
82 sqlx_opts = f(sqlx_opts);
83 }
84
85 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
86 let mut string = "SET search_path = ".to_owned();
87 if schema.starts_with('"') {
88 write!(&mut string, "{schema}").expect("Infallible");
89 } else {
90 for (i, schema) in schema.split(',').enumerate() {
91 if i > 0 {
92 write!(&mut string, ",").expect("Infallible");
93 }
94 if schema.starts_with('"') {
95 write!(&mut string, "{schema}").expect("Infallible");
96 } else {
97 write!(&mut string, "\"{schema}\"").expect("Infallible");
98 }
99 }
100 }
101 string
102 });
103
104 let lazy = options.connect_lazy;
105 let after_connect = options.after_connect.clone();
106 let mut pool_options = options.sqlx_pool_options();
107
108 if let Some(sql) = set_search_path_sql {
109 pool_options = pool_options.after_connect(move |conn, _| {
110 let sql = sql.clone();
111 Box::pin(async move {
112 sqlx::Executor::execute(conn, sql.as_str())
113 .await
114 .map(|_| ())
115 })
116 });
117 }
118
119 let pool = if lazy {
120 pool_options.connect_lazy_with(sqlx_opts)
121 } else {
122 pool_options
123 .connect_with(sqlx_opts)
124 .await
125 .map_err(sqlx_error_to_conn_err)?
126 };
127
128 let conn: DatabaseConnection =
129 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
130 pool,
131 metric_callback: None,
132 })
133 .into();
134
135 if let Some(cb) = after_connect {
136 cb(conn.clone()).await?;
137 }
138
139 Ok(conn)
140 }
141}
142
143impl SqlxPostgresConnector {
144 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
146 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
147 pool,
148 metric_callback: None,
149 })
150 .into()
151 }
152}
153
154impl SqlxPostgresPoolConnection {
155 #[instrument(level = "trace")]
157 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
158 debug_print!("{}", stmt);
159
160 let query = sqlx_query(&stmt);
161 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
162 crate::metric::metric!(self.metric_callback, &stmt, {
163 match query.execute(&mut *conn).await {
164 Ok(res) => Ok(res.into()),
165 Err(err) => Err(sqlx_error_to_exec_err(err)),
166 }
167 })
168 }
169
170 #[instrument(level = "trace")]
172 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
173 debug_print!("{}", sql);
174
175 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
176 match conn.execute(sql).await {
177 Ok(res) => Ok(res.into()),
178 Err(err) => Err(sqlx_error_to_exec_err(err)),
179 }
180 }
181
182 #[instrument(level = "trace")]
184 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
185 debug_print!("{}", stmt);
186
187 let query = sqlx_query(&stmt);
188 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
189 crate::metric::metric!(self.metric_callback, &stmt, {
190 match query.fetch_one(&mut *conn).await {
191 Ok(row) => Ok(Some(row.into())),
192 Err(err) => match err {
193 sqlx::Error::RowNotFound => Ok(None),
194 _ => Err(sqlx_error_to_query_err(err)),
195 },
196 }
197 })
198 }
199
200 #[instrument(level = "trace")]
202 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
203 debug_print!("{}", stmt);
204
205 let query = sqlx_query(&stmt);
206 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
207 crate::metric::metric!(self.metric_callback, &stmt, {
208 match query.fetch_all(&mut *conn).await {
209 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
210 Err(err) => Err(sqlx_error_to_query_err(err)),
211 }
212 })
213 }
214
215 #[instrument(level = "trace")]
217 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
218 debug_print!("{}", stmt);
219
220 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
221 Ok(QueryStream::from((
222 conn,
223 stmt,
224 self.metric_callback.clone(),
225 )))
226 }
227
228 #[instrument(level = "trace")]
230 pub async fn begin(
231 &self,
232 isolation_level: Option<IsolationLevel>,
233 access_mode: Option<AccessMode>,
234 ) -> Result<DatabaseTransaction, DbErr> {
235 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
236 DatabaseTransaction::new_postgres(
237 conn,
238 self.metric_callback.clone(),
239 isolation_level,
240 access_mode,
241 )
242 .await
243 }
244
245 #[instrument(level = "trace", skip(callback))]
247 pub async fn transaction<F, T, E>(
248 &self,
249 callback: F,
250 isolation_level: Option<IsolationLevel>,
251 access_mode: Option<AccessMode>,
252 ) -> Result<T, TransactionError<E>>
253 where
254 F: for<'b> FnOnce(
255 &'b DatabaseTransaction,
256 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
257 + Send,
258 T: Send,
259 E: std::fmt::Display + std::fmt::Debug + Send,
260 {
261 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
262 let transaction = DatabaseTransaction::new_postgres(
263 conn,
264 self.metric_callback.clone(),
265 isolation_level,
266 access_mode,
267 )
268 .await
269 .map_err(|e| TransactionError::Connection(e))?;
270 transaction.run(callback).await
271 }
272
273 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
274 where
275 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
276 {
277 self.metric_callback = Some(Arc::new(callback));
278 }
279
280 pub async fn ping(&self) -> Result<(), DbErr> {
282 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
283 match conn.ping().await {
284 Ok(_) => Ok(()),
285 Err(err) => Err(sqlx_error_to_conn_err(err)),
286 }
287 }
288
289 pub async fn close(self) -> Result<(), DbErr> {
292 self.close_by_ref().await
293 }
294
295 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
297 self.pool.close().await;
298 Ok(())
299 }
300}
301
302impl From<PgRow> for QueryResult {
303 fn from(row: PgRow) -> QueryResult {
304 QueryResult {
305 row: QueryResultRow::SqlxPostgres(row),
306 }
307 }
308}
309
310impl From<PgQueryResult> for ExecResult {
311 fn from(result: PgQueryResult) -> ExecResult {
312 ExecResult {
313 result: ExecResultHolder::SqlxPostgres(result),
314 }
315 }
316}
317
318pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
319 let values = stmt
320 .values
321 .as_ref()
322 .map_or(Values(Vec::new()), |values| values.clone());
323 sqlx::query_with(&stmt.sql, SqlxValues(values))
324}
325
326pub(crate) async fn set_transaction_config(
327 conn: &mut PoolConnection<Postgres>,
328 isolation_level: Option<IsolationLevel>,
329 access_mode: Option<AccessMode>,
330) -> Result<(), DbErr> {
331 if let Some(isolation_level) = isolation_level {
332 let stmt = Statement {
333 sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
334 values: None,
335 db_backend: DbBackend::Postgres,
336 };
337 let query = sqlx_query(&stmt);
338 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
339 }
340 if let Some(access_mode) = access_mode {
341 let stmt = Statement {
342 sql: format!("SET TRANSACTION {access_mode}"),
343 values: None,
344 db_backend: DbBackend::Postgres,
345 };
346 let query = sqlx_query(&stmt);
347 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
348 }
349 Ok(())
350}
351
352impl
353 From<(
354 PoolConnection<sqlx::Postgres>,
355 Statement,
356 Option<crate::metric::Callback>,
357 )> for crate::QueryStream
358{
359 fn from(
360 (conn, stmt, metric_callback): (
361 PoolConnection<sqlx::Postgres>,
362 Statement,
363 Option<crate::metric::Callback>,
364 ),
365 ) -> Self {
366 crate::QueryStream::build(
367 stmt,
368 crate::InnerConnection::Postgres(conn),
369 metric_callback,
370 )
371 }
372}
373
374impl crate::DatabaseTransaction {
375 pub(crate) async fn new_postgres(
376 inner: PoolConnection<sqlx::Postgres>,
377 metric_callback: Option<crate::metric::Callback>,
378 isolation_level: Option<IsolationLevel>,
379 access_mode: Option<AccessMode>,
380 ) -> Result<crate::DatabaseTransaction, DbErr> {
381 Self::begin(
382 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
383 crate::DbBackend::Postgres,
384 metric_callback,
385 isolation_level,
386 access_mode,
387 )
388 .await
389 }
390}
391
/// Converts a PostgreSQL row into a `crate::ProxyRow`.
///
/// Every column is turned into a `(column name, value)` pair. The value is chosen by
/// matching on the column's type name (`c.type_info().name()`) and decoding the cell at
/// the column's ordinal into the corresponding `sea_query::Value` variant. Many arms are
/// compiled only when the relevant cargo feature is enabled; every `Value::Array` arm
/// apart from `JSON[]`/`JSONB[]` additionally requires `postgres-array`.
///
/// NOTE(review): several arms decode into a Rust type that SQLx may not accept for that
/// column type (e.g. `OID` as `i64`, `TIMETZ` as a plain time, `TIMESTAMPTZ` as
/// `time::PrimitiveDateTime`). Confirm against the SQLx Postgres type-compatibility table.
///
/// # Panics
///
/// Panics if a cell cannot be decoded into the expected Rust type, or if the column's
/// type name is not covered by any compiled-in arm (the fallback arm is `unreachable!`).
#[cfg(feature = "proxy")]
pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
    use sea_query::Value;
    use sqlx::{Column, Row, TypeInfo};
    crate::ProxyRow {
        values: row
            .columns()
            .iter()
            .map(|c| {
                (
                    c.name().to_string(),
                    match c.type_info().name() {
                        "BOOL" => {
                            Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "BOOL[]" => Value::Array(
                            sea_query::ArrayType::Bool,
                            row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
                                .expect("Failed to get boolean array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Bool(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "\"CHAR\"" => Value::TinyInt(
                            row.try_get(c.ordinal())
                                .expect("Failed to get small integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "\"CHAR\"[]" => Value::Array(
                            sea_query::ArrayType::TinyInt,
                            row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
                                .expect("Failed to get small integer array")
                                .map(|vals: Vec<i8>| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TinyInt(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
                            row.try_get(c.ordinal())
                                .expect("Failed to get small integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
                            sea_query::ArrayType::SmallInt,
                            row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
                                .expect("Failed to get small integer array")
                                .map(|vals: Vec<i16>| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::SmallInt(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "INT" | "SERIAL" | "INT4" => {
                            Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
                            sea_query::ArrayType::Int,
                            row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
                                .expect("Failed to get integer array")
                                .map(|vals: Vec<i32>| {
                                    Box::new(
                                        vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
                                    )
                                }),
                        ),

                        "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
                            row.try_get(c.ordinal()).expect("Failed to get big integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
                            sea_query::ArrayType::BigInt,
                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
                                .expect("Failed to get big integer array")
                                .map(|vals: Vec<i64>| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::BigInt(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "FLOAT4" | "REAL" => {
                            Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "FLOAT4[]" | "REAL[]" => Value::Array(
                            sea_query::ArrayType::Float,
                            row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
                                .expect("Failed to get float array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Float(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "FLOAT8" | "DOUBLE PRECISION" => {
                            Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
                            sea_query::ArrayType::Double,
                            row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
                                .expect("Failed to get double array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Double(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
                            row.try_get::<Option<String>, _>(c.ordinal())
                                .expect("Failed to get string")
                                .map(Box::new),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
                            sea_query::ArrayType::String,
                            row.try_get::<Option<Vec<String>>, _>(c.ordinal())
                                .expect("Failed to get string array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::String(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        "BYTEA" => Value::Bytes(
                            row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
                                .expect("Failed to get bytes")
                                .map(Box::new),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "BYTEA[]" => Value::Array(
                            sea_query::ArrayType::Bytes,
                            row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
                                .expect("Failed to get bytes array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Bytes(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        // NUMERIC: `with-bigdecimal` takes precedence over `with-rust_decimal`.
                        #[cfg(feature = "with-bigdecimal")]
                        "NUMERIC" => Value::BigDecimal(
                            row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
                                .expect("Failed to get numeric")
                                .map(Box::new),
                        ),
                        #[cfg(all(
                            feature = "with-rust_decimal",
                            not(feature = "with-bigdecimal")
                        ))]
                        "NUMERIC" => Value::Decimal(
                            row.try_get(c.ordinal())
                                .expect("Failed to get numeric")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
                        "NUMERIC[]" => Value::Array(
                            sea_query::ArrayType::BigDecimal,
                            row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
                                .expect("Failed to get numeric array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::BigDecimal(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-rust_decimal",
                            not(feature = "with-bigdecimal"),
                            feature = "postgres-array"
                        ))]
                        "NUMERIC[]" => Value::Array(
                            sea_query::ArrayType::Decimal,
                            row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
                                .expect("Failed to get numeric array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Decimal(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        "OID" => {
                            Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "OID[]" => Value::Array(
                            sea_query::ArrayType::BigInt,
                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
                                .expect("Failed to get oid array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::BigInt(Some(val)))
                                            .collect(),
                                    )
                                }),
                        ),

                        "JSON" | "JSONB" => Value::Json(
                            row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
                                .expect("Failed to get json")
                                .map(Box::new),
                        ),
                        #[cfg(any(feature = "json-array", feature = "postgres-array"))]
                        "JSON[]" | "JSONB[]" => Value::Array(
                            sea_query::ArrayType::Json,
                            row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
                                .expect("Failed to get json array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Json(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-ipnetwork")]
                        "INET" | "CIDR" => Value::IpNetwork(
                            row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
                                .expect("Failed to get ip address")
                                .map(Box::new),
                        ),
                        // Gated on `postgres-array` as well, like the other `Value::Array`
                        // arms, so the arm is only compiled when the feature is enabled.
                        #[cfg(all(feature = "with-ipnetwork", feature = "postgres-array"))]
                        "INET[]" | "CIDR[]" => Value::Array(
                            sea_query::ArrayType::IpNetwork,
                            row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
                                .expect("Failed to get ip address array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::IpNetwork(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-mac_address")]
                        "MACADDR" | "MACADDR8" => Value::MacAddress(
                            row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
                                .expect("Failed to get mac address")
                                .map(Box::new),
                        ),
                        #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
                        "MACADDR[]" | "MACADDR8[]" => Value::Array(
                            sea_query::ArrayType::MacAddress,
                            row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
                                .expect("Failed to get mac address array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::MacAddress(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        // Date/time types: `with-chrono` takes precedence over `with-time`.
                        #[cfg(feature = "with-chrono")]
                        "TIMESTAMP" => Value::ChronoDateTime(
                            row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
                                .expect("Failed to get timestamp")
                                .map(Box::new),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMESTAMP" => Value::TimeDateTime(
                            row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
                                .expect("Failed to get timestamp")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMESTAMP[]" => Value::Array(
                            sea_query::ArrayType::ChronoDateTime,
                            row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
                                .expect("Failed to get timestamp array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoDateTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMESTAMP[]" => Value::Array(
                            sea_query::ArrayType::TimeDateTime,
                            row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
                                .expect("Failed to get timestamp array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeDateTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "DATE" => Value::ChronoDate(
                            row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
                                .expect("Failed to get date")
                                .map(Box::new),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "DATE" => Value::TimeDate(
                            row.try_get::<Option<time::Date>, _>(c.ordinal())
                                .expect("Failed to get date")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "DATE[]" => Value::Array(
                            sea_query::ArrayType::ChronoDate,
                            row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
                                .expect("Failed to get date array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoDate(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "DATE[]" => Value::Array(
                            sea_query::ArrayType::TimeDate,
                            row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
                                .expect("Failed to get date array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeDate(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIME" => Value::ChronoTime(
                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
                                .expect("Failed to get time")
                                .map(Box::new),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIME" => Value::TimeTime(
                            row.try_get::<Option<time::Time>, _>(c.ordinal())
                                .expect("Failed to get time")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIME[]" => Value::Array(
                            sea_query::ArrayType::ChronoTime,
                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
                                .expect("Failed to get time array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIME[]" => Value::Array(
                            sea_query::ArrayType::TimeTime,
                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
                                .expect("Failed to get time array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
                            row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
                                .expect("Failed to get timestamptz")
                                .map(Box::new),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMESTAMPTZ" => Value::TimeDateTime(
                            row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
                                .expect("Failed to get timestamptz")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMESTAMPTZ[]" => Value::Array(
                            sea_query::ArrayType::ChronoDateTimeUtc,
                            row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
                                c.ordinal(),
                            )
                            .expect("Failed to get timestamptz array")
                            .map(|vals| {
                                Box::new(
                                    vals.into_iter()
                                        .map(|val| Value::ChronoDateTimeUtc(Some(Box::new(val))))
                                        .collect(),
                                )
                            }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMESTAMPTZ[]" => Value::Array(
                            sea_query::ArrayType::TimeDateTime,
                            row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
                                .expect("Failed to get timestamptz array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeDateTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIMETZ" => Value::ChronoTime(
                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
                                .expect("Failed to get timetz")
                                .map(Box::new),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMETZ" => Value::TimeTime(
                            row.try_get(c.ordinal())
                                .expect("Failed to get timetz")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMETZ[]" => Value::Array(
                            sea_query::ArrayType::ChronoTime,
                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
                                .expect("Failed to get timetz array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMETZ[]" => Value::Array(
                            sea_query::ArrayType::TimeTime,
                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
                                .expect("Failed to get timetz array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeTime(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-uuid")]
                        "UUID" => Value::Uuid(
                            row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
                                .expect("Failed to get uuid")
                                .map(Box::new),
                        ),

                        #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
                        "UUID[]" => Value::Array(
                            sea_query::ArrayType::Uuid,
                            row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
                                .expect("Failed to get uuid array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Uuid(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        // Any type name without a compiled-in arm ends up here and panics.
                        _ => unreachable!("Unknown column type: {}", c.type_info().name()),
                    },
                )
            })
            .collect(),
    }
}