tracing_rc/sync/
collector.rs1use std::{
2 collections::{
3 HashMap,
4 VecDeque,
5 },
6 num::NonZeroUsize,
7 sync::{
8 Arc,
9 Weak,
10 },
11};
12
13use dashmap::{
14 DashMap,
15 DashSet,
16};
17use indexmap::IndexMap;
18use once_cell::sync::Lazy;
19use parking_lot::{
20 const_mutex,
21 Mutex,
22};
23use petgraph::{
24 csr::{
25 Csr,
26 NodeIndex,
27 },
28 visit::IntoNodeIdentifiers,
29 Directed,
30};
31
32use crate::{
33 impl_node,
34 sync::{
35 Agc,
36 AtomicInner,
37 Status,
38 Trace,
39 },
40 CollectOptions,
41 CollectionType,
42};
43
44impl_node!(WeakNode { inner_ptr: Weak<AtomicInner<dyn Trace>> }, upgrade(ptr) => Weak::upgrade(ptr));
45impl_node!(StrongNode { inner_ptr: Arc<AtomicInner<dyn Trace>> }, upgrade(ptr) => ptr);
46
47impl TryFrom<&WeakNode> for StrongNode {
48 type Error = ();
49
50 fn try_from(weak: &WeakNode) -> Result<Self, Self::Error> {
51 weak.upgrade()
52 .map(|strong| StrongNode { inner_ptr: strong })
53 .ok_or(())
54 }
55}
56
57pub(super) static YOUNG_GEN: Lazy<DashMap<WeakNode, usize>> = Lazy::new(DashMap::default);
58pub(super) static OLD_GEN: Lazy<DashSet<WeakNode>> = Lazy::new(DashSet::default);
59
60static COLLECTION_MUTEX: Mutex<()> = const_mutex(());
61
62pub struct GcVisitor<'cycle> {
65 visitor: &'cycle mut dyn FnMut(Arc<AtomicInner<dyn Trace>>),
66}
67
68impl GcVisitor<'_> {
69 pub fn visit_node<T: Trace + 'static>(&mut self, node: &Agc<T>) {
71 (self.visitor)(node.ptr.clone() as Arc<_>);
72 }
73}
74
75type GraphIndex = NodeIndex<usize>;
76
77type ConnectivityGraph = Csr<(), (), Directed, GraphIndex>;
78
79type TracedNodeList = IndexMap<StrongNode, NonZeroUsize>;
80
81#[doc(hidden)]
82pub fn count_roots() -> usize {
83 YOUNG_GEN.len() + OLD_GEN.len()
84}
85
86pub fn collect_full() {
89 collect_with_options(CollectOptions::TRACE_AND_COLLECT_ALL);
90}
91
92pub fn collect() {
106 collect_with_options(CollectOptions::default());
107}
108
109pub fn collect_with_options(options: CollectOptions) {
112 collect_new_gen(options);
113 if options.kind != CollectionType::YoungOnly {
114 collect_old_gen();
115 }
116}
117
118fn collect_new_gen(options: CollectOptions) {
119 let mut to_old_gen = vec![];
120 YOUNG_GEN.retain(|node, generation| {
121 if node.strong_count() == 0 {
122 return false;
123 }
124
125 if *generation < options.old_gen_threshold {
126 *generation += 1;
127 return true;
128 }
129
130 to_old_gen.push(node.clone());
131 false
132 });
133
134 for node in to_old_gen {
135 OLD_GEN.insert(node);
136 }
137}
138
139fn collect_old_gen() {
140 let _guard = if let Some(guard) = COLLECTION_MUTEX.try_lock() {
141 guard
142 } else {
143 return;
145 };
146
147 let mut traced_nodes = IndexMap::with_capacity(OLD_GEN.len());
148 OLD_GEN.retain(|node| {
149 if let Ok(strong) = TryInto::<StrongNode>::try_into(node) {
150 if strong.status.load(atomic::Ordering::Acquire) == Status::RecentlyDecremented {
151 traced_nodes.insert(strong, NonZeroUsize::new(1).unwrap());
152 }
153 }
154 false
155 });
156
157 let mut connectivity_graph = ConnectivityGraph::with_nodes(traced_nodes.len());
158
159 let mut pending_nodes = connectivity_graph
160 .node_identifiers()
161 .collect::<VecDeque<_>>();
162
163 while let Some(node_ix) = pending_nodes.pop_front() {
164 trace_children(
165 node_ix,
166 &mut pending_nodes,
167 &mut traced_nodes,
168 &mut connectivity_graph,
169 );
170 }
171
172 let mut live_nodes = vec![];
173 let mut dead_nodes = HashMap::default();
174
175 for (node_ix, (node, refs)) in traced_nodes.into_iter().enumerate() {
176 if Arc::strong_count(&node) > refs.get() {
177 node.status.store(Status::Live, atomic::Ordering::Release);
181 live_nodes.push(node_ix);
182 } else if node.status.load(atomic::Ordering::Acquire) != Status::Traced {
183 live_nodes.push(node_ix);
189 } else {
190 dead_nodes.insert(node_ix, node);
191 }
192 }
193
194 for node_index in live_nodes {
195 filter_live_node_children(&connectivity_graph, node_index, &mut dead_nodes);
196 }
197
198 for node in dead_nodes.values() {
199 node.status.store(Status::Dead, atomic::Ordering::Release);
200 }
201
202 for (_, node) in dead_nodes {
203 AtomicInner::drop_data(&node);
204 }
205}
206
207fn trace_children(
208 parent_ix: GraphIndex,
209 pending_nodes: &mut VecDeque<GraphIndex>,
210 traced_nodes: &mut TracedNodeList,
211 connectivity_graph: &mut ConnectivityGraph,
212) {
213 let pin_parent = traced_nodes.get_index(parent_ix).unwrap().0.clone();
214 let parent = {
215 if let Some(_guard) = pin_parent.data.try_write() {
216 pin_parent
219 .status
220 .store(Status::Traced, atomic::Ordering::Release);
221 } else {
222 return;
227 };
228
229 pin_parent.data.read_recursive()
230 };
231
232 parent.visit_children(&mut GcVisitor {
233 visitor: &mut |node| {
234 match traced_nodes.entry(node.into()) {
235 indexmap::map::Entry::Occupied(mut seen) => {
236 *seen.get_mut() =
240 NonZeroUsize::new(seen.get().get().saturating_add(1)).unwrap();
241
242 connectivity_graph.add_edge(parent_ix, seen.index(), ());
243 }
244 indexmap::map::Entry::Vacant(unseen) => {
245 let child_ix = connectivity_graph.add_node(());
246 debug_assert_eq!(unseen.index(), child_ix);
247
248 connectivity_graph.add_edge(parent_ix, child_ix, ());
249
250 unseen.insert(NonZeroUsize::new(2).unwrap());
252 pending_nodes.push_back(child_ix);
253 }
254 };
255 },
256 });
257}
258
259fn filter_live_node_children(
260 graph: &ConnectivityGraph,
261 node: GraphIndex,
262 dead_nodes: &mut HashMap<GraphIndex, StrongNode>,
263) {
264 for child in graph.neighbors_slice(node) {
265 if dead_nodes.remove(child).is_some() {
266 filter_live_node_children(graph, *child, dead_nodes);
267 }
268 }
269}