graphix_rt/lib.rs
1//! A general purpose graphix runtime
2//!
3//! This module implements a generic graphix runtime suitable for most
4//! applications, including applications that implement custom graphix
//! builtins. The graphix interpreter is run in a background task, and
6//! can be interacted with via a handle. All features of the standard
7//! library are supported by this runtime.
8use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use derive_builder::Builder;
11use enumflags2::BitFlags;
12use fxhash::FxHashSet;
13use graphix_compiler::{
14 env::Env,
15 expr::{ExprId, ModPath, ModuleResolver},
16 typ::{FnType, Type},
17 BindId, CFlag, Event, ExecCtx, NoUserEvent, Scope, UserEvent,
18};
19use log::error;
20use netidx::{
21 protocol::valarray::ValArray,
22 publisher::{Value, WriteRequest},
23 subscriber::{self, SubId},
24};
25use netidx_core::atomic_id;
26use netidx_value::FromValue;
27use poolshark::global::GPooled;
28use serde_derive::{Deserialize, Serialize};
29use smallvec::SmallVec;
30use std::{fmt, future, path::PathBuf, time::Duration};
31use tokio::{
32 sync::{
33 mpsc::{self as tmpsc},
34 oneshot,
35 },
36 task,
37};
38
39mod gx;
40mod rt;
41use gx::GX;
42pub use rt::GXRt;
43
/// Trait to extend the event loop
///
/// The Graphix event loop has two steps,
/// - update event sources, polls external async event sources like
///   netidx, sockets, files, etc
/// - do cycle, collects all the events and delivers them to the dataflow
///   graph as a batch of "everything that happened"
///
/// As such, to extend the event loop you must implement two things: a function
/// to poll your own external event sources, and a function to take the events
/// you got from those sources and represent them to the dataflow graph. You
/// represent them either by setting generic variables (bindid -> value map), or
/// by setting some custom structures that you define as part of your UserEvent
/// implementation.
///
/// Your Graphix builtins can access both your custom structure, to register new
/// event sources, etc, and your custom user event structure, to receive events
/// whose types do not fit nicely as `Value`. If your event payload does fit
/// nicely as a `Value`, then just use a variable.
pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
    /// The custom event payload delivered to the graph inside `Event`
    type UserEvent: UserEvent + Send + Sync + 'static;

    /// Update your custom event sources
    ///
    /// Your `update_sources` MUST be cancel safe.
    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;

    /// Collect events that happened and marshal them into the event structure
    /// for delivery to the dataflow graph.
    ///
    /// `do_cycle` will be called, and a batch of events delivered to the
    /// graph, until `is_ready` returns false. It is possible that a call to
    /// `update_sources` will result in multiple calls to `do_cycle`, but it
    /// is not guaranteed that `update_sources` will not be called again
    /// before `is_ready` returns false.
    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;

    /// Return true if there are events ready to deliver
    fn is_ready(&self) -> bool;

    /// Clear the state
    fn clear(&mut self);

    /// Create and return an empty custom event structure
    fn empty_event(&mut self) -> Self::UserEvent;
}
90
/// A `GXExt` implementation that adds nothing to the event loop.
///
/// Its `update_sources` never completes, `is_ready` is always false, and its
/// user event type is `NoUserEvent`.
#[derive(Debug, Default)]
pub struct NoExt;
93
94impl GXExt for NoExt {
95 type UserEvent = NoUserEvent;
96
97 async fn update_sources(&mut self) -> Result<()> {
98 future::pending().await
99 }
100
101 fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
102 Ok(())
103 }
104
105 fn is_ready(&self) -> bool {
106 false
107 }
108
109 fn clear(&mut self) {}
110
111 fn empty_event(&mut self) -> Self::UserEvent {
112 NoUserEvent
113 }
114}
115
// A pooled vector of `(SubId, subscriber::Event)` pairs.
// NOTE(review): not used in this file; presumably consumed by the `gx`/`rt`
// submodules — confirm.
type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
// A pooled vector of publisher `WriteRequest`s.
// NOTE(review): not used in this file; presumably consumed by the `gx`/`rt`
// submodules — confirm.
type WriteBatch = GPooled<Vec<WriteRequest>>;
118
/// A compiled top level expression.
///
/// Dropping a `CompExp` sends a `ToGX::Delete` for `id` to the runtime.
#[derive(Debug)]
pub struct CompExp<X: GXExt> {
    /// The id of the compiled expression
    pub id: ExprId,
    /// The type of the expression
    pub typ: Type,
    // NOTE(review): presumably true when the expression produces output the
    // caller should display — confirm against the code that builds CompExp.
    pub output: bool,
    // the handle used to send the delete request on drop
    rt: GXHandle<X>,
}
126
127impl<X: GXExt> Drop for CompExp<X> {
128 fn drop(&mut self) {
129 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
130 }
131}
132
/// The result of compiling or loading graphix code, as returned by
/// `GXHandle::compile` and `GXHandle::load`.
#[derive(Debug)]
pub struct CompRes<X: GXExt> {
    /// The compiled expressions. Each one sends a delete request to the
    /// runtime when it is dropped.
    pub exprs: SmallVec<[CompExp<X>; 1]>,
    /// The environment returned along with the compiled expressions
    pub env: Env<GXRt<X>, X::UserEvent>,
}
138
/// A reference to a graphix variable, created by `GXHandle::compile_ref` or
/// `GXHandle::compile_ref_by_name`.
///
/// Dropping a `Ref` sends a `ToGX::Delete` for `id` to the runtime.
pub struct Ref<X: GXExt> {
    /// The expression id that `update` matches incoming updates against
    pub id: ExprId,
    // the most recent value of the variable
    pub last: Option<Value>,
    /// The bind id written by `set`
    pub bid: BindId,
    /// The bind id written by `set_deref`, if there is one
    pub target_bid: Option<BindId>,
    /// The type of the variable
    pub typ: Type,
    // the handle used by `set`, `set_deref`, and the delete on drop
    rt: GXHandle<X>,
}
148
149impl<X: GXExt> Drop for Ref<X> {
150 fn drop(&mut self) {
151 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
152 }
153}
154
155impl<X: GXExt> Ref<X> {
156 /// set the value of the ref `r <-`
157 ///
158 /// This will cause all nodes dependent on this id to update. This is the
159 /// same thing as the `<-` operator in Graphix. This does the same thing as
160 /// `GXHandle::set`
161 pub fn set<T: Into<Value>>(&mut self, v: T) -> Result<()> {
162 let v = v.into();
163 self.last = Some(v.clone());
164 self.rt.set(self.bid, v)
165 }
166
167 /// set the value pointed to by ref `*r <-`
168 ///
169 /// This will cause all nodes dependent on *id to update. This is the same
170 /// as the `*r <-` operator in Graphix. This does the same thing as
171 /// `GXHandle::set` using the target id.
172 pub fn set_deref<T: Into<Value>>(&mut self, v: T) -> Result<()> {
173 if let Some(id) = self.target_bid {
174 self.rt.set(id, v)?
175 }
176 Ok(())
177 }
178
179 /// Process an update
180 ///
181 /// If the expr id refers to this ref, then set `last` to `v` and return a
182 /// mutable reference to `last`, otherwise return None. This will also
183 /// update `last` if the id matches.
184 pub fn update(&mut self, id: ExprId, v: &Value) -> Option<&mut Value> {
185 if self.id == id {
186 self.last = Some(v.clone());
187 self.last.as_mut()
188 } else {
189 None
190 }
191 }
192}
193
/// A typed wrapper around `Ref` that keeps the current value converted to `T`
pub struct TRef<X: GXExt, T: FromValue> {
    /// The underlying untyped ref. `TRef::new` takes its `last` value, and
    /// `TRef::update` does not refresh it.
    pub r: Ref<X>,
    /// The most recent value converted to `T`, if there is one
    pub t: Option<T>,
}
198
199impl<X: GXExt, T: FromValue> TRef<X, T> {
200 /// Create a new typed reference from `r`
201 ///
202 /// If conversion of `r` fails, return an error.
203 pub fn new(mut r: Ref<X>) -> Result<Self> {
204 let t = r.last.take().map(|v| v.cast_to()).transpose()?;
205 Ok(TRef { r, t })
206 }
207
208 /// Process an update
209 ///
210 /// If the expr id refers to this tref, then convert the value into a `T`
211 /// update `t` and return a mutable reference to the new `T`, otherwise
212 /// return None. Return an Error if the conversion failed.
213 pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
214 if self.r.id == id {
215 let v = v.clone().cast_to()?;
216 self.t = Some(v);
217 Ok(self.t.as_mut())
218 } else {
219 Ok(None)
220 }
221 }
222}
223
224impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
225 /// set the value of the tref `r <-`
226 ///
227 /// This will cause all nodes dependent on this id to update. This is the
228 /// same thing as the `<-` operator in Graphix. This does the same thing as
229 /// `GXHandle::set`
230 pub fn set(&mut self, t: T) -> Result<()> {
231 self.t = Some(t.clone());
232 self.r.set(t)
233 }
234
235 /// set the value pointed to by tref `*r <-`
236 ///
237 /// This will cause all nodes dependent on *id to update. This is the same
238 /// as the `*r <-` operator in Graphix. This does the same thing as
239 /// `GXHandle::set` using the target id.
240 pub fn set_deref(&mut self, t: T) -> Result<()> {
241 self.t = Some(t.clone());
242 self.r.set_deref(t.into())
243 }
244}
245
// Declares the `CallableId` type used to identify compiled call sites in
// `ToGX::Call` and `ToGX::DeleteCallable`.
// NOTE(review): presumably `atomic_id!` generates a unique, atomically
// allocated id type — confirm against the netidx_core macro documentation.
atomic_id!(CallableId);
247
/// A graphix lambda that can be called from rust, created by
/// `GXHandle::compile_callable`.
///
/// Dropping a `Callable` sends a `ToGX::DeleteCallable` for its id to the
/// runtime.
pub struct Callable<X: GXExt> {
    // the handle used to send call and delete requests
    rt: GXHandle<X>,
    // identifies this call site in requests sent to the runtime
    id: CallableId,
    // the environment used by `call` to type check arguments
    env: Env<GXRt<X>, X::UserEvent>,
    /// The type of the function; `call` checks arguments against `typ.args`
    pub typ: FnType,
    /// The expression id that `update` matches to recognize return values
    pub expr: ExprId,
}
255
256impl<X: GXExt> Drop for Callable<X> {
257 fn drop(&mut self) {
258 let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
259 }
260}
261
262impl<X: GXExt> Callable<X> {
263 /// Call the lambda with args
264 ///
265 /// Argument types and arity will be checked and an error will be returned
266 /// if they are wrong. If you call the function more than once before it
267 /// returns there is no guarantee that the returns will arrive in the order
268 /// of the calls. There is no guarantee that a function must return.
269 pub async fn call(&self, args: ValArray) -> Result<()> {
270 if self.typ.args.len() != args.len() {
271 bail!("expected {} args", self.typ.args.len())
272 }
273 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
274 if !a.typ.is_a(&self.env, v) {
275 bail!("type mismatch arg {i} expected {}", a.typ)
276 }
277 }
278 self.call_unchecked(args).await
279 }
280
281 /// Call the lambda with args. Argument types and arity will NOT
282 /// be checked. This can result in a runtime panic, invalid
283 /// results, and probably other bad things.
284 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
285 self.rt
286 .0
287 .send(ToGX::Call { id: self.id, args })
288 .map_err(|_| anyhow!("runtime is dead"))
289 }
290
291 /// Return Some(v) if this update is the return value of the callable
292 pub fn update<'a>(&self, id: ExprId, v: &'a Value) -> Option<&'a Value> {
293 if self.expr == id {
294 Some(v)
295 } else {
296 None
297 }
298 }
299}
300
// A call made on a `NamedCallable` before its function was resolved. It
// carries the call arguments and the channel on which the result of issuing
// the call is reported back to the waiting caller.
enum DeferredCall {
    // queued by `NamedCallable::call`, issued with `Callable::call`
    Call(ValArray, oneshot::Sender<Result<()>>),
    // queued by `NamedCallable::call_unchecked`, issued with
    // `Callable::call_unchecked`
    CallUnchecked(ValArray, oneshot::Sender<Result<()>>),
}
305
/// A callable interface to a late bound function looked up by name, created
/// by `GXHandle::compile_callable_by_name`.
pub struct NamedCallable<X: GXExt> {
    // the ref to the named function; when it updates a new call site is compiled
    fname: Ref<X>,
    // the most recently compiled call site, None until the first one is compiled
    current: Option<Callable<X>>,
    // expr ids of every call site compiled so far; ids are never removed
    ids: FxHashSet<ExprId>,
    // calls made while `current` was None, issued by `update` once it is set
    deferred: Vec<DeferredCall>,
    // the handle used to compile call sites
    h: GXHandle<X>,
}
313
314impl<X: GXExt> NamedCallable<X> {
315 /// Update the named callable function
316 ///
317 /// This method does two things,
318 /// - Handle late binding. When the name ref updates to an actual function
319 /// compile the real call site
320 /// - Return Ok(Some(v)) when the called function returns
321 pub async fn update<'a>(
322 &mut self,
323 id: ExprId,
324 v: &'a Value,
325 ) -> Result<Option<&'a Value>> {
326 match self.fname.update(id, v) {
327 Some(v) => {
328 let callable = self.h.compile_callable(v.clone()).await?;
329 self.ids.insert(callable.expr);
330 for dc in self.deferred.drain(..) {
331 match dc {
332 DeferredCall::Call(args, reply) => {
333 let _ = reply.send(callable.call(args).await);
334 }
335 DeferredCall::CallUnchecked(args, reply) => {
336 let _ = reply.send(callable.call_unchecked(args).await);
337 }
338 }
339 }
340 self.current = Some(callable);
341 Ok(None)
342 }
343 None if self.ids.contains(&id) => Ok(Some(v)),
344 None => Ok(None),
345 }
346 }
347
348 /// Call the lambda with args
349 ///
350 /// Argument types and arity will be checked and an error will be returned
351 /// if they are wrong. If you call the function more than once before it
352 /// returns there is no guarantee that the returns will arrive in the order
353 /// of the calls. There is no guarantee that a function must return. In
354 /// order to handle late binding you must keep calling `update` while
355 /// waiting for this method.
356 ///
357 /// While a late bound function is unresolved calls will queue internally in
358 /// the NamedCallsite and will happen when the function is resolved.
359 pub async fn call(&mut self, args: ValArray) -> Result<()> {
360 match &self.current {
361 Some(c) => c.call(args).await,
362 None => {
363 let (tx, rx) = oneshot::channel();
364 self.deferred.push(DeferredCall::Call(args, tx));
365 rx.await?
366 }
367 }
368 }
369
370 /// call the function with the specified args
371 ///
372 /// Argument types and arity will NOT be checked by this method. If you call
373 /// the function more than once before it returns there is no guarantee that
374 /// the returns will arrive in the order of the calls. There is no guarantee
375 /// that a function must return. In order to handle late binding you must
376 /// keep calling `update` while waiting for this method.
377 ///
378 /// While a late bound function is unresolved calls will queue internally in
379 /// the NamedCallsite and will happen when the function is resolved.
380 pub async fn call_unchecked(&mut self, args: ValArray) -> Result<()> {
381 match &self.current {
382 Some(c) => c.call(args).await,
383 None => {
384 let (tx, rx) = oneshot::channel();
385 self.deferred.push(DeferredCall::CallUnchecked(args, tx));
386 rx.await?
387 }
388 }
389 }
390}
391
// The requests a `GXHandle` (and the objects holding one) can send to the
// runtime task. Variants with a `res` field carry the channel on which the
// reply is expected; variants with an `rt` field carry a clone of the
// sending handle.
enum ToGX<X: GXExt> {
    // sent by `GXHandle::get_env`
    GetEnv {
        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
    },
    // sent when a `CompExp` or a `Ref` is dropped
    Delete {
        id: ExprId,
    },
    // sent by `GXHandle::load`
    Load {
        path: PathBuf,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    // sent by `GXHandle::check`
    Check {
        path: PathBuf,
        res: oneshot::Sender<Result<()>>,
    },
    // sent by `GXHandle::compile`
    Compile {
        text: ArcStr,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    // sent by `GXHandle::compile_callable`
    CompileCallable {
        id: Value,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Callable<X>>>,
    },
    // sent by `GXHandle::compile_ref`
    CompileRef {
        id: BindId,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Ref<X>>>,
    },
    // sent by `GXHandle::set`
    Set {
        id: BindId,
        v: Value,
    },
    // sent by `Callable::call_unchecked`
    Call {
        id: CallableId,
        args: ValArray,
    },
    // sent when a `Callable` is dropped
    DeleteCallable {
        id: CallableId,
    },
}
435
/// The events delivered in batches on the `sub` channel of `GXConfig`
#[derive(Debug, Clone)]
pub enum GXEvent<X: GXExt> {
    /// An expression id and a value.
    // NOTE(review): presumably the pair to pass to `Ref::update`,
    // `Callable::update`, and friends — confirm against the run loop.
    Updated(ExprId, Value),
    /// An environment.
    // NOTE(review): presumably sent when the runtime environment changes —
    // confirm against the run loop.
    Env(Env<GXRt<X>, X::UserEvent>),
}
441
/// A handle to a running GX instance.
///
/// Drop the handle to shutdown the associated background tasks.
///
/// The handle wraps the sending side of the unbounded channel that carries
/// requests to the runtime task.
// NOTE(review): `CompExp`, `Ref`, `Callable`, and `NamedCallable` each hold a
// clone of the handle, so presumably the runtime only shuts down once those
// are dropped as well — confirm against the run loop.
pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
446
447impl<X: GXExt> fmt::Debug for GXHandle<X> {
448 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449 write!(f, "GXHandle")
450 }
451}
452
453impl<X: GXExt> Clone for GXHandle<X> {
454 fn clone(&self) -> Self {
455 Self(self.0.clone())
456 }
457}
458
459impl<X: GXExt> GXHandle<X> {
460 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
461 let (tx, rx) = oneshot::channel();
462 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
463 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
464 }
465
466 /// Get a copy of the current graphix environment
467 pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
468 self.exec(|res| ToGX::GetEnv { res }).await
469 }
470
471 /// Check that the specified file compiles and typechecks.
472 ///
473 /// If the file will compile and type check successfully
474 /// return Ok(()) otherwise an error describing the problem. The
475 /// environment will not be altered by checking an expression, so
476 /// you will not be able to use any defined names later in the
477 /// program. If you want to do that see `compile`
478 pub async fn check(&self, path: PathBuf) -> Result<()> {
479 Ok(self.exec(|tx| ToGX::Check { path, res: tx }).await??)
480 }
481
482 /// Compile and execute the specified graphix expression.
483 ///
484 /// If it generates results, they will be sent to all the channels that are
485 /// subscribed. When the `CompExp` objects contained in the `CompRes` are
486 /// dropped their corresponding expressions will be deleted. Therefore, you
487 /// can stop execution of the whole expression by dropping the returned
488 /// `CompRes`.
489 pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
490 Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
491 }
492
493 /// Load and execute the specified graphix module.
494 ///
495 /// The path may have one of two forms. If it is the path to a file with
496 /// extension .bs then the rt will load the file directly. If it is a
497 /// modpath (e.g. foo::bar::baz) then the module resolver will look for a
498 /// matching module in the modpath. When the `CompExp` objects contained in
499 /// the `CompRes` are dropped their corresponding expressions will be
500 /// deleted. Therefore, you can stop execution of the whole file by dropping
501 /// the returned `CompRes`.
502 pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
503 Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
504 }
505
506 /// Compile a callable interface to the specified lambda id.
507 ///
508 /// This is how you call a lambda directly from rust. When the returned
509 /// `Callable` is dropped the associated callsite will be delete.
510 pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
511 Ok(self
512 .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
513 .await??)
514 }
515
516 /// Compile a callable interface to a late bound function by name.
517 ///
518 /// This allows you to call a function by name. Because of late binding it
519 /// has some additional complexity (though less than implementing it
520 /// yourself). You must call `update` on `NamedCallable` when you recieve
521 /// updates from the runtime in order to drive late binding. `update` will
522 /// also return `Some` when one of your function calls returns.
523 pub async fn compile_callable_by_name(
524 &self,
525 env: &Env<GXRt<X>, X::UserEvent>,
526 scope: &Scope,
527 name: &ModPath,
528 ) -> Result<NamedCallable<X>> {
529 let r = self.compile_ref_by_name(env, scope, name).await?;
530 match &r.typ {
531 Type::Fn(_) => (),
532 t => bail!(
533 "{name} in scope {} has type {t}. expected a function",
534 scope.lexical
535 ),
536 }
537 Ok(NamedCallable {
538 fname: r,
539 current: None,
540 ids: FxHashSet::default(),
541 deferred: vec![],
542 h: self.clone(),
543 })
544 }
545
546 /// Compile a ref to a specific bind id
547 ///
548 /// This will NOT return an error if the specified id isn't in the environment.
549 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
550 Ok(self
551 .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
552 .await??)
553 }
554
555 /// Compile a ref to a specific name
556 ///
557 /// Return an error if the name does not exist in the specified environment
558 pub async fn compile_ref_by_name(
559 &self,
560 env: &Env<GXRt<X>, X::UserEvent>,
561 scope: &Scope,
562 name: &ModPath,
563 ) -> Result<Ref<X>> {
564 let id = env
565 .lookup_bind(&scope.lexical, name)
566 .ok_or_else(|| anyhow!("no such value {name} in scope {}", scope.lexical))?
567 .1
568 .id;
569 self.compile_ref(id).await
570 }
571
572 /// Set the variable idenfified by `id` to `v`
573 ///
574 /// triggering updates of all dependent node trees. This does the same thing
575 /// as`Ref::set` and `TRef::set`
576 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
577 let v = v.into();
578 self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
579 }
580}
581
/// The configuration of a graphix runtime.
///
/// Build one with `GXConfig::builder`, which supplies the two required
/// fields (`ctx` and `sub`), then call `start` to spawn the runtime.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig<X: GXExt> {
    /// The subscribe timeout to use when resolving modules in
    /// netidx. Resolution will fail if the subscription does not
    /// succeed before this timeout elapses.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// The publish timeout to use when sending published batches. Default None.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context with any builtins already registered
    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
    /// The text of the root module
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// The set of module resolvers to use when resolving loaded modules
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// The channel that will receive events from the runtime
    sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
    /// The set of compiler flags. Default empty.
    #[builder(default)]
    flags: BitFlags<CFlag>,
}
607
608impl<X: GXExt> GXConfig<X> {
609 /// Create a new config
610 pub fn builder(
611 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
612 sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
613 ) -> GXConfigBuilder<X> {
614 GXConfigBuilder::default().ctx(ctx).sub(sub)
615 }
616
617 /// Start the graphix runtime with the specified config,
618 ///
619 /// return a handle capable of interacting with it. root is the text of the
620 /// root module you wish to initially load. This will define the environment
621 /// for the rest of the code compiled by this runtime. The runtime starts
622 /// completely empty, with only the language, no core library, no standard
623 /// library. To build a runtime with the full standard library and nothing
624 /// else simply pass the output of `graphix_stdlib::register` to start.
625 pub async fn start(self) -> Result<GXHandle<X>> {
626 let (init_tx, init_rx) = oneshot::channel();
627 let (tx, rx) = tmpsc::unbounded_channel();
628 task::spawn(async move {
629 match GX::new(self).await {
630 Ok(bs) => {
631 let _ = init_tx.send(Ok(()));
632 if let Err(e) = bs.run(rx).await {
633 error!("run loop exited with error {e:?}")
634 }
635 }
636 Err(e) => {
637 let _ = init_tx.send(Err(e));
638 }
639 };
640 });
641 init_rx.await??;
642 Ok(GXHandle(tx))
643 }
644}