// orchidlang/libs/scheduler/thread_pool.rs
//! A thread pool for executing tasks in parallel, spawning threads as workload
//! increases and terminating them as tasks finish. This is not terribly
//! efficient; its main design goal is to parallelize blocking I/O calls.
//!
//! This is the abstract implementation of the scheduler.

7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc::{sync_channel, SyncSender};
9use std::sync::{Arc, Mutex};
10use std::thread::spawn;
11
/// A unit of work that can be dispatched on a [ThreadPool]. An implementor
/// owns every piece of data it needs, may cross thread boundaries, and is
/// consumed by a single execution.
pub trait Task: Send + 'static {
  /// Consume the task and carry out its work. Unless this signals another
  /// thread in some way, completing the task has no observable effect.
  fn run(self);
}
19
20impl<F: FnOnce() + Send + 'static> Task for F {
21 fn run(self) { self() }
22}
23
24/// An async unit of work that produces some result, see [Task]. This can be
25/// wrapped in a generic reporter to create a task.
26pub trait Query: Send + 'static {
27 /// The value produced by the query
28 type Result: Send + 'static;
29
30 /// Execute the query, producing some value which can then be sent to another
31 /// thread
32 fn run(self) -> Self::Result;
33
34 /// Associate the query with a reporter expressed in a plain function.
35 /// Note that because every lambda has a distinct type and every thread pool
36 /// runs exactly one type of task, this can appear only once in the code for
37 /// a given thread pool. It is practical in a narrow set of cases, most of the
38 /// time however you are better off defining an explicit reporter.
39 fn then<F: FnOnce(Self::Result) + Send + 'static>(self, callback: F) -> QueryTask<Self, F>
40 where Self: Sized {
41 QueryTask { query: self, callback }
42 }
43}
44impl<F: FnOnce() -> R + Send + 'static, R: Send + 'static> Query for F {
45 type Result = R;
46
47 fn run(self) -> Self::Result { self() }
48}
49
50/// A reporter that calls a statically known function with the result of a
51/// query. Constructed with [Query::then]
52pub struct QueryTask<Q: Query, F: FnOnce(Q::Result) + Send + 'static> {
53 query: Q,
54 callback: F,
55}
56impl<Q: Query, F: FnOnce(Q::Result) + Send + 'static> Task for QueryTask<Q, F> {
57 fn run(self) { (self.callback)(self.query.run()) }
58}
59
/// Control messages delivered to a parked worker thread.
///
/// No `T: Task` bound here: trait bounds belong on the impls that need them,
/// not on type definitions (every use site already instantiates `T` with a
/// `Task` implementor, so this is backward compatible).
enum Message<T> {
  /// The pool is being dropped; the worker should exit.
  Stop,
  /// A fresh unit of work for the worker to execute.
  Task(T),
}
64
/// State shared between the pool handle and every worker thread it spawns.
struct ThreadPoolData<T: Task> {
  /// Rendezvous slot: at most one idle worker publishes a sender here and
  /// then blocks on the matching receiver; `None` while no worker is waiting.
  rdv_point: Mutex<Option<SyncSender<Message<T>>>>,
  /// Set once when the pool is dropped so workers stop re-registering.
  stopping: AtomicBool,
}
69
/// A thread pool to execute blocking I/O operations in parallel.
/// This thread pool is pretty inefficient for CPU-bound operations because it
/// spawns an unbounded number of concurrent threads and destroys them eagerly.
/// It is assumed that the tasks at hand are substantially but not incomparably
/// more expensive than spawning a new thread.
///
/// If multiple threads finish their tasks, one waiting thread is kept, the
/// rest exit. If all threads are busy, new threads are spawned when tasks
/// arrive. To get rid of the last waiting thread, drop the thread pool.
///
/// ```
/// use orchidlang::libs::scheduler::thread_pool::{Task, ThreadPool};
///
/// struct MyTask(&'static str);
/// impl Task for MyTask {
///   fn run(self) { println!("{}", self.0) }
/// }
///
/// let pool = ThreadPool::new();
///
/// // spawns first thread
/// pool.submit(MyTask("foo"));
/// // probably spawns second thread
/// pool.submit(MyTask("bar"));
/// // either spawns third thread or reuses first
/// pool.submit(MyTask("baz"));
/// ```
pub struct ThreadPool<T: Task> {
  /// Shared state — the rendezvous slot and stop flag — also referenced by
  /// every worker thread spawned in [ThreadPool::submit].
  data: Arc<ThreadPoolData<T>>,
}
impl<T: Task> ThreadPool<T> {
  /// Create a new thread pool. This just initializes the threadsafe
  /// datastructures used to synchronize tasks and doesn't spawn any threads.
  /// The first submission spawns the first thread.
  pub fn new() -> Self {
    Self {
      data: Arc::new(ThreadPoolData {
        rdv_point: Mutex::new(None),
        stopping: AtomicBool::new(false),
      }),
    }
  }

  /// Submit a task to the thread pool. This tries to send the task to the
  /// waiting thread, or spawn a new one. If a thread is done with its task
  /// and finds that another thread is already waiting, it exits.
  pub fn submit(&self, task: T) {
    // Hold the lock across the take-and-send so no second sender can be
    // registered mid-handoff.
    let mut standby = self.data.rdv_point.lock().unwrap();
    if let Some(port) = standby.take() {
      (port.try_send(Message::Task(task))).expect(
        "This channel cannot be disconnected unless the receiver crashes
        between registering the sender and blocking for receive, and it cannot
        be full because it's taken before insertion",
      );
    } else {
      // Release the lock before spawning; the new worker runs the task
      // directly rather than receiving it over a channel.
      drop(standby);
      let data = self.data.clone();
      // worker thread created if all current ones are busy
      spawn(move || {
        let mut cur_task = task;
        loop {
          // Handle the task
          cur_task.run();
          // Apply for a new task if no other thread is doing so already
          let mut standby_spot = data.rdv_point.lock().unwrap();
          if standby_spot.is_some() {
            return; // exit if we would be the second in line
          }
          // Capacity 1 is enough: only one message is ever sent per
          // registration, because `submit` takes the sender before using it.
          let (sender, receiver) = sync_channel(1);
          *standby_spot = Some(sender);
          drop(standby_spot);
          // This check must happen AFTER publishing the sender: if the pool
          // is dropped concurrently, either we see `stopping` here, or Drop
          // finds our sender in the slot and wakes us with Stop below.
          if data.stopping.load(Ordering::SeqCst) {
            return; // exit if the pool was dropped before we applied
          }
          // Wait for the next event on the pool
          let msg = (receiver.recv()).expect("We are holding a reference");
          match msg {
            // repeat with next task
            Message::Task(task) => cur_task = task,
            // exit if the pool is dropped
            Message::Stop => return,
          }
        }
      });
    }
  }
}
157
158impl<T: Task> Default for ThreadPool<T> {
159 fn default() -> Self { Self::new() }
160}
161
impl<T: Task> Drop for ThreadPool<T> {
  // Ensure all threads exit properly
  fn drop(&mut self) {
    // Raise the flag BEFORE emptying the rendezvous slot: a worker that
    // registers its sender after the `take` below will observe
    // `stopping == true` and exit on its own. In the opposite order a worker
    // could register between the two steps and block in `recv` forever.
    self.data.stopping.store(true, Ordering::SeqCst);
    let mut rdv_point = self.data.rdv_point.lock().unwrap();
    if let Some(pending) = rdv_point.take() {
      // A worker is parked on this channel; wake it with a stop order. The
      // send may fail if the worker saw `stopping` and already dropped its
      // receiver — ignored, since that worker is exiting anyway.
      let _ = pending.send(Message::Stop);
    }
  }
}