1use std::{
2 cmp::Ordering,
3 error::Error,
4 fs::{self, read_dir, remove_dir, remove_file, File, OpenOptions},
5 io::{self, stdin, BufRead, BufReader, BufWriter, Error as IoError, Seek, Write},
6 iter::Zip,
7 marker::{Send, Sync},
8 ops::Range,
9 path::{Path, PathBuf},
10 time::SystemTime,
11 vec::IntoIter,
12};
13
14use bincode::{
15 self,
16 config::{FixintEncoding, WithOtherIntEncoding},
17 DefaultOptions, Error as BincodeError, Options,
18};
19use log::{debug, error, warn};
20use rayon::{prelude::ParallelSliceMut, ThreadPool}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
22use thiserror::Error;
23
24use crate::nested::{
25 get,
26 map::skymap::{implicit::ImplicitCountMapU32, SkyMap},
27 n_hash, Layer,
28};
29
30pub mod cindex;
31pub mod scindex;
32
33pub trait IntSortable: Send {}
35impl<A: Send> IntSortable for A {}
37
38pub trait ExtSortable: IntSortable + Serialize + DeserializeOwned {}
42impl<A: IntSortable + Serialize + DeserializeOwned> ExtSortable for A {}
44
45#[derive(Error, Debug)]
46pub enum SortError {
47 #[error("I/O error")]
48 IoError(#[from] IoError),
49 #[error("Serialization/deserialization (bincode) error")]
50 BincodeError(#[from] BincodeError),
51 #[error("Sort error: `{0}`.")]
52 Custom(String),
53}
54
55pub fn hpx_internal_sort<T, F>(elems: &mut [T], hpx29: F, n_threads: Option<usize>)
64where
65 T: IntSortable,
66 F: Fn(&T) -> u64 + Sync,
67{
68 match n_threads {
69 Some(1) => elems.sort_by_cached_key(hpx29),
70 _ => get_thread_pool(n_threads).install(|| elems.par_sort_by_cached_key(&hpx29)),
71 }
72}
73
74fn get_thread_pool(n_threads: Option<usize>) -> ThreadPool {
78 let mut pool_builder = rayon::ThreadPoolBuilder::new();
79 if let Some(n_threads) = n_threads {
80 pool_builder = pool_builder.num_threads(n_threads);
81 }
82 pool_builder.build().unwrap()
83}
84
85#[derive(Debug, Serialize, Deserialize)]
86pub struct SimpleExtSortInfo {
87 n_elems_per_chunk: u32,
89 n_threads: Option<usize>,
91 clean: bool,
93 n_tot: u64,
95 depth: u8,
97 ordered_ranges_counts: Vec<(Range<u32>, u32)>,
99}
100impl SimpleExtSortInfo {
101 fn new(
102 n_elems_per_chunk: u32,
103 n_threads: Option<usize>,
104 clean: bool,
105 n_tot: u64,
106 depth: u8,
107 ordered_ranges_counts: Vec<(Range<u32>, u32)>,
108 ) -> Self {
109 Self {
110 n_elems_per_chunk,
111 n_threads,
112 clean,
113 n_tot,
114 depth,
115 ordered_ranges_counts,
116 }
117 }
118}
119
120pub struct SimpleExtSortParams {
121 tmp_dir: PathBuf,
123 n_elems_per_chunk: u32,
126 n_threads: Option<usize>,
128 clean: bool,
130}
131impl Default for SimpleExtSortParams {
132 fn default() -> Self {
133 Self::new(PathBuf::from(".sort_tmp/"), 10_000_000, None, true)
134 }
135}
136impl SimpleExtSortParams {
137 const ALL_FILENAME: &'static str = "hpxsort.all.unsorted.bin";
138 const PREFIX: &'static str = "hpxsort.";
139 const INFO_FILENAME: &'static str = "hpxsort.info.toml";
140 const SUCCESS_FILENAME: &'static str = "hpxsort.success";
141 const SUFFIX: &'static str = ".unsorted.bin";
142
143 pub fn new(
144 tmp_dir: PathBuf,
145 n_elems_per_chunk: u32,
146 n_threads: Option<usize>,
147 clean: bool,
148 ) -> Self {
149 Self {
150 tmp_dir,
151 n_elems_per_chunk,
152 n_threads,
153 clean,
154 }
155 }
156
157 fn clean<P: AsRef<Path>>(tmp_dir: P) -> Result<(), IoError> {
158 let mut path = PathBuf::from(tmp_dir.as_ref());
159 debug!("PATH: {:?}", &path);
160 assert!(path.file_name().is_some());
161 path.push(Self::SUCCESS_FILENAME);
163 debug!("Remove file {:?} if exists.", &path);
164 fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
165 path.set_file_name(Self::INFO_FILENAME);
167 debug!("Remove file {:?} if exists.", &path);
168 fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
169 path.set_file_name(Self::ALL_FILENAME);
171 debug!("Remove file {:?} if exists.", &path);
172 fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
173 path.pop();
175 for file in read_dir(&path)? {
176 let file_path = file?.path();
177 if file_path.is_file()
178 && file_path
179 .file_name()
180 .and_then(|os_str| os_str.to_str())
181 .and_then(SimpleExtSortParams::parse_tmp_file_name)
182 .is_some()
183 {
184 debug!("Remove file {:?} if exists.", &file_path);
185 remove_file(file_path)?;
186 }
187 }
188 let tmp_dir_ref = tmp_dir.as_ref();
190 debug!("Remove dir {:?} if exists and is empty.", tmp_dir_ref);
192 if fs::exists(tmp_dir_ref).and_then(move |exists| {
193 if exists {
194 read_dir(tmp_dir_ref).map(|mut it| it.next().is_none())
195 } else {
196 Ok(false)
197 }
198 })? {
199 remove_dir(tmp_dir_ref)?;
200 } else {
201 warn!("Unable to remove directory {:?}", tmp_dir_ref);
202 }
203 Ok(())
204 }
205
206 pub fn set_tmp_dir(mut self, tmp_dir: PathBuf) -> Self {
207 self.tmp_dir = tmp_dir;
208 self
209 }
210
211 pub fn set_n_elems_per_chunk(mut self, n_elems_per_chunk: u32) -> Self {
212 self.n_elems_per_chunk = n_elems_per_chunk;
213 self
214 }
215
216 pub fn set_n_threads(mut self, n_threads: usize) -> Self {
217 self.n_threads = Some(n_threads);
218 self
219 }
220
221 fn from_dir_and_info(tmp_dir: PathBuf, info: &SimpleExtSortInfo) -> Self {
222 Self {
223 tmp_dir,
224 n_elems_per_chunk: info.n_elems_per_chunk,
225 n_threads: info.n_threads,
226 clean: info.clean,
227 }
228 }
229
230 fn create_tmp_dir(&self) -> Result<(), IoError> {
231 debug!("Create tmp dir: {}", &self.tmp_dir.to_string_lossy());
232 fs::create_dir_all(&self.tmp_dir)
233 }
234
235 fn create_file_all(&self) -> Result<File, IoError> {
237 let mut path = self.tmp_dir.clone();
238 path.push(Self::ALL_FILENAME);
239 debug!(
240 "Create or open to overwrite file: {}",
241 path.to_string_lossy()
242 );
243 OpenOptions::new()
244 .append(false)
245 .write(true)
246 .create(true)
247 .truncate(true)
248 .open(path)
249 }
250 fn open_file_all(&self) -> Result<File, IoError> {
251 let mut path = self.tmp_dir.clone();
252 path.push(Self::ALL_FILENAME);
253 File::open(path)
254 }
255
256 fn create_file_or_open_to_append(&self, depth: u8, range: &Range<u32>) -> Result<File, IoError> {
257 let n_digits = (1 + n_hash(depth).ilog10()) as usize;
258 let filename = format!(
259 "{}d{}f{:0n_digits$}t{:0n_digits$}{}",
260 Self::PREFIX,
261 depth,
262 range.start,
263 range.end,
264 Self::SUFFIX
265 );
266 let mut path = self.tmp_dir.clone();
267 path.push(filename);
268 debug!("Create or open to append file: {}.", path.to_string_lossy(),);
269 OpenOptions::new().append(true).create(true).open(path)
270 }
271
272 fn create_info_file_path(&self) -> PathBuf {
275 let mut path = self.tmp_dir.clone();
276 path.push(Self::INFO_FILENAME);
277 path
278 }
279
280 fn create_info_file_path_gen(path: &Path) -> PathBuf {
281 let mut path = path.to_path_buf();
282 path.push(Self::INFO_FILENAME);
283 path
284 }
285
286 fn write_info(
287 &self,
288 n_tot: u64,
289 depth: u8,
290 ordered_ranges_counts: Vec<(Range<u32>, u32)>,
291 ) -> Result<(), Box<dyn Error>> {
292 let info = SimpleExtSortInfo::new(
293 self.n_elems_per_chunk,
294 self.n_threads,
295 self.clean,
296 n_tot,
297 depth,
298 ordered_ranges_counts,
299 );
300 let content = toml::to_string_pretty(&info)?;
301 let path = self.create_info_file_path();
302 fs::write(&path, content)
303 .map_err(|e| format!("Error writing file {}: {:?}.", path.to_string_lossy(), e).into())
304 }
305
306 pub fn read_info(&self) -> Result<SimpleExtSortInfo, Box<dyn Error>> {
307 let path = self.create_info_file_path();
308 fs::read_to_string(&path)
309 .map_err(|e| format!("Error reading file {}: {:?}.", path.to_string_lossy(), e).into())
310 .and_then(|content| toml::from_str(&content).map_err(|e| e.into()))
311 }
312
313 fn read_info_gen(path: &Path) -> Result<SimpleExtSortInfo, Box<dyn Error>> {
314 let path = Self::create_info_file_path_gen(path);
315 fs::read_to_string(&path)
316 .map_err(|e| format!("Error reading file {}: {:?}.", path.to_string_lossy(), e).into())
317 .and_then(|content| toml::from_str(&content).map_err(|e| e.into()))
318 }
319
320 fn create_ok_file_path(&self) -> PathBuf {
323 let mut path = self.tmp_dir.clone();
324 path.push(Self::SUCCESS_FILENAME);
325 path
326 }
327 fn write_ok(&self) -> Result<(), Box<dyn Error>> {
328 let path = self.create_ok_file_path();
329 fs::write(&path, "")
330 .map_err(|e| format!("Error writing file {}: {:?}.", path.to_string_lossy(), e).into())
331 }
332 pub fn ok_file_exists(&self) -> Result<bool, Box<dyn Error>> {
333 let path = self.create_ok_file_path();
334 fs::exists(&path).map_err(|e| {
335 format!(
336 "Error cheking for file {}: {:?}.",
337 path.to_string_lossy(),
338 e
339 )
340 .into()
341 })
342 }
343
344 fn parse_tmp_file_name(name: &str) -> Option<(u8, Range<u32>)> {
348 name
349 .strip_prefix(format!("{}d", Self::PREFIX).as_str())
350 .and_then(|s| s.strip_suffix(Self::SUFFIX))
351 .and_then(|s| s.split_once('f'))
352 .and_then(|(depth, s)| s.split_once('t').map(|(from, to)| (depth, from, to)))
353 .and_then(|(depth, from, to)| {
354 match (depth.parse::<u8>(), from.parse::<u32>(), to.parse::<u32>()) {
355 (Ok(d), Ok(f), Ok(t)) => Some((d, f..t)),
356 _ => None,
357 }
358 })
359 }
360
361 fn get_ordered_files_in_tmp_dir(&self) -> Result<Vec<(PathBuf, u8, Range<u32>)>, IoError> {
364 read_dir(&self.tmp_dir).map(|it| {
365 let mut path_depth_range_vec: Vec<(PathBuf, u8, Range<u32>)> = it
366 .filter_map(|res_dir_entry| {
367 res_dir_entry.ok().and_then(|dir_entry| {
368 let path = dir_entry.path();
369 if path.is_file() {
370 path
371 .file_name()
372 .and_then(|os_str| os_str.to_str())
373 .and_then(SimpleExtSortParams::parse_tmp_file_name)
374 .map(|(depth, range)| (path, depth, range))
375 } else {
376 None
377 }
378 })
379 })
380 .collect();
381 path_depth_range_vec.sort_by(|(_, _, rl), (_, _, rr)| rl.start.cmp(&rr.start));
383 path_depth_range_vec.into_iter().collect()
384 })
385 }
386}
387
388pub fn hpx_external_sort_with_knowledge<'a, T, E, I, S, F>(
418 it: I,
419 count_map: &'a S,
420 hpx29: F,
421 sort_params: Option<SimpleExtSortParams>,
422) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
423where
424 T: ExtSortable,
425 E: Error + Send + 'static,
426 I: Iterator<Item = Result<T, E>> + Send,
427 S: SkyMap<'a, HashType = u32, ValueType = u32>,
428 F: Fn(&T) -> u64 + Sync,
429{
430 let params = sort_params.unwrap_or_default();
431 let tmp_dir = params.tmp_dir.clone();
432 hpx_external_sort_with_knowledge_write_tmp(it, count_map, &hpx29, Some(params))
433 .and_then(|()| hpx_external_sort_with_knowledge_read_tmp(hpx29, tmp_dir))
434}
435
436pub fn hpx_external_sort_with_knowledge_write_tmp<'a, T, E, I, S, F>(
446 mut it: I,
447 count_map: &'a S,
448 hpx29: F,
449 sort_params: Option<SimpleExtSortParams>,
450) -> Result<(), Box<dyn Error>>
451where
452 T: ExtSortable,
453 E: Error + Send + 'static,
454 I: Iterator<Item = Result<T, E>> + Send,
455 S: SkyMap<'a, HashType = u32, ValueType = u32>,
456 F: Fn(&T) -> u64 + Sync,
457{
458 let params = sort_params.unwrap_or_default();
460 let depth = count_map.depth();
461 let dd = 29 - depth;
462 let twice_dd = dd << 1;
463 let bincode = get_bincode();
465 params.create_tmp_dir()?;
467 let pool = get_thread_pool(params.n_threads);
469 let n_max = params.n_elems_per_chunk;
471 let (n_tot, mut ranges_counts) = skymap2ranges(count_map, n_max);
472 params.write_info(n_tot, depth, ranges_counts.clone())?;
473
474 let n_max = n_max as usize;
475 let tstart = SystemTime::now();
477 let mut entries = (&mut it).take(n_max).collect::<Result<Vec<T>, _>>()?;
478 debug!(
479 "Read {} elements in {} ms",
480 entries.len(),
481 SystemTime::now()
482 .duration_since(tstart)
483 .unwrap_or_default()
484 .as_millis()
485 );
486
487 let mut n = entries.len();
488 let mut n_tot_it = 0_u64;
489 while n > 0 {
490 let tstart = SystemTime::now();
491 let (next_entries, ()) = pool.join(
492 || (&mut it).take(n_max).collect::<Result<Vec<T>, _>>(),
493 || {
494 let tstart = SystemTime::now();
495 entries.par_sort_by_cached_key(&hpx29);
496 debug!(
497 "Sort {} elements in {} ms",
498 entries.len(),
499 SystemTime::now()
500 .duration_since(tstart)
501 .unwrap_or_default()
502 .as_millis()
503 );
504 },
505 );
506 debug!(
507 "Read {} elements (+ parallel sort of prev elems) in {} ms",
508 entries.len(),
509 SystemTime::now()
510 .duration_since(tstart)
511 .unwrap_or_default()
512 .as_millis()
513 );
514 let next_entries = next_entries?;
515
516 let tstart = SystemTime::now();
518 let mut entries_view = entries.as_mut_slice();
519 let first_h = (hpx29(entries_view.first().unwrap()) >> twice_dd) as u32;
523 let last_h = (hpx29(entries_view.last().unwrap()) >> twice_dd) as u32;
524 let rstart = ranges_counts
525 .binary_search_by(get_range_binsearch(first_h))
526 .unwrap();
527 let rend = ranges_counts
528 .binary_search_by(get_range_binsearch(last_h))
529 .unwrap();
530
531 for (range, count) in &mut ranges_counts[rstart..=rend] {
532 let to = entries_view.partition_point(|row| {
533 let h = (hpx29(row) >> twice_dd) as u32;
534 range.contains(&h)
535 });
536 if to > 0 {
537 let (to_be_writen, remaining) = entries_view.split_at_mut(to);
538 entries_view = remaining;
539 let file = params.create_file_or_open_to_append(depth, range)?;
541 let mut bufw = BufWriter::new(file);
542 for row in to_be_writen {
543 bincode.serialize_into(&mut bufw, row)?;
544 }
545
546 let to = to as u32;
547 if to > *count {
548 return Err(
550 format!(
551 "Wrong number of counts for range [{}, {}). N remaining: {}. N to be removed: {}.",
552 range.start, range.end, count, &to
553 )
554 .into(),
555 );
556 }
557 *count -= to;
558 }
559 }
560 debug!(
561 "{} eleemnts added to temp files in {} ms",
562 n,
563 SystemTime::now()
564 .duration_since(tstart)
565 .unwrap_or_default()
566 .as_millis()
567 );
568
569 n_tot_it += n as u64;
570 entries = next_entries;
571 n = entries.len();
572 }
573 if n_tot_it != n_tot {
576 return Err(
578 format!(
579 "Wrong number of written rows. Expected: {}. Actual: {}",
580 n_tot, n_tot_it
581 )
582 .into(),
583 );
584 }
585 for (range, count) in &ranges_counts {
587 if *count != 0 {
588 return Err(
590 format!(
591 "Wrong number of written rows for range [{}, {}). N remaining: {}",
592 range.start, range.end, count
593 )
594 .into(),
595 );
596 }
597 }
598 params.write_ok()
600}
601
602pub fn hpx_external_sort_with_knowledge_read_tmp<T, F>(
606 hpx29: F,
607 tmp_dir: PathBuf,
608) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
609where
610 T: ExtSortable,
611 F: Fn(&T) -> u64 + Sync,
612{
613 let info = SimpleExtSortParams::read_info_gen(&tmp_dir)?;
615 let params = SimpleExtSortParams::from_dir_and_info(tmp_dir, &info);
616 let ordered_files = params.get_ordered_files_in_tmp_dir()?;
618 if ordered_files.is_empty() {
619 return Err(format!("No tmp file found in {:?}", ¶ms.tmp_dir).into());
620 }
621 let thread_pool = get_thread_pool(params.n_threads);
623
624 fn load_file<TT: ExtSortable>(nrows: usize, path: PathBuf) -> Result<Vec<TT>, SortError> {
625 let path_str = path.to_string_lossy().to_string();
626 let tstart = SystemTime::now();
627 let file = File::open(path).map_err(SortError::IoError)?;
628 let file_len = file.metadata().map_err(SortError::IoError)?.len();
629 let mut bufr = BufReader::new(file);
630 let bincode = get_bincode();
631 let mut rows = Vec::with_capacity(nrows);
632 for _ in 0..nrows {
633 rows.push(
634 bincode
635 .deserialize_from(&mut bufr)
636 .map_err(SortError::BincodeError)?,
637 );
638 }
639 let pos = bufr.stream_position().map_err(SortError::IoError)?;
640 if pos != file_len {
641 Err(SortError::Custom(format!(
642 "File position '{}' does not match file len '{}'.",
643 pos, file_len
644 )))
645 } else {
646 debug!(
647 "Read file {} in {} ms",
648 path_str,
649 SystemTime::now()
650 .duration_since(tstart)
651 .unwrap_or_default()
652 .as_millis()
653 );
654 Ok(rows)
655 }
656 }
657 #[allow(clippy::type_complexity)]
658 fn load_next_file<TT: ExtSortable>(
659 elem: Option<((PathBuf, u8, Range<u32>), (Range<u32>, u32))>,
660 ) -> Option<Result<Vec<TT>, SortError>> {
661 match elem {
662 Some(((path, _depth, lrange), (rrange, nrows))) => {
663 assert_eq!(
664 rrange, lrange,
665 "File range difference from counts range: {:?} != {:?}.",
666 &rrange, &lrange
667 );
668 Some(load_file(nrows as usize, path.clone()))
669 }
670 None => None,
671 }
672 }
673
674 let mut ordered_files_counts_it = ordered_files.into_iter().zip(info.ordered_ranges_counts);
676 let ((file_path, _depth, lrange), (rrange, nrows)) = ordered_files_counts_it.next().unwrap();
677 assert_eq!(
678 rrange, lrange,
679 "File range difference from counts range: {:?} != {:?}.",
680 &rrange, &lrange
681 );
682
683 let mut rows_to_be_sorted = load_file(nrows as usize, file_path)?;
684 let (next_file_content, ()) = thread_pool.join(
685 || load_next_file(ordered_files_counts_it.next()),
686 || rows_to_be_sorted.par_sort_by_cached_key(&hpx29),
687 );
688
689 struct GlobalIt<TT: ExtSortable, FF: Fn(&TT) -> u64 + Sync> {
690 thread_pool: ThreadPool,
692 hpx29: FF,
694 #[allow(clippy::type_complexity)]
696 ordered_files_counts_it: Zip<IntoIter<(PathBuf, u8, Range<u32>)>, IntoIter<(Range<u32>, u32)>>,
697 rows_it: IntoIter<TT>,
699 next_rows: Option<Vec<TT>>,
701 tmp_dir: PathBuf,
703 clean: bool,
705 }
706 impl<TT: ExtSortable, FF: Fn(&TT) -> u64 + Sync> Iterator for GlobalIt<TT, FF> {
707 type Item = Result<TT, Box<dyn Error>>;
708
709 fn next(&mut self) -> Option<Self::Item> {
710 match self.rows_it.next() {
711 Some(next_row) => Some(Ok(next_row)),
712 None => match self.next_rows.as_mut() {
713 Some(rows_to_be_sorted) => {
714 let (next_file_content, ()) = self.thread_pool.join(
715 || load_next_file(self.ordered_files_counts_it.next()),
716 || {
717 let tstart = SystemTime::now();
718 rows_to_be_sorted.par_sort_by_cached_key(&self.hpx29);
719 debug!(
720 "{} rows sorted in {} ms",
721 rows_to_be_sorted.len(),
722 SystemTime::now()
723 .duration_since(tstart)
724 .unwrap_or_default()
725 .as_millis()
726 );
727 },
728 );
729 match next_file_content.transpose() {
730 Ok(Some(next_row_chunk)) => {
731 self.rows_it = self.next_rows.replace(next_row_chunk).unwrap().into_iter();
732 self.next()
733 }
734 Ok(None) => {
735 self.rows_it = self.next_rows.take().unwrap().into_iter();
736 self.next()
737 }
738 Err(e) => Some(Err(e.into())),
739 }
740 }
741 None => {
742 if self.clean {
743 if let Err(e) = SimpleExtSortParams::clean(&self.tmp_dir) {
744 error!("Error cleaning external sort temporary files: {}", e);
745 }
746 }
747 None
748 }
749 },
750 }
751 }
752 }
753
754 Ok(GlobalIt {
755 thread_pool,
756 hpx29,
757 ordered_files_counts_it,
758 rows_it: rows_to_be_sorted.into_iter(),
759 next_rows: next_file_content.transpose()?,
760 tmp_dir: params.tmp_dir,
761 clean: params.clean,
762 })
763}
764
765fn get_bincode() -> WithOtherIntEncoding<DefaultOptions, FixintEncoding> {
767 DefaultOptions::new().with_fixint_encoding()
768 }
770
771fn get_range_binsearch(hash: u32) -> impl Fn(&(Range<u32>, u32)) -> Ordering {
773 move |(target, _)| {
774 if target.end <= hash {
775 Ordering::Less
776 } else if hash < target.start {
777 Ordering::Greater
778 } else {
779 Ordering::Equal
780 }
781 }
782}
783
784fn skymap2ranges<'a, S>(counts: &'a S, threshold: u32) -> (u64, Vec<(Range<u32>, u32)>)
788where
789 S: SkyMap<'a, HashType = u32, ValueType = u32>,
790{
791 let mut range_count_vec: Vec<(Range<u32>, u32)> = Vec::with_capacity(1000);
793 let mut n_tot = 0_u64;
794
795 let depth = counts.depth();
796
797 let mut start = 0_u32;
798 let mut cumul_count = 0_u32;
799
800 for (hash, count) in counts.entries() {
801 let sum = cumul_count + count;
802 if sum <= threshold {
803 cumul_count = sum;
804 } else {
805 range_count_vec.push((start..hash, cumul_count));
806 n_tot += cumul_count as u64;
807 start = hash;
808 cumul_count = *count;
809 }
810 }
811 range_count_vec.push((start..n_hash(depth) as u32, cumul_count));
812 n_tot += cumul_count as u64;
813
814 (n_tot, range_count_vec)
815}
816
817pub fn hpx_external_sort<T, E, I, J, F, P: AsRef<Path>>(
832 iterator: I,
833 iterable: J,
834 hpx29: F,
835 depth: u8,
836 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
838) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
839where
840 T: ExtSortable,
841 E: Error + Send + 'static,
842 I: Iterator<Item = Result<u64, E>> + Send,
843 J: IntoIterator<Item = Result<T, E>>,
844 <J as IntoIterator>::IntoIter: Send,
845 F: Fn(&T) -> u64 + Send + Sync,
846{
847 debug!("Starts computing the count map of depth {}...", depth);
848 let tstart = SystemTime::now();
849 let params = sort_params.unwrap_or_default();
850 let twice_dd = (29 - depth) << 1;
851 let count_map = ImplicitCountMapU32::from_hash_values(
852 depth,
853 iterator
854 .enumerate()
855 .filter_map(|(irow, row_res)| match row_res {
856 Ok(row_hash) => Some((row_hash >> twice_dd) as u32),
857 Err(e) => {
858 error!("Error at line {}, line ignored: {:?}", irow, e);
859 None
860 }
861 }),
862 );
863 debug!(
864 "... count map of depth {} computed in {} ms.",
865 depth,
866 SystemTime::now()
867 .duration_since(tstart)
868 .unwrap_or_default()
869 .as_millis()
870 );
871 let tstart = SystemTime::now();
872 params.create_tmp_dir()?;
873 if let Some(countmap_path) = save_countmap_in_file {
874 count_map.to_fits_file(countmap_path)?;
875 }
876 debug!(
877 "Count map of writen in {} ms.",
878 SystemTime::now()
879 .duration_since(tstart)
880 .unwrap_or_default()
881 .as_millis()
882 );
883 hpx_external_sort_with_knowledge(iterable.into_iter(), &count_map, hpx29, Some(params))
884}
885
886pub fn hpx_external_sort_stream<T, E, I, F, P: AsRef<Path>>(
887 stream: I,
888 hpx29: F,
889 depth: u8,
890 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
892) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
893where
894 T: ExtSortable,
895 E: Error + 'static,
896 I: Iterator<Item = Result<T, E>>,
897 F: Fn(&T) -> u64 + Sync,
898{
899 let params = sort_params.unwrap_or_default();
900 let twice_dd = (29 - depth) << 1;
901 let bincode = get_bincode();
902 let count_map = {
904 let tmp_file_all = params
905 .create_tmp_dir()
906 .and_then(|()| params.create_file_all())?;
907 let mut bufw = BufWriter::new(tmp_file_all);
908 ImplicitCountMapU32::from_hash_values(
909 depth,
910 stream
911 .enumerate()
912 .filter_map(|(irow, row_res)| match row_res {
913 Ok(row) => {
914 let hash = (hpx29(&row) >> twice_dd) as u32;
915 match bincode.serialize_into(&mut bufw, &row) {
916 Ok(()) => Some(hash),
917 Err(e) => {
918 error!("Error writing line {}, line ignored: {:?}", irow, e);
919 None
920 }
921 }
922 }
923 Err(e) => {
924 error!("Error reading line {}, line ignored: {:?}", irow, e);
925 None
926 }
927 }),
928 )
929 };
930 let n_rows = count_map.values().map(|count| *count as u64).sum();
932 if let Some(countmap_path) = save_countmap_in_file {
933 count_map.to_fits_file(countmap_path)?;
934 }
935 let mut bufr = params.open_file_all().map(BufReader::new)?;
936 hpx_external_sort_with_knowledge(
937 (0..n_rows).map(move |_| bincode.deserialize_from(&mut bufr)),
938 &count_map,
939 hpx29,
940 Some(params),
941 )
942}
943
944pub fn hpx_external_sort_csv_stdin_stdout<P: AsRef<Path>>(
958 ilon: usize,
959 ilat: usize,
960 has_header: bool,
961 separator: Option<char>,
962 depth: u8,
963 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
965) -> Result<(), Box<dyn Error>> {
966 let writer = io::stdout().lock();
967 hpx_external_sort_csv_stdin_gen(
968 writer,
969 ilon,
970 ilat,
971 has_header,
972 separator,
973 depth,
974 save_countmap_in_file,
975 sort_params,
976 )
977}
978
979pub fn hpx_external_sort_csv_stdin<OUT: AsRef<Path>, P: AsRef<Path>>(
996 output_path: OUT,
997 output_overwrite: bool,
998 ilon: usize,
999 ilat: usize,
1000 has_header: bool,
1001 separator: Option<char>,
1002 depth: u8,
1003 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
1005) -> Result<(), Box<dyn Error>> {
1006 let writer = BufWriter::new(if output_overwrite {
1007 OpenOptions::new()
1008 .write(true)
1009 .create(true)
1010 .truncate(true)
1011 .open(output_path)
1012 } else {
1013 OpenOptions::new()
1014 .write(true)
1015 .create_new(true)
1016 .open(output_path)
1017 }?);
1018 hpx_external_sort_csv_stdin_gen(
1019 writer,
1020 ilon,
1021 ilat,
1022 has_header,
1023 separator,
1024 depth,
1025 save_countmap_in_file,
1026 sort_params,
1027 )
1028}
1029
1030pub fn hpx_external_sort_csv_stdin_gen<W: Write, P: AsRef<Path>>(
1046 mut writer: W,
1047 ilon: usize,
1048 ilat: usize,
1049 has_header: bool,
1050 separator: Option<char>,
1051 depth: u8,
1052 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
1054) -> Result<(), Box<dyn Error>> {
1055 let mut line_res_it = stdin().lock().lines().peekable();
1057 while let Some(Ok(line)) = line_res_it.next_if(|res| {
1059 res
1060 .as_ref()
1061 .map(|line| line.starts_with('#'))
1062 .unwrap_or(false)
1063 }) {
1064 writer.write_all(line.as_bytes())?;
1065 }
1066 if has_header {
1068 if let Some(header) = line_res_it.next().transpose()? {
1069 write!(&mut writer, "{}\n", header)?;
1070 }
1071 }
1072 let hpx29 = get_hpx(ilon, ilat, separator.unwrap_or(','), get(29));
1074 let sorted_it = hpx_external_sort_stream(
1075 line_res_it,
1076 hpx29,
1077 depth,
1078 save_countmap_in_file,
1079 sort_params,
1080 )?;
1081 for row_res in sorted_it {
1083 row_res.and_then(|row| writeln!(&mut writer, "{}", row).map_err(|e| e.into()))?;
1084 }
1085 Ok(())
1086}
1087
1088pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>, P: AsRef<Path>>(
1106 input_path: IN,
1107 output_path: OUT,
1108 output_overwrite: bool,
1109 ilon: usize,
1110 ilat: usize,
1111 has_header: bool,
1112 separator: Option<char>,
1113 depth: u8,
1114 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
1116) -> Result<(), Box<dyn Error>> {
1117 let writer = BufWriter::new(if output_overwrite {
1118 OpenOptions::new()
1119 .write(true)
1120 .create(true)
1121 .truncate(true)
1122 .open(output_path)
1123 } else {
1124 OpenOptions::new()
1125 .write(true)
1126 .create_new(true)
1127 .open(output_path)
1128 }?);
1129 hpx_external_sort_csv_file_gen(
1130 input_path,
1131 writer,
1132 ilon,
1133 ilat,
1134 has_header,
1135 separator,
1136 depth,
1137 save_countmap_in_file,
1138 sort_params,
1139 )
1140}
1141
1142pub fn hpx_external_sort_csv_file_stdout<IN: AsRef<Path>, P: AsRef<Path>>(
1157 input_path: IN,
1158 ilon: usize,
1159 ilat: usize,
1160 has_header: bool,
1161 separator: Option<char>,
1162 depth: u8,
1163 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
1165) -> Result<(), Box<dyn Error>> {
1166 let writer = io::stdout().lock();
1167 hpx_external_sort_csv_file_gen(
1168 input_path,
1169 writer,
1170 ilon,
1171 ilat,
1172 has_header,
1173 separator,
1174 depth,
1175 save_countmap_in_file,
1176 sort_params,
1177 )
1178}
1179
1180pub fn hpx_external_sort_csv_file_gen<IN: AsRef<Path>, W: Write, P: AsRef<Path>>(
1196 input_path: IN,
1197 mut writer: W,
1198 ilon: usize,
1199 ilat: usize,
1200 has_header: bool,
1201 separator: Option<char>,
1202 depth: u8,
1203 save_countmap_in_file: Option<P>, sort_params: Option<SimpleExtSortParams>,
1205) -> Result<(), Box<dyn Error>> {
1206 let hpx29 = get_hpx(ilon, ilat, separator.unwrap_or(','), get(29));
1208
1209 let mut line_res_it = BufReader::new(File::open(&input_path)?).lines().peekable();
1211 while let Some(Ok(line)) = line_res_it.next_if(|res| {
1213 res
1214 .as_ref()
1215 .map(|line| line.starts_with('#'))
1216 .unwrap_or(false)
1217 }) {
1218 writer.write_all(line.as_bytes())?;
1219 }
1220 if has_header {
1222 if let Some(header) = line_res_it.next().transpose()? {
1223 write!(&mut writer, "{}\n", header)?;
1224 }
1225 }
1226 let sort_params = sort_params.unwrap_or_default();
1227 let thread_pool = get_thread_pool(sort_params.n_threads);
1228
1229 let tstart = SystemTime::now();
1230 debug!("Start generating count map (first iteration on the full CSV file)...");
1231 let count_map = ImplicitCountMapU32::from_csv_it_par(
1232 line_res_it,
1233 ilon,
1234 ilat,
1235 separator,
1236 depth,
1237 sort_params.n_elems_per_chunk as usize,
1238 &thread_pool,
1239 )?;
1240 debug!(
1241 "... count map computed in {} ms",
1242 SystemTime::now()
1243 .duration_since(tstart)
1244 .unwrap_or_default()
1245 .as_millis()
1246 );
1247 let tstart = SystemTime::now();
1248 sort_params.create_tmp_dir()?;
1249 if let Some(countmap_path) = save_countmap_in_file {
1250 count_map.to_fits_file(countmap_path)?;
1251 }
1252 debug!(
1253 "Count map of writen in {} ms.",
1254 SystemTime::now()
1255 .duration_since(tstart)
1256 .unwrap_or_default()
1257 .as_millis()
1258 );
1259
1260 let mut line_res_it = BufReader::new(File::open(&input_path)?).lines().peekable();
1262 while line_res_it
1264 .next_if(|res| {
1265 res
1266 .as_ref()
1267 .map(|line| line.starts_with('#'))
1268 .unwrap_or(false)
1269 })
1270 .is_some()
1271 {}
1272 if has_header {
1274 line_res_it.next();
1275 }
1276 let sorted_it =
1278 hpx_external_sort_with_knowledge(line_res_it, &count_map, hpx29, Some(sort_params))?;
1279
1280 debug!("Starts writing sorted rows in output file...");
1281 let tstart = SystemTime::now();
1282 for row_res in sorted_it {
1284 row_res.and_then(|row| writeln!(&mut writer, "{}", row).map_err(|e| e.into()))?;
1285 }
1286 debug!(
1287 "... writing sorted rows in {} ms",
1288 SystemTime::now()
1289 .duration_since(tstart)
1290 .unwrap_or_default()
1291 .as_millis()
1292 );
1293 Ok(())
1294}
1295
1296pub fn get_hpx(
1301 ilon: usize,
1302 ilat: usize,
1303 separator: char,
1304 layer: &'static Layer,
1305) -> impl Fn(&String) -> u64 {
1306 let (index_first_col, offset_to_second_col, ilon, ilat) = if ilon < ilat {
1307 (ilon, ilat - ilon - 1, 0, 1)
1308 } else {
1309 (ilat, ilon - ilat - 1, 1, 0)
1310 };
1311
1312 move |line: &String| {
1313 let mut field_it = line.as_str().split(separator);
1314 let coos = [
1315 field_it.nth(index_first_col).and_then(|s| {
1316 s.parse::<f64>()
1317 .map_err(|e| error!("Error parsing 1st coo: '{}': {:?}", s, e))
1318 .ok()
1319 }),
1320 field_it.nth(offset_to_second_col).and_then(|s| {
1321 s.parse::<f64>()
1322 .map_err(|e| error!("Error parsing 2nd coo: '{}': {:?}", s, e))
1323 .ok()
1324 }),
1325 ];
1326 match (coos[ilon], coos[ilat]) {
1327 (Some(lon), Some(lat)) => layer.hash(lon.to_radians(), lat.to_radians()),
1328 _ => {
1329 error!(
1330 "Error parsing coordinates at line: {}. Hash set to 0.",
1331 line
1332 );
1333 0
1334 }
1335 }
1336 }
1337}
1338
1339pub fn get_hpx_opt(
1344 ilon: usize,
1345 ilat: usize,
1346 separator: char,
1347 layer: &'static Layer,
1348) -> impl Fn(&String) -> Option<u64> {
1349 let (index_first_col, offset_to_second_col, ilon, ilat) = if ilon < ilat {
1350 (ilon, ilat - ilon - 1, 0, 1)
1351 } else {
1352 (ilat, ilon - ilat - 1, 1, 0)
1353 };
1354 move |line: &String| {
1355 let mut field_it = line.as_str().split(separator);
1356 let coos = [
1357 field_it.nth(index_first_col).and_then(|s| {
1358 s.parse::<f64>()
1359 .map_err(|e| error!("Error parsing 1st coo: '{}': {:?}", s, e))
1360 .ok()
1361 }),
1362 field_it.nth(offset_to_second_col).and_then(|s| {
1363 s.parse::<f64>()
1364 .map_err(|e| error!("Error parsing 2nd coo: '{}': {:?}", s, e))
1365 .ok()
1366 }),
1367 ];
1368 match (coos[ilon], coos[ilat]) {
1369 (Some(lon), Some(lat)) => Some(layer.hash(lon.to_radians(), lat.to_radians())),
1370 _ => {
1371 error!(
1372 "Error parsing coordinates at line: {}. Lon: {:?}. Lat: {:?}. Hash set to 0.",
1373 line, coos[ilon], coos[ilat]
1374 );
1375 None
1376 }
1377 }
1378 }
1379}
1380
1381pub fn create_test_file(depth: u8, path: &str) -> Result<(), IoError> {
1386 debug!("Starts writing test file...");
1387 let tstart = SystemTime::now();
1388 let n_hash = n_hash(depth);
1389 let width = (n_hash.ilog10() + 1) as usize; let dummmy_cols = "abc,def,ghi,jkl,mno,pqr,stu,vwx,yz"; let layer = get(depth);
1392
1393 let mut bufw = BufWriter::new(File::create(path)?);
1394
1395 for h in 0..n_hash {
1396 let (lon, lat) = layer.center(h);
1397 writeln!(
1398 bufw,
1399 "{:0width$},{:014.10},{:+014.10},{}",
1400 h,
1401 lon.to_degrees(),
1402 lat.to_degrees(),
1403 dummmy_cols
1404 )?;
1405 }
1406 debug!(
1407 "... done in {} ms",
1408 SystemTime::now()
1409 .duration_since(tstart)
1410 .unwrap_or_default()
1411 .as_millis()
1412 );
1413 Ok(())
1414}
1415
1416#[cfg(test)]
1417mod tests {
1418 use super::*;
1419 use std::process::Command;
1420
1421 fn init_logger() {
1422 let log_level = log::LevelFilter::max();
1423 let _ = env_logger::builder()
1426 .filter_level(log_level)
1428 .is_test(true)
1430 .try_init();
1432 }
1433
1434 #[cfg(target_os = "linux")]
1435 #[test]
1436 fn testok_sort() {
1437 init_logger();
1438 let depth_file = 5; let depth_sort = 4;
1445 fs::create_dir_all("./local_resources/").unwrap();
1446 create_test_file(depth_file, "./local_resources/test.csv").unwrap();
1447 Command::new("bash")
1448 .arg("-c")
1449 .arg("shuf ./local_resources/test.csv -o ./local_resources/input.csv")
1450 .output()
1451 .expect("failed to execute process");
1452 hpx_external_sort_csv_file(
1453 "./local_resources/input.csv",
1454 "./local_resources/output.csv",
1455 true,
1456 1,
1457 2,
1458 false,
1459 Some(','),
1460 depth_sort,
1461 None::<&'static str>, Some(SimpleExtSortParams {
1463 tmp_dir: PathBuf::from("./local_resources/sort_tmp/"),
1464 n_elems_per_chunk: 1000, n_threads: Some(4),
1466 clean: true,
1467 }),
1468 )
1469 .unwrap();
1470 let out = Command::new("bash")
1471 .arg("-c")
1472 .arg("diff ./local_resources/test.csv ./local_resources/output.csv")
1473 .output()
1474 .expect("failed to execute process");
1475 assert!(out.status.success());
1476 assert!(out.stdout.is_empty());
1477 Command::new("bash")
1478 .arg("-c")
1479 .arg("rm -r ./local_resources/test.csv ./local_resources/output.csv")
1480 .output()
1481 .expect("failed to execute process");
1482 }
1483
1484 }