1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/// A conversion trait that converts an input type to parallel stream parameters.
pub trait IntoParStreamParams {
    /// Consumes `self` and returns the corresponding [`ParStreamParams`].
    fn into_par_stream_params(self) -> ParStreamParams;
}

impl<T> IntoParStreamParams for T
where
    ParStreamConfig: From<T>,
{
    fn into_par_stream_params(self) -> ParStreamParams {
        let config: ParStreamConfig = self.into();
        let params: ParStreamParams = config.into();
        params
    }
}

/// Parallel stream configuration.
#[derive(Debug, Clone)]
pub struct ParStreamConfig {
    /// Requested worker count; resolved with [`Count::to_absolute`] when the
    /// configuration is converted into [`ParStreamParams`].
    pub num_workers: Count,
    /// Requested buffer size; resolved with [`Count::to_absolute`] when the
    /// configuration is converted into [`ParStreamParams`].
    pub buf_size: Count,
}

impl From<Option<usize>> for ParStreamConfig {
    /// Uses `Count::Absolute(n)` for both fields when given `Some(n)`, and
    /// `Count::Auto` for both fields when given `None`.
    fn from(size: Option<usize>) -> Self {
        // Both fields receive the same kind of count, so build it from one recipe.
        let count = || size.map_or(Count::Auto, Count::Absolute);

        Self {
            num_workers: count(),
            buf_size: count(),
        }
    }
}

impl From<usize> for ParStreamConfig {
    /// Uses the same absolute value for both the worker count and the buffer size.
    fn from(count: usize) -> Self {
        let num_workers = Count::Absolute(count);
        let buf_size = Count::Absolute(count);

        Self {
            num_workers,
            buf_size,
        }
    }
}

impl From<f64> for ParStreamConfig {
    /// Uses the same scaling factor for both the worker count and the buffer size.
    fn from(factor: f64) -> Self {
        let num_workers = Count::Scale(factor);
        let buf_size = Count::Scale(factor);

        Self {
            num_workers,
            buf_size,
        }
    }
}

impl From<(usize, usize)> for ParStreamConfig {
    /// Interprets the pair as `(num_workers, buf_size)`, both as absolute values.
    fn from(pair: (usize, usize)) -> Self {
        let (workers, buffer) = pair;

        Self {
            num_workers: Count::Absolute(workers),
            buf_size: Count::Absolute(buffer),
        }
    }
}

impl From<(f64, usize)> for ParStreamConfig {
    /// Interprets the pair as `(num_workers, buf_size)`: a scaling factor for the
    /// worker count and an absolute value for the buffer size.
    fn from(pair: (f64, usize)) -> Self {
        let (worker_scale, buffer) = pair;

        Self {
            num_workers: Count::Scale(worker_scale),
            buf_size: Count::Absolute(buffer),
        }
    }
}

impl From<(usize, f64)> for ParStreamConfig {
    /// Interprets the pair as `(num_workers, buf_size)`: an absolute value for the
    /// worker count and a scaling factor for the buffer size.
    fn from(pair: (usize, f64)) -> Self {
        let (workers, buffer_scale) = pair;

        Self {
            num_workers: Count::Absolute(workers),
            buf_size: Count::Scale(buffer_scale),
        }
    }
}

impl From<(f64, f64)> for ParStreamConfig {
    /// Interprets the pair as `(num_workers, buf_size)`, both as scaling factors.
    fn from(pair: (f64, f64)) -> Self {
        let (worker_scale, buffer_scale) = pair;

        Self {
            num_workers: Count::Scale(worker_scale),
            buf_size: Count::Scale(buffer_scale),
        }
    }
}

/// Specifies an absolute value, a scaling factor, or a value determined in runtime.
#[derive(Debug, Clone)]
pub enum Count {
    /// Resolved by `to_absolute` to the value of `num_cpus::get()`.
    Auto,
    /// A fixed value, returned unchanged by `to_absolute`.
    Absolute(usize),
    /// A factor that `to_absolute` multiplies by `num_cpus::get()` and rounds up.
    Scale(f64),
}

impl Count {
    /// Resolves this count into a concrete number.
    ///
    /// - `Auto` resolves to the value returned by `num_cpus::get()`.
    /// - `Absolute(val)` resolves to `val`.
    /// - `Scale(scale)` resolves to `num_cpus::get()` multiplied by `scale`,
    ///   rounded up to the next integer.
    ///
    /// # Panics
    ///
    /// Panics if an absolute value is zero, or if a scaling factor is not a
    /// finite number strictly greater than zero.
    pub fn to_absolute(&self) -> usize {
        match *self {
            Self::Auto => num_cpus::get(),
            Self::Absolute(val) => {
                assert!(val > 0, "absolute value must be positive");
                val
            }
            Self::Scale(scale) => {
                // `is_sign_positive()` would accept `+0.0`, which resolves to a
                // count of zero. Require a strictly positive factor so this arm
                // matches the non-zero rule enforced for `Absolute`.
                assert!(
                    scale.is_finite() && scale > 0.0,
                    "scaling value must be positive finite"
                );
                (num_cpus::get() as f64 * scale).ceil() as usize
            }
        }
    }
}

/// Parallel stream parameters.
#[derive(Debug, Clone)]
pub struct ParStreamParams {
    /// Concrete worker count, resolved from `ParStreamConfig::num_workers`.
    pub(crate) num_workers: usize,
    /// Concrete buffer size, resolved from `ParStreamConfig::buf_size`.
    pub(crate) buf_size: usize,
}

impl From<ParStreamConfig> for ParStreamParams {
    fn from(from: ParStreamConfig) -> Self {
        let ParStreamConfig {
            num_workers,
            buf_size,
        } = from;

        let num_workers = num_workers.to_absolute();
        let buf_size = buf_size.to_absolute();

        Self {
            num_workers,
            buf_size,
        }
    }
}