pargraph 0.2.0

Operator based parallel graph processing.
Documentation
// SPDX-FileCopyrightText: 2022 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later

//! Different worklist implementations.

pub mod biglock_fifo;
pub mod crossbeam_worklist;
pub mod message_passing_fifo;

/// A worklist keeps track of graph nodes which need to be processed.
/// Depending on the application there are different requirements of how the list
/// should work. Tasks might be required to be ordered in some way or maybe can
/// also be unordered.
/// Also, the list must be accessible by many threads. This is why this trait does
/// not define functions to read and write to the list but only functions to create
/// access channels to the list. Such channels can then be sent to threads and used for communication
/// with the list.
pub trait Worklist<T> {
    /// Channel type used to communicate with this worklist.
    type Channel: WorklistChannel<T>;

    /// Create a channel for communicating with this worklist from other threads.
    fn create_channel(&mut self) -> Self::Channel;

    /// Get the number of elements in the worklist before any channels are opened.
    /// This must be exact in the beginning before any channels are opened.
    /// Used for book-keeping to detect empty worklists in multi-threaded executors.
    // TODO: Feels like a hack. Another option would be: Add a `fn new() -> Self` to this trait. Then instead of an existing worklist
    // executors take an iterator over initial elements.
    fn initial_len(&self) -> usize;

    /// Terminate the worklist.
    /// All threads calling 'pop' will get `None` in return.
    /// This function must be called once it is sure that no more items will arrive to the worklist.
    fn stop(&mut self);
}

/// Bidirectional communication channel to a worklist.
/// Such channels are intended to be sent to worker threads such that those can
/// fetch and push items to work on.
pub trait WorklistChannel<T> {
    /// Push an item to the worklist.
    fn push(&self, item: T);

    /// Push an item to the thread-local worklist of the specified worker.
    fn push_to(&self, item: T, _worker_id: u32) {
        self.push(item)
    }

    /// Get an item from the worklist.
    /// If there's currently no available item, this function should block until an item arrives
    /// or the worklist is stopped.
    fn pop(&self) -> Option<T>;

    /// Number of items in the thread-local queue.
    fn local_len(&self) -> usize;

    /// Approximate number of items in the full worklist.
    /// This value might quickly be out of date.
    fn global_len(&self) -> usize;

    /// Close the channel.
    fn close(self);
}

/// Unidirectional communication channel to a worklist.
/// This is the API which graph operators can use to push items to the worklist.
pub trait WorklistPush<T>: Sized {
    /// Push an item to the worklist.
    fn push(&mut self, item: T);

    /// Push an item to the thread-local worklist of the specified worker.
    fn push_to(&mut self, item: T, _worker_id: u32) {
        self.push(item)
    }

    /// Get the ID of the current worker thread.
    fn current_worker_id(&self) -> u32;
}

/// Wrap a bi-directional worklist channel such that it exposes
/// the unidirectional interface (push-only).
pub(crate) struct PushWrapper<'a, W> {
    worklist_channel: &'a W,
    worker_id: u32,
}

impl<'a, W> PushWrapper<'a, W> {
    pub fn new(worklist_channel: &'a W, current_worker_id: u32) -> Self {
        Self {
            worklist_channel,
            worker_id: current_worker_id,
        }
    }
}

impl<'a, W, T> WorklistPush<T> for PushWrapper<'a, W>
where
    W: WorklistChannel<T>,
{
    fn push(&mut self, item: T) {
        self.worklist_channel.push(item)
    }
    fn push_to(&mut self, item: T, worker_id: u32) {
        self.worklist_channel.push_to(item, worker_id)
    }

    fn current_worker_id(&self) -> u32 {
        self.worker_id
    }
}

/// Wrap a closure and implement `WorklistPush<T>` for it.
pub(crate) struct PushFnWrapper<F> {
    push_fn: F,
    worker_id: u32,
}

impl<F> PushFnWrapper<F> {
    pub fn new(push_fn: F, current_worker_id: u32) -> Self {
        Self {
            push_fn,
            worker_id: current_worker_id,
        }
    }
}
impl<T, F> WorklistPush<T> for PushFnWrapper<F>
where
    F: FnMut(T),
{
    fn push(&mut self, item: T) {
        (self.push_fn)(item)
    }

    fn current_worker_id(&self) -> u32 {
        self.worker_id
    }
}