webgraph_cli/transform/
transpose.rs

1/*
2 * SPDX-FileCopyrightText: 2023 Inria
3 * SPDX-FileCopyrightText: 2023 Tommaso Fontana
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8use crate::*;
9use anyhow::Result;
10use dsi_bitstream::dispatch::factory::CodesReaderFactoryHelper;
11use dsi_bitstream::prelude::*;
12use std::io::BufReader;
13use std::path::PathBuf;
14use tempfile::Builder;
15use webgraph::prelude::*;
16
17#[derive(Parser, Debug)]
18#[command(name = "transpose", about = "Transposes a BvGraph.", long_about = None)]
19pub struct CliArgs {
20    /// The basename of the graph.
21    pub src: PathBuf,
22    /// The basename of the transposed graph.
23    pub dst: PathBuf,
24
25    #[arg(short, long)]
26    /// Use the parallel compressor.
27    pub parallel: bool,
28
29    #[clap(flatten)]
30    pub num_threads: NumThreadsArg,
31
32    #[clap(flatten)]
33    pub memory_usage: MemoryUsageArg,
34
35    #[clap(flatten)]
36    pub ca: CompressArgs,
37}
38
39pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
40    create_parent_dir(&args.dst)?;
41
42    match get_endianness(&args.src)?.as_str() {
43        #[cfg(feature = "be_bins")]
44        BE::NAME => {
45            if args.parallel {
46                par_transpose::<BE>(global_args, args)
47            } else {
48                transpose::<BE>(global_args, args)
49            }
50        }
51        #[cfg(feature = "le_bins")]
52        LE::NAME => {
53            if args.parallel {
54                par_transpose::<LE>(global_args, args)
55            } else {
56                transpose::<LE>(global_args, args)
57            }
58        }
59        e => panic!("Unknown endianness: {}", e),
60    }
61}
62
63pub fn transpose<E: Endianness>(_global_args: GlobalArgs, args: CliArgs) -> Result<()>
64where
65    MmapHelper<u32>: CodesReaderFactoryHelper<E>,
66{
67    let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
68
69    // TODO!: speed it up by using random access graph if possible
70    let seq_graph = webgraph::graphs::bvgraph::sequential::BvGraphSeq::with_basename(&args.src)
71        .endianness::<E>()
72        .load()?;
73
74    // transpose the graph
75    let sorted =
76        webgraph::transform::transpose(&seq_graph, args.memory_usage.memory_usage).unwrap();
77
78    let target_endianness = args.ca.endianness.clone();
79    let dir = Builder::new().prefix("transform_transpose_").tempdir()?;
80    BvComp::parallel_endianness(
81        &args.dst,
82        &sorted,
83        sorted.num_nodes(),
84        args.ca.into(),
85        &thread_pool,
86        dir,
87        &target_endianness.unwrap_or_else(|| E::NAME.into()),
88    )?;
89
90    Ok(())
91}
92
93pub fn par_transpose<E: Endianness>(_global_args: GlobalArgs, args: CliArgs) -> Result<()>
94where
95    MmapHelper<u32>: CodesReaderFactoryHelper<E>,
96    for<'a> <MmapHelper<u32> as CodesReaderFactory<E>>::CodesReader<'a>:
97        BitSeek + Clone + Send + Sync,
98    BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
99    BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
100{
101    let thread_pool = crate::get_thread_pool(args.num_threads.num_threads);
102
103    let seq_graph = webgraph::graphs::bvgraph::BvGraph::with_basename(&args.src)
104        .endianness::<E>()
105        .load()?;
106
107    // transpose the graph
108    let split = webgraph::transform::transpose_split(&seq_graph, args.memory_usage.memory_usage)?;
109
110    // Convert to (node, lender) pairs
111    let pairs: Vec<_> = split.into();
112
113    let dir = Builder::new().prefix("transform_transpose_").tempdir()?;
114    BvComp::parallel_iter::<E, _>(
115        &args.dst,
116        pairs.into_iter(),
117        seq_graph.num_nodes(),
118        args.ca.into(),
119        &thread_pool,
120        dir,
121    )?;
122    Ok(())
123}