lamellar/array/iterator/
consumer.rs

//! Represents iterators that consume the elements.
//! Iterator consumers are what end up "driving" the iterator.
//!
4
5use crate::active_messaging::{LamellarArcLocalAm, SyncSend};
6use crate::lamellar_task_group::TaskGroupLocalAmHandle;
7use crate::lamellar_team::LamellarTeamRT;
8
9use parking_lot::Mutex;
10use rand::prelude::SliceRandom;
11use rand::thread_rng;
12use std::collections::VecDeque;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::Arc;
16
17// trait Consumer{
18//     type Item;
19//     fn init(&self, start: usize, cnt: usize, _s: Sealed) -> Self;
20//     fn monotonic(&self) -> Self;
21//     fn next(&self) -> Self::Item;
22// }
23
24#[derive(Clone, Debug)]
25pub(crate) struct IterWorkStealer {
26    pub(crate) range: Arc<Mutex<(usize, usize)>>, //start, end
27}
28
29impl IterWorkStealer {
30    fn set_range(&self, start: usize, end: usize) {
31        let mut range = self.range.lock();
32        range.0 = start;
33        range.1 = end;
34    }
35
36    fn next(&self) -> Option<usize> {
37        let mut range = self.range.lock();
38        let index = range.0;
39        range.0 += 1;
40        if range.0 <= range.1 {
41            Some(index)
42        } else {
43            None
44        }
45    }
46    // fn set_done(&self) {
47    //     let mut range = self.range.lock();
48    //     range.0 = range.1;
49    // }
50
51    fn steal(&self) -> Option<(usize, usize)> {
52        let mut range = self.range.lock();
53        let start = range.0;
54        let end = range.1;
55        if end > start && end - start > 2 {
56            let new_end = (start + end) / 2;
57            range.1 = new_end;
58            Some((new_end, end))
59        } else {
60            None
61        }
62    }
63}
64
65#[derive(Clone, Debug)]
66pub(crate) enum IterSchedule {
67    Static(usize, usize),
68    Dynamic(Arc<AtomicUsize>, usize),
69    Chunk(Vec<(usize, usize)>, Arc<AtomicUsize>),
70    WorkStealing(IterWorkStealer, Vec<IterWorkStealer>),
71}
72
73impl IterSchedule {
74    pub(crate) fn init_iter<I: IterConsumer>(&self, iter: I) -> IterScheduleIter<I> {
75        match self {
76            IterSchedule::Static(start, end) => {
77                IterScheduleIter::Static(iter.init(*start, end - start))
78            }
79            IterSchedule::Dynamic(cur_i, max_i) => {
80                IterScheduleIter::Dynamic(iter, cur_i.clone(), *max_i)
81            }
82            IterSchedule::Chunk(ranges, range_i) => {
83                IterScheduleIter::Chunk(iter.init(0, 0), ranges.clone(), range_i.clone())
84            }
85            IterSchedule::WorkStealing(range, siblings) => {
86                let (start, end) = *range.range.lock();
87                IterScheduleIter::WorkStealing(
88                    iter.init(start, end - start),
89                    range.clone(),
90                    siblings.clone(),
91                )
92            }
93        }
94    }
95}
96
97pub(crate) enum IterScheduleIter<I: IterConsumer> {
98    Static(I),
99    Dynamic(I, Arc<AtomicUsize>, usize),
100    Chunk(I, Vec<(usize, usize)>, Arc<AtomicUsize>),
101    WorkStealing(I, IterWorkStealer, Vec<IterWorkStealer>),
102}
103
104impl<I: IterConsumer> Iterator for IterScheduleIter<I> {
105    type Item = I::Item;
106    fn next(&mut self) -> Option<Self::Item> {
107        match self {
108            IterScheduleIter::Static(iter) => iter.next(),
109            IterScheduleIter::Dynamic(iter, cur_i, max_i) => {
110                let mut ci = cur_i.fetch_add(1, Ordering::Relaxed);
111                while ci < *max_i {
112                    // println!("ci {:?} maxi {:?} {:?}", ci, *max_i, std::thread::current().id());
113                    *iter = iter.init(ci, 1);
114                    if let Some(elem) = iter.next() {
115                        return Some(elem);
116                    }
117                    ci = cur_i.fetch_add(1, Ordering::Relaxed);
118                }
119                None
120            }
121            IterScheduleIter::Chunk(iter, ranges, range_i) => {
122                let mut next = iter.next();
123                // println!("next {:?} {:?}", next.is_none(), std::thread::current().id());
124                if next.is_none() {
125                    let ri = range_i.fetch_add(1, Ordering::Relaxed);
126                    // println!("range {:?} {:?}", ri, std::thread::current().id());
127                    if ri < ranges.len() {
128                        *iter = iter.init(ranges[ri].0, ranges[ri].1 - ranges[ri].0);
129                        next = iter.next();
130                    }
131                }
132                next
133            }
134            IterScheduleIter::WorkStealing(iter, range, siblings) => {
135                let inner_next = |iter: &mut I| {
136                    while let Some(ri) = range.next() {
137                        *iter = iter.init(ri, 1);
138                        if let Some(elem) = iter.next() {
139                            return Some(elem);
140                        }
141                        // else{
142                        //     range.set_done();
143                        // }
144                    }
145                    None
146                };
147                let mut next = inner_next(iter);
148                if next.is_none() {
149                    let mut rng = thread_rng();
150                    let mut workers = (0..siblings.len()).collect::<Vec<usize>>();
151                    workers.shuffle(&mut rng);
152                    if let Some(worker) = workers.pop() {
153                        if let Some((start, end)) = siblings[worker].steal() {
154                            *iter = iter.init(start, end - start);
155                            range.set_range(start, end);
156                            next = inner_next(iter);
157                        }
158                    }
159                }
160                next
161            }
162        }
163    }
164}
165
166pub(crate) trait IterConsumer: SyncSend {
167    type AmOutput;
168    type Output;
169    type Item;
170    type Handle;
171    fn init(&self, start: usize, cnt: usize) -> Self;
172    fn next(&mut self) -> Option<Self::Item>;
173    fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm;
174    fn create_handle(
175        self,
176        team: Pin<Arc<LamellarTeamRT>>,
177        reqs: VecDeque<TaskGroupLocalAmHandle<Self::AmOutput>>,
178    ) -> Self::Handle;
179    fn max_elems(&self, in_elems: usize) -> usize;
180}