graphix_rt/
lib.rs

1/// A general purpose graphix runtime
2///
3/// This module implements a generic graphix runtime suitable for most
4/// applications, including applications that implement custom graphix
5/// builtins. The graphix interpreter is run in a background task, and
6/// can be interacted with via a handle. All features of the standard
7/// library are supported by this runtime.
8///
9/// In special cases where this runtime is not suitable for your
10/// application you can implement your own, see the [Ctx] and
11/// [UserEvent] traits.
12use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use combine::stream::position::SourcePosition;
16use compact_str::format_compact;
17use core::fmt;
18use derive_builder::Builder;
19use futures::{channel::mpsc, future::try_join_all, FutureExt, StreamExt};
20use fxhash::{FxBuildHasher, FxHashMap};
21use graphix_compiler::{
22    compile,
23    env::Env,
24    expr::{
25        self, Expr, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin, Source,
26    },
27    node::genn,
28    typ::{FnType, Type},
29    BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
30};
31use indexmap::IndexMap;
32use log::{debug, error, info};
33use netidx::{
34    path::Path,
35    pool::{Pool, Pooled},
36    protocol::valarray::ValArray,
37    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
38    resolver_client::ChangeTracker,
39    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
40};
41use netidx_core::atomic_id;
42use netidx_protocols::rpc::{
43    self,
44    server::{ArgSpec, RpcCall},
45};
46use serde_derive::{Deserialize, Serialize};
47use smallvec::{smallvec, SmallVec};
48use std::{
49    collections::{hash_map::Entry, HashMap, VecDeque},
50    future, mem,
51    panic::{catch_unwind, AssertUnwindSafe},
52    path::{Component, PathBuf},
53    result,
54    sync::{LazyLock, Weak},
55    time::Duration,
56};
57use tokio::{
58    fs, select,
59    sync::{
60        mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
61        oneshot, Mutex,
62    },
63    task::{self, JoinError, JoinSet},
64    time::{self, Instant},
65};
66use triomphe::Arc;
67
/// A pooled batch of subscription events, each tagged with the id of
/// the subscription that produced it.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
/// A pooled batch of write requests addressed to values published by
/// this runtime.
type WriteBatch = Pooled<Vec<WriteRequest>>;
70
/// Marker error attached as context when the modules referenced by an
/// expression cannot be resolved.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed, human readable description of the error.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("could not resolve module")
    }
}
79
/// A cached rpc client together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// Client handle for the remote procedure; cloned for each call.
    proc: rpc::client::Proc,
    /// Refreshed to the current time whenever a call goes through this
    /// client.
    // NOTE(review): presumably consulted when idle clients are garbage
    // collected; that code is not visible in this part of the file.
    last_used: Instant,
}
85
/// The state of the general purpose runtime: reference tracking for
/// variables, subscriptions and published values, queues of events
/// waiting for a cycle, and the netidx handles and channels that feed
/// them.
#[derive(Debug)]
pub struct GXCtx {
    /// For each variable, the expressions referencing it and how many
    /// references each expression holds.
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For each subscription, the expressions referencing it and how
    /// many references each expression holds.
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For each published value, the expressions referencing it and
    /// how many references each expression holds.
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable updates waiting to be delivered in a later cycle.
    var_updates: VecDeque<(BindId, Value)>,
    /// Subscription events waiting to be delivered in a later cycle.
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Write requests waiting to be delivered in a later cycle.
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// Incoming rpc calls waiting to be delivered in a later cycle.
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// Rpc clients keyed by the path of the procedure they call.
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// Rpcs published by this runtime, keyed by path.
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions that have been released, with the time of the
    /// release. The `Dval` is kept here instead of being dropped
    /// immediately.
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Resolver change trackers used by the list operations, keyed by
    /// the variable receiving the listing.
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks; each resolves to a variable id and the value
    /// produced for it.
    tasks: JoinSet<(BindId, Value)>,
    /// The publisher batch that accumulates updates to published
    /// values.
    batch: publisher::UpdateBatch,
    /// The netidx publisher.
    publisher: Publisher,
    /// The netidx subscriber.
    subscriber: Subscriber,
    /// Sender handed to every subscription so its events arrive on
    /// `updates`.
    updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receives batches of subscription events.
    updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to every published value so writes to it arrive
    /// on `writes`.
    writes_tx: mpsc::Sender<WriteBatch>,
    /// Receives batches of write requests.
    writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to every published rpc so calls arrive on `rpcs`.
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receives incoming rpc calls tagged with the variable they
    /// belong to.
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
110
111impl GXCtx {
112    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
113        let (updates_tx, updates) = mpsc::channel(100);
114        let (writes_tx, writes) = mpsc::channel(100);
115        let (rpcs_tx, rpcs) = mpsc::channel(100);
116        let batch = publisher.start_batch();
117        let mut tasks = JoinSet::new();
118        tasks.spawn(async { future::pending().await });
119        Self {
120            by_ref: HashMap::default(),
121            var_updates: VecDeque::new(),
122            net_updates: VecDeque::new(),
123            net_writes: VecDeque::new(),
124            rpc_overflow: VecDeque::new(),
125            rpc_clients: HashMap::default(),
126            subscribed: HashMap::default(),
127            pending_unsubscribe: VecDeque::new(),
128            published: HashMap::default(),
129            change_trackers: HashMap::default(),
130            published_rpcs: HashMap::default(),
131            tasks,
132            batch,
133            publisher,
134            subscriber,
135            updates,
136            updates_tx,
137            writes,
138            writes_tx,
139            rpcs_tx,
140            rpcs,
141        }
142    }
143}
144
/// Evaluate to the `Ok` value of `$e`. On `Err`, return early from the
/// enclosing function or async block with `($bindid, Value::Error(..))`,
/// where the error text is the `Debug` rendering of the error.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(v) => v,
            Err(e) => {
                let e = ArcStr::from(format_compact!("{e:?}").as_str());
                let e = Value::Error(e);
                return ($bindid, e);
            }
        }
    };
}
157
/// Lock the change tracker `$ct`, replace it with a fresh tracker if it
/// is tracking a path other than `$path`, then ask `$resolver` whether
/// anything changed.
///
/// Returns early from the enclosing async block with `($id, Value::Null)`
/// when nothing changed, and with `($id, Value::Error(..))` when the
/// check itself fails. Must be used in an async context because it
/// awaits both the lock and the resolver.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut ct = $ct.lock().await;
        if ct.path() != &$path {
            *ct = ChangeTracker::new($path.clone());
        }
        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
            return ($id, Value::Null);
        }
    };
}
169
170impl Ctx for GXCtx {
171    fn clear(&mut self) {
172        let Self {
173            by_ref,
174            var_updates,
175            net_updates,
176            net_writes,
177            rpc_clients,
178            rpc_overflow,
179            subscribed,
180            published,
181            published_rpcs,
182            pending_unsubscribe,
183            change_trackers,
184            tasks,
185            batch,
186            publisher,
187            subscriber: _,
188            updates_tx,
189            updates,
190            writes_tx,
191            writes,
192            rpcs,
193            rpcs_tx,
194        } = self;
195        by_ref.clear();
196        var_updates.clear();
197        net_updates.clear();
198        net_writes.clear();
199        rpc_overflow.clear();
200        rpc_clients.clear();
201        subscribed.clear();
202        published.clear();
203        published_rpcs.clear();
204        pending_unsubscribe.clear();
205        change_trackers.clear();
206        *tasks = JoinSet::new();
207        tasks.spawn(async { future::pending().await });
208        *batch = publisher.start_batch();
209        let (tx, rx) = mpsc::channel(3);
210        *updates_tx = tx;
211        *updates = rx;
212        let (tx, rx) = mpsc::channel(100);
213        *writes_tx = tx;
214        *writes = rx;
215        let (tx, rx) = mpsc::channel(100);
216        *rpcs_tx = tx;
217        *rpcs = rx
218    }
219
220    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
221        let now = Instant::now();
222        let proc = match self.rpc_clients.entry(name) {
223            Entry::Occupied(mut e) => {
224                let cl = e.get_mut();
225                cl.last_used = now;
226                Ok(cl.proc.clone())
227            }
228            Entry::Vacant(e) => {
229                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
230                    Err(e) => Err(e),
231                    Ok(proc) => {
232                        let cl = RpcClient { last_used: now, proc: proc.clone() };
233                        e.insert(cl);
234                        Ok(proc)
235                    }
236                }
237            }
238        };
239        self.tasks.spawn(async move {
240            macro_rules! err {
241                ($e:expr) => {{
242                    let e = format_compact!("{:?}", $e);
243                    (id, Value::Error(e.as_str().into()))
244                }};
245            }
246            match proc {
247                Err(e) => err!(e),
248                Ok(proc) => match proc.call(args).await {
249                    Err(e) => err!(e),
250                    Ok(res) => (id, res),
251                },
252            }
253        });
254    }
255
256    fn publish_rpc(
257        &mut self,
258        name: Path,
259        doc: Value,
260        spec: Vec<ArgSpec>,
261        id: BindId,
262    ) -> Result<()> {
263        use rpc::server::Proc;
264        let e = match self.published_rpcs.entry(name) {
265            Entry::Vacant(e) => e,
266            Entry::Occupied(_) => bail!("already published"),
267        };
268        let proc = Proc::new(
269            &self.publisher,
270            e.key().clone(),
271            doc,
272            spec,
273            move |c| Some((id, c)),
274            Some(self.rpcs_tx.clone()),
275        )?;
276        e.insert(proc);
277        Ok(())
278    }
279
280    fn unpublish_rpc(&mut self, name: Path) {
281        self.published_rpcs.remove(&name);
282    }
283
284    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
285        let dval =
286            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
287        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
288        dval
289    }
290
291    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
292        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
293            if let Some(cn) = exprs.get_mut(&ref_by) {
294                *cn -= 1;
295                if *cn == 0 {
296                    exprs.remove(&ref_by);
297                }
298            }
299            if exprs.is_empty() {
300                self.subscribed.remove(&dv.id());
301            }
302        }
303        self.pending_unsubscribe.push_back((Instant::now(), dv));
304    }
305
306    fn list(&mut self, id: BindId, path: Path) {
307        let ct = self
308            .change_trackers
309            .entry(id)
310            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
311        let ct = Arc::clone(ct);
312        let resolver = self.subscriber.resolver();
313        self.tasks.spawn(async move {
314            check_changed!(id, resolver, path, ct);
315            let mut paths = or_err!(id, resolver.list(path).await);
316            let paths = paths.drain(..).map(|p| Value::String(p.into()));
317            (id, Value::Array(ValArray::from_iter_exact(paths)))
318        });
319    }
320
321    fn list_table(&mut self, id: BindId, path: Path) {
322        let ct = self
323            .change_trackers
324            .entry(id)
325            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
326        let ct = Arc::clone(ct);
327        let resolver = self.subscriber.resolver();
328        self.tasks.spawn(async move {
329            check_changed!(id, resolver, path, ct);
330            let mut tbl = or_err!(id, resolver.table(path).await);
331            let cols = tbl.cols.drain(..).map(|(name, count)| {
332                Value::Array(ValArray::from([
333                    Value::String(name.into()),
334                    Value::V64(count.0),
335                ]))
336            });
337            let cols = Value::Array(ValArray::from_iter_exact(cols));
338            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
339            let rows = Value::Array(ValArray::from_iter_exact(rows));
340            let tbl = Value::Array(ValArray::from([
341                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
342                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
343            ]));
344            (id, tbl)
345        });
346    }
347
348    fn stop_list(&mut self, id: BindId) {
349        self.change_trackers.remove(&id);
350    }
351
352    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
353        let val = self.publisher.publish_with_flags_and_writes(
354            PublishFlags::empty(),
355            path,
356            value,
357            Some(self.writes_tx.clone()),
358        )?;
359        let id = val.id();
360        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
361        Ok(val)
362    }
363
364    fn update(&mut self, val: &Val, value: Value) {
365        val.update(&mut self.batch, value);
366    }
367
368    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
369        if let Some(refs) = self.published.get_mut(&val.id()) {
370            if let Some(cn) = refs.get_mut(&ref_by) {
371                *cn -= 1;
372                if *cn == 0 {
373                    refs.remove(&ref_by);
374                }
375            }
376            if refs.is_empty() {
377                self.published.remove(&val.id());
378            }
379        }
380    }
381
382    fn set_timer(&mut self, id: BindId, timeout: Duration) {
383        self.tasks
384            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
385    }
386
387    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
388        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
389    }
390
391    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
392        if let Some(refs) = self.by_ref.get_mut(&id) {
393            if let Some(cn) = refs.get_mut(&ref_by) {
394                *cn -= 1;
395                if *cn == 0 {
396                    refs.remove(&ref_by);
397                }
398            }
399            if refs.is_empty() {
400                self.by_ref.remove(&id);
401            }
402        }
403    }
404
405    fn set_var(&mut self, id: BindId, value: Value) {
406        self.var_updates.push_back((id, value.clone()));
407    }
408}
409
410fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
411    match &n.spec().kind {
412        ExprKind::Bind { .. }
413        | ExprKind::Lambda { .. }
414        | ExprKind::Use { .. }
415        | ExprKind::Connect { .. }
416        | ExprKind::Module { .. }
417        | ExprKind::TypeDef { .. } => false,
418        _ => true,
419    }
420}
421
/// Completes immediately when `b` is true, and never completes when it
/// is false.
async fn or_never(b: bool) {
    match b {
        true => (),
        false => future::pending().await,
    }
}
427
428async fn join_or_wait(
429    js: &mut JoinSet<(BindId, Value)>,
430) -> result::Result<(BindId, Value), JoinError> {
431    match js.join_next().await {
432        None => future::pending().await,
433        Some(r) => r,
434    }
435}
436
437async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
438    if go {
439        match ch.next().await {
440            None => future::pending().await,
441            Some(v) => v,
442        }
443    } else {
444        future::pending().await
445    }
446}
447
448async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
449    if pending.len() == 0 {
450        future::pending().await
451    } else {
452        let (ts, _) = pending.front().unwrap();
453        let one = Duration::from_secs(1);
454        let elapsed = now - *ts;
455        if elapsed < one {
456            time::sleep(one - elapsed).await
457        }
458    }
459}
460
/// A handle to a compiled top level expression. Dropping it asks the
/// runtime to delete the expression.
pub struct CompExp {
    /// The id of the expression.
    pub id: ExprId,
    /// The type of the compiled node.
    pub typ: Type,
    /// False for bind, lambda, use, connect, module and type definition
    /// expressions, true for everything else.
    pub output: bool,
    /// Handle to the runtime that owns the expression.
    rt: GXHandle,
}
467
468impl Drop for CompExp {
469    fn drop(&mut self) {
470        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
471    }
472}
473
/// The result of compiling or loading source text.
pub struct CompRes {
    /// One handle per top level expression that was compiled.
    pub exprs: SmallVec<[CompExp; 1]>,
    /// A copy of the runtime environment taken after compilation.
    pub env: Env<GXCtx, NoUserEvent>,
}
478
/// An event sent from the runtime to its subscriber.
#[derive(Clone)]
pub enum RtEvent {
    /// The expression with the given id produced a value during a
    /// cycle.
    Updated(ExprId, Value),
    /// A copy of the runtime environment; sent after an expression is
    /// deleted.
    Env(Env<GXCtx, NoUserEvent>),
}
484
/// Lazily initialized pool of event batches.
// NOTE(review): the two arguments are presumably the maximum number of
// pooled batches and the maximum capacity of a batch kept in the pool;
// confirm against the netidx pool documentation.
static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
486
/// A handle to a compiled reference to a variable. Dropping it asks the
/// runtime to delete the expression `id`.
pub struct Ref {
    /// The id of the expression backing this reference.
    pub id: ExprId,
    /// NOTE(review): presumably the last value seen for the variable;
    /// nothing in the visible code writes it, confirm against the code
    /// that creates `Ref`.
    pub last: Option<Value>,
    /// The variable written by `set`.
    pub bid: BindId,
    /// The variable written by `set_deref`, if there is one.
    pub target_bid: Option<BindId>,
    /// Handle to the runtime that owns the reference.
    rt: GXHandle,
}
494
495impl Drop for Ref {
496    fn drop(&mut self) {
497        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
498    }
499}
500
501impl Ref {
502    pub fn set(&self, v: Value) -> Result<()> {
503        self.rt.set(self.bid, v)
504    }
505
506    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
507        if let Some(id) = self.target_bid {
508            self.rt.set(id, v)?
509        }
510        Ok(())
511    }
512}
513
// Identifier type for callables, generated by the `atomic_id!` macro.
atomic_id!(CallableId);

/// A handle to a lambda that can be called from outside the runtime.
/// Dropping it asks the runtime to delete the callable.
pub struct Callable {
    /// Handle to the runtime that owns the callable.
    rt: GXHandle,
    /// The runtime's id for this callable.
    id: CallableId,
    /// Environment used to type check arguments in `call`.
    env: Env<GXCtx, NoUserEvent>,
    /// The function type; `call` checks arity and argument types
    /// against it.
    pub typ: FnType,
    /// NOTE(review): presumably the expression whose updates carry the
    /// results of calls; confirm against the code that creates
    /// `Callable`.
    pub expr: ExprId,
}
523
524impl Drop for Callable {
525    fn drop(&mut self) {
526        let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
527    }
528}
529
530impl Callable {
531    /// Call the lambda with args. Argument types and arity will be
532    /// checked and an error will be returned if they are wrong.
533    pub async fn call(&self, args: ValArray) -> Result<()> {
534        if self.typ.args.len() != args.len() {
535            bail!("expected {} args", self.typ.args.len())
536        }
537        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
538            if !a.typ.is_a(&self.env, v) {
539                bail!("type mismatch arg {i} expected {}", a.typ)
540            }
541        }
542        self.call_unchecked(args).await
543    }
544
545    /// Call the lambda with args. Argument types and arity will NOT
546    /// be checked. This can result in a runtime panic, invalid
547    /// results, and probably other bad things.
548    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
549        self.rt
550            .0
551            .send(ToRt::Call { id: self.id, args })
552            .map_err(|_| anyhow!("runtime is dead"))
553    }
554}
555
/// Requests sent from handles to the runtime task.
enum ToRt {
    /// Reply with a copy of the current environment.
    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
    /// Delete the expression `id`.
    Delete { id: ExprId },
    /// Load and compile the file or module at `path`, replying with the
    /// result.
    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// Compile `text`, replying with the result.
    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// Compile a callable from the value `id`, replying with the result.
    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
    /// Compile a reference to the variable `id`, replying with the
    /// result.
    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
    /// Set the variable `id` to `v`.
    Set { id: BindId, v: Value },
    /// Call the callable `id` with `args`.
    Call { id: CallableId, args: ValArray },
    /// Delete the callable `id`.
    DeleteCallable { id: CallableId },
}
567
/// The runtime's internal record of a callable.
// NOTE(review): the code that builds and uses this record is not
// visible here; field descriptions are inferred from names and types.
struct CallableInt {
    /// Presumably the expression that implements the call.
    expr: ExprId,
    /// Presumably the variables the arguments are written to, in
    /// argument order.
    args: Box<[BindId]>,
}
572
/// The runtime itself: the execution context, the compiled top level
/// nodes, and the channel events are reported on.
struct GX {
    /// The compiler/execution context wrapping the `GXCtx`.
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// The event being assembled for, and consumed by, the current
    /// cycle.
    event: Event<NoUserEvent>,
    /// Top level expressions that must run in the next cycle. The flag
    /// is true for freshly compiled expressions, which run with
    /// `event.init` set, and false for expressions triggered by an
    /// event.
    updated: FxHashMap<ExprId, bool>,
    /// The compiled top level nodes, iterated in map order each cycle.
    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
    /// The callables currently registered with the runtime.
    callables: FxHashMap<CallableId, CallableInt>,
    /// Channel on which batches of runtime events are sent.
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
    /// Module resolvers used when resolving modules during compilation.
    resolvers: Arc<[ModuleResolver]>,
    /// Timeout passed to the publisher when a batch is committed.
    publish_timeout: Option<Duration>,
    /// Initialized to the construction time.
    // NOTE(review): presumably the last time idle rpc clients were
    // collected; the code that uses it is not visible here.
    last_rpc_gc: Instant,
}
584
585impl GX {
586    async fn new(mut rt: GXConfig) -> Result<Self> {
587        let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
588            None => r.push(ModuleResolver::Files("".into())),
589            Some(dd) => {
590                r.push(ModuleResolver::Files("".into()));
591                r.push(ModuleResolver::Files(dd.join("graphix")));
592            }
593        };
594        match std::env::var("GRAPHIX_MODPATH") {
595            Err(_) => resolvers_default(&mut rt.resolvers),
596            Ok(mp) => match ModuleResolver::parse_env(
597                rt.ctx.user.subscriber.clone(),
598                rt.resolve_timeout,
599                &mp,
600            ) {
601                Ok(r) => rt.resolvers.extend(r),
602                Err(e) => {
603                    error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
604                    resolvers_default(&mut rt.resolvers)
605                }
606            },
607        };
608        let mut t = Self {
609            ctx: rt.ctx,
610            event: Event::new(NoUserEvent),
611            updated: HashMap::default(),
612            nodes: IndexMap::default(),
613            callables: HashMap::default(),
614            sub: rt.sub,
615            resolvers: Arc::from(rt.resolvers),
616            publish_timeout: rt.publish_timeout,
617            last_rpc_gc: Instant::now(),
618        };
619        let st = Instant::now();
620        if let Some(root) = rt.root {
621            t.compile_root(root).await?;
622        }
623        info!("root init time: {:?}", st.elapsed());
624        Ok(t)
625    }
626
627    async fn do_cycle(
628        &mut self,
629        updates: Option<UpdateBatch>,
630        writes: Option<WriteBatch>,
631        tasks: &mut Vec<(BindId, Value)>,
632        rpcs: &mut Vec<(BindId, RpcCall)>,
633        to_rt: &mut UnboundedReceiver<ToRt>,
634        input: &mut Vec<ToRt>,
635        mut batch: Pooled<Vec<RtEvent>>,
636    ) {
637        macro_rules! push_event {
638            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
639                match self.event.$event.entry($id) {
640                    Entry::Vacant(e) => {
641                        e.insert($v);
642                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
643                            for id in exps.keys() {
644                                self.updated.entry(*id).or_insert(false);
645                            }
646                        }
647                    }
648                    Entry::Occupied(_) => {
649                        self.ctx.user.$overflow.push_back(($id, $v));
650                    }
651                }
652            };
653        }
654        for _ in 0..self.ctx.user.var_updates.len() {
655            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
656            push_event!(id, v, variables, by_ref, var_updates)
657        }
658        for (id, v) in tasks.drain(..) {
659            push_event!(id, v, variables, by_ref, var_updates)
660        }
661        for _ in 0..self.ctx.user.rpc_overflow.len() {
662            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
663            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
664        }
665        for (id, v) in rpcs.drain(..) {
666            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
667        }
668        for _ in 0..self.ctx.user.net_updates.len() {
669            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
670            push_event!(id, v, netidx, subscribed, net_updates)
671        }
672        if let Some(mut updates) = updates {
673            for (id, v) in updates.drain(..) {
674                push_event!(id, v, netidx, subscribed, net_updates)
675            }
676        }
677        for _ in 0..self.ctx.user.net_writes.len() {
678            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
679            push_event!(id, v, writes, published, net_writes)
680        }
681        if let Some(mut writes) = writes {
682            for wr in writes.drain(..) {
683                let id = wr.id;
684                push_event!(id, wr, writes, published, net_writes)
685            }
686        }
687        for (id, n) in self.nodes.iter_mut() {
688            if let Some(init) = self.updated.get(id) {
689                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
690                self.event.init = *init;
691                if self.event.init {
692                    REFS.with_borrow_mut(|refs| {
693                        refs.clear();
694                        n.refs(refs);
695                        refs.with_external_refs(|id| {
696                            if let Some(v) = self.ctx.cached.get(&id) {
697                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
698                                    e.insert(v.clone());
699                                    clear.push(id);
700                                }
701                            }
702                        });
703                    });
704                }
705                let res = catch_unwind(AssertUnwindSafe(|| {
706                    n.update(&mut self.ctx, &mut self.event)
707                }));
708                for id in clear {
709                    self.event.variables.remove(&id);
710                }
711                match res {
712                    Ok(None) => (),
713                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
714                    Err(e) => {
715                        error!("could not update exprid: {id:?}, panic: {e:?}")
716                    }
717                }
718            }
719        }
720        loop {
721            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
722                Ok(()) => break,
723                Err(SendTimeoutError::Closed(_)) => {
724                    error!("could not send batch");
725                    break;
726                }
727                Err(SendTimeoutError::Timeout(b)) => {
728                    batch = b;
729                    // prevent deadlock on input
730                    while let Ok(m) = to_rt.try_recv() {
731                        input.push(m);
732                    }
733                    self.process_input_batch(tasks, input, &mut batch).await;
734                }
735            }
736        }
737        self.event.clear();
738        self.updated.clear();
739        if self.ctx.user.batch.len() > 0 {
740            let batch = mem::replace(
741                &mut self.ctx.user.batch,
742                self.ctx.user.publisher.start_batch(),
743            );
744            let timeout = self.publish_timeout;
745            task::spawn(async move { batch.commit(timeout).await });
746        }
747    }
748
749    async fn process_input_batch(
750        &mut self,
751        tasks: &mut Vec<(BindId, Value)>,
752        input: &mut Vec<ToRt>,
753        batch: &mut Pooled<Vec<RtEvent>>,
754    ) {
755        for m in input.drain(..) {
756            match m {
757                ToRt::GetEnv { res } => {
758                    let _ = res.send(self.ctx.env.clone());
759                }
760                ToRt::Compile { text, rt, res } => {
761                    let _ = res.send(self.compile(rt, text).await);
762                }
763                ToRt::Load { path, rt, res } => {
764                    let _ = res.send(self.load(rt, &path).await);
765                }
766                ToRt::Delete { id } => {
767                    if let Some(mut n) = self.nodes.shift_remove(&id) {
768                        n.delete(&mut self.ctx);
769                    }
770                    debug!("delete {id:?}");
771                    batch.push(RtEvent::Env(self.ctx.env.clone()));
772                }
773                ToRt::CompileCallable { id, rt, res } => {
774                    let _ = res.send(self.compile_callable(id, rt));
775                }
776                ToRt::CompileRef { id, rt, res } => {
777                    let _ = res.send(self.compile_ref(rt, id));
778                }
779                ToRt::Set { id, v } => {
780                    self.ctx.cached.insert(id, v.clone());
781                    tasks.push((id, v))
782                }
783                ToRt::DeleteCallable { id } => self.delete_callable(id),
784                ToRt::Call { id, args } => {
785                    if let Err(e) = self.call_callable(id, args, tasks) {
786                        error!("calling callable {id:?} failed with {e:?}")
787                    }
788                }
789            }
790        }
791    }
792
793    fn cycle_ready(&self) -> bool {
794        self.ctx.user.var_updates.len() > 0
795            || self.ctx.user.net_updates.len() > 0
796            || self.ctx.user.net_writes.len() > 0
797            || self.ctx.user.rpc_overflow.len() > 0
798    }
799
800    async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
801        let scope = ModPath::root();
802        let ori = Origin { parent: None, source: Source::Unspecified, text };
803        let exprs =
804            expr::parser::parse(ori.clone()).context("parsing the root module")?;
805        let exprs =
806            try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
807                .await
808                .context(CouldNotResolve)?;
809        let nodes = exprs
810            .iter()
811            .map(|e| {
812                compile(&mut self.ctx, &scope, e.clone())
813                    .with_context(|| format!("compiling root expression {e}"))
814            })
815            .collect::<Result<SmallVec<[_; 4]>>>()
816            .with_context(|| ori.clone())?;
817        for (e, n) in exprs.iter().zip(nodes.into_iter()) {
818            self.updated.insert(e.id, true);
819            self.nodes.insert(e.id, n);
820        }
821        Ok(())
822    }
823
824    async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
825        let scope = ModPath::root();
826        let ori = Origin { parent: None, source: Source::Unspecified, text };
827        let exprs = expr::parser::parse(ori.clone())?;
828        let exprs =
829            try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
830                .await
831                .context(CouldNotResolve)?;
832        let nodes = exprs
833            .iter()
834            .map(|e| compile(&mut self.ctx, &scope, e.clone()))
835            .collect::<Result<SmallVec<[_; 4]>>>()
836            .with_context(|| ori.clone())?;
837        let exprs = exprs
838            .iter()
839            .zip(nodes.into_iter())
840            .map(|(e, n)| {
841                let output = is_output(&n);
842                let typ = n.typ().clone();
843                self.updated.insert(e.id, true);
844                self.nodes.insert(e.id, n);
845                CompExp { id: e.id, output, typ, rt: rt.clone() }
846            })
847            .collect::<SmallVec<[_; 1]>>();
848        Ok(CompRes { exprs, env: self.ctx.env.clone() })
849    }
850
851    async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
852        let scope = ModPath::root();
853        let st = Instant::now();
854        let (ori, exprs) = match file.extension() {
855            Some(e) if e.as_encoded_bytes() == b"gx" => {
856                let file = file.canonicalize()?;
857                let s = fs::read_to_string(&file).await?;
858                let s = if s.starts_with("#!") {
859                    if let Some(i) = s.find('\n') {
860                        &s[i..]
861                    } else {
862                        s.as_str()
863                    }
864                } else {
865                    s.as_str()
866                };
867                let ori = Origin {
868                    parent: None,
869                    source: Source::File(file),
870                    text: ArcStr::from(s),
871                };
872                (ori.clone(), expr::parser::parse(ori)?)
873            }
874            Some(e) => bail!("invalid file extension {e:?}"),
875            None => {
876                let name = file
877                    .components()
878                    .map(|c| match c {
879                        Component::RootDir
880                        | Component::CurDir
881                        | Component::ParentDir
882                        | Component::Prefix(_) => bail!("invalid module name {file:?}"),
883                        Component::Normal(s) => Ok(s),
884                    })
885                    .collect::<Result<Box<[_]>>>()?;
886                if name.len() != 1 {
887                    bail!("invalid module name {file:?}")
888                }
889                let name = name[0].to_string_lossy();
890                let name = name
891                    .parse::<ModPath>()
892                    .with_context(|| "parsing module name {file:?}")?;
893                let name = Path::basename(&*name)
894                    .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
895                let name = ArcStr::from(name);
896                let ori = Origin {
897                    parent: None,
898                    source: Source::Internal(name.clone()),
899                    text: literal!(""),
900                };
901                let kind = ExprKind::Module {
902                    export: true,
903                    name,
904                    value: ModuleKind::Unresolved,
905                };
906                let exprs = Arc::from(vec![Expr {
907                    id: ExprId::new(),
908                    ori: Arc::new(ori.clone()),
909                    pos: SourcePosition::default(),
910                    kind,
911                }]);
912                (ori, exprs)
913            }
914        };
915        info!("parse time: {:?}", st.elapsed());
916        let st = Instant::now();
917        let exprs =
918            try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
919                .await
920                .context(CouldNotResolve)?;
921        info!("resolve time: {:?}", st.elapsed());
922        let mut res = smallvec![];
923        for e in exprs.iter() {
924            let top_id = e.id;
925            let n =
926                compile(&mut self.ctx, &scope, e.clone()).with_context(|| ori.clone())?;
927            let has_out = is_output(&n);
928            let typ = n.typ().clone();
929            self.nodes.insert(top_id, n);
930            self.updated.insert(top_id, true);
931            res.push(CompExp { id: top_id, output: has_out, typ, rt: rt.clone() })
932        }
933        Ok(CompRes { exprs: res, env: self.ctx.env.clone() })
934    }
935
936    fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
937        let id = match id {
938            Value::U64(id) => LambdaId::from(id),
939            v => bail!("invalid lambda id {v}"),
940        };
941        let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
942        let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
943        let args = lb.typ.args.iter();
944        let args = args
945            .map(|a| {
946                if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
947                    bail!("can't call lambda with an optional argument from rust")
948                } else {
949                    Ok(BindId::new())
950                }
951            })
952            .collect::<Result<Box<[_]>>>()?;
953        let eid = ExprId::new();
954        let argn = lb.typ.args.iter().zip(args.iter());
955        let argn = argn
956            .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
957            .collect::<Vec<_>>();
958        let fnode = genn::constant(Value::U64(id.inner()));
959        let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
960        self.event.init = true;
961        n.update(&mut self.ctx, &mut self.event);
962        self.event.clear();
963        let cid = CallableId::new();
964        self.callables.insert(cid, CallableInt { expr: eid, args });
965        self.nodes.insert(eid, n);
966        let env = self.ctx.env.clone();
967        Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
968    }
969
970    fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
971        let eid = ExprId::new();
972        let typ = Type::Any;
973        let n = genn::reference(&mut self.ctx, id, typ, eid);
974        self.nodes.insert(eid, n);
975        let target_bid = self.ctx.env.byref_chain.get(&id).copied();
976        Ok(Ref {
977            id: eid,
978            bid: id,
979            target_bid,
980            last: self.ctx.cached.get(&id).cloned(),
981            rt,
982        })
983    }
984
985    fn call_callable(
986        &mut self,
987        id: CallableId,
988        args: ValArray,
989        tasks: &mut Vec<(BindId, Value)>,
990    ) -> Result<()> {
991        let c =
992            self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
993        if args.len() != c.args.len() {
994            bail!("expected {} arguments", c.args.len());
995        }
996        let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
997        tasks.extend(a);
998        Ok(())
999    }
1000
1001    fn delete_callable(&mut self, id: CallableId) {
1002        if let Some(c) = self.callables.remove(&id) {
1003            if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1004                n.delete(&mut self.ctx)
1005            }
1006        }
1007    }
1008
    /// The main loop of the runtime.
    ///
    /// Each iteration waits for one of the event sources (rpc calls,
    /// network writes, network updates, finished tasks, a ready cycle,
    /// commands from handles, or a pending unsubscribe becoming due), then
    /// opportunistically drains whatever else is already available without
    /// blocking, and finally processes the collected input and runs one
    /// cycle.
    ///
    /// Returns `Ok(())` once `to_rt.recv_many` reports that zero messages
    /// were received, which is treated as the command channel being closed.
    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        // These buffers live outside the loop and are handed to each cycle
        // by mutable reference.
        let mut tasks = vec![];
        let mut input = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            let now = Instant::now();
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            // peek!(a, b, ...) polls each named source without blocking and
            // moves anything that is already available into the matching
            // local buffer, so that one cycle can handle it all together.
            macro_rules! peek {
                (updates) => {
                    // Only pulled while `net_updates` is empty. Multiple
                    // batches are merged into the first one.
                    if self.ctx.user.net_updates.is_empty() {
                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
                            match &mut updates {
                                None => updates = Some(up),
                                Some(prev) => prev.extend(up.drain(..)),
                            }
                        }
                    }
                };
                (writes) => {
                    // Only pulled while `net_writes` is empty. At most one
                    // write batch is taken per call.
                    if self.ctx.user.net_writes.is_empty() {
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                (tasks) => {
                    // Collect every task that has already finished
                    // successfully. The loop stops at the first join error
                    // or when no finished task is left.
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                (rpcs) => {
                    // Only pulled while `rpc_overflow` is empty.
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                (input) => {
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            // Each arm records the event that woke the loop and then peeks
            // at the other sources.
            // NOTE(review): `maybe_next` and `or_never` are defined
            // elsewhere; presumably they only complete when their first
            // argument allows it - confirm against their definitions.
            select! {
                rp = maybe_next(
                    self.ctx.user.rpc_overflow.is_empty(),
                    &mut self.ctx.user.rpcs
                ) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs, input)
                }
                wr = maybe_next(
                    self.ctx.user.net_writes.is_empty(),
                    &mut self.ctx.user.writes
                ) => {
                    writes = Some(wr);
                    peek!(updates, tasks, rpcs, input);
                },
                up = maybe_next(
                    self.ctx.user.net_updates.is_empty(),
                    &mut self.ctx.user.updates
                ) => {
                    updates = Some(up);
                    peek!(updates, writes, tasks, rpcs, input);
                },
                up = join_or_wait(&mut self.ctx.user.tasks) => {
                    // Results of tasks that failed to join are dropped.
                    if let Ok(up) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs, input)
                },
                _ = or_never(ready) => {
                    peek!(updates, writes, tasks, rpcs, input)
                },
                n = to_rt.recv_many(&mut input, 100000) => {
                    // Zero received messages is treated as the command
                    // channel being closed, which ends the run loop.
                    if n == 0 {
                        break 'main Ok(())
                    }
                    peek!(updates, writes, tasks, rpcs);
                },
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // Drop every pending unsubscribe at the front of the
                    // queue that is at least one second old, then restart
                    // the loop without running a cycle.
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                },
            }
            let mut batch = BATCH.take();
            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
            self.do_cycle(
                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
            )
            .await;
            // At most once a minute, forget rpc clients that were last used
            // more than a minute before this iteration started.
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
1120}
1121
/// A handle to a running GX instance. Drop the handle to shutdown the
/// associated background tasks.
///
/// The handle wraps the sending half of the runtime's command channel and
/// can be cloned freely. Values returned by the runtime (`CompExp`,
/// `Callable`, `Ref`) each hold their own clone of the handle, so the
/// runtime only observes the channel closing once the handle, all of its
/// clones, and all such values have been dropped.
#[derive(Clone)]
pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1126
1127impl GXHandle {
1128    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1129        let (tx, rx) = oneshot::channel();
1130        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1131        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1132    }
1133
1134    /// Get a copy of the current graphix environment
1135    pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1136        self.exec(|res| ToRt::GetEnv { res }).await
1137    }
1138
1139    /// Compile and execute the specified graphix expression. If it generates
1140    /// results, they will be sent to all the channels that are subscribed. When
1141    /// the `CompExp` objects contained in the `CompRes` are dropped their
1142    /// corresponding expressions will be deleted. Therefore, you can stop
1143    /// execution of the whole expression by dropping the returned `CompRes`.
1144    pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1145        Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1146    }
1147
1148    /// Load and execute the specified graphix module. The path may have one of
1149    /// two forms. If it is the path to a file with extension .bs then the rt
1150    /// will load the file directly. If it is a modpath (e.g. foo::bar::baz)
1151    /// then the module resolver will look for a matching module in the modpath.
1152    /// When the `CompExp` objects contained in the `CompRes` are dropped their
1153    /// corresponding expressions will be deleted. Therefore, you can stop
1154    /// execution of the whole file by dropping the returned `CompRes`.
1155    pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1156        Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1157    }
1158
1159    /// Compile a callable interface to the specified lambda id. This is how you
1160    /// call a lambda directly from rust. When the returned `Callable` is
1161    /// dropped the associated callsite will be delete.
1162    pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1163        Ok(self
1164            .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1165            .await??)
1166    }
1167
1168    /// Compile an expression that will output the value of the ref specifed by
1169    /// id. This is the same as the deref (*) operator in graphix. When the
1170    /// returned `Ref` is dropped the compiled code will be deleted.
1171    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1172        Ok(self
1173            .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1174            .await??)
1175    }
1176
1177    /// Set the variable idenfified by `id` to `v`, triggering updates of all
1178    /// dependent node trees.
1179    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1180        let v = v.into();
1181        self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1182    }
1183}
1184
/// Configuration of a graphix runtime.
///
/// Create one with [`GXConfig::builder`], which takes the two mandatory
/// settings (the execution context and the event channel), set any
/// optional fields on the generated builder, and launch the runtime with
/// [`GXConfig::start`].
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig {
    /// The subscribe timeout to use when resolving modules in
    /// netidx. Resolution will fail if the subscription does not
    /// succeed before this timeout elapses. Default None.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// The publish timeout to use when sending published batches. Default None.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context with any builtins already registered
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// The text of the root module. Default None.
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// The set of module resolvers to use when resolving loaded modules.
    /// Default empty.
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// The channel that will receive events from the runtime
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
}
1207
1208impl GXConfig {
1209    /// Create a new config
1210    pub fn builder(
1211        ctx: ExecCtx<GXCtx, NoUserEvent>,
1212        sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1213    ) -> GXConfigBuilder {
1214        GXConfigBuilder::default().ctx(ctx).sub(sub)
1215    }
1216
1217    /// Start the graphix runtime with the specified config, return a
1218    /// handle capable of interacting with it.
1219    ///
1220    /// root is the text of the root module you wish to initially
1221    /// load. This will define the environment for the rest of the
1222    /// code compiled by this runtime. The runtime starts completely
1223    /// empty, with only the language, no core library, no standard
1224    /// library. To build a runtime with the full standard library and
1225    /// nothing else simply pass the output of
1226    /// `graphix_stdlib::register` to start.
1227    pub async fn start(self) -> Result<GXHandle> {
1228        let (init_tx, init_rx) = oneshot::channel();
1229        let (tx, rx) = tmpsc::unbounded_channel();
1230        task::spawn(async move {
1231            match GX::new(self).await {
1232                Ok(bs) => {
1233                    let _ = init_tx.send(Ok(()));
1234                    if let Err(e) = bs.run(rx).await {
1235                        error!("run loop exited with error {e:?}")
1236                    }
1237                }
1238                Err(e) => {
1239                    let _ = init_tx.send(Err(e));
1240                }
1241            };
1242        });
1243        init_rx.await??;
1244        Ok(GXHandle(tx))
1245    }
1246}