netidx_bscript/
rt.rs

/// A general purpose bscript runtime
///
/// This module implements a generic bscript runtime suitable for most
/// applications, including applications that implement custom bscript
/// builtins. The bscript interpreter is run in a background task, and
/// can be interacted with via a handle. All features of the standard
/// library are supported by this runtime.
///
/// In special cases where this runtime is not suitable for your
/// application you can implement your own, see the [Ctx] and
/// [UserEvent] traits.
12use crate::{
13    compile,
14    env::Env,
15    expr::{self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin},
16    node::genn,
17    typ::{FnType, Type},
18    BindId, BuiltIn, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node,
19};
20use anyhow::{anyhow, bail, Context, Result};
21use arcstr::{literal, ArcStr};
22use chrono::prelude::*;
23use combine::stream::position::SourcePosition;
24use compact_str::format_compact;
25use core::fmt;
26use derive_builder::Builder;
27use futures::{channel::mpsc, future::join_all, FutureExt, SinkExt, StreamExt};
28use fxhash::{FxBuildHasher, FxHashMap};
29use indexmap::IndexMap;
30use log::error;
31use netidx::{
32    path::Path,
33    pool::Pooled,
34    protocol::valarray::ValArray,
35    publisher::{self, Id, PublishFlags, Publisher, Typ, Val, Value, WriteRequest},
36    resolver_client::ChangeTracker,
37    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
38};
39use netidx_protocols::rpc::{
40    self,
41    server::{ArgSpec, RpcCall},
42};
43use smallvec::{smallvec, SmallVec};
44use std::{
45    collections::{hash_map::Entry, HashMap, VecDeque},
46    future, mem,
47    os::unix::ffi::OsStrExt,
48    path::{Component, PathBuf},
49    sync::Weak,
50    time::Duration,
51};
52use tokio::{
53    fs, select,
54    sync::{oneshot, Mutex},
55    task::{self, JoinSet},
56    time::{self, Instant},
57};
58use triomphe::Arc;
59
/// A pooled batch of subscription events, each paired with the id of the
/// subscription it belongs to.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
/// A pooled batch of write requests.
type WriteBatch = Pooled<Vec<WriteRequest>>;
62
/// Marker error used as context when module resolution fails.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed message `could not resolve module`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("could not resolve module")
    }
}
71
/// A cached RPC client procedure together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// Handle used to invoke the remote procedure.
    proc: rpc::client::Proc,
    /// Set when the client is created and refreshed each time `call_rpc`
    /// reuses it.
    last_used: Instant,
}
77
/// The runtime state behind the [Ctx] implementation used by this runtime.
///
/// It owns the netidx publisher and subscriber, the channels they deliver
/// events on, and the bookkeeping that maps variables, subscriptions and
/// published values to the expressions that reference them.
#[derive(Debug)]
pub struct BSCtx {
    /// For each variable, the expressions referencing it and how many times
    /// each one does.
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For each subscription, the expressions referencing it and how many
    /// times each one does.
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For each published value, the expressions referencing it and how many
    /// times each one does.
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable updates waiting to be delivered in a later cycle.
    var_updates: VecDeque<(BindId, Value)>,
    /// Subscription events that did not fit in the cycle they arrived in
    /// (only one event per id is delivered per cycle).
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Write requests that did not fit in the cycle they arrived in.
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// RPC calls that did not fit in the cycle they arrived in.
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// RPC client procedures already created, keyed by procedure path.
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// RPC procedures published by this runtime, keyed by path.
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions released by `unsubscribe`, each with the time it was
    /// released. NOTE(review): presumably dropped after a grace period by
    /// the main loop — confirm against `BS::run`.
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Change trackers used by `list` and `list_table`, one per bind id.
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks; each one resolves to a variable update.
    tasks: JoinSet<(BindId, Value)>,
    /// Publisher updates accumulated during the current cycle.
    batch: publisher::UpdateBatch,
    publisher: Publisher,
    subscriber: Subscriber,
    /// Sending half handed to every subscription created by `subscribe`.
    updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receiving half for subscription update batches.
    updates: mpsc::Receiver<UpdateBatch>,
    /// Sending half handed to every value published by `publish`.
    writes_tx: mpsc::Sender<WriteBatch>,
    /// Receiving half for write batches.
    writes: mpsc::Receiver<WriteBatch>,
    /// Sending half handed to every RPC published by `publish_rpc`.
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receiving half for incoming RPC calls.
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
102
103impl BSCtx {
104    fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
105        let (updates_tx, updates) = mpsc::channel(100);
106        let (writes_tx, writes) = mpsc::channel(100);
107        let (rpcs_tx, rpcs) = mpsc::channel(100);
108        let batch = publisher.start_batch();
109        let mut tasks = JoinSet::new();
110        tasks.spawn(async { future::pending().await });
111        Self {
112            by_ref: HashMap::default(),
113            var_updates: VecDeque::new(),
114            net_updates: VecDeque::new(),
115            net_writes: VecDeque::new(),
116            rpc_overflow: VecDeque::new(),
117            rpc_clients: HashMap::default(),
118            subscribed: HashMap::default(),
119            pending_unsubscribe: VecDeque::new(),
120            published: HashMap::default(),
121            change_trackers: HashMap::default(),
122            published_rpcs: HashMap::default(),
123            tasks,
124            batch,
125            publisher,
126            subscriber,
127            updates,
128            updates_tx,
129            writes,
130            writes_tx,
131            rpcs_tx,
132            rpcs,
133        }
134    }
135}
136
/// Evaluate `$e`; on `Ok` yield the value, on `Err` return
/// `($bindid, Value::Error(..))` from the enclosing function or async block,
/// with the error rendered through its `Debug` implementation.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Err(e) => {
                let msg = format_compact!("{e:?}");
                return ($bindid, Value::Error(ArcStr::from(msg.as_str())));
            }
            Ok(v) => v,
        }
    };
}
149
/// Lock the change tracker `$ct`, reset it if it tracks a path other than
/// `$path`, and ask `$resolver` whether anything changed.
///
/// Returns `($id, Value::Null)` from the enclosing async block when nothing
/// changed, and `($id, Value::Error(..))` when the check itself fails. The
/// lock guard is a `let` binding in the expansion, so it stays alive until
/// the end of the block the macro is invoked in.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
161
162impl Ctx for BSCtx {
163    fn clear(&mut self) {
164        let Self {
165            by_ref,
166            var_updates,
167            net_updates,
168            net_writes,
169            rpc_clients,
170            rpc_overflow,
171            subscribed,
172            published,
173            published_rpcs,
174            pending_unsubscribe,
175            change_trackers,
176            tasks,
177            batch,
178            publisher,
179            subscriber: _,
180            updates_tx,
181            updates,
182            writes_tx,
183            writes,
184            rpcs,
185            rpcs_tx,
186        } = self;
187        by_ref.clear();
188        var_updates.clear();
189        net_updates.clear();
190        net_writes.clear();
191        rpc_overflow.clear();
192        rpc_clients.clear();
193        subscribed.clear();
194        published.clear();
195        published_rpcs.clear();
196        pending_unsubscribe.clear();
197        change_trackers.clear();
198        *tasks = JoinSet::new();
199        tasks.spawn(async { future::pending().await });
200        *batch = publisher.start_batch();
201        let (tx, rx) = mpsc::channel(3);
202        *updates_tx = tx;
203        *updates = rx;
204        let (tx, rx) = mpsc::channel(100);
205        *writes_tx = tx;
206        *writes = rx;
207        let (tx, rx) = mpsc::channel(100);
208        *rpcs_tx = tx;
209        *rpcs = rx
210    }
211
212    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
213        let now = Instant::now();
214        let proc = match self.rpc_clients.entry(name) {
215            Entry::Occupied(mut e) => {
216                let cl = e.get_mut();
217                cl.last_used = now;
218                Ok(cl.proc.clone())
219            }
220            Entry::Vacant(e) => {
221                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
222                    Err(e) => Err(e),
223                    Ok(proc) => {
224                        let cl = RpcClient { last_used: now, proc: proc.clone() };
225                        e.insert(cl);
226                        Ok(proc)
227                    }
228                }
229            }
230        };
231        self.tasks.spawn(async move {
232            macro_rules! err {
233                ($e:expr) => {{
234                    let e = format_compact!("{:?}", $e);
235                    (id, Value::Error(e.as_str().into()))
236                }};
237            }
238            match proc {
239                Err(e) => err!(e),
240                Ok(proc) => match proc.call(args).await {
241                    Err(e) => err!(e),
242                    Ok(res) => (id, res),
243                },
244            }
245        });
246    }
247
248    fn publish_rpc(
249        &mut self,
250        name: Path,
251        doc: Value,
252        spec: Vec<ArgSpec>,
253        id: BindId,
254    ) -> Result<()> {
255        use rpc::server::Proc;
256        let e = match self.published_rpcs.entry(name) {
257            Entry::Vacant(e) => e,
258            Entry::Occupied(_) => bail!("already published"),
259        };
260        let proc = Proc::new(
261            &self.publisher,
262            e.key().clone(),
263            doc,
264            spec,
265            move |c| Some((id, c)),
266            Some(self.rpcs_tx.clone()),
267        )?;
268        e.insert(proc);
269        Ok(())
270    }
271
272    fn unpublish_rpc(&mut self, name: Path) {
273        self.published_rpcs.remove(&name);
274    }
275
276    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
277        let dval =
278            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
279        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
280        dval
281    }
282
283    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
284        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
285            if let Some(cn) = exprs.get_mut(&ref_by) {
286                *cn -= 1;
287                if *cn == 0 {
288                    exprs.remove(&ref_by);
289                }
290            }
291            if exprs.is_empty() {
292                self.subscribed.remove(&dv.id());
293            }
294        }
295        self.pending_unsubscribe.push_back((Instant::now(), dv));
296    }
297
298    fn list(&mut self, id: BindId, path: Path) {
299        let ct = self
300            .change_trackers
301            .entry(id)
302            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
303        let ct = Arc::clone(ct);
304        let resolver = self.subscriber.resolver();
305        self.tasks.spawn(async move {
306            check_changed!(id, resolver, path, ct);
307            let mut paths = or_err!(id, resolver.list(path).await);
308            let paths = paths.drain(..).map(|p| Value::String(p.into()));
309            (id, Value::Array(ValArray::from_iter_exact(paths)))
310        });
311    }
312
313    fn list_table(&mut self, id: BindId, path: Path) {
314        let ct = self
315            .change_trackers
316            .entry(id)
317            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
318        let ct = Arc::clone(ct);
319        let resolver = self.subscriber.resolver();
320        self.tasks.spawn(async move {
321            check_changed!(id, resolver, path, ct);
322            let mut tbl = or_err!(id, resolver.table(path).await);
323            let cols = tbl.cols.drain(..).map(|(name, count)| {
324                Value::Array(ValArray::from([
325                    Value::String(name.into()),
326                    Value::V64(count.0),
327                ]))
328            });
329            let cols = Value::Array(ValArray::from_iter_exact(cols));
330            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
331            let rows = Value::Array(ValArray::from_iter_exact(rows));
332            let tbl = Value::Array(ValArray::from([
333                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
334                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
335            ]));
336            (id, tbl)
337        });
338    }
339
340    fn stop_list(&mut self, id: BindId) {
341        self.change_trackers.remove(&id);
342    }
343
344    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
345        let val = self.publisher.publish_with_flags_and_writes(
346            PublishFlags::empty(),
347            path,
348            value,
349            Some(self.writes_tx.clone()),
350        )?;
351        let id = val.id();
352        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
353        Ok(val)
354    }
355
356    fn update(&mut self, val: &Val, value: Value) {
357        val.update(&mut self.batch, value);
358    }
359
360    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
361        if let Some(refs) = self.published.get_mut(&val.id()) {
362            if let Some(cn) = refs.get_mut(&ref_by) {
363                *cn -= 1;
364                if *cn == 0 {
365                    refs.remove(&ref_by);
366                }
367            }
368            if refs.is_empty() {
369                self.published.remove(&val.id());
370            }
371        }
372    }
373
374    fn set_timer(&mut self, id: BindId, timeout: Duration) {
375        self.tasks
376            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
377    }
378
379    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
380        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
381    }
382
383    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
384        if let Some(refs) = self.by_ref.get_mut(&id) {
385            if let Some(cn) = refs.get_mut(&ref_by) {
386                *cn -= 1;
387                if *cn == 0 {
388                    refs.remove(&ref_by);
389                }
390            }
391            if refs.is_empty() {
392                self.by_ref.remove(&id);
393            }
394        }
395    }
396
397    fn set_var(&mut self, id: BindId, value: Value) {
398        self.var_updates.push_back((id, value.clone()));
399    }
400}
401
402fn is_output(n: &Node<BSCtx, NoUserEvent>) -> bool {
403    match &n.spec().kind {
404        ExprKind::Bind { .. }
405        | ExprKind::Lambda { .. }
406        | ExprKind::Use { .. }
407        | ExprKind::Connect { .. }
408        | ExprKind::Module { .. }
409        | ExprKind::TypeDef { .. } => false,
410        _ => true,
411    }
412}
413
/// Complete immediately when `b` is true; otherwise never complete.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
419
420async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
421    if go {
422        match ch.next().await {
423            None => future::pending().await,
424            Some(v) => v,
425        }
426    } else {
427        future::pending().await
428    }
429}
430
431async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
432    if pending.len() == 0 {
433        future::pending().await
434    } else {
435        let (ts, _) = pending.front().unwrap();
436        let one = Duration::from_secs(1);
437        let elapsed = now - *ts;
438        if elapsed < one {
439            time::sleep(one - elapsed).await
440        }
441    }
442}
443
/// Description of one compiled top-level expression.
#[derive(Clone)]
pub struct CompExp {
    /// Id of the expression; the node is stored in the runtime under it.
    pub id: ExprId,
    /// Type of the compiled node.
    pub typ: Type,
    /// False for binds, lambdas, `use`, connects, modules and type
    /// definitions; true for every other kind of expression.
    pub output: bool,
}
450
/// The result of compiling or loading source text.
#[derive(Clone)]
pub struct CompRes {
    /// One entry per compiled top-level expression.
    pub exprs: SmallVec<[CompExp; 1]>,
    /// A clone of the runtime environment taken right after compilation.
    pub env: Env<BSCtx, NoUserEvent>,
}
456
/// Events the runtime sends to its subscribers.
#[derive(Clone)]
pub enum RtEvent {
    /// The expression with the given id produced the given value during a
    /// cycle.
    Updated(ExprId, Value),
}
461
// Declares the `CallableId` type used to key callables in the runtime.
// NOTE(review): `atomic_id!` is defined elsewhere; from its uses here the
// generated type has a `new()` constructor and can be copied, hashed and
// debug-printed — confirm against the macro definition.
atomic_id!(CallableId);
463
/// A handle to a bscript lambda that can be called from Rust.
///
/// Dropping the handle asks the runtime to delete the callable.
pub struct Callable {
    /// Handle used to send requests to the runtime.
    rt: BSHandle,
    /// Id under which the runtime knows this callable.
    id: CallableId,
    /// Environment snapshot used to type check arguments in `call`.
    env: Env<BSCtx, NoUserEvent>,
    /// The function type of the lambda.
    pub typ: FnType,
    /// Id of the expression that applies the lambda to its arguments.
    pub expr: ExprId,
}
471
472impl Drop for Callable {
473    fn drop(&mut self) {
474        let _ = self.rt.0.unbounded_send(ToRt::DeleteCallable { id: self.id });
475    }
476}
477
478impl Callable {
479    /// Call the lambda with args. Argument types and arity will be
480    /// checked and an error will be returned if they are wrong.
481    pub async fn call(&self, args: ValArray) -> Result<()> {
482        if self.typ.args.len() != args.len() {
483            bail!("expected {} args", self.typ.args.len())
484        }
485        for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
486            if !a.typ.is_a(&self.env, v) {
487                bail!("type mismatch arg {i} expected {}", a.typ)
488            }
489        }
490        self.call_unchecked(args).await
491    }
492
493    /// Call the lambda with args. Argument types and arity will NOT
494    /// be checked. This can result in a runtime panic, invalid
495    /// results, and probably other bad things.
496    pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
497        self.rt
498            .0
499            .unbounded_send(ToRt::Call { id: self.id, args })
500            .map_err(|_| anyhow!("runtime is dead"))
501    }
502}
503
/// Requests sent from handles to the runtime task.
///
/// Variants with a `res` field carry a oneshot sender on which the reply is
/// delivered. NOTE(review): the handling of these messages lives in
/// `BS::run`, which is not fully visible from here — confirm details there.
enum ToRt {
    /// Request the current environment.
    GetEnv { res: oneshot::Sender<Env<BSCtx, NoUserEvent>> },
    /// Delete the expression `id`; replies with an environment or an error.
    Delete { id: ExprId, res: oneshot::Sender<Result<Env<BSCtx, NoUserEvent>>> },
    /// Load and compile the file or module at `path`.
    Load { path: PathBuf, res: oneshot::Sender<Result<CompRes>> },
    /// Compile the source `text`.
    Compile { text: ArcStr, res: oneshot::Sender<Result<CompRes>> },
    /// Build a [Callable] for the lambda identified by `id`; `rt` is the
    /// handle stored in the resulting callable.
    CompileCallable { id: Value, rt: BSHandle, res: oneshot::Sender<Result<Callable>> },
    /// Build a reference expression for the bind identified by `id`.
    CompileRef { id: Value, res: oneshot::Sender<Result<ExprId>> },
    /// Call the callable `id` with `args`.
    Call { id: CallableId, args: ValArray },
    /// Delete the callable `id`.
    DeleteCallable { id: CallableId },
    /// Register `ch` to receive runtime events.
    Subscribe { ch: mpsc::Sender<RtEvent> },
}
515
/// The runtime's record of a callable.
struct CallableInt {
    /// Id of the apply expression created for the callable.
    expr: ExprId,
    /// One bind id per lambda argument, in argument order; calling the
    /// callable sets these variables to the supplied values.
    args: Box<[BindId]>,
}
520
// setting expectations is one of the most underrated skills in
// software engineering
/// The runtime itself: the execution context plus the compiled nodes and
/// the per-cycle bookkeeping needed to run them.
struct BS {
    /// Execution context; `ctx.user` is the [BSCtx].
    ctx: ExecCtx<BSCtx, NoUserEvent>,
    /// The event delivered to nodes during a cycle; cleared after each one.
    event: Event<NoUserEvent>,
    /// Expressions to update in the next cycle. The flag is true for newly
    /// compiled expressions (run with `event.init` set) and false for
    /// expressions scheduled because something they reference changed.
    updated: FxHashMap<ExprId, bool>,
    /// Compiled top-level nodes, iterated in insertion order.
    nodes: IndexMap<ExprId, Node<BSCtx, NoUserEvent>, FxBuildHasher>,
    /// Callables created by `compile_callable`.
    callables: FxHashMap<CallableId, CallableInt>,
    /// Channels that receive an [RtEvent] whenever a node produces a value.
    subs: Vec<mpsc::Sender<RtEvent>>,
    /// Module resolvers used when compiling and loading.
    resolvers: Vec<ModuleResolver>,
    /// Timeout passed to the publisher when committing a batch.
    publish_timeout: Option<Duration>,
    /// Initialized to the construction time. NOTE(review): presumably the
    /// time cached RPC clients were last garbage collected — confirm in
    /// `run`.
    last_rpc_gc: Instant,
}
534
535impl BS {
536    fn new(mut rt: BSConfig) -> Self {
537        let resolvers_default = || match dirs::data_dir() {
538            None => vec![ModuleResolver::Files("".into())],
539            Some(dd) => vec![
540                ModuleResolver::Files("".into()),
541                ModuleResolver::Files(dd.join("bscript")),
542            ],
543        };
544        let resolvers = match std::env::var("BSCRIPT_MODPATH") {
545            Err(_) => resolvers_default(),
546            Ok(mp) => match ModuleResolver::parse_env(
547                rt.subscriber.clone(),
548                rt.subscribe_timeout,
549                &mp,
550            ) {
551                Ok(r) => r,
552                Err(e) => {
553                    error!("failed to parse BSCRIPT_MODPATH, using default {e:?}");
554                    resolvers_default()
555                }
556            },
557        };
558        let mut event = Event::new(NoUserEvent);
559        let mut ctx = match rt.ctx.take() {
560            Some(ctx) => ctx,
561            None => ExecCtx::new(BSCtx::new(rt.publisher, rt.subscriber)),
562        };
563        event.init = true;
564        let mut std = mem::take(&mut ctx.std);
565        for n in std.iter_mut() {
566            let _ = n.update(&mut ctx, &mut event);
567        }
568        ctx.std = std;
569        event.init = false;
570        Self {
571            ctx,
572            event,
573            updated: HashMap::default(),
574            nodes: IndexMap::default(),
575            callables: HashMap::default(),
576            subs: vec![],
577            resolvers,
578            publish_timeout: rt.publish_timeout,
579            last_rpc_gc: Instant::now(),
580        }
581    }
582
583    async fn do_cycle(
584        &mut self,
585        updates: Option<UpdateBatch>,
586        writes: Option<WriteBatch>,
587        tasks: &mut Vec<(BindId, Value)>,
588        rpcs: &mut Vec<(BindId, RpcCall)>,
589    ) {
590        macro_rules! push_event {
591            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
592                match self.event.$event.entry($id) {
593                    Entry::Vacant(e) => {
594                        e.insert($v);
595                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
596                            for id in exps.keys() {
597                                self.updated.entry(*id).or_insert(false);
598                            }
599                        }
600                    }
601                    Entry::Occupied(_) => {
602                        self.ctx.user.$overflow.push_back(($id, $v));
603                    }
604                }
605            };
606        }
607        for _ in 0..self.ctx.user.var_updates.len() {
608            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
609            push_event!(id, v, variables, by_ref, var_updates)
610        }
611        for (id, v) in tasks.drain(..) {
612            push_event!(id, v, variables, by_ref, var_updates)
613        }
614        for _ in 0..self.ctx.user.rpc_overflow.len() {
615            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
616            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
617        }
618        for (id, v) in rpcs.drain(..) {
619            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
620        }
621        for _ in 0..self.ctx.user.net_updates.len() {
622            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
623            push_event!(id, v, netidx, subscribed, net_updates)
624        }
625        if let Some(mut updates) = updates {
626            for (id, v) in updates.drain(..) {
627                push_event!(id, v, netidx, subscribed, net_updates)
628            }
629        }
630        for _ in 0..self.ctx.user.net_writes.len() {
631            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
632            push_event!(id, v, writes, published, net_writes)
633        }
634        if let Some(mut writes) = writes {
635            for wr in writes.drain(..) {
636                let id = wr.id;
637                push_event!(id, wr, writes, published, net_writes)
638            }
639        }
640        for (id, n) in self.nodes.iter_mut() {
641            if let Some(init) = self.updated.get(id) {
642                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
643                self.event.init = *init;
644                if self.event.init {
645                    n.refs(&mut |id| {
646                        if let Some(v) = self.ctx.cached.get(&id) {
647                            if let Entry::Vacant(e) = self.event.variables.entry(id) {
648                                e.insert(v.clone());
649                                clear.push(id);
650                            }
651                        }
652                    });
653                }
654                let res = n.update(&mut self.ctx, &mut self.event);
655                for id in clear {
656                    self.event.variables.remove(&id);
657                }
658                if let Some(v) = res {
659                    let mut i = 0;
660                    while i < self.subs.len() {
661                        if let Err(_) =
662                            self.subs[i].send(RtEvent::Updated(*id, v.clone())).await
663                        {
664                            self.subs.remove(i);
665                        }
666                        i += 1;
667                    }
668                }
669            }
670        }
671        self.event.clear();
672        self.updated.clear();
673        if self.ctx.user.batch.len() > 0 {
674            let batch = mem::replace(
675                &mut self.ctx.user.batch,
676                self.ctx.user.publisher.start_batch(),
677            );
678            let timeout = self.publish_timeout;
679            task::spawn(async move { batch.commit(timeout).await });
680        }
681    }
682
683    fn cycle_ready(&self) -> bool {
684        self.ctx.user.var_updates.len() > 0
685            || self.ctx.user.net_updates.len() > 0
686            || self.ctx.user.net_writes.len() > 0
687            || self.ctx.user.rpc_overflow.len() > 0
688    }
689
690    async fn compile(&mut self, text: ArcStr) -> Result<CompRes> {
691        let scope = ModPath::root();
692        let ori = expr::parser::parse(None, text.clone())?;
693        let exprs = join_all(
694            ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
695        )
696        .await
697        .into_iter()
698        .collect::<Result<SmallVec<[_; 4]>>>()
699        .context(CouldNotResolve)?;
700        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
701        let nodes = ori
702            .exprs
703            .iter()
704            .map(|e| compile(&mut self.ctx, &scope, e.clone()))
705            .collect::<Result<SmallVec<[_; 4]>>>()
706            .with_context(|| ori.clone())?;
707        let exprs = ori
708            .exprs
709            .iter()
710            .zip(nodes.into_iter())
711            .map(|(e, n)| {
712                let output = is_output(&n);
713                let typ = n.typ().clone();
714                self.updated.insert(e.id, true);
715                self.nodes.insert(e.id, n);
716                CompExp { id: e.id, output, typ }
717            })
718            .collect::<SmallVec<[_; 1]>>();
719        Ok(CompRes { exprs, env: self.ctx.env.clone() })
720    }
721
722    async fn load(&mut self, file: &PathBuf) -> Result<CompRes> {
723        let scope = ModPath::root();
724        let (scope, ori) = match file.extension() {
725            Some(e) if e.as_bytes() == b"bs" => {
726                let scope = match file.file_name() {
727                    None => scope,
728                    Some(name) => ModPath(scope.append(&*name.to_string_lossy())),
729                };
730                let s = fs::read_to_string(file).await?;
731                let s = if s.starts_with("#!") {
732                    if let Some(i) = s.find('\n') {
733                        &s[i..]
734                    } else {
735                        s.as_str()
736                    }
737                } else {
738                    s.as_str()
739                };
740                let s = ArcStr::from(s);
741                let name = ArcStr::from(file.to_string_lossy());
742                (scope, expr::parser::parse(Some(name), s)?)
743            }
744            Some(e) => bail!("invalid file extension {e:?}"),
745            None => {
746                let name = file
747                    .components()
748                    .map(|c| match c {
749                        Component::RootDir
750                        | Component::CurDir
751                        | Component::ParentDir
752                        | Component::Prefix(_) => bail!("invalid module name {file:?}"),
753                        Component::Normal(s) => Ok(s),
754                    })
755                    .collect::<Result<Box<[_]>>>()?;
756                if name.len() != 1 {
757                    bail!("invalid module name {file:?}")
758                }
759                let name = String::from_utf8_lossy(name[0].as_bytes());
760                let name = name
761                    .parse::<ModPath>()
762                    .with_context(|| "parsing module name {file:?}")?;
763                let scope =
764                    ModPath(Path::from_str(Path::dirname(&*name).unwrap_or(&*scope)));
765                let name = Path::basename(&*name)
766                    .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
767                let name = ArcStr::from(name);
768                let e = ExprKind::Module {
769                    export: true,
770                    name: name.clone(),
771                    value: ModuleKind::Unresolved,
772                }
773                .to_expr(Default::default());
774                let ori = Origin {
775                    name: Some(name),
776                    source: literal!(""),
777                    exprs: Arc::from_iter([e]),
778                };
779                (scope, ori)
780            }
781        };
782        let expr = ExprKind::Module {
783            export: true,
784            name: ori.name.clone().unwrap_or(literal!("")),
785            value: ModuleKind::Inline(ori.exprs.clone()),
786        }
787        .to_expr(SourcePosition::default());
788        let expr = expr
789            .resolve_modules(&scope, &self.resolvers)
790            .await
791            .with_context(|| ori.clone())?;
792        let top_id = expr.id;
793        let n = compile(&mut self.ctx, &scope, expr).with_context(|| ori.clone())?;
794        let has_out = is_output(&n);
795        let typ = n.typ().clone();
796        self.nodes.insert(top_id, n);
797        self.updated.insert(top_id, true);
798        let exprs = smallvec![CompExp { id: top_id, output: has_out, typ }];
799        Ok(CompRes { exprs, env: self.ctx.env.clone() })
800    }
801
802    fn compile_callable(&mut self, id: Value, rt: BSHandle) -> Result<Callable> {
803        let id = match id {
804            Value::U64(id) => LambdaId(id),
805            v => bail!("invalid lambda id {v}"),
806        };
807        let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
808        let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
809        let args = lb.typ.args.iter();
810        let args = args
811            .map(|a| {
812                if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
813                    bail!("can't call lambda with an optional argument from rust")
814                } else {
815                    Ok(BindId::new())
816                }
817            })
818            .collect::<Result<Box<[_]>>>()?;
819        let eid = ExprId::new();
820        let argn = lb.typ.args.iter().zip(args.iter());
821        let argn = argn
822            .map(|(arg, id)| {
823                genn::reference(&mut self.ctx, *id, arg.typ.clone(), ExprId::new())
824            })
825            .collect::<Vec<_>>();
826        let fnode = genn::constant(Value::U64(id.0));
827        let n = genn::apply(fnode, argn, lb.typ.clone(), eid);
828        let cid = CallableId::new();
829        self.callables.insert(cid, CallableInt { expr: eid, args });
830        self.nodes.insert(eid, n);
831        let env = self.ctx.env.clone();
832        Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
833    }
834
835    fn compile_ref(&mut self, id: Value) -> Result<ExprId> {
836        let id = match id {
837            Value::U64(id) => BindId(id),
838            v => bail!("invalid bind id {v}"),
839        };
840        let eid = ExprId::new();
841        let typ = Type::Primitive(Typ::any());
842        let n = genn::reference(&mut self.ctx, id, typ, eid);
843        self.nodes.insert(eid, n);
844        Ok(eid)
845    }
846
847    fn call_callable(
848        &mut self,
849        id: CallableId,
850        args: ValArray,
851        tasks: &mut Vec<(BindId, Value)>,
852    ) -> Result<()> {
853        let c =
854            self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
855        if args.len() != c.args.len() {
856            bail!("expected {} arguments", c.args.len());
857        }
858        let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
859        tasks.extend(a);
860        Ok(())
861    }
862
863    fn delete_callable(&mut self, id: CallableId) {
864        if let Some(c) = self.callables.remove(&id) {
865            if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
866                n.delete(&mut self.ctx)
867            }
868        }
869    }
870
    /// Drive the runtime until the command channel closes.
    ///
    /// Each iteration of the main loop waits for exactly one of: an item
    /// on the rpcs channel, a batch of writes, a batch of updates, a
    /// finished background task, a command sent through `to_rt`, the
    /// `cycle_ready` flag, or the pending unsubscribe timer. Whichever
    /// fires first, the other sources are then polled without blocking
    /// (see the local `peek!` macro) so that one cycle handles as much
    /// of the available input as possible. The command, if any, is
    /// executed, `do_cycle` is run, and at most once a minute idle rpc
    /// clients are discarded.
    ///
    /// Returns `Ok(())` once `to_rt` yields `None`. No other exit from
    /// the loop, and no error return, is present in this function.
    async fn run(mut self, mut to_rt: mpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        // These buffers live across iterations; they are handed to
        // do_cycle by mutable reference every cycle.
        let mut tasks = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            // `now` is sampled before waiting in select!, so by the time
            // it is used for the rpc client gc below it may be older than
            // the actual wall time by however long the wait took.
            let now = Instant::now();
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            let mut input = None;
            // Non-blocking collection of whatever input is available
            // right now. `peek!(a, b, ...)` expands to one peek per name.
            macro_rules! peek {
                // Merge every immediately available update batch into a
                // single batch. Skipped while net_updates is non-empty
                // (presumably a backlog from an earlier cycle — confirm).
                (updates) => {
                    if self.ctx.user.net_updates.is_empty() {
                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
                            match &mut updates {
                                None => updates = Some(up),
                                Some(prev) => prev.extend(up.drain(..)),
                            }
                        }
                    }
                };
                // Take at most one write batch, and only while net_writes
                // is empty. Unlike updates, batches are not merged.
                (writes) => {
                    if self.ctx.user.net_writes.is_empty() {
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                // Collect the results of tasks that have already
                // finished. The loop stops at the first join error (or
                // when nothing more is ready); that error is dropped.
                (tasks) => {
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                // Collect every immediately available rpc item, only
                // while rpc_overflow is empty.
                (rpcs) => {
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            // NOTE(review): maybe_next, or_never and unsubscribe_ready
            // are defined elsewhere in this file. From their use here,
            // maybe_next presumably only yields from the stream when its
            // flag is true, and or_never presumably completes at once
            // when `ready` is true and never otherwise — confirm.
            select! {
                rp = maybe_next(self.ctx.user.rpc_overflow.is_empty(), &mut self.ctx.user.rpcs) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs)
                }
                wr = maybe_next(self.ctx.user.net_writes.is_empty(), &mut self.ctx.user.writes) => {
                    writes = Some(wr);
                    // writes is deliberately not peeked here, peeking it
                    // would replace the batch just received.
                    peek!(updates, tasks, rpcs);
                },
                up = maybe_next(self.ctx.user.net_updates.is_empty(), &mut self.ctx.user.updates) => {
                    updates = Some(up);
                    // Peeking updates is safe, later batches are merged
                    // into the one just received.
                    peek!(updates, writes, tasks, rpcs);
                },
                up = self.ctx.user.tasks.join_next() => {
                    // A join error, or None, is silently ignored.
                    if let Some(Ok(up)) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs)
                },
                n = to_rt.next() => match n {
                    // The command channel is closed, shut down.
                    None => break 'main Ok(()),
                    Some(i) => {
                        peek!(updates, writes, tasks, rpcs);
                        input = Some(i);
                    }
                },
                _ = or_never(ready) => peek!(updates, writes, tasks, rpcs),
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // Discard entries from the front of the queue whose
                    // timestamp is at least one second old, then wait
                    // again without running a cycle.
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                }
            }
            // Execute the command received from a handle, if there was
            // one. A failed reply send (the requester went away) is
            // ignored in every case.
            match input {
                None => (),
                Some(ToRt::GetEnv { res }) => {
                    let _ = res.send(self.ctx.env.clone());
                }
                // Compile and Load are awaited inline, no other event is
                // serviced by this loop until they finish.
                Some(ToRt::Compile { text, res }) => {
                    let _ = res.send(self.compile(text).await);
                }
                Some(ToRt::Load { path, res }) => {
                    let _ = res.send(self.load(&path).await);
                }
                Some(ToRt::Subscribe { ch }) => {
                    self.subs.push(ch);
                }
                Some(ToRt::Delete { id, res }) => {
                    // CR estokes: Check dependencies
                    // An unknown id is not an error, the reply is always
                    // Ok with a copy of the current environment.
                    if let Some(mut n) = self.nodes.shift_remove(&id) {
                        n.delete(&mut self.ctx);
                    }
                    let _ = res.send(Ok(self.ctx.env.clone()));
                }
                Some(ToRt::CompileCallable { id, rt, res }) => {
                    let _ = res.send(self.compile_callable(id, rt));
                }
                Some(ToRt::CompileRef { id, res }) => {
                    let _ = res.send(self.compile_ref(id));
                }
                Some(ToRt::DeleteCallable { id }) => self.delete_callable(id),
                Some(ToRt::Call { id, args }) => {
                    // There is no reply channel for calls, so a failure
                    // can only be logged.
                    if let Err(e) = self.call_callable(id, args, &mut tasks) {
                        error!("calling callable {id:?} failed with {e:?}")
                    }
                }
            }
            self.do_cycle(updates, writes, &mut tasks, &mut rpcs).await;
            // At most once a minute, drop rpc clients whose last use is
            // more than a minute before `now`.
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
996}
997
/// A handle to a running BS instance.
///
/// The handle wraps the sending side of the runtime's command channel
/// and can be cloned freely. The runtime's main loop exits when that
/// channel yields `None`, so the background task shuts down only after
/// every clone of the handle has been dropped. Note that
/// [BSHandle::compile_callable] sends a clone of the handle to the
/// runtime along with the request.
#[derive(Clone)]
pub struct BSHandle(mpsc::UnboundedSender<ToRt>);
1002
1003impl BSHandle {
1004    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1005        let (tx, rx) = oneshot::channel();
1006        self.0.unbounded_send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1007        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1008    }
1009
1010    /// Get a copy of the current bscript environment
1011    pub async fn get_env(&self) -> Result<Env<BSCtx, NoUserEvent>> {
1012        self.exec(|res| ToRt::GetEnv { res }).await
1013    }
1014
1015    /// Compile and execute the specified bscript expression. It can
1016    /// either be a module expression or a single expression. If it
1017    /// generates results, they will be sent to all the channels that
1018    /// are subscribed.
1019    pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1020        Ok(self.exec(|tx| ToRt::Compile { text, res: tx }).await??)
1021    }
1022
1023    /// Load and execute the specified bscript module. The path may
1024    /// have one of two forms. If it is the path to a file with
1025    /// extension .bs then the rt will load the file directly. If it
1026    /// is a modpath (e.g. foo::bar::baz) then the module resolver
1027    /// will look for a matching module in the modpath.
1028    pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1029        Ok(self.exec(|tx| ToRt::Load { path, res: tx }).await??)
1030    }
1031
1032    /// Delete the specified expression
1033    pub async fn delete(&self, id: ExprId) -> Result<Env<BSCtx, NoUserEvent>> {
1034        Ok(self.exec(|tx| ToRt::Delete { id, res: tx }).await??)
1035    }
1036
1037    /// Compile a callable interface to the specified lambda id. This
1038    /// is how you call a lambda directly from rust.
1039    pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1040        Ok(self
1041            .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1042            .await??)
1043    }
1044
1045    /// Compile an expression that will output the value of the ref
1046    /// specifed by id. This is the same as the deref (*) operator in
1047    /// bscript.
1048    pub async fn compile_ref(&self, id: Value) -> Result<ExprId> {
1049        Ok(self.exec(|tx| ToRt::CompileRef { id, res: tx }).await??)
1050    }
1051
1052    /// The specified channel will receive events generated by all
1053    /// bscript expressions being run by the runtime. To unsubscribe,
1054    /// just drop the receiver.
1055    pub fn subscribe(&self, ch: mpsc::Sender<RtEvent>) -> Result<()> {
1056        self.0
1057            .unbounded_send(ToRt::Subscribe { ch })
1058            .map_err(|_| anyhow!("runtime is dead"))
1059    }
1060}
1061
/// Configuration for a bscript runtime.
///
/// Values are constructed through the builder generated by
/// `derive(Builder)`. Builtins may be added to a finished config with
/// [BSConfig::register_builtin], and [BSConfig::start] consumes the
/// config and launches the runtime.
#[derive(Builder)]
pub struct BSConfig {
    /// The publisher given to the runtime's context.
    publisher: Publisher,
    /// The subscriber given to the runtime's context.
    subscriber: Subscriber,
    /// Optional subscribe timeout, unset unless the builder sets it.
    /// NOTE(review): how the runtime applies it is not visible in this
    /// part of the file — confirm.
    #[builder(setter(strip_option), default)]
    subscribe_timeout: Option<Duration>,
    /// Optional publish timeout, unset unless the builder sets it.
    /// NOTE(review): how the runtime applies it is not visible in this
    /// part of the file — confirm.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context. It has no builder setter; it is created
    /// on demand the first time a builtin is registered.
    #[builder(setter(skip))]
    ctx: Option<ExecCtx<BSCtx, NoUserEvent>>,
}
1073
1074impl BSConfig {
1075    /// Register a builtin bscript function
1076    pub fn register_builtin<T: BuiltIn<BSCtx, NoUserEvent>>(&mut self) -> Result<()> {
1077        let ctx = match self.ctx.as_mut() {
1078            Some(ctx) => ctx,
1079            None => {
1080                self.ctx = Some(ExecCtx::new(BSCtx::new(
1081                    self.publisher.clone(),
1082                    self.subscriber.clone(),
1083                )));
1084                self.ctx.as_mut().unwrap()
1085            }
1086        };
1087        ctx.register_builtin::<T>()
1088    }
1089
1090    /// Start the BS runtime with the specified config, return a
1091    /// handle capable of interacting with it.
1092    pub fn start(self) -> BSHandle {
1093        let (tx, rx) = mpsc::unbounded();
1094        task::spawn(async move {
1095            let bs = BS::new(self);
1096            if let Err(e) = bs.run(rx).await {
1097                error!("run loop exited with error {e:?}")
1098            }
1099        });
1100        BSHandle(tx)
1101    }
1102}