1use std::path::PathBuf;
9
10use anyhow::Result;
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use nonmax::NonMaxU64;
14use orc_rust::arrow_reader::ArrowReaderBuilder;
15use orc_rust::reader::ChunkReader;
16use rayon::prelude::*;
17
18use super::iter_arcs::{iter_arcs_from_rel, iter_arcs_from_rev, iter_arcs_from_rev_history};
19use super::orc::{get_dataset_readers, iter_arrow};
20use super::TextSwhid;
21use crate::compress::label_names::LabelNameHasher;
22use crate::labels::{
23 Branch, DirEntry, EdgeLabel, Permission, UntypedEdgeLabel, Visit, VisitStatus,
24};
25use crate::{NodeType, SWHID};
26
27pub fn iter_labeled_arcs<'a>(
28 dataset_dir: &'a PathBuf,
29 allowed_node_types: &'a [NodeType],
30 label_name_hasher: LabelNameHasher<'a>,
31) -> Result<impl ParallelIterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> + 'a> {
32 let maybe_get_dataset_readers = |dataset_dir, subdirectory, node_type| {
33 if allowed_node_types.contains(&node_type) {
34 get_dataset_readers(dataset_dir, subdirectory)
35 } else {
36 Ok(Vec::new())
37 }
38 };
39
40 Ok([]
41 .into_par_iter()
42 .chain(
43 maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
44 .into_par_iter()
45 .flat_map_iter(move |rb| iter_labeled_arcs_from_dir_entry(rb, label_name_hasher)),
46 )
47 .chain(
48 maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
49 .into_par_iter()
50 .flat_map_iter(iter_labeled_arcs_from_ovs),
51 )
52 .chain(
53 maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
54 .into_par_iter()
55 .flat_map_iter(iter_arcs_from_rel)
56 .map(|(src, dst)| (src, dst, None)),
57 )
58 .chain(
59 maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
60 .into_par_iter()
61 .flat_map_iter(iter_arcs_from_rev)
62 .map(|(src, dst)| (src, dst, None)),
63 )
64 .chain(
65 maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
66 .into_par_iter()
67 .flat_map_iter(iter_arcs_from_rev_history)
68 .map(|(src, dst)| (src, dst, None)),
69 )
70 .chain(
71 maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
72 .into_par_iter()
73 .flat_map_iter(move |rb| iter_labeled_arcs_from_snp_branch(rb, label_name_hasher)),
74 ))
75}
76
77fn map_labeled_arcs<R: ChunkReader + Send, T, F>(
78 reader_builder: ArrowReaderBuilder<R>,
79 f: F,
80) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)>
81where
82 F: Fn(T) -> Option<(String, String, Option<EdgeLabel>)> + Send + Sync,
83 T: Send + ArRowDeserialize + ArRowStruct,
84{
85 iter_arrow(reader_builder, move |record: T| {
86 f(record).map(|(src_swhid, dst_swhid, label)| {
87 (
88 src_swhid.as_bytes().try_into().unwrap(),
89 dst_swhid.as_bytes().try_into().unwrap(),
90 label.map(|label| {
91 UntypedEdgeLabel::from(label)
92 .0
93 .try_into()
94 .expect("label is 0")
95 }),
96 )
97 })
98 })
99}
100
101fn iter_labeled_arcs_from_dir_entry<'a, R: ChunkReader + Send + 'a>(
102 reader_builder: ArrowReaderBuilder<R>,
103 label_name_hasher: LabelNameHasher<'a>,
104) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> + 'a {
105 #[derive(ArRowDeserialize, Default, Clone)]
106 struct DirectoryEntry {
107 directory_id: String,
108 name: Box<[u8]>,
109 r#type: String,
110 target: String,
111 perms: i32,
112 }
113
114 map_labeled_arcs(reader_builder, move |entry: DirectoryEntry| {
115 Some((
116 format!("swh:1:dir:{}", entry.directory_id),
117 match entry.r#type.as_bytes() {
118 b"file" => format!("swh:1:cnt:{}", entry.target),
119 b"dir" => format!("swh:1:dir:{}", entry.target),
120 b"rev" => format!("swh:1:rev:{}", entry.target),
121 _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
122 },
123 DirEntry::new(
124 u16::try_from(entry.perms)
125 .ok()
126 .and_then(Permission::from_git)
127 .unwrap_or(Permission::None),
128 label_name_hasher
129 .hash(entry.name)
130 .expect("Could not hash dir entry name"),
131 )
132 .map(EdgeLabel::DirEntry),
133 ))
134 })
135}
136
137fn iter_labeled_arcs_from_ovs<R: ChunkReader + Send>(
138 reader_builder: ArrowReaderBuilder<R>,
139) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> {
140 #[derive(ArRowDeserialize, Default, Clone)]
141 struct OriginVisitStatus {
142 origin: String,
143 date: Option<ar_row::Timestamp>,
144 status: String,
145 snapshot: Option<String>,
146 }
147
148 map_labeled_arcs(reader_builder, |ovs: OriginVisitStatus| {
149 ovs.snapshot.as_ref().map(|snapshot| {
150 (
151 SWHID::from_origin_url(ovs.origin).to_string(),
152 format!("swh:1:snp:{snapshot}"),
153 Visit::new(
154 match ovs.status.as_str() {
155 "full" => VisitStatus::Full,
156 _ => VisitStatus::Partial,
157 },
158 ovs.date
159 .unwrap_or(ar_row::Timestamp {
160 seconds: 0,
161 nanoseconds: 0,
162 })
163 .seconds
164 .try_into()
165 .expect("Negative visit date"),
166 )
167 .map(EdgeLabel::Visit),
168 )
169 })
170 })
171}
172
173fn iter_labeled_arcs_from_snp_branch<'a, R: ChunkReader + Send + 'a>(
174 reader_builder: ArrowReaderBuilder<R>,
175 label_name_hasher: LabelNameHasher<'a>,
176) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> + 'a {
177 #[derive(ArRowDeserialize, Default, Clone)]
178 struct SnapshotBranch {
179 snapshot_id: String,
180 name: Box<[u8]>,
181 target: String,
182 target_type: String,
183 }
184
185 map_labeled_arcs(reader_builder, move |branch: SnapshotBranch| {
186 let dst = match branch.target_type.as_bytes() {
187 b"content" => Some(format!("swh:1:cnt:{}", branch.target)),
188 b"directory" => Some(format!("swh:1:dir:{}", branch.target)),
189 b"revision" => Some(format!("swh:1:rev:{}", branch.target)),
190 b"release" => Some(format!("swh:1:rel:{}", branch.target)),
191 b"alias" => None,
192 _ => panic!("Unexpected snapshot target type: {:?}", branch.target_type),
193 };
194 dst.map(|dst| {
195 (
196 format!("swh:1:snp:{}", branch.snapshot_id),
197 dst,
198 Branch::new(
199 label_name_hasher
200 .hash(branch.name)
201 .expect("Could not hash branch name"),
202 )
203 .map(EdgeLabel::Branch),
204 )
205 })
206 })
207}