1use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use combine::stream::position::SourcePosition;
16use compact_str::format_compact;
17use core::fmt;
18use derive_builder::Builder;
19use futures::{channel::mpsc, future::try_join_all, FutureExt, StreamExt};
20use fxhash::{FxBuildHasher, FxHashMap};
21use graphix_compiler::{
22 compile,
23 env::Env,
24 expr::{
25 self, Expr, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin, Source,
26 },
27 node::genn,
28 typ::{FnType, Type},
29 BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
30};
31use indexmap::IndexMap;
32use log::{debug, error, info};
33use netidx::{
34 path::Path,
35 pool::{Pool, Pooled},
36 protocol::valarray::ValArray,
37 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
38 resolver_client::ChangeTracker,
39 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
40};
41use netidx_core::atomic_id;
42use netidx_protocols::rpc::{
43 self,
44 server::{ArgSpec, RpcCall},
45};
46use serde_derive::{Deserialize, Serialize};
47use smallvec::{smallvec, SmallVec};
48use std::{
49 collections::{hash_map::Entry, HashMap, VecDeque},
50 future, mem,
51 panic::{catch_unwind, AssertUnwindSafe},
52 path::{Component, PathBuf},
53 result,
54 sync::{LazyLock, Weak},
55 time::Duration,
56};
57use tokio::{
58 fs, select,
59 sync::{
60 mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
61 oneshot, Mutex,
62 },
63 task::{self, JoinError, JoinSet},
64 time::{self, Instant},
65};
66use triomphe::Arc;
67
/// A pooled batch of subscription events, each tagged with the `SubId` it belongs to.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
/// A pooled batch of write requests delivered on the channel passed to the publisher.
type WriteBatch = Pooled<Vec<WriteRequest>>;
70
/// Marker attached as error context when resolving modules fails.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed message `could not resolve module`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("could not resolve module")
    }
}
79
/// A cached RPC client procedure together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// Handle used to invoke the remote procedure.
    proc: rpc::client::Proc,
    /// Refreshed by `call_rpc`; the run loop evicts clients idle for more than a minute.
    last_used: Instant,
}
85
/// Runtime state backing the `Ctx` implementation: reference tracking, queued events,
/// netidx publisher/subscriber handles and the channels feeding the run loop.
#[derive(Debug)]
pub struct GXCtx {
    /// For each variable, the expressions referencing it and a reference count per expression.
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For each subscription, the expressions using it and a reference count per expression.
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For each published value, the expressions using it and a reference count per expression.
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable updates waiting to be delivered in a cycle (from `set_var` or deferred from a
    /// cycle that already had a value for the same variable).
    var_updates: VecDeque<(BindId, Value)>,
    /// Subscription events deferred because the cycle already had an event for that `SubId`.
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Write requests deferred because the cycle already had a write for that `Id`.
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// RPC calls deferred because the cycle already had a call for that `BindId`.
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// RPC client procedures cached by path.
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// RPC server procedures currently published, keyed by path.
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Unsubscribed `Dval`s with the time they were queued; the run loop drops entries once
    /// they are at least one second old.
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Per-binding change trackers used by `list` and `list_table`.
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks; each resolves to a variable update `(BindId, Value)`.
    tasks: JoinSet<(BindId, Value)>,
    /// Publisher batch accumulating `update` calls until the end of the cycle.
    batch: publisher::UpdateBatch,
    publisher: Publisher,
    subscriber: Subscriber,
    /// Sender handed to the subscriber for subscription updates; `updates` is its receiver.
    updates_tx: mpsc::Sender<UpdateBatch>,
    updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to the publisher for write requests; `writes` is its receiver.
    writes_tx: mpsc::Sender<WriteBatch>,
    writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to published RPC procedures; `rpcs` is its receiver.
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
110
111impl GXCtx {
112 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
113 let (updates_tx, updates) = mpsc::channel(100);
114 let (writes_tx, writes) = mpsc::channel(100);
115 let (rpcs_tx, rpcs) = mpsc::channel(100);
116 let batch = publisher.start_batch();
117 let mut tasks = JoinSet::new();
118 tasks.spawn(async { future::pending().await });
119 Self {
120 by_ref: HashMap::default(),
121 var_updates: VecDeque::new(),
122 net_updates: VecDeque::new(),
123 net_writes: VecDeque::new(),
124 rpc_overflow: VecDeque::new(),
125 rpc_clients: HashMap::default(),
126 subscribed: HashMap::default(),
127 pending_unsubscribe: VecDeque::new(),
128 published: HashMap::default(),
129 change_trackers: HashMap::default(),
130 published_rpcs: HashMap::default(),
131 tasks,
132 batch,
133 publisher,
134 subscriber,
135 updates,
136 updates_tx,
137 writes,
138 writes_tx,
139 rpcs_tx,
140 rpcs,
141 }
142 }
143}
144
// Evaluates to the `Ok` value of `$e`. On `Err` it returns `($bindid, Value::Error(..))`
// from the enclosing function or async block, where the error text is the `Debug`
// rendering of the error.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(v) => v,
            Err(e) => {
                let e = ArcStr::from(format_compact!("{e:?}").as_str());
                let e = Value::Error(e);
                return ($bindid, e);
            }
        }
    };
}
157
// Locks the change tracker `$ct`, replaces it with a fresh tracker when it is tracking a
// path other than `$path`, then asks `$resolver` whether anything changed. If the check
// reports no change the enclosing async block returns `($id, Value::Null)`; a resolver
// error returns through `or_err!`. Expands to statements, so the lock guard stays alive
// until the end of the enclosing block. Must be used in an async context.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut ct = $ct.lock().await;
        if ct.path() != &$path {
            *ct = ChangeTracker::new($path.clone());
        }
        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
            return ($id, Value::Null);
        }
    };
}
169
170impl Ctx for GXCtx {
171 fn clear(&mut self) {
172 let Self {
173 by_ref,
174 var_updates,
175 net_updates,
176 net_writes,
177 rpc_clients,
178 rpc_overflow,
179 subscribed,
180 published,
181 published_rpcs,
182 pending_unsubscribe,
183 change_trackers,
184 tasks,
185 batch,
186 publisher,
187 subscriber: _,
188 updates_tx,
189 updates,
190 writes_tx,
191 writes,
192 rpcs,
193 rpcs_tx,
194 } = self;
195 by_ref.clear();
196 var_updates.clear();
197 net_updates.clear();
198 net_writes.clear();
199 rpc_overflow.clear();
200 rpc_clients.clear();
201 subscribed.clear();
202 published.clear();
203 published_rpcs.clear();
204 pending_unsubscribe.clear();
205 change_trackers.clear();
206 *tasks = JoinSet::new();
207 tasks.spawn(async { future::pending().await });
208 *batch = publisher.start_batch();
209 let (tx, rx) = mpsc::channel(3);
210 *updates_tx = tx;
211 *updates = rx;
212 let (tx, rx) = mpsc::channel(100);
213 *writes_tx = tx;
214 *writes = rx;
215 let (tx, rx) = mpsc::channel(100);
216 *rpcs_tx = tx;
217 *rpcs = rx
218 }
219
220 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
221 let now = Instant::now();
222 let proc = match self.rpc_clients.entry(name) {
223 Entry::Occupied(mut e) => {
224 let cl = e.get_mut();
225 cl.last_used = now;
226 Ok(cl.proc.clone())
227 }
228 Entry::Vacant(e) => {
229 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
230 Err(e) => Err(e),
231 Ok(proc) => {
232 let cl = RpcClient { last_used: now, proc: proc.clone() };
233 e.insert(cl);
234 Ok(proc)
235 }
236 }
237 }
238 };
239 self.tasks.spawn(async move {
240 macro_rules! err {
241 ($e:expr) => {{
242 let e = format_compact!("{:?}", $e);
243 (id, Value::Error(e.as_str().into()))
244 }};
245 }
246 match proc {
247 Err(e) => err!(e),
248 Ok(proc) => match proc.call(args).await {
249 Err(e) => err!(e),
250 Ok(res) => (id, res),
251 },
252 }
253 });
254 }
255
256 fn publish_rpc(
257 &mut self,
258 name: Path,
259 doc: Value,
260 spec: Vec<ArgSpec>,
261 id: BindId,
262 ) -> Result<()> {
263 use rpc::server::Proc;
264 let e = match self.published_rpcs.entry(name) {
265 Entry::Vacant(e) => e,
266 Entry::Occupied(_) => bail!("already published"),
267 };
268 let proc = Proc::new(
269 &self.publisher,
270 e.key().clone(),
271 doc,
272 spec,
273 move |c| Some((id, c)),
274 Some(self.rpcs_tx.clone()),
275 )?;
276 e.insert(proc);
277 Ok(())
278 }
279
280 fn unpublish_rpc(&mut self, name: Path) {
281 self.published_rpcs.remove(&name);
282 }
283
284 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
285 let dval =
286 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
287 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
288 dval
289 }
290
291 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
292 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
293 if let Some(cn) = exprs.get_mut(&ref_by) {
294 *cn -= 1;
295 if *cn == 0 {
296 exprs.remove(&ref_by);
297 }
298 }
299 if exprs.is_empty() {
300 self.subscribed.remove(&dv.id());
301 }
302 }
303 self.pending_unsubscribe.push_back((Instant::now(), dv));
304 }
305
306 fn list(&mut self, id: BindId, path: Path) {
307 let ct = self
308 .change_trackers
309 .entry(id)
310 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
311 let ct = Arc::clone(ct);
312 let resolver = self.subscriber.resolver();
313 self.tasks.spawn(async move {
314 check_changed!(id, resolver, path, ct);
315 let mut paths = or_err!(id, resolver.list(path).await);
316 let paths = paths.drain(..).map(|p| Value::String(p.into()));
317 (id, Value::Array(ValArray::from_iter_exact(paths)))
318 });
319 }
320
321 fn list_table(&mut self, id: BindId, path: Path) {
322 let ct = self
323 .change_trackers
324 .entry(id)
325 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
326 let ct = Arc::clone(ct);
327 let resolver = self.subscriber.resolver();
328 self.tasks.spawn(async move {
329 check_changed!(id, resolver, path, ct);
330 let mut tbl = or_err!(id, resolver.table(path).await);
331 let cols = tbl.cols.drain(..).map(|(name, count)| {
332 Value::Array(ValArray::from([
333 Value::String(name.into()),
334 Value::V64(count.0),
335 ]))
336 });
337 let cols = Value::Array(ValArray::from_iter_exact(cols));
338 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
339 let rows = Value::Array(ValArray::from_iter_exact(rows));
340 let tbl = Value::Array(ValArray::from([
341 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
342 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
343 ]));
344 (id, tbl)
345 });
346 }
347
348 fn stop_list(&mut self, id: BindId) {
349 self.change_trackers.remove(&id);
350 }
351
352 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
353 let val = self.publisher.publish_with_flags_and_writes(
354 PublishFlags::empty(),
355 path,
356 value,
357 Some(self.writes_tx.clone()),
358 )?;
359 let id = val.id();
360 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
361 Ok(val)
362 }
363
364 fn update(&mut self, val: &Val, value: Value) {
365 val.update(&mut self.batch, value);
366 }
367
368 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
369 if let Some(refs) = self.published.get_mut(&val.id()) {
370 if let Some(cn) = refs.get_mut(&ref_by) {
371 *cn -= 1;
372 if *cn == 0 {
373 refs.remove(&ref_by);
374 }
375 }
376 if refs.is_empty() {
377 self.published.remove(&val.id());
378 }
379 }
380 }
381
382 fn set_timer(&mut self, id: BindId, timeout: Duration) {
383 self.tasks
384 .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
385 }
386
387 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
388 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
389 }
390
391 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
392 if let Some(refs) = self.by_ref.get_mut(&id) {
393 if let Some(cn) = refs.get_mut(&ref_by) {
394 *cn -= 1;
395 if *cn == 0 {
396 refs.remove(&ref_by);
397 }
398 }
399 if refs.is_empty() {
400 self.by_ref.remove(&id);
401 }
402 }
403 }
404
405 fn set_var(&mut self, id: BindId, value: Value) {
406 self.var_updates.push_back((id, value.clone()));
407 }
408}
409
410fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
411 match &n.spec().kind {
412 ExprKind::Bind { .. }
413 | ExprKind::Lambda { .. }
414 | ExprKind::Use { .. }
415 | ExprKind::Connect { .. }
416 | ExprKind::Module { .. }
417 | ExprKind::TypeDef { .. } => false,
418 _ => true,
419 }
420}
421
/// Completes immediately when `b` is true and never completes otherwise.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
427
428async fn join_or_wait(
429 js: &mut JoinSet<(BindId, Value)>,
430) -> result::Result<(BindId, Value), JoinError> {
431 match js.join_next().await {
432 None => future::pending().await,
433 Some(r) => r,
434 }
435}
436
437async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
438 if go {
439 match ch.next().await {
440 None => future::pending().await,
441 Some(v) => v,
442 }
443 } else {
444 future::pending().await
445 }
446}
447
448async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
449 if pending.len() == 0 {
450 future::pending().await
451 } else {
452 let (ts, _) = pending.front().unwrap();
453 let one = Duration::from_secs(1);
454 let elapsed = now - *ts;
455 if elapsed < one {
456 time::sleep(one - elapsed).await
457 }
458 }
459}
460
461pub struct CompExp {
462 pub id: ExprId,
463 pub typ: Type,
464 pub output: bool,
465 rt: GXHandle,
466}
467
468impl Drop for CompExp {
469 fn drop(&mut self) {
470 let _ = self.rt.0.send(ToRt::Delete { id: self.id });
471 }
472}
473
/// Result of compiling or loading source text.
pub struct CompRes {
    /// One entry per compiled top-level expression.
    pub exprs: SmallVec<[CompExp; 1]>,
    /// Clone of the runtime environment taken after compilation.
    pub env: Env<GXCtx, NoUserEvent>,
}
478
/// Events the runtime sends to its subscriber in batches.
#[derive(Clone)]
pub enum RtEvent {
    /// The expression with this id produced a new value during a cycle.
    Updated(ExprId, Value),
    /// A clone of the environment, pushed after an expression is deleted.
    Env(Env<GXCtx, NoUserEvent>),
}
484
// Lazily created pool from which the run loop takes its event batch vectors.
// NOTE(review): the two `Pool::new` arguments are presumably pool size limits — confirm
// against the netidx `Pool` documentation.
static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
486
487pub struct Ref {
488 pub id: ExprId,
489 pub last: Option<Value>,
490 pub bid: BindId,
491 pub target_bid: Option<BindId>,
492 rt: GXHandle,
493}
494
495impl Drop for Ref {
496 fn drop(&mut self) {
497 let _ = self.rt.0.send(ToRt::Delete { id: self.id });
498 }
499}
500
501impl Ref {
502 pub fn set(&self, v: Value) -> Result<()> {
503 self.rt.set(self.bid, v)
504 }
505
506 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
507 if let Some(id) = self.target_bid {
508 self.rt.set(id, v)?
509 }
510 Ok(())
511 }
512}
513
514atomic_id!(CallableId);
515
516pub struct Callable {
517 rt: GXHandle,
518 id: CallableId,
519 env: Env<GXCtx, NoUserEvent>,
520 pub typ: FnType,
521 pub expr: ExprId,
522}
523
524impl Drop for Callable {
525 fn drop(&mut self) {
526 let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
527 }
528}
529
530impl Callable {
531 pub async fn call(&self, args: ValArray) -> Result<()> {
534 if self.typ.args.len() != args.len() {
535 bail!("expected {} args", self.typ.args.len())
536 }
537 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
538 if !a.typ.is_a(&self.env, v) {
539 bail!("type mismatch arg {i} expected {}", a.typ)
540 }
541 }
542 self.call_unchecked(args).await
543 }
544
545 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
549 self.rt
550 .0
551 .send(ToRt::Call { id: self.id, args })
552 .map_err(|_| anyhow!("runtime is dead"))
553 }
554}
555
556enum ToRt {
557 GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
558 Delete { id: ExprId },
559 Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
560 Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
561 CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
562 CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
563 Set { id: BindId, v: Value },
564 Call { id: CallableId, args: ValArray },
565 DeleteCallable { id: CallableId },
566}
567
568struct CallableInt {
569 expr: ExprId,
570 args: Box<[BindId]>,
571}
572
573struct GX {
574 ctx: ExecCtx<GXCtx, NoUserEvent>,
575 event: Event<NoUserEvent>,
576 updated: FxHashMap<ExprId, bool>,
577 nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
578 callables: FxHashMap<CallableId, CallableInt>,
579 sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
580 resolvers: Arc<[ModuleResolver]>,
581 publish_timeout: Option<Duration>,
582 last_rpc_gc: Instant,
583}
584
585impl GX {
586 async fn new(mut rt: GXConfig) -> Result<Self> {
587 let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
588 None => r.push(ModuleResolver::Files("".into())),
589 Some(dd) => {
590 r.push(ModuleResolver::Files("".into()));
591 r.push(ModuleResolver::Files(dd.join("graphix")));
592 }
593 };
594 match std::env::var("GRAPHIX_MODPATH") {
595 Err(_) => resolvers_default(&mut rt.resolvers),
596 Ok(mp) => match ModuleResolver::parse_env(
597 rt.ctx.user.subscriber.clone(),
598 rt.resolve_timeout,
599 &mp,
600 ) {
601 Ok(r) => rt.resolvers.extend(r),
602 Err(e) => {
603 error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
604 resolvers_default(&mut rt.resolvers)
605 }
606 },
607 };
608 let mut t = Self {
609 ctx: rt.ctx,
610 event: Event::new(NoUserEvent),
611 updated: HashMap::default(),
612 nodes: IndexMap::default(),
613 callables: HashMap::default(),
614 sub: rt.sub,
615 resolvers: Arc::from(rt.resolvers),
616 publish_timeout: rt.publish_timeout,
617 last_rpc_gc: Instant::now(),
618 };
619 let st = Instant::now();
620 if let Some(root) = rt.root {
621 t.compile_root(root).await?;
622 }
623 info!("root init time: {:?}", st.elapsed());
624 Ok(t)
625 }
626
    /// Runs one evaluation cycle.
    ///
    /// 1. Gathers events from the overflow queues and from the inputs into `self.event`,
    ///    at most one per id per category; extra events for an id already present are
    ///    pushed onto the matching overflow queue for a later cycle.
    /// 2. Updates every node listed in `self.updated`, in `self.nodes` order, collecting
    ///    the values they produce into `batch`. Panics in a node update are caught and
    ///    logged.
    /// 3. Sends `batch` to the subscriber, retrying every 100ms; while waiting, incoming
    ///    requests are still drained and processed.
    /// 4. Clears the per-cycle state and, if anything was published, commits the
    ///    publisher batch on a spawned task.
    async fn do_cycle(
        &mut self,
        updates: Option<UpdateBatch>,
        writes: Option<WriteBatch>,
        tasks: &mut Vec<(BindId, Value)>,
        rpcs: &mut Vec<(BindId, RpcCall)>,
        to_rt: &mut UnboundedReceiver<ToRt>,
        input: &mut Vec<ToRt>,
        mut batch: Pooled<Vec<RtEvent>>,
    ) {
        // Inserts `($id, $v)` into `self.event.$event` if that id has no event yet this
        // cycle and marks every expression in `self.ctx.user.$refed[$id]` for update
        // (keeping an existing flag). Otherwise defers the event to `$overflow`.
        macro_rules! push_event {
            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
                match self.event.$event.entry($id) {
                    Entry::Vacant(e) => {
                        e.insert($v);
                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
                            for id in exps.keys() {
                                self.updated.entry(*id).or_insert(false);
                            }
                        }
                    }
                    Entry::Occupied(_) => {
                        self.ctx.user.$overflow.push_back(($id, $v));
                    }
                }
            };
        }
        // The queue loops run for the length observed up front, because `push_event!`
        // may push deferred items back onto the same queue.
        for _ in 0..self.ctx.user.var_updates.len() {
            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for (id, v) in tasks.drain(..) {
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for _ in 0..self.ctx.user.rpc_overflow.len() {
            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for (id, v) in rpcs.drain(..) {
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for _ in 0..self.ctx.user.net_updates.len() {
            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
            push_event!(id, v, netidx, subscribed, net_updates)
        }
        if let Some(mut updates) = updates {
            for (id, v) in updates.drain(..) {
                push_event!(id, v, netidx, subscribed, net_updates)
            }
        }
        for _ in 0..self.ctx.user.net_writes.len() {
            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
            push_event!(id, v, writes, published, net_writes)
        }
        if let Some(mut writes) = writes {
            for wr in writes.drain(..) {
                let id = wr.id;
                push_event!(id, wr, writes, published, net_writes)
            }
        }
        for (id, n) in self.nodes.iter_mut() {
            if let Some(init) = self.updated.get(id) {
                // Variables injected only for this node's init; removed again below.
                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
                self.event.init = *init;
                if self.event.init {
                    // For an initializing node, make the cached values of the variables
                    // it references visible as events, unless an event for that variable
                    // is already present this cycle.
                    REFS.with_borrow_mut(|refs| {
                        refs.clear();
                        n.refs(refs);
                        refs.with_external_refs(|id| {
                            if let Some(v) = self.ctx.cached.get(&id) {
                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
                                    e.insert(v.clone());
                                    clear.push(id);
                                }
                            }
                        });
                    });
                }
                // A panicking node must not take down the whole runtime.
                let res = catch_unwind(AssertUnwindSafe(|| {
                    n.update(&mut self.ctx, &mut self.event)
                }));
                for id in clear {
                    self.event.variables.remove(&id);
                }
                match res {
                    Ok(None) => (),
                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
                    Err(e) => {
                        error!("could not update exprid: {id:?}, panic: {e:?}")
                    }
                }
            }
        }
        loop {
            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
                Ok(()) => break,
                Err(SendTimeoutError::Closed(_)) => {
                    error!("could not send batch");
                    break;
                }
                Err(SendTimeoutError::Timeout(b)) => {
                    // The subscriber is slow: take the batch back, keep servicing
                    // incoming requests, then retry the send.
                    batch = b;
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                    self.process_input_batch(tasks, input, &mut batch).await;
                }
            }
        }
        self.event.clear();
        self.updated.clear();
        if self.ctx.user.batch.len() > 0 {
            // Swap in a fresh publisher batch and commit the filled one in the background.
            let batch = mem::replace(
                &mut self.ctx.user.batch,
                self.ctx.user.publisher.start_batch(),
            );
            let timeout = self.publish_timeout;
            task::spawn(async move { batch.commit(timeout).await });
        }
    }
748
749 async fn process_input_batch(
750 &mut self,
751 tasks: &mut Vec<(BindId, Value)>,
752 input: &mut Vec<ToRt>,
753 batch: &mut Pooled<Vec<RtEvent>>,
754 ) {
755 for m in input.drain(..) {
756 match m {
757 ToRt::GetEnv { res } => {
758 let _ = res.send(self.ctx.env.clone());
759 }
760 ToRt::Compile { text, rt, res } => {
761 let _ = res.send(self.compile(rt, text).await);
762 }
763 ToRt::Load { path, rt, res } => {
764 let _ = res.send(self.load(rt, &path).await);
765 }
766 ToRt::Delete { id } => {
767 if let Some(mut n) = self.nodes.shift_remove(&id) {
768 n.delete(&mut self.ctx);
769 }
770 debug!("delete {id:?}");
771 batch.push(RtEvent::Env(self.ctx.env.clone()));
772 }
773 ToRt::CompileCallable { id, rt, res } => {
774 let _ = res.send(self.compile_callable(id, rt));
775 }
776 ToRt::CompileRef { id, rt, res } => {
777 let _ = res.send(self.compile_ref(rt, id));
778 }
779 ToRt::Set { id, v } => {
780 self.ctx.cached.insert(id, v.clone());
781 tasks.push((id, v))
782 }
783 ToRt::DeleteCallable { id } => self.delete_callable(id),
784 ToRt::Call { id, args } => {
785 if let Err(e) = self.call_callable(id, args, tasks) {
786 error!("calling callable {id:?} failed with {e:?}")
787 }
788 }
789 }
790 }
791 }
792
793 fn cycle_ready(&self) -> bool {
794 self.ctx.user.var_updates.len() > 0
795 || self.ctx.user.net_updates.len() > 0
796 || self.ctx.user.net_writes.len() > 0
797 || self.ctx.user.rpc_overflow.len() > 0
798 }
799
800 async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
801 let scope = ModPath::root();
802 let ori = Origin { parent: None, source: Source::Unspecified, text };
803 let exprs =
804 expr::parser::parse(ori.clone()).context("parsing the root module")?;
805 let exprs =
806 try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
807 .await
808 .context(CouldNotResolve)?;
809 let nodes = exprs
810 .iter()
811 .map(|e| {
812 compile(&mut self.ctx, &scope, e.clone())
813 .with_context(|| format!("compiling root expression {e}"))
814 })
815 .collect::<Result<SmallVec<[_; 4]>>>()
816 .with_context(|| ori.clone())?;
817 for (e, n) in exprs.iter().zip(nodes.into_iter()) {
818 self.updated.insert(e.id, true);
819 self.nodes.insert(e.id, n);
820 }
821 Ok(())
822 }
823
824 async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
825 let scope = ModPath::root();
826 let ori = Origin { parent: None, source: Source::Unspecified, text };
827 let exprs = expr::parser::parse(ori.clone())?;
828 let exprs =
829 try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
830 .await
831 .context(CouldNotResolve)?;
832 let nodes = exprs
833 .iter()
834 .map(|e| compile(&mut self.ctx, &scope, e.clone()))
835 .collect::<Result<SmallVec<[_; 4]>>>()
836 .with_context(|| ori.clone())?;
837 let exprs = exprs
838 .iter()
839 .zip(nodes.into_iter())
840 .map(|(e, n)| {
841 let output = is_output(&n);
842 let typ = n.typ().clone();
843 self.updated.insert(e.id, true);
844 self.nodes.insert(e.id, n);
845 CompExp { id: e.id, output, typ, rt: rt.clone() }
846 })
847 .collect::<SmallVec<[_; 1]>>();
848 Ok(CompRes { exprs, env: self.ctx.env.clone() })
849 }
850
851 async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
852 let scope = ModPath::root();
853 let st = Instant::now();
854 let (ori, exprs) = match file.extension() {
855 Some(e) if e.as_encoded_bytes() == b"gx" => {
856 let file = file.canonicalize()?;
857 let s = fs::read_to_string(&file).await?;
858 let s = if s.starts_with("#!") {
859 if let Some(i) = s.find('\n') {
860 &s[i..]
861 } else {
862 s.as_str()
863 }
864 } else {
865 s.as_str()
866 };
867 let ori = Origin {
868 parent: None,
869 source: Source::File(file),
870 text: ArcStr::from(s),
871 };
872 (ori.clone(), expr::parser::parse(ori)?)
873 }
874 Some(e) => bail!("invalid file extension {e:?}"),
875 None => {
876 let name = file
877 .components()
878 .map(|c| match c {
879 Component::RootDir
880 | Component::CurDir
881 | Component::ParentDir
882 | Component::Prefix(_) => bail!("invalid module name {file:?}"),
883 Component::Normal(s) => Ok(s),
884 })
885 .collect::<Result<Box<[_]>>>()?;
886 if name.len() != 1 {
887 bail!("invalid module name {file:?}")
888 }
889 let name = name[0].to_string_lossy();
890 let name = name
891 .parse::<ModPath>()
892 .with_context(|| "parsing module name {file:?}")?;
893 let name = Path::basename(&*name)
894 .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
895 let name = ArcStr::from(name);
896 let ori = Origin {
897 parent: None,
898 source: Source::Internal(name.clone()),
899 text: literal!(""),
900 };
901 let kind = ExprKind::Module {
902 export: true,
903 name,
904 value: ModuleKind::Unresolved,
905 };
906 let exprs = Arc::from(vec![Expr {
907 id: ExprId::new(),
908 ori: Arc::new(ori.clone()),
909 pos: SourcePosition::default(),
910 kind,
911 }]);
912 (ori, exprs)
913 }
914 };
915 info!("parse time: {:?}", st.elapsed());
916 let st = Instant::now();
917 let exprs =
918 try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
919 .await
920 .context(CouldNotResolve)?;
921 info!("resolve time: {:?}", st.elapsed());
922 let mut res = smallvec![];
923 for e in exprs.iter() {
924 let top_id = e.id;
925 let n =
926 compile(&mut self.ctx, &scope, e.clone()).with_context(|| ori.clone())?;
927 let has_out = is_output(&n);
928 let typ = n.typ().clone();
929 self.nodes.insert(top_id, n);
930 self.updated.insert(top_id, true);
931 res.push(CompExp { id: top_id, output: has_out, typ, rt: rt.clone() })
932 }
933 Ok(CompRes { exprs: res, env: self.ctx.env.clone() })
934 }
935
936 fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
937 let id = match id {
938 Value::U64(id) => LambdaId::from(id),
939 v => bail!("invalid lambda id {v}"),
940 };
941 let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
942 let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
943 let args = lb.typ.args.iter();
944 let args = args
945 .map(|a| {
946 if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
947 bail!("can't call lambda with an optional argument from rust")
948 } else {
949 Ok(BindId::new())
950 }
951 })
952 .collect::<Result<Box<[_]>>>()?;
953 let eid = ExprId::new();
954 let argn = lb.typ.args.iter().zip(args.iter());
955 let argn = argn
956 .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
957 .collect::<Vec<_>>();
958 let fnode = genn::constant(Value::U64(id.inner()));
959 let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
960 self.event.init = true;
961 n.update(&mut self.ctx, &mut self.event);
962 self.event.clear();
963 let cid = CallableId::new();
964 self.callables.insert(cid, CallableInt { expr: eid, args });
965 self.nodes.insert(eid, n);
966 let env = self.ctx.env.clone();
967 Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
968 }
969
970 fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
971 let eid = ExprId::new();
972 let typ = Type::Any;
973 let n = genn::reference(&mut self.ctx, id, typ, eid);
974 self.nodes.insert(eid, n);
975 let target_bid = self.ctx.env.byref_chain.get(&id).copied();
976 Ok(Ref {
977 id: eid,
978 bid: id,
979 target_bid,
980 last: self.ctx.cached.get(&id).cloned(),
981 rt,
982 })
983 }
984
985 fn call_callable(
986 &mut self,
987 id: CallableId,
988 args: ValArray,
989 tasks: &mut Vec<(BindId, Value)>,
990 ) -> Result<()> {
991 let c =
992 self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
993 if args.len() != c.args.len() {
994 bail!("expected {} arguments", c.args.len());
995 }
996 let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
997 tasks.extend(a);
998 Ok(())
999 }
1000
1001 fn delete_callable(&mut self, id: CallableId) {
1002 if let Some(c) = self.callables.remove(&id) {
1003 if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1004 n.delete(&mut self.ctx)
1005 }
1006 }
1007 }
1008
    /// The runtime's main loop.
    ///
    /// Each iteration waits for something to do (an rpc call, a write, a subscription
    /// update, a finished task, leftover queued events, a request from a handle, or an
    /// expired pending unsubscribe), opportunistically drains whatever else is already
    /// available, processes handle requests and then runs one cycle. Returns `Ok(())`
    /// when the request channel is closed and empty, i.e. every handle was dropped.
    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        let mut tasks = vec![];
        let mut input = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            let now = Instant::now();
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            // Non-blocking drain of the named sources. Channels whose overflow queue is
            // not empty are skipped, so deferred events are delivered first.
            macro_rules! peek {
                (updates) => {
                    if self.ctx.user.net_updates.is_empty() {
                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
                            match &mut updates {
                                None => updates = Some(up),
                                Some(prev) => prev.extend(up.drain(..)),
                            }
                        }
                    }
                };
                (writes) => {
                    if self.ctx.user.net_writes.is_empty() {
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                (tasks) => {
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                (rpcs) => {
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                (input) => {
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            select! {
                rp = maybe_next(
                    self.ctx.user.rpc_overflow.is_empty(),
                    &mut self.ctx.user.rpcs
                ) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs, input)
                }
                wr = maybe_next(
                    self.ctx.user.net_writes.is_empty(),
                    &mut self.ctx.user.writes
                ) => {
                    writes = Some(wr);
                    peek!(updates, tasks, rpcs, input);
                },
                up = maybe_next(
                    self.ctx.user.net_updates.is_empty(),
                    &mut self.ctx.user.updates
                ) => {
                    updates = Some(up);
                    peek!(updates, writes, tasks, rpcs, input);
                },
                up = join_or_wait(&mut self.ctx.user.tasks) => {
                    // A task that failed to join is ignored.
                    if let Ok(up) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs, input)
                },
                // Deferred events are waiting: run a cycle without waiting for new input.
                _ = or_never(ready) => {
                    peek!(updates, writes, tasks, rpcs, input)
                },
                n = to_rt.recv_many(&mut input, 100000) => {
                    // Zero messages means the channel is closed and drained.
                    if n == 0 {
                        break 'main Ok(())
                    }
                    peek!(updates, writes, tasks, rpcs);
                },
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // Drop every pending `Dval` that has been queued for at least a
                    // second, then go back to waiting without running a cycle.
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                },
            }
            let mut batch = BATCH.take();
            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
            self.do_cycle(
                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
            )
            .await;
            // At most once a minute, evict rpc clients not used within the last minute.
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
1120}
1121
1122#[derive(Clone)]
1125pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1126
1127impl GXHandle {
1128 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1129 let (tx, rx) = oneshot::channel();
1130 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1131 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1132 }
1133
1134 pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1136 self.exec(|res| ToRt::GetEnv { res }).await
1137 }
1138
1139 pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1145 Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1146 }
1147
1148 pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1156 Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1157 }
1158
1159 pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1163 Ok(self
1164 .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1165 .await??)
1166 }
1167
1168 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1172 Ok(self
1173 .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1174 .await??)
1175 }
1176
1177 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1180 let v = v.into();
1181 self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1182 }
1183}
1184
/// Configuration for starting a runtime; built with the generated owned-pattern builder.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig {
    /// Passed to `ModuleResolver::parse_env` when `GRAPHIX_MODPATH` is set.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// Passed to the commit of each publisher batch.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// Execution context the runtime takes ownership of.
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// Optional root program text, compiled when the runtime starts.
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// Initial module resolvers; environment or default resolvers are appended at start.
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// Channel on which the runtime sends batches of `RtEvent`s.
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
}
1207
1208impl GXConfig {
1209 pub fn builder(
1211 ctx: ExecCtx<GXCtx, NoUserEvent>,
1212 sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1213 ) -> GXConfigBuilder {
1214 GXConfigBuilder::default().ctx(ctx).sub(sub)
1215 }
1216
1217 pub async fn start(self) -> Result<GXHandle> {
1228 let (init_tx, init_rx) = oneshot::channel();
1229 let (tx, rx) = tmpsc::unbounded_channel();
1230 task::spawn(async move {
1231 match GX::new(self).await {
1232 Ok(bs) => {
1233 let _ = init_tx.send(Ok(()));
1234 if let Err(e) = bs.run(rx).await {
1235 error!("run loop exited with error {e:?}")
1236 }
1237 }
1238 Err(e) => {
1239 let _ = init_tx.send(Err(e));
1240 }
1241 };
1242 });
1243 init_rx.await??;
1244 Ok(GXHandle(tx))
1245 }
1246}