graphix_rt/
lib.rs

//! A general purpose graphix runtime
//!
//! This module implements a generic graphix runtime suitable for most
//! applications, including applications that implement custom graphix
//! builtins. The graphix interpreter is run in a background task, and
//! can be interacted with via a handle. All features of the standard
//! library are supported by this runtime.
8use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use derive_builder::Builder;
11use enumflags2::BitFlags;
12use fxhash::FxHashSet;
13use graphix_compiler::{
14    env::Env,
15    expr::{ExprId, ModPath, ModuleResolver, Source},
16    typ::{FnType, Type},
17    BindId, CFlag, Event, ExecCtx, NoUserEvent, Scope, UserEvent,
18};
19use log::error;
20use netidx::{
21    protocol::valarray::ValArray,
22    publisher::{Value, WriteRequest},
23    subscriber::{self, SubId},
24};
25use netidx_core::atomic_id;
26use netidx_value::FromValue;
27use poolshark::global::GPooled;
28use serde_derive::{Deserialize, Serialize};
29use smallvec::SmallVec;
30use std::{fmt, future, sync::Arc, time::Duration};
31use tokio::{
32    sync::{
33        mpsc::{self as tmpsc},
34        oneshot,
35    },
36    task::{self, JoinHandle},
37};
38
39mod gx;
40mod rt;
41use gx::GX;
42pub use rt::GXRt;
43
/// Trait to extend the event loop
///
/// The Graphix event loop has two steps,
/// - update event sources, polls external async event sources like
///   netidx, sockets, files, etc
/// - do cycle, collects all the events and delivers them to the dataflow
///   graph as a batch of "everything that happened"
///
/// As such to extend the event loop you must implement two things. A function
/// to poll your own external event sources, and a function to take the events
/// you got from those sources and represent them to the dataflow graph. You
/// represent them either by setting generic variables (bindid -> value map), or
/// by setting some custom structures that you define as part of your UserEvent
/// implementation.
///
/// Your Graphix builtins can access both your custom structure, to register new
/// event sources, etc, and your custom user event structure, to receive events
/// whose types do not fit nicely as `Value`. If your event payload does fit
/// nicely as a `Value`, then just use a variable.
pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
    /// The custom event payload delivered to the dataflow graph alongside
    /// the generic variable updates.
    type UserEvent: UserEvent + Send + Sync + 'static;

    /// Update your custom event sources
    ///
    /// Your `update_sources` MUST be cancel safe.
    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;

    /// Collect the events that happened and marshal them into `event` for
    /// delivery to the dataflow graph.
    ///
    /// `do_cycle` will be called, and a batch of events delivered to the
    /// graph, until `is_ready` returns false. It is possible that a call to
    /// `update_sources` will result in multiple calls to `do_cycle`, but it
    /// is not guaranteed that `update_sources` will not be called again
    /// before `is_ready` returns false.
    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;

    /// Return true if there are events ready to deliver
    fn is_ready(&self) -> bool;

    /// Clear the state
    fn clear(&mut self);

    /// Create and return an empty custom event structure
    fn empty_event(&mut self) -> Self::UserEvent;
}
90
91#[derive(Debug, Default)]
92pub struct NoExt;
93
94impl GXExt for NoExt {
95    type UserEvent = NoUserEvent;
96
97    async fn update_sources(&mut self) -> Result<()> {
98        future::pending().await
99    }
100
101    fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
102        Ok(())
103    }
104
105    fn is_ready(&self) -> bool {
106        false
107    }
108
109    fn clear(&mut self) {}
110
111    fn empty_event(&mut self) -> Self::UserEvent {
112        NoUserEvent
113    }
114}
115
// A pooled vector of netidx subscriber events, each paired with a `SubId`.
// NOTE(review): not referenced in the visible part of this file; presumably
// used by the `gx`/`rt` submodules — confirm.
type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
// A pooled vector of netidx publisher write requests.
// NOTE(review): same caveat as `UpdateBatch` regarding where it is used.
type WriteBatch = GPooled<Vec<WriteRequest>>;
118
119#[derive(Debug)]
120pub struct CompExp<X: GXExt> {
121    pub id: ExprId,
122    pub typ: Type,
123    pub output: bool,
124    rt: GXHandle<X>,
125}
126
127impl<X: GXExt> Drop for CompExp<X> {
128    fn drop(&mut self) {
129        let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
130    }
131}
132
/// The result of compiling or loading graphix code
///
/// Returned by `GXHandle::compile` and `GXHandle::load`.
#[derive(Debug)]
pub struct CompRes<X: GXExt> {
    /// The compiled expressions. Dropping a `CompExp` asks the runtime to
    /// delete the corresponding expression.
    pub exprs: SmallVec<[CompExp<X>; 1]>,
    /// The environment returned together with the compiled expressions
    pub env: Env<GXRt<X>, X::UserEvent>,
}
138
139pub struct Ref<X: GXExt> {
140    pub id: ExprId,
141    // the most recent value of the variable
142    pub last: Option<Value>,
143    pub bid: BindId,
144    pub target_bid: Option<BindId>,
145    pub typ: Type,
146    rt: GXHandle<X>,
147}
148
149impl<X: GXExt> Drop for Ref<X> {
150    fn drop(&mut self) {
151        let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
152    }
153}
154
155impl<X: GXExt> Ref<X> {
156    /// set the value of the ref `r <-`
157    ///
158    /// This will cause all nodes dependent on this id to update. This is the
159    /// same thing as the `<-` operator in Graphix. This does the same thing as
160    /// `GXHandle::set`
161    pub fn set<T: Into<Value>>(&mut self, v: T) -> Result<()> {
162        let v = v.into();
163        self.last = Some(v.clone());
164        self.rt.set(self.bid, v)
165    }
166
167    /// set the value pointed to by ref `*r <-`
168    ///
169    /// This will cause all nodes dependent on *id to update. This is the same
170    /// as the `*r <-` operator in Graphix. This does the same thing as
171    /// `GXHandle::set` using the target id.
172    pub fn set_deref<T: Into<Value>>(&mut self, v: T) -> Result<()> {
173        if let Some(id) = self.target_bid {
174            self.rt.set(id, v)?
175        }
176        Ok(())
177    }
178
179    /// Process an update
180    ///
181    /// If the expr id refers to this ref, then set `last` to `v` and return a
182    /// mutable reference to `last`, otherwise return None. This will also
183    /// update `last` if the id matches.
184    pub fn update(&mut self, id: ExprId, v: &Value) -> Option<&mut Value> {
185        if self.id == id {
186            self.last = Some(v.clone());
187            self.last.as_mut()
188        } else {
189            None
190        }
191    }
192}
193
/// A typed wrapper around a `Ref`
///
/// Values are converted to `T` as they arrive, see `TRef::update`.
pub struct TRef<X: GXExt, T: FromValue> {
    /// The underlying untyped reference
    pub r: Ref<X>,
    /// The most recent successfully converted value, if any
    pub t: Option<T>,
}
198
199impl<X: GXExt, T: FromValue> TRef<X, T> {
200    /// Create a new typed reference from `r`
201    ///
202    /// If conversion of `r` fails, return an error.
203    pub fn new(mut r: Ref<X>) -> Result<Self> {
204        let t = r.last.take().map(|v| v.cast_to()).transpose()?;
205        Ok(TRef { r, t })
206    }
207
208    /// Process an update
209    ///
210    /// If the expr id refers to this tref, then convert the value into a `T`
211    /// update `t` and return a mutable reference to the new `T`, otherwise
212    /// return None. Return an Error if the conversion failed.
213    pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
214        if self.r.id == id {
215            let v = v.clone().cast_to()?;
216            self.t = Some(v);
217            Ok(self.t.as_mut())
218        } else {
219            Ok(None)
220        }
221    }
222}
223
224impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
225    /// set the value of the tref `r <-`
226    ///
227    /// This will cause all nodes dependent on this id to update. This is the
228    /// same thing as the `<-` operator in Graphix. This does the same thing as
229    /// `GXHandle::set`
230    pub fn set(&mut self, t: T) -> Result<()> {
231        self.t = Some(t.clone());
232        self.r.set(t)
233    }
234
235    /// set the value pointed to by tref `*r <-`
236    ///
237    /// This will cause all nodes dependent on *id to update. This is the same
238    /// as the `*r <-` operator in Graphix. This does the same thing as
239    /// `GXHandle::set` using the target id.
240    pub fn set_deref(&mut self, t: T) -> Result<()> {
241        self.t = Some(t.clone());
242        self.r.set_deref(t.into())
243    }
244}
245
246atomic_id!(CallableId);
247
248pub struct Callable<X: GXExt> {
249    rt: GXHandle<X>,
250    id: CallableId,
251    env: Env<GXRt<X>, X::UserEvent>,
252    pub typ: FnType,
253    pub expr: ExprId,
254}
255
256impl<X: GXExt> Drop for Callable<X> {
257    fn drop(&mut self) {
258        let _ = self.rt.0.tx.send(ToGX::DeleteCallable { id: self.id });
259    }
260}
261
262impl<X: GXExt> Callable<X> {
263    /// Call the lambda with args
264    ///
265    /// Argument types and arity will be checked and an error will be returned
266    /// if they are wrong. If you call the function more than once before it
267    /// returns there is no guarantee that the returns will arrive in the order
268    /// of the calls. There is no guarantee that a function must return.
269    pub async fn call(&self, args: ValArray) -> Result<()> {
270        if self.typ.args.len() != args.len() {
271            bail!("expected {} args", self.typ.args.len())
272        }
273        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
274            if !a.typ.is_a(&self.env, v) {
275                bail!("type mismatch arg {i} expected {}", a.typ)
276            }
277        }
278        self.call_unchecked(args).await
279    }
280
281    /// Call the lambda with args. Argument types and arity will NOT
282    /// be checked. This can result in a runtime panic, invalid
283    /// results, and probably other bad things.
284    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
285        self.rt
286            .0
287            .tx
288            .send(ToGX::Call { id: self.id, args })
289            .map_err(|_| anyhow!("runtime is dead"))
290    }
291
292    /// Return Some(v) if this update is the return value of the callable
293    pub fn update<'a>(&self, id: ExprId, v: &'a Value) -> Option<&'a Value> {
294        if self.expr == id {
295            Some(v)
296        } else {
297            None
298        }
299    }
300}
301
// A call made on a `NamedCallable` before its function was resolved. Each
// variant holds the arguments and the channel used to report the result of
// the call once `NamedCallable::update` replays it.
enum DeferredCall {
    // replayed with `Callable::call`
    Call(ValArray, oneshot::Sender<Result<()>>),
    // replayed with `Callable::call_unchecked`
    CallUnchecked(ValArray, oneshot::Sender<Result<()>>),
}
306
/// A callable interface to a late bound function referred to by name
///
/// Created by `GXHandle::compile_callable_by_name`.
pub struct NamedCallable<X: GXExt> {
    // the ref to the named function value
    fname: Ref<X>,
    // the callable compiled from the most recent value of `fname`, if any
    current: Option<Callable<X>>,
    // the expr ids of every callable compiled so far
    ids: FxHashSet<ExprId>,
    // calls made while `current` was None
    deferred: Vec<DeferredCall>,
    h: GXHandle<X>,
}
314
315impl<X: GXExt> NamedCallable<X> {
316    /// Update the named callable function
317    ///
318    /// This method does two things,
319    /// - Handle late binding. When the name ref updates to an actual function
320    ///   compile the real call site
321    /// - Return Ok(Some(v)) when the called function returns
322    pub async fn update<'a>(
323        &mut self,
324        id: ExprId,
325        v: &'a Value,
326    ) -> Result<Option<&'a Value>> {
327        match self.fname.update(id, v) {
328            Some(v) => {
329                let callable = self.h.compile_callable(v.clone()).await?;
330                self.ids.insert(callable.expr);
331                for dc in self.deferred.drain(..) {
332                    match dc {
333                        DeferredCall::Call(args, reply) => {
334                            let _ = reply.send(callable.call(args).await);
335                        }
336                        DeferredCall::CallUnchecked(args, reply) => {
337                            let _ = reply.send(callable.call_unchecked(args).await);
338                        }
339                    }
340                }
341                self.current = Some(callable);
342                Ok(None)
343            }
344            None if self.ids.contains(&id) => Ok(Some(v)),
345            None => Ok(None),
346        }
347    }
348
349    /// Call the lambda with args
350    ///
351    /// Argument types and arity will be checked and an error will be returned
352    /// if they are wrong. If you call the function more than once before it
353    /// returns there is no guarantee that the returns will arrive in the order
354    /// of the calls. There is no guarantee that a function must return. In
355    /// order to handle late binding you must keep calling `update` while
356    /// waiting for this method.
357    ///
358    /// While a late bound function is unresolved calls will queue internally in
359    /// the NamedCallsite and will happen when the function is resolved.
360    pub async fn call(&mut self, args: ValArray) -> Result<()> {
361        match &self.current {
362            Some(c) => c.call(args).await,
363            None => {
364                let (tx, rx) = oneshot::channel();
365                self.deferred.push(DeferredCall::Call(args, tx));
366                rx.await?
367            }
368        }
369    }
370
371    /// call the function with the specified args
372    ///
373    /// Argument types and arity will NOT be checked by this method. If you call
374    /// the function more than once before it returns there is no guarantee that
375    /// the returns will arrive in the order of the calls. There is no guarantee
376    /// that a function must return. In order to handle late binding you must
377    /// keep calling `update` while waiting for this method.
378    ///
379    /// While a late bound function is unresolved calls will queue internally in
380    /// the NamedCallsite and will happen when the function is resolved.
381    pub async fn call_unchecked(&mut self, args: ValArray) -> Result<()> {
382        match &self.current {
383            Some(c) => c.call(args).await,
384            None => {
385                let (tx, rx) = oneshot::channel();
386                self.deferred.push(DeferredCall::CallUnchecked(args, tx));
387                rx.await?
388            }
389        }
390    }
391}
392
// The requests a `GXHandle` (and the objects it hands out) can send to the
// runtime task. Variants carrying a `res` sender expect a reply on it.
enum ToGX<X: GXExt> {
    // sent by `GXHandle::get_env`
    GetEnv {
        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
    },
    // sent when a `CompExp` or a `Ref` is dropped
    Delete {
        id: ExprId,
    },
    // sent by `GXHandle::load`
    Load {
        path: Source,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    // sent by `GXHandle::check`
    Check {
        path: Source,
        res: oneshot::Sender<Result<()>>,
    },
    // sent by `GXHandle::compile`
    Compile {
        text: ArcStr,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    // sent by `GXHandle::compile_callable`
    CompileCallable {
        id: Value,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Callable<X>>>,
    },
    // sent by `GXHandle::compile_ref`
    CompileRef {
        id: BindId,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Ref<X>>>,
    },
    // sent by `GXHandle::set`
    Set {
        id: BindId,
        v: Value,
    },
    // sent by `Callable::call_unchecked`
    Call {
        id: CallableId,
        args: ValArray,
    },
    // sent when a `Callable` is dropped
    DeleteCallable {
        id: CallableId,
    },
}
436
/// An event sent by the runtime to the channel configured in `GXConfig`
#[derive(Debug, Clone)]
pub enum GXEvent<X: GXExt> {
    /// A value tagged with an expression id. These are the `(id, v)` pairs
    /// that `Ref::update`, `TRef::update`, `Callable::update`, and
    /// `NamedCallable::update` take.
    Updated(ExprId, Value),
    /// An environment.
    /// NOTE(review): presumably sent when the runtime environment changes —
    /// confirm against the runtime.
    Env(Env<GXRt<X>, X::UserEvent>),
}
442
443struct GXHandleInner<X: GXExt> {
444    tx: tmpsc::UnboundedSender<ToGX<X>>,
445    task: JoinHandle<()>,
446}
447
448impl<X: GXExt> Drop for GXHandleInner<X> {
449    fn drop(&mut self) {
450        self.task.abort()
451    }
452}
453
454/// A handle to a running GX instance.
455///
456/// Drop the handle to shutdown the associated background tasks.
457pub struct GXHandle<X: GXExt>(Arc<GXHandleInner<X>>);
458
459impl<X: GXExt> fmt::Debug for GXHandle<X> {
460    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
461        write!(f, "GXHandle")
462    }
463}
464
465impl<X: GXExt> Clone for GXHandle<X> {
466    fn clone(&self) -> Self {
467        Self(self.0.clone())
468    }
469}
470
471impl<X: GXExt> GXHandle<X> {
472    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
473        let (tx, rx) = oneshot::channel();
474        self.0.tx.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
475        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
476    }
477
478    /// Get a copy of the current graphix environment
479    pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
480        self.exec(|res| ToGX::GetEnv { res }).await
481    }
482
483    /// Check that a graphix module compiles
484    ///
485    /// If path startes with `netidx:` then the module will be loaded
486    /// from netidx, otherwise it will be loaded from the
487    /// filesystem. If the file compiles successfully return Ok(())
488    /// otherwise an error describing the problem. The environment
489    /// will not be altered by checking an expression, so you will not
490    /// be able to use any defined names later in the program. If you
491    /// want to do that see `compile`.
492    pub async fn check(&self, path: Source) -> Result<()> {
493        Ok(self.exec(|tx| ToGX::Check { path, res: tx }).await??)
494    }
495
496    /// Compile and execute a graphix expression
497    ///
498    /// If it generates results, they will be sent to all the channels that are
499    /// subscribed. When the `CompExp` objects contained in the `CompRes` are
500    /// dropped their corresponding expressions will be deleted. Therefore, you
501    /// can stop execution of the whole expression by dropping the returned
502    /// `CompRes`.
503    pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
504        Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
505    }
506
507    /// Load and execute a file or netidx value
508    ///
509    /// When the `CompExp` objects contained in the `CompRes` are
510    /// dropped their corresponding expressions will be
511    /// deleted. Therefore, you can stop execution of the whole file
512    /// by dropping the returned `CompRes`.
513    pub async fn load(&self, path: Source) -> Result<CompRes<X>> {
514        Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
515    }
516
517    /// Compile a callable interface to a lambda id
518    ///
519    /// This is how you call a lambda directly from rust. When the returned
520    /// `Callable` is dropped the associated callsite will be delete.
521    pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
522        Ok(self
523            .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
524            .await??)
525    }
526
527    /// Compile a callable interface to a late bound function by name
528    ///
529    /// This allows you to call a function by name. Because of late binding it
530    /// has some additional complexity (though less than implementing it
531    /// yourself). You must call `update` on `NamedCallable` when you recieve
532    /// updates from the runtime in order to drive late binding. `update` will
533    /// also return `Some` when one of your function calls returns.
534    pub async fn compile_callable_by_name(
535        &self,
536        env: &Env<GXRt<X>, X::UserEvent>,
537        scope: &Scope,
538        name: &ModPath,
539    ) -> Result<NamedCallable<X>> {
540        let r = self.compile_ref_by_name(env, scope, name).await?;
541        match &r.typ {
542            Type::Fn(_) => (),
543            t => bail!(
544                "{name} in scope {} has type {t}. expected a function",
545                scope.lexical
546            ),
547        }
548        Ok(NamedCallable {
549            fname: r,
550            current: None,
551            ids: FxHashSet::default(),
552            deferred: vec![],
553            h: self.clone(),
554        })
555    }
556
557    /// Compile a ref to a bind id
558    ///
559    /// This will NOT return an error if the id isn't in the environment.
560    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
561        Ok(self
562            .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
563            .await??)
564    }
565
566    /// Compile a ref to a name
567    ///
568    /// Return an error if the name does not exist in the environment
569    pub async fn compile_ref_by_name(
570        &self,
571        env: &Env<GXRt<X>, X::UserEvent>,
572        scope: &Scope,
573        name: &ModPath,
574    ) -> Result<Ref<X>> {
575        let id = env
576            .lookup_bind(&scope.lexical, name)
577            .ok_or_else(|| anyhow!("no such value {name} in scope {}", scope.lexical))?
578            .1
579            .id;
580        self.compile_ref(id).await
581    }
582
583    /// Set the variable idenfified by `id` to `v`
584    ///
585    /// triggering updates of all dependent node trees. This does the same thing
586    /// as`Ref::set` and `TRef::set`
587    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
588        let v = v.into();
589        self.0.tx.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
590    }
591}
592
/// The configuration of a graphix runtime
///
/// Build one with `GXConfig::builder`, which takes the two required fields,
/// and then call `start` on the result to launch the runtime.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig<X: GXExt> {
    /// The subscribe timeout to use when resolving modules in
    /// netidx. Resolution will fail if the subscription does not
    /// succeed before this timeout elapses.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// The publish timeout to use when sending published batches. Default None.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context with any builtins already registered
    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
    /// The text of the root module
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// The set of module resolvers to use when resolving loaded modules
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// The channel that will receive events from the runtime
    sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
    /// The set of compiler flags. Default empty.
    #[builder(default)]
    flags: BitFlags<CFlag>,
}
618
619impl<X: GXExt> GXConfig<X> {
620    /// Create a new config
621    pub fn builder(
622        ctx: ExecCtx<GXRt<X>, X::UserEvent>,
623        sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
624    ) -> GXConfigBuilder<X> {
625        GXConfigBuilder::default().ctx(ctx).sub(sub)
626    }
627
628    /// Start the graphix runtime with the specified config,
629    ///
630    /// return a handle capable of interacting with it. root is the text of the
631    /// root module you wish to initially load. This will define the environment
632    /// for the rest of the code compiled by this runtime. The runtime starts
633    /// completely empty, with only the language, no core library, no standard
634    /// library. To build a runtime with the full standard library and nothing
635    /// else simply pass the output of `graphix_stdlib::register` to start.
636    pub async fn start(self) -> Result<GXHandle<X>> {
637        let (init_tx, init_rx) = oneshot::channel();
638        let (tx, rx) = tmpsc::unbounded_channel();
639        let task = task::spawn(async move {
640            match GX::new(self).await {
641                Ok(bs) => {
642                    let _ = init_tx.send(Ok(()));
643                    if let Err(e) = bs.run(rx).await {
644                        error!("run loop exited with error {e:?}")
645                    }
646                }
647                Err(e) => {
648                    let _ = init_tx.send(Err(e));
649                }
650            };
651        });
652        init_rx.await??;
653        Ok(GXHandle(Arc::new(GXHandleInner { tx, task })))
654    }
655}