graphix_stdlib/
lib.rs

1use anyhow::{bail, Result};
2use arcstr::ArcStr;
3use compact_str::format_compact;
4use enumflags2::{bitflags, BitFlags};
5use fxhash::FxHashMap;
6use graphix_compiler::{
7    expr::{ExprId, ModuleResolver},
8    node::genn,
9    typ::{FnType, Type},
10    Apply, BindId, BuiltIn, BuiltInInitFn, Event, ExecCtx, LambdaId, Node, Refs, Rt,
11    Scope, UserEvent,
12};
13use netidx::{path::Path, subscriber::Value};
14use netidx_core::utils::Either;
15use poolshark::local::LPooled;
16use std::{
17    collections::hash_map::Entry,
18    fmt::Debug,
19    iter,
20    marker::PhantomData,
21    sync::{Arc, LazyLock},
22};
23use triomphe::Arc as TArc;
24
25mod array;
26mod core;
27mod map;
28mod net;
29mod rand;
30mod re;
31mod str;
32#[cfg(test)]
33mod test;
34mod time;
35
36#[macro_export]
37macro_rules! deftype {
38    ($scope:literal, $s:literal) => {
39        const TYP: ::std::sync::LazyLock<graphix_compiler::typ::FnType> =
40            ::std::sync::LazyLock::new(|| {
41                let scope =
42                    graphix_compiler::expr::ModPath(::netidx::path::Path::from($scope));
43                graphix_compiler::expr::parser::parse_fn_type($s)
44                    .expect("failed to parse fn type {s}")
45                    .scope_refs(&scope)
46            });
47    };
48}
49
50#[macro_export]
51macro_rules! arity1 {
52    ($from:expr, $updates:expr) => {
53        match (&*$from, &*$updates) {
54            ([arg], [arg_up]) => (arg, arg_up),
55            (_, _) => unreachable!(),
56        }
57    };
58}
59
60#[macro_export]
61macro_rules! arity2 {
62    ($from:expr, $updates:expr) => {
63        match (&*$from, &*$updates) {
64            ([arg0, arg1], [arg0_up, arg1_up]) => ((arg0, arg1), (arg0_up, arg1_up)),
65            (_, _) => unreachable!(),
66        }
67    };
68}
69
70#[derive(Debug)]
71pub struct CachedVals(pub Box<[Option<Value>]>);
72
73impl CachedVals {
74    pub fn new<R: Rt, E: UserEvent>(from: &[Node<R, E>]) -> CachedVals {
75        CachedVals(from.into_iter().map(|_| None).collect())
76    }
77
78    pub fn clear(&mut self) {
79        for v in &mut self.0 {
80            *v = None
81        }
82    }
83
84    pub fn update<R: Rt, E: UserEvent>(
85        &mut self,
86        ctx: &mut ExecCtx<R, E>,
87        from: &mut [Node<R, E>],
88        event: &mut Event<E>,
89    ) -> bool {
90        from.into_iter().enumerate().fold(false, |res, (i, src)| {
91            match src.update(ctx, event) {
92                None => res,
93                v @ Some(_) => {
94                    self.0[i] = v;
95                    true
96                }
97            }
98        })
99    }
100
101    /// Like update, but return the indexes of the nodes that updated
102    /// instead of a consolidated bool
103    pub fn update_diff<R: Rt, E: UserEvent>(
104        &mut self,
105        up: &mut [bool],
106        ctx: &mut ExecCtx<R, E>,
107        from: &mut [Node<R, E>],
108        event: &mut Event<E>,
109    ) {
110        for (i, n) in from.iter_mut().enumerate() {
111            match n.update(ctx, event) {
112                None => (),
113                v => {
114                    self.0[i] = v;
115                    up[i] = true
116                }
117            }
118        }
119    }
120
121    pub fn flat_iter<'a>(&'a self) -> impl Iterator<Item = Option<Value>> + 'a {
122        self.0.iter().flat_map(|v| match v {
123            None => Either::Left(iter::once(None)),
124            Some(v) => Either::Right(v.clone().flatten().map(Some)),
125        })
126    }
127}
128
129pub trait EvalCached: Debug + Default + Send + Sync + 'static {
130    const NAME: &str;
131    const TYP: LazyLock<FnType>;
132
133    fn eval(&mut self, from: &CachedVals) -> Option<Value>;
134}
135
136#[derive(Debug)]
137pub struct CachedArgs<T: EvalCached> {
138    cached: CachedVals,
139    t: T,
140}
141
142impl<R: Rt, E: UserEvent, T: EvalCached> BuiltIn<R, E> for CachedArgs<T> {
143    const NAME: &str = T::NAME;
144    const TYP: LazyLock<FnType> = T::TYP;
145
146    fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
147        Arc::new(|_, _, _, from, _| {
148            let t = CachedArgs::<T> { cached: CachedVals::new(from), t: T::default() };
149            Ok(Box::new(t))
150        })
151    }
152}
153
154impl<R: Rt, E: UserEvent, T: EvalCached> Apply<R, E> for CachedArgs<T> {
155    fn update(
156        &mut self,
157        ctx: &mut ExecCtx<R, E>,
158        from: &mut [Node<R, E>],
159        event: &mut Event<E>,
160    ) -> Option<Value> {
161        if self.cached.update(ctx, from, event) {
162            self.t.eval(&self.cached)
163        } else {
164            None
165        }
166    }
167
168    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
169        self.cached.clear()
170    }
171}
172
173pub trait MapCollection: Debug + Clone + Default + Send + Sync + 'static {
174    /// return the length of the collection
175    fn len(&self) -> usize;
176
177    /// iterate the collection elements as values
178    fn iter_values(&self) -> impl Iterator<Item = Value>;
179
180    /// given a value, return Some if the value is the collection type
181    /// we are mapping.
182    fn select(v: Value) -> Option<Self>;
183
184    /// given a collection wrap it in a value
185    fn project(self) -> Value;
186
187    /// return the element type given the function type
188    fn etyp(ft: &FnType) -> Result<Type>;
189}
190
191pub trait MapFn<R: Rt, E: UserEvent>: Debug + Default + Send + Sync + 'static {
192    type Collection: MapCollection;
193
194    const NAME: &str;
195    const TYP: LazyLock<FnType>;
196
197    /// finish will be called when every lambda instance has produced
198    /// a value for the updated array. Out contains the output of the
199    /// predicate lambda for each index i, and a is the array. out and
200    /// a are guaranteed to have the same length. out[i].cur is
201    /// guaranteed to be Some.
202    fn finish(&mut self, slots: &[Slot<R, E>], a: &Self::Collection) -> Option<Value>;
203}
204
205#[derive(Debug)]
206pub struct Slot<R: Rt, E: UserEvent> {
207    id: BindId,
208    pred: Node<R, E>,
209    pub cur: Option<Value>,
210}
211
212impl<R: Rt, E: UserEvent> Slot<R, E> {
213    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
214        self.pred.delete(ctx);
215        ctx.cached.remove(&self.id);
216        ctx.env.unbind_variable(self.id);
217    }
218}
219
220#[derive(Debug)]
221pub struct MapQ<R: Rt, E: UserEvent, T: MapFn<R, E>> {
222    scope: Scope,
223    predid: BindId,
224    top_id: ExprId,
225    mftyp: TArc<FnType>,
226    etyp: Type,
227    slots: Vec<Slot<R, E>>,
228    cur: T::Collection,
229    t: T,
230}
231
232impl<R: Rt, E: UserEvent, T: MapFn<R, E>> BuiltIn<R, E> for MapQ<R, E, T> {
233    const NAME: &str = T::NAME;
234    const TYP: LazyLock<FnType> = T::TYP;
235
236    fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
237        Arc::new(|_ctx, typ, scope, from, top_id| match from {
238            [_, _] => Ok(Box::new(Self {
239                scope: scope.append(&format_compact!("fn{}", LambdaId::new().inner())),
240                predid: BindId::new(),
241                top_id,
242                etyp: T::Collection::etyp(typ)?,
243                mftyp: match &typ.args[1].typ {
244                    Type::Fn(ft) => ft.clone(),
245                    t => bail!("expected a function not {t}"),
246                },
247                slots: vec![],
248                cur: Default::default(),
249                t: T::default(),
250            })),
251            _ => bail!("expected two arguments"),
252        })
253    }
254}
255
256impl<R: Rt, E: UserEvent, T: MapFn<R, E>> Apply<R, E> for MapQ<R, E, T> {
257    fn update(
258        &mut self,
259        ctx: &mut ExecCtx<R, E>,
260        from: &mut [Node<R, E>],
261        event: &mut Event<E>,
262    ) -> Option<Value> {
263        let slen = self.slots.len();
264        if let Some(v) = from[1].update(ctx, event) {
265            ctx.cached.insert(self.predid, v.clone());
266            event.variables.insert(self.predid, v);
267        }
268        let (up, resized) =
269            match from[0].update(ctx, event).and_then(|v| T::Collection::select(v)) {
270                Some(a) if a.len() == slen => (Some(a), false),
271                Some(a) if a.len() < slen => {
272                    while self.slots.len() > a.len() {
273                        if let Some(mut s) = self.slots.pop() {
274                            s.delete(ctx)
275                        }
276                    }
277                    (Some(a), true)
278                }
279                Some(a) => {
280                    while self.slots.len() < a.len() {
281                        let (id, node) = genn::bind(
282                            ctx,
283                            &self.scope.lexical,
284                            "x",
285                            self.etyp.clone(),
286                            self.top_id,
287                        );
288                        let fargs = vec![node];
289                        let fnode = genn::reference(
290                            ctx,
291                            self.predid,
292                            Type::Fn(self.mftyp.clone()),
293                            self.top_id,
294                        );
295                        let pred = genn::apply(
296                            fnode,
297                            self.scope.clone(),
298                            fargs,
299                            &self.mftyp,
300                            self.top_id,
301                        );
302                        self.slots.push(Slot { id, pred, cur: None });
303                    }
304                    (Some(a), true)
305                }
306                None => (None, false),
307            };
308        if let Some(a) = up {
309            for (s, v) in self.slots.iter().zip(a.iter_values()) {
310                ctx.cached.insert(s.id, v.clone());
311                event.variables.insert(s.id, v);
312            }
313            self.cur = a.clone();
314            if a.len() == 0 {
315                return Some(T::Collection::project(a));
316            }
317        }
318        let init = event.init;
319        let mut up = resized;
320        for (i, s) in self.slots.iter_mut().enumerate() {
321            if i == slen {
322                // new nodes were added starting here
323                event.init = true;
324                if let Entry::Vacant(e) = event.variables.entry(self.predid)
325                    && let Some(v) = ctx.cached.get(&self.predid)
326                {
327                    e.insert(v.clone());
328                }
329            }
330            if let Some(v) = s.pred.update(ctx, event) {
331                s.cur = Some(v);
332                up = true;
333            }
334        }
335        event.init = init;
336        if up && self.slots.iter().all(|s| s.cur.is_some()) {
337            self.t.finish(&mut &self.slots, &self.cur)
338        } else {
339            None
340        }
341    }
342
343    fn typecheck(
344        &mut self,
345        ctx: &mut ExecCtx<R, E>,
346        _from: &mut [Node<R, E>],
347    ) -> anyhow::Result<()> {
348        let (_, node) =
349            genn::bind(ctx, &self.scope.lexical, "x", self.etyp.clone(), self.top_id);
350        let fargs = vec![node];
351        let ft = self.mftyp.clone();
352        let fnode = genn::reference(ctx, self.predid, Type::Fn(ft.clone()), self.top_id);
353        let mut node = genn::apply(fnode, self.scope.clone(), fargs, &ft, self.top_id);
354        let r = node.typecheck(ctx);
355        node.delete(ctx);
356        r
357    }
358
359    fn refs(&self, refs: &mut Refs) {
360        for s in &self.slots {
361            s.pred.refs(refs)
362        }
363    }
364
365    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
366        ctx.cached.remove(&self.predid);
367        for sl in &mut self.slots {
368            sl.delete(ctx)
369        }
370    }
371
372    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
373        self.cur = Default::default();
374        for sl in &mut self.slots {
375            sl.cur = None;
376            sl.pred.sleep(ctx);
377        }
378    }
379}
380
381pub trait FoldFn<R: Rt, E: UserEvent>: Debug + Send + Sync + 'static {
382    type Collection: MapCollection;
383
384    const NAME: &str;
385    const TYP: LazyLock<FnType>;
386}
387
388#[derive(Debug)]
389pub struct FoldQ<R: Rt, E: UserEvent, T: FoldFn<R, E>> {
390    top_id: ExprId,
391    fid: BindId,
392    scope: Scope,
393    binds: Vec<BindId>,
394    nodes: Vec<Node<R, E>>,
395    inits: Vec<Option<Value>>,
396    initids: Vec<BindId>,
397    initid: BindId,
398    mftype: TArc<FnType>,
399    etyp: Type,
400    ityp: Type,
401    init: Option<Value>,
402    t: PhantomData<T>,
403}
404
405impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> BuiltIn<R, E> for FoldQ<R, E, T> {
406    const NAME: &str = T::NAME;
407    const TYP: LazyLock<FnType> = T::TYP;
408
409    fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
410        Arc::new(|_ctx, typ, scope, from, top_id| match from {
411            [_, _, _] => Ok(Box::new(Self {
412                top_id,
413                scope: scope.clone(),
414                binds: vec![],
415                nodes: vec![],
416                inits: vec![],
417                initids: vec![],
418                initid: BindId::new(),
419                fid: BindId::new(),
420                etyp: T::Collection::etyp(typ)?,
421                ityp: typ.args[1].typ.clone(),
422                mftype: match &typ.args[2].typ {
423                    Type::Fn(ft) => ft.clone(),
424                    t => bail!("expected a function not {t}"),
425                },
426                init: None,
427                t: PhantomData,
428            })),
429            _ => bail!("expected three arguments"),
430        })
431    }
432}
433
434impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> Apply<R, E> for FoldQ<R, E, T> {
435    fn update(
436        &mut self,
437        ctx: &mut ExecCtx<R, E>,
438        from: &mut [Node<R, E>],
439        event: &mut Event<E>,
440    ) -> Option<Value> {
441        let init = match from[0].update(ctx, event).and_then(|v| T::Collection::select(v))
442        {
443            None => self.nodes.len(),
444            Some(a) if a.len() == self.binds.len() => {
445                for (id, v) in self.binds.iter().zip(a.iter_values()) {
446                    ctx.cached.insert(*id, v.clone());
447                    event.variables.insert(*id, v.clone());
448                }
449                self.nodes.len()
450            }
451            Some(a) => {
452                let vals = a.iter_values().collect::<LPooled<Vec<Value>>>();
453                while self.binds.len() < a.len() {
454                    self.binds.push(BindId::new());
455                    self.inits.push(None);
456                    self.initids.push(BindId::new());
457                }
458                while a.len() < self.binds.len() {
459                    if let Some(id) = self.binds.pop() {
460                        ctx.cached.remove(&id);
461                    }
462                    if let Some(id) = self.initids.pop() {
463                        ctx.cached.remove(&id);
464                    }
465                    self.inits.pop();
466                    if let Some(mut n) = self.nodes.pop() {
467                        n.delete(ctx);
468                    }
469                }
470                let init = self.nodes.len();
471                for i in 0..self.binds.len() {
472                    ctx.cached.insert(self.binds[i], vals[i].clone());
473                    event.variables.insert(self.binds[i], vals[i].clone());
474                    if i >= self.nodes.len() {
475                        let n = genn::reference(
476                            ctx,
477                            if i == 0 { self.initid } else { self.initids[i - 1] },
478                            self.ityp.clone(),
479                            self.top_id,
480                        );
481                        let x = genn::reference(
482                            ctx,
483                            self.binds[i],
484                            self.etyp.clone(),
485                            self.top_id,
486                        );
487                        let fnode = genn::reference(
488                            ctx,
489                            self.fid,
490                            Type::Fn(self.mftype.clone()),
491                            self.top_id,
492                        );
493                        let node = genn::apply(
494                            fnode,
495                            self.scope.clone(),
496                            vec![n, x],
497                            &self.mftype,
498                            self.top_id,
499                        );
500                        self.nodes.push(node);
501                    }
502                }
503                init
504            }
505        };
506        if let Some(v) = from[1].update(ctx, event) {
507            ctx.cached.insert(self.initid, v.clone());
508            event.variables.insert(self.initid, v.clone());
509            self.init = Some(v);
510        }
511        if let Some(v) = from[2].update(ctx, event) {
512            ctx.cached.insert(self.fid, v.clone());
513            event.variables.insert(self.fid, v);
514        }
515        let old_init = event.init;
516        for i in 0..self.nodes.len() {
517            if i == init {
518                event.init = true;
519                if let Some(v) = ctx.cached.get(&self.fid)
520                    && let Entry::Vacant(e) = event.variables.entry(self.fid)
521                {
522                    e.insert(v.clone());
523                }
524                if i == 0 {
525                    if let Some(v) = self.init.as_ref()
526                        && let Entry::Vacant(e) = event.variables.entry(self.initid)
527                    {
528                        e.insert(v.clone());
529                    }
530                } else {
531                    if let Some(v) = self.inits[i - 1].clone() {
532                        event.variables.insert(self.initids[i - 1], v);
533                    }
534                }
535            }
536            match self.nodes[i].update(ctx, event) {
537                Some(v) => {
538                    ctx.cached.insert(self.initids[i], v.clone());
539                    event.variables.insert(self.initids[i], v.clone());
540                    self.inits[i] = Some(v);
541                }
542                None => {
543                    ctx.cached.remove(&self.initids[i]);
544                    event.variables.remove(&self.initids[i]);
545                    self.inits[i] = None;
546                }
547            }
548        }
549        event.init = old_init;
550        self.inits.last().and_then(|v| v.clone())
551    }
552
553    fn typecheck(
554        &mut self,
555        ctx: &mut ExecCtx<R, E>,
556        _from: &mut [Node<R, E>],
557    ) -> anyhow::Result<()> {
558        let mut n = genn::reference(ctx, self.initid, self.ityp.clone(), self.top_id);
559        let x = genn::reference(ctx, BindId::new(), self.etyp.clone(), self.top_id);
560        let fnode =
561            genn::reference(ctx, self.fid, Type::Fn(self.mftype.clone()), self.top_id);
562        n = genn::apply(fnode, self.scope.clone(), vec![n, x], &self.mftype, self.top_id);
563        let r = n.typecheck(ctx);
564        n.delete(ctx);
565        r
566    }
567
568    fn refs(&self, refs: &mut Refs) {
569        for n in &self.nodes {
570            n.refs(refs)
571        }
572    }
573
574    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
575        let i =
576            iter::once(&self.initid).chain(self.binds.iter()).chain(self.initids.iter());
577        for id in i {
578            ctx.cached.remove(id);
579        }
580        for n in &mut self.nodes {
581            n.delete(ctx);
582        }
583    }
584
585    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
586        self.init = None;
587        for v in &mut self.inits {
588            *v = None
589        }
590        for n in &mut self.nodes {
591            n.sleep(ctx)
592        }
593    }
594}
595
596#[bitflags]
597#[derive(Clone, Copy)]
598#[repr(u64)]
599pub enum Module {
600    Array,
601    Map,
602    NetAndTime,
603    Rand,
604    Re,
605    Str,
606}
607
608/// Register selected modules of the standard graphix library
609///
610/// and return a root module that will load them along with a module resolver
611/// that contains the necessary code. You need both of these for the `rt`
612/// module.
613///
614/// Note, core is always included and registered, all the other
615/// modules are optional
616///
617/// # Example
618///
619/// ```no_run
620/// use netidx::{publisher::Publisher, subscriber::Subscriber};
621/// use anyhow::Result;
622/// use poolshark::global::GPooled;
623/// use graphix_compiler::ExecCtx;
624/// use graphix_rt::{GXRt, GXConfigBuilder, GXHandle, GXEvent, NoExt};
625/// use tokio::sync::mpsc;
626/// use enumflags2::BitFlags;
627///
628/// async fn start_runtime(
629///     publisher: Publisher,
630///     subscriber: Subscriber,
631///     sub: mpsc::Sender<GPooled<Vec<GXEvent<NoExt>>>>
632/// ) -> Result<GXHandle<NoExt>> {
633///     let mut ctx = ExecCtx::new(GXRt::<NoExt>::new(publisher, subscriber));
634///     let (root, mods) = graphix_stdlib::register(&mut ctx, BitFlags::all())?;
635///     GXConfigBuilder::default()
636///        .ctx(ctx)
637///        .root(root)
638///        .resolvers(vec![mods])
639///        .sub(sub)
640///        .build()?
641///        .start()
642///        .await
643/// }
644/// ```
645pub fn register<R: Rt, E: UserEvent>(
646    ctx: &mut ExecCtx<R, E>,
647    modules: BitFlags<Module>,
648) -> Result<(ArcStr, ModuleResolver)> {
649    let mut tbl = FxHashMap::default();
650    tbl.insert(Path::from("/core"), core::register(ctx)?);
651    let mut root = String::from("pub mod core;\nuse core;\n");
652    for module in modules {
653        match module {
654            Module::Array => {
655                root.push_str("pub mod array;\n");
656                tbl.insert(Path::from("/array"), array::register(ctx)?);
657            }
658            Module::Map => {
659                root.push_str("pub mod map;\n");
660                tbl.insert(Path::from("/map"), map::register(ctx)?);
661            }
662            Module::NetAndTime => {
663                root.push_str("pub mod time;\n");
664                tbl.insert(Path::from("/time"), time::register(ctx)?);
665                root.push_str("pub mod net;\n");
666                tbl.insert(Path::from("/net"), net::register(ctx)?);
667            }
668            Module::Rand => {
669                root.push_str("pub mod rand;\n");
670                tbl.insert(Path::from("/rand"), rand::register(ctx)?);
671            }
672            Module::Re => {
673                root.push_str("pub mod re;\n");
674                tbl.insert(Path::from("/re"), re::register(ctx)?);
675            }
676            Module::Str => {
677                root.push_str("pub mod str;\n");
678                tbl.insert(Path::from("/str"), str::register(ctx)?);
679            }
680        }
681    }
682    root.pop();
683    root.pop();
684    Ok((ArcStr::from(root), ModuleResolver::VFS(tbl)))
685}