1use crate::{GXExt, UpdateBatch, WriteBatch};
2use anyhow::{bail, Result};
3use arcstr::{literal, ArcStr};
4use chrono::prelude::*;
5use compact_str::format_compact;
6use futures::{channel::mpsc, FutureExt};
7use fxhash::FxHashMap;
8use graphix_compiler::{expr::ExprId, BindId, Rt};
9use netidx::{
10 path::Path,
11 protocol::valarray::ValArray,
12 publisher::{self, Id, PublishFlags, Publisher, Val, Value, WriteRequest},
13 resolver_client::ChangeTracker,
14 subscriber::{self, Dval, SubId, Subscriber, UpdatesFlags},
15};
16use netidx_protocols::rpc::{
17 self,
18 server::{ArgSpec, RpcCall},
19};
20use std::{
21 collections::{hash_map::Entry, HashMap, VecDeque},
22 future,
23 time::Duration,
24};
25use tokio::{
26 sync::Mutex,
27 task::JoinSet,
28 time::{self, Instant},
29};
30use triomphe::Arc;
31
32#[derive(Debug)]
33pub(super) struct RpcClient {
34 proc: rpc::client::Proc,
35 pub(super) last_used: Instant,
36}
37
38#[derive(Debug)]
39pub struct GXRt<X: GXExt> {
40 pub(super) by_ref: FxHashMap<BindId, FxHashMap<ExprId, usize>>,
41 pub(super) subscribed: FxHashMap<SubId, FxHashMap<ExprId, usize>>,
42 pub(super) published: FxHashMap<Id, FxHashMap<ExprId, usize>>,
43 pub(super) var_updates: VecDeque<(BindId, Value)>,
44 pub(super) net_updates: VecDeque<(SubId, subscriber::Event)>,
45 pub(super) net_writes: VecDeque<(Id, WriteRequest)>,
46 pub(super) rpc_overflow: VecDeque<(BindId, RpcCall)>,
47 pub(super) rpc_clients: FxHashMap<Path, RpcClient>,
48 pub(super) published_rpcs: FxHashMap<Path, rpc::server::Proc>,
49 pub(super) pending_unsubscribe: VecDeque<(Instant, Dval)>,
50 pub(super) change_trackers: FxHashMap<BindId, Arc<Mutex<ChangeTracker>>>,
51 pub(super) tasks: JoinSet<(BindId, Value)>,
52 pub(super) batch: publisher::UpdateBatch,
53 pub(super) publisher: Publisher,
54 pub(super) subscriber: Subscriber,
55 pub(super) updates_tx: mpsc::Sender<UpdateBatch>,
56 pub(super) updates: mpsc::Receiver<UpdateBatch>,
57 pub(super) writes_tx: mpsc::Sender<WriteBatch>,
58 pub(super) writes: mpsc::Receiver<WriteBatch>,
59 pub(super) rpcs_tx: mpsc::Sender<(BindId, RpcCall)>,
60 pub(super) rpcs: mpsc::Receiver<(BindId, RpcCall)>,
61 pub(super) updated: FxHashMap<ExprId, bool>,
62 pub ext: X,
63}
64
65impl<X: GXExt> GXRt<X> {
66 pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
67 let (updates_tx, updates) = mpsc::channel(100);
68 let (writes_tx, writes) = mpsc::channel(100);
69 let (rpcs_tx, rpcs) = mpsc::channel(100);
70 let batch = publisher.start_batch();
71 let mut tasks = JoinSet::new();
72 tasks.spawn(async { future::pending().await });
73 Self {
74 by_ref: HashMap::default(),
75 var_updates: VecDeque::new(),
76 net_updates: VecDeque::new(),
77 net_writes: VecDeque::new(),
78 rpc_overflow: VecDeque::new(),
79 rpc_clients: HashMap::default(),
80 subscribed: HashMap::default(),
81 pending_unsubscribe: VecDeque::new(),
82 published: HashMap::default(),
83 change_trackers: HashMap::default(),
84 published_rpcs: HashMap::default(),
85 updated: HashMap::default(),
86 ext: X::default(),
87 tasks,
88 batch,
89 publisher,
90 subscriber,
91 updates,
92 updates_tx,
93 writes,
94 writes_tx,
95 rpcs_tx,
96 rpcs,
97 }
98 }
99}
100
/// Unwrap a `Result` inside a task that yields `(BindId, Value)`.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` it returns from the
/// enclosing function or async block with `($bindid, Value::Error(..))`,
/// where the error text is the `Debug` rendering of `e`.
macro_rules! or_err {
    ($bindid:expr, $e:expr) => {
        match $e {
            Ok(v) => v,
            Err(e) => {
                let msg = format_compact!("{e:?}");
                return ($bindid, Value::Error(ArcStr::from(msg.as_str())));
            }
        }
    };
}
113
/// Guard for listing tasks: continue only if the resolver reports a change.
///
/// Locks the shared tracker `$ct` and replaces it with a fresh tracker when
/// it is tracking a path other than `$path`. It then asks `$resolver`
/// whether anything changed. If not, the enclosing async block returns
/// `($id, Value::Null)`; a resolver error returns `($id, Value::Error(..))`
/// via `or_err!`. The lock guard is a plain `let` binding, so it stays held
/// for the rest of the enclosing block.
macro_rules! check_changed {
    ($id:expr, $resolver:expr, $path:expr, $ct:expr) => {
        let mut tracker = $ct.lock().await;
        if tracker.path() != &$path {
            *tracker = ChangeTracker::new($path.clone());
        }
        let changed = or_err!($id, $resolver.check_changed(&mut *tracker).await);
        if !changed {
            return ($id, Value::Null);
        }
    };
}
125
126impl<X: GXExt> Rt for GXRt<X> {
127 fn clear(&mut self) {
128 let Self {
129 by_ref,
130 var_updates,
131 net_updates,
132 net_writes,
133 rpc_clients,
134 rpc_overflow,
135 subscribed,
136 published,
137 published_rpcs,
138 pending_unsubscribe,
139 change_trackers,
140 tasks,
141 batch,
142 publisher,
143 subscriber: _,
144 updates_tx,
145 updates,
146 writes_tx,
147 writes,
148 rpcs,
149 rpcs_tx,
150 updated,
151 ext,
152 } = self;
153 ext.clear();
154 updated.clear();
155 by_ref.clear();
156 var_updates.clear();
157 net_updates.clear();
158 net_writes.clear();
159 rpc_overflow.clear();
160 rpc_clients.clear();
161 subscribed.clear();
162 published.clear();
163 published_rpcs.clear();
164 pending_unsubscribe.clear();
165 change_trackers.clear();
166 *tasks = JoinSet::new();
167 tasks.spawn(async { future::pending().await });
168 *batch = publisher.start_batch();
169 let (tx, rx) = mpsc::channel(3);
170 *updates_tx = tx;
171 *updates = rx;
172 let (tx, rx) = mpsc::channel(100);
173 *writes_tx = tx;
174 *writes = rx;
175 let (tx, rx) = mpsc::channel(100);
176 *rpcs_tx = tx;
177 *rpcs = rx
178 }
179
180 fn call_rpc(&mut self, name: Path, args: Vec<(ArcStr, Value)>, id: BindId) {
181 let now = Instant::now();
182 let proc = match self.rpc_clients.entry(name) {
183 Entry::Occupied(mut e) => {
184 let cl = e.get_mut();
185 cl.last_used = now;
186 Ok(cl.proc.clone())
187 }
188 Entry::Vacant(e) => {
189 match rpc::client::Proc::new(&self.subscriber, e.key().clone()) {
190 Err(e) => Err(e),
191 Ok(proc) => {
192 let cl = RpcClient { last_used: now, proc: proc.clone() };
193 e.insert(cl);
194 Ok(proc)
195 }
196 }
197 }
198 };
199 self.tasks.spawn(async move {
200 macro_rules! err {
201 ($e:expr) => {{
202 let e = format_compact!("{:?}", $e);
203 (id, Value::Error(e.as_str().into()))
204 }};
205 }
206 match proc {
207 Err(e) => err!(e),
208 Ok(proc) => match proc.call(args).await {
209 Err(e) => err!(e),
210 Ok(res) => (id, res),
211 },
212 }
213 });
214 }
215
216 fn publish_rpc(
217 &mut self,
218 name: Path,
219 doc: Value,
220 spec: Vec<ArgSpec>,
221 id: BindId,
222 ) -> Result<()> {
223 use rpc::server::Proc;
224 let e = match self.published_rpcs.entry(name) {
225 Entry::Vacant(e) => e,
226 Entry::Occupied(_) => bail!("already published"),
227 };
228 let proc = Proc::new(
229 &self.publisher,
230 e.key().clone(),
231 doc,
232 spec,
233 move |c| Some((id, c)),
234 Some(self.rpcs_tx.clone()),
235 )?;
236 e.insert(proc);
237 Ok(())
238 }
239
240 fn unpublish_rpc(&mut self, name: Path) {
241 self.published_rpcs.remove(&name);
242 }
243
244 fn subscribe(&mut self, flags: UpdatesFlags, path: Path, ref_by: ExprId) -> Dval {
245 let dval =
246 self.subscriber.subscribe_updates(path, [(flags, self.updates_tx.clone())]);
247 *self.subscribed.entry(dval.id()).or_default().entry(ref_by).or_default() += 1;
248 dval
249 }
250
251 fn unsubscribe(&mut self, _path: Path, dv: Dval, ref_by: ExprId) {
252 if let Some(exprs) = self.subscribed.get_mut(&dv.id()) {
253 if let Some(cn) = exprs.get_mut(&ref_by) {
254 *cn -= 1;
255 if *cn == 0 {
256 exprs.remove(&ref_by);
257 }
258 }
259 if exprs.is_empty() {
260 self.subscribed.remove(&dv.id());
261 }
262 }
263 self.pending_unsubscribe.push_back((Instant::now(), dv));
264 }
265
266 fn list(&mut self, id: BindId, path: Path) {
267 let ct = self
268 .change_trackers
269 .entry(id)
270 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
271 let ct = Arc::clone(ct);
272 let resolver = self.subscriber.resolver();
273 self.tasks.spawn(async move {
274 check_changed!(id, resolver, path, ct);
275 let mut paths = or_err!(id, resolver.list(path).await);
276 let paths = paths.drain(..).map(|p| Value::String(p.into()));
277 (id, Value::Array(ValArray::from_iter_exact(paths)))
278 });
279 }
280
281 fn list_table(&mut self, id: BindId, path: Path) {
282 let ct = self
283 .change_trackers
284 .entry(id)
285 .or_insert_with(|| Arc::new(Mutex::new(ChangeTracker::new(path.clone()))));
286 let ct = Arc::clone(ct);
287 let resolver = self.subscriber.resolver();
288 self.tasks.spawn(async move {
289 check_changed!(id, resolver, path, ct);
290 let mut tbl = or_err!(id, resolver.table(path).await);
291 let cols = tbl.cols.drain(..).map(|(name, count)| {
292 Value::Array(ValArray::from([
293 Value::String(name.into()),
294 Value::V64(count.0),
295 ]))
296 });
297 let cols = Value::Array(ValArray::from_iter_exact(cols));
298 let rows = tbl.rows.drain(..).map(|name| Value::String(name.into()));
299 let rows = Value::Array(ValArray::from_iter_exact(rows));
300 let tbl = Value::Array(ValArray::from([
301 Value::Array(ValArray::from([Value::String(literal!("columns")), cols])),
302 Value::Array(ValArray::from([Value::String(literal!("rows")), rows])),
303 ]));
304 (id, tbl)
305 });
306 }
307
308 fn stop_list(&mut self, id: BindId) {
309 self.change_trackers.remove(&id);
310 }
311
312 fn publish(&mut self, path: Path, value: Value, ref_by: ExprId) -> Result<Val> {
313 let val = self.publisher.publish_with_flags_and_writes(
314 PublishFlags::empty(),
315 path,
316 value,
317 Some(self.writes_tx.clone()),
318 )?;
319 let id = val.id();
320 *self.published.entry(id).or_default().entry(ref_by).or_default() += 1;
321 Ok(val)
322 }
323
324 fn update(&mut self, val: &Val, value: Value) {
325 val.update(&mut self.batch, value);
326 }
327
328 fn unpublish(&mut self, val: Val, ref_by: ExprId) {
329 if let Some(refs) = self.published.get_mut(&val.id()) {
330 if let Some(cn) = refs.get_mut(&ref_by) {
331 *cn -= 1;
332 if *cn == 0 {
333 refs.remove(&ref_by);
334 }
335 }
336 if refs.is_empty() {
337 self.published.remove(&val.id());
338 }
339 }
340 }
341
342 fn set_timer(&mut self, id: BindId, timeout: Duration) {
343 self.tasks
344 .spawn(time::sleep(timeout).map(move |()| (id, Value::DateTime(Utc::now()))));
345 }
346
347 fn ref_var(&mut self, id: BindId, ref_by: ExprId) {
348 *self.by_ref.entry(id).or_default().entry(ref_by).or_default() += 1;
349 }
350
351 fn unref_var(&mut self, id: BindId, ref_by: ExprId) {
352 if let Some(refs) = self.by_ref.get_mut(&id) {
353 if let Some(cn) = refs.get_mut(&ref_by) {
354 *cn -= 1;
355 if *cn == 0 {
356 refs.remove(&ref_by);
357 }
358 }
359 if refs.is_empty() {
360 self.by_ref.remove(&id);
361 }
362 }
363 }
364
365 fn set_var(&mut self, id: BindId, value: Value) {
366 self.var_updates.push_back((id, value.clone()));
367 }
368
369 fn notify_set(&mut self, id: BindId) {
370 if let Some(refed) = self.by_ref.get(&id) {
371 for eid in refed.keys() {
372 self.updated.entry(*eid).or_default();
373 }
374 }
375 }
376}