1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use enumflags2::{bitflags, BitFlags};
21use expr::Expr;
22use fxhash::{FxHashMap, FxHashSet};
23use log::info;
24use netidx::{
25 path::Path,
26 publisher::{Id, Val, WriteRequest},
27 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
28};
29use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
30use node::compiler;
31use parking_lot::RwLock;
32use poolshark::local::LPooled;
33use std::{
34 any::Any,
35 cell::Cell,
36 collections::{hash_map::Entry, HashMap},
37 fmt::Debug,
38 mem,
39 sync::{
40 self,
41 atomic::{AtomicBool, Ordering},
42 LazyLock,
43 },
44 time::Duration,
45};
46use tokio::time::Instant;
47use triomphe::Arc;
48
49#[derive(Debug, Clone, Copy)]
50#[bitflags]
51#[repr(u64)]
52pub enum CFlag {
53 WarnUnhandled,
54 WarnUnhandledArith,
55 WarnUnused,
56 WarningsAreErrors,
57}
58
59#[allow(dead_code)]
60static TRACE: AtomicBool = AtomicBool::new(false);
61
62#[allow(dead_code)]
63fn set_trace(b: bool) {
64 TRACE.store(b, Ordering::Relaxed)
65}
66
67#[allow(dead_code)]
68fn with_trace<F: FnOnce() -> Result<R>, R>(enable: bool, spec: &Expr, f: F) -> Result<R> {
69 let set = if enable {
70 eprintln!("trace enabled at {}, spec: {}", spec.pos, spec);
71 let prev = trace();
72 set_trace(true);
73 !prev
74 } else {
75 false
76 };
77 let r = match f() {
78 Err(e) => {
79 eprintln!("traced at {} failed with {e:?}", spec.pos);
80 Err(e)
81 }
82 r => r,
83 };
84 if set {
85 eprintln!("trace disabled at {}", spec.pos);
86 set_trace(false)
87 }
88 r
89}
90
91#[allow(dead_code)]
92fn trace() -> bool {
93 TRACE.load(Ordering::Relaxed)
94}
95
96#[macro_export]
97macro_rules! tdbg {
98 ($e:expr) => {
99 if $crate::trace() {
100 dbg!($e)
101 } else {
102 $e
103 }
104 };
105}
106
107#[macro_export]
108macro_rules! err {
109 ($tag:expr, $err:literal) => {{
110 let e: Value = ($tag.clone(), literal!($err)).into();
111 Value::Error(triomphe::Arc::new(e))
112 }};
113}
114
115#[macro_export]
116macro_rules! errf {
117 ($tag:expr, $fmt:expr, $args:tt) => {{
118 let msg: ArcStr = compact_str::format_compact!($fmt, $args).as_str().into();
119 let e: Value = ($tag.clone(), msg).into();
120 Value::Error(triomphe::Arc::new(e))
121 }};
122 ($tag:expr, $fmt:expr) => {{
123 let msg: ArcStr = compact_str::format_compact!($fmt).as_str().into();
124 let e: Value = ($tag.clone(), msg).into();
125 Value::Error(triomphe::Arc::new(e))
126 }};
127}
128
129#[macro_export]
130macro_rules! defetyp {
131 ($name:ident, $tag_name:ident, $tag:literal, $typ:expr) => {
132 static $tag_name: ArcStr = literal!($tag);
133 static $name: ::std::sync::LazyLock<$crate::typ::Type> =
134 ::std::sync::LazyLock::new(|| {
135 let scope = $crate::expr::ModPath::root();
136 $crate::expr::parser::parse_type(&format!($typ, $tag))
137 .expect("failed to parse type")
138 .scope_refs(&scope)
139 });
140 };
141}
142
143atomic_id!(LambdaId);
144
145impl From<u64> for LambdaId {
146 fn from(v: u64) -> Self {
147 LambdaId(v)
148 }
149}
150
151atomic_id!(BindId);
152
153impl From<u64> for BindId {
154 fn from(v: u64) -> Self {
155 BindId(v)
156 }
157}
158
159impl TryFrom<Value> for BindId {
160 type Error = anyhow::Error;
161
162 fn try_from(value: Value) -> Result<Self> {
163 match value {
164 Value::U64(id) => Ok(BindId(id)),
165 v => bail!("invalid bind id {v}"),
166 }
167 }
168}
169
170pub trait UserEvent: Clone + Debug + Any {
171 fn clear(&mut self);
172}
173
174#[derive(Debug, Clone)]
175pub struct NoUserEvent;
176
177impl UserEvent for NoUserEvent {
178 fn clear(&mut self) {}
179}
180
181#[derive(Debug, Clone, Copy)]
182#[bitflags]
183#[repr(u64)]
184pub enum PrintFlag {
185 DerefTVars,
188 ReplacePrims,
191 NoSource,
193 NoParents,
195}
196
197thread_local! {
198 static PRINT_FLAGS: Cell<BitFlags<PrintFlag>> = Cell::new(PrintFlag::ReplacePrims | PrintFlag::NoSource);
199}
200
201pub fn format_with_flags<G: Into<BitFlags<PrintFlag>>, R, F: FnOnce() -> R>(
205 flags: G,
206 f: F,
207) -> R {
208 let prev = PRINT_FLAGS.replace(flags.into());
209 let res = f();
210 PRINT_FLAGS.set(prev);
211 res
212}
213
214#[derive(Debug)]
220pub struct Event<E: UserEvent> {
221 pub init: bool,
222 pub variables: FxHashMap<BindId, Value>,
223 pub netidx: FxHashMap<SubId, subscriber::Event>,
224 pub writes: FxHashMap<Id, WriteRequest>,
225 pub rpc_calls: FxHashMap<BindId, RpcCall>,
226 pub user: E,
227}
228
229impl<E: UserEvent> Event<E> {
230 pub fn new(user: E) -> Self {
231 Event {
232 init: false,
233 variables: HashMap::default(),
234 netidx: HashMap::default(),
235 writes: HashMap::default(),
236 rpc_calls: HashMap::default(),
237 user,
238 }
239 }
240
241 pub fn clear(&mut self) {
242 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
243 *init = false;
244 variables.clear();
245 netidx.clear();
246 rpc_calls.clear();
247 writes.clear();
248 user.clear();
249 }
250}
251
252#[derive(Debug, Clone, Default)]
253pub struct Refs {
254 refed: LPooled<FxHashSet<BindId>>,
255 bound: LPooled<FxHashSet<BindId>>,
256}
257
258impl Refs {
259 pub fn clear(&mut self) {
260 self.refed.clear();
261 self.bound.clear();
262 }
263
264 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
265 for id in &*self.refed {
266 if !self.bound.contains(id) {
267 f(*id);
268 }
269 }
270 }
271}
272
273pub type Node<R, E> = Box<dyn Update<R, E>>;
274
275pub type BuiltInInitFn<R, E> = sync::Arc<
276 dyn for<'a, 'b, 'c> Fn(
277 &'a mut ExecCtx<R, E>,
278 &'a FnType,
279 &'b Scope,
280 &'c [Node<R, E>],
281 ExprId,
282 ) -> Result<Box<dyn Apply<R, E>>>
283 + Send
284 + Sync
285 + 'static,
286>;
287
288pub type InitFn<R, E> = sync::Arc<
289 dyn for<'a, 'b, 'c> Fn(
290 &'a Scope,
291 &'b mut ExecCtx<R, E>,
292 &'c [Node<R, E>],
293 ExprId,
294 ) -> Result<Box<dyn Apply<R, E>>>
295 + Send
296 + Sync
297 + 'static,
298>;
299
300pub trait Apply<R: Rt, E: UserEvent>: Debug + Send + Sync + Any {
305 fn update(
306 &mut self,
307 ctx: &mut ExecCtx<R, E>,
308 from: &mut [Node<R, E>],
309 event: &mut Event<E>,
310 ) -> Option<Value>;
311
312 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {
315 ()
316 }
317
318 fn typecheck(
321 &mut self,
322 _ctx: &mut ExecCtx<R, E>,
323 _from: &mut [Node<R, E>],
324 ) -> Result<()> {
325 Ok(())
326 }
327
328 fn typ(&self) -> Arc<FnType> {
331 static EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
332 Arc::new(FnType {
333 args: Arc::from_iter([]),
334 constraints: Arc::new(RwLock::new(LPooled::take())),
335 rtype: Type::Bottom,
336 throws: Type::Bottom,
337 vargs: None,
338 })
339 });
340 Arc::clone(&*EMPTY)
341 }
342
343 fn refs<'a>(&self, _refs: &mut Refs) {}
347
348 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>);
351}
352
353pub trait Update<R: Rt, E: UserEvent>: Debug + Send + Sync + Any + 'static {
357 fn update(&mut self, ctx: &mut ExecCtx<R, E>, event: &mut Event<E>) -> Option<Value>;
360
361 fn delete(&mut self, ctx: &mut ExecCtx<R, E>);
363
364 fn typecheck(&mut self, ctx: &mut ExecCtx<R, E>) -> Result<()>;
366
367 fn typ(&self) -> &Type;
369
370 fn refs(&self, refs: &mut Refs);
373
374 fn spec(&self) -> &Expr;
376
377 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>);
379}
380
381pub trait BuiltIn<R: Rt, E: UserEvent> {
382 const NAME: &str;
383 const TYP: LazyLock<FnType>;
384
385 fn init(ctx: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E>;
386}
387
388pub trait Rt: Debug + 'static {
389 fn clear(&mut self);
390
391 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
395
396 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
398
399 fn list(&mut self, id: BindId, path: Path);
404
405 fn list_table(&mut self, id: BindId, path: Path);
408
409 fn stop_list(&mut self, id: BindId);
412
413 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
417
418 fn update(&mut self, id: &Val, value: Value);
420
421 fn unpublish(&mut self, id: Val, ref_by: ExprId);
423
424 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
434 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
435
436 fn set_var(&mut self, id: BindId, value: Value);
447
448 fn notify_set(&mut self, id: BindId);
455
456 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
462
463 fn publish_rpc(
472 &mut self,
473 name: Path,
474 doc: Value,
475 spec: Vec<ArgSpec>,
476 id: BindId,
477 ) -> Result<()>;
478
479 fn unpublish_rpc(&mut self, name: Path);
481
482 fn set_timer(&mut self, id: BindId, timeout: Duration);
486}
487
488pub struct ExecCtx<R: Rt, E: UserEvent> {
489 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<R, E>)>,
490 builtins_allowed: bool,
491 tags: FxHashSet<ArcStr>,
492 pub env: Env<R, E>,
493 pub cached: FxHashMap<BindId, Value>,
494 pub rt: R,
495}
496
497impl<R: Rt, E: UserEvent> ExecCtx<R, E> {
498 pub fn clear(&mut self) {
499 self.env.clear();
500 self.rt.clear();
501 }
502
503 pub fn new(user: R) -> Self {
512 Self {
513 env: Env::new(),
514 builtins: FxHashMap::default(),
515 builtins_allowed: true,
516 tags: FxHashSet::default(),
517 cached: HashMap::default(),
518 rt: user,
519 }
520 }
521
522 pub fn register_builtin<T: BuiltIn<R, E>>(&mut self) -> Result<()> {
523 let f = T::init(self);
524 match self.builtins.entry(T::NAME) {
525 Entry::Vacant(e) => {
526 e.insert((T::TYP.clone(), f));
527 }
528 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
529 }
530 Ok(())
531 }
532
533 pub fn set_var(&mut self, id: BindId, v: Value) {
537 self.cached.insert(id, v.clone());
538 self.rt.set_var(id, v)
539 }
540
541 fn tag(&mut self, s: &ArcStr) -> ArcStr {
542 match self.tags.get(s) {
543 Some(s) => s.clone(),
544 None => {
545 self.tags.insert(s.clone());
546 s.clone()
547 }
548 }
549 }
550
551 pub fn with_restored<T, F: FnOnce(&mut Self) -> T>(
556 &mut self,
557 env: Env<R, E>,
558 f: F,
559 ) -> T {
560 let snap = self.env.restore_lexical_env(env);
561 let orig = mem::replace(&mut self.env, snap);
562 let r = f(self);
563 self.env = self.env.restore_lexical_env(orig);
564 r
565 }
566}
567
568#[derive(Debug, Clone)]
569pub struct Scope {
570 pub lexical: ModPath,
571 pub dynamic: ModPath,
572}
573
574impl Scope {
575 pub fn append<S: AsRef<str> + ?Sized>(&self, s: &S) -> Self {
576 Self {
577 lexical: ModPath(self.lexical.append(s)),
578 dynamic: ModPath(self.dynamic.append(s)),
579 }
580 }
581
582 pub fn root() -> Self {
583 Self { lexical: ModPath::root(), dynamic: ModPath::root() }
584 }
585}
586
587pub fn compile<R: Rt, E: UserEvent>(
590 ctx: &mut ExecCtx<R, E>,
591 flags: BitFlags<CFlag>,
592 scope: &Scope,
593 spec: Expr,
594) -> Result<Node<R, E>> {
595 let top_id = spec.id;
596 let env = ctx.env.clone();
597 let st = Instant::now();
598 let mut node = match compiler::compile(ctx, flags, spec, scope, top_id) {
599 Ok(n) => n,
600 Err(e) => {
601 ctx.env = env;
602 return Err(e);
603 }
604 };
605 info!("compile time {:?}", st.elapsed());
606 let st = Instant::now();
607 if let Err(e) = node.typecheck(ctx) {
608 ctx.env = env;
609 return Err(e);
610 }
611 info!("typecheck time {:?}", st.elapsed());
612 Ok(node)
613}