1pub use some_executor;
21
22mod channel;
23pub mod some_executor_adapter;
24
25use std::any::Any;
26use std::convert::Infallible;
27use std::fmt::Debug;
28use std::future::Future;
29use std::mem::forget;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, RawWaker, RawWakerVTable, Waker};
33use some_executor::{LocalExecutorExt, SomeLocalExecutor};
34use some_executor::observer::{FinishedObservation, Observer, ObserverNotified};
35use some_executor::task::{DynLocalSpawnedTask, DynSpawnedTask, TaskID};
36use crate::channel::{FindSlot, Sender};
37
38pub type Task<F,N> = some_executor::task::Task<F,N>;
39pub type Configuration = some_executor::task::Configuration;
40
41
42const VTABLE: RawWakerVTable = RawWakerVTable::new(
43 |data| {
44 let context = unsafe{
45 Arc::from_raw(data as *const WakeContext)
46 };
47 let cloned = Arc::into_raw(context.clone());
48 forget(context);
49 RawWaker::new(cloned as *const (), &VTABLE)
50 },
51 |data| {
52 let context = unsafe{
53 assert_send_sync::<WakeContext>();
56 Arc::from_raw(data as *const WakeContext)
57 };
58 match Arc::try_unwrap(context) {
59 Ok(context) => {
60 context.sender.send();
61 }
62 Err(context) => {
63 context.sender.send_by_ref();
64 }
65 }
66
67 },
68 |data| {
69 let context = unsafe{
70 Arc::from_raw(data as *const WakeContext)
71 };
72 context.sender.send_by_ref();
73 forget(context);
74 },
75 |data| {
76 unsafe{Arc::from_raw(data as *const WakeContext)}; }
78);
79
/// Compile-time assertion that `T` is both `Send` and `Sync`.
///
/// Calling it does nothing at runtime; it only fails to compile when `T`
/// does not satisfy the bounds.
fn assert_send_sync<T>()
where
    T: Send + Sync,
{
}
81
82struct WakeContext {
84 sender: Sender,
85}
86
87struct SubmittedTask<'task> {
88 task: Pin<Box<dyn DynLocalSpawnedTask<Executor<'task>> + 'task>>,
89 waker: Waker,
91 task_id: TaskID
92}
93
94impl Debug for SubmittedTask<'_> {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("SubmittedTask")
97 .field("task_id", &self.task_id)
98 .finish()
99 }
100}
101
102struct SubmittedRemoteTask<'a> {
103 task: Pin<Box<dyn DynSpawnedTask<Executor<'a>>>>,
104 waker: Waker,
106 task_id: TaskID
107}
108
109impl Debug for SubmittedRemoteTask<'_> {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("SubmittedRemoteTask")
112 .field("task_id", &self.task_id)
113 .finish()
114 }
115}
116
117impl<'executor> SubmittedTask<'executor> {
118 fn new(task: Pin<Box<(dyn DynLocalSpawnedTask<Executor<'executor>> + 'executor)>>, sender: Sender) -> Self {
119 let context = Arc::new(WakeContext{
120 sender
121 });
122 let context_raw = Arc::into_raw(context);
123 let waker = unsafe {
124 assert_send_sync::<WakeContext>();
127 Waker::from_raw(RawWaker::new(context_raw as *const (), &VTABLE))
128 };
129 let task_id = task.task_id();
130 SubmittedTask {
131 task: task,
132 waker,
133 task_id,
134 }
135 }
136}
137
138impl<'task> SubmittedRemoteTask<'task> {
139 fn new(task: Pin<Box<dyn DynSpawnedTask<Executor<'task>>>>, sender: Sender) -> Self {
140 let context = Arc::new(WakeContext {
141 sender
142 });
143 let context_raw = Arc::into_raw(context);
144 let waker = unsafe {
145 assert_send_sync::<WakeContext>();
148 Waker::from_raw(RawWaker::new(context_raw as *const (), &VTABLE))
149 };
150 let task_id = task.task_id();
151 SubmittedRemoteTask {
152 task: task,
153 waker,
154 task_id,
155 }
156 }
157}
158
159
160
161#[derive(Debug)]
168pub struct Executor<'tasks> {
169
170 ready_for_poll: Vec<SubmittedTask<'tasks>>,
171 ready_for_poll_remote: Vec<SubmittedRemoteTask<'tasks>>,
172 waiting_for_wake: Vec<SubmittedTask<'tasks>>,
173 waiting_for_wake_remote: Vec<SubmittedRemoteTask<'tasks>>,
174
175 wake_receiver: Option<channel::Receiver>,
177
178 adapter_shared: Arc<some_executor_adapter::Shared>,
179}
180
181impl<'tasks> Executor<'tasks> {
182 pub fn new() -> Self {
186
187 let mut wake_receiver = channel::Receiver::new();
188 let adapter_shared = Arc::new(some_executor_adapter::Shared::new(&mut wake_receiver));
189 Executor {
190 ready_for_poll: Vec::new(),
191 ready_for_poll_remote: Vec::new(),
192 waiting_for_wake: Vec::new(),
193 waiting_for_wake_remote: Vec::new(),
194 wake_receiver: Some(wake_receiver),
195 adapter_shared,
196 }
197 }
198 pub fn do_some(&mut self) {
204
205 let receiver = self.wake_receiver.take().expect("Receiver is not available");
206
207 let attempt_tasks = self.ready_for_poll.drain(..).collect::<Vec<_>>();
208 let _interval = logwise::perfwarn_begin!("do_some does not currently sort tasks well");
209 for mut task in attempt_tasks {
210 let mut context = Context::from_waker(&task.waker);
211 let logwise_task = logwise::context::Context::new_task(Some(logwise::context::Context::current()), "single_threaded_executor::do_some");
212 let context_id = logwise_task.context_id();
213 logwise_task.set_current();
214 logwise::debuginternal_sync!("Polling task {id} {label}", id=logwise::privacy::IPromiseItsNotPrivate(task.task_id), label=task.task.label());
215 let e = task.task.as_mut().poll(&mut context, self,None);
216
217 match e {
218 std::task::Poll::Ready(_) => {
219 }
221 std::task::Poll::Pending => {
222 self.waiting_for_wake.push(task);
224 }
225 }
226 logwise::context::Context::pop(context_id);
227 }
228
229 let attempt_remote_tasks = self.ready_for_poll_remote.drain(..).collect::<Vec<_>>();
230 for mut task in attempt_remote_tasks {
231 let mut context = Context::from_waker(&task.waker);
232 let logwise_task = logwise::context::Context::new_task(Some(logwise::context::Context::current()), "single_threaded_executor::do_some");
233 let context_id = logwise_task.context_id();
234 logwise_task.set_current();
235 logwise::debuginternal_sync!("Polling task {id} {label}", id=logwise::privacy::IPromiseItsNotPrivate(task.task.task_id()), label=task.task.label());
236 let e = task.task.as_mut().poll(&mut context, None);
237
238 match e {
239 std::task::Poll::Ready(_) => {
240 }
242 std::task::Poll::Pending => {
243 self.waiting_for_wake_remote.push(task);
245 }
246 }
247 logwise::context::Context::pop(context_id);
248 }
249
250 self.wake_receiver = Some(receiver);
251 drop(_interval);
252 }
253 pub fn park_if_needed(&mut self) {
257 if !self.has_unfinished_tasks() { return }
258
259 if self.ready_for_poll.is_empty() && self.ready_for_poll_remote.is_empty() {
260 let mut receiver = self.wake_receiver.take().expect("Receiver is not available");
261 let task_id = receiver.recv_park();
262 match task_id {
263 FindSlot::FoundSlot(task_id) => {
264 {
265 let _interval = logwise::perfwarn_begin!("O(n) search for task_id");
266
267 if let Some(pos) = self.waiting_for_wake.iter().position(|x| x.task_id == task_id) {
268 let task = self.waiting_for_wake.remove(pos);
269 self.ready_for_poll.push(task);
270 }
271 else if let Some(pos) = self.waiting_for_wake_remote.iter().position(|x| x.task_id == task_id) {
272 let task = self.waiting_for_wake_remote.remove(pos);
273 self.ready_for_poll_remote.push(task);
274 }
275 else {
276 unreachable!("Task ID not found");
277 }
278 };
279
280 }
281 FindSlot::FoundTaskSubmitted => {
282 for task in self.adapter_shared.take_pending_tasks() {
283 let sender = Sender::with_receiver(&mut receiver, task.task_id());
284 self.ready_for_poll_remote.push(SubmittedRemoteTask::new(Box::into_pin(task), sender));
285 }
286 }
287 FindSlot::NoSlot => {
288 unreachable!("spurious wakeup")
289 }
290 }
291 self.wake_receiver = Some(receiver);
295 }
296 }
297 pub fn drain(mut self) {
303 while self.has_unfinished_tasks() {
304 self.do_some();
305 self.park_if_needed();
306 }
307 }
308
309 pub fn has_unfinished_tasks(&self) -> bool {
313
314 !self.ready_for_poll.is_empty()
315 || !self.ready_for_poll_remote.is_empty()
316 || !self.waiting_for_wake.is_empty()
317 || !self.waiting_for_wake_remote.is_empty()
318 || self.wake_receiver.as_ref().expect("No receiver").has_data()
319 }
320
321 fn enqueue_task(&mut self, task: Pin<Box<(dyn DynLocalSpawnedTask<Executor<'tasks>> + 'tasks)>>) {
322 if task.poll_after() < std::time::Instant::now() {
323 let sender = Sender::with_receiver(self.wake_receiver.as_mut().expect("Receiver is not available"), task.task_id());
324 self.ready_for_poll.push(SubmittedTask::new(task, sender));
325 }
326 else {
327 todo!("Not yet implemented")
328 }
329 }
330
331 pub(crate) fn adapter_shared(&self) -> &Arc<some_executor_adapter::Shared> {
332 &self.adapter_shared
333 }
334}
335
336impl<'future> SomeLocalExecutor<'future> for Executor<'future> {
344 type ExecutorNotifier = Infallible;
345
346 fn spawn_local<F: Future, Notifier: ObserverNotified<F::Output>>(&mut self, task: Task<F, Notifier>) -> impl Observer<Value=F::Output>
347 where
348 Self: Sized,
349 F: 'future,
350 <F as Future>::Output: Unpin,
351 <F as Future>::Output: 'static,
352 {
353 let (task,observer) = task.spawn_local(self);
354 self.enqueue_task(Box::pin(task));
355 observer
356 }
357
358 fn spawn_local_async<F: Future, Notifier: ObserverNotified<F::Output>>(&mut self, task: Task<F, Notifier>) -> impl Future<Output=impl Observer<Value=F::Output>>
359 where
360 Self: Sized,
361 F: 'future,
362 <F as Future>::Output: 'static,
363 {
364 async {
365 let (spawn,observer) = task.spawn_local(self);
366 self.enqueue_task(Box::pin(spawn));
367 observer
368 }
369 }
370 fn spawn_local_objsafe(&mut self, task: Task<Pin<Box<dyn Future<Output=Box<dyn Any>>>>, Box<dyn ObserverNotified<(dyn Any + 'static)>>>) -> Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>> {
371 let (spawn, observer) = task.spawn_local_objsafe(self);
372 self.enqueue_task(Box::pin(spawn));
373 Box::new(observer) as Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>>
374 }
375
376 fn spawn_local_objsafe_async<'s>(&'s mut self, task: Task<Pin<Box<dyn Future<Output=Box<dyn Any>>>>, Box<dyn ObserverNotified<(dyn Any + 'static)>>>) -> Box<dyn Future<Output=Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>>> + 's> {
377 Box::new(async {
378 let (spawn, observer) = task.spawn_local_objsafe(self);
379 self.enqueue_task(Box::pin(spawn));
380 Box::new(observer) as Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>>
381 })
382 }
383
384 fn executor_notifier(&mut self) -> Option<Self::ExecutorNotifier> {
385 None
386 }
387}
388
389
390impl<'tasks> LocalExecutorExt<'tasks> for Executor<'tasks> {
391
392}
393
394impl Default for Executor<'_> {
402 fn default() -> Self {
403 Self::new()
404 }
405}
406
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Poll;
    use some_executor::observer::{Observation, Observer};
    use some_executor::SomeLocalExecutor;
    use some_executor::task::{Configuration, Task};
    use crate::Executor;

    /// Future that counts its polls, wakes itself on every poll, and becomes
    /// ready on the tenth.
    struct PollCounter(u8);

    impl Future for PollCounter {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            let counter = self.get_mut();
            counter.0 += 1;
            // Wake through a cloned waker so the task is rescheduled.
            cx.waker().clone().wake();
            if counter.0 >= 10 {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }
    }

    /// Driving an executor with no tasks must not panic.
    #[test]
    fn test_do_empty() {
        let mut executor = Executor::new();
        executor.do_some();
    }

    /// The counter task stays pending for its first nine polls.
    #[test]
    fn test_do() {
        let mut executor = Executor::new();
        let task = Task::without_notifications("test_do".to_string(), PollCounter(0), Configuration::default());

        let observer = executor.spawn_local(task);
        for _round in 0..9 {
            executor.do_some();
            assert_eq!(observer.observe(), Observation::Pending);
            executor.park_if_needed();
        }
    }
}
454