// extsort/push.rs

// Copyright 2018 Andre-Philippe Paquet
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::{
16    cmp::Ordering,
17    collections::VecDeque,
18    fs::{File, OpenOptions},
19    io::{BufWriter, Error},
20    path::PathBuf,
21};
22
23use rayon::slice::ParallelSliceMut;
24
25use crate::{ExternalSorterOptions, Sortable, SortedIterator};
26
27/// External sorter that uses a "push" pattern instead of consuming an iterator.
28///
29/// It is used internally by the normal pull iterator (`ExternalSorter`), but can
30/// also be used directly to sort items in a push pattern.
31pub struct PushExternalSorter<T, F>
32where
33    T: Sortable,
34    F: Fn(&T, &T) -> Ordering + Send + Sync + Clone,
35{
36    options: ExternalSorterOptions,
37    tempdir: Option<tempfile::TempDir>,
38    count: u64,
39    segment_files: Vec<File>,
40    buffer: Vec<T>,
41    cmp: F,
42}
43
44impl<T, F> PushExternalSorter<T, F>
45where
46    T: Sortable,
47    F: Fn(&T, &T) -> Ordering + Send + Sync + Clone,
48{
49    pub(crate) fn new(options: crate::ExternalSorterOptions, cmp: F) -> PushExternalSorter<T, F> {
50        PushExternalSorter {
51            options,
52            tempdir: None,
53            count: 0,
54            segment_files: Vec::new(),
55            buffer: Vec::new(),
56            cmp,
57        }
58    }
59
60    /// Pushes all items from an iterator into the sorter.
61    ///
62    /// This can be called multiple times to push more items into the sorter.
63    pub fn push_iter<I>(&mut self, iterator: I) -> Result<(), Error>
64    where
65        I: IntoIterator<Item = T>,
66    {
67        for next_item in iterator.into_iter() {
68            self.push(next_item)?;
69        }
70        Ok(())
71    }
72
73    /// Pushes a single item into the sorter.
74    pub fn push(&mut self, item: T) -> Result<(), Error> {
75        self.buffer.push(item);
76        self.count += 1;
77
78        if self.buffer.len() > self.options.segment_size {
79            self.sort_and_write_segment()?;
80        }
81
82        Ok(())
83    }
84
85    pub fn done(mut self) -> Result<SortedIterator<T, F>, Error> {
86        // Write any items left in the buffer, but only if we had at least 1 segment
87        // written. Otherwise, we use the buffer itself to iterate from memory.
88        let pass_through_queue = if !self.buffer.is_empty() && !self.segment_files.is_empty() {
89            self.sort_and_write_segment()?;
90            None
91        } else {
92            let cmp = self.cmp.clone();
93            self.buffer.sort_unstable_by(cmp);
94            Some(VecDeque::from(self.buffer))
95        };
96
97        SortedIterator::new(
98            self.tempdir,
99            pass_through_queue,
100            self.segment_files,
101            self.count,
102            self.cmp,
103            self.options.clone(),
104        )
105    }
106
107    fn sort_and_write_segment(&mut self) -> Result<(), Error> {
108        let cmp = self.cmp.clone();
109        if self.options.parallel {
110            self.buffer.par_sort_unstable_by(|a, b| cmp(a, b));
111        } else {
112            self.buffer.sort_unstable_by(|a, b| cmp(a, b));
113        }
114
115        let sort_dir = self.get_sort_dir()?;
116        let segment_path = sort_dir.join(format!("{}", self.segment_files.len()));
117        let segment_file = OpenOptions::new()
118            .create(true)
119            .truncate(true)
120            .read(true)
121            .write(true)
122            .open(segment_path)?;
123        let mut buf_writer = BufWriter::new(segment_file);
124
125        for item in self.buffer.drain(0..) {
126            item.encode(&mut buf_writer)?;
127        }
128
129        let file = buf_writer.into_inner()?;
130        self.segment_files.push(file);
131
132        Ok(())
133    }
134
135    /// We only want to create a directory if it's needed (i.e., if the dataset
136    /// doesn't fit in memory) to prevent filesystem latency.
137    fn get_sort_dir(&mut self) -> Result<PathBuf, Error> {
138        if let Some(sort_dir) = self.options.sort_dir.as_ref() {
139            return Ok(sort_dir.clone());
140        }
141
142        self.options.sort_dir = if let Some(ref sort_dir) = self.options.sort_dir {
143            Some(sort_dir.to_path_buf())
144        } else {
145            self.tempdir = Some(tempfile::TempDir::new()?);
146            Some(self.tempdir.as_ref().unwrap().path().to_path_buf())
147        };
148
149        Ok(self.options.sort_dir.as_ref().unwrap().clone())
150    }
151}