// incremental/state.rs

1use super::adjust_heights_heap::AdjustHeightsHeap;
2use super::kind;
3use super::node_update::NodeUpdateDelayed;
4use super::{CellIncrement, NodeRef, Value, WeakNode};
5use crate::boxes::new_unsized;
6use crate::incrsan::NotObserver;
7use crate::node::ErasedNode;
8use crate::{SubscriptionToken, WeakMap};
9
10use super::internal_observer::{
11    ErasedObserver, InternalObserver, ObserverId, ObserverState, StrongObserver, WeakObserver,
12};
13use super::kind::Kind;
14use super::node::{Node, NodeId};
15use super::recompute_heap::RecomputeHeap;
16use super::scope::Scope;
17use super::stabilisation_num::StabilisationNum;
18use super::var::{Var, WeakVar};
19use super::{public, Incr};
20use core::fmt::Debug;
21use std::cell::{Cell, RefCell};
22use std::collections::HashMap;
23use std::rc::{Rc, Weak};
24
25pub(crate) mod expert;
26
27pub(crate) struct State {
28    pub(crate) stabilisation_num: Cell<StabilisationNum>,
29    pub(crate) adjust_heights_heap: RefCell<AdjustHeightsHeap>,
30    pub(crate) recompute_heap: RecomputeHeap,
31    pub(crate) status: Cell<IncrStatus>,
32    pub(crate) num_var_sets: Cell<usize>,
33    pub(crate) num_nodes_recomputed: Cell<usize>,
34    pub(crate) num_nodes_created: Cell<usize>,
35    pub(crate) num_nodes_changed: Cell<usize>,
36    pub(crate) num_nodes_became_necessary: Cell<usize>,
37    pub(crate) num_nodes_became_unnecessary: Cell<usize>,
38    pub(crate) num_nodes_invalidated: Cell<usize>,
39    pub(crate) num_active_observers: Cell<usize>,
40    pub(crate) propagate_invalidity: RefCell<Vec<WeakNode>>,
41    pub(crate) run_on_update_handlers: RefCell<Vec<(WeakNode, NodeUpdateDelayed)>>,
42    pub(crate) handle_after_stabilisation: RefCell<Vec<WeakNode>>,
43    pub(crate) new_observers: RefCell<Vec<WeakObserver>>,
44    pub(crate) all_observers: RefCell<HashMap<ObserverId, StrongObserver>>,
45    pub(crate) disallowed_observers: RefCell<Vec<WeakObserver>>,
46    pub(crate) current_scope: RefCell<Scope>,
47    pub(crate) set_during_stabilisation: RefCell<Vec<WeakVar>>,
48    pub(crate) dead_vars: RefCell<Vec<WeakVar>>,
49    /// Buffer for dropping vars
50    ///
51    /// If you have a Var<Var<i32>>, and then you drop the outer one, the outer one gets put in the
52    /// dead_vars bucket. But then, when you stabilise, the code that actually drops the internal
53    /// Vars wants to push the inner Var onto the dead_vars list. So we can't have it borrowed
54    /// while we execute the Drop code.
55    pub(crate) dead_vars_alt: RefCell<Vec<WeakVar>>,
56
57    pub(crate) weak_maps: RefCell<Vec<Rc<RefCell<dyn WeakMap>>>>,
58    pub(crate) weak_self: Weak<Self>,
59
60    #[cfg(debug_assertions)]
61    pub(crate) only_in_debug: OnlyInDebug,
62}
63
64impl Debug for State {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("State").finish()
67    }
68}
69
70#[cfg(debug_assertions)]
71#[derive(Debug, Default)]
72pub(crate) struct OnlyInDebug {
73    pub currently_running_node: RefCell<Option<WeakNode>>,
74}
75
76#[cfg(debug_assertions)]
77impl OnlyInDebug {
78    pub(crate) fn currently_running_node_exn(&self, name: &'static str) -> WeakNode {
79        let crn = self.currently_running_node.borrow();
80        match &*crn {
81            None => panic!("can only call {} during stabilisation", name),
82            Some(current) => current.clone(),
83        }
84    }
85}
86
/// Phase of the stabilisation cycle a `State` is in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IncrStatus {
    /// Recomputation is finished and the queued on-update handlers are being run.
    RunningOnUpdateHandlers,
    /// No stabilisation is in progress.
    NotStabilising,
    /// A stabilisation has started and has not yet reached the on-update handlers.
    Stabilising,
    // A `StabilisePreviouslyRaised` variant would work a bit like Mutex panic poisoning. It is
    // not needed at the moment because nothing uses std::panic::catch_unwind.
    // StabilisePreviouslyRaised,
}
96
97impl State {
98    pub(crate) fn public_weak(self: &Rc<Self>) -> public::WeakState {
99        public::WeakState {
100            inner: Rc::downgrade(self),
101        }
102    }
103    pub(crate) fn weak(&self) -> Weak<Self> {
104        self.weak_self.clone()
105    }
106    pub(crate) fn current_scope(&self) -> Scope {
107        self.current_scope.borrow().clone()
108    }
109
110    pub(crate) fn within_scope<R>(&self, scope: Scope, f: impl FnOnce() -> R) -> R {
111        if !scope.is_valid() {
112            panic!("Attempted to run a closure within an invalid scope");
113        }
114        let old = self.current_scope.replace(scope);
115        let r = f();
116        self.current_scope.replace(old);
117        r
118    }
119
120    pub(crate) fn new() -> Rc<Self> {
121        const DEFAULT_MAX_HEIGHT_ALLOWED: usize = 128;
122        Self::new_with_height(DEFAULT_MAX_HEIGHT_ALLOWED)
123    }
124    pub(crate) fn new_with_height(max_height: usize) -> Rc<Self> {
125        Rc::new_cyclic(|weak| State {
126            weak_self: weak.clone(),
127            recompute_heap: RecomputeHeap::new(max_height),
128            adjust_heights_heap: RefCell::new(AdjustHeightsHeap::new(max_height)),
129            stabilisation_num: Cell::new(StabilisationNum(0)),
130            num_var_sets: Cell::new(0),
131            num_nodes_recomputed: Cell::new(0),
132            num_nodes_created: Cell::new(0),
133            num_nodes_changed: Cell::new(0),
134            num_nodes_became_necessary: Cell::new(0),
135            num_nodes_became_unnecessary: Cell::new(0),
136            num_nodes_invalidated: Cell::new(0),
137            num_active_observers: Cell::new(0),
138            propagate_invalidity: RefCell::new(vec![]),
139            status: Cell::new(IncrStatus::NotStabilising),
140            all_observers: RefCell::new(HashMap::new()),
141            new_observers: RefCell::new(Vec::new()),
142            disallowed_observers: RefCell::new(Vec::new()),
143            current_scope: RefCell::new(Scope::Top),
144            set_during_stabilisation: RefCell::new(vec![]),
145            dead_vars: RefCell::new(vec![]),
146            dead_vars_alt: RefCell::new(vec![]),
147            handle_after_stabilisation: RefCell::new(vec![]),
148            run_on_update_handlers: RefCell::new(vec![]),
149            weak_maps: RefCell::new(vec![]),
150            #[cfg(debug_assertions)]
151            only_in_debug: OnlyInDebug::default(),
152        })
153    }
154
155    pub(crate) fn add_weak_map(&self, weak_map: Rc<RefCell<dyn WeakMap>>) {
156        let mut wm = self.weak_maps.borrow_mut();
157        wm.push(weak_map);
158    }
159
160    pub(crate) fn constant<T: Value>(self: &Rc<Self>, value: T) -> Incr<T> {
161        // TODO: - store the constant value directly in the Node.value_opt
162        //       - make recompute a noop instead of a clone
163        //       - set recomputed_at/changed_at to avoid queueing for recompute at all?
164        //       to save the single clone that constant currently does.
165        let node = Node::create_rc::<T>(self.weak(), self.current_scope(), Kind::constant(value));
166        Incr { node }
167    }
168
169    pub(crate) fn fold<F, T: Value, R: Value>(
170        self: &Rc<Self>,
171        vec: Vec<Incr<T>>,
172        init: R,
173        f: F,
174    ) -> Incr<R>
175    where
176        F: FnMut(R, &T) -> R + 'static + NotObserver,
177    {
178        if vec.is_empty() {
179            return self.constant(init);
180        }
181        let node = Node::create_rc::<R>(
182            self.weak(),
183            self.current_scope(),
184            Kind::ArrayFold(new_unsized!(kind::ArrayFold {
185                init,
186                fold: RefCell::new(f),
187                children: vec,
188            })),
189        );
190        Incr { node }
191    }
192
193    pub(crate) fn var_in_scope<T: Value>(
194        self: &Rc<Self>,
195        value: T,
196        scope: Scope,
197    ) -> public::Var<T> {
198        let var = Rc::new(Var {
199            state: self.weak(),
200            set_at: Cell::new(self.stabilisation_num.get()),
201            value: RefCell::new(value),
202            node_id: NodeId(0).into(),
203            node: RefCell::new(None),
204            value_set_during_stabilisation: RefCell::new(None),
205        });
206        let node = Node::create_rc::<T>(self.weak(), scope, Kind::Var(var.clone()));
207        {
208            let mut slot = var.node.borrow_mut();
209            var.node_id.set(node.id);
210            slot.replace(node);
211        }
212        public::Var::new(var)
213    }
214
215    pub(crate) fn observe<T: Value>(&self, incr: Incr<T>) -> Rc<InternalObserver<T>> {
216        let internal_observer = InternalObserver::new(incr);
217        self.num_active_observers.increment();
218        let mut no = self.new_observers.borrow_mut();
219        no.push(Rc::downgrade(&internal_observer) as Weak<dyn ErasedObserver>);
220        internal_observer
221    }
222
223    fn add_new_observers(&self) {
224        let mut no = self.new_observers.borrow_mut();
225        for weak in no.drain(..) {
226            let Some(obs) = weak.upgrade() else { continue };
227            match obs.state().get() {
228                ObserverState::InUse | ObserverState::Disallowed => panic!(),
229                ObserverState::Unlinked => {}
230                ObserverState::Created => {
231                    obs.state().set(ObserverState::InUse);
232                    let node = obs.observing_erased();
233                    let was_necessary = node.is_necessary();
234                    {
235                        let mut ao = self.all_observers.borrow_mut();
236                        ao.insert(obs.id(), obs.clone());
237                    }
238                    obs.add_to_observed_node();
239                    /* By adding [internal_observer] to [observing.observers], we may have added
240                    on-update handlers to [observing].  We need to handle [observing] after this
241                    stabilization to give those handlers a chance to run. */
242                    node.handle_after_stabilisation(self);
243                    debug_assert!(node.is_necessary());
244                    if !was_necessary {
245                        node.became_necessary_propagate(self);
246                    }
247                }
248            }
249        }
250    }
251
    // `disallow_finalized_observers` is not required: there is no GC here, so there are no
    // dead-but-still-participating objects to account for.
    // fn disallow_finalized_observers(&self) {}
254
255    #[tracing::instrument]
256    fn unlink_disallowed_observers(&self) {
257        let mut disallowed = self.disallowed_observers.borrow_mut();
258        for obs_weak in disallowed.drain(..) {
259            let Some(obs) = obs_weak.upgrade() else {
260                continue;
261            };
262            debug_assert_eq!(obs.state().get(), ObserverState::Disallowed);
263            obs.state().set(ObserverState::Unlinked);
264            // get a strong ref to the node, before we drop its owning InternalObserver
265            let observing = obs.observing_packed();
266            {
267                obs.remove_from_observed_node();
268                // remove from all_observers (this finally drops the InternalObserver)
269                let mut ao = self.all_observers.borrow_mut();
270                ao.remove(&obs.id());
271                drop(obs);
272            }
273            observing.check_if_unnecessary(self);
274        }
275    }
276
277    #[tracing::instrument]
278    fn stabilise_start(&self) {
279        self.status.set(IncrStatus::Stabilising);
280        // self.disallow_finalized_observers();
281        self.add_new_observers();
282        self.unlink_disallowed_observers();
283    }
284    #[tracing::instrument]
285    fn stabilise_end(&self) {
286        self.stabilisation_num
287            .set(self.stabilisation_num.get().add1());
288        #[cfg(debug_assertions)]
289        {
290            self.only_in_debug.currently_running_node.take();
291            // t.only_in_debug.expert_nodes_created_by_current_node <- []);
292        }
293        tracing::info_span!("set_during_stabilisation").in_scope(|| {
294            let mut stack = self.set_during_stabilisation.borrow_mut();
295            while let Some(var) = stack.pop() {
296                let Some(var) = var.upgrade() else { continue };
297                tracing::debug!("set_during_stabilisation: found var with {:?}", var.id());
298                var.set_var_stabilise_end();
299            }
300        });
301        // we may have the same var appear in the set_during_stabilisation stack,
302        // and also the dead_vars stack. That's ok! Being in dead_vars means it will
303        // never be set again, as the public::Var is gone & nobody can set it from
304        // outside any more. So killing the Var's internal reference to the watch Node
305        // will not be a problem, because the last time that was needed was back a few
306        // lines ago when we ran var.set_var_stabilise_end().
307        tracing::info_span!("dead_vars").in_scope(|| {
308            // This code handles Var<Var> by double buffering
309            let mut alt = self.dead_vars_alt.borrow_mut();
310            loop {
311                let mut stack = self.dead_vars.borrow_mut();
312                if stack.is_empty() {
313                    break;
314                }
315                // Subtle: don't just swap the RefMuts! Swap the vecs.
316                std::mem::swap(&mut *stack, &mut *alt);
317                drop(stack);
318                for var in alt.drain(..) {
319                    let Some(var) = var.upgrade() else { continue };
320                    tracing::debug!("dead_vars: found var with {:?}", var.id());
321                    var.break_rc_cycle();
322                }
323            }
324        });
325        tracing::info_span!("handle_after_stabilisation").in_scope(|| {
326            let mut stack = self.handle_after_stabilisation.borrow_mut();
327            for node in stack.drain(..).filter_map(|node| node.upgrade()) {
328                node.is_in_handle_after_stabilisation().set(false);
329                let node_update = node.node_update();
330                let mut run_queue = self.run_on_update_handlers.borrow_mut();
331                run_queue.push((node.weak(), node_update))
332            }
333        });
334        tracing::info_span!("run_on_update_handlers").in_scope(|| {
335            self.status.set(IncrStatus::RunningOnUpdateHandlers);
336            let now = self.stabilisation_num.get();
337            let mut stack = self.run_on_update_handlers.borrow_mut();
338            for (node, node_update) in stack
339                .drain(..)
340                .filter_map(|(node, node_update)| node.upgrade().map(|n| (n, node_update)))
341            {
342                node.run_on_update_handlers(node_update, now)
343            }
344        });
345        for wm in self.weak_maps.borrow().iter() {
346            let mut w = wm.borrow_mut();
347            w.garbage_collect();
348        }
349        self.status.set(IncrStatus::NotStabilising);
350    }
351
352    pub(crate) fn is_stable(&self) -> bool {
353        self.recompute_heap.is_empty()
354            && self.dead_vars.borrow().is_empty()
355            && self.new_observers.borrow().is_empty()
356    }
357
358    pub(crate) fn stabilise(&self) {
359        self.stabilise_debug(None)
360    }
361
362    pub(crate) fn stabilise_debug(&self, _prefix: Option<&str>) {
363        let span = tracing::info_span!("stabilise");
364        span.in_scope(|| {
365            #[cfg(debug_assertions)]
366            let mut do_debug = {
367                let mut iterations = 0;
368                let mut buf = String::new();
369                move || {
370                    if tracing::enabled!(tracing::Level::INFO) {
371                        if let Some(prefix) = _prefix {
372                            iterations += 1;
373                            use std::fmt::Write;
374                            buf.clear();
375                            write!(&mut buf, "{prefix}-stabilise-{iterations}.dot").unwrap();
376                            self.save_dot_to_file(&buf);
377                        }
378                    }
379                }
380            };
381
382            #[cfg(not(debug_assertions))]
383            fn do_debug() {}
384
385            assert_eq!(self.status.get(), IncrStatus::NotStabilising);
386
387            self.stabilise_start();
388
389            while let Some(node) = self.recompute_heap.remove_min() {
390                do_debug();
391                node.recompute(self);
392            }
393
394            self.stabilise_end();
395            do_debug();
396        });
397    }
398
399    pub(crate) fn propagate_invalidity(&self) {
400        while let Some(node) = {
401            let mut pi = self.propagate_invalidity.borrow_mut();
402            pi.pop()
403        } {
404            let Some(node) = node.upgrade() else { continue };
405            if node.is_valid() {
406                if node.should_be_invalidated() {
407                    node.invalidate_node(self);
408                } else {
409                    /* [Node.needs_to_be_computed node] is true because
410                    - node is necessary. This is because children can only point to necessary parents
411                    - node is stale. This is because: For bind, if, join, this is true because
412                    - either the invalidation is caused by the lhs changing (in which case the
413                      lhs-change node being newer makes us stale).
414                    - or a child became invalid this stabilization cycle, in which case it has
415                      t.changed_at of [t.stabilization_num], and so [node] is stale
416                    - or [node] just became necessary and tried connecting to an already invalid
417                    child. In that case, [child.changed_at > node.recomputed_at] for that child,
418                    because if we had been recomputed when that child changed, we would have been
419                    made invalid back then.  For expert nodes, the argument is the same, except
420                    that instead of lhs-change nodes make the expert nodes stale, it's made stale
421                    explicitely when adding or removing children. */
422                    debug_assert!(node.needs_to_be_computed());
423
424                    // ...
425                    node.propagate_invalidity_helper();
426
427                    /* We do not check [Node.needs_to_be_computed node] here, because it should be
428                    true, and because computing it takes O(number of children), node can be pushed
429                    on the stack once per child, and expert nodes can have lots of children. */
430                    if !node.is_in_recompute_heap() {
431                        self.recompute_heap.insert(node);
432                    }
433                }
434            }
435        }
436    }
437
438    pub(crate) fn unsubscribe(&self, token: SubscriptionToken) {
439        let all_obs = self.all_observers.borrow();
440        if let Some(obs) = all_obs.get(&token.observer_id()) {
441            obs.unsubscribe(token).unwrap();
442        }
443    }
444
445    pub(crate) fn is_stabilising(&self) -> bool {
446        self.status.get() != IncrStatus::NotStabilising
447    }
448
449    pub(crate) fn set_max_height_allowed(&self, new_max_height: usize) {
450        if self.status.get() == IncrStatus::Stabilising {
451            panic!("tried to set_max_height_allowed during stabilisation");
452        }
453        let mut ah_heap = self.adjust_heights_heap.borrow_mut();
454        ah_heap.set_max_height_allowed(new_max_height);
455        drop(ah_heap);
456        self.recompute_heap.set_max_height_allowed(new_max_height);
457    }
458
459    pub(crate) fn set_height(&self, node: NodeRef, height: i32) {
460        let mut ah_heap = self.adjust_heights_heap.borrow_mut();
461        ah_heap.set_height(&node, height);
462    }
463
464    pub(crate) fn save_dot_to_file(&self, named: &str) {
465        let observers = self.all_observers.borrow();
466        let mut all_observed = observers.iter().map(|(_id, o)| o.observing_erased());
467        super::node::save_dot_to_file(&mut all_observed, named).unwrap();
468    }
469    pub(crate) fn save_dot_to_string(&self) -> String {
470        let observers = self.all_observers.borrow();
471        let mut all_observed = observers.iter().map(|(_id, o)| o.observing_erased());
472        let mut buf = String::new();
473        super::node::save_dot(&mut buf, &mut all_observed).unwrap();
474        buf
475    }
476
477    #[tracing::instrument]
478    pub(crate) fn destroy(&self) {
479        let mut dead_vars = self.dead_vars.take();
480        for var in dead_vars.drain(..).filter_map(|x| x.upgrade()) {
481            var.break_rc_cycle();
482        }
483        for (_id, obs) in self.all_observers.borrow().iter() {
484            let state = obs.state().get();
485            if state == ObserverState::InUse || state == ObserverState::Created {
486                obs.disallow_future_use(self);
487            }
488        }
489        self.unlink_disallowed_observers();
490        self.all_observers.take().clear();
491        self.disallowed_observers.take().clear();
492        self.weak_maps.take().clear();
493        self.recompute_heap.clear();
494        self.adjust_heights_heap.borrow_mut().clear();
495    }
496}
497
498impl Drop for State {
499    fn drop(&mut self) {
500        tracing::debug!("destroying internal State object");
501        self.destroy();
502    }
503}