atomic_batcher/lib.rs
1// #![forbid(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3
4//! ## Example
5//! ```rust
6//! extern crate atomic_batcher;
7//! extern crate tokio;
8
9//! use atomic_batcher::*;
10//! use std::sync::Arc;
11//! use std::time::{Duration, Instant};
12//! use tokio::prelude::*;
13//! use tokio::timer::Delay;
14
15//! fn main() {
16//! let when = Instant::now() + Duration::from_millis(2000);
17//! let run = move |val: Vec<u64>, _batcher: Batcher<u64>| -> () {
18//! println!("{:?}", val);
19//! };
20//!
21//! // Create a batcher with a run function which will be called
22//! // when batcher's inner state `running` is OFF and inner state `pending_batch`
23//! // is not empty.
24//! let batcher = Batcher::new(Arc::new(run));
25//!
//! // Before this first append, the batcher's inner state `running` is initially OFF,
//! // so the batcher calls the run function with the appended values directly
//! // and then switches `running` to ON.
29//! batcher.append(vec![1, 2, 3], None);
30//!
31//! // Now because inner state `running` is ON, run function won't be called.
32//! // But the data `vec![4, 5, 6]` and `vec![7, 8, 9]` will be pushed to
33//! // batcher's `pending_batch`.
34//! batcher.append(vec![4, 5, 6], None);
35//! batcher.append(vec![7, 8, 9], None);
36//!
//! // Now `pending_batch` is vec![4, 5, 6, 7, 8, 9].
//! // After 2 seconds, `batcher.done` gets called, which turns `running` OFF
//! // and then calls the run function with `pending_batch`.
//! // Finally, `running` is turned ON again.
41//! let task = Delay::new(when)
42//! .and_then(|_| {
43//! batcher.done(Ok(()));
44//! Ok(())
45//! })
46//! .map_err(|e| panic!("delay errored; err={:?}", e));
47//! tokio::run(task);
48//! }
49//! ```
50//! Running the above example will print
51//! ```sh
52//! [1, 2, 3]
53//!
54//! // two seconds later
55//! [4, 5, 6, 7, 8, 9]
56//! ```
57
58use std::sync::atomic::{AtomicBool, Ordering};
59use std::sync::{Arc, Mutex};
60
/// Completion callback attached to an appended set of values.
///
/// It is invoked with the result passed to `Batcher::done` once the batch
/// that contains those values has finished running.
type Cb = Arc<dyn Fn(Result<(), &'static str>) + Send + Sync>;
/// Optional completion callback accepted by `Batcher::append`.
pub type CbOption = Option<Cb>;

/// Batches values that arrive while a previous batch is still being processed.
///
/// At most one batch is "in flight" at a time. Values appended while a batch
/// is in flight are queued and handed to the run function as a single batch
/// as soon as `done` is called for the current one.
///
/// Cloning a `Batcher` is cheap and yields a handle to the same shared state.
#[derive(Clone)]
pub struct Batcher<T: Clone> {
    /// `true` while a batch has been handed to `run` and `done` is outstanding.
    running: Arc<AtomicBool>,
    /// Values queued for the next batch. Its lock also serializes every
    /// decision that depends on `running` (see `append` / `done`).
    pending_batch: Arc<Mutex<Vec<T>>>,
    /// Callbacks belonging to the values in `pending_batch`.
    pending_callbacks: Arc<Mutex<Vec<Cb>>>,
    /// Callbacks belonging to the batch that is currently in flight.
    callbacks: Arc<Mutex<Vec<Cb>>>,
    /// User supplied function that processes one batch.
    run: Arc<dyn Fn(Vec<T>, Batcher<T>) + Send + Sync>,
}

impl<T: Clone> Batcher<T> {
    /// Create a new batcher with a run function.
    ///
    /// `run` receives the values of one batch together with a handle to the
    /// batcher; it (or whatever asynchronous work it kicks off) must call
    /// `done` on that handle once the batch has been processed.
    pub fn new(run: Arc<dyn Fn(Vec<T>, Batcher<T>) + Send + Sync>) -> Self {
        Batcher {
            running: Arc::new(AtomicBool::new(false)),
            pending_batch: Arc::new(Mutex::new(Vec::new())),
            pending_callbacks: Arc::new(Mutex::new(Vec::new())),
            callbacks: Arc::new(Mutex::new(Vec::new())),
            run,
        }
    }

    /// Accept a list of values and an optional callback.
    ///
    /// If no batch is in flight, the run function is called immediately with
    /// `val`. Otherwise `val` is queued for the next batch.
    /// The callback is called when the batch containing the values has been
    /// run, i.e. on the `done` call that finishes *that* batch.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes is poisoned. No user code runs
    /// while they are held, so this is not expected to happen in practice.
    pub fn append(&self, val: Vec<T>, cb: CbOption) {
        {
            // The queue lock is held while `running` is inspected so that a
            // concurrent `done` cannot decide "nothing is pending" between our
            // check and our push, which would strand the values.
            let mut pending_batch = self.pending_batch.lock().unwrap();
            // `swap` makes the idle -> running transition atomic: only one of
            // several concurrent callers can observe `false` and start a run.
            if self.running.swap(true, Ordering::SeqCst) {
                pending_batch.extend(val);
                if let Some(cb) = cb {
                    // Queue the callback next to its values; it must not fire
                    // when the batch that is currently in flight completes.
                    self.pending_callbacks.lock().unwrap().push(cb);
                }
                return;
            }
        }

        // We own the idle -> running transition: run `val` as its own batch.
        if let Some(cb) = cb {
            *self.callbacks.lock().unwrap() = vec![cb];
        }
        // All locks are released here, so `run` may freely call `append` or
        // `done` on the handle it receives.
        (self.run)(val, self.clone());
    }

    /// Signal that the batch currently in flight has finished.
    ///
    /// Every callback attached to that batch is called with `err`. If values
    /// or callbacks were queued in the meantime, they are handed to the run
    /// function as the next batch; otherwise the batcher becomes idle.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes is poisoned.
    pub fn done(self, err: Result<(), &'static str>) {
        // Move the finished callbacks out of the mutex before invoking them:
        // user code must never run while a lock is held, otherwise a callback
        // that appends to the batcher would deadlock.
        let finished: Vec<Cb> = self.callbacks.lock().unwrap().drain(..).collect();
        for cb in &finished {
            cb(err);
        }

        let next_batch: Vec<T> = {
            // Same lock order as `append`: pending_batch, then pending_callbacks.
            let mut pending_batch = self.pending_batch.lock().unwrap();
            let mut pending_callbacks = self.pending_callbacks.lock().unwrap();
            if pending_batch.is_empty() && pending_callbacks.is_empty() {
                // Nothing queued: go idle. This happens under the queue lock,
                // so a racing `append` either queued before we looked or will
                // see `running == false` and start a fresh run itself.
                self.running.store(false, Ordering::SeqCst);
                return;
            }
            // Promote the queued callbacks to "in flight". `running` stays
            // `true`, so there is no window in which a second run could start.
            *self.callbacks.lock().unwrap() = pending_callbacks.drain(..).collect();
            let batch: Vec<T> = pending_batch.drain(..).collect();
            batch
        };

        // Locks are released; hand the queued values over as the next batch.
        let run = Arc::clone(&self.run);
        run(next_batch, self);
    }
}
121}