1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use futures::channel::mpsc;
23use fxhash::{FxHashMap, FxHashSet};
24use log::info;
25use netidx::{
26 path::Path,
27 publisher::{Id, Val, WriteRequest},
28 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
29};
30use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
31use node::compiler;
32use parking_lot::RwLock;
33use poolshark::{global::GPooled, local::LPooled};
34use std::{
35 any::{Any, TypeId},
36 cell::Cell,
37 collections::{hash_map::Entry, HashMap},
38 fmt::Debug,
39 mem,
40 sync::{
41 self,
42 atomic::{AtomicBool, Ordering},
43 LazyLock,
44 },
45 time::Duration,
46};
47use tokio::{task, time::Instant};
48use triomphe::Arc;
49
50#[derive(Debug, Clone, Copy)]
51#[bitflags]
52#[repr(u64)]
53pub enum CFlag {
54 WarnUnhandled,
55 WarnUnhandledArith,
56 WarnUnused,
57 WarningsAreErrors,
58}
59
60#[allow(dead_code)]
61static TRACE: AtomicBool = AtomicBool::new(false);
62
63#[allow(dead_code)]
64fn set_trace(b: bool) {
65 TRACE.store(b, Ordering::Relaxed)
66}
67
68#[allow(dead_code)]
69fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
70 let set = if enable {
71 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
72 let prev = trace();
73 set_trace(true);
74 !prev
75 } else {
76 false
77 };
78 let r = match f() {
79 Err(e) => {
80 eprintln!("traced at {} failed with {e:?}", spec.pos);
81 Err(e)
82 }
83 r => r,
84 };
85 if set {
86 eprintln!("trace disabled at {}", spec.pos);
87 set_trace(false)
88 }
89 r
90}
91
92#[allow(dead_code)]
93fn trace() -> bool {
94 TRACE.load(Ordering::Relaxed)
95}
96
97#[macro_export]
98macro_rules! tdbg {
99 ($e:expr) => {
100 if $crate::trace() {
101 dbg!($e)
102 } else {
103 $e
104 }
105 };
106}
107
108#[macro_export]
109macro_rules! err {
110 ($tag:expr, $err:literal) => {{
111 let e: Value = ($tag.clone(), literal!($err)).into();
112 Value::Error(triomphe::Arc::new(e))
113 }};
114}
115
116#[macro_export]
117macro_rules! errf {
118 ($tag:expr, $fmt:expr, $args:tt) => {{
119 let msg: ArcStr = compact_str::format_compact!($fmt, $args).as_str().into();
120 let e: Value = ($tag.clone(), msg).into();
121 Value::Error(triomphe::Arc::new(e))
122 }};
123 ($tag:expr, $fmt:expr) => {{
124 let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
125 let e: Value = ($tag.clone(), msg).into();
126 Value::Error(triomphe::Arc::new(e))
127 }};
128}
129
130#[macro_export]
131macro_rules! defetyp {
132 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
133 static $tag_name: ArcStr = arcstr::literal!($tag);
134 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
135 ::std::sync::LazyLock::new(|| {
136 let scope = $crate::expr::ModPath::root();
137 $crate::expr::parser::parse_type(&format!($typ, $tag))
138 .expect("failed to parse type")
139 .scope_refs(&scope)
140 });
141 };
142}
143
144defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
145
146atomic_id!(LambdaId);
147
148impl From<u64> for LambdaId {
149 fn from(v: u64) -> Self {
150 LambdaId(v)
151 }
152}
153
154atomic_id!(BindId);
155
156impl From<u64> for BindId {
157 fn from(v: u64) -> Self {
158 BindId(v)
159 }
160}
161
162impl TryFrom<Value> for BindId {
163 type Error = anyhow::Error;
164
165 fn try_from(value: Value) -> Result<Self> {
166 match value {
167 Value::U64(id) => Ok(BindId(id)),
168 v => bail!("invalid bind id {v}"),
169 }
170 }
171}
172
173pub trait UserEvent: Clone + Debug + Any {
174 fn clear(&mut self);
175}
176
177#[derive(Debug, Clone)]
178pub struct NoUserEvent;
179
180impl UserEvent for NoUserEvent {
181 fn clear(&mut self) {}
182}
183
184#[derive(Debug, Clone, Copy)]
185#[bitflags]
186#[repr(u64)]
187pub enum PrintFlag {
188 DerefTVars,
191 ReplacePrims,
194 NoSource,
196 NoParents,
198}
199
200thread_local! {
201 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
202}
203
204pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
208 flags: G,
209 f: F,
210) -> R {
211 let prev = PRINT_FLAGS.replace(flags.into());
212 let res = f();
213 PRINT_FLAGS.set(prev);
214 res
215}
216
217#[derive(Debug)]
223pub struct Event<E: UserEvent> {
224 pub init: bool,
225 pub variables: FxHashMap<BindId, Value>,
226 pub netidx: FxHashMap<SubId, subscriber::Event>,
227 pub writes: FxHashMap<Id, WriteRequest>,
228 pub rpc_calls: FxHashMap<BindId, RpcCall>,
229 pub user: E,
230}
231
232impl<E: UserEvent> Event<E> {
233 pub fn new(user: E) -> Self {
234 Event {
235 init: false,
236 variables: HashMap::default(),
237 netidx: HashMap::default(),
238 writes: HashMap::default(),
239 rpc_calls: HashMap::default(),
240 user,
241 }
242 }
243
244 pub fn clear(&mut self) {
245 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
246 *init = false;
247 variables.clear();
248 netidx.clear();
249 rpc_calls.clear();
250 writes.clear();
251 user.clear();
252 }
253}
254
255#[derive(Debug, Clone, Default)]
256pub struct Refs {
257 refed: LPooled<FxHashSet<BindId>>,
258 bound: LPooled<FxHashSet<BindId>>,
259}
260
261impl Refs {
262 pub fn clear(&mut self) {
263 self.refed.clear();
264 self.bound.clear();
265 }
266
267 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
268 for id in &*self.refed {
269 if !self.bound.contains(id) {
270 f(*id);
271 }
272 }
273 }
274}
275
276pub type Node<R, E> = Box<dyn Update<R, E>>;
277
278pub type BuiltInInitFn<R, E> = sync::Arc<
279 dyn for<'a, 'b, 'c> Fn(
280 &'a mut ExecCtx<R, E>,
281 &'a FnType,
282 &'b Scope,
283 &'c [Node<R, E>],
284 ExprId,
285 ) -> Result<Box<dyn Apply<R, E>>>
286 + Send
287 + Sync
288 + 'static,
289>;
290
291pub type InitFn<R, E> = sync::Arc<
292 dyn for<'a, 'b, 'c> Fn(
293 &'a Scope,
294 &'b mut ExecCtx<R, E>,
295 &'c mut [Node<R, E>],
296 ExprId,
297 bool,
298 ) -> Result<Box<dyn Apply<R, E>>>
299 + Send
300 + Sync
301 + 'static,
302>;
303
304pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
309 fn update(
310 &mut self,
311 ctx: &mut ExecCtx<R, E>,
312 from: &mut [Node<R, E>],
313 event: &mut Event<E>,
314 ) -> Option<Value>;
315
316 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
319 ()
320 }
321
322 fn typecheck(
325 &mut self,
326 _ctx: &mut ExecCtx<R, E>,
327 _from: &mut [Node<R, E>],
328 ) -> Result<()> {
329 Ok(())
330 }
331
332 fn typ(&self) -> Arc<FnType> {
335 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
336 Arc::new(FnType {
337 args: Arc::from_iter([]),
338 constraints: Arc::new(RwLock::new(LPooled::take())),
339 rtype: Type::Bottom,
340 throws: Type::Bottom,
341 vargs: None,
342 })
343 });
344 Arc::clone(&*EMPTY)
345 }
346
347 fn refs<'a>(&self, _refs: &mut Refs) {}
351
352 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
355}
356
357pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
361 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
364
365 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
367
368 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
370
371 fn typ(&self) -> &Type;
373
374 fn refs(&self, refs: &mut Refs);
377
378 fn spec(&self) -> &Expr;
380
381 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
383}
384
385pub trait BuiltIn<R: Rt, E: UserEvent> {
386 const NAME: &str;
387 const TYP: LazyLock<FnType>;
388
389 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
390}
391
392pub trait Abortable {
393 fn abort(&self);
394}
395
396impl Abortable for task::AbortHandle {
397 fn abort(&self) {
398 task::AbortHandle::abort(self)
399 }
400}
401
402pub trait Rt: Debug + 'static {
403 type AbortHandle: Abortable;
404
405 fn clear(&mut self);
406
407 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
412
413 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
415
416 fn list(&mut self, id: BindId, path: Path);
421
422 fn list_table(&mut self, id: BindId, path: Path);
425
426 fn stop_list(&mut self, id: BindId);
429
430 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
434
435 fn update(&mut self, id: &Val, value: Value);
437
438 fn unpublish(&mut self, id: Val, ref_by: ExprId);
440
441 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
451 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
452
453 fn set_var(&mut self, id: BindId, value: Value);
464
465 fn notify_set(&mut self, id: BindId);
472
473 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
479
480 fn publish_rpc(
489 &mut self,
490 name: Path,
491 doc: Value,
492 spec: Vec<ArgSpec>,
493 id: BindId,
494 ) -> Result<()>;
495
496 fn unpublish_rpc(&mut self, name: Path);
498
499 fn set_timer(&mut self, id: BindId, timeout: Duration);
503
504 fn spawn<F: Future<Output = (BindId, Value)> + Send + 'static>(
512 &mut self,
513 f: F,
514 ) -> Self::AbortHandle;
515
516 fn watch(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
521}
522
523#[derive(Default)]
524pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>);
525
526impl LibState {
527 pub fn get_or_default<T>(&mut self) -> &mut T
533 where
534 T: Default + Any + Send + Sync + 'static,
535 {
536 self.0
537 .entry(TypeId::of::<T>())
538 .or_insert_with(|| {
539 Box::new(T::default()) as Box<dyn Any + Send + Sync + 'static>
540 })
541 .downcast_mut::<T>()
542 .unwrap()
543 }
544
545 pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
551 where
552 T: Any + Send + Sync + 'static,
553 F: FnOnce() -> T,
554 {
555 self.0
556 .entry(TypeId::of::<T>())
557 .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync + 'static>)
558 .downcast_mut::<T>()
559 .unwrap()
560 }
561
562 pub fn get<T>(&mut self) -> Option<&T>
567 where
568 T: Any + Send + Sync + 'static,
569 {
570 self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
571 }
572
573 pub fn get_mut<T>(&mut self) -> Option<&mut T>
578 where
579 T: Any + Send + Sync + 'static,
580 {
581 self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
582 }
583
584 pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
588 where
589 T: Any + Send + Sync + 'static,
590 {
591 self.0
592 .insert(
593 TypeId::of::<T>(),
594 Box::new(t) as Box<dyn Any + Send + Sync + 'static>,
595 )
596 .map(|t| t.downcast::<T>().unwrap())
597 }
598
599 pub fn remove<T>(&mut self) -> Option<Box<T>>
601 where
602 T: Any + Send + Sync + 'static,
603 {
604 self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
605 }
606}
607
608pub struct ExecCtx<R: Rt, E: UserEvent> {
609 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
610 builtins_allowed: bool,
611 tags: FxHashSet<ArcStr>,
612 pub libstate: LibState,
614 pub env: Env<R, E>,
616 pub cached: FxHashMap<BindId, Value>,
618 pub rt: R,
620}
621
622impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
623 pub fn clear(&mut self) {
624 self.env.clear();
625 self.rt.clear();
626 }
627
628 pub fn new(user: R) -> Self {
637 Self {
638 env: Env::new(),
639 builtins: FxHashMap::default(),
640 builtins_allowed: true,
641 libstate: LibState::default(),
642 tags: FxHashSet::default(),
643 cached: HashMap::default(),
644 rt: user,
645 }
646 }
647
648 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
649 let f = T::init(self);
650 match self.builtins.entry(T::NAME) {
651 Entry::Vacant(e) => {
652 e.insert((T::TYP.clone(), f));
653 }
654 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
655 }
656 Ok(())
657 }
658
659 pub fn set_var(&mut self, id: BindId, v: Value) {
663 self.cached.insert(id, v.clone());
664 self.rt.set_var(id, v)
665 }
666
667 fn tag(&mut self, s: &ArcStr) -> ArcStr {
668 match self.tags.get(s) {
669 Some(s) => s.clone(),
670 None => {
671 self.tags.insert(s.clone());
672 s.clone()
673 }
674 }
675 }
676
677 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
682 &mut self,
683 env: Env<R, E>,
684 f: F,
685 ) -> T {
686 let snap = self.env.restore_lexical_env(env);
687 let orig = mem::replace(&mut self.env, snap);
688 let r = f(self);
689 self.env = self.env.restore_lexical_env(orig);
690 r
691 }
692}
693
694#[derive(Debug, Clone)]
695pub struct Scope {
696 pub lexical: ModPath,
697 pub dynamic: ModPath,
698}
699
700impl Scope {
701 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
702 Self {
703 lexical: ModPath(self.lexical.append(s)),
704 dynamic: ModPath(self.dynamic.append(s)),
705 }
706 }
707
708 pub fn root() -> Self {
709 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
710 }
711}
712
713pub fn compile<R: Rt, E: UserEvent>(
716 ctx: &mut ExecCtx<R, E>,
717 flags: BitFlags<CFlag>,
718 scope: &Scope,
719 spec: Expr,
720) -> Result<Node<R, E>> {
721 let top_id = spec.id;
722 let env = ctx.env.clone();
723 let st = Instant::now();
724 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
725 Ok(n) => n,
726 Err(e) => {
727 ctx.env = env;
728 return Err(e);
729 }
730 };
731 info!("compile time {:?}", st.elapsed());
732 let st = Instant::now();
733 if let Err(e) = node.typecheck(ctx) {
734 ctx.env = env;
735 return Err(e);
736 }
737 info!("typecheck time {:?}", st.elapsed());
738 Ok(node)
739}