graphix_compiler/
lib.rs

1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14    env::Env,
15    expr::{ExprId, ModPath},
16    typ::{FnType, TVar, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25    path::Path,
26    publisher::{Id, Val, WriteRequest},
27    subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use std::{
33    any::Any,
34    cell::{Cell, RefCell},
35    collections::{hash_map::Entry, HashMap},
36    fmt::Debug,
37    mem,
38    sync::{
39        self,
40        atomic::{AtomicBool, Ordering},
41        LazyLock,
42    },
43    time::Duration,
44};
45use tokio::time::Instant;
46use triomphe::Arc;
47
/// Process-wide switch for trace output; read through `trace` and flipped
/// through `set_trace`.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Turn the global trace flag on (`true`) or off (`false`).
#[allow(dead_code)]
fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed);
}
55
56#[allow(dead_code)]
57fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
58    let set = if enable {
59        eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
60        let prev = trace();
61        set_trace(true);
62        !prev
63    } else {
64        false
65    };
66    let r = match f() {
67        Err(e) => {
68            eprintln!("traced at {} failed with {e:?}", spec.pos);
69            Err(e)
70        }
71        r => r,
72    };
73    if set {
74        eprintln!("trace disabled at {}", spec.pos);
75        set_trace(false)
76    }
77    r
78}
79
80#[allow(dead_code)]
81fn trace() -> bool {
82    TRACE.load(Ordering::Relaxed)
83}
84
/// Evaluate `$e`, additionally printing it with `dbg!` when tracing is
/// enabled (as reported by `trace`). The value of `$e` is produced in both
/// cases, so the macro can wrap any expression in place.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
95
thread_local! {
    /// Thread-local shared `Refs` structure, initially empty.
    pub static REFS: RefCell<Refs> = RefCell::new(Refs::new());
    /// Thread-local table of type variables keyed by name, initially empty.
    // NOTE(review): not read or written in this part of the file — confirm its users.
    static KNOWN: RefCell<FxHashMap<ArcStr, TVar>> = RefCell::new(HashMap::default());
}
101
102atomic_id!(LambdaId);
103
104impl From<u64> for LambdaId {
105    fn from(v: u64) -> Self {
106        LambdaId(v)
107    }
108}
109
110atomic_id!(BindId);
111
112impl From<u64> for BindId {
113    fn from(v: u64) -> Self {
114        BindId(v)
115    }
116}
117
118impl TryFrom<Value> for BindId {
119    type Error = anyhow::Error;
120
121    fn try_from(value: Value) -> Result<Self> {
122        match value {
123            Value::U64(id) => Ok(BindId(id)),
124            v => bail!("invalid bind id {v}"),
125        }
126    }
127}
128
/// Build `Some(Value::Error(..))` from a format string and optional
/// arguments.
///
/// The message is formatted with `format_compact!` and copied into an
/// `ArcStr`. `Value`, `ArcStr` and `format_compact!` are not
/// `$crate`-qualified, so they must be in scope at the call site.
#[macro_export]
macro_rules! errf {
    // format string followed by one or more comma separated arguments
    ($pat:expr, $($arg:expr),*) => {
        Some(Value::Error(ArcStr::from(format_compact!($pat, $($arg),*).as_str())))
    };
    // format string only
    ($pat:expr) => { Some(Value::Error(ArcStr::from(format_compact!($pat).as_str()))) };
}
136
/// Build `Some(Value::Error(..))` from a string literal using `literal!`.
///
/// `Value` and `literal!` are not `$crate`-qualified, so they must be in
/// scope at the call site.
#[macro_export]
macro_rules! err {
    ($pat:literal) => {
        Some(Value::Error(literal!($pat)))
    };
}
143
/// Custom event payload carried in the `user` field of `Event`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload. `Event::clear` calls this as the last step of
    /// clearing an event.
    fn clear(&mut self);
}
147
/// A `UserEvent` that carries no data.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset.
    fn clear(&mut self) {}
}
154
/// Flags that select how types and origins are formatted. The flags in
/// effect are stored per thread and can be overridden for the duration of a
/// closure with `format_with_flags`.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Dereference type variables and print both the tvar name and the bound
    /// type or "unbound".
    DerefTVars,
    /// Replace common primitives with shorter type names as defined
    /// in core. e.g. Any, instead of the set of every primitive type.
    ReplacePrims,
    /// When formatting an Origin don't print the source, just the location
    NoSource,
    /// When formatting an Origin don't print the origin's parents
    NoParents,
}
170
thread_local! {
    /// Print flags in effect on this thread. Starts out as
    /// `ReplacePrims | NoSource`; `format_with_flags` swaps it temporarily.
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
}
174
175/// For the duration of the closure F change the way type variables
176/// are formatted (on this thread only) according to the specified
177/// flags.
178pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
179    flags: G,
180    f: F,
181) -> R {
182    let prev = PRINT_FLAGS.replace(flags.into());
183    let res = f();
184    PRINT_FLAGS.set(prev);
185    res
186}
187
/// Event represents all the things that happened simultaneously in a
/// given execution cycle. Event may contain only one update for each
/// variable and netidx subscription in a given cycle, if more updates
/// happen simultaneously they must be queued and deferred to later
/// cycles.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    // NOTE(review): meaning not visible here; starts out false and is reset
    // to false by `clear` — presumably marks an initialization cycle, confirm.
    pub init: bool,
    /// variable updates in this cycle, keyed by bind id
    pub variables: FxHashMap<BindId, Value>,
    /// netidx subscription events in this cycle, keyed by subscription id
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// write requests in this cycle, keyed by publisher id
    pub writes: FxHashMap<Id, WriteRequest>,
    /// rpc calls in this cycle, keyed by bind id
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// user defined payload
    pub user: E,
}
202
203impl<E: UserEvent> Event<E> {
204    pub fn new(user: E) -> Self {
205        Event {
206            init: false,
207            variables: HashMap::default(),
208            netidx: HashMap::default(),
209            writes: HashMap::default(),
210            rpc_calls: HashMap::default(),
211            user,
212        }
213    }
214
215    pub fn clear(&mut self) {
216        let Self { init, variables, netidx, rpc_calls, writes, user } = self;
217        *init = false;
218        variables.clear();
219        netidx.clear();
220        rpc_calls.clear();
221        writes.clear();
222        user.clear();
223    }
224}
225
/// The sets of bind ids referenced and bound by a node graph. Ids that are
/// referenced but not bound are reported by `with_external_refs`.
#[derive(Debug, Clone)]
pub struct Refs {
    // ids that are referenced
    refed: FxHashSet<BindId>,
    // ids that are bound
    bound: FxHashSet<BindId>,
}
231
232impl Refs {
233    pub fn new() -> Self {
234        Self { refed: FxHashSet::default(), bound: FxHashSet::default() }
235    }
236
237    pub fn clear(&mut self) {
238        self.refed.clear();
239        self.bound.clear();
240    }
241
242    pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
243        for id in &self.refed {
244            if !self.bound.contains(id) {
245                f(*id);
246            }
247        }
248    }
249}
250
/// A boxed, dynamically dispatched graph node.
pub type Node<R, E> = Box<dyn Update<R, E>>;
252
/// Shared, thread safe constructor for a builtin function. Given the
/// execution context, a function type, a module path, the argument nodes and
/// an expression id it builds the `Apply` implementation or fails.
// NOTE(review): presumably the path is the scope of the call and the id is
// the top level expression — confirm against the compiler module.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b ModPath,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
265
/// Shared, thread safe constructor for a function. Given the execution
/// context, the argument nodes and an expression id it builds the `Apply`
/// implementation or fails.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b> Fn(
            &'a mut ExecCtx<R, E>,
            &'b [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
276
277/// Apply is a kind of node that represents a function application. It
278/// does not hold ownership of it's arguments, instead those are held
279/// by a CallSite node. This allows us to change the function called
280/// at runtime without recompiling the arguments.
281pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
282    fn update(
283        &mut self,
284        ctx: &mut ExecCtx<R, E>,
285        from: &mut [Node<R, E>],
286        event: &mut Event<E>,
287    ) -> Option<Value>;
288
289    /// delete any internally generated nodes, only needed for
290    /// builtins that dynamically generate code at runtime
291    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
292        ()
293    }
294
295    /// apply custom typechecking to the lambda, only needed for
296    /// builtins that take lambdas as arguments
297    fn typecheck(
298        &mut self,
299        _ctx: &mut ExecCtx<R, E>,
300        _from: &mut [Node<R, E>],
301    ) -> Result<()> {
302        Ok(())
303    }
304
305    /// return the lambdas type, builtins do not need to implement
306    /// this, it is implemented by the BuiltIn wrapper
307    fn typ(&self) -> Arc<FnType> {
308        const EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
309            Arc::new(FnType {
310                args: Arc::from_iter([]),
311                constraints: Arc::new(RwLock::new(vec![])),
312                rtype: Type::Bottom,
313                vargs: None,
314            })
315        });
316        Arc::clone(&*EMPTY)
317    }
318
319    /// Populate the Refs structure with all the ids bound and refed by this
320    /// node. It is only necessary for builtins to implement this if they create
321    /// nodes, such as call sites.
322    fn refs<'a>(&self, _refs: &mut Refs) {}
323
324    /// put the node to sleep, used in conditions like select for branches that
325    /// are not selected. Any cached values should be cleared on sleep.
326    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
327}
328
/// Update represents a regular graph node, as opposed to a function
/// application represented by Apply. Regular graph nodes are used for
/// every built in node except for builtin functions.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// update the node with the specified event and return any output
    /// it might generate
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// delete the node and its children from the specified context
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// type check the node and its children
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// return the node type
    fn typ(&self) -> &Type;

    /// Populate the Refs structure with all the bind ids either refed or bound
    /// by the node and its children
    fn refs(&self, refs: &mut Refs);

    /// return the original expression used to compile this node
    fn spec(&self) -> &Expr;

    /// put the node to sleep, called on unselected branches
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
356
/// A builtin function that can be registered with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// the name the builtin is registered under, must be unique
    const NAME: &str;
    /// the function type stored alongside the builtin when it is registered
    // NOTE(review): as an associated const, every use of TYP evaluates a
    // fresh LazyLock, so the value is not cached between uses.
    const TYP: LazyLock<FnType>;

    /// produce the constructor for this builtin, called once by
    /// `register_builtin` before the name is checked for duplicates
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
363
/// The runtime interface used by the execution context and the compiled
/// graph to reach netidx, variables, rpcs and timers.
pub trait Rt: Debug + 'static {
    /// Reset the runtime. `ExecCtx::clear` calls this after clearing the
    /// environment.
    fn clear(&mut self);

    /// Subscribe to the specified netidx path. When the subscription
    /// updates you are expected to deliver Netidx events to the
    /// expression specified by ref_by.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Called when a subscription is no longer needed
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// List the netidx path, return Value::Null if the path did not
    /// change. When the path did update you should send the output
    /// back as a properly formatted struct with two fields, rows and
    /// columns both containing string arrays.
    // NOTE(review): the method itself returns nothing; presumably the result
    // is delivered as a variable event for `id` — confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// List the table at path, return Value::Null if the path did not
    /// change
    // NOTE(review): as with `list`, presumably delivered via `id` — confirm.
    fn list_table(&mut self, id: BindId, path: Path);

    /// list or table will no longer be called on this BindId, and
    /// related resources can be cleaned up.
    fn stop_list(&mut self, id: BindId);

    /// Publish the specified value, returning its Id, which must be
    /// used to update the value and unpublish it. If the path is
    /// already published, return an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the specified value
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing the specified id
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// This will be called by the compiler whenever a bound variable
    /// is referenced. The ref_by is the toplevel expression that
    /// contains the variable reference. When a variable event
    /// happens, you should update all the toplevel expressions that
    /// ref that variable.
    ///
    /// ref_var will also be called when a bound lambda expression is
    /// referenced, in that case the ref_by id will be the toplevel
    /// expression containing the call site.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    // NOTE(review): undocumented in the original; presumably the inverse of
    // `ref_var`, called when ref_by no longer references id — confirm.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by the ExecCtx when set_var is called on it.
    ///
    /// All expressions that ref the id should be updated when this happens. The
    /// runtime must deliver all set_vars in a single event except that set_vars
    /// for the same variable in the same cycle must be queued and deferred to
    /// the next cycle.
    ///
    /// The runtime MUST NOT change event while a cycle is in
    /// progress. set_var must be queued until the cycle ends and then
    /// presented as a new batch.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Notify the RT that a top level variable has been set internally
    ///
    /// This is called when the compiler has determined that it's safe to set a
    /// variable without waiting a cycle. When the updated variable is a
    /// toplevel node this method is called to notify the runtime that it needs
    /// to update any dependent toplevel nodes.
    fn notify_set(&mut self, id: BindId);

    /// This must return results from the same path in the call order.
    ///
    /// when the rpc returns you are expected to deliver a Variable
    /// event with the specified id to the expression specified by
    /// ref_by.
    // NOTE(review): the signature has no ref_by parameter; the sentence above
    // looks stale — confirm how the target expression is determined.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an rpc at the specified path with the specified
    /// procedure level doc and arg spec.
    ///
    /// When the RPC is called the rpc table in event will be
    /// populated under the specified bind id.
    ///
    /// If the procedure is already published an error will be
    /// returned
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// unpublish the rpc published at the specified path.
    fn unpublish_rpc(&mut self, name: Path);

    /// arrange to have a Timer event delivered after timeout. When
    /// the timer expires you are expected to deliver a Variable event
    /// for the id, containing the current time.
    fn set_timer(&mut self, id: BindId, timeout: Duration);
}
463
/// The execution context: registered builtins, the environment, cached
/// variable values and the runtime.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    // registered builtins by name: their function type and constructor
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
    // NOTE(review): initialized to true by `new`; its consumers are outside
    // this part of the file.
    builtins_allowed: bool,
    // interned tag strings, see `tag`
    tags: FxHashSet<ArcStr>,
    /// the current environment
    pub env: Env<R, E>,
    /// the last value passed to `set_var` for each bind id
    pub cached: FxHashMap<BindId, Value>,
    /// the runtime
    pub rt: R,
}
472
473impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
474    pub fn clear(&mut self) {
475        self.env.clear();
476        self.rt.clear();
477    }
478
479    /// Build a new execution context.
480    ///
481    /// This is a very low level interface that you can use to build a
482    /// custom runtime with deep integration to your code. It is very
483    /// difficult to use, and if you don't implement everything
484    /// correctly the semantics of the language can be wrong.
485    ///
486    /// Most likely you want to use the `rt` module instead.
487    pub fn new(user: R) -> Self {
488        Self {
489            env: Env::new(),
490            builtins: FxHashMap::default(),
491            builtins_allowed: true,
492            tags: FxHashSet::default(),
493            cached: HashMap::default(),
494            rt: user,
495        }
496    }
497
498    pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
499        let f = T::init(self);
500        match self.builtins.entry(T::NAME) {
501            Entry::Vacant(e) => {
502                e.insert((T::TYP.clone(), f));
503            }
504            Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
505        }
506        Ok(())
507    }
508
509    /// Built in functions should call this when variables are set
510    /// unless they are sure the variable does not need to be
511    /// cached. This will also call the user ctx set_var.
512    pub fn set_var(&mut self, id: BindId, v: Value) {
513        self.cached.insert(id, v.clone());
514        self.rt.set_var(id, v)
515    }
516
517    fn tag(&mut self, s: &ArcStr) -> ArcStr {
518        match self.tags.get(s) {
519            Some(s) => s.clone(),
520            None => {
521                self.tags.insert(s.clone());
522                s.clone()
523            }
524        }
525    }
526
527    /// Restore the lexical environment to the snapshot `env` for the
528    /// duration of `f` restoring it to it's original value
529    /// afterwords. `by_id` and `lambdas` defined by the closure will
530    /// be retained.
531    pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
532        &mut self,
533        env: Env<R, E>,
534        f: F,
535    ) -> T {
536        let snap = self.env.restore_lexical_env(env);
537        let orig = mem::replace(&mut self.env, snap);
538        let r = f(self);
539        self.env = self.env.restore_lexical_env(orig);
540        r
541    }
542}
543
544/// compile the expression into a node graph in the specified context
545/// and scope, return the root node or an error if compilation failed.
546pub fn compile<R: Rt, E: UserEvent>(
547    ctx: &mut ExecCtx<R, E>,
548    scope: &ModPath,
549    spec: Expr,
550) -> Result<Node<R, E>> {
551    let top_id = spec.id;
552    let env = ctx.env.clone();
553    let st = Instant::now();
554    let mut node = match compiler::compile(ctx, spec, scope, top_id) {
555        Ok(n) => n,
556        Err(e) => {
557            ctx.env = env;
558            return Err(e);
559        }
560    };
561    info!("compile time {:?}", st.elapsed());
562    let st = Instant::now();
563    if let Err(e) = node.typecheck(ctx) {
564        ctx.env = env;
565        return Err(e);
566    }
567    info!("typecheck time {:?}", st.elapsed());
568    Ok(node)
569}