kompact/runtime/
scheduler.rs1use super::*;
2use executors::*;
3use futures::FutureExt;
4
5pub trait Scheduler: Send + Sync {
14 fn schedule(&self, c: Arc<dyn CoreContainer>) -> ();
19
20 fn shutdown_async(&self) -> ();
25
26 fn shutdown(&self) -> Result<(), SchedulerShutdownError>;
31
32 fn shutdown_notify(
38 &self,
39 ) -> futures::future::BoxFuture<'static, Result<(), SchedulerShutdownError>> {
40 let scheduler = self.box_clone();
41 async move { async_std::task::spawn_blocking(move || scheduler.shutdown()).await }.boxed()
42 }
43
44 fn box_clone(&self) -> Box<dyn Scheduler>;
51
52 fn poison(&self) -> ();
57
58 fn spawn(&self, future: futures::future::BoxFuture<'static, ()>) -> ();
60}
61
62impl Clone for Box<dyn Scheduler> {
63 fn clone(&self) -> Self {
64 (*self).box_clone()
65 }
66}
67
68#[derive(Clone)]
70pub struct ExecutorScheduler<E>
71where
72 E: FuturesExecutor + Sync,
73{
74 exec: E,
75}
76
77impl<E: FuturesExecutor + Sync + 'static> ExecutorScheduler<E> {
78 pub fn with(exec: E) -> ExecutorScheduler<E> {
80 ExecutorScheduler { exec }
81 }
82
83 pub fn from(exec: E) -> Box<dyn Scheduler> {
85 Box::new(ExecutorScheduler::with(exec))
86 }
87}
88
89impl<E: FuturesExecutor + Sync + 'static> Scheduler for ExecutorScheduler<E> {
90 fn schedule(&self, c: Arc<dyn CoreContainer>) -> () {
91 self.exec.execute(move || maybe_reschedule(c));
92 }
93
94 fn shutdown_async(&self) -> () {
95 self.exec.shutdown_async()
96 }
97
98 fn shutdown(&self) -> Result<(), SchedulerShutdownError> {
99 self.exec
100 .shutdown_borrowed()
101 .map_err(SchedulerShutdownError::from)
102 }
103
104 fn box_clone(&self) -> Box<dyn Scheduler> {
105 Box::new(self.clone())
106 }
107
108 fn poison(&self) -> () {
109 self.exec.shutdown_async();
110 }
111
112 fn spawn(&self, future: futures::future::BoxFuture<'static, ()>) -> () {
113 let handle = self.exec.spawn(future);
114 handle.detach();
115 }
116}
117
118fn maybe_reschedule(c: Arc<dyn CoreContainer>) {
119 match c.execute() {
120 SchedulingDecision::Schedule => {
121 if cfg!(feature = "use_local_executor") {
122 let res = try_execute_locally(move || maybe_reschedule(c));
123 assert!(
124 res.is_ok(),
125 "Only run with Executors that can support local execute or remove the avoid_executor_lookups feature!"
126 );
127 } else {
128 let c2 = c.clone();
129 c.system().schedule(c2);
130 }
131 }
132 SchedulingDecision::Resume => maybe_reschedule(c),
133 _ => (),
134 }
135}