1use crate::{
13 compile,
14 env::Env,
15 expr::{self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin},
16 node::genn,
17 typ::{FnType, Type},
18 BindId, BuiltIn, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node,
19};
20use anyhow::{anyhow, bail, Context, Result};
21use arcstr::{literal, ArcStr};
22use chrono::prelude::*;
23use combine::stream::position::SourcePosition;
24use compact_str::format_compact;
25use core::fmt;
26use derive_builder::Builder;
27use futures::{channel::mpsc, future::join_all, FutureExt, SinkExt, StreamExt};
28use fxhash::{FxBuildHasher, FxHashMap};
29use indexmap::IndexMap;
30use log::error;
31use netidx::{
32 path::Path,
33 pool::Pooled,
34 protocol::valarray::ValArray,
35 publisher::{self, Id, PublishFlags, Publisher, Typ, Val, Value, WriteRequest},
36 resolver_client::ChangeTracker,
37 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
38};
39use netidx_protocols::rpc::{
40 self,
41 server::{ArgSpec, RpcCall},
42};
43use smallvec::{smallvec, SmallVec};
44use std::{
45 collections::{hash_map::Entry, HashMap, VecDeque},
46 future, mem,
47 os::unix::ffi::OsStrExt,
48 path::{Component, PathBuf},
49 sync::Weak,
50 time::Duration,
51};
52use tokio::{
53 fs, select,
54 sync::{oneshot, Mutex},
55 task::{self, JoinSet},
56 time::{self, Instant},
57};
58use triomphe::Arc;
59
60type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
61type WriteBatch = Pooled<Vec<WriteRequest>>;
62
/// Marker error attached as context when module resolution fails while
/// compiling source text.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed message `could not resolve module`.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("could not resolve module")
    }
}
71
/// A cached RPC client procedure together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// Handle used to invoke the remote procedure.
    proc: rpc::client::Proc,
    /// Refreshed by `call_rpc`; the run loop periodically drops clients that
    /// have not been used for more than a minute.
    last_used: Instant,
}
77
/// Runtime state backing the `Ctx` implementation: reference tracking,
/// queued events, netidx publisher/subscriber handles and the channels that
/// feed the run loop.
#[derive(Debug)]
pub struct BSCtx {
    /// For each variable, the expressions referencing it and how many times.
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For each subscription, the expressions referencing it and how many times.
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For each published value, the expressions referencing it and how many times.
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable updates waiting to be delivered in a later cycle.
    var_updates: VecDeque<(BindId, Value)>,
    /// Subscription events that could not be delivered in the cycle they arrived in.
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Write requests that could not be delivered in the cycle they arrived in.
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// RPC calls that could not be delivered in the cycle they arrived in.
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// Cache of RPC client procedures keyed by procedure path.
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// RPC procedures currently published by this runtime, keyed by path.
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions handed back by `unsubscribe`, kept alive for about a
    /// second before the run loop drops them.
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Resolver change trackers used by `list` / `list_table`, one per bind id.
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks; each resolves to a variable update.
    tasks: JoinSet<(BindId, Value)>,
    /// Publisher updates accumulated during the current cycle.
    batch: publisher::UpdateBatch,
    publisher: Publisher,
    subscriber: Subscriber,
    /// Sender handed to new subscriptions; batches arrive on `updates`.
    updates_tx: mpsc::Sender<UpdateBatch>,
    updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to published values; write batches arrive on `writes`.
    writes_tx: mpsc::Sender<WriteBatch>,
    writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to published RPC procedures; calls arrive on `rpcs`.
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
102
103impl BSCtx {
104 fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
105 let (updates_tx, updates) = mpsc::channel(100);
106 let (writes_tx, writes) = mpsc::channel(100);
107 let (rpcs_tx, rpcs) = mpsc::channel(100);
108 let batch = publisher.start_batch();
109 let mut tasks = JoinSet::new();
110 tasks.spawn(async { future::pending().await });
111 Self {
112 by_ref: HashMap::default(),
113 var_updates: VecDeque::new(),
114 net_updates: VecDeque::new(),
115 net_writes: VecDeque::new(),
116 rpc_overflow: VecDeque::new(),
117 rpc_clients: HashMap::default(),
118 subscribed: HashMap::default(),
119 pending_unsubscribe: VecDeque::new(),
120 published: HashMap::default(),
121 change_trackers: HashMap::default(),
122 published_rpcs: HashMap::default(),
123 tasks,
124 batch,
125 publisher,
126 subscriber,
127 updates,
128 updates_tx,
129 writes,
130 writes_tx,
131 rpcs_tx,
132 rpcs,
133 }
134 }
135}
136
/// Evaluates `$e` and yields the `Ok` value. On `Err` it returns
/// `($bindid, Value::Error(..))` from the enclosing function or async block,
/// with the error rendered through its `Debug` representation.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Err(err) => {
                let msg = format_compact!("{err:?}");
                return ($bindid, Value::Error(ArcStr::from(msg.as_str())));
            }
            Ok(v) => v,
        }
    };
}
149
/// Locks the change tracker `$ct`, re-creates it if it tracks a path other
/// than `$path`, and asks `$resolver` whether anything changed.
///
/// Returns `($id, Value::Null)` from the enclosing async block when nothing
/// changed, and an error value (via `or_err!`) when the check fails. The lock
/// guard is a local of the expansion site, so it stays held until the
/// enclosing block ends.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
161
162impl Ctx for BSCtx {
163 fn clear(&mut self) {
164 let Self {
165 by_ref,
166 var_updates,
167 net_updates,
168 net_writes,
169 rpc_clients,
170 rpc_overflow,
171 subscribed,
172 published,
173 published_rpcs,
174 pending_unsubscribe,
175 change_trackers,
176 tasks,
177 batch,
178 publisher,
179 subscriber: _,
180 updates_tx,
181 updates,
182 writes_tx,
183 writes,
184 rpcs,
185 rpcs_tx,
186 } = self;
187 by_ref.clear();
188 var_updates.clear();
189 net_updates.clear();
190 net_writes.clear();
191 rpc_overflow.clear();
192 rpc_clients.clear();
193 subscribed.clear();
194 published.clear();
195 published_rpcs.clear();
196 pending_unsubscribe.clear();
197 change_trackers.clear();
198 *tasks = JoinSet::new();
199 tasks.spawn(async { future::pending().await });
200 *batch = publisher.start_batch();
201 let (tx, rx) = mpsc::channel(3);
202 *updates_tx = tx;
203 *updates = rx;
204 let (tx, rx) = mpsc::channel(100);
205 *writes_tx = tx;
206 *writes = rx;
207 let (tx, rx) = mpsc::channel(100);
208 *rpcs_tx = tx;
209 *rpcs = rx
210 }
211
212 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
213 let now = Instant::now();
214 let proc = match self.rpc_clients.entry(name) {
215 Entry::Occupied(mut e) => {
216 let cl = e.get_mut();
217 cl.last_used = now;
218 Ok(cl.proc.clone())
219 }
220 Entry::Vacant(e) => {
221 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
222 Err(e) => Err(e),
223 Ok(proc) => {
224 let cl = RpcClient { last_used: now, proc: proc.clone() };
225 e.insert(cl);
226 Ok(proc)
227 }
228 }
229 }
230 };
231 self.tasks.spawn(async move {
232 macro_rules! err {
233 ($e:expr) => {{
234 let e = format_compact!("{:?}", $e);
235 (id, Value::Error(e.as_str().into()))
236 }};
237 }
238 match proc {
239 Err(e) => err!(e),
240 Ok(proc) => match proc.call(args).await {
241 Err(e) => err!(e),
242 Ok(res) => (id, res),
243 },
244 }
245 });
246 }
247
248 fn publish_rpc(
249 &mut self,
250 name: Path,
251 doc: Value,
252 spec: Vec<ArgSpec>,
253 id: BindId,
254 ) -> Result<()> {
255 use rpc::server::Proc;
256 let e = match self.published_rpcs.entry(name) {
257 Entry::Vacant(e) => e,
258 Entry::Occupied(_) => bail!("already published"),
259 };
260 let proc = Proc::new(
261 &self.publisher,
262 e.key().clone(),
263 doc,
264 spec,
265 move |c| Some((id, c)),
266 Some(self.rpcs_tx.clone()),
267 )?;
268 e.insert(proc);
269 Ok(())
270 }
271
272 fn unpublish_rpc(&mut self, name: Path) {
273 self.published_rpcs.remove(&name);
274 }
275
276 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
277 let dval =
278 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
279 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
280 dval
281 }
282
283 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
284 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
285 if let Some(cn) = exprs.get_mut(&ref_by) {
286 *cn -= 1;
287 if *cn == 0 {
288 exprs.remove(&ref_by);
289 }
290 }
291 if exprs.is_empty() {
292 self.subscribed.remove(&dv.id());
293 }
294 }
295 self.pending_unsubscribe.push_back((Instant::now(), dv));
296 }
297
298 fn list(&mut self, id: BindId, path: Path) {
299 let ct = self
300 .change_trackers
301 .entry(id)
302 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
303 let ct = Arc::clone(ct);
304 let resolver = self.subscriber.resolver();
305 self.tasks.spawn(async move {
306 check_changed!(id, resolver, path, ct);
307 let mut paths = or_err!(id, resolver.list(path).await);
308 let paths = paths.drain(..).map(|p| Value::String(p.into()));
309 (id, Value::Array(ValArray::from_iter_exact(paths)))
310 });
311 }
312
313 fn list_table(&mut self, id: BindId, path: Path) {
314 let ct = self
315 .change_trackers
316 .entry(id)
317 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
318 let ct = Arc::clone(ct);
319 let resolver = self.subscriber.resolver();
320 self.tasks.spawn(async move {
321 check_changed!(id, resolver, path, ct);
322 let mut tbl = or_err!(id, resolver.table(path).await);
323 let cols = tbl.cols.drain(..).map(|(name, count)| {
324 Value::Array(ValArray::from([
325 Value::String(name.into()),
326 Value::V64(count.0),
327 ]))
328 });
329 let cols = Value::Array(ValArray::from_iter_exact(cols));
330 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
331 let rows = Value::Array(ValArray::from_iter_exact(rows));
332 let tbl = Value::Array(ValArray::from([
333 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
334 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
335 ]));
336 (id, tbl)
337 });
338 }
339
340 fn stop_list(&mut self, id: BindId) {
341 self.change_trackers.remove(&id);
342 }
343
344 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
345 let val = self.publisher.publish_with_flags_and_writes(
346 PublishFlags::empty(),
347 path,
348 value,
349 Some(self.writes_tx.clone()),
350 )?;
351 let id = val.id();
352 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
353 Ok(val)
354 }
355
356 fn update(&mut self, val: &Val, value: Value) {
357 val.update(&mut self.batch, value);
358 }
359
360 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
361 if let Some(refs) = self.published.get_mut(&val.id()) {
362 if let Some(cn) = refs.get_mut(&ref_by) {
363 *cn -= 1;
364 if *cn == 0 {
365 refs.remove(&ref_by);
366 }
367 }
368 if refs.is_empty() {
369 self.published.remove(&val.id());
370 }
371 }
372 }
373
374 fn set_timer(&mut self, id: BindId, timeout: Duration) {
375 self.tasks
376 .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
377 }
378
379 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
380 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
381 }
382
383 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
384 if let Some(refs) = self.by_ref.get_mut(&id) {
385 if let Some(cn) = refs.get_mut(&ref_by) {
386 *cn -= 1;
387 if *cn == 0 {
388 refs.remove(&ref_by);
389 }
390 }
391 if refs.is_empty() {
392 self.by_ref.remove(&id);
393 }
394 }
395 }
396
397 fn set_var(&mut self, id: BindId, value: Value) {
398 self.var_updates.push_back((id, value.clone()));
399 }
400}
401
402fn is_output(n: &Node<BSCtx, NoUserEvent>) -> bool {
403 match &n.spec().kind {
404 ExprKind::Bind { .. }
405 | ExprKind::Lambda { .. }
406 | ExprKind::Use { .. }
407 | ExprKind::Connect { .. }
408 | ExprKind::Module { .. }
409 | ExprKind::TypeDef { .. } => false,
410 _ => true,
411 }
412}
413
/// Completes immediately when `b` is true; otherwise never completes.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
419
420async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
421 if go {
422 match ch.next().await {
423 None => future::pending().await,
424 Some(v) => v,
425 }
426 } else {
427 future::pending().await
428 }
429}
430
431async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
432 if pending.len() == 0 {
433 future::pending().await
434 } else {
435 let (ts, _) = pending.front().unwrap();
436 let one = Duration::from_secs(1);
437 let elapsed = now - *ts;
438 if elapsed < one {
439 time::sleep(one - elapsed).await
440 }
441 }
442}
443
/// Summary of one compiled top-level expression.
#[derive(Clone)]
pub struct CompExp {
    /// Id of the expression; runtime events for it carry this id.
    pub id: ExprId,
    /// Type of the compiled node.
    pub typ: Type,
    /// False for binds, lambdas, `use`, connects, modules and type
    /// definitions; true for everything else.
    pub output: bool,
}
450
/// Result of compiling or loading source: the compiled expressions and a
/// copy of the environment taken after compilation.
#[derive(Clone)]
pub struct CompRes {
    pub exprs: SmallVec<[CompExp; 1]>,
    pub env: Env<BSCtx, NoUserEvent>,
}
456
/// Event sent from the runtime to subscribers registered via
/// `BSHandle::subscribe`.
#[derive(Clone)]
pub enum RtEvent {
    /// The expression with the given id produced a new value this cycle.
    Updated(ExprId, Value),
}
461
// Declares the `CallableId` type through the crate's `atomic_id!` macro. This
// file relies on it providing `CallableId::new()` and on the type being
// copyable, debuggable and usable as a hash map key.
atomic_id!(CallableId);
463
/// Handle to a lambda that has been compiled so it can be called from Rust.
/// Dropping it asks the runtime to delete the callable.
pub struct Callable {
    /// Handle used to send call and delete requests to the runtime.
    rt: BSHandle,
    /// Identifies this callable inside the runtime.
    id: CallableId,
    /// Environment snapshot taken at compile time; used to type check arguments.
    env: Env<BSCtx, NoUserEvent>,
    /// Function type of the underlying lambda.
    pub typ: FnType,
    /// Id of the generated call expression.
    pub expr: ExprId,
}
471
472impl Drop for Callable {
473 fn drop(&mut self) {
474 let _ = self.rt.0.unbounded_send(ToRt::DeleteCallable { id: self.id });
475 }
476}
477
478impl Callable {
479 pub async fn call(&self, args: ValArray) -> Result<()> {
482 if self.typ.args.len() != args.len() {
483 bail!("expected {} args", self.typ.args.len())
484 }
485 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
486 if !a.typ.is_a(&self.env, v) {
487 bail!("type mismatch arg {i} expected {}", a.typ)
488 }
489 }
490 self.call_unchecked(args).await
491 }
492
493 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
497 self.rt
498 .0
499 .unbounded_send(ToRt::Call { id: self.id, args })
500 .map_err(|_| anyhow!("runtime is dead"))
501 }
502}
503
/// Requests sent from `BSHandle` / `Callable` to the runtime task. Variants
/// carrying a `res` sender get their answer on that oneshot channel.
enum ToRt {
    /// Reply with a copy of the current environment.
    GetEnv { res: oneshot::Sender<Env<BSCtx, NoUserEvent>> },
    /// Delete the node for `id` and reply with the resulting environment.
    Delete { id: ExprId, res: oneshot::Sender<Result<Env<BSCtx, NoUserEvent>>> },
    /// Load and compile a file or module by path.
    Load { path: PathBuf, res: oneshot::Sender<Result<CompRes>> },
    /// Compile source text.
    Compile { text: ArcStr, res: oneshot::Sender<Result<CompRes>> },
    /// Wrap the lambda identified by `id` in a `Callable`.
    CompileCallable { id: Value, rt: BSHandle, res: oneshot::Sender<Result<Callable>> },
    /// Create a reference node to the bind identified by `id`.
    CompileRef { id: Value, res: oneshot::Sender<Result<ExprId>> },
    /// Invoke a previously compiled callable.
    Call { id: CallableId, args: ValArray },
    /// Remove a previously compiled callable.
    DeleteCallable { id: CallableId },
    /// Register a channel to receive `RtEvent`s.
    Subscribe { ch: mpsc::Sender<RtEvent> },
}
515
/// Runtime-side record of a compiled callable.
struct CallableInt {
    /// Id of the generated call node stored in `BS::nodes`.
    expr: ExprId,
    /// One bind id per argument; a call sets these variables to the argument values.
    args: Box<[BindId]>,
}
520
/// State owned by the runtime task.
struct BS {
    /// Execution context handed to every node update.
    ctx: ExecCtx<BSCtx, NoUserEvent>,
    /// Event passed to node updates; filled at the start of a cycle and
    /// cleared at the end.
    event: Event<NoUserEvent>,
    /// Expressions to update this cycle; the flag is true when the node
    /// should be run with `event.init` set.
    updated: FxHashMap<ExprId, bool>,
    /// Top-level nodes, iterated in insertion order.
    nodes: IndexMap<ExprId, Node<BSCtx, NoUserEvent>, FxBuildHasher>,
    /// Callables created through `compile_callable`.
    callables: FxHashMap<CallableId, CallableInt>,
    /// Channels that receive an `RtEvent` for every value a node produces.
    subs: Vec<mpsc::Sender<RtEvent>>,
    /// Module resolvers used when compiling and loading.
    resolvers: Vec<ModuleResolver>,
    /// Timeout passed to the publisher batch commit.
    publish_timeout: Option<Duration>,
    /// Last time idle RPC clients were garbage collected.
    last_rpc_gc: Instant,
}
534
535impl BS {
536 fn new(mut rt: BSConfig) -> Self {
537 let resolvers_default = || match dirs::data_dir() {
538 None => vec![ModuleResolver::Files("".into())],
539 Some(dd) => vec![
540 ModuleResolver::Files("".into()),
541 ModuleResolver::Files(dd.join("bscript")),
542 ],
543 };
544 let resolvers = match std::env::var("BSCRIPT_MODPATH") {
545 Err(_) => resolvers_default(),
546 Ok(mp) => match ModuleResolver::parse_env(
547 rt.subscriber.clone(),
548 rt.subscribe_timeout,
549 &mp,
550 ) {
551 Ok(r) => r,
552 Err(e) => {
553 error!("failed to parse BSCRIPT_MODPATH, using default {e:?}");
554 resolvers_default()
555 }
556 },
557 };
558 let mut event = Event::new(NoUserEvent);
559 let mut ctx = match rt.ctx.take() {
560 Some(ctx) => ctx,
561 None => ExecCtx::new(BSCtx::new(rt.publisher, rt.subscriber)),
562 };
563 event.init = true;
564 let mut std = mem::take(&mut ctx.std);
565 for n in std.iter_mut() {
566 let _ = n.update(&mut ctx, &mut event);
567 }
568 ctx.std = std;
569 event.init = false;
570 Self {
571 ctx,
572 event,
573 updated: HashMap::default(),
574 nodes: IndexMap::default(),
575 callables: HashMap::default(),
576 subs: vec![],
577 resolvers,
578 publish_timeout: rt.publish_timeout,
579 last_rpc_gc: Instant::now(),
580 }
581 }
582
583 async fn do_cycle(
584 &mut self,
585 updates: Option<UpdateBatch>,
586 writes: Option<WriteBatch>,
587 tasks: &mut Vec<(BindId, Value)>,
588 rpcs: &mut Vec<(BindId, RpcCall)>,
589 ) {
590 macro_rules! push_event {
591 ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
592 match self.event.$event.entry($id) {
593 Entry::Vacant(e) => {
594 e.insert($v);
595 if let Some(exps) = self.ctx.user.$refed.get(&$id) {
596 for id in exps.keys() {
597 self.updated.entry(*id).or_insert(false);
598 }
599 }
600 }
601 Entry::Occupied(_) => {
602 self.ctx.user.$overflow.push_back(($id, $v));
603 }
604 }
605 };
606 }
607 for _ in 0..self.ctx.user.var_updates.len() {
608 let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
609 push_event!(id, v, variables, by_ref, var_updates)
610 }
611 for (id, v) in tasks.drain(..) {
612 push_event!(id, v, variables, by_ref, var_updates)
613 }
614 for _ in 0..self.ctx.user.rpc_overflow.len() {
615 let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
616 push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
617 }
618 for (id, v) in rpcs.drain(..) {
619 push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
620 }
621 for _ in 0..self.ctx.user.net_updates.len() {
622 let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
623 push_event!(id, v, netidx, subscribed, net_updates)
624 }
625 if let Some(mut updates) = updates {
626 for (id, v) in updates.drain(..) {
627 push_event!(id, v, netidx, subscribed, net_updates)
628 }
629 }
630 for _ in 0..self.ctx.user.net_writes.len() {
631 let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
632 push_event!(id, v, writes, published, net_writes)
633 }
634 if let Some(mut writes) = writes {
635 for wr in writes.drain(..) {
636 let id = wr.id;
637 push_event!(id, wr, writes, published, net_writes)
638 }
639 }
640 for (id, n) in self.nodes.iter_mut() {
641 if let Some(init) = self.updated.get(id) {
642 let mut clear: SmallVec<[BindId; 16]> = smallvec![];
643 self.event.init = *init;
644 if self.event.init {
645 n.refs(&mut |id| {
646 if let Some(v) = self.ctx.cached.get(&id) {
647 if let Entry::Vacant(e) = self.event.variables.entry(id) {
648 e.insert(v.clone());
649 clear.push(id);
650 }
651 }
652 });
653 }
654 let res = n.update(&mut self.ctx, &mut self.event);
655 for id in clear {
656 self.event.variables.remove(&id);
657 }
658 if let Some(v) = res {
659 let mut i = 0;
660 while i < self.subs.len() {
661 if let Err(_) =
662 self.subs[i].send(RtEvent::Updated(*id, v.clone())).await
663 {
664 self.subs.remove(i);
665 }
666 i += 1;
667 }
668 }
669 }
670 }
671 self.event.clear();
672 self.updated.clear();
673 if self.ctx.user.batch.len() > 0 {
674 let batch = mem::replace(
675 &mut self.ctx.user.batch,
676 self.ctx.user.publisher.start_batch(),
677 );
678 let timeout = self.publish_timeout;
679 task::spawn(async move { batch.commit(timeout).await });
680 }
681 }
682
683 fn cycle_ready(&self) -> bool {
684 self.ctx.user.var_updates.len() > 0
685 || self.ctx.user.net_updates.len() > 0
686 || self.ctx.user.net_writes.len() > 0
687 || self.ctx.user.rpc_overflow.len() > 0
688 }
689
690 async fn compile(&mut self, text: ArcStr) -> Result<CompRes> {
691 let scope = ModPath::root();
692 let ori = expr::parser::parse(None, text.clone())?;
693 let exprs = join_all(
694 ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
695 )
696 .await
697 .into_iter()
698 .collect::<Result<SmallVec<[_; 4]>>>()
699 .context(CouldNotResolve)?;
700 let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
701 let nodes = ori
702 .exprs
703 .iter()
704 .map(|e| compile(&mut self.ctx, &scope, e.clone()))
705 .collect::<Result<SmallVec<[_; 4]>>>()
706 .with_context(|| ori.clone())?;
707 let exprs = ori
708 .exprs
709 .iter()
710 .zip(nodes.into_iter())
711 .map(|(e, n)| {
712 let output = is_output(&n);
713 let typ = n.typ().clone();
714 self.updated.insert(e.id, true);
715 self.nodes.insert(e.id, n);
716 CompExp { id: e.id, output, typ }
717 })
718 .collect::<SmallVec<[_; 1]>>();
719 Ok(CompRes { exprs, env: self.ctx.env.clone() })
720 }
721
722 async fn load(&mut self, file: &PathBuf) -> Result<CompRes> {
723 let scope = ModPath::root();
724 let (scope, ori) = match file.extension() {
725 Some(e) if e.as_bytes() == b"bs" => {
726 let scope = match file.file_name() {
727 None => scope,
728 Some(name) => ModPath(scope.append(&*name.to_string_lossy())),
729 };
730 let s = fs::read_to_string(file).await?;
731 let s = if s.starts_with("#!") {
732 if let Some(i) = s.find('\n') {
733 &s[i..]
734 } else {
735 s.as_str()
736 }
737 } else {
738 s.as_str()
739 };
740 let s = ArcStr::from(s);
741 let name = ArcStr::from(file.to_string_lossy());
742 (scope, expr::parser::parse(Some(name), s)?)
743 }
744 Some(e) => bail!("invalid file extension {e:?}"),
745 None => {
746 let name = file
747 .components()
748 .map(|c| match c {
749 Component::RootDir
750 | Component::CurDir
751 | Component::ParentDir
752 | Component::Prefix(_) => bail!("invalid module name {file:?}"),
753 Component::Normal(s) => Ok(s),
754 })
755 .collect::<Result<Box<[_]>>>()?;
756 if name.len() != 1 {
757 bail!("invalid module name {file:?}")
758 }
759 let name = String::from_utf8_lossy(name[0].as_bytes());
760 let name = name
761 .parse::<ModPath>()
762 .with_context(|| "parsing module name {file:?}")?;
763 let scope =
764 ModPath(Path::from_str(Path::dirname(&*name).unwrap_or(&*scope)));
765 let name = Path::basename(&*name)
766 .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
767 let name = ArcStr::from(name);
768 let e = ExprKind::Module {
769 export: true,
770 name: name.clone(),
771 value: ModuleKind::Unresolved,
772 }
773 .to_expr(Default::default());
774 let ori = Origin {
775 name: Some(name),
776 source: literal!(""),
777 exprs: Arc::from_iter([e]),
778 };
779 (scope, ori)
780 }
781 };
782 let expr = ExprKind::Module {
783 export: true,
784 name: ori.name.clone().unwrap_or(literal!("")),
785 value: ModuleKind::Inline(ori.exprs.clone()),
786 }
787 .to_expr(SourcePosition::default());
788 let expr = expr
789 .resolve_modules(&scope, &self.resolvers)
790 .await
791 .with_context(|| ori.clone())?;
792 let top_id = expr.id;
793 let n = compile(&mut self.ctx, &scope, expr).with_context(|| ori.clone())?;
794 let has_out = is_output(&n);
795 let typ = n.typ().clone();
796 self.nodes.insert(top_id, n);
797 self.updated.insert(top_id, true);
798 let exprs = smallvec![CompExp { id: top_id, output: has_out, typ }];
799 Ok(CompRes { exprs, env: self.ctx.env.clone() })
800 }
801
802 fn compile_callable(&mut self, id: Value, rt: BSHandle) -> Result<Callable> {
803 let id = match id {
804 Value::U64(id) => LambdaId(id),
805 v => bail!("invalid lambda id {v}"),
806 };
807 let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
808 let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
809 let args = lb.typ.args.iter();
810 let args = args
811 .map(|a| {
812 if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
813 bail!("can't call lambda with an optional argument from rust")
814 } else {
815 Ok(BindId::new())
816 }
817 })
818 .collect::<Result<Box<[_]>>>()?;
819 let eid = ExprId::new();
820 let argn = lb.typ.args.iter().zip(args.iter());
821 let argn = argn
822 .map(|(arg, id)| {
823 genn::reference(&mut self.ctx, *id, arg.typ.clone(), ExprId::new())
824 })
825 .collect::<Vec<_>>();
826 let fnode = genn::constant(Value::U64(id.0));
827 let n = genn::apply(fnode, argn, lb.typ.clone(), eid);
828 let cid = CallableId::new();
829 self.callables.insert(cid, CallableInt { expr: eid, args });
830 self.nodes.insert(eid, n);
831 let env = self.ctx.env.clone();
832 Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
833 }
834
835 fn compile_ref(&mut self, id: Value) -> Result<ExprId> {
836 let id = match id {
837 Value::U64(id) => BindId(id),
838 v => bail!("invalid bind id {v}"),
839 };
840 let eid = ExprId::new();
841 let typ = Type::Primitive(Typ::any());
842 let n = genn::reference(&mut self.ctx, id, typ, eid);
843 self.nodes.insert(eid, n);
844 Ok(eid)
845 }
846
847 fn call_callable(
848 &mut self,
849 id: CallableId,
850 args: ValArray,
851 tasks: &mut Vec<(BindId, Value)>,
852 ) -> Result<()> {
853 let c =
854 self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
855 if args.len() != c.args.len() {
856 bail!("expected {} arguments", c.args.len());
857 }
858 let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
859 tasks.extend(a);
860 Ok(())
861 }
862
863 fn delete_callable(&mut self, id: CallableId) {
864 if let Some(c) = self.callables.remove(&id) {
865 if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
866 n.delete(&mut self.ctx)
867 }
868 }
869 }
870
    /// Main loop of the runtime task.
    ///
    /// Each iteration waits for one source of work (an RPC call, a write
    /// batch, an update batch, a finished background task, a request from a
    /// handle, leftover queued work, or an expired pending unsubscribe),
    /// opportunistically drains whatever else is already available, handles
    /// the request if there was one, and then runs one evaluation cycle.
    ///
    /// Returns `Ok(())` once `to_rt` is closed, i.e. when every handle is gone.
    async fn run(mut self, mut to_rt: mpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        let mut tasks = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            let now = Instant::now();
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            let mut input = None;
            // Non-blocking drain of the named sources. A channel is only read
            // while its overflow queue is empty, so overflowed items are
            // delivered before newer ones.
            macro_rules! peek {
                (updates) => {
                    if self.ctx.user.net_updates.is_empty() {
                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
                            match &mut updates {
                                None => updates = Some(up),
                                Some(prev) => prev.extend(up.drain(..)),
                            }
                        }
                    }
                };
                (writes) => {
                    // Takes at most one write batch per iteration.
                    if self.ctx.user.net_writes.is_empty() {
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                (tasks) => {
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                (rpcs) => {
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            select! {
                rp = maybe_next(self.ctx.user.rpc_overflow.is_empty(), &mut self.ctx.user.rpcs) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs)
                }
                wr = maybe_next(self.ctx.user.net_writes.is_empty(), &mut self.ctx.user.writes) => {
                    writes = Some(wr);
                    // `writes` is already set, so it is not peeked again.
                    peek!(updates, tasks, rpcs);
                },
                up = maybe_next(self.ctx.user.net_updates.is_empty(), &mut self.ctx.user.updates) => {
                    updates = Some(up);
                    peek!(updates, writes, tasks, rpcs);
                },
                up = self.ctx.user.tasks.join_next() => {
                    // Tasks that failed to join are ignored.
                    if let Some(Ok(up)) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs)
                },
                n = to_rt.next() => match n {
                    None => break 'main Ok(()),
                    Some(i) => {
                        peek!(updates, writes, tasks, rpcs);
                        input = Some(i);
                    }
                },
                // Fires immediately when queued work is left over from the
                // previous cycle.
                _ = or_never(ready) => peek!(updates, writes, tasks, rpcs),
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // Drop every pending subscription that is at least a
                    // second old, then restart the loop without running a cycle.
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                }
            }
            // Replies are best effort: a requester that stopped listening is ignored.
            match input {
                None => (),
                Some(ToRt::GetEnv { res }) => {
                    let _ = res.send(self.ctx.env.clone());
                }
                Some(ToRt::Compile { text, res }) => {
                    let _ = res.send(self.compile(text).await);
                }
                Some(ToRt::Load { path, res }) => {
                    let _ = res.send(self.load(&path).await);
                }
                Some(ToRt::Subscribe { ch }) => {
                    self.subs.push(ch);
                }
                Some(ToRt::Delete { id, res }) => {
                    // Replies `Ok` even when `id` is not a known node.
                    if let Some(mut n) = self.nodes.shift_remove(&id) {
                        n.delete(&mut self.ctx);
                    }
                    let _ = res.send(Ok(self.ctx.env.clone()));
                }
                Some(ToRt::CompileCallable { id, rt, res }) => {
                    let _ = res.send(self.compile_callable(id, rt));
                }
                Some(ToRt::CompileRef { id, res }) => {
                    let _ = res.send(self.compile_ref(id));
                }
                Some(ToRt::DeleteCallable { id }) => self.delete_callable(id),
                Some(ToRt::Call { id, args }) => {
                    if let Err(e) = self.call_callable(id, args, &mut tasks) {
                        error!("calling callable {id:?} failed with {e:?}")
                    }
                }
            }
            self.do_cycle(updates, writes, &mut tasks, &mut rpcs).await;
            // At most once a minute, drop RPC clients unused for over a minute.
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
996}
997
/// Cloneable handle to a running runtime task; wraps the sender side of the
/// runtime's request channel. The run loop exits once that channel is closed.
#[derive(Clone)]
pub struct BSHandle(mpsc::UnboundedSender<ToRt>);
1002
1003impl BSHandle {
1004 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1005 let (tx, rx) = oneshot::channel();
1006 self.0.unbounded_send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1007 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1008 }
1009
1010 pub async fn get_env(&self) -> Result<Env<BSCtx, NoUserEvent>> {
1012 self.exec(|res| ToRt::GetEnv { res }).await
1013 }
1014
1015 pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1020 Ok(self.exec(|tx| ToRt::Compile { text, res: tx }).await??)
1021 }
1022
1023 pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1029 Ok(self.exec(|tx| ToRt::Load { path, res: tx }).await??)
1030 }
1031
1032 pub async fn delete(&self, id: ExprId) -> Result<Env<BSCtx, NoUserEvent>> {
1034 Ok(self.exec(|tx| ToRt::Delete { id, res: tx }).await??)
1035 }
1036
1037 pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1040 Ok(self
1041 .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1042 .await??)
1043 }
1044
1045 pub async fn compile_ref(&self, id: Value) -> Result<ExprId> {
1049 Ok(self.exec(|tx| ToRt::CompileRef { id, res: tx }).await??)
1050 }
1051
1052 pub fn subscribe(&self, ch: mpsc::Sender<RtEvent>) -> Result<()> {
1056 self.0
1057 .unbounded_send(ToRt::Subscribe { ch })
1058 .map_err(|_| anyhow!("runtime is dead"))
1059 }
1060}
1061
/// Configuration used to start a runtime; constructed through the builder
/// generated by `derive_builder`.
#[derive(Builder)]
pub struct BSConfig {
    publisher: Publisher,
    subscriber: Subscriber,
    /// Passed to the module resolver parser when `BSCRIPT_MODPATH` is set.
    #[builder(setter(strip_option), default)]
    subscribe_timeout: Option<Duration>,
    /// Timeout used when committing publisher batches.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// Execution context created on demand by `register_builtin`; has no
    /// builder setter.
    #[builder(setter(skip))]
    ctx: Option<ExecCtx<BSCtx, NoUserEvent>>,
}
1073
1074impl BSConfig {
1075 pub fn register_builtin<T: BuiltIn<BSCtx, NoUserEvent>>(&mut self) -> Result<()> {
1077 let ctx = match self.ctx.as_mut() {
1078 Some(ctx) => ctx,
1079 None => {
1080 self.ctx = Some(ExecCtx::new(BSCtx::new(
1081 self.publisher.clone(),
1082 self.subscriber.clone(),
1083 )));
1084 self.ctx.as_mut().unwrap()
1085 }
1086 };
1087 ctx.register_builtin::<T>()
1088 }
1089
1090 pub fn start(self) -> BSHandle {
1093 let (tx, rx) = mpsc::unbounded();
1094 task::spawn(async move {
1095 let bs = BS::new(self);
1096 if let Err(e) = bs.run(rx).await {
1097 error!("run loop exited with error {e:?}")
1098 }
1099 });
1100 BSHandle(tx)
1101 }
1102}