1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, TVar, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25 path::Path,
26 publisher::{Id, Val, WriteRequest},
27 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use std::{
33 any::Any,
34 cell::{Cell, RefCell},
35 collections::{hash_map::Entry, HashMap},
36 fmt::Debug,
37 mem,
38 sync::{
39 self,
40 atomic::{AtomicBool, Ordering},
41 LazyLock,
42 },
43 time::Duration,
44};
45use tokio::time::Instant;
46use triomphe::Arc;
47
/// Process-wide tracing switch; written by `set_trace` and read by `trace`.
/// Starts out disabled.
// NOTE(review): dead_code is allowed presumably because the tracing helpers
// are only used while debugging — confirm.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);
50
51#[allow(dead_code)]
52fn set_trace(b: bool) {
53 TRACE.store(b, Ordering::Relaxed)
54}
55
56#[allow(dead_code)]
57fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
58 let set = if enable {
59 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
60 let prev = trace();
61 set_trace(true);
62 !prev
63 } else {
64 false
65 };
66 let r = match f() {
67 Err(e) => {
68 eprintln!("traced at {} failed with {e:?}", spec.pos);
69 Err(e)
70 }
71 r => r,
72 };
73 if set {
74 eprintln!("trace disabled at {}", spec.pos);
75 set_trace(false)
76 }
77 r
78}
79
80#[allow(dead_code)]
81fn trace() -> bool {
82 TRACE.load(Ordering::Relaxed)
83}
84
/// Conditional `dbg!`: evaluates to `dbg!($e)` when `$crate::trace()` returns
/// true, and to plain `$e` otherwise. The expression is evaluated exactly
/// once on either path, and its value is the value of the macro.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
95
thread_local! {
    /// Per-thread `Refs` accumulator, starting out empty.
    // NOTE(review): how and when this is filled in is not visible in this file section.
    pub static REFS: RefCell<Refs> = RefCell::new(Refs::new());
    /// Per-thread table mapping a name to a `TVar`, starting out empty.
    // NOTE(review): presumably the type variables already seen under a given
    // name — confirm against the users of KNOWN.
    static KNOWN: RefCell<FxHashMap<ArcStr, TVar>> = RefCell::new(HashMap::default());
}
101
// Declares the `LambdaId` id type via the `atomic_id!` macro (brought in by a
// `#[macro_use]` crate). It wraps a `u64`, as the `From<u64>` impl below relies on.
atomic_id!(LambdaId);
103
104impl From<u64> for LambdaId {
105 fn from(v: u64) -> Self {
106 LambdaId(v)
107 }
108}
109
// Declares the `BindId` id type via the `atomic_id!` macro (brought in by a
// `#[macro_use]` crate). It wraps a `u64`, as the conversions below rely on.
atomic_id!(BindId);
111
112impl From<u64> for BindId {
113 fn from(v: u64) -> Self {
114 BindId(v)
115 }
116}
117
118impl TryFrom<Value> for BindId {
119 type Error = anyhow::Error;
120
121 fn try_from(value: Value) -> Result<Self> {
122 match value {
123 Value::U64(id) => Ok(BindId(id)),
124 v => bail!("invalid bind id {v}"),
125 }
126 }
127}
128
/// Build `Some(Value::Error(..))` from a format string and optional arguments.
///
/// The message is formatted with `format_compact!` and then copied into an
/// `ArcStr`. `Value`, `ArcStr` and `format_compact!` are named without a
/// `$crate` path, so they must be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! errf {
    ($pat:expr, $($arg:expr),*) => {
        Some(Value::Error(ArcStr::from(format_compact!($pat, $($arg),*).as_str())))
    };
    ($pat:expr) => { Some(Value::Error(ArcStr::from(format_compact!($pat).as_str()))) };
}
136
/// Build `Some(Value::Error(..))` from a string literal, passing it through
/// `literal!`.
///
/// `Value` and `literal!` are named without a `$crate` path, so they must be
/// in scope wherever the macro is invoked.
#[macro_export]
macro_rules! err {
    ($pat:literal) => {
        Some(Value::Error(literal!($pat)))
    };
}
143
/// Application-defined payload carried in `Event::user`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; called from `Event::clear` together with the
    /// clearing of the built-in event fields.
    fn clear(&mut self);
}
147
/// Placeholder `UserEvent` for callers that have no custom payload.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset.
    fn clear(&mut self) {}
}
154
/// Flags that adjust formatting; the active set is stored in the thread-local
/// `PRINT_FLAGS` and can be swapped temporarily with `format_with_flags`.
// NOTE(review): the per-variant notes below are inferred from the variant
// names only — confirm against the Display impls that read these flags.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    // presumably: print what a type variable is bound to rather than the variable itself
    DerefTVars,
    // presumably: substitute primitive types with a friendlier rendering
    ReplacePrims,
    // presumably: leave out source information
    NoSource,
    // presumably: leave out parent information
    NoParents,
}
170
thread_local! {
    /// Formatting flags in effect on the current thread. Defaults to
    /// `ReplacePrims | NoSource`.
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
}
174
175pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
179 flags: G,
180 f: F,
181) -> R {
182 let prev = PRINT_FLAGS.replace(flags.into());
183 let res = f();
184 PRINT_FLAGS.set(prev);
185 res
186}
187
/// A batch of inputs handed to the `update` methods of `Update` and `Apply`.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// Initialization marker; `false` in a fresh or cleared event.
    // NOTE(review): presumably set by the runtime on a node's first cycle — confirm.
    pub init: bool,
    /// Values keyed by the `BindId` they belong to.
    pub variables: FxHashMap<BindId, Value>,
    /// Subscriber events keyed by subscription id.
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// Write requests keyed by publisher id.
    pub writes: FxHashMap<Id, WriteRequest>,
    /// RPC calls keyed by `BindId`.
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// Application-defined payload.
    pub user: E,
}
202
203impl<E: UserEvent> Event<E> {
204 pub fn new(user: E) -> Self {
205 Event {
206 init: false,
207 variables: HashMap::default(),
208 netidx: HashMap::default(),
209 writes: HashMap::default(),
210 rpc_calls: HashMap::default(),
211 user,
212 }
213 }
214
215 pub fn clear(&mut self) {
216 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
217 *init = false;
218 variables.clear();
219 netidx.clear();
220 rpc_calls.clear();
221 writes.clear();
222 user.clear();
223 }
224}
225
/// Two sets of bind ids: those referenced and those bound. An id that is in
/// `refed` but not in `bound` is reported by `with_external_refs`.
#[derive(Debug, Clone)]
pub struct Refs {
    // ids that are referenced
    refed: FxHashSet<BindId>,
    // ids that are bound
    bound: FxHashSet<BindId>,
}
231
232impl Refs {
233 pub fn new() -> Self {
234 Self { refed: FxHashSet::default(), bound: FxHashSet::default() }
235 }
236
237 pub fn clear(&mut self) {
238 self.refed.clear();
239 self.bound.clear();
240 }
241
242 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
243 for id in &self.refed {
244 if !self.bound.contains(id) {
245 f(*id);
246 }
247 }
248 }
249}
250
/// A boxed, dynamically dispatched `Update` implementation; `compile` returns one of these.
pub type Node<R, E> = Box<dyn Update<R, E>>;
252
/// Shared, thread-safe constructor for a built-in's `Apply` instance.
///
/// It receives the execution context, a function type, a module path, the
/// argument nodes and an expression id, and returns the boxed `Apply` or an
/// error.
// NOTE(review): presumably the `FnType` is the resolved signature at the call
// site, the `ModPath` the calling scope, and the `ExprId` the call
// expression — confirm against the compiler module.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b ModPath,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
265
/// Shared, thread-safe constructor for an `Apply` instance that needs only
/// the execution context, the argument nodes and an expression id.
// NOTE(review): presumably used for functions that are not built-ins (no
// `FnType`/`ModPath` arguments) — confirm.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b> Fn(
            &'a mut ExecCtx<R, E>,
            &'b [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
276
277pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
282 fn update(
283 &mut self,
284 ctx: &mut ExecCtx<R, E>,
285 from: &mut [Node<R, E>],
286 event: &mut Event<E>,
287 ) -> Option<Value>;
288
289 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
292 ()
293 }
294
295 fn typecheck(
298 &mut self,
299 _ctx: &mut ExecCtx<R, E>,
300 _from: &mut [Node<R, E>],
301 ) -> Result<()> {
302 Ok(())
303 }
304
305 fn typ(&self) -> Arc<FnType> {
308 const EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
309 Arc::new(FnType {
310 args: Arc::from_iter([]),
311 constraints: Arc::new(RwLock::new(vec![])),
312 rtype: Type::Bottom,
313 vargs: None,
314 })
315 });
316 Arc::clone(&*EMPTY)
317 }
318
319 fn refs<'a>(&self, _refs: &mut Refs) {}
323
324 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
327}
328
/// Interface of a compiled node; `Node<R, E>` is a boxed `Update`.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// Process one `event`, optionally producing a value.
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// Deletion hook.
    // NOTE(review): presumably releases whatever the node registered with the
    // runtime — confirm.
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// Type-check the node; `compile` calls this after compilation succeeds.
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// The type of this node.
    fn typ(&self) -> &Type;

    /// Record the bind ids this node refers to into `refs`.
    fn refs(&self, refs: &mut Refs);

    /// The expression associated with this node.
    // NOTE(review): presumably the expression the node was compiled from — confirm.
    fn spec(&self) -> &Expr;

    /// NOTE(review): presumably puts the node into an idle state — confirm
    /// against the implementors.
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
356
/// Description of a built-in function that can be installed with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// Name under which the built-in is registered; it must be unique.
    const NAME: &str;
    /// Function type of the built-in.
    // NOTE(review): as an associated `const`, each use of `TYP` instantiates a
    // fresh `LazyLock`, so the value is re-evaluated per use rather than cached.
    const TYP: LazyLock<FnType>;

    /// Produce the constructor that is stored for this built-in.
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
363
/// Runtime services used by the execution context; an implementation is
/// stored in `ExecCtx::rt`.
// NOTE(review): the per-method notes below are inferred from names and
// signatures only — confirm against the concrete runtime implementation.
pub trait Rt: Debug + 'static {
    /// Reset the runtime; called from `ExecCtx::clear`.
    fn clear(&mut self);

    /// Subscribe to `path` with the given update flags on behalf of
    /// expression `ref_by`, returning the subscription handle.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Counterpart of `subscribe` for the handle `dv` at `path`.
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// Start listing `path`; presumably results are delivered via `id`.
    fn list(&mut self, id: BindId, path: Path);

    /// Like `list`, but for a table at `path`.
    fn list_table(&mut self, id: BindId, path: Path);

    /// Stop the listing associated with `id`.
    fn stop_list(&mut self, id: BindId);

    /// Publish `value` at `path` on behalf of `ref_by`; may fail.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the published value `id` to `value`.
    fn update(&mut self, id: &Val, value: Value);

    /// Counterpart of `publish` for the published value `id`.
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// Register that expression `ref_by` refers to variable `id`.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    /// Counterpart of `ref_var`.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Set variable `id` to `value`; `ExecCtx::set_var` forwards here after
    /// caching the value.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Notify the runtime that `id` was set.
    fn notify_set(&mut self, id: BindId);

    /// Call the RPC `name` with `args`; presumably the reply is delivered via `id`.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an RPC called `name` with documentation `doc` and argument
    /// specification `spec`; may fail. Presumably incoming calls are
    /// delivered via `id`.
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// Counterpart of `publish_rpc`.
    fn unpublish_rpc(&mut self, name: Path);

    /// Arm a timer with the given `timeout`; presumably expiry is delivered via `id`.
    fn set_timer(&mut self, id: BindId, timeout: Duration);
}
463
/// Execution context: the environment, the runtime, the registered built-ins
/// and a cache of variable values.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    // built-ins by name: their function type and constructor
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
    // starts out true; NOTE(review): presumably gates the use of built-ins
    // during compilation — confirm
    builtins_allowed: bool,
    // interned tag strings, see `tag`
    tags: FxHashSet<ArcStr>,
    /// The current environment.
    pub env: Env<R, E>,
    /// The most recent value passed to `set_var` for each bind id.
    pub cached: FxHashMap<BindId, Value>,
    /// The runtime implementation.
    pub rt: R,
}
472
473impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
474 pub fn clear(&mut self) {
475 self.env.clear();
476 self.rt.clear();
477 }
478
479 pub fn new(user: R) -> Self {
488 Self {
489 env: Env::new(),
490 builtins: FxHashMap::default(),
491 builtins_allowed: true,
492 tags: FxHashSet::default(),
493 cached: HashMap::default(),
494 rt: user,
495 }
496 }
497
498 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
499 let f = T::init(self);
500 match self.builtins.entry(T::NAME) {
501 Entry::Vacant(e) => {
502 e.insert((T::TYP.clone(), f));
503 }
504 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
505 }
506 Ok(())
507 }
508
509 pub fn set_var(&mut self, id: BindId, v: Value) {
513 self.cached.insert(id, v.clone());
514 self.rt.set_var(id, v)
515 }
516
517 fn tag(&mut self, s: &ArcStr) -> ArcStr {
518 match self.tags.get(s) {
519 Some(s) => s.clone(),
520 None => {
521 self.tags.insert(s.clone());
522 s.clone()
523 }
524 }
525 }
526
527 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
532 &mut self,
533 env: Env<R, E>,
534 f: F,
535 ) -> T {
536 let snap = self.env.restore_lexical_env(env);
537 let orig = mem::replace(&mut self.env, snap);
538 let r = f(self);
539 self.env = self.env.restore_lexical_env(orig);
540 r
541 }
542}
543
544pub fn compile<R: Rt, E: UserEvent>(
547 ctx: &mut ExecCtx<R, E>,
548 scope: &ModPath,
549 spec: Expr,
550) -> Result<Node<R, E>> {
551 let top_id = spec.id;
552 let env = ctx.env.clone();
553 let st = Instant::now();
554 let mut node = match compiler::compile(ctx, spec, scope, top_id) {
555 Ok(n) => n,
556 Err(e) => {
557 ctx.env = env;
558 return Err(e);
559 }
560 };
561 info!("compile time {:?}", st.elapsed());
562 let st = Instant::now();
563 if let Err(e) = node.typecheck(ctx) {
564 ctx.env = env;
565 return Err(e);
566 }
567 info!("typecheck time {:?}", st.elapsed());
568 Ok(node)
569}