tokio/runtime/task/state.rs
1use crate::loom::sync::atomic::AtomicUsize;
2
3use std::fmt;
4use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5
6pub(super) struct State {
7 val: AtomicUsize,
8}
9
10/// Current state value.
11#[derive(Copy, Clone)]
12pub(super) struct Snapshot(usize);
13
14type UpdateResult = Result<Snapshot, Snapshot>;
15
16/// The task is currently being run.
17const RUNNING: usize = 0b0001;
18
19/// The task is complete.
20///
21/// Once this bit is set, it is never unset.
22const COMPLETE: usize = 0b0010;
23
24/// Extracts the task's lifecycle value from the state.
25const LIFECYCLE_MASK: usize = 0b11;
26
27/// Flag tracking if the task has been pushed into a run queue.
28const NOTIFIED: usize = 0b100;
29
30/// The join handle is still around.
31const JOIN_INTEREST: usize = 0b1_000;
32
33/// A join handle waker has been set.
34const JOIN_WAKER: usize = 0b10_000;
35
36/// The task has been forcibly cancelled.
37const CANCELLED: usize = 0b100_000;
38
39/// All bits.
40const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
41
42/// Bits used by the ref count portion of the state.
43const REF_COUNT_MASK: usize = !STATE_MASK;
44
45/// Number of positions to shift the ref count.
46const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
47
48/// One ref count.
49const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
50
51/// State a task is initialized with.
52///
53/// A task is initialized with three references:
54///
55/// * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
56/// * A reference that will be sent to the scheduler as an ordinary notification.
57/// * A reference for the `JoinHandle`.
58///
59/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
60/// As the task starts with a `Notified`, `NOTIFIED` is set.
61const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
62
63#[must_use]
64pub(super) enum TransitionToRunning {
65 Success,
66 Cancelled,
67 Failed,
68 Dealloc,
69}
70
71#[must_use]
72pub(super) enum TransitionToIdle {
73 Ok,
74 OkNotified,
75 OkDealloc,
76 Cancelled,
77}
78
79#[must_use]
80pub(super) enum TransitionToNotifiedByVal {
81 DoNothing,
82 Submit,
83 Dealloc,
84}
85
86#[must_use]
87pub(crate) enum TransitionToNotifiedByRef {
88 DoNothing,
89 Submit,
90}
91
92#[must_use]
93pub(super) struct TransitionToJoinHandleDrop {
94 pub(super) drop_waker: bool,
95 pub(super) drop_output: bool,
96}
97
98/// All transitions are performed via RMW operations. This establishes an
99/// unambiguous modification order.
100impl State {
101 /// Returns a task's initial state.
102 pub(super) fn new() -> State {
103 // The raw task returned by this method has a ref-count of three. See
104 // the comment on INITIAL_STATE for more.
105 State {
106 val: AtomicUsize::new(INITIAL_STATE),
107 }
108 }
109
110 /// Loads the current state, establishes `Acquire` ordering.
111 pub(super) fn load(&self) -> Snapshot {
112 Snapshot(self.val.load(Acquire))
113 }
114
115 /// Attempts to transition the lifecycle to `Running`. This sets the
116 /// notified bit to false so notifications during the poll can be detected.
117 pub(super) fn transition_to_running(&self) -> TransitionToRunning {
118 self.fetch_update_action(|mut next| {
119 let action;
120 assert!(next.is_notified());
121
122 if !next.is_idle() {
123 // This happens if the task is either currently running or if it
124 // has already completed, e.g. if it was cancelled during
125 // shutdown. Consume the ref-count and return.
126 next.ref_dec();
127 if next.ref_count() == 0 {
128 action = TransitionToRunning::Dealloc;
129 } else {
130 action = TransitionToRunning::Failed;
131 }
132 } else {
133 // We are able to lock the RUNNING bit.
134 next.set_running();
135 next.unset_notified();
136
137 if next.is_cancelled() {
138 action = TransitionToRunning::Cancelled;
139 } else {
140 action = TransitionToRunning::Success;
141 }
142 }
143 (action, Some(next))
144 })
145 }
146
147 /// Transitions the task from `Running` -> `Idle`.
148 ///
149 /// The transition to `Idle` fails if the task has been flagged to be
150 /// cancelled.
151 pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
152 self.fetch_update_action(|curr| {
153 assert!(curr.is_running());
154
155 if curr.is_cancelled() {
156 return (TransitionToIdle::Cancelled, None);
157 }
158
159 let mut next = curr;
160 let action;
161 next.unset_running();
162
163 if !next.is_notified() {
164 // Polling the future consumes the ref-count of the Notified.
165 next.ref_dec();
166 if next.ref_count() == 0 {
167 action = TransitionToIdle::OkDealloc;
168 } else {
169 action = TransitionToIdle::Ok;
170 }
171 } else {
172 // The caller will schedule a new notification, so we create a
173 // new ref-count for the notification. Our own ref-count is kept
174 // for now, and the caller will drop it shortly.
175 next.ref_inc();
176 action = TransitionToIdle::OkNotified;
177 }
178
179 (action, Some(next))
180 })
181 }
182
183 /// Transitions the task from `Running` -> `Complete`.
184 pub(super) fn transition_to_complete(&self) -> Snapshot {
185 const DELTA: usize = RUNNING | COMPLETE;
186
187 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
188 assert!(prev.is_running());
189 assert!(!prev.is_complete());
190
191 Snapshot(prev.0 ^ DELTA)
192 }
193
194 /// Transitions from `Complete` -> `Terminal`, decrementing the reference
195 /// count the specified number of times.
196 ///
197 /// Returns true if the task should be deallocated.
198 pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
199 let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
200 assert!(
201 prev.ref_count() >= count,
202 "current: {}, sub: {}",
203 prev.ref_count(),
204 count
205 );
206 prev.ref_count() == count
207 }
208
209 /// Transitions the state to `NOTIFIED`.
210 ///
211 /// If no task needs to be submitted, a ref-count is consumed.
212 ///
213 /// If a task needs to be submitted, the ref-count is incremented for the
214 /// new Notified.
215 pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
216 self.fetch_update_action(|mut snapshot| {
217 let action;
218
219 if snapshot.is_running() {
220 // If the task is running, we mark it as notified, but we should
221 // not submit anything as the thread currently running the
222 // future is responsible for that.
223 snapshot.set_notified();
224 snapshot.ref_dec();
225
226 // The thread that set the running bit also holds a ref-count.
227 assert!(snapshot.ref_count() > 0);
228
229 action = TransitionToNotifiedByVal::DoNothing;
230 } else if snapshot.is_complete() || snapshot.is_notified() {
231 // We do not need to submit any notifications, but we have to
232 // decrement the ref-count.
233 snapshot.ref_dec();
234
235 if snapshot.ref_count() == 0 {
236 action = TransitionToNotifiedByVal::Dealloc;
237 } else {
238 action = TransitionToNotifiedByVal::DoNothing;
239 }
240 } else {
241 // We create a new notified that we can submit. The caller
242 // retains ownership of the ref-count they passed in.
243 snapshot.set_notified();
244 snapshot.ref_inc();
245 action = TransitionToNotifiedByVal::Submit;
246 }
247
248 (action, Some(snapshot))
249 })
250 }
251
252 /// Transitions the state to `NOTIFIED`.
253 pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
254 self.fetch_update_action(|mut snapshot| {
255 if snapshot.is_complete() {
256 // The complete state is final
257 (TransitionToNotifiedByRef::DoNothing, None)
258 } else if snapshot.is_notified() {
259 // Even hough we have nothing to do in this branch,
260 // wake_by_ref() should synchronize-with the task starting execution,
261 // therefore we must use an Release store (with the same value),
262 // to pair with the Acquire in transition_to_running.
263 (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
264 } else if snapshot.is_running() {
265 // If the task is running, we mark it as notified, but we should
266 // not submit as the thread currently running the future is
267 // responsible for that.
268 snapshot.set_notified();
269 (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
270 } else {
271 // The task is idle and not notified. We should submit a
272 // notification.
273 snapshot.set_notified();
274 snapshot.ref_inc();
275 (TransitionToNotifiedByRef::Submit, Some(snapshot))
276 }
277 })
278 }
279
280 /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
281 /// count.
282 ///
283 /// Returns `true` if the notified bit was transitioned from `0` to `1`;
284 /// otherwise `false.`
285 #[cfg(all(
286 tokio_unstable,
287 feature = "taskdump",
288 feature = "rt",
289 target_os = "linux",
290 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
291 ))]
292 pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
293 self.fetch_update_action(|mut snapshot| {
294 if snapshot.is_notified() {
295 (false, None)
296 } else {
297 snapshot.set_notified();
298 snapshot.ref_inc();
299 (true, Some(snapshot))
300 }
301 })
302 }
303
304 /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
305 ///
306 /// Returns `true` if the task needs to be submitted to the pool for
307 /// execution.
308 pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
309 self.fetch_update_action(|mut snapshot| {
310 if snapshot.is_cancelled() || snapshot.is_complete() {
311 // Aborts to completed or cancelled tasks are no-ops.
312 (false, None)
313 } else if snapshot.is_running() {
314 // If the task is running, we mark it as cancelled. The thread
315 // running the task will notice the cancelled bit when it
316 // stops polling and it will kill the task.
317 //
318 // The set_notified() call is not strictly necessary but it will
319 // in some cases let a wake_by_ref call return without having
320 // to perform a compare_exchange.
321 snapshot.set_notified();
322 snapshot.set_cancelled();
323 (false, Some(snapshot))
324 } else {
325 // The task is idle. We set the cancelled and notified bits and
326 // submit a notification if the notified bit was not already
327 // set.
328 snapshot.set_cancelled();
329 if !snapshot.is_notified() {
330 snapshot.set_notified();
331 snapshot.ref_inc();
332 (true, Some(snapshot))
333 } else {
334 (false, Some(snapshot))
335 }
336 }
337 })
338 }
339
340 /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
341 ///
342 /// Returns `true` if the transition to `Running` succeeded.
343 pub(super) fn transition_to_shutdown(&self) -> bool {
344 let mut prev = Snapshot(0);
345
346 let _ = self.fetch_update(|mut snapshot| {
347 prev = snapshot;
348
349 if snapshot.is_idle() {
350 snapshot.set_running();
351 }
352
353 // If the task was not idle, the thread currently running the task
354 // will notice the cancelled bit and cancel it once the poll
355 // completes.
356 snapshot.set_cancelled();
357 Some(snapshot)
358 });
359
360 prev.is_idle()
361 }
362
363 /// Optimistically tries to swap the state assuming the join handle is
364 /// __immediately__ dropped on spawn.
365 pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
366 use std::sync::atomic::Ordering::Relaxed;
367
368 // Relaxed is acceptable as if this function is called and succeeds,
369 // then nothing has been done w/ the join handle.
370 //
371 // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
372 // set, at which point the CAS will fail.
373 //
374 // Given this, there is no risk if this operation is reordered.
375 self.val
376 .compare_exchange_weak(
377 INITIAL_STATE,
378 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
379 Release,
380 Relaxed,
381 )
382 .map(|_| ())
383 .map_err(|_| ())
384 }
385
386 /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER`
387 /// flag is also unset.
388 /// The returned `TransitionToJoinHandleDrop` indicates whether the `JoinHandle` should drop
389 /// the output of the future or the join waker after the transition.
390 pub(super) fn transition_to_join_handle_dropped(&self) -> TransitionToJoinHandleDrop {
391 self.fetch_update_action(|mut snapshot| {
392 assert!(snapshot.is_join_interested());
393
394 let mut transition = TransitionToJoinHandleDrop {
395 drop_waker: false,
396 drop_output: false,
397 };
398
399 snapshot.unset_join_interested();
400
401 if !snapshot.is_complete() {
402 // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the
403 // `JoinHandle` exclusive access to the waker following rule 6 in task/mod.rs.
404 // The `JoinHandle` will drop the waker if it has exclusive access
405 // to drop it.
406 snapshot.unset_join_waker();
407 } else {
408 // If `COMPLETE` is set the task is completed so the `JoinHandle` is responsible
409 // for dropping the output.
410 transition.drop_output = true;
411 }
412
413 if !snapshot.is_join_waker_set() {
414 // If the `JOIN_WAKER` bit is unset and the `JOIN_HANDLE` has exclusive access to
415 // the join waker and should drop it following this transition.
416 // This might happen in two situations:
417 // 1. The task is not completed and we just unset the `JOIN_WAKer` above in this
418 // function.
419 // 2. The task is completed. In that case the `JOIN_WAKER` bit was already unset
420 // by the runtime during completion.
421 transition.drop_waker = true;
422 }
423
424 (transition, Some(snapshot))
425 })
426 }
427
428 /// Sets the `JOIN_WAKER` bit.
429 ///
430 /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
431 /// the task has completed.
432 pub(super) fn set_join_waker(&self) -> UpdateResult {
433 self.fetch_update(|curr| {
434 assert!(curr.is_join_interested());
435 assert!(!curr.is_join_waker_set());
436
437 if curr.is_complete() {
438 return None;
439 }
440
441 let mut next = curr;
442 next.set_join_waker();
443
444 Some(next)
445 })
446 }
447
448 /// Unsets the `JOIN_WAKER` bit.
449 ///
450 /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if
451 /// the task has completed.
452 pub(super) fn unset_waker(&self) -> UpdateResult {
453 self.fetch_update(|curr| {
454 assert!(curr.is_join_interested());
455
456 if curr.is_complete() {
457 return None;
458 }
459
460 // If the task is completed, this bit may have been unset by
461 // `unset_waker_after_complete`.
462 assert!(curr.is_join_waker_set());
463
464 let mut next = curr;
465 next.unset_join_waker();
466
467 Some(next)
468 })
469 }
470
471 /// Unsets the `JOIN_WAKER` bit unconditionally after task completion.
472 ///
473 /// This operation requires the task to be completed.
474 pub(super) fn unset_waker_after_complete(&self) -> Snapshot {
475 let prev = Snapshot(self.val.fetch_and(!JOIN_WAKER, AcqRel));
476 assert!(prev.is_complete());
477 assert!(prev.is_join_waker_set());
478 Snapshot(prev.0 & !JOIN_WAKER)
479 }
480
481 pub(super) fn ref_inc(&self) {
482 use std::process;
483 use std::sync::atomic::Ordering::Relaxed;
484
485 // Using a relaxed ordering is alright here, as knowledge of the
486 // original reference prevents other threads from erroneously deleting
487 // the object.
488 //
489 // As explained in the [Boost documentation][1], Increasing the
490 // reference counter can always be done with memory_order_relaxed: New
491 // references to an object can only be formed from an existing
492 // reference, and passing an existing reference from one thread to
493 // another must already provide any required synchronization.
494 //
495 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
496 let prev = self.val.fetch_add(REF_ONE, Relaxed);
497
498 // If the reference count overflowed, abort.
499 if prev > isize::MAX as usize {
500 process::abort();
501 }
502 }
503
504 /// Returns `true` if the task should be released.
505 pub(super) fn ref_dec(&self) -> bool {
506 let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
507 assert!(prev.ref_count() >= 1);
508 prev.ref_count() == 1
509 }
510
511 /// Returns `true` if the task should be released.
512 pub(super) fn ref_dec_twice(&self) -> bool {
513 let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
514 assert!(prev.ref_count() >= 2);
515 prev.ref_count() == 2
516 }
517
518 fn fetch_update_action<F, T>(&self, mut f: F) -> T
519 where
520 F: FnMut(Snapshot) -> (T, Option<Snapshot>),
521 {
522 let mut curr = self.load();
523
524 loop {
525 let (output, next) = f(curr);
526 let next = match next {
527 Some(next) => next,
528 None => return output,
529 };
530
531 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
532
533 match res {
534 Ok(_) => return output,
535 Err(actual) => curr = Snapshot(actual),
536 }
537 }
538 }
539
540 fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
541 where
542 F: FnMut(Snapshot) -> Option<Snapshot>,
543 {
544 let mut curr = self.load();
545
546 loop {
547 let next = match f(curr) {
548 Some(next) => next,
549 None => return Err(curr),
550 };
551
552 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
553
554 match res {
555 Ok(_) => return Ok(next),
556 Err(actual) => curr = Snapshot(actual),
557 }
558 }
559 }
560}
561
562// ===== impl Snapshot =====
563
564impl Snapshot {
565 /// Returns `true` if the task is in an idle state.
566 pub(super) fn is_idle(self) -> bool {
567 self.0 & (RUNNING | COMPLETE) == 0
568 }
569
570 /// Returns `true` if the task has been flagged as notified.
571 pub(super) fn is_notified(self) -> bool {
572 self.0 & NOTIFIED == NOTIFIED
573 }
574
575 fn unset_notified(&mut self) {
576 self.0 &= !NOTIFIED;
577 }
578
579 fn set_notified(&mut self) {
580 self.0 |= NOTIFIED;
581 }
582
583 pub(super) fn is_running(self) -> bool {
584 self.0 & RUNNING == RUNNING
585 }
586
587 fn set_running(&mut self) {
588 self.0 |= RUNNING;
589 }
590
591 fn unset_running(&mut self) {
592 self.0 &= !RUNNING;
593 }
594
595 pub(super) fn is_cancelled(self) -> bool {
596 self.0 & CANCELLED == CANCELLED
597 }
598
599 fn set_cancelled(&mut self) {
600 self.0 |= CANCELLED;
601 }
602
603 /// Returns `true` if the task's future has completed execution.
604 pub(super) fn is_complete(self) -> bool {
605 self.0 & COMPLETE == COMPLETE
606 }
607
608 pub(super) fn is_join_interested(self) -> bool {
609 self.0 & JOIN_INTEREST == JOIN_INTEREST
610 }
611
612 fn unset_join_interested(&mut self) {
613 self.0 &= !JOIN_INTEREST;
614 }
615
616 pub(super) fn is_join_waker_set(self) -> bool {
617 self.0 & JOIN_WAKER == JOIN_WAKER
618 }
619
620 fn set_join_waker(&mut self) {
621 self.0 |= JOIN_WAKER;
622 }
623
624 fn unset_join_waker(&mut self) {
625 self.0 &= !JOIN_WAKER;
626 }
627
628 pub(super) fn ref_count(self) -> usize {
629 (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
630 }
631
632 fn ref_inc(&mut self) {
633 assert!(self.0 <= isize::MAX as usize);
634 self.0 += REF_ONE;
635 }
636
637 pub(super) fn ref_dec(&mut self) {
638 assert!(self.ref_count() > 0);
639 self.0 -= REF_ONE;
640 }
641}
642
643impl fmt::Debug for State {
644 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
645 let snapshot = self.load();
646 snapshot.fmt(fmt)
647 }
648}
649
650impl fmt::Debug for Snapshot {
651 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
652 fmt.debug_struct("Snapshot")
653 .field("is_running", &self.is_running())
654 .field("is_complete", &self.is_complete())
655 .field("is_notified", &self.is_notified())
656 .field("is_cancelled", &self.is_cancelled())
657 .field("is_join_interested", &self.is_join_interested())
658 .field("is_join_waker_set", &self.is_join_waker_set())
659 .field("ref_count", &self.ref_count())
660 .finish()
661 }
662}