rs_names2stats2arrow_ipc_stream/
lib.rs1use std::fs::Metadata;
2use std::io;
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use io::BufRead;
7
8use std::path::Path;
9
10use arrow::datatypes::DataType;
11use arrow::datatypes::Field;
12use arrow::datatypes::Schema;
13use arrow::datatypes::SchemaRef;
14use arrow::datatypes::TimeUnit;
15
16use arrow::array::Array;
17use arrow::array::BooleanBuilder;
18use arrow::array::StringBuilder;
19use arrow::array::TimestampSecondBuilder;
20use arrow::array::UInt32Builder;
21use arrow::array::UInt64Builder;
22
23use arrow::record_batch::RecordBatch;
24
/// Looks up filesystem metadata for the given path.
///
/// This is a thin wrapper over [`std::fs::metadata`]; whatever error that call
/// reports is handed back to the caller unchanged.
pub fn path2meta<P>(p: P) -> Result<Metadata, io::Error>
where
    P: AsRef<Path>,
{
    let target: &Path = p.as_ref();
    std::fs::metadata(target)
}
31
/// Coarse classification of a filesystem entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// A directory.
    Dir,
    /// A regular file.
    File,
    /// A symbolic link.
    Symlink,
    /// An entry that is none of the above.
    Unspecified,
}

impl FileType {
    /// Returns the lowercase label for this variant.
    ///
    /// Every label is a string literal, so the result is `&'static str` and is
    /// not tied to the lifetime of the `FileType` value it was read from.
    pub fn name(&self) -> &'static str {
        match self {
            FileType::Dir => "dir",
            FileType::File => "file",
            FileType::Symlink => "symlink",
            FileType::Unspecified => "unspecified",
        }
    }
}
49
50pub struct FileMeta<'a>(pub &'a Metadata);
51
52impl<'a> FileMeta<'a> {
53 pub fn file_type(&self) -> FileType {
54 let ft = self.0.file_type();
55 if ft.is_dir() {
56 FileType::Dir
57 } else if ft.is_symlink() {
58 FileType::Symlink
59 } else if ft.is_file() {
60 FileType::File
61 } else {
62 FileType::Unspecified
63 }
64 }
65
66 pub fn accessed(&self) -> Result<SystemTime, io::Error> {
67 self.0.accessed()
68 }
69
70 pub fn read_only(&self) -> bool {
71 self.0.permissions().readonly()
72 }
73}
74
75#[cfg(unix)]
76impl<'a> FileMeta<'a> {
77 pub fn mode(&self) -> u32 {
78 std::os::unix::fs::MetadataExt::mode(self.0)
79 }
80 pub fn nlink(&self) -> u64 {
81 std::os::unix::fs::MetadataExt::nlink(self.0)
82 }
83 pub fn uid(&self) -> u32 {
84 std::os::unix::fs::MetadataExt::uid(self.0)
85 }
86 pub fn gid(&self) -> u32 {
87 std::os::unix::fs::MetadataExt::gid(self.0)
88 }
89}
90
/// Returns an iterator over the lines of standard input.
///
/// Standard input is locked once and the lock is owned by the returned
/// iterator; each item is one line or the I/O error hit while reading it.
pub fn stdin2lines() -> impl Iterator<Item = Result<String, io::Error>> {
    let locked = io::stdin().lock();
    locked.lines()
}
94
95pub fn schema() -> SchemaRef {
96 Schema::new(vec![
97 Field::new("path", DataType::Utf8, false),
98 Field::new("type", DataType::Utf8, false),
99 Field::new("read_only", DataType::Boolean, false),
100 Field::new("mode", DataType::UInt32, true),
101 Field::new("nlink", DataType::UInt64, true),
102 Field::new("len", DataType::UInt64, false),
103 Field::new("uid", DataType::UInt32, true),
104 Field::new("gid", DataType::UInt32, true),
105 Field::new("mtime", DataType::Timestamp(TimeUnit::Second, None), true),
106 ])
107 .into()
108}
109
110#[cfg(unix)]
111pub fn lines2batch<I>(
112 lines: &mut I,
113 schema: SchemaRef,
114 bldr: &mut Builder,
115) -> Result<Option<RecordBatch>, io::Error>
116where
117 I: Iterator<Item = Result<String, io::Error>>,
118{
119 for rline in lines {
120 let line: String = rline?;
121 let meta: Metadata = path2meta(&line)?;
122 let fmet = FileMeta(&meta);
123 bldr.append_path(line);
124 bldr.append_type(fmet.file_type().name());
125 bldr.append_read_only(fmet.read_only());
126 bldr.append_mode(Some(fmet.mode()));
127 bldr.append_nlink(Some(fmet.nlink()));
128 bldr.append_len(meta.len());
129 bldr.append_uid(Some(fmet.uid()));
130 bldr.append_gid(Some(fmet.gid()));
131 let mtime_secs = meta
132 .modified()?
133 .duration_since(SystemTime::UNIX_EPOCH)
134 .ok()
135 .map(|d| d.as_secs() as i64);
136 bldr.append_mtime(mtime_secs);
137 }
138
139 if bldr.is_empty() {
140 return Ok(None);
141 }
142
143 let apath: Arc<dyn Array> = bldr.finish_path();
144 let atype: Arc<dyn Array> = bldr.finish_type();
145 let aread_only: Arc<dyn Array> = bldr.finish_read_only();
146 let amode: Arc<dyn Array> = bldr.finish_mode();
147 let anlink: Arc<dyn Array> = bldr.finish_nlink();
148 let alen: Arc<dyn Array> = bldr.finish_len();
149 let auid: Arc<dyn Array> = bldr.finish_uid();
150 let agid: Arc<dyn Array> = bldr.finish_gid();
151 let amtime: Arc<dyn Array> = bldr.finish_mtime();
152
153 RecordBatch::try_new(
154 schema,
155 vec![
156 apath, atype, aread_only, amode, anlink, alen, auid, agid, amtime,
157 ],
158 )
159 .map_err(io::Error::other)
160 .map(Some)
161}
162
163pub struct Builder {
164 pub path: StringBuilder,
165 pub file_type: StringBuilder,
166 pub read_only: BooleanBuilder,
167 pub mode: UInt32Builder,
168 pub nlink: UInt64Builder,
169 pub len: UInt64Builder,
170 pub uid: UInt32Builder,
171 pub gid: UInt32Builder,
172 pub mtime: TimestampSecondBuilder,
173}
174
175impl Builder {
176 pub fn append_path(&mut self, p: String) {
177 self.path.append_value(p)
178 }
179
180 pub fn append_type(&mut self, t: &str) {
181 self.file_type.append_value(t)
182 }
183 pub fn append_read_only(&mut self, r: bool) {
184 self.read_only.append_value(r)
185 }
186 pub fn append_mode(&mut self, m: Option<u32>) {
187 self.mode.append_option(m)
188 }
189 pub fn append_nlink(&mut self, l: Option<u64>) {
190 self.nlink.append_option(l)
191 }
192 pub fn append_len(&mut self, l: u64) {
193 self.len.append_value(l)
194 }
195 pub fn append_uid(&mut self, l: Option<u32>) {
196 self.uid.append_option(l)
197 }
198 pub fn append_gid(&mut self, l: Option<u32>) {
199 self.gid.append_option(l)
200 }
201 pub fn append_mtime(&mut self, t: Option<i64>) {
202 self.mtime.append_option(t)
203 }
204}
205
206impl Builder {
207 pub fn is_empty(&self) -> bool {
208 self.path.values_slice().is_empty()
209 }
210}
211
212impl Builder {
213 pub fn finish_path(&mut self) -> Arc<dyn Array> {
214 Arc::new(self.path.finish())
215 }
216
217 pub fn finish_type(&mut self) -> Arc<dyn Array> {
218 Arc::new(self.file_type.finish())
219 }
220 pub fn finish_read_only(&mut self) -> Arc<dyn Array> {
221 Arc::new(self.read_only.finish())
222 }
223 pub fn finish_mode(&mut self) -> Arc<dyn Array> {
224 Arc::new(self.mode.finish())
225 }
226 pub fn finish_nlink(&mut self) -> Arc<dyn Array> {
227 Arc::new(self.nlink.finish())
228 }
229 pub fn finish_len(&mut self) -> Arc<dyn Array> {
230 Arc::new(self.len.finish())
231 }
232 pub fn finish_uid(&mut self) -> Arc<dyn Array> {
233 Arc::new(self.uid.finish())
234 }
235 pub fn finish_gid(&mut self) -> Arc<dyn Array> {
236 Arc::new(self.gid.finish())
237 }
238 pub fn finish_mtime(&mut self) -> Arc<dyn Array> {
239 Arc::new(self.mtime.finish())
240 }
241}
242
243#[cfg(unix)]
244pub fn lines2batch_iter<I>(
245 lines: I,
246 schema: SchemaRef,
247 batch_size: usize,
248) -> Result<impl Iterator<Item = Result<RecordBatch, io::Error>>, io::Error>
249where
250 I: Iterator<Item = Result<String, io::Error>>,
251{
252 Ok(Lines2BatchIter {
253 lines,
254 schema,
255 batch_size,
256 bldr: Builder {
257 path: StringBuilder::new(),
258 file_type: StringBuilder::new(),
259 read_only: BooleanBuilder::new(),
260 mode: UInt32Builder::new(),
261 nlink: UInt64Builder::new(),
262 len: UInt64Builder::new(),
263 uid: UInt32Builder::new(),
264 gid: UInt32Builder::new(),
265 mtime: TimestampSecondBuilder::new(),
266 },
267 })
268}
269
270#[cfg(unix)]
271struct Lines2BatchIter<I> {
272 lines: I,
273 schema: SchemaRef,
274 batch_size: usize,
275 bldr: Builder,
276}
277
278#[cfg(unix)]
279impl<I> Iterator for Lines2BatchIter<I>
280where
281 I: Iterator<Item = Result<String, io::Error>>,
282{
283 type Item = Result<RecordBatch, io::Error>;
284
285 fn next(&mut self) -> Option<Self::Item> {
286 let mut taken = (&mut self.lines).take(self.batch_size);
287
288 let robat = lines2batch(&mut taken, self.schema.clone(), &mut self.bldr);
289
290 match robat {
291 Err(e) => Some(Err(e)),
292 Ok(None) => None,
293 Ok(Some(r)) => Some(Ok(r)),
294 }
295 }
296}