graphix_rt/lib.rs
1#![doc(
2 html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3 html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5//! A general purpose graphix runtime
6//!
7//! This module implements a generic graphix runtime suitable for most
8//! applications, including applications that implement custom graphix
9//! builtins. The graphix interperter is run in a background task, and
10//! can be interacted with via a handle. All features of the standard
11//! library are supported by this runtime.
12use anyhow::{anyhow, bail, Result};
13use arcstr::ArcStr;
14use derive_builder::Builder;
15use enumflags2::BitFlags;
16use fxhash::FxHashSet;
17use graphix_compiler::{
18 env::Env,
19 expr::{ExprId, ModPath, ModuleResolver, Source},
20 typ::{FnType, Type},
21 BindId, CFlag, Event, ExecCtx, NoUserEvent, Scope, UserEvent,
22};
23use log::error;
24use netidx::{
25 protocol::valarray::ValArray,
26 publisher::{Value, WriteRequest},
27 subscriber::{self, SubId},
28};
29use netidx_core::atomic_id;
30use netidx_value::FromValue;
31use poolshark::global::GPooled;
32use serde_derive::{Deserialize, Serialize};
33use smallvec::SmallVec;
34use std::{fmt, future, sync::Arc, time::Duration};
35use tokio::{
36 sync::{
37 mpsc::{self as tmpsc},
38 oneshot,
39 },
40 task::{self, JoinHandle},
41};
42
43mod gx;
44mod rt;
45use gx::GX;
46pub use rt::GXRt;
47
48/// Trait to extend the event loop
49///
50/// The Graphix event loop has two steps,
51/// - update event sources, polls external async event sources like
52/// netidx, sockets, files, etc
53/// - do cycle, collects all the events and delivers them to the dataflow
54/// graph as a batch of "everything that happened"
55///
56/// As such to extend the event loop you must implement two things. A function
57/// to poll your own external event sources, and a function to take the events
58/// you got from those sources and represent them to the dataflow graph. You
59/// represent them either by setting generic variables (bindid -> value map), or
60/// by setting some custom structures that you define as part of your UserEvent
61/// implementation.
62///
63/// Your Graphix builtins can access both your custom structure, to register new
64/// event sources, etc, and your custom user event structure, to receive events
65/// who's types do not fit nicely as `Value`. If your event payload does fit
66/// nicely as a `Value`, then just use a variable.
67pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
68 type UserEvent: UserEvent + Send + Sync + 'static;
69
70 /// Update your custom event sources
71 ///
72 /// Your `update_sources` MUST be cancel safe.
73 fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
74
75 /// Collect events that happened and marshal them into the event structure
76 ///
77 /// for delivery to the dataflow graph. `do_cycle` will be called, and a
78 /// batch of events delivered to the graph until `is_ready` returns false.
79 /// It is possible that a call to `update_sources` will result in
80 /// multiple calls to `do_cycle`, but it is not guaranteed that
81 /// `update_sources` will not be called again before `is_ready`
82 /// returns false.
83 fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
84
85 /// Return true if there are events ready to deliver
86 fn is_ready(&self) -> bool;
87
88 /// Clear the state
89 fn clear(&mut self);
90
91 /// Create and return an empty custom event structure
92 fn empty_event(&mut self) -> Self::UserEvent;
93}
94
95#[derive(Debug, Default)]
96pub struct NoExt;
97
98impl GXExt for NoExt {
99 type UserEvent = NoUserEvent;
100
101 async fn update_sources(&mut self) -> Result<()> {
102 future::pending().await
103 }
104
105 fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
106 Ok(())
107 }
108
109 fn is_ready(&self) -> bool {
110 false
111 }
112
113 fn clear(&mut self) {}
114
115 fn empty_event(&mut self) -> Self::UserEvent {
116 NoUserEvent
117 }
118}
119
120type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
121type WriteBatch = GPooled<Vec<WriteRequest>>;
122
123#[derive(Debug)]
124pub struct CompExp<X: GXExt> {
125 pub id: ExprId,
126 pub typ: Type,
127 pub output: bool,
128 rt: GXHandle<X>,
129}
130
131impl<X: GXExt> Drop for CompExp<X> {
132 fn drop(&mut self) {
133 let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
134 }
135}
136
137#[derive(Debug)]
138pub struct CompRes<X: GXExt> {
139 pub exprs: SmallVec<[CompExp<X>; 1]>,
140 pub env: Env,
141}
142
143pub struct Ref<X: GXExt> {
144 pub id: ExprId,
145 // the most recent value of the variable
146 pub last: Option<Value>,
147 pub bid: BindId,
148 pub target_bid: Option<BindId>,
149 pub typ: Type,
150 rt: GXHandle<X>,
151}
152
153impl<X: GXExt> Drop for Ref<X> {
154 fn drop(&mut self) {
155 let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
156 }
157}
158
159impl<X: GXExt> Ref<X> {
160 /// set the value of the ref `r <-`
161 ///
162 /// This will cause all nodes dependent on this id to update. This is the
163 /// same thing as the `<-` operator in Graphix. This does the same thing as
164 /// `GXHandle::set`
165 pub fn set<T: Into<Value>>(&mut self, v: T) -> Result<()> {
166 let v = v.into();
167 self.last = Some(v.clone());
168 self.rt.set(self.bid, v)
169 }
170
171 /// set the value pointed to by ref `*r <-`
172 ///
173 /// This will cause all nodes dependent on *id to update. This is the same
174 /// as the `*r <-` operator in Graphix. This does the same thing as
175 /// `GXHandle::set` using the target id.
176 pub fn set_deref<T: Into<Value>>(&mut self, v: T) -> Result<()> {
177 if let Some(id) = self.target_bid {
178 self.rt.set(id, v)?
179 }
180 Ok(())
181 }
182
183 /// Process an update
184 ///
185 /// If the expr id refers to this ref, then set `last` to `v` and return a
186 /// mutable reference to `last`, otherwise return None. This will also
187 /// update `last` if the id matches.
188 pub fn update(&mut self, id: ExprId, v: &Value) -> Option<&mut Value> {
189 if self.id == id {
190 self.last = Some(v.clone());
191 self.last.as_mut()
192 } else {
193 None
194 }
195 }
196}
197
198pub struct TRef<X: GXExt, T: FromValue> {
199 pub r: Ref<X>,
200 pub t: Option<T>,
201}
202
203impl<X: GXExt, T: FromValue> TRef<X, T> {
204 /// Create a new typed reference from `r`
205 ///
206 /// If conversion of `r` fails, return an error.
207 pub fn new(mut r: Ref<X>) -> Result<Self> {
208 let t = r.last.take().map(|v| v.cast_to()).transpose()?;
209 Ok(TRef { r, t })
210 }
211
212 /// Process an update
213 ///
214 /// If the expr id refers to this tref, then convert the value into a `T`
215 /// update `t` and return a mutable reference to the new `T`, otherwise
216 /// return None. Return an Error if the conversion failed.
217 pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
218 if self.r.id == id {
219 let v = v.clone().cast_to()?;
220 self.t = Some(v);
221 Ok(self.t.as_mut())
222 } else {
223 Ok(None)
224 }
225 }
226}
227
228impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
229 /// set the value of the tref `r <-`
230 ///
231 /// This will cause all nodes dependent on this id to update. This is the
232 /// same thing as the `<-` operator in Graphix. This does the same thing as
233 /// `GXHandle::set`
234 pub fn set(&mut self, t: T) -> Result<()> {
235 self.t = Some(t.clone());
236 self.r.set(t)
237 }
238
239 /// set the value pointed to by tref `*r <-`
240 ///
241 /// This will cause all nodes dependent on *id to update. This is the same
242 /// as the `*r <-` operator in Graphix. This does the same thing as
243 /// `GXHandle::set` using the target id.
244 pub fn set_deref(&mut self, t: T) -> Result<()> {
245 self.t = Some(t.clone());
246 self.r.set_deref(t.into())
247 }
248}
249
250atomic_id!(CallableId);
251
252pub struct Callable<X: GXExt> {
253 rt: GXHandle<X>,
254 id: CallableId,
255 env: Env,
256 pub typ: FnType,
257 pub expr: ExprId,
258}
259
260impl<X: GXExt> Drop for Callable<X> {
261 fn drop(&mut self) {
262 let _ = self.rt.0.tx.send(ToGX::DeleteCallable { id: self.id });
263 }
264}
265
266impl<X: GXExt> Callable<X> {
267 /// Get the id of this callable
268 pub fn id(&self) -> CallableId {
269 self.id
270 }
271
272 /// Call the lambda with args
273 ///
274 /// Argument types and arity will be checked and an error will be returned
275 /// if they are wrong. If you call the function more than once before it
276 /// returns there is no guarantee that the returns will arrive in the order
277 /// of the calls. There is no guarantee that a function must return.
278 pub async fn call(&self, args: ValArray) -> Result<()> {
279 if self.typ.args.len() != args.len() {
280 bail!("expected {} args", self.typ.args.len())
281 }
282 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
283 if !a.typ.is_a(&self.env, v) {
284 bail!("type mismatch arg {i} expected {}", a.typ)
285 }
286 }
287 self.call_unchecked(args).await
288 }
289
290 /// Call the lambda with args. Argument types and arity will NOT
291 /// be checked. This can result in a runtime panic, invalid
292 /// results, and probably other bad things.
293 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
294 self.rt
295 .0
296 .tx
297 .send(ToGX::Call { id: self.id, args })
298 .map_err(|_| anyhow!("runtime is dead"))
299 }
300
301 /// Return Some(v) if this update is the return value of the callable
302 pub fn update<'a>(&self, id: ExprId, v: &'a Value) -> Option<&'a Value> {
303 if self.expr == id {
304 Some(v)
305 } else {
306 None
307 }
308 }
309}
310
311enum DeferredCall {
312 Call(ValArray, oneshot::Sender<Result<()>>),
313 CallUnchecked(ValArray, oneshot::Sender<Result<()>>),
314}
315
316pub struct NamedCallable<X: GXExt> {
317 fname: Ref<X>,
318 current: Option<Callable<X>>,
319 ids: FxHashSet<ExprId>,
320 deferred: Vec<DeferredCall>,
321 h: GXHandle<X>,
322}
323
324impl<X: GXExt> NamedCallable<X> {
325 /// Update the named callable function
326 ///
327 /// This method does two things,
328 /// - Handle late binding. When the name ref updates to an actual function
329 /// compile the real call site
330 /// - Return Ok(Some(v)) when the called function returns
331 pub async fn update<'a>(
332 &mut self,
333 id: ExprId,
334 v: &'a Value,
335 ) -> Result<Option<&'a Value>> {
336 match self.fname.update(id, v) {
337 Some(v) => {
338 let callable = self.h.compile_callable(v.clone()).await?;
339 self.ids.insert(callable.expr);
340 for dc in self.deferred.drain(..) {
341 match dc {
342 DeferredCall::Call(args, reply) => {
343 let _ = reply.send(callable.call(args).await);
344 }
345 DeferredCall::CallUnchecked(args, reply) => {
346 let _ = reply.send(callable.call_unchecked(args).await);
347 }
348 }
349 }
350 self.current = Some(callable);
351 Ok(None)
352 }
353 None if self.ids.contains(&id) => Ok(Some(v)),
354 None => Ok(None),
355 }
356 }
357
358 /// Call the lambda with args
359 ///
360 /// Argument types and arity will be checked and an error will be returned
361 /// if they are wrong. If you call the function more than once before it
362 /// returns there is no guarantee that the returns will arrive in the order
363 /// of the calls. There is no guarantee that a function must return. In
364 /// order to handle late binding you must keep calling `update` while
365 /// waiting for this method.
366 ///
367 /// While a late bound function is unresolved calls will queue internally in
368 /// the NamedCallsite and will happen when the function is resolved.
369 pub async fn call(&mut self, args: ValArray) -> Result<()> {
370 match &self.current {
371 Some(c) => c.call(args).await,
372 None => {
373 let (tx, rx) = oneshot::channel();
374 self.deferred.push(DeferredCall::Call(args, tx));
375 rx.await?
376 }
377 }
378 }
379
380 /// call the function with the specified args
381 ///
382 /// Argument types and arity will NOT be checked by this method. If you call
383 /// the function more than once before it returns there is no guarantee that
384 /// the returns will arrive in the order of the calls. There is no guarantee
385 /// that a function must return. In order to handle late binding you must
386 /// keep calling `update` while waiting for this method.
387 ///
388 /// While a late bound function is unresolved calls will queue internally in
389 /// the NamedCallsite and will happen when the function is resolved.
390 pub async fn call_unchecked(&mut self, args: ValArray) -> Result<()> {
391 match &self.current {
392 Some(c) => c.call(args).await,
393 None => {
394 let (tx, rx) = oneshot::channel();
395 self.deferred.push(DeferredCall::CallUnchecked(args, tx));
396 rx.await?
397 }
398 }
399 }
400}
401
402enum ToGX<X: GXExt> {
403 GetEnv {
404 res: oneshot::Sender<Env>,
405 },
406 Delete {
407 id: ExprId,
408 },
409 Load {
410 path: Source,
411 rt: GXHandle<X>,
412 res: oneshot::Sender<Result<CompRes<X>>>,
413 },
414 Check {
415 path: Source,
416 res: oneshot::Sender<Result<()>>,
417 },
418 Compile {
419 text: ArcStr,
420 rt: GXHandle<X>,
421 res: oneshot::Sender<Result<CompRes<X>>>,
422 },
423 CompileCallable {
424 id: Value,
425 rt: GXHandle<X>,
426 res: oneshot::Sender<Result<Callable<X>>>,
427 },
428 CompileRef {
429 id: BindId,
430 rt: GXHandle<X>,
431 res: oneshot::Sender<Result<Ref<X>>>,
432 },
433 Set {
434 id: BindId,
435 v: Value,
436 },
437 Call {
438 id: CallableId,
439 args: ValArray,
440 },
441 DeleteCallable {
442 id: CallableId,
443 },
444}
445
446#[derive(Debug, Clone)]
447pub enum GXEvent {
448 Updated(ExprId, Value),
449 Env(Env),
450}
451
452struct GXHandleInner<X: GXExt> {
453 tx: tmpsc::UnboundedSender<ToGX<X>>,
454 task: JoinHandle<()>,
455}
456
457impl<X: GXExt> Drop for GXHandleInner<X> {
458 fn drop(&mut self) {
459 self.task.abort()
460 }
461}
462
463/// A handle to a running GX instance.
464///
465/// Drop the handle to shutdown the associated background tasks.
466pub struct GXHandle<X: GXExt>(Arc<GXHandleInner<X>>);
467
468impl<X: GXExt> fmt::Debug for GXHandle<X> {
469 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470 write!(f, "GXHandle")
471 }
472}
473
474impl<X: GXExt> Clone for GXHandle<X> {
475 fn clone(&self) -> Self {
476 Self(self.0.clone())
477 }
478}
479
480impl<X: GXExt> GXHandle<X> {
481 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
482 let (tx, rx) = oneshot::channel();
483 self.0.tx.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
484 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
485 }
486
487 /// Get a copy of the current graphix environment
488 pub async fn get_env(&self) -> Result<Env> {
489 self.exec(|res| ToGX::GetEnv { res }).await
490 }
491
492 /// Check that a graphix module compiles
493 ///
494 /// If path startes with `netidx:` then the module will be loaded
495 /// from netidx, otherwise it will be loaded from the
496 /// filesystem. If the file compiles successfully return Ok(())
497 /// otherwise an error describing the problem. The environment
498 /// will not be altered by checking an expression, so you will not
499 /// be able to use any defined names later in the program. If you
500 /// want to do that see `compile`.
501 pub async fn check(&self, path: Source) -> Result<()> {
502 Ok(self.exec(|tx| ToGX::Check { path, res: tx }).await??)
503 }
504
505 /// Compile and execute a graphix expression
506 ///
507 /// If it generates results, they will be sent to all the channels that are
508 /// subscribed. When the `CompExp` objects contained in the `CompRes` are
509 /// dropped their corresponding expressions will be deleted. Therefore, you
510 /// can stop execution of the whole expression by dropping the returned
511 /// `CompRes`.
512 pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
513 Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
514 }
515
516 /// Load and execute a file or netidx value
517 ///
518 /// When the `CompExp` objects contained in the `CompRes` are
519 /// dropped their corresponding expressions will be
520 /// deleted. Therefore, you can stop execution of the whole file
521 /// by dropping the returned `CompRes`.
522 pub async fn load(&self, path: Source) -> Result<CompRes<X>> {
523 Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
524 }
525
526 /// Compile a callable interface to a lambda id
527 ///
528 /// This is how you call a lambda directly from rust. When the returned
529 /// `Callable` is dropped the associated callsite will be delete.
530 pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
531 Ok(self
532 .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
533 .await??)
534 }
535
536 /// Compile a callable interface to a late bound function by name
537 ///
538 /// This allows you to call a function by name. Because of late binding it
539 /// has some additional complexity (though less than implementing it
540 /// yourself). You must call `update` on `NamedCallable` when you recieve
541 /// updates from the runtime in order to drive late binding. `update` will
542 /// also return `Some` when one of your function calls returns.
543 pub async fn compile_callable_by_name(
544 &self,
545 env: &Env,
546 scope: &Scope,
547 name: &ModPath,
548 ) -> Result<NamedCallable<X>> {
549 let r = self.compile_ref_by_name(env, scope, name).await?;
550 match &r.typ {
551 Type::Fn(_) => (),
552 t => bail!(
553 "{name} in scope {} has type {t}. expected a function",
554 scope.lexical
555 ),
556 }
557 Ok(NamedCallable {
558 fname: r,
559 current: None,
560 ids: FxHashSet::default(),
561 deferred: vec![],
562 h: self.clone(),
563 })
564 }
565
566 /// Compile a ref to a bind id
567 ///
568 /// This will NOT return an error if the id isn't in the environment.
569 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
570 Ok(self
571 .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
572 .await??)
573 }
574
575 /// Compile a ref to a name
576 ///
577 /// Return an error if the name does not exist in the environment
578 pub async fn compile_ref_by_name(
579 &self,
580 env: &Env,
581 scope: &Scope,
582 name: &ModPath,
583 ) -> Result<Ref<X>> {
584 let id = env
585 .lookup_bind(&scope.lexical, name)
586 .ok_or_else(|| anyhow!("no such value {name} in scope {}", scope.lexical))?
587 .1
588 .id;
589 self.compile_ref(id).await
590 }
591
592 /// Set the variable idenfified by `id` to `v`
593 ///
594 /// triggering updates of all dependent node trees. This does the same thing
595 /// as`Ref::set` and `TRef::set`
596 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
597 let v = v.into();
598 self.0.tx.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
599 }
600
601 /// Call a callable by id with the given arguments
602 ///
603 /// This is a fire-and-forget call that does not wait for the result.
604 /// Unlike `Callable::call`, no type or arity checking is performed.
605 pub fn call(&self, id: CallableId, args: ValArray) -> Result<()> {
606 self.0.tx.send(ToGX::Call { id, args }).map_err(|_| anyhow!("runtime is dead"))
607 }
608}
609
610#[derive(Builder)]
611#[builder(pattern = "owned")]
612pub struct GXConfig<X: GXExt> {
613 /// The subscribe timeout to use when resolving modules in
614 /// netidx. Resolution will fail if the subscription does not
615 /// succeed before this timeout elapses.
616 #[builder(setter(strip_option), default)]
617 resolve_timeout: Option<Duration>,
618 /// The publish timeout to use when sending published batches. Default None.
619 #[builder(setter(strip_option), default)]
620 publish_timeout: Option<Duration>,
621 /// The execution context with any builtins already registered
622 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
623 /// The text of the root module
624 #[builder(setter(strip_option), default)]
625 root: Option<ArcStr>,
626 /// The set of module resolvers to use when resolving loaded modules
627 #[builder(default)]
628 resolvers: Vec<ModuleResolver>,
629 /// The channel that will receive events from the runtime
630 sub: tmpsc::Sender<GPooled<Vec<GXEvent>>>,
631 /// The set of compiler flags. Default empty.
632 #[builder(default)]
633 flags: BitFlags<CFlag>,
634}
635
636impl<X: GXExt> GXConfig<X> {
637 /// Create a new config
638 pub fn builder(
639 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
640 sub: tmpsc::Sender<GPooled<Vec<GXEvent>>>,
641 ) -> GXConfigBuilder<X> {
642 GXConfigBuilder::default().ctx(ctx).sub(sub)
643 }
644
645 /// Start the graphix runtime with the specified config,
646 ///
647 /// return a handle capable of interacting with it. root is the text of the
648 /// root module you wish to initially load. This will define the environment
649 /// for the rest of the code compiled by this runtime. The runtime starts
650 /// completely empty, with only the language, no core library, no standard
651 /// library. To build a runtime with the full standard library and nothing
652 /// else simply pass the output of `graphix_stdlib::register` to start.
653 pub async fn start(self) -> Result<GXHandle<X>> {
654 let (init_tx, init_rx) = oneshot::channel();
655 let (tx, rx) = tmpsc::unbounded_channel();
656 let task = task::spawn(async move {
657 match GX::new(self).await {
658 Ok(bs) => {
659 let _ = init_tx.send(Ok(()));
660 if let Err(e) = bs.run(rx).await {
661 error!("run loop exited with error {e:?}")
662 }
663 }
664 Err(e) => {
665 let _ = init_tx.send(Err(e));
666 }
667 };
668 });
669 init_rx.await??;
670 Ok(GXHandle(Arc::new(GXHandleInner { tx, task })))
671 }
672}