1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
//! Contains environment specific logic.

use crate::utils::{DefaultRandom, Random, ThreadPool, Timer};
use std::sync::Arc;

/// A logger type which is called with various information.
pub type InfoLogger = Arc<dyn Fn(&str) + Send + Sync>;

/// Specifies a computational quota for executions. The main purpose is to allow to stop algorithm
/// in reaction to external events such as user cancellation, timer, etc.
pub trait Quota: Send + Sync {
    /// Returns true when computation should be stopped.
    fn is_reached(&self) -> bool;
}

/// Keeps track of environment specific information which influences algorithm behavior.
#[derive(Clone)]
pub struct Environment {
    /// A wrapper on random generator.
    pub random: Arc<dyn Random + Send + Sync>,

    /// A global execution quota.
    pub quota: Option<Arc<dyn Quota + Send + Sync>>,

    /// Keeps data parallelism settings.
    pub parallelism: Parallelism,

    /// An information logger.
    pub logger: InfoLogger,

    /// A boolean flag which signalizes that experimental behavior is allowed.
    pub is_experimental: bool,
}

impl Environment {
    /// Creates an instance of `Environment` using optional time quota and defaults.
    pub fn new_with_time_quota(max_time: Option<usize>) -> Self {
        Self {
            quota: max_time.map::<Arc<dyn Quota + Send + Sync>, _>(|time| Arc::new(TimeQuota::new(time as f64))),
            ..Self::default()
        }
    }

    /// Creates an instance of `Environment`.
    pub fn new(
        random: Arc<dyn Random + Send + Sync>,
        quota: Option<Arc<dyn Quota + Send + Sync>>,
        parallelism: Parallelism,
        logger: InfoLogger,
        is_experimental: bool,
    ) -> Self {
        Self { random, quota, parallelism, logger, is_experimental }
    }
}

impl Default for Environment {
    fn default() -> Self {
        Environment::new(
            Arc::new(DefaultRandom::default()),
            None,
            Parallelism::default(),
            Arc::new(|msg| println!("{}", msg)),
            false,
        )
    }
}

/// A time quota.
pub struct TimeQuota {
    start: Timer,
    limit_in_secs: f64,
}

impl TimeQuota {
    /// Creates a new instance of `TimeQuota`.
    pub fn new(limit_in_secs: f64) -> Self {
        Self { start: Timer::start(), limit_in_secs }
    }
}

impl Quota for TimeQuota {
    fn is_reached(&self) -> bool {
        self.start.elapsed_secs_as_f64() > self.limit_in_secs
    }
}

/// Specifies data parallelism settings.
#[derive(Clone)]
pub struct Parallelism {
    available_cpus: usize,
    // NOTE seems falls positive.
    #[allow(clippy::rc_buffer)]
    thread_pools: Option<Arc<Vec<ThreadPool>>>,
}

impl Default for Parallelism {
    fn default() -> Self {
        Self { available_cpus: get_cpus(), thread_pools: None }
    }
}

impl Parallelism {
    /// Creates an instance of `Parallelism`.
    pub fn new(num_thread_pools: usize, threads_per_pool: usize) -> Self {
        let thread_pools = (0..num_thread_pools).map(|_| ThreadPool::new(threads_per_pool)).collect();
        Self { available_cpus: get_cpus(), thread_pools: Some(Arc::new(thread_pools)) }
    }

    /// Amount of total available CPUs.
    pub fn available_cpus(&self) -> usize {
        self.available_cpus
    }

    /// Executes operation on thread pool with given index. If there is no thread pool with such
    /// index, then executes it without using any of thread pools.
    pub fn thread_pool_execute<OP, R>(&self, idx: usize, op: OP) -> R
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        if let Some(thread_pool) = self.thread_pools.as_ref().and_then(|tps| tps.get(idx)) {
            thread_pool.execute(op)
        } else {
            op()
        }
    }

    /// Returns amount of thread pools used. Returns zero if default thread pool is used.
    pub fn thread_pool_size(&self) -> usize {
        self.thread_pools.as_ref().map_or(0, |tp| tp.len())
    }
}

#[cfg(not(target_arch = "wasm32"))]
fn get_cpus() -> usize {
    num_cpus::get()
}

#[cfg(target_arch = "wasm32")]
fn get_cpus() -> usize {
    1
}