swh_graph/compress/
zst_dir.rs1use std::io::BufRead;
11use std::path::Path;
12
13use dsi_progress_logger::{ConcurrentProgressLog, ProgressLog};
14use rayon::prelude::*;
15
/// Iterator over the lines of a buffered reader, yielded as raw bytes.
///
/// Unlike [`std::io::BufRead::lines`], lines are returned as `Vec<u8>` and are
/// never checked for UTF-8 validity.
pub struct ByteLines<B: BufRead> {
    /// Underlying reader the lines are pulled from.
    buf: B,
}

impl<B: BufRead> Iterator for ByteLines<B> {
    type Item = std::io::Result<Vec<u8>>;

    /// Reads up to and including the next `\n` and returns the line without
    /// its terminator.
    ///
    /// Returns `None` once the reader yields no more bytes, and `Some(Err(_))`
    /// when the underlying read fails.
    fn next(&mut self) -> Option<Self::Item> {
        let mut line = Vec::new();
        let bytes_read = match self.buf.read_until(b'\n', &mut line) {
            Ok(bytes_read) => bytes_read,
            Err(e) => return Some(Err(e)),
        };
        if bytes_read == 0 {
            // Nothing left in the reader.
            return None;
        }
        // A `\r` is only dropped when it is part of a `\r\n` terminator; a
        // final line ending in a bare `\r` keeps it.
        if line.ends_with(b"\n") {
            line.pop();
            if line.ends_with(b"\r") {
                line.pop();
            }
        }
        Some(Ok(line))
    }
}
41
42pub trait ToByteLines: std::io::BufRead + Sized {
43 fn byte_lines(self) -> ByteLines<Self> {
44 ByteLines { buf: self }
45 }
46}
47
48impl<B: std::io::BufRead> ToByteLines for B {}
49
50pub fn iter_lines_from_file<'a, Line>(
52 path: &Path,
53 mut pl: impl ProgressLog + 'a,
54) -> impl Iterator<Item = Line> + 'a
55where
56 Line: TryFrom<Vec<u8>>,
57 <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
58{
59 std::io::BufReader::new(
60 zstd::stream::read::Decoder::new(
61 std::fs::File::open(path).unwrap_or_else(|e| {
62 panic!("Could not open {} for reading: {:?}", path.display(), e)
63 }),
64 )
65 .unwrap_or_else(|e| panic!("{} is not a ZSTD file: {:?}", path.display(), e)),
66 )
67 .byte_lines()
68 .map(move |line| {
69 pl.light_update();
70 line.unwrap_or_else(|line| panic!("Could not parse swhid {:?}", &line))
71 .try_into()
72 .unwrap_or_else(|line| panic!("Could not parse swhid {:?}", &line))
73 })
74}
75
76pub fn iter_lines_from_dir<'a, Line>(
80 path: &'a Path,
81 pl: impl ConcurrentProgressLog + 'a,
82) -> impl Iterator<Item = Line> + 'a
83where
84 Line: TryFrom<Vec<u8>>,
85 <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
86{
87 let mut file_paths: Vec<_> = std::fs::read_dir(path)
88 .unwrap_or_else(|e| panic!("Could not list {}: {:?}", path.display(), e))
89 .map(|entry| {
90 entry
91 .as_ref()
92 .unwrap_or_else(|e| panic!("Could not read {} entry: {:?}", path.display(), e))
93 .path()
94 })
95 .collect();
96 file_paths.sort();
97 file_paths
98 .into_iter()
99 .flat_map(move |file_path| iter_lines_from_file(&file_path, pl.clone()))
100}
101
102pub fn par_iter_lines_from_dir<'a, Line>(
106 path: &'a Path,
107 pl: impl ConcurrentProgressLog + 'a,
108) -> impl ParallelIterator<Item = Line> + 'a
109where
110 Line: TryFrom<Vec<u8>> + Send,
111 <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
112{
113 let mut file_paths: Vec<_> = std::fs::read_dir(path)
114 .unwrap_or_else(|e| panic!("Could not list {}: {:?}", path.display(), e))
115 .map(|entry| {
116 entry
117 .as_ref()
118 .unwrap_or_else(|e| panic!("Could not read {} entry: {:?}", path.display(), e))
119 .path()
120 })
121 .collect();
122 file_paths.sort();
123 file_paths
124 .into_par_iter()
125 .flat_map_iter(move |file_path| iter_lines_from_file(&file_path, pl.clone()))
126}