Skip to main content

graphix_package_net/
lib.rs

1#![doc(
2    html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3    html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use anyhow::{anyhow, bail, Result};
6use arcstr::{literal, ArcStr};
7use compact_str::format_compact;
8use graphix_compiler::{
9    err, errf, expr::ExprId, node::genn, typ::Type, Apply, BindId, BuiltIn, Event,
10    ExecCtx, LambdaId, Node, Rt, Scope, UserEvent,
11};
12use graphix_package_core::{arity1, arity2, deftype, CachedVals};
13use graphix_rt::GXRt;
14use netidx::{
15    path::Path,
16    publisher::Val,
17    subscriber::{self, Dval, UpdatesFlags, Value},
18};
19use netidx_core::utils::Either;
20use netidx_protocols::rpc::server::{self, ArgSpec};
21use netidx_value::ValArray;
22use smallvec::{smallvec, SmallVec};
23use std::collections::VecDeque;
24use triomphe::Arc as TArc;
25
26fn as_path(v: Value) -> Option<Path> {
27    match v.cast_to::<String>() {
28        Err(_) => None,
29        Ok(p) => {
30            if Path::is_absolute(&p) {
31                Some(Path::from(p))
32            } else {
33                None
34            }
35        }
36    }
37}
38
39#[derive(Debug)]
40struct Write {
41    args: CachedVals,
42    top_id: ExprId,
43    dv: Either<(Path, Dval), Vec<Value>>,
44}
45
46impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Write {
47    const NAME: &str = "net_write";
48    deftype!("fn(string, Any) -> Result<_, `WriteError(string)>");
49
50    fn init<'a, 'b, 'c>(
51        _ctx: &'a mut ExecCtx<R, E>,
52        _typ: &'a graphix_compiler::typ::FnType,
53        _scope: &'b Scope,
54        from: &'c [Node<R, E>],
55        top_id: ExprId,
56    ) -> Result<Box<dyn Apply<R, E>>> {
57        Ok(Box::new(Write {
58            args: CachedVals::new(from),
59            dv: Either::Right(vec![]),
60            top_id,
61        }))
62    }
63}
64
65impl<R: Rt, E: UserEvent> Apply<R, E> for Write {
66    fn update(
67        &mut self,
68        ctx: &mut ExecCtx<R, E>,
69        from: &mut [Node<R, E>],
70        event: &mut Event<E>,
71    ) -> Option<Value> {
72        fn set(dv: &mut Either<(Path, Dval), Vec<Value>>, val: &Value) {
73            match dv {
74                Either::Right(q) => q.push(val.clone()),
75                Either::Left((_, dv)) => {
76                    dv.write(val.clone());
77                }
78            }
79        }
80        let mut up = [false; 2];
81        self.args.update_diff(&mut up, ctx, from, event);
82        let ((path, value), (path_up, value_up)) = arity2!(self.args.0, &up);
83        match ((path, value), (path_up, value_up)) {
84            ((_, _), (false, false)) => (),
85            ((_, Some(val)), (false, true)) => set(&mut self.dv, val),
86            ((_, None), (false, true)) => (),
87            ((None, Some(val)), (true, true)) => set(&mut self.dv, val),
88            ((Some(path), Some(val)), (true, true)) if self.same_path(path) => {
89                set(&mut self.dv, val)
90            }
91            ((Some(path), _), (true, false)) if self.same_path(path) => (),
92            ((None, _), (true, false)) => (),
93            ((None, None), (_, _)) => (),
94            ((Some(path), val), (true, _)) => match as_path(path.clone()) {
95                None => {
96                    if let Either::Left(_) = &self.dv {
97                        self.dv = Either::Right(vec![]);
98                    }
99                    let e = errf!(literal!("WriteError"), "invalid path {path:?}");
100                    return Some(Value::Error(TArc::new(e)));
101                }
102                Some(path) => {
103                    let dv = ctx.rt.subscribe(
104                        UpdatesFlags::empty(),
105                        path.clone(),
106                        self.top_id,
107                    );
108                    match &mut self.dv {
109                        Either::Left(_) => (),
110                        Either::Right(q) => {
111                            for v in q.drain(..) {
112                                dv.write(v);
113                            }
114                        }
115                    }
116                    self.dv = Either::Left((path, dv));
117                    if let Some(val) = val {
118                        set(&mut self.dv, val)
119                    }
120                }
121            },
122        }
123        None
124    }
125
126    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
127        if let Either::Left((path, dv)) = &self.dv {
128            ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id)
129        }
130        self.dv = Either::Right(vec![])
131    }
132
133    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
134        self.args.clear();
135        match &mut self.dv {
136            Either::Left((path, dv)) => {
137                ctx.rt.unsubscribe(path.clone(), dv.clone(), self.top_id);
138                self.dv = Either::Right(vec![])
139            }
140            Either::Right(_) => (),
141        }
142    }
143}
144
145impl Write {
146    fn same_path(&self, new_path: &Value) -> bool {
147        match (new_path, &self.dv) {
148            (Value::String(p0), Either::Left((p1, _))) => &**p0 == &**p1,
149            _ => false,
150        }
151    }
152}
153
154#[derive(Debug)]
155struct Subscribe {
156    args: CachedVals,
157    cur: Option<(Path, Dval)>,
158    top_id: ExprId,
159}
160
161impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Subscribe {
162    const NAME: &str = "net_subscribe";
163    deftype!("fn(string) -> Result<Primitive, `SubscribeError(string)>");
164
165    fn init<'a, 'b, 'c>(
166        _ctx: &'a mut ExecCtx<R, E>,
167        _typ: &'a graphix_compiler::typ::FnType,
168        _scope: &'b Scope,
169        from: &'c [Node<R, E>],
170        top_id: ExprId,
171    ) -> Result<Box<dyn Apply<R, E>>> {
172        Ok(Box::new(Subscribe { args: CachedVals::new(from), cur: None, top_id }))
173    }
174}
175
176impl<R: Rt, E: UserEvent> Apply<R, E> for Subscribe {
177    fn update(
178        &mut self,
179        ctx: &mut ExecCtx<R, E>,
180        from: &mut [Node<R, E>],
181        event: &mut Event<E>,
182    ) -> Option<Value> {
183        static ERR_TAG: ArcStr = literal!("SubscribeError");
184        let mut up = [false; 1];
185        self.args.update_diff(&mut up, ctx, from, event);
186        let (path, path_up) = arity1!(self.args.0, &up);
187        match (path, path_up) {
188            (Some(_), false) | (None, false) => (),
189            (None, true) => {
190                if let Some((path, dv)) = self.cur.take() {
191                    ctx.rt.unsubscribe(path, dv, self.top_id)
192                }
193                return None;
194            }
195            (Some(Value::String(path)), true)
196                if self.cur.as_ref().map(|(p, _)| &**p) != Some(&*path) =>
197            {
198                if let Some((path, dv)) = self.cur.take() {
199                    ctx.rt.unsubscribe(path, dv, self.top_id)
200                }
201                let path = Path::from(path);
202                if !Path::is_absolute(&path) {
203                    return Some(err!(ERR_TAG, "expected absolute path"));
204                }
205                let dval = ctx.rt.subscribe(
206                    UpdatesFlags::BEGIN_WITH_LAST,
207                    path.clone(),
208                    self.top_id,
209                );
210                self.cur = Some((path, dval));
211            }
212            (Some(Value::String(_)), true) => (),
213            (Some(v), true) => {
214                return Some(errf!(ERR_TAG, "invalid path {v}, expected string"))
215            }
216        }
217        self.cur.as_ref().and_then(|(_, dv)| {
218            event.netidx.get(&dv.id()).map(|e| match e {
219                subscriber::Event::Unsubscribed => Value::error(literal!("unsubscribed")),
220                subscriber::Event::Update(v) => v.clone(),
221            })
222        })
223    }
224
225    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
226        if let Some((path, dv)) = self.cur.take() {
227            ctx.rt.unsubscribe(path, dv, self.top_id)
228        }
229    }
230
231    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
232        self.args.clear();
233        if let Some((path, dv)) = self.cur.take() {
234            ctx.rt.unsubscribe(path, dv, self.top_id);
235        }
236    }
237}
238
239#[derive(Debug)]
240struct RpcCall {
241    args: CachedVals,
242    top_id: ExprId,
243    id: BindId,
244}
245
246impl<R: Rt, E: UserEvent> BuiltIn<R, E> for RpcCall {
247    const NAME: &str = "net_call";
248    deftype!("fn(string, Array<(string, Any)>) -> Result<Primitive, `RpcError(string)>");
249
250    fn init<'a, 'b, 'c>(
251        ctx: &'a mut ExecCtx<R, E>,
252        _typ: &'a graphix_compiler::typ::FnType,
253        _scope: &'b Scope,
254        from: &'c [Node<R, E>],
255        top_id: ExprId,
256    ) -> Result<Box<dyn Apply<R, E>>> {
257        let id = BindId::new();
258        ctx.rt.ref_var(id, top_id);
259        Ok(Box::new(RpcCall { args: CachedVals::new(from), top_id, id }))
260    }
261}
262
263impl<R: Rt, E: UserEvent> Apply<R, E> for RpcCall {
264    fn update(
265        &mut self,
266        ctx: &mut ExecCtx<R, E>,
267        from: &mut [Node<R, E>],
268        event: &mut Event<E>,
269    ) -> Option<Value> {
270        fn parse_args(
271            path: &Value,
272            args: &Value,
273        ) -> Result<(Path, Vec<(ArcStr, Value)>)> {
274            let path = as_path(path.clone()).ok_or_else(|| anyhow!("invalid path"))?;
275            let args = match args {
276                Value::Array(args) => args
277                    .iter()
278                    .map(|v| match v {
279                        Value::Array(p) => match &**p {
280                            [Value::String(name), value] => {
281                                Ok((name.clone(), value.clone()))
282                            }
283                            _ => Err(anyhow!("rpc args expected [name, value] pair")),
284                        },
285                        _ => Err(anyhow!("rpc args expected [name, value] pair")),
286                    })
287                    .collect::<Result<Vec<_>>>()?,
288                _ => bail!("rpc args expected to be an array"),
289            };
290            Ok((path, args))
291        }
292        let mut up = [false; 2];
293        self.args.update_diff(&mut up, ctx, from, event);
294        let ((path, args), (path_up, args_up)) = arity2!(self.args.0, &up);
295        match ((path, args), (path_up, args_up)) {
296            ((Some(path), Some(args)), (_, true))
297            | ((Some(path), Some(args)), (true, _)) => match parse_args(path, args) {
298                Err(e) => return Some(errf!(literal!("RpcError"), "{e}")),
299                Ok((path, args)) => ctx.rt.call_rpc(path, args, self.id),
300            },
301            ((None, _), (_, _)) | ((_, None), (_, _)) | ((_, _), (false, false)) => (),
302        }
303        event.variables.get(&self.id).cloned()
304    }
305
306    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
307        ctx.rt.unref_var(self.id, self.top_id)
308    }
309
310    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
311        ctx.rt.unref_var(self.id, self.top_id);
312        self.id = BindId::new();
313        ctx.rt.ref_var(self.id, self.top_id);
314        self.args.clear()
315    }
316}
317
// Generate a listing builtin.
//
// `$name` is the struct to define, `$builtin` its registered name,
// `$method` the runtime method invoked as `ctx.rt.$method(id, path)`, and
// `$typ` the type signature string handed to `deftype!`.
macro_rules! list {
    ($name:ident, $builtin:literal, $method:ident, $typ:literal) => {
        #[derive(Debug)]
        struct $name {
            args: CachedVals,
            // path most recently passed to the runtime because it changed
            current: Option<Path>,
            // variable the result is read from
            id: BindId,
            top_id: ExprId,
        }

        impl<R: Rt, E: UserEvent> BuiltIn<R, E> for $name {
            const NAME: &str = $builtin;
            deftype!($typ);

            fn init<'a, 'b, 'c>(
                ctx: &'a mut ExecCtx<R, E>,
                _typ: &'a graphix_compiler::typ::FnType,
                _scope: &'b Scope,
                from: &'c [Node<R, E>],
                top_id: ExprId,
            ) -> Result<Box<dyn Apply<R, E>>> {
                let id = BindId::new();
                ctx.rt.ref_var(id, top_id);
                let node =
                    $name { id, top_id, current: None, args: CachedVals::new(from) };
                Ok(Box::new(node))
            }
        }

        impl<R: Rt, E: UserEvent> Apply<R, E> for $name {
            fn update(
                &mut self,
                ctx: &mut ExecCtx<R, E>,
                from: &mut [Node<R, E>],
                event: &mut Event<E>,
            ) -> Option<Value> {
                let mut up = [false; 2];
                self.args.update_diff(&mut up, ctx, from, event);
                // argument 0 is the update trigger, argument 1 is the path
                let ((_, path), (trigger_up, path_up)) = arity2!(self.args.0, &up);
                match (path, path_up, trigger_up) {
                    // the path updated to something other than `current`
                    (Some(Value::String(path)), true, _)
                        if self
                            .current
                            .as_ref()
                            .map_or(true, |cur| &**cur != &**path) =>
                    {
                        let path = Path::from(path);
                        ctx.rt.$method(self.id, path.clone());
                        self.current = Some(path);
                    }
                    // the trigger fired: list the same path again
                    (Some(Value::String(path)), _, true) => {
                        ctx.rt.$method(self.id, Path::from(path));
                    }
                    _ => (),
                }
                match event.variables.get(&self.id)? {
                    Value::Null => None,
                    Value::Error(e) => Some(errf!(literal!("ListError"), "{e}")),
                    v => Some(v.clone()),
                }
            }

            fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
                ctx.rt.unref_var(self.id, self.top_id);
                ctx.rt.stop_list(self.id);
            }

            fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
                ctx.rt.unref_var(self.id, self.top_id);
                ctx.rt.stop_list(self.id);
                let fresh = BindId::new();
                self.id = fresh;
                ctx.rt.ref_var(fresh, self.top_id);
                self.current = None;
                self.args.clear();
            }
        }
    };
}
400
401list!(
402    List,
403    "net_list",
404    list,
405    "fn(?#update:Any, string) -> Result<Array<string>, `ListError(string)>"
406);
407
408list!(
409    ListTable,
410    "net_list_table",
411    list_table,
412    "fn(?#update:Any, string) -> Result<Table, `ListError(string)>"
413);
414
415#[derive(Debug)]
416struct Publish<R: Rt, E: UserEvent> {
417    args: CachedVals,
418    current: Option<(Path, Val)>,
419    top_id: ExprId,
420    x: BindId,
421    pid: BindId,
422    on_write: Node<R, E>,
423}
424
425impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Publish<R, E> {
426    const NAME: &str = "net_publish";
427    deftype!(
428        "fn(?#on_write:fn(Any) -> _ throws 'e, string, Any) -> Result<_, `PublishError(string)> throws 'e"
429    );
430
431    fn init<'a, 'b, 'c>(
432        ctx: &'a mut ExecCtx<R, E>,
433        typ: &'a graphix_compiler::typ::FnType,
434        scope: &'b Scope,
435        from: &'c [Node<R, E>],
436        top_id: ExprId,
437    ) -> Result<Box<dyn Apply<R, E>>> {
438        match from {
439            [_, _, _] => {
440                let scope =
441                    scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
442                let pid = BindId::new();
443                let mftyp = match &typ.args[0].typ {
444                    Type::Fn(ft) => ft.clone(),
445                    t => bail!("expected function not {t}"),
446                };
447                let (x, xn) = genn::bind(ctx, &scope.lexical, "x", Type::Any, top_id);
448                let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
449                let on_write = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
450                Ok(Box::new(Publish {
451                    args: CachedVals::new(from),
452                    current: None,
453                    top_id,
454                    pid,
455                    x,
456                    on_write,
457                }))
458            }
459            _ => bail!("expected three arguments"),
460        }
461    }
462}
463
464impl<R: Rt, E: UserEvent> Apply<R, E> for Publish<R, E> {
465    fn update(
466        &mut self,
467        ctx: &mut ExecCtx<R, E>,
468        from: &mut [Node<R, E>],
469        event: &mut Event<E>,
470    ) -> Option<Value> {
471        macro_rules! publish {
472            ($path:expr, $v:expr) => {{
473                let path = Path::from($path.clone());
474                match ctx.rt.publish(path.clone(), $v.clone(), self.top_id) {
475                    Err(e) => {
476                        let msg: ArcStr = format_compact!("{e:?}").as_str().into();
477                        let e: Value = (literal!("PublishError"), msg).into();
478                        return Some(Value::Error(TArc::new(e)));
479                    }
480                    Ok(id) => {
481                        self.current = Some((path, id));
482                    }
483                }
484            }};
485        }
486        let mut up = [false; 3];
487        self.args.update_diff(&mut up, ctx, from, event);
488        if up[0] {
489            if let Some(v) = self.args.0[0].clone() {
490                ctx.cached.insert(self.pid, v.clone());
491                event.variables.insert(self.pid, v);
492            }
493        }
494        match (&up[1..], &self.args.0[1..]) {
495            ([true, _], [Some(Value::String(path)), Some(v)])
496                if self.current.as_ref().map(|(p, _)| &**p != path).unwrap_or(true) =>
497            {
498                if let Some((_, id)) = self.current.take() {
499                    ctx.rt.unpublish(id, self.top_id);
500                }
501                publish!(path, v)
502            }
503            ([_, true], [Some(Value::String(path)), Some(v)]) => match &self.current {
504                Some((_, val)) => ctx.rt.update(val, v.clone()),
505                None => publish!(path, v),
506            },
507            _ => (),
508        }
509        let mut reply = None;
510        if let Some((_, val)) = &self.current {
511            if let Some(req) = event.writes.remove(&val.id()) {
512                ctx.cached.insert(self.x, req.value.clone());
513                event.variables.insert(self.x, req.value);
514                reply = req.send_result;
515            }
516        }
517        if let Some(v) = self.on_write.update(ctx, event) {
518            if let Some(reply) = reply {
519                reply.send(v)
520            }
521        }
522        None
523    }
524
525    fn typecheck(
526        &mut self,
527        ctx: &mut ExecCtx<R, E>,
528        _from: &mut [Node<R, E>],
529    ) -> anyhow::Result<()> {
530        self.on_write.typecheck(ctx)
531    }
532
533    fn refs(&self, refs: &mut graphix_compiler::Refs) {
534        self.on_write.refs(refs)
535    }
536
537    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
538        if let Some((_, val)) = self.current.take() {
539            ctx.rt.unpublish(val, self.top_id);
540        }
541        ctx.cached.remove(&self.pid);
542        ctx.cached.remove(&self.x);
543        ctx.env.unbind_variable(self.x);
544        self.on_write.delete(ctx);
545    }
546
547    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
548        if let Some((_, val)) = self.current.take() {
549            ctx.rt.unpublish(val, self.top_id);
550        }
551        self.args.clear();
552        self.on_write.sleep(ctx);
553    }
554}
555
556#[derive(Debug)]
557struct PublishRpc<R: Rt, E: UserEvent> {
558    args: CachedVals,
559    id: BindId,
560    top_id: ExprId,
561    f: Node<R, E>,
562    pid: BindId,
563    x: BindId,
564    queue: VecDeque<server::RpcCall>,
565    argbuf: SmallVec<[(ArcStr, Value); 6]>,
566    ready: bool,
567    current: Option<Path>,
568}
569
570impl<R: Rt, E: UserEvent> BuiltIn<R, E> for PublishRpc<R, E> {
571    const NAME: &str = "net_publish_rpc";
572    deftype!(
573        r#"fn(
574            #path:string,
575            #doc:string,
576            #spec:Array<ArgSpec>,
577            #f:fn(Array<(string, Any)>) -> Any throws 'e
578        ) -> Result<_, `PublishRpcError(string)> throws 'e"#
579    );
580
581    fn init<'a, 'b, 'c>(
582        ctx: &'a mut ExecCtx<R, E>,
583        typ: &'a graphix_compiler::typ::FnType,
584        scope: &'b Scope,
585        from: &'c [Node<R, E>],
586        top_id: ExprId,
587    ) -> Result<Box<dyn Apply<R, E>>> {
588        match from {
589            [_, _, _, _] => {
590                let scope =
591                    scope.append(&format_compact!("fn{}", LambdaId::new().inner()));
592                let id = BindId::new();
593                ctx.rt.ref_var(id, top_id);
594                let pid = BindId::new();
595                let mftyp = match &typ.args[3].typ {
596                    Type::Fn(ft) => ft.clone(),
597                    t => bail!("expected a function not {t}"),
598                };
599                let (x, xn) = genn::bind(
600                    ctx,
601                    &scope.lexical,
602                    "x",
603                    mftyp.args[0].typ.clone(),
604                    top_id,
605                );
606                let fnode = genn::reference(ctx, pid, Type::Fn(mftyp.clone()), top_id);
607                let f = genn::apply(fnode, scope, vec![xn], &mftyp, top_id);
608                Ok(Box::new(PublishRpc {
609                    queue: VecDeque::new(),
610                    args: CachedVals::new(from),
611                    x,
612                    id,
613                    top_id,
614                    f,
615                    pid,
616                    argbuf: smallvec![],
617                    ready: true,
618                    current: None,
619                }))
620            }
621            _ => bail!("expected four arguments"),
622        }
623    }
624}
625
626impl<R: Rt, E: UserEvent> Apply<R, E> for PublishRpc<R, E> {
627    fn update(
628        &mut self,
629        ctx: &mut ExecCtx<R, E>,
630        from: &mut [Node<R, E>],
631        event: &mut Event<E>,
632    ) -> Option<Value> {
633        macro_rules! snd {
634            ($pair:expr) => {
635                match $pair {
636                    Value::Array(p) => p[1].clone(),
637                    _ => unreachable!(),
638                }
639            };
640        }
641        let mut changed = [false; 4];
642        self.args.update_diff(&mut changed, ctx, from, event);
643        if changed[3] {
644            if let Some(v) = self.args.0[3].clone() {
645                ctx.cached.insert(self.pid, v.clone());
646                event.variables.insert(self.pid, v);
647            }
648        }
649        if changed[0] || changed[1] || changed[2] {
650            if let Some(path) = self.current.take() {
651                ctx.rt.unpublish_rpc(path);
652            }
653            if let (Some(Value::String(path)), Some(doc), Some(Value::Array(spec))) =
654                (&self.args.0[0], &self.args.0[1], &self.args.0[2])
655            {
656                let path = Path::from(path);
657                let spec = spec
658                    .iter()
659                    .map(|r| match r {
660                        Value::Array(r) => {
661                            let default_value = snd!(&r[0]);
662                            let doc = snd!(&r[1]);
663                            let name = snd!(&r[2]).get_as::<ArcStr>().unwrap();
664                            ArgSpec { name, doc, default_value }
665                        }
666                        _ => unreachable!(),
667                    })
668                    .collect::<Vec<_>>();
669                if let Err(e) =
670                    ctx.rt.publish_rpc(path.clone(), doc.clone(), spec, self.id)
671                {
672                    let e: ArcStr = format_compact!("{e:?}").as_str().into();
673                    let e: Value = (literal!("PublishRpcError"), e).into();
674                    return Some(Value::Error(TArc::new(e)));
675                }
676                self.current = Some(path);
677            }
678        }
679        macro_rules! set {
680            ($c:expr) => {{
681                self.ready = false;
682                self.argbuf.extend($c.args.iter().map(|(n, v)| (n.clone(), v.clone())));
683                self.argbuf.sort_by_key(|(n, _)| n.clone());
684                let args =
685                    ValArray::from_iter_exact(self.argbuf.drain(..).map(|(n, v)| {
686                        Value::Array(ValArray::from([Value::String(n), v]))
687                    }));
688                ctx.cached.insert(self.x, Value::Array(args.clone()));
689                event.variables.insert(self.x, Value::Array(args));
690            }};
691        }
692        if let Some(c) = event.rpc_calls.remove(&self.id) {
693            self.queue.push_back(c);
694        }
695        if self.ready && self.queue.len() > 0 {
696            if let Some(c) = self.queue.front() {
697                set!(c)
698            }
699        }
700        loop {
701            match self.f.update(ctx, event) {
702                None => break None,
703                Some(v) => {
704                    self.ready = true;
705                    if let Some(mut call) = self.queue.pop_front() {
706                        call.reply.send(v);
707                    }
708                    match self.queue.front() {
709                        Some(c) => set!(c),
710                        None => break None,
711                    }
712                }
713            }
714        }
715    }
716
717    fn typecheck(
718        &mut self,
719        ctx: &mut ExecCtx<R, E>,
720        _from: &mut [Node<R, E>],
721    ) -> Result<()> {
722        self.f.typecheck(ctx)
723    }
724
725    fn refs(&self, refs: &mut graphix_compiler::Refs) {
726        self.f.refs(refs)
727    }
728
729    fn delete(&mut self, ctx: &mut ExecCtx<R, E>) {
730        ctx.rt.unref_var(self.id, self.top_id);
731        if let Some(path) = self.current.take() {
732            ctx.rt.unpublish_rpc(path);
733        }
734        ctx.cached.remove(&self.x);
735        ctx.env.unbind_variable(self.x);
736        ctx.cached.remove(&self.pid);
737        self.f.delete(ctx);
738    }
739
740    fn sleep(&mut self, ctx: &mut ExecCtx<R, E>) {
741        ctx.rt.unref_var(self.id, self.top_id);
742        self.id = BindId::new();
743        ctx.rt.ref_var(self.id, self.top_id);
744        if let Some(path) = self.current.take() {
745            ctx.rt.unpublish_rpc(path);
746        }
747        self.args.clear();
748        self.queue.clear();
749        self.argbuf.clear();
750        self.ready = true;
751        self.f.sleep(ctx);
752    }
753}
754
755graphix_derive::defpackage! {
756    builtins => [
757        Write,
758        Subscribe,
759        RpcCall,
760        List,
761        ListTable,
762        Publish as Publish<GXRt<X>, X::UserEvent>,
763        PublishRpc as PublishRpc<GXRt<X>, X::UserEvent>,
764    ],
765}