piccolo/thread/
executor.rs

1use std::hash::{Hash, Hasher};
2
3use allocator_api2::vec;
4use gc_arena::{allocator_api::MetricsAlloc, lock::RefLock, Collect, Gc, Mutation};
5use thiserror::Error;
6
7use crate::{
8    compiler::{FunctionRef, LineNumber},
9    BadThreadMode, CallbackReturn, Context, Error, FromMultiValue, Fuel, Function,
10    FunctionPrototype, IntoMultiValue, SequencePoll, Stack, String, Thread, ThreadMode, Value,
11    Variadic,
12};
13
14use super::{
15    thread::{Frame, LuaFrame, ThreadState},
16    vm::run_vm,
17};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum ExecutorMode {
21    /// There are no threads being run and the `Executor` must be restarted to do any work.
22    Stopped,
23    /// Lua has errored or returned (or yielded) values that must be taken to move the `Executor` to
24    /// the `Stopped` (or `Suspended`) state.
25    Result,
26    /// There is an active thread in the `ThreadMode::Normal` state and it is can be run with
27    /// `Executor::step`.
28    Normal,
29    /// The main thread has yielded and is waiting on being resumed.
30    Suspended,
31    /// The `Executor` is currently inside its own `Executor::step` function.
32    Running,
33}
34
35#[derive(Debug, Copy, Clone, Error)]
36#[error("bad executor mode: {found:?}, expected {expected:?}")]
37pub struct BadExecutorMode {
38    pub found: ExecutorMode,
39    pub expected: ExecutorMode,
40}
41
42#[derive(Debug, Collect)]
43#[collect(no_drop)]
44pub struct ExecutorState<'gc> {
45    thread_stack: vec::Vec<Thread<'gc>, MetricsAlloc<'gc>>,
46}
47
48pub type ExecutorInner<'gc> = RefLock<ExecutorState<'gc>>;
49
50/// The entry-point for the Lua VM.
51///
52/// `Executor` runs networks of `Threads` that may depend on each other and may pass control
53/// back and forth. All Lua code that is run is done so directly or indirectly by calling
54/// `Executor::step`.
55///
56/// # Panics
57///
58/// `Executor` is dangerous to use from within any kind of Lua callback. It has no protection
59/// against re-entrency, and calling `Executor` methods from within a callback that it is running
60/// (other than `Executor::mode`) will panic. Additionally, even if an independent `Executor` is
61/// used, cross-thread upvalues may cause a panic if one `Executor` is used within the other.
62///
63/// `Executor`s are not meant to be used from callbacks at all, and `Executor`s should not be
64/// nested. Instead, use the normal mechanisms for callbacks to call Lua code so that it is run on
65/// the same executor calling the callback.
66#[derive(Debug, Copy, Clone, Collect)]
67#[collect(no_drop)]
68pub struct Executor<'gc>(Gc<'gc, ExecutorInner<'gc>>);
69
70impl<'gc> PartialEq for Executor<'gc> {
71    fn eq(&self, other: &Executor<'gc>) -> bool {
72        Gc::ptr_eq(self.0, other.0)
73    }
74}
75
76impl<'gc> Eq for Executor<'gc> {}
77
78impl<'gc> Hash for Executor<'gc> {
79    fn hash<H: Hasher>(&self, state: &mut H) {
80        Gc::as_ptr(self.0).hash(state)
81    }
82}
83
84impl<'gc> Executor<'gc> {
85    const FUEL_PER_CALLBACK: i32 = 8;
86    const FUEL_PER_SEQ_STEP: i32 = 4;
87    const FUEL_PER_STEP: i32 = 4;
88
89    /// Creates a new `Executor` with a stopped main thread.
90    pub fn new(ctx: Context<'gc>) -> Self {
91        Self::run(&ctx, Thread::new(ctx))
92    }
93
94    /// Creates a new `Executor` that begins running the given thread.
95    pub fn run(mc: &Mutation<'gc>, thread: Thread<'gc>) -> Self {
96        let mut thread_stack = vec::Vec::new_in(MetricsAlloc::new(mc));
97        thread_stack.push(thread);
98        Executor(Gc::new(mc, RefLock::new(ExecutorState { thread_stack })))
99    }
100
101    pub fn from_inner(inner: Gc<'gc, ExecutorInner<'gc>>) -> Self {
102        Self(inner)
103    }
104
105    pub fn into_inner(self) -> Gc<'gc, ExecutorInner<'gc>> {
106        self.0
107    }
108
109    /// Creates a new `Executor` with a new `Thread` running the given function.
110    pub fn start(
111        ctx: Context<'gc>,
112        function: Function<'gc>,
113        args: impl IntoMultiValue<'gc>,
114    ) -> Self {
115        let thread = Thread::new(ctx);
116        thread.start(ctx, function, args).unwrap();
117        Self::run(&ctx, thread)
118    }
119
120    pub fn mode(self) -> ExecutorMode {
121        if let Ok(state) = self.0.try_borrow() {
122            if state.thread_stack.len() > 1 {
123                ExecutorMode::Normal
124            } else {
125                match state.thread_stack[0].mode() {
126                    ThreadMode::Stopped => ExecutorMode::Stopped,
127                    ThreadMode::Result => ExecutorMode::Result,
128                    ThreadMode::Normal => ExecutorMode::Normal,
129                    ThreadMode::Suspended => ExecutorMode::Suspended,
130                    ThreadMode::Waiting => unreachable!(),
131                    ThreadMode::Running => ExecutorMode::Running,
132                }
133            }
134        } else {
135            ExecutorMode::Running
136        }
137    }
138
139    /// Runs the VM for a period of time controlled by the `fuel` parameter.
140    ///
141    /// The VM and callbacks will consume fuel as they run, and `Executor::step` will return as soon
142    /// as `Fuel::can_continue()` returns false *and some minimal positive progress has been made*.
143    ///
144    /// Returns `false` if the method has exhausted its fuel, but there is more work to
145    /// do, and returns `true` if no more progress can be made. If `true` is returned, then
146    /// `Executor::mode()` will no longer be `ExecutorMode::Normal`.
147    pub fn step(self, ctx: Context<'gc>, fuel: &mut Fuel) -> bool {
148        let mut state = self.0.borrow_mut(&ctx);
149
150        loop {
151            let mut top_thread = state.thread_stack.last().copied().unwrap();
152            let mut res_thread = None;
153            match top_thread.mode() {
154                ThreadMode::Normal => {}
155                ThreadMode::Running => {
156                    panic!("`Executor` thread already running")
157                }
158                _ => {
159                    if state.thread_stack.len() == 1 {
160                        break true;
161                    } else {
162                        state.thread_stack.pop();
163                        res_thread = Some(top_thread);
164                        top_thread = state.thread_stack.last().copied().unwrap();
165                    }
166                }
167            }
168
169            let mut top_state = top_thread.into_inner().borrow_mut(&ctx);
170            let top_state = &mut *top_state;
171            if let Some(res_thread) = res_thread {
172                let mode = top_state.mode();
173                if mode == ThreadMode::Waiting {
174                    assert!(matches!(top_state.frames.pop(), Some(Frame::WaitThread)));
175                    match res_thread.mode() {
176                        ThreadMode::Result => {
177                            // Take the results from the res_thread and return them to our top
178                            // thread.
179                            let mut res_state = res_thread.into_inner().borrow_mut(&ctx);
180                            match res_state.take_result() {
181                                Ok(vals) => {
182                                    let bottom = top_state.stack.len();
183                                    top_state.stack.extend(vals);
184                                    top_state.return_to(bottom);
185                                }
186                                Err(err) => {
187                                    top_state.frames.push(Frame::Error(err.into()));
188                                }
189                            }
190                            drop(res_state);
191                        }
192                        ThreadMode::Normal => unreachable!(),
193                        res_mode => top_state.frames.push(Frame::Error(
194                            BadThreadMode {
195                                found: res_mode,
196                                expected: None,
197                            }
198                            .into(),
199                        )),
200                    }
201                } else {
202                    // Shenanigans have happened and the upper thread has had its state externally
203                    // changed.
204                    top_state.frames.push(Frame::Error(
205                        BadThreadMode {
206                            found: mode,
207                            expected: None,
208                        }
209                        .into(),
210                    ));
211                }
212            }
213
214            if top_state.mode() == ThreadMode::Normal {
215                fn callback_ret<'gc>(
216                    ctx: Context<'gc>,
217                    thread_stack: &mut vec::Vec<Thread<'gc>, MetricsAlloc<'gc>>,
218                    top_state: &mut ThreadState<'gc>,
219                    stack_bottom: usize,
220                    ret: CallbackReturn<'gc>,
221                ) {
222                    match ret {
223                        CallbackReturn::Return => {
224                            top_state.return_to(stack_bottom);
225                        }
226                        CallbackReturn::Sequence(sequence) => {
227                            top_state.frames.push(Frame::Sequence {
228                                bottom: stack_bottom,
229                                sequence,
230                                pending_error: None,
231                            });
232                        }
233                        CallbackReturn::Yield { to_thread, then } => {
234                            if let Some(sequence) = then {
235                                top_state.frames.push(Frame::Sequence {
236                                    bottom: stack_bottom,
237                                    sequence,
238                                    pending_error: None,
239                                });
240                            }
241                            top_state.frames.push(Frame::Yielded);
242
243                            if let Some(to_thread) = to_thread {
244                                if let Err(err) = to_thread
245                                    .resume(ctx, Variadic(top_state.stack.drain(stack_bottom..)))
246                                {
247                                    top_state.frames.push(Frame::Error(err.into()));
248                                } else {
249                                    thread_stack.pop();
250                                    thread_stack.push(to_thread);
251                                }
252                            } else {
253                                top_state.frames.push(Frame::Result {
254                                    bottom: stack_bottom,
255                                });
256                            }
257                        }
258                        CallbackReturn::Call { function, then } => {
259                            if let Some(sequence) = then {
260                                top_state.frames.push(Frame::Sequence {
261                                    bottom: stack_bottom,
262                                    sequence,
263                                    pending_error: None,
264                                });
265                            }
266                            top_state.push_call(stack_bottom, function);
267                        }
268                        CallbackReturn::Resume { thread, then } => {
269                            if let Some(sequence) = then {
270                                top_state.frames.push(Frame::Sequence {
271                                    bottom: stack_bottom,
272                                    sequence,
273                                    pending_error: None,
274                                });
275                            }
276                            top_state.frames.push(Frame::WaitThread);
277
278                            if let Err(err) =
279                                thread.resume(ctx, Variadic(top_state.stack.drain(stack_bottom..)))
280                            {
281                                top_state.frames.push(Frame::Error(err.into()));
282                            } else {
283                                if top_state.frames.len() == 1 {
284                                    // Tail call the thread resume if we can.
285                                    assert!(matches!(top_state.frames[0], Frame::WaitThread));
286                                    thread_stack.pop();
287                                }
288                                thread_stack.push(thread);
289                            }
290                        }
291                    }
292                }
293
294                fn execution<'gc, 'a>(
295                    executor: Executor<'gc>,
296                    fuel: &'a mut Fuel,
297                    threads: &'a [Thread<'gc>],
298                    top_frames: &'a [Frame<'gc>],
299                    top_stack: &[Value<'gc>],
300                ) -> Execution<'gc, 'a> {
301                    let upper_lua = match top_frames.last() {
302                        Some(Frame::Lua { bottom, pc, .. }) => {
303                            let Value::Function(Function::Closure(closure)) = top_stack[*bottom]
304                            else {
305                                panic!("lua frame bottom is not a closure");
306                            };
307                            // Subtract 1 instruction for the Call opcode.
308                            Some((closure.prototype(), *pc - 1))
309                        }
310                        _ => None,
311                    };
312
313                    Execution {
314                        executor,
315                        fuel,
316                        upper_lua,
317                        threads,
318                    }
319                }
320
321                match top_state.frames.pop() {
322                    Some(Frame::Callback { bottom, callback }) => {
323                        fuel.consume(Self::FUEL_PER_CALLBACK);
324                        let exec = execution(
325                            self,
326                            fuel,
327                            &state.thread_stack,
328                            &top_state.frames,
329                            &top_state.stack,
330                        );
331                        match callback.call(ctx, exec, Stack::new(&mut top_state.stack, bottom)) {
332                            Ok(ret) => {
333                                callback_ret(ctx, &mut state.thread_stack, top_state, bottom, ret)
334                            }
335                            Err(err) => {
336                                top_state.stack.truncate(bottom);
337                                top_state.frames.push(Frame::Error(err))
338                            }
339                        }
340                    }
341                    Some(Frame::Sequence {
342                        bottom,
343                        mut sequence,
344                        pending_error,
345                    }) => {
346                        fuel.consume(Self::FUEL_PER_SEQ_STEP);
347
348                        let exec = execution(
349                            self,
350                            fuel,
351                            &state.thread_stack,
352                            &top_state.frames,
353                            &top_state.stack,
354                        );
355                        let fin = if let Some(err) = pending_error {
356                            sequence.error(ctx, exec, err, Stack::new(&mut top_state.stack, bottom))
357                        } else {
358                            sequence.poll(ctx, exec, Stack::new(&mut top_state.stack, bottom))
359                        };
360
361                        match fin {
362                            Ok(ret) => callback_ret(
363                                ctx,
364                                &mut state.thread_stack,
365                                top_state,
366                                bottom,
367                                match ret {
368                                    SequencePoll::Pending => CallbackReturn::Sequence(sequence),
369                                    SequencePoll::Return => CallbackReturn::Return,
370                                    SequencePoll::Yield { to_thread, is_tail } => {
371                                        CallbackReturn::Yield {
372                                            to_thread,
373                                            then: if is_tail { None } else { Some(sequence) },
374                                        }
375                                    }
376                                    SequencePoll::Call { function, is_tail } => {
377                                        CallbackReturn::Call {
378                                            function,
379                                            then: if is_tail { None } else { Some(sequence) },
380                                        }
381                                    }
382                                    SequencePoll::Resume { thread, is_tail } => {
383                                        CallbackReturn::Resume {
384                                            thread,
385                                            then: if is_tail { None } else { Some(sequence) },
386                                        }
387                                    }
388                                },
389                            ),
390                            Err(error) => {
391                                top_state.stack.truncate(bottom);
392                                top_state.frames.push(Frame::Error(error));
393                            }
394                        }
395                    }
396                    Some(frame @ Frame::Lua { .. }) => {
397                        top_state.frames.push(frame);
398
399                        const VM_GRANULARITY: u32 = 64;
400
401                        let lua_frame = LuaFrame {
402                            state: top_state,
403                            thread: top_thread,
404                            fuel,
405                        };
406                        match run_vm(ctx, lua_frame, VM_GRANULARITY) {
407                            Err(err) => {
408                                top_state.frames.push(Frame::Error(err.into()));
409                            }
410                            Ok(instructions_run) => {
411                                fuel.consume(instructions_run.try_into().unwrap());
412                            }
413                        }
414                    }
415                    Some(Frame::Error(err)) => {
416                        match top_state
417                            .frames
418                            .pop()
419                            .expect("normal thread must have frame above error")
420                        {
421                            Frame::Lua { bottom, .. } => {
422                                top_state.close_upvalues(&ctx, bottom);
423                                top_state.stack.truncate(bottom);
424                                top_state.frames.push(Frame::Error(err));
425                            }
426                            Frame::Sequence {
427                                bottom,
428                                sequence,
429                                pending_error: error,
430                            } => {
431                                assert!(error.is_none());
432                                top_state.frames.push(Frame::Sequence {
433                                    bottom,
434                                    sequence,
435                                    pending_error: Some(err),
436                                });
437                            }
438                            _ => top_state.frames.push(Frame::Error(err)),
439                        }
440                    }
441                    _ => panic!("tried to step invalid frame type"),
442                }
443            }
444
445            fuel.consume(Self::FUEL_PER_STEP);
446
447            if !fuel.should_continue() {
448                break false;
449            }
450        }
451    }
452
453    pub fn take_result<T: FromMultiValue<'gc>>(
454        self,
455        ctx: Context<'gc>,
456    ) -> Result<Result<T, Error<'gc>>, BadExecutorMode> {
457        let mode = self.mode();
458        if mode == ExecutorMode::Result {
459            let state = self.0.borrow();
460            Ok(state.thread_stack[0].take_result(ctx).unwrap())
461        } else {
462            Err(BadExecutorMode {
463                found: mode,
464                expected: ExecutorMode::Result,
465            })
466        }
467    }
468
469    pub fn resume(
470        self,
471        ctx: Context<'gc>,
472        args: impl IntoMultiValue<'gc>,
473    ) -> Result<(), BadExecutorMode> {
474        let mode = self.mode();
475        if mode == ExecutorMode::Suspended {
476            let state = self.0.borrow();
477            state.thread_stack[0].resume(ctx, args).unwrap();
478            Ok(())
479        } else {
480            Err(BadExecutorMode {
481                found: mode,
482                expected: ExecutorMode::Suspended,
483            })
484        }
485    }
486
487    pub fn resume_err(self, mc: &Mutation<'gc>, error: Error<'gc>) -> Result<(), BadExecutorMode> {
488        let mode = self.mode();
489        if mode == ExecutorMode::Suspended {
490            let state = self.0.borrow();
491            state.thread_stack[0].resume_err(mc, error).unwrap();
492            Ok(())
493        } else {
494            Err(BadExecutorMode {
495                found: mode,
496                expected: ExecutorMode::Suspended,
497            })
498        }
499    }
500
501    /// Reset this `Executor` entirely, leaving it with a stopped main thread. Equivalent to
502    /// creating a new executor with `Executor::new`.
503    pub fn stop(self, mc: &Mutation<'gc>) {
504        let mut state = self.0.borrow_mut(mc);
505        state.thread_stack.truncate(1);
506        state.thread_stack[0].reset(mc).unwrap();
507    }
508
509    /// Reset this `Executor` entirely and begins running the given thread. Equivalent to
510    /// creating a new executor with `Executor::run`.
511    pub fn reset(self, mc: &Mutation<'gc>, thread: Thread<'gc>) {
512        let mut state = self.0.borrow_mut(mc);
513        state.thread_stack.clear();
514        state.thread_stack.push(thread);
515    }
516
517    /// Reset this `Executor` entirely and begins running the given function, equivalent to
518    /// creating a new executor with `Executor::start`.
519    pub fn restart(
520        self,
521        ctx: Context<'gc>,
522        function: Function<'gc>,
523        args: impl IntoMultiValue<'gc>,
524    ) {
525        let mut state = self.0.borrow_mut(&ctx);
526        state.thread_stack.truncate(1);
527        state.thread_stack[0].reset(&ctx).unwrap();
528        state.thread_stack[0].start(ctx, function, args).unwrap();
529    }
530}
531
532/// Execution state passed to callbacks when they are run by an `Executor`.
533pub struct Execution<'gc, 'a> {
534    executor: Executor<'gc>,
535    fuel: &'a mut Fuel,
536    upper_lua: Option<(Gc<'gc, FunctionPrototype<'gc>>, usize)>,
537    threads: &'a [Thread<'gc>],
538}
539
540impl<'gc, 'a> Execution<'gc, 'a> {
541    /// The fuel parameter passed to `Executor::step`.
542    pub fn fuel(&mut self) -> &mut Fuel {
543        self.fuel
544    }
545
546    /// The curently executing Thread.
547    pub fn current_thread(&self) -> CurrentThread<'gc> {
548        CurrentThread {
549            thread: *self.threads.last().unwrap(),
550            is_main: self.threads.len() == 1,
551        }
552    }
553
554    /// The curently running Executor.
555    ///
556    /// Do not call methods on this from callbacks! This is provided only for identification
557    /// purposes, so that callbacks can identify which executor that is currently executing them, or
558    /// to store the pointer somewhere.
559    pub fn executor(&self) -> Executor<'gc> {
560        self.executor
561    }
562
563    /// If the function we are returning to is Lua, returns information about the Lua frame we are
564    /// returning to.
565    pub fn upper_lua_frame(&self) -> Option<UpperLuaFrame<'gc>> {
566        self.upper_lua.map(|(proto, pc)| UpperLuaFrame {
567            chunk_name: proto.chunk_name,
568            current_function: proto.reference,
569            current_line: match proto
570                .opcode_line_numbers
571                .binary_search_by_key(&pc, |(opi, _)| *opi)
572            {
573                Ok(i) => proto.opcode_line_numbers[i].1,
574                Err(i) => proto.opcode_line_numbers[i - 1].1,
575            },
576        })
577    }
578}
579
580pub struct CurrentThread<'gc> {
581    pub thread: Thread<'gc>,
582    pub is_main: bool,
583}
584
585pub struct UpperLuaFrame<'gc> {
586    pub chunk_name: String<'gc>,
587    pub current_function: FunctionRef<String<'gc>>,
588    pub current_line: LineNumber,
589}