1use std::{
16 cmp::Ordering,
17 collections::VecDeque,
18 fs::{File, OpenOptions},
19 io::{BufWriter, Error},
20 path::PathBuf,
21};
22
23use rayon::slice::ParallelSliceMut;
24
25use crate::{ExternalSorterOptions, Sortable, SortedIterator};
26
27pub struct PushExternalSorter<T, F>
32where
33 T: Sortable,
34 F: Fn(&T, &T) -> Ordering + Send + Sync + Clone,
35{
36 options: ExternalSorterOptions,
37 tempdir: Option<tempfile::TempDir>,
38 count: u64,
39 segment_files: Vec<File>,
40 buffer: Vec<T>,
41 cmp: F,
42}
43
44impl<T, F> PushExternalSorter<T, F>
45where
46 T: Sortable,
47 F: Fn(&T, &T) -> Ordering + Send + Sync + Clone,
48{
49 pub(crate) fn new(options: crate::ExternalSorterOptions, cmp: F) -> PushExternalSorter<T, F> {
50 PushExternalSorter {
51 options,
52 tempdir: None,
53 count: 0,
54 segment_files: Vec::new(),
55 buffer: Vec::new(),
56 cmp,
57 }
58 }
59
60 pub fn push_iter<I>(&mut self, iterator: I) -> Result<(), Error>
64 where
65 I: IntoIterator<Item = T>,
66 {
67 for next_item in iterator.into_iter() {
68 self.push(next_item)?;
69 }
70 Ok(())
71 }
72
73 pub fn push(&mut self, item: T) -> Result<(), Error> {
75 self.buffer.push(item);
76 self.count += 1;
77
78 if self.buffer.len() > self.options.segment_size {
79 self.sort_and_write_segment()?;
80 }
81
82 Ok(())
83 }
84
85 pub fn done(mut self) -> Result<SortedIterator<T, F>, Error> {
86 let pass_through_queue = if !self.buffer.is_empty() && !self.segment_files.is_empty() {
89 self.sort_and_write_segment()?;
90 None
91 } else {
92 let cmp = self.cmp.clone();
93 self.buffer.sort_unstable_by(cmp);
94 Some(VecDeque::from(self.buffer))
95 };
96
97 SortedIterator::new(
98 self.tempdir,
99 pass_through_queue,
100 self.segment_files,
101 self.count,
102 self.cmp,
103 self.options.clone(),
104 )
105 }
106
107 fn sort_and_write_segment(&mut self) -> Result<(), Error> {
108 let cmp = self.cmp.clone();
109 if self.options.parallel {
110 self.buffer.par_sort_unstable_by(|a, b| cmp(a, b));
111 } else {
112 self.buffer.sort_unstable_by(|a, b| cmp(a, b));
113 }
114
115 let sort_dir = self.get_sort_dir()?;
116 let segment_path = sort_dir.join(format!("{}", self.segment_files.len()));
117 let segment_file = OpenOptions::new()
118 .create(true)
119 .truncate(true)
120 .read(true)
121 .write(true)
122 .open(segment_path)?;
123 let mut buf_writer = BufWriter::new(segment_file);
124
125 for item in self.buffer.drain(0..) {
126 item.encode(&mut buf_writer)?;
127 }
128
129 let file = buf_writer.into_inner()?;
130 self.segment_files.push(file);
131
132 Ok(())
133 }
134
135 fn get_sort_dir(&mut self) -> Result<PathBuf, Error> {
138 if let Some(sort_dir) = self.options.sort_dir.as_ref() {
139 return Ok(sort_dir.clone());
140 }
141
142 self.options.sort_dir = if let Some(ref sort_dir) = self.options.sort_dir {
143 Some(sort_dir.to_path_buf())
144 } else {
145 self.tempdir = Some(tempfile::TempDir::new()?);
146 Some(self.tempdir.as_ref().unwrap().path().to_path_buf())
147 };
148
149 Ok(self.options.sort_dir.as_ref().unwrap().clone())
150 }
151}