1use core::ops::Fn;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7fn sleep_ms(ms: u64) {
8 thread::sleep(Duration::from_millis(ms));
9}
10
11pub struct Batch< T: 'static> {
12 pub size: usize,
13 pub functions: Vec<Arc<dyn Fn() -> T + Send + Sync>>,
14 pub stats: bool,
15 pub batch_wait_dur: u64,
16}
17
18impl<T: 'static> Batch<T> {
19 pub fn new() -> Self {
20 Self {
21 size: 0,
22 functions: Vec::new(),
23 stats: false,
24 batch_wait_dur: 0
25 }
26 }
27
28 pub fn set_size(&mut self, s: usize) {
29 self.size = s;
30 }
31
32 pub fn set_stats(&mut self, s: bool) {
33 self.stats = s;
34 }
35
36 pub fn set_batch_wait_dur(&mut self, s: u64) {
37 self.batch_wait_dur = s;
38 }
39
40 pub fn add_func(&mut self, func: impl Fn() -> T + Send + Sync + 'static) {
41 self.functions.push(Arc::new(func));
42 }
43
44 pub fn length(&self) -> usize {
45 return self.functions.len();
46 }
47
48 pub fn itr_fn(&self, arr: Vec<Arc<dyn Fn() -> T + Send + Sync>>) {
49 let mut handles = Vec::new();
50 for i in arr {
51 let handle = thread::spawn(move || {
52 i();
53 });
54 handles.push(handle);
55 }
56 for i in handles {
57 i.join().unwrap()
58 }
59 }
60
61 pub fn execute_linear(&self) {
62 let start = self.get_time_in_ms();
63 self.itr_fn(self.functions.clone());
64 if self.stats {
65 println!(
66 "finished linear batch in {:?} ms",
67 self.get_time_in_ms() - start
68 );
69 }
70 }
71
72 pub fn execute(&self) {
73 let start = self.get_time_in_ms();
74
75 let mut prev_start = 0;
76
77 while self.functions.len() - prev_start > self.size {
78
79 let con_arr = self.functions[prev_start..prev_start + self.size].to_vec();
80 self.itr_fn(con_arr);
81
82 prev_start += self.size;
83
84 if self.batch_wait_dur > 0 {
85 sleep_ms(self.batch_wait_dur);
86 }
87 }
88
89 let con_arr = self.functions[prev_start..].to_vec();
90 self.itr_fn(con_arr);
91
92 if self.stats {
93 println!(
94 "finished batching operation in {:?} ms",
95 self.get_time_in_ms() - start
96 );
97 }
98 }
99
100 pub fn get_time_in_ms(&self) -> u64 {
101 let start_time = SystemTime::now();
102 let since_the_epoch = start_time.duration_since(UNIX_EPOCH).unwrap();
103 let time_in_ms = since_the_epoch.as_millis() as u64;
104 time_in_ms
105 }
106}
107
108 #[cfg(test)]
109mod tests {
110 use super::*;
111
112 #[test]
113 fn batch_size() {
114
115 fn func() {}
116
117 let mut batch = Batch::new();
118
119 for _ in 0..50 {
120 batch.add_func(func);
121 }
122
123 batch.set_size(10);
124
125 assert_eq!(batch.size, 10);
126 }
127 #[test]
128 fn equal_batch_clone() {
129 fn func() {}
130
131 let mut batch = Batch::new();
132
133 for _ in 0..50 {
134 batch.add_func(func);
135 }
136
137 assert_eq!(batch.length(), 50);
138 }
139
140}