1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
/// Marker type that groups the SQLx-backed PostgreSQL entry points
/// (`accepts`, `connect`, `from_sqlx_postgres_pool`).
#[derive(Debug)]
pub struct SqlxPostgresConnector;
25
/// A cloneable handle around a SQLx PostgreSQL connection pool.
#[derive(Clone)]
pub struct SqlxPostgresPoolConnection {
    /// The SQLx pool every operation acquires its connection from.
    pub(crate) pool: PgPool,
    /// Optional callback passed to `crate::metric::metric!` and forwarded to
    /// streams and transactions created from this handle.
    metric_callback: Option<crate::metric::Callback>,
}
32
33impl std::fmt::Debug for SqlxPostgresPoolConnection {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
36 }
37}
38
39impl From<PgPool> for SqlxPostgresPoolConnection {
40 fn from(pool: PgPool) -> Self {
41 SqlxPostgresPoolConnection {
42 pool,
43 metric_callback: None,
44 }
45 }
46}
47
48impl From<PgPool> for DatabaseConnection {
49 fn from(pool: PgPool) -> Self {
50 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
51 }
52}
53
54impl SqlxPostgresConnector {
55 pub fn accepts(string: &str) -> bool {
57 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
58 }
59
60 #[instrument(level = "trace")]
62 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
63 let mut sqlx_opts = options
64 .url
65 .parse::<PgConnectOptions>()
66 .map_err(sqlx_error_to_conn_err)?;
67 use sqlx::ConnectOptions;
68 if !options.sqlx_logging {
69 sqlx_opts = sqlx_opts.disable_statement_logging();
70 } else {
71 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
72 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
73 sqlx_opts = sqlx_opts.log_slow_statements(
74 options.sqlx_slow_statements_logging_level,
75 options.sqlx_slow_statements_logging_threshold,
76 );
77 }
78 }
79
80 if let Some(application_name) = &options.application_name {
81 sqlx_opts = sqlx_opts.application_name(application_name);
82 }
83
84 if let Some(timeout) = options.statement_timeout {
85 sqlx_opts = sqlx_opts.options([("statement_timeout", timeout.as_millis().to_string())]);
86 }
87
88 if let Some(f) = &options.pg_opts_fn {
89 sqlx_opts = f(sqlx_opts);
90 }
91
92 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
93 let mut string = "SET search_path = ".to_owned();
94 if schema.starts_with('"') {
95 write!(&mut string, "{schema}").expect("Infallible");
96 } else {
97 for (i, schema) in schema.split(',').enumerate() {
98 if i > 0 {
99 write!(&mut string, ",").expect("Infallible");
100 }
101 if schema.starts_with('"') {
102 write!(&mut string, "{schema}").expect("Infallible");
103 } else {
104 write!(&mut string, "\"{schema}\"").expect("Infallible");
105 }
106 }
107 }
108 string
109 });
110
111 let lazy = options.connect_lazy;
112 let after_connect = options.after_connect.clone();
113 let pg_pool_opts_fn = options.pg_pool_opts_fn.clone();
114 let mut pool_options = options.sqlx_pool_options();
115
116 if let Some(sql) = set_search_path_sql {
117 pool_options = pool_options.after_connect(move |conn, _| {
118 let sql = sql.clone();
119 Box::pin(async move {
120 sqlx::Executor::execute(conn, sql.as_str())
121 .await
122 .map(|_| ())
123 })
124 });
125 }
126 if let Some(f) = &pg_pool_opts_fn {
127 pool_options = f(pool_options);
128 }
129 let pool = if lazy {
130 pool_options.connect_lazy_with(sqlx_opts)
131 } else {
132 pool_options
133 .connect_with(sqlx_opts)
134 .await
135 .map_err(sqlx_error_to_conn_err)?
136 };
137
138 let conn: DatabaseConnection =
139 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
140 pool,
141 metric_callback: None,
142 })
143 .into();
144
145 if let Some(cb) = after_connect {
146 cb(conn.clone()).await?;
147 }
148
149 Ok(conn)
150 }
151}
152
153impl SqlxPostgresConnector {
154 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
156 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
157 pool,
158 metric_callback: None,
159 })
160 .into()
161 }
162}
163
164impl SqlxPostgresPoolConnection {
165 #[instrument(level = "trace")]
167 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
168 debug_print!("{}", stmt);
169
170 let query = sqlx_query(&stmt);
171 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
172 crate::metric::metric!(self.metric_callback, &stmt, {
173 match query.execute(&mut *conn).await {
174 Ok(res) => Ok(res.into()),
175 Err(err) => Err(sqlx_error_to_exec_err(err)),
176 }
177 })
178 }
179
180 #[instrument(level = "trace")]
182 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
183 debug_print!("{}", sql);
184
185 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
186 match conn.execute(sql).await {
187 Ok(res) => Ok(res.into()),
188 Err(err) => Err(sqlx_error_to_exec_err(err)),
189 }
190 }
191
192 #[instrument(level = "trace")]
194 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
195 debug_print!("{}", stmt);
196
197 let query = sqlx_query(&stmt);
198 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
199 crate::metric::metric!(self.metric_callback, &stmt, {
200 match query.fetch_one(&mut *conn).await {
201 Ok(row) => Ok(Some(row.into())),
202 Err(err) => match err {
203 sqlx::Error::RowNotFound => Ok(None),
204 _ => Err(sqlx_error_to_query_err(err)),
205 },
206 }
207 })
208 }
209
210 #[instrument(level = "trace")]
212 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
213 debug_print!("{}", stmt);
214
215 let query = sqlx_query(&stmt);
216 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
217 crate::metric::metric!(self.metric_callback, &stmt, {
218 match query.fetch_all(&mut *conn).await {
219 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
220 Err(err) => Err(sqlx_error_to_query_err(err)),
221 }
222 })
223 }
224
225 #[instrument(level = "trace")]
227 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
228 debug_print!("{}", stmt);
229
230 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
231 Ok(QueryStream::from((
232 conn,
233 stmt,
234 self.metric_callback.clone(),
235 )))
236 }
237
238 #[instrument(level = "trace")]
240 pub async fn begin(
241 &self,
242 isolation_level: Option<IsolationLevel>,
243 access_mode: Option<AccessMode>,
244 ) -> Result<DatabaseTransaction, DbErr> {
245 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
246 DatabaseTransaction::new_postgres(
247 conn,
248 self.metric_callback.clone(),
249 isolation_level,
250 access_mode,
251 )
252 .await
253 }
254
255 #[instrument(level = "trace", skip(callback))]
257 pub async fn transaction<F, T, E>(
258 &self,
259 callback: F,
260 isolation_level: Option<IsolationLevel>,
261 access_mode: Option<AccessMode>,
262 ) -> Result<T, TransactionError<E>>
263 where
264 F: for<'b> FnOnce(
265 &'b DatabaseTransaction,
266 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
267 + Send,
268 T: Send,
269 E: std::fmt::Display + std::fmt::Debug + Send,
270 {
271 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
272 let transaction = DatabaseTransaction::new_postgres(
273 conn,
274 self.metric_callback.clone(),
275 isolation_level,
276 access_mode,
277 )
278 .await
279 .map_err(|e| TransactionError::Connection(e))?;
280 transaction.run(callback).await
281 }
282
283 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
284 where
285 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
286 {
287 self.metric_callback = Some(Arc::new(callback));
288 }
289
290 pub async fn ping(&self) -> Result<(), DbErr> {
292 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
293 match conn.ping().await {
294 Ok(_) => Ok(()),
295 Err(err) => Err(sqlx_error_to_conn_err(err)),
296 }
297 }
298
299 pub async fn close(self) -> Result<(), DbErr> {
302 self.close_by_ref().await
303 }
304
305 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
307 self.pool.close().await;
308 Ok(())
309 }
310}
311
312impl From<PgRow> for QueryResult {
313 fn from(row: PgRow) -> QueryResult {
314 QueryResult {
315 row: QueryResultRow::SqlxPostgres(row),
316 }
317 }
318}
319
320impl From<PgQueryResult> for ExecResult {
321 fn from(result: PgQueryResult) -> ExecResult {
322 ExecResult {
323 result: ExecResultHolder::SqlxPostgres(result),
324 }
325 }
326}
327
328pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
329 let values = stmt
330 .values
331 .as_ref()
332 .map_or(Values(Vec::new()), |values| values.clone());
333 sqlx::query_with(&stmt.sql, SqlxValues(values))
334}
335
336pub(crate) async fn set_transaction_config(
337 conn: &mut PoolConnection<Postgres>,
338 isolation_level: Option<IsolationLevel>,
339 access_mode: Option<AccessMode>,
340) -> Result<(), DbErr> {
341 let mut settings = Vec::new();
342
343 if let Some(isolation_level) = isolation_level {
344 settings.push(format!("ISOLATION LEVEL {isolation_level}"));
345 }
346
347 if let Some(access_mode) = access_mode {
348 settings.push(access_mode.to_string());
349 }
350
351 if !settings.is_empty() {
352 let sql = format!("SET TRANSACTION {}", settings.join(" "));
353 sqlx::query(&sql)
354 .execute(&mut **conn)
355 .await
356 .map_err(sqlx_error_to_exec_err)?;
357 }
358 Ok(())
359}
360
361impl
362 From<(
363 PoolConnection<sqlx::Postgres>,
364 Statement,
365 Option<crate::metric::Callback>,
366 )> for crate::QueryStream
367{
368 fn from(
369 (conn, stmt, metric_callback): (
370 PoolConnection<sqlx::Postgres>,
371 Statement,
372 Option<crate::metric::Callback>,
373 ),
374 ) -> Self {
375 crate::QueryStream::build(
376 stmt,
377 crate::InnerConnection::Postgres(conn),
378 metric_callback,
379 )
380 }
381}
382
383impl crate::DatabaseTransaction {
384 pub(crate) async fn new_postgres(
385 inner: PoolConnection<sqlx::Postgres>,
386 metric_callback: Option<crate::metric::Callback>,
387 isolation_level: Option<IsolationLevel>,
388 access_mode: Option<AccessMode>,
389 ) -> Result<crate::DatabaseTransaction, DbErr> {
390 Self::begin(
391 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
392 crate::DbBackend::Postgres,
393 metric_callback,
394 isolation_level,
395 access_mode,
396 None,
397 )
398 .await
399 }
400}
401
/// Converts a SQLx PostgreSQL row into a [`crate::ProxyRow`].
///
/// Each column is decoded according to the type name SQLx reports for it and
/// stored under the column's name as a `sea_query::Value`. Which type names
/// are recognised depends on the enabled crate features (arrays, decimal,
/// JSON, network, MAC address, date/time and UUID support).
///
/// # Panics
///
/// Panics when a column cannot be decoded as the Rust type chosen for its
/// type name, or when the type name is not handled by any enabled arm.
#[cfg(feature = "proxy")]
pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
    use sea_query::Value;
    use sqlx::{Column, Row, TypeInfo};

    // Decodes a nullable array column into `Value::Array`, wrapping every
    // element in the `Value` variant that shares its name with the
    // `ArrayType` variant.
    #[cfg(feature = "postgres-array")]
    macro_rules! decode_array {
        ($row:expr, $col:expr, $variant:ident, $elem:ty, $msg:literal) => {
            Value::Array(
                sea_query::ArrayType::$variant,
                $row.try_get::<Option<Vec<$elem>>, _>($col.ordinal())
                    .expect($msg)
                    .map(|vals| {
                        Box::new(
                            vals.into_iter()
                                .map(|val| Value::$variant(Some(val)))
                                .collect(),
                        )
                    }),
            )
        };
    }

    crate::ProxyRow {
        values: row
            .columns()
            .iter()
            .map(|c| {
                (
                    c.name().to_string(),
                    match c.type_info().name() {
                        "BOOL" => {
                            Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "BOOL[]" => {
                            decode_array!(row, c, Bool, bool, "Failed to get boolean array")
                        }

                        "\"CHAR\"" => Value::TinyInt(
                            row.try_get(c.ordinal())
                                .expect("Failed to get small integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "\"CHAR\"[]" => {
                            decode_array!(row, c, TinyInt, i8, "Failed to get small integer array")
                        }

                        "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
                            row.try_get(c.ordinal())
                                .expect("Failed to get small integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => {
                            decode_array!(row, c, SmallInt, i16, "Failed to get small integer array")
                        }

                        "INT" | "SERIAL" | "INT4" => {
                            Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "INT[]" | "SERIAL[]" | "INT4[]" => {
                            decode_array!(row, c, Int, i32, "Failed to get integer array")
                        }

                        "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
                            row.try_get(c.ordinal()).expect("Failed to get big integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => {
                            decode_array!(row, c, BigInt, i64, "Failed to get big integer array")
                        }

                        "FLOAT4" | "REAL" => {
                            Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "FLOAT4[]" | "REAL[]" => {
                            decode_array!(row, c, Float, f32, "Failed to get float array")
                        }

                        "FLOAT8" | "DOUBLE PRECISION" => {
                            Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "FLOAT8[]" | "DOUBLE PRECISION[]" => {
                            decode_array!(row, c, Double, f64, "Failed to get double array")
                        }

                        "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
                            row.try_get::<Option<String>, _>(c.ordinal())
                                .expect("Failed to get string"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => {
                            decode_array!(row, c, String, String, "Failed to get string array")
                        }

                        "BYTEA" => Value::Bytes(
                            row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
                                .expect("Failed to get bytes"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "BYTEA[]" => {
                            decode_array!(row, c, Bytes, Vec<u8>, "Failed to get bytes array")
                        }

                        #[cfg(feature = "with-bigdecimal")]
                        "NUMERIC" => Value::BigDecimal(
                            row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
                                .expect("Failed to get numeric"),
                        ),
                        #[cfg(all(
                            feature = "with-rust_decimal",
                            not(feature = "with-bigdecimal")
                        ))]
                        "NUMERIC" => {
                            Value::Decimal(row.try_get(c.ordinal()).expect("Failed to get numeric"))
                        }

                        #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
                        "NUMERIC[]" => decode_array!(
                            row,
                            c,
                            BigDecimal,
                            bigdecimal::BigDecimal,
                            "Failed to get numeric array"
                        ),
                        #[cfg(all(
                            feature = "with-rust_decimal",
                            not(feature = "with-bigdecimal"),
                            feature = "postgres-array"
                        ))]
                        "NUMERIC[]" => decode_array!(
                            row,
                            c,
                            Decimal,
                            rust_decimal::Decimal,
                            "Failed to get numeric array"
                        ),

                        // SQLx decodes OID columns as `Oid` (a `u32` newtype);
                        // `i64` is only compatible with INT8, so decode as
                        // `Oid` first and widen afterwards.
                        "OID" => Value::BigInt(
                            row.try_get::<Option<sqlx::postgres::types::Oid>, _>(c.ordinal())
                                .expect("Failed to get oid")
                                .map(|oid| i64::from(oid.0)),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "OID[]" => Value::Array(
                            sea_query::ArrayType::BigInt,
                            row.try_get::<Option<Vec<sqlx::postgres::types::Oid>>, _>(c.ordinal())
                                .expect("Failed to get oid array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|oid| Value::BigInt(Some(i64::from(oid.0))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-json")]
                        "JSON" | "JSONB" => Value::Json(
                            row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
                                .expect("Failed to get json")
                                .map(Box::new),
                        ),
                        #[cfg(all(
                            feature = "with-json",
                            any(feature = "json-array", feature = "postgres-array")
                        ))]
                        "JSON[]" | "JSONB[]" => Value::Array(
                            sea_query::ArrayType::Json,
                            row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
                                .expect("Failed to get json array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Json(Some(Box::new(val))))
                                            .collect(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-ipnetwork")]
                        "INET" | "CIDR" => Value::IpNetwork(
                            row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
                                .expect("Failed to get ip address"),
                        ),
                        // Gated on `postgres-array` like every other array arm.
                        #[cfg(all(feature = "with-ipnetwork", feature = "postgres-array"))]
                        "INET[]" | "CIDR[]" => decode_array!(
                            row,
                            c,
                            IpNetwork,
                            ipnetwork::IpNetwork,
                            "Failed to get ip address array"
                        ),

                        #[cfg(feature = "with-mac_address")]
                        "MACADDR" | "MACADDR8" => Value::MacAddress(
                            row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
                                .expect("Failed to get mac address"),
                        ),
                        #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
                        "MACADDR[]" | "MACADDR8[]" => decode_array!(
                            row,
                            c,
                            MacAddress,
                            mac_address::MacAddress,
                            "Failed to get mac address array"
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIMESTAMP" => Value::ChronoDateTime(
                            row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
                                .expect("Failed to get timestamp"),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMESTAMP" => Value::TimeDateTime(
                            row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
                                .expect("Failed to get timestamp"),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMESTAMP[]" => decode_array!(
                            row,
                            c,
                            ChronoDateTime,
                            chrono::NaiveDateTime,
                            "Failed to get timestamp array"
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMESTAMP[]" => decode_array!(
                            row,
                            c,
                            TimeDateTime,
                            time::PrimitiveDateTime,
                            "Failed to get timestamp array"
                        ),

                        #[cfg(feature = "with-chrono")]
                        "DATE" => Value::ChronoDate(
                            row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
                                .expect("Failed to get date"),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "DATE" => Value::TimeDate(
                            row.try_get::<Option<time::Date>, _>(c.ordinal())
                                .expect("Failed to get date"),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "DATE[]" => decode_array!(
                            row,
                            c,
                            ChronoDate,
                            chrono::NaiveDate,
                            "Failed to get date array"
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "DATE[]" => {
                            decode_array!(row, c, TimeDate, time::Date, "Failed to get date array")
                        }

                        #[cfg(feature = "with-chrono")]
                        "TIME" => Value::ChronoTime(
                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
                                .expect("Failed to get time"),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIME" => Value::TimeTime(
                            row.try_get::<Option<time::Time>, _>(c.ordinal())
                                .expect("Failed to get time"),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIME[]" => decode_array!(
                            row,
                            c,
                            ChronoTime,
                            chrono::NaiveTime,
                            "Failed to get time array"
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIME[]" => {
                            decode_array!(row, c, TimeTime, time::Time, "Failed to get time array")
                        }

                        #[cfg(feature = "with-chrono")]
                        "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
                            row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
                                .expect("Failed to get timestamptz"),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(
                            row.try_get::<Option<time::OffsetDateTime>, _>(c.ordinal())
                                .expect("Failed to get timestamptz"),
                        ),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMESTAMPTZ[]" => decode_array!(
                            row,
                            c,
                            ChronoDateTimeUtc,
                            chrono::DateTime<chrono::Utc>,
                            "Failed to get timestamptz array"
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMESTAMPTZ[]" => decode_array!(
                            row,
                            c,
                            TimeDateTimeWithTimeZone,
                            time::OffsetDateTime,
                            "Failed to get timestamptz array"
                        ),

                        // NOTE(review): SQLx appears to model TIMETZ with its
                        // own `PgTimeTz` type, so decoding it as a plain time
                        // may fail at runtime — confirm against SQLx.
                        #[cfg(feature = "with-chrono")]
                        "TIMETZ" => Value::ChronoTime(
                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
                                .expect("Failed to get timetz"),
                        ),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMETZ" => {
                            Value::TimeTime(row.try_get(c.ordinal()).expect("Failed to get timetz"))
                        }

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMETZ[]" => decode_array!(
                            row,
                            c,
                            ChronoTime,
                            chrono::NaiveTime,
                            "Failed to get timetz array"
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMETZ[]" => {
                            decode_array!(row, c, TimeTime, time::Time, "Failed to get timetz array")
                        }

                        #[cfg(feature = "with-uuid")]
                        "UUID" => Value::Uuid(
                            row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
                                .expect("Failed to get uuid"),
                        ),

                        #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
                        "UUID[]" => {
                            decode_array!(row, c, Uuid, uuid::Uuid, "Failed to get uuid array")
                        }

                        // Reached for any type name without an enabled arm above.
                        _ => unreachable!("Unknown column type: {}", c.type_info().name()),
                    },
                )
            })
            .collect(),
    }
}