batcher/
lib.rs

1use core::ops::Fn;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5use std::time::{SystemTime, UNIX_EPOCH};
6
/// Blocks the calling thread for `ms` milliseconds.
fn sleep_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    thread::sleep(pause);
}
10
11pub struct Batch< T: 'static> {
12    pub size: usize,
13    pub functions: Vec<Arc<dyn Fn() -> T + Send + Sync>>,
14    pub stats: bool,
15    pub batch_wait_dur: u64,
16}
17
18impl<T: 'static> Batch<T> {
19    pub fn new() -> Self {
20        Self {
21            size: 0,
22            functions: Vec::new(),
23            stats: false,
24            batch_wait_dur: 0
25        }
26    }
27
28    pub fn set_size(&mut self, s: usize) {
29        self.size = s;
30    }
31
32    pub fn set_stats(&mut self, s: bool) {
33        self.stats = s;
34    }
35
36    pub fn set_batch_wait_dur(&mut self, s: u64) {
37        self.batch_wait_dur = s;
38    }
39
40    pub fn add_func(&mut self, func: impl Fn() -> T + Send + Sync + 'static) {
41        self.functions.push(Arc::new(func));
42    }
43
44    pub fn length(&self) -> usize {
45        return self.functions.len();
46    }
47
48    pub fn itr_fn(&self, arr: Vec<Arc<dyn Fn() -> T + Send + Sync>>) {
49        let mut handles = Vec::new();
50        for i in arr {
51           let handle = thread::spawn(move || {
52           i();
53        }); 
54           handles.push(handle);
55        }
56        for i in handles {
57            i.join().unwrap()
58        }
59    }
60
61    pub fn execute_linear(&self) {
62        let start = self.get_time_in_ms();
63        self.itr_fn(self.functions.clone());
64        if self.stats {
65            println!(
66                "finished linear batch in {:?} ms",
67                self.get_time_in_ms() - start
68            );
69        }
70    }
71
72    pub fn execute(&self) {
73        let start = self.get_time_in_ms();
74
75        let mut prev_start = 0;
76
77        while self.functions.len() - prev_start > self.size {
78
79            let con_arr = self.functions[prev_start..prev_start + self.size].to_vec();
80            self.itr_fn(con_arr);
81
82            prev_start += self.size;
83
84            if self.batch_wait_dur > 0 {
85                sleep_ms(self.batch_wait_dur);
86            }
87        }
88
89        let con_arr = self.functions[prev_start..].to_vec();
90        self.itr_fn(con_arr);
91
92        if self.stats {
93            println!(
94                "finished batching operation in {:?} ms",
95                self.get_time_in_ms() - start
96            );
97        }
98    }
99
100    pub fn get_time_in_ms(&self) -> u64 {
101        let start_time = SystemTime::now();
102        let since_the_epoch = start_time.duration_since(UNIX_EPOCH).unwrap();
103        let time_in_ms = since_the_epoch.as_millis() as u64;
104        time_in_ms
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a batch holding `count` copies of a do-nothing function.
    fn filled_batch(count: usize) -> Batch<()> {
        fn noop() {}

        let mut batch = Batch::new();
        (0..count).for_each(|_| batch.add_func(noop));
        batch
    }

    /// `set_size` stores the requested batch size.
    #[test]
    fn batch_size() {
        let mut batch = filled_batch(50);
        batch.set_size(10);

        assert_eq!(batch.size, 10);
    }

    /// `length` reports how many functions were added.
    #[test]
    fn equal_batch_clone() {
        let batch = filled_batch(50);

        assert_eq!(batch.length(), 50);
    }
}