rs_dirents2meta2rbat/
lib.rs

1pub use arrow;
2pub use futures;
3
4use std::fs::Metadata;
5use std::io;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::SystemTime;
9
10use futures::Stream;
11use futures::StreamExt;
12
13use futures_util::pin_mut;
14
15use async_stream::try_stream;
16
17use arrow::datatypes::DataType;
18use arrow::datatypes::Field;
19use arrow::datatypes::Schema;
20use arrow::datatypes::SchemaRef;
21use arrow::datatypes::TimeUnit;
22
23use arrow::array::{BooleanBuilder, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder};
24
25use arrow::record_batch::RecordBatch;
26
/// One filesystem entry: its path as a string plus a flattened snapshot of
/// its metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BasicDirentInfo {
    /// The entry's path, as a `String`.
    pub filepath: String,
    /// Selected metadata of the entry.
    pub meta: BasicMetadata,
}

/// A plain-data subset of [`std::fs::Metadata`] that can be stored in an
/// Arrow record batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BasicMetadata {
    /// `true` if the metadata describes a directory.
    pub is_dir: bool,
    /// `true` if the metadata describes a regular file.
    pub is_file: bool,
    /// `true` if the entry is a symbolic link.
    pub is_symlink: bool,
    /// Size in bytes, as reported by the metadata.
    pub len: u64,
    /// Textual (`Debug`) rendering of the entry's permissions.
    pub permissions: String,
    /// Creation time; `None` when it could not be obtained.
    pub created: Option<SystemTime>,
    /// Last modification time; `None` when it could not be obtained.
    pub modified: Option<SystemTime>,
    /// Last access time; `None` when it could not be obtained.
    pub accessed: Option<SystemTime>,
}
42
/// Converts a [`SystemTime`] into whole microseconds since the Unix epoch.
///
/// Returns `None` when `s` is earlier than [`SystemTime::UNIX_EPOCH`], or when
/// the microsecond count does not fit in a `u64`. The count is a `u128`
/// internally, and an `as u64` cast would silently truncate it to a wrong value.
pub fn systemtime2unixtime_micros(s: SystemTime) -> Option<u64> {
    let since_epoch = s.duration_since(SystemTime::UNIX_EPOCH).ok()?;
    u64::try_from(since_epoch.as_micros()).ok()
}
48
49impl From<Metadata> for BasicMetadata {
50    fn from(m: Metadata) -> Self {
51        Self {
52            is_dir: m.is_dir(),
53            is_file: m.is_file(),
54            is_symlink: m.is_symlink(),
55            len: m.len(),
56            permissions: format!("{:#?}", m.permissions()),
57            created: m.created().ok(),
58            modified: m.modified().ok(),
59            accessed: m.accessed().ok(),
60        }
61    }
62}
63
64pub fn basic_dirent_info_schema() -> Schema {
65    Schema::new(vec![
66        Field::new("filepath", DataType::Utf8, false),
67        Field::new("is_dir", DataType::Boolean, false),
68        Field::new("is_file", DataType::Boolean, false),
69        Field::new("is_symlink", DataType::Boolean, false),
70        Field::new("len", DataType::UInt64, false),
71        Field::new("permissions", DataType::Utf8, false),
72        Field::new(
73            "created",
74            DataType::Timestamp(TimeUnit::Microsecond, None),
75            true,
76        ),
77        Field::new(
78            "modified",
79            DataType::Timestamp(TimeUnit::Microsecond, None),
80            true,
81        ),
82        Field::new(
83            "accessed",
84            DataType::Timestamp(TimeUnit::Microsecond, None),
85            true,
86        ),
87    ])
88}
89
/// Converts a path into an owned `String`.
///
/// Any non-UTF-8 sequences are replaced with U+FFFD (lossy conversion).
pub fn path2string<P>(p: P) -> String
where
    P: AsRef<Path>,
{
    let path: &Path = p.as_ref();
    path.to_string_lossy().into_owned()
}
96
97pub async fn path2dinfo<P>(p: P) -> Result<BasicDirentInfo, io::Error>
98where
99    P: AsRef<Path>,
100{
101    let m: Metadata = tokio::fs::metadata(p.as_ref()).await?;
102    let meta: BasicMetadata = m.into();
103    Ok(BasicDirentInfo {
104        filepath: path2string(p),
105        meta,
106    })
107}
108
109pub async fn dinfo2batch<S>(dirents: S, schema: SchemaRef) -> Result<RecordBatch, io::Error>
110where
111    S: Stream<Item = Result<BasicDirentInfo, io::Error>>,
112{
113    pin_mut!(dirents);
114
115    let mut filepath_builder = StringBuilder::new();
116    let mut is_dir_builder = BooleanBuilder::new();
117    let mut is_file_builder = BooleanBuilder::new();
118    let mut is_symlink_builder = BooleanBuilder::new();
119    let mut len_builder = UInt64Builder::new();
120    let mut permissions_builder = StringBuilder::new();
121
122    let mut created_builder = TimestampMicrosecondBuilder::new();
123    let mut modified_builder = TimestampMicrosecondBuilder::new();
124    let mut accessed_builder = TimestampMicrosecondBuilder::new();
125
126    while let Some(rbdi) = dirents.next().await {
127        let bdi: BasicDirentInfo = rbdi?;
128        let BasicDirentInfo { filepath, meta } = bdi;
129
130        filepath_builder.append_value(&filepath);
131        is_dir_builder.append_value(meta.is_dir);
132        is_file_builder.append_value(meta.is_file);
133        is_symlink_builder.append_value(meta.is_symlink);
134        len_builder.append_value(meta.len);
135        permissions_builder.append_value(&meta.permissions);
136
137        created_builder.append_option(
138            meta.created
139                .and_then(systemtime2unixtime_micros)
140                .map(|t| t as i64),
141        );
142
143        modified_builder.append_option(
144            meta.modified
145                .and_then(systemtime2unixtime_micros)
146                .map(|t| t as i64),
147        );
148
149        accessed_builder.append_option(
150            meta.accessed
151                .and_then(systemtime2unixtime_micros)
152                .map(|t| t as i64),
153        );
154    }
155
156    let arrays: Vec<Arc<dyn arrow::array::Array>> = vec![
157        Arc::new(filepath_builder.finish()),
158        Arc::new(is_dir_builder.finish()),
159        Arc::new(is_file_builder.finish()),
160        Arc::new(is_symlink_builder.finish()),
161        Arc::new(len_builder.finish()),
162        Arc::new(permissions_builder.finish()),
163        Arc::new(created_builder.finish()),
164        Arc::new(modified_builder.finish()),
165        Arc::new(accessed_builder.finish()),
166    ];
167
168    RecordBatch::try_new(schema, arrays).map_err(io::Error::other)
169}
170
171pub async fn filenames2dirents<S>(
172    mut names: S,
173) -> impl Stream<Item = Result<BasicDirentInfo, io::Error>>
174where
175    S: Stream<Item = Result<String, io::Error>> + Unpin,
176{
177    try_stream! {
178        while let Some(rstr) = names.next().await {
179            let filename: String = rstr?;
180            let dinfo: BasicDirentInfo = path2dinfo(filename).await?;
181            yield dinfo;
182        }
183    }
184}
185
186/// Creates a record batch from the metadata values of the filenames.
187pub async fn filenames2batch<S>(
188    names: S,
189    schema: Option<SchemaRef>,
190) -> Result<RecordBatch, io::Error>
191where
192    S: Stream<Item = Result<String, io::Error>> + Unpin,
193{
194    let schema = schema.unwrap_or_else(|| Arc::new(basic_dirent_info_schema()));
195    let dirents = filenames2dirents(names).await;
196    dinfo2batch(dirents, schema).await
197}
198
199/// Gets filenames from stdin.
200pub fn stdin2filenames() -> impl Stream<Item = Result<String, io::Error>> {
201    let stdin = tokio::io::stdin();
202    let br = tokio::io::BufReader::new(stdin);
203    let lines = tokio::io::AsyncBufReadExt::lines(br);
204    tokio_stream::wrappers::LinesStream::new(lines)
205}