// text_file_sort/sort.rs

1use std::cell::RefCell;
2use std::cmp::{max, Reverse};
3use std::collections::BinaryHeap;
4use std::fs::File;
5use std::io::{BufRead, BufReader, BufWriter, Write};
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use std::thread;
9
10use anyhow::{anyhow, Context};
11use command_executor::shutdown_mode::ShutdownMode;
12use command_executor::thread_pool::ThreadPool;
13use command_executor::thread_pool_builder::ThreadPoolBuilder;
14use rand::distributions::uniform::SampleBorrow;
15use regex::Regex;
16use rlimit::{getrlimit, Resource, setrlimit};
17use tempfile::{Builder, NamedTempFile};
18
19use crate::chunk_iterator::ChunkIterator;
20use crate::config::Config;
21use crate::field::Field;
22use crate::field_type::FieldType;
23use crate::line_record::LineRecord;
24use crate::order::Order;
25use crate::sort_command::SortCommand;
26use crate::sorted_chunk_file::SortedChunkFile;
27use crate::unmerged_chunk_file::UnmergedChunkFile;
28
// Per-worker-thread state for the sorting pool. Keeping these thread-local
// lets each worker sort and merge without cross-thread synchronization.
thread_local! {
    // Capacity hint for per-line String buffers on this thread.
    pub(crate) static LINE_CAPACITY: RefCell<usize> = RefCell::new(1);
    // Capacity hint for the per-chunk Vec of line records on this thread.
    pub(crate) static LINE_RECORDS_CAPACITY: RefCell<usize> = RefCell::new(1);
    // Sorted chunk files produced by this thread; Reverse turns the max-heap
    // into a min-heap so the smallest file is popped first.
    pub(crate) static SORTED_FILES: RefCell<BinaryHeap<Reverse<SortedChunkFile>>> = RefCell::new(BinaryHeap::new());
    // Clone of the run Config, installed via ThreadPool::set_thread_local.
    pub(crate) static CONFIG: RefCell<Option<Config>> = RefCell::new(None);
}
35
36pub(crate) fn get_line_capacity() -> usize {
37    LINE_CAPACITY.with(|capacity| *capacity.borrow().borrow())
38}
39
40pub(crate) fn set_line_capacity(value: usize) {
41    LINE_CAPACITY.with(|capacity| capacity.replace(value));
42}
43
44pub(crate) fn get_line_records_capacity() -> usize {
45    LINE_RECORDS_CAPACITY.with(|capacity| *capacity.borrow().borrow())
46}
47
48pub(crate) fn set_line_records_capacity(value: usize) {
49    LINE_RECORDS_CAPACITY.with(|capacity| capacity.replace(value));
50}
51
52pub(crate) fn get_tl_config() -> Config {
53    CONFIG.with(
54        |config| {
55            config.borrow().as_ref().unwrap().clone()
56        }
57    )
58}
59
60pub(crate) fn create_tmp_file(config: &Config) -> NamedTempFile {
61    Builder::new()
62        .prefix(config.tmp_prefix())
63        .suffix(config.tmp_suffix())
64        .tempfile_in(config.tmp())
65        .map_err(|e| anyhow!("Failed to create new temp file: {}", e.to_string()))
66        .unwrap()
67}
68
/// Sort a text file with record like lines
///
/// # Examples
/// ```
/// use std::path::PathBuf;
/// use text_file_sort::sort::Sort;
///
/// // parallel record sort
/// fn sort_records(input: PathBuf, output: PathBuf, tmp: PathBuf) -> Result<(), anyhow::Error> {
///     let mut text_file_sort = Sort::new(vec![input.clone()], output.clone());
///     // set number of CPU cores the sort will attempt to use. When given the number that exceeds
///     // the number of available CPU cores the work will be split among available cores with
///     // somewhat degraded performance.
///     text_file_sort.with_tasks(2);
///     // set the directory for intermediate results. The default is the system temp dir -
///     // std::env::temp_dir(), however, for large files it is recommended to provide a dedicated
///     // directory for intermediate files, preferably on the same file system as the output result.
///     text_file_sort.with_tmp_dir(tmp);
///     text_file_sort.sort()
/// }
/// ```
pub struct Sort {
    // Files to sort; all are chunked and fed to the sorting pool.
    input_files: Vec<PathBuf>,
    // Destination path for the final merged output.
    output: PathBuf,
    // Directory for intermediate (chunk/merge) files.
    tmp: PathBuf,
    // Worker task count; 0 means "use all available cores" (see create_config).
    tasks: usize,
    // Field separator character; default '\t'.
    field_separator: char,
    // When true, empty lines are dropped from the output.
    ignore_empty: bool,
    // Lines matching this regex are dropped; default matches "^#".
    ignore_lines: Option<Regex>,
    // When true, each pool thread pre-merges its own chunk files.
    concurrent_merge: bool,
    // Input is read in line-aligned chunks of this many bytes.
    chunk_size_bytes: u64,
    // Upper bound on simultaneously open intermediate files.
    files: usize,
    // Field specifications; empty means "whole line as one String field".
    fields: Vec<Field>,
    // Sort order (Asc/Desc).
    order: Order,
    // Lines written verbatim before the sorted records.
    prefix: Vec<String>,
    // Lines written verbatim after the sorted records.
    suffix: Vec<String>,
    // Line terminator character - CRLF not supported.
    endl: char,
}
107
108impl Sort {
109    /// Create a default Sort definition.
110    ///
111    /// A default Sort definition will use the system temporary
112    /// directory as defined by std::env::temp_dir().
113    /// * The default field separator is a TAB ('\t')
114    /// * The complete line will be considered as a single String field
115    /// * empty lines will be sorted lexicographically
116    /// * lines starting with '#' will be ignored
117    /// * max intermediate files is set to 1024.
118    /// * input is read in chunks of 10 MB bytes
119    /// * default Order is Asc
120    /// * prefix and suffix are empty
121    /// * default end lines is '\n'
122    ///
123    /// The Sort implementation will increase the file descriptor rlimit to accommodate configured
124    /// open files
125    pub fn new(input_files: Vec<PathBuf>, output: PathBuf) -> Sort {
126        Sort {
127            input_files,
128            output,
129            tmp: std::env::temp_dir(),
130            tasks: 0,
131            field_separator: '\t',
132            ignore_empty: false,
133            ignore_lines: Some(Regex::new("^#").unwrap()),
134            concurrent_merge: true,
135            chunk_size_bytes: 10_000_000,
136            files: 1024,
137            fields: vec![],
138            order: Order::Asc,
139            prefix: vec![],
140            suffix: vec![],
141            endl: '\n',
142        }
143    }
144
145    /// Set directory for intermediate files. By default use std::env::temp_dir()
146    /// It is recommended for large files to create a dedicated directory for intermediate files
147    /// on the same file system as the output target
148    pub fn with_tmp_dir(&mut self, tmp: PathBuf) {
149        self.tmp = tmp;
150    }
151
152    /// Set the number of tasks. The default is zero which will result in using all system cores
153    pub fn with_tasks(&mut self, tasks: usize) {
154        self.tasks = tasks;
155    }
156
157    /// Set the field separator. The default is '\t'
158    pub fn with_field_separator(&mut self, field_separator: char) {
159        self.field_separator = field_separator
160    }
161
162    /// Merge sorted files concurrently to reduce the number of files before the final merge
163    pub fn with_concurrent_merge(&mut self, concurrent_merge: bool) {
164        self.concurrent_merge = concurrent_merge
165    }
166
167    /// The input will be read in chunks of 'chunk_size_bytes' respecting line boundaries
168    pub fn with_chunk_size_bytes(&mut self, chunk_size_bytes: u64) {
169        self.chunk_size_bytes = chunk_size_bytes;
170    }
171
172    /// The input will be read in chunks of 'chunk_size_mb' MB respecting line boundaries
173    pub fn with_chunk_size_mb(&mut self, chunk_size_mb: u64) {
174        self.chunk_size_bytes = chunk_size_mb * 1_000_000;
175    }
176
177    /// Set the number of intermediate files. The default is 1024.
178    pub fn with_intermediate_files(&mut self, files: usize) {
179        self.files = files;
180    }
181
182    /// Direct the algorithm to ignore empty lines. The default is false
183    pub fn with_ignore_empty(&mut self) {
184        self.ignore_empty = true;
185    }
186
187    /// Specify which lines to ignore. Each line matching the regex will be ignored and will not
188    /// appear in the output.
189    pub fn with_ignore_lines(&mut self, r: Regex) {
190        self.ignore_lines = Some(r)
191    }
192
193    /// Add field specification. The default is to treat the complete line as a single String
194    /// field in the record
195    pub fn add_field(&mut self, field: Field) {
196        self.fields.push(field);
197    }
198
199    /// Replace all fields with the `fields` value.
200    pub fn with_fields(&mut self, fields: Vec<Field>) {
201        self.fields = fields
202    }
203
204    /// Set [Order]
205    pub fn with_order(&mut self, order: Order) {
206        self.order = order
207    }
208
209    /// Add file prefix. The provided prefix will be inserted at the beginning of the sorted file
210    pub fn add_prefix_line(&mut self, prefix_line: String) {
211        self.prefix.push(prefix_line);
212    }
213
214    /// Set prefix lines
215    pub fn with_prefix_lines(&mut self, prefix_lines: Vec<String>) {
216        self.prefix = prefix_lines;
217    }
218
219    /// Add file suffix. The provided suffix will be inserted at the end of the sorted file
220    pub fn add_suffix_line(&mut self, suffix_line: String) {
221        self.suffix.push(suffix_line);
222    }
223
224    /// Set suffix lines
225    pub fn with_suffix_lines(&mut self, suffix_lines: Vec<String>) {
226        self.suffix = suffix_lines;
227    }
228
229    /// Set line ending char - not supporting CRLF
230    pub fn with_endl(&mut self, endl: char) {
231        self.endl = endl
232    }
233
234    /// Sort input files or STDIN
235    pub fn sort(&self) -> Result<(), anyhow::Error> {
236        let config = self.create_config();
237        let (current_soft, current_hard) = Self::get_rlimits()?;
238        log::info!("Current rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
239        let new_soft = max((config.files() + 256) as u64, current_soft);
240        log::info!("Set new rlimit NOFILE, soft: {}, hard: {}", new_soft, current_hard);
241        Self::set_rlimits(new_soft, current_hard)?;
242        Self::internal_sort(&self.input_files, &config, &self.output)?;
243        log::info!("Restore rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
244        Self::set_rlimits(current_soft, current_hard)?;
245        Ok(())
246    }
247
248    fn get_rlimits() -> Result<(u64, u64), anyhow::Error> {
249        getrlimit(Resource::NOFILE).with_context(|| "getrlimit")
250    }
251
252    fn set_rlimits(soft: u64, hard: u64) -> Result<(), anyhow::Error> {
253        setrlimit(Resource::NOFILE, soft, hard)
254            .with_context(|| format!("set rlimit NOFILE, soft: {}, hard: {}", soft, hard))?;
255        Ok(())
256    }
257
258    fn create_config(&self) -> Config {
259        let fields = if self.fields.is_empty() {
260            vec![Field::new(0, FieldType::String)]
261        } else {
262            self.fields.clone()
263        };
264
265        let mut tasks = self.tasks;
266        if self.tasks == 0 {
267            tasks = num_cpus::get();
268        }
269
270        let mut files = tasks * 2;
271        if self.files > files {
272            files = self.files
273        }
274
275        Config::new(
276            self.tmp.clone(),
277            "part-".to_string(),
278            ".unmerged".to_string(),
279            tasks,
280            self.field_separator,
281            self.ignore_empty,
282            self.ignore_lines.clone(),
283            self.concurrent_merge,
284            self.chunk_size_bytes,
285            files,
286            fields,
287            self.order.clone(),
288            self.prefix.clone(),
289            self.suffix.clone(),
290            self.endl
291        )
292    }
293
294    fn merge_sorted_files(thread_pool: &ThreadPool) {
295        thread_pool.in_all_threads(
296            Arc::new(
297                || {
298                    SORTED_FILES.with(
299                        |sorted_files| {
300                            if sorted_files.borrow().len() > 1 {
301                                let mut intermediate = Vec::new();
302                                while sorted_files.borrow().len() > 0 {
303                                    let sorted_chunk_file = sorted_files.borrow_mut().pop().unwrap();
304                                    let path = sorted_chunk_file.0.path().clone();
305                                    intermediate.push(path);
306                                }
307                                let config = get_tl_config();
308                                let (path, size) = Self::internal_merge(intermediate, &config, true, false).expect("TODO: ");
309                                sorted_files
310                                    .borrow_mut()
311                                    .push(Reverse(SortedChunkFile::new(path, size)));
312                            }
313                        }
314                    );
315                }
316            )
317        );
318    }
319
    /// Gather every pool thread's remaining sorted chunk files into one Vec.
    ///
    /// Runs a closure on each worker that drains its thread-local SORTED_FILES
    /// heap into a shared, mutex-protected Vec; the accumulated paths are then
    /// moved out and returned.
    fn collect_sorted_files(thread_pool: &mut ThreadPool) -> Vec<PathBuf> {
        let result: Arc<Mutex<Vec<PathBuf>>> = Arc::new(Mutex::new(Vec::new()));
        let result_clone = result.clone();
        thread_pool.in_all_threads_mut(
            Arc::new(
                Mutex::new(
                    move || {
                        SORTED_FILES.with(
                            |sorted_files| {
                                log::info!("Start collecting thread intermediate results, thread: {}", thread::current().name().unwrap_or("unnamed"));
                                let mut intermediate = Vec::new();
                                // Drain this thread's heap of sorted chunk files.
                                while sorted_files.borrow().len() > 0 {
                                    let sorted_chunk_file = sorted_files.borrow_mut().pop().unwrap();
                                    let path = sorted_chunk_file.0.path().clone();
                                    intermediate.push(path);
                                }
                                // Append under the shared lock; append() empties `intermediate`.
                                let mut result_guard = result_clone.lock().unwrap();
                                result_guard.append(&mut intermediate);
                                log::info!("Finish collecting thread intermediate results, thread: {}", thread::current().name().unwrap_or("unnamed"));
                            }
                        );
                    }
                )
            )
        );
        // in_all_threads_mut has run everywhere; take the collected paths out
        // of the mutex without cloning.
        let mut result_guard = result.lock().unwrap();
        std::mem::take(result_guard.as_mut())
    }
348
349    pub fn check(&self) -> Result<bool, anyhow::Error> {
350        let config = self.create_config();
351
352        let mut result = true;
353        for path in &self.input_files {
354            result = Self::internal_check(path, &config)?;
355            if !result {
356                break;
357            }
358        }
359        Ok(result)
360    }
361
362    pub(crate) fn internal_check(path: &PathBuf, config: &Config) -> Result<bool, anyhow::Error> {
363        let mut result = true;
364        let mut line = String::new();
365        let mut previous: Option<LineRecord> = None;
366        let mut reader = BufReader::new(File::open(path)?);
367        while reader.read_line(&mut line)? != 0 {
368            if config.ignore_empty() && line.trim().is_empty() {
369                continue;
370            }
371
372            if let Some(r) = config.ignore_lines() {
373                if r.is_match(line.trim()) {
374                    continue;
375                }
376            }
377            let current_line_record = LineRecord::new(
378                line,
379                config.fields(),
380                config.field_separator(),
381                config.order().clone(),
382            )?;
383
384            match previous {
385                None => {
386                    previous = Some(current_line_record);
387                }
388                Some(previous_line_record) => {
389                    if previous_line_record <= current_line_record {
390                        previous = Some(current_line_record);
391                    } else {
392                        result = false;
393                        break;
394                    }
395                }
396            }
397            line = String::new();
398        }
399        Ok(result)
400    }
401
402    pub fn merge(&self) -> Result<(), anyhow::Error> {
403        let config = self.create_config();
404        let (current_soft, current_hard) = Self::get_rlimits()?;
405        log::info!("Current rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
406        let new_soft = max((config.files() + 256) as u64, current_soft);
407        log::info!("Set new rlimit NOFILE, soft: {}, hard: {}", new_soft, current_hard);
408        Self::set_rlimits(new_soft, current_hard)?;
409        let (path, _lines) = Self::internal_merge(self.input_files.clone(), &config, false, true)?;
410        std::fs::rename(path.clone(), &self.output)
411            .with_context(|| anyhow!("Rename {} to {}", path.display(), self.output.display()))?;
412        log::info!("Restore rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
413        Self::set_rlimits(current_soft, current_hard)?;
414        Ok(())
415    }
416
417    pub(crate) fn internal_merge(files: Vec<PathBuf>, config: &Config, remove_merged: bool, add_prefix_suffix: bool) -> Result<(PathBuf, usize), anyhow::Error> {
418        log::info!("Merging {} sorted files, thread: {}", files.len(), thread::current().name().unwrap_or("unnamed"));
419        let mut merged_len: usize = 0;
420        let merged_file = create_tmp_file(config);
421        let (persisted_merged_file, path) = merged_file.keep()?;
422        let mut merged_writer = BufWriter::new(persisted_merged_file);
423        if add_prefix_suffix {
424            for prefix in config.prefix() {
425                writeln!(merged_writer, "{}", prefix)?;
426                merged_len += 1;
427            }
428        }
429
430        if files.len() == 1 {
431            let file = File::open(files[0].clone()).with_context(|| format!("path: {}", files[0].display()))?;
432            let mut reader = BufReader::new(file);
433            let mut line = String::new();
434
435            while reader.read_line(&mut line)? > 0 {
436                merged_writer.write_all(line.as_bytes())?;
437                line = String::new();
438                merged_len += 1;
439            }
440            std::fs::remove_file(files[0].clone())?;
441        } else {
442            let mut unmerged_files: BinaryHeap<UnmergedChunkFile> = files.into_iter()
443                .map(
444                    |path| UnmergedChunkFile::new(
445                        path,
446                        config.fields(),
447                        config.field_separator(),
448                        config.order().clone(),
449                    )
450                        .unwrap()
451                )
452                .collect();
453            while unmerged_files.len() > 1 {
454                let mut current_min = unmerged_files.pop().unwrap();
455                let unmerged_min = unmerged_files.peek().unwrap();
456
457                let mut current_min_done = false;
458                // comparison operators are flipped to work with BinaryHeap (Max Heap)
459                while &current_min >= unmerged_min {
460                    if let Some(line_record) = current_min.line_record() {
461                        let line = line_record.line();
462                        merged_writer.write_all(line.as_bytes())?;
463                        merged_len += 1;
464                    } else {
465                        current_min_done = true;
466                        if remove_merged {
467                            std::fs::remove_file(current_min.path())?;
468                        }
469                        break;
470                    }
471                }
472                if !current_min_done {
473                    unmerged_files.push(current_min)
474                }
475            }
476            let mut current_min = unmerged_files.pop().unwrap();
477            loop {
478                if let Some(line_record) = current_min.line_record() {
479                    let line = line_record.line();
480                    merged_writer.write_all(line.as_bytes())?;
481                    merged_len += 1;
482                } else {
483                    std::fs::remove_file(current_min.path())?;
484                    break;
485                }
486            }
487
488            log::info!("Finished merging sorted files, thread: {}, merged length: {} lines", thread::current().name().unwrap_or("unnamed"), merged_len);
489        }
490        if add_prefix_suffix {
491            for suffix in config.suffix() {
492                writeln!(merged_writer, "{}", suffix)?;
493                merged_len += 1;
494            }
495        }
496        Ok((path, merged_len))
497    }
498
    /// Sort `input_files` into `output` using a pool of worker threads.
    ///
    /// Each input file is split into line-aligned chunks which are submitted
    /// to the pool as SortCommand jobs; workers leave their sorted chunk files
    /// in the thread-local SORTED_FILES heaps. After an optional per-thread
    /// concurrent merge, all intermediate files are collected and merged into
    /// one temp file that is renamed to `output`.
    fn internal_sort(input_files: &Vec<PathBuf>, config: &Config, output: &Path) -> Result<(), anyhow::Error> {
        log::info!("Start parallel sort");
        let mut thread_pool_builder = ThreadPoolBuilder::new();
        // CompletePending: shutdown waits for queued chunk jobs to finish.
        let mut sorting_pool = thread_pool_builder
            .with_name("sorting".to_string())
            .with_tasks(config.tasks())
            .with_queue_size(config.queue_size())
            .with_shutdown_mode(ShutdownMode::CompletePending)
            .build()
            .unwrap();

        // Install a clone of the configuration into every pool thread.
        sorting_pool.set_thread_local(&CONFIG, Some(config.clone()));

        for path in input_files {
            for chunk in ChunkIterator::new(path, config.chunk_size_bytes(), config.endl()).unwrap() {
                let sort_command = Box::new(SortCommand::new(Some(chunk)));
                sorting_pool.submit(sort_command);
            }
        }

        // Optionally pre-merge each thread's files to cut the final fan-in.
        if config.concurrent_merge() {
            Self::merge_sorted_files(&sorting_pool);
        }

        let sorted_files = Self::collect_sorted_files(&mut sorting_pool);
        log::info!("Shutting down sorting pool");
        sorting_pool.shutdown();
        sorting_pool.join()?;

        // Final merge deletes the intermediate files (remove_merged=true) and
        // adds the configured prefix/suffix lines.
        let (path, _lines) = Self::internal_merge(sorted_files, config, true, true)?;

        std::fs::rename(path.clone(), output)
            .with_context(|| anyhow!("Rename {} to {}", path.display(), output.display()))?;
        log::info!("Finish parallel sort");
        Ok(())
    }
535}