prax_sqlx/
postgres.rs

1//! PostgreSQL-specific functionality for SQLx.
2
3use crate::error::SqlxResult;
4use sqlx::Row;
5use sqlx::postgres::{PgPool, PgRow};
6
/// PostgreSQL-specific query helpers.
///
/// A stateless marker type: every helper is an associated function, so no
/// instance is ever required. The derives make the type usable in debug
/// output and generic contexts at zero cost.
#[derive(Debug, Clone, Copy, Default)]
pub struct PgHelpers;
9
10impl PgHelpers {
11    /// Execute a query with RETURNING clause.
12    pub async fn query_returning(pool: &PgPool, sql: &str) -> SqlxResult<Vec<PgRow>> {
13        let rows = sqlx::query(sql).fetch_all(pool).await?;
14        Ok(rows)
15    }
16
17    /// Execute INSERT ... ON CONFLICT (upsert).
18    pub async fn upsert(
19        _pool: &PgPool,
20        table: &str,
21        columns: &[&str],
22        conflict_columns: &[&str],
23        update_columns: &[&str],
24    ) -> SqlxResult<String> {
25        let cols = columns.join(", ");
26        let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${}", i)).collect();
27        let vals = placeholders.join(", ");
28        let conflict = conflict_columns.join(", ");
29        let updates: Vec<String> = update_columns
30            .iter()
31            .map(|c| format!("{} = EXCLUDED.{}", c, c))
32            .collect();
33        let update_clause = updates.join(", ");
34
35        let sql = format!(
36            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {} RETURNING *",
37            table, cols, vals, conflict, update_clause
38        );
39
40        Ok(sql)
41    }
42
43    /// Generate a PostgreSQL array literal.
44    pub fn array_literal<T: std::fmt::Display>(values: &[T]) -> String {
45        let items: Vec<String> = values.iter().map(|v| format!("'{}'", v)).collect();
46        format!("ARRAY[{}]", items.join(", "))
47    }
48
49    /// Generate a PostgreSQL JSON/JSONB path expression.
50    pub fn json_path(column: &str, path: &[&str]) -> String {
51        if path.is_empty() {
52            column.to_string()
53        } else {
54            let path_str: Vec<String> = path.iter().map(|p| format!("'{}'", p)).collect();
55            format!("{}->>{}", column, path_str.join("->"))
56        }
57    }
58
59    /// Check if a PostgreSQL extension is available.
60    pub async fn has_extension(pool: &PgPool, extension: &str) -> SqlxResult<bool> {
61        let sql = "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = $1)";
62        let row = sqlx::query(sql).bind(extension).fetch_one(pool).await?;
63        let exists: bool = row.try_get(0)?;
64        Ok(exists)
65    }
66
67    /// Get PostgreSQL version.
68    pub async fn version(pool: &PgPool) -> SqlxResult<String> {
69        let sql = "SELECT version()";
70        let row = sqlx::query(sql).fetch_one(pool).await?;
71        let version: String = row.try_get(0)?;
72        Ok(version)
73    }
74
75    /// Execute LISTEN for notifications.
76    pub async fn listen(pool: &PgPool, channel: &str) -> SqlxResult<()> {
77        let sql = format!("LISTEN {}", channel);
78        sqlx::query(&sql).execute(pool).await?;
79        Ok(())
80    }
81
82    /// Execute NOTIFY.
83    pub async fn notify(pool: &PgPool, channel: &str, payload: &str) -> SqlxResult<()> {
84        let sql = format!("NOTIFY {}, '{}'", channel, payload.replace('\'', "''"));
85        sqlx::query(&sql).execute(pool).await?;
86        Ok(())
87    }
88}
89
/// PostgreSQL advisory lock helpers.
///
/// A stateless marker type: every helper is an associated function taking
/// the pool and lock key explicitly. The derives make the type usable in
/// debug output and generic contexts at zero cost.
#[derive(Debug, Clone, Copy, Default)]
pub struct AdvisoryLock;
92
93impl AdvisoryLock {
94    /// Acquire an advisory lock (blocking).
95    pub async fn acquire(pool: &PgPool, key: i64) -> SqlxResult<()> {
96        sqlx::query("SELECT pg_advisory_lock($1)")
97            .bind(key)
98            .execute(pool)
99            .await?;
100        Ok(())
101    }
102
103    /// Try to acquire an advisory lock (non-blocking).
104    pub async fn try_acquire(pool: &PgPool, key: i64) -> SqlxResult<bool> {
105        let row = sqlx::query("SELECT pg_try_advisory_lock($1)")
106            .bind(key)
107            .fetch_one(pool)
108            .await?;
109        let acquired: bool = row.try_get(0)?;
110        Ok(acquired)
111    }
112
113    /// Release an advisory lock.
114    pub async fn release(pool: &PgPool, key: i64) -> SqlxResult<()> {
115        sqlx::query("SELECT pg_advisory_unlock($1)")
116            .bind(key)
117            .execute(pool)
118            .await?;
119        Ok(())
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Integer and string elements are both rendered as quoted items.
    #[test]
    fn test_array_literal() {
        let numbers = PgHelpers::array_literal(&[1, 2, 3]);
        assert_eq!(numbers, "ARRAY['1', '2', '3']");

        let letters = PgHelpers::array_literal(&["a", "b"]);
        assert_eq!(letters, "ARRAY['a', 'b']");
    }

    /// Covers the empty, single-segment and two-segment path shapes.
    #[test]
    fn test_json_path() {
        let bare_column = PgHelpers::json_path("data", &[]);
        assert_eq!(bare_column, "data");

        let one_level = PgHelpers::json_path("data", &["name"]);
        assert_eq!(one_level, "data->>'name'");

        let two_levels = PgHelpers::json_path("data", &["user", "name"]);
        assert_eq!(two_levels, "data->>'user'->'name'");
    }
}
142}