tracing_rc/sync/collector.rs

use std::{
    collections::{
        HashMap,
        VecDeque,
    },
    num::NonZeroUsize,
    sync::{
        atomic,
        Arc,
        Weak,
    },
};

use dashmap::{
    DashMap,
    DashSet,
};
use indexmap::IndexMap;
use once_cell::sync::Lazy;
use parking_lot::{
    const_mutex,
    Mutex,
};
use petgraph::{
    csr::{
        Csr,
        NodeIndex,
    },
    visit::IntoNodeIdentifiers,
    Directed,
};

use crate::{
    impl_node,
    sync::{
        Agc,
        AtomicInner,
        Status,
        Trace,
    },
    CollectOptions,
    CollectionType,
};

impl_node!(WeakNode { inner_ptr: Weak<AtomicInner<dyn Trace>> }, upgrade(ptr) => Weak::upgrade(ptr));
impl_node!(StrongNode { inner_ptr: Arc<AtomicInner<dyn Trace>> }, upgrade(ptr) => ptr);

impl TryFrom<&WeakNode> for StrongNode {
    type Error = ();

    fn try_from(weak: &WeakNode) -> Result<Self, Self::Error> {
        weak.upgrade()
            .map(|strong| StrongNode { inner_ptr: strong })
            .ok_or(())
    }
}

pub(super) static YOUNG_GEN: Lazy<DashMap<WeakNode, usize>> = Lazy::new(DashMap::default);
pub(super) static OLD_GEN: Lazy<DashSet<WeakNode>> = Lazy::new(DashSet::default);

static COLLECTION_MUTEX: Mutex<()> = const_mutex(());

/// Visitor provided during tracing of the reachable object graph. You shouldn't need to interact
/// with this as [`Agc::visit_children`] will do the right thing for you.
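///
/// A minimal sketch of how a hand-written [`Trace`] implementation might drive the visitor
/// (this assumes `Agc`, `GcVisitor`, and `Trace` are all re-exported from `tracing_rc::sync`,
/// and that [`Trace`] requires only `visit_children`):
///
/// ```no_run
/// use tracing_rc::sync::{Agc, GcVisitor, Trace};
///
/// // Hypothetical user type; not part of tracing_rc.
/// struct Graph {
///     edges: Vec<Agc<Graph>>,
/// }
///
/// impl Trace for Graph {
///     fn visit_children(&self, visitor: &mut GcVisitor) {
///         // Report every owned `Agc` so cycles through `edges` can be discovered.
///         for edge in &self.edges {
///             visitor.visit_node(edge);
///         }
///     }
/// }
/// ```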
pub struct GcVisitor<'cycle> {
    visitor: &'cycle mut dyn FnMut(Arc<AtomicInner<dyn Trace>>),
}

impl GcVisitor<'_> {
    /// Visit an owned [`Agc`] node.
    pub fn visit_node<T: Trace + 'static>(&mut self, node: &Agc<T>) {
        (self.visitor)(node.ptr.clone() as Arc<_>);
    }
}

type GraphIndex = NodeIndex<usize>;

type ConnectivityGraph = Csr<(), (), Directed, GraphIndex>;

type TracedNodeList = IndexMap<StrongNode, NonZeroUsize>;

#[doc(hidden)]
pub fn count_roots() -> usize {
    YOUNG_GEN.len() + OLD_GEN.len()
}

/// Perform a full, cycle-tracing collection of both the old & young gen.
/// See [`collect`] for more details on the implementation of collection.
pub fn collect_full() {
    collect_with_options(CollectOptions::TRACE_AND_COLLECT_ALL);
}

/// Perform a normal collection cycle.
///
/// - Only one thread at a time is permitted to collect the old generation. If another thread
///   attempts to start collection while the old gen is being processed, it will return early
///   without performing any garbage collection.
/// - Collection is optimized for latency over throughput. The goal is minimal pause times, not
///   maximizing the speed with which memory is released.
/// - Collection is not incremental; it processes all of the pointers available at the time
///   collection starts.
/// - It may take several calls to `collect` or [`collect_full`] before all garbage is cleaned up,
///   depending on where values sit in the old/young gen, even if no additional garbage is
///   produced between calls; however, most memory is likely to be cleaned up after a single pass.
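///
/// A minimal usage sketch (assuming `collect` and [`collect_full`] are re-exported from
/// `tracing_rc::sync`):
///
/// ```no_run
/// use tracing_rc::sync::{collect, collect_full};
///
/// // Run a normal collection pass; a few passes may be needed before every cycle is freed.
/// collect();
///
/// // Force a full, cycle-tracing pass over both the young and old generations.
/// collect_full();
/// ```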
pub fn collect() {
    collect_with_options(CollectOptions::default());
}

/// Perform a collection cycle based on [`CollectOptions`].
/// See [`collect`] for more details on the implementation of collection.
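///
/// A small sketch (assuming [`CollectOptions`] and its `TRACE_AND_COLLECT_ALL` preset are
/// exported from the crate root and `collect_with_options` from `tracing_rc::sync`):
///
/// ```no_run
/// use tracing_rc::{sync::collect_with_options, CollectOptions};
///
/// // Equivalent to calling `collect_full()`: trace and collect both generations.
/// collect_with_options(CollectOptions::TRACE_AND_COLLECT_ALL);
/// ```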
pub fn collect_with_options(options: CollectOptions) {
    collect_new_gen(options);
    if options.kind != CollectionType::YoungOnly {
        collect_old_gen();
    }
}

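// Ages the roots in the young gen: dead nodes are dropped, nodes that have survived at least
// `old_gen_threshold` collections are promoted to the old gen for cycle tracing, and the rest
// simply have their generation count bumped.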
fn collect_new_gen(options: CollectOptions) {
    let mut to_old_gen = vec![];
    YOUNG_GEN.retain(|node, generation| {
        if node.strong_count() == 0 {
            return false;
        }

        if *generation < options.old_gen_threshold {
            *generation += 1;
            return true;
        }

        to_old_gen.push(node.clone());
        false
    });

    for node in to_old_gen {
        OLD_GEN.insert(node);
    }
}

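// Runs the cycle-tracing phase over the old gen: drains it into a set of candidate roots, traces
// the object graph reachable from those roots while counting in-graph references, then drops the
// data of any node that can no longer be reached from outside the traced graph.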
fn collect_old_gen() {
    let _guard = if let Some(guard) = COLLECTION_MUTEX.try_lock() {
        guard
    } else {
        // Some other thread is running collection on the old gen.
        return;
    };

    let mut traced_nodes = IndexMap::with_capacity(OLD_GEN.len());
    OLD_GEN.retain(|node| {
        if let Ok(strong) = TryInto::<StrongNode>::try_into(node) {
            if strong.status.load(atomic::Ordering::Acquire) == Status::RecentlyDecremented {
                traced_nodes.insert(strong, NonZeroUsize::new(1).unwrap());
            }
        }
        false
    });

    let mut connectivity_graph = ConnectivityGraph::with_nodes(traced_nodes.len());

    let mut pending_nodes = connectivity_graph
        .node_identifiers()
        .collect::<VecDeque<_>>();

    while let Some(node_ix) = pending_nodes.pop_front() {
        trace_children(
            node_ix,
            &mut pending_nodes,
            &mut traced_nodes,
            &mut connectivity_graph,
        );
    }

    let mut live_nodes = vec![];
    let mut dead_nodes = HashMap::default();

    for (node_ix, (node, refs)) in traced_nodes.into_iter().enumerate() {
        if Arc::strong_count(&node) > refs.get() {
            // Even if this node becomes dead between the read of the strong count above and the
            // cmp/mark as live, we won't leak memory as the node will get added to the
            // young gen.
            node.status.store(Status::Live, atomic::Ordering::Release);
            live_nodes.push(node_ix);
        } else if node.status.load(atomic::Ordering::Acquire) != Status::Traced {
            // This node may be dead, but something accessed its internals between the time we
            // traced its children and now. We can't know whether the child graph is still the
            // same, so we can't attempt to drop its children. If the node became dead between
            // tracing and that access, it will end up in the young gen and can be collected
            // later.
            live_nodes.push(node_ix);
        } else {
            dead_nodes.insert(node_ix, node);
        }
    }

    for node_index in live_nodes {
        filter_live_node_children(&connectivity_graph, node_index, &mut dead_nodes);
    }

    for node in dead_nodes.values() {
        node.status.store(Status::Dead, atomic::Ordering::Release);
    }

    for (_, node) in dead_nodes {
        AtomicInner::drop_data(&node);
    }
}

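// Traces the children of `parent_ix`: every child visited gets an edge in the connectivity graph
// and has its traced reference count bumped; previously unseen children are queued for their own
// tracing pass. Parents whose data is locked by another thread are skipped entirely, which can
// only undercount references to their children and therefore never marks a live node as dead.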
fn trace_children(
    parent_ix: GraphIndex,
    pending_nodes: &mut VecDeque<GraphIndex>,
    traced_nodes: &mut TracedNodeList,
    connectivity_graph: &mut ConnectivityGraph,
) {
    let pin_parent = traced_nodes.get_index(parent_ix).unwrap().0.clone();
    let parent = {
        if let Some(_guard) = pin_parent.data.try_write() {
            // This must be guarded by an exclusive lock to prevent altering of the child graph
            // without marking the node dirty.
            pin_parent
                .status
                .store(Status::Traced, atomic::Ordering::Release);
        } else {
            // If we can't get exclusive access to the parent, it must be locked by some active
            // thread.
            //
            // That means it is live, and skipping its children only undercounts the references to
            // any nodes it owns, which guarantees we won't decide those nodes are dead.
            return;
        };

        pin_parent.data.read_recursive()
    };

    parent.visit_children(&mut GcVisitor {
        visitor: &mut |node| {
            match traced_nodes.entry(node.into()) {
                indexmap::map::Entry::Occupied(mut seen) => {
                    // We've already seen this node. We do a saturating add because it's ok to
                    // undercount references here and usize::max references is kind of a degenerate
                    // case.
                    *seen.get_mut() =
                        NonZeroUsize::new(seen.get().get().saturating_add(1)).unwrap();

                    connectivity_graph.add_edge(parent_ix, seen.index(), ());
                }
                indexmap::map::Entry::Vacant(unseen) => {
                    let child_ix = connectivity_graph.add_node(());
                    debug_assert_eq!(unseen.index(), child_ix);

                    connectivity_graph.add_edge(parent_ix, child_ix, ());

                    // 1 for the graph strong reference, 1 for the seen reference.
                    unseen.insert(NonZeroUsize::new(2).unwrap());
                    pending_nodes.push_back(child_ix);
                }
            };
        },
    });
}

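// Walks the connectivity graph from a live node and removes everything it can reach from
// `dead_nodes`, since a node reachable from a live node must itself stay alive.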
fn filter_live_node_children(
    graph: &ConnectivityGraph,
    node: GraphIndex,
    dead_nodes: &mut HashMap<GraphIndex, StrongNode>,
) {
    for child in graph.neighbors_slice(node) {
        if dead_nodes.remove(child).is_some() {
            filter_live_node_children(graph, *child, dead_nodes);
        }
    }
}