Skip to main content

graphix_rt/
rt.rs

1use crate::{GXExt, UpdateBatch, WriteBatch};
2use ahash::AHashMap;
3use anyhow::{bail, Result};
4use arcstr::{literal, ArcStr};
5use chrono::prelude::*;
6use compact_str::format_compact;
7use futures::{channel::mpsc, stream::SelectAll, FutureExt};
8use graphix_compiler::{expr::ExprId, BindId, CustomBuiltinType, Rt};
9use netidx::{
10    path::Path,
11    protocol::valarray::ValArray,
12    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13    resolver_client::ChangeTracker,
14    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17    self,
18    server::{ArgSpec, RpcCall},
19};
20use nohash::IntMap;
21use poolshark::global::GPooled;
22use std::{
23    collections::{hash_map::Entry, VecDeque},
24    fmt::Debug,
25    future,
26    time::Duration,
27};
28use tokio::{
29    sync::Mutex,
30    task::{self, JoinSet},
31    time::{self, Instant},
32};
33use triomphe::Arc;
34
35#[derive(Debug)]
36pub(super) struct RpcClient {
37    proc: rpc::client::Proc,
38    pub(super) last_used: Instant,
39}
40
41#[derive(Debug)]
42pub struct GXRt<X: GXExt> {
43    pub(super) by_ref: IntMap<BindId, IntMap<ExprId, usize>>,
44    pub(super) subscribed: IntMap<SubId, IntMap<ExprId, usize>>,
45    pub(super) published: IntMap<Id, IntMap<ExprId, usize>>,
46    pub(super) var_updates: VecDeque<(BindId, Value)>,
47    pub(super) custom_updates: VecDeque<(BindId, Box<dyn CustomBuiltinType>)>,
48    pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
49    pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
50    pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
51    pub(super) rpc_clients: AHashMap<Path, RpcClient>,
52    pub(super) published_rpcs: AHashMap<Path, rpc::server::Proc>,
53    pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
54    pub(super) change_trackers: IntMap<BindId, Arc<Mutex<ChangeTracker>>>,
55    pub(super) tasks: JoinSet<(BindId, Value)>,
56    pub(super) custom_tasks: JoinSet<(BindId, Box<dyn CustomBuiltinType>)>,
57    pub(super) watches:
58        SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>>,
59    pub(super) var_watches: SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>>,
60    // so the selectall will never return None
61    dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
62    // so the selectall will never return None
63    var_dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Value)>>>,
64    pub(super) batch: publisher::UpdateBatch,
65    pub(super) publisher: Publisher,
66    pub(super) subscriber: Subscriber,
67    pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
68    pub(super) updates: mpsc::Receiver<UpdateBatch>,
69    pub(super) writes_tx: mpsc::Sender<WriteBatch>,
70    pub(super) writes: mpsc::Receiver<WriteBatch>,
71    pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
72    pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
73    pub(super) updated: IntMap<ExprId, bool>,
74    pub ext: X,
75}
76
77impl<X: GXExt> GXRt<X> {
78    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
79        let (updates_tx, updates) = mpsc::channel(100);
80        let (writes_tx, writes) = mpsc::channel(100);
81        let (rpcs_tx, rpcs) = mpsc::channel(100);
82        let batch = publisher.start_batch();
83        let mut tasks = JoinSet::new();
84        tasks.spawn(async { future::pending().await });
85        let mut custom_tasks = JoinSet::new();
86        custom_tasks.spawn(async { future::pending().await });
87        let (dummy_watch_tx, dummy_rx) = mpsc::channel(1);
88        let mut watches = SelectAll::new();
89        watches.push(dummy_rx);
90        let (var_dummy_watch_tx, dummy_rx) = mpsc::channel(1);
91        let mut var_watches = SelectAll::new();
92        var_watches.push(dummy_rx);
93        Self {
94            by_ref: IntMap::default(),
95            var_updates: VecDeque::new(),
96            custom_updates: VecDeque::new(),
97            net_updates: VecDeque::new(),
98            net_writes: VecDeque::new(),
99            rpc_overflow: VecDeque::new(),
100            rpc_clients: AHashMap::default(),
101            subscribed: IntMap::default(),
102            pending_unsubscribe: VecDeque::new(),
103            published: IntMap::default(),
104            change_trackers: IntMap::default(),
105            published_rpcs: AHashMap::default(),
106            updated: IntMap::default(),
107            ext: X::default(),
108            tasks,
109            custom_tasks,
110            watches,
111            var_watches,
112            dummy_watch_tx,
113            var_dummy_watch_tx,
114            batch,
115            publisher,
116            subscriber,
117            updates,
118            updates_tx,
119            writes,
120            writes_tx,
121            rpcs_tx,
122            rpcs,
123        }
124    }
125}
126
/// Unwrap a `Result` inside a task that resolves to `(BindId, Value)`.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` it returns from the
/// enclosing function or async block with `($bindid, Value::Error(..))`,
/// where the error payload is the `Debug` rendering of `e` as a string.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(v) => v,
            Err(e) => {
                let rendered = format_compact!("{e:?}");
                let payload = Value::String(ArcStr::from(rendered.as_str()));
                return ($bindid, Value::Error(Arc::new(payload)));
            }
        }
    };
}
139
/// Early-out for listing tasks when nothing changed under `$path`.
///
/// Locks the shared tracker `$ct`, replaces it with a fresh tracker if it
/// was tracking a different path, then asks `$resolver` whether anything
/// changed. If the resolver call fails, the enclosing block returns an error
/// value via `or_err!`; if nothing changed it returns `($id, Value::Null)`.
/// Otherwise execution continues after the macro.
///
/// The macro expands to statements, so the lock guard it binds lives until
/// the end of the enclosing block.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            // The binding is now listing a different path: start over.
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
151
152impl<X: GXExt> Rt for GXRt<X> {
153    type AbortHandle = task::AbortHandle;
154
155    fn clear(&mut self) {
156        let Self {
157            by_ref,
158            var_updates,
159            custom_updates,
160            net_updates,
161            net_writes,
162            rpc_clients,
163            rpc_overflow,
164            subscribed,
165            published,
166            published_rpcs,
167            pending_unsubscribe,
168            change_trackers,
169            tasks,
170            custom_tasks,
171            watches,
172            var_watches,
173            dummy_watch_tx,
174            var_dummy_watch_tx,
175            batch,
176            publisher,
177            subscriber: _,
178            updates_tx,
179            updates,
180            writes_tx,
181            writes,
182            rpcs,
183            rpcs_tx,
184            updated,
185            ext,
186        } = self;
187        ext.clear();
188        updated.clear();
189        by_ref.clear();
190        var_updates.clear();
191        custom_updates.clear();
192        net_updates.clear();
193        net_writes.clear();
194        rpc_overflow.clear();
195        rpc_clients.clear();
196        subscribed.clear();
197        published.clear();
198        published_rpcs.clear();
199        pending_unsubscribe.clear();
200        change_trackers.clear();
201        *tasks = JoinSet::new();
202        tasks.spawn(async { future::pending().await });
203        *custom_tasks = JoinSet::new();
204        custom_tasks.spawn(async { future::pending().await });
205        *watches = SelectAll::new();
206        let (tx, rx) = mpsc::channel(1);
207        *dummy_watch_tx = tx;
208        watches.push(rx);
209        *var_watches = SelectAll::new();
210        let (tx, rx) = mpsc::channel(1);
211        *var_dummy_watch_tx = tx;
212        var_watches.push(rx);
213        *batch = publisher.start_batch();
214        let (tx, rx) = mpsc::channel(3);
215        *updates_tx = tx;
216        *updates = rx;
217        let (tx, rx) = mpsc::channel(100);
218        *writes_tx = tx;
219        *writes = rx;
220        let (tx, rx) = mpsc::channel(100);
221        *rpcs_tx = tx;
222        *rpcs = rx
223    }
224
225    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
226        let now = Instant::now();
227        let proc = match self.rpc_clients.entry(name) {
228            Entry::Occupied(mut e) => {
229                let cl = e.get_mut();
230                cl.last_used = now;
231                Ok(cl.proc.clone())
232            }
233            Entry::Vacant(e) => {
234                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
235                    Err(e) => Err(e),
236                    Ok(proc) => {
237                        let cl = RpcClient { last_used: now, proc: proc.clone() };
238                        e.insert(cl);
239                        Ok(proc)
240                    }
241                }
242            }
243        };
244        self.tasks.spawn(async move {
245            macro_rules! err {
246                ($e:expr) => {{
247                    let e = format_compact!("{:?}", $e);
248                    (id, Value::error(e.as_str()))
249                }};
250            }
251            match proc {
252                Err(e) => err!(e),
253                Ok(proc) => match proc.call(args).await {
254                    Err(e) => err!(e),
255                    Ok(res) => (id, res),
256                },
257            }
258        });
259    }
260
261    fn publish_rpc(
262        &mut self,
263        name: Path,
264        doc: Value,
265        spec: Vec<ArgSpec>,
266        id: BindId,
267    ) -> Result<()> {
268        use rpc::server::Proc;
269        let e = match self.published_rpcs.entry(name) {
270            Entry::Vacant(e) => e,
271            Entry::Occupied(_) => bail!("already published"),
272        };
273        let proc = Proc::new(
274            &self.publisher,
275            e.key().clone(),
276            doc,
277            spec,
278            move |c| Some((id, c)),
279            Some(self.rpcs_tx.clone()),
280        )?;
281        e.insert(proc);
282        Ok(())
283    }
284
285    fn unpublish_rpc(&mut self, name: Path) {
286        self.published_rpcs.remove(&name);
287    }
288
289    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
290        let dval =
291            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
292        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
293        dval
294    }
295
296    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
297        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
298            if let Some(cn) = exprs.get_mut(&ref_by) {
299                *cn -= 1;
300                if *cn == 0 {
301                    exprs.remove(&ref_by);
302                }
303            }
304            if exprs.is_empty() {
305                self.subscribed.remove(&dv.id());
306            }
307        }
308        self.pending_unsubscribe.push_back((Instant::now(), dv));
309    }
310
311    fn list(&mut self, id: BindId, path: Path) {
312        let ct = self
313            .change_trackers
314            .entry(id)
315            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
316        let ct = Arc::clone(ct);
317        let resolver = self.subscriber.resolver();
318        self.tasks.spawn(async move {
319            check_changed!(id, resolver, path, ct);
320            let mut paths = or_err!(id, resolver.list(path).await);
321            let paths = paths.drain(..).map(|p| Value::String(p.into()));
322            (id, Value::Array(ValArray::from_iter_exact(paths)))
323        });
324    }
325
326    fn list_table(&mut self, id: BindId, path: Path) {
327        let ct = self
328            .change_trackers
329            .entry(id)
330            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
331        let ct = Arc::clone(ct);
332        let resolver = self.subscriber.resolver();
333        self.tasks.spawn(async move {
334            check_changed!(id, resolver, path, ct);
335            let mut tbl = or_err!(id, resolver.table(path).await);
336            let cols =
337                tbl.cols.drain(..).map(|(name, _count)| Value::String(name.into()));
338            let cols = Value::Array(ValArray::from_iter_exact(cols));
339            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
340            let rows = Value::Array(ValArray::from_iter_exact(rows));
341            let tbl = Value::Array(ValArray::from([
342                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
343                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
344            ]));
345            (id, tbl)
346        });
347    }
348
349    fn stop_list(&mut self, id: BindId) {
350        self.change_trackers.remove(&id);
351    }
352
353    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
354        let val = self.publisher.publish_with_flags_and_writes(
355            PublishFlags::empty(),
356            path,
357            value,
358            Some(self.writes_tx.clone()),
359        )?;
360        let id = val.id();
361        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
362        Ok(val)
363    }
364
365    fn update(&mut self, val: &Val, value: Value) {
366        val.update(&mut self.batch, value);
367    }
368
369    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
370        if let Some(refs) = self.published.get_mut(&val.id()) {
371            if let Some(cn) = refs.get_mut(&ref_by) {
372                *cn -= 1;
373                if *cn == 0 {
374                    refs.remove(&ref_by);
375                }
376            }
377            if refs.is_empty() {
378                self.published.remove(&val.id());
379            }
380        }
381    }
382
383    fn set_timer(&mut self, id: BindId, timeout: Duration) {
384        self.tasks.spawn(
385            time::sleep(timeout)
386                .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
387        );
388    }
389
390    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
391        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
392    }
393
394    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
395        if let Some(refs) = self.by_ref.get_mut(&id) {
396            if let Some(cn) = refs.get_mut(&ref_by) {
397                *cn -= 1;
398                if *cn == 0 {
399                    refs.remove(&ref_by);
400                }
401            }
402            if refs.is_empty() {
403                self.by_ref.remove(&id);
404            }
405        }
406    }
407
408    fn set_var(&mut self, id: BindId, value: Value) {
409        self.var_updates.push_back((id, value.clone()));
410    }
411
412    fn notify_set(&mut self, id: BindId) {
413        if let Some(refed) = self.by_ref.get(&id) {
414            for eid in refed.keys() {
415                self.updated.entry(*eid).or_default();
416            }
417        }
418    }
419
420    fn spawn<
421        F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static,
422    >(
423        &mut self,
424        f: F,
425    ) -> Self::AbortHandle {
426        self.custom_tasks.spawn(f)
427    }
428
429    fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
430        &mut self,
431        f: F,
432    ) -> Self::AbortHandle {
433        self.tasks.spawn(f)
434    }
435
436    fn watch(
437        &mut self,
438        s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
439    ) {
440        self.watches.push(s)
441    }
442
443    fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>) {
444        self.var_watches.push(s)
445    }
446}