1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
22#[derive(Debug)]
24pub struct SqlxPostgresConnector;
25
26#[derive(Clone)]
28pub struct SqlxPostgresPoolConnection {
29 pub(crate) pool: PgPool,
30 metric_callback: Option<crate::metric::Callback>,
31}
32
33impl std::fmt::Debug for SqlxPostgresPoolConnection {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
36 }
37}
38
39impl From<PgPool> for SqlxPostgresPoolConnection {
40 fn from(pool: PgPool) -> Self {
41 SqlxPostgresPoolConnection {
42 pool,
43 metric_callback: None,
44 }
45 }
46}
47
48impl From<PgPool> for DatabaseConnection {
49 fn from(pool: PgPool) -> Self {
50 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
51 }
52}
53
54impl SqlxPostgresConnector {
55 pub fn accepts(string: &str) -> bool {
57 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
58 }
59
60 #[instrument(level = "trace")]
62 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
63 let mut sqlx_opts = options
64 .url
65 .parse::<PgConnectOptions>()
66 .map_err(sqlx_error_to_conn_err)?;
67 use sqlx::ConnectOptions;
68 if !options.sqlx_logging {
69 sqlx_opts = sqlx_opts.disable_statement_logging();
70 } else {
71 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
72 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
73 sqlx_opts = sqlx_opts.log_slow_statements(
74 options.sqlx_slow_statements_logging_level,
75 options.sqlx_slow_statements_logging_threshold,
76 );
77 }
78 }
79
80 if let Some(application_name) = &options.application_name {
81 sqlx_opts = sqlx_opts.application_name(application_name);
82 }
83
84 if let Some(f) = &options.pg_opts_fn {
85 sqlx_opts = f(sqlx_opts);
86 }
87
88 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
89 let mut string = "SET search_path = ".to_owned();
90 if schema.starts_with('"') {
91 write!(&mut string, "{schema}").expect("Infallible");
92 } else {
93 for (i, schema) in schema.split(',').enumerate() {
94 if i > 0 {
95 write!(&mut string, ",").expect("Infallible");
96 }
97 if schema.starts_with('"') {
98 write!(&mut string, "{schema}").expect("Infallible");
99 } else {
100 write!(&mut string, "\"{schema}\"").expect("Infallible");
101 }
102 }
103 }
104 string
105 });
106
107 let lazy = options.connect_lazy;
108 let after_connect = options.after_connect.clone();
109 let mut pool_options = options.sqlx_pool_options();
110
111 if let Some(sql) = set_search_path_sql {
112 pool_options = pool_options.after_connect(move |conn, _| {
113 let sql = sql.clone();
114 Box::pin(async move {
115 sqlx::Executor::execute(conn, sql.as_str())
116 .await
117 .map(|_| ())
118 })
119 });
120 }
121
122 let pool = if lazy {
123 pool_options.connect_lazy_with(sqlx_opts)
124 } else {
125 pool_options
126 .connect_with(sqlx_opts)
127 .await
128 .map_err(sqlx_error_to_conn_err)?
129 };
130
131 let conn: DatabaseConnection =
132 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
133 pool,
134 metric_callback: None,
135 })
136 .into();
137
138 if let Some(cb) = after_connect {
139 cb(conn.clone()).await?;
140 }
141
142 Ok(conn)
143 }
144}
145
146impl SqlxPostgresConnector {
147 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
149 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
150 pool,
151 metric_callback: None,
152 })
153 .into()
154 }
155}
156
157impl SqlxPostgresPoolConnection {
158 #[instrument(level = "trace")]
160 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
161 debug_print!("{}", stmt);
162
163 let query = sqlx_query(&stmt);
164 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
165 crate::metric::metric!(self.metric_callback, &stmt, {
166 match query.execute(&mut *conn).await {
167 Ok(res) => Ok(res.into()),
168 Err(err) => Err(sqlx_error_to_exec_err(err)),
169 }
170 })
171 }
172
173 #[instrument(level = "trace")]
175 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
176 debug_print!("{}", sql);
177
178 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
179 match conn.execute(sql).await {
180 Ok(res) => Ok(res.into()),
181 Err(err) => Err(sqlx_error_to_exec_err(err)),
182 }
183 }
184
185 #[instrument(level = "trace")]
187 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
188 debug_print!("{}", stmt);
189
190 let query = sqlx_query(&stmt);
191 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
192 crate::metric::metric!(self.metric_callback, &stmt, {
193 match query.fetch_one(&mut *conn).await {
194 Ok(row) => Ok(Some(row.into())),
195 Err(err) => match err {
196 sqlx::Error::RowNotFound => Ok(None),
197 _ => Err(sqlx_error_to_query_err(err)),
198 },
199 }
200 })
201 }
202
203 #[instrument(level = "trace")]
205 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
206 debug_print!("{}", stmt);
207
208 let query = sqlx_query(&stmt);
209 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
210 crate::metric::metric!(self.metric_callback, &stmt, {
211 match query.fetch_all(&mut *conn).await {
212 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
213 Err(err) => Err(sqlx_error_to_query_err(err)),
214 }
215 })
216 }
217
218 #[instrument(level = "trace")]
220 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
221 debug_print!("{}", stmt);
222
223 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
224 Ok(QueryStream::from((
225 conn,
226 stmt,
227 self.metric_callback.clone(),
228 )))
229 }
230
231 #[instrument(level = "trace")]
233 pub async fn begin(
234 &self,
235 isolation_level: Option<IsolationLevel>,
236 access_mode: Option<AccessMode>,
237 ) -> Result<DatabaseTransaction, DbErr> {
238 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
239 DatabaseTransaction::new_postgres(
240 conn,
241 self.metric_callback.clone(),
242 isolation_level,
243 access_mode,
244 )
245 .await
246 }
247
248 #[instrument(level = "trace", skip(callback))]
250 pub async fn transaction<F, T, E>(
251 &self,
252 callback: F,
253 isolation_level: Option<IsolationLevel>,
254 access_mode: Option<AccessMode>,
255 ) -> Result<T, TransactionError<E>>
256 where
257 F: for<'b> FnOnce(
258 &'b DatabaseTransaction,
259 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
260 + Send,
261 T: Send,
262 E: std::fmt::Display + std::fmt::Debug + Send,
263 {
264 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
265 let transaction = DatabaseTransaction::new_postgres(
266 conn,
267 self.metric_callback.clone(),
268 isolation_level,
269 access_mode,
270 )
271 .await
272 .map_err(|e| TransactionError::Connection(e))?;
273 transaction.run(callback).await
274 }
275
276 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
277 where
278 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
279 {
280 self.metric_callback = Some(Arc::new(callback));
281 }
282
283 pub async fn ping(&self) -> Result<(), DbErr> {
285 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
286 match conn.ping().await {
287 Ok(_) => Ok(()),
288 Err(err) => Err(sqlx_error_to_conn_err(err)),
289 }
290 }
291
292 pub async fn close(self) -> Result<(), DbErr> {
295 self.close_by_ref().await
296 }
297
298 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
300 self.pool.close().await;
301 Ok(())
302 }
303}
304
305impl From<PgRow> for QueryResult {
306 fn from(row: PgRow) -> QueryResult {
307 QueryResult {
308 row: QueryResultRow::SqlxPostgres(row),
309 }
310 }
311}
312
313impl From<PgQueryResult> for ExecResult {
314 fn from(result: PgQueryResult) -> ExecResult {
315 ExecResult {
316 result: ExecResultHolder::SqlxPostgres(result),
317 }
318 }
319}
320
321pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
322 let values = stmt
323 .values
324 .as_ref()
325 .map_or(Values(Vec::new()), |values| values.clone());
326 sqlx::query_with(&stmt.sql, SqlxValues(values))
327}
328
329pub(crate) async fn set_transaction_config(
330 conn: &mut PoolConnection<Postgres>,
331 isolation_level: Option<IsolationLevel>,
332 access_mode: Option<AccessMode>,
333) -> Result<(), DbErr> {
334 let mut settings = Vec::new();
335
336 if let Some(isolation_level) = isolation_level {
337 settings.push(format!("ISOLATION LEVEL {isolation_level}"));
338 }
339
340 if let Some(access_mode) = access_mode {
341 settings.push(access_mode.to_string());
342 }
343
344 if !settings.is_empty() {
345 let sql = format!("SET TRANSACTION {}", settings.join(" "));
346 sqlx::query(&sql)
347 .execute(&mut **conn)
348 .await
349 .map_err(sqlx_error_to_exec_err)?;
350 }
351 Ok(())
352}
353
354impl
355 From<(
356 PoolConnection<sqlx::Postgres>,
357 Statement,
358 Option<crate::metric::Callback>,
359 )> for crate::QueryStream
360{
361 fn from(
362 (conn, stmt, metric_callback): (
363 PoolConnection<sqlx::Postgres>,
364 Statement,
365 Option<crate::metric::Callback>,
366 ),
367 ) -> Self {
368 crate::QueryStream::build(
369 stmt,
370 crate::InnerConnection::Postgres(conn),
371 metric_callback,
372 )
373 }
374}
375
376impl crate::DatabaseTransaction {
377 pub(crate) async fn new_postgres(
378 inner: PoolConnection<sqlx::Postgres>,
379 metric_callback: Option<crate::metric::Callback>,
380 isolation_level: Option<IsolationLevel>,
381 access_mode: Option<AccessMode>,
382 ) -> Result<crate::DatabaseTransaction, DbErr> {
383 Self::begin(
384 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
385 crate::DbBackend::Postgres,
386 metric_callback,
387 isolation_level,
388 access_mode,
389 None,
390 )
391 .await
392 }
393}
394
395#[cfg(feature = "proxy")]
396pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
397 use sea_query::Value;
400 use sqlx::{Column, Row, TypeInfo};
401 crate::ProxyRow {
402 values: row
403 .columns()
404 .iter()
405 .map(|c| {
406 (
407 c.name().to_string(),
408 match c.type_info().name() {
409 "BOOL" => {
410 Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
411 }
412 #[cfg(feature = "postgres-array")]
413 "BOOL[]" => Value::Array(
414 sea_query::ArrayType::Bool,
415 row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
416 .expect("Failed to get boolean array")
417 .map(|vals| {
418 Box::new(
419 vals.into_iter()
420 .map(|val| Value::Bool(Some(val)))
421 .collect(),
422 )
423 }),
424 ),
425
426 "\"CHAR\"" => Value::TinyInt(
427 row.try_get(c.ordinal())
428 .expect("Failed to get small integer"),
429 ),
430 #[cfg(feature = "postgres-array")]
431 "\"CHAR\"[]" => Value::Array(
432 sea_query::ArrayType::TinyInt,
433 row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
434 .expect("Failed to get small integer array")
435 .map(|vals: Vec<i8>| {
436 Box::new(
437 vals.into_iter()
438 .map(|val| Value::TinyInt(Some(val)))
439 .collect(),
440 )
441 }),
442 ),
443
444 "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
445 row.try_get(c.ordinal())
446 .expect("Failed to get small integer"),
447 ),
448 #[cfg(feature = "postgres-array")]
449 "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
450 sea_query::ArrayType::SmallInt,
451 row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
452 .expect("Failed to get small integer array")
453 .map(|vals: Vec<i16>| {
454 Box::new(
455 vals.into_iter()
456 .map(|val| Value::SmallInt(Some(val)))
457 .collect(),
458 )
459 }),
460 ),
461
462 "INT" | "SERIAL" | "INT4" => {
463 Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
464 }
465 #[cfg(feature = "postgres-array")]
466 "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
467 sea_query::ArrayType::Int,
468 row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
469 .expect("Failed to get integer array")
470 .map(|vals: Vec<i32>| {
471 Box::new(
472 vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
473 )
474 }),
475 ),
476
477 "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
478 row.try_get(c.ordinal()).expect("Failed to get big integer"),
479 ),
480 #[cfg(feature = "postgres-array")]
481 "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
482 sea_query::ArrayType::BigInt,
483 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
484 .expect("Failed to get big integer array")
485 .map(|vals: Vec<i64>| {
486 Box::new(
487 vals.into_iter()
488 .map(|val| Value::BigInt(Some(val)))
489 .collect(),
490 )
491 }),
492 ),
493
494 "FLOAT4" | "REAL" => {
495 Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
496 }
497 #[cfg(feature = "postgres-array")]
498 "FLOAT4[]" | "REAL[]" => Value::Array(
499 sea_query::ArrayType::Float,
500 row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
501 .expect("Failed to get float array")
502 .map(|vals| {
503 Box::new(
504 vals.into_iter()
505 .map(|val| Value::Float(Some(val)))
506 .collect(),
507 )
508 }),
509 ),
510
511 "FLOAT8" | "DOUBLE PRECISION" => {
512 Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
513 }
514 #[cfg(feature = "postgres-array")]
515 "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
516 sea_query::ArrayType::Double,
517 row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
518 .expect("Failed to get double array")
519 .map(|vals| {
520 Box::new(
521 vals.into_iter()
522 .map(|val| Value::Double(Some(val)))
523 .collect(),
524 )
525 }),
526 ),
527
528 "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
529 row.try_get::<Option<String>, _>(c.ordinal())
530 .expect("Failed to get string"),
531 ),
532 #[cfg(feature = "postgres-array")]
533 "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
534 sea_query::ArrayType::String,
535 row.try_get::<Option<Vec<String>>, _>(c.ordinal())
536 .expect("Failed to get string array")
537 .map(|vals| {
538 Box::new(
539 vals.into_iter()
540 .map(|val| Value::String(Some(val)))
541 .collect(),
542 )
543 }),
544 ),
545
546 "BYTEA" => Value::Bytes(
547 row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
548 .expect("Failed to get bytes"),
549 ),
550 #[cfg(feature = "postgres-array")]
551 "BYTEA[]" => Value::Array(
552 sea_query::ArrayType::Bytes,
553 row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
554 .expect("Failed to get bytes array")
555 .map(|vals| {
556 Box::new(
557 vals.into_iter()
558 .map(|val| Value::Bytes(Some(val)))
559 .collect(),
560 )
561 }),
562 ),
563
564 #[cfg(feature = "with-bigdecimal")]
565 "NUMERIC" => Value::BigDecimal(
566 row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
567 .expect("Failed to get numeric"),
568 ),
569 #[cfg(all(
570 feature = "with-rust_decimal",
571 not(feature = "with-bigdecimal")
572 ))]
573 "NUMERIC" => {
574 Value::Decimal(row.try_get(c.ordinal()).expect("Failed to get numeric"))
575 }
576
577 #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
578 "NUMERIC[]" => Value::Array(
579 sea_query::ArrayType::BigDecimal,
580 row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
581 .expect("Failed to get numeric array")
582 .map(|vals| {
583 Box::new(
584 vals.into_iter()
585 .map(|val| Value::BigDecimal(Some(val)))
586 .collect(),
587 )
588 }),
589 ),
590 #[cfg(all(
591 feature = "with-rust_decimal",
592 not(feature = "with-bigdecimal"),
593 feature = "postgres-array"
594 ))]
595 "NUMERIC[]" => Value::Array(
596 sea_query::ArrayType::Decimal,
597 row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
598 .expect("Failed to get numeric array")
599 .map(|vals| {
600 Box::new(
601 vals.into_iter()
602 .map(|val| Value::Decimal(Some(val)))
603 .collect(),
604 )
605 }),
606 ),
607
608 "OID" => {
609 Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
610 }
611 #[cfg(feature = "postgres-array")]
612 "OID[]" => Value::Array(
613 sea_query::ArrayType::BigInt,
614 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
615 .expect("Failed to get oid array")
616 .map(|vals| {
617 Box::new(
618 vals.into_iter()
619 .map(|val| Value::BigInt(Some(val)))
620 .collect(),
621 )
622 }),
623 ),
624
625 "JSON" | "JSONB" => Value::Json(
626 row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
627 .expect("Failed to get json")
628 .map(Box::new),
629 ),
630 #[cfg(any(feature = "json-array", feature = "postgres-array"))]
631 "JSON[]" | "JSONB[]" => Value::Array(
632 sea_query::ArrayType::Json,
633 row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
634 .expect("Failed to get json array")
635 .map(|vals| {
636 Box::new(
637 vals.into_iter()
638 .map(|val| Value::Json(Some(Box::new(val))))
639 .collect(),
640 )
641 }),
642 ),
643
644 #[cfg(feature = "with-ipnetwork")]
645 "INET" | "CIDR" => Value::IpNetwork(
646 row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
647 .expect("Failed to get ip address"),
648 ),
649 #[cfg(feature = "with-ipnetwork")]
650 "INET[]" | "CIDR[]" => Value::Array(
651 sea_query::ArrayType::IpNetwork,
652 row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
653 .expect("Failed to get ip address array")
654 .map(|vals| {
655 Box::new(
656 vals.into_iter()
657 .map(|val| Value::IpNetwork(Some(val)))
658 .collect(),
659 )
660 }),
661 ),
662
663 #[cfg(feature = "with-mac_address")]
664 "MACADDR" | "MACADDR8" => Value::MacAddress(
665 row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
666 .expect("Failed to get mac address"),
667 ),
668 #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
669 "MACADDR[]" | "MACADDR8[]" => Value::Array(
670 sea_query::ArrayType::MacAddress,
671 row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
672 .expect("Failed to get mac address array")
673 .map(|vals| {
674 Box::new(
675 vals.into_iter()
676 .map(|val| Value::MacAddress(Some(val)))
677 .collect(),
678 )
679 }),
680 ),
681
682 #[cfg(feature = "with-chrono")]
683 "TIMESTAMP" => Value::ChronoDateTime(
684 row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
685 .expect("Failed to get timestamp"),
686 ),
687 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
688 "TIMESTAMP" => Value::TimeDateTime(
689 row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
690 .expect("Failed to get timestamp"),
691 ),
692
693 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
694 "TIMESTAMP[]" => Value::Array(
695 sea_query::ArrayType::ChronoDateTime,
696 row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
697 .expect("Failed to get timestamp array")
698 .map(|vals| {
699 Box::new(
700 vals.into_iter()
701 .map(|val| Value::ChronoDateTime(Some(val)))
702 .collect(),
703 )
704 }),
705 ),
706 #[cfg(all(
707 feature = "with-time",
708 not(feature = "with-chrono"),
709 feature = "postgres-array"
710 ))]
711 "TIMESTAMP[]" => Value::Array(
712 sea_query::ArrayType::TimeDateTime,
713 row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
714 .expect("Failed to get timestamp array")
715 .map(|vals| {
716 Box::new(
717 vals.into_iter()
718 .map(|val| Value::TimeDateTime(Some(val)))
719 .collect(),
720 )
721 }),
722 ),
723
724 #[cfg(feature = "with-chrono")]
725 "DATE" => Value::ChronoDate(
726 row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
727 .expect("Failed to get date"),
728 ),
729 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
730 "DATE" => Value::TimeDate(
731 row.try_get::<Option<time::Date>, _>(c.ordinal())
732 .expect("Failed to get date"),
733 ),
734
735 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
736 "DATE[]" => Value::Array(
737 sea_query::ArrayType::ChronoDate,
738 row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
739 .expect("Failed to get date array")
740 .map(|vals| {
741 Box::new(
742 vals.into_iter()
743 .map(|val| Value::ChronoDate(Some(val)))
744 .collect(),
745 )
746 }),
747 ),
748 #[cfg(all(
749 feature = "with-time",
750 not(feature = "with-chrono"),
751 feature = "postgres-array"
752 ))]
753 "DATE[]" => Value::Array(
754 sea_query::ArrayType::TimeDate,
755 row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
756 .expect("Failed to get date array")
757 .map(|vals| {
758 Box::new(
759 vals.into_iter()
760 .map(|val| Value::TimeDate(Some(val)))
761 .collect(),
762 )
763 }),
764 ),
765
766 #[cfg(feature = "with-chrono")]
767 "TIME" => Value::ChronoTime(
768 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
769 .expect("Failed to get time"),
770 ),
771 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
772 "TIME" => Value::TimeTime(
773 row.try_get::<Option<time::Time>, _>(c.ordinal())
774 .expect("Failed to get time"),
775 ),
776
777 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
778 "TIME[]" => Value::Array(
779 sea_query::ArrayType::ChronoTime,
780 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
781 .expect("Failed to get time array")
782 .map(|vals| {
783 Box::new(
784 vals.into_iter()
785 .map(|val| Value::ChronoTime(Some(val)))
786 .collect(),
787 )
788 }),
789 ),
790 #[cfg(all(
791 feature = "with-time",
792 not(feature = "with-chrono"),
793 feature = "postgres-array"
794 ))]
795 "TIME[]" => Value::Array(
796 sea_query::ArrayType::TimeTime,
797 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
798 .expect("Failed to get time array")
799 .map(|vals| {
800 Box::new(
801 vals.into_iter()
802 .map(|val| Value::TimeTime(Some(val)))
803 .collect(),
804 )
805 }),
806 ),
807
808 #[cfg(feature = "with-chrono")]
809 "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
810 row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
811 .expect("Failed to get timestamptz"),
812 ),
813 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
814 "TIMESTAMPTZ" => Value::TimeDateTime(
815 row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
816 .expect("Failed to get timestamptz"),
817 ),
818
819 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
820 "TIMESTAMPTZ[]" => Value::Array(
821 sea_query::ArrayType::ChronoDateTimeUtc,
822 row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
823 c.ordinal(),
824 )
825 .expect("Failed to get timestamptz array")
826 .map(|vals| {
827 Box::new(
828 vals.into_iter()
829 .map(|val| Value::ChronoDateTimeUtc(Some(val)))
830 .collect(),
831 )
832 }),
833 ),
834 #[cfg(all(
835 feature = "with-time",
836 not(feature = "with-chrono"),
837 feature = "postgres-array"
838 ))]
839 "TIMESTAMPTZ[]" => Value::Array(
840 sea_query::ArrayType::TimeDateTime,
841 row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
842 .expect("Failed to get timestamptz array")
843 .map(|vals| {
844 Box::new(
845 vals.into_iter()
846 .map(|val| Value::TimeDateTime(Some(val)))
847 .collect(),
848 )
849 }),
850 ),
851
852 #[cfg(feature = "with-chrono")]
853 "TIMETZ" => Value::ChronoTime(
854 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
855 .expect("Failed to get timetz"),
856 ),
857 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
858 "TIMETZ" => {
859 Value::TimeTime(row.try_get(c.ordinal()).expect("Failed to get timetz"))
860 }
861
862 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
863 "TIMETZ[]" => Value::Array(
864 sea_query::ArrayType::ChronoTime,
865 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
866 .expect("Failed to get timetz array")
867 .map(|vals| {
868 Box::new(
869 vals.into_iter()
870 .map(|val| Value::ChronoTime(Some(val)))
871 .collect(),
872 )
873 }),
874 ),
875 #[cfg(all(
876 feature = "with-time",
877 not(feature = "with-chrono"),
878 feature = "postgres-array"
879 ))]
880 "TIMETZ[]" => Value::Array(
881 sea_query::ArrayType::TimeTime,
882 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
883 .expect("Failed to get timetz array")
884 .map(|vals| {
885 Box::new(
886 vals.into_iter()
887 .map(|val| Value::TimeTime(Some(val)))
888 .collect(),
889 )
890 }),
891 ),
892
893 #[cfg(feature = "with-uuid")]
894 "UUID" => Value::Uuid(
895 row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
896 .expect("Failed to get uuid"),
897 ),
898
899 #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
900 "UUID[]" => Value::Array(
901 sea_query::ArrayType::Uuid,
902 row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
903 .expect("Failed to get uuid array")
904 .map(|vals| {
905 Box::new(
906 vals.into_iter()
907 .map(|val| Value::Uuid(Some(val)))
908 .collect(),
909 )
910 }),
911 ),
912
913 _ => unreachable!("Unknown column type: {}", c.type_info().name()),
914 },
915 )
916 })
917 .collect(),
918 }
919}