switch_resume/
lib.rs

1#![doc = include_str!("../README.md")]
2//! # Examples
3//! ```
4#![doc = include_str!("../examples/simple.rs")]
5//! ```
6//!
7//! ```
8#![doc = include_str!("../examples/foobar.rs")]
9//! ```
10
11use std::{future::Future, pin::Pin, task::Poll};
12
/// Function handed to the closure passed to [`Task::switch`]. Represents the paused continuation.
///
/// Calling it with a value of type `Arg` returns a boxed future with output `T`.
/// In [`Task::switch`] that value becomes the result of the paused `switch` call,
/// and the returned future is the captured continuation.
pub type Resume<'a, Arg, T> = Box<dyn FnOnce(Arg) -> Continuation<'a, T> + 'a>;

/// Pinned, boxed future with output `T`; the form in which [`run`] stores and polls
/// the task, and in which a paused task is handed to a [`Switch`].
type Continuation<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// One-shot request queued by [`Task::switch`] and applied by [`run`]: it receives the
/// continuation that was being polled and returns the future to poll in its place.
type Switch<'a, T> = Box<dyn FnOnce(Continuation<'a, T>) -> Continuation<'a, T> + 'a>;
19
/// Handle to the running task.
///
/// [`run`] creates the handle and passes it to the provided function;
/// [`Task::switch`] uses it to request a switch.
pub struct Task<'a, T: 'a> {
    /// Sending half of the channel on which [`Task::switch`] queues switch
    /// requests; [`run`] holds the receiving half.
    switch_sender: async_channel::Sender<Switch<'a, T>>,
}
24
25impl<'a, T> Task<'a, T> {
26    /// Pause current task execution, switching to a new future.
27    /// Current continuation is captured and passed as argument.
28    pub async fn switch<
29        ResumeArg: 'a,
30        Fut: Future<Output = T> + 'a,
31        F: FnOnce(Resume<'a, ResumeArg, T>) -> Fut + 'a,
32    >(
33        &self,
34        f: F,
35    ) -> ResumeArg {
36        let (mut resume_arg_sender, resume_arg_receiver) = async_oneshot::oneshot();
37        self.switch_sender
38            .try_send(Box::new(move |continuation: Continuation<'a, T>| {
39                Box::pin(f(Box::new(move |arg| {
40                    resume_arg_sender.send(arg).expect("WTF");
41                    continuation
42                }))) as Continuation<'a, T>
43            }) as Switch<'a, T>)
44            .expect("WTF");
45        resume_arg_receiver.await.expect("HUH")
46    }
47}
48
49/// Run a task with switch capability.
50///
51/// Provided async function will be called with a handle to the [Task],
52/// and will be able to use switch operation using that handle.
53pub async fn run<'a, T: 'a, Fut: Future<Output = T> + 'a>(
54    f: impl FnOnce(Task<'a, T>) -> Fut + 'a,
55) -> T {
56    let (switch_sender, switch_receiver) = async_channel::bounded(1);
57    let task = Task { switch_sender };
58    let mut continuation: Option<Continuation<'a, T>> = Some(Box::pin(f(task)));
59    std::future::poll_fn(move |cx| loop {
60        let poll = Future::poll(continuation.as_mut().unwrap().as_mut(), cx);
61        match poll {
62            Poll::Ready(result) => return Poll::Ready(result),
63            Poll::Pending => {
64                match switch_receiver.try_recv() {
65                    Ok(switch) => {
66                        continuation = Some(switch(continuation.take().unwrap()));
67                        continue;
68                    }
69                    Err(
70                        async_channel::TryRecvError::Empty | async_channel::TryRecvError::Closed,
71                    ) => {}
72                };
73                return Poll::Pending;
74            }
75        }
76    })
77    .await
78}