webgraph_algo/distances/
hyperball.rs

1/*
2 * SPDX-FileCopyrightText: 2024 Matteo Dell'Acqua
3 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8use anyhow::{bail, ensure, Context, Result};
9use card_est_array::impls::{HyperLogLog, HyperLogLogBuilder, SliceEstimatorArray};
10use card_est_array::traits::{
11    AsSyncArray, EstimationLogic, EstimatorArray, EstimatorArrayMut, EstimatorMut,
12    MergeEstimationLogic, SyncEstimatorArray,
13};
14use dsi_progress_logger::ConcurrentProgressLog;
15use kahan::KahanSum;
16use rayon::{prelude::*, ThreadPool};
17use std::hash::{BuildHasherDefault, DefaultHasher};
18use std::sync::{atomic::*, Mutex};
19use sux::{bits::AtomicBitVec, traits::Succ};
20use sync_cell_slice::{SyncCell, SyncSlice};
21use webgraph::traits::{RandomAccessGraph, SequentialLabeling};
22use webgraph::utils::Granularity;
23
24/// A builder for [`HyperBall`].
25///
26/// After creating a builder with [`HyperBallBuilder::new`] you can configure it
27/// using setters such as [`HyperBallBuilder`] its methods, then call
28/// [`HyperBallBuilder::build`] on it to create a [`HyperBall`] instance.
29pub struct HyperBallBuilder<
30    'a,
31    G1: RandomAccessGraph + Sync,
32    G2: RandomAccessGraph + Sync,
33    D: Succ<Input = usize, Output = usize>,
34    L: MergeEstimationLogic<Item = G1::Label>,
35    A: EstimatorArrayMut<L>,
36> {
37    /// A graph.
38    graph: &'a G1,
39    /// The transpose of `graph`, if any.
40    transpose: Option<&'a G2>,
41    /// The outdegree cumulative function of the graph.
42    cumul_outdegree: &'a D,
43    /// Whether to compute the sum of distances (e.g., for closeness centrality).
44    do_sum_of_dists: bool,
45    /// Whether to compute the sum of inverse distances (e.g., for harmonic centrality).
46    do_sum_of_inv_dists: bool,
47    /// Custom discount functions whose sum should be computed.
48    discount_functions: Vec<Box<dyn Fn(usize) -> f64 + Sync + 'a>>,
49    /// The arc granularity.
50    arc_granularity: usize,
51    /// Integer weights for the nodes, if any.
52    weights: Option<&'a [usize]>,
53    /// A first array of estimators.
54    array_0: A,
55    /// A second array of estimators of the same length and with the same logic of
56    /// `array_0`.
57    array_1: A,
58    _marker: std::marker::PhantomData<L>,
59}
60
61impl<
62        'a,
63        G1: RandomAccessGraph + Sync,
64        G2: RandomAccessGraph + Sync,
65        D: Succ<Input = usize, Output = usize>,
66    >
67    HyperBallBuilder<
68        'a,
69        G1,
70        G2,
71        D,
72        HyperLogLog<G1::Label, BuildHasherDefault<DefaultHasher>, usize>,
73        SliceEstimatorArray<
74            HyperLogLog<G1::Label, BuildHasherDefault<DefaultHasher>, usize>,
75            usize,
76            Box<[usize]>,
77        >,
78    >
79{
80    /// A builder for [`HyperBall`] using a specified [`EstimationLogic`].
81    ///
82    /// # Arguments
83    /// * `graph`: the graph to analyze.
84    /// * `transpose`: optionally, the transpose of `graph`. If [`None`], no
85    ///   systolic iterations will be performed by the resulting [`HyperBall`].
86    /// * `cumul_outdeg`: the outdegree cumulative function of the graph.
87    /// * `log2m`: the base-2 logarithm of the number *m* of register per
88    ///   HyperLogLog cardinality estimator.
89    /// * `weights`: the weights to use. If [`None`] every node is assumed to be
90    ///   of weight equal to 1.
91    pub fn with_hyper_log_log(
92        graph: &'a G1,
93        transposed: Option<&'a G2>,
94        cumul_outdeg: &'a D,
95        log2m: usize,
96        weights: Option<&'a [usize]>,
97    ) -> Result<Self> {
98        let num_elements = if let Some(w) = weights {
99            ensure!(
100                w.len() == graph.num_nodes(),
101                "weights should have length equal to the graph's number of nodes"
102            );
103            w.iter().sum()
104        } else {
105            graph.num_nodes()
106        };
107
108        let logic = HyperLogLogBuilder::new(num_elements)
109            .log_2_num_reg(log2m)
110            .build()
111            .with_context(|| "Could not build HyperLogLog logic")?;
112
113        let array_0 = SliceEstimatorArray::new(logic.clone(), graph.num_nodes());
114        let array_1 = SliceEstimatorArray::new(logic, graph.num_nodes());
115
116        Ok(Self {
117            graph,
118            transpose: transposed,
119            cumul_outdegree: cumul_outdeg,
120            do_sum_of_dists: false,
121            do_sum_of_inv_dists: false,
122            discount_functions: Vec::new(),
123            arc_granularity: Self::DEFAULT_GRANULARITY,
124            weights,
125            array_0,
126            array_1,
127            _marker: std::marker::PhantomData,
128        })
129    }
130}
131
132impl<
133        'a,
134        D: Succ<Input = usize, Output = usize>,
135        G: RandomAccessGraph + Sync,
136        L: MergeEstimationLogic<Item = G::Label> + PartialEq,
137        A: EstimatorArrayMut<L>,
138    > HyperBallBuilder<'a, G, G, D, L, A>
139{
140    /// Creates a new builder with default parameters.
141    ///
142    /// # Arguments
143    /// * `graph`: the graph to analyze.
144    /// * `cumul_outdeg`: the outdegree cumulative function of the graph.
145    /// * `array_0`: a first array of estimators.
146    /// * `array_1`: A second array of estimators of the same length and with the same logic of
147    ///   `array_0`.
148    pub fn new(graph: &'a G, cumul_outdeg: &'a D, array_0: A, array_1: A) -> Self {
149        assert!(array_0.logic() == array_1.logic(), "Incompatible logics");
150        assert_eq!(
151            graph.num_nodes(),
152            array_0.len(),
153            "array_0 should have length {}. Got {}",
154            graph.num_nodes(),
155            array_0.len()
156        );
157        assert_eq!(
158            graph.num_nodes(),
159            array_1.len(),
160            "array_1 should have length {}. Got {}",
161            graph.num_nodes(),
162            array_1.len()
163        );
164        Self {
165            graph,
166            transpose: None,
167            cumul_outdegree: cumul_outdeg,
168            do_sum_of_dists: false,
169            do_sum_of_inv_dists: false,
170            discount_functions: Vec::new(),
171            arc_granularity: Self::DEFAULT_GRANULARITY,
172            weights: None,
173            array_0,
174            array_1,
175            _marker: std::marker::PhantomData,
176        }
177    }
178}
179
180impl<
181        'a,
182        G1: RandomAccessGraph + Sync,
183        G2: RandomAccessGraph + Sync,
184        D: Succ<Input = usize, Output = usize>,
185        L: MergeEstimationLogic<Item = G1::Label>,
186        A: EstimatorArrayMut<L>,
187    > HyperBallBuilder<'a, G1, G2, D, L, A>
188{
189    const DEFAULT_GRANULARITY: usize = 16 * 1024;
190
191    /// Creates a new builder with default parameters using also the transpose.
192    ///
193    /// * `graph`: the graph to analyze.
194    /// * `transpose`: optionally, the transpose of `graph`. If [`None`], no
195    ///   systolic iterations will be performed by the resulting [`HyperBall`].
196    /// * `cumul_outdeg`: the outdegree cumulative function of the graph.
197    /// * `array_0`: a first array of estimators.
198    /// * `array_1`: A second array of estimators of the same length and with the same logic of
199    ///   `array_0`.
200    pub fn with_transpose(
201        graph: &'a G1,
202        transpose: &'a G2,
203        cumul_outdeg: &'a D,
204        array_0: A,
205        array_1: A,
206    ) -> Self {
207        assert_eq!(
208            graph.num_nodes(),
209            array_0.len(),
210            "array_0 should have have len {}. Got {}",
211            graph.num_nodes(),
212            array_0.len()
213        );
214        assert_eq!(
215            graph.num_nodes(),
216            array_1.len(),
217            "array_1 should have have len {}. Got {}",
218            graph.num_nodes(),
219            array_1.len()
220        );
221        assert_eq!(
222            transpose.num_nodes(),
223            graph.num_nodes(),
224            "the transpose should have same number of nodes of the graph ({}). Got {}.",
225            graph.num_nodes(),
226            transpose.num_nodes()
227        );
228        assert_eq!(
229            transpose.num_arcs(),
230            graph.num_arcs(),
231            "the transpose should have same number of nodes of the graph ({}). Got {}.",
232            graph.num_arcs(),
233            transpose.num_arcs()
234        );
235        /* TODOdebug_assert!(
236            check_transposed(graph, transpose),
237            "the transpose should be the transpose of the graph"
238        );*/
239        Self {
240            graph,
241            transpose: Some(transpose),
242            cumul_outdegree: cumul_outdeg,
243            do_sum_of_dists: false,
244            do_sum_of_inv_dists: false,
245            discount_functions: Vec::new(),
246            arc_granularity: Self::DEFAULT_GRANULARITY,
247            weights: None,
248            array_0,
249            array_1,
250            _marker: std::marker::PhantomData,
251        }
252    }
253
254    /// Sets whether to compute the sum of distances.
255    pub fn sum_of_distances(mut self, do_sum_of_distances: bool) -> Self {
256        self.do_sum_of_dists = do_sum_of_distances;
257        self
258    }
259
260    /// Sets whether to compute the sum of inverse distances.
261    pub fn sum_of_inverse_distances(mut self, do_sum_of_inverse_distances: bool) -> Self {
262        self.do_sum_of_inv_dists = do_sum_of_inverse_distances;
263        self
264    }
265
266    /// Sets the base granularity used in the parallel phases of the iterations.
267    pub fn granularity(mut self, granularity: Granularity) -> Self {
268        self.arc_granularity =
269            granularity.arc_granularity(self.graph.num_nodes(), Some(self.graph.num_arcs()));
270        self
271    }
272
273    /// Sets optional weights for the nodes of the graph.
274    ///
275    /// # Arguments
276    /// * `weights`: weights to use for the nodes. If [`None`], every node is
277    ///   assumed to be of weight equal to 1.
278    pub fn weights(mut self, weights: Option<&'a [usize]>) -> Self {
279        if let Some(w) = weights {
280            assert_eq!(w.len(), self.graph.num_nodes());
281        }
282        self.weights = weights;
283        self
284    }
285
286    /// Adds a new discount function whose sum over all spheres should be
287    /// computed.
288    pub fn discount_function(
289        mut self,
290        discount_function: impl Fn(usize) -> f64 + Sync + 'a,
291    ) -> Self {
292        self.discount_functions.push(Box::new(discount_function));
293        self
294    }
295
296    /// Removes all custom discount functions.
297    pub fn no_discount_function(mut self) -> Self {
298        self.discount_functions.clear();
299        self
300    }
301}
302
303impl<
304        'a,
305        G1: RandomAccessGraph + Sync,
306        G2: RandomAccessGraph + Sync,
307        D: Succ<Input = usize, Output = usize>,
308        L: MergeEstimationLogic<Item = G1::Label> + Sync + std::fmt::Display,
309        A: EstimatorArrayMut<L>,
310    > HyperBallBuilder<'a, G1, G2, D, L, A>
311{
312    /// Builds a [`HyperBall`] instance.
313    ///
314    /// # Arguments
315    ///
316    /// * `pl`: A progress logger.
317    #[allow(clippy::type_complexity)]
318    pub fn build(self, pl: &mut impl ConcurrentProgressLog) -> HyperBall<'a, G1, G2, D, L, A> {
319        let num_nodes = self.graph.num_nodes();
320
321        let sum_of_distances = if self.do_sum_of_dists {
322            pl.debug(format_args!("Initializing sum of distances"));
323            Some(vec![0.0; num_nodes])
324        } else {
325            pl.debug(format_args!("Skipping sum of distances"));
326            None
327        };
328        let sum_of_inverse_distances = if self.do_sum_of_inv_dists {
329            pl.debug(format_args!("Initializing sum of inverse distances"));
330            Some(vec![0.0; num_nodes])
331        } else {
332            pl.debug(format_args!("Skipping sum of inverse distances"));
333            None
334        };
335
336        let mut discounted_centralities = Vec::new();
337        pl.debug(format_args!(
338            "Initializing {} discount functions",
339            self.discount_functions.len()
340        ));
341        for _ in self.discount_functions.iter() {
342            discounted_centralities.push(vec![0.0; num_nodes]);
343        }
344
345        pl.info(format_args!("Initializing bit vectors"));
346        let estimator_modified = AtomicBitVec::new(num_nodes);
347        let modified_result_estimator = AtomicBitVec::new(num_nodes);
348        let must_be_checked = AtomicBitVec::new(num_nodes);
349        let next_must_be_checked = AtomicBitVec::new(num_nodes);
350
351        pl.info(format_args!(
352            "Using estimation logic: {}",
353            self.array_0.logic()
354        ));
355
356        HyperBall {
357            graph: self.graph,
358            transposed: self.transpose,
359            weight: self.weights,
360            granularity: self.arc_granularity,
361            curr_state: self.array_0,
362            next_state: self.array_1,
363            completed: false,
364            neighborhood_function: Vec::new(),
365            last: 0.0,
366            relative_increment: 0.0,
367            sum_of_dists: sum_of_distances,
368            sum_of_inv_dists: sum_of_inverse_distances,
369            discounted_centralities,
370            iteration_context: IterationContext {
371                cumul_outdeg: self.cumul_outdegree,
372                iteration: 0,
373                current_nf: Mutex::new(0.0),
374                arc_granularity: 0,
375                node_cursor: AtomicUsize::new(0),
376                arc_cursor: Mutex::new((0, 0)),
377                visited_arcs: AtomicU64::new(0),
378                modified_estimators: AtomicU64::new(0),
379                systolic: false,
380                local: false,
381                pre_local: false,
382                local_checklist: Vec::new(),
383                local_next_must_be_checked: Mutex::new(Vec::new()),
384                must_be_checked,
385                next_must_be_checked,
386                curr_modified: estimator_modified,
387                next_modified: modified_result_estimator,
388                discount_functions: self.discount_functions,
389            },
390            _marker: std::marker::PhantomData,
391        }
392    }
393}
394
395/// Data used by [`parallel_task`](HyperBall::parallel_task).
396///
397/// These variables are used by the threads running
398/// [`parallel_task`](HyperBall::parallel_task). They must be isolated in a
399/// field because we need to be able to borrow exclusively
400/// [`HyperBall::next_state`], while sharing references to the data contained
401/// here and to the [`HyperBall::curr_state`].
402struct IterationContext<'a, G1: SequentialLabeling, D> {
403    /// The cumulative list of outdegrees.
404    cumul_outdeg: &'a D,
405    /// The number of the current iteration.
406    iteration: usize,
407    /// The value of the neighborhood function computed during the current iteration.
408    current_nf: Mutex<f64>,
409    /// The arc granularity: each task will try to process at least this number
410    /// of arcs.
411    arc_granularity: usize,
412    /// A cursor scanning the nodes to process during local computations.
413    node_cursor: AtomicUsize,
414    /// A cursor scanning the nodes and arcs to process during non-local
415    /// computations.
416    arc_cursor: Mutex<(usize, usize)>,
417    /// The number of arcs visited during the current iteration.
418    visited_arcs: AtomicU64,
419    /// The number of estimators modified during the current iteration.
420    modified_estimators: AtomicU64,
421    /// `true` if we started a systolic computation.
422    systolic: bool,
423    /// `true` if we started a local computation.
424    local: bool,
425    /// `true` if we are preparing a local computation (systolic is `true` and less than 1% nodes were modified).
426    pre_local: bool,
427    /// If [`local`](Self::local) is `true`, the sorted list of nodes that
428    /// should be scanned.
429    local_checklist: Vec<G1::Label>,
430    /// If [`pre_local`](Self::pre_local) is `true`, the set of nodes that
431    /// should be scanned on the next iteration.
432    local_next_must_be_checked: Mutex<Vec<G1::Label>>,
433    /// Used in systolic iterations to keep track of nodes to check.
434    must_be_checked: AtomicBitVec,
435    /// Used in systolic iterations to keep track of nodes to check in the next
436    /// iteration.
437    next_must_be_checked: AtomicBitVec,
438    /// Whether each estimator has been modified during the previous iteration.
439    curr_modified: AtomicBitVec,
440    /// Whether each estimator has been modified during the current iteration.
441    next_modified: AtomicBitVec,
442    /// Custom discount functions whose sum should be computed.
443    discount_functions: Vec<Box<dyn Fn(usize) -> f64 + Sync + 'a>>,
444}
445
446impl<G1: SequentialLabeling, D> IterationContext<'_, G1, D> {
447    /// Resets the iteration context
448    fn reset(&mut self, granularity: usize) {
449        self.arc_granularity = granularity;
450        self.node_cursor.store(0, Ordering::Relaxed);
451        *self.arc_cursor.lock().unwrap() = (0, 0);
452        self.visited_arcs.store(0, Ordering::Relaxed);
453        self.modified_estimators.store(0, Ordering::Relaxed);
454    }
455}
456
457/// An algorithm that computes an approximation of the neighborhood function,
458/// of the size of the reachable sets, and of (discounted) positive geometric
459/// centralities of a graph.
460pub struct HyperBall<
461    'a,
462    G1: RandomAccessGraph + Sync,
463    G2: RandomAccessGraph + Sync,
464    D: Succ<Input = usize, Output = usize>,
465    L: MergeEstimationLogic<Item = G1::Label> + Sync,
466    A: EstimatorArrayMut<L>,
467> {
468    /// The graph to analyze.
469    graph: &'a G1,
470    /// The transpose of [`Self::graph`], if any.
471    transposed: Option<&'a G2>,
472    /// An optional slice of nonnegative node weights.
473    weight: Option<&'a [usize]>,
474    /// The base number of nodes per task. TODO.
475    granularity: usize,
476    /// The previous state.
477    curr_state: A,
478    /// The next state.
479    next_state: A,
480    /// `true` if the computation is over.
481    completed: bool,
482    /// The neighborhood function.
483    neighborhood_function: Vec<f64>,
484    /// The value computed by the last iteration.
485    last: f64,
486    /// The relative increment of the neighborhood function for the last
487    /// iteration.
488    relative_increment: f64,
489    /// The sum of the distances from every given node, if requested.
490    sum_of_dists: Option<Vec<f32>>,
491    /// The sum of inverse distances from each given node, if requested.
492    sum_of_inv_dists: Option<Vec<f32>>,
493    /// The overall discount centrality for every [`Self::discount_functions`].
494    discounted_centralities: Vec<Vec<f32>>,
495    /// Context used in a single iteration.
496    iteration_context: IterationContext<'a, G1, D>,
497    _marker: std::marker::PhantomData<L>,
498}
499
500impl<
501        G1: RandomAccessGraph + Sync,
502        G2: RandomAccessGraph + Sync,
503        D: Succ<Input = usize, Output = usize> + Sync,
504        L: MergeEstimationLogic<Item = usize> + Sync,
505        A: EstimatorArrayMut<L> + Sync + AsSyncArray<L>,
506    > HyperBall<'_, G1, G2, D, L, A>
507where
508    L::Backend: PartialEq,
509{
510    /// Runs HyperBall.
511    ///
512    /// # Arguments
513    ///
514    /// * `upper_bound`: an upper bound to the number of iterations.
515    ///
516    /// * `threshold`: a value that will be used to stop the computation by
517    ///   relative increment if the neighborhood function is being computed. If
518    ///   [`None`] the computation will stop when no estimators are modified.
519    ///
520    /// * `thread_pool`: The thread pool to use for parallel computation.
521    ///
522    /// * `pl`: A progress logger.
523    pub fn run(
524        &mut self,
525        upper_bound: usize,
526        threshold: Option<f64>,
527        thread_pool: &ThreadPool,
528        rng: impl rand::Rng,
529        pl: &mut impl ConcurrentProgressLog,
530    ) -> Result<()> {
531        let upper_bound = std::cmp::min(upper_bound, self.graph.num_nodes());
532
533        self.init(thread_pool, rng, pl)
534            .with_context(|| "Could not initialize estimator")?;
535
536        pl.item_name("iteration");
537        pl.expected_updates(None);
538        pl.start(format!(
539            "Running Hyperball for a maximum of {} iterations and a threshold of {:?}",
540            upper_bound, threshold
541        ));
542
543        for i in 0..upper_bound {
544            self.iterate(thread_pool, pl)
545                .with_context(|| format!("Could not perform iteration {}", i + 1))?;
546
547            pl.update_and_display();
548
549            if self
550                .iteration_context
551                .modified_estimators
552                .load(Ordering::Relaxed)
553                == 0
554            {
555                pl.info(format_args!(
556                    "Terminating HyperBall after {} iteration(s) by stabilization",
557                    i + 1
558                ));
559                break;
560            }
561
562            if let Some(t) = threshold {
563                if i > 3 && self.relative_increment < (1.0 + t) {
564                    pl.info(format_args!("Terminating HyperBall after {} iteration(s) by relative bound on the neighborhood function", i + 1));
565                    break;
566                }
567            }
568        }
569
570        pl.done();
571
572        Ok(())
573    }
574
575    /// Runs HyperBall until no estimators are modified.
576    ///
577    /// # Arguments
578    ///
579    /// * `upper_bound`: an upper bound to the number of iterations.
580    ///
581    /// * `thread_pool`: The thread pool to use for parallel computation.
582    ///
583    /// * `pl`: A progress logger.
584    #[inline(always)]
585    pub fn run_until_stable(
586        &mut self,
587        upper_bound: usize,
588        thread_pool: &ThreadPool,
589        rng: impl rand::Rng,
590        pl: &mut impl ConcurrentProgressLog,
591    ) -> Result<()> {
592        self.run(upper_bound, None, thread_pool, rng, pl)
593            .with_context(|| "Could not complete run_until_stable")
594    }
595
596    /// Runs HyperBall until no estimators are modified with no upper bound on the
597    /// number of iterations.
598    ///
599    /// # Arguments
600    ///
601    /// * `thread_pool`: The thread pool to use for parallel computation.
602    ///
603    /// * `pl`: A progress logger.
604    #[inline(always)]
605    pub fn run_until_done(
606        &mut self,
607        thread_pool: &ThreadPool,
608        rng: impl rand::Rng,
609        pl: &mut impl ConcurrentProgressLog,
610    ) -> Result<()> {
611        self.run_until_stable(usize::MAX, thread_pool, rng, pl)
612            .with_context(|| "Could not complete run_until_done")
613    }
614
615    #[inline(always)]
616    fn ensure_iteration(&self) -> Result<()> {
617        ensure!(
618            self.iteration_context.iteration > 0,
619            "HyperBall was not run. Please call HyperBall::run before accessing computed fields."
620        );
621        Ok(())
622    }
623
624    /// Returns the neighborhood function computed by this instance.
625    pub fn neighborhood_function(&self) -> Result<&[f64]> {
626        self.ensure_iteration()?;
627        Ok(&self.neighborhood_function)
628    }
629
630    /// Returns the sum of distances computed by this instance if requested.
631    pub fn sum_of_distances(&self) -> Result<&[f32]> {
632        self.ensure_iteration()?;
633        if let Some(distances) = &self.sum_of_dists {
634            // TODO these are COPIES
635            Ok(distances)
636        } else {
637            bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute them")
638        }
639    }
640
641    /// Returns the harmonic centralities (sum of inverse distances) computed by this instance if requested.
642    pub fn harmonic_centralities(&self) -> Result<&[f32]> {
643        self.ensure_iteration()?;
644        if let Some(distances) = &self.sum_of_inv_dists {
645            Ok(distances)
646        } else {
647            bail!("Sum of inverse distances were not requested: use builder.with_sum_of_inverse_distances(true) while building HyperBall to compute them")
648        }
649    }
650
651    /// Returns the discounted centralities of the specified index computed by this instance.
652    ///
653    /// # Arguments
654    /// * `index`: the index of the requested discounted centrality.
655    pub fn discounted_centrality(&self, index: usize) -> Result<&[f32]> {
656        self.ensure_iteration()?;
657        let d = self.discounted_centralities.get(index);
658        if let Some(distances) = d {
659            Ok(distances)
660        } else {
661            bail!("Discount centrality of index {} does not exist", index)
662        }
663    }
664
665    /// Computes and returns the closeness centralities from the sum of distances computed by this instance.
666    pub fn closeness_centrality(&self) -> Result<Box<[f32]>> {
667        self.ensure_iteration()?;
668        if let Some(distances) = &self.sum_of_dists {
669            Ok(distances
670                .iter()
671                .map(|&d| if d == 0.0 { 0.0 } else { d.recip() })
672                .collect())
673        } else {
674            bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute closeness centrality")
675        }
676    }
677
678    /// Computes and returns the lin centralities from the sum of distances computed by this instance.
679    ///
680    /// Note that lin's index for isolated nodes is by (our) definition one (it's smaller than any other node).
681    pub fn lin_centrality(&self) -> Result<Box<[f32]>> {
682        self.ensure_iteration()?;
683        if let Some(distances) = &self.sum_of_dists {
684            let logic = self.curr_state.logic();
685            Ok(distances
686                .iter()
687                .enumerate()
688                .map(|(node, &d)| {
689                    if d == 0.0 {
690                        1.0
691                    } else {
692                        let count = logic.estimate(self.curr_state.get_backend(node));
693                        (count * count / d as f64) as f32
694                    }
695                })
696                .collect())
697        } else {
698            bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute lin centrality")
699        }
700    }
701
702    /// Computes and returns the Nieminen centralities from the sum of distances computed by this instance.
703    pub fn nieminen_centrality(&self) -> Result<Box<[f32]>> {
704        self.ensure_iteration()?;
705        if let Some(distances) = &self.sum_of_dists {
706            let logic = self.curr_state.logic();
707            Ok(distances
708                .iter()
709                .enumerate()
710                .map(|(node, &d)| {
711                    let count = logic.estimate(self.curr_state.get_backend(node));
712                    ((count * count) - d as f64) as f32
713                })
714                .collect())
715        } else {
716            bail!("Sum of distances were not requested: use builder.with_sum_of_distances(true) while building HyperBall to compute lin centrality")
717        }
718    }
719
720    /// Reads from the internal estimator array and estimates the number of nodes
721    /// reachable from the specified node.
722    ///
723    /// # Arguments
724    /// * `node`: the index of the node to compute reachable nodes from.
725    pub fn reachable_nodes_from(&self, node: usize) -> Result<f64> {
726        self.ensure_iteration()?;
727        Ok(self
728            .curr_state
729            .logic()
730            .estimate(self.curr_state.get_backend(node)))
731    }
732
733    /// Reads from the internal estimator array and estimates the number of nodes reachable
734    /// from every node of the graph.
735    ///
736    /// `hyperball.reachable_nodes().unwrap()[i]` is equal to `hyperball.reachable_nodes_from(i).unwrap()`.
737    pub fn reachable_nodes(&self) -> Result<Box<[f32]>> {
738        self.ensure_iteration()?;
739        let logic = self.curr_state.logic();
740        Ok((0..self.graph.num_nodes())
741            .map(|n| logic.estimate(self.curr_state.get_backend(n)) as f32)
742            .collect())
743    }
744}
745
746impl<
747        G1: RandomAccessGraph + Sync,
748        G2: RandomAccessGraph + Sync,
749        D: Succ<Input = usize, Output = usize> + Sync,
750        L: EstimationLogic<Item = usize> + MergeEstimationLogic + Sync,
751        A: EstimatorArrayMut<L> + Sync + AsSyncArray<L>,
752    > HyperBall<'_, G1, G2, D, L, A>
753where
754    L::Backend: PartialEq,
755{
756    /// Performs a new iteration of HyperBall.
757    ///
758    /// # Arguments
759    /// * `thread_pool`: The thread pool to use for parallel computation.
760    /// * `pl`: A progress logger.
761    fn iterate(
762        &mut self,
763        thread_pool: &ThreadPool,
764        pl: &mut impl ConcurrentProgressLog,
765    ) -> Result<()> {
766        let ic = &mut self.iteration_context;
767
768        pl.info(format_args!("Performing iteration {}", ic.iteration + 1));
769
770        // Alias the number of modified estimators, nodes and arcs
771        let num_nodes = self.graph.num_nodes() as u64;
772        let num_arcs = self.graph.num_arcs();
773        let modified_estimators = ic.modified_estimators.load(Ordering::Relaxed);
774
775        // Let us record whether the previous computation was systolic or local
776        let prev_was_systolic = ic.systolic;
777        let prev_was_local = ic.local;
778
779        // If less than one fourth of the nodes have been modified, and we have
780        // the transpose, it is time to pass to a systolic computation
781        ic.systolic =
782            self.transposed.is_some() && ic.iteration > 0 && modified_estimators < num_nodes / 4;
783
784        // Non-systolic computations add up the values of all estimators.
785        //
786        // Systolic computations modify the last value by compensating for each
787        // modified estimators.
788        *ic.current_nf.lock().unwrap() = if ic.systolic { self.last } else { 0.0 };
789
790        // If we completed the last iteration in pre-local mode, we MUST run in
791        // local mode
792        ic.local = ic.pre_local;
793
794        // We run in pre-local mode if we are systolic and few nodes where
795        // modified.
796        ic.pre_local =
797            ic.systolic && modified_estimators < (num_nodes * num_nodes) / (num_arcs * 10);
798
799        if ic.systolic {
800            pl.info(format_args!(
801                "Starting systolic iteration (local: {}, pre_local: {})",
802                ic.local, ic.pre_local
803            ));
804        } else {
805            pl.info(format_args!("Starting standard iteration"));
806        }
807
808        if prev_was_local {
809            for &node in ic.local_checklist.iter() {
810                ic.next_modified.set(node, false, Ordering::Relaxed);
811            }
812        } else {
813            thread_pool.install(|| ic.next_modified.fill(false, Ordering::Relaxed));
814        }
815
816        if ic.local {
817            // In case of a local computation, we convert the set of
818            // must-be-checked for the next iteration into a check list
819            thread_pool.join(
820                || ic.local_checklist.clear(),
821                || {
822                    let mut local_next_must_be_checked =
823                        ic.local_next_must_be_checked.lock().unwrap();
824                    local_next_must_be_checked.par_sort_unstable();
825                    local_next_must_be_checked.dedup();
826                },
827            );
828            std::mem::swap(
829                &mut ic.local_checklist,
830                &mut ic.local_next_must_be_checked.lock().unwrap(),
831            );
832        } else if ic.systolic {
833            thread_pool.join(
834                || {
835                    // Systolic, non-local computations store the could-be-modified set implicitly into Self::next_must_be_checked.
836                    ic.next_must_be_checked.fill(false, Ordering::Relaxed);
837                },
838                || {
839                    // If the previous computation wasn't systolic, we must assume that all registers could have changed.
840                    if !prev_was_systolic {
841                        ic.must_be_checked.fill(true, Ordering::Relaxed);
842                    }
843                },
844            );
845        }
846
847        let mut granularity = ic.arc_granularity;
848        let num_threads = thread_pool.current_num_threads();
849
850        if num_threads > 1 && !ic.local {
851            if ic.iteration > 0 {
852                granularity = f64::min(
853                    std::cmp::max(1, num_nodes as usize / num_threads) as _,
854                    granularity as f64
855                        * (num_nodes as f64 / std::cmp::max(1, modified_estimators) as f64),
856                ) as usize;
857            }
858            pl.info(format_args!(
859                "Adaptive granularity for this iteration: {}",
860                granularity
861            ));
862        }
863
864        ic.reset(granularity);
865
866        pl.item_name("arc");
867        pl.expected_updates(if ic.local { None } else { Some(num_arcs as _) });
868        pl.start("Starting parallel execution");
869        {
870            let next_state_sync = self.next_state.as_sync_array();
871            let sum_of_dists = self.sum_of_dists.as_mut().map(|x| x.as_sync_slice());
872            let sum_of_inv_dists = self.sum_of_inv_dists.as_mut().map(|x| x.as_sync_slice());
873
874            let discounted_centralities = &self
875                .discounted_centralities
876                .iter_mut()
877                .map(|s| s.as_sync_slice())
878                .collect::<Vec<_>>();
879            thread_pool.broadcast(|c| {
880                Self::parallel_task(
881                    self.graph,
882                    self.transposed,
883                    &self.curr_state,
884                    &next_state_sync,
885                    ic,
886                    sum_of_dists,
887                    sum_of_inv_dists,
888                    discounted_centralities,
889                    c,
890                )
891            });
892        }
893
894        pl.done_with_count(ic.visited_arcs.load(Ordering::Relaxed) as usize);
895        let modified_estimators = ic.modified_estimators.load(Ordering::Relaxed);
896
897        pl.info(format_args!(
898            "Modified estimators: {}/{} ({:.3}%)",
899            modified_estimators,
900            self.graph.num_nodes(),
901            (modified_estimators as f64 / self.graph.num_nodes() as f64) * 100.0
902        ));
903
904        std::mem::swap(&mut self.curr_state, &mut self.next_state);
905        std::mem::swap(&mut ic.curr_modified, &mut ic.next_modified);
906
907        if ic.systolic {
908            std::mem::swap(&mut ic.must_be_checked, &mut ic.next_must_be_checked);
909        }
910
911        let mut current_nf_mut = ic.current_nf.lock().unwrap();
912        self.last = *current_nf_mut;
913        // We enforce monotonicity--non-monotonicity can only be caused by
914        // approximation errors
915        let &last_output = self
916            .neighborhood_function
917            .as_slice()
918            .last()
919            .expect("Should always have at least 1 element");
920        if *current_nf_mut < last_output {
921            *current_nf_mut = last_output;
922        }
923        self.relative_increment = *current_nf_mut / last_output;
924
925        pl.info(format_args!(
926            "Pairs: {} ({}%)",
927            *current_nf_mut,
928            (*current_nf_mut * 100.0) / (num_nodes * num_nodes) as f64
929        ));
930        pl.info(format_args!(
931            "Absolute increment: {}",
932            *current_nf_mut - last_output
933        ));
934        pl.info(format_args!(
935            "Relative increment: {}",
936            self.relative_increment
937        ));
938
939        self.neighborhood_function.push(*current_nf_mut);
940
941        ic.iteration += 1;
942
943        Ok(())
944    }
945
946    /// The parallel operations to be performed each iteration.
947    ///
948    /// # Arguments:
949    /// * `graph`: the graph to analyze.
950    /// * `transpose`: optionally, the transpose of `graph`. If [`None`], no
951    ///   systolic iterations will be performed.
952    /// * `curr_state`: the current state of the estimators.
953    /// * `next_state`: the next state of the estimators (to be computed).
954    /// * `ic`: the iteration context.
955    #[allow(clippy::too_many_arguments)]
956    fn parallel_task(
957        graph: &(impl RandomAccessGraph + Sync),
958        transpose: Option<&(impl RandomAccessGraph + Sync)>,
959        curr_state: &impl EstimatorArray<L>,
960        next_state: &impl SyncEstimatorArray<L>,
961        ic: &IterationContext<'_, G1, D>,
962        sum_of_dists: Option<&[SyncCell<f32>]>,
963        sum_of_inv_dists: Option<&[SyncCell<f32>]>,
964        discounted_centralities: &[&[SyncCell<f32>]],
965        _broadcast_context: rayon::BroadcastContext,
966    ) {
967        let node_granularity = ic.arc_granularity;
968        let arc_granularity = ((graph.num_arcs() as f64 * node_granularity as f64)
969            / graph.num_nodes() as f64)
970            .ceil() as usize;
971        let do_centrality = sum_of_dists.is_some()
972            || sum_of_inv_dists.is_some()
973            || !ic.discount_functions.is_empty();
974        let node_upper_limit = if ic.local {
975            ic.local_checklist.len()
976        } else {
977            graph.num_nodes()
978        };
979        let mut visited_arcs = 0;
980        let mut modified_estimators = 0;
981        let arc_upper_limit = graph.num_arcs();
982
983        // During standard iterations, cumulates the neighborhood function for the nodes scanned
984        // by this thread. During systolic iterations, cumulates the *increase* of the
985        // neighborhood function for the nodes scanned by this thread.
986        let mut neighborhood_function_delta = KahanSum::new_with_value(0.0);
987        let mut helper = curr_state.logic().new_helper();
988        let logic = curr_state.logic();
989        let mut next_estimator = logic.new_estimator();
990
991        loop {
992            // Get work
993            let (start, end) = if ic.local {
994                let start = std::cmp::min(
995                    ic.node_cursor.fetch_add(1, Ordering::Relaxed),
996                    node_upper_limit,
997                );
998                let end = std::cmp::min(start + 1, node_upper_limit);
999                (start, end)
1000            } else {
1001                let mut arc_balanced_cursor = ic.arc_cursor.lock().unwrap();
1002                let (mut next_node, mut next_arc) = *arc_balanced_cursor;
1003                if next_node >= node_upper_limit {
1004                    (node_upper_limit, node_upper_limit)
1005                } else {
1006                    let start = next_node;
1007                    let target = next_arc + arc_granularity;
1008                    if target as u64 >= arc_upper_limit {
1009                        next_node = node_upper_limit;
1010                    } else {
1011                        (next_node, next_arc) = ic.cumul_outdeg.succ(target).unwrap();
1012                    }
1013                    let end = next_node;
1014                    *arc_balanced_cursor = (next_node, next_arc);
1015                    (start, end)
1016                }
1017            };
1018
1019            if start == node_upper_limit {
1020                break;
1021            }
1022
1023            // Do work
1024            for i in start..end {
1025                let node = if ic.local { ic.local_checklist[i] } else { i };
1026
1027                let prev_estimator = curr_state.get_backend(node);
1028
1029                // The three cases in which we enumerate successors:
1030                // 1) A non-systolic computation (we don't know anything, so we enumerate).
1031                // 2) A systolic, local computation (the node is by definition to be checked, as it comes from the local check list).
1032                // 3) A systolic, non-local computation in which the node should be checked.
1033                if !ic.systolic || ic.local || ic.must_be_checked[node] {
1034                    next_estimator.set(prev_estimator);
1035                    let mut modified = false;
1036                    for succ in graph.successors(node) {
1037                        if succ != node && ic.curr_modified[succ] {
1038                            visited_arcs += 1;
1039                            if !modified {
1040                                modified = true;
1041                            }
1042                            logic.merge_with_helper(
1043                                next_estimator.as_mut(),
1044                                curr_state.get_backend(succ),
1045                                &mut helper,
1046                            );
1047                        }
1048                    }
1049
1050                    let mut post = f64::NAN;
1051                    let estimator_modified = modified && next_estimator.as_ref() != prev_estimator;
1052
1053                    // We need the estimator value only if the iteration is standard (as we're going to
1054                    // compute the neighborhood function cumulating actual values, and not deltas) or
1055                    // if the estimator was actually modified (as we're going to cumulate the neighborhood
1056                    // function delta, or at least some centrality).
1057                    if !ic.systolic || estimator_modified {
1058                        post = logic.estimate(next_estimator.as_ref())
1059                    }
1060                    if !ic.systolic {
1061                        neighborhood_function_delta += post;
1062                    }
1063
1064                    if estimator_modified && (ic.systolic || do_centrality) {
1065                        let pre = logic.estimate(prev_estimator);
1066                        if ic.systolic {
1067                            neighborhood_function_delta += -pre;
1068                            neighborhood_function_delta += post;
1069                        }
1070
1071                        if do_centrality {
1072                            let delta = post - pre;
1073                            // Note that this code is executed only for distances > 0
1074                            if delta > 0.0 {
1075                                if let Some(distances) = sum_of_dists {
1076                                    let new_value = delta * (ic.iteration + 1) as f64;
1077                                    unsafe {
1078                                        distances[node]
1079                                            .set((distances[node].get() as f64 + new_value) as f32)
1080                                    };
1081                                }
1082                                if let Some(distances) = sum_of_inv_dists {
1083                                    let new_value = delta / (ic.iteration + 1) as f64;
1084                                    unsafe {
1085                                        distances[node]
1086                                            .set((distances[node].get() as f64 + new_value) as f32)
1087                                    };
1088                                }
1089                                for (func, distances) in ic
1090                                    .discount_functions
1091                                    .iter()
1092                                    .zip(discounted_centralities.iter())
1093                                {
1094                                    let new_value = delta * func(ic.iteration + 1);
1095                                    unsafe {
1096                                        distances[node]
1097                                            .set((distances[node].get() as f64 + new_value) as f32)
1098                                    };
1099                                }
1100                            }
1101                        }
1102                    }
1103
1104                    if estimator_modified {
1105                        // We keep track of modified estimators in the result. Note that we must
1106                        // add the current node to the must-be-checked set for the next
1107                        // local iteration if it is modified, as it might need a copy to
1108                        // the result array at the next iteration.
1109                        if ic.pre_local {
1110                            ic.local_next_must_be_checked.lock().unwrap().push(node);
1111                        }
1112                        ic.next_modified.set(node, true, Ordering::Relaxed);
1113
1114                        if ic.systolic {
1115                            debug_assert!(transpose.is_some());
1116                            // In systolic computations we must keep track of
1117                            // which estimators must be checked on the next
1118                            // iteration. If we are preparing a local
1119                            // computation, we do this explicitly, by adding the
1120                            // predecessors of the current node to a set.
1121                            // Otherwise, we do this implicitly, by setting the
1122                            // corresponding entry in an array.
1123
1124                            // SAFETY: ic.systolic is true, so transpose is Some
1125                            let transpose = unsafe { transpose.unwrap_unchecked() };
1126                            if ic.pre_local {
1127                                let mut local_next_must_be_checked =
1128                                    ic.local_next_must_be_checked.lock().unwrap();
1129                                for succ in transpose.successors(node) {
1130                                    local_next_must_be_checked.push(succ);
1131                                }
1132                            } else {
1133                                for succ in transpose.successors(node) {
1134                                    ic.next_must_be_checked.set(succ, true, Ordering::Relaxed);
1135                                }
1136                            }
1137                        }
1138
1139                        modified_estimators += 1;
1140                    }
1141
1142                    unsafe {
1143                        next_state.set(node, next_estimator.as_ref());
1144                    }
1145                } else {
1146                    // Even if we cannot possibly have changed our value, still our copy
1147                    // in the result vector might need to be updated because it does not
1148                    // reflect our current value.
1149                    if ic.curr_modified[node] {
1150                        unsafe {
1151                            next_state.set(node, prev_estimator);
1152                        }
1153                    }
1154                }
1155            }
1156        }
1157
1158        *ic.current_nf.lock().unwrap() += neighborhood_function_delta.sum();
1159        ic.visited_arcs.fetch_add(visited_arcs, Ordering::Relaxed);
1160        ic.modified_estimators
1161            .fetch_add(modified_estimators, Ordering::Relaxed);
1162    }
1163
1164    /// Initializes HyperBall.
1165    fn init(
1166        &mut self,
1167        thread_pool: &ThreadPool,
1168        mut rng: impl rand::Rng,
1169        pl: &mut impl ConcurrentProgressLog,
1170    ) -> Result<()> {
1171        pl.start("Initializing estimators");
1172        pl.info(format_args!("Clearing all registers"));
1173
1174        self.curr_state.clear();
1175        self.next_state.clear();
1176
1177        pl.info(format_args!("Initializing registers"));
1178        if let Some(w) = &self.weight {
1179            pl.info(format_args!("Loading weights"));
1180            for (i, &node_weight) in w.iter().enumerate() {
1181                let mut estimator = self.curr_state.get_estimator_mut(i);
1182                for _ in 0..node_weight {
1183                    estimator.add(&(rng.random::<u64>() as usize));
1184                }
1185            }
1186        } else {
1187            (0..self.graph.num_nodes()).for_each(|i| {
1188                self.curr_state.get_estimator_mut(i).add(i);
1189            });
1190        }
1191
1192        self.completed = false;
1193
1194        let ic = &mut self.iteration_context;
1195        ic.iteration = 0;
1196        ic.systolic = false;
1197        ic.local = false;
1198        ic.pre_local = false;
1199        ic.reset(self.granularity);
1200
1201        pl.debug(format_args!("Initializing distances"));
1202        if let Some(distances) = &mut self.sum_of_dists {
1203            distances.fill(0.0);
1204        }
1205        if let Some(distances) = &mut self.sum_of_inv_dists {
1206            distances.fill(0.0);
1207        }
1208        pl.debug(format_args!("Initializing centralities"));
1209        for centralities in self.discounted_centralities.iter_mut() {
1210            centralities.fill(0.0);
1211        }
1212
1213        self.last = self.graph.num_nodes() as f64;
1214        pl.debug(format_args!("Initializing neighborhood function"));
1215        self.neighborhood_function.clear();
1216        self.neighborhood_function.push(self.last);
1217
1218        pl.debug(format_args!("Initializing modified estimators"));
1219        thread_pool.install(|| ic.curr_modified.fill(true, Ordering::Relaxed));
1220
1221        pl.done();
1222
1223        Ok(())
1224    }
1225}
1226
1227#[cfg(test)]
1228mod test {
1229    use std::hash::{BuildHasherDefault, DefaultHasher};
1230
1231    use super::*;
1232    use card_est_array::traits::{EstimatorArray, MergeEstimator};
1233    use dsi_progress_logger::no_logging;
1234    use epserde::deser::{Deserialize, Flags};
1235    use rand::SeedableRng;
1236    use webgraph::{
1237        prelude::{BvGraph, DCF},
1238        traits::SequentialLabeling,
1239    };
1240
1241    type HyperBallArray<G> = SliceEstimatorArray<
1242        HyperLogLog<<G as SequentialLabeling>::Label, BuildHasherDefault<DefaultHasher>, usize>,
1243        usize,
1244        Box<[usize]>,
1245    >;
1246
1247    struct SeqHyperBall<'a, G: RandomAccessGraph> {
1248        graph: &'a G,
1249        curr_state: HyperBallArray<G>,
1250        next_state: HyperBallArray<G>,
1251    }
1252
1253    impl<G: RandomAccessGraph> SeqHyperBall<'_, G> {
1254        fn init(&mut self) {
1255            for i in 0..self.graph.num_nodes() {
1256                self.curr_state.get_estimator_mut(i).add(i);
1257            }
1258        }
1259
1260        fn iterate(&mut self) {
1261            for i in 0..self.graph.num_nodes() {
1262                let mut estimator = self.next_state.get_estimator_mut(i);
1263                estimator.set(self.curr_state.get_backend(i));
1264                for succ in self.graph.successors(i) {
1265                    estimator.merge(self.curr_state.get_backend(succ));
1266                }
1267            }
1268            std::mem::swap(&mut self.curr_state, &mut self.next_state);
1269        }
1270    }
1271
1272    #[cfg_attr(feature = "slow_tests", test)]
1273    #[cfg_attr(not(feature = "slow_tests"), allow(dead_code))]
1274    fn test_cnr_2000() -> Result<()> {
1275        let basename = "../data/cnr-2000";
1276
1277        let graph = BvGraph::with_basename(basename).load()?;
1278        let transpose = BvGraph::with_basename(basename.to_owned() + "-t").load()?;
1279        let cumulative = DCF::load_mmap(basename.to_owned() + ".dcf", Flags::empty())?;
1280
1281        let num_nodes = graph.num_nodes();
1282
1283        let hyper_log_log = HyperLogLogBuilder::new(num_nodes)
1284            .log_2_num_reg(6)
1285            .build()?;
1286
1287        let seq_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1288        let seq_result_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1289        let par_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1290        let par_result_bits = SliceEstimatorArray::new(hyper_log_log.clone(), num_nodes);
1291
1292        let mut hyperball = HyperBallBuilder::with_transpose(
1293            &graph,
1294            &transpose,
1295            cumulative.as_ref(),
1296            par_bits,
1297            par_result_bits,
1298        )
1299        .build(no_logging![]);
1300        let mut seq_hyperball = SeqHyperBall {
1301            curr_state: seq_bits,
1302            next_state: seq_result_bits,
1303            graph: &graph,
1304        };
1305
1306        let mut modified_estimators = num_nodes as u64;
1307        let threads = thread_pool![];
1308        let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
1309        hyperball.init(&threads, &mut rng, no_logging![])?;
1310        seq_hyperball.init();
1311
1312        while modified_estimators != 0 {
1313            hyperball.iterate(&threads, no_logging![])?;
1314            seq_hyperball.iterate();
1315
1316            modified_estimators = hyperball
1317                .iteration_context
1318                .modified_estimators
1319                .load(Ordering::Relaxed);
1320
1321            assert_eq!(
1322                hyperball.next_state.as_ref(),
1323                seq_hyperball.next_state.as_ref()
1324            );
1325            assert_eq!(
1326                hyperball.curr_state.as_ref(),
1327                seq_hyperball.curr_state.as_ref()
1328            );
1329        }
1330
1331        Ok(())
1332    }
1333}