1use anyhow::{bail, Result};
2use arcstr::ArcStr;
3use compact_str::format_compact;
4use enumflags2::{bitflags, BitFlags};
5use fxhash::FxHashMap;
6use graphix_compiler::{
7 expr::{ExprId, ModuleResolver},
8 node::genn,
9 typ::{FnType, Type},
10 Apply, BindId, BuiltIn, BuiltInInitFn, Event, ExecCtx, LambdaId, Node, Refs, Rt,
11 Scope, UserEvent,
12};
13use netidx::{path::Path, subscriber::Value};
14use netidx_core::utils::Either;
15use netidx_value::FromValue;
16use poolshark::local::LPooled;
17use std::{
18 any::Any,
19 collections::{hash_map::Entry, VecDeque},
20 fmt::Debug,
21 iter,
22 marker::PhantomData,
23 sync::{Arc, LazyLock},
24};
25use triomphe::Arc as TArc;
26
27mod array;
28mod core;
29mod fs;
30mod map;
31mod net;
32mod rand;
33mod re;
34mod str;
35#[cfg(test)]
36mod test;
37mod time;
38
/// Define a `TYP` constant holding a lazily parsed function type.
///
/// `$scope` is the module path literal that the parsed signature's references
/// are scoped to (via `scope_refs`), and `$s` is the function type source
/// text handed to `parse_fn_type`.
///
/// # Panics
///
/// Forcing the generated `LazyLock` panics if `$s` does not parse. The panic
/// message contains the offending signature text.
#[macro_export]
macro_rules! deftype {
    ($scope:literal, $s:literal) => {
        const TYP: ::std::sync::LazyLock<graphix_compiler::typ::FnType> =
            ::std::sync::LazyLock::new(|| {
                let scope =
                    graphix_compiler::expr::ModPath(::netidx::path::Path::from($scope));
                graphix_compiler::expr::parser::parse_fn_type($s)
                    // `expect` takes a plain string and does not interpolate, so
                    // the signature is spliced in at compile time instead.
                    .expect(concat!("failed to parse fn type ", $s))
                    .scope_refs(&scope)
            });
    };
}
52
/// Destructure two one-element slices into a pair of element references.
///
/// Both operands are dereferenced (`&*`) before matching, so anything that
/// derefs to a slice is accepted. The result is `(arg, arg_up)`.
///
/// # Panics
///
/// Reaches `unreachable!()` unless both operands hold exactly one element.
#[macro_export]
macro_rules! arity1 {
    ($from:expr, $updates:expr) => {
        match (&*$from, &*$updates) {
            ([only], [only_up]) => (only, only_up),
            _ => unreachable!(),
        }
    };
}
62
/// Destructure two two-element slices into pairs of element references.
///
/// Both operands are dereferenced (`&*`) before matching, so anything that
/// derefs to a slice is accepted. The result is
/// `((arg0, arg1), (arg0_up, arg1_up))`.
///
/// # Panics
///
/// Reaches `unreachable!()` unless both operands hold exactly two elements.
#[macro_export]
macro_rules! arity2 {
    ($from:expr, $updates:expr) => {
        match (&*$from, &*$updates) {
            ([first, second], [first_up, second_up]) => {
                ((first, second), (first_up, second_up))
            }
            _ => unreachable!(),
        }
    };
}
72
73#[derive(Debug)]
74pub struct CachedVals(pub Box<[Option<Value>]>);
75
76impl CachedVals {
77 pub fn new<R: Rt, E: UserEvent>(from: &[Node<R, E>]) -> CachedVals {
78 CachedVals(from.into_iter().map(|_| None).collect())
79 }
80
81 pub fn clear(&mut self) {
82 for v in &mut self.0 {
83 *v = None
84 }
85 }
86
87 pub fn update<R: Rt, E: UserEvent>(
88 &mut self,
89 ctx: &mut ExecCtx<R, E>,
90 from: &mut [Node<R, E>],
91 event: &mut Event<E>,
92 ) -> bool {
93 from.into_iter().enumerate().fold(false, |res, (i, src)| {
94 match src.update(ctx, event) {
95 None => res,
96 v @ Some(_) => {
97 self.0[i] = v;
98 true
99 }
100 }
101 })
102 }
103
104 pub fn update_diff<R: Rt, E: UserEvent>(
107 &mut self,
108 up: &mut [bool],
109 ctx: &mut ExecCtx<R, E>,
110 from: &mut [Node<R, E>],
111 event: &mut Event<E>,
112 ) {
113 for (i, n) in from.iter_mut().enumerate() {
114 match n.update(ctx, event) {
115 None => (),
116 v => {
117 self.0[i] = v;
118 up[i] = true
119 }
120 }
121 }
122 }
123
124 pub fn flat_iter<'a>(&'a self) -> impl Iterator<Item = Option<Value>> + 'a {
125 self.0.iter().flat_map(|v| match v {
126 None => Either::Left(iter::once(None)),
127 Some(v) => Either::Right(v.clone().flatten().map(Some)),
128 })
129 }
130
131 pub fn get<T: FromValue>(&self, i: usize) -> Option<T> {
132 self.0.get(i).and_then(|v| v.as_ref()).and_then(|v| v.clone().cast_to::<T>().ok())
133 }
134}
135
/// A builtin computed synchronously from the cached values of its arguments.
///
/// Implementors are wrapped in [`CachedArgs`], which supplies the `BuiltIn`
/// and `Apply` implementations and maintains the argument cache.
pub trait EvalCached: Debug + Default + Send + Sync + 'static {
    /// Forwarded as `BuiltIn::NAME` by [`CachedArgs`].
    const NAME: &str;
    /// Forwarded as `BuiltIn::TYP` by [`CachedArgs`].
    const TYP: LazyLock<FnType>;

    /// Compute the result from the current argument cache.
    ///
    /// [`CachedArgs`] calls this only in an update where at least one
    /// argument produced a value. Slots in `from` may still be `None` for
    /// arguments that have never produced one. The returned option is passed
    /// through unchanged as the result of the update.
    fn eval(&mut self, from: &CachedVals) -> Option<Value>;
}
142
/// Adapter turning an [`EvalCached`] implementation into a builtin by pairing
/// it with a cache of its argument values.
#[derive(Debug)]
pub struct CachedArgs<T: EvalCached> {
    /// Last value seen for each argument; cleared on `sleep`.
    cached: CachedVals,
    /// The wrapped evaluator, created with `T::default()`.
    t: T,
}
148
149impl<R: Rt, E: UserEvent, T: EvalCached> BuiltIn<R, E> for CachedArgs<T> {
150 const NAME: &str = T::NAME;
151 const TYP: LazyLock<FnType> = T::TYP;
152
153 fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
154 Arc::new(|_, _, _, from, _| {
155 let t = CachedArgs::<T> { cached: CachedVals::new(from), t: T::default() };
156 Ok(Box::new(t))
157 })
158 }
159}
160
161impl<R: Rt, E: UserEvent, T: EvalCached> Apply<R, E> for CachedArgs<T> {
162 fn update(
163 &mut self,
164 ctx: &mut ExecCtx<R, E>,
165 from: &mut [Node<R, E>],
166 event: &mut Event<E>,
167 ) -> Option<Value> {
168 if self.cached.update(ctx, from, event) {
169 self.t.eval(&self.cached)
170 } else {
171 None
172 }
173 }
174
175 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
176 self.cached.clear()
177 }
178}
179
/// A builtin computed asynchronously from the cached values of its arguments.
///
/// Implementors are wrapped in [`CachedArgsAsync`], which queues prepared
/// arguments and runs at most one evaluation at a time.
pub trait EvalCachedAsync: Debug + Default + Send + Sync + 'static {
    /// Forwarded as `BuiltIn::NAME` by [`CachedArgsAsync`].
    const NAME: &str;
    /// Forwarded as `BuiltIn::TYP` by [`CachedArgsAsync`].
    const TYP: LazyLock<FnType>;
    /// Owned snapshot of the inputs handed to [`EvalCachedAsync::eval`].
    type Args: Debug + Any + Send + Sync;

    /// Build the inputs for one evaluation from the current argument cache.
    ///
    /// [`CachedArgsAsync`] calls this in every update where at least one
    /// argument produced a value. `Some(args)` is appended to the pending
    /// queue; `None` queues nothing.
    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args>;
    /// Run one evaluation. The future is spawned through `ctx.rt.spawn_var`
    /// and its output is returned from a later `update` of the wrapper.
    fn eval(args: Self::Args) -> impl Future<Output = Value> + Send;
}
188
/// Adapter turning an [`EvalCachedAsync`] implementation into a builtin that
/// runs queued evaluations one at a time.
#[derive(Debug)]
pub struct CachedArgsAsync<T: EvalCachedAsync> {
    /// Last value seen for each argument.
    cached: CachedVals,
    /// Variable id under which the spawned future's result is delivered; the
    /// wrapper looks for it in `event.variables` on every update.
    id: BindId,
    /// Expression id used when referencing/unreferencing `id` with the runtime.
    top_id: ExprId,
    /// Prepared arguments waiting for the in-flight evaluation to finish.
    queued: VecDeque<T::Args>,
    /// `true` from the moment an evaluation is spawned until its result is
    /// observed.
    running: bool,
    /// The wrapped evaluator, created with `T::default()`.
    t: T,
}
198
199impl<R: Rt, E: UserEvent, T: EvalCachedAsync> BuiltIn<R, E> for CachedArgsAsync<T> {
200 const NAME: &str = T::NAME;
201 const TYP: LazyLock<FnType> = T::TYP;
202
203 fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
204 Arc::new(|ctx, _, _, from, top_id| {
205 let id = BindId::new();
206 ctx.rt.ref_var(id, top_id);
207 let t = CachedArgsAsync::<T> {
208 id,
209 top_id,
210 cached: CachedVals::new(from),
211 queued: VecDeque::new(),
212 running: false,
213 t: T::default(),
214 };
215 Ok(Box::new(t))
216 })
217 }
218}
219
220impl<R: Rt, E: UserEvent, T: EvalCachedAsync> Apply<R, E> for CachedArgsAsync<T> {
221 fn update(
222 &mut self,
223 ctx: &mut ExecCtx<R, E>,
224 from: &mut [Node<R, E>],
225 event: &mut Event<E>,
226 ) -> Option<Value> {
227 if self.cached.update(ctx, from, event)
228 && let Some(args) = self.t.prepare_args(&self.cached)
229 {
230 self.queued.push_back(args);
231 }
232 let res = event.variables.remove(&self.id).map(|v| {
233 self.running = false;
234 v
235 });
236 if !self.running
237 && let Some(args) = self.queued.pop_front()
238 {
239 self.running = true;
240 let id = self.id;
241 ctx.rt.spawn_var(async move { (id, T::eval(args).await) });
242 }
243 res
244 }
245
246 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
247 ctx.rt.unref_var(self.id, self.top_id);
248 self.queued.clear();
249 self.cached.clear();
250 }
251
252 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
253 self.delete(ctx);
254 let id = BindId::new();
255 ctx.rt.ref_var(id, self.top_id);
256 }
257}
258
/// A collection type that the generic map ([`MapQ`]) and fold ([`FoldQ`])
/// builtins can operate on.
pub trait MapCollection: Debug + Clone + Default + Send + Sync + 'static {
    /// Number of elements. `MapQ` and `FoldQ` compare this against the number
    /// of per-element nodes they hold to decide whether to grow or shrink.
    fn len(&self) -> usize;

    /// Iterate over the elements as values. The order is used positionally:
    /// element `i` is bound to the `i`th per-element node.
    fn iter_values(&self) -> impl Iterator<Item = Value>;

    /// Try to interpret `v` as this collection. `MapQ` and `FoldQ` treat
    /// `None` the same as the collection argument not having updated.
    fn select(v: Value) -> Option<Self>;

    /// Convert the collection back into a value. `MapQ` uses this to emit an
    /// empty input collection directly.
    fn project(self) -> Value;

    /// Derive the element type from the builtin's resolved function type.
    /// Called once at init; an error aborts construction of the builtin.
    fn etyp(ft: &FnType) -> Result<Type>;
}
276
/// The collection-specific part of a map-like builtin driven by [`MapQ`].
///
/// `MapQ` applies the user supplied function to every element; the
/// implementor decides how the per-element results become the output.
pub trait MapFn<R: Rt, E: UserEvent>: Debug + Default + Send + Sync + 'static {
    /// The collection type the builtin consumes.
    type Collection: MapCollection;

    /// Forwarded as `BuiltIn::NAME` by [`MapQ`].
    const NAME: &str;
    /// Forwarded as `BuiltIn::TYP` by [`MapQ`].
    const TYP: LazyLock<FnType>;

    /// Combine the per-element results into the builtin's output.
    ///
    /// `MapQ` calls this only when something changed in the current update
    /// (a slot produced a value or the collection was resized) and every slot
    /// in `slots` has a current value in `cur`. `a` is the most recent input
    /// collection. The returned option is the result of the update.
    fn finish(&mut self, slots: &[Slot<R, E>], a: &Self::Collection) -> Option<Value>;
}
290
/// Per-element state of a [`MapQ`]: the variable holding the element and the
/// node applying the user function to it.
#[derive(Debug)]
pub struct Slot<R: Rt, E: UserEvent> {
    /// Variable the element value is published under.
    id: BindId,
    /// Call of the user function on the element variable.
    pred: Node<R, E>,
    /// Most recent result of `pred`, `None` until it has produced one (and
    /// again after `sleep`).
    pub cur: Option<Value>,
}

impl<R: Rt, E: UserEvent> Slot<R, E> {
    /// Tear the slot down: delete the call node, drop the cached element
    /// value and unbind the element variable from the environment.
    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
        self.pred.delete(ctx);
        ctx.cached.remove(&self.id);
        ctx.env.unbind_variable(self.id);
    }
}
305
/// Generic driver for builtins that apply a function to every element of a
/// collection, keeping one call node ([`Slot`]) per element.
#[derive(Debug)]
pub struct MapQ<R: Rt, E: UserEvent, T: MapFn<R, E>> {
    /// Scope the per-element bindings and calls are generated in; derived from
    /// the call site scope with a unique `fn<N>` component appended.
    scope: Scope,
    /// Variable the user supplied function (second argument) is published
    /// under.
    predid: BindId,
    /// Expression id passed to every generated node.
    top_id: ExprId,
    /// Type of the user supplied function.
    mftyp: TArc<FnType>,
    /// Element type, from `MapCollection::etyp`.
    etyp: Type,
    /// One slot per element of the current collection.
    slots: Vec<Slot<R, E>>,
    /// Most recent input collection.
    cur: T::Collection,
    /// Collection-specific finisher.
    t: T,
}
317
318impl<R: Rt, E: UserEvent, T: MapFn<R, E>> BuiltIn<R, E> for MapQ<R, E, T> {
319 const NAME: &str = T::NAME;
320 const TYP: LazyLock<FnType> = T::TYP;
321
322 fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
323 Arc::new(|_ctx, typ, scope, from, top_id| match from {
324 [_, _] => Ok(Box::new(Self {
325 scope: scope.append(&format_compact!("fn{}", LambdaId::new().inner())),
326 predid: BindId::new(),
327 top_id,
328 etyp: T::Collection::etyp(typ)?,
329 mftyp: match &typ.args[1].typ {
330 Type::Fn(ft) => ft.clone(),
331 t => bail!("expected a function not {t}"),
332 },
333 slots: vec![],
334 cur: Default::default(),
335 t: T::default(),
336 })),
337 _ => bail!("expected two arguments"),
338 })
339 }
340}
341
342impl<R: Rt, E: UserEvent, T: MapFn<R, E>> Apply<R, E> for MapQ<R, E, T> {
343 fn update(
344 &mut self,
345 ctx: &mut ExecCtx<R, E>,
346 from: &mut [Node<R, E>],
347 event: &mut Event<E>,
348 ) -> Option<Value> {
349 let slen = self.slots.len();
350 if let Some(v) = from[1].update(ctx, event) {
351 ctx.cached.insert(self.predid, v.clone());
352 event.variables.insert(self.predid, v);
353 }
354 let (up, resized) =
355 match from[0].update(ctx, event).and_then(|v| T::Collection::select(v)) {
356 Some(a) if a.len() == slen => (Some(a), false),
357 Some(a) if a.len() < slen => {
358 while self.slots.len() > a.len() {
359 if let Some(mut s) = self.slots.pop() {
360 s.delete(ctx)
361 }
362 }
363 (Some(a), true)
364 }
365 Some(a) => {
366 while self.slots.len() < a.len() {
367 let (id, node) = genn::bind(
368 ctx,
369 &self.scope.lexical,
370 "x",
371 self.etyp.clone(),
372 self.top_id,
373 );
374 let fargs = vec![node];
375 let fnode = genn::reference(
376 ctx,
377 self.predid,
378 Type::Fn(self.mftyp.clone()),
379 self.top_id,
380 );
381 let pred = genn::apply(
382 fnode,
383 self.scope.clone(),
384 fargs,
385 &self.mftyp,
386 self.top_id,
387 );
388 self.slots.push(Slot { id, pred, cur: None });
389 }
390 (Some(a), true)
391 }
392 None => (None, false),
393 };
394 if let Some(a) = up {
395 for (s, v) in self.slots.iter().zip(a.iter_values()) {
396 ctx.cached.insert(s.id, v.clone());
397 event.variables.insert(s.id, v);
398 }
399 self.cur = a.clone();
400 if a.len() == 0 {
401 return Some(T::Collection::project(a));
402 }
403 }
404 let init = event.init;
405 let mut up = resized;
406 for (i, s) in self.slots.iter_mut().enumerate() {
407 if i == slen {
408 event.init = true;
410 if let Entry::Vacant(e) = event.variables.entry(self.predid)
411 && let Some(v) = ctx.cached.get(&self.predid)
412 {
413 e.insert(v.clone());
414 }
415 }
416 if let Some(v) = s.pred.update(ctx, event) {
417 s.cur = Some(v);
418 up = true;
419 }
420 }
421 event.init = init;
422 if up && self.slots.iter().all(|s| s.cur.is_some()) {
423 self.t.finish(&mut &self.slots, &self.cur)
424 } else {
425 None
426 }
427 }
428
429 fn typecheck(
430 &mut self,
431 ctx: &mut ExecCtx<R, E>,
432 _from: &mut [Node<R, E>],
433 ) -> anyhow::Result<()> {
434 let (_, node) =
435 genn::bind(ctx, &self.scope.lexical, "x", self.etyp.clone(), self.top_id);
436 let fargs = vec![node];
437 let ft = self.mftyp.clone();
438 let fnode = genn::reference(ctx, self.predid, Type::Fn(ft.clone()), self.top_id);
439 let mut node = genn::apply(fnode, self.scope.clone(), fargs, &ft, self.top_id);
440 let r = node.typecheck(ctx);
441 node.delete(ctx);
442 r
443 }
444
445 fn refs(&self, refs: &mut Refs) {
446 for s in &self.slots {
447 s.pred.refs(refs)
448 }
449 }
450
451 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
452 ctx.cached.remove(&self.predid);
453 for sl in &mut self.slots {
454 sl.delete(ctx)
455 }
456 }
457
458 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
459 self.cur = Default::default();
460 for sl in &mut self.slots {
461 sl.cur = None;
462 sl.pred.sleep(ctx);
463 }
464 }
465}
466
/// Selects the collection type, name and signature of a fold builtin driven
/// by [`FoldQ`]. The trait has no methods; `FoldQ` only uses it at the type
/// level.
pub trait FoldFn<R: Rt, E: UserEvent>: Debug + Send + Sync + 'static {
    /// The collection type the builtin folds over.
    type Collection: MapCollection;

    /// Forwarded as `BuiltIn::NAME` by [`FoldQ`].
    const NAME: &str;
    /// Forwarded as `BuiltIn::TYP` by [`FoldQ`].
    const TYP: LazyLock<FnType>;
}
473
/// Generic driver for fold builtins: a chain of call nodes, one per element,
/// where node `i` receives the result of node `i - 1` as its accumulator.
#[derive(Debug)]
pub struct FoldQ<R: Rt, E: UserEvent, T: FoldFn<R, E>> {
    /// Expression id passed to every generated node.
    top_id: ExprId,
    /// Variable the folding function (third argument) is published under.
    fid: BindId,
    /// Scope the generated calls are created in.
    scope: Scope,
    /// `binds[i]` is the variable element `i` is published under.
    binds: Vec<BindId>,
    /// `nodes[i]` is the call `f(acc, x)` for element `i`.
    nodes: Vec<Node<R, E>>,
    /// `inits[i]` is what `nodes[i]` produced in the most recent update, or
    /// `None` if it produced nothing.
    inits: Vec<Option<Value>>,
    /// `initids[i]` is the variable the result of `nodes[i]` is published
    /// under; it is the accumulator input of `nodes[i + 1]`.
    initids: Vec<BindId>,
    /// Variable the initial accumulator (second argument) is published under;
    /// it is the accumulator input of `nodes[0]`.
    initid: BindId,
    /// Type of the folding function.
    mftype: TArc<FnType>,
    /// Element type, from `MapCollection::etyp`.
    etyp: Type,
    /// Accumulator type, taken from the type of the second argument.
    ityp: Type,
    /// Most recent value of the initial accumulator.
    init: Option<Value>,
    /// Ties the driver to its `FoldFn` without storing one.
    t: PhantomData<T>,
}
490
491impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> BuiltIn<R, E> for FoldQ<R, E, T> {
492 const NAME: &str = T::NAME;
493 const TYP: LazyLock<FnType> = T::TYP;
494
495 fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
496 Arc::new(|_ctx, typ, scope, from, top_id| match from {
497 [_, _, _] => Ok(Box::new(Self {
498 top_id,
499 scope: scope.clone(),
500 binds: vec![],
501 nodes: vec![],
502 inits: vec![],
503 initids: vec![],
504 initid: BindId::new(),
505 fid: BindId::new(),
506 etyp: T::Collection::etyp(typ)?,
507 ityp: typ.args[1].typ.clone(),
508 mftype: match &typ.args[2].typ {
509 Type::Fn(ft) => ft.clone(),
510 t => bail!("expected a function not {t}"),
511 },
512 init: None,
513 t: PhantomData,
514 })),
515 _ => bail!("expected three arguments"),
516 })
517 }
518}
519
520impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> Apply<R, E> for FoldQ<R, E, T> {
521 fn update(
522 &mut self,
523 ctx: &mut ExecCtx<R, E>,
524 from: &mut [Node<R, E>],
525 event: &mut Event<E>,
526 ) -> Option<Value> {
527 let init = match from[0].update(ctx, event).and_then(|v| T::Collection::select(v))
528 {
529 None => self.nodes.len(),
530 Some(a) if a.len() == self.binds.len() => {
531 for (id, v) in self.binds.iter().zip(a.iter_values()) {
532 ctx.cached.insert(*id, v.clone());
533 event.variables.insert(*id, v.clone());
534 }
535 self.nodes.len()
536 }
537 Some(a) => {
538 let vals = a.iter_values().collect::<LPooled<Vec<Value>>>();
539 while self.binds.len() < a.len() {
540 self.binds.push(BindId::new());
541 self.inits.push(None);
542 self.initids.push(BindId::new());
543 }
544 while a.len() < self.binds.len() {
545 if let Some(id) = self.binds.pop() {
546 ctx.cached.remove(&id);
547 }
548 if let Some(id) = self.initids.pop() {
549 ctx.cached.remove(&id);
550 }
551 self.inits.pop();
552 if let Some(mut n) = self.nodes.pop() {
553 n.delete(ctx);
554 }
555 }
556 let init = self.nodes.len();
557 for i in 0..self.binds.len() {
558 ctx.cached.insert(self.binds[i], vals[i].clone());
559 event.variables.insert(self.binds[i], vals[i].clone());
560 if i >= self.nodes.len() {
561 let n = genn::reference(
562 ctx,
563 if i == 0 { self.initid } else { self.initids[i - 1] },
564 self.ityp.clone(),
565 self.top_id,
566 );
567 let x = genn::reference(
568 ctx,
569 self.binds[i],
570 self.etyp.clone(),
571 self.top_id,
572 );
573 let fnode = genn::reference(
574 ctx,
575 self.fid,
576 Type::Fn(self.mftype.clone()),
577 self.top_id,
578 );
579 let node = genn::apply(
580 fnode,
581 self.scope.clone(),
582 vec![n, x],
583 &self.mftype,
584 self.top_id,
585 );
586 self.nodes.push(node);
587 }
588 }
589 init
590 }
591 };
592 if let Some(v) = from[1].update(ctx, event) {
593 ctx.cached.insert(self.initid, v.clone());
594 event.variables.insert(self.initid, v.clone());
595 self.init = Some(v);
596 }
597 if let Some(v) = from[2].update(ctx, event) {
598 ctx.cached.insert(self.fid, v.clone());
599 event.variables.insert(self.fid, v);
600 }
601 let old_init = event.init;
602 for i in 0..self.nodes.len() {
603 if i == init {
604 event.init = true;
605 if let Some(v) = ctx.cached.get(&self.fid)
606 && let Entry::Vacant(e) = event.variables.entry(self.fid)
607 {
608 e.insert(v.clone());
609 }
610 if i == 0 {
611 if let Some(v) = self.init.as_ref()
612 && let Entry::Vacant(e) = event.variables.entry(self.initid)
613 {
614 e.insert(v.clone());
615 }
616 } else {
617 if let Some(v) = self.inits[i - 1].clone() {
618 event.variables.insert(self.initids[i - 1], v);
619 }
620 }
621 }
622 match self.nodes[i].update(ctx, event) {
623 Some(v) => {
624 ctx.cached.insert(self.initids[i], v.clone());
625 event.variables.insert(self.initids[i], v.clone());
626 self.inits[i] = Some(v);
627 }
628 None => {
629 ctx.cached.remove(&self.initids[i]);
630 event.variables.remove(&self.initids[i]);
631 self.inits[i] = None;
632 }
633 }
634 }
635 event.init = old_init;
636 self.inits.last().and_then(|v| v.clone())
637 }
638
639 fn typecheck(
640 &mut self,
641 ctx: &mut ExecCtx<R, E>,
642 _from: &mut [Node<R, E>],
643 ) -> anyhow::Result<()> {
644 let mut n = genn::reference(ctx, self.initid, self.ityp.clone(), self.top_id);
645 let x = genn::reference(ctx, BindId::new(), self.etyp.clone(), self.top_id);
646 let fnode =
647 genn::reference(ctx, self.fid, Type::Fn(self.mftype.clone()), self.top_id);
648 n = genn::apply(fnode, self.scope.clone(), vec![n, x], &self.mftype, self.top_id);
649 let r = n.typecheck(ctx);
650 n.delete(ctx);
651 r
652 }
653
654 fn refs(&self, refs: &mut Refs) {
655 for n in &self.nodes {
656 n.refs(refs)
657 }
658 }
659
660 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
661 let i =
662 iter::once(&self.initid).chain(self.binds.iter()).chain(self.initids.iter());
663 for id in i {
664 ctx.cached.remove(id);
665 }
666 for n in &mut self.nodes {
667 n.delete(ctx);
668 }
669 }
670
671 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
672 self.init = None;
673 for v in &mut self.inits {
674 *v = None
675 }
676 for n in &mut self.nodes {
677 n.sleep(ctx)
678 }
679 }
680}
681
/// Optional standard library modules that `register` can install.
///
/// The `core` module is not listed because `register` always installs it.
#[bitflags]
#[derive(Clone, Copy)]
#[repr(u64)]
pub enum Module {
    /// The `array` module.
    Array,
    /// The `map` module.
    Map,
    /// Both the `time` and the `net` modules; they are registered together.
    NetAndTime,
    /// The `rand` module.
    Rand,
    /// The `re` module.
    Re,
    /// The `str` module.
    Str,
    /// The `fs` module.
    Fs,
}
694
695pub fn register<R: Rt, E: UserEvent>(
733 ctx: &mut ExecCtx<R, E>,
734 modules: BitFlags<Module>,
735) -> Result<(ArcStr, ModuleResolver)> {
736 let mut tbl = FxHashMap::default();
737 tbl.insert(Path::from("/core"), core::register(ctx)?);
738 let mut root = String::from("pub mod core;\nuse core;\n");
739 for module in modules {
740 match module {
741 Module::Array => {
742 root.push_str("pub mod array;\n");
743 tbl.insert(Path::from("/array"), array::register(ctx)?);
744 }
745 Module::Map => {
746 root.push_str("pub mod map;\n");
747 tbl.insert(Path::from("/map"), map::register(ctx)?);
748 }
749 Module::NetAndTime => {
750 root.push_str("pub mod time;\n");
751 tbl.insert(Path::from("/time"), time::register(ctx)?);
752 root.push_str("pub mod net;\n");
753 tbl.insert(Path::from("/net"), net::register(ctx)?);
754 }
755 Module::Rand => {
756 root.push_str("pub mod rand;\n");
757 tbl.insert(Path::from("/rand"), rand::register(ctx)?);
758 }
759 Module::Re => {
760 root.push_str("pub mod re;\n");
761 tbl.insert(Path::from("/re"), re::register(ctx)?);
762 }
763 Module::Str => {
764 root.push_str("pub mod str;\n");
765 tbl.insert(Path::from("/str"), str::register(ctx)?);
766 }
767 Module::Fs => {
768 root.push_str("pub mod fs;\n");
769 tbl.insert(Path::from("/fs"), fs::register(ctx)?);
770 }
771 }
772 }
773 root.pop();
774 root.pop();
775 Ok((ArcStr::from(root), ModuleResolver::VFS(tbl)))
776}