1use std::{
2 collections::VecDeque,
3 num::NonZeroUsize,
4 sync::{
5 atomic::{
6 AtomicBool,
7 Ordering
8 },
9 mpsc,
10 Arc,
11 Condvar,
12 Mutex
13 },
14 thread::{
15 self,
16 JoinHandle
17 }
18};
19
20use subloader::{
21 LoadExp,
22 SubLoader
23};
24
25mod subloader;
26
27pub trait Loadable {
28 type Output: Send + Sync;
29 fn load(&mut self) -> Self::Output;
30}
31
32type LoadQueue = Arc<Mutex<VecDeque<Box<dyn LoadExp + Send + Sync>>>>;
33pub struct Loader {
34 threads: Vec<Option<thread::JoinHandle<()>>>,
35 queue: LoadQueue,
36 conds: Arc<(Condvar, Mutex<bool>, AtomicBool)>,
37}
38
39impl Loader {
40 pub fn new() -> Self {
41 let mut threads = Vec::new();
42
43 let num_threads = std::thread::available_parallelism()
44 .unwrap_or(NonZeroUsize::new(4).unwrap())
45 .get();
46
47 let queue = Arc::new(Mutex::new(VecDeque::new()));
48 let conds = Arc::new((Condvar::new(), Mutex::new(false), AtomicBool::new(false)));
49
50 for _ in 0..num_threads {
51 let queue = queue.clone();
52 let conds = conds.clone();
53 threads.push(Some(thread::spawn(|| { Self::loader_func(queue, conds) })))
54 }
55
56 Self {
57 threads,
58 queue,
59 conds,
60 }
61 }
62
63 pub fn load<T: Loadable + Send + Sync + 'static>(&mut self, item: T) -> mpsc::Receiver<<T as Loadable>::Output> {
64 let (loader, reciever) = SubLoader::new(item);
65
66 self.queue
67 .lock()
68 .unwrap()
69 .push_back(loader);
70
71 self.conds.0.notify_one();
72
73 reciever
74 }
75
76 fn loader_func(queue: LoadQueue, conditions: Arc<(Condvar, Mutex<bool>, AtomicBool)>) {
77 let (ref cond_var, ref mutex, ref should_close) = *conditions;
78
79 while !should_close.load(Ordering::SeqCst) {
80 {
81 let len = {
82 queue.lock().unwrap().len()
83 };
84
85 if len < 1 {
86 let guard = mutex.lock().unwrap();
87 let _x = cond_var.wait(guard).unwrap();
88 }
89 }
90
91 let data = {
92 let mut queue = queue.lock().unwrap();
93 queue.pop_front()
94 };
95
96 match data {
97 Some(mut data) => {
98 data.load();
99
100 continue
101 },
102 None => {
103 }
104 }
105 }
106 }
107
108 pub fn restore_threads(&mut self) {
110 for handle in self.threads.iter_mut() {
111 let old = handle.take();
112
113 fn add_threads(
114 queue: LoadQueue,
115 conds: Arc<(Condvar, Mutex<bool>, AtomicBool)>,
116 handle: &mut Option<JoinHandle<()>>
117 ) {
118 *handle = Some(thread::spawn(|| { Loader::loader_func(queue, conds) }));
119 }
120
121 match old {
122 Some(thread) => {
123 if thread.is_finished() {
125 _ = thread.join();
126
127 add_threads(self.queue.clone(), self.conds.clone(), handle)
128 }
129 },
130 None => {
131 add_threads(self.queue.clone(), self.conds.clone(), handle)
132 }
133 }
134 }
135 }
136}
137
138impl Drop for Loader {
139 fn drop(&mut self) {
140 let (ref cond_var, _, ref should_close) = *self.conds;
141
142 should_close.store(true, Ordering::SeqCst);
143 cond_var.notify_all();
144
145 for opt in self.threads.iter_mut() {
146 let thread = opt.take();
147
148 match thread {
149 Some(thread) => {
150 _ = thread.join()
151 },
152 None => ()
153 }
154 }
155 }
156}