graphix_rt/
lib.rs

1/// A general purpose graphix runtime
2///
3/// This module implements a generic graphix runtime suitable for most
4/// applications, including applications that implement custom graphix
5/// builtins. The graphix interperter is run in a background task, and
6/// can be interacted with via a handle. All features of the standard
7/// library are supported by this runtime.
8///
9/// In special cases where this runtime is not suitable for your
10/// application you can implement your own, see the [Ctx] and
11/// [UserEvent] traits.
12use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use compact_str::format_compact;
16use core::fmt;
17use derive_builder::Builder;
18use futures::{channel::mpsc, future::join_all, FutureExt, StreamExt};
19use fxhash::{FxBuildHasher, FxHashMap};
20use graphix_compiler::{
21    compile,
22    env::Env,
23    expr::{
24        self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin, SourceOrigin,
25    },
26    node::genn,
27    typ::{FnType, Type},
28    BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
29};
30use indexmap::IndexMap;
31use log::{debug, error, info};
32use netidx::{
33    path::Path,
34    pool::{Pool, Pooled},
35    protocol::valarray::ValArray,
36    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
37    resolver_client::ChangeTracker,
38    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
39};
40use netidx_core::atomic_id;
41use netidx_protocols::rpc::{
42    self,
43    server::{ArgSpec, RpcCall},
44};
45use serde_derive::{Deserialize, Serialize};
46use smallvec::{smallvec, SmallVec};
47use std::{
48    collections::{hash_map::Entry, HashMap, VecDeque},
49    future, mem,
50    os::unix::ffi::OsStrExt,
51    panic::{catch_unwind, AssertUnwindSafe},
52    path::{Component, PathBuf},
53    result,
54    sync::{LazyLock, Weak},
55    time::Duration,
56};
57use tokio::{
58    fs, select,
59    sync::{
60        mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
61        oneshot, Mutex,
62    },
63    task::{self, JoinError, JoinSet},
64    time::{self, Instant},
65};
66use triomphe::Arc;
67
68type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
69type WriteBatch = Pooled<Vec<WriteRequest>>;
70
71#[derive(Debug)]
72pub struct CouldNotResolve;
73
74impl fmt::Display for CouldNotResolve {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        write!(f, "could not resolve module")
77    }
78}
79
80#[derive(Debug)]
81struct RpcClient {
82    proc: rpc::client::Proc,
83    last_used: Instant,
84}
85
86#[derive(Debug)]
87pub struct GXCtx {
88    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
89    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
90    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
91    var_updates: VecDeque<(BindId, Value)>,
92    net_updates: VecDeque<(SubId, subscriber::Event)>,
93    net_writes: VecDeque<(Id, WriteRequest)>,
94    rpc_overflow: VecDeque<(BindId, RpcCall)>,
95    rpc_clients: FxHashMap<Path, RpcClient>,
96    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
97    pending_unsubscribe: VecDeque<(Instant, Dval)>,
98    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
99    tasks: JoinSet<(BindId, Value)>,
100    batch: publisher::UpdateBatch,
101    publisher: Publisher,
102    subscriber: Subscriber,
103    updates_tx: mpsc::Sender<UpdateBatch>,
104    updates: mpsc::Receiver<UpdateBatch>,
105    writes_tx: mpsc::Sender<WriteBatch>,
106    writes: mpsc::Receiver<WriteBatch>,
107    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
108    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
109}
110
111impl GXCtx {
112    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
113        let (updates_tx, updates) = mpsc::channel(100);
114        let (writes_tx, writes) = mpsc::channel(100);
115        let (rpcs_tx, rpcs) = mpsc::channel(100);
116        let batch = publisher.start_batch();
117        let mut tasks = JoinSet::new();
118        tasks.spawn(async { future::pending().await });
119        Self {
120            by_ref: HashMap::default(),
121            var_updates: VecDeque::new(),
122            net_updates: VecDeque::new(),
123            net_writes: VecDeque::new(),
124            rpc_overflow: VecDeque::new(),
125            rpc_clients: HashMap::default(),
126            subscribed: HashMap::default(),
127            pending_unsubscribe: VecDeque::new(),
128            published: HashMap::default(),
129            change_trackers: HashMap::default(),
130            published_rpcs: HashMap::default(),
131            tasks,
132            batch,
133            publisher,
134            subscriber,
135            updates,
136            updates_tx,
137            writes,
138            writes_tx,
139            rpcs_tx,
140            rpcs,
141        }
142    }
143}
144
145macro_rules! or_err {
146    ($bindid:expr, $e:expr) => {
147        match $e {
148            Ok(v) => v,
149            Err(e) => {
150                let e = ArcStr::from(format_compact!("{e:?}").as_str());
151                let e = Value::Error(e);
152                return ($bindid, e);
153            }
154        }
155    };
156}
157
158macro_rules! check_changed {
159    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
160        let mut ct = $ct.lock().await;
161        if ct.path() != &$path {
162            *ct = ChangeTracker::new($path.clone());
163        }
164        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
165            return ($id, Value::Null);
166        }
167    };
168}
169
170impl Ctx for GXCtx {
171    fn clear(&mut self) {
172        let Self {
173            by_ref,
174            var_updates,
175            net_updates,
176            net_writes,
177            rpc_clients,
178            rpc_overflow,
179            subscribed,
180            published,
181            published_rpcs,
182            pending_unsubscribe,
183            change_trackers,
184            tasks,
185            batch,
186            publisher,
187            subscriber: _,
188            updates_tx,
189            updates,
190            writes_tx,
191            writes,
192            rpcs,
193            rpcs_tx,
194        } = self;
195        by_ref.clear();
196        var_updates.clear();
197        net_updates.clear();
198        net_writes.clear();
199        rpc_overflow.clear();
200        rpc_clients.clear();
201        subscribed.clear();
202        published.clear();
203        published_rpcs.clear();
204        pending_unsubscribe.clear();
205        change_trackers.clear();
206        *tasks = JoinSet::new();
207        tasks.spawn(async { future::pending().await });
208        *batch = publisher.start_batch();
209        let (tx, rx) = mpsc::channel(3);
210        *updates_tx = tx;
211        *updates = rx;
212        let (tx, rx) = mpsc::channel(100);
213        *writes_tx = tx;
214        *writes = rx;
215        let (tx, rx) = mpsc::channel(100);
216        *rpcs_tx = tx;
217        *rpcs = rx
218    }
219
220    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
221        let now = Instant::now();
222        let proc = match self.rpc_clients.entry(name) {
223            Entry::Occupied(mut e) => {
224                let cl = e.get_mut();
225                cl.last_used = now;
226                Ok(cl.proc.clone())
227            }
228            Entry::Vacant(e) => {
229                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
230                    Err(e) => Err(e),
231                    Ok(proc) => {
232                        let cl = RpcClient { last_used: now, proc: proc.clone() };
233                        e.insert(cl);
234                        Ok(proc)
235                    }
236                }
237            }
238        };
239        self.tasks.spawn(async move {
240            macro_rules! err {
241                ($e:expr) => {{
242                    let e = format_compact!("{:?}", $e);
243                    (id, Value::Error(e.as_str().into()))
244                }};
245            }
246            match proc {
247                Err(e) => err!(e),
248                Ok(proc) => match proc.call(args).await {
249                    Err(e) => err!(e),
250                    Ok(res) => (id, res),
251                },
252            }
253        });
254    }
255
256    fn publish_rpc(
257        &mut self,
258        name: Path,
259        doc: Value,
260        spec: Vec<ArgSpec>,
261        id: BindId,
262    ) -> Result<()> {
263        use rpc::server::Proc;
264        let e = match self.published_rpcs.entry(name) {
265            Entry::Vacant(e) => e,
266            Entry::Occupied(_) => bail!("already published"),
267        };
268        let proc = Proc::new(
269            &self.publisher,
270            e.key().clone(),
271            doc,
272            spec,
273            move |c| Some((id, c)),
274            Some(self.rpcs_tx.clone()),
275        )?;
276        e.insert(proc);
277        Ok(())
278    }
279
280    fn unpublish_rpc(&mut self, name: Path) {
281        self.published_rpcs.remove(&name);
282    }
283
284    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
285        let dval =
286            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
287        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
288        dval
289    }
290
291    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
292        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
293            if let Some(cn) = exprs.get_mut(&ref_by) {
294                *cn -= 1;
295                if *cn == 0 {
296                    exprs.remove(&ref_by);
297                }
298            }
299            if exprs.is_empty() {
300                self.subscribed.remove(&dv.id());
301            }
302        }
303        self.pending_unsubscribe.push_back((Instant::now(), dv));
304    }
305
306    fn list(&mut self, id: BindId, path: Path) {
307        let ct = self
308            .change_trackers
309            .entry(id)
310            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
311        let ct = Arc::clone(ct);
312        let resolver = self.subscriber.resolver();
313        self.tasks.spawn(async move {
314            check_changed!(id, resolver, path, ct);
315            let mut paths = or_err!(id, resolver.list(path).await);
316            let paths = paths.drain(..).map(|p| Value::String(p.into()));
317            (id, Value::Array(ValArray::from_iter_exact(paths)))
318        });
319    }
320
321    fn list_table(&mut self, id: BindId, path: Path) {
322        let ct = self
323            .change_trackers
324            .entry(id)
325            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
326        let ct = Arc::clone(ct);
327        let resolver = self.subscriber.resolver();
328        self.tasks.spawn(async move {
329            check_changed!(id, resolver, path, ct);
330            let mut tbl = or_err!(id, resolver.table(path).await);
331            let cols = tbl.cols.drain(..).map(|(name, count)| {
332                Value::Array(ValArray::from([
333                    Value::String(name.into()),
334                    Value::V64(count.0),
335                ]))
336            });
337            let cols = Value::Array(ValArray::from_iter_exact(cols));
338            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
339            let rows = Value::Array(ValArray::from_iter_exact(rows));
340            let tbl = Value::Array(ValArray::from([
341                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
342                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
343            ]));
344            (id, tbl)
345        });
346    }
347
348    fn stop_list(&mut self, id: BindId) {
349        self.change_trackers.remove(&id);
350    }
351
352    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
353        let val = self.publisher.publish_with_flags_and_writes(
354            PublishFlags::empty(),
355            path,
356            value,
357            Some(self.writes_tx.clone()),
358        )?;
359        let id = val.id();
360        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
361        Ok(val)
362    }
363
364    fn update(&mut self, val: &Val, value: Value) {
365        val.update(&mut self.batch, value);
366    }
367
368    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
369        if let Some(refs) = self.published.get_mut(&val.id()) {
370            if let Some(cn) = refs.get_mut(&ref_by) {
371                *cn -= 1;
372                if *cn == 0 {
373                    refs.remove(&ref_by);
374                }
375            }
376            if refs.is_empty() {
377                self.published.remove(&val.id());
378            }
379        }
380    }
381
382    fn set_timer(&mut self, id: BindId, timeout: Duration) {
383        self.tasks
384            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
385    }
386
387    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
388        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
389    }
390
391    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
392        if let Some(refs) = self.by_ref.get_mut(&id) {
393            if let Some(cn) = refs.get_mut(&ref_by) {
394                *cn -= 1;
395                if *cn == 0 {
396                    refs.remove(&ref_by);
397                }
398            }
399            if refs.is_empty() {
400                self.by_ref.remove(&id);
401            }
402        }
403    }
404
405    fn set_var(&mut self, id: BindId, value: Value) {
406        self.var_updates.push_back((id, value.clone()));
407    }
408}
409
410fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
411    match &n.spec().kind {
412        ExprKind::Bind { .. }
413        | ExprKind::Lambda { .. }
414        | ExprKind::Use { .. }
415        | ExprKind::Connect { .. }
416        | ExprKind::Module { .. }
417        | ExprKind::TypeDef { .. } => false,
418        _ => true,
419    }
420}
421
422async fn or_never(b: bool) {
423    if !b {
424        future::pending().await
425    }
426}
427
428async fn join_or_wait(
429    js: &mut JoinSet<(BindId, Value)>,
430) -> result::Result<(BindId, Value), JoinError> {
431    match js.join_next().await {
432        None => future::pending().await,
433        Some(r) => r,
434    }
435}
436
437async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
438    if go {
439        match ch.next().await {
440            None => future::pending().await,
441            Some(v) => v,
442        }
443    } else {
444        future::pending().await
445    }
446}
447
448async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
449    if pending.len() == 0 {
450        future::pending().await
451    } else {
452        let (ts, _) = pending.front().unwrap();
453        let one = Duration::from_secs(1);
454        let elapsed = now - *ts;
455        if elapsed < one {
456            time::sleep(one - elapsed).await
457        }
458    }
459}
460
461pub struct CompExp {
462    pub id: ExprId,
463    pub typ: Type,
464    pub output: bool,
465    rt: GXHandle,
466}
467
468impl Drop for CompExp {
469    fn drop(&mut self) {
470        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
471    }
472}
473
474pub struct CompRes {
475    pub exprs: SmallVec<[CompExp; 1]>,
476    pub env: Env<GXCtx, NoUserEvent>,
477}
478
479#[derive(Clone)]
480pub enum RtEvent {
481    Updated(ExprId, Value),
482    Env(Env<GXCtx, NoUserEvent>),
483}
484
485static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
486
487pub struct Ref {
488    pub id: ExprId,
489    pub last: Option<Value>,
490    pub bid: BindId,
491    pub target_bid: Option<BindId>,
492    rt: GXHandle,
493}
494
495impl Drop for Ref {
496    fn drop(&mut self) {
497        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
498    }
499}
500
501impl Ref {
502    pub fn set(&self, v: Value) -> Result<()> {
503        self.rt.set(self.bid, v)
504    }
505
506    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
507        if let Some(id) = self.target_bid {
508            self.rt.set(id, v)?
509        }
510        Ok(())
511    }
512}
513
514atomic_id!(CallableId);
515
516pub struct Callable {
517    rt: GXHandle,
518    id: CallableId,
519    env: Env<GXCtx, NoUserEvent>,
520    pub typ: FnType,
521    pub expr: ExprId,
522}
523
524impl Drop for Callable {
525    fn drop(&mut self) {
526        let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
527    }
528}
529
530impl Callable {
531    /// Call the lambda with args. Argument types and arity will be
532    /// checked and an error will be returned if they are wrong.
533    pub async fn call(&self, args: ValArray) -> Result<()> {
534        if self.typ.args.len() != args.len() {
535            bail!("expected {} args", self.typ.args.len())
536        }
537        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
538            if !a.typ.is_a(&self.env, v) {
539                bail!("type mismatch arg {i} expected {}", a.typ)
540            }
541        }
542        self.call_unchecked(args).await
543    }
544
545    /// Call the lambda with args. Argument types and arity will NOT
546    /// be checked. This can result in a runtime panic, invalid
547    /// results, and probably other bad things.
548    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
549        self.rt
550            .0
551            .send(ToRt::Call { id: self.id, args })
552            .map_err(|_| anyhow!("runtime is dead"))
553    }
554}
555
556enum ToRt {
557    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
558    Delete { id: ExprId },
559    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
560    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
561    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
562    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
563    Set { id: BindId, v: Value },
564    Call { id: CallableId, args: ValArray },
565    DeleteCallable { id: CallableId },
566}
567
568struct CallableInt {
569    expr: ExprId,
570    args: Box<[BindId]>,
571}
572
573struct GX {
574    ctx: ExecCtx<GXCtx, NoUserEvent>,
575    event: Event<NoUserEvent>,
576    updated: FxHashMap<ExprId, bool>,
577    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
578    callables: FxHashMap<CallableId, CallableInt>,
579    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
580    resolvers: Arc<[ModuleResolver]>,
581    publish_timeout: Option<Duration>,
582    last_rpc_gc: Instant,
583}
584
585impl GX {
586    async fn new(mut rt: GXConfig) -> Result<Self> {
587        let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
588            None => r.push(ModuleResolver::Files("".into())),
589            Some(dd) => {
590                r.push(ModuleResolver::Files("".into()));
591                r.push(ModuleResolver::Files(dd.join("graphix")));
592            }
593        };
594        match std::env::var("GRAPHIX_MODPATH") {
595            Err(_) => resolvers_default(&mut rt.resolvers),
596            Ok(mp) => match ModuleResolver::parse_env(
597                rt.ctx.user.subscriber.clone(),
598                rt.resolve_timeout,
599                &mp,
600            ) {
601                Ok(r) => rt.resolvers.extend(r),
602                Err(e) => {
603                    error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
604                    resolvers_default(&mut rt.resolvers)
605                }
606            },
607        };
608        let mut t = Self {
609            ctx: rt.ctx,
610            event: Event::new(NoUserEvent),
611            updated: HashMap::default(),
612            nodes: IndexMap::default(),
613            callables: HashMap::default(),
614            sub: rt.sub,
615            resolvers: Arc::from(rt.resolvers),
616            publish_timeout: rt.publish_timeout,
617            last_rpc_gc: Instant::now(),
618        };
619        let st = Instant::now();
620        if let Some(root) = rt.root {
621            t.compile_root(root).await?;
622        }
623        info!("root init time: {:?}", st.elapsed());
624        Ok(t)
625    }
626
627    async fn do_cycle(
628        &mut self,
629        updates: Option<UpdateBatch>,
630        writes: Option<WriteBatch>,
631        tasks: &mut Vec<(BindId, Value)>,
632        rpcs: &mut Vec<(BindId, RpcCall)>,
633        to_rt: &mut UnboundedReceiver<ToRt>,
634        input: &mut Vec<ToRt>,
635        mut batch: Pooled<Vec<RtEvent>>,
636    ) {
637        macro_rules! push_event {
638            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
639                match self.event.$event.entry($id) {
640                    Entry::Vacant(e) => {
641                        e.insert($v);
642                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
643                            for id in exps.keys() {
644                                self.updated.entry(*id).or_insert(false);
645                            }
646                        }
647                    }
648                    Entry::Occupied(_) => {
649                        self.ctx.user.$overflow.push_back(($id, $v));
650                    }
651                }
652            };
653        }
654        for _ in 0..self.ctx.user.var_updates.len() {
655            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
656            push_event!(id, v, variables, by_ref, var_updates)
657        }
658        for (id, v) in tasks.drain(..) {
659            push_event!(id, v, variables, by_ref, var_updates)
660        }
661        for _ in 0..self.ctx.user.rpc_overflow.len() {
662            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
663            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
664        }
665        for (id, v) in rpcs.drain(..) {
666            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
667        }
668        for _ in 0..self.ctx.user.net_updates.len() {
669            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
670            push_event!(id, v, netidx, subscribed, net_updates)
671        }
672        if let Some(mut updates) = updates {
673            for (id, v) in updates.drain(..) {
674                push_event!(id, v, netidx, subscribed, net_updates)
675            }
676        }
677        for _ in 0..self.ctx.user.net_writes.len() {
678            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
679            push_event!(id, v, writes, published, net_writes)
680        }
681        if let Some(mut writes) = writes {
682            for wr in writes.drain(..) {
683                let id = wr.id;
684                push_event!(id, wr, writes, published, net_writes)
685            }
686        }
687        for (id, n) in self.nodes.iter_mut() {
688            if let Some(init) = self.updated.get(id) {
689                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
690                self.event.init = *init;
691                if self.event.init {
692                    REFS.with_borrow_mut(|refs| {
693                        refs.clear();
694                        n.refs(refs);
695                        refs.with_external_refs(|id| {
696                            if let Some(v) = self.ctx.cached.get(&id) {
697                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
698                                    e.insert(v.clone());
699                                    clear.push(id);
700                                }
701                            }
702                        });
703                    });
704                }
705                let res = catch_unwind(AssertUnwindSafe(|| {
706                    n.update(&mut self.ctx, &mut self.event)
707                }));
708                for id in clear {
709                    self.event.variables.remove(&id);
710                }
711                match res {
712                    Ok(None) => (),
713                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
714                    Err(e) => {
715                        error!("could not update exprid: {id:?}, panic: {e:?}")
716                    }
717                }
718            }
719        }
720        loop {
721            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
722                Ok(()) => break,
723                Err(SendTimeoutError::Closed(_)) => {
724                    error!("could not send batch");
725                    break;
726                }
727                Err(SendTimeoutError::Timeout(b)) => {
728                    batch = b;
729                    // prevent deadlock on input
730                    while let Ok(m) = to_rt.try_recv() {
731                        input.push(m);
732                    }
733                    self.process_input_batch(tasks, input, &mut batch).await;
734                }
735            }
736        }
737        self.event.clear();
738        self.updated.clear();
739        if self.ctx.user.batch.len() > 0 {
740            let batch = mem::replace(
741                &mut self.ctx.user.batch,
742                self.ctx.user.publisher.start_batch(),
743            );
744            let timeout = self.publish_timeout;
745            task::spawn(async move { batch.commit(timeout).await });
746        }
747    }
748
749    async fn process_input_batch(
750        &mut self,
751        tasks: &mut Vec<(BindId, Value)>,
752        input: &mut Vec<ToRt>,
753        batch: &mut Pooled<Vec<RtEvent>>,
754    ) {
755        for m in input.drain(..) {
756            match m {
757                ToRt::GetEnv { res } => {
758                    let _ = res.send(self.ctx.env.clone());
759                }
760                ToRt::Compile { text, rt, res } => {
761                    let _ = res.send(self.compile(rt, text).await);
762                }
763                ToRt::Load { path, rt, res } => {
764                    let _ = res.send(self.load(rt, &path).await);
765                }
766                ToRt::Delete { id } => {
767                    if let Some(mut n) = self.nodes.shift_remove(&id) {
768                        n.delete(&mut self.ctx);
769                    }
770                    debug!("delete {id:?}");
771                    batch.push(RtEvent::Env(self.ctx.env.clone()));
772                }
773                ToRt::CompileCallable { id, rt, res } => {
774                    let _ = res.send(self.compile_callable(id, rt));
775                }
776                ToRt::CompileRef { id, rt, res } => {
777                    let _ = res.send(self.compile_ref(rt, id));
778                }
779                ToRt::Set { id, v } => {
780                    self.ctx.cached.insert(id, v.clone());
781                    tasks.push((id, v))
782                }
783                ToRt::DeleteCallable { id } => self.delete_callable(id),
784                ToRt::Call { id, args } => {
785                    if let Err(e) = self.call_callable(id, args, tasks) {
786                        error!("calling callable {id:?} failed with {e:?}")
787                    }
788                }
789            }
790        }
791    }
792
793    fn cycle_ready(&self) -> bool {
794        self.ctx.user.var_updates.len() > 0
795            || self.ctx.user.net_updates.len() > 0
796            || self.ctx.user.net_writes.len() > 0
797            || self.ctx.user.rpc_overflow.len() > 0
798    }
799
800    async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
801        let scope = ModPath::root();
802        let ori = expr::parser::parse(SourceOrigin::Unspecified, text.clone())
803            .context("parsing the root module")?;
804        let exprs =
805            join_all(ori.exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
806                .await
807                .into_iter()
808                .collect::<Result<SmallVec<[_; 4]>>>()
809                .context(CouldNotResolve)?;
810        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
811        let nodes = ori
812            .exprs
813            .iter()
814            .map(|e| {
815                compile(&mut self.ctx, &scope, e.clone())
816                    .with_context(|| format!("compiling root expression {e}"))
817            })
818            .collect::<Result<SmallVec<[_; 4]>>>()
819            .with_context(|| ori.clone())?;
820        for (e, n) in ori.exprs.iter().zip(nodes.into_iter()) {
821            self.updated.insert(e.id, true);
822            self.nodes.insert(e.id, n);
823        }
824        Ok(())
825    }
826
827    async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
828        let scope = ModPath::root();
829        let ori = expr::parser::parse(SourceOrigin::Unspecified, text.clone())?;
830        let exprs =
831            join_all(ori.exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
832                .await
833                .into_iter()
834                .collect::<Result<SmallVec<[_; 4]>>>()
835                .context(CouldNotResolve)?;
836        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
837        let nodes = ori
838            .exprs
839            .iter()
840            .map(|e| compile(&mut self.ctx, &scope, e.clone()))
841            .collect::<Result<SmallVec<[_; 4]>>>()
842            .with_context(|| ori.clone())?;
843        let exprs = ori
844            .exprs
845            .iter()
846            .zip(nodes.into_iter())
847            .map(|(e, n)| {
848                let output = is_output(&n);
849                let typ = n.typ().clone();
850                self.updated.insert(e.id, true);
851                self.nodes.insert(e.id, n);
852                CompExp { id: e.id, output, typ, rt: rt.clone() }
853            })
854            .collect::<SmallVec<[_; 1]>>();
855        Ok(CompRes { exprs, env: self.ctx.env.clone() })
856    }
857
858    async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
859        let scope = ModPath::root();
860        let st = Instant::now();
861        let ori = match file.extension() {
862            Some(e) if e.as_bytes() == b"gx" => {
863                let file = file.canonicalize()?;
864                let s = fs::read_to_string(&file).await?;
865                let s = if s.starts_with("#!") {
866                    if let Some(i) = s.find('\n') {
867                        &s[i..]
868                    } else {
869                        s.as_str()
870                    }
871                } else {
872                    s.as_str()
873                };
874                let s = ArcStr::from(s);
875                expr::parser::parse(SourceOrigin::File(file), s)?
876            }
877            Some(e) => bail!("invalid file extension {e:?}"),
878            None => {
879                let name = file
880                    .components()
881                    .map(|c| match c {
882                        Component::RootDir
883                        | Component::CurDir
884                        | Component::ParentDir
885                        | Component::Prefix(_) => bail!("invalid module name {file:?}"),
886                        Component::Normal(s) => Ok(s),
887                    })
888                    .collect::<Result<Box<[_]>>>()?;
889                if name.len() != 1 {
890                    bail!("invalid module name {file:?}")
891                }
892                let name = String::from_utf8_lossy(name[0].as_bytes());
893                let name = name
894                    .parse::<ModPath>()
895                    .with_context(|| "parsing module name {file:?}")?;
896                let name = Path::basename(&*name)
897                    .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
898                let name = ArcStr::from(name);
899                let e = ExprKind::Module {
900                    export: true,
901                    name: name.clone(),
902                    value: ModuleKind::Unresolved,
903                }
904                .to_expr(Default::default());
905                let ori = Origin {
906                    origin: SourceOrigin::Internal(name),
907                    source: literal!(""),
908                    exprs: Arc::from_iter([e]),
909                };
910                ori
911            }
912        };
913        info!("parse time: {:?}", st.elapsed());
914        let st = Instant::now();
915        let ori = {
916            let _ori = ori.clone();
917            ori.resolve_modules(&self.resolvers).await.with_context(|| _ori)?
918        };
919        info!("resolve time: {:?}", st.elapsed());
920        let mut exprs = smallvec![];
921        for e in ori.exprs.iter() {
922            let top_id = e.id;
923            let n =
924                compile(&mut self.ctx, &scope, e.clone()).with_context(|| ori.clone())?;
925            let has_out = is_output(&n);
926            let typ = n.typ().clone();
927            self.nodes.insert(top_id, n);
928            self.updated.insert(top_id, true);
929            exprs.push(CompExp { id: top_id, output: has_out, typ, rt: rt.clone() })
930        }
931        Ok(CompRes { exprs, env: self.ctx.env.clone() })
932    }
933
934    fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
935        let id = match id {
936            Value::U64(id) => LambdaId::from(id),
937            v => bail!("invalid lambda id {v}"),
938        };
939        let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
940        let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
941        let args = lb.typ.args.iter();
942        let args = args
943            .map(|a| {
944                if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
945                    bail!("can't call lambda with an optional argument from rust")
946                } else {
947                    Ok(BindId::new())
948                }
949            })
950            .collect::<Result<Box<[_]>>>()?;
951        let eid = ExprId::new();
952        let argn = lb.typ.args.iter().zip(args.iter());
953        let argn = argn
954            .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
955            .collect::<Vec<_>>();
956        let fnode = genn::constant(Value::U64(id.inner()));
957        let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
958        self.event.init = true;
959        n.update(&mut self.ctx, &mut self.event);
960        self.event.clear();
961        let cid = CallableId::new();
962        self.callables.insert(cid, CallableInt { expr: eid, args });
963        self.nodes.insert(eid, n);
964        let env = self.ctx.env.clone();
965        Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
966    }
967
968    fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
969        let eid = ExprId::new();
970        let typ = Type::Any;
971        let n = genn::reference(&mut self.ctx, id, typ, eid);
972        self.nodes.insert(eid, n);
973        let target_bid = self.ctx.env.byref_chain.get(&id).copied();
974        Ok(Ref {
975            id: eid,
976            bid: id,
977            target_bid,
978            last: self.ctx.cached.get(&id).cloned(),
979            rt,
980        })
981    }
982
983    fn call_callable(
984        &mut self,
985        id: CallableId,
986        args: ValArray,
987        tasks: &mut Vec<(BindId, Value)>,
988    ) -> Result<()> {
989        let c =
990            self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
991        if args.len() != c.args.len() {
992            bail!("expected {} arguments", c.args.len());
993        }
994        let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
995        tasks.extend(a);
996        Ok(())
997    }
998
999    fn delete_callable(&mut self, id: CallableId) {
1000        if let Some(c) = self.callables.remove(&id) {
1001            if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1002                n.delete(&mut self.ctx)
1003            }
1004        }
1005    }
1006
1007    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
1008        let mut tasks = vec![];
1009        let mut input = vec![];
1010        let mut rpcs = vec![];
1011        let onemin = Duration::from_secs(60);
1012        'main: loop {
1013            let now = Instant::now();
1014            let ready = self.cycle_ready();
1015            let mut updates = None;
1016            let mut writes = None;
1017            macro_rules! peek {
1018                (updates) => {
1019                    if self.ctx.user.net_updates.is_empty() {
1020                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
1021                            match &mut updates {
1022                                None => updates = Some(up),
1023                                Some(prev) => prev.extend(up.drain(..)),
1024                            }
1025                        }
1026                    }
1027                };
1028                (writes) => {
1029                    if self.ctx.user.net_writes.is_empty() {
1030                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
1031                            writes = Some(wr);
1032                        }
1033                    }
1034                };
1035                (tasks) => {
1036                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
1037                        tasks.push(up);
1038                    }
1039                };
1040                (rpcs) => {
1041                    if self.ctx.user.rpc_overflow.is_empty() {
1042                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
1043                            rpcs.push(up);
1044                        }
1045                    }
1046                };
1047                (input) => {
1048                    while let Ok(m) = to_rt.try_recv() {
1049                        input.push(m);
1050                    }
1051                };
1052                ($($item:tt),+) => {{
1053                    $(peek!($item));+
1054                }};
1055            }
1056            select! {
1057                rp = maybe_next(
1058                    self.ctx.user.rpc_overflow.is_empty(),
1059                    &mut self.ctx.user.rpcs
1060                ) => {
1061                    rpcs.push(rp);
1062                    peek!(updates, tasks, writes, rpcs, input)
1063                }
1064                wr = maybe_next(
1065                    self.ctx.user.net_writes.is_empty(),
1066                    &mut self.ctx.user.writes
1067                ) => {
1068                    writes = Some(wr);
1069                    peek!(updates, tasks, rpcs, input);
1070                },
1071                up = maybe_next(
1072                    self.ctx.user.net_updates.is_empty(),
1073                    &mut self.ctx.user.updates
1074                ) => {
1075                    updates = Some(up);
1076                    peek!(updates, writes, tasks, rpcs, input);
1077                },
1078                up = join_or_wait(&mut self.ctx.user.tasks) => {
1079                    if let Ok(up) = up {
1080                        tasks.push(up);
1081                    }
1082                    peek!(updates, writes, tasks, rpcs, input)
1083                },
1084                _ = or_never(ready) => {
1085                    peek!(updates, writes, tasks, rpcs, input)
1086                },
1087                n = to_rt.recv_many(&mut input, 100000) => {
1088                    if n == 0 {
1089                        break 'main Ok(())
1090                    }
1091                    peek!(updates, writes, tasks, rpcs);
1092                },
1093                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
1094                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
1095                        if ts.elapsed() >= Duration::from_secs(1) {
1096                            self.ctx.user.pending_unsubscribe.pop_front();
1097                        } else {
1098                            break
1099                        }
1100                    }
1101                    continue 'main
1102                },
1103            }
1104            let mut batch = BATCH.take();
1105            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
1106            self.do_cycle(
1107                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
1108            )
1109            .await;
1110            if !self.ctx.user.rpc_clients.is_empty() {
1111                if now - self.last_rpc_gc >= onemin {
1112                    self.last_rpc_gc = now;
1113                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
1114                }
1115            }
1116        }
1117    }
1118}
1119
1120/// A handle to a running GX instance. Drop the handle to shutdown the
1121/// associated background tasks.
1122#[derive(Clone)]
1123pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1124
1125impl GXHandle {
1126    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1127        let (tx, rx) = oneshot::channel();
1128        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1129        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1130    }
1131
1132    /// Get a copy of the current graphix environment
1133    pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1134        self.exec(|res| ToRt::GetEnv { res }).await
1135    }
1136
1137    /// Compile and execute the specified graphix expression. If it generates
1138    /// results, they will be sent to all the channels that are subscribed. When
1139    /// the `CompExp` objects contained in the `CompRes` are dropped their
1140    /// corresponding expressions will be deleted. Therefore, you can stop
1141    /// execution of the whole expression by dropping the returned `CompRes`.
1142    pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1143        Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1144    }
1145
1146    /// Load and execute the specified graphix module. The path may have one of
1147    /// two forms. If it is the path to a file with extension .bs then the rt
1148    /// will load the file directly. If it is a modpath (e.g. foo::bar::baz)
1149    /// then the module resolver will look for a matching module in the modpath.
1150    /// When the `CompExp` objects contained in the `CompRes` are dropped their
1151    /// corresponding expressions will be deleted. Therefore, you can stop
1152    /// execution of the whole file by dropping the returned `CompRes`.
1153    pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1154        Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1155    }
1156
1157    /// Compile a callable interface to the specified lambda id. This is how you
1158    /// call a lambda directly from rust. When the returned `Callable` is
1159    /// dropped the associated callsite will be delete.
1160    pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1161        Ok(self
1162            .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1163            .await??)
1164    }
1165
1166    /// Compile an expression that will output the value of the ref specifed by
1167    /// id. This is the same as the deref (*) operator in graphix. When the
1168    /// returned `Ref` is dropped the compiled code will be deleted.
1169    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1170        Ok(self
1171            .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1172            .await??)
1173    }
1174
1175    /// Set the variable idenfified by `id` to `v`, triggering updates of all
1176    /// dependent node trees.
1177    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1178        let v = v.into();
1179        self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1180    }
1181}
1182
1183#[derive(Builder)]
1184#[builder(pattern = "owned")]
1185pub struct GXConfig {
1186    /// The subscribe timeout to use when resolving modules in
1187    /// netidx. Resolution will fail if the subscription does not
1188    /// succeed before this timeout elapses.
1189    #[builder(setter(strip_option), default)]
1190    resolve_timeout: Option<Duration>,
1191    /// The publish timeout to use when sending published batches. Default None.
1192    #[builder(setter(strip_option), default)]
1193    publish_timeout: Option<Duration>,
1194    /// The execution context with any builtins already registered
1195    ctx: ExecCtx<GXCtx, NoUserEvent>,
1196    /// The text of the root module
1197    #[builder(setter(strip_option), default)]
1198    root: Option<ArcStr>,
1199    /// The set of module resolvers to use when resolving loaded modules
1200    #[builder(default)]
1201    resolvers: Vec<ModuleResolver>,
1202    /// The channel that will receive events from the runtime
1203    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1204}
1205
1206impl GXConfig {
1207    /// Create a new config
1208    pub fn builder(
1209        ctx: ExecCtx<GXCtx, NoUserEvent>,
1210        sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1211    ) -> GXConfigBuilder {
1212        GXConfigBuilder::default().ctx(ctx).sub(sub)
1213    }
1214
1215    /// Start the graphix runtime with the specified config, return a
1216    /// handle capable of interacting with it.
1217    ///
1218    /// root is the text of the root module you wish to initially
1219    /// load. This will define the environment for the rest of the
1220    /// code compiled by this runtime. The runtime starts completely
1221    /// empty, with only the language, no core library, no standard
1222    /// library. To build a runtime with the full standard library and
1223    /// nothing else simply pass the output of
1224    /// `graphix_stdlib::register` to start.
1225    pub async fn start(self) -> Result<GXHandle> {
1226        let (init_tx, init_rx) = oneshot::channel();
1227        let (tx, rx) = tmpsc::unbounded_channel();
1228        task::spawn(async move {
1229            match GX::new(self).await {
1230                Ok(bs) => {
1231                    let _ = init_tx.send(Ok(()));
1232                    if let Err(e) = bs.run(rx).await {
1233                        error!("run loop exited with error {e:?}")
1234                    }
1235                }
1236                Err(e) => {
1237                    let _ = init_tx.send(Err(e));
1238                }
1239            };
1240        });
1241        init_rx.await??;
1242        Ok(GXHandle(tx))
1243    }
1244}