1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use futures::channel::mpsc;
23use fxhash::{FxHashMap, FxHashSet};
24use log::info;
25use netidx::{
26 path::Path,
27 publisher::{Id, Val, WriteRequest},
28 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
29};
30use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
31use node::compiler;
32use parking_lot::RwLock;
33use poolshark::{
34 global::{GPooled, Pool},
35 local::LPooled,
36};
37use std::{
38 any::{Any, TypeId},
39 cell::Cell,
40 collections::{hash_map::Entry, HashMap},
41 fmt::Debug,
42 mem,
43 sync::{
44 self,
45 atomic::{AtomicBool, Ordering},
46 LazyLock,
47 },
48 time::Duration,
49};
50use tokio::{task, time::Instant};
51use triomphe::Arc;
52
53#[derive(Debug, Clone, Copy)]
54#[bitflags]
55#[repr(u64)]
56pub enum CFlag {
57 WarnUnhandled,
58 WarnUnhandledArith,
59 WarnUnused,
60 WarningsAreErrors,
61}
62
63#[allow(dead_code)]
64static TRACE: AtomicBool = AtomicBool::new(false);
65
66#[allow(dead_code)]
67fn set_trace(b: bool) {
68 TRACE.store(b, Ordering::Relaxed)
69}
70
71#[allow(dead_code)]
72fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
73 let set = if enable {
74 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
75 let prev = trace();
76 set_trace(true);
77 !prev
78 } else {
79 false
80 };
81 let r = match f() {
82 Err(e) => {
83 eprintln!("traced at {} failed with {e:?}", spec.pos);
84 Err(e)
85 }
86 r => r,
87 };
88 if set {
89 eprintln!("trace disabled at {}", spec.pos);
90 set_trace(false)
91 }
92 r
93}
94
95#[allow(dead_code)]
96fn trace() -> bool {
97 TRACE.load(Ordering::Relaxed)
98}
99
100#[macro_export]
101macro_rules! tdbg {
102 ($e:expr) => {
103 if $crate::trace() {
104 dbg!($e)
105 } else {
106 $e
107 }
108 };
109}
110
111#[macro_export]
112macro_rules! err {
113 ($tag:expr, $err:literal) => {{
114 let e: Value = ($tag.clone(), arcstr::literal!($err)).into();
115 Value::Error(triomphe::Arc::new(e))
116 }};
117}
118
119#[macro_export]
120macro_rules! errf {
121 ($tag:expr, $fmt:expr, $args:tt) => {{
122 let msg: ArcStr = compact_str::format_compact!($fmt, $args).as_str().into();
123 let e: Value = ($tag.clone(), msg).into();
124 Value::Error(triomphe::Arc::new(e))
125 }};
126 ($tag:expr, $fmt:expr) => {{
127 let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
128 let e: Value = ($tag.clone(), msg).into();
129 Value::Error(triomphe::Arc::new(e))
130 }};
131}
132
133#[macro_export]
134macro_rules! defetyp {
135 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
136 static $tag_name: ArcStr = arcstr::literal!($tag);
137 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
138 ::std::sync::LazyLock::new(|| {
139 let scope = $crate::expr::ModPath::root();
140 $crate::expr::parser::parse_type(&format!($typ, $tag))
141 .expect("failed to parse type")
142 .scope_refs(&scope)
143 });
144 };
145}
146
147defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
148
149atomic_id!(LambdaId);
150
151impl From<u64> for LambdaId {
152 fn from(v: u64) -> Self {
153 LambdaId(v)
154 }
155}
156
157atomic_id!(BindId);
158
159impl From<u64> for BindId {
160 fn from(v: u64) -> Self {
161 BindId(v)
162 }
163}
164
165impl TryFrom<Value> for BindId {
166 type Error = anyhow::Error;
167
168 fn try_from(value: Value) -> Result<Self> {
169 match value {
170 Value::U64(id) => Ok(BindId(id)),
171 v => bail!("invalid bind id {v}"),
172 }
173 }
174}
175
176pub trait UserEvent: Clone + Debug + Any {
177 fn clear(&mut self);
178}
179
180#[derive(Debug, Clone)]
181pub struct NoUserEvent;
182
183impl UserEvent for NoUserEvent {
184 fn clear(&mut self) {}
185}
186
187#[derive(Debug, Clone, Copy)]
188#[bitflags]
189#[repr(u64)]
190pub enum PrintFlag {
191 DerefTVars,
194 ReplacePrims,
197 NoSource,
199 NoParents,
201}
202
203thread_local! {
204 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
205}
206
207pub static CBATCH_POOL: LazyLock<Pool<Vec<(BindId, Value)>>> =
209 LazyLock::new(|| Pool::new(10000, 1000));
210
211pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
215 flags: G,
216 f: F,
217) -> R {
218 let prev = PRINT_FLAGS.replace(flags.into());
219 let res = f();
220 PRINT_FLAGS.set(prev);
221 res
222}
223
224#[derive(Debug)]
230pub struct Event<E: UserEvent> {
231 pub init: bool,
232 pub variables: FxHashMap<BindId, Value>,
233 pub netidx: FxHashMap<SubId, subscriber::Event>,
234 pub writes: FxHashMap<Id, WriteRequest>,
235 pub rpc_calls: FxHashMap<BindId, RpcCall>,
236 pub user: E,
237}
238
239impl<E: UserEvent> Event<E> {
240 pub fn new(user: E) -> Self {
241 Event {
242 init: false,
243 variables: HashMap::default(),
244 netidx: HashMap::default(),
245 writes: HashMap::default(),
246 rpc_calls: HashMap::default(),
247 user,
248 }
249 }
250
251 pub fn clear(&mut self) {
252 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
253 *init = false;
254 variables.clear();
255 netidx.clear();
256 rpc_calls.clear();
257 writes.clear();
258 user.clear();
259 }
260}
261
262#[derive(Debug, Clone, Default)]
263pub struct Refs {
264 refed: LPooled<FxHashSet<BindId>>,
265 bound: LPooled<FxHashSet<BindId>>,
266}
267
268impl Refs {
269 pub fn clear(&mut self) {
270 self.refed.clear();
271 self.bound.clear();
272 }
273
274 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
275 for id in &*self.refed {
276 if !self.bound.contains(id) {
277 f(*id);
278 }
279 }
280 }
281}
282
283pub type Node<R, E> = Box<dyn Update<R, E>>;
284
285pub type BuiltInInitFn<R, E> = sync::Arc<
286 dyn for<'a, 'b, 'c> Fn(
287 &'a mut ExecCtx<R, E>,
288 &'a FnType,
289 &'b Scope,
290 &'c [Node<R, E>],
291 ExprId,
292 ) -> Result<Box<dyn Apply<R, E>>>
293 + Send
294 + Sync
295 + 'static,
296>;
297
298pub type InitFn<R, E> = sync::Arc<
299 dyn for<'a, 'b, 'c> Fn(
300 &'a Scope,
301 &'b mut ExecCtx<R, E>,
302 &'c mut [Node<R, E>],
303 ExprId,
304 bool,
305 ) -> Result<Box<dyn Apply<R, E>>>
306 + Send
307 + Sync
308 + 'static,
309>;
310
311pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
316 fn update(
317 &mut self,
318 ctx: &mut ExecCtx<R, E>,
319 from: &mut [Node<R, E>],
320 event: &mut Event<E>,
321 ) -> Option<Value>;
322
323 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
326 ()
327 }
328
329 fn typecheck(
332 &mut self,
333 _ctx: &mut ExecCtx<R, E>,
334 _from: &mut [Node<R, E>],
335 ) -> Result<()> {
336 Ok(())
337 }
338
339 fn typ(&self) -> Arc<FnType> {
342 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
343 Arc::new(FnType {
344 args: Arc::from_iter([]),
345 constraints: Arc::new(RwLock::new(LPooled::take())),
346 rtype: Type::Bottom,
347 throws: Type::Bottom,
348 vargs: None,
349 })
350 });
351 Arc::clone(&*EMPTY)
352 }
353
354 fn refs<'a>(&self, _refs: &mut Refs) {}
358
359 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
362}
363
364pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
368 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
371
372 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
374
375 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
377
378 fn typ(&self) -> &Type;
380
381 fn refs(&self, refs: &mut Refs);
384
385 fn spec(&self) -> &Expr;
387
388 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
390}
391
392pub trait BuiltIn<R: Rt, E: UserEvent> {
393 const NAME: &str;
394 const TYP: LazyLock<FnType>;
395
396 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
397}
398
399pub trait Abortable {
400 fn abort(&self);
401}
402
403impl Abortable for task::AbortHandle {
404 fn abort(&self) {
405 task::AbortHandle::abort(self)
406 }
407}
408
409pub trait Rt: Debug + 'static {
410 type AbortHandle: Abortable;
411
412 fn clear(&mut self);
413
414 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
419
420 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
422
423 fn list(&mut self, id: BindId, path: Path);
428
429 fn list_table(&mut self, id: BindId, path: Path);
432
433 fn stop_list(&mut self, id: BindId);
436
437 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
441
442 fn update(&mut self, id: &Val, value: Value);
444
445 fn unpublish(&mut self, id: Val, ref_by: ExprId);
447
448 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
458 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
459
460 fn set_var(&mut self, id: BindId, value: Value);
471
472 fn notify_set(&mut self, id: BindId);
479
480 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
486
487 fn publish_rpc(
496 &mut self,
497 name: Path,
498 doc: Value,
499 spec: Vec<ArgSpec>,
500 id: BindId,
501 ) -> Result<()>;
502
503 fn unpublish_rpc(&mut self, name: Path);
505
506 fn set_timer(&mut self, id: BindId, timeout: Duration);
510
511 fn spawn<F: Future<Output = (BindId, Value)> + Send + 'static>(
519 &mut self,
520 f: F,
521 ) -> Self::AbortHandle;
522
523 fn watch(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
528}
529
530#[derive(Default)]
531pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>);
532
533impl LibState {
534 pub fn get_or_default<T>(&mut self) -> &mut T
540 where
541 T: Default + Any + Send + Sync + 'static,
542 {
543 self.0
544 .entry(TypeId::of::<T>())
545 .or_insert_with(|| {
546 Box::new(T::default()) as Box<dyn Any + Send + Sync + 'static>
547 })
548 .downcast_mut::<T>()
549 .unwrap()
550 }
551
552 pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
558 where
559 T: Any + Send + Sync + 'static,
560 F: FnOnce() -> T,
561 {
562 self.0
563 .entry(TypeId::of::<T>())
564 .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync + 'static>)
565 .downcast_mut::<T>()
566 .unwrap()
567 }
568
569 pub fn get<T>(&mut self) -> Option<&T>
574 where
575 T: Any + Send + Sync + 'static,
576 {
577 self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
578 }
579
580 pub fn get_mut<T>(&mut self) -> Option<&mut T>
585 where
586 T: Any + Send + Sync + 'static,
587 {
588 self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
589 }
590
591 pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
595 where
596 T: Any + Send + Sync + 'static,
597 {
598 self.0
599 .insert(
600 TypeId::of::<T>(),
601 Box::new(t) as Box<dyn Any + Send + Sync + 'static>,
602 )
603 .map(|t| t.downcast::<T>().unwrap())
604 }
605
606 pub fn remove<T>(&mut self) -> Option<Box<T>>
608 where
609 T: Any + Send + Sync + 'static,
610 {
611 self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
612 }
613}
614
615pub struct ExecCtx<R: Rt, E: UserEvent> {
616 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
617 builtins_allowed: bool,
618 tags: FxHashSet<ArcStr>,
619 pub libstate: LibState,
621 pub env: Env<R, E>,
623 pub cached: FxHashMap<BindId, Value>,
625 pub rt: R,
627}
628
629impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
630 pub fn clear(&mut self) {
631 self.env.clear();
632 self.rt.clear();
633 }
634
635 pub fn new(user: R) -> Self {
644 Self {
645 env: Env::new(),
646 builtins: FxHashMap::default(),
647 builtins_allowed: true,
648 libstate: LibState::default(),
649 tags: FxHashSet::default(),
650 cached: HashMap::default(),
651 rt: user,
652 }
653 }
654
655 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
656 let f = T::init(self);
657 match self.builtins.entry(T::NAME) {
658 Entry::Vacant(e) => {
659 e.insert((T::TYP.clone(), f));
660 }
661 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
662 }
663 Ok(())
664 }
665
666 pub fn set_var(&mut self, id: BindId, v: Value) {
670 self.cached.insert(id, v.clone());
671 self.rt.set_var(id, v)
672 }
673
674 fn tag(&mut self, s: &ArcStr) -> ArcStr {
675 match self.tags.get(s) {
676 Some(s) => s.clone(),
677 None => {
678 self.tags.insert(s.clone());
679 s.clone()
680 }
681 }
682 }
683
684 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
689 &mut self,
690 env: Env<R, E>,
691 f: F,
692 ) -> T {
693 let snap = self.env.restore_lexical_env(env);
694 let orig = mem::replace(&mut self.env, snap);
695 let r = f(self);
696 self.env = self.env.restore_lexical_env(orig);
697 r
698 }
699}
700
701#[derive(Debug, Clone)]
702pub struct Scope {
703 pub lexical: ModPath,
704 pub dynamic: ModPath,
705}
706
707impl Scope {
708 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
709 Self {
710 lexical: ModPath(self.lexical.append(s)),
711 dynamic: ModPath(self.dynamic.append(s)),
712 }
713 }
714
715 pub fn root() -> Self {
716 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
717 }
718}
719
720pub fn compile<R: Rt, E: UserEvent>(
723 ctx: &mut ExecCtx<R, E>,
724 flags: BitFlags<CFlag>,
725 scope: &Scope,
726 spec: Expr,
727) -> Result<Node<R, E>> {
728 let top_id = spec.id;
729 let env = ctx.env.clone();
730 let st = Instant::now();
731 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
732 Ok(n) => n,
733 Err(e) => {
734 ctx.env = env;
735 return Err(e);
736 }
737 };
738 info!("compile time {:?}", st.elapsed());
739 let st = Instant::now();
740 if let Err(e) = node.typecheck(ctx) {
741 ctx.env = env;
742 return Err(e);
743 }
744 info!("typecheck time {:?}", st.elapsed());
745 Ok(node)
746}