shredder/collector/mod.rs

mod alloc;
mod collect_impl;
mod data;
mod dropper;
mod trigger;

use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::spawn;

use crossbeam::channel::{self, Sender};
use once_cell::sync::Lazy;
use parking_lot::Mutex;

use crate::collector::alloc::GcAllocation;
use crate::collector::dropper::{BackgroundDropper, DropMessage};
use crate::collector::trigger::GcTrigger;
use crate::concurrency::atomic_protection::{APSInclusiveGuard, AtomicProtectingSpinlock};
use crate::concurrency::chunked_ll::{CLLItem, ChunkedLinkedList};
use crate::concurrency::lockout::{ExclusiveWarrant, Lockout, Warrant};
use crate::marker::GcDrop;
use crate::{Finalize, Scan, ToScan};

pub use crate::collector::data::{GcData, GcHandle, UnderlyingData};

/// Intermediate struct. A `Gc<T>` holds an `InternalGcRef`, which references a `GcHandle`.
/// There should be exactly one `GcHandle` per `Gc<T>`.
#[derive(Clone, Debug)]
pub struct InternalGcRef {
    handle_ref: CLLItem<GcHandle>,
}

impl InternalGcRef {
    pub(crate) fn new(handle_ref: CLLItem<GcHandle>) -> Self {
        Self { handle_ref }
    }

    pub(crate) fn invalidate(&self) {
        COLLECTOR.drop_handle(self);
    }

    pub(crate) fn data(&self) -> &Arc<GcData> {
        if let UnderlyingData::Fixed(data) = &self.handle_ref.v.underlying_data {
            data
        } else {
            panic!("Only fixed data has a usable `data` method")
        }
    }
}

/// We don't want to expose what specific warrant provider we're using
/// (this struct should be optimized away)
pub struct GcGuardWarrant {
    /// Stores the internal warrant. Only its `Drop` (run when this guard is released) is relevant
    _warrant: Warrant<Arc<GcData>>,
}
type GcExclusiveWarrant = ExclusiveWarrant<Arc<GcData>>;

pub struct Collector {
    /// shredder only allows one collection to proceed at a time
    gc_lock: Mutex<()>,
    /// this prevents atomic operations from happening during collection time
    atomic_spinlock: AtomicProtectingSpinlock,
    /// trigger decides when we should run a collection
    trigger: GcTrigger,
    /// dropping happens in a background thread. This struct lets us communicate with that thread
    dropper: BackgroundDropper,
    /// we run automatic gc in a background thread
    /// sending to this channel indicates that thread should check the trigger, then collect if the
    /// trigger indicates it should
    async_gc_notifier: Sender<()>,
    /// all the data we are managing plus metadata about what `Gc<T>`s exist
    tracked_data: TrackedData,
}

/// Stores metadata about each piece of tracked data, plus metadata about each handle
#[derive(Debug)]
struct TrackedData {
    /// we increment this whenever we collect
    current_collection_number: AtomicU64,
    /// a set storing metadata on the live data the collector is managing
    data: ChunkedLinkedList<GcData>,
    /// a set storing metadata on each live handle (`Gc<T>`) the collector is managing
    handles: ChunkedLinkedList<GcHandle>,
}

// TODO(issue): https://github.com/Others/shredder/issues/7

impl Collector {
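    /// Create the collector and spawn the background thread that services
    /// asynchronous GC notifications.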
    fn new() -> Arc<Self> {
        let (async_gc_notifier, async_gc_receiver) = channel::bounded(1);

        let res = Arc::new(Self {
            gc_lock: Mutex::default(),
            atomic_spinlock: AtomicProtectingSpinlock::default(),
            trigger: GcTrigger::default(),
            dropper: BackgroundDropper::new(),
            async_gc_notifier,
            tracked_data: TrackedData {
                // This is janky, but we subtract one from the collection number
                // to get a previous collection number in `do_collect`
                //
                // We also use 0 as a sentinel value for newly allocated data
                //
                // Together that implies we need to start the collection number sequence at 2, not 1
                current_collection_number: AtomicU64::new(2),
                data: ChunkedLinkedList::new(),
                handles: ChunkedLinkedList::new(),
            },
        });

        // The async Gc thread deals with background Gc'ing
        let async_collector_ref = Arc::downgrade(&res);
        spawn(move || {
            // An `Err` value means the channel is disconnected and will never recover
            while async_gc_receiver.recv().is_ok() {
                if let Some(collector) = async_collector_ref.upgrade() {
                    collector.check_then_collect();
                }
            }
        });

        res
    }

    #[inline]
    fn notify_async_gc_thread(&self) {
        // Note: We only send if there is room in the channel
        // If there's already a notification pending, the async thread has already been notified
        select! {
            send(self.async_gc_notifier, ()) -> res => {
                if let Err(e) = res {
                    error!("Could not notify async gc thread: {}", e);
                }
            },
            default => (),
        };
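        // (This `select!` with a `default` arm is effectively a non-blocking send; a rough
        // sketch of the equivalent would be `let _ = self.async_gc_notifier.try_send(());`,
        // minus the error logging on disconnection.)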
    }

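    /// Track a value whose destructor should run when the data is collected.
    ///
    /// Returns the internal handle plus a pointer to the heap allocation holding the data.
    /// A minimal usage sketch (assuming the public `Gc` wrapper calls through `COLLECTOR`):
    ///
    /// ```ignore
    /// let (internal_ref, ptr) = COLLECTOR.track_with_drop(vec![1, 2, 3]);
    /// ```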
    pub fn track_with_drop<T: Scan + GcDrop>(&self, data: T) -> (InternalGcRef, *const T) {
        let (gc_data_ptr, heap_ptr) = GcAllocation::allocate_with_drop(data);
        self.track(gc_data_ptr, heap_ptr)
    }

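    /// Track a value that should simply be deallocated, without running its destructor,
    /// when it is collected.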
    pub fn track_with_no_drop<T: Scan>(&self, data: T) -> (InternalGcRef, *const T) {
        let (gc_data_ptr, heap_ptr) = GcAllocation::allocate_no_drop(data);
        self.track(gc_data_ptr, heap_ptr)
    }

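    /// Track a value whose `Finalize` implementation should be run when it is collected.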
    pub fn track_with_finalization<T: Finalize + Scan>(
        &self,
        data: T,
    ) -> (InternalGcRef, *const T) {
        let (gc_data_ptr, heap_ptr) = GcAllocation::allocate_with_finalization(data);
        self.track(gc_data_ptr, heap_ptr)
    }

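    /// Track a value that has already been boxed (possibly unsized), taking ownership of the box.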
    pub fn track_boxed_value<T: Scan + ToScan + GcDrop + ?Sized>(
        &self,
        data: Box<T>,
    ) -> (InternalGcRef, *const T) {
        let (gc_data_ptr, heap_ptr) = GcAllocation::from_box(data);
        self.track(gc_data_ptr, heap_ptr)
    }

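    /// Common tracking path: register the allocation and a fresh handle with the collector,
    /// then nudge the async GC thread since the allocation heuristic may have changed.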
    fn track<T: Scan + ?Sized>(
        &self,
        gc_data_ptr: GcAllocation,
        heap_ptr: *const T,
    ) -> (InternalGcRef, *const T) {
        let new_data_arc = Arc::new(GcData {
            underlying_allocation: gc_data_ptr,
            lockout: Lockout::new(),
            deallocated: AtomicBool::new(false),
            last_marked: AtomicU64::new(0),
        });

        let new_handle_arc = Arc::new(GcHandle {
            underlying_data: UnderlyingData::Fixed(new_data_arc.clone()),
            last_non_rooted: AtomicU64::new(0),
        });

        // Insert handle before data -- don't want the data to be observable before there is a relevant handle
        let new_handle = self.tracked_data.handles.insert(new_handle_arc);

        self.tracked_data.data.insert(new_data_arc);

        let res = (InternalGcRef::new(new_handle), heap_ptr);

        // When we allocate, the heuristic for whether we need to GC might change
        self.notify_async_gc_thread();

        res
    }

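    /// Stop tracking a handle; called when the corresponding `Gc<T>` goes away.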
    pub fn drop_handle(&self, handle: &InternalGcRef) {
        self.tracked_data.handles.remove(&handle.handle_ref);

        // NOTE: This is worth experimenting with
        // self.notify_async_gc_thread();
    }

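    /// Create a new handle pointing at the same underlying (fixed) data as `handle`.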
    pub fn clone_handle(&self, handle: &InternalGcRef) -> InternalGcRef {
        let new_handle_arc = Arc::new(GcHandle {
            underlying_data: UnderlyingData::Fixed(handle.data().clone()),
            last_non_rooted: AtomicU64::new(0),
        });

        let new_handle = self.tracked_data.handles.insert(new_handle_arc);

        InternalGcRef {
            handle_ref: new_handle,
        }
    }

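    /// Create a fresh handle for the given `GcData`.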
    pub fn handle_from_data(&self, underlying_data: Arc<GcData>) -> InternalGcRef {
        let new_handle_arc = Arc::new(GcHandle {
            underlying_data: UnderlyingData::Fixed(underlying_data),
            last_non_rooted: AtomicU64::new(0),
        });

        let new_handle = self.tracked_data.handles.insert(new_handle_arc);

        InternalGcRef {
            handle_ref: new_handle,
        }
    }

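    /// Create a handle whose underlying data is read dynamically through an `AtomicPtr`.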
    pub fn new_handle_for_atomic(&self, atomic_ptr: Arc<AtomicPtr<GcData>>) -> InternalGcRef {
        let new_handle_arc = Arc::new(GcHandle {
            underlying_data: UnderlyingData::DynamicForAtomic(atomic_ptr),
            last_non_rooted: AtomicU64::new(0),
        });

        let new_handle = self.tracked_data.handles.insert(new_handle_arc);

        InternalGcRef {
            handle_ref: new_handle,
        }
    }

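    /// Get a warrant guarding the data behind `handle`, panicking if the data has already
    /// been deallocated (which can only happen while destructors are running).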
    #[allow(clippy::unused_self)]
    pub fn get_data_warrant(&self, handle: &InternalGcRef) -> GcGuardWarrant {
        // This check is only necessary in the destructors
        // The destructor thread will always set the `deallocated` flag before deallocating data
        if let UnderlyingData::Fixed(fixed) = &handle.handle_ref.v.underlying_data {
            let data_deallocated = fixed.deallocated.load(Ordering::SeqCst);

            if data_deallocated {
                panic!("Tried to access data through a Gc, but the internal state was corrupted (perhaps you're manipulating a Gc<?> in a destructor?)");
            }

            GcGuardWarrant {
                _warrant: Lockout::get_warrant(fixed.clone()),
            }
        } else {
            panic!("Cannot get data warrant for atomic data!")
        }
    }

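    /// Approximate number of pieces of data currently tracked by the collector.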
    pub fn tracked_data_count(&self) -> usize {
        self.tracked_data.data.estimate_len()
    }

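    /// Approximate number of live handles (`Gc<T>`s) currently tracked by the collector.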
    pub fn handle_count(&self) -> usize {
        self.tracked_data.handles.estimate_len()
    }

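    /// Adjust the trigger's threshold for kicking off an automatic collection.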
    pub fn set_gc_trigger_percent(&self, new_trigger_percent: f32) {
        self.trigger.set_trigger_percent(new_trigger_percent);
    }

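    /// Block until the background drop thread has processed every message sent to it so far.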
    pub fn synchronize_destructors(&self) {
        // We send the drop thread a message containing a channel, then wait for it to respond
        // This has the effect of synchronizing this thread with the drop thread

        let (sender, receiver) = channel::bounded(1);
        let drop_msg = DropMessage::SyncUp(sender);
        {
            self.dropper
                .send_msg(drop_msg)
                .expect("drop thread should be infallible!");
        }
        receiver.recv().expect("drop thread should be infallible!");
    }

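    /// Take the inclusive side of the atomic-protection spinlock, waiting out any
    /// in-progress collection if it cannot be taken immediately.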
    #[inline]
    pub fn get_collection_blocker_spinlock(&self) -> APSInclusiveGuard<'_> {
        loop {
            if let Some(inclusive_guard) = self.atomic_spinlock.lock_inclusive() {
                return inclusive_guard;
            }
            // block on the collector if we can't get the APS guard
            let collector_block = self.gc_lock.lock();
            drop(collector_block);
        }
    }

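    /// Consult the trigger and, if it says a collection is warranted, run one while
    /// holding the GC lock. Returns `true` if a collection actually ran.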
    pub fn check_then_collect(&self) -> bool {
        let gc_guard = self.gc_lock.lock();

        let current_data_count = self.tracked_data.data.estimate_len();
        let current_handle_count = self.tracked_data.handles.estimate_len();
        if self
            .trigger
            .should_collect(current_data_count, current_handle_count)
        {
            self.do_collect(gc_guard);
            true
        } else {
            false
        }
    }

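    /// Unconditionally run a collection, regardless of what the trigger says.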
    pub fn collect(&self) {
        let gc_guard = self.gc_lock.lock();
        self.do_collect(gc_guard);
    }
}

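/// The global collector instance, created lazily on first use.
///
/// A minimal usage sketch of this internal API (the public crate surface is expected to
/// wrap these calls):
///
/// ```ignore
/// COLLECTOR.set_gc_trigger_percent(0.75);
/// COLLECTOR.collect();                 // force a collection right now
/// COLLECTOR.synchronize_destructors(); // wait for the background drop thread to catch up
/// let live = COLLECTOR.tracked_data_count();
/// ```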
pub static COLLECTOR: Lazy<Arc<Collector>> = Lazy::new(Collector::new);

#[cfg(test)]
pub(crate) fn get_mock_handle() -> InternalGcRef {
    use crate::marker::GcSafe;
    use crate::Scanner;

    pub(crate) struct MockAllocation;
    unsafe impl Scan for MockAllocation {
        fn scan(&self, _: &mut Scanner<'_>) {}
    }
    unsafe impl GcSafe for MockAllocation {}

    let mock_scannable: Box<dyn Scan> = Box::new(MockAllocation);

    // This leaks some memory...
    let mock_master_list = ChunkedLinkedList::new();

    // Note: Here we assume a random u64 is unique. That's hacky, but is fine for testing :)
    let handle_arc = Arc::new(GcHandle {
        underlying_data: UnderlyingData::Fixed(Arc::new(GcData {
            underlying_allocation: unsafe { GcAllocation::raw(Box::into_raw(mock_scannable)) },
            lockout: Lockout::new(),
            deallocated: AtomicBool::new(false),
            last_marked: AtomicU64::new(0),
        })),
        last_non_rooted: AtomicU64::new(0),
    });

    InternalGcRef::new(mock_master_list.insert(handle_arc))
}