1use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use combine::stream::position::SourcePosition;
16use compact_str::format_compact;
17use core::fmt;
18use derive_builder::Builder;
19use futures::{channel::mpsc, future::join_all, FutureExt, StreamExt};
20use fxhash::{FxBuildHasher, FxHashMap};
21use graphix_compiler::{
22 compile,
23 env::Env,
24 expr::{self, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin},
25 node::genn,
26 typ::{FnType, Type},
27 BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
28};
29use indexmap::IndexMap;
30use log::{debug, error, info};
31use netidx::{
32 path::Path,
33 pool::{Pool, Pooled},
34 protocol::valarray::ValArray,
35 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
36 resolver_client::ChangeTracker,
37 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
38};
39use netidx_core::atomic_id;
40use netidx_protocols::rpc::{
41 self,
42 server::{ArgSpec, RpcCall},
43};
44use serde_derive::{Deserialize, Serialize};
45use smallvec::{smallvec, SmallVec};
46use std::{
47 collections::{hash_map::Entry, HashMap, VecDeque},
48 future, mem,
49 os::unix::ffi::OsStrExt,
50 panic::{catch_unwind, AssertUnwindSafe},
51 path::{Component, PathBuf},
52 result,
53 sync::{LazyLock, Weak},
54 time::Duration,
55};
56use tokio::{
57 fs, select,
58 sync::{
59 mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
60 oneshot, Mutex,
61 },
62 task::{self, JoinError, JoinSet},
63 time::{self, Instant},
64};
65use triomphe::Arc;
66
/// A pooled batch of subscription events, each tagged with the `SubId` it
/// arrived on. This is the item type of the `updates` channel in `GXCtx`.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
/// A pooled batch of write requests. This is the item type of the `writes`
/// channel in `GXCtx`.
type WriteBatch = Pooled<Vec<WriteRequest>>;
69
/// Marker error attached as context when resolving the modules of an
/// expression fails.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed, human readable description of the error.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("could not resolve module")
    }
}
78
/// A cached RPC client handle together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// The client side handle of the remote procedure.
    proc: rpc::client::Proc,
    /// Refreshed by `call_rpc` on every call; the run loop uses it to drop
    /// clients that have been idle for more than a minute.
    last_used: Instant,
}
84
/// The runtime side state backing the `Ctx` implementation: reference
/// counts, queues of deferred events, netidx handles, background tasks and
/// the channels that feed the run loop.
#[derive(Debug)]
pub struct GXCtx {
    /// For each variable, the expressions referencing it and how many
    /// references each one holds (maintained by `ref_var`/`unref_var`).
    by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For each subscription, the expressions referencing it and their
    /// reference counts (maintained by `subscribe`/`unsubscribe`).
    subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For each published value, the expressions referencing it and their
    /// reference counts (maintained by `publish`/`unpublish`).
    published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable updates waiting for a cycle: pushed by `set_var`, and by
    /// `do_cycle` when the variable already has an event in the current cycle.
    var_updates: VecDeque<(BindId, Value)>,
    /// Subscription events deferred by `do_cycle` because their `SubId`
    /// already had an event in the current cycle.
    net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Write requests deferred by `do_cycle` because their `Id` already had
    /// a write in the current cycle.
    net_writes: VecDeque<(Id, WriteRequest)>,
    /// RPC calls deferred by `do_cycle` because their `BindId` already had a
    /// call in the current cycle.
    rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// RPC client handles keyed by procedure path, reused across calls.
    rpc_clients: FxHashMap<Path, RpcClient>,
    /// RPC procedures published by this runtime, keyed by path.
    published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions handed back by `unsubscribe` along with the time they
    /// were released; the run loop drops them about one second later.
    pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Change trackers used by `list`/`list_table`, one per `BindId`.
    change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks (RPC calls, listings, timers); each one resolves to
    /// a value destined for a `BindId`.
    tasks: JoinSet<(BindId, Value)>,
    /// Publisher updates accumulated by `update` during the current cycle.
    batch: publisher::UpdateBatch,
    publisher: Publisher,
    subscriber: Subscriber,
    /// Sender handed to every subscription made through `subscribe`.
    updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receiving end of `updates_tx`, drained by the run loop.
    updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to every value published through `publish`.
    writes_tx: mpsc::Sender<WriteBatch>,
    /// Receiving end of `writes_tx`, drained by the run loop.
    writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to every RPC published through `publish_rpc`.
    rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receiving end of `rpcs_tx`, drained by the run loop.
    rpcs: mpsc::Receiver<(BindId, RpcCall)>,
}
109
110impl GXCtx {
111 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
112 let (updates_tx, updates) = mpsc::channel(100);
113 let (writes_tx, writes) = mpsc::channel(100);
114 let (rpcs_tx, rpcs) = mpsc::channel(100);
115 let batch = publisher.start_batch();
116 let mut tasks = JoinSet::new();
117 tasks.spawn(async { future::pending().await });
118 Self {
119 by_ref: HashMap::default(),
120 var_updates: VecDeque::new(),
121 net_updates: VecDeque::new(),
122 net_writes: VecDeque::new(),
123 rpc_overflow: VecDeque::new(),
124 rpc_clients: HashMap::default(),
125 subscribed: HashMap::default(),
126 pending_unsubscribe: VecDeque::new(),
127 published: HashMap::default(),
128 change_trackers: HashMap::default(),
129 published_rpcs: HashMap::default(),
130 tasks,
131 batch,
132 publisher,
133 subscriber,
134 updates,
135 updates_tx,
136 writes,
137 writes_tx,
138 rpcs_tx,
139 rpcs,
140 }
141 }
142}
143
/// Unwrap a `Result` inside a task that yields `(BindId, Value)`.
///
/// On `Ok` the macro evaluates to the inner value. On `Err` it returns
/// `($bindid, Value::Error(..))` from the enclosing function or async block,
/// where the error text is the `Debug` rendering of the error.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Err(err) => {
                let msg = format_compact!("{err:?}");
                return ($bindid, Value::Error(ArcStr::from(msg.as_str())));
            }
            Ok(val) => val,
        }
    };
}
156
/// Bail out of a listing task early when nothing changed under `$path`.
///
/// Locks the change tracker `$ct`, replaces it with a new tracker if it was
/// created for a different path, and asks `$resolver` whether anything
/// changed. If the check fails the enclosing task returns the error (via
/// `or_err!`); if nothing changed it returns `($id, Value::Null)`.
///
/// The macro expands to plain statements, so the lock guard it creates stays
/// alive until the end of the block in which the macro is invoked.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
168
169impl Ctx for GXCtx {
170 fn clear(&mut self) {
171 let Self {
172 by_ref,
173 var_updates,
174 net_updates,
175 net_writes,
176 rpc_clients,
177 rpc_overflow,
178 subscribed,
179 published,
180 published_rpcs,
181 pending_unsubscribe,
182 change_trackers,
183 tasks,
184 batch,
185 publisher,
186 subscriber: _,
187 updates_tx,
188 updates,
189 writes_tx,
190 writes,
191 rpcs,
192 rpcs_tx,
193 } = self;
194 by_ref.clear();
195 var_updates.clear();
196 net_updates.clear();
197 net_writes.clear();
198 rpc_overflow.clear();
199 rpc_clients.clear();
200 subscribed.clear();
201 published.clear();
202 published_rpcs.clear();
203 pending_unsubscribe.clear();
204 change_trackers.clear();
205 *tasks = JoinSet::new();
206 tasks.spawn(async { future::pending().await });
207 *batch = publisher.start_batch();
208 let (tx, rx) = mpsc::channel(3);
209 *updates_tx = tx;
210 *updates = rx;
211 let (tx, rx) = mpsc::channel(100);
212 *writes_tx = tx;
213 *writes = rx;
214 let (tx, rx) = mpsc::channel(100);
215 *rpcs_tx = tx;
216 *rpcs = rx
217 }
218
219 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
220 let now = Instant::now();
221 let proc = match self.rpc_clients.entry(name) {
222 Entry::Occupied(mut e) => {
223 let cl = e.get_mut();
224 cl.last_used = now;
225 Ok(cl.proc.clone())
226 }
227 Entry::Vacant(e) => {
228 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
229 Err(e) => Err(e),
230 Ok(proc) => {
231 let cl = RpcClient { last_used: now, proc: proc.clone() };
232 e.insert(cl);
233 Ok(proc)
234 }
235 }
236 }
237 };
238 self.tasks.spawn(async move {
239 macro_rules! err {
240 ($e:expr) => {{
241 let e = format_compact!("{:?}", $e);
242 (id, Value::Error(e.as_str().into()))
243 }};
244 }
245 match proc {
246 Err(e) => err!(e),
247 Ok(proc) => match proc.call(args).await {
248 Err(e) => err!(e),
249 Ok(res) => (id, res),
250 },
251 }
252 });
253 }
254
255 fn publish_rpc(
256 &mut self,
257 name: Path,
258 doc: Value,
259 spec: Vec<ArgSpec>,
260 id: BindId,
261 ) -> Result<()> {
262 use rpc::server::Proc;
263 let e = match self.published_rpcs.entry(name) {
264 Entry::Vacant(e) => e,
265 Entry::Occupied(_) => bail!("already published"),
266 };
267 let proc = Proc::new(
268 &self.publisher,
269 e.key().clone(),
270 doc,
271 spec,
272 move |c| Some((id, c)),
273 Some(self.rpcs_tx.clone()),
274 )?;
275 e.insert(proc);
276 Ok(())
277 }
278
279 fn unpublish_rpc(&mut self, name: Path) {
280 self.published_rpcs.remove(&name);
281 }
282
283 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
284 let dval =
285 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
286 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
287 dval
288 }
289
290 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
291 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
292 if let Some(cn) = exprs.get_mut(&ref_by) {
293 *cn -= 1;
294 if *cn == 0 {
295 exprs.remove(&ref_by);
296 }
297 }
298 if exprs.is_empty() {
299 self.subscribed.remove(&dv.id());
300 }
301 }
302 self.pending_unsubscribe.push_back((Instant::now(), dv));
303 }
304
305 fn list(&mut self, id: BindId, path: Path) {
306 let ct = self
307 .change_trackers
308 .entry(id)
309 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
310 let ct = Arc::clone(ct);
311 let resolver = self.subscriber.resolver();
312 self.tasks.spawn(async move {
313 check_changed!(id, resolver, path, ct);
314 let mut paths = or_err!(id, resolver.list(path).await);
315 let paths = paths.drain(..).map(|p| Value::String(p.into()));
316 (id, Value::Array(ValArray::from_iter_exact(paths)))
317 });
318 }
319
320 fn list_table(&mut self, id: BindId, path: Path) {
321 let ct = self
322 .change_trackers
323 .entry(id)
324 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
325 let ct = Arc::clone(ct);
326 let resolver = self.subscriber.resolver();
327 self.tasks.spawn(async move {
328 check_changed!(id, resolver, path, ct);
329 let mut tbl = or_err!(id, resolver.table(path).await);
330 let cols = tbl.cols.drain(..).map(|(name, count)| {
331 Value::Array(ValArray::from([
332 Value::String(name.into()),
333 Value::V64(count.0),
334 ]))
335 });
336 let cols = Value::Array(ValArray::from_iter_exact(cols));
337 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
338 let rows = Value::Array(ValArray::from_iter_exact(rows));
339 let tbl = Value::Array(ValArray::from([
340 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
341 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
342 ]));
343 (id, tbl)
344 });
345 }
346
347 fn stop_list(&mut self, id: BindId) {
348 self.change_trackers.remove(&id);
349 }
350
351 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
352 let val = self.publisher.publish_with_flags_and_writes(
353 PublishFlags::empty(),
354 path,
355 value,
356 Some(self.writes_tx.clone()),
357 )?;
358 let id = val.id();
359 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
360 Ok(val)
361 }
362
363 fn update(&mut self, val: &Val, value: Value) {
364 val.update(&mut self.batch, value);
365 }
366
367 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
368 if let Some(refs) = self.published.get_mut(&val.id()) {
369 if let Some(cn) = refs.get_mut(&ref_by) {
370 *cn -= 1;
371 if *cn == 0 {
372 refs.remove(&ref_by);
373 }
374 }
375 if refs.is_empty() {
376 self.published.remove(&val.id());
377 }
378 }
379 }
380
381 fn set_timer(&mut self, id: BindId, timeout: Duration) {
382 self.tasks
383 .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
384 }
385
386 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
387 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
388 }
389
390 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
391 if let Some(refs) = self.by_ref.get_mut(&id) {
392 if let Some(cn) = refs.get_mut(&ref_by) {
393 *cn -= 1;
394 if *cn == 0 {
395 refs.remove(&ref_by);
396 }
397 }
398 if refs.is_empty() {
399 self.by_ref.remove(&id);
400 }
401 }
402 }
403
404 fn set_var(&mut self, id: BindId, value: Value) {
405 self.var_updates.push_back((id, value.clone()));
406 }
407}
408
409fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
410 match &n.spec().kind {
411 ExprKind::Bind { .. }
412 | ExprKind::Lambda { .. }
413 | ExprKind::Use { .. }
414 | ExprKind::Connect { .. }
415 | ExprKind::Module { .. }
416 | ExprKind::TypeDef { .. } => false,
417 _ => true,
418 }
419}
420
/// Complete immediately when `b` is true, otherwise never complete.
///
/// Used as a `select!` arm that should only fire when a condition holds.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
426
427async fn join_or_wait(
428 js: &mut JoinSet<(BindId, Value)>,
429) -> result::Result<(BindId, Value), JoinError> {
430 match js.join_next().await {
431 None => future::pending().await,
432 Some(r) => r,
433 }
434}
435
436async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
437 if go {
438 match ch.next().await {
439 None => future::pending().await,
440 Some(v) => v,
441 }
442 } else {
443 future::pending().await
444 }
445}
446
447async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
448 if pending.len() == 0 {
449 future::pending().await
450 } else {
451 let (ts, _) = pending.front().unwrap();
452 let one = Duration::from_secs(1);
453 let elapsed = now - *ts;
454 if elapsed < one {
455 time::sleep(one - elapsed).await
456 }
457 }
458}
459
/// Handle to a compiled top level expression. Dropping it asks the runtime
/// to delete the expression.
pub struct CompExp {
    /// Id of the compiled expression.
    pub id: ExprId,
    /// Type of the compiled node.
    pub typ: Type,
    /// Whether the expression is an output, as decided by `is_output`.
    pub output: bool,
    /// Handle used to send the delete request on drop.
    rt: GXHandle,
}
466
467impl Drop for CompExp {
468 fn drop(&mut self) {
469 let _ = self.rt.0.send(ToRt::Delete { id: self.id });
470 }
471}
472
/// The result of compiling or loading source text.
pub struct CompRes {
    /// One handle per compiled top level expression.
    pub exprs: SmallVec<[CompExp; 1]>,
    /// A clone of the runtime environment taken after compilation.
    pub env: Env<GXCtx, NoUserEvent>,
}
477
/// An event sent by the runtime to its subscriber, in batches.
#[derive(Clone)]
pub enum RtEvent {
    /// The expression with this id produced a new value during a cycle.
    Updated(ExprId, Value),
    /// A clone of the environment, pushed after an expression is deleted.
    Env(Env<GXCtx, NoUserEvent>),
}
483
/// Shared pool from which the run loop takes the `Vec<RtEvent>` batch for
/// each cycle.
// NOTE(review): the two `Pool::new` arguments are presumably the pool's size
// limits — confirm against the netidx `Pool` documentation.
static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
485
/// Handle to a reference node compiled for a variable. Dropping it asks the
/// runtime to delete the node.
pub struct Ref {
    /// Id of the reference expression created for this handle.
    pub id: ExprId,
    /// The cached value of the variable at the time the reference was
    /// compiled, if there was one.
    pub last: Option<Value>,
    /// The variable this reference points at.
    pub bid: BindId,
    /// The entry for `bid` in the environment's `byref_chain`, if any; this
    /// is the variable written by `set_deref`.
    pub target_bid: Option<BindId>,
    /// Handle used to talk to the runtime.
    rt: GXHandle,
}
493
494impl Drop for Ref {
495 fn drop(&mut self) {
496 let _ = self.rt.0.send(ToRt::Delete { id: self.id });
497 }
498}
499
500impl Ref {
501 pub fn set(&self, v: Value) -> Result<()> {
502 self.rt.set(self.bid, v)
503 }
504
505 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
506 if let Some(id) = self.target_bid {
507 self.rt.set(id, v)?
508 }
509 Ok(())
510 }
511}
512
// Declares the `CallableId` type used to identify callables in the runtime.
atomic_id!(CallableId);

/// Handle to a lambda that can be called from Rust. Dropping it asks the
/// runtime to delete the callable.
pub struct Callable {
    /// Handle used to talk to the runtime.
    rt: GXHandle,
    /// Id of the callable inside the runtime.
    id: CallableId,
    /// Environment captured when the callable was compiled; used by `call`
    /// to type check arguments.
    env: Env<GXCtx, NoUserEvent>,
    /// The function type of the lambda.
    pub typ: FnType,
    /// Id of the apply expression created for this callable.
    pub expr: ExprId,
}
522
523impl Drop for Callable {
524 fn drop(&mut self) {
525 let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
526 }
527}
528
529impl Callable {
530 pub async fn call(&self, args: ValArray) -> Result<()> {
533 if self.typ.args.len() != args.len() {
534 bail!("expected {} args", self.typ.args.len())
535 }
536 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
537 if !a.typ.is_a(&self.env, v) {
538 bail!("type mismatch arg {i} expected {}", a.typ)
539 }
540 }
541 self.call_unchecked(args).await
542 }
543
544 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
548 self.rt
549 .0
550 .send(ToRt::Call { id: self.id, args })
551 .map_err(|_| anyhow!("runtime is dead"))
552 }
553}
554
/// Requests sent from handles (`GXHandle`, `CompExp`, `Ref`, `Callable`) to
/// the runtime task.
enum ToRt {
    /// Reply with a clone of the current environment.
    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
    /// Remove the top level node with this id.
    Delete { id: ExprId },
    /// Load and compile the file or module at `path`.
    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// Compile the given source text.
    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// Build a `Callable` for the lambda identified by the value `id`.
    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
    /// Build a `Ref` to the variable `id`.
    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
    /// Set variable `id` to `v`.
    Set { id: BindId, v: Value },
    /// Call the callable `id` with `args`.
    Call { id: CallableId, args: ValArray },
    /// Remove the callable `id` and its node.
    DeleteCallable { id: CallableId },
}
566
/// Runtime side record of a callable.
struct CallableInt {
    /// Id of the apply node created for the callable.
    expr: ExprId,
    /// One variable per argument; calling the callable sets these.
    args: Box<[BindId]>,
}
571
/// The runtime itself: owns the execution context and the compiled top
/// level nodes, and is driven by `run`.
struct GX {
    /// Execution context handed to the compiler and to every node update.
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// The event delivered to nodes during the current cycle.
    event: Event<NoUserEvent>,
    /// Expressions to update in the next cycle. The flag is copied into
    /// `event.init` before the node runs; it is `true` for freshly compiled
    /// nodes.
    updated: FxHashMap<ExprId, bool>,
    /// Compiled top level nodes keyed by expression id.
    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
    /// Callables created by `compile_callable`.
    callables: FxHashMap<CallableId, CallableInt>,
    /// Channel on which event batches are sent to the subscriber.
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
    /// Module resolvers used when resolving modules.
    resolvers: Arc<[ModuleResolver]>,
    /// Timeout passed to `commit` when flushing the publisher batch.
    publish_timeout: Option<Duration>,
    /// Last time idle RPC clients were pruned.
    last_rpc_gc: Instant,
}
583
584impl GX {
585 async fn new(mut rt: GXConfig) -> Result<Self> {
586 let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
587 None => r.push(ModuleResolver::Files("".into())),
588 Some(dd) => {
589 r.push(ModuleResolver::Files("".into()));
590 r.push(ModuleResolver::Files(dd.join("graphix")));
591 }
592 };
593 match std::env::var("GRAPHIX_MODPATH") {
594 Err(_) => resolvers_default(&mut rt.resolvers),
595 Ok(mp) => match ModuleResolver::parse_env(
596 rt.ctx.user.subscriber.clone(),
597 rt.resolve_timeout,
598 &mp,
599 ) {
600 Ok(r) => rt.resolvers.extend(r),
601 Err(e) => {
602 error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
603 resolvers_default(&mut rt.resolvers)
604 }
605 },
606 };
607 let mut t = Self {
608 ctx: rt.ctx,
609 event: Event::new(NoUserEvent),
610 updated: HashMap::default(),
611 nodes: IndexMap::default(),
612 callables: HashMap::default(),
613 sub: rt.sub,
614 resolvers: Arc::from(rt.resolvers),
615 publish_timeout: rt.publish_timeout,
616 last_rpc_gc: Instant::now(),
617 };
618 let st = Instant::now();
619 if let Some(root) = rt.root {
620 t.compile_root(root).await?;
621 }
622 info!("root init time: {:?}", st.elapsed());
623 Ok(t)
624 }
625
    /// Run one update cycle.
    ///
    /// Gathers the pending variable updates, task results, RPC calls,
    /// subscription updates and writes into `self.event`, updates every node
    /// marked in `self.updated`, sends the resulting batch to the
    /// subscriber, clears the event, and finally commits the publisher batch
    /// if anything was queued in it.
    ///
    /// Each id can carry one value per cycle; additional values for the same
    /// id are pushed onto the matching overflow queue in `GXCtx` and picked
    /// up by a later cycle.
    async fn do_cycle(
        &mut self,
        updates: Option<UpdateBatch>,
        writes: Option<WriteBatch>,
        tasks: &mut Vec<(BindId, Value)>,
        rpcs: &mut Vec<(BindId, RpcCall)>,
        to_rt: &mut UnboundedReceiver<ToRt>,
        input: &mut Vec<ToRt>,
        mut batch: Pooled<Vec<RtEvent>>,
    ) {
        // Put `$v` into `self.event.$event` under `$id` if that slot is
        // still free, and mark every expression that references `$id` (per
        // `self.ctx.user.$refed`) for update. If the slot is taken, defer
        // the value to `self.ctx.user.$overflow`.
        macro_rules! push_event {
            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
                match self.event.$event.entry($id) {
                    Entry::Vacant(e) => {
                        e.insert($v);
                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
                            for id in exps.keys() {
                                // keep an existing `true` (init) flag
                                self.updated.entry(*id).or_insert(false);
                            }
                        }
                    }
                    Entry::Occupied(_) => {
                        self.ctx.user.$overflow.push_back(($id, $v));
                    }
                }
            };
        }
        // The queue loops below run exactly `len()` times, with the length
        // read before the loop, so values pushed back onto the same queue as
        // overflow are not looked at again in this cycle.
        for _ in 0..self.ctx.user.var_updates.len() {
            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for (id, v) in tasks.drain(..) {
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for _ in 0..self.ctx.user.rpc_overflow.len() {
            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for (id, v) in rpcs.drain(..) {
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for _ in 0..self.ctx.user.net_updates.len() {
            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
            push_event!(id, v, netidx, subscribed, net_updates)
        }
        if let Some(mut updates) = updates {
            for (id, v) in updates.drain(..) {
                push_event!(id, v, netidx, subscribed, net_updates)
            }
        }
        for _ in 0..self.ctx.user.net_writes.len() {
            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
            push_event!(id, v, writes, published, net_writes)
        }
        if let Some(mut writes) = writes {
            for wr in writes.drain(..) {
                let id = wr.id;
                push_event!(id, wr, writes, published, net_writes)
            }
        }
        // Update every node that was marked, in the iteration order of
        // `self.nodes`.
        for (id, n) in self.nodes.iter_mut() {
            if let Some(init) = self.updated.get(id) {
                // variables injected only for this node's init, removed again below
                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
                self.event.init = *init;
                if self.event.init {
                    // On init, make the cached value of every external
                    // variable the node references visible in the event,
                    // unless the event already carries a value for it.
                    REFS.with_borrow_mut(|refs| {
                        refs.clear();
                        n.refs(refs);
                        refs.with_external_refs(|id| {
                            if let Some(v) = self.ctx.cached.get(&id) {
                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
                                    e.insert(v.clone());
                                    clear.push(id);
                                }
                            }
                        });
                    });
                }
                // A panic inside a node update is caught and logged so it
                // does not take down the whole runtime.
                let res = catch_unwind(AssertUnwindSafe(|| {
                    n.update(&mut self.ctx, &mut self.event)
                }));
                for id in clear {
                    self.event.variables.remove(&id);
                }
                match res {
                    Ok(None) => (),
                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
                    Err(e) => {
                        error!("could not update exprid: {id:?}, panic: {e:?}")
                    }
                }
            }
        }
        // Deliver the batch. If the subscriber does not accept it within
        // 100ms, drain and process whatever requests are waiting on `to_rt`
        // (their events are appended to the same batch) and try again.
        loop {
            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
                Ok(()) => break,
                Err(SendTimeoutError::Closed(_)) => {
                    error!("could not send batch");
                    break;
                }
                Err(SendTimeoutError::Timeout(b)) => {
                    // the unsent batch is handed back in the error
                    batch = b;
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                    self.process_input_batch(tasks, input, &mut batch).await;
                }
            }
        }
        self.event.clear();
        self.updated.clear();
        // Flush publisher updates queued during this cycle in a separate
        // task so the run loop does not wait for the commit.
        if self.ctx.user.batch.len() > 0 {
            let batch = mem::replace(
                &mut self.ctx.user.batch,
                self.ctx.user.publisher.start_batch(),
            );
            let timeout = self.publish_timeout;
            task::spawn(async move { batch.commit(timeout).await });
        }
    }
747
748 async fn process_input_batch(
749 &mut self,
750 tasks: &mut Vec<(BindId, Value)>,
751 input: &mut Vec<ToRt>,
752 batch: &mut Pooled<Vec<RtEvent>>,
753 ) {
754 for m in input.drain(..) {
755 match m {
756 ToRt::GetEnv { res } => {
757 let _ = res.send(self.ctx.env.clone());
758 }
759 ToRt::Compile { text, rt, res } => {
760 let _ = res.send(self.compile(rt, text).await);
761 }
762 ToRt::Load { path, rt, res } => {
763 let _ = res.send(self.load(rt, &path).await);
764 }
765 ToRt::Delete { id } => {
766 if let Some(mut n) = self.nodes.shift_remove(&id) {
767 n.delete(&mut self.ctx);
768 }
769 debug!("delete {id:?}");
770 batch.push(RtEvent::Env(self.ctx.env.clone()));
771 }
772 ToRt::CompileCallable { id, rt, res } => {
773 let _ = res.send(self.compile_callable(id, rt));
774 }
775 ToRt::CompileRef { id, rt, res } => {
776 let _ = res.send(self.compile_ref(rt, id));
777 }
778 ToRt::Set { id, v } => {
779 self.ctx.cached.insert(id, v.clone());
780 tasks.push((id, v))
781 }
782 ToRt::DeleteCallable { id } => self.delete_callable(id),
783 ToRt::Call { id, args } => {
784 if let Err(e) = self.call_callable(id, args, tasks) {
785 error!("calling callable {id:?} failed with {e:?}")
786 }
787 }
788 }
789 }
790 }
791
792 fn cycle_ready(&self) -> bool {
793 self.ctx.user.var_updates.len() > 0
794 || self.ctx.user.net_updates.len() > 0
795 || self.ctx.user.net_writes.len() > 0
796 || self.ctx.user.rpc_overflow.len() > 0
797 }
798
799 async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
800 let scope = ModPath::root();
801 let ori =
802 expr::parser::parse(None, text.clone()).context("parsing the root module")?;
803 let exprs = join_all(
804 ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
805 )
806 .await
807 .into_iter()
808 .collect::<Result<SmallVec<[_; 4]>>>()
809 .context(CouldNotResolve)?;
810 let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
811 let nodes = ori
812 .exprs
813 .iter()
814 .map(|e| {
815 compile(&mut self.ctx, &scope, e.clone())
816 .with_context(|| format!("compiling root expression {e}"))
817 })
818 .collect::<Result<SmallVec<[_; 4]>>>()
819 .with_context(|| ori.clone())?;
820 for (e, n) in ori.exprs.iter().zip(nodes.into_iter()) {
821 self.updated.insert(e.id, true);
822 self.nodes.insert(e.id, n);
823 }
824 Ok(())
825 }
826
827 async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
828 let scope = ModPath::root();
829 let ori = expr::parser::parse(None, text.clone())?;
830 let exprs = join_all(
831 ori.exprs.iter().map(|e| e.resolve_modules(&scope, &self.resolvers)),
832 )
833 .await
834 .into_iter()
835 .collect::<Result<SmallVec<[_; 4]>>>()
836 .context(CouldNotResolve)?;
837 let ori = Origin { exprs: Arc::from_iter(exprs), ..ori };
838 let nodes = ori
839 .exprs
840 .iter()
841 .map(|e| compile(&mut self.ctx, &scope, e.clone()))
842 .collect::<Result<SmallVec<[_; 4]>>>()
843 .with_context(|| ori.clone())?;
844 let exprs = ori
845 .exprs
846 .iter()
847 .zip(nodes.into_iter())
848 .map(|(e, n)| {
849 let output = is_output(&n);
850 let typ = n.typ().clone();
851 self.updated.insert(e.id, true);
852 self.nodes.insert(e.id, n);
853 CompExp { id: e.id, output, typ, rt: rt.clone() }
854 })
855 .collect::<SmallVec<[_; 1]>>();
856 Ok(CompRes { exprs, env: self.ctx.env.clone() })
857 }
858
859 async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
860 let scope = ModPath::root();
861 let st = Instant::now();
862 let (scope, ori) = match file.extension() {
863 Some(e) if e.as_bytes() == b"gx" => {
864 let scope = match file.file_name() {
865 None => scope,
866 Some(name) => ModPath(scope.append(&*name.to_string_lossy())),
867 };
868 let s = fs::read_to_string(file).await?;
869 let s = if s.starts_with("#!") {
870 if let Some(i) = s.find('\n') {
871 &s[i..]
872 } else {
873 s.as_str()
874 }
875 } else {
876 s.as_str()
877 };
878 let s = ArcStr::from(s);
879 let name = ArcStr::from(file.to_string_lossy());
880 (scope, expr::parser::parse(Some(name), s)?)
881 }
882 Some(e) => bail!("invalid file extension {e:?}"),
883 None => {
884 let name = file
885 .components()
886 .map(|c| match c {
887 Component::RootDir
888 | Component::CurDir
889 | Component::ParentDir
890 | Component::Prefix(_) => bail!("invalid module name {file:?}"),
891 Component::Normal(s) => Ok(s),
892 })
893 .collect::<Result<Box<[_]>>>()?;
894 if name.len() != 1 {
895 bail!("invalid module name {file:?}")
896 }
897 let name = String::from_utf8_lossy(name[0].as_bytes());
898 let name = name
899 .parse::<ModPath>()
900 .with_context(|| "parsing module name {file:?}")?;
901 let scope =
902 ModPath(Path::from_str(Path::dirname(&*name).unwrap_or(&*scope)));
903 let name = Path::basename(&*name)
904 .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
905 let name = ArcStr::from(name);
906 let e = ExprKind::Module {
907 export: true,
908 name: name.clone(),
909 value: ModuleKind::Unresolved,
910 }
911 .to_expr(Default::default());
912 let ori = Origin {
913 name: Some(name),
914 source: literal!(""),
915 exprs: Arc::from_iter([e]),
916 };
917 (scope, ori)
918 }
919 };
920 let expr = ExprKind::Module {
921 export: true,
922 name: ori.name.clone().unwrap_or(literal!("")),
923 value: ModuleKind::Inline(ori.exprs.clone()),
924 }
925 .to_expr(SourcePosition::default());
926 info!("parse time: {:?}", st.elapsed());
927 let st = Instant::now();
928 let expr = expr
929 .resolve_modules(&scope, &self.resolvers)
930 .await
931 .with_context(|| ori.clone())?;
932 info!("resolve time: {:?}", st.elapsed());
933 let top_id = expr.id;
934 let n = compile(&mut self.ctx, &scope, expr).with_context(|| ori.clone())?;
935 let has_out = is_output(&n);
936 let typ = n.typ().clone();
937 self.nodes.insert(top_id, n);
938 self.updated.insert(top_id, true);
939 let exprs = smallvec![CompExp { id: top_id, output: has_out, typ, rt }];
940 Ok(CompRes { exprs, env: self.ctx.env.clone() })
941 }
942
943 fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
944 let id = match id {
945 Value::U64(id) => LambdaId::from(id),
946 v => bail!("invalid lambda id {v}"),
947 };
948 let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
949 let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
950 let args = lb.typ.args.iter();
951 let args = args
952 .map(|a| {
953 if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
954 bail!("can't call lambda with an optional argument from rust")
955 } else {
956 Ok(BindId::new())
957 }
958 })
959 .collect::<Result<Box<[_]>>>()?;
960 let eid = ExprId::new();
961 let argn = lb.typ.args.iter().zip(args.iter());
962 let argn = argn
963 .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
964 .collect::<Vec<_>>();
965 let fnode = genn::constant(Value::U64(id.inner()));
966 let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
967 self.event.init = true;
968 n.update(&mut self.ctx, &mut self.event);
969 self.event.clear();
970 let cid = CallableId::new();
971 self.callables.insert(cid, CallableInt { expr: eid, args });
972 self.nodes.insert(eid, n);
973 let env = self.ctx.env.clone();
974 Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
975 }
976
977 fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
978 let eid = ExprId::new();
979 let typ = Type::Any;
980 let n = genn::reference(&mut self.ctx, id, typ, eid);
981 self.nodes.insert(eid, n);
982 let target_bid = self.ctx.env.byref_chain.get(&id).copied();
983 Ok(Ref {
984 id: eid,
985 bid: id,
986 target_bid,
987 last: self.ctx.cached.get(&id).cloned(),
988 rt,
989 })
990 }
991
992 fn call_callable(
993 &mut self,
994 id: CallableId,
995 args: ValArray,
996 tasks: &mut Vec<(BindId, Value)>,
997 ) -> Result<()> {
998 let c =
999 self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
1000 if args.len() != c.args.len() {
1001 bail!("expected {} arguments", c.args.len());
1002 }
1003 let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
1004 tasks.extend(a);
1005 Ok(())
1006 }
1007
1008 fn delete_callable(&mut self, id: CallableId) {
1009 if let Some(c) = self.callables.remove(&id) {
1010 if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1011 n.delete(&mut self.ctx)
1012 }
1013 }
1014 }
1015
    /// The main loop of the runtime.
    ///
    /// Each iteration waits for the first source of work to become ready
    /// (published RPC calls, writes, subscription updates, finished tasks,
    /// deferred work from an earlier cycle, or requests on `to_rt`), then
    /// opportunistically collects whatever else is immediately available,
    /// processes the queued requests and runs one cycle.
    ///
    /// Returns `Ok(())` when `recv_many` on `to_rt` reports zero messages.
    async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
        let mut tasks = vec![];
        let mut input = vec![];
        let mut rpcs = vec![];
        let onemin = Duration::from_secs(60);
        'main: loop {
            let now = Instant::now();
            // true when an earlier cycle left deferred work behind
            let ready = self.cycle_ready();
            let mut updates = None;
            let mut writes = None;
            // Non-blocking collection of whatever is available right now on
            // the named sources. A channel is only read while its overflow
            // queue is empty, so deferred values go first.
            macro_rules! peek {
                (updates) => {
                    if self.ctx.user.net_updates.is_empty() {
                        // merge every available batch into one
                        while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
                            match &mut updates {
                                None => updates = Some(up),
                                Some(prev) => prev.extend(up.drain(..)),
                            }
                        }
                    }
                };
                (writes) => {
                    if self.ctx.user.net_writes.is_empty() {
                        // at most one write batch is taken per cycle
                        if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
                            writes = Some(wr);
                        }
                    }
                };
                (tasks) => {
                    // stops at the first task that did not finish cleanly
                    while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
                        tasks.push(up);
                    }
                };
                (rpcs) => {
                    if self.ctx.user.rpc_overflow.is_empty() {
                        while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
                            rpcs.push(up);
                        }
                    }
                };
                (input) => {
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                };
                ($($item:tt),+) => {{
                    $(peek!($item));+
                }};
            }
            select! {
                rp = maybe_next(
                    self.ctx.user.rpc_overflow.is_empty(),
                    &mut self.ctx.user.rpcs
                ) => {
                    rpcs.push(rp);
                    peek!(updates, tasks, writes, rpcs, input)
                }
                wr = maybe_next(
                    self.ctx.user.net_writes.is_empty(),
                    &mut self.ctx.user.writes
                ) => {
                    writes = Some(wr);
                    peek!(updates, tasks, rpcs, input);
                },
                up = maybe_next(
                    self.ctx.user.net_updates.is_empty(),
                    &mut self.ctx.user.updates
                ) => {
                    updates = Some(up);
                    peek!(updates, writes, tasks, rpcs, input);
                },
                up = join_or_wait(&mut self.ctx.user.tasks) => {
                    // a task that failed to join is dropped silently
                    if let Ok(up) = up {
                        tasks.push(up);
                    }
                    peek!(updates, writes, tasks, rpcs, input)
                },
                _ = or_never(ready) => {
                    peek!(updates, writes, tasks, rpcs, input)
                },
                n = to_rt.recv_many(&mut input, 100000) => {
                    // zero messages received: stop the runtime
                    if n == 0 {
                        break 'main Ok(())
                    }
                    peek!(updates, writes, tasks, rpcs);
                },
                () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
                    // Drop every released subscription that is at least a
                    // second old, then go back to waiting without running a
                    // cycle.
                    while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
                        if ts.elapsed() >= Duration::from_secs(1) {
                            self.ctx.user.pending_unsubscribe.pop_front();
                        } else {
                            break
                        }
                    }
                    continue 'main
                },
            }
            let mut batch = BATCH.take();
            self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
            self.do_cycle(
                updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
            )
            .await;
            // At most once a minute, drop RPC clients that have not been
            // used for more than a minute.
            if !self.ctx.user.rpc_clients.is_empty() {
                if now - self.last_rpc_gc >= onemin {
                    self.last_rpc_gc = now;
                    self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
                }
            }
        }
    }
1127}
1128
/// A cloneable handle to a running runtime; wraps the sending side of the
/// runtime's request channel.
#[derive(Clone)]
pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1133
1134impl GXHandle {
1135 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1136 let (tx, rx) = oneshot::channel();
1137 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1138 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1139 }
1140
1141 pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1143 self.exec(|res| ToRt::GetEnv { res }).await
1144 }
1145
1146 pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1152 Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1153 }
1154
1155 pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1163 Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1164 }
1165
1166 pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1170 Ok(self
1171 .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1172 .await??)
1173 }
1174
1175 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1179 Ok(self
1180 .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1181 .await??)
1182 }
1183
1184 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1187 let v = v.into();
1188 self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1189 }
1190}
1191
/// Configuration of a runtime; build it with `GXConfig::builder` and start
/// the runtime with `start`.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig {
    /// Timeout passed to `ModuleResolver::parse_env` when resolvers are
    /// read from `GRAPHIX_MODPATH`.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// Timeout passed to `commit` when publisher batches are flushed.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context the runtime will own.
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// Source text of the root module, compiled when the runtime starts.
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// Module resolvers placed ahead of those from `GRAPHIX_MODPATH` (or
    /// the defaults).
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// Channel on which the runtime sends batches of `RtEvent`s.
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
}
1214
1215impl GXConfig {
1216 pub fn builder(
1218 ctx: ExecCtx<GXCtx, NoUserEvent>,
1219 sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1220 ) -> GXConfigBuilder {
1221 GXConfigBuilder::default().ctx(ctx).sub(sub)
1222 }
1223
1224 pub async fn start(self) -> Result<GXHandle> {
1235 let (init_tx, init_rx) = oneshot::channel();
1236 let (tx, rx) = tmpsc::unbounded_channel();
1237 task::spawn(async move {
1238 match GX::new(self).await {
1239 Ok(bs) => {
1240 let _ = init_tx.send(Ok(()));
1241 if let Err(e) = bs.run(rx).await {
1242 error!("run loop exited with error {e:?}")
1243 }
1244 }
1245 Err(e) => {
1246 let _ = init_tx.send(Err(e));
1247 }
1248 };
1249 });
1250 init_rx.await??;
1251 Ok(GXHandle(tx))
1252 }
1253}