graphix_rt/
lib.rs

1//! A general purpose graphix runtime
2//!
3//! This module implements a generic graphix runtime suitable for most
4//! applications, including applications that implement custom graphix
//! builtins. The graphix interpreter is run in a background task, and
6//! can be interacted with via a handle. All features of the standard
7//! library are supported by this runtime.
8use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use core::fmt;
11use derive_builder::Builder;
12use graphix_compiler::{
13    env::Env,
14    expr::{ExprId, ModuleResolver},
15    typ::{FnType, Type},
16    BindId, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20    pool::Pooled,
21    protocol::valarray::ValArray,
22    publisher::{Value, WriteRequest},
23    subscriber::{self, SubId},
24};
25use netidx_core::atomic_id;
26use netidx_value::FromValue;
27use serde_derive::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use std::{future, path::PathBuf, time::Duration};
30use tokio::{
31    sync::{
32        mpsc::{self as tmpsc},
33        oneshot,
34    },
35    task,
36};
37
38mod gx;
39mod rt;
40use gx::GX;
41pub use rt::GXRt;
42
43/// Trait to extend the event loop
44///
45/// The Graphix event loop has two steps,
46/// - update event sources, polls external async event sources like
47///   netidx, sockets, files, etc
48/// - do cycle, collects all the events and delivers them to the dataflow
49///   graph as a batch of "everything that happened"
50///
51/// As such to extend the event loop you must implement two things. A function
52/// to poll your own external event sources, and a function to take the events
53/// you got from those sources and represent them to the dataflow graph. You
54/// represent them either by setting generic variables (bindid -> value map), or
55/// by setting some custom structures that you define as part of your UserEvent
56/// implementation.
57///
58/// Your Graphix builtins can access both your custom structure, to register new
59/// event sources, etc, and your custom user event structure, to receive events
60/// who's types do not fit nicely as `Value`. If your event payload does fit
61/// nicely as a `Value`, then just use a variable.
62pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
63    type UserEvent: UserEvent + Send + Sync + 'static;
64
65    /// Update your custom event sources
66    ///
67    /// Your `update_sources` MUST be cancel safe.
68    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
69
70    /// Collect events that happened and marshal them into the event structure
71    ///
72    /// for delivery to the dataflow graph. `do_cycle` will be called, and a
73    /// batch of events delivered to the graph until `is_ready` returns false.
74    /// It is possible that a call to `update_sources` will result in
75    /// multiple calls to `do_cycle`, but it is not guaranteed that
76    /// `update_sources` will not be called again before `is_ready`
77    /// returns false.
78    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
79
80    /// Return true if there are events ready to deliver
81    fn is_ready(&self) -> bool;
82
83    /// Clear the state
84    fn clear(&mut self);
85
86    /// Create and return an empty custom event structure
87    fn empty_event(&mut self) -> Self::UserEvent;
88}
89
90#[derive(Debug, Default)]
91pub struct NoExt;
92
93impl GXExt for NoExt {
94    type UserEvent = NoUserEvent;
95
96    async fn update_sources(&mut self) -> Result<()> {
97        future::pending().await
98    }
99
100    fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
101        Ok(())
102    }
103
104    fn is_ready(&self) -> bool {
105        false
106    }
107
108    fn clear(&mut self) {}
109
110    fn empty_event(&mut self) -> Self::UserEvent {
111        NoUserEvent
112    }
113}
114
115type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
116type WriteBatch = Pooled<Vec<WriteRequest>>;
117
/// Error value signalling that a module could not be resolved.
///
/// It renders as `could not resolve module` and implements
/// `std::error::Error`, so it can be boxed as `dyn Error`, propagated with
/// `?`, and recovered later by downcasting.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("could not resolve module")
    }
}

// No source and no extra state: the default trait methods are sufficient.
impl std::error::Error for CouldNotResolve {}
126
127pub struct CompExp<X: GXExt> {
128    pub id: ExprId,
129    pub typ: Type,
130    pub output: bool,
131    rt: GXHandle<X>,
132}
133
134impl<X: GXExt> Drop for CompExp<X> {
135    fn drop(&mut self) {
136        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
137    }
138}
139
140pub struct CompRes<X: GXExt> {
141    pub exprs: SmallVec<[CompExp<X>; 1]>,
142    pub env: Env<GXRt<X>, X::UserEvent>,
143}
144
145pub struct Ref<X: GXExt> {
146    pub id: ExprId,
147    pub last: Option<Value>,
148    pub bid: BindId,
149    pub target_bid: Option<BindId>,
150    rt: GXHandle<X>,
151}
152
153impl<X: GXExt> Drop for Ref<X> {
154    fn drop(&mut self) {
155        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
156    }
157}
158
159impl<X: GXExt> Ref<X> {
160    pub fn set(&self, v: Value) -> Result<()> {
161        self.rt.set(self.bid, v)
162    }
163
164    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
165        if let Some(id) = self.target_bid {
166            self.rt.set(id, v)?
167        }
168        Ok(())
169    }
170}
171
172pub struct TRef<X: GXExt, T: FromValue> {
173    pub r: Ref<X>,
174    pub t: Option<T>,
175}
176
177impl<X: GXExt, T: FromValue> TRef<X, T> {
178    pub fn new(mut r: Ref<X>) -> Result<Self> {
179        let t = r.last.take().map(|v| v.cast_to()).transpose()?;
180        Ok(TRef { r, t })
181    }
182
183    pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
184        if self.r.id == id {
185            let v = v.clone().cast_to()?;
186            self.t = Some(v);
187            Ok(self.t.as_mut())
188        } else {
189            Ok(None)
190        }
191    }
192}
193
194impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
195    pub fn set(&mut self, t: T) -> Result<()> {
196        self.t = Some(t.clone());
197        self.r.set(t.into())
198    }
199
200    pub fn set_deref(&mut self, t: T) -> Result<()> {
201        self.t = Some(t.clone());
202        self.r.set_deref(t.into())
203    }
204}
205
206atomic_id!(CallableId);
207
208pub struct Callable<X: GXExt> {
209    rt: GXHandle<X>,
210    id: CallableId,
211    env: Env<GXRt<X>, X::UserEvent>,
212    pub typ: FnType,
213    pub expr: ExprId,
214}
215
216impl<X: GXExt> Drop for Callable<X> {
217    fn drop(&mut self) {
218        let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
219    }
220}
221
222impl<X: GXExt> Callable<X> {
223    /// Call the lambda with args. Argument types and arity will be
224    /// checked and an error will be returned if they are wrong.
225    pub async fn call(&self, args: ValArray) -> Result<()> {
226        if self.typ.args.len() != args.len() {
227            bail!("expected {} args", self.typ.args.len())
228        }
229        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
230            if !a.typ.is_a(&self.env, v) {
231                bail!("type mismatch arg {i} expected {}", a.typ)
232            }
233        }
234        self.call_unchecked(args).await
235    }
236
237    /// Call the lambda with args. Argument types and arity will NOT
238    /// be checked. This can result in a runtime panic, invalid
239    /// results, and probably other bad things.
240    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
241        self.rt
242            .0
243            .send(ToGX::Call { id: self.id, args })
244            .map_err(|_| anyhow!("runtime is dead"))
245    }
246}
247
248enum ToGX<X: GXExt> {
249    GetEnv {
250        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
251    },
252    Delete {
253        id: ExprId,
254    },
255    Load {
256        path: PathBuf,
257        rt: GXHandle<X>,
258        res: oneshot::Sender<Result<CompRes<X>>>,
259    },
260    Compile {
261        text: ArcStr,
262        rt: GXHandle<X>,
263        res: oneshot::Sender<Result<CompRes<X>>>,
264    },
265    CompileCallable {
266        id: Value,
267        rt: GXHandle<X>,
268        res: oneshot::Sender<Result<Callable<X>>>,
269    },
270    CompileRef {
271        id: BindId,
272        rt: GXHandle<X>,
273        res: oneshot::Sender<Result<Ref<X>>>,
274    },
275    Set {
276        id: BindId,
277        v: Value,
278    },
279    Call {
280        id: CallableId,
281        args: ValArray,
282    },
283    DeleteCallable {
284        id: CallableId,
285    },
286}
287
288#[derive(Clone)]
289pub enum GXEvent<X: GXExt> {
290    Updated(ExprId, Value),
291    Env(Env<GXRt<X>, X::UserEvent>),
292}
293
294/// A handle to a running GX instance.
295///
296/// Drop the handle to shutdown the associated background tasks.
297pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
298
299impl<X: GXExt> Clone for GXHandle<X> {
300    fn clone(&self) -> Self {
301        Self(self.0.clone())
302    }
303}
304
305impl<X: GXExt> GXHandle<X> {
306    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
307        let (tx, rx) = oneshot::channel();
308        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
309        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
310    }
311
312    /// Get a copy of the current graphix environment
313    pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
314        self.exec(|res| ToGX::GetEnv { res }).await
315    }
316
317    /// Compile and execute the specified graphix expression.
318    ///
319    /// If it generates results, they will be sent to all the channels that are
320    /// subscribed. When the `CompExp` objects contained in the `CompRes` are
321    /// dropped their corresponding expressions will be deleted. Therefore, you
322    /// can stop execution of the whole expression by dropping the returned
323    /// `CompRes`.
324    pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
325        Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
326    }
327
328    /// Load and execute the specified graphix module.
329    ///
330    /// The path may have one of two forms. If it is the path to a file with
331    /// extension .bs then the rt will load the file directly. If it is a
332    /// modpath (e.g. foo::bar::baz) then the module resolver will look for a
333    /// matching module in the modpath. When the `CompExp` objects contained in
334    /// the `CompRes` are dropped their corresponding expressions will be
335    /// deleted. Therefore, you can stop execution of the whole file by dropping
336    /// the returned `CompRes`.
337    pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
338        Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
339    }
340
341    /// Compile a callable interface to the specified lambda id.
342    ///
343    /// This is how you call a lambda directly from rust. When the returned
344    /// `Callable` is dropped the associated callsite will be delete.
345    pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
346        Ok(self
347            .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
348            .await??)
349    }
350
351    /// Compile an expression that will output the value of the ref specifed by
352    /// id.
353    ///
354    /// This is the same as the deref (*) operator in graphix. When the returned
355    /// `Ref` is dropped the compiled code will be deleted.
356    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
357        Ok(self
358            .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
359            .await??)
360    }
361
362    /// Set the variable idenfified by `id` to `v`
363    ///
364    /// triggering updates of all dependent node trees.
365    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
366        let v = v.into();
367        self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
368    }
369}
370
371#[derive(Builder)]
372#[builder(pattern = "owned")]
373pub struct GXConfig<X: GXExt> {
374    /// The subscribe timeout to use when resolving modules in
375    /// netidx. Resolution will fail if the subscription does not
376    /// succeed before this timeout elapses.
377    #[builder(setter(strip_option), default)]
378    resolve_timeout: Option<Duration>,
379    /// The publish timeout to use when sending published batches. Default None.
380    #[builder(setter(strip_option), default)]
381    publish_timeout: Option<Duration>,
382    /// The execution context with any builtins already registered
383    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
384    /// The text of the root module
385    #[builder(setter(strip_option), default)]
386    root: Option<ArcStr>,
387    /// The set of module resolvers to use when resolving loaded modules
388    #[builder(default)]
389    resolvers: Vec<ModuleResolver>,
390    /// The channel that will receive events from the runtime
391    sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
392}
393
394impl<X: GXExt> GXConfig<X> {
395    /// Create a new config
396    pub fn builder(
397        ctx: ExecCtx<GXRt<X>, X::UserEvent>,
398        sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
399    ) -> GXConfigBuilder<X> {
400        GXConfigBuilder::default().ctx(ctx).sub(sub)
401    }
402
403    /// Start the graphix runtime with the specified config,
404    ///
405    /// return a handle capable of interacting with it. root is the text of the
406    /// root module you wish to initially load. This will define the environment
407    /// for the rest of the code compiled by this runtime. The runtime starts
408    /// completely empty, with only the language, no core library, no standard
409    /// library. To build a runtime with the full standard library and nothing
410    /// else simply pass the output of `graphix_stdlib::register` to start.
411    pub async fn start(self) -> Result<GXHandle<X>> {
412        let (init_tx, init_rx) = oneshot::channel();
413        let (tx, rx) = tmpsc::unbounded_channel();
414        task::spawn(async move {
415            match GX::new(self).await {
416                Ok(bs) => {
417                    let _ = init_tx.send(Ok(()));
418                    if let Err(e) = bs.run(rx).await {
419                        error!("run loop exited with error {e:?}")
420                    }
421                }
422                Err(e) => {
423                    let _ = init_tx.send(Err(e));
424                }
425            };
426        });
427        init_rx.await??;
428        Ok(GXHandle(tx))
429    }
430}