1use anyhow::{anyhow, bail, Result};
2use arcstr::{literal, ArcStr};
3use compact_str::format_compact;
4use graphix_compiler::{
5 err, errf, expr::ExprId, node::genn, typ::Type, Apply, BindId, BuiltIn, Event,
6 ExecCtx, LambdaId, Node, Rt, Scope, UserEvent,
7};
8use graphix_package_core::{arity1, arity2, deftype, CachedVals};
9use graphix_rt::GXRt;
10use netidx::{
11 path::Path,
12 publisher::Val,
13 subscriber::{self, Dval, UpdatesFlags, Value},
14};
15use netidx_core::utils::Either;
16use netidx_protocols::rpc::server::{self, ArgSpec};
17use netidx_value::ValArray;
18use smallvec::{smallvec, SmallVec};
19use std::collections::VecDeque;
20use triomphe::Arc as TArc;
21
22fn as_path(v: Value) -> Option<Path> {
23 match v.cast_to::<String>() {
24 Err(_) => None,
25 Ok(p) => {
26 if Path::is_absolute(&p) {
27 Some(Path::from(p))
28 } else {
29 None
30 }
31 }
32 }
33}
34
35#[derive(Debug)]
36struct Write {
37 args: CachedVals,
38 top_id: ExprId,
39 dv: Either<(Path, Dval), Vec<Value>>,
40}
41
42impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Write {
43 const NAME: &str = "net_write";
44 deftype!("fn(string, Any) -> Result<_, `WriteError(string)>");
45
46 fn init<'a, 'b, 'c>(
47 _ctx: &'a mut ExecCtx<R, E>,
48 _typ: &'a graphix_compiler::typ::FnType,
49 _scope: &'b Scope,
50 from: &'c [Node<R, E>],
51 top_id: ExprId,
52 ) -> Result<Box<dyn Apply<R, E>>> {
53 Ok(Box::new(Write {
54 args: CachedVals::new(from),
55 dv: Either::Right(vec![]),
56 top_id,
57 }))
58 }
59}
60
61impl<R: Rt, E: UserEvent> Apply<R, E> for Write {
62 fn update(
63 &mut self,
64 ctx: &mut ExecCtx<R, E>,
65 from: &mut [Node<R, E>],
66 event: &mut Event<E>,
67 ) -> Option<Value> {
68 fn set(dv: &mut Either<(Path, Dval), Vec<Value>>, val: &Value) {
69 match dv {
70 Either::Right(q) => q.push(val.clone()),
71 Either::Left((_, dv)) => {
72 dv.write(val.clone());
73 }
74 }
75 }
76 let mut up = [false; 2];
77 self.args.update_diff(&mut up, ctx, from, event);
78 let ((path, value), (path_up, value_up)) = arity2!(self.args.0, &up);
79 match ((path, value), (path_up, value_up)) {
80 ((_, _), (false, false)) => (),
81 ((_, Some(val)), (false, true)) => set(&mut self.dv, val),
82 ((_, None), (false, true)) => (),
83 ((None, Some(val)), (true, true)) => set(&mut self.dv, val),
84 ((Some(path), Some(val)), (true, true)) if self.same_path(path) => {
85 set(&mut self.dv, val)
86 }
87 ((Some(path), _), (true, false)) if self.same_path(path) => (),
88 ((None, _), (true, false)) => (),
89 ((None, None), (_, _)) => (),
90 ((Some(path), val), (true, _)) => match as_path(path.clone()) {
91 None => {
92 if let Either::Left(_) = &self.dv {
93 self.dv = Either::Right(vec![]);
94 }
95 let e = errf!(literal!("WriteError"), "invalid path {path:?}");
96 return Some(Value::Error(TArc::new(e)));
97 }
98 Some(path) => {
99 let dv = ctx.rt.subscribe(
100 UpdatesFlags::empty(),
101 path.clone(),
102 self.top_id,
103 );
104 match &mut self.dv {
105 Either::Left(_) => (),
106 Either::Right(q) => {
107 for v in q.drain(..) {
108 dv.write(v);
109 }
110 }
111 }
112 self.dv = Either::Left((path, dv));
113 if let Some(val) = val {
114 set(&mut self.dv, val)
115 }
116 }
117 },
118 }
119 None
120 }
121
122 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
123 if let Either::Left((path, dv)) = &self.dv {
124 ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id)
125 }
126 self.dv = Either::Right(vec![])
127 }
128
129 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
130 self.args.clear();
131 match &mut self.dv {
132 Either::Left((path, dv)) => {
133 ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id);
134 self.dv = Either::Right(vec![])
135 }
136 Either::Right(_) => (),
137 }
138 }
139}
140
141impl Write {
142 fn same_path(&self, new_path: &Value) -> bool {
143 match (new_path, &self.dv) {
144 (Value::String(p0), Either::Left((p1, _))) => &**p0 == &**p1,
145 _ => false,
146 }
147 }
148}
149
150#[derive(Debug)]
151struct Subscribe {
152 args: CachedVals,
153 cur: Option<(Path, Dval)>,
154 top_id: ExprId,
155}
156
157impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Subscribe {
158 const NAME: &str = "net_subscribe";
159 deftype!("fn(string) -> Result<Primitive, `SubscribeError(string)>");
160
161 fn init<'a, 'b, 'c>(
162 _ctx: &'a mut ExecCtx<R, E>,
163 _typ: &'a graphix_compiler::typ::FnType,
164 _scope: &'b Scope,
165 from: &'c [Node<R, E>],
166 top_id: ExprId,
167 ) -> Result<Box<dyn Apply<R, E>>> {
168 Ok(Box::new(Subscribe { args: CachedVals::new(from), cur: None, top_id }))
169 }
170}
171
172impl<R: Rt, E: UserEvent> Apply<R, E> for Subscribe {
173 fn update(
174 &mut self,
175 ctx: &mut ExecCtx<R, E>,
176 from: &mut [Node<R, E>],
177 event: &mut Event<E>,
178 ) -> Option<Value> {
179 static ERR_TAG: ArcStr = literal!("SubscribeError");
180 let mut up = [false; 1];
181 self.args.update_diff(&mut up, ctx, from, event);
182 let (path, path_up) = arity1!(self.args.0, &up);
183 match (path, path_up) {
184 (Some(_), false) | (None, false) => (),
185 (None, true) => {
186 if let Some((path, dv)) = self.cur.take() {
187 ctx.rt.unsubscribe(path, dv, self.top_id)
188 }
189 return None;
190 }
191 (Some(Value::String(path)), true)
192 if self.cur.as_ref().map(|(p, _)| &**p) != Some(&*path) =>
193 {
194 if let Some((path, dv)) = self.cur.take() {
195 ctx.rt.unsubscribe(path, dv, self.top_id)
196 }
197 let path = Path::from(path);
198 if !Path::is_absolute(&path) {
199 return Some(err!(ERR_TAG, "expected absolute path"));
200 }
201 let dval = ctx.rt.subscribe(
202 UpdatesFlags::BEGIN_WITH_LAST,
203 path.clone(),
204 self.top_id,
205 );
206 self.cur = Some((path, dval));
207 }
208 (Some(Value::String(_)), true) => (),
209 (Some(v), true) => {
210 return Some(errf!(ERR_TAG, "invalid path {v}, expected string"))
211 }
212 }
213 self.cur.as_ref().and_then(|(_, dv)| {
214 event.netidx.get(&dv.id()).map(|e| match e {
215 subscriber::Event::Unsubscribed => Value::error(literal!("unsubscribed")),
216 subscriber::Event::Update(v) => v.clone(),
217 })
218 })
219 }
220
221 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
222 if let Some((path, dv)) = self.cur.take() {
223 ctx.rt.unsubscribe(path, dv, self.top_id)
224 }
225 }
226
227 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
228 self.args.clear();
229 if let Some((path, dv)) = self.cur.take() {
230 ctx.rt.unsubscribe(path, dv, self.top_id);
231 }
232 }
233}
234
235#[derive(Debug)]
236struct RpcCall {
237 args: CachedVals,
238 top_id: ExprId,
239 id: BindId,
240}
241
242impl<R: Rt, E: UserEvent> BuiltIn<R, E> for RpcCall {
243 const NAME: &str = "net_call";
244 deftype!("fn(string, Array<(string, Any)>) -> Result<Primitive, `RpcError(string)>");
245
246 fn init<'a, 'b, 'c>(
247 ctx: &'a mut ExecCtx<R, E>,
248 _typ: &'a graphix_compiler::typ::FnType,
249 _scope: &'b Scope,
250 from: &'c [Node<R, E>],
251 top_id: ExprId,
252 ) -> Result<Box<dyn Apply<R, E>>> {
253 let id = BindId::new();
254 ctx.rt.ref_var(id, top_id);
255 Ok(Box::new(RpcCall { args: CachedVals::new(from), top_id, id }))
256 }
257}
258
259impl<R: Rt, E: UserEvent> Apply<R, E> for RpcCall {
260 fn update(
261 &mut self,
262 ctx: &mut ExecCtx<R, E>,
263 from: &mut [Node<R, E>],
264 event: &mut Event<E>,
265 ) -> Option<Value> {
266 fn parse_args(
267 path: &Value,
268 args: &Value,
269 ) -> Result<(Path, Vec<(ArcStr, Value)>)> {
270 let path = as_path(path.clone()).ok_or_else(|| anyhow!("invalid path"))?;
271 let args = match args {
272 Value::Array(args) => args
273 .iter()
274 .map(|v| match v {
275 Value::Array(p) => match &**p {
276 [Value::String(name), value] => {
277 Ok((name.clone(), value.clone()))
278 }
279 _ => Err(anyhow!("rpc args expected [name, value] pair")),
280 },
281 _ => Err(anyhow!("rpc args expected [name, value] pair")),
282 })
283 .collect::<Result<Vec<_>>>()?,
284 _ => bail!("rpc args expected to be an array"),
285 };
286 Ok((path, args))
287 }
288 let mut up = [false; 2];
289 self.args.update_diff(&mut up, ctx, from, event);
290 let ((path, args), (path_up, args_up)) = arity2!(self.args.0, &up);
291 match ((path, args), (path_up, args_up)) {
292 ((Some(path), Some(args)), (_, true))
293 | ((Some(path), Some(args)), (true, _)) => match parse_args(path, args) {
294 Err(e) => return Some(errf!(literal!("RpcError"), "{e}")),
295 Ok((path, args)) => ctx.rt.call_rpc(path, args, self.id),
296 },
297 ((None, _), (_, _)) | ((_, None), (_, _)) | ((_, _), (false, false)) => (),
298 }
299 event.variables.get(&self.id).cloned()
300 }
301
302 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
303 ctx.rt.unref_var(self.id, self.top_id)
304 }
305
306 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
307 ctx.rt.unref_var(self.id, self.top_id);
308 self.id = BindId::new();
309 ctx.rt.ref_var(self.id, self.top_id);
310 self.args.clear()
311 }
312}
313
314macro_rules! list {
315 ($name:ident, $builtin:literal, $method:ident, $typ:literal) => {
316 #[derive(Debug)]
317 struct $name {
318 args: CachedVals,
319 current: Option<Path>,
320 id: BindId,
321 top_id: ExprId,
322 }
323
324 impl<R: Rt, E: UserEvent> BuiltIn<R, E> for $name {
325 const NAME: &str = $builtin;
326 deftype!($typ);
327
328 fn init<'a, 'b, 'c>(
329 ctx: &'a mut ExecCtx<R, E>,
330 _typ: &'a graphix_compiler::typ::FnType,
331 _scope: &'b Scope,
332 from: &'c [Node<R, E>],
333 top_id: ExprId,
334 ) -> Result<Box<dyn Apply<R, E>>> {
335 let id = BindId::new();
336 ctx.rt.ref_var(id, top_id);
337 Ok(Box::new($name {
338 args: CachedVals::new(from),
339 current: None,
340 top_id,
341 id,
342 }))
343 }
344 }
345
346 impl<R: Rt, E: UserEvent> Apply<R, E> for $name {
347 fn update(
348 &mut self,
349 ctx: &mut ExecCtx<R, E>,
350 from: &mut [Node<R, E>],
351 event: &mut Event<E>,
352 ) -> Option<Value> {
353 let mut up = [false; 2];
354 self.args.update_diff(&mut up, ctx, from, event);
355 let ((_, path), (trigger_up, path_up)) = arity2!(self.args.0, &up);
356 match (path, path_up, trigger_up) {
357 (Some(Value::String(path)), true, _)
358 if self
359 .current
360 .as_ref()
361 .map(|p| &**p != &**path)
362 .unwrap_or(true) =>
363 {
364 let path = Path::from(path);
365 self.current = Some(path.clone());
366 ctx.rt.$method(self.id, path);
367 }
368 (Some(Value::String(path)), _, true) => {
369 ctx.rt.$method(self.id, Path::from(path));
370 }
371 _ => (),
372 }
373 event.variables.get(&self.id).and_then(|v| match v {
374 Value::Null => None,
375 Value::Error(e) => Some(errf!(literal!("ListError"), "{e}")),
376 v => Some(v.clone()),
377 })
378 }
379
380 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
381 ctx.rt.unref_var(self.id, self.top_id);
382 ctx.rt.stop_list(self.id);
383 }
384
385 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
386 ctx.rt.unref_var(self.id, self.top_id);
387 ctx.rt.stop_list(self.id);
388 self.id = BindId::new();
389 ctx.rt.ref_var(self.id, self.top_id);
390 self.current = None;
391 self.args.clear();
392 }
393 }
394 };
395}
396
397list!(
398 List,
399 "net_list",
400 list,
401 "fn(?#update:Any, string) -> Result<Array<string>, `ListError(string)>"
402);
403
404list!(
405 ListTable,
406 "net_list_table",
407 list_table,
408 "fn(?#update:Any, string) -> Result<Table, `ListError(string)>"
409);
410
411#[derive(Debug)]
412struct Publish<R: Rt, E: UserEvent> {
413 args: CachedVals,
414 current: Option<(Path, Val)>,
415 top_id: ExprId,
416 x: BindId,
417 pid: BindId,
418 on_write: Node<R, E>,
419}
420
421impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Publish<R, E> {
422 const NAME: &str = "net_publish";
423 deftype!(
424 "fn(?#on_write:fn(Any) -> _ throws 'e, string, Any) -> Result<_, `PublishError(string)> throws 'e"
425 );
426
427 fn init<'a, 'b, 'c>(
428 ctx: &'a mut ExecCtx<R, E>,
429 typ: &'a graphix_compiler::typ::FnType,
430 scope: &'b Scope,
431 from: &'c [Node<R, E>],
432 top_id: ExprId,
433 ) -> Result<Box<dyn Apply<R, E>>> {
434 match from {
435 [_, _, _] => {
436 let scope =
437 scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
438 let pid = BindId::new();
439 let mftyp = match &typ.args[0].typ {
440 Type::Fn(ft) => ft.clone(),
441 t => bail!("expected function not {t}"),
442 };
443 let (x, xn) = genn::bind(ctx, &scope.lexical, "x", Type::Any, top_id);
444 let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
445 let on_write = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
446 Ok(Box::new(Publish {
447 args: CachedVals::new(from),
448 current: None,
449 top_id,
450 pid,
451 x,
452 on_write,
453 }))
454 }
455 _ => bail!("expected three arguments"),
456 }
457 }
458}
459
460impl<R: Rt, E: UserEvent> Apply<R, E> for Publish<R, E> {
461 fn update(
462 &mut self,
463 ctx: &mut ExecCtx<R, E>,
464 from: &mut [Node<R, E>],
465 event: &mut Event<E>,
466 ) -> Option<Value> {
467 macro_rules! publish {
468 ($path:expr, $v:expr) => {{
469 let path = Path::from($path.clone());
470 match ctx.rt.publish(path.clone(), $v.clone(), self.top_id) {
471 Err(e) => {
472 let msg: ArcStr = format_compact!("{e:?}").as_str().into();
473 let e: Value = (literal!("PublishError"), msg).into();
474 return Some(Value::Error(TArc::new(e)));
475 }
476 Ok(id) => {
477 self.current = Some((path, id));
478 }
479 }
480 }};
481 }
482 let mut up = [false; 3];
483 self.args.update_diff(&mut up, ctx, from, event);
484 if up[0] {
485 if let Some(v) = self.args.0[0].clone() {
486 ctx.cached.insert(self.pid, v.clone());
487 event.variables.insert(self.pid, v);
488 }
489 }
490 match (&up[1..], &self.args.0[1..]) {
491 ([true, _], [Some(Value::String(path)), Some(v)])
492 if self.current.as_ref().map(|(p, _)| &**p != path).unwrap_or(true) =>
493 {
494 if let Some((_, id)) = self.current.take() {
495 ctx.rt.unpublish(id, self.top_id);
496 }
497 publish!(path, v)
498 }
499 ([_, true], [Some(Value::String(path)), Some(v)]) => match &self.current {
500 Some((_, val)) => ctx.rt.update(val, v.clone()),
501 None => publish!(path, v),
502 },
503 _ => (),
504 }
505 let mut reply = None;
506 if let Some((_, val)) = &self.current {
507 if let Some(req) = event.writes.remove(&val.id()) {
508 ctx.cached.insert(self.x, req.value.clone());
509 event.variables.insert(self.x, req.value);
510 reply = req.send_result;
511 }
512 }
513 if let Some(v) = self.on_write.update(ctx, event) {
514 if let Some(reply) = reply {
515 reply.send(v)
516 }
517 }
518 None
519 }
520
521 fn typecheck(
522 &mut self,
523 ctx: &mut ExecCtx<R, E>,
524 _from: &mut [Node<R, E>],
525 ) -> anyhow::Result<()> {
526 self.on_write.typecheck(ctx)
527 }
528
529 fn refs(&self, refs: &mut graphix_compiler::Refs) {
530 self.on_write.refs(refs)
531 }
532
533 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
534 if let Some((_, val)) = self.current.take() {
535 ctx.rt.unpublish(val, self.top_id);
536 }
537 ctx.cached.remove(&self.pid);
538 ctx.cached.remove(&self.x);
539 ctx.env.unbind_variable(self.x);
540 self.on_write.delete(ctx);
541 }
542
543 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
544 if let Some((_, val)) = self.current.take() {
545 ctx.rt.unpublish(val, self.top_id);
546 }
547 self.args.clear();
548 self.on_write.sleep(ctx);
549 }
550}
551
552#[derive(Debug)]
553struct PublishRpc<R: Rt, E: UserEvent> {
554 args: CachedVals,
555 id: BindId,
556 top_id: ExprId,
557 f: Node<R, E>,
558 pid: BindId,
559 x: BindId,
560 queue: VecDeque<server::RpcCall>,
561 argbuf: SmallVec<[(ArcStr, Value); 6]>,
562 ready: bool,
563 current: Option<Path>,
564}
565
566impl<R: Rt, E: UserEvent> BuiltIn<R, E> for PublishRpc<R, E> {
567 const NAME: &str = "net_publish_rpc";
568 deftype!(
569 r#"fn(
570 #path:string,
571 #doc:string,
572 #spec:Array<ArgSpec>,
573 #f:fn(Array<(string, Any)>) -> Any throws 'e
574 ) -> Result<_, `PublishRpcError(string)> throws 'e"#
575 );
576
577 fn init<'a, 'b, 'c>(
578 ctx: &'a mut ExecCtx<R, E>,
579 typ: &'a graphix_compiler::typ::FnType,
580 scope: &'b Scope,
581 from: &'c [Node<R, E>],
582 top_id: ExprId,
583 ) -> Result<Box<dyn Apply<R, E>>> {
584 match from {
585 [_, _, _, _] => {
586 let scope =
587 scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
588 let id = BindId::new();
589 ctx.rt.ref_var(id, top_id);
590 let pid = BindId::new();
591 let mftyp = match &typ.args[3].typ {
592 Type::Fn(ft) => ft.clone(),
593 t => bail!("expected a function not {t}"),
594 };
595 let (x, xn) = genn::bind(
596 ctx,
597 &scope.lexical,
598 "x",
599 mftyp.args[0].typ.clone(),
600 top_id,
601 );
602 let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
603 let f = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
604 Ok(Box::new(PublishRpc {
605 queue: VecDeque::new(),
606 args: CachedVals::new(from),
607 x,
608 id,
609 top_id,
610 f,
611 pid,
612 argbuf: smallvec![],
613 ready: true,
614 current: None,
615 }))
616 }
617 _ => bail!("expected four arguments"),
618 }
619 }
620}
621
622impl<R: Rt, E: UserEvent> Apply<R, E> for PublishRpc<R, E> {
623 fn update(
624 &mut self,
625 ctx: &mut ExecCtx<R, E>,
626 from: &mut [Node<R, E>],
627 event: &mut Event<E>,
628 ) -> Option<Value> {
629 macro_rules! snd {
630 ($pair:expr) => {
631 match $pair {
632 Value::Array(p) => p[1].clone(),
633 _ => unreachable!(),
634 }
635 };
636 }
637 let mut changed = [false; 4];
638 self.args.update_diff(&mut changed, ctx, from, event);
639 if changed[3] {
640 if let Some(v) = self.args.0[3].clone() {
641 ctx.cached.insert(self.pid, v.clone());
642 event.variables.insert(self.pid, v);
643 }
644 }
645 if changed[0] || changed[1] || changed[2] {
646 if let Some(path) = self.current.take() {
647 ctx.rt.unpublish_rpc(path);
648 }
649 if let (Some(Value::String(path)), Some(doc), Some(Value::Array(spec))) =
650 (&self.args.0[0], &self.args.0[1], &self.args.0[2])
651 {
652 let path = Path::from(path);
653 let spec = spec
654 .iter()
655 .map(|r| match r {
656 Value::Array(r) => {
657 let default_value = snd!(&r[0]);
658 let doc = snd!(&r[1]);
659 let name = snd!(&r[2]).get_as::<ArcStr>().unwrap();
660 ArgSpec { name, doc, default_value }
661 }
662 _ => unreachable!(),
663 })
664 .collect::<Vec<_>>();
665 if let Err(e) =
666 ctx.rt.publish_rpc(path.clone(), doc.clone(), spec, self.id)
667 {
668 let e: ArcStr = format_compact!("{e:?}").as_str().into();
669 let e: Value = (literal!("PublishRpcError"), e).into();
670 return Some(Value::Error(TArc::new(e)));
671 }
672 self.current = Some(path);
673 }
674 }
675 macro_rules! set {
676 ($c:expr) => {{
677 self.ready = false;
678 self.argbuf.extend($c.args.iter().map(|(n, v)| (n.clone(), v.clone())));
679 self.argbuf.sort_by_key(|(n, _)| n.clone());
680 let args =
681 ValArray::from_iter_exact(self.argbuf.drain(..).map(|(n, v)| {
682 Value::Array(ValArray::from([Value::String(n), v]))
683 }));
684 ctx.cached.insert(self.x, Value::Array(args.clone()));
685 event.variables.insert(self.x, Value::Array(args));
686 }};
687 }
688 if let Some(c) = event.rpc_calls.remove(&self.id) {
689 self.queue.push_back(c);
690 }
691 if self.ready && self.queue.len() > 0 {
692 if let Some(c) = self.queue.front() {
693 set!(c)
694 }
695 }
696 loop {
697 match self.f.update(ctx, event) {
698 None => break None,
699 Some(v) => {
700 self.ready = true;
701 if let Some(mut call) = self.queue.pop_front() {
702 call.reply.send(v);
703 }
704 match self.queue.front() {
705 Some(c) => set!(c),
706 None => break None,
707 }
708 }
709 }
710 }
711 }
712
713 fn typecheck(
714 &mut self,
715 ctx: &mut ExecCtx<R, E>,
716 _from: &mut [Node<R, E>],
717 ) -> Result<()> {
718 self.f.typecheck(ctx)
719 }
720
721 fn refs(&self, refs: &mut graphix_compiler::Refs) {
722 self.f.refs(refs)
723 }
724
725 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
726 ctx.rt.unref_var(self.id, self.top_id);
727 if let Some(path) = self.current.take() {
728 ctx.rt.unpublish_rpc(path);
729 }
730 ctx.cached.remove(&self.x);
731 ctx.env.unbind_variable(self.x);
732 ctx.cached.remove(&self.pid);
733 self.f.delete(ctx);
734 }
735
736 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
737 ctx.rt.unref_var(self.id, self.top_id);
738 self.id = BindId::new();
739 ctx.rt.ref_var(self.id, self.top_id);
740 if let Some(path) = self.current.take() {
741 ctx.rt.unpublish_rpc(path);
742 }
743 self.args.clear();
744 self.queue.clear();
745 self.argbuf.clear();
746 self.ready = true;
747 self.f.sleep(ctx);
748 }
749}
750
751graphix_derive::defpackage! {
752 builtins => [
753 Write,
754 Subscribe,
755 RpcCall,
756 List,
757 ListTable,
758 Publish as Publish<GXRt<X>, X::UserEvent>,
759 PublishRpc as PublishRpc<GXRt<X>, X::UserEvent>,
760 ],
761}