Skip to main content

graphix_rt/
lib.rs

1#![doc(
2    html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3    html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5//! A general purpose graphix runtime
6//!
7//! This module implements a generic graphix runtime suitable for most
8//! applications, including applications that implement custom graphix
//! builtins. The graphix interpreter is run in a background task, and
10//! can be interacted with via a handle. All features of the standard
11//! library are supported by this runtime.
12use anyhow::{anyhow, bail, Result};
13use arcstr::ArcStr;
14use derive_builder::Builder;
15use enumflags2::BitFlags;
16use fxhash::FxHashSet;
17use graphix_compiler::{
18    env::Env,
19    expr::{ExprId, ModPath, ModuleResolver, Source},
20    typ::{FnType, Type},
21    BindId, CFlag, Event, ExecCtx, NoUserEvent, Scope, UserEvent,
22};
23use log::error;
24use netidx::{
25    protocol::valarray::ValArray,
26    publisher::{Value, WriteRequest},
27    subscriber::{self, SubId},
28};
29use netidx_core::atomic_id;
30use netidx_value::FromValue;
31use poolshark::global::GPooled;
32use serde_derive::{Deserialize, Serialize};
33use smallvec::SmallVec;
34use std::{fmt, future, sync::Arc, time::Duration};
35use tokio::{
36    sync::{
37        mpsc::{self as tmpsc},
38        oneshot,
39    },
40    task::{self, JoinHandle},
41};
42
43mod gx;
44mod rt;
45use gx::GX;
46pub use rt::GXRt;
47
/// Trait to extend the event loop
///
/// The Graphix event loop has two steps,
/// - update event sources, polls external async event sources like
///   netidx, sockets, files, etc
/// - do cycle, collects all the events and delivers them to the dataflow
///   graph as a batch of "everything that happened"
///
/// As such, to extend the event loop you must implement two things: a function
/// to poll your own external event sources, and a function to take the events
/// you got from those sources and represent them to the dataflow graph. You
/// represent them either by setting generic variables (bindid -> value map), or
/// by setting some custom structures that you define as part of your UserEvent
/// implementation.
///
/// Your Graphix builtins can access both your custom structure, to register new
/// event sources, etc, and your custom user event structure, to receive events
/// whose types do not fit nicely as `Value`. If your event payload does fit
/// nicely as a `Value`, then just use a variable.
pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
    /// The custom event structure carried inside `Event` for this extension.
    type UserEvent: UserEvent + Send + Sync + 'static;

    /// Update your custom event sources
    ///
    /// Your `update_sources` MUST be cancel safe.
    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;

    /// Collect events that happened and marshal them into the event structure
    /// for delivery to the dataflow graph.
    ///
    /// `do_cycle` will be called, and a batch of events delivered to the
    /// graph, until `is_ready` returns false. It is possible that a call to
    /// `update_sources` will result in multiple calls to `do_cycle`, but it is
    /// not guaranteed that `update_sources` will not be called again before
    /// `is_ready` returns false.
    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;

    /// Return true if there are events ready to deliver
    fn is_ready(&self) -> bool;

    /// Clear the state
    fn clear(&mut self);

    /// Create and return an empty custom event structure
    fn empty_event(&mut self) -> Self::UserEvent;
}
94
95#[derive(Debug, Default)]
96pub struct NoExt;
97
98impl GXExt for NoExt {
99    type UserEvent = NoUserEvent;
100
101    async fn update_sources(&mut self) -> Result<()> {
102        future::pending().await
103    }
104
105    fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
106        Ok(())
107    }
108
109    fn is_ready(&self) -> bool {
110        false
111    }
112
113    fn clear(&mut self) {}
114
115    fn empty_event(&mut self) -> Self::UserEvent {
116        NoUserEvent
117    }
118}
119
/// A batch of subscriber events, each paired with the `SubId` it belongs to.
type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
/// A batch of publisher write requests.
type WriteBatch = GPooled<Vec<WriteRequest>>;
122
123#[derive(Debug)]
124pub struct CompExp<X: GXExt> {
125    pub id: ExprId,
126    pub typ: Type,
127    pub output: bool,
128    rt: GXHandle<X>,
129}
130
131impl<X: GXExt> Drop for CompExp<X> {
132    fn drop(&mut self) {
133        let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
134    }
135}
136
/// The result of a successful `compile` or `load`
#[derive(Debug)]
pub struct CompRes<X: GXExt> {
    /// The compiled expressions. Each one is deleted from the runtime when it
    /// is dropped.
    pub exprs: SmallVec<[CompExp<X>; 1]>,
    // NOTE(review): presumably the environment as of this compilation; confirm
    // against the runtime side that fills it in.
    pub env: Env,
}
142
143pub struct Ref<X: GXExt> {
144    pub id: ExprId,
145    // the most recent value of the variable
146    pub last: Option<Value>,
147    pub bid: BindId,
148    pub target_bid: Option<BindId>,
149    pub typ: Type,
150    rt: GXHandle<X>,
151}
152
153impl<X: GXExt> Drop for Ref<X> {
154    fn drop(&mut self) {
155        let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
156    }
157}
158
159impl<X: GXExt> Ref<X> {
160    /// set the value of the ref `r <-`
161    ///
162    /// This will cause all nodes dependent on this id to update. This is the
163    /// same thing as the `<-` operator in Graphix. This does the same thing as
164    /// `GXHandle::set`
165    pub fn set<T: Into<Value>>(&mut self, v: T) -> Result<()> {
166        let v = v.into();
167        self.last = Some(v.clone());
168        self.rt.set(self.bid, v)
169    }
170
171    /// set the value pointed to by ref `*r <-`
172    ///
173    /// This will cause all nodes dependent on *id to update. This is the same
174    /// as the `*r <-` operator in Graphix. This does the same thing as
175    /// `GXHandle::set` using the target id.
176    pub fn set_deref<T: Into<Value>>(&mut self, v: T) -> Result<()> {
177        if let Some(id) = self.target_bid {
178            self.rt.set(id, v)?
179        }
180        Ok(())
181    }
182
183    /// Process an update
184    ///
185    /// If the expr id refers to this ref, then set `last` to `v` and return a
186    /// mutable reference to `last`, otherwise return None. This will also
187    /// update `last` if the id matches.
188    pub fn update(&mut self, id: ExprId, v: &Value) -> Option<&mut Value> {
189        if self.id == id {
190            self.last = Some(v.clone());
191            self.last.as_mut()
192        } else {
193            None
194        }
195    }
196}
197
198pub struct TRef<X: GXExt, T: FromValue> {
199    pub r: Ref<X>,
200    pub t: Option<T>,
201}
202
203impl<X: GXExt, T: FromValue> TRef<X, T> {
204    /// Create a new typed reference from `r`
205    ///
206    /// If conversion of `r` fails, return an error.
207    pub fn new(mut r: Ref<X>) -> Result<Self> {
208        let t = r.last.take().map(|v| v.cast_to()).transpose()?;
209        Ok(TRef { r, t })
210    }
211
212    /// Process an update
213    ///
214    /// If the expr id refers to this tref, then convert the value into a `T`
215    /// update `t` and return a mutable reference to the new `T`, otherwise
216    /// return None. Return an Error if the conversion failed.
217    pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
218        if self.r.id == id {
219            let v = v.clone().cast_to()?;
220            self.t = Some(v);
221            Ok(self.t.as_mut())
222        } else {
223            Ok(None)
224        }
225    }
226}
227
228impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
229    /// set the value of the tref `r <-`
230    ///
231    /// This will cause all nodes dependent on this id to update. This is the
232    /// same thing as the `<-` operator in Graphix. This does the same thing as
233    /// `GXHandle::set`
234    pub fn set(&mut self, t: T) -> Result<()> {
235        self.t = Some(t.clone());
236        self.r.set(t)
237    }
238
239    /// set the value pointed to by tref `*r <-`
240    ///
241    /// This will cause all nodes dependent on *id to update. This is the same
242    /// as the `*r <-` operator in Graphix. This does the same thing as
243    /// `GXHandle::set` using the target id.
244    pub fn set_deref(&mut self, t: T) -> Result<()> {
245        self.t = Some(t.clone());
246        self.r.set_deref(t.into())
247    }
248}
249
250atomic_id!(CallableId);
251
252pub struct Callable<X: GXExt> {
253    rt: GXHandle<X>,
254    id: CallableId,
255    env: Env,
256    pub typ: FnType,
257    pub expr: ExprId,
258}
259
260impl<X: GXExt> Drop for Callable<X> {
261    fn drop(&mut self) {
262        let _ = self.rt.0.tx.send(ToGX::DeleteCallable { id: self.id });
263    }
264}
265
266impl<X: GXExt> Callable<X> {
267    /// Get the id of this callable
268    pub fn id(&self) -> CallableId {
269        self.id
270    }
271
272    /// Call the lambda with args
273    ///
274    /// Argument types and arity will be checked and an error will be returned
275    /// if they are wrong. If you call the function more than once before it
276    /// returns there is no guarantee that the returns will arrive in the order
277    /// of the calls. There is no guarantee that a function must return.
278    pub async fn call(&self, args: ValArray) -> Result<()> {
279        if self.typ.args.len() != args.len() {
280            bail!("expected {} args", self.typ.args.len())
281        }
282        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
283            if !a.typ.is_a(&self.env, v) {
284                bail!("type mismatch arg {i} expected {}", a.typ)
285            }
286        }
287        self.call_unchecked(args).await
288    }
289
290    /// Call the lambda with args. Argument types and arity will NOT
291    /// be checked. This can result in a runtime panic, invalid
292    /// results, and probably other bad things.
293    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
294        self.rt
295            .0
296            .tx
297            .send(ToGX::Call { id: self.id, args })
298            .map_err(|_| anyhow!("runtime is dead"))
299    }
300
301    /// Return Some(v) if this update is the return value of the callable
302    pub fn update<'a>(&self, id: ExprId, v: &'a Value) -> Option<&'a Value> {
303        if self.expr == id {
304            Some(v)
305        } else {
306            None
307        }
308    }
309}
310
/// A call made on a `NamedCallable` before its function was resolved.
///
/// Each variant carries the arguments and the channel on which the result of
/// issuing the call is reported once `NamedCallable::update` resolves the
/// function.
enum DeferredCall {
    /// Issue with `Callable::call` (arguments are checked)
    Call(ValArray, oneshot::Sender<Result<()>>),
    /// Issue with `Callable::call_unchecked` (arguments are not checked)
    CallUnchecked(ValArray, oneshot::Sender<Result<()>>),
}
315
/// A callable interface to a late bound function looked up by name.
///
/// Created by `GXHandle::compile_callable_by_name`.
pub struct NamedCallable<X: GXExt> {
    // ref to the variable holding the function; an update to it triggers
    // compilation of a new call site
    fname: Ref<X>,
    // the call site for the most recently resolved function, None until the
    // first resolution
    current: Option<Callable<X>>,
    // expression ids of every call site compiled so far, updates tagged with
    // one of these are reported as return values
    ids: FxHashSet<ExprId>,
    // calls made while `current` was None, flushed by `update`
    deferred: Vec<DeferredCall>,
    // handle used to compile call sites
    h: GXHandle<X>,
}
323
324impl<X: GXExt> NamedCallable<X> {
325    /// Update the named callable function
326    ///
327    /// This method does two things,
328    /// - Handle late binding. When the name ref updates to an actual function
329    ///   compile the real call site
330    /// - Return Ok(Some(v)) when the called function returns
331    pub async fn update<'a>(
332        &mut self,
333        id: ExprId,
334        v: &'a Value,
335    ) -> Result<Option<&'a Value>> {
336        match self.fname.update(id, v) {
337            Some(v) => {
338                let callable = self.h.compile_callable(v.clone()).await?;
339                self.ids.insert(callable.expr);
340                for dc in self.deferred.drain(..) {
341                    match dc {
342                        DeferredCall::Call(args, reply) => {
343                            let _ = reply.send(callable.call(args).await);
344                        }
345                        DeferredCall::CallUnchecked(args, reply) => {
346                            let _ = reply.send(callable.call_unchecked(args).await);
347                        }
348                    }
349                }
350                self.current = Some(callable);
351                Ok(None)
352            }
353            None if self.ids.contains(&id) => Ok(Some(v)),
354            None => Ok(None),
355        }
356    }
357
358    /// Call the lambda with args
359    ///
360    /// Argument types and arity will be checked and an error will be returned
361    /// if they are wrong. If you call the function more than once before it
362    /// returns there is no guarantee that the returns will arrive in the order
363    /// of the calls. There is no guarantee that a function must return. In
364    /// order to handle late binding you must keep calling `update` while
365    /// waiting for this method.
366    ///
367    /// While a late bound function is unresolved calls will queue internally in
368    /// the NamedCallsite and will happen when the function is resolved.
369    pub async fn call(&mut self, args: ValArray) -> Result<()> {
370        match &self.current {
371            Some(c) => c.call(args).await,
372            None => {
373                let (tx, rx) = oneshot::channel();
374                self.deferred.push(DeferredCall::Call(args, tx));
375                rx.await?
376            }
377        }
378    }
379
380    /// call the function with the specified args
381    ///
382    /// Argument types and arity will NOT be checked by this method. If you call
383    /// the function more than once before it returns there is no guarantee that
384    /// the returns will arrive in the order of the calls. There is no guarantee
385    /// that a function must return. In order to handle late binding you must
386    /// keep calling `update` while waiting for this method.
387    ///
388    /// While a late bound function is unresolved calls will queue internally in
389    /// the NamedCallsite and will happen when the function is resolved.
390    pub async fn call_unchecked(&mut self, args: ValArray) -> Result<()> {
391        match &self.current {
392            Some(c) => c.call(args).await,
393            None => {
394                let (tx, rx) = oneshot::channel();
395                self.deferred.push(DeferredCall::CallUnchecked(args, tx));
396                rx.await?
397            }
398        }
399    }
400}
401
/// Requests sent from handles (and from objects that hold a handle) to the
/// runtime task.
///
/// Variants with a `res` field carry a oneshot channel for the reply.
/// Variants with an `rt` field carry a clone of the handle that made the
/// request.
enum ToGX<X: GXExt> {
    /// Sent by `GXHandle::get_env`
    GetEnv {
        res: oneshot::Sender<Env>,
    },
    /// Sent when a `CompExp` or a `Ref` is dropped
    Delete {
        id: ExprId,
    },
    /// Sent by `GXHandle::load`
    Load {
        path: Source,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    /// Sent by `GXHandle::check`
    Check {
        path: Source,
        res: oneshot::Sender<Result<()>>,
    },
    /// Sent by `GXHandle::compile`
    Compile {
        text: ArcStr,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    /// Sent by `GXHandle::compile_callable`, `id` is the lambda id value
    CompileCallable {
        id: Value,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Callable<X>>>,
    },
    /// Sent by `GXHandle::compile_ref`
    CompileRef {
        id: BindId,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Ref<X>>>,
    },
    /// Sent by `GXHandle::set`, no reply
    Set {
        id: BindId,
        v: Value,
    },
    /// Sent by `GXHandle::call` and `Callable::call_unchecked`, no reply
    Call {
        id: CallableId,
        args: ValArray,
    },
    /// Sent when a `Callable` is dropped
    DeleteCallable {
        id: CallableId,
    },
}
445
/// An event sent by the runtime on the channel given to `GXConfig`
#[derive(Debug, Clone)]
pub enum GXEvent {
    // NOTE(review): presumably the expression with this id produced this
    // value; this is what the `update` methods of `Ref`, `TRef`, `Callable`
    // and `NamedCallable` take as arguments. Confirm against the run loop.
    Updated(ExprId, Value),
    // NOTE(review): presumably sent when the environment changes; confirm
    // against the run loop.
    Env(Env),
}
451
452struct GXHandleInner<X: GXExt> {
453    tx: tmpsc::UnboundedSender<ToGX<X>>,
454    task: JoinHandle<()>,
455}
456
457impl<X: GXExt> Drop for GXHandleInner<X> {
458    fn drop(&mut self) {
459        self.task.abort()
460    }
461}
462
463/// A handle to a running GX instance.
464///
465/// Drop the handle to shutdown the associated background tasks.
466pub struct GXHandle<X: GXExt>(Arc<GXHandleInner<X>>);
467
468impl<X: GXExt> fmt::Debug for GXHandle<X> {
469    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470        write!(f, "GXHandle")
471    }
472}
473
474impl<X: GXExt> Clone for GXHandle<X> {
475    fn clone(&self) -> Self {
476        Self(self.0.clone())
477    }
478}
479
480impl<X: GXExt> GXHandle<X> {
481    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
482        let (tx, rx) = oneshot::channel();
483        self.0.tx.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
484        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
485    }
486
487    /// Get a copy of the current graphix environment
488    pub async fn get_env(&self) -> Result<Env> {
489        self.exec(|res| ToGX::GetEnv { res }).await
490    }
491
492    /// Check that a graphix module compiles
493    ///
494    /// If path startes with `netidx:` then the module will be loaded
495    /// from netidx, otherwise it will be loaded from the
496    /// filesystem. If the file compiles successfully return Ok(())
497    /// otherwise an error describing the problem. The environment
498    /// will not be altered by checking an expression, so you will not
499    /// be able to use any defined names later in the program. If you
500    /// want to do that see `compile`.
501    pub async fn check(&self, path: Source) -> Result<()> {
502        Ok(self.exec(|tx| ToGX::Check { path, res: tx }).await??)
503    }
504
505    /// Compile and execute a graphix expression
506    ///
507    /// If it generates results, they will be sent to all the channels that are
508    /// subscribed. When the `CompExp` objects contained in the `CompRes` are
509    /// dropped their corresponding expressions will be deleted. Therefore, you
510    /// can stop execution of the whole expression by dropping the returned
511    /// `CompRes`.
512    pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
513        Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
514    }
515
516    /// Load and execute a file or netidx value
517    ///
518    /// When the `CompExp` objects contained in the `CompRes` are
519    /// dropped their corresponding expressions will be
520    /// deleted. Therefore, you can stop execution of the whole file
521    /// by dropping the returned `CompRes`.
522    pub async fn load(&self, path: Source) -> Result<CompRes<X>> {
523        Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
524    }
525
526    /// Compile a callable interface to a lambda id
527    ///
528    /// This is how you call a lambda directly from rust. When the returned
529    /// `Callable` is dropped the associated callsite will be delete.
530    pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
531        Ok(self
532            .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
533            .await??)
534    }
535
536    /// Compile a callable interface to a late bound function by name
537    ///
538    /// This allows you to call a function by name. Because of late binding it
539    /// has some additional complexity (though less than implementing it
540    /// yourself). You must call `update` on `NamedCallable` when you recieve
541    /// updates from the runtime in order to drive late binding. `update` will
542    /// also return `Some` when one of your function calls returns.
543    pub async fn compile_callable_by_name(
544        &self,
545        env: &Env,
546        scope: &Scope,
547        name: &ModPath,
548    ) -> Result<NamedCallable<X>> {
549        let r = self.compile_ref_by_name(env, scope, name).await?;
550        match &r.typ {
551            Type::Fn(_) => (),
552            t => bail!(
553                "{name} in scope {} has type {t}. expected a function",
554                scope.lexical
555            ),
556        }
557        Ok(NamedCallable {
558            fname: r,
559            current: None,
560            ids: FxHashSet::default(),
561            deferred: vec![],
562            h: self.clone(),
563        })
564    }
565
566    /// Compile a ref to a bind id
567    ///
568    /// This will NOT return an error if the id isn't in the environment.
569    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
570        Ok(self
571            .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
572            .await??)
573    }
574
575    /// Compile a ref to a name
576    ///
577    /// Return an error if the name does not exist in the environment
578    pub async fn compile_ref_by_name(
579        &self,
580        env: &Env,
581        scope: &Scope,
582        name: &ModPath,
583    ) -> Result<Ref<X>> {
584        let id = env
585            .lookup_bind(&scope.lexical, name)
586            .ok_or_else(|| anyhow!("no such value {name} in scope {}", scope.lexical))?
587            .1
588            .id;
589        self.compile_ref(id).await
590    }
591
592    /// Set the variable idenfified by `id` to `v`
593    ///
594    /// triggering updates of all dependent node trees. This does the same thing
595    /// as`Ref::set` and `TRef::set`
596    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
597        let v = v.into();
598        self.0.tx.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
599    }
600
601    /// Call a callable by id with the given arguments
602    ///
603    /// This is a fire-and-forget call that does not wait for the result.
604    /// Unlike `Callable::call`, no type or arity checking is performed.
605    pub fn call(&self, id: CallableId, args: ValArray) -> Result<()> {
606        self.0.tx.send(ToGX::Call { id, args }).map_err(|_| anyhow!("runtime is dead"))
607    }
608}
609
/// The configuration of a graphix runtime
///
/// Build one starting from `GXConfig::builder`, then call `start` to run it.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig<X: GXExt> {
    /// The subscribe timeout to use when resolving modules in
    /// netidx. Resolution will fail if the subscription does not
    /// succeed before this timeout elapses.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// The publish timeout to use when sending published batches. Default None.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context with any builtins already registered
    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
    /// The text of the root module
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// The set of module resolvers to use when resolving loaded modules
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// The channel that will receive events from the runtime
    sub: tmpsc::Sender<GPooled<Vec<GXEvent>>>,
    /// The set of compiler flags. Default empty.
    #[builder(default)]
    flags: BitFlags<CFlag>,
}
635
636impl<X: GXExt> GXConfig<X> {
637    /// Create a new config
638    pub fn builder(
639        ctx: ExecCtx<GXRt<X>, X::UserEvent>,
640        sub: tmpsc::Sender<GPooled<Vec<GXEvent>>>,
641    ) -> GXConfigBuilder<X> {
642        GXConfigBuilder::default().ctx(ctx).sub(sub)
643    }
644
645    /// Start the graphix runtime with the specified config,
646    ///
647    /// return a handle capable of interacting with it. root is the text of the
648    /// root module you wish to initially load. This will define the environment
649    /// for the rest of the code compiled by this runtime. The runtime starts
650    /// completely empty, with only the language, no core library, no standard
651    /// library. To build a runtime with the full standard library and nothing
652    /// else simply pass the output of `graphix_stdlib::register` to start.
653    pub async fn start(self) -> Result<GXHandle<X>> {
654        let (init_tx, init_rx) = oneshot::channel();
655        let (tx, rx) = tmpsc::unbounded_channel();
656        let task = task::spawn(async move {
657            match GX::new(self).await {
658                Ok(bs) => {
659                    let _ = init_tx.send(Ok(()));
660                    if let Err(e) = bs.run(rx).await {
661                        error!("run loop exited with error {e:?}")
662                    }
663                }
664                Err(e) => {
665                    let _ = init_tx.send(Err(e));
666                }
667            };
668        });
669        init_rx.await??;
670        Ok(GXHandle(Arc::new(GXHandleInner { tx, task })))
671    }
672}