1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 node::lambda::LambdaDef,
17 typ::{FnType, Type},
18};
19use anyhow::{bail, Result};
20use arcstr::ArcStr;
21use enumflags2::{bitflags, BitFlags};
22use expr::Expr;
23use futures::channel::mpsc;
24use fxhash::{FxHashMap, FxHashSet};
25use log::info;
26use netidx::{
27 path::Path,
28 publisher::{Id, Val, WriteRequest},
29 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
30};
31use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
32use netidx_value::{abstract_type::AbstractWrapper, Abstract};
33use node::compiler;
34use parking_lot::{Mutex, RwLock};
35use poolshark::{
36 global::{GPooled, Pool},
37 local::LPooled,
38};
39use std::{
40 any::{Any, TypeId},
41 cell::Cell,
42 collections::{
43 hash_map::{self, Entry},
44 HashMap,
45 },
46 fmt::Debug,
47 mem,
48 sync::{
49 self,
50 atomic::{AtomicBool, Ordering},
51 LazyLock,
52 },
53 time::Duration,
54};
55use tokio::{task, time::Instant};
56use triomphe::Arc;
57use uuid::Uuid;
58
/// Compiler option flags. They are combined into a `BitFlags<CFlag>` set,
/// which is what [`compile`] accepts.
///
/// NOTE(review): nothing in this file reads the individual flags, so the
/// per-variant notes below are inferred from the variant names only and must
/// be confirmed against the compiler module.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum CFlag {
    /// Presumably enables warnings about unhandled errors — TODO confirm.
    WarnUnhandled,
    /// Presumably enables warnings about unhandled arithmetic errors — TODO confirm.
    WarnUnhandledArith,
    /// Presumably enables warnings about unused items — TODO confirm.
    WarnUnused,
    /// Presumably promotes warnings to hard errors — TODO confirm.
    WarningsAreErrors,
}
68
// Process-wide switch consulted by `trace()` (and therefore by `tdbg!`).
// Starts out disabled.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Store `b` into the global trace flag (relaxed ordering).
#[allow(dead_code)]
fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed)
}
76
77#[allow(dead_code)]
78fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
79 let set = if enable {
80 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
81 let prev = trace();
82 set_trace(true);
83 !prev
84 } else {
85 false
86 };
87 let r = match f() {
88 Err(e) => {
89 eprintln!("traced at {} failed with {e:?}", spec.pos);
90 Err(e)
91 }
92 r => r,
93 };
94 if set {
95 eprintln!("trace disabled at {}", spec.pos);
96 set_trace(false)
97 }
98 r
99}
100
/// Read the current value of the global trace flag (relaxed ordering).
#[allow(dead_code)]
fn trace() -> bool {
    TRACE.load(Ordering::Relaxed)
}
105
/// Evaluate `$e`, routing it through `dbg!` when the global trace flag is on.
///
/// Exactly one of the two branches runs, so `$e` is evaluated once and its
/// value is the value of the macro either way.
///
/// NOTE(review): the expansion calls `$crate::trace()`, which is declared
/// without `pub` in this file; expanding this macro from another crate would
/// presumably fail to resolve it — confirm the intended scope.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
116
/// Build a `Value::Error` from a tag and a string literal message.
///
/// The pair `($tag.clone(), literal)` is converted into a `Value` and wrapped
/// in a `triomphe::Arc`. `$err` must be a literal because it is passed to
/// `arcstr::literal!`.
///
/// `Value` is named without a path, so it must be in scope where the macro is
/// expanded.
#[macro_export]
macro_rules! err {
    ($tag:expr, $err:literal) => {{
        let e: Value = ($tag.clone(), ::arcstr::literal!($err)).into();
        Value::Error(::triomphe::Arc::new(e))
    }};
}
124
/// Build a `Value::Error` from a tag and a formatted message.
///
/// Like `err!`, except the message is produced by
/// `compact_str::format_compact!` and then copied into an `ArcStr`. The first
/// arm takes a format string followed by arguments; the second arm takes a
/// format string alone.
///
/// `Value` and `ArcStr` are named without a path, so both must be in scope
/// where the macro is expanded.
#[macro_export]
macro_rules! errf {
    ($tag:expr, $fmt:expr, $($args:expr),*) => {{
        let msg: ArcStr = ::compact_str::format_compact!($fmt, $($args),*).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(::triomphe::Arc::new(e))
    }};
    ($tag:expr, $fmt:expr) => {{
        let msg: ArcStr = ::compact_str::format_compact!($fmt).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(::triomphe::Arc::new(e))
    }};
}
138
/// Define a tag static and a lazily parsed error type static.
///
/// * `$tag_name` becomes a `static ArcStr` holding the literal `$tag`.
/// * `$name` becomes a `static LazyLock<Type>`. On first access it formats
///   `$typ` with `$tag` as the single argument, parses the result with
///   `expr::parser::parse_type`, and calls `scope_refs` on it with the root
///   `ModPath`.
///
/// # Panics
///
/// The first access of `$name` panics with "failed to parse type" if the
/// formatted string does not parse.
///
/// `ArcStr` is named without a path, so it must be in scope where the macro
/// is expanded.
#[macro_export]
macro_rules! defetyp {
    ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
        static $tag_name: ArcStr = ::arcstr::literal!($tag);
        static $name: ::std::sync::LazyLock<$crate::typ::Type> =
            ::std::sync::LazyLock::new(|| {
                let scope = $crate::expr::ModPath::root();
                $crate::expr::parser::parse_type(&format!($typ, $tag))
                    .expect("failed to parse type")
                    .scope_refs(&scope)
            });
    };
}
152
// Defines `CAST_ERR_TAG` (the tag "InvalidCast") and `CAST_ERR`, the type
// lazily parsed from the template below with the tag substituted for `{}`.
defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
154
155atomic_id!(LambdaId);
156
157impl From<u64> for LambdaId {
158 fn from(v: u64) -> Self {
159 LambdaId(v)
160 }
161}
162
163atomic_id!(BindId);
164
165impl From<u64> for BindId {
166 fn from(v: u64) -> Self {
167 BindId(v)
168 }
169}
170
171impl TryFrom<Value> for BindId {
172 type Error = anyhow::Error;
173
174 fn try_from(value: Value) -> Result<Self> {
175 match value {
176 Value::U64(id) => Ok(BindId(id)),
177 v => bail!("invalid bind id {v}"),
178 }
179 }
180}
181
/// Application-defined payload carried in the `user` field of [`Event`].
///
/// Implementors must be `Clone + Debug + Any`.
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload. Called by `Event::clear` after it has cleared the
    /// built-in fields.
    fn clear(&mut self);
}
185
/// Marker trait for values delivered in the `custom` map of [`Event`] and
/// produced by `Rt::spawn` / `Rt::watch`.
///
/// It has no methods; the `Any` supertrait lets receivers inspect the concrete
/// type.
pub trait CustomBuiltinType: Debug + Any + Send + Sync {}

// `Value` and `Option<Value>` can be sent through the custom event channels
// directly.
impl CustomBuiltinType for Value {}
impl CustomBuiltinType for Option<Value> {}
190
/// A [`UserEvent`] that carries no data, for use when no user payload is
/// needed.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset.
    fn clear(&mut self) {}
}
197
/// Flags stored in the thread-local `PRINT_FLAGS` and installed temporarily
/// by [`format_with_flags`].
///
/// NOTE(review): no code in this file reads the individual flags, so the
/// per-variant notes below are inferred from the variant names only and must
/// be confirmed against the formatting code.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Presumably dereferences type variables when printing — TODO confirm.
    DerefTVars,
    /// Presumably replaces primitive types when printing — TODO confirm.
    /// This is the only flag set in the initial value of `PRINT_FLAGS`.
    ReplacePrims,
    /// Presumably omits source information when printing — TODO confirm.
    NoSource,
    /// Presumably omits parent information when printing — TODO confirm.
    NoParents,
}
213
// Per-thread set of active `PrintFlag`s. Starts as `ReplacePrims` only and is
// swapped temporarily by `format_with_flags`.
thread_local! {
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims.into());
}
217
/// Global pool of `Vec<(BindId, Box<dyn CustomBuiltinType>)>` batches, the
/// same element type that `Rt::watch` receives wrapped in `GPooled`.
///
/// NOTE(review): the meaning of the two `Pool::new` arguments (10000, 1000)
/// is defined by `poolshark` and is not visible here — presumably pool
/// capacity and maximum retained element size; confirm.
pub static CBATCH_POOL: LazyLock<Pool<Vec<(BindId, Box<dyn CustomBuiltinType>)>>> =
    LazyLock::new(|| Pool::new(10000, 1000));
221
222pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
226 flags: G,
227 f: F,
228) -> R {
229 let prev = PRINT_FLAGS.replace(flags.into());
230 let res = f();
231 PRINT_FLAGS.set(prev);
232 res
233}
234
235#[derive(Debug)]
241pub struct Event<E: UserEvent> {
242 pub init: bool,
243 pub variables: FxHashMap<BindId, Value>,
244 pub netidx: FxHashMap<SubId, subscriber::Event>,
245 pub writes: FxHashMap<Id, WriteRequest>,
246 pub rpc_calls: FxHashMap<BindId, RpcCall>,
247 pub custom: FxHashMap<BindId, Box<dyn CustomBuiltinType>>,
248 pub user: E,
249}
250
251impl<E: UserEvent> Event<E> {
252 pub fn new(user: E) -> Self {
253 Event {
254 init: false,
255 variables: HashMap::default(),
256 netidx: HashMap::default(),
257 writes: HashMap::default(),
258 rpc_calls: HashMap::default(),
259 custom: HashMap::default(),
260 user,
261 }
262 }
263
264 pub fn clear(&mut self) {
265 let Self { init, variables, netidx, rpc_calls, writes, custom, user } = self;
266 *init = false;
267 variables.clear();
268 netidx.clear();
269 rpc_calls.clear();
270 custom.clear();
271 writes.clear();
272 user.clear();
273 }
274}
275
276#[derive(Debug, Clone, Default)]
277pub struct Refs {
278 refed: LPooled<FxHashSet<BindId>>,
279 bound: LPooled<FxHashSet<BindId>>,
280}
281
282impl Refs {
283 pub fn clear(&mut self) {
284 self.refed.clear();
285 self.bound.clear();
286 }
287
288 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
289 for id in &*self.refed {
290 if !self.bound.contains(id) {
291 f(*id);
292 }
293 }
294 }
295}
296
/// A boxed, dynamically dispatched [`Update`] implementation. This is the
/// type [`compile`] returns.
pub type Node<R, E> = Box<dyn Update<R, E>>;
298
/// Shared, thread-safe constructor closure that produces a boxed [`Apply`].
///
/// Its arguments are, in order: a scope, the execution context, the mutable
/// slice of nodes, an `ExprId`, and a `bool`.
///
/// NOTE(review): the meaning of the `ExprId` and `bool` arguments is not
/// visible in this file — confirm against the call sites.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c> Fn(
            &'a Scope,
            &'b mut ExecCtx<R, E>,
            &'c mut [Node<R, E>],
            ExprId,
            bool,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
311
/// Behaviour of a function-like object that works on a slice of nodes
/// (`from`). Only `update` and `sleep` must be provided; the other methods
/// have defaults.
pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
    /// Process `event` given the nodes in `from`, optionally producing a
    /// value.
    ///
    /// NOTE(review): presumably `from` holds the argument nodes and `None`
    /// means "no output for this event" — confirm.
    fn update(
        &mut self,
        ctx: &mut ExecCtx<R, E>,
        from: &mut [Node<R, E>],
        event: &mut Event<E>,
    ) -> Option<Value>;

    /// Cleanup hook. The default does nothing.
    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
        ()
    }

    /// Type checking hook. The default accepts unconditionally.
    fn typecheck(
        &mut self,
        _ctx: &mut ExecCtx<R, E>,
        _from: &mut [Node<R, E>],
    ) -> Result<()> {
        Ok(())
    }

    /// Function type of this object.
    ///
    /// The default returns a clone of one shared `Arc<FnType>` with no
    /// arguments, no varargs, an empty constraint set, `Type::Bottom` for
    /// both the return and throws types, and `explicit_throws` unset.
    fn typ(&self) -> Arc<FnType> {
        // built once on first use, then shared by every default caller
        static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
            Arc::new(FnType {
                args: Arc::from_iter([]),
                constraints: Arc::new(RwLock::new(LPooled::take())),
                rtype: Type::Bottom,
                throws: Type::Bottom,
                vargs: None,
                explicit_throws: false,
            })
        });
        Arc::clone(&*EMPTY)
    }

    /// Record bind ids into `_refs`. The default records nothing.
    fn refs<'a>(&self, _refs: &mut Refs) {}

    /// Required hook with no default.
    ///
    /// NOTE(review): presumably called to put the object into an idle state —
    /// confirm against the callers.
    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
}
365
/// Behaviour of a compiled graph node; [`Node`] is a boxed `dyn Update`.
/// Every method is required.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// Process `event`, optionally producing a value.
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// Cleanup hook.
    ///
    /// NOTE(review): presumably releases whatever the node registered with
    /// `ctx` — confirm.
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// Type check the node. [`compile`] calls this on the root node after
    /// compilation and restores the environment if it fails.
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// The type of the node.
    fn typ(&self) -> &Type;

    /// Record bind ids into `refs`.
    fn refs(&self, refs: &mut Refs);

    /// The expression associated with this node.
    fn spec(&self) -> &Expr;

    /// NOTE(review): presumably puts the node into an idle state — confirm
    /// against the callers.
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
393
/// Plain function pointer with the same signature as `BuiltIn::init`. This is
/// the form in which `ExecCtx::register_builtin` stores a builtin's
/// constructor.
pub type BuiltInInitFn<R, E> = for<'a, 'b, 'c> fn(
    &'a mut ExecCtx<R, E>,
    &'a FnType,
    &'b Scope,
    &'c [Node<R, E>],
    ExprId,
) -> Result<Box<dyn Apply<R, E>>>;
401
/// A builtin function that can be registered with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// Registration key. Registering a second builtin with the same name is
    /// an error.
    const NAME: &str;
    /// Function type of the builtin, cloned into the registry at registration
    /// time.
    ///
    /// NOTE(review): this is an associated `const`, so each mention of
    /// `T::TYP` creates a fresh `LazyLock` and runs its initializer again
    /// instead of caching across uses — confirm this is acceptable.
    const TYP: LazyLock<FnType>;

    /// Construct the [`Apply`] implementation for this builtin.
    ///
    /// NOTE(review): presumably `typ` is the resolved function type, `from`
    /// the argument nodes, and `top_id` the id of the enclosing top-level
    /// expression — confirm.
    fn init<'a, 'b, 'c>(
        ctx: &'a mut ExecCtx<R, E>,
        typ: &'a FnType,
        scope: &'b Scope,
        from: &'c [Node<R, E>],
        top_id: ExprId,
    ) -> Result<Box<dyn Apply<R, E>>>;
}
417
/// Something that can be asked to abort; the bound on `Rt::AbortHandle`.
pub trait Abortable {
    /// Request the abort.
    fn abort(&self);
}

impl Abortable for task::AbortHandle {
    fn abort(&self) {
        // call tokio's own `AbortHandle::abort` through its full path
        task::AbortHandle::abort(self)
    }
}
427
/// The runtime services that [`ExecCtx`] is generic over (it holds one as its
/// `rt` field).
///
/// NOTE(review): apart from `clear` and `set_var`, which `ExecCtx` calls in
/// this file, the per-method summaries below are inferred from names and
/// signatures only. Confirm each against a concrete implementation before
/// relying on it.
pub trait Rt: Debug + Any {
    /// Handle type returned by `spawn` / `spawn_var`.
    type AbortHandle: Abortable;

    /// Called by `ExecCtx::clear`.
    fn clear(&mut self);

    /// Presumably subscribes to `path` on behalf of expression `ref_by` and
    /// returns the subscription — TODO confirm.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Presumably releases a subscription made with `subscribe` — TODO confirm.
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// Presumably starts listing `path`, reporting results under `id` —
    /// TODO confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// Presumably like `list`, but for a table at `path` — TODO confirm.
    fn list_table(&mut self, id: BindId, path: Path);

    /// Presumably stops a listing started for `id` — TODO confirm.
    fn stop_list(&mut self, id: BindId);

    /// Presumably publishes `value` at `path` on behalf of `ref_by`; may
    /// fail — TODO confirm.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Presumably updates the published value `id` to `value` — TODO confirm.
    fn update(&mut self, id: &Val, value: Value);

    /// Presumably stops publishing `id` — TODO confirm.
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// Presumably records that `ref_by` references variable `id` —
    /// TODO confirm.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    /// Presumably the inverse of `ref_var` — TODO confirm.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Called by `ExecCtx::set_var` after it has stored a clone of `value` in
    /// its `cached` map.
    fn set_var(&mut self, id: BindId, value: Value);

    /// Presumably signals that `id` was set, without carrying the value —
    /// TODO confirm.
    fn notify_set(&mut self, id: BindId);

    /// Presumably calls the RPC `name` with `args`, reporting the result
    /// under `id` — TODO confirm.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Presumably publishes an RPC at `name` with documentation `doc` and
    /// argument specs `spec`, reporting calls under `id`; may fail —
    /// TODO confirm.
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// Presumably removes an RPC published with `publish_rpc` — TODO confirm.
    fn unpublish_rpc(&mut self, name: Path);

    /// Presumably arranges a timer event for `id` after `timeout` —
    /// TODO confirm.
    fn set_timer(&mut self, id: BindId, timeout: Duration);

    /// Run the future `f`, which resolves to a `(BindId, custom payload)`
    /// pair, and return a handle that can abort it.
    fn spawn<F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Self::AbortHandle;

    /// Like `spawn`, for futures that resolve to a `(BindId, Value)` pair.
    fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Self::AbortHandle;

    /// Hand the runtime a channel of pooled batches of
    /// `(BindId, custom payload)` pairs.
    fn watch(
        &mut self,
        s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
    );

    /// Like `watch`, for batches of `(BindId, Value)` pairs.
    fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
}
569
570#[derive(Default)]
571pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync>>);
572
573impl LibState {
574 pub fn get_or_default<T>(&mut self) -> &mut T
580 where
581 T: Default + Any + Send + Sync,
582 {
583 self.0
584 .entry(TypeId::of::<T>())
585 .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send + Sync>)
586 .downcast_mut::<T>()
587 .unwrap()
588 }
589
590 pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
596 where
597 T: Any + Send + Sync,
598 F: FnOnce() -> T,
599 {
600 self.0
601 .entry(TypeId::of::<T>())
602 .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync>)
603 .downcast_mut::<T>()
604 .unwrap()
605 }
606
607 pub fn entry<'a, T>(
608 &'a mut self,
609 ) -> hash_map::Entry<'a, TypeId, Box<dyn Any + Send + Sync>>
610 where
611 T: Any + Send + Sync,
612 {
613 self.0.entry(TypeId::of::<T>())
614 }
615
616 pub fn contains<T>(&self) -> bool
618 where
619 T: Any + Send + Sync,
620 {
621 self.0.contains_key(&TypeId::of::<T>())
622 }
623
624 pub fn get<T>(&mut self) -> Option<&T>
629 where
630 T: Any + Send + Sync,
631 {
632 self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
633 }
634
635 pub fn get_mut<T>(&mut self) -> Option<&mut T>
640 where
641 T: Any + Send + Sync,
642 {
643 self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
644 }
645
646 pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
650 where
651 T: Any + Send + Sync,
652 {
653 self.0
654 .insert(TypeId::of::<T>(), Box::new(t) as Box<dyn Any + Send + Sync>)
655 .map(|t| t.downcast::<T>().unwrap())
656 }
657
658 pub fn remove<T>(&mut self) -> Option<Box<T>>
660 where
661 T: Any + Send + Sync,
662 {
663 self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
664 }
665}
666
667#[derive(Default)]
678pub struct AbstractTypeRegistry {
679 by_tid: FxHashMap<TypeId, Uuid>,
680 by_uuid: FxHashMap<Uuid, &'static str>,
681}
682
683impl AbstractTypeRegistry {
684 fn with<V, F: FnMut(&mut AbstractTypeRegistry) -> V>(mut f: F) -> V {
685 static REG: LazyLock<Mutex<AbstractTypeRegistry>> =
686 LazyLock::new(|| Mutex::new(AbstractTypeRegistry::default()));
687 let mut g = REG.lock();
688 f(&mut *g)
689 }
690
691 pub(crate) fn uuid<T: Any>(tag: &'static str) -> Uuid {
693 Self::with(|rg| {
694 *rg.by_tid.entry(TypeId::of::<T>()).or_insert_with(|| {
695 let id = Uuid::new_v4();
696 rg.by_uuid.insert(id, tag);
697 id
698 })
699 })
700 }
701
702 pub fn tag(a: &Abstract) -> Option<&'static str> {
704 Self::with(|rg| rg.by_uuid.get(&a.id()).map(|r| *r))
705 }
706
707 pub fn is_a(a: &Abstract, tag: &str) -> bool {
709 match Self::tag(a) {
710 Some(t) => t == tag,
711 None => false,
712 }
713 }
714}
715
716pub struct ExecCtx<R: Rt, E: UserEvent> {
717 lambdawrap: AbstractWrapper<LambdaDef<R, E>>,
719 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
721 builtins_allowed: bool,
724 tags: FxHashSet<ArcStr>,
726 pub libstate: LibState,
728 pub env: Env,
730 pub cached: FxHashMap<BindId, Value>,
732 pub rt: R,
734}
735
736impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
737 pub fn clear(&mut self) {
738 self.env.clear();
739 self.rt.clear();
740 }
741
742 pub fn new(user: R) -> Result<Self> {
751 let id = AbstractTypeRegistry::uuid::<LambdaDef<R, E>>("lambda");
752 Ok(Self {
753 lambdawrap: Abstract::register(id)?,
754 env: Env::default(),
755 builtins: FxHashMap::default(),
756 builtins_allowed: true,
757 libstate: LibState::default(),
758 tags: FxHashSet::default(),
759 cached: HashMap::default(),
760 rt: user,
761 })
762 }
763
764 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
765 match self.builtins.entry(T::NAME) {
766 Entry::Vacant(e) => {
767 e.insert((T::TYP.clone(), T::init));
768 }
769 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
770 }
771 Ok(())
772 }
773
774 pub fn set_var(&mut self, id: BindId, v: Value) {
778 self.cached.insert(id, v.clone());
779 self.rt.set_var(id, v)
780 }
781
782 fn tag(&mut self, s: &ArcStr) -> ArcStr {
783 match self.tags.get(s) {
784 Some(s) => s.clone(),
785 None => {
786 self.tags.insert(s.clone());
787 s.clone()
788 }
789 }
790 }
791
792 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(&mut self, env: Env, f: F) -> T {
796 let snap = self.env.restore_lexical_env(env);
797 let orig = mem::replace(&mut self.env, snap);
798 let r = f(self);
799 self.env = self.env.restore_lexical_env(orig);
800 r
801 }
802
803 pub fn with_restored_mut<T, F: FnOnce(&mut Self) -> T>(
809 &mut self,
810 env: &mut Env,
811 f: F,
812 ) -> T {
813 let snap = self.env.restore_lexical_env_mut(env);
814 let orig = mem::replace(&mut self.env, snap);
815 let r = f(self);
816 *env = self.env.clone();
817 self.env = self.env.restore_lexical_env(orig);
818 r
819 }
820}
821
822#[derive(Debug, Clone)]
823pub struct Scope {
824 pub lexical: ModPath,
825 pub dynamic: ModPath,
826}
827
828impl Scope {
829 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
830 Self {
831 lexical: ModPath(self.lexical.append(s)),
832 dynamic: ModPath(self.dynamic.append(s)),
833 }
834 }
835
836 pub fn root() -> Self {
837 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
838 }
839}
840
841pub fn compile<R: Rt, E: UserEvent>(
844 ctx: &mut ExecCtx<R, E>,
845 flags: BitFlags<CFlag>,
846 scope: &Scope,
847 spec: Expr,
848) -> Result<Node<R, E>> {
849 let top_id = spec.id;
850 let env = ctx.env.clone();
851 let st = Instant::now();
852 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
853 Ok(n) => n,
854 Err(e) => {
855 ctx.env = env;
856 return Err(e);
857 }
858 };
859 info!("compile time {:?}", st.elapsed());
860 let st = Instant::now();
861 if let Err(e) = node.typecheck(ctx) {
862 ctx.env = env;
863 return Err(e);
864 }
865 info!("typecheck time {:?}", st.elapsed());
866 Ok(node)
867}