1use crate::{
13 env::Env,
14 expr::{self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin},
15 node::{Node, NodeKind},
16 typ::{NoRefs, Type},
17 BindId, BuiltIn, Ctx, Event, ExecCtx, NoUserEvent,
18};
19use anyhow::{anyhow, bail, Context, Result};
20use arcstr::{literal, ArcStr};
21use chrono::prelude::*;
22use combine::stream::position::SourcePosition;
23use compact_str::format_compact;
24use core::fmt;
25use derive_builder::Builder;
26use futures::{channel::mpsc, future::join_all, FutureExt, SinkExt, StreamExt};
27use fxhash::{FxBuildHasher, FxHashMap};
28use indexmap::IndexMap;
29use log::error;
30use netidx::{
31 path::Path,
32 pool::Pooled,
33 protocol::valarray::ValArray,
34 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
35 resolver_client::ChangeTracker,
36 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
37};
38use netidx_protocols::rpc::{
39 self,
40 server::{ArgSpec, RpcCall},
41};
42use smallvec::{smallvec, SmallVec};
43use std::{
44 collections::{hash_map::Entry, HashMap, VecDeque},
45 future, mem,
46 os::unix::ffi::OsStrExt,
47 path::{Component, PathBuf},
48 time::Duration,
49};
50use tokio::{
51 fs, select,
52 sync::{oneshot, Mutex},
53 task::{self, JoinSet},
54 time::{self, Instant},
55};
56use triomphe::Arc;
57
// A pooled batch of subscription events, each paired with the id of the
// subscription it belongs to.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
// A pooled batch of write requests, as delivered on the channel handed to the
// publisher when a value is published.
type WriteBatch = Pooled<Vec<WriteRequest>>;
60
/// Marker error attached as context when resolving the modules of a compiled
/// expression fails.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed, human readable description of the failure.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("could not resolve module")
    }
}
69
/// A cached rpc client handle together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// The client side procedure handle; cloned for every call.
    proc: rpc::client::Proc,
    /// Refreshed on every call; the run loop evicts clients that have not
    /// been used for more than a minute.
    last_used: Instant,
}
75
/// The runtime side state backing the `Ctx` implementation: reference
/// counts, queued events, rpc bookkeeping, background tasks and the channels
/// that feed the run loop.
#[derive(Debug)]
pub struct BSCtx {
    /// For each variable, the expressions referencing it and how many times.
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For each subscription, the expressions referencing it and how many times.
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For each published value, the expressions referencing it and how many times.
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable updates waiting to be delivered in a cycle.
    var_updates: VecDeque<(BindId, Value)>,
    /// Subscription events that could not be delivered yet because the same
    /// id already had an event in the cycle.
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Write requests that could not be delivered yet because the same id
    /// already had a write in the cycle.
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// Rpc calls that could not be delivered yet because the same id already
    /// had a call in the cycle.
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// Rpc clients cached by procedure path.
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// Rpc procedures currently published, keyed by path.
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions handed back by `unsubscribe`, held together with the
    /// time they were handed back until the run loop drops them.
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Change trackers used by `list` and `list_table`, one per binding.
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks; each one resolves to a variable update.
    tasks: JoinSet<(BindId, Value)>,
    /// The publisher batch updates are queued into; committed after a cycle.
    batch: publisher::UpdateBatch,
    publisher: Publisher,
    subscriber: Subscriber,
    /// Sender handed to the subscriber for every subscription.
    updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receiving side of `updates_tx`, read by the run loop.
    updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to the publisher for every published value.
    writes_tx: mpsc::Sender<WriteBatch>,
    /// Receiving side of `writes_tx`, read by the run loop.
    writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to every published rpc procedure.
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receiving side of `rpcs_tx`, read by the run loop.
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
100
101impl BSCtx {
102 fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
103 let (updates_tx, updates) = mpsc::channel(3);
104 let (writes_tx, writes) = mpsc::channel(100);
105 let (rpcs_tx, rpcs) = mpsc::channel(100);
106 let batch = publisher.start_batch();
107 let mut tasks = JoinSet::new();
108 tasks.spawn(async { future::pending().await });
109 Self {
110 by_ref: HashMap::default(),
111 var_updates: VecDeque::new(),
112 net_updates: VecDeque::new(),
113 net_writes: VecDeque::new(),
114 rpc_overflow: VecDeque::new(),
115 rpc_clients: HashMap::default(),
116 subscribed: HashMap::default(),
117 pending_unsubscribe: VecDeque::new(),
118 published: HashMap::default(),
119 change_trackers: HashMap::default(),
120 published_rpcs: HashMap::default(),
121 tasks,
122 batch,
123 publisher,
124 subscriber,
125 updates,
126 updates_tx,
127 writes,
128 writes_tx,
129 rpcs_tx,
130 rpcs,
131 }
132 }
133}
134
// Unwrap `$e`, or return early from the enclosing function/async block with
// `($bindid, Value::Error(..))`, where the error text is the `Debug`
// rendering of the error.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(v) => v,
            Err(e) => {
                let e = ArcStr::from(format_compact!("{e:?}").as_str());
                let e = Value::Error(e);
                return ($bindid, e);
            }
        }
    };
}
147
// Lock the change tracker `$ct`, replace it with a fresh tracker when it is
// tracking a different path than `$path`, then ask `$resolver` whether
// anything changed. Returns `($id, Value::Null)` from the enclosing
// function/async block when the check reports `false`, and an error value
// (via `or_err!`) when the check itself fails.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut ct = $ct.lock().await;
        if ct.path() != &$path {
            *ct = ChangeTracker::new($path.clone());
        }
        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
            return ($id, Value::Null);
        }
    };
}
159
160impl Ctx for BSCtx {
161 fn clear(&mut self) {
162 let Self {
163 by_ref,
164 var_updates,
165 net_updates,
166 net_writes,
167 rpc_clients,
168 rpc_overflow,
169 subscribed,
170 published,
171 published_rpcs,
172 pending_unsubscribe,
173 change_trackers,
174 tasks,
175 batch,
176 publisher,
177 subscriber: _,
178 updates_tx,
179 updates,
180 writes_tx,
181 writes,
182 rpcs,
183 rpcs_tx,
184 } = self;
185 by_ref.clear();
186 var_updates.clear();
187 net_updates.clear();
188 net_writes.clear();
189 rpc_overflow.clear();
190 rpc_clients.clear();
191 subscribed.clear();
192 published.clear();
193 published_rpcs.clear();
194 pending_unsubscribe.clear();
195 change_trackers.clear();
196 *tasks = JoinSet::new();
197 tasks.spawn(async { future::pending().await });
198 *batch = publisher.start_batch();
199 let (tx, rx) = mpsc::channel(3);
200 *updates_tx = tx;
201 *updates = rx;
202 let (tx, rx) = mpsc::channel(100);
203 *writes_tx = tx;
204 *writes = rx;
205 let (tx, rx) = mpsc::channel(100);
206 *rpcs_tx = tx;
207 *rpcs = rx
208 }
209
210 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
211 let now = Instant::now();
212 let proc = match self.rpc_clients.entry(name) {
213 Entry::Occupied(mut e) => {
214 let cl = e.get_mut();
215 cl.last_used = now;
216 Ok(cl.proc.clone())
217 }
218 Entry::Vacant(e) => {
219 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
220 Err(e) => Err(e),
221 Ok(proc) => {
222 let cl = RpcClient { last_used: now, proc: proc.clone() };
223 e.insert(cl);
224 Ok(proc)
225 }
226 }
227 }
228 };
229 self.tasks.spawn(async move {
230 macro_rules! err {
231 ($e:expr) => {{
232 let e = format_compact!("{:?}", $e);
233 (id, Value::Error(e.as_str().into()))
234 }};
235 }
236 match proc {
237 Err(e) => err!(e),
238 Ok(proc) => match proc.call(args).await {
239 Err(e) => err!(e),
240 Ok(res) => (id, res),
241 },
242 }
243 });
244 }
245
246 fn publish_rpc(
247 &mut self,
248 name: Path,
249 doc: Value,
250 spec: Vec<ArgSpec>,
251 id: BindId,
252 ) -> Result<()> {
253 use rpc::server::Proc;
254 let e = match self.published_rpcs.entry(name) {
255 Entry::Vacant(e) => e,
256 Entry::Occupied(_) => bail!("already published"),
257 };
258 let proc = Proc::new(
259 &self.publisher,
260 e.key().clone(),
261 doc,
262 spec,
263 move |c| Some((id, c)),
264 Some(self.rpcs_tx.clone()),
265 )?;
266 e.insert(proc);
267 Ok(())
268 }
269
270 fn unpublish_rpc(&mut self, name: Path) {
271 self.published_rpcs.remove(&name);
272 }
273
274 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
275 let dval =
276 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
277 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
278 dval
279 }
280
281 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
282 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
283 if let Some(cn) = exprs.get_mut(&ref_by) {
284 *cn -= 1;
285 if *cn == 0 {
286 exprs.remove(&ref_by);
287 }
288 }
289 if exprs.is_empty() {
290 self.subscribed.remove(&dv.id());
291 }
292 }
293 self.pending_unsubscribe.push_back((Instant::now(), dv));
294 }
295
296 fn list(&mut self, id: BindId, path: Path) {
297 let ct = self
298 .change_trackers
299 .entry(id)
300 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
301 let ct = Arc::clone(ct);
302 let resolver = self.subscriber.resolver();
303 self.tasks.spawn(async move {
304 check_changed!(id, resolver, path, ct);
305 let mut paths = or_err!(id, resolver.list(path).await);
306 let paths = paths.drain(..).map(|p| Value::String(p.into()));
307 (id, Value::Array(ValArray::from_iter_exact(paths)))
308 });
309 }
310
311 fn list_table(&mut self, id: BindId, path: Path) {
312 let ct = self
313 .change_trackers
314 .entry(id)
315 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
316 let ct = Arc::clone(ct);
317 let resolver = self.subscriber.resolver();
318 self.tasks.spawn(async move {
319 check_changed!(id, resolver, path, ct);
320 let mut tbl = or_err!(id, resolver.table(path).await);
321 let cols = tbl.cols.drain(..).map(|(name, count)| {
322 Value::Array(ValArray::from([
323 Value::String(name.into()),
324 Value::V64(count.0),
325 ]))
326 });
327 let cols = Value::Array(ValArray::from_iter_exact(cols));
328 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
329 let rows = Value::Array(ValArray::from_iter_exact(rows));
330 let tbl = Value::Array(ValArray::from([
331 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
332 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
333 ]));
334 (id, tbl)
335 });
336 }
337
338 fn stop_list(&mut self, id: BindId) {
339 self.change_trackers.remove(&id);
340 }
341
342 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
343 let val = self.publisher.publish_with_flags_and_writes(
344 PublishFlags::empty(),
345 path,
346 value,
347 Some(self.writes_tx.clone()),
348 )?;
349 let id = val.id();
350 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
351 Ok(val)
352 }
353
354 fn update(&mut self, val: &Val, value: Value) {
355 val.update(&mut self.batch, value);
356 }
357
358 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
359 if let Some(refs) = self.published.get_mut(&val.id()) {
360 if let Some(cn) = refs.get_mut(&ref_by) {
361 *cn -= 1;
362 if *cn == 0 {
363 refs.remove(&ref_by);
364 }
365 }
366 if refs.is_empty() {
367 self.published.remove(&val.id());
368 }
369 }
370 }
371
372 fn set_timer(&mut self, id: BindId, timeout: Duration) {
373 self.tasks
374 .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
375 }
376
377 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
378 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
379 }
380
381 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
382 if let Some(refs) = self.by_ref.get_mut(&id) {
383 if let Some(cn) = refs.get_mut(&ref_by) {
384 *cn -= 1;
385 if *cn == 0 {
386 refs.remove(&ref_by);
387 }
388 }
389 if refs.is_empty() {
390 self.by_ref.remove(&id);
391 }
392 }
393 }
394
395 fn set_var(&mut self, id: BindId, value: Value) {
396 self.var_updates.push_back((id, value.clone()));
397 }
398}
399
400fn is_output(n: &Node<BSCtx, NoUserEvent>) -> bool {
401 match &n.kind {
402 NodeKind::Bind { .. }
403 | NodeKind::Lambda(_)
404 | NodeKind::Use { .. }
405 | NodeKind::Connect(_, _)
406 | NodeKind::Module(_)
407 | NodeKind::TypeDef { .. } => false,
408 _ => true,
409 }
410}
411
/// Completes immediately when `b` is true and never completes otherwise.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
417
418async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
419 if go {
420 match ch.next().await {
421 None => future::pending().await,
422 Some(v) => v,
423 }
424 } else {
425 future::pending().await
426 }
427}
428
429async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
430 if pending.len() == 0 {
431 future::pending().await
432 } else {
433 let (ts, _) = pending.front().unwrap();
434 let one = Duration::from_secs(1);
435 let elapsed = now - *ts;
436 if elapsed < one {
437 time::sleep(one - elapsed).await
438 }
439 }
440}
441
/// Summary of one compiled top level expression.
#[derive(Clone)]
pub struct CompExp {
    /// Id of the expression; also the key the runtime stores its node under.
    pub id: ExprId,
    /// The type of the compiled node.
    pub typ: Type<NoRefs>,
    /// Whether the node counts as an output (see `is_output`).
    pub output: bool,
}
448
/// The result of compiling text or loading a file.
#[derive(Clone)]
pub struct CompRes {
    /// One entry per compiled top level expression.
    pub exprs: SmallVec<[CompExp; 1]>,
    /// A clone of the runtime environment taken after compilation.
    pub env: Env<BSCtx, NoUserEvent>,
}
454
/// Events the runtime sends to subscribers.
#[derive(Clone)]
pub enum RtEvent {
    /// The top level expression with this id produced a value during a cycle.
    Updated(ExprId, Value),
}
459
/// Requests sent from a `BSHandle` to the runtime task. Where present, `res`
/// is the one-shot channel the reply is sent on.
enum ToRt {
    /// Reply with a clone of the current environment.
    GetEnv { res: oneshot::Sender<Env<BSCtx, NoUserEvent>> },
    /// Delete the top level node `id`, then reply with the environment.
    Delete { id: ExprId, res: oneshot::Sender<Result<Env<BSCtx, NoUserEvent>>> },
    /// Load and compile the file or module at `path`.
    Load { path: PathBuf, res: oneshot::Sender<Result<CompRes>> },
    /// Compile `text`.
    Compile { text: ArcStr, res: oneshot::Sender<Result<CompRes>> },
    /// Register `ch` to receive `RtEvent`s.
    Subscribe { ch: mpsc::Sender<RtEvent> },
}
467
/// The runtime state owned by the run loop.
struct BS {
    /// Execution context passed to every node update.
    ctx: ExecCtx<BSCtx, NoUserEvent>,
    /// The event for the current cycle; filled, used and cleared by each cycle.
    event: Event<NoUserEvent>,
    /// Top level expressions to update in the next cycle. The flag is copied
    /// into `event.init` for that node: `true` for freshly compiled or
    /// loaded expressions, `false` for ones triggered by an incoming event.
    updated: FxHashMap<ExprId, bool>,
    /// The compiled top level nodes, keyed by expression id.
    nodes: IndexMap<ExprId, Node<BSCtx, NoUserEvent>, FxBuildHasher>,
    /// Subscribers that receive an `RtEvent` for every value a node produces.
    subs: Vec<mpsc::Sender<RtEvent>>,
    /// Module resolvers used when compiling and loading.
    resolvers: Vec<ModuleResolver>,
    /// Timeout passed to the publisher batch commit after each cycle.
    publish_timeout: Option<Duration>,
    /// The last time idle rpc clients were evicted.
    last_rpc_gc: Instant,
}
480
481impl BS {
482 fn new(mut rt: BSConfig) -> Self {
483 let resolvers_default = || match dirs::data_dir() {
484 None => vec![ModuleResolver::Files("".into())],
485 Some(dd) => vec![
486 ModuleResolver::Files("".into()),
487 ModuleResolver::Files(dd.join("bscript")),
488 ],
489 };
490 let resolvers = match std::env::var("BSCRIPT_MODPATH") {
491 Err(_) => resolvers_default(),
492 Ok(mp) => match ModuleResolver::parse_env(
493 rt.subscriber.clone(),
494 rt.subscribe_timeout,
495 &mp,
496 ) {
497 Ok(r) => r,
498 Err(e) => {
499 error!("failed to parse BSCRIPT_MODPATH, using default {e:?}");
500 resolvers_default()
501 }
502 },
503 };
504 let mut event = Event::new(NoUserEvent);
505 let mut ctx = match rt.ctx.take() {
506 Some(ctx) => ctx,
507 None => ExecCtx::new(BSCtx::new(rt.publisher, rt.subscriber)),
508 };
509 event.init = true;
510 let mut std = mem::take(&mut ctx.std);
511 for n in std.iter_mut() {
512 let _ = n.update(&mut ctx, &mut event);
513 }
514 ctx.std = std;
515 event.init = false;
516 Self {
517 ctx,
518 event,
519 updated: HashMap::default(),
520 nodes: IndexMap::default(),
521 subs: vec![],
522 resolvers,
523 publish_timeout: rt.publish_timeout,
524 last_rpc_gc: Instant::now(),
525 }
526 }
527
528 async fn do_cycle(
529 &mut self,
530 updates: Option<UpdateBatch>,
531 writes: Option<WriteBatch>,
532 tasks: &mut Vec<(BindId, Value)>,
533 rpcs: &mut Vec<(BindId, RpcCall)>,
534 ) {
535 macro_rules! push_event {
536 ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
537 match self.event.$event.entry($id) {
538 Entry::Vacant(e) => {
539 e.insert($v);
540 if let Some(exps) = self.ctx.user.$refed.get(&$id) {
541 for id in exps.keys() {
542 self.updated.entry(*id).or_insert(false);
543 }
544 }
545 }
546 Entry::Occupied(_) => {
547 self.ctx.user.$overflow.push_back(($id, $v));
548 }
549 }
550 };
551 }
552 for _ in 0..self.ctx.user.var_updates.len() {
553 let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
554 push_event!(id, v, variables, by_ref, var_updates)
555 }
556 for (id, v) in tasks.drain(..) {
557 push_event!(id, v, variables, by_ref, var_updates)
558 }
559 for _ in 0..self.ctx.user.rpc_overflow.len() {
560 let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
561 push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
562 }
563 for (id, v) in rpcs.drain(..) {
564 push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
565 }
566 for _ in 0..self.ctx.user.net_updates.len() {
567 let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
568 push_event!(id, v, netidx, subscribed, net_updates)
569 }
570 if let Some(mut updates) = updates {
571 for (id, v) in updates.drain(..) {
572 push_event!(id, v, netidx, subscribed, net_updates)
573 }
574 }
575 for _ in 0..self.ctx.user.net_writes.len() {
576 let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
577 push_event!(id, v, writes, published, net_writes)
578 }
579 if let Some(mut writes) = writes {
580 for wr in writes.drain(..) {
581 let id = wr.id;
582 push_event!(id, wr, writes, published, net_writes)
583 }
584 }
585 for (id, n) in self.nodes.iter_mut() {
586 if let Some(init) = self.updated.get(id) {
587 let mut clear: SmallVec<[BindId; 16]> = smallvec![];
588 self.event.init = *init;
589 if self.event.init {
590 n.refs(&mut |id| {
591 if let Some(v) = self.ctx.cached.get(&id) {
592 if let Entry::Vacant(e) = self.event.variables.entry(id) {
593 e.insert(v.clone());
594 clear.push(id);
595 }
596 }
597 });
598 }
599 let res = n.update(&mut self.ctx, &mut self.event);
600 for id in clear {
601 self.event.variables.remove(&id);
602 }
603 if let Some(v) = res {
604 let mut i = 0;
605 while i < self.subs.len() {
606 if let Err(_) =
607 self.subs[i].send(RtEvent::Updated(*id, v.clone())).await
608 {
609 self.subs.remove(i);
610 }
611 i += 1;
612 }
613 }
614 }
615 }
616 self.event.clear();
617 self.updated.clear();
618 if self.ctx.user.batch.len() > 0 {
619 let batch = mem::replace(
620 &mut self.ctx.user.batch,
621 self.ctx.user.publisher.start_batch(),
622 );
623 let timeout = self.publish_timeout;
624 task::spawn(async move { batch.commit(timeout).await });
625 }
626 }
627
628 fn cycle_ready(&self) -> bool {
629 self.ctx.user.var_updates.len() > 0
630 || self.ctx.user.net_updates.len() > 0
631 || self.ctx.user.net_writes.len() > 0
632 || self.ctx.user.rpc_overflow.len() > 0
633 }
634
635 async fn compile(&mut self, text: ArcStr) -> Result<CompRes> {
636 let scope = ModPath::root();
637 let ori = expr::parser::parse(None, text.clone())?;
638 let exprs = join_all(
639 ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
640 )
641 .await
642 .into_iter()
643 .collect::<Result<SmallVec<[_; 4]>>>()
644 .context(CouldNotResolve)?;
645 let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
646 let nodes = ori
647 .exprs
648 .iter()
649 .map(|e| Node::compile(&mut self.ctx, &scope, e.clone()))
650 .collect::<Result<SmallVec<[_; 4]>>>()
651 .with_context(|| ori.clone())?;
652 let exprs = ori
653 .exprs
654 .iter()
655 .zip(nodes.into_iter())
656 .map(|(e, n)| {
657 let output = is_output(&n);
658 let typ = n.typ.clone();
659 self.updated.insert(e.id, true);
660 self.nodes.insert(e.id, n);
661 CompExp { id: e.id, output, typ }
662 })
663 .collect::<SmallVec<[_; 1]>>();
664 Ok(CompRes { exprs, env: self.ctx.env.clone() })
665 }
666
667 async fn load(&mut self, file: &PathBuf) -> Result<CompRes> {
668 let scope = ModPath::root();
669 let (scope, ori) = match file.extension() {
670 Some(e) if e.as_bytes() == b"bs" => {
671 let scope = match file.file_name() {
672 None => scope,
673 Some(name) => ModPath(scope.append(&*name.to_string_lossy())),
674 };
675 let s = fs::read_to_string(file).await?;
676 let s = if s.starts_with("#!") {
677 if let Some(i) = s.find('\n') {
678 &s[i..]
679 } else {
680 s.as_str()
681 }
682 } else {
683 s.as_str()
684 };
685 let s = ArcStr::from(s);
686 let name = ArcStr::from(file.to_string_lossy());
687 (scope, expr::parser::parse(Some(name), s)?)
688 }
689 Some(e) => bail!("invalid file extension {e:?}"),
690 None => {
691 let name = file
692 .components()
693 .map(|c| match c {
694 Component::RootDir
695 | Component::CurDir
696 | Component::ParentDir
697 | Component::Prefix(_) => bail!("invalid module name {file:?}"),
698 Component::Normal(s) => Ok(s),
699 })
700 .collect::<Result<Box<[_]>>>()?;
701 if name.len() != 1 {
702 bail!("invalid module name {file:?}")
703 }
704 let name = String::from_utf8_lossy(name[0].as_bytes());
705 let name = name
706 .parse::<ModPath>()
707 .with_context(|| "parsing module name {file:?}")?;
708 let scope =
709 ModPath(Path::from_str(Path::dirname(&*name).unwrap_or(&*scope)));
710 let name = Path::basename(&*name)
711 .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
712 let name = ArcStr::from(name);
713 let e = ExprKind::Module {
714 export: true,
715 name: name.clone(),
716 value: ModuleKind::Unresolved,
717 }
718 .to_expr(Default::default());
719 let ori = Origin {
720 name: Some(name),
721 source: literal!(""),
722 exprs: Arc::from_iter([e]),
723 };
724 (scope, ori)
725 }
726 };
727 let expr = ExprKind::Module {
728 export: true,
729 name: ori.name.clone().unwrap_or(literal!("")),
730 value: ModuleKind::Inline(ori.exprs.clone()),
731 }
732 .to_expr(SourcePosition::default());
733 let expr = expr
734 .resolve_modules(&scope, &self.resolvers)
735 .await
736 .with_context(|| ori.clone())?;
737 let top_id = expr.id;
738 let n =
739 Node::compile(&mut self.ctx, &scope, expr).with_context(|| ori.clone())?;
740 let has_out = is_output(&n);
741 let typ = n.typ.clone();
742 self.nodes.insert(top_id, n);
743 self.updated.insert(top_id, true);
744 let exprs = smallvec![CompExp { id: top_id, output: has_out, typ }];
745 Ok(CompRes { exprs, env: self.ctx.env.clone() })
746 }
747
    /// The runtime main loop.
    ///
    /// Each iteration waits for the first of: an rpc call, a write batch, an
    /// update batch, a finished task, a request on `to_rt`, leftover queued
    /// work (`cycle_ready`), or the oldest pending unsubscribe becoming due.
    /// After one source fires, the other sources are polled without waiting
    /// so they can join the same cycle. The request (if any) is handled, one
    /// cycle is run, and idle rpc clients are evicted at most once a minute.
    ///
    /// Returns `Ok(())` when `to_rt` ends.
    async fn run(mut self, mut to_rt: mpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        let mut tasks = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            let now = Instant::now();
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            let mut input = None;
            // Non-blocking poll of the named sources. A channel is only
            // polled while its overflow queue in the context is empty.
            macro_rules! peek {
                (updates) => {
                    if self.ctx.user.net_updates.is_empty() {
                        if let Ok(Some(up)) = self.ctx.user.updates.try_next() {
                            updates = Some(up);
                        }
                    }
                };
                (writes) => {
                    if self.ctx.user.net_writes.is_empty() {
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                (tasks) => {
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                (rpcs) => {
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            select! {
                rp = maybe_next(self.ctx.user.rpc_overflow.is_empty(), &mut self.ctx.user.rpcs) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs)
                }
                wr = maybe_next(self.ctx.user.net_writes.is_empty(), &mut self.ctx.user.writes) => {
                    writes = Some(wr);
                    peek!(updates, tasks, rpcs);
                },
                up = maybe_next(self.ctx.user.net_updates.is_empty(), &mut self.ctx.user.updates) => {
                    updates = Some(up);
                    peek!(writes, tasks, rpcs);
                },
                up = self.ctx.user.tasks.join_next() => {
                    if let Some(Ok(up)) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs)
                },
                n = to_rt.next() => match n {
                    None => break 'main Ok(()),
                    Some(i) => {
                        peek!(updates, writes, tasks, rpcs);
                        input = Some(i);
                    }
                },
                _ = or_never(ready) => peek!(updates, writes, tasks, rpcs),
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // Drop every subscription that has been pending for at
                    // least a second, then wait again without running a cycle.
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                }
            }
            // Replies are best effort: a requester that went away is ignored.
            match input {
                None => (),
                Some(ToRt::GetEnv { res }) => {
                    let _ = res.send(self.ctx.env.clone());
                }
                Some(ToRt::Compile { text, res }) => {
                    let _ = res.send(self.compile(text).await);
                }
                Some(ToRt::Load { path, res }) => {
                    let _ = res.send(self.load(&path).await);
                }
                Some(ToRt::Subscribe { ch }) => {
                    self.subs.push(ch);
                }
                Some(ToRt::Delete { id, res }) => {
                    if let Some(n) = self.nodes.shift_remove(&id) {
                        n.delete(&mut self.ctx);
                    }
                    let _ = res.send(Ok(self.ctx.env.clone()));
                }
            }
            self.do_cycle(updates, writes, &mut tasks, &mut rpcs).await;
            // Evict rpc clients not used for over a minute, checking at most
            // once a minute.
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
858}
859
/// A handle to a running runtime. It wraps the sending side of the request
/// channel read by the run loop.
pub struct BSHandle(mpsc::UnboundedSender<ToRt>);
863
864impl BSHandle {
865 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
866 let (tx, rx) = oneshot::channel();
867 self.0.unbounded_send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
868 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
869 }
870
871 pub async fn get_env(&self) -> Result<Env<BSCtx, NoUserEvent>> {
873 self.exec(|res| ToRt::GetEnv { res }).await
874 }
875
876 pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
881 Ok(self.exec(|tx| ToRt::Compile { text, res: tx }).await??)
882 }
883
884 pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
890 Ok(self.exec(|tx| ToRt::Load { path, res: tx }).await??)
891 }
892
893 pub async fn delete(&self, id: ExprId) -> Result<Env<BSCtx, NoUserEvent>> {
895 Ok(self.exec(|tx| ToRt::Delete { id, res: tx }).await??)
896 }
897
898 pub fn subscribe(&self, ch: mpsc::Sender<RtEvent>) -> Result<()> {
902 self.0
903 .unbounded_send(ToRt::Subscribe { ch })
904 .map_err(|_| anyhow!("runtime is dead"))
905 }
906}
907
/// Configuration used to start a runtime.
#[derive(Builder)]
pub struct BSConfig {
    /// Publisher the runtime's context is built around.
    publisher: Publisher,
    /// Subscriber the runtime's context is built around; also passed to the
    /// module resolver parser.
    subscriber: Subscriber,
    /// Passed to the module resolver parser when `BSCRIPT_MODPATH` is set.
    #[builder(setter(strip_option), default)]
    subscribe_timeout: Option<Duration>,
    /// Timeout for committing publisher batches.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// Execution context; created on the first builtin registration, or when
    /// the runtime state is built if none exists by then.
    #[builder(setter(skip))]
    ctx: Option<ExecCtx<BSCtx, NoUserEvent>>,
}
919
920impl BSConfig {
921 pub fn register_builtin<T: BuiltIn<BSCtx, NoUserEvent>>(&mut self) -> Result<()> {
923 let ctx = match self.ctx.as_mut() {
924 Some(ctx) => ctx,
925 None => {
926 self.ctx = Some(ExecCtx::new(BSCtx::new(
927 self.publisher.clone(),
928 self.subscriber.clone(),
929 )));
930 self.ctx.as_mut().unwrap()
931 }
932 };
933 ctx.register_builtin::<T>()
934 }
935
936 pub fn start(self) -> BSHandle {
939 let (tx, rx) = mpsc::unbounded();
940 task::spawn(async move {
941 let bs = BS::new(self);
942 if let Err(e) = bs.run(rx).await {
943 error!("run loop exited with error {e:?}")
944 }
945 });
946 BSHandle(tx)
947 }
948}