1use std::collections::BTreeSet;
2use std::{pin::pin, sync::LazyLock};
3use tank::{AsValue, Entity, Passive, Row, expr, stream::StreamExt, stream::TryStreamExt};
4use tank::{Executor, QueryBuilder, cols};
5use tokio::sync::Mutex;
6
7static MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
/// Number of rows inserted by the test: the integers `1..=COUNT`.
const COUNT: u32 = 11745;
/// Sum of the integers `1..=COUNT` (68978385).
const EXPECTED_SUM: u32 = COUNT * (COUNT + 1) / 2;
/// Mean of the integers `1..=COUNT` (5873), used as the threshold of the prepared query.
const EXPECTED_AVG: u32 = (COUNT + 1) / 2;
11
12#[derive(Default, Entity)]
13struct Values {
14 id: Passive<u64>,
15 value: u32,
17}
18
19pub async fn aggregates(executor: &mut impl Executor) {
20 let _lock = MUTEX.lock();
21
22 Values::drop_table(executor, true, false)
24 .await
25 .expect("Failed to drop Values table");
26 Values::create_table(executor, false, false)
27 .await
28 .expect("Failed to create Values table");
29
30 let mut values = (1..11746).map(|value| Values {
34 id: value.into(),
35 value: value as u32,
36 });
37 loop {
38 let rows = values.by_ref().take(2000).collect::<Vec<_>>();
39 if rows.is_empty() {
40 break;
41 }
42 let result = Values::insert_many(executor, rows.iter()).await;
43 assert!(
44 result.is_ok(),
45 "Failed to Values::insert_many: {:?}",
46 result.unwrap_err()
47 );
48 let result = result.unwrap();
49 if let Some(affected) = result.rows_affected {
50 assert_eq!(
51 affected,
52 rows.len() as u64,
53 "Values::insert_many should have affected {} rows",
54 rows.len()
55 );
56 }
57 }
58
59 {
61 let mut stream = pin!(
62 executor.fetch(
63 QueryBuilder::new()
64 .select(cols!(COUNT(*), SUM(Values::value)))
65 .from(Values::table())
66 .where_expr(true)
67 .build(&executor.driver())
68 )
69 );
70 let count = stream.next().await;
71 assert!(
72 stream.next().await.is_none(),
73 "The query is expected to return a single row"
74 );
75 let expected = (COUNT as i128, EXPECTED_SUM as i128);
76 let actual = match count {
77 Some(Ok(Row { values, .. })) => {
78 let a = i128::try_from_value((*values)[0].clone());
79 let b = i128::try_from_value((*values)[1].clone());
80 match (a, b) {
81 (Ok(a), Ok(b)) => Some((a, b)),
82 (Err(e), _) => panic!("{e}"),
83 (_, Err(e)) => panic!("{e}"),
84 }
85 }
86 _ => None,
87 };
88 assert_eq!(
89 actual,
90 Some(expected),
91 "SELECT COUNT(*), SUM(value) is expected to return {:?}",
92 expected
93 );
94 }
95
96 {
98 {
99 let stream = pin!(
100 executor.fetch(
101 QueryBuilder::new()
102 .select(cols!(*))
103 .from(Values::table())
104 .where_expr(true)
105 .build(&executor.driver())
106 )
107 );
108 let values = stream
109 .map(|row| {
110 let row = row.expect("Error while fetching the row");
111 let i = row
112 .names()
113 .iter()
114 .enumerate()
115 .find_map(|(i, v)| if v == "value" { Some(i) } else { None })
116 .expect("Column `value` is not present");
117 u32::try_from_value(row.values[i].clone())
118 .expect("The result could not be convert back to u32")
119 })
120 .collect::<BTreeSet<_>>()
121 .await;
122 assert!(
123 values.into_iter().eq((1..(COUNT + 1)).into_iter()),
124 "The result didn't received from the db contains all the values that were inserted"
125 );
126 }
127 }
128
129 {
131 let mut query = Values::prepare_find(executor, expr!(Values::value > ?), None)
132 .await
133 .expect("Failed to prepare the query");
134 assert!(query.is_prepared());
135 query
136 .bind(EXPECTED_AVG)
137 .expect("Could not bind the parameter");
138 let values = executor
139 .fetch(&mut query)
140 .try_collect::<Vec<_>>()
141 .await
142 .expect("Could not fetch rows above average from the prepared statement");
143 assert!(query.is_prepared());
144 assert_eq!(values.len(), COUNT as usize / 2);
145 query
146 .clear_bindings()
147 .expect("Could not clear the bindings");
148 query.bind(0).expect("Could not bind a second time");
149 let values = executor
150 .fetch(&mut query)
151 .try_collect::<Vec<_>>()
152 .await
153 .expect("Could not fetch positive rows from the prepared statement");
154 assert_eq!(values.len(), COUNT as usize);
155 }
156}