1use crate::SqlError;
8use serde_json::Value;
9use sqlx::{Decode, Execute, Executor, FromRow, Row, Type};
10use sqlx_json::{QueryResult as _, RowExt};
11
12use crate::timeout::execute_with_timeout;
13
14#[allow(async_fn_in_trait)]
33pub trait Connection: Send + Sync
34where
35 for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
36 usize: sqlx::ColumnIndex<<Self::DB as sqlx::Database>::Row>,
37 <Self::DB as sqlx::Database>::Row: RowExt,
38 <Self::DB as sqlx::Database>::QueryResult: sqlx_json::QueryResult,
39{
40 type DB: sqlx::Database;
42
43 async fn pool(&self, target: Option<&str>) -> Result<sqlx::Pool<Self::DB>, SqlError>;
49
50 fn query_timeout(&self) -> Option<u64>;
52
53 async fn execute<'q, E>(&self, query: E, database: Option<&str>) -> Result<u64, SqlError>
59 where
60 E: 'q + Execute<'q, Self::DB>,
61 {
62 let sql = query.sql().to_owned();
63 let pool = self.pool(database).await?;
64 execute_with_timeout(self.query_timeout(), &sql, async move {
65 Ok(pool.execute(query).await?.rows_affected())
66 })
67 .await
68 }
69
70 async fn fetch_json<'q, E>(&self, query: E, database: Option<&str>) -> Result<Vec<Value>, SqlError>
76 where
77 E: 'q + Execute<'q, Self::DB>,
78 {
79 let sql = query.sql().to_owned();
80 let pool = self.pool(database).await?;
81 execute_with_timeout(self.query_timeout(), &sql, async move {
82 Ok(pool.fetch_all(query).await?.iter().map(RowExt::to_json).collect())
83 })
84 .await
85 }
86
87 async fn fetch_optional<'q, E, T>(&self, query: E, database: Option<&str>) -> Result<Option<T>, SqlError>
96 where
97 E: 'q + Execute<'q, Self::DB>,
98 T: for<'r> Decode<'r, Self::DB> + Type<Self::DB> + Send + Unpin,
99 {
100 let sql = query.sql().to_owned();
101 let pool = self.pool(database).await?;
102 execute_with_timeout(self.query_timeout(), &sql, async move {
103 Ok(pool.fetch_optional(query).await?.and_then(|r| r.try_get(0usize).ok()))
104 })
105 .await
106 }
107
108 async fn fetch_scalar<'q, E, T>(&self, query: E, database: Option<&str>) -> Result<Vec<T>, SqlError>
114 where
115 E: 'q + Execute<'q, Self::DB>,
116 T: for<'r> Decode<'r, Self::DB> + Type<Self::DB> + Send + Unpin,
117 {
118 let sql = query.sql().to_owned();
119 let pool = self.pool(database).await?;
120 execute_with_timeout(self.query_timeout(), &sql, async move {
121 let rows = pool.fetch_all(query).await?;
122 rows.iter().map(|r| r.try_get(0usize)).collect()
123 })
124 .await
125 }
126
127 async fn fetch<'q, E, T>(&self, query: E, database: Option<&str>) -> Result<Vec<T>, SqlError>
135 where
136 E: 'q + Execute<'q, Self::DB>,
137 T: for<'r> FromRow<'r, <Self::DB as sqlx::Database>::Row> + Send + Unpin,
138 {
139 let sql = query.sql().to_owned();
140 let pool = self.pool(database).await?;
141 execute_with_timeout(self.query_timeout(), &sql, async move {
142 let rows = pool.fetch_all(query).await?;
143 rows.iter().map(T::from_row).collect()
144 })
145 .await
146 }
147}