swh_graph/
approximate_bfs.rs

1// Copyright (C) 2023-2024  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! A parallel almost-BFS traversal
7//!
8//! This implements a graph traversal that is like a BFS from many sources, but traversal
9//! from a source may steal a node from another traversal
10
11use std::cell::RefCell;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
16use dsi_progress_logger::{progress_logger, ProgressLog};
17use num_cpus;
18use rayon::prelude::*;
19use sux::prelude::AtomicBitVec;
20use thread_local::ThreadLocal;
21use webgraph::prelude::*;
22
23use crate::map::OwnedPermutation;
24
25type NodeId = usize;
26
27pub fn almost_bfs_order<G: RandomAccessGraph + Send + Sync>(
28    graph: &G,
29    start_nodes: &[NodeId],
30) -> OwnedPermutation<Vec<NodeId>> {
31    let num_nodes = graph.num_nodes();
32
33    let visited = AtomicBitVec::new(num_nodes);
34
35    let visit_from_root_node = |thread_order: &mut Vec<NodeId>, root_node| -> Option<usize> {
36        if visited.get(root_node, Ordering::Relaxed) {
37            // Skip VecDeque allocation
38            return None;
39        }
40        let mut visited_nodes = 0;
41        let mut queue = VecDeque::new();
42        queue.push_back(root_node);
43        while let Some(node) = queue.pop_front() {
44            // As we are not atomically getting and setting 'visited' bit, other
45            // threads may also visit it at the same time. We will deduplicate that
46            // at the end, so the only effect is for some nodes to be double-counted
47            // by the progress logger.
48            if visited.get(node, Ordering::Relaxed) {
49                continue;
50            }
51            visited.set(node, true, Ordering::Relaxed);
52            visited_nodes += 1;
53            thread_order.push(node);
54
55            for succ in graph.successors(node) {
56                queue.push_back(succ);
57            }
58        }
59        Some(visited_nodes)
60    };
61
62    let mut pl = progress_logger!(
63        display_memory = true,
64        item_name = "node",
65        local_speed = true,
66        expected_updates = Some(num_nodes),
67    );
68    pl.start("[step 1/2] Visiting graph in pseudo-BFS order...");
69    let pl = Arc::new(Mutex::new(pl));
70
71    let thread_orders = ThreadLocal::new();
72
73    let num_threads = num_cpus::get();
74
75    if start_nodes.is_empty() {
76        log::info!("No initial starting nodes; starting from arbitrary nodes...");
77    } else {
78        log::info!(
79            "Traversing from {} given initial nodes...",
80            start_nodes.len()
81        );
82        let visited_initial_nodes = AtomicUsize::new(0);
83        start_nodes.into_par_iter().for_each_init(
84            || {
85                thread_orders
86                    .get_or(|| RefCell::new(Vec::with_capacity(num_nodes / num_threads)))
87                    .borrow_mut()
88            },
89            |thread_order, &root_node| {
90                if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
91                    pl.lock().unwrap().update_with_count(visited_nodes);
92                }
93                let i = visited_initial_nodes.fetch_add(1, Ordering::Relaxed);
94                if start_nodes.len() > 100 && i % (start_nodes.len() / 100) == 0 {
95                    log::info!(
96                        "Finished traversals from {}% of initial nodes.",
97                        i * 100 / start_nodes.len()
98                    );
99                }
100            },
101        );
102        log::info!("Done traversing from given initial nodes.");
103        log::info!("Traversing from arbitrary nodes...");
104    }
105
106    crate::utils::shuffle::par_iter_shuffled_range(0..num_nodes).for_each_init(
107        || {
108            thread_orders
109                .get_or(|| RefCell::new(Vec::with_capacity(num_nodes / num_threads)))
110                .borrow_mut()
111        },
112        |thread_order, root_node| {
113            if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
114                pl.lock().unwrap().update_with_count(visited_nodes);
115            }
116        },
117    );
118
119    pl.lock().unwrap().done();
120
121    let mut pl = progress_logger!(
122        display_memory = true,
123        item_name = "node",
124        local_speed = true,
125        expected_updates = Some(num_nodes),
126    );
127    pl.start("[step 2/2] Concatenating orders...");
128
129    // "Concatenate" orders from each thread.
130    let mut order = vec![NodeId::MAX; num_nodes];
131    let mut i = 0;
132    for thread_order in thread_orders.into_iter() {
133        for node in thread_order.into_inner().into_iter() {
134            if order[node] == NodeId::MAX {
135                pl.light_update();
136                order[node] = i;
137                i += 1
138            }
139        }
140    }
141
142    assert_eq!(
143        i, num_nodes,
144        "graph has {num_nodes} nodes, permutation has {i}"
145    );
146
147    pl.done();
148    OwnedPermutation::new(order).unwrap()
149}