Skip to main content

webgraph_cli/transform/
transpose.rs

1/*
2 * SPDX-FileCopyrightText: 2023 Inria
3 * SPDX-FileCopyrightText: 2023 Tommaso Fontana
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8use crate::*;
9use anyhow::Result;
10use dsi_bitstream::dispatch::factory::CodesReaderFactoryHelper;
11use dsi_bitstream::prelude::*;
12use std::io::BufReader;
13use std::path::PathBuf;
14use tempfile::Builder;
15use webgraph::prelude::*;
16
17#[derive(Parser, Debug)]
18#[command(name = "transpose", about = "Transposes a BvGraph.", long_about = None)]
19pub struct CliArgs {
20    /// The basename of the graph.
21    pub src: PathBuf,
22    /// The basename of the transposed graph.
23    pub dst: PathBuf,
24
25    #[arg(short, long)]
26    /// Use the parallel compressor.
27    pub parallel: bool,
28
29    #[clap(flatten)]
30    pub num_threads: NumThreadsArg,
31
32    #[clap(flatten)]
33    pub memory_usage: MemoryUsageArg,
34
35    #[clap(flatten)]
36    pub ca: CompressArgs,
37}
38
39pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
40    create_parent_dir(&args.dst)?;
41
42    match get_endianness(&args.src)?.as_str() {
43        #[cfg(feature = "be_bins")]
44        BE::NAME => {
45            if args.parallel {
46                par_transpose::<BE>(global_args, args)
47            } else {
48                transpose::<BE>(global_args, args)
49            }
50        }
51        #[cfg(feature = "le_bins")]
52        LE::NAME => {
53            if args.parallel {
54                par_transpose::<LE>(global_args, args)
55            } else {
56                transpose::<LE>(global_args, args)
57            }
58        }
59        e => panic!("Unknown endianness: {}", e),
60    }
61}
62
63pub fn transpose<E: Endianness>(_global_args: GlobalArgs, args: CliArgs) -> Result<()>
64where
65    MmapHelper<u32>: CodesReaderFactoryHelper<E>,
66{
67    let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
68
69    // TODO!: speed it up by using random access graph if possible
70    let seq_graph = webgraph::graphs::bvgraph::sequential::BvGraphSeq::with_basename(&args.src)
71        .endianness::<E>()
72        .load()?;
73
74    // transpose the graph
75    let sorted = webgraph::transform::transpose(&seq_graph, args.memory_usage.memory_usage)?;
76
77    let target_endianness = args.ca.endianness.clone();
78    let dir = Builder::new().prefix("transform_transpose_").tempdir()?;
79    let chunk_size = args.ca.chunk_size;
80    let bvgraphz = args.ca.bvgraphz;
81    let mut builder = BvCompConfig::new(&args.dst)
82        .with_comp_flags(args.ca.into())
83        .with_tmp_dir(&dir);
84
85    if bvgraphz {
86        builder = builder.with_chunk_size(chunk_size);
87    }
88
89    thread_pool.install(|| {
90        builder.par_comp_lenders_endianness(
91            &sorted,
92            sorted.num_nodes(),
93            &target_endianness.unwrap_or_else(|| BE::NAME.into()),
94        )
95    })?;
96
97    Ok(())
98}
99
100pub fn par_transpose<E: Endianness>(_global_args: GlobalArgs, args: CliArgs) -> Result<()>
101where
102    MmapHelper<u32>: CodesReaderFactoryHelper<E>,
103    for<'a> <MmapHelper<u32> as CodesReaderFactory<E>>::CodesReader<'a>:
104        BitSeek + Clone + Send + Sync,
105    BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
106    BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
107{
108    let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
109
110    let seq_graph = webgraph::graphs::bvgraph::BvGraph::with_basename(&args.src)
111        .endianness::<E>()
112        .load()?;
113
114    // transpose the graph
115    let split = webgraph::transform::transpose_split(&seq_graph, args.memory_usage.memory_usage)?;
116
117    // Convert to (node, lender) pairs
118    let pairs: Vec<_> = split.into();
119
120    let dir = Builder::new().prefix("transform_transpose_").tempdir()?;
121    let chunk_size = args.ca.chunk_size;
122    let bvgraphz = args.ca.bvgraphz;
123    let mut builder = BvCompConfig::new(&args.dst)
124        .with_comp_flags(args.ca.into())
125        .with_tmp_dir(&dir);
126
127    if bvgraphz {
128        builder = builder.with_chunk_size(chunk_size);
129    }
130
131    thread_pool
132        .install(|| builder.par_comp_lenders::<E, _>(pairs.into_iter(), seq_graph.num_nodes()))?;
133    Ok(())
134}