netidx_bscript/
vm.rs

1pub use crate::stdfn::{RpcCallId, TimerId};
2use crate::{
3    expr::{Expr, ExprId, ExprKind},
4    stdfn,
5};
6use arcstr::ArcStr;
7use chrono::prelude::*;
8use fxhash::{FxBuildHasher, FxHashMap};
9use netidx::{
10    chars::Chars,
11    path::Path,
12    subscriber::{Dval, SubId, UpdatesFlags, Value},
13};
14use std::{
15    collections::{HashMap, VecDeque},
16    fmt,
17    sync::{Arc, Weak},
18    time::Duration,
19};
20
21pub struct DbgCtx<E> {
22    pub trace: bool,
23    events: VecDeque<(ExprId, (DateTime<Local>, Option<Event<E>>, Value))>,
24    watch: HashMap<
25        ExprId,
26        Vec<Weak<dyn Fn(&DateTime<Local>, &Option<Event<E>>, &Value) + Send + Sync>>,
27        FxBuildHasher,
28    >,
29    current: HashMap<ExprId, (Option<Event<E>>, Value), FxBuildHasher>,
30}
31
32impl<E: Clone> DbgCtx<E> {
33    fn new() -> Self {
34        DbgCtx {
35            trace: false,
36            events: VecDeque::new(),
37            watch: HashMap::with_hasher(FxBuildHasher::default()),
38            current: HashMap::with_hasher(FxBuildHasher::default()),
39        }
40    }
41
42    pub fn iter_events(
43        &self,
44    ) -> impl Iterator<Item = &(ExprId, (DateTime<Local>, Option<Event<E>>, Value))> {
45        self.events.iter()
46    }
47
48    pub fn get_current(&self, id: &ExprId) -> Option<&(Option<Event<E>>, Value)> {
49        self.current.get(id)
50    }
51
52    pub fn add_watch(
53        &mut self,
54        id: ExprId,
55        watch: &Arc<dyn Fn(&DateTime<Local>, &Option<Event<E>>, &Value) + Send + Sync>,
56    ) {
57        let watches = self.watch.entry(id).or_insert_with(Vec::new);
58        watches.push(Arc::downgrade(watch));
59    }
60
61    pub fn add_event(&mut self, id: ExprId, event: Option<Event<E>>, value: Value) {
62        const MAX: usize = 1000;
63        let now = Local::now();
64        if let Some(watch) = self.watch.get_mut(&id) {
65            let mut i = 0;
66            while i < watch.len() {
67                match Weak::upgrade(&watch[i]) {
68                    None => {
69                        watch.remove(i);
70                    }
71                    Some(f) => {
72                        f(&now, &event, &value);
73                        i += 1;
74                    }
75                }
76            }
77        }
78        self.events.push_back((id, (now, event.clone(), value.clone())));
79        self.current.insert(id, (event, value));
80        if self.events.len() > MAX {
81            self.events.pop_front();
82            if self.watch.len() > MAX {
83                self.watch.retain(|_, vs| {
84                    vs.retain(|v| Weak::upgrade(v).is_some());
85                    !vs.is_empty()
86                });
87            }
88        }
89    }
90
91    pub fn clear(&mut self) {
92        self.events.clear();
93        self.current.clear();
94        self.watch.retain(|_, v| {
95            v.retain(|w| Weak::strong_count(w) > 0);
96            v.len() > 0
97        });
98    }
99}
100
101#[derive(Clone, Debug)]
102pub enum Event<E> {
103    Variable(Path, Chars, Value),
104    Netidx(SubId, Value),
105    Rpc(RpcCallId, Value),
106    Timer(TimerId),
107    User(E),
108}
109
110pub type InitFn<C, E> = Arc<
111    dyn Fn(
112            &mut ExecCtx<C, E>,
113            &[Node<C, E>],
114            Path,
115            ExprId,
116        ) -> Box<dyn Apply<C, E> + Send + Sync>
117        + Send
118        + Sync,
119>;
120
121pub trait Register<C: Ctx, E> {
122    fn register(ctx: &mut ExecCtx<C, E>);
123}
124
125pub trait Apply<C: Ctx, E> {
126    fn current(&self, ctx: &mut ExecCtx<C, E>) -> Option<Value>;
127    fn update(
128        &mut self,
129        ctx: &mut ExecCtx<C, E>,
130        from: &mut [Node<C, E>],
131        event: &Event<E>,
132    ) -> Option<Value>;
133}
134
135pub trait Ctx {
136    fn clear(&mut self);
137    fn durable_subscribe(
138        &mut self,
139        flags: UpdatesFlags,
140        path: Path,
141        ref_by: ExprId,
142    ) -> Dval;
143    fn unsubscribe(&mut self, path: Path, dv: Dval, ref_by: ExprId);
144    fn ref_var(&mut self, name: Chars, scope: Path, ref_by: ExprId);
145    fn unref_var(&mut self, name: Chars, scope: Path, ref_by: ExprId);
146    fn register_fn(&mut self, name: Chars, scope: Path);
147    fn set_var(
148        &mut self,
149        variables: &mut FxHashMap<Path, FxHashMap<Chars, Value>>,
150        local: bool,
151        scope: Path,
152        name: Chars,
153        value: Value,
154    );
155
156    /// For a given name, this must have at most one outstanding call
157    /// at a time, and must preserve the order of the calls. Calls to
158    /// different names may execute concurrently.
159    fn call_rpc(
160        &mut self,
161        name: Path,
162        args: Vec<(Chars, Value)>,
163        ref_by: ExprId,
164        id: RpcCallId,
165    );
166
167    /// arrange to have a Timer event delivered after timeout
168    fn set_timer(&mut self, id: TimerId, timeout: Duration, ref_by: ExprId);
169}
170
171pub fn store_var(
172    variables: &mut FxHashMap<Path, FxHashMap<Chars, Value>>,
173    local: bool,
174    scope: &Path,
175    name: &Chars,
176    value: Value,
177) -> (bool, Path) {
178    if local {
179        let mut new = false;
180        variables
181            .entry(scope.clone())
182            .or_insert_with(|| {
183                new = true;
184                HashMap::with_hasher(FxBuildHasher::default())
185            })
186            .insert(name.clone(), value);
187        (new, scope.clone())
188    } else {
189        let mut iter = Path::dirnames(scope);
190        loop {
191            match iter.next_back() {
192                Some(scope) => {
193                    if let Some(vars) = variables.get_mut(scope) {
194                        if let Some(var) = vars.get_mut(name) {
195                            *var = value;
196                            break (false, Path::from(ArcStr::from(scope)));
197                        }
198                    }
199                }
200                None => break store_var(variables, true, &Path::root(), name, value),
201            }
202        }
203    }
204}
205
206pub struct ExecCtx<C: Ctx + 'static, E: 'static> {
207    pub functions: FxHashMap<String, InitFn<C, E>>,
208    pub variables: FxHashMap<Path, FxHashMap<Chars, Value>>,
209    pub dbg_ctx: DbgCtx<E>,
210    pub user: C,
211}
212
213impl<C: Ctx, E: Clone> ExecCtx<C, E> {
214    pub fn lookup_var(&self, scope: &Path, name: &Chars) -> Option<(&Path, &Value)> {
215        let mut iter = Path::dirnames(scope);
216        loop {
217            match iter.next_back() {
218                Some(scope) => {
219                    if let Some((scope, vars)) = self.variables.get_key_value(scope) {
220                        if let Some(var) = vars.get(name) {
221                            break Some((scope, var));
222                        }
223                    }
224                }
225                None => break None,
226            }
227        }
228    }
229
230    pub fn clear(&mut self) {
231        self.variables.clear();
232        self.dbg_ctx.clear();
233        self.user.clear();
234    }
235
236    pub fn no_std(user: C) -> Self {
237        ExecCtx {
238            functions: HashMap::with_hasher(FxBuildHasher::default()),
239            variables: HashMap::with_hasher(FxBuildHasher::default()),
240            dbg_ctx: DbgCtx::new(),
241            user,
242        }
243    }
244
245    pub fn new(user: C) -> Self {
246        let mut t = ExecCtx::no_std(user);
247        stdfn::AfterIdle::register(&mut t);
248        stdfn::All::register(&mut t);
249        stdfn::And::register(&mut t);
250        stdfn::Any::register(&mut t);
251        stdfn::Array::register(&mut t);
252        stdfn::Basename::register(&mut t);
253        stdfn::Cast::register(&mut t);
254        stdfn::Cmp::register(&mut t);
255        stdfn::Contains::register(&mut t);
256        stdfn::Count::register(&mut t);
257        stdfn::Dirname::register(&mut t);
258        stdfn::Divide::register(&mut t);
259        stdfn::Do::register(&mut t);
260        stdfn::EndsWith::register(&mut t);
261        stdfn::Eval::register(&mut t);
262        stdfn::FilterErr::register(&mut t);
263        stdfn::Filter::register(&mut t);
264        stdfn::Get::register(&mut t);
265        stdfn::If::register(&mut t);
266        stdfn::Index::register(&mut t);
267        stdfn::Isa::register(&mut t);
268        stdfn::IsErr::register(&mut t);
269        stdfn::Load::register(&mut t);
270        stdfn::Max::register(&mut t);
271        stdfn::Mean::register(&mut t);
272        stdfn::Min::register(&mut t);
273        stdfn::Not::register(&mut t);
274        stdfn::Once::register(&mut t);
275        stdfn::Or::register(&mut t);
276        stdfn::Product::register(&mut t);
277        stdfn::Replace::register(&mut t);
278        stdfn::RpcCall::register(&mut t);
279        stdfn::Sample::register(&mut t);
280        stdfn::Set::register(&mut t);
281        stdfn::StartsWith::register(&mut t);
282        stdfn::Store::register(&mut t);
283        stdfn::StringConcat::register(&mut t);
284        stdfn::StringJoin::register(&mut t);
285        stdfn::StripPrefix::register(&mut t);
286        stdfn::StripSuffix::register(&mut t);
287        stdfn::Sum::register(&mut t);
288        stdfn::Timer::register(&mut t);
289        stdfn::TrimEnd::register(&mut t);
290        stdfn::Trim::register(&mut t);
291        stdfn::TrimStart::register(&mut t);
292        stdfn::Uniq::register(&mut t);
293        t
294    }
295}
296
297pub enum Node<C: Ctx, E> {
298    Error(Expr, Value),
299    Constant(Expr, Value),
300    Apply {
301        spec: Expr,
302        args: Vec<Node<C, E>>,
303        function: Box<dyn Apply<C, E> + Send + Sync>,
304    },
305}
306
307impl<C: Ctx, E> fmt::Display for Node<C, E> {
308    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309        match self {
310            Node::Error(s, _) | Node::Constant(s, _) | Node::Apply { spec: s, .. } => {
311                write!(f, "{}", s)
312            }
313        }
314    }
315}
316
317impl<C: Ctx, E: Clone> Node<C, E> {
318    pub fn compile_int(
319        ctx: &mut ExecCtx<C, E>,
320        spec: Expr,
321        scope: Path,
322        top_id: ExprId,
323    ) -> Self {
324        match &spec {
325            Expr { kind: ExprKind::Constant(v), id: _ } => {
326                Node::Constant(spec.clone(), v.clone())
327            }
328            Expr { kind: ExprKind::Apply { args, function }, id } => {
329                let scope = if function == "do" && id != &top_id {
330                    scope.append(&format!("do{:?}", id))
331                } else {
332                    scope
333                };
334                let args: Vec<Node<C, E>> = args
335                    .iter()
336                    .map(|spec| {
337                        Node::compile_int(ctx, spec.clone(), scope.clone(), top_id)
338                    })
339                    .collect();
340                match ctx.functions.get(function).map(Arc::clone) {
341                    None => {
342                        let e = Value::Error(Chars::from(format!(
343                            "unknown function {}",
344                            function
345                        )));
346                        Node::Error(spec.clone(), e)
347                    }
348                    Some(init) => {
349                        let function = init(ctx, &args, scope, top_id);
350                        Node::Apply { spec, args, function }
351                    }
352                }
353            }
354        }
355    }
356
357    pub fn compile(ctx: &mut ExecCtx<C, E>, scope: Path, spec: Expr) -> Self {
358        let top_id = spec.id;
359        Self::compile_int(ctx, spec, scope, top_id)
360    }
361
362    pub fn current(&self, ctx: &mut ExecCtx<C, E>) -> Option<Value> {
363        let (id, res) = match self {
364            Node::Error(spec, v) => (spec.id, Some(v.clone())),
365            Node::Constant(spec, v) => (spec.id, Some(v.clone())),
366            Node::Apply { spec, function, .. } => (spec.id, function.current(ctx)),
367        };
368        if ctx.dbg_ctx.trace {
369            if let Some(v) = &res {
370                ctx.dbg_ctx.add_event(id, None, v.clone());
371            }
372        }
373        res
374    }
375
376    pub fn update(&mut self, ctx: &mut ExecCtx<C, E>, event: &Event<E>) -> Option<Value> {
377        match self {
378            Node::Error(_, _) | Node::Constant(_, _) => None,
379            Node::Apply { spec, args, function } => {
380                let res = function.update(ctx, args, event);
381                if ctx.dbg_ctx.trace {
382                    if let Some(v) = &res {
383                        ctx.dbg_ctx.add_event(spec.id, Some(event.clone()), v.clone());
384                    }
385                }
386                res
387            }
388        }
389    }
390}