1use crate::spawn_with_name;
2use std::any::Any;
3use std::borrow::Cow;
4use std::cell::Cell;
5use std::fmt::{self, Debug, Display, Formatter};
6use std::future::Future;
7use std::mem;
8use std::mem::ManuallyDrop;
9use std::panic::{self, AssertUnwindSafe};
10use std::pin::Pin;
11use std::ptr;
12use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll, Waker};
15
thread_local! {
    // Raw id of the task currently being polled on this thread.
    // `RawId(0)` is the "no task running" sentinel checked by `try_id`.
    static RUNNING_ID: Cell<RawId> = const { Cell::new(RawId(0)) };
}
19
/// Coarse lifecycle marker of a task.
///
/// The discriminants are fixed so a `Stage` can be converted to and from a
/// plain `usize`.
#[repr(usize)]
#[derive(Clone, Copy, PartialEq)]
enum Stage {
    Running = 0,
    Finished = 1,
    Detached = 2,
}

impl From<usize> for Stage {
    /// Decodes a discriminant previously produced by `stage as usize`.
    ///
    /// # Panics
    ///
    /// Panics if `v` is not the discriminant of any `Stage` variant.
    fn from(v: usize) -> Self {
        const RUNNING: usize = Stage::Running as usize;
        const FINISHED: usize = Stage::Finished as usize;
        const DETACHED: usize = Stage::Detached as usize;
        match v {
            RUNNING => Stage::Running,
            FINISHED => Stage::Finished,
            DETACHED => Stage::Detached,
            _ => unreachable!("{v} is not valid for Stage"),
        }
    }
}
38
/// Slot shared between a task and its `JoinHandle`, guarded by `Joint::result`.
enum JoinResult<T> {
    /// Initial state: no result yet and nobody has polled the handle.
    Empty,
    /// The handle was polled before completion; this waker is woken by `Joint::wake`.
    Joining(Waker),
    /// The result has already been taken by the handle.
    Joined,
    /// The task completed and its outcome is waiting to be taken.
    Finished(Result<T, InnerJoinError>),
    /// The `JoinHandle` was dropped.
    Detached,
}
46
/// Copyable, crate-internal form of a task id; `0` is reserved for "no task".
#[derive(Clone, Copy)]
#[cfg_attr(test, derive(Debug, PartialEq))]
struct RawId(u64);
50
51impl From<RawId> for Id {
52 fn from(id: RawId) -> Id {
53 Id(id.0)
54 }
55}
56
57impl RawId {
58 fn run(self) -> RawId {
59 RUNNING_ID.with(|id| {
60 let previous = id.get();
61 id.set(self);
62 previous
63 })
64 }
65
66 pub(crate) fn enter(&self) -> IdScope {
67 let id = RawId(self.0);
68 let previous = id.run();
69 IdScope { previous }
70 }
71
72 fn next() -> RawId {
73 static ID: AtomicU64 = AtomicU64::new(1);
75 Self(ID.fetch_add(1, Ordering::Relaxed))
76 }
77}
78
/// Identifier of a task, obtainable via [`id`], [`try_id`] or a handle's `id()`.
///
/// Formatted as zero-padded hexadecimal by its `Display` implementation.
#[derive(Clone, Hash, PartialEq, Eq)]
pub struct Id(u64);
91
/// Guard returned by `RawId::enter`; restores the previously running id on drop.
struct IdScope {
    /// Id that was running on this thread before the scope was entered.
    previous: RawId,
}
95
impl Drop for IdScope {
    /// Reinstalls the id that was running before this scope was entered.
    fn drop(&mut self) {
        self.previous.run();
    }
}
101
102impl Id {
103 fn as_raw(&self) -> RawId {
104 RawId(self.0)
105 }
106}
107
108impl Display for Id {
109 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110 if self.0 <= u32::MAX.into() {
111 write!(f, "{:#010x}", self.0 as u32)
112 } else {
113 write!(f, "{:#018x}", self.0)
114 }
115 }
116}
117
118impl Debug for Id {
119 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
120 write!(f, "TaskId({self})")
121 }
122}
123
/// A spawned unit of work: its identity, its name and the type-erased future
/// that drives it.
#[non_exhaustive]
pub struct Task {
    /// Unique id of this task.
    pub id: Id,
    /// Name given at spawn time (possibly unnamed).
    pub name: Name,
    /// Future to poll; it completes with `()` and delivers the real output
    /// to the paired `JoinHandle`.
    pub future: Box<dyn Future<Output = ()> + Send + 'static>,
}
131
132impl Task {
133 pub(crate) fn new<T: Send, F: Future<Output = T> + Send + 'static>(
134 name: Name,
135 future: F,
136 ) -> (Self, JoinHandle<F::Output>) {
137 let (id, future) = IdFuture::new(future);
138 let future = TaskFuture::new(future);
139 let handle = JoinHandle::new(
140 id.as_raw(),
141 future.joint.clone(),
142 future.cancellation.clone(),
143 );
144 let task = Self {
145 id,
146 name,
147 future: Box::new(future),
148 };
149 (task, handle)
150 }
151
152 pub fn name(&self) -> &str {
153 self.name.as_str()
154 }
155
156 pub fn try_name(&self) -> Option<&str> {
157 self.name.try_as_str()
158 }
159}
160
/// Optional name of a task.
#[derive(Clone, Default, Hash, PartialEq, Eq, Debug)]
pub struct Name {
    name: Option<Cow<'static, str>>,
}

impl Name {
    /// Returns the name, or `"unnamed"` when no name was set.
    pub fn as_str(&self) -> &str {
        match self.try_as_str() {
            Some(name) => name,
            None => "unnamed",
        }
    }

    /// Returns the name, or `None` when no name was set.
    pub fn try_as_str(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Consumes `self` and returns the underlying optional name.
    pub fn into(self) -> Option<Cow<'static, str>> {
        let Self { name } = self;
        name
    }

    /// Returns a `Name` carrying `name`, discarding whatever `self` held.
    fn name(self, name: impl Into<Cow<'static, str>>) -> Self {
        let name = Some(name.into());
        Self { name }
    }
}
189
/// Builder for spawning a task with optional settings such as a name.
#[derive(Default)]
pub struct Builder {
    /// Name to give to the spawned task.
    name: Name,
}
195
196impl Builder {
197 pub fn new() -> Self {
199 Self::default()
200 }
201
202 pub fn name(self, name: impl Into<Cow<'static, str>>) -> Self {
204 Self {
205 name: self.name.name(name),
206 }
207 }
208
209 pub fn spawn<T, F>(self, f: F) -> JoinHandle<T>
211 where
212 F: Future<Output = T> + Send + 'static,
213 T: Send + 'static,
214 {
215 spawn_with_name(self.name, f)
216 }
217}
218
/// State shared between a running task and its `JoinHandle`.
struct Joint<T> {
    /// Current `Stage`, stored as its `usize` discriminant.
    stage: AtomicUsize,
    /// Join slot holding either the registered waker or the task's outcome.
    result: Mutex<JoinResult<T>>,
}
223
224impl<T> Joint<T> {
225 fn new() -> Self {
226 Self {
227 stage: AtomicUsize::new(Stage::Running as usize),
228 result: Mutex::new(JoinResult::Empty),
229 }
230 }
231
232 fn wake(&self, value: Result<T, InnerJoinError>) {
233 let stage = self.stage();
234 if stage != Stage::Running {
235 return;
236 }
237 let mut lock = self.result.lock().unwrap();
238 let result = mem::replace(&mut *lock, JoinResult::Finished(value));
239 drop(lock);
240 self.stage
242 .store(Stage::Finished as usize, Ordering::Relaxed);
243 match result {
244 JoinResult::Empty => {}
245 JoinResult::Joining(waker) => waker.wake(),
246 JoinResult::Finished(_) | JoinResult::Joined => unreachable!("task completed already"),
247 JoinResult::Detached => *self.result.lock().unwrap() = JoinResult::Detached,
248 }
249 }
250
251 fn stage(&self) -> Stage {
252 Stage::from(self.stage.load(Ordering::Relaxed))
253 }
254}
255
/// Future adapter that makes `id` the running task id while `future` is polled.
struct IdFuture<F: Future> {
    /// Id installed into the thread-local for the duration of each poll.
    id: RawId,
    /// The wrapped future.
    future: F,
}
260
261impl<F: Future> IdFuture<F> {
262 pub fn new(future: F) -> (Id, Self) {
263 let id = RawId::next();
264 let future = Self { id, future };
265 (id.into(), future)
266 }
267}
268
impl<F: Future> Future for IdFuture<F> {
    type Output = F::Output;

    /// Polls the wrapped future with this task's id installed as the running id.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Restores the previously running id when `_scope` is dropped at return.
        let _scope = self.id.enter();
        // SAFETY: the reference is only used to re-pin the `future` field
        // below; nothing is moved out of `self` in this function.
        let task = unsafe { self.get_unchecked_mut() };
        // SAFETY: `task.future` lives inside the pinned `self` and is not
        // moved here, so pinning the field reference upholds the pin contract.
        let future = unsafe { Pin::new_unchecked(&mut task.future) };
        future.poll(cx)
    }
}
279
/// Future adapter that reports the wrapped future's outcome (value, panic or
/// cancellation) through a `Joint` and itself always completes with `()`.
struct TaskFuture<F: Future> {
    /// Spare waker allocation handed back by `Cancellation::update`, reused on the next poll.
    waker: Option<Box<Waker>>,
    /// Cancellation slot shared with the handles.
    cancellation: Arc<Cancellation>,
    /// Join slot shared with the `JoinHandle`.
    joint: Arc<Joint<F::Output>>,
    /// The wrapped future; `None` once the task has finished.
    future: Option<F>,
}
286
287impl<F: Future> TaskFuture<F> {
288 fn new(future: F) -> Self {
289 Self {
290 waker: None,
291 joint: Arc::new(Joint::new()),
292 cancellation: Arc::new(Default::default()),
293 future: Some(future),
294 }
295 }
296
297 fn finish(&mut self, value: Result<F::Output, InnerJoinError>) -> Poll<()> {
298 self.future = None;
299 self.joint.wake(value);
300 Poll::Ready(())
301 }
302}
303
304impl<F: Future> Drop for TaskFuture<F> {
305 fn drop(&mut self) {
306 if self.future.is_some() {
307 let _ = self.finish(Err(InnerJoinError::Cancelled));
308 }
309 }
310}
311
impl<F: Future> Future for TaskFuture<F> {
    type Output = ();

    /// Drives the wrapped future one step.
    ///
    /// Completes with `()` once the wrapped future returns, panics, or the
    /// task is cancelled; the actual outcome is delivered through `finish`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the inner future is only ever re-pinned or dropped in place
        // (by `finish`); it is never moved out of `self`.
        let task = unsafe { self.get_unchecked_mut() };
        if task.future.is_none() {
            // Already finished; polling again is a no-op.
            return Poll::Ready(());
        } else if task.cancellation.is_cancelled() {
            return task.finish(Err(InnerJoinError::Cancelled));
        }
        // SAFETY: `task.future` was checked to be `Some` just above, and the
        // future stays in place inside the pinned `self`.
        let future = unsafe { Pin::new_unchecked(task.future.as_mut().unwrap_unchecked()) };
        // A panic in the wrapped future is captured and reported to the joiner
        // instead of unwinding into the caller.
        match panic::catch_unwind(AssertUnwindSafe(|| future.poll(cx))) {
            Ok(Poll::Pending) => {
                // Reuse the spare allocation from the previous poll when there is one.
                let waker = match task.waker.take() {
                    None => Box::new(cx.waker().clone()),
                    Some(mut waker) => {
                        waker.as_mut().clone_from(cx.waker());
                        waker
                    }
                };
                // Register the waker so `cancel()` can wake this task; an
                // error means cancellation was requested in the meantime.
                let Ok(waker) = task.cancellation.update(waker) else {
                    return task.finish(Err(InnerJoinError::Cancelled));
                };
                // Keep the previously registered box (if any) for the next poll.
                task.waker = waker;
                Poll::Pending
            }
            Ok(Poll::Ready(value)) => task.finish(Ok(value)),
            Err(err) => task.finish(Err(InnerJoinError::Panic(err))),
        }
    }
}
343
/// Cancellation slot shared between a task and its handles.
///
/// `waker` is a three-state pointer:
/// * null: not cancelled, no waker registered;
/// * [`Cancellation::CANCELLED`]: cancellation was requested (terminal state);
/// * anything else: a `Box<Waker>` leaked via `Box::into_raw` and owned by
///   this slot until it is swapped out.
///
/// The boxed waker is written by the task's thread and may be read by a
/// cancelling thread, so every store that publishes a pointer uses `Release`
/// and every load that may be followed by a dereference uses `Acquire`.
#[derive(Default)]
struct Cancellation {
    waker: AtomicPtr<Waker>,
}

impl Cancellation {
    /// Sentinel marking the slot as cancelled; never dereferenced.
    const CANCELLED: *mut Waker = Self::cancelled();

    /// Produces the sentinel from the address of a function, which cannot
    /// collide with any heap-allocated `Waker`.
    const fn cancelled() -> *mut Waker {
        Cancellation::cancel as *const fn() as *mut Waker
    }

    /// Returns `true` once `cancel` has been called.
    pub fn is_cancelled(&self) -> bool {
        let current = self.waker.load(Ordering::Acquire);
        ptr::eq(current, Self::CANCELLED)
    }

    /// Marks the slot as cancelled and wakes the registered waker, if any.
    ///
    /// Idempotent: calls after the first one do nothing.
    pub fn cancel(&self) {
        let mut current = self.waker.load(Ordering::Acquire);
        loop {
            if ptr::eq(current, Self::CANCELLED) {
                return;
            }
            match self.waker.compare_exchange(
                current,
                Self::CANCELLED,
                // Acquire side: synchronize with the `update` that published
                // `current`, so the `Waker` behind it is fully visible here.
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Err(newer) => current = newer,
                Ok(_) => {
                    if !current.is_null() {
                        // SAFETY: `current` is neither null nor the sentinel,
                        // so it was produced by `Box::into_raw` in `update`.
                        // The successful exchange removed it from the slot,
                        // making this thread its unique owner.
                        let waker = unsafe { Box::from_raw(current) };
                        waker.wake();
                    }
                    return;
                }
            }
        }
    }

    /// Registers `waker` as the one to wake on cancellation.
    ///
    /// Returns the previously registered box (if any) so its allocation can
    /// be reused, or `Err(())` if the slot is already cancelled, in which
    /// case `waker` is dropped.
    pub fn update(&self, waker: Box<Waker>) -> Result<Option<Box<Waker>>, ()> {
        // Hand ownership to a raw pointer up front so the pointer stored in
        // the slot is the box's own allocation pointer.
        let raw_waker = Box::into_raw(waker);
        let mut current = self.waker.load(Ordering::Acquire);
        loop {
            if ptr::eq(current, Self::CANCELLED) {
                // SAFETY: `raw_waker` came from `Box::into_raw` above and was
                // never published, so it is still uniquely owned here.
                drop(unsafe { Box::from_raw(raw_waker) });
                return Err(());
            }
            match self.waker.compare_exchange(
                current,
                raw_waker,
                // Release side: publish the boxed waker's contents to a
                // thread that later takes it in `cancel`.
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Err(newer) => current = newer,
                Ok(_) => {
                    if current.is_null() {
                        return Ok(None);
                    }
                    // SAFETY: `current` is neither null nor the sentinel, so
                    // it was produced by `Box::into_raw` in an earlier call,
                    // and the successful exchange removed it from the slot.
                    return Ok(Some(unsafe { Box::from_raw(current) }));
                }
            }
        }
    }
}

impl Drop for Cancellation {
    /// Frees the registered waker, if one is still stored in the slot.
    fn drop(&mut self) {
        // `&mut self` proves exclusive access, so no atomic operation is needed.
        let waker = *self.waker.get_mut();
        if waker.is_null() || ptr::eq(waker, Self::CANCELLED) {
            return;
        }
        // SAFETY: a pointer that is neither null nor the sentinel was produced
        // by `Box::into_raw` in `update` and is still owned by the slot.
        drop(unsafe { Box::from_raw(waker) });
    }
}
421
/// Reason a task did not produce a value.
enum InnerJoinError {
    /// The task was cancelled or dropped before completing.
    Cancelled,
    /// The task panicked; carries the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}
426
/// Error returned when joining a task that was cancelled or panicked.
pub struct JoinError {
    /// Id of the task that failed.
    id: Id,
    /// What went wrong.
    inner: InnerJoinError,
}
432
433impl JoinError {
434 pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> {
436 match self.inner {
437 InnerJoinError::Panic(p) => Ok(p),
438 _ => Err(self),
439 }
440 }
441
442 pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
444 match self.try_into_panic() {
445 Ok(panic) => panic,
446 Err(err) => panic!("task {} does not panic, but cancelled", err.id),
447 }
448 }
449
450 pub fn resume_panic(self) -> ! {
453 panic::resume_unwind(self.into_panic())
454 }
455
456 pub fn is_panic(&self) -> bool {
458 matches!(&self.inner, InnerJoinError::Panic(_))
459 }
460
461 pub fn is_cancelled(&self) -> bool {
463 matches!(&self.inner, InnerJoinError::Cancelled)
464 }
465}
466
467impl Debug for JoinError {
468 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
469 match &self.inner {
470 InnerJoinError::Cancelled => write!(fmt, "JoinError::Cancelled({:?})", self.id),
471 InnerJoinError::Panic(_panic) => write!(fmt, "JoinError::Panic({:?}, ...)", self.id),
472 }
473 }
474}
475
476impl Display for JoinError {
477 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
478 match &self.inner {
479 InnerJoinError::Cancelled => write!(fmt, "task {} was cancelled", self.id),
480 InnerJoinError::Panic(_panic) => write!(fmt, "task {} panicked", self.id),
481 }
482 }
483}
484
// `JoinError` exposes no source error: a panic payload is available only
// through `try_into_panic` / `into_panic`.
impl std::error::Error for JoinError {}
486
/// Cloneable handle that can only cancel a task; it cannot join it.
#[derive(Clone)]
pub struct CancelHandle {
    /// Cancellation slot shared with the task.
    cancellation: Arc<Cancellation>,
}
492
// SAFETY: the only field is an `Arc<Cancellation>`, whose state is a single
// `AtomicPtr` that is only accessed through atomic operations.
// NOTE(review): these impls look redundant with the auto traits — confirm before removing.
unsafe impl Send for CancelHandle {}
unsafe impl Sync for CancelHandle {}
495
496impl CancelHandle {
497 fn new(cancellation: Arc<Cancellation>) -> Self {
498 Self { cancellation }
499 }
500
501 pub fn cancel(&self) {
506 self.cancellation.cancel()
507 }
508}
509
/// Join handle that cancels its task when dropped.
///
/// Use [`TaskHandle::detach`] to obtain a plain [`JoinHandle`] that does not
/// cancel on drop.
#[must_use = "task will be cancelled on drop, detach it if this is undesired"]
pub struct TaskHandle<T> {
    /// Underlying join handle.
    handle: JoinHandle<T>,
}
515
impl<T> TaskHandle<T> {
    /// Returns the id of the associated task.
    pub fn id(&self) -> Id {
        self.handle.id()
    }

    /// Converts into a [`JoinHandle`] without cancelling the task.
    pub fn detach(self) -> JoinHandle<T> {
        // `ManuallyDrop` suppresses `TaskHandle::drop`, which would cancel the task.
        let task = ManuallyDrop::new(self);
        // SAFETY: `task` is never dropped or used again, so the handle read
        // out here is the only live copy.
        unsafe { ptr::read(&task.handle) }
    }

    /// Requests cancellation and returns a [`JoinHandle`] to await the outcome.
    pub fn cancel(self) -> JoinHandle<T> {
        self.handle.cancel();
        self.detach()
    }

    /// Creates a [`CancelHandle`] for the associated task.
    pub fn cancel_handle(&self) -> CancelHandle {
        self.handle.cancel_handle()
    }
}
545
impl<T> Drop for TaskHandle<T> {
    /// Requests cancellation of the task; the inner `JoinHandle` is dropped afterwards.
    fn drop(&mut self) {
        self.handle.cancel()
    }
}
551
impl<T> Future for TaskHandle<T> {
    type Output = Result<T, JoinError>;

    /// Delegates to the inner [`JoinHandle`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `handle` is a field of the pinned `self` and is not moved
        // here; it is only re-pinned to forward the poll.
        let handle = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().handle) };
        handle.poll(cx)
    }
}
560
/// Handle to await a task's outcome; dropping it detaches from the task
/// without cancelling it.
pub struct JoinHandle<T> {
    /// Id of the associated task.
    id: RawId,
    /// Join slot shared with the task.
    joint: Arc<Joint<T>>,
    /// Cancellation slot shared with the task.
    cancellation: Arc<Cancellation>,
}
567
568impl<T> JoinHandle<T> {
569 pub fn id(&self) -> Id {
574 self.id.into()
575 }
576
577 fn new(id: RawId, joint: Arc<Joint<T>>, cancellation: Arc<Cancellation>) -> Self {
578 Self {
579 id,
580 joint,
581 cancellation,
582 }
583 }
584
585 pub fn cancel(&self) {
590 self.cancellation.cancel()
591 }
592
593 pub fn attach(self) -> TaskHandle<T> {
595 TaskHandle { handle: self }
596 }
597
598 #[cfg(test)]
599 unsafe fn clone(&self) -> Self {
600 Self {
601 id: self.id,
602 joint: self.joint.clone(),
603 cancellation: self.cancellation.clone(),
604 }
605 }
606
607 pub fn cancel_handle(&self) -> CancelHandle {
609 CancelHandle::new(self.cancellation.clone())
610 }
611}
612
impl<T> Drop for JoinHandle<T> {
    /// Detaches from the task: marks the stage as `Detached` and replaces the
    /// join slot, dropping any waker or unclaimed result stored there.
    fn drop(&mut self) {
        // Flip the stage first so a later `Joint::wake` returns early.
        self.joint
            .stage
            .store(Stage::Detached as usize, Ordering::Relaxed);
        *self.joint.result.lock().unwrap() = JoinResult::Detached;
    }
}
621
622impl<T> Future for JoinHandle<T> {
623 type Output = Result<T, JoinError>;
624
625 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
626 let stage = self.joint.stage();
627 if stage == Stage::Finished {
628 let mut lock = self.joint.result.lock().unwrap();
629 let result = mem::replace(&mut *lock, JoinResult::Joined);
630 drop(lock);
631 return match result {
632 JoinResult::Finished(result) => Poll::Ready(result.map_err(|inner| JoinError {
633 id: self.id.into(),
634 inner,
635 })),
636 JoinResult::Joined => panic!("task({}) already joined", Id(self.id.0)),
637 JoinResult::Detached => panic!("task({}) already detached", Id(self.id.0)),
638 _ => unreachable!("get no task result after stage finished"),
639 };
640 }
641 let waker = cx.waker().clone();
642 let mut lock = self.joint.result.lock().unwrap();
643 let result = mem::replace(&mut *lock, JoinResult::Joining(waker));
644 drop(lock);
645 match result {
646 JoinResult::Finished(result) => {
647 *self.joint.result.lock().unwrap() = JoinResult::Joined;
648 Poll::Ready(result.map_err(|inner| JoinError {
649 id: self.id.into(),
650 inner,
651 }))
652 }
653 JoinResult::Empty | JoinResult::Joining(_) => Poll::Pending,
654 JoinResult::Joined => panic!("task({}) already joined", Id(self.id.0)),
655 JoinResult::Detached => panic!("task({}) already detached", Id(self.id.0)),
656 }
657 }
658}
659
660pub fn id() -> Id {
665 try_id().expect("id(): no task running")
666}
667
668pub fn try_id() -> Option<Id> {
673 let id = RUNNING_ID.get();
674 if id.0 == 0 {
675 None
676 } else {
677 Some(id.into())
678 }
679}
680
// Unit tests covering id tracking, naming, cancellation and join/detach semantics.
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicBool;
    use std::time::{Duration, Instant};

    use super::*;
    use futures::executor::block_on;
    use futures::task::noop_waker;
    use std::future::{pending, ready};

    use static_assertions::*;

    // Compile-time checks of the auto-trait surface of the public types.
    assert_impl_all!(Id: Clone);
    assert_not_impl_any!(Id: Copy);

    assert_impl_all!(JoinHandle<()>: Send, Sync);
    assert_not_impl_any!(JoinHandle<()>: Clone, Copy);

    assert_impl_all!(TaskHandle<()>: Send, Sync);
    assert_not_impl_any!(TaskHandle<()>: Clone, Copy);

    assert_impl_all!(CancelHandle: Send, Sync);

    // `id()` panics outside of a task.
    #[test]
    #[should_panic(expected = "no task running")]
    fn id_no() {
        id();
    }

    // `id()` inside an `IdFuture` reports that future's id.
    #[test]
    fn id_ok() {
        let (id, future) = IdFuture::new(async { id() });
        assert_eq!(block_on(future), id);
    }

    #[test]
    fn try_id_no() {
        assert_eq!(try_id(), None);
    }

    #[test]
    fn try_id_ok() {
        let (id, future) = IdFuture::new(async { try_id() });
        assert_eq!(block_on(future), Some(id));
    }

    // Ids fitting in `u32` print with 8 hex digits, larger ones with 16.
    #[test]
    fn id_display() {
        assert_eq!(Id(2).to_string(), "0x00000002");
        assert_eq!(Id(u32::MAX as u64).to_string(), "0xffffffff");
        assert_eq!(Id(u32::MAX as u64 + 1).to_string(), "0x0000000100000000");
    }

    #[test]
    fn id_debug() {
        assert_eq!(format!("{:?}", Id(2)), "TaskId(0x00000002)");
        assert_eq!(format!("{:?}", Id(u32::MAX as u64)), "TaskId(0xffffffff)");
        assert_eq!(
            format!("{:?}", Id(u32::MAX as u64 + 1)),
            "TaskId(0x0000000100000000)"
        );
    }

    #[test]
    fn id_raw() {
        let raw_id = RawId(2);
        let id = Id::from(raw_id);
        assert_eq!(id.as_raw(), raw_id);
    }

    #[test]
    fn name_unnamed() {
        let name = Name::default();
        assert_eq!(name.try_as_str(), None);
        assert_eq!(name.as_str(), "unnamed");
        assert_eq!(name.into(), None);
    }

    #[test]
    fn name_named() {
        let name = Name::default().name("named");
        assert_eq!(name.try_as_str(), Some("named"));
        assert_eq!(name.as_str(), "named");
        assert_eq!(name.into(), Some(Cow::Borrowed("named")));
    }

    #[test]
    fn task_unnamed() {
        let (task, _handle) = Task::new(Name::default(), pending::<()>());
        assert_eq!(task.try_name(), None);
        assert_eq!(task.name(), "unnamed");
    }

    #[test]
    fn task_named() {
        let (task, _handle) = Task::new(Name::default().name("named"), pending::<()>());
        assert_eq!(task.try_name(), Some("named"));
        assert_eq!(task.name(), "named");
    }

    #[test]
    fn task_id() {
        let (task, handle) = Task::new(Name::default(), async { id() });
        block_on(Box::into_pin(task.future));
        assert_eq!(block_on(handle).unwrap(), task.id);
    }

    #[test]
    fn task_try_id() {
        let (task, handle) = Task::new(Name::default(), async { try_id() });
        block_on(Box::into_pin(task.future));
        assert_eq!(block_on(handle).unwrap(), Some(task.id));
    }

    // Cancelling before the first poll makes the task finish as cancelled.
    #[test]
    fn task_cancel() {
        let (task, handle) = Task::new(Name::default().name("named"), pending::<()>());
        handle.cancel();
        block_on(Box::into_pin(task.future));
        let err = block_on(handle).unwrap_err();
        assert!(err.is_cancelled());
    }

    #[test]
    fn task_cancel_handle() {
        let (task, handle) = Task::new(Name::default().name("named"), pending::<()>());
        handle.cancel_handle().cancel();
        block_on(Box::into_pin(task.future));
        let err = block_on(handle).unwrap_err();
        assert!(err.is_cancelled());
    }

    // Dropping the task without polling it is reported as cancellation.
    #[test]
    fn task_cancel_passively() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        drop(task);
        let err = block_on(handle).unwrap_err();
        assert!(err.is_cancelled());
    }

    #[test]
    fn task_cancel_after_polling() {
        let (task, handle) = Task::new(Name::default(), pending::<()>());
        let mut task = Box::into_pin(task.future);
        let noop_waker = noop_waker();
        let mut cx = Context::from_waker(&noop_waker);
        assert_eq!(task.as_mut().poll(&mut cx), Poll::Pending);
        handle.cancel();
        assert_eq!(task.as_mut().poll(&mut cx), Poll::Ready(()));
        let err = block_on(handle).unwrap_err();
        assert!(err.is_cancelled());
    }

    // Cancelling a task that already completed does not discard its result.
    #[test]
    fn task_cancel_after_completed() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        block_on(Box::into_pin(task.future));
        handle.cancel();
        block_on(handle).unwrap();
    }

    #[test]
    #[should_panic(expected = "panic in task")]
    fn task_cancel_after_paniced() {
        let (task, handle) = Task::new(Name::default(), async { panic!("panic in task") });
        block_on(Box::into_pin(task.future));
        handle.cancel();
        block_on(handle).unwrap_err().resume_panic();
    }

    #[test]
    #[should_panic(expected = "panic in task")]
    fn join_handle_join_paniced() {
        let (task, handle) = Task::new(Name::default(), async { panic!("panic in task") });
        block_on(Box::into_pin(task.future));
        block_on(handle).unwrap_err().resume_panic();
    }

    // Joining while the task runs on another thread.
    #[test]
    fn join_handle_join_running() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        std::thread::spawn(move || {
            block_on(Box::into_pin(task.future));
        });
        block_on(handle).unwrap();
    }

    #[test]
    fn join_handle_join_finished() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        block_on(Box::into_pin(task.future));
        block_on(handle).unwrap();
    }

    // Simulates the handle seeing a stale `Running` stage although the
    // result is already stored: the slow path must still return it.
    #[test]
    fn join_handle_join_finished_concurrently() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        block_on(Box::into_pin(task.future));
        handle
            .joint
            .stage
            .store(Stage::Running as usize, Ordering::Relaxed);
        block_on(handle).unwrap();
    }

    #[test]
    #[should_panic(expected = "already joined")]
    fn join_handle_join_joined() {
        let (task, mut handle) = Task::new(Name::default(), ready(()));
        block_on(Box::into_pin(task.future));
        let noop_waker = noop_waker();
        let mut cx = Context::from_waker(&noop_waker);
        let pinned = unsafe { Pin::new_unchecked(&mut handle) };
        assert!(pinned.poll(&mut cx).is_ready());
        let pinned = unsafe { Pin::new_unchecked(&mut handle) };
        let _ = pinned.poll(&mut cx);
    }

    // Dropping a cloned handle marks the slot detached; joining then panics.
    #[test]
    #[should_panic(expected = "already detached")]
    fn join_handle_join_detached() {
        let (_task, handle) = Task::new(Name::default(), ready(()));
        unsafe {
            drop(handle.clone());
        }
        let _ = block_on(handle);
    }

    #[test]
    fn join_handle_detached() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        drop(handle);
        block_on(Box::into_pin(task.future));
    }

    // Simulates the handle detaching after the task's stage check: the slot
    // says `Detached` while the stage still says `Running`.
    #[test]
    fn join_handle_detached_concurrently() {
        let (task, handle) = Task::new(Name::default(), ready(()));
        *handle.joint.result.lock().unwrap() = JoinResult::Detached;
        block_on(Box::into_pin(task.future));
    }

    #[test]
    fn task_handle_join() {
        let (task, handle) = Task::new(Name::default(), async { id() });
        let handle = handle.attach();
        block_on(Box::into_pin(task.future));
        let id = handle.id();
        assert_eq!(block_on(handle).unwrap(), id);
    }

    #[test]
    fn task_handle_cancel() {
        let (task, handle) = Task::new(Name::default(), pending::<()>());
        std::thread::spawn(move || {
            block_on(Box::into_pin(task.future));
        });
        let handle = handle.attach();
        let err = block_on(async move { handle.cancel().await.unwrap_err() });
        assert!(err.is_cancelled());
    }

    // Minimal timer future: spawns a thread that sleeps for the remaining
    // time and then wakes the task.
    struct Sleep {
        deadline: Instant,
    }

    impl Sleep {
        fn new(timeout: Duration) -> Sleep {
            Sleep {
                deadline: Instant::now() + timeout,
            }
        }
    }

    impl Future for Sleep {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let timeout = self.deadline.saturating_duration_since(Instant::now());
            if timeout.is_zero() {
                return Poll::Ready(());
            }
            let waker = cx.waker().clone();
            std::thread::spawn(move || {
                std::thread::sleep(timeout);
                waker.wake();
            });
            Poll::Pending
        }
    }

    // Dropping a `TaskHandle` cancels the task, so the pending future completes.
    #[test]
    fn task_handle_cancel_on_drop() {
        let (task, handle) = Task::new(Name::default(), pending::<()>());
        let handle = handle.attach();
        drop(handle);
        block_on(Box::into_pin(task.future));
    }

    // A detached handle must not cancel the task when dropped.
    #[test]
    fn task_handle_detach() {
        let cancelled = Arc::new(AtomicBool::new(true));
        let (task, handle) = Task::new(Name::default(), {
            let cancelled = cancelled.clone();
            async move {
                Sleep::new(Duration::from_millis(500)).await;
                cancelled.store(false, Ordering::Relaxed);
            }
        });
        let handle = handle.attach().detach();
        drop(handle);
        block_on(Box::into_pin(task.future));
        assert!(!cancelled.load(Ordering::Relaxed));
    }

    // Immediately-ready future holding a shared reference, used to observe
    // when the future itself is dropped.
    struct CustomFuture {
        _shared: Arc<()>,
    }

    impl Future for CustomFuture {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(())
        }
    }

    // The wrapped future is dropped as soon as it completes, even though the
    // task itself is still alive.
    #[test]
    fn future_dropped_before_ready() {
        let shared = Arc::new(());
        let (mut task, _handle) = Task::new(
            Name::default(),
            CustomFuture {
                _shared: shared.clone(),
            },
        );
        let pinned = unsafe { Pin::new_unchecked(task.future.as_mut()) };
        let poll = pinned.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
        assert!(poll.is_ready());
        assert_eq!(Arc::strong_count(&shared), 1);
    }

    // The wrapped future is dropped before the joiner observes the result.
    #[test]
    fn future_dropped_before_joined() {
        let shared = Arc::new(());
        let (mut task, handle) = Task::new(
            Name::default(),
            CustomFuture {
                _shared: shared.clone(),
            },
        );
        std::thread::spawn(move || {
            let pinned = unsafe { Pin::new_unchecked(task.future.as_mut()) };
            let _poll = pinned.poll(&mut Context::from_waker(futures::task::noop_waker_ref()));

            // NOTE(review): presumably keeps `task` alive past the join so the
            // assertion below cannot pass merely because the task was dropped — confirm.
            std::thread::sleep(Duration::from_millis(10));
        });
        block_on(handle).unwrap();
        assert_eq!(Arc::strong_count(&shared), 1);
    }
}