swh_graph/
approximate_bfs.rs1use std::cell::RefCell;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
16use dsi_progress_logger::{progress_logger, ProgressLog};
17use rayon::prelude::*;
18use sux::prelude::{AtomicBitVec, BitFieldVec};
19use sux::traits::AtomicBitVecOps;
20use thread_local::ThreadLocal;
21use webgraph::prelude::*;
22
23use crate::map::OwnedPermutation;
24
25type NodeId = usize;
26
27pub fn almost_bfs_order<G: RandomAccessGraph + Send + Sync>(
28 graph: &G,
29 start_nodes: &[NodeId],
30) -> OwnedPermutation<Vec<NodeId>> {
31 let num_nodes = graph.num_nodes();
32
33 let visited = AtomicBitVec::new(num_nodes);
34
35 let visit_from_root_node =
36 |thread_order: &mut BitFieldVec<NodeId>, root_node| -> Option<usize> {
37 if visited.get(root_node, Ordering::Relaxed) {
38 return None;
40 }
41 let mut visited_nodes = 0;
42 let mut queue = VecDeque::new();
43 queue.push_back(root_node);
44 while let Some(node) = queue.pop_front() {
45 if visited.get(node, Ordering::Relaxed) {
50 continue;
51 }
52 visited.set(node, true, Ordering::Relaxed);
53 visited_nodes += 1;
54 thread_order.push(node);
55
56 for succ in graph.successors(node) {
57 queue.push_back(succ);
58 }
59 }
60 Some(visited_nodes)
61 };
62
63 let mut pl = progress_logger!(
64 display_memory = true,
65 item_name = "node",
66 local_speed = true,
67 expected_updates = Some(num_nodes),
68 );
69 pl.start("[step 1/2] Visiting graph in pseudo-BFS order...");
70 let pl = Arc::new(Mutex::new(pl));
71
72 let thread_orders = ThreadLocal::new();
73
74 let initial_capacity = 1024; let bit_width = 8 * std::mem::size_of::<usize>() - (num_nodes.leading_zeros() as usize);
76
77 if start_nodes.is_empty() {
78 log::info!("No initial starting nodes; starting from arbitrary nodes...");
79 } else {
80 log::info!(
81 "Traversing from {} given initial nodes...",
82 start_nodes.len()
83 );
84 let visited_initial_nodes = AtomicUsize::new(0);
85 start_nodes.into_par_iter().for_each_init(
86 || {
87 thread_orders
88 .get_or(|| {
89 RefCell::new(BitFieldVec::with_capacity(bit_width, initial_capacity))
90 })
91 .borrow_mut()
92 },
93 |thread_order, &root_node| {
94 if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
95 pl.lock().unwrap().update_with_count(visited_nodes);
96 }
97 let i = visited_initial_nodes.fetch_add(1, Ordering::Relaxed);
98 if start_nodes.len() > 100 && i % (start_nodes.len() / 100) == 0 {
99 log::info!(
100 "Finished traversals from {}% of initial nodes.",
101 i * 100 / start_nodes.len()
102 );
103 }
104 },
105 );
106 log::info!("Done traversing from given initial nodes.");
107 log::info!("Traversing from arbitrary nodes...");
108 }
109
110 crate::utils::shuffle::par_iter_shuffled_range(0..num_nodes).for_each_init(
111 || {
112 thread_orders
113 .get_or(|| RefCell::new(BitFieldVec::with_capacity(bit_width, initial_capacity)))
114 .borrow_mut()
115 },
116 |thread_order, root_node| {
117 if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
118 pl.lock().unwrap().update_with_count(visited_nodes);
119 }
120 },
121 );
122
123 pl.lock().unwrap().done();
124
125 let mut pl = progress_logger!(
126 display_memory = true,
127 item_name = "node",
128 local_speed = true,
129 expected_updates = Some(num_nodes),
130 );
131 pl.start("[step 2/2] Concatenating orders...");
132
133 let mut order = vec![NodeId::MAX; num_nodes];
135 let mut i = 0;
136 for thread_order in thread_orders.into_iter() {
137 for node in thread_order.into_inner().into_iter() {
138 if order[node] == NodeId::MAX {
139 pl.light_update();
140 order[node] = i;
141 i += 1
142 }
143 }
144 }
145
146 assert_eq!(
147 i, num_nodes,
148 "graph has {num_nodes} nodes, permutation has {i}"
149 );
150
151 pl.done();
152 OwnedPermutation::new(order).unwrap()
153}