1
2mod task;
3
4use fumio_utils::park::Park;
5use futures_core::future::{Future, FutureObj, LocalFutureObj};
6use futures_core::task::{Spawn, LocalSpawn, SpawnError};
7use futures_executor::Enter;
8use futures_util::pin_mut;
9use std::rc::{Rc, Weak};
10use std::task::{Context, Poll};
11
12fn run_executor<P: Park, T, F: FnMut(&mut Context<'_>) -> Poll<T>>(park: &mut P, enter: &mut Enter, mut f: F) -> T {
15 let waker = park.waker();
16 let mut cx = Context::from_waker(&waker);
17
18 loop {
19 if let Poll::Ready(t) = f(&mut cx) {
20 return t;
21 }
22 park.park(enter, None);
23 }
24}
25
26#[derive(Debug)]
38pub struct LocalPool {
39 task_list: Rc<task::LocalTaskList>,
40}
41
42impl LocalPool {
43 pub fn new() -> Self {
45 Self {
46 task_list: Rc::new(task::LocalTaskList::new()),
47 }
48 }
49
50 pub fn spawner(&self) -> LocalSpawner {
52 LocalSpawner {
53 task_list: Rc::downgrade(&self.task_list)
54 }
55 }
56
57 pub fn run<P: Park>(&mut self, park: &mut P, enter: &mut Enter) {
62 run_executor(park, enter, |cx| self.poll_pool(cx))
63 }
64
65 pub fn run_until<P: Park, F: Future>(&mut self, park: &mut P, enter: &mut Enter, future: F) -> F::Output {
77 pin_mut!(future);
78
79 run_executor(park, enter, |cx| {
80 {
81 let result = future.as_mut().poll(cx);
83 if let Poll::Ready(output) = result {
84 return Poll::Ready(output);
85 }
86 }
87
88 let _ = self.poll_pool(cx);
89 Poll::Pending
90 })
91 }
92
93 pub fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
97 self.task_list.poll(cx)
98 }
99
100 pub fn spawn(&self, future: LocalFutureObj<'static, ()>) {
102 self.task_list.add_task(future);
103 }
104}
105
106impl Default for LocalPool {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl Spawn for LocalPool {
113 fn spawn_obj(
114 &mut self,
115 future: FutureObj<'static, ()>,
116 ) -> Result<(), SpawnError> {
117 self.spawn_local_obj(future.into())
118 }
119
120 fn status(&self) -> Result<(), SpawnError> {
121 self.status_local()
122 }
123}
124
125impl LocalSpawn for LocalPool {
126 fn spawn_local_obj(
127 &mut self,
128 future: LocalFutureObj<'static, ()>,
129 ) -> Result<(), SpawnError> {
130 self.spawn(future);
131 Ok(())
132 }
133
134 fn status_local(&self) -> Result<(), SpawnError> {
135 Ok(())
136 }
137}
138
139#[derive(Clone, Debug)]
142pub struct LocalSpawner {
143 task_list: Weak<task::LocalTaskList>,
144}
145
146impl LocalSpawner {
147 pub fn enter<F, T>(self, enter: &mut Enter, f: F) -> T
161 where
162 F: FnOnce(&mut Enter) -> T
163 {
164 crate::current::enter_local(self, enter, f)
165 }
166}
167
168impl Spawn for LocalSpawner {
169 fn spawn_obj(
170 &mut self,
171 future: FutureObj<'static, ()>,
172 ) -> Result<(), SpawnError> {
173 self.spawn_local_obj(future.into())
174 }
175
176 fn status(&self) -> Result<(), SpawnError> {
177 self.status_local()
178 }
179}
180
181impl LocalSpawn for LocalSpawner {
182 fn spawn_local_obj(
183 &mut self,
184 future: LocalFutureObj<'static, ()>,
185 ) -> Result<(), SpawnError> {
186 if let Some(task_list) = self.task_list.upgrade() {
187 task_list.add_task(future);
188 Ok(())
189 } else {
190 Err(SpawnError::shutdown())
191 }
192 }
193
194 fn status_local(&self) -> Result<(), SpawnError> {
195 if self.task_list.upgrade().is_some() {
196 Ok(())
197 } else {
198 Err(SpawnError::shutdown())
199 }
200 }
201}