atomic_batcher/
lib.rs

1// #![forbid(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3
//! ## Example
//! ```rust
//! extern crate atomic_batcher;
//! extern crate tokio;
//!
//! use atomic_batcher::*;
//! use std::sync::Arc;
//! use std::time::{Duration, Instant};
//! use tokio::prelude::*;
//! use tokio::timer::Delay;
//!
//! fn main() {
//!   let when = Instant::now() + Duration::from_millis(2000);
//!   let run = move |val: Vec<u64>, _batcher: Batcher<u64>| -> () {
//!     println!("{:?}", val);
//!   };
//!
//!   // Create a batcher with a run function which will be called
//!   // when the batcher's inner state `running` is OFF and inner state
//!   // `pending_batch` is not empty.
//!   let batcher = Batcher::new(Arc::new(run));
//!
//!   // Before this first append, the batcher's inner state `running` is
//!   // initially OFF, so the batcher calls the run function with the
//!   // appended values directly, then turns `running` ON.
//!   batcher.append(vec![1, 2, 3], None);
//!
//!   // Now, because inner state `running` is ON, the run function won't be
//!   // called. Instead the data `vec![4, 5, 6]` and `vec![7, 8, 9]` is
//!   // pushed to the batcher's `pending_batch`.
//!   batcher.append(vec![4, 5, 6], None);
//!   batcher.append(vec![7, 8, 9], None);
//!
//!   // Now `pending_batch` is vec![4, 5, 6, 7, 8, 9].
//!   // After 2 seconds, batcher.done gets called, which turns `running` OFF
//!   // and then calls the run function with `pending_batch`.
//!   // Finally `running` is turned ON again.
//!   let task = Delay::new(when)
//!     .and_then(|_| {
//!       batcher.done(Ok(()));
//!       Ok(())
//!     })
//!     .map_err(|e| panic!("delay errored; err={:?}", e));
//!   tokio::run(task);
//! }
//! ```
//! Running the above example will print
//! ```sh
//! [1, 2, 3]
//!
//! // two seconds later
//! [4, 5, 6, 7, 8, 9]
//! ```
57
58use std::sync::atomic::{AtomicBool, Ordering};
59use std::sync::{Arc, Mutex};
60
type Cb = Arc<dyn Fn(Result<(), &'static str>) -> () + Send + Sync>;
/// Describing optional batched callback function
pub type CbOption = Option<Cb>;

/// Batching representation.
///
/// All state lives behind `Arc`s, so cloning a `Batcher` yields another
/// handle to the same underlying batch queue.
#[derive(Clone)]
pub struct Batcher<T: Clone> {
  // ON while a batch is being processed by the run function.
  running: Arc<AtomicBool>,
  // Values appended while `running` is ON; they form the next batch.
  pending_batch: Arc<Mutex<Vec<T>>>,
  // Callbacks registered for the *pending* batch (mirrors `pending_batch`).
  pending_callbacks: Arc<Mutex<Vec<Cb>>>,
  // Callbacks for the batch currently being processed; invoked by `done`.
  callbacks: Arc<Mutex<Vec<Cb>>>,
  // User-supplied function that processes one batch.
  run: Arc<dyn Fn(Vec<T>, Batcher<T>) -> () + Send + Sync>,
}

impl<T: Clone> Batcher<T> {
  /// Create a new batcher with a run function.
  pub fn new(run: Arc<dyn Fn(Vec<T>, Batcher<T>) -> () + Send + Sync>) -> Self {
    Batcher {
      running: Arc::new(AtomicBool::new(false)),
      pending_batch: Arc::new(Mutex::new(Vec::new())),
      pending_callbacks: Arc::new(Mutex::new(Vec::new())),
      callbacks: Arc::new(Mutex::new(Vec::new())),
      run,
    }
  }
  /// Accept an array of values and a callback.
  /// The accepted callback is called when the batch containing the values
  /// has been run, i.e. on the `done` call that follows that batch.
  pub fn append(&self, val: Vec<T>, cb: CbOption) -> () {
    // Atomic test-and-set: the previous load/store pair let two concurrent
    // appends both observe `running == false` and both start a batch.
    if self.running.swap(true, Ordering::SeqCst) {
      // A batch is already in flight: queue the values for the next run.
      if self.pending_batch.lock().unwrap().is_empty() {
        // Starting a fresh pending batch; discard stale callbacks.
        *self.pending_callbacks.lock().unwrap() = Vec::new();
      }
      self.pending_batch.lock().unwrap().extend(val);
      if let Some(cb) = cb {
        // Fix: this callback belongs to the *pending* batch. It used to be
        // pushed onto `callbacks`, which made the next `done` fire it before
        // the batch containing these values had actually been run.
        self.pending_callbacks.lock().unwrap().push(cb);
      }
    } else {
      // Batcher was idle: run this batch immediately.
      if let Some(cb) = cb {
        *self.callbacks.lock().unwrap() = vec![cb];
      }
      (self.run)(val, self.clone());
    }
  }
  /// Turn the batcher's running state off, notify the finished batch's
  /// callbacks, then — if anything is pending — promote the pending batch
  /// and call the run function with it.
  pub fn done(self, err: Result<(), &'static str>) -> () {
    // Notify everyone who was waiting on the batch that just finished.
    for cb in self.callbacks.lock().unwrap().iter() {
      cb(err)
    }
    self.running.store(false, Ordering::SeqCst);
    // Promote pending state inside a scope so both mutex guards are released
    // before `run` is invoked below; the original held them across the call,
    // deadlocking any `append` made synchronously from inside `run`.
    let nextbatch: Vec<T> = {
      let mut pending_callbacks = self.pending_callbacks.lock().unwrap();
      let mut pending_batch = self.pending_batch.lock().unwrap();
      *self.callbacks.lock().unwrap() = pending_callbacks.drain(..).collect();
      pending_batch.drain(..).collect()
    };
    if nextbatch.is_empty() && self.callbacks.lock().unwrap().is_empty() {
      return;
    }
    self.running.store(true, Ordering::SeqCst);
    (self.run)(nextbatch, self.clone());
  }
}
121}