graphix_rt/
lib.rs

1/// A general purpose graphix runtime
2///
3/// This module implements a generic graphix runtime suitable for most
4/// applications, including applications that implement custom graphix
5/// builtins. The graphix interperter is run in a background task, and
6/// can be interacted with via a handle. All features of the standard
7/// library are supported by this runtime.
8///
9/// In special cases where this runtime is not suitable for your
10/// application you can implement your own, see the [Ctx] and
11/// [UserEvent] traits.
12use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use combine::stream::position::SourcePosition;
16use compact_str::format_compact;
17use core::fmt;
18use derive_builder::Builder;
19use futures::{channel::mpsc, future::join_all, FutureExt, StreamExt};
20use fxhash::{FxBuildHasher, FxHashMap};
21use graphix_compiler::{
22    compile,
23    env::Env,
24    expr::{self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin},
25    node::genn,
26    typ::{FnType, Type},
27    BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
28};
29use indexmap::IndexMap;
30use log::{debug, error, info};
31use netidx::{
32    path::Path,
33    pool::{Pool, Pooled},
34    protocol::valarray::ValArray,
35    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
36    resolver_client::ChangeTracker,
37    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
38};
39use netidx_core::atomic_id;
40use netidx_protocols::rpc::{
41    self,
42    server::{ArgSpec, RpcCall},
43};
44use serde_derive::{Deserialize, Serialize};
45use smallvec::{smallvec, SmallVec};
46use std::{
47    collections::{hash_map::Entry, HashMap, VecDeque},
48    future, mem,
49    os::unix::ffi::OsStrExt,
50    panic::{catch_unwind, AssertUnwindSafe},
51    path::{Component, PathBuf},
52    result,
53    sync::{LazyLock, Weak},
54    time::Duration,
55};
56use tokio::{
57    fs, select,
58    sync::{
59        mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
60        oneshot, Mutex,
61    },
62    task::{self, JoinError, JoinSet},
63    time::{self, Instant},
64};
65use triomphe::Arc;
66
67type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
68type WriteBatch = Pooled<Vec<WriteRequest>>;
69
70#[derive(Debug)]
71pub struct CouldNotResolve;
72
73impl fmt::Display for CouldNotResolve {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "could not resolve module")
76    }
77}
78
79#[derive(Debug)]
80struct RpcClient {
81    proc: rpc::client::Proc,
82    last_used: Instant,
83}
84
85#[derive(Debug)]
86pub struct GXCtx {
87    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
88    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
89    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
90    var_updates: VecDeque<(BindId, Value)>,
91    net_updates: VecDeque<(SubId, subscriber::Event)>,
92    net_writes: VecDeque<(Id, WriteRequest)>,
93    rpc_overflow: VecDeque<(BindId, RpcCall)>,
94    rpc_clients: FxHashMap<Path, RpcClient>,
95    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
96    pending_unsubscribe: VecDeque<(Instant, Dval)>,
97    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
98    tasks: JoinSet<(BindId, Value)>,
99    batch: publisher::UpdateBatch,
100    publisher: Publisher,
101    subscriber: Subscriber,
102    updates_tx: mpsc::Sender<UpdateBatch>,
103    updates: mpsc::Receiver<UpdateBatch>,
104    writes_tx: mpsc::Sender<WriteBatch>,
105    writes: mpsc::Receiver<WriteBatch>,
106    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
107    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
108}
109
110impl GXCtx {
111    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
112        let (updates_tx, updates) = mpsc::channel(100);
113        let (writes_tx, writes) = mpsc::channel(100);
114        let (rpcs_tx, rpcs) = mpsc::channel(100);
115        let batch = publisher.start_batch();
116        let mut tasks = JoinSet::new();
117        tasks.spawn(async { future::pending().await });
118        Self {
119            by_ref: HashMap::default(),
120            var_updates: VecDeque::new(),
121            net_updates: VecDeque::new(),
122            net_writes: VecDeque::new(),
123            rpc_overflow: VecDeque::new(),
124            rpc_clients: HashMap::default(),
125            subscribed: HashMap::default(),
126            pending_unsubscribe: VecDeque::new(),
127            published: HashMap::default(),
128            change_trackers: HashMap::default(),
129            published_rpcs: HashMap::default(),
130            tasks,
131            batch,
132            publisher,
133            subscriber,
134            updates,
135            updates_tx,
136            writes,
137            writes_tx,
138            rpcs_tx,
139            rpcs,
140        }
141    }
142}
143
144macro_rules! or_err {
145    ($bindid:expr, $e:expr) => {
146        match $e {
147            Ok(v) => v,
148            Err(e) => {
149                let e = ArcStr::from(format_compact!("{e:?}").as_str());
150                let e = Value::Error(e);
151                return ($bindid, e);
152            }
153        }
154    };
155}
156
157macro_rules! check_changed {
158    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
159        let mut ct = $ct.lock().await;
160        if ct.path() != &$path {
161            *ct = ChangeTracker::new($path.clone());
162        }
163        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
164            return ($id, Value::Null);
165        }
166    };
167}
168
169impl Ctx for GXCtx {
170    fn clear(&mut self) {
171        let Self {
172            by_ref,
173            var_updates,
174            net_updates,
175            net_writes,
176            rpc_clients,
177            rpc_overflow,
178            subscribed,
179            published,
180            published_rpcs,
181            pending_unsubscribe,
182            change_trackers,
183            tasks,
184            batch,
185            publisher,
186            subscriber: _,
187            updates_tx,
188            updates,
189            writes_tx,
190            writes,
191            rpcs,
192            rpcs_tx,
193        } = self;
194        by_ref.clear();
195        var_updates.clear();
196        net_updates.clear();
197        net_writes.clear();
198        rpc_overflow.clear();
199        rpc_clients.clear();
200        subscribed.clear();
201        published.clear();
202        published_rpcs.clear();
203        pending_unsubscribe.clear();
204        change_trackers.clear();
205        *tasks = JoinSet::new();
206        tasks.spawn(async { future::pending().await });
207        *batch = publisher.start_batch();
208        let (tx, rx) = mpsc::channel(3);
209        *updates_tx = tx;
210        *updates = rx;
211        let (tx, rx) = mpsc::channel(100);
212        *writes_tx = tx;
213        *writes = rx;
214        let (tx, rx) = mpsc::channel(100);
215        *rpcs_tx = tx;
216        *rpcs = rx
217    }
218
219    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
220        let now = Instant::now();
221        let proc = match self.rpc_clients.entry(name) {
222            Entry::Occupied(mut e) => {
223                let cl = e.get_mut();
224                cl.last_used = now;
225                Ok(cl.proc.clone())
226            }
227            Entry::Vacant(e) => {
228                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
229                    Err(e) => Err(e),
230                    Ok(proc) => {
231                        let cl = RpcClient { last_used: now, proc: proc.clone() };
232                        e.insert(cl);
233                        Ok(proc)
234                    }
235                }
236            }
237        };
238        self.tasks.spawn(async move {
239            macro_rules! err {
240                ($e:expr) => {{
241                    let e = format_compact!("{:?}", $e);
242                    (id, Value::Error(e.as_str().into()))
243                }};
244            }
245            match proc {
246                Err(e) => err!(e),
247                Ok(proc) => match proc.call(args).await {
248                    Err(e) => err!(e),
249                    Ok(res) => (id, res),
250                },
251            }
252        });
253    }
254
255    fn publish_rpc(
256        &mut self,
257        name: Path,
258        doc: Value,
259        spec: Vec<ArgSpec>,
260        id: BindId,
261    ) -> Result<()> {
262        use rpc::server::Proc;
263        let e = match self.published_rpcs.entry(name) {
264            Entry::Vacant(e) => e,
265            Entry::Occupied(_) => bail!("already published"),
266        };
267        let proc = Proc::new(
268            &self.publisher,
269            e.key().clone(),
270            doc,
271            spec,
272            move |c| Some((id, c)),
273            Some(self.rpcs_tx.clone()),
274        )?;
275        e.insert(proc);
276        Ok(())
277    }
278
279    fn unpublish_rpc(&mut self, name: Path) {
280        self.published_rpcs.remove(&name);
281    }
282
283    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
284        let dval =
285            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
286        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
287        dval
288    }
289
290    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
291        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
292            if let Some(cn) = exprs.get_mut(&ref_by) {
293                *cn -= 1;
294                if *cn == 0 {
295                    exprs.remove(&ref_by);
296                }
297            }
298            if exprs.is_empty() {
299                self.subscribed.remove(&dv.id());
300            }
301        }
302        self.pending_unsubscribe.push_back((Instant::now(), dv));
303    }
304
305    fn list(&mut self, id: BindId, path: Path) {
306        let ct = self
307            .change_trackers
308            .entry(id)
309            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
310        let ct = Arc::clone(ct);
311        let resolver = self.subscriber.resolver();
312        self.tasks.spawn(async move {
313            check_changed!(id, resolver, path, ct);
314            let mut paths = or_err!(id, resolver.list(path).await);
315            let paths = paths.drain(..).map(|p| Value::String(p.into()));
316            (id, Value::Array(ValArray::from_iter_exact(paths)))
317        });
318    }
319
320    fn list_table(&mut self, id: BindId, path: Path) {
321        let ct = self
322            .change_trackers
323            .entry(id)
324            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
325        let ct = Arc::clone(ct);
326        let resolver = self.subscriber.resolver();
327        self.tasks.spawn(async move {
328            check_changed!(id, resolver, path, ct);
329            let mut tbl = or_err!(id, resolver.table(path).await);
330            let cols = tbl.cols.drain(..).map(|(name, count)| {
331                Value::Array(ValArray::from([
332                    Value::String(name.into()),
333                    Value::V64(count.0),
334                ]))
335            });
336            let cols = Value::Array(ValArray::from_iter_exact(cols));
337            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
338            let rows = Value::Array(ValArray::from_iter_exact(rows));
339            let tbl = Value::Array(ValArray::from([
340                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
341                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
342            ]));
343            (id, tbl)
344        });
345    }
346
347    fn stop_list(&mut self, id: BindId) {
348        self.change_trackers.remove(&id);
349    }
350
351    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
352        let val = self.publisher.publish_with_flags_and_writes(
353            PublishFlags::empty(),
354            path,
355            value,
356            Some(self.writes_tx.clone()),
357        )?;
358        let id = val.id();
359        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
360        Ok(val)
361    }
362
363    fn update(&mut self, val: &Val, value: Value) {
364        val.update(&mut self.batch, value);
365    }
366
367    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
368        if let Some(refs) = self.published.get_mut(&val.id()) {
369            if let Some(cn) = refs.get_mut(&ref_by) {
370                *cn -= 1;
371                if *cn == 0 {
372                    refs.remove(&ref_by);
373                }
374            }
375            if refs.is_empty() {
376                self.published.remove(&val.id());
377            }
378        }
379    }
380
381    fn set_timer(&mut self, id: BindId, timeout: Duration) {
382        self.tasks
383            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
384    }
385
386    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
387        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
388    }
389
390    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
391        if let Some(refs) = self.by_ref.get_mut(&id) {
392            if let Some(cn) = refs.get_mut(&ref_by) {
393                *cn -= 1;
394                if *cn == 0 {
395                    refs.remove(&ref_by);
396                }
397            }
398            if refs.is_empty() {
399                self.by_ref.remove(&id);
400            }
401        }
402    }
403
404    fn set_var(&mut self, id: BindId, value: Value) {
405        self.var_updates.push_back((id, value.clone()));
406    }
407}
408
409fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
410    match &n.spec().kind {
411        ExprKind::Bind { .. }
412        | ExprKind::Lambda { .. }
413        | ExprKind::Use { .. }
414        | ExprKind::Connect { .. }
415        | ExprKind::Module { .. }
416        | ExprKind::TypeDef { .. } => false,
417        _ => true,
418    }
419}
420
421async fn or_never(b: bool) {
422    if !b {
423        future::pending().await
424    }
425}
426
427async fn join_or_wait(
428    js: &mut JoinSet<(BindId, Value)>,
429) -> result::Result<(BindId, Value), JoinError> {
430    match js.join_next().await {
431        None => future::pending().await,
432        Some(r) => r,
433    }
434}
435
436async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
437    if go {
438        match ch.next().await {
439            None => future::pending().await,
440            Some(v) => v,
441        }
442    } else {
443        future::pending().await
444    }
445}
446
447async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
448    if pending.len() == 0 {
449        future::pending().await
450    } else {
451        let (ts, _) = pending.front().unwrap();
452        let one = Duration::from_secs(1);
453        let elapsed = now - *ts;
454        if elapsed < one {
455            time::sleep(one - elapsed).await
456        }
457    }
458}
459
460pub struct CompExp {
461    pub id: ExprId,
462    pub typ: Type,
463    pub output: bool,
464    rt: GXHandle,
465}
466
467impl Drop for CompExp {
468    fn drop(&mut self) {
469        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
470    }
471}
472
473pub struct CompRes {
474    pub exprs: SmallVec<[CompExp; 1]>,
475    pub env: Env<GXCtx, NoUserEvent>,
476}
477
478#[derive(Clone)]
479pub enum RtEvent {
480    Updated(ExprId, Value),
481    Env(Env<GXCtx, NoUserEvent>),
482}
483
484static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
485
486pub struct Ref {
487    pub id: ExprId,
488    pub last: Option<Value>,
489    pub bid: BindId,
490    pub target_bid: Option<BindId>,
491    rt: GXHandle,
492}
493
494impl Drop for Ref {
495    fn drop(&mut self) {
496        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
497    }
498}
499
500impl Ref {
501    pub fn set(&self, v: Value) -> Result<()> {
502        self.rt.set(self.bid, v)
503    }
504
505    pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
506        if let Some(id) = self.target_bid {
507            self.rt.set(id, v)?
508        }
509        Ok(())
510    }
511}
512
513atomic_id!(CallableId);
514
515pub struct Callable {
516    rt: GXHandle,
517    id: CallableId,
518    env: Env<GXCtx, NoUserEvent>,
519    pub typ: FnType,
520    pub expr: ExprId,
521}
522
523impl Drop for Callable {
524    fn drop(&mut self) {
525        let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
526    }
527}
528
529impl Callable {
530    /// Call the lambda with args. Argument types and arity will be
531    /// checked and an error will be returned if they are wrong.
532    pub async fn call(&self, args: ValArray) -> Result<()> {
533        if self.typ.args.len() != args.len() {
534            bail!("expected {} args", self.typ.args.len())
535        }
536        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
537            if !a.typ.is_a(&self.env, v) {
538                bail!("type mismatch arg {i} expected {}", a.typ)
539            }
540        }
541        self.call_unchecked(args).await
542    }
543
544    /// Call the lambda with args. Argument types and arity will NOT
545    /// be checked. This can result in a runtime panic, invalid
546    /// results, and probably other bad things.
547    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
548        self.rt
549            .0
550            .send(ToRt::Call { id: self.id, args })
551            .map_err(|_| anyhow!("runtime is dead"))
552    }
553}
554
555enum ToRt {
556    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
557    Delete { id: ExprId },
558    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
559    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
560    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
561    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
562    Set { id: BindId, v: Value },
563    Call { id: CallableId, args: ValArray },
564    DeleteCallable { id: CallableId },
565}
566
567struct CallableInt {
568    expr: ExprId,
569    args: Box<[BindId]>,
570}
571
572struct GX {
573    ctx: ExecCtx<GXCtx, NoUserEvent>,
574    event: Event<NoUserEvent>,
575    updated: FxHashMap<ExprId, bool>,
576    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
577    callables: FxHashMap<CallableId, CallableInt>,
578    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
579    resolvers: Arc<[ModuleResolver]>,
580    publish_timeout: Option<Duration>,
581    last_rpc_gc: Instant,
582}
583
584impl GX {
585    async fn new(mut rt: GXConfig) -> Result<Self> {
586        let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
587            None => r.push(ModuleResolver::Files("".into())),
588            Some(dd) => {
589                r.push(ModuleResolver::Files("".into()));
590                r.push(ModuleResolver::Files(dd.join("graphix")));
591            }
592        };
593        match std::env::var("GRAPHIX_MODPATH") {
594            Err(_) => resolvers_default(&mut rt.resolvers),
595            Ok(mp) => match ModuleResolver::parse_env(
596                rt.ctx.user.subscriber.clone(),
597                rt.resolve_timeout,
598                &mp,
599            ) {
600                Ok(r) => rt.resolvers.extend(r),
601                Err(e) => {
602                    error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
603                    resolvers_default(&mut rt.resolvers)
604                }
605            },
606        };
607        let mut t = Self {
608            ctx: rt.ctx,
609            event: Event::new(NoUserEvent),
610            updated: HashMap::default(),
611            nodes: IndexMap::default(),
612            callables: HashMap::default(),
613            sub: rt.sub,
614            resolvers: Arc::from(rt.resolvers),
615            publish_timeout: rt.publish_timeout,
616            last_rpc_gc: Instant::now(),
617        };
618        let st = Instant::now();
619        if let Some(root) = rt.root {
620            t.compile_root(root).await?;
621        }
622        info!("root init time: {:?}", st.elapsed());
623        Ok(t)
624    }
625
626    async fn do_cycle(
627        &mut self,
628        updates: Option<UpdateBatch>,
629        writes: Option<WriteBatch>,
630        tasks: &mut Vec<(BindId, Value)>,
631        rpcs: &mut Vec<(BindId, RpcCall)>,
632        to_rt: &mut UnboundedReceiver<ToRt>,
633        input: &mut Vec<ToRt>,
634        mut batch: Pooled<Vec<RtEvent>>,
635    ) {
636        macro_rules! push_event {
637            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
638                match self.event.$event.entry($id) {
639                    Entry::Vacant(e) => {
640                        e.insert($v);
641                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
642                            for id in exps.keys() {
643                                self.updated.entry(*id).or_insert(false);
644                            }
645                        }
646                    }
647                    Entry::Occupied(_) => {
648                        self.ctx.user.$overflow.push_back(($id, $v));
649                    }
650                }
651            };
652        }
653        for _ in 0..self.ctx.user.var_updates.len() {
654            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
655            push_event!(id, v, variables, by_ref, var_updates)
656        }
657        for (id, v) in tasks.drain(..) {
658            push_event!(id, v, variables, by_ref, var_updates)
659        }
660        for _ in 0..self.ctx.user.rpc_overflow.len() {
661            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
662            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
663        }
664        for (id, v) in rpcs.drain(..) {
665            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
666        }
667        for _ in 0..self.ctx.user.net_updates.len() {
668            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
669            push_event!(id, v, netidx, subscribed, net_updates)
670        }
671        if let Some(mut updates) = updates {
672            for (id, v) in updates.drain(..) {
673                push_event!(id, v, netidx, subscribed, net_updates)
674            }
675        }
676        for _ in 0..self.ctx.user.net_writes.len() {
677            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
678            push_event!(id, v, writes, published, net_writes)
679        }
680        if let Some(mut writes) = writes {
681            for wr in writes.drain(..) {
682                let id = wr.id;
683                push_event!(id, wr, writes, published, net_writes)
684            }
685        }
686        for (id, n) in self.nodes.iter_mut() {
687            if let Some(init) = self.updated.get(id) {
688                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
689                self.event.init = *init;
690                if self.event.init {
691                    REFS.with_borrow_mut(|refs| {
692                        refs.clear();
693                        n.refs(refs);
694                        refs.with_external_refs(|id| {
695                            if let Some(v) = self.ctx.cached.get(&id) {
696                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
697                                    e.insert(v.clone());
698                                    clear.push(id);
699                                }
700                            }
701                        });
702                    });
703                }
704                let res = catch_unwind(AssertUnwindSafe(|| {
705                    n.update(&mut self.ctx, &mut self.event)
706                }));
707                for id in clear {
708                    self.event.variables.remove(&id);
709                }
710                match res {
711                    Ok(None) => (),
712                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
713                    Err(e) => {
714                        error!("could not update exprid: {id:?}, panic: {e:?}")
715                    }
716                }
717            }
718        }
719        loop {
720            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
721                Ok(()) => break,
722                Err(SendTimeoutError::Closed(_)) => {
723                    error!("could not send batch");
724                    break;
725                }
726                Err(SendTimeoutError::Timeout(b)) => {
727                    batch = b;
728                    // prevent deadlock on input
729                    while let Ok(m) = to_rt.try_recv() {
730                        input.push(m);
731                    }
732                    self.process_input_batch(tasks, input, &mut batch).await;
733                }
734            }
735        }
736        self.event.clear();
737        self.updated.clear();
738        if self.ctx.user.batch.len() > 0 {
739            let batch = mem::replace(
740                &mut self.ctx.user.batch,
741                self.ctx.user.publisher.start_batch(),
742            );
743            let timeout = self.publish_timeout;
744            task::spawn(async move { batch.commit(timeout).await });
745        }
746    }
747
748    async fn process_input_batch(
749        &mut self,
750        tasks: &mut Vec<(BindId, Value)>,
751        input: &mut Vec<ToRt>,
752        batch: &mut Pooled<Vec<RtEvent>>,
753    ) {
754        for m in input.drain(..) {
755            match m {
756                ToRt::GetEnv { res } => {
757                    let _ = res.send(self.ctx.env.clone());
758                }
759                ToRt::Compile { text, rt, res } => {
760                    let _ = res.send(self.compile(rt, text).await);
761                }
762                ToRt::Load { path, rt, res } => {
763                    let _ = res.send(self.load(rt, &path).await);
764                }
765                ToRt::Delete { id } => {
766                    if let Some(mut n) = self.nodes.shift_remove(&id) {
767                        n.delete(&mut self.ctx);
768                    }
769                    debug!("delete {id:?}");
770                    batch.push(RtEvent::Env(self.ctx.env.clone()));
771                }
772                ToRt::CompileCallable { id, rt, res } => {
773                    let _ = res.send(self.compile_callable(id, rt));
774                }
775                ToRt::CompileRef { id, rt, res } => {
776                    let _ = res.send(self.compile_ref(rt, id));
777                }
778                ToRt::Set { id, v } => {
779                    self.ctx.cached.insert(id, v.clone());
780                    tasks.push((id, v))
781                }
782                ToRt::DeleteCallable { id } => self.delete_callable(id),
783                ToRt::Call { id, args } => {
784                    if let Err(e) = self.call_callable(id, args, tasks) {
785                        error!("calling callable {id:?} failed with {e:?}")
786                    }
787                }
788            }
789        }
790    }
791
792    fn cycle_ready(&self) -> bool {
793        self.ctx.user.var_updates.len() > 0
794            || self.ctx.user.net_updates.len() > 0
795            || self.ctx.user.net_writes.len() > 0
796            || self.ctx.user.rpc_overflow.len() > 0
797    }
798
799    async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
800        let scope = ModPath::root();
801        let ori =
802            expr::parser::parse(None, text.clone()).context("parsing the root module")?;
803        let exprs = join_all(
804            ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
805        )
806        .await
807        .into_iter()
808        .collect::<Result<SmallVec<[_; 4]>>>()
809        .context(CouldNotResolve)?;
810        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
811        let nodes = ori
812            .exprs
813            .iter()
814            .map(|e| {
815                compile(&mut self.ctx, &scope, e.clone())
816                    .with_context(|| format!("compiling root expression {e}"))
817            })
818            .collect::<Result<SmallVec<[_; 4]>>>()
819            .with_context(|| ori.clone())?;
820        for (e, n) in ori.exprs.iter().zip(nodes.into_iter()) {
821            self.updated.insert(e.id, true);
822            self.nodes.insert(e.id, n);
823        }
824        Ok(())
825    }
826
827    async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
828        let scope = ModPath::root();
829        let ori = expr::parser::parse(None, text.clone())?;
830        let exprs = join_all(
831            ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
832        )
833        .await
834        .into_iter()
835        .collect::<Result<SmallVec<[_; 4]>>>()
836        .context(CouldNotResolve)?;
837        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
838        let nodes = ori
839            .exprs
840            .iter()
841            .map(|e| compile(&mut self.ctx, &scope, e.clone()))
842            .collect::<Result<SmallVec<[_; 4]>>>()
843            .with_context(|| ori.clone())?;
844        let exprs = ori
845            .exprs
846            .iter()
847            .zip(nodes.into_iter())
848            .map(|(e, n)| {
849                let output = is_output(&n);
850                let typ = n.typ().clone();
851                self.updated.insert(e.id, true);
852                self.nodes.insert(e.id, n);
853                CompExp { id: e.id, output, typ, rt: rt.clone() }
854            })
855            .collect::<SmallVec<[_; 1]>>();
856        Ok(CompRes { exprs, env: self.ctx.env.clone() })
857    }
858
859    async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
860        let scope = ModPath::root();
861        let st = Instant::now();
862        let (scope, ori) = match file.extension() {
863            Some(e) if e.as_bytes() == b"gx" => {
864                let scope = match file.file_name() {
865                    None => scope,
866                    Some(name) => ModPath(scope.append(&*name.to_string_lossy())),
867                };
868                let s = fs::read_to_string(file).await?;
869                let s = if s.starts_with("#!") {
870                    if let Some(i) = s.find('\n') {
871                        &s[i..]
872                    } else {
873                        s.as_str()
874                    }
875                } else {
876                    s.as_str()
877                };
878                let s = ArcStr::from(s);
879                let name = ArcStr::from(file.to_string_lossy());
880                (scope, expr::parser::parse(Some(name), s)?)
881            }
882            Some(e) => bail!("invalid file extension {e:?}"),
883            None => {
884                let name = file
885                    .components()
886                    .map(|c| match c {
887                        Component::RootDir
888                        | Component::CurDir
889                        | Component::ParentDir
890                        | Component::Prefix(_) => bail!("invalid module name {file:?}"),
891                        Component::Normal(s) => Ok(s),
892                    })
893                    .collect::<Result<Box<[_]>>>()?;
894                if name.len() != 1 {
895                    bail!("invalid module name {file:?}")
896                }
897                let name = String::from_utf8_lossy(name[0].as_bytes());
898                let name = name
899                    .parse::<ModPath>()
900                    .with_context(|| "parsing module name {file:?}")?;
901                let scope =
902                    ModPath(Path::from_str(Path::dirname(&*name).unwrap_or(&*scope)));
903                let name = Path::basename(&*name)
904                    .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
905                let name = ArcStr::from(name);
906                let e = ExprKind::Module {
907                    export: true,
908                    name: name.clone(),
909                    value: ModuleKind::Unresolved,
910                }
911                .to_expr(Default::default());
912                let ori = Origin {
913                    name: Some(name),
914                    source: literal!(""),
915                    exprs: Arc::from_iter([e]),
916                };
917                (scope, ori)
918            }
919        };
920        let expr = ExprKind::Module {
921            export: true,
922            name: ori.name.clone().unwrap_or(literal!("")),
923            value: ModuleKind::Inline(ori.exprs.clone()),
924        }
925        .to_expr(SourcePosition::default());
926        info!("parse time: {:?}", st.elapsed());
927        let st = Instant::now();
928        let expr = expr
929            .resolve_modules(&scope, &self.resolvers)
930            .await
931            .with_context(|| ori.clone())?;
932        info!("resolve time: {:?}", st.elapsed());
933        let top_id = expr.id;
934        let n = compile(&mut self.ctx, &scope, expr).with_context(|| ori.clone())?;
935        let has_out = is_output(&n);
936        let typ = n.typ().clone();
937        self.nodes.insert(top_id, n);
938        self.updated.insert(top_id, true);
939        let exprs = smallvec![CompExp { id: top_id, output: has_out, typ, rt }];
940        Ok(CompRes { exprs, env: self.ctx.env.clone() })
941    }
942
943    fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
944        let id = match id {
945            Value::U64(id) => LambdaId::from(id),
946            v => bail!("invalid lambda id {v}"),
947        };
948        let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
949        let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
950        let args = lb.typ.args.iter();
951        let args = args
952            .map(|a| {
953                if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
954                    bail!("can't call lambda with an optional argument from rust")
955                } else {
956                    Ok(BindId::new())
957                }
958            })
959            .collect::<Result<Box<[_]>>>()?;
960        let eid = ExprId::new();
961        let argn = lb.typ.args.iter().zip(args.iter());
962        let argn = argn
963            .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
964            .collect::<Vec<_>>();
965        let fnode = genn::constant(Value::U64(id.inner()));
966        let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
967        self.event.init = true;
968        n.update(&mut self.ctx, &mut self.event);
969        self.event.clear();
970        let cid = CallableId::new();
971        self.callables.insert(cid, CallableInt { expr: eid, args });
972        self.nodes.insert(eid, n);
973        let env = self.ctx.env.clone();
974        Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
975    }
976
977    fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
978        let eid = ExprId::new();
979        let typ = Type::Any;
980        let n = genn::reference(&mut self.ctx, id, typ, eid);
981        self.nodes.insert(eid, n);
982        let target_bid = self.ctx.env.byref_chain.get(&id).copied();
983        Ok(Ref {
984            id: eid,
985            bid: id,
986            target_bid,
987            last: self.ctx.cached.get(&id).cloned(),
988            rt,
989        })
990    }
991
992    fn call_callable(
993        &mut self,
994        id: CallableId,
995        args: ValArray,
996        tasks: &mut Vec<(BindId, Value)>,
997    ) -> Result<()> {
998        let c =
999            self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
1000        if args.len() != c.args.len() {
1001            bail!("expected {} arguments", c.args.len());
1002        }
1003        let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
1004        tasks.extend(a);
1005        Ok(())
1006    }
1007
1008    fn delete_callable(&mut self, id: CallableId) {
1009        if let Some(c) = self.callables.remove(&id) {
1010            if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1011                n.delete(&mut self.ctx)
1012            }
1013        }
1014    }
1015
1016    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
1017        let mut tasks = vec![];
1018        let mut input = vec![];
1019        let mut rpcs = vec![];
1020        let onemin = Duration::from_secs(60);
1021        'main: loop {
1022            let now = Instant::now();
1023            let ready = self.cycle_ready();
1024            let mut updates = None;
1025            let mut writes = None;
1026            macro_rules! peek {
1027                (updates) => {
1028                    if self.ctx.user.net_updates.is_empty() {
1029                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
1030                            match &mut updates {
1031                                None => updates = Some(up),
1032                                Some(prev) => prev.extend(up.drain(..)),
1033                            }
1034                        }
1035                    }
1036                };
1037                (writes) => {
1038                    if self.ctx.user.net_writes.is_empty() {
1039                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
1040                            writes = Some(wr);
1041                        }
1042                    }
1043                };
1044                (tasks) => {
1045                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
1046                        tasks.push(up);
1047                    }
1048                };
1049                (rpcs) => {
1050                    if self.ctx.user.rpc_overflow.is_empty() {
1051                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
1052                            rpcs.push(up);
1053                        }
1054                    }
1055                };
1056                (input) => {
1057                    while let Ok(m) = to_rt.try_recv() {
1058                        input.push(m);
1059                    }
1060                };
1061                ($($item:tt),+) => {{
1062                    $(peek!($item));+
1063                }};
1064            }
1065            select! {
1066                rp = maybe_next(
1067                    self.ctx.user.rpc_overflow.is_empty(),
1068                    &mut self.ctx.user.rpcs
1069                ) => {
1070                    rpcs.push(rp);
1071                    peek!(updates, tasks, writes, rpcs, input)
1072                }
1073                wr = maybe_next(
1074                    self.ctx.user.net_writes.is_empty(),
1075                    &mut self.ctx.user.writes
1076                ) => {
1077                    writes = Some(wr);
1078                    peek!(updates, tasks, rpcs, input);
1079                },
1080                up = maybe_next(
1081                    self.ctx.user.net_updates.is_empty(),
1082                    &mut self.ctx.user.updates
1083                ) => {
1084                    updates = Some(up);
1085                    peek!(updates, writes, tasks, rpcs, input);
1086                },
1087                up = join_or_wait(&mut self.ctx.user.tasks) => {
1088                    if let Ok(up) = up {
1089                        tasks.push(up);
1090                    }
1091                    peek!(updates, writes, tasks, rpcs, input)
1092                },
1093                _ = or_never(ready) => {
1094                    peek!(updates, writes, tasks, rpcs, input)
1095                },
1096                n = to_rt.recv_many(&mut input, 100000) => {
1097                    if n == 0 {
1098                        break 'main Ok(())
1099                    }
1100                    peek!(updates, writes, tasks, rpcs);
1101                },
1102                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
1103                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
1104                        if ts.elapsed() >= Duration::from_secs(1) {
1105                            self.ctx.user.pending_unsubscribe.pop_front();
1106                        } else {
1107                            break
1108                        }
1109                    }
1110                    continue 'main
1111                },
1112            }
1113            let mut batch = BATCH.take();
1114            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
1115            self.do_cycle(
1116                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
1117            )
1118            .await;
1119            if !self.ctx.user.rpc_clients.is_empty() {
1120                if now - self.last_rpc_gc >= onemin {
1121                    self.last_rpc_gc = now;
1122                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
1123                }
1124            }
1125        }
1126    }
1127}
1128
1129/// A handle to a running GX instance. Drop the handle to shutdown the
1130/// associated background tasks.
1131#[derive(Clone)]
1132pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1133
1134impl GXHandle {
1135    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1136        let (tx, rx) = oneshot::channel();
1137        self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1138        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1139    }
1140
1141    /// Get a copy of the current graphix environment
1142    pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1143        self.exec(|res| ToRt::GetEnv { res }).await
1144    }
1145
1146    /// Compile and execute the specified graphix expression. If it generates
1147    /// results, they will be sent to all the channels that are subscribed. When
1148    /// the `CompExp` objects contained in the `CompRes` are dropped their
1149    /// corresponding expressions will be deleted. Therefore, you can stop
1150    /// execution of the whole expression by dropping the returned `CompRes`.
1151    pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1152        Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1153    }
1154
1155    /// Load and execute the specified graphix module. The path may have one of
1156    /// two forms. If it is the path to a file with extension .bs then the rt
1157    /// will load the file directly. If it is a modpath (e.g. foo::bar::baz)
1158    /// then the module resolver will look for a matching module in the modpath.
1159    /// When the `CompExp` objects contained in the `CompRes` are dropped their
1160    /// corresponding expressions will be deleted. Therefore, you can stop
1161    /// execution of the whole file by dropping the returned `CompRes`.
1162    pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1163        Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1164    }
1165
1166    /// Compile a callable interface to the specified lambda id. This is how you
1167    /// call a lambda directly from rust. When the returned `Callable` is
1168    /// dropped the associated callsite will be delete.
1169    pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1170        Ok(self
1171            .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1172            .await??)
1173    }
1174
1175    /// Compile an expression that will output the value of the ref specifed by
1176    /// id. This is the same as the deref (*) operator in graphix. When the
1177    /// returned `Ref` is dropped the compiled code will be deleted.
1178    pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1179        Ok(self
1180            .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1181            .await??)
1182    }
1183
1184    /// Set the variable idenfified by `id` to `v`, triggering updates of all
1185    /// dependent node trees.
1186    pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1187        let v = v.into();
1188        self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1189    }
1190}
1191
1192#[derive(Builder)]
1193#[builder(pattern = "owned")]
1194pub struct GXConfig {
1195    /// The subscribe timeout to use when resolving modules in
1196    /// netidx. Resolution will fail if the subscription does not
1197    /// succeed before this timeout elapses.
1198    #[builder(setter(strip_option), default)]
1199    resolve_timeout: Option<Duration>,
1200    /// The publish timeout to use when sending published batches. Default None.
1201    #[builder(setter(strip_option), default)]
1202    publish_timeout: Option<Duration>,
1203    /// The execution context with any builtins already registered
1204    ctx: ExecCtx<GXCtx, NoUserEvent>,
1205    /// The text of the root module
1206    #[builder(setter(strip_option), default)]
1207    root: Option<ArcStr>,
1208    /// The set of module resolvers to use when resolving loaded modules
1209    #[builder(default)]
1210    resolvers: Vec<ModuleResolver>,
1211    /// The channel that will receive events from the runtime
1212    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1213}
1214
1215impl GXConfig {
1216    /// Create a new config
1217    pub fn builder(
1218        ctx: ExecCtx<GXCtx, NoUserEvent>,
1219        sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1220    ) -> GXConfigBuilder {
1221        GXConfigBuilder::default().ctx(ctx).sub(sub)
1222    }
1223
1224    /// Start the graphix runtime with the specified config, return a
1225    /// handle capable of interacting with it.
1226    ///
1227    /// root is the text of the root module you wish to initially
1228    /// load. This will define the environment for the rest of the
1229    /// code compiled by this runtime. The runtime starts completely
1230    /// empty, with only the language, no core library, no standard
1231    /// library. To build a runtime with the full standard library and
1232    /// nothing else simply pass the output of
1233    /// `graphix_stdlib::register` to start.
1234    pub async fn start(self) -> Result<GXHandle> {
1235        let (init_tx, init_rx) = oneshot::channel();
1236        let (tx, rx) = tmpsc::unbounded_channel();
1237        task::spawn(async move {
1238            match GX::new(self).await {
1239                Ok(bs) => {
1240                    let _ = init_tx.send(Ok(()));
1241                    if let Err(e) = bs.run(rx).await {
1242                        error!("run loop exited with error {e:?}")
1243                    }
1244                }
1245                Err(e) => {
1246                    let _ = init_tx.send(Err(e));
1247                }
1248            };
1249        });
1250        init_rx.await??;
1251        Ok(GXHandle(tx))
1252    }
1253}