Skip to main content

kompact/runtime/
scheduler.rs

1use super::*;
2use executors::*;
3use futures::FutureExt;
4
5/// API for a Kompact scheduler
6///
7/// Any scheduler implementation must implement this trait
8/// so it can be used with Kompact.
9///
10/// Usually that means implementing some kind of wrapper
11/// type for your particular scheduler, such as
12/// [ExecutorScheduler](runtime::ExecutorScheduler), for example.
13pub trait Scheduler: Send + Sync {
14    /// Schedule `c` to be run on this scheduler
15    ///
16    /// Implementations must call [`c.execute()`](CoreContainer::execute)
17    /// on the target thread.
18    fn schedule(&self, c: Arc<dyn CoreContainer>) -> ();
19
20    /// Shut this pool down asynchronously
21    ///
22    /// Implementations must eventually result in a correct
23    /// shutdown, even when called from within one of its own threads.
24    fn shutdown_async(&self) -> ();
25
26    /// Shut this pool down synchronously
27    ///
28    /// Implementations must only return when the pool
29    /// has been shut down, or upon an error.
30    fn shutdown(&self) -> Result<(), SchedulerShutdownError>;
31
32    /// Shut this pool down and complete once the pool has stopped.
33    ///
34    /// The default implementation delegates the existing blocking shutdown to a
35    /// blocking task. Scheduler implementations that can signal completion
36    /// without blocking should override this method.
37    fn shutdown_notify(
38        &self,
39    ) -> futures::future::BoxFuture<'static, Result<(), SchedulerShutdownError>> {
40        let scheduler = self.box_clone();
41        async move { async_std::task::spawn_blocking(move || scheduler.shutdown()).await }.boxed()
42    }
43
44    /// Clone an instance of this boxed
45    ///
46    /// Simply implement as `Box::new(self.clone())`.
47    ///
48    /// This is just a workaround for issues with boxed objects
49    /// and [Clone](std::clone::Clone) implementations.
50    fn box_clone(&self) -> Box<dyn Scheduler>;
51
52    /// Handle the system being poisoned
53    ///
54    /// Usually this should just cause the scheduler to be
55    /// shut down in an appropriate manner.
56    fn poison(&self) -> ();
57
58    /// Run a Future on this pool
59    fn spawn(&self, future: futures::future::BoxFuture<'static, ()>) -> ();
60}
61
62impl Clone for Box<dyn Scheduler> {
63    fn clone(&self) -> Self {
64        (*self).box_clone()
65    }
66}
67
68/// A wrapper for schedulers from the [executors](executors) crate
69#[derive(Clone)]
70pub struct ExecutorScheduler<E>
71where
72    E: FuturesExecutor + Sync,
73{
74    exec: E,
75}
76
77impl<E: FuturesExecutor + Sync + 'static> ExecutorScheduler<E> {
78    /// Produce a new `ExecutorScheduler` from an [Executor](executors::Executor) `E`.
79    pub fn with(exec: E) -> ExecutorScheduler<E> {
80        ExecutorScheduler { exec }
81    }
82
83    /// Produce a new boxed [Scheduler](runtime::Scheduler) from an [Executor](executors::Executor) `E`.
84    pub fn from(exec: E) -> Box<dyn Scheduler> {
85        Box::new(ExecutorScheduler::with(exec))
86    }
87}
88
89impl<E: FuturesExecutor + Sync + 'static> Scheduler for ExecutorScheduler<E> {
90    fn schedule(&self, c: Arc<dyn CoreContainer>) -> () {
91        self.exec.execute(move || maybe_reschedule(c));
92    }
93
94    fn shutdown_async(&self) -> () {
95        self.exec.shutdown_async()
96    }
97
98    fn shutdown(&self) -> Result<(), SchedulerShutdownError> {
99        self.exec
100            .shutdown_borrowed()
101            .map_err(SchedulerShutdownError::from)
102    }
103
104    fn box_clone(&self) -> Box<dyn Scheduler> {
105        Box::new(self.clone())
106    }
107
108    fn poison(&self) -> () {
109        self.exec.shutdown_async();
110    }
111
112    fn spawn(&self, future: futures::future::BoxFuture<'static, ()>) -> () {
113        let handle = self.exec.spawn(future);
114        handle.detach();
115    }
116}
117
118fn maybe_reschedule(c: Arc<dyn CoreContainer>) {
119    match c.execute() {
120        SchedulingDecision::Schedule => {
121            if cfg!(feature = "use_local_executor") {
122                let res = try_execute_locally(move || maybe_reschedule(c));
123                assert!(
124                    res.is_ok(),
125                    "Only run with Executors that can support local execute or remove the avoid_executor_lookups feature!"
126                );
127            } else {
128                let c2 = c.clone();
129                c.system().schedule(c2);
130            }
131        }
132        SchedulingDecision::Resume => maybe_reschedule(c),
133        _ => (),
134    }
135}