some_local_executor/
lib.rs

1//SPDX-License-Identifier: MIT OR Apache-2.0
2/*!
3It's a simple single-threaded executor.
4
5This is a reference executor for the [some_executor](https://sealedabstract.com/code/some_executor) crate.
6
7By leveraging the features in `some_executor`, this project provides a rich API:
81.  Tasks and cancellation
92.  Observers and notifications
103.  Priorities and scheduling
114.  task-locals
12
13...in about a page of code.
14
15By writing code against this executor, you're basically compatible with every other executor supported
16by `some_executor`.
17*/
18
19//exports the some_executor crate used in this one
20pub use some_executor;
21
22mod channel;
23pub mod some_executor_adapter;
24
25use std::any::Any;
26use std::convert::Infallible;
27use std::fmt::Debug;
28use std::future::Future;
29use std::mem::forget;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, RawWaker, RawWakerVTable, Waker};
33use some_executor::{LocalExecutorExt, SomeLocalExecutor};
34use some_executor::observer::{FinishedObservation, Observer, ObserverNotified};
35use some_executor::task::{DynLocalSpawnedTask, DynSpawnedTask, TaskID};
36use crate::channel::{FindSlot, Sender};
37
/// Alias for [`some_executor::task::Task`], so the task type can be named through this crate.
pub type Task<F,N> = some_executor::task::Task<F,N>;
/// Alias for [`some_executor::task::Configuration`], so the configuration type can be named through this crate.
pub type Configuration = some_executor::task::Configuration;
40
41
42const VTABLE: RawWakerVTable = RawWakerVTable::new(
43    |data| {
44        let context = unsafe{
45            Arc::from_raw(data as *const WakeContext)
46        };
47        let cloned = Arc::into_raw(context.clone());
48        forget(context);
49        RawWaker::new(cloned as *const (), &VTABLE)
50    },
51    |data| {
52        let context = unsafe{
53            // we effectively rely on the Send/Sync property of WakeContext here.
54            // fortunately we can check it at compile time
55            assert_send_sync::<WakeContext>();
56            Arc::from_raw(data as *const WakeContext)
57        };
58        match Arc::try_unwrap(context) {
59            Ok(context) => {
60                context.sender.send();
61            }
62            Err(context) => {
63                context.sender.send_by_ref();
64            }
65        }
66
67    },
68    |data| {
69       let context = unsafe{
70            Arc::from_raw(data as *const WakeContext)
71        };
72        context.sender.send_by_ref();
73        forget(context);
74    },
75    |data| {
76        unsafe{Arc::from_raw(data as *const WakeContext)}; //drop the arc
77    }
78);
79
/// Compile-time assertion that `T` is both `Send` and `Sync`.
///
/// The function body is empty; instantiating it with a type that lacks either
/// bound is a compile error.
fn assert_send_sync<T>()
where
    T: Send + Sync,
{
}
81
/// Data carried behind each task waker.
///
/// Instances are wrapped in an `Arc`; the pointer from `Arc::into_raw` is used as the
/// data pointer of the `RawWaker`.
struct WakeContext {
    /// Sender the waker vtable uses to signal that the task was woken.
    sender: Sender,
}
86
/// A local task held by the executor, together with its waker and cached id.
struct SubmittedTask<'task> {
    /// The pinned, type-erased task that the executor polls.
    task: Pin<Box<dyn DynLocalSpawnedTask<Executor<'task>> + 'task>>,
    ///Providing a stable Waker for each task is optimal.
    waker: Waker,
    /// Id read from the task at construction; used to match wake notifications to this entry.
    task_id: TaskID
}
93
94impl Debug for SubmittedTask<'_> {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("SubmittedTask")
97            .field("task_id", &self.task_id)
98            .finish()
99    }
100}
101
/// A task of the `DynSpawnedTask` kind held by the executor, together with its waker and cached id.
struct SubmittedRemoteTask<'a> {
    /// The pinned, type-erased task that the executor polls.
    task: Pin<Box<dyn DynSpawnedTask<Executor<'a>>>>,
    ///Providing a stable Waker for each task is optimal.
    waker: Waker,
    /// Id read from the task at construction; used to match wake notifications to this entry.
    task_id: TaskID
}
108
109impl Debug for SubmittedRemoteTask<'_> {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("SubmittedRemoteTask")
112            .field("task_id", &self.task_id)
113            .finish()
114    }
115}
116
117impl<'executor> SubmittedTask<'executor> {
118    fn new(task: Pin<Box<(dyn DynLocalSpawnedTask<Executor<'executor>> + 'executor)>>, sender: Sender) -> Self {
119        let context = Arc::new(WakeContext{
120            sender
121        });
122        let context_raw = Arc::into_raw(context);
123        let waker = unsafe {
124            // we effectively rely on the Send/Sync property of WakeContext here.
125            // fortunately we can check it at compile time
126            assert_send_sync::<WakeContext>();
127            Waker::from_raw(RawWaker::new(context_raw as *const (), &VTABLE))
128        };
129        let task_id = task.task_id();
130        SubmittedTask {
131            task: task,
132            waker,
133            task_id,
134        }
135    }
136}
137
138impl<'task> SubmittedRemoteTask<'task> {
139    fn new(task: Pin<Box<dyn DynSpawnedTask<Executor<'task>>>>, sender: Sender) -> Self {
140        let context = Arc::new(WakeContext {
141            sender
142        });
143        let context_raw = Arc::into_raw(context);
144        let waker = unsafe {
145            // we effectively rely on the Send/Sync property of WakeContext here.
146            // fortunately we can check it at compile time
147            assert_send_sync::<WakeContext>();
148            Waker::from_raw(RawWaker::new(context_raw as *const (), &VTABLE))
149        };
150        let task_id = task.task_id();
151        SubmittedRemoteTask {
152            task: task,
153            waker,
154            task_id,
155        }
156    }
157}
158
159
160
/**
The main executor type.

The lifetime parameter `'tasks` is the lifetime of the tasks that are spawned on this executor, that is
the lifetime of any data they may borrow, in case they are spawned with a reference to that data.
*/
#[derive(Debug)]
pub struct Executor<'tasks> {

    /// Local tasks that the next call to `do_some` will poll.
    ready_for_poll: Vec<SubmittedTask<'tasks>>,
    /// Tasks taken from the adapter's pending list that the next call to `do_some` will poll.
    ready_for_poll_remote: Vec<SubmittedRemoteTask<'tasks>>,
    /// Local tasks that returned `Pending` and are parked until a wake for their id is received.
    waiting_for_wake: Vec<SubmittedTask<'tasks>>,
    /// Adapter-submitted tasks that returned `Pending` and are parked until a wake for their id is received.
    waiting_for_wake_remote: Vec<SubmittedRemoteTask<'tasks>>,

    //slot so we can take
    /// Receiving end of the wake channel; kept in an `Option` so it can be taken out temporarily.
    wake_receiver: Option<channel::Receiver>,

    /// State shared with the `some_executor_adapter` module; pending tasks are taken from it in `park_if_needed`.
    adapter_shared: Arc<some_executor_adapter::Shared>,
}
180
181impl<'tasks> Executor<'tasks> {
182    /**
183    Creates a new executor.
184    */
185    pub fn new() -> Self {
186
187        let mut wake_receiver = channel::Receiver::new();
188        let adapter_shared = Arc::new(some_executor_adapter::Shared::new(&mut wake_receiver));
189        Executor {
190            ready_for_poll: Vec::new(),
191            ready_for_poll_remote: Vec::new(),
192            waiting_for_wake: Vec::new(),
193            waiting_for_wake_remote: Vec::new(),
194            wake_receiver: Some(wake_receiver),
195            adapter_shared,
196        }
197    }
198    /**
199    Runs the executor until there is no more immediate work to be performed.
200
201    It is intended ot be called in a loop with [Self::park_if_needed].
202    */
203    pub fn do_some(&mut self)  {
204
205        let receiver = self.wake_receiver.take().expect("Receiver is not available");
206
207        let attempt_tasks = self.ready_for_poll.drain(..).collect::<Vec<_>>();
208        let _interval = logwise::perfwarn_begin!("do_some does not currently sort tasks well");
209        for mut task in attempt_tasks {
210            let mut context = Context::from_waker(&task.waker);
211            let logwise_task = logwise::context::Context::new_task(Some(logwise::context::Context::current()), "single_threaded_executor::do_some");
212            let context_id = logwise_task.context_id();
213            logwise_task.set_current();
214            logwise::debuginternal_sync!("Polling task {id} {label}", id=logwise::privacy::IPromiseItsNotPrivate(task.task_id), label=task.task.label());
215            let e = task.task.as_mut().poll(&mut context, self,None);
216
217            match e {
218                std::task::Poll::Ready(_) => {
219                    // do nothing; drop the future
220                }
221                std::task::Poll::Pending => {
222                    //need to retain the task
223                    self.waiting_for_wake.push(task);
224                }
225            }
226            logwise::context::Context::pop(context_id);
227        }
228
229        let attempt_remote_tasks = self.ready_for_poll_remote.drain(..).collect::<Vec<_>>();
230        for mut task in attempt_remote_tasks {
231            let mut context = Context::from_waker(&task.waker);
232            let logwise_task = logwise::context::Context::new_task(Some(logwise::context::Context::current()), "single_threaded_executor::do_some");
233            let context_id = logwise_task.context_id();
234            logwise_task.set_current();
235            logwise::debuginternal_sync!("Polling task {id} {label}", id=logwise::privacy::IPromiseItsNotPrivate(task.task.task_id()), label=task.task.label());
236            let e = task.task.as_mut().poll(&mut context, None);
237
238            match e {
239                std::task::Poll::Ready(_) => {
240                    // do nothing; drop the future
241                }
242                std::task::Poll::Pending => {
243                    //need to retain the task
244                    self.waiting_for_wake_remote.push(task);
245                }
246            }
247            logwise::context::Context::pop(context_id);
248        }
249
250        self.wake_receiver = Some(receiver);
251        drop(_interval);
252    }
253    /**
254    Parks the thread if there are no tasks to be performed, until tasks are ready to be performed again.
255*/
256    pub fn park_if_needed(&mut self) {
257        if !self.has_unfinished_tasks() { return }
258
259        if self.ready_for_poll.is_empty() && self.ready_for_poll_remote.is_empty() {
260            let mut receiver = self.wake_receiver.take().expect("Receiver is not available");
261            let task_id = receiver.recv_park();
262            match task_id {
263                FindSlot::FoundSlot(task_id) => {
264                    {
265                        let _interval = logwise::perfwarn_begin!("O(n) search for task_id");
266
267                        if let Some(pos) = self.waiting_for_wake.iter().position(|x| x.task_id == task_id) {
268                            let task = self.waiting_for_wake.remove(pos);
269                            self.ready_for_poll.push(task);
270                        }
271                        else if let Some(pos) = self.waiting_for_wake_remote.iter().position(|x| x.task_id == task_id) {
272                            let task = self.waiting_for_wake_remote.remove(pos);
273                            self.ready_for_poll_remote.push(task);
274                        }
275                        else {
276                            unreachable!("Task ID not found");
277                        }
278                    };
279
280                }
281                FindSlot::FoundTaskSubmitted => {
282                    for task in self.adapter_shared.take_pending_tasks() {
283                        let sender = Sender::with_receiver(&mut receiver, task.task_id());
284                        self.ready_for_poll_remote.push(SubmittedRemoteTask::new(Box::into_pin(task), sender));
285                    }
286                }
287                FindSlot::NoSlot => {
288                    unreachable!("spurious wakeup")
289                }
290            }
291            //remove the first value from waiting_for_wake that has the same task_id
292
293
294            self.wake_receiver = Some(receiver);
295        }
296    }
297    /**
298    Drains the executor. After this call, the executor can no longer be used.
299
300    This function will return when all spawned tasks complete.
301*/
302    pub fn drain(mut self) {
303        while self.has_unfinished_tasks() {
304            self.do_some();
305            self.park_if_needed();
306        }
307    }
308
309    /**
310    Returns true if there are tasks that are not yet complete.
311    */
312    pub fn has_unfinished_tasks(&self) -> bool {
313
314        !self.ready_for_poll.is_empty()
315            || !self.ready_for_poll_remote.is_empty()
316            || !self.waiting_for_wake.is_empty()
317            || !self.waiting_for_wake_remote.is_empty()
318            || self.wake_receiver.as_ref().expect("No receiver").has_data()
319    }
320
321    fn enqueue_task(&mut self, task: Pin<Box<(dyn DynLocalSpawnedTask<Executor<'tasks>> + 'tasks)>>) {
322        if task.poll_after() < std::time::Instant::now() {
323            let sender = Sender::with_receiver(self.wake_receiver.as_mut().expect("Receiver is not available"), task.task_id());
324            self.ready_for_poll.push(SubmittedTask::new(task, sender));
325        }
326        else {
327            todo!("Not yet implemented")
328        }
329    }
330
331    pub(crate) fn adapter_shared(&self) -> &Arc<some_executor_adapter::Shared> {
332        &self.adapter_shared
333    }
334}
335
336/**
337Implementation of the `SomeLocalExecutor` trait from the [some_executor](http://sealedabstract.com/code/some_executor) project.
338
339This is the main interface to spawn tasks onto the executor.
340
341For details on this trait, see [SomeLocalExecutor].
342*/
343impl<'future> SomeLocalExecutor<'future> for Executor<'future> {
344    type ExecutorNotifier = Infallible;
345
346    fn spawn_local<F: Future, Notifier: ObserverNotified<F::Output>>(&mut self, task: Task<F, Notifier>) -> impl Observer<Value=F::Output>
347    where
348        Self: Sized,
349        F: 'future,
350        <F as Future>::Output: Unpin,
351        <F as Future>::Output: 'static,
352    {
353        let (task,observer) = task.spawn_local(self);
354        self.enqueue_task(Box::pin(task));
355        observer
356    }
357
358    fn spawn_local_async<F: Future, Notifier: ObserverNotified<F::Output>>(&mut self, task: Task<F, Notifier>) -> impl Future<Output=impl Observer<Value=F::Output>>
359    where
360        Self: Sized,
361        F: 'future,
362        <F as Future>::Output: 'static,
363    {
364        async {
365            let (spawn,observer)  = task.spawn_local(self);
366            self.enqueue_task(Box::pin(spawn));
367            observer
368        }
369    }
370    fn spawn_local_objsafe(&mut self, task: Task<Pin<Box<dyn Future<Output=Box<dyn Any>>>>, Box<dyn ObserverNotified<(dyn Any + 'static)>>>) -> Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>> {
371        let (spawn, observer) = task.spawn_local_objsafe(self);
372        self.enqueue_task(Box::pin(spawn));
373        Box::new(observer) as Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>>
374    }
375
376    fn spawn_local_objsafe_async<'s>(&'s mut self, task: Task<Pin<Box<dyn Future<Output=Box<dyn Any>>>>, Box<dyn ObserverNotified<(dyn Any + 'static)>>>) -> Box<dyn Future<Output=Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>>> + 's> {
377        Box::new(async {
378            let (spawn, observer) = task.spawn_local_objsafe(self);
379            self.enqueue_task(Box::pin(spawn));
380            Box::new(observer) as Box<dyn Observer<Value=Box<dyn Any>, Output = FinishedObservation<Box<dyn Any>>>>
381        })
382    }
383
384    fn executor_notifier(&mut self) -> Option<Self::ExecutorNotifier> {
385        None
386    }
387}
388
389
/// Marks `Executor` as implementing `LocalExecutorExt`; the impl body is intentionally empty.
impl<'tasks> LocalExecutorExt<'tasks> for Executor<'tasks> {

}
393
394/*executor boilerplate
395
396I think we don't especially want clone, eq, hash, ord
397
398Default makes sense
399 */
400
401impl Default for Executor<'_> {
402    fn default() -> Self {
403        Self::new()
404    }
405}
406
//It doesn't seem possible to implement `From` for a future here, because spawning has to return the observer type.
408
409
#[cfg(test)] mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use some_executor::observer::{Observation, Observer};
    use some_executor::SomeLocalExecutor;
    use some_executor::task::{Configuration, Task};
    use crate::Executor;

    /// Future that wakes itself on every poll and completes on its tenth poll.
    struct PollCounter(u8);
    impl Future for PollCounter {
        type Output = ();
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let counter = self.get_mut();
            counter.0 += 1;
            // Request another poll before reporting the result.
            cx.waker().clone().wake();
            match counter.0 {
                0..=9 => Poll::Pending,
                _ => Poll::Ready(()),
            }
        }
    }

    /// Running an executor with no tasks must not panic.
    #[test] fn test_do_empty() {
        let mut executor = Executor::new();
        executor.do_some();
    }

    /// A task that stays pending for nine polls is observed as pending after each round.
    #[test] fn test_do() {
        let mut executor = Executor::new();
        let task = Task::without_notifications("test_do".to_string(), PollCounter(0), Configuration::default());

        let observer = executor.spawn_local(task);
        for _round in 0..9 {
            executor.do_some();
            assert_eq!(observer.observe(), Observation::Pending);
            executor.park_if_needed();
        }
    }
}
454