1use super::adjust_heights_heap::AdjustHeightsHeap;
2use super::kind;
3use super::node_update::NodeUpdateDelayed;
4use super::{CellIncrement, NodeRef, Value, WeakNode};
5use crate::boxes::new_unsized;
6use crate::incrsan::NotObserver;
7use crate::node::ErasedNode;
8use crate::{SubscriptionToken, WeakMap};
9
10use super::internal_observer::{
11    ErasedObserver, InternalObserver, ObserverId, ObserverState, StrongObserver, WeakObserver,
12};
13use super::kind::Kind;
14use super::node::{Node, NodeId};
15use super::recompute_heap::RecomputeHeap;
16use super::scope::Scope;
17use super::stabilisation_num::StabilisationNum;
18use super::var::{Var, WeakVar};
19use super::{public, Incr};
20use core::fmt::Debug;
21use std::cell::{Cell, RefCell};
22use std::collections::HashMap;
23use std::rc::{Rc, Weak};
24
25pub(crate) mod expert;
26
/// Shared bookkeeping for one incremental computation.
///
/// Instances are built by `State::new` / `State::new_with_height`, which hand
/// back an `Rc<State>` whose `weak_self` field points at that same allocation.
/// Every mutable field uses `Cell`/`RefCell`, so all methods take `&self`.
pub(crate) struct State {
    /// Stabilisation counter; advanced by one at the start of `stabilise_end`.
    pub(crate) stabilisation_num: Cell<StabilisationNum>,
    /// Heap used by `set_height`; created with the same height limit as `recompute_heap`.
    pub(crate) adjust_heights_heap: RefCell<AdjustHeightsHeap>,
    /// Nodes waiting to be recomputed; `stabilise_debug` pops them with `remove_min`.
    pub(crate) recompute_heap: RecomputeHeap,
    /// Which phase of stabilisation (if any) is currently running.
    pub(crate) status: Cell<IncrStatus>,
    // The `num_*` fields below all start at zero.
    // NOTE(review): presumably statistics counters; apart from
    // `num_active_observers` they are updated outside this file section — confirm.
    pub(crate) num_var_sets: Cell<usize>,
    pub(crate) num_nodes_recomputed: Cell<usize>,
    pub(crate) num_nodes_created: Cell<usize>,
    pub(crate) num_nodes_changed: Cell<usize>,
    pub(crate) num_nodes_became_necessary: Cell<usize>,
    pub(crate) num_nodes_became_unnecessary: Cell<usize>,
    pub(crate) num_nodes_invalidated: Cell<usize>,
    /// Incremented each time `observe` creates an observer.
    pub(crate) num_active_observers: Cell<usize>,
    /// Work stack consumed by the `propagate_invalidity` method.
    pub(crate) propagate_invalidity: RefCell<Vec<WeakNode>>,
    /// `(node, update)` pairs queued and then run at the end of `stabilise_end`.
    pub(crate) run_on_update_handlers: RefCell<Vec<(WeakNode, NodeUpdateDelayed)>>,
    /// Nodes whose update is computed and queued during `stabilise_end`.
    pub(crate) handle_after_stabilisation: RefCell<Vec<WeakNode>>,
    /// Observers created by `observe` that have not yet been linked in by `add_new_observers`.
    pub(crate) new_observers: RefCell<Vec<WeakObserver>>,
    /// Strong references to linked observers, keyed by observer id.
    pub(crate) all_observers: RefCell<HashMap<ObserverId, StrongObserver>>,
    /// Observers waiting to be detached by `unlink_disallowed_observers`.
    pub(crate) disallowed_observers: RefCell<Vec<WeakObserver>>,
    /// Scope handed to newly created nodes; temporarily swapped by `within_scope`.
    pub(crate) current_scope: RefCell<Scope>,
    /// Vars on which `stabilise_end` calls `set_var_stabilise_end`.
    pub(crate) set_during_stabilisation: RefCell<Vec<WeakVar>>,
    /// Vars whose reference cycle is broken in `stabilise_end` / `destroy`.
    pub(crate) dead_vars: RefCell<Vec<WeakVar>>,
    /// Scratch buffer swapped with `dead_vars` in `stabilise_end`, so that
    /// `dead_vars` itself is not borrowed while its entries are processed.
    pub(crate) dead_vars_alt: RefCell<Vec<WeakVar>>,

    /// Maps registered through `add_weak_map`; each gets `garbage_collect()`
    /// called at the end of every stabilisation.
    pub(crate) weak_maps: RefCell<Vec<Rc<RefCell<dyn WeakMap>>>>,
    /// Weak pointer to the `Rc` that owns this state (set by `Rc::new_cyclic`).
    pub(crate) weak_self: Weak<Self>,

    /// Extra bookkeeping that only exists in builds with debug assertions.
    #[cfg(debug_assertions)]
    pub(crate) only_in_debug: OnlyInDebug,
}
63
64impl Debug for State {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("State").finish()
67    }
68}
69
70#[cfg(debug_assertions)]
71#[derive(Debug, Default)]
72pub(crate) struct OnlyInDebug {
73    pub currently_running_node: RefCell<Option<WeakNode>>,
74}
75
76#[cfg(debug_assertions)]
77impl OnlyInDebug {
78    pub(crate) fn currently_running_node_exn(&self, name: &'static str) -> WeakNode {
79        let crn = self.currently_running_node.borrow();
80        match &*crn {
81            None => panic!("can only call {} during stabilisation", name),
82            Some(current) => current.clone(),
83        }
84    }
85}
86
/// Phase of the stabilisation cycle a `State` is currently in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IncrStatus {
    /// Recomputation has finished and on-update handlers are being run.
    RunningOnUpdateHandlers,
    /// No stabilisation is in progress.
    NotStabilising,
    /// A stabilisation pass is in progress.
    Stabilising,
}
96
97impl State {
98    pub(crate) fn public_weak(self: &Rc<Self>) -> public::WeakState {
99        public::WeakState {
100            inner: Rc::downgrade(self),
101        }
102    }
103    pub(crate) fn weak(&self) -> Weak<Self> {
104        self.weak_self.clone()
105    }
106    pub(crate) fn current_scope(&self) -> Scope {
107        self.current_scope.borrow().clone()
108    }
109
110    pub(crate) fn within_scope<R>(&self, scope: Scope, f: impl FnOnce() -> R) -> R {
111        if !scope.is_valid() {
112            panic!("Attempted to run a closure within an invalid scope");
113        }
114        let old = self.current_scope.replace(scope);
115        let r = f();
116        self.current_scope.replace(old);
117        r
118    }
119
120    pub(crate) fn new() -> Rc<Self> {
121        const DEFAULT_MAX_HEIGHT_ALLOWED: usize = 128;
122        Self::new_with_height(DEFAULT_MAX_HEIGHT_ALLOWED)
123    }
124    pub(crate) fn new_with_height(max_height: usize) -> Rc<Self> {
125        Rc::new_cyclic(|weak| State {
126            weak_self: weak.clone(),
127            recompute_heap: RecomputeHeap::new(max_height),
128            adjust_heights_heap: RefCell::new(AdjustHeightsHeap::new(max_height)),
129            stabilisation_num: Cell::new(StabilisationNum(0)),
130            num_var_sets: Cell::new(0),
131            num_nodes_recomputed: Cell::new(0),
132            num_nodes_created: Cell::new(0),
133            num_nodes_changed: Cell::new(0),
134            num_nodes_became_necessary: Cell::new(0),
135            num_nodes_became_unnecessary: Cell::new(0),
136            num_nodes_invalidated: Cell::new(0),
137            num_active_observers: Cell::new(0),
138            propagate_invalidity: RefCell::new(vec![]),
139            status: Cell::new(IncrStatus::NotStabilising),
140            all_observers: RefCell::new(HashMap::new()),
141            new_observers: RefCell::new(Vec::new()),
142            disallowed_observers: RefCell::new(Vec::new()),
143            current_scope: RefCell::new(Scope::Top),
144            set_during_stabilisation: RefCell::new(vec![]),
145            dead_vars: RefCell::new(vec![]),
146            dead_vars_alt: RefCell::new(vec![]),
147            handle_after_stabilisation: RefCell::new(vec![]),
148            run_on_update_handlers: RefCell::new(vec![]),
149            weak_maps: RefCell::new(vec![]),
150            #[cfg(debug_assertions)]
151            only_in_debug: OnlyInDebug::default(),
152        })
153    }
154
155    pub(crate) fn add_weak_map(&self, weak_map: Rc<RefCell<dyn WeakMap>>) {
156        let mut wm = self.weak_maps.borrow_mut();
157        wm.push(weak_map);
158    }
159
160    pub(crate) fn constant<T: Value>(self: &Rc<Self>, value: T) -> Incr<T> {
161        let node = Node::create_rc::<T>(self.weak(), self.current_scope(), Kind::constant(value));
166        Incr { node }
167    }
168
169    pub(crate) fn fold<F, T: Value, R: Value>(
170        self: &Rc<Self>,
171        vec: Vec<Incr<T>>,
172        init: R,
173        f: F,
174    ) -> Incr<R>
175    where
176        F: FnMut(R, &T) -> R + 'static + NotObserver,
177    {
178        if vec.is_empty() {
179            return self.constant(init);
180        }
181        let node = Node::create_rc::<R>(
182            self.weak(),
183            self.current_scope(),
184            Kind::ArrayFold(new_unsized!(kind::ArrayFold {
185                init,
186                fold: RefCell::new(f),
187                children: vec,
188            })),
189        );
190        Incr { node }
191    }
192
193    pub(crate) fn var_in_scope<T: Value>(
194        self: &Rc<Self>,
195        value: T,
196        scope: Scope,
197    ) -> public::Var<T> {
198        let var = Rc::new(Var {
199            state: self.weak(),
200            set_at: Cell::new(self.stabilisation_num.get()),
201            value: RefCell::new(value),
202            node_id: NodeId(0).into(),
203            node: RefCell::new(None),
204            value_set_during_stabilisation: RefCell::new(None),
205        });
206        let node = Node::create_rc::<T>(self.weak(), scope, Kind::Var(var.clone()));
207        {
208            let mut slot = var.node.borrow_mut();
209            var.node_id.set(node.id);
210            slot.replace(node);
211        }
212        public::Var::new(var)
213    }
214
215    pub(crate) fn observe<T: Value>(&self, incr: Incr<T>) -> Rc<InternalObserver<T>> {
216        let internal_observer = InternalObserver::new(incr);
217        self.num_active_observers.increment();
218        let mut no = self.new_observers.borrow_mut();
219        no.push(Rc::downgrade(&internal_observer) as Weak<dyn ErasedObserver>);
220        internal_observer
221    }
222
223    fn add_new_observers(&self) {
224        let mut no = self.new_observers.borrow_mut();
225        for weak in no.drain(..) {
226            let Some(obs) = weak.upgrade() else { continue };
227            match obs.state().get() {
228                ObserverState::InUse | ObserverState::Disallowed => panic!(),
229                ObserverState::Unlinked => {}
230                ObserverState::Created => {
231                    obs.state().set(ObserverState::InUse);
232                    let node = obs.observing_erased();
233                    let was_necessary = node.is_necessary();
234                    {
235                        let mut ao = self.all_observers.borrow_mut();
236                        ao.insert(obs.id(), obs.clone());
237                    }
238                    obs.add_to_observed_node();
239                    node.handle_after_stabilisation(self);
243                    debug_assert!(node.is_necessary());
244                    if !was_necessary {
245                        node.became_necessary_propagate(self);
246                    }
247                }
248            }
249        }
250    }
251
252    #[tracing::instrument]
256    fn unlink_disallowed_observers(&self) {
257        let mut disallowed = self.disallowed_observers.borrow_mut();
258        for obs_weak in disallowed.drain(..) {
259            let Some(obs) = obs_weak.upgrade() else {
260                continue;
261            };
262            debug_assert_eq!(obs.state().get(), ObserverState::Disallowed);
263            obs.state().set(ObserverState::Unlinked);
264            let observing = obs.observing_packed();
266            {
267                obs.remove_from_observed_node();
268                let mut ao = self.all_observers.borrow_mut();
270                ao.remove(&obs.id());
271                drop(obs);
272            }
273            observing.check_if_unnecessary(self);
274        }
275    }
276
277    #[tracing::instrument]
278    fn stabilise_start(&self) {
279        self.status.set(IncrStatus::Stabilising);
280        self.add_new_observers();
282        self.unlink_disallowed_observers();
283    }
284    #[tracing::instrument]
285    fn stabilise_end(&self) {
286        self.stabilisation_num
287            .set(self.stabilisation_num.get().add1());
288        #[cfg(debug_assertions)]
289        {
290            self.only_in_debug.currently_running_node.take();
291            }
293        tracing::info_span!("set_during_stabilisation").in_scope(|| {
294            let mut stack = self.set_during_stabilisation.borrow_mut();
295            while let Some(var) = stack.pop() {
296                let Some(var) = var.upgrade() else { continue };
297                tracing::debug!("set_during_stabilisation: found var with {:?}", var.id());
298                var.set_var_stabilise_end();
299            }
300        });
301        tracing::info_span!("dead_vars").in_scope(|| {
308            let mut alt = self.dead_vars_alt.borrow_mut();
310            loop {
311                let mut stack = self.dead_vars.borrow_mut();
312                if stack.is_empty() {
313                    break;
314                }
315                std::mem::swap(&mut *stack, &mut *alt);
317                drop(stack);
318                for var in alt.drain(..) {
319                    let Some(var) = var.upgrade() else { continue };
320                    tracing::debug!("dead_vars: found var with {:?}", var.id());
321                    var.break_rc_cycle();
322                }
323            }
324        });
325        tracing::info_span!("handle_after_stabilisation").in_scope(|| {
326            let mut stack = self.handle_after_stabilisation.borrow_mut();
327            for node in stack.drain(..).filter_map(|node| node.upgrade()) {
328                node.is_in_handle_after_stabilisation().set(false);
329                let node_update = node.node_update();
330                let mut run_queue = self.run_on_update_handlers.borrow_mut();
331                run_queue.push((node.weak(), node_update))
332            }
333        });
334        tracing::info_span!("run_on_update_handlers").in_scope(|| {
335            self.status.set(IncrStatus::RunningOnUpdateHandlers);
336            let now = self.stabilisation_num.get();
337            let mut stack = self.run_on_update_handlers.borrow_mut();
338            for (node, node_update) in stack
339                .drain(..)
340                .filter_map(|(node, node_update)| node.upgrade().map(|n| (n, node_update)))
341            {
342                node.run_on_update_handlers(node_update, now)
343            }
344        });
345        for wm in self.weak_maps.borrow().iter() {
346            let mut w = wm.borrow_mut();
347            w.garbage_collect();
348        }
349        self.status.set(IncrStatus::NotStabilising);
350    }
351
352    pub(crate) fn is_stable(&self) -> bool {
353        self.recompute_heap.is_empty()
354            && self.dead_vars.borrow().is_empty()
355            && self.new_observers.borrow().is_empty()
356    }
357
358    pub(crate) fn stabilise(&self) {
359        self.stabilise_debug(None)
360    }
361
362    pub(crate) fn stabilise_debug(&self, _prefix: Option<&str>) {
363        let span = tracing::info_span!("stabilise");
364        span.in_scope(|| {
365            #[cfg(debug_assertions)]
366            let mut do_debug = {
367                let mut iterations = 0;
368                let mut buf = String::new();
369                move || {
370                    if tracing::enabled!(tracing::Level::INFO) {
371                        if let Some(prefix) = _prefix {
372                            iterations += 1;
373                            use std::fmt::Write;
374                            buf.clear();
375                            write!(&mut buf, "{prefix}-stabilise-{iterations}.dot").unwrap();
376                            self.save_dot_to_file(&buf);
377                        }
378                    }
379                }
380            };
381
382            #[cfg(not(debug_assertions))]
383            fn do_debug() {}
384
385            assert_eq!(self.status.get(), IncrStatus::NotStabilising);
386
387            self.stabilise_start();
388
389            while let Some(node) = self.recompute_heap.remove_min() {
390                do_debug();
391                node.recompute(self);
392            }
393
394            self.stabilise_end();
395            do_debug();
396        });
397    }
398
399    pub(crate) fn propagate_invalidity(&self) {
400        while let Some(node) = {
401            let mut pi = self.propagate_invalidity.borrow_mut();
402            pi.pop()
403        } {
404            let Some(node) = node.upgrade() else { continue };
405            if node.is_valid() {
406                if node.should_be_invalidated() {
407                    node.invalidate_node(self);
408                } else {
409                    debug_assert!(node.needs_to_be_computed());
423
424                    node.propagate_invalidity_helper();
426
427                    if !node.is_in_recompute_heap() {
431                        self.recompute_heap.insert(node);
432                    }
433                }
434            }
435        }
436    }
437
438    pub(crate) fn unsubscribe(&self, token: SubscriptionToken) {
439        let all_obs = self.all_observers.borrow();
440        if let Some(obs) = all_obs.get(&token.observer_id()) {
441            obs.unsubscribe(token).unwrap();
442        }
443    }
444
445    pub(crate) fn is_stabilising(&self) -> bool {
446        self.status.get() != IncrStatus::NotStabilising
447    }
448
449    pub(crate) fn set_max_height_allowed(&self, new_max_height: usize) {
450        if self.status.get() == IncrStatus::Stabilising {
451            panic!("tried to set_max_height_allowed during stabilisation");
452        }
453        let mut ah_heap = self.adjust_heights_heap.borrow_mut();
454        ah_heap.set_max_height_allowed(new_max_height);
455        drop(ah_heap);
456        self.recompute_heap.set_max_height_allowed(new_max_height);
457    }
458
459    pub(crate) fn set_height(&self, node: NodeRef, height: i32) {
460        let mut ah_heap = self.adjust_heights_heap.borrow_mut();
461        ah_heap.set_height(&node, height);
462    }
463
464    pub(crate) fn save_dot_to_file(&self, named: &str) {
465        let observers = self.all_observers.borrow();
466        let mut all_observed = observers.iter().map(|(_id, o)| o.observing_erased());
467        super::node::save_dot_to_file(&mut all_observed, named).unwrap();
468    }
469    pub(crate) fn save_dot_to_string(&self) -> String {
470        let observers = self.all_observers.borrow();
471        let mut all_observed = observers.iter().map(|(_id, o)| o.observing_erased());
472        let mut buf = String::new();
473        super::node::save_dot(&mut buf, &mut all_observed).unwrap();
474        buf
475    }
476
477    #[tracing::instrument]
478    pub(crate) fn destroy(&self) {
479        let mut dead_vars = self.dead_vars.take();
480        for var in dead_vars.drain(..).filter_map(|x| x.upgrade()) {
481            var.break_rc_cycle();
482        }
483        for (_id, obs) in self.all_observers.borrow().iter() {
484            let state = obs.state().get();
485            if state == ObserverState::InUse || state == ObserverState::Created {
486                obs.disallow_future_use(self);
487            }
488        }
489        self.unlink_disallowed_observers();
490        self.all_observers.take().clear();
491        self.disallowed_observers.take().clear();
492        self.weak_maps.take().clear();
493        self.recompute_heap.clear();
494        self.adjust_heights_heap.borrow_mut().clear();
495    }
496}
497
498impl Drop for State {
499    fn drop(&mut self) {
500        tracing::debug!("destroying internal State object");
501        self.destroy();
502    }
503}