swh_graph/compress/
iter_labeled_arcs.rs

// Copyright (C) 2023-2024  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information
5
//! Iterator over all arcs in an ORC dataset, along with their label, if any.
7
8use std::path::PathBuf;
9
10use anyhow::Result;
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use nonmax::NonMaxU64;
14use orc_rust::arrow_reader::ArrowReaderBuilder;
15use orc_rust::reader::ChunkReader;
16use rayon::prelude::*;
17
18use super::iter_arcs::{iter_arcs_from_rel, iter_arcs_from_rev, iter_arcs_from_rev_history};
19use super::orc::{get_dataset_readers, iter_arrow};
20use super::TextSwhid;
21use crate::compress::label_names::LabelNameHasher;
22use crate::labels::{
23    Branch, DirEntry, EdgeLabel, Permission, UntypedEdgeLabel, Visit, VisitStatus,
24};
25use crate::{NodeType, SWHID};
26
27pub fn iter_labeled_arcs<'a>(
28    dataset_dir: &'a PathBuf,
29    allowed_node_types: &'a [NodeType],
30    label_name_hasher: LabelNameHasher<'a>,
31) -> Result<impl ParallelIterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> + 'a> {
32    let maybe_get_dataset_readers = |dataset_dir, subdirectory, node_type| {
33        if allowed_node_types.contains(&node_type) {
34            get_dataset_readers(dataset_dir, subdirectory)
35        } else {
36            Ok(Vec::new())
37        }
38    };
39
40    Ok([]
41        .into_par_iter()
42        .chain(
43            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
44                .into_par_iter()
45                .flat_map_iter(move |rb| iter_labeled_arcs_from_dir_entry(rb, label_name_hasher)),
46        )
47        .chain(
48            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
49                .into_par_iter()
50                .flat_map_iter(iter_labeled_arcs_from_ovs),
51        )
52        .chain(
53            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
54                .into_par_iter()
55                .flat_map_iter(iter_arcs_from_rel)
56                .map(|(src, dst)| (src, dst, None)),
57        )
58        .chain(
59            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
60                .into_par_iter()
61                .flat_map_iter(iter_arcs_from_rev)
62                .map(|(src, dst)| (src, dst, None)),
63        )
64        .chain(
65            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
66                .into_par_iter()
67                .flat_map_iter(iter_arcs_from_rev_history)
68                .map(|(src, dst)| (src, dst, None)),
69        )
70        .chain(
71            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
72                .into_par_iter()
73                .flat_map_iter(move |rb| iter_labeled_arcs_from_snp_branch(rb, label_name_hasher)),
74        ))
75}
76
77fn map_labeled_arcs<R: ChunkReader + Send, T, F>(
78    reader_builder: ArrowReaderBuilder<R>,
79    f: F,
80) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)>
81where
82    F: Fn(T) -> Option<(String, String, Option<EdgeLabel>)> + Send + Sync,
83    T: Send + ArRowDeserialize + ArRowStruct,
84{
85    iter_arrow(reader_builder, move |record: T| {
86        f(record).map(|(src_swhid, dst_swhid, label)| {
87            (
88                src_swhid.as_bytes().try_into().unwrap(),
89                dst_swhid.as_bytes().try_into().unwrap(),
90                label.map(|label| {
91                    UntypedEdgeLabel::from(label)
92                        .0
93                        .try_into()
94                        .expect("label is 0")
95                }),
96            )
97        })
98    })
99}
100
101fn iter_labeled_arcs_from_dir_entry<'a, R: ChunkReader + Send + 'a>(
102    reader_builder: ArrowReaderBuilder<R>,
103    label_name_hasher: LabelNameHasher<'a>,
104) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> + 'a {
105    #[derive(ArRowDeserialize, Default, Clone)]
106    struct DirectoryEntry {
107        directory_id: String,
108        name: Box<[u8]>,
109        r#type: String,
110        target: String,
111        perms: i32,
112    }
113
114    map_labeled_arcs(reader_builder, move |entry: DirectoryEntry| {
115        Some((
116            format!("swh:1:dir:{}", entry.directory_id),
117            match entry.r#type.as_bytes() {
118                b"file" => format!("swh:1:cnt:{}", entry.target),
119                b"dir" => format!("swh:1:dir:{}", entry.target),
120                b"rev" => format!("swh:1:rev:{}", entry.target),
121                _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
122            },
123            DirEntry::new(
124                u16::try_from(entry.perms)
125                    .ok()
126                    .and_then(Permission::from_git)
127                    .unwrap_or(Permission::None),
128                label_name_hasher
129                    .hash(entry.name)
130                    .expect("Could not hash dir entry name"),
131            )
132            .map(EdgeLabel::DirEntry),
133        ))
134    })
135}
136
137fn iter_labeled_arcs_from_ovs<R: ChunkReader + Send>(
138    reader_builder: ArrowReaderBuilder<R>,
139) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> {
140    #[derive(ArRowDeserialize, Default, Clone)]
141    struct OriginVisitStatus {
142        origin: String,
143        date: Option<ar_row::Timestamp>,
144        status: String,
145        snapshot: Option<String>,
146    }
147
148    map_labeled_arcs(reader_builder, |ovs: OriginVisitStatus| {
149        ovs.snapshot.as_ref().map(|snapshot| {
150            (
151                SWHID::from_origin_url(ovs.origin).to_string(),
152                format!("swh:1:snp:{snapshot}"),
153                Visit::new(
154                    match ovs.status.as_str() {
155                        "full" => VisitStatus::Full,
156                        _ => VisitStatus::Partial,
157                    },
158                    ovs.date
159                        .unwrap_or(ar_row::Timestamp {
160                            seconds: 0,
161                            nanoseconds: 0,
162                        })
163                        .seconds
164                        .try_into()
165                        .expect("Negative visit date"),
166                )
167                .map(EdgeLabel::Visit),
168            )
169        })
170    })
171}
172
173fn iter_labeled_arcs_from_snp_branch<'a, R: ChunkReader + Send + 'a>(
174    reader_builder: ArrowReaderBuilder<R>,
175    label_name_hasher: LabelNameHasher<'a>,
176) -> impl Iterator<Item = (TextSwhid, TextSwhid, Option<NonMaxU64>)> + 'a {
177    #[derive(ArRowDeserialize, Default, Clone)]
178    struct SnapshotBranch {
179        snapshot_id: String,
180        name: Box<[u8]>,
181        target: String,
182        target_type: String,
183    }
184
185    map_labeled_arcs(reader_builder, move |branch: SnapshotBranch| {
186        let dst = match branch.target_type.as_bytes() {
187            b"content" => Some(format!("swh:1:cnt:{}", branch.target)),
188            b"directory" => Some(format!("swh:1:dir:{}", branch.target)),
189            b"revision" => Some(format!("swh:1:rev:{}", branch.target)),
190            b"release" => Some(format!("swh:1:rel:{}", branch.target)),
191            b"alias" => None,
192            _ => panic!("Unexpected snapshot target type: {:?}", branch.target_type),
193        };
194        dst.map(|dst| {
195            (
196                format!("swh:1:snp:{}", branch.snapshot_id),
197                dst,
198                Branch::new(
199                    label_name_hasher
200                        .hash(branch.name)
201                        .expect("Could not hash branch name"),
202                )
203                .map(EdgeLabel::Branch),
204            )
205        })
206    })
207}