netidx_bscript/
rt.rs

1/// A general purpose bscript runtime
2///
3/// This module implements a generic bscript runtime suitable for most
4/// applications, including applications that implement custom bscript
5/// builtins. The bscript interperter is run in a background task, and
6/// can be interacted with via a handle. All features of the standard
7/// library are supported by this runtime.
8///
9/// In special cases where this runtime is not suitable for your
10/// application you can implement your own, see the [Ctx] and
11/// [UserEvent] traits.
12use crate::{
13    env::Env,
14    expr::{self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin},
15    node::{Node, NodeKind},
16    typ::{NoRefs, Type},
17    BindId, BuiltIn, Ctx, Event, ExecCtx, NoUserEvent,
18};
19use anyhow::{anyhow, bail, Context, Result};
20use arcstr::{literal, ArcStr};
21use chrono::prelude::*;
22use combine::stream::position::SourcePosition;
23use compact_str::format_compact;
24use core::fmt;
25use derive_builder::Builder;
26use futures::{channel::mpsc, future::join_all, FutureExt, SinkExt, StreamExt};
27use fxhash::{FxBuildHasher, FxHashMap};
28use indexmap::IndexMap;
29use log::error;
30use netidx::{
31    path::Path,
32    pool::Pooled,
33    protocol::valarray::ValArray,
34    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
35    resolver_client::ChangeTracker,
36    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
37};
38use netidx_protocols::rpc::{
39    self,
40    server::{ArgSpec, RpcCall},
41};
42use smallvec::{smallvec, SmallVec};
43use std::{
44    collections::{hash_map::Entry, HashMap, VecDeque},
45    future, mem,
46    os::unix::ffi::OsStrExt,
47    path::{Component, PathBuf},
48    time::Duration,
49};
50use tokio::{
51    fs, select,
52    sync::{oneshot, Mutex},
53    task::{self, JoinSet},
54    time::{self, Instant},
55};
56use triomphe::Arc;
57
58type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
59type WriteBatch = Pooled<Vec<WriteRequest>>;
60
61#[derive(Debug)]
62pub struct CouldNotResolve;
63
64impl fmt::Display for CouldNotResolve {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        write!(f, "could not resolve module")
67    }
68}
69
70#[derive(Debug)]
71struct RpcClient {
72    proc: rpc::client::Proc,
73    last_used: Instant,
74}
75
76#[derive(Debug)]
77pub struct BSCtx {
78    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
79    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
80    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
81    var_updates: VecDeque<(BindId, Value)>,
82    net_updates: VecDeque<(SubId, subscriber::Event)>,
83    net_writes: VecDeque<(Id, WriteRequest)>,
84    rpc_overflow: VecDeque<(BindId, RpcCall)>,
85    rpc_clients: FxHashMap<Path, RpcClient>,
86    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
87    pending_unsubscribe: VecDeque<(Instant, Dval)>,
88    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
89    tasks: JoinSet<(BindId, Value)>,
90    batch: publisher::UpdateBatch,
91    publisher: Publisher,
92    subscriber: Subscriber,
93    updates_tx: mpsc::Sender<UpdateBatch>,
94    updates: mpsc::Receiver<UpdateBatch>,
95    writes_tx: mpsc::Sender<WriteBatch>,
96    writes: mpsc::Receiver<WriteBatch>,
97    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
98    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
99}
100
101impl BSCtx {
102    fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
103        let (updates_tx, updates) = mpsc::channel(3);
104        let (writes_tx, writes) = mpsc::channel(100);
105        let (rpcs_tx, rpcs) = mpsc::channel(100);
106        let batch = publisher.start_batch();
107        let mut tasks = JoinSet::new();
108        tasks.spawn(async { future::pending().await });
109        Self {
110            by_ref: HashMap::default(),
111            var_updates: VecDeque::new(),
112            net_updates: VecDeque::new(),
113            net_writes: VecDeque::new(),
114            rpc_overflow: VecDeque::new(),
115            rpc_clients: HashMap::default(),
116            subscribed: HashMap::default(),
117            pending_unsubscribe: VecDeque::new(),
118            published: HashMap::default(),
119            change_trackers: HashMap::default(),
120            published_rpcs: HashMap::default(),
121            tasks,
122            batch,
123            publisher,
124            subscriber,
125            updates,
126            updates_tx,
127            writes,
128            writes_tx,
129            rpcs_tx,
130            rpcs,
131        }
132    }
133}
134
135macro_rules! or_err {
136    ($bindid:expr, $e:expr) => {
137        match $e {
138            Ok(v) => v,
139            Err(e) => {
140                let e = ArcStr::from(format_compact!("{e:?}").as_str());
141                let e = Value::Error(e);
142                return ($bindid, e);
143            }
144        }
145    };
146}
147
148macro_rules! check_changed {
149    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
150        let mut ct = $ct.lock().await;
151        if ct.path() != &$path {
152            *ct = ChangeTracker::new($path.clone());
153        }
154        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
155            return ($id, Value::Null);
156        }
157    };
158}
159
160impl Ctx for BSCtx {
161    fn clear(&mut self) {
162        let Self {
163            by_ref,
164            var_updates,
165            net_updates,
166            net_writes,
167            rpc_clients,
168            rpc_overflow,
169            subscribed,
170            published,
171            published_rpcs,
172            pending_unsubscribe,
173            change_trackers,
174            tasks,
175            batch,
176            publisher,
177            subscriber: _,
178            updates_tx,
179            updates,
180            writes_tx,
181            writes,
182            rpcs,
183            rpcs_tx,
184        } = self;
185        by_ref.clear();
186        var_updates.clear();
187        net_updates.clear();
188        net_writes.clear();
189        rpc_overflow.clear();
190        rpc_clients.clear();
191        subscribed.clear();
192        published.clear();
193        published_rpcs.clear();
194        pending_unsubscribe.clear();
195        change_trackers.clear();
196        *tasks = JoinSet::new();
197        tasks.spawn(async { future::pending().await });
198        *batch = publisher.start_batch();
199        let (tx, rx) = mpsc::channel(3);
200        *updates_tx = tx;
201        *updates = rx;
202        let (tx, rx) = mpsc::channel(100);
203        *writes_tx = tx;
204        *writes = rx;
205        let (tx, rx) = mpsc::channel(100);
206        *rpcs_tx = tx;
207        *rpcs = rx
208    }
209
210    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
211        let now = Instant::now();
212        let proc = match self.rpc_clients.entry(name) {
213            Entry::Occupied(mut e) => {
214                let cl = e.get_mut();
215                cl.last_used = now;
216                Ok(cl.proc.clone())
217            }
218            Entry::Vacant(e) => {
219                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
220                    Err(e) => Err(e),
221                    Ok(proc) => {
222                        let cl = RpcClient { last_used: now, proc: proc.clone() };
223                        e.insert(cl);
224                        Ok(proc)
225                    }
226                }
227            }
228        };
229        self.tasks.spawn(async move {
230            macro_rules! err {
231                ($e:expr) => {{
232                    let e = format_compact!("{:?}", $e);
233                    (id, Value::Error(e.as_str().into()))
234                }};
235            }
236            match proc {
237                Err(e) => err!(e),
238                Ok(proc) => match proc.call(args).await {
239                    Err(e) => err!(e),
240                    Ok(res) => (id, res),
241                },
242            }
243        });
244    }
245
246    fn publish_rpc(
247        &mut self,
248        name: Path,
249        doc: Value,
250        spec: Vec<ArgSpec>,
251        id: BindId,
252    ) -> Result<()> {
253        use rpc::server::Proc;
254        let e = match self.published_rpcs.entry(name) {
255            Entry::Vacant(e) => e,
256            Entry::Occupied(_) => bail!("already published"),
257        };
258        let proc = Proc::new(
259            &self.publisher,
260            e.key().clone(),
261            doc,
262            spec,
263            move |c| Some((id, c)),
264            Some(self.rpcs_tx.clone()),
265        )?;
266        e.insert(proc);
267        Ok(())
268    }
269
270    fn unpublish_rpc(&mut self, name: Path) {
271        self.published_rpcs.remove(&name);
272    }
273
274    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
275        let dval =
276            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
277        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
278        dval
279    }
280
281    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
282        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
283            if let Some(cn) = exprs.get_mut(&ref_by) {
284                *cn -= 1;
285                if *cn == 0 {
286                    exprs.remove(&ref_by);
287                }
288            }
289            if exprs.is_empty() {
290                self.subscribed.remove(&dv.id());
291            }
292        }
293        self.pending_unsubscribe.push_back((Instant::now(), dv));
294    }
295
296    fn list(&mut self, id: BindId, path: Path) {
297        let ct = self
298            .change_trackers
299            .entry(id)
300            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
301        let ct = Arc::clone(ct);
302        let resolver = self.subscriber.resolver();
303        self.tasks.spawn(async move {
304            check_changed!(id, resolver, path, ct);
305            let mut paths = or_err!(id, resolver.list(path).await);
306            let paths = paths.drain(..).map(|p| Value::String(p.into()));
307            (id, Value::Array(ValArray::from_iter_exact(paths)))
308        });
309    }
310
311    fn list_table(&mut self, id: BindId, path: Path) {
312        let ct = self
313            .change_trackers
314            .entry(id)
315            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
316        let ct = Arc::clone(ct);
317        let resolver = self.subscriber.resolver();
318        self.tasks.spawn(async move {
319            check_changed!(id, resolver, path, ct);
320            let mut tbl = or_err!(id, resolver.table(path).await);
321            let cols = tbl.cols.drain(..).map(|(name, count)| {
322                Value::Array(ValArray::from([
323                    Value::String(name.into()),
324                    Value::V64(count.0),
325                ]))
326            });
327            let cols = Value::Array(ValArray::from_iter_exact(cols));
328            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
329            let rows = Value::Array(ValArray::from_iter_exact(rows));
330            let tbl = Value::Array(ValArray::from([
331                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
332                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
333            ]));
334            (id, tbl)
335        });
336    }
337
338    fn stop_list(&mut self, id: BindId) {
339        self.change_trackers.remove(&id);
340    }
341
342    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
343        let val = self.publisher.publish_with_flags_and_writes(
344            PublishFlags::empty(),
345            path,
346            value,
347            Some(self.writes_tx.clone()),
348        )?;
349        let id = val.id();
350        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
351        Ok(val)
352    }
353
354    fn update(&mut self, val: &Val, value: Value) {
355        val.update(&mut self.batch, value);
356    }
357
358    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
359        if let Some(refs) = self.published.get_mut(&val.id()) {
360            if let Some(cn) = refs.get_mut(&ref_by) {
361                *cn -= 1;
362                if *cn == 0 {
363                    refs.remove(&ref_by);
364                }
365            }
366            if refs.is_empty() {
367                self.published.remove(&val.id());
368            }
369        }
370    }
371
372    fn set_timer(&mut self, id: BindId, timeout: Duration) {
373        self.tasks
374            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
375    }
376
377    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
378        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
379    }
380
381    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
382        if let Some(refs) = self.by_ref.get_mut(&id) {
383            if let Some(cn) = refs.get_mut(&ref_by) {
384                *cn -= 1;
385                if *cn == 0 {
386                    refs.remove(&ref_by);
387                }
388            }
389            if refs.is_empty() {
390                self.by_ref.remove(&id);
391            }
392        }
393    }
394
395    fn set_var(&mut self, id: BindId, value: Value) {
396        self.var_updates.push_back((id, value.clone()));
397    }
398}
399
400fn is_output(n: &Node<BSCtx, NoUserEvent>) -> bool {
401    match &n.kind {
402        NodeKind::Bind { .. }
403        | NodeKind::Lambda(_)
404        | NodeKind::Use { .. }
405        | NodeKind::Connect(_, _)
406        | NodeKind::Module(_)
407        | NodeKind::TypeDef { .. } => false,
408        _ => true,
409    }
410}
411
412async fn or_never(b: bool) {
413    if !b {
414        future::pending().await
415    }
416}
417
418async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
419    if go {
420        match ch.next().await {
421            None => future::pending().await,
422            Some(v) => v,
423        }
424    } else {
425        future::pending().await
426    }
427}
428
429async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
430    if pending.len() == 0 {
431        future::pending().await
432    } else {
433        let (ts, _) = pending.front().unwrap();
434        let one = Duration::from_secs(1);
435        let elapsed = now - *ts;
436        if elapsed < one {
437            time::sleep(one - elapsed).await
438        }
439    }
440}
441
442#[derive(Clone)]
443pub struct CompExp {
444    pub id: ExprId,
445    pub typ: Type<NoRefs>,
446    pub output: bool,
447}
448
449#[derive(Clone)]
450pub struct CompRes {
451    pub exprs: SmallVec<[CompExp; 1]>,
452    pub env: Env<BSCtx, NoUserEvent>,
453}
454
455#[derive(Clone)]
456pub enum RtEvent {
457    Updated(ExprId, Value),
458}
459
460enum ToRt {
461    GetEnv { res: oneshot::Sender<Env<BSCtx, NoUserEvent>> },
462    Delete { id: ExprId, res: oneshot::Sender<Result<Env<BSCtx, NoUserEvent>>> },
463    Load { path: PathBuf, res: oneshot::Sender<Result<CompRes>> },
464    Compile { text: ArcStr, res: oneshot::Sender<Result<CompRes>> },
465    Subscribe { ch: mpsc::Sender<RtEvent> },
466}
467
468// setting expectations is one of the most under rated skills in
469// software engineering
470struct BS {
471    ctx: ExecCtx<BSCtx, NoUserEvent>,
472    event: Event<NoUserEvent>,
473    updated: FxHashMap<ExprId, bool>,
474    nodes: IndexMap<ExprId, Node<BSCtx, NoUserEvent>, FxBuildHasher>,
475    subs: Vec<mpsc::Sender<RtEvent>>,
476    resolvers: Vec<ModuleResolver>,
477    publish_timeout: Option<Duration>,
478    last_rpc_gc: Instant,
479}
480
481impl BS {
482    fn new(mut rt: BSConfig) -> Self {
483        let resolvers_default = || match dirs::data_dir() {
484            None => vec![ModuleResolver::Files("".into())],
485            Some(dd) => vec![
486                ModuleResolver::Files("".into()),
487                ModuleResolver::Files(dd.join("bscript")),
488            ],
489        };
490        let resolvers = match std::env::var("BSCRIPT_MODPATH") {
491            Err(_) => resolvers_default(),
492            Ok(mp) => match ModuleResolver::parse_env(
493                rt.subscriber.clone(),
494                rt.subscribe_timeout,
495                &mp,
496            ) {
497                Ok(r) => r,
498                Err(e) => {
499                    error!("failed to parse BSCRIPT_MODPATH, using default {e:?}");
500                    resolvers_default()
501                }
502            },
503        };
504        let mut event = Event::new(NoUserEvent);
505        let mut ctx = match rt.ctx.take() {
506            Some(ctx) => ctx,
507            None => ExecCtx::new(BSCtx::new(rt.publisher, rt.subscriber)),
508        };
509        event.init = true;
510        let mut std = mem::take(&mut ctx.std);
511        for n in std.iter_mut() {
512            let _ = n.update(&mut ctx, &mut event);
513        }
514        ctx.std = std;
515        event.init = false;
516        Self {
517            ctx,
518            event,
519            updated: HashMap::default(),
520            nodes: IndexMap::default(),
521            subs: vec![],
522            resolvers,
523            publish_timeout: rt.publish_timeout,
524            last_rpc_gc: Instant::now(),
525        }
526    }
527
528    async fn do_cycle(
529        &mut self,
530        updates: Option<UpdateBatch>,
531        writes: Option<WriteBatch>,
532        tasks: &mut Vec<(BindId, Value)>,
533        rpcs: &mut Vec<(BindId, RpcCall)>,
534    ) {
535        macro_rules! push_event {
536            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
537                match self.event.$event.entry($id) {
538                    Entry::Vacant(e) => {
539                        e.insert($v);
540                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
541                            for id in exps.keys() {
542                                self.updated.entry(*id).or_insert(false);
543                            }
544                        }
545                    }
546                    Entry::Occupied(_) => {
547                        self.ctx.user.$overflow.push_back(($id, $v));
548                    }
549                }
550            };
551        }
552        for _ in 0..self.ctx.user.var_updates.len() {
553            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
554            push_event!(id, v, variables, by_ref, var_updates)
555        }
556        for (id, v) in tasks.drain(..) {
557            push_event!(id, v, variables, by_ref, var_updates)
558        }
559        for _ in 0..self.ctx.user.rpc_overflow.len() {
560            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
561            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
562        }
563        for (id, v) in rpcs.drain(..) {
564            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
565        }
566        for _ in 0..self.ctx.user.net_updates.len() {
567            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
568            push_event!(id, v, netidx, subscribed, net_updates)
569        }
570        if let Some(mut updates) = updates {
571            for (id, v) in updates.drain(..) {
572                push_event!(id, v, netidx, subscribed, net_updates)
573            }
574        }
575        for _ in 0..self.ctx.user.net_writes.len() {
576            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
577            push_event!(id, v, writes, published, net_writes)
578        }
579        if let Some(mut writes) = writes {
580            for wr in writes.drain(..) {
581                let id = wr.id;
582                push_event!(id, wr, writes, published, net_writes)
583            }
584        }
585        for (id, n) in self.nodes.iter_mut() {
586            if let Some(init) = self.updated.get(id) {
587                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
588                self.event.init = *init;
589                if self.event.init {
590                    n.refs(&mut |id| {
591                        if let Some(v) = self.ctx.cached.get(&id) {
592                            if let Entry::Vacant(e) = self.event.variables.entry(id) {
593                                e.insert(v.clone());
594                                clear.push(id);
595                            }
596                        }
597                    });
598                }
599                let res = n.update(&mut self.ctx, &mut self.event);
600                for id in clear {
601                    self.event.variables.remove(&id);
602                }
603                if let Some(v) = res {
604                    let mut i = 0;
605                    while i < self.subs.len() {
606                        if let Err(_) =
607                            self.subs[i].send(RtEvent::Updated(*id, v.clone())).await
608                        {
609                            self.subs.remove(i);
610                        }
611                        i += 1;
612                    }
613                }
614            }
615        }
616        self.event.clear();
617        self.updated.clear();
618        if self.ctx.user.batch.len() > 0 {
619            let batch = mem::replace(
620                &mut self.ctx.user.batch,
621                self.ctx.user.publisher.start_batch(),
622            );
623            let timeout = self.publish_timeout;
624            task::spawn(async move { batch.commit(timeout).await });
625        }
626    }
627
628    fn cycle_ready(&self) -> bool {
629        self.ctx.user.var_updates.len() > 0
630            || self.ctx.user.net_updates.len() > 0
631            || self.ctx.user.net_writes.len() > 0
632            || self.ctx.user.rpc_overflow.len() > 0
633    }
634
635    async fn compile(&mut self, text: ArcStr) -> Result<CompRes> {
636        let scope = ModPath::root();
637        let ori = expr::parser::parse(None, text.clone())?;
638        let exprs = join_all(
639            ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
640        )
641        .await
642        .into_iter()
643        .collect::<Result<SmallVec<[_; 4]>>>()
644        .context(CouldNotResolve)?;
645        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
646        let nodes = ori
647            .exprs
648            .iter()
649            .map(|e| Node::compile(&mut self.ctx, &scope, e.clone()))
650            .collect::<Result<SmallVec<[_; 4]>>>()
651            .with_context(|| ori.clone())?;
652        let exprs = ori
653            .exprs
654            .iter()
655            .zip(nodes.into_iter())
656            .map(|(e, n)| {
657                let output = is_output(&n);
658                let typ = n.typ.clone();
659                self.updated.insert(e.id, true);
660                self.nodes.insert(e.id, n);
661                CompExp { id: e.id, output, typ }
662            })
663            .collect::<SmallVec<[_; 1]>>();
664        Ok(CompRes { exprs, env: self.ctx.env.clone() })
665    }
666
667    async fn load(&mut self, file: &PathBuf) -> Result<CompRes> {
668        let scope = ModPath::root();
669        let (scope, ori) = match file.extension() {
670            Some(e) if e.as_bytes() == b"bs" => {
671                let scope = match file.file_name() {
672                    None => scope,
673                    Some(name) => ModPath(scope.append(&*name.to_string_lossy())),
674                };
675                let s = fs::read_to_string(file).await?;
676                let s = if s.starts_with("#!") {
677                    if let Some(i) = s.find('\n') {
678                        &s[i..]
679                    } else {
680                        s.as_str()
681                    }
682                } else {
683                    s.as_str()
684                };
685                let s = ArcStr::from(s);
686                let name = ArcStr::from(file.to_string_lossy());
687                (scope, expr::parser::parse(Some(name), s)?)
688            }
689            Some(e) => bail!("invalid file extension {e:?}"),
690            None => {
691                let name = file
692                    .components()
693                    .map(|c| match c {
694                        Component::RootDir
695                        | Component::CurDir
696                        | Component::ParentDir
697                        | Component::Prefix(_) => bail!("invalid module name {file:?}"),
698                        Component::Normal(s) => Ok(s),
699                    })
700                    .collect::<Result<Box<[_]>>>()?;
701                if name.len() != 1 {
702                    bail!("invalid module name {file:?}")
703                }
704                let name = String::from_utf8_lossy(name[0].as_bytes());
705                let name = name
706                    .parse::<ModPath>()
707                    .with_context(|| "parsing module name {file:?}")?;
708                let scope =
709                    ModPath(Path::from_str(Path::dirname(&*name).unwrap_or(&*scope)));
710                let name = Path::basename(&*name)
711                    .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
712                let name = ArcStr::from(name);
713                let e = ExprKind::Module {
714                    export: true,
715                    name: name.clone(),
716                    value: ModuleKind::Unresolved,
717                }
718                .to_expr(Default::default());
719                let ori = Origin {
720                    name: Some(name),
721                    source: literal!(""),
722                    exprs: Arc::from_iter([e]),
723                };
724                (scope, ori)
725            }
726        };
727        let expr = ExprKind::Module {
728            export: true,
729            name: ori.name.clone().unwrap_or(literal!("")),
730            value: ModuleKind::Inline(ori.exprs.clone()),
731        }
732        .to_expr(SourcePosition::default());
733        let expr = expr
734            .resolve_modules(&scope, &self.resolvers)
735            .await
736            .with_context(|| ori.clone())?;
737        let top_id = expr.id;
738        let n =
739            Node::compile(&mut self.ctx, &scope, expr).with_context(|| ori.clone())?;
740        let has_out = is_output(&n);
741        let typ = n.typ.clone();
742        self.nodes.insert(top_id, n);
743        self.updated.insert(top_id, true);
744        let exprs = smallvec![CompExp { id: top_id, output: has_out, typ }];
745        Ok(CompRes { exprs, env: self.ctx.env.clone() })
746    }
747
748    async fn run(mut self, mut to_rt: mpsc::UnboundedReceiver<ToRt>) -> Result<()> {
749        let mut tasks = vec![];
750        let mut rpcs = vec![];
751        let onemin = Duration::from_secs(60);
752        'main: loop {
753            let now = Instant::now();
754            let ready = self.cycle_ready();
755            let mut updates = None;
756            let mut writes = None;
757            let mut input = None;
758            macro_rules! peek {
759                (updates) => {
760                    if self.ctx.user.net_updates.is_empty() {
761                        if let Ok(Some(up)) = self.ctx.user.updates.try_next() {
762                            updates = Some(up);
763                        }
764                    }
765                };
766                (writes) => {
767                    if self.ctx.user.net_writes.is_empty() {
768                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
769                            writes = Some(wr);
770                        }
771                    }
772                };
773                (tasks) => {
774                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
775                        tasks.push(up);
776                    }
777                };
778                (rpcs) => {
779                    if self.ctx.user.rpc_overflow.is_empty() {
780                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
781                            rpcs.push(up);
782                        }
783                    }
784                };
785                ($($item:tt),+) => {{
786                    $(peek!($item));+
787                }};
788            }
789            select! {
790                rp = maybe_next(self.ctx.user.rpc_overflow.is_empty(), &mut self.ctx.user.rpcs) => {
791                    rpcs.push(rp);
792                    peek!(updates, tasks, writes, rpcs)
793                }
794                wr = maybe_next(self.ctx.user.net_writes.is_empty(), &mut self.ctx.user.writes) => {
795                    writes = Some(wr);
796                    peek!(updates, tasks, rpcs);
797                },
798                up = maybe_next(self.ctx.user.net_updates.is_empty(), &mut self.ctx.user.updates) => {
799                    updates = Some(up);
800                    peek!(writes, tasks, rpcs);
801                },
802                up = self.ctx.user.tasks.join_next() => {
803                    if let Some(Ok(up)) = up {
804                        tasks.push(up);
805                    }
806                    peek!(updates, writes, tasks, rpcs)
807                },
808                n = to_rt.next() => match n {
809                    None => break 'main Ok(()),
810                    Some(i) => {
811                        peek!(updates, writes, tasks, rpcs);
812                        input = Some(i);
813                    }
814                },
815                _ = or_never(ready) => peek!(updates, writes, tasks, rpcs),
816                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
817                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
818                        if ts.elapsed() >= Duration::from_secs(1) {
819                            self.ctx.user.pending_unsubscribe.pop_front();
820                        } else {
821                            break
822                        }
823                    }
824                    continue 'main
825                }
826            }
827            match input {
828                None => (),
829                Some(ToRt::GetEnv { res }) => {
830                    let _ = res.send(self.ctx.env.clone());
831                }
832                Some(ToRt::Compile { text, res }) => {
833                    let _ = res.send(self.compile(text).await);
834                }
835                Some(ToRt::Load { path, res }) => {
836                    let _ = res.send(self.load(&path).await);
837                }
838                Some(ToRt::Subscribe { ch }) => {
839                    self.subs.push(ch);
840                }
841                Some(ToRt::Delete { id, res }) => {
842                    // CR estokes: Check dependencies
843                    if let Some(n) = self.nodes.shift_remove(&id) {
844                        n.delete(&mut self.ctx);
845                    }
846                    let _ = res.send(Ok(self.ctx.env.clone()));
847                }
848            }
849            self.do_cycle(updates, writes, &mut tasks, &mut rpcs).await;
850            if !self.ctx.user.rpc_clients.is_empty() {
851                if now - self.last_rpc_gc >= onemin {
852                    self.last_rpc_gc = now;
853                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
854                }
855            }
856        }
857    }
858}
859
860/// A handle to a running BS instance. Drop the handle to shutdown the
861/// associated background tasks.
862pub struct BSHandle(mpsc::UnboundedSender<ToRt>);
863
864impl BSHandle {
865    async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
866        let (tx, rx) = oneshot::channel();
867        self.0.unbounded_send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
868        Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
869    }
870
871    /// Get a copy of the current bscript environment
872    pub async fn get_env(&self) -> Result<Env<BSCtx, NoUserEvent>> {
873        self.exec(|res| ToRt::GetEnv { res }).await
874    }
875
876    /// Compile and execute the specified bscript expression. It can
877    /// either be a module expression or a single expression. If it
878    /// generates results, they will be sent to all the channels that
879    /// are subscribed.
880    pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
881        Ok(self.exec(|tx| ToRt::Compile { text, res: tx }).await??)
882    }
883
884    /// Load and execute the specified bscript module. The path may
885    /// have one of two forms. If it is the path to a file with
886    /// extension .bs then the rt will load the file directly. If it
887    /// is a modpath (e.g. foo::bar::baz) then the module resolver
888    /// will look for a matching module in the modpath.
889    pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
890        Ok(self.exec(|tx| ToRt::Load { path, res: tx }).await??)
891    }
892
893    /// Delete the specified expression
894    pub async fn delete(&self, id: ExprId) -> Result<Env<BSCtx, NoUserEvent>> {
895        Ok(self.exec(|tx| ToRt::Delete { id, res: tx }).await??)
896    }
897
898    /// The specified channel will receive events generated by all
899    /// bscript expressions being run by the runtime. To unsubscribe,
900    /// just drop the receiver.
901    pub fn subscribe(&self, ch: mpsc::Sender<RtEvent>) -> Result<()> {
902        self.0
903            .unbounded_send(ToRt::Subscribe { ch })
904            .map_err(|_| anyhow!("runtime is dead"))
905    }
906}
907
908#[derive(Builder)]
909pub struct BSConfig {
910    publisher: Publisher,
911    subscriber: Subscriber,
912    #[builder(setter(strip_option), default)]
913    subscribe_timeout: Option<Duration>,
914    #[builder(setter(strip_option), default)]
915    publish_timeout: Option<Duration>,
916    #[builder(setter(skip))]
917    ctx: Option<ExecCtx<BSCtx, NoUserEvent>>,
918}
919
920impl BSConfig {
921    /// Register a builtin bscript function
922    pub fn register_builtin<T: BuiltIn<BSCtx, NoUserEvent>>(&mut self) -> Result<()> {
923        let ctx = match self.ctx.as_mut() {
924            Some(ctx) => ctx,
925            None => {
926                self.ctx = Some(ExecCtx::new(BSCtx::new(
927                    self.publisher.clone(),
928                    self.subscriber.clone(),
929                )));
930                self.ctx.as_mut().unwrap()
931            }
932        };
933        ctx.register_builtin::<T>()
934    }
935
936    /// Start the BS runtime with the specified config, return a
937    /// handle capable of interacting with it.
938    pub fn start(self) -> BSHandle {
939        let (tx, rx) = mpsc::unbounded();
940        task::spawn(async move {
941            let bs = BS::new(self);
942            if let Err(e) = bs.run(rx).await {
943                error!("run loop exited with error {e:?}")
944            }
945        });
946        BSHandle(tx)
947    }
948}