1#[macro_use]
2extern crate netidx_core;
3#[macro_use]
4extern crate combine;
5#[macro_use]
6extern crate serde_derive;
7
8pub mod env;
9pub mod expr;
10pub mod node;
11pub mod typ;
12
13use crate::{
14 env::Env,
15 expr::{ExprId, ModPath},
16 typ::{FnType, Type},
17};
18use anyhow::{bail, Result};
19use arcstr::ArcStr;
20use expr::Expr;
21use fxhash::{FxHashMap, FxHashSet};
22use log::info;
23use netidx::{
24 path::Path,
25 publisher::{Id, Val, WriteRequest},
26 subscriber::{self, Dval, SubId, UpdatesFlags, Value},
27};
28use netidx_protocols::rpc::server::{ArgSpec, RpcCall};
29use node::compiler;
30use parking_lot::RwLock;
31use std::{
32 any::Any,
33 cell::RefCell,
34 collections::{hash_map::Entry, HashMap},
35 fmt::Debug,
36 mem,
37 sync::{
38 self,
39 atomic::{AtomicBool, Ordering},
40 LazyLock,
41 },
42 time::Duration,
43};
44use tokio::time::Instant;
45use triomphe::Arc;
46
47#[allow(dead_code)]
48static TRACE: AtomicBool = AtomicBool::new(false);
49
50#[allow(dead_code)]
51fn set_trace(b: bool) {
52 TRACE.store(b, Ordering::Relaxed)
53}
54
55#[allow(dead_code)]
56fn trace() -> bool {
57 TRACE.load(Ordering::Relaxed)
58}
59
60#[macro_export]
61macro_rules! tdbg {
62 ($e:expr) => {
63 if $crate::trace() {
64 dbg!($e)
65 } else {
66 $e
67 }
68 };
69}
70
71thread_local! {
72 pub static REFS: RefCell<Refs> = RefCell::new(Refs::new());
74}
75
76atomic_id!(LambdaId);
77
78impl From<u64> for LambdaId {
79 fn from(v: u64) -> Self {
80 LambdaId(v)
81 }
82}
83
84atomic_id!(BindId);
85
86impl From<u64> for BindId {
87 fn from(v: u64) -> Self {
88 BindId(v)
89 }
90}
91
92impl TryFrom<Value> for BindId {
93 type Error = anyhow::Error;
94
95 fn try_from(value: Value) -> Result<Self> {
96 match value {
97 Value::U64(id) => Ok(BindId(id)),
98 v => bail!("invalid bind id {v}"),
99 }
100 }
101}
102
103#[macro_export]
104macro_rules! errf {
105 ($pat:expr, $($arg:expr),*) => {
106 Some(Value::Error(ArcStr::from(format_compact!($pat, $($arg),*).as_str())))
107 };
108 ($pat:expr) => { Some(Value::Error(ArcStr::from(format_compact!($pat).as_str()))) };
109}
110
111#[macro_export]
112macro_rules! err {
113 ($pat:literal) => {
114 Some(Value::Error(literal!($pat)))
115 };
116}
117
118pub trait UserEvent: Clone + Debug + Any {
119 fn clear(&mut self);
120}
121
122#[derive(Debug, Clone)]
123pub struct NoUserEvent;
124
125impl UserEvent for NoUserEvent {
126 fn clear(&mut self) {}
127}
128
129#[derive(Debug)]
135pub struct Event<E: UserEvent> {
136 pub init: bool,
137 pub variables: FxHashMap<BindId, Value>,
138 pub netidx: FxHashMap<SubId, subscriber::Event>,
139 pub writes: FxHashMap<Id, WriteRequest>,
140 pub rpc_calls: FxHashMap<BindId, RpcCall>,
141 pub user: E,
142}
143
144impl<E: UserEvent> Event<E> {
145 pub fn new(user: E) -> Self {
146 Event {
147 init: false,
148 variables: HashMap::default(),
149 netidx: HashMap::default(),
150 writes: HashMap::default(),
151 rpc_calls: HashMap::default(),
152 user,
153 }
154 }
155
156 pub fn clear(&mut self) {
157 let Self { init, variables, netidx, rpc_calls, writes, user } = self;
158 *init = false;
159 variables.clear();
160 netidx.clear();
161 rpc_calls.clear();
162 writes.clear();
163 user.clear();
164 }
165}
166
167#[derive(Debug, Clone)]
168pub struct Refs {
169 refed: FxHashSet<BindId>,
170 bound: FxHashSet<BindId>,
171}
172
173impl Refs {
174 pub fn new() -> Self {
175 Self { refed: FxHashSet::default(), bound: FxHashSet::default() }
176 }
177
178 pub fn clear(&mut self) {
179 self.refed.clear();
180 self.bound.clear();
181 }
182
183 pub fn with_external_refs(&self, mut f: impl FnMut(BindId)) {
184 for id in &self.refed {
185 if !self.bound.contains(id) {
186 f(*id);
187 }
188 }
189 }
190}
191
192pub type Node<C, E> = Box<dyn Update<C, E>>;
193
194pub type BuiltInInitFn<C, E> = sync::Arc<
195 dyn for<'a, 'b, 'c> Fn(
196 &'a mut ExecCtx<C, E>,
197 &'a FnType,
198 &'b ModPath,
199 &'c [Node<C, E>],
200 ExprId,
201 ) -> Result<Box<dyn Apply<C, E>>>
202 + Send
203 + Sync
204 + 'static,
205>;
206
207pub type InitFn<C, E> = sync::Arc<
208 dyn for<'a, 'b> Fn(
209 &'a mut ExecCtx<C, E>,
210 &'b [Node<C, E>],
211 ExprId,
212 ) -> Result<Box<dyn Apply<C, E>>>
213 + Send
214 + Sync
215 + 'static,
216>;
217
218pub trait Apply<C: Ctx, E: UserEvent>: Debug + Send + Sync + Any {
223 fn update(
224 &mut self,
225 ctx: &mut ExecCtx<C, E>,
226 from: &mut [Node<C, E>],
227 event: &mut Event<E>,
228 ) -> Option<Value>;
229
230 fn delete(&mut self, _ctx: &mut ExecCtx<C, E>) {
233 ()
234 }
235
236 fn typecheck(
239 &mut self,
240 _ctx: &mut ExecCtx<C, E>,
241 _from: &mut [Node<C, E>],
242 ) -> Result<()> {
243 Ok(())
244 }
245
246 fn typ(&self) -> Arc<FnType> {
249 const EMPTY: LazyLock<Arc<FnType>> = LazyLock::new(|| {
250 Arc::new(FnType {
251 args: Arc::from_iter([]),
252 constraints: Arc::new(RwLock::new(vec![])),
253 rtype: Type::Bottom,
254 vargs: None,
255 })
256 });
257 Arc::clone(&*EMPTY)
258 }
259
260 fn refs<'a>(&self, _refs: &mut Refs) {}
264
265 fn sleep(&mut self, _ctx: &mut ExecCtx<C, E>);
268}
269
270pub trait Update<C: Ctx, E: UserEvent>: Debug + Send + Sync + Any + 'static {
274 fn update(&mut self, ctx: &mut ExecCtx<C, E>, event: &mut Event<E>) -> Option<Value>;
277
278 fn delete(&mut self, ctx: &mut ExecCtx<C, E>);
280
281 fn typecheck(&mut self, ctx: &mut ExecCtx<C, E>) -> Result<()>;
283
284 fn typ(&self) -> &Type;
286
287 fn refs(&self, refs: &mut Refs);
290
291 fn spec(&self) -> &Expr;
293
294 fn sleep(&mut self, ctx: &mut ExecCtx<C, E>);
296}
297
298pub trait BuiltIn<C: Ctx, E: UserEvent> {
299 const NAME: &str;
300 const TYP: LazyLock<FnType>;
301
302 fn init(ctx: &mut ExecCtx<C, E>) -> BuiltInInitFn<C, E>;
303}
304
305pub trait Ctx: Debug + 'static {
306 fn clear(&mut self);
307
308 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval;
312
313 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
315
316 fn list(&mut self, id: BindId, path: Path);
321
322 fn list_table(&mut self, id: BindId, path: Path);
325
326 fn stop_list(&mut self, id: BindId);
329
330 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val>;
334
335 fn update(&mut self, id: &Val, value: Value);
337
338 fn unpublish(&mut self, id: Val, ref_by: ExprId);
340
341 fn ref_var(&mut self, id: BindId, ref_by: ExprId);
351 fn unref_var(&mut self, id: BindId, ref_by: ExprId);
352
353 fn set_var(&mut self, id: BindId, value: Value);
365
366 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId);
372
373 fn publish_rpc(
382 &mut self,
383 name: Path,
384 doc: Value,
385 spec: Vec<ArgSpec>,
386 id: BindId,
387 ) -> Result<()>;
388
389 fn unpublish_rpc(&mut self, name: Path);
391
392 fn set_timer(&mut self, id: BindId, timeout: Duration);
396}
397
398pub struct ExecCtx<C: Ctx, E: UserEvent> {
399 builtins: FxHashMap<&'static str, (FnType, BuiltInInitFn<C, E>)>,
400 tags: FxHashSet<ArcStr>,
401 pub env: Env<C, E>,
402 pub cached: FxHashMap<BindId, Value>,
403 pub user: C,
404}
405
406impl<C: Ctx, E: UserEvent> ExecCtx<C, E> {
407 pub fn clear(&mut self) {
408 self.env.clear();
409 self.user.clear();
410 }
411
412 pub fn new(user: C) -> Self {
421 Self {
422 env: Env::new(),
423 builtins: FxHashMap::default(),
424 tags: FxHashSet::default(),
425 cached: HashMap::default(),
426 user,
427 }
428 }
429
430 pub fn register_builtin<T: BuiltIn<C, E>>(&mut self) -> Result<()> {
431 let f = T::init(self);
432 match self.builtins.entry(T::NAME) {
433 Entry::Vacant(e) => {
434 e.insert((T::TYP.clone(), f));
435 }
436 Entry::Occupied(_) => bail!("builtin {} is already registered", T::NAME),
437 }
438 Ok(())
439 }
440
441 pub fn set_var(&mut self, id: BindId, v: Value) {
445 self.cached.insert(id, v.clone());
446 self.user.set_var(id, v)
447 }
448
449 fn tag(&mut self, s: &ArcStr) -> ArcStr {
450 match self.tags.get(s) {
451 Some(s) => s.clone(),
452 None => {
453 self.tags.insert(s.clone());
454 s.clone()
455 }
456 }
457 }
458
459 pub fn with_restored<R, F: FnOnce(&mut Self) -> R>(
464 &mut self,
465 env: Env<C, E>,
466 f: F,
467 ) -> R {
468 let snap = self.env.restore_lexical_env(env);
469 let orig = mem::replace(&mut self.env, snap);
470 let r = f(self);
471 self.env = self.env.restore_lexical_env(orig);
472 r
473 }
474}
475
476pub fn compile<C: Ctx, E: UserEvent>(
479 ctx: &mut ExecCtx<C, E>,
480 scope: &ModPath,
481 spec: Expr,
482) -> Result<Node<C, E>> {
483 let top_id = spec.id;
484 let env = ctx.env.clone();
485 let st = Instant::now();
486 let mut node = match compiler::compile(ctx, spec, scope, top_id) {
487 Ok(n) => n,
488 Err(e) => {
489 ctx.env = env;
490 return Err(e);
491 }
492 };
493 info!("compile time {:?}", st.elapsed());
494 let st = Instant::now();
495 if let Err(e) = node.typecheck(ctx) {
496 ctx.env = env;
497 return Err(e);
498 }
499 info!("typecheck time {:?}", st.elapsed());
500 Ok(node)
501}