aspen_loader/
lib.rs

1use std::{
2    collections::VecDeque, 
3    num::NonZeroUsize, 
4    sync::{
5        atomic::{
6            AtomicBool, 
7            Ordering
8        }, 
9        mpsc, 
10        Arc, 
11        Condvar, 
12        Mutex
13    }, 
14    thread::{
15        self, 
16        JoinHandle
17    }
18};
19
20use subloader::{
21    LoadExp, 
22    SubLoader
23};
24
25mod subloader;
26
/// A unit of work that can be loaded, producing a value.
///
/// `Loader::load` accepts any implementor that is also `Send + Sync + 'static`.
pub trait Loadable {
    /// The value produced by [`Loadable::load`].
    ///
    /// It must be `Send + Sync`; `Loader::load` hands back an
    /// `mpsc::Receiver` of this type.
    type Output: Send + Sync;

    /// Performs the load and returns the produced value.
    ///
    /// Takes `&mut self`, so an implementor may mutate its own state while loading.
    fn load(&mut self) -> Self::Output;
}
31
32type LoadQueue = Arc<Mutex<VecDeque<Box<dyn LoadExp + Send + Sync>>>>;
33pub struct Loader {
34    threads: Vec<Option<thread::JoinHandle<()>>>,
35    queue: LoadQueue,
36    conds: Arc<(Condvar, Mutex<bool>, AtomicBool)>,
37}
38
39impl Loader {
40    pub fn new() -> Self {
41        let mut threads = Vec::new();
42
43        let num_threads = std::thread::available_parallelism()
44            .unwrap_or(NonZeroUsize::new(4).unwrap())
45            .get();
46
47        let queue = Arc::new(Mutex::new(VecDeque::new()));
48        let conds = Arc::new((Condvar::new(), Mutex::new(false), AtomicBool::new(false)));
49
50        for _ in 0..num_threads {
51            let queue = queue.clone();
52            let conds = conds.clone();
53            threads.push(Some(thread::spawn(|| { Self::loader_func(queue, conds) })))
54        }
55
56        Self {
57            threads,
58            queue,
59            conds,
60        }
61    }
62
63    pub fn load<T: Loadable + Send + Sync + 'static>(&mut self, item: T) -> mpsc::Receiver<<T as Loadable>::Output> {
64        let (loader, reciever) = SubLoader::new(item);
65
66        self.queue
67            .lock()
68            .unwrap()
69            .push_back(loader);
70
71        self.conds.0.notify_one();
72
73        reciever
74    }
75
76    fn loader_func(queue: LoadQueue, conditions: Arc<(Condvar, Mutex<bool>, AtomicBool)>) {
77        let (ref cond_var, ref mutex, ref should_close) = *conditions;
78
79        while !should_close.load(Ordering::SeqCst) {
80            {
81                let len = {
82                    queue.lock().unwrap().len()
83                };
84
85                if len < 1 {
86                    let guard = mutex.lock().unwrap();
87                    let _x = cond_var.wait(guard).unwrap();
88                }
89            }
90            
91            let data = {
92                let mut queue = queue.lock().unwrap();
93                queue.pop_front()
94            };
95            
96            match data {
97                Some(mut data) => {
98                    data.load();
99                    
100                    continue
101                },
102                None => {
103                }
104            }
105        }
106    }
107
108    /// Checks each thread to see if it has returned, and if it has returned, replaces the thread
109    pub fn restore_threads(&mut self) {
110        for handle in self.threads.iter_mut() {
111            let old = handle.take();
112
113            fn add_threads(
114                queue: LoadQueue, 
115                conds: Arc<(Condvar, Mutex<bool>, AtomicBool)>, 
116                handle: &mut Option<JoinHandle<()>>
117            ) {
118                *handle = Some(thread::spawn(|| { Loader::loader_func(queue, conds) }));
119            }
120
121            match old {
122                Some(thread) => {
123                    // Check if thread has closed
124                    if thread.is_finished() {
125                        _ = thread.join();
126
127                        add_threads(self.queue.clone(), self.conds.clone(), handle)
128                    }
129                },
130                None => {
131                    add_threads(self.queue.clone(), self.conds.clone(), handle)
132                }
133            }
134        }
135    }
136}
137
138impl Drop for Loader {
139    fn drop(&mut self) {
140        let (ref cond_var, _, ref should_close) = *self.conds; 
141
142        should_close.store(true, Ordering::SeqCst);
143        cond_var.notify_all();
144
145        for opt in self.threads.iter_mut() {
146            let thread = opt.take();
147
148            match thread {
149                Some(thread) => {
150                    _ = thread.join()
151                },
152                None => () 
153            }
154        }
155    }
156}