graphix_compiler/
lib.rs

1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14    env::Env,
15    expr::{ExprId, ModPath},
16    typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25    path::Path,
26    publisher::{Id, Val, WriteRequest},
27    subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use poolshark::local::LPooled;
33use std::{
34    any::Any,
35    cell::Cell,
36    collections::{hash_map::Entry, HashMap},
37    fmt::Debug,
38    mem,
39    sync::{
40        self,
41        atomic::{AtomicBool, Ordering},
42        LazyLock,
43    },
44    time::Duration,
45};
46use tokio::time::Instant;
47use triomphe::Arc;
48
/// Compiler flags, combined into a `BitFlags<CFlag>` set and passed to
/// [`compile`].
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names only; confirm against the compiler before relying on them.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum CFlag {
    /// Presumably: warn about unhandled errors.
    WarnUnhandled,
    /// Presumably: warn about unhandled arithmetic errors.
    WarnUnhandledArith,
    /// Presumably: warn about unused bindings.
    WarnUnused,
    /// Presumably: promote warnings to errors.
    WarningsAreErrors,
}
58
/// Process-wide trace switch; starts out disabled.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Turn the global trace switch on (`true`) or off (`false`).
#[allow(dead_code)]
fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed);
}
66
67#[allow(dead_code)]
68fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
69    let set = if enable {
70        eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
71        let prev = trace();
72        set_trace(true);
73        !prev
74    } else {
75        false
76    };
77    let r = match f() {
78        Err(e) => {
79            eprintln!("traced at {} failed with {e:?}", spec.pos);
80            Err(e)
81        }
82        r => r,
83    };
84    if set {
85        eprintln!("trace disabled at {}", spec.pos);
86        set_trace(false)
87    }
88    r
89}
90
91#[allow(dead_code)]
92fn trace() -> bool {
93    TRACE.load(Ordering::Relaxed)
94}
95
/// Evaluate `$e`, routing it through `dbg!` when tracing is enabled.
///
/// The expression is evaluated exactly once on either path and its value is
/// the value of the macro.
// NOTE(review): this macro is exported but expands to `$crate::trace()`,
// which is a private function; it can therefore only be expanded from inside
// this crate.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
106
/// Build a `Value::Error` from a tag and a string literal message.
///
/// The error payload is the pair `($tag.clone(), literal!($err))` converted
/// into a `Value`. `Value` and `literal!` are resolved at the call site, so
/// both must be in scope where the macro is used.
#[macro_export]
macro_rules! err {
    ($tag:expr, $err:literal) => {{
        let payload: Value = ($tag.clone(), literal!($err)).into();
        Value::Error(triomphe::Arc::new(payload))
    }};
}
114
/// Build a `Value::Error` from a tag and a formatted message.
///
/// `errf!(tag, "fmt")` and `errf!(tag, "fmt", args...)` format the message
/// with `compact_str::format_compact!` and pair it with `tag.clone()` as the
/// error payload. Any number of format arguments (positional or named) is
/// accepted; previously only a single token tree was matched, so calls with
/// two or more arguments did not match any arm.
///
/// `Value` and `ArcStr` are resolved at the call site, so both must be in
/// scope where the macro is used.
#[macro_export]
macro_rules! errf {
    ($tag:expr, $fmt:expr) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
    ($tag:expr, $fmt:expr, $($args:tt)+) => {{
        // forward the argument tokens untouched so named arguments keep working
        let msg: ArcStr =
            compact_str::format_compact!($fmt, $($args)+).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
128
/// Define an error tag and its lazily parsed error type.
///
/// Expands to two statics:
/// - `$tag_name`: the tag as an `ArcStr` literal.
/// - `$name`: a `LazyLock<Type>` produced by formatting `$typ` with `$tag`,
///   parsing the result as a type, and resolving its references against the
///   root module path. Parsing happens on first access and panics if the
///   formatted string is not a valid type.
///
/// `ArcStr` is named through the `arcstr` crate path, the same way
/// `arcstr::literal!` already is, so call sites do not need to import it.
#[macro_export]
macro_rules! defetyp {
    ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
        static $tag_name: arcstr::ArcStr = arcstr::literal!($tag);
        static $name: ::std::sync::LazyLock<$crate::typ::Type> =
            ::std::sync::LazyLock::new(|| {
                let scope = $crate::expr::ModPath::root();
                $crate::expr::parser::parse_type(&format!($typ, $tag))
                    .expect("failed to parse type")
                    .scope_refs(&scope)
            });
    };
}
142
// Defines CAST_ERR_TAG ("InvalidCast") and CAST_ERR, the type parsed from
// "Error<`InvalidCast(string)>".
defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
144
145atomic_id!(LambdaId);
146
147impl From<u64> for LambdaId {
148    fn from(v: u64) -> Self {
149        LambdaId(v)
150    }
151}
152
153atomic_id!(BindId);
154
155impl From<u64> for BindId {
156    fn from(v: u64) -> Self {
157        BindId(v)
158    }
159}
160
161impl TryFrom<Value> for BindId {
162    type Error = anyhow::Error;
163
164    fn try_from(value: Value) -> Result<Self> {
165        match value {
166            Value::U64(id) => Ok(BindId(id)),
167            v => bail!("invalid bind id {v}"),
168        }
169    }
170}
171
/// User-defined payload carried in the `user` field of [`Event`].
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; called by `Event::clear`.
    fn clear(&mut self);
}
175
/// A [`UserEvent`] that carries no data.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    // nothing to reset
    fn clear(&mut self) {}
}
182
/// Per-thread formatting options; see [`format_with_flags`] for how to
/// change them temporarily.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Dereference type variables and print both the tvar name and the bound
    /// type or "unbound".
    DerefTVars,
    /// Replace common primitives with shorter type names as defined
    /// in core. e.g. Any, instead of the set of every primitive type.
    ReplacePrims,
    /// When formatting an Origin don't print the source, just the location
    NoSource,
    /// When formatting an Origin don't print the origin's parents
    NoParents,
}
198
thread_local! {
    // The print flags in effect on this thread; the default set is
    // ReplacePrims | NoSource.
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
}
202
203/// For the duration of the closure F change the way type variables
204/// are formatted (on this thread only) according to the specified
205/// flags.
206pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
207    flags: G,
208    f: F,
209) -> R {
210    let prev = PRINT_FLAGS.replace(flags.into());
211    let res = f();
212    PRINT_FLAGS.set(prev);
213    res
214}
215
/// Event represents all the things that happened simultaneously in a
/// given execution cycle. Event may contain only one update for each
/// variable and netidx subscription in a given cycle, if more updates
/// happen simultaneously they must be queued and deferred to later
/// cycles.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// Starts out `false` and is reset to `false` by `clear`.
    // NOTE(review): presumably marks an initialization cycle — confirm.
    pub init: bool,
    /// Variable updates in this cycle, keyed by bind id.
    pub variables: FxHashMap<BindId, Value>,
    /// Netidx subscription events in this cycle, keyed by subscription id.
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// Write requests in this cycle, keyed by published id.
    pub writes: FxHashMap<Id, WriteRequest>,
    /// RPC calls in this cycle, keyed by bind id.
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// User-defined payload.
    pub user: E,
}
230
231impl<E: UserEvent> Event<E> {
232    pub fn new(user: E) -> Self {
233        Event {
234            init: false,
235            variables: HashMap::default(),
236            netidx: HashMap::default(),
237            writes: HashMap::default(),
238            rpc_calls: HashMap::default(),
239            user,
240        }
241    }
242
243    pub fn clear(&mut self) {
244        let Self { init, variables, netidx, rpc_calls, writes, user } = self;
245        *init = false;
246        variables.clear();
247        netidx.clear();
248        rpc_calls.clear();
249        writes.clear();
250        user.clear();
251    }
252}
253
/// Accumulator for the bind ids a node references and the bind ids it binds.
#[derive(Debug, Clone, Default)]
pub struct Refs {
    // ids that are referenced
    refed: LPooled<FxHashSet<BindId>>,
    // ids that are bound
    bound: LPooled<FxHashSet<BindId>>,
}
259
260impl Refs {
261    pub fn clear(&mut self) {
262        self.refed.clear();
263        self.bound.clear();
264    }
265
266    pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
267        for id in &*self.refed {
268            if !self.bound.contains(id) {
269                f(*id);
270            }
271        }
272    }
273}
274
/// A boxed, dynamically dispatched graph node.
pub type Node<R, E> = Box<dyn Update<R, E>>;
276
/// Shared, thread-safe constructor for a builtin's [`Apply`] instance, as
/// returned by `BuiltIn::init`.
///
/// It receives the execution context, the function type, the scope, the
/// argument nodes, and an expression id.
// NOTE(review): the expression id is presumably that of the enclosing
// toplevel expression — confirm against the call site.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b Scope,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
289
/// Shared, thread-safe constructor producing an [`Apply`] instance from a
/// scope, the execution context, mutable argument nodes, an expression id
/// and a boolean.
// NOTE(review): the meaning of the trailing `bool` is not visible in this
// file — check the callers before relying on it.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a Scope,
            &'b mut ExecCtx<R, E>,
            &'c mut [Node<R, E>],
            ExprId,
            bool,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
302
303/// Apply is a kind of node that represents a function application. It
304/// does not hold ownership of it's arguments, instead those are held
305/// by a CallSite node. This allows us to change the function called
306/// at runtime without recompiling the arguments.
307pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
308    fn update(
309        &mut self,
310        ctx: &mut ExecCtx<R, E>,
311        from: &mut [Node<R, E>],
312        event: &mut Event<E>,
313    ) -> Option<Value>;
314
315    /// delete any internally generated nodes, only needed for
316    /// builtins that dynamically generate code at runtime
317    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
318        ()
319    }
320
321    /// apply custom typechecking to the lambda, only needed for
322    /// builtins that take lambdas as arguments
323    fn typecheck(
324        &mut self,
325        _ctx: &mut ExecCtx<R, E>,
326        _from: &mut [Node<R, E>],
327    ) -> Result<()> {
328        Ok(())
329    }
330
331    /// return the lambdas type, builtins do not need to implement
332    /// this, it is implemented by the BuiltIn wrapper
333    fn typ(&self) -> Arc<FnType> {
334        static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
335            Arc::new(FnType {
336                args: Arc::from_iter([]),
337                constraints: Arc::new(RwLock::new(LPooled::take())),
338                rtype: Type::Bottom,
339                throws: Type::Bottom,
340                vargs: None,
341            })
342        });
343        Arc::clone(&*EMPTY)
344    }
345
346    /// Populate the Refs structure with all the ids bound and refed by this
347    /// node. It is only necessary for builtins to implement this if they create
348    /// nodes, such as call sites.
349    fn refs<'a>(&self, _refs: &mut Refs) {}
350
351    /// put the node to sleep, used in conditions like select for branches that
352    /// are not selected. Any cached values should be cleared on sleep.
353    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
354}
355
/// Update represents a regular graph node, as opposed to a function
/// application represented by Apply. Regular graph nodes are used for
/// every built in node except for builtin functions.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// update the node with the specified event and return any output
    /// it might generate
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// delete the node and its children from the specified context
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// type check the node and its children
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// return the node type
    fn typ(&self) -> &Type;

    /// Populate the Refs structure with all the bind ids either refed or bound
    /// by the node and its children
    fn refs(&self, refs: &mut Refs);

    /// return the original expression used to compile this node
    fn spec(&self) -> &Expr;

    /// put the node to sleep, called on unselected branches
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
383
/// A built in function that can be registered with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// The name the builtin is registered under; registering the same name
    /// twice is an error.
    const NAME: &str;
    /// The builtin's function type, cloned into the context at registration.
    // NOTE(review): because this is an associated `const`, every use of
    // `T::TYP` creates a fresh `LazyLock` and reruns its initializer; the
    // value is not cached across uses.
    const TYP: LazyLock<FnType>;

    /// Produce the constructor used to instantiate this builtin; invoked by
    /// `ExecCtx::register_builtin`.
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
390
/// The runtime interface the compiler and `ExecCtx` call into for
/// subscriptions, publishing, variables, rpcs and timers.
pub trait Rt: Debug + 'static {
    /// Reset the runtime; called by `ExecCtx::clear`.
    fn clear(&mut self);

    /// Subscribe to the specified netidx path. When the subscription
    /// updates you are expected to deliver Netidx events to the
    /// expression specified by ref_by.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Called when a subscription is no longer needed
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// List the netidx path, return Value::Null if the path did not
    /// change. When the path did update you should send the output
    /// back as a properly formatted struct with two fields, rows and
    /// columns both containing string arrays.
    // NOTE(review): this method has no return value, so the result is
    // presumably delivered as a variable event for `id` — confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// List the table at path, return Value::Null if the path did not
    /// change
    // NOTE(review): as with `list`, the result is presumably delivered as a
    // variable event for `id` — confirm.
    fn list_table(&mut self, id: BindId, path: Path);

    /// list or table will no longer be called on this BindId, and
    /// related resources can be cleaned up.
    fn stop_list(&mut self, id: BindId);

    /// Publish the specified value, returning its Id, which must be
    /// used to update the value and unpublish it. If the path is
    /// already published, return an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the specified value
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing the specified id
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// This will be called by the compiler whenever a bound variable
    /// is referenced. The ref_by is the toplevel expression that
    /// contains the variable reference. When a variable event
    /// happens, you should update all the toplevel expressions that
    /// ref that variable.
    ///
    /// ref_var will also be called when a bound lambda expression is
    /// referenced, in that case the ref_by id will be the toplevel
    /// expression containing the call site.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    // NOTE(review): presumably the inverse of `ref_var` (ref_by no longer
    // references id) — confirm.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by the ExecCtx when set_var is called on it.
    ///
    /// All expressions that ref the id should be updated when this happens. The
    /// runtime must deliver all set_vars in a single event except that set_vars
    /// for the same variable in the same cycle must be queued and deferred to
    /// the next cycle.
    ///
    /// The runtime MUST NOT change event while a cycle is in
    /// progress. set_var must be queued until the cycle ends and then
    /// presented as a new batch.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Notify the RT that a top level variable has been set internally
    ///
    /// This is called when the compiler has determined that it's safe to set a
    /// variable without waiting a cycle. When the updated variable is a
    /// toplevel node this method is called to notify the runtime that it needs
    /// to update any dependent toplevel nodes.
    fn notify_set(&mut self, id: BindId);

    /// This must return results from the same path in the call order.
    ///
    /// when the rpc returns you are expected to deliver a Variable
    /// event with the specified id.
    // NOTE(review): the earlier wording mentioned a `ref_by` expression, but
    // this method takes no such parameter; the target is presumably whatever
    // references `id` — confirm.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an rpc at the specified path with the specified
    /// procedure level doc and arg spec.
    ///
    /// When the RPC is called the rpc table in event will be
    /// populated under the specified bind id.
    ///
    /// If the procedure is already published an error will be
    /// returned
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// unpublish the rpc published at the path `name`.
    fn unpublish_rpc(&mut self, name: Path);

    /// arrange to have a Timer event delivered after timeout. When
    /// the timer expires you are expected to deliver a Variable event
    /// for the id, containing the current time.
    fn set_timer(&mut self, id: BindId, timeout: Duration);
}
490
/// The execution context: the environment, the registered builtins, the
/// variable cache and the runtime.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    // registered builtins keyed by name: (function type, constructor)
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
    // initialized to true by `new`
    // NOTE(review): where this is consulted is not visible in this file.
    builtins_allowed: bool,
    // interned tag strings, see `tag`
    tags: FxHashSet<ArcStr>,
    /// The current environment.
    pub env: Env<R, E>,
    /// Last value passed to `set_var` for each bind id.
    pub cached: FxHashMap<BindId, Value>,
    /// The runtime.
    pub rt: R,
}
499
500impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
501    pub fn clear(&mut self) {
502        self.env.clear();
503        self.rt.clear();
504    }
505
506    /// Build a new execution context.
507    ///
508    /// This is a very low level interface that you can use to build a
509    /// custom runtime with deep integration to your code. It is very
510    /// difficult to use, and if you don't implement everything
511    /// correctly the semantics of the language can be wrong.
512    ///
513    /// Most likely you want to use the `rt` module instead.
514    pub fn new(user: R) -> Self {
515        Self {
516            env: Env::new(),
517            builtins: FxHashMap::default(),
518            builtins_allowed: true,
519            tags: FxHashSet::default(),
520            cached: HashMap::default(),
521            rt: user,
522        }
523    }
524
525    pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
526        let f = T::init(self);
527        match self.builtins.entry(T::NAME) {
528            Entry::Vacant(e) => {
529                e.insert((T::TYP.clone(), f));
530            }
531            Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
532        }
533        Ok(())
534    }
535
536    /// Built in functions should call this when variables are set
537    /// unless they are sure the variable does not need to be
538    /// cached. This will also call the user ctx set_var.
539    pub fn set_var(&mut self, id: BindId, v: Value) {
540        self.cached.insert(id, v.clone());
541        self.rt.set_var(id, v)
542    }
543
544    fn tag(&mut self, s: &ArcStr) -> ArcStr {
545        match self.tags.get(s) {
546            Some(s) => s.clone(),
547            None => {
548                self.tags.insert(s.clone());
549                s.clone()
550            }
551        }
552    }
553
554    /// Restore the lexical environment to the snapshot `env` for the
555    /// duration of `f` restoring it to it's original value
556    /// afterwords. `by_id` and `lambdas` defined by the closure will
557    /// be retained.
558    pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
559        &mut self,
560        env: Env<R, E>,
561        f: F,
562    ) -> T {
563        let snap = self.env.restore_lexical_env(env);
564        let orig = mem::replace(&mut self.env, snap);
565        let r = f(self);
566        self.env = self.env.restore_lexical_env(orig);
567        r
568    }
569}
570
/// A pair of module paths describing where an expression lives.
// NOTE(review): the precise distinction between the lexical and dynamic
// paths is not visible in this file; `append` extends both identically.
#[derive(Debug, Clone)]
pub struct Scope {
    /// The lexical module path.
    pub lexical: ModPath,
    /// The dynamic module path.
    pub dynamic: ModPath,
}
576
577impl Scope {
578    pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
579        Self {
580            lexical: ModPath(self.lexical.append(s)),
581            dynamic: ModPath(self.dynamic.append(s)),
582        }
583    }
584
585    pub fn root() -> Self {
586        Self { lexical: ModPath::root(), dynamic: ModPath::root() }
587    }
588}
589
590/// compile the expression into a node graph in the specified context
591/// and scope, return the root node or an error if compilation failed.
592pub fn compile<R: Rt, E: UserEvent>(
593    ctx: &mut ExecCtx<R, E>,
594    flags: BitFlags<CFlag>,
595    scope: &Scope,
596    spec: Expr,
597) -> Result<Node<R, E>> {
598    let top_id = spec.id;
599    let env = ctx.env.clone();
600    let st = Instant::now();
601    let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
602        Ok(n) => n,
603        Err(e) => {
604            ctx.env = env;
605            return Err(e);
606        }
607    };
608    info!("compile time {:?}", st.elapsed());
609    let st = Instant::now();
610    if let Err(e) = node.typecheck(ctx) {
611        ctx.env = env;
612        return Err(e);
613    }
614    info!("typecheck time {:?}", st.elapsed());
615    Ok(node)
616}