1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
22#[derive(Debug)]
24pub struct SqlxPostgresConnector;
25
26#[derive(Clone)]
28pub struct SqlxPostgresPoolConnection {
29 pub(crate) pool: PgPool,
30 metric_callback: Option<crate::metric::Callback>,
31 pub(crate) record_stmt_in_spans: bool,
32}
33
34impl std::fmt::Debug for SqlxPostgresPoolConnection {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
37 }
38}
39
40impl From<PgPool> for SqlxPostgresPoolConnection {
41 fn from(pool: PgPool) -> Self {
42 SqlxPostgresPoolConnection {
43 pool,
44 metric_callback: None,
45 record_stmt_in_spans: true,
46 }
47 }
48}
49
50impl From<PgPool> for DatabaseConnection {
51 fn from(pool: PgPool) -> Self {
52 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
53 }
54}
55
56impl SqlxPostgresConnector {
57 pub fn accepts(string: &str) -> bool {
59 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
60 }
61
62 #[instrument(level = "trace")]
64 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
65 let record_stmt_in_spans = options.get_record_stmt_in_spans();
66 let mut sqlx_opts = options
67 .url
68 .parse::<PgConnectOptions>()
69 .map_err(sqlx_error_to_conn_err)?;
70 use sqlx::ConnectOptions;
71 if !options.sqlx_logging {
72 sqlx_opts = sqlx_opts.disable_statement_logging();
73 } else {
74 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
75 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
76 sqlx_opts = sqlx_opts.log_slow_statements(
77 options.sqlx_slow_statements_logging_level,
78 options.sqlx_slow_statements_logging_threshold,
79 );
80 }
81 }
82
83 if let Some(application_name) = &options.application_name {
84 sqlx_opts = sqlx_opts.application_name(application_name);
85 }
86
87 if let Some(timeout) = options.statement_timeout {
88 sqlx_opts = sqlx_opts.options([("statement_timeout", timeout.as_millis().to_string())]);
89 }
90
91 if let Some(f) = &options.pg_opts_fn {
92 sqlx_opts = f(sqlx_opts);
93 }
94
95 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
96 let mut string = "SET search_path = ".to_owned();
97 if schema.starts_with('"') {
98 write!(&mut string, "{schema}").expect("Infallible");
99 } else {
100 for (i, schema) in schema.split(',').enumerate() {
101 if i > 0 {
102 write!(&mut string, ",").expect("Infallible");
103 }
104 if schema.starts_with('"') {
105 write!(&mut string, "{schema}").expect("Infallible");
106 } else {
107 write!(&mut string, "\"{schema}\"").expect("Infallible");
108 }
109 }
110 }
111 string
112 });
113
114 let lazy = options.connect_lazy;
115 let after_connect = options.after_connect.clone();
116 let pg_pool_opts_fn = options.pg_pool_opts_fn.clone();
117 let mut pool_options = options.sqlx_pool_options();
118
119 if let Some(sql) = set_search_path_sql {
120 pool_options = pool_options.after_connect(move |conn, _| {
121 let sql = sql.clone();
122 Box::pin(async move {
123 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql))
124 .await
125 .map(|_| ())
126 })
127 });
128 }
129 if let Some(f) = &pg_pool_opts_fn {
130 pool_options = f(pool_options);
131 }
132 let pool = if lazy {
133 pool_options.connect_lazy_with(sqlx_opts)
134 } else {
135 pool_options
136 .connect_with(sqlx_opts)
137 .await
138 .map_err(sqlx_error_to_conn_err)?
139 };
140
141 let conn: DatabaseConnection =
142 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
143 pool,
144 metric_callback: None,
145 record_stmt_in_spans,
146 })
147 .into();
148
149 if let Some(cb) = after_connect {
150 cb(conn.clone()).await?;
151 }
152
153 Ok(conn)
154 }
155}
156
157impl SqlxPostgresConnector {
158 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
160 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
161 pool,
162 metric_callback: None,
163 record_stmt_in_spans: true,
164 })
165 .into()
166 }
167}
168
169impl SqlxPostgresPoolConnection {
170 #[instrument(level = "trace")]
172 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
173 debug_print!("{}", stmt);
174
175 let query = sqlx_query(&stmt);
176 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
177 crate::metric::metric!(self.metric_callback, &stmt, {
178 match query.execute(&mut *conn).await {
179 Ok(res) => Ok(res.into()),
180 Err(err) => Err(sqlx_error_to_exec_err(err)),
181 }
182 })
183 }
184
185 #[instrument(level = "trace")]
187 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
188 debug_print!("{}", sql);
189
190 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
191 match conn.execute(sqlx::AssertSqlSafe(sql.to_owned())).await {
192 Ok(res) => Ok(res.into()),
193 Err(err) => Err(sqlx_error_to_exec_err(err)),
194 }
195 }
196
197 #[instrument(level = "trace")]
199 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
200 debug_print!("{}", stmt);
201
202 let query = sqlx_query(&stmt);
203 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
204 crate::metric::metric!(self.metric_callback, &stmt, {
205 match query.fetch_one(&mut *conn).await {
206 Ok(row) => Ok(Some(row.into())),
207 Err(err) => match err {
208 sqlx::Error::RowNotFound => Ok(None),
209 _ => Err(sqlx_error_to_query_err(err)),
210 },
211 }
212 })
213 }
214
215 #[instrument(level = "trace")]
217 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
218 debug_print!("{}", stmt);
219
220 let query = sqlx_query(&stmt);
221 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
222 crate::metric::metric!(self.metric_callback, &stmt, {
223 match query.fetch_all(&mut *conn).await {
224 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
225 Err(err) => Err(sqlx_error_to_query_err(err)),
226 }
227 })
228 }
229
230 #[instrument(level = "trace")]
232 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
233 debug_print!("{}", stmt);
234
235 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
236 Ok(QueryStream::from((
237 conn,
238 stmt,
239 self.metric_callback.clone(),
240 )))
241 }
242
243 #[instrument(level = "trace")]
245 pub async fn begin(
246 &self,
247 isolation_level: Option<IsolationLevel>,
248 access_mode: Option<AccessMode>,
249 ) -> Result<DatabaseTransaction, DbErr> {
250 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
251 DatabaseTransaction::new_postgres(
252 conn,
253 self.metric_callback.clone(),
254 self.record_stmt_in_spans,
255 isolation_level,
256 access_mode,
257 )
258 .await
259 }
260
261 #[instrument(level = "trace", skip(callback))]
263 pub async fn transaction<F, T, E>(
264 &self,
265 callback: F,
266 isolation_level: Option<IsolationLevel>,
267 access_mode: Option<AccessMode>,
268 ) -> Result<T, TransactionError<E>>
269 where
270 F: for<'b> FnOnce(
271 &'b DatabaseTransaction,
272 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
273 + Send,
274 T: Send,
275 E: std::fmt::Display + std::fmt::Debug + Send,
276 {
277 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
278 let transaction = DatabaseTransaction::new_postgres(
279 conn,
280 self.metric_callback.clone(),
281 self.record_stmt_in_spans,
282 isolation_level,
283 access_mode,
284 )
285 .await
286 .map_err(|e| TransactionError::Connection(e))?;
287 transaction.run(callback).await
288 }
289
290 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
291 where
292 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
293 {
294 self.metric_callback = Some(Arc::new(callback));
295 }
296
297 pub async fn ping(&self) -> Result<(), DbErr> {
299 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
300 match conn.ping().await {
301 Ok(_) => Ok(()),
302 Err(err) => Err(sqlx_error_to_conn_err(err)),
303 }
304 }
305
306 pub async fn close(self) -> Result<(), DbErr> {
309 self.close_by_ref().await
310 }
311
312 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
314 self.pool.close().await;
315 Ok(())
316 }
317}
318
319impl From<PgRow> for QueryResult {
320 fn from(row: PgRow) -> QueryResult {
321 QueryResult {
322 row: QueryResultRow::SqlxPostgres(row),
323 }
324 }
325}
326
327impl From<PgQueryResult> for ExecResult {
328 fn from(result: PgQueryResult) -> ExecResult {
329 ExecResult {
330 result: ExecResultHolder::SqlxPostgres(result),
331 }
332 }
333}
334
335pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
336 let values = stmt
337 .values
338 .as_ref()
339 .map_or(Values(Vec::new()), |values| values.clone());
340 sqlx::query_with(sqlx::AssertSqlSafe(stmt.sql.as_str()), SqlxValues(values))
341}
342
343pub(crate) async fn set_transaction_config(
344 conn: &mut PoolConnection<Postgres>,
345 isolation_level: Option<IsolationLevel>,
346 access_mode: Option<AccessMode>,
347) -> Result<(), DbErr> {
348 let mut settings = Vec::new();
349
350 if let Some(isolation_level) = isolation_level {
351 settings.push(format!("ISOLATION LEVEL {isolation_level}"));
352 }
353
354 if let Some(access_mode) = access_mode {
355 settings.push(access_mode.to_string());
356 }
357
358 if !settings.is_empty() {
359 let sql = format!("SET TRANSACTION {}", settings.join(" "));
360 sqlx::query(sqlx::AssertSqlSafe(sql))
361 .execute(&mut **conn)
362 .await
363 .map_err(sqlx_error_to_exec_err)?;
364 }
365 Ok(())
366}
367
368impl
369 From<(
370 PoolConnection<sqlx::Postgres>,
371 Statement,
372 Option<crate::metric::Callback>,
373 )> for crate::QueryStream
374{
375 fn from(
376 (conn, stmt, metric_callback): (
377 PoolConnection<sqlx::Postgres>,
378 Statement,
379 Option<crate::metric::Callback>,
380 ),
381 ) -> Self {
382 crate::QueryStream::build(
383 stmt,
384 crate::InnerConnection::Postgres(conn),
385 metric_callback,
386 )
387 }
388}
389
390impl crate::DatabaseTransaction {
391 pub(crate) async fn new_postgres(
392 inner: PoolConnection<sqlx::Postgres>,
393 metric_callback: Option<crate::metric::Callback>,
394 record_stmt_in_spans: bool,
395 isolation_level: Option<IsolationLevel>,
396 access_mode: Option<AccessMode>,
397 ) -> Result<crate::DatabaseTransaction, DbErr> {
398 Self::begin(
399 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
400 crate::DbBackend::Postgres,
401 metric_callback,
402 record_stmt_in_spans,
403 isolation_level,
404 access_mode,
405 None,
406 )
407 .await
408 }
409}
410
411#[cfg(feature = "proxy")]
412pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
413 use sea_query::Value;
416 use sqlx::{Column, Row, TypeInfo};
417 crate::ProxyRow {
418 values: row
419 .columns()
420 .iter()
421 .map(|c| {
422 (
423 c.name().to_string(),
424 match c.type_info().name() {
425 "BOOL" => {
426 Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
427 }
428 #[cfg(feature = "postgres-array")]
429 "BOOL[]" => Value::Array(
430 sea_query::ArrayType::Bool,
431 row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
432 .expect("Failed to get boolean array")
433 .map(|vals| {
434 Box::new(
435 vals.into_iter()
436 .map(|val| Value::Bool(Some(val)))
437 .collect(),
438 )
439 }),
440 ),
441
442 "\"CHAR\"" => Value::TinyInt(
443 row.try_get(c.ordinal())
444 .expect("Failed to get small integer"),
445 ),
446 #[cfg(feature = "postgres-array")]
447 "\"CHAR\"[]" => Value::Array(
448 sea_query::ArrayType::TinyInt,
449 row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
450 .expect("Failed to get small integer array")
451 .map(|vals: Vec<i8>| {
452 Box::new(
453 vals.into_iter()
454 .map(|val| Value::TinyInt(Some(val)))
455 .collect(),
456 )
457 }),
458 ),
459
460 "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
461 row.try_get(c.ordinal())
462 .expect("Failed to get small integer"),
463 ),
464 #[cfg(feature = "postgres-array")]
465 "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
466 sea_query::ArrayType::SmallInt,
467 row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
468 .expect("Failed to get small integer array")
469 .map(|vals: Vec<i16>| {
470 Box::new(
471 vals.into_iter()
472 .map(|val| Value::SmallInt(Some(val)))
473 .collect(),
474 )
475 }),
476 ),
477
478 "INT" | "SERIAL" | "INT4" => {
479 Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
480 }
481 #[cfg(feature = "postgres-array")]
482 "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
483 sea_query::ArrayType::Int,
484 row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
485 .expect("Failed to get integer array")
486 .map(|vals: Vec<i32>| {
487 Box::new(
488 vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
489 )
490 }),
491 ),
492
493 "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
494 row.try_get(c.ordinal()).expect("Failed to get big integer"),
495 ),
496 #[cfg(feature = "postgres-array")]
497 "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
498 sea_query::ArrayType::BigInt,
499 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
500 .expect("Failed to get big integer array")
501 .map(|vals: Vec<i64>| {
502 Box::new(
503 vals.into_iter()
504 .map(|val| Value::BigInt(Some(val)))
505 .collect(),
506 )
507 }),
508 ),
509
510 "FLOAT4" | "REAL" => {
511 Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
512 }
513 #[cfg(feature = "postgres-array")]
514 "FLOAT4[]" | "REAL[]" => Value::Array(
515 sea_query::ArrayType::Float,
516 row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
517 .expect("Failed to get float array")
518 .map(|vals| {
519 Box::new(
520 vals.into_iter()
521 .map(|val| Value::Float(Some(val)))
522 .collect(),
523 )
524 }),
525 ),
526
527 "FLOAT8" | "DOUBLE PRECISION" => {
528 Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
529 }
530 #[cfg(feature = "postgres-array")]
531 "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
532 sea_query::ArrayType::Double,
533 row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
534 .expect("Failed to get double array")
535 .map(|vals| {
536 Box::new(
537 vals.into_iter()
538 .map(|val| Value::Double(Some(val)))
539 .collect(),
540 )
541 }),
542 ),
543
544 "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
545 row.try_get::<Option<String>, _>(c.ordinal())
546 .expect("Failed to get string"),
547 ),
548 #[cfg(feature = "postgres-array")]
549 "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
550 sea_query::ArrayType::String,
551 row.try_get::<Option<Vec<String>>, _>(c.ordinal())
552 .expect("Failed to get string array")
553 .map(|vals| {
554 Box::new(
555 vals.into_iter()
556 .map(|val| Value::String(Some(val)))
557 .collect(),
558 )
559 }),
560 ),
561
562 "BYTEA" => Value::Bytes(
563 row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
564 .expect("Failed to get bytes"),
565 ),
566 #[cfg(feature = "postgres-array")]
567 "BYTEA[]" => Value::Array(
568 sea_query::ArrayType::Bytes,
569 row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
570 .expect("Failed to get bytes array")
571 .map(|vals| {
572 Box::new(
573 vals.into_iter()
574 .map(|val| Value::Bytes(Some(val)))
575 .collect(),
576 )
577 }),
578 ),
579
580 #[cfg(feature = "with-bigdecimal")]
581 "NUMERIC" => Value::BigDecimal(
582 row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
583 .expect("Failed to get numeric"),
584 ),
585 #[cfg(all(
586 feature = "with-rust_decimal",
587 not(feature = "with-bigdecimal")
588 ))]
589 "NUMERIC" => {
590 Value::Decimal(row.try_get(c.ordinal()).expect("Failed to get numeric"))
591 }
592
593 #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
594 "NUMERIC[]" => Value::Array(
595 sea_query::ArrayType::BigDecimal,
596 row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
597 .expect("Failed to get numeric array")
598 .map(|vals| {
599 Box::new(
600 vals.into_iter()
601 .map(|val| Value::BigDecimal(Some(val)))
602 .collect(),
603 )
604 }),
605 ),
606 #[cfg(all(
607 feature = "with-rust_decimal",
608 not(feature = "with-bigdecimal"),
609 feature = "postgres-array"
610 ))]
611 "NUMERIC[]" => Value::Array(
612 sea_query::ArrayType::Decimal,
613 row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
614 .expect("Failed to get numeric array")
615 .map(|vals| {
616 Box::new(
617 vals.into_iter()
618 .map(|val| Value::Decimal(Some(val)))
619 .collect(),
620 )
621 }),
622 ),
623
624 "OID" => {
625 Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
626 }
627 #[cfg(feature = "postgres-array")]
628 "OID[]" => Value::Array(
629 sea_query::ArrayType::BigInt,
630 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
631 .expect("Failed to get oid array")
632 .map(|vals| {
633 Box::new(
634 vals.into_iter()
635 .map(|val| Value::BigInt(Some(val)))
636 .collect(),
637 )
638 }),
639 ),
640
641 #[cfg(feature = "with-json")]
642 "JSON" | "JSONB" => Value::Json(
643 row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
644 .expect("Failed to get json")
645 .map(Box::new),
646 ),
647 #[cfg(all(
648 feature = "with-json",
649 any(feature = "json-array", feature = "postgres-array")
650 ))]
651 "JSON[]" | "JSONB[]" => Value::Array(
652 sea_query::ArrayType::Json,
653 row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
654 .expect("Failed to get json array")
655 .map(|vals| {
656 Box::new(
657 vals.into_iter()
658 .map(|val| Value::Json(Some(Box::new(val))))
659 .collect(),
660 )
661 }),
662 ),
663
664 #[cfg(feature = "with-ipnetwork")]
665 "INET" | "CIDR" => Value::IpNetwork(
666 row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
667 .expect("Failed to get ip address"),
668 ),
669 #[cfg(feature = "with-ipnetwork")]
670 "INET[]" | "CIDR[]" => Value::Array(
671 sea_query::ArrayType::IpNetwork,
672 row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
673 .expect("Failed to get ip address array")
674 .map(|vals| {
675 Box::new(
676 vals.into_iter()
677 .map(|val| Value::IpNetwork(Some(val)))
678 .collect(),
679 )
680 }),
681 ),
682
683 #[cfg(feature = "with-mac_address")]
684 "MACADDR" | "MACADDR8" => Value::MacAddress(
685 row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
686 .expect("Failed to get mac address"),
687 ),
688 #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
689 "MACADDR[]" | "MACADDR8[]" => Value::Array(
690 sea_query::ArrayType::MacAddress,
691 row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
692 .expect("Failed to get mac address array")
693 .map(|vals| {
694 Box::new(
695 vals.into_iter()
696 .map(|val| Value::MacAddress(Some(val)))
697 .collect(),
698 )
699 }),
700 ),
701
702 #[cfg(feature = "with-chrono")]
703 "TIMESTAMP" => Value::ChronoDateTime(
704 row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
705 .expect("Failed to get timestamp"),
706 ),
707 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
708 "TIMESTAMP" => Value::TimeDateTime(
709 row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
710 .expect("Failed to get timestamp"),
711 ),
712
713 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
714 "TIMESTAMP[]" => Value::Array(
715 sea_query::ArrayType::ChronoDateTime,
716 row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
717 .expect("Failed to get timestamp array")
718 .map(|vals| {
719 Box::new(
720 vals.into_iter()
721 .map(|val| Value::ChronoDateTime(Some(val)))
722 .collect(),
723 )
724 }),
725 ),
726 #[cfg(all(
727 feature = "with-time",
728 not(feature = "with-chrono"),
729 feature = "postgres-array"
730 ))]
731 "TIMESTAMP[]" => Value::Array(
732 sea_query::ArrayType::TimeDateTime,
733 row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
734 .expect("Failed to get timestamp array")
735 .map(|vals| {
736 Box::new(
737 vals.into_iter()
738 .map(|val| Value::TimeDateTime(Some(val)))
739 .collect(),
740 )
741 }),
742 ),
743
744 #[cfg(feature = "with-chrono")]
745 "DATE" => Value::ChronoDate(
746 row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
747 .expect("Failed to get date"),
748 ),
749 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
750 "DATE" => Value::TimeDate(
751 row.try_get::<Option<time::Date>, _>(c.ordinal())
752 .expect("Failed to get date"),
753 ),
754
755 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
756 "DATE[]" => Value::Array(
757 sea_query::ArrayType::ChronoDate,
758 row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
759 .expect("Failed to get date array")
760 .map(|vals| {
761 Box::new(
762 vals.into_iter()
763 .map(|val| Value::ChronoDate(Some(val)))
764 .collect(),
765 )
766 }),
767 ),
768 #[cfg(all(
769 feature = "with-time",
770 not(feature = "with-chrono"),
771 feature = "postgres-array"
772 ))]
773 "DATE[]" => Value::Array(
774 sea_query::ArrayType::TimeDate,
775 row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
776 .expect("Failed to get date array")
777 .map(|vals| {
778 Box::new(
779 vals.into_iter()
780 .map(|val| Value::TimeDate(Some(val)))
781 .collect(),
782 )
783 }),
784 ),
785
786 #[cfg(feature = "with-chrono")]
787 "TIME" => Value::ChronoTime(
788 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
789 .expect("Failed to get time"),
790 ),
791 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
792 "TIME" => Value::TimeTime(
793 row.try_get::<Option<time::Time>, _>(c.ordinal())
794 .expect("Failed to get time"),
795 ),
796
797 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
798 "TIME[]" => Value::Array(
799 sea_query::ArrayType::ChronoTime,
800 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
801 .expect("Failed to get time array")
802 .map(|vals| {
803 Box::new(
804 vals.into_iter()
805 .map(|val| Value::ChronoTime(Some(val)))
806 .collect(),
807 )
808 }),
809 ),
810 #[cfg(all(
811 feature = "with-time",
812 not(feature = "with-chrono"),
813 feature = "postgres-array"
814 ))]
815 "TIME[]" => Value::Array(
816 sea_query::ArrayType::TimeTime,
817 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
818 .expect("Failed to get time array")
819 .map(|vals| {
820 Box::new(
821 vals.into_iter()
822 .map(|val| Value::TimeTime(Some(val)))
823 .collect(),
824 )
825 }),
826 ),
827
828 #[cfg(feature = "with-chrono")]
829 "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
830 row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
831 .expect("Failed to get timestamptz"),
832 ),
833 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
834 "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(
835 row.try_get::<Option<time::OffsetDateTime>, _>(c.ordinal())
836 .expect("Failed to get timestamptz"),
837 ),
838
839 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
840 "TIMESTAMPTZ[]" => Value::Array(
841 sea_query::ArrayType::ChronoDateTimeUtc,
842 row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
843 c.ordinal(),
844 )
845 .expect("Failed to get timestamptz array")
846 .map(|vals| {
847 Box::new(
848 vals.into_iter()
849 .map(|val| Value::ChronoDateTimeUtc(Some(val)))
850 .collect(),
851 )
852 }),
853 ),
854 #[cfg(all(
855 feature = "with-time",
856 not(feature = "with-chrono"),
857 feature = "postgres-array"
858 ))]
859 "TIMESTAMPTZ[]" => Value::Array(
860 sea_query::ArrayType::TimeDateTimeWithTimeZone,
861 row.try_get::<Option<Vec<time::OffsetDateTime>>, _>(c.ordinal())
862 .expect("Failed to get timestamptz array")
863 .map(|vals| {
864 Box::new(
865 vals.into_iter()
866 .map(|val| Value::TimeDateTimeWithTimeZone(Some(val)))
867 .collect(),
868 )
869 }),
870 ),
871
872 #[cfg(feature = "with-chrono")]
873 "TIMETZ" => Value::ChronoTime(
874 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
875 .expect("Failed to get timetz"),
876 ),
877 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
878 "TIMETZ" => {
879 Value::TimeTime(row.try_get(c.ordinal()).expect("Failed to get timetz"))
880 }
881
882 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
883 "TIMETZ[]" => Value::Array(
884 sea_query::ArrayType::ChronoTime,
885 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
886 .expect("Failed to get timetz array")
887 .map(|vals| {
888 Box::new(
889 vals.into_iter()
890 .map(|val| Value::ChronoTime(Some(val)))
891 .collect(),
892 )
893 }),
894 ),
895 #[cfg(all(
896 feature = "with-time",
897 not(feature = "with-chrono"),
898 feature = "postgres-array"
899 ))]
900 "TIMETZ[]" => Value::Array(
901 sea_query::ArrayType::TimeTime,
902 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
903 .expect("Failed to get timetz array")
904 .map(|vals| {
905 Box::new(
906 vals.into_iter()
907 .map(|val| Value::TimeTime(Some(val)))
908 .collect(),
909 )
910 }),
911 ),
912
913 #[cfg(feature = "with-uuid")]
914 "UUID" => Value::Uuid(
915 row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
916 .expect("Failed to get uuid"),
917 ),
918
919 #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
920 "UUID[]" => Value::Array(
921 sea_query::ArrayType::Uuid,
922 row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
923 .expect("Failed to get uuid array")
924 .map(|vals| {
925 Box::new(
926 vals.into_iter()
927 .map(|val| Value::Uuid(Some(val)))
928 .collect(),
929 )
930 }),
931 ),
932
933 _ => unreachable!("Unknown column type: {}", c.type_info().name()),
934 },
935 )
936 })
937 .collect(),
938 }
939}