1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
//! The Screeps Async runtime

use crate::error::RuntimeError;
use crate::job::JobHandle;
use crate::utils::{game_time, time_used};
use crate::CURRENT;
use async_task::Runnable;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::Mutex;
use std::task::Waker;

/// Builder used to configure and install a [ScreepsRuntime]
///
/// Start from [`Builder::new`], optionally adjust settings with the chaining setters, and
/// finish with [`Builder::apply`]. Nothing is installed until `apply` is called.
#[must_use = "a Builder does nothing until `apply` is called"]
pub struct Builder {
    /// Settings accumulated so far; handed to the runtime by [`Builder::apply`]
    config: Config,
}

impl Builder {
    /// Construct a new [Builder] with default settings
    pub fn new() -> Self {
        Self {
            config: Config::default(),
        }
    }

    /// Set what fraction of the available CPU time the runtime should use per tick
    ///
    /// `dur` is a fraction rather than a number out of 100: the default of `0.9` lets the
    /// runtime keep polling until 90% of the tick's CPU time has been used.
    ///
    /// The value is stored as-is; no range validation is performed.
    pub fn tick_time_allocation(mut self, dur: f64) -> Self {
        self.config.tick_time_allocation = dur;
        self
    }

    /// Build a [ScreepsRuntime] from the collected settings and install it in `CURRENT`
    ///
    /// Any runtime previously stored in `CURRENT` is replaced (and therefore dropped).
    pub fn apply(self) {
        CURRENT.with_borrow_mut(|runtime| {
            *runtime = Some(ScreepsRuntime::new(self.config));
        })
    }
}

impl Default for Builder {
    fn default() -> Self {
        Self::new()
    }
}

/// Tunable settings for the [ScreepsRuntime]
pub struct Config {
    /// Share of the per-tick CPU time the async runtime is allowed to use, expressed as a
    /// fraction (`0.9` means 90%)
    ///
    /// The scheduler refuses to poll further tasks once `time_used()` is greater than this
    /// value, i.e. it keeps polling while
    /// `[screeps::game::cpu::get_used] < tick_time_allocation * [screeps::game::cpu::tick_limit]`
    tick_time_allocation: f64,
}

impl Default for Config {
    /// Defaults to using at most 90% of the tick's CPU time
    fn default() -> Self {
        const DEFAULT_TICK_TIME_ALLOCATION: f64 = 0.9;

        Config {
            tick_time_allocation: DEFAULT_TICK_TIME_ALLOCATION,
        }
    }
}

/// A very basic futures executor based on a channel. When tasks are woken, they
/// are scheduled by queuing them in the send half of the channel. The executor
/// drains the receive half (without blocking) and executes the received tasks.
///
/// Every spawned task captures its own clone of the send half in its schedule
/// callback, so waking a task pushes its [`Runnable`] back onto the queue.
pub struct ScreepsRuntime {
    /// Receives scheduled tasks. When a task is scheduled, the associated future
    /// is ready to make progress. This usually happens when a resource the task
    /// uses becomes ready to perform an operation.
    scheduled: flume::Receiver<Runnable>,

    /// Send half of the scheduled channel. Cloned into each task's schedule callback
    /// when the task is spawned.
    sender: flume::Sender<Runnable>,

    /// Stores [`Waker`]s used to wake tasks that are waiting for a specific game tick,
    /// keyed by the tick at which they become due
    // TODO should this really be pub(crate)?
    // NOTE(review): the `Mutex` sits inside an `Rc`, and this file only ever acquires it
    // with `try_lock` — it looks like it is used purely for interior mutability; confirm
    // whether a `RefCell` would express the intent better.
    pub(crate) timers: Rc<Mutex<TimerMap>>,

    /// Config for the runtime
    config: Config,

    /// Mutex used to ensure you don't block_on multiple futures simultaneously.
    /// It guards no data; only whether it is currently held matters.
    is_blocking: Mutex<()>,
}

impl ScreepsRuntime {
    /// Create a runtime with an empty task queue and no pending timers.
    ///
    /// Only one ScreepsRuntime is meant to exist at a time.
    // NOTE(review): this constructor performs no check for an already existing runtime;
    // confirm whether the single-instance rule is enforced elsewhere.
    pub(crate) fn new(config: Config) -> Self {
        // `flume::unbounded` returns the channel as (send half, receive half)
        let (sender, scheduled) = flume::unbounded();

        Self {
            scheduled,
            sender,
            timers: Rc::new(Mutex::new(BTreeMap::new())),
            config,
            is_blocking: Mutex::new(()),
        }
    }

    /// Spawn a new async task and queue it so it is polled next time the scheduler runs
    ///
    /// The future's output is written into a slot shared with the returned [JobHandle].
    ///
    /// # Panics
    ///
    /// The task's schedule callback panics if the task cannot be pushed onto the
    /// runtime's channel.
    pub fn spawn<F>(&self, future: F) -> JobHandle<F::Output>
    where
        F: Future + 'static,
    {
        // Slot shared between the task (writer) and the handle (reader)
        let output_slot = Rc::new(RefCell::new(None));
        let task_slot = Rc::clone(&output_slot);

        // Wrap the user's future so that its output lands in the shared slot
        let wrapped = async move {
            let output = future.await;
            *task_slot.borrow_mut() = Some(output);
        };

        // Every wake-up re-queues the task through this clone of the send half
        let queue = self.sender.clone();
        let (runnable, task) = async_task::spawn_local(wrapped, move |woken| {
            queue.send(woken).unwrap();
        });

        // Queue the task right away so that it gets its first poll
        runnable.schedule();

        JobHandle::new(output_slot, task)
    }

    /// The main entrypoint for the async runtime. Runs a future to completion.
    ///
    /// The future is spawned as a task and scheduled tasks are polled until it completes.
    ///
    /// # Errors
    ///
    /// Returns [RuntimeError::DeadlockDetected] if the future has not completed and there
    /// are no scheduled tasks left to poll. Any error from polling (such as running out of
    /// the tick's allocated time) is returned as-is.
    ///
    /// # Panics
    ///
    /// Panics if another future is already being blocked on. You should `.await` the second
    /// future instead.
    pub fn block_on<F>(&self, future: F) -> Result<F::Output, RuntimeError>
    where
        F: Future + 'static,
    {
        // Held until this function returns; a nested `block_on` fails to take it and panics
        let _blocking = self
            .is_blocking
            .try_lock()
            .expect("Cannot block_on multiple futures at once. Please .await on the inner future");
        let handle = self.spawn(future);

        loop {
            if handle.is_complete() {
                return Ok(handle.fut_res.take().unwrap());
            }

            let polled_task = self.try_poll_scheduled()?;
            if !polled_task {
                // Nothing left to poll, yet our future is still unfinished
                return Err(RuntimeError::DeadlockDetected);
            }
        }
    }

    /// Run the executor for one game tick
    ///
    /// Wakes the timers that are due, then polls scheduled tasks until none are left.
    ///
    /// This should generally be the last thing you call in your loop: with the default
    /// configuration the runtime keeps polling until 90% of this tick's CPU time is used up.
    /// With enough scheduled work this function therefore runs for AT LEAST 90% of the tick
    /// time (90% + however long the last Future takes to poll).
    ///
    /// # Errors
    ///
    /// Returns the error from polling if one occurs, e.g. when the allocated time runs out.
    pub fn run(&self) -> Result<(), RuntimeError> {
        // Waking timers once per tick is enough.
        // NOTE(review): the original author's note says delay_ticks(0) executes
        // synchronously, so no timer can become due mid-tick — confirm in the timer code.
        self.wake_timers();

        // Keep polling until the queue is empty; `?` bails out on the first error
        loop {
            let polled_task = self.try_poll_scheduled()?;
            if !polled_task {
                return Ok(());
            }
        }
    }

    /// Attempts to poll the next scheduled task, ensuring that there is time left in the tick
    ///
    /// The time check happens first, so no task is taken off the queue once the allocation
    /// has been exceeded.
    ///
    /// Returns `Ok(true)` if a task was taken from the queue and run
    /// Returns `Ok(false)` if there are no tasks ready to poll
    /// Returns `Err` if we have run out of allocated time this tick
    pub(crate) fn try_poll_scheduled(&self) -> Result<bool, RuntimeError> {
        let allocation = self.config.tick_time_allocation;
        if time_used() > allocation {
            return Err(RuntimeError::OutOfTime);
        }

        // Non-blocking receive: any failure to receive is reported as "nothing to poll"
        match self.scheduled.try_recv() {
            Ok(task) => {
                task.run();
                Ok(true)
            }
            Err(_) => Ok(false),
        }
    }

    /// Wake every task whose timer is due at or before the current game tick.
    ///
    /// Due entries are removed from `timers`; entries keyed after the current tick stay
    /// registered. Empty (`None`) slots are skipped.
    ///
    /// # Panics
    ///
    /// Panics if the `timers` mutex cannot be acquired without blocking.
    fn wake_timers(&self) {
        let now = game_time();

        let mut timers = self.timers.try_lock().unwrap();
        let due = match now.checked_add(1) {
            Some(next_tick) => {
                // Everything keyed at `next_tick` or later is not due yet. `split_off`
                // leaves the due entries behind, so swap the halves: the map keeps the
                // upcoming timers and we walk away with the due ones.
                let upcoming = timers.split_off(&next_tick);
                std::mem::replace(&mut *timers, upcoming)
            }
            // `now` is the largest representable tick, so no key can lie in the future:
            // every registered timer is due. (Avoids overflowing `now + 1`.)
            None => std::mem::take(&mut *timers),
        };
        // Release the lock before touching any waker. Waking or dropping a waker runs
        // code we do not control; if that code reaches back into `timers` while the guard
        // is still alive, its `try_lock` would fail.
        drop(timers);

        due.into_values().flatten().flatten().for_each(Waker::wake);
    }
}

/// Timer registry: maps a game tick to the wakers registered for that tick.
///
/// `ScreepsRuntime::wake_timers` removes and wakes every entry whose key is at or before
/// the current game tick, skipping `None` slots.
// NOTE(review): `None` slots presumably mark timers that were cancelled or already
// consumed — confirm against the code that inserts into this map.
type TimerMap = BTreeMap<u32, Vec<Option<Waker>>>;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::RuntimeError::OutOfTime;
    use crate::tests::*;
    use crate::time::yield_now;
    use crate::{spawn, with_runtime};
    use std::cell::OnceCell;

    /// Spawns a detached task that fills the returned cell the first time it is polled.
    fn spawn_marker_task() -> Rc<OnceCell<()>> {
        let marker = Rc::new(OnceCell::new());
        let task_marker = Rc::clone(&marker);

        spawn(async move {
            task_marker.set(()).unwrap();
        })
        .detach();

        marker
    }

    /// `block_on` drives a future that yields once and hands back its output.
    #[test]
    fn test_block_on() {
        init_test();

        let outcome = crate::block_on(async move {
            yield_now().await;
            1 + 2
        });

        assert_eq!(3, outcome.unwrap());
    }

    /// Spawning puts the task on the runtime's queue.
    #[test]
    fn test_spawn() {
        init_test();

        let handle = spawn(async move {});
        drop(handle);

        with_runtime(|runtime| {
            let queued = runtime.scheduled.try_recv();
            queued.expect("Failed to schedule task");
        })
    }

    /// A spawned task is only polled once `run` is called.
    #[test]
    fn test_run() {
        init_test();

        let marker = spawn_marker_task();

        // Spawning alone must not poll the task
        assert!(marker.get().is_none());

        crate::run().unwrap();

        // `run` polled the task
        assert!(marker.get().is_some());
    }

    /// A task can spawn another task and await its result within a single `run`.
    #[test]
    fn test_nested_spawn() {
        init_test();

        let outer_done = Rc::new(OnceCell::new());
        let task_flag = Rc::clone(&outer_done);
        spawn(async move {
            let sum = spawn(async move { 1 + 2 }).await;

            assert_eq!(3, sum);

            task_flag.set(()).unwrap();
        })
        .detach();

        // Spawning alone must not poll the task
        assert!(outer_done.get().is_none());

        crate::run().unwrap();

        // The outer task ran to completion
        assert!(outer_done.get().is_some());
    }

    /// `run` refuses to poll anything once the time allocation has been exceeded.
    #[test]
    fn test_respects_time_remaining() {
        init_test();

        let marker = spawn_marker_task();

        TIME_USED.with_borrow_mut(|used| *used = 0.95);

        // Spawning alone must not poll the task
        assert!(marker.get().is_none());

        assert_eq!(Err(OutOfTime), crate::run());

        // The task was still not polled
        assert!(marker.get().is_none());
    }
}