// #![forbid(missing_docs)]
#![cfg_attr(test, deny(warnings))]

//! ## Example
//! ```rust
//! extern crate atomic_batcher;
//! extern crate tokio;
//!
//! use atomic_batcher::*;
//! use std::sync::Arc;
//! use std::time::{Duration, Instant};
//! use tokio::prelude::*;
//! use tokio::timer::Delay;
//!
//! fn main() {
//!   let when = Instant::now() + Duration::from_millis(2000);
//!   let run = move |val: Vec<u64>, _batcher: Batcher<u64>| {
//!     println!("{:?}", val);
//!   };
//!
//!   // Create a batcher with a run function. The run function is called when
//!   // the batcher's inner state `running` is OFF and there is data to process:
//!   // either values passed directly to `append`, or a non-empty `pending_batch`.
//!   let batcher = Batcher::new(Arc::new(run));
//!
//!   // Before this first append, the batcher's inner state `running` is initially OFF,
//!   // so the batcher calls the run function with the appended values directly,
//!   // then turns `running` ON.
//!   batcher.append(vec![1, 2, 3], None);
//!
//!   // Because `running` is now ON, the run function won't be called again yet.
//!   // Instead, `vec![4, 5, 6]` and `vec![7, 8, 9]` are pushed onto the
//!   // batcher's `pending_batch`.
//!   batcher.append(vec![4, 5, 6], None);
//!   batcher.append(vec![7, 8, 9], None);
//!
//!   // Now `pending_batch` is vec![4, 5, 6, 7, 8, 9].
//!   // After 2 seconds, `batcher.done` gets called, which turns `running` OFF,
//!   // calls the run function with the accumulated `pending_batch`,
//!   // and finally turns `running` ON again.
//!   let task = Delay::new(when)
//!     .and_then(|_| {
//!       batcher.done(Ok(()));
//!       Ok(())
//!     })
//!     .map_err(|e| panic!("delay errored; err={:?}", e));
//!   tokio::run(task);
//! }
//! ```
//! Running the above example will print
//! ```sh
//! [1, 2, 3]
//!
//! // two seconds later
//! [4, 5, 6, 7, 8, 9]
//! ```

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

type Cb = Arc<dyn Fn(Result<(), &'static str>) + Send + Sync>;
/// Optional callback, invoked once the batch containing the appended values has been run.
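///
/// A minimal sketch of constructing one; the handler body is only illustrative:
/// ```rust
/// # use atomic_batcher::CbOption;
/// # use std::sync::Arc;
/// let cb: CbOption = Some(Arc::new(|result: Result<(), &'static str>| {
///   if let Err(err) = result {
///     eprintln!("batch failed: {}", err);
///   }
/// }));
/// assert!(cb.is_some());
/// ```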
pub type CbOption = Option<Cb>;

/// A batcher that runs one batch at a time, queueing values appended while a batch is in flight.
#[derive(Clone)]
pub struct Batcher<T: Clone> {
  // Whether a batch is currently being run.
  running: Arc<AtomicBool>,
  // Values appended while a batch was running; flushed by `done`.
  pending_batch: Arc<Mutex<Vec<T>>>,
  // Callbacks queued for the pending batch; promoted to `callbacks` by `done`.
  pending_callbacks: Arc<Mutex<Vec<Cb>>>,
  // Callbacks for the batch currently being run.
  callbacks: Arc<Mutex<Vec<Cb>>>,
  // User-supplied function that processes a batch.
  run: Arc<dyn Fn(Vec<T>, Batcher<T>) + Send + Sync>,
}

impl<T: Clone> Batcher<T> {
  /// Create a new batcher with a run function.
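  ///
  /// A minimal sketch; the closure and values below are illustrative:
  /// ```rust
  /// # use atomic_batcher::Batcher;
  /// # use std::sync::Arc;
  /// // The run function receives the batch and the batcher itself, so it can
  /// // call `done` once the (possibly asynchronous) work has finished.
  /// let batcher: Batcher<u64> = Batcher::new(Arc::new(|vals: Vec<u64>, b: Batcher<u64>| {
  ///   println!("processing {:?}", vals);
  ///   b.done(Ok(()));
  /// }));
  /// batcher.append(vec![1, 2, 3], None);
  /// ```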
  pub fn new(run: Arc<dyn Fn(Vec<T>, Batcher<T>) + Send + Sync>) -> Self {
    Batcher {
      running: Arc::new(AtomicBool::new(false)),
      pending_batch: Arc::new(Mutex::new(Vec::new())),
      pending_callbacks: Arc::new(Mutex::new(Vec::new())),
      callbacks: Arc::new(Mutex::new(Vec::new())),
      run,
    }
  }
  /// Accept a vector of values and an optional callback.
  /// The callback is invoked once the batch containing these values has been run.
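  ///
  /// A minimal sketch; the values and callback below are illustrative:
  /// ```rust
  /// # use atomic_batcher::Batcher;
  /// # use std::sync::Arc;
  /// let batcher: Batcher<u64> = Batcher::new(Arc::new(|vals: Vec<u64>, _b: Batcher<u64>| {
  ///   println!("running {:?}", vals);
  /// }));
  /// // The batcher is idle, so this batch runs immediately; the callback
  /// // fires once `done` is called for it.
  /// batcher.append(vec![1, 2], Some(Arc::new(|result: Result<(), &'static str>| {
  ///   println!("batch finished: {:?}", result);
  /// })));
  /// batcher.done(Ok(()));
  /// ```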
  pub fn append(&self, val: Vec<T>, cb: CbOption) {
    if self.running.load(Ordering::Relaxed) {
      // A batch is already running: queue the values and the callback so they
      // are processed when `done` flushes the pending batch.
      if self.pending_batch.lock().unwrap().is_empty() {
        *self.pending_callbacks.lock().unwrap() = Vec::new();
      }
      self.pending_batch.lock().unwrap().extend(val);
      if let Some(cb) = cb {
        self.pending_callbacks.lock().unwrap().push(cb);
      }
    } else {
      // No batch is running: register the callback and run this batch right away.
      if let Some(cb) = cb {
        *self.callbacks.lock().unwrap() = vec![cb];
      }
      self.running.store(true, Ordering::Relaxed);
      (self.run)(val, self.clone());
    }
  }
  /// Turn the batcher's running state OFF, invoke the callbacks registered for the
  /// finished batch, then run the pending batch if there is one.
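  ///
  /// A minimal sketch; the values below are illustrative:
  /// ```rust
  /// # use atomic_batcher::Batcher;
  /// # use std::sync::Arc;
  /// let batcher: Batcher<u64> = Batcher::new(Arc::new(|vals: Vec<u64>, _b: Batcher<u64>| {
  ///   println!("{:?}", vals);
  /// }));
  /// batcher.append(vec![1], None);    // runs immediately: prints [1]
  /// batcher.append(vec![2, 3], None); // queued while the first batch is running
  /// // Finishing the first batch flushes the queue: prints [2, 3]
  /// batcher.done(Ok(()));
  /// ```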
  pub fn done(self, err: Result<(), &'static str>) {
    // Notify the callbacks registered for the batch that just finished.
    for cb in self.callbacks.lock().unwrap().iter() {
      cb(err)
    }
    self.running.store(false, Ordering::Relaxed);
    // Promote the values and callbacks that were queued while the batch ran.
    let mut pending_callbacks = self.pending_callbacks.lock().unwrap();
    let mut pending_batch = self.pending_batch.lock().unwrap();
    *self.callbacks.lock().unwrap() = pending_callbacks.drain(..).collect();
    let next_batch: Vec<T> = pending_batch.drain(..).collect();
    if next_batch.is_empty() && self.callbacks.lock().unwrap().is_empty() {
      return;
    }
    // Release the locks before calling the run function so that it can safely
    // `append` to the batcher it receives without deadlocking.
    drop(pending_callbacks);
    drop(pending_batch);
    self.running.store(true, Ordering::Relaxed);
    (self.run)(next_batch, self.clone());
  }
}