1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use futures::channel::mpsc;
23use fxhash::{FxHashMap, FxHashSet};
24use log::info;
25use netidx::{
26 path::Path,
27 publisher::{Id, Val, WriteRequest},
28 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
29};
30use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
31use node::compiler;
32use parking_lot::RwLock;
33use poolshark::{
34 global::{GPooled, Pool},
35 local::LPooled,
36};
37use std::{
38 any::{Any, TypeId},
39 cell::Cell,
40 collections::{
41 hash_map::{self, Entry},
42 HashMap,
43 },
44 fmt::Debug,
45 mem,
46 sync::{
47 self,
48 atomic::{AtomicBool, Ordering},
49 LazyLock,
50 },
51 time::Duration,
52};
53use tokio::{task, time::Instant};
54use triomphe::Arc;
55
56#[derive(Debug, Clone, Copy)]
57#[bitflags]
58#[repr(u64)]
59pub enum CFlag {
60 WarnUnhandled,
61 WarnUnhandledArith,
62 WarnUnused,
63 WarningsAreErrors,
64}
65
66#[allow(dead_code)]
67static TRACE: AtomicBool = AtomicBool::new(false);
68
69#[allow(dead_code)]
70fn set_trace(b: bool) {
71 TRACE.store(b, Ordering::Relaxed)
72}
73
74#[allow(dead_code)]
75fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
76 let set = if enable {
77 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
78 let prev = trace();
79 set_trace(true);
80 !prev
81 } else {
82 false
83 };
84 let r = match f() {
85 Err(e) => {
86 eprintln!("traced at {} failed with {e:?}", spec.pos);
87 Err(e)
88 }
89 r => r,
90 };
91 if set {
92 eprintln!("trace disabled at {}", spec.pos);
93 set_trace(false)
94 }
95 r
96}
97
98#[allow(dead_code)]
99fn trace() -> bool {
100 TRACE.load(Ordering::Relaxed)
101}
102
103#[macro_export]
104macro_rules! tdbg {
105 ($e:expr) => {
106 if $crate::trace() {
107 dbg!($e)
108 } else {
109 $e
110 }
111 };
112}
113
114#[macro_export]
115macro_rules! err {
116 ($tag:expr, $err:literal) => {{
117 let e: Value = ($tag.clone(), arcstr::literal!($err)).into();
118 Value::Error(triomphe::Arc::new(e))
119 }};
120}
121
122#[macro_export]
123macro_rules! errf {
124 ($tag:expr, $fmt:expr, $($args:expr),*) => {{
125 let msg: ArcStr = compact_str::format_compact!($fmt, $($args),*).as_str().into();
126 let e: Value = ($tag.clone(), msg).into();
127 Value::Error(triomphe::Arc::new(e))
128 }};
129 ($tag:expr, $fmt:expr) => {{
130 let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
131 let e: Value = ($tag.clone(), msg).into();
132 Value::Error(triomphe::Arc::new(e))
133 }};
134}
135
136#[macro_export]
137macro_rules! defetyp {
138 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
139 static $tag_name: ArcStr = arcstr::literal!($tag);
140 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
141 ::std::sync::LazyLock::new(|| {
142 let scope = $crate::expr::ModPath::root();
143 $crate::expr::parser::parse_type(&format!($typ, $tag))
144 .expect("failed to parse type")
145 .scope_refs(&scope)
146 });
147 };
148}
149
150defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
151
152atomic_id!(LambdaId);
153
154impl From<u64> for LambdaId {
155 fn from(v: u64) -> Self {
156 LambdaId(v)
157 }
158}
159
160atomic_id!(BindId);
161
162impl From<u64> for BindId {
163 fn from(v: u64) -> Self {
164 BindId(v)
165 }
166}
167
168impl TryFrom<Value> for BindId {
169 type Error = anyhow::Error;
170
171 fn try_from(value: Value) -> Result<Self> {
172 match value {
173 Value::U64(id) => Ok(BindId(id)),
174 v => bail!("invalid bind id {v}"),
175 }
176 }
177}
178
179pub trait UserEvent: Clone + Debug + Any {
180 fn clear(&mut self);
181}
182
183pub trait CustomBuiltinType: Debug + Any + Send + Sync {}
184
185impl CustomBuiltinType for Value {}
186impl CustomBuiltinType for Option<Value> {}
187
188#[derive(Debug, Clone)]
189pub struct NoUserEvent;
190
191impl UserEvent for NoUserEvent {
192 fn clear(&mut self) {}
193}
194
195#[derive(Debug, Clone, Copy)]
196#[bitflags]
197#[repr(u64)]
198pub enum PrintFlag {
199 DerefTVars,
202 ReplacePrims,
205 NoSource,
207 NoParents,
209}
210
211thread_local! {
212 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims.into());
213}
214
215pub static CBATCH_POOL: LazyLock<Pool<Vec<(BindId, Box<dyn CustomBuiltinType>)>>> =
217 LazyLock::new(|| Pool::new(10000, 1000));
218
219pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
223 flags: G,
224 f: F,
225) -> R {
226 let prev = PRINT_FLAGS.replace(flags.into());
227 let res = f();
228 PRINT_FLAGS.set(prev);
229 res
230}
231
232#[derive(Debug)]
238pub struct Event<E: UserEvent> {
239 pub init: bool,
240 pub variables: FxHashMap<BindId, Value>,
241 pub netidx: FxHashMap<SubId, subscriber::Event>,
242 pub writes: FxHashMap<Id, WriteRequest>,
243 pub rpc_calls: FxHashMap<BindId, RpcCall>,
244 pub custom: FxHashMap<BindId, Box<dyn CustomBuiltinType>>,
245 pub user: E,
246}
247
248impl<E: UserEvent> Event<E> {
249 pub fn new(user: E) -> Self {
250 Event {
251 init: false,
252 variables: HashMap::default(),
253 netidx: HashMap::default(),
254 writes: HashMap::default(),
255 rpc_calls: HashMap::default(),
256 custom: HashMap::default(),
257 user,
258 }
259 }
260
261 pub fn clear(&mut self) {
262 let Self { init, variables, netidx, rpc_calls, writes, custom, user } = self;
263 *init = false;
264 variables.clear();
265 netidx.clear();
266 rpc_calls.clear();
267 custom.clear();
268 writes.clear();
269 user.clear();
270 }
271}
272
273#[derive(Debug, Clone, Default)]
274pub struct Refs {
275 refed: LPooled<FxHashSet<BindId>>,
276 bound: LPooled<FxHashSet<BindId>>,
277}
278
279impl Refs {
280 pub fn clear(&mut self) {
281 self.refed.clear();
282 self.bound.clear();
283 }
284
285 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
286 for id in &*self.refed {
287 if !self.bound.contains(id) {
288 f(*id);
289 }
290 }
291 }
292}
293
294pub type Node<R, E> = Box<dyn Update<R, E>>;
295
296pub type BuiltInInitFn<R, E> = sync::Arc<
297 dyn for<'a, 'b, 'c> Fn(
298 &'a mut ExecCtx<R, E>,
299 &'a FnType,
300 &'b Scope,
301 &'c [Node<R, E>],
302 ExprId,
303 ) -> Result<Box<dyn Apply<R, E>>>
304 + Send
305 + Sync
306 + 'static,
307>;
308
309pub type InitFn<R, E> = sync::Arc<
310 dyn for<'a, 'b, 'c> Fn(
311 &'a Scope,
312 &'b mut ExecCtx<R, E>,
313 &'c mut [Node<R, E>],
314 ExprId,
315 bool,
316 ) -> Result<Box<dyn Apply<R, E>>>
317 + Send
318 + Sync
319 + 'static,
320>;
321
322pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
327 fn update(
328 &mut self,
329 ctx: &mut ExecCtx<R, E>,
330 from: &mut [Node<R, E>],
331 event: &mut Event<E>,
332 ) -> Option<Value>;
333
334 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
337 ()
338 }
339
340 fn typecheck(
343 &mut self,
344 _ctx: &mut ExecCtx<R, E>,
345 _from: &mut [Node<R, E>],
346 ) -> Result<()> {
347 Ok(())
348 }
349
350 fn typ(&self) -> Arc<FnType> {
353 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
354 Arc::new(FnType {
355 args: Arc::from_iter([]),
356 constraints: Arc::new(RwLock::new(LPooled::take())),
357 rtype: Type::Bottom,
358 throws: Type::Bottom,
359 vargs: None,
360 explicit_throws: false,
361 })
362 });
363 Arc::clone(&*EMPTY)
364 }
365
366 fn refs<'a>(&self, _refs: &mut Refs) {}
370
371 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
374}
375
376pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
380 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
383
384 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
386
387 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
389
390 fn typ(&self) -> &Type;
392
393 fn refs(&self, refs: &mut Refs);
396
397 fn spec(&self) -> &Expr;
399
400 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
402}
403
404pub trait BuiltIn<R: Rt, E: UserEvent> {
405 const NAME: &str;
406 const TYP: LazyLock<FnType>;
407
408 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
409}
410
411pub trait Abortable {
412 fn abort(&self);
413}
414
415impl Abortable for task::AbortHandle {
416 fn abort(&self) {
417 task::AbortHandle::abort(self)
418 }
419}
420
421pub trait Rt: Debug + 'static {
422 type AbortHandle: Abortable;
423
424 fn clear(&mut self);
425
426 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
431
432 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
434
435 fn list(&mut self, id: BindId, path: Path);
440
441 fn list_table(&mut self, id: BindId, path: Path);
444
445 fn stop_list(&mut self, id: BindId);
448
449 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
453
454 fn update(&mut self, id: &Val, value: Value);
456
457 fn unpublish(&mut self, id: Val, ref_by: ExprId);
459
460 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
470 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
471
472 fn set_var(&mut self, id: BindId, value: Value);
483
484 fn notify_set(&mut self, id: BindId);
491
492 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
498
499 fn publish_rpc(
508 &mut self,
509 name: Path,
510 doc: Value,
511 spec: Vec<ArgSpec>,
512 id: BindId,
513 ) -> Result<()>;
514
515 fn unpublish_rpc(&mut self, name: Path);
517
518 fn set_timer(&mut self, id: BindId, timeout: Duration);
522
523 fn spawn<F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static>(
531 &mut self,
532 f: F,
533 ) -> Self::AbortHandle;
534
535 fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
543 &mut self,
544 f: F,
545 ) -> Self::AbortHandle;
546
547 fn watch(
552 &mut self,
553 s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
554 );
555
556 fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
561}
562
563#[derive(Default)]
564pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync>>);
565
566impl LibState {
567 pub fn get_or_default<T>(&mut self) -> &mut T
573 where
574 T: Default + Any + Send + Sync,
575 {
576 self.0
577 .entry(TypeId::of::<T>())
578 .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send + Sync>)
579 .downcast_mut::<T>()
580 .unwrap()
581 }
582
583 pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
589 where
590 T: Any + Send + Sync,
591 F: FnOnce() -> T,
592 {
593 self.0
594 .entry(TypeId::of::<T>())
595 .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync>)
596 .downcast_mut::<T>()
597 .unwrap()
598 }
599
600 pub fn entry<'a, T>(
601 &'a mut self,
602 ) -> hash_map::Entry<'a, TypeId, Box<dyn Any + Send + Sync>>
603 where
604 T: Any + Send + Sync,
605 {
606 self.0.entry(TypeId::of::<T>())
607 }
608
609 pub fn contains<T>(&self) -> bool
611 where
612 T: Any + Send + Sync,
613 {
614 self.0.contains_key(&TypeId::of::<T>())
615 }
616
617 pub fn get<T>(&mut self) -> Option<&T>
622 where
623 T: Any + Send + Sync,
624 {
625 self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
626 }
627
628 pub fn get_mut<T>(&mut self) -> Option<&mut T>
633 where
634 T: Any + Send + Sync,
635 {
636 self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
637 }
638
639 pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
643 where
644 T: Any + Send + Sync,
645 {
646 self.0
647 .insert(TypeId::of::<T>(), Box::new(t) as Box<dyn Any + Send + Sync>)
648 .map(|t| t.downcast::<T>().unwrap())
649 }
650
651 pub fn remove<T>(&mut self) -> Option<Box<T>>
653 where
654 T: Any + Send + Sync,
655 {
656 self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
657 }
658}
659
660pub struct ExecCtx<R: Rt, E: UserEvent> {
661 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
662 builtins_allowed: bool,
663 tags: FxHashSet<ArcStr>,
664 pub libstate: LibState,
666 pub env: Env<R, E>,
668 pub cached: FxHashMap<BindId, Value>,
670 pub rt: R,
672}
673
674impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
675 pub fn clear(&mut self) {
676 self.env.clear();
677 self.rt.clear();
678 }
679
680 pub fn new(user: R) -> Self {
689 Self {
690 env: Env::new(),
691 builtins: FxHashMap::default(),
692 builtins_allowed: true,
693 libstate: LibState::default(),
694 tags: FxHashSet::default(),
695 cached: HashMap::default(),
696 rt: user,
697 }
698 }
699
700 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
701 let f = T::init(self);
702 match self.builtins.entry(T::NAME) {
703 Entry::Vacant(e) => {
704 e.insert((T::TYP.clone(), f));
705 }
706 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
707 }
708 Ok(())
709 }
710
711 pub fn set_var(&mut self, id: BindId, v: Value) {
715 self.cached.insert(id, v.clone());
716 self.rt.set_var(id, v)
717 }
718
719 fn tag(&mut self, s: &ArcStr) -> ArcStr {
720 match self.tags.get(s) {
721 Some(s) => s.clone(),
722 None => {
723 self.tags.insert(s.clone());
724 s.clone()
725 }
726 }
727 }
728
729 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
733 &mut self,
734 env: Env<R, E>,
735 f: F,
736 ) -> T {
737 let snap = self.env.restore_lexical_env(env);
738 let orig = mem::replace(&mut self.env, snap);
739 let r = f(self);
740 self.env = self.env.restore_lexical_env(orig);
741 r
742 }
743
744 pub fn with_restored_mut<T, F: FnOnce(&mut Self) -> T>(
750 &mut self,
751 env: &mut Env<R, E>,
752 f: F,
753 ) -> T {
754 let snap = self.env.restore_lexical_env_mut(env);
755 let orig = mem::replace(&mut self.env, snap);
756 let r = f(self);
757 *env = self.env.clone();
758 self.env = self.env.restore_lexical_env(orig);
759 r
760 }
761}
762
763#[derive(Debug, Clone)]
764pub struct Scope {
765 pub lexical: ModPath,
766 pub dynamic: ModPath,
767}
768
769impl Scope {
770 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
771 Self {
772 lexical: ModPath(self.lexical.append(s)),
773 dynamic: ModPath(self.dynamic.append(s)),
774 }
775 }
776
777 pub fn root() -> Self {
778 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
779 }
780}
781
782pub fn compile<R: Rt, E: UserEvent>(
785 ctx: &mut ExecCtx<R, E>,
786 flags: BitFlags<CFlag>,
787 scope: &Scope,
788 spec: Expr,
789) -> Result<Node<R, E>> {
790 let top_id = spec.id;
791 let env = ctx.env.clone();
792 let st = Instant::now();
793 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
794 Ok(n) => n,
795 Err(e) => {
796 ctx.env = env;
797 return Err(e);
798 }
799 };
800 info!("compile time {:?}", st.elapsed());
801 let st = Instant::now();
802 if let Err(e) = node.typecheck(ctx) {
803 ctx.env = env;
804 return Err(e);
805 }
806 info!("typecheck time {:?}", st.elapsed());
807 Ok(node)
808}