orchidlang/libs/asynch/
system.rs

//! Object to pass to [crate::facade::loader::Loader::add_system] to enable the
//! I/O subsystem. Many other systems also depend on it; these take a mutable
//! reference to it in order to register themselves.
4
5use std::any::{type_name, Any, TypeId};
6use std::cell::RefCell;
7use std::collections::VecDeque;
8use std::fmt;
9use std::rc::Rc;
10use std::sync::mpsc::Sender;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use hashbrown::HashMap;
15use ordered_float::NotNan;
16use rust_embed::RustEmbed;
17
18use super::poller::{PollEvent, Poller, TimerHandle};
19use crate::facade::system::{IntoSystem, System};
20use crate::foreign::atom::Atomic;
21use crate::foreign::cps_box::CPSBox;
22use crate::foreign::error::RTError;
23use crate::foreign::inert::{Inert, InertPayload};
24use crate::gen::tpl;
25use crate::gen::traits::Gen;
26use crate::gen::tree::{atom_ent, xfn_ent, ConstTree};
27use crate::interpreter::gen_nort::nort_gen;
28use crate::interpreter::handler::HandlerTable;
29use crate::interpreter::nort::Expr;
30use crate::libs::std::number::Numeric;
31use crate::location::{CodeGenInfo, CodeLocation};
32use crate::sym;
33use crate::utils::unwrap_or::unwrap_or;
34use crate::virt_fs::{DeclTree, EmbeddedFS, PrefixFS, VirtFS};
35
36#[derive(Debug, Clone)]
37struct Timer {
38  recurring: bool,
39  delay: NotNan<f64>,
40}
41
42fn set_timer(rec: Inert<bool>, delay: Numeric) -> CPSBox<Timer> {
43  CPSBox::new(2, Timer { recurring: rec.0, delay: delay.as_float() })
44}
45
46#[derive(Clone)]
47struct CancelTimer(Arc<Mutex<dyn Fn() + Send>>);
48impl CancelTimer {
49  pub fn new<T: Send + Clone + 'static>(canceller: TimerHandle<T>) -> Self {
50    Self(Arc::new(Mutex::new(move || canceller.clone().cancel())))
51  }
52  pub fn cancel(&self) { self.0.lock().unwrap()() }
53}
54impl fmt::Debug for CancelTimer {
55  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56    f.debug_struct("CancelTimer").finish_non_exhaustive()
57  }
58}
59
60#[derive(Clone, Debug)]
61struct Yield;
62impl InertPayload for Yield {
63  const TYPE_STR: &'static str = "asynch::yield";
64}
65
66/// Error indicating a yield command when all event producers and timers had
67/// exited
68#[derive(Clone)]
69pub struct InfiniteBlock;
70impl RTError for InfiniteBlock {}
71impl fmt::Display for InfiniteBlock {
72  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73    static MSG: &str = "User code yielded, but there are no timers or event \
74                        producers to wake it up in the future";
75    write!(f, "{}", MSG)
76  }
77}
78
/// A thread-safe, cloneable handle for sending events of any type
#[derive(Clone)]
pub struct MessagePort(Sender<Box<dyn Any + Send>>);
impl MessagePort {
  /// Send an event. Any type is accepted; the handler is selected by the
  /// type ID of the message.
  ///
  /// The outcome of the underlying channel send is discarded, so a message
  /// sent after the receiving end went away is silently dropped.
  pub fn send<T: Send + 'static>(&mut self, message: T) {
    let boxed: Box<dyn Any + Send> = Box::new(message);
    let _ = self.0.send(boxed);
  }
}
86
87fn gen() -> CodeGenInfo { CodeGenInfo::no_details(sym!(asynch)) }
88
89#[derive(RustEmbed)]
90#[folder = "src/libs/asynch"]
91#[include = "*.orc"]
92struct AsynchEmbed;
93
94fn code() -> DeclTree {
95  DeclTree::ns("system::async", [DeclTree::leaf(
96    PrefixFS::new(EmbeddedFS::new::<AsynchEmbed>(".orc", gen()), "", "async").rc(),
97  )])
98}
99
100type AnyHandler<'a> = Box<dyn FnMut(Box<dyn Any>) -> Vec<Expr> + 'a>;
101
102/// Datastructures the asynch system will eventually be constructed from.
103pub struct AsynchSystem<'a> {
104  poller: Poller<Box<dyn Any + Send>, Expr, Expr>,
105  sender: Sender<Box<dyn Any + Send>>,
106  handlers: HashMap<TypeId, AnyHandler<'a>>,
107}
108
109impl<'a> AsynchSystem<'a> {
110  /// Create a new async event loop that allows registering handlers and taking
111  /// references to the port before it's converted into a [System]
112  #[must_use]
113  pub fn new() -> Self {
114    let (sender, poller) = Poller::new();
115    Self { poller, sender, handlers: HashMap::new() }
116  }
117
118  /// Register a callback to be called on the owning thread when an object of
119  /// the given type is found on the queue. Each type should signify a single
120  /// command so each type should have exactly one handler.
121  ///
122  /// # Panics
123  ///
124  /// if the given type is already handled.
125  pub fn register<T: 'static>(&mut self, mut f: impl FnMut(Box<T>) -> Vec<Expr> + 'a) {
126    let cb = move |a: Box<dyn Any>| f(a.downcast().expect("keyed by TypeId"));
127    let prev = self.handlers.insert(TypeId::of::<T>(), Box::new(cb));
128    assert!(prev.is_none(), "Duplicate handlers for async event {}", type_name::<T>())
129  }
130
131  /// Obtain a message port for sending messages to the main thread. If an
132  /// object is passed to the MessagePort that does not have a handler, the
133  /// main thread panics.
134  #[must_use]
135  pub fn get_port(&self) -> MessagePort { MessagePort(self.sender.clone()) }
136}
137
138impl<'a> Default for AsynchSystem<'a> {
139  fn default() -> Self { Self::new() }
140}
141
142impl<'a> IntoSystem<'a> for AsynchSystem<'a> {
143  fn into_system(self) -> System<'a> {
144    let Self { mut handlers, poller, .. } = self;
145    let mut handler_table = HandlerTable::new();
146    let polly = Rc::new(RefCell::new(poller));
147    handler_table.register({
148      let polly = polly.clone();
149      move |t: &CPSBox<Timer>| {
150        let mut polly = polly.borrow_mut();
151        let (Timer { delay, recurring }, action, cont) = t.unpack2();
152        let duration = Duration::from_secs_f64(**delay);
153        let cancel_timer = match *recurring {
154          true => CancelTimer::new(polly.set_interval(duration, action)),
155          false => CancelTimer::new(polly.set_timeout(duration, action)),
156        };
157        let tpl = tpl::A(tpl::Slot, tpl::V(CPSBox::new(1, cancel_timer)));
158        tpl.template(nort_gen(cont.location()), [cont])
159      }
160    });
161    handler_table.register(move |t: &CPSBox<CancelTimer>| {
162      let (command, cont) = t.unpack1();
163      command.cancel();
164      cont
165    });
166    handler_table.register({
167      let polly = polly.clone();
168      let mut microtasks = VecDeque::new();
169      move |_: &Inert<Yield>| {
170        if let Some(expr) = microtasks.pop_front() {
171          return Ok(expr);
172        }
173        let mut polly = polly.borrow_mut();
174        loop {
175          let next = unwrap_or!(polly.run();
176            return Err(InfiniteBlock.pack())
177          );
178          match next {
179            PollEvent::Once(expr) => return Ok(expr),
180            PollEvent::Recurring(expr) => return Ok(expr),
181            PollEvent::Event(ev) => {
182              let handler = (handlers.get_mut(&ev.as_ref().type_id()))
183                .unwrap_or_else(|| panic!("Unhandled messgae type: {:?}", (*ev).type_id()));
184              let events = handler(ev);
185              // we got new microtasks
186              if !events.is_empty() {
187                microtasks = VecDeque::from(events);
188                // trampoline
189                let loc = CodeLocation::new_gen(CodeGenInfo::no_details(sym!(system::asynch)));
190                return Ok(Inert(Yield).atom_expr(loc));
191              }
192            },
193          }
194        }
195      }
196    });
197    System {
198      name: "system::asynch",
199      lexer_plugins: vec![],
200      line_parsers: vec![],
201      constants: ConstTree::ns("system::async", [ConstTree::tree([
202        xfn_ent("set_timer", [set_timer]),
203        atom_ent("yield", [Inert(Yield)]),
204      ])]),
205      code: code(),
206      prelude: Vec::new(),
207      handlers: handler_table,
208    }
209  }
210}