1#![doc(
2 html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3 html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use anyhow::{anyhow, bail, Result};
6use arcstr::{literal, ArcStr};
7use compact_str::format_compact;
8use graphix_compiler::{
9 err, errf, expr::ExprId, node::genn, typ::Type, Apply, BindId, BuiltIn, Event,
10 ExecCtx, LambdaId, Node, Rt, Scope, UserEvent,
11};
12use graphix_package_core::{arity1, arity2, deftype, CachedVals};
13use graphix_rt::GXRt;
14use netidx::{
15 path::Path,
16 publisher::Val,
17 subscriber::{self, Dval, UpdatesFlags, Value},
18};
19use netidx_core::utils::Either;
20use netidx_protocols::rpc::server::{self, ArgSpec};
21use netidx_value::ValArray;
22use smallvec::{smallvec, SmallVec};
23use std::collections::VecDeque;
24use triomphe::Arc as TArc;
25
26fn as_path(v: Value) -> Option<Path> {
27 match v.cast_to::<String>() {
28 Err(_) => None,
29 Ok(p) => {
30 if Path::is_absolute(&p) {
31 Some(Path::from(p))
32 } else {
33 None
34 }
35 }
36 }
37}
38
/// State for the `net_write` builtin.
#[derive(Debug)]
struct Write {
    /// Cached values of the builtin's arguments (path, value).
    args: CachedVals,
    /// Expression id handed to the runtime on subscribe/unsubscribe.
    top_id: ExprId,
    /// `Left` holds the path being written to together with its subscription;
    /// `Right` holds values queued while no subscription exists.
    dv: Either<(Path, Dval), Vec<Value>>,
}
45
46impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Write {
47 const NAME: &str = "net_write";
48 deftype!("fn(string, Any) -> Result<_, `WriteError(string)>");
49
50 fn init<'a, 'b, 'c>(
51 _ctx: &'a mut ExecCtx<R, E>,
52 _typ: &'a graphix_compiler::typ::FnType,
53 _scope: &'b Scope,
54 from: &'c [Node<R, E>],
55 top_id: ExprId,
56 ) -> Result<Box<dyn Apply<R, E>>> {
57 Ok(Box::new(Write {
58 args: CachedVals::new(from),
59 dv: Either::Right(vec![]),
60 top_id,
61 }))
62 }
63}
64
65impl<R: Rt, E: UserEvent> Apply<R, E> for Write {
66 fn update(
67 &mut self,
68 ctx: &mut ExecCtx<R, E>,
69 from: &mut [Node<R, E>],
70 event: &mut Event<E>,
71 ) -> Option<Value> {
72 fn set(dv: &mut Either<(Path, Dval), Vec<Value>>, val: &Value) {
73 match dv {
74 Either::Right(q) => q.push(val.clone()),
75 Either::Left((_, dv)) => {
76 dv.write(val.clone());
77 }
78 }
79 }
80 let mut up = [false; 2];
81 self.args.update_diff(&mut up, ctx, from, event);
82 let ((path, value), (path_up, value_up)) = arity2!(self.args.0, &up);
83 match ((path, value), (path_up, value_up)) {
84 ((_, _), (false, false)) => (),
85 ((_, Some(val)), (false, true)) => set(&mut self.dv, val),
86 ((_, None), (false, true)) => (),
87 ((None, Some(val)), (true, true)) => set(&mut self.dv, val),
88 ((Some(path), Some(val)), (true, true)) if self.same_path(path) => {
89 set(&mut self.dv, val)
90 }
91 ((Some(path), _), (true, false)) if self.same_path(path) => (),
92 ((None, _), (true, false)) => (),
93 ((None, None), (_, _)) => (),
94 ((Some(path), val), (true, _)) => match as_path(path.clone()) {
95 None => {
96 if let Either::Left(_) = &self.dv {
97 self.dv = Either::Right(vec![]);
98 }
99 let e = errf!(literal!("WriteError"), "invalid path {path:?}");
100 return Some(Value::Error(TArc::new(e)));
101 }
102 Some(path) => {
103 let dv = ctx.rt.subscribe(
104 UpdatesFlags::empty(),
105 path.clone(),
106 self.top_id,
107 );
108 match &mut self.dv {
109 Either::Left(_) => (),
110 Either::Right(q) => {
111 for v in q.drain(..) {
112 dv.write(v);
113 }
114 }
115 }
116 self.dv = Either::Left((path, dv));
117 if let Some(val) = val {
118 set(&mut self.dv, val)
119 }
120 }
121 },
122 }
123 None
124 }
125
126 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
127 if let Either::Left((path, dv)) = &self.dv {
128 ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id)
129 }
130 self.dv = Either::Right(vec![])
131 }
132
133 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
134 self.args.clear();
135 match &mut self.dv {
136 Either::Left((path, dv)) => {
137 ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id);
138 self.dv = Either::Right(vec![])
139 }
140 Either::Right(_) => (),
141 }
142 }
143}
144
145impl Write {
146 fn same_path(&self, new_path: &Value) -> bool {
147 match (new_path, &self.dv) {
148 (Value::String(p0), Either::Left((p1, _))) => &**p0 == &**p1,
149 _ => false,
150 }
151 }
152}
153
/// State for the `net_subscribe` builtin.
#[derive(Debug)]
struct Subscribe {
    /// Cached value of the single path argument.
    args: CachedVals,
    /// The currently subscribed path and its subscription, if any.
    cur: Option<(Path, Dval)>,
    /// Expression id handed to the runtime on subscribe/unsubscribe.
    top_id: ExprId,
}
160
161impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Subscribe {
162 const NAME: &str = "net_subscribe";
163 deftype!("fn(string) -> Result<Primitive, `SubscribeError(string)>");
164
165 fn init<'a, 'b, 'c>(
166 _ctx: &'a mut ExecCtx<R, E>,
167 _typ: &'a graphix_compiler::typ::FnType,
168 _scope: &'b Scope,
169 from: &'c [Node<R, E>],
170 top_id: ExprId,
171 ) -> Result<Box<dyn Apply<R, E>>> {
172 Ok(Box::new(Subscribe { args: CachedVals::new(from), cur: None, top_id }))
173 }
174}
175
176impl<R: Rt, E: UserEvent> Apply<R, E> for Subscribe {
177 fn update(
178 &mut self,
179 ctx: &mut ExecCtx<R, E>,
180 from: &mut [Node<R, E>],
181 event: &mut Event<E>,
182 ) -> Option<Value> {
183 static ERR_TAG: ArcStr = literal!("SubscribeError");
184 let mut up = [false; 1];
185 self.args.update_diff(&mut up, ctx, from, event);
186 let (path, path_up) = arity1!(self.args.0, &up);
187 match (path, path_up) {
188 (Some(_), false) | (None, false) => (),
189 (None, true) => {
190 if let Some((path, dv)) = self.cur.take() {
191 ctx.rt.unsubscribe(path, dv, self.top_id)
192 }
193 return None;
194 }
195 (Some(Value::String(path)), true)
196 if self.cur.as_ref().map(|(p, _)| &**p) != Some(&*path) =>
197 {
198 if let Some((path, dv)) = self.cur.take() {
199 ctx.rt.unsubscribe(path, dv, self.top_id)
200 }
201 let path = Path::from(path);
202 if !Path::is_absolute(&path) {
203 return Some(err!(ERR_TAG, "expected absolute path"));
204 }
205 let dval = ctx.rt.subscribe(
206 UpdatesFlags::BEGIN_WITH_LAST,
207 path.clone(),
208 self.top_id,
209 );
210 self.cur = Some((path, dval));
211 }
212 (Some(Value::String(_)), true) => (),
213 (Some(v), true) => {
214 return Some(errf!(ERR_TAG, "invalid path {v}, expected string"))
215 }
216 }
217 self.cur.as_ref().and_then(|(_, dv)| {
218 event.netidx.get(&dv.id()).map(|e| match e {
219 subscriber::Event::Unsubscribed => Value::error(literal!("unsubscribed")),
220 subscriber::Event::Update(v) => v.clone(),
221 })
222 })
223 }
224
225 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
226 if let Some((path, dv)) = self.cur.take() {
227 ctx.rt.unsubscribe(path, dv, self.top_id)
228 }
229 }
230
231 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
232 self.args.clear();
233 if let Some((path, dv)) = self.cur.take() {
234 ctx.rt.unsubscribe(path, dv, self.top_id);
235 }
236 }
237}
238
/// State for the `net_call` builtin.
#[derive(Debug)]
struct RpcCall {
    /// Cached values of the builtin's arguments (path, args).
    args: CachedVals,
    /// Expression id used when referencing/unreferencing `id` with the runtime.
    top_id: ExprId,
    /// Variable id passed to `call_rpc`; `update` reads the value for this id
    /// from `event.variables`.
    id: BindId,
}
245
246impl<R: Rt, E: UserEvent> BuiltIn<R, E> for RpcCall {
247 const NAME: &str = "net_call";
248 deftype!("fn(string, Array<(string, Any)>) -> Result<Primitive, `RpcError(string)>");
249
250 fn init<'a, 'b, 'c>(
251 ctx: &'a mut ExecCtx<R, E>,
252 _typ: &'a graphix_compiler::typ::FnType,
253 _scope: &'b Scope,
254 from: &'c [Node<R, E>],
255 top_id: ExprId,
256 ) -> Result<Box<dyn Apply<R, E>>> {
257 let id = BindId::new();
258 ctx.rt.ref_var(id, top_id);
259 Ok(Box::new(RpcCall { args: CachedVals::new(from), top_id, id }))
260 }
261}
262
263impl<R: Rt, E: UserEvent> Apply<R, E> for RpcCall {
264 fn update(
265 &mut self,
266 ctx: &mut ExecCtx<R, E>,
267 from: &mut [Node<R, E>],
268 event: &mut Event<E>,
269 ) -> Option<Value> {
270 fn parse_args(
271 path: &Value,
272 args: &Value,
273 ) -> Result<(Path, Vec<(ArcStr, Value)>)> {
274 let path = as_path(path.clone()).ok_or_else(|| anyhow!("invalid path"))?;
275 let args = match args {
276 Value::Array(args) => args
277 .iter()
278 .map(|v| match v {
279 Value::Array(p) => match &**p {
280 [Value::String(name), value] => {
281 Ok((name.clone(), value.clone()))
282 }
283 _ => Err(anyhow!("rpc args expected [name, value] pair")),
284 },
285 _ => Err(anyhow!("rpc args expected [name, value] pair")),
286 })
287 .collect::<Result<Vec<_>>>()?,
288 _ => bail!("rpc args expected to be an array"),
289 };
290 Ok((path, args))
291 }
292 let mut up = [false; 2];
293 self.args.update_diff(&mut up, ctx, from, event);
294 let ((path, args), (path_up, args_up)) = arity2!(self.args.0, &up);
295 match ((path, args), (path_up, args_up)) {
296 ((Some(path), Some(args)), (_, true))
297 | ((Some(path), Some(args)), (true, _)) => match parse_args(path, args) {
298 Err(e) => return Some(errf!(literal!("RpcError"), "{e}")),
299 Ok((path, args)) => ctx.rt.call_rpc(path, args, self.id),
300 },
301 ((None, _), (_, _)) | ((_, None), (_, _)) | ((_, _), (false, false)) => (),
302 }
303 event.variables.get(&self.id).cloned()
304 }
305
306 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
307 ctx.rt.unref_var(self.id, self.top_id)
308 }
309
310 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
311 ctx.rt.unref_var(self.id, self.top_id);
312 self.id = BindId::new();
313 ctx.rt.ref_var(self.id, self.top_id);
314 self.args.clear()
315 }
316}
317
// Generates a listing builtin.
//
// `$name` is the struct to define, `$builtin` its registered name, `$method`
// the runtime method invoked as `ctx.rt.$method(id, path)`, and `$typ` the
// builtin's type signature. The generated builtin takes two arguments: an
// update trigger and a path.
macro_rules! list {
    ($name:ident, $builtin:literal, $method:ident, $typ:literal) => {
        #[derive(Debug)]
        struct $name {
            args: CachedVals,
            current: Option<Path>,
            id: BindId,
            top_id: ExprId,
        }

        impl<R: Rt, E: UserEvent> BuiltIn<R, E> for $name {
            const NAME: &str = $builtin;
            deftype!($typ);

            fn init<'a, 'b, 'c>(
                ctx: &'a mut ExecCtx<R, E>,
                _typ: &'a graphix_compiler::typ::FnType,
                _scope: &'b Scope,
                from: &'c [Node<R, E>],
                top_id: ExprId,
            ) -> Result<Box<dyn Apply<R, E>>> {
                let id = BindId::new();
                ctx.rt.ref_var(id, top_id);
                let node =
                    $name { args: CachedVals::new(from), current: None, id, top_id };
                Ok(Box::new(node))
            }
        }

        impl<R: Rt, E: UserEvent> Apply<R, E> for $name {
            fn update(
                &mut self,
                ctx: &mut ExecCtx<R, E>,
                from: &mut [Node<R, E>],
                event: &mut Event<E>,
            ) -> Option<Value> {
                let mut up = [false; 2];
                self.args.update_diff(&mut up, ctx, from, event);
                let ((_, path), (trigger_up, path_up)) = arity2!(self.args.0, &up);
                if let Some(Value::String(path)) = path {
                    // True when there is no current path or it differs from `path`.
                    let moved =
                        self.current.as_ref().map_or(true, |p| &**p != &**path);
                    match (path_up, trigger_up) {
                        // The path updated to a new location: remember it and list it.
                        (true, _) if moved => {
                            let path = Path::from(path);
                            self.current = Some(path.clone());
                            ctx.rt.$method(self.id, path);
                        }
                        // The trigger fired: list the path again.
                        (_, true) => {
                            ctx.rt.$method(self.id, Path::from(path));
                        }
                        _ => (),
                    }
                }
                // Null results are suppressed; errors are re-tagged as ListError.
                match event.variables.get(&self.id)? {
                    Value::Null => None,
                    Value::Error(e) => Some(errf!(literal!("ListError"), "{e}")),
                    other => Some(other.clone()),
                }
            }

            fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
                ctx.rt.unref_var(self.id, self.top_id);
                ctx.rt.stop_list(self.id);
            }

            fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
                ctx.rt.unref_var(self.id, self.top_id);
                ctx.rt.stop_list(self.id);
                let fresh = BindId::new();
                self.id = fresh;
                ctx.rt.ref_var(fresh, self.top_id);
                self.current = None;
                self.args.clear();
            }
        }
    };
}
400
// `net_list`: defines the `List` builtin, which calls `ctx.rt.list` for the
// given path.
list!(
    List,
    "net_list",
    list,
    "fn(?#update:Any, string) -> Result<Array<string>, `ListError(string)>"
);
407
// `net_list_table`: defines the `ListTable` builtin, which calls
// `ctx.rt.list_table` for the given path.
list!(
    ListTable,
    "net_list_table",
    list_table,
    "fn(?#update:Any, string) -> Result<Table, `ListError(string)>"
);
414
/// State for the `net_publish` builtin.
#[derive(Debug)]
struct Publish<R: Rt, E: UserEvent> {
    /// Cached values of the builtin's arguments (on_write, path, value).
    args: CachedVals,
    /// The path currently published and the handle returned by the runtime.
    current: Option<(Path, Val)>,
    /// Expression id handed to the runtime on publish/unpublish.
    top_id: ExprId,
    /// Variable that receives the value of each incoming write; it is the
    /// argument of the `on_write` call node.
    x: BindId,
    /// Variable that holds the `on_write` function value.
    pid: BindId,
    /// Call node applying the function stored under `pid` to `x`.
    on_write: Node<R, E>,
}
424
425impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Publish<R, E> {
426 const NAME: &str = "net_publish";
427 deftype!(
428 "fn(?#on_write:fn(Any) -> _ throws 'e, string, Any) -> Result<_, `PublishError(string)> throws 'e"
429 );
430
431 fn init<'a, 'b, 'c>(
432 ctx: &'a mut ExecCtx<R, E>,
433 typ: &'a graphix_compiler::typ::FnType,
434 scope: &'b Scope,
435 from: &'c [Node<R, E>],
436 top_id: ExprId,
437 ) -> Result<Box<dyn Apply<R, E>>> {
438 match from {
439 [_, _, _] => {
440 let scope =
441 scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
442 let pid = BindId::new();
443 let mftyp = match &typ.args[0].typ {
444 Type::Fn(ft) => ft.clone(),
445 t => bail!("expected function not {t}"),
446 };
447 let (x, xn) = genn::bind(ctx, &scope.lexical, "x", Type::Any, top_id);
448 let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
449 let on_write = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
450 Ok(Box::new(Publish {
451 args: CachedVals::new(from),
452 current: None,
453 top_id,
454 pid,
455 x,
456 on_write,
457 }))
458 }
459 _ => bail!("expected three arguments"),
460 }
461 }
462}
463
464impl<R: Rt, E: UserEvent> Apply<R, E> for Publish<R, E> {
465 fn update(
466 &mut self,
467 ctx: &mut ExecCtx<R, E>,
468 from: &mut [Node<R, E>],
469 event: &mut Event<E>,
470 ) -> Option<Value> {
471 macro_rules! publish {
472 ($path:expr, $v:expr) => {{
473 let path = Path::from($path.clone());
474 match ctx.rt.publish(path.clone(), $v.clone(), self.top_id) {
475 Err(e) => {
476 let msg: ArcStr = format_compact!("{e:?}").as_str().into();
477 let e: Value = (literal!("PublishError"), msg).into();
478 return Some(Value::Error(TArc::new(e)));
479 }
480 Ok(id) => {
481 self.current = Some((path, id));
482 }
483 }
484 }};
485 }
486 let mut up = [false; 3];
487 self.args.update_diff(&mut up, ctx, from, event);
488 if up[0] {
489 if let Some(v) = self.args.0[0].clone() {
490 ctx.cached.insert(self.pid, v.clone());
491 event.variables.insert(self.pid, v);
492 }
493 }
494 match (&up[1..], &self.args.0[1..]) {
495 ([true, _], [Some(Value::String(path)), Some(v)])
496 if self.current.as_ref().map(|(p, _)| &**p != path).unwrap_or(true) =>
497 {
498 if let Some((_, id)) = self.current.take() {
499 ctx.rt.unpublish(id, self.top_id);
500 }
501 publish!(path, v)
502 }
503 ([_, true], [Some(Value::String(path)), Some(v)]) => match &self.current {
504 Some((_, val)) => ctx.rt.update(val, v.clone()),
505 None => publish!(path, v),
506 },
507 _ => (),
508 }
509 let mut reply = None;
510 if let Some((_, val)) = &self.current {
511 if let Some(req) = event.writes.remove(&val.id()) {
512 ctx.cached.insert(self.x, req.value.clone());
513 event.variables.insert(self.x, req.value);
514 reply = req.send_result;
515 }
516 }
517 if let Some(v) = self.on_write.update(ctx, event) {
518 if let Some(reply) = reply {
519 reply.send(v)
520 }
521 }
522 None
523 }
524
525 fn typecheck(
526 &mut self,
527 ctx: &mut ExecCtx<R, E>,
528 _from: &mut [Node<R, E>],
529 ) -> anyhow::Result<()> {
530 self.on_write.typecheck(ctx)
531 }
532
533 fn refs(&self, refs: &mut graphix_compiler::Refs) {
534 self.on_write.refs(refs)
535 }
536
537 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
538 if let Some((_, val)) = self.current.take() {
539 ctx.rt.unpublish(val, self.top_id);
540 }
541 ctx.cached.remove(&self.pid);
542 ctx.cached.remove(&self.x);
543 ctx.env.unbind_variable(self.x);
544 self.on_write.delete(ctx);
545 }
546
547 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
548 if let Some((_, val)) = self.current.take() {
549 ctx.rt.unpublish(val, self.top_id);
550 }
551 self.args.clear();
552 self.on_write.sleep(ctx);
553 }
554}
555
/// State for the `net_publish_rpc` builtin.
#[derive(Debug)]
struct PublishRpc<R: Rt, E: UserEvent> {
    /// Cached values of the builtin's arguments (path, doc, spec, f).
    args: CachedVals,
    /// Id passed to `publish_rpc`; incoming calls are looked up under it in
    /// `event.rpc_calls`.
    id: BindId,
    /// Expression id used when referencing/unreferencing `id` with the runtime.
    top_id: ExprId,
    /// Call node applying the handler stored under `pid` to `x`.
    f: Node<R, E>,
    /// Variable that holds the handler function value.
    pid: BindId,
    /// Variable that receives the argument array of the call being handled.
    x: BindId,
    /// Calls received but not yet answered; the front one is being handled.
    queue: VecDeque<server::RpcCall>,
    /// Scratch buffer used to sort a call's arguments by name.
    argbuf: SmallVec<[(ArcStr, Value); 6]>,
    /// False while a call's arguments have been handed to `f` and no result
    /// has come back yet.
    ready: bool,
    /// Path the rpc is currently published under, if any.
    current: Option<Path>,
}
569
570impl<R: Rt, E: UserEvent> BuiltIn<R, E> for PublishRpc<R, E> {
571 const NAME: &str = "net_publish_rpc";
572 deftype!(
573 r#"fn(
574 #path:string,
575 #doc:string,
576 #spec:Array<ArgSpec>,
577 #f:fn(Array<(string, Any)>) -> Any throws 'e
578 ) -> Result<_, `PublishRpcError(string)> throws 'e"#
579 );
580
581 fn init<'a, 'b, 'c>(
582 ctx: &'a mut ExecCtx<R, E>,
583 typ: &'a graphix_compiler::typ::FnType,
584 scope: &'b Scope,
585 from: &'c [Node<R, E>],
586 top_id: ExprId,
587 ) -> Result<Box<dyn Apply<R, E>>> {
588 match from {
589 [_, _, _, _] => {
590 let scope =
591 scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
592 let id = BindId::new();
593 ctx.rt.ref_var(id, top_id);
594 let pid = BindId::new();
595 let mftyp = match &typ.args[3].typ {
596 Type::Fn(ft) => ft.clone(),
597 t => bail!("expected a function not {t}"),
598 };
599 let (x, xn) = genn::bind(
600 ctx,
601 &scope.lexical,
602 "x",
603 mftyp.args[0].typ.clone(),
604 top_id,
605 );
606 let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
607 let f = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
608 Ok(Box::new(PublishRpc {
609 queue: VecDeque::new(),
610 args: CachedVals::new(from),
611 x,
612 id,
613 top_id,
614 f,
615 pid,
616 argbuf: smallvec![],
617 ready: true,
618 current: None,
619 }))
620 }
621 _ => bail!("expected four arguments"),
622 }
623 }
624}
625
626impl<R: Rt, E: UserEvent> Apply<R, E> for PublishRpc<R, E> {
627 fn update(
628 &mut self,
629 ctx: &mut ExecCtx<R, E>,
630 from: &mut [Node<R, E>],
631 event: &mut Event<E>,
632 ) -> Option<Value> {
633 macro_rules! snd {
634 ($pair:expr) => {
635 match $pair {
636 Value::Array(p) => p[1].clone(),
637 _ => unreachable!(),
638 }
639 };
640 }
641 let mut changed = [false; 4];
642 self.args.update_diff(&mut changed, ctx, from, event);
643 if changed[3] {
644 if let Some(v) = self.args.0[3].clone() {
645 ctx.cached.insert(self.pid, v.clone());
646 event.variables.insert(self.pid, v);
647 }
648 }
649 if changed[0] || changed[1] || changed[2] {
650 if let Some(path) = self.current.take() {
651 ctx.rt.unpublish_rpc(path);
652 }
653 if let (Some(Value::String(path)), Some(doc), Some(Value::Array(spec))) =
654 (&self.args.0[0], &self.args.0[1], &self.args.0[2])
655 {
656 let path = Path::from(path);
657 let spec = spec
658 .iter()
659 .map(|r| match r {
660 Value::Array(r) => {
661 let default_value = snd!(&r[0]);
662 let doc = snd!(&r[1]);
663 let name = snd!(&r[2]).get_as::<ArcStr>().unwrap();
664 ArgSpec { name, doc, default_value }
665 }
666 _ => unreachable!(),
667 })
668 .collect::<Vec<_>>();
669 if let Err(e) =
670 ctx.rt.publish_rpc(path.clone(), doc.clone(), spec, self.id)
671 {
672 let e: ArcStr = format_compact!("{e:?}").as_str().into();
673 let e: Value = (literal!("PublishRpcError"), e).into();
674 return Some(Value::Error(TArc::new(e)));
675 }
676 self.current = Some(path);
677 }
678 }
679 macro_rules! set {
680 ($c:expr) => {{
681 self.ready = false;
682 self.argbuf.extend($c.args.iter().map(|(n, v)| (n.clone(), v.clone())));
683 self.argbuf.sort_by_key(|(n, _)| n.clone());
684 let args =
685 ValArray::from_iter_exact(self.argbuf.drain(..).map(|(n, v)| {
686 Value::Array(ValArray::from([Value::String(n), v]))
687 }));
688 ctx.cached.insert(self.x, Value::Array(args.clone()));
689 event.variables.insert(self.x, Value::Array(args));
690 }};
691 }
692 if let Some(c) = event.rpc_calls.remove(&self.id) {
693 self.queue.push_back(c);
694 }
695 if self.ready && self.queue.len() > 0 {
696 if let Some(c) = self.queue.front() {
697 set!(c)
698 }
699 }
700 loop {
701 match self.f.update(ctx, event) {
702 None => break None,
703 Some(v) => {
704 self.ready = true;
705 if let Some(mut call) = self.queue.pop_front() {
706 call.reply.send(v);
707 }
708 match self.queue.front() {
709 Some(c) => set!(c),
710 None => break None,
711 }
712 }
713 }
714 }
715 }
716
717 fn typecheck(
718 &mut self,
719 ctx: &mut ExecCtx<R, E>,
720 _from: &mut [Node<R, E>],
721 ) -> Result<()> {
722 self.f.typecheck(ctx)
723 }
724
725 fn refs(&self, refs: &mut graphix_compiler::Refs) {
726 self.f.refs(refs)
727 }
728
729 fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
730 ctx.rt.unref_var(self.id, self.top_id);
731 if let Some(path) = self.current.take() {
732 ctx.rt.unpublish_rpc(path);
733 }
734 ctx.cached.remove(&self.x);
735 ctx.env.unbind_variable(self.x);
736 ctx.cached.remove(&self.pid);
737 self.f.delete(ctx);
738 }
739
740 fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
741 ctx.rt.unref_var(self.id, self.top_id);
742 self.id = BindId::new();
743 ctx.rt.ref_var(self.id, self.top_id);
744 if let Some(path) = self.current.take() {
745 ctx.rt.unpublish_rpc(path);
746 }
747 self.args.clear();
748 self.queue.clear();
749 self.argbuf.clear();
750 self.ready = true;
751 self.f.sleep(ctx);
752 }
753}
754
// Package definition listing every builtin declared in this file. The two
// generic builtins are listed with explicit type arguments.
// NOTE(review): `X` is not defined in this file; presumably it is supplied by
// the `defpackage!` expansion. Confirm against the macro.
graphix_derive::defpackage! {
    builtins => [
        Write,
        Subscribe,
        RpcCall,
        List,
        ListTable,
        Publish as Publish<GXRt<X>, X::UserEvent>,
        PublishRpc as PublishRpc<GXRt<X>, X::UserEvent>,
    ],
}