raphtory/core/state/
mod.rs

1pub mod accumulator_id;
2pub mod agg;
3pub mod compute_state;
4pub mod container;
5pub mod morcel_state;
6pub mod shuffle_state;
7
8pub trait StateType: PartialEq + Clone + std::fmt::Debug + Send + Sync + 'static {}
9
10impl<T: PartialEq + Clone + std::fmt::Debug + Send + Sync + 'static> StateType for T {}
11
12#[cfg(test)]
13mod state_test {
14    use itertools::Itertools;
15    use quickcheck_macros::quickcheck;
16    use rand::Rng;
17
18    use crate::{
19        core::state::{
20            accumulator_id::accumulators, compute_state::ComputeStateVec, container::merge_2_vecs,
21            morcel_state::MorcelComputeState, shuffle_state::ShuffleComputeState,
22        },
23        db::{api::mutation::AdditionOps, graph::graph::Graph},
24        prelude::NO_PROPS,
25    };
26
27    #[quickcheck]
28    fn check_merge_2_vecs(mut a: Vec<usize>, b: Vec<usize>) {
29        let len_a = a.len();
30        let len_b = b.len();
31
32        merge_2_vecs(&mut a, &b, |ia, ib| *ia = usize::max(*ia, *ib));
33
34        assert_eq!(a.len(), usize::max(len_a, len_b));
35
36        for (i, expected) in a.iter().enumerate() {
37            match (a.get(i), b.get(i)) {
38                (Some(va), Some(vb)) => assert_eq!(*expected, usize::max(*va, *vb)),
39                (Some(va), None) => assert_eq!(*expected, *va),
40                (None, Some(vb)) => assert_eq!(*expected, *vb),
41                (None, None) => panic!("value should exist in either a or b"),
42            }
43        }
44    }
45
46    fn tiny_graph() -> Graph {
47        let g = Graph::new();
48
49        g.add_node(1, 1, NO_PROPS, None).unwrap();
50        g.add_node(1, 2, NO_PROPS, None).unwrap();
51        g.add_node(1, 3, NO_PROPS, None).unwrap();
52        g
53    }
54
55    #[test]
56    fn min_aggregates_for_3_keys() {
57        let g = tiny_graph();
58
59        let min = accumulators::min(0);
60
61        let mut state_map: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
62
63        // create random vec of numbers
64        let mut rng = rand::thread_rng();
65        let mut vec = vec![];
66        let mut actual_min = i32::MAX;
67        for _ in 0..100 {
68            let i = rng.gen_range(0..100);
69            actual_min = actual_min.min(i);
70            vec.push(i);
71        }
72
73        for a in vec {
74            state_map.accumulate_into(0, 0, a, &min);
75            state_map.accumulate_into(0, 1, a, &min);
76            state_map.accumulate_into(0, 2, a, &min);
77        }
78
79        let mut actual = state_map.finalize(0, &min, &g).into_iter().collect_vec();
80        actual.sort();
81        assert_eq!(
82            actual,
83            vec![(0, actual_min), (1, actual_min), (2, actual_min),]
84        );
85    }
86
87    #[test]
88    fn avg_aggregates_for_3_keys() {
89        let g = tiny_graph();
90
91        let avg = accumulators::avg(0);
92
93        let mut state_map: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
94
95        // create random vec of numbers
96        let mut rng = rand::thread_rng();
97        let mut vec = vec![];
98        let mut sum = 0;
99        for _ in 0..100 {
100            let i = rng.gen_range(0..100);
101            sum += i;
102            vec.push(i);
103        }
104
105        for a in vec {
106            state_map.accumulate_into(0, 0, a, &avg);
107            state_map.accumulate_into(0, 1, a, &avg);
108            state_map.accumulate_into(0, 2, a, &avg);
109        }
110
111        let actual_avg = sum / 100;
112        let mut actual = state_map.finalize(0, &avg, &g).into_iter().collect_vec();
113        actual.sort();
114        assert_eq!(
115            actual,
116            vec![(0, actual_avg), (1, actual_avg), (2, actual_avg),]
117        );
118    }
119
120    #[test]
121    fn top3_aggregates_for_3_keys() {
122        let g = tiny_graph();
123
124        let top3 = accumulators::topk::<i32, 3>(0);
125
126        let mut state_map: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
127
128        for a in 0..100 {
129            state_map.accumulate_into(0, 0, a, &top3);
130            state_map.accumulate_into(0, 1, a, &top3);
131            state_map.accumulate_into(0, 2, a, &top3);
132        }
133        let expected = vec![99, 98, 97];
134
135        let mut actual = state_map.finalize(0, &top3, &g).into_iter().collect_vec();
136
137        actual.sort();
138
139        assert_eq!(
140            actual,
141            vec![
142                (0, expected.clone()),
143                (1, expected.clone()),
144                (2, expected.clone()),
145            ]
146        );
147    }
148
149    #[test]
150    fn sum_aggregates_for_3_keys() {
151        let g = tiny_graph();
152
153        let sum = accumulators::sum(0);
154
155        let mut state: MorcelComputeState<ComputeStateVec> = MorcelComputeState::new(3);
156
157        // create random vec of numbers
158        let mut rng = rand::thread_rng();
159        let mut vec = vec![];
160        let mut actual_sum = 0;
161        for _ in 0..100 {
162            let i = rng.gen_range(0..100);
163            actual_sum += i;
164            vec.push(i);
165        }
166
167        for a in vec {
168            state.accumulate_into(0, 0, a, &sum);
169            state.accumulate_into(0, 1, a, &sum);
170            state.accumulate_into(0, 2, a, &sum);
171        }
172
173        let mut actual = state.finalize(0, &sum, &g).into_iter().collect_vec();
174        actual.sort();
175        assert_eq!(
176            actual,
177            vec![(0, actual_sum), (1, actual_sum), (2, actual_sum),]
178        );
179    }
180
181    #[test]
182    fn sum_aggregates_for_3_keys_2_parts() {
183        let sum = accumulators::sum(0);
184
185        let mut part1_state: ShuffleComputeState<ComputeStateVec> =
186            ShuffleComputeState::new(3, 2, 2);
187        let mut part2_state: ShuffleComputeState<ComputeStateVec> =
188            ShuffleComputeState::new(3, 2, 2);
189
190        // create random vec of numbers
191        let mut rng = rand::thread_rng();
192        let mut vec1 = vec![];
193        let mut vec2 = vec![];
194        let mut actual_sum_1 = 0;
195        let mut actual_sum_2 = 0;
196        for _ in 0..3 {
197            // data for first partition
198            let i = rng.gen_range(0..100);
199            actual_sum_1 += i;
200            vec1.push(i);
201
202            // data for second partition
203            let i = rng.gen_range(0..100);
204            actual_sum_2 += i;
205            vec2.push(i);
206        }
207
208        // 1 gets all the numbers
209        // 2 gets the numbers from part1
210        // 3 gets the numbers from part2
211        for a in vec1 {
212            part1_state.accumulate_into(0, 0, a, &sum);
213            part1_state.accumulate_into(0, 1, a, &sum);
214        }
215
216        for a in vec2 {
217            part2_state.accumulate_into(0, 0, a, &sum);
218            part2_state.accumulate_into(0, 2, a, &sum);
219        }
220
221        let actual = part1_state.iter_out(0, sum).collect_vec();
222
223        assert_eq!(actual, vec![(0, actual_sum_1), (1, actual_sum_1), (2, 0)]);
224
225        let actual = part2_state.iter_out(0, sum).collect_vec();
226
227        assert_eq!(actual, vec![(0, actual_sum_2), (1, 0), (2, actual_sum_2)]);
228
229        ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, sum, 0);
230
231        let actual = part1_state.iter_out(0, sum).collect_vec();
232
233        assert_eq!(
234            actual,
235            vec![
236                (0, actual_sum_1 + actual_sum_2),
237                (1, actual_sum_1),
238                (2, actual_sum_2),
239            ]
240        );
241    }
242
243    #[test]
244    fn min_sum_aggregates_for_3_keys_2_parts() {
245        let g = tiny_graph();
246
247        let sum = accumulators::sum(0);
248        let min = accumulators::min(1);
249
250        let mut part1_state: ShuffleComputeState<ComputeStateVec> =
251            ShuffleComputeState::new(3, 2, 2);
252        let mut part2_state: ShuffleComputeState<ComputeStateVec> =
253            ShuffleComputeState::new(3, 2, 2);
254
255        // create random vec of numbers
256        let mut rng = rand::thread_rng();
257        let mut vec1 = vec![];
258        let mut vec2 = vec![];
259        let mut actual_sum_1 = 0;
260        let mut actual_sum_2 = 0;
261        let mut actual_min_1 = 100;
262        let mut actual_min_2 = 100;
263        for _ in 0..100 {
264            // data for first partition
265            let i = rng.gen_range(0..100);
266            actual_sum_1 += i;
267            actual_min_1 = actual_min_1.min(i);
268            vec1.push(i);
269
270            // data for second partition
271            let i = rng.gen_range(0..100);
272            actual_sum_2 += i;
273            actual_min_2 = actual_min_2.min(i);
274            vec2.push(i);
275        }
276
277        // 1 gets all the numbers
278        // 2 gets the numbers from part1
279        // 3 gets the numbers from part2
280        for a in vec1 {
281            part1_state.accumulate_into(0, 0, a, &sum);
282            part1_state.accumulate_into(0, 1, a, &sum);
283            part1_state.accumulate_into(0, 0, a, &min);
284            part1_state.accumulate_into(0, 1, a, &min);
285        }
286
287        for a in vec2 {
288            part2_state.accumulate_into(0, 0, a, &sum);
289            part2_state.accumulate_into(0, 2, a, &sum);
290            part2_state.accumulate_into(0, 0, a, &min);
291            part2_state.accumulate_into(0, 2, a, &min);
292        }
293
294        let mut actual = part1_state
295            .clone()
296            .finalize(&sum, 0, &g, |c| c)
297            .into_iter()
298            .collect_vec();
299
300        actual.sort();
301
302        assert_eq!(actual, vec![(0, actual_sum_1), (1, actual_sum_1), (2, 0),]);
303
304        let mut actual = part1_state
305            .clone()
306            .finalize(&min, 0, &g, |c| c)
307            .into_iter()
308            .collect_vec();
309
310        actual.sort();
311
312        assert_eq!(
313            actual,
314            vec![(0, actual_min_1), (1, actual_min_1), (2, i32::MAX),]
315        );
316
317        let mut actual = part2_state
318            .clone()
319            .finalize(&sum, 0, &g, |c| c)
320            .into_iter()
321            .collect_vec();
322
323        actual.sort();
324
325        assert_eq!(actual, vec![(0, actual_sum_2), (1, 0), (2, actual_sum_2),]);
326
327        let mut actual = part2_state
328            .clone()
329            .finalize(&min, 0, &g, |c| c)
330            .into_iter()
331            .collect_vec();
332
333        actual.sort();
334
335        assert_eq!(
336            actual,
337            vec![(0, actual_min_2), (1, i32::MAX), (2, actual_min_2),]
338        );
339
340        ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, sum, 0);
341        let mut actual = part1_state
342            .clone()
343            .finalize(&sum, 0, &g, |c| c)
344            .into_iter()
345            .collect_vec();
346
347        actual.sort();
348
349        assert_eq!(
350            actual,
351            vec![
352                (0, actual_sum_1 + actual_sum_2),
353                (1, actual_sum_1),
354                (2, actual_sum_2),
355            ]
356        );
357
358        ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, min, 0);
359        let mut actual = part1_state
360            .clone()
361            .finalize(&min, 0, &g, |c| c)
362            .into_iter()
363            .collect_vec();
364
365        actual.sort();
366
367        assert_eq!(
368            actual,
369            vec![
370                (0, actual_min_1.min(actual_min_2)),
371                (1, actual_min_1),
372                (2, actual_min_2),
373            ]
374        );
375    }
376}