graphix_rt/
rt.rs

1use crate::{GXExt, UpdateBatch, WriteBatch};
2use anyhow::{bail, Result};
3use arcstr::{literal, ArcStr};
4use chrono::prelude::*;
5use compact_str::format_compact;
6use futures::{channel::mpsc, stream::SelectAll, FutureExt};
7use fxhash::FxHashMap;
8use graphix_compiler::{expr::ExprId, BindId, CustomBuiltinType, Rt};
9use netidx::{
10    path::Path,
11    protocol::valarray::ValArray,
12    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13    resolver_client::ChangeTracker,
14    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17    self,
18    server::{ArgSpec, RpcCall},
19};
20use poolshark::global::GPooled;
21use std::{
22    collections::{hash_map::Entry, HashMap, VecDeque},
23    fmt::Debug,
24    future,
25    time::Duration,
26};
27use tokio::{
28    sync::Mutex,
29    task::{self, JoinSet},
30    time::{self, Instant},
31};
32use triomphe::Arc;
33
34#[derive(Debug)]
35pub(super) struct RpcClient {
36    proc: rpc::client::Proc,
37    pub(super) last_used: Instant,
38}
39
40#[derive(Debug)]
41pub struct GXRt<X: GXExt> {
42    pub(super) by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
43    pub(super) subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
44    pub(super) published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
45    pub(super) var_updates: VecDeque<(BindId, Value)>,
46    pub(super) custom_updates: VecDeque<(BindId, Box<dyn CustomBuiltinType>)>,
47    pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
48    pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
49    pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
50    pub(super) rpc_clients: FxHashMap<Path, RpcClient>,
51    pub(super) published_rpcs: FxHashMap<Path, rpc::server::Proc>,
52    pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
53    pub(super) change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
54    pub(super) tasks: JoinSet<(BindId, Value)>,
55    pub(super) custom_tasks: JoinSet<(BindId, Box<dyn CustomBuiltinType>)>,
56    pub(super) watches:
57        SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>>,
58    pub(super) var_watches: SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>>,
59    // so the selectall will never return None
60    dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
61    // so the selectall will never return None
62    var_dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Value)>>>,
63    pub(super) batch: publisher::UpdateBatch,
64    pub(super) publisher: Publisher,
65    pub(super) subscriber: Subscriber,
66    pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
67    pub(super) updates: mpsc::Receiver<UpdateBatch>,
68    pub(super) writes_tx: mpsc::Sender<WriteBatch>,
69    pub(super) writes: mpsc::Receiver<WriteBatch>,
70    pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
71    pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
72    pub(super) updated: FxHashMap<ExprId, bool>,
73    pub ext: X,
74}
75
76impl<X: GXExt> GXRt<X> {
77    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
78        let (updates_tx, updates) = mpsc::channel(100);
79        let (writes_tx, writes) = mpsc::channel(100);
80        let (rpcs_tx, rpcs) = mpsc::channel(100);
81        let batch = publisher.start_batch();
82        let mut tasks = JoinSet::new();
83        tasks.spawn(async { future::pending().await });
84        let mut custom_tasks = JoinSet::new();
85        custom_tasks.spawn(async { future::pending().await });
86        let (dummy_watch_tx, dummy_rx) = mpsc::channel(1);
87        let mut watches = SelectAll::new();
88        watches.push(dummy_rx);
89        let (var_dummy_watch_tx, dummy_rx) = mpsc::channel(1);
90        let mut var_watches = SelectAll::new();
91        var_watches.push(dummy_rx);
92        Self {
93            by_ref: HashMap::default(),
94            var_updates: VecDeque::new(),
95            custom_updates: VecDeque::new(),
96            net_updates: VecDeque::new(),
97            net_writes: VecDeque::new(),
98            rpc_overflow: VecDeque::new(),
99            rpc_clients: HashMap::default(),
100            subscribed: HashMap::default(),
101            pending_unsubscribe: VecDeque::new(),
102            published: HashMap::default(),
103            change_trackers: HashMap::default(),
104            published_rpcs: HashMap::default(),
105            updated: HashMap::default(),
106            ext: X::default(),
107            tasks,
108            custom_tasks,
109            watches,
110            var_watches,
111            dummy_watch_tx,
112            var_dummy_watch_tx,
113            batch,
114            publisher,
115            subscriber,
116            updates,
117            updates_tx,
118            writes,
119            writes_tx,
120            rpcs_tx,
121            rpcs,
122        }
123    }
124}
125
126macro_rules! or_err {
127    ($bindid:expr, $e:expr) => {
128        match $e {
129            Ok(v) => v,
130            Err(e) => {
131                let e = ArcStr::from(format_compact!("{e:?}").as_str());
132                let e = Value::Error(Arc::new(Value::String(e)));
133                return ($bindid, e);
134            }
135        }
136    };
137}
138
139macro_rules! check_changed {
140    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
141        let mut ct = $ct.lock().await;
142        if ct.path() != &$path {
143            *ct = ChangeTracker::new($path.clone());
144        }
145        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
146            return ($id, Value::Null);
147        }
148    };
149}
150
151impl<X: GXExt> Rt for GXRt<X> {
152    type AbortHandle = task::AbortHandle;
153
154    fn clear(&mut self) {
155        let Self {
156            by_ref,
157            var_updates,
158            custom_updates,
159            net_updates,
160            net_writes,
161            rpc_clients,
162            rpc_overflow,
163            subscribed,
164            published,
165            published_rpcs,
166            pending_unsubscribe,
167            change_trackers,
168            tasks,
169            custom_tasks,
170            watches,
171            var_watches,
172            dummy_watch_tx,
173            var_dummy_watch_tx,
174            batch,
175            publisher,
176            subscriber: _,
177            updates_tx,
178            updates,
179            writes_tx,
180            writes,
181            rpcs,
182            rpcs_tx,
183            updated,
184            ext,
185        } = self;
186        ext.clear();
187        updated.clear();
188        by_ref.clear();
189        var_updates.clear();
190        custom_updates.clear();
191        net_updates.clear();
192        net_writes.clear();
193        rpc_overflow.clear();
194        rpc_clients.clear();
195        subscribed.clear();
196        published.clear();
197        published_rpcs.clear();
198        pending_unsubscribe.clear();
199        change_trackers.clear();
200        *tasks = JoinSet::new();
201        tasks.spawn(async { future::pending().await });
202        *custom_tasks = JoinSet::new();
203        custom_tasks.spawn(async { future::pending().await });
204        *watches = SelectAll::new();
205        let (tx, rx) = mpsc::channel(1);
206        *dummy_watch_tx = tx;
207        watches.push(rx);
208        *var_watches = SelectAll::new();
209        let (tx, rx) = mpsc::channel(1);
210        *var_dummy_watch_tx = tx;
211        var_watches.push(rx);
212        *batch = publisher.start_batch();
213        let (tx, rx) = mpsc::channel(3);
214        *updates_tx = tx;
215        *updates = rx;
216        let (tx, rx) = mpsc::channel(100);
217        *writes_tx = tx;
218        *writes = rx;
219        let (tx, rx) = mpsc::channel(100);
220        *rpcs_tx = tx;
221        *rpcs = rx
222    }
223
224    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
225        let now = Instant::now();
226        let proc = match self.rpc_clients.entry(name) {
227            Entry::Occupied(mut e) => {
228                let cl = e.get_mut();
229                cl.last_used = now;
230                Ok(cl.proc.clone())
231            }
232            Entry::Vacant(e) => {
233                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
234                    Err(e) => Err(e),
235                    Ok(proc) => {
236                        let cl = RpcClient { last_used: now, proc: proc.clone() };
237                        e.insert(cl);
238                        Ok(proc)
239                    }
240                }
241            }
242        };
243        self.tasks.spawn(async move {
244            macro_rules! err {
245                ($e:expr) => {{
246                    let e = format_compact!("{:?}", $e);
247                    (id, Value::error(e.as_str()))
248                }};
249            }
250            match proc {
251                Err(e) => err!(e),
252                Ok(proc) => match proc.call(args).await {
253                    Err(e) => err!(e),
254                    Ok(res) => (id, res),
255                },
256            }
257        });
258    }
259
260    fn publish_rpc(
261        &mut self,
262        name: Path,
263        doc: Value,
264        spec: Vec<ArgSpec>,
265        id: BindId,
266    ) -> Result<()> {
267        use rpc::server::Proc;
268        let e = match self.published_rpcs.entry(name) {
269            Entry::Vacant(e) => e,
270            Entry::Occupied(_) => bail!("already published"),
271        };
272        let proc = Proc::new(
273            &self.publisher,
274            e.key().clone(),
275            doc,
276            spec,
277            move |c| Some((id, c)),
278            Some(self.rpcs_tx.clone()),
279        )?;
280        e.insert(proc);
281        Ok(())
282    }
283
284    fn unpublish_rpc(&mut self, name: Path) {
285        self.published_rpcs.remove(&name);
286    }
287
288    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
289        let dval =
290            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
291        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
292        dval
293    }
294
295    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
296        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
297            if let Some(cn) = exprs.get_mut(&ref_by) {
298                *cn -= 1;
299                if *cn == 0 {
300                    exprs.remove(&ref_by);
301                }
302            }
303            if exprs.is_empty() {
304                self.subscribed.remove(&dv.id());
305            }
306        }
307        self.pending_unsubscribe.push_back((Instant::now(), dv));
308    }
309
310    fn list(&mut self, id: BindId, path: Path) {
311        let ct = self
312            .change_trackers
313            .entry(id)
314            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
315        let ct = Arc::clone(ct);
316        let resolver = self.subscriber.resolver();
317        self.tasks.spawn(async move {
318            check_changed!(id, resolver, path, ct);
319            let mut paths = or_err!(id, resolver.list(path).await);
320            let paths = paths.drain(..).map(|p| Value::String(p.into()));
321            (id, Value::Array(ValArray::from_iter_exact(paths)))
322        });
323    }
324
325    fn list_table(&mut self, id: BindId, path: Path) {
326        let ct = self
327            .change_trackers
328            .entry(id)
329            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
330        let ct = Arc::clone(ct);
331        let resolver = self.subscriber.resolver();
332        self.tasks.spawn(async move {
333            check_changed!(id, resolver, path, ct);
334            let mut tbl = or_err!(id, resolver.table(path).await);
335            let cols = tbl.cols.drain(..).map(|(name, count)| {
336                Value::Array(ValArray::from([
337                    Value::String(name.into()),
338                    Value::V64(count.0),
339                ]))
340            });
341            let cols = Value::Array(ValArray::from_iter_exact(cols));
342            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
343            let rows = Value::Array(ValArray::from_iter_exact(rows));
344            let tbl = Value::Array(ValArray::from([
345                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
346                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
347            ]));
348            (id, tbl)
349        });
350    }
351
352    fn stop_list(&mut self, id: BindId) {
353        self.change_trackers.remove(&id);
354    }
355
356    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
357        let val = self.publisher.publish_with_flags_and_writes(
358            PublishFlags::empty(),
359            path,
360            value,
361            Some(self.writes_tx.clone()),
362        )?;
363        let id = val.id();
364        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
365        Ok(val)
366    }
367
368    fn update(&mut self, val: &Val, value: Value) {
369        val.update(&mut self.batch, value);
370    }
371
372    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
373        if let Some(refs) = self.published.get_mut(&val.id()) {
374            if let Some(cn) = refs.get_mut(&ref_by) {
375                *cn -= 1;
376                if *cn == 0 {
377                    refs.remove(&ref_by);
378                }
379            }
380            if refs.is_empty() {
381                self.published.remove(&val.id());
382            }
383        }
384    }
385
386    fn set_timer(&mut self, id: BindId, timeout: Duration) {
387        self.tasks.spawn(
388            time::sleep(timeout)
389                .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
390        );
391    }
392
393    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
394        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
395    }
396
397    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
398        if let Some(refs) = self.by_ref.get_mut(&id) {
399            if let Some(cn) = refs.get_mut(&ref_by) {
400                *cn -= 1;
401                if *cn == 0 {
402                    refs.remove(&ref_by);
403                }
404            }
405            if refs.is_empty() {
406                self.by_ref.remove(&id);
407            }
408        }
409    }
410
411    fn set_var(&mut self, id: BindId, value: Value) {
412        self.var_updates.push_back((id, value.clone()));
413    }
414
415    fn notify_set(&mut self, id: BindId) {
416        if let Some(refed) = self.by_ref.get(&id) {
417            for eid in refed.keys() {
418                self.updated.entry(*eid).or_default();
419            }
420        }
421    }
422
423    fn spawn<
424        F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static,
425    >(
426        &mut self,
427        f: F,
428    ) -> Self::AbortHandle {
429        self.custom_tasks.spawn(f)
430    }
431
432    fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
433        &mut self,
434        f: F,
435    ) -> Self::AbortHandle {
436        self.tasks.spawn(f)
437    }
438
439    fn watch(
440        &mut self,
441        s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
442    ) {
443        self.watches.push(s)
444    }
445
446    fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>) {
447        self.var_watches.push(s)
448    }
449}