1use crate::error::SqlxResult;
4use sqlx::Row;
5use sqlx::postgres::{PgPool, PgRow};
6
7pub struct PgHelpers;
9
10impl PgHelpers {
11 pub async fn query_returning(pool: &PgPool, sql: &str) -> SqlxResult<Vec<PgRow>> {
13 let rows = sqlx::query(sql).fetch_all(pool).await?;
14 Ok(rows)
15 }
16
17 pub async fn upsert(
19 _pool: &PgPool,
20 table: &str,
21 columns: &[&str],
22 conflict_columns: &[&str],
23 update_columns: &[&str],
24 ) -> SqlxResult<String> {
25 let cols = columns.join(", ");
26 let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${}", i)).collect();
27 let vals = placeholders.join(", ");
28 let conflict = conflict_columns.join(", ");
29 let updates: Vec<String> = update_columns
30 .iter()
31 .map(|c| format!("{} = EXCLUDED.{}", c, c))
32 .collect();
33 let update_clause = updates.join(", ");
34
35 let sql = format!(
36 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {} RETURNING *",
37 table, cols, vals, conflict, update_clause
38 );
39
40 Ok(sql)
41 }
42
43 pub fn array_literal<T: std::fmt::Display>(values: &[T]) -> String {
45 let items: Vec<String> = values.iter().map(|v| format!("'{}'", v)).collect();
46 format!("ARRAY[{}]", items.join(", "))
47 }
48
49 pub fn json_path(column: &str, path: &[&str]) -> String {
51 if path.is_empty() {
52 column.to_string()
53 } else {
54 let path_str: Vec<String> = path.iter().map(|p| format!("'{}'", p)).collect();
55 format!("{}->>{}", column, path_str.join("->"))
56 }
57 }
58
59 pub async fn has_extension(pool: &PgPool, extension: &str) -> SqlxResult<bool> {
61 let sql = "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = $1)";
62 let row = sqlx::query(sql).bind(extension).fetch_one(pool).await?;
63 let exists: bool = row.try_get(0)?;
64 Ok(exists)
65 }
66
67 pub async fn version(pool: &PgPool) -> SqlxResult<String> {
69 let sql = "SELECT version()";
70 let row = sqlx::query(sql).fetch_one(pool).await?;
71 let version: String = row.try_get(0)?;
72 Ok(version)
73 }
74
75 pub async fn listen(pool: &PgPool, channel: &str) -> SqlxResult<()> {
77 let sql = format!("LISTEN {}", channel);
78 sqlx::query(&sql).execute(pool).await?;
79 Ok(())
80 }
81
82 pub async fn notify(pool: &PgPool, channel: &str, payload: &str) -> SqlxResult<()> {
84 let sql = format!("NOTIFY {}, '{}'", channel, payload.replace('\'', "''"));
85 sqlx::query(&sql).execute(pool).await?;
86 Ok(())
87 }
88}
89
90pub struct AdvisoryLock;
92
93impl AdvisoryLock {
94 pub async fn acquire(pool: &PgPool, key: i64) -> SqlxResult<()> {
96 sqlx::query("SELECT pg_advisory_lock($1)")
97 .bind(key)
98 .execute(pool)
99 .await?;
100 Ok(())
101 }
102
103 pub async fn try_acquire(pool: &PgPool, key: i64) -> SqlxResult<bool> {
105 let row = sqlx::query("SELECT pg_try_advisory_lock($1)")
106 .bind(key)
107 .fetch_one(pool)
108 .await?;
109 let acquired: bool = row.try_get(0)?;
110 Ok(acquired)
111 }
112
113 pub async fn release(pool: &PgPool, key: i64) -> SqlxResult<()> {
115 sqlx::query("SELECT pg_advisory_unlock($1)")
116 .bind(key)
117 .execute(pool)
118 .await?;
119 Ok(())
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 #[test]
128 fn test_array_literal() {
129 assert_eq!(PgHelpers::array_literal(&[1, 2, 3]), "ARRAY['1', '2', '3']");
130 assert_eq!(PgHelpers::array_literal(&["a", "b"]), "ARRAY['a', 'b']");
131 }
132
133 #[test]
134 fn test_json_path() {
135 assert_eq!(PgHelpers::json_path("data", &[]), "data");
136 assert_eq!(PgHelpers::json_path("data", &["name"]), "data->>'name'");
137 assert_eq!(
138 PgHelpers::json_path("data", &["user", "name"]),
139 "data->>'user'->'name'"
140 );
141 }
142}