graphix_compiler/
lib.rs

1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14    env::Env,
15    expr::{ExprId, ModPath},
16    typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use futures::channel::mpsc;
23use fxhash::{FxHashMap, FxHashSet};
24use log::info;
25use netidx::{
26    path::Path,
27    publisher::{Id, Val, WriteRequest},
28    subscriber::{self, Dval, SubId, UpdatesFlags, Value},
29};
30use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
31use node::compiler;
32use parking_lot::RwLock;
33use poolshark::{
34    global::{GPooled, Pool},
35    local::LPooled,
36};
37use std::{
38    any::{Any, TypeId},
39    cell::Cell,
40    collections::{hash_map::Entry, HashMap},
41    fmt::Debug,
42    mem,
43    sync::{
44        self,
45        atomic::{AtomicBool, Ordering},
46        LazyLock,
47    },
48    time::Duration,
49};
50use tokio::{task, time::Instant};
51use triomphe::Arc;
52
/// Compiler flags. They are combined into a `BitFlags<CFlag>` and handed to
/// `compile`.
///
/// NOTE(review): the code that interprets these flags is not in this file;
/// the per-flag descriptions below are inferred from the names only and
/// should be confirmed against the compiler.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum CFlag {
    /// presumably: warn about unhandled errors — TODO confirm
    WarnUnhandled,
    /// presumably: warn about unhandled arithmetic errors — TODO confirm
    WarnUnhandledArith,
    /// presumably: warn about unused bindings — TODO confirm
    WarnUnused,
    /// presumably: promote warnings to compile errors — TODO confirm
    WarningsAreErrors,
}
62
/// Process wide switch read by `trace()` (and therefore by `tdbg!`). It
/// starts out `false`.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Turn the process wide trace switch on (`true`) or off (`false`).
///
/// The store uses `Ordering::Relaxed`; the flag is not used to publish any
/// other data.
#[allow(dead_code)]
fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed)
}
70
71#[allow(dead_code)]
72fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
73    let set = if enable {
74        eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
75        let prev = trace();
76        set_trace(true);
77        !prev
78    } else {
79        false
80    };
81    let r = match f() {
82        Err(e) => {
83            eprintln!("traced at {} failed with {e:?}", spec.pos);
84            Err(e)
85        }
86        r => r,
87    };
88    if set {
89        eprintln!("trace disabled at {}", spec.pos);
90        set_trace(false)
91    }
92    r
93}
94
95#[allow(dead_code)]
96fn trace() -> bool {
97    TRACE.load(Ordering::Relaxed)
98}
99
/// Evaluate `$e`, routing it through `dbg!` when tracing is switched on.
///
/// When `$crate::trace()` returns true the expression is wrapped in `dbg!`,
/// otherwise it is evaluated as is. Either way the macro yields the value of
/// the expression.
///
/// NOTE(review): the expansion calls `$crate::trace()`, so that function
/// must be visible from wherever the macro is invoked.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
110
/// Build a `Value::Error` from a tag and a literal message.
///
/// The payload is the pair `($tag.clone(), arcstr::literal!($err))`
/// converted into a `Value` and wrapped in a `triomphe::Arc`.
///
/// The expansion names `Value` unqualified and uses the `arcstr` and
/// `triomphe` crates by path, so all three must be resolvable at the call
/// site.
#[macro_export]
macro_rules! err {
    ($tag:expr, $err:literal) => {{
        let e: Value = ($tag.clone(), arcstr::literal!($err)).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
118
/// Build a `Value::Error` from a tag and a formatted message.
///
/// `errf!(tag, "fmt")` and `errf!(tag, "fmt", args...)` format the message
/// with `compact_str::format_compact!`, convert it to an `ArcStr`, pair it
/// with `$tag.clone()` and wrap the resulting `Value` in a `triomphe::Arc`.
///
/// Any number of format arguments is accepted (positional or named), and a
/// trailing comma is allowed. Earlier versions matched a single token tree
/// only, so calls with two or more arguments did not compile.
///
/// The expansion names `Value` and `ArcStr` unqualified and uses the
/// `compact_str` and `triomphe` crates by path, so all of them must be
/// resolvable at the call site.
#[macro_export]
macro_rules! errf {
    ($tag:expr, $fmt:expr, $($args:tt)+) => {{
        let msg: ArcStr =
            compact_str::format_compact!($fmt, $($args)+).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
    ($tag:expr, $fmt:expr $(,)?) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
132
/// Define an error tag and the error type built from it.
///
/// `defetyp!(NAME, TAG_NAME, "Tag", "type template")` expands to two
/// private statics:
///
/// - `TAG_NAME: ArcStr`, holding the literal tag
/// - `NAME: LazyLock<Type>`, the type obtained by substituting the tag into
///   the template with `format!`, parsing the result with
///   `expr::parser::parse_type` and resolving references against the root
///   module path with `scope_refs`
///
/// Parsing happens on first access of `NAME` and panics with
/// "failed to parse type" if the template is not a valid type.
///
/// `ArcStr` is named unqualified, so it must be in scope at the call site.
#[macro_export]
macro_rules! defetyp {
    ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
        static $tag_name: ArcStr = arcstr::literal!($tag);
        static $name: ::std::sync::LazyLock<$crate::typ::Type> =
            ::std::sync::LazyLock::new(|| {
                let scope = $crate::expr::ModPath::root();
                $crate::expr::parser::parse_type(&format!($typ, $tag))
                    .expect("failed to parse type")
                    .scope_refs(&scope)
            });
    };
}
146
// Defines `CAST_ERR_TAG` (the literal "InvalidCast") and `CAST_ERR`, the
// lazily parsed type "Error<`InvalidCast(string)>".
defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
148
149atomic_id!(LambdaId);
150
151impl From<u64> for LambdaId {
152    fn from(v: u64) -> Self {
153        LambdaId(v)
154    }
155}
156
157atomic_id!(BindId);
158
159impl From<u64> for BindId {
160    fn from(v: u64) -> Self {
161        BindId(v)
162    }
163}
164
165impl TryFrom<Value> for BindId {
166    type Error = anyhow::Error;
167
168    fn try_from(value: Value) -> Result<Self> {
169        match value {
170            Value::U64(id) => Ok(BindId(id)),
171            v => bail!("invalid bind id {v}"),
172        }
173    }
174}
175
/// Embedder defined payload carried in the `user` field of every `Event`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload. `Event::clear` calls this along with clearing its
    /// own fields.
    fn clear(&mut self);
}

/// A `UserEvent` that carries no data, for embedders that do not need a
/// custom payload.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset.
    fn clear(&mut self) {}
}
186
/// Flags controlling how types and origins are formatted.
///
/// The active set is stored per thread; use `format_with_flags` to change
/// it for the duration of a closure.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Dereference type variables and print both the tvar name and the bound
    /// type or "unbound".
    DerefTVars,
    /// Replace common primitives with shorter type names as defined
    /// in core. e.g. Any, instead of the set of every primitive type.
    ReplacePrims,
    /// When formatting an Origin don't print the source, just the location
    NoSource,
    /// When formatting an Origin don't print the origin's parents
    NoParents,
}
202
thread_local! {
    // The print flags in effect on this thread. Each thread starts with
    // `ReplacePrims | NoSource`.
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
}
206
/// Global pool of channel watch batches.
///
/// The pooled element type, `Vec<(BindId, Value)>`, is the batch type sent
/// over the channel passed to `Rt::watch`.
///
/// NOTE(review): the two arguments to `Pool::new` are presumably the pool's
/// capacity limits — confirm their exact meaning against the poolshark docs.
pub static CBATCH_POOL: LazyLock<Pool<Vec<(BindId, Value)>>> =
    LazyLock::new(|| Pool::new(10000, 1000));
210
211/// For the duration of the closure F change the way type variables
212/// are formatted (on this thread only) according to the specified
213/// flags.
214pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
215    flags: G,
216    f: F,
217) -> R {
218    let prev = PRINT_FLAGS.replace(flags.into());
219    let res = f();
220    PRINT_FLAGS.set(prev);
221    res
222}
223
224/// Event represents all the things that happened simultaneously in a
225/// given execution cycle. Event may contain only one update for each
226/// variable and netidx subscription in a given cycle, if more updates
227/// happen simultaneously they must be queued and deferred to later
228/// cycles.
229#[derive(Debug)]
230pub struct Event<E: UserEvent> {
231    pub init: bool,
232    pub variables: FxHashMap<BindId, Value>,
233    pub netidx: FxHashMap<SubId, subscriber::Event>,
234    pub writes: FxHashMap<Id, WriteRequest>,
235    pub rpc_calls: FxHashMap<BindId, RpcCall>,
236    pub user: E,
237}
238
239impl<E: UserEvent> Event<E> {
240    pub fn new(user: E) -> Self {
241        Event {
242            init: false,
243            variables: HashMap::default(),
244            netidx: HashMap::default(),
245            writes: HashMap::default(),
246            rpc_calls: HashMap::default(),
247            user,
248        }
249    }
250
251    pub fn clear(&mut self) {
252        let Self { init, variables, netidx, rpc_calls, writes, user } = self;
253        *init = false;
254        variables.clear();
255        netidx.clear();
256        rpc_calls.clear();
257        writes.clear();
258        user.clear();
259    }
260}
261
262#[derive(Debug, Clone, Default)]
263pub struct Refs {
264    refed: LPooled<FxHashSet<BindId>>,
265    bound: LPooled<FxHashSet<BindId>>,
266}
267
268impl Refs {
269    pub fn clear(&mut self) {
270        self.refed.clear();
271        self.bound.clear();
272    }
273
274    pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
275        for id in &*self.refed {
276            if !self.bound.contains(id) {
277                f(*id);
278            }
279        }
280    }
281}
282
/// A compiled graph node: any boxed `Update` implementation.
pub type Node<R, E> = Box<dyn Update<R, E>>;

/// Constructor for a built-in function, as returned by `BuiltIn::init`.
///
/// It receives the execution context, a function type, a scope, the
/// argument nodes and an expression id, and returns the `Apply`
/// implementing the function, or an error.
///
/// NOTE(review): the `ExprId` is presumably the id of the toplevel
/// expression containing the call — confirm against the callers.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b Scope,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;

/// Constructor for a function application. It receives a scope, the
/// execution context, the argument nodes (mutably), an expression id and a
/// flag, and returns the `Apply` implementing the function, or an error.
///
/// NOTE(review): the meaning of the trailing `bool` cannot be determined
/// from this file — confirm against the code that builds these closures.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a Scope,
            &'b mut ExecCtx<R, E>,
            &'c mut [Node<R, E>],
            ExprId,
            bool,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
310
/// Apply is a kind of node that represents a function application. It
/// does not hold ownership of its arguments, instead those are held
/// by a CallSite node. This allows us to change the function called
/// at runtime without recompiling the arguments.
pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
    /// Update the function with the specified event and return any output
    /// it might generate. `from` holds the argument nodes, which are owned
    /// by the caller.
    fn update(
        &mut self,
        ctx: &mut ExecCtx<R, E>,
        from: &mut [Node<R, E>],
        event: &mut Event<E>,
    ) -> Option<Value>;

    /// delete any internally generated nodes, only needed for
    /// builtins that dynamically generate code at runtime
    ///
    /// The default implementation does nothing.
    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
        ()
    }

    /// apply custom typechecking to the lambda, only needed for
    /// builtins that take lambdas as arguments
    ///
    /// The default implementation accepts everything.
    fn typecheck(
        &mut self,
        _ctx: &mut ExecCtx<R, E>,
        _from: &mut [Node<R, E>],
    ) -> Result<()> {
        Ok(())
    }

    /// return the lambdas type, builtins do not need to implement
    /// this, it is implemented by the BuiltIn wrapper
    ///
    /// The default implementation returns a shared placeholder type with no
    /// arguments, no constraints, no varargs, and `Type::Bottom` for both
    /// the return type and the throws type.
    fn typ(&self) -> Arc<FnType> {
        // built once on first use, then handed out by reference count
        static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
            Arc::new(FnType {
                args: Arc::from_iter([]),
                constraints: Arc::new(RwLock::new(LPooled::take())),
                rtype: Type::Bottom,
                throws: Type::Bottom,
                vargs: None,
            })
        });
        Arc::clone(&*EMPTY)
    }

    /// Populate the Refs structure with all the ids bound and refed by this
    /// node. It is only necessary for builtins to implement this if they create
    /// nodes, such as call sites.
    ///
    /// The default implementation records nothing.
    fn refs<'a>(&self, _refs: &mut Refs) {}

    /// put the node to sleep, used in conditions like select for branches that
    /// are not selected. Any cached values should be cleared on sleep.
    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
}
363
/// Update represents a regular graph node, as opposed to a function
/// application represented by Apply. Regular graph nodes are used for
/// every built in node except for builtin functions.
///
/// A boxed `Update` is what the `Node` alias names, and it is what `compile`
/// returns.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// update the node with the specified event and return any output
    /// it might generate
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// delete the node and its children from the specified context
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// type check the node and its children
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// return the node type
    fn typ(&self) -> &Type;

    /// Populate the Refs structure with all the bind ids either refed or bound
    /// by the node and its children
    fn refs(&self, refs: &mut Refs);

    /// return the original expression used to compile this node
    fn spec(&self) -> &Expr;

    /// put the node to sleep, called on unselected branches
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
391
/// Description of a built-in function that can be registered with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// The name the built-in is registered under. Registering two built-ins
    /// with the same name is an error.
    const NAME: &str;
    /// The function type of the built-in. `register_builtin` stores a clone
    /// of it alongside the init function.
    ///
    /// NOTE(review): this is an associated `const` holding a `LazyLock`, so
    /// every mention of `T::TYP` produces a fresh `LazyLock` and runs the
    /// initializer again; nothing is cached between uses.
    const TYP: LazyLock<FnType>;

    /// Produce the constructor used to instantiate this built-in. Called by
    /// `register_builtin` with the context the built-in is being
    /// registered in.
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
398
/// A handle that can cancel a task started with `Rt::spawn`.
pub trait Abortable {
    /// Request that the task be cancelled.
    fn abort(&self);
}

impl Abortable for task::AbortHandle {
    /// Forwards to tokio's `AbortHandle::abort`.
    fn abort(&self) {
        // fully qualified so the call names tokio's inherent method
        // explicitly rather than relying on method resolution
        task::AbortHandle::abort(self)
    }
}
408
/// The interface between the compiled graph and the runtime that hosts it.
///
/// An implementation is stored in `ExecCtx::rt`. Nodes call these methods to
/// interact with netidx, to set variables, and to schedule timers and
/// tasks. The runtime answers by delivering `Event`s back to the graph.
pub trait Rt: Debug + 'static {
    /// The handle type returned by `spawn`, used to cancel the task.
    type AbortHandle: Abortable;

    /// Reset the runtime. Called by `ExecCtx::clear`.
    fn clear(&mut self);

    /// Subscribe to the specified netidx path
    ///
    /// When the subscription updates you are expected to deliver
    /// Netidx events to the expression specified by ref_by.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Called when a subscription is no longer needed
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// List the netidx path, return Value::Null if the path did not
    /// change. When the path did update you should send the output
    /// back as a properly formatted struct with two fields, rows and
    /// columns both containing string arrays.
    ///
    /// NOTE(review): the method itself returns nothing; "return" here
    /// presumably means delivering a variable event for `id` — confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// List the table at path, return Value::Null if the path did not
    /// change
    ///
    /// NOTE(review): as with `list`, the result is presumably delivered as
    /// a variable event for `id` — confirm.
    fn list_table(&mut self, id: BindId, path: Path);

    /// list or table will no longer be called on this BindId, and
    /// related resources can be cleaned up.
    fn stop_list(&mut self, id: BindId);

    /// Publish the specified value, returning its Id, which must be
    /// used to update the value and unpublish it. If the path is
    /// already published, return an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the specified value
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing the specified id
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// This will be called by the compiler whenever a bound variable
    /// is referenced. The ref_by is the toplevel expression that
    /// contains the variable reference. When a variable event
    /// happens, you should update all the toplevel expressions that
    /// ref that variable.
    ///
    /// ref_var will also be called when a bound lambda expression is
    /// referenced, in that case the ref_by id will be the toplevel
    /// expression containing the call site.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);

    /// NOTE(review): presumably the inverse of `ref_var`, called when
    /// `ref_by` no longer references `id` — confirm against the callers.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by the ExecCtx when set_var is called on it.
    ///
    /// All expressions that ref the id should be updated when this happens. The
    /// runtime must deliver all set_vars in a single event except that set_vars
    /// for the same variable in the same cycle must be queued and deferred to
    /// the next cycle.
    ///
    /// The runtime MUST NOT change event while a cycle is in
    /// progress. set_var must be queued until the cycle ends and then
    /// presented as a new batch.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Notify the RT that a top level variable has been set internally
    ///
    /// This is called when the compiler has determined that it's safe to set a
    /// variable without waiting a cycle. When the updated variable is a
    /// toplevel node this method is called to notify the runtime that it needs
    /// to update any dependent toplevel nodes.
    fn notify_set(&mut self, id: BindId);

    /// This must return results from the same path in the call order.
    ///
    /// When the rpc returns you are expected to deliver a Variable
    /// event with the specified id.
    ///
    /// NOTE(review): earlier wording said the event goes "to the expression
    /// specified by ref_by", but this method has no `ref_by` parameter.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an rpc at the specified path with the specified
    /// procedure level doc and arg spec.
    ///
    /// When the RPC is called the rpc table in event will be
    /// populated under the specified bind id.
    ///
    /// If the procedure is already published an error will be
    /// returned
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// Unpublish the rpc published at `name`.
    fn unpublish_rpc(&mut self, name: Path);

    /// arrange to have a Timer event delivered after timeout. When
    /// the timer expires you are expected to deliver a Variable event
    /// for the id, containing the current time.
    fn set_timer(&mut self, id: BindId, timeout: Duration);

    /// Spawn a task
    ///
    /// When the task completes its output must be delivered as a
    /// variable event using the returned `BindId`
    ///
    /// Calling `abort` must guarantee that if it is called before the
    /// task completes then no update will be delivered.
    fn spawn<F: Future<Output = (BindId, Value)> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Self::AbortHandle;

    /// Ask the runtime to watch a channel
    ///
    /// When event batches arrive via the channel the runtime must
    /// deliver the events as variable updates.
    fn watch(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
}
529
530#[derive(Default)]
531pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>);
532
533impl LibState {
534    /// Look up and return the context global library state of type
535    /// `T`.
536    ///
537    /// If none is registered in this context for `T` then create one
538    /// using `T::default`
539    pub fn get_or_default<T>(&mut self) -> &mut T
540    where
541        T: Default + Any + Send + Sync + 'static,
542    {
543        self.0
544            .entry(TypeId::of::<T>())
545            .or_insert_with(|| {
546                Box::new(T::default()) as Box<dyn Any + Send + Sync + 'static>
547            })
548            .downcast_mut::<T>()
549            .unwrap()
550    }
551
552    /// Look up and return the context global library state of type
553    /// `T`.
554    ///
555    /// If none is registered in this context for `T` then create one
556    /// using the provided function.
557    pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
558    where
559        T: Any + Send + Sync + 'static,
560        F: FnOnce() -> T,
561    {
562        self.0
563            .entry(TypeId::of::<T>())
564            .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync + 'static>)
565            .downcast_mut::<T>()
566            .unwrap()
567    }
568
569    /// Look up and return a reference to the context global library
570    /// state of type `T`.
571    ///
572    /// If none is registered in this context for `T` return `None`
573    pub fn get<T>(&mut self) -> Option<&T>
574    where
575        T: Any + Send + Sync + 'static,
576    {
577        self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
578    }
579
580    /// Look up and return a mutable reference to the context global
581    /// library state of type `T`.
582    ///
583    /// If none is registered return `None`
584    pub fn get_mut<T>(&mut self) -> Option<&mut T>
585    where
586        T: Any + Send + Sync + 'static,
587    {
588        self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
589    }
590
591    /// Set the context global library state of type `T`
592    ///
593    /// Any existing state will be returned
594    pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
595    where
596        T: Any + Send + Sync + 'static,
597    {
598        self.0
599            .insert(
600                TypeId::of::<T>(),
601                Box::new(t) as Box<dyn Any + Send + Sync + 'static>,
602            )
603            .map(|t| t.downcast::<T>().unwrap())
604    }
605
606    /// Remove and refurn the context global state library state of type `T`
607    pub fn remove<T>(&mut self) -> Option<Box<T>>
608    where
609        T: Any + Send + Sync + 'static,
610    {
611        self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
612    }
613}
614
615pub struct ExecCtx<R: Rt, E: UserEvent> {
616    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
617    builtins_allowed: bool,
618    tags: FxHashSet<ArcStr>,
619    /// context global library state for built-in functions
620    pub libstate: LibState,
621    /// the language environment, typdefs, binds, lambdas, etc
622    pub env: Env<R, E>,
623    /// the last value of every bound variable
624    pub cached: FxHashMap<BindId, Value>,
625    /// the runtime
626    pub rt: R,
627}
628
629impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
630    pub fn clear(&mut self) {
631        self.env.clear();
632        self.rt.clear();
633    }
634
635    /// Build a new execution context.
636    ///
637    /// This is a very low level interface that you can use to build a
638    /// custom runtime with deep integration to your code. It is very
639    /// difficult to use, and if you don't implement everything
640    /// correctly the semantics of the language can be wrong.
641    ///
642    /// Most likely you want to use the `rt` module instead.
643    pub fn new(user: R) -> Self {
644        Self {
645            env: Env::new(),
646            builtins: FxHashMap::default(),
647            builtins_allowed: true,
648            libstate: LibState::default(),
649            tags: FxHashSet::default(),
650            cached: HashMap::default(),
651            rt: user,
652        }
653    }
654
655    pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
656        let f = T::init(self);
657        match self.builtins.entry(T::NAME) {
658            Entry::Vacant(e) => {
659                e.insert((T::TYP.clone(), f));
660            }
661            Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
662        }
663        Ok(())
664    }
665
666    /// Built in functions should call this when variables are set
667    /// unless they are sure the variable does not need to be
668    /// cached. This will also call the user ctx set_var.
669    pub fn set_var(&mut self, id: BindId, v: Value) {
670        self.cached.insert(id, v.clone());
671        self.rt.set_var(id, v)
672    }
673
674    fn tag(&mut self, s: &ArcStr) -> ArcStr {
675        match self.tags.get(s) {
676            Some(s) => s.clone(),
677            None => {
678                self.tags.insert(s.clone());
679                s.clone()
680            }
681        }
682    }
683
684    /// Restore the lexical environment to the snapshot `env` for the
685    /// duration of `f` restoring it to it's original value
686    /// afterwords. `by_id` and `lambdas` defined by the closure will
687    /// be retained.
688    pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
689        &mut self,
690        env: Env<R, E>,
691        f: F,
692    ) -> T {
693        let snap = self.env.restore_lexical_env(env);
694        let orig = mem::replace(&mut self.env, snap);
695        let r = f(self);
696        self.env = self.env.restore_lexical_env(orig);
697        r
698    }
699}
700
701#[derive(Debug, Clone)]
702pub struct Scope {
703    pub lexical: ModPath,
704    pub dynamic: ModPath,
705}
706
707impl Scope {
708    pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
709        Self {
710            lexical: ModPath(self.lexical.append(s)),
711            dynamic: ModPath(self.dynamic.append(s)),
712        }
713    }
714
715    pub fn root() -> Self {
716        Self { lexical: ModPath::root(), dynamic: ModPath::root() }
717    }
718}
719
720/// compile the expression into a node graph in the specified context
721/// and scope, return the root node or an error if compilation failed.
722pub fn compile<R: Rt, E: UserEvent>(
723    ctx: &mut ExecCtx<R, E>,
724    flags: BitFlags<CFlag>,
725    scope: &Scope,
726    spec: Expr,
727) -> Result<Node<R, E>> {
728    let top_id = spec.id;
729    let env = ctx.env.clone();
730    let st = Instant::now();
731    let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
732        Ok(n) => n,
733        Err(e) => {
734            ctx.env = env;
735            return Err(e);
736        }
737    };
738    info!("compile time {:?}", st.elapsed());
739    let st = Instant::now();
740    if let Err(e) = node.typecheck(ctx) {
741        ctx.env = env;
742        return Err(e);
743    }
744    info!("typecheck time {:?}", st.elapsed());
745    Ok(node)
746}