1mod delete;
5mod select;
6mod table;
7mod transaction;
8mod update;
9
10pub use table::{Column, TypedColumn};
11
12use std::{
13 future::{Future, IntoFuture},
14 marker::PhantomData,
15 ops::{BitAnd, BitOr, Not},
16 pin::Pin,
17};
18
19use async_trait::async_trait;
20use tokio_postgres::{types::ToSql, Row, Transaction as PgTransaction};
21
22use crate::{fetch_client, DpClient, DpTransaction, Error};
23
24pub use delete::Delete;
25pub use select::Select;
26pub use transaction::*;
27pub use update::{NoneSet, SomeSet, Update};
28
29#[doc(hidden)]
31pub trait PushChunk<'a> {
32 fn push_to_buffer<T>(&mut self, buffer: &mut Query<'a, T>);
34}
35
36#[doc(hidden)]
38#[async_trait]
39pub trait Executor {
40 async fn query(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>;
42 async fn execute(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>;
44}
45
46#[async_trait]
49pub trait Executable {
50 type Output;
52
53 async fn exec(&self) -> Result<Self::Output, crate::Error> {
55 let client = fetch_client().await?;
56 self.exec_with(&client).await
57 }
58
59 async fn exec_with(
61 &self,
62 client: impl Executor + Send + Sync,
63 ) -> Result<Self::Output, crate::Error>;
64}
65
66pub struct Query<'a, T = Vec<Row>>(pub String, Vec<&'a (dyn ToSql + Sync)>, PhantomData<T>);
72
73pub trait ToQuery<'a, T>: PushChunk<'a> {
75 fn to_query(&mut self) -> Query<'a, T> {
77 let mut query = Query::default();
79
80 self.push_to_buffer(&mut query);
82
83 query.0 = replace_question_marks(query.0);
85
86 query
88 }
89}
90
91#[doc(hidden)]
96pub struct SqlChunk<'a>(pub String, pub Vec<&'a (dyn ToSql + Sync)>);
97
98fn push_all_with_sep<'a, T, U: PushChunk<'a>>(
107 vec: &mut Vec<U>,
108 buffer: &mut Query<'a, T>,
109 sep: &str,
110) {
111 if vec.is_empty() {
112 return;
113 }
114
115 for i in vec {
116 i.push_to_buffer(buffer);
117 buffer.0.push_str(sep);
118 }
119
120 buffer.0.truncate(buffer.0.len() - sep.len());
123}
124
125pub enum Where<'a> {
127 And(Vec<Where<'a>>),
129 Or(Vec<Where<'a>>),
131 Not(Box<Where<'a>>),
133 Raw(SqlChunk<'a>),
135 Empty,
137}
138
/// Rewrites every `?` in `stmt` into a numbered placeholder (`$1`, `$2`, ...),
/// counting from the left.
///
/// Every `?` is replaced, wherever it occurs in the text.
fn replace_question_marks(stmt: String) -> String {
    // Head-room for the characters the numbered placeholders add on top of
    // the `?` they replace.
    const RESERVED: usize = 9;

    let mut rewritten = String::with_capacity(stmt.len() + RESERVED);

    // Splitting on `?` leaves exactly the text between placeholders; the
    // first piece has no placeholder in front of it.
    let mut pieces = stmt.split('?');
    if let Some(head) = pieces.next() {
        rewritten.push_str(head);
    }

    for (seen, piece) in pieces.enumerate() {
        rewritten.push('$');
        rewritten.push_str(&(seen + 1).to_string());
        rewritten.push_str(piece);
    }

    rewritten
}
167
168impl<'a, T> Default for Query<'a, T> {
169 fn default() -> Self {
170 Self("".into(), vec![], PhantomData::<T>)
171 }
172}
173
174impl<'a, T> Query<'a, T> {
175 pub fn new(stmt: String, params: Vec<&'a (dyn ToSql + Sync)>) -> Query<'a, T> {
177 Query(replace_question_marks(stmt), params, PhantomData::<T>)
178 }
179}
180
181impl<'a> PushChunk<'a> for SqlChunk<'a> {
182 fn push_to_buffer<T>(&mut self, buffer: &mut Query<'a, T>) {
183 buffer.0.push_str(&self.0);
184 buffer.1.append(&mut self.1);
185 }
186}
187
188#[async_trait]
189impl<'a> Executor for &DpTransaction<'a> {
190 async fn query(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error> {
191 PgTransaction::query(self, stmt, params)
192 .await
193 .map_err(Error::from)
194 }
195
196 async fn execute(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
197 PgTransaction::execute(self, stmt, params)
198 .await
199 .map_err(Error::from)
200 }
201}
202
203#[async_trait]
204impl Executor for &DpClient {
205 async fn query(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error> {
206 (***self).query(stmt, params).await.map_err(Error::from)
207 }
208
209 async fn execute(&self, stmt: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error> {
210 (***self).execute(stmt, params).await.map_err(Error::from)
211 }
212}
213
214#[async_trait]
215impl<'a, T> Executable for Query<'a, Vec<T>>
216where
217 T: TryFrom<Row, Error = crate::Error> + Send + Sync,
218{
219 type Output = Vec<T>;
220
221 async fn exec_with(
222 &self,
223 client: impl Executor + Send + Sync,
224 ) -> Result<Self::Output, crate::Error> {
225 let rows = client.query(&self.0, &self.1).await?;
226
227 rows.into_iter().map(|i| T::try_from(i)).collect()
228 }
229}
230
231#[async_trait]
232impl<'a, T> Executable for Query<'a, Option<T>>
233where
234 T: TryFrom<Row, Error = crate::Error> + Send + Sync,
235{
236 type Output = Option<T>;
237
238 async fn exec_with(
239 &self,
240 client: impl Executor + Send + Sync,
241 ) -> Result<Self::Output, crate::Error> {
242 let rows = client.query(&self.0, &self.1).await?;
243
244 rows.into_iter()
245 .map(|i: Row| T::try_from(i))
246 .next()
247 .transpose()
248 }
249}
250
251#[async_trait]
252impl<'a> Executable for Query<'a, u64> {
253 type Output = u64;
254
255 async fn exec_with(
256 &self,
257 client: impl Executor + Send + Sync,
258 ) -> Result<Self::Output, crate::Error> {
259 client
260 .execute(&self.0, &self.1)
261 .await
262 .map_err(crate::Error::from)
263 }
264}
265
266impl<'a, T: Send + 'a> IntoFuture for Query<'a, T>
269where
270 Query<'a, T>: Executable<Output = T>,
271 T: Sync,
272{
273 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'a>>;
274 type Output = Result<T, crate::Error>;
275
276 fn into_future(self) -> Self::IntoFuture {
277 Box::pin(async move { self.exec().await })
278 }
279}
280
281impl<'a> Where<'a> {
282 pub(crate) fn new(expr: String, params: Vec<&'a (dyn ToSql + Sync)>) -> Where<'a> {
284 Self::Raw(SqlChunk(expr, params))
285 }
286
287 pub(crate) fn is_empty(&self) -> bool {
289 use Where::*;
290
291 match self {
292 Empty => true,
293 And(vec) => vec.iter().all(|i| i.is_empty()),
294 Or(vec) => vec.iter().all(|i| i.is_empty()),
295 Not(inner) => inner.is_empty(),
296 Raw(chunk) => chunk.0.is_empty(),
297 }
298 }
299
300 pub fn and(self, other: Where<'a>) -> Where<'a> {
304 self.bitand(other)
305 }
306
307 pub fn or(self, other: Where<'a>) -> Where<'a> {
317 self.bitor(other)
318 }
319}
320
321impl<'a> Default for Where<'a> {
322 fn default() -> Self {
323 Where::new("".into(), vec![])
324 }
325}
326
327impl<'a> BitAnd for Where<'a> {
328 type Output = Where<'a>;
329
330 fn bitand(mut self, mut other: Self) -> Self::Output {
331 use Where::*;
332
333 if let Empty = self {
334 return other;
335 }
336
337 if let Empty = other {
338 return self;
339 }
340
341 if let And(ref mut vec) = self {
345 if let And(ref mut other_vec) = other {
347 vec.append(other_vec);
348 } else {
349 vec.push(other);
350 }
351 return self;
352 }
353
354 if let And(ref mut vec) = other {
355 vec.push(self);
356 return other;
357 }
358
359 And(vec![self, other])
360 }
361}
362
363impl<'a> BitOr for Where<'a> {
364 type Output = Where<'a>;
365
366 fn bitor(mut self, mut other: Self) -> Self::Output {
367 use Where::*;
368
369 if let Empty = self {
370 return other;
371 }
372 if let Empty = other {
373 return self;
374 }
375
376 if let Or(ref mut vec) = self {
380 if let And(ref mut other_vec) = other {
382 vec.append(other_vec);
383 } else {
384 vec.push(other);
385 }
386 return self;
387 }
388
389 if let Or(ref mut vec) = other {
390 vec.push(self);
391 return other;
392 }
393
394 Or(vec![self, other])
395 }
396}
397
398impl<'a> Not for Where<'a> {
399 type Output = Where<'a>;
400
401 fn not(self) -> Self::Output {
402 use Where::*;
403
404 if let Not(inner) = self {
405 return *inner;
406 }
407
408 Not(Box::new(self))
409 }
410}
411
412impl<'a> PushChunk<'a> for Where<'a> {
413 fn push_to_buffer<T>(&mut self, buffer: &mut Query<'a, T>) {
414 use Where::*;
415
416 if self.is_empty() {
417 return;
418 }
419
420 match self {
421 Raw(chunk) => {
422 chunk.push_to_buffer(buffer);
423 }
424 Not(inner) => {
425 buffer.0.push_str("NOT (");
426 inner.push_to_buffer(buffer);
427 buffer.0.push(')');
428 }
429 And(vec) => {
430 buffer.0.push('(');
431 push_all_with_sep(vec, buffer, ") AND (");
432 buffer.0.push(')');
433 }
434 Or(vec) => {
435 buffer.0.push('(');
436 push_all_with_sep(vec, buffer, ") OR (");
437 buffer.0.push(')');
438 }
439 Empty => (),
440 }
441 }
442}