1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25 path::Path,
26 publisher::{Id, Val, WriteRequest},
27 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use poolshark::local::LPooled;
33use std::{
34 any::Any,
35 cell::Cell,
36 collections::{hash_map::Entry, HashMap},
37 fmt::Debug,
38 mem,
39 sync::{
40 self,
41 atomic::{AtomicBool, Ordering},
42 LazyLock,
43 },
44 time::Duration,
45};
46use tokio::time::Instant;
47use triomphe::Arc;
48
49#[derive(Debug, Clone, Copy)]
50#[bitflags]
51#[repr(u64)]
52pub enum CFlag {
53 WarnUnhandled,
54 WarnUnhandledArith,
55 WarnUnused,
56 WarningsAreErrors,
57}
58
// Process-wide switch read by `trace()`; starts out disabled.
#[allow(dead_code)]
static TRACE: AtomicBool = AtomicBool::new(false);

/// Turn the global trace switch on (`true`) or off (`false`).
#[allow(dead_code)]
fn set_trace(b: bool) {
    // Relaxed is enough here: the flag is stored and loaded on its own and
    // nothing in this file orders other memory accesses around it.
    TRACE.store(b, Ordering::Relaxed);
}
66
67#[allow(dead_code)]
68fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
69 let set = if enable {
70 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
71 let prev = trace();
72 set_trace(true);
73 !prev
74 } else {
75 false
76 };
77 let r = match f() {
78 Err(e) => {
79 eprintln!("traced at {} failed with {e:?}", spec.pos);
80 Err(e)
81 }
82 r => r,
83 };
84 if set {
85 eprintln!("trace disabled at {}", spec.pos);
86 set_trace(false)
87 }
88 r
89}
90
91#[allow(dead_code)]
92fn trace() -> bool {
93 TRACE.load(Ordering::Relaxed)
94}
95
/// Evaluate an expression, routing it through `dbg!` when tracing is on.
///
/// The expression is evaluated exactly once either way and its value is the
/// value of the macro. The expansion calls `$crate::trace()` to decide.
#[macro_export]
macro_rules! tdbg {
    ($value:expr) => {
        match $crate::trace() {
            true => dbg!($value),
            false => $value,
        }
    };
}
106
/// Build a `Value::Error` whose payload is the pair `(tag, message)`, where
/// the message is a string literal turned into a value by `literal!`.
///
/// `$tag` is cloned rather than moved. The expansion names `Value` and
/// `literal!` unqualified, so both must be in scope at the call site.
#[macro_export]
macro_rules! err {
    ($tag:expr, $msg:literal) => {{
        let payload = Into::<Value>::into(($tag.clone(), literal!($msg)));
        Value::Error(triomphe::Arc::new(payload))
    }};
}
114
/// Build a `Value::Error` whose payload is the pair `(tag, message)`, with
/// the message produced by `compact_str::format_compact!`.
///
/// `$tag` is cloned rather than moved. Any number of format arguments may
/// follow the format string, exactly as with `format!`, and a trailing comma
/// is accepted. With no arguments the format string is still processed by
/// `format_compact!`, so inline captures such as `"{e:?}"` work.
///
/// The expansion names `Value` and `ArcStr` unqualified, so both must be in
/// scope at the call site.
#[macro_export]
macro_rules! errf {
    // All tokens after the format string are forwarded verbatim; matching a
    // single `tt` here would reject `a, b` or `a.b` as arguments.
    ($tag:expr, $fmt:expr, $($args:tt)*) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt, $($args)*).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
    ($tag:expr, $fmt:expr) => {{
        let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
        let e: Value = ($tag.clone(), msg).into();
        Value::Error(triomphe::Arc::new(e))
    }};
}
128
129#[macro_export]
130macro_rules! defetyp {
131 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
132 static $tag_name: ArcStr = arcstr::literal!($tag);
133 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
134 ::std::sync::LazyLock::new(|| {
135 let scope = $crate::expr::ModPath::root();
136 $crate::expr::parser::parse_type(&format!($typ, $tag))
137 .expect("failed to parse type")
138 .scope_refs(&scope)
139 });
140 };
141}
142
143defetyp!(CAST_ERR, CAST_ERR_TAG, "InvalidCast", "Error<`{}(string)>");
144
145atomic_id!(LambdaId);
146
147impl From<u64> for LambdaId {
148 fn from(v: u64) -> Self {
149 LambdaId(v)
150 }
151}
152
153atomic_id!(BindId);
154
155impl From<u64> for BindId {
156 fn from(v: u64) -> Self {
157 BindId(v)
158 }
159}
160
161impl TryFrom<Value> for BindId {
162 type Error = anyhow::Error;
163
164 fn try_from(value: Value) -> Result<Self> {
165 match value {
166 Value::U64(id) => Ok(BindId(id)),
167 v => bail!("invalid bind id {v}"),
168 }
169 }
170}
171
/// Caller-defined payload carried in the `user` field of an [`Event`].
pub trait UserEvent: Clone + Debug + Any {
    /// Reset the payload; invoked by `Event::clear`.
    fn clear(&mut self);
}

/// A [`UserEvent`] that carries no data, for callers without custom events.
#[derive(Clone, Debug)]
pub struct NoUserEvent;

impl UserEvent for NoUserEvent {
    // There is no state, so clearing is a no-op.
    fn clear(&mut self) {}
}
182
183#[derive(Debug, Clone, Copy)]
184#[bitflags]
185#[repr(u64)]
186pub enum PrintFlag {
187 DerefTVars,
190 ReplacePrims,
193 NoSource,
195 NoParents,
197}
198
199thread_local! {
200 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
201}
202
203pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
207 flags: G,
208 f: F,
209) -> R {
210 let prev = PRINT_FLAGS.replace(flags.into());
211 let res = f();
212 PRINT_FLAGS.set(prev);
213 res
214}
215
216#[derive(Debug)]
222pub struct Event<E: UserEvent> {
223 pub init: bool,
224 pub variables: FxHashMap<BindId, Value>,
225 pub netidx: FxHashMap<SubId, subscriber::Event>,
226 pub writes: FxHashMap<Id, WriteRequest>,
227 pub rpc_calls: FxHashMap<BindId, RpcCall>,
228 pub user: E,
229}
230
231impl<E: UserEvent> Event<E> {
232 pub fn new(user: E) -> Self {
233 Event {
234 init: false,
235 variables: HashMap::default(),
236 netidx: HashMap::default(),
237 writes: HashMap::default(),
238 rpc_calls: HashMap::default(),
239 user,
240 }
241 }
242
243 pub fn clear(&mut self) {
244 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
245 *init = false;
246 variables.clear();
247 netidx.clear();
248 rpc_calls.clear();
249 writes.clear();
250 user.clear();
251 }
252}
253
254#[derive(Debug, Clone, Default)]
255pub struct Refs {
256 refed: LPooled<FxHashSet<BindId>>,
257 bound: LPooled<FxHashSet<BindId>>,
258}
259
260impl Refs {
261 pub fn clear(&mut self) {
262 self.refed.clear();
263 self.bound.clear();
264 }
265
266 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
267 for id in &*self.refed {
268 if !self.bound.contains(id) {
269 f(*id);
270 }
271 }
272 }
273}
274
275pub type Node<R, E> = Box<dyn Update<R, E>>;
276
277pub type BuiltInInitFn<R, E> = sync::Arc<
278 dyn for<'a, 'b, 'c> Fn(
279 &'a mut ExecCtx<R, E>,
280 &'a FnType,
281 &'b Scope,
282 &'c [Node<R, E>],
283 ExprId,
284 ) -> Result<Box<dyn Apply<R, E>>>
285 + Send
286 + Sync
287 + 'static,
288>;
289
290pub type InitFn<R, E> = sync::Arc<
291 dyn for<'a, 'b, 'c> Fn(
292 &'a Scope,
293 &'b mut ExecCtx<R, E>,
294 &'c mut [Node<R, E>],
295 ExprId,
296 bool,
297 ) -> Result<Box<dyn Apply<R, E>>>
298 + Send
299 + Sync
300 + 'static,
301>;
302
303pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
308 fn update(
309 &mut self,
310 ctx: &mut ExecCtx<R, E>,
311 from: &mut [Node<R, E>],
312 event: &mut Event<E>,
313 ) -> Option<Value>;
314
315 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
318 ()
319 }
320
321 fn typecheck(
324 &mut self,
325 _ctx: &mut ExecCtx<R, E>,
326 _from: &mut [Node<R, E>],
327 ) -> Result<()> {
328 Ok(())
329 }
330
331 fn typ(&self) -> Arc<FnType> {
334 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
335 Arc::new(FnType {
336 args: Arc::from_iter([]),
337 constraints: Arc::new(RwLock::new(LPooled::take())),
338 rtype: Type::Bottom,
339 throws: Type::Bottom,
340 vargs: None,
341 })
342 });
343 Arc::clone(&*EMPTY)
344 }
345
346 fn refs<'a>(&self, _refs: &mut Refs) {}
350
351 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
354}
355
356pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
360 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
363
364 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
366
367 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
369
370 fn typ(&self) -> &Type;
372
373 fn refs(&self, refs: &mut Refs);
376
377 fn spec(&self) -> &Expr;
379
380 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
382}
383
384pub trait BuiltIn<R: Rt, E: UserEvent> {
385 const NAME: &str;
386 const TYP: LazyLock<FnType>;
387
388 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
389}
390
391pub trait Rt: Debug + 'static {
392 fn clear(&mut self);
393
394 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
398
399 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
401
402 fn list(&mut self, id: BindId, path: Path);
407
408 fn list_table(&mut self, id: BindId, path: Path);
411
412 fn stop_list(&mut self, id: BindId);
415
416 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
420
421 fn update(&mut self, id: &Val, value: Value);
423
424 fn unpublish(&mut self, id: Val, ref_by: ExprId);
426
427 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
437 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
438
439 fn set_var(&mut self, id: BindId, value: Value);
450
451 fn notify_set(&mut self, id: BindId);
458
459 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
465
466 fn publish_rpc(
475 &mut self,
476 name: Path,
477 doc: Value,
478 spec: Vec<ArgSpec>,
479 id: BindId,
480 ) -> Result<()>;
481
482 fn unpublish_rpc(&mut self, name: Path);
484
485 fn set_timer(&mut self, id: BindId, timeout: Duration);
489}
490
491pub struct ExecCtx<R: Rt, E: UserEvent> {
492 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
493 builtins_allowed: bool,
494 tags: FxHashSet<ArcStr>,
495 pub env: Env<R, E>,
496 pub cached: FxHashMap<BindId, Value>,
497 pub rt: R,
498}
499
500impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
501 pub fn clear(&mut self) {
502 self.env.clear();
503 self.rt.clear();
504 }
505
506 pub fn new(user: R) -> Self {
515 Self {
516 env: Env::new(),
517 builtins: FxHashMap::default(),
518 builtins_allowed: true,
519 tags: FxHashSet::default(),
520 cached: HashMap::default(),
521 rt: user,
522 }
523 }
524
525 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
526 let f = T::init(self);
527 match self.builtins.entry(T::NAME) {
528 Entry::Vacant(e) => {
529 e.insert((T::TYP.clone(), f));
530 }
531 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
532 }
533 Ok(())
534 }
535
536 pub fn set_var(&mut self, id: BindId, v: Value) {
540 self.cached.insert(id, v.clone());
541 self.rt.set_var(id, v)
542 }
543
544 fn tag(&mut self, s: &ArcStr) -> ArcStr {
545 match self.tags.get(s) {
546 Some(s) => s.clone(),
547 None => {
548 self.tags.insert(s.clone());
549 s.clone()
550 }
551 }
552 }
553
554 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
559 &mut self,
560 env: Env<R, E>,
561 f: F,
562 ) -> T {
563 let snap = self.env.restore_lexical_env(env);
564 let orig = mem::replace(&mut self.env, snap);
565 let r = f(self);
566 self.env = self.env.restore_lexical_env(orig);
567 r
568 }
569}
570
571#[derive(Debug, Clone)]
572pub struct Scope {
573 pub lexical: ModPath,
574 pub dynamic: ModPath,
575}
576
577impl Scope {
578 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
579 Self {
580 lexical: ModPath(self.lexical.append(s)),
581 dynamic: ModPath(self.dynamic.append(s)),
582 }
583 }
584
585 pub fn root() -> Self {
586 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
587 }
588}
589
590pub fn compile<R: Rt, E: UserEvent>(
593 ctx: &mut ExecCtx<R, E>,
594 flags: BitFlags<CFlag>,
595 scope: &Scope,
596 spec: Expr,
597) -> Result<Node<R, E>> {
598 let top_id = spec.id;
599 let env = ctx.env.clone();
600 let st = Instant::now();
601 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
602 Ok(n) => n,
603 Err(e) => {
604 ctx.env = env;
605 return Err(e);
606 }
607 };
608 info!("compile time {:?}", st.elapsed());
609 let st = Instant::now();
610 if let Err(e) = node.typecheck(ctx) {
611 ctx.env = env;
612 return Err(e);
613 }
614 info!("typecheck time {:?}", st.elapsed());
615 Ok(node)
616}