1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
use futures::*;
use std::time::Duration;
use std::sync::{Arc, RwLock};
use rand::Rng;
use log::*;
use tokio::time::sleep as t_sleep;

/// Interval between two load-adjustment steps.
///
/// `spawn_loop` sleeps for this long before every step, and `start` divides the
/// ramp-up and test durations by it to obtain the number of steps per phase.
// TODO Make this configurable
const RAMP_UP_SLEEP: Duration = Duration::from_millis(10);

/// Load generator that keeps a target number of asynchronous workers running.
///
/// `IN` is the input produced for each worker by the `init` closure; `OUT` is
/// the future the `worker` closure builds from that input and that is spawned
/// as a tokio task.
pub struct PerfTool<IN, OUT>
where
    OUT: Future + Send + 'static,
    OUT::Output: Send + 'static,
    IN: Send + 'static,
{
    /// Number of workers the constant-load phase tries to keep running.
    wps: u64,
    /// Length of the phase in which the expected worker count grows towards `wps`.
    ramp_up: Duration,
    /// Length of the constant-load phase that follows the ramp-up.
    test_duration: Duration,
    /// Produces the input for one worker; called once per spawned worker.
    init: Box<dyn Fn() -> IN>,
    /// Turns one input into the future that is run as a worker.
    worker: Box<dyn Fn(IN) -> OUT>,

    /// Count of workers currently considered running, shared with the spawned tasks.
    num_workers: Arc<RwLock<u64>>,
}

impl<IN, OUT> PerfTool<IN, OUT>
    where
        IN: Send + 'static,
        OUT: Future + Send + 'static,
        OUT::Output: Send + 'static,
{
    pub fn new(wps: u64,
               ramp_up: Duration,
               test_duration: Duration,
               init_task: impl Fn() -> IN + 'static,
               task: impl Fn(IN) -> OUT + 'static,
    ) -> PerfTool<IN, OUT> {
        PerfTool{
            wps,
            ramp_up,
            test_duration,
            init: Box::new(init_task),
            worker: Box::new(task),
            num_workers: Arc::new(RwLock::new(0)),
        }
    }

    pub async fn start(&self) {
        debug!("starting ramp up");
        let ramp_up_steps = (self.ramp_up.as_millis() / RAMP_UP_SLEEP.as_millis()) as u64;
        let ramp_up_speed: f64 = self.wps as f64 / ramp_up_steps as f64;
        let mut ramp_handles = self.spawn_loop(ramp_up_steps, |i| {
            (ramp_up_speed * (i + 1) as f64).ceil() as u64
        }).await;
        debug!("ramp up done");

        debug!("starting test with constant load");
        let test_steps = (self.test_duration.as_millis() / RAMP_UP_SLEEP.as_millis()) as u64;
        let handles = self.spawn_loop(test_steps, |_| self.wps).await;
        ramp_handles.extend(handles);
        debug!("test done, waiting for workers to finish");

        for h in ramp_handles {
            let _ = h.await;
        }

        debug!("all workers done")
    }

    async fn spawn_loop(&self, steps: u64, expected_num_workers: impl Fn(u64) -> u64) -> Vec<tokio::task::JoinHandle<()>> {
        let mut handles = vec![];

        for i in 0..steps {
            t_sleep(RAMP_UP_SLEEP).await;

            let expected_num_workers = expected_num_workers(i);
            let actual_num_workers = *self.num_workers.read().unwrap();
            if actual_num_workers >= expected_num_workers {
                continue
            }

            debug!("actual #workers - {}, expected #workers - {}", actual_num_workers, expected_num_workers);
            for _ in actual_num_workers..expected_num_workers {
                let input = (self.init)();
                let future = (self.worker)(input);
                let handle = self.spawn_worker(future);
                handles.push(handle);
            }
        }

        handles
    }

    fn spawn_worker(&self, task: OUT) -> tokio::task::JoinHandle<()> {
        let num_workers = Arc::clone(&self.num_workers);
        tokio::spawn(async move {
            *num_workers.write().unwrap() += 1;
            task.await;
            *num_workers.write().unwrap() -= 1;
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Input handed to every test worker: a random id and a borrowed name.
    struct WorkerInput<'a> {
        id: u8,
        name: &'a str,
    }

    impl<'a> WorkerInput<'a> {
        /// Builds an input with a random `id` that borrows `n` as its name.
        fn new(n: &'a str) -> Self {
            WorkerInput {
                id: rand::thread_rng().gen(),
                name: n,
            }
        }
    }

    /// Worker used by the test: logs, sleeps for two seconds, logs again.
    async fn test_worker(input: WorkerInput<'_>) {
        info!("hello, {}, my name is {}", input.id, input.name);

        t_sleep(Duration::from_secs(2)).await;

        info!("I'm done, {}", input.id);
    }

    /// Runs the tool while a checker task samples the worker counter once per
    /// second during the constant-load phase and fails if it drops more than
    /// one below the target.
    #[tokio::test]
    async fn test_simple() {
        std::env::set_var("RUST_LOG", "debug");
        // `init` panics when a logger is already installed (e.g. by another
        // test in the same binary); `try_init` reports that as an error we can ignore.
        let _ = env_logger::try_init();

        let wps = 10;
        let ramp_up = Duration::from_secs(5);
        let test_time = Duration::from_secs(10);

        let test_data = vec!["JORA", "KOLEA", "FEDEA", "VASEA", "VANEA", "PETEA"];
        let init = move || {
            let n = test_data.len();
            let idx: usize = rand::thread_rng().gen();
            WorkerInput::new(test_data[idx % n])
        };
        let tool = PerfTool::new(wps, ramp_up, test_time, init, test_worker);

        let num_workers = Arc::clone(&tool.num_workers);
        let checker = tokio::spawn(async move {
            info!("test waiting for ramp up to finish");
            t_sleep(ramp_up).await;
            info!("test starts checkng actual wps");
            for _ in 0..test_time.as_secs() {
                t_sleep(Duration::from_secs(1)).await;
                let actual_wps = *num_workers.read().unwrap();
                if actual_wps + 2 <= wps {
                    return Err(format!("expected wps {}, actual wps {}", wps, actual_wps));
                }
            }
            Ok(())
        });

        tool.start().await;

        match checker.await {
            Ok(Ok(())) => {}
            // `assert!(false, msg)` with a non-literal message is rejected in
            // edition 2021; panic with an explicit format string instead.
            Ok(Err(msg)) => panic!("{}", msg),
            Err(err) => panic!("checker task failed: {}", err),
        }
    }
}