pargraph 0.2.0

Operator-based parallel graph processing.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
// SPDX-FileCopyrightText: 2023 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later

//! Simple multi-threaded executor for labelling graph operators.
use std::borrow::Borrow;
use std::fmt::Debug;

use std::sync::atomic::{AtomicIsize, AtomicU8, AtomicUsize, Ordering};

use std::sync::RwLock;
use std::{num::NonZeroUsize, thread};

use petgraph::{data::DataMap, visit::GraphBase};

use crate::conflict_resolution::ConflictResolvingOperator;
use crate::local_view::LocalGraphView;
use crate::worklists::WorklistChannel;
use crate::worklists::{PushFnWrapper, Worklist};
use crate::{BorrowDataCell, LabellingOperator, ReadonlyOperator};

/// Execute graph operators on multiple threads.
///
/// This executor will create clones of the operator for each worker thread.
/// Therefore, the operators should usually be cheap to clone. For example they can
/// include large shared data as a reference.
/// Cloning the operators also allows them to keep thread-local data, if necessary.
pub struct MultiThreadExecutor {
    /// Number of worker threads to spawn when running an algorithm.
    num_threads: NonZeroUsize,
}

impl Default for MultiThreadExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl MultiThreadExecutor {
    /// Create a new multi-threaded executor.
    /// Automatically determine the number of threads.
    pub fn new() -> Self {
        Self {
            // Fall back to a single worker thread when the available
            // parallelism cannot be queried.
            num_threads: std::thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(1).unwrap()),
        }
    }

    /// Set the number of threads to be used.
    pub fn with_num_threads(mut self, num_threads: NonZeroUsize) -> Self {
        self.num_threads = num_threads;
        self
    }

    /// Run an algorithm which modifies node labels but keeps the topology unchanged.
    ///
    /// The labelling operator is wrapped into a [`ConflictResolvingOperator`]
    /// and then executed through [`Self::run_readonly`].
    pub fn run_node_labelling<Wl, Op, G>(&self, initial_worklist: Wl, operator: Op, graph: G)
    where
        Wl: Worklist<Op::WorkItem>,
        Wl::Channel: Send,
        Op: LabellingOperator<G> + Clone + Send,
        G: GraphBase + DataMap + Sync,
        G::NodeWeight: BorrowDataCell<UserData = Op::NodeWeight>,
        Op::WorkItem: Copy,
    {
        // Adapt the labelling operator to the read-only execution path.
        let op_wrapper = ConflictResolvingOperator::new(operator);

        self.run_readonly(initial_worklist, op_wrapper, graph)
    }

    /// Run an algorithm which does not need explicit mutable access to the graph elements.
    /// It can, of course, modify node and edge weights by using interior mutability together
    /// with an appropriate locking mechanism or atomic operations.
    pub fn run_readonly<Wl, Op, G>(&self, mut initial_worklist: Wl, operator: Op, graph: G)
    where
        Wl: Worklist<Op::WorkItem>,
        Wl::Channel: Send,
        Op: ReadonlyOperator<G> + Clone + Send,
        G: GraphBase + Sync,
    {
        // Share the graph by reference with all worker threads.
        let graph = &graph;

        let shared = WorkerSharedData::new(self.num_threads);

        thread::scope(|s| {
            {
                // Account all initial work items to worker 0's counter.
                // Only the sum of the per-thread balances matters for
                // termination detection, so the choice of counter is arbitrary.
                let initial_worklist_len = initial_worklist.initial_len();
                shared.pending_task_counters[0].set(initial_worklist_len as isize);
            }

            // Lock list of thread IDs during construction.
            let mut thread_ids_guard = shared.threads.write().unwrap();

            // Create clones of the operators.
            let operator_clones = itertools::repeat_n(operator, self.num_threads.get());

            // Create worker threads.
            let thread_ids = operator_clones.enumerate().map(|(id, operator)| {
                let worker = Worker {
                    shared: &shared,
                    id: id as u32,
                    channel: initial_worklist.create_channel(),
                    graph,
                    operator,
                };

                let thread = s.spawn(move || {
                    worker.run();
                });

                // Store the thread such that others can wake it up if necessary.
                thread.thread().clone()
            });

            thread_ids_guard.extend(thread_ids);

            // Release lock.
            drop(thread_ids_guard);
        });

        // `thread::scope` joins all worker threads before returning, so the
        // worklist is only stopped after all workers have finished.
        initial_worklist.stop();
    }
}

/// Variables which are shared by reference between worker threads.
struct WorkerSharedData {
    // Keep per-thread balance of issued tasks and finished tasks.
    // Used to detect if all work has been done. This is needed in case the worklist has some latency and pushed items don't immediately
    // show up in the queue.
    // Counters need to be signed because workers can finish more tasks than they create.
    pending_task_counters: Vec<PendingTasksCounter>,
    /// Per-thread signal telling a parked worker why it was unparked
    /// (new work might be available, or shutdown).
    unpark_signals: Vec<UnparkSignal>,
    /// Used to signal when a thread encounters an empty worklist.
    /// Counts how many workers currently believe the worklist might be
    /// empty; when it reaches `num_threads`, termination is checked.
    worklist_maybe_empty: AtomicUsize,
    /// List of all thread handles.
    /// Used to unpark sleeping threads when a new work item might have arrived.
    threads: RwLock<Vec<thread::Thread>>,
    /// Number of worker threads.
    num_threads: NonZeroUsize,
}

impl WorkerSharedData {
    fn new(num_threads: NonZeroUsize) -> Self {
        let threads: RwLock<Vec<thread::Thread>> =
            RwLock::new(Vec::with_capacity(num_threads.get()));

        let pending_task_counters = (0..num_threads.get()).map(|_| Default::default()).collect();

        let unpark_signals = (0..num_threads.get()).map(|_| Default::default()).collect();

        Self {
            pending_task_counters,
            unpark_signals,
            worklist_maybe_empty: Default::default(),
            threads,
            num_threads,
        }
    }
}

/// Per-thread worker state: a channel into the worklist, this thread's
/// operator clone, and a reference to the coordination data shared by all workers.
struct Worker<'a, Ch, G, Op> {
    /// Index of this worker; addresses its entries in the shared vectors.
    id: u32,
    /// Channel used to pop work items and push newly created ones.
    channel: Ch,
    /// The graph being processed (shared across all workers).
    graph: &'a G,
    /// This worker's clone of the operator.
    operator: Op,
    /// Coordination state shared between all workers.
    shared: &'a WorkerSharedData,
}

impl<'a, Ch, G, Op> Worker<'a, Ch, G, Op>
where
    G: GraphBase,
    Ch: WorklistChannel<Op::WorkItem>,
    Op: ReadonlyOperator<G>,
{
    /// Run the worker.
    ///
    /// Main loop: pop a work item and process it; if new items were produced,
    /// wake parked workers. When the worklist looks empty, either park and
    /// wait for more work or terminate (decided by `handle_empty_worklist`).
    fn run(self) {
        // This worker's own balance of created-minus-finished tasks.
        let pending_task_counter = &self.shared.pending_task_counters[self.id as usize];

        loop {
            if let Some(active_node) = self.channel.pop() {
                let num_new_work_items = self.process_task(active_node);

                // Bookkeeping: we just finished a task.
                pending_task_counter.decrement();

                if num_new_work_items > 0 {
                    self.wake_starved_threads();
                }
            } else {
                // Worklist looks empty.
                // There might be items in the fly, it is not necessarily empty.
                // Eventually blocks and waits for other workers to generate
                // more tasks.
                if self.handle_empty_worklist() == LoopControl::Break {
                    break;
                }
            }
        }
    }

    /// Return the number of newly created work items.
    ///
    /// Runs the operator on `work_item` with a local view of the graph.
    /// Each item the operator pushes is counted on this worker's pending-task
    /// counter *before* the push, so the global pending sum never
    /// under-counts in-flight work.
    fn process_task(&self, work_item: Op::WorkItem) -> usize {
        let mut num_new_work_items = 0;
        let pending_task_counter = &self.shared.pending_task_counters[self.id as usize];
        let active_node = *work_item.borrow();

        let local_view = LocalGraphView::new(self.graph, active_node);
        // Don't pass locked node data to the operator.
        let push = PushFnWrapper::new(
            |item| {
                num_new_work_items += 1;
                // Increment before pushing so the new item is accounted for
                // by the time other workers can observe it in the worklist.
                pending_task_counter.increment();
                self.channel.push_to(item, self.id)
            },
            self.id,
        );
        self.operator.op(work_item, local_view, push);
        num_new_work_items
    }
}

impl<'a, Ch, G, Op> Worker<'a, Ch, G, Op> {
    /// Get number of pending tasks across all worker threads.
    /// This is the sum over all per-thread balances; a single balance can be
    /// negative, but a total of zero is used as the "no work left" condition.
    fn num_pending_tasks(&self) -> isize {
        self.shared
            .pending_task_counters
            .iter()
            .map(|c| c.get())
            .sum()
    }

    /// Blocks if the worklist is empty and some other thread could still wake the current thread.
    ///
    /// Returns [`LoopControl::Break`] when all work is done and the worker
    /// should exit, or [`LoopControl::NoAction`] when it should try popping
    /// from the worklist again.
    fn handle_empty_worklist(&self) -> LoopControl {
        // Seen from this thread, the worklist is possibly empty. But we don't know for sure.
        // Signal to other threads that we'd like to check if the worklist is empty.
        let num_empty = self
            .shared
            .worklist_maybe_empty
            .fetch_add(1, Ordering::Relaxed)
            + 1;
        assert!(
            num_empty <= self.shared.num_threads.get(),
            "unmatched increment/decrement"
        );

        if num_empty == self.shared.num_threads.get() {
            // I'm the last thread running out of work. All the others might be asleep.
            // Wake all other threads. They should exit this loop.

            if self.num_pending_tasks() == 0 {
                // No tasks are in flight anywhere: broadcast the exit signal,
                // then unpark every other thread so it observes the signal.
                for (thread, signal) in self
                    .shared
                    .threads
                    .read()
                    .unwrap()
                    .iter()
                    .zip(&self.shared.unpark_signals)
                {
                    signal.set_exit();
                    if thread.id() != thread::current().id() {
                        thread.unpark();
                    }
                }
                // Exit. There's no more work to do.
                LoopControl::Break
            } else {
                // Work is still in flight somewhere: withdraw our
                // "maybe empty" vote and go back to polling the worklist.
                self.shared
                    .worklist_maybe_empty
                    .fetch_sub(1, Ordering::Relaxed);
                LoopControl::NoAction
            }
        } else {
            // Other threads might still produce more work. Let's wait for them to wake this thread.

            let signal = &self.shared.unpark_signals[self.id as usize];
            loop {
                // `park` may also return spuriously; the signal byte decides
                // whether to park again, resume, or exit.
                thread::park();

                match signal.clear_unpark() {
                    UnparkAction::ParkAgain => {}
                    UnparkAction::Unpark => {
                        // There are pending tasks. The worklist is not empty.
                        self.shared
                            .worklist_maybe_empty
                            .fetch_sub(1, Ordering::Relaxed);
                        break LoopControl::NoAction;
                    }
                    UnparkAction::Exit => {
                        assert_eq!(self.num_pending_tasks(), 0);
                        break LoopControl::Break;
                    }
                }
            }
        }
    }

    /// Wake worker threads that are parked while waiting for new work.
    ///
    /// Generic over `T` so it can be called for whatever item type the
    /// channel carries; only `local_len` is used here.
    fn wake_starved_threads<T>(&self)
    where
        Ch: WorklistChannel<T>,
    {
        let has_waiting_threads = self.shared.worklist_maybe_empty.load(Ordering::Acquire) > 0;

        // Telling other threads has overhead. Best do it if there's enough work for everybody.
        let has_enough_tasks = self.channel.local_len() > self.shared.num_threads.get();

        if has_waiting_threads && has_enough_tasks
        // TODO: what if the other threads cannot steal work from this one?
        {
            // Some other thread thinks that the worklist might be empty.
            // Need to tell the other threads that there might be more items coming into the worklist.
            for (thread, signal) in self
                .shared
                .threads
                .read()
                .unwrap()
                .iter()
                .zip(&self.shared.unpark_signals)
            {
                if thread.id() != thread::current().id() {
                    // Set the signal before unparking so the target thread
                    // sees it when it wakes up.
                    signal.set_unpark();
                    thread.unpark();
                }
            }
        }
    }
}

/// Used to break a `loop` from a function.
/// Returned by helpers called inside the worker's main loop so the caller
/// can decide whether to keep looping; `#[must_use]` prevents silently
/// dropping the decision.
#[derive(Copy, Clone, PartialEq, Eq)]
#[must_use]
enum LoopControl {
    /// Continue the loop.
    NoAction,
    /// Exit the loop.
    Break,
}

/// Atomic counter for keeping track of the number of tasks in the worklist.
/// Each worker thread uses a thread-local counter for cache efficiency.
#[derive(Default)]
#[repr(align(64))] // Counters should be in different cache-lines. The alignment should be equal to the cache-line size.
struct PendingTasksCounter {
    // Signed: a worker may finish more tasks than it creates, driving its
    // own balance negative; only the sum over all counters is meaningful.
    value: AtomicIsize,
}

impl PendingTasksCounter {
    /// `increment` and `decrement` are always called from the same thread.
    /// Release ordering pairs with the Acquire load in `get`.
    fn increment(&self) {
        self.value.fetch_add(1, Ordering::Release);
    }

    /// `increment` and `decrement` are always called from the same thread.
    fn decrement(&self) {
        self.value.fetch_sub(1, Ordering::Release);
    }

    /// `get` might be called from other threads.
    fn get(&self) -> isize {
        self.value.load(Ordering::Acquire)
    }

    /// Overwrite the balance. Used once to credit the initial worklist
    /// length before the workers start.
    fn set(&self, value: isize) {
        self.value.store(value, Ordering::Release);
    }
}

/// Atomic signal byte telling a parked worker why it was unparked:
/// bit 0 = "new work might be available", bit 1 = "exit".
/// Decoded via [`UnparkAction`].
#[derive(Default)]
#[repr(align(64))] // Keep each per-thread signal in its own cache line.
struct UnparkSignal {
    signal: AtomicU8,
}

impl UnparkSignal {
    /// Bit meaning "a work item may be available; stop parking".
    const UNPARK_BIT: u8 = 0b01;
    /// Bit meaning "shut the worker down".
    const EXIT_BIT: u8 = 0b10;

    /// Set the exit-bit and return the previous value.
    /// Once set, the exit bit cannot be cleared.
    fn set_exit(&self) -> UnparkAction {
        // Once set, the exit bit cannot be cleared.
        self.signal
            .fetch_or(Self::EXIT_BIT, Ordering::Relaxed)
            .into()
    }

    /// Clear the unpark-bit and return the previous value.
    fn clear_unpark(&self) -> UnparkAction {
        self.signal
            .fetch_and(!Self::UNPARK_BIT, Ordering::Relaxed)
            .into()
    }

    /// Set the unpark-bit and return the previous value.
    fn set_unpark(&self) -> UnparkAction {
        self.signal
            .fetch_or(Self::UNPARK_BIT, Ordering::Relaxed)
            .into()
    }
}

/// Decoded state of an [`UnparkSignal`], observed by a worker after waking.
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
enum UnparkAction {
    /// No bit was set: spurious wakeup, park again.
    #[default]
    ParkAgain,
    /// The unpark-bit was set: new work items might be available.
    Unpark,
    /// Exit the worker thread.
    Exit,
}

impl From<u8> for UnparkAction {
    /// Decode a raw signal byte. The exit bit dominates the unpark bit.
    fn from(value: u8) -> Self {
        let exit = value & 0b10 != 0;
        let unpark = value & 0b1 != 0;

        match (exit, unpark) {
            (true, _) => UnparkAction::Exit,
            (false, true) => UnparkAction::Unpark,
            (false, false) => UnparkAction::ParkAgain,
        }
    }
}
impl From<UnparkAction> for u8 {
    fn from(value: UnparkAction) -> Self {
        match value {
            UnparkAction::ParkAgain => 0b00,
            UnparkAction::Unpark => 0b01,
            UnparkAction::Exit => 0b10,
        }
    }
}