1use std::cell::RefCell;
2use std::cmp::{max, Reverse};
3use std::collections::BinaryHeap;
4use std::fs::File;
5use std::io::{BufRead, BufReader, BufWriter, Write};
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use std::thread;
9
10use anyhow::{anyhow, Context};
11use command_executor::shutdown_mode::ShutdownMode;
12use command_executor::thread_pool::ThreadPool;
13use command_executor::thread_pool_builder::ThreadPoolBuilder;
14use rand::distributions::uniform::SampleBorrow;
15use regex::Regex;
16use rlimit::{getrlimit, Resource, setrlimit};
17use tempfile::{Builder, NamedTempFile};
18
19use crate::chunk_iterator::ChunkIterator;
20use crate::config::Config;
21use crate::field::Field;
22use crate::field_type::FieldType;
23use crate::line_record::LineRecord;
24use crate::order::Order;
25use crate::sort_command::SortCommand;
26use crate::sorted_chunk_file::SortedChunkFile;
27use crate::unmerged_chunk_file::UnmergedChunkFile;
28
thread_local! {
    // Per-thread capacity hint for line buffers; read/written via
    // get_line_capacity()/set_line_capacity().
    pub(crate) static LINE_CAPACITY: RefCell<usize> = RefCell::new(1);
    // Per-thread capacity hint for line-record vectors; read/written via
    // get_line_records_capacity()/set_line_records_capacity().
    pub(crate) static LINE_RECORDS_CAPACITY: RefCell<usize> = RefCell::new(1);
    // This thread's sorted chunk files; `Reverse` flips SortedChunkFile's
    // ordering so the heap pops in reversed order (presumably smallest first
    // — confirm against SortedChunkFile's Ord impl).
    pub(crate) static SORTED_FILES: RefCell<BinaryHeap<Reverse<SortedChunkFile>>> = RefCell::new(BinaryHeap::new());
    // Per-thread copy of the sort configuration; populated by
    // internal_sort() via ThreadPool::set_thread_local, read by get_tl_config().
    pub(crate) static CONFIG: RefCell<Option<Config>> = RefCell::new(None);
}
35
36pub(crate) fn get_line_capacity() -> usize {
37 LINE_CAPACITY.with(|capacity| *capacity.borrow().borrow())
38}
39
40pub(crate) fn set_line_capacity(value: usize) {
41 LINE_CAPACITY.with(|capacity| capacity.replace(value));
42}
43
44pub(crate) fn get_line_records_capacity() -> usize {
45 LINE_RECORDS_CAPACITY.with(|capacity| *capacity.borrow().borrow())
46}
47
48pub(crate) fn set_line_records_capacity(value: usize) {
49 LINE_RECORDS_CAPACITY.with(|capacity| capacity.replace(value));
50}
51
52pub(crate) fn get_tl_config() -> Config {
53 CONFIG.with(
54 |config| {
55 config.borrow().as_ref().unwrap().clone()
56 }
57 )
58}
59
60pub(crate) fn create_tmp_file(config: &Config) -> NamedTempFile {
61 Builder::new()
62 .prefix(config.tmp_prefix())
63 .suffix(config.tmp_suffix())
64 .tempfile_in(config.tmp())
65 .map_err(|e| anyhow!("Failed to create new temp file: {}", e.to_string()))
66 .unwrap()
67}
68
69pub struct Sort {
91 input_files: Vec<PathBuf>,
92 output: PathBuf,
93 tmp: PathBuf,
94 tasks: usize,
95 field_separator: char,
96 ignore_empty: bool,
97 ignore_lines: Option<Regex>,
98 concurrent_merge: bool,
99 chunk_size_bytes: u64,
100 files: usize,
101 fields: Vec<Field>,
102 order: Order,
103 prefix: Vec<String>,
104 suffix: Vec<String>,
105 endl: char,
106}
107
108impl Sort {
109 pub fn new(input_files: Vec<PathBuf>, output: PathBuf) -> Sort {
126 Sort {
127 input_files,
128 output,
129 tmp: std::env::temp_dir(),
130 tasks: 0,
131 field_separator: '\t',
132 ignore_empty: false,
133 ignore_lines: Some(Regex::new("^#").unwrap()),
134 concurrent_merge: true,
135 chunk_size_bytes: 10_000_000,
136 files: 1024,
137 fields: vec![],
138 order: Order::Asc,
139 prefix: vec![],
140 suffix: vec![],
141 endl: '\n',
142 }
143 }
144
145 pub fn with_tmp_dir(&mut self, tmp: PathBuf) {
149 self.tmp = tmp;
150 }
151
152 pub fn with_tasks(&mut self, tasks: usize) {
154 self.tasks = tasks;
155 }
156
157 pub fn with_field_separator(&mut self, field_separator: char) {
159 self.field_separator = field_separator
160 }
161
162 pub fn with_concurrent_merge(&mut self, concurrent_merge: bool) {
164 self.concurrent_merge = concurrent_merge
165 }
166
167 pub fn with_chunk_size_bytes(&mut self, chunk_size_bytes: u64) {
169 self.chunk_size_bytes = chunk_size_bytes;
170 }
171
172 pub fn with_chunk_size_mb(&mut self, chunk_size_mb: u64) {
174 self.chunk_size_bytes = chunk_size_mb * 1_000_000;
175 }
176
177 pub fn with_intermediate_files(&mut self, files: usize) {
179 self.files = files;
180 }
181
182 pub fn with_ignore_empty(&mut self) {
184 self.ignore_empty = true;
185 }
186
187 pub fn with_ignore_lines(&mut self, r: Regex) {
190 self.ignore_lines = Some(r)
191 }
192
193 pub fn add_field(&mut self, field: Field) {
196 self.fields.push(field);
197 }
198
199 pub fn with_fields(&mut self, fields: Vec<Field>) {
201 self.fields = fields
202 }
203
204 pub fn with_order(&mut self, order: Order) {
206 self.order = order
207 }
208
209 pub fn add_prefix_line(&mut self, prefix_line: String) {
211 self.prefix.push(prefix_line);
212 }
213
214 pub fn with_prefix_lines(&mut self, prefix_lines: Vec<String>) {
216 self.prefix = prefix_lines;
217 }
218
219 pub fn add_suffix_line(&mut self, suffix_line: String) {
221 self.suffix.push(suffix_line);
222 }
223
224 pub fn with_suffix_lines(&mut self, suffix_lines: Vec<String>) {
226 self.suffix = suffix_lines;
227 }
228
229 pub fn with_endl(&mut self, endl: char) {
231 self.endl = endl
232 }
233
234 pub fn sort(&self) -> Result<(), anyhow::Error> {
236 let config = self.create_config();
237 let (current_soft, current_hard) = Self::get_rlimits()?;
238 log::info!("Current rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
239 let new_soft = max((config.files() + 256) as u64, current_soft);
240 log::info!("Set new rlimit NOFILE, soft: {}, hard: {}", new_soft, current_hard);
241 Self::set_rlimits(new_soft, current_hard)?;
242 Self::internal_sort(&self.input_files, &config, &self.output)?;
243 log::info!("Restore rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
244 Self::set_rlimits(current_soft, current_hard)?;
245 Ok(())
246 }
247
248 fn get_rlimits() -> Result<(u64, u64), anyhow::Error> {
249 getrlimit(Resource::NOFILE).with_context(|| "getrlimit")
250 }
251
252 fn set_rlimits(soft: u64, hard: u64) -> Result<(), anyhow::Error> {
253 setrlimit(Resource::NOFILE, soft, hard)
254 .with_context(|| format!("set rlimit NOFILE, soft: {}, hard: {}", soft, hard))?;
255 Ok(())
256 }
257
258 fn create_config(&self) -> Config {
259 let fields = if self.fields.is_empty() {
260 vec![Field::new(0, FieldType::String)]
261 } else {
262 self.fields.clone()
263 };
264
265 let mut tasks = self.tasks;
266 if self.tasks == 0 {
267 tasks = num_cpus::get();
268 }
269
270 let mut files = tasks * 2;
271 if self.files > files {
272 files = self.files
273 }
274
275 Config::new(
276 self.tmp.clone(),
277 "part-".to_string(),
278 ".unmerged".to_string(),
279 tasks,
280 self.field_separator,
281 self.ignore_empty,
282 self.ignore_lines.clone(),
283 self.concurrent_merge,
284 self.chunk_size_bytes,
285 files,
286 fields,
287 self.order.clone(),
288 self.prefix.clone(),
289 self.suffix.clone(),
290 self.endl
291 )
292 }
293
294 fn merge_sorted_files(thread_pool: &ThreadPool) {
295 thread_pool.in_all_threads(
296 Arc::new(
297 || {
298 SORTED_FILES.with(
299 |sorted_files| {
300 if sorted_files.borrow().len() > 1 {
301 let mut intermediate = Vec::new();
302 while sorted_files.borrow().len() > 0 {
303 let sorted_chunk_file = sorted_files.borrow_mut().pop().unwrap();
304 let path = sorted_chunk_file.0.path().clone();
305 intermediate.push(path);
306 }
307 let config = get_tl_config();
308 let (path, size) = Self::internal_merge(intermediate, &config, true, false).expect("TODO: ");
309 sorted_files
310 .borrow_mut()
311 .push(Reverse(SortedChunkFile::new(path, size)));
312 }
313 }
314 );
315 }
316 )
317 );
318 }
319
320 fn collect_sorted_files(thread_pool: &mut ThreadPool) -> Vec<PathBuf> {
321 let result: Arc<Mutex<Vec<PathBuf>>> = Arc::new(Mutex::new(Vec::new()));
322 let result_clone = result.clone();
323 thread_pool.in_all_threads_mut(
324 Arc::new(
325 Mutex::new(
326 move || {
327 SORTED_FILES.with(
328 |sorted_files| {
329 log::info!("Start collecting thread intermediate results, thread: {}", thread::current().name().unwrap_or("unnamed"));
330 let mut intermediate = Vec::new();
331 while sorted_files.borrow().len() > 0 {
332 let sorted_chunk_file = sorted_files.borrow_mut().pop().unwrap();
333 let path = sorted_chunk_file.0.path().clone();
334 intermediate.push(path);
335 }
336 let mut result_guard = result_clone.lock().unwrap();
337 result_guard.append(&mut intermediate);
338 log::info!("Finish collecting thread intermediate results, thread: {}", thread::current().name().unwrap_or("unnamed"));
339 }
340 );
341 }
342 )
343 )
344 );
345 let mut result_guard = result.lock().unwrap();
346 std::mem::take(result_guard.as_mut())
347 }
348
349 pub fn check(&self) -> Result<bool, anyhow::Error> {
350 let config = self.create_config();
351
352 let mut result = true;
353 for path in &self.input_files {
354 result = Self::internal_check(path, &config)?;
355 if !result {
356 break;
357 }
358 }
359 Ok(result)
360 }
361
362 pub(crate) fn internal_check(path: &PathBuf, config: &Config) -> Result<bool, anyhow::Error> {
363 let mut result = true;
364 let mut line = String::new();
365 let mut previous: Option<LineRecord> = None;
366 let mut reader = BufReader::new(File::open(path)?);
367 while reader.read_line(&mut line)? != 0 {
368 if config.ignore_empty() && line.trim().is_empty() {
369 continue;
370 }
371
372 if let Some(r) = config.ignore_lines() {
373 if r.is_match(line.trim()) {
374 continue;
375 }
376 }
377 let current_line_record = LineRecord::new(
378 line,
379 config.fields(),
380 config.field_separator(),
381 config.order().clone(),
382 )?;
383
384 match previous {
385 None => {
386 previous = Some(current_line_record);
387 }
388 Some(previous_line_record) => {
389 if previous_line_record <= current_line_record {
390 previous = Some(current_line_record);
391 } else {
392 result = false;
393 break;
394 }
395 }
396 }
397 line = String::new();
398 }
399 Ok(result)
400 }
401
402 pub fn merge(&self) -> Result<(), anyhow::Error> {
403 let config = self.create_config();
404 let (current_soft, current_hard) = Self::get_rlimits()?;
405 log::info!("Current rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
406 let new_soft = max((config.files() + 256) as u64, current_soft);
407 log::info!("Set new rlimit NOFILE, soft: {}, hard: {}", new_soft, current_hard);
408 Self::set_rlimits(new_soft, current_hard)?;
409 let (path, _lines) = Self::internal_merge(self.input_files.clone(), &config, false, true)?;
410 std::fs::rename(path.clone(), &self.output)
411 .with_context(|| anyhow!("Rename {} to {}", path.display(), self.output.display()))?;
412 log::info!("Restore rlimit NOFILE, soft: {}, hard: {}", current_soft, current_hard);
413 Self::set_rlimits(current_soft, current_hard)?;
414 Ok(())
415 }
416
417 pub(crate) fn internal_merge(files: Vec<PathBuf>, config: &Config, remove_merged: bool, add_prefix_suffix: bool) -> Result<(PathBuf, usize), anyhow::Error> {
418 log::info!("Merging {} sorted files, thread: {}", files.len(), thread::current().name().unwrap_or("unnamed"));
419 let mut merged_len: usize = 0;
420 let merged_file = create_tmp_file(config);
421 let (persisted_merged_file, path) = merged_file.keep()?;
422 let mut merged_writer = BufWriter::new(persisted_merged_file);
423 if add_prefix_suffix {
424 for prefix in config.prefix() {
425 writeln!(merged_writer, "{}", prefix)?;
426 merged_len += 1;
427 }
428 }
429
430 if files.len() == 1 {
431 let file = File::open(files[0].clone()).with_context(|| format!("path: {}", files[0].display()))?;
432 let mut reader = BufReader::new(file);
433 let mut line = String::new();
434
435 while reader.read_line(&mut line)? > 0 {
436 merged_writer.write_all(line.as_bytes())?;
437 line = String::new();
438 merged_len += 1;
439 }
440 std::fs::remove_file(files[0].clone())?;
441 } else {
442 let mut unmerged_files: BinaryHeap<UnmergedChunkFile> = files.into_iter()
443 .map(
444 |path| UnmergedChunkFile::new(
445 path,
446 config.fields(),
447 config.field_separator(),
448 config.order().clone(),
449 )
450 .unwrap()
451 )
452 .collect();
453 while unmerged_files.len() > 1 {
454 let mut current_min = unmerged_files.pop().unwrap();
455 let unmerged_min = unmerged_files.peek().unwrap();
456
457 let mut current_min_done = false;
458 while ¤t_min >= unmerged_min {
460 if let Some(line_record) = current_min.line_record() {
461 let line = line_record.line();
462 merged_writer.write_all(line.as_bytes())?;
463 merged_len += 1;
464 } else {
465 current_min_done = true;
466 if remove_merged {
467 std::fs::remove_file(current_min.path())?;
468 }
469 break;
470 }
471 }
472 if !current_min_done {
473 unmerged_files.push(current_min)
474 }
475 }
476 let mut current_min = unmerged_files.pop().unwrap();
477 loop {
478 if let Some(line_record) = current_min.line_record() {
479 let line = line_record.line();
480 merged_writer.write_all(line.as_bytes())?;
481 merged_len += 1;
482 } else {
483 std::fs::remove_file(current_min.path())?;
484 break;
485 }
486 }
487
488 log::info!("Finished merging sorted files, thread: {}, merged length: {} lines", thread::current().name().unwrap_or("unnamed"), merged_len);
489 }
490 if add_prefix_suffix {
491 for suffix in config.suffix() {
492 writeln!(merged_writer, "{}", suffix)?;
493 merged_len += 1;
494 }
495 }
496 Ok((path, merged_len))
497 }
498
    /// Runs the full sort pipeline: split inputs into chunks, sort the chunks
    /// on a thread pool, optionally pre-merge per-thread results, then merge
    /// everything into `output` (with prefix/suffix lines added).
    fn internal_sort(input_files: &Vec<PathBuf>, config: &Config, output: &Path) -> Result<(), anyhow::Error> {
        log::info!("Start parallel sort");
        let mut thread_pool_builder = ThreadPoolBuilder::new();
        let mut sorting_pool = thread_pool_builder
            .with_name("sorting".to_string())
            .with_tasks(config.tasks())
            .with_queue_size(config.queue_size())
            // CompletePending: presumably queued tasks still run after
            // shutdown() — confirm against command_executor's docs.
            .with_shutdown_mode(ShutdownMode::CompletePending)
            .build()
            .unwrap();

        // Give every worker thread its own copy of the config; workers read it
        // back through get_tl_config().
        sorting_pool.set_thread_local(&CONFIG, Some(config.clone()));

        // One SortCommand per ~chunk_size_bytes chunk of each input file.
        for path in input_files {
            for chunk in ChunkIterator::new(path, config.chunk_size_bytes(), config.endl()).unwrap() {
                let sort_command = Box::new(SortCommand::new(Some(chunk)));
                sorting_pool.submit(sort_command);
            }
        }

        // Optionally reduce each thread's chunk files to one before the final
        // merge, limiting how many files that merge must open at once.
        if config.concurrent_merge() {
            Self::merge_sorted_files(&sorting_pool);
        }

        // NOTE(review): this runs before shutdown(); it assumes
        // in_all_threads_mut is queued behind the submitted sort tasks on each
        // worker — confirm with command_executor's semantics.
        let sorted_files = Self::collect_sorted_files(&mut sorting_pool);
        log::info!("Shutting down sorting pool");
        sorting_pool.shutdown();
        sorting_pool.join()?;

        // Final merge of all per-thread results: remove intermediates, add
        // prefix/suffix lines.
        let (path, _lines) = Self::internal_merge(sorted_files, config, true, true)?;

        std::fs::rename(path.clone(), output)
            .with_context(|| anyhow!("Rename {} to {}", path.display(), output.display()))?;
        log::info!("Finish parallel sort");
        Ok(())
    }
535}