orx_concurrent_iter/chain/
con_iter_unknown_len_i.rs

1use crate::{
2    ConcurrentIter, ExactSizeConcurrentIter,
3    chain::chunk_puller_unknown_len_i::ChainedChunkPullerUnknownLenI,
4};
5use core::sync::atomic::{AtomicUsize, Ordering};
6
7/// Chain of two concurrent iterators where the length of the first iterator is not
8/// known with certainly; i.e., `I` does not implement `ExactSizeConcurrentIter`.
9pub struct ChainUnknownLenI<I, J>
10where
11    I: ConcurrentIter,
12    J: ConcurrentIter<Item = I::Item>,
13{
14    pub(super) i: I,
15    pub(super) j: J,
16    pub(super) num_pulled_i: AtomicUsize,
17}
18
19impl<I, J> ChainUnknownLenI<I, J>
20where
21    I: ConcurrentIter,
22    J: ConcurrentIter<Item = I::Item>,
23{
24    pub(crate) fn new(i: I, j: J) -> Self {
25        Self {
26            i,
27            j,
28            num_pulled_i: 0.into(),
29        }
30    }
31
32    #[inline(always)]
33    pub(super) fn num_pulled_i(&self) -> usize {
34        self.num_pulled_i.load(Ordering::SeqCst)
35    }
36}
37
38impl<I, J> ConcurrentIter for ChainUnknownLenI<I, J>
39where
40    I: ConcurrentIter,
41    J: ConcurrentIter<Item = I::Item>,
42{
43    type Item = I::Item;
44
45    type SequentialIter = core::iter::Chain<I::SequentialIter, J::SequentialIter>;
46
47    type ChunkPuller<'i>
48        = ChainedChunkPullerUnknownLenI<'i, I, J>
49    where
50        Self: 'i;
51
52    fn into_seq_iter(self) -> Self::SequentialIter {
53        self.i.into_seq_iter().chain(self.j.into_seq_iter())
54    }
55
56    fn skip_to_end(&self) {
57        self.i.skip_to_end();
58        self.j.skip_to_end();
59    }
60
61    fn next(&self) -> Option<Self::Item> {
62        match self.i.next() {
63            Some(x) => {
64                _ = self.num_pulled_i.fetch_add(1, Ordering::SeqCst);
65                Some(x)
66            }
67            None => self.j.next(),
68        }
69    }
70
71    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
72        match self.i.next_with_idx() {
73            Some((idx, x)) => {
74                _ = self.num_pulled_i.fetch_add(1, Ordering::SeqCst);
75                Some((idx, x))
76            }
77            None => self
78                .j
79                .next_with_idx()
80                .map(|(idx, x)| (self.num_pulled_i() + idx, x)),
81        }
82    }
83
84    fn size_hint(&self) -> (usize, Option<usize>) {
85        let (l1, u1) = self.i.size_hint();
86        let (l2, u2) = self.j.size_hint();
87        match (u1, u2) {
88            (Some(u1), Some(u2)) => (l1 + l2, Some(u1 + u2)),
89            _ => (l1 + l2, None),
90        }
91    }
92
93    fn is_completed_when_none_returned(&self) -> bool {
94        true
95    }
96
97    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
98        ChainedChunkPullerUnknownLenI::new(self, chunk_size)
99    }
100}
101
102impl<I, J> ExactSizeConcurrentIter for ChainUnknownLenI<I, J>
103where
104    I: ExactSizeConcurrentIter,
105    J: ExactSizeConcurrentIter<Item = I::Item>,
106{
107    fn len(&self) -> usize {
108        self.i.len() + self.j.len()
109    }
110}