1use crate::channel::oneshot;
100use crate::combinator::{Either, Select};
101use crate::cx::{Cx, cap};
102use crate::record::{AdmissionError, TaskRecord};
103use crate::runtime::task_handle::{JoinError, TaskHandle};
104use crate::runtime::{RegionCreateError, RuntimeState, SpawnError, StoredTask};
105use crate::tracing_compat::{debug, debug_span};
106use crate::types::{Budget, CancelReason, Outcome, PanicPayload, Policy, RegionId, TaskId};
107use std::future::Future;
108use std::marker::PhantomData;
109use std::pin::Pin;
110use std::sync::Arc;
111use std::task::{Context, Poll};
112
113pub struct Scope<'r, P: Policy = crate::types::policy::FailFast> {
121 pub(crate) region: RegionId,
123 pub(crate) budget: Budget,
125 pub(crate) _policy: PhantomData<&'r P>,
127}
128
129#[pin_project::pin_project]
130pub(crate) struct CatchUnwind<F> {
131 #[pin]
132 pub(crate) inner: F,
133}
134
135impl<F: Future> Future for CatchUnwind<F> {
136 type Output = std::thread::Result<F::Output>;
137
138 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139 let mut this = self.project();
140 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
141 this.inner.as_mut().poll(cx)
142 }));
143 match result {
144 Ok(Poll::Pending) => Poll::Pending,
145 Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
146 Err(payload) => Poll::Ready(Err(payload)),
147 }
148 }
149}
150
151pub(crate) fn payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
152 payload
153 .downcast_ref::<&str>()
154 .map(ToString::to_string)
155 .or_else(|| payload.downcast_ref::<String>().cloned())
156 .unwrap_or_else(|| "unknown panic".to_string())
157}
158
159struct RegionRunner<'a, Fut> {
160 fut: Pin<&'a mut CatchUnwind<Fut>>,
161 state: Option<&'a mut RuntimeState>,
162 child_region: RegionId,
163}
164
165impl<'a, Fut: Future> Future for RegionRunner<'a, Fut> {
166 type Output = (std::thread::Result<Fut::Output>, &'a mut RuntimeState);
167
168 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
169 let this = self.get_mut();
170 match this.fut.as_mut().poll(cx) {
171 Poll::Ready(res) => {
172 let state = this.state.take().expect("polled after ready");
173 Poll::Ready((res, state))
174 }
175 Poll::Pending => Poll::Pending,
176 }
177 }
178}
179
180impl<Fut> Drop for RegionRunner<'_, Fut> {
181 fn drop(&mut self) {
182 if let Some(state) = self.state.take() {
183 let reason = CancelReason::fail_fast().with_region(self.child_region);
184 let _ = state.cancel_request(self.child_region, &reason, None);
185 state.advance_region_state(self.child_region);
186 }
187 }
188}
189
190struct RegionCloseFuture {
191 state: Arc<parking_lot::Mutex<crate::record::region::RegionCloseState>>,
192}
193
194impl Future for RegionCloseFuture {
195 type Output = ();
196
197 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
198 let mut state = self.state.lock();
199 if state.closed {
200 Poll::Ready(())
201 } else {
202 if !state
203 .waker
204 .as_ref()
205 .is_some_and(|w| w.will_wake(cx.waker()))
206 {
207 state.waker = Some(cx.waker().clone());
208 }
209 Poll::Pending
210 }
211 }
212}
213
214impl Drop for RegionCloseFuture {
215 fn drop(&mut self) {
216 let mut state = self.state.lock();
217 state.waker = None;
218 }
219}
220
221impl<P: Policy> Scope<'_, P> {
222 #[must_use]
224 #[allow(dead_code)]
225 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
226 pub(crate) fn new(region: RegionId, budget: Budget) -> Self {
227 Self {
228 region,
229 budget,
230 _policy: PhantomData,
231 }
232 }
233
234 #[must_use]
236 pub fn region_id(&self) -> RegionId {
237 self.region
238 }
239
240 #[must_use]
242 pub fn budget(&self) -> Budget {
243 self.budget
244 }
245
246 pub fn spawn<F, Fut, Caps>(
356 &self,
357 state: &mut RuntimeState,
358 cx: &Cx<Caps>,
359 f: F,
360 ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
361 where
362 Caps: cap::HasSpawn + Send + Sync + 'static,
363 F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
364 Fut: Future + Send + 'static,
365 Fut::Output: Send + 'static,
366 {
367 let (tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();
369
370 let task_id = self.create_task_record(state)?;
372
373 let _span = debug_span!(
375 "task_spawn",
376 task_id = ?task_id,
377 region_id = ?self.region,
378 initial_state = "Created",
379 budget_deadline = ?self.budget.deadline,
380 budget_poll_quota = self.budget.poll_quota,
381 budget_cost_quota = ?self.budget.cost_quota,
382 budget_priority = self.budget.priority,
383 budget_source = "scope"
384 )
385 .entered();
386 debug!(
387 task_id = ?task_id,
388 region_id = ?self.region,
389 initial_state = "Created",
390 budget_deadline = ?self.budget.deadline,
391 budget_poll_quota = self.budget.poll_quota,
392 budget_cost_quota = ?self.budget.cost_quota,
393 budget_priority = self.budget.priority,
394 budget_source = "scope",
395 "task spawned"
396 );
397
398 let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
399
400 let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
402
403 if let Some(record) = state.task_mut(task_id) {
406 record.set_cx_inner(child_cx.inner.clone());
407 record.set_cx(child_cx_full.clone());
408 }
409
410 let cx_for_send = child_cx_full;
412
413 let future = {
418 struct TaskCreationGuard<'a> {
419 state: &'a mut RuntimeState,
420 task_id: TaskId,
421 region_id: RegionId,
422 committed: bool,
423 }
424
425 impl Drop for TaskCreationGuard<'_> {
426 fn drop(&mut self) {
427 if !self.committed {
428 if let Some(region) = self.state.region_mut(self.region_id) {
430 region.remove_task(self.task_id);
431 }
432 self.state.remove_task(self.task_id);
433 }
434 }
435 }
436
437 let mut guard = TaskCreationGuard {
438 state,
439 task_id,
440 region_id: self.region,
441 committed: false,
442 };
443
444 let fut = f(child_cx);
445 guard.committed = true;
446 fut
447 };
448
449 let wrapped = async move {
453 let result_result = CatchUnwind { inner: future }.await;
454 match result_result {
455 Ok(result) => {
456 let _ = tx.send(&cx_for_send, Ok(result));
457 crate::types::Outcome::Ok(())
458 }
459 Err(payload) => {
460 let msg = payload_to_string(&payload);
461 let panic_payload = PanicPayload::new(msg);
462 let _ = tx.send(
463 &cx_for_send,
464 Err(JoinError::Panicked(panic_payload.clone())),
465 );
466 crate::types::Outcome::Panicked(panic_payload)
467 }
468 }
469 };
470
471 let stored = StoredTask::new_with_id(wrapped, task_id);
473
474 Ok((handle, stored))
475 }
476
477 #[inline]
500 pub fn spawn_task<F, Fut, Caps>(
501 &self,
502 state: &mut RuntimeState,
503 cx: &Cx<Caps>,
504 f: F,
505 ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
506 where
507 Caps: cap::HasSpawn + Send + Sync + 'static,
508 F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
509 Fut: Future + Send + 'static,
510 Fut::Output: Send + 'static,
511 {
512 self.spawn(state, cx, f)
513 }
514
515 pub fn spawn_registered<F, Fut, Caps>(
542 &self,
543 state: &mut RuntimeState,
544 cx: &Cx<Caps>,
545 f: F,
546 ) -> Result<TaskHandle<Fut::Output>, SpawnError>
547 where
548 Caps: cap::HasSpawn + Send + Sync + 'static,
549 F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
550 Fut: Future + Send + 'static,
551 Fut::Output: Send + 'static,
552 {
553 let (handle, stored) = self.spawn(state, cx, f)?;
554 state.store_spawned_task(handle.task_id(), stored);
555 Ok(handle)
556 }
557
558 #[allow(clippy::too_many_lines)]
598 pub fn spawn_local<F, Fut, Caps>(
599 &self,
600 state: &mut RuntimeState,
601 cx: &Cx<Caps>,
602 f: F,
603 ) -> Result<TaskHandle<Fut::Output>, SpawnError>
604 where
605 Caps: cap::HasSpawn + Send + Sync + 'static,
606 F: FnOnce(Cx<Caps>) -> Fut + 'static,
607 Fut: Future + 'static,
608 Fut::Output: Send + 'static,
609 {
610 use crate::runtime::stored_task::LocalStoredTask;
611 use crate::runtime::task_handle::JoinError;
612
613 let (result_tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();
615
616 let task_id = self.create_task_record(state)?;
618
619 let _span = debug_span!(
621 "task_spawn",
622 task_id = ?task_id,
623 region_id = ?self.region,
624 initial_state = "Created",
625 budget_deadline = ?self.budget.deadline,
626 budget_poll_quota = self.budget.poll_quota,
627 budget_cost_quota = ?self.budget.cost_quota,
628 budget_priority = self.budget.priority,
629 budget_source = "scope_local"
630 )
631 .entered();
632 debug!(
633 task_id = ?task_id,
634 region_id = ?self.region,
635 initial_state = "Created",
636 budget_deadline = ?self.budget.deadline,
637 budget_poll_quota = self.budget.poll_quota,
638 budget_cost_quota = ?self.budget.cost_quota,
639 budget_priority = self.budget.priority,
640 budget_source = "scope_local",
641 "local task spawned"
642 );
643
644 let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
645
646 let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
648
649 if let Some(record) = state.task_mut(task_id) {
651 record.set_cx_inner(child_cx.inner.clone());
652 record.set_cx(child_cx_full.clone());
653 }
654
655 let cx_for_send = child_cx_full;
657
658 let future = {
661 struct TaskCreationGuard<'a> {
662 state: &'a mut RuntimeState,
663 task_id: TaskId,
664 region_id: RegionId,
665 committed: bool,
666 }
667
668 impl Drop for TaskCreationGuard<'_> {
669 fn drop(&mut self) {
670 if !self.committed {
671 if let Some(region) = self.state.region_mut(self.region_id) {
673 region.remove_task(self.task_id);
674 }
675 self.state.remove_task(self.task_id);
676 }
677 }
678 }
679
680 let mut guard = TaskCreationGuard {
681 state,
682 task_id,
683 region_id: self.region,
684 committed: false,
685 };
686
687 let fut = f(child_cx);
688 guard.committed = true;
689 fut
690 };
691
692 let wrapped = async move {
694 let result_result = CatchUnwind { inner: future }.await;
695 match result_result {
696 Ok(result) => {
697 let _ = result_tx.send(&cx_for_send, Ok(result));
698 crate::types::Outcome::Ok(())
699 }
700 Err(payload) => {
701 let msg = payload_to_string(&payload);
702 let panic_payload = PanicPayload::new(msg);
703 let _ = result_tx.send(
704 &cx_for_send,
705 Err(JoinError::Panicked(panic_payload.clone())),
706 );
707 crate::types::Outcome::Panicked(panic_payload)
708 }
709 }
710 };
711
712 let stored = LocalStoredTask::new_with_id(wrapped, task_id);
714
715 crate::runtime::local::store_local_task(task_id, stored);
717
718 if let Some(record) = state.task_mut(task_id) {
722 if let Some(worker_id) = crate::runtime::scheduler::three_lane::current_worker_id() {
723 record.pin_to_worker(worker_id);
724 } else {
725 record.mark_local();
726 }
727 record.wake_state.notify();
728 }
729
730 let scheduled = crate::runtime::scheduler::three_lane::schedule_local_task(task_id);
733
734 if scheduled {
735 if let Some(record) = state.task(task_id) {
736 let _ = record.wake_state.notify();
737 }
738 return Ok(handle);
739 }
740
741 let _ = crate::runtime::local::remove_local_task(task_id);
743 if let Some(region) = state.region(self.region) {
744 region.remove_task(task_id);
745 }
746 state.remove_task(task_id);
747 Err(SpawnError::LocalSchedulerUnavailable)
748 }
749
750 pub fn spawn_blocking<F, R, Caps>(
784 &self,
785 state: &mut RuntimeState,
786 cx: &Cx<Caps>, f: F,
788 ) -> Result<(TaskHandle<R>, StoredTask), SpawnError>
789 where
790 Caps: cap::HasSpawn + Send + Sync + 'static,
791 F: FnOnce(Cx<Caps>) -> R + Send + 'static,
792 R: Send + 'static,
793 {
794 let (tx, rx) = oneshot::channel::<Result<R, JoinError>>();
796
797 let task_id = self.create_task_record(state)?;
799
800 debug!(
802 task_id = ?task_id,
803 region_id = ?self.region,
804 initial_state = "Created",
805 poll_quota = self.budget.poll_quota,
806 spawn_kind = "blocking",
807 "blocking task spawned"
808 );
809
810 let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
811
812 let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
814
815 if let Some(record) = state.task_mut(task_id) {
817 record.set_cx_inner(child_cx.inner.clone());
818 record.set_cx(child_cx_full.clone());
819 }
820
821 let cx_for_send = child_cx_full;
823
824 let wrapped = async move {
827 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(child_cx)));
830 match result {
831 Ok(res) => {
832 let _ = tx.send(&cx_for_send, Ok(res));
833 crate::types::Outcome::Ok(())
834 }
835 Err(payload) => {
836 let msg = payload_to_string(&payload);
837 let panic_payload = PanicPayload::new(msg);
838 let _ = tx.send(
839 &cx_for_send,
840 Err(JoinError::Panicked(panic_payload.clone())),
841 );
842 crate::types::Outcome::Panicked(panic_payload)
843 }
844 }
845 };
846
847 let stored = StoredTask::new_with_id(wrapped, task_id);
848
849 Ok((handle, stored))
850 }
851
852 pub async fn region<P2, F, Fut, T, Caps>(
869 &self,
870 state: &mut RuntimeState,
871 cx: &Cx<Caps>,
872 policy: P2,
873 f: F,
874 ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
875 where
876 P2: Policy,
877 F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
878 Fut: Future<Output = Outcome<T, P2::Error>>,
879 {
880 self.region_with_budget(state, cx, self.budget, policy, f)
881 .await
882 }
883
884 pub async fn region_with_budget<P2, F, Fut, T, Caps>(
889 &self,
890 state: &mut RuntimeState,
891 _cx: &Cx<Caps>,
892 budget: Budget,
893 _policy: P2,
894 f: F,
895 ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
896 where
897 P2: Policy,
898 F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
899 Fut: Future<Output = Outcome<T, P2::Error>>,
900 {
901 let child_region = state.create_child_region(self.region, budget)?;
902 let child_budget = state
903 .region(child_region)
904 .map_or(self.budget, crate::record::RegionRecord::budget);
905 let child_scope = Scope::<P2>::new(child_region, child_budget);
906
907 let fut = f(child_scope, &mut *state);
908 let pinned_fut = std::pin::pin!(CatchUnwind { inner: fut });
909
910 let runner = RegionRunner {
911 fut: pinned_fut,
912 state: Some(state),
913 child_region,
914 };
915
916 let (result, state) = runner.await;
917 let outcome = match result {
918 Ok(outcome) => outcome,
919 Err(payload) => {
920 let msg = payload_to_string(&payload);
921 Outcome::Panicked(PanicPayload::new(msg))
922 }
923 };
924
925 match &outcome {
926 Outcome::Ok(_) => {
927 if let Some(region) = state.region(child_region) {
928 region.begin_close(None);
929 }
930 }
931 Outcome::Cancelled(reason) => {
932 let _ = state.cancel_request(child_region, reason, None);
933 }
934 Outcome::Err(_) | Outcome::Panicked(_) => {
935 let reason = CancelReason::fail_fast().with_region(child_region);
936 let _ = state.cancel_request(child_region, &reason, None);
937 }
938 }
939
940 let close_notify = state.region(child_region).map(|r| r.close_notify.clone());
941 state.advance_region_state(child_region);
942
943 if let Some(notify) = close_notify {
944 RegionCloseFuture { state: notify }.await;
945 }
946
947 Ok(outcome)
948 }
949
950 pub async fn join<T1, T2>(
966 &self,
967 cx: &Cx,
968 mut h1: TaskHandle<T1>,
969 mut h2: TaskHandle<T2>,
970 ) -> (Result<T1, JoinError>, Result<T2, JoinError>) {
971 let mut f1 = h1.join(cx);
972 let mut f2 = h2.join(cx);
973 let r1 = std::pin::Pin::new(&mut f1).await;
974 let r2 = std::pin::Pin::new(&mut f2).await;
975 (r1, r2)
976 }
977
978 pub async fn race<T>(
992 &self,
993 cx: &Cx,
994 mut h1: TaskHandle<T>,
995 mut h2: TaskHandle<T>,
996 ) -> Result<T, JoinError> {
997 let winner = {
998 let f1 = h1.join_with_drop_reason(cx, CancelReason::race_loser());
999 let mut f1 = std::pin::pin!(f1);
1000 let f2 = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1001 let mut f2 = std::pin::pin!(f2);
1002 Select::new(f1.as_mut(), f2.as_mut())
1003 .await
1004 .map_err(|_| JoinError::PolledAfterCompletion)?
1005 };
1006
1007 match winner {
1008 Either::Left(res) => {
1009 let loser_res = h2.join(cx).await;
1010 if let Err(JoinError::Panicked(p)) = res {
1011 Err(JoinError::Panicked(p))
1012 } else if let Err(JoinError::Panicked(p)) = loser_res {
1013 Err(JoinError::Panicked(p))
1014 } else {
1015 res
1016 }
1017 }
1018 Either::Right(res) => {
1019 let loser_res = h1.join(cx).await;
1020 if let Err(JoinError::Panicked(p)) = res {
1021 Err(JoinError::Panicked(p))
1022 } else if let Err(JoinError::Panicked(p)) = loser_res {
1023 Err(JoinError::Panicked(p))
1024 } else {
1025 res
1026 }
1027 }
1028 }
1029 }
1030
1031 pub async fn hedge<F1, Fut1, F2, Fut2, T>(
1050 &self,
1051 state: &mut RuntimeState,
1052 cx: &Cx,
1053 delay: std::time::Duration,
1054 primary: F1,
1055 backup: F2,
1056 ) -> Result<T, JoinError>
1057 where
1058 F1: FnOnce(Cx) -> Fut1 + Send + 'static,
1059 Fut1: Future<Output = T> + Send + 'static,
1060 F2: FnOnce(Cx) -> Fut2 + Send + 'static,
1061 Fut2: Future<Output = T> + Send + 'static,
1062 T: Send + 'static,
1063 {
1064 use crate::combinator::Either;
1065 use crate::combinator::select::Select;
1066 let mut h1 = self
1068 .spawn_registered(state, cx, primary)
1069 .map_err(|_| JoinError::Cancelled(CancelReason::resource_unavailable()))?;
1070
1071 let primary_or_delay = {
1074 let f1_primary = h1.join(cx);
1075 let mut f1_primary = std::pin::pin!(f1_primary);
1076
1077 let now = cx
1078 .timer_driver()
1079 .map_or_else(crate::time::wall_now, |d| d.now());
1080 let sleep_fut = crate::time::sleep(now, delay);
1081 let mut sleep_pinned = std::pin::pin!(sleep_fut);
1082
1083 let res = Select::new(f1_primary.as_mut(), sleep_pinned.as_mut())
1084 .await
1085 .map_err(|_| JoinError::PolledAfterCompletion)?;
1086 if matches!(res, Either::Right(())) {
1087 f1_primary.defuse_drop_abort();
1088 }
1089 res
1090 };
1091
1092 match primary_or_delay {
1093 Either::Left(res) => {
1094 res
1096 }
1097 Either::Right(()) => {
1098 let Ok(mut h2) = self.spawn_registered(state, cx, backup) else {
1100 h1.abort_with_reason(CancelReason::resource_unavailable());
1103
1104 if crate::runtime::scheduler::three_lane::current_worker_id().is_some() {
1105 match h1.join(cx).await {
1108 Ok(res) => return Ok(res),
1109 Err(JoinError::Panicked(p)) => return Err(JoinError::Panicked(p)),
1110 Err(JoinError::Cancelled(_) | JoinError::PolledAfterCompletion) => {}
1111 }
1112 } else {
1113 let mut drain = std::pin::pin!(h1.join(cx));
1117 let waker = std::task::Waker::noop();
1118 let mut poll_cx = Context::from_waker(waker);
1119 match drain.as_mut().poll(&mut poll_cx) {
1120 std::task::Poll::Ready(Ok(res)) => return Ok(res),
1121 std::task::Poll::Ready(Err(JoinError::Panicked(p))) => {
1122 return Err(JoinError::Panicked(p));
1123 }
1124 _ => {}
1125 }
1126 }
1127
1128 return Err(JoinError::Cancelled(CancelReason::resource_unavailable()));
1129 };
1130
1131 let race_outcome = {
1133 let f1_race = h1.join_with_drop_reason(cx, CancelReason::race_loser());
1134 let mut f1_race = std::pin::pin!(f1_race);
1135 let f2_race = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1136 let mut f2_race = std::pin::pin!(f2_race);
1137 Select::new(f1_race.as_mut(), f2_race.as_mut())
1138 .await
1139 .map_err(|_| JoinError::PolledAfterCompletion)?
1140 };
1141
1142 match race_outcome {
1143 Either::Left(res) => {
1144 let loser_res = h2.join(cx).await;
1145 if let Err(JoinError::Panicked(p)) = res {
1146 Err(JoinError::Panicked(p))
1147 } else if let Err(JoinError::Panicked(p)) = loser_res {
1148 Err(JoinError::Panicked(p))
1149 } else {
1150 res
1151 }
1152 }
1153 Either::Right(res) => {
1154 let loser_res = h1.join(cx).await;
1155 if let Err(JoinError::Panicked(p)) = res {
1156 Err(JoinError::Panicked(p))
1157 } else if let Err(JoinError::Panicked(p)) = loser_res {
1158 Err(JoinError::Panicked(p))
1159 } else {
1160 res
1161 }
1162 }
1163 }
1164 }
1165 }
1166 }
1167
1168 pub async fn race_all<T>(
1180 &self,
1181 cx: &Cx,
1182 handles: Vec<TaskHandle<T>>,
1183 ) -> Result<(T, usize), JoinError> {
1184 let mut handles = handles;
1185 if handles.is_empty() {
1186 return std::future::pending().await;
1187 }
1188
1189 let mut futures: Vec<_> = handles
1190 .iter_mut()
1191 .map(|h| h.join_with_drop_reason(cx, CancelReason::race_loser()))
1192 .collect();
1193 let mut ready_results: Vec<Option<Result<T, JoinError>>> = std::iter::repeat_with(|| None)
1194 .take(futures.len())
1195 .collect();
1196 let mut winner_idx = None;
1197
1198 let winner_idx = std::future::poll_fn(|poll_cx| {
1202 let mut newly_ready = Vec::new();
1203
1204 for (i, future) in futures.iter_mut().enumerate() {
1205 if ready_results[i].is_some() {
1206 continue;
1207 }
1208 if let std::task::Poll::Ready(res) = std::pin::Pin::new(future).poll(poll_cx) {
1209 ready_results[i] = Some(res);
1210 newly_ready.push(i);
1211 }
1212 }
1213
1214 if let Some(existing) = winner_idx {
1215 return std::task::Poll::Ready(existing);
1216 }
1217
1218 if newly_ready.is_empty() {
1219 std::task::Poll::Pending
1220 } else {
1221 let chosen = newly_ready[cx.random_usize(newly_ready.len())];
1223 winner_idx = Some(chosen);
1224 std::task::Poll::Ready(chosen)
1225 }
1226 })
1227 .await;
1228
1229 let winner_result = ready_results[winner_idx]
1230 .take()
1231 .expect("winner index must have a ready result");
1232
1233 drop(futures);
1236
1237 let mut loser_panic = None;
1240 let mut pending_loser_indices = Vec::new();
1241 for (i, handle) in handles.iter_mut().enumerate() {
1242 if i == winner_idx {
1243 continue;
1244 }
1245 if let Some(res) = ready_results[i].take() {
1246 if let Err(JoinError::Panicked(p)) = res {
1247 if loser_panic.is_none() {
1248 loser_panic = Some(p);
1249 }
1250 }
1251 } else if handle.is_finished() {
1252 let res = handle.join(cx).await;
1253 if let Err(JoinError::Panicked(p)) = res {
1254 if loser_panic.is_none() {
1255 loser_panic = Some(p);
1256 }
1257 }
1258 } else {
1259 pending_loser_indices.push(i);
1260 }
1261 }
1262
1263 for &idx in &pending_loser_indices {
1267 handles[idx].abort_with_reason(CancelReason::race_loser());
1268 }
1269 for idx in pending_loser_indices {
1270 let res = handles[idx].join(cx).await;
1271 if let Err(JoinError::Panicked(p)) = res {
1272 if loser_panic.is_none() {
1273 loser_panic = Some(p);
1274 }
1275 }
1276 }
1277
1278 loser_panic.map_or_else(
1279 || winner_result.map(|val| (val, winner_idx)),
1280 |p| Err(JoinError::Panicked(p)),
1281 )
1282 }
1283
1284 pub async fn join_all<T>(
1288 &self,
1289 cx: &Cx,
1290 mut handles: Vec<TaskHandle<T>>,
1291 ) -> Vec<Result<T, JoinError>> {
1292 let mut futures: Vec<_> = handles.iter_mut().map(|h| h.join(cx)).collect();
1293 let mut results = Vec::with_capacity(futures.len());
1294 for fut in &mut futures {
1295 results.push(std::pin::Pin::new(fut).await);
1296 }
1297 results
1298 }
1299
1300 pub(crate) fn build_child_task_cx<Caps>(
1301 &self,
1302 state: &RuntimeState,
1303 parent_cx: &Cx<Caps>,
1304 task_id: TaskId,
1305 ) -> (Cx<Caps>, Cx<cap::All>) {
1306 let child_observability = parent_cx.child_observability(self.region, task_id);
1307 let child_entropy = parent_cx.child_entropy(task_id);
1308 let io_driver = state.io_driver_handle();
1309 let timer_driver = state.timer_driver_handle();
1310 let logical_clock = state
1311 .logical_clock_mode()
1312 .build_handle(timer_driver.clone());
1313
1314 let child_cx = Cx::<Caps>::new_with_drivers(
1315 self.region,
1316 task_id,
1317 self.budget,
1318 Some(child_observability),
1319 io_driver,
1320 None,
1321 timer_driver,
1322 Some(child_entropy),
1323 )
1324 .with_logical_clock(logical_clock)
1325 .with_registry_handle(parent_cx.registry_handle())
1326 .with_remote_cap_handle(parent_cx.remote_cap_handle())
1327 .with_blocking_pool_handle(parent_cx.blocking_pool_handle())
1328 .with_evidence_sink(parent_cx.evidence_sink_handle());
1329 child_cx.set_trace_buffer(state.trace_handle());
1330 let child_cx_full = child_cx.retype::<cap::All>();
1331
1332 (child_cx, child_cx_full)
1333 }
1334
1335 pub(crate) fn create_task_record(
1339 &self,
1340 state: &mut RuntimeState,
1341 ) -> Result<TaskId, SpawnError> {
1342 use crate::util::ArenaIndex;
1343
1344 let idx = state.insert_task(TaskRecord::new_with_time(
1346 TaskId::from_arena(ArenaIndex::new(0, 0)), self.region,
1348 self.budget,
1349 state.now,
1350 ));
1351
1352 let task_id = TaskId::from_arena(idx);
1354
1355 if let Some(record) = state.task_mut(task_id) {
1357 record.id = task_id;
1358 }
1359
1360 if let Some(region) = state.region(self.region) {
1362 if let Err(err) = region.add_task(task_id) {
1363 state.remove_task(task_id);
1365 return Err(match err {
1366 AdmissionError::Closed => SpawnError::RegionClosed(self.region),
1367 AdmissionError::LimitReached { limit, live, .. } => {
1368 SpawnError::RegionAtCapacity {
1369 region: self.region,
1370 limit,
1371 live,
1372 }
1373 }
1374 });
1375 }
1376 } else {
1377 state.remove_task(task_id);
1379 return Err(SpawnError::RegionNotFound(self.region));
1380 }
1381
1382 state.record_task_spawn(task_id, self.region);
1383
1384 Ok(task_id)
1385 }
1386
1387 pub fn defer_sync<F>(&self, state: &mut RuntimeState, f: F) -> bool
1411 where
1412 F: FnOnce() + Send + 'static,
1413 {
1414 state.register_sync_finalizer(self.region, f)
1415 }
1416
1417 pub fn defer_async<F>(&self, state: &mut RuntimeState, future: F) -> bool
1438 where
1439 F: Future<Output = ()> + Send + 'static,
1440 {
1441 state.register_async_finalizer(self.region, future)
1442 }
1443}
1444
1445impl<P: Policy> std::fmt::Debug for Scope<'_, P> {
1446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447 f.debug_struct("Scope")
1448 .field("region", &self.region)
1449 .field("budget", &self.budget)
1450 .finish()
1451 }
1452}
1453
1454#[cfg(test)]
1455mod tests {
1456 use super::*;
1457 use crate::record::RegionLimits;
1458 use crate::runtime::RuntimeState;
1459 use crate::types::{CancelKind, Outcome};
1460 use crate::util::ArenaIndex;
1461 use futures_lite::future::block_on;
1462 use std::sync::Arc;
1463
1464 fn test_cx() -> Cx {
1465 Cx::new(
1466 RegionId::from_arena(ArenaIndex::new(0, 0)),
1467 TaskId::from_arena(ArenaIndex::new(0, 0)),
1468 Budget::INFINITE,
1469 )
1470 }
1471
1472 fn test_scope(region: RegionId, budget: Budget) -> Scope<'static> {
1473 Scope::new(region, budget)
1474 }
1475
1476 #[test]
1477 fn spawn_creates_task_record() {
1478 let mut state = RuntimeState::new();
1479 let cx = test_cx();
1480 let region = state.create_root_region(Budget::INFINITE);
1481 let scope = test_scope(region, Budget::INFINITE);
1482
1483 let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1484
1485 let task = state.task(handle.task_id());
1487 assert!(task.is_some());
1488
1489 let task = task.unwrap();
1491 assert_eq!(task.owner, region);
1492 }
1493
1494 #[test]
1495 fn spawn_inherits_registry_and_remote_capabilities() {
1496 use crate::cx::registry::RegistryHandle;
1497 use crate::remote::{NodeId, RemoteCap};
1498 use std::task::{Context, Waker};
1499
1500 struct NoopWaker;
1501 impl std::task::Wake for NoopWaker {
1502 fn wake(self: Arc<Self>) {}
1503 }
1504
1505 let mut state = RuntimeState::new();
1506
1507 let registry = crate::cx::NameRegistry::new();
1508 let registry_handle = RegistryHandle::new(Arc::new(registry));
1509 let parent_registry_arc = registry_handle.as_arc();
1510
1511 let cx = test_cx()
1512 .with_registry_handle(Some(registry_handle))
1513 .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")));
1514
1515 let region = state.create_root_region(Budget::INFINITE);
1516 let scope = test_scope(region, Budget::INFINITE);
1517
1518 let mut handle = scope
1519 .spawn_registered(&mut state, &cx, move |cx| async move {
1520 let child_registry = cx.registry_handle().expect("child must inherit registry");
1521 let child_registry_arc = child_registry.as_arc();
1522 let same_registry = Arc::ptr_eq(&child_registry_arc, &parent_registry_arc);
1523
1524 let child_remote = cx.remote().expect("child must inherit remote cap");
1525 let origin = child_remote.local_node().as_str().to_owned();
1526
1527 (same_registry, origin)
1528 })
1529 .unwrap();
1530
1531 let waker = Waker::from(Arc::new(NoopWaker));
1532 let mut poll_cx = Context::from_waker(&waker);
1533
1534 let stored = state
1535 .get_stored_future(handle.task_id())
1536 .expect("spawn_registered must store the task");
1537 assert!(stored.poll(&mut poll_cx).is_ready());
1538
1539 let mut join_fut = std::pin::pin!(handle.join(&cx));
1540 match join_fut.as_mut().poll(&mut poll_cx) {
1541 Poll::Ready(Ok((same_registry, origin))) => {
1542 assert!(
1543 same_registry,
1544 "child should observe the same RegistryCap instance"
1545 );
1546 assert_eq!(origin, "origin-test");
1547 }
1548 other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1549 }
1550 }
1551
1552 #[test]
1553 fn spawn_inherits_runtime_timer_driver() {
1554 use std::task::{Context, Waker};
1555
1556 struct NoopWaker;
1557 impl std::task::Wake for NoopWaker {
1558 fn wake(self: Arc<Self>) {}
1559 }
1560
1561 let mut state = RuntimeState::new();
1562 let clock = Arc::new(crate::time::VirtualClock::new());
1563 state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
1564
1565 let cx = test_cx();
1566 let region = state.create_root_region(Budget::INFINITE);
1567 let scope = test_scope(region, Budget::INFINITE);
1568
1569 let (mut handle, mut stored) = scope
1570 .spawn(&mut state, &cx, |cx| async move { cx.has_timer() })
1571 .expect("spawn should succeed");
1572
1573 let waker = Waker::from(Arc::new(NoopWaker));
1574 let mut poll_cx = Context::from_waker(&waker);
1575 assert!(stored.poll(&mut poll_cx).is_ready());
1576
1577 let mut join_fut = std::pin::pin!(handle.join(&cx));
1578 match join_fut.as_mut().poll(&mut poll_cx) {
1579 Poll::Ready(Ok(has_timer)) => assert!(has_timer),
1580 other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1581 }
1582 }
1583
1584 #[test]
1585 fn spawn_blocking_inherits_runtime_timer_driver() {
1586 use std::task::{Context, Waker};
1587
1588 struct NoopWaker;
1589 impl std::task::Wake for NoopWaker {
1590 fn wake(self: Arc<Self>) {}
1591 }
1592
1593 let mut state = RuntimeState::new();
1594 let clock = Arc::new(crate::time::VirtualClock::new());
1595 state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
1596
1597 let cx = test_cx();
1598 let region = state.create_root_region(Budget::INFINITE);
1599 let scope = test_scope(region, Budget::INFINITE);
1600
1601 let (mut handle, mut stored) = scope
1602 .spawn_blocking(&mut state, &cx, |cx| cx.has_timer())
1603 .expect("spawn_blocking should succeed");
1604
1605 let waker = Waker::from(Arc::new(NoopWaker));
1606 let mut poll_cx = Context::from_waker(&waker);
1607 assert!(stored.poll(&mut poll_cx).is_ready());
1608
1609 let mut join_fut = std::pin::pin!(handle.join(&cx));
1610 match join_fut.as_mut().poll(&mut poll_cx) {
1611 Poll::Ready(Ok(has_timer)) => assert!(has_timer),
1612 other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1613 }
1614 }
1615
1616 #[test]
1617 fn spawn_registered_stores_task() {
1618 let mut state = RuntimeState::new();
1619 let cx = test_cx();
1620 let region = state.create_root_region(Budget::INFINITE);
1621 let scope = test_scope(region, Budget::INFINITE);
1622
1623 let handle = scope
1625 .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1626 .unwrap();
1627
1628 let task = state.task(handle.task_id());
1630 assert!(task.is_some());
1631 assert_eq!(task.unwrap().owner, region);
1632
1633 let stored = state.get_stored_future(handle.task_id());
1635 assert!(stored.is_some(), "spawn_registered should store the task");
1636 }
1637
1638 #[test]
1639 fn spawn_registered_task_can_be_polled() {
1640 use std::sync::Arc;
1641 use std::task::{Context, Waker};
1642
1643 struct NoopWaker;
1644 impl std::task::Wake for NoopWaker {
1645 fn wake(self: Arc<Self>) {}
1646 }
1647
1648 let mut state = RuntimeState::new();
1649 let cx = test_cx();
1650 let region = state.create_root_region(Budget::INFINITE);
1651 let scope = test_scope(region, Budget::INFINITE);
1652
1653 let mut handle = scope
1654 .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1655 .unwrap();
1656
1657 let waker = Waker::from(Arc::new(NoopWaker));
1659 let mut poll_cx = Context::from_waker(&waker);
1660
1661 let stored = state.get_stored_future(handle.task_id()).unwrap();
1662 let poll_result = stored.poll(&mut poll_cx);
1663 assert!(
1664 poll_result.is_ready(),
1665 "Simple async should complete in one poll"
1666 );
1667
1668 let mut join_fut = std::pin::pin!(handle.join(&cx));
1670 match join_fut.as_mut().poll(&mut poll_cx) {
1671 Poll::Ready(Ok(val)) => assert_eq!(val, 42),
1672 other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
1673 }
1674 }
1675
1676 #[test]
1677 fn spawn_blocking_creates_task_record() {
1678 let mut state = RuntimeState::new();
1679 let cx = test_cx();
1680 let region = state.create_root_region(Budget::INFINITE);
1681 let scope = test_scope(region, Budget::INFINITE);
1682
1683 let (handle, _stored) = scope.spawn_blocking(&mut state, &cx, |_| 42_i32).unwrap();
1684
1685 let task = state.task(handle.task_id());
1687 assert!(task.is_some());
1688 assert_eq!(task.unwrap().owner, region);
1689 }
1690
1691 #[test]
1692 fn spawn_local_creates_task_record() {
1693 let mut state = RuntimeState::new();
1694 let cx = test_cx();
1695 let region = state.create_root_region(Budget::INFINITE);
1696 let scope = test_scope(region, Budget::INFINITE);
1697
1698 let local_ready = Arc::new(parking_lot::Mutex::new(Vec::new()));
1699 let _local_ready_guard =
1700 crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1701 let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1702
1703 let handle = scope
1706 .spawn_local(&mut state, &cx, |_| async move { 42_i32 })
1707 .unwrap();
1708
1709 let task = state.task(handle.task_id());
1711 assert!(task.is_some());
1712 assert_eq!(task.unwrap().owner, region);
1713 }
1714
1715 #[test]
1716 fn spawn_local_without_scheduler_fails_and_rolls_back() {
1717 let mut state = RuntimeState::new();
1718 let cx = test_cx();
1719 let region = state.create_root_region(Budget::INFINITE);
1720 let scope = test_scope(region, Budget::INFINITE);
1721
1722 let result = scope.spawn_local(&mut state, &cx, |_| async move { 5_i32 });
1723 assert!(matches!(result, Err(SpawnError::LocalSchedulerUnavailable)));
1724
1725 assert!(state.tasks_is_empty());
1727 let region_record = state.region(region).unwrap();
1728 assert!(region_record.task_ids().is_empty());
1729 }
1730
1731 #[test]
1732 fn spawn_local_makes_progress_via_local_ready() {
1733 use std::sync::Arc;
1734 use std::task::{Context, Waker};
1735
1736 struct NoopWaker;
1737 impl std::task::Wake for NoopWaker {
1738 fn wake(self: Arc<Self>) {}
1739 }
1740
1741 let mut state = RuntimeState::new();
1742 let cx = test_cx();
1743 let region = state.create_root_region(Budget::INFINITE);
1744 let scope = test_scope(region, Budget::INFINITE);
1745
1746 let local_ready = Arc::new(parking_lot::Mutex::new(Vec::new()));
1747 let _local_ready_guard =
1748 crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1749 let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1750
1751 let mut handle = scope
1752 .spawn_local(&mut state, &cx, |_| async move { 7_i32 })
1753 .unwrap();
1754
1755 let queued = {
1756 let queue = local_ready.lock();
1757 queue.contains(&handle.task_id())
1758 };
1759 assert!(queued, "spawn_local should enqueue into local_ready");
1760
1761 let task_id = {
1762 let mut queue = local_ready.lock();
1763 queue.remove(0)
1764 };
1765
1766 let mut join_fut = std::pin::pin!(handle.join(&cx));
1767 let waker = Waker::from(Arc::new(NoopWaker));
1768 let mut ctx = Context::from_waker(&waker);
1769
1770 assert!(join_fut.as_mut().poll(&mut ctx).is_pending());
1771
1772 let mut local_task =
1773 crate::runtime::local::remove_local_task(task_id).expect("local task missing");
1774 assert!(local_task.poll(&mut ctx).is_ready());
1775
1776 match join_fut.as_mut().poll(&mut ctx) {
1777 Poll::Ready(Ok(val)) => assert_eq!(val, 7),
1778 res => unreachable!("Expected Ready(Ok(7)), got {res:?}"),
1779 }
1780 }
1781
1782 #[test]
1783 fn task_added_to_region() {
1784 let mut state = RuntimeState::new();
1785 let cx = test_cx();
1786 let region = state.create_root_region(Budget::INFINITE);
1787 let scope = test_scope(region, Budget::INFINITE);
1788
1789 let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1790
1791 let region_record = state.region(region).unwrap();
1793 assert!(region_record.task_ids().contains(&handle.task_id()));
1794 }
1795
1796 #[test]
1797 fn multiple_spawns_create_distinct_tasks() {
1798 let mut state = RuntimeState::new();
1799 let cx = test_cx();
1800 let region = state.create_root_region(Budget::INFINITE);
1801 let scope = test_scope(region, Budget::INFINITE);
1802
1803 let (handle1, _) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
1804 let (handle2, _) = scope.spawn(&mut state, &cx, |_| async { 2_i32 }).unwrap();
1805 let (handle3, _) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
1806
1807 assert_ne!(handle1.task_id(), handle2.task_id());
1809 assert_ne!(handle2.task_id(), handle3.task_id());
1810 assert_ne!(handle1.task_id(), handle3.task_id());
1811
1812 let region_record = state.region(region).unwrap();
1814 assert!(region_record.task_ids().contains(&handle1.task_id()));
1815 assert!(region_record.task_ids().contains(&handle2.task_id()));
1816 assert!(region_record.task_ids().contains(&handle3.task_id()));
1817 }
1818
1819 #[test]
1820 fn spawn_into_closing_region_should_fail() {
1821 let mut state = RuntimeState::new();
1822 let cx = test_cx();
1823 let region = state.create_root_region(Budget::INFINITE);
1824 let scope = test_scope(region, Budget::INFINITE);
1825
1826 let region_record = state.region_mut(region).expect("region");
1828 region_record.begin_close(None);
1829
1830 let result = scope.spawn(&mut state, &cx, |_| async { 42 });
1832 assert!(matches!(result, Err(SpawnError::RegionClosed(_))));
1833 }
1834
1835 #[test]
1836 fn test_join_manual_poll() {
1837 use std::sync::Arc;
1838 use std::task::{Context, Waker};
1839
1840 struct NoopWaker;
1841 impl std::task::Wake for NoopWaker {
1842 fn wake(self: Arc<Self>) {}
1843 }
1844
1845 let mut state = RuntimeState::new();
1846 let cx = test_cx();
1847 let region = state.create_root_region(Budget::INFINITE);
1848 let scope = test_scope(region, Budget::INFINITE);
1849
1850 let (mut handle, mut stored_task) =
1852 scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1853 let mut join_fut = std::pin::pin!(handle.join(&cx));
1857
1858 let waker = Waker::from(Arc::new(NoopWaker));
1860 let mut ctx = Context::from_waker(&waker);
1861
1862 assert!(join_fut.as_mut().poll(&mut ctx).is_pending());
1864
1865 assert!(stored_task.poll(&mut ctx).is_ready());
1867
1868 match join_fut.as_mut().poll(&mut ctx) {
1870 Poll::Ready(Ok(val)) => assert_eq!(val, 42),
1871 other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
1872 }
1873 }
1874
1875 #[test]
1876 fn spawn_abort_cancels_task() {
1877 use std::sync::Arc;
1878 use std::task::{Context, Poll, Waker};
1879
1880 struct NoopWaker;
1881 impl std::task::Wake for NoopWaker {
1882 fn wake(self: Arc<Self>) {}
1883 }
1884
1885 let mut state = RuntimeState::new();
1886 let cx = test_cx();
1887 let region = state.create_root_region(Budget::INFINITE);
1888 let scope = test_scope(region, Budget::INFINITE);
1889
1890 let (mut handle, mut stored_task) = scope
1892 .spawn(&mut state, &cx, |cx| async move {
1893 if cx.checkpoint().is_err() {
1895 return "cancelled";
1896 }
1897 "finished"
1898 })
1899 .unwrap();
1900
1901 handle.abort();
1903
1904 let waker = Waker::from(Arc::new(NoopWaker));
1906 let mut ctx = Context::from_waker(&waker);
1907
1908 match stored_task.poll(&mut ctx) {
1910 Poll::Ready(crate::types::Outcome::Ok(())) => {}
1911 res => unreachable!("Task should have completed with Ok(()), got {res:?}"),
1912 }
1913
1914 let mut join_fut = std::pin::pin!(handle.join(&cx));
1916 match join_fut.as_mut().poll(&mut ctx) {
1917 Poll::Ready(Ok(val)) => assert_eq!(val, "cancelled"),
1918 Poll::Ready(Err(e)) => unreachable!("Task failed unexpectedly: {e}"),
1919 Poll::Pending => unreachable!("Join should be ready"),
1920 }
1921 }
1922
1923 #[test]
1924 fn hedge_backup_spawn_failure_aborts_primary() {
1925 let mut state = RuntimeState::new();
1926 let cx = test_cx();
1927 let region = state.create_root_region(Budget::INFINITE);
1928 let scope = test_scope(region, Budget::INFINITE);
1929
1930 let limits = RegionLimits {
1931 max_tasks: Some(1),
1932 ..RegionLimits::unlimited()
1933 };
1934 assert!(state.set_region_limits(region, limits));
1935
1936 let result = block_on(scope.hedge(
1937 &mut state,
1938 &cx,
1939 std::time::Duration::ZERO,
1940 |_| async { 1_u8 },
1941 |_| async { 2_u8 },
1942 ));
1943
1944 assert!(matches!(
1945 result,
1946 Err(JoinError::Cancelled(reason))
1947 if reason.kind == CancelKind::ResourceUnavailable
1948 ));
1949
1950 let task_id = *state
1951 .region(region)
1952 .expect("region missing")
1953 .task_ids()
1954 .first()
1955 .expect("primary task should remain tracked");
1956
1957 let task = state.task(task_id).expect("primary task record missing");
1958 let (cancel_requested, cancel_reason_kind) = {
1959 let inner = task
1960 .cx_inner
1961 .as_ref()
1962 .expect("primary task must have shared Cx inner")
1963 .read();
1964 (
1965 inner.cancel_requested,
1966 inner.cancel_reason.as_ref().map(|r| r.kind),
1967 )
1968 };
1969
1970 assert!(
1971 cancel_requested,
1972 "primary task must be cancellation-requested when backup spawn fails"
1973 );
1974 assert_eq!(cancel_reason_kind, Some(CancelKind::ResourceUnavailable));
1975 }
1976
1977 #[test]
1978 fn region_closes_empty_child() {
1979 let mut state = RuntimeState::new();
1980 let cx = test_cx();
1981 let parent = state.create_root_region(Budget::INFINITE);
1982 let scope = test_scope(parent, Budget::INFINITE);
1983
1984 let outcome = block_on(scope.region(
1985 &mut state,
1986 &cx,
1987 crate::types::policy::FailFast,
1988 |child, _state| {
1989 let child_id = child.region_id();
1990 async move { Outcome::Ok(child_id) }
1991 },
1992 ))
1993 .expect("child region created");
1994
1995 let child_id = match outcome {
1996 Outcome::Ok(id) => id,
1997 other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
1998 };
1999
2000 assert!(
2001 state.region(child_id).is_none(),
2002 "closed child region should be reclaimed from arena"
2003 );
2004
2005 let parent_record = state.region(parent).expect("parent record missing");
2006 assert!(
2007 !parent_record.child_ids().contains(&child_id),
2008 "closed child should be removed from parent"
2009 );
2010 }
2011
2012 #[test]
2013 fn region_budget_is_met_with_parent() {
2014 let mut state = RuntimeState::new();
2015 let cx = test_cx();
2016 let parent = state.create_root_region(Budget::with_deadline_secs(10));
2017 let scope = test_scope(parent, Budget::with_deadline_secs(10));
2018
2019 let outcome = block_on(scope.region_with_budget(
2020 &mut state,
2021 &cx,
2022 Budget::with_deadline_secs(30),
2023 crate::types::policy::FailFast,
2024 |child, _state| {
2025 let child_id = child.region_id();
2026 let child_budget = child.budget();
2027 async move { Outcome::Ok((child_id, child_budget)) }
2028 },
2029 ))
2030 .expect("child region created");
2031
2032 let (child_id, child_budget) = match outcome {
2033 Outcome::Ok(tuple) => tuple,
2034 other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
2035 };
2036
2037 assert_eq!(
2038 child_budget.deadline,
2039 Some(crate::types::Time::from_secs(10))
2040 );
2041 assert!(
2042 state.region(child_id).is_none(),
2043 "closed child region should be reclaimed from arena"
2044 );
2045 }
2046
2047 #[test]
2048 fn region_spawns_tasks_in_child() {
2049 use std::task::{Context, Poll, Waker};
2050
2051 struct NoopWaker;
2052 impl std::task::Wake for NoopWaker {
2053 fn wake(self: Arc<Self>) {}
2054 }
2055
2056 let mut state = RuntimeState::new();
2057 let cx = test_cx();
2058 let parent = state.create_root_region(Budget::INFINITE);
2059 let scope = test_scope(parent, Budget::INFINITE);
2060
2061 let outcome = block_on(scope.region(
2062 &mut state,
2063 &cx,
2064 crate::types::policy::FailFast,
2065 |child, state| {
2066 let child_id = child.region_id();
2067 let (handle, mut stored) = child
2068 .spawn(state, &cx, |_| async { 7_i32 })
2069 .expect("spawn in child");
2070
2071 let parent_has = state
2072 .region(parent)
2073 .expect("parent record missing")
2074 .task_ids()
2075 .contains(&handle.task_id());
2076 let child_has = state
2077 .region(child_id)
2078 .expect("child record missing")
2079 .task_ids()
2080 .contains(&handle.task_id());
2081
2082 let waker = Waker::from(Arc::new(NoopWaker));
2083 let mut poll_cx = Context::from_waker(&waker);
2084 let poll_result = stored.poll(&mut poll_cx);
2085 if let Poll::Ready(outcome) = poll_result {
2086 let task_outcome = match outcome {
2087 Outcome::Ok(()) => Outcome::Ok(()),
2088 Outcome::Panicked(payload) => Outcome::Panicked(payload),
2089 other => unreachable!("unexpected task outcome: {other:?}"),
2090 };
2091 if let Some(task_record) = state.task_mut(handle.task_id()) {
2092 task_record.complete(task_outcome);
2093 }
2094 let _ = state.task_completed(handle.task_id());
2095 }
2096
2097 std::future::ready(Outcome::Ok((child_id, parent_has, child_has)))
2098 },
2099 ))
2100 .expect("child region created");
2101
2102 let (child_id, parent_has, child_has) = match outcome {
2103 Outcome::Ok(tuple) => tuple,
2104 other => unreachable!("expected Outcome::Ok(tuple), got {other:?}"),
2105 };
2106
2107 assert!(!parent_has, "task should not be owned by parent region");
2108 assert!(child_has, "task should be owned by child region");
2109
2110 let parent_record = state.region(parent).expect("parent record missing");
2111 assert!(
2112 !parent_record.child_ids().contains(&child_id),
2113 "closed child should be removed from parent"
2114 );
2115 }
2116
2117 #[test]
2118 fn spawn_panic_propagates_as_panicked_error() {
2119 use std::sync::Arc;
2120 use std::task::{Context, Poll, Waker};
2121
2122 struct NoopWaker;
2123 impl std::task::Wake for NoopWaker {
2124 fn wake(self: Arc<Self>) {}
2125 }
2126
2127 let mut state = RuntimeState::new();
2128 let cx = test_cx();
2129 let region = state.create_root_region(Budget::INFINITE);
2130 let scope = test_scope(region, Budget::INFINITE);
2131
2132 let (mut handle, mut stored_task) = scope
2133 .spawn(&mut state, &cx, |_| async {
2134 std::panic::panic_any("oops");
2135 })
2136 .unwrap();
2137
2138 let waker = Waker::from(Arc::new(NoopWaker));
2140 let mut ctx = Context::from_waker(&waker);
2141
2142 match stored_task.poll(&mut ctx) {
2144 Poll::Ready(crate::types::Outcome::Panicked(_)) => {}
2145 res => unreachable!("Task should have completed with Panicked, got {res:?}"),
2146 }
2147
2148 let mut join_fut = std::pin::pin!(handle.join(&cx));
2150 match join_fut.as_mut().poll(&mut ctx) {
2151 Poll::Ready(Err(JoinError::Panicked(p))) => {
2152 assert_eq!(p.message(), "oops");
2153 }
2154 res => unreachable!("Expected Panicked, got {res:?}"),
2155 }
2156 }
2157
2158 #[test]
2159 fn join_all_success() {
2160 use std::sync::Arc;
2161 use std::task::{Context, Poll, Waker};
2162
2163 struct NoopWaker;
2164 impl std::task::Wake for NoopWaker {
2165 fn wake(self: Arc<Self>) {}
2166 }
2167
2168 let mut state = RuntimeState::new();
2169 let cx = test_cx();
2170 let region = state.create_root_region(Budget::INFINITE);
2171 let scope = test_scope(region, Budget::INFINITE);
2172
2173 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
2174 let (h2, mut t2) = scope.spawn(&mut state, &cx, |_| async { 2 }).unwrap();
2175
2176 let waker = Waker::from(Arc::new(NoopWaker));
2178 let mut ctx = Context::from_waker(&waker);
2179 assert!(t1.poll(&mut ctx).is_ready());
2180 assert!(t2.poll(&mut ctx).is_ready());
2181
2182 let handles = vec![h1, h2];
2183 let mut fut = Box::pin(scope.join_all(&cx, handles));
2184
2185 match fut.as_mut().poll(&mut ctx) {
2186 Poll::Ready(results) => {
2187 assert_eq!(results.len(), 2);
2188 assert_eq!(results[0].as_ref().unwrap(), &1);
2189 assert_eq!(results[1].as_ref().unwrap(), &2);
2190 }
2191 Poll::Pending => unreachable!("join_all should be ready"),
2192 }
2193 }
2194
2195 #[test]
2196 fn race_all_aborted_task_is_drained() {
2197 use std::sync::Arc;
2198 use std::task::{Context, Poll, Waker};
2199
2200 struct NoopWaker;
2201 impl std::task::Wake for NoopWaker {
2202 fn wake(self: Arc<Self>) {}
2203 }
2204
2205 let mut state = RuntimeState::new();
2206 let cx = test_cx();
2207 let region = state.create_root_region(Budget::INFINITE);
2208 let scope = test_scope(region, Budget::INFINITE);
2209
2210 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
2212
2213 let (h2, mut t2) = scope
2215 .spawn(&mut state, &cx, |cx| async move {
2216 struct YieldOnce(bool);
2218 impl std::future::Future for YieldOnce {
2219 type Output = ();
2220 fn poll(
2221 mut self: std::pin::Pin<&mut Self>,
2222 cx: &mut std::task::Context<'_>,
2223 ) -> std::task::Poll<()> {
2224 if self.0 {
2225 std::task::Poll::Ready(())
2226 } else {
2227 self.0 = true;
2228 cx.waker().wake_by_ref();
2229 std::task::Poll::Pending
2230 }
2231 }
2232 }
2233 YieldOnce(false).await;
2234
2235 if cx.checkpoint().is_err() {
2237 return 0; }
2239 2
2240 })
2241 .unwrap();
2242
2243 let waker = Waker::from(Arc::new(NoopWaker));
2244 let mut ctx = Context::from_waker(&waker);
2245
2246 assert!(t1.poll(&mut ctx).is_ready());
2248
2249 let handles = vec![h1, h2];
2251 let mut race_fut = Box::pin(scope.race_all(&cx, handles));
2252
2253 assert!(race_fut.as_mut().poll(&mut ctx).is_pending());
2260
2261 assert!(t2.poll(&mut ctx).is_pending());
2266
2267 assert!(race_fut.as_mut().poll(&mut ctx).is_pending());
2269
2270 assert!(t2.poll(&mut ctx).is_ready());
2279
2280 match race_fut.as_mut().poll(&mut ctx) {
2283 Poll::Ready(Ok((val, idx))) => {
2284 assert_eq!(val, 1);
2285 assert_eq!(idx, 0);
2286 }
2287 res => unreachable!("Expected Ready(Ok((1, 0))), got {res:?}"),
2288 }
2289 }
2290
2291 #[test]
2292 fn race_surfaces_loser_panic_even_if_winner_succeeds() {
2293 use std::sync::Arc;
2294 use std::task::{Context, Waker};
2295
2296 struct NoopWaker;
2297 impl std::task::Wake for NoopWaker {
2298 fn wake(self: Arc<Self>) {}
2299 }
2300
2301 let mut state = RuntimeState::new();
2302 let cx = test_cx();
2303 let region = state.create_root_region(Budget::INFINITE);
2304 let scope = test_scope(region, Budget::INFINITE);
2305
2306 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
2307 let (h2, mut t2) = scope
2308 .spawn(&mut state, &cx, |_| async {
2309 std::panic::panic_any("loser panic");
2310 })
2311 .unwrap();
2312
2313 let waker = Waker::from(Arc::new(NoopWaker));
2314 let mut poll_cx = Context::from_waker(&waker);
2315 assert!(t1.poll(&mut poll_cx).is_ready());
2316 assert!(t2.poll(&mut poll_cx).is_ready());
2317
2318 let result = block_on(scope.race(&cx, h1, h2));
2319 assert!(
2320 matches!(result, Err(JoinError::Panicked(_))),
2321 "loser panic must dominate race result, got {result:?}"
2322 );
2323 }
2324
2325 #[test]
2326 fn race_all_surfaces_simultaneous_loser_panic() {
2327 use std::sync::Arc;
2328 use std::task::{Context, Waker};
2329
2330 struct NoopWaker;
2331 impl std::task::Wake for NoopWaker {
2332 fn wake(self: Arc<Self>) {}
2333 }
2334
2335 let mut state = RuntimeState::new();
2336 let cx = test_cx();
2337 let region = state.create_root_region(Budget::INFINITE);
2338 let scope = test_scope(region, Budget::INFINITE);
2339
2340 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
2341 let (h2, mut t2) = scope
2342 .spawn(&mut state, &cx, |_| async {
2343 std::panic::panic_any("simultaneous loser panic");
2344 })
2345 .unwrap();
2346 let (h3, mut t3) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
2347
2348 let waker = Waker::from(Arc::new(NoopWaker));
2349 let mut poll_cx = Context::from_waker(&waker);
2350 assert!(t1.poll(&mut poll_cx).is_ready());
2351 assert!(t2.poll(&mut poll_cx).is_ready());
2352 assert!(t3.poll(&mut poll_cx).is_ready());
2353
2354 let result = block_on(scope.race_all(&cx, vec![h1, h2, h3]));
2355 assert!(
2356 matches!(result, Err(JoinError::Panicked(_))),
2357 "simultaneous loser panic must dominate race_all result, got {result:?}"
2358 );
2359 }
2360
2361 #[test]
2362 fn race_all_empty_is_pending() {
2363 let mut state = RuntimeState::new();
2364 let cx = test_cx();
2365 let region = state.create_root_region(Budget::INFINITE);
2366 let scope = test_scope(region, Budget::INFINITE);
2367
2368 let fut = scope.race_all::<i32>(&cx, vec![]);
2369 let waker = std::task::Waker::noop();
2370 let mut poll_cx = std::task::Context::from_waker(waker);
2371 let pinned = std::pin::pin!(fut);
2372 let status = std::future::Future::poll(pinned, &mut poll_cx);
2373 assert!(status.is_pending());
2374 }
2375}