extsort_iter/sorter/
mod.rs1use std::{
2 io::{self},
3 num::NonZeroUsize,
4 path::PathBuf,
5};
6
7use crate::{
8 orderer::Orderer, run::file_run::create_buffer_run, sorter::buffer_cleaner::BufferCleaner,
9 tape::compressor::CompressionCodec,
10};
11
12use self::result_iter::ResultIterator;
13
14pub mod buffer_cleaner;
15pub mod result_iter;
16
/// Settings that control how an external sort is carried out.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; start from one of the constructors and adjust the
/// result with the builder-style methods instead.
#[non_exhaustive]
pub struct ExtsortConfig {
    /// Size budget of the in-memory sort buffer, in bytes.
    pub(crate) sort_buffer_size_bytes: usize,
    /// Folder for temporary files.
    // NOTE(review): presumably where sorted runs are spilled to disk; the code
    // that consumes this path is outside this file.
    pub temp_file_folder: PathBuf,
    /// Codec to compress with. Only present when the `compression` feature is enabled.
    #[cfg(feature = "compression")]
    pub compress_with: CompressionCodec,
}
26
27impl Default for ExtsortConfig {
28 fn default() -> Self {
29 Self {
30 sort_buffer_size_bytes: 10_000_000,
31 temp_file_folder: PathBuf::from("/tmp"),
32 #[cfg(feature = "compression")]
33 compress_with: Default::default(),
34 }
35 }
36}
37
38impl ExtsortConfig {
39 fn get_num_items_for<T>(&self) -> NonZeroUsize {
40 let t_size = std::mem::size_of::<T>();
41
42 let one = NonZeroUsize::new(1).unwrap();
43
44 if t_size == 0 {
45 one
46 } else {
47 NonZeroUsize::new(self.sort_buffer_size_bytes / t_size).unwrap_or(one)
48 }
49 }
50
51 pub fn new() -> Self {
57 Default::default()
58 }
59
60 pub fn with_buffer_size(sort_buf_bytes: usize) -> Self {
63 ExtsortConfig {
64 sort_buffer_size_bytes: sort_buf_bytes,
65 ..Default::default()
66 }
67 }
68
69 #[deprecated = "Use new() or the Default impl instead. These do not require a type annotation"]
72 pub fn create_with_buffer_size_for<T>(sort_buf_bytes: usize) -> Self {
73 ExtsortConfig {
74 sort_buffer_size_bytes: sort_buf_bytes,
75 ..Default::default()
76 }
77 }
78 #[deprecated = "Use new() or the Default impl instead. These do not require a type annotation"]
81 pub fn default_for<T>() -> Self {
82 Default::default()
83 }
84 pub fn temp_file_folder(self, folder: impl Into<PathBuf>) -> Self {
87 Self {
88 temp_file_folder: folder.into(),
89 ..self
90 }
91 }
92 #[cfg(feature = "compression_lz4_flex")]
93 pub fn compress_lz4_flex(mut self) -> Self {
94 self.compress_with = CompressionCodec::Lz4Flex;
95 self
96 }
97
98 pub fn sort_buffer_size(mut self, new_size: usize) -> Self {
100 self.sort_buffer_size_bytes = new_size;
101 self
102 }
103
104 fn compression_choice(&self) -> CompressionCodec {
105 #[cfg(feature = "compression")]
106 {
107 self.compress_with
108 }
109 #[cfg(not(feature = "compression"))]
110 {
111 CompressionCodec::NoCompression
112 }
113 }
114}
115
/// Entry point for running an external sort. The type has no fields.
///
/// `Default` is derived so that `ExtSorter::default()` works alongside
/// `ExtSorter::new()`; `Debug` is derived so the type can appear in
/// diagnostics of structures that hold it.
#[derive(Debug, Default)]
pub struct ExtSorter {}
117
118impl ExtSorter {
119 pub fn new() -> Self {
120 Self {}
121 }
122
123 pub fn run<'a, S, T, C, O, F>(
124 self,
125 mut source: S,
126 mut buffer_cleaner: C,
127 ) -> io::Result<ResultIterator<T, O>>
128 where
129 S: Iterator<Item = T>,
130 T: 'a,
131 C: BufferCleaner<T, O, F>,
132 F: FnMut(&O, &mut [T]),
133 O: Orderer<T>,
134 {
135 let mut sort_buffer = buffer_cleaner.get_buffer();
136
137 let source = &mut source;
138 let mut any_buffer_was_flushed = false;
139 loop {
140 debug_assert!(sort_buffer.is_empty());
141 let capacity = sort_buffer.capacity();
142
143 sort_buffer.extend(source.take(capacity));
144 if sort_buffer.len() < capacity {
145 if !any_buffer_was_flushed {
149 let mut finalize_response = buffer_cleaner.finalize()?;
153 let orderer = finalize_response.orderer;
154 (finalize_response.sort_func)(&orderer, &mut sort_buffer);
155 let buffer_run = create_buffer_run(sort_buffer);
156 return Ok(ResultIterator::new(vec![buffer_run], orderer));
157 } else if !sort_buffer.is_empty() {
158 buffer_cleaner.clean_buffer(&mut sort_buffer)?;
161 }
162 break;
163 } else {
164 buffer_cleaner.clean_buffer(&mut sort_buffer)?;
165 any_buffer_was_flushed = true;
166 }
167 }
168
169 debug_assert!(sort_buffer.is_empty());
171 drop(sort_buffer);
176
177 let finalize_response = buffer_cleaner.finalize()?;
179 Ok(ResultIterator::new(
180 finalize_response.tapes,
181 finalize_response.orderer,
182 ))
183 }
184}