1use anyhow::{bail, ensure, Context, Result};
9use card_est_array::impls::{HyperLogLog, HyperLogLogBuilder, SliceEstimatorArray};
10use card_est_array::traits::{
11 AsSyncArray, EstimationLogic, EstimatorArray, EstimatorArrayMut, EstimatorMut,
12 MergeEstimationLogic, SyncEstimatorArray,
13};
14use dsi_progress_logger::ConcurrentProgressLog;
15use kahan::KahanSum;
16use rayon::{prelude::*, ThreadPool};
17use std::hash::{BuildHasherDefault, DefaultHasher};
18use std::sync::{atomic::*, Mutex};
19use sux::{bits::AtomicBitVec, traits::Succ};
20use sync_cell_slice::{SyncCell, SyncSlice};
21use webgraph::traits::{RandomAccessGraph, SequentialLabeling};
22use webgraph::utils::Granularity;
23
24pub struct HyperBallBuilder<
30 'a,
31 G1: RandomAccessGraph + Sync,
32 G2: RandomAccessGraph + Sync,
33 D: Succ<Input = usize, Output = usize>,
34 L: MergeEstimationLogic<Item = G1::Label>,
35 A: EstimatorArrayMut<L>,
36> {
37 graph: &'a G1,
39 transpose: Option<&'a G2>,
41 cumul_outdegree: &'a D,
43 do_sum_of_dists: bool,
45 do_sum_of_inv_dists: bool,
47 discount_functions: Vec<Box<dyn Fn(usize) -> f64 + Sync + 'a>>,
49 arc_granularity: usize,
51 weights: Option<&'a [usize]>,
53 array_0: A,
55 array_1: A,
58 _marker: std::marker::PhantomData<L>,
59}
60
61impl<
62 'a,
63 G1: RandomAccessGraph + Sync,
64 G2: RandomAccessGraph + Sync,
65 D: Succ<Input = usize, Output = usize>,
66 >
67 HyperBallBuilder<
68 'a,
69 G1,
70 G2,
71 D,
72 HyperLogLog<G1::Label, BuildHasherDefault<DefaultHasher>, usize>,
73 SliceEstimatorArray<
74 HyperLogLog<G1::Label, BuildHasherDefault<DefaultHasher>, usize>,
75 usize,
76 Box<[usize]>,
77 >,
78 >
79{
80 pub fn with_hyper_log_log(
92 graph: &'a G1,
93 transposed: Option<&'a G2>,
94 cumul_outdeg: &'a D,
95 log2m: usize,
96 weights: Option<&'a [usize]>,
97 ) -> Result<Self> {
98 let num_elements = if let Some(w) = weights {
99 ensure!(
100 w.len() == graph.num_nodes(),
101 "weights should have length equal to the graph's number of nodes"
102 );
103 w.iter().sum()
104 } else {
105 graph.num_nodes()
106 };
107
108 let logic = HyperLogLogBuilder::new(num_elements)
109 .log_2_num_reg(log2m)
110 .build()
111 .with_context(|| "Could not build HyperLogLog logic")?;
112
113 let array_0 = SliceEstimatorArray::new(logic.clone(), graph.num_nodes());
114 let array_1 = SliceEstimatorArray::new(logic, graph.num_nodes());
115
116 Ok(Self {
117 graph,
118 transpose: transposed,
119 cumul_outdegree: cumul_outdeg,
120 do_sum_of_dists: false,
121 do_sum_of_inv_dists: false,
122 discount_functions: Vec::new(),
123 arc_granularity: Self::DEFAULT_GRANULARITY,
124 weights,
125 array_0,
126 array_1,
127 _marker: std::marker::PhantomData,
128 })
129 }
130}
131
132impl<
133 'a,
134 D: Succ<Input = usize, Output = usize>,
135 G: RandomAccessGraph + Sync,
136 L: MergeEstimationLogic<Item = G::Label> + PartialEq,
137 A: EstimatorArrayMut<L>,
138 > HyperBallBuilder<'a, G, G, D, L, A>
139{
140 pub fn new(graph: &'a G, cumul_outdeg: &'a D, array_0: A, array_1: A) -> Self {
149 assert!(array_0.logic() == array_1.logic(), "Incompatible logics");
150 assert_eq!(
151 graph.num_nodes(),
152 array_0.len(),
153 "array_0 should have length {}. Got {}",
154 graph.num_nodes(),
155 array_0.len()
156 );
157 assert_eq!(
158 graph.num_nodes(),
159 array_1.len(),
160 "array_1 should have length {}. Got {}",
161 graph.num_nodes(),
162 array_1.len()
163 );
164 Self {
165 graph,
166 transpose: None,
167 cumul_outdegree: cumul_outdeg,
168 do_sum_of_dists: false,
169 do_sum_of_inv_dists: false,
170 discount_functions: Vec::new(),
171 arc_granularity: Self::DEFAULT_GRANULARITY,
172 weights: None,
173 array_0,
174 array_1,
175 _marker: std::marker::PhantomData,
176 }
177 }
178}
179
180impl<
181 'a,
182 G1: RandomAccessGraph + Sync,
183 G2: RandomAccessGraph + Sync,
184 D: Succ<Input = usize, Output = usize>,
185 L: MergeEstimationLogic<Item = G1::Label>,
186 A: EstimatorArrayMut<L>,
187 > HyperBallBuilder<'a, G1, G2, D, L, A>
188{
189 const DEFAULT_GRANULARITY: usize = 16 * 1024;
190
191 pub fn with_transpose(
201 graph: &'a G1,
202 transpose: &'a G2,
203 cumul_outdeg: &'a D,
204 array_0: A,
205 array_1: A,
206 ) -> Self {
207 assert_eq!(
208 graph.num_nodes(),
209 array_0.len(),
210 "array_0 should have have len {}. Got {}",
211 graph.num_nodes(),
212 array_0.len()
213 );
214 assert_eq!(
215 graph.num_nodes(),
216 array_1.len(),
217 "array_1 should have have len {}. Got {}",
218 graph.num_nodes(),
219 array_1.len()
220 );
221 assert_eq!(
222 transpose.num_nodes(),
223 graph.num_nodes(),
224 "the transpose should have same number of nodes of the graph ({}). Got {}.",
225 graph.num_nodes(),
226 transpose.num_nodes()
227 );
228 assert_eq!(
229 transpose.num_arcs(),
230 graph.num_arcs(),
231 "the transpose should have same number of nodes of the graph ({}). Got {}.",
232 graph.num_arcs(),
233 transpose.num_arcs()
234 );
235 Self {
240 graph,
241 transpose: Some(transpose),
242 cumul_outdegree: cumul_outdeg,
243 do_sum_of_dists: false,
244 do_sum_of_inv_dists: false,
245 discount_functions: Vec::new(),
246 arc_granularity: Self::DEFAULT_GRANULARITY,
247 weights: None,
248 array_0,
249 array_1,
250 _marker: std::marker::PhantomData,
251 }
252 }
253
254 pub fn sum_of_distances(mut self, do_sum_of_distances: bool) -> Self {
256 self.do_sum_of_dists = do_sum_of_distances;
257 self
258 }
259
260 pub fn sum_of_inverse_distances(mut self, do_sum_of_inverse_distances: bool) -> Self {
262 self.do_sum_of_inv_dists = do_sum_of_inverse_distances;
263 self
264 }
265
266 pub fn granularity(mut self, granularity: Granularity) -> Self {
268 self.arc_granularity =
269 granularity.arc_granularity(self.graph.num_nodes(), Some(self.graph.num_arcs()));
270 self
271 }
272
273 pub fn weights(mut self, weights: Option<&'a [usize]>) -> Self {
279 if let Some(w) = weights {
280 assert_eq!(w.len(), self.graph.num_nodes());
281 }
282 self.weights = weights;
283 self
284 }
285
286 pub fn discount_function(
289 mut self,
290 discount_function: impl Fn(usize) -> f64 + Sync + 'a,
291 ) -> Self {
292 self.discount_functions.push(Box::new(discount_function));
293 self
294 }
295
296 pub fn no_discount_function(mut self) -> Self {
298 self.discount_functions.clear();
299 self
300 }
301}
302
303impl<
304 'a,
305 G1: RandomAccessGraph + Sync,
306 G2: RandomAccessGraph + Sync,
307 D: Succ<Input = usize, Output = usize>,
308 L: MergeEstimationLogic<Item = G1::Label> + Sync + std::fmt::Display,
309 A: EstimatorArrayMut<L>,
310 > HyperBallBuilder<'a, G1, G2, D, L, A>
311{
312 #[allow(clippy::type_complexity)]
318 pub fn build(self, pl: &mut impl ConcurrentProgressLog) -> HyperBall<'a, G1, G2, D, L, A> {
319 let num_nodes = self.graph.num_nodes();
320
321 let sum_of_distances = if self.do_sum_of_dists {
322 pl.debug(format_args!("Initializing sum of distances"));
323 Some(vec![0.0; num_nodes])
324 } else {
325 pl.debug(format_args!("Skipping sum of distances"));
326 None
327 };
328 let sum_of_inverse_distances = if self.do_sum_of_inv_dists {
329 pl.debug(format_args!("Initializing sum of inverse distances"));
330 Some(vec![0.0; num_nodes])
331 } else {
332 pl.debug(format_args!("Skipping sum of inverse distances"));
333 None
334 };
335
336 let mut discounted_centralities = Vec::new();
337 pl.debug(format_args!(
338 "Initializing {} discount functions",
339 self.discount_functions.len()
340 ));
341 for _ in self.discount_functions.iter() {
342 discounted_centralities.push(vec![0.0; num_nodes]);
343 }
344
345 pl.info(format_args!("Initializing bit vectors"));
346 let estimator_modified = AtomicBitVec::new(num_nodes);
347 let modified_result_estimator = AtomicBitVec::new(num_nodes);
348 let must_be_checked = AtomicBitVec::new(num_nodes);
349 let next_must_be_checked = AtomicBitVec::new(num_nodes);
350
351 pl.info(format_args!(
352 "Using estimation logic: {}",
353 self.array_0.logic()
354 ));
355
356 HyperBall {
357 graph: self.graph,
358 transposed: self.transpose,
359 weight: self.weights,
360 granularity: self.arc_granularity,
361 curr_state: self.array_0,
362 next_state: self.array_1,
363 completed: false,
364 neighborhood_function: Vec::new(),
365 last: 0.0,
366 relative_increment: 0.0,
367 sum_of_dists: sum_of_distances,
368 sum_of_inv_dists: sum_of_inverse_distances,
369 discounted_centralities,
370 iteration_context: IterationContext {
371 cumul_outdeg: self.cumul_outdegree,
372 iteration: 0,
373 current_nf: Mutex::new(0.0),
374 arc_granularity: 0,
375 node_cursor: AtomicUsize::new(0),
376 arc_cursor: Mutex::new((0, 0)),
377 visited_arcs: AtomicU64::new(0),
378 modified_estimators: AtomicU64::new(0),
379 systolic: false,
380 local: false,
381 pre_local: false,
382 local_checklist: Vec::new(),
383 local_next_must_be_checked: Mutex::new(Vec::new()),
384 must_be_checked,
385 next_must_be_checked,
386 curr_modified: estimator_modified,
387 next_modified: modified_result_estimator,
388 discount_functions: self.discount_functions,
389 },
390 _marker: std::marker::PhantomData,
391 }
392 }
393}
394
395struct IterationContext<'a, G1: SequentialLabeling, D> {
403 cumul_outdeg: &'a D,
405 iteration: usize,
407 current_nf: Mutex<f64>,
409 arc_granularity: usize,
412 node_cursor: AtomicUsize,
414 arc_cursor: Mutex<(usize, usize)>,
417 visited_arcs: AtomicU64,
419 modified_estimators: AtomicU64,
421 systolic: bool,
423 local: bool,
425 pre_local: bool,
427 local_checklist: Vec<G1::Label>,
430 local_next_must_be_checked: Mutex<Vec<G1::Label>>,
433 must_be_checked: AtomicBitVec,
435 next_must_be_checked: AtomicBitVec,
438 curr_modified: AtomicBitVec,
440 next_modified: AtomicBitVec,
442 discount_functions: Vec<Box<dyn Fn(usize) -> f64 + Sync + 'a>>,
444}
445
446impl<G1: SequentialLabeling, D> IterationContext<'_, G1, D> {
447 fn reset(&mut self, granularity: usize) {
449 self.arc_granularity = granularity;
450 self.node_cursor.store(0, Ordering::Relaxed);
451 *self.arc_cursor.lock().unwrap() = (0, 0);
452 self.visited_arcs.store(0, Ordering::Relaxed);
453 self.modified_estimators.store(0, Ordering::Relaxed);
454 }
455}
456
457pub struct HyperBall<
461 'a,
462 G1: RandomAccessGraph + Sync,
463 G2: RandomAccessGraph + Sync,
464 D: Succ<Input = usize, Output = usize>,
465 L: MergeEstimationLogic<Item = G1::Label> + Sync,
466 A: EstimatorArrayMut<L>,
467> {
468 graph: &'a G1,
470 transposed: Option<&'a G2>,
472 weight: Option<&'a [usize]>,
474 granularity: usize,
476 curr_state: A,
478 next_state: A,
480 completed: bool,
482 neighborhood_function: Vec<f64>,
484 last: f64,
486 relative_increment: f64,
489 sum_of_dists: Option<Vec<f32>>,
491 sum_of_inv_dists: Option<Vec<f32>>,
493 discounted_centralities: Vec<Vec<f32>>,
495 iteration_context: IterationContext<'a, G1, D>,
497 _marker: std::marker::PhantomData<L>,
498}
499
500impl<
501 G1: RandomAccessGraph + Sync,
502 G2: RandomAccessGraph + Sync,
503 D: Succ<Input = usize, Output = usize> + Sync,
504 L: MergeEstimationLogic<Item = usize> + Sync,
505 A: EstimatorArrayMut<L> + Sync + AsSyncArray<L>,
506 > HyperBall<'_, G1, G2, D, L, A>
507where
508 L::Backend: PartialEq,
509{
510 pub fn run(
524 &mut self,
525 upper_bound: usize,
526 threshold: Option<f64>,
527 thread_pool: &ThreadPool,
528 rng: impl rand::Rng,
529 pl: &mut impl ConcurrentProgressLog,
530 ) -> Result<()> {
531 let upper_bound = std::cmp::min(upper_bound, self.graph.num_nodes());
532
533 self.init(thread_pool, rng, pl)
534 .with_context(|| "Could not initialize estimator")?;
535
536 pl.item_name("iteration");
537 pl.expected_updates(None);
538 pl.start(format!(
539 "Running Hyperball for a maximum of {} iterations and a threshold of {:?}",
540 upper_bound, threshold
541 ));
542
543 for i in 0..upper_bound {
544 self.iterate(thread_pool, pl)
545 .with_context(|| format!("Could not perform iteration {}", i + 1))?;
546
547 pl.update_and_display();
548
549 if self
550 .iteration_context
551 .modified_estimators
552 .load(Ordering::Relaxed)
553 == 0
554 {
555 pl.info(format_args!(
556 "Terminating HyperBall after {} iteration(s) by stabilization",
557 i + 1
558 ));
559 break;
560 }
561
562 if let Some(t) = threshold {
563 if i > 3 && self.relative_increment < (1.0 + t) {
564 pl.info(format_args!("Terminating HyperBall after {} iteration(s) by relative bound on the neighborhood function", i + 1));
565 break;
566 }
567 }
568 }
569
570 pl.done();
571
572 Ok(())
573 }
574
575 #[inline(always)]
585 pub fn run_until_stable(
586 &mut self,
587 upper_bound: usize,
588 thread_pool: &ThreadPool,
589 rng: impl rand::Rng,
590 pl: &mut impl ConcurrentProgressLog,
591 ) -> Result<()> {
592 self.run(upper_bound, None, thread_pool, rng, pl)
593 .with_context(|| "Could not complete run_until_stable")
594 }
595
596 #[inline(always)]
605 pub fn run_until_done(
606 &mut self,
607 thread_pool: &ThreadPool,
608 rng: impl rand::Rng,
609 pl: &mut impl ConcurrentProgressLog,
610 ) -> Result<()> {
611 self.run_until_stable(usize::MAX, thread_pool, rng, pl)
612 .with_context(|| "Could not complete run_until_done")
613 }
614
615 #[inline(always)]
616 fn ensure_iteration(&self) -> Result<()> {
617 ensure!(
618 self.iteration_context.iteration > 0,
619 "HyperBall was not run. Please call HyperBall::run before accessing computed fields."
620 );
621 Ok(())
622 }
623
624 pub fn neighborhood_function(&self) -> Result<&[f64]> {
626 self.ensure_iteration()?;
627 Ok(&self.neighborhood_function)
628 }
629
630 pub fn sum_of_distances(&self) -> Result<&[f32]> {
632 self.ensure_iteration()?;
633 if let Some(distances) = &self.sum_of_dists {
634 Ok(distances)
636 } else {
637 bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute them")
638 }
639 }
640
641 pub fn harmonic_centralities(&self) -> Result<&[f32]> {
643 self.ensure_iteration()?;
644 if let Some(distances) = &self.sum_of_inv_dists {
645 Ok(distances)
646 } else {
647 bail!("Sum of inverse distances were not requested: use builder.with_sum_of_inverse_distances(true) while building HyperBall to compute them")
648 }
649 }
650
651 pub fn discounted_centrality(&self, index: usize) -> Result<&[f32]> {
656 self.ensure_iteration()?;
657 let d = self.discounted_centralities.get(index);
658 if let Some(distances) = d {
659 Ok(distances)
660 } else {
661 bail!("Discount centrality of index {} does not exist", index)
662 }
663 }
664
665 pub fn closeness_centrality(&self) -> Result<Box<[f32]>> {
667 self.ensure_iteration()?;
668 if let Some(distances) = &self.sum_of_dists {
669 Ok(distances
670 .iter()
671 .map(|&d| if d == 0.0 { 0.0 } else { d.recip() })
672 .collect())
673 } else {
674 bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute closeness centrality")
675 }
676 }
677
678 pub fn lin_centrality(&self) -> Result<Box<[f32]>> {
682 self.ensure_iteration()?;
683 if let Some(distances) = &self.sum_of_dists {
684 let logic = self.curr_state.logic();
685 Ok(distances
686 .iter()
687 .enumerate()
688 .map(|(node, &d)| {
689 if d == 0.0 {
690 1.0
691 } else {
692 let count = logic.estimate(self.curr_state.get_backend(node));
693 (count * count / d as f64) as f32
694 }
695 })
696 .collect())
697 } else {
698 bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute lin centrality")
699 }
700 }
701
702 pub fn nieminen_centrality(&self) -> Result<Box<[f32]>> {
704 self.ensure_iteration()?;
705 if let Some(distances) = &self.sum_of_dists {
706 let logic = self.curr_state.logic();
707 Ok(distances
708 .iter()
709 .enumerate()
710 .map(|(node, &d)| {
711 let count = logic.estimate(self.curr_state.get_backend(node));
712 ((count * count) - d as f64) as f32
713 })
714 .collect())
715 } else {
716 bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute lin centrality")
717 }
718 }
719
720 pub fn reachable_nodes_from(&self, node: usize) -> Result<f64> {
726 self.ensure_iteration()?;
727 Ok(self
728 .curr_state
729 .logic()
730 .estimate(self.curr_state.get_backend(node)))
731 }
732
733 pub fn reachable_nodes(&self) -> Result<Box<[f32]>> {
738 self.ensure_iteration()?;
739 let logic = self.curr_state.logic();
740 Ok((0..self.graph.num_nodes())
741 .map(|n| logic.estimate(self.curr_state.get_backend(n)) as f32)
742 .collect())
743 }
744}
745
746impl<
747 G1: RandomAccessGraph + Sync,
748 G2: RandomAccessGraph + Sync,
749 D: Succ<Input = usize, Output = usize> + Sync,
750 L: EstimationLogic<Item = usize> + MergeEstimationLogic + Sync,
751 A: EstimatorArrayMut<L> + Sync + AsSyncArray<L>,
752 > HyperBall<'_, G1, G2, D, L, A>
753where
754 L::Backend: PartialEq,
755{
756 fn iterate(
762 &mut self,
763 thread_pool: &ThreadPool,
764 pl: &mut impl ConcurrentProgressLog,
765 ) -> Result<()> {
766 let ic = &mut self.iteration_context;
767
768 pl.info(format_args!("Performing iteration {}", ic.iteration + 1));
769
770 let num_nodes = self.graph.num_nodes() as u64;
772 let num_arcs = self.graph.num_arcs();
773 let modified_estimators = ic.modified_estimators.load(Ordering::Relaxed);
774
775 let prev_was_systolic = ic.systolic;
777 let prev_was_local = ic.local;
778
779 ic.systolic =
782 self.transposed.is_some() && ic.iteration > 0 && modified_estimators < num_nodes / 4;
783
784 *ic.current_nf.lock().unwrap() = if ic.systolic { self.last } else { 0.0 };
789
790 ic.local = ic.pre_local;
793
794 ic.pre_local =
797 ic.systolic && modified_estimators < (num_nodes * num_nodes) / (num_arcs * 10);
798
799 if ic.systolic {
800 pl.info(format_args!(
801 "Starting systolic iteration (local: {}, pre_local: {})",
802 ic.local, ic.pre_local
803 ));
804 } else {
805 pl.info(format_args!("Starting standard iteration"));
806 }
807
808 if prev_was_local {
809 for &node in ic.local_checklist.iter() {
810 ic.next_modified.set(node, false, Ordering::Relaxed);
811 }
812 } else {
813 thread_pool.install(|| ic.next_modified.fill(false, Ordering::Relaxed));
814 }
815
816 if ic.local {
817 thread_pool.join(
820 || ic.local_checklist.clear(),
821 || {
822 let mut local_next_must_be_checked =
823 ic.local_next_must_be_checked.lock().unwrap();
824 local_next_must_be_checked.par_sort_unstable();
825 local_next_must_be_checked.dedup();
826 },
827 );
828 std::mem::swap(
829 &mut ic.local_checklist,
830 &mut ic.local_next_must_be_checked.lock().unwrap(),
831 );
832 } else if ic.systolic {
833 thread_pool.join(
834 || {
835 ic.next_must_be_checked.fill(false, Ordering::Relaxed);
837 },
838 || {
839 if !prev_was_systolic {
841 ic.must_be_checked.fill(true, Ordering::Relaxed);
842 }
843 },
844 );
845 }
846
847 let mut granularity = ic.arc_granularity;
848 let num_threads = thread_pool.current_num_threads();
849
850 if num_threads > 1 && !ic.local {
851 if ic.iteration > 0 {
852 granularity = f64::min(
853 std::cmp::max(1, num_nodes as usize / num_threads) as _,
854 granularity as f64
855 * (num_nodes as f64 / std::cmp::max(1, modified_estimators) as f64),
856 ) as usize;
857 }
858 pl.info(format_args!(
859 "Adaptive granularity for this iteration: {}",
860 granularity
861 ));
862 }
863
864 ic.reset(granularity);
865
866 pl.item_name("arc");
867 pl.expected_updates(if ic.local { None } else { Some(num_arcs as _) });
868 pl.start("Starting parallel execution");
869 {
870 let next_state_sync = self.next_state.as_sync_array();
871 let sum_of_dists = self.sum_of_dists.as_mut().map(|x| x.as_sync_slice());
872 let sum_of_inv_dists = self.sum_of_inv_dists.as_mut().map(|x| x.as_sync_slice());
873
874 let discounted_centralities = &self
875 .discounted_centralities
876 .iter_mut()
877 .map(|s| s.as_sync_slice())
878 .collect::<Vec<_>>();
879 thread_pool.broadcast(|c| {
880 Self::parallel_task(
881 self.graph,
882 self.transposed,
883 &self.curr_state,
884 &next_state_sync,
885 ic,
886 sum_of_dists,
887 sum_of_inv_dists,
888 discounted_centralities,
889 c,
890 )
891 });
892 }
893
894 pl.done_with_count(ic.visited_arcs.load(Ordering::Relaxed) as usize);
895 let modified_estimators = ic.modified_estimators.load(Ordering::Relaxed);
896
897 pl.info(format_args!(
898 "Modified estimators: {}/{} ({:.3}%)",
899 modified_estimators,
900 self.graph.num_nodes(),
901 (modified_estimators as f64 / self.graph.num_nodes() as f64) * 100.0
902 ));
903
904 std::mem::swap(&mut self.curr_state, &mut self.next_state);
905 std::mem::swap(&mut ic.curr_modified, &mut ic.next_modified);
906
907 if ic.systolic {
908 std::mem::swap(&mut ic.must_be_checked, &mut ic.next_must_be_checked);
909 }
910
911 let mut current_nf_mut = ic.current_nf.lock().unwrap();
912 self.last = *current_nf_mut;
913 let &last_output = self
916 .neighborhood_function
917 .as_slice()
918 .last()
919 .expect("Should always have at least 1 element");
920 if *current_nf_mut < last_output {
921 *current_nf_mut = last_output;
922 }
923 self.relative_increment = *current_nf_mut / last_output;
924
925 pl.info(format_args!(
926 "Pairs: {} ({}%)",
927 *current_nf_mut,
928 (*current_nf_mut * 100.0) / (num_nodes * num_nodes) as f64
929 ));
930 pl.info(format_args!(
931 "Absolute increment: {}",
932 *current_nf_mut - last_output
933 ));
934 pl.info(format_args!(
935 "Relative increment: {}",
936 self.relative_increment
937 ));
938
939 self.neighborhood_function.push(*current_nf_mut);
940
941 ic.iteration += 1;
942
943 Ok(())
944 }
945
946 #[allow(clippy::too_many_arguments)]
956 fn parallel_task(
957 graph: &(impl RandomAccessGraph + Sync),
958 transpose: Option<&(impl RandomAccessGraph + Sync)>,
959 curr_state: &impl EstimatorArray<L>,
960 next_state: &impl SyncEstimatorArray<L>,
961 ic: &IterationContext<'_, G1, D>,
962 sum_of_dists: Option<&[SyncCell<f32>]>,
963 sum_of_inv_dists: Option<&[SyncCell<f32>]>,
964 discounted_centralities: &[&[SyncCell<f32>]],
965 _broadcast_context: rayon::BroadcastContext,
966 ) {
967 let node_granularity = ic.arc_granularity;
968 let arc_granularity = ((graph.num_arcs() as f64 * node_granularity as f64)
969 / graph.num_nodes() as f64)
970 .ceil() as usize;
971 let do_centrality = sum_of_dists.is_some()
972 || sum_of_inv_dists.is_some()
973 || !ic.discount_functions.is_empty();
974 let node_upper_limit = if ic.local {
975 ic.local_checklist.len()
976 } else {
977 graph.num_nodes()
978 };
979 let mut visited_arcs = 0;
980 let mut modified_estimators = 0;
981 let arc_upper_limit = graph.num_arcs();
982
983 let mut neighborhood_function_delta = KahanSum::new_with_value(0.0);
987 let mut helper = curr_state.logic().new_helper();
988 let logic = curr_state.logic();
989 let mut next_estimator = logic.new_estimator();
990
991 loop {
992 let (start, end) = if ic.local {
994 let start = std::cmp::min(
995 ic.node_cursor.fetch_add(1, Ordering::Relaxed),
996 node_upper_limit,
997 );
998 let end = std::cmp::min(start + 1, node_upper_limit);
999 (start, end)
1000 } else {
1001 let mut arc_balanced_cursor = ic.arc_cursor.lock().unwrap();
1002 let (mut next_node, mut next_arc) = *arc_balanced_cursor;
1003 if next_node >= node_upper_limit {
1004 (node_upper_limit, node_upper_limit)
1005 } else {
1006 let start = next_node;
1007 let target = next_arc + arc_granularity;
1008 if target as u64 >= arc_upper_limit {
1009 next_node = node_upper_limit;
1010 } else {
1011 (next_node, next_arc) = ic.cumul_outdeg.succ(target).unwrap();
1012 }
1013 let end = next_node;
1014 *arc_balanced_cursor = (next_node, next_arc);
1015 (start, end)
1016 }
1017 };
1018
1019 if start == node_upper_limit {
1020 break;
1021 }
1022
1023 for i in start..end {
1025 let node = if ic.local { ic.local_checklist[i] } else { i };
1026
1027 let prev_estimator = curr_state.get_backend(node);
1028
1029 if !ic.systolic || ic.local || ic.must_be_checked[node] {
1034 next_estimator.set(prev_estimator);
1035 let mut modified = false;
1036 for succ in graph.successors(node) {
1037 if succ != node && ic.curr_modified[succ] {
1038 visited_arcs += 1;
1039 if !modified {
1040 modified = true;
1041 }
1042 logic.merge_with_helper(
1043 next_estimator.as_mut(),
1044 curr_state.get_backend(succ),
1045 &mut helper,
1046 );
1047 }
1048 }
1049
1050 let mut post = f64::NAN;
1051 let estimator_modified = modified && next_estimator.as_ref() != prev_estimator;
1052
1053 if !ic.systolic || estimator_modified {
1058 post = logic.estimate(next_estimator.as_ref())
1059 }
1060 if !ic.systolic {
1061 neighborhood_function_delta += post;
1062 }
1063
1064 if estimator_modified && (ic.systolic || do_centrality) {
1065 let pre = logic.estimate(prev_estimator);
1066 if ic.systolic {
1067 neighborhood_function_delta += -pre;
1068 neighborhood_function_delta += post;
1069 }
1070
1071 if do_centrality {
1072 let delta = post - pre;
1073 if delta > 0.0 {
1075 if let Some(distances) = sum_of_dists {
1076 let new_value = delta * (ic.iteration + 1) as f64;
1077 unsafe {
1078 distances[node]
1079 .set((distances[node].get() as f64 + new_value) as f32)
1080 };
1081 }
1082 if let Some(distances) = sum_of_inv_dists {
1083 let new_value = delta / (ic.iteration + 1) as f64;
1084 unsafe {
1085 distances[node]
1086 .set((distances[node].get() as f64 + new_value) as f32)
1087 };
1088 }
1089 for (func, distances) in ic
1090 .discount_functions
1091 .iter()
1092 .zip(discounted_centralities.iter())
1093 {
1094 let new_value = delta * func(ic.iteration + 1);
1095 unsafe {
1096 distances[node]
1097 .set((distances[node].get() as f64 + new_value) as f32)
1098 };
1099 }
1100 }
1101 }
1102 }
1103
1104 if estimator_modified {
1105 if ic.pre_local {
1110 ic.local_next_must_be_checked.lock().unwrap().push(node);
1111 }
1112 ic.next_modified.set(node, true, Ordering::Relaxed);
1113
1114 if ic.systolic {
1115 debug_assert!(transpose.is_some());
1116 let transpose = unsafe { transpose.unwrap_unchecked() };
1126 if ic.pre_local {
1127 let mut local_next_must_be_checked =
1128 ic.local_next_must_be_checked.lock().unwrap();
1129 for succ in transpose.successors(node) {
1130 local_next_must_be_checked.push(succ);
1131 }
1132 } else {
1133 for succ in transpose.successors(node) {
1134 ic.next_must_be_checked.set(succ, true, Ordering::Relaxed);
1135 }
1136 }
1137 }
1138
1139 modified_estimators += 1;
1140 }
1141
1142 unsafe {
1143 next_state.set(node, next_estimator.as_ref());
1144 }
1145 } else {
1146 if ic.curr_modified[node] {
1150 unsafe {
1151 next_state.set(node, prev_estimator);
1152 }
1153 }
1154 }
1155 }
1156 }
1157
1158 *ic.current_nf.lock().unwrap() += neighborhood_function_delta.sum();
1159 ic.visited_arcs.fetch_add(visited_arcs, Ordering::Relaxed);
1160 ic.modified_estimators
1161 .fetch_add(modified_estimators, Ordering::Relaxed);
1162 }
1163
1164 fn init(
1166 &mut self,
1167 thread_pool: &ThreadPool,
1168 mut rng: impl rand::Rng,
1169 pl: &mut impl ConcurrentProgressLog,
1170 ) -> Result<()> {
1171 pl.start("Initializing estimators");
1172 pl.info(format_args!("Clearing all registers"));
1173
1174 self.curr_state.clear();
1175 self.next_state.clear();
1176
1177 pl.info(format_args!("Initializing registers"));
1178 if let Some(w) = &self.weight {
1179 pl.info(format_args!("Loading weights"));
1180 for (i, &node_weight) in w.iter().enumerate() {
1181 let mut estimator = self.curr_state.get_estimator_mut(i);
1182 for _ in 0..node_weight {
1183 estimator.add(&(rng.random::<u64>() as usize));
1184 }
1185 }
1186 } else {
1187 (0..self.graph.num_nodes()).for_each(|i| {
1188 self.curr_state.get_estimator_mut(i).add(i);
1189 });
1190 }
1191
1192 self.completed = false;
1193
1194 let ic = &mut self.iteration_context;
1195 ic.iteration = 0;
1196 ic.systolic = false;
1197 ic.local = false;
1198 ic.pre_local = false;
1199 ic.reset(self.granularity);
1200
1201 pl.debug(format_args!("Initializing distances"));
1202 if let Some(distances) = &mut self.sum_of_dists {
1203 distances.fill(0.0);
1204 }
1205 if let Some(distances) = &mut self.sum_of_inv_dists {
1206 distances.fill(0.0);
1207 }
1208 pl.debug(format_args!("Initializing centralities"));
1209 for centralities in self.discounted_centralities.iter_mut() {
1210 centralities.fill(0.0);
1211 }
1212
1213 self.last = self.graph.num_nodes() as f64;
1214 pl.debug(format_args!("Initializing neighborhood function"));
1215 self.neighborhood_function.clear();
1216 self.neighborhood_function.push(self.last);
1217
1218 pl.debug(format_args!("Initializing modified estimators"));
1219 thread_pool.install(|| ic.curr_modified.fill(true, Ordering::Relaxed));
1220
1221 pl.done();
1222
1223 Ok(())
1224 }
1225}
1226
1227#[cfg(test)]
1228mod test {
1229 use std::hash::{BuildHasherDefault, DefaultHasher};
1230
1231 use super::*;
1232 use card_est_array::traits::{EstimatorArray, MergeEstimator};
1233 use dsi_progress_logger::no_logging;
1234 use epserde::deser::{Deserialize, Flags};
1235 use rand::SeedableRng;
1236 use webgraph::{
1237 prelude::{BvGraph, DCF},
1238 traits::SequentialLabeling,
1239 };
1240
1241 type HyperBallArray<G> = SliceEstimatorArray<
1242 HyperLogLog<<G as SequentialLabeling>::Label, BuildHasherDefault<DefaultHasher>, usize>,
1243 usize,
1244 Box<[usize]>,
1245 >;
1246
1247 struct SeqHyperBall<'a, G: RandomAccessGraph> {
1248 graph: &'a G,
1249 curr_state: HyperBallArray<G>,
1250 next_state: HyperBallArray<G>,
1251 }
1252
1253 impl<G: RandomAccessGraph> SeqHyperBall<'_, G> {
1254 fn init(&mut self) {
1255 for i in 0..self.graph.num_nodes() {
1256 self.curr_state.get_estimator_mut(i).add(i);
1257 }
1258 }
1259
1260 fn iterate(&mut self) {
1261 for i in 0..self.graph.num_nodes() {
1262 let mut estimator = self.next_state.get_estimator_mut(i);
1263 estimator.set(self.curr_state.get_backend(i));
1264 for succ in self.graph.successors(i) {
1265 estimator.merge(self.curr_state.get_backend(succ));
1266 }
1267 }
1268 std::mem::swap(&mut self.curr_state, &mut self.next_state);
1269 }
1270 }
1271
1272 #[cfg_attr(feature = "slow_tests", test)]
1273 #[cfg_attr(not(feature = "slow_tests"), allow(dead_code))]
1274 fn test_cnr_2000() -> Result<()> {
1275 let basename = "../data/cnr-2000";
1276
1277 let graph = BvGraph::with_basename(basename).load()?;
1278 let transpose = BvGraph::with_basename(basename.to_owned() + "-t").load()?;
1279 let cumulative = DCF::load_mmap(basename.to_owned() + ".dcf", Flags::empty())?;
1280
1281 let num_nodes = graph.num_nodes();
1282
1283 let hyper_log_log = HyperLogLogBuilder::new(num_nodes)
1284 .log_2_num_reg(6)
1285 .build()?;
1286
1287 let seq_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1288 let seq_result_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1289 let par_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1290 let par_result_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1291
1292 let mut hyperball = HyperBallBuilder::with_transpose(
1293 &graph,
1294 &transpose,
1295 cumulative.as_ref(),
1296 par_bits,
1297 par_result_bits,
1298 )
1299 .build(no_logging![]);
1300 let mut seq_hyperball = SeqHyperBall {
1301 curr_state: seq_bits,
1302 next_state: seq_result_bits,
1303 graph: &graph,
1304 };
1305
1306 let mut modified_estimators = num_nodes as u64;
1307 let threads = thread_pool![];
1308 let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
1309 hyperball.init(&threads, &mut rng, no_logging![])?;
1310 seq_hyperball.init();
1311
1312 while modified_estimators != 0 {
1313 hyperball.iterate(&threads, no_logging![])?;
1314 seq_hyperball.iterate();
1315
1316 modified_estimators = hyperball
1317 .iteration_context
1318 .modified_estimators
1319 .load(Ordering::Relaxed);
1320
1321 assert_eq!(
1322 hyperball.next_state.as_ref(),
1323 seq_hyperball.next_state.as_ref()
1324 );
1325 assert_eq!(
1326 hyperball.curr_state.as_ref(),
1327 seq_hyperball.curr_state.as_ref()
1328 );
1329 }
1330
1331 Ok(())
1332 }
1333}