swh_graph/compress/
zst_dir.rs

1/*
2 * Copyright (C) 2023-2025  The Software Heritage developers
3 * See the AUTHORS file at the top-level directory of this distribution
4 * License: GNU General Public License version 3, or any later version
5 * See top-level LICENSE file for more information
6 */
7
8//! Iterators on newline-separated ZSTD-compressed files.
9
10use std::io::BufRead;
11use std::path::Path;
12
13use dsi_progress_logger::{ConcurrentProgressLog, ProgressLog};
14use rayon::prelude::*;
15
// Inspired by https://archive.softwareheritage.org/swh:1:cnt:5c1d2d8f46cd47edf2adb15f5b7642098e03883f;origin=https://github.com/rust-lang/rust;visit=swh:1:snp:e93a6ff91a26c85dfe1d515afa437ab63e290357;anchor=swh:1:rev:c67cb3e577bdd4de640eb11d96cd5ef5afe0eb0b;path=/library/std/src/io/mod.rs;lines=2847-2871
/// Iterator over the lines of a buffered reader, yielded as raw bytes.
///
/// Each item is one line with its trailing `\n` (or `\r\n`) removed; no
/// text decoding is performed on the content.
pub struct ByteLines<B>
where
    B: std::io::BufRead,
{
    /// Underlying reader the lines are pulled from.
    buf: B,
}

impl<B> Iterator for ByteLines<B>
where
    B: BufRead,
{
    type Item = std::io::Result<Vec<u8>>;

    /// Reads up to and including the next `\n` and returns the line without
    /// its terminator.
    ///
    /// Returns `None` once the reader yields zero bytes, and `Some(Err(_))`
    /// when the underlying read fails.
    fn next(&mut self) -> Option<Self::Item> {
        // A fresh vector per line, because ownership is handed to the caller.
        let mut line = Vec::new();
        let bytes_read = match self.buf.read_until(b'\n', &mut line) {
            Ok(n) => n,
            Err(err) => return Some(Err(err)),
        };
        if bytes_read == 0 {
            // End of input.
            return None;
        }
        // Only strip `\r` when it is directly followed by the `\n` terminator;
        // a `\r` at the very end of an unterminated last line is kept.
        if line.ends_with(b"\n") {
            line.pop();
            if line.ends_with(b"\r") {
                line.pop();
            }
        }
        Some(Ok(line))
    }
}
41
42pub trait ToByteLines: std::io::BufRead + Sized {
43    fn byte_lines(self) -> ByteLines<Self> {
44        ByteLines { buf: self }
45    }
46}
47
48impl<B: std::io::BufRead> ToByteLines for B {}
49
50/// Yields textual lines from a newline-separated ZSTD-compressed file
51pub fn iter_lines_from_file<'a, Line>(
52    path: &Path,
53    mut pl: impl ProgressLog + 'a,
54) -> impl Iterator<Item = Line> + 'a
55where
56    Line: TryFrom<Vec<u8>>,
57    <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
58{
59    std::io::BufReader::new(
60        zstd::stream::read::Decoder::new(
61            std::fs::File::open(path).unwrap_or_else(|e| {
62                panic!("Could not open {} for reading: {:?}", path.display(), e)
63            }),
64        )
65        .unwrap_or_else(|e| panic!("{} is not a ZSTD file: {:?}", path.display(), e)),
66    )
67    .byte_lines()
68    .map(move |line| {
69        pl.light_update();
70        line.unwrap_or_else(|line| panic!("Could not parse swhid {:?}", &line))
71            .try_into()
72            .unwrap_or_else(|line| panic!("Could not parse swhid {:?}", &line))
73    })
74}
75
76/// Yields textual swhids from a directory of newline-separated ZSTD-compressed files.
77///
78/// Files are read in alphabetical order of their name.
79pub fn iter_lines_from_dir<'a, Line>(
80    path: &'a Path,
81    pl: impl ConcurrentProgressLog + 'a,
82) -> impl Iterator<Item = Line> + 'a
83where
84    Line: TryFrom<Vec<u8>>,
85    <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
86{
87    let mut file_paths: Vec<_> = std::fs::read_dir(path)
88        .unwrap_or_else(|e| panic!("Could not list {}: {:?}", path.display(), e))
89        .map(|entry| {
90            entry
91                .as_ref()
92                .unwrap_or_else(|e| panic!("Could not read {} entry: {:?}", path.display(), e))
93                .path()
94        })
95        .collect();
96    file_paths.sort();
97    file_paths
98        .into_iter()
99        .flat_map(move |file_path| iter_lines_from_file(&file_path, pl.clone()))
100}
101
102/// Yields textual swhids from a directory of newline-separated ZSTD-compressed files
103///
104/// Files are read in alphabetical order of their name.
105pub fn par_iter_lines_from_dir<'a, Line>(
106    path: &'a Path,
107    pl: impl ConcurrentProgressLog + 'a,
108) -> impl ParallelIterator<Item = Line> + 'a
109where
110    Line: TryFrom<Vec<u8>> + Send,
111    <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
112{
113    let mut file_paths: Vec<_> = std::fs::read_dir(path)
114        .unwrap_or_else(|e| panic!("Could not list {}: {:?}", path.display(), e))
115        .map(|entry| {
116            entry
117                .as_ref()
118                .unwrap_or_else(|e| panic!("Could not read {} entry: {:?}", path.display(), e))
119                .path()
120        })
121        .collect();
122    file_paths.sort();
123    file_paths
124        .into_par_iter()
125        .flat_map_iter(move |file_path| iter_lines_from_file(&file_path, pl.clone()))
126}