1#![deny(missing_docs)]
2
3extern crate std;
4use core::sync::atomic::{AtomicBool, Ordering};
5use std::boxed::Box;
6use std::collections::BTreeMap;
7use std::ffi::c_void;
8use std::future::Future;
9use std::mem;
10use std::pin::Pin;
11use std::ptr;
12use std::sync::Arc;
13use std::task::{Context, Poll, Wake, Waker};
14
// Debug-logging helper used throughout this module.
//
// Accepts the same arguments as `eprintln!` and expands to an `if false`
// guard around `std::eprintln!`, so the format arguments are still
// type-checked but nothing is ever printed.
macro_rules! rtdebug {
    ($($f:tt)*) => {
        // Flip this `false` to `true` during development to turn the
        // logging on.
        if false {
            std::eprintln!($($f)*);
        }
    }

}
26
27mod abi_buffer;
28mod cabi;
29mod error_context;
30mod future_support;
31mod stream_support;
32mod subtask;
33mod waitable;
34mod waitable_set;
35
36use self::waitable_set::WaitableSet;
37pub use abi_buffer::*;
38pub use error_context::*;
39pub use future_support::*;
40pub use stream_support::*;
41#[doc(hidden)]
42pub use subtask::Subtask;
43
44type BoxFuture<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;
45
46#[cfg(feature = "async-spawn")]
47mod spawn;
48#[cfg(feature = "async-spawn")]
49pub use spawn::spawn;
50#[cfg(not(feature = "async-spawn"))]
51mod spawn_disabled;
52#[cfg(not(feature = "async-spawn"))]
53use spawn_disabled as spawn;
54
/// All of the state tracked for one running task.
///
/// A `FutureState` is created from the task's root future by
/// `FutureState::new` and is then driven by `FutureState::callback`.
struct FutureState<'a> {
    /// The futures being driven; built from the root future in `new` and
    /// polled with `poll_next` in `poll`.
    tasks: spawn::Tasks<'a>,

    /// The waitable set that registered waitables are joined to. It starts as
    /// `None` and is created on demand by `get_or_create_waitable_set`.
    waitable_set: Option<WaitableSet>,

    /// Map from a waitable handle to the `(data pointer, callback)` pair
    /// stored by `waitable_register`. The callback is invoked with the data
    /// pointer and the event code in `deliver_waitable_event`.
    waitables: BTreeMap<u32, (*mut c_void, unsafe extern "C" fn(*mut c_void, u32))>,

    /// The C-ABI task descriptor handed to `cabi::wasip3_task_set`. Its `ptr`
    /// field is null after `new` and is pointed at `self` by
    /// `with_p3_task_set`.
    wasip3_task: cabi::wasip3_task,

    /// Shared "was woken" flag. `poll` clears it before polling and checks it
    /// when the tasks report `Pending`.
    waker: Arc<FutureWaker>,

    /// A `Waker` built from `waker`, kept so `poll` can construct a `Context`
    /// from it.
    waker_clone: Waker,
}
88
89impl FutureState<'_> {
90 fn new(future: BoxFuture<'_>) -> FutureState<'_> {
91 let waker = Arc::new(FutureWaker::default());
92 FutureState {
93 waker_clone: waker.clone().into(),
94 waker,
95 tasks: spawn::Tasks::new(future),
96 waitable_set: None,
97 waitables: BTreeMap::new(),
98 wasip3_task: cabi::wasip3_task {
99 ptr: ptr::null_mut(),
101 version: cabi::WASIP3_TASK_V1,
102 waitable_register,
103 waitable_unregister,
104 },
105 }
106 }
107
108 fn get_or_create_waitable_set(&mut self) -> &WaitableSet {
109 self.waitable_set.get_or_insert_with(WaitableSet::new)
110 }
111
112 fn add_waitable(&mut self, waitable: u32) {
113 self.get_or_create_waitable_set().join(waitable)
114 }
115
116 fn remove_waitable(&mut self, waitable: u32) {
117 WaitableSet::remove_waitable_from_all_sets(waitable)
118 }
119
120 fn remaining_work(&self) -> bool {
121 !self.waitables.is_empty()
122 }
123
124 fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) {
127 match event0 {
128 EVENT_NONE => rtdebug!("EVENT_NONE"),
129 EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"),
130 EVENT_STREAM_READ => rtdebug!("EVENT_STREAM_READ({event1:#x}, {event2:#x})"),
131 EVENT_STREAM_WRITE => rtdebug!("EVENT_STREAM_WRITE({event1:#x}, {event2:#x})"),
132 EVENT_FUTURE_READ => rtdebug!("EVENT_FUTURE_READ({event1:#x}, {event2:#x})"),
133 EVENT_FUTURE_WRITE => rtdebug!("EVENT_FUTURE_WRITE({event1:#x}, {event2:#x})"),
134 EVENT_CANCEL => {
135 rtdebug!("EVENT_CANCEL");
136
137 return (CALLBACK_CODE_EXIT, true);
142 }
143 _ => unreachable!(),
144 }
145 if event0 != EVENT_NONE {
146 self.deliver_waitable_event(event1, event2)
147 }
148
149 self.poll()
150 }
151
152 fn deliver_waitable_event(&mut self, waitable: u32, code: u32) {
156 self.remove_waitable(waitable);
157 let (ptr, callback) = self.waitables.remove(&waitable).unwrap();
158 unsafe {
159 callback(ptr, code);
160 }
161 }
162
163 fn poll(&mut self) -> (u32, bool) {
169 self.with_p3_task_set(|me| {
170 let mut context = Context::from_waker(&me.waker_clone);
171
172 loop {
173 me.waker.0.store(false, Ordering::Relaxed);
176
177 let poll = me.tasks.poll_next(&mut context);
179
180 match poll {
181 Poll::Ready(Some(())) => (),
184
185 Poll::Ready(None) => {
190 assert!(me.tasks.is_empty());
191 if me.remaining_work() {
192 let waitable = me.waitable_set.as_ref().unwrap().as_raw();
193 break (CALLBACK_CODE_WAIT | (waitable << 4), false);
194 } else {
195 break (CALLBACK_CODE_EXIT, true);
196 }
197 }
198
199 Poll::Pending => {
204 assert!(!me.tasks.is_empty());
205 if me.waker.0.load(Ordering::Relaxed) {
206 break (CALLBACK_CODE_YIELD, false);
207 }
208
209 assert!(me.remaining_work());
210 let waitable = me.waitable_set.as_ref().unwrap().as_raw();
211 break (CALLBACK_CODE_WAIT | (waitable << 4), false);
212 }
213 }
214 }
215 })
216 }
217
218 fn with_p3_task_set<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
219 struct ResetTask(*mut cabi::wasip3_task);
224 impl Drop for ResetTask {
225 fn drop(&mut self) {
226 unsafe {
227 cabi::wasip3_task_set(self.0);
228 }
229 }
230 }
231 let self_raw = self as *mut FutureState<'_>;
232 self.wasip3_task.ptr = self_raw.cast();
233 let prev = unsafe { cabi::wasip3_task_set(&mut self.wasip3_task) };
234 let _reset = ResetTask(prev);
235
236 f(self)
237 }
238}
239
240impl Drop for FutureState<'_> {
241 fn drop(&mut self) {
242 if !self.tasks.is_empty() {
248 self.with_p3_task_set(|me| {
249 me.tasks = Default::default();
250 })
251 }
252 }
253}
254
255unsafe extern "C" fn waitable_register(
256 ptr: *mut c_void,
257 waitable: u32,
258 callback: unsafe extern "C" fn(*mut c_void, u32),
259 callback_ptr: *mut c_void,
260) -> *mut c_void {
261 let ptr = ptr.cast::<FutureState<'static>>();
262 assert!(!ptr.is_null());
263 (*ptr).add_waitable(waitable);
264 match (*ptr).waitables.insert(waitable, (callback_ptr, callback)) {
265 Some((prev, _)) => prev,
266 None => ptr::null_mut(),
267 }
268}
269
270unsafe extern "C" fn waitable_unregister(ptr: *mut c_void, waitable: u32) -> *mut c_void {
271 let ptr = ptr.cast::<FutureState<'static>>();
272 assert!(!ptr.is_null());
273 (*ptr).remove_waitable(waitable);
274 match (*ptr).waitables.remove(&waitable) {
275 Some((prev, _)) => prev,
276 None => ptr::null_mut(),
277 }
278}
279
/// A waker that only records that it was woken.
///
/// The wrapped flag starts out `false` (via `Default`) and is set to `true`
/// by either wake method; nothing here ever clears it.
#[derive(Default)]
struct FutureWaker(AtomicBool);

impl Wake for FutureWaker {
    /// Same effect as [`Wake::wake_by_ref`]: sets the flag.
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    /// Sets the flag to `true`.
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.store(true, Ordering::Relaxed);
    }
}
292
// Event codes accepted as `event0` by `FutureState::callback`.
//
// `EVENT_NONE` carries no waitable event and only triggers a poll.
// `EVENT_CANCEL` makes the callback return `(CALLBACK_CODE_EXIT, true)`.
// For every other code, `event1` is the waitable handle and `event2` is the
// code delivered to that waitable's registered callback.
const EVENT_NONE: u32 = 0;
const EVENT_SUBTASK: u32 = 1;
const EVENT_STREAM_READ: u32 = 2;
const EVENT_STREAM_WRITE: u32 = 3;
const EVENT_FUTURE_READ: u32 = 4;
const EVENT_FUTURE_WRITE: u32 = 5;
const EVENT_CANCEL: u32 = 6;

// Return codes produced by `FutureState::poll` / `FutureState::callback`.
//
// `CALLBACK_CODE_WAIT` is combined with the raw waitable-set handle shifted
// left by four bits. `_CALLBACK_CODE_POLL` is not used in this part of the
// file (hence the leading underscore).
const CALLBACK_CODE_EXIT: u32 = 0;
const CALLBACK_CODE_YIELD: u32 = 1;
const CALLBACK_CODE_WAIT: u32 = 2;
const _CALLBACK_CODE_POLL: u32 = 3;

// Status codes. NOTE(review): none of these are referenced in this part of
// the file; presumably they are the subtask states consumed by the `subtask`
// module — confirm there.
const STATUS_STARTING: u32 = 0;
const STATUS_STARTED: u32 = 1;
const STATUS_RETURNED: u32 = 2;
const STATUS_STARTED_CANCELLED: u32 = 3;
const STATUS_RETURNED_CANCELLED: u32 = 4;
311
// Raw encodings understood by `ReturnCode::decode`. `BLOCKED` is matched
// against the whole value; the others are matched against its low four bits.
const BLOCKED: u32 = 0xffff_ffff;
const COMPLETED: u32 = 0x0;
const DROPPED: u32 = 0x1;
const CANCELLED: u32 = 0x2;

/// Decoded form of a raw `u32` return code.
///
/// Apart from `Blocked`, each variant carries the upper 28 bits of the raw
/// value (presumably an item count — confirm against the callers).
#[derive(PartialEq, Debug, Copy, Clone)]
enum ReturnCode {
    /// The raw value was exactly `BLOCKED`.
    Blocked,
    /// Low four bits were `COMPLETED`.
    Completed(u32),
    /// Low four bits were `DROPPED`.
    Dropped(u32),
    /// Low four bits were `CANCELLED`.
    Cancelled(u32),
}

impl ReturnCode {
    /// Decodes a raw return code.
    ///
    /// `BLOCKED` is checked first against the full value. Otherwise the low
    /// four bits select the variant and the remaining upper bits become its
    /// payload.
    ///
    /// # Panics
    ///
    /// Panics if the low four bits are not one of the known codes.
    fn decode(val: u32) -> ReturnCode {
        let payload = val >> 4;
        match val {
            BLOCKED => ReturnCode::Blocked,
            _ => match val & 0xf {
                COMPLETED => ReturnCode::Completed(payload),
                DROPPED => ReturnCode::Dropped(payload),
                CANCELLED => ReturnCode::Cancelled(payload),
                _ => panic!("unknown return code {val:#x}"),
            },
        }
    }
}
346
347#[doc(hidden)]
355pub fn start_task(task: impl Future<Output = ()> + 'static) -> i32 {
356 let state = Box::into_raw(Box::new(FutureState::new(Box::pin(task))));
359
360 unsafe {
367 assert!(context_get().is_null());
368 context_set(state.cast());
369 callback(EVENT_NONE, 0, 0) as i32
370 }
371}
372
373#[doc(hidden)]
380pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
381 let state = context_get().cast::<FutureState<'static>>();
385 assert!(!state.is_null());
386 unsafe {
387 context_set(ptr::null_mut());
388 }
389
390 unsafe {
395 let (rc, done) = (*state).callback(event0, event1, event2);
396 if done {
397 drop(Box::from_raw(state));
398 } else {
399 context_set(state.cast());
400 }
401 rtdebug!(" => (cb) {rc:#x}");
402 rc
403 }
404}
405
406pub fn block_on<T: 'static>(future: impl Future<Output = T>) -> T {
412 let mut result = None;
413 let mut state = FutureState::new(Box::pin(async {
414 result = Some(future.await);
415 }));
416 let mut event = (EVENT_NONE, 0, 0);
417 loop {
418 match state.callback(event.0, event.1, event.2) {
419 (_, true) => {
420 drop(state);
421 break result.unwrap();
422 }
423 (CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(),
424 _ => event = state.waitable_set.as_ref().unwrap().wait(),
425 }
426 }
427}
428
/// Calls the `[thread-yield]` intrinsic and returns the negation of its
/// result.
///
/// NOTE(review): presumably the intrinsic reports whether the task was
/// cancelled, so `true` here would mean "keep going" — confirm against the
/// canonical ABI.
///
/// # Panics
///
/// On targets other than `wasm32` the intrinsic is replaced by a stub that
/// hits `unreachable!()`.
pub fn yield_blocking() -> bool {
    #[cfg(not(target_arch = "wasm32"))]
    unsafe fn thread_yield() -> bool {
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    #[link(wasm_import_module = "$root")]
    extern "C" {
        #[link_name = "[thread-yield]"]
        fn thread_yield() -> bool;
    }

    let raw = unsafe { thread_yield() };
    !raw
}
466
/// Yields once to the executor.
///
/// The first poll wakes the current waker and returns `Poll::Pending`; the
/// second poll completes.
pub async fn yield_async() {
    /// Future that is pending exactly once.
    struct YieldOnce {
        polled: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if !self.polled {
                self.polled = true;
                // Wake immediately so the executor polls this future again.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(())
        }
    }

    YieldOnce { polled: false }.await
}
503
/// Calls the `[backpressure-set]` intrinsic with `1` when `enabled` is true
/// and `0` otherwise.
///
/// # Panics
///
/// On targets other than `wasm32` the intrinsic is replaced by a stub that
/// hits `unreachable!()`.
#[deprecated = "use backpressure_{inc,dec} instead"]
pub fn backpressure_set(enabled: bool) {
    #[cfg(not(target_arch = "wasm32"))]
    unsafe fn raw_set(_: i32) {
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    #[link(wasm_import_module = "$root")]
    extern "C" {
        #[link_name = "[backpressure-set]"]
        fn raw_set(_: i32);
    }

    let flag = enabled as i32;
    unsafe { raw_set(flag) }
}
525
/// Calls the `[backpressure-inc]` intrinsic.
///
/// # Panics
///
/// On targets other than `wasm32` the intrinsic is replaced by a stub that
/// hits `unreachable!()`.
pub fn backpressure_inc() {
    #[cfg(not(target_arch = "wasm32"))]
    unsafe fn raw_inc() {
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    #[link(wasm_import_module = "$root")]
    extern "C" {
        #[link_name = "[backpressure-inc]"]
        fn raw_inc();
    }

    unsafe {
        raw_inc();
    }
}
542
/// Calls the `[backpressure-dec]` intrinsic.
///
/// # Panics
///
/// On targets other than `wasm32` the intrinsic is replaced by a stub that
/// hits `unreachable!()`.
pub fn backpressure_dec() {
    #[cfg(not(target_arch = "wasm32"))]
    unsafe fn raw_dec() {
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    #[link(wasm_import_module = "$root")]
    extern "C" {
        #[link_name = "[backpressure-dec]"]
        fn raw_dec();
    }

    unsafe {
        raw_dec();
    }
}
559
/// Reads the pointer stored in context slot 0 via the `[context-get-0]`
/// intrinsic.
///
/// On targets other than `wasm32` the intrinsic is replaced by a stub that
/// hits `unreachable!()`.
fn context_get() -> *mut u8 {
    #[cfg(not(target_arch = "wasm32"))]
    unsafe fn raw_get() -> *mut u8 {
        unreachable!()
    }

    #[cfg(target_arch = "wasm32")]
    #[link(wasm_import_module = "$root")]
    extern "C" {
        #[link_name = "[context-get-0]"]
        fn raw_get() -> *mut u8;
    }

    let slot = unsafe { raw_get() };
    slot
}
575
/// Stores `value` in context slot 0 via the `[context-set-0]` intrinsic.
///
/// On targets other than `wasm32` the intrinsic is replaced by a stub that
/// hits `unreachable!()`.
///
/// # Safety
///
/// The slot is later read back by `callback` and treated as a pointer to a
/// `FutureState`, so `value` must be null or such a pointer.
unsafe fn context_set(value: *mut u8) {
    #[cfg(not(target_arch = "wasm32"))]
    unsafe fn raw_set(_: *mut u8) {
        unreachable!()
    }

    #[cfg(target_arch = "wasm32")]
    #[link(wasm_import_module = "$root")]
    extern "C" {
        #[link_name = "[context-set-0]"]
        fn raw_set(value: *mut u8);
    }

    unsafe {
        raw_set(value);
    }
}
591
/// Guard that calls the `[task-cancel]` intrinsic when dropped, unless it is
/// disarmed with [`TaskCancelOnDrop::forget`].
#[doc(hidden)]
pub struct TaskCancelOnDrop {
    // Private field so the guard can only be built through `new`.
    _priv: (),
}

impl TaskCancelOnDrop {
    /// Creates an armed guard.
    #[doc(hidden)]
    pub fn new() -> TaskCancelOnDrop {
        Self { _priv: () }
    }

    /// Consumes the guard without running its destructor, so `[task-cancel]`
    /// is not called.
    #[doc(hidden)]
    pub fn forget(self) {
        let guard = self;
        mem::forget(guard);
    }
}

impl Drop for TaskCancelOnDrop {
    /// Calls the `[task-cancel]` intrinsic imported from `[export]$root`.
    ///
    /// On targets other than `wasm32` the intrinsic is replaced by a stub that
    /// hits `unreachable!()`.
    fn drop(&mut self) {
        #[cfg(not(target_arch = "wasm32"))]
        unsafe fn task_cancel() {
            unreachable!()
        }

        #[cfg(target_arch = "wasm32")]
        #[link(wasm_import_module = "[export]$root")]
        extern "C" {
            #[link_name = "[task-cancel]"]
            fn task_cancel();
        }

        unsafe {
            task_cancel();
        }
    }
}