orchidlang/libs/scheduler/
thread_pool.rs

1//! A thread pool for executing tasks in parallel, spawning threads as workload
2//! increases and terminating them as tasks finish. This is not terribly
3//! efficient, its main design goal is to parallelize blocking I/O calls.
4//!
5//! This is the abstract implementation of the scheduler.
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc::{sync_channel, SyncSender};
9use std::sync::{Arc, Mutex};
10use std::thread::spawn;
11
/// A trait for a task dispatched on a [ThreadPool]. The task owns all relevant
/// data, is safe to pass between threads and is executed only once.
pub trait Task: Send + 'static {
  /// Execute the task, consuming it. At a minimum, this involves signaling
  /// some other thread, otherwise the task has no effect.
  fn run(self);
}
19
20impl<F: FnOnce() + Send + 'static> Task for F {
21  fn run(self) { self() }
22}
23
24/// An async unit of work that produces some result, see [Task]. This can be
25/// wrapped in a generic reporter to create a task.
26pub trait Query: Send + 'static {
27  /// The value produced by the query
28  type Result: Send + 'static;
29
30  /// Execute the query, producing some value which can then be sent to another
31  /// thread
32  fn run(self) -> Self::Result;
33
34  /// Associate the query with a reporter expressed in a plain function.
35  /// Note that because every lambda has a distinct type and every thread pool
36  /// runs exactly one type of task, this can appear only once in the code for
37  /// a given thread pool. It is practical in a narrow set of cases, most of the
38  /// time however you are better off defining an explicit reporter.
39  fn then<F: FnOnce(Self::Result) + Send + 'static>(self, callback: F) -> QueryTask<Self, F>
40  where Self: Sized {
41    QueryTask { query: self, callback }
42  }
43}
44impl<F: FnOnce() -> R + Send + 'static, R: Send + 'static> Query for F {
45  type Result = R;
46
47  fn run(self) -> Self::Result { self() }
48}
49
/// A reporter that calls a statically known function with the result of a
/// query. Constructed with [Query::then].
///
/// The trait bounds are deliberately left off the struct definition and kept
/// on the impl blocks that need them (standard Rust practice): this makes the
/// type cheaper to name in signatures and relaxing bounds is backward
/// compatible for all existing uses.
pub struct QueryTask<Q, F> {
  /// The unit of work to execute.
  query: Q,
  /// Invoked with the query's result on the executing thread.
  callback: F,
}
56impl<Q: Query, F: FnOnce(Q::Result) + Send + 'static> Task for QueryTask<Q, F> {
57  fn run(self) { (self.callback)(self.query.run()) }
58}
59
/// Control message delivered to an idle worker thread through the rendezvous
/// channel.
enum Message<T: Task> {
  /// The pool is being dropped; the worker should exit its loop.
  Stop,
  /// A new task for the worker to execute.
  Task(T),
}
64
/// State shared between the pool handle and all worker threads.
struct ThreadPoolData<T: Task> {
  /// The rendezvous point: holds the sender of the single idle worker, if
  /// any. `None` means every live worker is currently busy (or none exist).
  rdv_point: Mutex<Option<SyncSender<Message<T>>>>,
  /// Set once when the pool is dropped; workers check it before blocking on
  /// their receiver.
  stopping: AtomicBool,
}
69
/// A thread pool to execute blocking I/O operations in parallel.
/// This thread pool is pretty inefficient for CPU-bound operations because it
/// spawns an unbounded number of concurrent threads and destroys them eagerly.
/// It is assumed that the tasks at hand are substantially but not incomparably
/// more expensive than spawning a new thread.
///
/// If multiple threads finish their tasks, one waiting thread is kept, the
/// rest exit. If all threads are busy, new threads are spawned when tasks
/// arrive. To get rid of the last waiting thread, drop the thread pool.
///
/// ```
/// use orchidlang::libs::scheduler::thread_pool::{Task, ThreadPool};
///
/// struct MyTask(&'static str);
/// impl Task for MyTask {
///   fn run(self) { println!("{}", self.0) }
/// }
///
/// let pool = ThreadPool::new();
///
/// // spawns first thread
/// pool.submit(MyTask("foo"));
/// // probably spawns second thread
/// pool.submit(MyTask("bar"));
/// // either spawns third thread or reuses first
/// pool.submit(MyTask("baz"));
/// ```
pub struct ThreadPool<T: Task> {
  /// Shared with every worker thread via [Arc].
  data: Arc<ThreadPoolData<T>>,
}
impl<T: Task> ThreadPool<T> {
  /// Create a new thread pool. This just initializes the threadsafe
  /// datastructures used to synchronize tasks and doesn't spawn any threads.
  /// The first submission spawns the first thread.
  pub fn new() -> Self {
    Self {
      data: Arc::new(ThreadPoolData {
        rdv_point: Mutex::new(None),
        stopping: AtomicBool::new(false),
      }),
    }
  }

  /// Submit a task to the thread pool. This tries to send the task to the
  /// waiting thread, or spawn a new one. If a thread is done with its task
  /// and finds that another thread is already waiting, it exits.
  pub fn submit(&self, task: T) {
    let mut standby = self.data.rdv_point.lock().unwrap();
    if let Some(port) = standby.take() {
      // An idle worker is waiting: hand the task over. `take()` under the lock
      // guarantees no other submitter can race us to this sender.
      (port.try_send(Message::Task(task))).expect(
        "This channel cannot be disconnected unless the receiver crashes
        between registering the sender and blocking for receive, and it cannot
        be full because it's taken before insertion",
      );
    } else {
      // Release the lock before spawning so a finishing worker can register
      // itself while the new thread starts up.
      drop(standby);
      let data = self.data.clone();
      // worker thread created if all current ones are busy
      spawn(move || {
        let mut cur_task = task;
        loop {
          // Handle the task
          cur_task.run();
          // Apply for a new task if no other thread is doing so already
          let mut standby_spot = data.rdv_point.lock().unwrap();
          if standby_spot.is_some() {
            return; // exit if we would be the second in line
          }
          // Rendezvous channel of capacity 1: it carries at most the single
          // message a submitter (or Drop) sends after taking the sender.
          let (sender, receiver) = sync_channel(1);
          *standby_spot = Some(sender);
          drop(standby_spot);
          // This check must come AFTER registering the sender: Drop stores
          // `stopping` before locking `rdv_point`, so either we observe the
          // flag here, or Drop finds our sender and delivers `Stop` below.
          if data.stopping.load(Ordering::SeqCst) {
            return; // exit if the pool was dropped before we applied
          }
          // Wait for the next event on the pool
          let msg = (receiver.recv()).expect("We are holding a reference");
          match msg {
            // repeat with next task
            Message::Task(task) => cur_task = task,
            // exit if the pool is dropped
            Message::Stop => return,
          }
        }
      });
    }
  }
}
157
158impl<T: Task> Default for ThreadPool<T> {
159  fn default() -> Self { Self::new() }
160}
161
impl<T: Task> Drop for ThreadPool<T> {
  // Ensure all threads exit properly
  fn drop(&mut self) {
    // Raise the flag first: any worker that registers at the rendezvous point
    // after this store will observe it and exit on its own.
    self.data.stopping.store(true, Ordering::SeqCst);
    let mut rdv_point = self.data.rdv_point.lock().unwrap();
    if let Some(pending) = rdv_point.take() {
      // A worker registered this sender and may be blocked in `recv`; wake it
      // with an explicit stop. The result is ignored because the worker may
      // instead have seen `stopping` and already exited, dropping its
      // receiver and leaving the sender behind.
      let _ = pending.send(Message::Stop);
    }
  }
}
172}