Skip to main content

sea_orm/driver/
sqlx_postgres.rs

1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7    Connection, Executor, PgPool, Postgres,
8    pool::PoolConnection,
9    postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16    AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17    IsolationLevel, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
22#[cfg(feature = "stream")]
23use crate::QueryStream;
24
25/// Defines the [sqlx::postgres] connector
26#[derive(Debug)]
27pub struct SqlxPostgresConnector;
28
29/// Defines a sqlx PostgreSQL pool
30#[derive(Clone)]
31pub struct SqlxPostgresPoolConnection {
32    pub(crate) pool: PgPool,
33    metric_callback: Option<crate::metric::Callback>,
34    pub(crate) record_stmt_in_spans: bool,
35}
36
37impl std::fmt::Debug for SqlxPostgresPoolConnection {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
40    }
41}
42
43impl From<PgPool> for SqlxPostgresPoolConnection {
44    fn from(pool: PgPool) -> Self {
45        SqlxPostgresPoolConnection {
46            pool,
47            metric_callback: None,
48            record_stmt_in_spans: true,
49        }
50    }
51}
52
53impl From<PgPool> for DatabaseConnection {
54    fn from(pool: PgPool) -> Self {
55        DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
56    }
57}
58
59impl SqlxPostgresConnector {
60    /// Check if the URI provided corresponds to `postgres://` for a PostgreSQL database
61    pub fn accepts(string: &str) -> bool {
62        string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
63    }
64
65    /// Add configuration options for the PostgreSQL database
66    #[instrument(level = "trace")]
67    pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
68        let record_stmt_in_spans = options.get_record_stmt_in_spans();
69        let mut sqlx_opts = options
70            .url
71            .parse::<PgConnectOptions>()
72            .map_err(sqlx_error_to_conn_err)?;
73        use sqlx::ConnectOptions;
74        if !options.sqlx_logging {
75            sqlx_opts = sqlx_opts.disable_statement_logging();
76        } else {
77            sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
78            if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
79                sqlx_opts = sqlx_opts.log_slow_statements(
80                    options.sqlx_slow_statements_logging_level,
81                    options.sqlx_slow_statements_logging_threshold,
82                );
83            }
84        }
85
86        if let Some(application_name) = &options.application_name {
87            sqlx_opts = sqlx_opts.application_name(application_name);
88        }
89
90        if let Some(timeout) = options.statement_timeout {
91            sqlx_opts = sqlx_opts.options([("statement_timeout", timeout.as_millis().to_string())]);
92        }
93
94        if let Some(f) = &options.pg_opts_fn {
95            sqlx_opts = f(sqlx_opts);
96        }
97
98        let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
99            let mut string = "SET search_path = ".to_owned();
100            if schema.starts_with('"') {
101                write!(&mut string, "{schema}").expect("Infallible");
102            } else {
103                for (i, schema) in schema.split(',').enumerate() {
104                    if i > 0 {
105                        write!(&mut string, ",").expect("Infallible");
106                    }
107                    if schema.starts_with('"') {
108                        write!(&mut string, "{schema}").expect("Infallible");
109                    } else {
110                        write!(&mut string, "\"{schema}\"").expect("Infallible");
111                    }
112                }
113            }
114            string
115        });
116
117        let lazy = options.connect_lazy;
118        let after_connect = options.after_connect.clone();
119        let pg_pool_opts_fn = options.pg_pool_opts_fn.clone();
120        let mut pool_options = options.sqlx_pool_options();
121
122        if let Some(sql) = set_search_path_sql {
123            pool_options = pool_options.after_connect(move |conn, _| {
124                let sql = sql.clone();
125                Box::pin(async move {
126                    sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql))
127                        .await
128                        .map(|_| ())
129                })
130            });
131        }
132        if let Some(f) = &pg_pool_opts_fn {
133            pool_options = f(pool_options);
134        }
135        let pool = if lazy {
136            pool_options.connect_lazy_with(sqlx_opts)
137        } else {
138            pool_options
139                .connect_with(sqlx_opts)
140                .await
141                .map_err(sqlx_error_to_conn_err)?
142        };
143
144        let conn: DatabaseConnection =
145            DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
146                pool,
147                metric_callback: None,
148                record_stmt_in_spans,
149            })
150            .into();
151
152        if let Some(cb) = after_connect {
153            cb(conn.clone()).await?;
154        }
155
156        Ok(conn)
157    }
158}
159
160impl SqlxPostgresConnector {
161    /// Instantiate a sqlx pool connection to a [DatabaseConnection]
162    pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
163        DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
164            pool,
165            metric_callback: None,
166            record_stmt_in_spans: true,
167        })
168        .into()
169    }
170}
171
172impl SqlxPostgresPoolConnection {
173    /// Execute a [Statement] on a PostgreSQL backend
174    #[instrument(level = "trace", skip(stmt))]
175    pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
176        debug_print!("{}", stmt);
177
178        let query = sqlx_query(&stmt);
179        let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
180        crate::metric::metric!(self.metric_callback, &stmt, {
181            match query.execute(&mut *conn).await {
182                Ok(res) => Ok(res.into()),
183                Err(err) => Err(sqlx_error_to_exec_err(err)),
184            }
185        })
186    }
187
188    /// Execute an unprepared SQL statement on a PostgreSQL backend
189    #[instrument(level = "trace", skip(sql))]
190    pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
191        debug_print!("{}", sql);
192
193        let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
194        match conn.execute(sqlx::AssertSqlSafe(sql.to_owned())).await {
195            Ok(res) => Ok(res.into()),
196            Err(err) => Err(sqlx_error_to_exec_err(err)),
197        }
198    }
199
200    /// Get one result from a SQL query. Returns [Option::None] if no match was found
201    #[instrument(level = "trace", skip(stmt))]
202    pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
203        debug_print!("{}", stmt);
204
205        let query = sqlx_query(&stmt);
206        let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
207        crate::metric::metric!(self.metric_callback, &stmt, {
208            match query.fetch_one(&mut *conn).await {
209                Ok(row) => Ok(Some(row.into())),
210                Err(err) => match err {
211                    sqlx::Error::RowNotFound => Ok(None),
212                    _ => Err(sqlx_error_to_query_err(err)),
213                },
214            }
215        })
216    }
217
218    /// Get the results of a query returning them as a Vec<[QueryResult]>
219    #[instrument(level = "trace", skip(stmt))]
220    pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
221        debug_print!("{}", stmt);
222
223        let query = sqlx_query(&stmt);
224        let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
225        crate::metric::metric!(self.metric_callback, &stmt, {
226            match query.fetch_all(&mut *conn).await {
227                Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
228                Err(err) => Err(sqlx_error_to_query_err(err)),
229            }
230        })
231    }
232
233    /// Stream the results of executing a SQL query
234    #[instrument(level = "trace", skip(stmt))]
235    #[cfg(feature = "stream")]
236    pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
237        debug_print!("{}", stmt);
238
239        let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
240        Ok(QueryStream::from((
241            conn,
242            stmt,
243            self.metric_callback.clone(),
244        )))
245    }
246
247    /// Bundle a set of SQL statements that execute together.
248    #[instrument(level = "trace")]
249    pub async fn begin(
250        &self,
251        isolation_level: Option<IsolationLevel>,
252        access_mode: Option<AccessMode>,
253    ) -> Result<DatabaseTransaction, DbErr> {
254        let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
255        DatabaseTransaction::new_postgres(
256            conn,
257            self.metric_callback.clone(),
258            self.record_stmt_in_spans,
259            isolation_level,
260            access_mode,
261        )
262        .await
263    }
264
265    /// Create a PostgreSQL transaction
266    #[instrument(level = "trace", skip(callback))]
267    pub async fn transaction<F, T, E>(
268        &self,
269        callback: F,
270        isolation_level: Option<IsolationLevel>,
271        access_mode: Option<AccessMode>,
272    ) -> Result<T, TransactionError<E>>
273    where
274        F: for<'b> FnOnce(
275                &'b DatabaseTransaction,
276            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
277            + Send,
278        T: Send,
279        E: std::fmt::Display + std::fmt::Debug + Send,
280    {
281        let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
282        let transaction = DatabaseTransaction::new_postgres(
283            conn,
284            self.metric_callback.clone(),
285            self.record_stmt_in_spans,
286            isolation_level,
287            access_mode,
288        )
289        .await
290        .map_err(|e| TransactionError::Connection(e))?;
291        transaction.run(callback).await
292    }
293
294    pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
295    where
296        F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
297    {
298        self.metric_callback = Some(Arc::new(callback));
299    }
300
301    /// Checks if a connection to the database is still valid.
302    pub async fn ping(&self) -> Result<(), DbErr> {
303        let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
304        match conn.ping().await {
305            Ok(_) => Ok(()),
306            Err(err) => Err(sqlx_error_to_conn_err(err)),
307        }
308    }
309
310    /// Explicitly close the Postgres connection.
311    /// See [`Self::close_by_ref`] for usage with references.
312    pub async fn close(self) -> Result<(), DbErr> {
313        self.close_by_ref().await
314    }
315
316    /// Explicitly close the Postgres connection
317    pub async fn close_by_ref(&self) -> Result<(), DbErr> {
318        self.pool.close().await;
319        Ok(())
320    }
321}
322
323impl From<PgRow> for QueryResult {
324    fn from(row: PgRow) -> QueryResult {
325        QueryResult {
326            row: QueryResultRow::SqlxPostgres(row),
327        }
328    }
329}
330
331impl From<PgQueryResult> for ExecResult {
332    fn from(result: PgQueryResult) -> ExecResult {
333        ExecResult {
334            result: ExecResultHolder::SqlxPostgres(result),
335        }
336    }
337}
338
339pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
340    let values = stmt
341        .values
342        .as_ref()
343        .map_or(Values(Vec::new()), |values| values.clone());
344    sqlx::query_with(sqlx::AssertSqlSafe(stmt.sql.as_str()), SqlxValues(values))
345}
346
347pub(crate) async fn set_transaction_config(
348    conn: &mut PoolConnection<Postgres>,
349    isolation_level: Option<IsolationLevel>,
350    access_mode: Option<AccessMode>,
351) -> Result<(), DbErr> {
352    let mut settings = Vec::new();
353
354    if let Some(isolation_level) = isolation_level {
355        settings.push(format!("ISOLATION LEVEL {isolation_level}"));
356    }
357
358    if let Some(access_mode) = access_mode {
359        settings.push(access_mode.to_string());
360    }
361
362    if !settings.is_empty() {
363        let sql = format!("SET TRANSACTION {}", settings.join(" "));
364        sqlx::query(sqlx::AssertSqlSafe(sql))
365            .execute(&mut **conn)
366            .await
367            .map_err(sqlx_error_to_exec_err)?;
368    }
369    Ok(())
370}
371
372#[cfg(feature = "stream")]
373impl
374    From<(
375        PoolConnection<sqlx::Postgres>,
376        Statement,
377        Option<crate::metric::Callback>,
378    )> for crate::QueryStream
379{
380    fn from(
381        (conn, stmt, metric_callback): (
382            PoolConnection<sqlx::Postgres>,
383            Statement,
384            Option<crate::metric::Callback>,
385        ),
386    ) -> Self {
387        crate::QueryStream::build(
388            stmt,
389            crate::InnerConnection::Postgres(conn),
390            metric_callback,
391        )
392    }
393}
394
395impl crate::DatabaseTransaction {
396    pub(crate) async fn new_postgres(
397        inner: PoolConnection<sqlx::Postgres>,
398        metric_callback: Option<crate::metric::Callback>,
399        record_stmt_in_spans: bool,
400        isolation_level: Option<IsolationLevel>,
401        access_mode: Option<AccessMode>,
402    ) -> Result<crate::DatabaseTransaction, DbErr> {
403        Self::begin(
404            Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
405            crate::DbBackend::Postgres,
406            metric_callback,
407            record_stmt_in_spans,
408            isolation_level,
409            access_mode,
410            None,
411        )
412        .await
413    }
414}
415
416#[cfg(feature = "proxy")]
417pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
418    // https://docs.rs/sqlx-postgres/0.7.2/src/sqlx_postgres/type_info.rs.html
419    // https://docs.rs/sqlx-postgres/0.7.2/sqlx_postgres/types/index.html
420    use sea_query::Value;
421    use sqlx::{Column, Row, TypeInfo};
422    crate::ProxyRow {
423        values: row
424            .columns()
425            .iter()
426            .map(|c| {
427                (
428                    c.name().to_string(),
429                    match c.type_info().name() {
430                        "BOOL" => {
431                            Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
432                        }
433                        #[cfg(feature = "postgres-array")]
434                        "BOOL[]" => Value::Array(
435                            sea_query::ArrayType::Bool,
436                            row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
437                                .expect("Failed to get boolean array")
438                                .map(|vals| {
439                                    Box::new(
440                                        vals.into_iter()
441                                            .map(|val| Value::Bool(Some(val)))
442                                            .collect(),
443                                    )
444                                }),
445                        ),
446
447                        "\"CHAR\"" => Value::TinyInt(
448                            row.try_get(c.ordinal())
449                                .expect("Failed to get small integer"),
450                        ),
451                        #[cfg(feature = "postgres-array")]
452                        "\"CHAR\"[]" => Value::Array(
453                            sea_query::ArrayType::TinyInt,
454                            row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
455                                .expect("Failed to get small integer array")
456                                .map(|vals: Vec<i8>| {
457                                    Box::new(
458                                        vals.into_iter()
459                                            .map(|val| Value::TinyInt(Some(val)))
460                                            .collect(),
461                                    )
462                                }),
463                        ),
464
465                        "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
466                            row.try_get(c.ordinal())
467                                .expect("Failed to get small integer"),
468                        ),
469                        #[cfg(feature = "postgres-array")]
470                        "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
471                            sea_query::ArrayType::SmallInt,
472                            row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
473                                .expect("Failed to get small integer array")
474                                .map(|vals: Vec<i16>| {
475                                    Box::new(
476                                        vals.into_iter()
477                                            .map(|val| Value::SmallInt(Some(val)))
478                                            .collect(),
479                                    )
480                                }),
481                        ),
482
483                        "INT" | "SERIAL" | "INT4" => {
484                            Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
485                        }
486                        #[cfg(feature = "postgres-array")]
487                        "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
488                            sea_query::ArrayType::Int,
489                            row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
490                                .expect("Failed to get integer array")
491                                .map(|vals: Vec<i32>| {
492                                    Box::new(
493                                        vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
494                                    )
495                                }),
496                        ),
497
498                        "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
499                            row.try_get(c.ordinal()).expect("Failed to get big integer"),
500                        ),
501                        #[cfg(feature = "postgres-array")]
502                        "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
503                            sea_query::ArrayType::BigInt,
504                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
505                                .expect("Failed to get big integer array")
506                                .map(|vals: Vec<i64>| {
507                                    Box::new(
508                                        vals.into_iter()
509                                            .map(|val| Value::BigInt(Some(val)))
510                                            .collect(),
511                                    )
512                                }),
513                        ),
514
515                        "FLOAT4" | "REAL" => {
516                            Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
517                        }
518                        #[cfg(feature = "postgres-array")]
519                        "FLOAT4[]" | "REAL[]" => Value::Array(
520                            sea_query::ArrayType::Float,
521                            row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
522                                .expect("Failed to get float array")
523                                .map(|vals| {
524                                    Box::new(
525                                        vals.into_iter()
526                                            .map(|val| Value::Float(Some(val)))
527                                            .collect(),
528                                    )
529                                }),
530                        ),
531
532                        "FLOAT8" | "DOUBLE PRECISION" => {
533                            Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
534                        }
535                        #[cfg(feature = "postgres-array")]
536                        "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
537                            sea_query::ArrayType::Double,
538                            row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
539                                .expect("Failed to get double array")
540                                .map(|vals| {
541                                    Box::new(
542                                        vals.into_iter()
543                                            .map(|val| Value::Double(Some(val)))
544                                            .collect(),
545                                    )
546                                }),
547                        ),
548
549                        "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
550                            row.try_get::<Option<String>, _>(c.ordinal())
551                                .expect("Failed to get string"),
552                        ),
553                        #[cfg(feature = "postgres-array")]
554                        "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
555                            sea_query::ArrayType::String,
556                            row.try_get::<Option<Vec<String>>, _>(c.ordinal())
557                                .expect("Failed to get string array")
558                                .map(|vals| {
559                                    Box::new(
560                                        vals.into_iter()
561                                            .map(|val| Value::String(Some(val)))
562                                            .collect(),
563                                    )
564                                }),
565                        ),
566
567                        "BYTEA" => Value::Bytes(
568                            row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
569                                .expect("Failed to get bytes"),
570                        ),
571                        #[cfg(feature = "postgres-array")]
572                        "BYTEA[]" => Value::Array(
573                            sea_query::ArrayType::Bytes,
574                            row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
575                                .expect("Failed to get bytes array")
576                                .map(|vals| {
577                                    Box::new(
578                                        vals.into_iter()
579                                            .map(|val| Value::Bytes(Some(val)))
580                                            .collect(),
581                                    )
582                                }),
583                        ),
584
585                        #[cfg(feature = "with-bigdecimal")]
586                        "NUMERIC" => Value::BigDecimal(
587                            row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
588                                .expect("Failed to get numeric"),
589                        ),
590                        #[cfg(all(
591                            feature = "with-rust_decimal",
592                            not(feature = "with-bigdecimal")
593                        ))]
594                        "NUMERIC" => {
595                            Value::Decimal(row.try_get(c.ordinal()).expect("Failed to get numeric"))
596                        }
597
598                        #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
599                        "NUMERIC[]" => Value::Array(
600                            sea_query::ArrayType::BigDecimal,
601                            row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
602                                .expect("Failed to get numeric array")
603                                .map(|vals| {
604                                    Box::new(
605                                        vals.into_iter()
606                                            .map(|val| Value::BigDecimal(Some(val)))
607                                            .collect(),
608                                    )
609                                }),
610                        ),
611                        #[cfg(all(
612                            feature = "with-rust_decimal",
613                            not(feature = "with-bigdecimal"),
614                            feature = "postgres-array"
615                        ))]
616                        "NUMERIC[]" => Value::Array(
617                            sea_query::ArrayType::Decimal,
618                            row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
619                                .expect("Failed to get numeric array")
620                                .map(|vals| {
621                                    Box::new(
622                                        vals.into_iter()
623                                            .map(|val| Value::Decimal(Some(val)))
624                                            .collect(),
625                                    )
626                                }),
627                        ),
628
629                        "OID" => {
630                            Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
631                        }
632                        #[cfg(feature = "postgres-array")]
633                        "OID[]" => Value::Array(
634                            sea_query::ArrayType::BigInt,
635                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
636                                .expect("Failed to get oid array")
637                                .map(|vals| {
638                                    Box::new(
639                                        vals.into_iter()
640                                            .map(|val| Value::BigInt(Some(val)))
641                                            .collect(),
642                                    )
643                                }),
644                        ),
645
646                        #[cfg(feature = "with-json")]
647                        "JSON" | "JSONB" => Value::Json(
648                            row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
649                                .expect("Failed to get json")
650                                .map(Box::new),
651                        ),
652                        #[cfg(all(
653                            feature = "with-json",
654                            any(feature = "json-array", feature = "postgres-array")
655                        ))]
656                        "JSON[]" | "JSONB[]" => Value::Array(
657                            sea_query::ArrayType::Json,
658                            row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
659                                .expect("Failed to get json array")
660                                .map(|vals| {
661                                    Box::new(
662                                        vals.into_iter()
663                                            .map(|val| Value::Json(Some(Box::new(val))))
664                                            .collect(),
665                                    )
666                                }),
667                        ),
668
669                        #[cfg(feature = "with-ipnetwork")]
670                        "INET" | "CIDR" => Value::IpNetwork(
671                            row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
672                                .expect("Failed to get ip address"),
673                        ),
674                        #[cfg(feature = "with-ipnetwork")]
675                        "INET[]" | "CIDR[]" => Value::Array(
676                            sea_query::ArrayType::IpNetwork,
677                            row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
678                                .expect("Failed to get ip address array")
679                                .map(|vals| {
680                                    Box::new(
681                                        vals.into_iter()
682                                            .map(|val| Value::IpNetwork(Some(val)))
683                                            .collect(),
684                                    )
685                                }),
686                        ),
687
688                        #[cfg(feature = "with-mac_address")]
689                        "MACADDR" | "MACADDR8" => Value::MacAddress(
690                            row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
691                                .expect("Failed to get mac address"),
692                        ),
693                        #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
694                        "MACADDR[]" | "MACADDR8[]" => Value::Array(
695                            sea_query::ArrayType::MacAddress,
696                            row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
697                                .expect("Failed to get mac address array")
698                                .map(|vals| {
699                                    Box::new(
700                                        vals.into_iter()
701                                            .map(|val| Value::MacAddress(Some(val)))
702                                            .collect(),
703                                    )
704                                }),
705                        ),
706
707                        #[cfg(feature = "with-chrono")]
708                        "TIMESTAMP" => Value::ChronoDateTime(
709                            row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
710                                .expect("Failed to get timestamp"),
711                        ),
712                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
713                        "TIMESTAMP" => Value::TimeDateTime(
714                            row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
715                                .expect("Failed to get timestamp"),
716                        ),
717
718                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
719                        "TIMESTAMP[]" => Value::Array(
720                            sea_query::ArrayType::ChronoDateTime,
721                            row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
722                                .expect("Failed to get timestamp array")
723                                .map(|vals| {
724                                    Box::new(
725                                        vals.into_iter()
726                                            .map(|val| Value::ChronoDateTime(Some(val)))
727                                            .collect(),
728                                    )
729                                }),
730                        ),
731                        #[cfg(all(
732                            feature = "with-time",
733                            not(feature = "with-chrono"),
734                            feature = "postgres-array"
735                        ))]
736                        "TIMESTAMP[]" => Value::Array(
737                            sea_query::ArrayType::TimeDateTime,
738                            row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
739                                .expect("Failed to get timestamp array")
740                                .map(|vals| {
741                                    Box::new(
742                                        vals.into_iter()
743                                            .map(|val| Value::TimeDateTime(Some(val)))
744                                            .collect(),
745                                    )
746                                }),
747                        ),
748
749                        #[cfg(feature = "with-chrono")]
750                        "DATE" => Value::ChronoDate(
751                            row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
752                                .expect("Failed to get date"),
753                        ),
754                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
755                        "DATE" => Value::TimeDate(
756                            row.try_get::<Option<time::Date>, _>(c.ordinal())
757                                .expect("Failed to get date"),
758                        ),
759
760                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
761                        "DATE[]" => Value::Array(
762                            sea_query::ArrayType::ChronoDate,
763                            row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
764                                .expect("Failed to get date array")
765                                .map(|vals| {
766                                    Box::new(
767                                        vals.into_iter()
768                                            .map(|val| Value::ChronoDate(Some(val)))
769                                            .collect(),
770                                    )
771                                }),
772                        ),
773                        #[cfg(all(
774                            feature = "with-time",
775                            not(feature = "with-chrono"),
776                            feature = "postgres-array"
777                        ))]
778                        "DATE[]" => Value::Array(
779                            sea_query::ArrayType::TimeDate,
780                            row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
781                                .expect("Failed to get date array")
782                                .map(|vals| {
783                                    Box::new(
784                                        vals.into_iter()
785                                            .map(|val| Value::TimeDate(Some(val)))
786                                            .collect(),
787                                    )
788                                }),
789                        ),
790
791                        #[cfg(feature = "with-chrono")]
792                        "TIME" => Value::ChronoTime(
793                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
794                                .expect("Failed to get time"),
795                        ),
796                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
797                        "TIME" => Value::TimeTime(
798                            row.try_get::<Option<time::Time>, _>(c.ordinal())
799                                .expect("Failed to get time"),
800                        ),
801
802                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
803                        "TIME[]" => Value::Array(
804                            sea_query::ArrayType::ChronoTime,
805                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
806                                .expect("Failed to get time array")
807                                .map(|vals| {
808                                    Box::new(
809                                        vals.into_iter()
810                                            .map(|val| Value::ChronoTime(Some(val)))
811                                            .collect(),
812                                    )
813                                }),
814                        ),
815                        #[cfg(all(
816                            feature = "with-time",
817                            not(feature = "with-chrono"),
818                            feature = "postgres-array"
819                        ))]
820                        "TIME[]" => Value::Array(
821                            sea_query::ArrayType::TimeTime,
822                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
823                                .expect("Failed to get time array")
824                                .map(|vals| {
825                                    Box::new(
826                                        vals.into_iter()
827                                            .map(|val| Value::TimeTime(Some(val)))
828                                            .collect(),
829                                    )
830                                }),
831                        ),
832
833                        #[cfg(feature = "with-chrono")]
834                        "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
835                            row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
836                                .expect("Failed to get timestamptz"),
837                        ),
838                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
839                        "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(
840                            row.try_get::<Option<time::OffsetDateTime>, _>(c.ordinal())
841                                .expect("Failed to get timestamptz"),
842                        ),
843
844                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
845                        "TIMESTAMPTZ[]" => Value::Array(
846                            sea_query::ArrayType::ChronoDateTimeUtc,
847                            row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
848                                c.ordinal(),
849                            )
850                            .expect("Failed to get timestamptz array")
851                            .map(|vals| {
852                                Box::new(
853                                    vals.into_iter()
854                                        .map(|val| Value::ChronoDateTimeUtc(Some(val)))
855                                        .collect(),
856                                )
857                            }),
858                        ),
859                        #[cfg(all(
860                            feature = "with-time",
861                            not(feature = "with-chrono"),
862                            feature = "postgres-array"
863                        ))]
864                        "TIMESTAMPTZ[]" => Value::Array(
865                            sea_query::ArrayType::TimeDateTimeWithTimeZone,
866                            row.try_get::<Option<Vec<time::OffsetDateTime>>, _>(c.ordinal())
867                                .expect("Failed to get timestamptz array")
868                                .map(|vals| {
869                                    Box::new(
870                                        vals.into_iter()
871                                            .map(|val| Value::TimeDateTimeWithTimeZone(Some(val)))
872                                            .collect(),
873                                    )
874                                }),
875                        ),
876
877                        #[cfg(feature = "with-chrono")]
878                        "TIMETZ" => Value::ChronoTime(
879                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
880                                .expect("Failed to get timetz"),
881                        ),
882                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
883                        "TIMETZ" => {
884                            Value::TimeTime(row.try_get(c.ordinal()).expect("Failed to get timetz"))
885                        }
886
887                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
888                        "TIMETZ[]" => Value::Array(
889                            sea_query::ArrayType::ChronoTime,
890                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
891                                .expect("Failed to get timetz array")
892                                .map(|vals| {
893                                    Box::new(
894                                        vals.into_iter()
895                                            .map(|val| Value::ChronoTime(Some(val)))
896                                            .collect(),
897                                    )
898                                }),
899                        ),
900                        #[cfg(all(
901                            feature = "with-time",
902                            not(feature = "with-chrono"),
903                            feature = "postgres-array"
904                        ))]
905                        "TIMETZ[]" => Value::Array(
906                            sea_query::ArrayType::TimeTime,
907                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
908                                .expect("Failed to get timetz array")
909                                .map(|vals| {
910                                    Box::new(
911                                        vals.into_iter()
912                                            .map(|val| Value::TimeTime(Some(val)))
913                                            .collect(),
914                                    )
915                                }),
916                        ),
917
918                        #[cfg(feature = "with-uuid")]
919                        "UUID" => Value::Uuid(
920                            row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
921                                .expect("Failed to get uuid"),
922                        ),
923
924                        #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
925                        "UUID[]" => Value::Array(
926                            sea_query::ArrayType::Uuid,
927                            row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
928                                .expect("Failed to get uuid array")
929                                .map(|vals| {
930                                    Box::new(
931                                        vals.into_iter()
932                                            .map(|val| Value::Uuid(Some(val)))
933                                            .collect(),
934                                    )
935                                }),
936                        ),
937
938                        _ => unreachable!("Unknown column type: {}", c.type_info().name()),
939                    },
940                )
941            })
942            .collect(),
943    }
944}