Skip to main content

//! Source file: `graphix_package_net/lib.rs`

1use anyhow::{anyhow, bail, Result};
2use arcstr::{literal, ArcStr};
3use compact_str::format_compact;
4use graphix_compiler::{
5    err, errf, expr::ExprId, node::genn, typ::Type, Apply, BindId, BuiltIn, Event,
6    ExecCtx, LambdaId, Node, Rt, Scope, UserEvent,
7};
8use graphix_package_core::{arity1, arity2, deftype, CachedVals};
9use graphix_rt::GXRt;
10use netidx::{
11    path::Path,
12    publisher::Val,
13    subscriber::{self, Dval, UpdatesFlags, Value},
14};
15use netidx_core::utils::Either;
16use netidx_protocols::rpc::server::{self, ArgSpec};
17use netidx_value::ValArray;
18use smallvec::{smallvec, SmallVec};
19use std::collections::VecDeque;
20use triomphe::Arc as TArc;
21
22fn as_path(v: Value) -> Option<Path> {
23    match v.cast_to::<String>() {
24        Err(_) => None,
25        Ok(p) => {
26            if Path::is_absolute(&p) {
27                Some(Path::from(p))
28            } else {
29                None
30            }
31        }
32    }
33}
34
35#[derive(Debug)]
36struct Write {
37    args: CachedVals,
38    top_id: ExprId,
39    dv: Either<(Path, Dval), Vec<Value>>,
40}
41
42impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Write {
43    const NAME: &str = "net_write";
44    deftype!("fn(string, Any) -> Result<_, `WriteError(string)>");
45
46    fn init<'a, 'b, 'c>(
47        _ctx: &'a mut ExecCtx<R, E>,
48        _typ: &'a graphix_compiler::typ::FnType,
49        _scope: &'b Scope,
50        from: &'c [Node<R, E>],
51        top_id: ExprId,
52    ) -> Result<Box<dyn Apply<R, E>>> {
53        Ok(Box::new(Write {
54            args: CachedVals::new(from),
55            dv: Either::Right(vec![]),
56            top_id,
57        }))
58    }
59}
60
61impl<R: Rt, E: UserEvent> Apply<R, E> for Write {
62    fn update(
63        &mut self,
64        ctx: &mut ExecCtx<R, E>,
65        from: &mut [Node<R, E>],
66        event: &mut Event<E>,
67    ) -> Option<Value> {
68        fn set(dv: &mut Either<(Path, Dval), Vec<Value>>, val: &Value) {
69            match dv {
70                Either::Right(q) => q.push(val.clone()),
71                Either::Left((_, dv)) => {
72                    dv.write(val.clone());
73                }
74            }
75        }
76        let mut up = [false; 2];
77        self.args.update_diff(&mut up, ctx, from, event);
78        let ((path, value), (path_up, value_up)) = arity2!(self.args.0, &up);
79        match ((path, value), (path_up, value_up)) {
80            ((_, _), (false, false)) => (),
81            ((_, Some(val)), (false, true)) => set(&mut self.dv, val),
82            ((_, None), (false, true)) => (),
83            ((None, Some(val)), (true, true)) => set(&mut self.dv, val),
84            ((Some(path), Some(val)), (true, true)) if self.same_path(path) => {
85                set(&mut self.dv, val)
86            }
87            ((Some(path), _), (true, false)) if self.same_path(path) => (),
88            ((None, _), (true, false)) => (),
89            ((None, None), (_, _)) => (),
90            ((Some(path), val), (true, _)) => match as_path(path.clone()) {
91                None => {
92                    if let Either::Left(_) = &self.dv {
93                        self.dv = Either::Right(vec![]);
94                    }
95                    let e = errf!(literal!("WriteError"), "invalid path {path:?}");
96                    return Some(Value::Error(TArc::new(e)));
97                }
98                Some(path) => {
99                    let dv = ctx.rt.subscribe(
100                        UpdatesFlags::empty(),
101                        path.clone(),
102                        self.top_id,
103                    );
104                    match &mut self.dv {
105                        Either::Left(_) => (),
106                        Either::Right(q) => {
107                            for v in q.drain(..) {
108                                dv.write(v);
109                            }
110                        }
111                    }
112                    self.dv = Either::Left((path, dv));
113                    if let Some(val) = val {
114                        set(&mut self.dv, val)
115                    }
116                }
117            },
118        }
119        None
120    }
121
122    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
123        if let Either::Left((path, dv)) = &self.dv {
124            ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id)
125        }
126        self.dv = Either::Right(vec![])
127    }
128
129    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
130        self.args.clear();
131        match &mut self.dv {
132            Either::Left((path, dv)) => {
133                ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id);
134                self.dv = Either::Right(vec![])
135            }
136            Either::Right(_) => (),
137        }
138    }
139}
140
141impl Write {
142    fn same_path(&self, new_path: &Value) -> bool {
143        match (new_path, &self.dv) {
144            (Value::String(p0), Either::Left((p1, _))) => &**p0 == &**p1,
145            _ => false,
146        }
147    }
148}
149
150#[derive(Debug)]
151struct Subscribe {
152    args: CachedVals,
153    cur: Option<(Path, Dval)>,
154    top_id: ExprId,
155}
156
157impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Subscribe {
158    const NAME: &str = "net_subscribe";
159    deftype!("fn(string) -> Result<Primitive, `SubscribeError(string)>");
160
161    fn init<'a, 'b, 'c>(
162        _ctx: &'a mut ExecCtx<R, E>,
163        _typ: &'a graphix_compiler::typ::FnType,
164        _scope: &'b Scope,
165        from: &'c [Node<R, E>],
166        top_id: ExprId,
167    ) -> Result<Box<dyn Apply<R, E>>> {
168        Ok(Box::new(Subscribe { args: CachedVals::new(from), cur: None, top_id }))
169    }
170}
171
172impl<R: Rt, E: UserEvent> Apply<R, E> for Subscribe {
173    fn update(
174        &mut self,
175        ctx: &mut ExecCtx<R, E>,
176        from: &mut [Node<R, E>],
177        event: &mut Event<E>,
178    ) -> Option<Value> {
179        static ERR_TAG: ArcStr = literal!("SubscribeError");
180        let mut up = [false; 1];
181        self.args.update_diff(&mut up, ctx, from, event);
182        let (path, path_up) = arity1!(self.args.0, &up);
183        match (path, path_up) {
184            (Some(_), false) | (None, false) => (),
185            (None, true) => {
186                if let Some((path, dv)) = self.cur.take() {
187                    ctx.rt.unsubscribe(path, dv, self.top_id)
188                }
189                return None;
190            }
191            (Some(Value::String(path)), true)
192                if self.cur.as_ref().map(|(p, _)| &**p) != Some(&*path) =>
193            {
194                if let Some((path, dv)) = self.cur.take() {
195                    ctx.rt.unsubscribe(path, dv, self.top_id)
196                }
197                let path = Path::from(path);
198                if !Path::is_absolute(&path) {
199                    return Some(err!(ERR_TAG, "expected absolute path"));
200                }
201                let dval = ctx.rt.subscribe(
202                    UpdatesFlags::BEGIN_WITH_LAST,
203                    path.clone(),
204                    self.top_id,
205                );
206                self.cur = Some((path, dval));
207            }
208            (Some(Value::String(_)), true) => (),
209            (Some(v), true) => {
210                return Some(errf!(ERR_TAG, "invalid path {v}, expected string"))
211            }
212        }
213        self.cur.as_ref().and_then(|(_, dv)| {
214            event.netidx.get(&dv.id()).map(|e| match e {
215                subscriber::Event::Unsubscribed => Value::error(literal!("unsubscribed")),
216                subscriber::Event::Update(v) => v.clone(),
217            })
218        })
219    }
220
221    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
222        if let Some((path, dv)) = self.cur.take() {
223            ctx.rt.unsubscribe(path, dv, self.top_id)
224        }
225    }
226
227    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
228        self.args.clear();
229        if let Some((path, dv)) = self.cur.take() {
230            ctx.rt.unsubscribe(path, dv, self.top_id);
231        }
232    }
233}
234
235#[derive(Debug)]
236struct RpcCall {
237    args: CachedVals,
238    top_id: ExprId,
239    id: BindId,
240}
241
242impl<R: Rt, E: UserEvent> BuiltIn<R, E> for RpcCall {
243    const NAME: &str = "net_call";
244    deftype!("fn(string, Array<(string, Any)>) -> Result<Primitive, `RpcError(string)>");
245
246    fn init<'a, 'b, 'c>(
247        ctx: &'a mut ExecCtx<R, E>,
248        _typ: &'a graphix_compiler::typ::FnType,
249        _scope: &'b Scope,
250        from: &'c [Node<R, E>],
251        top_id: ExprId,
252    ) -> Result<Box<dyn Apply<R, E>>> {
253        let id = BindId::new();
254        ctx.rt.ref_var(id, top_id);
255        Ok(Box::new(RpcCall { args: CachedVals::new(from), top_id, id }))
256    }
257}
258
259impl<R: Rt, E: UserEvent> Apply<R, E> for RpcCall {
260    fn update(
261        &mut self,
262        ctx: &mut ExecCtx<R, E>,
263        from: &mut [Node<R, E>],
264        event: &mut Event<E>,
265    ) -> Option<Value> {
266        fn parse_args(
267            path: &Value,
268            args: &Value,
269        ) -> Result<(Path, Vec<(ArcStr, Value)>)> {
270            let path = as_path(path.clone()).ok_or_else(|| anyhow!("invalid path"))?;
271            let args = match args {
272                Value::Array(args) => args
273                    .iter()
274                    .map(|v| match v {
275                        Value::Array(p) => match &**p {
276                            [Value::String(name), value] => {
277                                Ok((name.clone(), value.clone()))
278                            }
279                            _ => Err(anyhow!("rpc args expected [name, value] pair")),
280                        },
281                        _ => Err(anyhow!("rpc args expected [name, value] pair")),
282                    })
283                    .collect::<Result<Vec<_>>>()?,
284                _ => bail!("rpc args expected to be an array"),
285            };
286            Ok((path, args))
287        }
288        let mut up = [false; 2];
289        self.args.update_diff(&mut up, ctx, from, event);
290        let ((path, args), (path_up, args_up)) = arity2!(self.args.0, &up);
291        match ((path, args), (path_up, args_up)) {
292            ((Some(path), Some(args)), (_, true))
293            | ((Some(path), Some(args)), (true, _)) => match parse_args(path, args) {
294                Err(e) => return Some(errf!(literal!("RpcError"), "{e}")),
295                Ok((path, args)) => ctx.rt.call_rpc(path, args, self.id),
296            },
297            ((None, _), (_, _)) | ((_, None), (_, _)) | ((_, _), (false, false)) => (),
298        }
299        event.variables.get(&self.id).cloned()
300    }
301
302    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
303        ctx.rt.unref_var(self.id, self.top_id)
304    }
305
306    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
307        ctx.rt.unref_var(self.id, self.top_id);
308        self.id = BindId::new();
309        ctx.rt.ref_var(self.id, self.top_id);
310        self.args.clear()
311    }
312}
313
/// Generate a listing builtin.
///
/// `$name` is the generated struct, `$builtin` the builtin's name, `$method`
/// the runtime method invoked to start a listing, and `$typ` the builtin's
/// type signature.
macro_rules! list {
    ($name:ident, $builtin:literal, $method:ident, $typ:literal) => {
        #[derive(Debug)]
        struct $name {
            /// Cached most recent values of the arguments (trigger, path).
            args: CachedVals,
            /// Path of the listing requested most recently, if any.
            current: Option<Path>,
            /// Variable passed to the runtime when listing; results are read
            /// back from `event.variables` under this id.
            id: BindId,
            /// Id of the top level expression that owns this node.
            top_id: ExprId,
        }

        impl<R: Rt, E: UserEvent> BuiltIn<R, E> for $name {
            const NAME: &str = $builtin;
            deftype!($typ);

            fn init<'a, 'b, 'c>(
                ctx: &'a mut ExecCtx<R, E>,
                _typ: &'a graphix_compiler::typ::FnType,
                _scope: &'b Scope,
                from: &'c [Node<R, E>],
                top_id: ExprId,
            ) -> Result<Box<dyn Apply<R, E>>> {
                let id = BindId::new();
                ctx.rt.ref_var(id, top_id);
                let node =
                    $name { args: CachedVals::new(from), current: None, top_id, id };
                Ok(Box::new(node))
            }
        }

        impl<R: Rt, E: UserEvent> Apply<R, E> for $name {
            fn update(
                &mut self,
                ctx: &mut ExecCtx<R, E>,
                from: &mut [Node<R, E>],
                event: &mut Event<E>,
            ) -> Option<Value> {
                let mut up = [false; 2];
                self.args.update_diff(&mut up, ctx, from, event);
                let ((_, path), (trigger_up, path_up)) = arity2!(self.args.0, &up);
                if let Some(Value::String(path)) = path {
                    let path_changed = path_up
                        && self.current.as_ref().map_or(true, |p| &**p != &**path);
                    if path_changed {
                        // remember the new path and list it
                        let path = Path::from(path);
                        self.current = Some(path.clone());
                        ctx.rt.$method(self.id, path);
                    } else if trigger_up {
                        // the trigger fired: list again
                        ctx.rt.$method(self.id, Path::from(path));
                    }
                }
                match event.variables.get(&self.id)? {
                    Value::Null => None,
                    Value::Error(e) => Some(errf!(literal!("ListError"), "{e}")),
                    v => Some(v.clone()),
                }
            }

            fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
                ctx.rt.unref_var(self.id, self.top_id);
                ctx.rt.stop_list(self.id);
            }

            fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
                // stop the listing tied to the old variable, then start
                // over with a fresh variable and no remembered path
                ctx.rt.unref_var(self.id, self.top_id);
                ctx.rt.stop_list(self.id);
                self.id = BindId::new();
                ctx.rt.ref_var(self.id, self.top_id);
                self.current = None;
                self.args.clear();
            }
        }
    };
}
396
397list!(
398    List,
399    "net_list",
400    list,
401    "fn(?#update:Any, string) -> Result<Array<string>, `ListError(string)>"
402);
403
404list!(
405    ListTable,
406    "net_list_table",
407    list_table,
408    "fn(?#update:Any, string) -> Result<Table, `ListError(string)>"
409);
410
411#[derive(Debug)]
412struct Publish<R: Rt, E: UserEvent> {
413    args: CachedVals,
414    current: Option<(Path, Val)>,
415    top_id: ExprId,
416    x: BindId,
417    pid: BindId,
418    on_write: Node<R, E>,
419}
420
421impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Publish<R, E> {
422    const NAME: &str = "net_publish";
423    deftype!(
424        "fn(?#on_write:fn(Any) -> _ throws 'e, string, Any) -> Result<_, `PublishError(string)> throws 'e"
425    );
426
427    fn init<'a, 'b, 'c>(
428        ctx: &'a mut ExecCtx<R, E>,
429        typ: &'a graphix_compiler::typ::FnType,
430        scope: &'b Scope,
431        from: &'c [Node<R, E>],
432        top_id: ExprId,
433    ) -> Result<Box<dyn Apply<R, E>>> {
434        match from {
435            [_, _, _] => {
436                let scope =
437                    scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
438                let pid = BindId::new();
439                let mftyp = match &typ.args[0].typ {
440                    Type::Fn(ft) => ft.clone(),
441                    t => bail!("expected function not {t}"),
442                };
443                let (x, xn) = genn::bind(ctx, &scope.lexical, "x", Type::Any, top_id);
444                let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
445                let on_write = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
446                Ok(Box::new(Publish {
447                    args: CachedVals::new(from),
448                    current: None,
449                    top_id,
450                    pid,
451                    x,
452                    on_write,
453                }))
454            }
455            _ => bail!("expected three arguments"),
456        }
457    }
458}
459
460impl<R: Rt, E: UserEvent> Apply<R, E> for Publish<R, E> {
461    fn update(
462        &mut self,
463        ctx: &mut ExecCtx<R, E>,
464        from: &mut [Node<R, E>],
465        event: &mut Event<E>,
466    ) -> Option<Value> {
467        macro_rules! publish {
468            ($path:expr, $v:expr) => {{
469                let path = Path::from($path.clone());
470                match ctx.rt.publish(path.clone(), $v.clone(), self.top_id) {
471                    Err(e) => {
472                        let msg: ArcStr = format_compact!("{e:?}").as_str().into();
473                        let e: Value = (literal!("PublishError"), msg).into();
474                        return Some(Value::Error(TArc::new(e)));
475                    }
476                    Ok(id) => {
477                        self.current = Some((path, id));
478                    }
479                }
480            }};
481        }
482        let mut up = [false; 3];
483        self.args.update_diff(&mut up, ctx, from, event);
484        if up[0] {
485            if let Some(v) = self.args.0[0].clone() {
486                ctx.cached.insert(self.pid, v.clone());
487                event.variables.insert(self.pid, v);
488            }
489        }
490        match (&up[1..], &self.args.0[1..]) {
491            ([true, _], [Some(Value::String(path)), Some(v)])
492                if self.current.as_ref().map(|(p, _)| &**p != path).unwrap_or(true) =>
493            {
494                if let Some((_, id)) = self.current.take() {
495                    ctx.rt.unpublish(id, self.top_id);
496                }
497                publish!(path, v)
498            }
499            ([_, true], [Some(Value::String(path)), Some(v)]) => match &self.current {
500                Some((_, val)) => ctx.rt.update(val, v.clone()),
501                None => publish!(path, v),
502            },
503            _ => (),
504        }
505        let mut reply = None;
506        if let Some((_, val)) = &self.current {
507            if let Some(req) = event.writes.remove(&val.id()) {
508                ctx.cached.insert(self.x, req.value.clone());
509                event.variables.insert(self.x, req.value);
510                reply = req.send_result;
511            }
512        }
513        if let Some(v) = self.on_write.update(ctx, event) {
514            if let Some(reply) = reply {
515                reply.send(v)
516            }
517        }
518        None
519    }
520
521    fn typecheck(
522        &mut self,
523        ctx: &mut ExecCtx<R, E>,
524        _from: &mut [Node<R, E>],
525    ) -> anyhow::Result<()> {
526        self.on_write.typecheck(ctx)
527    }
528
529    fn refs(&self, refs: &mut graphix_compiler::Refs) {
530        self.on_write.refs(refs)
531    }
532
533    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
534        if let Some((_, val)) = self.current.take() {
535            ctx.rt.unpublish(val, self.top_id);
536        }
537        ctx.cached.remove(&self.pid);
538        ctx.cached.remove(&self.x);
539        ctx.env.unbind_variable(self.x);
540        self.on_write.delete(ctx);
541    }
542
543    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
544        if let Some((_, val)) = self.current.take() {
545            ctx.rt.unpublish(val, self.top_id);
546        }
547        self.args.clear();
548        self.on_write.sleep(ctx);
549    }
550}
551
552#[derive(Debug)]
553struct PublishRpc<R: Rt, E: UserEvent> {
554    args: CachedVals,
555    id: BindId,
556    top_id: ExprId,
557    f: Node<R, E>,
558    pid: BindId,
559    x: BindId,
560    queue: VecDeque<server::RpcCall>,
561    argbuf: SmallVec<[(ArcStr, Value); 6]>,
562    ready: bool,
563    current: Option<Path>,
564}
565
566impl<R: Rt, E: UserEvent> BuiltIn<R, E> for PublishRpc<R, E> {
567    const NAME: &str = "net_publish_rpc";
568    deftype!(
569        r#"fn(
570            #path:string,
571            #doc:string,
572            #spec:Array<ArgSpec>,
573            #f:fn(Array<(string, Any)>) -> Any throws 'e
574        ) -> Result<_, `PublishRpcError(string)> throws 'e"#
575    );
576
577    fn init<'a, 'b, 'c>(
578        ctx: &'a mut ExecCtx<R, E>,
579        typ: &'a graphix_compiler::typ::FnType,
580        scope: &'b Scope,
581        from: &'c [Node<R, E>],
582        top_id: ExprId,
583    ) -> Result<Box<dyn Apply<R, E>>> {
584        match from {
585            [_, _, _, _] => {
586                let scope =
587                    scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
588                let id = BindId::new();
589                ctx.rt.ref_var(id, top_id);
590                let pid = BindId::new();
591                let mftyp = match &typ.args[3].typ {
592                    Type::Fn(ft) => ft.clone(),
593                    t => bail!("expected a function not {t}"),
594                };
595                let (x, xn) = genn::bind(
596                    ctx,
597                    &scope.lexical,
598                    "x",
599                    mftyp.args[0].typ.clone(),
600                    top_id,
601                );
602                let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
603                let f = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
604                Ok(Box::new(PublishRpc {
605                    queue: VecDeque::new(),
606                    args: CachedVals::new(from),
607                    x,
608                    id,
609                    top_id,
610                    f,
611                    pid,
612                    argbuf: smallvec![],
613                    ready: true,
614                    current: None,
615                }))
616            }
617            _ => bail!("expected four arguments"),
618        }
619    }
620}
621
622impl<R: Rt, E: UserEvent> Apply<R, E> for PublishRpc<R, E> {
623    fn update(
624        &mut self,
625        ctx: &mut ExecCtx<R, E>,
626        from: &mut [Node<R, E>],
627        event: &mut Event<E>,
628    ) -> Option<Value> {
629        macro_rules! snd {
630            ($pair:expr) => {
631                match $pair {
632                    Value::Array(p) => p[1].clone(),
633                    _ => unreachable!(),
634                }
635            };
636        }
637        let mut changed = [false; 4];
638        self.args.update_diff(&mut changed, ctx, from, event);
639        if changed[3] {
640            if let Some(v) = self.args.0[3].clone() {
641                ctx.cached.insert(self.pid, v.clone());
642                event.variables.insert(self.pid, v);
643            }
644        }
645        if changed[0] || changed[1] || changed[2] {
646            if let Some(path) = self.current.take() {
647                ctx.rt.unpublish_rpc(path);
648            }
649            if let (Some(Value::String(path)), Some(doc), Some(Value::Array(spec))) =
650                (&self.args.0[0], &self.args.0[1], &self.args.0[2])
651            {
652                let path = Path::from(path);
653                let spec = spec
654                    .iter()
655                    .map(|r| match r {
656                        Value::Array(r) => {
657                            let default_value = snd!(&r[0]);
658                            let doc = snd!(&r[1]);
659                            let name = snd!(&r[2]).get_as::<ArcStr>().unwrap();
660                            ArgSpec { name, doc, default_value }
661                        }
662                        _ => unreachable!(),
663                    })
664                    .collect::<Vec<_>>();
665                if let Err(e) =
666                    ctx.rt.publish_rpc(path.clone(), doc.clone(), spec, self.id)
667                {
668                    let e: ArcStr = format_compact!("{e:?}").as_str().into();
669                    let e: Value = (literal!("PublishRpcError"), e).into();
670                    return Some(Value::Error(TArc::new(e)));
671                }
672                self.current = Some(path);
673            }
674        }
675        macro_rules! set {
676            ($c:expr) => {{
677                self.ready = false;
678                self.argbuf.extend($c.args.iter().map(|(n, v)| (n.clone(), v.clone())));
679                self.argbuf.sort_by_key(|(n, _)| n.clone());
680                let args =
681                    ValArray::from_iter_exact(self.argbuf.drain(..).map(|(n, v)| {
682                        Value::Array(ValArray::from([Value::String(n), v]))
683                    }));
684                ctx.cached.insert(self.x, Value::Array(args.clone()));
685                event.variables.insert(self.x, Value::Array(args));
686            }};
687        }
688        if let Some(c) = event.rpc_calls.remove(&self.id) {
689            self.queue.push_back(c);
690        }
691        if self.ready && self.queue.len() > 0 {
692            if let Some(c) = self.queue.front() {
693                set!(c)
694            }
695        }
696        loop {
697            match self.f.update(ctx, event) {
698                None => break None,
699                Some(v) => {
700                    self.ready = true;
701                    if let Some(mut call) = self.queue.pop_front() {
702                        call.reply.send(v);
703                    }
704                    match self.queue.front() {
705                        Some(c) => set!(c),
706                        None => break None,
707                    }
708                }
709            }
710        }
711    }
712
713    fn typecheck(
714        &mut self,
715        ctx: &mut ExecCtx<R, E>,
716        _from: &mut [Node<R, E>],
717    ) -> Result<()> {
718        self.f.typecheck(ctx)
719    }
720
721    fn refs(&self, refs: &mut graphix_compiler::Refs) {
722        self.f.refs(refs)
723    }
724
725    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
726        ctx.rt.unref_var(self.id, self.top_id);
727        if let Some(path) = self.current.take() {
728            ctx.rt.unpublish_rpc(path);
729        }
730        ctx.cached.remove(&self.x);
731        ctx.env.unbind_variable(self.x);
732        ctx.cached.remove(&self.pid);
733        self.f.delete(ctx);
734    }
735
736    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
737        ctx.rt.unref_var(self.id, self.top_id);
738        self.id = BindId::new();
739        ctx.rt.ref_var(self.id, self.top_id);
740        if let Some(path) = self.current.take() {
741            ctx.rt.unpublish_rpc(path);
742        }
743        self.args.clear();
744        self.queue.clear();
745        self.argbuf.clear();
746        self.ready = true;
747        self.f.sleep(ctx);
748    }
749}
750
751graphix_derive::defpackage! {
752    builtins => [
753        Write,
754        Subscribe,
755        RpcCall,
756        List,
757        ListTable,
758        Publish as Publish<GXRt<X>, X::UserEvent>,
759        PublishRpc as PublishRpc<GXRt<X>, X::UserEvent>,
760    ],
761}