graphix_rt/
lib.rs

1//! A general purpose graphix runtime
2//!
3//! This module implements a generic graphix runtime suitable for most
4//! applications, including applications that implement custom graphix
5//! builtins. The graphix interperter is run in a background task, and
6//! can be interacted with via a handle. All features of the standard
7//! library are supported by this runtime.
8use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use core::fmt;
11use derive_builder::Builder;
12use graphix_compiler::{
13    env::Env,
14    expr::{ExprId, ModuleResolver},
15    typ::{FnType, Type},
16    BindId, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20    pool::Pooled,
21    protocol::valarray::ValArray,
22    publisher::{Value, WriteRequest},
23    subscriber::{self, SubId},
24};
25use netidx_core::atomic_id;
26use serde_derive::{Deserialize, Serialize};
27use smallvec::SmallVec;
28use std::{future, path::PathBuf, time::Duration};
29use tokio::{
30    sync::{
31        mpsc::{self as tmpsc},
32        oneshot,
33    },
34    task,
35};
36
37mod gx;
38mod rt;
39use gx::GX;
40pub use rt::GXRt;
41
42/// Trait to extend the event loop
43///
44/// The Graphix event loop has two steps,
45/// - update event sources, polls external async event sources like
46///   netidx, sockets, files, etc
47/// - do cycle, collects all the events and delivers them to the dataflow
48///   graph as a batch of "everything that happened"
49///
50/// As such to extend the event loop you must implement two things. A function
51/// to poll your own external event sources, and a function to take the events
52/// you got from those sources and represent them to the dataflow graph. You
53/// represent them either by setting generic variables (bindid -> value map), or
54/// by setting some custom structures that you define as part of your UserEvent
55/// implementation.
56///
57/// Your Graphix builtins can access both your custom structure, to register new
58/// event sources, etc, and your custom user event structure, to receive events
59/// who's types do not fit nicely as `Value`. If your event payload does fit
60/// nicely as a `Value`, then just use a variable.
61pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
62    type UserEvent: UserEvent + Send + Sync + 'static;
63
64    /// Update your custom event sources
65    ///
66    /// Your `update_sources` MUST be cancel safe.
67    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
68
69    /// Collect events that happened and marshal them into the event structure
70    ///
71    /// for delivery to the dataflow graph. `do_cycle` will be called, and a
72    /// batch of events delivered to the graph until `is_ready` returns false.
73    /// It is possible that a call to `update_sources` will result in
74    /// multiple calls to `do_cycle`, but it is not guaranteed that
75    /// `update_sources` will not be called again before `is_ready`
76    /// returns false.
77    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
78
79    /// Return true if there are events ready to deliver
80    fn is_ready(&self) -> bool;
81
82    /// Clear the state
83    fn clear(&mut self);
84
85    /// Create and return an empty custom event structure
86    fn empty_event(&mut self) -> Self::UserEvent;
87}
88
89#[derive(Debug, Default)]
90pub struct NoExt;
91
92impl GXExt for NoExt {
93    type UserEvent = NoUserEvent;
94
95    async fn update_sources(&mut self) -> Result<()> {
96        future::pending().await
97    }
98
99    fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
100        Ok(())
101    }
102
103    fn is_ready(&self) -> bool {
104        false
105    }
106
107    fn clear(&mut self) {}
108
109    fn empty_event(&mut self) -> Self::UserEvent {
110        NoUserEvent
111    }
112}
113
114type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
115type WriteBatch = Pooled<Vec<WriteRequest>>;
116
117#[derive(Debug)]
118pub struct CouldNotResolve;
119
120impl fmt::Display for CouldNotResolve {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        write!(f, "could not resolve module")
123    }
124}
125
126pub struct CompExp<X: GXExt> {
127    pub id: ExprId,
128    pub typ: Type,
129    pub output: bool,
130    rt: GXHandle<X>,
131}
132
133impl<X: GXExt> Drop for CompExp<X> {
134    fn drop(&mut self) {
135        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
136    }
137}
138
139pub struct CompRes<X: GXExt> {
140    pub exprs: SmallVec<[CompExp<X>; 1]>,
141    pub env: Env<GXRt<X>, X::UserEvent>,
142}
143
144pub struct Ref<X: GXExt> {
145    pub id: ExprId,
146    pub last: Option<Value>,
147    pub bid: BindId,
148    pub target_bid: Option<BindId>,
149    rt: GXHandle<X>,
150}
151
152impl<X: GXExt> Drop for Ref<X> {
153    fn drop(&mut self) {
154        let _ = self.rt.0.send(ToGX::Delete { id: self.id });
155    }
156}
157
158impl<X: GXExt> Ref<X> {
159    pub fn set(&self, v: Value) -> Result<()> {
160        self.rt.set(self.bid, v)
161    }
162
163    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
164        if let Some(id) = self.target_bid {
165            self.rt.set(id, v)?
166        }
167        Ok(())
168    }
169}
170
171atomic_id!(CallableId);
172
173pub struct Callable<X: GXExt> {
174    rt: GXHandle<X>,
175    id: CallableId,
176    env: Env<GXRt<X>, X::UserEvent>,
177    pub typ: FnType,
178    pub expr: ExprId,
179}
180
181impl<X: GXExt> Drop for Callable<X> {
182    fn drop(&mut self) {
183        let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
184    }
185}
186
187impl<X: GXExt> Callable<X> {
188    /// Call the lambda with args. Argument types and arity will be
189    /// checked and an error will be returned if they are wrong.
190    pub async fn call(&self, args: ValArray) -> Result<()> {
191        if self.typ.args.len() != args.len() {
192            bail!("expected {} args", self.typ.args.len())
193        }
194        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
195            if !a.typ.is_a(&self.env, v) {
196                bail!("type mismatch arg {i} expected {}", a.typ)
197            }
198        }
199        self.call_unchecked(args).await
200    }
201
202    /// Call the lambda with args. Argument types and arity will NOT
203    /// be checked. This can result in a runtime panic, invalid
204    /// results, and probably other bad things.
205    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
206        self.rt
207            .0
208            .send(ToGX::Call { id: self.id, args })
209            .map_err(|_| anyhow!("runtime is dead"))
210    }
211}
212
213enum ToGX<X: GXExt> {
214    GetEnv {
215        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
216    },
217    Delete {
218        id: ExprId,
219    },
220    Load {
221        path: PathBuf,
222        rt: GXHandle<X>,
223        res: oneshot::Sender<Result<CompRes<X>>>,
224    },
225    Compile {
226        text: ArcStr,
227        rt: GXHandle<X>,
228        res: oneshot::Sender<Result<CompRes<X>>>,
229    },
230    CompileCallable {
231        id: Value,
232        rt: GXHandle<X>,
233        res: oneshot::Sender<Result<Callable<X>>>,
234    },
235    CompileRef {
236        id: BindId,
237        rt: GXHandle<X>,
238        res: oneshot::Sender<Result<Ref<X>>>,
239    },
240    Set {
241        id: BindId,
242        v: Value,
243    },
244    Call {
245        id: CallableId,
246        args: ValArray,
247    },
248    DeleteCallable {
249        id: CallableId,
250    },
251}
252
253#[derive(Clone)]
254pub enum GXEvent<X: GXExt> {
255    Updated(ExprId, Value),
256    Env(Env<GXRt<X>, X::UserEvent>),
257}
258
259/// A handle to a running GX instance.
260///
261/// Drop the handle to shutdown the associated background tasks.
262pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
263
264impl<X: GXExt> Clone for GXHandle<X> {
265    fn clone(&self) -> Self {
266        Self(self.0.clone())
267    }
268}
269
270impl<X: GXExt> GXHandle<X> {
271    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
272        let (tx, rx) = oneshot::channel();
273        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
274        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
275    }
276
277    /// Get a copy of the current graphix environment
278    pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
279        self.exec(|res| ToGX::GetEnv { res }).await
280    }
281
282    /// Compile and execute the specified graphix expression.
283    ///
284    /// If it generates results, they will be sent to all the channels that are
285    /// subscribed. When the `CompExp` objects contained in the `CompRes` are
286    /// dropped their corresponding expressions will be deleted. Therefore, you
287    /// can stop execution of the whole expression by dropping the returned
288    /// `CompRes`.
289    pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
290        Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
291    }
292
293    /// Load and execute the specified graphix module.
294    ///
295    /// The path may have one of two forms. If it is the path to a file with
296    /// extension .bs then the rt will load the file directly. If it is a
297    /// modpath (e.g. foo::bar::baz) then the module resolver will look for a
298    /// matching module in the modpath. When the `CompExp` objects contained in
299    /// the `CompRes` are dropped their corresponding expressions will be
300    /// deleted. Therefore, you can stop execution of the whole file by dropping
301    /// the returned `CompRes`.
302    pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
303        Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
304    }
305
306    /// Compile a callable interface to the specified lambda id.
307    ///
308    /// This is how you call a lambda directly from rust. When the returned
309    /// `Callable` is dropped the associated callsite will be delete.
310    pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
311        Ok(self
312            .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
313            .await??)
314    }
315
316    /// Compile an expression that will output the value of the ref specifed by
317    /// id.
318    ///
319    /// This is the same as the deref (*) operator in graphix. When the returned
320    /// `Ref` is dropped the compiled code will be deleted.
321    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
322        Ok(self
323            .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
324            .await??)
325    }
326
327    /// Set the variable idenfified by `id` to `v`
328    ///
329    /// triggering updates of all dependent node trees.
330    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
331        let v = v.into();
332        self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
333    }
334}
335
336#[derive(Builder)]
337#[builder(pattern = "owned")]
338pub struct GXConfig<X: GXExt> {
339    /// The subscribe timeout to use when resolving modules in
340    /// netidx. Resolution will fail if the subscription does not
341    /// succeed before this timeout elapses.
342    #[builder(setter(strip_option), default)]
343    resolve_timeout: Option<Duration>,
344    /// The publish timeout to use when sending published batches. Default None.
345    #[builder(setter(strip_option), default)]
346    publish_timeout: Option<Duration>,
347    /// The execution context with any builtins already registered
348    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
349    /// The text of the root module
350    #[builder(setter(strip_option), default)]
351    root: Option<ArcStr>,
352    /// The set of module resolvers to use when resolving loaded modules
353    #[builder(default)]
354    resolvers: Vec<ModuleResolver>,
355    /// The channel that will receive events from the runtime
356    sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
357}
358
359impl<X: GXExt> GXConfig<X> {
360    /// Create a new config
361    pub fn builder(
362        ctx: ExecCtx<GXRt<X>, X::UserEvent>,
363        sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
364    ) -> GXConfigBuilder<X> {
365        GXConfigBuilder::default().ctx(ctx).sub(sub)
366    }
367
368    /// Start the graphix runtime with the specified config,
369    ///
370    /// return a handle capable of interacting with it. root is the text of the
371    /// root module you wish to initially load. This will define the environment
372    /// for the rest of the code compiled by this runtime. The runtime starts
373    /// completely empty, with only the language, no core library, no standard
374    /// library. To build a runtime with the full standard library and nothing
375    /// else simply pass the output of `graphix_stdlib::register` to start.
376    pub async fn start(self) -> Result<GXHandle<X>> {
377        let (init_tx, init_rx) = oneshot::channel();
378        let (tx, rx) = tmpsc::unbounded_channel();
379        task::spawn(async move {
380            match GX::new(self).await {
381                Ok(bs) => {
382                    let _ = init_tx.send(Ok(()));
383                    if let Err(e) = bs.run(rx).await {
384                        error!("run loop exited with error {e:?}")
385                    }
386                }
387                Err(e) => {
388                    let _ = init_tx.send(Err(e));
389                }
390            };
391        });
392        init_rx.await??;
393        Ok(GXHandle(tx))
394    }
395}