1use std::collections::BTreeSet;
2use std::{pin::pin, sync::LazyLock};
3use tank::{
4 AsValue, DataSet, Entity, Passive, RowLabeled, expr, stream::StreamExt, stream::TryStreamExt,
5};
6use tank::{Executor, cols};
7use tokio::sync::Mutex;
8
9#[derive(Default, Entity)]
10struct Values {
11 id: Passive<u64>,
12 value: u32,
14}
/// Number of rows the test stores; the stored values are the integers `1..=COUNT`.
const COUNT: u32 = 11_745;
/// Sum of the integers `1..=COUNT`, i.e. 68 978 385 (fits comfortably in a `u32`).
const EXPECTED_SUM: u32 = COUNT * (COUNT + 1) / 2;
/// Arithmetic mean of the integers `1..=COUNT`, i.e. 5 873 (exact because `COUNT` is odd).
const EXPECTED_AVG: u32 = (COUNT + 1) / 2;
18static MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
19
20pub async fn aggregates<E: Executor>(executor: &mut E) {
21 let _lock = MUTEX.lock();
22
23 Values::drop_table(executor, true, false)
25 .await
26 .expect("Failed to drop Values table");
27 Values::create_table(executor, false, false)
28 .await
29 .expect("Failed to create Values table");
30
31 let mut values = (1..11746).map(|value| Values {
35 id: value.into(),
36 value: value as u32,
37 });
38 loop {
39 let rows = values.by_ref().take(2000).collect::<Vec<_>>();
40 if rows.is_empty() {
41 break;
42 }
43 let result = Values::insert_many(executor, rows.iter()).await;
44 assert!(
45 result.is_ok(),
46 "Failed to Values::insert_many: {:?}",
47 result.unwrap_err()
48 );
49 let result = result.unwrap();
50 if let Some(affected) = result.rows_affected {
51 assert_eq!(
52 affected,
53 rows.len() as u64,
54 "Values::insert_many should have affected {} rows",
55 rows.len()
56 );
57 }
58 }
59
60 {
62 let mut stream = pin!(Values::table().select(
63 executor,
64 cols!(COUNT(*), SUM(Values::value)),
65 &true,
66 None
67 ));
68 let count = stream.next().await;
69 assert!(
70 stream.next().await.is_none(),
71 "The query is expected to return a single row"
72 );
73 let expected = (COUNT as i128, EXPECTED_SUM as i128);
74 let actual = match count {
75 Some(Ok(RowLabeled { values, .. })) => {
76 let a = i128::try_from_value((*values)[0].clone());
77 let b = i128::try_from_value((*values)[1].clone());
78 match (a, b) {
79 (Ok(a), Ok(b)) => Some((a, b)),
80 (Err(e), _) => panic!("{e}"),
81 (_, Err(e)) => panic!("{e}"),
82 }
83 }
84 _ => None,
85 };
86 assert_eq!(
87 actual,
88 Some(expected),
89 "SELECT COUNT(*), SUM(value) is expected to return {:?}",
90 expected
91 );
92 }
93
94 {
96 {
97 let stream = pin!(Values::table().select(executor, cols!(*), &true, None));
98 let values = stream
99 .map(|row| {
100 let row = row.expect("Error while fetching the row");
101 let i = row
102 .names()
103 .iter()
104 .enumerate()
105 .find_map(|(i, v)| if v == "value" { Some(i) } else { None })
106 .expect("Column `value` is not present");
107 u32::try_from_value(row.values[i].clone())
108 .expect("The result could not be convert back to u32")
109 })
110 .collect::<BTreeSet<_>>()
111 .await;
112 assert!(
113 values.into_iter().eq((1..(COUNT + 1)).into_iter()),
114 "The result didn't received from the db contains all the values that were inserted"
115 );
116 }
117 }
118
119 {
121 let mut query = Values::prepare_find(executor, &expr!(Values::value > ?), None)
122 .await
123 .expect("Failed to prepare the query");
124 assert!(query.is_prepared());
125 query
126 .bind(EXPECTED_AVG)
127 .expect("Could not bind the parameter");
128 let values = executor
129 .fetch(&mut query)
130 .try_collect::<Vec<_>>()
131 .await
132 .expect("Could not fetch rows above average from the prepared statement");
133 assert!(query.is_prepared());
134 assert_eq!(values.len(), COUNT as usize / 2);
135 query
136 .clear_bindings()
137 .expect("Could not clear the bindings");
138 query.bind(0).expect("Could not bind a second time");
139 let values = executor
140 .fetch(&mut query)
141 .try_collect::<Vec<_>>()
142 .await
143 .expect("Could not fetch positive rows from the prepared statement");
144 assert_eq!(values.len(), COUNT as usize);
145 }
146}