1use super::*;
2use std::{
3 sync::mpsc::{self, RecvTimeoutError, TryRecvError},
4 thread,
5 time::*,
6};
7
8const STACK_SIZE: usize = 8 * 1024 * 1024;
9const BADDER_STACK_LEN: usize = 200;
10
11#[derive(Clone, Debug)]
12pub struct Phase {
13 pub id: u64,
14 pub time: Instant,
15 pub src: SourceRef,
16 pub called_from: Vec<SourceRef>,
18 kind: PhaseKind,
19 unpaused: bool,
20 pub stack: Arc<[FxIndexMap<Token, FrameData>]>,
21}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24pub enum PhaseKind {
25 FunctionCall,
26 Assignment,
27 Other,
28}
29
30impl PhaseKind {
31 fn from(ast: &Ast) -> PhaseKind {
32 match *ast {
33 Ast::Call(..) => PhaseKind::FunctionCall,
34 Ast::Assign(..) | Ast::AssignFun(..) | Ast::AssignSeq(..) => PhaseKind::Assignment,
35
36 _ => PhaseKind::Other,
37 }
38 }
39}
40
41#[derive(Debug, Clone)]
42enum OverseerUpdate {
43 Phase(Phase),
44 FinishedFunCall,
45}
46
47struct ControllerOverseer {
48 next_id: u64,
49 pause_time: single_value_channel::Receiver<Duration>,
50 to_controller: mpsc::Sender<OverseerUpdate>,
51 from_controller: mpsc::Receiver<Result<u64, ()>>,
52 external_function_ids: Vec<Token>,
53 external_function_call: mpsc::Sender<ExternalCall>,
54 external_function_answer: mpsc::Receiver<Result<(Int, IntFlag), String>>,
55 last_stack_copy: Option<Arc<[FxIndexMap<Token, FrameData>]>>,
56}
57
58fn interested_in(ast: &Ast) -> bool {
59 use crate::Ast::*;
60 !matches!(
61 *ast,
62 LinePair(..)
63 | Line(..)
64 | Empty(..)
65 | Num(..)
66 | ReferSeq(..)
67 | ReferSeqIndex(..)
68 | Refer(..)
69 )
70}
71
72impl ControllerOverseer {
73 fn replace_last_stack(
74 &mut self,
75 stack: &[FxIndexMap<Token, FrameData>],
76 ) -> Arc<[FxIndexMap<Token, FrameData>]> {
77 let last: Arc<[_]> = stack.into();
78 self.last_stack_copy = Some(Arc::clone(&last));
79 last
80 }
81}
82
83impl Overseer for ControllerOverseer {
84 fn oversee(
85 &mut self,
86 stack: &[FxIndexMap<Token, FrameData>], ast: &Ast,
88 _current_scope: usize,
89 _stack_key: StackKey,
90 ) -> Result<(), ()> {
91 if !interested_in(ast) {
92 return Ok(());
93 }
94
95 let id = self.next_id;
96 let send_time = Instant::now();
97 self.next_id = id.overflowing_add(1).0;
98
99 let stack = {
100 if let Some(last) = self.last_stack_copy.take() {
102 if &*last == stack {
103 self.last_stack_copy = Some(Arc::clone(&last));
104 last
105 } else {
106 self.replace_last_stack(stack)
107 }
108 } else {
109 self.replace_last_stack(stack)
110 }
111 };
112
113 trace!("ControllerOverseer sending: {:?} {:?}", ast.src(), ast);
114 self.to_controller
115 .send(OverseerUpdate::Phase(Phase {
116 id,
117 src: ast.src(),
118 called_from: Vec::new(), kind: PhaseKind::from(ast),
120 unpaused: false,
121 time: send_time,
122 stack,
123 }))
124 .expect("send");
125
126 let mut recv = self.from_controller.try_recv();
127 while recv != Err(TryRecvError::Empty) {
128 match recv {
129 Ok(Ok(i)) => {
130 if i == id {
131 return Ok(());
132 }
133 }
134 Ok(Err(_)) | Err(TryRecvError::Disconnected) => {
135 debug!("ControllerOverseer cancelling: {:?} {:?}", ast.src(), ast);
136 return Err(());
137 }
138 _ => (),
139 }
140 recv = self.from_controller.try_recv();
141 }
142
143 let pause_time = *self.pause_time.latest();
144 if send_time.elapsed() >= pause_time {
145 Ok(())
146 } else {
147 debug!("ControllerOverseer waiting: {:?} {:?}", ast.src(), ast);
149
150 let mut elapsed = send_time.elapsed();
151 while elapsed < pause_time {
152 match self.from_controller.recv_timeout(pause_time - elapsed) {
153 Ok(Ok(i)) => {
154 if i == id {
155 return Ok(());
156 }
157 }
158 Ok(Err(_)) | Err(RecvTimeoutError::Disconnected) => {
159 debug!("ControllerOverseer cancelling: {:?} {:?}", ast.src(), ast);
160 return Err(());
161 }
162 _ => (),
163 };
164
165 elapsed = send_time.elapsed();
166 }
167 Ok(())
168 }
169 }
170
171 fn oversee_after(&mut self, _stack: &[FxIndexMap<Token, FrameData>], ast: &Ast) {
172 if let Ast::Call(..) = *ast {
173 let _ = self.to_controller.send(OverseerUpdate::FinishedFunCall);
174 }
175 }
176
177 fn external_function_signatures(&self) -> &[Token] {
178 &self.external_function_ids
179 }
180
181 fn call_external_function(
182 &mut self,
183 id: Token,
184 args: Vec<(Int, IntFlag)>,
185 ) -> Result<(Int, IntFlag), String> {
186 debug!(
187 "ControllerOverseer awaiting answer: {:?}, args {:?}",
188 id, args
189 );
190 self.external_function_call
191 .send(ExternalCall { id, args })
192 .expect("send");
193
194 match self.external_function_answer.recv() {
196 Ok(result) => result,
197 Err(err) => {
198 debug!("ControllerOverseer cancelling: {:?}", err);
199 Err("cancelled".into())
200 }
201 }
202 }
203}
204
205#[derive(Debug)]
206pub struct Controller {
207 pause_time: Duration,
208 current_phase: Option<Phase>,
209 fun_call_history: Vec<SourceRef>,
210 result: Option<Res<Int>>,
211 current_external_call: Option<ExternalCall>,
212 external_function_ids: Vec<Token>,
213 run_stats: RunStats,
214
215 cancelled: bool,
217
218 from_overseer: mpsc::Receiver<OverseerUpdate>,
219 execution_result: mpsc::Receiver<Res<Int>>,
220 to_overseer: mpsc::Sender<Result<u64, ()>>,
221 overseer_pause: single_value_channel::Updater<Duration>,
222 external_function_call: mpsc::Receiver<ExternalCall>,
223 external_function_answer: mpsc::Sender<Result<(Int, IntFlag), String>>,
224}
225
226impl Controller {
227 pub fn new(pause: Duration) -> Controller {
228 Controller {
229 pause_time: pause,
230 current_phase: None,
231 fun_call_history: Vec::new(),
232 result: None,
233 current_external_call: None,
234 external_function_ids: Vec::new(),
235 run_stats: RunStats::default(),
236
237 cancelled: false,
238 from_overseer: mpsc::channel().1,
239 execution_result: mpsc::channel().1,
240 to_overseer: mpsc::channel().0,
241 overseer_pause: single_value_channel::channel_starting_with(Duration::from_secs(0)).1,
242 external_function_answer: mpsc::channel().0,
243 external_function_call: mpsc::channel().1,
244 }
245 }
246
247 pub fn new_no_pause() -> Controller {
248 Controller::new(Duration::from_secs(0))
249 }
250
251 pub fn new_max_pause() -> Controller {
252 Controller::new(Duration::from_secs(u64::from(u32::MAX)))
254 }
255
256 pub fn set_pause_duration(&mut self, pause_time: Duration) {
260 self.pause_time = pause_time;
261 let _ = self.overseer_pause.update(self.pause_time);
263 }
264
265 pub fn pause_duration(&self) -> Duration {
266 self.pause_time
267 }
268
269 pub fn unpause(&mut self) {
272 if let Some(Phase {
273 id,
274 unpaused: false,
275 ..
276 }) = self.current_phase
277 {
278 let _ = self.to_overseer.send(Ok(id));
280 self.current_phase.as_mut().unwrap().unpaused = true;
281 }
282 }
283
284 pub fn cancel(&mut self) {
286 if !self.cancelled {
287 self.cancelled = true;
288 self.fun_call_history.clear();
289 let _ = self.to_overseer.send(Err(()));
290 }
291 }
292
293 pub fn current_phase(&self) -> Option<Phase> {
296 if self.cancelled {
297 None
298 } else {
299 self.current_phase.clone()
300 }
301 }
302
303 pub fn run_stats(&self) -> &RunStats {
304 &self.run_stats
305 }
306
307 pub fn result(&self) -> Option<&Res<Int>> {
309 self.result.as_ref()
310 }
311
312 pub fn paused(&self) -> bool {
315 if self.cancelled {
316 return false;
317 }
318
319 if let Some(Phase { time, unpaused, .. }) = self.current_phase {
320 !unpaused && time.elapsed() < self.pause_time
321 } else {
322 false
323 }
324 }
325
326 pub fn refresh(&mut self) -> bool {
330 if self.result.is_some() {
331 return false;
332 }
333
334 if let Ok(result) = self.execution_result.try_recv() {
335 self.result = Some(result);
336 self.current_phase = None;
337 return true;
338 }
339
340 let mut change = self.refresh_overseer_updates();
341
342 if let Ok(call) = self.external_function_call.try_recv() {
343 self.refresh_overseer_updates();
347
348 self.current_external_call = Some(call);
349 change = true;
350 }
351
352 change
353 }
354
355 #[inline]
357 fn refresh_overseer_updates(&mut self) -> bool {
358 let mut change = false;
359 while let Ok(update) = self.from_overseer.try_recv() {
360 debug_assert!(
361 self.current_external_call.is_none(),
362 "Update received during external call: {:?}\nupdate: {:?}",
363 self.current_external_call,
364 update,
365 );
366
367 match update {
368 OverseerUpdate::Phase(mut phase) => {
369 phase.called_from = self.current_call_info();
370 if phase.kind == PhaseKind::FunctionCall {
371 self.fun_call_history.push(phase.src)
372 }
373 if !self.cancelled {
374 self.run_stats.consider(&phase);
375 }
376 self.current_phase = Some(phase);
377 }
378 OverseerUpdate::FinishedFunCall => {
379 self.fun_call_history.pop();
380 }
381 };
382 change = true;
383 }
384 change
385 }
386
387 fn current_call_info(&self) -> Vec<SourceRef> {
388 if self.cancelled {
389 vec![]
390 } else {
391 self.fun_call_history.iter().rev().cloned().collect()
392 }
393 }
394
395 pub fn add_external_function(&mut self, id: &str) {
399 self.external_function_ids.push(Token::Id(id.into()));
400 }
401
402 pub fn clear_external_functions(&mut self) {
403 self.external_function_ids.clear();
404 }
405
406 pub fn current_external_call(&self) -> Option<ExternalCall> {
407 if self.cancelled {
408 None
409 } else {
410 self.current_external_call.clone()
411 }
412 }
413
414 pub fn answer_external_call(&mut self, result: Result<(Int, IntFlag), String>) {
415 self.current_external_call = None;
416 if let Err(err) = self.external_function_answer.send(result) {
417 warn!(
418 "Comms failure with badder runtime when answering external call: {}",
419 err
420 );
421 }
422 }
423
424 pub fn execute(&mut self, code: Ast) {
427 let (to_controller, from_overseer) = mpsc::channel();
428 let (to_overseer, from_controller) = mpsc::channel();
429 let (final_result, execution_result) = mpsc::channel();
430 let (get_pause, set_pause) = single_value_channel::channel_starting_with(self.pause_time);
431 let (send_fun_call, recv_fun_call) = mpsc::channel();
432 let (send_fun_answer, recv_fun_answer) = mpsc::channel();
433
434 self.to_overseer = to_overseer;
435 self.from_overseer = from_overseer;
436 self.execution_result = execution_result;
437 self.overseer_pause = set_pause;
438 self.external_function_call = recv_fun_call;
439 self.external_function_answer = send_fun_answer;
440 self.result = None;
441 self.current_phase = None;
442 self.fun_call_history.clear();
443 self.current_external_call = None;
444 self.cancelled = false;
445 self.run_stats = RunStats::default();
446
447 let external_function_ids = self.external_function_ids.clone();
448
449 thread::Builder::new()
450 .name("badder-exe".into())
451 .stack_size(STACK_SIZE)
452 .spawn(move || {
453 let overseer = ControllerOverseer {
454 next_id: 0,
455 pause_time: get_pause,
456 to_controller,
457 from_controller,
458 external_function_ids,
459 external_function_call: send_fun_call,
460 external_function_answer: recv_fun_answer,
461 last_stack_copy: None,
462 };
463 let _ =
464 final_result.send(Interpreter::new(BADDER_STACK_LEN, overseer).evaluate(&code));
465 })
466 .unwrap();
467 }
468}
469
470#[derive(Debug, PartialEq, Clone, Eq, Hash)]
471pub struct ExternalCall {
472 pub id: Token,
473 pub args: Vec<(Int, IntFlag)>,
474}
475
476impl ExternalCall {
477 pub fn id_str(&self) -> &str {
478 match self.id {
479 Token::Id(ref s) => s,
480 _ => unreachable!(),
481 }
482 }
483}
484
485#[derive(Debug, Clone, Default)]
486pub struct RunStats {
487 pub eval_counts: IndexMap<SourceRef, usize>,
489 last_phase: Option<u64>,
490}
491
492impl RunStats {
493 fn consider(&mut self, phase: &Phase) {
494 if phase.kind != PhaseKind::Assignment {
495 if let Some(id) = self.last_phase {
496 if id != phase.id {
497 *(self.eval_counts.entry(phase.src).or_insert(0)) += 1;
498 }
499 } else {
500 *(self.eval_counts.entry(phase.src).or_insert(0)) += 1;
501 }
502
503 self.last_phase = Some(phase.id);
504 }
505 }
506}