tank_tests/
aggregates.rs

1use std::collections::BTreeSet;
2use std::{pin::pin, sync::LazyLock};
3use tank::{
4    AsValue, DataSet, Entity, Passive, RowLabeled, expr, stream::StreamExt, stream::TryStreamExt,
5};
6use tank::{Executor, cols};
7use tokio::sync::Mutex;
8
// Test entity for the aggregate tests; the test inserts one row per number.
// NOTE(review): plain `//` comments are used on purpose here; `///` doc comments may be
// consumed by the `Entity` derive (e.g. as table/column comments) -- confirm before converting.
#[derive(Default, Entity)]
struct Values {
    // Row identifier; the test sets it to the same number stored in `value`.
    id: Passive<u64>,
    /// This column contains the actual value
    value: u32,
}
// Sum of the inserted values: 1 + .. + 11745.
const EXPECTED_SUM: u32 = 68978385;
// Average of the inserted values: avg(1, .., 11745).
const EXPECTED_AVG: u32 = 5873;
// Number of rows inserted by the test (values 1 through 11745).
const COUNT: u32 = 11745;
// Lazily initialized async mutex that `aggregates` locks at its start.
static MUTEX: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
19
20pub async fn aggregates<E: Executor>(executor: &mut E) {
21    let _lock = MUTEX.lock();
22
23    // Setup
24    Values::drop_table(executor, true, false)
25        .await
26        .expect("Failed to drop Values table");
27    Values::create_table(executor, false, false)
28        .await
29        .expect("Failed to create Values table");
30
31    // Insert
32    // 1 + .. + 11745 = 68978385
33    // avg(1, .., 11745) = 5873
34    let mut values = (1..11746).map(|value| Values {
35        id: value.into(),
36        value: value as u32,
37    });
38    loop {
39        let rows = values.by_ref().take(2000).collect::<Vec<_>>();
40        if rows.is_empty() {
41            break;
42        }
43        let result = Values::insert_many(executor, rows.iter()).await;
44        assert!(
45            result.is_ok(),
46            "Failed to Values::insert_many: {:?}",
47            result.unwrap_err()
48        );
49        let result = result.unwrap();
50        if let Some(affected) = result.rows_affected {
51            assert_eq!(
52                affected,
53                rows.len() as u64,
54                "Values::insert_many should have affected {} rows",
55                rows.len()
56            );
57        }
58    }
59
60    // SELECT COUNT(*), SUM(value)
61    {
62        let mut stream = pin!(Values::table().select(
63            executor,
64            cols!(COUNT(*), SUM(Values::value)),
65            &true,
66            None
67        ));
68        let count = stream.next().await;
69        assert!(
70            stream.next().await.is_none(),
71            "The query is expected to return a single row"
72        );
73        let expected = (COUNT as i128, EXPECTED_SUM as i128);
74        let actual = match count {
75            Some(Ok(RowLabeled { values, .. })) => {
76                let a = i128::try_from_value((*values)[0].clone());
77                let b = i128::try_from_value((*values)[1].clone());
78                match (a, b) {
79                    (Ok(a), Ok(b)) => Some((a, b)),
80                    (Err(e), _) => panic!("{e}"),
81                    (_, Err(e)) => panic!("{e}"),
82                }
83            }
84            _ => None,
85        };
86        assert_eq!(
87            actual,
88            Some(expected),
89            "SELECT COUNT(*), SUM(value) is expected to return {:?}",
90            expected
91        );
92    }
93
94    // SELECT *
95    {
96        {
97            let stream = pin!(Values::table().select(executor, cols!(*), &true, None));
98            let values = stream
99                .map(|row| {
100                    let row = row.expect("Error while fetching the row");
101                    let i = row
102                        .names()
103                        .iter()
104                        .enumerate()
105                        .find_map(|(i, v)| if v == "value" { Some(i) } else { None })
106                        .expect("Column `value` is not present");
107                    u32::try_from_value(row.values[i].clone())
108                        .expect("The result could not be convert back to u32")
109                })
110                .collect::<BTreeSet<_>>()
111                .await;
112            assert!(
113                values.into_iter().eq((1..(COUNT + 1)).into_iter()),
114                "The result didn't received from the db contains all the values that were inserted"
115            );
116        }
117    }
118
119    // SELECT value WHERE value > ?
120    {
121        let mut query = Values::prepare_find(executor, &expr!(Values::value > ?), None)
122            .await
123            .expect("Failed to prepare the query");
124        assert!(query.is_prepared());
125        query
126            .bind(EXPECTED_AVG)
127            .expect("Could not bind the parameter");
128        let values = executor
129            .fetch(&mut query)
130            .try_collect::<Vec<_>>()
131            .await
132            .expect("Could not fetch rows above average from the prepared statement");
133        assert!(query.is_prepared());
134        assert_eq!(values.len(), COUNT as usize / 2);
135        query
136            .clear_bindings()
137            .expect("Could not clear the bindings");
138        query.bind(0).expect("Could not bind a second time");
139        let values = executor
140            .fetch(&mut query)
141            .try_collect::<Vec<_>>()
142            .await
143            .expect("Could not fetch positive rows from the prepared statement");
144        assert_eq!(values.len(), COUNT as usize);
145    }
146}