use std::path::PathBuf;

use anyhow::Result;
use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
use ar_row_derive::ArRowDeserialize;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::projection::ProjectionMask;
use orc_rust::reader::ChunkReader;
use rayon::prelude::*;

use super::orc::{get_dataset_readers, iter_arrow};
use crate::NodeType;

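/// Returns the number of rows in an ORC file, by building a reader with an
/// empty column projection so that no column data needs to be decoded.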
fn count_arrow_rows<R: ChunkReader>(reader_builder: ArrowReaderBuilder<R>) -> u64 {
    let empty_mask = ProjectionMask::roots(reader_builder.file_metadata().root_data_type(), []);
    let reader = reader_builder.with_projection(empty_mask).build();
    reader.total_row_count()
}

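/// Estimates the total number of nodes in the dataset by summing, in
/// parallel, the row counts of the ORC tables for each allowed node type.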
pub fn estimate_node_count(dataset_dir: &PathBuf, allowed_node_types: &[NodeType]) -> Result<u64> {
    let mut readers = Vec::new();
    if allowed_node_types.contains(&NodeType::Directory) {
        readers.extend(get_dataset_readers(dataset_dir, "directory")?);
    }
    if allowed_node_types.contains(&NodeType::Content) {
        readers.extend(get_dataset_readers(dataset_dir, "content")?);
    }
    if allowed_node_types.contains(&NodeType::Origin) {
        readers.extend(get_dataset_readers(dataset_dir, "origin")?);
    }
    if allowed_node_types.contains(&NodeType::Release) {
        readers.extend(get_dataset_readers(dataset_dir, "release")?);
    }
    if allowed_node_types.contains(&NodeType::Revision) {
        readers.extend(get_dataset_readers(dataset_dir, "revision")?);
    }
    if allowed_node_types.contains(&NodeType::Snapshot) {
        readers.extend(get_dataset_readers(dataset_dir, "snapshot")?);
    }
    Ok(readers.into_par_iter().map(count_arrow_rows).sum())
}

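/// Estimates the total number of edges in the dataset by summing, in
/// parallel, the row counts of the ORC tables that hold (at most) one edge
/// per row. The result is an upper bound: some rows, such as origin visit
/// statuses without a snapshot, do not correspond to an edge.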
pub fn estimate_edge_count(dataset_dir: &PathBuf, allowed_node_types: &[NodeType]) -> Result<u64> {
    let mut readers = Vec::new();
    if allowed_node_types.contains(&NodeType::Directory) {
        readers.extend(get_dataset_readers(dataset_dir, "directory_entry")?);
    }
    if allowed_node_types.contains(&NodeType::Origin) {
        readers.extend(get_dataset_readers(dataset_dir, "origin_visit_status")?);
    }
    if allowed_node_types.contains(&NodeType::Release) {
        readers.extend(get_dataset_readers(dataset_dir, "release")?);
    }
    if allowed_node_types.contains(&NodeType::Revision) {
        readers.extend(get_dataset_readers(dataset_dir, "revision")?);
        readers.extend(get_dataset_readers(dataset_dir, "revision_history")?);
    }
    if allowed_node_types.contains(&NodeType::Snapshot) {
        readers.extend(get_dataset_readers(dataset_dir, "snapshot_branch")?);
    }
    Ok(readers.into_par_iter().map(count_arrow_rows).sum())
}

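/// Edge counts indexed by source node type (outer) and destination node type
/// (inner).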
type EdgeStats = [[usize; NodeType::NUMBER_OF_TYPES]; NodeType::NUMBER_OF_TYPES];

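/// Counts how many edges of each (source, destination) node type pair appear
/// in the dataset, yielding one [`EdgeStats`] matrix per ORC file for the
/// caller to reduce.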
pub fn count_edge_types(
    dataset_dir: &PathBuf,
    allowed_node_types: &[NodeType],
) -> Result<impl ParallelIterator<Item = EdgeStats>> {
    let maybe_get_dataset_readers = |dataset_dir, subdirectory, node_type| {
        if allowed_node_types.contains(&node_type) {
            get_dataset_readers(dataset_dir, subdirectory)
        } else {
            Ok(Vec::new())
        }
    };

    Ok([]
        .into_par_iter()
        .chain(
            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
                .into_par_iter()
                .map(count_edge_types_from_dir),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
                .into_par_iter()
                .map(count_edge_types_from_ovs),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
                .into_par_iter()
                .map(count_edge_types_from_rel),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
                .into_par_iter()
                .map(count_dir_edge_types_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
                .into_par_iter()
                .map(count_parent_edge_types_from_rev),
        )
        .chain(
            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
                .into_par_iter()
                .map(count_edge_types_from_snp),
        ))
}

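/// Deserializes every row of an ORC file as a `T` and passes it to `f`.
///
/// Built on [`iter_arrow`] with a closure returning zero items per record;
/// the trailing `count()` merely drives the iterator to completion.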
fn for_each_edge<T, F, R: ChunkReader + Send>(reader_builder: ArrowReaderBuilder<R>, mut f: F)
where
    F: FnMut(T) + Send + Sync,
    T: ArRowDeserialize + ArRowStruct + Send,
{
    iter_arrow(reader_builder, move |record: T| -> [(); 0] {
        f(record);
        []
    })
    .count();
}

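/// Records one edge from `src_type` to `dst_type`.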
fn inc(stats: &mut EdgeStats, src_type: NodeType, dst_type: NodeType) {
    stats[src_type as usize][dst_type as usize] += 1;
}

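/// Counts directory -> {content, directory, revision} edges from the `type`
/// column of the `directory_entry` table.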
fn count_edge_types_from_dir<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> EdgeStats {
    let mut stats = EdgeStats::default();

    #[derive(ArRowDeserialize, Default, Clone)]
    struct DirectoryEntry {
        r#type: String,
    }

    for_each_edge(reader_builder, |entry: DirectoryEntry| {
        match entry.r#type.as_bytes() {
            b"file" => {
                inc(&mut stats, NodeType::Directory, NodeType::Content);
            }
            b"dir" => {
                inc(&mut stats, NodeType::Directory, NodeType::Directory);
            }
            b"rev" => {
                inc(&mut stats, NodeType::Directory, NodeType::Revision);
            }
            _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
        }
    });

    stats
}

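/// Counts origin -> snapshot edges; an `origin_visit_status` row contributes
/// an edge only when its `snapshot` column is set.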
fn count_edge_types_from_ovs<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> EdgeStats {
    let mut stats = EdgeStats::default();

    #[derive(ArRowDeserialize, Default, Clone)]
    struct OriginVisitStatus {
        snapshot: Option<String>,
    }

    for_each_edge(reader_builder, |ovs: OriginVisitStatus| {
        if ovs.snapshot.is_some() {
            inc(&mut stats, NodeType::Origin, NodeType::Snapshot);
        }
    });

    stats
}

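/// Counts revision -> directory edges; each row of the `revision` table is
/// counted as one edge to its root directory.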
fn count_dir_edge_types_from_rev<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> EdgeStats {
    let mut stats = EdgeStats::default();

    stats[NodeType::Revision as usize][NodeType::Directory as usize] +=
        count_arrow_rows(reader_builder) as usize;

    stats
}

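/// Counts revision -> revision edges; each row of the `revision_history`
/// table is counted as one parent edge.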
fn count_parent_edge_types_from_rev<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> EdgeStats {
    let mut stats = EdgeStats::default();

    stats[NodeType::Revision as usize][NodeType::Revision as usize] +=
        count_arrow_rows(reader_builder) as usize;

    stats
}

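/// Counts release -> {content, directory, revision, release} edges from the
/// `target_type` column of the `release` table.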
fn count_edge_types_from_rel<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> EdgeStats {
    let mut stats = EdgeStats::default();

    #[derive(ArRowDeserialize, Default, Clone)]
    struct Release {
        target_type: String,
    }

    for_each_edge(reader_builder, |entry: Release| {
        match entry.target_type.as_bytes() {
            b"content" => {
                inc(&mut stats, NodeType::Release, NodeType::Content);
            }
            b"directory" => {
                inc(&mut stats, NodeType::Release, NodeType::Directory);
            }
            b"revision" => {
                inc(&mut stats, NodeType::Release, NodeType::Revision);
            }
            b"release" => {
                inc(&mut stats, NodeType::Release, NodeType::Release);
            }
            _ => panic!("Unexpected release target type: {:?}", entry.target_type),
        }
    });

    stats
}

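/// Counts snapshot -> {content, directory, revision, release} edges from the
/// `target_type` column of the `snapshot_branch` table; `alias` branches are
/// skipped.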
fn count_edge_types_from_snp<R: ChunkReader + Send>(
    reader_builder: ArrowReaderBuilder<R>,
) -> EdgeStats {
    let mut stats = EdgeStats::default();

    #[derive(ArRowDeserialize, Default, Clone)]
    struct SnapshotBranch {
        target_type: String,
    }

    for_each_edge(reader_builder, |branch: SnapshotBranch| {
        match branch.target_type.as_bytes() {
            b"content" => {
                inc(&mut stats, NodeType::Snapshot, NodeType::Content);
            }
            b"directory" => {
                inc(&mut stats, NodeType::Snapshot, NodeType::Directory);
            }
            b"revision" => {
                inc(&mut stats, NodeType::Snapshot, NodeType::Revision);
            }
            b"release" => {
                inc(&mut stats, NodeType::Snapshot, NodeType::Release);
            }
            b"alias" => {}
            _ => panic!("Unexpected snapshot branch type: {:?}", branch.target_type),
        }
    });

    stats
}