graphix_rt/
lib.rs

1//! A general purpose graphix runtime
2//!
3//! This module implements a generic graphix runtime suitable for most
4//! applications, including applications that implement custom graphix
5//! builtins. The graphix interperter is run in a background task, and
6//! can be interacted with via a handle. All features of the standard
7//! library are supported by this runtime.
8use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use derive_builder::Builder;
11use enumflags2::BitFlags;
12use graphix_compiler::{
13    env::Env,
14    expr::{ExprId, ModuleResolver},
15    typ::{FnType, Type},
16    BindId, CFlag, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20    protocol::valarray::ValArray,
21    publisher::{Value, WriteRequest},
22    subscriber::{self, SubId},
23};
24use netidx_core::atomic_id;
25use netidx_value::FromValue;
26use poolshark::global::GPooled;
27use serde_derive::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use std::{fmt, future, path::PathBuf, time::Duration};
30use tokio::{
31    sync::{
32        mpsc::{self as tmpsc},
33        oneshot,
34    },
35    task,
36};
37
38mod gx;
39mod rt;
40use gx::GX;
41pub use rt::GXRt;
42
43/// Trait to extend the event loop
44///
45/// The Graphix event loop has two steps,
46/// - update event sources, polls external async event sources like
47///   netidx, sockets, files, etc
48/// - do cycle, collects all the events and delivers them to the dataflow
49///   graph as a batch of "everything that happened"
50///
51/// As such to extend the event loop you must implement two things. A function
52/// to poll your own external event sources, and a function to take the events
53/// you got from those sources and represent them to the dataflow graph. You
54/// represent them either by setting generic variables (bindid -> value map), or
55/// by setting some custom structures that you define as part of your UserEvent
56/// implementation.
57///
58/// Your Graphix builtins can access both your custom structure, to register new
59/// event sources, etc, and your custom user event structure, to receive events
60/// who's types do not fit nicely as `Value`. If your event payload does fit
61/// nicely as a `Value`, then just use a variable.
62pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
63    type UserEvent: UserEvent + Send + Sync + 'static;
64
65    /// Update your custom event sources
66    ///
67    /// Your `update_sources` MUST be cancel safe.
68    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
69
70    /// Collect events that happened and marshal them into the event structure
71    ///
72    /// for delivery to the dataflow graph. `do_cycle` will be called, and a
73    /// batch of events delivered to the graph until `is_ready` returns false.
74    /// It is possible that a call to `update_sources` will result in
75    /// multiple calls to `do_cycle`, but it is not guaranteed that
76    /// `update_sources` will not be called again before `is_ready`
77    /// returns false.
78    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
79
80    /// Return true if there are events ready to deliver
81    fn is_ready(&self) -> bool;
82
83    /// Clear the state
84    fn clear(&mut self);
85
86    /// Create and return an empty custom event structure
87    fn empty_event(&mut self) -> Self::UserEvent;
88}
89
90#[derive(Debug, Default)]
91pub struct NoExt;
92
93impl GXExt for NoExt {
94    type UserEvent = NoUserEvent;
95
96    async fn update_sources(&mut self) -> Result<()> {
97        future::pending().await
98    }
99
100    fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
101        Ok(())
102    }
103
104    fn is_ready(&self) -> bool {
105        false
106    }
107
108    fn clear(&mut self) {}
109
110    fn empty_event(&mut self) -> Self::UserEvent {
111        NoUserEvent
112    }
113}
114
115type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
116type WriteBatch = GPooled<Vec<WriteRequest>>;
117
118#[derive(Debug)]
119pub struct CouldNotResolve;
120
121impl fmt::Display for CouldNotResolve {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        write!(f, "could not resolve module")
124    }
125}
126
127#[derive(Debug)]
128pub struct CompExp<X: GXExt> {
129    pub id: ExprId,
130    pub typ: Type,
131    pub output: bool,
132    rt: GXHandle<X>,
133}
134
135impl<X: GXExt> Drop for CompExp<X> {
136    fn drop(&mut self) {
137        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
138    }
139}
140
141#[derive(Debug)]
142pub struct CompRes<X: GXExt> {
143    pub exprs: SmallVec<[CompExp<X>; 1]>,
144    pub env: Env<GXRt<X>, X::UserEvent>,
145}
146
147pub struct Ref<X: GXExt> {
148    pub id: ExprId,
149    pub last: Option<Value>,
150    pub bid: BindId,
151    pub target_bid: Option<BindId>,
152    rt: GXHandle<X>,
153}
154
155impl<X: GXExt> Drop for Ref<X> {
156    fn drop(&mut self) {
157        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
158    }
159}
160
161impl<X: GXExt> Ref<X> {
162    pub fn set(&self, v: Value) -> Result<()> {
163        self.rt.set(self.bid, v)
164    }
165
166    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
167        if let Some(id) = self.target_bid {
168            self.rt.set(id, v)?
169        }
170        Ok(())
171    }
172}
173
174pub struct TRef<X: GXExt, T: FromValue> {
175    pub r: Ref<X>,
176    pub t: Option<T>,
177}
178
179impl<X: GXExt, T: FromValue> TRef<X, T> {
180    pub fn new(mut r: Ref<X>) -> Result<Self> {
181        let t = r.last.take().map(|v| v.cast_to()).transpose()?;
182        Ok(TRef { r, t })
183    }
184
185    pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
186        if self.r.id == id {
187            let v = v.clone().cast_to()?;
188            self.t = Some(v);
189            Ok(self.t.as_mut())
190        } else {
191            Ok(None)
192        }
193    }
194}
195
196impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
197    pub fn set(&mut self, t: T) -> Result<()> {
198        self.t = Some(t.clone());
199        self.r.set(t.into())
200    }
201
202    pub fn set_deref(&mut self, t: T) -> Result<()> {
203        self.t = Some(t.clone());
204        self.r.set_deref(t.into())
205    }
206}
207
208atomic_id!(CallableId);
209
210pub struct Callable<X: GXExt> {
211    rt: GXHandle<X>,
212    id: CallableId,
213    env: Env<GXRt<X>, X::UserEvent>,
214    pub typ: FnType,
215    pub expr: ExprId,
216}
217
218impl<X: GXExt> Drop for Callable<X> {
219    fn drop(&mut self) {
220        let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
221    }
222}
223
224impl<X: GXExt> Callable<X> {
225    /// Call the lambda with args. Argument types and arity will be
226    /// checked and an error will be returned if they are wrong.
227    pub async fn call(&self, args: ValArray) -> Result<()> {
228        if self.typ.args.len() != args.len() {
229            bail!("expected {} args", self.typ.args.len())
230        }
231        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
232            if !a.typ.is_a(&self.env, v) {
233                bail!("type mismatch arg {i} expected {}", a.typ)
234            }
235        }
236        self.call_unchecked(args).await
237    }
238
239    /// Call the lambda with args. Argument types and arity will NOT
240    /// be checked. This can result in a runtime panic, invalid
241    /// results, and probably other bad things.
242    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
243        self.rt
244            .0
245            .send(ToGX::Call { id: self.id, args })
246            .map_err(|_| anyhow!("runtime is dead"))
247    }
248}
249
250enum ToGX<X: GXExt> {
251    GetEnv {
252        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
253    },
254    Delete {
255        id: ExprId,
256    },
257    Load {
258        path: PathBuf,
259        rt: GXHandle<X>,
260        res: oneshot::Sender<Result<CompRes<X>>>,
261    },
262    Compile {
263        text: ArcStr,
264        rt: GXHandle<X>,
265        res: oneshot::Sender<Result<CompRes<X>>>,
266    },
267    CompileCallable {
268        id: Value,
269        rt: GXHandle<X>,
270        res: oneshot::Sender<Result<Callable<X>>>,
271    },
272    CompileRef {
273        id: BindId,
274        rt: GXHandle<X>,
275        res: oneshot::Sender<Result<Ref<X>>>,
276    },
277    Set {
278        id: BindId,
279        v: Value,
280    },
281    Call {
282        id: CallableId,
283        args: ValArray,
284    },
285    DeleteCallable {
286        id: CallableId,
287    },
288}
289
290#[derive(Debug, Clone)]
291pub enum GXEvent<X: GXExt> {
292    Updated(ExprId, Value),
293    Env(Env<GXRt<X>, X::UserEvent>),
294}
295
296/// A handle to a running GX instance.
297///
298/// Drop the handle to shutdown the associated background tasks.
299pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
300
301impl<X: GXExt> fmt::Debug for GXHandle<X> {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        write!(f, "GXHandle")
304    }
305}
306
307impl<X: GXExt> Clone for GXHandle<X> {
308    fn clone(&self) -> Self {
309        Self(self.0.clone())
310    }
311}
312
313impl<X: GXExt> GXHandle<X> {
314    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
315        let (tx, rx) = oneshot::channel();
316        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
317        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
318    }
319
320    /// Get a copy of the current graphix environment
321    pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
322        self.exec(|res| ToGX::GetEnv { res }).await
323    }
324
325    /// Compile and execute the specified graphix expression.
326    ///
327    /// If it generates results, they will be sent to all the channels that are
328    /// subscribed. When the `CompExp` objects contained in the `CompRes` are
329    /// dropped their corresponding expressions will be deleted. Therefore, you
330    /// can stop execution of the whole expression by dropping the returned
331    /// `CompRes`.
332    pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
333        Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
334    }
335
336    /// Load and execute the specified graphix module.
337    ///
338    /// The path may have one of two forms. If it is the path to a file with
339    /// extension .bs then the rt will load the file directly. If it is a
340    /// modpath (e.g. foo::bar::baz) then the module resolver will look for a
341    /// matching module in the modpath. When the `CompExp` objects contained in
342    /// the `CompRes` are dropped their corresponding expressions will be
343    /// deleted. Therefore, you can stop execution of the whole file by dropping
344    /// the returned `CompRes`.
345    pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
346        Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
347    }
348
349    /// Compile a callable interface to the specified lambda id.
350    ///
351    /// This is how you call a lambda directly from rust. When the returned
352    /// `Callable` is dropped the associated callsite will be delete.
353    pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
354        Ok(self
355            .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
356            .await??)
357    }
358
359    /// Compile an expression that will output the value of the ref specifed by
360    /// id.
361    ///
362    /// This is the same as the deref (*) operator in graphix. When the returned
363    /// `Ref` is dropped the compiled code will be deleted.
364    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
365        Ok(self
366            .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
367            .await??)
368    }
369
370    /// Set the variable idenfified by `id` to `v`
371    ///
372    /// triggering updates of all dependent node trees.
373    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
374        let v = v.into();
375        self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
376    }
377}
378
379#[derive(Builder)]
380#[builder(pattern = "owned")]
381pub struct GXConfig<X: GXExt> {
382    /// The subscribe timeout to use when resolving modules in
383    /// netidx. Resolution will fail if the subscription does not
384    /// succeed before this timeout elapses.
385    #[builder(setter(strip_option), default)]
386    resolve_timeout: Option<Duration>,
387    /// The publish timeout to use when sending published batches. Default None.
388    #[builder(setter(strip_option), default)]
389    publish_timeout: Option<Duration>,
390    /// The execution context with any builtins already registered
391    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
392    /// The text of the root module
393    #[builder(setter(strip_option), default)]
394    root: Option<ArcStr>,
395    /// The set of module resolvers to use when resolving loaded modules
396    #[builder(default)]
397    resolvers: Vec<ModuleResolver>,
398    /// The channel that will receive events from the runtime
399    sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
400    /// The set of compiler flags. Default empty.
401    #[builder(default)]
402    flags: BitFlags<CFlag>,
403}
404
405impl<X: GXExt> GXConfig<X> {
406    /// Create a new config
407    pub fn builder(
408        ctx: ExecCtx<GXRt<X>, X::UserEvent>,
409        sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
410    ) -> GXConfigBuilder<X> {
411        GXConfigBuilder::default().ctx(ctx).sub(sub)
412    }
413
414    /// Start the graphix runtime with the specified config,
415    ///
416    /// return a handle capable of interacting with it. root is the text of the
417    /// root module you wish to initially load. This will define the environment
418    /// for the rest of the code compiled by this runtime. The runtime starts
419    /// completely empty, with only the language, no core library, no standard
420    /// library. To build a runtime with the full standard library and nothing
421    /// else simply pass the output of `graphix_stdlib::register` to start.
422    pub async fn start(self) -> Result<GXHandle<X>> {
423        let (init_tx, init_rx) = oneshot::channel();
424        let (tx, rx) = tmpsc::unbounded_channel();
425        task::spawn(async move {
426            match GX::new(self).await {
427                Ok(bs) => {
428                    let _ = init_tx.send(Ok(()));
429                    if let Err(e) = bs.run(rx).await {
430                        error!("run loop exited with error {e:?}")
431                    }
432                }
433                Err(e) => {
434                    let _ = init_tx.send(Err(e));
435                }
436            };
437        });
438        init_rx.await??;
439        Ok(GXHandle(tx))
440    }
441}