1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 IsolationLevel, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
22#[cfg(feature = "stream")]
23use crate::QueryStream;
24
25#[derive(Debug)]
27pub struct SqlxPostgresConnector;
28
29#[derive(Clone)]
31pub struct SqlxPostgresPoolConnection {
32 pub(crate) pool: PgPool,
33 metric_callback: Option<crate::metric::Callback>,
34 pub(crate) record_stmt_in_spans: bool,
35}
36
37impl std::fmt::Debug for SqlxPostgresPoolConnection {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
40 }
41}
42
43impl From<PgPool> for SqlxPostgresPoolConnection {
44 fn from(pool: PgPool) -> Self {
45 SqlxPostgresPoolConnection {
46 pool,
47 metric_callback: None,
48 record_stmt_in_spans: true,
49 }
50 }
51}
52
53impl From<PgPool> for DatabaseConnection {
54 fn from(pool: PgPool) -> Self {
55 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
56 }
57}
58
59impl SqlxPostgresConnector {
60 pub fn accepts(string: &str) -> bool {
62 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
63 }
64
65 #[instrument(level = "trace")]
67 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
68 let record_stmt_in_spans = options.get_record_stmt_in_spans();
69 let mut sqlx_opts = options
70 .url
71 .parse::<PgConnectOptions>()
72 .map_err(sqlx_error_to_conn_err)?;
73 use sqlx::ConnectOptions;
74 if !options.sqlx_logging {
75 sqlx_opts = sqlx_opts.disable_statement_logging();
76 } else {
77 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
78 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
79 sqlx_opts = sqlx_opts.log_slow_statements(
80 options.sqlx_slow_statements_logging_level,
81 options.sqlx_slow_statements_logging_threshold,
82 );
83 }
84 }
85
86 if let Some(application_name) = &options.application_name {
87 sqlx_opts = sqlx_opts.application_name(application_name);
88 }
89
90 if let Some(timeout) = options.statement_timeout {
91 sqlx_opts = sqlx_opts.options([("statement_timeout", timeout.as_millis().to_string())]);
92 }
93
94 if let Some(f) = &options.pg_opts_fn {
95 sqlx_opts = f(sqlx_opts);
96 }
97
98 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
99 let mut string = "SET search_path = ".to_owned();
100 if schema.starts_with('"') {
101 write!(&mut string, "{schema}").expect("Infallible");
102 } else {
103 for (i, schema) in schema.split(',').enumerate() {
104 if i > 0 {
105 write!(&mut string, ",").expect("Infallible");
106 }
107 if schema.starts_with('"') {
108 write!(&mut string, "{schema}").expect("Infallible");
109 } else {
110 write!(&mut string, "\"{schema}\"").expect("Infallible");
111 }
112 }
113 }
114 string
115 });
116
117 let lazy = options.connect_lazy;
118 let after_connect = options.after_connect.clone();
119 let pg_pool_opts_fn = options.pg_pool_opts_fn.clone();
120 let mut pool_options = options.sqlx_pool_options();
121
122 if let Some(sql) = set_search_path_sql {
123 pool_options = pool_options.after_connect(move |conn, _| {
124 let sql = sql.clone();
125 Box::pin(async move {
126 sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql))
127 .await
128 .map(|_| ())
129 })
130 });
131 }
132 if let Some(f) = &pg_pool_opts_fn {
133 pool_options = f(pool_options);
134 }
135 let pool = if lazy {
136 pool_options.connect_lazy_with(sqlx_opts)
137 } else {
138 pool_options
139 .connect_with(sqlx_opts)
140 .await
141 .map_err(sqlx_error_to_conn_err)?
142 };
143
144 let conn: DatabaseConnection =
145 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
146 pool,
147 metric_callback: None,
148 record_stmt_in_spans,
149 })
150 .into();
151
152 if let Some(cb) = after_connect {
153 cb(conn.clone()).await?;
154 }
155
156 Ok(conn)
157 }
158}
159
160impl SqlxPostgresConnector {
161 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
163 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
164 pool,
165 metric_callback: None,
166 record_stmt_in_spans: true,
167 })
168 .into()
169 }
170}
171
172impl SqlxPostgresPoolConnection {
173 #[instrument(level = "trace", skip(stmt))]
175 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
176 debug_print!("{}", stmt);
177
178 let query = sqlx_query(&stmt);
179 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
180 crate::metric::metric!(self.metric_callback, &stmt, {
181 match query.execute(&mut *conn).await {
182 Ok(res) => Ok(res.into()),
183 Err(err) => Err(sqlx_error_to_exec_err(err)),
184 }
185 })
186 }
187
188 #[instrument(level = "trace", skip(sql))]
190 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
191 debug_print!("{}", sql);
192
193 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
194 match conn.execute(sqlx::AssertSqlSafe(sql.to_owned())).await {
195 Ok(res) => Ok(res.into()),
196 Err(err) => Err(sqlx_error_to_exec_err(err)),
197 }
198 }
199
200 #[instrument(level = "trace", skip(stmt))]
202 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
203 debug_print!("{}", stmt);
204
205 let query = sqlx_query(&stmt);
206 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
207 crate::metric::metric!(self.metric_callback, &stmt, {
208 match query.fetch_one(&mut *conn).await {
209 Ok(row) => Ok(Some(row.into())),
210 Err(err) => match err {
211 sqlx::Error::RowNotFound => Ok(None),
212 _ => Err(sqlx_error_to_query_err(err)),
213 },
214 }
215 })
216 }
217
218 #[instrument(level = "trace", skip(stmt))]
220 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
221 debug_print!("{}", stmt);
222
223 let query = sqlx_query(&stmt);
224 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
225 crate::metric::metric!(self.metric_callback, &stmt, {
226 match query.fetch_all(&mut *conn).await {
227 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
228 Err(err) => Err(sqlx_error_to_query_err(err)),
229 }
230 })
231 }
232
233 #[instrument(level = "trace", skip(stmt))]
235 #[cfg(feature = "stream")]
236 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
237 debug_print!("{}", stmt);
238
239 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
240 Ok(QueryStream::from((
241 conn,
242 stmt,
243 self.metric_callback.clone(),
244 )))
245 }
246
247 #[instrument(level = "trace")]
249 pub async fn begin(
250 &self,
251 isolation_level: Option<IsolationLevel>,
252 access_mode: Option<AccessMode>,
253 ) -> Result<DatabaseTransaction, DbErr> {
254 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
255 DatabaseTransaction::new_postgres(
256 conn,
257 self.metric_callback.clone(),
258 self.record_stmt_in_spans,
259 isolation_level,
260 access_mode,
261 )
262 .await
263 }
264
265 #[instrument(level = "trace", skip(callback))]
267 pub async fn transaction<F, T, E>(
268 &self,
269 callback: F,
270 isolation_level: Option<IsolationLevel>,
271 access_mode: Option<AccessMode>,
272 ) -> Result<T, TransactionError<E>>
273 where
274 F: for<'b> FnOnce(
275 &'b DatabaseTransaction,
276 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
277 + Send,
278 T: Send,
279 E: std::fmt::Display + std::fmt::Debug + Send,
280 {
281 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
282 let transaction = DatabaseTransaction::new_postgres(
283 conn,
284 self.metric_callback.clone(),
285 self.record_stmt_in_spans,
286 isolation_level,
287 access_mode,
288 )
289 .await
290 .map_err(|e| TransactionError::Connection(e))?;
291 transaction.run(callback).await
292 }
293
294 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
295 where
296 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
297 {
298 self.metric_callback = Some(Arc::new(callback));
299 }
300
301 pub async fn ping(&self) -> Result<(), DbErr> {
303 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
304 match conn.ping().await {
305 Ok(_) => Ok(()),
306 Err(err) => Err(sqlx_error_to_conn_err(err)),
307 }
308 }
309
310 pub async fn close(self) -> Result<(), DbErr> {
313 self.close_by_ref().await
314 }
315
316 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
318 self.pool.close().await;
319 Ok(())
320 }
321}
322
323impl From<PgRow> for QueryResult {
324 fn from(row: PgRow) -> QueryResult {
325 QueryResult {
326 row: QueryResultRow::SqlxPostgres(row),
327 }
328 }
329}
330
331impl From<PgQueryResult> for ExecResult {
332 fn from(result: PgQueryResult) -> ExecResult {
333 ExecResult {
334 result: ExecResultHolder::SqlxPostgres(result),
335 }
336 }
337}
338
339pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
340 let values = stmt
341 .values
342 .as_ref()
343 .map_or(Values(Vec::new()), |values| values.clone());
344 sqlx::query_with(sqlx::AssertSqlSafe(stmt.sql.as_str()), SqlxValues(values))
345}
346
347pub(crate) async fn set_transaction_config(
348 conn: &mut PoolConnection<Postgres>,
349 isolation_level: Option<IsolationLevel>,
350 access_mode: Option<AccessMode>,
351) -> Result<(), DbErr> {
352 let mut settings = Vec::new();
353
354 if let Some(isolation_level) = isolation_level {
355 settings.push(format!("ISOLATION LEVEL {isolation_level}"));
356 }
357
358 if let Some(access_mode) = access_mode {
359 settings.push(access_mode.to_string());
360 }
361
362 if !settings.is_empty() {
363 let sql = format!("SET TRANSACTION {}", settings.join(" "));
364 sqlx::query(sqlx::AssertSqlSafe(sql))
365 .execute(&mut **conn)
366 .await
367 .map_err(sqlx_error_to_exec_err)?;
368 }
369 Ok(())
370}
371
372#[cfg(feature = "stream")]
373impl
374 From<(
375 PoolConnection<sqlx::Postgres>,
376 Statement,
377 Option<crate::metric::Callback>,
378 )> for crate::QueryStream
379{
380 fn from(
381 (conn, stmt, metric_callback): (
382 PoolConnection<sqlx::Postgres>,
383 Statement,
384 Option<crate::metric::Callback>,
385 ),
386 ) -> Self {
387 crate::QueryStream::build(
388 stmt,
389 crate::InnerConnection::Postgres(conn),
390 metric_callback,
391 )
392 }
393}
394
395impl crate::DatabaseTransaction {
396 pub(crate) async fn new_postgres(
397 inner: PoolConnection<sqlx::Postgres>,
398 metric_callback: Option<crate::metric::Callback>,
399 record_stmt_in_spans: bool,
400 isolation_level: Option<IsolationLevel>,
401 access_mode: Option<AccessMode>,
402 ) -> Result<crate::DatabaseTransaction, DbErr> {
403 Self::begin(
404 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
405 crate::DbBackend::Postgres,
406 metric_callback,
407 record_stmt_in_spans,
408 isolation_level,
409 access_mode,
410 None,
411 )
412 .await
413 }
414}
415
416#[cfg(feature = "proxy")]
417pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
418 use sea_query::Value;
421 use sqlx::{Column, Row, TypeInfo};
422 crate::ProxyRow {
423 values: row
424 .columns()
425 .iter()
426 .map(|c| {
427 (
428 c.name().to_string(),
429 match c.type_info().name() {
430 "BOOL" => {
431 Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
432 }
433 #[cfg(feature = "postgres-array")]
434 "BOOL[]" => Value::Array(
435 sea_query::ArrayType::Bool,
436 row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
437 .expect("Failed to get boolean array")
438 .map(|vals| {
439 Box::new(
440 vals.into_iter()
441 .map(|val| Value::Bool(Some(val)))
442 .collect(),
443 )
444 }),
445 ),
446
447 "\"CHAR\"" => Value::TinyInt(
448 row.try_get(c.ordinal())
449 .expect("Failed to get small integer"),
450 ),
451 #[cfg(feature = "postgres-array")]
452 "\"CHAR\"[]" => Value::Array(
453 sea_query::ArrayType::TinyInt,
454 row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
455 .expect("Failed to get small integer array")
456 .map(|vals: Vec<i8>| {
457 Box::new(
458 vals.into_iter()
459 .map(|val| Value::TinyInt(Some(val)))
460 .collect(),
461 )
462 }),
463 ),
464
465 "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
466 row.try_get(c.ordinal())
467 .expect("Failed to get small integer"),
468 ),
469 #[cfg(feature = "postgres-array")]
470 "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
471 sea_query::ArrayType::SmallInt,
472 row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
473 .expect("Failed to get small integer array")
474 .map(|vals: Vec<i16>| {
475 Box::new(
476 vals.into_iter()
477 .map(|val| Value::SmallInt(Some(val)))
478 .collect(),
479 )
480 }),
481 ),
482
483 "INT" | "SERIAL" | "INT4" => {
484 Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
485 }
486 #[cfg(feature = "postgres-array")]
487 "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
488 sea_query::ArrayType::Int,
489 row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
490 .expect("Failed to get integer array")
491 .map(|vals: Vec<i32>| {
492 Box::new(
493 vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
494 )
495 }),
496 ),
497
498 "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
499 row.try_get(c.ordinal()).expect("Failed to get big integer"),
500 ),
501 #[cfg(feature = "postgres-array")]
502 "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
503 sea_query::ArrayType::BigInt,
504 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
505 .expect("Failed to get big integer array")
506 .map(|vals: Vec<i64>| {
507 Box::new(
508 vals.into_iter()
509 .map(|val| Value::BigInt(Some(val)))
510 .collect(),
511 )
512 }),
513 ),
514
515 "FLOAT4" | "REAL" => {
516 Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
517 }
518 #[cfg(feature = "postgres-array")]
519 "FLOAT4[]" | "REAL[]" => Value::Array(
520 sea_query::ArrayType::Float,
521 row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
522 .expect("Failed to get float array")
523 .map(|vals| {
524 Box::new(
525 vals.into_iter()
526 .map(|val| Value::Float(Some(val)))
527 .collect(),
528 )
529 }),
530 ),
531
532 "FLOAT8" | "DOUBLE PRECISION" => {
533 Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
534 }
535 #[cfg(feature = "postgres-array")]
536 "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
537 sea_query::ArrayType::Double,
538 row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
539 .expect("Failed to get double array")
540 .map(|vals| {
541 Box::new(
542 vals.into_iter()
543 .map(|val| Value::Double(Some(val)))
544 .collect(),
545 )
546 }),
547 ),
548
549 "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
550 row.try_get::<Option<String>, _>(c.ordinal())
551 .expect("Failed to get string"),
552 ),
553 #[cfg(feature = "postgres-array")]
554 "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
555 sea_query::ArrayType::String,
556 row.try_get::<Option<Vec<String>>, _>(c.ordinal())
557 .expect("Failed to get string array")
558 .map(|vals| {
559 Box::new(
560 vals.into_iter()
561 .map(|val| Value::String(Some(val)))
562 .collect(),
563 )
564 }),
565 ),
566
567 "BYTEA" => Value::Bytes(
568 row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
569 .expect("Failed to get bytes"),
570 ),
571 #[cfg(feature = "postgres-array")]
572 "BYTEA[]" => Value::Array(
573 sea_query::ArrayType::Bytes,
574 row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
575 .expect("Failed to get bytes array")
576 .map(|vals| {
577 Box::new(
578 vals.into_iter()
579 .map(|val| Value::Bytes(Some(val)))
580 .collect(),
581 )
582 }),
583 ),
584
585 #[cfg(feature = "with-bigdecimal")]
586 "NUMERIC" => Value::BigDecimal(
587 row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
588 .expect("Failed to get numeric"),
589 ),
590 #[cfg(all(
591 feature = "with-rust_decimal",
592 not(feature = "with-bigdecimal")
593 ))]
594 "NUMERIC" => {
595 Value::Decimal(row.try_get(c.ordinal()).expect("Failed to get numeric"))
596 }
597
598 #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
599 "NUMERIC[]" => Value::Array(
600 sea_query::ArrayType::BigDecimal,
601 row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
602 .expect("Failed to get numeric array")
603 .map(|vals| {
604 Box::new(
605 vals.into_iter()
606 .map(|val| Value::BigDecimal(Some(val)))
607 .collect(),
608 )
609 }),
610 ),
611 #[cfg(all(
612 feature = "with-rust_decimal",
613 not(feature = "with-bigdecimal"),
614 feature = "postgres-array"
615 ))]
616 "NUMERIC[]" => Value::Array(
617 sea_query::ArrayType::Decimal,
618 row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
619 .expect("Failed to get numeric array")
620 .map(|vals| {
621 Box::new(
622 vals.into_iter()
623 .map(|val| Value::Decimal(Some(val)))
624 .collect(),
625 )
626 }),
627 ),
628
629 "OID" => {
630 Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
631 }
632 #[cfg(feature = "postgres-array")]
633 "OID[]" => Value::Array(
634 sea_query::ArrayType::BigInt,
635 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
636 .expect("Failed to get oid array")
637 .map(|vals| {
638 Box::new(
639 vals.into_iter()
640 .map(|val| Value::BigInt(Some(val)))
641 .collect(),
642 )
643 }),
644 ),
645
646 #[cfg(feature = "with-json")]
647 "JSON" | "JSONB" => Value::Json(
648 row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
649 .expect("Failed to get json")
650 .map(Box::new),
651 ),
652 #[cfg(all(
653 feature = "with-json",
654 any(feature = "json-array", feature = "postgres-array")
655 ))]
656 "JSON[]" | "JSONB[]" => Value::Array(
657 sea_query::ArrayType::Json,
658 row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
659 .expect("Failed to get json array")
660 .map(|vals| {
661 Box::new(
662 vals.into_iter()
663 .map(|val| Value::Json(Some(Box::new(val))))
664 .collect(),
665 )
666 }),
667 ),
668
669 #[cfg(feature = "with-ipnetwork")]
670 "INET" | "CIDR" => Value::IpNetwork(
671 row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
672 .expect("Failed to get ip address"),
673 ),
674 #[cfg(feature = "with-ipnetwork")]
675 "INET[]" | "CIDR[]" => Value::Array(
676 sea_query::ArrayType::IpNetwork,
677 row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
678 .expect("Failed to get ip address array")
679 .map(|vals| {
680 Box::new(
681 vals.into_iter()
682 .map(|val| Value::IpNetwork(Some(val)))
683 .collect(),
684 )
685 }),
686 ),
687
688 #[cfg(feature = "with-mac_address")]
689 "MACADDR" | "MACADDR8" => Value::MacAddress(
690 row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
691 .expect("Failed to get mac address"),
692 ),
693 #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
694 "MACADDR[]" | "MACADDR8[]" => Value::Array(
695 sea_query::ArrayType::MacAddress,
696 row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
697 .expect("Failed to get mac address array")
698 .map(|vals| {
699 Box::new(
700 vals.into_iter()
701 .map(|val| Value::MacAddress(Some(val)))
702 .collect(),
703 )
704 }),
705 ),
706
707 #[cfg(feature = "with-chrono")]
708 "TIMESTAMP" => Value::ChronoDateTime(
709 row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
710 .expect("Failed to get timestamp"),
711 ),
712 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
713 "TIMESTAMP" => Value::TimeDateTime(
714 row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
715 .expect("Failed to get timestamp"),
716 ),
717
718 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
719 "TIMESTAMP[]" => Value::Array(
720 sea_query::ArrayType::ChronoDateTime,
721 row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
722 .expect("Failed to get timestamp array")
723 .map(|vals| {
724 Box::new(
725 vals.into_iter()
726 .map(|val| Value::ChronoDateTime(Some(val)))
727 .collect(),
728 )
729 }),
730 ),
731 #[cfg(all(
732 feature = "with-time",
733 not(feature = "with-chrono"),
734 feature = "postgres-array"
735 ))]
736 "TIMESTAMP[]" => Value::Array(
737 sea_query::ArrayType::TimeDateTime,
738 row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
739 .expect("Failed to get timestamp array")
740 .map(|vals| {
741 Box::new(
742 vals.into_iter()
743 .map(|val| Value::TimeDateTime(Some(val)))
744 .collect(),
745 )
746 }),
747 ),
748
749 #[cfg(feature = "with-chrono")]
750 "DATE" => Value::ChronoDate(
751 row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
752 .expect("Failed to get date"),
753 ),
754 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
755 "DATE" => Value::TimeDate(
756 row.try_get::<Option<time::Date>, _>(c.ordinal())
757 .expect("Failed to get date"),
758 ),
759
760 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
761 "DATE[]" => Value::Array(
762 sea_query::ArrayType::ChronoDate,
763 row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
764 .expect("Failed to get date array")
765 .map(|vals| {
766 Box::new(
767 vals.into_iter()
768 .map(|val| Value::ChronoDate(Some(val)))
769 .collect(),
770 )
771 }),
772 ),
773 #[cfg(all(
774 feature = "with-time",
775 not(feature = "with-chrono"),
776 feature = "postgres-array"
777 ))]
778 "DATE[]" => Value::Array(
779 sea_query::ArrayType::TimeDate,
780 row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
781 .expect("Failed to get date array")
782 .map(|vals| {
783 Box::new(
784 vals.into_iter()
785 .map(|val| Value::TimeDate(Some(val)))
786 .collect(),
787 )
788 }),
789 ),
790
791 #[cfg(feature = "with-chrono")]
792 "TIME" => Value::ChronoTime(
793 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
794 .expect("Failed to get time"),
795 ),
796 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
797 "TIME" => Value::TimeTime(
798 row.try_get::<Option<time::Time>, _>(c.ordinal())
799 .expect("Failed to get time"),
800 ),
801
802 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
803 "TIME[]" => Value::Array(
804 sea_query::ArrayType::ChronoTime,
805 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
806 .expect("Failed to get time array")
807 .map(|vals| {
808 Box::new(
809 vals.into_iter()
810 .map(|val| Value::ChronoTime(Some(val)))
811 .collect(),
812 )
813 }),
814 ),
815 #[cfg(all(
816 feature = "with-time",
817 not(feature = "with-chrono"),
818 feature = "postgres-array"
819 ))]
820 "TIME[]" => Value::Array(
821 sea_query::ArrayType::TimeTime,
822 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
823 .expect("Failed to get time array")
824 .map(|vals| {
825 Box::new(
826 vals.into_iter()
827 .map(|val| Value::TimeTime(Some(val)))
828 .collect(),
829 )
830 }),
831 ),
832
833 #[cfg(feature = "with-chrono")]
834 "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
835 row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
836 .expect("Failed to get timestamptz"),
837 ),
838 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
839 "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(
840 row.try_get::<Option<time::OffsetDateTime>, _>(c.ordinal())
841 .expect("Failed to get timestamptz"),
842 ),
843
844 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
845 "TIMESTAMPTZ[]" => Value::Array(
846 sea_query::ArrayType::ChronoDateTimeUtc,
847 row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
848 c.ordinal(),
849 )
850 .expect("Failed to get timestamptz array")
851 .map(|vals| {
852 Box::new(
853 vals.into_iter()
854 .map(|val| Value::ChronoDateTimeUtc(Some(val)))
855 .collect(),
856 )
857 }),
858 ),
859 #[cfg(all(
860 feature = "with-time",
861 not(feature = "with-chrono"),
862 feature = "postgres-array"
863 ))]
864 "TIMESTAMPTZ[]" => Value::Array(
865 sea_query::ArrayType::TimeDateTimeWithTimeZone,
866 row.try_get::<Option<Vec<time::OffsetDateTime>>, _>(c.ordinal())
867 .expect("Failed to get timestamptz array")
868 .map(|vals| {
869 Box::new(
870 vals.into_iter()
871 .map(|val| Value::TimeDateTimeWithTimeZone(Some(val)))
872 .collect(),
873 )
874 }),
875 ),
876
877 #[cfg(feature = "with-chrono")]
878 "TIMETZ" => Value::ChronoTime(
879 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
880 .expect("Failed to get timetz"),
881 ),
882 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
883 "TIMETZ" => {
884 Value::TimeTime(row.try_get(c.ordinal()).expect("Failed to get timetz"))
885 }
886
887 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
888 "TIMETZ[]" => Value::Array(
889 sea_query::ArrayType::ChronoTime,
890 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
891 .expect("Failed to get timetz array")
892 .map(|vals| {
893 Box::new(
894 vals.into_iter()
895 .map(|val| Value::ChronoTime(Some(val)))
896 .collect(),
897 )
898 }),
899 ),
900 #[cfg(all(
901 feature = "with-time",
902 not(feature = "with-chrono"),
903 feature = "postgres-array"
904 ))]
905 "TIMETZ[]" => Value::Array(
906 sea_query::ArrayType::TimeTime,
907 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
908 .expect("Failed to get timetz array")
909 .map(|vals| {
910 Box::new(
911 vals.into_iter()
912 .map(|val| Value::TimeTime(Some(val)))
913 .collect(),
914 )
915 }),
916 ),
917
918 #[cfg(feature = "with-uuid")]
919 "UUID" => Value::Uuid(
920 row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
921 .expect("Failed to get uuid"),
922 ),
923
924 #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
925 "UUID[]" => Value::Array(
926 sea_query::ArrayType::Uuid,
927 row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
928 .expect("Failed to get uuid array")
929 .map(|vals| {
930 Box::new(
931 vals.into_iter()
932 .map(|val| Value::Uuid(Some(val)))
933 .collect(),
934 )
935 }),
936 ),
937
938 _ => unreachable!("Unknown column type: {}", c.type_info().name()),
939 },
940 )
941 })
942 .collect(),
943 }
944}