// beekeeper/hive/inner/queue/mod.rs

1mod channel;
2#[cfg(feature = "retry")]
3mod retry;
4mod status;
5mod workstealing;
6
7pub use self::channel::ChannelTaskQueues;
8pub use self::workstealing::WorkstealingTaskQueues;
9
10#[cfg(feature = "retry")]
11use self::retry::RetryQueue;
12use self::status::Status;
13use super::{Config, Task, Token};
14use crate::bee::Worker;
15
16/// Errors that may occur when trying to pop tasks from the global queue.
17#[derive(thiserror::Error, Debug)]
18pub enum PopTaskError {
19    #[error("Global task queue is empty")]
20    Empty,
21    #[error("Global task queue is closed")]
22    Closed,
23}
24
25/// Trait that encapsulates the global and local task queues used by a `Hive` for managing tasks
26/// within and between worker threads.
27///
28/// This trait is sealed - it cannot be implemented outside of this crate.
29pub trait TaskQueues<W: Worker>: Sized + Send + Sync + 'static {
30    type WorkerQueues: WorkerQueues<W>;
31
32    /// Returns a new instance.
33    ///
34    /// The private `Token` is used to prevent this method from being called externally.
35    fn new(token: Token) -> Self;
36
37    /// Initializes the local queues for the given range of worker thread indices.
38    fn init_for_threads(&self, start_index: usize, end_index: usize, config: &Config);
39
40    /// Updates the queue settings from `config` for the given range of worker threads.
41    fn update_for_threads(&self, start_index: usize, end_index: usize, config: &Config);
42
43    /// Tries to add a task to the global queue.
44    ///
45    /// Returns an error with the task if the queue is disconnected.
46    fn try_push_global(&self, task: Task<W>) -> Result<(), Task<W>>;
47
48    /// Returns a `WorkerQueues` instance for the worker thread with the given `index`.
49    fn worker_queues(&self, thread_index: usize) -> Self::WorkerQueues;
50
51    /// Closes this `GlobalQueue` so no more tasks may be pushed.
52    ///
53    /// If `urgent` is `true`, this also prevents queued tasks from being popped.
54    ///
55    /// The private `Token` is used to prevent this method from being called externally.
56    fn close(&self, urgent: bool, token: Token);
57
58    /// Consumes this `TaskQueues` and Drains all tasks from all global and local queues and
59    /// returns them as a `Vec`.
60    ///
61    /// This method panics if `close` has not been called.
62    fn drain(self) -> Vec<Task<W>>;
63}
64
65/// Trait that provides access to the task queues to each worker thread. Implementations of this
66/// trait can hold thread-local types that are not Send/Sync.
67pub trait WorkerQueues<W: Worker> {
68    /// Attempts to add a task to the local queue if space is available, otherwise adds it to the
69    /// global queue. If adding to the global queue fails, the task is added to a local "abandoned"
70    /// queue from which it may be popped or will otherwise be converted to an `Unprocessed`
71    /// outcome.
72    fn push(&self, task: Task<W>);
73
74    /// Attempts to remove a task from the local queue for the given worker thread index. If there
75    /// are no local queues, or if the local queues are empty, falls back to taking a task from the
76    /// global queue.
77    ///
78    /// Returns an error if a task is not available, where each implementation may have a different
79    /// definition of "available".
80    ///
81    /// Also returns an error if the queues are closed.
82    fn try_pop(&self) -> Result<Task<W>, PopTaskError>;
83
84    /// Attempts to add `task` to the local retry queue.
85    ///
86    /// Returns the earliest `Instant` at which it might be retried. If the task could not be added
87    /// to the retry queue (e.g., if the queue is full), the task returned as an error.
88    #[cfg(feature = "retry")]
89    fn try_push_retry(&self, task: Task<W>) -> Result<std::time::Instant, Task<W>>;
90
91    /// Returns the unique index of the thread that owns this `WorkerQueues` instance.
92    #[cfg(test)]
93    fn thread_index(&self) -> usize;
94}