worker_pool/msg.rs
1use super::*;
2
// Channel types are re-exported through local aliases so that the rest of the crate never
// names `std::sync::mpsc` directly; swapping in another mpsc implementation later then only
// requires touching this list.

/// Sending half of an unbounded channel.
pub type Sender<T> = std::sync::mpsc::Sender<T>;
/// Sending half of a bounded (synchronous) channel.
pub type SyncSender<T> = std::sync::mpsc::SyncSender<T>;
/// Receiving half of a channel.
pub type Receiver<T> = std::sync::mpsc::Receiver<T>;

/// Error returned by a non-blocking receive.
pub type TryRecvError = std::sync::mpsc::TryRecvError;
/// Error returned by a blocking receive once every sender is gone.
pub type RecvError = std::sync::mpsc::RecvError;
/// Error returned by a blocking send; carries the value that could not be delivered.
pub type SendError<T> = std::sync::mpsc::SendError<T>;
/// Error returned by a non-blocking send; carries the value that could not be delivered.
pub type TrySendError<T> = std::sync::mpsc::TrySendError<T>;
11
12// TODO: add unsafe ways to manually send messages
13
/// Control message travelling from the manager down to its workers.
#[derive(Clone, Debug, PartialEq)]
pub enum DownMsg<Down>
where
    Down: Send,
{
    /// Asks the workers to stop executing and return as soon as possible.
    /// See [`WorkerPool::execute`] for more information on how this value should be handled.
    Stop,
    /// Asks a worker to suspend execution until a `Continue` message arrives.
    /// A worker may safely ignore this value; how it reacts to any other message received
    /// while paused is left up to the worker.
    Pause,
    /// Asks a worker to resume execution.
    /// A worker may safely ignore this value.
    Continue,
    /// A user-defined message from the manager to the workers.
    /// The `Down` type must implement [`Send`], and optionally [`Clone`], so that the
    /// messages can be sent.
    Other(Down),
}
30
/// Message travelling from a worker up to the manager.
///
/// Each message records the instant at which it was created, which lets
/// [`RecvBurstIterator`] stop early.
#[derive(Clone, Debug)]
pub struct UpMsg<Up>
where
    Up: Send,
{
    /// Creation timestamp of the message.
    time: Instant,
    /// Payload supplied by the worker.
    msg: Up,
}
38
39/// A wrapper around `Sender<UpMsg<Up>>`. This type implements `!Send`,
40/// as [`RecvAllIterator`] depends on this type being dropped whenever the thread holding it stops or panics.
41#[derive(Clone, Debug)]
42pub struct WorkerSender<Up: Send> {
43 sender: SyncSender<UpMsg<Up>>,
44}
45
46impl<Up: Send> UpMsg<Up> {
47 pub fn new(msg: Up) -> Self {
48 Self {
49 time: Instant::now(),
50 msg
51 }
52 }
53
54 pub fn time(&self) -> Instant {
55 self.time
56 }
57
58 pub fn get(self) -> Up {
59 self.msg
60 }
61}
62
63impl<Up: Send> WorkerSender<Up> {
64 pub(crate) fn new(sender: SyncSender<UpMsg<Up>>) -> Self {
65 Self {
66 sender
67 }
68 }
69
70 /// Sends a value on the channel, blocking if the channel is full.
71 /// The value is wrapped in an [`UpMsg`]
72 pub fn send(&self, msg: Up) -> Result<(), SendError<Up>> {
73 self.sender.send(UpMsg::new(msg)).map_err(|e| std::sync::mpsc::SendError(e.0.get()))
74 }
75
76 /// Tries to send a value on the channel. If the channel is full, then [`TrySendError::Full`] is returned instead.
77 pub fn try_send(&self, msg: Up) -> Result<(), TrySendError<Up>> {
78 use std::sync::mpsc::TrySendError;
79
80 self.sender.try_send(UpMsg::new(msg)).map_err(|e| {
81 match e {
82 TrySendError::Full(x) => TrySendError::Full(x.get()),
83 TrySendError::Disconnected(x) => TrySendError::Disconnected(x.get())
84 }
85 })
86 }
87}
88
89impl<Up: Send> !Send for WorkerSender<Up> {}
90
91/// A wrapper around Receiver<DownMsg<Down>>
92pub type WorkerReceiver<Down> = Receiver<DownMsg<Down>>;