rpm_timer/lib.rs
//! # Overview
//! RpmTimer (_RequestPerMinute Timer_) is a tool for limiting your processing speed to the requested number of items (e.g. requests) per minute.
//!
//! It is designed to work with any rate-limited API.
//!
//! [Documentation](https://docs.rs/rpm-timer/)
//!
//! [Repository](https://github.com/synek317/rpm-timer)
//!
//! # Getting Started
//!
//! First of all, add this dependency to your `Cargo.toml`:
//!
//! ```toml
//! [dependencies]
//! rpm-timer = "0.0.3"
//! ```
//!
//! Next, include the `rpm_timer` crate in your `lib.rs` or `main.rs` file:
//!
//! ```
//! extern crate rpm_timer;
//! ```
//!
//! Finally, use `RpmTimer` in the modules where you are going to limit your processing speed:
//!
//! ```
//! use rpm_timer::RpmTimer;
//! ```
//!
//! # Examples
//!
//! In order to avoid unnecessary memory allocations, the `run` function has two versions.
//!
//! 1. `run_slice` accepts a slice and passes sub-slices to the processing function:
//!
//! ```
//! extern crate rpm_timer;
//!
//! use rpm_timer::RpmTimer;
//!
//! fn send_slice(requests: Vec<String>) {
//!     RpmTimer::default()
//!         .rpm_limit(100.0)
//!         .run_slice(&requests, send_http_requests);
//! }
//!
//! // `run_slice` hands each worker a sub-slice of the original items.
//! fn send_http_requests(requests: &[String]) {
//!     // process all requests here
//! }
//! ```
//!
//! 2. `run_iter` accepts any iterator, collects items and passes every portion in a `Vec` to the processing function:
//!
//! ```
//! extern crate rpm_timer;
//!
//! use std::io::{self, BufRead, BufReader, Read};
//! use rpm_timer::RpmTimer;
//!
//! fn send_lines<R: Read>(reader: BufReader<R>) {
//!     // `lines` comes from the `BufRead` trait and yields `io::Result<String>`
//!     let lines = reader.lines();
//!
//!     RpmTimer::default()
//!         .rpm_limit(100.0)
//!         .run_iter(lines, send_http_requests);
//! }
//!
//! fn send_http_requests(requests: Vec<Result<String, io::Error>>) {
//!     // process all requests here
//! }
//! ```
//!
//! Please check the `examples` directory for more detailed, working examples.
//!
//! # Description
//!
//! `RpmTimer` works in tick intervals. You can adjust the tick length with the `tick` method. On every tick it checks whether there are any free worker threads (the number of threads can be adjusted with `max_threads`) and how many items should be processed in order to achieve the requested speed. If any items should be processed, `RpmTimer` collects them into either a slice (in the non-allocating version) or a `Vec` (in the allocating version) and runs the processing function in parallel. On every tick, `RpmTimer` tries to utilize all available worker threads. For example, when there are 100 items ready and 4 workers available, `RpmTimer` will pass 25 items to each worker in this single tick.
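//!
//! The per-tick split across free workers can be sketched with the standard library alone. This is an illustrative sketch rather than the crate's code: `split_among_workers` is a hypothetical helper that mirrors the arithmetic of the private `run` method, assuming every free worker receives `ceil(ready / free_workers)` items, capped by what is left.
//!
//! ```rust
//! // Hypothetical helper, not part of the crate's API.
//! // Returns the batch size handed to each free worker in a single tick.
//! fn split_among_workers(items_ready: f64, free_workers: usize) -> Vec<usize> {
//!     // Only whole items can be dispatched; the fraction stays for later ticks.
//!     let mut remaining = items_ready.floor() as usize;
//!     let mut batches = Vec::new();
//!
//!     if free_workers == 0 || remaining == 0 {
//!         return batches;
//!     }
//!
//!     let per_worker = (remaining as f64 / free_workers as f64).ceil() as usize;
//!     let mut workers_left = free_workers;
//!
//!     while workers_left > 0 && remaining > 0 {
//!         let batch = per_worker.min(remaining);
//!
//!         batches.push(batch);
//!         remaining -= batch;
//!         workers_left -= 1;
//!     }
//!
//!     batches
//! }
//! ```
//!
//! Under these assumptions, `split_among_workers(100.0, 4)` yields four batches of 25 items, which matches the example in the paragraph above.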
//!
//! Visualization:
//!
//! Assume 2 worker threads and 500 ms tick time. Also, imagine a lag (e.g. cpu busy with other processes) between the 2nd and 3rd second:
//!
//! __60 RPM__ = __1 RPS__ = __1__ request every __1__ second
//!
//! ```text
//! Time                     0    0.5  1    1.5  2    2.5  3    3.5
//! Main Thread:             |....X....X....X....X.........X....X..
//! Number of items ready    1    0.5  1    0.5  1         2    0.5
//! Worker no. 1             1**********************.......1******.
//! Worker no. 2             ..........1**************.....1***....
//!                                              ^         ^
//!                                              |         |- 2 items sent to the threads
//!                                              |- an item is ready but no worker threads available
//!
//! ```
//!
//! __30 RPM__ = __0.5 RPS__ = __1__ request every __2__ seconds
//!
//! ```text
//! Time                     0    0.5  1    1.5  2    2.5  3    3.5
//! Main Thread:             |....X....X....X..............X....X..
//! Number of items ready    1    0.25 0.5  0.75 1    0.25 0.5  0.75
//! Worker no. 1             1***********..........................
//! Worker no. 2             ....................1************.....
//! ```
//!
//! Legend:
//!
//! - `.` - sleeping
//! - `X` - main thread's _tick_
//! - `*` - busy with # requests (`#` is the digit that starts the run of `*`)
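//!
//! The `Number of items ready` rows can be reproduced with a small simulation. This is a minimal sketch under simplifying assumptions: there is no lag, a worker is free on every tick, and the counter starts at one item. `items_ready_per_tick` is a hypothetical name, not part of the crate.
//!
//! ```rust
//! // Hypothetical helper, not part of the crate's API.
//! // Returns the "items ready" value seen at each tick, before dispatching.
//! fn items_ready_per_tick(rps_limit: f64, tick_secs: f64, ticks: usize) -> Vec<f64> {
//!     let mut items_ready = 1.0; // assumption: one item is ready at time 0
//!     let mut observed = Vec::with_capacity(ticks);
//!
//!     for tick in 0..ticks {
//!         if tick > 0 {
//!             // Items accrue in proportion to the time elapsed since the last tick.
//!             items_ready += rps_limit * tick_secs;
//!         }
//!
//!         observed.push(items_ready);
//!
//!         // Whole items are dispatched; the fractional part carries over.
//!         items_ready -= items_ready.floor();
//!     }
//!
//!     observed
//! }
//! ```
//!
//! With `rps_limit = 0.5` and `tick_secs = 0.5` the first eight values are 1, 0.25, 0.5, 0.75, 1, 0.25, 0.5 and 0.75, the same sequence as in the 30 RPM diagram.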
//!
//! # Contribution
//!
//! All contributions and comments are more than welcome! Don't be afraid to open an issue or PR whenever you find a bug or have an idea to improve this crate.
//!
//! # License
//!
//! MIT License
//!
//! Copyright (c) 2017 Marcin Sas-Szymański
//!
//! Permission is hereby granted, free of charge, to any person obtaining a copy
//! of this software and associated documentation files (the "Software"), to deal
//! in the Software without restriction, including without limitation the rights
//! to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//! copies of the Software, and to permit persons to whom the Software is
//! furnished to do so, subject to the following conditions:
//!
//! The above copyright notice and this permission notice shall be included in all
//! copies or substantial portions of the Software.
//!
//! THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//! IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//! FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//! AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//! LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//! OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//! SOFTWARE.

extern crate scoped_pool;
extern crate num_cpus;

mod helpers;

use std::time::{Duration, Instant};
use std::cmp::min;
use std::thread::sleep;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use scoped_pool::Pool;
use self::helpers::*;

/// Use this struct to limit the processing speed of any items.
///
/// Adjust the processing speed using the struct's methods.
///
/// Example usage:
///
/// ```
/// extern crate rpm_timer;
///
/// use rpm_timer::RpmTimer;
///
/// fn main() {
///     let items = &["Hello", "World!", "How", "are", "you?"];
///
///     RpmTimer::default()
///         .rps_limit(1.0)
///         .max_threads(1)
///         .run_slice(items, print);
/// }
///
/// fn print(items: &[&str]) {
///     for item in items {
///         println!("{}", item);
///     }
/// }
/// ```
pub struct RpmTimer {
    tick: Duration,
    rps_limit: f64,
    max_threads: Option<usize>, // None == number of CPUs
}

impl RpmTimer {
    /// The main thread will try to spawn working threads every _tick_.
    ///
    /// Tip: the higher the requested RPM, the lower the tick duration should be.
    ///
    /// Default: 100 ms
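    ///
    /// To see why a shorter tick suits a higher RPM, it helps to estimate how many items become ready during one tick. The sketch below is only an illustration: `items_per_tick` is a hypothetical helper, not part of this crate, and it assumes items accrue at a constant rate.
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// // Hypothetical helper, not part of the crate's API.
    /// // Average number of items that become ready during a single tick.
    /// fn items_per_tick(rpm: f64, tick: Duration) -> f64 {
    ///     // Convert the tick to fractional seconds using only long-stable methods.
    ///     let tick_secs = tick.as_secs() as f64 + f64::from(tick.subsec_nanos()) * 1e-9;
    ///
    ///     rpm / 60.0 * tick_secs
    /// }
    /// ```
    ///
    /// For instance, at 6000 RPM the default 100 ms tick releases roughly 10 items at once, while a 10 ms tick releases about one item per tick and spreads the load more evenly.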
    pub fn tick(mut self, value: Duration) -> Self {
        self.tick = value;
        self
    }

    /// Target number of requests per minute. It overrides the value previously set by `rps_limit`, if any.
    ///
    /// Default: 60
    pub fn rpm_limit(self, value: f64) -> Self {
        self.rps_limit(value / 60f64)
    }

    /// Target number of requests per second. It overrides the value previously set by `rpm_limit`, if any.
    ///
    /// Default: 1
    pub fn rps_limit(mut self, value: f64) -> Self {
        self.rps_limit = value;
        self
    }

    /// Maximum number of working threads in the pool.
    ///
    /// Pass `None` to limit the number to the number of cpu cores (uses `num_cpus` under the hood).
    ///
    /// Default: None
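    ///
    /// The `Into<Option<usize>>` bound is what lets callers pass either a plain number or `None`. A minimal, standard-library-only sketch of the pattern follows; `resolve_threads` and its `cpu_count` parameter are hypothetical stand-ins for this method and for the value reported by `num_cpus`.
    ///
    /// ```rust
    /// // Hypothetical helper, not part of the crate's API.
    /// // Accepts `4usize`, `Some(4)` or `None` thanks to `impl From<T> for Option<T>`.
    /// fn resolve_threads<T: Into<Option<usize>>>(value: T, cpu_count: usize) -> usize {
    ///     // `None` falls back to the number of CPU cores supplied by the caller.
    ///     value.into().unwrap_or(cpu_count)
    /// }
    /// ```
    ///
    /// With this signature a bare `usize` is wrapped in `Some` automatically, so the common case needs no extra syntax.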
    pub fn max_threads<T: Into<Option<usize>>>(mut self, value: T) -> Self {
        self.max_threads = value.into();
        self
    }

    /// Non-allocating method that spawns threads and passes sub-slices to the workers.
    ///
    /// This is the preferred way unless you only have an iterator.
    ///
    /// It waits for all spawned threads to finish.
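    ///
    /// The sub-slicing itself needs no allocation because a cursor simply moves through the input. The sketch below illustrates that idea with a hypothetical `next_sub_slice` helper modelled on the closure this method passes to `run`; it is not part of the crate's API.
    ///
    /// ```rust
    /// // Hypothetical helper, not part of the crate's API.
    /// // Returns the next `count` items (or fewer at the end) and whether the input is exhausted.
    /// fn next_sub_slice<'a, T>(items: &'a [T], cursor: &mut usize, count: usize) -> (&'a [T], bool) {
    ///     let start = *cursor;
    ///     // Clamp the end so that the last batch never runs past the slice.
    ///     let end = (start + count).min(items.len());
    ///
    ///     *cursor = end;
    ///
    ///     (&items[start..end], end == items.len())
    /// }
    /// ```
    ///
    /// Calling it repeatedly with `count = 2` on five items yields batches of 2, 2 and 1 items, and the final call reports that the input is finished.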
    pub fn run_slice<T, F>(self, items: &[T], action: F)
        where F: Fn(&[T]) + Sync,
              T: Send + Sync
    {
        let mut last_dispatched_item_index = 0;

        self.run(action, |items_to_dispatch| {
            let first_item_index_to_process = last_dispatched_item_index;

            last_dispatched_item_index = min(last_dispatched_item_index + items_to_dispatch, items.len());

            (&items[first_item_index_to_process..last_dispatched_item_index], last_dispatched_item_index == items.len())
        });
    }

    /// Allocating method that spawns threads and passes vectors with collected items to the workers.
    ///
    /// This is the most generic solution but you should only use it when `run_slice` is not possible.
    ///
    /// It waits for all spawned threads to finish.
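    ///
    /// Batching from an iterator can be sketched with `by_ref().take(n).collect()`. The `next_batch` helper below is hypothetical and only models the closure this method passes to `run`; it assumes the end of input is detected when a batch comes back shorter than requested.
    ///
    /// ```rust
    /// // Hypothetical helper, not part of the crate's API.
    /// // Collects up to `count` items and reports whether the iterator ran dry.
    /// fn next_batch<T, I: Iterator<Item = T>>(items: &mut I, count: usize) -> (Vec<T>, bool) {
    ///     // `by_ref` borrows the iterator, so it can be polled again on the next call.
    ///     let batch: Vec<T> = items.by_ref().take(count).collect();
    ///     let finished = batch.len() != count;
    ///
    ///     (batch, finished)
    /// }
    /// ```
    ///
    /// One consequence of this detection rule: when the number of items is an exact multiple of the batch size, the end is only noticed on the following call, which returns an empty `Vec`.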
    pub fn run_iter<T, I, F>(self, mut items: I, action: F)
        where F: Fn(Vec<T>) + Sync,
              I: Iterator<Item=T>,
              T: Send
    {
        self.run(action, |items_to_dispatch| {
            let items_to_process = items.by_ref().take(items_to_dispatch).collect::<Vec<_>>();
            let len = items_to_process.len();

            (items_to_process, len != items_to_dispatch)
        });
    }

    fn run<TItems, FAction, FTake>(self, action: FAction, mut take: FTake)
        where FAction: Fn(TItems) + Sync,
              FTake: FnMut(usize) -> (TItems, bool),
              TItems: Send
    {
        let pool_size = self.max_threads.unwrap_or_else(|| num_cpus::get());
        let pool = Pool::new(pool_size);
        let working_threads = Arc::new(AtomicUsize::new(0));
        let mut last_tick_time = Instant::now();
        let mut items_ready = 1f64;
        let mut finished = false;

        pool.scoped(|scope|
            while !finished {
                let tick_start_time = Instant::now();
                let mut sleeping_working_threads = pool_size - working_threads.get();

                if sleeping_working_threads > 0 {
                    let seconds_since_last_tick = last_tick_time.elapsed_seconds();

                    last_tick_time = tick_start_time;
                    // Accrue the (possibly fractional) number of items allowed since the last tick.
                    items_ready += self.rps_limit * seconds_since_last_tick;

                    if items_ready >= 1f64 {
                        let mut items_to_take = items_ready.floor() as usize;
                        let items_to_take_per_worker = (items_to_take as f64 / sleeping_working_threads as f64).ceil() as usize;

                        // Hand out batches until workers, ready items or input run out.
                        while sleeping_working_threads > 0 && items_to_take > 0 && !finished {
                            let items_to_take_this_time = min(items_to_take_per_worker, items_to_take);
                            let (taken_items, is_finished) = take(items_to_take_this_time);
                            let working_threads_clone = working_threads.clone();
                            let a = &action;

                            finished = is_finished;
                            items_ready -= items_to_take_this_time as f64;
                            items_to_take -= items_to_take_this_time;
                            sleeping_working_threads -= 1;

                            working_threads.increase();

                            scope.execute(move || {
                                a(taken_items);
                                working_threads_clone.decrease();
                            });
                        }
                    }
                }

                // Sleep for the rest of the tick. If the tick overran, skip sleeping:
                // subtracting a longer `Duration` from a shorter one would panic.
                if let Some(remaining) = self.tick.checked_sub(tick_start_time.elapsed()) {
                    sleep(remaining);
                }
            }
        );
    }
}

impl Default for RpmTimer {
    fn default() -> Self {
        Self {
            tick: Duration::from_millis(100),
            rps_limit: 1f64,
            max_threads: None
        }
    }
327}