beekeeper/hive/inner/queue/mod.rs
1mod channel;
2#[cfg(feature = "retry")]
3mod retry;
4mod status;
5mod workstealing;
6
7pub use self::channel::ChannelTaskQueues;
8pub use self::workstealing::WorkstealingTaskQueues;
9
10#[cfg(feature = "retry")]
11use self::retry::RetryQueue;
12use self::status::Status;
13use super::{Config, Task, Token};
14use crate::bee::Worker;
15
16/// Errors that may occur when trying to pop tasks from the global queue.
17#[derive(thiserror::Error, Debug)]
18pub enum PopTaskError {
19 #[error("Global task queue is empty")]
20 Empty,
21 #[error("Global task queue is closed")]
22 Closed,
23}
24
25/// Trait that encapsulates the global and local task queues used by a `Hive` for managing tasks
26/// within and between worker threads.
27///
28/// This trait is sealed - it cannot be implemented outside of this crate.
29pub trait TaskQueues<W: Worker>: Sized + Send + Sync + 'static {
30 type WorkerQueues: WorkerQueues<W>;
31
32 /// Returns a new instance.
33 ///
34 /// The private `Token` is used to prevent this method from being called externally.
35 fn new(token: Token) -> Self;
36
37 /// Initializes the local queues for the given range of worker thread indices.
38 fn init_for_threads(&self, start_index: usize, end_index: usize, config: &Config);
39
40 /// Updates the queue settings from `config` for the given range of worker threads.
41 fn update_for_threads(&self, start_index: usize, end_index: usize, config: &Config);
42
43 /// Tries to add a task to the global queue.
44 ///
45 /// Returns an error with the task if the queue is disconnected.
46 fn try_push_global(&self, task: Task<W>) -> Result<(), Task<W>>;
47
48 /// Returns a `WorkerQueues` instance for the worker thread with the given `index`.
49 fn worker_queues(&self, thread_index: usize) -> Self::WorkerQueues;
50
51 /// Closes this `GlobalQueue` so no more tasks may be pushed.
52 ///
53 /// If `urgent` is `true`, this also prevents queued tasks from being popped.
54 ///
55 /// The private `Token` is used to prevent this method from being called externally.
56 fn close(&self, urgent: bool, token: Token);
57
58 /// Consumes this `TaskQueues` and Drains all tasks from all global and local queues and
59 /// returns them as a `Vec`.
60 ///
61 /// This method panics if `close` has not been called.
62 fn drain(self) -> Vec<Task<W>>;
63}
64
65/// Trait that provides access to the task queues to each worker thread. Implementations of this
66/// trait can hold thread-local types that are not Send/Sync.
67pub trait WorkerQueues<W: Worker> {
68 /// Attempts to add a task to the local queue if space is available, otherwise adds it to the
69 /// global queue. If adding to the global queue fails, the task is added to a local "abandoned"
70 /// queue from which it may be popped or will otherwise be converted to an `Unprocessed`
71 /// outcome.
72 fn push(&self, task: Task<W>);
73
74 /// Attempts to remove a task from the local queue for the given worker thread index. If there
75 /// are no local queues, or if the local queues are empty, falls back to taking a task from the
76 /// global queue.
77 ///
78 /// Returns an error if a task is not available, where each implementation may have a different
79 /// definition of "available".
80 ///
81 /// Also returns an error if the queues are closed.
82 fn try_pop(&self) -> Result<Task<W>, PopTaskError>;
83
84 /// Attempts to add `task` to the local retry queue.
85 ///
86 /// Returns the earliest `Instant` at which it might be retried. If the task could not be added
87 /// to the retry queue (e.g., if the queue is full), the task returned as an error.
88 #[cfg(feature = "retry")]
89 fn try_push_retry(&self, task: Task<W>) -> Result<std::time::Instant, Task<W>>;
90
91 /// Returns the unique index of the thread that owns this `WorkerQueues` instance.
92 #[cfg(test)]
93 fn thread_index(&self) -> usize;
94}