1use crate::channel::oneshot;
100use crate::combinator::{Either, Select};
101use crate::cx::{Cx, cap};
102use crate::record::{AdmissionError, TaskRecord};
103use crate::runtime::task_handle::{JoinError, TaskHandle};
104use crate::runtime::{RegionCreateError, RuntimeState, SpawnError, StoredTask};
105use crate::tracing_compat::{debug, debug_span};
106use crate::types::{Budget, CancelReason, Outcome, PanicPayload, Policy, RegionId, TaskId};
107use std::future::Future;
108use std::marker::PhantomData;
109use std::pin::Pin;
110use std::sync::Arc;
111use std::task::{Context, Poll};
112
113pub struct Scope<'r, P: Policy = crate::types::policy::FailFast> {
121 pub(crate) region: RegionId,
123 pub(crate) budget: Budget,
125 pub(crate) _policy: PhantomData<&'r P>,
127}
128
129#[pin_project::pin_project]
130pub(crate) struct CatchUnwind<F> {
131 #[pin]
132 pub(crate) inner: F,
133}
134
135impl<F: Future> Future for CatchUnwind<F> {
136 type Output = std::thread::Result<F::Output>;
137
138 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139 let mut this = self.project();
140 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
141 this.inner.as_mut().poll(cx)
142 }));
143 match result {
144 Ok(Poll::Pending) => Poll::Pending,
145 Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
146 Err(payload) => Poll::Ready(Err(payload)),
147 }
148 }
149}
150
151pub(crate) fn payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
152 payload
153 .downcast_ref::<&str>()
154 .map(ToString::to_string)
155 .or_else(|| payload.downcast_ref::<String>().cloned())
156 .unwrap_or_else(|| "unknown panic".to_string())
157}
158
159struct RegionRunner<'a, Fut> {
160 fut: Pin<&'a mut CatchUnwind<Fut>>,
161 state: Option<&'a mut RuntimeState>,
162 child_region: RegionId,
163}
164
165impl<'a, Fut: Future> Future for RegionRunner<'a, Fut> {
166 type Output = (std::thread::Result<Fut::Output>, &'a mut RuntimeState);
167
168 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
169 let this = self.get_mut();
170 match this.fut.as_mut().poll(cx) {
171 Poll::Ready(res) => {
172 let state = this.state.take().expect("polled after ready");
173 Poll::Ready((res, state))
174 }
175 Poll::Pending => Poll::Pending,
176 }
177 }
178}
179
180impl<Fut> Drop for RegionRunner<'_, Fut> {
181 fn drop(&mut self) {
182 if let Some(state) = self.state.take() {
183 let reason = CancelReason::fail_fast().with_region(self.child_region);
184 let _ = state.cancel_request(self.child_region, &reason, None);
185 if let Some(region) = state.region(self.child_region) {
186 region.begin_close(None);
187 }
188 state.advance_region_state(self.child_region);
189 }
190 }
191}
192
193struct RegionCloseFuture {
194 state: Arc<parking_lot::Mutex<crate::record::region::RegionCloseState>>,
195}
196
197impl Future for RegionCloseFuture {
198 type Output = ();
199
200 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
201 let mut state = self.state.lock();
202 if state.closed {
203 Poll::Ready(())
204 } else {
205 if !state
206 .waker
207 .as_ref()
208 .is_some_and(|w| w.will_wake(cx.waker()))
209 {
210 state.waker = Some(cx.waker().clone());
211 }
212 Poll::Pending
213 }
214 }
215}
216
217impl Drop for RegionCloseFuture {
218 fn drop(&mut self) {
219 let mut state = self.state.lock();
220 state.waker = None;
221 }
222}
223
224impl<P: Policy> Scope<'_, P> {
225 #[must_use]
227 #[allow(dead_code)]
228 #[cfg_attr(feature = "test-internals", visibility::make(pub))]
229 pub(crate) fn new(region: RegionId, budget: Budget) -> Self {
230 Self {
231 region,
232 budget,
233 _policy: PhantomData,
234 }
235 }
236
237 #[must_use]
239 pub fn region_id(&self) -> RegionId {
240 self.region
241 }
242
243 #[must_use]
245 pub fn budget(&self) -> Budget {
246 self.budget
247 }
248
249 pub fn spawn<F, Fut, Caps>(
359 &self,
360 state: &mut RuntimeState,
361 cx: &Cx<Caps>,
362 f: F,
363 ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
364 where
365 Caps: cap::HasSpawn + Send + Sync + 'static,
366 F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
367 Fut: Future + Send + 'static,
368 Fut::Output: Send + 'static,
369 {
370 let (tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();
372
373 let task_id = self.create_task_record(state)?;
375
376 let _span = debug_span!(
378 "task_spawn",
379 task_id = ?task_id,
380 region_id = ?self.region,
381 initial_state = "Created",
382 budget_deadline = ?self.budget.deadline,
383 budget_poll_quota = self.budget.poll_quota,
384 budget_cost_quota = ?self.budget.cost_quota,
385 budget_priority = self.budget.priority,
386 budget_source = "scope"
387 )
388 .entered();
389 debug!(
390 task_id = ?task_id,
391 region_id = ?self.region,
392 initial_state = "Created",
393 budget_deadline = ?self.budget.deadline,
394 budget_poll_quota = self.budget.poll_quota,
395 budget_cost_quota = ?self.budget.cost_quota,
396 budget_priority = self.budget.priority,
397 budget_source = "scope",
398 "task spawned"
399 );
400
401 let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
402
403 let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
405
406 if let Some(record) = state.task_mut(task_id) {
409 record.set_cx_inner(child_cx.inner.clone());
410 record.set_cx(child_cx_full.clone());
411 }
412
413 let cx_for_send = child_cx_full;
415
416 let future = {
421 struct TaskCreationGuard<'a> {
422 state: &'a mut RuntimeState,
423 task_id: TaskId,
424 region_id: RegionId,
425 committed: bool,
426 }
427
428 impl Drop for TaskCreationGuard<'_> {
429 fn drop(&mut self) {
430 if !self.committed {
431 if let Some(region) = self.state.region_mut(self.region_id) {
433 region.remove_task(self.task_id);
434 }
435 self.state.remove_task(self.task_id);
436 }
437 }
438 }
439
440 let mut guard = TaskCreationGuard {
441 state,
442 task_id,
443 region_id: self.region,
444 committed: false,
445 };
446
447 let fut = f(child_cx);
448 guard.committed = true;
449 fut
450 };
451
452 let wrapped = async move {
456 let result_result = CatchUnwind { inner: future }.await;
457 match result_result {
458 Ok(result) => {
459 let _ = tx.send(&cx_for_send, Ok(result));
460 crate::types::Outcome::Ok(())
461 }
462 Err(payload) => {
463 let msg = payload_to_string(&payload);
464 let panic_payload = PanicPayload::new(msg);
465 let _ = tx.send(
466 &cx_for_send,
467 Err(JoinError::Panicked(panic_payload.clone())),
468 );
469 crate::types::Outcome::Panicked(panic_payload)
470 }
471 }
472 };
473
474 let stored = StoredTask::new_with_id(wrapped, task_id);
476
477 Ok((handle, stored))
478 }
479
480 #[inline]
503 pub fn spawn_task<F, Fut, Caps>(
504 &self,
505 state: &mut RuntimeState,
506 cx: &Cx<Caps>,
507 f: F,
508 ) -> Result<(TaskHandle<Fut::Output>, StoredTask), SpawnError>
509 where
510 Caps: cap::HasSpawn + Send + Sync + 'static,
511 F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
512 Fut: Future + Send + 'static,
513 Fut::Output: Send + 'static,
514 {
515 self.spawn(state, cx, f)
516 }
517
518 pub fn spawn_registered<F, Fut, Caps>(
545 &self,
546 state: &mut RuntimeState,
547 cx: &Cx<Caps>,
548 f: F,
549 ) -> Result<TaskHandle<Fut::Output>, SpawnError>
550 where
551 Caps: cap::HasSpawn + Send + Sync + 'static,
552 F: FnOnce(Cx<Caps>) -> Fut + Send + 'static,
553 Fut: Future + Send + 'static,
554 Fut::Output: Send + 'static,
555 {
556 let (handle, stored) = self.spawn(state, cx, f)?;
557 state.store_spawned_task(handle.task_id(), stored);
558 Ok(handle)
559 }
560
561 #[allow(clippy::too_many_lines)]
601 pub fn spawn_local<F, Fut, Caps>(
602 &self,
603 state: &mut RuntimeState,
604 cx: &Cx<Caps>,
605 f: F,
606 ) -> Result<TaskHandle<Fut::Output>, SpawnError>
607 where
608 Caps: cap::HasSpawn + Send + Sync + 'static,
609 F: FnOnce(Cx<Caps>) -> Fut + 'static,
610 Fut: Future + 'static,
611 Fut::Output: Send + 'static,
612 {
613 use crate::runtime::stored_task::LocalStoredTask;
614 use crate::runtime::task_handle::JoinError;
615
616 let (result_tx, rx) = oneshot::channel::<Result<Fut::Output, JoinError>>();
618
619 let task_id = self.create_task_record(state)?;
621
622 let _span = debug_span!(
624 "task_spawn",
625 task_id = ?task_id,
626 region_id = ?self.region,
627 initial_state = "Created",
628 budget_deadline = ?self.budget.deadline,
629 budget_poll_quota = self.budget.poll_quota,
630 budget_cost_quota = ?self.budget.cost_quota,
631 budget_priority = self.budget.priority,
632 budget_source = "scope_local"
633 )
634 .entered();
635 debug!(
636 task_id = ?task_id,
637 region_id = ?self.region,
638 initial_state = "Created",
639 budget_deadline = ?self.budget.deadline,
640 budget_poll_quota = self.budget.poll_quota,
641 budget_cost_quota = ?self.budget.cost_quota,
642 budget_priority = self.budget.priority,
643 budget_source = "scope_local",
644 "local task spawned"
645 );
646
647 let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
648
649 let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
651
652 if let Some(record) = state.task_mut(task_id) {
654 record.set_cx_inner(child_cx.inner.clone());
655 record.set_cx(child_cx_full.clone());
656 }
657
658 let cx_for_send = child_cx_full;
660
661 let future = {
664 struct TaskCreationGuard<'a> {
665 state: &'a mut RuntimeState,
666 task_id: TaskId,
667 region_id: RegionId,
668 committed: bool,
669 }
670
671 impl Drop for TaskCreationGuard<'_> {
672 fn drop(&mut self) {
673 if !self.committed {
674 if let Some(region) = self.state.region_mut(self.region_id) {
676 region.remove_task(self.task_id);
677 }
678 self.state.remove_task(self.task_id);
679 }
680 }
681 }
682
683 let mut guard = TaskCreationGuard {
684 state,
685 task_id,
686 region_id: self.region,
687 committed: false,
688 };
689
690 let fut = f(child_cx);
691 guard.committed = true;
692 fut
693 };
694
695 let wrapped = async move {
697 let result_result = CatchUnwind { inner: future }.await;
698 match result_result {
699 Ok(result) => {
700 let _ = result_tx.send(&cx_for_send, Ok(result));
701 crate::types::Outcome::Ok(())
702 }
703 Err(payload) => {
704 let msg = payload_to_string(&payload);
705 let panic_payload = PanicPayload::new(msg);
706 let _ = result_tx.send(
707 &cx_for_send,
708 Err(JoinError::Panicked(panic_payload.clone())),
709 );
710 crate::types::Outcome::Panicked(panic_payload)
711 }
712 }
713 };
714
715 let stored = LocalStoredTask::new_with_id(wrapped, task_id);
717
718 crate::runtime::local::store_local_task(task_id, stored);
720
721 if let Some(record) = state.task_mut(task_id) {
725 if let Some(worker_id) = crate::runtime::scheduler::three_lane::current_worker_id() {
726 record.pin_to_worker(worker_id);
727 } else {
728 record.mark_local();
729 }
730 record.wake_state.notify();
731 }
732
733 let scheduled = crate::runtime::scheduler::three_lane::schedule_local_task(task_id);
736
737 if scheduled {
738 if let Some(record) = state.task(task_id) {
739 let _ = record.wake_state.notify();
740 }
741 return Ok(handle);
742 }
743
744 let _ = crate::runtime::local::remove_local_task(task_id);
746 if let Some(region) = state.region(self.region) {
747 region.remove_task(task_id);
748 }
749 state.remove_task(task_id);
750 Err(SpawnError::LocalSchedulerUnavailable)
751 }
752
753 pub fn spawn_blocking<F, R, Caps>(
787 &self,
788 state: &mut RuntimeState,
789 cx: &Cx<Caps>, f: F,
791 ) -> Result<(TaskHandle<R>, StoredTask), SpawnError>
792 where
793 Caps: cap::HasSpawn + Send + Sync + 'static,
794 F: FnOnce(Cx<Caps>) -> R + Send + 'static,
795 R: Send + 'static,
796 {
797 let (tx, rx) = oneshot::channel::<Result<R, JoinError>>();
799
800 let task_id = self.create_task_record(state)?;
802
803 debug!(
805 task_id = ?task_id,
806 region_id = ?self.region,
807 initial_state = "Created",
808 poll_quota = self.budget.poll_quota,
809 spawn_kind = "blocking",
810 "blocking task spawned"
811 );
812
813 let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
814
815 let handle = TaskHandle::new(task_id, rx, Arc::downgrade(&child_cx.inner));
817
818 if let Some(record) = state.task_mut(task_id) {
820 record.set_cx_inner(child_cx.inner.clone());
821 record.set_cx(child_cx_full.clone());
822 }
823
824 let cx_for_send = child_cx_full;
826
827 let wrapped = async move {
830 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(child_cx)));
833 match result {
834 Ok(res) => {
835 let _ = tx.send(&cx_for_send, Ok(res));
836 crate::types::Outcome::Ok(())
837 }
838 Err(payload) => {
839 let msg = payload_to_string(&payload);
840 let panic_payload = PanicPayload::new(msg);
841 let _ = tx.send(
842 &cx_for_send,
843 Err(JoinError::Panicked(panic_payload.clone())),
844 );
845 crate::types::Outcome::Panicked(panic_payload)
846 }
847 }
848 };
849
850 let stored = StoredTask::new_with_id(wrapped, task_id);
851
852 Ok((handle, stored))
853 }
854
855 pub async fn region<P2, F, Fut, T, Caps>(
872 &self,
873 state: &mut RuntimeState,
874 cx: &Cx<Caps>,
875 policy: P2,
876 f: F,
877 ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
878 where
879 P2: Policy,
880 F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
881 Fut: Future<Output = Outcome<T, P2::Error>>,
882 {
883 self.region_with_budget(state, cx, self.budget, policy, f)
884 .await
885 }
886
887 pub async fn region_with_budget<P2, F, Fut, T, Caps>(
892 &self,
893 state: &mut RuntimeState,
894 _cx: &Cx<Caps>,
895 budget: Budget,
896 _policy: P2,
897 f: F,
898 ) -> Result<Outcome<T, P2::Error>, RegionCreateError>
899 where
900 P2: Policy,
901 F: FnOnce(Scope<'_, P2>, &mut RuntimeState) -> Fut,
902 Fut: Future<Output = Outcome<T, P2::Error>>,
903 {
904 let child_region = state.create_child_region(self.region, budget)?;
905 let child_budget = state
906 .region(child_region)
907 .map_or(self.budget, crate::record::RegionRecord::budget);
908 let child_scope = Scope::<P2>::new(child_region, child_budget);
909
910 let fut_result =
911 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(child_scope, &mut *state)));
912
913 let fut = match fut_result {
914 Ok(fut) => fut,
915 Err(payload) => {
916 let reason = CancelReason::fail_fast().with_region(child_region);
917 let _ = state.cancel_request(child_region, &reason, None);
918 if let Some(region) = state.region(child_region) {
919 region.begin_close(None);
920 }
921 state.advance_region_state(child_region);
922 std::panic::resume_unwind(payload);
923 }
924 };
925
926 let pinned_fut = std::pin::pin!(CatchUnwind { inner: fut });
927
928 let runner = RegionRunner {
929 fut: pinned_fut,
930 state: Some(state),
931 child_region,
932 };
933
934 let (result, state) = runner.await;
935 let outcome = match result {
936 Ok(outcome) => outcome,
937 Err(payload) => {
938 let msg = payload_to_string(&payload);
939 Outcome::Panicked(PanicPayload::new(msg))
940 }
941 };
942
943 match &outcome {
944 Outcome::Ok(_) => {
945 if let Some(region) = state.region(child_region) {
946 region.begin_close(None);
947 }
948 }
949 Outcome::Cancelled(reason) => {
950 let _ = state.cancel_request(child_region, reason, None);
951 if let Some(region) = state.region(child_region) {
952 region.begin_close(None);
953 }
954 }
955 Outcome::Err(_) | Outcome::Panicked(_) => {
956 let reason = CancelReason::fail_fast().with_region(child_region);
957 let _ = state.cancel_request(child_region, &reason, None);
958 if let Some(region) = state.region(child_region) {
959 region.begin_close(None);
960 }
961 }
962 }
963
964 let close_notify = state.region(child_region).map(|r| r.close_notify.clone());
965 state.advance_region_state(child_region);
966
967 if let Some(notify) = close_notify {
968 RegionCloseFuture { state: notify }.await;
969 }
970
971 Ok(outcome)
972 }
973
974 pub async fn join<T1, T2>(
990 &self,
991 cx: &Cx,
992 mut h1: TaskHandle<T1>,
993 mut h2: TaskHandle<T2>,
994 ) -> (Result<T1, JoinError>, Result<T2, JoinError>) {
995 let mut f1 = h1.join(cx);
996 let mut f2 = h2.join(cx);
997 let r1 = std::pin::Pin::new(&mut f1).await;
998 let r2 = std::pin::Pin::new(&mut f2).await;
999 (r1, r2)
1000 }
1001
1002 fn best_effort_poll_loser_join<T>(cx: &Cx, handle: &mut TaskHandle<T>) {
1016 let mut drain = std::pin::pin!(handle.join(cx));
1017 let waker = std::task::Waker::noop();
1018 let mut poll_cx = std::task::Context::from_waker(waker);
1019 let _ = drain.as_mut().poll(&mut poll_cx);
1020 }
1021
1022 pub async fn race<T>(
1024 &self,
1025 cx: &Cx,
1026 mut h1: TaskHandle<T>,
1027 mut h2: TaskHandle<T>,
1028 ) -> Result<T, JoinError> {
1029 let winner = {
1030 let f1 = h1.join_with_drop_reason(cx, CancelReason::race_loser());
1031 let mut f1 = std::pin::pin!(f1);
1032 let f2 = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1033 let mut f2 = std::pin::pin!(f2);
1034 Select::new(f1.as_mut(), f2.as_mut())
1035 .await
1036 .map_err(|_| JoinError::PolledAfterCompletion)?
1037 };
1038
1039 match winner {
1040 Either::Left(res) => {
1041 if matches!(&res, Err(JoinError::Panicked(_)))
1042 && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1043 {
1044 Self::best_effort_poll_loser_join(cx, &mut h2);
1049 return res;
1050 }
1051 let loser_res = h2.join(cx).await;
1052 if let Err(JoinError::Panicked(p)) = res {
1053 Err(JoinError::Panicked(p))
1054 } else if let Err(JoinError::Panicked(p)) = loser_res {
1055 Err(JoinError::Panicked(p))
1056 } else {
1057 res
1058 }
1059 }
1060 Either::Right(res) => {
1061 if matches!(&res, Err(JoinError::Panicked(_)))
1062 && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1063 {
1064 Self::best_effort_poll_loser_join(cx, &mut h1);
1066 return res;
1067 }
1068 let loser_res = h1.join(cx).await;
1069 if let Err(JoinError::Panicked(p)) = res {
1070 Err(JoinError::Panicked(p))
1071 } else if let Err(JoinError::Panicked(p)) = loser_res {
1072 Err(JoinError::Panicked(p))
1073 } else {
1074 res
1075 }
1076 }
1077 }
1078 }
1079
1080 pub async fn hedge<F1, Fut1, F2, Fut2, T>(
1099 &self,
1100 state: &mut RuntimeState,
1101 cx: &Cx,
1102 delay: std::time::Duration,
1103 primary: F1,
1104 backup: F2,
1105 ) -> Result<T, JoinError>
1106 where
1107 F1: FnOnce(Cx) -> Fut1 + Send + 'static,
1108 Fut1: Future<Output = T> + Send + 'static,
1109 F2: FnOnce(Cx) -> Fut2 + Send + 'static,
1110 Fut2: Future<Output = T> + Send + 'static,
1111 T: Send + 'static,
1112 {
1113 use crate::combinator::Either;
1114 use crate::combinator::select::Select;
1115 let mut h1 = self
1117 .spawn_registered(state, cx, primary)
1118 .map_err(|_| JoinError::Cancelled(CancelReason::resource_unavailable()))?;
1119
1120 let primary_or_delay = {
1123 let f1_primary = h1.join(cx);
1124 let mut f1_primary = std::pin::pin!(f1_primary);
1125
1126 let now = cx
1127 .timer_driver()
1128 .map_or_else(crate::time::wall_now, |d| d.now());
1129 let sleep_fut = crate::time::sleep(now, delay);
1130 let mut sleep_pinned = std::pin::pin!(sleep_fut);
1131
1132 let res = Select::new(f1_primary.as_mut(), sleep_pinned.as_mut())
1133 .await
1134 .map_err(|_| JoinError::PolledAfterCompletion)?;
1135 if matches!(res, Either::Right(())) {
1136 f1_primary.defuse_drop_abort();
1137 }
1138 res
1139 };
1140
1141 match primary_or_delay {
1142 Either::Left(res) => {
1143 res
1145 }
1146 Either::Right(()) => {
1147 let Ok(mut h2) = self.spawn_registered(state, cx, backup) else {
1149 h1.abort_with_reason(CancelReason::resource_unavailable());
1152
1153 if crate::runtime::scheduler::three_lane::current_worker_id().is_some() {
1154 match h1.join(cx).await {
1157 Ok(res) => return Ok(res),
1158 Err(JoinError::Panicked(p)) => return Err(JoinError::Panicked(p)),
1159 Err(JoinError::Cancelled(_) | JoinError::PolledAfterCompletion) => {}
1160 }
1161 } else {
1162 let mut drain = std::pin::pin!(h1.join(cx));
1166 let waker = std::task::Waker::noop();
1167 let mut poll_cx = Context::from_waker(waker);
1168 match drain.as_mut().poll(&mut poll_cx) {
1169 std::task::Poll::Ready(Ok(res)) => return Ok(res),
1170 std::task::Poll::Ready(Err(JoinError::Panicked(p))) => {
1171 return Err(JoinError::Panicked(p));
1172 }
1173 _ => {}
1174 }
1175 }
1176
1177 return Err(JoinError::Cancelled(CancelReason::resource_unavailable()));
1178 };
1179
1180 let race_outcome = {
1182 let f1_race = h1.join_with_drop_reason(cx, CancelReason::race_loser());
1183 let mut f1_race = std::pin::pin!(f1_race);
1184 let f2_race = h2.join_with_drop_reason(cx, CancelReason::race_loser());
1185 let mut f2_race = std::pin::pin!(f2_race);
1186 Select::new(f1_race.as_mut(), f2_race.as_mut())
1187 .await
1188 .map_err(|_| JoinError::PolledAfterCompletion)?
1189 };
1190
1191 match race_outcome {
1192 Either::Left(res) => {
1193 if matches!(&res, Err(JoinError::Panicked(_)))
1194 && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1195 {
1196 Self::best_effort_poll_loser_join(cx, &mut h2);
1197 return res;
1198 }
1199 let loser_res = h2.join(cx).await;
1200 if let Err(JoinError::Panicked(p)) = res {
1201 Err(JoinError::Panicked(p))
1202 } else if let Err(JoinError::Panicked(p)) = loser_res {
1203 Err(JoinError::Panicked(p))
1204 } else {
1205 res
1206 }
1207 }
1208 Either::Right(res) => {
1209 if matches!(&res, Err(JoinError::Panicked(_)))
1210 && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1211 {
1212 Self::best_effort_poll_loser_join(cx, &mut h1);
1213 return res;
1214 }
1215 let loser_res = h1.join(cx).await;
1216 if let Err(JoinError::Panicked(p)) = res {
1217 Err(JoinError::Panicked(p))
1218 } else if let Err(JoinError::Panicked(p)) = loser_res {
1219 Err(JoinError::Panicked(p))
1220 } else {
1221 res
1222 }
1223 }
1224 }
1225 }
1226 }
1227 }
1228
1229 pub async fn race_all<T>(
1241 &self,
1242 cx: &Cx,
1243 handles: Vec<TaskHandle<T>>,
1244 ) -> Result<(T, usize), JoinError> {
1245 let mut handles = handles;
1246 if handles.is_empty() {
1247 return std::future::pending().await;
1248 }
1249
1250 let mut futures: Vec<_> = handles
1251 .iter_mut()
1252 .map(|h| h.join_with_drop_reason(cx, CancelReason::race_loser()))
1253 .collect();
1254 let mut ready_results: Vec<Option<Result<T, JoinError>>> = std::iter::repeat_with(|| None)
1255 .take(futures.len())
1256 .collect();
1257
1258 let winner_idx = std::future::poll_fn(|poll_cx| {
1262 let mut newly_ready = Vec::new();
1263
1264 for (i, future) in futures.iter_mut().enumerate() {
1265 if ready_results[i].is_some() {
1266 continue;
1267 }
1268 if let std::task::Poll::Ready(res) = std::pin::Pin::new(future).poll(poll_cx) {
1269 ready_results[i] = Some(res);
1270 newly_ready.push(i);
1271 }
1272 }
1273
1274 if newly_ready.is_empty() {
1275 std::task::Poll::Pending
1276 } else {
1277 let chosen = newly_ready[cx.random_usize(newly_ready.len())];
1279 std::task::Poll::Ready(chosen)
1280 }
1281 })
1282 .await;
1283
1284 let winner_result = ready_results[winner_idx]
1285 .take()
1286 .expect("winner index must have a ready result");
1287
1288 drop(futures);
1291
1292 let mut loser_panic = None;
1295 let mut pending_loser_indices = Vec::new();
1296 for (i, handle) in handles.iter_mut().enumerate() {
1297 if i == winner_idx {
1298 continue;
1299 }
1300 if let Some(res) = ready_results[i].take() {
1301 if let Err(JoinError::Panicked(p)) = res {
1302 if loser_panic.is_none() {
1303 loser_panic = Some(p);
1304 }
1305 }
1306 } else if handle.is_finished() {
1307 let res = handle.join(cx).await;
1308 if let Err(JoinError::Panicked(p)) = res {
1309 if loser_panic.is_none() {
1310 loser_panic = Some(p);
1311 }
1312 }
1313 } else {
1314 pending_loser_indices.push(i);
1315 }
1316 }
1317
1318 for &idx in &pending_loser_indices {
1322 handles[idx].abort_with_reason(CancelReason::race_loser());
1323 }
1324 if matches!(&winner_result, Err(JoinError::Panicked(_)))
1325 && crate::runtime::scheduler::three_lane::current_worker_id().is_none()
1326 {
1327 for idx in pending_loser_indices {
1332 Self::best_effort_poll_loser_join(cx, &mut handles[idx]);
1333 }
1334 return winner_result.map(|val| (val, winner_idx));
1335 }
1336 for idx in pending_loser_indices {
1337 let res = handles[idx].join(cx).await;
1338 if let Err(JoinError::Panicked(p)) = res {
1339 if loser_panic.is_none() {
1340 loser_panic = Some(p);
1341 }
1342 }
1343 }
1344
1345 let winner_result = winner_result.map(|val| (val, winner_idx));
1346 if matches!(&winner_result, Err(JoinError::Panicked(_))) {
1347 return winner_result;
1348 }
1349
1350 loser_panic.map_or(winner_result, |panic_payload| {
1351 Err(JoinError::Panicked(panic_payload))
1352 })
1353 }
1354
1355 pub async fn join_all<T>(
1359 &self,
1360 cx: &Cx,
1361 mut handles: Vec<TaskHandle<T>>,
1362 ) -> Vec<Result<T, JoinError>> {
1363 let mut futures: Vec<_> = handles.iter_mut().map(|h| h.join(cx)).collect();
1364 let mut results = Vec::with_capacity(futures.len());
1365 for fut in &mut futures {
1366 results.push(std::pin::Pin::new(fut).await);
1367 }
1368 results
1369 }
1370
1371 pub(crate) fn build_child_task_cx<Caps>(
1372 &self,
1373 state: &RuntimeState,
1374 parent_cx: &Cx<Caps>,
1375 task_id: TaskId,
1376 ) -> (Cx<Caps>, Cx<cap::All>) {
1377 let child_observability = parent_cx.child_observability(self.region, task_id);
1378 let child_entropy = parent_cx.child_entropy(task_id);
1379 let io_driver = state.io_driver_handle();
1380 let timer_driver = state.timer_driver_handle();
1381 let logical_clock = state
1382 .logical_clock_mode()
1383 .build_handle(timer_driver.clone());
1384
1385 let child_cx = Cx::<Caps>::new_with_drivers(
1386 self.region,
1387 task_id,
1388 self.budget,
1389 Some(child_observability),
1390 io_driver,
1391 parent_cx.io_cap_handle(),
1392 timer_driver,
1393 Some(child_entropy),
1394 )
1395 .with_logical_clock(logical_clock)
1396 .with_registry_handle(parent_cx.registry_handle())
1397 .with_remote_cap_handle(parent_cx.remote_cap_handle())
1398 .with_blocking_pool_handle(parent_cx.blocking_pool_handle())
1399 .with_evidence_sink(parent_cx.evidence_sink_handle())
1400 .with_macaroon_handle(parent_cx.macaroon_handle());
1401 let child_cx = if let Some(pressure) = parent_cx.pressure_handle() {
1402 child_cx.with_pressure(pressure)
1403 } else {
1404 child_cx
1405 };
1406 child_cx.set_trace_buffer(state.trace_handle());
1407 let child_cx_full = child_cx.retype::<cap::All>();
1408
1409 (child_cx, child_cx_full)
1410 }
1411
1412 pub(crate) fn create_task_record(
1416 &self,
1417 state: &mut RuntimeState,
1418 ) -> Result<TaskId, SpawnError> {
1419 use crate::util::ArenaIndex;
1420
1421 let now = state
1422 .timer_driver()
1423 .map_or(state.now, crate::time::TimerDriverHandle::now);
1424
1425 let idx = state.insert_task(TaskRecord::new_with_time(
1427 TaskId::from_arena(ArenaIndex::new(0, 0)), self.region,
1429 self.budget,
1430 now,
1431 ));
1432
1433 let task_id = TaskId::from_arena(idx);
1435
1436 if let Some(record) = state.task_mut(task_id) {
1438 record.id = task_id;
1439 }
1440
1441 if let Some(region) = state.region(self.region) {
1443 if let Err(err) = region.add_task(task_id) {
1444 state.remove_task(task_id);
1446 return Err(match err {
1447 AdmissionError::Closed => SpawnError::RegionClosed(self.region),
1448 AdmissionError::LimitReached { limit, live, .. } => {
1449 SpawnError::RegionAtCapacity {
1450 region: self.region,
1451 limit,
1452 live,
1453 }
1454 }
1455 });
1456 }
1457 } else {
1458 state.remove_task(task_id);
1460 return Err(SpawnError::RegionNotFound(self.region));
1461 }
1462
1463 state.record_task_spawn(task_id, self.region);
1464
1465 Ok(task_id)
1466 }
1467
1468 pub fn defer_sync<F>(&self, state: &mut RuntimeState, f: F) -> bool
1492 where
1493 F: FnOnce() + Send + 'static,
1494 {
1495 state.register_sync_finalizer(self.region, f)
1496 }
1497
1498 pub fn defer_async<F>(&self, state: &mut RuntimeState, future: F) -> bool
1519 where
1520 F: Future<Output = ()> + Send + 'static,
1521 {
1522 state.register_async_finalizer(self.region, future)
1523 }
1524}
1525
1526impl<P: Policy> std::fmt::Debug for Scope<'_, P> {
1527 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1528 f.debug_struct("Scope")
1529 .field("region", &self.region)
1530 .field("budget", &self.budget)
1531 .finish()
1532 }
1533}
1534
1535#[cfg(test)]
1536mod tests {
1537 use super::*;
1538 use crate::record::RegionLimits;
1539 use crate::runtime::RuntimeState;
1540 use crate::types::{CancelKind, Outcome, Time};
1541 use crate::util::ArenaIndex;
1542 use futures_lite::future::block_on;
1543 use std::sync::Arc;
1544
1545 fn test_cx() -> Cx {
1546 Cx::new(
1547 RegionId::from_arena(ArenaIndex::new(0, 0)),
1548 TaskId::from_arena(ArenaIndex::new(0, 0)),
1549 Budget::INFINITE,
1550 )
1551 }
1552
1553 fn test_scope(region: RegionId, budget: Budget) -> Scope<'static> {
1554 Scope::new(region, budget)
1555 }
1556
1557 #[test]
1558 fn spawn_creates_task_record() {
1559 let mut state = RuntimeState::new();
1560 let cx = test_cx();
1561 let region = state.create_root_region(Budget::INFINITE);
1562 let scope = test_scope(region, Budget::INFINITE);
1563
1564 let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1565
1566 let task = state.task(handle.task_id());
1568 assert!(task.is_some());
1569
1570 let task = task.unwrap();
1572 assert_eq!(task.owner, region);
1573 }
1574
1575 #[test]
1576 fn spawn_inherits_registry_and_remote_capabilities() {
1577 use crate::cx::registry::RegistryHandle;
1578 use crate::remote::{NodeId, RemoteCap};
1579 use std::task::Context;
1580
1581 let mut state = RuntimeState::new();
1582
1583 let registry = crate::cx::NameRegistry::new();
1584 let registry_handle = RegistryHandle::new(Arc::new(registry));
1585 let parent_registry_arc = registry_handle.as_arc();
1586
1587 let cx = test_cx()
1588 .with_registry_handle(Some(registry_handle))
1589 .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")));
1590
1591 let region = state.create_root_region(Budget::INFINITE);
1592 let scope = test_scope(region, Budget::INFINITE);
1593
1594 let mut handle = scope
1595 .spawn_registered(&mut state, &cx, move |cx| async move {
1596 let child_registry = cx.registry_handle().expect("child must inherit registry");
1597 let child_registry_arc = child_registry.as_arc();
1598 let same_registry = Arc::ptr_eq(&child_registry_arc, &parent_registry_arc);
1599
1600 let child_remote = cx.remote().expect("child must inherit remote cap");
1601 let origin = child_remote.local_node().as_str().to_owned();
1602
1603 (same_registry, origin)
1604 })
1605 .unwrap();
1606
1607 let waker = std::task::Waker::noop().clone();
1608 let mut poll_cx = Context::from_waker(&waker);
1609
1610 let stored = state
1611 .get_stored_future(handle.task_id())
1612 .expect("spawn_registered must store the task");
1613 assert!(stored.poll(&mut poll_cx).is_ready());
1614
1615 let mut join_fut = std::pin::pin!(handle.join(&cx));
1616 match join_fut.as_mut().poll(&mut poll_cx) {
1617 Poll::Ready(Ok((same_registry, origin))) => {
1618 assert!(
1619 same_registry,
1620 "child should observe the same RegistryCap instance"
1621 );
1622 assert_eq!(origin, "origin-test");
1623 }
1624 other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1625 }
1626 }
1627
1628 #[test]
1629 fn spawn_inherits_runtime_timer_driver() {
1630 use std::task::Context;
1631
1632 let mut state = RuntimeState::new();
1633 let clock = Arc::new(crate::time::VirtualClock::new());
1634 state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
1635
1636 let cx = test_cx();
1637 let region = state.create_root_region(Budget::INFINITE);
1638 let scope = test_scope(region, Budget::INFINITE);
1639
1640 let (mut handle, mut stored) = scope
1641 .spawn(&mut state, &cx, |cx| async move { cx.has_timer() })
1642 .expect("spawn should succeed");
1643
1644 let waker = std::task::Waker::noop().clone();
1645 let mut poll_cx = Context::from_waker(&waker);
1646 assert!(stored.poll(&mut poll_cx).is_ready());
1647
1648 let mut join_fut = std::pin::pin!(handle.join(&cx));
1649 match join_fut.as_mut().poll(&mut poll_cx) {
1650 Poll::Ready(Ok(has_timer)) => assert!(has_timer),
1651 other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1652 }
1653 }
1654
1655 #[test]
1656 fn create_task_record_uses_runtime_timer_driver_time() {
1657 let mut state = RuntimeState::new();
1658 let clock = Arc::new(crate::time::VirtualClock::starting_at(Time::from_millis(
1659 11,
1660 )));
1661 state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(
1662 clock.clone(),
1663 ));
1664
1665 let region = state.create_root_region(Budget::INFINITE);
1666 let scope = test_scope(region, Budget::INFINITE);
1667
1668 clock.advance(Time::from_millis(7).as_nanos());
1669 let task_id = scope
1670 .create_task_record(&mut state)
1671 .expect("task record should be created");
1672
1673 let task = state.task(task_id).expect("task record");
1674 assert_eq!(task.created_at, Time::from_millis(18));
1675 }
1676
1677 #[test]
1678 fn spawn_blocking_inherits_runtime_timer_driver() {
1679 use std::task::Context;
1680
1681 let mut state = RuntimeState::new();
1682 let clock = Arc::new(crate::time::VirtualClock::new());
1683 state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
1684
1685 let cx = test_cx();
1686 let region = state.create_root_region(Budget::INFINITE);
1687 let scope = test_scope(region, Budget::INFINITE);
1688
1689 let (mut handle, mut stored) = scope
1690 .spawn_blocking(&mut state, &cx, |cx| cx.has_timer())
1691 .expect("spawn_blocking should succeed");
1692
1693 let waker = std::task::Waker::noop().clone();
1694 let mut poll_cx = Context::from_waker(&waker);
1695 assert!(stored.poll(&mut poll_cx).is_ready());
1696
1697 let mut join_fut = std::pin::pin!(handle.join(&cx));
1698 match join_fut.as_mut().poll(&mut poll_cx) {
1699 Poll::Ready(Ok(has_timer)) => assert!(has_timer),
1700 other => unreachable!("Expected Ready(Ok(_)), got {other:?}"),
1701 }
1702 }
1703
1704 #[test]
1705 fn spawn_registered_stores_task() {
1706 let mut state = RuntimeState::new();
1707 let cx = test_cx();
1708 let region = state.create_root_region(Budget::INFINITE);
1709 let scope = test_scope(region, Budget::INFINITE);
1710
1711 let handle = scope
1713 .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1714 .unwrap();
1715
1716 let task = state.task(handle.task_id());
1718 assert!(task.is_some());
1719 assert_eq!(task.unwrap().owner, region);
1720
1721 let stored = state.get_stored_future(handle.task_id());
1723 assert!(stored.is_some(), "spawn_registered should store the task");
1724 }
1725
1726 #[test]
1727 fn spawn_registered_task_can_be_polled() {
1728 use std::task::Context;
1729
1730 let mut state = RuntimeState::new();
1731 let cx = test_cx();
1732 let region = state.create_root_region(Budget::INFINITE);
1733 let scope = test_scope(region, Budget::INFINITE);
1734
1735 let mut handle = scope
1736 .spawn_registered(&mut state, &cx, |_| async { 42_i32 })
1737 .unwrap();
1738
1739 let waker = std::task::Waker::noop().clone();
1741 let mut poll_cx = Context::from_waker(&waker);
1742
1743 let stored = state.get_stored_future(handle.task_id()).unwrap();
1744 let poll_result = stored.poll(&mut poll_cx);
1745 assert!(
1746 poll_result.is_ready(),
1747 "Simple async should complete in one poll"
1748 );
1749
1750 let mut join_fut = std::pin::pin!(handle.join(&cx));
1752 match join_fut.as_mut().poll(&mut poll_cx) {
1753 Poll::Ready(Ok(val)) => assert_eq!(val, 42),
1754 other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
1755 }
1756 }
1757
1758 #[test]
1759 fn spawn_blocking_creates_task_record() {
1760 let mut state = RuntimeState::new();
1761 let cx = test_cx();
1762 let region = state.create_root_region(Budget::INFINITE);
1763 let scope = test_scope(region, Budget::INFINITE);
1764
1765 let (handle, _stored) = scope.spawn_blocking(&mut state, &cx, |_| 42_i32).unwrap();
1766
1767 let task = state.task(handle.task_id());
1769 assert!(task.is_some());
1770 assert_eq!(task.unwrap().owner, region);
1771 }
1772
1773 #[test]
1774 fn spawn_local_creates_task_record() {
1775 let mut state = RuntimeState::new();
1776 let cx = test_cx();
1777 let region = state.create_root_region(Budget::INFINITE);
1778 let scope = test_scope(region, Budget::INFINITE);
1779
1780 let local_ready = Arc::new(parking_lot::Mutex::new(std::collections::VecDeque::new()));
1781 let _local_ready_guard =
1782 crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1783 let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1784
1785 let handle = scope
1788 .spawn_local(&mut state, &cx, |_| async move { 42_i32 })
1789 .unwrap();
1790
1791 let task = state.task(handle.task_id());
1793 assert!(task.is_some());
1794 assert_eq!(task.unwrap().owner, region);
1795 }
1796
1797 #[test]
1798 fn spawn_local_without_scheduler_fails_and_rolls_back() {
1799 let mut state = RuntimeState::new();
1800 let cx = test_cx();
1801 let region = state.create_root_region(Budget::INFINITE);
1802 let scope = test_scope(region, Budget::INFINITE);
1803
1804 let result = scope.spawn_local(&mut state, &cx, |_| async move { 5_i32 });
1805 assert!(matches!(result, Err(SpawnError::LocalSchedulerUnavailable)));
1806
1807 assert!(state.tasks_is_empty());
1809 let region_record = state.region(region).unwrap();
1810 assert!(region_record.task_ids().is_empty());
1811 }
1812
1813 #[test]
1814 fn spawn_local_makes_progress_via_local_ready() {
1815 use std::sync::Arc;
1816 use std::task::Context;
1817
1818 let mut state = RuntimeState::new();
1819 let cx = test_cx();
1820 let region = state.create_root_region(Budget::INFINITE);
1821 let scope = test_scope(region, Budget::INFINITE);
1822
1823 let local_ready = Arc::new(parking_lot::Mutex::new(std::collections::VecDeque::new()));
1824 let _local_ready_guard =
1825 crate::runtime::scheduler::three_lane::ScopedLocalReady::new(Arc::clone(&local_ready));
1826 let _worker_guard = crate::runtime::scheduler::three_lane::ScopedWorkerId::new(1);
1827
1828 let mut handle = scope
1829 .spawn_local(&mut state, &cx, |_| async move { 7_i32 })
1830 .unwrap();
1831
1832 let queued = {
1833 let queue = local_ready.lock();
1834 queue.contains(&handle.task_id())
1835 };
1836 assert!(queued, "spawn_local should enqueue into local_ready");
1837
1838 let task_id = {
1839 let mut queue = local_ready.lock();
1840 queue
1841 .pop_front()
1842 .expect("local_ready should contain spawned task")
1843 };
1844
1845 let mut join_fut = std::pin::pin!(handle.join(&cx));
1846 let waker = std::task::Waker::noop().clone();
1847 let mut ctx = Context::from_waker(&waker);
1848
1849 assert!(join_fut.as_mut().poll(&mut ctx).is_pending());
1850
1851 let mut local_task =
1852 crate::runtime::local::remove_local_task(task_id).expect("local task missing");
1853 assert!(local_task.poll(&mut ctx).is_ready());
1854
1855 match join_fut.as_mut().poll(&mut ctx) {
1856 Poll::Ready(Ok(val)) => assert_eq!(val, 7),
1857 res => unreachable!("Expected Ready(Ok(7)), got {res:?}"),
1858 }
1859 }
1860
1861 #[test]
1862 fn task_added_to_region() {
1863 let mut state = RuntimeState::new();
1864 let cx = test_cx();
1865 let region = state.create_root_region(Budget::INFINITE);
1866 let scope = test_scope(region, Budget::INFINITE);
1867
1868 let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1869
1870 let region_record = state.region(region).unwrap();
1872 assert!(region_record.task_ids().contains(&handle.task_id()));
1873 }
1874
1875 #[test]
1876 fn multiple_spawns_create_distinct_tasks() {
1877 let mut state = RuntimeState::new();
1878 let cx = test_cx();
1879 let region = state.create_root_region(Budget::INFINITE);
1880 let scope = test_scope(region, Budget::INFINITE);
1881
1882 let (handle1, _) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
1883 let (handle2, _) = scope.spawn(&mut state, &cx, |_| async { 2_i32 }).unwrap();
1884 let (handle3, _) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
1885
1886 assert_ne!(handle1.task_id(), handle2.task_id());
1888 assert_ne!(handle2.task_id(), handle3.task_id());
1889 assert_ne!(handle1.task_id(), handle3.task_id());
1890
1891 let region_record = state.region(region).unwrap();
1893 assert!(region_record.task_ids().contains(&handle1.task_id()));
1894 assert!(region_record.task_ids().contains(&handle2.task_id()));
1895 assert!(region_record.task_ids().contains(&handle3.task_id()));
1896 }
1897
1898 #[test]
1899 fn spawn_into_closing_region_should_fail() {
1900 let mut state = RuntimeState::new();
1901 let cx = test_cx();
1902 let region = state.create_root_region(Budget::INFINITE);
1903 let scope = test_scope(region, Budget::INFINITE);
1904
1905 let region_record = state.region_mut(region).expect("region");
1907 region_record.begin_close(None);
1908
1909 let result = scope.spawn(&mut state, &cx, |_| async { 42 });
1911 assert!(matches!(result, Err(SpawnError::RegionClosed(_))));
1912 }
1913
1914 #[test]
1915 fn test_join_manual_poll() {
1916 use std::task::Context;
1917
1918 let mut state = RuntimeState::new();
1919 let cx = test_cx();
1920 let region = state.create_root_region(Budget::INFINITE);
1921 let scope = test_scope(region, Budget::INFINITE);
1922
1923 let (mut handle, mut stored_task) =
1925 scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
1926 let mut join_fut = std::pin::pin!(handle.join(&cx));
1930
1931 let waker = std::task::Waker::noop().clone();
1933 let mut ctx = Context::from_waker(&waker);
1934
1935 assert!(join_fut.as_mut().poll(&mut ctx).is_pending());
1937
1938 assert!(stored_task.poll(&mut ctx).is_ready());
1940
1941 match join_fut.as_mut().poll(&mut ctx) {
1943 Poll::Ready(Ok(val)) => assert_eq!(val, 42),
1944 other => unreachable!("Expected Ready(Ok(42)), got {other:?}"),
1945 }
1946 }
1947
1948 #[test]
1949 fn spawn_abort_cancels_task() {
1950 use std::task::{Context, Poll};
1951
1952 let mut state = RuntimeState::new();
1953 let cx = test_cx();
1954 let region = state.create_root_region(Budget::INFINITE);
1955 let scope = test_scope(region, Budget::INFINITE);
1956
1957 let (mut handle, mut stored_task) = scope
1959 .spawn(&mut state, &cx, |cx| async move {
1960 if cx.checkpoint().is_err() {
1962 return "cancelled";
1963 }
1964 "finished"
1965 })
1966 .unwrap();
1967
1968 handle.abort();
1970
1971 let waker = std::task::Waker::noop().clone();
1973 let mut ctx = Context::from_waker(&waker);
1974
1975 match stored_task.poll(&mut ctx) {
1977 Poll::Ready(crate::types::Outcome::Ok(())) => {}
1978 res => unreachable!("Task should have completed with Ok(()), got {res:?}"),
1979 }
1980
1981 let mut join_fut = std::pin::pin!(handle.join(&cx));
1983 match join_fut.as_mut().poll(&mut ctx) {
1984 Poll::Ready(Ok(val)) => assert_eq!(val, "cancelled"),
1985 Poll::Ready(Err(e)) => unreachable!("Task failed unexpectedly: {e}"),
1986 Poll::Pending => unreachable!("Join should be ready"),
1987 }
1988 }
1989
1990 #[test]
1991 fn hedge_backup_spawn_failure_aborts_primary() {
1992 let mut state = RuntimeState::new();
1993 let cx = test_cx();
1994 let region = state.create_root_region(Budget::INFINITE);
1995 let scope = test_scope(region, Budget::INFINITE);
1996
1997 let limits = RegionLimits {
1998 max_tasks: Some(1),
1999 ..RegionLimits::unlimited()
2000 };
2001 assert!(state.set_region_limits(region, limits));
2002
2003 let result = block_on(scope.hedge(
2004 &mut state,
2005 &cx,
2006 std::time::Duration::ZERO,
2007 |_| async { 1_u8 },
2008 |_| async { 2_u8 },
2009 ));
2010
2011 assert!(matches!(
2012 result,
2013 Err(JoinError::Cancelled(reason))
2014 if reason.kind == CancelKind::ResourceUnavailable
2015 ));
2016
2017 let task_id = *state
2018 .region(region)
2019 .expect("region missing")
2020 .task_ids()
2021 .first()
2022 .expect("primary task should remain tracked");
2023
2024 let task = state.task(task_id).expect("primary task record missing");
2025 let (cancel_requested, cancel_reason_kind) = {
2026 let inner = task
2027 .cx_inner
2028 .as_ref()
2029 .expect("primary task must have shared Cx inner")
2030 .read();
2031 (
2032 inner.cancel_requested,
2033 inner.cancel_reason.as_ref().map(|r| r.kind),
2034 )
2035 };
2036
2037 assert!(
2038 cancel_requested,
2039 "primary task must be cancellation-requested when backup spawn fails"
2040 );
2041 assert_eq!(cancel_reason_kind, Some(CancelKind::ResourceUnavailable));
2042 }
2043
2044 #[test]
2045 fn region_closes_empty_child() {
2046 let mut state = RuntimeState::new();
2047 let cx = test_cx();
2048 let parent = state.create_root_region(Budget::INFINITE);
2049 let scope = test_scope(parent, Budget::INFINITE);
2050
2051 let outcome = block_on(scope.region(
2052 &mut state,
2053 &cx,
2054 crate::types::policy::FailFast,
2055 |child, _state| {
2056 let child_id = child.region_id();
2057 async move { Outcome::Ok(child_id) }
2058 },
2059 ))
2060 .expect("child region created");
2061
2062 let child_id = match outcome {
2063 Outcome::Ok(id) => id,
2064 other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
2065 };
2066
2067 assert!(
2068 state.region(child_id).is_none(),
2069 "closed child region should be reclaimed from arena"
2070 );
2071
2072 let parent_record = state.region(parent).expect("parent record missing");
2073 assert!(
2074 !parent_record.child_ids().contains(&child_id),
2075 "closed child should be removed from parent"
2076 );
2077 }
2078
2079 #[test]
2080 fn region_budget_is_met_with_parent() {
2081 let mut state = RuntimeState::new();
2082 let cx = test_cx();
2083 let parent = state.create_root_region(Budget::with_deadline_secs(10));
2084 let scope = test_scope(parent, Budget::with_deadline_secs(10));
2085
2086 let outcome = block_on(scope.region_with_budget(
2087 &mut state,
2088 &cx,
2089 Budget::with_deadline_secs(30),
2090 crate::types::policy::FailFast,
2091 |child, _state| {
2092 let child_id = child.region_id();
2093 let child_budget = child.budget();
2094 async move { Outcome::Ok((child_id, child_budget)) }
2095 },
2096 ))
2097 .expect("child region created");
2098
2099 let (child_id, child_budget) = match outcome {
2100 Outcome::Ok(tuple) => tuple,
2101 other => unreachable!("expected Outcome::Ok(child_id), got {other:?}"),
2102 };
2103
2104 assert_eq!(
2105 child_budget.deadline,
2106 Some(crate::types::Time::from_secs(10))
2107 );
2108 assert!(
2109 state.region(child_id).is_none(),
2110 "closed child region should be reclaimed from arena"
2111 );
2112 }
2113
2114 #[test]
2115 fn region_spawns_tasks_in_child() {
2116 use std::task::{Context, Poll};
2117
2118 let mut state = RuntimeState::new();
2119 let cx = test_cx();
2120 let parent = state.create_root_region(Budget::INFINITE);
2121 let scope = test_scope(parent, Budget::INFINITE);
2122
2123 let outcome = block_on(scope.region(
2124 &mut state,
2125 &cx,
2126 crate::types::policy::FailFast,
2127 |child, state| {
2128 let child_id = child.region_id();
2129 let (handle, mut stored) = child
2130 .spawn(state, &cx, |_| async { 7_i32 })
2131 .expect("spawn in child");
2132
2133 let parent_has = state
2134 .region(parent)
2135 .expect("parent record missing")
2136 .task_ids()
2137 .contains(&handle.task_id());
2138 let child_has = state
2139 .region(child_id)
2140 .expect("child record missing")
2141 .task_ids()
2142 .contains(&handle.task_id());
2143
2144 let waker = std::task::Waker::noop().clone();
2145 let mut poll_cx = Context::from_waker(&waker);
2146 let poll_result = stored.poll(&mut poll_cx);
2147 if let Poll::Ready(outcome) = poll_result {
2148 let task_outcome = match outcome {
2149 Outcome::Ok(()) => Outcome::Ok(()),
2150 Outcome::Panicked(payload) => Outcome::Panicked(payload),
2151 other => unreachable!("unexpected task outcome: {other:?}"),
2152 };
2153 if let Some(task_record) = state.task_mut(handle.task_id()) {
2154 task_record.complete(task_outcome);
2155 }
2156 let _ = state.task_completed(handle.task_id());
2157 }
2158
2159 std::future::ready(Outcome::Ok((child_id, parent_has, child_has)))
2160 },
2161 ))
2162 .expect("child region created");
2163
2164 let (child_id, parent_has, child_has) = match outcome {
2165 Outcome::Ok(tuple) => tuple,
2166 other => unreachable!("expected Outcome::Ok(tuple), got {other:?}"),
2167 };
2168
2169 assert!(!parent_has, "task should not be owned by parent region");
2170 assert!(child_has, "task should be owned by child region");
2171
2172 let parent_record = state.region(parent).expect("parent record missing");
2173 assert!(
2174 !parent_record.child_ids().contains(&child_id),
2175 "closed child should be removed from parent"
2176 );
2177 }
2178
2179 #[test]
2180 fn spawn_panic_propagates_as_panicked_error() {
2181 use std::task::{Context, Poll};
2182
2183 let mut state = RuntimeState::new();
2184 let cx = test_cx();
2185 let region = state.create_root_region(Budget::INFINITE);
2186 let scope = test_scope(region, Budget::INFINITE);
2187
2188 let (mut handle, mut stored_task) = scope
2189 .spawn(&mut state, &cx, |_| async {
2190 std::panic::panic_any("oops");
2191 })
2192 .unwrap();
2193
2194 let waker = std::task::Waker::noop().clone();
2196 let mut ctx = Context::from_waker(&waker);
2197
2198 match stored_task.poll(&mut ctx) {
2200 Poll::Ready(crate::types::Outcome::Panicked(_)) => {}
2201 res => unreachable!("Task should have completed with Panicked, got {res:?}"),
2202 }
2203
2204 let mut join_fut = std::pin::pin!(handle.join(&cx));
2206 match join_fut.as_mut().poll(&mut ctx) {
2207 Poll::Ready(Err(JoinError::Panicked(p))) => {
2208 assert_eq!(p.message(), "oops");
2209 }
2210 res => unreachable!("Expected Panicked, got {res:?}"),
2211 }
2212 }
2213
2214 #[test]
2215 fn join_all_success() {
2216 use std::task::{Context, Poll};
2217
2218 let mut state = RuntimeState::new();
2219 let cx = test_cx();
2220 let region = state.create_root_region(Budget::INFINITE);
2221 let scope = test_scope(region, Budget::INFINITE);
2222
2223 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
2224 let (h2, mut t2) = scope.spawn(&mut state, &cx, |_| async { 2 }).unwrap();
2225
2226 let waker = std::task::Waker::noop().clone();
2228 let mut ctx = Context::from_waker(&waker);
2229 assert!(t1.poll(&mut ctx).is_ready());
2230 assert!(t2.poll(&mut ctx).is_ready());
2231
2232 let handles = vec![h1, h2];
2233 let mut fut = Box::pin(scope.join_all(&cx, handles));
2234
2235 match fut.as_mut().poll(&mut ctx) {
2236 Poll::Ready(results) => {
2237 assert_eq!(results.len(), 2);
2238 assert_eq!(results[0].as_ref().unwrap(), &1);
2239 assert_eq!(results[1].as_ref().unwrap(), &2);
2240 }
2241 Poll::Pending => unreachable!("join_all should be ready"),
2242 }
2243 }
2244
2245 #[test]
2246 fn race_all_aborted_task_is_drained() {
2247 use std::task::{Context, Poll};
2248
2249 let mut state = RuntimeState::new();
2250 let cx = test_cx();
2251 let region = state.create_root_region(Budget::INFINITE);
2252 let scope = test_scope(region, Budget::INFINITE);
2253
2254 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1 }).unwrap();
2256
2257 let (h2, mut t2) = scope
2259 .spawn(&mut state, &cx, |cx| async move {
2260 struct YieldOnce(bool);
2262 impl std::future::Future for YieldOnce {
2263 type Output = ();
2264 fn poll(
2265 mut self: std::pin::Pin<&mut Self>,
2266 cx: &mut std::task::Context<'_>,
2267 ) -> std::task::Poll<()> {
2268 if self.0 {
2269 std::task::Poll::Ready(())
2270 } else {
2271 self.0 = true;
2272 cx.waker().wake_by_ref();
2273 std::task::Poll::Pending
2274 }
2275 }
2276 }
2277 YieldOnce(false).await;
2278
2279 if cx.checkpoint().is_err() {
2281 return 0; }
2283 2
2284 })
2285 .unwrap();
2286
2287 let waker = std::task::Waker::noop().clone();
2288 let mut ctx = Context::from_waker(&waker);
2289
2290 assert!(t1.poll(&mut ctx).is_ready());
2292
2293 let handles = vec![h1, h2];
2295 let mut race_fut = Box::pin(scope.race_all(&cx, handles));
2296
2297 assert!(race_fut.as_mut().poll(&mut ctx).is_pending());
2304
2305 assert!(t2.poll(&mut ctx).is_pending());
2310
2311 assert!(race_fut.as_mut().poll(&mut ctx).is_pending());
2313
2314 assert!(t2.poll(&mut ctx).is_ready());
2323
2324 match race_fut.as_mut().poll(&mut ctx) {
2327 Poll::Ready(Ok((val, idx))) => {
2328 assert_eq!(val, 1);
2329 assert_eq!(idx, 0);
2330 }
2331 res => unreachable!("Expected Ready(Ok((1, 0))), got {res:?}"),
2332 }
2333 }
2334
2335 #[test]
2336 fn race_surfaces_loser_panic_even_if_winner_succeeds() {
2337 use std::task::Context;
2338
2339 let mut state = RuntimeState::new();
2340 let cx = test_cx();
2341 let region = state.create_root_region(Budget::INFINITE);
2342 let scope = test_scope(region, Budget::INFINITE);
2343
2344 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
2345 let (h2, mut t2) = scope
2346 .spawn(&mut state, &cx, |_| async {
2347 std::panic::panic_any("loser panic");
2348 })
2349 .unwrap();
2350
2351 let waker = std::task::Waker::noop().clone();
2352 let mut poll_cx = Context::from_waker(&waker);
2353 assert!(t1.poll(&mut poll_cx).is_ready());
2354 assert!(t2.poll(&mut poll_cx).is_ready());
2355
2356 let result = block_on(scope.race(&cx, h1, h2));
2357 assert!(
2358 matches!(result, Err(JoinError::Panicked(_))),
2359 "loser panic must dominate race result, got {result:?}"
2360 );
2361 }
2362
2363 #[test]
2364 fn race_preserves_winner_panic_over_loser_panic() {
2365 use std::task::{Context, Poll};
2366
2367 let mut state = RuntimeState::new();
2368 let cx = test_cx();
2369 let region = state.create_root_region(Budget::INFINITE);
2370 let scope = test_scope(region, Budget::INFINITE);
2371
2372 let (h1, mut t1) = scope
2373 .spawn(&mut state, &cx, |_| async {
2374 std::panic::panic_any("winner panic");
2375 })
2376 .unwrap();
2377 let (h2, mut t2) = scope
2378 .spawn(&mut state, &cx, |_| {
2379 let mut first_poll = true;
2380 std::future::poll_fn(move |poll_cx| {
2381 if first_poll {
2382 first_poll = false;
2383 poll_cx.waker().wake_by_ref();
2384 Poll::Pending
2385 } else {
2386 std::panic::panic_any("loser panic");
2387 }
2388 })
2389 })
2390 .unwrap();
2391
2392 let waker = std::task::Waker::noop().clone();
2393 let mut poll_cx = Context::from_waker(&waker);
2394 assert!(t1.poll(&mut poll_cx).is_ready());
2395 assert!(t2.poll(&mut poll_cx).is_pending());
2396
2397 let result = block_on(scope.race(&cx, h1, h2));
2398 match result {
2399 Err(JoinError::Panicked(payload)) => {
2400 assert_eq!(payload.message(), "winner panic");
2401 }
2402 other => unreachable!("winner panic must dominate race result, got {other:?}"),
2403 }
2404 }
2405
2406 #[test]
2407 fn race_all_surfaces_simultaneous_loser_panic() {
2408 use std::task::Context;
2409
2410 let mut state = RuntimeState::new();
2411 let cx = test_cx();
2412 let region = state.create_root_region(Budget::INFINITE);
2413 let scope = test_scope(region, Budget::INFINITE);
2414
2415 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 1_i32 }).unwrap();
2416 let (h2, mut t2) = scope
2417 .spawn(&mut state, &cx, |_| async {
2418 std::panic::panic_any("simultaneous loser panic");
2419 })
2420 .unwrap();
2421 let (h3, mut t3) = scope.spawn(&mut state, &cx, |_| async { 3_i32 }).unwrap();
2422
2423 let waker = std::task::Waker::noop().clone();
2424 let mut poll_cx = Context::from_waker(&waker);
2425 assert!(t1.poll(&mut poll_cx).is_ready());
2426 assert!(t2.poll(&mut poll_cx).is_ready());
2427 assert!(t3.poll(&mut poll_cx).is_ready());
2428
2429 let result = block_on(scope.race_all(&cx, vec![h1, h2, h3]));
2430 assert!(
2431 matches!(result, Err(JoinError::Panicked(_))),
2432 "simultaneous loser panic must dominate race_all result, got {result:?}"
2433 );
2434 }
2435
2436 #[test]
2437 fn race_all_preserves_winner_panic_over_loser_panic() {
2438 use std::task::{Context, Poll};
2439
2440 let mut state = RuntimeState::new();
2441 let cx = test_cx();
2442 let region = state.create_root_region(Budget::INFINITE);
2443 let scope = test_scope(region, Budget::INFINITE);
2444
2445 let (h1, mut t1) = scope
2446 .spawn(&mut state, &cx, |_| async {
2447 std::panic::panic_any("winner panic");
2448 })
2449 .unwrap();
2450 let (h2, mut t2) = scope
2451 .spawn(&mut state, &cx, |_| {
2452 let mut first_poll = true;
2453 std::future::poll_fn(move |poll_cx| {
2454 if first_poll {
2455 first_poll = false;
2456 poll_cx.waker().wake_by_ref();
2457 Poll::Pending
2458 } else {
2459 std::panic::panic_any("loser panic");
2460 }
2461 })
2462 })
2463 .unwrap();
2464
2465 let waker = std::task::Waker::noop().clone();
2466 let mut poll_cx = Context::from_waker(&waker);
2467 assert!(t1.poll(&mut poll_cx).is_ready());
2468 assert!(t2.poll(&mut poll_cx).is_pending());
2469
2470 let result = block_on(scope.race_all(&cx, vec![h1, h2]));
2471 match result {
2472 Err(JoinError::Panicked(payload)) => {
2473 assert_eq!(payload.message(), "winner panic");
2474 }
2475 other => unreachable!("winner panic must dominate race_all result, got {other:?}"),
2476 }
2477 }
2478
2479 #[test]
2480 fn race_all_empty_is_pending() {
2481 let mut state = RuntimeState::new();
2482 let cx = test_cx();
2483 let region = state.create_root_region(Budget::INFINITE);
2484 let scope = test_scope(region, Budget::INFINITE);
2485
2486 let fut = scope.race_all::<i32>(&cx, vec![]);
2487 let waker = std::task::Waker::noop();
2488 let mut poll_cx = std::task::Context::from_waker(waker);
2489 let pinned = std::pin::pin!(fut);
2490 let status = std::future::Future::poll(pinned, &mut poll_cx);
2491 assert!(status.is_pending());
2492 }
2493
2494 #[test]
2502 fn conformance_spawn_creates_trackable_task() {
2503 let mut state = RuntimeState::new();
2505 let cx = test_cx();
2506 let region = state.create_root_region(Budget::INFINITE);
2507 let scope = test_scope(region, Budget::INFINITE);
2508
2509 let (handle, _stored) = scope.spawn(&mut state, &cx, |_| async { 42_i32 }).unwrap();
2510
2511 let task_record = state
2513 .task(handle.task_id())
2514 .expect("spawned task must have a record");
2515
2516 assert_eq!(
2518 task_record.owner, region,
2519 "spawned task must be owned by the spawning region"
2520 );
2521
2522 let region_record = state.region(region).expect("spawning region must exist");
2524 assert!(
2525 region_record.task_ids().contains(&handle.task_id()),
2526 "spawning region must track the spawned task"
2527 );
2528 }
2529
2530 #[test]
2531 fn conformance_spawn_enforces_send_bounds() {
2532 let mut state = RuntimeState::new();
2534 let cx = test_cx();
2535 let region = state.create_root_region(Budget::INFINITE);
2536 let scope = test_scope(region, Budget::INFINITE);
2537
2538 let send_data = String::from("test");
2540 let (handle, _stored) = scope
2541 .spawn(&mut state, &cx, move |_| async move {
2542 send_data.len() })
2544 .unwrap();
2545
2546 let task_record = state
2548 .task(handle.task_id())
2549 .expect("Send task must have a record");
2550 assert_eq!(task_record.owner, region);
2551
2552 }
2555
2556 #[test]
2557 fn conformance_join_awaits_task_completion() {
2558 use std::task::Context;
2561
2562 let mut state = RuntimeState::new();
2563 let cx = test_cx();
2564 let region = state.create_root_region(Budget::INFINITE);
2565 let scope = test_scope(region, Budget::INFINITE);
2566
2567 let (mut handle, mut stored) = scope.spawn(&mut state, &cx, |_| async { 123_i32 }).unwrap();
2568
2569 let waker = std::task::Waker::noop().clone();
2570 let mut poll_cx = Context::from_waker(&waker);
2571
2572 let mut join_fut = std::pin::pin!(handle.join(&cx));
2574 assert!(
2575 join_fut.as_mut().poll(&mut poll_cx).is_pending(),
2576 "join must be pending before task completion"
2577 );
2578
2579 assert!(
2581 stored.poll(&mut poll_cx).is_ready(),
2582 "test task must complete in one poll"
2583 );
2584
2585 match join_fut.as_mut().poll(&mut poll_cx) {
2587 std::task::Poll::Ready(Ok(result)) => {
2588 assert_eq!(result, 123, "join must return the task's result");
2589 }
2590 other => panic!("join must be Ready(Ok(123)) after task completion, got {other:?}"),
2591 }
2592 }
2593
2594 #[test]
2595 fn conformance_child_region_task_isolation() {
2596 use std::task::Context;
2599
2600 let mut state = RuntimeState::new();
2601 let cx = test_cx();
2602 let parent_region = state.create_root_region(Budget::INFINITE);
2603 let scope = test_scope(parent_region, Budget::INFINITE);
2604
2605 let outcome = block_on(scope.region(
2606 &mut state,
2607 &cx,
2608 crate::types::policy::FailFast,
2609 |child_scope, state| {
2610 let child_region = child_scope.region_id();
2611
2612 let (handle, mut stored) = child_scope
2614 .spawn(state, &cx, |_| async { 456_i32 })
2615 .expect("spawn in child region must succeed");
2616
2617 let task_record = state
2619 .task(handle.task_id())
2620 .expect("child task must have a record");
2621 let child_owns = task_record.owner == child_region;
2622 let parent_owns = task_record.owner == parent_region;
2623
2624 let parent_tracks = state
2626 .region(parent_region)
2627 .is_some_and(|r| r.task_ids().contains(&handle.task_id()));
2628 let child_tracks = state
2629 .region(child_region)
2630 .is_some_and(|r| r.task_ids().contains(&handle.task_id()));
2631
2632 let waker = std::task::Waker::noop().clone();
2634 let mut poll_cx = Context::from_waker(&waker);
2635 if let std::task::Poll::Ready(outcome) = stored.poll(&mut poll_cx) {
2636 if let Some(task) = state.task_mut(handle.task_id()) {
2637 task.complete(outcome.map_err(|_| {
2638 crate::error::Error::new(crate::error::ErrorKind::Internal)
2639 }));
2640 }
2641 let _ = state.task_completed(handle.task_id());
2642 }
2643
2644 std::future::ready(Outcome::Ok((
2645 child_owns,
2646 parent_owns,
2647 child_tracks,
2648 parent_tracks,
2649 )))
2650 },
2651 ))
2652 .expect("child region must complete");
2653
2654 let (child_owns, parent_owns, child_tracks, parent_tracks) = match outcome {
2655 Outcome::Ok(tuple) => tuple,
2656 other => panic!("expected Ok(ownership_data), got {other:?}"),
2657 };
2658
2659 assert!(
2660 child_owns,
2661 "task spawned in child region must be owned by child"
2662 );
2663 assert!(
2664 !parent_owns,
2665 "task spawned in child region must NOT be owned by parent"
2666 );
2667 assert!(child_tracks, "child region must track its spawned tasks");
2668 assert!(!parent_tracks, "parent region must NOT track child's tasks");
2669 }
2670
2671 #[test]
2672 fn conformance_capability_inheritance() {
2673 use crate::cx::macaroon::MacaroonToken;
2675 use crate::cx::registry::RegistryHandle;
2676 use crate::remote::{NodeId, RemoteCap};
2677 use crate::security::key::AuthKey;
2678 use crate::types::SystemPressure;
2679 use std::sync::Arc;
2680 use std::task::Context;
2681
2682 let mut state = RuntimeState::new();
2683 let region = state.create_root_region(Budget::INFINITE);
2684 let scope = test_scope(region, Budget::INFINITE);
2685
2686 let registry = crate::cx::NameRegistry::new();
2688 let registry_handle = RegistryHandle::new(Arc::new(registry));
2689 let parent_registry_arc = registry_handle.as_arc();
2690 let parent_io_cap: Arc<dyn crate::io::IoCap> = Arc::new(crate::io::LabIoCap::new());
2691 let parent_pressure = Arc::new(SystemPressure::new());
2692 parent_pressure.set_headroom(0.25);
2693 let auth_key = AuthKey::from_seed(7);
2694 let token = MacaroonToken::mint(&auth_key, "scope:spawn", "cx/scope");
2695
2696 let parent_cx = Cx::new_with_io(
2697 crate::types::RegionId::new_for_test(0, 0),
2698 crate::types::TaskId::new_for_test(0, 0),
2699 Budget::INFINITE,
2700 None,
2701 None,
2702 Some(Arc::clone(&parent_io_cap)),
2703 None,
2704 )
2705 .with_registry_handle(Some(registry_handle))
2706 .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("test-node")))
2707 .with_pressure(Arc::clone(&parent_pressure))
2708 .with_macaroon(token);
2709 let parent_macaroon = parent_cx
2710 .macaroon_handle()
2711 .expect("parent must retain macaroon capability");
2712
2713 let mut handle = scope
2714 .spawn_registered(&mut state, &parent_cx, move |child_cx| async move {
2715 let child_registry = child_cx
2717 .registry_handle()
2718 .expect("child must inherit registry capability");
2719 let same_registry = Arc::ptr_eq(&child_registry.as_arc(), &parent_registry_arc);
2720
2721 let child_remote = child_cx
2723 .remote()
2724 .expect("child must inherit remote capability");
2725 let node_name = child_remote.local_node().as_str().to_owned();
2726
2727 let child_io_cap = child_cx
2729 .io_cap_handle()
2730 .expect("child must inherit I/O capability");
2731 let same_io_cap = Arc::ptr_eq(&child_io_cap, &parent_io_cap);
2732
2733 let child_pressure = child_cx
2735 .pressure_handle()
2736 .expect("child must inherit system pressure");
2737 let same_pressure = Arc::ptr_eq(&child_pressure, &parent_pressure);
2738
2739 let child_macaroon = child_cx
2741 .macaroon_handle()
2742 .expect("child must inherit macaroon capability");
2743 let same_macaroon = Arc::ptr_eq(&child_macaroon, &parent_macaroon);
2744
2745 let has_timer = child_cx.has_timer();
2747
2748 (
2749 same_registry,
2750 node_name,
2751 same_io_cap,
2752 same_pressure,
2753 same_macaroon,
2754 has_timer,
2755 )
2756 })
2757 .unwrap();
2758
2759 let waker = std::task::Waker::noop().clone();
2761 let mut poll_cx = Context::from_waker(&waker);
2762
2763 let stored = state
2764 .get_stored_future(handle.task_id())
2765 .expect("spawn_registered must store the task");
2766 assert!(stored.poll(&mut poll_cx).is_ready());
2767
2768 let mut join_fut = std::pin::pin!(handle.join(&parent_cx));
2769 match join_fut.as_mut().poll(&mut poll_cx) {
2770 std::task::Poll::Ready(Ok((
2771 same_registry,
2772 node_name,
2773 same_io_cap,
2774 same_pressure,
2775 same_macaroon,
2776 has_timer,
2777 ))) => {
2778 assert!(
2779 same_registry,
2780 "child must inherit exact same registry instance"
2781 );
2782 assert_eq!(
2783 node_name, "test-node",
2784 "child must inherit remote capability"
2785 );
2786 assert!(same_io_cap, "child must inherit exact same I/O capability");
2787 assert!(
2788 same_pressure,
2789 "child must inherit exact same system pressure handle"
2790 );
2791 assert!(
2792 same_macaroon,
2793 "child must inherit exact same macaroon capability"
2794 );
2795 assert_eq!(
2796 has_timer,
2797 parent_cx.has_timer(),
2798 "child timer capability should stay consistent with the runtime-backed parent"
2799 );
2800 }
2801 other => panic!("capability inheritance test failed: {other:?}"),
2802 }
2803 }
2804
2805 #[test]
2806 fn conformance_task_cancellation_propagation() {
2807 use std::task::Context;
2810
2811 let mut state = RuntimeState::new();
2812 let cx = test_cx();
2813 let region = state.create_root_region(Budget::INFINITE);
2814 let scope = test_scope(region, Budget::INFINITE);
2815
2816 let (mut handle, mut stored) = scope
2817 .spawn(&mut state, &cx, |cx| async move {
2818 if cx.checkpoint().is_err() {
2820 "cancelled"
2821 } else {
2822 "completed"
2823 }
2824 })
2825 .unwrap();
2826
2827 handle.abort();
2829
2830 let waker = std::task::Waker::noop().clone();
2831 let mut poll_cx = Context::from_waker(&waker);
2832
2833 assert!(
2835 stored.poll(&mut poll_cx).is_ready(),
2836 "cancelled task must still complete"
2837 );
2838
2839 let mut join_fut = std::pin::pin!(handle.join(&cx));
2841 match join_fut.as_mut().poll(&mut poll_cx) {
2842 std::task::Poll::Ready(Ok(result)) => {
2843 assert_eq!(
2844 result, "cancelled",
2845 "cancelled task must observe cancellation via checkpoint()"
2846 );
2847 }
2848 other => panic!("cancelled task join failed: {other:?}"),
2849 }
2850 }
2851
2852 #[test]
2853 fn metamorphic_nested_scope_cancellation_closes_descendants_without_spawn_leaks() {
2854 use std::task::{Context, Poll};
2855
2856 struct YieldOnce(bool);
2857
2858 impl std::future::Future for YieldOnce {
2859 type Output = ();
2860
2861 fn poll(
2862 mut self: std::pin::Pin<&mut Self>,
2863 cx: &mut std::task::Context<'_>,
2864 ) -> Poll<()> {
2865 if self.0 {
2866 Poll::Ready(())
2867 } else {
2868 self.0 = true;
2869 cx.waker().wake_by_ref();
2870 Poll::Pending
2871 }
2872 }
2873 }
2874
2875 let mut state = RuntimeState::new();
2876 let cx = test_cx();
2877 let root = state.create_root_region(Budget::INFINITE);
2878 let child = state
2879 .create_child_region(root, Budget::INFINITE)
2880 .expect("child region");
2881 let grandchild = state
2882 .create_child_region(child, Budget::INFINITE)
2883 .expect("grandchild region");
2884 let child_scope = test_scope(child, Budget::INFINITE);
2885 let grandchild_scope = test_scope(grandchild, Budget::INFINITE);
2886 let finalizer_log = Arc::new(std::sync::Mutex::new(Vec::new()));
2887
2888 let child_log = Arc::clone(&finalizer_log);
2889 assert!(
2890 child_scope.defer_sync(&mut state, move || {
2891 child_log
2892 .lock()
2893 .expect("child finalizer log poisoned")
2894 .push("child");
2895 }),
2896 "child finalizer should register before cancellation"
2897 );
2898
2899 let grandchild_log = Arc::clone(&finalizer_log);
2900 assert!(
2901 grandchild_scope.defer_sync(&mut state, move || {
2902 grandchild_log
2903 .lock()
2904 .expect("grandchild finalizer log poisoned")
2905 .push("grandchild");
2906 }),
2907 "grandchild finalizer should register before cancellation"
2908 );
2909
2910 let mut child_handle = child_scope
2911 .spawn_registered(&mut state, &cx, |task_cx| async move {
2912 YieldOnce(false).await;
2913 if task_cx.checkpoint().is_err() {
2914 "child_cancelled"
2915 } else {
2916 "child_completed"
2917 }
2918 })
2919 .expect("spawn child task");
2920 let child_task_id = child_handle.task_id();
2921
2922 let mut grandchild_handle = grandchild_scope
2923 .spawn_registered(&mut state, &cx, |task_cx| async move {
2924 YieldOnce(false).await;
2925 if task_cx.checkpoint().is_err() {
2926 "grandchild_cancelled"
2927 } else {
2928 "grandchild_completed"
2929 }
2930 })
2931 .expect("spawn grandchild task");
2932 let grandchild_task_id = grandchild_handle.task_id();
2933
2934 let cancel_reason = CancelReason::shutdown().with_region(root);
2935 let cancelled = state.cancel_request(root, &cancel_reason, None);
2936 assert!(
2937 cancelled
2938 .iter()
2939 .any(|(task_id, _)| *task_id == child_task_id),
2940 "parent cancellation must reach child task"
2941 );
2942 assert!(
2943 cancelled
2944 .iter()
2945 .any(|(task_id, _)| *task_id == grandchild_task_id),
2946 "parent cancellation must reach grandchild task"
2947 );
2948
2949 let grandchild_tasks_before_failed_spawn = state
2950 .region(grandchild)
2951 .expect("grandchild region missing")
2952 .task_count();
2953 let live_tasks_before_failed_spawn = state.live_task_count();
2954 let failed_spawn = grandchild_scope.spawn(&mut state, &cx, |_| async { 99_u8 });
2955 let grandchild_tasks_after_failed_spawn = state
2956 .region(grandchild)
2957 .expect("grandchild region missing after failed spawn")
2958 .task_count();
2959 let live_tasks_after_failed_spawn = state.live_task_count();
2960
2961 let waker = std::task::Waker::noop().clone();
2962 let mut poll_cx = Context::from_waker(&waker);
2963 {
2964 let stored = state
2965 .get_stored_future(grandchild_task_id)
2966 .expect("grandchild stored task");
2967 let poll_result = stored.poll(&mut poll_cx);
2968 assert!(
2969 poll_result.is_pending(),
2970 "grandchild task should yield once before observing cancellation"
2971 );
2972 }
2973 {
2974 let stored = state
2975 .get_stored_future(grandchild_task_id)
2976 .expect("grandchild stored task");
2977 let poll_result = stored.poll(&mut poll_cx);
2978 let task_outcome = match poll_result {
2979 Poll::Ready(Outcome::Ok(())) => Outcome::Ok(()),
2980 Poll::Ready(Outcome::Panicked(payload)) => Outcome::Panicked(payload),
2981 other => panic!(
2982 "grandchild task should complete once cancellation is observed: {other:?}"
2983 ),
2984 };
2985 if let Some(task_record) = state.task_mut(grandchild_task_id) {
2986 task_record.complete(task_outcome);
2987 }
2988 }
2989 let _ = state.task_completed(grandchild_task_id);
2990 state.advance_region_state(grandchild);
2991
2992 let mut grandchild_join_fut = std::pin::pin!(grandchild_handle.join(&cx));
2993 let grandchild_result = match grandchild_join_fut.as_mut().poll(&mut poll_cx) {
2994 Poll::Ready(Ok(result)) => result,
2995 other => panic!("grandchild cancellation join should succeed: {other:?}"),
2996 };
2997
2998 {
2999 let stored = state
3000 .get_stored_future(child_task_id)
3001 .expect("child stored task");
3002 let poll_result = stored.poll(&mut poll_cx);
3003 assert!(
3004 poll_result.is_pending(),
3005 "child task should yield once before observing cancellation"
3006 );
3007 }
3008 {
3009 let stored = state
3010 .get_stored_future(child_task_id)
3011 .expect("child stored task");
3012 let poll_result = stored.poll(&mut poll_cx);
3013 let task_outcome = match poll_result {
3014 Poll::Ready(Outcome::Ok(())) => Outcome::Ok(()),
3015 Poll::Ready(Outcome::Panicked(payload)) => Outcome::Panicked(payload),
3016 other => {
3017 panic!("child task should complete once cancellation is observed: {other:?}")
3018 }
3019 };
3020 if let Some(task_record) = state.task_mut(child_task_id) {
3021 task_record.complete(task_outcome);
3022 }
3023 }
3024 let _ = state.task_completed(child_task_id);
3025 state.advance_region_state(child);
3026
3027 let mut child_join_fut = std::pin::pin!(child_handle.join(&cx));
3028 let child_result = match child_join_fut.as_mut().poll(&mut poll_cx) {
3029 Poll::Ready(Ok(result)) => result,
3030 other => panic!("child cancellation join should succeed: {other:?}"),
3031 };
3032
3033 assert_eq!(child_result, "child_cancelled");
3034 assert_eq!(grandchild_result, "grandchild_cancelled");
3035 assert!(
3036 matches!(failed_spawn, Err(SpawnError::RegionClosed(id)) if id == grandchild),
3037 "nested spawn after parent cancellation must fail against the closing grandchild region"
3038 );
3039 assert_eq!(
3040 grandchild_tasks_before_failed_spawn, grandchild_tasks_after_failed_spawn,
3041 "failed spawn after cancellation must not leak task membership into the grandchild region"
3042 );
3043 assert_eq!(
3044 live_tasks_before_failed_spawn, live_tasks_after_failed_spawn,
3045 "failed spawn after cancellation must not inflate runtime task count"
3046 );
3047 assert_eq!(
3048 *finalizer_log.lock().expect("finalizer log poisoned"),
3049 vec!["grandchild", "child"],
3050 "nested scope finalizers must run in reverse scope creation order"
3051 );
3052 assert!(
3053 state.region(grandchild).is_none(),
3054 "grandchild region should be reclaimed after close"
3055 );
3056 assert!(
3057 state.region(child).is_none(),
3058 "child region should be reclaimed after close"
3059 );
3060 }
3061
3062 #[test]
3063 fn conformance_race_loser_drain_invariant() {
3064 use std::task::{Context, Poll};
3067
3068 let mut state = RuntimeState::new();
3069 let cx = test_cx();
3070 let region = state.create_root_region(Budget::INFINITE);
3071 let scope = test_scope(region, Budget::INFINITE);
3072
3073 let (winner_handle, mut winner_stored) = scope
3075 .spawn(&mut state, &cx, |_| async { "winner" })
3076 .unwrap();
3077
3078 let (loser_handle, mut loser_stored) = scope
3080 .spawn(&mut state, &cx, |cx| async move {
3081 struct YieldOnce(bool);
3083 impl std::future::Future for YieldOnce {
3084 type Output = ();
3085 fn poll(
3086 mut self: std::pin::Pin<&mut Self>,
3087 cx: &mut std::task::Context<'_>,
3088 ) -> std::task::Poll<()> {
3089 if self.0 {
3090 std::task::Poll::Ready(())
3091 } else {
3092 self.0 = true;
3093 cx.waker().wake_by_ref();
3094 std::task::Poll::Pending
3095 }
3096 }
3097 }
3098 YieldOnce(false).await;
3099
3100 if cx.checkpoint().is_err() {
3102 "loser_cancelled"
3103 } else {
3104 "loser_completed"
3105 }
3106 })
3107 .unwrap();
3108
3109 let waker = std::task::Waker::noop().clone();
3110 let mut poll_cx = Context::from_waker(&waker);
3111
3112 assert!(winner_stored.poll(&mut poll_cx).is_ready());
3114
3115 let handles = vec![winner_handle, loser_handle];
3117 let mut race_fut = std::pin::pin!(scope.race_all(&cx, handles));
3118
3119 assert!(
3121 race_fut.as_mut().poll(&mut poll_cx).is_pending(),
3122 "race must wait for loser to be drained"
3123 );
3124
3125 assert!(loser_stored.poll(&mut poll_cx).is_pending());
3127
3128 assert!(race_fut.as_mut().poll(&mut poll_cx).is_pending());
3130
3131 assert!(loser_stored.poll(&mut poll_cx).is_ready());
3133
3134 match race_fut.as_mut().poll(&mut poll_cx) {
3136 Poll::Ready(Ok((result, winner_index))) => {
3137 assert_eq!(result, "winner", "race must return winner result");
3138 assert_eq!(winner_index, 0, "winner index must be correct");
3139 }
3140 other => panic!("race must complete after loser drain: {other:?}"),
3141 }
3142 }
3143
3144 #[test]
3145 fn conformance_region_quiescence_on_empty() {
3146 let mut state = RuntimeState::new();
3148 let cx = test_cx();
3149 let parent_region = state.create_root_region(Budget::INFINITE);
3150 let scope = test_scope(parent_region, Budget::INFINITE);
3151
3152 let outcome = block_on(scope.region(
3154 &mut state,
3155 &cx,
3156 crate::types::policy::FailFast,
3157 |_child_scope, _state| {
3158 std::future::ready(Outcome::Ok("empty_region_completed"))
3160 },
3161 ))
3162 .expect("empty child region must complete");
3163
3164 match outcome {
3165 Outcome::Ok(result) => {
3166 assert_eq!(
3167 result, "empty_region_completed",
3168 "empty region must reach quiescence immediately"
3169 );
3170 }
3171 other => panic!("empty region must complete successfully: {other:?}"),
3172 }
3173
3174 let parent_record = state
3176 .region(parent_region)
3177 .expect("parent region must exist");
3178 assert!(
3179 parent_record.child_ids().is_empty(),
3180 "completed child region must be removed from parent"
3181 );
3182 }
3183
3184 #[test]
3185 fn conformance_spawn_into_closed_region_fails() {
3186 let mut state = RuntimeState::new();
3188 let cx = test_cx();
3189 let region = state.create_root_region(Budget::INFINITE);
3190 let scope = test_scope(region, Budget::INFINITE);
3191
3192 let region_record = state.region_mut(region).expect("region must exist");
3194 region_record.begin_close(None);
3195
3196 let spawn_result = scope.spawn(&mut state, &cx, |_| async { 42 });
3198
3199 assert!(
3200 matches!(spawn_result, Err(SpawnError::RegionClosed(_))),
3201 "spawning into closed region must fail with RegionClosed error"
3202 );
3203
3204 assert!(
3206 state.tasks_is_empty()
3207 || state
3208 .region(region)
3209 .map_or(true, |r| r.task_ids().is_empty()),
3210 "failed spawn must not create orphaned tasks"
3211 );
3212 }
3213
3214 #[test]
3215 fn conformance_join_multiple_tasks_preserves_results() {
3216 use std::task::{Context, Poll};
3219
3220 let mut state = RuntimeState::new();
3221 let cx = test_cx();
3222 let region = state.create_root_region(Budget::INFINITE);
3223 let scope = test_scope(region, Budget::INFINITE);
3224
3225 let (h1, mut t1) = scope.spawn(&mut state, &cx, |_| async { 100_i32 }).unwrap();
3227 let (h2, mut t2) = scope.spawn(&mut state, &cx, |_| async { 200_i32 }).unwrap();
3228 let (h3, mut t3) = scope.spawn(&mut state, &cx, |_| async { 300_i32 }).unwrap();
3229
3230 let waker = std::task::Waker::noop().clone();
3231 let mut poll_cx = Context::from_waker(&waker);
3232
3233 assert!(t1.poll(&mut poll_cx).is_ready());
3235 assert!(t2.poll(&mut poll_cx).is_ready());
3236 assert!(t3.poll(&mut poll_cx).is_ready());
3237
3238 let handles = vec![h1, h2, h3];
3240 let mut join_all_fut = std::pin::pin!(scope.join_all(&cx, handles));
3241
3242 match join_all_fut.as_mut().poll(&mut poll_cx) {
3243 Poll::Ready(results) => {
3244 assert_eq!(results.len(), 3, "join_all must return all results");
3245
3246 assert_eq!(
3248 results[0].as_ref().unwrap(),
3249 &100,
3250 "first task result must be preserved"
3251 );
3252 assert_eq!(
3253 results[1].as_ref().unwrap(),
3254 &200,
3255 "second task result must be preserved"
3256 );
3257 assert_eq!(
3258 results[2].as_ref().unwrap(),
3259 &300,
3260 "third task result must be preserved"
3261 );
3262 }
3263 other => panic!("join_all must complete with all results: {other:?}"),
3264 }
3265 }
3266
3267 #[test]
3268 fn conformance_panic_propagation_through_join() {
3269 use std::task::{Context, Poll};
3272
3273 let mut state = RuntimeState::new();
3274 let cx = test_cx();
3275 let region = state.create_root_region(Budget::INFINITE);
3276 let scope = test_scope(region, Budget::INFINITE);
3277
3278 let (mut handle, mut stored) = scope
3280 .spawn(&mut state, &cx, |_| async {
3281 std::panic::panic_any("test_panic_message");
3282 })
3283 .unwrap();
3284
3285 let waker = std::task::Waker::noop().clone();
3286 let mut poll_cx = Context::from_waker(&waker);
3287
3288 match stored.poll(&mut poll_cx) {
3290 Poll::Ready(crate::types::Outcome::Panicked(_)) => {
3291 }
3293 other => panic!("panicking task must complete with Panicked outcome: {other:?}"),
3294 }
3295
3296 let mut join_fut = std::pin::pin!(handle.join(&cx));
3298 match join_fut.as_mut().poll(&mut poll_cx) {
3299 Poll::Ready(Err(JoinError::Panicked(payload))) => {
3300 assert_eq!(
3301 payload.message(),
3302 "test_panic_message",
3303 "join must preserve panic payload message"
3304 );
3305 }
3306 other => panic!("join of panicked task must return JoinError::Panicked: {other:?}"),
3307 }
3308 }
3309}