1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 node::lambda::LambdaDef,
17 typ::{FnType, Type},
18};
19use anyhow::{bail, Result};
20use arcstr::ArcStr;
21use enumflags2::{bitflags, BitFlags};
22use expr::Expr;
23use futures::channel::mpsc;
24use fxhash::{FxHashMap, FxHashSet};
25use log::info;
26use netidx::{
27 path::Path,
28 publisher::{Id, Val, WriteRequest},
29 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
30};
31use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
32use netidx_value::{abstract_type::AbstractWrapper, Abstract};
33use node::compiler;
34use parking_lot::{Mutex, RwLock};
35use poolshark::{
36 global::{GPooled, Pool},
37 local::LPooled,
38};
39use std::{
40 any::{Any, TypeId},
41 cell::Cell,
42 collections::{
43 hash_map::{self, Entry},
44 HashMap,
45 },
46 fmt::Debug,
47 mem,
48 sync::{
49 self,
50 atomic::{AtomicBool, Ordering},
51 LazyLock,
52 },
53 time::Duration,
54};
55use tokio::{task, time::Instant};
56use triomphe::Arc;
57use uuid::Uuid;
58
59#[derive(Debug, Clone, Copy)]
60#[bitflags]
61#[repr(u64)]
62pub enum CFlag {
63 WarnUnhandled,
64 WarnUnhandledArith,
65 WarnUnused,
66 WarningsAreErrors,
67}
68
/// Process-wide trace switch; starts out disabled.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Store `b` into the global trace flag (relaxed ordering).
#[allow(dead_code)]
fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed);
}
76
77#[allow(dead_code)]
78fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
79 let set = if enable {
80 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
81 let prev = trace();
82 set_trace(true);
83 !prev
84 } else {
85 false
86 };
87 let r = match f() {
88 Err(e) => {
89 eprintln!("traced at {} failed with {e:?}", spec.pos);
90 Err(e)
91 }
92 r => r,
93 };
94 if set {
95 eprintln!("trace disabled at {}", spec.pos);
96 set_trace(false)
97 }
98 r
99}
100
101#[allow(dead_code)]
102fn trace() -> bool {
103 TRACE.load(Ordering::Relaxed)
104}
105
106#[macro_export]
107macro_rules! tdbg {
108 ($e:expr) => {
109 if $crate::trace() {
110 dbg!($e)
111 } else {
112 $e
113 }
114 };
115}
116
/// Build a `Value::Error` from a tag and a literal message.
///
/// The payload is the pair `($tag.clone(), message)` converted into a `Value`.
/// `Value`, `arcstr` and `triomphe` must be resolvable at the call site.
#[macro_export]
macro_rules! err {
    ($tag:expr, $err:literal) => {{
        let payload = Into::<Value>::into(($tag.clone(), arcstr::literal!($err)));
        Value::Error(triomphe::Arc::new(payload))
    }};
}
124
/// Build a `Value::Error` from a tag and a formatted message.
///
/// Works like `err!`, but the message is produced with
/// `compact_str::format_compact!` from `$fmt` and any following arguments.
/// A trailing comma after the last argument is accepted. `Value`, `ArcStr`,
/// `compact_str` and `triomphe` must be resolvable at the call site.
#[macro_export]
macro_rules! errf {
    // One arm covers both the "format string only" and "format string plus
    // arguments" forms, with or without a trailing comma.
    ($tag:expr, $fmt:expr $(, $args:expr)* $(,)?) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt $(, $args)*).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
138
139#[macro_export]
140macro_rules! defetyp {
141 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
142 static $tag_name: ArcStr = arcstr::literal!($tag);
143 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
144 ::std::sync::LazyLock::new(|| {
145 let scope = $crate::expr::ModPath::root();
146 $crate::expr::parser::parse_type(&format!($typ, $tag))
147 .expect("failed to parse type")
148 .scope_refs(&scope)
149 });
150 };
151}
152
153defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
154
155atomic_id!(LambdaId);
156
157impl From<u64> for LambdaId {
158 fn from(v: u64) -> Self {
159 LambdaId(v)
160 }
161}
162
163atomic_id!(BindId);
164
165impl From<u64> for BindId {
166 fn from(v: u64) -> Self {
167 BindId(v)
168 }
169}
170
171impl TryFrom<Value> for BindId {
172 type Error = anyhow::Error;
173
174 fn try_from(value: Value) -> Result<Self> {
175 match value {
176 Value::U64(id) => Ok(BindId(id)),
177 v => bail!("invalid bind id {v}"),
178 }
179 }
180}
181
/// User supplied payload carried in [`Event::user`].
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; invoked from [`Event::clear`].
    fn clear(&mut self);
}
185
186pub trait CustomBuiltinType: Debug + Any + Send + Sync {}
187
188impl CustomBuiltinType for Value {}
189impl CustomBuiltinType for Option<Value> {}
190
191#[derive(Debug, Clone)]
192pub struct NoUserEvent;
193
194impl UserEvent for NoUserEvent {
195 fn clear(&mut self) {}
196}
197
198#[derive(Debug, Clone, Copy)]
199#[bitflags]
200#[repr(u64)]
201pub enum PrintFlag {
202 DerefTVars,
205 ReplacePrims,
208 NoSource,
210 NoParents,
212}
213
214thread_local! {
215 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims.into());
216}
217
218pub static CBATCH_POOL: LazyLock<Pool<Vec<(BindId, Box<dyn CustomBuiltinType>)>>> =
220 LazyLock::new(|| Pool::new(10000, 1000));
221
222pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
226 flags: G,
227 f: F,
228) -> R {
229 let prev = PRINT_FLAGS.replace(flags.into());
230 let res = f();
231 PRINT_FLAGS.set(prev);
232 res
233}
234
235#[derive(Debug)]
241pub struct Event<E: UserEvent> {
242 pub init: bool,
243 pub variables: FxHashMap<BindId, Value>,
244 pub netidx: FxHashMap<SubId, subscriber::Event>,
245 pub writes: FxHashMap<Id, WriteRequest>,
246 pub rpc_calls: FxHashMap<BindId, RpcCall>,
247 pub custom: FxHashMap<BindId, Box<dyn CustomBuiltinType>>,
248 pub user: E,
249}
250
251impl<E: UserEvent> Event<E> {
252 pub fn new(user: E) -> Self {
253 Event {
254 init: false,
255 variables: HashMap::default(),
256 netidx: HashMap::default(),
257 writes: HashMap::default(),
258 rpc_calls: HashMap::default(),
259 custom: HashMap::default(),
260 user,
261 }
262 }
263
264 pub fn clear(&mut self) {
265 let Self { init, variables, netidx, rpc_calls, writes, custom, user } = self;
266 *init = false;
267 variables.clear();
268 netidx.clear();
269 rpc_calls.clear();
270 custom.clear();
271 writes.clear();
272 user.clear();
273 }
274}
275
276#[derive(Debug, Clone, Default)]
277pub struct Refs {
278 refed: LPooled<FxHashSet<BindId>>,
279 bound: LPooled<FxHashSet<BindId>>,
280}
281
282impl Refs {
283 pub fn clear(&mut self) {
284 self.refed.clear();
285 self.bound.clear();
286 }
287
288 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
289 for id in &*self.refed {
290 if !self.bound.contains(id) {
291 f(*id);
292 }
293 }
294 }
295}
296
297pub type Node<R, E> = Box<dyn Update<R, E>>;
298
299pub type BuiltInInitFn<R, E> = sync::Arc<
300 dyn for<'a, 'b, 'c> Fn(
301 &'a mut ExecCtx<R, E>,
302 &'a FnType,
303 &'b Scope,
304 &'c [Node<R, E>],
305 ExprId,
306 ) -> Result<Box<dyn Apply<R, E>>>
307 + Send
308 + Sync
309 + 'static,
310>;
311
312pub type InitFn<R, E> = sync::Arc<
313 dyn for<'a, 'b, 'c> Fn(
314 &'a Scope,
315 &'b mut ExecCtx<R, E>,
316 &'c mut [Node<R, E>],
317 ExprId,
318 bool,
319 ) -> Result<Box<dyn Apply<R, E>>>
320 + Send
321 + Sync
322 + 'static,
323>;
324
325pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
330 fn update(
331 &mut self,
332 ctx: &mut ExecCtx<R, E>,
333 from: &mut [Node<R, E>],
334 event: &mut Event<E>,
335 ) -> Option<Value>;
336
337 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
340 ()
341 }
342
343 fn typecheck(
346 &mut self,
347 _ctx: &mut ExecCtx<R, E>,
348 _from: &mut [Node<R, E>],
349 ) -> Result<()> {
350 Ok(())
351 }
352
353 fn typ(&self) -> Arc<FnType> {
356 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
357 Arc::new(FnType {
358 args: Arc::from_iter([]),
359 constraints: Arc::new(RwLock::new(LPooled::take())),
360 rtype: Type::Bottom,
361 throws: Type::Bottom,
362 vargs: None,
363 explicit_throws: false,
364 })
365 });
366 Arc::clone(&*EMPTY)
367 }
368
369 fn refs<'a>(&self, _refs: &mut Refs) {}
373
374 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
377}
378
379pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
383 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
386
387 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
389
390 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
392
393 fn typ(&self) -> &Type;
395
396 fn refs(&self, refs: &mut Refs);
399
400 fn spec(&self) -> &Expr;
402
403 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
405}
406
407pub trait BuiltIn<R: Rt, E: UserEvent> {
408 const NAME: &str;
409 const TYP: LazyLock<FnType>;
410
411 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
412}
413
414pub trait Abortable {
415 fn abort(&self);
416}
417
418impl Abortable for task::AbortHandle {
419 fn abort(&self) {
420 task::AbortHandle::abort(self)
421 }
422}
423
424pub trait Rt: Debug + Any {
425 type AbortHandle: Abortable;
426
427 fn clear(&mut self);
428
429 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
434
435 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
437
438 fn list(&mut self, id: BindId, path: Path);
443
444 fn list_table(&mut self, id: BindId, path: Path);
447
448 fn stop_list(&mut self, id: BindId);
451
452 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
456
457 fn update(&mut self, id: &Val, value: Value);
459
460 fn unpublish(&mut self, id: Val, ref_by: ExprId);
462
463 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
473 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
474
475 fn set_var(&mut self, id: BindId, value: Value);
486
487 fn notify_set(&mut self, id: BindId);
494
495 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
501
502 fn publish_rpc(
511 &mut self,
512 name: Path,
513 doc: Value,
514 spec: Vec<ArgSpec>,
515 id: BindId,
516 ) -> Result<()>;
517
518 fn unpublish_rpc(&mut self, name: Path);
520
521 fn set_timer(&mut self, id: BindId, timeout: Duration);
525
526 fn spawn<F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static>(
534 &mut self,
535 f: F,
536 ) -> Self::AbortHandle;
537
538 fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
546 &mut self,
547 f: F,
548 ) -> Self::AbortHandle;
549
550 fn watch(
555 &mut self,
556 s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
557 );
558
559 fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
564}
565
566#[derive(Default)]
567pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync>>);
568
569impl LibState {
570 pub fn get_or_default<T>(&mut self) -> &mut T
576 where
577 T: Default + Any + Send + Sync,
578 {
579 self.0
580 .entry(TypeId::of::<T>())
581 .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send + Sync>)
582 .downcast_mut::<T>()
583 .unwrap()
584 }
585
586 pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
592 where
593 T: Any + Send + Sync,
594 F: FnOnce() -> T,
595 {
596 self.0
597 .entry(TypeId::of::<T>())
598 .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync>)
599 .downcast_mut::<T>()
600 .unwrap()
601 }
602
603 pub fn entry<'a, T>(
604 &'a mut self,
605 ) -> hash_map::Entry<'a, TypeId, Box<dyn Any + Send + Sync>>
606 where
607 T: Any + Send + Sync,
608 {
609 self.0.entry(TypeId::of::<T>())
610 }
611
612 pub fn contains<T>(&self) -> bool
614 where
615 T: Any + Send + Sync,
616 {
617 self.0.contains_key(&TypeId::of::<T>())
618 }
619
620 pub fn get<T>(&mut self) -> Option<&T>
625 where
626 T: Any + Send + Sync,
627 {
628 self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
629 }
630
631 pub fn get_mut<T>(&mut self) -> Option<&mut T>
636 where
637 T: Any + Send + Sync,
638 {
639 self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
640 }
641
642 pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
646 where
647 T: Any + Send + Sync,
648 {
649 self.0
650 .insert(TypeId::of::<T>(), Box::new(t) as Box<dyn Any + Send + Sync>)
651 .map(|t| t.downcast::<T>().unwrap())
652 }
653
654 pub fn remove<T>(&mut self) -> Option<Box<T>>
656 where
657 T: Any + Send + Sync,
658 {
659 self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
660 }
661}
662
663#[derive(Default)]
674pub struct AbstractTypeRegistry {
675 by_tid: FxHashMap<TypeId, Uuid>,
676 by_uuid: FxHashMap<Uuid, &'static str>,
677}
678
679impl AbstractTypeRegistry {
680 fn with<V, F: FnMut(&mut AbstractTypeRegistry) -> V>(mut f: F) -> V {
681 static REG: LazyLock<Mutex<AbstractTypeRegistry>> =
682 LazyLock::new(|| Mutex::new(AbstractTypeRegistry::default()));
683 let mut g = REG.lock();
684 f(&mut *g)
685 }
686
687 pub(crate) fn uuid<T: Any>(tag: &'static str) -> Uuid {
689 Self::with(|rg| {
690 *rg.by_tid.entry(TypeId::of::<T>()).or_insert_with(|| {
691 let id = Uuid::new_v4();
692 rg.by_uuid.insert(id, tag);
693 id
694 })
695 })
696 }
697
698 pub fn tag(a: &Abstract) -> Option<&'static str> {
700 Self::with(|rg| rg.by_uuid.get(&a.id()).map(|r| *r))
701 }
702
703 pub fn is_a(a: &Abstract, tag: &str) -> bool {
705 match Self::tag(a) {
706 Some(t) => t == tag,
707 None => false,
708 }
709 }
710}
711
712pub struct ExecCtx<R: Rt, E: UserEvent> {
713 lambdawrap: AbstractWrapper<LambdaDef<R, E>>,
715 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
717 builtins_allowed: bool,
720 tags: FxHashSet<ArcStr>,
722 pub libstate: LibState,
724 pub env: Env,
726 pub cached: FxHashMap<BindId, Value>,
728 pub rt: R,
730}
731
732impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
733 pub fn clear(&mut self) {
734 self.env.clear();
735 self.rt.clear();
736 }
737
738 pub fn new(user: R) -> Result<Self> {
747 let id = AbstractTypeRegistry::uuid::<LambdaDef<R, E>>("lambda");
748 Ok(Self {
749 lambdawrap: Abstract::register(id)?,
750 env: Env::default(),
751 builtins: FxHashMap::default(),
752 builtins_allowed: true,
753 libstate: LibState::default(),
754 tags: FxHashSet::default(),
755 cached: HashMap::default(),
756 rt: user,
757 })
758 }
759
760 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
761 let f = T::init(self);
762 match self.builtins.entry(T::NAME) {
763 Entry::Vacant(e) => {
764 e.insert((T::TYP.clone(), f));
765 }
766 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
767 }
768 Ok(())
769 }
770
771 pub fn set_var(&mut self, id: BindId, v: Value) {
775 self.cached.insert(id, v.clone());
776 self.rt.set_var(id, v)
777 }
778
779 fn tag(&mut self, s: &ArcStr) -> ArcStr {
780 match self.tags.get(s) {
781 Some(s) => s.clone(),
782 None => {
783 self.tags.insert(s.clone());
784 s.clone()
785 }
786 }
787 }
788
789 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(&mut self, env: Env, f: F) -> T {
793 let snap = self.env.restore_lexical_env(env);
794 let orig = mem::replace(&mut self.env, snap);
795 let r = f(self);
796 self.env = self.env.restore_lexical_env(orig);
797 r
798 }
799
800 pub fn with_restored_mut<T, F: FnOnce(&mut Self) -> T>(
806 &mut self,
807 env: &mut Env,
808 f: F,
809 ) -> T {
810 let snap = self.env.restore_lexical_env_mut(env);
811 let orig = mem::replace(&mut self.env, snap);
812 let r = f(self);
813 *env = self.env.clone();
814 self.env = self.env.restore_lexical_env(orig);
815 r
816 }
817}
818
819#[derive(Debug, Clone)]
820pub struct Scope {
821 pub lexical: ModPath,
822 pub dynamic: ModPath,
823}
824
825impl Scope {
826 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
827 Self {
828 lexical: ModPath(self.lexical.append(s)),
829 dynamic: ModPath(self.dynamic.append(s)),
830 }
831 }
832
833 pub fn root() -> Self {
834 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
835 }
836}
837
838pub fn compile<R: Rt, E: UserEvent>(
841 ctx: &mut ExecCtx<R, E>,
842 flags: BitFlags<CFlag>,
843 scope: &Scope,
844 spec: Expr,
845) -> Result<Node<R, E>> {
846 let top_id = spec.id;
847 let env = ctx.env.clone();
848 let st = Instant::now();
849 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
850 Ok(n) => n,
851 Err(e) => {
852 ctx.env = env;
853 return Err(e);
854 }
855 };
856 info!("compile time {:?}", st.elapsed());
857 let st = Instant::now();
858 if let Err(e) = node.typecheck(ctx) {
859 ctx.env = env;
860 return Err(e);
861 }
862 info!("typecheck time {:?}", st.elapsed());
863 Ok(node)
864}