1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 pool::PoolConnection,
8 postgres::{PgConnectOptions, PgQueryResult, PgRow},
9 Connection, Executor, PgPool, Postgres,
10};
11
12use sea_query_binder::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection,
17 DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError,
18};
19
20use super::sqlx_common::*;
21
22#[derive(Debug)]
24pub struct SqlxPostgresConnector;
25
26#[derive(Clone)]
28pub struct SqlxPostgresPoolConnection {
29 pub(crate) pool: PgPool,
30 metric_callback: Option<crate::metric::Callback>,
31}
32
33impl std::fmt::Debug for SqlxPostgresPoolConnection {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
36 }
37}
38
39impl From<PgPool> for SqlxPostgresPoolConnection {
40 fn from(pool: PgPool) -> Self {
41 SqlxPostgresPoolConnection {
42 pool,
43 metric_callback: None,
44 }
45 }
46}
47
48impl From<PgPool> for DatabaseConnection {
49 fn from(pool: PgPool) -> Self {
50 DatabaseConnection::SqlxPostgresPoolConnection(pool.into())
51 }
52}
53
54impl SqlxPostgresConnector {
55 pub fn accepts(string: &str) -> bool {
57 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
58 }
59
60 #[instrument(level = "trace")]
62 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
63 let mut sqlx_opts = options
64 .url
65 .parse::<PgConnectOptions>()
66 .map_err(sqlx_error_to_conn_err)?;
67 use sqlx::ConnectOptions;
68 if !options.sqlx_logging {
69 sqlx_opts = sqlx_opts.disable_statement_logging();
70 } else {
71 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
72 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
73 sqlx_opts = sqlx_opts.log_slow_statements(
74 options.sqlx_slow_statements_logging_level,
75 options.sqlx_slow_statements_logging_threshold,
76 );
77 }
78 }
79
80 if let Some(f) = &options.pg_opts_fn {
81 sqlx_opts = f(sqlx_opts);
82 }
83
84 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
85 let mut string = "SET search_path = ".to_owned();
86 if schema.starts_with('"') {
87 write!(&mut string, "{schema}").unwrap();
88 } else {
89 for (i, schema) in schema.split(',').enumerate() {
90 if i > 0 {
91 write!(&mut string, ",").unwrap();
92 }
93 if schema.starts_with('"') {
94 write!(&mut string, "{schema}").unwrap();
95 } else {
96 write!(&mut string, "\"{schema}\"").unwrap();
97 }
98 }
99 }
100 string
101 });
102 let lazy = options.connect_lazy;
103 let mut pool_options = options.sqlx_pool_options();
104 if let Some(sql) = set_search_path_sql {
105 pool_options = pool_options.after_connect(move |conn, _| {
106 let sql = sql.clone();
107 Box::pin(async move {
108 sqlx::Executor::execute(conn, sql.as_str())
109 .await
110 .map(|_| ())
111 })
112 });
113 }
114 let pool = if lazy {
115 pool_options.connect_lazy_with(sqlx_opts)
116 } else {
117 pool_options
118 .connect_with(sqlx_opts)
119 .await
120 .map_err(sqlx_error_to_conn_err)?
121 };
122 Ok(DatabaseConnection::SqlxPostgresPoolConnection(
123 SqlxPostgresPoolConnection {
124 pool,
125 metric_callback: None,
126 },
127 ))
128 }
129}
130
131impl SqlxPostgresConnector {
132 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
134 DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
135 pool,
136 metric_callback: None,
137 })
138 }
139}
140
141impl SqlxPostgresPoolConnection {
142 #[instrument(level = "trace")]
144 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
145 debug_print!("{}", stmt);
146
147 let query = sqlx_query(&stmt);
148 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
149 crate::metric::metric!(self.metric_callback, &stmt, {
150 match query.execute(&mut *conn).await {
151 Ok(res) => Ok(res.into()),
152 Err(err) => Err(sqlx_error_to_exec_err(err)),
153 }
154 })
155 }
156
157 #[instrument(level = "trace")]
159 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
160 debug_print!("{}", sql);
161
162 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
163 match conn.execute(sql).await {
164 Ok(res) => Ok(res.into()),
165 Err(err) => Err(sqlx_error_to_exec_err(err)),
166 }
167 }
168
169 #[instrument(level = "trace")]
171 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
172 debug_print!("{}", stmt);
173
174 let query = sqlx_query(&stmt);
175 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
176 crate::metric::metric!(self.metric_callback, &stmt, {
177 match query.fetch_one(&mut *conn).await {
178 Ok(row) => Ok(Some(row.into())),
179 Err(err) => match err {
180 sqlx::Error::RowNotFound => Ok(None),
181 _ => Err(sqlx_error_to_query_err(err)),
182 },
183 }
184 })
185 }
186
187 #[instrument(level = "trace")]
189 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
190 debug_print!("{}", stmt);
191
192 let query = sqlx_query(&stmt);
193 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
194 crate::metric::metric!(self.metric_callback, &stmt, {
195 match query.fetch_all(&mut *conn).await {
196 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
197 Err(err) => Err(sqlx_error_to_query_err(err)),
198 }
199 })
200 }
201
202 #[instrument(level = "trace")]
204 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
205 debug_print!("{}", stmt);
206
207 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
208 Ok(QueryStream::from((
209 conn,
210 stmt,
211 self.metric_callback.clone(),
212 )))
213 }
214
215 #[instrument(level = "trace")]
217 pub async fn begin(
218 &self,
219 isolation_level: Option<IsolationLevel>,
220 access_mode: Option<AccessMode>,
221 ) -> Result<DatabaseTransaction, DbErr> {
222 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
223 DatabaseTransaction::new_postgres(
224 conn,
225 self.metric_callback.clone(),
226 isolation_level,
227 access_mode,
228 )
229 .await
230 }
231
232 #[instrument(level = "trace", skip(callback))]
234 pub async fn transaction<F, T, E>(
235 &self,
236 callback: F,
237 isolation_level: Option<IsolationLevel>,
238 access_mode: Option<AccessMode>,
239 ) -> Result<T, TransactionError<E>>
240 where
241 F: for<'b> FnOnce(
242 &'b DatabaseTransaction,
243 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
244 + Send,
245 T: Send,
246 E: std::fmt::Display + std::fmt::Debug + Send,
247 {
248 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
249 let transaction = DatabaseTransaction::new_postgres(
250 conn,
251 self.metric_callback.clone(),
252 isolation_level,
253 access_mode,
254 )
255 .await
256 .map_err(|e| TransactionError::Connection(e))?;
257 transaction.run(callback).await
258 }
259
260 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
261 where
262 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
263 {
264 self.metric_callback = Some(Arc::new(callback));
265 }
266
267 pub async fn ping(&self) -> Result<(), DbErr> {
269 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
270 match conn.ping().await {
271 Ok(_) => Ok(()),
272 Err(err) => Err(sqlx_error_to_conn_err(err)),
273 }
274 }
275
276 pub async fn close(self) -> Result<(), DbErr> {
279 self.close_by_ref().await
280 }
281
282 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
284 self.pool.close().await;
285 Ok(())
286 }
287}
288
289impl From<PgRow> for QueryResult {
290 fn from(row: PgRow) -> QueryResult {
291 QueryResult {
292 row: QueryResultRow::SqlxPostgres(row),
293 }
294 }
295}
296
297impl From<PgQueryResult> for ExecResult {
298 fn from(result: PgQueryResult) -> ExecResult {
299 ExecResult {
300 result: ExecResultHolder::SqlxPostgres(result),
301 }
302 }
303}
304
305pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
306 let values = stmt
307 .values
308 .as_ref()
309 .map_or(Values(Vec::new()), |values| values.clone());
310 sqlx::query_with(&stmt.sql, SqlxValues(values))
311}
312
313pub(crate) async fn set_transaction_config(
314 conn: &mut PoolConnection<Postgres>,
315 isolation_level: Option<IsolationLevel>,
316 access_mode: Option<AccessMode>,
317) -> Result<(), DbErr> {
318 if let Some(isolation_level) = isolation_level {
319 let stmt = Statement {
320 sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
321 values: None,
322 db_backend: DbBackend::Postgres,
323 };
324 let query = sqlx_query(&stmt);
325 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
326 }
327 if let Some(access_mode) = access_mode {
328 let stmt = Statement {
329 sql: format!("SET TRANSACTION {access_mode}"),
330 values: None,
331 db_backend: DbBackend::Postgres,
332 };
333 let query = sqlx_query(&stmt);
334 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
335 }
336 Ok(())
337}
338
339impl
340 From<(
341 PoolConnection<sqlx::Postgres>,
342 Statement,
343 Option<crate::metric::Callback>,
344 )> for crate::QueryStream
345{
346 fn from(
347 (conn, stmt, metric_callback): (
348 PoolConnection<sqlx::Postgres>,
349 Statement,
350 Option<crate::metric::Callback>,
351 ),
352 ) -> Self {
353 crate::QueryStream::build(
354 stmt,
355 crate::InnerConnection::Postgres(conn),
356 metric_callback,
357 )
358 }
359}
360
361impl crate::DatabaseTransaction {
362 pub(crate) async fn new_postgres(
363 inner: PoolConnection<sqlx::Postgres>,
364 metric_callback: Option<crate::metric::Callback>,
365 isolation_level: Option<IsolationLevel>,
366 access_mode: Option<AccessMode>,
367 ) -> Result<crate::DatabaseTransaction, DbErr> {
368 Self::begin(
369 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
370 crate::DbBackend::Postgres,
371 metric_callback,
372 isolation_level,
373 access_mode,
374 )
375 .await
376 }
377}
378
379#[cfg(feature = "proxy")]
380pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
381 use sea_query::Value;
384 use sqlx::{Column, Row, TypeInfo};
385 crate::ProxyRow {
386 values: row
387 .columns()
388 .iter()
389 .map(|c| {
390 (
391 c.name().to_string(),
392 match c.type_info().name() {
393 "BOOL" => {
394 Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
395 }
396 #[cfg(feature = "postgres-array")]
397 "BOOL[]" => Value::Array(
398 sea_query::ArrayType::Bool,
399 row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
400 .expect("Failed to get boolean array")
401 .map(|vals| {
402 Box::new(
403 vals.into_iter()
404 .map(|val| Value::Bool(Some(val)))
405 .collect(),
406 )
407 }),
408 ),
409
410 "\"CHAR\"" => Value::TinyInt(
411 row.try_get(c.ordinal())
412 .expect("Failed to get small integer"),
413 ),
414 #[cfg(feature = "postgres-array")]
415 "\"CHAR\"[]" => Value::Array(
416 sea_query::ArrayType::TinyInt,
417 row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
418 .expect("Failed to get small integer array")
419 .map(|vals: Vec<i8>| {
420 Box::new(
421 vals.into_iter()
422 .map(|val| Value::TinyInt(Some(val)))
423 .collect(),
424 )
425 }),
426 ),
427
428 "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
429 row.try_get(c.ordinal())
430 .expect("Failed to get small integer"),
431 ),
432 #[cfg(feature = "postgres-array")]
433 "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
434 sea_query::ArrayType::SmallInt,
435 row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
436 .expect("Failed to get small integer array")
437 .map(|vals: Vec<i16>| {
438 Box::new(
439 vals.into_iter()
440 .map(|val| Value::SmallInt(Some(val)))
441 .collect(),
442 )
443 }),
444 ),
445
446 "INT" | "SERIAL" | "INT4" => {
447 Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
448 }
449 #[cfg(feature = "postgres-array")]
450 "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
451 sea_query::ArrayType::Int,
452 row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
453 .expect("Failed to get integer array")
454 .map(|vals: Vec<i32>| {
455 Box::new(
456 vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
457 )
458 }),
459 ),
460
461 "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
462 row.try_get(c.ordinal()).expect("Failed to get big integer"),
463 ),
464 #[cfg(feature = "postgres-array")]
465 "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
466 sea_query::ArrayType::BigInt,
467 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
468 .expect("Failed to get big integer array")
469 .map(|vals: Vec<i64>| {
470 Box::new(
471 vals.into_iter()
472 .map(|val| Value::BigInt(Some(val)))
473 .collect(),
474 )
475 }),
476 ),
477
478 "FLOAT4" | "REAL" => {
479 Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
480 }
481 #[cfg(feature = "postgres-array")]
482 "FLOAT4[]" | "REAL[]" => Value::Array(
483 sea_query::ArrayType::Float,
484 row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
485 .expect("Failed to get float array")
486 .map(|vals| {
487 Box::new(
488 vals.into_iter()
489 .map(|val| Value::Float(Some(val)))
490 .collect(),
491 )
492 }),
493 ),
494
495 "FLOAT8" | "DOUBLE PRECISION" => {
496 Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
497 }
498 #[cfg(feature = "postgres-array")]
499 "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
500 sea_query::ArrayType::Double,
501 row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
502 .expect("Failed to get double array")
503 .map(|vals| {
504 Box::new(
505 vals.into_iter()
506 .map(|val| Value::Double(Some(val)))
507 .collect(),
508 )
509 }),
510 ),
511
512 "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
513 row.try_get::<Option<String>, _>(c.ordinal())
514 .expect("Failed to get string")
515 .map(Box::new),
516 ),
517 #[cfg(feature = "postgres-array")]
518 "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
519 sea_query::ArrayType::String,
520 row.try_get::<Option<Vec<String>>, _>(c.ordinal())
521 .expect("Failed to get string array")
522 .map(|vals| {
523 Box::new(
524 vals.into_iter()
525 .map(|val| Value::String(Some(Box::new(val))))
526 .collect(),
527 )
528 }),
529 ),
530
531 "BYTEA" => Value::Bytes(
532 row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
533 .expect("Failed to get bytes")
534 .map(Box::new),
535 ),
536 #[cfg(feature = "postgres-array")]
537 "BYTEA[]" => Value::Array(
538 sea_query::ArrayType::Bytes,
539 row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
540 .expect("Failed to get bytes array")
541 .map(|vals| {
542 Box::new(
543 vals.into_iter()
544 .map(|val| Value::Bytes(Some(Box::new(val))))
545 .collect(),
546 )
547 }),
548 ),
549
550 #[cfg(feature = "with-bigdecimal")]
551 "NUMERIC" => Value::BigDecimal(
552 row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
553 .expect("Failed to get numeric")
554 .map(Box::new),
555 ),
556 #[cfg(all(
557 feature = "with-rust_decimal",
558 not(feature = "with-bigdecimal")
559 ))]
560 "NUMERIC" => Value::Decimal(
561 row.try_get::<Option<rust_decimal::Decimal>, _>(c.ordinal())
562 .expect("Failed to get numeric")
563 .map(Box::new),
564 ),
565
566 #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
567 "NUMERIC[]" => Value::Array(
568 sea_query::ArrayType::BigDecimal,
569 row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
570 .expect("Failed to get numeric array")
571 .map(|vals| {
572 Box::new(
573 vals.into_iter()
574 .map(|val| Value::BigDecimal(Some(Box::new(val))))
575 .collect(),
576 )
577 }),
578 ),
579 #[cfg(all(
580 feature = "with-rust_decimal",
581 not(feature = "with-bigdecimal"),
582 feature = "postgres-array"
583 ))]
584 "NUMERIC[]" => Value::Array(
585 sea_query::ArrayType::Decimal,
586 row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
587 .expect("Failed to get numeric array")
588 .map(|vals| {
589 Box::new(
590 vals.into_iter()
591 .map(|val| Value::Decimal(Some(Box::new(val))))
592 .collect(),
593 )
594 }),
595 ),
596
597 "OID" => {
598 Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
599 }
600 #[cfg(feature = "postgres-array")]
601 "OID[]" => Value::Array(
602 sea_query::ArrayType::BigInt,
603 row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
604 .expect("Failed to get oid array")
605 .map(|vals| {
606 Box::new(
607 vals.into_iter()
608 .map(|val| Value::BigInt(Some(val)))
609 .collect(),
610 )
611 }),
612 ),
613
614 #[cfg(feature = "with-json")]
615 "JSON" | "JSONB" => Value::Json(
616 row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
617 .expect("Failed to get json")
618 .map(Box::new),
619 ),
620 #[cfg(all(
621 feature = "with-json",
622 any(feature = "json-array", feature = "postgres-array")
623 ))]
624 "JSON[]" | "JSONB[]" => Value::Array(
625 sea_query::ArrayType::Json,
626 row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
627 .expect("Failed to get json array")
628 .map(|vals| {
629 Box::new(
630 vals.into_iter()
631 .map(|val| Value::Json(Some(Box::new(val))))
632 .collect(),
633 )
634 }),
635 ),
636
637 #[cfg(feature = "with-ipnetwork")]
638 "INET" | "CIDR" => Value::IpNetwork(
639 row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
640 .expect("Failed to get ip address")
641 .map(Box::new),
642 ),
643 #[cfg(feature = "with-ipnetwork")]
644 "INET[]" | "CIDR[]" => Value::Array(
645 sea_query::ArrayType::IpNetwork,
646 row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
647 .expect("Failed to get ip address array")
648 .map(|vals| {
649 Box::new(
650 vals.into_iter()
651 .map(|val| Value::IpNetwork(Some(Box::new(val))))
652 .collect(),
653 )
654 }),
655 ),
656
657 #[cfg(feature = "with-mac_address")]
658 "MACADDR" | "MACADDR8" => Value::MacAddress(
659 row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
660 .expect("Failed to get mac address")
661 .map(Box::new),
662 ),
663 #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
664 "MACADDR[]" | "MACADDR8[]" => Value::Array(
665 sea_query::ArrayType::MacAddress,
666 row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
667 .expect("Failed to get mac address array")
668 .map(|vals| {
669 Box::new(
670 vals.into_iter()
671 .map(|val| Value::MacAddress(Some(Box::new(val))))
672 .collect(),
673 )
674 }),
675 ),
676
677 #[cfg(feature = "with-chrono")]
678 "TIMESTAMP" => Value::ChronoDateTime(
679 row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
680 .expect("Failed to get timestamp")
681 .map(Box::new),
682 ),
683 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
684 "TIMESTAMP" => Value::TimeDateTime(
685 row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
686 .expect("Failed to get timestamp")
687 .map(Box::new),
688 ),
689
690 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
691 "TIMESTAMP[]" => Value::Array(
692 sea_query::ArrayType::ChronoDateTime,
693 row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
694 .expect("Failed to get timestamp array")
695 .map(|vals| {
696 Box::new(
697 vals.into_iter()
698 .map(|val| Value::ChronoDateTime(Some(Box::new(val))))
699 .collect(),
700 )
701 }),
702 ),
703 #[cfg(all(
704 feature = "with-time",
705 not(feature = "with-chrono"),
706 feature = "postgres-array"
707 ))]
708 "TIMESTAMP[]" => Value::Array(
709 sea_query::ArrayType::TimeDateTime,
710 row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
711 .expect("Failed to get timestamp array")
712 .map(|vals| {
713 Box::new(
714 vals.into_iter()
715 .map(|val| Value::TimeDateTime(Some(Box::new(val))))
716 .collect(),
717 )
718 }),
719 ),
720
721 #[cfg(feature = "with-chrono")]
722 "DATE" => Value::ChronoDate(
723 row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
724 .expect("Failed to get date")
725 .map(Box::new),
726 ),
727 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
728 "DATE" => Value::TimeDate(
729 row.try_get::<Option<time::Date>, _>(c.ordinal())
730 .expect("Failed to get date")
731 .map(Box::new),
732 ),
733
734 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
735 "DATE[]" => Value::Array(
736 sea_query::ArrayType::ChronoDate,
737 row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
738 .expect("Failed to get date array")
739 .map(|vals| {
740 Box::new(
741 vals.into_iter()
742 .map(|val| Value::ChronoDate(Some(Box::new(val))))
743 .collect(),
744 )
745 }),
746 ),
747 #[cfg(all(
748 feature = "with-time",
749 not(feature = "with-chrono"),
750 feature = "postgres-array"
751 ))]
752 "DATE[]" => Value::Array(
753 sea_query::ArrayType::TimeDate,
754 row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
755 .expect("Failed to get date array")
756 .map(|vals| {
757 Box::new(
758 vals.into_iter()
759 .map(|val| Value::TimeDate(Some(Box::new(val))))
760 .collect(),
761 )
762 }),
763 ),
764
765 #[cfg(feature = "with-chrono")]
766 "TIME" => Value::ChronoTime(
767 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
768 .expect("Failed to get time")
769 .map(Box::new),
770 ),
771 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
772 "TIME" => Value::TimeTime(
773 row.try_get::<Option<time::Time>, _>(c.ordinal())
774 .expect("Failed to get time")
775 .map(Box::new),
776 ),
777
778 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
779 "TIME[]" => Value::Array(
780 sea_query::ArrayType::ChronoTime,
781 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
782 .expect("Failed to get time array")
783 .map(|vals| {
784 Box::new(
785 vals.into_iter()
786 .map(|val| Value::ChronoTime(Some(Box::new(val))))
787 .collect(),
788 )
789 }),
790 ),
791 #[cfg(all(
792 feature = "with-time",
793 not(feature = "with-chrono"),
794 feature = "postgres-array"
795 ))]
796 "TIME[]" => Value::Array(
797 sea_query::ArrayType::TimeTime,
798 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
799 .expect("Failed to get time array")
800 .map(|vals| {
801 Box::new(
802 vals.into_iter()
803 .map(|val| Value::TimeTime(Some(Box::new(val))))
804 .collect(),
805 )
806 }),
807 ),
808
809 #[cfg(feature = "with-chrono")]
810 "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
811 row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
812 .expect("Failed to get timestamptz")
813 .map(Box::new),
814 ),
815 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
816 "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(
817 row.try_get::<Option<time::OffsetDateTime>, _>(c.ordinal())
818 .expect("Failed to get timestamptz")
819 .map(Box::new),
820 ),
821
822 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
823 "TIMESTAMPTZ[]" => Value::Array(
824 sea_query::ArrayType::ChronoDateTimeUtc,
825 row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
826 c.ordinal(),
827 )
828 .expect("Failed to get timestamptz array")
829 .map(|vals| {
830 Box::new(
831 vals.into_iter()
832 .map(|val| Value::ChronoDateTimeUtc(Some(Box::new(val))))
833 .collect(),
834 )
835 }),
836 ),
837 #[cfg(all(
838 feature = "with-time",
839 not(feature = "with-chrono"),
840 feature = "postgres-array"
841 ))]
842 "TIMESTAMPTZ[]" => Value::Array(
843 sea_query::ArrayType::TimeDateTimeWithTimeZone,
844 row.try_get::<Option<Vec<time::OffsetDateTime>>, _>(c.ordinal())
845 .expect("Failed to get timestamptz array")
846 .map(|vals| {
847 Box::new(
848 vals.into_iter()
849 .map(|val| {
850 Value::TimeDateTimeWithTimeZone(Some(Box::new(val)))
851 })
852 .collect(),
853 )
854 }),
855 ),
856
857 #[cfg(feature = "with-chrono")]
858 "TIMETZ" => Value::ChronoTime(
859 row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
860 .expect("Failed to get timetz")
861 .map(Box::new),
862 ),
863 #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
864 "TIMETZ" => Value::TimeTime(
865 row.try_get::<Option<time::Time>, _>(c.ordinal())
866 .expect("Failed to get timetz")
867 .map(Box::new),
868 ),
869
870 #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
871 "TIMETZ[]" => Value::Array(
872 sea_query::ArrayType::ChronoTime,
873 row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
874 .expect("Failed to get timetz array")
875 .map(|vals| {
876 Box::new(
877 vals.into_iter()
878 .map(|val| Value::ChronoTime(Some(Box::new(val))))
879 .collect(),
880 )
881 }),
882 ),
883 #[cfg(all(
884 feature = "with-time",
885 not(feature = "with-chrono"),
886 feature = "postgres-array"
887 ))]
888 "TIMETZ[]" => Value::Array(
889 sea_query::ArrayType::TimeTime,
890 row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
891 .expect("Failed to get timetz array")
892 .map(|vals| {
893 Box::new(
894 vals.into_iter()
895 .map(|val| Value::TimeTime(Some(Box::new(val))))
896 .collect(),
897 )
898 }),
899 ),
900
901 #[cfg(feature = "with-uuid")]
902 "UUID" => Value::Uuid(
903 row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
904 .expect("Failed to get uuid")
905 .map(Box::new),
906 ),
907
908 #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
909 "UUID[]" => Value::Array(
910 sea_query::ArrayType::Uuid,
911 row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
912 .expect("Failed to get uuid array")
913 .map(|vals| {
914 Box::new(
915 vals.into_iter()
916 .map(|val| Value::Uuid(Some(Box::new(val))))
917 .collect(),
918 )
919 }),
920 ),
921
922 _ => unreachable!("Unknown column type: {}", c.type_info().name()),
923 },
924 )
925 })
926 .collect(),
927 }
928}