Skip to main content

cdshealpix/nested/sort/
mod.rs

1use std::{
2  cmp::Ordering,
3  error::Error,
4  fs::{self, read_dir, remove_dir, remove_file, File, OpenOptions},
5  io::{self, stdin, BufRead, BufReader, BufWriter, Error as IoError, Seek, Write},
6  iter::Zip,
7  marker::{Send, Sync},
8  ops::Range,
9  path::{Path, PathBuf},
10  time::SystemTime,
11  vec::IntoIter,
12};
13
14use bincode::{
15  self,
16  config::{FixintEncoding, WithOtherIntEncoding},
17  DefaultOptions, Error as BincodeError, Options,
18};
19use log::{debug, error, warn};
20use rayon::{prelude::ParallelSliceMut, ThreadPool}; // iter::ZipEq
21use serde::{de::DeserializeOwned, Deserialize, Serialize};
22use thiserror::Error;
23
24use crate::nested::{
25  get,
26  map::skymap::{implicit::ImplicitCountMapU32, SkyMap},
27  n_hash, Layer,
28};
29
30pub mod cindex;
31pub mod scindex;
32
33/// Type defining a row that can be sorted internally.
34pub trait IntSortable: Send {}
35/// All types that implement `Send` are `IntSortable`.
36impl<A: Send> IntSortable for A {}
37
38/// Type defining a row than can be sorted externally, i.e. need to be serialized and deserialized.
39/// Keep it mind that to ser/de  `&[u8]` or `Vec<u8>`, using
40/// [serde_bytes](https://docs.rs/serde_bytes/latest/serde_bytes/) can boost performances.
41pub trait ExtSortable: IntSortable + Serialize + DeserializeOwned {}
42/// All types that implement `IntSortable`, `Serialize` and `DeserializeOwned` are `ExtSortable`.
43impl<A: IntSortable + Serialize + DeserializeOwned> ExtSortable for A {}
44
45#[derive(Error, Debug)]
46pub enum SortError {
47  #[error("I/O error")]
48  IoError(#[from] IoError),
49  #[error("Serialization/deserialization (bincode) error")]
50  BincodeError(#[from] BincodeError),
51  #[error("Sort error: `{0}`.")]
52  Custom(String),
53}
54
55/// Sort internally (i.e. in memory) the given array of elements according to the order 29 HEALPix
56/// cell computed using the given `hpx29` function.
57/// If the provided input parameter `n_thread` is `None`, the maximum number of thread available
58/// on the machine is used. Use `n_thread = Some(1)` for a monothreaded execution (without having
59/// to instantiate a thread pool).
60/// # Note
61/// Having a costly `hpx29` method is ok: internally it will be called only once per row
62/// (not multiple times durng the sort process).
63pub fn hpx_internal_sort<T, F>(elems: &mut [T], hpx29: F, n_threads: Option<usize>)
64where
65  T: IntSortable,
66  F: Fn(&T) -> u64 + Sync,
67{
68  match n_threads {
69    Some(1) => elems.sort_by_cached_key(hpx29),
70    _ => get_thread_pool(n_threads).install(|| elems.par_sort_by_cached_key(&hpx29)),
71  }
72}
73
74/// Returns a pool of thread containing:
75/// * the maximum number of threads available on the machine, if `n_threads` is `None`
76/// * the given number of threads in the `n_threads` option if it is `Some`.
77fn get_thread_pool(n_threads: Option<usize>) -> ThreadPool {
78  let mut pool_builder = rayon::ThreadPoolBuilder::new();
79  if let Some(n_threads) = n_threads {
80    pool_builder = pool_builder.num_threads(n_threads);
81  }
82  pool_builder.build().unwrap()
83}
84
85#[derive(Debug, Serialize, Deserialize)]
86pub struct SimpleExtSortInfo {
87  /// Maximum number of elements per chunk (i.e. per file).
88  n_elems_per_chunk: u32,
89  /// Number of used threads.
90  n_threads: Option<usize>,
91  /// Remove temporary files (and possibly temporary dir) once the output iteration is over.
92  clean: bool,
93  /// Total number of rows.
94  n_tot: u64,
95  /// Healpix depth of the ranges in filenames.
96  depth: u8,
97  /// Healpix ranges at `depth`, one range per chunk/file, together with the number of rows in the chunk/file.
98  ordered_ranges_counts: Vec<(Range<u32>, u32)>,
99}
100impl SimpleExtSortInfo {
101  fn new(
102    n_elems_per_chunk: u32,
103    n_threads: Option<usize>,
104    clean: bool,
105    n_tot: u64,
106    depth: u8,
107    ordered_ranges_counts: Vec<(Range<u32>, u32)>,
108  ) -> Self {
109    Self {
110      n_elems_per_chunk,
111      n_threads,
112      clean,
113      n_tot,
114      depth,
115      ordered_ranges_counts,
116    }
117  }
118}
119
120pub struct SimpleExtSortParams {
121  /// Directory containing the temporary directories/files.
122  tmp_dir: PathBuf,
123  /// Number of items to be sorted at once in memory, correspond to the maximum number of items
124  /// in a temporary file at second pass.
125  n_elems_per_chunk: u32,
126  /// Provide a given number of threads, else use all threads available.
127  n_threads: Option<usize>,
128  /// Remove temporary files (and possibly temporary dir) once the output iteration is over.
129  clean: bool,
130}
131impl Default for SimpleExtSortParams {
132  fn default() -> Self {
133    Self::new(PathBuf::from(".sort_tmp/"), 10_000_000, None, true)
134  }
135}
136impl SimpleExtSortParams {
137  const ALL_FILENAME: &'static str = "hpxsort.all.unsorted.bin";
138  const PREFIX: &'static str = "hpxsort.";
139  const INFO_FILENAME: &'static str = "hpxsort.info.toml";
140  const SUCCESS_FILENAME: &'static str = "hpxsort.success";
141  const SUFFIX: &'static str = ".unsorted.bin";
142
143  pub fn new(
144    tmp_dir: PathBuf,
145    n_elems_per_chunk: u32,
146    n_threads: Option<usize>,
147    clean: bool,
148  ) -> Self {
149    Self {
150      tmp_dir,
151      n_elems_per_chunk,
152      n_threads,
153      clean,
154    }
155  }
156
157  fn clean<P: AsRef<Path>>(tmp_dir: P) -> Result<(), IoError> {
158    let mut path = PathBuf::from(tmp_dir.as_ref());
159    debug!("PATH: {:?}", &path);
160    assert!(path.file_name().is_some());
161    // Remove SUCCESS first (since we are going to remove files, we have to remove the SUCCESS flag first)
162    path.push(Self::SUCCESS_FILENAME);
163    debug!("Remove file {:?} if exists.", &path);
164    fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
165    // Remove info file
166    path.set_file_name(Self::INFO_FILENAME);
167    debug!("Remove file {:?} if exists.", &path);
168    fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
169    // Remove all file if exists
170    path.set_file_name(Self::ALL_FILENAME);
171    debug!("Remove file {:?} if exists.", &path);
172    fs::exists(&path).and_then(|exists| if exists { remove_file(&path) } else { Ok(()) })?;
173    // Remove temporary files
174    path.pop();
175    for file in read_dir(&path)? {
176      let file_path = file?.path();
177      if file_path.is_file()
178        && file_path
179          .file_name()
180          .and_then(|os_str| os_str.to_str())
181          .and_then(SimpleExtSortParams::parse_tmp_file_name)
182          .is_some()
183      {
184        debug!("Remove file {:?} if exists.", &file_path);
185        remove_file(file_path)?;
186      }
187    }
188    // Finally remove the directory if empty
189    let tmp_dir_ref = tmp_dir.as_ref();
190    // debug_assert_eq!(tmp_dir_ref.as_os_str(), path.as_os_str());
191    debug!("Remove dir {:?} if exists and is empty.", tmp_dir_ref);
192    if fs::exists(tmp_dir_ref).and_then(move |exists| {
193      if exists {
194        read_dir(tmp_dir_ref).map(|mut it| it.next().is_none())
195      } else {
196        Ok(false)
197      }
198    })? {
199      remove_dir(tmp_dir_ref)?;
200    } else {
201      warn!("Unable to remove directory {:?}", tmp_dir_ref);
202    }
203    Ok(())
204  }
205
206  pub fn set_tmp_dir(mut self, tmp_dir: PathBuf) -> Self {
207    self.tmp_dir = tmp_dir;
208    self
209  }
210
211  pub fn set_n_elems_per_chunk(mut self, n_elems_per_chunk: u32) -> Self {
212    self.n_elems_per_chunk = n_elems_per_chunk;
213    self
214  }
215
216  pub fn set_n_threads(mut self, n_threads: usize) -> Self {
217    self.n_threads = Some(n_threads);
218    self
219  }
220
221  fn from_dir_and_info(tmp_dir: PathBuf, info: &SimpleExtSortInfo) -> Self {
222    Self {
223      tmp_dir,
224      n_elems_per_chunk: info.n_elems_per_chunk,
225      n_threads: info.n_threads,
226      clean: info.clean,
227    }
228  }
229
230  fn create_tmp_dir(&self) -> Result<(), IoError> {
231    debug!("Create tmp dir: {}", &self.tmp_dir.to_string_lossy());
232    fs::create_dir_all(&self.tmp_dir)
233  }
234
235  /// Open or create, in write mode (replacing previous content if any).
236  fn create_file_all(&self) -> Result<File, IoError> {
237    let mut path = self.tmp_dir.clone();
238    path.push(Self::ALL_FILENAME);
239    debug!(
240      "Create or open to overwrite file: {}",
241      path.to_string_lossy()
242    );
243    OpenOptions::new()
244      .append(false)
245      .write(true)
246      .create(true)
247      .truncate(true)
248      .open(path)
249  }
250  fn open_file_all(&self) -> Result<File, IoError> {
251    let mut path = self.tmp_dir.clone();
252    path.push(Self::ALL_FILENAME);
253    File::open(path)
254  }
255
256  fn create_file_or_open_to_append(&self, depth: u8, range: &Range<u32>) -> Result<File, IoError> {
257    let n_digits = (1 + n_hash(depth).ilog10()) as usize;
258    let filename = format!(
259      "{}d{}f{:0n_digits$}t{:0n_digits$}{}",
260      Self::PREFIX,
261      depth,
262      range.start,
263      range.end,
264      Self::SUFFIX
265    );
266    let mut path = self.tmp_dir.clone();
267    path.push(filename);
268    debug!("Create or open to append file: {}.", path.to_string_lossy(),);
269    OpenOptions::new().append(true).create(true).open(path)
270  }
271
272  // Rea/Write info file
273
274  fn create_info_file_path(&self) -> PathBuf {
275    let mut path = self.tmp_dir.clone();
276    path.push(Self::INFO_FILENAME);
277    path
278  }
279
280  fn create_info_file_path_gen(path: &Path) -> PathBuf {
281    let mut path = path.to_path_buf();
282    path.push(Self::INFO_FILENAME);
283    path
284  }
285
286  fn write_info(
287    &self,
288    n_tot: u64,
289    depth: u8,
290    ordered_ranges_counts: Vec<(Range<u32>, u32)>,
291  ) -> Result<(), Box<dyn Error>> {
292    let info = SimpleExtSortInfo::new(
293      self.n_elems_per_chunk,
294      self.n_threads,
295      self.clean,
296      n_tot,
297      depth,
298      ordered_ranges_counts,
299    );
300    let content = toml::to_string_pretty(&info)?;
301    let path = self.create_info_file_path();
302    fs::write(&path, content)
303      .map_err(|e| format!("Error writing file {}: {:?}.", path.to_string_lossy(), e).into())
304  }
305
306  pub fn read_info(&self) -> Result<SimpleExtSortInfo, Box<dyn Error>> {
307    let path = self.create_info_file_path();
308    fs::read_to_string(&path)
309      .map_err(|e| format!("Error reading file {}: {:?}.", path.to_string_lossy(), e).into())
310      .and_then(|content| toml::from_str(&content).map_err(|e| e.into()))
311  }
312
313  fn read_info_gen(path: &Path) -> Result<SimpleExtSortInfo, Box<dyn Error>> {
314    let path = Self::create_info_file_path_gen(path);
315    fs::read_to_string(&path)
316      .map_err(|e| format!("Error reading file {}: {:?}.", path.to_string_lossy(), e).into())
317      .and_then(|content| toml::from_str(&content).map_err(|e| e.into()))
318  }
319
320  // Read/Write SUCESS file
321
322  fn create_ok_file_path(&self) -> PathBuf {
323    let mut path = self.tmp_dir.clone();
324    path.push(Self::SUCCESS_FILENAME);
325    path
326  }
327  fn write_ok(&self) -> Result<(), Box<dyn Error>> {
328    let path = self.create_ok_file_path();
329    fs::write(&path, "")
330      .map_err(|e| format!("Error writing file {}: {:?}.", path.to_string_lossy(), e).into())
331  }
332  pub fn ok_file_exists(&self) -> Result<bool, Box<dyn Error>> {
333    let path = self.create_ok_file_path();
334    fs::exists(&path).map_err(|e| {
335      format!(
336        "Error cheking for file {}: {:?}.",
337        path.to_string_lossy(),
338        e
339      )
340      .into()
341    })
342  }
343
344  //
345
346  /// Returns the depth and the range encoded in the filename.
347  fn parse_tmp_file_name(name: &str) -> Option<(u8, Range<u32>)> {
348    name
349      .strip_prefix(format!("{}d", Self::PREFIX).as_str())
350      .and_then(|s| s.strip_suffix(Self::SUFFIX))
351      .and_then(|s| s.split_once('f'))
352      .and_then(|(depth, s)| s.split_once('t').map(|(from, to)| (depth, from, to)))
353      .and_then(|(depth, from, to)| {
354        match (depth.parse::<u8>(), from.parse::<u32>(), to.parse::<u32>()) {
355          (Ok(d), Ok(f), Ok(t)) => Some((d, f..t)),
356          _ => None,
357        }
358      })
359  }
360
361  /// Returns a list of file `path`, `depth` dans HEALPix `range` sorted by increasing range starting
362  /// hash value.
363  fn get_ordered_files_in_tmp_dir(&self) -> Result<Vec<(PathBuf, u8, Range<u32>)>, IoError> {
364    read_dir(&self.tmp_dir).map(|it| {
365      let mut path_depth_range_vec: Vec<(PathBuf, u8, Range<u32>)> = it
366        .filter_map(|res_dir_entry| {
367          res_dir_entry.ok().and_then(|dir_entry| {
368            let path = dir_entry.path();
369            if path.is_file() {
370              path
371                .file_name()
372                .and_then(|os_str| os_str.to_str())
373                .and_then(SimpleExtSortParams::parse_tmp_file_name)
374                .map(|(depth, range)| (path, depth, range))
375            } else {
376              None
377            }
378          })
379        })
380        .collect();
381      // TODO: check that ranges are non-overlapping and depth is always the same?
382      path_depth_range_vec.sort_by(|(_, _, rl), (_, _, rr)| rl.start.cmp(&rr.start));
383      path_depth_range_vec.into_iter().collect()
384    })
385  }
386}
387
388///
389/// # Params
390/// * `it`: the iterator over all rows to be sorted
391/// * `count_map`: the skymap containing the number of rows inside each cell. It **must**
392///   be consistent with the rows we get from the input iterator `it`.
393///   If not known, a first pass is required, see [hpx_external_sort]{#hpx_external_sort}.
394/// * `hpx29`: the function computing HEALPix hash at depth 29 from a row reference.
395/// * `sort_param`: parameters specific to the sort algorithm
396///
397/// # Info
398/// * Why taking in input an iterator of `Result`?
399///     + because we expect large iteration that do no hold in memory, meaning that reading
400///       from a file (or the network) is involved, and thus an error may occur during the iteration.
401///
402/// # Algo
403/// This external sort is optimal: the input data is written and then read exactly once.
404/// This is possible only providing its HEALPix distribution.
405/// It probably means a first pass is required to compute the HEALPix distribution.
406/// When reading from a file, we have to read twice the input file:
407/// * first to compute the HEALPix cells distribution;
408/// * second to provide this method input iterator.
409///
410/// When reading from an iterator (distribution unknown), we additionally have to
411/// write the input (possibly pre-sorting it) before re-reading it.
412///
413/// # Warning
414/// Fails if the HEALPix cell distribution in the input iterator does not fit the one provided
415/// in the input `map`.
416///
417pub fn hpx_external_sort_with_knowledge<'a, T, E, I, S, F>(
418  it: I,
419  count_map: &'a S,
420  hpx29: F,
421  sort_params: Option<SimpleExtSortParams>,
422) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
423where
424  T: ExtSortable,
425  E: Error + Send + 'static,
426  I: Iterator<Item = Result<T, E>> + Send,
427  S: SkyMap<'a, HashType = u32, ValueType = u32>,
428  F: Fn(&T) -> u64 + Sync,
429{
430  let params = sort_params.unwrap_or_default();
431  let tmp_dir = params.tmp_dir.clone();
432  hpx_external_sort_with_knowledge_write_tmp(it, count_map, &hpx29, Some(params))
433    .and_then(|()| hpx_external_sort_with_knowledge_read_tmp(hpx29, tmp_dir))
434}
435
436/// See [hpx_external_sort_with_knowledge](#hpx_external_sort_with_knowledge).
437/// This method perform the first part of the operation, i.e. creating and writing the (temporary)
438/// file structure on the disk.
439/// The second part consits in reading (and sorting in memory) the temporary files iteratively.
440///
441/// We separate the two parts to:
442/// * be able to only perform the read operation if an error occurs while provessing the output iterator.
443/// * allow for file-by-file sorting and serialization to create a structure on the disk (possibly indexing each file)
444///   for direct use.
445pub fn hpx_external_sort_with_knowledge_write_tmp<'a, T, E, I, S, F>(
446  mut it: I,
447  count_map: &'a S,
448  hpx29: F,
449  sort_params: Option<SimpleExtSortParams>,
450) -> Result<(), Box<dyn Error>>
451where
452  T: ExtSortable,
453  E: Error + Send + 'static,
454  I: Iterator<Item = Result<T, E>> + Send,
455  S: SkyMap<'a, HashType = u32, ValueType = u32>,
456  F: Fn(&T) -> u64 + Sync,
457{
458  // Get general params
459  let params = sort_params.unwrap_or_default();
460  let depth = count_map.depth();
461  let dd = 29 - depth;
462  let twice_dd = dd << 1;
463  // Config bincode
464  let bincode = get_bincode();
465  // Create the temporary directory
466  params.create_tmp_dir()?;
467  // Init thread pool
468  let pool = get_thread_pool(params.n_threads);
469  // Get 'chunks', i.e. files corresponding to (ordered) HEALPix ranges containing a maximum of `n_max` rows.
470  let n_max = params.n_elems_per_chunk;
471  let (n_tot, mut ranges_counts) = skymap2ranges(count_map, n_max);
472  params.write_info(n_tot, depth, ranges_counts.clone())?;
473
474  let n_max = n_max as usize;
475  // Now iterates on rows and sort them before writing into 'chunks' files.
476  let tstart = SystemTime::now();
477  let mut entries = (&mut it).take(n_max).collect::<Result<Vec<T>, _>>()?;
478  debug!(
479    "Read {} elements in {} ms",
480    entries.len(),
481    SystemTime::now()
482      .duration_since(tstart)
483      .unwrap_or_default()
484      .as_millis()
485  );
486
487  let mut n = entries.len();
488  let mut n_tot_it = 0_u64;
489  while n > 0 {
490    let tstart = SystemTime::now();
491    let (next_entries, ()) = pool.join(
492      || (&mut it).take(n_max).collect::<Result<Vec<T>, _>>(),
493      || {
494        let tstart = SystemTime::now();
495        entries.par_sort_by_cached_key(&hpx29);
496        debug!(
497          "Sort {} elements in {} ms",
498          entries.len(),
499          SystemTime::now()
500            .duration_since(tstart)
501            .unwrap_or_default()
502            .as_millis()
503        );
504      },
505    );
506    debug!(
507      "Read {} elements (+ parallel sort of prev elems) in {} ms",
508      entries.len(),
509      SystemTime::now()
510        .duration_since(tstart)
511        .unwrap_or_default()
512        .as_millis()
513    );
514    let next_entries = next_entries?;
515
516    // Write in files
517    let tstart = SystemTime::now();
518    let mut entries_view = entries.as_mut_slice();
519    // Find first and last range, then iter_mut and this sub_slice
520    // * unwrap() is ok for first and last since we 'break' if the buffer is empty.
521    // * unwrap() is ok on binary_search since ranges cover the full hash range.
522    let first_h = (hpx29(entries_view.first().unwrap()) >> twice_dd) as u32;
523    let last_h = (hpx29(entries_view.last().unwrap()) >> twice_dd) as u32;
524    let rstart = ranges_counts
525      .binary_search_by(get_range_binsearch(first_h))
526      .unwrap();
527    let rend = ranges_counts
528      .binary_search_by(get_range_binsearch(last_h))
529      .unwrap();
530
531    for (range, count) in &mut ranges_counts[rstart..=rend] {
532      let to = entries_view.partition_point(|row| {
533        let h = (hpx29(row) >> twice_dd) as u32;
534        range.contains(&h)
535      });
536      if to > 0 {
537        let (to_be_writen, remaining) = entries_view.split_at_mut(to);
538        entries_view = remaining;
539        // let tstart = SystemTime::now();
540        let file = params.create_file_or_open_to_append(depth, range)?;
541        let mut bufw = BufWriter::new(file);
542        for row in to_be_writen {
543          bincode.serialize_into(&mut bufw, row)?;
544        }
545
546        let to = to as u32;
547        if to > *count {
548          // TODO: clean removing tmp files?
549          return Err(
550            format!(
551              "Wrong number of counts for range [{}, {}). N remaining: {}. N to be removed: {}.",
552              range.start, range.end, count, &to
553            )
554            .into(),
555          );
556        }
557        *count -= to;
558      }
559    }
560    debug!(
561      "{} eleemnts added to temp files in {} ms",
562      n,
563      SystemTime::now()
564        .duration_since(tstart)
565        .unwrap_or_default()
566        .as_millis()
567    );
568
569    n_tot_it += n as u64;
570    entries = next_entries;
571    n = entries.len();
572  }
573  // Post write operation checks
574  // * check ntot
575  if n_tot_it != n_tot {
576    // TODO: remove tmp files?
577    return Err(
578      format!(
579        "Wrong number of written rows. Expected: {}. Actual: {}",
580        n_tot, n_tot_it
581      )
582      .into(),
583    );
584  }
585  // * check healpix ranges counts (all 0)
586  for (range, count) in &ranges_counts {
587    if *count != 0 {
588      // TODO: remove tmp files?
589      return Err(
590        format!(
591          "Wrong number of written rows for range [{}, {}). N remaining: {}",
592          range.start, range.end, count
593        )
594        .into(),
595      );
596    }
597  }
598  // write the count map??!
599  params.write_ok()
600}
601
602/// See [hpx_external_sort_with_knowledge](#hpx_external_sort_with_knowledge).
603/// This method perform the second part of the operation, i.e. read the temporary writen files
604/// and sort them on-the-fly to provide a sorted iterator over all entries.
605pub fn hpx_external_sort_with_knowledge_read_tmp<T, F>(
606  hpx29: F,
607  tmp_dir: PathBuf,
608) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
609where
610  T: ExtSortable,
611  F: Fn(&T) -> u64 + Sync,
612{
613  // Get params
614  let info = SimpleExtSortParams::read_info_gen(&tmp_dir)?;
615  let params = SimpleExtSortParams::from_dir_and_info(tmp_dir, &info);
616  // Get list of written files, ordered according to the range.
617  let ordered_files = params.get_ordered_files_in_tmp_dir()?;
618  if ordered_files.is_empty() {
619    return Err(format!("No tmp file found in {:?}", &params.tmp_dir).into());
620  }
621  // Init thread pool
622  let thread_pool = get_thread_pool(params.n_threads);
623
624  fn load_file<TT: ExtSortable>(nrows: usize, path: PathBuf) -> Result<Vec<TT>, SortError> {
625    let path_str = path.to_string_lossy().to_string();
626    let tstart = SystemTime::now();
627    let file = File::open(path).map_err(SortError::IoError)?;
628    let file_len = file.metadata().map_err(SortError::IoError)?.len();
629    let mut bufr = BufReader::new(file);
630    let bincode = get_bincode();
631    let mut rows = Vec::with_capacity(nrows);
632    for _ in 0..nrows {
633      rows.push(
634        bincode
635          .deserialize_from(&mut bufr)
636          .map_err(SortError::BincodeError)?,
637      );
638    }
639    let pos = bufr.stream_position().map_err(SortError::IoError)?;
640    if pos != file_len {
641      Err(SortError::Custom(format!(
642        "File position '{}' does not match file len '{}'.",
643        pos, file_len
644      )))
645    } else {
646      debug!(
647        "Read file {} in {} ms",
648        path_str,
649        SystemTime::now()
650          .duration_since(tstart)
651          .unwrap_or_default()
652          .as_millis()
653      );
654      Ok(rows)
655    }
656  }
657  #[allow(clippy::type_complexity)]
658  fn load_next_file<TT: ExtSortable>(
659    elem: Option<((PathBuf, u8, Range<u32>), (Range<u32>, u32))>,
660  ) -> Option<Result<Vec<TT>, SortError>> {
661    match elem {
662      Some(((path, _depth, lrange), (rrange, nrows))) => {
663        assert_eq!(
664          rrange, lrange,
665          "File range difference from counts range: {:?} != {:?}.",
666          &rrange, &lrange
667        );
668        Some(load_file(nrows as usize, path.clone()))
669      }
670      None => None,
671    }
672  }
673
674  // Unwrap ok since we tested that `ordered_files` is not empty.
675  let mut ordered_files_counts_it = ordered_files.into_iter().zip(info.ordered_ranges_counts);
676  let ((file_path, _depth, lrange), (rrange, nrows)) = ordered_files_counts_it.next().unwrap();
677  assert_eq!(
678    rrange, lrange,
679    "File range difference from counts range: {:?} != {:?}.",
680    &rrange, &lrange
681  );
682
683  let mut rows_to_be_sorted = load_file(nrows as usize, file_path)?;
684  let (next_file_content, ()) = thread_pool.join(
685    || load_next_file(ordered_files_counts_it.next()),
686    || rows_to_be_sorted.par_sort_by_cached_key(&hpx29),
687  );
688
689  struct GlobalIt<TT: ExtSortable, FF: Fn(&TT) -> u64 + Sync> {
690    /// Pool of thread used for in-memmory sort.
691    thread_pool: ThreadPool,
692    /// Method to extract/compute the HEALPix order29 index from a row.
693    hpx29: FF,
694    /// Iterates on the files and range info: ((PathBuf, Depth, Range), (Range, NRows)).
695    #[allow(clippy::type_complexity)]
696    ordered_files_counts_it: Zip<IntoIter<(PathBuf, u8, Range<u32>)>, IntoIter<(Range<u32>, u32)>>,
697    /// Iterates on the sorted rows of a file.
698    rows_it: IntoIter<TT>,
699    /// The next chunk of rows to be sorted before iterating over
700    next_rows: Option<Vec<TT>>,
701    /// The temporary dir, used only if `clean == true`.
702    tmp_dir: PathBuf,
703    /// Clean temporary file once the iteration is over.
704    clean: bool,
705  }
706  impl<TT: ExtSortable, FF: Fn(&TT) -> u64 + Sync> Iterator for GlobalIt<TT, FF> {
707    type Item = Result<TT, Box<dyn Error>>;
708
709    fn next(&mut self) -> Option<Self::Item> {
710      match self.rows_it.next() {
711        Some(next_row) => Some(Ok(next_row)),
712        None => match self.next_rows.as_mut() {
713          Some(rows_to_be_sorted) => {
714            let (next_file_content, ()) = self.thread_pool.join(
715              || load_next_file(self.ordered_files_counts_it.next()),
716              || {
717                let tstart = SystemTime::now();
718                rows_to_be_sorted.par_sort_by_cached_key(&self.hpx29);
719                debug!(
720                  "{} rows sorted in {} ms",
721                  rows_to_be_sorted.len(),
722                  SystemTime::now()
723                    .duration_since(tstart)
724                    .unwrap_or_default()
725                    .as_millis()
726                );
727              },
728            );
729            match next_file_content.transpose() {
730              Ok(Some(next_row_chunk)) => {
731                self.rows_it = self.next_rows.replace(next_row_chunk).unwrap().into_iter();
732                self.next()
733              }
734              Ok(None) => {
735                self.rows_it = self.next_rows.take().unwrap().into_iter();
736                self.next()
737              }
738              Err(e) => Some(Err(e.into())),
739            }
740          }
741          None => {
742            if self.clean {
743              if let Err(e) = SimpleExtSortParams::clean(&self.tmp_dir) {
744                error!("Error cleaning external sort temporary files: {}", e);
745              }
746            }
747            None
748          }
749        },
750      }
751    }
752  }
753
754  Ok(GlobalIt {
755    thread_pool,
756    hpx29,
757    ordered_files_counts_it,
758    rows_it: rows_to_be_sorted.into_iter(),
759    next_rows: next_file_content.transpose()?,
760    tmp_dir: params.tmp_dir,
761    clean: params.clean,
762  })
763}
764
765// DefaultOptions
766fn get_bincode() -> WithOtherIntEncoding<DefaultOptions, FixintEncoding> {
767  DefaultOptions::new().with_fixint_encoding()
768  // .allow_trailing_bytes()
769}
770
771/// Method telling if a range is smaller, contains or is greater than the given input hash value.
772fn get_range_binsearch(hash: u32) -> impl Fn(&(Range<u32>, u32)) -> Ordering {
773  move |(target, _)| {
774    if target.end <= hash {
775      Ordering::Less
776    } else if hash < target.start {
777      Ordering::Greater
778    } else {
779      Ordering::Equal
780    }
781  }
782}
783
784/// Returns ordered ranges at the skymap depth containing a sum of `counts` of maximum `threshold`,
785/// together with this sum of counts each range contains.
786/// The first value of the returned tuple is the total number of counts in the skymap.
787fn skymap2ranges<'a, S>(counts: &'a S, threshold: u32) -> (u64, Vec<(Range<u32>, u32)>)
788where
789  S: SkyMap<'a, HashType = u32, ValueType = u32>,
790{
791  // 1000 chosen assuming a total count of 10x10^9 and a threshold of 10x10^6.
792  let mut range_count_vec: Vec<(Range<u32>, u32)> = Vec::with_capacity(1000);
793  let mut n_tot = 0_u64;
794
795  let depth = counts.depth();
796
797  let mut start = 0_u32;
798  let mut cumul_count = 0_u32;
799
800  for (hash, count) in counts.entries() {
801    let sum = cumul_count + count;
802    if sum <= threshold {
803      cumul_count = sum;
804    } else {
805      range_count_vec.push((start..hash, cumul_count));
806      n_tot += cumul_count as u64;
807      start = hash;
808      cumul_count = *count;
809    }
810  }
811  range_count_vec.push((start..n_hash(depth) as u32, cumul_count));
812  n_tot += cumul_count as u64;
813
814  (n_tot, range_count_vec)
815}
816
817/// # Params
818/// * `iterator`: the iterator on depth 29 healpix cells computed from the rows, and used to compute the count map
819/// * `iterable`: object returning an iterator on the rows that are actually sorted and written in output
820/// # Warning
821/// The input `iterator` and `iterable` **must** provide the same count map!
822/// # Note 1
823/// Both the input and the output are row oriented, another mechanism should be considered
824/// to sort a column oriented format.
825/// Also, only basic row oriented formats are supported (since we serialize row by row and not
826/// globally providing a serializer). It could be improved in a future, more elaborate, version.
827/// # Note 2
828/// For simple case, hy not use a `Clonable IntoIter`?
829/// I just found [this post](https://orxfun.github.io/orxfun-notes/#/missing-iterable-traits-2024-12-13)
830/// that seems to tackle a similar problem here (iterating several time over a same Collection).
831pub fn hpx_external_sort<T, E, I, J, F, P: AsRef<Path>>(
832  iterator: I,
833  iterable: J,
834  hpx29: F,
835  depth: u8,
836  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
837  sort_params: Option<SimpleExtSortParams>,
838) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
839where
840  T: ExtSortable,
841  E: Error + Send + 'static,
842  I: Iterator<Item = Result<u64, E>> + Send,
843  J: IntoIterator<Item = Result<T, E>>,
844  <J as IntoIterator>::IntoIter: Send,
845  F: Fn(&T) -> u64 + Send + Sync,
846{
847  debug!("Starts computing the count map of depth {}...", depth);
848  let tstart = SystemTime::now();
849  let params = sort_params.unwrap_or_default();
850  let twice_dd = (29 - depth) << 1;
851  let count_map = ImplicitCountMapU32::from_hash_values(
852    depth,
853    iterator
854      .enumerate()
855      .filter_map(|(irow, row_res)| match row_res {
856        Ok(row_hash) => Some((row_hash >> twice_dd) as u32),
857        Err(e) => {
858          error!("Error at line {}, line ignored: {:?}", irow, e);
859          None
860        }
861      }),
862  );
863  debug!(
864    "... count map of depth {} computed in {} ms.",
865    depth,
866    SystemTime::now()
867      .duration_since(tstart)
868      .unwrap_or_default()
869      .as_millis()
870  );
871  let tstart = SystemTime::now();
872  params.create_tmp_dir()?;
873  if let Some(countmap_path) = save_countmap_in_file {
874    count_map.to_fits_file(countmap_path)?;
875  }
876  debug!(
877    "Count map of writen in {} ms.",
878    SystemTime::now()
879      .duration_since(tstart)
880      .unwrap_or_default()
881      .as_millis()
882  );
883  hpx_external_sort_with_knowledge(iterable.into_iter(), &count_map, hpx29, Some(params))
884}
885
886pub fn hpx_external_sort_stream<T, E, I, F, P: AsRef<Path>>(
887  stream: I,
888  hpx29: F,
889  depth: u8,
890  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
891  sort_params: Option<SimpleExtSortParams>,
892) -> Result<impl Iterator<Item = Result<T, Box<dyn Error>>>, Box<dyn Error>>
893where
894  T: ExtSortable,
895  E: Error + 'static,
896  I: Iterator<Item = Result<T, E>>,
897  F: Fn(&T) -> u64 + Sync,
898{
899  let params = sort_params.unwrap_or_default();
900  let twice_dd = (29 - depth) << 1;
901  let bincode = get_bincode();
902  // Write tmp file containing all rows, and compute the count map
903  let count_map = {
904    let tmp_file_all = params
905      .create_tmp_dir()
906      .and_then(|()| params.create_file_all())?;
907    let mut bufw = BufWriter::new(tmp_file_all);
908    ImplicitCountMapU32::from_hash_values(
909      depth,
910      stream
911        .enumerate()
912        .filter_map(|(irow, row_res)| match row_res {
913          Ok(row) => {
914            let hash = (hpx29(&row) >> twice_dd) as u32;
915            match bincode.serialize_into(&mut bufw, &row) {
916              Ok(()) => Some(hash),
917              Err(e) => {
918                error!("Error writing line {}, line ignored: {:?}", irow, e);
919                None
920              }
921            }
922          }
923          Err(e) => {
924            error!("Error reading line {}, line ignored: {:?}", irow, e);
925            None
926          }
927        }),
928    )
929  };
930  // Get an iterator over all written rows and apply the second part of the algorithm.
931  let n_rows = count_map.values().map(|count| *count as u64).sum();
932  if let Some(countmap_path) = save_countmap_in_file {
933    count_map.to_fits_file(countmap_path)?;
934  }
935  let mut bufr = params.open_file_all().map(BufReader::new)?;
936  hpx_external_sort_with_knowledge(
937    (0..n_rows).map(move |_| bincode.deserialize_from(&mut bufr)),
938    &count_map,
939    hpx29,
940    Some(params),
941  )
942}
943
944/// Apply the HEALPix external sort on a CSV data streammed from `stdin`, writing the output in `stdout`.
945/// # Params
946/// * `ilon`: index of the longitude column (starting at 0)
947/// * `ilat`: index of the latitude column (starting at 0)
948/// * `has_header`: tells that the first non-commented line is a header line (to be ignored in the
949///   sort process, but to be copied in output)
950/// * `separator`: ASCII value of the separator (`b','` if None)
951/// * `depth`: depth used to compute the count map and for the file ranges
952/// * `sort_params`: common sorting parameters
953/// * `clean`: remove temporary files if the operation succeed
954/// # Note
955/// For small files, we advice to read the full content of the in memory and use the
956/// [hpx_internal_sort](#hpx_internal_sort) method.
957pub fn hpx_external_sort_csv_stdin_stdout<P: AsRef<Path>>(
958  ilon: usize,
959  ilat: usize,
960  has_header: bool,
961  separator: Option<char>,
962  depth: u8,
963  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
964  sort_params: Option<SimpleExtSortParams>,
965) -> Result<(), Box<dyn Error>> {
966  let writer = io::stdout().lock();
967  hpx_external_sort_csv_stdin_gen(
968    writer,
969    ilon,
970    ilat,
971    has_header,
972    separator,
973    depth,
974    save_countmap_in_file,
975    sort_params,
976  )
977}
978
979/// Apply the HEALPix external sort on CSV data streamed from `stdin`, writing the output in a file.
980/// # Params
981/// * `output_path`: path of a file in which the result has to be put.
982/// * `output_overwrite`: do not fail if the output path already exists, but overwrite the file content
983///   (WARNING: setting this flag to `true` is discouraged).
984/// * `ilon`: index of the longitude column (starting at 0)
985/// * `ilat`: index of the latitude column (starting at 0)
986/// * `has_header`: tells that the first non-commented line is a header line (to be ignored in the
987///   sort process, but to be copied in output)
988/// * `separator`: ASCII value of the separator (`b','` if None)
989/// * `depth`: depth used to compute the count map and for the file ranges
990/// * `sort_params`: common sorting parameters
991/// * `clean`: remove temporary files if the operation succeed
992/// # Note
993/// For small files, we advice to read the full content of the in memory and use the
994/// [hpx_internal_sort](#hpx_internal_sort) method.
995pub fn hpx_external_sort_csv_stdin<OUT: AsRef<Path>, P: AsRef<Path>>(
996  output_path: OUT,
997  output_overwrite: bool,
998  ilon: usize,
999  ilat: usize,
1000  has_header: bool,
1001  separator: Option<char>,
1002  depth: u8,
1003  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
1004  sort_params: Option<SimpleExtSortParams>,
1005) -> Result<(), Box<dyn Error>> {
1006  let writer = BufWriter::new(if output_overwrite {
1007    OpenOptions::new()
1008      .write(true)
1009      .create(true)
1010      .truncate(true)
1011      .open(output_path)
1012  } else {
1013    OpenOptions::new()
1014      .write(true)
1015      .create_new(true)
1016      .open(output_path)
1017  }?);
1018  hpx_external_sort_csv_stdin_gen(
1019    writer,
1020    ilon,
1021    ilat,
1022    has_header,
1023    separator,
1024    depth,
1025    save_countmap_in_file,
1026    sort_params,
1027  )
1028}
1029
1030/// Apply the HEALPix external sort on CSV data streamed from `stdin`,
1031/// writing the output in the provided writer.
1032/// # Params
1033/// * `writer`: the writer in which the result is written
1034/// * `ilon`: index of the longitude column (starting at 0)
1035/// * `ilat`: index of the latitude column (starting at 0)
1036/// * `has_header`: tells that the first non-commented line is a header line (to be ignored in the
1037///   sort process, but to be copied in output)
1038/// * `separator`: ASCII value of the separator (`b','` if None)
1039/// * `depth`: depth used to compute the count map and for the file ranges
1040/// * `sort_params`: common sorting parameters
1041/// * `clean`: remove temporary files if the operation succeed
1042/// # Note
1043/// For small files, we advice to read the full content of the in memory and use the
1044/// [hpx_internal_sort](#hpx_internal_sort) method.
1045pub fn hpx_external_sort_csv_stdin_gen<W: Write, P: AsRef<Path>>(
1046  mut writer: W,
1047  ilon: usize,
1048  ilat: usize,
1049  has_header: bool,
1050  separator: Option<char>,
1051  depth: u8,
1052  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
1053  sort_params: Option<SimpleExtSortParams>,
1054) -> Result<(), Box<dyn Error>> {
1055  // Start reading file to create the count map
1056  let mut line_res_it = stdin().lock().lines().peekable();
1057  // Handle starting comments
1058  while let Some(Ok(line)) = line_res_it.next_if(|res| {
1059    res
1060      .as_ref()
1061      .map(|line| line.starts_with('#'))
1062      .unwrap_or(false)
1063  }) {
1064    writer.write_all(line.as_bytes())?;
1065  }
1066  // Handle header line
1067  if has_header {
1068    if let Some(header) = line_res_it.next().transpose()? {
1069      write!(&mut writer, "{}\n", header)?;
1070    }
1071  }
1072  // Get the sorted iterator
1073  let hpx29 = get_hpx(ilon, ilat, separator.unwrap_or(','), get(29));
1074  let sorted_it = hpx_external_sort_stream(
1075    line_res_it,
1076    hpx29,
1077    depth,
1078    save_countmap_in_file,
1079    sort_params,
1080  )?;
1081  // Write the results
1082  for row_res in sorted_it {
1083    row_res.and_then(|row| writeln!(&mut writer, "{}", row).map_err(|e| e.into()))?;
1084  }
1085  Ok(())
1086}
1087
1088/// Apply the HEALPix external sort on a CSV file, writing the output in a file.
1089/// # Params
1090/// * `input_path`: path of the file to be sorted.
1091/// * `output_path`: path of a file in which the result has to be put.
1092/// * `output_overwrite`: do not fail if the output path already exists, but overwrite the file content
1093///   (WARNING: setting this flag to `true` is discouraged).
1094/// * `ilon`: index of the longitude column (starting at 0)
1095/// * `ilat`: index of the latitude column (starting at 0)
1096/// * `has_header`: tells that the first non-commented line is a header line (to be ignored in the
1097///   sort process, but to be copied in output)
1098/// * `separator`: ASCII value of the separator (`b','` if None)
1099/// * `depth`: depth used to compute the count map and for the file ranges
1100/// * `sort_params`: common sorting parameters
1101/// * `clean`: remove temporary files if the operation succeed
1102/// # Note
1103/// For small files, we advice to read the full content of the in memory and use the
1104/// [hpx_internal_sort](#hpx_internal_sort) method.
1105pub fn hpx_external_sort_csv_file<IN: AsRef<Path>, OUT: AsRef<Path>, P: AsRef<Path>>(
1106  input_path: IN,
1107  output_path: OUT,
1108  output_overwrite: bool,
1109  ilon: usize,
1110  ilat: usize,
1111  has_header: bool,
1112  separator: Option<char>,
1113  depth: u8,
1114  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
1115  sort_params: Option<SimpleExtSortParams>,
1116) -> Result<(), Box<dyn Error>> {
1117  let writer = BufWriter::new(if output_overwrite {
1118    OpenOptions::new()
1119      .write(true)
1120      .create(true)
1121      .truncate(true)
1122      .open(output_path)
1123  } else {
1124    OpenOptions::new()
1125      .write(true)
1126      .create_new(true)
1127      .open(output_path)
1128  }?);
1129  hpx_external_sort_csv_file_gen(
1130    input_path,
1131    writer,
1132    ilon,
1133    ilat,
1134    has_header,
1135    separator,
1136    depth,
1137    save_countmap_in_file,
1138    sort_params,
1139  )
1140}
1141
1142/// Apply the HEALPix external sort on a CSV file, writing the output in `stdout`.
1143/// # Params
1144/// * `input_path`: path of the file to be sorted.
1145/// * `ilon`: index of the longitude column (starting at 0)
1146/// * `ilat`: index of the latitude column (starting at 0)
1147/// * `has_header`: tells that the first non-commented line is a header line (to be ignored in the
1148///   sort process, but to be copied in output)
1149/// * `separator`: ASCII value of the separator (`b','` if None)
1150/// * `depth`: depth used to compute the count map and for the file ranges
1151/// * `sort_params`: common sorting parameters
1152/// * `clean`: remove temporary files if the operation succeed
1153/// # Note
1154/// For small files, we advice to read the full content of the in memory and use the
1155/// [hpx_internal_sort](#hpx_internal_sort) method.
1156pub fn hpx_external_sort_csv_file_stdout<IN: AsRef<Path>, P: AsRef<Path>>(
1157  input_path: IN,
1158  ilon: usize,
1159  ilat: usize,
1160  has_header: bool,
1161  separator: Option<char>,
1162  depth: u8,
1163  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
1164  sort_params: Option<SimpleExtSortParams>,
1165) -> Result<(), Box<dyn Error>> {
1166  let writer = io::stdout().lock();
1167  hpx_external_sort_csv_file_gen(
1168    input_path,
1169    writer,
1170    ilon,
1171    ilat,
1172    has_header,
1173    separator,
1174    depth,
1175    save_countmap_in_file,
1176    sort_params,
1177  )
1178}
1179
1180/// Apply the HEALPix external sort on a CSV file, writing the output in the provided writer.
1181/// # Params
1182/// * `input_path`: path of the file to be sorted.
1183/// * `writer`: the writer in which the result is written
1184/// * `ilon`: index of the longitude column (starting at 0)
1185/// * `ilat`: index of the latitude column (starting at 0)
1186/// * `has_header`: tells that the first non-commented line is a header line (to be ignored in the
1187///   sort process, but to be copied in output)
1188/// * `separator`: ASCII value of the separator (`b','` if None)
1189/// * `depth`: depth used to compute the count map and for the file ranges
1190/// * `sort_params`: common sorting parameters
1191/// * `clean`: remove temporary files if the operation succeed
1192/// # Note
1193/// For small files, we advice to read the full content of the in memory and use the
1194/// [hpx_internal_sort](#hpx_internal_sort) method.
1195pub fn hpx_external_sort_csv_file_gen<IN: AsRef<Path>, W: Write, P: AsRef<Path>>(
1196  input_path: IN,
1197  mut writer: W,
1198  ilon: usize,
1199  ilat: usize,
1200  has_header: bool,
1201  separator: Option<char>,
1202  depth: u8,
1203  save_countmap_in_file: Option<P>, // Save a copy of the computed count map in the given Path
1204  sort_params: Option<SimpleExtSortParams>,
1205) -> Result<(), Box<dyn Error>> {
1206  // Declare variables/functions
1207  let hpx29 = get_hpx(ilon, ilat, separator.unwrap_or(','), get(29));
1208
1209  // Start reading file to create the count map
1210  let mut line_res_it = BufReader::new(File::open(&input_path)?).lines().peekable();
1211  // Handle starting comments
1212  while let Some(Ok(line)) = line_res_it.next_if(|res| {
1213    res
1214      .as_ref()
1215      .map(|line| line.starts_with('#'))
1216      .unwrap_or(false)
1217  }) {
1218    writer.write_all(line.as_bytes())?;
1219  }
1220  // Handle header line
1221  if has_header {
1222    if let Some(header) = line_res_it.next().transpose()? {
1223      write!(&mut writer, "{}\n", header)?;
1224    }
1225  }
1226  let sort_params = sort_params.unwrap_or_default();
1227  let thread_pool = get_thread_pool(sort_params.n_threads);
1228
1229  let tstart = SystemTime::now();
1230  debug!("Start generating count map (first iteration on the full CSV file)...");
1231  let count_map = ImplicitCountMapU32::from_csv_it_par(
1232    line_res_it,
1233    ilon,
1234    ilat,
1235    separator,
1236    depth,
1237    sort_params.n_elems_per_chunk as usize,
1238    &thread_pool,
1239  )?;
1240  debug!(
1241    "... count map computed in {} ms",
1242    SystemTime::now()
1243      .duration_since(tstart)
1244      .unwrap_or_default()
1245      .as_millis()
1246  );
1247  let tstart = SystemTime::now();
1248  sort_params.create_tmp_dir()?;
1249  if let Some(countmap_path) = save_countmap_in_file {
1250    count_map.to_fits_file(countmap_path)?;
1251  }
1252  debug!(
1253    "Count map of writen in {} ms.",
1254    SystemTime::now()
1255      .duration_since(tstart)
1256      .unwrap_or_default()
1257      .as_millis()
1258  );
1259
1260  // Re read the file to sort it
1261  let mut line_res_it = BufReader::new(File::open(&input_path)?).lines().peekable();
1262  // Consume starting comments
1263  while line_res_it
1264    .next_if(|res| {
1265      res
1266        .as_ref()
1267        .map(|line| line.starts_with('#'))
1268        .unwrap_or(false)
1269    })
1270    .is_some()
1271  {}
1272  // Consume the header line if any
1273  if has_header {
1274    line_res_it.next();
1275  }
1276  // Get the sorted iterator
1277  let sorted_it =
1278    hpx_external_sort_with_knowledge(line_res_it, &count_map, hpx29, Some(sort_params))?;
1279
1280  debug!("Starts writing sorted rows in output file...");
1281  let tstart = SystemTime::now();
1282  // Write the results
1283  for row_res in sorted_it {
1284    row_res.and_then(|row| writeln!(&mut writer, "{}", row).map_err(|e| e.into()))?;
1285  }
1286  debug!(
1287    "... writing sorted rows in {} ms",
1288    SystemTime::now()
1289      .duration_since(tstart)
1290      .unwrap_or_default()
1291      .as_millis()
1292  );
1293  Ok(())
1294}
1295
1296/// Returns a function computing given layer depth Healpix hash values from an input line of
1297/// `separator` separated fields in which the field at index `ilon` contains the longitude (in decimal degrees)
1298/// and the field at index `ilat` contains the latitude (also in decimal degrees).
1299/// WARNING: the returned hash value is `0` in case of error.
1300pub fn get_hpx(
1301  ilon: usize,
1302  ilat: usize,
1303  separator: char,
1304  layer: &'static Layer,
1305) -> impl Fn(&String) -> u64 {
1306  let (index_first_col, offset_to_second_col, ilon, ilat) = if ilon < ilat {
1307    (ilon, ilat - ilon - 1, 0, 1)
1308  } else {
1309    (ilat, ilon - ilat - 1, 1, 0)
1310  };
1311
1312  move |line: &String| {
1313    let mut field_it = line.as_str().split(separator);
1314    let coos = [
1315      field_it.nth(index_first_col).and_then(|s| {
1316        s.parse::<f64>()
1317          .map_err(|e| error!("Error parsing 1st coo: '{}': {:?}", s, e))
1318          .ok()
1319      }),
1320      field_it.nth(offset_to_second_col).and_then(|s| {
1321        s.parse::<f64>()
1322          .map_err(|e| error!("Error parsing 2nd coo: '{}': {:?}", s, e))
1323          .ok()
1324      }),
1325    ];
1326    match (coos[ilon], coos[ilat]) {
1327      (Some(lon), Some(lat)) => layer.hash(lon.to_radians(), lat.to_radians()),
1328      _ => {
1329        error!(
1330          "Error parsing coordinates at line: {}. Hash set to 0.",
1331          line
1332        );
1333        0
1334      }
1335    }
1336  }
1337}
1338
1339/// Returns a function computing given layer depth Healpix hash values from an input line of
1340/// `separator` separated fields in which the field at index `ilon` contains the longitude (in decimal degrees)
1341/// and the field at index `ilat` contains the latitude (also in decimal degrees).
1342/// WARNING: returns None in case of error.
1343pub fn get_hpx_opt(
1344  ilon: usize,
1345  ilat: usize,
1346  separator: char,
1347  layer: &'static Layer,
1348) -> impl Fn(&String) -> Option<u64> {
1349  let (index_first_col, offset_to_second_col, ilon, ilat) = if ilon < ilat {
1350    (ilon, ilat - ilon - 1, 0, 1)
1351  } else {
1352    (ilat, ilon - ilat - 1, 1, 0)
1353  };
1354  move |line: &String| {
1355    let mut field_it = line.as_str().split(separator);
1356    let coos = [
1357      field_it.nth(index_first_col).and_then(|s| {
1358        s.parse::<f64>()
1359          .map_err(|e| error!("Error parsing 1st coo: '{}': {:?}", s, e))
1360          .ok()
1361      }),
1362      field_it.nth(offset_to_second_col).and_then(|s| {
1363        s.parse::<f64>()
1364          .map_err(|e| error!("Error parsing 2nd coo: '{}': {:?}", s, e))
1365          .ok()
1366      }),
1367    ];
1368    match (coos[ilon], coos[ilat]) {
1369      (Some(lon), Some(lat)) => Some(layer.hash(lon.to_radians(), lat.to_radians())),
1370      _ => {
1371        error!(
1372          "Error parsing coordinates at line: {}. Lon: {:?}. Lat: {:?}. Hash set to 0.",
1373          line, coos[ilon], coos[ilat]
1374        );
1375        None
1376      }
1377    }
1378  }
1379}
1380
1381/// Create an HEALPix ordered CSV file containing one row per HEALPix hash values at the given
1382/// input depth.
1383/// Each row contains the healpix hash value, the longitude and latitude of the center of the
1384/// HEALPix cell and additional dummy columns adding to 34 bytes (including the coma).
1385pub fn create_test_file(depth: u8, path: &str) -> Result<(), IoError> {
1386  debug!("Starts writing test file...");
1387  let tstart = SystemTime::now();
1388  let n_hash = n_hash(depth);
1389  let width = (n_hash.ilog10() + 1) as usize; // => 50_331_648 rows
1390  let dummmy_cols = "abc,def,ghi,jkl,mno,pqr,stu,vwx,yz"; // 34 bytes
1391  let layer = get(depth);
1392
1393  let mut bufw = BufWriter::new(File::create(path)?);
1394
1395  for h in 0..n_hash {
1396    let (lon, lat) = layer.center(h);
1397    writeln!(
1398      bufw,
1399      "{:0width$},{:014.10},{:+014.10},{}",
1400      h,
1401      lon.to_degrees(),
1402      lat.to_degrees(),
1403      dummmy_cols
1404    )?;
1405  }
1406  debug!(
1407    "... done in {} ms",
1408    SystemTime::now()
1409      .duration_since(tstart)
1410      .unwrap_or_default()
1411      .as_millis()
1412  );
1413  Ok(())
1414}
1415
1416#[cfg(test)]
1417mod tests {
1418  use super::*;
1419  use std::process::Command;
1420
1421  fn init_logger() {
1422    let log_level = log::LevelFilter::max();
1423    // let log_level = log::LevelFilter::Error;
1424
1425    let _ = env_logger::builder()
1426      // Include all events in tests
1427      .filter_level(log_level)
1428      // Ensure events are captured by `cargo test`
1429      .is_test(true)
1430      // Ignore errors initializing the logger if tests race to configure it
1431      .try_init();
1432  }
1433
1434  #[cfg(target_os = "linux")]
1435  #[test]
1436  fn testok_sort() {
1437    init_logger();
1438    // Run with:
1439    // > cargo test testok_sort --release -- --nocapture
1440    // Compare with:
1441    //    time sort --buffer-size=70M --parallel 4 input.csv -T sort_tmp > toto.res
1442
1443    let depth_file = 5; //10;
1444    let depth_sort = 4;
1445    fs::create_dir_all("./local_resources/").unwrap();
1446    create_test_file(depth_file, "./local_resources/test.csv").unwrap();
1447    Command::new("bash")
1448      .arg("-c")
1449      .arg("shuf ./local_resources/test.csv -o ./local_resources/input.csv")
1450      .output()
1451      .expect("failed to execute process");
1452    hpx_external_sort_csv_file(
1453      "./local_resources/input.csv",
1454      "./local_resources/output.csv",
1455      true,
1456      1,
1457      2,
1458      false,
1459      Some(','),
1460      depth_sort,
1461      None::<&'static str>, // Some(PathBuf::from("./local_resources/sort_tmp/hpxsort.countmap.fits"))
1462      Some(SimpleExtSortParams {
1463        tmp_dir: PathBuf::from("./local_resources/sort_tmp/"),
1464        n_elems_per_chunk: 1000, // 1_000_000
1465        n_threads: Some(4),
1466        clean: true,
1467      }),
1468    )
1469    .unwrap();
1470    let out = Command::new("bash")
1471      .arg("-c")
1472      .arg("diff ./local_resources/test.csv ./local_resources/output.csv")
1473      .output()
1474      .expect("failed to execute process");
1475    assert!(out.status.success());
1476    assert!(out.stdout.is_empty());
1477    Command::new("bash")
1478      .arg("-c")
1479      .arg("rm -r ./local_resources/test.csv ./local_resources/output.csv")
1480      .output()
1481      .expect("failed to execute process");
1482  }
1483
1484  // Test only on personal computer
1485  /*#[test]
1486  #[cfg(all(target_os = "linux", not(target_arch = "wasm32")))]
1487  fn testok_bigsort_hdd() {
1488    init_logger();
1489    // Run with:
1490    // > cargo test testok_sort --release -- --nocapture
1491    // Compare with:
1492    //   time sort --buffer-size=800M --parallel 4 input.csv -T sort_tmp > toto.res
1493
1494    let depth_file = 11;
1495    let depth_sort = 4;
1496    fs::create_dir_all("/data/pineau/sandbox/").unwrap();
1497    create_test_file(depth_file, "/data/pineau/sandbox/test.csv").unwrap();
1498    Command::new("bash")
1499      .arg("-c")
1500      .arg("shuf /data/pineau/sandbox/test.csv -o /data/pineau/sandbox/input.csv")
1501      .output()
1502      .expect("failed to execute process");
1503    hpx_external_sort_csv_file(
1504      "/data/pineau/sandbox/input.csv",
1505      "/data/pineau/sandbox/output.csv",
1506      true,
1507      1,
1508      2,
1509      false,
1510      Some(','),
1511      depth_sort,
1512      None::<&'static str>,
1513      Some(SimpleExtSortParams {
1514        tmp_dir: PathBuf::from("/data/pineau/sandbox/sort_tmp/"),
1515        n_elems_per_chunk: 10_000_000,
1516        n_threads: Some(4),
1517        clean: false
1518      }),
1519    )
1520    .unwrap();
1521    let out = Command::new("bash")
1522      .arg("-c")
1523      .arg("diff /data/pineau/sandbox/test.csv /data/pineau/sandbox/output.csv")
1524      .output()
1525      .expect("failed to execute process");
1526    assert!(out.status.success());
1527    assert!(out.stdout.is_empty());
1528    Command::new("bash")
1529    .arg("-c")
1530    .arg("rm -r /data/pineau/sandbox/test.csv /data/pineau/sandbox/output.csv")
1531    .output()
1532    .expect("failed to execute process");
1533  }*/
1534
1535  // Test only on personal computer
1536  /*#[test]
1537  #[cfg(all(target_os = "linux", not(target_arch = "wasm32")))]
1538  fn testok_bigsort_ssd() {
1539    init_logger();
1540    // Run with:
1541    // > cargo test testok_sort --release -- --nocapture
1542    // Compare with:
1543    //   time sort --buffer-size=800M --parallel 4 input.csv -T sort_tmp > toto.res
1544
1545    let depth_file = 11;
1546    let depth_sort = 4;
1547    fs::create_dir_all("./local_resources").unwrap();
1548    create_test_file(depth_file, "./local_resources/test.csv").unwrap();
1549    Command::new("bash")
1550      .arg("-c")
1551      .arg("shuf ./local_resources/test.csv -o ./local_resources/input.csv")
1552      .output()
1553      .expect("failed to execute process");
1554    hpx_external_sort_csv_file(
1555      "./local_resources/input11.csv",
1556      "./local_resources/output11.csv",
1557      true,
1558      1,
1559      2,
1560      false,
1561      Some(','),
1562      depth_sort,
1563      None::<&'static str>,
1564      Some(SimpleExtSortParams {
1565        tmp_dir: PathBuf::from("./local_resources/sort_tmp/"),
1566        n_elems_per_chunk: 5_000_000,
1567        n_threads: Some(4),
1568        clean: false
1569      }),
1570    )
1571    .unwrap();
1572    let out = Command::new("bash")
1573      .arg("-c")
1574      .arg("diff /data/pineau/sandbox/test.csv /data/pineau/sandbox/output.csv")
1575      .output()
1576      .expect("failed to execute process");
1577    assert!(out.status.success());
1578    assert!(out.stdout.is_empty());
1579    Command::new("bash")
1580    .arg("-c")
1581    .arg("rm -r /data/pineau/sandbox/test.csv /data/pineau/sandbox/output.csv")
1582    .output()
1583    .expect("failed to execute process");
1584  }*/
1585}