lamellar/array/iterator/
consumer.rs1use crate::active_messaging::{LamellarArcLocalAm, SyncSend};
6use crate::lamellar_task_group::TaskGroupLocalAmHandle;
7use crate::lamellar_team::LamellarTeamRT;
8
9use parking_lot::Mutex;
10use rand::prelude::SliceRandom;
11use rand::thread_rng;
12use std::collections::VecDeque;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::Arc;
16
17#[derive(Clone, Debug)]
25pub(crate) struct IterWorkStealer {
26 pub(crate) range: Arc<Mutex<(usize, usize)>>, }
28
29impl IterWorkStealer {
30 fn set_range(&self, start: usize, end: usize) {
31 let mut range = self.range.lock();
32 range.0 = start;
33 range.1 = end;
34 }
35
36 fn next(&self) -> Option<usize> {
37 let mut range = self.range.lock();
38 let index = range.0;
39 range.0 += 1;
40 if range.0 <= range.1 {
41 Some(index)
42 } else {
43 None
44 }
45 }
46 fn steal(&self) -> Option<(usize, usize)> {
52 let mut range = self.range.lock();
53 let start = range.0;
54 let end = range.1;
55 if end > start && end - start > 2 {
56 let new_end = (start + end) / 2;
57 range.1 = new_end;
58 Some((new_end, end))
59 } else {
60 None
61 }
62 }
63}
64
65#[derive(Clone, Debug)]
66pub(crate) enum IterSchedule {
67 Static(usize, usize),
68 Dynamic(Arc<AtomicUsize>, usize),
69 Chunk(Vec<(usize, usize)>, Arc<AtomicUsize>),
70 WorkStealing(IterWorkStealer, Vec<IterWorkStealer>),
71}
72
73impl IterSchedule {
74 pub(crate) fn init_iter<I: IterConsumer>(&self, iter: I) -> IterScheduleIter<I> {
75 match self {
76 IterSchedule::Static(start, end) => {
77 IterScheduleIter::Static(iter.init(*start, end - start))
78 }
79 IterSchedule::Dynamic(cur_i, max_i) => {
80 IterScheduleIter::Dynamic(iter, cur_i.clone(), *max_i)
81 }
82 IterSchedule::Chunk(ranges, range_i) => {
83 IterScheduleIter::Chunk(iter.init(0, 0), ranges.clone(), range_i.clone())
84 }
85 IterSchedule::WorkStealing(range, siblings) => {
86 let (start, end) = *range.range.lock();
87 IterScheduleIter::WorkStealing(
88 iter.init(start, end - start),
89 range.clone(),
90 siblings.clone(),
91 )
92 }
93 }
94 }
95}
96
97pub(crate) enum IterScheduleIter<I: IterConsumer> {
98 Static(I),
99 Dynamic(I, Arc<AtomicUsize>, usize),
100 Chunk(I, Vec<(usize, usize)>, Arc<AtomicUsize>),
101 WorkStealing(I, IterWorkStealer, Vec<IterWorkStealer>),
102}
103
104impl<I: IterConsumer> Iterator for IterScheduleIter<I> {
105 type Item = I::Item;
106 fn next(&mut self) -> Option<Self::Item> {
107 match self {
108 IterScheduleIter::Static(iter) => iter.next(),
109 IterScheduleIter::Dynamic(iter, cur_i, max_i) => {
110 let mut ci = cur_i.fetch_add(1, Ordering::Relaxed);
111 while ci < *max_i {
112 *iter = iter.init(ci, 1);
114 if let Some(elem) = iter.next() {
115 return Some(elem);
116 }
117 ci = cur_i.fetch_add(1, Ordering::Relaxed);
118 }
119 None
120 }
121 IterScheduleIter::Chunk(iter, ranges, range_i) => {
122 let mut next = iter.next();
123 if next.is_none() {
125 let ri = range_i.fetch_add(1, Ordering::Relaxed);
126 if ri < ranges.len() {
128 *iter = iter.init(ranges[ri].0, ranges[ri].1 - ranges[ri].0);
129 next = iter.next();
130 }
131 }
132 next
133 }
134 IterScheduleIter::WorkStealing(iter, range, siblings) => {
135 let inner_next = |iter: &mut I| {
136 while let Some(ri) = range.next() {
137 *iter = iter.init(ri, 1);
138 if let Some(elem) = iter.next() {
139 return Some(elem);
140 }
141 }
145 None
146 };
147 let mut next = inner_next(iter);
148 if next.is_none() {
149 let mut rng = thread_rng();
150 let mut workers = (0..siblings.len()).collect::<Vec<usize>>();
151 workers.shuffle(&mut rng);
152 if let Some(worker) = workers.pop() {
153 if let Some((start, end)) = siblings[worker].steal() {
154 *iter = iter.init(start, end - start);
155 range.set_range(start, end);
156 next = inner_next(iter);
157 }
158 }
159 }
160 next
161 }
162 }
163 }
164}
165
166pub(crate) trait IterConsumer: SyncSend {
167 type AmOutput;
168 type Output;
169 type Item;
170 type Handle;
171 fn init(&self, start: usize, cnt: usize) -> Self;
172 fn next(&mut self) -> Option<Self::Item>;
173 fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm;
174 fn create_handle(
175 self,
176 team: Pin<Arc<LamellarTeamRT>>,
177 reqs: VecDeque<TaskGroupLocalAmHandle<Self::AmOutput>>,
178 ) -> Self::Handle;
179 fn max_elems(&self, in_elems: usize) -> usize;
180}