housekeeping 0.0.3

A concurrent memory reclaimer for periodic cleanups.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
//! Guards for accessing concurrent memory.

use core::mem::{ManuallyDrop, MaybeUninit};

use alloc::sync::Arc;

use crate::{
    Cleanups, SimpleBatch,
    loomish::cell::Cell,
    qsbr,
    stack::{self, Stack},
};

//----------- Guard ------------------------------------------------------------

/// A guard for concurrent memory.
///
/// A `Guard` guarantees that concurrent memory can be safely accessed; that
/// concurrent objects visible to the current thread will not be deallocated. It
/// provides functionality for deferring such deallocations and executing them
/// when objects cannot be observed.
///
/// Every `Guard` is associated with a [`Cleanups`]; it only guarantees safety
/// for concurrent objects cleaned up through guards to the same instance of
/// `Cleanups`. Distinct instances of `Cleanups` are completely unrelated.
///
/// The type parameter `B` is the batch type used to group deferred cleanups;
/// it defaults to [`SimpleBatch`].
///
/// **NOTE:** [`Guard::refresh()`] must be called periodically. If a `Guard` is
/// not refreshed for a long time, it will prevent cleanups deferred by other
/// guards from being executed.
///
/// See [the crate-level documentation](crate) for more information, including
/// examples and a usage guide.
///
/// ## Usage
///
/// ```
/// # use std::{mem, sync::{atomic::{AtomicPtr, Ordering}, Arc}};
/// # use housekeeping::Cleanups;
/// let cleanups = Arc::new(Cleanups::new());
/// let data = AtomicPtr::new(Box::into_raw(Box::new(42)));
///
/// // Register a guard, protecting the current thread.
/// let mut guard = cleanups.register();
///
/// // Dereference pointers to memory protected by the guard.
/// // SAFETY: 'data' is protected by '&guard'.
/// let data_ptr: *mut i32 = data.load(Ordering::Acquire);
/// assert_eq!(unsafe { &*data_ptr }, &42);
///
/// // Refresh the guard so that older objects can be deallocated.
/// // This should be called periodically.
/// guard.refresh();
/// // NOTE: Objects discovered before the refresh must not be accessed.
/// //assert_eq!(unsafe { &*data_ptr }, &42); // invalid!
///
/// // Defer deallocations of memory protected by the guard.
/// // Remember to remove such objects from global view first.
/// let new_data_ptr = Box::into_raw(Box::new(23));
/// let old_data_ptr = data.swap(new_data_ptr, Ordering::AcqRel);
/// // SAFETY:
/// // - '*mut i32' can be sent between threads safely.
/// // - The closure is 'move' and is 'static.
/// unsafe { guard.defer_unchecked(move || {
///     mem::drop(Box::from_raw(old_data_ptr));
/// }) };
/// #
/// # mem::drop(unsafe { Box::from_raw(data.into_inner()) });
/// ```
///
/// ## Implementation
///
/// `Guard` is a convenience wrapper around [`RawGuard`], referencing the
/// associated `Cleanups` through an [`Arc`]. It exposes the same methods
/// as `RawGuard` but implicitly provides the `Cleanups` parameter. Prefer
/// `RawGuard` if `Arc<Cleanups>` is not acceptable.
pub struct Guard<B = SimpleBatch> {
    /// The associated [`Cleanups`].
    ///
    /// This is the instance `raw` was registered against; it is passed to
    /// every `RawGuard` method that requires the associated `Cleanups`.
    cleanups: Arc<Cleanups<B>>,

    /// The underlying raw guard.
    ///
    /// Wrapped in [`ManuallyDrop`] because deregistering consumes the raw
    /// guard by value: the [`Drop`] implementation and
    /// [`Guard::into_raw_parts()`] both move it out of the field.
    raw: ManuallyDrop<RawGuard<B>>,
}

impl<B> Guard<B> {
    /// Register against the specified [`Cleanups`].
    ///
    /// The `Cleanups` will be synchronized with, so that objects that have
    /// already been deallocated will not be visible. After this, the calling
    /// thread can access concurrent memory covered by the `Cleanups`.
    ///
    /// [`Cleanups::register()`] is a convenient shorthand for this.
    ///
    /// ## Panics
    ///
    /// Panics if `usize::MAX / 2` (or more) guards are registered against a
    /// single `Cleanups`.
    pub fn new(cleanups: Arc<Cleanups<B>>) -> Self {
        Self {
            raw: ManuallyDrop::new(RawGuard::new(&cleanups)),
            cleanups,
        }
    }

    /// Refresh the guard.
    ///
    /// All references protected by this guard are invalidated. The thread will
    /// be synchronized with the associated [`Cleanups`], potentially unlocking
    /// previously deferred cleanups (i.e. making them ready to execute).
    ///
    /// Sometimes, old batches (that were ready to execute and reuse, but were
    /// not reused in time) will be returned, that the caller should execute
    /// immediately. For the default batch type, [`SimpleBatch`], execution is
    /// automatic and the returned value can be ignored.
    ///
    /// If a local batch exists, it will be extracted and sent.
    pub fn refresh(&mut self) -> Option<impl IntoIterator<Item = B> + use<B>> {
        // SAFETY: 'self.raw' is associated with 'self.cleanups'.
        unsafe { self.raw.refresh(&self.cleanups) }
    }

    /// Drop the guard, returning batches needing cleanup.
    ///
    /// The guard will be disconnected from the [`Cleanups`]. If any batches
    /// were ready to be reused, they will be returned, and the caller should
    /// execute them immediately. For the default batch type, [`SimpleBatch`],
    /// execution is automatic and the returned value can be ignored.
    ///
    /// If a local batch exists, it will be extracted and sent.
    pub fn drop_and_take(self) -> Option<impl IntoIterator<Item = B> + use<B>> {
        let (cleanups, raw) = self.into_raw_parts();
        // SAFETY: 'self.raw' is associated with 'self.cleanups'.
        unsafe { raw.drop_and_take(&cleanups) }
    }

    /// Deconstruct a [`Guard`] into its raw parts.
    ///
    /// The returned [`RawGuard`] is associated with the returned [`Cleanups`].
    /// This is important for safely using the guard.
    pub fn into_raw_parts(self) -> (Arc<Cleanups<B>>, RawGuard<B>) {
        let this = MaybeUninit::new(self);
        let cleanups = unsafe { (&raw const (*this.as_ptr()).cleanups).read() };
        let raw = unsafe { (&raw const (*this.as_ptr()).raw).read() };
        (cleanups, ManuallyDrop::into_inner(raw))
    }

    /// Reconstruct a [`Guard`] from its raw parts.
    ///
    /// ## Safety
    ///
    /// `Guard::from_raw_parts(cleanups, raw)` is sound if and only if `raw`
    /// is associated with `cleanups`.
    pub const unsafe fn from_raw_parts(cleanups: Arc<Cleanups<B>>, raw: RawGuard<B>) -> Self {
        Self {
            cleanups,
            raw: ManuallyDrop::new(raw),
        }
    }
}

impl<B> Drop for Guard<B> {
    /// Drop a [`Guard`].
    ///
    /// The guard will be deregistered from the associated [`Cleanups`], using
    /// [`RawGuard::drop()`]. This may cause the cleanup of some batches to
    /// be delayed -- consider using [`Self::drop_and_take()`] if batches can
    /// be executed.
    ///
    /// If a local batch exists, it will be extracted and sent.
    fn drop(&mut self) {
        // SAFETY: 'self' is being dropped so 'self.raw' will not be used.
        let raw = unsafe { ManuallyDrop::take(&mut self.raw) };
        // SAFETY: 'self.raw' is associated with 'self.cleanups'.
        unsafe { raw.drop(&self.cleanups) };
    }
}

/// # Working with [`SimpleBatch`]
///
/// The methods in this `impl` block are conveniences for the common case
/// where the batch type is [`SimpleBatch`]. They do not rely on crate-internal
/// functionality; callers can implement equivalents (e.g. on a wrapper type)
/// for their own batch type.
impl Guard<SimpleBatch> {
    /// Defer a cleanup action.
    ///
    /// The provided closure, which cleans up some kind of shared resource, will
    /// be deferred until the resource is unused.
    pub fn defer<F: FnOnce() + Send + 'static>(&self, f: F) {
        // SAFETY: 'f' is 'Send' and ''static' as per the trait bounds.
        unsafe { Self::defer_unchecked(self, f) }
    }

    /// Defer a cleanup action.
    ///
    /// The provided closure, which cleans up some kind of shared resource, will
    /// be deferred until the resource is unused.
    ///
    /// The closure is added to the local batch when that batch has room. A
    /// full local batch is sent off first; the closure then goes into a reused
    /// batch (after its old cleanups are executed) or, failing that, into a
    /// newly created one. The receiving batch becomes the local batch.
    ///
    /// ## Safety
    ///
    /// `self.defer_unchecked(f)` is sound if and only if:
    /// - `f` is sound to send (by value) between threads, i.e. it satisfies the
    ///   requirements of [`Send`],
    /// - `f` can be called at any time, i.e. it satisfies `'static`.
    pub unsafe fn defer_unchecked<F: FnOnce()>(&self, f: F) {
        // Holds a reused batch that turned out not to be needed, so that it is
        // only dropped when this function returns.
        let _spare: SimpleBatch;

        // Pick the batch that will receive 'f', along with the reason that
        // batch is guaranteed to accept it.
        let (mut target, has_room) = 'pick: {
            // Prefer the local batch; if it is already full, send it off.
            if let Some(local) = self.take_batch() {
                if !local.is_full() {
                    break 'pick (local, "the batch is not full");
                }
                self.send_batch(local);
            }

            // Try reusing an old batch; otherwise, create a new one.
            let Some(mut recycled) = self.reuse_batch() else {
                break 'pick (SimpleBatch::new(), "the batch was just created");
            };
            recycled.execute();

            // Check if a new local batch was created during execution.
            if let Some(fresh) = self.take_batch() {
                if !fresh.is_full() {
                    _spare = recycled;
                    break 'pick (fresh, "the batch is not full");
                }
                self.send_batch(fresh);
            }

            (recycled, "the batch has been cleared")
        };

        // SAFETY: As per the caller, 'f' can be sent to other threads.
        unsafe { target.add_unchecked(f) }.unwrap_or_else(|_| unreachable!("{has_room}"));

        // The local batch was taken above and nothing since could have
        // replaced it, so storing cannot fail.
        self.store_batch(target)
            .unwrap_or_else(|_| unreachable!("no user-defined code was called"));
    }
}

/// # Batch Actions
///
/// Cleanups are grouped into _batches_. A `Guard` can hold a local batch,
/// into which cleanups can be added; once it is adequately sized, it can be
/// sent into the `Cleanups`. When a new batch is required, an old batch can be
/// retrieved from the `Cleanups`; its cleanups can be executed and the batch
/// instance reused.
///
/// These are lower-level functions that only matter for custom batch
/// types. When [`SimpleBatch`] is used, focus on [`Self::defer()`] and
/// [`Self::defer_unchecked()`].
impl<B> Guard<B> {
    /// Store a local batch.
    ///
    /// If a local batch already exists, it will not be modified, and the given
    /// batch will be returned.
    #[inline]
    pub fn store_batch(&self, batch: B) -> Result<(), B> {
        self.raw.store_batch(batch)
    }

    /// Take the local batch, if any.
    ///
    /// If a local batch is stored, it will be extracted and returned.
    #[inline]
    pub fn take_batch(&self) -> Option<B> {
        self.raw.take_batch()
    }

    /// Send a batch to the [`Cleanups`].
    ///
    /// The given batch of deferred cleanups will be sent to the overall
    /// [`Cleanups`] immediately; it will be stored internally until its
    /// cleanups are ready to execute.
    ///
    /// This method ignores the local batch entirely.
    pub fn send_batch(&self, batch: B) {
        // SAFETY: 'self.raw' is associated with 'self.cleanups'.
        unsafe { self.raw.send_batch(batch, &self.cleanups) }
    }

    /// Reuse an old batch, if one is available.
    ///
    /// If an old batch can be found, it will be returned. Its cleanups should
    /// be executed and cleared out; the batch can then be reused.
    ///
    /// This method ignores the local batch entirely.
    pub fn reuse_batch(&self) -> Option<B> {
        // SAFETY: 'self.raw' is associated with 'self.cleanups'.
        unsafe { self.raw.reuse_batch(&self.cleanups) }
    }
}

//----------- RawGuard --------------------------------------------------------

/// A raw guard for deferring cleanups.
///
/// A `RawGuard` guarantees that concurrent memory can be safely accessed; that
/// concurrent objects visible to the current thread will not be deallocated. It
/// provides functionality for deferring such deallocations and executing them
/// when objects cannot be observed.
///
/// Every `RawGuard` is associated with a [`Cleanups`]; it only guarantees
/// safety for concurrent objects cleaned up through guards to the same instance
/// of `Cleanups`. Distinct instances of `Cleanups` are completely unrelated.
/// `RawGuard` does not hold a reference to the `Cleanups`; the user must hold
/// a reference for the guard and pass it into all relevant methods. Prefer
/// using [`Guard`] if the `Cleanups` can be stored in an [`Arc`].
///
/// The type parameter `B` is the batch type used to group deferred cleanups.
///
/// **NOTE:** [`RawGuard::refresh()`] must be called periodically. If a
/// `RawGuard` is not refreshed for a long time, it will prevent cleanups
/// deferred by other guards from being executed.
///
/// See [the crate-level documentation](crate) for more information, including
/// examples and a usage guide.
pub struct RawGuard<B> {
    /// The QSBR user.
    ///
    /// Obtained by registering with the schedule of the associated
    /// [`Cleanups`]. Wrapped in [`ManuallyDrop`]; it is moved out and
    /// deregistered explicitly by [`RawGuard::drop()`] and
    /// [`RawGuard::drop_and_take()`].
    user: ManuallyDrop<qsbr::User>,

    /// The local batch.
    ///
    /// Held in a [`Cell`] so that it can be stored and taken through a shared
    /// reference to the guard.
    batch: Cell<Option<B>>,
}

impl<B> RawGuard<B> {
    /// Construct a new [`RawGuard`].
    ///
    /// The provided [`Cleanups`] will be registered against. This instance of
    /// [`Cleanups`] must be passed to all methods on `self`.
    ///
    /// ## Panics
    ///
    /// Panics if `usize::MAX / 2` (or more) guards are registered against a
    /// single [`Cleanups`].
    pub fn new(cleanups: &Cleanups<B>) -> Self {
        // Register with the schedule of 'cleanups'; the resulting user handle
        // is what ties this guard to that 'Cleanups'.
        let user = cleanups.schedule.register();
        Self {
            user: ManuallyDrop::new(user),
            // A new guard starts without a local batch.
            batch: Cell::new(None),
        }
    }

    /// Refresh the guard.
    ///
    /// All references protected by this guard are invalidated. This guard
    /// will synchronize with the [`Cleanups`], potentially unlocking previously
    /// deferred cleanups (i.e. making them ready to execute).
    ///
    /// Old batches of cleanups are reused through [`Self::reuse_batch()`]. In
    /// case some such batches were not reused in time, they will be returned
    /// here for immediate execution and cleanup.
    ///
    /// If a local batch exists, it will be extracted and sent.
    ///
    /// ## Safety
    ///
    /// `self.refresh(cleanups)` is sound if and only if:
    /// - `self` is associated with `cleanups`.
    pub unsafe fn refresh(
        &mut self,
        cleanups: &Cleanups<B>,
    ) -> Option<impl IntoIterator<Item = B> + use<B>> {
        // If a local batch exists, send it.
        if let Some(batch) = self.take_batch() {
            // SAFETY: As per the caller, 'self' is associated with 'cleanups'.
            unsafe { self.send_batch(batch, cleanups) };
        }

        // NOTE(review): 'progress()' presumably reports the phase this guard
        // just left (if it moved on at all), and 'leave_last()' presumably
        // yields 'Some' only if this guard was the last one to leave it --
        // confirm against the 'qsbr' module.
        if let Some(leaving) = self.user.progress(&cleanups.schedule)
            && let Some(_last_leaving) = leaving.leave_last()
        {
            // The per-phase stacks, selected by the user's current index.
            let deferred = &cleanups.deferred[self.user.index()];
            let reused = &cleanups.reused[self.user.index()];
            let slots = &cleanups.slots[self.user.index()];

            // Clean up the phase.
            //
            // Assuming 'swap()' returns the stack's previous contents: the
            // batches that were deferred become the reusable ones, and the
            // batches that were reusable (but never reused) are handed to the
            // caller. The previous slot stack is replaced by an empty one and
            // discarded here.
            //
            // SAFETY (all three): NOTE(review): the contract of 'swap()' is
            // not visible here; presumably it relies on this guard being the
            // last to leave the phase -- confirm against the 'stack' module.
            let deferred = unsafe { deferred.swap(Stack::new()) };
            let reused = unsafe { reused.swap(deferred) };
            let _ = unsafe { slots.swap(Stack::new()) };

            Some(reused)
        } else {
            None
        }
    }

    /// Drop the guard.
    ///
    /// The guard will be disconnected from the [`Cleanups`]. This may cause
    /// the cleanup of some batches to be delayed -- consider using
    /// [`Self::drop_and_take()`] if batches can be executed.
    ///
    /// If a local batch exists, it will be extracted and sent.
    ///
    /// ## Safety
    ///
    /// `self.drop(cleanups)` is sound if and only if:
    /// - `self` is associated with `cleanups`.
    pub unsafe fn drop(self, cleanups: &Cleanups<B>) {
        // If a local batch exists, send it.
        if let Some(batch) = self.take_batch() {
            // SAFETY: As per the caller, 'self' is associated with 'cleanups'.
            unsafe { self.send_batch(batch, cleanups) };
        }

        // Read the index before the user is consumed by 'deregister()'.
        let index = self.user.index();
        let user = ManuallyDrop::into_inner(self.user);
        let leaving = user.deregister(&cleanups.schedule);
        // NOTE(review): presumably 'Some' only if this guard was the last one
        // to leave the phase -- confirm against the 'qsbr' module.
        if let Some(_last_leaving) = leaving.leave_last() {
            let deferred = &cleanups.deferred[index];
            let reused = &cleanups.reused[index];
            let slots = &cleanups.slots[index];

            // Clean up the phase.
            //
            // Nothing can be returned to the caller here, so the batches still
            // waiting to be reused are first moved onto the deferred stack;
            // the swaps below then place the combined set on the reusable
            // stack instead of discarding any batch.
            //
            // SAFETY (all below): NOTE(review): the contracts of 'pop()',
            // 'push()' and 'swap()' are not visible here -- confirm against
            // the 'stack' module.
            while let Some(node) = unsafe { reused.pop() } {
                unsafe { deferred.push(node) };
            }
            let deferred = unsafe { deferred.swap(Stack::new()) };
            let _ = unsafe { reused.swap(deferred) };
            let _ = unsafe { slots.swap(Stack::new()) };
        }
    }

    /// Drop the guard, returning batches needing cleanup.
    ///
    /// The guard will be disconnected from the [`Cleanups`]. If any batches
    /// were ready to be reused, they will be returned.
    ///
    /// If a local batch exists, it will be extracted and sent.
    ///
    /// ## Safety
    ///
    /// `self.drop_and_take(cleanups)` is sound if and only if:
    /// - `self` is associated with `cleanups`.
    pub unsafe fn drop_and_take(
        self,
        cleanups: &Cleanups<B>,
    ) -> Option<impl IntoIterator<Item = B> + use<B>> {
        // If a local batch exists, send it.
        if let Some(batch) = self.take_batch() {
            // SAFETY: As per the caller, 'self' is associated with 'cleanups'.
            unsafe { self.send_batch(batch, cleanups) };
        }

        // Read the index before the user is consumed by 'deregister()'.
        let index = self.user.index();
        let user = ManuallyDrop::into_inner(self.user);
        let leaving = user.deregister(&cleanups.schedule);
        // NOTE(review): presumably 'Some' only if this guard was the last one
        // to leave the phase -- confirm against the 'qsbr' module.
        if let Some(_last_leaving) = leaving.leave_last() {
            let deferred = &cleanups.deferred[index];
            let reused = &cleanups.reused[index];
            let slots = &cleanups.slots[index];

            // Clean up the phase.
            //
            // Same rotation as in 'refresh()': deferred batches become the
            // reusable ones, and the batches that were reusable are returned
            // to the caller for execution.
            //
            // SAFETY (all three): NOTE(review): the contract of 'swap()' is
            // not visible here -- confirm against the 'stack' module.
            let deferred = unsafe { deferred.swap(Stack::new()) };
            let reused = unsafe { reused.swap(deferred) };
            let _ = unsafe { slots.swap(Stack::new()) };

            Some(reused)
        } else {
            None
        }
    }
}

/// # Batch Actions
impl<B> RawGuard<B> {
    /// Store a local batch.
    ///
    /// If a local batch is already present, it is left untouched and the given
    /// batch is handed back as the error value.
    pub fn store_batch(&self, batch: B) -> Result<(), B> {
        // Inspect the cell by temporarily emptying it.
        if let Some(existing) = self.batch.take() {
            // Occupied: restore the existing batch and reject the new one.
            self.batch.set(Some(existing));
            return Err(batch);
        }

        self.batch.set(Some(batch));
        Ok(())
    }

    /// Take the local batch, if any.
    ///
    /// A stored local batch is removed from the guard and returned, leaving
    /// the guard without a local batch.
    pub fn take_batch(&self) -> Option<B> {
        self.batch.replace(None)
    }

    /// Send a batch to the [`Cleanups`].
    ///
    /// The given batch of deferred cleanups is sent to the overall
    /// [`Cleanups`] immediately, where it is stored internally until its
    /// cleanups are ready to execute.
    ///
    /// This method ignores the local batch entirely.
    ///
    /// ## Safety
    ///
    /// `self.send_batch(batch, cleanups)` is sound if and only if:
    /// - `self` is associated with `cleanups`.
    pub unsafe fn send_batch(&self, batch: B, cleanups: &Cleanups<B>) {
        let index = self.user.index();

        // Recycle an empty node if one is available; otherwise make a new one.
        // SAFETY: NOTE(review): presumably justified by 'self' being
        // associated with 'cleanups' -- confirm against the 'stack' module.
        let slot = unsafe {
            match cleanups.slots[index].pop() {
                Some(recycled) => recycled,
                None => stack::Node::new(MaybeUninit::uninit()),
            }
        };

        // Fill the node, after which it can be viewed as holding a 'B'.
        // SAFETY: NOTE(review): 'slot' was just popped or created above and
        // has not been shared yet -- confirm it is valid for writes.
        unsafe { (*slot.as_ptr()).item.write(batch) };
        let node = slot.cast::<stack::Node<B>>();

        // SAFETY: NOTE(review): see the note on 'pop()' above.
        unsafe { cleanups.deferred[index].push(node) };
    }

    /// Reuse an old batch, if one is available.
    ///
    /// If an old batch can be found, it is returned. Its cleanups should be
    /// executed and cleared out; the batch can then be reused.
    ///
    /// This method ignores the local batch entirely.
    ///
    /// ## Safety
    ///
    /// `self.reuse_batch(cleanups)` is sound if and only if:
    /// - `self` is associated with `cleanups`.
    pub unsafe fn reuse_batch(&self, cleanups: &Cleanups<B>) -> Option<B> {
        let index = self.user.index();

        // SAFETY: NOTE(review): presumably justified by 'self' being
        // associated with 'cleanups' -- confirm against the 'stack' module.
        let node = unsafe { cleanups.reused[index].pop() }?;

        // Move the batch out, then keep the emptied node for a later send.
        let slot = node.cast::<stack::Node<MaybeUninit<B>>>();
        // SAFETY: NOTE(review): nodes on the reusable stack are presumably
        // always initialized, having been filled by 'send_batch()' -- confirm.
        let batch = unsafe { (*slot.as_ptr()).item.assume_init_read() };
        // SAFETY: NOTE(review): see the note on 'pop()' above.
        unsafe { cleanups.slots[index].push(slot) };

        Some(batch)
    }
}