1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25 path::Path,
26 publisher::{Id, Val, WriteRequest},
27 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use std::{
33 any::Any,
34 cell::{Cell, RefCell},
35 collections::{hash_map::Entry, HashMap},
36 fmt::Debug,
37 mem,
38 sync::{
39 self,
40 atomic::{AtomicBool, Ordering},
41 LazyLock,
42 },
43 time::Duration,
44};
45use tokio::time::Instant;
46use triomphe::Arc;
47
/// Process-wide tracing switch, initially off. Read by `trace` and written by `set_trace`.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Switch tracing on (`true`) or off (`false`) using a relaxed atomic store.
#[allow(dead_code)]
fn set_trace(enabled: bool) {
    TRACE.store(enabled, Ordering::Relaxed);
}

/// Return the current state of the tracing switch using a relaxed atomic load.
#[allow(dead_code)]
fn trace() -> bool {
    let enabled = TRACE.load(Ordering::Relaxed);
    enabled
}
60
/// Evaluate an expression, passing it through `dbg!` when tracing is enabled.
///
/// Expands to an `if`/`else` on `$crate::trace()`: the true branch is `dbg!($e)`, the false
/// branch is `$e` itself. Either way the expression's value is the value of the expansion.
///
/// NOTE(review): this macro is `#[macro_export]`ed, but `trace` is declared without `pub` in
/// this file, so expanding it from another crate would presumably be rejected as a private
/// function access — confirm whether use outside this crate is intended.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
71
thread_local! {
    /// Per-thread [`Refs`] instance wrapped in a `RefCell`, initialised empty via `Refs::new()`.
    pub static REFS: RefCell<Refs> = RefCell::new(Refs::new());
}
76
77atomic_id!(LambdaId);
78
79impl From<u64> for LambdaId {
80 fn from(v: u64) -> Self {
81 LambdaId(v)
82 }
83}
84
85atomic_id!(BindId);
86
87impl From<u64> for BindId {
88 fn from(v: u64) -> Self {
89 BindId(v)
90 }
91}
92
93impl TryFrom<Value> for BindId {
94 type Error = anyhow::Error;
95
96 fn try_from(value: Value) -> Result<Self> {
97 match value {
98 Value::U64(id) => Ok(BindId(id)),
99 v => bail!("invalid bind id {v}"),
100 }
101 }
102}
103
/// Build `Some(Value::Error(..))` from a format string and optional arguments.
///
/// The message is formatted with `format_compact!` and then copied into an `ArcStr`.
/// The first arm takes a pattern followed by a comma and a comma-separated argument list;
/// the second arm takes the pattern alone.
///
/// `Value`, `ArcStr` and `format_compact!` are written unqualified in the expansion, so they
/// must be in scope wherever this macro is invoked.
#[macro_export]
macro_rules! errf {
    ($pat:expr, $($arg:expr),*) => {
        Some(Value::Error(ArcStr::from(format_compact!($pat, $($arg),*).as_str())))
    };
    ($pat:expr) => { Some(Value::Error(ArcStr::from(format_compact!($pat).as_str()))) };
}
111
/// Build `Some(Value::Error(..))` from a string literal.
///
/// The literal is passed to `literal!` to produce the error payload. `Value` and `literal!`
/// are written unqualified in the expansion, so they must be in scope at the call site.
#[macro_export]
macro_rules! err {
    ($pat:literal) => {
        Some(Value::Error(literal!($pat)))
    };
}
118
/// Caller-defined payload carried in the `user` field of an [`Event`].
///
/// Implementors must be `Clone + Debug + Any`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload. [`Event::clear`] calls this after clearing its own fields.
    fn clear(&mut self);
}
122
/// A [`UserEvent`] that carries no data.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset; this is a no-op.
    fn clear(&mut self) {}
}
129
/// Bit flags held in the thread-local `PRINT_FLAGS` and swapped temporarily by
/// [`format_with_flags`].
///
/// NOTE(review): the code that interprets these flags is not visible in this file; the
/// per-variant notes below are inferred from the variant names only — confirm.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Presumably: print what a type variable is bound to rather than the variable itself.
    DerefTVars,
    /// Presumably: substitute primitive types with a shorter form when printing. This is the
    /// only flag set in the initial value of `PRINT_FLAGS`.
    ReplacePrims,
    /// Presumably: omit source information when printing.
    NoSource,
    /// Presumably: omit parent information when printing.
    NoParents,
}
145
thread_local! {
    /// Per-thread set of active [`PrintFlag`]s. Starts with only `ReplacePrims` set.
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims.into());
}
149
150pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
154 flags: G,
155 f: F,
156) -> R {
157 let prev = PRINT_FLAGS.replace(flags.into());
158 let res = f();
159 PRINT_FLAGS.set(prev);
160 res
161}
162
/// A batch of inputs handed to `update` on [`Update`] and [`Apply`] implementations.
///
/// NOTE(review): field meanings below are inferred from the field names and types; the code
/// that fills an `Event` is not visible in this file — confirm.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// Set to `false` by both [`Event::new`] and [`Event::clear`]; presumably marks an
    /// initialisation cycle when `true`.
    pub init: bool,
    /// Values keyed by the [`BindId`] they belong to.
    pub variables: FxHashMap<BindId, Value>,
    /// Subscriber events keyed by subscription id.
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// Write requests keyed by publisher id.
    pub writes: FxHashMap<Id, WriteRequest>,
    /// RPC calls keyed by [`BindId`].
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// Caller-defined payload.
    pub user: E,
}
177
178impl<E: UserEvent> Event<E> {
179 pub fn new(user: E) -> Self {
180 Event {
181 init: false,
182 variables: HashMap::default(),
183 netidx: HashMap::default(),
184 writes: HashMap::default(),
185 rpc_calls: HashMap::default(),
186 user,
187 }
188 }
189
190 pub fn clear(&mut self) {
191 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
192 *init = false;
193 variables.clear();
194 netidx.clear();
195 rpc_calls.clear();
196 writes.clear();
197 user.clear();
198 }
199}
200
/// Two sets of [`BindId`]s: those referenced and those bound.
///
/// [`Refs::with_external_refs`] reports the ids that are in `refed` but not in `bound`.
#[derive(Debug, Clone)]
pub struct Refs {
    // ids that have been referenced
    refed: FxHashSet<BindId>,
    // ids that have been bound
    bound: FxHashSet<BindId>,
}
206
207impl Refs {
208 pub fn new() -> Self {
209 Self { refed: FxHashSet::default(), bound: FxHashSet::default() }
210 }
211
212 pub fn clear(&mut self) {
213 self.refed.clear();
214 self.bound.clear();
215 }
216
217 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
218 for id in &self.refed {
219 if !self.bound.contains(id) {
220 f(*id);
221 }
222 }
223 }
224}
225
/// A boxed, dynamically dispatched [`Update`] implementation.
pub type Node<R, E> = Box<dyn Update<R, E>>;
227
/// Shared, thread-safe constructor for a built-in function, as returned by [`BuiltIn::init`].
///
/// Called with the execution context, a function type, a module path, a slice of nodes and
/// an expression id; returns the boxed [`Apply`] implementation or an error.
///
/// NOTE(review): presumably the slice holds the argument nodes and the path is the scope of
/// the call site — confirm against the compiler.
pub type BuiltInInitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a mut ExecCtx<R, E>,
            &'a FnType,
            &'b ModPath,
            &'c [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
240
/// Shared, thread-safe constructor for an [`Apply`] implementation.
///
/// Called with the execution context, a slice of nodes and an expression id; returns the
/// boxed [`Apply`] implementation or an error. Unlike [`BuiltInInitFn`], it takes no
/// function type or module path.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b> Fn(
            &'a mut ExecCtx<R, E>,
            &'b [Node<R, E>],
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
251
252pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
257 fn update(
258 &mut self,
259 ctx: &mut ExecCtx<R, E>,
260 from: &mut [Node<R, E>],
261 event: &mut Event<E>,
262 ) -> Option<Value>;
263
264 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
267 ()
268 }
269
270 fn typecheck(
273 &mut self,
274 _ctx: &mut ExecCtx<R, E>,
275 _from: &mut [Node<R, E>],
276 ) -> Result<()> {
277 Ok(())
278 }
279
280 fn typ(&self) -> Arc<FnType> {
283 const EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
284 Arc::new(FnType {
285 args: Arc::from_iter([]),
286 constraints: Arc::new(RwLock::new(vec![])),
287 rtype: Type::Bottom,
288 vargs: None,
289 })
290 });
291 Arc::clone(&*EMPTY)
292 }
293
294 fn refs<'a>(&self, _refs: &mut Refs) {}
298
299 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
302}
303
/// Behaviour of a compiled expression node; [`Node`] is a boxed `dyn Update`.
///
/// Every method is required.
///
/// NOTE(review): no implementation is visible in this file; the method notes below are
/// inferred from names and signatures — confirm against the `node` module.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// Process `event` and optionally produce a value.
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// Tear the node down, with access to `ctx`.
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// Type check the node. [`compile`] calls this on the freshly compiled top-level node.
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// The node's type.
    fn typ(&self) -> &Type;

    /// Record into `refs` the ids this node references or binds.
    fn refs(&self, refs: &mut Refs);

    /// The expression associated with this node.
    fn spec(&self) -> &Expr;

    /// Presumably puts the node into a dormant state — confirm.
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
331
/// A built-in function that can be registered with [`ExecCtx::register_builtin`].
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// Key under which the built-in is registered; a second registration under the same
    /// name is rejected.
    const NAME: &str;
    /// Lazily built function type; it is cloned and stored at registration time.
    const TYP: LazyLock<FnType>;

    /// Produce the constructor stored alongside `TYP` when the built-in is registered.
    fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
}
338
/// Services an [`ExecCtx`] requires from the runtime hosting it; an instance lives in the
/// context's `rt` field.
///
/// NOTE(review): no implementation of this trait is visible in this file. Unless a note says
/// otherwise, the method descriptions below are inferred from names and signatures only —
/// confirm against a concrete runtime.
pub trait Rt: Debug + 'static {
    /// Reset the runtime. Called by [`ExecCtx::clear`] after the environment is cleared.
    fn clear(&mut self);

    /// Subscribe to `path` with the given `flags` and return the subscription handle.
    /// `ref_by` presumably identifies the expression holding the subscription.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Give up a subscription previously obtained for `path`.
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// Start listing `path`; results are presumably delivered under `id`.
    fn list(&mut self, id: BindId, path: Path);

    /// Start listing the table at `path`; results are presumably delivered under `id`.
    fn list_table(&mut self, id: BindId, path: Path);

    /// Stop the listing associated with `id`.
    fn stop_list(&mut self, id: BindId);

    /// Publish `value` at `path`, returning the published handle or an error.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the published value identified by `id`.
    fn update(&mut self, id: &Val, value: Value);

    /// Stop publishing `id`.
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// Note that expression `ref_by` refers to variable `id`.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    /// Note that expression `ref_by` no longer refers to variable `id`.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Set variable `id` to `value`. [`ExecCtx::set_var`] calls this after caching a clone
    /// of the value.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Signal that variable `id` was set. No value is passed: [`ExecCtx::set_var_now`]
    /// stores it in the context's `cached` map before calling this.
    fn set_var_now(&mut self, id: BindId);

    /// Call the RPC `name` with `args`; the reply is presumably delivered under `id`.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an RPC called `name` with documentation `doc` and argument spec `spec`;
    /// incoming calls are presumably delivered under `id`.
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// Stop publishing the RPC `name`.
    fn unpublish_rpc(&mut self, name: Path);

    /// Arm a timer for `timeout`; its expiry is presumably delivered under `id`.
    fn set_timer(&mut self, id: BindId, timeout: Duration);
}
443
/// State shared by compilation and execution: registered built-ins, interned tags, the
/// environment, cached variable values and the runtime.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    // built-in name -> (type, constructor), filled by `register_builtin`
    builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
    // strings already seen by `tag`
    tags: FxHashSet<ArcStr>,
    /// The current environment.
    pub env: Env<R, E>,
    /// Most recent value stored for each variable through `set_var` / `set_var_now`.
    pub cached: FxHashMap<BindId, Value>,
    /// The hosting runtime.
    pub rt: R,
}
451
452impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
453 pub fn clear(&mut self) {
454 self.env.clear();
455 self.rt.clear();
456 }
457
458 pub fn new(user: R) -> Self {
467 Self {
468 env: Env::new(),
469 builtins: FxHashMap::default(),
470 tags: FxHashSet::default(),
471 cached: HashMap::default(),
472 rt: user,
473 }
474 }
475
476 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
477 let f = T::init(self);
478 match self.builtins.entry(T::NAME) {
479 Entry::Vacant(e) => {
480 e.insert((T::TYP.clone(), f));
481 }
482 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
483 }
484 Ok(())
485 }
486
487 pub fn set_var(&mut self, id: BindId, v: Value) {
491 self.cached.insert(id, v.clone());
492 self.rt.set_var(id, v)
493 }
494
495 pub fn set_var_now(&mut self, id: BindId, v: Value) {
496 self.cached.insert(id, v);
497 self.rt.set_var_now(id);
498 }
499
500 fn tag(&mut self, s: &ArcStr) -> ArcStr {
501 match self.tags.get(s) {
502 Some(s) => s.clone(),
503 None => {
504 self.tags.insert(s.clone());
505 s.clone()
506 }
507 }
508 }
509
510 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
515 &mut self,
516 env: Env<R, E>,
517 f: F,
518 ) -> T {
519 let snap = self.env.restore_lexical_env(env);
520 let orig = mem::replace(&mut self.env, snap);
521 let r = f(self);
522 self.env = self.env.restore_lexical_env(orig);
523 r
524 }
525}
526
527pub fn compile<R: Rt, E: UserEvent>(
530 ctx: &mut ExecCtx<R, E>,
531 scope: &ModPath,
532 spec: Expr,
533) -> Result<Node<R, E>> {
534 let top_id = spec.id;
535 let env = ctx.env.clone();
536 let st = Instant::now();
537 let mut node = match compiler::compile(ctx, spec, scope, top_id) {
538 Ok(n) => n,
539 Err(e) => {
540 ctx.env = env;
541 return Err(e);
542 }
543 };
544 info!("compile time {:?}", st.elapsed());
545 let st = Instant::now();
546 if let Err(e) = node.typecheck(ctx) {
547 ctx.env = env;
548 return Err(e);
549 }
550 info!("typecheck time {:?}", st.elapsed());
551 Ok(node)
552}