swh_graph/compress/
iter_swhids.rs

1// Copyright (C) 2023-2024  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! Iterator on the set of all arcs in an ORC dataset
7
8use std::path::PathBuf;
9
10use anyhow::Result;
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use orc_rust::arrow_reader::ArrowReaderBuilder;
14use orc_rust::reader::ChunkReader;
15use rayon::prelude::*;
16
17use super::iter_arcs::iter_arcs_from_ovs;
18use super::orc::{get_dataset_readers, par_iter_arrow};
19use super::TextSwhid;
20use crate::NodeType;
21
/// Returns a parallel iterator over the textual SWHIDs of all nodes found in
/// the ORC dataset rooted at `dataset_dir`, restricted to `allowed_node_types`.
///
/// SWHIDs are collected both from the tables defining each object type
/// (`directory`, `content`, `origin`, `release`, `revision`, `snapshot`) and
/// from the tables referencing objects (`directory_entry`,
/// `origin_visit_status`, `revision_history`, `snapshot_branch`), so objects
/// that are only referenced are included too. The resulting iterator may
/// therefore yield duplicates; ordering is unspecified.
pub fn iter_swhids(
    dataset_dir: &PathBuf,
    allowed_node_types: &[NodeType],
) -> Result<impl ParallelIterator<Item = TextSwhid>> {
    // Only open readers for a subdirectory when its node type was requested;
    // otherwise contribute an empty reader list so the chain below stays
    // uniform and the disabled tables are simply skipped.
    let maybe_get_dataset_readers = |dataset_dir: &PathBuf, subdirectory, node_type| {
        if allowed_node_types.contains(&node_type) {
            get_dataset_readers(dataset_dir, subdirectory)
        } else {
            Ok(Vec::new())
        }
    };

    // Start from an empty parallel iterator and chain one sub-iterator per
    // ORC table; each table's readers are themselves iterated in parallel.
    Ok([]
        .into_par_iter()
        .chain(
            maybe_get_dataset_readers(dataset_dir, "directory", NodeType::Directory)?
                .into_par_iter()
                .flat_map(iter_swhids_from_dir),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
                .into_par_iter()
                .flat_map(iter_swhids_from_dir_entry),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "content", NodeType::Content)?
                .into_par_iter()
                .flat_map(iter_swhids_from_cnt),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "origin", NodeType::Origin)?
                .into_par_iter()
                .flat_map(iter_swhids_from_ori),
        )
        .chain(
            // origin_visit_status rows yield (src, dst) arc pairs; both
            // endpoints are emitted as SWHIDs.
            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_ovs)
                .flat_map_iter(|(src, dst)| [src, dst].into_iter()),
        )
        .chain(
            // The release table is read twice: once for the releases' own
            // SWHIDs, once for the SWHIDs of the objects they target.
            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
                .into_par_iter()
                .flat_map(iter_rel_swhids_from_rel),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
                .into_par_iter()
                .flat_map(iter_target_swhids_from_rel),
        )
        .chain(
            // Likewise, the revision table is read twice: revision ids, then
            // the root directories they point to.
            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
                .into_par_iter()
                .flat_map(iter_rev_swhids_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
                .into_par_iter()
                .flat_map(iter_dir_swhids_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
                .into_par_iter()
                .flat_map(iter_parent_swhids_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "snapshot", NodeType::Snapshot)?
                .into_par_iter()
                .flat_map(iter_swhids_from_snp),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
                .into_par_iter()
                .flat_map(iter_swhids_from_snp_branch),
        ))
}
98
99fn map_swhids<R: ChunkReader + Send, T, F>(
100    reader_builder: ArrowReaderBuilder<R>,
101    f: F,
102) -> impl ParallelIterator<Item = TextSwhid>
103where
104    F: Fn(T) -> Option<String> + Send + Sync,
105    T: ArRowDeserialize + ArRowStruct + Send,
106{
107    par_iter_arrow(reader_builder, move |record: T| {
108        f(record).map(|swhid| swhid.as_bytes().try_into().unwrap())
109    })
110}
111
112fn iter_swhids_from_dir_entry<R: ChunkReader + Send>(
113    reader_builder: ArrowReaderBuilder<R>,
114) -> impl ParallelIterator<Item = TextSwhid> {
115    #[derive(ArRowDeserialize, Default, Clone)]
116    struct DirectoryEntry {
117        r#type: String,
118        target: String,
119    }
120
121    map_swhids(reader_builder, |entry: DirectoryEntry| {
122        Some(match entry.r#type.as_bytes() {
123            b"file" => format!("swh:1:cnt:{}", entry.target),
124            b"dir" => format!("swh:1:dir:{}", entry.target),
125            b"rev" => format!("swh:1:rev:{}", entry.target),
126            _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
127        })
128    })
129}
130
131fn iter_swhids_from_dir<R: ChunkReader + Send>(
132    reader_builder: ArrowReaderBuilder<R>,
133) -> impl ParallelIterator<Item = TextSwhid> {
134    #[derive(ArRowDeserialize, Default, Clone)]
135    struct Directory {
136        id: String,
137    }
138
139    map_swhids(reader_builder, |dir: Directory| {
140        Some(format!("swh:1:dir:{}", dir.id))
141    })
142}
143
144fn iter_swhids_from_cnt<R: ChunkReader + Send>(
145    reader_builder: ArrowReaderBuilder<R>,
146) -> impl ParallelIterator<Item = TextSwhid> {
147    #[derive(ArRowDeserialize, Default, Clone)]
148    struct Content {
149        sha1_git: String,
150    }
151
152    map_swhids(reader_builder, |cnt: Content| {
153        Some(format!("swh:1:cnt:{}", cnt.sha1_git))
154    })
155}
156
157fn iter_swhids_from_ori<R: ChunkReader + Send>(
158    reader_builder: ArrowReaderBuilder<R>,
159) -> impl ParallelIterator<Item = TextSwhid> {
160    #[derive(ArRowDeserialize, Default, Clone)]
161    struct Origin {
162        id: String,
163    }
164
165    map_swhids(reader_builder, |ori: Origin| {
166        Some(format!("swh:1:ori:{}", ori.id))
167    })
168}
169
170fn iter_rel_swhids_from_rel<R: ChunkReader + Send>(
171    reader_builder: ArrowReaderBuilder<R>,
172) -> impl ParallelIterator<Item = TextSwhid> {
173    #[derive(ArRowDeserialize, Default, Clone)]
174    struct Release {
175        id: String,
176    }
177
178    map_swhids(reader_builder, |rel: Release| {
179        Some(format!("swh:1:rel:{}", rel.id))
180    })
181}
182
183fn iter_target_swhids_from_rel<R: ChunkReader + Send>(
184    reader_builder: ArrowReaderBuilder<R>,
185) -> impl ParallelIterator<Item = TextSwhid> {
186    #[derive(ArRowDeserialize, Default, Clone)]
187    struct Release {
188        target: String,
189        target_type: String,
190    }
191
192    map_swhids(reader_builder, |entry: Release| {
193        Some(match entry.target_type.as_bytes() {
194            b"content" => format!("swh:1:cnt:{}", entry.target),
195            b"directory" => format!("swh:1:dir:{}", entry.target),
196            b"revision" => format!("swh:1:rev:{}", entry.target),
197            b"release" => format!("swh:1:rel:{}", entry.target),
198            _ => panic!("Unexpected release target type: {:?}", entry.target_type),
199        })
200    })
201}
202
203fn iter_rev_swhids_from_rev<R: ChunkReader + Send>(
204    reader_builder: ArrowReaderBuilder<R>,
205) -> impl ParallelIterator<Item = TextSwhid> {
206    #[derive(ArRowDeserialize, Default, Clone)]
207    struct Revision {
208        id: String,
209    }
210
211    map_swhids(reader_builder, |dir: Revision| {
212        Some(format!("swh:1:rev:{}", dir.id))
213    })
214}
215
216fn iter_dir_swhids_from_rev<R: ChunkReader + Send>(
217    reader_builder: ArrowReaderBuilder<R>,
218) -> impl ParallelIterator<Item = TextSwhid> {
219    #[derive(ArRowDeserialize, Default, Clone)]
220    struct Revision {
221        directory: String,
222    }
223
224    map_swhids(reader_builder, |rev: Revision| {
225        Some(format!("swh:1:dir:{}", rev.directory))
226    })
227}
228
229fn iter_parent_swhids_from_rev<R: ChunkReader + Send>(
230    reader_builder: ArrowReaderBuilder<R>,
231) -> impl ParallelIterator<Item = TextSwhid> {
232    #[derive(ArRowDeserialize, Default, Clone)]
233    struct RevisionParent {
234        parent_id: String,
235    }
236
237    map_swhids(reader_builder, |rev: RevisionParent| {
238        Some(format!("swh:1:rev:{}", rev.parent_id))
239    })
240}
241
242fn iter_swhids_from_snp<R: ChunkReader + Send>(
243    reader_builder: ArrowReaderBuilder<R>,
244) -> impl ParallelIterator<Item = TextSwhid> {
245    #[derive(ArRowDeserialize, Default, Clone)]
246    struct Snapshot {
247        id: String,
248    }
249
250    map_swhids(reader_builder, |dir: Snapshot| {
251        Some(format!("swh:1:snp:{}", dir.id))
252    })
253}
254
255fn iter_swhids_from_snp_branch<R: ChunkReader + Send>(
256    reader_builder: ArrowReaderBuilder<R>,
257) -> impl ParallelIterator<Item = TextSwhid> {
258    #[derive(ArRowDeserialize, Default, Clone)]
259    struct SnapshotBranch {
260        target: String,
261        target_type: String,
262    }
263
264    map_swhids(reader_builder, |branch: SnapshotBranch| {
265        match branch.target_type.as_bytes() {
266            b"content" => Some(format!("swh:1:cnt:{}", branch.target)),
267            b"directory" => Some(format!("swh:1:dir:{}", branch.target)),
268            b"revision" => Some(format!("swh:1:rev:{}", branch.target)),
269            b"release" => Some(format!("swh:1:rel:{}", branch.target)),
270            b"alias" => None,
271            _ => panic!("Unexpected snapshot target type: {:?}", branch.target_type),
272        }
273    })
274}