Skip to main content

ethrex_trie/
threadpool.rs

1use crossbeam::channel::{Sender, select_biased, unbounded};
2use std::marker::Send;
3use std::thread::{Builder, Scope};
4
5pub struct ThreadPool<'scope> {
6    priority_sender: Sender<Box<dyn 'scope + Send + FnOnce()>>, // Implictly our threads in the thread pool have the receiver
7    nice_sender: Sender<Box<dyn 'scope + Send + FnOnce()>>, // Implictly our threads in the thread pool have the receiver
8}
9
10impl<'scope> ThreadPool<'scope> {
11    pub fn new(thread_count: usize, scope: &'scope Scope<'scope, '_>) -> Self {
12        let (priority_sender, priority_receiver) = unbounded::<Box<dyn 'scope + Send + FnOnce()>>();
13        let (nice_sender, nice_receiver) = unbounded::<Box<dyn 'scope + Send + FnOnce()>>();
14
15        for i in 0..thread_count {
16            let priority_receiver = priority_receiver.clone();
17            let nice_receiver = nice_receiver.clone();
18            let _ = Builder::new()
19                .name(format!("ThreadPool {i}"))
20                .spawn_scoped(scope, move || {
21                    // Thread work goes here
22                    while let Ok(task) = select_biased! {
23                        recv(priority_receiver) -> msg => msg,
24                        recv(nice_receiver) -> msg => msg,
25                    } {
26                        task();
27                    }
28                    // If one of the senders closes because the threadpool is dropped, the other one
29                    // channel may still exist and have data
30                    while let Ok(task) = priority_receiver.recv() {
31                        task();
32                    }
33                    while let Ok(task) = nice_receiver.recv() {
34                        task();
35                    }
36                });
37        }
38        ThreadPool {
39            priority_sender,
40            nice_sender,
41        }
42    }
43
44    pub fn execute(&self, task: Box<dyn 'scope + Send + FnOnce()>) {
45        self.nice_sender.send(task).unwrap();
46    }
47
48    pub fn execute_priority(&self, task: Box<dyn 'scope + Send + FnOnce()>) {
49        self.priority_sender.send(task).unwrap();
50    }
51}