extsort_iter/sorter/mod.rs

1use std::{
2    io::{self},
3    num::NonZeroUsize,
4    path::PathBuf,
5};
6
7use crate::{
8    orderer::Orderer, run::file_run::create_buffer_run, sorter::buffer_cleaner::BufferCleaner,
9    tape::compressor::CompressionCodec,
10};
11
12use self::result_iter::ResultIterator;
13
14pub mod buffer_cleaner;
15pub mod result_iter;
16
/// The configuration for the external sorting.
///
/// Create one with [`ExtsortConfig::new`], [`ExtsortConfig::with_buffer_size`]
/// or [`Default::default`], then adjust it with the fluent setters.
#[non_exhaustive]
pub struct ExtsortConfig {
    /// The maximum size of the sort buffer, in bytes.
    pub(crate) sort_buffer_size_bytes: usize,
    /// The directory in which temporary files are created.
    pub temp_file_folder: PathBuf,
    /// The codec used to compress the temporary tapes.
    #[cfg(feature = "compression")]
    pub compress_with: CompressionCodec,
}

/// The default directory for temporary files.
///
/// Unix keeps the historical `/tmp`, so behaviour there is unchanged. On other
/// platforms (e.g. Windows) `/tmp` usually does not exist, so the directory
/// reported by the OS is used instead.
fn default_temp_folder() -> PathBuf {
    #[cfg(unix)]
    {
        PathBuf::from("/tmp")
    }
    #[cfg(not(unix))]
    {
        std::env::temp_dir()
    }
}

impl Default for ExtsortConfig {
    /// A 10 MB (10_000_000 bytes) sort buffer, the platform temp directory
    /// (`/tmp` on Unix) and, with the `compression` feature, the default codec.
    fn default() -> Self {
        Self {
            sort_buffer_size_bytes: 10_000_000,
            temp_file_folder: default_temp_folder(),
            #[cfg(feature = "compression")]
            compress_with: Default::default(),
        }
    }
}
37
38impl ExtsortConfig {
39    fn get_num_items_for<T>(&self) -> NonZeroUsize {
40        let t_size = std::mem::size_of::<T>();
41
42        let one = NonZeroUsize::new(1).unwrap();
43
44        if t_size == 0 {
45            one
46        } else {
47            NonZeroUsize::new(self.sort_buffer_size_bytes / t_size).unwrap_or(one)
48        }
49    }
50
51    /// Creates a configuration with a sort buffer size of 10M
52    /// and a sort directory of /tmp
53    ///
54    /// It is recommended to increase the sort buffer size
55    /// for improved performance.
56    pub fn new() -> Self {
57        Default::default()
58    }
59
60    /// Creates a configuration with a specified sort buffer size in bytes
61    /// and a sort directory of /tmp
62    pub fn with_buffer_size(sort_buf_bytes: usize) -> Self {
63        ExtsortConfig {
64            sort_buffer_size_bytes: sort_buf_bytes,
65            ..Default::default()
66        }
67    }
68
69    /// Creates a configuration with a specified sort buffer size in bytes
70    /// and a sort directory of /tmp
71    #[deprecated = "Use new() or the Default impl instead. These do not require a type annotation"]
72    pub fn create_with_buffer_size_for<T>(sort_buf_bytes: usize) -> Self {
73        ExtsortConfig {
74            sort_buffer_size_bytes: sort_buf_bytes,
75            ..Default::default()
76        }
77    }
78    /// Creates a configuration with a sort buffer size of 10M
79    /// and a sort directory of /tmp
80    #[deprecated = "Use new() or the Default impl instead. These do not require a type annotation"]
81    pub fn default_for<T>() -> Self {
82        Default::default()
83    }
84    /// Updates the temp_file_folder attribute.
85    /// Useful for fluent-style api usage.
86    pub fn temp_file_folder(self, folder: impl Into<PathBuf>) -> Self {
87        Self {
88            temp_file_folder: folder.into(),
89            ..self
90        }
91    }
92    #[cfg(feature = "compression_lz4_flex")]
93    pub fn compress_lz4_flex(mut self) -> Self {
94        self.compress_with = CompressionCodec::Lz4Flex;
95        self
96    }
97
98    /// sets the sort buffer size in bytes
99    pub fn sort_buffer_size(mut self, new_size: usize) -> Self {
100        self.sort_buffer_size_bytes = new_size;
101        self
102    }
103
104    fn compression_choice(&self) -> CompressionCodec {
105        #[cfg(feature = "compression")]
106        {
107            self.compress_with
108        }
109        #[cfg(not(feature = "compression"))]
110        {
111            CompressionCodec::NoCompression
112        }
113    }
114}
115
/// Drives an external sort.
///
/// It fills a sort buffer from a source iterator, hands full buffers to a
/// [`BufferCleaner`], and returns an iterator over the sorted result.
/// The sorter itself carries no state.
#[derive(Debug, Default)]
pub struct ExtSorter {}
117
118impl ExtSorter {
119    pub fn new() -> Self {
120        Self {}
121    }
122
123    pub fn run<'a, S, T, C, O, F>(
124        self,
125        mut source: S,
126        mut buffer_cleaner: C,
127    ) -> io::Result<ResultIterator<T, O>>
128    where
129        S: Iterator<Item = T>,
130        T: 'a,
131        C: BufferCleaner<T, O, F>,
132        F: FnMut(&O, &mut [T]),
133        O: Orderer<T>,
134    {
135        let mut sort_buffer = buffer_cleaner.get_buffer();
136
137        let source = &mut source;
138        let mut any_buffer_was_flushed = false;
139        loop {
140            debug_assert!(sort_buffer.is_empty());
141            let capacity = sort_buffer.capacity();
142
143            sort_buffer.extend(source.take(capacity));
144            if sort_buffer.len() < capacity {
145                // we could not completely fill the buffer, so we know that this
146                // is the last run that will be generated.
147
148                if !any_buffer_was_flushed {
149                    // we did not acually move anything to disk.
150                    // in this case we can just reuse the sort buffer
151                    // as a sort of pseudo tape.
152                    let mut finalize_response = buffer_cleaner.finalize()?;
153                    let orderer = finalize_response.orderer;
154                    (finalize_response.sort_func)(&orderer, &mut sort_buffer);
155                    let buffer_run = create_buffer_run(sort_buffer);
156                    return Ok(ResultIterator::new(vec![buffer_run], orderer));
157                } else if !sort_buffer.is_empty() {
158                    // since we moved runs to disk, we will need to use memory for the read buffers.
159                    // to avoid going over budget, we move the final run to disk as well
160                    buffer_cleaner.clean_buffer(&mut sort_buffer)?;
161                }
162                break;
163            } else {
164                buffer_cleaner.clean_buffer(&mut sort_buffer)?;
165                any_buffer_was_flushed = true;
166            }
167        }
168
169        // at this point, we must have moved runs to disk, including the final sort buffer.
170        debug_assert!(sort_buffer.is_empty());
171        // it should not be necessary to manually drop the buffer here, but it sure does
172        // not hurt and this way we are guaranteed to have released the memory
173        // before initializing the tapes, even on compiler versions that do not
174        // implement NLL yet.
175        drop(sort_buffer);
176
177        // wait for the io thread to be done writing and get the file handles back to the main thread
178        let finalize_response = buffer_cleaner.finalize()?;
179        Ok(ResultIterator::new(
180            finalize_response.tapes,
181            finalize_response.orderer,
182        ))
183    }
184}