Skip to main content

graphix_package_core/
lib.rs

1use anyhow::{bail, Result};
2use arcstr::{literal, ArcStr};
3use compact_str::format_compact;
4use graphix_compiler::{
5    err, errf,
6    expr::{Expr, ExprId},
7    node::genn,
8    typ::{FnType, TVal, Type},
9    Apply, BindId, BuiltIn, Event, ExecCtx, LambdaId, Node, Refs, Rt, Scope, UserEvent,
10};
11use graphix_rt::GXRt;
12use immutable_chunkmap::map::Map as CMap;
13use netidx::subscriber::Value;
14use netidx_core::utils::Either;
15use netidx_value::{FromValue, ValArray};
16use poolshark::local::LPooled;
17use std::{
18    any::Any,
19    collections::{hash_map::Entry, VecDeque},
20    fmt::Debug,
21    iter,
22    marker::PhantomData,
23    sync::LazyLock,
24    time::Duration,
25};
26use tokio::time::Instant;
27use triomphe::Arc as TArc;
28
29// ── Shared macros ──────────────────────────────────────────────────
30
/// Declare the `TYP` associated constant of a builtin from a function type
/// signature given as a string literal.
///
/// The signature is parsed by `graphix_compiler::expr::parser::parse_fn_type`
/// the first time the `LazyLock` is dereferenced. A signature that fails to
/// parse panics with a message that names the offending signature.
#[macro_export]
macro_rules! deftype {
    ($s:literal) => {
        const TYP: ::std::sync::LazyLock<graphix_compiler::typ::FnType> =
            ::std::sync::LazyLock::new(|| {
                graphix_compiler::expr::parser::parse_fn_type($s)
                    // `expect` takes a plain string and never interpolates
                    // `{..}` placeholders, so the signature is spliced into
                    // the message at compile time instead.
                    .expect(concat!("failed to parse fn type ", $s))
            });
    };
}
41
/// Destructure the `from` and `updates` collections of a one-argument
/// builtin into a pair of references `(arg, arg_up)`.
///
/// Both expressions are dereferenced and matched as one-element slices. Any
/// other combination of lengths reaches `unreachable!()` and panics.
#[macro_export]
macro_rules! arity1 {
    ($from:expr, $updates:expr) => {
        match (&*$from, &*$updates) {
            ([only], [only_up]) => (only, only_up),
            _ => unreachable!(),
        }
    };
}
51
/// Destructure the `from` and `updates` collections of a two-argument
/// builtin into `((arg0, arg1), (arg0_up, arg1_up))`.
///
/// Both expressions are dereferenced and matched as two-element slices. Any
/// other combination of lengths reaches `unreachable!()` and panics.
#[macro_export]
macro_rules! arity2 {
    ($from:expr, $updates:expr) => {
        match (&*$from, &*$updates) {
            ([first, second], [first_up, second_up]) => {
                ((first, second), (first_up, second_up))
            }
            _ => unreachable!(),
        }
    };
}
61
62// ── Testing infrastructure ─────────────────────────────────────────
63
64pub mod testing;
65
66// ── Shared traits and structs ──────────────────────────────────────
67
68#[derive(Debug)]
69pub struct CachedVals(pub Box<[Option<Value>]>);
70
71impl CachedVals {
72    pub fn new<R: Rt, E: UserEvent>(from: &[Node<R, E>]) -> CachedVals {
73        CachedVals(from.into_iter().map(|_| None).collect())
74    }
75
76    pub fn clear(&mut self) {
77        for v in &mut self.0 {
78            *v = None
79        }
80    }
81
82    pub fn update<R: Rt, E: UserEvent>(
83        &mut self,
84        ctx: &mut ExecCtx<R, E>,
85        from: &mut [Node<R, E>],
86        event: &mut Event<E>,
87    ) -> bool {
88        from.into_iter().enumerate().fold(false, |res, (i, src)| {
89            match src.update(ctx, event) {
90                None => res,
91                v @ Some(_) => {
92                    self.0[i] = v;
93                    true
94                }
95            }
96        })
97    }
98
99    /// Like update, but return the indexes of the nodes that updated
100    /// instead of a consolidated bool
101    pub fn update_diff<R: Rt, E: UserEvent>(
102        &mut self,
103        up: &mut [bool],
104        ctx: &mut ExecCtx<R, E>,
105        from: &mut [Node<R, E>],
106        event: &mut Event<E>,
107    ) {
108        for (i, n) in from.iter_mut().enumerate() {
109            match n.update(ctx, event) {
110                None => (),
111                v => {
112                    self.0[i] = v;
113                    up[i] = true
114                }
115            }
116        }
117    }
118
119    pub fn flat_iter<'a>(&'a self) -> impl Iterator<Item = Option<Value>> + 'a {
120        self.0.iter().flat_map(|v| match v {
121            None => Either::Left(iter::once(None)),
122            Some(v) => Either::Right(v.clone().flatten().map(Some)),
123        })
124    }
125
126    pub fn get<T: FromValue>(&self, i: usize) -> Option<T> {
127        self.0.get(i).and_then(|v| v.as_ref()).and_then(|v| v.clone().cast_to::<T>().ok())
128    }
129}
130
131pub trait EvalCached: Debug + Default + Send + Sync + 'static {
132    const NAME: &str;
133    const TYP: LazyLock<FnType>;
134
135    fn eval(&mut self, from: &CachedVals) -> Option<Value>;
136}
137
138#[derive(Debug)]
139pub struct CachedArgs<T: EvalCached> {
140    cached: CachedVals,
141    t: T,
142}
143
144impl<R: Rt, E: UserEvent, T: EvalCached> BuiltIn<R, E> for CachedArgs<T> {
145    const NAME: &str = T::NAME;
146    const TYP: LazyLock<FnType> = T::TYP;
147
148    fn init<'a, 'b, 'c>(
149        _ctx: &'a mut ExecCtx<R, E>,
150        _typ: &'a graphix_compiler::typ::FnType,
151        _scope: &'b Scope,
152        from: &'c [Node<R, E>],
153        _top_id: ExprId,
154    ) -> Result<Box<dyn Apply<R, E>>> {
155        let t = CachedArgs::<T> { cached: CachedVals::new(from), t: T::default() };
156        Ok(Box::new(t))
157    }
158}
159
160impl<R: Rt, E: UserEvent, T: EvalCached> Apply<R, E> for CachedArgs<T> {
161    fn update(
162        &mut self,
163        ctx: &mut ExecCtx<R, E>,
164        from: &mut [Node<R, E>],
165        event: &mut Event<E>,
166    ) -> Option<Value> {
167        if self.cached.update(ctx, from, event) {
168            self.t.eval(&self.cached)
169        } else {
170            None
171        }
172    }
173
174    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
175        self.cached.clear()
176    }
177}
178
179pub trait EvalCachedAsync: Debug + Default + Send + Sync + 'static {
180    const NAME: &str;
181    const TYP: LazyLock<FnType>;
182    type Args: Debug + Any + Send + Sync;
183
184    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args>;
185    fn eval(args: Self::Args) -> impl Future<Output = Value> + Send;
186}
187
188#[derive(Debug)]
189pub struct CachedArgsAsync<T: EvalCachedAsync> {
190    cached: CachedVals,
191    id: BindId,
192    top_id: ExprId,
193    queued: VecDeque<T::Args>,
194    running: bool,
195    t: T,
196}
197
198impl<R: Rt, E: UserEvent, T: EvalCachedAsync> BuiltIn<R, E> for CachedArgsAsync<T> {
199    const NAME: &str = T::NAME;
200    const TYP: LazyLock<FnType> = T::TYP;
201
202    fn init<'a, 'b, 'c>(
203        ctx: &'a mut ExecCtx<R, E>,
204        _typ: &'a graphix_compiler::typ::FnType,
205        _scope: &'b Scope,
206        from: &'c [Node<R, E>],
207        top_id: ExprId,
208    ) -> Result<Box<dyn Apply<R, E>>> {
209        let id = BindId::new();
210        ctx.rt.ref_var(id, top_id);
211        let t = CachedArgsAsync::<T> {
212            id,
213            top_id,
214            cached: CachedVals::new(from),
215            queued: VecDeque::new(),
216            running: false,
217            t: T::default(),
218        };
219        Ok(Box::new(t))
220    }
221}
222
223impl<R: Rt, E: UserEvent, T: EvalCachedAsync> Apply<R, E> for CachedArgsAsync<T> {
224    fn update(
225        &mut self,
226        ctx: &mut ExecCtx<R, E>,
227        from: &mut [Node<R, E>],
228        event: &mut Event<E>,
229    ) -> Option<Value> {
230        if self.cached.update(ctx, from, event)
231            && let Some(args) = self.t.prepare_args(&self.cached)
232        {
233            self.queued.push_back(args);
234        }
235        let res = event.variables.remove(&self.id).map(|v| {
236            self.running = false;
237            v
238        });
239        if !self.running
240            && let Some(args) = self.queued.pop_front()
241        {
242            self.running = true;
243            let id = self.id;
244            ctx.rt.spawn_var(async move { (id, T::eval(args).await) });
245        }
246        res
247    }
248
249    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
250        ctx.rt.unref_var(self.id, self.top_id);
251        self.queued.clear();
252        self.cached.clear();
253    }
254
255    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
256        self.delete(ctx);
257        let id = BindId::new();
258        ctx.rt.ref_var(id, self.top_id);
259    }
260}
261
262pub trait MapCollection: Debug + Clone + Default + Send + Sync + 'static {
263    /// return the length of the collection
264    fn len(&self) -> usize;
265
266    /// iterate the collection elements as values
267    fn iter_values(&self) -> impl Iterator<Item = Value>;
268
269    /// given a value, return Some if the value is the collection type
270    /// we are mapping.
271    fn select(v: Value) -> Option<Self>;
272
273    /// given a collection wrap it in a value
274    fn project(self) -> Value;
275
276    /// return the element type given the function type
277    fn etyp(ft: &FnType) -> Result<Type>;
278}
279
280impl MapCollection for ValArray {
281    fn iter_values(&self) -> impl Iterator<Item = Value> {
282        (**self).iter().cloned()
283    }
284
285    fn len(&self) -> usize {
286        (**self).len()
287    }
288
289    fn select(v: Value) -> Option<Self> {
290        match v {
291            Value::Array(a) => Some(a.clone()),
292            _ => None,
293        }
294    }
295
296    fn project(self) -> Value {
297        Value::Array(self)
298    }
299
300    fn etyp(ft: &FnType) -> Result<Type> {
301        match &ft.args[0].typ {
302            Type::Array(et) => Ok((**et).clone()),
303            _ => bail!("expected array"),
304        }
305    }
306}
307
308impl MapCollection for CMap<Value, Value, 32> {
309    fn iter_values(&self) -> impl Iterator<Item = Value> {
310        self.into_iter().map(|(k, v)| {
311            Value::Array(ValArray::from_iter_exact([k.clone(), v.clone()].into_iter()))
312        })
313    }
314
315    fn len(&self) -> usize {
316        CMap::len(self)
317    }
318
319    fn select(v: Value) -> Option<Self> {
320        match v {
321            Value::Map(m) => Some(m.clone()),
322            _ => None,
323        }
324    }
325
326    fn project(self) -> Value {
327        Value::Map(self)
328    }
329
330    fn etyp(ft: &FnType) -> Result<Type> {
331        match &ft.args[0].typ {
332            Type::Map { key, value } => {
333                Ok(Type::Tuple(TArc::from_iter([(**key).clone(), (**value).clone()])))
334            }
335            _ => bail!("expected Map, got {:?}", ft.args[0].typ),
336        }
337    }
338}
339
340pub trait MapFn<R: Rt, E: UserEvent>: Debug + Default + Send + Sync + 'static {
341    type Collection: MapCollection;
342
343    const NAME: &str;
344    const TYP: LazyLock<FnType>;
345
346    /// finish will be called when every lambda instance has produced
347    /// a value for the updated array. Out contains the output of the
348    /// predicate lambda for each index i, and a is the array. out and
349    /// a are guaranteed to have the same length. out\[i\].cur is
350    /// guaranteed to be Some.
351    fn finish(&mut self, slots: &[Slot<R, E>], a: &Self::Collection) -> Option<Value>;
352}
353
354#[derive(Debug)]
355pub struct Slot<R: Rt, E: UserEvent> {
356    id: BindId,
357    pred: Node<R, E>,
358    pub cur: Option<Value>,
359}
360
361impl<R: Rt, E: UserEvent> Slot<R, E> {
362    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
363        self.pred.delete(ctx);
364        ctx.cached.remove(&self.id);
365        ctx.env.unbind_variable(self.id);
366    }
367}
368
369#[derive(Debug)]
370pub struct MapQ<R: Rt, E: UserEvent, T: MapFn<R, E>> {
371    scope: Scope,
372    predid: BindId,
373    top_id: ExprId,
374    mftyp: TArc<FnType>,
375    etyp: Type,
376    slots: Vec<Slot<R, E>>,
377    cur: T::Collection,
378    t: T,
379}
380
381impl<R: Rt, E: UserEvent, T: MapFn<R, E>> BuiltIn<R, E> for MapQ<R, E, T> {
382    const NAME: &str = T::NAME;
383    const TYP: LazyLock<FnType> = T::TYP;
384
385    fn init<'a, 'b, 'c>(
386        _ctx: &'a mut ExecCtx<R, E>,
387        typ: &'a graphix_compiler::typ::FnType,
388        scope: &'b Scope,
389        from: &'c [Node<R, E>],
390        top_id: ExprId,
391    ) -> Result<Box<dyn Apply<R, E>>> {
392        match from {
393            [_, _] => Ok(Box::new(Self {
394                scope: scope.append(&format_compact!("fn{}", LambdaId::new().inner())),
395                predid: BindId::new(),
396                top_id,
397                etyp: T::Collection::etyp(typ)?,
398                mftyp: match &typ.args[1].typ {
399                    Type::Fn(ft) => ft.clone(),
400                    t => bail!("expected a function not {t}"),
401                },
402                slots: vec![],
403                cur: Default::default(),
404                t: T::default(),
405            })),
406            _ => bail!("expected two arguments"),
407        }
408    }
409}
410
411impl<R: Rt, E: UserEvent, T: MapFn<R, E>> Apply<R, E> for MapQ<R, E, T> {
412    fn update(
413        &mut self,
414        ctx: &mut ExecCtx<R, E>,
415        from: &mut [Node<R, E>],
416        event: &mut Event<E>,
417    ) -> Option<Value> {
418        let slen = self.slots.len();
419        if let Some(v) = from[1].update(ctx, event) {
420            ctx.cached.insert(self.predid, v.clone());
421            event.variables.insert(self.predid, v);
422        }
423        let (up, resized) =
424            match from[0].update(ctx, event).and_then(|v| T::Collection::select(v)) {
425                Some(a) if a.len() == slen => (Some(a), false),
426                Some(a) if a.len() < slen => {
427                    while self.slots.len() > a.len() {
428                        if let Some(mut s) = self.slots.pop() {
429                            s.delete(ctx)
430                        }
431                    }
432                    (Some(a), true)
433                }
434                Some(a) => {
435                    while self.slots.len() < a.len() {
436                        let (id, node) = genn::bind(
437                            ctx,
438                            &self.scope.lexical,
439                            "x",
440                            self.etyp.clone(),
441                            self.top_id,
442                        );
443                        let fargs = vec![node];
444                        let fnode = genn::reference(
445                            ctx,
446                            self.predid,
447                            Type::Fn(self.mftyp.clone()),
448                            self.top_id,
449                        );
450                        let pred = genn::apply(
451                            fnode,
452                            self.scope.clone(),
453                            fargs,
454                            &self.mftyp,
455                            self.top_id,
456                        );
457                        self.slots.push(Slot { id, pred, cur: None });
458                    }
459                    (Some(a), true)
460                }
461                None => (None, false),
462            };
463        if let Some(a) = up {
464            for (s, v) in self.slots.iter().zip(a.iter_values()) {
465                ctx.cached.insert(s.id, v.clone());
466                event.variables.insert(s.id, v);
467            }
468            self.cur = a.clone();
469            if a.len() == 0 {
470                return Some(T::Collection::project(a));
471            }
472        }
473        let init = event.init;
474        let mut up = resized;
475        for (i, s) in self.slots.iter_mut().enumerate() {
476            if i == slen {
477                // new nodes were added starting here
478                event.init = true;
479                if let Entry::Vacant(e) = event.variables.entry(self.predid)
480                    && let Some(v) = ctx.cached.get(&self.predid)
481                {
482                    e.insert(v.clone());
483                }
484            }
485            if let Some(v) = s.pred.update(ctx, event) {
486                s.cur = Some(v);
487                up = true;
488            }
489        }
490        event.init = init;
491        if up && self.slots.iter().all(|s| s.cur.is_some()) {
492            self.t.finish(&mut &self.slots, &self.cur)
493        } else {
494            None
495        }
496    }
497
498    fn typecheck(
499        &mut self,
500        ctx: &mut ExecCtx<R, E>,
501        _from: &mut [Node<R, E>],
502    ) -> anyhow::Result<()> {
503        let (_, node) =
504            genn::bind(ctx, &self.scope.lexical, "x", self.etyp.clone(), self.top_id);
505        let fargs = vec![node];
506        let ft = self.mftyp.clone();
507        let fnode = genn::reference(ctx, self.predid, Type::Fn(ft.clone()), self.top_id);
508        let mut node = genn::apply(fnode, self.scope.clone(), fargs, &ft, self.top_id);
509        let r = node.typecheck(ctx);
510        node.delete(ctx);
511        r
512    }
513
514    fn refs(&self, refs: &mut Refs) {
515        for s in &self.slots {
516            s.pred.refs(refs)
517        }
518    }
519
520    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
521        ctx.cached.remove(&self.predid);
522        for sl in &mut self.slots {
523            sl.delete(ctx)
524        }
525    }
526
527    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
528        self.cur = Default::default();
529        for sl in &mut self.slots {
530            sl.cur = None;
531            sl.pred.sleep(ctx);
532        }
533    }
534}
535
536pub trait FoldFn<R: Rt, E: UserEvent>: Debug + Send + Sync + 'static {
537    type Collection: MapCollection;
538
539    const NAME: &str;
540    const TYP: LazyLock<FnType>;
541}
542
543#[derive(Debug)]
544pub struct FoldQ<R: Rt, E: UserEvent, T: FoldFn<R, E>> {
545    top_id: ExprId,
546    fid: BindId,
547    scope: Scope,
548    binds: Vec<BindId>,
549    nodes: Vec<Node<R, E>>,
550    inits: Vec<Option<Value>>,
551    initids: Vec<BindId>,
552    initid: BindId,
553    mftype: TArc<FnType>,
554    etyp: Type,
555    ityp: Type,
556    init: Option<Value>,
557    t: PhantomData<T>,
558}
559
560impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> BuiltIn<R, E> for FoldQ<R, E, T> {
561    const NAME: &str = T::NAME;
562    const TYP: LazyLock<FnType> = T::TYP;
563
564    fn init<'a, 'b, 'c>(
565        _ctx: &'a mut ExecCtx<R, E>,
566        typ: &'a graphix_compiler::typ::FnType,
567        scope: &'b Scope,
568        from: &'c [Node<R, E>],
569        top_id: ExprId,
570    ) -> Result<Box<dyn Apply<R, E>>> {
571        match from {
572            [_, _, _] => Ok(Box::new(Self {
573                top_id,
574                scope: scope.clone(),
575                binds: vec![],
576                nodes: vec![],
577                inits: vec![],
578                initids: vec![],
579                initid: BindId::new(),
580                fid: BindId::new(),
581                etyp: T::Collection::etyp(typ)?,
582                ityp: typ.args[1].typ.clone(),
583                mftype: match &typ.args[2].typ {
584                    Type::Fn(ft) => ft.clone(),
585                    t => bail!("expected a function not {t}"),
586                },
587                init: None,
588                t: PhantomData,
589            })),
590            _ => bail!("expected three arguments"),
591        }
592    }
593}
594
595impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> Apply<R, E> for FoldQ<R, E, T> {
596    fn update(
597        &mut self,
598        ctx: &mut ExecCtx<R, E>,
599        from: &mut [Node<R, E>],
600        event: &mut Event<E>,
601    ) -> Option<Value> {
602        let init = match from[0].update(ctx, event).and_then(|v| T::Collection::select(v))
603        {
604            None => self.nodes.len(),
605            Some(a) if a.len() == self.binds.len() => {
606                for (id, v) in self.binds.iter().zip(a.iter_values()) {
607                    ctx.cached.insert(*id, v.clone());
608                    event.variables.insert(*id, v.clone());
609                }
610                self.nodes.len()
611            }
612            Some(a) => {
613                let vals = a.iter_values().collect::<LPooled<Vec<Value>>>();
614                while self.binds.len() < a.len() {
615                    self.binds.push(BindId::new());
616                    self.inits.push(None);
617                    self.initids.push(BindId::new());
618                }
619                while a.len() < self.binds.len() {
620                    if let Some(id) = self.binds.pop() {
621                        ctx.cached.remove(&id);
622                    }
623                    if let Some(id) = self.initids.pop() {
624                        ctx.cached.remove(&id);
625                    }
626                    self.inits.pop();
627                    if let Some(mut n) = self.nodes.pop() {
628                        n.delete(ctx);
629                    }
630                }
631                let init = self.nodes.len();
632                for i in 0..self.binds.len() {
633                    ctx.cached.insert(self.binds[i], vals[i].clone());
634                    event.variables.insert(self.binds[i], vals[i].clone());
635                    if i >= self.nodes.len() {
636                        let n = genn::reference(
637                            ctx,
638                            if i == 0 { self.initid } else { self.initids[i - 1] },
639                            self.ityp.clone(),
640                            self.top_id,
641                        );
642                        let x = genn::reference(
643                            ctx,
644                            self.binds[i],
645                            self.etyp.clone(),
646                            self.top_id,
647                        );
648                        let fnode = genn::reference(
649                            ctx,
650                            self.fid,
651                            Type::Fn(self.mftype.clone()),
652                            self.top_id,
653                        );
654                        let node = genn::apply(
655                            fnode,
656                            self.scope.clone(),
657                            vec![n, x],
658                            &self.mftype,
659                            self.top_id,
660                        );
661                        self.nodes.push(node);
662                    }
663                }
664                init
665            }
666        };
667        if let Some(v) = from[1].update(ctx, event) {
668            ctx.cached.insert(self.initid, v.clone());
669            event.variables.insert(self.initid, v.clone());
670            self.init = Some(v);
671        }
672        if let Some(v) = from[2].update(ctx, event) {
673            ctx.cached.insert(self.fid, v.clone());
674            event.variables.insert(self.fid, v);
675        }
676        let old_init = event.init;
677        for i in 0..self.nodes.len() {
678            if i == init {
679                event.init = true;
680                if let Some(v) = ctx.cached.get(&self.fid)
681                    && let Entry::Vacant(e) = event.variables.entry(self.fid)
682                {
683                    e.insert(v.clone());
684                }
685                if i == 0 {
686                    if let Some(v) = self.init.as_ref()
687                        && let Entry::Vacant(e) = event.variables.entry(self.initid)
688                    {
689                        e.insert(v.clone());
690                    }
691                } else {
692                    if let Some(v) = self.inits[i - 1].clone() {
693                        event.variables.insert(self.initids[i - 1], v);
694                    }
695                }
696            }
697            match self.nodes[i].update(ctx, event) {
698                Some(v) => {
699                    ctx.cached.insert(self.initids[i], v.clone());
700                    event.variables.insert(self.initids[i], v.clone());
701                    self.inits[i] = Some(v);
702                }
703                None => {
704                    ctx.cached.remove(&self.initids[i]);
705                    event.variables.remove(&self.initids[i]);
706                    self.inits[i] = None;
707                }
708            }
709        }
710        event.init = old_init;
711        self.inits.last().and_then(|v| v.clone())
712    }
713
714    fn typecheck(
715        &mut self,
716        ctx: &mut ExecCtx<R, E>,
717        _from: &mut [Node<R, E>],
718    ) -> anyhow::Result<()> {
719        let mut n = genn::reference(ctx, self.initid, self.ityp.clone(), self.top_id);
720        let x = genn::reference(ctx, BindId::new(), self.etyp.clone(), self.top_id);
721        let fnode =
722            genn::reference(ctx, self.fid, Type::Fn(self.mftype.clone()), self.top_id);
723        n = genn::apply(fnode, self.scope.clone(), vec![n, x], &self.mftype, self.top_id);
724        let r = n.typecheck(ctx);
725        n.delete(ctx);
726        r
727    }
728
729    fn refs(&self, refs: &mut Refs) {
730        for n in &self.nodes {
731            n.refs(refs)
732        }
733    }
734
735    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
736        let i =
737            iter::once(&self.initid).chain(self.binds.iter()).chain(self.initids.iter());
738        for id in i {
739            ctx.cached.remove(id);
740        }
741        for n in &mut self.nodes {
742            n.delete(ctx);
743        }
744    }
745
746    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
747        self.init = None;
748        for v in &mut self.inits {
749            *v = None
750        }
751        for n in &mut self.nodes {
752            n.sleep(ctx)
753        }
754    }
755}
756
757// ── Core builtins ──────────────────────────────────────────────────
758
759#[derive(Debug)]
760struct IsErr;
761
762impl<R: Rt, E: UserEvent> BuiltIn<R, E> for IsErr {
763    const NAME: &str = "core_is_err";
764    deftype!("fn(Any) -> bool");
765
766    fn init<'a, 'b, 'c>(
767        _ctx: &'a mut ExecCtx<R, E>,
768        _typ: &'a graphix_compiler::typ::FnType,
769        _scope: &'b Scope,
770        _from: &'c [Node<R, E>],
771        _top_id: ExprId,
772    ) -> Result<Box<dyn Apply<R, E>>> {
773        Ok(Box::new(IsErr))
774    }
775}
776
777impl<R: Rt, E: UserEvent> Apply<R, E> for IsErr {
778    fn update(
779        &mut self,
780        ctx: &mut ExecCtx<R, E>,
781        from: &mut [Node<R, E>],
782        event: &mut Event<E>,
783    ) -> Option<Value> {
784        from[0].update(ctx, event).map(|v| match v {
785            Value::Error(_) => Value::Bool(true),
786            _ => Value::Bool(false),
787        })
788    }
789
790    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
791}
792
793#[derive(Debug)]
794struct FilterErr;
795
796impl<R: Rt, E: UserEvent> BuiltIn<R, E> for FilterErr {
797    const NAME: &str = "core_filter_err";
798    deftype!("fn(Result<'a, 'b>) -> Error<'b>");
799
800    fn init<'a, 'b, 'c>(
801        _ctx: &'a mut ExecCtx<R, E>,
802        _typ: &'a graphix_compiler::typ::FnType,
803        _scope: &'b Scope,
804        _from: &'c [Node<R, E>],
805        _top_id: ExprId,
806    ) -> Result<Box<dyn Apply<R, E>>> {
807        Ok(Box::new(FilterErr))
808    }
809}
810
811impl<R: Rt, E: UserEvent> Apply<R, E> for FilterErr {
812    fn update(
813        &mut self,
814        ctx: &mut ExecCtx<R, E>,
815        from: &mut [Node<R, E>],
816        event: &mut Event<E>,
817    ) -> Option<Value> {
818        from[0].update(ctx, event).and_then(|v| match v {
819            v @ Value::Error(_) => Some(v),
820            _ => None,
821        })
822    }
823
824    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
825}
826
827#[derive(Debug)]
828struct ToError;
829
830impl<R: Rt, E: UserEvent> BuiltIn<R, E> for ToError {
831    const NAME: &str = "core_error";
832    deftype!("fn('a) -> Error<'a>");
833
834    fn init<'a, 'b, 'c>(
835        _ctx: &'a mut ExecCtx<R, E>,
836        _typ: &'a graphix_compiler::typ::FnType,
837        _scope: &'b Scope,
838        _from: &'c [Node<R, E>],
839        _top_id: ExprId,
840    ) -> Result<Box<dyn Apply<R, E>>> {
841        Ok(Box::new(ToError))
842    }
843}
844
845impl<R: Rt, E: UserEvent> Apply<R, E> for ToError {
846    fn update(
847        &mut self,
848        ctx: &mut ExecCtx<R, E>,
849        from: &mut [Node<R, E>],
850        event: &mut Event<E>,
851    ) -> Option<Value> {
852        from[0].update(ctx, event).map(|e| Value::Error(triomphe::Arc::new(e)))
853    }
854
855    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
856}
857
858#[derive(Debug)]
859struct Once {
860    val: bool,
861}
862
863impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Once {
864    const NAME: &str = "core_once";
865    deftype!("fn('a) -> 'a");
866
867    fn init<'a, 'b, 'c>(
868        _ctx: &'a mut ExecCtx<R, E>,
869        _typ: &'a graphix_compiler::typ::FnType,
870        _scope: &'b Scope,
871        _from: &'c [Node<R, E>],
872        _top_id: ExprId,
873    ) -> Result<Box<dyn Apply<R, E>>> {
874        Ok(Box::new(Once { val: false }))
875    }
876}
877
878impl<R: Rt, E: UserEvent> Apply<R, E> for Once {
879    fn update(
880        &mut self,
881        ctx: &mut ExecCtx<R, E>,
882        from: &mut [Node<R, E>],
883        event: &mut Event<E>,
884    ) -> Option<Value> {
885        match from {
886            [s] => s.update(ctx, event).and_then(|v| {
887                if self.val {
888                    None
889                } else {
890                    self.val = true;
891                    Some(v)
892                }
893            }),
894            _ => None,
895        }
896    }
897
898    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
899        self.val = false
900    }
901}
902
903#[derive(Debug)]
904struct Take {
905    n: Option<usize>,
906}
907
908impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Take {
909    const NAME: &str = "core_take";
910    deftype!("fn(#n:Any, 'a) -> 'a");
911
912    fn init<'a, 'b, 'c>(
913        _ctx: &'a mut ExecCtx<R, E>,
914        _typ: &'a graphix_compiler::typ::FnType,
915        _scope: &'b Scope,
916        _from: &'c [Node<R, E>],
917        _top_id: ExprId,
918    ) -> Result<Box<dyn Apply<R, E>>> {
919        Ok(Box::new(Take { n: None }))
920    }
921}
922
923impl<R: Rt, E: UserEvent> Apply<R, E> for Take {
924    fn update(
925        &mut self,
926        ctx: &mut ExecCtx<R, E>,
927        from: &mut [Node<R, E>],
928        event: &mut Event<E>,
929    ) -> Option<Value> {
930        if let Some(n) =
931            from[0].update(ctx, event).and_then(|v| v.cast_to::<usize>().ok())
932        {
933            self.n = Some(n)
934        }
935        match from[1].update(ctx, event) {
936            None => None,
937            Some(v) => match &mut self.n {
938                None => None,
939                Some(n) if *n > 0 => {
940                    *n -= 1;
941                    return Some(v);
942                }
943                Some(_) => None,
944            },
945        }
946    }
947
948    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
949        self.n = None
950    }
951}
952
953#[derive(Debug)]
954struct Skip {
955    n: Option<usize>,
956}
957
958impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Skip {
959    const NAME: &str = "core_skip";
960    deftype!("fn(#n:Any, 'a) -> 'a");
961
962    fn init<'a, 'b, 'c>(
963        _ctx: &'a mut ExecCtx<R, E>,
964        _typ: &'a graphix_compiler::typ::FnType,
965        _scope: &'b Scope,
966        _from: &'c [Node<R, E>],
967        _top_id: ExprId,
968    ) -> Result<Box<dyn Apply<R, E>>> {
969        Ok(Box::new(Skip { n: None }))
970    }
971}
972
973impl<R: Rt, E: UserEvent> Apply<R, E> for Skip {
974    fn update(
975        &mut self,
976        ctx: &mut ExecCtx<R, E>,
977        from: &mut [Node<R, E>],
978        event: &mut Event<E>,
979    ) -> Option<Value> {
980        if let Some(n) =
981            from[0].update(ctx, event).and_then(|v| v.cast_to::<usize>().ok())
982        {
983            self.n = Some(n)
984        }
985        match from[1].update(ctx, event) {
986            None => None,
987            Some(v) => match &mut self.n {
988                None => Some(v),
989                Some(n) if *n > 0 => {
990                    *n -= 1;
991                    None
992                }
993                Some(_) => Some(v),
994            },
995        }
996    }
997
998    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
999        self.n = None
1000    }
1001}
1002
1003#[derive(Debug, Default)]
1004struct AllEv;
1005
1006impl EvalCached for AllEv {
1007    const NAME: &str = "core_all";
1008    deftype!("fn(@args: Any) -> Any");
1009
1010    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1011        match &*from.0 {
1012            [] => None,
1013            [hd, tl @ ..] => match hd {
1014                None => None,
1015                v @ Some(_) => {
1016                    if tl.into_iter().all(|v1| v1 == v) {
1017                        v.clone()
1018                    } else {
1019                        None
1020                    }
1021                }
1022            },
1023        }
1024    }
1025}
1026
1027type All = CachedArgs<AllEv>;
1028
1029fn add_vals(lhs: Option<Value>, rhs: Option<Value>) -> Option<Value> {
1030    match (lhs, rhs) {
1031        (None, None) | (Some(_), None) => None,
1032        (None, r @ Some(_)) => r,
1033        (Some(l), Some(r)) => Some(l + r),
1034    }
1035}
1036
1037#[derive(Debug, Default)]
1038struct SumEv;
1039
1040impl EvalCached for SumEv {
1041    const NAME: &str = "core_sum";
1042    deftype!("fn(@args: [Number, Array<[Number, Array<Number>]>]) -> Number");
1043
1044    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1045        from.flat_iter().fold(None, |res, v| match res {
1046            res @ Some(Value::Error(_)) => res,
1047            res => add_vals(res, v.clone()),
1048        })
1049    }
1050}
1051
1052type Sum = CachedArgs<SumEv>;
1053
1054#[derive(Debug, Default)]
1055struct ProductEv;
1056
1057fn prod_vals(lhs: Option<Value>, rhs: Option<Value>) -> Option<Value> {
1058    match (lhs, rhs) {
1059        (None, None) | (Some(_), None) => None,
1060        (None, r @ Some(_)) => r,
1061        (Some(l), Some(r)) => Some(l * r),
1062    }
1063}
1064
1065impl EvalCached for ProductEv {
1066    const NAME: &str = "core_product";
1067    deftype!("fn(@args: [Number, Array<[Number, Array<Number>]>]) -> Number");
1068
1069    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1070        from.flat_iter().fold(None, |res, v| match res {
1071            res @ Some(Value::Error(_)) => res,
1072            res => prod_vals(res, v.clone()),
1073        })
1074    }
1075}
1076
1077type Product = CachedArgs<ProductEv>;
1078
1079#[derive(Debug, Default)]
1080struct DivideEv;
1081
1082fn div_vals(lhs: Option<Value>, rhs: Option<Value>) -> Option<Value> {
1083    match (lhs, rhs) {
1084        (None, None) | (Some(_), None) => None,
1085        (None, r @ Some(_)) => r,
1086        (Some(l), Some(r)) => Some(l / r),
1087    }
1088}
1089
1090impl EvalCached for DivideEv {
1091    const NAME: &str = "core_divide";
1092    deftype!("fn(@args: [Number, Array<[Number, Array<Number>]>]) -> Number");
1093
1094    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1095        from.flat_iter().fold(None, |res, v| match res {
1096            res @ Some(Value::Error(_)) => res,
1097            res => div_vals(res, v.clone()),
1098        })
1099    }
1100}
1101
1102type Divide = CachedArgs<DivideEv>;
1103
1104#[derive(Debug, Default)]
1105struct MinEv;
1106
1107impl EvalCached for MinEv {
1108    const NAME: &str = "core_min";
1109    deftype!("fn('a, @args:'a) -> 'a");
1110
1111    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1112        let mut res = None;
1113        for v in from.flat_iter() {
1114            match (res, v) {
1115                (None, None) | (Some(_), None) => return None,
1116                (None, Some(v)) => {
1117                    res = Some(v);
1118                }
1119                (Some(v0), Some(v)) => {
1120                    res = if v < v0 { Some(v) } else { Some(v0) };
1121                }
1122            }
1123        }
1124        res
1125    }
1126}
1127
1128type Min = CachedArgs<MinEv>;
1129
1130#[derive(Debug, Default)]
1131struct MaxEv;
1132
1133impl EvalCached for MaxEv {
1134    const NAME: &str = "core_max";
1135    deftype!("fn('a, @args: 'a) -> 'a");
1136
1137    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1138        let mut res = None;
1139        for v in from.flat_iter() {
1140            match (res, v) {
1141                (None, None) | (Some(_), None) => return None,
1142                (None, Some(v)) => {
1143                    res = Some(v);
1144                }
1145                (Some(v0), Some(v)) => {
1146                    res = if v > v0 { Some(v) } else { Some(v0) };
1147                }
1148            }
1149        }
1150        res
1151    }
1152}
1153
1154type Max = CachedArgs<MaxEv>;
1155
1156#[derive(Debug, Default)]
1157struct AndEv;
1158
1159impl EvalCached for AndEv {
1160    const NAME: &str = "core_and";
1161    deftype!("fn(@args: bool) -> bool");
1162
1163    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1164        let mut res = Some(Value::Bool(true));
1165        for v in from.flat_iter() {
1166            match v {
1167                None => return None,
1168                Some(Value::Bool(true)) => (),
1169                Some(_) => {
1170                    res = Some(Value::Bool(false));
1171                }
1172            }
1173        }
1174        res
1175    }
1176}
1177
1178type And = CachedArgs<AndEv>;
1179
1180#[derive(Debug, Default)]
1181struct OrEv;
1182
1183impl EvalCached for OrEv {
1184    const NAME: &str = "core_or";
1185    deftype!("fn(@args: bool) -> bool");
1186
1187    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1188        let mut res = Some(Value::Bool(false));
1189        for v in from.flat_iter() {
1190            match v {
1191                None => return None,
1192                Some(Value::Bool(true)) => {
1193                    res = Some(Value::Bool(true));
1194                }
1195                Some(_) => (),
1196            }
1197        }
1198        res
1199    }
1200}
1201
1202type Or = CachedArgs<OrEv>;
1203
1204#[derive(Debug)]
1205struct Filter<R: Rt, E: UserEvent> {
1206    ready: bool,
1207    queue: VecDeque<Value>,
1208    pred: Node<R, E>,
1209    top_id: ExprId,
1210    fid: BindId,
1211    x: BindId,
1212    out: BindId,
1213}
1214
1215impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Filter<R, E> {
1216    const NAME: &str = "core_filter";
1217    deftype!("fn('a, fn('a) -> bool throws 'e) -> 'a throws 'e");
1218
1219    fn init<'a, 'b, 'c>(
1220        ctx: &'a mut ExecCtx<R, E>,
1221        typ: &'a graphix_compiler::typ::FnType,
1222        scope: &'b Scope,
1223        from: &'c [Node<R, E>],
1224        top_id: ExprId,
1225    ) -> Result<Box<dyn Apply<R, E>>> {
1226        match from {
1227            [_, _] => {
1228                let (x, xn) =
1229                    genn::bind(ctx, &scope.lexical, "x", typ.args[0].typ.clone(), top_id);
1230                let fid = BindId::new();
1231                let ptyp = match &typ.args[1].typ {
1232                    Type::Fn(ft) => ft.clone(),
1233                    t => bail!("expected a function not {t}"),
1234                };
1235                let fnode = genn::reference(ctx, fid, Type::Fn(ptyp.clone()), top_id);
1236                let pred = genn::apply(fnode, scope.clone(), vec![xn], &ptyp, top_id);
1237                let queue = VecDeque::new();
1238                let out = BindId::new();
1239                ctx.rt.ref_var(out, top_id);
1240                Ok(Box::new(Self { ready: true, queue, pred, fid, x, out, top_id }))
1241            }
1242            _ => bail!("expected two arguments"),
1243        }
1244    }
1245}
1246
1247impl<R: Rt, E: UserEvent> Apply<R, E> for Filter<R, E> {
1248    fn update(
1249        &mut self,
1250        ctx: &mut ExecCtx<R, E>,
1251        from: &mut [Node<R, E>],
1252        event: &mut Event<E>,
1253    ) -> Option<Value> {
1254        macro_rules! set {
1255            ($v:expr) => {{
1256                self.ready = false;
1257                ctx.cached.insert(self.x, $v.clone());
1258                event.variables.insert(self.x, $v);
1259            }};
1260        }
1261        macro_rules! maybe_cont {
1262            () => {{
1263                if let Some(v) = self.queue.front().cloned() {
1264                    set!(v);
1265                    continue;
1266                }
1267                break;
1268            }};
1269        }
1270        if let Some(v) = from[0].update(ctx, event) {
1271            self.queue.push_back(v);
1272        }
1273        if let Some(v) = from[1].update(ctx, event) {
1274            ctx.cached.insert(self.fid, v.clone());
1275            event.variables.insert(self.fid, v);
1276        }
1277        if self.ready && self.queue.len() > 0 {
1278            let v = self.queue.front().unwrap().clone();
1279            set!(v);
1280        }
1281        loop {
1282            match self.pred.update(ctx, event) {
1283                None => break,
1284                Some(v) => {
1285                    self.ready = true;
1286                    match v {
1287                        Value::Bool(true) => {
1288                            ctx.rt.set_var(self.out, self.queue.pop_front().unwrap());
1289                            maybe_cont!();
1290                        }
1291                        _ => {
1292                            let _ = self.queue.pop_front();
1293                            maybe_cont!();
1294                        }
1295                    }
1296                }
1297            }
1298        }
1299        event.variables.get(&self.out).map(|v| v.clone())
1300    }
1301
1302    fn typecheck(
1303        &mut self,
1304        ctx: &mut ExecCtx<R, E>,
1305        _from: &mut [Node<R, E>],
1306    ) -> anyhow::Result<()> {
1307        self.pred.typecheck(ctx)
1308    }
1309
1310    fn refs(&self, refs: &mut Refs) {
1311        self.pred.refs(refs)
1312    }
1313
1314    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
1315        ctx.cached.remove(&self.fid);
1316        ctx.cached.remove(&self.out);
1317        ctx.cached.remove(&self.x);
1318        ctx.env.unbind_variable(self.x);
1319        self.pred.delete(ctx);
1320        ctx.rt.unref_var(self.out, self.top_id)
1321    }
1322
1323    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
1324        ctx.rt.unref_var(self.out, self.top_id);
1325        self.out = BindId::new();
1326        ctx.rt.ref_var(self.out, self.top_id);
1327        self.queue.clear();
1328        self.pred.sleep(ctx);
1329    }
1330}
1331
1332#[derive(Debug)]
1333struct Queue {
1334    triggered: usize,
1335    queue: VecDeque<Value>,
1336    id: BindId,
1337    top_id: ExprId,
1338}
1339
1340impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Queue {
1341    const NAME: &str = "core_queue";
1342    deftype!("fn(#clock:Any, 'a) -> 'a");
1343
1344    fn init<'a, 'b, 'c>(
1345        ctx: &'a mut ExecCtx<R, E>,
1346        _typ: &'a graphix_compiler::typ::FnType,
1347        _scope: &'b Scope,
1348        from: &'c [Node<R, E>],
1349        top_id: ExprId,
1350    ) -> Result<Box<dyn Apply<R, E>>> {
1351        match from {
1352            [_, _] => {
1353                let id = BindId::new();
1354                ctx.rt.ref_var(id, top_id);
1355                Ok(Box::new(Self { triggered: 0, queue: VecDeque::new(), id, top_id }))
1356            }
1357            _ => bail!("expected two arguments"),
1358        }
1359    }
1360}
1361
1362impl<R: Rt, E: UserEvent> Apply<R, E> for Queue {
1363    fn update(
1364        &mut self,
1365        ctx: &mut ExecCtx<R, E>,
1366        from: &mut [Node<R, E>],
1367        event: &mut Event<E>,
1368    ) -> Option<Value> {
1369        if from[0].update(ctx, event).is_some() {
1370            self.triggered += 1;
1371        }
1372        if let Some(v) = from[1].update(ctx, event) {
1373            self.queue.push_back(v);
1374        }
1375        while self.triggered > 0 && self.queue.len() > 0 {
1376            self.triggered -= 1;
1377            ctx.rt.set_var(self.id, self.queue.pop_front().unwrap());
1378        }
1379        event.variables.get(&self.id).cloned()
1380    }
1381
1382    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
1383        ctx.rt.unref_var(self.id, self.top_id);
1384    }
1385
1386    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
1387        ctx.rt.unref_var(self.id, self.top_id);
1388        self.id = BindId::new();
1389        ctx.rt.ref_var(self.id, self.top_id);
1390        self.triggered = 0;
1391        self.queue.clear();
1392    }
1393}
1394
1395#[derive(Debug)]
1396struct Hold {
1397    triggered: usize,
1398    current: Option<Value>,
1399}
1400
1401impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Hold {
1402    const NAME: &str = "core_hold";
1403    deftype!("fn(#clock:Any, 'a) -> 'a");
1404
1405    fn init<'a, 'b, 'c>(
1406        _ctx: &'a mut ExecCtx<R, E>,
1407        _typ: &'a graphix_compiler::typ::FnType,
1408        _scope: &'b Scope,
1409        from: &'c [Node<R, E>],
1410        _top_id: ExprId,
1411    ) -> Result<Box<dyn Apply<R, E>>> {
1412        match from {
1413            [_, _] => Ok(Box::new(Self { triggered: 0, current: None })),
1414            _ => bail!("expected two arguments"),
1415        }
1416    }
1417}
1418
1419impl<R: Rt, E: UserEvent> Apply<R, E> for Hold {
1420    fn update(
1421        &mut self,
1422        ctx: &mut ExecCtx<R, E>,
1423        from: &mut [Node<R, E>],
1424        event: &mut Event<E>,
1425    ) -> Option<Value> {
1426        if from[0].update(ctx, event).is_some() {
1427            self.triggered += 1;
1428        }
1429        if let Some(v) = from[1].update(ctx, event) {
1430            self.current = Some(v);
1431        }
1432        if self.triggered > 0
1433            && let Some(v) = self.current.take()
1434        {
1435            self.triggered -= 1;
1436            Some(v)
1437        } else {
1438            None
1439        }
1440    }
1441
1442    fn delete(&mut self, _: &mut ExecCtx<R, E>) {}
1443
1444    fn sleep(&mut self, _: &mut ExecCtx<R, E>) {
1445        self.triggered = 0;
1446        self.current = None;
1447    }
1448}
1449
1450#[derive(Debug)]
1451struct Seq {
1452    id: BindId,
1453    top_id: ExprId,
1454    args: CachedVals,
1455}
1456
1457impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Seq {
1458    const NAME: &str = "core_seq";
1459    deftype!("fn(i64, i64) -> Result<i64, `SeqError(string)>");
1460
1461    fn init<'a, 'b, 'c>(
1462        ctx: &'a mut ExecCtx<R, E>,
1463        _typ: &'a graphix_compiler::typ::FnType,
1464        _scope: &'b Scope,
1465        from: &'c [Node<R, E>],
1466        top_id: ExprId,
1467    ) -> Result<Box<dyn Apply<R, E>>> {
1468        let id = BindId::new();
1469        ctx.rt.ref_var(id, top_id);
1470        let args = CachedVals::new(from);
1471        Ok(Box::new(Self { id, top_id, args }))
1472    }
1473}
1474
1475impl<R: Rt, E: UserEvent> Apply<R, E> for Seq {
1476    fn update(
1477        &mut self,
1478        ctx: &mut ExecCtx<R, E>,
1479        from: &mut [Node<R, E>],
1480        event: &mut Event<E>,
1481    ) -> Option<Value> {
1482        if self.args.update(ctx, from, event) {
1483            match &self.args.0[..] {
1484                [Some(Value::I64(i)), Some(Value::I64(j))] if i <= j => {
1485                    for v in *i..*j {
1486                        ctx.rt.set_var(self.id, Value::I64(v));
1487                    }
1488                }
1489                _ => {
1490                    let e = literal!("SeqError");
1491                    return Some(err!(e, "invalid args i must be <= j"));
1492                }
1493            }
1494        }
1495        event.variables.get(&self.id).cloned()
1496    }
1497
1498    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
1499        ctx.rt.unref_var(self.id, self.top_id);
1500    }
1501
1502    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
1503        ctx.rt.unref_var(self.id, self.top_id);
1504        self.id = BindId::new();
1505        ctx.rt.ref_var(self.id, self.top_id);
1506    }
1507}
1508
1509#[derive(Debug)]
1510struct Throttle {
1511    wait: Duration,
1512    last: Option<Instant>,
1513    tid: Option<BindId>,
1514    top_id: ExprId,
1515    args: CachedVals,
1516}
1517
1518impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Throttle {
1519    const NAME: &str = "core_throttle";
1520    deftype!("fn(?#rate:duration, 'a) -> 'a");
1521
1522    fn init<'a, 'b, 'c>(
1523        _ctx: &'a mut ExecCtx<R, E>,
1524        _typ: &'a graphix_compiler::typ::FnType,
1525        _scope: &'b Scope,
1526        from: &'c [Node<R, E>],
1527        top_id: ExprId,
1528    ) -> Result<Box<dyn Apply<R, E>>> {
1529        let args = CachedVals::new(from);
1530        Ok(Box::new(Self { wait: Duration::ZERO, last: None, tid: None, top_id, args }))
1531    }
1532}
1533
1534impl<R: Rt, E: UserEvent> Apply<R, E> for Throttle {
1535    fn update(
1536        &mut self,
1537        ctx: &mut ExecCtx<R, E>,
1538        from: &mut [Node<R, E>],
1539        event: &mut Event<E>,
1540    ) -> Option<Value> {
1541        macro_rules! maybe_schedule {
1542            ($last:expr) => {{
1543                let now = Instant::now();
1544                if now - *$last >= self.wait {
1545                    *$last = now;
1546                    return self.args.0[1].clone();
1547                } else {
1548                    let id = BindId::new();
1549                    ctx.rt.ref_var(id, self.top_id);
1550                    ctx.rt.set_timer(id, self.wait - (now - *$last));
1551                    self.tid = Some(id);
1552                    return None;
1553                }
1554            }};
1555        }
1556        let mut up = [false; 2];
1557        self.args.update_diff(&mut up, ctx, from, event);
1558        if up[0]
1559            && let Some(Value::Duration(d)) = &self.args.0[0]
1560        {
1561            self.wait = **d;
1562            if let Some(id) = self.tid.take()
1563                && let Some(last) = &mut self.last
1564            {
1565                ctx.rt.unref_var(id, self.top_id);
1566                maybe_schedule!(last)
1567            }
1568        }
1569        if up[1] && self.tid.is_none() {
1570            match &mut self.last {
1571                Some(last) => maybe_schedule!(last),
1572                None => {
1573                    self.last = Some(Instant::now());
1574                    return self.args.0[1].clone();
1575                }
1576            }
1577        }
1578        if let Some(id) = self.tid
1579            && let Some(_) = event.variables.get(&id)
1580        {
1581            ctx.rt.unref_var(id, self.top_id);
1582            self.tid = None;
1583            self.last = Some(Instant::now());
1584            return self.args.0[1].clone();
1585        }
1586        None
1587    }
1588
1589    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
1590        if let Some(id) = self.tid.take() {
1591            ctx.rt.unref_var(id, self.top_id);
1592        }
1593    }
1594
1595    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
1596        self.delete(ctx);
1597        self.last = None;
1598        self.wait = Duration::ZERO;
1599        self.args.clear();
1600    }
1601}
1602
1603#[derive(Debug)]
1604struct Count {
1605    count: i64,
1606}
1607
1608impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Count {
1609    const NAME: &str = "core_count";
1610    deftype!("fn(Any) -> i64");
1611
1612    fn init<'a, 'b, 'c>(
1613        _ctx: &'a mut ExecCtx<R, E>,
1614        _typ: &'a graphix_compiler::typ::FnType,
1615        _scope: &'b Scope,
1616        _from: &'c [Node<R, E>],
1617        _top_id: ExprId,
1618    ) -> Result<Box<dyn Apply<R, E>>> {
1619        Ok(Box::new(Count { count: 0 }))
1620    }
1621}
1622
1623impl<R: Rt, E: UserEvent> Apply<R, E> for Count {
1624    fn update(
1625        &mut self,
1626        ctx: &mut ExecCtx<R, E>,
1627        from: &mut [Node<R, E>],
1628        event: &mut Event<E>,
1629    ) -> Option<Value> {
1630        if from.into_iter().fold(false, |u, n| u || n.update(ctx, event).is_some()) {
1631            self.count += 1;
1632            Some(Value::I64(self.count))
1633        } else {
1634            None
1635        }
1636    }
1637
1638    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
1639        self.count = 0
1640    }
1641}
1642
1643#[derive(Debug, Default)]
1644struct MeanEv;
1645
1646impl EvalCached for MeanEv {
1647    const NAME: &str = "core_mean";
1648    deftype!(
1649        "fn([Number, Array<Number>], @args: [Number, Array<Number>]) -> Result<f64, `MeanError(string)>"
1650    );
1651
1652    fn eval(&mut self, from: &CachedVals) -> Option<Value> {
1653        static TAG: ArcStr = literal!("MeanError");
1654        let mut total = 0.;
1655        let mut samples = 0;
1656        let mut error = None;
1657        for v in from.flat_iter() {
1658            if let Some(v) = v {
1659                match v.cast_to::<f64>() {
1660                    Err(e) => error = Some(errf!(TAG, "{e:?}")),
1661                    Ok(v) => {
1662                        total += v;
1663                        samples += 1;
1664                    }
1665                }
1666            }
1667        }
1668        if let Some(e) = error {
1669            Some(e)
1670        } else if samples == 0 {
1671            Some(err!(TAG, "mean requires at least one argument"))
1672        } else {
1673            Some(Value::F64(total / samples as f64))
1674        }
1675    }
1676}
1677
1678type Mean = CachedArgs<MeanEv>;
1679
1680#[derive(Debug)]
1681struct Uniq(Option<Value>);
1682
1683impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Uniq {
1684    const NAME: &str = "core_uniq";
1685    deftype!("fn('a) -> 'a");
1686
1687    fn init<'a, 'b, 'c>(
1688        _ctx: &'a mut ExecCtx<R, E>,
1689        _typ: &'a graphix_compiler::typ::FnType,
1690        _scope: &'b Scope,
1691        _from: &'c [Node<R, E>],
1692        _top_id: ExprId,
1693    ) -> Result<Box<dyn Apply<R, E>>> {
1694        Ok(Box::new(Uniq(None)))
1695    }
1696}
1697
1698impl<R: Rt, E: UserEvent> Apply<R, E> for Uniq {
1699    fn update(
1700        &mut self,
1701        ctx: &mut ExecCtx<R, E>,
1702        from: &mut [Node<R, E>],
1703        event: &mut Event<E>,
1704    ) -> Option<Value> {
1705        from[0].update(ctx, event).and_then(|v| {
1706            if Some(&v) != self.0.as_ref() {
1707                self.0 = Some(v.clone());
1708                Some(v)
1709            } else {
1710                None
1711            }
1712        })
1713    }
1714
1715    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
1716        self.0 = None
1717    }
1718}
1719
1720#[derive(Debug)]
1721struct Never;
1722
1723impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Never {
1724    const NAME: &str = "core_never";
1725    deftype!("fn(@args: Any) -> 'a");
1726
1727    fn init<'a, 'b, 'c>(
1728        _ctx: &'a mut ExecCtx<R, E>,
1729        _typ: &'a graphix_compiler::typ::FnType,
1730        _scope: &'b Scope,
1731        _from: &'c [Node<R, E>],
1732        _top_id: ExprId,
1733    ) -> Result<Box<dyn Apply<R, E>>> {
1734        Ok(Box::new(Never))
1735    }
1736}
1737
1738impl<R: Rt, E: UserEvent> Apply<R, E> for Never {
1739    fn update(
1740        &mut self,
1741        ctx: &mut ExecCtx<R, E>,
1742        from: &mut [Node<R, E>],
1743        event: &mut Event<E>,
1744    ) -> Option<Value> {
1745        for n in from {
1746            n.update(ctx, event);
1747        }
1748        None
1749    }
1750
1751    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
1752}
1753
1754#[derive(Debug, Clone, Copy)]
1755enum Level {
1756    Trace,
1757    Debug,
1758    Info,
1759    Warn,
1760    Error,
1761}
1762
1763impl FromValue for Level {
1764    fn from_value(v: Value) -> Result<Self> {
1765        match &*v.cast_to::<ArcStr>()? {
1766            "Trace" => Ok(Self::Trace),
1767            "Debug" => Ok(Self::Debug),
1768            "Info" => Ok(Self::Info),
1769            "Warn" => Ok(Self::Warn),
1770            "Error" => Ok(Self::Error),
1771            v => bail!("invalid log level {v}"),
1772        }
1773    }
1774}
1775
1776#[derive(Debug, Clone, Copy)]
1777enum LogDest {
1778    Stdout,
1779    Stderr,
1780    Log(Level),
1781}
1782
1783impl FromValue for LogDest {
1784    fn from_value(v: Value) -> Result<Self> {
1785        match &*v.clone().cast_to::<ArcStr>()? {
1786            "Stdout" => Ok(Self::Stdout),
1787            "Stderr" => Ok(Self::Stderr),
1788            _ => Ok(Self::Log(v.cast_to()?)),
1789        }
1790    }
1791}
1792
1793#[derive(Debug)]
1794struct Dbg {
1795    spec: Expr,
1796    dest: LogDest,
1797    typ: Type,
1798}
1799
1800impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Dbg {
1801    const NAME: &str = "core_dbg";
1802    deftype!("fn(?#dest:[`Stdout, `Stderr, Log], 'a) -> 'a");
1803
1804    fn init<'a, 'b, 'c>(
1805        _ctx: &'a mut ExecCtx<R, E>,
1806        _typ: &'a graphix_compiler::typ::FnType,
1807        _scope: &'b Scope,
1808        from: &'c [Node<R, E>],
1809        _top_id: ExprId,
1810    ) -> Result<Box<dyn Apply<R, E>>> {
1811        Ok(Box::new(Dbg {
1812            spec: from[1].spec().clone(),
1813            dest: LogDest::Stderr,
1814            typ: Type::Bottom,
1815        }))
1816    }
1817}
1818
1819impl<R: Rt, E: UserEvent> Apply<R, E> for Dbg {
1820    fn update(
1821        &mut self,
1822        ctx: &mut ExecCtx<R, E>,
1823        from: &mut [Node<R, E>],
1824        event: &mut Event<E>,
1825    ) -> Option<Value> {
1826        if let Some(v) = from[0].update(ctx, event)
1827            && let Ok(d) = v.cast_to::<LogDest>()
1828        {
1829            self.dest = d;
1830        }
1831        from[1].update(ctx, event).map(|v| {
1832            let tv = TVal { env: &ctx.env, typ: &self.typ, v: &v };
1833            match self.dest {
1834                LogDest::Stderr => {
1835                    eprintln!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1836                }
1837                LogDest::Stdout => {
1838                    println!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1839                }
1840                LogDest::Log(level) => match level {
1841                    Level::Trace => {
1842                        log::trace!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1843                    }
1844                    Level::Debug => {
1845                        log::debug!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1846                    }
1847                    Level::Info => {
1848                        log::info!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1849                    }
1850                    Level::Warn => {
1851                        log::warn!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1852                    }
1853                    Level::Error => {
1854                        log::error!("{} dbg({}): {}", self.spec.pos, self.spec, tv)
1855                    }
1856                },
1857            };
1858            v
1859        })
1860    }
1861
1862    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
1863
1864    fn typecheck(
1865        &mut self,
1866        _ctx: &mut ExecCtx<R, E>,
1867        from: &mut [Node<R, E>],
1868    ) -> Result<()> {
1869        self.typ = from[1].typ().clone();
1870        Ok(())
1871    }
1872}
1873
1874#[derive(Debug)]
1875struct Log {
1876    scope: Scope,
1877    dest: LogDest,
1878}
1879
1880impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Log {
1881    const NAME: &str = "core_log";
1882    deftype!("fn(?#dest:Log, 'a) -> _");
1883
1884    fn init<'a, 'b, 'c>(
1885        _ctx: &'a mut ExecCtx<R, E>,
1886        _typ: &'a graphix_compiler::typ::FnType,
1887        scope: &'b Scope,
1888        _from: &'c [Node<R, E>],
1889        _top_id: ExprId,
1890    ) -> Result<Box<dyn Apply<R, E>>> {
1891        Ok(Box::new(Self { scope: scope.clone(), dest: LogDest::Stdout }))
1892    }
1893}
1894
1895impl<R: Rt, E: UserEvent> Apply<R, E> for Log {
1896    fn update(
1897        &mut self,
1898        ctx: &mut ExecCtx<R, E>,
1899        from: &mut [Node<R, E>],
1900        event: &mut Event<E>,
1901    ) -> Option<Value> {
1902        if let Some(v) = from[0].update(ctx, event)
1903            && let Ok(d) = v.cast_to::<LogDest>()
1904        {
1905            self.dest = d;
1906        }
1907        if let Some(v) = from[1].update(ctx, event) {
1908            let tv = TVal { env: &ctx.env, typ: from[1].typ(), v: &v };
1909            match self.dest {
1910                LogDest::Stdout => println!("{}: {}", self.scope.lexical, tv),
1911                LogDest::Stderr => eprintln!("{}: {}", self.scope.lexical, tv),
1912                LogDest::Log(lvl) => match lvl {
1913                    Level::Trace => log::trace!("{}: {}", self.scope.lexical, tv),
1914                    Level::Debug => log::debug!("{}: {}", self.scope.lexical, tv),
1915                    Level::Info => log::info!("{}: {}", self.scope.lexical, tv),
1916                    Level::Warn => log::warn!("{}: {}", self.scope.lexical, tv),
1917                    Level::Error => log::error!("{}: {}", self.scope.lexical, tv),
1918                },
1919            }
1920        }
1921        None
1922    }
1923
1924    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
1925}
1926
1927macro_rules! printfn {
1928    ($type:ident, $name:literal, $print:ident, $eprint:ident) => {
1929        #[derive(Debug)]
1930        struct $type {
1931            dest: LogDest,
1932            buf: String,
1933        }
1934
1935        impl<R: Rt, E: UserEvent> BuiltIn<R, E> for $type {
1936            const NAME: &str = $name;
1937            deftype!("fn(?#dest:Log, 'a) -> _");
1938
1939            fn init<'a, 'b, 'c>(
1940                _ctx: &'a mut ExecCtx<R, E>,
1941                _typ: &'a graphix_compiler::typ::FnType,
1942                _scope: &'b Scope,
1943                _from: &'c [Node<R, E>],
1944                _top_id: ExprId,
1945            ) -> Result<Box<dyn Apply<R, E>>> {
1946                Ok(Box::new(Self { dest: LogDest::Stdout, buf: String::new() }))
1947            }
1948        }
1949
1950        impl<R: Rt, E: UserEvent> Apply<R, E> for $type {
1951            fn update(
1952                &mut self,
1953                ctx: &mut ExecCtx<R, E>,
1954                from: &mut [Node<R, E>],
1955                event: &mut Event<E>,
1956            ) -> Option<Value> {
1957                use std::fmt::Write;
1958                if let Some(v) = from[0].update(ctx, event)
1959                    && let Ok(d) = v.cast_to::<LogDest>()
1960                {
1961                    self.dest = d;
1962                }
1963                if let Some(v) = from[1].update(ctx, event) {
1964                    self.buf.clear();
1965                    match v {
1966                        Value::String(s) => write!(self.buf, "{s}"),
1967                        v => write!(
1968                            self.buf,
1969                            "{}",
1970                            TVal { env: &ctx.env, typ: &from[1].typ(), v: &v }
1971                        ),
1972                    }
1973                    .unwrap();
1974                    match self.dest {
1975                        LogDest::Stdout => $print!("{}", self.buf),
1976                        LogDest::Stderr => $eprint!("{}", self.buf),
1977                        LogDest::Log(lvl) => match lvl {
1978                            Level::Trace => log::trace!("{}", self.buf),
1979                            Level::Debug => log::debug!("{}", self.buf),
1980                            Level::Info => log::info!("{}", self.buf),
1981                            Level::Warn => log::warn!("{}", self.buf),
1982                            Level::Error => log::error!("{}", self.buf),
1983                        },
1984                    }
1985                }
1986                None
1987            }
1988
1989            fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {}
1990        }
1991    };
1992}
1993
1994printfn!(Print, "core_print", print, eprint);
1995printfn!(Println, "core_println", println, eprintln);
1996
1997// ── Package registration ───────────────────────────────────────────
1998
// The builtin types handed to `defpackage!`. `Filter` is generic over the
// runtime and user event types, so it is listed together with the concrete
// instantiation to use; every other entry is a plain type or type alias.
// NOTE(review): presumably a builtin must appear in this list to be usable
// from graphix code; confirm against the `defpackage!` documentation.
graphix_derive::defpackage! {
    builtins => [
        IsErr,
        FilterErr,
        ToError,
        Once,
        Take,
        Skip,
        All,
        Sum,
        Product,
        Divide,
        Min,
        Max,
        And,
        Or,
        Filter as Filter<GXRt<X>, X::UserEvent>,
        Queue,
        Hold,
        Seq,
        Throttle,
        Count,
        Mean,
        Uniq,
        Never,
        Dbg,
        Log,
        Print,
        Println,
    ],
}