1pub use arrow;
2pub use futures;
3
4use std::fs::Metadata;
5use std::io;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::SystemTime;
9
10use futures::Stream;
11use futures::StreamExt;
12
13use futures_util::pin_mut;
14
15use async_stream::try_stream;
16
17use arrow::datatypes::DataType;
18use arrow::datatypes::Field;
19use arrow::datatypes::Schema;
20use arrow::datatypes::SchemaRef;
21use arrow::datatypes::TimeUnit;
22
23use arrow::array::{BooleanBuilder, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder};
24
25use arrow::record_batch::RecordBatch;
26
27pub struct BasicDirentInfo {
28 pub filepath: String,
29 pub meta: BasicMetadata,
30}
31
/// Owned snapshot of the commonly used parts of [`std::fs::Metadata`].
///
/// Unlike `Metadata`, every field is plain data, so the value can be
/// cloned, compared and printed freely.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BasicMetadata {
    /// `true` when the entry is a directory.
    pub is_dir: bool,
    /// `true` when the entry is a regular file.
    pub is_file: bool,
    /// `true` when the metadata describes a symbolic link.
    pub is_symlink: bool,
    /// Size reported by the metadata, in bytes.
    pub len: u64,
    /// Textual rendering of the entry's permissions.
    pub permissions: String,
    /// Creation time, or `None` when it could not be obtained.
    pub created: Option<SystemTime>,
    /// Last modification time, or `None` when it could not be obtained.
    pub modified: Option<SystemTime>,
    /// Last access time, or `None` when it could not be obtained.
    pub accessed: Option<SystemTime>,
}
42
/// Converts a [`SystemTime`] into whole microseconds since the Unix epoch.
///
/// Returns `None` when `s` lies before the epoch, or when the microsecond
/// count does not fit into a `u64`. The latter case used to be silently
/// truncated by an `as` cast, producing a bogus timestamp.
pub fn systemtime2unixtime_micros(s: SystemTime) -> Option<u64> {
    let since_epoch = s.duration_since(SystemTime::UNIX_EPOCH).ok()?;
    // `as_micros` yields a `u128`; convert with a range check instead of wrapping.
    u64::try_from(since_epoch.as_micros()).ok()
}
48
49impl From<Metadata> for BasicMetadata {
50 fn from(m: Metadata) -> Self {
51 Self {
52 is_dir: m.is_dir(),
53 is_file: m.is_file(),
54 is_symlink: m.is_symlink(),
55 len: m.len(),
56 permissions: format!("{:#?}", m.permissions()),
57 created: m.created().ok(),
58 modified: m.modified().ok(),
59 accessed: m.accessed().ok(),
60 }
61 }
62}
63
64pub fn basic_dirent_info_schema() -> Schema {
65 Schema::new(vec![
66 Field::new("filepath", DataType::Utf8, false),
67 Field::new("is_dir", DataType::Boolean, false),
68 Field::new("is_file", DataType::Boolean, false),
69 Field::new("is_symlink", DataType::Boolean, false),
70 Field::new("len", DataType::UInt64, false),
71 Field::new("permissions", DataType::Utf8, false),
72 Field::new(
73 "created",
74 DataType::Timestamp(TimeUnit::Microsecond, None),
75 true,
76 ),
77 Field::new(
78 "modified",
79 DataType::Timestamp(TimeUnit::Microsecond, None),
80 true,
81 ),
82 Field::new(
83 "accessed",
84 DataType::Timestamp(TimeUnit::Microsecond, None),
85 true,
86 ),
87 ])
88}
89
/// Renders a path as an owned `String`, converting lossily.
///
/// Any part of the path that is not valid Unicode is substituted by
/// `to_string_lossy`.
pub fn path2string<P>(p: P) -> String
where
    P: AsRef<Path>,
{
    let path: &Path = p.as_ref();
    path.to_string_lossy().into_owned()
}
96
97pub async fn path2dinfo<P>(p: P) -> Result<BasicDirentInfo, io::Error>
98where
99 P: AsRef<Path>,
100{
101 let m: Metadata = tokio::fs::metadata(p.as_ref()).await?;
102 let meta: BasicMetadata = m.into();
103 Ok(BasicDirentInfo {
104 filepath: path2string(p),
105 meta,
106 })
107}
108
109pub async fn dinfo2batch<S>(dirents: S, schema: SchemaRef) -> Result<RecordBatch, io::Error>
110where
111 S: Stream<Item = Result<BasicDirentInfo, io::Error>>,
112{
113 pin_mut!(dirents);
114
115 let mut filepath_builder = StringBuilder::new();
116 let mut is_dir_builder = BooleanBuilder::new();
117 let mut is_file_builder = BooleanBuilder::new();
118 let mut is_symlink_builder = BooleanBuilder::new();
119 let mut len_builder = UInt64Builder::new();
120 let mut permissions_builder = StringBuilder::new();
121
122 let mut created_builder = TimestampMicrosecondBuilder::new();
123 let mut modified_builder = TimestampMicrosecondBuilder::new();
124 let mut accessed_builder = TimestampMicrosecondBuilder::new();
125
126 while let Some(rbdi) = dirents.next().await {
127 let bdi: BasicDirentInfo = rbdi?;
128 let BasicDirentInfo { filepath, meta } = bdi;
129
130 filepath_builder.append_value(&filepath);
131 is_dir_builder.append_value(meta.is_dir);
132 is_file_builder.append_value(meta.is_file);
133 is_symlink_builder.append_value(meta.is_symlink);
134 len_builder.append_value(meta.len);
135 permissions_builder.append_value(&meta.permissions);
136
137 created_builder.append_option(
138 meta.created
139 .and_then(systemtime2unixtime_micros)
140 .map(|t| t as i64),
141 );
142
143 modified_builder.append_option(
144 meta.modified
145 .and_then(systemtime2unixtime_micros)
146 .map(|t| t as i64),
147 );
148
149 accessed_builder.append_option(
150 meta.accessed
151 .and_then(systemtime2unixtime_micros)
152 .map(|t| t as i64),
153 );
154 }
155
156 let arrays: Vec<Arc<dyn arrow::array::Array>> = vec![
157 Arc::new(filepath_builder.finish()),
158 Arc::new(is_dir_builder.finish()),
159 Arc::new(is_file_builder.finish()),
160 Arc::new(is_symlink_builder.finish()),
161 Arc::new(len_builder.finish()),
162 Arc::new(permissions_builder.finish()),
163 Arc::new(created_builder.finish()),
164 Arc::new(modified_builder.finish()),
165 Arc::new(accessed_builder.finish()),
166 ];
167
168 RecordBatch::try_new(schema, arrays).map_err(io::Error::other)
169}
170
171pub async fn filenames2dirents<S>(
172 mut names: S,
173) -> impl Stream<Item = Result<BasicDirentInfo, io::Error>>
174where
175 S: Stream<Item = Result<String, io::Error>> + Unpin,
176{
177 try_stream! {
178 while let Some(rstr) = names.next().await {
179 let filename: String = rstr?;
180 let dinfo: BasicDirentInfo = path2dinfo(filename).await?;
181 yield dinfo;
182 }
183 }
184}
185
186pub async fn filenames2batch<S>(
188 names: S,
189 schema: Option<SchemaRef>,
190) -> Result<RecordBatch, io::Error>
191where
192 S: Stream<Item = Result<String, io::Error>> + Unpin,
193{
194 let schema = schema.unwrap_or_else(|| Arc::new(basic_dirent_info_schema()));
195 let dirents = filenames2dirents(names).await;
196 dinfo2batch(dirents, schema).await
197}
198
199pub fn stdin2filenames() -> impl Stream<Item = Result<String, io::Error>> {
201 let stdin = tokio::io::stdin();
202 let br = tokio::io::BufReader::new(stdin);
203 let lines = tokio::io::AsyncBufReadExt::lines(br);
204 tokio_stream::wrappers::LinesStream::new(lines)
205}