async_ecs/join/
parallel.rs

1use std::iter::Iterator;
2
3use asparit::{Consumer, Executor, ParallelIterator, Producer, Reducer, WithSetup};
4
5use crate::misc::{BitIter, BitProducer};
6
7use super::Join;
8
9/* JoinParIter */
10
/// A [`ParallelIterator`] adapter over a joined group of storages.
///
/// The wrapper only stores the joinable value it is given; nothing is opened
/// or iterated until the parallel iterator is driven.
pub struct JoinParIter<J>(J);

impl<J> JoinParIter<J> {
    /// Wraps `inner` so that it can be driven as a parallel iterator.
    pub fn new(inner: J) -> Self {
        JoinParIter(inner)
    }
}
19
20impl<'a, J> ParallelIterator<'a> for JoinParIter<J>
21where
22    J: Join + Send + 'a,
23    J::Type: Send,
24    J::Value: Copy + Send,
25    J::Mask: Copy + Send + Sync,
26{
27    type Item = J::Type;
28
29    fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
30    where
31        E: Executor<'a, D>,
32        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
33        D: Send + 'a,
34        R: Reducer<D> + Send + 'a,
35    {
36        let (keys, values) = unsafe { self.0.open() };
37
38        let keys = BitIter::new(keys);
39
40        let producer = BitProducer::new(keys);
41        let producer = JoinProducer::<J>::new(producer, values);
42
43        executor.exec(producer, consumer)
44    }
45}
46
47/* JoinProducer */
48
49struct JoinProducer<J>
50where
51    J: Join,
52{
53    keys: BitProducer<J::Mask>,
54    values: J::Value,
55}
56
57impl<J> JoinProducer<J>
58where
59    J: Join,
60{
61    fn new(keys: BitProducer<J::Mask>, values: J::Value) -> Self {
62        JoinProducer { keys, values }
63    }
64}
65
66unsafe impl<J> Send for JoinProducer<J>
67where
68    J: Join + Send,
69    J::Type: Send,
70    J::Value: Send,
71    J::Mask: Send + Sync,
72{
73}
74
75impl<J> WithSetup for JoinProducer<J> where J: Join {}
76
77impl<J> Producer for JoinProducer<J>
78where
79    J: Join + Send,
80    J::Type: Send,
81    J::Value: Copy + Send,
82    J::Mask: Copy + Send + Sync,
83{
84    type Item = J::Type;
85    type IntoIter = JoinIter<J>;
86
87    fn into_iter(self) -> Self::IntoIter {
88        JoinIter {
89            keys: self.keys.into_iter(),
90            values: self.values,
91        }
92    }
93
94    fn split(self) -> (Self, Option<Self>) {
95        let values = self.values;
96        let (left, right) = self.keys.split();
97
98        let left = JoinProducer::new(left, values);
99        let right = right.map(|right| JoinProducer::new(right, values));
100
101        (left, right)
102    }
103}
104
105/* JoinIter */
106
107struct JoinIter<J>
108where
109    J: Join,
110{
111    keys: BitIter<J::Mask>,
112    values: J::Value,
113}
114
115impl<J> Iterator for JoinIter<J>
116where
117    J: Join,
118{
119    type Item = J::Type;
120
121    fn next(&mut self) -> Option<Self::Item> {
122        let index = self.keys.next()?;
123        let value = unsafe { J::get(&mut self.values, index) };
124
125        Some(value)
126    }
127
128    fn size_hint(&self) -> (usize, Option<usize>) {
129        self.keys.size_hint()
130    }
131}