// swh_graph/approximate_bfs.rs

// Copyright (C) 2023-2024  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information

//! A parallel almost-BFS traversal
//!
//! This implements a graph traversal that is like a BFS from many sources, but traversal
//! from a source may steal a node from another traversal
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use dsi_progress_logger::{progress_logger, ProgressLog};
use rayon::prelude::*;
use sux::prelude::{AtomicBitVec, BitFieldVec};
use sux::traits::AtomicBitVecOps;
use thread_local::ThreadLocal;
use webgraph::prelude::*;

use crate::map::OwnedPermutation;

/// Index of a node in the graph; also used as a rank in the returned permutation.
type NodeId = usize;
26
/// Visits the whole graph with parallel pseudo-BFS traversals and returns the
/// resulting permutation (a `Vec` mapping each old node id to its visit rank).
///
/// Traversals are first started in parallel from each of `start_nodes`; then
/// every remaining node is tried as a root, in shuffled order, so that nodes
/// unreachable from `start_nodes` are visited too. Because many traversals run
/// concurrently and may claim nodes from each other, the result only
/// approximates a true BFS order (hence "almost").
///
/// # Panics
///
/// Panics if the number of ranked nodes does not equal `graph.num_nodes()`
/// (which would indicate a bug in the traversal), or if the final permutation
/// fails `OwnedPermutation::new`'s validation.
pub fn almost_bfs_order<G: RandomAccessGraph + Send + Sync>(
    graph: &G,
    start_nodes: &[NodeId],
) -> OwnedPermutation<Vec<NodeId>> {
    let num_nodes = graph.num_nodes();

    // Shared flags: bit `n` is set once node `n` has been claimed by some traversal.
    let visited = AtomicBitVec::new(num_nodes);

    // Runs one BFS from `root_node`, appending each newly-visited node to this
    // thread's `thread_order`. Returns `Some(count)` of nodes visited, or
    // `None` when the root was already visited (queue allocation skipped).
    let visit_from_root_node =
        |thread_order: &mut BitFieldVec<NodeId>, root_node| -> Option<usize> {
            if visited.get(root_node, Ordering::Relaxed) {
                // Skip VecDeque allocation
                return None;
            }
            let mut visited_nodes = 0;
            let mut queue = VecDeque::new();
            queue.push_back(root_node);
            while let Some(node) = queue.pop_front() {
                // As we are not atomically getting and setting 'visited' bit, other
                // threads may also visit it at the same time. We will deduplicate that
                // at the end, so the only effect is for some nodes to be double-counted
                // by the progress logger.
                if visited.get(node, Ordering::Relaxed) {
                    continue;
                }
                visited.set(node, true, Ordering::Relaxed);
                visited_nodes += 1;
                thread_order.push(node);

                // Successors are enqueued unconditionally; duplicates and
                // already-visited nodes are filtered by the check above when popped.
                for succ in graph.successors(node) {
                    queue.push_back(succ);
                }
            }
            Some(visited_nodes)
        };

    let mut pl = progress_logger!(
        display_memory = true,
        item_name = "node",
        local_speed = true,
        expected_updates = Some(num_nodes),
    );
    pl.start("[step 1/2] Visiting graph in pseudo-BFS order...");
    let pl = Arc::new(Mutex::new(pl));

    // One partial visit order per worker thread; concatenated in step 2.
    let thread_orders = ThreadLocal::new();

    let initial_capacity = 1024; // arbitrary
    // Bits needed to store any value up to num_nodes (so any node id fits).
    let bit_width = 8 * std::mem::size_of::<usize>() - (num_nodes.leading_zeros() as usize);

    if start_nodes.is_empty() {
        log::info!("No initial starting nodes; starting from arbitrary nodes...");
    } else {
        log::info!(
            "Traversing from {} given initial nodes...",
            start_nodes.len()
        );
        let visited_initial_nodes = AtomicUsize::new(0);
        start_nodes.into_par_iter().for_each_init(
            // Each rayon worker borrows its thread-local order once and holds
            // the RefMut across the batch of items it processes.
            || {
                thread_orders
                    .get_or(|| {
                        RefCell::new(BitFieldVec::with_capacity(bit_width, initial_capacity))
                    })
                    .borrow_mut()
            },
            |thread_order, &root_node| {
                if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
                    pl.lock().unwrap().update_with_count(visited_nodes);
                }
                // Coarse log line roughly every 1% of initial roots processed
                // (only when there are enough roots for the divisor to be nonzero).
                let i = visited_initial_nodes.fetch_add(1, Ordering::Relaxed);
                if start_nodes.len() > 100 && i % (start_nodes.len() / 100) == 0 {
                    log::info!(
                        "Finished traversals from {}% of initial nodes.",
                        i * 100 / start_nodes.len()
                    );
                }
            },
        );
        log::info!("Done traversing from given initial nodes.");
        log::info!("Traversing from arbitrary nodes...");
    }

    // Sweep every node id in shuffled order so components unreachable from
    // `start_nodes` get visited too; already-visited roots are cheap no-ops.
    crate::utils::shuffle::par_iter_shuffled_range(0..num_nodes).for_each_init(
        || {
            thread_orders
                .get_or(|| RefCell::new(BitFieldVec::with_capacity(bit_width, initial_capacity)))
                .borrow_mut()
        },
        |thread_order, root_node| {
            if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
                pl.lock().unwrap().update_with_count(visited_nodes);
            }
        },
    );

    pl.lock().unwrap().done();

    let mut pl = progress_logger!(
        display_memory = true,
        item_name = "node",
        local_speed = true,
        expected_updates = Some(num_nodes),
    );
    pl.start("[step 2/2] Concatenating orders...");

    // "Concatenate" orders from each thread.
    // A node may appear in several thread orders because of the benign race
    // documented above; only its first occurrence is assigned a rank, so the
    // result is a valid permutation.
    let mut order = vec![NodeId::MAX; num_nodes];
    let mut i = 0;
    for thread_order in thread_orders.into_iter() {
        for node in thread_order.into_inner().into_iter() {
            if order[node] == NodeId::MAX {
                pl.light_update();
                order[node] = i;
                i += 1
            }
        }
    }

    // Every node is eventually tried as a root, so all ranks must be assigned.
    assert_eq!(
        i, num_nodes,
        "graph has {num_nodes} nodes, permutation has {i}"
    );

    pl.done();
    OwnedPermutation::new(order).unwrap()
}