task_stream/
local.rs

1use crate::{Executor, TaskStream};
2use async_task::Runnable;
3use core::future::Future;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use futures_util::Stream;
8
9pub struct LocalRunnable {
10    runnable: Runnable,
11    // Prevent Send, Sync
12    _private: PhantomData<*mut ()>,
13}
14impl LocalRunnable {
15    fn new(runnable: Runnable) -> Self {
16        Self {
17            runnable,
18            _private: PhantomData,
19        }
20    }
21    pub fn run(self) -> bool {
22        self.runnable.run()
23    }
24}
25
26pub struct LocalExecutor<const N: usize> {
27    exec: Executor<N>,
28    // Prevent Send, Sync
29    _private: PhantomData<*mut ()>,
30}
31impl<const N: usize> LocalExecutor<N> {
32    pub const fn new(exec: Executor<N>) -> Self {
33        Self {
34            exec,
35            _private: PhantomData,
36        }
37    }
38
39    pub fn spawn<F>(&self, future: F)
40    where
41        F: Future + 'static,
42        F::Output: 'static,
43    {
44        unsafe { self.exec.spawn_local(future) }
45    }
46
47    pub fn stream(&self) -> LocalTaskStream<N> {
48        LocalTaskStream {
49            stream: self.exec.stream(),
50            _private: PhantomData,
51        }
52    }
53}
54
55pub struct LocalTaskStream<'a, const N: usize> {
56    stream: TaskStream<'a, N>,
57    // Prevent Send, Sync
58    _private: PhantomData<*mut ()>,
59}
60impl<'a, const N: usize> LocalTaskStream<'a, N> {
61    pub fn get_task(&self) -> Option<LocalRunnable> {
62        let runnable = self.stream.get_task()?;
63        Some(LocalRunnable::new(runnable))
64    }
65}
66impl<'a, const N: usize> Stream for LocalTaskStream<'a, N> {
67    type Item = LocalRunnable;
68    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69        if let Poll::Ready(runnable) = Pin::new(&mut self.stream).poll_next(cx){
70            if let Some(runnable) = runnable {
71                Poll::Ready(Some(LocalRunnable::new(runnable)))
72            }else{
73                Poll::Ready(None)
74            }
75        }else{
76            Poll::Pending
77        }
78    }
79}