1use anyhow::{anyhow, bail, Context, Result};
13use arcstr::{literal, ArcStr};
14use chrono::prelude::*;
15use combine::stream::position::SourcePosition;
16use compact_str::format_compact;
17use core::fmt;
18use derive_builder::Builder;
19use futures::{channel::mpsc, future::try_join_all, FutureExt, StreamExt};
20use fxhash::{FxBuildHasher, FxHashMap};
21use graphix_compiler::{
22 compile,
23 env::Env,
24 expr::{
25 self, Expr, ExprId, ExprKind, ModPath, ModuleKind, ModuleResolver, Origin, Source,
26 },
27 node::genn,
28 typ::{FnType, Type},
29 BindId, Ctx, Event, ExecCtx, LambdaId, NoUserEvent, Node, REFS,
30};
31use indexmap::IndexMap;
32use log::{debug, error, info};
33use netidx::{
34 path::Path,
35 pool::{Pool, Pooled},
36 protocol::valarray::ValArray,
37 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
38 resolver_client::ChangeTracker,
39 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
40};
41use netidx_core::atomic_id;
42use netidx_protocols::rpc::{
43 self,
44 server::{ArgSpec, RpcCall},
45};
46use serde_derive::{Deserialize, Serialize};
47use smallvec::{smallvec, SmallVec};
48use std::{
49 collections::{hash_map::Entry, HashMap, VecDeque},
50 future, mem,
51 os::unix::ffi::OsStrExt,
52 panic::{catch_unwind, AssertUnwindSafe},
53 path::{Component, PathBuf},
54 result,
55 sync::{LazyLock, Weak},
56 time::Duration,
57};
58use tokio::{
59 fs, select,
60 sync::{
61 mpsc::{self as tmpsc, error::SendTimeoutError, UnboundedReceiver},
62 oneshot, Mutex,
63 },
64 task::{self, JoinError, JoinSet},
65 time::{self, Instant},
66};
67use triomphe::Arc;
68
/// A pooled batch of subscription events, each tagged with the `SubId` it belongs to.
type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
/// A pooled batch of write requests.
type WriteBatch = Pooled<Vec<WriteRequest>>;
71
/// Marker value attached as error context when module resolution fails.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    /// Writes the fixed, human readable description of the failure.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("could not resolve module")
    }
}
80
/// A cached RPC client handle together with the time it was last used.
#[derive(Debug)]
struct RpcClient {
    /// The client side procedure handle used to perform calls.
    proc: rpc::client::Proc,
    /// Refreshed on every call; the run loop drops clients idle for over a minute.
    last_used: Instant,
}
86
87#[derive(Debug)]
88pub struct GXCtx {
89 by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
90 subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
91 published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
92 var_updates: VecDeque<(BindId, Value)>,
93 net_updates: VecDeque<(SubId, subscriber::Event)>,
94 net_writes: VecDeque<(Id, WriteRequest)>,
95 rpc_overflow: VecDeque<(BindId, RpcCall)>,
96 rpc_clients: FxHashMap<Path, RpcClient>,
97 published_rpcs: FxHashMap<Path, rpc::server::Proc>,
98 pending_unsubscribe: VecDeque<(Instant, Dval)>,
99 change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
100 tasks: JoinSet<(BindId, Value)>,
101 batch: publisher::UpdateBatch,
102 publisher: Publisher,
103 subscriber: Subscriber,
104 updates_tx: mpsc::Sender<UpdateBatch>,
105 updates: mpsc::Receiver<UpdateBatch>,
106 writes_tx: mpsc::Sender<WriteBatch>,
107 writes: mpsc::Receiver<WriteBatch>,
108 rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
109 rpcs: mpsc::Receiver<(BindId, RpcCall)>,
110}
111
112impl GXCtx {
113 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
114 let (updates_tx, updates) = mpsc::channel(100);
115 let (writes_tx, writes) = mpsc::channel(100);
116 let (rpcs_tx, rpcs) = mpsc::channel(100);
117 let batch = publisher.start_batch();
118 let mut tasks = JoinSet::new();
119 tasks.spawn(async { future::pending().await });
120 Self {
121 by_ref: HashMap::default(),
122 var_updates: VecDeque::new(),
123 net_updates: VecDeque::new(),
124 net_writes: VecDeque::new(),
125 rpc_overflow: VecDeque::new(),
126 rpc_clients: HashMap::default(),
127 subscribed: HashMap::default(),
128 pending_unsubscribe: VecDeque::new(),
129 published: HashMap::default(),
130 change_trackers: HashMap::default(),
131 published_rpcs: HashMap::default(),
132 tasks,
133 batch,
134 publisher,
135 subscriber,
136 updates,
137 updates_tx,
138 writes,
139 writes_tx,
140 rpcs_tx,
141 rpcs,
142 }
143 }
144}
145
// Unwrap `$e`, or return early from the enclosing function/async block with
// `($bindid, Value::Error(..))` carrying the debug rendering of the error.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Err(err) => {
                let msg = format_compact!("{err:?}");
                return ($bindid, Value::Error(ArcStr::from(msg.as_str())));
            }
            Ok(v) => v,
        }
    };
}
158
// Lock the change tracker `$ct`, re-create it if it tracks a different path
// than `$path`, and ask `$resolver` whether anything changed. When nothing
// changed the enclosing function/async block returns `($id, Value::Null)`.
// The lock guard is bound in the caller's scope, so it stays held for the
// remainder of that scope.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
170
171impl Ctx for GXCtx {
172 fn clear(&mut self) {
173 let Self {
174 by_ref,
175 var_updates,
176 net_updates,
177 net_writes,
178 rpc_clients,
179 rpc_overflow,
180 subscribed,
181 published,
182 published_rpcs,
183 pending_unsubscribe,
184 change_trackers,
185 tasks,
186 batch,
187 publisher,
188 subscriber: _,
189 updates_tx,
190 updates,
191 writes_tx,
192 writes,
193 rpcs,
194 rpcs_tx,
195 } = self;
196 by_ref.clear();
197 var_updates.clear();
198 net_updates.clear();
199 net_writes.clear();
200 rpc_overflow.clear();
201 rpc_clients.clear();
202 subscribed.clear();
203 published.clear();
204 published_rpcs.clear();
205 pending_unsubscribe.clear();
206 change_trackers.clear();
207 *tasks = JoinSet::new();
208 tasks.spawn(async { future::pending().await });
209 *batch = publisher.start_batch();
210 let (tx, rx) = mpsc::channel(3);
211 *updates_tx = tx;
212 *updates = rx;
213 let (tx, rx) = mpsc::channel(100);
214 *writes_tx = tx;
215 *writes = rx;
216 let (tx, rx) = mpsc::channel(100);
217 *rpcs_tx = tx;
218 *rpcs = rx
219 }
220
221 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
222 let now = Instant::now();
223 let proc = match self.rpc_clients.entry(name) {
224 Entry::Occupied(mut e) => {
225 let cl = e.get_mut();
226 cl.last_used = now;
227 Ok(cl.proc.clone())
228 }
229 Entry::Vacant(e) => {
230 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
231 Err(e) => Err(e),
232 Ok(proc) => {
233 let cl = RpcClient { last_used: now, proc: proc.clone() };
234 e.insert(cl);
235 Ok(proc)
236 }
237 }
238 }
239 };
240 self.tasks.spawn(async move {
241 macro_rules! err {
242 ($e:expr) => {{
243 let e = format_compact!("{:?}", $e);
244 (id, Value::Error(e.as_str().into()))
245 }};
246 }
247 match proc {
248 Err(e) => err!(e),
249 Ok(proc) => match proc.call(args).await {
250 Err(e) => err!(e),
251 Ok(res) => (id, res),
252 },
253 }
254 });
255 }
256
257 fn publish_rpc(
258 &mut self,
259 name: Path,
260 doc: Value,
261 spec: Vec<ArgSpec>,
262 id: BindId,
263 ) -> Result<()> {
264 use rpc::server::Proc;
265 let e = match self.published_rpcs.entry(name) {
266 Entry::Vacant(e) => e,
267 Entry::Occupied(_) => bail!("already published"),
268 };
269 let proc = Proc::new(
270 &self.publisher,
271 e.key().clone(),
272 doc,
273 spec,
274 move |c| Some((id, c)),
275 Some(self.rpcs_tx.clone()),
276 )?;
277 e.insert(proc);
278 Ok(())
279 }
280
281 fn unpublish_rpc(&mut self, name: Path) {
282 self.published_rpcs.remove(&name);
283 }
284
285 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
286 let dval =
287 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
288 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
289 dval
290 }
291
292 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
293 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
294 if let Some(cn) = exprs.get_mut(&ref_by) {
295 *cn -= 1;
296 if *cn == 0 {
297 exprs.remove(&ref_by);
298 }
299 }
300 if exprs.is_empty() {
301 self.subscribed.remove(&dv.id());
302 }
303 }
304 self.pending_unsubscribe.push_back((Instant::now(), dv));
305 }
306
307 fn list(&mut self, id: BindId, path: Path) {
308 let ct = self
309 .change_trackers
310 .entry(id)
311 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
312 let ct = Arc::clone(ct);
313 let resolver = self.subscriber.resolver();
314 self.tasks.spawn(async move {
315 check_changed!(id, resolver, path, ct);
316 let mut paths = or_err!(id, resolver.list(path).await);
317 let paths = paths.drain(..).map(|p| Value::String(p.into()));
318 (id, Value::Array(ValArray::from_iter_exact(paths)))
319 });
320 }
321
322 fn list_table(&mut self, id: BindId, path: Path) {
323 let ct = self
324 .change_trackers
325 .entry(id)
326 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
327 let ct = Arc::clone(ct);
328 let resolver = self.subscriber.resolver();
329 self.tasks.spawn(async move {
330 check_changed!(id, resolver, path, ct);
331 let mut tbl = or_err!(id, resolver.table(path).await);
332 let cols = tbl.cols.drain(..).map(|(name, count)| {
333 Value::Array(ValArray::from([
334 Value::String(name.into()),
335 Value::V64(count.0),
336 ]))
337 });
338 let cols = Value::Array(ValArray::from_iter_exact(cols));
339 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
340 let rows = Value::Array(ValArray::from_iter_exact(rows));
341 let tbl = Value::Array(ValArray::from([
342 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
343 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
344 ]));
345 (id, tbl)
346 });
347 }
348
349 fn stop_list(&mut self, id: BindId) {
350 self.change_trackers.remove(&id);
351 }
352
353 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
354 let val = self.publisher.publish_with_flags_and_writes(
355 PublishFlags::empty(),
356 path,
357 value,
358 Some(self.writes_tx.clone()),
359 )?;
360 let id = val.id();
361 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
362 Ok(val)
363 }
364
365 fn update(&mut self, val: &Val, value: Value) {
366 val.update(&mut self.batch, value);
367 }
368
369 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
370 if let Some(refs) = self.published.get_mut(&val.id()) {
371 if let Some(cn) = refs.get_mut(&ref_by) {
372 *cn -= 1;
373 if *cn == 0 {
374 refs.remove(&ref_by);
375 }
376 }
377 if refs.is_empty() {
378 self.published.remove(&val.id());
379 }
380 }
381 }
382
383 fn set_timer(&mut self, id: BindId, timeout: Duration) {
384 self.tasks
385 .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
386 }
387
388 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
389 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
390 }
391
392 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
393 if let Some(refs) = self.by_ref.get_mut(&id) {
394 if let Some(cn) = refs.get_mut(&ref_by) {
395 *cn -= 1;
396 if *cn == 0 {
397 refs.remove(&ref_by);
398 }
399 }
400 if refs.is_empty() {
401 self.by_ref.remove(&id);
402 }
403 }
404 }
405
406 fn set_var(&mut self, id: BindId, value: Value) {
407 self.var_updates.push_back((id, value.clone()));
408 }
409}
410
411fn is_output(n: &Node<GXCtx, NoUserEvent>) -> bool {
412 match &n.spec().kind {
413 ExprKind::Bind { .. }
414 | ExprKind::Lambda { .. }
415 | ExprKind::Use { .. }
416 | ExprKind::Connect { .. }
417 | ExprKind::Module { .. }
418 | ExprKind::TypeDef { .. } => false,
419 _ => true,
420 }
421}
422
/// Completes immediately when `b` is true and never completes otherwise.
async fn or_never(b: bool) {
    if b {
        return;
    }
    future::pending::<()>().await
}
428
429async fn join_or_wait(
430 js: &mut JoinSet<(BindId, Value)>,
431) -> result::Result<(BindId, Value), JoinError> {
432 match js.join_next().await {
433 None => future::pending().await,
434 Some(r) => r,
435 }
436}
437
438async fn maybe_next<T>(go: bool, ch: &mut mpsc::Receiver<T>) -> T {
439 if go {
440 match ch.next().await {
441 None => future::pending().await,
442 Some(v) => v,
443 }
444 } else {
445 future::pending().await
446 }
447}
448
449async fn unsubscribe_ready(pending: &VecDeque<(Instant, Dval)>, now: Instant) {
450 if pending.len() == 0 {
451 future::pending().await
452 } else {
453 let (ts, _) = pending.front().unwrap();
454 let one = Duration::from_secs(1);
455 let elapsed = now - *ts;
456 if elapsed < one {
457 time::sleep(one - elapsed).await
458 }
459 }
460}
461
462pub struct CompExp {
463 pub id: ExprId,
464 pub typ: Type,
465 pub output: bool,
466 rt: GXHandle,
467}
468
469impl Drop for CompExp {
470 fn drop(&mut self) {
471 let _ = self.rt.0.send(ToRt::Delete { id: self.id });
472 }
473}
474
/// The result of compiling or loading source text.
pub struct CompRes {
    /// One entry per compiled top level expression.
    pub exprs: SmallVec<[CompExp; 1]>,
    /// A clone of the runtime environment taken after compilation.
    pub env: Env<GXCtx, NoUserEvent>,
}
479
/// Events the runtime sends to its subscriber in batches.
#[derive(Clone)]
pub enum RtEvent {
    /// The expression with this id produced a new value.
    Updated(ExprId, Value),
    /// A clone of the environment; pushed after an expression is deleted.
    Env(Env<GXCtx, NoUserEvent>),
}
485
// Pool the run loop takes event batches from.
// NOTE(review): the arguments are presumably pool size and max element capacity; confirm.
static BATCH: LazyLock<Pool<Vec<RtEvent>>> = LazyLock::new(|| Pool::new(10, 1000000));
487
488pub struct Ref {
489 pub id: ExprId,
490 pub last: Option<Value>,
491 pub bid: BindId,
492 pub target_bid: Option<BindId>,
493 rt: GXHandle,
494}
495
496impl Drop for Ref {
497 fn drop(&mut self) {
498 let _ = self.rt.0.send(ToRt::Delete { id: self.id });
499 }
500}
501
502impl Ref {
503 pub fn set(&self, v: Value) -> Result<()> {
504 self.rt.set(self.bid, v)
505 }
506
507 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
508 if let Some(id) = self.target_bid {
509 self.rt.set(id, v)?
510 }
511 Ok(())
512 }
513}
514
// Declares the `CallableId` type used to key callables inside the runtime.
atomic_id!(CallableId);
516
517pub struct Callable {
518 rt: GXHandle,
519 id: CallableId,
520 env: Env<GXCtx, NoUserEvent>,
521 pub typ: FnType,
522 pub expr: ExprId,
523}
524
525impl Drop for Callable {
526 fn drop(&mut self) {
527 let _ = self.rt.0.send(ToRt::DeleteCallable { id: self.id });
528 }
529}
530
531impl Callable {
532 pub async fn call(&self, args: ValArray) -> Result<()> {
535 if self.typ.args.len() != args.len() {
536 bail!("expected {} args", self.typ.args.len())
537 }
538 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
539 if !a.typ.is_a(&self.env, v) {
540 bail!("type mismatch arg {i} expected {}", a.typ)
541 }
542 }
543 self.call_unchecked(args).await
544 }
545
546 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
550 self.rt
551 .0
552 .send(ToRt::Call { id: self.id, args })
553 .map_err(|_| anyhow!("runtime is dead"))
554 }
555}
556
/// Messages sent from handles to the runtime task.
enum ToRt {
    /// Reply with a clone of the current environment.
    GetEnv { res: oneshot::Sender<Env<GXCtx, NoUserEvent>> },
    /// Delete the node with this expression id.
    Delete { id: ExprId },
    /// Load and compile the file or module at `path`.
    Load { path: PathBuf, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// Compile `text`.
    Compile { text: ArcStr, rt: GXHandle, res: oneshot::Sender<Result<CompRes>> },
    /// Build a `Callable` for the lambda identified by `id`.
    CompileCallable { id: Value, rt: GXHandle, res: oneshot::Sender<Result<Callable>> },
    /// Build a `Ref` for the variable `id`.
    CompileRef { id: BindId, rt: GXHandle, res: oneshot::Sender<Result<Ref>> },
    /// Set the variable `id` to `v`.
    Set { id: BindId, v: Value },
    /// Call the callable `id` with `args`.
    Call { id: CallableId, args: ValArray },
    /// Delete the callable `id` and its node.
    DeleteCallable { id: CallableId },
}
568
/// Runtime side record of a callable.
struct CallableInt {
    /// Id of the apply node created for the callable.
    expr: ExprId,
    /// One variable per argument; a call sets these variables.
    args: Box<[BindId]>,
}
573
/// The runtime itself: owns the execution context, the compiled nodes and the
/// channel on which event batches are sent.
struct GX {
    ctx: ExecCtx<GXCtx, NoUserEvent>,
    /// Events gathered for the cycle in progress.
    event: Event<NoUserEvent>,
    /// Expressions to update in the next cycle; the flag is copied into `event.init`.
    updated: FxHashMap<ExprId, bool>,
    /// Compiled top level nodes in insertion order.
    nodes: IndexMap<ExprId, Node<GXCtx, NoUserEvent>, FxBuildHasher>,
    callables: FxHashMap<CallableId, CallableInt>,
    /// Where batches of `RtEvent` are sent at the end of each cycle.
    sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
    /// Module resolvers used when compiling and loading.
    resolvers: Arc<[ModuleResolver]>,
    /// Timeout passed to the publisher batch commit.
    publish_timeout: Option<Duration>,
    /// Last time idle rpc clients were collected.
    last_rpc_gc: Instant,
}
585
586impl GX {
587 async fn new(mut rt: GXConfig) -> Result<Self> {
588 let resolvers_default = |r: &mut Vec<ModuleResolver>| match dirs::data_dir() {
589 None => r.push(ModuleResolver::Files("".into())),
590 Some(dd) => {
591 r.push(ModuleResolver::Files("".into()));
592 r.push(ModuleResolver::Files(dd.join("graphix")));
593 }
594 };
595 match std::env::var("GRAPHIX_MODPATH") {
596 Err(_) => resolvers_default(&mut rt.resolvers),
597 Ok(mp) => match ModuleResolver::parse_env(
598 rt.ctx.user.subscriber.clone(),
599 rt.resolve_timeout,
600 &mp,
601 ) {
602 Ok(r) => rt.resolvers.extend(r),
603 Err(e) => {
604 error!("failed to parse GRAPHIX_MODPATH, using default {e:?}");
605 resolvers_default(&mut rt.resolvers)
606 }
607 },
608 };
609 let mut t = Self {
610 ctx: rt.ctx,
611 event: Event::new(NoUserEvent),
612 updated: HashMap::default(),
613 nodes: IndexMap::default(),
614 callables: HashMap::default(),
615 sub: rt.sub,
616 resolvers: Arc::from(rt.resolvers),
617 publish_timeout: rt.publish_timeout,
618 last_rpc_gc: Instant::now(),
619 };
620 let st = Instant::now();
621 if let Some(root) = rt.root {
622 t.compile_root(root).await?;
623 }
624 info!("root init time: {:?}", st.elapsed());
625 Ok(t)
626 }
627
    /// Run one evaluation cycle.
    ///
    /// Gathers queued and freshly received events into `self.event`, updates
    /// every node listed in `self.updated`, sends the resulting batch on
    /// `self.sub`, then clears the per-cycle state and commits the publisher
    /// batch if it is non-empty.
    async fn do_cycle(
        &mut self,
        updates: Option<UpdateBatch>,
        writes: Option<WriteBatch>,
        tasks: &mut Vec<(BindId, Value)>,
        rpcs: &mut Vec<(BindId, RpcCall)>,
        to_rt: &mut UnboundedReceiver<ToRt>,
        input: &mut Vec<ToRt>,
        mut batch: Pooled<Vec<RtEvent>>,
    ) {
        // Record one event for `$id`. The first event per id in a cycle is
        // stored and the expressions referencing the id are marked for update;
        // any further event for the same id is pushed onto the overflow queue.
        macro_rules! push_event {
            ($id:expr, $v:expr, $event:ident, $refed:ident, $overflow:ident) => {
                match self.event.$event.entry($id) {
                    Entry::Vacant(e) => {
                        e.insert($v);
                        if let Some(exps) = self.ctx.user.$refed.get(&$id) {
                            for id in exps.keys() {
                                self.updated.entry(*id).or_insert(false);
                            }
                        }
                    }
                    Entry::Occupied(_) => {
                        self.ctx.user.$overflow.push_back(($id, $v));
                    }
                }
            };
        }
        // Each queue is drained by its length at entry because push_event may
        // push overflowing items back onto the same queue.
        for _ in 0..self.ctx.user.var_updates.len() {
            let (id, v) = self.ctx.user.var_updates.pop_front().unwrap();
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for (id, v) in tasks.drain(..) {
            push_event!(id, v, variables, by_ref, var_updates)
        }
        for _ in 0..self.ctx.user.rpc_overflow.len() {
            let (id, v) = self.ctx.user.rpc_overflow.pop_front().unwrap();
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for (id, v) in rpcs.drain(..) {
            push_event!(id, v, rpc_calls, by_ref, rpc_overflow)
        }
        for _ in 0..self.ctx.user.net_updates.len() {
            let (id, v) = self.ctx.user.net_updates.pop_front().unwrap();
            push_event!(id, v, netidx, subscribed, net_updates)
        }
        if let Some(mut updates) = updates {
            for (id, v) in updates.drain(..) {
                push_event!(id, v, netidx, subscribed, net_updates)
            }
        }
        for _ in 0..self.ctx.user.net_writes.len() {
            let (id, v) = self.ctx.user.net_writes.pop_front().unwrap();
            push_event!(id, v, writes, published, net_writes)
        }
        if let Some(mut writes) = writes {
            for wr in writes.drain(..) {
                let id = wr.id;
                push_event!(id, wr, writes, published, net_writes)
            }
        }
        for (id, n) in self.nodes.iter_mut() {
            if let Some(init) = self.updated.get(id) {
                // Variables injected only for this node's init; removed again below.
                let mut clear: SmallVec<[BindId; 16]> = smallvec![];
                self.event.init = *init;
                if self.event.init {
                    // On init, seed the event with cached values of the node's
                    // external refs that have no event of their own this cycle.
                    REFS.with_borrow_mut(|refs| {
                        refs.clear();
                        n.refs(refs);
                        refs.with_external_refs(|id| {
                            if let Some(v) = self.ctx.cached.get(&id) {
                                if let Entry::Vacant(e) = self.event.variables.entry(id) {
                                    e.insert(v.clone());
                                    clear.push(id);
                                }
                            }
                        });
                    });
                }
                // A panic inside a node update is caught and logged rather
                // than unwinding through the runtime.
                let res = catch_unwind(AssertUnwindSafe(|| {
                    n.update(&mut self.ctx, &mut self.event)
                }));
                for id in clear {
                    self.event.variables.remove(&id);
                }
                match res {
                    Ok(None) => (),
                    Ok(Some(v)) => batch.push(RtEvent::Updated(*id, v)),
                    Err(e) => {
                        error!("could not update exprid: {id:?}, panic: {e:?}")
                    }
                }
            }
        }
        // Deliver the batch. While the receiver is full, keep servicing
        // incoming runtime messages between send attempts.
        loop {
            match self.sub.send_timeout(batch, Duration::from_millis(100)).await {
                Ok(()) => break,
                Err(SendTimeoutError::Closed(_)) => {
                    error!("could not send batch");
                    break;
                }
                Err(SendTimeoutError::Timeout(b)) => {
                    batch = b;
                    while let Ok(m) = to_rt.try_recv() {
                        input.push(m);
                    }
                    self.process_input_batch(tasks, input, &mut batch).await;
                }
            }
        }
        self.event.clear();
        self.updated.clear();
        if self.ctx.user.batch.len() > 0 {
            // Swap in a fresh publisher batch and commit the old one on its own task.
            let batch = mem::replace(
                &mut self.ctx.user.batch,
                self.ctx.user.publisher.start_batch(),
            );
            let timeout = self.publish_timeout;
            task::spawn(async move { batch.commit(timeout).await });
        }
    }
749
750 async fn process_input_batch(
751 &mut self,
752 tasks: &mut Vec<(BindId, Value)>,
753 input: &mut Vec<ToRt>,
754 batch: &mut Pooled<Vec<RtEvent>>,
755 ) {
756 for m in input.drain(..) {
757 match m {
758 ToRt::GetEnv { res } => {
759 let _ = res.send(self.ctx.env.clone());
760 }
761 ToRt::Compile { text, rt, res } => {
762 let _ = res.send(self.compile(rt, text).await);
763 }
764 ToRt::Load { path, rt, res } => {
765 let _ = res.send(self.load(rt, &path).await);
766 }
767 ToRt::Delete { id } => {
768 if let Some(mut n) = self.nodes.shift_remove(&id) {
769 n.delete(&mut self.ctx);
770 }
771 debug!("delete {id:?}");
772 batch.push(RtEvent::Env(self.ctx.env.clone()));
773 }
774 ToRt::CompileCallable { id, rt, res } => {
775 let _ = res.send(self.compile_callable(id, rt));
776 }
777 ToRt::CompileRef { id, rt, res } => {
778 let _ = res.send(self.compile_ref(rt, id));
779 }
780 ToRt::Set { id, v } => {
781 self.ctx.cached.insert(id, v.clone());
782 tasks.push((id, v))
783 }
784 ToRt::DeleteCallable { id } => self.delete_callable(id),
785 ToRt::Call { id, args } => {
786 if let Err(e) = self.call_callable(id, args, tasks) {
787 error!("calling callable {id:?} failed with {e:?}")
788 }
789 }
790 }
791 }
792 }
793
794 fn cycle_ready(&self) -> bool {
795 self.ctx.user.var_updates.len() > 0
796 || self.ctx.user.net_updates.len() > 0
797 || self.ctx.user.net_writes.len() > 0
798 || self.ctx.user.rpc_overflow.len() > 0
799 }
800
801 async fn compile_root(&mut self, text: ArcStr) -> Result<()> {
802 let scope = ModPath::root();
803 let ori = Origin { parent: None, source: Source::Unspecified, text };
804 let exprs =
805 expr::parser::parse(ori.clone()).context("parsing the root module")?;
806 let exprs =
807 try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
808 .await
809 .context(CouldNotResolve)?;
810 let nodes = exprs
811 .iter()
812 .map(|e| {
813 compile(&mut self.ctx, &scope, e.clone())
814 .with_context(|| format!("compiling root expression {e}"))
815 })
816 .collect::<Result<SmallVec<[_; 4]>>>()
817 .with_context(|| ori.clone())?;
818 for (e, n) in exprs.iter().zip(nodes.into_iter()) {
819 self.updated.insert(e.id, true);
820 self.nodes.insert(e.id, n);
821 }
822 Ok(())
823 }
824
825 async fn compile(&mut self, rt: GXHandle, text: ArcStr) -> Result<CompRes> {
826 let scope = ModPath::root();
827 let ori = Origin { parent: None, source: Source::Unspecified, text };
828 let exprs = expr::parser::parse(ori.clone())?;
829 let exprs =
830 try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
831 .await
832 .context(CouldNotResolve)?;
833 let nodes = exprs
834 .iter()
835 .map(|e| compile(&mut self.ctx, &scope, e.clone()))
836 .collect::<Result<SmallVec<[_; 4]>>>()
837 .with_context(|| ori.clone())?;
838 let exprs = exprs
839 .iter()
840 .zip(nodes.into_iter())
841 .map(|(e, n)| {
842 let output = is_output(&n);
843 let typ = n.typ().clone();
844 self.updated.insert(e.id, true);
845 self.nodes.insert(e.id, n);
846 CompExp { id: e.id, output, typ, rt: rt.clone() }
847 })
848 .collect::<SmallVec<[_; 1]>>();
849 Ok(CompRes { exprs, env: self.ctx.env.clone() })
850 }
851
852 async fn load(&mut self, rt: GXHandle, file: &PathBuf) -> Result<CompRes> {
853 let scope = ModPath::root();
854 let st = Instant::now();
855 let (ori, exprs) = match file.extension() {
856 Some(e) if e.as_bytes() == b"gx" => {
857 let file = file.canonicalize()?;
858 let s = fs::read_to_string(&file).await?;
859 let s = if s.starts_with("#!") {
860 if let Some(i) = s.find('\n') {
861 &s[i..]
862 } else {
863 s.as_str()
864 }
865 } else {
866 s.as_str()
867 };
868 let ori = Origin {
869 parent: None,
870 source: Source::File(file),
871 text: ArcStr::from(s),
872 };
873 (ori.clone(), expr::parser::parse(ori)?)
874 }
875 Some(e) => bail!("invalid file extension {e:?}"),
876 None => {
877 let name = file
878 .components()
879 .map(|c| match c {
880 Component::RootDir
881 | Component::CurDir
882 | Component::ParentDir
883 | Component::Prefix(_) => bail!("invalid module name {file:?}"),
884 Component::Normal(s) => Ok(s),
885 })
886 .collect::<Result<Box<[_]>>>()?;
887 if name.len() != 1 {
888 bail!("invalid module name {file:?}")
889 }
890 let name = String::from_utf8_lossy(name[0].as_bytes());
891 let name = name
892 .parse::<ModPath>()
893 .with_context(|| "parsing module name {file:?}")?;
894 let name = Path::basename(&*name)
895 .ok_or_else(|| anyhow!("invalid module name {file:?}"))?;
896 let name = ArcStr::from(name);
897 let ori = Origin {
898 parent: None,
899 source: Source::Internal(name.clone()),
900 text: literal!(""),
901 };
902 let kind = ExprKind::Module {
903 export: true,
904 name,
905 value: ModuleKind::Unresolved,
906 };
907 let exprs = Arc::from(vec![Expr {
908 id: ExprId::new(),
909 ori: Arc::new(ori.clone()),
910 pos: SourcePosition::default(),
911 kind,
912 }]);
913 (ori, exprs)
914 }
915 };
916 info!("parse time: {:?}", st.elapsed());
917 let st = Instant::now();
918 let exprs =
919 try_join_all(exprs.iter().map(|e| e.resolve_modules(&self.resolvers)))
920 .await
921 .context(CouldNotResolve)?;
922 info!("resolve time: {:?}", st.elapsed());
923 let mut res = smallvec![];
924 for e in exprs.iter() {
925 let top_id = e.id;
926 let n =
927 compile(&mut self.ctx, &scope, e.clone()).with_context(|| ori.clone())?;
928 let has_out = is_output(&n);
929 let typ = n.typ().clone();
930 self.nodes.insert(top_id, n);
931 self.updated.insert(top_id, true);
932 res.push(CompExp { id: top_id, output: has_out, typ, rt: rt.clone() })
933 }
934 Ok(CompRes { exprs: res, env: self.ctx.env.clone() })
935 }
936
937 fn compile_callable(&mut self, id: Value, rt: GXHandle) -> Result<Callable> {
938 let id = match id {
939 Value::U64(id) => LambdaId::from(id),
940 v => bail!("invalid lambda id {v}"),
941 };
942 let lb = self.ctx.env.lambdas.get(&id).and_then(Weak::upgrade);
943 let lb = lb.ok_or_else(|| anyhow!("unknown lambda {id:?}"))?;
944 let args = lb.typ.args.iter();
945 let args = args
946 .map(|a| {
947 if a.label.as_ref().map(|(_, opt)| *opt).unwrap_or(false) {
948 bail!("can't call lambda with an optional argument from rust")
949 } else {
950 Ok(BindId::new())
951 }
952 })
953 .collect::<Result<Box<[_]>>>()?;
954 let eid = ExprId::new();
955 let argn = lb.typ.args.iter().zip(args.iter());
956 let argn = argn
957 .map(|(arg, id)| genn::reference(&mut self.ctx, *id, arg.typ.clone(), eid))
958 .collect::<Vec<_>>();
959 let fnode = genn::constant(Value::U64(id.inner()));
960 let mut n = genn::apply(fnode, argn, lb.typ.clone(), eid);
961 self.event.init = true;
962 n.update(&mut self.ctx, &mut self.event);
963 self.event.clear();
964 let cid = CallableId::new();
965 self.callables.insert(cid, CallableInt { expr: eid, args });
966 self.nodes.insert(eid, n);
967 let env = self.ctx.env.clone();
968 Ok(Callable { expr: eid, rt, env, id: cid, typ: (*lb.typ).clone() })
969 }
970
971 fn compile_ref(&mut self, rt: GXHandle, id: BindId) -> Result<Ref> {
972 let eid = ExprId::new();
973 let typ = Type::Any;
974 let n = genn::reference(&mut self.ctx, id, typ, eid);
975 self.nodes.insert(eid, n);
976 let target_bid = self.ctx.env.byref_chain.get(&id).copied();
977 Ok(Ref {
978 id: eid,
979 bid: id,
980 target_bid,
981 last: self.ctx.cached.get(&id).cloned(),
982 rt,
983 })
984 }
985
986 fn call_callable(
987 &mut self,
988 id: CallableId,
989 args: ValArray,
990 tasks: &mut Vec<(BindId, Value)>,
991 ) -> Result<()> {
992 let c =
993 self.callables.get(&id).ok_or_else(|| anyhow!("unknown callable {id:?}"))?;
994 if args.len() != c.args.len() {
995 bail!("expected {} arguments", c.args.len());
996 }
997 let a = c.args.iter().zip(args.iter()).map(|(id, v)| (*id, v.clone()));
998 tasks.extend(a);
999 Ok(())
1000 }
1001
1002 fn delete_callable(&mut self, id: CallableId) {
1003 if let Some(c) = self.callables.remove(&id) {
1004 if let Some(mut n) = self.nodes.shift_remove(&c.expr) {
1005 n.delete(&mut self.ctx)
1006 }
1007 }
1008 }
1009
1010 async fn run(mut self, mut to_rt: tmpsc::UnboundedReceiver<ToRt>) -> Result<()> {
1011 let mut tasks = vec![];
1012 let mut input = vec![];
1013 let mut rpcs = vec![];
1014 let onemin = Duration::from_secs(60);
1015 'main: loop {
1016 let now = Instant::now();
1017 let ready = self.cycle_ready();
1018 let mut updates = None;
1019 let mut writes = None;
1020 macro_rules! peek {
1021 (updates) => {
1022 if self.ctx.user.net_updates.is_empty() {
1023 while let Ok(Some(mut up)) = self.ctx.user.updates.try_next() {
1024 match &mut updates {
1025 None => updates = Some(up),
1026 Some(prev) => prev.extend(up.drain(..)),
1027 }
1028 }
1029 }
1030 };
1031 (writes) => {
1032 if self.ctx.user.net_writes.is_empty() {
1033 if let Ok(Some(wr)) = self.ctx.user.writes.try_next() {
1034 writes = Some(wr);
1035 }
1036 }
1037 };
1038 (tasks) => {
1039 while let Some(Ok(up)) = self.ctx.user.tasks.try_join_next() {
1040 tasks.push(up);
1041 }
1042 };
1043 (rpcs) => {
1044 if self.ctx.user.rpc_overflow.is_empty() {
1045 while let Ok(Some(up)) = self.ctx.user.rpcs.try_next() {
1046 rpcs.push(up);
1047 }
1048 }
1049 };
1050 (input) => {
1051 while let Ok(m) = to_rt.try_recv() {
1052 input.push(m);
1053 }
1054 };
1055 ($($item:tt),+) => {{
1056 $(peek!($item));+
1057 }};
1058 }
1059 select! {
1060 rp = maybe_next(
1061 self.ctx.user.rpc_overflow.is_empty(),
1062 &mut self.ctx.user.rpcs
1063 ) => {
1064 rpcs.push(rp);
1065 peek!(updates, tasks, writes, rpcs, input)
1066 }
1067 wr = maybe_next(
1068 self.ctx.user.net_writes.is_empty(),
1069 &mut self.ctx.user.writes
1070 ) => {
1071 writes = Some(wr);
1072 peek!(updates, tasks, rpcs, input);
1073 },
1074 up = maybe_next(
1075 self.ctx.user.net_updates.is_empty(),
1076 &mut self.ctx.user.updates
1077 ) => {
1078 updates = Some(up);
1079 peek!(updates, writes, tasks, rpcs, input);
1080 },
1081 up = join_or_wait(&mut self.ctx.user.tasks) => {
1082 if let Ok(up) = up {
1083 tasks.push(up);
1084 }
1085 peek!(updates, writes, tasks, rpcs, input)
1086 },
1087 _ = or_never(ready) => {
1088 peek!(updates, writes, tasks, rpcs, input)
1089 },
1090 n = to_rt.recv_many(&mut input, 100000) => {
1091 if n == 0 {
1092 break 'main Ok(())
1093 }
1094 peek!(updates, writes, tasks, rpcs);
1095 },
1096 () = unsubscribe_ready(&self.ctx.user.pending_unsubscribe, now) => {
1097 while let Some((ts, _)) = self.ctx.user.pending_unsubscribe.front() {
1098 if ts.elapsed() >= Duration::from_secs(1) {
1099 self.ctx.user.pending_unsubscribe.pop_front();
1100 } else {
1101 break
1102 }
1103 }
1104 continue 'main
1105 },
1106 }
1107 let mut batch = BATCH.take();
1108 self.process_input_batch(&mut tasks, &mut input, &mut batch).await;
1109 self.do_cycle(
1110 updates, writes, &mut tasks, &mut rpcs, &mut to_rt, &mut input, batch,
1111 )
1112 .await;
1113 if !self.ctx.user.rpc_clients.is_empty() {
1114 if now - self.last_rpc_gc >= onemin {
1115 self.last_rpc_gc = now;
1116 self.ctx.user.rpc_clients.retain(|_, c| now - c.last_used <= onemin);
1117 }
1118 }
1119 }
1120 }
1121}
1122
1123#[derive(Clone)]
1126pub struct GXHandle(tmpsc::UnboundedSender<ToRt>);
1127
1128impl GXHandle {
1129 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToRt>(&self, f: F) -> Result<R> {
1130 let (tx, rx) = oneshot::channel();
1131 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
1132 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
1133 }
1134
1135 pub async fn get_env(&self) -> Result<Env<GXCtx, NoUserEvent>> {
1137 self.exec(|res| ToRt::GetEnv { res }).await
1138 }
1139
1140 pub async fn compile(&self, text: ArcStr) -> Result<CompRes> {
1146 Ok(self.exec(|tx| ToRt::Compile { text, res: tx, rt: self.clone() }).await??)
1147 }
1148
1149 pub async fn load(&self, path: PathBuf) -> Result<CompRes> {
1157 Ok(self.exec(|tx| ToRt::Load { path, res: tx, rt: self.clone() }).await??)
1158 }
1159
1160 pub async fn compile_callable(&self, id: Value) -> Result<Callable> {
1164 Ok(self
1165 .exec(|tx| ToRt::CompileCallable { id, rt: self.clone(), res: tx })
1166 .await??)
1167 }
1168
1169 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref> {
1173 Ok(self
1174 .exec(|tx| ToRt::CompileRef { id: id.into(), res: tx, rt: self.clone() })
1175 .await??)
1176 }
1177
1178 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
1181 let v = v.into();
1182 self.0.send(ToRt::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
1183 }
1184}
1185
1186#[derive(Builder)]
1187#[builder(pattern = "owned")]
1188pub struct GXConfig {
1189 #[builder(setter(strip_option), default)]
1193 resolve_timeout: Option<Duration>,
1194 #[builder(setter(strip_option), default)]
1196 publish_timeout: Option<Duration>,
1197 ctx: ExecCtx<GXCtx, NoUserEvent>,
1199 #[builder(setter(strip_option), default)]
1201 root: Option<ArcStr>,
1202 #[builder(default)]
1204 resolvers: Vec<ModuleResolver>,
1205 sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1207}
1208
1209impl GXConfig {
1210 pub fn builder(
1212 ctx: ExecCtx<GXCtx, NoUserEvent>,
1213 sub: tmpsc::Sender<Pooled<Vec<RtEvent>>>,
1214 ) -> GXConfigBuilder {
1215 GXConfigBuilder::default().ctx(ctx).sub(sub)
1216 }
1217
1218 pub async fn start(self) -> Result<GXHandle> {
1229 let (init_tx, init_rx) = oneshot::channel();
1230 let (tx, rx) = tmpsc::unbounded_channel();
1231 task::spawn(async move {
1232 match GX::new(self).await {
1233 Ok(bs) => {
1234 let _ = init_tx.send(Ok(()));
1235 if let Err(e) = bs.run(rx).await {
1236 error!("run loop exited with error {e:?}")
1237 }
1238 }
1239 Err(e) => {
1240 let _ = init_tx.send(Err(e));
1241 }
1242 };
1243 });
1244 init_rx.await??;
1245 Ok(GXHandle(tx))
1246 }
1247}