1pub use crate::stdfn::{RpcCallId, TimerId};
2use crate::{
3 expr::{Expr, ExprId, ExprKind},
4 stdfn,
5};
6use arcstr::ArcStr;
7use chrono::prelude::*;
8use fxhash::{FxBuildHasher, FxHashMap};
9use netidx::{
10 chars::Chars,
11 path::Path,
12 subscriber::{Dval, SubId, UpdatesFlags, Value},
13};
14use std::{
15 collections::{HashMap, VecDeque},
16 fmt,
17 sync::{Arc, Weak},
18 time::Duration,
19};
20
21pub struct DbgCtx<E> {
22 pub trace: bool,
23 events: VecDeque<(ExprId, (DateTime<Local>, Option<Event<E>>, Value))>,
24 watch: HashMap<
25 ExprId,
26 Vec<Weak<dyn Fn(&DateTime<Local>, &Option<Event<E>>, &Value) + Send + Sync>>,
27 FxBuildHasher,
28 >,
29 current: HashMap<ExprId, (Option<Event<E>>, Value), FxBuildHasher>,
30}
31
32impl<E: Clone> DbgCtx<E> {
33 fn new() -> Self {
34 DbgCtx {
35 trace: false,
36 events: VecDeque::new(),
37 watch: HashMap::with_hasher(FxBuildHasher::default()),
38 current: HashMap::with_hasher(FxBuildHasher::default()),
39 }
40 }
41
42 pub fn iter_events(
43 &self,
44 ) -> impl Iterator<Item = &(ExprId, (DateTime<Local>, Option<Event<E>>, Value))> {
45 self.events.iter()
46 }
47
48 pub fn get_current(&self, id: &ExprId) -> Option<&(Option<Event<E>>, Value)> {
49 self.current.get(id)
50 }
51
52 pub fn add_watch(
53 &mut self,
54 id: ExprId,
55 watch: &Arc<dyn Fn(&DateTime<Local>, &Option<Event<E>>, &Value) + Send + Sync>,
56 ) {
57 let watches = self.watch.entry(id).or_insert_with(Vec::new);
58 watches.push(Arc::downgrade(watch));
59 }
60
61 pub fn add_event(&mut self, id: ExprId, event: Option<Event<E>>, value: Value) {
62 const MAX: usize = 1000;
63 let now = Local::now();
64 if let Some(watch) = self.watch.get_mut(&id) {
65 let mut i = 0;
66 while i < watch.len() {
67 match Weak::upgrade(&watch[i]) {
68 None => {
69 watch.remove(i);
70 }
71 Some(f) => {
72 f(&now, &event, &value);
73 i += 1;
74 }
75 }
76 }
77 }
78 self.events.push_back((id, (now, event.clone(), value.clone())));
79 self.current.insert(id, (event, value));
80 if self.events.len() > MAX {
81 self.events.pop_front();
82 if self.watch.len() > MAX {
83 self.watch.retain(|_, vs| {
84 vs.retain(|v| Weak::upgrade(v).is_some());
85 !vs.is_empty()
86 });
87 }
88 }
89 }
90
91 pub fn clear(&mut self) {
92 self.events.clear();
93 self.current.clear();
94 self.watch.retain(|_, v| {
95 v.retain(|w| Weak::strong_count(w) > 0);
96 v.len() > 0
97 });
98 }
99}
100
101#[derive(Clone, Debug)]
102pub enum Event<E> {
103 Variable(Path, Chars, Value),
104 Netidx(SubId, Value),
105 Rpc(RpcCallId, Value),
106 Timer(TimerId),
107 User(E),
108}
109
110pub type InitFn<C, E> = Arc<
111 dyn Fn(
112 &mut ExecCtx<C, E>,
113 &[Node<C, E>],
114 Path,
115 ExprId,
116 ) -> Box<dyn Apply<C, E> + Send + Sync>
117 + Send
118 + Sync,
119>;
120
121pub trait Register<C: Ctx, E> {
122 fn register(ctx: &mut ExecCtx<C, E>);
123}
124
125pub trait Apply<C: Ctx, E> {
126 fn current(&self, ctx: &mut ExecCtx<C, E>) -> Option<Value>;
127 fn update(
128 &mut self,
129 ctx: &mut ExecCtx<C, E>,
130 from: &mut [Node<C, E>],
131 event: &Event<E>,
132 ) -> Option<Value>;
133}
134
135pub trait Ctx {
136 fn clear(&mut self);
137 fn durable_subscribe(
138 &mut self,
139 flags: UpdatesFlags,
140 path: Path,
141 ref_by: ExprId,
142 ) -> Dval;
143 fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
144 fn ref_var(&mut self, name: Chars, scope: Path, ref_by: ExprId);
145 fn unref_var(&mut self, name: Chars, scope: Path, ref_by: ExprId);
146 fn register_fn(&mut self, name: Chars, scope: Path);
147 fn set_var(
148 &mut self,
149 variables: &mut FxHashMap<Path, FxHashMap<Chars, Value>>,
150 local: bool,
151 scope: Path,
152 name: Chars,
153 value: Value,
154 );
155
156 fn call_rpc(
160 &mut self,
161 name: Path,
162 args: Vec<(Chars, Value)>,
163 ref_by: ExprId,
164 id: RpcCallId,
165 );
166
167 fn set_timer(&mut self, id: TimerId, timeout: Duration, ref_by: ExprId);
169}
170
171pub fn store_var(
172 variables: &mut FxHashMap<Path, FxHashMap<Chars, Value>>,
173 local: bool,
174 scope: &Path,
175 name: &Chars,
176 value: Value,
177) -> (bool, Path) {
178 if local {
179 let mut new = false;
180 variables
181 .entry(scope.clone())
182 .or_insert_with(|| {
183 new = true;
184 HashMap::with_hasher(FxBuildHasher::default())
185 })
186 .insert(name.clone(), value);
187 (new, scope.clone())
188 } else {
189 let mut iter = Path::dirnames(scope);
190 loop {
191 match iter.next_back() {
192 Some(scope) => {
193 if let Some(vars) = variables.get_mut(scope) {
194 if let Some(var) = vars.get_mut(name) {
195 *var = value;
196 break (false, Path::from(ArcStr::from(scope)));
197 }
198 }
199 }
200 None => break store_var(variables, true, &Path::root(), name, value),
201 }
202 }
203 }
204}
205
206pub struct ExecCtx<C: Ctx + 'static, E: 'static> {
207 pub functions: FxHashMap<String, InitFn<C, E>>,
208 pub variables: FxHashMap<Path, FxHashMap<Chars, Value>>,
209 pub dbg_ctx: DbgCtx<E>,
210 pub user: C,
211}
212
213impl<C: Ctx, E: Clone> ExecCtx<C, E> {
214 pub fn lookup_var(&self, scope: &Path, name: &Chars) -> Option<(&Path, &Value)> {
215 let mut iter = Path::dirnames(scope);
216 loop {
217 match iter.next_back() {
218 Some(scope) => {
219 if let Some((scope, vars)) = self.variables.get_key_value(scope) {
220 if let Some(var) = vars.get(name) {
221 break Some((scope, var));
222 }
223 }
224 }
225 None => break None,
226 }
227 }
228 }
229
230 pub fn clear(&mut self) {
231 self.variables.clear();
232 self.dbg_ctx.clear();
233 self.user.clear();
234 }
235
236 pub fn no_std(user: C) -> Self {
237 ExecCtx {
238 functions: HashMap::with_hasher(FxBuildHasher::default()),
239 variables: HashMap::with_hasher(FxBuildHasher::default()),
240 dbg_ctx: DbgCtx::new(),
241 user,
242 }
243 }
244
245 pub fn new(user: C) -> Self {
246 let mut t = ExecCtx::no_std(user);
247 stdfn::AfterIdle::register(&mut t);
248 stdfn::All::register(&mut t);
249 stdfn::And::register(&mut t);
250 stdfn::Any::register(&mut t);
251 stdfn::Array::register(&mut t);
252 stdfn::Basename::register(&mut t);
253 stdfn::Cast::register(&mut t);
254 stdfn::Cmp::register(&mut t);
255 stdfn::Contains::register(&mut t);
256 stdfn::Count::register(&mut t);
257 stdfn::Dirname::register(&mut t);
258 stdfn::Divide::register(&mut t);
259 stdfn::Do::register(&mut t);
260 stdfn::EndsWith::register(&mut t);
261 stdfn::Eval::register(&mut t);
262 stdfn::FilterErr::register(&mut t);
263 stdfn::Filter::register(&mut t);
264 stdfn::Get::register(&mut t);
265 stdfn::If::register(&mut t);
266 stdfn::Index::register(&mut t);
267 stdfn::Isa::register(&mut t);
268 stdfn::IsErr::register(&mut t);
269 stdfn::Load::register(&mut t);
270 stdfn::Max::register(&mut t);
271 stdfn::Mean::register(&mut t);
272 stdfn::Min::register(&mut t);
273 stdfn::Not::register(&mut t);
274 stdfn::Once::register(&mut t);
275 stdfn::Or::register(&mut t);
276 stdfn::Product::register(&mut t);
277 stdfn::Replace::register(&mut t);
278 stdfn::RpcCall::register(&mut t);
279 stdfn::Sample::register(&mut t);
280 stdfn::Set::register(&mut t);
281 stdfn::StartsWith::register(&mut t);
282 stdfn::Store::register(&mut t);
283 stdfn::StringConcat::register(&mut t);
284 stdfn::StringJoin::register(&mut t);
285 stdfn::StripPrefix::register(&mut t);
286 stdfn::StripSuffix::register(&mut t);
287 stdfn::Sum::register(&mut t);
288 stdfn::Timer::register(&mut t);
289 stdfn::TrimEnd::register(&mut t);
290 stdfn::Trim::register(&mut t);
291 stdfn::TrimStart::register(&mut t);
292 stdfn::Uniq::register(&mut t);
293 t
294 }
295}
296
297pub enum Node<C: Ctx, E> {
298 Error(Expr, Value),
299 Constant(Expr, Value),
300 Apply {
301 spec: Expr,
302 args: Vec<Node<C, E>>,
303 function: Box<dyn Apply<C, E> + Send + Sync>,
304 },
305}
306
307impl<C: Ctx, E> fmt::Display for Node<C, E> {
308 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309 match self {
310 Node::Error(s, _) | Node::Constant(s, _) | Node::Apply { spec: s, .. } => {
311 write!(f, "{}", s)
312 }
313 }
314 }
315}
316
317impl<C: Ctx, E: Clone> Node<C, E> {
318 pub fn compile_int(
319 ctx: &mut ExecCtx<C, E>,
320 spec: Expr,
321 scope: Path,
322 top_id: ExprId,
323 ) -> Self {
324 match &spec {
325 Expr { kind: ExprKind::Constant(v), id: _ } => {
326 Node::Constant(spec.clone(), v.clone())
327 }
328 Expr { kind: ExprKind::Apply { args, function }, id } => {
329 let scope = if function == "do" && id != &top_id {
330 scope.append(&format!("do{:?}", id))
331 } else {
332 scope
333 };
334 let args: Vec<Node<C, E>> = args
335 .iter()
336 .map(|spec| {
337 Node::compile_int(ctx, spec.clone(), scope.clone(), top_id)
338 })
339 .collect();
340 match ctx.functions.get(function).map(Arc::clone) {
341 None => {
342 let e = Value::Error(Chars::from(format!(
343 "unknown function {}",
344 function
345 )));
346 Node::Error(spec.clone(), e)
347 }
348 Some(init) => {
349 let function = init(ctx, &args, scope, top_id);
350 Node::Apply { spec, args, function }
351 }
352 }
353 }
354 }
355 }
356
357 pub fn compile(ctx: &mut ExecCtx<C, E>, scope: Path, spec: Expr) -> Self {
358 let top_id = spec.id;
359 Self::compile_int(ctx, spec, scope, top_id)
360 }
361
362 pub fn current(&self, ctx: &mut ExecCtx<C, E>) -> Option<Value> {
363 let (id, res) = match self {
364 Node::Error(spec, v) => (spec.id, Some(v.clone())),
365 Node::Constant(spec, v) => (spec.id, Some(v.clone())),
366 Node::Apply { spec, function, .. } => (spec.id, function.current(ctx)),
367 };
368 if ctx.dbg_ctx.trace {
369 if let Some(v) = &res {
370 ctx.dbg_ctx.add_event(id, None, v.clone());
371 }
372 }
373 res
374 }
375
376 pub fn update(&mut self, ctx: &mut ExecCtx<C, E>, event: &Event<E>) -> Option<Value> {
377 match self {
378 Node::Error(_, _) | Node::Constant(_, _) => None,
379 Node::Apply { spec, args, function } => {
380 let res = function.update(ctx, args, event);
381 if ctx.dbg_ctx.trace {
382 if let Some(v) = &res {
383 ctx.dbg_ctx.add_event(spec.id, Some(event.clone()), v.clone());
384 }
385 }
386 res
387 }
388 }
389 }
390}