swh_graph/compress/
iter_arcs.rs

1// Copyright (C) 2023-2024  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! Iterator on the set of all arcs in an ORC dataset
7
8use std::path::PathBuf;
9
10use anyhow::Result;
11use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
12use ar_row_derive::ArRowDeserialize;
13use orc_rust::arrow_reader::ArrowReaderBuilder;
14use orc_rust::reader::ChunkReader;
15use rayon::prelude::*;
16
17use super::orc::{get_dataset_readers, iter_arrow};
18use super::TextSwhid;
19use crate::NodeType;
20use crate::SWHID;
21
22pub fn iter_arcs(
23    dataset_dir: &PathBuf,
24    allowed_node_types: &[NodeType],
25) -> Result<impl ParallelIterator<Item = (TextSwhid, TextSwhid)>> {
26    let maybe_get_dataset_readers = |dataset_dir, subdirectory, node_type| {
27        if allowed_node_types.contains(&node_type) {
28            get_dataset_readers(dataset_dir, subdirectory)
29        } else {
30            Ok(Vec::new())
31        }
32    };
33
34    Ok([]
35        .into_par_iter()
36        .chain(
37            maybe_get_dataset_readers(dataset_dir, "directory_entry", NodeType::Directory)?
38                .into_par_iter()
39                .flat_map_iter(iter_arcs_from_dir_entry),
40        )
41        .chain(
42            maybe_get_dataset_readers(dataset_dir, "origin_visit_status", NodeType::Origin)?
43                .into_par_iter()
44                .flat_map_iter(iter_arcs_from_ovs),
45        )
46        .chain(
47            maybe_get_dataset_readers(dataset_dir, "release", NodeType::Release)?
48                .into_par_iter()
49                .flat_map_iter(iter_arcs_from_rel),
50        )
51        .chain(
52            maybe_get_dataset_readers(dataset_dir, "revision", NodeType::Revision)?
53                .into_par_iter()
54                .flat_map_iter(iter_arcs_from_rev),
55        )
56        .chain(
57            maybe_get_dataset_readers(dataset_dir, "revision_history", NodeType::Revision)?
58                .into_par_iter()
59                .flat_map_iter(iter_arcs_from_rev_history),
60        )
61        .chain(
62            maybe_get_dataset_readers(dataset_dir, "snapshot_branch", NodeType::Snapshot)?
63                .into_par_iter()
64                .flat_map_iter(iter_arcs_from_snp_branch),
65        ))
66}
67
68fn map_arcs<R: ChunkReader + Send, T, F>(
69    reader_builder: ArrowReaderBuilder<R>,
70    f: F,
71) -> impl Iterator<Item = (TextSwhid, TextSwhid)>
72where
73    F: Fn(T) -> Option<(String, String)> + Send + Sync,
74    T: ArRowDeserialize + ArRowStruct + Send,
75{
76    iter_arrow(reader_builder, move |record: T| {
77        f(record).map(|(src_swhid, dst_swhid)| {
78            (
79                src_swhid.as_bytes().try_into().unwrap(),
80                dst_swhid.as_bytes().try_into().unwrap(),
81            )
82        })
83    })
84}
85
86fn iter_arcs_from_dir_entry<R: ChunkReader + Send>(
87    reader_builder: ArrowReaderBuilder<R>,
88) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
89    #[derive(ArRowDeserialize, Default, Clone)]
90    struct DirectoryEntry {
91        directory_id: String,
92        r#type: String,
93        target: String,
94    }
95
96    map_arcs(reader_builder, |entry: DirectoryEntry| {
97        Some((
98            format!("swh:1:dir:{}", entry.directory_id),
99            match entry.r#type.as_bytes() {
100                b"file" => format!("swh:1:cnt:{}", entry.target),
101                b"dir" => format!("swh:1:dir:{}", entry.target),
102                b"rev" => format!("swh:1:rev:{}", entry.target),
103                _ => panic!("Unexpected directory entry type: {:?}", entry.r#type),
104            },
105        ))
106    })
107}
108
109pub(super) fn iter_arcs_from_ovs<R: ChunkReader + Send>(
110    reader_builder: ArrowReaderBuilder<R>,
111) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
112    #[derive(ArRowDeserialize, Default, Clone)]
113    struct OriginVisitStatus {
114        origin: String,
115        snapshot: Option<String>,
116    }
117
118    map_arcs(reader_builder, |ovs: OriginVisitStatus| {
119        ovs.snapshot.as_ref().map(|snapshot| {
120            (
121                SWHID::from_origin_url(ovs.origin).to_string(),
122                format!("swh:1:snp:{snapshot}"),
123            )
124        })
125    })
126}
127
128pub(super) fn iter_arcs_from_rel<R: ChunkReader + Send>(
129    reader_builder: ArrowReaderBuilder<R>,
130) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
131    #[derive(ArRowDeserialize, Default, Clone)]
132    struct Release {
133        id: String,
134        target: String,
135        target_type: String,
136    }
137
138    map_arcs(reader_builder, |entry: Release| {
139        Some((
140            format!("swh:1:rel:{}", entry.id),
141            match entry.target_type.as_bytes() {
142                b"content" => format!("swh:1:cnt:{}", entry.target),
143                b"directory" => format!("swh:1:dir:{}", entry.target),
144                b"revision" => format!("swh:1:rev:{}", entry.target),
145                b"release" => format!("swh:1:rel:{}", entry.target),
146                _ => panic!("Unexpected release target type: {:?}", entry.target_type),
147            },
148        ))
149    })
150}
151
152pub(super) fn iter_arcs_from_rev<R: ChunkReader + Send>(
153    reader_builder: ArrowReaderBuilder<R>,
154) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
155    #[derive(ArRowDeserialize, Default, Clone)]
156    struct Revision {
157        id: String,
158        directory: String,
159    }
160
161    map_arcs(reader_builder, |rev: Revision| {
162        Some((
163            format!("swh:1:rev:{}", rev.id),
164            format!("swh:1:dir:{}", rev.directory),
165        ))
166    })
167}
168
169pub(super) fn iter_arcs_from_rev_history<R: ChunkReader + Send>(
170    reader_builder: ArrowReaderBuilder<R>,
171) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
172    #[derive(ArRowDeserialize, Default, Clone)]
173    struct RevisionParent {
174        id: String,
175        parent_id: String,
176    }
177
178    map_arcs(reader_builder, |rev: RevisionParent| {
179        Some((
180            format!("swh:1:rev:{}", rev.id),
181            format!("swh:1:rev:{}", rev.parent_id),
182        ))
183    })
184}
185
186fn iter_arcs_from_snp_branch<R: ChunkReader + Send>(
187    reader_builder: ArrowReaderBuilder<R>,
188) -> impl Iterator<Item = (TextSwhid, TextSwhid)> {
189    #[derive(ArRowDeserialize, Default, Clone)]
190    struct SnapshotBranch {
191        snapshot_id: String,
192        target: String,
193        target_type: String,
194    }
195
196    map_arcs(reader_builder, |branch: SnapshotBranch| {
197        let dst = match branch.target_type.as_bytes() {
198            b"content" => Some(format!("swh:1:cnt:{}", branch.target)),
199            b"directory" => Some(format!("swh:1:dir:{}", branch.target)),
200            b"revision" => Some(format!("swh:1:rev:{}", branch.target)),
201            b"release" => Some(format!("swh:1:rel:{}", branch.target)),
202            b"alias" => None,
203            _ => panic!("Unexpected snapshot target type: {:?}", branch.target_type),
204        };
205        dst.map(|dst| (format!("swh:1:snp:{}", branch.snapshot_id), dst))
206    })
207}