graphix_rt/
rt.rs

1use crate::{GXExt, UpdateBatch, WriteBatch};
2use anyhow::{bail, Result};
3use arcstr::{literal, ArcStr};
4use chrono::prelude::*;
5use compact_str::format_compact;
6use futures::{channel::mpsc, stream::SelectAll, FutureExt};
7use fxhash::FxHashMap;
8use graphix_compiler::{expr::ExprId, BindId, Rt};
9use netidx::{
10    path::Path,
11    protocol::valarray::ValArray,
12    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13    resolver_client::ChangeTracker,
14    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17    self,
18    server::{ArgSpec, RpcCall},
19};
20use poolshark::global::GPooled;
21use std::{
22    collections::{hash_map::Entry, HashMap, VecDeque},
23    fmt::Debug,
24    future,
25    time::Duration,
26};
27use tokio::{
28    sync::Mutex,
29    task::{self, JoinSet},
30    time::{self, Instant},
31};
32use triomphe::Arc;
33
34#[derive(Debug)]
35pub(super) struct RpcClient {
36    proc: rpc::client::Proc,
37    pub(super) last_used: Instant,
38}
39
40#[derive(Debug)]
41pub struct GXRt<X: GXExt> {
42    pub(super) by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
43    pub(super) subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
44    pub(super) published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
45    pub(super) var_updates: VecDeque<(BindId, Value)>,
46    pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
47    pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
48    pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
49    pub(super) rpc_clients: FxHashMap<Path, RpcClient>,
50    pub(super) published_rpcs: FxHashMap<Path, rpc::server::Proc>,
51    pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
52    pub(super) change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
53    pub(super) tasks: JoinSet<(BindId, Value)>,
54    pub(super) watches: SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>>,
55    // so the selectall will never return None
56    dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Value)>>>,
57    pub(super) batch: publisher::UpdateBatch,
58    pub(super) publisher: Publisher,
59    pub(super) subscriber: Subscriber,
60    pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
61    pub(super) updates: mpsc::Receiver<UpdateBatch>,
62    pub(super) writes_tx: mpsc::Sender<WriteBatch>,
63    pub(super) writes: mpsc::Receiver<WriteBatch>,
64    pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
65    pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
66    pub(super) updated: FxHashMap<ExprId, bool>,
67    pub ext: X,
68}
69
70impl<X: GXExt> GXRt<X> {
71    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
72        let (updates_tx, updates) = mpsc::channel(100);
73        let (writes_tx, writes) = mpsc::channel(100);
74        let (rpcs_tx, rpcs) = mpsc::channel(100);
75        let batch = publisher.start_batch();
76        let mut tasks = JoinSet::new();
77        tasks.spawn(async { future::pending().await });
78        let (dummy_watch_tx, dummy_rx) = mpsc::channel(1);
79        let mut watches = SelectAll::new();
80        watches.push(dummy_rx);
81        Self {
82            by_ref: HashMap::default(),
83            var_updates: VecDeque::new(),
84            net_updates: VecDeque::new(),
85            net_writes: VecDeque::new(),
86            rpc_overflow: VecDeque::new(),
87            rpc_clients: HashMap::default(),
88            subscribed: HashMap::default(),
89            pending_unsubscribe: VecDeque::new(),
90            published: HashMap::default(),
91            change_trackers: HashMap::default(),
92            published_rpcs: HashMap::default(),
93            updated: HashMap::default(),
94            ext: X::default(),
95            tasks,
96            watches,
97            dummy_watch_tx,
98            batch,
99            publisher,
100            subscriber,
101            updates,
102            updates_tx,
103            writes,
104            writes_tx,
105            rpcs_tx,
106            rpcs,
107        }
108    }
109}
110
/// Evaluate `$e`; yield the success value, or return `($bindid, error)` from
/// the enclosing function or async block, where `error` is a
/// `Value::Error` wrapping the `Debug` rendering of the failure.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(ok) => ok,
            Err(err) => {
                let rendered = format_compact!("{:?}", err);
                let wrapped =
                    Value::Error(Arc::new(Value::String(ArcStr::from(rendered.as_str()))));
                return ($bindid, wrapped);
            }
        }
    };
}
123
/// Lock the change tracker `$ct`, re-create it if it tracks a path other
/// than `$path`, then ask `$resolver` whether anything changed. When nothing
/// changed, return `($id, Value::Null)` from the enclosing async block; a
/// resolver failure returns an error value through `or_err!`.
///
/// NOTE(review): this expands to plain statements, so the lock guard appears
/// to stay alive until the end of the caller's block — confirm that is the
/// intended serialization before restructuring.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
135
136impl<X: GXExt> Rt for GXRt<X> {
137    type AbortHandle = task::AbortHandle;
138
139    fn clear(&mut self) {
140        let Self {
141            by_ref,
142            var_updates,
143            net_updates,
144            net_writes,
145            rpc_clients,
146            rpc_overflow,
147            subscribed,
148            published,
149            published_rpcs,
150            pending_unsubscribe,
151            change_trackers,
152            tasks,
153            watches,
154            dummy_watch_tx,
155            batch,
156            publisher,
157            subscriber: _,
158            updates_tx,
159            updates,
160            writes_tx,
161            writes,
162            rpcs,
163            rpcs_tx,
164            updated,
165            ext,
166        } = self;
167        ext.clear();
168        updated.clear();
169        by_ref.clear();
170        var_updates.clear();
171        net_updates.clear();
172        net_writes.clear();
173        rpc_overflow.clear();
174        rpc_clients.clear();
175        subscribed.clear();
176        published.clear();
177        published_rpcs.clear();
178        pending_unsubscribe.clear();
179        change_trackers.clear();
180        *tasks = JoinSet::new();
181        tasks.spawn(async { future::pending().await });
182        *watches = SelectAll::new();
183        let (tx, rx) = mpsc::channel(1);
184        *dummy_watch_tx = tx;
185        watches.push(rx);
186        *batch = publisher.start_batch();
187        let (tx, rx) = mpsc::channel(3);
188        *updates_tx = tx;
189        *updates = rx;
190        let (tx, rx) = mpsc::channel(100);
191        *writes_tx = tx;
192        *writes = rx;
193        let (tx, rx) = mpsc::channel(100);
194        *rpcs_tx = tx;
195        *rpcs = rx
196    }
197
198    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
199        let now = Instant::now();
200        let proc = match self.rpc_clients.entry(name) {
201            Entry::Occupied(mut e) => {
202                let cl = e.get_mut();
203                cl.last_used = now;
204                Ok(cl.proc.clone())
205            }
206            Entry::Vacant(e) => {
207                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
208                    Err(e) => Err(e),
209                    Ok(proc) => {
210                        let cl = RpcClient { last_used: now, proc: proc.clone() };
211                        e.insert(cl);
212                        Ok(proc)
213                    }
214                }
215            }
216        };
217        self.tasks.spawn(async move {
218            macro_rules! err {
219                ($e:expr) => {{
220                    let e = format_compact!("{:?}", $e);
221                    (id, Value::error(e.as_str()))
222                }};
223            }
224            match proc {
225                Err(e) => err!(e),
226                Ok(proc) => match proc.call(args).await {
227                    Err(e) => err!(e),
228                    Ok(res) => (id, res),
229                },
230            }
231        });
232    }
233
234    fn publish_rpc(
235        &mut self,
236        name: Path,
237        doc: Value,
238        spec: Vec<ArgSpec>,
239        id: BindId,
240    ) -> Result<()> {
241        use rpc::server::Proc;
242        let e = match self.published_rpcs.entry(name) {
243            Entry::Vacant(e) => e,
244            Entry::Occupied(_) => bail!("already published"),
245        };
246        let proc = Proc::new(
247            &self.publisher,
248            e.key().clone(),
249            doc,
250            spec,
251            move |c| Some((id, c)),
252            Some(self.rpcs_tx.clone()),
253        )?;
254        e.insert(proc);
255        Ok(())
256    }
257
258    fn unpublish_rpc(&mut self, name: Path) {
259        self.published_rpcs.remove(&name);
260    }
261
262    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
263        let dval =
264            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
265        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
266        dval
267    }
268
269    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
270        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
271            if let Some(cn) = exprs.get_mut(&ref_by) {
272                *cn -= 1;
273                if *cn == 0 {
274                    exprs.remove(&ref_by);
275                }
276            }
277            if exprs.is_empty() {
278                self.subscribed.remove(&dv.id());
279            }
280        }
281        self.pending_unsubscribe.push_back((Instant::now(), dv));
282    }
283
284    fn list(&mut self, id: BindId, path: Path) {
285        let ct = self
286            .change_trackers
287            .entry(id)
288            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
289        let ct = Arc::clone(ct);
290        let resolver = self.subscriber.resolver();
291        self.tasks.spawn(async move {
292            check_changed!(id, resolver, path, ct);
293            let mut paths = or_err!(id, resolver.list(path).await);
294            let paths = paths.drain(..).map(|p| Value::String(p.into()));
295            (id, Value::Array(ValArray::from_iter_exact(paths)))
296        });
297    }
298
299    fn list_table(&mut self, id: BindId, path: Path) {
300        let ct = self
301            .change_trackers
302            .entry(id)
303            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
304        let ct = Arc::clone(ct);
305        let resolver = self.subscriber.resolver();
306        self.tasks.spawn(async move {
307            check_changed!(id, resolver, path, ct);
308            let mut tbl = or_err!(id, resolver.table(path).await);
309            let cols = tbl.cols.drain(..).map(|(name, count)| {
310                Value::Array(ValArray::from([
311                    Value::String(name.into()),
312                    Value::V64(count.0),
313                ]))
314            });
315            let cols = Value::Array(ValArray::from_iter_exact(cols));
316            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
317            let rows = Value::Array(ValArray::from_iter_exact(rows));
318            let tbl = Value::Array(ValArray::from([
319                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
320                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
321            ]));
322            (id, tbl)
323        });
324    }
325
326    fn stop_list(&mut self, id: BindId) {
327        self.change_trackers.remove(&id);
328    }
329
330    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
331        let val = self.publisher.publish_with_flags_and_writes(
332            PublishFlags::empty(),
333            path,
334            value,
335            Some(self.writes_tx.clone()),
336        )?;
337        let id = val.id();
338        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
339        Ok(val)
340    }
341
342    fn update(&mut self, val: &Val, value: Value) {
343        val.update(&mut self.batch, value);
344    }
345
346    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
347        if let Some(refs) = self.published.get_mut(&val.id()) {
348            if let Some(cn) = refs.get_mut(&ref_by) {
349                *cn -= 1;
350                if *cn == 0 {
351                    refs.remove(&ref_by);
352                }
353            }
354            if refs.is_empty() {
355                self.published.remove(&val.id());
356            }
357        }
358    }
359
360    fn set_timer(&mut self, id: BindId, timeout: Duration) {
361        self.tasks.spawn(
362            time::sleep(timeout)
363                .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
364        );
365    }
366
367    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
368        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
369    }
370
371    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
372        if let Some(refs) = self.by_ref.get_mut(&id) {
373            if let Some(cn) = refs.get_mut(&ref_by) {
374                *cn -= 1;
375                if *cn == 0 {
376                    refs.remove(&ref_by);
377                }
378            }
379            if refs.is_empty() {
380                self.by_ref.remove(&id);
381            }
382        }
383    }
384
385    fn set_var(&mut self, id: BindId, value: Value) {
386        self.var_updates.push_back((id, value.clone()));
387    }
388
389    fn notify_set(&mut self, id: BindId) {
390        if let Some(refed) = self.by_ref.get(&id) {
391            for eid in refed.keys() {
392                self.updated.entry(*eid).or_default();
393            }
394        }
395    }
396
397    fn spawn<F: Future<Output = (BindId, Value)> + Send + 'static>(
398        &mut self,
399        f: F,
400    ) -> Self::AbortHandle {
401        self.tasks.spawn(f)
402    }
403
404    fn watch(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>) {
405        self.watches.push(s)
406    }
407}