1use crate::{GXExt, UpdateBatch, WriteBatch};
2use ahash::AHashMap;
3use anyhow::{bail, Result};
4use arcstr::{literal, ArcStr};
5use chrono::prelude::*;
6use compact_str::format_compact;
7use futures::{channel::mpsc, stream::SelectAll, FutureExt};
8use graphix_compiler::{expr::ExprId, BindId, CustomBuiltinType, Rt};
9use netidx::{
10 path::Path,
11 protocol::valarray::ValArray,
12 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13 resolver_client::ChangeTracker,
14 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17 self,
18 server::{ArgSpec, RpcCall},
19};
20use nohash::IntMap;
21use poolshark::global::GPooled;
22use std::{
23 collections::{hash_map::Entry, VecDeque},
24 fmt::Debug,
25 future,
26 time::Duration,
27};
28use tokio::{
29 sync::Mutex,
30 task::{self, JoinSet},
31 time::{self, Instant},
32};
33use triomphe::Arc;
34
/// A cached RPC client handle together with the time it was last used.
///
/// `GXRt::call_rpc` keeps one of these per procedure path in
/// `GXRt::rpc_clients` and refreshes `last_used` on every call.
#[derive(Debug)]
pub(super) struct RpcClient {
    /// Client side handle used to invoke the remote procedure.
    proc: rpc::client::Proc,
    /// Instant of the most recent `call_rpc` that went through this handle.
    // NOTE(review): presumably consulted elsewhere to expire idle clients — confirm.
    pub(super) last_used: Instant,
}
40
/// Runtime state behind the `Rt` implementation: the netidx publisher and
/// subscriber handles, reference count tables, pending event queues,
/// background task sets and the channels that feed them.
#[derive(Debug)]
pub struct GXRt<X: GXExt> {
    /// For each variable, how many references each expression holds on it.
    /// Maintained by `ref_var`/`unref_var` and read by `notify_set`.
    pub(super) by_ref: IntMap<BindId, IntMap<ExprId, usize>>,
    /// For each subscription, how many references each expression holds on
    /// it. Maintained by `subscribe`/`unsubscribe`.
    pub(super) subscribed: IntMap<SubId, IntMap<ExprId, usize>>,
    /// For each published value, how many references each expression holds
    /// on it. Maintained by `publish`/`unpublish`.
    pub(super) published: IntMap<Id, IntMap<ExprId, usize>>,
    /// Variable assignments queued by `set_var`.
    pub(super) var_updates: VecDeque<(BindId, Value)>,
    /// Queue of custom builtin values addressed to a variable.
    // NOTE(review): nothing visible here pushes to this queue; presumably
    // the runtime loop fills it from `custom_tasks`/`watches` — confirm.
    pub(super) custom_updates: VecDeque<(BindId, Box<dyn CustomBuiltinType>)>,
    /// Queue of subscriber events keyed by subscription id.
    // NOTE(review): presumably filled from `updates` by the runtime loop — confirm.
    pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
    /// Queue of write requests keyed by published value id.
    // NOTE(review): presumably filled from `writes` by the runtime loop — confirm.
    pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
    /// Queue of RPC calls keyed by the variable they are delivered to.
    // NOTE(review): presumably holds calls from `rpcs` that could not be
    // processed immediately — confirm.
    pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
    /// RPC client handles cached by procedure path; see `call_rpc`.
    pub(super) rpc_clients: AHashMap<Path, RpcClient>,
    /// RPC procedures currently published by this runtime, by path; see
    /// `publish_rpc`/`unpublish_rpc`.
    pub(super) published_rpcs: AHashMap<Path, rpc::server::Proc>,
    /// Subscriptions handed back through `unsubscribe`, each stamped with
    /// the instant it was released.
    // NOTE(review): presumably dropped after a grace period elsewhere — confirm.
    pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
    /// One resolver change tracker per listing variable; created by
    /// `list`/`list_table` and removed by `stop_list`.
    pub(super) change_trackers: IntMap<BindId, Arc<Mutex<ChangeTracker>>>,
    /// Background tasks that resolve to a `(BindId, Value)` pair (RPC calls,
    /// listings, timers and `spawn_var` futures).
    pub(super) tasks: JoinSet<(BindId, Value)>,
    /// Background tasks that resolve to a custom builtin value; see `spawn`.
    pub(super) custom_tasks: JoinSet<(BindId, Box<dyn CustomBuiltinType>)>,
    /// Streams of custom builtin value batches registered through `watch`.
    pub(super) watches:
        SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>>,
    /// Streams of value batches registered through `watch_var`.
    pub(super) var_watches: SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>>,
    /// Sender half of the placeholder receiver that `new`/`clear` push into
    /// `watches`.
    // NOTE(review): presumably kept only so the placeholder stream never
    // terminates — confirm.
    dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
    /// Sender half of the placeholder receiver that `new`/`clear` push into
    /// `var_watches`.
    var_dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Value)>>>,
    /// Publisher batch that `update` queues changes into.
    pub(super) batch: publisher::UpdateBatch,
    /// Netidx publisher used for values and RPC procedures.
    pub(super) publisher: Publisher,
    /// Netidx subscriber used for subscriptions, RPC clients and the resolver.
    pub(super) subscriber: Subscriber,
    /// Sender registered with every subscription made by `subscribe`.
    pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
    /// Receiving end of `updates_tx`.
    pub(super) updates: mpsc::Receiver<UpdateBatch>,
    /// Sender handed to the publisher by `publish` for incoming writes.
    pub(super) writes_tx: mpsc::Sender<WriteBatch>,
    /// Receiving end of `writes_tx`.
    pub(super) writes: mpsc::Receiver<WriteBatch>,
    /// Sender handed to every RPC procedure created by `publish_rpc`.
    pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
    /// Receiving end of `rpcs_tx`.
    pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
    /// Expressions flagged by `notify_set`; new entries start as `false`.
    // NOTE(review): the meaning of the flag is not visible here — confirm.
    pub(super) updated: IntMap<ExprId, bool>,
    /// User supplied runtime extension, built with `X::default()`.
    pub ext: X,
}
76
77impl<X: GXExt> GXRt<X> {
78 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
79 let (updates_tx, updates) = mpsc::channel(100);
80 let (writes_tx, writes) = mpsc::channel(100);
81 let (rpcs_tx, rpcs) = mpsc::channel(100);
82 let batch = publisher.start_batch();
83 let mut tasks = JoinSet::new();
84 tasks.spawn(async { future::pending().await });
85 let mut custom_tasks = JoinSet::new();
86 custom_tasks.spawn(async { future::pending().await });
87 let (dummy_watch_tx, dummy_rx) = mpsc::channel(1);
88 let mut watches = SelectAll::new();
89 watches.push(dummy_rx);
90 let (var_dummy_watch_tx, dummy_rx) = mpsc::channel(1);
91 let mut var_watches = SelectAll::new();
92 var_watches.push(dummy_rx);
93 Self {
94 by_ref: IntMap::default(),
95 var_updates: VecDeque::new(),
96 custom_updates: VecDeque::new(),
97 net_updates: VecDeque::new(),
98 net_writes: VecDeque::new(),
99 rpc_overflow: VecDeque::new(),
100 rpc_clients: AHashMap::default(),
101 subscribed: IntMap::default(),
102 pending_unsubscribe: VecDeque::new(),
103 published: IntMap::default(),
104 change_trackers: IntMap::default(),
105 published_rpcs: AHashMap::default(),
106 updated: IntMap::default(),
107 ext: X::default(),
108 tasks,
109 custom_tasks,
110 watches,
111 var_watches,
112 dummy_watch_tx,
113 var_dummy_watch_tx,
114 batch,
115 publisher,
116 subscriber,
117 updates,
118 updates_tx,
119 writes,
120 writes_tx,
121 rpcs_tx,
122 rpcs,
123 }
124 }
125}
126
/// Unwrap `$e`, or return early from the enclosing function or async block
/// with `($bindid, error)`, where `error` is a `Value::Error` wrapping the
/// `Debug` rendering of the failure as a string.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(val) => val,
            Err(err) => {
                let msg = format_compact!("{err:?}");
                return (
                    $bindid,
                    Value::Error(Arc::new(Value::String(ArcStr::from(msg.as_str())))),
                );
            }
        }
    };
}
139
/// Guard for the listing tasks: return early from the enclosing async block
/// unless the resolver reports a change for `$path`.
///
/// * `$id` - bind id placed in every early return tuple.
/// * `$resolver` - resolver handle providing `check_changed`.
/// * `$path` - path being listed.
/// * `$ct` - shared, mutex protected `ChangeTracker` for `$id`.
///
/// A failed check returns `($id, <error value>)` through `or_err!`; an
/// unchanged result returns `($id, Value::Null)`.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        // This `let` expands into the caller's block, so the lock guard is
        // held until that block ends, not just for the check below.
        let mut ct = $ct.lock().await;
        // The tracker was created for another path: start over with a
        // fresh tracker for the requested one.
        if ct.path() != &$path {
            *ct = ChangeTracker::new($path.clone());
        }
        if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
            return ($id, Value::Null);
        }
    };
}
151
152impl<X: GXExt> Rt for GXRt<X> {
153 type AbortHandle = task::AbortHandle;
154
155 fn clear(&mut self) {
156 let Self {
157 by_ref,
158 var_updates,
159 custom_updates,
160 net_updates,
161 net_writes,
162 rpc_clients,
163 rpc_overflow,
164 subscribed,
165 published,
166 published_rpcs,
167 pending_unsubscribe,
168 change_trackers,
169 tasks,
170 custom_tasks,
171 watches,
172 var_watches,
173 dummy_watch_tx,
174 var_dummy_watch_tx,
175 batch,
176 publisher,
177 subscriber: _,
178 updates_tx,
179 updates,
180 writes_tx,
181 writes,
182 rpcs,
183 rpcs_tx,
184 updated,
185 ext,
186 } = self;
187 ext.clear();
188 updated.clear();
189 by_ref.clear();
190 var_updates.clear();
191 custom_updates.clear();
192 net_updates.clear();
193 net_writes.clear();
194 rpc_overflow.clear();
195 rpc_clients.clear();
196 subscribed.clear();
197 published.clear();
198 published_rpcs.clear();
199 pending_unsubscribe.clear();
200 change_trackers.clear();
201 *tasks = JoinSet::new();
202 tasks.spawn(async { future::pending().await });
203 *custom_tasks = JoinSet::new();
204 custom_tasks.spawn(async { future::pending().await });
205 *watches = SelectAll::new();
206 let (tx, rx) = mpsc::channel(1);
207 *dummy_watch_tx = tx;
208 watches.push(rx);
209 *var_watches = SelectAll::new();
210 let (tx, rx) = mpsc::channel(1);
211 *var_dummy_watch_tx = tx;
212 var_watches.push(rx);
213 *batch = publisher.start_batch();
214 let (tx, rx) = mpsc::channel(3);
215 *updates_tx = tx;
216 *updates = rx;
217 let (tx, rx) = mpsc::channel(100);
218 *writes_tx = tx;
219 *writes = rx;
220 let (tx, rx) = mpsc::channel(100);
221 *rpcs_tx = tx;
222 *rpcs = rx
223 }
224
225 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
226 let now = Instant::now();
227 let proc = match self.rpc_clients.entry(name) {
228 Entry::Occupied(mut e) => {
229 let cl = e.get_mut();
230 cl.last_used = now;
231 Ok(cl.proc.clone())
232 }
233 Entry::Vacant(e) => {
234 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
235 Err(e) => Err(e),
236 Ok(proc) => {
237 let cl = RpcClient { last_used: now, proc: proc.clone() };
238 e.insert(cl);
239 Ok(proc)
240 }
241 }
242 }
243 };
244 self.tasks.spawn(async move {
245 macro_rules! err {
246 ($e:expr) => {{
247 let e = format_compact!("{:?}", $e);
248 (id, Value::error(e.as_str()))
249 }};
250 }
251 match proc {
252 Err(e) => err!(e),
253 Ok(proc) => match proc.call(args).await {
254 Err(e) => err!(e),
255 Ok(res) => (id, res),
256 },
257 }
258 });
259 }
260
261 fn publish_rpc(
262 &mut self,
263 name: Path,
264 doc: Value,
265 spec: Vec<ArgSpec>,
266 id: BindId,
267 ) -> Result<()> {
268 use rpc::server::Proc;
269 let e = match self.published_rpcs.entry(name) {
270 Entry::Vacant(e) => e,
271 Entry::Occupied(_) => bail!("already published"),
272 };
273 let proc = Proc::new(
274 &self.publisher,
275 e.key().clone(),
276 doc,
277 spec,
278 move |c| Some((id, c)),
279 Some(self.rpcs_tx.clone()),
280 )?;
281 e.insert(proc);
282 Ok(())
283 }
284
285 fn unpublish_rpc(&mut self, name: Path) {
286 self.published_rpcs.remove(&name);
287 }
288
289 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
290 let dval =
291 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
292 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
293 dval
294 }
295
296 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
297 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
298 if let Some(cn) = exprs.get_mut(&ref_by) {
299 *cn -= 1;
300 if *cn == 0 {
301 exprs.remove(&ref_by);
302 }
303 }
304 if exprs.is_empty() {
305 self.subscribed.remove(&dv.id());
306 }
307 }
308 self.pending_unsubscribe.push_back((Instant::now(), dv));
309 }
310
311 fn list(&mut self, id: BindId, path: Path) {
312 let ct = self
313 .change_trackers
314 .entry(id)
315 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
316 let ct = Arc::clone(ct);
317 let resolver = self.subscriber.resolver();
318 self.tasks.spawn(async move {
319 check_changed!(id, resolver, path, ct);
320 let mut paths = or_err!(id, resolver.list(path).await);
321 let paths = paths.drain(..).map(|p| Value::String(p.into()));
322 (id, Value::Array(ValArray::from_iter_exact(paths)))
323 });
324 }
325
326 fn list_table(&mut self, id: BindId, path: Path) {
327 let ct = self
328 .change_trackers
329 .entry(id)
330 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
331 let ct = Arc::clone(ct);
332 let resolver = self.subscriber.resolver();
333 self.tasks.spawn(async move {
334 check_changed!(id, resolver, path, ct);
335 let mut tbl = or_err!(id, resolver.table(path).await);
336 let cols =
337 tbl.cols.drain(..).map(|(name, _count)| Value::String(name.into()));
338 let cols = Value::Array(ValArray::from_iter_exact(cols));
339 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
340 let rows = Value::Array(ValArray::from_iter_exact(rows));
341 let tbl = Value::Array(ValArray::from([
342 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
343 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
344 ]));
345 (id, tbl)
346 });
347 }
348
349 fn stop_list(&mut self, id: BindId) {
350 self.change_trackers.remove(&id);
351 }
352
353 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
354 let val = self.publisher.publish_with_flags_and_writes(
355 PublishFlags::empty(),
356 path,
357 value,
358 Some(self.writes_tx.clone()),
359 )?;
360 let id = val.id();
361 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
362 Ok(val)
363 }
364
365 fn update(&mut self, val: &Val, value: Value) {
366 val.update(&mut self.batch, value);
367 }
368
369 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
370 if let Some(refs) = self.published.get_mut(&val.id()) {
371 if let Some(cn) = refs.get_mut(&ref_by) {
372 *cn -= 1;
373 if *cn == 0 {
374 refs.remove(&ref_by);
375 }
376 }
377 if refs.is_empty() {
378 self.published.remove(&val.id());
379 }
380 }
381 }
382
383 fn set_timer(&mut self, id: BindId, timeout: Duration) {
384 self.tasks.spawn(
385 time::sleep(timeout)
386 .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
387 );
388 }
389
390 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
391 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
392 }
393
394 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
395 if let Some(refs) = self.by_ref.get_mut(&id) {
396 if let Some(cn) = refs.get_mut(&ref_by) {
397 *cn -= 1;
398 if *cn == 0 {
399 refs.remove(&ref_by);
400 }
401 }
402 if refs.is_empty() {
403 self.by_ref.remove(&id);
404 }
405 }
406 }
407
408 fn set_var(&mut self, id: BindId, value: Value) {
409 self.var_updates.push_back((id, value.clone()));
410 }
411
412 fn notify_set(&mut self, id: BindId) {
413 if let Some(refed) = self.by_ref.get(&id) {
414 for eid in refed.keys() {
415 self.updated.entry(*eid).or_default();
416 }
417 }
418 }
419
420 fn spawn<
421 F: Future<Output = (BindId, Box<dyn CustomBuiltinType>)> + Send + 'static,
422 >(
423 &mut self,
424 f: F,
425 ) -> Self::AbortHandle {
426 self.custom_tasks.spawn(f)
427 }
428
429 fn spawn_var<F: Future<Output = (BindId, Value)> + Send + 'static>(
430 &mut self,
431 f: F,
432 ) -> Self::AbortHandle {
433 self.tasks.spawn(f)
434 }
435
436 fn watch(
437 &mut self,
438 s: mpsc::Receiver<GPooled<Vec<(BindId, Box<dyn CustomBuiltinType>)>>>,
439 ) {
440 self.watches.push(s)
441 }
442
443 fn watch_var(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>) {
444 self.var_watches.push(s)
445 }
446}