1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25 path::Path,
26 publisher::{Id, Val, WriteRequest},
27 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use poolshark::local::LPooled;
33use std::{
34 any::Any,
35 cell::Cell,
36 collections::{hash_map::Entry, HashMap},
37 fmt::Debug,
38 mem,
39 sync::{
40 self,
41 atomic::{AtomicBool, Ordering},
42 LazyLock,
43 },
44 time::Duration,
45};
46use tokio::time::Instant;
47use triomphe::Arc;
48
49#[derive(Debug, Clone, Copy)]
50#[bitflags]
51#[repr(u64)]
52pub enum CFlag {
53 WarnUnhandled,
54 WarnUnhandledArith,
55 WarnUnused,
56 WarningsAreErrors,
57}
58
59#[allow(dead_code)]
60static TRACE: AtomicBool = AtomicBool::new(false);
61
62#[allow(dead_code)]
63fn set_trace(b: bool) {
64 TRACE.store(b, Ordering::Relaxed)
65}
66
67#[allow(dead_code)]
68fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
69 let set = if enable {
70 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
71 let prev = trace();
72 set_trace(true);
73 !prev
74 } else {
75 false
76 };
77 let r = match f() {
78 Err(e) => {
79 eprintln!("traced at {} failed with {e:?}", spec.pos);
80 Err(e)
81 }
82 r => r,
83 };
84 if set {
85 eprintln!("trace disabled at {}", spec.pos);
86 set_trace(false)
87 }
88 r
89}
90
91#[allow(dead_code)]
92fn trace() -> bool {
93 TRACE.load(Ordering::Relaxed)
94}
95
96#[macro_export]
97macro_rules! tdbg {
98 ($e:expr) => {
99 if $crate::trace() {
100 dbg!($e)
101 } else {
102 $e
103 }
104 };
105}
106
107#[macro_export]
108macro_rules! err {
109 ($tag:expr, $err:literal) => {{
110 let e: Value = ($tag.clone(), literal!($err)).into();
111 Value::Error(triomphe::Arc::new(e))
112 }};
113}
114
115#[macro_export]
116macro_rules! errf {
117 ($tag:expr, $fmt:expr, $args:tt) => {{
118 let msg: ArcStr = compact_str::format_compact!($fmt, $args).as_str().into();
119 let e: Value = ($tag.clone(), msg).into();
120 Value::Error(triomphe::Arc::new(e))
121 }};
122 ($tag:expr, $fmt:expr) => {{
123 let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
124 let e: Value = ($tag.clone(), msg).into();
125 Value::Error(triomphe::Arc::new(e))
126 }};
127}
128
129#[macro_export]
130macro_rules! defetyp {
131 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
132 static $tag_name: ArcStr = literal!($tag);
133 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
134 ::std::sync::LazyLock::new(|| {
135 let scope = $crate::expr::ModPath::root();
136 $crate::expr::parser::parse_type(&format!($typ, $tag))
137 .expect("failed to parse type")
138 .scope_refs(&scope)
139 });
140 };
141}
142
143atomic_id!(LambdaId);
144
145impl From<u64> for LambdaId {
146 fn from(v: u64) -> Self {
147 LambdaId(v)
148 }
149}
150
151atomic_id!(BindId);
152
153impl From<u64> for BindId {
154 fn from(v: u64) -> Self {
155 BindId(v)
156 }
157}
158
159impl TryFrom<Value> for BindId {
160 type Error = anyhow::Error;
161
162 fn try_from(value: Value) -> Result<Self> {
163 match value {
164 Value::U64(id) => Ok(BindId(id)),
165 v => bail!("invalid bind id {v}"),
166 }
167 }
168}
169
170pub trait UserEvent: Clone + Debug + Any {
171 fn clear(&mut self);
172}
173
174#[derive(Debug, Clone)]
175pub struct NoUserEvent;
176
177impl UserEvent for NoUserEvent {
178 fn clear(&mut self) {}
179}
180
181#[derive(Debug, Clone, Copy)]
182#[bitflags]
183#[repr(u64)]
184pub enum PrintFlag {
185 DerefTVars,
188 ReplacePrims,
191 NoSource,
193 NoParents,
195}
196
197thread_local! {
198 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
199}
200
201pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
205 flags: G,
206 f: F,
207) -> R {
208 let prev = PRINT_FLAGS.replace(flags.into());
209 let res = f();
210 PRINT_FLAGS.set(prev);
211 res
212}
213
214#[derive(Debug)]
220pub struct Event<E: UserEvent> {
221 pub init: bool,
222 pub variables: FxHashMap<BindId, Value>,
223 pub netidx: FxHashMap<SubId, subscriber::Event>,
224 pub writes: FxHashMap<Id, WriteRequest>,
225 pub rpc_calls: FxHashMap<BindId, RpcCall>,
226 pub user: E,
227}
228
229impl<E: UserEvent> Event<E> {
230 pub fn new(user: E) -> Self {
231 Event {
232 init: false,
233 variables: HashMap::default(),
234 netidx: HashMap::default(),
235 writes: HashMap::default(),
236 rpc_calls: HashMap::default(),
237 user,
238 }
239 }
240
241 pub fn clear(&mut self) {
242 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
243 *init = false;
244 variables.clear();
245 netidx.clear();
246 rpc_calls.clear();
247 writes.clear();
248 user.clear();
249 }
250}
251
252#[derive(Debug, Clone, Default)]
253pub struct Refs {
254 refed: LPooled<FxHashSet<BindId>>,
255 bound: LPooled<FxHashSet<BindId>>,
256}
257
258impl Refs {
259 pub fn clear(&mut self) {
260 self.refed.clear();
261 self.bound.clear();
262 }
263
264 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
265 for id in &*self.refed {
266 if !self.bound.contains(id) {
267 f(*id);
268 }
269 }
270 }
271}
272
273pub type Node<R, E> = Box<dyn Update<R, E>>;
274
275pub type BuiltInInitFn<R, E> = sync::Arc<
276 dyn for<'a, 'b, 'c> Fn(
277 &'a mut ExecCtx<R, E>,
278 &'a FnType,
279 &'b Scope,
280 &'c [Node<R, E>],
281 ExprId,
282 ) -> Result<Box<dyn Apply<R, E>>>
283 + Send
284 + Sync
285 + 'static,
286>;
287
288pub type InitFn<R, E> = sync::Arc<
289 dyn for<'a, 'b, 'c> Fn(
290 &'a Scope,
291 &'b mut ExecCtx<R, E>,
292 &'c mut [Node<R, E>],
293 ExprId,
294 bool,
295 ) -> Result<Box<dyn Apply<R, E>>>
296 + Send
297 + Sync
298 + 'static,
299>;
300
301pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
306 fn update(
307 &mut self,
308 ctx: &mut ExecCtx<R, E>,
309 from: &mut [Node<R, E>],
310 event: &mut Event<E>,
311 ) -> Option<Value>;
312
313 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
316 ()
317 }
318
319 fn typecheck(
322 &mut self,
323 _ctx: &mut ExecCtx<R, E>,
324 _from: &mut [Node<R, E>],
325 ) -> Result<()> {
326 Ok(())
327 }
328
329 fn typ(&self) -> Arc<FnType> {
332 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
333 Arc::new(FnType {
334 args: Arc::from_iter([]),
335 constraints: Arc::new(RwLock::new(LPooled::take())),
336 rtype: Type::Bottom,
337 throws: Type::Bottom,
338 vargs: None,
339 })
340 });
341 Arc::clone(&*EMPTY)
342 }
343
344 fn refs<'a>(&self, _refs: &mut Refs) {}
348
349 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
352}
353
354pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
358 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
361
362 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
364
365 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
367
368 fn typ(&self) -> &Type;
370
371 fn refs(&self, refs: &mut Refs);
374
375 fn spec(&self) -> &Expr;
377
378 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
380}
381
382pub trait BuiltIn<R: Rt, E: UserEvent> {
383 const NAME: &str;
384 const TYP: LazyLock<FnType>;
385
386 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
387}
388
389pub trait Rt: Debug + 'static {
390 fn clear(&mut self);
391
392 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
396
397 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
399
400 fn list(&mut self, id: BindId, path: Path);
405
406 fn list_table(&mut self, id: BindId, path: Path);
409
410 fn stop_list(&mut self, id: BindId);
413
414 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
418
419 fn update(&mut self, id: &Val, value: Value);
421
422 fn unpublish(&mut self, id: Val, ref_by: ExprId);
424
425 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
435 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
436
437 fn set_var(&mut self, id: BindId, value: Value);
448
449 fn notify_set(&mut self, id: BindId);
456
457 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
463
464 fn publish_rpc(
473 &mut self,
474 name: Path,
475 doc: Value,
476 spec: Vec<ArgSpec>,
477 id: BindId,
478 ) -> Result<()>;
479
480 fn unpublish_rpc(&mut self, name: Path);
482
483 fn set_timer(&mut self, id: BindId, timeout: Duration);
487}
488
489pub struct ExecCtx<R: Rt, E: UserEvent> {
490 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
491 builtins_allowed: bool,
492 tags: FxHashSet<ArcStr>,
493 pub env: Env<R, E>,
494 pub cached: FxHashMap<BindId, Value>,
495 pub rt: R,
496}
497
498impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
499 pub fn clear(&mut self) {
500 self.env.clear();
501 self.rt.clear();
502 }
503
504 pub fn new(user: R) -> Self {
513 Self {
514 env: Env::new(),
515 builtins: FxHashMap::default(),
516 builtins_allowed: true,
517 tags: FxHashSet::default(),
518 cached: HashMap::default(),
519 rt: user,
520 }
521 }
522
523 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
524 let f = T::init(self);
525 match self.builtins.entry(T::NAME) {
526 Entry::Vacant(e) => {
527 e.insert((T::TYP.clone(), f));
528 }
529 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
530 }
531 Ok(())
532 }
533
534 pub fn set_var(&mut self, id: BindId, v: Value) {
538 self.cached.insert(id, v.clone());
539 self.rt.set_var(id, v)
540 }
541
542 fn tag(&mut self, s: &ArcStr) -> ArcStr {
543 match self.tags.get(s) {
544 Some(s) => s.clone(),
545 None => {
546 self.tags.insert(s.clone());
547 s.clone()
548 }
549 }
550 }
551
552 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
557 &mut self,
558 env: Env<R, E>,
559 f: F,
560 ) -> T {
561 let snap = self.env.restore_lexical_env(env);
562 let orig = mem::replace(&mut self.env, snap);
563 let r = f(self);
564 self.env = self.env.restore_lexical_env(orig);
565 r
566 }
567}
568
569#[derive(Debug, Clone)]
570pub struct Scope {
571 pub lexical: ModPath,
572 pub dynamic: ModPath,
573}
574
575impl Scope {
576 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
577 Self {
578 lexical: ModPath(self.lexical.append(s)),
579 dynamic: ModPath(self.dynamic.append(s)),
580 }
581 }
582
583 pub fn root() -> Self {
584 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
585 }
586}
587
588pub fn compile<R: Rt, E: UserEvent>(
591 ctx: &mut ExecCtx<R, E>,
592 flags: BitFlags<CFlag>,
593 scope: &Scope,
594 spec: Expr,
595) -> Result<Node<R, E>> {
596 let top_id = spec.id;
597 let env = ctx.env.clone();
598 let st = Instant::now();
599 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
600 Ok(n) => n,
601 Err(e) => {
602 ctx.env = env;
603 return Err(e);
604 }
605 };
606 info!("compile time {:?}", st.elapsed());
607 let st = Instant::now();
608 if let Err(e) = node.typecheck(ctx) {
609 ctx.env = env;
610 return Err(e);
611 }
612 info!("typecheck time {:?}", st.elapsed());
613 Ok(node)
614}