graphix_stdlib/
lib.rs

1use anyhow::{bail, Result};
2use arcstr::ArcStr;
3use compact_str::format_compact;
4use enumflags2::{bitflags, BitFlags};
5use fxhash::FxHashMap;
6use graphix_compiler::{
7    expr::{ExprId, ModuleResolver},
8    node::genn,
9    typ::{FnType, Type},
10    Apply, BindId, BuiltIn, BuiltInInitFn, Event, ExecCtx, LambdaId, Node, Refs, Rt,
11    Scope, UserEvent,
12};
13use netidx::{path::Path, subscriber::Value};
14use netidx_core::utils::Either;
15use poolshark::local::LPooled;
16use std::{
17    collections::hash_map::Entry,
18    fmt::Debug,
19    iter,
20    marker::PhantomData,
21    sync::{Arc, LazyLock},
22};
23use triomphe::Arc as TArc;
24
25mod array;
26mod core;
27mod fs;
28mod map;
29mod net;
30mod rand;
31mod re;
32mod str;
33#[cfg(test)]
34mod test;
35mod time;
36
/// Define the `TYP` associated constant expected by the builtin traits in
/// this crate.
///
/// `$scope` is a path literal that is wrapped in a `ModPath`, and `$s` is
/// the textual function type. Parsing happens lazily inside the `LazyLock`
/// initializer; the parsed type is then passed through `scope_refs` with the
/// scope built from `$scope`.
///
/// # Panics
///
/// Forcing the generated `TYP` panics if `$s` cannot be parsed as a function
/// type. The panic message contains the offending type string.
#[macro_export]
macro_rules! deftype {
    ($scope:literal, $s:literal) => {
        const TYP: ::std::sync::LazyLock<graphix_compiler::typ::FnType> =
            ::std::sync::LazyLock::new(|| {
                let scope =
                    graphix_compiler::expr::ModPath(::netidx::path::Path::from($scope));
                graphix_compiler::expr::parser::parse_fn_type($s)
                    // `expect` takes a plain string and does not interpolate,
                    // so splice the literal into the message at compile time.
                    .expect(concat!("failed to parse fn type ", $s))
                    .scope_refs(&scope)
            });
    };
}
50
/// Destructure two slice-like values that each hold exactly one element.
///
/// Both arguments are reborrowed as slices (`&*expr`) and the macro evaluates
/// to `(arg, arg_up)`: a reference to the single element of `$from` and a
/// reference to the single element of `$updates`.
///
/// # Panics
///
/// Hits `unreachable!()` if either slice does not have exactly one element.
#[macro_export]
macro_rules! arity1 {
    ($from:expr, $updates:expr) => {{
        let ([only], [only_up]) = (&*$from, &*$updates) else { unreachable!() };
        (only, only_up)
    }};
}
60
/// Destructure two slice-like values that each hold exactly two elements.
///
/// Both arguments are reborrowed as slices (`&*expr`) and the macro evaluates
/// to `((arg0, arg1), (arg0_up, arg1_up))`, i.e. references to the two
/// elements of `$from` followed by references to the two elements of
/// `$updates`.
///
/// # Panics
///
/// Hits `unreachable!()` if either slice does not have exactly two elements.
#[macro_export]
macro_rules! arity2 {
    ($from:expr, $updates:expr) => {{
        let ([first, second], [first_up, second_up]) = (&*$from, &*$updates) else {
            unreachable!()
        };
        ((first, second), (first_up, second_up))
    }};
}
70
71#[derive(Debug)]
72pub struct CachedVals(pub Box<[Option<Value>]>);
73
74impl CachedVals {
75    pub fn new<R: Rt, E: UserEvent>(from: &[Node<R, E>]) -> CachedVals {
76        CachedVals(from.into_iter().map(|_| None).collect())
77    }
78
79    pub fn clear(&mut self) {
80        for v in &mut self.0 {
81            *v = None
82        }
83    }
84
85    pub fn update<R: Rt, E: UserEvent>(
86        &mut self,
87        ctx: &mut ExecCtx<R, E>,
88        from: &mut [Node<R, E>],
89        event: &mut Event<E>,
90    ) -> bool {
91        from.into_iter().enumerate().fold(false, |res, (i, src)| {
92            match src.update(ctx, event) {
93                None => res,
94                v @ Some(_) => {
95                    self.0[i] = v;
96                    true
97                }
98            }
99        })
100    }
101
102    /// Like update, but return the indexes of the nodes that updated
103    /// instead of a consolidated bool
104    pub fn update_diff<R: Rt, E: UserEvent>(
105        &mut self,
106        up: &mut [bool],
107        ctx: &mut ExecCtx<R, E>,
108        from: &mut [Node<R, E>],
109        event: &mut Event<E>,
110    ) {
111        for (i, n) in from.iter_mut().enumerate() {
112            match n.update(ctx, event) {
113                None => (),
114                v => {
115                    self.0[i] = v;
116                    up[i] = true
117                }
118            }
119        }
120    }
121
122    pub fn flat_iter<'a>(&'a self) -> impl Iterator<Item = Option<Value>> + 'a {
123        self.0.iter().flat_map(|v| match v {
124            None => Either::Left(iter::once(None)),
125            Some(v) => Either::Right(v.clone().flatten().map(Some)),
126        })
127    }
128}
129
130pub trait EvalCached: Debug + Default + Send + Sync + 'static {
131    const NAME: &str;
132    const TYP: LazyLock<FnType>;
133
134    fn eval(&mut self, from: &CachedVals) -> Option<Value>;
135}
136
137#[derive(Debug)]
138pub struct CachedArgs<T: EvalCached> {
139    cached: CachedVals,
140    t: T,
141}
142
143impl<R: Rt, E: UserEvent, T: EvalCached> BuiltIn<R, E> for CachedArgs<T> {
144    const NAME: &str = T::NAME;
145    const TYP: LazyLock<FnType> = T::TYP;
146
147    fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
148        Arc::new(|_, _, _, from, _| {
149            let t = CachedArgs::<T> { cached: CachedVals::new(from), t: T::default() };
150            Ok(Box::new(t))
151        })
152    }
153}
154
155impl<R: Rt, E: UserEvent, T: EvalCached> Apply<R, E> for CachedArgs<T> {
156    fn update(
157        &mut self,
158        ctx: &mut ExecCtx<R, E>,
159        from: &mut [Node<R, E>],
160        event: &mut Event<E>,
161    ) -> Option<Value> {
162        if self.cached.update(ctx, from, event) {
163            self.t.eval(&self.cached)
164        } else {
165            None
166        }
167    }
168
169    fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
170        self.cached.clear()
171    }
172}
173
174pub trait MapCollection: Debug + Clone + Default + Send + Sync + 'static {
175    /// return the length of the collection
176    fn len(&self) -> usize;
177
178    /// iterate the collection elements as values
179    fn iter_values(&self) -> impl Iterator<Item = Value>;
180
181    /// given a value, return Some if the value is the collection type
182    /// we are mapping.
183    fn select(v: Value) -> Option<Self>;
184
185    /// given a collection wrap it in a value
186    fn project(self) -> Value;
187
188    /// return the element type given the function type
189    fn etyp(ft: &FnType) -> Result<Type>;
190}
191
192pub trait MapFn<R: Rt, E: UserEvent>: Debug + Default + Send + Sync + 'static {
193    type Collection: MapCollection;
194
195    const NAME: &str;
196    const TYP: LazyLock<FnType>;
197
198    /// finish will be called when every lambda instance has produced
199    /// a value for the updated array. Out contains the output of the
200    /// predicate lambda for each index i, and a is the array. out and
201    /// a are guaranteed to have the same length. out[i].cur is
202    /// guaranteed to be Some.
203    fn finish(&mut self, slots: &[Slot<R, E>], a: &Self::Collection) -> Option<Value>;
204}
205
206#[derive(Debug)]
207pub struct Slot<R: Rt, E: UserEvent> {
208    id: BindId,
209    pred: Node<R, E>,
210    pub cur: Option<Value>,
211}
212
213impl<R: Rt, E: UserEvent> Slot<R, E> {
214    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
215        self.pred.delete(ctx);
216        ctx.cached.remove(&self.id);
217        ctx.env.unbind_variable(self.id);
218    }
219}
220
221#[derive(Debug)]
222pub struct MapQ<R: Rt, E: UserEvent, T: MapFn<R, E>> {
223    scope: Scope,
224    predid: BindId,
225    top_id: ExprId,
226    mftyp: TArc<FnType>,
227    etyp: Type,
228    slots: Vec<Slot<R, E>>,
229    cur: T::Collection,
230    t: T,
231}
232
233impl<R: Rt, E: UserEvent, T: MapFn<R, E>> BuiltIn<R, E> for MapQ<R, E, T> {
234    const NAME: &str = T::NAME;
235    const TYP: LazyLock<FnType> = T::TYP;
236
237    fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
238        Arc::new(|_ctx, typ, scope, from, top_id| match from {
239            [_, _] => Ok(Box::new(Self {
240                scope: scope.append(&format_compact!("fn{}", LambdaId::new().inner())),
241                predid: BindId::new(),
242                top_id,
243                etyp: T::Collection::etyp(typ)?,
244                mftyp: match &typ.args[1].typ {
245                    Type::Fn(ft) => ft.clone(),
246                    t => bail!("expected a function not {t}"),
247                },
248                slots: vec![],
249                cur: Default::default(),
250                t: T::default(),
251            })),
252            _ => bail!("expected two arguments"),
253        })
254    }
255}
256
257impl<R: Rt, E: UserEvent, T: MapFn<R, E>> Apply<R, E> for MapQ<R, E, T> {
258    fn update(
259        &mut self,
260        ctx: &mut ExecCtx<R, E>,
261        from: &mut [Node<R, E>],
262        event: &mut Event<E>,
263    ) -> Option<Value> {
264        let slen = self.slots.len();
265        if let Some(v) = from[1].update(ctx, event) {
266            ctx.cached.insert(self.predid, v.clone());
267            event.variables.insert(self.predid, v);
268        }
269        let (up, resized) =
270            match from[0].update(ctx, event).and_then(|v| T::Collection::select(v)) {
271                Some(a) if a.len() == slen => (Some(a), false),
272                Some(a) if a.len() < slen => {
273                    while self.slots.len() > a.len() {
274                        if let Some(mut s) = self.slots.pop() {
275                            s.delete(ctx)
276                        }
277                    }
278                    (Some(a), true)
279                }
280                Some(a) => {
281                    while self.slots.len() < a.len() {
282                        let (id, node) = genn::bind(
283                            ctx,
284                            &self.scope.lexical,
285                            "x",
286                            self.etyp.clone(),
287                            self.top_id,
288                        );
289                        let fargs = vec![node];
290                        let fnode = genn::reference(
291                            ctx,
292                            self.predid,
293                            Type::Fn(self.mftyp.clone()),
294                            self.top_id,
295                        );
296                        let pred = genn::apply(
297                            fnode,
298                            self.scope.clone(),
299                            fargs,
300                            &self.mftyp,
301                            self.top_id,
302                        );
303                        self.slots.push(Slot { id, pred, cur: None });
304                    }
305                    (Some(a), true)
306                }
307                None => (None, false),
308            };
309        if let Some(a) = up {
310            for (s, v) in self.slots.iter().zip(a.iter_values()) {
311                ctx.cached.insert(s.id, v.clone());
312                event.variables.insert(s.id, v);
313            }
314            self.cur = a.clone();
315            if a.len() == 0 {
316                return Some(T::Collection::project(a));
317            }
318        }
319        let init = event.init;
320        let mut up = resized;
321        for (i, s) in self.slots.iter_mut().enumerate() {
322            if i == slen {
323                // new nodes were added starting here
324                event.init = true;
325                if let Entry::Vacant(e) = event.variables.entry(self.predid)
326                    && let Some(v) = ctx.cached.get(&self.predid)
327                {
328                    e.insert(v.clone());
329                }
330            }
331            if let Some(v) = s.pred.update(ctx, event) {
332                s.cur = Some(v);
333                up = true;
334            }
335        }
336        event.init = init;
337        if up && self.slots.iter().all(|s| s.cur.is_some()) {
338            self.t.finish(&mut &self.slots, &self.cur)
339        } else {
340            None
341        }
342    }
343
344    fn typecheck(
345        &mut self,
346        ctx: &mut ExecCtx<R, E>,
347        _from: &mut [Node<R, E>],
348    ) -> anyhow::Result<()> {
349        let (_, node) =
350            genn::bind(ctx, &self.scope.lexical, "x", self.etyp.clone(), self.top_id);
351        let fargs = vec![node];
352        let ft = self.mftyp.clone();
353        let fnode = genn::reference(ctx, self.predid, Type::Fn(ft.clone()), self.top_id);
354        let mut node = genn::apply(fnode, self.scope.clone(), fargs, &ft, self.top_id);
355        let r = node.typecheck(ctx);
356        node.delete(ctx);
357        r
358    }
359
360    fn refs(&self, refs: &mut Refs) {
361        for s in &self.slots {
362            s.pred.refs(refs)
363        }
364    }
365
366    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
367        ctx.cached.remove(&self.predid);
368        for sl in &mut self.slots {
369            sl.delete(ctx)
370        }
371    }
372
373    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
374        self.cur = Default::default();
375        for sl in &mut self.slots {
376            sl.cur = None;
377            sl.pred.sleep(ctx);
378        }
379    }
380}
381
382pub trait FoldFn<R: Rt, E: UserEvent>: Debug + Send + Sync + 'static {
383    type Collection: MapCollection;
384
385    const NAME: &str;
386    const TYP: LazyLock<FnType>;
387}
388
389#[derive(Debug)]
390pub struct FoldQ<R: Rt, E: UserEvent, T: FoldFn<R, E>> {
391    top_id: ExprId,
392    fid: BindId,
393    scope: Scope,
394    binds: Vec<BindId>,
395    nodes: Vec<Node<R, E>>,
396    inits: Vec<Option<Value>>,
397    initids: Vec<BindId>,
398    initid: BindId,
399    mftype: TArc<FnType>,
400    etyp: Type,
401    ityp: Type,
402    init: Option<Value>,
403    t: PhantomData<T>,
404}
405
406impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> BuiltIn<R, E> for FoldQ<R, E, T> {
407    const NAME: &str = T::NAME;
408    const TYP: LazyLock<FnType> = T::TYP;
409
410    fn init(_: &mut ExecCtx<R, E>) -> BuiltInInitFn<R, E> {
411        Arc::new(|_ctx, typ, scope, from, top_id| match from {
412            [_, _, _] => Ok(Box::new(Self {
413                top_id,
414                scope: scope.clone(),
415                binds: vec![],
416                nodes: vec![],
417                inits: vec![],
418                initids: vec![],
419                initid: BindId::new(),
420                fid: BindId::new(),
421                etyp: T::Collection::etyp(typ)?,
422                ityp: typ.args[1].typ.clone(),
423                mftype: match &typ.args[2].typ {
424                    Type::Fn(ft) => ft.clone(),
425                    t => bail!("expected a function not {t}"),
426                },
427                init: None,
428                t: PhantomData,
429            })),
430            _ => bail!("expected three arguments"),
431        })
432    }
433}
434
435impl<R: Rt, E: UserEvent, T: FoldFn<R, E>> Apply<R, E> for FoldQ<R, E, T> {
436    fn update(
437        &mut self,
438        ctx: &mut ExecCtx<R, E>,
439        from: &mut [Node<R, E>],
440        event: &mut Event<E>,
441    ) -> Option<Value> {
442        let init = match from[0].update(ctx, event).and_then(|v| T::Collection::select(v))
443        {
444            None => self.nodes.len(),
445            Some(a) if a.len() == self.binds.len() => {
446                for (id, v) in self.binds.iter().zip(a.iter_values()) {
447                    ctx.cached.insert(*id, v.clone());
448                    event.variables.insert(*id, v.clone());
449                }
450                self.nodes.len()
451            }
452            Some(a) => {
453                let vals = a.iter_values().collect::<LPooled<Vec<Value>>>();
454                while self.binds.len() < a.len() {
455                    self.binds.push(BindId::new());
456                    self.inits.push(None);
457                    self.initids.push(BindId::new());
458                }
459                while a.len() < self.binds.len() {
460                    if let Some(id) = self.binds.pop() {
461                        ctx.cached.remove(&id);
462                    }
463                    if let Some(id) = self.initids.pop() {
464                        ctx.cached.remove(&id);
465                    }
466                    self.inits.pop();
467                    if let Some(mut n) = self.nodes.pop() {
468                        n.delete(ctx);
469                    }
470                }
471                let init = self.nodes.len();
472                for i in 0..self.binds.len() {
473                    ctx.cached.insert(self.binds[i], vals[i].clone());
474                    event.variables.insert(self.binds[i], vals[i].clone());
475                    if i >= self.nodes.len() {
476                        let n = genn::reference(
477                            ctx,
478                            if i == 0 { self.initid } else { self.initids[i - 1] },
479                            self.ityp.clone(),
480                            self.top_id,
481                        );
482                        let x = genn::reference(
483                            ctx,
484                            self.binds[i],
485                            self.etyp.clone(),
486                            self.top_id,
487                        );
488                        let fnode = genn::reference(
489                            ctx,
490                            self.fid,
491                            Type::Fn(self.mftype.clone()),
492                            self.top_id,
493                        );
494                        let node = genn::apply(
495                            fnode,
496                            self.scope.clone(),
497                            vec![n, x],
498                            &self.mftype,
499                            self.top_id,
500                        );
501                        self.nodes.push(node);
502                    }
503                }
504                init
505            }
506        };
507        if let Some(v) = from[1].update(ctx, event) {
508            ctx.cached.insert(self.initid, v.clone());
509            event.variables.insert(self.initid, v.clone());
510            self.init = Some(v);
511        }
512        if let Some(v) = from[2].update(ctx, event) {
513            ctx.cached.insert(self.fid, v.clone());
514            event.variables.insert(self.fid, v);
515        }
516        let old_init = event.init;
517        for i in 0..self.nodes.len() {
518            if i == init {
519                event.init = true;
520                if let Some(v) = ctx.cached.get(&self.fid)
521                    && let Entry::Vacant(e) = event.variables.entry(self.fid)
522                {
523                    e.insert(v.clone());
524                }
525                if i == 0 {
526                    if let Some(v) = self.init.as_ref()
527                        && let Entry::Vacant(e) = event.variables.entry(self.initid)
528                    {
529                        e.insert(v.clone());
530                    }
531                } else {
532                    if let Some(v) = self.inits[i - 1].clone() {
533                        event.variables.insert(self.initids[i - 1], v);
534                    }
535                }
536            }
537            match self.nodes[i].update(ctx, event) {
538                Some(v) => {
539                    ctx.cached.insert(self.initids[i], v.clone());
540                    event.variables.insert(self.initids[i], v.clone());
541                    self.inits[i] = Some(v);
542                }
543                None => {
544                    ctx.cached.remove(&self.initids[i]);
545                    event.variables.remove(&self.initids[i]);
546                    self.inits[i] = None;
547                }
548            }
549        }
550        event.init = old_init;
551        self.inits.last().and_then(|v| v.clone())
552    }
553
554    fn typecheck(
555        &mut self,
556        ctx: &mut ExecCtx<R, E>,
557        _from: &mut [Node<R, E>],
558    ) -> anyhow::Result<()> {
559        let mut n = genn::reference(ctx, self.initid, self.ityp.clone(), self.top_id);
560        let x = genn::reference(ctx, BindId::new(), self.etyp.clone(), self.top_id);
561        let fnode =
562            genn::reference(ctx, self.fid, Type::Fn(self.mftype.clone()), self.top_id);
563        n = genn::apply(fnode, self.scope.clone(), vec![n, x], &self.mftype, self.top_id);
564        let r = n.typecheck(ctx);
565        n.delete(ctx);
566        r
567    }
568
569    fn refs(&self, refs: &mut Refs) {
570        for n in &self.nodes {
571            n.refs(refs)
572        }
573    }
574
575    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
576        let i =
577            iter::once(&self.initid).chain(self.binds.iter()).chain(self.initids.iter());
578        for id in i {
579            ctx.cached.remove(id);
580        }
581        for n in &mut self.nodes {
582            n.delete(ctx);
583        }
584    }
585
586    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
587        self.init = None;
588        for v in &mut self.inits {
589            *v = None
590        }
591        for n in &mut self.nodes {
592            n.sleep(ctx)
593        }
594    }
595}
596
597#[bitflags]
598#[derive(Clone, Copy)]
599#[repr(u64)]
600pub enum Module {
601    Array,
602    Map,
603    NetAndTime,
604    Rand,
605    Re,
606    Str,
607}
608
609/// Register selected modules of the standard graphix library
610///
611/// and return a root module that will load them along with a module resolver
612/// that contains the necessary code. You need both of these for the `rt`
613/// module.
614///
615/// Note, core is always included and registered, all the other
616/// modules are optional
617///
618/// # Example
619///
620/// ```no_run
621/// use netidx::{publisher::Publisher, subscriber::Subscriber};
622/// use anyhow::Result;
623/// use poolshark::global::GPooled;
624/// use graphix_compiler::ExecCtx;
625/// use graphix_rt::{GXRt, GXConfigBuilder, GXHandle, GXEvent, NoExt};
626/// use tokio::sync::mpsc;
627/// use enumflags2::BitFlags;
628///
629/// async fn start_runtime(
630///     publisher: Publisher,
631///     subscriber: Subscriber,
632///     sub: mpsc::Sender<GPooled<Vec<GXEvent<NoExt>>>>
633/// ) -> Result<GXHandle<NoExt>> {
634///     let mut ctx = ExecCtx::new(GXRt::<NoExt>::new(publisher, subscriber));
635///     let (root, mods) = graphix_stdlib::register(&mut ctx, BitFlags::all())?;
636///     GXConfigBuilder::default()
637///        .ctx(ctx)
638///        .root(root)
639///        .resolvers(vec![mods])
640///        .sub(sub)
641///        .build()?
642///        .start()
643///        .await
644/// }
645/// ```
646pub fn register<R: Rt, E: UserEvent>(
647    ctx: &mut ExecCtx<R, E>,
648    modules: BitFlags<Module>,
649) -> Result<(ArcStr, ModuleResolver)> {
650    let mut tbl = FxHashMap::default();
651    tbl.insert(Path::from("/core"), core::register(ctx)?);
652    let mut root = String::from("pub mod core;\nuse core;\n");
653    for module in modules {
654        match module {
655            Module::Array => {
656                root.push_str("pub mod array;\n");
657                tbl.insert(Path::from("/array"), array::register(ctx)?);
658            }
659            Module::Map => {
660                root.push_str("pub mod map;\n");
661                tbl.insert(Path::from("/map"), map::register(ctx)?);
662            }
663            Module::NetAndTime => {
664                root.push_str("pub mod time;\n");
665                tbl.insert(Path::from("/time"), time::register(ctx)?);
666                root.push_str("pub mod net;\n");
667                tbl.insert(Path::from("/net"), net::register(ctx)?);
668            }
669            Module::Rand => {
670                root.push_str("pub mod rand;\n");
671                tbl.insert(Path::from("/rand"), rand::register(ctx)?);
672            }
673            Module::Re => {
674                root.push_str("pub mod re;\n");
675                tbl.insert(Path::from("/re"), re::register(ctx)?);
676            }
677            Module::Str => {
678                root.push_str("pub mod str;\n");
679                tbl.insert(Path::from("/str"), str::register(ctx)?);
680            }
681        }
682    }
683    root.pop();
684    root.pop();
685    Ok((ArcStr::from(root), ModuleResolver::VFS(tbl)))
686}