1use std::path::PathBuf;
9
10use anyhow::Result;
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use orc_rust::arrow_reader::ArrowReaderBuilder;
14use orc_rust::reader::ChunkReader;
15use rayon::prelude::*;
16
17use super::iter_arcs::iter_arcs_from_ovs;
18use super::orc::{get_dataset_readers, par_iter_arrow};
19use super::TextSwhid;
20use crate::NodeType;
21
/// Returns a parallel iterator over the textual SWHIDs of every node (and
/// edge target) found in the ORC dataset rooted at `dataset_dir`.
///
/// Only tables whose node type appears in `allowed_node_types` are read;
/// filtered-out tables contribute an empty reader list and are skipped.
///
/// The same SWHID may be yielded more than once (e.g. a directory appears via
/// the `directory` table and again via any `directory_entry` row targeting
/// it); callers are expected to deduplicate downstream. Note the `release`
/// and `revision` tables are each opened twice, once per extracted column.
// NOTE(review): `&Path` would be more idiomatic than `&PathBuf`, but whether
// that change is safe depends on `get_dataset_readers`' signature — confirm
// before touching.
pub fn iter_swhids(
    dataset_dir: &PathBuf,
    allowed_node_types: &[NodeType],
) -> Result<impl ParallelIterator<Item = TextSwhid>> {
    // Wrapper around `get_dataset_readers` that yields no readers at all when
    // the table's node type is excluded by `allowed_node_types`.
    let maybe_get_dataset_readers = |dataset_dir: &PathBuf, subdirectory, node_type| {
        if allowed_node_types.contains(&node_type) {
            get_dataset_readers(dataset_dir, subdirectory)
        } else {
            Ok(Vec::new())
        }
    };

    // Seed with an empty parallel iterator, then chain one sub-iterator per
    // (table, extracted column) pair. Each table's ORC files are themselves
    // iterated in parallel via `into_par_iter` + `flat_map`.
    Ok([]
        .into_par_iter()
        .chain(
            // Directory ids from the `directory` table...
            maybe_get_dataset_readers(dataset_dir, "directory", NodeType::Directory)?
                .into_par_iter()
                .flat_map(iter_swhids_from_dir),
        )
        .chain(
            // ...and cnt/dir/rev targets of every directory entry.
            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
                .into_par_iter()
                .flat_map(iter_swhids_from_dir_entry),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "content", NodeType::Content)?
                .into_par_iter()
                .flat_map(iter_swhids_from_cnt),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "origin", NodeType::Origin)?
                .into_par_iter()
                .flat_map(iter_swhids_from_ori),
        )
        .chain(
            // origin_visit_status rows yield (src, dst) arcs; emit both ends.
            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_ovs)
                .flat_map_iter(|(src, dst)| [src, dst].into_iter()),
        )
        .chain(
            // Release ids, then release targets — two passes over `release`.
            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
                .into_par_iter()
                .flat_map(iter_rel_swhids_from_rel),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
                .into_par_iter()
                .flat_map(iter_target_swhids_from_rel),
        )
        .chain(
            // Revision ids, root directories, then parent revisions.
            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
                .into_par_iter()
                .flat_map(iter_rev_swhids_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
                .into_par_iter()
                .flat_map(iter_dir_swhids_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
                .into_par_iter()
                .flat_map(iter_parent_swhids_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "snapshot", NodeType::Snapshot)?
                .into_par_iter()
                .flat_map(iter_swhids_from_snp),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
                .into_par_iter()
                .flat_map(iter_swhids_from_snp_branch),
        ))
}
98
99fn map_swhids<R: ChunkReader + Send, T, F>(
100 reader_builder: ArrowReaderBuilder<R>,
101 f: F,
102) -> impl ParallelIterator<Item = TextSwhid>
103where
104 F: Fn(T) -> Option<String> + Send + Sync,
105 T: ArRowDeserialize + ArRowStruct + Send,
106{
107 par_iter_arrow(reader_builder, move |record: T| {
108 f(record).map(|swhid| swhid.as_bytes().try_into().unwrap())
109 })
110}
111
112fn iter_swhids_from_dir_entry<R: ChunkReader + Send>(
113 reader_builder: ArrowReaderBuilder<R>,
114) -> impl ParallelIterator<Item = TextSwhid> {
115 #[derive(ArRowDeserialize, Default, Clone)]
116 struct DirectoryEntry {
117 r#type: String,
118 target: String,
119 }
120
121 map_swhids(reader_builder, |entry: DirectoryEntry| {
122 Some(match entry.r#type.as_bytes() {
123 b"file" => format!("swh:1:cnt:{}", entry.target),
124 b"dir" => format!("swh:1:dir:{}", entry.target),
125 b"rev" => format!("swh:1:rev:{}", entry.target),
126 _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
127 })
128 })
129}
130
131fn iter_swhids_from_dir<R: ChunkReader + Send>(
132 reader_builder: ArrowReaderBuilder<R>,
133) -> impl ParallelIterator<Item = TextSwhid> {
134 #[derive(ArRowDeserialize, Default, Clone)]
135 struct Directory {
136 id: String,
137 }
138
139 map_swhids(reader_builder, |dir: Directory| {
140 Some(format!("swh:1:dir:{}", dir.id))
141 })
142}
143
144fn iter_swhids_from_cnt<R: ChunkReader + Send>(
145 reader_builder: ArrowReaderBuilder<R>,
146) -> impl ParallelIterator<Item = TextSwhid> {
147 #[derive(ArRowDeserialize, Default, Clone)]
148 struct Content {
149 sha1_git: String,
150 }
151
152 map_swhids(reader_builder, |cnt: Content| {
153 Some(format!("swh:1:cnt:{}", cnt.sha1_git))
154 })
155}
156
157fn iter_swhids_from_ori<R: ChunkReader + Send>(
158 reader_builder: ArrowReaderBuilder<R>,
159) -> impl ParallelIterator<Item = TextSwhid> {
160 #[derive(ArRowDeserialize, Default, Clone)]
161 struct Origin {
162 id: String,
163 }
164
165 map_swhids(reader_builder, |ori: Origin| {
166 Some(format!("swh:1:ori:{}", ori.id))
167 })
168}
169
170fn iter_rel_swhids_from_rel<R: ChunkReader + Send>(
171 reader_builder: ArrowReaderBuilder<R>,
172) -> impl ParallelIterator<Item = TextSwhid> {
173 #[derive(ArRowDeserialize, Default, Clone)]
174 struct Release {
175 id: String,
176 }
177
178 map_swhids(reader_builder, |rel: Release| {
179 Some(format!("swh:1:rel:{}", rel.id))
180 })
181}
182
183fn iter_target_swhids_from_rel<R: ChunkReader + Send>(
184 reader_builder: ArrowReaderBuilder<R>,
185) -> impl ParallelIterator<Item = TextSwhid> {
186 #[derive(ArRowDeserialize, Default, Clone)]
187 struct Release {
188 target: String,
189 target_type: String,
190 }
191
192 map_swhids(reader_builder, |entry: Release| {
193 Some(match entry.target_type.as_bytes() {
194 b"content" => format!("swh:1:cnt:{}", entry.target),
195 b"directory" => format!("swh:1:dir:{}", entry.target),
196 b"revision" => format!("swh:1:rev:{}", entry.target),
197 b"release" => format!("swh:1:rel:{}", entry.target),
198 _ => panic!("Unexpected release target type: {:?}", entry.target_type),
199 })
200 })
201}
202
203fn iter_rev_swhids_from_rev<R: ChunkReader + Send>(
204 reader_builder: ArrowReaderBuilder<R>,
205) -> impl ParallelIterator<Item = TextSwhid> {
206 #[derive(ArRowDeserialize, Default, Clone)]
207 struct Revision {
208 id: String,
209 }
210
211 map_swhids(reader_builder, |dir: Revision| {
212 Some(format!("swh:1:rev:{}", dir.id))
213 })
214}
215
216fn iter_dir_swhids_from_rev<R: ChunkReader + Send>(
217 reader_builder: ArrowReaderBuilder<R>,
218) -> impl ParallelIterator<Item = TextSwhid> {
219 #[derive(ArRowDeserialize, Default, Clone)]
220 struct Revision {
221 directory: String,
222 }
223
224 map_swhids(reader_builder, |rev: Revision| {
225 Some(format!("swh:1:dir:{}", rev.directory))
226 })
227}
228
229fn iter_parent_swhids_from_rev<R: ChunkReader + Send>(
230 reader_builder: ArrowReaderBuilder<R>,
231) -> impl ParallelIterator<Item = TextSwhid> {
232 #[derive(ArRowDeserialize, Default, Clone)]
233 struct RevisionParent {
234 parent_id: String,
235 }
236
237 map_swhids(reader_builder, |rev: RevisionParent| {
238 Some(format!("swh:1:rev:{}", rev.parent_id))
239 })
240}
241
242fn iter_swhids_from_snp<R: ChunkReader + Send>(
243 reader_builder: ArrowReaderBuilder<R>,
244) -> impl ParallelIterator<Item = TextSwhid> {
245 #[derive(ArRowDeserialize, Default, Clone)]
246 struct Snapshot {
247 id: String,
248 }
249
250 map_swhids(reader_builder, |dir: Snapshot| {
251 Some(format!("swh:1:snp:{}", dir.id))
252 })
253}
254
255fn iter_swhids_from_snp_branch<R: ChunkReader + Send>(
256 reader_builder: ArrowReaderBuilder<R>,
257) -> impl ParallelIterator<Item = TextSwhid> {
258 #[derive(ArRowDeserialize, Default, Clone)]
259 struct SnapshotBranch {
260 target: String,
261 target_type: String,
262 }
263
264 map_swhids(reader_builder, |branch: SnapshotBranch| {
265 match branch.target_type.as_bytes() {
266 b"content" => Some(format!("swh:1:cnt:{}", branch.target)),
267 b"directory" => Some(format!("swh:1:dir:{}", branch.target)),
268 b"revision" => Some(format!("swh:1:rev:{}", branch.target)),
269 b"release" => Some(format!("swh:1:rel:{}", branch.target)),
270 b"alias" => None,
271 _ => panic!("Unexpected snapshot target type: {:?}", branch.target_type),
272 }
273 })
274}