cycle_ptr 0.1.0

Smart pointers, with cycles
//! Define a way for users of the library to control execution of the garbage collector.
use crate::errors::{Error, ErrorEnum};
use crate::generation::MTGeneration;
use crate::generation::defer_gc::maybe_defer_sync_task;
use crate::generation::stats::GcStats;
use std::fmt;
use std::marker::PhantomData;
use std::sync::mpsc;
use std::sync::{RwLock, Weak};
use std::thread;
use std::thread::JoinHandle;

/// Function-pointer used to refer to a task.
type TaskPtr = Box<dyn Send + Sync + Fn(GcTask)>;

/// Global callback for thread-safe garbage collector tasks ([GcTask]).
static GC_TASK_FN: RwLock<Option<TaskPtr>> = RwLock::new(None);

/// A garbage collection task.
///
/// This task is used to ensure a garbage collection is run,
/// but also holds on to the run, allowing you to control when it runs.
/// (Or you can [std::mem::forget] it, to never run the garbage collector and leak the memory.)
///
/// # Example
///
/// ```
/// use cycle_ptr::prelude::*;
/// use cycle_ptr::sync::{GcTask, GcMtMemberPtr, Metadata, GcMtPtr};
/// use std::sync::mpsc;
/// use std::sync::RwLock;
/// use std::thread;
///
/// /// Struct that'll print a text when it is dropped.
/// struct PrintOnDrop {
///     text: &'static str,
///
///     /// Include a self-reference, to force the garbage collector to do work.
///     _self_reference: RwLock<Option<GcMtMemberPtr<PrintOnDrop>>>,
///     metadata: Metadata,
/// }
///
/// impl PrintOnDrop {
///     fn new(text: &'static str) -> GcMtPtr<PrintOnDrop> {
///         let ptr = GcMtPtr::new(|metadata| PrintOnDrop {
///             text,
///             _self_reference: RwLock::new(None),
///             metadata,
///         });
///         /// Create a self reference.
///         /// This will force the garbage collector task
///         /// to confirm the object is no longer reachable.
///         let _ = ptr
///             ._self_reference
///             .write()
///             .expect("write access should never fail because we have the only reference")
///             .insert(ptr.metadata.new_pointer(ptr.clone()));
///         ptr
///     }
/// }
///
/// impl Drop for PrintOnDrop {
///     fn drop(&mut self) {
///         eprintln!("{}", self.text);
///     }
/// }
///
/// // Install a callback handler that uses a garbage collector thread.
/// let (pending_tx, pending_rx) = mpsc::channel::<GcTask>();
/// let gc_thread = thread::spawn(move || {
///     for task in pending_rx.iter() {
///         eprint!("GC task: ");
///         task.run();
///     }
/// });
/// let callback_handle =
///     GcTask::install_callback(move |task| pending_tx.send(task).unwrap()).expect("there should not be a callback");
///
/// // Create something for the garbage collectors to munch on.
/// let numbers = vec![
///     PrintOnDrop::new("one"),
///     PrintOnDrop::new("two"),
///     PrintOnDrop::new("three"),
///     PrintOnDrop::new("four"),
///     PrintOnDrop::new("five"),
///     PrintOnDrop::new("six"),
///     PrintOnDrop::new("seven"),
///     PrintOnDrop::new("eight"),
///     PrintOnDrop::new("nine"),
///     PrintOnDrop::new("ten"),
/// ];
/// // And now drop those numbers (so the garbage collector will do its job).
/// drop(numbers);
///
/// // Drop the callback handle.
/// drop(callback_handle);
/// // Wait for the garbage collector thread to finish.
/// gc_thread
///     .join()
///     .expect("garbage collector should join normally");
/// ```
///
/// This example prints:
/// ```text
/// GC task: one
/// GC task: two
/// GC task: three
/// GC task: four
/// GC task: five
/// GC task: six
/// GC task: seven
/// GC task: eight
/// GC task: nine
/// GC task: ten
/// ```
#[derive(Debug)]
pub struct GcTask {
    /// Generation that is to be garbage collected.
    generation: Option<Weak<MTGeneration>>,
}

/// A handle on a GC callback.
///
/// Dropping the handle will uninstall the associated callback function.
pub struct GcTaskCallback<'a> {
    /// Track the lifetime.
    lifetime: PhantomData<&'a ()>,
}

/// A handle on a GC thread.
///
/// Dropping the handle will uninstall the associated callback function.
#[derive(Debug)]
pub struct GcThread {
    /// Handle on the callback method.
    handle: Option<GcTaskCallback<'static>>,
    /// Thread that executes the tasks.
    thread: Option<JoinHandle<()>>,
}

impl Drop for GcTask {
    #[inline]
    fn drop(&mut self) {
        if let Some(generation) = self.generation.take().as_ref().and_then(Weak::upgrade) {
            generation.run_gc();
        }
    }
}

impl GcTask {
    /// Create a new task.
    pub(super) const fn new(generation: Weak<MTGeneration>) -> Self {
        GcTask {
            generation: Some(generation),
        }
    }

    /// Run the GC task.
    ///
    /// Note: if you don't run this function, the [drop][Drop::drop] of [GcTask] will run the task anyway.
    /// Invoking this function might be more readable than simply dropping the task however.
    ///
    /// Returns the number of objects that were collected, and the number of objects that were retained.
    /// Returns [None] if the garbage collector didn't run.
    #[inline]
    pub fn run(mut self) -> Option<GcStats> {
        self.generation
            .take()
            .as_ref()
            .and_then(Weak::upgrade)
            .map(MTGeneration::run_gc)
    }

    /// Post the task on a user-supplied callback.
    ///
    /// If the user has not created a callback, the task will be returned.
    pub(super) fn post(self) -> Option<Self> {
        maybe_defer_sync_task(self)
            .err()
            .and_then(Self::post_no_defer)
    }

    /// Post the task on a user-supplied callback.
    ///
    /// This method skips the `defer` handler.
    /// If the user has not created a callback, the task will be returned.
    pub(super) fn post_no_defer(self) -> Option<Self> {
        match GC_TASK_FN.read() {
            Ok(gc_task_fn) => {
                if let Some(gc_task_fn) = &*gc_task_fn {
                    gc_task_fn(self);
                    None
                } else {
                    Some(self)
                }
            }
            Err(_) => Some(self),
        }
    }

    /// Install a callback for running [GcTask].
    ///
    /// Returns a handle to the installed callback.
    /// Dropping the handle will uninstall the callback function.
    ///
    /// # Errors
    ///
    /// If there is already a callback present, the function will return an [Error].
    #[allow(
        clippy::missing_inline_in_public_items,
        reason = "This is rarely called."
    )]
    pub fn install_callback<'a>(
        callback: impl Fn(GcTask) + Send + Sync + 'a,
    ) -> Result<GcTaskCallback<'a>, Error> {
        let gc_task_fn = &mut *GC_TASK_FN
            .write()
            .map_err(|_| Error::new(ErrorEnum::InternalLockingError))?;
        if gc_task_fn.is_some() {
            Err(Error::new(ErrorEnum::CallbackAlreadyInstalled))
        } else {
            let callback = Box::new(callback);
            let callback: Box<dyn Fn(GcTask) + Send + Sync + 'a> = callback;
            let callback = unsafe {
                std::mem::transmute::<
                    Box<dyn Fn(GcTask) + Send + Sync + 'a>,
                    Box<dyn Fn(GcTask) + Send + Sync + 'static>,
                >(callback)
            };
            *gc_task_fn = Some(callback);
            Ok(GcTaskCallback {
                lifetime: PhantomData,
            })
        }
    }

    /// Create a garbage collector thread.
    ///
    /// # Returns
    ///
    /// A [GcThread] which controls the lifetime of the garbage collector thread.
    ///
    /// # Errors
    ///
    /// If there is already a callback present, the function will return an [Error].
    /// If the thread can't be started, this will also result in an [Error].
    #[inline(never)]
    pub fn gc_thread() -> Result<GcThread, Error> {
        let (tx, rx) = mpsc::channel();
        let handle = Self::install_callback(move |task| {
            let _ = tx.send(task);
        })?;

        let thread = thread::Builder::new()
            .name("gc".into())
            .spawn(move || {
                rx.iter().for_each(|task| {
                    task.run();
                });
            })
            .map_err(|io_err| Error::new(ErrorEnum::ThreadSpawnFailed(io_err.into())))?;

        Ok(GcThread {
            handle: Some(handle),
            thread: Some(thread),
        })
    }
}

impl Drop for GcTaskCallback<'_> {
    #[allow(
        clippy::missing_inline_in_public_items,
        reason = "This is rarely called."
    )]
    fn drop(&mut self) {
        GC_TASK_FN.write().unwrap().take();
    }
}

impl fmt::Debug for GcTaskCallback<'_> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.write_str("GcTaskCallback")
    }
}

impl Drop for GcThread {
    /// Stops the garbage collector thread and waits for the garbage collector thread to actually end.
    ///
    /// No more tasks will be added to the thread, but any tasks already submitted to the thread will complete,
    /// before this function returns.
    #[inline]
    fn drop(&mut self) {
        self.handle = None; // Uninstalls the sender.
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}

impl GcThread {
    /// Stops the garbage collector thread, but does not wait for the garbage collector thread to actually end.
    ///
    /// No more tasks will be added to the thread, but any outstanding tasks will complete.
    /// The function won't wait for those to complete.
    ///
    /// Should you want to wait on the thread anyway, you can use [JoinHandle::join].
    /// But you don't have to.
    ///
    /// # Panics
    ///
    /// If the join handle is absent. This would be indicative of a bug in the [GcTask::gc_thread] code.
    #[inline]
    pub fn drop_no_wait(mut self) -> JoinHandle<()> {
        self.thread.take().unwrap()
    }
}