graphix_rt/
lib.rs

1/// A general purpose graphix runtime
2///
3/// This module implements a generic graphix runtime suitable for most
4/// applications, including applications that implement custom graphix
5/// builtins. The graphix interperter is run in a background task, and
6/// can be interacted with via a handle. All features of the standard
7/// library are supported by this runtime.
8///
9/// In special cases where this runtime is not suitable for your
10/// application you can implement your own, see the [Ctx] and
11/// [UserEvent] traits.
12use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use combine::stream::position::SourcePosition;
16use compact_str::format_compact;
17use core::fmt;
18use derive_builder::Builder;
19use futures::{channel::mpsc, future::try_join_all, FutureExt, StreamExt};
20use fxhash::{FxBuildHasher, FxHashMap};
21use graphix_compiler::{
22    compile,
23    env::Env,
24    expr::{
25        self, Expr, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin, Source,
26    },
27    node::genn,
28    typ::{FnType, Type},
29    BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
30};
31use indexmap::IndexMap;
32use log::{debug, error, info};
33use netidx::{
34    path::Path,
35    pool::{Pool, Pooled},
36    protocol::valarray::ValArray,
37    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
38    resolver_client::ChangeTracker,
39    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
40};
41use netidx_core::atomic_id;
42use netidx_protocols::rpc::{
43    self,
44    server::{ArgSpec, RpcCall},
45};
46use serde_derive::{Deserialize, Serialize};
47use smallvec::{smallvec, SmallVec};
48use std::{
49    collections::{hash_map::Entry, HashMap, VecDeque},
50    future, mem,
51    os::unix::ffi::OsStrExt,
52    panic::{catch_unwind, AssertUnwindSafe},
53    path::{Component, PathBuf},
54    result,
55    sync::{LazyLock, Weak},
56    time::Duration,
57};
58use tokio::{
59    fs, select,
60    sync::{
61        mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
62        oneshot, Mutex,
63    },
64    task::{self, JoinError, JoinSet},
65    time::{self, Instant},
66};
67use triomphe::Arc;
68
69type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
70type WriteBatch = Pooled<Vec<WriteRequest>>;
71
72#[derive(Debug)]
73pub struct CouldNotResolve;
74
75impl fmt::Display for CouldNotResolve {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(f, "could not resolve module")
78    }
79}
80
81#[derive(Debug)]
82struct RpcClient {
83    proc: rpc::client::Proc,
84    last_used: Instant,
85}
86
87#[derive(Debug)]
88pub struct GXCtx {
89    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
90    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
91    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
92    var_updates: VecDeque<(BindId, Value)>,
93    net_updates: VecDeque<(SubId, subscriber::Event)>,
94    net_writes: VecDeque<(Id, WriteRequest)>,
95    rpc_overflow: VecDeque<(BindId, RpcCall)>,
96    rpc_clients: FxHashMap<Path, RpcClient>,
97    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
98    pending_unsubscribe: VecDeque<(Instant, Dval)>,
99    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
100    tasks: JoinSet<(BindId, Value)>,
101    batch: publisher::UpdateBatch,
102    publisher: Publisher,
103    subscriber: Subscriber,
104    updates_tx: mpsc::Sender<UpdateBatch>,
105    updates: mpsc::Receiver<UpdateBatch>,
106    writes_tx: mpsc::Sender<WriteBatch>,
107    writes: mpsc::Receiver<WriteBatch>,
108    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
109    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
110}
111
112impl GXCtx {
113    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
114        let (updates_tx, updates) = mpsc::channel(100);
115        let (writes_tx, writes) = mpsc::channel(100);
116        let (rpcs_tx, rpcs) = mpsc::channel(100);
117        let batch = publisher.start_batch();
118        let mut tasks = JoinSet::new();
119        tasks.spawn(async { future::pending().await });
120        Self {
121            by_ref: HashMap::default(),
122            var_updates: VecDeque::new(),
123            net_updates: VecDeque::new(),
124            net_writes: VecDeque::new(),
125            rpc_overflow: VecDeque::new(),
126            rpc_clients: HashMap::default(),
127            subscribed: HashMap::default(),
128            pending_unsubscribe: VecDeque::new(),
129            published: HashMap::default(),
130            change_trackers: HashMap::default(),
131            published_rpcs: HashMap::default(),
132            tasks,
133            batch,
134            publisher,
135            subscriber,
136            updates,
137            updates_tx,
138            writes,
139            writes_tx,
140            rpcs_tx,
141            rpcs,
142        }
143    }
144}
145
146macro_rules! or_err {
147    ($bindid:expr, $e:expr) => {
148        match $e {
149            Ok(v) => v,
150            Err(e) => {
151                let e = ArcStr::from(format_compact!("{e:?}").as_str());
152                let e = Value::Error(e);
153                return ($bindid, e);
154            }
155        }
156    };
157}
158
159macro_rules! check_changed {
160    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
161        let mut ct = $ct.lock().await;
162        if ct.path() != &$path {
163            *ct = ChangeTracker::new($path.clone());
164        }
165        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
166            return ($id, Value::Null);
167        }
168    };
169}
170
171impl Ctx for GXCtx {
172    fn clear(&mut self) {
173        let Self {
174            by_ref,
175            var_updates,
176            net_updates,
177            net_writes,
178            rpc_clients,
179            rpc_overflow,
180            subscribed,
181            published,
182            published_rpcs,
183            pending_unsubscribe,
184            change_trackers,
185            tasks,
186            batch,
187            publisher,
188            subscriber: _,
189            updates_tx,
190            updates,
191            writes_tx,
192            writes,
193            rpcs,
194            rpcs_tx,
195        } = self;
196        by_ref.clear();
197        var_updates.clear();
198        net_updates.clear();
199        net_writes.clear();
200        rpc_overflow.clear();
201        rpc_clients.clear();
202        subscribed.clear();
203        published.clear();
204        published_rpcs.clear();
205        pending_unsubscribe.clear();
206        change_trackers.clear();
207        *tasks = JoinSet::new();
208        tasks.spawn(async { future::pending().await });
209        *batch = publisher.start_batch();
210        let (tx, rx) = mpsc::channel(3);
211        *updates_tx = tx;
212        *updates = rx;
213        let (tx, rx) = mpsc::channel(100);
214        *writes_tx = tx;
215        *writes = rx;
216        let (tx, rx) = mpsc::channel(100);
217        *rpcs_tx = tx;
218        *rpcs = rx
219    }
220
221    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
222        let now = Instant::now();
223        let proc = match self.rpc_clients.entry(name) {
224            Entry::Occupied(mut e) => {
225                let cl = e.get_mut();
226                cl.last_used = now;
227                Ok(cl.proc.clone())
228            }
229            Entry::Vacant(e) => {
230                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
231                    Err(e) => Err(e),
232                    Ok(proc) => {
233                        let cl = RpcClient { last_used: now, proc: proc.clone() };
234                        e.insert(cl);
235                        Ok(proc)
236                    }
237                }
238            }
239        };
240        self.tasks.spawn(async move {
241            macro_rules! err {
242                ($e:expr) => {{
243                    let e = format_compact!("{:?}", $e);
244                    (id, Value::Error(e.as_str().into()))
245                }};
246            }
247            match proc {
248                Err(e) => err!(e),
249                Ok(proc) => match proc.call(args).await {
250                    Err(e) => err!(e),
251                    Ok(res) => (id, res),
252                },
253            }
254        });
255    }
256
257    fn publish_rpc(
258        &mut self,
259        name: Path,
260        doc: Value,
261        spec: Vec<ArgSpec>,
262        id: BindId,
263    ) -> Result<()> {
264        use rpc::server::Proc;
265        let e = match self.published_rpcs.entry(name) {
266            Entry::Vacant(e) => e,
267            Entry::Occupied(_) => bail!("already published"),
268        };
269        let proc = Proc::new(
270            &self.publisher,
271            e.key().clone(),
272            doc,
273            spec,
274            move |c| Some((id, c)),
275            Some(self.rpcs_tx.clone()),
276        )?;
277        e.insert(proc);
278        Ok(())
279    }
280
281    fn unpublish_rpc(&mut self, name: Path) {
282        self.published_rpcs.remove(&name);
283    }
284
285    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
286        let dval =
287            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
288        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
289        dval
290    }
291
292    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
293        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
294            if let Some(cn) = exprs.get_mut(&ref_by) {
295                *cn -= 1;
296                if *cn == 0 {
297                    exprs.remove(&ref_by);
298                }
299            }
300            if exprs.is_empty() {
301                self.subscribed.remove(&dv.id());
302            }
303        }
304        self.pending_unsubscribe.push_back((Instant::now(), dv));
305    }
306
307    fn list(&mut self, id: BindId, path: Path) {
308        let ct = self
309            .change_trackers
310            .entry(id)
311            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
312        let ct = Arc::clone(ct);
313        let resolver = self.subscriber.resolver();
314        self.tasks.spawn(async move {
315            check_changed!(id, resolver, path, ct);
316            let mut paths = or_err!(id, resolver.list(path).await);
317            let paths = paths.drain(..).map(|p| Value::String(p.into()));
318            (id, Value::Array(ValArray::from_iter_exact(paths)))
319        });
320    }
321
322    fn list_table(&mut self, id: BindId, path: Path) {
323        let ct = self
324            .change_trackers
325            .entry(id)
326            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
327        let ct = Arc::clone(ct);
328        let resolver = self.subscriber.resolver();
329        self.tasks.spawn(async move {
330            check_changed!(id, resolver, path, ct);
331            let mut tbl = or_err!(id, resolver.table(path).await);
332            let cols = tbl.cols.drain(..).map(|(name, count)| {
333                Value::Array(ValArray::from([
334                    Value::String(name.into()),
335                    Value::V64(count.0),
336                ]))
337            });
338            let cols = Value::Array(ValArray::from_iter_exact(cols));
339            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
340            let rows = Value::Array(ValArray::from_iter_exact(rows));
341            let tbl = Value::Array(ValArray::from([
342                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
343                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
344            ]));
345            (id, tbl)
346        });
347    }
348
349    fn stop_list(&mut self, id: BindId) {
350        self.change_trackers.remove(&id);
351    }
352
353    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
354        let val = self.publisher.publish_with_flags_and_writes(
355            PublishFlags::empty(),
356            path,
357            value,
358            Some(self.writes_tx.clone()),
359        )?;
360        let id = val.id();
361        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
362        Ok(val)
363    }
364
365    fn update(&mut self, val: &Val, value: Value) {
366        val.update(&mut self.batch, value);
367    }
368
369    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
370        if let Some(refs) = self.published.get_mut(&val.id()) {
371            if let Some(cn) = refs.get_mut(&ref_by) {
372                *cn -= 1;
373                if *cn == 0 {
374                    refs.remove(&ref_by);
375                }
376            }
377            if refs.is_empty() {
378                self.published.remove(&val.id());
379            }
380        }
381    }
382
383    fn set_timer(&mut self, id: BindId, timeout: Duration) {
384        self.tasks
385            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
386    }
387
388    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
389        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
390    }
391
392    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
393        if let Some(refs) = self.by_ref.get_mut(&id) {
394            if let Some(cn) = refs.get_mut(&ref_by) {
395                *cn -= 1;
396                if *cn == 0 {
397                    refs.remove(&ref_by);
398                }
399            }
400            if refs.is_empty() {
401                self.by_ref.remove(&id);
402            }
403        }
404    }
405
406    fn set_var(&mut self, id: BindId, value: Value) {
407        self.var_updates.push_back((id, value.clone()));
408    }
409}
410
411fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
412    match &n.spec().kind {
413        ExprKind::Bind { .. }
414        | ExprKind::Lambda { .. }
415        | ExprKind::Use { .. }
416        | ExprKind::Connect { .. }
417        | ExprKind::Module { .. }
418        | ExprKind::TypeDef { .. } => false,
419        _ => true,
420    }
421}
422
423async fn or_never(b: bool) {
424    if !b {
425        future::pending().await
426    }
427}
428
429async fn join_or_wait(
430    js: &mut JoinSet<(BindId, Value)>,
431) -> result::Result<(BindId, Value), JoinError> {
432    match js.join_next().await {
433        None => future::pending().await,
434        Some(r) => r,
435    }
436}
437
438async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
439    if go {
440        match ch.next().await {
441            None => future::pending().await,
442            Some(v) => v,
443        }
444    } else {
445        future::pending().await
446    }
447}
448
449async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
450    if pending.len() == 0 {
451        future::pending().await
452    } else {
453        let (ts, _) = pending.front().unwrap();
454        let one = Duration::from_secs(1);
455        let elapsed = now - *ts;
456        if elapsed < one {
457            time::sleep(one - elapsed).await
458        }
459    }
460}
461
462pub struct CompExp {
463    pub id: ExprId,
464    pub typ: Type,
465    pub output: bool,
466    rt: GXHandle,
467}
468
469impl Drop for CompExp {
470    fn drop(&mut self) {
471        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
472    }
473}
474
475pub struct CompRes {
476    pub exprs: SmallVec<[CompExp; 1]>,
477    pub env: Env<GXCtx, NoUserEvent>,
478}
479
480#[derive(Clone)]
481pub enum RtEvent {
482    Updated(ExprId, Value),
483    Env(Env<GXCtx, NoUserEvent>),
484}
485
486static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
487
488pub struct Ref {
489    pub id: ExprId,
490    pub last: Option<Value>,
491    pub bid: BindId,
492    pub target_bid: Option<BindId>,
493    rt: GXHandle,
494}
495
496impl Drop for Ref {
497    fn drop(&mut self) {
498        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
499    }
500}
501
502impl Ref {
503    pub fn set(&self, v: Value) -> Result<()> {
504        self.rt.set(self.bid, v)
505    }
506
507    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
508        if let Some(id) = self.target_bid {
509            self.rt.set(id, v)?
510        }
511        Ok(())
512    }
513}
514
515atomic_id!(CallableId);
516
517pub struct Callable {
518    rt: GXHandle,
519    id: CallableId,
520    env: Env<GXCtx, NoUserEvent>,
521    pub typ: FnType,
522    pub expr: ExprId,
523}
524
525impl Drop for Callable {
526    fn drop(&mut self) {
527        let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
528    }
529}
530
531impl Callable {
532    /// Call the lambda with args. Argument types and arity will be
533    /// checked and an error will be returned if they are wrong.
534    pub async fn call(&self, args: ValArray) -> Result<()> {
535        if self.typ.args.len() != args.len() {
536            bail!("expected {} args", self.typ.args.len())
537        }
538        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
539            if !a.typ.is_a(&self.env, v) {
540                bail!("type mismatch arg {i} expected {}", a.typ)
541            }
542        }
543        self.call_unchecked(args).await
544    }
545
546    /// Call the lambda with args. Argument types and arity will NOT
547    /// be checked. This can result in a runtime panic, invalid
548    /// results, and probably other bad things.
549    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
550        self.rt
551            .0
552            .send(ToRt::Call { id: self.id, args })
553            .map_err(|_| anyhow!("runtime is dead"))
554    }
555}
556
557enum ToRt {
558    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
559    Delete { id: ExprId },
560    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
561    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
562    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
563    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
564    Set { id: BindId, v: Value },
565    Call { id: CallableId, args: ValArray },
566    DeleteCallable { id: CallableId },
567}
568
569struct CallableInt {
570    expr: ExprId,
571    args: Box<[BindId]>,
572}
573
574struct GX {
575    ctx: ExecCtx<GXCtx, NoUserEvent>,
576    event: Event<NoUserEvent>,
577    updated: FxHashMap<ExprId, bool>,
578    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
579    callables: FxHashMap<CallableId, CallableInt>,
580    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
581    resolvers: Arc<[ModuleResolver]>,
582    publish_timeout: Option<Duration>,
583    last_rpc_gc: Instant,
584}
585
586impl GX {
587    async fn new(mut rt: GXConfig) -> Result<Self> {
588        let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
589            None => r.push(ModuleResolver::Files("".into())),
590            Some(dd) => {
591                r.push(ModuleResolver::Files("".into()));
592                r.push(ModuleResolver::Files(dd.join("graphix")));
593            }
594        };
595        match std::env::var("GRAPHIX_MODPATH") {
596            Err(_) => resolvers_default(&mut rt.resolvers),
597            Ok(mp) => match ModuleResolver::parse_env(
598                rt.ctx.user.subscriber.clone(),
599                rt.resolve_timeout,
600                &mp,
601            ) {
602                Ok(r) => rt.resolvers.extend(r),
603                Err(e) => {
604                    error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
605                    resolvers_default(&mut rt.resolvers)
606                }
607            },
608        };
609        let mut t = Self {
610            ctx: rt.ctx,
611            event: Event::new(NoUserEvent),
612            updated: HashMap::default(),
613            nodes: IndexMap::default(),
614            callables: HashMap::default(),
615            sub: rt.sub,
616            resolvers: Arc::from(rt.resolvers),
617            publish_timeout: rt.publish_timeout,
618            last_rpc_gc: Instant::now(),
619        };
620        let st = Instant::now();
621        if let Some(root) = rt.root {
622            t.compile_root(root).await?;
623        }
624        info!("root init time: {:?}", st.elapsed());
625        Ok(t)
626    }
627
628    async fn do_cycle(
629        &mut self,
630        updates: Option<UpdateBatch>,
631        writes: Option<WriteBatch>,
632        tasks: &mut Vec<(BindId, Value)>,
633        rpcs: &mut Vec<(BindId, RpcCall)>,
634        to_rt: &mut UnboundedReceiver<ToRt>,
635        input: &mut Vec<ToRt>,
636        mut batch: Pooled<Vec<RtEvent>>,
637    ) {
638        macro_rules! push_event {
639            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
640                match self.event.$event.entry($id) {
641                    Entry::Vacant(e) => {
642                        e.insert($v);
643                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
644                            for id in exps.keys() {
645                                self.updated.entry(*id).or_insert(false);
646                            }
647                        }
648                    }
649                    Entry::Occupied(_) => {
650                        self.ctx.user.$overflow.push_back(($id, $v));
651                    }
652                }
653            };
654        }
655        for _ in 0..self.ctx.user.var_updates.len() {
656            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
657            push_event!(id, v, variables, by_ref, var_updates)
658        }
659        for (id, v) in tasks.drain(..) {
660            push_event!(id, v, variables, by_ref, var_updates)
661        }
662        for _ in 0..self.ctx.user.rpc_overflow.len() {
663            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
664            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
665        }
666        for (id, v) in rpcs.drain(..) {
667            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
668        }
669        for _ in 0..self.ctx.user.net_updates.len() {
670            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
671            push_event!(id, v, netidx, subscribed, net_updates)
672        }
673        if let Some(mut updates) = updates {
674            for (id, v) in updates.drain(..) {
675                push_event!(id, v, netidx, subscribed, net_updates)
676            }
677        }
678        for _ in 0..self.ctx.user.net_writes.len() {
679            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
680            push_event!(id, v, writes, published, net_writes)
681        }
682        if let Some(mut writes) = writes {
683            for wr in writes.drain(..) {
684                let id = wr.id;
685                push_event!(id, wr, writes, published, net_writes)
686            }
687        }
688        for (id, n) in self.nodes.iter_mut() {
689            if let Some(init) = self.updated.get(id) {
690                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
691                self.event.init = *init;
692                if self.event.init {
693                    REFS.with_borrow_mut(|refs| {
694                        refs.clear();
695                        n.refs(refs);
696                        refs.with_external_refs(|id| {
697                            if let Some(v) = self.ctx.cached.get(&id) {
698                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
699                                    e.insert(v.clone());
700                                    clear.push(id);
701                                }
702                            }
703                        });
704                    });
705                }
706                let res = catch_unwind(AssertUnwindSafe(|| {
707                    n.update(&mut self.ctx, &mut self.event)
708                }));
709                for id in clear {
710                    self.event.variables.remove(&id);
711                }
712                match res {
713                    Ok(None) => (),
714                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
715                    Err(e) => {
716                        error!("could not update exprid: {id:?}, panic: {e:?}")
717                    }
718                }
719            }
720        }
721        loop {
722            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
723                Ok(()) => break,
724                Err(SendTimeoutError::Closed(_)) => {
725                    error!("could not send batch");
726                    break;
727                }
728                Err(SendTimeoutError::Timeout(b)) => {
729                    batch = b;
730                    // prevent deadlock on input
731                    while let Ok(m) = to_rt.try_recv() {
732                        input.push(m);
733                    }
734                    self.process_input_batch(tasks, input, &mut batch).await;
735                }
736            }
737        }
738        self.event.clear();
739        self.updated.clear();
740        if self.ctx.user.batch.len() > 0 {
741            let batch = mem::replace(
742                &mut self.ctx.user.batch,
743                self.ctx.user.publisher.start_batch(),
744            );
745            let timeout = self.publish_timeout;
746            task::spawn(async move { batch.commit(timeout).await });
747        }
748    }
749
750    async fn process_input_batch(
751        &mut self,
752        tasks: &mut Vec<(BindId, Value)>,
753        input: &mut Vec<ToRt>,
754        batch: &mut Pooled<Vec<RtEvent>>,
755    ) {
756        for m in input.drain(..) {
757            match m {
758                ToRt::GetEnv { res } => {
759                    let _ = res.send(self.ctx.env.clone());
760                }
761                ToRt::Compile { text, rt, res } => {
762                    let _ = res.send(self.compile(rt, text).await);
763                }
764                ToRt::Load { path, rt, res } => {
765                    let _ = res.send(self.load(rt, &path).await);
766                }
767                ToRt::Delete { id } => {
768                    if let Some(mut n) = self.nodes.shift_remove(&id) {
769                        n.delete(&mut self.ctx);
770                    }
771                    debug!("delete {id:?}");
772                    batch.push(RtEvent::Env(self.ctx.env.clone()));
773                }
774                ToRt::CompileCallable { id, rt, res } => {
775                    let _ = res.send(self.compile_callable(id, rt));
776                }
777                ToRt::CompileRef { id, rt, res } => {
778                    let _ = res.send(self.compile_ref(rt, id));
779                }
780                ToRt::Set { id, v } => {
781                    self.ctx.cached.insert(id, v.clone());
782                    tasks.push((id, v))
783                }
784                ToRt::DeleteCallable { id } => self.delete_callable(id),
785                ToRt::Call { id, args } => {
786                    if let Err(e) = self.call_callable(id, args, tasks) {
787                        error!("calling callable {id:?} failed with {e:?}")
788                    }
789                }
790            }
791        }
792    }
793
794    fn cycle_ready(&self) -> bool {
795        self.ctx.user.var_updates.len() > 0
796            || self.ctx.user.net_updates.len() > 0
797            || self.ctx.user.net_writes.len() > 0
798            || self.ctx.user.rpc_overflow.len() > 0
799    }
800
801    async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
802        let scope = ModPath::root();
803        let ori = Origin { parent: None, source: Source::Unspecified, text };
804        let exprs =
805            expr::parser::parse(ori.clone()).context("parsing the root module")?;
806        let exprs =
807            try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
808                .await
809                .context(CouldNotResolve)?;
810        let nodes = exprs
811            .iter()
812            .map(|e| {
813                compile(&mut self.ctx, &scope, e.clone())
814                    .with_context(|| format!("compiling root expression {e}"))
815            })
816            .collect::<Result<SmallVec<[_; 4]>>>()
817            .with_context(|| ori.clone())?;
818        for (e, n) in exprs.iter().zip(nodes.into_iter()) {
819            self.updated.insert(e.id, true);
820            self.nodes.insert(e.id, n);
821        }
822        Ok(())
823    }
824
825    async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
826        let scope = ModPath::root();
827        let ori = Origin { parent: None, source: Source::Unspecified, text };
828        let exprs = expr::parser::parse(ori.clone())?;
829        let exprs =
830            try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
831                .await
832                .context(CouldNotResolve)?;
833        let nodes = exprs
834            .iter()
835            .map(|e| compile(&mut self.ctx, &scope, e.clone()))
836            .collect::<Result<SmallVec<[_; 4]>>>()
837            .with_context(|| ori.clone())?;
838        let exprs = exprs
839            .iter()
840            .zip(nodes.into_iter())
841            .map(|(e, n)| {
842                let output = is_output(&n);
843                let typ = n.typ().clone();
844                self.updated.insert(e.id, true);
845                self.nodes.insert(e.id, n);
846                CompExp { id: e.id, output, typ, rt: rt.clone() }
847            })
848            .collect::<SmallVec<[_; 1]>>();
849        Ok(CompRes { exprs, env: self.ctx.env.clone() })
850    }
851
852    async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
853        let scope = ModPath::root();
854        let st = Instant::now();
855        let (ori, exprs) = match file.extension() {
856            Some(e) if e.as_bytes() == b"gx" => {
857                let file = file.canonicalize()?;
858                let s = fs::read_to_string(&file).await?;
859                let s = if s.starts_with("#!") {
860                    if let Some(i) = s.find('\n') {
861                        &s[i..]
862                    } else {
863                        s.as_str()
864                    }
865                } else {
866                    s.as_str()
867                };
868                let ori = Origin {
869                    parent: None,
870                    source: Source::File(file),
871                    text: ArcStr::from(s),
872                };
873                (ori.clone(), expr::parser::parse(ori)?)
874            }
875            Some(e) => bail!("invalid file extension {e:?}"),
876            None => {
877                let name = file
878                    .components()
879                    .map(|c| match c {
880                        Component::RootDir
881                        | Component::CurDir
882                        | Component::ParentDir
883                        | Component::Prefix(_) => bail!("invalid module name {file:?}"),
884                        Component::Normal(s) => Ok(s),
885                    })
886                    .collect::<Result<Box<[_]>>>()?;
887                if name.len() != 1 {
888                    bail!("invalid module name {file:?}")
889                }
890                let name = String::from_utf8_lossy(name[0].as_bytes());
891                let name = name
892                    .parse::<ModPath>()
893                    .with_context(|| "parsing module name {file:?}")?;
894                let name = Path::basename(&*name)
895                    .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
896                let name = ArcStr::from(name);
897                let ori = Origin {
898                    parent: None,
899                    source: Source::Internal(name.clone()),
900                    text: literal!(""),
901                };
902                let kind = ExprKind::Module {
903                    export: true,
904                    name,
905                    value: ModuleKind::Unresolved,
906                };
907                let exprs = Arc::from(vec![Expr {
908                    id: ExprId::new(),
909                    ori: Arc::new(ori.clone()),
910                    pos: SourcePosition::default(),
911                    kind,
912                }]);
913                (ori, exprs)
914            }
915        };
916        info!("parse time: {:?}", st.elapsed());
917        let st = Instant::now();
918        let exprs =
919            try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
920                .await
921                .context(CouldNotResolve)?;
922        info!("resolve time: {:?}", st.elapsed());
923        let mut res = smallvec![];
924        for e in exprs.iter() {
925            let top_id = e.id;
926            let n =
927                compile(&mut self.ctx, &scope, e.clone()).with_context(|| ori.clone())?;
928            let has_out = is_output(&n);
929            let typ = n.typ().clone();
930            self.nodes.insert(top_id, n);
931            self.updated.insert(top_id, true);
932            res.push(CompExp { id: top_id, output: has_out, typ, rt: rt.clone() })
933        }
934        Ok(CompRes { exprs: res, env: self.ctx.env.clone() })
935    }
936
937    fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
938        let id = match id {
939            Value::U64(id) => LambdaId::from(id),
940            v => bail!("invalid lambda id {v}"),
941        };
942        let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
943        let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
944        let args = lb.typ.args.iter();
945        let args = args
946            .map(|a| {
947                if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
948                    bail!("can't call lambda with an optional argument from rust")
949                } else {
950                    Ok(BindId::new())
951                }
952            })
953            .collect::<Result<Box<[_]>>>()?;
954        let eid = ExprId::new();
955        let argn = lb.typ.args.iter().zip(args.iter());
956        let argn = argn
957            .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
958            .collect::<Vec<_>>();
959        let fnode = genn::constant(Value::U64(id.inner()));
960        let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
961        self.event.init = true;
962        n.update(&mut self.ctx, &mut self.event);
963        self.event.clear();
964        let cid = CallableId::new();
965        self.callables.insert(cid, CallableInt { expr: eid, args });
966        self.nodes.insert(eid, n);
967        let env = self.ctx.env.clone();
968        Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
969    }
970
971    fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
972        let eid = ExprId::new();
973        let typ = Type::Any;
974        let n = genn::reference(&mut self.ctx, id, typ, eid);
975        self.nodes.insert(eid, n);
976        let target_bid = self.ctx.env.byref_chain.get(&id).copied();
977        Ok(Ref {
978            id: eid,
979            bid: id,
980            target_bid,
981            last: self.ctx.cached.get(&id).cloned(),
982            rt,
983        })
984    }
985
986    fn call_callable(
987        &mut self,
988        id: CallableId,
989        args: ValArray,
990        tasks: &mut Vec<(BindId, Value)>,
991    ) -> Result<()> {
992        let c =
993            self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
994        if args.len() != c.args.len() {
995            bail!("expected {} arguments", c.args.len());
996        }
997        let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
998        tasks.extend(a);
999        Ok(())
1000    }
1001
1002    fn delete_callable(&mut self, id: CallableId) {
1003        if let Some(c) = self.callables.remove(&id) {
1004            if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1005                n.delete(&mut self.ctx)
1006            }
1007        }
1008    }
1009
1010    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
1011        let mut tasks = vec![];
1012        let mut input = vec![];
1013        let mut rpcs = vec![];
1014        let onemin = Duration::from_secs(60);
1015        'main: loop {
1016            let now = Instant::now();
1017            let ready = self.cycle_ready();
1018            let mut updates = None;
1019            let mut writes = None;
1020            macro_rules! peek {
1021                (updates) => {
1022                    if self.ctx.user.net_updates.is_empty() {
1023                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
1024                            match &mut updates {
1025                                None => updates = Some(up),
1026                                Some(prev) => prev.extend(up.drain(..)),
1027                            }
1028                        }
1029                    }
1030                };
1031                (writes) => {
1032                    if self.ctx.user.net_writes.is_empty() {
1033                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
1034                            writes = Some(wr);
1035                        }
1036                    }
1037                };
1038                (tasks) => {
1039                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
1040                        tasks.push(up);
1041                    }
1042                };
1043                (rpcs) => {
1044                    if self.ctx.user.rpc_overflow.is_empty() {
1045                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
1046                            rpcs.push(up);
1047                        }
1048                    }
1049                };
1050                (input) => {
1051                    while let Ok(m) = to_rt.try_recv() {
1052                        input.push(m);
1053                    }
1054                };
1055                ($($item:tt),+) => {{
1056                    $(peek!($item));+
1057                }};
1058            }
1059            select! {
1060                rp = maybe_next(
1061                    self.ctx.user.rpc_overflow.is_empty(),
1062                    &mut self.ctx.user.rpcs
1063                ) => {
1064                    rpcs.push(rp);
1065                    peek!(updates, tasks, writes, rpcs, input)
1066                }
1067                wr = maybe_next(
1068                    self.ctx.user.net_writes.is_empty(),
1069                    &mut self.ctx.user.writes
1070                ) => {
1071                    writes = Some(wr);
1072                    peek!(updates, tasks, rpcs, input);
1073                },
1074                up = maybe_next(
1075                    self.ctx.user.net_updates.is_empty(),
1076                    &mut self.ctx.user.updates
1077                ) => {
1078                    updates = Some(up);
1079                    peek!(updates, writes, tasks, rpcs, input);
1080                },
1081                up = join_or_wait(&mut self.ctx.user.tasks) => {
1082                    if let Ok(up) = up {
1083                        tasks.push(up);
1084                    }
1085                    peek!(updates, writes, tasks, rpcs, input)
1086                },
1087                _ = or_never(ready) => {
1088                    peek!(updates, writes, tasks, rpcs, input)
1089                },
1090                n = to_rt.recv_many(&mut input, 100000) => {
1091                    if n == 0 {
1092                        break 'main Ok(())
1093                    }
1094                    peek!(updates, writes, tasks, rpcs);
1095                },
1096                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
1097                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
1098                        if ts.elapsed() >= Duration::from_secs(1) {
1099                            self.ctx.user.pending_unsubscribe.pop_front();
1100                        } else {
1101                            break
1102                        }
1103                    }
1104                    continue 'main
1105                },
1106            }
1107            let mut batch = BATCH.take();
1108            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
1109            self.do_cycle(
1110                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
1111            )
1112            .await;
1113            if !self.ctx.user.rpc_clients.is_empty() {
1114                if now - self.last_rpc_gc >= onemin {
1115                    self.last_rpc_gc = now;
1116                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
1117                }
1118            }
1119        }
1120    }
1121}
1122
1123/// A handle to a running GX instance. Drop the handle to shutdown the
1124/// associated background tasks.
1125#[derive(Clone)]
1126pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1127
1128impl GXHandle {
1129    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1130        let (tx, rx) = oneshot::channel();
1131        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1132        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1133    }
1134
1135    /// Get a copy of the current graphix environment
1136    pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1137        self.exec(|res| ToRt::GetEnv { res }).await
1138    }
1139
1140    /// Compile and execute the specified graphix expression. If it generates
1141    /// results, they will be sent to all the channels that are subscribed. When
1142    /// the `CompExp` objects contained in the `CompRes` are dropped their
1143    /// corresponding expressions will be deleted. Therefore, you can stop
1144    /// execution of the whole expression by dropping the returned `CompRes`.
1145    pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1146        Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1147    }
1148
1149    /// Load and execute the specified graphix module. The path may have one of
1150    /// two forms. If it is the path to a file with extension .bs then the rt
1151    /// will load the file directly. If it is a modpath (e.g. foo::bar::baz)
1152    /// then the module resolver will look for a matching module in the modpath.
1153    /// When the `CompExp` objects contained in the `CompRes` are dropped their
1154    /// corresponding expressions will be deleted. Therefore, you can stop
1155    /// execution of the whole file by dropping the returned `CompRes`.
1156    pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1157        Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1158    }
1159
1160    /// Compile a callable interface to the specified lambda id. This is how you
1161    /// call a lambda directly from rust. When the returned `Callable` is
1162    /// dropped the associated callsite will be delete.
1163    pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1164        Ok(self
1165            .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1166            .await??)
1167    }
1168
1169    /// Compile an expression that will output the value of the ref specifed by
1170    /// id. This is the same as the deref (*) operator in graphix. When the
1171    /// returned `Ref` is dropped the compiled code will be deleted.
1172    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1173        Ok(self
1174            .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1175            .await??)
1176    }
1177
1178    /// Set the variable idenfified by `id` to `v`, triggering updates of all
1179    /// dependent node trees.
1180    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1181        let v = v.into();
1182        self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1183    }
1184}
1185
1186#[derive(Builder)]
1187#[builder(pattern = "owned")]
1188pub struct GXConfig {
1189    /// The subscribe timeout to use when resolving modules in
1190    /// netidx. Resolution will fail if the subscription does not
1191    /// succeed before this timeout elapses.
1192    #[builder(setter(strip_option), default)]
1193    resolve_timeout: Option<Duration>,
1194    /// The publish timeout to use when sending published batches. Default None.
1195    #[builder(setter(strip_option), default)]
1196    publish_timeout: Option<Duration>,
1197    /// The execution context with any builtins already registered
1198    ctx: ExecCtx<GXCtx, NoUserEvent>,
1199    /// The text of the root module
1200    #[builder(setter(strip_option), default)]
1201    root: Option<ArcStr>,
1202    /// The set of module resolvers to use when resolving loaded modules
1203    #[builder(default)]
1204    resolvers: Vec<ModuleResolver>,
1205    /// The channel that will receive events from the runtime
1206    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1207}
1208
1209impl GXConfig {
1210    /// Create a new config
1211    pub fn builder(
1212        ctx: ExecCtx<GXCtx, NoUserEvent>,
1213        sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1214    ) -> GXConfigBuilder {
1215        GXConfigBuilder::default().ctx(ctx).sub(sub)
1216    }
1217
1218    /// Start the graphix runtime with the specified config, return a
1219    /// handle capable of interacting with it.
1220    ///
1221    /// root is the text of the root module you wish to initially
1222    /// load. This will define the environment for the rest of the
1223    /// code compiled by this runtime. The runtime starts completely
1224    /// empty, with only the language, no core library, no standard
1225    /// library. To build a runtime with the full standard library and
1226    /// nothing else simply pass the output of
1227    /// `graphix_stdlib::register` to start.
1228    pub async fn start(self) -> Result<GXHandle> {
1229        let (init_tx, init_rx) = oneshot::channel();
1230        let (tx, rx) = tmpsc::unbounded_channel();
1231        task::spawn(async move {
1232            match GX::new(self).await {
1233                Ok(bs) => {
1234                    let _ = init_tx.send(Ok(()));
1235                    if let Err(e) = bs.run(rx).await {
1236                        error!("run loop exited with error {e:?}")
1237                    }
1238                }
1239                Err(e) => {
1240                    let _ = init_tx.send(Err(e));
1241                }
1242            };
1243        });
1244        init_rx.await??;
1245        Ok(GXHandle(tx))
1246    }
1247}