spawns_core/
task.rs

1use crate::spawn_with_name;
2use std::any::Any;
3use std::borrow::Cow;
4use std::cell::Cell;
5use std::fmt::{self, Debug, Display, Formatter};
6use std::future::Future;
7use std::mem;
8use std::mem::ManuallyDrop;
9use std::panic::{self, AssertUnwindSafe};
10use std::pin::Pin;
11use std::ptr;
12use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll, Waker};
15
16thread_local! {
17    static RUNNING_ID: Cell<RawId> = const { Cell::new(RawId(0)) };
18}
19
20#[repr(usize)]
21#[derive(Clone, Copy, PartialEq)]
22enum Stage {
23    Running = 0,
24    Finished = 1,
25    Detached = 2,
26}
27
28impl From<usize> for Stage {
29    fn from(v: usize) -> Self {
30        match v {
31            0 => Stage::Running,
32            1 => Stage::Finished,
33            2 => Stage::Detached,
34            _ => unreachable!("{v} is not valid for Stage"),
35        }
36    }
37}
38
39enum JoinResult<T> {
40    Empty,
41    Joining(Waker),
42    Joined,
43    Finished(Result<T, InnerJoinError>),
44    Detached,
45}
46
47#[derive(Clone, Copy)]
48#[cfg_attr(test, derive(Debug, PartialEq))]
49struct RawId(u64);
50
51impl From<RawId> for Id {
52    fn from(id: RawId) -> Id {
53        Id(id.0)
54    }
55}
56
57impl RawId {
58    fn run(self) -> RawId {
59        RUNNING_ID.with(|id| {
60            let previous = id.get();
61            id.set(self);
62            previous
63        })
64    }
65
66    pub(crate) fn enter(&self) -> IdScope {
67        let id = RawId(self.0);
68        let previous = id.run();
69        IdScope { previous }
70    }
71
72    fn next() -> RawId {
73        // We could reclaim id with help from `Drop` of `JoinHandle` and `Task`.
74        static ID: AtomicU64 = AtomicU64::new(1);
75        Self(ID.fetch_add(1, Ordering::Relaxed))
76    }
77}
78
79/// Unique among running tasks.
80///
81/// There is no guarantee about its uniqueness among all spawned tasks. One could be reclaimed
82/// after task finished and joined.
83///
84/// # Cautions
85/// * Runtime should not rely on its uniqueness after task dropped.
86/// * Client should not rely on tis uniqueness after task joined.
87///
88/// It is intentional to not `Copy` but `Clone` to make it verbose to create a new one to avoid abusing.
89#[derive(Clone, Hash, PartialEq, Eq)]
90pub struct Id(u64);
91
92struct IdScope {
93    previous: RawId,
94}
95
96impl Drop for IdScope {
97    fn drop(&mut self) {
98        self.previous.run();
99    }
100}
101
102impl Id {
103    fn as_raw(&self) -> RawId {
104        RawId(self.0)
105    }
106}
107
108impl Display for Id {
109    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110        if self.0 <= u32::MAX.into() {
111            write!(f, "{:#010x}", self.0 as u32)
112        } else {
113            write!(f, "{:#018x}", self.0)
114        }
115    }
116}
117
118impl Debug for Id {
119    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
120        write!(f, "TaskId({self})")
121    }
122}
123
124/// Thin wrapper around task to accommodate possible new members.
125#[non_exhaustive]
126pub struct Task {
127    pub id: Id,
128    pub name: Name,
129    pub future: Box<dyn Future<Output = ()> + Send + 'static>,
130}
131
132impl Task {
133    pub(crate) fn new<T: Send, F: Future<Output = T> + Send + 'static>(
134        name: Name,
135        future: F,
136    ) -> (Self, JoinHandle<F::Output>) {
137        let (id, future) = IdFuture::new(future);
138        let future = TaskFuture::new(future);
139        let handle = JoinHandle::new(
140            id.as_raw(),
141            future.joint.clone(),
142            future.cancellation.clone(),
143        );
144        let task = Self {
145            id,
146            name,
147            future: Box::new(future),
148        };
149        (task, handle)
150    }
151
152    pub fn name(&self) -> &str {
153        self.name.as_str()
154    }
155
156    pub fn try_name(&self) -> Option<&str> {
157        self.name.try_as_str()
158    }
159}
160
161/// Task name.
162#[derive(Clone, Default, Hash, PartialEq, Eq, Debug)]
163pub struct Name {
164    name: Option<Cow<'static, str>>,
165}
166
167impl Name {
168    /// Returns task name as str. `unnamed` for unnamed task.
169    pub fn as_str(&self) -> &str {
170        self.try_as_str().unwrap_or("unnamed")
171    }
172
173    /// Returns task name as str. [None] for unnamed task.
174    pub fn try_as_str(&self) -> Option<&str> {
175        self.name.as_ref().map(|n| n.as_ref())
176    }
177
178    /// Decomposes this into [Cow] str.
179    pub fn into(self) -> Option<Cow<'static, str>> {
180        self.name
181    }
182
183    fn name(self, name: impl Into<Cow<'static, str>>) -> Self {
184        Self {
185            name: Some(name.into()),
186        }
187    }
188}
189
190/// Builder to spawn task with more options.
191#[derive(Default)]
192pub struct Builder {
193    name: Name,
194}
195
196impl Builder {
197    /// Constructs a builder to spawn task with more options.
198    pub fn new() -> Self {
199        Self::default()
200    }
201
202    /// Names spawning task.
203    pub fn name(self, name: impl Into<Cow<'static, str>>) -> Self {
204        Self {
205            name: self.name.name(name),
206        }
207    }
208
209    /// Spawns a task with configured options.
210    pub fn spawn<T, F>(self, f: F) -> JoinHandle<T>
211    where
212        F: Future<Output = T> + Send + 'static,
213        T: Send + 'static,
214    {
215        spawn_with_name(self.name, f)
216    }
217}
218
219struct Joint<T> {
220    stage: AtomicUsize,
221    result: Mutex<JoinResult<T>>,
222}
223
224impl<T> Joint<T> {
225    fn new() -> Self {
226        Self {
227            stage: AtomicUsize::new(Stage::Running as usize),
228            result: Mutex::new(JoinResult::Empty),
229        }
230    }
231
232    fn wake(&self, value: Result<T, InnerJoinError>) {
233        let stage = self.stage();
234        if stage != Stage::Running {
235            return;
236        }
237        let mut lock = self.result.lock().unwrap();
238        let result = mem::replace(&mut *lock, JoinResult::Finished(value));
239        drop(lock);
240        // Update stage after lock released as atomic Relaxed carrying no happen-before relationship.
241        self.stage
242            .store(Stage::Finished as usize, Ordering::Relaxed);
243        match result {
244            JoinResult::Empty => {}
245            JoinResult::Joining(waker) => waker.wake(),
246            JoinResult::Finished(_) | JoinResult::Joined => unreachable!("task completed already"),
247            JoinResult::Detached => *self.result.lock().unwrap() = JoinResult::Detached,
248        }
249    }
250
251    fn stage(&self) -> Stage {
252        Stage::from(self.stage.load(Ordering::Relaxed))
253    }
254}
255
256struct IdFuture<F: Future> {
257    id: RawId,
258    future: F,
259}
260
261impl<F: Future> IdFuture<F> {
262    pub fn new(future: F) -> (Id, Self) {
263        let id = RawId::next();
264        let future = Self { id, future };
265        (id.into(), future)
266    }
267}
268
269impl<F: Future> Future for IdFuture<F> {
270    type Output = F::Output;
271
272    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273        let _scope = self.id.enter();
274        let task = unsafe { self.get_unchecked_mut() };
275        let future = unsafe { Pin::new_unchecked(&mut task.future) };
276        future.poll(cx)
277    }
278}
279
280struct TaskFuture<F: Future> {
281    waker: Option<Box<Waker>>,
282    cancellation: Arc<Cancellation>,
283    joint: Arc<Joint<F::Output>>,
284    future: Option<F>,
285}
286
287impl<F: Future> TaskFuture<F> {
288    fn new(future: F) -> Self {
289        Self {
290            waker: None,
291            joint: Arc::new(Joint::new()),
292            cancellation: Arc::new(Default::default()),
293            future: Some(future),
294        }
295    }
296
297    fn finish(&mut self, value: Result<F::Output, InnerJoinError>) -> Poll<()> {
298        self.future = None;
299        self.joint.wake(value);
300        Poll::Ready(())
301    }
302}
303
304impl<F: Future> Drop for TaskFuture<F> {
305    fn drop(&mut self) {
306        if self.future.is_some() {
307            let _ = self.finish(Err(InnerJoinError::Cancelled));
308        }
309    }
310}
311
312impl<F: Future> Future for TaskFuture<F> {
313    type Output = ();
314
315    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
316        let task = unsafe { self.get_unchecked_mut() };
317        if task.future.is_none() {
318            return Poll::Ready(());
319        } else if task.cancellation.is_cancelled() {
320            return task.finish(Err(InnerJoinError::Cancelled));
321        }
322        let future = unsafe { Pin::new_unchecked(task.future.as_mut().unwrap_unchecked()) };
323        match panic::catch_unwind(AssertUnwindSafe(|| future.poll(cx))) {
324            Ok(Poll::Pending) => {
325                let waker = match task.waker.take() {
326                    None => Box::new(cx.waker().clone()),
327                    Some(mut waker) => {
328                        waker.as_mut().clone_from(cx.waker());
329                        waker
330                    }
331                };
332                let Ok(waker) = task.cancellation.update(waker) else {
333                    return task.finish(Err(InnerJoinError::Cancelled));
334                };
335                task.waker = waker;
336                Poll::Pending
337            }
338            Ok(Poll::Ready(value)) => task.finish(Ok(value)),
339            Err(err) => task.finish(Err(InnerJoinError::Panic(err))),
340        }
341    }
342}
343
344#[derive(Default)]
345struct Cancellation {
346    waker: AtomicPtr<Waker>,
347}
348
349impl Cancellation {
350    const CANCELLED: *mut Waker = Self::cancelled();
351
352    const fn cancelled() -> *mut Waker {
353        Cancellation::cancel as *const fn() as *mut Waker
354    }
355
356    pub fn is_cancelled(&self) -> bool {
357        let current = self.waker.load(Ordering::Relaxed);
358        ptr::eq(current, Self::CANCELLED)
359    }
360
361    pub fn cancel(&self) {
362        let mut current = self.waker.load(Ordering::Relaxed);
363        loop {
364            if ptr::eq(current, Self::CANCELLED) {
365                return;
366            }
367            match self.waker.compare_exchange(
368                current,
369                Self::CANCELLED,
370                Ordering::Relaxed,
371                Ordering::Relaxed,
372            ) {
373                Err(newer) => current = newer,
374                _ => {
375                    if !ptr::eq(current, ptr::null()) {
376                        let waker = unsafe { Box::from_raw(current) };
377                        waker.wake();
378                    }
379                    return;
380                }
381            }
382        }
383    }
384
385    pub fn update(&self, mut waker: Box<Waker>) -> Result<Option<Box<Waker>>, ()> {
386        let raw_waker = waker.as_mut() as *mut Waker;
387        let mut current = self.waker.load(Ordering::Relaxed);
388        loop {
389            if ptr::eq(current, Self::CANCELLED) {
390                return Err(());
391            }
392            match self.waker.compare_exchange(
393                current,
394                raw_waker,
395                Ordering::Relaxed,
396                Ordering::Relaxed,
397            ) {
398                Err(newer) => current = newer,
399                _ => {
400                    mem::forget(waker);
401                    if !ptr::eq(current, ptr::null()) {
402                        let waker = unsafe { Box::from_raw(current) };
403                        return Ok(Some(waker));
404                    }
405                    return Ok(None);
406                }
407            }
408        }
409    }
410}
411
412impl Drop for Cancellation {
413    fn drop(&mut self) {
414        let waker = self.waker.load(Ordering::Relaxed);
415        if ptr::eq(waker, ptr::null()) || ptr::eq(waker, Self::CANCELLED) {
416            return;
417        }
418        let _ = unsafe { Box::from_raw(waker) };
419    }
420}
421
422enum InnerJoinError {
423    Cancelled,
424    Panic(Box<dyn Any + Send + 'static>),
425}
426
427/// Error in polling [JoinHandle].
428pub struct JoinError {
429    id: Id,
430    inner: InnerJoinError,
431}
432
433impl JoinError {
434    /// Consumes this into panic if it represents a panic error.
435    pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> {
436        match self.inner {
437            InnerJoinError::Panic(p) => Ok(p),
438            _ => Err(self),
439        }
440    }
441
442    /// Consumes this into panic if it represents a panic error, panic otherwise.
443    pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
444        match self.try_into_panic() {
445            Ok(panic) => panic,
446            Err(err) => panic!("task {} does not panic, but cancelled", err.id),
447        }
448    }
449
450    /// Resumes the catched panic if it represents a panic error, otherwise panic with error
451    /// reason.
452    pub fn resume_panic(self) -> ! {
453        panic::resume_unwind(self.into_panic())
454    }
455
456    /// Returns true if task panic in polling.
457    pub fn is_panic(&self) -> bool {
458        matches!(&self.inner, InnerJoinError::Panic(_))
459    }
460
461    /// Returns true if task is cancelled.
462    pub fn is_cancelled(&self) -> bool {
463        matches!(&self.inner, InnerJoinError::Cancelled)
464    }
465}
466
467impl Debug for JoinError {
468    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
469        match &self.inner {
470            InnerJoinError::Cancelled => write!(fmt, "JoinError::Cancelled({:?})", self.id),
471            InnerJoinError::Panic(_panic) => write!(fmt, "JoinError::Panic({:?}, ...)", self.id),
472        }
473    }
474}
475
476impl Display for JoinError {
477    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
478        match &self.inner {
479            InnerJoinError::Cancelled => write!(fmt, "task {} was cancelled", self.id),
480            InnerJoinError::Panic(_panic) => write!(fmt, "task {} panicked", self.id),
481        }
482    }
483}
484
485impl std::error::Error for JoinError {}
486
487/// Handle to cancel associated task.
488#[derive(Clone)]
489pub struct CancelHandle {
490    cancellation: Arc<Cancellation>,
491}
492
493unsafe impl Send for CancelHandle {}
494unsafe impl Sync for CancelHandle {}
495
496impl CancelHandle {
497    fn new(cancellation: Arc<Cancellation>) -> Self {
498        Self { cancellation }
499    }
500
501    /// Cancels associated task with this handle.
502    ///
503    /// Cancellation is inherently concurrent with task execution. Currently, there is no guarantee
504    /// about promptness, the task could even run to complete normally after cancellation.
505    pub fn cancel(&self) {
506        self.cancellation.cancel()
507    }
508}
509
510/// An owned permission to cancel task on [Drop] besides join and cancel on demand.
511#[must_use = "task will be cancelled on drop, detach it if this is undesired"]
512pub struct TaskHandle<T> {
513    handle: JoinHandle<T>,
514}
515
516impl<T> TaskHandle<T> {
517    /// Gets id of associated task.
518    ///
519    /// # Cautions
520    /// See cautions of [Id].
521    pub fn id(&self) -> Id {
522        self.handle.id()
523    }
524
525    /// Detaches task from permission to cancel on [Drop].
526    pub fn detach(self) -> JoinHandle<T> {
527        let task = ManuallyDrop::new(self);
528        unsafe { ptr::read(&task.handle) }
529    }
530
531    /// Cancels owning task.
532    ///
533    /// Cancellation is inherently concurrent with task execution. Currently, there is no guarantee
534    /// about promptness, the task could even run to complete normally after cancellation.
535    pub fn cancel(self) -> JoinHandle<T> {
536        self.handle.cancel();
537        self.detach()
538    }
539
540    /// Creates a handle to cancel associated task.
541    pub fn cancel_handle(&self) -> CancelHandle {
542        self.handle.cancel_handle()
543    }
544}
545
546impl<T> Drop for TaskHandle<T> {
547    fn drop(&mut self) {
548        self.handle.cancel()
549    }
550}
551
552impl<T> Future for TaskHandle<T> {
553    type Output = Result<T, JoinError>;
554
555    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
556        let handle = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().handle) };
557        handle.poll(cx)
558    }
559}
560
561/// Handle to join or cancel associated task.
562pub struct JoinHandle<T> {
563    id: RawId,
564    joint: Arc<Joint<T>>,
565    cancellation: Arc<Cancellation>,
566}
567
568impl<T> JoinHandle<T> {
569    /// Gets id of associated task.
570    ///
571    /// # Cautions
572    /// See cautions of [Id].
573    pub fn id(&self) -> Id {
574        self.id.into()
575    }
576
577    fn new(id: RawId, joint: Arc<Joint<T>>, cancellation: Arc<Cancellation>) -> Self {
578        Self {
579            id,
580            joint,
581            cancellation,
582        }
583    }
584
585    /// Cancels associated task with this handle.
586    ///
587    /// Cancellation is inherently concurrent with task execution. Currently, there is no guarantee
588    /// about promptness, the task could even run to complete normally after cancellation.
589    pub fn cancel(&self) {
590        self.cancellation.cancel()
591    }
592
593    /// Attaches to associated task to gain cancel on [Drop] permission.
594    pub fn attach(self) -> TaskHandle<T> {
595        TaskHandle { handle: self }
596    }
597
598    #[cfg(test)]
599    unsafe fn clone(&self) -> Self {
600        Self {
601            id: self.id,
602            joint: self.joint.clone(),
603            cancellation: self.cancellation.clone(),
604        }
605    }
606
607    /// Creates a handle to cancel associated task.
608    pub fn cancel_handle(&self) -> CancelHandle {
609        CancelHandle::new(self.cancellation.clone())
610    }
611}
612
613impl<T> Drop for JoinHandle<T> {
614    fn drop(&mut self) {
615        self.joint
616            .stage
617            .store(Stage::Detached as usize, Ordering::Relaxed);
618        *self.joint.result.lock().unwrap() = JoinResult::Detached;
619    }
620}
621
622impl<T> Future for JoinHandle<T> {
623    type Output = Result<T, JoinError>;
624
625    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
626        let stage = self.joint.stage();
627        if stage == Stage::Finished {
628            let mut lock = self.joint.result.lock().unwrap();
629            let result = mem::replace(&mut *lock, JoinResult::Joined);
630            drop(lock);
631            return match result {
632                JoinResult::Finished(result) => Poll::Ready(result.map_err(|inner| JoinError {
633                    id: self.id.into(),
634                    inner,
635                })),
636                JoinResult::Joined => panic!("task({}) already joined", Id(self.id.0)),
637                JoinResult::Detached => panic!("task({}) already detached", Id(self.id.0)),
638                _ => unreachable!("get no task result after stage finished"),
639            };
640        }
641        let waker = cx.waker().clone();
642        let mut lock = self.joint.result.lock().unwrap();
643        let result = mem::replace(&mut *lock, JoinResult::Joining(waker));
644        drop(lock);
645        match result {
646            JoinResult::Finished(result) => {
647                *self.joint.result.lock().unwrap() = JoinResult::Joined;
648                Poll::Ready(result.map_err(|inner| JoinError {
649                    id: self.id.into(),
650                    inner,
651                }))
652            }
653            JoinResult::Empty | JoinResult::Joining(_) => Poll::Pending,
654            JoinResult::Joined => panic!("task({}) already joined", Id(self.id.0)),
655            JoinResult::Detached => panic!("task({}) already detached", Id(self.id.0)),
656        }
657    }
658}
659
660/// Gets running task's id. Panic if no reside in managed async task.
661///
662/// # Cautions
663/// See cautions of [Id].
664pub fn id() -> Id {
665    try_id().expect("id(): no task running")
666}
667
668/// Gets running task's id. [None] if no reside in managed async task.
669///
670/// # Cautions
671/// See cautions of [Id].
672pub fn try_id() -> Option<Id> {
673    let id = RUNNING_ID.get();
674    if id.0 == 0 {
675        None
676    } else {
677        Some(id.into())
678    }
679}
680
681#[cfg(test)]
682mod tests {
683    use std::sync::atomic::AtomicBool;
684    use std::time::{Duration, Instant};
685
686    use super::*;
687    use futures::executor::block_on;
688    use futures::task::noop_waker;
689    use std::future::{pending, ready};
690
691    use static_assertions::*;
692
693    assert_impl_all!(Id: Clone);
694    assert_not_impl_any!(Id: Copy);
695
696    assert_impl_all!(JoinHandle<()>: Send, Sync);
697    assert_not_impl_any!(JoinHandle<()>: Clone, Copy);
698
699    assert_impl_all!(TaskHandle<()>: Send, Sync);
700    assert_not_impl_any!(TaskHandle<()>: Clone, Copy);
701
702    assert_impl_all!(CancelHandle: Send, Sync);
703
704    #[test]
705    #[should_panic(expected = "no task running")]
706    fn id_no() {
707        id();
708    }
709
710    #[test]
711    fn id_ok() {
712        let (id, future) = IdFuture::new(async { id() });
713        assert_eq!(block_on(future), id);
714    }
715
716    #[test]
717    fn try_id_no() {
718        assert_eq!(try_id(), None);
719    }
720
721    #[test]
722    fn try_id_ok() {
723        let (id, future) = IdFuture::new(async { try_id() });
724        assert_eq!(block_on(future), Some(id));
725    }
726
727    #[test]
728    fn id_display() {
729        assert_eq!(Id(2).to_string(), "0x00000002");
730        assert_eq!(Id(u32::MAX as u64).to_string(), "0xffffffff");
731        assert_eq!(Id(u32::MAX as u64 + 1).to_string(), "0x0000000100000000");
732    }
733
734    #[test]
735    fn id_debug() {
736        assert_eq!(format!("{:?}", Id(2)), "TaskId(0x00000002)");
737        assert_eq!(format!("{:?}", Id(u32::MAX as u64)), "TaskId(0xffffffff)");
738        assert_eq!(
739            format!("{:?}", Id(u32::MAX as u64 + 1)),
740            "TaskId(0x0000000100000000)"
741        );
742    }
743
744    #[test]
745    fn id_raw() {
746        let raw_id = RawId(2);
747        let id = Id::from(raw_id);
748        assert_eq!(id.as_raw(), raw_id);
749    }
750
751    #[test]
752    fn name_unnamed() {
753        let name = Name::default();
754        assert_eq!(name.try_as_str(), None);
755        assert_eq!(name.as_str(), "unnamed");
756        assert_eq!(name.into(), None);
757    }
758
759    #[test]
760    fn name_named() {
761        let name = Name::default().name("named");
762        assert_eq!(name.try_as_str(), Some("named"));
763        assert_eq!(name.as_str(), "named");
764        assert_eq!(name.into(), Some(Cow::Borrowed("named")));
765    }
766
767    #[test]
768    fn task_unnamed() {
769        let (task, _handle) = Task::new(Name::default(), pending::<()>());
770        assert_eq!(task.try_name(), None);
771        assert_eq!(task.name(), "unnamed");
772    }
773
774    #[test]
775    fn task_named() {
776        let (task, _handle) = Task::new(Name::default().name("named"), pending::<()>());
777        assert_eq!(task.try_name(), Some("named"));
778        assert_eq!(task.name(), "named");
779    }
780
781    #[test]
782    fn task_id() {
783        let (task, handle) = Task::new(Name::default(), async { id() });
784        block_on(Box::into_pin(task.future));
785        assert_eq!(block_on(handle).unwrap(), task.id);
786    }
787
788    #[test]
789    fn task_try_id() {
790        let (task, handle) = Task::new(Name::default(), async { try_id() });
791        block_on(Box::into_pin(task.future));
792        assert_eq!(block_on(handle).unwrap(), Some(task.id));
793    }
794
795    #[test]
796    fn task_cancel() {
797        let (task, handle) = Task::new(Name::default().name("named"), pending::<()>());
798        handle.cancel();
799        block_on(Box::into_pin(task.future));
800        let err = block_on(handle).unwrap_err();
801        assert!(err.is_cancelled());
802    }
803
804    #[test]
805    fn task_cancel_handle() {
806        let (task, handle) = Task::new(Name::default().name("named"), pending::<()>());
807        handle.cancel_handle().cancel();
808        block_on(Box::into_pin(task.future));
809        let err = block_on(handle).unwrap_err();
810        assert!(err.is_cancelled());
811    }
812
813    #[test]
814    fn task_cancel_passively() {
815        let (task, handle) = Task::new(Name::default(), ready(()));
816        drop(task);
817        let err = block_on(handle).unwrap_err();
818        assert!(err.is_cancelled());
819    }
820
821    #[test]
822    fn task_cancel_after_polling() {
823        let (task, handle) = Task::new(Name::default(), pending::<()>());
824        let mut task = Box::into_pin(task.future);
825        let noop_waker = noop_waker();
826        let mut cx = Context::from_waker(&noop_waker);
827        assert_eq!(task.as_mut().poll(&mut cx), Poll::Pending);
828        handle.cancel();
829        assert_eq!(task.as_mut().poll(&mut cx), Poll::Ready(()));
830        let err = block_on(handle).unwrap_err();
831        assert!(err.is_cancelled());
832    }
833
834    #[test]
835    fn task_cancel_after_completed() {
836        let (task, handle) = Task::new(Name::default(), ready(()));
837        block_on(Box::into_pin(task.future));
838        handle.cancel();
839        block_on(handle).unwrap();
840    }
841
842    #[test]
843    #[should_panic(expected = "panic in task")]
844    fn task_cancel_after_paniced() {
845        let (task, handle) = Task::new(Name::default(), async { panic!("panic in task") });
846        block_on(Box::into_pin(task.future));
847        handle.cancel();
848        block_on(handle).unwrap_err().resume_panic();
849    }
850
851    #[test]
852    #[should_panic(expected = "panic in task")]
853    fn join_handle_join_paniced() {
854        let (task, handle) = Task::new(Name::default(), async { panic!("panic in task") });
855        block_on(Box::into_pin(task.future));
856        block_on(handle).unwrap_err().resume_panic();
857    }
858
859    #[test]
860    fn join_handle_join_running() {
861        let (task, handle) = Task::new(Name::default(), ready(()));
862        std::thread::spawn(move || {
863            block_on(Box::into_pin(task.future));
864        });
865        block_on(handle).unwrap();
866    }
867
868    #[test]
869    fn join_handle_join_finished() {
870        let (task, handle) = Task::new(Name::default(), ready(()));
871        block_on(Box::into_pin(task.future));
872        block_on(handle).unwrap();
873    }
874
875    #[test]
876    fn join_handle_join_finished_concurrently() {
877        let (task, handle) = Task::new(Name::default(), ready(()));
878        block_on(Box::into_pin(task.future));
879        handle
880            .joint
881            .stage
882            .store(Stage::Running as usize, Ordering::Relaxed);
883        block_on(handle).unwrap();
884    }
885
886    #[test]
887    #[should_panic(expected = "already joined")]
888    fn join_handle_join_joined() {
889        let (task, mut handle) = Task::new(Name::default(), ready(()));
890        block_on(Box::into_pin(task.future));
891        let noop_waker = noop_waker();
892        let mut cx = Context::from_waker(&noop_waker);
893        let pinned = unsafe { Pin::new_unchecked(&mut handle) };
894        assert!(pinned.poll(&mut cx).is_ready());
895        let pinned = unsafe { Pin::new_unchecked(&mut handle) };
896        let _ = pinned.poll(&mut cx);
897    }
898
899    #[test]
900    #[should_panic(expected = "already detached")]
901    fn join_handle_join_detached() {
902        let (_task, handle) = Task::new(Name::default(), ready(()));
903        unsafe {
904            drop(handle.clone());
905        }
906        let _ = block_on(handle);
907    }
908
909    #[test]
910    fn join_handle_detached() {
911        let (task, handle) = Task::new(Name::default(), ready(()));
912        drop(handle);
913        block_on(Box::into_pin(task.future));
914    }
915
916    #[test]
917    fn join_handle_detached_concurrently() {
918        let (task, handle) = Task::new(Name::default(), ready(()));
919        *handle.joint.result.lock().unwrap() = JoinResult::Detached;
920        block_on(Box::into_pin(task.future));
921    }
922
923    #[test]
924    fn task_handle_join() {
925        let (task, handle) = Task::new(Name::default(), async { id() });
926        let handle = handle.attach();
927        block_on(Box::into_pin(task.future));
928        let id = handle.id();
929        assert_eq!(block_on(handle).unwrap(), id);
930    }
931
932    #[test]
933    fn task_handle_cancel() {
934        let (task, handle) = Task::new(Name::default(), pending::<()>());
935        std::thread::spawn(move || {
936            block_on(Box::into_pin(task.future));
937        });
938        let handle = handle.attach();
939        let err = block_on(async move { handle.cancel().await.unwrap_err() });
940        assert!(err.is_cancelled());
941    }
942
943    struct Sleep {
944        deadline: Instant,
945    }
946
947    impl Sleep {
948        fn new(timeout: Duration) -> Sleep {
949            Sleep {
950                deadline: Instant::now() + timeout,
951            }
952        }
953    }
954
955    impl Future for Sleep {
956        type Output = ();
957
958        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
959            let timeout = self.deadline.saturating_duration_since(Instant::now());
960            if timeout.is_zero() {
961                return Poll::Ready(());
962            }
963            let waker = cx.waker().clone();
964            std::thread::spawn(move || {
965                std::thread::sleep(timeout);
966                waker.wake();
967            });
968            Poll::Pending
969        }
970    }
971
972    #[test]
973    fn task_handle_cancel_on_drop() {
974        let (task, handle) = Task::new(Name::default(), pending::<()>());
975        let handle = handle.attach();
976        drop(handle);
977        block_on(Box::into_pin(task.future));
978    }
979
980    #[test]
981    fn task_handle_detach() {
982        let cancelled = Arc::new(AtomicBool::new(true));
983        let (task, handle) = Task::new(Name::default(), {
984            let cancelled = cancelled.clone();
985            async move {
986                Sleep::new(Duration::from_millis(500)).await;
987                cancelled.store(false, Ordering::Relaxed);
988            }
989        });
990        let handle = handle.attach().detach();
991        drop(handle);
992        block_on(Box::into_pin(task.future));
993        assert!(!cancelled.load(Ordering::Relaxed));
994    }
995
996    struct CustomFuture {
997        _shared: Arc<()>,
998    }
999
1000    impl Future for CustomFuture {
1001        type Output = ();
1002
1003        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1004            Poll::Ready(())
1005        }
1006    }
1007
1008    #[test]
1009    fn future_dropped_before_ready() {
1010        let shared = Arc::new(());
1011        let (mut task, _handle) = Task::new(
1012            Name::default(),
1013            CustomFuture {
1014                _shared: shared.clone(),
1015            },
1016        );
1017        let pinned = unsafe { Pin::new_unchecked(task.future.as_mut()) };
1018        let poll = pinned.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
1019        assert!(poll.is_ready());
1020        assert_eq!(Arc::strong_count(&shared), 1);
1021    }
1022
1023    #[test]
1024    fn future_dropped_before_joined() {
1025        let shared = Arc::new(());
1026        let (mut task, handle) = Task::new(
1027            Name::default(),
1028            CustomFuture {
1029                _shared: shared.clone(),
1030            },
1031        );
1032        std::thread::spawn(move || {
1033            let pinned = unsafe { Pin::new_unchecked(task.future.as_mut()) };
1034            let _poll = pinned.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
1035
1036            // Let join handle complete before task drop.
1037            std::thread::sleep(Duration::from_millis(10));
1038        });
1039        block_on(handle).unwrap();
1040        assert_eq!(Arc::strong_count(&shared), 1);
1041    }
1042}