graphix_rt/
lib.rs

1//! A general purpose graphix runtime
2//!
3//! This module implements a generic graphix runtime suitable for most
4//! applications, including applications that implement custom graphix
5//! builtins. The graphix interperter is run in a background task, and
6//! can be interacted with via a handle. All features of the standard
7//! library are supported by this runtime.
8use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use derive_builder::Builder;
11use enumflags2::BitFlags;
12use graphix_compiler::{
13    env::Env,
14    expr::{ExprId, ModuleResolver},
15    typ::{FnType, Type},
16    BindId, CFlag, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20    protocol::valarray::ValArray,
21    publisher::{Value, WriteRequest},
22    subscriber::{self, SubId},
23};
24use netidx_core::atomic_id;
25use netidx_value::FromValue;
26use poolshark::global::GPooled;
27use serde_derive::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use std::{fmt, future, path::PathBuf, time::Duration};
30use tokio::{
31    sync::{
32        mpsc::{self as tmpsc},
33        oneshot,
34    },
35    task,
36};
37
38mod gx;
39mod rt;
40use gx::GX;
41pub use rt::GXRt;
42
43/// Trait to extend the event loop
44///
45/// The Graphix event loop has two steps,
46/// - update event sources, polls external async event sources like
47///   netidx, sockets, files, etc
48/// - do cycle, collects all the events and delivers them to the dataflow
49///   graph as a batch of "everything that happened"
50///
51/// As such to extend the event loop you must implement two things. A function
52/// to poll your own external event sources, and a function to take the events
53/// you got from those sources and represent them to the dataflow graph. You
54/// represent them either by setting generic variables (bindid -> value map), or
55/// by setting some custom structures that you define as part of your UserEvent
56/// implementation.
57///
58/// Your Graphix builtins can access both your custom structure, to register new
59/// event sources, etc, and your custom user event structure, to receive events
60/// who's types do not fit nicely as `Value`. If your event payload does fit
61/// nicely as a `Value`, then just use a variable.
62pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
63    type UserEvent: UserEvent + Send + Sync + 'static;
64
65    /// Update your custom event sources
66    ///
67    /// Your `update_sources` MUST be cancel safe.
68    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
69
70    /// Collect events that happened and marshal them into the event structure
71    ///
72    /// for delivery to the dataflow graph. `do_cycle` will be called, and a
73    /// batch of events delivered to the graph until `is_ready` returns false.
74    /// It is possible that a call to `update_sources` will result in
75    /// multiple calls to `do_cycle`, but it is not guaranteed that
76    /// `update_sources` will not be called again before `is_ready`
77    /// returns false.
78    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
79
80    /// Return true if there are events ready to deliver
81    fn is_ready(&self) -> bool;
82
83    /// Clear the state
84    fn clear(&mut self);
85
86    /// Create and return an empty custom event structure
87    fn empty_event(&mut self) -> Self::UserEvent;
88}
89
90#[derive(Debug, Default)]
91pub struct NoExt;
92
93impl GXExt for NoExt {
94    type UserEvent = NoUserEvent;
95
96    async fn update_sources(&mut self) -> Result<()> {
97        future::pending().await
98    }
99
100    fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
101        Ok(())
102    }
103
104    fn is_ready(&self) -> bool {
105        false
106    }
107
108    fn clear(&mut self) {}
109
110    fn empty_event(&mut self) -> Self::UserEvent {
111        NoUserEvent
112    }
113}
114
115type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
116type WriteBatch = GPooled<Vec<WriteRequest>>;
117
118#[derive(Debug)]
119pub struct CompExp<X: GXExt> {
120    pub id: ExprId,
121    pub typ: Type,
122    pub output: bool,
123    rt: GXHandle<X>,
124}
125
126impl<X: GXExt> Drop for CompExp<X> {
127    fn drop(&mut self) {
128        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
129    }
130}
131
132#[derive(Debug)]
133pub struct CompRes<X: GXExt> {
134    pub exprs: SmallVec<[CompExp<X>; 1]>,
135    pub env: Env<GXRt<X>, X::UserEvent>,
136}
137
138pub struct Ref<X: GXExt> {
139    pub id: ExprId,
140    pub last: Option<Value>,
141    pub bid: BindId,
142    pub target_bid: Option<BindId>,
143    rt: GXHandle<X>,
144}
145
146impl<X: GXExt> Drop for Ref<X> {
147    fn drop(&mut self) {
148        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
149    }
150}
151
152impl<X: GXExt> Ref<X> {
153    pub fn set(&self, v: Value) -> Result<()> {
154        self.rt.set(self.bid, v)
155    }
156
157    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
158        if let Some(id) = self.target_bid {
159            self.rt.set(id, v)?
160        }
161        Ok(())
162    }
163}
164
165pub struct TRef<X: GXExt, T: FromValue> {
166    pub r: Ref<X>,
167    pub t: Option<T>,
168}
169
170impl<X: GXExt, T: FromValue> TRef<X, T> {
171    pub fn new(mut r: Ref<X>) -> Result<Self> {
172        let t = r.last.take().map(|v| v.cast_to()).transpose()?;
173        Ok(TRef { r, t })
174    }
175
176    pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
177        if self.r.id == id {
178            let v = v.clone().cast_to()?;
179            self.t = Some(v);
180            Ok(self.t.as_mut())
181        } else {
182            Ok(None)
183        }
184    }
185}
186
187impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
188    pub fn set(&mut self, t: T) -> Result<()> {
189        self.t = Some(t.clone());
190        self.r.set(t.into())
191    }
192
193    pub fn set_deref(&mut self, t: T) -> Result<()> {
194        self.t = Some(t.clone());
195        self.r.set_deref(t.into())
196    }
197}
198
199atomic_id!(CallableId);
200
201pub struct Callable<X: GXExt> {
202    rt: GXHandle<X>,
203    id: CallableId,
204    env: Env<GXRt<X>, X::UserEvent>,
205    pub typ: FnType,
206    pub expr: ExprId,
207}
208
209impl<X: GXExt> Drop for Callable<X> {
210    fn drop(&mut self) {
211        let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
212    }
213}
214
215impl<X: GXExt> Callable<X> {
216    /// Call the lambda with args. Argument types and arity will be
217    /// checked and an error will be returned if they are wrong.
218    pub async fn call(&self, args: ValArray) -> Result<()> {
219        if self.typ.args.len() != args.len() {
220            bail!("expected {} args", self.typ.args.len())
221        }
222        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
223            if !a.typ.is_a(&self.env, v) {
224                bail!("type mismatch arg {i} expected {}", a.typ)
225            }
226        }
227        self.call_unchecked(args).await
228    }
229
230    /// Call the lambda with args. Argument types and arity will NOT
231    /// be checked. This can result in a runtime panic, invalid
232    /// results, and probably other bad things.
233    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
234        self.rt
235            .0
236            .send(ToGX::Call { id: self.id, args })
237            .map_err(|_| anyhow!("runtime is dead"))
238    }
239}
240
241enum ToGX<X: GXExt> {
242    GetEnv {
243        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
244    },
245    Delete {
246        id: ExprId,
247    },
248    Load {
249        path: PathBuf,
250        rt: GXHandle<X>,
251        res: oneshot::Sender<Result<CompRes<X>>>,
252    },
253    Check {
254        path: PathBuf,
255        res: oneshot::Sender<Result<()>>,
256    },
257    Compile {
258        text: ArcStr,
259        rt: GXHandle<X>,
260        res: oneshot::Sender<Result<CompRes<X>>>,
261    },
262    CompileCallable {
263        id: Value,
264        rt: GXHandle<X>,
265        res: oneshot::Sender<Result<Callable<X>>>,
266    },
267    CompileRef {
268        id: BindId,
269        rt: GXHandle<X>,
270        res: oneshot::Sender<Result<Ref<X>>>,
271    },
272    Set {
273        id: BindId,
274        v: Value,
275    },
276    Call {
277        id: CallableId,
278        args: ValArray,
279    },
280    DeleteCallable {
281        id: CallableId,
282    },
283}
284
285#[derive(Debug, Clone)]
286pub enum GXEvent<X: GXExt> {
287    Updated(ExprId, Value),
288    Env(Env<GXRt<X>, X::UserEvent>),
289}
290
291/// A handle to a running GX instance.
292///
293/// Drop the handle to shutdown the associated background tasks.
294pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
295
296impl<X: GXExt> fmt::Debug for GXHandle<X> {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        write!(f, "GXHandle")
299    }
300}
301
302impl<X: GXExt> Clone for GXHandle<X> {
303    fn clone(&self) -> Self {
304        Self(self.0.clone())
305    }
306}
307
308impl<X: GXExt> GXHandle<X> {
309    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
310        let (tx, rx) = oneshot::channel();
311        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
312        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
313    }
314
315    /// Get a copy of the current graphix environment
316    pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
317        self.exec(|res| ToGX::GetEnv { res }).await
318    }
319
320    /// Check that the specified file compiles and typechecks.
321    ///
322    /// If the file will compile and type check successfully
323    /// return Ok(()) otherwise an error describing the problem. The
324    /// environment will not be altered by checking an expression, so
325    /// you will not be able to use any defined names later in the
326    /// program. If you want to do that see `compile`
327    pub async fn check(&self, path: PathBuf) -> Result<()> {
328        Ok(self.exec(|tx| ToGX::Check { path, res: tx }).await??)
329    }
330
331    /// Compile and execute the specified graphix expression.
332    ///
333    /// If it generates results, they will be sent to all the channels that are
334    /// subscribed. When the `CompExp` objects contained in the `CompRes` are
335    /// dropped their corresponding expressions will be deleted. Therefore, you
336    /// can stop execution of the whole expression by dropping the returned
337    /// `CompRes`.
338    pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
339        Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
340    }
341
342    /// Load and execute the specified graphix module.
343    ///
344    /// The path may have one of two forms. If it is the path to a file with
345    /// extension .bs then the rt will load the file directly. If it is a
346    /// modpath (e.g. foo::bar::baz) then the module resolver will look for a
347    /// matching module in the modpath. When the `CompExp` objects contained in
348    /// the `CompRes` are dropped their corresponding expressions will be
349    /// deleted. Therefore, you can stop execution of the whole file by dropping
350    /// the returned `CompRes`.
351    pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
352        Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
353    }
354
355    /// Compile a callable interface to the specified lambda id.
356    ///
357    /// This is how you call a lambda directly from rust. When the returned
358    /// `Callable` is dropped the associated callsite will be delete.
359    pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
360        Ok(self
361            .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
362            .await??)
363    }
364
365    /// Compile an expression that will output the value of the ref specifed by
366    /// id.
367    ///
368    /// This is the same as the deref (*) operator in graphix. When the returned
369    /// `Ref` is dropped the compiled code will be deleted.
370    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
371        Ok(self
372            .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
373            .await??)
374    }
375
376    /// Set the variable idenfified by `id` to `v`
377    ///
378    /// triggering updates of all dependent node trees.
379    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
380        let v = v.into();
381        self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
382    }
383}
384
385#[derive(Builder)]
386#[builder(pattern = "owned")]
387pub struct GXConfig<X: GXExt> {
388    /// The subscribe timeout to use when resolving modules in
389    /// netidx. Resolution will fail if the subscription does not
390    /// succeed before this timeout elapses.
391    #[builder(setter(strip_option), default)]
392    resolve_timeout: Option<Duration>,
393    /// The publish timeout to use when sending published batches. Default None.
394    #[builder(setter(strip_option), default)]
395    publish_timeout: Option<Duration>,
396    /// The execution context with any builtins already registered
397    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
398    /// The text of the root module
399    #[builder(setter(strip_option), default)]
400    root: Option<ArcStr>,
401    /// The set of module resolvers to use when resolving loaded modules
402    #[builder(default)]
403    resolvers: Vec<ModuleResolver>,
404    /// The channel that will receive events from the runtime
405    sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
406    /// The set of compiler flags. Default empty.
407    #[builder(default)]
408    flags: BitFlags<CFlag>,
409}
410
411impl<X: GXExt> GXConfig<X> {
412    /// Create a new config
413    pub fn builder(
414        ctx: ExecCtx<GXRt<X>, X::UserEvent>,
415        sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
416    ) -> GXConfigBuilder<X> {
417        GXConfigBuilder::default().ctx(ctx).sub(sub)
418    }
419
420    /// Start the graphix runtime with the specified config,
421    ///
422    /// return a handle capable of interacting with it. root is the text of the
423    /// root module you wish to initially load. This will define the environment
424    /// for the rest of the code compiled by this runtime. The runtime starts
425    /// completely empty, with only the language, no core library, no standard
426    /// library. To build a runtime with the full standard library and nothing
427    /// else simply pass the output of `graphix_stdlib::register` to start.
428    pub async fn start(self) -> Result<GXHandle<X>> {
429        let (init_tx, init_rx) = oneshot::channel();
430        let (tx, rx) = tmpsc::unbounded_channel();
431        task::spawn(async move {
432            match GX::new(self).await {
433                Ok(bs) => {
434                    let _ = init_tx.send(Ok(()));
435                    if let Err(e) = bs.run(rx).await {
436                        error!("run loop exited with error {e:?}")
437                    }
438                }
439                Err(e) => {
440                    let _ = init_tx.send(Err(e));
441                }
442            };
443        });
444        init_rx.await??;
445        Ok(GXHandle(tx))
446    }
447}