worker_pool/
msg.rs

1use super::*;
2
3// Here for future-proofing, in case we want to switch over to another mpsc implementation
4pub type Sender<T> = std::sync::mpsc::Sender<T>;
5pub type SyncSender<T> = std::sync::mpsc::SyncSender<T>;
6pub type Receiver<T> = std::sync::mpsc::Receiver<T>;
7pub type TryRecvError = std::sync::mpsc::TryRecvError;
8pub type RecvError = std::sync::mpsc::RecvError;
9pub type SendError<T> = std::sync::mpsc::SendError<T>;
10pub type TrySendError<T> = std::sync::mpsc::TrySendError<T>;
11
12// TODO: add unsafe ways to manually send messages
13
14/// A message sent from the manager to the workers
15#[derive(Clone, Debug, PartialEq)]
16pub enum DownMsg<Down: Send> {
17    /// Instructs the workers to stop execution and return as soon as possible.
18    /// See [`WorkerPool::execute`] on more information as to how this value should be handled.
19    Stop,
20    /// Instructs a worker to pause execution until the `Continue` message is received.
21    /// This value can safely be ignored, and what to do when another message is received while in the paused state is up to the worker.
22    Pause,
23    /// Instructs a worker to resume execution.
24    /// This value can safely be ignored.
25    Continue,
26    /// A customized message sent from the manager to the workers.
27    /// The `Down` type must implement [`Send`] and optionally [`Clone`] so that the messages can be sent.
28    Other(Down)
29}
30
31/// A message sent from a worker to the manager; contains the timestamp of its creation to
32/// allow [`RecvBurstIterator`] to stop early.
33#[derive(Clone, Debug)]
34pub struct UpMsg<Up: Send> {
35    time: Instant,
36    msg: Up
37}
38
39/// A wrapper around `Sender<UpMsg<Up>>`. This type implements `!Send`,
40/// as [`RecvAllIterator`] depends on this type being dropped whenever the thread holding it stops or panics.
41#[derive(Clone, Debug)]
42pub struct WorkerSender<Up: Send> {
43    sender: SyncSender<UpMsg<Up>>,
44}
45
46impl<Up: Send> UpMsg<Up> {
47    pub fn new(msg: Up) -> Self {
48        Self {
49            time: Instant::now(),
50            msg
51        }
52    }
53
54    pub fn time(&self) -> Instant {
55        self.time
56    }
57
58    pub fn get(self) -> Up {
59        self.msg
60    }
61}
62
63impl<Up: Send> WorkerSender<Up> {
64    pub(crate) fn new(sender: SyncSender<UpMsg<Up>>) -> Self {
65        Self {
66            sender
67        }
68    }
69
70    /// Sends a value on the channel, blocking if the channel is full.
71    /// The value is wrapped in an [`UpMsg`]
72    pub fn send(&self, msg: Up) -> Result<(), SendError<Up>> {
73        self.sender.send(UpMsg::new(msg)).map_err(|e| std::sync::mpsc::SendError(e.0.get()))
74    }
75
76    /// Tries to send a value on the channel. If the channel is full, then [`TrySendError::Full`] is returned instead.
77    pub fn try_send(&self, msg: Up) -> Result<(), TrySendError<Up>> {
78        use std::sync::mpsc::TrySendError;
79
80        self.sender.try_send(UpMsg::new(msg)).map_err(|e| {
81            match e {
82                TrySendError::Full(x) => TrySendError::Full(x.get()),
83                TrySendError::Disconnected(x) => TrySendError::Disconnected(x.get())
84            }
85        })
86    }
87}
88
89impl<Up: Send> !Send for WorkerSender<Up> {}
90
91/// A wrapper around Receiver<DownMsg<Down>>
92pub type WorkerReceiver<Down> = Receiver<DownMsg<Down>>;