1use crate::api::conn::Method;
2use crate::api::conn::Param;
3use crate::api::conn::Router;
4use crate::api::err::Error;
5use crate::api::opt;
6use crate::api::Connection;
7use crate::api::Result;
8use crate::sql;
9use crate::sql::to_value;
10use crate::sql::Array;
11use crate::sql::Object;
12use crate::sql::Statement;
13use crate::sql::Statements;
14use crate::sql::Strand;
15use crate::sql::Value;
16use indexmap::IndexMap;
17use serde::de::DeserializeOwned;
18use serde::Serialize;
19use std::collections::BTreeMap;
20use std::collections::HashMap;
21use std::future::Future;
22use std::future::IntoFuture;
23use std::mem;
24use std::pin::Pin;
25
26#[derive(Debug)]
28#[must_use = "futures do nothing unless you `.await` or poll them"]
29pub struct Query<'r, C: Connection> {
30 pub(super) router: Result<&'r Router<C>>,
31 pub(super) query: Vec<Result<Vec<Statement>>>,
32 pub(super) bindings: Result<BTreeMap<String, Value>>,
33}
34
35impl<'r, Client> IntoFuture for Query<'r, Client>
36where
37 Client: Connection,
38{
39 type Output = Result<Response>;
40 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'r>>;
41
42 fn into_future(self) -> Self::IntoFuture {
43 Box::pin(async move {
44 let mut statements = Vec::with_capacity(self.query.len());
45 for query in self.query {
46 statements.extend(query?);
47 }
48 let query = sql::Query(Statements(statements));
49 let param = Param::query(query, self.bindings?);
50 let mut conn = Client::new(Method::Query);
51 conn.execute_query(self.router?, param).await
52 })
53 }
54}
55
56impl<'r, C> Query<'r, C>
57where
58 C: Connection,
59{
60 pub fn query(mut self, query: impl opt::IntoQuery) -> Self {
62 self.query.push(query.into_query());
63 self
64 }
65
66 pub fn bind(mut self, bindings: impl Serialize) -> Self {
108 if let Ok(current) = &mut self.bindings {
109 match to_value(bindings) {
110 Ok(mut bindings) => {
111 if let Value::Array(Array(array)) = &mut bindings {
112 if let [Value::Strand(Strand(key)), value] = &mut array[..] {
113 let mut map = BTreeMap::new();
114 map.insert(mem::take(key), mem::take(value));
115 bindings = map.into();
116 }
117 }
118 match &mut bindings {
119 Value::Object(Object(map)) => current.append(map),
120 _ => {
121 self.bindings = Err(Error::InvalidBindings(bindings).into());
122 }
123 }
124 }
125 Err(error) => {
126 self.bindings = Err(error.into());
127 }
128 }
129 }
130 self
131 }
132}
133
134pub(crate) type QueryResult = Result<Value>;
135
136#[derive(Debug)]
138pub struct Response(pub(crate) IndexMap<usize, QueryResult>);
139
140impl Response {
141 pub fn take<R>(&mut self, index: impl opt::QueryResult<R>) -> Result<R>
204 where
205 R: DeserializeOwned,
206 {
207 index.query_result(self)
208 }
209
210 pub fn take_errors(&mut self) -> HashMap<usize, crate::Error> {
229 let mut keys = Vec::new();
230 for (key, result) in &self.0 {
231 if result.is_err() {
232 keys.push(*key);
233 }
234 }
235 let mut errors = HashMap::with_capacity(keys.len());
236 for key in keys {
237 if let Some(Err(error)) = self.0.remove(&key) {
238 errors.insert(key, error);
239 }
240 }
241 errors
242 }
243
244 pub fn check(mut self) -> Result<Self> {
260 let mut first_error = None;
261 for (key, result) in &self.0 {
262 if result.is_err() {
263 first_error = Some(*key);
264 break;
265 }
266 }
267 if let Some(key) = first_error {
268 if let Some(Err(error)) = self.0.remove(&key) {
269 return Err(error);
270 }
271 }
272 Ok(self)
273 }
274
275 pub fn num_statements(&self) -> usize {
292 self.0.len()
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use crate::Error::Api;
300 use serde::Deserialize;
301
302 #[derive(Debug, Clone, Serialize, Deserialize)]
303 struct Summary {
304 title: String,
305 }
306
307 #[derive(Debug, Clone, Serialize, Deserialize)]
308 struct Article {
309 title: String,
310 body: String,
311 }
312
313 fn to_map(vec: Vec<QueryResult>) -> IndexMap<usize, QueryResult> {
314 vec.into_iter().enumerate().collect()
315 }
316
317 #[test]
318 fn take_from_an_empty_response() {
319 let mut response = Response(Default::default());
320 let value: Value = response.take(0).unwrap();
321 assert!(value.is_none());
322
323 let mut response = Response(Default::default());
324 let option: Option<String> = response.take(0).unwrap();
325 assert!(option.is_none());
326
327 let mut response = Response(Default::default());
328 let vec: Vec<String> = response.take(0).unwrap();
329 assert!(vec.is_empty());
330 }
331
332 #[test]
333 fn take_from_an_errored_query() {
334 let mut response = Response(to_map(vec![Err(Error::ConnectionUninitialised.into())]));
335 response.take::<Option<()>>(0).unwrap_err();
336 }
337
338 #[test]
339 fn take_from_empty_records() {
340 let mut response = Response(to_map(vec![]));
341 let value: Value = response.take(0).unwrap();
342 assert_eq!(value, Default::default());
343
344 let mut response = Response(to_map(vec![]));
345 let option: Option<String> = response.take(0).unwrap();
346 assert!(option.is_none());
347
348 let mut response = Response(to_map(vec![]));
349 let vec: Vec<String> = response.take(0).unwrap();
350 assert!(vec.is_empty());
351 }
352
353 #[test]
354 fn take_from_a_scalar_response() {
355 let scalar = 265;
356
357 let mut response = Response(to_map(vec![Ok(scalar.into())]));
358 let value: Value = response.take(0).unwrap();
359 assert_eq!(value, Value::from(scalar));
360
361 let mut response = Response(to_map(vec![Ok(scalar.into())]));
362 let option: Option<_> = response.take(0).unwrap();
363 assert_eq!(option, Some(scalar));
364
365 let mut response = Response(to_map(vec![Ok(scalar.into())]));
366 let vec: Vec<usize> = response.take(0).unwrap();
367 assert_eq!(vec, vec![scalar]);
368
369 let scalar = true;
370
371 let mut response = Response(to_map(vec![Ok(scalar.into())]));
372 let value: Value = response.take(0).unwrap();
373 assert_eq!(value, Value::from(scalar));
374
375 let mut response = Response(to_map(vec![Ok(scalar.into())]));
376 let option: Option<_> = response.take(0).unwrap();
377 assert_eq!(option, Some(scalar));
378
379 let mut response = Response(to_map(vec![Ok(scalar.into())]));
380 let vec: Vec<bool> = response.take(0).unwrap();
381 assert_eq!(vec, vec![scalar]);
382 }
383
384 #[test]
385 fn take_preserves_order() {
386 let mut response = Response(to_map(vec![
387 Ok(0.into()),
388 Ok(1.into()),
389 Ok(2.into()),
390 Ok(3.into()),
391 Ok(4.into()),
392 Ok(5.into()),
393 Ok(6.into()),
394 Ok(7.into()),
395 ]));
396 let Some(four): Option<i32> = response.take(4).unwrap() else {
397 panic!("query not found");
398 };
399 assert_eq!(four, 4);
400 let Some(six): Option<i32> = response.take(6).unwrap() else {
401 panic!("query not found");
402 };
403 assert_eq!(six, 6);
404 let Some(zero): Option<i32> = response.take(0).unwrap() else {
405 panic!("query not found");
406 };
407 assert_eq!(zero, 0);
408 let one: Value = response.take(1).unwrap();
409 assert_eq!(one, Value::from(1));
410 }
411
412 #[test]
413 fn take_key() {
414 let summary = Summary {
415 title: "Lorem Ipsum".to_owned(),
416 };
417 let value = to_value(summary.clone()).unwrap();
418
419 let mut response = Response(to_map(vec![Ok(value.clone())]));
420 let title: Value = response.take("title").unwrap();
421 assert_eq!(title, Value::from(summary.title.as_str()));
422
423 let mut response = Response(to_map(vec![Ok(value.clone())]));
424 let Some(title): Option<String> = response.take("title").unwrap() else {
425 panic!("title not found");
426 };
427 assert_eq!(title, summary.title);
428
429 let mut response = Response(to_map(vec![Ok(value)]));
430 let vec: Vec<String> = response.take("title").unwrap();
431 assert_eq!(vec, vec![summary.title]);
432
433 let article = Article {
434 title: "Lorem Ipsum".to_owned(),
435 body: "Lorem Ipsum Lorem Ipsum".to_owned(),
436 };
437 let value = to_value(article.clone()).unwrap();
438
439 let mut response = Response(to_map(vec![Ok(value.clone())]));
440 let Some(title): Option<String> = response.take("title").unwrap() else {
441 panic!("title not found");
442 };
443 assert_eq!(title, article.title);
444 let Some(body): Option<String> = response.take("body").unwrap() else {
445 panic!("body not found");
446 };
447 assert_eq!(body, article.body);
448
449 let mut response = Response(to_map(vec![Ok(value.clone())]));
450 let vec: Vec<String> = response.take("title").unwrap();
451 assert_eq!(vec, vec![article.title.clone()]);
452
453 let mut response = Response(to_map(vec![Ok(value)]));
454 let value: Value = response.take("title").unwrap();
455 assert_eq!(value, Value::from(article.title));
456 }
457
458 #[test]
459 fn take_partial_records() {
460 let mut response = Response(to_map(vec![Ok(vec![true, false].into())]));
461 let value: Value = response.take(0).unwrap();
462 assert_eq!(value, vec![Value::from(true), Value::from(false)].into());
463
464 let mut response = Response(to_map(vec![Ok(vec![true, false].into())]));
465 let vec: Vec<bool> = response.take(0).unwrap();
466 assert_eq!(vec, vec![true, false]);
467
468 let mut response = Response(to_map(vec![Ok(vec![true, false].into())]));
469 let Err(Api(Error::LossyTake(Response(mut map)))): Result<Option<bool>> = response.take(0)
470 else {
471 panic!("silently dropping records not allowed");
472 };
473 let records = map.remove(&0).unwrap().unwrap();
474 assert_eq!(records, vec![true, false].into());
475 }
476
477 #[test]
478 fn check_returns_the_first_error() {
479 let response = vec![
480 Ok(0.into()),
481 Ok(1.into()),
482 Ok(2.into()),
483 Err(Error::ConnectionUninitialised.into()),
484 Ok(3.into()),
485 Ok(4.into()),
486 Ok(5.into()),
487 Err(Error::BackupsNotSupported.into()),
488 Ok(6.into()),
489 Ok(7.into()),
490 Err(Error::DuplicateRequestId(0).into()),
491 ];
492 let response = Response(to_map(response));
493 let crate::Error::Api(Error::ConnectionUninitialised) = response.check().unwrap_err()
494 else {
495 panic!("check did not return the first error");
496 };
497 }
498
499 #[test]
500 fn take_errors() {
501 let response = vec![
502 Ok(0.into()),
503 Ok(1.into()),
504 Ok(2.into()),
505 Err(Error::ConnectionUninitialised.into()),
506 Ok(3.into()),
507 Ok(4.into()),
508 Ok(5.into()),
509 Err(Error::BackupsNotSupported.into()),
510 Ok(6.into()),
511 Ok(7.into()),
512 Err(Error::DuplicateRequestId(0).into()),
513 ];
514 let mut response = Response(to_map(response));
515 let errors = response.take_errors();
516 assert_eq!(response.num_statements(), 8);
517 assert_eq!(errors.len(), 3);
518 let crate::Error::Api(Error::DuplicateRequestId(0)) = errors.get(&10).unwrap() else {
519 panic!("index `10` is not `DuplicateRequestId`");
520 };
521 let crate::Error::Api(Error::BackupsNotSupported) = errors.get(&7).unwrap() else {
522 panic!("index `7` is not `BackupsNotSupported`");
523 };
524 let crate::Error::Api(Error::ConnectionUninitialised) = errors.get(&3).unwrap() else {
525 panic!("index `3` is not `ConnectionUninitialised`");
526 };
527 let Some(value): Option<i32> = response.take(2).unwrap() else {
528 panic!("statement not found");
529 };
530 assert_eq!(value, 2);
531 let value: Value = response.take(4).unwrap();
532 assert_eq!(value, Value::from(3));
533 }
534}