1use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7
8pub struct NoLock;
12
13impl NoLock {
14 pub fn new() -> Self {
16 Self
17 }
18
19 pub fn lock(&self) -> NoLockGuard {
21 NoLockGuard
22 }
23}
24
25impl Default for NoLock {
26 fn default() -> Self {
27 Self::new()
28 }
29}
30
31pub struct NoLockGuard;
33
34pub fn batch_iterate<T, I>(size: Option<usize>, iterable: I) -> BatchIterator<T, I::IntoIter>
55where
56 I: IntoIterator<Item = T>,
57{
58 BatchIterator {
59 size,
60 iter: iterable.into_iter(),
61 }
62}
63
64pub struct BatchIterator<T, I>
66where
67 I: Iterator<Item = T>,
68{
69 size: Option<usize>,
70 iter: I,
71}
72
73impl<T, I> Iterator for BatchIterator<T, I>
74where
75 I: Iterator<Item = T>,
76{
77 type Item = Vec<T>;
78
79 fn next(&mut self) -> Option<Self::Item> {
80 match self.size {
81 Some(size) => {
82 let batch: Vec<T> = self.iter.by_ref().take(size).collect();
83 if batch.is_empty() { None } else { Some(batch) }
84 }
85 None => {
86 let batch: Vec<T> = self.iter.by_ref().collect();
87 if batch.is_empty() { None } else { Some(batch) }
88 }
89 }
90 }
91}
92
93pub fn tee<T, I>(iterable: I, n: usize) -> Tee<T>
120where
121 T: Clone,
122 I: IntoIterator<Item = T>,
123 <I as IntoIterator>::IntoIter: Send + 'static,
124{
125 Tee::new(iterable, n)
126}
127
128pub struct Tee<T> {
130 source: Arc<Mutex<TeeSource<T>>>,
131 children: Vec<TeeChild<T>>,
132}
133
134struct TeeSource<T> {
135 iter: Box<dyn Iterator<Item = T> + Send>,
136 buffers: Vec<VecDeque<T>>,
137}
138
139impl<T> Tee<T>
140where
141 T: Clone,
142{
143 pub fn new<I>(iterable: I, n: usize) -> Self
145 where
146 I: IntoIterator<Item = T>,
147 <I as IntoIterator>::IntoIter: Send + 'static,
148 {
149 let iter: Box<dyn Iterator<Item = T> + Send> = Box::new(iterable.into_iter());
150 let buffers: Vec<VecDeque<T>> = (0..n).map(|_| VecDeque::new()).collect();
151
152 let source = Arc::new(Mutex::new(TeeSource { iter, buffers }));
153
154 let children: Vec<TeeChild<T>> = (0..n)
155 .map(|index| TeeChild {
156 source: Arc::clone(&source),
157 index,
158 })
159 .collect();
160
161 Self { source, children }
162 }
163
164 pub fn len(&self) -> usize {
166 self.children.len()
167 }
168
169 pub fn is_empty(&self) -> bool {
171 self.children.is_empty()
172 }
173
174 pub fn get(&self, index: usize) -> Option<TeeChild<T>> {
176 if index < self.children.len() {
177 Some(TeeChild {
178 source: Arc::clone(&self.source),
179 index,
180 })
181 } else {
182 None
183 }
184 }
185
186 pub fn into_children(self) -> Vec<TeeChild<T>> {
188 self.children
189 }
190}
191
192pub struct TeeChild<T> {
194 source: Arc<Mutex<TeeSource<T>>>,
195 index: usize,
196}
197
198impl<T> Clone for TeeChild<T> {
199 fn clone(&self) -> Self {
200 Self {
201 source: Arc::clone(&self.source),
202 index: self.index,
203 }
204 }
205}
206
207impl<T: Clone> Iterator for TeeChild<T> {
208 type Item = T;
209
210 fn next(&mut self) -> Option<Self::Item> {
211 let mut source = self.source.lock().ok()?;
212
213 if let Some(item) = source.buffers.get_mut(self.index)?.pop_front() {
214 return Some(item);
215 }
216
217 if let Some(item) = source.iter.next() {
218 for (i, buffer) in source.buffers.iter_mut().enumerate() {
219 if i != self.index {
220 buffer.push_back(item.clone());
221 }
222 }
223 Some(item)
224 } else {
225 None
226 }
227 }
228}
229
230pub fn safetee<T, I>(iterable: I, n: usize) -> Tee<T>
232where
233 T: Clone,
234 I: IntoIterator<Item = T>,
235 <I as IntoIterator>::IntoIter: Send + 'static,
236{
237 tee(iterable, n)
238}
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243
244 #[test]
245 fn test_batch_iterate() {
246 let items = vec![1, 2, 3, 4, 5];
247 let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
248 assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
249 }
250
251 #[test]
252 fn test_batch_iterate_exact() {
253 let items = vec![1, 2, 3, 4];
254 let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
255 assert_eq!(batches, vec![vec![1, 2], vec![3, 4]]);
256 }
257
258 #[test]
259 fn test_batch_iterate_empty() {
260 let items: Vec<i32> = vec![];
261 let batches: Vec<Vec<i32>> = batch_iterate(Some(2), items).collect();
262 assert!(batches.is_empty());
263 }
264
265 #[test]
266 fn test_batch_iterate_none_size() {
267 let items = vec![1, 2, 3, 4, 5];
268 let batches: Vec<Vec<i32>> = batch_iterate(None, items).collect();
269 assert_eq!(batches, vec![vec![1, 2, 3, 4, 5]]);
270 }
271
272 #[test]
273 fn test_tee_basic() {
274 let items = vec![1, 2, 3];
275 let t = tee(items, 2);
276
277 assert_eq!(t.len(), 2);
278
279 let children = t.into_children();
280 let first: Vec<i32> = children[0].clone().collect();
281 let second: Vec<i32> = children[1].clone().collect();
282
283 assert_eq!(first, vec![1, 2, 3]);
284 assert_eq!(second, vec![1, 2, 3]);
285 }
286
287 #[test]
288 fn test_tee_interleaved() {
289 let items = vec![1, 2, 3];
290 let t = tee(items, 2);
291 let mut children = t.into_children();
292
293 assert_eq!(children[0].next(), Some(1));
294 assert_eq!(children[1].next(), Some(1));
295 assert_eq!(children[0].next(), Some(2));
296 assert_eq!(children[0].next(), Some(3));
297 assert_eq!(children[1].next(), Some(2));
298 assert_eq!(children[1].next(), Some(3));
299 assert_eq!(children[0].next(), None);
300 assert_eq!(children[1].next(), None);
301 }
302
303 #[test]
304 fn test_no_lock() {
305 let lock = NoLock::new();
306 let _guard = lock.lock();
307 }
308}