pargraph 0.2.0

Operator based parallel graph processing.
Documentation
// SPDX-FileCopyrightText: 2023 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later

//! Operator wrapper which handles data conflicts of its inner operator.

use std::ops::DerefMut;

use petgraph::{data::DataMap, visit::GraphBase};

use super::{worklists::PushFnWrapper, *};

/// Wrap a [`LabellingOperator`] into a [`ReadonlyOperator`] which takes care of the
/// conflict detection.
#[derive(Clone)]
pub struct ConflictResolvingOperator<Op> {
    op: Op,
}

impl<Op> ConflictResolvingOperator<Op> {
    /// Wrap the operator `op` into an operator which resolves the data conflicts caused by `op`.
    pub fn new(op: Op) -> Self {
        Self { op }
    }
}

impl<G, Op> ReadonlyOperator<G> for ConflictResolvingOperator<Op>
where
    Op: LabellingOperator<G>,
    G: GraphBase + DataMap,
    G::NodeWeight: BorrowDataCell<UserData = Op::NodeWeight>,
    Op::WorkItem: Copy,
{
    type WorkItem = Op::WorkItem;

    fn op(
        &self,
        work_item: Self::WorkItem,
        local_view: LocalGraphView<&G>,
        mut worklist: impl WorklistPush<Self::WorkItem>,
    ) {
        let worker_id = worklist.current_worker_id();
        let active_node = local_view.active_node();
        let local_view = &local_view;

        let node_data = local_view
            .node_weight(active_node)
            .expect("node has no data");

        let mut data_guard = match node_data.borrow_data_cell().try_write(0) {
            Ok(g) => g,
            Err(e) => {
                // Push the task again to the worklist for later processing.
                match e {
                    id_rwlock::IdLockWriteErr::CurrentWriter(other_worker_id) => {
                        // Some worker already has write access to this node data.
                        //
                        // Make a smart decision about to whom to push the work item:
                        // Pushing the work item to the worker currently blocking it might cause oscillations.
                        // Therefore push it to the worker with lower ID.
                        let responsible_worker_id =
                            worklist.current_worker_id().min(other_worker_id);
                        worklist.push_to(work_item, responsible_worker_id);
                    }
                    id_rwlock::IdLockWriteErr::NumberOfReaders(_) => {
                        // Some workers already have read access to this node data.
                        worklist.push(work_item);
                    }
                };
                return;
            }
        };

        let node_data = data_guard.deref_mut();

        let local_view_mut = LocalGraphView::new(local_view.base_graph(), active_node);

        let push = PushFnWrapper::new(|item| worklist.push(item), worker_id);
        // Call the operator.
        let op_result = self.op.op(work_item, local_view_mut, node_data, push);

        // Handle data conflicts.
        if let Err(data_conflict_err) = op_result {
            match data_conflict_err.err {
                LockingErr::ReadErr(read_err) => match read_err {
                    id_rwlock::IdLockReadErr::CurrentWriter(other_worker_id) => {
                        // Send this task to the worker which blocked the current operation.
                        worklist.push_to(work_item, other_worker_id);
                    }
                    id_rwlock::IdLockReadErr::LocksExhausted(_) => {
                        // Simply schedule this task for later.
                        worklist.push(work_item);
                    }
                },
                LockingErr::WriteErr(_write_err) => {
                    // operators should not be able to create write conflicts because they cannot try to acquire write locks
                    unreachable!("unexpected write conflict")
                }
            }
        }
    }
}