pargraph 0.2.0

Operator based parallel graph processing.
Documentation
// SPDX-FileCopyrightText: 2022 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later

//! ParaGraph: Parallel graph processing.
//!
//! This crate implements data structures and algorithms for concurrent traversal of graphs.
//! Graphs can be processed using 'operators'. An operator can see only a small part of a graph, namely
//! the 'active' node and its direct neighbors. Labelling operators can edit the associated data of the active
//! node and they can generate a set of new nodes which should be processed later. The order in which the nodes are
//! processed is largely determined by the 'worklists'.
//!
//! # Operators
//! There are the following types of operators:
//! * [`ReadonlyOperator`] - Can only access graph elements by immutable reference. Needs to use interior mutability if modification is necessary.
//! * [`LabellingOperator`] - Can modify the local node data. The executor must provide a mutable reference to local node data.
//!
//! # Executors
//! There are the following executors:
//! * [`executors::single_thread::SingleThreadExecutor`] - Executes an operator in the current thread.
//! * [`executors::multi_thread::MultiThreadExecutor`] - Executes an operator on many threads. Imposes stricter trait bounds on operators and graph data structures.
//!
//!
//! # Example: compute cone of influence using atomics
//!
//! The following example visits the output cone of the `src` node. The output cone consists of all nodes
//! which can be reached by starting at `src` and then following outgoing edges.
//! Additionally, for each node in the cone, the operator keeps track of input nodes which are in the cone.
//!
//! Similar algorithms can, for example, be used to mark the regions of interest for incremental updates of shortest-path searches.
//!
//! This algorithm is implemented as a [`ReadonlyOperator`] which operates on immutable references of the node data.
//! Safe mutability of the node data is still achieved using atomics. This avoids wrapping the node data into a
//! [`DataCell`] for locking.
//!
//! ```
//! use pargraph::prelude::*;
//! use petgraph::data::DataMap;
//! use petgraph::graph::DiGraph;
//! use petgraph::visit::*;
//! use std::sync::atomic::AtomicU32;
//!
//! struct NodeData {
//!     /// Count the number of input edges to the node
//!     /// which are part of the cone.
//!     num_dependencies: AtomicU32,
//! }
//!
//! impl NodeData {
//!     fn new() -> Self {
//!         Self {
//!             num_dependencies: AtomicU32::new(0),
//!         }
//!     }
//! }
//!
//! // Create a graph like:
//! //     x---
//! //     |   \
//! //    src   y
//! //     |
//! //     a
//! //   /  \
//! //  b   c
//! //  \  /
//! //   d
//! let mut g = DiGraph::new();
//!
//! // Helper function for creating new nodes with default node data.
//! // The dependency counter of every new node starts at zero.
//! let mut new_node = || g.add_node(NodeData::new());
//!
//! // Create some new nodes.
//! let [x, y, src, a, b, c, d] = [(); 7].map(|_| new_node());
//!
//! // Add some edges (without any weights).
//! g.add_edge(x, src, ());
//! g.add_edge(x, y, ());
//! g.add_edge(src, a, ());
//! g.add_edge(a, b, ());
//! g.add_edge(a, c, ());
//! g.add_edge(c, d, ());
//! g.add_edge(b, d, ());
//!
//! let operator = ConeOfInfluenceOp {};
//!
//! let executor = MultiThreadExecutor::new();
//!
//! // Create a worklist and add the source node.
//! let wl = FifoWorklist::new_with_local_queues(vec![src].into());
//! executor.run_readonly(wl, &operator, &g);
//!
//! let get_num_dependencies = |n: petgraph::graph::NodeIndex| -> u32 {
//!     g.node_weight(n)
//!         .unwrap()
//!         .num_dependencies
//!         .load(std::sync::atomic::Ordering::Relaxed)
//! };
//!
//! // Check the number of in-cone dependencies of each node.
//! assert_eq!(get_num_dependencies(x), 0, "x is not in the cone of influence of src");
//! assert_eq!(get_num_dependencies(y), 0, "y is not in the cone of influence of src");
//! assert_eq!(get_num_dependencies(src), 0);
//! assert_eq!(get_num_dependencies(a), 1);
//! assert_eq!(get_num_dependencies(b), 1);
//! assert_eq!(get_num_dependencies(c), 1);
//! assert_eq!(get_num_dependencies(d), 2);
//!
//! // This is our operator.
//! struct ConeOfInfluenceOp {}
//!
//! // We can implement this operator as a `ReadonlyOperator` because it does not require
//! // a mutable reference to the node data. Safe mutability is achieved using atomics.
//! // Note that we implement the operator for the reference type. Operators are required to implement `Clone`.
//! // A reference implements `Clone` automatically. Alternatively we could also derive `Clone` for `ConeOfInfluenceOp`
//! // and pass ownership of the operator to the executor. The executor might create clones of the operators for the worker
//! // threads.
//! impl<G> ReadonlyOperator<G> for &ConeOfInfluenceOp
//! where
//!     G: GraphBase + IntoEdgesDirected,
//!     G: DataMap<NodeWeight = NodeData>,
//! {
//!     type WorkItem = G::NodeId;
//!
//!     fn op(
//!         &self,
//!         active_node: Self::WorkItem,
//!         local_view: LocalGraphView<&G>,
//!         mut worklist: impl WorklistPush<Self::WorkItem>,
//!     ) {
//!         let output_nodes =
//!             local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);
//!
//!         for n in output_nodes {
//!             // Access the node weight.
//!             let n_data = local_view
//!                 .node_weight(n)
//!                 .expect("all nodes should have a weight");
//!
//!             // Atomically increment the number of dependencies of the node `n`.
//!             // `fetch_add` returns the previous value. If the previous value is `0` then
//!             // we know that this is the first time we look at node `n` (unless there is a cycle leading to the source node).
//!             let previous_num_dependencies = n_data
//!                 .num_dependencies
//!                 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
//!
//!             if previous_num_dependencies == 0 {
//!                 // This is the first time n is touched.
//!                 worklist.push(n);
//!             }
//!         }
//!     }
//! }
//! ```

#![deny(missing_docs)]
#![deny(unused_imports)]

mod conflict_resolution;
pub mod executors;
pub mod id_rwlock;
pub mod local_view;
pub mod prelude;
pub mod worklists;

use std::{borrow::Borrow, fmt::Debug};

use petgraph::visit::GraphBase;

use self::{
    id_rwlock::IdLock,
    local_view::{FullConflictDetection, LocalGraphView},
    worklists::WorklistPush,
};

/// A graph operator which may modify the labels (associated data) of nodes and edges
/// but cannot change the topology of the graph.
///
/// In contrast to [`ReadonlyOperator`], the executor hands out a mutable reference
/// to the data of the active node.
pub trait LabellingOperator<G: GraphBase> {
    /// Type used for representing a graph node.
    ///
    /// The work item may bundle extra data together with the node, as long as a
    /// reference to the graph node ID can be borrowed from it.
    /// Since `Borrow<T>` is implemented for `T` itself, plain graph node IDs can be
    /// used as a `WorkItem` directly.
    type WorkItem: Borrow<G::NodeId>;

    /// Data type of the node labels.
    type NodeWeight;

    /// Apply the operator to the given active node.
    ///
    /// * `work_item` - the active node (possibly bundled with additional data).
    /// * `local_view` - view of the graph around the active node. Read access to the
    ///   neighborhood can be requested with `local_view.try_node_weight(n)`.
    /// * `local_data` - exclusive mutable access to the data of the active node.
    /// * `worklist` - nodes pushed with `worklist.push()` become active nodes later.
    ///
    /// # Errors
    /// Locking node or edge data can fail with a [`DataConflictErr`]. Such errors must be
    /// propagated to the caller, otherwise the data conflict cannot be handled.
    /// Modifications already made to `local_data` are *not* reverted upon a data conflict.
    fn op(
        &self,
        work_item: Self::WorkItem,
        local_view: LocalGraphView<&G, FullConflictDetection>,
        local_data: &mut Self::NodeWeight,
        worklist: impl WorklistPush<Self::WorkItem>,
    ) -> Result<(), DataConflictErr<G::NodeId, G::EdgeId>>;
}

/// A graph operator which only gets shared (read-only) access to the graph.
///
/// Implementations may still modify node or edge weights, but then they are
/// responsible for synchronization themselves, using appropriate schemes such as
/// `RwLock` or atomics.
pub trait ReadonlyOperator<G: GraphBase> {
    /// Type used for representing a graph node.
    ///
    /// The work item may bundle extra data together with the node, as long as a
    /// reference to the graph node ID can be borrowed from it.
    /// Since `Borrow<T>` is implemented for `T` itself, plain graph node IDs can be
    /// used as a `WorkItem` directly.
    type WorkItem: Borrow<G::NodeId>;

    /// Apply the operator to the given active node.
    ///
    /// * `work_item` - the active node (possibly bundled with additional data).
    /// * `local_view` - view of the graph around the active node.
    /// * `worklist` - nodes pushed here will be processed later.
    fn op(
        &self,
        work_item: Self::WorkItem,
        local_view: LocalGraphView<&G>,
        worklist: impl WorklistPush<Self::WorkItem>,
    );
}

/// Error signalling a conflicting access to graph data.
///
/// A data conflict arises in one of two situations:
/// 1. The executor tries to get write access to the data of the active node, but other
///    threads are already reading from or writing to it.
/// 2. The operator tries to get read access to the data of another node or edge, but
///    that data is already locked for write access.
#[derive(Clone, Copy, Debug)]
pub struct DataConflictErr<Node, Edge> {
    /// The graph element whose data could not be accessed.
    /// TODO: This might not be used.
    _graph_element: GraphElement<Node, Edge>,
    /// The error raised while trying to acquire the lock.
    err: LockingErr,
}

/// Identifier of a single graph element: either a node ID or an edge ID.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum GraphElement<Node, Edge> {
    /// ID of a node.
    NodeId(Node),
    /// ID of an edge.
    EdgeId(Edge),
}

/// Failure to acquire either a read lock or a write lock, unified into one type.
#[derive(Clone, Copy, Debug)]
enum LockingErr {
    /// A read lock could not be acquired.
    ReadErr(id_rwlock::IdLockReadErr),
    /// A write lock could not be acquired.
    WriteErr(id_rwlock::IdLockWriteErr),
}

impl From<id_rwlock::IdLockReadErr> for LockingErr {
    fn from(e: id_rwlock::IdLockReadErr) -> Self {
        Self::ReadErr(e)
    }
}

impl From<id_rwlock::IdLockWriteErr> for LockingErr {
    fn from(e: id_rwlock::IdLockWriteErr) -> Self {
        Self::WriteErr(e)
    }
}

/// Wrapper for data associated with graph elements.
/// Used for interior mutability during parallel graph processing.
///
/// This is an alias of [`IdLock`], which is used for locking the wrapped data.
pub type DataCell<T> = IdLock<T>;

/// Borrow a [`DataCell`] from a node weight.
///
/// Implementing this trait allows arbitrary data types to be used as node weights,
/// as long as a `DataCell` can be obtained from them.
pub trait BorrowDataCell {
    /// Type of the user data wrapped in the `DataCell` to get synchronized access.
    type UserData;

    /// Borrow the [`DataCell`] which holds the user data.
    fn borrow_data_cell(&self) -> &DataCell<Self::UserData>;
}

/// A `DataCell` trivially lends itself out.
impl<T> BorrowDataCell for DataCell<T> {
    type UserData = T;

    /// Returns `self` unchanged.
    fn borrow_data_cell(&self) -> &DataCell<T> {
        self
    }
}

// /// Bundle of a graph node and a priority.
// /// This type implements `PartialOrd` and `Ord` based on the value of the priority (the value of the node is ignored).
// /// Priorities can be disabled by using the `()` type for priorities.
// /// Then the memory representation of this struct is equal to the one of the type used for graph nodes.
// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
// pub struct WithPriority<N, P> {
//     /// Priority associated with the graph node. Can be of type `()` to disable priorities.
//     pub priority: P,
//     /// ID of the graph node.
//     pub node: N,
// }
//
// impl<N> From<N> for WithPriority<N, ()> {
//     fn from(node: N) -> WithPriority<N, ()> {
//         WithPriority::new(node)
//     }
// }
//
// impl<N, P> From<(N, P)> for WithPriority<N, P> {
//     fn from((node, priority): (N, P)) -> Self {
//         Self::new_with_priority(node, priority)
//     }
// }
//
// impl<N, P> WithPriority<N, P> {
//     pub fn new_with_priority(node: N, priority: P) -> Self {
//         Self { priority, node }
//     }
// }
//
// impl<N> WithPriority<N, ()> {
//     /// Create a new struct with priorities disabled.
//     pub fn new(node: N) -> Self {
//         Self { priority: (), node }
//     }
// }
//
// impl<N, P> PartialOrd for WithPriority<N, P>
// where
//     P: PartialOrd,
//     N: PartialEq,
// {
//     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
//         self.priority.partial_cmp(&other.priority)
//     }
// }
//
// impl<N, P> Ord for WithPriority<N, P>
// where
//     P: Ord,
//     N: PartialEq + Eq,
// {
//     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
//         self.priority.cmp(&other.priority)
//     }
// }