graphix_compiler/
lib.rs

1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14    env::Env,
15    expr::{ExprId, ModPath},
16    typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use expr::Expr;
21use fxhash::{FxHashMap, FxHashSet};
22use log::info;
23use netidx::{
24    path::Path,
25    publisher::{Id, Val, WriteRequest},
26    subscriber::{self, Dval, SubId, UpdatesFlags, Value},
27};
28use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
29use node::compiler;
30use parking_lot::RwLock;
31use std::{
32    any::Any,
33    cell::RefCell,
34    collections::{hash_map::Entry, HashMap},
35    fmt::Debug,
36    mem,
37    sync::{
38        self,
39        atomic::{AtomicBool, Ordering},
40        LazyLock,
41    },
42    time::Duration,
43};
44use tokio::time::Instant;
45use triomphe::Arc;
46
/// Process-wide switch consulted by [`tdbg!`] to decide whether to emit
/// `dbg!` output. Starts out disabled.
static TRACE: AtomicBool = AtomicBool::new(false);

/// Enable (`true`) or disable (`false`) tracing for every subsequent
/// `tdbg!` expansion.
pub fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed)
}

/// Return whether tracing is currently enabled.
///
/// This must be `pub`: the exported `tdbg!` macro expands to
/// `$crate::trace()` at the call site, which may live in another
/// crate, and a private function would not be reachable from there.
pub fn trace() -> bool {
    TRACE.load(Ordering::Relaxed)
}

/// Evaluate the expression and yield its value, routing it through
/// `dbg!` (which prints it) only when tracing has been switched on with
/// `set_trace(true)`. The expression is evaluated exactly once on
/// either path.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
70
thread_local! {
    /// Per-thread shared [`Refs`] structure, created empty via
    /// `Refs::new()`. It is wrapped in a `RefCell`, so access goes
    /// through `REFS.with(..)` and a runtime-checked borrow.
    pub static REFS: RefCell<Refs> = RefCell::new(Refs::new());
}
75
76atomic_id!(LambdaId);
77
78impl From<u64> for LambdaId {
79    fn from(v: u64) -> Self {
80        LambdaId(v)
81    }
82}
83
84atomic_id!(BindId);
85
86impl From<u64> for BindId {
87    fn from(v: u64) -> Self {
88        BindId(v)
89    }
90}
91
92impl TryFrom<Value> for BindId {
93    type Error = anyhow::Error;
94
95    fn try_from(value: Value) -> Result<Self> {
96        match value {
97            Value::U64(id) => Ok(BindId(id)),
98            v => bail!("invalid bind id {v}"),
99        }
100    }
101}
102
/// Build `Some(Value::Error(..))` from a format string and optional
/// arguments. The message is formatted with `format_compact!` and then
/// copied into an `ArcStr`.
///
/// The expansion names `Value`, `ArcStr` and `format_compact!` without
/// a path, so all three must be in scope at the call site.
#[macro_export]
macro_rules! errf {
    // format string followed by a comma separated argument list
    ($pat:expr, $($arg:expr),*) => {
        Some(Value::Error(ArcStr::from(format_compact!($pat, $($arg),*).as_str())))
    };
    // format string alone
    ($pat:expr) => { Some(Value::Error(ArcStr::from(format_compact!($pat).as_str()))) };
}
110
/// Build `Some(Value::Error(..))` from a literal message by handing the
/// literal to `literal!`.
///
/// The expansion names `Value` and `literal!` without a path, so both
/// must be in scope at the call site.
#[macro_export]
macro_rules! err {
    ($pat:literal) => {
        Some(Value::Error(literal!($pat)))
    };
}
117
/// User defined payload carried in the `user` field of [`Event`].
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; invoked by `Event::clear` together with the
    /// clearing of the event's other fields.
    fn clear(&mut self);
}
121
/// A [`UserEvent`] that carries no data, for use when no user payload
/// is needed.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset, so this is a no-op.
    fn clear(&mut self) {}
}
128
/// Event represents all the things that happened simultaneously in a
/// given execution cycle. Event may contain only one update for each
/// variable and netidx subscription in a given cycle; if more updates
/// happen simultaneously they must be queued and deferred to later
/// cycles.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// NOTE(review): presumably marks an initialization cycle; it is
    /// `false` after `new` and `clear` — confirm the exact meaning
    /// against the runtime that sets it.
    pub init: bool,
    /// variable updates in this cycle, keyed by bind id
    pub variables: FxHashMap<BindId, Value>,
    /// netidx subscription events in this cycle, keyed by subscription id
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// write requests in this cycle, keyed by publisher id
    pub writes: FxHashMap<Id, WriteRequest>,
    /// rpc calls in this cycle, keyed by the bind id the rpc was
    /// published under (see `Ctx::publish_rpc`)
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// user defined payload
    pub user: E,
}
143
144impl<E: UserEvent> Event<E> {
145    pub fn new(user: E) -> Self {
146        Event {
147            init: false,
148            variables: HashMap::default(),
149            netidx: HashMap::default(),
150            writes: HashMap::default(),
151            rpc_calls: HashMap::default(),
152            user,
153        }
154    }
155
156    pub fn clear(&mut self) {
157        let Self { init, variables, netidx, rpc_calls, writes, user } = self;
158        *init = false;
159        variables.clear();
160        netidx.clear();
161        rpc_calls.clear();
162        writes.clear();
163        user.clear();
164    }
165}
166
167#[derive(Debug, Clone)]
168pub struct Refs {
169    refed: FxHashSet<BindId>,
170    bound: FxHashSet<BindId>,
171}
172
173impl Refs {
174    pub fn new() -> Self {
175        Self { refed: FxHashSet::default(), bound: FxHashSet::default() }
176    }
177
178    pub fn clear(&mut self) {
179        self.refed.clear();
180        self.bound.clear();
181    }
182
183    pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
184        for id in &self.refed {
185            if !self.bound.contains(id) {
186                f(*id);
187            }
188        }
189    }
190}
191
/// A graph node: a boxed [`Update`] trait object.
pub type Node<C, E> = Box<dyn Update<C, E>>;
193
/// Shared, thread safe constructor for a builtin function, as returned
/// by `BuiltIn::init` and stored by `ExecCtx::register_builtin`.
///
/// It receives the execution context, a function type, a module path,
/// the argument nodes and an expression id, and returns the boxed
/// [`Apply`] implementing the builtin, or an error.
// NOTE(review): the ModPath is presumably the scope of the call site and
// the ExprId the toplevel expression id — confirm against the compiler.
pub type BuiltInInitFn<C, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<C, E>,
            &'a FnType,
            &'b ModPath,
            &'c [Node<C, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<C, E>>>
        + Send
        + Sync
        + 'static,
>;
206
/// Shared, thread safe constructor for a function application. Like
/// [`BuiltInInitFn`] but without the function type and module path
/// parameters: it receives the execution context, the argument nodes
/// and an expression id, and returns the boxed [`Apply`] or an error.
pub type InitFn<C, E> = sync::Arc<
    dyn for<'a, 'b> Fn(
            &'a mut ExecCtx<C, E>,
            &'b [Node<C, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<C, E>>>
        + Send
        + Sync
        + 'static,
>;
217
218/// Apply is a kind of node that represents a function application. It
219/// does not hold ownership of it's arguments, instead those are held
220/// by a CallSite node. This allows us to change the function called
221/// at runtime without recompiling the arguments.
222pub trait Apply<C: Ctx, E: UserEvent>: Debug + Send + Sync + Any {
223    fn update(
224        &mut self,
225        ctx: &mut ExecCtx<C, E>,
226        from: &mut [Node<C, E>],
227        event: &mut Event<E>,
228    ) -> Option<Value>;
229
230    /// delete any internally generated nodes, only needed for
231    /// builtins that dynamically generate code at runtime
232    fn delete(&mut self, _ctx: &mut ExecCtx<C, E>) {
233        ()
234    }
235
236    /// apply custom typechecking to the lambda, only needed for
237    /// builtins that take lambdas as arguments
238    fn typecheck(
239        &mut self,
240        _ctx: &mut ExecCtx<C, E>,
241        _from: &mut [Node<C, E>],
242    ) -> Result<()> {
243        Ok(())
244    }
245
246    /// return the lambdas type, builtins do not need to implement
247    /// this, it is implemented by the BuiltIn wrapper
248    fn typ(&self) -> Arc<FnType> {
249        const EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
250            Arc::new(FnType {
251                args: Arc::from_iter([]),
252                constraints: Arc::new(RwLock::new(vec![])),
253                rtype: Type::Bottom,
254                vargs: None,
255            })
256        });
257        Arc::clone(&*EMPTY)
258    }
259
260    /// Populate the Refs structure with all the ids bound and refed by this
261    /// node. It is only necessary for builtins to implement this if they create
262    /// nodes, such as call sites.
263    fn refs<'a>(&self, _refs: &mut Refs) {}
264
265    /// put the node to sleep, used in conditions like select for branches that
266    /// are not selected. Any cached values should be cleared on sleep.
267    fn sleep(&mut self, _ctx: &mut ExecCtx<C, E>);
268}
269
/// Update represents a regular graph node, as opposed to a function
/// application represented by Apply. Regular graph nodes are used for
/// every built in node except for builtin functions.
pub trait Update<C: Ctx, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// update the node with the specified event and return any output
    /// it might generate
    fn update(&mut self, ctx: &mut ExecCtx<C, E>, event: &mut Event<E>) -> Option<Value>;

    /// delete the node and its children from the specified context
    fn delete(&mut self, ctx: &mut ExecCtx<C, E>);

    /// type check the node and its children
    fn typecheck(&mut self, ctx: &mut ExecCtx<C, E>) -> Result<()>;

    /// return the node type
    fn typ(&self) -> &Type;

    /// Populate the Refs structure with all the bind ids either refed or bound
    /// by the node and its children
    fn refs(&self, refs: &mut Refs);

    /// return the original expression used to compile this node
    fn spec(&self) -> &Expr;

    /// put the node to sleep, called on unselected branches
    fn sleep(&mut self, ctx: &mut ExecCtx<C, E>);
}
297
/// Static description of a builtin function, consumed by
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<C: Ctx, E: UserEvent> {
    /// The name the builtin is registered under. Registering two
    /// builtins with the same name is an error.
    const NAME: &str;
    /// The type of the builtin; it is cloned when the builtin is
    /// registered.
    // NOTE(review): as an associated `const`, this LazyLock is
    // instantiated at each use rather than shared, so the initializer
    // runs every time `TYP` is evaluated — confirm this is intended.
    const TYP: LazyLock<FnType>;

    /// Produce the constructor that is stored alongside `TYP` when the
    /// builtin is registered. Called once per `register_builtin` call,
    /// before the duplicate name check.
    fn init(ctx: &mut ExecCtx<C, E>) -> BuiltInInitFn<C, E>;
}
304
/// The interface between the compiled graph and the runtime hosting
/// it. An implementation is stored in the `user` field of [`ExecCtx`].
pub trait Ctx: Debug + 'static {
    /// Reset the runtime state; called by `ExecCtx::clear`.
    fn clear(&mut self);

    /// Subscribe to the specified netidx path. When the subscription
    /// updates you are expected to deliver Netidx events to the
    /// expression specified by ref_by.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Called when a subscription is no longer needed
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// List the netidx path, return Value::Null if the path did not
    /// change. When the path did update you should send the output
    /// back as a properly formatted struct with two fields, rows and
    /// columns both containing string arrays.
    // NOTE(review): this method returns nothing, so "return" above
    // presumably means "deliver as the value of `id`" — confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// List the table at path, return Value::Null if the path did not
    /// change
    // NOTE(review): as with `list`, the result is presumably delivered
    // through `id` rather than returned — confirm.
    fn list_table(&mut self, id: BindId, path: Path);

    /// list or table will no longer be called on this BindId, and
    /// related resources can be cleaned up.
    fn stop_list(&mut self, id: BindId);

    /// Publish the specified value, returning its Id, which must be
    /// used to update the value and unpublish it. If the path is
    /// already published, return an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the specified value
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing the specified id
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// This will be called by the compiler whenever a bound variable
    /// is referenced. The ref_by is the toplevel expression that
    /// contains the variable reference. When a variable event
    /// happens, you should update all the toplevel expressions that
    /// ref that variable.
    ///
    /// ref_var will also be called when a bound lambda expression is
    /// referenced, in that case the ref_by id will be the toplevel
    /// expression containing the call site.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);

    /// The counterpart of `ref_var`.
    // NOTE(review): presumably called when ref_by no longer references
    // id, so the runtime can stop updating it — confirm against callers.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by the ExecCtx when set_var is called on it. All
    /// expressions that ref the id should be updated when this
    /// happens.
    ///
    /// The runtime must deliver all set_vars in a single event except
    /// that set_vars for the same variable in the same cycle must be
    /// queued and deferred to the next cycle.
    ///
    /// The runtime MUST NOT change event while a cycle is in
    /// progress. set_var must be queued until the cycle ends and then
    /// presented as a new batch.
    fn set_var(&mut self, id: BindId, value: Value);

    /// This must return results from the same path in the call order.
    ///
    /// When the rpc returns you are expected to deliver a Variable
    /// event with the specified id to the expression specified by
    /// ref_by.
    // NOTE(review): there is no ref_by parameter here; presumably the
    // expressions that ref `id` are meant — confirm.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an rpc at the specified path with the specified
    /// procedure level doc and arg spec.
    ///
    /// When the RPC is called the rpc table in event will be
    /// populated under the specified bind id.
    ///
    /// If the procedure is already published an error will be
    /// returned
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// unpublish the rpc published at the specified path.
    fn unpublish_rpc(&mut self, name: Path);

    /// arrange to have a Timer event delivered after timeout. When
    /// the timer expires you are expected to deliver a Variable event
    /// for the id, containing the current time.
    fn set_timer(&mut self, id: BindId, timeout: Duration);
}
397
/// The execution context: the state shared by compilation and by node
/// updates, plus the user supplied runtime.
pub struct ExecCtx<C: Ctx, E: UserEvent> {
    /// registered builtins keyed by name, each with its type and its
    /// constructor (see `register_builtin`)
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<C, E>)>,
    /// set of strings handed out by `tag`, so equal tags share one
    /// `ArcStr`
    tags: FxHashSet<ArcStr>,
    /// the environment
    pub env: Env<C, E>,
    /// the last value passed to `set_var` for each bind id
    pub cached: FxHashMap<BindId, Value>,
    /// the user supplied runtime
    pub user: C,
}
405
406impl<C: Ctx, E: UserEvent> ExecCtx<C, E> {
407    pub fn clear(&mut self) {
408        self.env.clear();
409        self.user.clear();
410    }
411
412    /// Build a new execution context.
413    ///
414    /// This is a very low level interface that you can use to build a
415    /// custom runtime with deep integration to your code. It is very
416    /// difficult to use, and if you don't implement everything
417    /// correctly the semantics of the language can be wrong.
418    ///
419    /// Most likely you want to use the `rt` module instead.
420    pub fn new(user: C) -> Self {
421        Self {
422            env: Env::new(),
423            builtins: FxHashMap::default(),
424            tags: FxHashSet::default(),
425            cached: HashMap::default(),
426            user,
427        }
428    }
429
430    pub fn register_builtin<T: BuiltIn<C, E>>(&mut self) -> Result<()> {
431        let f = T::init(self);
432        match self.builtins.entry(T::NAME) {
433            Entry::Vacant(e) => {
434                e.insert((T::TYP.clone(), f));
435            }
436            Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
437        }
438        Ok(())
439    }
440
441    /// Built in functions should call this when variables are set
442    /// unless they are sure the variable does not need to be
443    /// cached. This will also call the user ctx set_var.
444    pub fn set_var(&mut self, id: BindId, v: Value) {
445        self.cached.insert(id, v.clone());
446        self.user.set_var(id, v)
447    }
448
449    fn tag(&mut self, s: &ArcStr) -> ArcStr {
450        match self.tags.get(s) {
451            Some(s) => s.clone(),
452            None => {
453                self.tags.insert(s.clone());
454                s.clone()
455            }
456        }
457    }
458
459    /// Restore the lexical environment to the snapshot `env` for the
460    /// duration of `f` restoring it to it's original value
461    /// afterwords. `by_id` and `lambdas` defined by the closure will
462    /// be retained.
463    pub fn with_restored<R, F: FnOnce(&mut Self) -> R>(
464        &mut self,
465        env: Env<C, E>,
466        f: F,
467    ) -> R {
468        let snap = self.env.restore_lexical_env(env);
469        let orig = mem::replace(&mut self.env, snap);
470        let r = f(self);
471        self.env = self.env.restore_lexical_env(orig);
472        r
473    }
474}
475
476/// compile the expression into a node graph in the specified context
477/// and scope, return the root node or an error if compilation failed.
478pub fn compile<C: Ctx, E: UserEvent>(
479    ctx: &mut ExecCtx<C, E>,
480    scope: &ModPath,
481    spec: Expr,
482) -> Result<Node<C, E>> {
483    let top_id = spec.id;
484    let env = ctx.env.clone();
485    let st = Instant::now();
486    let mut node = match compiler::compile(ctx, spec, scope, top_id) {
487        Ok(n) => n,
488        Err(e) => {
489            ctx.env = env;
490            return Err(e);
491        }
492    };
493    info!("compile time {:?}", st.elapsed());
494    let st = Instant::now();
495    if let Err(e) = node.typecheck(ctx) {
496        ctx.env = env;
497        return Err(e);
498    }
499    info!("typecheck time {:?}", st.elapsed());
500    Ok(node)
501}