1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
/// Stateless marker type for the sqlx PostgreSQL driver.
///
/// It holds no data; the associated functions implemented for it in this file
/// (`accepts`, `connect`, `from_sqlx_postgres_pool`) are its whole surface.
#[derive(Debug)]
pub struct SqlxPostgresConnector;
25
/// A PostgreSQL connection handle backed by a sqlx [`PgPool`].
#[derive(Clone)]
pub struct SqlxPostgresPoolConnection {
    /// The sqlx pool that every operation in this file acquires its connection from.
    pub(crate) pool: PgPool,
    /// Optional metric callback; passed to `metric!` around queries and handed
    /// on to streams and transactions created from this connection.
    metric_callback: Option<crate::metric::Callback>,
}
32
33impl std::fmt::Debug for SqlxPostgresPoolConnection {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
36 }
37}
38
39impl From<PgPool> for SqlxPostgresPoolConnection {
40 fn from(pool: PgPool) -> Self {
41 SqlxPostgresPoolConnection {
42 pool,
43 metric_callback: None,
44 }
45 }
46}
47
48impl From<PgPool> for DatabaseConnection {
49 fn from(pool: PgPool) -> Self {
50 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
51 }
52}
53
54impl SqlxPostgresConnector {
55 pub fn accepts(string: &str) -> bool {
57 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
58 }
59
60 #[instrument(level = "trace")]
62 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
63 let mut sqlx_opts = options
64 .url
65 .parse::<PgConnectOptions>()
66 .map_err(sqlx_error_to_conn_err)?;
67 use sqlx::ConnectOptions;
68 if !options.sqlx_logging {
69 sqlx_opts = sqlx_opts.disable_statement_logging();
70 } else {
71 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
72 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
73 sqlx_opts = sqlx_opts.log_slow_statements(
74 options.sqlx_slow_statements_logging_level,
75 options.sqlx_slow_statements_logging_threshold,
76 );
77 }
78 }
79
80 if let Some(application_name) = &options.application_name {
81 sqlx_opts = sqlx_opts.application_name(application_name);
82 }
83
84 if let Some(timeout) = options.statement_timeout {
85 sqlx_opts = sqlx_opts.options([("statement_timeout", timeout.as_millis().to_string())]);
86 }
87
88 if let Some(f) = &options.pg_opts_fn {
89 sqlx_opts = f(sqlx_opts);
90 }
91
92 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
93 let mut string = "SET search_path = ".to_owned();
94 if schema.starts_with('"') {
95 write!(&mut string, "{schema}").expect("Infallible");
96 } else {
97 for (i, schema) in schema.split(',').enumerate() {
98 if i > 0 {
99 write!(&mut string, ",").expect("Infallible");
100 }
101 if schema.starts_with('"') {
102 write!(&mut string, "{schema}").expect("Infallible");
103 } else {
104 write!(&mut string, "\"{schema}\"").expect("Infallible");
105 }
106 }
107 }
108 string
109 });
110
111 let lazy = options.connect_lazy;
112 let after_connect = options.after_connect.clone();
113 let mut pool_options = options.sqlx_pool_options();
114
115 if let Some(sql) = set_search_path_sql {
116 pool_options = pool_options.after_connect(move |conn, _| {
117 let sql = sql.clone();
118 Box::pin(async move {
119 sqlx::Executor::execute(conn, sql.as_str())
120 .await
121 .map(|_| ())
122 })
123 });
124 }
125
126 let pool = if lazy {
127 pool_options.connect_lazy_with(sqlx_opts)
128 } else {
129 pool_options
130 .connect_with(sqlx_opts)
131 .await
132 .map_err(sqlx_error_to_conn_err)?
133 };
134
135 let conn: DatabaseConnection =
136 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
137 pool,
138 metric_callback: None,
139 })
140 .into();
141
142 if let Some(cb) = after_connect {
143 cb(conn.clone()).await?;
144 }
145
146 Ok(conn)
147 }
148}
149
150impl SqlxPostgresConnector {
151 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
153 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
154 pool,
155 metric_callback: None,
156 })
157 .into()
158 }
159}
160
161impl SqlxPostgresPoolConnection {
162 #[instrument(level = "trace")]
164 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
165 debug_print!("{}", stmt);
166
167 let query = sqlx_query(&stmt);
168 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
169 crate::metric::metric!(self.metric_callback, &stmt, {
170 match query.execute(&mut *conn).await {
171 Ok(res) => Ok(res.into()),
172 Err(err) => Err(sqlx_error_to_exec_err(err)),
173 }
174 })
175 }
176
177 #[instrument(level = "trace")]
179 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
180 debug_print!("{}", sql);
181
182 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
183 match conn.execute(sql).await {
184 Ok(res) => Ok(res.into()),
185 Err(err) => Err(sqlx_error_to_exec_err(err)),
186 }
187 }
188
189 #[instrument(level = "trace")]
191 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
192 debug_print!("{}", stmt);
193
194 let query = sqlx_query(&stmt);
195 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
196 crate::metric::metric!(self.metric_callback, &stmt, {
197 match query.fetch_one(&mut *conn).await {
198 Ok(row) => Ok(Some(row.into())),
199 Err(err) => match err {
200 sqlx::Error::RowNotFound => Ok(None),
201 _ => Err(sqlx_error_to_query_err(err)),
202 },
203 }
204 })
205 }
206
207 #[instrument(level = "trace")]
209 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
210 debug_print!("{}", stmt);
211
212 let query = sqlx_query(&stmt);
213 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
214 crate::metric::metric!(self.metric_callback, &stmt, {
215 match query.fetch_all(&mut *conn).await {
216 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
217 Err(err) => Err(sqlx_error_to_query_err(err)),
218 }
219 })
220 }
221
222 #[instrument(level = "trace")]
224 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
225 debug_print!("{}", stmt);
226
227 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
228 Ok(QueryStream::from((
229 conn,
230 stmt,
231 self.metric_callback.clone(),
232 )))
233 }
234
235 #[instrument(level = "trace")]
237 pub async fn begin(
238 &self,
239 isolation_level: Option<IsolationLevel>,
240 access_mode: Option<AccessMode>,
241 ) -> Result<DatabaseTransaction, DbErr> {
242 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
243 DatabaseTransaction::new_postgres(
244 conn,
245 self.metric_callback.clone(),
246 isolation_level,
247 access_mode,
248 )
249 .await
250 }
251
252 #[instrument(level = "trace", skip(callback))]
254 pub async fn transaction<F, T, E>(
255 &self,
256 callback: F,
257 isolation_level: Option<IsolationLevel>,
258 access_mode: Option<AccessMode>,
259 ) -> Result<T, TransactionError<E>>
260 where
261 F: for<'b> FnOnce(
262 &'b DatabaseTransaction,
263 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
264 + Send,
265 T: Send,
266 E: std::fmt::Display + std::fmt::Debug + Send,
267 {
268 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
269 let transaction = DatabaseTransaction::new_postgres(
270 conn,
271 self.metric_callback.clone(),
272 isolation_level,
273 access_mode,
274 )
275 .await
276 .map_err(|e| TransactionError::Connection(e))?;
277 transaction.run(callback).await
278 }
279
280 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
281 where
282 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
283 {
284 self.metric_callback = Some(Arc::new(callback));
285 }
286
287 pub async fn ping(&self) -> Result<(), DbErr> {
289 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
290 match conn.ping().await {
291 Ok(_) => Ok(()),
292 Err(err) => Err(sqlx_error_to_conn_err(err)),
293 }
294 }
295
296 pub async fn close(self) -> Result<(), DbErr> {
299 self.close_by_ref().await
300 }
301
302 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
304 self.pool.close().await;
305 Ok(())
306 }
307}
308
309impl From<PgRow> for QueryResult {
310 fn from(row: PgRow) -> QueryResult {
311 QueryResult {
312 row: QueryResultRow::SqlxPostgres(row),
313 }
314 }
315}
316
317impl From<PgQueryResult> for ExecResult {
318 fn from(result: PgQueryResult) -> ExecResult {
319 ExecResult {
320 result: ExecResultHolder::SqlxPostgres(result),
321 }
322 }
323}
324
325pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
326 let values = stmt
327 .values
328 .as_ref()
329 .map_or(Values(Vec::new()), |values| values.clone());
330 sqlx::query_with(&stmt.sql, SqlxValues(values))
331}
332
333pub(crate) async fn set_transaction_config(
334 conn: &mut PoolConnection<Postgres>,
335 isolation_level: Option<IsolationLevel>,
336 access_mode: Option<AccessMode>,
337) -> Result<(), DbErr> {
338 let mut settings = Vec::new();
339
340 if let Some(isolation_level) = isolation_level {
341 settings.push(format!("ISOLATION LEVEL {isolation_level}"));
342 }
343
344 if let Some(access_mode) = access_mode {
345 settings.push(access_mode.to_string());
346 }
347
348 if !settings.is_empty() {
349 let sql = format!("SET TRANSACTION {}", settings.join(" "));
350 sqlx::query(&sql)
351 .execute(&mut **conn)
352 .await
353 .map_err(sqlx_error_to_exec_err)?;
354 }
355 Ok(())
356}
357
358impl
359 From<(
360 PoolConnection<sqlx::Postgres>,
361 Statement,
362 Option<crate::metric::Callback>,
363 )> for crate::QueryStream
364{
365 fn from(
366 (conn, stmt, metric_callback): (
367 PoolConnection<sqlx::Postgres>,
368 Statement,
369 Option<crate::metric::Callback>,
370 ),
371 ) -> Self {
372 crate::QueryStream::build(
373 stmt,
374 crate::InnerConnection::Postgres(conn),
375 metric_callback,
376 )
377 }
378}
379
380impl crate::DatabaseTransaction {
381 pub(crate) async fn new_postgres(
382 inner: PoolConnection<sqlx::Postgres>,
383 metric_callback: Option<crate::metric::Callback>,
384 isolation_level: Option<IsolationLevel>,
385 access_mode: Option<AccessMode>,
386 ) -> Result<crate::DatabaseTransaction, DbErr> {
387 Self::begin(
388 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
389 crate::DbBackend::Postgres,
390 metric_callback,
391 isolation_level,
392 access_mode,
393 None,
394 )
395 .await
396 }
397}
398
/// Converts a sqlx [`PgRow`](sqlx::postgres::PgRow) into a [`ProxyRow`](crate::ProxyRow).
///
/// Every column is decoded according to its PostgreSQL type name and stored
/// under the column name as a `sea_query::Value`. Array columns become
/// `Value::Array`, and a SQL `NULL` becomes the `None` payload of the
/// corresponding variant.
///
/// Which type names are understood depends on the enabled cargo features
/// (`postgres-array`, `with-json`, `with-chrono`, `with-time`, ...).
///
/// # Panics
/// Panics if a column cannot be decoded as the Rust type mapped to its type
/// name, or if the type name has no mapping in the `match` below.
#[cfg(feature = "proxy")]
pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
    use sea_query::Value;
    use sqlx::{Column, Row, TypeInfo};

    crate::ProxyRow {
        values: row
            .columns()
            .iter()
            .map(|c| {
                // Decodes the current column into `Value::$variant`, letting the
                // variant's payload type drive which Rust type is decoded.
                macro_rules! scalar {
                    ($variant:ident, $msg:literal) => {
                        Value::$variant(row.try_get(c.ordinal()).expect($msg))
                    };
                }

                // Same as `scalar!`, but decodes explicitly as `Option<$ty>`.
                macro_rules! scalar_as {
                    ($variant:ident, $ty:ty, $msg:literal) => {
                        Value::$variant(
                            row.try_get::<Option<$ty>, _>(c.ordinal())
                                .expect($msg),
                        )
                    };
                }

                // Decodes the current column as `Option<Vec<$elem>>` and wraps
                // each element in `Value::$variant(Some(_))`.
                #[cfg(feature = "postgres-array")]
                macro_rules! array {
                    ($variant:ident, $elem:ty, $msg:literal) => {
                        Value::Array(
                            sea_query::ArrayType::$variant,
                            row.try_get::<Option<Vec<$elem>>, _>(c.ordinal())
                                .expect($msg)
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::$variant(Some(val)))
                                            .collect(),
                                    )
                                }),
                        )
                    };
                }

                let name = c.name().to_string();
                let value = match c.type_info().name() {
                    "BOOL" => scalar!(Bool, "Failed to get boolean"),
                    #[cfg(feature = "postgres-array")]
                    "BOOL[]" => array!(Bool, bool, "Failed to get boolean array"),

                    // The single-byte "CHAR" type is decoded as a tiny integer.
                    "\"CHAR\"" => scalar!(TinyInt, "Failed to get tiny integer"),
                    #[cfg(feature = "postgres-array")]
                    "\"CHAR\"[]" => array!(TinyInt, i8, "Failed to get tiny integer array"),

                    "SMALLINT" | "SMALLSERIAL" | "INT2" => {
                        scalar!(SmallInt, "Failed to get small integer")
                    }
                    #[cfg(feature = "postgres-array")]
                    "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => {
                        array!(SmallInt, i16, "Failed to get small integer array")
                    }

                    "INT" | "SERIAL" | "INT4" => scalar!(Int, "Failed to get integer"),
                    #[cfg(feature = "postgres-array")]
                    "INT[]" | "SERIAL[]" | "INT4[]" => {
                        array!(Int, i32, "Failed to get integer array")
                    }

                    "BIGINT" | "BIGSERIAL" | "INT8" => {
                        scalar!(BigInt, "Failed to get big integer")
                    }
                    #[cfg(feature = "postgres-array")]
                    "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => {
                        array!(BigInt, i64, "Failed to get big integer array")
                    }

                    "FLOAT4" | "REAL" => scalar!(Float, "Failed to get float"),
                    #[cfg(feature = "postgres-array")]
                    "FLOAT4[]" | "REAL[]" => array!(Float, f32, "Failed to get float array"),

                    "FLOAT8" | "DOUBLE PRECISION" => scalar!(Double, "Failed to get double"),
                    #[cfg(feature = "postgres-array")]
                    "FLOAT8[]" | "DOUBLE PRECISION[]" => {
                        array!(Double, f64, "Failed to get double array")
                    }

                    "VARCHAR" | "CHAR" | "TEXT" | "NAME" => {
                        scalar_as!(String, String, "Failed to get string")
                    }
                    #[cfg(feature = "postgres-array")]
                    "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => {
                        array!(String, String, "Failed to get string array")
                    }

                    "BYTEA" => scalar_as!(Bytes, Vec<u8>, "Failed to get bytes"),
                    #[cfg(feature = "postgres-array")]
                    "BYTEA[]" => array!(Bytes, Vec<u8>, "Failed to get bytes array"),

                    // `with-bigdecimal` takes precedence over `with-rust_decimal`.
                    #[cfg(feature = "with-bigdecimal")]
                    "NUMERIC" => {
                        scalar_as!(BigDecimal, bigdecimal::BigDecimal, "Failed to get numeric")
                    }
                    #[cfg(all(
                        feature = "with-rust_decimal",
                        not(feature = "with-bigdecimal")
                    ))]
                    "NUMERIC" => scalar!(Decimal, "Failed to get numeric"),

                    #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
                    "NUMERIC[]" => array!(
                        BigDecimal,
                        bigdecimal::BigDecimal,
                        "Failed to get numeric array"
                    ),
                    #[cfg(all(
                        feature = "with-rust_decimal",
                        not(feature = "with-bigdecimal"),
                        feature = "postgres-array"
                    ))]
                    "NUMERIC[]" => array!(
                        Decimal,
                        rust_decimal::Decimal,
                        "Failed to get numeric array"
                    ),

                    // NOTE(review): this decodes OID columns as i64 — confirm sqlx
                    // accepts that pairing rather than requiring its `Oid` type.
                    "OID" => scalar!(BigInt, "Failed to get oid"),
                    #[cfg(feature = "postgres-array")]
                    "OID[]" => array!(BigInt, i64, "Failed to get oid array"),

                    // JSON payloads are boxed, so they do not fit the macros above.
                    #[cfg(feature = "with-json")]
                    "JSON" | "JSONB" => Value::Json(
                        row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
                            .expect("Failed to get json")
                            .map(Box::new),
                    ),
                    #[cfg(all(
                        feature = "with-json",
                        any(feature = "json-array", feature = "postgres-array")
                    ))]
                    "JSON[]" | "JSONB[]" => Value::Array(
                        sea_query::ArrayType::Json,
                        row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
                            .expect("Failed to get json array")
                            .map(|vals| {
                                Box::new(
                                    vals.into_iter()
                                        .map(|val| Value::Json(Some(Box::new(val))))
                                        .collect(),
                                )
                            }),
                    ),

                    #[cfg(feature = "with-ipnetwork")]
                    "INET" | "CIDR" => {
                        scalar_as!(IpNetwork, ipnetwork::IpNetwork, "Failed to get ip address")
                    }
                    // Gated on `postgres-array` like every other array arm; the
                    // arm builds a `Value::Array`.
                    #[cfg(all(feature = "with-ipnetwork", feature = "postgres-array"))]
                    "INET[]" | "CIDR[]" => array!(
                        IpNetwork,
                        ipnetwork::IpNetwork,
                        "Failed to get ip address array"
                    ),

                    #[cfg(feature = "with-mac_address")]
                    "MACADDR" | "MACADDR8" => scalar_as!(
                        MacAddress,
                        mac_address::MacAddress,
                        "Failed to get mac address"
                    ),
                    #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
                    "MACADDR[]" | "MACADDR8[]" => array!(
                        MacAddress,
                        mac_address::MacAddress,
                        "Failed to get mac address array"
                    ),

                    // For all date/time types `with-chrono` takes precedence over
                    // `with-time`.
                    #[cfg(feature = "with-chrono")]
                    "TIMESTAMP" => scalar_as!(
                        ChronoDateTime,
                        chrono::NaiveDateTime,
                        "Failed to get timestamp"
                    ),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIMESTAMP" => scalar_as!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamp"
                    ),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIMESTAMP[]" => array!(
                        ChronoDateTime,
                        chrono::NaiveDateTime,
                        "Failed to get timestamp array"
                    ),
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIMESTAMP[]" => array!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamp array"
                    ),

                    #[cfg(feature = "with-chrono")]
                    "DATE" => scalar_as!(ChronoDate, chrono::NaiveDate, "Failed to get date"),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "DATE" => scalar_as!(TimeDate, time::Date, "Failed to get date"),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "DATE[]" => array!(ChronoDate, chrono::NaiveDate, "Failed to get date array"),
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "DATE[]" => array!(TimeDate, time::Date, "Failed to get date array"),

                    #[cfg(feature = "with-chrono")]
                    "TIME" => scalar_as!(ChronoTime, chrono::NaiveTime, "Failed to get time"),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIME" => scalar_as!(TimeTime, time::Time, "Failed to get time"),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIME[]" => array!(ChronoTime, chrono::NaiveTime, "Failed to get time array"),
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIME[]" => array!(TimeTime, time::Time, "Failed to get time array"),

                    #[cfg(feature = "with-chrono")]
                    "TIMESTAMPTZ" => scalar_as!(
                        ChronoDateTimeUtc,
                        chrono::DateTime<chrono::Utc>,
                        "Failed to get timestamptz"
                    ),
                    // NOTE(review): decodes a timezone-aware column as a
                    // `PrimitiveDateTime` — confirm sqlx accepts this pairing.
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIMESTAMPTZ" => scalar_as!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamptz"
                    ),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIMESTAMPTZ[]" => array!(
                        ChronoDateTimeUtc,
                        chrono::DateTime<chrono::Utc>,
                        "Failed to get timestamptz array"
                    ),
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIMESTAMPTZ[]" => array!(
                        TimeDateTime,
                        time::PrimitiveDateTime,
                        "Failed to get timestamptz array"
                    ),

                    // NOTE(review): TIMETZ is decoded with the plain time-of-day
                    // types — confirm sqlx accepts this pairing.
                    #[cfg(feature = "with-chrono")]
                    "TIMETZ" => scalar_as!(ChronoTime, chrono::NaiveTime, "Failed to get timetz"),
                    #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                    "TIMETZ" => scalar!(TimeTime, "Failed to get timetz"),

                    #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                    "TIMETZ[]" => {
                        array!(ChronoTime, chrono::NaiveTime, "Failed to get timetz array")
                    }
                    #[cfg(all(
                        feature = "with-time",
                        not(feature = "with-chrono"),
                        feature = "postgres-array"
                    ))]
                    "TIMETZ[]" => array!(TimeTime, time::Time, "Failed to get timetz array"),

                    #[cfg(feature = "with-uuid")]
                    "UUID" => scalar_as!(Uuid, uuid::Uuid, "Failed to get uuid"),
                    #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
                    "UUID[]" => array!(Uuid, uuid::Uuid, "Failed to get uuid array"),

                    // Any type name without a mapping above ends up here and panics.
                    _ => unreachable!("Unknown column type: {}", c.type_info().name()),
                };

                (name, value)
            })
            .collect(),
    }
}