1use crate::{Executor, TaskStream};
2use async_task::Runnable;
3use core::future::Future;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use futures_util::Stream;
8
9pub struct LocalRunnable {
10 runnable: Runnable,
11 _private: PhantomData<*mut ()>,
13}
14impl LocalRunnable {
15 fn new(runnable: Runnable) -> Self {
16 Self {
17 runnable,
18 _private: PhantomData,
19 }
20 }
21 pub fn run(self) -> bool {
22 self.runnable.run()
23 }
24}
25
26pub struct LocalExecutor<const N: usize> {
27 exec: Executor<N>,
28 _private: PhantomData<*mut ()>,
30}
31impl<const N: usize> LocalExecutor<N> {
32 pub const fn new(exec: Executor<N>) -> Self {
33 Self {
34 exec,
35 _private: PhantomData,
36 }
37 }
38
39 pub fn spawn<F>(&self, future: F)
40 where
41 F: Future + 'static,
42 F::Output: 'static,
43 {
44 unsafe { self.exec.spawn_local(future) }
45 }
46
47 pub fn stream(&self) -> LocalTaskStream<N> {
48 LocalTaskStream {
49 stream: self.exec.stream(),
50 _private: PhantomData,
51 }
52 }
53}
54
55pub struct LocalTaskStream<'a, const N: usize> {
56 stream: TaskStream<'a, N>,
57 _private: PhantomData<*mut ()>,
59}
60impl<'a, const N: usize> LocalTaskStream<'a, N> {
61 pub fn get_task(&self) -> Option<LocalRunnable> {
62 let runnable = self.stream.get_task()?;
63 Some(LocalRunnable::new(runnable))
64 }
65}
66impl<'a, const N: usize> Stream for LocalTaskStream<'a, N> {
67 type Item = LocalRunnable;
68 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69 if let Poll::Ready(runnable) = Pin::new(&mut self.stream).poll_next(cx){
70 if let Some(runnable) = runnable {
71 Poll::Ready(Some(LocalRunnable::new(runnable)))
72 }else{
73 Poll::Ready(None)
74 }
75 }else{
76 Poll::Pending
77 }
78 }
79}