1use super::adjust_heights_heap::AdjustHeightsHeap;
2use super::kind;
3use super::node_update::NodeUpdateDelayed;
4use super::{CellIncrement, NodeRef, Value, WeakNode};
5use crate::boxes::new_unsized;
6use crate::incrsan::NotObserver;
7use crate::node::ErasedNode;
8use crate::{SubscriptionToken, WeakMap};
9
10use super::internal_observer::{
11 ErasedObserver, InternalObserver, ObserverId, ObserverState, StrongObserver, WeakObserver,
12};
13use super::kind::Kind;
14use super::node::{Node, NodeId};
15use super::recompute_heap::RecomputeHeap;
16use super::scope::Scope;
17use super::stabilisation_num::StabilisationNum;
18use super::var::{Var, WeakVar};
19use super::{public, Incr};
20use core::fmt::Debug;
21use std::cell::{Cell, RefCell};
22use std::collections::HashMap;
23use std::rc::{Rc, Weak};
24
25pub(crate) mod expert;
26
27pub(crate) struct State {
28 pub(crate) stabilisation_num: Cell<StabilisationNum>,
29 pub(crate) adjust_heights_heap: RefCell<AdjustHeightsHeap>,
30 pub(crate) recompute_heap: RecomputeHeap,
31 pub(crate) status: Cell<IncrStatus>,
32 pub(crate) num_var_sets: Cell<usize>,
33 pub(crate) num_nodes_recomputed: Cell<usize>,
34 pub(crate) num_nodes_created: Cell<usize>,
35 pub(crate) num_nodes_changed: Cell<usize>,
36 pub(crate) num_nodes_became_necessary: Cell<usize>,
37 pub(crate) num_nodes_became_unnecessary: Cell<usize>,
38 pub(crate) num_nodes_invalidated: Cell<usize>,
39 pub(crate) num_active_observers: Cell<usize>,
40 pub(crate) propagate_invalidity: RefCell<Vec<WeakNode>>,
41 pub(crate) run_on_update_handlers: RefCell<Vec<(WeakNode, NodeUpdateDelayed)>>,
42 pub(crate) handle_after_stabilisation: RefCell<Vec<WeakNode>>,
43 pub(crate) new_observers: RefCell<Vec<WeakObserver>>,
44 pub(crate) all_observers: RefCell<HashMap<ObserverId, StrongObserver>>,
45 pub(crate) disallowed_observers: RefCell<Vec<WeakObserver>>,
46 pub(crate) current_scope: RefCell<Scope>,
47 pub(crate) set_during_stabilisation: RefCell<Vec<WeakVar>>,
48 pub(crate) dead_vars: RefCell<Vec<WeakVar>>,
49 pub(crate) dead_vars_alt: RefCell<Vec<WeakVar>>,
56
57 pub(crate) weak_maps: RefCell<Vec<Rc<RefCell<dyn WeakMap>>>>,
58 pub(crate) weak_self: Weak<Self>,
59
60 #[cfg(debug_assertions)]
61 pub(crate) only_in_debug: OnlyInDebug,
62}
63
64impl Debug for State {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("State").finish()
67 }
68}
69
70#[cfg(debug_assertions)]
71#[derive(Debug, Default)]
72pub(crate) struct OnlyInDebug {
73 pub currently_running_node: RefCell<Option<WeakNode>>,
74}
75
76#[cfg(debug_assertions)]
77impl OnlyInDebug {
78 pub(crate) fn currently_running_node_exn(&self, name: &'static str) -> WeakNode {
79 let crn = self.currently_running_node.borrow();
80 match &*crn {
81 None => panic!("can only call {} during stabilisation", name),
82 Some(current) => current.clone(),
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, Eq, PartialEq)]
88pub enum IncrStatus {
89 RunningOnUpdateHandlers,
90 NotStabilising,
91 Stabilising,
92 }
96
97impl State {
98 pub(crate) fn public_weak(self: &Rc<Self>) -> public::WeakState {
99 public::WeakState {
100 inner: Rc::downgrade(self),
101 }
102 }
103 pub(crate) fn weak(&self) -> Weak<Self> {
104 self.weak_self.clone()
105 }
106 pub(crate) fn current_scope(&self) -> Scope {
107 self.current_scope.borrow().clone()
108 }
109
110 pub(crate) fn within_scope<R>(&self, scope: Scope, f: impl FnOnce() -> R) -> R {
111 if !scope.is_valid() {
112 panic!("Attempted to run a closure within an invalid scope");
113 }
114 let old = self.current_scope.replace(scope);
115 let r = f();
116 self.current_scope.replace(old);
117 r
118 }
119
120 pub(crate) fn new() -> Rc<Self> {
121 const DEFAULT_MAX_HEIGHT_ALLOWED: usize = 128;
122 Self::new_with_height(DEFAULT_MAX_HEIGHT_ALLOWED)
123 }
124 pub(crate) fn new_with_height(max_height: usize) -> Rc<Self> {
125 Rc::new_cyclic(|weak| State {
126 weak_self: weak.clone(),
127 recompute_heap: RecomputeHeap::new(max_height),
128 adjust_heights_heap: RefCell::new(AdjustHeightsHeap::new(max_height)),
129 stabilisation_num: Cell::new(StabilisationNum(0)),
130 num_var_sets: Cell::new(0),
131 num_nodes_recomputed: Cell::new(0),
132 num_nodes_created: Cell::new(0),
133 num_nodes_changed: Cell::new(0),
134 num_nodes_became_necessary: Cell::new(0),
135 num_nodes_became_unnecessary: Cell::new(0),
136 num_nodes_invalidated: Cell::new(0),
137 num_active_observers: Cell::new(0),
138 propagate_invalidity: RefCell::new(vec![]),
139 status: Cell::new(IncrStatus::NotStabilising),
140 all_observers: RefCell::new(HashMap::new()),
141 new_observers: RefCell::new(Vec::new()),
142 disallowed_observers: RefCell::new(Vec::new()),
143 current_scope: RefCell::new(Scope::Top),
144 set_during_stabilisation: RefCell::new(vec![]),
145 dead_vars: RefCell::new(vec![]),
146 dead_vars_alt: RefCell::new(vec![]),
147 handle_after_stabilisation: RefCell::new(vec![]),
148 run_on_update_handlers: RefCell::new(vec![]),
149 weak_maps: RefCell::new(vec![]),
150 #[cfg(debug_assertions)]
151 only_in_debug: OnlyInDebug::default(),
152 })
153 }
154
155 pub(crate) fn add_weak_map(&self, weak_map: Rc<RefCell<dyn WeakMap>>) {
156 let mut wm = self.weak_maps.borrow_mut();
157 wm.push(weak_map);
158 }
159
160 pub(crate) fn constant<T: Value>(self: &Rc<Self>, value: T) -> Incr<T> {
161 let node = Node::create_rc::<T>(self.weak(), self.current_scope(), Kind::constant(value));
166 Incr { node }
167 }
168
169 pub(crate) fn fold<F, T: Value, R: Value>(
170 self: &Rc<Self>,
171 vec: Vec<Incr<T>>,
172 init: R,
173 f: F,
174 ) -> Incr<R>
175 where
176 F: FnMut(R, &T) -> R + 'static + NotObserver,
177 {
178 if vec.is_empty() {
179 return self.constant(init);
180 }
181 let node = Node::create_rc::<R>(
182 self.weak(),
183 self.current_scope(),
184 Kind::ArrayFold(new_unsized!(kind::ArrayFold {
185 init,
186 fold: RefCell::new(f),
187 children: vec,
188 })),
189 );
190 Incr { node }
191 }
192
193 pub(crate) fn var_in_scope<T: Value>(
194 self: &Rc<Self>,
195 value: T,
196 scope: Scope,
197 ) -> public::Var<T> {
198 let var = Rc::new(Var {
199 state: self.weak(),
200 set_at: Cell::new(self.stabilisation_num.get()),
201 value: RefCell::new(value),
202 node_id: NodeId(0).into(),
203 node: RefCell::new(None),
204 value_set_during_stabilisation: RefCell::new(None),
205 });
206 let node = Node::create_rc::<T>(self.weak(), scope, Kind::Var(var.clone()));
207 {
208 let mut slot = var.node.borrow_mut();
209 var.node_id.set(node.id);
210 slot.replace(node);
211 }
212 public::Var::new(var)
213 }
214
215 pub(crate) fn observe<T: Value>(&self, incr: Incr<T>) -> Rc<InternalObserver<T>> {
216 let internal_observer = InternalObserver::new(incr);
217 self.num_active_observers.increment();
218 let mut no = self.new_observers.borrow_mut();
219 no.push(Rc::downgrade(&internal_observer) as Weak<dyn ErasedObserver>);
220 internal_observer
221 }
222
223 fn add_new_observers(&self) {
224 let mut no = self.new_observers.borrow_mut();
225 for weak in no.drain(..) {
226 let Some(obs) = weak.upgrade() else { continue };
227 match obs.state().get() {
228 ObserverState::InUse | ObserverState::Disallowed => panic!(),
229 ObserverState::Unlinked => {}
230 ObserverState::Created => {
231 obs.state().set(ObserverState::InUse);
232 let node = obs.observing_erased();
233 let was_necessary = node.is_necessary();
234 {
235 let mut ao = self.all_observers.borrow_mut();
236 ao.insert(obs.id(), obs.clone());
237 }
238 obs.add_to_observed_node();
239 node.handle_after_stabilisation(self);
243 debug_assert!(node.is_necessary());
244 if !was_necessary {
245 node.became_necessary_propagate(self);
246 }
247 }
248 }
249 }
250 }
251
252 #[tracing::instrument]
256 fn unlink_disallowed_observers(&self) {
257 let mut disallowed = self.disallowed_observers.borrow_mut();
258 for obs_weak in disallowed.drain(..) {
259 let Some(obs) = obs_weak.upgrade() else {
260 continue;
261 };
262 debug_assert_eq!(obs.state().get(), ObserverState::Disallowed);
263 obs.state().set(ObserverState::Unlinked);
264 let observing = obs.observing_packed();
266 {
267 obs.remove_from_observed_node();
268 let mut ao = self.all_observers.borrow_mut();
270 ao.remove(&obs.id());
271 drop(obs);
272 }
273 observing.check_if_unnecessary(self);
274 }
275 }
276
277 #[tracing::instrument]
278 fn stabilise_start(&self) {
279 self.status.set(IncrStatus::Stabilising);
280 self.add_new_observers();
282 self.unlink_disallowed_observers();
283 }
284 #[tracing::instrument]
285 fn stabilise_end(&self) {
286 self.stabilisation_num
287 .set(self.stabilisation_num.get().add1());
288 #[cfg(debug_assertions)]
289 {
290 self.only_in_debug.currently_running_node.take();
291 }
293 tracing::info_span!("set_during_stabilisation").in_scope(|| {
294 let mut stack = self.set_during_stabilisation.borrow_mut();
295 while let Some(var) = stack.pop() {
296 let Some(var) = var.upgrade() else { continue };
297 tracing::debug!("set_during_stabilisation: found var with {:?}", var.id());
298 var.set_var_stabilise_end();
299 }
300 });
301 tracing::info_span!("dead_vars").in_scope(|| {
308 let mut alt = self.dead_vars_alt.borrow_mut();
310 loop {
311 let mut stack = self.dead_vars.borrow_mut();
312 if stack.is_empty() {
313 break;
314 }
315 std::mem::swap(&mut *stack, &mut *alt);
317 drop(stack);
318 for var in alt.drain(..) {
319 let Some(var) = var.upgrade() else { continue };
320 tracing::debug!("dead_vars: found var with {:?}", var.id());
321 var.break_rc_cycle();
322 }
323 }
324 });
325 tracing::info_span!("handle_after_stabilisation").in_scope(|| {
326 let mut stack = self.handle_after_stabilisation.borrow_mut();
327 for node in stack.drain(..).filter_map(|node| node.upgrade()) {
328 node.is_in_handle_after_stabilisation().set(false);
329 let node_update = node.node_update();
330 let mut run_queue = self.run_on_update_handlers.borrow_mut();
331 run_queue.push((node.weak(), node_update))
332 }
333 });
334 tracing::info_span!("run_on_update_handlers").in_scope(|| {
335 self.status.set(IncrStatus::RunningOnUpdateHandlers);
336 let now = self.stabilisation_num.get();
337 let mut stack = self.run_on_update_handlers.borrow_mut();
338 for (node, node_update) in stack
339 .drain(..)
340 .filter_map(|(node, node_update)| node.upgrade().map(|n| (n, node_update)))
341 {
342 node.run_on_update_handlers(node_update, now)
343 }
344 });
345 for wm in self.weak_maps.borrow().iter() {
346 let mut w = wm.borrow_mut();
347 w.garbage_collect();
348 }
349 self.status.set(IncrStatus::NotStabilising);
350 }
351
352 pub(crate) fn is_stable(&self) -> bool {
353 self.recompute_heap.is_empty()
354 && self.dead_vars.borrow().is_empty()
355 && self.new_observers.borrow().is_empty()
356 }
357
358 pub(crate) fn stabilise(&self) {
359 self.stabilise_debug(None)
360 }
361
362 pub(crate) fn stabilise_debug(&self, _prefix: Option<&str>) {
363 let span = tracing::info_span!("stabilise");
364 span.in_scope(|| {
365 #[cfg(debug_assertions)]
366 let mut do_debug = {
367 let mut iterations = 0;
368 let mut buf = String::new();
369 move || {
370 if tracing::enabled!(tracing::Level::INFO) {
371 if let Some(prefix) = _prefix {
372 iterations += 1;
373 use std::fmt::Write;
374 buf.clear();
375 write!(&mut buf, "{prefix}-stabilise-{iterations}.dot").unwrap();
376 self.save_dot_to_file(&buf);
377 }
378 }
379 }
380 };
381
382 #[cfg(not(debug_assertions))]
383 fn do_debug() {}
384
385 assert_eq!(self.status.get(), IncrStatus::NotStabilising);
386
387 self.stabilise_start();
388
389 while let Some(node) = self.recompute_heap.remove_min() {
390 do_debug();
391 node.recompute(self);
392 }
393
394 self.stabilise_end();
395 do_debug();
396 });
397 }
398
399 pub(crate) fn propagate_invalidity(&self) {
400 while let Some(node) = {
401 let mut pi = self.propagate_invalidity.borrow_mut();
402 pi.pop()
403 } {
404 let Some(node) = node.upgrade() else { continue };
405 if node.is_valid() {
406 if node.should_be_invalidated() {
407 node.invalidate_node(self);
408 } else {
409 debug_assert!(node.needs_to_be_computed());
423
424 node.propagate_invalidity_helper();
426
427 if !node.is_in_recompute_heap() {
431 self.recompute_heap.insert(node);
432 }
433 }
434 }
435 }
436 }
437
438 pub(crate) fn unsubscribe(&self, token: SubscriptionToken) {
439 let all_obs = self.all_observers.borrow();
440 if let Some(obs) = all_obs.get(&token.observer_id()) {
441 obs.unsubscribe(token).unwrap();
442 }
443 }
444
445 pub(crate) fn is_stabilising(&self) -> bool {
446 self.status.get() != IncrStatus::NotStabilising
447 }
448
449 pub(crate) fn set_max_height_allowed(&self, new_max_height: usize) {
450 if self.status.get() == IncrStatus::Stabilising {
451 panic!("tried to set_max_height_allowed during stabilisation");
452 }
453 let mut ah_heap = self.adjust_heights_heap.borrow_mut();
454 ah_heap.set_max_height_allowed(new_max_height);
455 drop(ah_heap);
456 self.recompute_heap.set_max_height_allowed(new_max_height);
457 }
458
459 pub(crate) fn set_height(&self, node: NodeRef, height: i32) {
460 let mut ah_heap = self.adjust_heights_heap.borrow_mut();
461 ah_heap.set_height(&node, height);
462 }
463
464 pub(crate) fn save_dot_to_file(&self, named: &str) {
465 let observers = self.all_observers.borrow();
466 let mut all_observed = observers.iter().map(|(_id, o)| o.observing_erased());
467 super::node::save_dot_to_file(&mut all_observed, named).unwrap();
468 }
469 pub(crate) fn save_dot_to_string(&self) -> String {
470 let observers = self.all_observers.borrow();
471 let mut all_observed = observers.iter().map(|(_id, o)| o.observing_erased());
472 let mut buf = String::new();
473 super::node::save_dot(&mut buf, &mut all_observed).unwrap();
474 buf
475 }
476
477 #[tracing::instrument]
478 pub(crate) fn destroy(&self) {
479 let mut dead_vars = self.dead_vars.take();
480 for var in dead_vars.drain(..).filter_map(|x| x.upgrade()) {
481 var.break_rc_cycle();
482 }
483 for (_id, obs) in self.all_observers.borrow().iter() {
484 let state = obs.state().get();
485 if state == ObserverState::InUse || state == ObserverState::Created {
486 obs.disallow_future_use(self);
487 }
488 }
489 self.unlink_disallowed_observers();
490 self.all_observers.take().clear();
491 self.disallowed_observers.take().clear();
492 self.weak_maps.take().clear();
493 self.recompute_heap.clear();
494 self.adjust_heights_heap.borrow_mut().clear();
495 }
496}
497
498impl Drop for State {
499 fn drop(&mut self) {
500 tracing::debug!("destroying internal State object");
501 self.destroy();
502 }
503}