badder_lang/
controller.rs

1use super::*;
2use std::{
3    sync::mpsc::{self, RecvTimeoutError, TryRecvError},
4    thread,
5    time::*,
6};
7
8const STACK_SIZE: usize = 8 * 1024 * 1024;
9const BADDER_STACK_LEN: usize = 200;
10
11#[derive(Clone, Debug)]
12pub struct Phase {
13    pub id: u64,
14    pub time: Instant,
15    pub src: SourceRef,
16    /// most recent function call ref relavant to this Ast, empty => top level code
17    pub called_from: Vec<SourceRef>,
18    kind: PhaseKind,
19    unpaused: bool,
20    pub stack: Arc<[FxIndexMap<Token, FrameData>]>,
21}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24pub enum PhaseKind {
25    FunctionCall,
26    Assignment,
27    Other,
28}
29
30impl PhaseKind {
31    fn from(ast: &Ast) -> PhaseKind {
32        match *ast {
33            Ast::Call(..) => PhaseKind::FunctionCall,
34            Ast::Assign(..) | Ast::AssignFun(..) | Ast::AssignSeq(..) => PhaseKind::Assignment,
35
36            _ => PhaseKind::Other,
37        }
38    }
39}
40
41#[derive(Debug, Clone)]
42enum OverseerUpdate {
43    Phase(Phase),
44    FinishedFunCall,
45}
46
47struct ControllerOverseer {
48    next_id: u64,
49    pause_time: single_value_channel::Receiver<Duration>,
50    to_controller: mpsc::Sender<OverseerUpdate>,
51    from_controller: mpsc::Receiver<Result<u64, ()>>,
52    external_function_ids: Vec<Token>,
53    external_function_call: mpsc::Sender<ExternalCall>,
54    external_function_answer: mpsc::Receiver<Result<(Int, IntFlag), String>>,
55    last_stack_copy: Option<Arc<[FxIndexMap<Token, FrameData>]>>,
56}
57
58fn interested_in(ast: &Ast) -> bool {
59    use crate::Ast::*;
60    !matches!(
61        *ast,
62        LinePair(..)
63            | Line(..)
64            | Empty(..)
65            | Num(..)
66            | ReferSeq(..)
67            | ReferSeqIndex(..)
68            | Refer(..)
69    )
70}
71
72impl ControllerOverseer {
73    fn replace_last_stack(
74        &mut self,
75        stack: &[FxIndexMap<Token, FrameData>],
76    ) -> Arc<[FxIndexMap<Token, FrameData>]> {
77        let last: Arc<[_]> = stack.into();
78        self.last_stack_copy = Some(Arc::clone(&last));
79        last
80    }
81}
82
83impl Overseer for ControllerOverseer {
84    fn oversee(
85        &mut self,
86        stack: &[FxIndexMap<Token, FrameData>], // Cow?
87        ast: &Ast,
88        _current_scope: usize,
89        _stack_key: StackKey,
90    ) -> Result<(), ()> {
91        if !interested_in(ast) {
92            return Ok(());
93        }
94
95        let id = self.next_id;
96        let send_time = Instant::now();
97        self.next_id = id.overflowing_add(1).0;
98
99        let stack = {
100            // kerfuffle to avoid cloning the stack when it hasn't changed
101            if let Some(last) = self.last_stack_copy.take() {
102                if &*last == stack {
103                    self.last_stack_copy = Some(Arc::clone(&last));
104                    last
105                } else {
106                    self.replace_last_stack(stack)
107                }
108            } else {
109                self.replace_last_stack(stack)
110            }
111        };
112
113        trace!("ControllerOverseer sending: {:?} {:?}", ast.src(), ast);
114        self.to_controller
115            .send(OverseerUpdate::Phase(Phase {
116                id,
117                src: ast.src(),
118                called_from: Vec::new(), // unknown
119                kind: PhaseKind::from(ast),
120                unpaused: false,
121                time: send_time,
122                stack,
123            }))
124            .expect("send");
125
126        let mut recv = self.from_controller.try_recv();
127        while recv != Err(TryRecvError::Empty) {
128            match recv {
129                Ok(Ok(i)) => {
130                    if i == id {
131                        return Ok(());
132                    }
133                }
134                Ok(Err(_)) | Err(TryRecvError::Disconnected) => {
135                    debug!("ControllerOverseer cancelling: {:?} {:?}", ast.src(), ast);
136                    return Err(());
137                }
138                _ => (),
139            }
140            recv = self.from_controller.try_recv();
141        }
142
143        let pause_time = *self.pause_time.latest();
144        if send_time.elapsed() >= pause_time {
145            Ok(())
146        } else {
147            // block until result received
148            debug!("ControllerOverseer waiting: {:?} {:?}", ast.src(), ast);
149
150            let mut elapsed = send_time.elapsed();
151            while elapsed < pause_time {
152                match self.from_controller.recv_timeout(pause_time - elapsed) {
153                    Ok(Ok(i)) => {
154                        if i == id {
155                            return Ok(());
156                        }
157                    }
158                    Ok(Err(_)) | Err(RecvTimeoutError::Disconnected) => {
159                        debug!("ControllerOverseer cancelling: {:?} {:?}", ast.src(), ast);
160                        return Err(());
161                    }
162                    _ => (),
163                };
164
165                elapsed = send_time.elapsed();
166            }
167            Ok(())
168        }
169    }
170
171    fn oversee_after(&mut self, _stack: &[FxIndexMap<Token, FrameData>], ast: &Ast) {
172        if let Ast::Call(..) = *ast {
173            let _ = self.to_controller.send(OverseerUpdate::FinishedFunCall);
174        }
175    }
176
177    fn external_function_signatures(&self) -> &[Token] {
178        &self.external_function_ids
179    }
180
181    fn call_external_function(
182        &mut self,
183        id: Token,
184        args: Vec<(Int, IntFlag)>,
185    ) -> Result<(Int, IntFlag), String> {
186        debug!(
187            "ControllerOverseer awaiting answer: {:?}, args {:?}",
188            id, args
189        );
190        self.external_function_call
191            .send(ExternalCall { id, args })
192            .expect("send");
193
194        // block until answer received
195        match self.external_function_answer.recv() {
196            Ok(result) => result,
197            Err(err) => {
198                debug!("ControllerOverseer cancelling: {:?}", err);
199                Err("cancelled".into())
200            }
201        }
202    }
203}
204
205#[derive(Debug)]
206pub struct Controller {
207    pause_time: Duration,
208    current_phase: Option<Phase>,
209    fun_call_history: Vec<SourceRef>,
210    result: Option<Res<Int>>,
211    current_external_call: Option<ExternalCall>,
212    external_function_ids: Vec<Token>,
213    run_stats: RunStats,
214
215    /// #cancel() has been called this run
216    cancelled: bool,
217
218    from_overseer: mpsc::Receiver<OverseerUpdate>,
219    execution_result: mpsc::Receiver<Res<Int>>,
220    to_overseer: mpsc::Sender<Result<u64, ()>>,
221    overseer_pause: single_value_channel::Updater<Duration>,
222    external_function_call: mpsc::Receiver<ExternalCall>,
223    external_function_answer: mpsc::Sender<Result<(Int, IntFlag), String>>,
224}
225
226impl Controller {
227    pub fn new(pause: Duration) -> Controller {
228        Controller {
229            pause_time: pause,
230            current_phase: None,
231            fun_call_history: Vec::new(),
232            result: None,
233            current_external_call: None,
234            external_function_ids: Vec::new(),
235            run_stats: RunStats::default(),
236
237            cancelled: false,
238            from_overseer: mpsc::channel().1,
239            execution_result: mpsc::channel().1,
240            to_overseer: mpsc::channel().0,
241            overseer_pause: single_value_channel::channel_starting_with(Duration::from_secs(0)).1,
242            external_function_answer: mpsc::channel().0,
243            external_function_call: mpsc::channel().1,
244        }
245    }
246
247    pub fn new_no_pause() -> Controller {
248        Controller::new(Duration::from_secs(0))
249    }
250
251    pub fn new_max_pause() -> Controller {
252        // just a large amount of seconds, for some reason u64::MAX caused issues on windows
253        Controller::new(Duration::from_secs(u64::from(u32::MAX)))
254    }
255
256    /// Modifies the pause time, the duration the interpreter will block for
257    /// before non-trivial AST interpretation waiting for a #unpause() or #cancel() call
258    /// after the duration execution will continue automatically
259    pub fn set_pause_duration(&mut self, pause_time: Duration) {
260        self.pause_time = pause_time;
261        // ignore errors to allow setting pause_time before execution
262        let _ = self.overseer_pause.update(self.pause_time);
263    }
264
265    pub fn pause_duration(&self) -> Duration {
266        self.pause_time
267    }
268
269    /// Unblocks current waiting phase's execution, if it is blocked.
270    /// Requires a recent (in terms of set pause_time) call to #refresh() to be valid
271    pub fn unpause(&mut self) {
272        if let Some(Phase {
273            id,
274            unpaused: false,
275            ..
276        }) = self.current_phase
277        {
278            // ignore errors as send can happen after execution finishes
279            let _ = self.to_overseer.send(Ok(id));
280            self.current_phase.as_mut().unwrap().unpaused = true;
281        }
282    }
283
284    /// Requests any current executing interpreter be cancelled
285    pub fn cancel(&mut self) {
286        if !self.cancelled {
287            self.cancelled = true;
288            self.fun_call_history.clear();
289            let _ = self.to_overseer.send(Err(()));
290        }
291    }
292
293    /// Returns current execution phase, requires a recent (in terms of set pause_time)
294    /// call to #refresh() to be valid
295    pub fn current_phase(&self) -> Option<Phase> {
296        if self.cancelled {
297            None
298        } else {
299            self.current_phase.clone()
300        }
301    }
302
303    pub fn run_stats(&self) -> &RunStats {
304        &self.run_stats
305    }
306
307    /// Returns result of execution
308    pub fn result(&self) -> Option<&Res<Int>> {
309        self.result.as_ref()
310    }
311
312    /// Returns current execution is paused, requires a recent (in terms of set pause_time)
313    /// call to #refresh() to be valid
314    pub fn paused(&self) -> bool {
315        if self.cancelled {
316            return false;
317        }
318
319        if let Some(Phase { time, unpaused, .. }) = self.current_phase {
320            !unpaused && time.elapsed() < self.pause_time
321        } else {
322            false
323        }
324    }
325
326    /// Communicate with the current execution
327    /// populates #result & #current_phase if available
328    /// Returns if a change has taken place
329    pub fn refresh(&mut self) -> bool {
330        if self.result.is_some() {
331            return false;
332        }
333
334        if let Ok(result) = self.execution_result.try_recv() {
335            self.result = Some(result);
336            self.current_phase = None;
337            return true;
338        }
339
340        let mut change = self.refresh_overseer_updates();
341
342        if let Ok(call) = self.external_function_call.try_recv() {
343            // refresh overseer updates again in case the external call
344            // occurred just after the last `refresh_overseer_updates` call
345            // call before setting `current_external_call` as it has an assert
346            self.refresh_overseer_updates();
347
348            self.current_external_call = Some(call);
349            change = true;
350        }
351
352        change
353    }
354
355    /// Returns if a change occurred
356    #[inline]
357    fn refresh_overseer_updates(&mut self) -> bool {
358        let mut change = false;
359        while let Ok(update) = self.from_overseer.try_recv() {
360            debug_assert!(
361                self.current_external_call.is_none(),
362                "Update received during external call: {:?}\nupdate: {:?}",
363                self.current_external_call,
364                update,
365            );
366
367            match update {
368                OverseerUpdate::Phase(mut phase) => {
369                    phase.called_from = self.current_call_info();
370                    if phase.kind == PhaseKind::FunctionCall {
371                        self.fun_call_history.push(phase.src)
372                    }
373                    if !self.cancelled {
374                        self.run_stats.consider(&phase);
375                    }
376                    self.current_phase = Some(phase);
377                }
378                OverseerUpdate::FinishedFunCall => {
379                    self.fun_call_history.pop();
380                }
381            };
382            change = true;
383        }
384        change
385    }
386
387    fn current_call_info(&self) -> Vec<SourceRef> {
388        if self.cancelled {
389            vec![]
390        } else {
391            self.fun_call_history.iter().rev().cloned().collect()
392        }
393    }
394
395    /// Adds an external function signature
396    /// must be a valid badder signature to work
397    /// ie `some_ext_function(svv)` for a 3 arg function taking a sequence & 2 variable arguments
398    pub fn add_external_function(&mut self, id: &str) {
399        self.external_function_ids.push(Token::Id(id.into()));
400    }
401
402    pub fn clear_external_functions(&mut self) {
403        self.external_function_ids.clear();
404    }
405
406    pub fn current_external_call(&self) -> Option<ExternalCall> {
407        if self.cancelled {
408            None
409        } else {
410            self.current_external_call.clone()
411        }
412    }
413
414    pub fn answer_external_call(&mut self, result: Result<(Int, IntFlag), String>) {
415        self.current_external_call = None;
416        if let Err(err) = self.external_function_answer.send(result) {
417            warn!(
418                "Comms failure with badder runtime when answering external call: {}",
419                err
420            );
421        }
422    }
423
424    /// Start executing code with a new thread.
425    /// Effectively resets this instances state.
426    pub fn execute(&mut self, code: Ast) {
427        let (to_controller, from_overseer) = mpsc::channel();
428        let (to_overseer, from_controller) = mpsc::channel();
429        let (final_result, execution_result) = mpsc::channel();
430        let (get_pause, set_pause) = single_value_channel::channel_starting_with(self.pause_time);
431        let (send_fun_call, recv_fun_call) = mpsc::channel();
432        let (send_fun_answer, recv_fun_answer) = mpsc::channel();
433
434        self.to_overseer = to_overseer;
435        self.from_overseer = from_overseer;
436        self.execution_result = execution_result;
437        self.overseer_pause = set_pause;
438        self.external_function_call = recv_fun_call;
439        self.external_function_answer = send_fun_answer;
440        self.result = None;
441        self.current_phase = None;
442        self.fun_call_history.clear();
443        self.current_external_call = None;
444        self.cancelled = false;
445        self.run_stats = RunStats::default();
446
447        let external_function_ids = self.external_function_ids.clone();
448
449        thread::Builder::new()
450            .name("badder-exe".into())
451            .stack_size(STACK_SIZE)
452            .spawn(move || {
453                let overseer = ControllerOverseer {
454                    next_id: 0,
455                    pause_time: get_pause,
456                    to_controller,
457                    from_controller,
458                    external_function_ids,
459                    external_function_call: send_fun_call,
460                    external_function_answer: recv_fun_answer,
461                    last_stack_copy: None,
462                };
463                let _ =
464                    final_result.send(Interpreter::new(BADDER_STACK_LEN, overseer).evaluate(&code));
465            })
466            .unwrap();
467    }
468}
469
470#[derive(Debug, PartialEq, Clone, Eq, Hash)]
471pub struct ExternalCall {
472    pub id: Token,
473    pub args: Vec<(Int, IntFlag)>,
474}
475
476impl ExternalCall {
477    pub fn id_str(&self) -> &str {
478        match self.id {
479            Token::Id(ref s) => s,
480            _ => unreachable!(),
481        }
482    }
483}
484
485#[derive(Debug, Clone, Default)]
486pub struct RunStats {
487    /// source_ref -> evaluation count
488    pub eval_counts: IndexMap<SourceRef, usize>,
489    last_phase: Option<u64>,
490}
491
492impl RunStats {
493    fn consider(&mut self, phase: &Phase) {
494        if phase.kind != PhaseKind::Assignment {
495            if let Some(id) = self.last_phase {
496                if id != phase.id {
497                    *(self.eval_counts.entry(phase.src).or_insert(0)) += 1;
498                }
499            } else {
500                *(self.eval_counts.entry(phase.src).or_insert(0)) += 1;
501            }
502
503            self.last_phase = Some(phase.id);
504        }
505    }
506}