Skip to main content

tank_tests/
aggregates.rs

1use std::collections::BTreeSet;
2use std::{pin::pin, sync::LazyLock};
3use tank::{AsValue, Entity, Passive, Row, expr, stream::StreamExt, stream::TryStreamExt};
4use tank::{Executor, QueryBuilder, cols};
5use tokio::sync::Mutex;
6
7static MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
8const EXPECTED_SUM: u32 = 68978385;
9const EXPECTED_AVG: u32 = 5873;
10const COUNT: u32 = 11745;
11
// Test entity backing the table that the aggregate queries in this file run against.
// NOTE(review): the added explanations use plain `//` comments because the `Entity` derive may
// read `///` doc attributes — confirm before converting them to doc comments.
#[derive(Default, Entity)]
struct Values {
    // Row identifier; the test sets it to the same number as `value`.
    id: Passive<u64>,
    /// This column contains the actual value
    value: u32,
}
18
19pub async fn aggregates(executor: &mut impl Executor) {
20    let _lock = MUTEX.lock();
21
22    // Setup
23    Values::drop_table(executor, true, false)
24        .await
25        .expect("Failed to drop Values table");
26    Values::create_table(executor, false, false)
27        .await
28        .expect("Failed to create Values table");
29
30    // Insert
31    // 1 + .. + 11745 = 68978385
32    // avg(1, .., 11745) = 5873
33    let mut values = (1..11746).map(|value| Values {
34        id: value.into(),
35        value: value as u32,
36    });
37    loop {
38        let rows = values.by_ref().take(2000).collect::<Vec<_>>();
39        if rows.is_empty() {
40            break;
41        }
42        let result = Values::insert_many(executor, rows.iter()).await;
43        assert!(
44            result.is_ok(),
45            "Failed to Values::insert_many: {:?}",
46            result.unwrap_err()
47        );
48        let result = result.unwrap();
49        if let Some(affected) = result.rows_affected {
50            assert_eq!(
51                affected,
52                rows.len() as u64,
53                "Values::insert_many should have affected {} rows",
54                rows.len()
55            );
56        }
57    }
58
59    // SELECT COUNT(*), SUM(value)
60    {
61        let mut stream = pin!(
62            executor.fetch(
63                QueryBuilder::new()
64                    .select(cols!(COUNT(*), SUM(Values::value)))
65                    .from(Values::table())
66                    .where_expr(true)
67                    .build(&executor.driver())
68            )
69        );
70        let count = stream.next().await;
71        assert!(
72            stream.next().await.is_none(),
73            "The query is expected to return a single row"
74        );
75        let expected = (COUNT as i128, EXPECTED_SUM as i128);
76        let actual = match count {
77            Some(Ok(Row { values, .. })) => {
78                let a = i128::try_from_value((*values)[0].clone());
79                let b = i128::try_from_value((*values)[1].clone());
80                match (a, b) {
81                    (Ok(a), Ok(b)) => Some((a, b)),
82                    (Err(e), _) => panic!("{e}"),
83                    (_, Err(e)) => panic!("{e}"),
84                }
85            }
86            _ => None,
87        };
88        assert_eq!(
89            actual,
90            Some(expected),
91            "SELECT COUNT(*), SUM(value) is expected to return {:?}",
92            expected
93        );
94    }
95
96    // SELECT *
97    {
98        {
99            let stream = pin!(
100                executor.fetch(
101                    QueryBuilder::new()
102                        .select(cols!(*))
103                        .from(Values::table())
104                        .where_expr(true)
105                        .build(&executor.driver())
106                )
107            );
108            let values = stream
109                .map(|row| {
110                    let row = row.expect("Error while fetching the row");
111                    let i = row
112                        .names()
113                        .iter()
114                        .enumerate()
115                        .find_map(|(i, v)| if v == "value" { Some(i) } else { None })
116                        .expect("Column `value` is not present");
117                    u32::try_from_value(row.values[i].clone())
118                        .expect("The result could not be convert back to u32")
119                })
120                .collect::<BTreeSet<_>>()
121                .await;
122            assert!(
123                values.into_iter().eq((1..(COUNT + 1)).into_iter()),
124                "The result didn't received from the db contains all the values that were inserted"
125            );
126        }
127    }
128
129    // SELECT value WHERE value > ?
130    {
131        let mut query = Values::prepare_find(executor, expr!(Values::value > ?), None)
132            .await
133            .expect("Failed to prepare the query");
134        assert!(query.is_prepared());
135        query
136            .bind(EXPECTED_AVG)
137            .expect("Could not bind the parameter");
138        let values = executor
139            .fetch(&mut query)
140            .try_collect::<Vec<_>>()
141            .await
142            .expect("Could not fetch rows above average from the prepared statement");
143        assert!(query.is_prepared());
144        assert_eq!(values.len(), COUNT as usize / 2);
145        query
146            .clear_bindings()
147            .expect("Could not clear the bindings");
148        query.bind(0).expect("Could not bind a second time");
149        let values = executor
150            .fetch(&mut query)
151            .try_collect::<Vec<_>>()
152            .await
153            .expect("Could not fetch positive rows from the prepared statement");
154        assert_eq!(values.len(), COUNT as usize);
155    }
156}