graphix_compiler/
lib.rs

1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14    env::Env,
15    expr::{ExprId, ModPath},
16    typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use futures::channel::mpsc;
23use fxhash::{FxHashMap, FxHashSet};
24use log::info;
25use netidx::{
26    path::Path,
27    publisher::{Id, Val, WriteRequest},
28    subscriber::{self, Dval, SubId, UpdatesFlags, Value},
29};
30use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
31use node::compiler;
32use parking_lot::RwLock;
33use poolshark::{global::GPooled, local::LPooled};
34use std::{
35    any::{Any, TypeId},
36    cell::Cell,
37    collections::{hash_map::Entry, HashMap},
38    fmt::Debug,
39    mem,
40    sync::{
41        self,
42        atomic::{AtomicBool, Ordering},
43        LazyLock,
44    },
45    time::Duration,
46};
47use tokio::{task, time::Instant};
48use triomphe::Arc;
49
/// Compiler option flags.
///
/// `#[bitflags]` assigns every variant its own bit, so a set of options is
/// carried around as a `BitFlags<CFlag>` (the type `compile` accepts).
// NOTE(review): the per-variant notes below are inferred from the variant
// names only; the code that interprets the flags is not in this file.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum CFlag {
    /// Presumably: warn about unhandled errors — TODO confirm.
    WarnUnhandled,
    /// Presumably: warn about unhandled arithmetic errors — TODO confirm.
    WarnUnhandledArith,
    /// Presumably: warn about unused bindings — TODO confirm.
    WarnUnused,
    /// Presumably: promote warnings to errors — TODO confirm.
    WarningsAreErrors,
}
59
// Process-wide tracing switch: written by `set_trace`, read by `trace`.
// Starts out disabled.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);
62
63#[allow(dead_code)]
64fn set_trace(b: bool) {
65    TRACE.store(b, Ordering::Relaxed)
66}
67
68#[allow(dead_code)]
69fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
70    let set = if enable {
71        eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
72        let prev = trace();
73        set_trace(true);
74        !prev
75    } else {
76        false
77    };
78    let r = match f() {
79        Err(e) => {
80            eprintln!("traced at {} failed with {e:?}", spec.pos);
81            Err(e)
82        }
83        r => r,
84    };
85    if set {
86        eprintln!("trace disabled at {}", spec.pos);
87        set_trace(false)
88    }
89    r
90}
91
92#[allow(dead_code)]
93fn trace() -> bool {
94    TRACE.load(Ordering::Relaxed)
95}
96
/// Conditional `dbg!`.
///
/// Evaluates to `$e` in both branches; when `$crate::trace()` returns true
/// the expression is additionally routed through `dbg!`, which prints it
/// (with file and line) to stderr.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
107
/// Build a `Value::Error` from a tag and a string literal message.
///
/// The payload is the pair `($tag.clone(), literal!($err))` converted into a
/// `Value` and wrapped in a `triomphe::Arc`.
///
/// `Value` and `literal!` are referenced unqualified, so both must be in
/// scope wherever the macro is expanded.
#[macro_export]
macro_rules! err {
    ($tag:expr, $err:literal) => {{
        let e: Value = ($tag.clone(), literal!($err)).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
115
/// Build a `Value::Error` from a tag and a formatted message.
///
/// Usage: `errf!(tag, "fmt")` or `errf!(tag, "fmt {} {}", a, b)`. The
/// message is rendered with `compact_str::format_compact!`, copied into an
/// `ArcStr`, paired with `$tag.clone()`, converted into a `Value` and wrapped
/// in a `triomphe::Arc`.
///
/// Any number of format arguments is accepted: everything after the format
/// string is forwarded verbatim to `format_compact!`.
///
/// `ArcStr` and `Value` are referenced unqualified, so both must be in scope
/// wherever the macro is expanded.
#[macro_export]
macro_rules! errf {
    ($tag:expr, $fmt:expr, $($args:tt)+) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt, $($args)+).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
    ($tag:expr, $fmt:expr) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
129
/// Define an error type constant together with its tag.
///
/// Expands to two statics:
/// - `$tag_name`: an `ArcStr` holding the literal `$tag`
/// - `$name`: a lazily initialised `Type`, obtained by formatting `$typ`
///   with `$tag`, parsing the result with `parse_type` and resolving its
///   references against the root module path.
///
/// # Panics
///
/// The first access to `$name` panics with "failed to parse type" if the
/// formatted string does not parse.
#[macro_export]
macro_rules! defetyp {
    ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
        static $tag_name: ArcStr = arcstr::literal!($tag);
        static $name: ::std::sync::LazyLock<$crate::typ::Type> =
            ::std::sync::LazyLock::new(|| {
                let scope = $crate::expr::ModPath::root();
                $crate::expr::parser::parse_type(&format!($typ, $tag))
                    .expect("failed to parse type")
                    .scope_refs(&scope)
            });
    };
}
143
// Defines `CAST_ERR_TAG` ("InvalidCast") and `CAST_ERR`, the type parsed
// from "Error<`InvalidCast(string)>".
defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
145
146atomic_id!(LambdaId);
147
148impl From<u64> for LambdaId {
149    fn from(v: u64) -> Self {
150        LambdaId(v)
151    }
152}
153
154atomic_id!(BindId);
155
156impl From<u64> for BindId {
157    fn from(v: u64) -> Self {
158        BindId(v)
159    }
160}
161
162impl TryFrom<Value> for BindId {
163    type Error = anyhow::Error;
164
165    fn try_from(value: Value) -> Result<Self> {
166        match value {
167            Value::U64(id) => Ok(BindId(id)),
168            v => bail!("invalid bind id {v}"),
169        }
170    }
171}
172
/// Custom payload carried in the `user` field of every `Event`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; `Event::clear` calls this alongside clearing its
    /// own fields.
    fn clear(&mut self);
}
176
/// `UserEvent` implementation that carries no data.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset.
    fn clear(&mut self) {}
}
183
/// Options controlling how types and origins are formatted.
///
/// The active set is stored per thread and can be swapped temporarily with
/// `format_with_flags`.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Dereference type variables and print both the tvar name and the bound
    /// type or "unbound".
    DerefTVars,
    /// Replace common primitives with shorter type names as defined
    /// in core, e.g. `Any` instead of the set of every primitive type.
    ReplacePrims,
    /// When formatting an Origin, don't print the source, just the location.
    NoSource,
    /// When formatting an Origin, don't print the origin's parents.
    NoParents,
}
199
// Per-thread formatting flags. Every thread starts with
// `ReplacePrims | NoSource`; `format_with_flags` swaps the value temporarily.
thread_local! {
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
}
203
204/// For the duration of the closure F change the way type variables
205/// are formatted (on this thread only) according to the specified
206/// flags.
207pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
208    flags: G,
209    f: F,
210) -> R {
211    let prev = PRINT_FLAGS.replace(flags.into());
212    let res = f();
213    PRINT_FLAGS.set(prev);
214    res
215}
216
/// Event represents all the things that happened simultaneously in a
/// given execution cycle. Event may contain only one update for each
/// variable and netidx subscription in a given cycle; if more updates
/// happen simultaneously they must be queued and deferred to later
/// cycles.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// Starts out `false` and is reset to `false` by `clear`.
    // NOTE(review): presumably marks a node's initial cycle — the readers
    // of this flag are not in this file, confirm.
    pub init: bool,
    /// Variable updates in this cycle, keyed by bind id.
    pub variables: FxHashMap<BindId, Value>,
    /// Netidx subscription events in this cycle, keyed by subscription id.
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// Write requests in this cycle, keyed by publisher id.
    pub writes: FxHashMap<Id, WriteRequest>,
    /// RPC calls in this cycle, keyed by bind id.
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// User-defined event payload.
    pub user: E,
}
231
232impl<E: UserEvent> Event<E> {
233    pub fn new(user: E) -> Self {
234        Event {
235            init: false,
236            variables: HashMap::default(),
237            netidx: HashMap::default(),
238            writes: HashMap::default(),
239            rpc_calls: HashMap::default(),
240            user,
241        }
242    }
243
244    pub fn clear(&mut self) {
245        let Self { init, variables, netidx, rpc_calls, writes, user } = self;
246        *init = false;
247        variables.clear();
248        netidx.clear();
249        rpc_calls.clear();
250        writes.clear();
251        user.clear();
252    }
253}
254
/// Accumulator for the bind ids a node (and its children) reference and
/// bind; it is filled in by the `refs` methods of `Update` and `Apply`.
#[derive(Debug, Clone, Default)]
pub struct Refs {
    // ids that are referenced
    refed: LPooled<FxHashSet<BindId>>,
    // ids that are bound
    bound: LPooled<FxHashSet<BindId>>,
}
260
261impl Refs {
262    pub fn clear(&mut self) {
263        self.refed.clear();
264        self.bound.clear();
265    }
266
267    pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
268        for id in &*self.refed {
269            if !self.bound.contains(id) {
270                f(*id);
271            }
272        }
273    }
274}
275
/// A compiled graph node: a boxed `Update` trait object.
pub type Node<R, E> = Box<dyn Update<R, E>>;
277
/// Shared, thread-safe constructor for a built-in function.
///
/// Called with the execution context, the function's type, the scope, the
/// argument nodes and an expression id; returns the `Apply` implementation
/// or an error.
// NOTE(review): the `ExprId` is presumably the id of the top-level
// expression containing the call — confirm against the compiler.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b Scope,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
290
/// Shared, thread-safe constructor for a function application.
///
/// Called with the scope, the execution context, the (mutable) argument
/// nodes, an expression id and a boolean; returns the `Apply`
/// implementation or an error.
// NOTE(review): the meaning of the trailing `bool` is not visible in this
// file — document it once confirmed against the compiler.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a Scope,
            &'b mut ExecCtx<R, E>,
            &'c mut [Node<R, E>],
            ExprId,
            bool,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
303
304/// Apply is a kind of node that represents a function application. It
305/// does not hold ownership of it's arguments, instead those are held
306/// by a CallSite node. This allows us to change the function called
307/// at runtime without recompiling the arguments.
308pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
309    fn update(
310        &mut self,
311        ctx: &mut ExecCtx<R, E>,
312        from: &mut [Node<R, E>],
313        event: &mut Event<E>,
314    ) -> Option<Value>;
315
316    /// delete any internally generated nodes, only needed for
317    /// builtins that dynamically generate code at runtime
318    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
319        ()
320    }
321
322    /// apply custom typechecking to the lambda, only needed for
323    /// builtins that take lambdas as arguments
324    fn typecheck(
325        &mut self,
326        _ctx: &mut ExecCtx<R, E>,
327        _from: &mut [Node<R, E>],
328    ) -> Result<()> {
329        Ok(())
330    }
331
332    /// return the lambdas type, builtins do not need to implement
333    /// this, it is implemented by the BuiltIn wrapper
334    fn typ(&self) -> Arc<FnType> {
335        static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
336            Arc::new(FnType {
337                args: Arc::from_iter([]),
338                constraints: Arc::new(RwLock::new(LPooled::take())),
339                rtype: Type::Bottom,
340                throws: Type::Bottom,
341                vargs: None,
342            })
343        });
344        Arc::clone(&*EMPTY)
345    }
346
347    /// Populate the Refs structure with all the ids bound and refed by this
348    /// node. It is only necessary for builtins to implement this if they create
349    /// nodes, such as call sites.
350    fn refs<'a>(&self, _refs: &mut Refs) {}
351
352    /// put the node to sleep, used in conditions like select for branches that
353    /// are not selected. Any cached values should be cleared on sleep.
354    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
355}
356
/// Update represents a regular graph node, as opposed to a function
/// application represented by Apply. Regular graph nodes are used for
/// every built-in node except for builtin functions.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// Update the node with the specified event and return any output
    /// it might generate.
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// Delete the node and its children from the specified context.
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// Type check the node and its children.
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// Return the node's type.
    fn typ(&self) -> &Type;

    /// Populate the Refs structure with all the bind ids either refed or
    /// bound by the node and its children.
    fn refs(&self, refs: &mut Refs);

    /// Return the original expression used to compile this node.
    fn spec(&self) -> &Expr;

    /// Put the node to sleep; called on unselected branches.
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
384
/// A built-in function that can be registered with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// Name the builtin is registered under; registering the same name
    /// twice is an error.
    const NAME: &str;
    /// The builtin's function type.
    // NOTE(review): an associated `const` is instantiated afresh at every
    // use site, so this `LazyLock` is not a shared cache — each use that
    // dereferences it runs the initialiser again.
    const TYP: LazyLock<FnType>;

    /// Produce the init function stored in the context at registration.
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
391
/// Handle that can cancel a task started with `Rt::spawn`.
pub trait Abortable {
    /// Request cancellation of the associated task.
    fn abort(&self);
}
395
/// Tokio abort handles are abortable as is.
impl Abortable for task::AbortHandle {
    fn abort(&self) {
        // Fully qualified so this unambiguously names the inherent
        // `AbortHandle::abort`, not this trait method.
        task::AbortHandle::abort(self)
    }
}
401
/// The runtime interface the compiler and the compiled nodes use to talk to
/// the outside world: netidx subscriptions and publications, variables,
/// RPCs, timers and background tasks.
pub trait Rt: Debug + 'static {
    /// Handle type returned by `spawn`, used to cancel the spawned task.
    type AbortHandle: Abortable;

    /// Reset the runtime; called from `ExecCtx::clear`.
    fn clear(&mut self);

    /// Subscribe to the specified netidx path.
    ///
    /// When the subscription updates you are expected to deliver
    /// Netidx events to the expression specified by ref_by.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Called when a subscription is no longer needed.
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// List the netidx path, return Value::Null if the path did not
    /// change. When the path did update you should send the output
    /// back as a properly formatted struct with two fields, rows and
    /// columns, both containing string arrays.
    // NOTE(review): the method itself returns nothing, so the result is
    // presumably delivered as a variable event for `id` — confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// List the table at path, return Value::Null if the path did not
    /// change.
    // NOTE(review): as with `list`, the result is presumably delivered as a
    // variable event for `id` — confirm.
    fn list_table(&mut self, id: BindId, path: Path);

    /// list or list_table will no longer be called on this BindId, and
    /// related resources can be cleaned up.
    fn stop_list(&mut self, id: BindId);

    /// Publish the specified value, returning its `Val`, which must be
    /// used to update the value and unpublish it. If the path is
    /// already published, return an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the specified published value.
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing the specified value.
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// This will be called by the compiler whenever a bound variable
    /// is referenced. The ref_by is the toplevel expression that
    /// contains the variable reference. When a variable event
    /// happens, you should update all the toplevel expressions that
    /// ref that variable.
    ///
    /// ref_var will also be called when a bound lambda expression is
    /// referenced, in that case the ref_by id will be the toplevel
    /// expression containing the call site.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    // NOTE(review): presumably the inverse of `ref_var`, called when the
    // reference from `ref_by` to `id` goes away — confirm.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by the ExecCtx when set_var is called on it.
    ///
    /// All expressions that ref the id should be updated when this happens. The
    /// runtime must deliver all set_vars in a single event except that set_vars
    /// for the same variable in the same cycle must be queued and deferred to
    /// the next cycle.
    ///
    /// The runtime MUST NOT change event while a cycle is in
    /// progress. set_var must be queued until the cycle ends and then
    /// presented as a new batch.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Notify the RT that a top level variable has been set internally.
    ///
    /// This is called when the compiler has determined that it's safe to set a
    /// variable without waiting a cycle. When the updated variable is a
    /// toplevel node this method is called to notify the runtime that it needs
    /// to update any dependent toplevel nodes.
    fn notify_set(&mut self, id: BindId);

    /// This must return results from the same path in the call order.
    ///
    /// When the rpc returns you are expected to deliver a Variable
    /// event with the specified id to the expression specified by
    /// ref_by.
    // NOTE(review): this method has no `ref_by` parameter; the sentence
    // above presumably refers to whichever expressions ref `id`.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an rpc at the specified path with the specified
    /// procedure level doc and arg spec.
    ///
    /// When the RPC is called the rpc table in event will be
    /// populated under the specified bind id.
    ///
    /// If the procedure is already published an error will be
    /// returned.
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// Unpublish the rpc published at the path `name`.
    fn unpublish_rpc(&mut self, name: Path);

    /// Arrange to have a Timer event delivered after timeout. When
    /// the timer expires you are expected to deliver a Variable event
    /// for the id, containing the current time.
    fn set_timer(&mut self, id: BindId, timeout: Duration);

    /// Spawn a task.
    ///
    /// When the task completes its output must be delivered as a
    /// variable event using the `BindId` the task resolves to.
    ///
    /// Calling `abort` must guarantee that if it is called before the
    /// task completes then no update will be delivered.
    fn spawn<F: Future<Output = (BindId, Value)> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Self::AbortHandle;

    /// Ask the runtime to watch a channel.
    ///
    /// When event batches arrive via the channel the runtime must
    /// deliver the events as variable updates.
    fn watch(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
}
522
523#[derive(Default)]
524pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>);
525
526impl LibState {
527    /// Look up and return the context global library state of type
528    /// `T`.
529    ///
530    /// If none is registered in this context for `T` then create one
531    /// using `T::default`
532    pub fn get_or_default<T>(&mut self) -> &mut T
533    where
534        T: Default + Any + Send + Sync + 'static,
535    {
536        self.0
537            .entry(TypeId::of::<T>())
538            .or_insert_with(|| {
539                Box::new(T::default()) as Box<dyn Any + Send + Sync + 'static>
540            })
541            .downcast_mut::<T>()
542            .unwrap()
543    }
544
545    /// Look up and return the context global library state of type
546    /// `T`.
547    ///
548    /// If none is registered in this context for `T` then create one
549    /// using the provided function.
550    pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
551    where
552        T: Any + Send + Sync + 'static,
553        F: FnOnce() -> T,
554    {
555        self.0
556            .entry(TypeId::of::<T>())
557            .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync + 'static>)
558            .downcast_mut::<T>()
559            .unwrap()
560    }
561
562    /// Look up and return a reference to the context global library
563    /// state of type `T`.
564    ///
565    /// If none is registered in this context for `T` return `None`
566    pub fn get<T>(&mut self) -> Option<&T>
567    where
568        T: Any + Send + Sync + 'static,
569    {
570        self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
571    }
572
573    /// Look up and return a mutable reference to the context global
574    /// library state of type `T`.
575    ///
576    /// If none is registered return `None`
577    pub fn get_mut<T>(&mut self) -> Option<&mut T>
578    where
579        T: Any + Send + Sync + 'static,
580    {
581        self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
582    }
583
584    /// Set the context global library state of type `T`
585    ///
586    /// Any existing state will be returned
587    pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
588    where
589        T: Any + Send + Sync + 'static,
590    {
591        self.0
592            .insert(
593                TypeId::of::<T>(),
594                Box::new(t) as Box<dyn Any + Send + Sync + 'static>,
595            )
596            .map(|t| t.downcast::<T>().unwrap())
597    }
598
599    /// Remove and refurn the context global state library state of type `T`
600    pub fn remove<T>(&mut self) -> Option<Box<T>>
601    where
602        T: Any + Send + Sync + 'static,
603    {
604        self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
605    }
606}
607
/// The execution context: registered builtins, the language environment,
/// cached variable values and the runtime.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    // registered builtins by name: their type and their init function
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
    // initialised to true by `new`
    // NOTE(review): readers of this flag are outside this file — confirm
    // what it gates.
    builtins_allowed: bool,
    // interned tag strings, see `tag`
    tags: FxHashSet<ArcStr>,
    /// context global library state for built-in functions
    pub libstate: LibState,
    /// the language environment, typedefs, binds, lambdas, etc
    pub env: Env<R, E>,
    /// the last value of every bound variable
    pub cached: FxHashMap<BindId, Value>,
    /// the runtime
    pub rt: R,
}
621
622impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
623    pub fn clear(&mut self) {
624        self.env.clear();
625        self.rt.clear();
626    }
627
628    /// Build a new execution context.
629    ///
630    /// This is a very low level interface that you can use to build a
631    /// custom runtime with deep integration to your code. It is very
632    /// difficult to use, and if you don't implement everything
633    /// correctly the semantics of the language can be wrong.
634    ///
635    /// Most likely you want to use the `rt` module instead.
636    pub fn new(user: R) -> Self {
637        Self {
638            env: Env::new(),
639            builtins: FxHashMap::default(),
640            builtins_allowed: true,
641            libstate: LibState::default(),
642            tags: FxHashSet::default(),
643            cached: HashMap::default(),
644            rt: user,
645        }
646    }
647
648    pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
649        let f = T::init(self);
650        match self.builtins.entry(T::NAME) {
651            Entry::Vacant(e) => {
652                e.insert((T::TYP.clone(), f));
653            }
654            Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
655        }
656        Ok(())
657    }
658
659    /// Built in functions should call this when variables are set
660    /// unless they are sure the variable does not need to be
661    /// cached. This will also call the user ctx set_var.
662    pub fn set_var(&mut self, id: BindId, v: Value) {
663        self.cached.insert(id, v.clone());
664        self.rt.set_var(id, v)
665    }
666
667    fn tag(&mut self, s: &ArcStr) -> ArcStr {
668        match self.tags.get(s) {
669            Some(s) => s.clone(),
670            None => {
671                self.tags.insert(s.clone());
672                s.clone()
673            }
674        }
675    }
676
677    /// Restore the lexical environment to the snapshot `env` for the
678    /// duration of `f` restoring it to it's original value
679    /// afterwords. `by_id` and `lambdas` defined by the closure will
680    /// be retained.
681    pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
682        &mut self,
683        env: Env<R, E>,
684        f: F,
685    ) -> T {
686        let snap = self.env.restore_lexical_env(env);
687        let orig = mem::replace(&mut self.env, snap);
688        let r = f(self);
689        self.env = self.env.restore_lexical_env(orig);
690        r
691    }
692}
693
/// A pair of module paths identifying where code is being compiled.
// NOTE(review): `lexical` is presumably the scope where the code was
// written and `dynamic` the scope where it is instantiated — this file
// only shows that both start at the root and are extended together by
// `append`; confirm.
#[derive(Debug, Clone)]
pub struct Scope {
    pub lexical: ModPath,
    pub dynamic: ModPath,
}
699
700impl Scope {
701    pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
702        Self {
703            lexical: ModPath(self.lexical.append(s)),
704            dynamic: ModPath(self.dynamic.append(s)),
705        }
706    }
707
708    pub fn root() -> Self {
709        Self { lexical: ModPath::root(), dynamic: ModPath::root() }
710    }
711}
712
713/// compile the expression into a node graph in the specified context
714/// and scope, return the root node or an error if compilation failed.
715pub fn compile<R: Rt, E: UserEvent>(
716    ctx: &mut ExecCtx<R, E>,
717    flags: BitFlags<CFlag>,
718    scope: &Scope,
719    spec: Expr,
720) -> Result<Node<R, E>> {
721    let top_id = spec.id;
722    let env = ctx.env.clone();
723    let st = Instant::now();
724    let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
725        Ok(n) => n,
726        Err(e) => {
727            ctx.env = env;
728            return Err(e);
729        }
730    };
731    info!("compile time {:?}", st.elapsed());
732    let st = Instant::now();
733    if let Err(e) = node.typecheck(ctx) {
734        ctx.env = env;
735        return Err(e);
736    }
737    info!("typecheck time {:?}", st.elapsed());
738    Ok(node)
739}