1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7 Connection, Executor, PgPool, Postgres,
8 pool::PoolConnection,
9 postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16 AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17 DbBackend, IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*,
18 executor::*,
19};
20
21use super::sqlx_common::*;
22
/// Namespace type for the SQLx-backed PostgreSQL connector.
///
/// It carries no state; its associated functions build [`DatabaseConnection`]s
/// that wrap a [`PgPool`].
#[derive(Debug)]
pub struct SqlxPostgresConnector;
26
27#[derive(Clone)]
29pub struct SqlxPostgresPoolConnection {
30 pub(crate) pool: PgPool,
31 metric_callback: Option<crate::metric::Callback>,
32}
33
34impl std::fmt::Debug for SqlxPostgresPoolConnection {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
37 }
38}
39
40impl From<PgPool> for SqlxPostgresPoolConnection {
41 fn from(pool: PgPool) -> Self {
42 SqlxPostgresPoolConnection {
43 pool,
44 metric_callback: None,
45 }
46 }
47}
48
49impl From<PgPool> for DatabaseConnection {
50 fn from(pool: PgPool) -> Self {
51 DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
52 }
53}
54
55impl SqlxPostgresConnector {
56 pub fn accepts(string: &str) -> bool {
58 string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
59 }
60
61 #[instrument(level = "trace")]
63 pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
64 let mut sqlx_opts = options
65 .url
66 .parse::<PgConnectOptions>()
67 .map_err(sqlx_error_to_conn_err)?;
68 use sqlx::ConnectOptions;
69 if !options.sqlx_logging {
70 sqlx_opts = sqlx_opts.disable_statement_logging();
71 } else {
72 sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
73 if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
74 sqlx_opts = sqlx_opts.log_slow_statements(
75 options.sqlx_slow_statements_logging_level,
76 options.sqlx_slow_statements_logging_threshold,
77 );
78 }
79 }
80
81 if let Some(f) = &options.pg_opts_fn {
82 sqlx_opts = f(sqlx_opts);
83 }
84
85 let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
86 let mut string = "SET search_path = ".to_owned();
87 if schema.starts_with('"') {
88 write!(&mut string, "{schema}").expect("Infallible");
89 } else {
90 for (i, schema) in schema.split(',').enumerate() {
91 if i > 0 {
92 write!(&mut string, ",").expect("Infallible");
93 }
94 if schema.starts_with('"') {
95 write!(&mut string, "{schema}").expect("Infallible");
96 } else {
97 write!(&mut string, "\"{schema}\"").expect("Infallible");
98 }
99 }
100 }
101 string
102 });
103 let lazy = options.connect_lazy;
104 let mut pool_options = options.sqlx_pool_options();
105 if let Some(sql) = set_search_path_sql {
106 pool_options = pool_options.after_connect(move |conn, _| {
107 let sql = sql.clone();
108 Box::pin(async move {
109 sqlx::Executor::execute(conn, sql.as_str())
110 .await
111 .map(|_| ())
112 })
113 });
114 }
115 let pool = if lazy {
116 pool_options.connect_lazy_with(sqlx_opts)
117 } else {
118 pool_options
119 .connect_with(sqlx_opts)
120 .await
121 .map_err(sqlx_error_to_conn_err)?
122 };
123 Ok(
124 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
125 pool,
126 metric_callback: None,
127 })
128 .into(),
129 )
130 }
131}
132
133impl SqlxPostgresConnector {
134 pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
136 DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
137 pool,
138 metric_callback: None,
139 })
140 .into()
141 }
142}
143
144impl SqlxPostgresPoolConnection {
145 #[instrument(level = "trace")]
147 pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
148 debug_print!("{}", stmt);
149
150 let query = sqlx_query(&stmt);
151 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
152 crate::metric::metric!(self.metric_callback, &stmt, {
153 match query.execute(&mut *conn).await {
154 Ok(res) => Ok(res.into()),
155 Err(err) => Err(sqlx_error_to_exec_err(err)),
156 }
157 })
158 }
159
160 #[instrument(level = "trace")]
162 pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
163 debug_print!("{}", sql);
164
165 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
166 match conn.execute(sql).await {
167 Ok(res) => Ok(res.into()),
168 Err(err) => Err(sqlx_error_to_exec_err(err)),
169 }
170 }
171
172 #[instrument(level = "trace")]
174 pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
175 debug_print!("{}", stmt);
176
177 let query = sqlx_query(&stmt);
178 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
179 crate::metric::metric!(self.metric_callback, &stmt, {
180 match query.fetch_one(&mut *conn).await {
181 Ok(row) => Ok(Some(row.into())),
182 Err(err) => match err {
183 sqlx::Error::RowNotFound => Ok(None),
184 _ => Err(sqlx_error_to_query_err(err)),
185 },
186 }
187 })
188 }
189
190 #[instrument(level = "trace")]
192 pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
193 debug_print!("{}", stmt);
194
195 let query = sqlx_query(&stmt);
196 let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
197 crate::metric::metric!(self.metric_callback, &stmt, {
198 match query.fetch_all(&mut *conn).await {
199 Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
200 Err(err) => Err(sqlx_error_to_query_err(err)),
201 }
202 })
203 }
204
205 #[instrument(level = "trace")]
207 pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
208 debug_print!("{}", stmt);
209
210 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
211 Ok(QueryStream::from((
212 conn,
213 stmt,
214 self.metric_callback.clone(),
215 )))
216 }
217
218 #[instrument(level = "trace")]
220 pub async fn begin(
221 &self,
222 isolation_level: Option<IsolationLevel>,
223 access_mode: Option<AccessMode>,
224 ) -> Result<DatabaseTransaction, DbErr> {
225 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
226 DatabaseTransaction::new_postgres(
227 conn,
228 self.metric_callback.clone(),
229 isolation_level,
230 access_mode,
231 )
232 .await
233 }
234
235 #[instrument(level = "trace", skip(callback))]
237 pub async fn transaction<F, T, E>(
238 &self,
239 callback: F,
240 isolation_level: Option<IsolationLevel>,
241 access_mode: Option<AccessMode>,
242 ) -> Result<T, TransactionError<E>>
243 where
244 F: for<'b> FnOnce(
245 &'b DatabaseTransaction,
246 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
247 + Send,
248 T: Send,
249 E: std::fmt::Display + std::fmt::Debug + Send,
250 {
251 let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
252 let transaction = DatabaseTransaction::new_postgres(
253 conn,
254 self.metric_callback.clone(),
255 isolation_level,
256 access_mode,
257 )
258 .await
259 .map_err(|e| TransactionError::Connection(e))?;
260 transaction.run(callback).await
261 }
262
263 pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
264 where
265 F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
266 {
267 self.metric_callback = Some(Arc::new(callback));
268 }
269
270 pub async fn ping(&self) -> Result<(), DbErr> {
272 let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
273 match conn.ping().await {
274 Ok(_) => Ok(()),
275 Err(err) => Err(sqlx_error_to_conn_err(err)),
276 }
277 }
278
279 pub async fn close(self) -> Result<(), DbErr> {
282 self.close_by_ref().await
283 }
284
285 pub async fn close_by_ref(&self) -> Result<(), DbErr> {
287 self.pool.close().await;
288 Ok(())
289 }
290}
291
292impl From<PgRow> for QueryResult {
293 fn from(row: PgRow) -> QueryResult {
294 QueryResult {
295 row: QueryResultRow::SqlxPostgres(row),
296 }
297 }
298}
299
300impl From<PgQueryResult> for ExecResult {
301 fn from(result: PgQueryResult) -> ExecResult {
302 ExecResult {
303 result: ExecResultHolder::SqlxPostgres(result),
304 }
305 }
306}
307
308pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
309 let values = stmt
310 .values
311 .as_ref()
312 .map_or(Values(Vec::new()), |values| values.clone());
313 sqlx::query_with(&stmt.sql, SqlxValues(values))
314}
315
316pub(crate) async fn set_transaction_config(
317 conn: &mut PoolConnection<Postgres>,
318 isolation_level: Option<IsolationLevel>,
319 access_mode: Option<AccessMode>,
320) -> Result<(), DbErr> {
321 if let Some(isolation_level) = isolation_level {
322 let stmt = Statement {
323 sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
324 values: None,
325 db_backend: DbBackend::Postgres,
326 };
327 let query = sqlx_query(&stmt);
328 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
329 }
330 if let Some(access_mode) = access_mode {
331 let stmt = Statement {
332 sql: format!("SET TRANSACTION {access_mode}"),
333 values: None,
334 db_backend: DbBackend::Postgres,
335 };
336 let query = sqlx_query(&stmt);
337 conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
338 }
339 Ok(())
340}
341
342impl
343 From<(
344 PoolConnection<sqlx::Postgres>,
345 Statement,
346 Option<crate::metric::Callback>,
347 )> for crate::QueryStream
348{
349 fn from(
350 (conn, stmt, metric_callback): (
351 PoolConnection<sqlx::Postgres>,
352 Statement,
353 Option<crate::metric::Callback>,
354 ),
355 ) -> Self {
356 crate::QueryStream::build(
357 stmt,
358 crate::InnerConnection::Postgres(conn),
359 metric_callback,
360 )
361 }
362}
363
364impl crate::DatabaseTransaction {
365 pub(crate) async fn new_postgres(
366 inner: PoolConnection<sqlx::Postgres>,
367 metric_callback: Option<crate::metric::Callback>,
368 isolation_level: Option<IsolationLevel>,
369 access_mode: Option<AccessMode>,
370 ) -> Result<crate::DatabaseTransaction, DbErr> {
371 Self::begin(
372 Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
373 crate::DbBackend::Postgres,
374 metric_callback,
375 isolation_level,
376 access_mode,
377 )
378 .await
379 }
380}
381
/// Converts a SQLx PostgreSQL row into a [`crate::ProxyRow`].
///
/// Every column becomes a `(column name, sea_query::Value)` pair; the `Value`
/// variant is selected from the column's PostgreSQL type name. A column
/// holding SQL `NULL` is decoded into the matching variant with a `None`
/// payload.
///
/// OID columns are read through SQLx's `Oid` wrapper and widened to `i64`,
/// so they are still reported as `Value::BigInt`.
///
/// # Panics
///
/// * when SQLx fails to decode a column into the Rust type chosen for it;
/// * when the column's type name has no arm below (for instance because the
///   matching cargo feature is disabled).
///
/// Array elements are still decoded as non-nullable values, so an array that
/// contains a `NULL` element is expected to surface as a decode failure.
#[cfg(feature = "proxy")]
pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
    use sea_query::Value;
    use sqlx::{Column, Row, TypeInfo};

    /// Moves the payload of a nullable column into a `Box`, keeping `None`
    /// for SQL `NULL`.
    fn boxed<T>(value: Option<T>) -> Option<Box<T>> {
        value.map(Box::new)
    }

    crate::ProxyRow {
        values: row
            .columns()
            .iter()
            .map(|c| {
                (
                    c.name().to_string(),
                    match c.type_info().name() {
                        "BOOL" => {
                            Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "BOOL[]" => Value::Array(
                            sea_query::ArrayType::Bool,
                            row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
                                .expect("Failed to get boolean array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Bool(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "\"CHAR\"" => Value::TinyInt(
                            row.try_get(c.ordinal())
                                .expect("Failed to get small integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "\"CHAR\"[]" => Value::Array(
                            sea_query::ArrayType::TinyInt,
                            row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
                                .expect("Failed to get small integer array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TinyInt(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
                            row.try_get(c.ordinal())
                                .expect("Failed to get small integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
                            sea_query::ArrayType::SmallInt,
                            row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
                                .expect("Failed to get small integer array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::SmallInt(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "INT" | "SERIAL" | "INT4" => {
                            Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
                            sea_query::ArrayType::Int,
                            row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
                                .expect("Failed to get integer array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Int(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
                            row.try_get(c.ordinal()).expect("Failed to get big integer"),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
                            sea_query::ArrayType::BigInt,
                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
                                .expect("Failed to get big integer array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::BigInt(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "FLOAT4" | "REAL" => {
                            Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "FLOAT4[]" | "REAL[]" => Value::Array(
                            sea_query::ArrayType::Float,
                            row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
                                .expect("Failed to get float array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Float(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "FLOAT8" | "DOUBLE PRECISION" => {
                            Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
                        }
                        #[cfg(feature = "postgres-array")]
                        "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
                            sea_query::ArrayType::Double,
                            row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
                                .expect("Failed to get double array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Double(Some(val)))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get string"),
                        )),
                        #[cfg(feature = "postgres-array")]
                        "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
                            sea_query::ArrayType::String,
                            row.try_get::<Option<Vec<String>>, _>(c.ordinal())
                                .expect("Failed to get string array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::String(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "BYTEA" => Value::Bytes(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get bytes"),
                        )),
                        #[cfg(feature = "postgres-array")]
                        "BYTEA[]" => Value::Array(
                            sea_query::ArrayType::Bytes,
                            row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
                                .expect("Failed to get bytes array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Bytes(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-bigdecimal")]
                        "NUMERIC" => Value::BigDecimal(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get numeric"),
                        )),
                        #[cfg(all(
                            feature = "with-rust_decimal",
                            not(feature = "with-bigdecimal")
                        ))]
                        "NUMERIC" => Value::Decimal(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get numeric"),
                        )),

                        #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
                        "NUMERIC[]" => Value::Array(
                            sea_query::ArrayType::BigDecimal,
                            row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
                                .expect("Failed to get numeric array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::BigDecimal(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-rust_decimal",
                            not(feature = "with-bigdecimal"),
                            feature = "postgres-array"
                        ))]
                        "NUMERIC[]" => Value::Array(
                            sea_query::ArrayType::Decimal,
                            row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
                                .expect("Failed to get numeric array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Decimal(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        // OID is an unsigned 32-bit type on the wire; it is read
                        // through SQLx's `Oid` wrapper and widened losslessly.
                        "OID" => Value::BigInt(
                            row.try_get::<Option<sqlx::postgres::types::Oid>, _>(c.ordinal())
                                .expect("Failed to get oid")
                                .map(|oid| i64::from(oid.0)),
                        ),
                        #[cfg(feature = "postgres-array")]
                        "OID[]" => Value::Array(
                            sea_query::ArrayType::BigInt,
                            row.try_get::<Option<Vec<sqlx::postgres::types::Oid>>, _>(c.ordinal())
                                .expect("Failed to get oid array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|oid| Value::BigInt(Some(i64::from(oid.0))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        "JSON" | "JSONB" => Value::Json(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get json"),
                        )),
                        #[cfg(any(feature = "json-array", feature = "postgres-array"))]
                        "JSON[]" | "JSONB[]" => Value::Array(
                            sea_query::ArrayType::Json,
                            row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
                                .expect("Failed to get json array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Json(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-ipnetwork")]
                        "INET" | "CIDR" => Value::IpNetwork(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get ip address"),
                        )),
                        // Gated on `postgres-array` like every other array arm.
                        #[cfg(all(feature = "with-ipnetwork", feature = "postgres-array"))]
                        "INET[]" | "CIDR[]" => Value::Array(
                            sea_query::ArrayType::IpNetwork,
                            row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
                                .expect("Failed to get ip address array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::IpNetwork(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-mac_address")]
                        "MACADDR" | "MACADDR8" => Value::MacAddress(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get mac address"),
                        )),
                        #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
                        "MACADDR[]" | "MACADDR8[]" => Value::Array(
                            sea_query::ArrayType::MacAddress,
                            row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
                                .expect("Failed to get mac address array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::MacAddress(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIMESTAMP" => Value::ChronoDateTime(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get timestamp"),
                        )),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMESTAMP" => Value::TimeDateTime(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get timestamp"),
                        )),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMESTAMP[]" => Value::Array(
                            sea_query::ArrayType::ChronoDateTime,
                            row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
                                .expect("Failed to get timestamp array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoDateTime(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),
                        // TIMESTAMP has no offset, so the elements are primitive
                        // date-times, matching the scalar arm above.
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMESTAMP[]" => Value::Array(
                            sea_query::ArrayType::TimeDateTime,
                            row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
                                .expect("Failed to get timestamp array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeDateTime(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "DATE" => Value::ChronoDate(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get date"),
                        )),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "DATE" => Value::TimeDate(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get date"),
                        )),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "DATE[]" => Value::Array(
                            sea_query::ArrayType::ChronoDate,
                            row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
                                .expect("Failed to get date array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoDate(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "DATE[]" => Value::Array(
                            sea_query::ArrayType::TimeDate,
                            row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
                                .expect("Failed to get date array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeDate(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIME" => Value::ChronoTime(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get time"),
                        )),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIME" => Value::TimeTime(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get time"),
                        )),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIME[]" => Value::Array(
                            sea_query::ArrayType::ChronoTime,
                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
                                .expect("Failed to get time array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoTime(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIME[]" => Value::Array(
                            sea_query::ArrayType::TimeTime,
                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
                                .expect("Failed to get time array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeTime(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-chrono")]
                        "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get timestamptz"),
                        )),
                        // TIMESTAMPTZ carries an offset, so the `time` flavour is
                        // the "with time zone" variant (an offset date-time).
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get timestamptz"),
                        )),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMESTAMPTZ[]" => Value::Array(
                            sea_query::ArrayType::ChronoDateTimeUtc,
                            row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
                                c.ordinal(),
                            )
                            .expect("Failed to get timestamptz array")
                            .map(|vals| {
                                Box::new(
                                    vals.into_iter()
                                        .map(|val| Value::ChronoDateTimeUtc(Some(Box::new(val))))
                                        .collect::<Vec<Value>>(),
                                )
                            }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMESTAMPTZ[]" => Value::Array(
                            sea_query::ArrayType::TimeDateTimeWithTimeZone,
                            row.try_get::<Option<Vec<time::OffsetDateTime>>, _>(c.ordinal())
                                .expect("Failed to get timestamptz array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| {
                                                Value::TimeDateTimeWithTimeZone(Some(Box::new(val)))
                                            })
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        // NOTE(review): SQLx ships a dedicated `PgTimeTz` type for
                        // TIMETZ; confirm that decoding TIMETZ columns into a plain
                        // time value (as the arms below do) is accepted by SQLx.
                        #[cfg(feature = "with-chrono")]
                        "TIMETZ" => Value::ChronoTime(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get timetz"),
                        )),
                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
                        "TIMETZ" => Value::TimeTime(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get timetz"),
                        )),

                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
                        "TIMETZ[]" => Value::Array(
                            sea_query::ArrayType::ChronoTime,
                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
                                .expect("Failed to get timetz array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::ChronoTime(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),
                        #[cfg(all(
                            feature = "with-time",
                            not(feature = "with-chrono"),
                            feature = "postgres-array"
                        ))]
                        "TIMETZ[]" => Value::Array(
                            sea_query::ArrayType::TimeTime,
                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
                                .expect("Failed to get timetz array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::TimeTime(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        #[cfg(feature = "with-uuid")]
                        "UUID" => Value::Uuid(boxed(
                            row.try_get(c.ordinal()).expect("Failed to get uuid"),
                        )),

                        #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
                        "UUID[]" => Value::Array(
                            sea_query::ArrayType::Uuid,
                            row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
                                .expect("Failed to get uuid array")
                                .map(|vals| {
                                    Box::new(
                                        vals.into_iter()
                                            .map(|val| Value::Uuid(Some(Box::new(val))))
                                            .collect::<Vec<Value>>(),
                                    )
                                }),
                        ),

                        // Reached for any type name without an arm above.
                        _ => unreachable!("Unknown column type: {}", c.type_info().name()),
                    },
                )
            })
            .collect(),
    }
}