screeps_async/
runtime.rs

1//! The Screeps Async runtime
2
3use crate::error::RuntimeError;
4use crate::job::JobHandle;
5use crate::utils::{game_time, time_used};
6use crate::CURRENT;
7use async_task::Runnable;
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::future::Future;
11use std::rc::Rc;
12use std::sync::Mutex;
13use std::task::Waker;
14
15/// Builder to construct a [ScreepsRuntime]
16pub struct Builder {
17    config: Config,
18}
19
20impl Builder {
21    /// Construct a new [Builder] with default settings
22    pub fn new() -> Self {
23        Self {
24            config: Config::default(),
25        }
26    }
27
28    /// Set what percentage of available CPU time the runtime should use per tick
29    pub fn tick_time_allocation(mut self, dur: f64) -> Self {
30        self.config.tick_time_allocation = dur;
31        self
32    }
33
34    /// Build a [ScreepsRuntime]
35    pub fn apply(self) {
36        CURRENT.with_borrow_mut(|runtime| {
37            *runtime = Some(ScreepsRuntime::new(self.config));
38        })
39    }
40}
41
42impl Default for Builder {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48/// Configuration options for the [ScreepsRuntime]
49pub struct Config {
50    /// Percentage of per-tick CPU time allowed to be used by the async runtime
51    ///
52    /// Specifically, the runtime will continue polling new futures as long as
53    /// `[screeps::game::cpu::get_used] < tick_time_allocation * [screeps::game::cpu::tick_limit]`
54    tick_time_allocation: f64,
55}
56
57impl Default for Config {
58    fn default() -> Self {
59        Self {
60            tick_time_allocation: 0.9,
61        }
62    }
63}
64
65/// A very basic futures executor based on a channel. When tasks are woken, they
66/// are scheduled by queuing them in the send half of the channel. The executor
67/// waits on the receive half and executes received tasks.
68///
69/// When a task is executed, the send half of the channel is passed along via
70/// the task's Waker.
71pub struct ScreepsRuntime {
72    /// Receives scheduled tasks. When a task is scheduled, the associated future
73    /// is ready to make progress. This usually happens when a resource the task
74    /// uses becomes ready to perform an operation.
75    scheduled: flume::Receiver<Runnable>,
76
77    /// Send half of the scheduled channel.
78    sender: flume::Sender<Runnable>,
79
80    /// Stores [`Waker`]s used to wake tasks that are waiting for a specific game tick
81    // TODO should this really be pub(crate)?
82    pub(crate) timers: Rc<Mutex<TimerMap>>,
83
84    /// Config for the runtime
85    config: Config,
86
87    /// Mutex used to ensure you don't block_on multiple futures simultaneously
88    is_blocking: Mutex<()>,
89}
90
91impl ScreepsRuntime {
92    /// Initialize a new runtime instance.
93    ///
94    /// Only one ScreepsRuntime may exist. Attempting to create a second one before the first is
95    /// dropped with panic
96    pub(crate) fn new(config: Config) -> Self {
97        let (sender, scheduled) = flume::unbounded();
98
99        let timers = Rc::new(Mutex::new(BTreeMap::new()));
100
101        Self {
102            scheduled,
103            sender,
104            timers,
105            config,
106            is_blocking: Mutex::new(()),
107        }
108    }
109
110    /// Spawn a new async task that will be polled next time the scheduler runs
111    pub fn spawn<F>(&self, future: F) -> JobHandle<F::Output>
112    where
113        F: Future + 'static,
114    {
115        let fut_res = Rc::new(RefCell::new(None));
116
117        let future = {
118            let fut_res = fut_res.clone();
119            async move {
120                let res = future.await;
121                let mut fut_res = fut_res.borrow_mut();
122                *fut_res = Some(res);
123            }
124        };
125
126        let sender = self.sender.clone();
127        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
128            // Don't try to send if disconnected, this only happens when runtime is being dropped
129            if !sender.is_disconnected() {
130                sender.send(runnable).unwrap();
131            }
132        });
133
134        runnable.schedule();
135
136        JobHandle::new(fut_res, task)
137    }
138
139    /// The main entrypoint for the async runtime. Runs a future to completion.
140    ///
141    /// Returns [RuntimeError::DeadlockDetected] if blocking [Future] doesn't complete this tick
142    ///
143    /// # Panics
144    ///
145    /// Panics if another future is already being blocked on. You should `.await` the second future
146    /// instead.
147    pub fn block_on<F>(&self, future: F) -> Result<F::Output, RuntimeError>
148    where
149        F: Future + 'static,
150    {
151        let _guard = self
152            .is_blocking
153            .try_lock()
154            .expect("Cannot block_on multiple futures at once. Please .await on the inner future");
155        let handle = self.spawn(future);
156
157        while !handle.is_complete() {
158            if !self.try_poll_scheduled()? {
159                return Err(RuntimeError::DeadlockDetected);
160            }
161        }
162
163        Ok(handle.fut_res.take().unwrap())
164    }
165
166    /// Run the executor for one game tick
167    ///
168    /// This should generally be the last thing you call in your loop as by default the runtime
169    /// will keep polling for work until 90% of this tick's CPU time has been exhausted.
170    /// Thus, with enough scheduled work, this function will run for AT LEAST 90% of the tick time
171    /// (90% + however long the last Future takes to poll)
172    pub fn run(&self) -> Result<(), RuntimeError> {
173        // Only need to call this once per tick since delay_ticks(0) will execute synchronously
174        self.wake_timers();
175
176        // Poll tasks until there are no more, or we get an error
177        while self.try_poll_scheduled()? {}
178
179        Ok(())
180    }
181
182    /// Attempts to poll the next scheduled task, ensuring that there is time left in the tick
183    ///
184    /// Returns [Ok(true)] if a task was successfully polled
185    /// Returns [Ok(false)] if there are no tasks ready to poll
186    /// Returns [Err] if we have run out of allocated time this tick
187    pub(crate) fn try_poll_scheduled(&self) -> Result<bool, RuntimeError> {
188        if time_used() > self.config.tick_time_allocation {
189            return Err(RuntimeError::OutOfTime);
190        }
191
192        if let Ok(runnable) = self.scheduled.try_recv() {
193            runnable.run();
194            Ok(true)
195        } else {
196            Ok(false)
197        }
198    }
199
200    fn wake_timers(&self) {
201        let game_time = game_time();
202        let mut timers = self.timers.try_lock().unwrap();
203
204        let to_fire = {
205            // Grab timers that are still in the future. `timers` is now all timers that need firing
206            let mut pending = timers.split_off(&(game_time + 1));
207            // Switcheroo pending/timers so that `timers` is all timers that are scheduled in the future
208            std::mem::swap(&mut pending, &mut timers);
209            pending
210        };
211
212        to_fire
213            .into_values()
214            .flatten()
215            .flatten()
216            .for_each(Waker::wake);
217    }
218}
219
220type TimerMap = BTreeMap<u32, Vec<Option<Waker>>>;
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225    use crate::error::RuntimeError::OutOfTime;
226    use crate::tests::*;
227    use crate::time::yield_now;
228    use crate::{spawn, with_runtime};
229    use std::cell::OnceCell;
230
231    #[test]
232    fn test_block_on() {
233        init_test();
234
235        let res = crate::block_on(async move {
236            yield_now().await;
237            1 + 2
238        })
239        .unwrap();
240
241        assert_eq!(3, res);
242    }
243
244    #[test]
245    fn test_spawn() {
246        init_test();
247
248        drop(spawn(async move {}));
249
250        with_runtime(|runtime| {
251            runtime
252                .scheduled
253                .try_recv()
254                .expect("Failed to schedule task");
255        })
256    }
257
258    #[test]
259    fn test_run() {
260        init_test();
261
262        let has_run = Rc::new(OnceCell::new());
263        {
264            let has_run = has_run.clone();
265            spawn(async move {
266                has_run.set(()).unwrap();
267            })
268            .detach();
269        }
270
271        // task hasn't run yet
272        assert!(has_run.get().is_none());
273
274        crate::run().unwrap();
275
276        // Future has been run
277        assert!(has_run.get().is_some());
278    }
279
280    #[test]
281    fn test_nested_spawn() {
282        init_test();
283
284        let has_run = Rc::new(OnceCell::new());
285        {
286            let has_run = has_run.clone();
287            spawn(async move {
288                let result = spawn(async move { 1 + 2 }).await;
289
290                assert_eq!(3, result);
291
292                has_run.set(()).unwrap();
293            })
294            .detach();
295        }
296
297        // task hasn't run yet
298        assert!(has_run.get().is_none());
299
300        crate::run().unwrap();
301
302        // Future has been run
303        assert!(has_run.get().is_some());
304    }
305
306    #[test]
307    fn test_respects_time_remaining() {
308        init_test();
309
310        let has_run = Rc::new(OnceCell::new());
311        {
312            let has_run = has_run.clone();
313            spawn(async move {
314                has_run.set(()).unwrap();
315            })
316            .detach();
317        }
318
319        TIME_USED.with_borrow_mut(|t| *t = 0.95);
320
321        // task hasn't run yet
322        assert!(has_run.get().is_none());
323
324        assert_eq!(Err(OutOfTime), crate::run());
325
326        // Check future still hasn't run
327        assert!(has_run.get().is_none());
328    }
329}