extsort_iter/extension_trait/parallel.rs

1use std::{cmp::Ordering, io};
2
3use rayon::slice::ParallelSliceMut;
4
5use crate::{
6    orderer::{FuncOrderer, KeyOrderer, OrdOrderer, Orderer},
7    sorter::{
8        self, buffer_cleaner::threaded::MultithreadedBufferCleaner, result_iter::ResultIterator,
9        ExtsortConfig,
10    },
11};
12
13/// The specific iterator type returned by
14/// the parallel sorting implementations.
15pub struct ParallelResultIterator<T, O> {
16    inner: ResultIterator<T, O>,
17}
18
19impl<T, O> Iterator for ParallelResultIterator<T, O>
20where
21    O: Orderer<T>,
22{
23    type Item = T;
24
25    fn next(&mut self) -> Option<T> {
26        self.inner.next()
27    }
28
29    fn size_hint(&self) -> (usize, Option<usize>) {
30        self.inner.size_hint()
31    }
32}
33impl<T, O> ExactSizeIterator for ParallelResultIterator<T, O> where O: Orderer<T> {}
34
35fn buffer_sort<T, O>(orderer: &O, buffer: &mut [T])
36where
37    T: Send,
38    O: Orderer<T> + Sync,
39{
40    buffer.par_sort_unstable_by(|a, b| orderer.compare(a, b));
41}
42
43fn run<T, O>(
44    source: impl Iterator<Item = T>,
45    options: ExtsortConfig,
46    orderer: O,
47) -> io::Result<ParallelResultIterator<T, O>>
48where
49    O: Orderer<T> + Send + Sync,
50    T: Send,
51{
52    let cleaner = MultithreadedBufferCleaner::new(options, orderer, buffer_sort);
53    cleaner.run(move |cleaner_handle| {
54        let inner = sorter::ExtSorter::new().run(source, cleaner_handle)?;
55        Ok(ParallelResultIterator { inner })
56    })
57}
58
59pub trait ParallelExtSortOrdExtension: Iterator
60where
61    Self::Item: Send,
62{
63    /// Sorts the provided Iterator according to the provided config
64    /// the native ordering specified on the iterated type.
65    /// # Errors
66    /// This function may error if a sort file fails to be written.
67    /// In this case the library will do its best to clean up the
68    /// already written files, but no guarantee is made.
69    fn par_external_sort(
70        self,
71        options: ExtsortConfig,
72    ) -> io::Result<ParallelResultIterator<Self::Item, OrdOrderer>>;
73}
74
75pub trait ParallelExtSortExtension: Iterator
76where
77    Self::Item: Send,
78{
79    /// Sorts the provided Iterator according to the provided config
80    /// using a custom comparison function.
81    /// # Errors
82    /// This function may error if a sort file fails to be written.
83    /// In this case the library will do its best to clean up the
84    /// already written files, but no guarantee is made.
85    fn par_external_sort_by<F>(
86        self,
87        options: ExtsortConfig,
88        comparator: F,
89    ) -> io::Result<ParallelResultIterator<Self::Item, FuncOrderer<F>>>
90    where
91        F: Fn(&Self::Item, &Self::Item) -> Ordering + Send + Sync;
92
93    /// Sorts the provided Iterator according to the provided config
94    /// using a key extraction function.
95    /// # Errors
96    /// This function may error if a sort file fails to be written.
97    /// In this case the library will do its best to clean up the
98    /// already written files, but no guarantee is made.
99    fn par_external_sort_by_key<F, K>(
100        self,
101        options: ExtsortConfig,
102        key_extractor: F,
103    ) -> io::Result<ParallelResultIterator<Self::Item, KeyOrderer<F>>>
104    where
105        F: Fn(&Self::Item) -> K + Send + Sync,
106        K: Ord;
107}
108
109impl<I, T> ParallelExtSortOrdExtension for I
110where
111    I: Iterator<Item = T>,
112    T: Send + Ord,
113{
114    fn par_external_sort(
115        self,
116        options: ExtsortConfig,
117    ) -> io::Result<ParallelResultIterator<Self::Item, OrdOrderer>> {
118        run(self, options, OrdOrderer::new())
119    }
120}
121
122impl<I, T> ParallelExtSortExtension for I
123where
124    I: Iterator<Item = T>,
125    T: Send,
126{
127    fn par_external_sort_by<F>(
128        self,
129        options: ExtsortConfig,
130        comparator: F,
131    ) -> io::Result<ParallelResultIterator<Self::Item, FuncOrderer<F>>>
132    where
133        F: Fn(&Self::Item, &Self::Item) -> Ordering + Send + Sync,
134    {
135        run(self, options, FuncOrderer::new(comparator))
136    }
137
138    fn par_external_sort_by_key<F, K>(
139        self,
140        options: ExtsortConfig,
141        key_extractor: F,
142    ) -> io::Result<ParallelResultIterator<Self::Item, KeyOrderer<F>>>
143    where
144        F: Fn(&Self::Item) -> K + Send + Sync,
145        K: Ord,
146    {
147        run(self, options, KeyOrderer::new(key_extractor))
148    }
149}