//! # Overview
//! RpmTimer (_RequestPerMinute Timer_) is a tool for limiting your processing speed to the requested number of items (e.g. requests) per minute.
//!
//! It is designed to work with any rate-limited API.
//!
//! [![Crates.io](https://img.shields.io/crates/v/rpm-timer.svg)](https://crates.io/crates/rpm-timer)
//! [![license](http://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/kbknapp/clap-rs/blob/master/LICENSE-MIT)
//! [![Build Status](https://travis-ci.org/synek317/rpm-timer.svg?branch=master)](https://travis-ci.org/synek317/rpm-timer)
//!
//! [Documentation](https://docs.rs/rpm-timer/)
//!
//! [Repository](https://github.com/synek317/rpm-timer)
//!
//! # Getting Started
//!
//! First, add this dependency to your `Cargo.toml`:
//!
//! ```toml
//! [dependencies]
//! rpm-timer = "0.0.3"
//! ```
//!
//! Next, include the `rpm_timer` crate in your `lib.rs` or `main.rs` file:
//!
//! ```
//! extern crate rpm_timer;
//! ```
//!
//! Finally, use `RpmTimer` in the modules where you are going to limit your processing speed:
//!
//! ```
//! use rpm_timer::RpmTimer;
//! ```
//!
//! # Examples
//!
//! In order to avoid unnecessary memory allocations, there are two versions of the `run` function.
//!
//! 1. `run_slice` accepts a slice and passes sub-slices to the processing function:
//!
//! ```
//! extern crate rpm_timer;
//!
//! use rpm_timer::RpmTimer;
//!
//! fn send_slice(requests: Vec<String>) {
//!     RpmTimer::default()
//!         .rpm_limit(100.0)
//!         .run_slice(&requests, send_http_requests);
//! }
//!
//! // `run_slice` passes sub-slices of the input, so the items arrive as `&[String]`.
//! fn send_http_requests(requests: &[String]) {
//!     // process all requests here
//! }
//! ```
//!
//! 2. `run_iter` accepts any iterator, collects the items, and passes each portion as a `Vec` to the processing function:
//!
//! ```
//! extern crate rpm_timer;
//!
//! use std::io::{self, BufRead, BufReader, Read};
//! use rpm_timer::RpmTimer;
//!
//! fn send_lines<R: Read>(reader: BufReader<R>) {
//!     // `lines` comes from the `BufRead` trait and yields `Result<String, io::Error>` items.
//!     let lines = reader.lines();
//!
//!     RpmTimer::default()
//!         .rpm_limit(100.0)
//!         .run_iter(lines, send_http_requests);
//! }
//!
//! fn send_http_requests(requests: Vec<Result<String, io::Error>>) {
//!     // process all requests here
//! }
//! ```
//!
//! Please check the `examples` directory for more detailed, working examples.
//!
//! # Description
//!
//! `RpmTimer` works in tick intervals. You can adjust the tick length with the `tick` method. On every tick it checks whether there are any free worker threads (the number of threads can be adjusted with `max_threads`) and how many items should be processed in order to achieve the requested speed. If any items should be processed, `RpmTimer` collects them into either a slice (in the non-allocating version) or a `Vec` (in the allocating version) and runs the processing function in parallel. On every tick, `RpmTimer` tries to utilize all available worker threads. For example, when there are 100 items ready and 4 workers available, `RpmTimer` will pass 25 items to each worker in this single tick.
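//!
//! The per-tick split described above can be sketched in isolation. This is only a minimal illustration of the arithmetic, not part of the crate's API: `split_among_workers` is a hypothetical helper that divides the whole items that are ready among the free workers using ceiling division.
//!
//! ```rust
//! /// Hypothetical helper (not part of `rpm_timer`): returns the batch size
//! /// handed to each free worker in a single tick.
//! fn split_among_workers(items_ready: f64, free_workers: usize) -> Vec<usize> {
//!     // Only whole items can be dispatched; the fraction stays for later ticks.
//!     let mut remaining = items_ready.floor() as usize;
//!     if remaining == 0 || free_workers == 0 {
//!         return Vec::new();
//!     }
//!
//!     // Ceiling division, so that all whole items fit into the free workers.
//!     let per_worker = (remaining + free_workers - 1) / free_workers;
//!     let mut batches = Vec::new();
//!
//!     while remaining > 0 && batches.len() < free_workers {
//!         let batch = per_worker.min(remaining);
//!         batches.push(batch);
//!         remaining -= batch;
//!     }
//!
//!     batches
//! }
//! ```
//!
//! With 100 items ready and 4 free workers this yields four batches of 25; with 5 items and 2 free workers it yields batches of 3 and 2.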
//!
//! Visualization:
//!
//! Assume 2 worker threads and a 500 ms tick time. Also, imagine a lag (e.g. the CPU is busy with other processes) between the 2nd and 3rd second:
//!
//! __60 RPM__ = __1 RPS__ = __1__ request every __1__ second
//!
//! ```text
//! Time                     0   0.5   1   1.5   2   2.5   3   3.5
//! Main Thread:             |....X....X....X....X.........X....X..
//! Number of items ready    1   0.5   1   0.5   1         2   0.5
//! Worker no. 1             1**********************.......1******.
//! Worker no. 2             ..........1**************.....1***....
//!                                              ^         ^
//!                                              |         |- 2 items sent to the threads
//!                                              |- an item is ready but no worker threads available
//!
//! ```
//!
//! __30 RPM__ = __0.5 RPS__ = __1__ request every __2__ seconds
//!
//! ```text
//! Time                     0   0.5   1   1.5   2   2.5   3   3.5
//! Main Thread:             |....X....X....X..............X....X..
//! Number of items ready    1  0.25  0.5  0.75  1  0.25  0.5  0.75
//! Worker no. 1             1***********..........................
//! Worker no. 2             ....................1************.....
//! ```
//!
//! Legend:
//!
//!   - `.` - sleeping
//!   - `X` - main thread's _tick_
//!   - `*` - busy processing (the number preceding the `*`s is the number of requests passed to the worker)
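//!
//! The "Number of items ready" row of the diagrams can be reproduced with a small simulation. This is a sketch under simplifying assumptions (no lag, and a free worker at every tick); `simulate_ready_items` is a hypothetical function, not part of the crate.
//!
//! ```rust
//! /// Hypothetical helper (not part of `rpm_timer`): returns, for every tick,
//! /// the number of items ready as seen by the main thread before dispatching.
//! fn simulate_ready_items(rps_limit: f64, tick_seconds: f64, ticks: usize) -> Vec<f64> {
//!     let mut ready: f64 = 1.0; // the timer starts with one item ready
//!     let mut seen = Vec::with_capacity(ticks);
//!
//!     for tick in 0..ticks {
//!         if tick > 0 {
//!             // Budget accrued since the previous tick.
//!             ready += rps_limit * tick_seconds;
//!         }
//!
//!         seen.push(ready);
//!
//!         // Whole items are dispatched; the fractional part carries over.
//!         ready -= ready.floor();
//!     }
//!
//!     seen
//! }
//! ```
//!
//! For 30 RPM (`0.5` RPS) and a 500 ms tick, the first eight ticks give `1, 0.25, 0.5, 0.75, 1, 0.25, 0.5, 0.75`, which matches the second diagram.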
//!
//! # Contribution
//!
//! All contributions and comments are more than welcome! Don't be afraid to open an issue or PR whenever you find a bug or have an idea to improve this crate.
//!
//! # License
//!
//! MIT License
//!
//! Copyright (c) 2017 Marcin Sas-Szymański
//!
//! Permission is hereby granted, free of charge, to any person obtaining a copy
//! of this software and associated documentation files (the "Software"), to deal
//! in the Software without restriction, including without limitation the rights
//! to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//! copies of the Software, and to permit persons to whom the Software is
//! furnished to do so, subject to the following conditions:
//!
//! The above copyright notice and this permission notice shall be included in all
//! copies or substantial portions of the Software.
//!
//! THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//! IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//! FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//! AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//! LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//! OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//! SOFTWARE.

extern crate scoped_pool;
extern crate num_cpus;

mod helpers;

use std::time::{Duration, Instant};
use std::cmp::min;
use std::thread::sleep;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use scoped_pool::Pool;
use self::helpers::*;

/// Use this struct to limit the speed at which items are processed.
///
/// Adjust the processing speed using the struct's methods.
///
/// Example usage:
///
/// ```
/// extern crate rpm_timer;
///
/// use rpm_timer::RpmTimer;
///
/// fn main() {
///     let items = &["Hello", "World!", "How", "are", "you?"];
///
///     RpmTimer::default()
///         .rps_limit(1.0)
///         .max_threads(1)
///         .run_slice(items, print);
/// }
///
/// fn print(items: &[&str]) {
///     for item in items {
///         println!("{}", item);
///     }
/// }
/// ```
pub struct RpmTimer {
    tick: Duration,
    rps_limit: f64,
    max_threads: Option<usize> // None == number of CPUs
}

impl RpmTimer {
    /// The main thread will try to spawn worker threads every _tick_.
    ///
    /// Tip: the higher the requested RPM, the shorter the tick duration should be.
    ///
    /// Default: 100 ms
    pub fn tick(mut self, value: Duration) -> Self {
        self.tick = value;
        self
    }

    /// Target number of requests per minute. It overrides the value previously set by `rps_limit`, if any.
    ///
    /// Default: 60
    pub fn rpm_limit(self, value: f64) -> Self {
        self.rps_limit(value / 60f64)
    }

    /// Target number of requests per second. It overrides the value previously set by `rpm_limit`, if any.
    ///
    /// Default: 1
    pub fn rps_limit(mut self, value: f64) -> Self {
        self.rps_limit = value;
        self
    }

    /// Maximum number of worker threads in the pool.
    ///
    /// Pass `None` to limit the number to the number of CPU cores (uses `num_cpus` under the hood).
    ///
    /// Default: None
    pub fn max_threads<T: Into<Option<usize>>>(mut self, value: T) -> Self {
        self.max_threads = value.into();
        self
    }

    /// Non-allocating method that spawns threads and passes sub-slices to the workers.
    ///
    /// This is the preferred way unless you only have an iterator.
    ///
    /// It waits for all spawned threads to finish.
    pub fn run_slice<T, F>(self, items: &[T], action: F)
        where F: Fn(&[T]) + Sync,
              T: Send + Sync
    {
        let mut last_dispatched_item_index = 0;

        self.run(action, |items_to_dispatch| {
            let first_item_index_to_process = last_dispatched_item_index;

            last_dispatched_item_index = min(last_dispatched_item_index + items_to_dispatch, items.len());

            (&items[first_item_index_to_process..last_dispatched_item_index], last_dispatched_item_index == items.len())
        });
    }

    /// Allocating method that spawns threads and passes vectors with collected items to the workers.
    ///
    /// This is the most generic solution, but you should only use it when `run_slice` is not possible.
    ///
    /// It waits for all spawned threads to finish.
    pub fn run_iter<T, I, F>(self, mut items: I, action: F)
        where F: Fn(Vec<T>) + Sync,
              I: Iterator<Item=T>,
              T: Send
    {
        self.run(action, |items_to_dispatch| {
            let items_to_process = items.by_ref().take(items_to_dispatch).collect::<Vec<_>>();
            let len = items_to_process.len();

            (items_to_process, len != items_to_dispatch)
        });
    }

    fn run<TItems, FAction, FTake>(self, action: FAction, mut take: FTake)
        where FAction: Fn(TItems) + Sync,
              FTake: FnMut(usize) -> (TItems, bool),
              TItems: Send
    {
        let pool_size          = self.max_threads.unwrap_or_else(num_cpus::get);
        let pool               = Pool::new(pool_size);
        let working_threads    = Arc::new(AtomicUsize::new(0));
        let mut last_tick_time = Instant::now();
        let mut items_ready    = 1f64;
        let mut finished       = false;

        pool.scoped(|scope|
            while !finished {
                let tick_start_time = Instant::now();
                let mut sleeping_working_threads = pool_size - working_threads.get();

                if sleeping_working_threads > 0 {
                    let seconds_since_last_tick = last_tick_time.elapsed_seconds();

                    // Accrue the item budget earned since the previous tick.
                    last_tick_time  = tick_start_time;
                    items_ready    += self.rps_limit * seconds_since_last_tick;

                    if items_ready >= 1f64 {
                        let mut items_to_take = items_ready.floor() as usize;
                        let items_to_take_per_worker = (items_to_take as f64 / sleeping_working_threads as f64).ceil() as usize;

                        while sleeping_working_threads > 0 && items_to_take > 0 && !finished {
                            let items_to_take_this_time    = min(items_to_take_per_worker, items_to_take);
                            let (taken_items, is_finished) = take(items_to_take_this_time);
                            let working_threads_clone      = working_threads.clone();
                            let a = &action;

                            finished                  = is_finished;
                            items_ready              -= items_to_take_this_time as f64;
                            items_to_take            -= items_to_take_this_time;
                            sleeping_working_threads -= 1;

                            working_threads.increase();

                            scope.execute(move || {
                                a(taken_items);
                                working_threads_clone.decrease();
                            });
                        }
                    }
                }

                // `Duration` subtraction panics on underflow, so sleep only if the tick has time left.
                if let Some(remaining) = self.tick.checked_sub(tick_start_time.elapsed()) {
                    sleep(remaining);
                }
            }
        );
    }
}

impl Default for RpmTimer {
    fn default() -> Self {
        Self {
            tick:        Duration::from_millis(100),
            rps_limit:   1f64,
            max_threads: None
        }
    }
}
325        }
326    }
327}