graphix_compiler/
lib.rs

1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14    env::Env,
15    expr::{ExprId, ModPath},
16    typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25    path::Path,
26    publisher::{Id, Val, WriteRequest},
27    subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use std::{
33    any::Any,
34    cell::{Cell, RefCell},
35    collections::{hash_map::Entry, HashMap},
36    fmt::Debug,
37    mem,
38    sync::{
39        self,
40        atomic::{AtomicBool, Ordering},
41        LazyLock,
42    },
43    time::Duration,
44};
45use tokio::time::Instant;
46use triomphe::Arc;
47
/// Process-wide tracing switch; it starts out disabled.
///
/// The `tdbg!` macro reads it (through [`trace`]) to decide whether an
/// expression should be echoed with `dbg!`.
// allow: the flag is only touched through the two accessors below.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Turn tracing on (`true`) or off (`false`) for the whole process.
// allow: no caller is visible in this file.
#[allow(dead_code)]
fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed);
}

/// Report whether tracing is currently switched on.
// allow: only referenced from the expansion of `tdbg!`.
#[allow(dead_code)]
fn trace() -> bool {
    TRACE.load(Ordering::Relaxed)
}
60
/// Evaluate `$e`, routing it through `dbg!` when tracing is enabled.
///
/// On either path the expression is evaluated exactly once and its value
/// becomes the value of the macro invocation.
///
/// NOTE(review): the expansion calls `$crate::trace()`, which is a private
/// function in this file. Because the macro is `#[macro_export]`ed, expanding
/// it from another crate would fail to resolve that call — confirm whether
/// cross-crate use is intended.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
71
thread_local! {
    /// Per-thread, reusable `Refs` structure. It starts out empty and is
    /// wrapped in a `RefCell` so it can be borrowed mutably through the
    /// thread local.
    pub static REFS: RefCell<Refs> = RefCell::new(Refs::new());
}
76
77atomic_id!(LambdaId);
78
79impl From<u64> for LambdaId {
80    fn from(v: u64) -> Self {
81        LambdaId(v)
82    }
83}
84
85atomic_id!(BindId);
86
87impl From<u64> for BindId {
88    fn from(v: u64) -> Self {
89        BindId(v)
90    }
91}
92
93impl TryFrom<Value> for BindId {
94    type Error = anyhow::Error;
95
96    fn try_from(value: Value) -> Result<Self> {
97        match value {
98            Value::U64(id) => Ok(BindId(id)),
99            v => bail!("invalid bind id {v}"),
100        }
101    }
102}
103
/// Build `Some(Value::Error(..))` from a format string and optional
/// format arguments.
///
/// The message is rendered with `format_compact!` and then copied into an
/// `ArcStr`. A trailing comma after the last argument is accepted, as with
/// the standard formatting macros.
///
/// The expansion names `Value`, `ArcStr` and `format_compact!` unqualified,
/// so all three must be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! errf {
    ($pat:expr $(, $arg:expr)* $(,)?) => {
        Some(Value::Error(ArcStr::from(format_compact!($pat $(, $arg)*).as_str())))
    };
}
111
/// Build `Some(Value::Error(..))` from a string literal.
///
/// The literal is passed to `literal!`, so unlike `errf!` no formatting is
/// performed. `Value` and `literal!` are named unqualified in the expansion
/// and must be in scope at the call site.
#[macro_export]
macro_rules! err {
    ($pat:literal) => {
        Some(Value::Error(literal!($pat)))
    };
}
118
/// User-defined payload carried in the `user` field of an `Event`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; `Event::clear` calls this after clearing its own
    /// fields.
    fn clear(&mut self);
}
122
/// A `UserEvent` that carries no data.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset, so this is a no-op.
    fn clear(&mut self) {}
}
129
/// Flags that control how types and origins are formatted. They are combined
/// into a `BitFlags<PrintFlag>` and applied per thread with
/// `format_with_flags`.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Dereference type variables and print both the tvar name and the bound
    /// type or "unbound".
    DerefTVars,
    /// Replace common primitives with shorter type names as defined
    /// in core. e.g. Any, instead of the set of every primitive type.
    ReplacePrims,
    /// When formatting an Origin don't print the source, just the location
    NoSource,
    /// When formatting an Origin don't print the origin's parents
    NoParents,
}
145
thread_local! {
    /// The print flags in effect on this thread. Defaults to
    /// `PrintFlag::ReplacePrims` alone; `format_with_flags` swaps the value
    /// for the duration of a closure.
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims.into());
}
149
150/// For the duration of the closure F change the way type variables
151/// are formatted (on this thread only) according to the specified
152/// flags.
153pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
154    flags: G,
155    f: F,
156) -> R {
157    let prev = PRINT_FLAGS.replace(flags.into());
158    let res = f();
159    PRINT_FLAGS.set(prev);
160    res
161}
162
/// Event represents all the things that happened simultaneously in a
/// given execution cycle. Event may contain only one update for each
/// variable and netidx subscription in a given cycle, if more updates
/// happen simultaneously they must be queued and deferred to later
/// cycles.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// `false` for a new or cleared event.
    /// NOTE(review): presumably set on the initial cycle — confirm with the
    /// runtime that sets it.
    pub init: bool,
    /// Variable updates for this cycle, keyed by bind id.
    pub variables: FxHashMap<BindId, Value>,
    /// Netidx subscription events for this cycle, keyed by subscription id.
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// Write requests for this cycle, keyed by publisher id.
    pub writes: FxHashMap<Id, WriteRequest>,
    /// Incoming rpc calls for this cycle, keyed by the bind id the rpc was
    /// published under.
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// User-defined payload.
    pub user: E,
}
177
178impl<E: UserEvent> Event<E> {
179    pub fn new(user: E) -> Self {
180        Event {
181            init: false,
182            variables: HashMap::default(),
183            netidx: HashMap::default(),
184            writes: HashMap::default(),
185            rpc_calls: HashMap::default(),
186            user,
187        }
188    }
189
190    pub fn clear(&mut self) {
191        let Self { init, variables, netidx, rpc_calls, writes, user } = self;
192        *init = false;
193        variables.clear();
194        netidx.clear();
195        rpc_calls.clear();
196        writes.clear();
197        user.clear();
198    }
199}
200
/// The bind ids referenced and bound by a node (and, per the `refs` methods
/// on `Update`/`Apply`, its children). Ids that are referenced but not bound
/// are the "external" references reported by `with_external_refs`.
#[derive(Debug, Clone)]
pub struct Refs {
    /// ids that are referenced
    refed: FxHashSet<BindId>,
    /// ids that are bound
    bound: FxHashSet<BindId>,
}
206
207impl Refs {
208    pub fn new() -> Self {
209        Self { refed: FxHashSet::default(), bound: FxHashSet::default() }
210    }
211
212    pub fn clear(&mut self) {
213        self.refed.clear();
214        self.bound.clear();
215    }
216
217    pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
218        for id in &self.refed {
219            if !self.bound.contains(id) {
220                f(*id);
221            }
222        }
223    }
224}
225
/// A boxed, dynamically dispatched graph node.
pub type Node<R, E> = Box<dyn Update<R, E>>;
227
/// Shared, thread-safe constructor for a builtin function. This is what
/// `BuiltIn::init` returns and what `ExecCtx::register_builtin` stores.
///
/// Given the execution context, a function type, a module path, a slice of
/// nodes and an expression id, it produces the boxed `Apply` implementation
/// or an error.
/// NOTE(review): presumably the slice holds the call's argument nodes and
/// the path is the calling scope — confirm against the compiler.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b ModPath,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
240
/// Shared, thread-safe constructor for an `Apply`. It takes the execution
/// context, a slice of nodes and an expression id, and returns the boxed
/// `Apply` implementation or an error. Unlike `BuiltInInitFn` it is not
/// passed a function type or module path.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b> Fn(
            &'a mut ExecCtx<R, E>,
            &'b [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
251
252/// Apply is a kind of node that represents a function application. It
253/// does not hold ownership of it's arguments, instead those are held
254/// by a CallSite node. This allows us to change the function called
255/// at runtime without recompiling the arguments.
256pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
257    fn update(
258        &mut self,
259        ctx: &mut ExecCtx<R, E>,
260        from: &mut [Node<R, E>],
261        event: &mut Event<E>,
262    ) -> Option<Value>;
263
264    /// delete any internally generated nodes, only needed for
265    /// builtins that dynamically generate code at runtime
266    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
267        ()
268    }
269
270    /// apply custom typechecking to the lambda, only needed for
271    /// builtins that take lambdas as arguments
272    fn typecheck(
273        &mut self,
274        _ctx: &mut ExecCtx<R, E>,
275        _from: &mut [Node<R, E>],
276    ) -> Result<()> {
277        Ok(())
278    }
279
280    /// return the lambdas type, builtins do not need to implement
281    /// this, it is implemented by the BuiltIn wrapper
282    fn typ(&self) -> Arc<FnType> {
283        const EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
284            Arc::new(FnType {
285                args: Arc::from_iter([]),
286                constraints: Arc::new(RwLock::new(vec![])),
287                rtype: Type::Bottom,
288                vargs: None,
289            })
290        });
291        Arc::clone(&*EMPTY)
292    }
293
294    /// Populate the Refs structure with all the ids bound and refed by this
295    /// node. It is only necessary for builtins to implement this if they create
296    /// nodes, such as call sites.
297    fn refs<'a>(&self, _refs: &mut Refs) {}
298
299    /// put the node to sleep, used in conditions like select for branches that
300    /// are not selected. Any cached values should be cleared on sleep.
301    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
302}
303
/// Update represents a regular graph node, as opposed to a function
/// application represented by Apply. Regular graph nodes are used for
/// every built in node except for builtin functions.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// update the node with the specified event and return any output
    /// it might generate
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// delete the node and its children from the specified context
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// type check the node and its children
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// return the node type
    fn typ(&self) -> &Type;

    /// Populate the Refs structure with all the bind ids either refed or bound
    /// by the node and its children
    fn refs(&self, refs: &mut Refs);

    /// return the original expression used to compile this node
    fn spec(&self) -> &Expr;

    /// put the node to sleep, called on unselected branches
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
331
/// Static description of a builtin function, consumed by
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// The name the builtin is registered under. Registering the same name
    /// twice is an error.
    const NAME: &str;
    /// The builtin's function type; `register_builtin` stores a clone of it.
    ///
    /// NOTE(review): this is an associated `const`, so every mention of
    /// `T::TYP` creates a new `LazyLock` and re-runs its initializer; the
    /// value is not cached between uses.
    const TYP: LazyLock<FnType>;

    /// Produce the constructor that will be stored for this builtin. It is
    /// called once per `register_builtin` call, before the duplicate-name
    /// check.
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
338
/// The runtime interface the compiler and `ExecCtx` call into: netidx
/// subscriptions and publishing, variable references and updates, rpcs and
/// timers.
pub trait Rt: Debug + 'static {
    /// Reset the runtime. Called by `ExecCtx::clear` after the environment
    /// has been cleared.
    fn clear(&mut self);

    /// Subscribe to the specified netidx path. When the subscription
    /// updates you are expected to deliver Netidx events to the
    /// expression specified by ref_by.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Called when a subscription is no longer needed
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// List the netidx path, return Value::Null if the path did not
    /// change. When the path did update you should send the output
    /// back as a properly formatted struct with two fields, rows and
    /// columns both containing string arrays.
    ///
    /// NOTE(review): the method itself returns nothing; presumably the
    /// result is delivered as a variable event for `id` — confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// List the table at path, return Value::Null if the path did not
    /// change
    ///
    /// NOTE(review): as with `list`, nothing is returned directly;
    /// presumably the result is delivered as a variable event for `id`.
    fn list_table(&mut self, id: BindId, path: Path);

    /// list or table will no longer be called on this BindId, and
    /// related resources can be cleaned up.
    fn stop_list(&mut self, id: BindId);

    /// Publish the specified value, returning its Id, which must be
    /// used to update the value and unpublish it. If the path is
    /// already published, return an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the specified value
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing the specified id
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// This will be called by the compiler whenever a bound variable
    /// is referenced. The ref_by is the toplevel expression that
    /// contains the variable reference. When a variable event
    /// happens, you should update all the toplevel expressions that
    /// ref that variable.
    ///
    /// ref_var will also be called when a bound lambda expression is
    /// referenced, in that case the ref_by id will be the toplevel
    /// expression containing the call site.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);

    /// The counterpart of `ref_var`.
    /// NOTE(review): presumably called when `ref_by` stops referencing
    /// `id` — confirm against the compiler.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by the ExecCtx when set_var is called on it.
    ///
    /// All expressions that ref the id should be updated when this happens. The
    /// runtime must deliver all set_vars in a single event except that set_vars
    /// for the same variable in the same cycle must be queued and deferred to
    /// the next cycle.
    ///
    /// The runtime MUST NOT change event while a cycle is in
    /// progress. set_var must be queued until the cycle ends and then
    /// presented as a new batch.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Called by the ExecCtx when set_var_now is called on it
    ///
    /// Set the variable right now for the current cycle. This is called when
    /// the compiler knows that nothing can depend on the variable before the
    /// current node, and as a result the only nodes that can depend on the
    /// variable have yet to be executed. It is therefore safe to skip the extra
    /// cycle and set the variable in the event now. The Rt must make sure to
    /// wake up any nodes that depend on the variable, but it need not wake up
    /// nodes before the current node.
    ///
    /// The runtime can assume that the event has already been updated.
    fn set_var_now(&mut self, id: BindId);

    /// This must return results from the same path in the call order.
    ///
    /// when the rpc returns you are expected to deliver a Variable
    /// event with the specified id to the expression specified by
    /// ref_by.
    ///
    /// NOTE(review): this method has no `ref_by` parameter; presumably the
    /// event goes to whichever expressions reference `id` — confirm.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an rpc at the specified path with the specified
    /// procedure level doc and arg spec.
    ///
    /// When the RPC is called the rpc table in event will be
    /// populated under the specified bind id.
    ///
    /// If the procedure is already published an error will be
    /// returned
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// unpublish the rpc published at the specified path.
    fn unpublish_rpc(&mut self, name: Path);

    /// arrange to have a Timer event delivered after timeout. When
    /// the timer expires you are expected to deliver a Variable event
    /// for the id, containing the current time.
    fn set_timer(&mut self, id: BindId, timeout: Duration);
}
443
/// The execution context: the environment, the runtime, the registered
/// builtins and the per-variable value cache.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    /// Registered builtins keyed by name, each with its function type and
    /// constructor. Filled in by `register_builtin`.
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
    /// Interned tag strings, see `tag`.
    tags: FxHashSet<ArcStr>,
    /// The current environment; `with_restored` swaps it temporarily.
    pub env: Env<R, E>,
    /// The value most recently passed to `set_var`/`set_var_now` for each
    /// bind id.
    pub cached: FxHashMap<BindId, Value>,
    /// The runtime implementation.
    pub rt: R,
}
451
452impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
453    pub fn clear(&mut self) {
454        self.env.clear();
455        self.rt.clear();
456    }
457
458    /// Build a new execution context.
459    ///
460    /// This is a very low level interface that you can use to build a
461    /// custom runtime with deep integration to your code. It is very
462    /// difficult to use, and if you don't implement everything
463    /// correctly the semantics of the language can be wrong.
464    ///
465    /// Most likely you want to use the `rt` module instead.
466    pub fn new(user: R) -> Self {
467        Self {
468            env: Env::new(),
469            builtins: FxHashMap::default(),
470            tags: FxHashSet::default(),
471            cached: HashMap::default(),
472            rt: user,
473        }
474    }
475
476    pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
477        let f = T::init(self);
478        match self.builtins.entry(T::NAME) {
479            Entry::Vacant(e) => {
480                e.insert((T::TYP.clone(), f));
481            }
482            Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
483        }
484        Ok(())
485    }
486
487    /// Built in functions should call this when variables are set
488    /// unless they are sure the variable does not need to be
489    /// cached. This will also call the user ctx set_var.
490    pub fn set_var(&mut self, id: BindId, v: Value) {
491        self.cached.insert(id, v.clone());
492        self.rt.set_var(id, v)
493    }
494
495    pub fn set_var_now(&mut self, id: BindId, v: Value) {
496        self.cached.insert(id, v);
497        self.rt.set_var_now(id);
498    }
499
500    fn tag(&mut self, s: &ArcStr) -> ArcStr {
501        match self.tags.get(s) {
502            Some(s) => s.clone(),
503            None => {
504                self.tags.insert(s.clone());
505                s.clone()
506            }
507        }
508    }
509
510    /// Restore the lexical environment to the snapshot `env` for the
511    /// duration of `f` restoring it to it's original value
512    /// afterwords. `by_id` and `lambdas` defined by the closure will
513    /// be retained.
514    pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
515        &mut self,
516        env: Env<R, E>,
517        f: F,
518    ) -> T {
519        let snap = self.env.restore_lexical_env(env);
520        let orig = mem::replace(&mut self.env, snap);
521        let r = f(self);
522        self.env = self.env.restore_lexical_env(orig);
523        r
524    }
525}
526
527/// compile the expression into a node graph in the specified context
528/// and scope, return the root node or an error if compilation failed.
529pub fn compile<R: Rt, E: UserEvent>(
530    ctx: &mut ExecCtx<R, E>,
531    scope: &ModPath,
532    spec: Expr,
533) -> Result<Node<R, E>> {
534    let top_id = spec.id;
535    let env = ctx.env.clone();
536    let st = Instant::now();
537    let mut node = match compiler::compile(ctx, spec, scope, top_id) {
538        Ok(n) => n,
539        Err(e) => {
540            ctx.env = env;
541            return Err(e);
542        }
543    };
544    info!("compile time {:?}", st.elapsed());
545    let st = Instant::now();
546    if let Err(e) = node.typecheck(ctx) {
547        ctx.env = env;
548        return Err(e);
549    }
550    info!("typecheck time {:?}", st.elapsed());
551    Ok(node)
552}