1use crate::{GXExt, UpdateBatch, WriteBatch};
2use anyhow::{bail, Result};
3use arcstr::{literal, ArcStr};
4use chrono::prelude::*;
5use compact_str::format_compact;
6use futures::{channel::mpsc, stream::SelectAll, FutureExt};
7use fxhash::FxHashMap;
8use graphix_compiler::{expr::ExprId, BindId, Rt};
9use netidx::{
10 path::Path,
11 protocol::valarray::ValArray,
12 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13 resolver_client::ChangeTracker,
14 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17 self,
18 server::{ArgSpec, RpcCall},
19};
20use poolshark::global::GPooled;
21use std::{
22 collections::{hash_map::Entry, HashMap, VecDeque},
23 fmt::Debug,
24 future,
25 time::Duration,
26};
27use tokio::{
28 sync::Mutex,
29 task::{self, JoinSet},
30 time::{self, Instant},
31};
32use triomphe::Arc;
33
34#[derive(Debug)]
35pub(super) struct RpcClient {
36 proc: rpc::client::Proc,
37 pub(super) last_used: Instant,
38}
39
40#[derive(Debug)]
41pub struct GXRt<X: GXExt> {
42 pub(super) by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
43 pub(super) subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
44 pub(super) published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
45 pub(super) var_updates: VecDeque<(BindId, Value)>,
46 pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
47 pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
48 pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
49 pub(super) rpc_clients: FxHashMap<Path, RpcClient>,
50 pub(super) published_rpcs: FxHashMap<Path, rpc::server::Proc>,
51 pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
52 pub(super) change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
53 pub(super) tasks: JoinSet<(BindId, Value)>,
54 pub(super) watches: SelectAll<mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>>,
55 dummy_watch_tx: mpsc::Sender<GPooled<Vec<(BindId, Value)>>>,
57 pub(super) batch: publisher::UpdateBatch,
58 pub(super) publisher: Publisher,
59 pub(super) subscriber: Subscriber,
60 pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
61 pub(super) updates: mpsc::Receiver<UpdateBatch>,
62 pub(super) writes_tx: mpsc::Sender<WriteBatch>,
63 pub(super) writes: mpsc::Receiver<WriteBatch>,
64 pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
65 pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
66 pub(super) updated: FxHashMap<ExprId, bool>,
67 pub ext: X,
68}
69
70impl<X: GXExt> GXRt<X> {
71 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
72 let (updates_tx, updates) = mpsc::channel(100);
73 let (writes_tx, writes) = mpsc::channel(100);
74 let (rpcs_tx, rpcs) = mpsc::channel(100);
75 let batch = publisher.start_batch();
76 let mut tasks = JoinSet::new();
77 tasks.spawn(async { future::pending().await });
78 let (dummy_watch_tx, dummy_rx) = mpsc::channel(1);
79 let mut watches = SelectAll::new();
80 watches.push(dummy_rx);
81 Self {
82 by_ref: HashMap::default(),
83 var_updates: VecDeque::new(),
84 net_updates: VecDeque::new(),
85 net_writes: VecDeque::new(),
86 rpc_overflow: VecDeque::new(),
87 rpc_clients: HashMap::default(),
88 subscribed: HashMap::default(),
89 pending_unsubscribe: VecDeque::new(),
90 published: HashMap::default(),
91 change_trackers: HashMap::default(),
92 published_rpcs: HashMap::default(),
93 updated: HashMap::default(),
94 ext: X::default(),
95 tasks,
96 watches,
97 dummy_watch_tx,
98 batch,
99 publisher,
100 subscriber,
101 updates,
102 updates_tx,
103 writes,
104 writes_tx,
105 rpcs_tx,
106 rpcs,
107 }
108 }
109}
110
111macro_rules! or_err {
112 ($bindid:expr, $e:expr) => {
113 match $e {
114 Ok(v) => v,
115 Err(e) => {
116 let e = ArcStr::from(format_compact!("{e:?}").as_str());
117 let e = Value::Error(Arc::new(Value::String(e)));
118 return ($bindid, e);
119 }
120 }
121 };
122}
123
124macro_rules! check_changed {
125 ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
126 let mut ct = $ct.lock().await;
127 if ct.path() != &$path {
128 *ct = ChangeTracker::new($path.clone());
129 }
130 if !or_err!($id, $resolver.check_changed(&mut *ct).await) {
131 return ($id, Value::Null);
132 }
133 };
134}
135
136impl<X: GXExt> Rt for GXRt<X> {
137 type AbortHandle = task::AbortHandle;
138
139 fn clear(&mut self) {
140 let Self {
141 by_ref,
142 var_updates,
143 net_updates,
144 net_writes,
145 rpc_clients,
146 rpc_overflow,
147 subscribed,
148 published,
149 published_rpcs,
150 pending_unsubscribe,
151 change_trackers,
152 tasks,
153 watches,
154 dummy_watch_tx,
155 batch,
156 publisher,
157 subscriber: _,
158 updates_tx,
159 updates,
160 writes_tx,
161 writes,
162 rpcs,
163 rpcs_tx,
164 updated,
165 ext,
166 } = self;
167 ext.clear();
168 updated.clear();
169 by_ref.clear();
170 var_updates.clear();
171 net_updates.clear();
172 net_writes.clear();
173 rpc_overflow.clear();
174 rpc_clients.clear();
175 subscribed.clear();
176 published.clear();
177 published_rpcs.clear();
178 pending_unsubscribe.clear();
179 change_trackers.clear();
180 *tasks = JoinSet::new();
181 tasks.spawn(async { future::pending().await });
182 *watches = SelectAll::new();
183 let (tx, rx) = mpsc::channel(1);
184 *dummy_watch_tx = tx;
185 watches.push(rx);
186 *batch = publisher.start_batch();
187 let (tx, rx) = mpsc::channel(3);
188 *updates_tx = tx;
189 *updates = rx;
190 let (tx, rx) = mpsc::channel(100);
191 *writes_tx = tx;
192 *writes = rx;
193 let (tx, rx) = mpsc::channel(100);
194 *rpcs_tx = tx;
195 *rpcs = rx
196 }
197
198 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
199 let now = Instant::now();
200 let proc = match self.rpc_clients.entry(name) {
201 Entry::Occupied(mut e) => {
202 let cl = e.get_mut();
203 cl.last_used = now;
204 Ok(cl.proc.clone())
205 }
206 Entry::Vacant(e) => {
207 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
208 Err(e) => Err(e),
209 Ok(proc) => {
210 let cl = RpcClient { last_used: now, proc: proc.clone() };
211 e.insert(cl);
212 Ok(proc)
213 }
214 }
215 }
216 };
217 self.tasks.spawn(async move {
218 macro_rules! err {
219 ($e:expr) => {{
220 let e = format_compact!("{:?}", $e);
221 (id, Value::error(e.as_str()))
222 }};
223 }
224 match proc {
225 Err(e) => err!(e),
226 Ok(proc) => match proc.call(args).await {
227 Err(e) => err!(e),
228 Ok(res) => (id, res),
229 },
230 }
231 });
232 }
233
234 fn publish_rpc(
235 &mut self,
236 name: Path,
237 doc: Value,
238 spec: Vec<ArgSpec>,
239 id: BindId,
240 ) -> Result<()> {
241 use rpc::server::Proc;
242 let e = match self.published_rpcs.entry(name) {
243 Entry::Vacant(e) => e,
244 Entry::Occupied(_) => bail!("already published"),
245 };
246 let proc = Proc::new(
247 &self.publisher,
248 e.key().clone(),
249 doc,
250 spec,
251 move |c| Some((id, c)),
252 Some(self.rpcs_tx.clone()),
253 )?;
254 e.insert(proc);
255 Ok(())
256 }
257
258 fn unpublish_rpc(&mut self, name: Path) {
259 self.published_rpcs.remove(&name);
260 }
261
262 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
263 let dval =
264 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
265 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
266 dval
267 }
268
269 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
270 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
271 if let Some(cn) = exprs.get_mut(&ref_by) {
272 *cn -= 1;
273 if *cn == 0 {
274 exprs.remove(&ref_by);
275 }
276 }
277 if exprs.is_empty() {
278 self.subscribed.remove(&dv.id());
279 }
280 }
281 self.pending_unsubscribe.push_back((Instant::now(), dv));
282 }
283
284 fn list(&mut self, id: BindId, path: Path) {
285 let ct = self
286 .change_trackers
287 .entry(id)
288 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
289 let ct = Arc::clone(ct);
290 let resolver = self.subscriber.resolver();
291 self.tasks.spawn(async move {
292 check_changed!(id, resolver, path, ct);
293 let mut paths = or_err!(id, resolver.list(path).await);
294 let paths = paths.drain(..).map(|p| Value::String(p.into()));
295 (id, Value::Array(ValArray::from_iter_exact(paths)))
296 });
297 }
298
299 fn list_table(&mut self, id: BindId, path: Path) {
300 let ct = self
301 .change_trackers
302 .entry(id)
303 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
304 let ct = Arc::clone(ct);
305 let resolver = self.subscriber.resolver();
306 self.tasks.spawn(async move {
307 check_changed!(id, resolver, path, ct);
308 let mut tbl = or_err!(id, resolver.table(path).await);
309 let cols = tbl.cols.drain(..).map(|(name, count)| {
310 Value::Array(ValArray::from([
311 Value::String(name.into()),
312 Value::V64(count.0),
313 ]))
314 });
315 let cols = Value::Array(ValArray::from_iter_exact(cols));
316 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
317 let rows = Value::Array(ValArray::from_iter_exact(rows));
318 let tbl = Value::Array(ValArray::from([
319 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
320 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
321 ]));
322 (id, tbl)
323 });
324 }
325
326 fn stop_list(&mut self, id: BindId) {
327 self.change_trackers.remove(&id);
328 }
329
330 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
331 let val = self.publisher.publish_with_flags_and_writes(
332 PublishFlags::empty(),
333 path,
334 value,
335 Some(self.writes_tx.clone()),
336 )?;
337 let id = val.id();
338 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
339 Ok(val)
340 }
341
342 fn update(&mut self, val: &Val, value: Value) {
343 val.update(&mut self.batch, value);
344 }
345
346 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
347 if let Some(refs) = self.published.get_mut(&val.id()) {
348 if let Some(cn) = refs.get_mut(&ref_by) {
349 *cn -= 1;
350 if *cn == 0 {
351 refs.remove(&ref_by);
352 }
353 }
354 if refs.is_empty() {
355 self.published.remove(&val.id());
356 }
357 }
358 }
359
360 fn set_timer(&mut self, id: BindId, timeout: Duration) {
361 self.tasks.spawn(
362 time::sleep(timeout)
363 .map(move |()| (id, Value::DateTime(Arc::new(Utc::now())))),
364 );
365 }
366
367 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
368 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
369 }
370
371 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
372 if let Some(refs) = self.by_ref.get_mut(&id) {
373 if let Some(cn) = refs.get_mut(&ref_by) {
374 *cn -= 1;
375 if *cn == 0 {
376 refs.remove(&ref_by);
377 }
378 }
379 if refs.is_empty() {
380 self.by_ref.remove(&id);
381 }
382 }
383 }
384
385 fn set_var(&mut self, id: BindId, value: Value) {
386 self.var_updates.push_back((id, value.clone()));
387 }
388
389 fn notify_set(&mut self, id: BindId) {
390 if let Some(refed) = self.by_ref.get(&id) {
391 for eid in refed.keys() {
392 self.updated.entry(*eid).or_default();
393 }
394 }
395 }
396
397 fn spawn<F: Future<Output = (BindId, Value)> + Send + 'static>(
398 &mut self,
399 f: F,
400 ) -> Self::AbortHandle {
401 self.tasks.spawn(f)
402 }
403
404 fn watch(&mut self, s: mpsc::Receiver<GPooled<Vec<(BindId, Value)>>>) {
405 self.watches.push(s)
406 }
407}