1use crate::{GXExt, UpdateBatch, WriteBatch};
2use anyhow::{bail, Result};
3use arcstr::{literal, ArcStr};
4use chrono::prelude::*;
5use compact_str::format_compact;
6use futures::{channel::mpsc, stream::SelectAll, FutureExt};
7use fxhash::FxHashMap;
8use graphix_compiler::{expr::ExprId, BindId, CustomBuiltinType, Rt};
9use netidx::{
10 path::Path,
11 protocol::valarray::ValArray,
12 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13 resolver_client::ChangeTracker,
14 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17 self,
18 server::{ArgSpec, RpcCall},
19};
20use poolshark::global::GPooled;
21use std::{
22 collections::{hash_map::Entry, HashMap, VecDeque},
23 fmt::Debug,
24 future,
25 time::Duration,
26};
27use tokio::{
28 sync::Mutex,
29 task::{self, JoinSet},
30 time::{self, Instant},
31};
32use triomphe::Arc;
33
/// A cached client-side RPC handle together with the time it was last used.
#[derive(Debug)]
pub(super) struct RpcClient {
    /// Procedure handle; `call_rpc` clones it out of the cache for each call.
    proc: rpc::client::Proc,
    /// Set when the entry is created and refreshed every time `call_rpc`
    /// reuses it.
    // NOTE(review): presumably read elsewhere to expire idle clients — confirm.
    pub(super) last_used: Instant,
}
39
/// Runtime state behind the [`Rt`] implementation: reference-count tables,
/// queues of pending events, background task sets, and the netidx publisher
/// and subscriber handles.
#[derive(Debug)]
pub struct GXRt<X: GXExt> {
    /// Per variable, how many references each expression holds. Maintained by
    /// `ref_var`/`unref_var` and consulted by `notify_set`.
    pub(super) by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
    /// Per subscription, how many references each expression holds.
    /// Maintained by `subscribe`/`unsubscribe`.
    pub(super) subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
    /// Per published value, how many references each expression holds.
    /// Maintained by `publish`/`unpublish`.
    pub(super) published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
    /// Variable assignments queued by `set_var`.
    pub(super) var_updates: VecDeque<(BindId, Value)>,
    /// Queue of custom builtin values keyed by variable.
    // NOTE(review): filled and drained outside this view — confirm the producer.
    pub(super) custom_updates: VecDeque<(BindId, Box<dyn CustomBuiltinType>)>,
    /// Queue of subscriber events keyed by subscription id.
    // NOTE(review): presumably fed from `updates` by the event loop — confirm.
    pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Queue of write requests keyed by published id.
    // NOTE(review): presumably fed from `writes` by the event loop — confirm.
    pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
    /// Queue of RPC calls keyed by variable.
    // NOTE(review): name suggests calls that did not fit in one cycle — confirm.
    pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// Client handles cached by procedure path; populated by `call_rpc`.
    pub(super) rpc_clients: FxHashMap<Path, RpcClient>,
    /// RPC procedures published by `publish_rpc`, keyed by path and removed
    /// by `unpublish_rpc`.
    pub(super) published_rpcs: FxHashMap<Path, rpc::server::Proc>,
    /// Subscriptions handed back through `unsubscribe`, stamped with the time
    /// of the call.
    // NOTE(review): presumably dropped later by code outside this view — confirm.
    pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// One shared change tracker per variable used with `list`/`list_table`;
    /// removed by `stop_list`.
    pub(super) change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks that resolve to a `(BindId, Value)` pair. Always
    /// seeded with one task that never completes.
    pub(super) tasks: JoinSet<(BindId, Value)>,
    /// Background tasks that resolve to a custom builtin value. Always seeded
    /// with one task that never completes.
    pub(super) custom_tasks: JoinSet<(BindId, Box<dyn CustomBuiltinType>)>,
    /// Merged stream of the receivers registered through `watch`, plus one
    /// dummy receiver.
    pub(super) watches:
        SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>>,
    /// Merged stream of the receivers registered through `watch_var`, plus one
    /// dummy receiver.
    pub(super) var_watches: SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>>,
    /// Sender half of the dummy channel whose receiver seeds `watches`.
    // NOTE(review): presumably held only so that receiver never closes — confirm.
    dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
    /// Sender half of the dummy channel whose receiver seeds `var_watches`.
    // NOTE(review): presumably held only so that receiver never closes — confirm.
    var_dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Value)>>>,
    /// Publisher batch that `update` queues value changes into.
    pub(super) batch: publisher::UpdateBatch,
    /// Publisher used for values and RPC procedures.
    pub(super) publisher: Publisher,
    /// Subscriber used for subscriptions, RPC clients and resolver access.
    pub(super) subscriber: Subscriber,
    /// Sender cloned into every subscription made by `subscribe`.
    pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receiving end paired with `updates_tx`.
    pub(super) updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to the publisher for every value published by `publish`.
    pub(super) writes_tx: mpsc::Sender<WriteBatch>,
    /// Receiving end paired with `writes_tx`.
    pub(super) writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to every procedure published by `publish_rpc`.
    pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receiving end paired with `rpcs_tx`.
    pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
    /// Expressions marked by `notify_set`; new entries start as `false`.
    // NOTE(review): the meaning of the flag is decided outside this view — confirm.
    pub(super) updated: FxHashMap<ExprId, bool>,
    /// Embedder-supplied extension state.
    pub ext: X,
}
75
76impl<X: GXExt> GXRt<X> {
77 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
78 let (updates_tx, updates) = mpsc::channel(100);
79 let (writes_tx, writes) = mpsc::channel(100);
80 let (rpcs_tx, rpcs) = mpsc::channel(100);
81 let batch = publisher.start_batch();
82 let mut tasks = JoinSet::new();
83 tasks.spawn(async { future::pending().await });
84 let mut custom_tasks = JoinSet::new();
85 custom_tasks.spawn(async { future::pending().await });
86 let (dummy_watch_tx, dummy_rx) = mpsc::channel(1);
87 let mut watches = SelectAll::new();
88 watches.push(dummy_rx);
89 let (var_dummy_watch_tx, dummy_rx) = mpsc::channel(1);
90 let mut var_watches = SelectAll::new();
91 var_watches.push(dummy_rx);
92 Self {
93 by_ref: HashMap::default(),
94 var_updates: VecDeque::new(),
95 custom_updates: VecDeque::new(),
96 net_updates: VecDeque::new(),
97 net_writes: VecDeque::new(),
98 rpc_overflow: VecDeque::new(),
99 rpc_clients: HashMap::default(),
100 subscribed: HashMap::default(),
101 pending_unsubscribe: VecDeque::new(),
102 published: HashMap::default(),
103 change_trackers: HashMap::default(),
104 published_rpcs: HashMap::default(),
105 updated: HashMap::default(),
106 ext: X::default(),
107 tasks,
108 custom_tasks,
109 watches,
110 var_watches,
111 dummy_watch_tx,
112 var_dummy_watch_tx,
113 batch,
114 publisher,
115 subscriber,
116 updates,
117 updates_tx,
118 writes,
119 writes_tx,
120 rpcs_tx,
121 rpcs,
122 }
123 }
124}
125
/// Unwrap a `Result`, or return early from the enclosing function or `async`
/// block with `($bindid, error)`.
///
/// The error is rendered with its `Debug` representation and wrapped as
/// `Value::Error(Value::String(..))`.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Err(err) => {
                let msg = format_compact!("{err:?}");
                let err_val =
                    Value::Error(Arc::new(Value::String(ArcStr::from(msg.as_str()))));
                return ($bindid, err_val);
            }
            Ok(val) => val,
        }
    };
}
138
/// Lock the shared `ChangeTracker` behind `$ct`, make sure it tracks `$path`,
/// and return early from the enclosing `async` block when the resolver
/// reports no change.
///
/// Must be expanded inside an `async` block whose output is `(BindId, Value)`:
/// * if the tracker currently tracks a different path it is replaced by a
///   fresh tracker for `$path`;
/// * if `check_changed` fails, the block returns `($id, <error value>)` via
///   `or_err!`;
/// * if it reports `false`, the block returns `($id, Value::Null)`.
///
/// NOTE(review): the guard is bound by a statement-level `let`, so it appears
/// to stay locked for the rest of the enclosing block (including the listing
/// that follows each use) — confirm that this is intended.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut ct = $ct.lock().await;
        // The tracker is keyed by variable, so the path it follows can change
        // between calls.
        if ct.path() != &$path {
            *ct = ChangeTracker::new($path.clone());
        }
        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
            return ($id, Value::Null);
        }
    };
}
150
151impl<X: GXExt> Rt for GXRt<X> {
152 type AbortHandle = task::AbortHandle;
153
154 fn clear(&mut self) {
155 let Self {
156 by_ref,
157 var_updates,
158 custom_updates,
159 net_updates,
160 net_writes,
161 rpc_clients,
162 rpc_overflow,
163 subscribed,
164 published,
165 published_rpcs,
166 pending_unsubscribe,
167 change_trackers,
168 tasks,
169 custom_tasks,
170 watches,
171 var_watches,
172 dummy_watch_tx,
173 var_dummy_watch_tx,
174 batch,
175 publisher,
176 subscriber: _,
177 updates_tx,
178 updates,
179 writes_tx,
180 writes,
181 rpcs,
182 rpcs_tx,
183 updated,
184 ext,
185 } = self;
186 ext.clear();
187 updated.clear();
188 by_ref.clear();
189 var_updates.clear();
190 custom_updates.clear();
191 net_updates.clear();
192 net_writes.clear();
193 rpc_overflow.clear();
194 rpc_clients.clear();
195 subscribed.clear();
196 published.clear();
197 published_rpcs.clear();
198 pending_unsubscribe.clear();
199 change_trackers.clear();
200 *tasks = JoinSet::new();
201 tasks.spawn(async { future::pending().await });
202 *custom_tasks = JoinSet::new();
203 custom_tasks.spawn(async { future::pending().await });
204 *watches = SelectAll::new();
205 let (tx, rx) = mpsc::channel(1);
206 *dummy_watch_tx = tx;
207 watches.push(rx);
208 *var_watches = SelectAll::new();
209 let (tx, rx) = mpsc::channel(1);
210 *var_dummy_watch_tx = tx;
211 var_watches.push(rx);
212 *batch = publisher.start_batch();
213 let (tx, rx) = mpsc::channel(3);
214 *updates_tx = tx;
215 *updates = rx;
216 let (tx, rx) = mpsc::channel(100);
217 *writes_tx = tx;
218 *writes = rx;
219 let (tx, rx) = mpsc::channel(100);
220 *rpcs_tx = tx;
221 *rpcs = rx
222 }
223
224 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
225 let now = Instant::now();
226 let proc = match self.rpc_clients.entry(name) {
227 Entry::Occupied(mut e) => {
228 let cl = e.get_mut();
229 cl.last_used = now;
230 Ok(cl.proc.clone())
231 }
232 Entry::Vacant(e) => {
233 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
234 Err(e) => Err(e),
235 Ok(proc) => {
236 let cl = RpcClient { last_used: now, proc: proc.clone() };
237 e.insert(cl);
238 Ok(proc)
239 }
240 }
241 }
242 };
243 self.tasks.spawn(async move {
244 macro_rules! err {
245 ($e:expr) => {{
246 let e = format_compact!("{:?}", $e);
247 (id, Value::error(e.as_str()))
248 }};
249 }
250 match proc {
251 Err(e) => err!(e),
252 Ok(proc) => match proc.call(args).await {
253 Err(e) => err!(e),
254 Ok(res) => (id, res),
255 },
256 }
257 });
258 }
259
260 fn publish_rpc(
261 &mut self,
262 name: Path,
263 doc: Value,
264 spec: Vec<ArgSpec>,
265 id: BindId,
266 ) -> Result<()> {
267 use rpc::server::Proc;
268 let e = match self.published_rpcs.entry(name) {
269 Entry::Vacant(e) => e,
270 Entry::Occupied(_) => bail!("already published"),
271 };
272 let proc = Proc::new(
273 &self.publisher,
274 e.key().clone(),
275 doc,
276 spec,
277 move |c| Some((id, c)),
278 Some(self.rpcs_tx.clone()),
279 )?;
280 e.insert(proc);
281 Ok(())
282 }
283
284 fn unpublish_rpc(&mut self, name: Path) {
285 self.published_rpcs.remove(&name);
286 }
287
288 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
289 let dval =
290 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
291 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
292 dval
293 }
294
295 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
296 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
297 if let Some(cn) = exprs.get_mut(&ref_by) {
298 *cn -= 1;
299 if *cn == 0 {
300 exprs.remove(&ref_by);
301 }
302 }
303 if exprs.is_empty() {
304 self.subscribed.remove(&dv.id());
305 }
306 }
307 self.pending_unsubscribe.push_back((Instant::now(), dv));
308 }
309
310 fn list(&mut self, id: BindId, path: Path) {
311 let ct = self
312 .change_trackers
313 .entry(id)
314 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
315 let ct = Arc::clone(ct);
316 let resolver = self.subscriber.resolver();
317 self.tasks.spawn(async move {
318 check_changed!(id, resolver, path, ct);
319 let mut paths = or_err!(id, resolver.list(path).await);
320 let paths = paths.drain(..).map(|p| Value::String(p.into()));
321 (id, Value::Array(ValArray::from_iter_exact(paths)))
322 });
323 }
324
325 fn list_table(&mut self, id: BindId, path: Path) {
326 let ct = self
327 .change_trackers
328 .entry(id)
329 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
330 let ct = Arc::clone(ct);
331 let resolver = self.subscriber.resolver();
332 self.tasks.spawn(async move {
333 check_changed!(id, resolver, path, ct);
334 let mut tbl = or_err!(id, resolver.table(path).await);
335 let cols = tbl.cols.drain(..).map(|(name, count)| {
336 Value::Array(ValArray::from([
337 Value::String(name.into()),
338 Value::V64(count.0),
339 ]))
340 });
341 let cols = Value::Array(ValArray::from_iter_exact(cols));
342 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
343 let rows = Value::Array(ValArray::from_iter_exact(rows));
344 let tbl = Value::Array(ValArray::from([
345 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
346 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
347 ]));
348 (id, tbl)
349 });
350 }
351
352 fn stop_list(&mut self, id: BindId) {
353 self.change_trackers.remove(&id);
354 }
355
356 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
357 let val = self.publisher.publish_with_flags_and_writes(
358 PublishFlags::empty(),
359 path,
360 value,
361 Some(self.writes_tx.clone()),
362 )?;
363 let id = val.id();
364 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
365 Ok(val)
366 }
367
368 fn update(&mut self, val: &Val, value: Value) {
369 val.update(&mut self.batch, value);
370 }
371
372 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
373 if let Some(refs) = self.published.get_mut(&val.id()) {
374 if let Some(cn) = refs.get_mut(&ref_by) {
375 *cn -= 1;
376 if *cn == 0 {
377 refs.remove(&ref_by);
378 }
379 }
380 if refs.is_empty() {
381 self.published.remove(&val.id());
382 }
383 }
384 }
385
386 fn set_timer(&mut self, id: BindId, timeout: Duration) {
387 self.tasks.spawn(
388 time::sleep(timeout)
389 .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
390 );
391 }
392
393 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
394 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
395 }
396
397 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
398 if let Some(refs) = self.by_ref.get_mut(&id) {
399 if let Some(cn) = refs.get_mut(&ref_by) {
400 *cn -= 1;
401 if *cn == 0 {
402 refs.remove(&ref_by);
403 }
404 }
405 if refs.is_empty() {
406 self.by_ref.remove(&id);
407 }
408 }
409 }
410
411 fn set_var(&mut self, id: BindId, value: Value) {
412 self.var_updates.push_back((id, value.clone()));
413 }
414
415 fn notify_set(&mut self, id: BindId) {
416 if let Some(refed) = self.by_ref.get(&id) {
417 for eid in refed.keys() {
418 self.updated.entry(*eid).or_default();
419 }
420 }
421 }
422
423 fn spawn<
424 F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static,
425 >(
426 &mut self,
427 f: F,
428 ) -> Self::AbortHandle {
429 self.custom_tasks.spawn(f)
430 }
431
432 fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
433 &mut self,
434 f: F,
435 ) -> Self::AbortHandle {
436 self.tasks.spawn(f)
437 }
438
439 fn watch(
440 &mut self,
441 s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
442 ) {
443 self.watches.push(s)
444 }
445
446 fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>) {
447 self.var_watches.push(s)
448 }
449}