1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::sync::mpsc::{channel, Receiver};
use threadpool::{Builder, ThreadPool};

/// Extension trait that adds an order-preserving parallel `map` to iterators.
///
/// `F` is the mapping function. It is cloned for every item, and both the items and
/// the mapped results have to be sendable to other threads.
pub trait ThreadedMappable<F>: Iterator
where
    Self::Item: Send,
    Self::Iter: Iterator,
    <Self::Iter as Iterator>::Item: Send + Sync,
    F: FnOnce(Self::Item) -> <Self::Iter as Iterator>::Item + Send + Clone,
{
    /// The iterator returned by [`parallel_map`](ThreadedMappable::parallel_map);
    /// it yields the mapped values.
    type Iter;

    /// Maps the items of an iterator in parallel while preserving their order.
    ///
    /// `f` is applied to every item. `num_threads` requests a specific number of worker
    /// threads; `None` leaves the choice to the implementation.
    ///
    /// # Examples
    /// ```
    /// use threaded_map::ThreadedMappable;
    /// let items = vec![1, 2, 3, 4, 5, 6];
    /// let target: Vec<_> = items.iter().map(i32::to_string).collect();
    ///
    /// let result: Vec<_> = items
    ///     .into_iter()
    ///     .parallel_map(|item| item.to_string(), None)
    ///     .collect();
    ///
    /// assert_eq!(result, target);
    /// ```
    fn parallel_map(self, f: F, num_threads: Option<usize>) -> Self::Iter;
}

/// Iterator adaptor that applies a function to the items of an inner iterator on a
/// thread pool and yields the results in the inner iterator's order.
///
/// Values of this type are produced by `ThreadedMappable::parallel_map`.
#[derive(Debug)]
pub struct ThreadedMap<I, F, O>
where
    I: Iterator,
    I::Item: 'static,
    F: FnOnce(I::Item) -> O + 'static,
    O: Send + 'static,
{
    /// Source of the items that still have to be mapped.
    iterator: I,
    /// Mapping function; a fresh clone is handed to every job.
    function: F,
    /// Pool that runs the mapping jobs.
    thread_pool: ThreadPool,
    /// Results of the current batch that have not been yielded yet, stored in
    /// reverse order so that `Vec::pop` returns them in source order.
    window: Vec<O>,
}

impl<I, F, O> ThreadedMap<I, F, O>
where
    I: Iterator,
    F: FnOnce(<I as Iterator>::Item) -> O + Send + Clone,
    <I as Iterator>::Item: Send,
    O: Send + Sync,
{
    pub fn new(iterator: I, function: F, num_threads: Option<usize>) -> Self {
        Self {
            iterator,
            function,
            thread_pool: num_threads.map_or_else(|| Builder::new().build(), ThreadPool::new),
            window: Vec::new(),
        }
    }

    fn send_items(&mut self) -> Receiver<(usize, O)> {
        let (tx, rx) = channel::<(usize, O)>();

        for (index, item) in (0..self.thread_pool.max_count())
            .map_while(|_| self.iterator.next())
            .enumerate()
        {
            let tx = tx.clone();
            let f = self.function.clone();
            self.thread_pool.execute(move || {
                tx.send((index, (f)(item)))
                    .expect("channel will be there waiting for the pool");
            });
        }

        rx
    }
}

impl<I, F, O> Iterator for ThreadedMap<I, F, O>
where
    I: Iterator,
    F: FnOnce(<I as Iterator>::Item) -> O + Send + Clone,
    <I as Iterator>::Item: Send,
    O: Send + Sync,
{
    type Item = O;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(item) = self.window.pop() {
            return Some(item);
        }

        let rx = self.send_items();
        let mut window: Vec<_> = rx.iter().collect();

        if window.is_empty() {
            return None;
        }

        window.sort_by(|(lhs, _), (rhs, _)| rhs.cmp(lhs));
        self.window = window.into_iter().map(|(_, item)| item).collect();
        self.window.pop()
    }
}

/// Blanket implementation: every iterator whose items, mapping function and mapped
/// values satisfy the bounds below gains `parallel_map`, backed by [`ThreadedMap`].
impl<I, F, O> ThreadedMappable<F> for I
where
    I: Iterator,
    I::Item: Send + 'static,
    O: Send + Sync + 'static,
    F: FnOnce(I::Item) -> O + Clone + Send + 'static,
{
    type Iter = ThreadedMap<I, F, O>;

    /// Consumes the iterator and wraps it, together with `f` and the requested thread
    /// count, in a [`ThreadedMap`].
    fn parallel_map(self, f: F, num_threads: Option<usize>) -> Self::Iter {
        ThreadedMap::new(self, f, num_threads)
    }
}