graphix_rt/
rt.rs

1use crate::{GXExt, UpdateBatch, WriteBatch};
2use anyhow::{bail, Result};
3use arcstr::{literal, ArcStr};
4use chrono::prelude::*;
5use compact_str::format_compact;
6use futures::{channel::mpsc, FutureExt};
7use fxhash::FxHashMap;
8use graphix_compiler::{expr::ExprId, BindId, Rt};
9use netidx::{
10    path::Path,
11    protocol::valarray::ValArray,
12    publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13    resolver_client::ChangeTracker,
14    subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17    self,
18    server::{ArgSpec, RpcCall},
19};
20use std::{
21    collections::{hash_map::Entry, HashMap, VecDeque},
22    future,
23    time::Duration,
24};
25use tokio::{
26    sync::Mutex,
27    task::JoinSet,
28    time::{self, Instant},
29};
30use triomphe::Arc;
31
/// A cached RPC client handle together with the time it was last used.
#[derive(Debug)]
pub(super) struct RpcClient {
    /// Client side procedure handle; `call_rpc` clones it for every call.
    proc: rpc::client::Proc,
    /// Set to `Instant::now()` each time `call_rpc` creates or reuses this
    /// entry.
    // NOTE(review): presumably read elsewhere to expire idle clients — confirm.
    pub(super) last_used: Instant,
}
37
/// Runtime state behind the [`Rt`] implementation.
///
/// Bundles the netidx publisher and subscriber handles, the reference count
/// tables that tie runtime resources to the expressions using them, and the
/// queues and channels through which events are delivered.
#[derive(Debug)]
pub struct GXRt<X: GXExt> {
    /// For every referenced variable, the number of references held by each
    /// expression (maintained by `ref_var` / `unref_var`).
    pub(super) by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// For every subscription, the number of references held by each
    /// expression (maintained by `subscribe` / `unsubscribe`).
    pub(super) subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// For every published value, the number of references held by each
    /// expression (maintained by `publish` / `unpublish`).
    pub(super) published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable assignments queued by `set_var`, in call order.
    pub(super) var_updates: VecDeque<(BindId, Value)>,
    /// Queue of subscription events.
    // NOTE(review): filled and drained outside the code visible here — confirm.
    pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Queue of write requests against published values.
    // NOTE(review): filled and drained outside the code visible here — confirm.
    pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
    /// Queue of incoming RPC calls tagged with the bind id they target.
    // NOTE(review): presumably calls that could not be handled right away —
    // confirm against the run loop.
    pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// RPC client handles created by `call_rpc`, keyed by procedure path.
    pub(super) rpc_clients: FxHashMap<Path, RpcClient>,
    /// RPC procedures published by `publish_rpc`, keyed by path.
    pub(super) published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions handed back through `unsubscribe`, each with the time of
    /// that call.
    // NOTE(review): presumably dropped after a grace period — confirm.
    pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// Per bind id change tracker shared with the tasks spawned by `list` and
    /// `list_table`; removed by `stop_list`.
    pub(super) change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks; each one resolves to the bind id it reports to and
    /// the value to deliver.
    pub(super) tasks: JoinSet<(BindId, Value)>,
    /// Publisher batch that `update` queues value changes into.
    pub(super) batch: publisher::UpdateBatch,
    /// Handle used to publish values and RPCs.
    pub(super) publisher: Publisher,
    /// Handle used to subscribe, call RPCs and reach the resolver.
    pub(super) subscriber: Subscriber,
    /// Sender registered with every subscription made by `subscribe`.
    pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receiving end paired with `updates_tx`.
    pub(super) updates: mpsc::Receiver<UpdateBatch>,
    /// Sender registered with every value published by `publish`.
    pub(super) writes_tx: mpsc::Sender<WriteBatch>,
    /// Receiving end paired with `writes_tx`.
    pub(super) writes: mpsc::Receiver<WriteBatch>,
    /// Sender registered with every procedure published by `publish_rpc`.
    pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receiving end paired with `rpcs_tx`.
    pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
    /// Expressions recorded by `notify_set`; new entries start as `false`.
    // NOTE(review): the meaning of the flag is not visible here — confirm.
    pub(super) updated: FxHashMap<ExprId, bool>,
    /// Extension state; created with `X::default()` and reset through its own
    /// `clear`.
    pub ext: X,
}
64
65impl<X: GXExt> GXRt<X> {
66    pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
67        let (updates_tx, updates) = mpsc::channel(100);
68        let (writes_tx, writes) = mpsc::channel(100);
69        let (rpcs_tx, rpcs) = mpsc::channel(100);
70        let batch = publisher.start_batch();
71        let mut tasks = JoinSet::new();
72        tasks.spawn(async { future::pending().await });
73        Self {
74            by_ref: HashMap::default(),
75            var_updates: VecDeque::new(),
76            net_updates: VecDeque::new(),
77            net_writes: VecDeque::new(),
78            rpc_overflow: VecDeque::new(),
79            rpc_clients: HashMap::default(),
80            subscribed: HashMap::default(),
81            pending_unsubscribe: VecDeque::new(),
82            published: HashMap::default(),
83            change_trackers: HashMap::default(),
84            published_rpcs: HashMap::default(),
85            updated: HashMap::default(),
86            ext: X::default(),
87            tasks,
88            batch,
89            publisher,
90            subscriber,
91            updates,
92            updates_tx,
93            writes,
94            writes_tx,
95            rpcs_tx,
96            rpcs,
97        }
98    }
99}
100
/// Unwrap a `Result`, or return early from the enclosing function/async block.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` it returns
/// `($bindid, error)`, where `error` is a `Value::Error` wrapping the `Debug`
/// rendering of `e` as a string.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Err(e) => {
                let msg = format_compact!("{e:?}");
                let msg = Value::String(ArcStr::from(msg.as_str()));
                return ($bindid, Value::Error(Arc::new(msg)));
            }
            Ok(v) => v,
        }
    };
}
113
/// Bail out of the enclosing async block unless the resolver reports a change.
///
/// Locks the change tracker `$ct`, replaces it with a fresh tracker when it
/// is tracking a path other than `$path`, then asks `$resolver` whether
/// anything changed. Returns `($id, Value::Null)` when nothing changed and
/// the `or_err!` error tuple when the check itself fails.
///
/// The macro expands to statements, so the lock guard stays alive until the
/// end of the block that invokes it.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            // the caller switched paths; the old tracker's state is useless
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
125
126impl<X: GXExt> Rt for GXRt<X> {
127    fn clear(&mut self) {
128        let Self {
129            by_ref,
130            var_updates,
131            net_updates,
132            net_writes,
133            rpc_clients,
134            rpc_overflow,
135            subscribed,
136            published,
137            published_rpcs,
138            pending_unsubscribe,
139            change_trackers,
140            tasks,
141            batch,
142            publisher,
143            subscriber: _,
144            updates_tx,
145            updates,
146            writes_tx,
147            writes,
148            rpcs,
149            rpcs_tx,
150            updated,
151            ext,
152        } = self;
153        ext.clear();
154        updated.clear();
155        by_ref.clear();
156        var_updates.clear();
157        net_updates.clear();
158        net_writes.clear();
159        rpc_overflow.clear();
160        rpc_clients.clear();
161        subscribed.clear();
162        published.clear();
163        published_rpcs.clear();
164        pending_unsubscribe.clear();
165        change_trackers.clear();
166        *tasks = JoinSet::new();
167        tasks.spawn(async { future::pending().await });
168        *batch = publisher.start_batch();
169        let (tx, rx) = mpsc::channel(3);
170        *updates_tx = tx;
171        *updates = rx;
172        let (tx, rx) = mpsc::channel(100);
173        *writes_tx = tx;
174        *writes = rx;
175        let (tx, rx) = mpsc::channel(100);
176        *rpcs_tx = tx;
177        *rpcs = rx
178    }
179
180    fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
181        let now = Instant::now();
182        let proc = match self.rpc_clients.entry(name) {
183            Entry::Occupied(mut e) => {
184                let cl = e.get_mut();
185                cl.last_used = now;
186                Ok(cl.proc.clone())
187            }
188            Entry::Vacant(e) => {
189                match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
190                    Err(e) => Err(e),
191                    Ok(proc) => {
192                        let cl = RpcClient { last_used: now, proc: proc.clone() };
193                        e.insert(cl);
194                        Ok(proc)
195                    }
196                }
197            }
198        };
199        self.tasks.spawn(async move {
200            macro_rules! err {
201                ($e:expr) => {{
202                    let e = format_compact!("{:?}", $e);
203                    (id, Value::error(e.as_str()))
204                }};
205            }
206            match proc {
207                Err(e) => err!(e),
208                Ok(proc) => match proc.call(args).await {
209                    Err(e) => err!(e),
210                    Ok(res) => (id, res),
211                },
212            }
213        });
214    }
215
216    fn publish_rpc(
217        &mut self,
218        name: Path,
219        doc: Value,
220        spec: Vec<ArgSpec>,
221        id: BindId,
222    ) -> Result<()> {
223        use rpc::server::Proc;
224        let e = match self.published_rpcs.entry(name) {
225            Entry::Vacant(e) => e,
226            Entry::Occupied(_) => bail!("already published"),
227        };
228        let proc = Proc::new(
229            &self.publisher,
230            e.key().clone(),
231            doc,
232            spec,
233            move |c| Some((id, c)),
234            Some(self.rpcs_tx.clone()),
235        )?;
236        e.insert(proc);
237        Ok(())
238    }
239
240    fn unpublish_rpc(&mut self, name: Path) {
241        self.published_rpcs.remove(&name);
242    }
243
244    fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
245        let dval =
246            self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
247        *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
248        dval
249    }
250
251    fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
252        if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
253            if let Some(cn) = exprs.get_mut(&ref_by) {
254                *cn -= 1;
255                if *cn == 0 {
256                    exprs.remove(&ref_by);
257                }
258            }
259            if exprs.is_empty() {
260                self.subscribed.remove(&dv.id());
261            }
262        }
263        self.pending_unsubscribe.push_back((Instant::now(), dv));
264    }
265
266    fn list(&mut self, id: BindId, path: Path) {
267        let ct = self
268            .change_trackers
269            .entry(id)
270            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
271        let ct = Arc::clone(ct);
272        let resolver = self.subscriber.resolver();
273        self.tasks.spawn(async move {
274            check_changed!(id, resolver, path, ct);
275            let mut paths = or_err!(id, resolver.list(path).await);
276            let paths = paths.drain(..).map(|p| Value::String(p.into()));
277            (id, Value::Array(ValArray::from_iter_exact(paths)))
278        });
279    }
280
281    fn list_table(&mut self, id: BindId, path: Path) {
282        let ct = self
283            .change_trackers
284            .entry(id)
285            .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
286        let ct = Arc::clone(ct);
287        let resolver = self.subscriber.resolver();
288        self.tasks.spawn(async move {
289            check_changed!(id, resolver, path, ct);
290            let mut tbl = or_err!(id, resolver.table(path).await);
291            let cols = tbl.cols.drain(..).map(|(name, count)| {
292                Value::Array(ValArray::from([
293                    Value::String(name.into()),
294                    Value::V64(count.0),
295                ]))
296            });
297            let cols = Value::Array(ValArray::from_iter_exact(cols));
298            let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
299            let rows = Value::Array(ValArray::from_iter_exact(rows));
300            let tbl = Value::Array(ValArray::from([
301                Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
302                Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
303            ]));
304            (id, tbl)
305        });
306    }
307
308    fn stop_list(&mut self, id: BindId) {
309        self.change_trackers.remove(&id);
310    }
311
312    fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
313        let val = self.publisher.publish_with_flags_and_writes(
314            PublishFlags::empty(),
315            path,
316            value,
317            Some(self.writes_tx.clone()),
318        )?;
319        let id = val.id();
320        *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
321        Ok(val)
322    }
323
324    fn update(&mut self, val: &Val, value: Value) {
325        val.update(&mut self.batch, value);
326    }
327
328    fn unpublish(&mut self, val: Val, ref_by: ExprId) {
329        if let Some(refs) = self.published.get_mut(&val.id()) {
330            if let Some(cn) = refs.get_mut(&ref_by) {
331                *cn -= 1;
332                if *cn == 0 {
333                    refs.remove(&ref_by);
334                }
335            }
336            if refs.is_empty() {
337                self.published.remove(&val.id());
338            }
339        }
340    }
341
342    fn set_timer(&mut self, id: BindId, timeout: Duration) {
343        self.tasks.spawn(
344            time::sleep(timeout)
345                .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
346        );
347    }
348
349    fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
350        *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
351    }
352
353    fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
354        if let Some(refs) = self.by_ref.get_mut(&id) {
355            if let Some(cn) = refs.get_mut(&ref_by) {
356                *cn -= 1;
357                if *cn == 0 {
358                    refs.remove(&ref_by);
359                }
360            }
361            if refs.is_empty() {
362                self.by_ref.remove(&id);
363            }
364        }
365    }
366
367    fn set_var(&mut self, id: BindId, value: Value) {
368        self.var_updates.push_back((id, value.clone()));
369    }
370
371    fn notify_set(&mut self, id: BindId) {
372        if let Some(refed) = self.by_ref.get(&id) {
373            for eid in refed.keys() {
374                self.updated.entry(*eid).or_default();
375            }
376        }
377    }
378}