webgraph 0.6.1

A Rust port of the WebGraph framework (http://webgraph.di.unimi.it/).
Documentation
/*
 * SPDX-FileCopyrightText: 2023 Inria
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

use std::path::PathBuf;

use super::*;
use crate::utils::CircularBuffer;
use anyhow::Result;
use bitflags::Flags;
use dsi_bitstream::codes::ToInt;
use dsi_bitstream::traits::BE;
use dsi_bitstream::traits::BitSeek;
use lender::*;

/// A sequential graph in the BV format.
///
/// The graph depends on a [`SequentialDecoderFactory`] that is used to
/// create decoders for the graph. This makes it possible to decouple the graph
/// from the underlying storage format, and to use different storage formats
/// for the same graph. For example, one can use a memory-mapped file for the
/// graph, or load it in memory, or even generate it on the fly.
///
/// Note that the knowledge of the codes used by the graph is in the factory,
/// which provides methods to read each component of the BV format (outdegree,
/// reference offset, block count, etc.), whereas the knowledge of the
/// compression parameters (compression window and minimum interval length) is
/// in this structure.
#[derive(Debug, Clone)]
pub struct BvGraphSeq<F> {
    /// Factory asked for a fresh decoder every time a sequential scan starts.
    factory: F,
    /// Number of nodes of the graph.
    number_of_nodes: usize,
    /// Number of arcs of the graph, if known.
    number_of_arcs: Option<u64>,
    /// Compression window handed to the iterators created from this graph;
    /// when it is zero, no reference offsets are read from the stream.
    compression_window: usize,
    /// Minimum interval length handed to the iterators created from this
    /// graph; when it is zero, no intervals are read from the stream.
    min_interval_length: usize,
}

impl BvGraphSeq<()> {
    /// Returns a [`LoadConfig`] for the graph with the given basename.
    ///
    /// The configuration is typed as
    /// `LoadConfig<BE, Sequential, Dynamic, Mmap, Mmap>`, stores an owned copy
    /// of `basename`, and starts with both the graph and the offsets load
    /// flags empty.
    pub fn with_basename(
        basename: impl AsRef<std::path::Path>,
    ) -> LoadConfig<BE, Sequential, Dynamic, Mmap, Mmap> {
        // Take an owned copy of the path: the configuration outlives the argument.
        let basename: PathBuf = basename.as_ref().to_path_buf();
        LoadConfig {
            basename,
            graph_load_flags: Flags::empty(),
            offsets_load_flags: Flags::empty(),
            _marker: std::marker::PhantomData,
        }
    }
}

/// Splitting support for sequential BV graphs whose decoders can be cloned
/// and shared across threads.
impl<F: SequentialDecoderFactory> SplitLabeling for BvGraphSeq<F>
where
    for<'a> <F as SequentialDecoderFactory>::Decoder<'a>: Clone + Send + Sync,
{
    type SplitLender<'a>
        = split::seq::Lender<'a, BvGraphSeq<F>>
    where
        Self: 'a;
    type IntoIterator<'a>
        = split::seq::IntoIterator<'a, BvGraphSeq<F>>
    where
        Self: 'a;

    /// Builds a `split::seq::Iter` from a lender over the whole graph, the
    /// number of nodes, and the requested number of parts `how_many`.
    fn split_iter(&self, how_many: usize) -> Self::IntoIterator<'_> {
        let whole_graph = self.iter();
        let node_count = self.num_nodes();
        split::seq::Iter::new(whole_graph, node_count, how_many)
    }
}

impl<F: SequentialDecoderFactory> SequentialLabeling for BvGraphSeq<F> {
    type Label = usize;
    type Lender<'a>
        = NodeLabels<F::Decoder<'a>>
    where
        Self: 'a;

    /// Returns the number of nodes in the graph.
    #[inline(always)]
    fn num_nodes(&self) -> usize {
        self.number_of_nodes
    }

    /// Returns the number of arcs of the graph, if it was provided at
    /// construction time.
    #[inline(always)]
    fn num_arcs_hint(&self) -> Option<u64> {
        self.number_of_arcs
    }

    /// Returns a lender over the nodes of the graph, skipping the first
    /// `from` nodes.
    ///
    /// A fresh decoder is created and the skipped nodes are decoded and
    /// discarded one by one.
    ///
    /// # Panics
    ///
    /// Panics if the factory cannot create a decoder.
    #[inline(always)]
    fn iter_from(&self, from: usize) -> Self::Lender<'_> {
        let decoder = self.factory.new_decoder().unwrap();
        let mut lender = NodeLabels::new(
            decoder,
            self.number_of_nodes,
            self.compression_window,
            self.min_interval_length,
        );

        // The outcome of the skip is deliberately discarded: if fewer than
        // `from` nodes are available the lender is simply left exhausted.
        let _ = lender.advance_by(from);

        lender
    }

    /// Builds the cumulative function of the outdegrees as an Elias–Fano
    /// sequence of `num_nodes() + 1` values, starting from zero.
    ///
    /// # Panics
    ///
    /// Panics if the number of arcs is not known, if the factory cannot
    /// create a decoder, or if reading a degree fails.
    fn build_dcf(&self) -> DCF {
        let node_count = self.num_nodes();
        let arc_count = self
            .num_arcs_hint()
            .expect("build_dcf requires num_arcs_hint()") as usize;

        // One value per node plus the leading zero, bounded by the arc count.
        let mut builder = sux::dict::EliasFanoBuilder::new(node_count + 1, arc_count);
        builder.push(0);

        let decoder = self.factory.new_decoder().unwrap();
        let mut degrees = OffsetDegIter::new(
            decoder,
            node_count,
            self.compression_window,
            self.min_interval_length,
        );

        let mut running_total = 0usize;
        for _ in 0..node_count {
            running_total += degrees.next_degree().unwrap();
            builder.push(running_total);
        }

        // SAFETY: NOTE(review) — `map_high_bits` is unsafe; presumably it
        // requires the closure to return a structure over the very same high
        // bits, which holds here because the bits are only wrapped in
        // selection adapters. Confirm against the sux documentation.
        unsafe {
            builder.build().map_high_bits(|high_bits| {
                sux::rank_sel::SelectZeroAdaptConst::<_, _, 12, 4>::new(
                    sux::rank_sel::SelectAdaptConst::<_, _, 12, 4>::new(high_bits),
                )
            })
        }
    }
}

// Marker implementation: no items are overridden, so every `SequentialGraph`
// method keeps its default definition.
impl<F: SequentialDecoderFactory> SequentialGraph for BvGraphSeq<F> {}

/// Convenience implementation that makes it possible to iterate
/// over the graph using the [`for_`] macro
/// (see the [crate documentation](crate)).
impl<'a, F: SequentialDecoderFactory> IntoLender for &'a BvGraphSeq<F> {
    type Lender = <BvGraphSeq<F> as SequentialLabeling>::Lender<'a>;

    /// Returns the lender produced by [`SequentialLabeling::iter`].
    #[inline(always)]
    fn into_lender(self) -> Self::Lender {
        <BvGraphSeq<F> as SequentialLabeling>::iter(self)
    }
}

impl<F: SequentialDecoderFactory> BvGraphSeq<F> {
    /// Creates a new sequential graph from a decoder factory, the number of
    /// nodes, the (optional) number of arcs, and the compression parameters.
    pub fn new(
        codes_reader_builder: F,
        number_of_nodes: usize,
        number_of_arcs: Option<u64>,
        compression_window: usize,
        min_interval_length: usize,
    ) -> Self {
        let factory = codes_reader_builder;
        Self {
            factory,
            number_of_nodes,
            number_of_arcs,
            compression_window,
            min_interval_length,
        }
    }

    /// Consumes the graph and returns a new one whose factory is the result
    /// of applying `map_func` to the current factory.
    ///
    /// All the other properties of the graph are carried over unchanged.
    #[inline(always)]
    pub fn map_factory<F1, F0>(self, map_func: F0) -> BvGraphSeq<F1>
    where
        F0: FnOnce(F) -> F1,
        F1: SequentialDecoderFactory,
    {
        let Self {
            factory,
            number_of_nodes,
            number_of_arcs,
            compression_window,
            min_interval_length,
        } = self;

        BvGraphSeq {
            factory: map_func(factory),
            number_of_nodes,
            number_of_arcs,
            compression_window,
            min_interval_length,
        }
    }

    /// Consumes self and returns the factory.
    #[inline(always)]
    pub fn into_inner(self) -> F {
        let Self { factory, .. } = self;
        factory
    }
}

impl<F: SequentialDecoderFactory> BvGraphSeq<F> {
    /// Creates an iterator specialized in the degrees of the nodes.
    ///
    /// This is slightly faster than iterating on successors because it can
    /// avoid decoding some of the nodes and completely skip the merging step.
    ///
    /// # Panics
    ///
    /// Panics if the factory cannot create a decoder.
    #[inline(always)]
    pub fn offset_deg_iter(&self) -> OffsetDegIter<F::Decoder<'_>> {
        let decoder = self.factory.new_decoder().unwrap();
        let (nodes, window, min_interval) = (
            self.number_of_nodes,
            self.compression_window,
            self.min_interval_length,
        );
        OffsetDegIter::new(decoder, nodes, window, min_interval)
    }
}

/// A fast sequential iterator over the nodes of the graph and their successors.
///
/// This iterator does not need to know the offsets of each node in the graph:
/// it decodes the nodes one after the other from a single decoder.
#[derive(Debug, Clone)]
pub struct NodeLabels<D: Decode> {
    /// Total number of nodes; the lender is exhausted once `current_node`
    /// reaches this value.
    pub(crate) number_of_nodes: usize,
    /// Compression window; when zero, no reference offset is read.
    pub(crate) compression_window: usize,
    /// Minimum interval length, added to every decoded interval length; when
    /// zero, no intervals are read.
    pub(crate) min_interval_length: usize,
    /// Decoder from which the components of each successor list are read.
    pub(crate) decoder: D,
    /// Successor lists of the most recently decoded nodes, indexed by node
    /// identifier; it is created with `compression_window + 1` slots.
    pub(crate) backrefs: CircularBuffer<Vec<usize>>,
    /// Identifier of the next node to be decoded.
    pub(crate) current_node: usize,
}

impl<D: Decode + BitSeek> NodeLabels<D> {
    /// Forwards the call to [`BitSeek::bit_pos`] on the inner decoder.
    ///
    /// Returns whatever the decoder reports as its current position in the
    /// bitstream, or the decoder's error.
    #[inline(always)]
    pub fn bit_pos(&mut self) -> Result<u64, <D as BitSeek>::Error> {
        <D as BitSeek>::bit_pos(&mut self.decoder)
    }
}

impl<D: Decode> NodeLabels<D> {
    /// Creates a new iterator, positioned on node 0, that reads from the
    /// given decoder.
    ///
    /// The buffer of back-references is created with
    /// `compression_window + 1` slots.
    pub fn new(
        decoder: D,
        number_of_nodes: usize,
        compression_window: usize,
        min_interval_length: usize,
    ) -> Self {
        Self {
            number_of_nodes,
            compression_window,
            min_interval_length,
            decoder,
            backrefs: CircularBuffer::new(compression_window + 1),
            current_node: 0,
        }
    }

    /// Decodes the successors of the next node in the stream, advances the
    /// iterator by one node, and returns the successors as a sorted slice.
    ///
    /// A node without successors yields an empty slice.
    ///
    /// Note that this method does not compare the current node with the
    /// number of nodes: it always reads from the decoder.
    pub fn next_successors(&mut self) -> Result<&[usize]> {
        // Recycle the vector stored in the slot of the current node, so that
        // its allocation can be reused.
        let mut res = self.backrefs.take(self.current_node);
        self.get_successors_iter_priv(self.current_node, &mut res)?;
        let res = self.backrefs.replace(self.current_node, res);
        self.current_node += 1;
        Ok(res)
    }

    /// Inner method called by `next_successors` and the iterator `next` method.
    ///
    /// Decodes the successors of `node_id` into `results`, replacing any
    /// previous content, and leaves them sorted. The list is assembled from
    /// up to three sources, in this order: blocks copied from a reference
    /// node, intervals, and residuals.
    fn get_successors_iter_priv(&mut self, node_id: usize, results: &mut Vec<usize>) -> Result<()> {
        // `results` is a recycled buffer that may still hold the successors
        // of an older node: it must be emptied before anything else, and in
        // particular before the early return for nodes without successors.
        results.clear();

        let degree = self.decoder.read_outdegree() as usize;
        // no edges, we are done!
        if degree == 0 {
            return Ok(());
        }

        // ensure that we have enough capacity in the vector for not reallocating
        results.reserve(degree);
        // read the reference offset (never stored when there is no window)
        let ref_delta = if self.compression_window != 0 {
            self.decoder.read_reference_offset() as usize
        } else {
            0
        };
        // if we copy nodes from a previous one
        if ref_delta != 0 {
            // compute the node id of the reference
            let reference_node_id = node_id - ref_delta;
            // retrieve the successors of the reference node
            let successors = &self.backrefs[reference_node_id];
            // get the info on which destinations to copy
            let number_of_blocks = self.decoder.read_block_count() as usize;
            // no blocks, we copy everything
            if number_of_blocks == 0 {
                results.extend_from_slice(successors);
            } else {
                // otherwise we copy only the blocks of even index;
                // the first block could be empty
                let mut idx = self.decoder.read_block() as usize;
                results.extend_from_slice(&successors[..idx]);

                // while the others cannot: their length is stored minus one
                for block_id in 1..number_of_blocks {
                    let block = self.decoder.read_block() as usize;
                    let end = idx + block + 1;
                    if block_id % 2 == 0 {
                        results.extend_from_slice(&successors[idx..end]);
                    }
                    idx = end;
                }
                // with an even number of explicit blocks, the implicit last
                // block has even index and is thus copied
                if number_of_blocks & 1 == 0 {
                    results.extend_from_slice(&successors[idx..]);
                }
            }
        }

        // if we still have to read nodes
        let nodes_left_to_decode = degree - results.len();
        if nodes_left_to_decode != 0 && self.min_interval_length != 0 {
            // read the number of intervals
            let number_of_intervals = self.decoder.read_interval_count() as usize;
            if number_of_intervals != 0 {
                // the start of the first interval is a signed offset from the node id
                let node_id_offset = self.decoder.read_interval_start().to_int();
                let mut start = (node_id as i64 + node_id_offset) as usize;
                let mut delta = self.decoder.read_interval_len() as usize;
                delta += self.min_interval_length;
                // save the first interval
                results.extend(start..(start + delta));
                start += delta;
                // the following intervals are stored as gaps from the end of
                // the previous one
                for _ in 1..number_of_intervals {
                    start += 1 + self.decoder.read_interval_start() as usize;
                    delta = self.decoder.read_interval_len() as usize;
                    delta += self.min_interval_length;

                    results.extend(start..(start + delta));

                    start += delta;
                }
            }
        }

        // decode the extra nodes if needed
        let nodes_left_to_decode = degree - results.len();
        // the decoder is always told the number of residuals, even when it is zero
        self.decoder.num_of_residuals(nodes_left_to_decode);
        if nodes_left_to_decode != 0 {
            // the first residual is a signed offset from the node id
            let node_id_offset = self.decoder.read_first_residual().to_int();
            let mut extra = (node_id as i64 + node_id_offset) as usize;
            results.push(extra);
            // the following residuals are stored as gaps from the previous one
            for _ in 1..nodes_left_to_decode {
                extra += 1 + self.decoder.read_residual() as usize;
                results.push(extra);
            }
        }

        // copied blocks, intervals and residuals are each increasing, but
        // their concatenation is not
        results.sort();
        Ok(())
    }
}

/// Labels are node identifiers (`usize`), and the successors of each node are
/// handed out as an iterator copying them from a slice, wrapped in
/// `AssumeSortedIterator`.
impl<'succ, D: Decode> NodeLabelsLender<'succ> for NodeLabels<D> {
    type Label = usize;
    type IntoIterator = crate::traits::labels::AssumeSortedIterator<
        std::iter::Copied<std::slice::Iter<'succ, Self::Label>>,
    >;
}

/// Each lend is a pair made of a node identifier and of the iterator over
/// the successors of that node.
impl<'succ, D: Decode> Lending<'succ> for NodeLabels<D> {
    type Lend = (usize, <Self as NodeLabelsLender<'succ>>::IntoIterator);
}

impl<D: Decode> Lender for NodeLabels<D> {
    check_covariance!();

    /// Decodes the next node and returns its identifier together with an
    /// iterator over its successors, or `None` once all the nodes have been
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if decoding the successors fails.
    fn next(&mut self) -> Option<Lend<'_, Self>> {
        let node_id = self.current_node;
        if node_id >= self.number_of_nodes {
            return None;
        }

        // Recycle the vector stored in the slot of this node, discarding the
        // successors it may still contain.
        let mut buffer = self.backrefs.take(node_id);
        buffer.clear();
        self.get_successors_iter_priv(node_id, &mut buffer)
            .unwrap();

        self.current_node = node_id + 1;
        let successors = self.backrefs.replace(node_id, buffer);

        // SAFETY: the buffer is either empty (it was cleared above) or was
        // sorted at the end of `get_successors_iter_priv`.
        let sorted = unsafe {
            crate::traits::labels::AssumeSortedIterator::new(successors.iter().copied())
        };
        Some((node_id, sorted))
    }

    /// Returns the exact number of nodes still to be returned as both the
    /// lower and the upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len();
        (remaining, Some(remaining))
    }
}

// SAFETY: NOTE(review) — presumably `SortedLender` requires nodes to be
// returned in increasing order; `next` returns `current_node` and then
// increments it, so identifiers are consecutive. Confirm against the trait
// documentation.
unsafe impl<D: Decode> SortedLender for NodeLabels<D> {}

impl<D: Decode> ExactSizeLender for NodeLabels<D> {
    /// Returns the number of nodes that `next` will still return.
    ///
    /// The subtraction saturates at zero: `next_successors` advances
    /// `current_node` without comparing it with the number of nodes, so the
    /// current node may lie past the end, in which case `next` returns `None`
    /// and the remaining length is zero rather than an underflow.
    #[inline(always)]
    fn len(&self) -> usize {
        self.number_of_nodes.saturating_sub(self.current_node)
    }
}