swh_graph/
approximate_bfs.rs1use std::cell::RefCell;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
16use dsi_progress_logger::{progress_logger, ProgressLog};
17use num_cpus;
18use rayon::prelude::*;
19use sux::prelude::AtomicBitVec;
20use thread_local::ThreadLocal;
21use webgraph::prelude::*;
22
23use crate::map::OwnedPermutation;
24
/// Identifier of a graph node; a plain `usize`, also used to index per-node arrays.
type NodeId = usize;
26
27pub fn almost_bfs_order<G: RandomAccessGraph + Send + Sync>(
28 graph: &G,
29 start_nodes: &[NodeId],
30) -> OwnedPermutation<Vec<NodeId>> {
31 let num_nodes = graph.num_nodes();
32
33 let visited = AtomicBitVec::new(num_nodes);
34
35 let visit_from_root_node = |thread_order: &mut Vec<NodeId>, root_node| -> Option<usize> {
36 if visited.get(root_node, Ordering::Relaxed) {
37 return None;
39 }
40 let mut visited_nodes = 0;
41 let mut queue = VecDeque::new();
42 queue.push_back(root_node);
43 while let Some(node) = queue.pop_front() {
44 if visited.get(node, Ordering::Relaxed) {
49 continue;
50 }
51 visited.set(node, true, Ordering::Relaxed);
52 visited_nodes += 1;
53 thread_order.push(node);
54
55 for succ in graph.successors(node) {
56 queue.push_back(succ);
57 }
58 }
59 Some(visited_nodes)
60 };
61
62 let mut pl = progress_logger!(
63 display_memory = true,
64 item_name = "node",
65 local_speed = true,
66 expected_updates = Some(num_nodes),
67 );
68 pl.start("[step 1/2] Visiting graph in pseudo-BFS order...");
69 let pl = Arc::new(Mutex::new(pl));
70
71 let thread_orders = ThreadLocal::new();
72
73 let num_threads = num_cpus::get();
74
75 if start_nodes.is_empty() {
76 log::info!("No initial starting nodes; starting from arbitrary nodes...");
77 } else {
78 log::info!(
79 "Traversing from {} given initial nodes...",
80 start_nodes.len()
81 );
82 let visited_initial_nodes = AtomicUsize::new(0);
83 start_nodes.into_par_iter().for_each_init(
84 || {
85 thread_orders
86 .get_or(|| RefCell::new(Vec::with_capacity(num_nodes / num_threads)))
87 .borrow_mut()
88 },
89 |thread_order, &root_node| {
90 if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
91 pl.lock().unwrap().update_with_count(visited_nodes);
92 }
93 let i = visited_initial_nodes.fetch_add(1, Ordering::Relaxed);
94 if start_nodes.len() > 100 && i % (start_nodes.len() / 100) == 0 {
95 log::info!(
96 "Finished traversals from {}% of initial nodes.",
97 i * 100 / start_nodes.len()
98 );
99 }
100 },
101 );
102 log::info!("Done traversing from given initial nodes.");
103 log::info!("Traversing from arbitrary nodes...");
104 }
105
106 crate::utils::shuffle::par_iter_shuffled_range(0..num_nodes).for_each_init(
107 || {
108 thread_orders
109 .get_or(|| RefCell::new(Vec::with_capacity(num_nodes / num_threads)))
110 .borrow_mut()
111 },
112 |thread_order, root_node| {
113 if let Some(visited_nodes) = visit_from_root_node(thread_order, root_node) {
114 pl.lock().unwrap().update_with_count(visited_nodes);
115 }
116 },
117 );
118
119 pl.lock().unwrap().done();
120
121 let mut pl = progress_logger!(
122 display_memory = true,
123 item_name = "node",
124 local_speed = true,
125 expected_updates = Some(num_nodes),
126 );
127 pl.start("[step 2/2] Concatenating orders...");
128
129 let mut order = vec![NodeId::MAX; num_nodes];
131 let mut i = 0;
132 for thread_order in thread_orders.into_iter() {
133 for node in thread_order.into_inner().into_iter() {
134 if order[node] == NodeId::MAX {
135 pl.light_update();
136 order[node] = i;
137 i += 1
138 }
139 }
140 }
141
142 assert_eq!(
143 i, num_nodes,
144 "graph has {num_nodes} nodes, permutation has {i}"
145 );
146
147 pl.done();
148 OwnedPermutation::new(order).unwrap()
149}