1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// use crate::common::*;

/// A helper trait that converts types to parallel stream configuration.
pub trait IntoParStreamConfig: Sized {
    fn into_par_stream_config(self) -> ParStreamConfig;
    fn into_par_stream_params(self) -> ParStreamParams {
        self.into_par_stream_config().into()
    }
}

/// `Some(n)` requests exactly `n` workers, `None` requests the automatic
/// configuration.
impl IntoParStreamConfig for Option<usize> {
    /// Maps `Some(n)` to the configuration produced by the bare `usize` `n`,
    /// and `None` to [`ParStreamConfig::Auto`].
    ///
    /// # Panics
    ///
    /// Panics if the value is `Some(0)`.
    fn into_par_stream_config(self) -> ParStreamConfig {
        match self {
            // Delegate to the `usize` implementation so that the worker-count
            // validation and its panic message live in a single place.
            Some(size) => size.into_par_stream_config(),
            None => ParStreamConfig::Auto,
        }
    }
}

/// A bare `usize` is interpreted as an absolute number of workers.
impl IntoParStreamConfig for usize {
    /// Wraps the value in [`ParStreamConfig::Absolute`].
    ///
    /// # Panics
    ///
    /// Panics if the value is zero.
    fn into_par_stream_config(self) -> ParStreamConfig {
        // Guard clause: a stream needs at least one worker.
        if self < 1 {
            panic!("the number of workers must be a number greater or equal to 1");
        }

        ParStreamConfig::Absolute(self)
    }
}

/// An `f64` is interpreted as a scaling factor; when the configuration is
/// resolved, the factor is multiplied by the value of `num_cpus::get()`.
impl IntoParStreamConfig for f64 {
    /// Wraps the value in [`ParStreamConfig::Scale`].
    ///
    /// # Panics
    ///
    /// Panics if the value is NaN, infinite, zero or negative.
    fn into_par_stream_config(self) -> ParStreamConfig {
        // Zero must be rejected as well: a factor of 0.0 would resolve to
        // zero workers, which every other conversion in this file forbids.
        assert!(
            self.is_finite() && self > 0.0,
            "the scaling number must be positive"
        );
        ParStreamConfig::Scale(self)
    }
}

/// A `(num_workers, buf_size)` pair sets both parameters explicitly.
impl IntoParStreamConfig for (usize, usize) {
    /// Wraps the pair in [`ParStreamConfig::Custom`].
    ///
    /// # Panics
    ///
    /// Panics if either the number of workers or the buffer size is zero.
    fn into_par_stream_config(self) -> ParStreamConfig {
        let (num_workers, buf_size) = self;
        assert!(
            num_workers >= 1,
            "the number of workers must be greater than or equal to 1"
        );
        assert!(
            buf_size >= 1,
            "the buffer size must be greater than or equal to 1"
        );
        ParStreamConfig::Custom {
            num_workers,
            buf_size,
        }
    }
}

/// Parallel stream configuration.
///
/// A configuration is a request; it is resolved into concrete worker and
/// buffer counts when converted into [`ParStreamParams`].
#[derive(Debug, Clone, PartialEq)]
pub enum ParStreamConfig {
    /// Use as many workers as `num_cpus::get()` reports, with a buffer twice
    /// the number of workers.
    Auto,
    /// Use exactly the given number of workers, with a buffer twice the
    /// number of workers.
    Absolute(usize),
    /// Use the value of `num_cpus::get()` multiplied by the given factor
    /// (rounded up) as the number of workers, with a buffer twice the number
    /// of workers.
    Scale(f64),
    /// Use the given number of workers and buffer size as they are.
    Custom {
        /// Number of workers.
        num_workers: usize,
        /// Buffer size.
        buf_size: usize,
    },
}

/// Parallel stream parameters.
///
/// This is the resolved form of a [`ParStreamConfig`]: plain worker and
/// buffer counts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParStreamParams {
    /// Number of workers.
    pub(crate) num_workers: usize,
    /// Buffer size.
    pub(crate) buf_size: usize,
}

impl From<ParStreamConfig> for ParStreamParams {
    /// Resolves a configuration into concrete worker and buffer counts.
    ///
    /// * `Auto` uses `num_cpus::get()` workers.
    /// * `Absolute(n)` uses `n` workers.
    /// * `Scale(s)` uses `num_cpus::get()` multiplied by `s`, rounded up and
    ///   clamped to at least one worker.
    /// * `Custom` passes both values through unchanged.
    ///
    /// For every variant except `Custom`, the buffer size is twice the number
    /// of workers, saturating at `usize::MAX` instead of overflowing.
    fn from(from: ParStreamConfig) -> Self {
        use ParStreamConfig::*;

        // Single definition of the default buffer sizing rule. Saturating so
        // that a huge worker count cannot overflow (panic in debug builds,
        // wrap around in release builds).
        let default_buf_size = |num_workers: usize| num_workers.saturating_mul(2);

        let (num_workers, buf_size) = match from {
            Auto => {
                let num_workers = num_cpus::get();
                (num_workers, default_buf_size(num_workers))
            }
            Absolute(num_workers) => (num_workers, default_buf_size(num_workers)),
            Scale(scale) => {
                let scaled = (num_cpus::get() as f64 * scale).ceil();
                // A float-to-integer `as` cast saturates and maps NaN to 0,
                // so a zero, negative or NaN factor would otherwise yield
                // zero workers. Keep at least one.
                let num_workers = (scaled as usize).max(1);
                (num_workers, default_buf_size(num_workers))
            }
            Custom {
                num_workers,
                buf_size,
            } => (num_workers, buf_size),
        };

        Self {
            num_workers,
            buf_size,
        }
    }
}