1use std::cell::Cell;
20use std::future::Future;
21use std::pin::Pin;
22use std::task::{Context, Poll, Wake, Waker};
23use std::time::{Duration, Instant};
24
25use crate::io::IoDriver;
26use crate::task::JoinHandle;
27use crate::timer::TimerDriver;
28use crate::{Executor, WorldCtx};
29
/// Default number of run-loop iterations between non-blocking I/O polls.
///
/// Used as the initial `event_interval` of a `RuntimeBuilder`.
const DEFAULT_EVENT_INTERVAL: u32 = 61;
33
thread_local! {
    /// Raw pointer to the executor that is currently driving this thread.
    ///
    /// Starts out null; within this file it is only written by `RuntimeGuard`,
    /// which swaps it in on entry and restores the previous value on drop.
    static CURRENT: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
43
44pub fn spawn_boxed<F>(future: F) -> JoinHandle<F::Output>
56where
57 F: Future + 'static,
58 F::Output: 'static,
59{
60 CURRENT.with(|cell| {
61 let ptr = cell.get();
62 assert!(
63 !ptr.is_null(),
64 "spawn_boxed() called outside of Runtime::block_on"
65 );
66 let executor = unsafe { &mut *ptr };
68 executor.spawn_boxed(future)
69 })
70}
71
72pub fn spawn_slab<F>(future: F) -> JoinHandle<F::Output>
85where
86 F: Future + 'static,
87 F::Output: 'static,
88{
89 CURRENT.with(|cell| {
90 let ptr = cell.get();
91 assert!(
92 !ptr.is_null(),
93 "spawn_slab() called outside of Runtime::block_on"
94 );
95 let executor = unsafe { &mut *ptr };
96 let tracker_key = executor.next_tracker_key();
97 let task_ptr = crate::alloc::slab_spawn(future, tracker_key);
98 executor.spawn_raw(task_ptr);
99 JoinHandle::new(task_ptr)
100 })
101}
102
103pub(crate) fn with_executor<R>(f: impl FnOnce(&mut Executor) -> R) -> R {
105 CURRENT.with(|cell| {
106 let ptr = cell.get();
107 assert!(!ptr.is_null(), "called outside of Runtime::block_on");
108 let executor = unsafe { &mut *ptr };
109 f(executor)
110 })
111}
112
113pub fn try_claim_slab() -> Option<crate::alloc::SlabClaim> {
124 CURRENT.with(|cell| {
125 assert!(
126 !cell.get().is_null(),
127 "try_claim_slab() called outside of Runtime::block_on"
128 );
129 });
130 crate::alloc::try_claim()
131}
132
133pub fn claim_slab() -> crate::alloc::SlabClaim {
145 CURRENT.with(|cell| {
146 assert!(
147 !cell.get().is_null(),
148 "claim_slab() called outside of Runtime::block_on"
149 );
150 });
151 crate::alloc::claim()
152}
153
/// Async runtime that drives a root future plus spawned tasks on the
/// calling thread.
///
/// Built with `Runtime::new` or `Runtime::builder`, then driven with
/// `block_on` / `block_on_busy`.
pub struct Runtime {
    /// Task executor; a raw pointer to it is published in `CURRENT` while
    /// the run loop is active.
    executor: Executor,

    /// I/O driver polled by the run loop; also the source of the mio waker.
    io: IoDriver,

    /// Timer driver; expired timers are fired once per loop iteration and
    /// its next deadline bounds how long the loop parks.
    timers: TimerDriver,

    /// World context created from the `World` passed at construction.
    ctx: WorldCtx,

    /// Timestamp the run loop refreshes after each I/O poll.
    event_time: Cell<Instant>,

    /// Shutdown handle; the root future is re-polled while it reports shutdown.
    shutdown: crate::ShutdownHandle,

    /// Shared cross-thread wake state (queue, mio waker, parked flag).
    cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,

    /// Limit passed to `Executor::drain_cross_thread` on every drain.
    cross_thread_drain_limit: usize,

    /// Number of loop iterations between non-blocking I/O polls.
    event_interval: u32,

    /// Type-erased slab storage returned by the slab installer, if any.
    /// Kept here so the allocation outlives the pointer stored in `slab_tls`
    /// (presumably — confirm against `crate::alloc`).
    _slab: Option<Box<dyn std::any::Any>>,

    /// Slab configuration installed via `crate::alloc::install_slab` for the
    /// duration of each run loop, if a slab was configured.
    slab_tls: Option<crate::alloc::SlabTlsConfig>,
}
223
224impl Runtime {
225 pub fn new(world: &mut nexus_rt::World) -> Self {
229 RuntimeBuilder::new(world).build()
230 }
231
232 pub fn builder(world: &mut nexus_rt::World) -> RuntimeBuilder<'_> {
234 RuntimeBuilder::new(world)
235 }
236
237 pub fn shutdown_handle(&self) -> crate::ShutdownHandle {
239 self.shutdown.clone()
240 }
241
242 pub fn install_signal_handlers(&self) {
244 crate::shutdown::install_signal_handlers(&self.shutdown.flag_ptr(), &self.io.mio_waker());
245 }
246
247 pub fn task_count(&self) -> usize {
249 self.executor.task_count()
250 }
251}
252
/// Deferred slab setup stored by the builder and run once in `build`.
///
/// Returns the type-erased slab storage together with the TLS configuration
/// that refers to it.
type SlabInstaller = Box<dyn FnOnce() -> (Box<dyn std::any::Any>, crate::alloc::SlabTlsConfig)>;
259
/// Builder for [`Runtime`]; obtained from `Runtime::builder`.
pub struct RuntimeBuilder<'w> {
    /// World the runtime's `WorldCtx` is created from.
    world: &'w mut nexus_rt::World,
    /// Forwarded to `Executor::set_tasks_per_cycle`.
    tasks_per_cycle: usize,
    /// Limit passed to `Executor::drain_cross_thread` by the run loop.
    cross_thread_drain_limit: usize,
    /// Number of loop iterations between non-blocking I/O polls (> 0).
    event_interval: u32,
    /// Capacity passed to `Executor::new`.
    queue_capacity: usize,
    /// First argument to `IoDriver::new`.
    event_capacity: usize,
    /// Second argument to `IoDriver::new`.
    token_capacity: usize,
    /// Whether `build` installs signal handlers on the new runtime.
    signal_handlers: bool,
    /// Optional deferred slab setup, run once in `build`.
    slab_installer: Option<SlabInstaller>,
}
289
290impl<'w> RuntimeBuilder<'w> {
291 fn new(world: &'w mut nexus_rt::World) -> Self {
292 Self {
293 world,
294 tasks_per_cycle: crate::DEFAULT_TASKS_PER_CYCLE,
295 cross_thread_drain_limit: usize::MAX,
296 event_interval: DEFAULT_EVENT_INTERVAL,
297 queue_capacity: 64,
298 event_capacity: 1024,
299 token_capacity: 64,
300 signal_handlers: false,
301 slab_installer: None,
302 }
303 }
304
305 pub fn tasks_per_cycle(mut self, limit: usize) -> Self {
308 self.tasks_per_cycle = limit;
309 self
310 }
311
312 pub fn event_interval(mut self, n: u32) -> Self {
320 assert!(n > 0, "event_interval must be > 0");
321 self.event_interval = n;
322 self
323 }
324
325 pub fn cross_thread_drain_limit(mut self, limit: usize) -> Self {
333 self.cross_thread_drain_limit = limit;
334 self
335 }
336
337 pub fn queue_capacity(mut self, cap: usize) -> Self {
339 self.queue_capacity = cap;
340 self
341 }
342
343 pub fn event_capacity(mut self, cap: usize) -> Self {
345 self.event_capacity = cap;
346 self
347 }
348
349 pub fn token_capacity(mut self, cap: usize) -> Self {
351 self.token_capacity = cap;
352 self
353 }
354
355 pub fn signal_handlers(mut self, enable: bool) -> Self {
357 self.signal_handlers = enable;
358 self
359 }
360
361 pub fn slab_unbounded<const S: usize>(
384 mut self,
385 slab: nexus_slab::byte::unbounded::Slab<S>,
386 ) -> Self {
387 const {
388 assert!(
389 S >= 64,
390 "slab slot size must be at least 64 bytes (TASK_HEADER_SIZE)"
391 );
392 }
393 self.slab_installer = Some(Box::new(move || {
394 let mut slab = Box::new(slab);
395 let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
399 let config = crate::alloc::make_unbounded_config::<S>(slab_ptr);
400 (slab as Box<dyn std::any::Any>, config)
401 }));
402 self
403 }
404
405 pub fn slab_bounded<const S: usize>(
425 mut self,
426 slab: nexus_slab::byte::bounded::Slab<S>,
427 ) -> Self {
428 const {
429 assert!(
430 S >= 64,
431 "slab slot size must be at least 64 bytes (TASK_HEADER_SIZE)"
432 );
433 }
434 self.slab_installer = Some(Box::new(move || {
435 let mut slab = Box::new(slab);
436 let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
440 let config = crate::alloc::make_bounded_config::<S>(slab_ptr);
441 (slab as Box<dyn std::any::Any>, config)
442 }));
443 self
444 }
445
446 pub fn build(self) -> Runtime {
448 let io = IoDriver::new(self.event_capacity, self.token_capacity)
449 .expect("failed to create mio::Poll");
450 let mut shutdown = crate::ShutdownHandle::new();
451 shutdown.set_mio_waker(io.mio_waker());
452
453 let mut executor = Executor::new(self.queue_capacity);
454 executor.set_tasks_per_cycle(self.tasks_per_cycle);
455
456 let ctx = WorldCtx::new(self.world);
457 let event_time = Cell::new(Instant::now());
458
459 let (slab, slab_tls) = self.slab_installer.map_or((None, None), |install| {
461 let (slab, config) = install();
462 (Some(slab), Some(config))
463 });
464
465 let cross_wake = std::sync::Arc::new(crate::cross_wake::CrossWakeContext {
466 queue: crate::cross_wake::CrossWakeQueue::new(),
467 mio_waker: io.mio_waker(),
468 parked: std::sync::atomic::AtomicBool::new(false),
469 });
470
471 let rt = Runtime {
472 executor,
473 io,
474 timers: TimerDriver::new(64),
475 ctx,
476 event_time,
477 shutdown,
478 cross_wake,
479 cross_thread_drain_limit: self.cross_thread_drain_limit,
480 event_interval: self.event_interval,
481 _slab: slab,
482 slab_tls,
483 };
484
485 if self.signal_handlers {
486 rt.install_signal_handlers();
487 }
488
489 rt
490 }
491}
492
493impl Runtime {
498 pub fn block_on<F>(&mut self, future: F) -> F::Output
502 where
503 F: Future + 'static,
504 {
505 self.run_loop(future, ParkMode::Park)
506 }
507
508 pub fn block_on_busy<F>(&mut self, future: F) -> F::Output
512 where
513 F: Future + 'static,
514 {
515 self.run_loop(future, ParkMode::Spin)
516 }
517
518 fn run_loop<F>(&mut self, future: F, mode: ParkMode) -> F::Output
519 where
520 F: Future + 'static,
521 {
522 let _ctx_guard = crate::context::install(
524 self.ctx.as_ptr(),
525 &raw mut self.io,
526 &raw mut self.timers,
527 &raw const self.event_time,
528 std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
529 std::ptr::from_ref(&self.shutdown.task_waker),
530 );
531
532 let _slab_guard = self.slab_tls.as_ref().map(crate::alloc::install_slab);
534
535 let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);
537
538 let mut root: Pin<Box<dyn Future<Output = F::Output>>> = Box::pin(future);
539
540 let woken = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
541 let root_waker = Waker::from(std::sync::Arc::new(RootWake {
542 woken: std::sync::Arc::clone(&woken),
543 mio_waker: self.io.mio_waker(),
544 }));
545 let mut root_cx = Context::from_waker(&root_waker);
546
547 let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
549
550 let (ready, deferred) = self.executor.poll_context_mut();
552 let _ready_guard = crate::waker::set_poll_context(ready, deferred);
553
554 self.event_time.set(Instant::now());
555
556 let cross_queue = &*self.cross_wake;
560
561 let mut tick: u32 = 0;
562
563 loop {
564 if woken.swap(false, std::sync::atomic::Ordering::Acquire)
566 || self.shutdown.is_shutdown()
567 {
568 match root.as_mut().poll(&mut root_cx) {
569 Poll::Ready(output) => return output,
570 Poll::Pending => {}
571 }
572 }
573
574 self.executor
576 .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
577
578 self.executor.poll();
580
581 self.timers.fire_expired(Instant::now());
583
584 if matches!(mode, ParkMode::Park) {
587 cross_queue
588 .parked
589 .store(true, std::sync::atomic::Ordering::Release);
590 }
591
592 self.executor
594 .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
595
596 tick = tick.wrapping_add(1);
597
598 if tick % self.event_interval == 0 {
601 if let Err(e) = self.io.poll_io(Some(Duration::ZERO)) {
602 assert!(
603 e.kind() == std::io::ErrorKind::Interrupted,
604 "mio::Poll::poll failed: {e}"
605 );
606 }
607 self.event_time.set(Instant::now());
608 }
609
610 let has_work =
612 self.executor.has_ready() || woken.load(std::sync::atomic::Ordering::Acquire);
613
614 if has_work {
615 if matches!(mode, ParkMode::Park) {
616 cross_queue
617 .parked
618 .store(false, std::sync::atomic::Ordering::Release);
619 }
620 continue;
621 }
622
623 match mode {
625 ParkMode::Spin => {
626 if let Err(e) = self.io.poll_io(Some(Duration::ZERO)) {
628 assert!(
629 e.kind() == std::io::ErrorKind::Interrupted,
630 "mio::Poll::poll failed: {e}"
631 );
632 }
633 self.event_time.set(Instant::now());
634 }
635 ParkMode::Park => {
636 let timeout = self
640 .timers
641 .next_deadline()
642 .map(|d| d.saturating_duration_since(Instant::now()));
643
644 if let Err(e) = self.io.poll_io(timeout) {
645 assert!(
646 e.kind() == std::io::ErrorKind::Interrupted,
647 "mio::Poll::poll failed: {e}"
648 );
649 }
650
651 cross_queue
652 .parked
653 .store(false, std::sync::atomic::Ordering::Release);
654 self.event_time.set(Instant::now());
655 }
656 }
657 }
658 }
659}
660
/// How the run loop behaves when an iteration finds no ready work.
#[derive(Clone, Copy)]
enum ParkMode {
    /// Block in the I/O poll, bounded by the next timer deadline if any.
    Park,
    /// Never block: poll I/O with a zero timeout and loop again.
    Spin,
}
670
/// Waker state for the root future passed to `block_on`.
struct RootWake {
    /// Set when the root future has been woken; cleared by the run loop
    /// right before it polls the root future.
    woken: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// mio waker signalled when the flag goes from unset to set.
    mio_waker: std::sync::Arc<mio::Waker>,
}
679
680impl Wake for RootWake {
681 fn wake(self: std::sync::Arc<Self>) {
682 self.wake_by_ref();
683 }
684
685 fn wake_by_ref(self: &std::sync::Arc<Self>) {
686 let was_woken = self.woken.swap(true, std::sync::atomic::Ordering::Release);
687 if !was_woken {
688 let _ = self.mio_waker.wake();
689 }
690 }
691}
692
/// Scope guard that publishes an executor pointer in `CURRENT` and restores
/// the previous value when dropped.
struct RuntimeGuard {
    /// Value `CURRENT` held before this guard was entered.
    prev: *mut Executor,
}
700
701impl RuntimeGuard {
702 fn enter(executor: *mut Executor) -> Self {
703 let prev = CURRENT.with(|cell| cell.replace(executor));
704 Self { prev }
705 }
706}
707
708impl Drop for RuntimeGuard {
709 fn drop(&mut self) {
710 CURRENT.with(|cell| cell.set(self.prev));
711 }
712}
713
714#[cfg(test)]
719mod tests {
720 use super::*;
721 use nexus_rt::{Handler, IntoHandler, Res, ResMut, WorldBuilder};
722
    // Test resource holding a `u64` input value.
    nexus_rt::new_resource!(Val(u64));
    // Test resource holding a `u64` that tasks write their results into.
    nexus_rt::new_resource!(Out(u64));
725
726 #[test]
727 fn block_on_returns_value() {
728 let mut wb = WorldBuilder::new();
729 wb.register(Val(42));
730 let mut world = wb.build();
731
732 let mut rt = Runtime::new(&mut world);
733 let result = rt.block_on(async { 42u64 });
734 assert_eq!(result, 42);
735 }
736
737 #[test]
738 fn block_on_with_world_access() {
739 let mut wb = WorldBuilder::new();
740 wb.register(Val(42));
741 wb.register(Out(0));
742 let mut world = wb.build();
743
744 let mut rt = Runtime::new(&mut world);
745
746 let result = rt.block_on(async move {
747 crate::context::with_world(|world| {
748 let v = world.resource::<Val>().0;
749 world.resource_mut::<Out>().0 = v + 10;
750 });
751 crate::context::with_world_ref(|world| world.resource::<Out>().0)
752 });
753
754 assert_eq!(result, 52);
755 }
756
757 #[test]
758 fn block_on_with_pre_resolved_handler() {
759 let mut wb = WorldBuilder::new();
760 wb.register(Val(42));
761 wb.register(Out(0));
762 let mut world = wb.build();
763
764 let mut rt = Runtime::new(&mut world);
765
766 let mut h = (|val: Res<Val>, mut out: ResMut<Out>, event: u64| {
767 out.0 = val.0 + event;
768 })
769 .into_handler(world.registry());
770
771 let result = rt.block_on(async move {
772 crate::context::with_world(|world| h.run(world, 10));
773 crate::context::with_world_ref(|world| world.resource::<Out>().0)
774 });
775
776 assert_eq!(result, 52);
777 }
778
779 #[test]
780 fn spawn_from_root_future() {
781 let mut wb = WorldBuilder::new();
782 wb.register(Out(0));
783 let mut world = wb.build();
784
785 let mut rt = Runtime::new(&mut world);
786
787 rt.block_on(async move {
788 for i in 1..=3u64 {
789 spawn_boxed(async move {
790 crate::context::with_world(|world| {
791 world.resource_mut::<Out>().0 += i;
792 });
793 });
794 }
795
796 YieldOnce(false).await;
797 });
798
799 assert_eq!(world.resource::<Out>().0, 6);
800 }
801
802 #[test]
803 fn block_on_busy_returns_value() {
804 let mut wb = WorldBuilder::new();
805 wb.register(Val(7));
806 let mut world = wb.build();
807
808 let mut rt = Runtime::new(&mut world);
809 let result = rt.block_on_busy(async { 6 * 7 });
810 assert_eq!(result, 42);
811 }
812
813 #[test]
814 fn block_on_busy_with_spawned_tasks() {
815 let mut wb = WorldBuilder::new();
816 wb.register(Out(0));
817 let mut world = wb.build();
818
819 let mut rt = Runtime::new(&mut world);
820
821 rt.block_on_busy(async move {
822 spawn_boxed(async move {
823 crate::context::with_world(|world| {
824 world.resource_mut::<Out>().0 = 99;
825 });
826 });
827
828 YieldOnce(false).await;
829 });
830
831 assert_eq!(world.resource::<Out>().0, 99);
832 }
833
834 #[test]
835 fn event_time_is_set() {
836 let mut wb = WorldBuilder::new();
837 wb.register(Val(0));
838 let mut world = wb.build();
839
840 let mut rt = Runtime::new(&mut world);
841
842 let before = Instant::now();
843 rt.block_on(async move {
844 let t = crate::context::event_time();
845 assert!(t >= before);
846 });
847 }
848
849 #[test]
850 #[should_panic(expected = "spawn_boxed() called outside of Runtime::block_on")]
851 fn spawn_outside_runtime_panics() {
852 spawn_boxed(async {});
853 }
854
855 fn test_slab() -> nexus_slab::byte::unbounded::Slab<256> {
856 unsafe { nexus_slab::byte::unbounded::Slab::with_chunk_capacity(16) }
858 }
859
860 #[test]
861 #[should_panic(expected = "spawn_slab() called without a slab")]
862 fn spawn_slab_without_slab_panics() {
863 let mut wb = WorldBuilder::new();
864 let mut world = wb.build();
865 let mut rt = Runtime::new(&mut world);
866
867 rt.block_on(async {
868 spawn_slab(async {});
869 });
870 }
871
872 #[test]
873 fn spawn_slab_with_slab() {
874 let mut wb = WorldBuilder::new();
875 wb.register(Out(0));
876 let mut world = wb.build();
877
878 let mut rt = Runtime::builder(&mut world)
879 .slab_unbounded(test_slab())
880 .build();
881
882 rt.block_on(async move {
883 spawn_slab(async move {
884 crate::context::with_world(|world| {
885 world.resource_mut::<Out>().0 = 77;
886 });
887 });
888
889 YieldOnce(false).await;
890 });
891
892 assert_eq!(world.resource::<Out>().0, 77);
893 }
894
895 #[test]
896 fn mixed_spawn_and_spawn_slab() {
897 let mut wb = WorldBuilder::new();
898 wb.register(Out(0));
899 let mut world = wb.build();
900
901 let mut rt = Runtime::builder(&mut world)
902 .slab_unbounded(test_slab())
903 .build();
904
905 rt.block_on(async move {
906 spawn_boxed(async move {
908 crate::context::with_world(|world| {
909 world.resource_mut::<Out>().0 += 10;
910 });
911 });
912 spawn_slab(async move {
914 crate::context::with_world(|world| {
915 world.resource_mut::<Out>().0 += 20;
916 });
917 });
918
919 YieldOnce(false).await;
920 });
921
922 assert_eq!(world.resource::<Out>().0, 30);
923 }
924
925 #[test]
930 fn claim_slab_spawn_executes() {
931 let mut wb = WorldBuilder::new();
932 wb.register(Out(0));
933 let mut world = wb.build();
934
935 let mut rt = Runtime::builder(&mut world)
936 .slab_unbounded(test_slab())
937 .build();
938
939 rt.block_on(async move {
940 let claim = claim_slab();
941 claim.spawn(async move {
942 crate::context::with_world(|world| {
943 world.resource_mut::<Out>().0 = 55;
944 });
945 });
946
947 YieldOnce(false).await;
948 });
949
950 assert_eq!(world.resource::<Out>().0, 55);
951 }
952
953 #[test]
954 fn claim_slab_drop_returns_slot() {
955 let mut wb = WorldBuilder::new();
956 let mut world = wb.build();
957
958 let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
959 let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
960
961 rt.block_on(async {
962 let claim = claim_slab();
964 drop(claim);
965
966 let claim = claim_slab();
968 claim.spawn(async {});
969
970 YieldOnce(false).await;
971 });
972 }
973
974 #[test]
975 fn try_claim_slab_returns_none_when_full() {
976 let mut wb = WorldBuilder::new();
977 let mut world = wb.build();
978
979 let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
980 let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
981
982 rt.block_on(async {
983 let _held = claim_slab(); assert!(try_claim_slab().is_none());
985 });
986 }
987
988 #[test]
989 fn mixed_spawn_boxed_and_claim_slab() {
990 let mut wb = WorldBuilder::new();
991 wb.register(Out(0));
992 let mut world = wb.build();
993
994 let mut rt = Runtime::builder(&mut world)
995 .slab_unbounded(test_slab())
996 .build();
997
998 rt.block_on(async move {
999 spawn_boxed(async move {
1000 crate::context::with_world(|world| {
1001 world.resource_mut::<Out>().0 += 10;
1002 });
1003 });
1004
1005 let claim = claim_slab();
1006 claim.spawn(async move {
1007 crate::context::with_world(|world| {
1008 world.resource_mut::<Out>().0 += 20;
1009 });
1010 });
1011
1012 YieldOnce(false).await;
1013 });
1014
1015 assert_eq!(world.resource::<Out>().0, 30);
1016 }
1017
1018 #[test]
1023 fn sleep_completes() {
1024 let mut wb = WorldBuilder::new();
1025 wb.register(Out(0));
1026 let mut world = wb.build();
1027
1028 let mut rt = Runtime::new(&mut world);
1029
1030 let before = Instant::now();
1031 rt.block_on(async move {
1032 crate::context::sleep(Duration::from_millis(50)).await;
1033 });
1034 let elapsed = before.elapsed();
1035
1036 assert!(
1037 elapsed >= Duration::from_millis(40),
1038 "elapsed {elapsed:?} too short"
1039 );
1040 assert!(
1041 elapsed < Duration::from_millis(200),
1042 "elapsed {elapsed:?} too long"
1043 );
1044 }
1045
1046 #[test]
1047 fn sleep_in_spawned_task() {
1048 let mut wb = WorldBuilder::new();
1049 wb.register(Out(0));
1050 let mut world = wb.build();
1051
1052 let mut rt = Runtime::new(&mut world);
1053
1054 let before = Instant::now();
1055 rt.block_on(async move {
1056 spawn_boxed(async move {
1057 crate::context::sleep(Duration::from_millis(50)).await;
1058 crate::context::with_world(|world| {
1059 world.resource_mut::<Out>().0 = 42;
1060 });
1061 });
1062
1063 crate::context::sleep(Duration::from_millis(100)).await;
1064 });
1065
1066 let elapsed = before.elapsed();
1067 assert!(elapsed >= Duration::from_millis(80));
1068 assert_eq!(world.resource::<Out>().0, 42);
1069 }
1070
1071 #[test]
1072 fn sleep_zero_duration_ready_immediately() {
1073 let mut wb = WorldBuilder::new();
1074 let mut world = wb.build();
1075 let mut rt = Runtime::new(&mut world);
1076
1077 let before = Instant::now();
1078 rt.block_on(async move {
1079 crate::context::sleep(Duration::ZERO).await;
1080 });
1081 assert!(before.elapsed() < Duration::from_millis(10));
1082 }
1083
1084 #[test]
1085 fn sleep_past_deadline_ready_immediately() {
1086 let mut wb = WorldBuilder::new();
1087 let mut world = wb.build();
1088 let mut rt = Runtime::new(&mut world);
1089
1090 let past = Instant::now() - Duration::from_secs(1);
1091 let before = Instant::now();
1092 rt.block_on(async move {
1093 crate::context::sleep_until(past).await;
1094 });
1095 assert!(before.elapsed() < Duration::from_millis(10));
1096 }
1097
1098 #[test]
1103 fn timeout_completes_before_deadline() {
1104 let mut wb = WorldBuilder::new();
1105 let mut world = wb.build();
1106 let mut rt = Runtime::new(&mut world);
1107
1108 let result = rt.block_on(async {
1109 crate::context::timeout(Duration::from_millis(500), async { 42u64 }).await
1110 });
1111
1112 assert_eq!(result.unwrap(), 42);
1113 }
1114
1115 #[test]
1116 fn timeout_expires() {
1117 let mut wb = WorldBuilder::new();
1118 let mut world = wb.build();
1119 let mut rt = Runtime::new(&mut world);
1120
1121 let result = rt.block_on(async {
1122 crate::context::timeout(
1123 Duration::from_millis(10),
1124 crate::context::sleep(Duration::from_secs(10)),
1125 )
1126 .await
1127 });
1128
1129 assert!(result.is_err());
1130 }
1131
1132 #[test]
1137 fn interval_ticks() {
1138 let mut wb = WorldBuilder::new();
1139 wb.register(Out(0));
1140 let mut world = wb.build();
1141 let mut rt = Runtime::new(&mut world);
1142
1143 let before = Instant::now();
1144 rt.block_on(async move {
1145 let mut iv = crate::context::interval(Duration::from_millis(20));
1146 iv.tick().await; iv.tick().await; iv.tick().await; });
1150 let elapsed = before.elapsed();
1151
1152 assert!(
1153 elapsed >= Duration::from_millis(50),
1154 "too fast: {elapsed:?}"
1155 );
1156 assert!(
1157 elapsed < Duration::from_millis(200),
1158 "too slow: {elapsed:?}"
1159 );
1160 }
1161
1162 #[test]
1167 fn yield_now_lets_other_tasks_run() {
1168 let mut wb = WorldBuilder::new();
1169 wb.register(Out(0));
1170 let mut world = wb.build();
1171 let mut rt = Runtime::new(&mut world);
1172
1173 rt.block_on(async move {
1174 spawn_boxed(async move {
1175 crate::context::with_world(|world| {
1176 world.resource_mut::<Out>().0 = 99;
1177 });
1178 });
1179
1180 crate::context::yield_now().await;
1182
1183 let val = crate::context::with_world_ref(|world| world.resource::<Out>().0);
1184 assert_eq!(val, 99);
1185 });
1186 }
1187
1188 struct YieldOnce(bool);
1193
1194 impl Future for YieldOnce {
1195 type Output = ();
1196 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1197 if self.0 {
1198 Poll::Ready(())
1199 } else {
1200 self.0 = true;
1201 cx.waker().wake_by_ref();
1202 Poll::Pending
1203 }
1204 }
1205 }
1206
1207 #[test]
1212 fn join_handle_await_gets_value() {
1213 let wb = WorldBuilder::new();
1214 let mut world = wb.build();
1215 let mut rt = Runtime::new(&mut world);
1216
1217 rt.block_on(async {
1218 let handle = spawn_boxed(async { 42u64 });
1219 let result = handle.await;
1220 assert_eq!(result, 42);
1221 });
1222 }
1223
1224 #[test]
1225 fn join_handle_await_string() {
1226 let wb = WorldBuilder::new();
1227 let mut world = wb.build();
1228 let mut rt = Runtime::new(&mut world);
1229
1230 rt.block_on(async {
1231 let handle = spawn_boxed(async { String::from("hello world") });
1232 let result = handle.await;
1233 assert_eq!(result, "hello world");
1234 });
1235 }
1236
1237 #[test]
1238 fn join_handle_detach() {
1239 use std::cell::Cell;
1240 use std::rc::Rc;
1241
1242 let wb = WorldBuilder::new();
1243 let mut world = wb.build();
1244 let mut rt = Runtime::new(&mut world);
1245
1246 let ran = Rc::new(Cell::new(false));
1247 let r = ran.clone();
1248
1249 rt.block_on(async move {
1250 drop(spawn_boxed(async move {
1252 r.set(true);
1253 }));
1254 crate::context::yield_now().await;
1256 });
1257
1258 assert!(ran.get());
1259 }
1260
1261 #[test]
1262 fn join_handle_is_finished() {
1263 let wb = WorldBuilder::new();
1264 let mut world = wb.build();
1265 let mut rt = Runtime::new(&mut world);
1266
1267 rt.block_on(async {
1268 let handle = spawn_boxed(async { 1 });
1269 assert!(!handle.is_finished());
1271 crate::context::yield_now().await;
1273 assert!(handle.is_finished());
1274 let val = handle.await;
1275 assert_eq!(val, 1);
1276 });
1277 }
1278
1279 #[test]
1280 fn join_handle_abort_returns_true() {
1281 let wb = WorldBuilder::new();
1282 let mut world = wb.build();
1283 let mut rt = Runtime::new(&mut world);
1284
1285 rt.block_on(async {
1286 let handle = spawn_boxed(std::future::pending::<()>());
1287 assert!(handle.abort()); });
1289 }
1290
1291 #[test]
1292 fn join_handle_abort_completed_returns_false() {
1293 let wb = WorldBuilder::new();
1294 let mut world = wb.build();
1295 let mut rt = Runtime::new(&mut world);
1296
1297 rt.block_on(async {
1298 let handle = spawn_boxed(async { 42 });
1299 crate::context::yield_now().await;
1300 assert!(handle.is_finished());
1301 assert!(!handle.abort()); });
1303 }
1304
1305 #[test]
1306 fn join_handle_drop_after_completion_drops_output() {
1307 use std::cell::Cell;
1308 use std::rc::Rc;
1309
1310 let wb = WorldBuilder::new();
1311 let mut world = wb.build();
1312 let mut rt = Runtime::new(&mut world);
1313
1314 let drop_count = Rc::new(Cell::new(0u32));
1315 let dc = drop_count.clone();
1316
1317 struct DropCounter(Rc<Cell<u32>>);
1318 impl Drop for DropCounter {
1319 fn drop(&mut self) {
1320 self.0.set(self.0.get() + 1);
1321 }
1322 }
1323
1324 rt.block_on(async move {
1325 let handle = spawn_boxed(async move { DropCounter(dc) });
1326 crate::context::yield_now().await;
1328 assert!(handle.is_finished());
1329 drop(handle);
1331 });
1332
1333 assert_eq!(drop_count.get(), 1, "output should be dropped exactly once");
1334 }
1335
1336 #[test]
1337 fn join_handle_multiple_concurrent() {
1338 let wb = WorldBuilder::new();
1339 let mut world = wb.build();
1340 let mut rt = Runtime::new(&mut world);
1341
1342 rt.block_on(async {
1343 let h1 = spawn_boxed(async { 10u64 });
1344 let h2 = spawn_boxed(async { 20u64 });
1345 let h3 = spawn_boxed(async { 30u64 });
1346
1347 let r3 = h3.await;
1348 let r1 = h1.await;
1349 let r2 = h2.await;
1350
1351 assert_eq!(r1, 10);
1352 assert_eq!(r2, 20);
1353 assert_eq!(r3, 30);
1354 });
1355 }
1356
1357 #[test]
1358 fn join_handle_output_larger_than_future() {
1359 let wb = WorldBuilder::new();
1360 let mut world = wb.build();
1361 let mut rt = Runtime::new(&mut world);
1362
1363 rt.block_on(async {
1364 let handle = spawn_boxed(async { [42u64; 32] });
1366 let result = handle.await;
1367 assert_eq!(result[0], 42);
1368 assert_eq!(result[31], 42);
1369 });
1370 }
1371}