1pub mod accumulator_id;
2pub mod agg;
3pub mod compute_state;
4pub mod container;
5pub mod morcel_state;
6pub mod shuffle_state;
7
/// Marker trait bundling the bounds `PartialEq + Clone + Debug + Send + Sync + 'static`.
///
/// It declares no methods and is blanket-implemented below for every type that
/// satisfies those bounds, so it never needs (and cannot take) a manual impl.
// NOTE(review): presumably the bound required of values held by the state
// containers in the sibling modules — confirm at the use sites.
pub trait StateType: PartialEq + Clone + std::fmt::Debug + Send + Sync + 'static {}

/// Blanket implementation: any type meeting the supertrait bounds is a `StateType`.
impl<T> StateType for T where T: PartialEq + Clone + std::fmt::Debug + Send + Sync + 'static {}
11
12#[cfg(test)]
13mod state_test {
14 use itertools::Itertools;
15 use quickcheck_macros::quickcheck;
16 use rand::Rng;
17
18 use crate::{
19 core::state::{
20 accumulator_id::accumulators, compute_state::ComputeStateVec, container::merge_2_vecs,
21 morcel_state::MorcelComputeState, shuffle_state::ShuffleComputeState,
22 },
23 db::{api::mutation::AdditionOps, graph::graph::Graph},
24 prelude::NO_PROPS,
25 };
26
27 #[quickcheck]
28 fn check_merge_2_vecs(mut a: Vec<usize>, b: Vec<usize>) {
29 let len_a = a.len();
30 let len_b = b.len();
31
32 merge_2_vecs(&mut a, &b, |ia, ib| *ia = usize::max(*ia, *ib));
33
34 assert_eq!(a.len(), usize::max(len_a, len_b));
35
36 for (i, expected) in a.iter().enumerate() {
37 match (a.get(i), b.get(i)) {
38 (Some(va), Some(vb)) => assert_eq!(*expected, usize::max(*va, *vb)),
39 (Some(va), None) => assert_eq!(*expected, *va),
40 (None, Some(vb)) => assert_eq!(*expected, *vb),
41 (None, None) => panic!("value should exist in either a or b"),
42 }
43 }
44 }
45
46 fn tiny_graph() -> Graph {
47 let g = Graph::new();
48
49 g.add_node(1, 1, NO_PROPS, None).unwrap();
50 g.add_node(1, 2, NO_PROPS, None).unwrap();
51 g.add_node(1, 3, NO_PROPS, None).unwrap();
52 g
53 }
54
55 #[test]
56 fn min_aggregates_for_3_keys() {
57 let g = tiny_graph();
58
59 let min = accumulators::min(0);
60
61 let mut state_map: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
62
63 let mut rng = rand::thread_rng();
65 let mut vec = vec![];
66 let mut actual_min = i32::MAX;
67 for _ in 0..100 {
68 let i = rng.gen_range(0..100);
69 actual_min = actual_min.min(i);
70 vec.push(i);
71 }
72
73 for a in vec {
74 state_map.accumulate_into(0, 0, a, &min);
75 state_map.accumulate_into(0, 1, a, &min);
76 state_map.accumulate_into(0, 2, a, &min);
77 }
78
79 let mut actual = state_map.finalize(0, &min, &g).into_iter().collect_vec();
80 actual.sort();
81 assert_eq!(
82 actual,
83 vec![(0, actual_min), (1, actual_min), (2, actual_min),]
84 );
85 }
86
87 #[test]
88 fn avg_aggregates_for_3_keys() {
89 let g = tiny_graph();
90
91 let avg = accumulators::avg(0);
92
93 let mut state_map: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
94
95 let mut rng = rand::thread_rng();
97 let mut vec = vec![];
98 let mut sum = 0;
99 for _ in 0..100 {
100 let i = rng.gen_range(0..100);
101 sum += i;
102 vec.push(i);
103 }
104
105 for a in vec {
106 state_map.accumulate_into(0, 0, a, &avg);
107 state_map.accumulate_into(0, 1, a, &avg);
108 state_map.accumulate_into(0, 2, a, &avg);
109 }
110
111 let actual_avg = sum / 100;
112 let mut actual = state_map.finalize(0, &avg, &g).into_iter().collect_vec();
113 actual.sort();
114 assert_eq!(
115 actual,
116 vec![(0, actual_avg), (1, actual_avg), (2, actual_avg),]
117 );
118 }
119
120 #[test]
121 fn top3_aggregates_for_3_keys() {
122 let g = tiny_graph();
123
124 let top3 = accumulators::topk::<i32, 3>(0);
125
126 let mut state_map: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
127
128 for a in 0..100 {
129 state_map.accumulate_into(0, 0, a, &top3);
130 state_map.accumulate_into(0, 1, a, &top3);
131 state_map.accumulate_into(0, 2, a, &top3);
132 }
133 let expected = vec![99, 98, 97];
134
135 let mut actual = state_map.finalize(0, &top3, &g).into_iter().collect_vec();
136
137 actual.sort();
138
139 assert_eq!(
140 actual,
141 vec![
142 (0, expected.clone()),
143 (1, expected.clone()),
144 (2, expected.clone()),
145 ]
146 );
147 }
148
149 #[test]
150 fn sum_aggregates_for_3_keys() {
151 let g = tiny_graph();
152
153 let sum = accumulators::sum(0);
154
155 let mut state: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
156
157 let mut rng = rand::thread_rng();
159 let mut vec = vec![];
160 let mut actual_sum = 0;
161 for _ in 0..100 {
162 let i = rng.gen_range(0..100);
163 actual_sum += i;
164 vec.push(i);
165 }
166
167 for a in vec {
168 state.accumulate_into(0, 0, a, &sum);
169 state.accumulate_into(0, 1, a, &sum);
170 state.accumulate_into(0, 2, a, &sum);
171 }
172
173 let mut actual = state.finalize(0, &sum, &g).into_iter().collect_vec();
174 actual.sort();
175 assert_eq!(
176 actual,
177 vec![(0, actual_sum), (1, actual_sum), (2, actual_sum),]
178 );
179 }
180
181 #[test]
182 fn sum_aggregates_for_3_keys_2_parts() {
183 let sum = accumulators::sum(0);
184
185 let mut part1_state: ShuffleComputeState<ComputeStateVec> =
186 ShuffleComputeState::new(3, 2, 2);
187 let mut part2_state: ShuffleComputeState<ComputeStateVec> =
188 ShuffleComputeState::new(3, 2, 2);
189
190 let mut rng = rand::thread_rng();
192 let mut vec1 = vec![];
193 let mut vec2 = vec![];
194 let mut actual_sum_1 = 0;
195 let mut actual_sum_2 = 0;
196 for _ in 0..3 {
197 let i = rng.gen_range(0..100);
199 actual_sum_1 += i;
200 vec1.push(i);
201
202 let i = rng.gen_range(0..100);
204 actual_sum_2 += i;
205 vec2.push(i);
206 }
207
208 for a in vec1 {
212 part1_state.accumulate_into(0, 0, a, &sum);
213 part1_state.accumulate_into(0, 1, a, &sum);
214 }
215
216 for a in vec2 {
217 part2_state.accumulate_into(0, 0, a, &sum);
218 part2_state.accumulate_into(0, 2, a, &sum);
219 }
220
221 let actual = part1_state.iter_out(0, sum).collect_vec();
222
223 assert_eq!(actual, vec![(0, actual_sum_1), (1, actual_sum_1), (2, 0)]);
224
225 let actual = part2_state.iter_out(0, sum).collect_vec();
226
227 assert_eq!(actual, vec![(0, actual_sum_2), (1, 0), (2, actual_sum_2)]);
228
229 ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, sum, 0);
230
231 let actual = part1_state.iter_out(0, sum).collect_vec();
232
233 assert_eq!(
234 actual,
235 vec![
236 (0, actual_sum_1 + actual_sum_2),
237 (1, actual_sum_1),
238 (2, actual_sum_2),
239 ]
240 );
241 }
242
243 #[test]
244 fn min_sum_aggregates_for_3_keys_2_parts() {
245 let g = tiny_graph();
246
247 let sum = accumulators::sum(0);
248 let min = accumulators::min(1);
249
250 let mut part1_state: ShuffleComputeState<ComputeStateVec> =
251 ShuffleComputeState::new(3, 2, 2);
252 let mut part2_state: ShuffleComputeState<ComputeStateVec> =
253 ShuffleComputeState::new(3, 2, 2);
254
255 let mut rng = rand::thread_rng();
257 let mut vec1 = vec![];
258 let mut vec2 = vec![];
259 let mut actual_sum_1 = 0;
260 let mut actual_sum_2 = 0;
261 let mut actual_min_1 = 100;
262 let mut actual_min_2 = 100;
263 for _ in 0..100 {
264 let i = rng.gen_range(0..100);
266 actual_sum_1 += i;
267 actual_min_1 = actual_min_1.min(i);
268 vec1.push(i);
269
270 let i = rng.gen_range(0..100);
272 actual_sum_2 += i;
273 actual_min_2 = actual_min_2.min(i);
274 vec2.push(i);
275 }
276
277 for a in vec1 {
281 part1_state.accumulate_into(0, 0, a, &sum);
282 part1_state.accumulate_into(0, 1, a, &sum);
283 part1_state.accumulate_into(0, 0, a, &min);
284 part1_state.accumulate_into(0, 1, a, &min);
285 }
286
287 for a in vec2 {
288 part2_state.accumulate_into(0, 0, a, &sum);
289 part2_state.accumulate_into(0, 2, a, &sum);
290 part2_state.accumulate_into(0, 0, a, &min);
291 part2_state.accumulate_into(0, 2, a, &min);
292 }
293
294 let mut actual = part1_state
295 .clone()
296 .finalize(&sum, 0, &g, |c| c)
297 .into_iter()
298 .collect_vec();
299
300 actual.sort();
301
302 assert_eq!(actual, vec![(0, actual_sum_1), (1, actual_sum_1), (2, 0),]);
303
304 let mut actual = part1_state
305 .clone()
306 .finalize(&min, 0, &g, |c| c)
307 .into_iter()
308 .collect_vec();
309
310 actual.sort();
311
312 assert_eq!(
313 actual,
314 vec![(0, actual_min_1), (1, actual_min_1), (2, i32::MAX),]
315 );
316
317 let mut actual = part2_state
318 .clone()
319 .finalize(&sum, 0, &g, |c| c)
320 .into_iter()
321 .collect_vec();
322
323 actual.sort();
324
325 assert_eq!(actual, vec![(0, actual_sum_2), (1, 0), (2, actual_sum_2),]);
326
327 let mut actual = part2_state
328 .clone()
329 .finalize(&min, 0, &g, |c| c)
330 .into_iter()
331 .collect_vec();
332
333 actual.sort();
334
335 assert_eq!(
336 actual,
337 vec![(0, actual_min_2), (1, i32::MAX), (2, actual_min_2),]
338 );
339
340 ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, sum, 0);
341 let mut actual = part1_state
342 .clone()
343 .finalize(&sum, 0, &g, |c| c)
344 .into_iter()
345 .collect_vec();
346
347 actual.sort();
348
349 assert_eq!(
350 actual,
351 vec![
352 (0, actual_sum_1 + actual_sum_2),
353 (1, actual_sum_1),
354 (2, actual_sum_2),
355 ]
356 );
357
358 ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, min, 0);
359 let mut actual = part1_state
360 .clone()
361 .finalize(&min, 0, &g, |c| c)
362 .into_iter()
363 .collect_vec();
364
365 actual.sort();
366
367 assert_eq!(
368 actual,
369 vec![
370 (0, actual_min_1.min(actual_min_2)),
371 (1, actual_min_1),
372 (2, actual_min_2),
373 ]
374 );
375 }
376}