use std::path::PathBuf;

use anyhow::Result;
use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
use ar_row_derive::ArRowDeserialize;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::reader::ChunkReader;
use rayon::prelude::*;

use super::orc::{get_dataset_readers, iter_arrow};
use super::TextSwhid;
use crate::NodeType;
use crate::SWHID;

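/// Returns a parallel iterator over all arcs, as `(src, dst)` pairs of textual SWHIDs,
/// read from the ORC tables under `dataset_dir`.
///
/// Tables whose source node type is not in `allowed_node_types` are skipped entirely.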
pub fn iter_arcs(
    dataset_dir: &PathBuf,
    allowed_node_types: &[NodeType],
) -> Result<impl ParallelIterator<Item = (TextSwhid, TextSwhid)>> {
    let maybe_get_dataset_readers = |dataset_dir, subdirectory, node_type| {
        if allowed_node_types.contains(&node_type) {
            get_dataset_readers(dataset_dir, subdirectory)
        } else {
            Ok(Vec::new())
        }
    };

    Ok([]
        .into_par_iter()
        .chain(
            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_dir_entry),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_ovs),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_rel),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_rev_history),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
                .into_par_iter()
                .flat_map_iter(iter_arcs_from_snp_branch),
        ))
}

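/// Deserializes rows of type `T` from `reader_builder` and applies `f` to each row,
/// converting the `(src, dst)` SWHID strings it returns into fixed-size [`TextSwhid`]s.
/// Rows for which `f` returns `None` produce no arc.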
fn map_arcs<R: ChunkReader + Send, T, F>(
    reader_builder: ArrowReaderBuilder<R>,
    f: F,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)>
where
    F: Fn(T) -> Option<(String, String)> + Send + Sync,
    T: ArRowDeserialize + ArRowStruct + Send,
{
    iter_arrow(reader_builder, move |record: T| {
        f(record).map(|(src_swhid, dst_swhid)| {
            (
                src_swhid.as_bytes().try_into().unwrap(),
                dst_swhid.as_bytes().try_into().unwrap(),
            )
        })
    })
}

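/// Yields dir -> {cnt,dir,rev} arcs from the `directory_entry` table.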
fn iter_arcs_from_dir_entry<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
    #[derive(ArRowDeserialize, Default, Clone)]
    struct DirectoryEntry {
        directory_id: String,
        r#type: String,
        target: String,
    }

    map_arcs(reader_builder, |entry: DirectoryEntry| {
        Some((
            format!("swh:1:dir:{}", entry.directory_id),
            match entry.r#type.as_bytes() {
                b"file" => format!("swh:1:cnt:{}", entry.target),
                b"dir" => format!("swh:1:dir:{}", entry.target),
                b"rev" => format!("swh:1:rev:{}", entry.target),
                _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
            },
        ))
    })
}

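/// Yields ori -> snp arcs from the `origin_visit_status` table; statuses without a
/// snapshot are skipped. The origin SWHID is derived from the origin URL.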
pub(super) fn iter_arcs_from_ovs<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
    #[derive(ArRowDeserialize, Default, Clone)]
    struct OriginVisitStatus {
        origin: String,
        snapshot: Option<String>,
    }

    map_arcs(reader_builder, |ovs: OriginVisitStatus| {
        ovs.snapshot.as_ref().map(|snapshot| {
            (
                SWHID::from_origin_url(ovs.origin).to_string(),
                format!("swh:1:snp:{snapshot}"),
            )
        })
    })
}

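/// Yields rel -> {cnt,dir,rev,rel} arcs from the `release` table.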
pub(super) fn iter_arcs_from_rel<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
    #[derive(ArRowDeserialize, Default, Clone)]
    struct Release {
        id: String,
        target: String,
        target_type: String,
    }

    map_arcs(reader_builder, |entry: Release| {
        Some((
            format!("swh:1:rel:{}", entry.id),
            match entry.target_type.as_bytes() {
                b"content" => format!("swh:1:cnt:{}", entry.target),
                b"directory" => format!("swh:1:dir:{}", entry.target),
                b"revision" => format!("swh:1:rev:{}", entry.target),
                b"release" => format!("swh:1:rel:{}", entry.target),
                _ => panic!("Unexpected release target type: {:?}", entry.target_type),
            },
        ))
    })
}

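/// Yields rev -> dir arcs (each revision to its root directory) from the `revision` table.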
pub(super) fn iter_arcs_from_rev<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
    #[derive(ArRowDeserialize, Default, Clone)]
    struct Revision {
        id: String,
        directory: String,
    }

    map_arcs(reader_builder, |rev: Revision| {
        Some((
            format!("swh:1:rev:{}", rev.id),
            format!("swh:1:dir:{}", rev.directory),
        ))
    })
}

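/// Yields rev -> rev arcs (each revision to its parent revisions) from the
/// `revision_history` table.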
pub(super) fn iter_arcs_from_rev_history<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
    #[derive(ArRowDeserialize, Default, Clone)]
    struct RevisionParent {
        id: String,
        parent_id: String,
    }

    map_arcs(reader_builder, |rev: RevisionParent| {
        Some((
            format!("swh:1:rev:{}", rev.id),
            format!("swh:1:rev:{}", rev.parent_id),
        ))
    })
}

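/// Yields snp -> {cnt,dir,rev,rel} arcs from the `snapshot_branch` table; alias
/// branches are skipped.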
fn iter_arcs_from_snp_branch<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
    #[derive(ArRowDeserialize, Default, Clone)]
    struct SnapshotBranch {
        snapshot_id: String,
        target: String,
        target_type: String,
    }

    map_arcs(reader_builder, |branch: SnapshotBranch| {
        let dst = match branch.target_type.as_bytes() {
            b"content" => Some(format!("swh:1:cnt:{}", branch.target)),
            b"directory" => Some(format!("swh:1:dir:{}", branch.target)),
            b"revision" => Some(format!("swh:1:rev:{}", branch.target)),
            b"release" => Some(format!("swh:1:rel:{}", branch.target)),
            b"alias" => None,
            _ => panic!("Unexpected snapshot target type: {:?}", branch.target_type),
        };
        dst.map(|dst| (format!("swh:1:snp:{}", branch.snapshot_id), dst))
    })
}