1#![doc(
2 html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3 html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5#[macro_use]
6extern crate netidx_core;
7#[macro_use]
8extern crate combine;
9#[macro_use]
10extern crate serde_derive;
11
12pub mod env;
13pub mod expr;
14pub mod node;
15pub mod typ;
16
17use crate::{
18 env::Env,
19 expr::{ExprId, ModPath},
20 node::lambda::LambdaDef,
21 typ::{FnType, Type},
22};
23use anyhow::{bail, Result};
24use arcstr::ArcStr;
25use enumflags2::{bitflags, BitFlags};
26use expr::Expr;
27use futures::channel::mpsc;
28use fxhash::{FxHashMap, FxHashSet};
29use log::info;
30use netidx::{
31 path::Path,
32 publisher::{Id, Val, WriteRequest},
33 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
34};
35use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
36use netidx_value::{abstract_type::AbstractWrapper, Abstract};
37use node::compiler;
38use parking_lot::{Mutex, RwLock};
39use poolshark::{
40 global::{GPooled, Pool},
41 local::LPooled,
42};
43use std::{
44 any::{Any, TypeId},
45 cell::Cell,
46 collections::{
47 hash_map::{self, Entry},
48 HashMap,
49 },
50 fmt::Debug,
51 mem,
52 sync::{
53 self,
54 atomic::{AtomicBool, Ordering},
55 LazyLock,
56 },
57 time::Duration,
58};
59use tokio::{task, time::Instant};
60use triomphe::Arc;
61use uuid::Uuid;
62
/// Compiler flags, passed to [`compile`] as a `BitFlags<CFlag>` set.
///
/// NOTE(review): the meaning of each variant is inferred from its name; the
/// code that interprets the flags lives outside this file section.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum CFlag {
    /// Presumably: warn about unhandled errors — TODO confirm.
    WarnUnhandled,
    /// Presumably: warn about unused bindings — TODO confirm.
    WarnUnused,
    /// Presumably: promote warnings to hard errors — TODO confirm.
    WarningsAreErrors,
}
71
/// Process-wide switch read by [`trace`] and the `tdbg!` macro.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Turn tracing on (`true`) or off (`false`) for the whole process.
///
/// The flag is written with relaxed ordering; it is a debugging aid and
/// does not synchronize any other memory.
#[allow(dead_code)]
pub fn set_trace(b: bool) {
    TRACE.store(b, Ordering::Relaxed);
}
79
80#[allow(dead_code)]
81pub fn with_trace<F: FnOnce() -> Result<R>, R>(
82 enable: bool,
83 spec: &Expr,
84 f: F,
85) -> Result<R> {
86 let prev = trace();
87 set_trace(enable);
88 if !prev && enable {
89 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
90 } else if prev && !enable {
91 eprintln!("trace disabled at {}, spec: {}", spec.pos, spec);
92 }
93 let r = match f() {
94 Err(e) => {
95 eprintln!("traced at {} failed with {e:?}", spec.pos);
96 Err(e)
97 }
98 r => r,
99 };
100 if prev && !enable {
101 eprintln!("trace reenabled")
102 }
103 set_trace(prev);
104 r
105}
106
/// Return the current value of the global trace flag (relaxed load).
#[allow(dead_code)]
pub fn trace() -> bool {
    TRACE.load(Ordering::Relaxed)
}
111
/// Trace-aware `dbg!`: evaluates to `$e`, and additionally prints it through
/// `dbg!` when the global trace flag (`$crate::trace()`) is set.
///
/// `$e` appears in both branches but only one branch runs, so the
/// expression is evaluated once.
#[macro_export]
macro_rules! tdbg {
    ($e:expr) => {
        if $crate::trace() {
            dbg!($e)
        } else {
            $e
        }
    };
}
122
/// Build a `Value::Error` whose payload is the pair `($tag, $err)` converted
/// into a `Value`.
///
/// `$tag` is cloned, and `$err` must be a string literal (it is turned into a
/// static `ArcStr` via `arcstr::literal!`). `Value` is referenced unqualified,
/// so it must be in scope at the call site.
#[macro_export]
macro_rules! err {
    ($tag:expr, $err:literal) => {{
        let e: Value = ($tag.clone(), ::arcstr::literal!($err)).into();
        Value::Error(::triomphe::Arc::new(e))
    }};
}
130
/// Like `err!`, but the message is produced with a format string.
///
/// The message is formatted with `compact_str::format_compact!` and then
/// copied into an `ArcStr`. Both `ArcStr` and `Value` are referenced
/// unqualified, so they must be in scope at the call site.
#[macro_export]
macro_rules! errf {
    // format string followed by one or more arguments
    ($tag:expr, $fmt:expr, $($args:expr),*) => {{
        let msg: ArcStr = ::compact_str::format_compact!($fmt, $($args),*).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(::triomphe::Arc::new(e))
    }};
    // format string only
    ($tag:expr, $fmt:expr) => {{
        let msg: ArcStr = ::compact_str::format_compact!($fmt).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(::triomphe::Arc::new(e))
    }};
}
144
/// Define an error type together with its tag.
///
/// Expands to two statics:
/// - `$tag_name`: an `ArcStr` holding the literal `$tag`
/// - `$name`: a lazily initialized `Type`, parsed from `format!($typ, $tag)`
///   and then passed through `scope_refs` with the root module path
///
/// The type string is parsed on first access; a parse failure panics with
/// "failed to parse type". `ArcStr` is referenced unqualified, so it must be
/// in scope at the call site.
#[macro_export]
macro_rules! defetyp {
    ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
        static $tag_name: ArcStr = ::arcstr::literal!($tag);
        static $name: ::std::sync::LazyLock<$crate::typ::Type> =
            ::std::sync::LazyLock::new(|| {
                let scope = $crate::expr::ModPath::root();
                $crate::expr::parser::parse_type(&format!($typ, $tag))
                    .expect("failed to parse type")
                    .scope_refs(&scope)
            });
    };
}
158
// Defines `CAST_ERR_TAG` ("InvalidCast") and `CAST_ERR`, the type parsed from
// "Error<`InvalidCast(string)>".
defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
160
161atomic_id!(LambdaId);
162
163impl From<u64> for LambdaId {
164 fn from(v: u64) -> Self {
165 LambdaId(v)
166 }
167}
168
169atomic_id!(BindId);
170
171impl From<u64> for BindId {
172 fn from(v: u64) -> Self {
173 BindId(v)
174 }
175}
176
177impl TryFrom<Value> for BindId {
178 type Error = anyhow::Error;
179
180 fn try_from(value: Value) -> Result<Self> {
181 match value {
182 Value::U64(id) => Ok(BindId(id)),
183 v => bail!("invalid bind id {v}"),
184 }
185 }
186}
187
/// Embedder-defined payload carried in [`Event::user`].
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; invoked by [`Event::clear`].
    fn clear(&mut self);
}
191
/// Marker trait for values delivered to builtins as boxed trait objects
/// (see `Event::custom`, `Rt::spawn` and `Rt::watch`). It declares no
/// methods of its own.
pub trait CustomBuiltinType: Debug + Any + Send + Sync {}

// Plain values and optional values can be used as custom payloads directly.
impl CustomBuiltinType for Value {}
impl CustomBuiltinType for Option<Value> {}
196
/// A [`UserEvent`] that carries no data, for embedders that do not need a
/// user payload.
#[derive(Debug, Clone)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    /// Nothing to reset.
    fn clear(&mut self) {}
}
203
/// Flags stored in the thread-local `PRINT_FLAGS` and installed temporarily
/// by [`format_with_flags`].
///
/// NOTE(review): the code that reads these flags is outside this file
/// section; the per-variant notes below are inferred from the names only.
#[derive(Debug, Clone, Copy)]
#[bitflags]
#[repr(u64)]
pub enum PrintFlag {
    /// Presumably: print what type variables are bound to — TODO confirm.
    DerefTVars,
    /// Presumably: substitute names for primitive type sets — TODO confirm.
    /// This is the only flag set by default.
    ReplacePrims,
    /// Presumably: omit source information — TODO confirm.
    NoSource,
    /// Presumably: omit parent information — TODO confirm.
    NoParents,
}
219
// Per-thread print flags. Starts out as just `PrintFlag::ReplacePrims`;
// `format_with_flags` swaps the value for the duration of a closure.
thread_local! {
    static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims.into());
}
223
/// Global pool of `Vec<(BindId, Box<dyn CustomBuiltinType>)>` batches, the
/// element type carried by the channel passed to `Rt::watch`.
///
/// NOTE(review): the two arguments to `Pool::new` are pool sizing limits;
/// check the poolshark documentation for their exact meaning.
pub static CBATCH_POOL: LazyLock<Pool<Vec<(BindId, Box<dyn CustomBuiltinType>)>>> =
    LazyLock::new(|| Pool::new(10000, 1000));
227
228pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
232 flags: G,
233 f: F,
234) -> R {
235 let prev = PRINT_FLAGS.replace(flags.into());
236 let res = f();
237 PRINT_FLAGS.set(prev);
238 res
239}
240
/// The batch of inputs handed to `Update::update` / `Apply::update`.
///
/// NOTE(review): the field notes below are derived from the field types; how
/// the runtime fills each map is not visible in this file section.
#[derive(Debug)]
pub struct Event<E: UserEvent> {
    /// Starts `false` and is reset to `false` by [`Event::clear`].
    /// Presumably set on a node's first cycle — TODO confirm.
    pub init: bool,
    /// Variable values, keyed by binding id.
    pub variables: FxHashMap<BindId, Value>,
    /// Netidx subscription events, keyed by subscription id.
    pub netidx: FxHashMap<SubId, subscriber::Event>,
    /// Write requests, keyed by published value id.
    pub writes: FxHashMap<Id, WriteRequest>,
    /// RPC calls, keyed by binding id.
    pub rpc_calls: FxHashMap<BindId, RpcCall>,
    /// Custom builtin payloads, keyed by binding id.
    pub custom: FxHashMap<BindId, Box<dyn CustomBuiltinType>>,
    /// Embedder-defined payload.
    pub user: E,
}
256
257impl<E: UserEvent> Event<E> {
258 pub fn new(user: E) -> Self {
259 Event {
260 init: false,
261 variables: HashMap::default(),
262 netidx: HashMap::default(),
263 writes: HashMap::default(),
264 rpc_calls: HashMap::default(),
265 custom: HashMap::default(),
266 user,
267 }
268 }
269
270 pub fn clear(&mut self) {
271 let Self { init, variables, netidx, rpc_calls, writes, custom, user } = self;
272 *init = false;
273 variables.clear();
274 netidx.clear();
275 rpc_calls.clear();
276 custom.clear();
277 writes.clear();
278 user.clear();
279 }
280}
281
/// Accumulator filled by `Update::refs` / `Apply::refs`: the set of binding
/// ids that are referenced and the set that are bound.
#[derive(Debug, Clone, Default)]
pub struct Refs {
    /// Binding ids that are referenced.
    refed: LPooled<FxHashSet<BindId>>,
    /// Binding ids that are bound.
    bound: LPooled<FxHashSet<BindId>>,
}
287
288impl Refs {
289 pub fn clear(&mut self) {
290 self.refed.clear();
291 self.bound.clear();
292 }
293
294 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
295 for id in &*self.refed {
296 if !self.bound.contains(id) {
297 f(*id);
298 }
299 }
300 }
301}
302
/// A compiled graph node: a boxed [`Update`] trait object.
pub type Node<R, E> = Box<dyn Update<R, E>>;
304
/// Tells `Apply::typecheck` in which context it is being invoked.
#[derive(Debug)]
pub enum TypecheckPhase<'a> {
    /// NOTE(review): presumably the check of the lambda definition itself —
    /// TODO confirm against the callers.
    Lambda,
    /// A check at a call site; carries a function type for that site.
    /// NOTE(review): presumably the resolved type — TODO confirm.
    CallSite(&'a FnType),
}
313
/// Shared, thread-safe constructor for an [`Apply`] implementation.
///
/// Arguments, in order: the scope, the execution context, the argument
/// nodes, an optional function type (NOTE(review): presumably the resolved
/// call-site type — confirm), and an expression id. Returns the boxed
/// `Apply`, or an error.
pub type InitFn<R, E> = sync::Arc<
    dyn for<'a, 'b, 'c, 'd> Fn(
            &'a Scope,
            &'b mut ExecCtx<R, E>,
            &'c mut [Node<R, E>],
            Option<&'d FnType>,
            ExprId,
        ) -> Result<Box<dyn Apply<R, E>>>
        + Send
        + Sync
        + 'static,
>;
326
/// Behaviour of a function body (builtin or otherwise) once it has been
/// instantiated. Instances are created through [`InitFn`] /
/// [`BuiltInInitFn`].
pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
    /// Process `event` given the argument nodes in `from`; `Some(v)` means a
    /// value was produced, `None` means nothing was produced.
    fn update(
        &mut self,
        ctx: &mut ExecCtx<R, E>,
        from: &mut [Node<R, E>],
        event: &mut Event<E>,
    ) -> Option<Value>;

    /// Cleanup hook. The default implementation does nothing.
    fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
        ()
    }

    /// Type check hook. The default implementation accepts everything.
    fn typecheck(
        &mut self,
        _ctx: &mut ExecCtx<R, E>,
        _from: &mut [Node<R, E>],
        _phase: TypecheckPhase<'_>,
    ) -> Result<()> {
        Ok(())
    }

    /// The function type of this instance. The default implementation
    /// returns a shared placeholder with no arguments, no varargs, and
    /// `Type::Bottom` as both the return type and the throws type.
    fn typ(&self) -> Arc<FnType> {
        // Built once and shared by every implementor that uses the default.
        static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
            Arc::new(FnType {
                args: Arc::from_iter([]),
                constraints: Arc::new(RwLock::new(LPooled::take())),
                rtype: Type::Bottom,
                throws: Type::Bottom,
                vargs: None,
                explicit_throws: false,
                ..Default::default()
            })
        });
        Arc::clone(&*EMPTY)
    }

    /// Record referenced and bound ids into `_refs`. The default
    /// implementation records nothing.
    fn refs<'a>(&self, _refs: &mut Refs) {}

    /// Required hook with no default.
    /// NOTE(review): presumably puts the instance into an inactive state —
    /// TODO confirm with the implementors.
    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
}
384
/// Behaviour of a compiled graph node; [`Node`] is a boxed `Update`.
pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
    /// Process `event`; `Some(v)` means the node produced a value, `None`
    /// means it produced nothing.
    fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;

    /// Cleanup hook.
    /// NOTE(review): presumably releases whatever the node registered with
    /// `ctx` — TODO confirm.
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>);

    /// Type check the node; [`compile`] calls this right after compilation.
    fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;

    /// The type of the node.
    fn typ(&self) -> &Type;

    /// Record the ids this node references and binds into `refs`.
    fn refs(&self, refs: &mut Refs);

    /// The expression associated with this node.
    fn spec(&self) -> &Expr;

    /// NOTE(review): presumably puts the node into an inactive state —
    /// TODO confirm with the implementors.
    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
}
412
/// Function pointer type of [`BuiltIn::init`], as stored in the builtin
/// table of [`ExecCtx`] by `register_builtin`.
pub type BuiltInInitFn<R, E> = for<'a, 'b, 'c, 'd> fn(
    &'a mut ExecCtx<R, E>,
    &'a FnType,
    Option<&'d FnType>,
    &'b Scope,
    &'c [Node<R, E>],
    ExprId,
) -> Result<Box<dyn Apply<R, E>>>;
421
/// A builtin function that can be registered with
/// `ExecCtx::register_builtin`.
pub trait BuiltIn<R: Rt, E: UserEvent> {
    /// Registration key; registering two builtins with the same name fails.
    const NAME: &str;
    /// Stored alongside the init function at registration.
    /// NOTE(review): presumably asks for the call-site type to be resolved
    /// and passed as `resolved_type` — TODO confirm.
    const NEEDS_CALLSITE: bool;

    /// Construct the [`Apply`] instance for one use of the builtin.
    fn init<'a, 'b, 'c, 'd>(
        ctx: &'a mut ExecCtx<R, E>,
        typ: &'a FnType,
        resolved_type: Option<&'d FnType>,
        scope: &'b Scope,
        from: &'c [Node<R, E>],
        top_id: ExprId,
    ) -> Result<Box<dyn Apply<R, E>>>;
}
438
/// A handle that can cancel the work it refers to; the bound on
/// `Rt::AbortHandle`.
pub trait Abortable {
    /// Request cancellation.
    fn abort(&self);
}

impl Abortable for task::AbortHandle {
    fn abort(&self) {
        // Path-qualified call: names tokio's own `AbortHandle::abort`
        // explicitly rather than going through method-call syntax.
        task::AbortHandle::abort(self)
    }
}
448
/// The runtime interface the compiled graph uses to reach the outside world:
/// netidx subscriptions and publications, variables, RPCs, timers and
/// background tasks.
///
/// NOTE(review): the notes on the methods are derived from their signatures
/// only; exact semantics are defined by the implementors, which are not
/// visible in this file section.
pub trait Rt: Debug + Any {
    /// Handle type returned by `spawn` and `spawn_var`.
    type AbortHandle: Abortable;

    /// Reset the runtime; called by `ExecCtx::clear`.
    fn clear(&mut self);

    /// Subscribe to `path` with `flags` on behalf of expression `ref_by`.
    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;

    /// Counterpart of `subscribe` for the same `path`, `dv` and `ref_by`.
    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);

    /// Start listing `path`. NOTE(review): results are presumably delivered
    /// under `id` — TODO confirm.
    fn list(&mut self, id: BindId, path: Path);

    /// Table flavour of `list`.
    fn list_table(&mut self, id: BindId, path: Path);

    /// Stop the listing associated with `id`.
    fn stop_list(&mut self, id: BindId);

    /// Publish `value` at `path` on behalf of `ref_by`.
    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;

    /// Update the published value `id` to `value`.
    fn update(&mut self, id: &Val, value: Value);

    /// Counterpart of `publish`.
    fn unpublish(&mut self, id: Val, ref_by: ExprId);

    /// Note that expression `ref_by` references variable `id`.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId);
    /// Counterpart of `ref_var`.
    fn unref_var(&mut self, id: BindId, ref_by: ExprId);

    /// Set variable `id` to `value`; `ExecCtx::set_var` forwards here after
    /// caching the value.
    fn set_var(&mut self, id: BindId, value: Value);

    /// NOTE(review): presumably signals that `id` was set without passing
    /// the value through the runtime — TODO confirm.
    fn notify_set(&mut self, id: BindId);

    /// Call the RPC `name` with `args`. NOTE(review): the reply is
    /// presumably delivered under `id` — TODO confirm.
    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);

    /// Publish an RPC at `name` with documentation `doc` and argument
    /// specification `spec`. NOTE(review): incoming calls presumably arrive
    /// in `Event::rpc_calls` under `id` — TODO confirm.
    fn publish_rpc(
        &mut self,
        name: Path,
        doc: Value,
        spec: Vec<ArgSpec>,
        id: BindId,
    ) -> Result<()>;

    /// Counterpart of `publish_rpc`.
    fn unpublish_rpc(&mut self, name: Path);

    /// Arm a timer for `id` with the given `timeout`.
    fn set_timer(&mut self, id: BindId, timeout: Duration);

    /// Run the future `f`, which resolves to a binding id and a custom
    /// payload. The returned handle can abort it.
    fn spawn<F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Self::AbortHandle;

    /// Like `spawn`, for futures that resolve to a binding id and a `Value`.
    fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Self::AbortHandle;

    /// Hand the runtime a channel of pooled batches of
    /// `(BindId, custom payload)` pairs.
    fn watch(
        &mut self,
        s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
    );

    /// Like `watch`, for batches of `(BindId, Value)` pairs.
    fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>);
}
590
/// Type-indexed storage: holds at most one boxed value per concrete type,
/// keyed by that type's `TypeId`.
#[derive(Default)]
pub struct LibState(FxHashMap<TypeId, Box<dyn Any + Send + Sync>>);
593
594impl LibState {
595 pub fn get_or_default<T>(&mut self) -> &mut T
601 where
602 T: Default + Any + Send + Sync,
603 {
604 self.0
605 .entry(TypeId::of::<T>())
606 .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send + Sync>)
607 .downcast_mut::<T>()
608 .unwrap()
609 }
610
611 pub fn get_or_else<T, F>(&mut self, f: F) -> &mut T
617 where
618 T: Any + Send + Sync,
619 F: FnOnce() -> T,
620 {
621 self.0
622 .entry(TypeId::of::<T>())
623 .or_insert_with(|| Box::new(f()) as Box<dyn Any + Send + Sync>)
624 .downcast_mut::<T>()
625 .unwrap()
626 }
627
628 pub fn entry<'a, T>(
629 &'a mut self,
630 ) -> hash_map::Entry<'a, TypeId, Box<dyn Any + Send + Sync>>
631 where
632 T: Any + Send + Sync,
633 {
634 self.0.entry(TypeId::of::<T>())
635 }
636
637 pub fn contains<T>(&self) -> bool
639 where
640 T: Any + Send + Sync,
641 {
642 self.0.contains_key(&TypeId::of::<T>())
643 }
644
645 pub fn get<T>(&mut self) -> Option<&T>
650 where
651 T: Any + Send + Sync,
652 {
653 self.0.get(&TypeId::of::<T>()).map(|t| t.downcast_ref::<T>().unwrap())
654 }
655
656 pub fn get_mut<T>(&mut self) -> Option<&mut T>
661 where
662 T: Any + Send + Sync,
663 {
664 self.0.get_mut(&TypeId::of::<T>()).map(|t| t.downcast_mut::<T>().unwrap())
665 }
666
667 pub fn set<T>(&mut self, t: T) -> Option<Box<T>>
671 where
672 T: Any + Send + Sync,
673 {
674 self.0
675 .insert(TypeId::of::<T>(), Box::new(t) as Box<dyn Any + Send + Sync>)
676 .map(|t| t.downcast::<T>().unwrap())
677 }
678
679 pub fn remove<T>(&mut self) -> Option<Box<T>>
681 where
682 T: Any + Send + Sync,
683 {
684 self.0.remove(&TypeId::of::<T>()).map(|t| t.downcast::<T>().unwrap())
685 }
686}
687
/// Registry mapping Rust types to the uuid generated for them, and each
/// uuid to the static tag it was registered with.
#[derive(Default)]
pub struct AbstractTypeRegistry {
    /// Uuid assigned to each registered Rust type.
    by_tid: FxHashMap<TypeId, Uuid>,
    /// Tag recorded for each assigned uuid.
    by_uuid: FxHashMap<Uuid, &'static str>,
}
703
704impl AbstractTypeRegistry {
705 fn with<V, F: FnMut(&mut AbstractTypeRegistry) -> V>(mut f: F) -> V {
706 static REG: LazyLock<Mutex<AbstractTypeRegistry>> =
707 LazyLock::new(|| Mutex::new(AbstractTypeRegistry::default()));
708 let mut g = REG.lock();
709 f(&mut *g)
710 }
711
712 pub(crate) fn uuid<T: Any>(tag: &'static str) -> Uuid {
714 Self::with(|rg| {
715 *rg.by_tid.entry(TypeId::of::<T>()).or_insert_with(|| {
716 let id = Uuid::new_v4();
717 rg.by_uuid.insert(id, tag);
718 id
719 })
720 })
721 }
722
723 pub fn tag(a: &Abstract) -> Option<&'static str> {
725 Self::with(|rg| rg.by_uuid.get(&a.id()).map(|r| *r))
726 }
727
728 pub fn is_a(a: &Abstract, tag: &str) -> bool {
730 match Self::tag(a) {
731 Some(t) => t == tag,
732 None => false,
733 }
734 }
735}
736
/// Compilation and execution context shared by all nodes of a graph.
pub struct ExecCtx<R: Rt, E: UserEvent> {
    /// Wrapper obtained by registering the "lambda" abstract type in `new`.
    lambdawrap: AbstractWrapper<LambdaDef<R, E>>,
    /// Registered builtins by name: the init function and the builtin's
    /// `NEEDS_CALLSITE` flag.
    builtins: FxHashMap<&'static str, (BuiltInInitFn<R, E>, bool)>,
    /// Initialized to `true` in `new`.
    /// NOTE(review): presumably gates the use of builtins during
    /// compilation — TODO confirm.
    builtins_allowed: bool,
    /// Interned tag strings (see the private `tag` method).
    tags: FxHashSet<ArcStr>,
    /// Type-indexed storage (see [`LibState`]).
    pub libstate: LibState,
    /// The environment; [`compile`] rolls it back when compilation fails.
    pub env: Env,
    /// Most recent value passed to `set_var` for each binding.
    pub cached: FxHashMap<BindId, Value>,
    /// The runtime.
    pub rt: R,
    /// Values stored per lambda id.
    /// NOTE(review): presumably the wrapped lambda definitions — TODO confirm.
    pub lambda_defs: FxHashMap<LambdaId, Value>,
    /// Checks queued for later; [`compile`] runs them after the main
    /// typecheck pass, most recently queued first.
    pub deferred_checks:
        Vec<Box<dyn FnOnce(&mut ExecCtx<R, E>) -> Result<()> + Send + Sync>>,
}
761
762impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
763 pub fn clear(&mut self) {
764 self.env.clear();
765 self.rt.clear();
766 }
767
768 pub fn new(user: R) -> Result<Self> {
777 let id = AbstractTypeRegistry::uuid::<LambdaDef<R, E>>("lambda");
778 Ok(Self {
779 lambdawrap: Abstract::register(id)?,
780 env: Env::default(),
781 builtins: FxHashMap::default(),
782 builtins_allowed: true,
783 libstate: LibState::default(),
784 tags: FxHashSet::default(),
785 cached: HashMap::default(),
786 rt: user,
787 lambda_defs: FxHashMap::default(),
788 deferred_checks: Vec::new(),
789 })
790 }
791
792 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
793 match self.builtins.entry(T::NAME) {
794 Entry::Vacant(e) => {
795 e.insert((T::init, T::NEEDS_CALLSITE));
796 }
797 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
798 }
799 Ok(())
800 }
801
802 pub fn set_var(&mut self, id: BindId, v: Value) {
806 self.cached.insert(id, v.clone());
807 self.rt.set_var(id, v)
808 }
809
810 fn tag(&mut self, s: &ArcStr) -> ArcStr {
811 match self.tags.get(s) {
812 Some(s) => s.clone(),
813 None => {
814 self.tags.insert(s.clone());
815 s.clone()
816 }
817 }
818 }
819
820 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(&mut self, env: Env, f: F) -> T {
824 let snap = self.env.restore_lexical_env(env);
825 let orig = mem::replace(&mut self.env, snap);
826 let r = f(self);
827 self.env = self.env.restore_lexical_env(orig);
828 r
829 }
830
831 pub fn with_restored_mut<T, F: FnOnce(&mut Self) -> T>(
837 &mut self,
838 env: &mut Env,
839 f: F,
840 ) -> T {
841 let snap = self.env.restore_lexical_env_mut(env);
842 let orig = mem::replace(&mut self.env, snap);
843 let r = f(self);
844 *env = self.env.clone();
845 self.env = self.env.restore_lexical_env(orig);
846 r
847 }
848}
849
/// A pair of module paths describing where an expression lives.
///
/// NOTE(review): `lexical` is presumably the path where the code is written
/// and `dynamic` the path where it is instantiated — TODO confirm.
#[derive(Debug, Clone)]
pub struct Scope {
    pub lexical: ModPath,
    pub dynamic: ModPath,
}
855
856impl Scope {
857 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
858 Self {
859 lexical: ModPath(self.lexical.append(s)),
860 dynamic: ModPath(self.dynamic.append(s)),
861 }
862 }
863
864 pub fn root() -> Self {
865 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
866 }
867}
868
869pub fn compile<R: Rt, E: UserEvent>(
872 ctx: &mut ExecCtx<R, E>,
873 flags: BitFlags<CFlag>,
874 scope: &Scope,
875 spec: Expr,
876) -> Result<Node<R, E>> {
877 let top_id = spec.id;
878 let env = ctx.env.clone();
879 let st = Instant::now();
880 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
881 Ok(n) => n,
882 Err(e) => {
883 ctx.env = env;
884 return Err(e);
885 }
886 };
887 info!("compile time {:?}", st.elapsed());
888 let st = Instant::now();
889 if let Err(e) = node.typecheck(ctx) {
890 ctx.env = env;
891 return Err(e);
892 }
893 while let Some(check) = ctx.deferred_checks.pop() {
895 if let Err(e) = check(ctx) {
896 ctx.env = env;
897 return Err(e);
898 }
899 }
900 info!("typecheck time {:?}", st.elapsed());
901 Ok(node)
902}