Skip to main content

sea_orm/driver/
sqlx_postgres.rs

1use futures_util::lock::Mutex;
2use log::LevelFilter;
3use sea_query::Values;
4use std::{fmt::Write, future::Future, pin::Pin, sync::Arc};
5
6use sqlx::{
7    Connection, Executor, PgPool, Postgres,
8    pool::PoolConnection,
9    postgres::{PgConnectOptions, PgQueryResult, PgRow},
10};
11
12use sea_query_sqlx::SqlxValues;
13use tracing::instrument;
14
15use crate::{
16    AccessMode, ConnectOptions, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction,
17    IsolationLevel, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*,
18};
19
20use super::sqlx_common::*;
21
22/// Defines the [sqlx::postgres] connector
23#[derive(Debug)]
24pub struct SqlxPostgresConnector;
25
26/// Defines a sqlx PostgreSQL pool
27#[derive(Clone)]
28pub struct SqlxPostgresPoolConnection {
29    pub(crate) pool: PgPool,
30    metric_callback: Option<crate::metric::Callback>,
31    pub(crate) record_stmt_in_spans: bool,
32}
33
34impl std::fmt::Debug for SqlxPostgresPoolConnection {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
37    }
38}
39
40impl From<PgPool> for SqlxPostgresPoolConnection {
41    fn from(pool: PgPool) -> Self {
42        SqlxPostgresPoolConnection {
43            pool,
44            metric_callback: None,
45            record_stmt_in_spans: true,
46        }
47    }
48}
49
50impl From<PgPool> for DatabaseConnection {
51    fn from(pool: PgPool) -> Self {
52        DatabaseConnectionType::SqlxPostgresPoolConnection(pool.into()).into()
53    }
54}
55
56impl SqlxPostgresConnector {
57    /// Check if the URI provided corresponds to `postgres://` for a PostgreSQL database
58    pub fn accepts(string: &str) -> bool {
59        string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
60    }
61
62    /// Add configuration options for the PostgreSQL database
63    #[instrument(level = "trace")]
64    pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
65        let record_stmt_in_spans = options.get_record_stmt_in_spans();
66        let mut sqlx_opts = options
67            .url
68            .parse::<PgConnectOptions>()
69            .map_err(sqlx_error_to_conn_err)?;
70        use sqlx::ConnectOptions;
71        if !options.sqlx_logging {
72            sqlx_opts = sqlx_opts.disable_statement_logging();
73        } else {
74            sqlx_opts = sqlx_opts.log_statements(options.sqlx_logging_level);
75            if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
76                sqlx_opts = sqlx_opts.log_slow_statements(
77                    options.sqlx_slow_statements_logging_level,
78                    options.sqlx_slow_statements_logging_threshold,
79                );
80            }
81        }
82
83        if let Some(application_name) = &options.application_name {
84            sqlx_opts = sqlx_opts.application_name(application_name);
85        }
86
87        if let Some(timeout) = options.statement_timeout {
88            sqlx_opts = sqlx_opts.options([("statement_timeout", timeout.as_millis().to_string())]);
89        }
90
91        if let Some(f) = &options.pg_opts_fn {
92            sqlx_opts = f(sqlx_opts);
93        }
94
95        let set_search_path_sql = options.schema_search_path.as_ref().map(|schema| {
96            let mut string = "SET search_path = ".to_owned();
97            if schema.starts_with('"') {
98                write!(&mut string, "{schema}").expect("Infallible");
99            } else {
100                for (i, schema) in schema.split(',').enumerate() {
101                    if i > 0 {
102                        write!(&mut string, ",").expect("Infallible");
103                    }
104                    if schema.starts_with('"') {
105                        write!(&mut string, "{schema}").expect("Infallible");
106                    } else {
107                        write!(&mut string, "\"{schema}\"").expect("Infallible");
108                    }
109                }
110            }
111            string
112        });
113
114        let lazy = options.connect_lazy;
115        let after_connect = options.after_connect.clone();
116        let pg_pool_opts_fn = options.pg_pool_opts_fn.clone();
117        let mut pool_options = options.sqlx_pool_options();
118
119        if let Some(sql) = set_search_path_sql {
120            pool_options = pool_options.after_connect(move |conn, _| {
121                let sql = sql.clone();
122                Box::pin(async move {
123                    sqlx::Executor::execute(conn, sqlx::AssertSqlSafe(sql))
124                        .await
125                        .map(|_| ())
126                })
127            });
128        }
129        if let Some(f) = &pg_pool_opts_fn {
130            pool_options = f(pool_options);
131        }
132        let pool = if lazy {
133            pool_options.connect_lazy_with(sqlx_opts)
134        } else {
135            pool_options
136                .connect_with(sqlx_opts)
137                .await
138                .map_err(sqlx_error_to_conn_err)?
139        };
140
141        let conn: DatabaseConnection =
142            DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
143                pool,
144                metric_callback: None,
145                record_stmt_in_spans,
146            })
147            .into();
148
149        if let Some(cb) = after_connect {
150            cb(conn.clone()).await?;
151        }
152
153        Ok(conn)
154    }
155}
156
157impl SqlxPostgresConnector {
158    /// Instantiate a sqlx pool connection to a [DatabaseConnection]
159    pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
160        DatabaseConnectionType::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
161            pool,
162            metric_callback: None,
163            record_stmt_in_spans: true,
164        })
165        .into()
166    }
167}
168
169impl SqlxPostgresPoolConnection {
170    /// Execute a [Statement] on a PostgreSQL backend
171    #[instrument(level = "trace")]
172    pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
173        debug_print!("{}", stmt);
174
175        let query = sqlx_query(&stmt);
176        let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
177        crate::metric::metric!(self.metric_callback, &stmt, {
178            match query.execute(&mut *conn).await {
179                Ok(res) => Ok(res.into()),
180                Err(err) => Err(sqlx_error_to_exec_err(err)),
181            }
182        })
183    }
184
185    /// Execute an unprepared SQL statement on a PostgreSQL backend
186    #[instrument(level = "trace")]
187    pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
188        debug_print!("{}", sql);
189
190        let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
191        match conn.execute(sqlx::AssertSqlSafe(sql.to_owned())).await {
192            Ok(res) => Ok(res.into()),
193            Err(err) => Err(sqlx_error_to_exec_err(err)),
194        }
195    }
196
197    /// Get one result from a SQL query. Returns [Option::None] if no match was found
198    #[instrument(level = "trace")]
199    pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
200        debug_print!("{}", stmt);
201
202        let query = sqlx_query(&stmt);
203        let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
204        crate::metric::metric!(self.metric_callback, &stmt, {
205            match query.fetch_one(&mut *conn).await {
206                Ok(row) => Ok(Some(row.into())),
207                Err(err) => match err {
208                    sqlx::Error::RowNotFound => Ok(None),
209                    _ => Err(sqlx_error_to_query_err(err)),
210                },
211            }
212        })
213    }
214
215    /// Get the results of a query returning them as a Vec<[QueryResult]>
216    #[instrument(level = "trace")]
217    pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
218        debug_print!("{}", stmt);
219
220        let query = sqlx_query(&stmt);
221        let mut conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
222        crate::metric::metric!(self.metric_callback, &stmt, {
223            match query.fetch_all(&mut *conn).await {
224                Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
225                Err(err) => Err(sqlx_error_to_query_err(err)),
226            }
227        })
228    }
229
230    /// Stream the results of executing a SQL query
231    #[instrument(level = "trace")]
232    pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
233        debug_print!("{}", stmt);
234
235        let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
236        Ok(QueryStream::from((
237            conn,
238            stmt,
239            self.metric_callback.clone(),
240        )))
241    }
242
243    /// Bundle a set of SQL statements that execute together.
244    #[instrument(level = "trace")]
245    pub async fn begin(
246        &self,
247        isolation_level: Option<IsolationLevel>,
248        access_mode: Option<AccessMode>,
249    ) -> Result<DatabaseTransaction, DbErr> {
250        let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
251        DatabaseTransaction::new_postgres(
252            conn,
253            self.metric_callback.clone(),
254            self.record_stmt_in_spans,
255            isolation_level,
256            access_mode,
257        )
258        .await
259    }
260
261    /// Create a PostgreSQL transaction
262    #[instrument(level = "trace", skip(callback))]
263    pub async fn transaction<F, T, E>(
264        &self,
265        callback: F,
266        isolation_level: Option<IsolationLevel>,
267        access_mode: Option<AccessMode>,
268    ) -> Result<T, TransactionError<E>>
269    where
270        F: for<'b> FnOnce(
271                &'b DatabaseTransaction,
272            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
273            + Send,
274        T: Send,
275        E: std::fmt::Display + std::fmt::Debug + Send,
276    {
277        let conn = self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
278        let transaction = DatabaseTransaction::new_postgres(
279            conn,
280            self.metric_callback.clone(),
281            self.record_stmt_in_spans,
282            isolation_level,
283            access_mode,
284        )
285        .await
286        .map_err(|e| TransactionError::Connection(e))?;
287        transaction.run(callback).await
288    }
289
290    pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
291    where
292        F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
293    {
294        self.metric_callback = Some(Arc::new(callback));
295    }
296
297    /// Checks if a connection to the database is still valid.
298    pub async fn ping(&self) -> Result<(), DbErr> {
299        let conn = &mut self.pool.acquire().await.map_err(sqlx_conn_acquire_err)?;
300        match conn.ping().await {
301            Ok(_) => Ok(()),
302            Err(err) => Err(sqlx_error_to_conn_err(err)),
303        }
304    }
305
306    /// Explicitly close the Postgres connection.
307    /// See [`Self::close_by_ref`] for usage with references.
308    pub async fn close(self) -> Result<(), DbErr> {
309        self.close_by_ref().await
310    }
311
312    /// Explicitly close the Postgres connection
313    pub async fn close_by_ref(&self) -> Result<(), DbErr> {
314        self.pool.close().await;
315        Ok(())
316    }
317}
318
319impl From<PgRow> for QueryResult {
320    fn from(row: PgRow) -> QueryResult {
321        QueryResult {
322            row: QueryResultRow::SqlxPostgres(row),
323        }
324    }
325}
326
327impl From<PgQueryResult> for ExecResult {
328    fn from(result: PgQueryResult) -> ExecResult {
329        ExecResult {
330            result: ExecResultHolder::SqlxPostgres(result),
331        }
332    }
333}
334
335pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, SqlxValues> {
336    let values = stmt
337        .values
338        .as_ref()
339        .map_or(Values(Vec::new()), |values| values.clone());
340    sqlx::query_with(sqlx::AssertSqlSafe(stmt.sql.as_str()), SqlxValues(values))
341}
342
343pub(crate) async fn set_transaction_config(
344    conn: &mut PoolConnection<Postgres>,
345    isolation_level: Option<IsolationLevel>,
346    access_mode: Option<AccessMode>,
347) -> Result<(), DbErr> {
348    let mut settings = Vec::new();
349
350    if let Some(isolation_level) = isolation_level {
351        settings.push(format!("ISOLATION LEVEL {isolation_level}"));
352    }
353
354    if let Some(access_mode) = access_mode {
355        settings.push(access_mode.to_string());
356    }
357
358    if !settings.is_empty() {
359        let sql = format!("SET TRANSACTION {}", settings.join(" "));
360        sqlx::query(sqlx::AssertSqlSafe(sql))
361            .execute(&mut **conn)
362            .await
363            .map_err(sqlx_error_to_exec_err)?;
364    }
365    Ok(())
366}
367
368impl
369    From<(
370        PoolConnection<sqlx::Postgres>,
371        Statement,
372        Option<crate::metric::Callback>,
373    )> for crate::QueryStream
374{
375    fn from(
376        (conn, stmt, metric_callback): (
377            PoolConnection<sqlx::Postgres>,
378            Statement,
379            Option<crate::metric::Callback>,
380        ),
381    ) -> Self {
382        crate::QueryStream::build(
383            stmt,
384            crate::InnerConnection::Postgres(conn),
385            metric_callback,
386        )
387    }
388}
389
390impl crate::DatabaseTransaction {
391    pub(crate) async fn new_postgres(
392        inner: PoolConnection<sqlx::Postgres>,
393        metric_callback: Option<crate::metric::Callback>,
394        record_stmt_in_spans: bool,
395        isolation_level: Option<IsolationLevel>,
396        access_mode: Option<AccessMode>,
397    ) -> Result<crate::DatabaseTransaction, DbErr> {
398        Self::begin(
399            Arc::new(Mutex::new(crate::InnerConnection::Postgres(inner))),
400            crate::DbBackend::Postgres,
401            metric_callback,
402            record_stmt_in_spans,
403            isolation_level,
404            access_mode,
405            None,
406        )
407        .await
408    }
409}
410
411#[cfg(feature = "proxy")]
412pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> crate::ProxyRow {
413    // https://docs.rs/sqlx-postgres/0.7.2/src/sqlx_postgres/type_info.rs.html
414    // https://docs.rs/sqlx-postgres/0.7.2/sqlx_postgres/types/index.html
415    use sea_query::Value;
416    use sqlx::{Column, Row, TypeInfo};
417    crate::ProxyRow {
418        values: row
419            .columns()
420            .iter()
421            .map(|c| {
422                (
423                    c.name().to_string(),
424                    match c.type_info().name() {
425                        "BOOL" => {
426                            Value::Bool(row.try_get(c.ordinal()).expect("Failed to get boolean"))
427                        }
428                        #[cfg(feature = "postgres-array")]
429                        "BOOL[]" => Value::Array(
430                            sea_query::ArrayType::Bool,
431                            row.try_get::<Option<Vec<bool>>, _>(c.ordinal())
432                                .expect("Failed to get boolean array")
433                                .map(|vals| {
434                                    Box::new(
435                                        vals.into_iter()
436                                            .map(|val| Value::Bool(Some(val)))
437                                            .collect(),
438                                    )
439                                }),
440                        ),
441
442                        "\"CHAR\"" => Value::TinyInt(
443                            row.try_get(c.ordinal())
444                                .expect("Failed to get small integer"),
445                        ),
446                        #[cfg(feature = "postgres-array")]
447                        "\"CHAR\"[]" => Value::Array(
448                            sea_query::ArrayType::TinyInt,
449                            row.try_get::<Option<Vec<i8>>, _>(c.ordinal())
450                                .expect("Failed to get small integer array")
451                                .map(|vals: Vec<i8>| {
452                                    Box::new(
453                                        vals.into_iter()
454                                            .map(|val| Value::TinyInt(Some(val)))
455                                            .collect(),
456                                    )
457                                }),
458                        ),
459
460                        "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(
461                            row.try_get(c.ordinal())
462                                .expect("Failed to get small integer"),
463                        ),
464                        #[cfg(feature = "postgres-array")]
465                        "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
466                            sea_query::ArrayType::SmallInt,
467                            row.try_get::<Option<Vec<i16>>, _>(c.ordinal())
468                                .expect("Failed to get small integer array")
469                                .map(|vals: Vec<i16>| {
470                                    Box::new(
471                                        vals.into_iter()
472                                            .map(|val| Value::SmallInt(Some(val)))
473                                            .collect(),
474                                    )
475                                }),
476                        ),
477
478                        "INT" | "SERIAL" | "INT4" => {
479                            Value::Int(row.try_get(c.ordinal()).expect("Failed to get integer"))
480                        }
481                        #[cfg(feature = "postgres-array")]
482                        "INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
483                            sea_query::ArrayType::Int,
484                            row.try_get::<Option<Vec<i32>>, _>(c.ordinal())
485                                .expect("Failed to get integer array")
486                                .map(|vals: Vec<i32>| {
487                                    Box::new(
488                                        vals.into_iter().map(|val| Value::Int(Some(val))).collect(),
489                                    )
490                                }),
491                        ),
492
493                        "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(
494                            row.try_get(c.ordinal()).expect("Failed to get big integer"),
495                        ),
496                        #[cfg(feature = "postgres-array")]
497                        "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
498                            sea_query::ArrayType::BigInt,
499                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
500                                .expect("Failed to get big integer array")
501                                .map(|vals: Vec<i64>| {
502                                    Box::new(
503                                        vals.into_iter()
504                                            .map(|val| Value::BigInt(Some(val)))
505                                            .collect(),
506                                    )
507                                }),
508                        ),
509
510                        "FLOAT4" | "REAL" => {
511                            Value::Float(row.try_get(c.ordinal()).expect("Failed to get float"))
512                        }
513                        #[cfg(feature = "postgres-array")]
514                        "FLOAT4[]" | "REAL[]" => Value::Array(
515                            sea_query::ArrayType::Float,
516                            row.try_get::<Option<Vec<f32>>, _>(c.ordinal())
517                                .expect("Failed to get float array")
518                                .map(|vals| {
519                                    Box::new(
520                                        vals.into_iter()
521                                            .map(|val| Value::Float(Some(val)))
522                                            .collect(),
523                                    )
524                                }),
525                        ),
526
527                        "FLOAT8" | "DOUBLE PRECISION" => {
528                            Value::Double(row.try_get(c.ordinal()).expect("Failed to get double"))
529                        }
530                        #[cfg(feature = "postgres-array")]
531                        "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
532                            sea_query::ArrayType::Double,
533                            row.try_get::<Option<Vec<f64>>, _>(c.ordinal())
534                                .expect("Failed to get double array")
535                                .map(|vals| {
536                                    Box::new(
537                                        vals.into_iter()
538                                            .map(|val| Value::Double(Some(val)))
539                                            .collect(),
540                                    )
541                                }),
542                        ),
543
544                        "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(
545                            row.try_get::<Option<String>, _>(c.ordinal())
546                                .expect("Failed to get string"),
547                        ),
548                        #[cfg(feature = "postgres-array")]
549                        "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
550                            sea_query::ArrayType::String,
551                            row.try_get::<Option<Vec<String>>, _>(c.ordinal())
552                                .expect("Failed to get string array")
553                                .map(|vals| {
554                                    Box::new(
555                                        vals.into_iter()
556                                            .map(|val| Value::String(Some(val)))
557                                            .collect(),
558                                    )
559                                }),
560                        ),
561
562                        "BYTEA" => Value::Bytes(
563                            row.try_get::<Option<Vec<u8>>, _>(c.ordinal())
564                                .expect("Failed to get bytes"),
565                        ),
566                        #[cfg(feature = "postgres-array")]
567                        "BYTEA[]" => Value::Array(
568                            sea_query::ArrayType::Bytes,
569                            row.try_get::<Option<Vec<Vec<u8>>>, _>(c.ordinal())
570                                .expect("Failed to get bytes array")
571                                .map(|vals| {
572                                    Box::new(
573                                        vals.into_iter()
574                                            .map(|val| Value::Bytes(Some(val)))
575                                            .collect(),
576                                    )
577                                }),
578                        ),
579
580                        #[cfg(feature = "with-bigdecimal")]
581                        "NUMERIC" => Value::BigDecimal(
582                            row.try_get::<Option<bigdecimal::BigDecimal>, _>(c.ordinal())
583                                .expect("Failed to get numeric"),
584                        ),
585                        #[cfg(all(
586                            feature = "with-rust_decimal",
587                            not(feature = "with-bigdecimal")
588                        ))]
589                        "NUMERIC" => {
590                            Value::Decimal(row.try_get(c.ordinal()).expect("Failed to get numeric"))
591                        }
592
593                        #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
594                        "NUMERIC[]" => Value::Array(
595                            sea_query::ArrayType::BigDecimal,
596                            row.try_get::<Option<Vec<bigdecimal::BigDecimal>>, _>(c.ordinal())
597                                .expect("Failed to get numeric array")
598                                .map(|vals| {
599                                    Box::new(
600                                        vals.into_iter()
601                                            .map(|val| Value::BigDecimal(Some(val)))
602                                            .collect(),
603                                    )
604                                }),
605                        ),
606                        #[cfg(all(
607                            feature = "with-rust_decimal",
608                            not(feature = "with-bigdecimal"),
609                            feature = "postgres-array"
610                        ))]
611                        "NUMERIC[]" => Value::Array(
612                            sea_query::ArrayType::Decimal,
613                            row.try_get::<Option<Vec<rust_decimal::Decimal>>, _>(c.ordinal())
614                                .expect("Failed to get numeric array")
615                                .map(|vals| {
616                                    Box::new(
617                                        vals.into_iter()
618                                            .map(|val| Value::Decimal(Some(val)))
619                                            .collect(),
620                                    )
621                                }),
622                        ),
623
624                        "OID" => {
625                            Value::BigInt(row.try_get(c.ordinal()).expect("Failed to get oid"))
626                        }
627                        #[cfg(feature = "postgres-array")]
628                        "OID[]" => Value::Array(
629                            sea_query::ArrayType::BigInt,
630                            row.try_get::<Option<Vec<i64>>, _>(c.ordinal())
631                                .expect("Failed to get oid array")
632                                .map(|vals| {
633                                    Box::new(
634                                        vals.into_iter()
635                                            .map(|val| Value::BigInt(Some(val)))
636                                            .collect(),
637                                    )
638                                }),
639                        ),
640
641                        #[cfg(feature = "with-json")]
642                        "JSON" | "JSONB" => Value::Json(
643                            row.try_get::<Option<serde_json::Value>, _>(c.ordinal())
644                                .expect("Failed to get json")
645                                .map(Box::new),
646                        ),
647                        #[cfg(all(
648                            feature = "with-json",
649                            any(feature = "json-array", feature = "postgres-array")
650                        ))]
651                        "JSON[]" | "JSONB[]" => Value::Array(
652                            sea_query::ArrayType::Json,
653                            row.try_get::<Option<Vec<serde_json::Value>>, _>(c.ordinal())
654                                .expect("Failed to get json array")
655                                .map(|vals| {
656                                    Box::new(
657                                        vals.into_iter()
658                                            .map(|val| Value::Json(Some(Box::new(val))))
659                                            .collect(),
660                                    )
661                                }),
662                        ),
663
664                        #[cfg(feature = "with-ipnetwork")]
665                        "INET" | "CIDR" => Value::IpNetwork(
666                            row.try_get::<Option<ipnetwork::IpNetwork>, _>(c.ordinal())
667                                .expect("Failed to get ip address"),
668                        ),
669                        #[cfg(feature = "with-ipnetwork")]
670                        "INET[]" | "CIDR[]" => Value::Array(
671                            sea_query::ArrayType::IpNetwork,
672                            row.try_get::<Option<Vec<ipnetwork::IpNetwork>>, _>(c.ordinal())
673                                .expect("Failed to get ip address array")
674                                .map(|vals| {
675                                    Box::new(
676                                        vals.into_iter()
677                                            .map(|val| Value::IpNetwork(Some(val)))
678                                            .collect(),
679                                    )
680                                }),
681                        ),
682
683                        #[cfg(feature = "with-mac_address")]
684                        "MACADDR" | "MACADDR8" => Value::MacAddress(
685                            row.try_get::<Option<mac_address::MacAddress>, _>(c.ordinal())
686                                .expect("Failed to get mac address"),
687                        ),
688                        #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
689                        "MACADDR[]" | "MACADDR8[]" => Value::Array(
690                            sea_query::ArrayType::MacAddress,
691                            row.try_get::<Option<Vec<mac_address::MacAddress>>, _>(c.ordinal())
692                                .expect("Failed to get mac address array")
693                                .map(|vals| {
694                                    Box::new(
695                                        vals.into_iter()
696                                            .map(|val| Value::MacAddress(Some(val)))
697                                            .collect(),
698                                    )
699                                }),
700                        ),
701
702                        #[cfg(feature = "with-chrono")]
703                        "TIMESTAMP" => Value::ChronoDateTime(
704                            row.try_get::<Option<chrono::NaiveDateTime>, _>(c.ordinal())
705                                .expect("Failed to get timestamp"),
706                        ),
707                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
708                        "TIMESTAMP" => Value::TimeDateTime(
709                            row.try_get::<Option<time::PrimitiveDateTime>, _>(c.ordinal())
710                                .expect("Failed to get timestamp"),
711                        ),
712
713                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
714                        "TIMESTAMP[]" => Value::Array(
715                            sea_query::ArrayType::ChronoDateTime,
716                            row.try_get::<Option<Vec<chrono::NaiveDateTime>>, _>(c.ordinal())
717                                .expect("Failed to get timestamp array")
718                                .map(|vals| {
719                                    Box::new(
720                                        vals.into_iter()
721                                            .map(|val| Value::ChronoDateTime(Some(val)))
722                                            .collect(),
723                                    )
724                                }),
725                        ),
726                        #[cfg(all(
727                            feature = "with-time",
728                            not(feature = "with-chrono"),
729                            feature = "postgres-array"
730                        ))]
731                        "TIMESTAMP[]" => Value::Array(
732                            sea_query::ArrayType::TimeDateTime,
733                            row.try_get::<Option<Vec<time::PrimitiveDateTime>>, _>(c.ordinal())
734                                .expect("Failed to get timestamp array")
735                                .map(|vals| {
736                                    Box::new(
737                                        vals.into_iter()
738                                            .map(|val| Value::TimeDateTime(Some(val)))
739                                            .collect(),
740                                    )
741                                }),
742                        ),
743
744                        #[cfg(feature = "with-chrono")]
745                        "DATE" => Value::ChronoDate(
746                            row.try_get::<Option<chrono::NaiveDate>, _>(c.ordinal())
747                                .expect("Failed to get date"),
748                        ),
749                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
750                        "DATE" => Value::TimeDate(
751                            row.try_get::<Option<time::Date>, _>(c.ordinal())
752                                .expect("Failed to get date"),
753                        ),
754
755                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
756                        "DATE[]" => Value::Array(
757                            sea_query::ArrayType::ChronoDate,
758                            row.try_get::<Option<Vec<chrono::NaiveDate>>, _>(c.ordinal())
759                                .expect("Failed to get date array")
760                                .map(|vals| {
761                                    Box::new(
762                                        vals.into_iter()
763                                            .map(|val| Value::ChronoDate(Some(val)))
764                                            .collect(),
765                                    )
766                                }),
767                        ),
768                        #[cfg(all(
769                            feature = "with-time",
770                            not(feature = "with-chrono"),
771                            feature = "postgres-array"
772                        ))]
773                        "DATE[]" => Value::Array(
774                            sea_query::ArrayType::TimeDate,
775                            row.try_get::<Option<Vec<time::Date>>, _>(c.ordinal())
776                                .expect("Failed to get date array")
777                                .map(|vals| {
778                                    Box::new(
779                                        vals.into_iter()
780                                            .map(|val| Value::TimeDate(Some(val)))
781                                            .collect(),
782                                    )
783                                }),
784                        ),
785
786                        #[cfg(feature = "with-chrono")]
787                        "TIME" => Value::ChronoTime(
788                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
789                                .expect("Failed to get time"),
790                        ),
791                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
792                        "TIME" => Value::TimeTime(
793                            row.try_get::<Option<time::Time>, _>(c.ordinal())
794                                .expect("Failed to get time"),
795                        ),
796
797                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
798                        "TIME[]" => Value::Array(
799                            sea_query::ArrayType::ChronoTime,
800                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
801                                .expect("Failed to get time array")
802                                .map(|vals| {
803                                    Box::new(
804                                        vals.into_iter()
805                                            .map(|val| Value::ChronoTime(Some(val)))
806                                            .collect(),
807                                    )
808                                }),
809                        ),
810                        #[cfg(all(
811                            feature = "with-time",
812                            not(feature = "with-chrono"),
813                            feature = "postgres-array"
814                        ))]
815                        "TIME[]" => Value::Array(
816                            sea_query::ArrayType::TimeTime,
817                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
818                                .expect("Failed to get time array")
819                                .map(|vals| {
820                                    Box::new(
821                                        vals.into_iter()
822                                            .map(|val| Value::TimeTime(Some(val)))
823                                            .collect(),
824                                    )
825                                }),
826                        ),
827
828                        #[cfg(feature = "with-chrono")]
829                        "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(
830                            row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
831                                .expect("Failed to get timestamptz"),
832                        ),
833                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
834                        "TIMESTAMPTZ" => Value::TimeDateTimeWithTimeZone(
835                            row.try_get::<Option<time::OffsetDateTime>, _>(c.ordinal())
836                                .expect("Failed to get timestamptz"),
837                        ),
838
839                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
840                        "TIMESTAMPTZ[]" => Value::Array(
841                            sea_query::ArrayType::ChronoDateTimeUtc,
842                            row.try_get::<Option<Vec<chrono::DateTime<chrono::Utc>>>, _>(
843                                c.ordinal(),
844                            )
845                            .expect("Failed to get timestamptz array")
846                            .map(|vals| {
847                                Box::new(
848                                    vals.into_iter()
849                                        .map(|val| Value::ChronoDateTimeUtc(Some(val)))
850                                        .collect(),
851                                )
852                            }),
853                        ),
854                        #[cfg(all(
855                            feature = "with-time",
856                            not(feature = "with-chrono"),
857                            feature = "postgres-array"
858                        ))]
859                        "TIMESTAMPTZ[]" => Value::Array(
860                            sea_query::ArrayType::TimeDateTimeWithTimeZone,
861                            row.try_get::<Option<Vec<time::OffsetDateTime>>, _>(c.ordinal())
862                                .expect("Failed to get timestamptz array")
863                                .map(|vals| {
864                                    Box::new(
865                                        vals.into_iter()
866                                            .map(|val| Value::TimeDateTimeWithTimeZone(Some(val)))
867                                            .collect(),
868                                    )
869                                }),
870                        ),
871
872                        #[cfg(feature = "with-chrono")]
873                        "TIMETZ" => Value::ChronoTime(
874                            row.try_get::<Option<chrono::NaiveTime>, _>(c.ordinal())
875                                .expect("Failed to get timetz"),
876                        ),
877                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
878                        "TIMETZ" => {
879                            Value::TimeTime(row.try_get(c.ordinal()).expect("Failed to get timetz"))
880                        }
881
882                        #[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
883                        "TIMETZ[]" => Value::Array(
884                            sea_query::ArrayType::ChronoTime,
885                            row.try_get::<Option<Vec<chrono::NaiveTime>>, _>(c.ordinal())
886                                .expect("Failed to get timetz array")
887                                .map(|vals| {
888                                    Box::new(
889                                        vals.into_iter()
890                                            .map(|val| Value::ChronoTime(Some(val)))
891                                            .collect(),
892                                    )
893                                }),
894                        ),
895                        #[cfg(all(
896                            feature = "with-time",
897                            not(feature = "with-chrono"),
898                            feature = "postgres-array"
899                        ))]
900                        "TIMETZ[]" => Value::Array(
901                            sea_query::ArrayType::TimeTime,
902                            row.try_get::<Option<Vec<time::Time>>, _>(c.ordinal())
903                                .expect("Failed to get timetz array")
904                                .map(|vals| {
905                                    Box::new(
906                                        vals.into_iter()
907                                            .map(|val| Value::TimeTime(Some(val)))
908                                            .collect(),
909                                    )
910                                }),
911                        ),
912
913                        #[cfg(feature = "with-uuid")]
914                        "UUID" => Value::Uuid(
915                            row.try_get::<Option<uuid::Uuid>, _>(c.ordinal())
916                                .expect("Failed to get uuid"),
917                        ),
918
919                        #[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
920                        "UUID[]" => Value::Array(
921                            sea_query::ArrayType::Uuid,
922                            row.try_get::<Option<Vec<uuid::Uuid>>, _>(c.ordinal())
923                                .expect("Failed to get uuid array")
924                                .map(|vals| {
925                                    Box::new(
926                                        vals.into_iter()
927                                            .map(|val| Value::Uuid(Some(val)))
928                                            .collect(),
929                                    )
930                                }),
931                        ),
932
933                        _ => unreachable!("Unknown column type: {}", c.type_info().name()),
934                    },
935                )
936            })
937            .collect(),
938    }
939}