Skip to main content

hiasync/
future.rs

1use crate::{JoinHandle, LocalWaker, Waker, Event};
2use core::any::Any;
3use core::future::Future;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8struct WakerFuture;
9
10impl Future for WakerFuture {
11    type Output = Waker;
12    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
13        let waker = LocalWaker::waker(ctx);
14        Poll::Ready(waker.clone())
15    }
16}
17
18async fn local_waker() -> Waker {
19    WakerFuture.await
20}
21
22/// 新建异步任务,可实现和当前异步任务的并发调度.
23pub async fn spawn<T>(future: T) -> JoinHandle<T::Output>
24where
25    T: Future + 'static,
26    T::Output: 'static,
27{
28    local_waker().await.task_new(future)
29}
30
31struct Yield(bool);
32
33impl Future for Yield {
34    type Output = ();
35    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
36        if self.0 {
37            return Poll::Ready(());
38        }
39        let waker = LocalWaker::waker(ctx);
40        waker.wake_by_ref();
41        self.0 = true;
42        Poll::Pending
43    }
44}
45
46/// 主动放弃本次调度切换到其他任务.
47pub async fn sched_yield() {
48    Yield(false).await
49}
50
51struct Suspend(bool);
52
53impl Future for Suspend {
54    type Output = ();
55    fn poll(mut self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
56        if self.0 {
57            return Poll::Ready(());
58        }
59        self.0 = true;
60        Poll::Pending
61    }
62}
63
64struct Abort(u64);
65
66impl Future for Abort {
67    type Output = ();
68    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
69        let waker = LocalWaker::waker(ctx);
70        waker.task_abort(self.0);
71        Poll::Ready(())
72    }
73}
74
75/// 主动挂起当前异步任务,直到外部调用Runtime::task_resume后才能唤醒.
76///
77/// `task_suspend/Runtime::task_resume`的关系类似`wait_event/Runtime::notify_events`.
78/// 只是`task_suspend`需要结合到`task_self`使用.
79///
80/// ```text
81/// async fn foo(ctx: Rc<RefCell<TaskContext>>) {
82///     let msg_key = ...;
83///     non_send_msg(msg_key, ...); // 需要异步等待消息响应
84///     let task_id = task_self().await;
85///     ctx.save_key(msg_key, task_id);
86///     task_suspend().await; // 直到外部调用Runtime::wake(task_id)才返回.
87///     let resp = ctx.borrow_mut().get_msg_response(msg_key); // 获取响应
88///     ...
89/// }
90///
91/// fn main() {
92///     let mut rt = Runtime::new();
93///     ...
94///     loop {
95///         let msg_key = recv_msg();
96///         if let Some(task_id) = ctx.get_task_id(msg_key) {
97///             // 唤醒等待msg_key响应的异步任务.
98///             rt.task_resume(task_id);
99///         }
100///         rt.sched();
101///     }
102/// }
103/// ```
104pub async fn task_suspend() {
105    Suspend(false).await
106}
107
108/// 强制终止task. 可能导致资源泄露,参考`Runtime::task_force_abort`.
109pub async fn task_force_abort(task_id: u64) {
110    Abort(task_id).await
111}
112
113/// 每次调用返回不重复的event_id
114pub async fn new_event_id() -> u64 {
115    local_waker().await.new_event_id()
116}
117
118/// 获取当前异步任务的id. 同task_self.
119pub async fn current_task_id() -> u64 {
120    local_waker().await.task_id()
121}
122
123/// 获取当前异步任务的id
124pub async fn task_self() -> u64 {
125    local_waker().await.task_id()
126}
127
128struct WaitEvent<'a, T: Any> {
129    event_id: u64,
130    waker: Option<Waker>,
131    mark: PhantomData<&'a T>,
132}
133
134impl<'a, T: Any> Future for WaitEvent<'a, T> {
135    type Output = Option<&'a T>;
136    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
137        let waker = LocalWaker::waker(ctx);
138        if let Some(data) = waker.event_remove(self.event_id) {
139            self.waker = None;
140            return Poll::Ready(data.downcast_ref::<T>());
141        }
142        waker.event_register(self.event_id);
143        if self.waker.is_none() {
144            self.waker = Some(waker.clone());
145        }
146        Poll::Pending
147    }
148}
149
150impl<T: Any> Drop for WaitEvent<'_, T> {
151    fn drop(&mut self) {
152        // wait_event可能在or组合使用中提前结束,此时需要调用event_unregister避免资源泄露.
153        if let Some(waker) = &self.waker {
154            waker.event_unregister(self.event_id);
155        }
156    }
157}
158
159/// 等待外部事件通知获取事件数据.
160///
161/// 调用者保证事件ID不重复,每个事件到来时只会唤醒最后一次调用wait_event的任务.
162/// 如果事件数据非期待的类型,返回None.
163pub async fn wait_event<'a, T: Any>(event_id: u64) -> Option<&'a T> {
164    WaitEvent::<T> {
165        event_id,
166        waker: None,
167        mark: PhantomData,
168    }.await
169}
170
171/// 发送事件的异步接口.
172pub async fn notify_events(events: &[Event<'static>]) {
173    local_waker().await.notify_events(events);
174}
175
176/// 唤醒异步任务的异步接口.
177pub async fn task_resume(task_id: u64) {
178    local_waker().await.task_resume(task_id);
179}
180
181enum FutureOrOutput<T: Future> {
182    Future(T),
183    Output(Option<T::Output>),
184}
185
186pub struct And<T1: Future, T2: Future> {
187    f1: FutureOrOutput<T1>,
188    f2: FutureOrOutput<T2>,
189}
190
191impl<T1: Future, T2: Future> Unpin for And<T1, T2> {}
192
193impl<T1: Future, T2: Future> Future for And<T1, T2> {
194    type Output = (T1::Output, T2::Output);
195    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
196        if let FutureOrOutput::Future(ref mut f) = &mut self.f1 {
197            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
198            if let Poll::Ready(r) = pin.poll(ctx) {
199                self.f1 = FutureOrOutput::Output(Some(r));
200            }
201        }
202        if let FutureOrOutput::Future(ref mut f) = &mut self.f2 {
203            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
204            if let Poll::Ready(r) = pin.poll(ctx) {
205                self.f2 = FutureOrOutput::Output(Some(r));
206            }
207        }
208        let this: &mut Self = &mut self;
209        if let (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) =
210            (&mut this.f1, &mut this.f2)
211        {
212            return Poll::Ready((r1.take().unwrap(), r2.take().unwrap()));
213        }
214        Poll::Pending
215    }
216}
217
218/// 等待f1, f2都结束后才返回.
219pub fn and<T1: Future, T2: Future>(
220    f1: T1,
221    f2: T2,
222) -> And<T1, T2> {
223    And {
224        f1: FutureOrOutput::Future(f1),
225        f2: FutureOrOutput::Future(f2),
226    }
227}
228
229pub struct Or<T1: Future, T2: Future> {
230    f1: T1,
231    f2: T2,
232}
233
234impl<T1: Future, T2: Future> Unpin for Or<T1, T2> {}
235
236impl<T1: Future, T2: Future> Future for Or<T1, T2> {
237    type Output = (Option<T1::Output>, Option<T2::Output>);
238    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
239        let pin = unsafe { Pin::new_unchecked(&mut self.f1) };
240        if let Poll::Ready(r) = pin.poll(ctx) {
241            return Poll::Ready((Some(r), None));
242        }
243        let pin = unsafe { Pin::new_unchecked(&mut self.f2) };
244        if let Poll::Ready(r) = pin.poll(ctx) {
245            return Poll::Ready((None, Some(r)));
246        }
247        Poll::Pending
248    }
249}
250
251/// f1和f2中任何一个结束就返回.
252///
253/// 这里要注意另一个未结束的future已执行了部分功能,提前被强制终止对业务资源的生命周期管理的影响.
254pub fn or<T1: Future, T2: Future>(
255    f1: T1,
256    f2: T2,
257) -> Or<T1, T2> {
258    Or { f1, f2 }
259}
260
261pub struct PartialOr<T1: Future, T2: Future> {
262    is_first: bool,
263    f1: FutureOrOutput<T1>,
264    f2: FutureOrOutput<T2>,
265}
266
267impl<T1: Future, T2: Future> Unpin for PartialOr<T1, T2> {}
268
269impl<T1: Future, T2: Future> Future for PartialOr<T1, T2> {
270    type Output = (T1::Output, Option<T2::Output>);
271    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
272        if let FutureOrOutput::Future(ref mut f) = &mut self.f1 {
273            let pin: Pin<&mut T1> = unsafe { Pin::new_unchecked(f) };
274            if let Poll::Ready(r) = pin.poll(ctx) {
275                if self.is_first {
276                    return Poll::Ready((r, None));
277                }
278                self.f1 = FutureOrOutput::Output(Some(r));
279            }
280        }
281
282        if self.is_first {
283            self.is_first = false;
284        }
285
286        if let FutureOrOutput::Future(ref mut f) = &mut self.f2 {
287            let pin: Pin<&mut T2> = unsafe { Pin::new_unchecked(f) };
288            if let Poll::Ready(r) = pin.poll(ctx) {
289                self.f2 = FutureOrOutput::Output(Some(r));
290            }
291        }
292
293        let this: &mut Self = &mut self;
294        if let (FutureOrOutput::Output(r1), FutureOrOutput::Output(r2)) =
295            (&mut this.f1, &mut this.f2)
296        {
297            return Poll::Ready((r1.take().unwrap(), r2.take()));
298        }
299        Poll::Pending
300    }
301}
302
303/// f2可能不被执行就返回.
304///
305/// 如果f1首次调度返回Poll::Ready, 则不会调度f2直接返回(T1::Output, None).
306/// 否则一定在f1和f2都返回Poll::Ready后才结束, 返回(T1::Output, Some(T2::Output)).
307/// 和or的区别在于,f1和f2要么从未被调度过,要么一定等待调度结束再返回,完全消除异步任务可能被强制终止的场景.
308pub fn partial_or<T1: Future, T2: Future>(
309    f1: T1,
310    f2: T2,
311) -> PartialOr<T1, T2> {
312    PartialOr {
313        is_first: true,
314        f1: FutureOrOutput::Future(f1),
315        f2: FutureOrOutput::Future(f2),
316    }
317}
318
319/// 提供future的灵活组合能力.
320pub trait Extension {
321    /// 等同于hiasync::and.
322    ///
323    /// 等待self, other都结束后才返回.
324    fn and<T>(self, other: T) -> And<Self, T>
325    where
326        Self: Future + Sized,
327        T: Future + Sized,
328    {
329        and(self, other)
330    }
331
332    /// 等同于hiasync::or.
333    ///
334    /// self和other中任何一个结束就返回.
335    /// 这里要注意另一个未结束的future已执行了部分功能,提前被强制终止对业务资源的生命周期管理的影响.
336    fn or<T>(self, other: T) -> Or<Self, T>
337    where
338        Self: Future + Sized,
339        T: Future + Sized,
340    {
341        or(self, other)
342    }
343
344    /// 等同于hiasync::partial_or.
345    ///
346    /// 如果self首次调度返回Poll::Ready, 则不会调度other直接返回(Self::Output, None).
347    /// 否则一定在self和other都返回Poll::Ready后才结束返回(Self::Output, Some(T::Output)).
348    /// 和or的区别在于,self和other要么从未被调度过,要么一定等待调度结束再返回,完全消除异步任务可能被强制终止的场景.
349    fn partial_or<T>(self, other: T) -> PartialOr<Self, T>
350    where
351        Self: Future + Sized,
352        T: Future + Sized,
353    {
354        partial_or(self, other)
355    }
356}
357
358impl<T: Future + Sized> Extension for T {}