rs_names2stats2arrow_ipc_stream/
lib.rs

1use std::fs::Metadata;
2use std::io;
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use io::BufRead;
7
8use std::path::Path;
9
10use arrow::datatypes::DataType;
11use arrow::datatypes::Field;
12use arrow::datatypes::Schema;
13use arrow::datatypes::SchemaRef;
14use arrow::datatypes::TimeUnit;
15
16use arrow::array::Array;
17use arrow::array::BooleanBuilder;
18use arrow::array::StringBuilder;
19use arrow::array::TimestampSecondBuilder;
20use arrow::array::UInt32Builder;
21use arrow::array::UInt64Builder;
22
23use arrow::record_batch::RecordBatch;
24
/// Fetches filesystem metadata for `p`.
///
/// Thin wrapper over [`std::fs::metadata`], which traverses symbolic links:
/// the returned metadata describes the link target, not the link itself.
///
/// # Errors
///
/// Returns the error from [`std::fs::metadata`], e.g. when the path does not
/// exist or cannot be accessed.
pub fn path2meta<P>(p: P) -> Result<Metadata, io::Error>
where
    P: AsRef<Path>,
{
    let target: &Path = p.as_ref();
    std::fs::metadata(target)
}
31
/// Coarse classification of a filesystem entry; its [`FileType::name`] label
/// is what gets written to the `type` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// A directory.
    Dir,
    /// A regular file.
    File,
    /// A symbolic link.
    Symlink,
    /// Anything not classified as one of the variants above.
    Unspecified,
}

impl FileType {
    /// Returns the lowercase label for this variant.
    ///
    /// The labels are string literals, so they are returned as `&'static str`
    /// and may outlive `self` (e.g. when called on a temporary).
    pub fn name(&self) -> &'static str {
        match self {
            FileType::Dir => "dir",
            FileType::File => "file",
            FileType::Symlink => "symlink",
            FileType::Unspecified => "unspecified",
        }
    }
}
49
50pub struct FileMeta<'a>(pub &'a Metadata);
51
52impl<'a> FileMeta<'a> {
53    pub fn file_type(&self) -> FileType {
54        let ft = self.0.file_type();
55        if ft.is_dir() {
56            FileType::Dir
57        } else if ft.is_symlink() {
58            FileType::Symlink
59        } else if ft.is_file() {
60            FileType::File
61        } else {
62            FileType::Unspecified
63        }
64    }
65
66    pub fn accessed(&self) -> Result<SystemTime, io::Error> {
67        self.0.accessed()
68    }
69
70    pub fn read_only(&self) -> bool {
71        self.0.permissions().readonly()
72    }
73}
74
75#[cfg(unix)]
76impl<'a> FileMeta<'a> {
77    pub fn mode(&self) -> u32 {
78        std::os::unix::fs::MetadataExt::mode(self.0)
79    }
80    pub fn nlink(&self) -> u64 {
81        std::os::unix::fs::MetadataExt::nlink(self.0)
82    }
83    pub fn uid(&self) -> u32 {
84        std::os::unix::fs::MetadataExt::uid(self.0)
85    }
86    pub fn gid(&self) -> u32 {
87        std::os::unix::fs::MetadataExt::gid(self.0)
88    }
89}
90
/// Returns an iterator over the lines of standard input.
///
/// Stdin is locked once and the lock is held by the returned iterator. Each
/// item is a line without its trailing newline, or the I/O error encountered
/// while reading it.
pub fn stdin2lines() -> impl Iterator<Item = Result<String, io::Error>> {
    let handle = io::stdin();
    let locked = handle.lock();
    locked.lines()
}
94
95pub fn schema() -> SchemaRef {
96    Schema::new(vec![
97        Field::new("path", DataType::Utf8, false),
98        Field::new("type", DataType::Utf8, false),
99        Field::new("read_only", DataType::Boolean, false),
100        Field::new("mode", DataType::UInt32, true),
101        Field::new("nlink", DataType::UInt64, true),
102        Field::new("len", DataType::UInt64, false),
103        Field::new("uid", DataType::UInt32, true),
104        Field::new("gid", DataType::UInt32, true),
105        Field::new("mtime", DataType::Timestamp(TimeUnit::Second, None), true),
106    ])
107    .into()
108}
109
110#[cfg(unix)]
111pub fn lines2batch<I>(
112    lines: &mut I,
113    schema: SchemaRef,
114    bldr: &mut Builder,
115) -> Result<Option<RecordBatch>, io::Error>
116where
117    I: Iterator<Item = Result<String, io::Error>>,
118{
119    for rline in lines {
120        let line: String = rline?;
121        let meta: Metadata = path2meta(&line)?;
122        let fmet = FileMeta(&meta);
123        bldr.append_path(line);
124        bldr.append_type(fmet.file_type().name());
125        bldr.append_read_only(fmet.read_only());
126        bldr.append_mode(Some(fmet.mode()));
127        bldr.append_nlink(Some(fmet.nlink()));
128        bldr.append_len(meta.len());
129        bldr.append_uid(Some(fmet.uid()));
130        bldr.append_gid(Some(fmet.gid()));
131        let mtime_secs = meta
132            .modified()?
133            .duration_since(SystemTime::UNIX_EPOCH)
134            .ok()
135            .map(|d| d.as_secs() as i64);
136        bldr.append_mtime(mtime_secs);
137    }
138
139    if bldr.is_empty() {
140        return Ok(None);
141    }
142
143    let apath: Arc<dyn Array> = bldr.finish_path();
144    let atype: Arc<dyn Array> = bldr.finish_type();
145    let aread_only: Arc<dyn Array> = bldr.finish_read_only();
146    let amode: Arc<dyn Array> = bldr.finish_mode();
147    let anlink: Arc<dyn Array> = bldr.finish_nlink();
148    let alen: Arc<dyn Array> = bldr.finish_len();
149    let auid: Arc<dyn Array> = bldr.finish_uid();
150    let agid: Arc<dyn Array> = bldr.finish_gid();
151    let amtime: Arc<dyn Array> = bldr.finish_mtime();
152
153    RecordBatch::try_new(
154        schema,
155        vec![
156            apath, atype, aread_only, amode, anlink, alen, auid, agid, amtime,
157        ],
158    )
159    .map_err(io::Error::other)
160    .map(Some)
161}
162
163pub struct Builder {
164    pub path: StringBuilder,
165    pub file_type: StringBuilder,
166    pub read_only: BooleanBuilder,
167    pub mode: UInt32Builder,
168    pub nlink: UInt64Builder,
169    pub len: UInt64Builder,
170    pub uid: UInt32Builder,
171    pub gid: UInt32Builder,
172    pub mtime: TimestampSecondBuilder,
173}
174
175impl Builder {
176    pub fn append_path(&mut self, p: String) {
177        self.path.append_value(p)
178    }
179
180    pub fn append_type(&mut self, t: &str) {
181        self.file_type.append_value(t)
182    }
183    pub fn append_read_only(&mut self, r: bool) {
184        self.read_only.append_value(r)
185    }
186    pub fn append_mode(&mut self, m: Option<u32>) {
187        self.mode.append_option(m)
188    }
189    pub fn append_nlink(&mut self, l: Option<u64>) {
190        self.nlink.append_option(l)
191    }
192    pub fn append_len(&mut self, l: u64) {
193        self.len.append_value(l)
194    }
195    pub fn append_uid(&mut self, l: Option<u32>) {
196        self.uid.append_option(l)
197    }
198    pub fn append_gid(&mut self, l: Option<u32>) {
199        self.gid.append_option(l)
200    }
201    pub fn append_mtime(&mut self, t: Option<i64>) {
202        self.mtime.append_option(t)
203    }
204}
205
206impl Builder {
207    pub fn is_empty(&self) -> bool {
208        self.path.values_slice().is_empty()
209    }
210}
211
212impl Builder {
213    pub fn finish_path(&mut self) -> Arc<dyn Array> {
214        Arc::new(self.path.finish())
215    }
216
217    pub fn finish_type(&mut self) -> Arc<dyn Array> {
218        Arc::new(self.file_type.finish())
219    }
220    pub fn finish_read_only(&mut self) -> Arc<dyn Array> {
221        Arc::new(self.read_only.finish())
222    }
223    pub fn finish_mode(&mut self) -> Arc<dyn Array> {
224        Arc::new(self.mode.finish())
225    }
226    pub fn finish_nlink(&mut self) -> Arc<dyn Array> {
227        Arc::new(self.nlink.finish())
228    }
229    pub fn finish_len(&mut self) -> Arc<dyn Array> {
230        Arc::new(self.len.finish())
231    }
232    pub fn finish_uid(&mut self) -> Arc<dyn Array> {
233        Arc::new(self.uid.finish())
234    }
235    pub fn finish_gid(&mut self) -> Arc<dyn Array> {
236        Arc::new(self.gid.finish())
237    }
238    pub fn finish_mtime(&mut self) -> Arc<dyn Array> {
239        Arc::new(self.mtime.finish())
240    }
241}
242
243#[cfg(unix)]
244pub fn lines2batch_iter<I>(
245    lines: I,
246    schema: SchemaRef,
247    batch_size: usize,
248) -> Result<impl Iterator<Item = Result<RecordBatch, io::Error>>, io::Error>
249where
250    I: Iterator<Item = Result<String, io::Error>>,
251{
252    Ok(Lines2BatchIter {
253        lines,
254        schema,
255        batch_size,
256        bldr: Builder {
257            path: StringBuilder::new(),
258            file_type: StringBuilder::new(),
259            read_only: BooleanBuilder::new(),
260            mode: UInt32Builder::new(),
261            nlink: UInt64Builder::new(),
262            len: UInt64Builder::new(),
263            uid: UInt32Builder::new(),
264            gid: UInt32Builder::new(),
265            mtime: TimestampSecondBuilder::new(),
266        },
267    })
268}
269
270#[cfg(unix)]
271struct Lines2BatchIter<I> {
272    lines: I,
273    schema: SchemaRef,
274    batch_size: usize,
275    bldr: Builder,
276}
277
278#[cfg(unix)]
279impl<I> Iterator for Lines2BatchIter<I>
280where
281    I: Iterator<Item = Result<String, io::Error>>,
282{
283    type Item = Result<RecordBatch, io::Error>;
284
285    fn next(&mut self) -> Option<Self::Item> {
286        let mut taken = (&mut self.lines).take(self.batch_size);
287
288        let robat = lines2batch(&mut taken, self.schema.clone(), &mut self.bldr);
289
290        match robat {
291            Err(e) => Some(Err(e)),
292            Ok(None) => None,
293            Ok(Some(r)) => Some(Ok(r)),
294        }
295    }
296}