// swh_graph/compress/stats.rs

// Copyright (C) 2023-2024  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information

//! Iterator on the set of all arcs in an ORC dataset

8use std::path::PathBuf;
9
10use anyhow::Result;
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use orc_rust::arrow_reader::ArrowReaderBuilder;
14use orc_rust::projection::ProjectionMask;
15use orc_rust::reader::ChunkReader;
16use rayon::prelude::*;
17
18use super::orc::{get_dataset_readers, iter_arrow};
19use crate::NodeType;
20
21fn count_arrow_rows<R: ChunkReader>(reader_builder: ArrowReaderBuilder<R>) -> u64 {
22    let empty_mask = ProjectionMask::roots(reader_builder.file_metadata().root_data_type(), []); // Don't need to read any column
23    let reader = reader_builder.with_projection(empty_mask).build();
24    reader.total_row_count()
25}
26
27pub fn estimate_node_count(dataset_dir: &PathBuf, allowed_node_types: &[NodeType]) -> Result<u64> {
28    let mut readers = Vec::new();
29    if allowed_node_types.contains(&NodeType::Directory) {
30        readers.extend(get_dataset_readers(dataset_dir, "directory")?);
31    }
32    if allowed_node_types.contains(&NodeType::Content) {
33        readers.extend(get_dataset_readers(dataset_dir, "content")?);
34    }
35    if allowed_node_types.contains(&NodeType::Origin) {
36        readers.extend(get_dataset_readers(dataset_dir, "origin")?);
37    }
38    if allowed_node_types.contains(&NodeType::Release) {
39        readers.extend(get_dataset_readers(dataset_dir, "release")?);
40    }
41    if allowed_node_types.contains(&NodeType::Revision) {
42        readers.extend(get_dataset_readers(dataset_dir, "revision")?);
43    }
44    if allowed_node_types.contains(&NodeType::Snapshot) {
45        readers.extend(get_dataset_readers(dataset_dir, "snapshot")?);
46    }
47    Ok(readers.into_par_iter().map(count_arrow_rows).sum())
48}
49
50pub fn estimate_edge_count(dataset_dir: &PathBuf, allowed_node_types: &[NodeType]) -> Result<u64> {
51    let mut readers = Vec::new();
52    if allowed_node_types.contains(&NodeType::Directory) {
53        readers.extend(get_dataset_readers(dataset_dir, "directory_entry")?)
54    }
55    if allowed_node_types.contains(&NodeType::Origin) {
56        readers.extend(get_dataset_readers(
57            // Count the source...
58            dataset_dir.clone(),
59            "origin_visit_status",
60        )?);
61        readers.extend(get_dataset_readers(
62            // ... and destination of each arc
63            dataset_dir.clone(),
64            "origin_visit_status",
65        )?);
66    }
67    if allowed_node_types.contains(&NodeType::Release) {
68        readers.extend(get_dataset_readers(dataset_dir, "release")?);
69    }
70    if allowed_node_types.contains(&NodeType::Revision) {
71        readers.extend(get_dataset_readers(dataset_dir, "revision")?);
72        readers.extend(get_dataset_readers(dataset_dir, "revision_history")?);
73    }
74    if allowed_node_types.contains(&NodeType::Snapshot) {
75        readers.extend(get_dataset_readers(dataset_dir, "snapshot_branch")?);
76    }
77    Ok(readers.into_par_iter().map(count_arrow_rows).sum())
78}
79
/// Number of arcs between each pair of node types, indexed as
/// `stats[src_type as usize][dst_type as usize]` (see `inc` below).
type EdgeStats = [[usize; NodeType::NUMBER_OF_TYPES]; NodeType::NUMBER_OF_TYPES];
81
82pub fn count_edge_types(
83    dataset_dir: &PathBuf,
84    allowed_node_types: &[NodeType],
85) -> Result<impl ParallelIterator<Item = EdgeStats>> {
86    let maybe_get_dataset_readers = |dataset_dir, subdirectory, node_type| {
87        if allowed_node_types.contains(&node_type) {
88            get_dataset_readers(dataset_dir, subdirectory)
89        } else {
90            Ok(Vec::new())
91        }
92    };
93
94    Ok([]
95        .into_par_iter()
96        .chain(
97            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
98                .into_par_iter()
99                .map(count_edge_types_from_dir),
100        )
101        .chain(
102            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
103                .into_par_iter()
104                .map(count_edge_types_from_ovs),
105        )
106        .chain(
107            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
108                .into_par_iter()
109                .map(count_edge_types_from_rel),
110        )
111        .chain(
112            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
113                .into_par_iter()
114                .map(count_dir_edge_types_from_rev),
115        )
116        .chain(
117            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
118                .into_par_iter()
119                .map(count_parent_edge_types_from_rev),
120        )
121        .chain(
122            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
123                .into_par_iter()
124                .map(count_edge_types_from_snp),
125        ))
126}
127
128fn for_each_edge<T, F, R: ChunkReader + Send>(reader_builder: ArrowReaderBuilder<R>, mut f: F)
129where
130    F: FnMut(T) + Send + Sync,
131    T: ArRowDeserialize + ArRowStruct + Send,
132{
133    iter_arrow(reader_builder, move |record: T| -> [(); 0] {
134        f(record);
135        []
136    })
137    .count();
138}
139
140fn inc(stats: &mut EdgeStats, src_type: NodeType, dst_type: NodeType) {
141    stats[src_type as usize][dst_type as usize] += 1;
142}
143
144fn count_edge_types_from_dir<R: ChunkReader + Send>(
145    reader_builder: ArrowReaderBuilder<R>,
146) -> EdgeStats {
147    let mut stats = EdgeStats::default();
148
149    #[derive(ArRowDeserialize, Default, Clone)]
150    struct DirectoryEntry {
151        r#type: String,
152    }
153
154    for_each_edge(reader_builder, |entry: DirectoryEntry| {
155        match entry.r#type.as_bytes() {
156            b"file" => {
157                inc(&mut stats, NodeType::Directory, NodeType::Content);
158            }
159            b"dir" => {
160                inc(&mut stats, NodeType::Directory, NodeType::Directory);
161            }
162            b"rev" => {
163                inc(&mut stats, NodeType::Directory, NodeType::Revision);
164            }
165            _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
166        }
167    });
168
169    stats
170}
171
172fn count_edge_types_from_ovs<R: ChunkReader + Send>(
173    reader_builder: ArrowReaderBuilder<R>,
174) -> EdgeStats {
175    let mut stats = EdgeStats::default();
176
177    #[derive(ArRowDeserialize, Default, Clone)]
178    struct OriginVisitStatus {
179        snapshot: Option<String>,
180    }
181
182    for_each_edge(reader_builder, |ovs: OriginVisitStatus| {
183        if ovs.snapshot.is_some() {
184            inc(&mut stats, NodeType::Origin, NodeType::Snapshot)
185        }
186    });
187
188    stats
189}
190
191fn count_dir_edge_types_from_rev<R: ChunkReader + Send>(
192    reader_builder: ArrowReaderBuilder<R>,
193) -> EdgeStats {
194    let mut stats = EdgeStats::default();
195
196    stats[NodeType::Revision as usize][NodeType::Directory as usize] +=
197        count_arrow_rows(reader_builder) as usize;
198
199    stats
200}
201
202fn count_parent_edge_types_from_rev<R: ChunkReader + Send>(
203    reader_builder: ArrowReaderBuilder<R>,
204) -> EdgeStats {
205    let mut stats = EdgeStats::default();
206
207    stats[NodeType::Revision as usize][NodeType::Revision as usize] +=
208        count_arrow_rows(reader_builder) as usize;
209
210    stats
211}
212
213fn count_edge_types_from_rel<R: ChunkReader + Send>(
214    reader_builder: ArrowReaderBuilder<R>,
215) -> EdgeStats {
216    let mut stats = EdgeStats::default();
217    #[derive(ArRowDeserialize, Default, Clone)]
218    struct Release {
219        target_type: String,
220    }
221
222    for_each_edge(reader_builder, |entry: Release| {
223        match entry.target_type.as_bytes() {
224            b"content" => {
225                inc(&mut stats, NodeType::Release, NodeType::Content);
226            }
227            b"directory" => {
228                inc(&mut stats, NodeType::Release, NodeType::Directory);
229            }
230            b"revision" => {
231                inc(&mut stats, NodeType::Release, NodeType::Revision);
232            }
233            b"release" => {
234                inc(&mut stats, NodeType::Release, NodeType::Release);
235            }
236            _ => panic!("Unexpected directory entry type: {:?}", entry.target_type),
237        }
238    });
239
240    stats
241}
242
243fn count_edge_types_from_snp<R: ChunkReader + Send>(
244    reader_builder: ArrowReaderBuilder<R>,
245) -> EdgeStats {
246    let mut stats = EdgeStats::default();
247
248    #[derive(ArRowDeserialize, Default, Clone)]
249    struct SnapshotBranch {
250        target_type: String,
251    }
252
253    for_each_edge(reader_builder, |branch: SnapshotBranch| {
254        match branch.target_type.as_bytes() {
255            b"content" => {
256                inc(&mut stats, NodeType::Snapshot, NodeType::Content);
257            }
258            b"directory" => {
259                inc(&mut stats, NodeType::Snapshot, NodeType::Directory);
260            }
261            b"revision" => {
262                inc(&mut stats, NodeType::Snapshot, NodeType::Revision);
263            }
264            b"release" => {
265                inc(&mut stats, NodeType::Snapshot, NodeType::Release);
266            }
267            b"alias" => {}
268            _ => panic!("Unexpected snapshot branch type: {:?}", branch.target_type),
269        }
270    });
271
272    stats
273}