1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
use super::job::*;
use super::active_queue::*;
use super::queue_state::*;
use super::wake_thread::*;
use std::fmt;
use std::sync::*;
use std::thread;
use std::collections::vec_deque::*;
use futures::task;
use futures::task::{Context, Poll};
///
/// A job queue provides a list of jobs to perform in order
///
pub struct JobQueue {
/// The shared data for this queue is stored within a mutex
pub (super) core: Mutex<JobQueueCore>
}
///
/// The result of running a job
///
pub (super) enum JobStatus {
/// No jobs were waiting on the queue
NoJobsWaiting,
/// Job was run successfully
Finished,
}
///
/// Structure protected by the jobqueue matrix
///
pub (super) struct JobQueueCore {
/// The jobs that are scheduled on this queue
pub (super) queue: VecDeque<Box<dyn ScheduledJob>>,
/// The current state of this queue
pub (super) state: QueueState,
/// If something is blocked on this queue, a condition variable to wake it up
pub (super) wake_blocked: Vec<Weak<Condvar>>,
}
impl fmt::Debug for JobQueue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let core = self.core.lock().expect("JobQueue core lock");
fmt.write_str(&format!("JobQueue: State: {:?}, Pending: {}", core.state, core.queue.len()))
}
}
impl JobQueue {
///
/// Creates a new job queue
///
pub (super) fn new() -> JobQueue {
JobQueue {
core: Mutex::new(JobQueueCore {
queue: VecDeque::new(),
state: QueueState::Idle,
wake_blocked: vec![],
})
}
}
///
/// If there are any jobs waiting, dequeues the next one
///
pub (super) fn dequeue(&self) -> Option<Box<dyn ScheduledJob>> {
let mut core = self.core.lock().expect("JobQueue core lock");
match core.state {
QueueState::WaitingForWake => None,
QueueState::WaitingForPoll(_) => None,
QueueState::WaitingForUnpark => None,
other => {
debug_assert!(other.is_running(), "State is {:?}", core.state);
core.queue.pop_front()
}
}
}
///
/// Adds a job to the front of the queue (so it's the next one to run)
///
pub (super) fn requeue(&self, job: Box<dyn ScheduledJob>) {
let mut core = self.core.lock().expect("JobQueue core lock");
core.queue.push_front(job);
}
///
/// Runs jobs on this queue until there are none left, marking the job as inactive when done
///
pub (super) fn drain(&self, context: &mut Context) {
let _active = ActiveQueue { queue: self };
debug_assert!(self.core.lock().unwrap().state.is_running());
let mut done = false;
while !done {
// Run jobs until the queue is drained or blocks
while let Some(mut job) = self.dequeue() {
debug_assert!(self.core.lock().unwrap().state.is_running());
let poll_result = job.run(context);
match poll_result {
Poll::Ready(()) => { },
Poll::Pending => {
// Job needs requeuing
self.requeue(job);
// Queue should move from the 'running' state to the 'waiting for wake' state
let mut core = self.core.lock().expect("JobQueue core lock");
core.state = match core.state {
QueueState::Running => QueueState::WaitingForWake,
QueueState::AwokenWhileRunning => QueueState::Running,
other => other
};
if core.state == QueueState::WaitingForWake {
return;
}
}
}
}
// Try to move back to the 'not running' state
{
let mut core = self.core.lock().expect("JobQueue core lock");
debug_assert!(core.state.is_running());
// If the queue is empty at the point where we obtain the lock, we can deactivate ourselves
if core.queue.len() == 0 {
if core.state.is_running() {
core.state = QueueState::Idle;
}
done = true;
} else if core.state == QueueState::Pending {
// Will restart when we get re-scheduled
done = true;
}
}
}
}
///
/// With the queue already in the running state, dequeues a single job and runs it synchronously on the current thread
///
pub (super) fn run_one_job_now(queue: &Arc<JobQueue>) -> JobStatus {
if let Some(mut job) = queue.dequeue() {
// Queue is running
debug_assert!(queue.core.lock().unwrap().state.is_running());
let waker = Arc::new(WakeThread(Arc::clone(queue), thread::current()));
let waker = task::waker_ref(&waker);
let mut context = Context::from_waker(&waker);
loop {
let poll_result = job.run(&mut context);
match poll_result {
// A ready result ends the loop
Poll::Ready(()) => break,
Poll::Pending => {
// Try to move to the parking state
let should_park = {
let mut core = queue.core.lock().unwrap();
core.state = match core.state {
QueueState::AwokenWhileRunning => QueueState::Running,
QueueState::Running => QueueState::WaitingForUnpark,
other => panic!("Queue was in unexpected state {:?}", other)
};
core.state == QueueState::WaitingForUnpark
};
// Park until the queue state returns changes
if should_park {
// If should_park is set to false, the queue was awoken very quickly
loop {
let current_state = { queue.core.lock().unwrap().state };
match current_state {
QueueState::Running => break,
QueueState::AwokenWhileRunning => break,
QueueState::WaitingForUnpark => (),
other => panic!("Queue was in unexpected state {:?}", other)
}
// Park until we're awoken from the other thread (once awoken, we re-check the state)
thread::park();
}
}
}
}
}
// Queue should still be running once we resume
debug_assert!(queue.core.lock().unwrap().state.is_running());
JobStatus::Finished
} else {
JobStatus::NoJobsWaiting
}
}
}