1use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use compact_str::format_compact;
16use core::fmt;
17use derive_builder::Builder;
18use futures::{channel::mpsc, future::join_all, FutureExt, StreamExt};
19use fxhash::{FxBuildHasher, FxHashMap};
20use graphix_compiler::{
21 compile,
22 env::Env,
23 expr::{
24 self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin, SourceOrigin,
25 },
26 node::genn,
27 typ::{FnType, Type},
28 BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
29};
30use indexmap::IndexMap;
31use log::{debug, error, info};
32use netidx::{
33 path::Path,
34 pool::{Pool, Pooled},
35 protocol::valarray::ValArray,
36 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
37 resolver_client::ChangeTracker,
38 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
39};
40use netidx_core::atomic_id;
41use netidx_protocols::rpc::{
42 self,
43 server::{ArgSpec, RpcCall},
44};
45use serde_derive::{Deserialize, Serialize};
46use smallvec::{smallvec, SmallVec};
47use std::{
48 collections::{hash_map::Entry, HashMap, VecDeque},
49 future, mem,
50 os::unix::ffi::OsStrExt,
51 panic::{catch_unwind, AssertUnwindSafe},
52 path::{Component, PathBuf},
53 result,
54 sync::{LazyLock, Weak},
55 time::Duration,
56};
57use tokio::{
58 fs, select,
59 sync::{
60 mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
61 oneshot, Mutex,
62 },
63 task::{self, JoinError, JoinSet},
64 time::{self, Instant},
65};
66use triomphe::Arc;
67
/// A pooled batch of subscription events, each tagged with the id of the
/// subscription it belongs to. This is what the `updates` channel carries.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
/// A pooled batch of write requests. This is what the `writes` channel carries.
type WriteBatch = Pooled<Vec<WriteRequest>>;
70
/// Marker error used as context when resolving the modules of an expression
/// fails.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed, human readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("could not resolve module")
    }
}
79
/// A cached rpc client procedure together with the last time it was used.
#[derive(Debug)]
struct RpcClient {
    /// the client side handle used to call the procedure
    proc: rpc::client::Proc,
    /// refreshed on every call; the run loop drops clients that have not been
    /// used for more than a minute
    last_used: Instant,
}
85
/// The runtime context handed to the compiler/evaluator. It owns the netidx
/// publisher and subscriber, the channels they feed, and the bookkeeping
/// needed to route events to the expressions that are interested in them.
#[derive(Debug)]
pub struct GXCtx {
    /// variable -> expression -> number of references held by that expression
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// subscription -> expression -> number of references held by that expression
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// published value -> expression -> number of references held by that expression
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// variable updates waiting to be delivered in a cycle
    var_updates: VecDeque<(BindId, Value)>,
    /// subscription events that could not be delivered in the cycle they arrived in
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// write requests that could not be delivered in the cycle they arrived in
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// rpc calls that could not be delivered in the cycle they arrived in
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// rpc clients cached by procedure path
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// rpc procedures we currently publish, by path
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// unsubscribed values with the time they were unsubscribed; the run loop
    /// drops them once they are at least a second old
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// change trackers used by `list` and `list_table`, keyed by the requesting variable
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// background tasks; each one resolves to a value for a variable
    tasks: JoinSet<(BindId, Value)>,
    /// publisher updates accumulated during the current cycle
    batch: publisher::UpdateBatch,
    publisher: Publisher,
    subscriber: Subscriber,
    /// sender handed to the subscriber for every subscription
    updates_tx: mpsc::Sender<UpdateBatch>,
    /// receiving end of `updates_tx`
    updates: mpsc::Receiver<UpdateBatch>,
    /// sender handed to the publisher for every published value
    writes_tx: mpsc::Sender<WriteBatch>,
    /// receiving end of `writes_tx`
    writes: mpsc::Receiver<WriteBatch>,
    /// sender handed to every published rpc procedure
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// receiving end of `rpcs_tx`
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
110
111impl GXCtx {
112 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
113 let (updates_tx, updates) = mpsc::channel(100);
114 let (writes_tx, writes) = mpsc::channel(100);
115 let (rpcs_tx, rpcs) = mpsc::channel(100);
116 let batch = publisher.start_batch();
117 let mut tasks = JoinSet::new();
118 tasks.spawn(async { future::pending().await });
119 Self {
120 by_ref: HashMap::default(),
121 var_updates: VecDeque::new(),
122 net_updates: VecDeque::new(),
123 net_writes: VecDeque::new(),
124 rpc_overflow: VecDeque::new(),
125 rpc_clients: HashMap::default(),
126 subscribed: HashMap::default(),
127 pending_unsubscribe: VecDeque::new(),
128 published: HashMap::default(),
129 change_trackers: HashMap::default(),
130 published_rpcs: HashMap::default(),
131 tasks,
132 batch,
133 publisher,
134 subscriber,
135 updates,
136 updates_tx,
137 writes,
138 writes_tx,
139 rpcs_tx,
140 rpcs,
141 }
142 }
143}
144
/// Unwrap `$e`, or return early from the enclosing function/async block with
/// `($bindid, Value::Error(..))` where the error text is the debug formatting
/// of the error.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(v) => v,
            Err(e) => {
                let e = ArcStr::from(format_compact!("{e:?}").as_str());
                let e = Value::Error(e);
                return ($bindid, e);
            }
        }
    };
}
157
/// Ask the resolver whether anything changed under `$path` since the change
/// tracker `$ct` was last checked. Must be expanded in an async context that
/// returns a `(BindId, Value)`.
///
/// The tracker is replaced with a fresh one if it was tracking a different
/// path. If the check fails the enclosing block returns an error value (via
/// `or_err!`); if nothing changed it returns `($id, Value::Null)`. Otherwise
/// execution continues after the macro.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut ct = $ct.lock().await;
        if ct.path() != &$path {
            *ct = ChangeTracker::new($path.clone());
        }
        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
            return ($id, Value::Null);
        }
    };
}
169
170impl Ctx for GXCtx {
171 fn clear(&mut self) {
172 let Self {
173 by_ref,
174 var_updates,
175 net_updates,
176 net_writes,
177 rpc_clients,
178 rpc_overflow,
179 subscribed,
180 published,
181 published_rpcs,
182 pending_unsubscribe,
183 change_trackers,
184 tasks,
185 batch,
186 publisher,
187 subscriber: _,
188 updates_tx,
189 updates,
190 writes_tx,
191 writes,
192 rpcs,
193 rpcs_tx,
194 } = self;
195 by_ref.clear();
196 var_updates.clear();
197 net_updates.clear();
198 net_writes.clear();
199 rpc_overflow.clear();
200 rpc_clients.clear();
201 subscribed.clear();
202 published.clear();
203 published_rpcs.clear();
204 pending_unsubscribe.clear();
205 change_trackers.clear();
206 *tasks = JoinSet::new();
207 tasks.spawn(async { future::pending().await });
208 *batch = publisher.start_batch();
209 let (tx, rx) = mpsc::channel(3);
210 *updates_tx = tx;
211 *updates = rx;
212 let (tx, rx) = mpsc::channel(100);
213 *writes_tx = tx;
214 *writes = rx;
215 let (tx, rx) = mpsc::channel(100);
216 *rpcs_tx = tx;
217 *rpcs = rx
218 }
219
220 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
221 let now = Instant::now();
222 let proc = match self.rpc_clients.entry(name) {
223 Entry::Occupied(mut e) => {
224 let cl = e.get_mut();
225 cl.last_used = now;
226 Ok(cl.proc.clone())
227 }
228 Entry::Vacant(e) => {
229 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
230 Err(e) => Err(e),
231 Ok(proc) => {
232 let cl = RpcClient { last_used: now, proc: proc.clone() };
233 e.insert(cl);
234 Ok(proc)
235 }
236 }
237 }
238 };
239 self.tasks.spawn(async move {
240 macro_rules! err {
241 ($e:expr) => {{
242 let e = format_compact!("{:?}", $e);
243 (id, Value::Error(e.as_str().into()))
244 }};
245 }
246 match proc {
247 Err(e) => err!(e),
248 Ok(proc) => match proc.call(args).await {
249 Err(e) => err!(e),
250 Ok(res) => (id, res),
251 },
252 }
253 });
254 }
255
256 fn publish_rpc(
257 &mut self,
258 name: Path,
259 doc: Value,
260 spec: Vec<ArgSpec>,
261 id: BindId,
262 ) -> Result<()> {
263 use rpc::server::Proc;
264 let e = match self.published_rpcs.entry(name) {
265 Entry::Vacant(e) => e,
266 Entry::Occupied(_) => bail!("already published"),
267 };
268 let proc = Proc::new(
269 &self.publisher,
270 e.key().clone(),
271 doc,
272 spec,
273 move |c| Some((id, c)),
274 Some(self.rpcs_tx.clone()),
275 )?;
276 e.insert(proc);
277 Ok(())
278 }
279
    /// Forget the rpc procedure published at `name`, dropping its handle.
    /// Does nothing if no such procedure is published.
    fn unpublish_rpc(&mut self, name: Path) {
        self.published_rpcs.remove(&name);
    }
283
284 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
285 let dval =
286 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
287 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
288 dval
289 }
290
291 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
292 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
293 if let Some(cn) = exprs.get_mut(&ref_by) {
294 *cn -= 1;
295 if *cn == 0 {
296 exprs.remove(&ref_by);
297 }
298 }
299 if exprs.is_empty() {
300 self.subscribed.remove(&dv.id());
301 }
302 }
303 self.pending_unsubscribe.push_back((Instant::now(), dv));
304 }
305
    /// List the paths under `path` in a background task and deliver the
    /// result to variable `id` as an array of strings.
    ///
    /// A change tracker is kept per `id`; if the resolver reports that nothing
    /// changed since the last check the task yields `Value::Null` instead.
    /// Resolver errors are delivered as `Value::Error`.
    fn list(&mut self, id: BindId, path: Path) {
        let ct = self
            .change_trackers
            .entry(id)
            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
        let ct = Arc::clone(ct);
        let resolver = self.subscriber.resolver();
        self.tasks.spawn(async move {
            check_changed!(id, resolver, path, ct);
            let mut paths = or_err!(id, resolver.list(path).await);
            let paths = paths.drain(..).map(|p| Value::String(p.into()));
            (id, Value::Array(ValArray::from_iter_exact(paths)))
        });
    }
320
    /// Fetch the table description of `path` in a background task and deliver
    /// it to variable `id`.
    ///
    /// The value has the shape
    /// `[["columns", [[name, count], ..]], ["rows", [name, ..]]]`.
    /// As with `list`, a per `id` change tracker is consulted first and the
    /// task yields `Value::Null` when nothing changed, or `Value::Error` when
    /// the resolver fails.
    fn list_table(&mut self, id: BindId, path: Path) {
        let ct = self
            .change_trackers
            .entry(id)
            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
        let ct = Arc::clone(ct);
        let resolver = self.subscriber.resolver();
        self.tasks.spawn(async move {
            check_changed!(id, resolver, path, ct);
            let mut tbl = or_err!(id, resolver.table(path).await);
            // each column becomes a [name, count] pair
            let cols = tbl.cols.drain(..).map(|(name, count)| {
                Value::Array(ValArray::from([
                    Value::String(name.into()),
                    Value::V64(count.0),
                ]))
            });
            let cols = Value::Array(ValArray::from_iter_exact(cols));
            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
            let rows = Value::Array(ValArray::from_iter_exact(rows));
            let tbl = Value::Array(ValArray::from([
                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
            ]));
            (id, tbl)
        });
    }
347
    /// Drop the change tracker associated with `id`, if there is one.
    fn stop_list(&mut self, id: BindId) {
        self.change_trackers.remove(&id);
    }
351
352 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
353 let val = self.publisher.publish_with_flags_and_writes(
354 PublishFlags::empty(),
355 path,
356 value,
357 Some(self.writes_tx.clone()),
358 )?;
359 let id = val.id();
360 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
361 Ok(val)
362 }
363
    /// Queue an update of `val` to `value` in the current publisher batch.
    /// The batch is committed at the end of the cycle.
    fn update(&mut self, val: &Val, value: Value) {
        val.update(&mut self.batch, value);
    }
367
368 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
369 if let Some(refs) = self.published.get_mut(&val.id()) {
370 if let Some(cn) = refs.get_mut(&ref_by) {
371 *cn -= 1;
372 if *cn == 0 {
373 refs.remove(&ref_by);
374 }
375 }
376 if refs.is_empty() {
377 self.published.remove(&val.id());
378 }
379 }
380 }
381
    /// Spawn a background task that sleeps for `timeout` and then delivers
    /// the current UTC time to variable `id`.
    fn set_timer(&mut self, id: BindId, timeout: Duration) {
        self.tasks
            .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
    }
386
    /// Record that expression `ref_by` holds one more reference to variable
    /// `id`.
    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
    }
390
391 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
392 if let Some(refs) = self.by_ref.get_mut(&id) {
393 if let Some(cn) = refs.get_mut(&ref_by) {
394 *cn -= 1;
395 if *cn == 0 {
396 refs.remove(&ref_by);
397 }
398 }
399 if refs.is_empty() {
400 self.by_ref.remove(&id);
401 }
402 }
403 }
404
405 fn set_var(&mut self, id: BindId, value: Value) {
406 self.var_updates.push_back((id, value.clone()));
407 }
408}
409
410fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
411 match &n.spec().kind {
412 ExprKind::Bind { .. }
413 | ExprKind::Lambda { .. }
414 | ExprKind::Use { .. }
415 | ExprKind::Connect { .. }
416 | ExprKind::Module { .. }
417 | ExprKind::TypeDef { .. } => false,
418 _ => true,
419 }
420}
421
/// Complete immediately when `b` is true, otherwise never complete.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
427
428async fn join_or_wait(
429 js: &mut JoinSet<(BindId, Value)>,
430) -> result::Result<(BindId, Value), JoinError> {
431 match js.join_next().await {
432 None => future::pending().await,
433 Some(r) => r,
434 }
435}
436
437async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
438 if go {
439 match ch.next().await {
440 None => future::pending().await,
441 Some(v) => v,
442 }
443 } else {
444 future::pending().await
445 }
446}
447
448async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
449 if pending.len() == 0 {
450 future::pending().await
451 } else {
452 let (ts, _) = pending.front().unwrap();
453 let one = Duration::from_secs(1);
454 let elapsed = now - *ts;
455 if elapsed < one {
456 time::sleep(one - elapsed).await
457 }
458 }
459}
460
/// A compiled top level expression. Dropping it asks the runtime to delete
/// the corresponding node.
pub struct CompExp {
    /// the id of the expression
    pub id: ExprId,
    /// the type of the compiled node
    pub typ: Type,
    /// the result of `is_output` for the compiled node
    pub output: bool,
    rt: GXHandle,
}

impl Drop for CompExp {
    fn drop(&mut self) {
        // best effort, the runtime may already be gone
        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
    }
}
473
/// The result of compiling or loading source text.
pub struct CompRes {
    /// one entry per top level expression, in source order
    pub exprs: SmallVec<[CompExp; 1]>,
    /// a copy of the environment taken after compilation
    pub env: Env<GXCtx, NoUserEvent>,
}
478
/// Events the runtime sends to its subscriber, in batches.
#[derive(Clone)]
pub enum RtEvent {
    /// the expression with the given id produced a value this cycle
    Updated(ExprId, Value),
    /// the current environment; sent after an expression delete is processed
    Env(Env<GXCtx, NoUserEvent>),
}
484
// Pool the run loop takes its per cycle event batches from.
// NOTE(review): the two numeric arguments are presumably the pool's size
// limits -- confirm against the netidx `Pool::new` documentation.
static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
486
/// A handle to a variable in the runtime. Dropping it asks the runtime to
/// delete the reference node that was compiled for it.
pub struct Ref {
    /// the id of the reference node compiled for this handle
    pub id: ExprId,
    /// the cached value of the variable at the time the handle was created,
    /// if there was one
    pub last: Option<Value>,
    /// the variable this handle refers to
    pub bid: BindId,
    /// the entry for `bid` in the environment's `byref_chain`, if any
    pub target_bid: Option<BindId>,
    rt: GXHandle,
}

impl Drop for Ref {
    fn drop(&mut self) {
        // best effort, the runtime may already be gone
        let _ = self.rt.0.send(ToRt::Delete { id: self.id });
    }
}
500
501impl Ref {
502 pub fn set(&self, v: Value) -> Result<()> {
503 self.rt.set(self.bid, v)
504 }
505
506 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
507 if let Some(id) = self.target_bid {
508 self.rt.set(id, v)?
509 }
510 Ok(())
511 }
512}
513
// The id type used to name callables registered with the runtime.
atomic_id!(CallableId);
515
/// A lambda compiled so that it can be called from rust. Dropping it asks
/// the runtime to delete the callable.
pub struct Callable {
    rt: GXHandle,
    id: CallableId,
    /// a copy of the environment taken when the callable was compiled; used
    /// to type check arguments
    env: Env<GXCtx, NoUserEvent>,
    /// the type of the lambda
    pub typ: FnType,
    /// the id of the call site node; results of calls show up as updates of
    /// this expression
    pub expr: ExprId,
}

impl Drop for Callable {
    fn drop(&mut self) {
        // best effort, the runtime may already be gone
        let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
    }
}
529
530impl Callable {
531 pub async fn call(&self, args: ValArray) -> Result<()> {
534 if self.typ.args.len() != args.len() {
535 bail!("expected {} args", self.typ.args.len())
536 }
537 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
538 if !a.typ.is_a(&self.env, v) {
539 bail!("type mismatch arg {i} expected {}", a.typ)
540 }
541 }
542 self.call_unchecked(args).await
543 }
544
545 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
549 self.rt
550 .0
551 .send(ToRt::Call { id: self.id, args })
552 .map_err(|_| anyhow!("runtime is dead"))
553 }
554}
555
/// Requests sent from handles to the runtime task.
enum ToRt {
    /// reply with a copy of the current environment
    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
    /// delete the node of the given expression
    Delete { id: ExprId },
    /// load and compile a file or module; `rt` is the handle stored in the
    /// returned `CompExp`s
    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// compile source text
    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// compile a call site for the lambda identified by the value `id`
    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
    /// compile a reference to the variable `id`
    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
    /// set the variable `id` to `v`
    Set { id: BindId, v: Value },
    /// call a previously compiled callable
    Call { id: CallableId, args: ValArray },
    /// delete a previously compiled callable
    DeleteCallable { id: CallableId },
}
567
/// Runtime side state of a callable.
struct CallableInt {
    /// the id of the call site node
    expr: ExprId,
    /// one variable per argument; calling sets these variables
    args: Box<[BindId]>,
}
572
/// The state of the runtime task.
struct GX {
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// the event being assembled for, and consumed by, the current cycle
    event: Event<NoUserEvent>,
    /// the expressions to update in the current cycle; the flag is true when
    /// the node has just been compiled and needs initialization
    updated: FxHashMap<ExprId, bool>,
    /// all top level nodes, in insertion order
    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
    callables: FxHashMap<CallableId, CallableInt>,
    /// where batches of runtime events are sent
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
    resolvers: Arc<[ModuleResolver]>,
    /// passed to the publisher when committing a batch
    publish_timeout: Option<Duration>,
    /// the last time unused rpc clients were collected
    last_rpc_gc: Instant,
}
584
585impl GX {
586 async fn new(mut rt: GXConfig) -> Result<Self> {
587 let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
588 None => r.push(ModuleResolver::Files("".into())),
589 Some(dd) => {
590 r.push(ModuleResolver::Files("".into()));
591 r.push(ModuleResolver::Files(dd.join("graphix")));
592 }
593 };
594 match std::env::var("GRAPHIX_MODPATH") {
595 Err(_) => resolvers_default(&mut rt.resolvers),
596 Ok(mp) => match ModuleResolver::parse_env(
597 rt.ctx.user.subscriber.clone(),
598 rt.resolve_timeout,
599 &mp,
600 ) {
601 Ok(r) => rt.resolvers.extend(r),
602 Err(e) => {
603 error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
604 resolvers_default(&mut rt.resolvers)
605 }
606 },
607 };
608 let mut t = Self {
609 ctx: rt.ctx,
610 event: Event::new(NoUserEvent),
611 updated: HashMap::default(),
612 nodes: IndexMap::default(),
613 callables: HashMap::default(),
614 sub: rt.sub,
615 resolvers: Arc::from(rt.resolvers),
616 publish_timeout: rt.publish_timeout,
617 last_rpc_gc: Instant::now(),
618 };
619 let st = Instant::now();
620 if let Some(root) = rt.root {
621 t.compile_root(root).await?;
622 }
623 info!("root init time: {:?}", st.elapsed());
624 Ok(t)
625 }
626
    /// Run one evaluation cycle.
    ///
    /// The queued and freshly received events are merged into `self.event`,
    /// every node marked in `self.updated` is updated, and the values the
    /// nodes produced are sent to the subscriber as one batch. Finally the
    /// event and the update set are cleared and, if anything was published
    /// during the cycle, the publisher batch is committed in a background
    /// task.
    ///
    /// `tasks`, `rpcs`, `updates` and `writes` are the inputs collected by
    /// the run loop; `to_rt` and `input` are only used to keep servicing
    /// requests while the subscriber is slow to accept the batch.
    async fn do_cycle(
        &mut self,
        updates: Option<UpdateBatch>,
        writes: Option<WriteBatch>,
        tasks: &mut Vec<(BindId, Value)>,
        rpcs: &mut Vec<(BindId, RpcCall)>,
        to_rt: &mut UnboundedReceiver<ToRt>,
        input: &mut Vec<ToRt>,
        mut batch: Pooled<Vec<RtEvent>>,
    ) {
        // An event table holds at most one value per id per cycle. The first
        // value for an id goes into the table and marks every expression that
        // references the id for update; any further value for the same id is
        // pushed on the overflow queue and handled in a later cycle.
        macro_rules! push_event {
            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
                match self.event.$event.entry($id) {
                    Entry::Vacant(e) => {
                        e.insert($v);
                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
                            for id in exps.keys() {
                                self.updated.entry(*id).or_insert(false);
                            }
                        }
                    }
                    Entry::Occupied(_) => {
                        self.ctx.user.$overflow.push_back(($id, $v));
                    }
                }
            };
        }
        // The queues are drained by their length at entry, so entries that
        // overflow back onto the same queue are not looked at again this cycle.
        for _ in 0..self.ctx.user.var_updates.len() {
            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for (id, v) in tasks.drain(..) {
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for _ in 0..self.ctx.user.rpc_overflow.len() {
            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for (id, v) in rpcs.drain(..) {
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for _ in 0..self.ctx.user.net_updates.len() {
            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
            push_event!(id, v, netidx, subscribed, net_updates)
        }
        if let Some(mut updates) = updates {
            for (id, v) in updates.drain(..) {
                push_event!(id, v, netidx, subscribed, net_updates)
            }
        }
        for _ in 0..self.ctx.user.net_writes.len() {
            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
            push_event!(id, v, writes, published, net_writes)
        }
        if let Some(mut writes) = writes {
            for wr in writes.drain(..) {
                let id = wr.id;
                push_event!(id, wr, writes, published, net_writes)
            }
        }
        // update the marked nodes in insertion order
        for (id, n) in self.nodes.iter_mut() {
            if let Some(init) = self.updated.get(id) {
                // variables injected only for the initialization of this node
                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
                self.event.init = *init;
                if self.event.init {
                    // A node being initialized sees the cached value of every
                    // external variable it references, unless the variable
                    // already has an event this cycle.
                    REFS.with_borrow_mut(|refs| {
                        refs.clear();
                        n.refs(refs);
                        refs.with_external_refs(|id| {
                            if let Some(v) = self.ctx.cached.get(&id) {
                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
                                    e.insert(v.clone());
                                    clear.push(id);
                                }
                            }
                        });
                    });
                }
                // a panic in one node must not take the runtime down
                let res = catch_unwind(AssertUnwindSafe(|| {
                    n.update(&mut self.ctx, &mut self.event)
                }));
                // remove the injected values so other nodes don't see them
                for id in clear {
                    self.event.variables.remove(&id);
                }
                match res {
                    Ok(None) => (),
                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
                    Err(e) => {
                        error!("could not update exprid: {id:?}, panic: {e:?}")
                    }
                }
            }
        }
        // Deliver the batch. If the subscriber doesn't accept it within 100ms
        // keep processing incoming requests, then try again.
        loop {
            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
                Ok(()) => break,
                Err(SendTimeoutError::Closed(_)) => {
                    error!("could not send batch");
                    break;
                }
                Err(SendTimeoutError::Timeout(b)) => {
                    batch = b;
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                    self.process_input_batch(tasks, input, &mut batch).await;
                }
            }
        }
        self.event.clear();
        self.updated.clear();
        // commit what was published during the cycle without blocking the loop
        if self.ctx.user.batch.len() > 0 {
            let batch = mem::replace(
                &mut self.ctx.user.batch,
                self.ctx.user.publisher.start_batch(),
            );
            let timeout = self.publish_timeout;
            task::spawn(async move { batch.commit(timeout).await });
        }
    }
748
    /// Process all the requests in `input`, leaving it empty.
    ///
    /// Requests that carry a reply channel are answered directly; a failed
    /// reply (the requester went away) is ignored. Variable sets and calls
    /// are appended to `tasks` so they are delivered in the next cycle.
    /// Deleting an expression pushes the current environment onto `batch`.
    async fn process_input_batch(
        &mut self,
        tasks: &mut Vec<(BindId, Value)>,
        input: &mut Vec<ToRt>,
        batch: &mut Pooled<Vec<RtEvent>>,
    ) {
        for m in input.drain(..) {
            match m {
                ToRt::GetEnv { res } => {
                    let _ = res.send(self.ctx.env.clone());
                }
                ToRt::Compile { text, rt, res } => {
                    let _ = res.send(self.compile(rt, text).await);
                }
                ToRt::Load { path, rt, res } => {
                    let _ = res.send(self.load(rt, &path).await);
                }
                ToRt::Delete { id } => {
                    if let Some(mut n) = self.nodes.shift_remove(&id) {
                        n.delete(&mut self.ctx);
                    }
                    debug!("delete {id:?}");
                    // the environment is sent whether or not the node existed
                    batch.push(RtEvent::Env(self.ctx.env.clone()));
                }
                ToRt::CompileCallable { id, rt, res } => {
                    let _ = res.send(self.compile_callable(id, rt));
                }
                ToRt::CompileRef { id, rt, res } => {
                    let _ = res.send(self.compile_ref(rt, id));
                }
                ToRt::Set { id, v } => {
                    // remember the value so nodes compiled later can be
                    // initialized with it
                    self.ctx.cached.insert(id, v.clone());
                    tasks.push((id, v))
                }
                ToRt::DeleteCallable { id } => self.delete_callable(id),
                ToRt::Call { id, args } => {
                    if let Err(e) = self.call_callable(id, args, tasks) {
                        error!("calling callable {id:?} failed with {e:?}")
                    }
                }
            }
        }
    }
792
793 fn cycle_ready(&self) -> bool {
794 self.ctx.user.var_updates.len() > 0
795 || self.ctx.user.net_updates.len() > 0
796 || self.ctx.user.net_writes.len() > 0
797 || self.ctx.user.rpc_overflow.len() > 0
798 }
799
    /// Parse, resolve and compile the root module text in the root scope.
    ///
    /// Every resulting node is registered and marked for initialization in
    /// the next cycle. Unlike `compile` nothing is returned to the caller, so
    /// the nodes are not tied to any handle.
    async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
        let scope = ModPath::root();
        let ori = expr::parser::parse(SourceOrigin::Unspecified, text.clone())
            .context("parsing the root module")?;
        // resolve the modules of all top level expressions concurrently
        let exprs =
            join_all(ori.exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
                .await
                .into_iter()
                .collect::<Result<SmallVec<[_; 4]>>>()
                .context(CouldNotResolve)?;
        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
        let nodes = ori
            .exprs
            .iter()
            .map(|e| {
                compile(&mut self.ctx, &scope, e.clone())
                    .with_context(|| format!("compiling root expression {e}"))
            })
            .collect::<Result<SmallVec<[_; 4]>>>()
            .with_context(|| ori.clone())?;
        for (e, n) in ori.exprs.iter().zip(nodes.into_iter()) {
            // true: the node needs initialization
            self.updated.insert(e.id, true);
            self.nodes.insert(e.id, n);
        }
        Ok(())
    }
826
    /// Parse, resolve and compile `text` in the root scope.
    ///
    /// Every resulting node is registered and marked for initialization in
    /// the next cycle. The returned `CompExp`s carry `rt`, which they use to
    /// ask for deletion of their node when dropped.
    async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
        let scope = ModPath::root();
        let ori = expr::parser::parse(SourceOrigin::Unspecified, text.clone())?;
        // resolve the modules of all top level expressions concurrently
        let exprs =
            join_all(ori.exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
                .await
                .into_iter()
                .collect::<Result<SmallVec<[_; 4]>>>()
                .context(CouldNotResolve)?;
        let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
        let nodes = ori
            .exprs
            .iter()
            .map(|e| compile(&mut self.ctx, &scope, e.clone()))
            .collect::<Result<SmallVec<[_; 4]>>>()
            .with_context(|| ori.clone())?;
        let exprs = ori
            .exprs
            .iter()
            .zip(nodes.into_iter())
            .map(|(e, n)| {
                let output = is_output(&n);
                let typ = n.typ().clone();
                // true: the node needs initialization
                self.updated.insert(e.id, true);
                self.nodes.insert(e.id, n);
                CompExp { id: e.id, output, typ, rt: rt.clone() }
            })
            .collect::<SmallVec<[_; 1]>>();
        Ok(CompRes { exprs, env: self.ctx.env.clone() })
    }
857
858 async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
859 let scope = ModPath::root();
860 let st = Instant::now();
861 let ori = match file.extension() {
862 Some(e) if e.as_bytes() == b"gx" => {
863 let file = file.canonicalize()?;
864 let s = fs::read_to_string(&file).await?;
865 let s = if s.starts_with("#!") {
866 if let Some(i) = s.find('\n') {
867 &s[i..]
868 } else {
869 s.as_str()
870 }
871 } else {
872 s.as_str()
873 };
874 let s = ArcStr::from(s);
875 expr::parser::parse(SourceOrigin::File(file), s)?
876 }
877 Some(e) => bail!("invalid file extension {e:?}"),
878 None => {
879 let name = file
880 .components()
881 .map(|c| match c {
882 Component::RootDir
883 | Component::CurDir
884 | Component::ParentDir
885 | Component::Prefix(_) => bail!("invalid module name {file:?}"),
886 Component::Normal(s) => Ok(s),
887 })
888 .collect::<Result<Box<[_]>>>()?;
889 if name.len() != 1 {
890 bail!("invalid module name {file:?}")
891 }
892 let name = String::from_utf8_lossy(name[0].as_bytes());
893 let name = name
894 .parse::<ModPath>()
895 .with_context(|| "parsing module name {file:?}")?;
896 let name = Path::basename(&*name)
897 .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
898 let name = ArcStr::from(name);
899 let e = ExprKind::Module {
900 export: true,
901 name: name.clone(),
902 value: ModuleKind::Unresolved,
903 }
904 .to_expr(Default::default());
905 let ori = Origin {
906 origin: SourceOrigin::Internal(name),
907 source: literal!(""),
908 exprs: Arc::from_iter([e]),
909 };
910 ori
911 }
912 };
913 info!("parse time: {:?}", st.elapsed());
914 let st = Instant::now();
915 let ori = {
916 let _ori = ori.clone();
917 ori.resolve_modules(&self.resolvers).await.with_context(|| _ori)?
918 };
919 info!("resolve time: {:?}", st.elapsed());
920 let mut exprs = smallvec![];
921 for e in ori.exprs.iter() {
922 let top_id = e.id;
923 let n =
924 compile(&mut self.ctx, &scope, e.clone()).with_context(|| ori.clone())?;
925 let has_out = is_output(&n);
926 let typ = n.typ().clone();
927 self.nodes.insert(top_id, n);
928 self.updated.insert(top_id, true);
929 exprs.push(CompExp { id: top_id, output: has_out, typ, rt: rt.clone() })
930 }
931 Ok(CompRes { exprs, env: self.ctx.env.clone() })
932 }
933
    /// Build a call site for the lambda identified by `id` so it can be
    /// called from rust.
    ///
    /// `id` must be a `Value::U64` naming a lambda that is still alive in the
    /// environment, and the lambda must not have optional arguments. A fresh
    /// variable is created for every argument; calling the callable sets
    /// those variables. The call site node is initialized immediately and
    /// then registered under a new expression id.
    fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
        let id = match id {
            Value::U64(id) => LambdaId::from(id),
            v => bail!("invalid lambda id {v}"),
        };
        let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
        let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
        let args = lb.typ.args.iter();
        // one fresh variable per argument
        let args = args
            .map(|a| {
                if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
                    bail!("can't call lambda with an optional argument from rust")
                } else {
                    Ok(BindId::new())
                }
            })
            .collect::<Result<Box<[_]>>>()?;
        let eid = ExprId::new();
        let argn = lb.typ.args.iter().zip(args.iter());
        // the arguments of the call are references to those variables
        let argn = argn
            .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
            .collect::<Vec<_>>();
        let fnode = genn::constant(Value::U64(id.inner()));
        let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
        // initialize the call site right away; its result is discarded
        self.event.init = true;
        n.update(&mut self.ctx, &mut self.event);
        self.event.clear();
        let cid = CallableId::new();
        self.callables.insert(cid, CallableInt { expr: eid, args });
        self.nodes.insert(eid, n);
        let env = self.ctx.env.clone();
        Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
    }
967
968 fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
969 let eid = ExprId::new();
970 let typ = Type::Any;
971 let n = genn::reference(&mut self.ctx, id, typ, eid);
972 self.nodes.insert(eid, n);
973 let target_bid = self.ctx.env.byref_chain.get(&id).copied();
974 Ok(Ref {
975 id: eid,
976 bid: id,
977 target_bid,
978 last: self.ctx.cached.get(&id).cloned(),
979 rt,
980 })
981 }
982
983 fn call_callable(
984 &mut self,
985 id: CallableId,
986 args: ValArray,
987 tasks: &mut Vec<(BindId, Value)>,
988 ) -> Result<()> {
989 let c =
990 self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
991 if args.len() != c.args.len() {
992 bail!("expected {} arguments", c.args.len());
993 }
994 let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
995 tasks.extend(a);
996 Ok(())
997 }
998
999 fn delete_callable(&mut self, id: CallableId) {
1000 if let Some(c) = self.callables.remove(&id) {
1001 if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1002 n.delete(&mut self.ctx)
1003 }
1004 }
1005 }
1006
    /// The main loop of the runtime.
    ///
    /// Each iteration waits for one source of work (rpc calls, writes,
    /// subscription updates, finished tasks, requests from handles, or
    /// leftover queued events), opportunistically collects whatever else is
    /// immediately available, processes the requests and then runs one
    /// cycle. Returns `Ok(())` once the request channel yields no more
    /// messages.
    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        let mut tasks = vec![];
        let mut input = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            let now = Instant::now();
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            // Collect, without waiting, whatever the named sources have ready.
            // A channel is only read while its overflow queue is empty, so new
            // events can't get ahead of the ones still waiting there.
            macro_rules! peek {
                (updates) => {
                    if self.ctx.user.net_updates.is_empty() {
                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
                            match &mut updates {
                                None => updates = Some(up),
                                Some(prev) => prev.extend(up.drain(..)),
                            }
                        }
                    }
                };
                (writes) => {
                    if self.ctx.user.net_writes.is_empty() {
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                (tasks) => {
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                (rpcs) => {
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                (input) => {
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            select! {
                rp = maybe_next(
                    self.ctx.user.rpc_overflow.is_empty(),
                    &mut self.ctx.user.rpcs
                ) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs, input)
                }
                wr = maybe_next(
                    self.ctx.user.net_writes.is_empty(),
                    &mut self.ctx.user.writes
                ) => {
                    writes = Some(wr);
                    peek!(updates, tasks, rpcs, input);
                },
                up = maybe_next(
                    self.ctx.user.net_updates.is_empty(),
                    &mut self.ctx.user.updates
                ) => {
                    updates = Some(up);
                    peek!(updates, writes, tasks, rpcs, input);
                },
                up = join_or_wait(&mut self.ctx.user.tasks) => {
                    // a task that failed to join is ignored
                    if let Ok(up) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs, input)
                },
                // queued events are waiting, run a cycle right away
                _ = or_never(ready) => {
                    peek!(updates, writes, tasks, rpcs, input)
                },
                n = to_rt.recv_many(&mut input, 100000) => {
                    // nothing received means the request channel is closed
                    if n == 0 {
                        break 'main Ok(())
                    }
                    peek!(updates, writes, tasks, rpcs);
                },
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // drop the values that were unsubscribed at least a second
                    // ago; this doesn't warrant a cycle
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                },
            }
            let mut batch = BATCH.take();
            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
            self.do_cycle(
                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
            )
            .await;
            // at most once a minute, forget rpc clients that have not been
            // used for more than a minute
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
1118}
1119
/// A cloneable handle used to send requests to a running runtime task.
#[derive(Clone)]
pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1124
1125impl GXHandle {
1126 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1127 let (tx, rx) = oneshot::channel();
1128 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1129 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1130 }
1131
1132 pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1134 self.exec(|res| ToRt::GetEnv { res }).await
1135 }
1136
1137 pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1143 Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1144 }
1145
1146 pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1154 Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1155 }
1156
1157 pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1161 Ok(self
1162 .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1163 .await??)
1164 }
1165
1166 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1170 Ok(self
1171 .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1172 .await??)
1173 }
1174
1175 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1178 let v = v.into();
1179 self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1180 }
1181}
1182
/// The configuration of a runtime. Build it with [`GXConfig::builder`] and
/// start the runtime with [`GXConfig::start`].
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig {
    /// passed to the module resolvers parsed from `GRAPHIX_MODPATH`
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// passed to the publisher when committing a batch of updates
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// the execution context the runtime will use
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// the text of the root module, compiled at startup if present
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// explicitly configured module resolvers; those from the environment,
    /// or the defaults, are appended to these
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// where the runtime sends batches of events
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
}
1205
impl GXConfig {
    /// Create a builder with the two mandatory settings, the execution
    /// context and the channel that receives the runtime's events.
    pub fn builder(
        ctx: ExecCtx<GXCtx, NoUserEvent>,
        sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
    ) -> GXConfigBuilder {
        GXConfigBuilder::default().ctx(ctx).sub(sub)
    }

    /// Start the runtime in a background task and return a handle to it.
    ///
    /// This waits for initialization, including compiling the root module if
    /// there is one, and returns the initialization error if it fails. An
    /// error that ends the run loop later is only logged.
    pub async fn start(self) -> Result<GXHandle> {
        let (init_tx, init_rx) = oneshot::channel();
        let (tx, rx) = tmpsc::unbounded_channel();
        task::spawn(async move {
            match GX::new(self).await {
                Ok(bs) => {
                    let _ = init_tx.send(Ok(()));
                    if let Err(e) = bs.run(rx).await {
                        error!("run loop exited with error {e:?}")
                    }
                }
                Err(e) => {
                    let _ = init_tx.send(Err(e));
                }
            };
        });
        // the first ? fails if the task died before reporting, the second
        // propagates the initialization error
        init_rx.await??;
        Ok(GXHandle(tx))
    }
}