1use anyhow::{anyhow, Result};
2use chrono::{DateTime, Local};
3use globset::{Glob, GlobMatcher};
4use walkdir::WalkDir;
5
6use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
7use crate::sql::parser::ast::FileCTESpec;
8
/// Cap on the number of walked entries, used when the spec leaves `max_files` unset.
const DEFAULT_MAX_FILES: usize = 500_000;
10
11pub fn walk_filesystem(spec: &FileCTESpec, cte_name: &str) -> Result<DataTable> {
13 let root = std::fs::canonicalize(&spec.path)
15 .map_err(|e| anyhow!("FILE CTE: cannot resolve path '{}': {}", spec.path, e))?;
16
17 if !root.is_dir() {
18 return Err(anyhow!("FILE CTE: path '{}' is not a directory", spec.path));
19 }
20
21 let glob_matcher: Option<GlobMatcher> = match &spec.glob {
23 Some(pattern) => {
24 let g = Glob::new(pattern)
25 .map_err(|e| anyhow!("FILE CTE: invalid GLOB pattern '{}': {}", pattern, e))?;
26 Some(g.compile_matcher())
27 }
28 None => None,
29 };
30
31 let max_depth = if spec.recursive {
33 spec.max_depth } else {
35 Some(1) };
37
38 let mut walker = WalkDir::new(&root).follow_links(spec.follow_links);
39
40 if let Some(depth) = max_depth {
41 walker = walker.max_depth(depth);
42 }
43
44 let mut table = build_schema(cte_name);
46
47 let max_files = spec.max_files.unwrap_or(DEFAULT_MAX_FILES);
49 let mut file_count: usize = 0;
50 let mut permission_errors: usize = 0;
51
52 for entry_result in walker {
53 let entry = match entry_result {
54 Ok(e) => e,
55 Err(e) => {
56 permission_errors += 1;
57 tracing::debug!("FILE CTE: skipping entry: {}", e);
58 continue;
59 }
60 };
61
62 if !spec.include_hidden && is_hidden(&entry) {
64 continue;
65 }
66
67 if let Some(ref matcher) = glob_matcher {
69 if !entry.file_type().is_dir()
70 && !matcher.is_match(entry.file_name().to_string_lossy().as_ref())
71 {
72 continue;
73 }
74 }
75
76 file_count += 1;
78 if file_count > max_files {
79 return Err(anyhow!(
80 "FILE CTE: exceeded MAX_FILES limit of {}. \
81 Use MAX_FILES <n> or GLOB to constrain the walk.",
82 max_files
83 ));
84 }
85
86 let row = build_row(&entry);
87 table
88 .add_row(row)
89 .map_err(|e| anyhow!("FILE CTE: failed to add row: {}", e))?;
90 }
91
92 if permission_errors > 0 {
93 tracing::warn!(
94 "FILE CTE: {} entries skipped due to permission errors",
95 permission_errors
96 );
97 }
98
99 Ok(table)
100}
101
102fn build_schema(cte_name: &str) -> DataTable {
103 let mut table = DataTable::new(cte_name);
104 table.add_column(DataColumn::new("path").with_type(DataType::String));
105 table.add_column(DataColumn::new("parent").with_type(DataType::String));
106 table.add_column(DataColumn::new("name").with_type(DataType::String));
107 table.add_column(DataColumn::new("stem").with_type(DataType::String));
108 table.add_column(
109 DataColumn::new("ext")
110 .with_type(DataType::String)
111 .with_nullable(true),
112 );
113 table.add_column(DataColumn::new("size").with_type(DataType::Integer));
114 table.add_column(
115 DataColumn::new("modified")
116 .with_type(DataType::DateTime)
117 .with_nullable(true),
118 );
119 table.add_column(
120 DataColumn::new("created")
121 .with_type(DataType::DateTime)
122 .with_nullable(true),
123 );
124 table.add_column(
125 DataColumn::new("accessed")
126 .with_type(DataType::DateTime)
127 .with_nullable(true),
128 );
129 table.add_column(DataColumn::new("is_dir").with_type(DataType::Boolean));
130 table.add_column(DataColumn::new("is_symlink").with_type(DataType::Boolean));
131 table.add_column(DataColumn::new("depth").with_type(DataType::Integer));
132 table
133}
134
135fn is_hidden(entry: &walkdir::DirEntry) -> bool {
136 if entry.depth() == 0 {
138 return false;
139 }
140 entry.file_name().to_string_lossy().starts_with('.')
141}
142
143fn build_row(entry: &walkdir::DirEntry) -> DataRow {
144 let path = entry.path();
145
146 let canonical = std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf());
147
148 let path_str = canonical.to_string_lossy().to_string();
149 let parent_str = canonical
150 .parent()
151 .map(|p| p.to_string_lossy().to_string())
152 .unwrap_or_default();
153 let name_str = canonical
154 .file_name()
155 .map(|n| n.to_string_lossy().to_string())
156 .unwrap_or_default();
157 let stem_str = canonical
158 .file_stem()
159 .map(|s| s.to_string_lossy().to_string())
160 .unwrap_or_default();
161 let ext_val = canonical
162 .extension()
163 .map(|e| DataValue::String(e.to_string_lossy().to_lowercase()))
164 .unwrap_or(DataValue::Null);
165
166 let metadata = entry.metadata().ok();
167
168 let size = metadata
169 .as_ref()
170 .map(|m| DataValue::Integer(m.len() as i64))
171 .unwrap_or(DataValue::Integer(0));
172
173 let modified = system_time_to_datavalue(metadata.as_ref().and_then(|m| m.modified().ok()));
174 let created = system_time_to_datavalue(metadata.as_ref().and_then(|m| m.created().ok()));
175 let accessed = system_time_to_datavalue(metadata.as_ref().and_then(|m| m.accessed().ok()));
176
177 let is_dir = DataValue::Boolean(entry.file_type().is_dir());
178 let is_symlink = DataValue::Boolean(entry.file_type().is_symlink());
179 let depth = DataValue::Integer(entry.depth() as i64);
180
181 DataRow::new(vec![
182 DataValue::String(path_str),
183 DataValue::String(parent_str),
184 DataValue::String(name_str),
185 DataValue::String(stem_str),
186 ext_val,
187 size,
188 modified,
189 created,
190 accessed,
191 is_dir,
192 is_symlink,
193 depth,
194 ])
195}
196
197fn system_time_to_datavalue(time: Option<std::time::SystemTime>) -> DataValue {
198 match time {
199 Some(t) => {
200 let dt: DateTime<Local> = t.into();
201 DataValue::DateTime(dt.to_rfc3339())
202 }
203 None => DataValue::Null,
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    // Positions of the row values inspected below, following the column order
    // declared in `build_schema`.
    const COL_NAME: usize = 2;
    const COL_EXT: usize = 4;
    const COL_SIZE: usize = 5;
    const COL_IS_DIR: usize = 9;
    const COL_IS_SYMLINK: usize = 10;
    const COL_DEPTH: usize = 11;

    /// Non-recursive spec for `path` with every optional setting left off.
    fn make_spec(path: &str) -> FileCTESpec {
        FileCTESpec {
            path: path.to_string(),
            recursive: false,
            glob: None,
            max_depth: None,
            max_files: None,
            follow_links: false,
            include_hidden: false,
        }
    }

    /// Default spec rooted at the given temporary directory.
    fn spec_for(tmp: &TempDir) -> FileCTESpec {
        make_spec(tmp.path().to_str().unwrap())
    }

    /// Builds a temporary directory holding `dirs` and then `files`, where each
    /// file is given as (relative path, contents).
    fn fixture(dirs: &[&str], files: &[(&str, &str)]) -> TempDir {
        let tmp = TempDir::new().unwrap();
        for &dir in dirs {
            fs::create_dir_all(tmp.path().join(dir)).unwrap();
        }
        for &(relative, contents) in files {
            fs::write(tmp.path().join(relative), contents).unwrap();
        }
        tmp
    }

    /// Three levels of nesting with one file per level.
    fn nested_tree() -> TempDir {
        fixture(
            &["sub/deep"],
            &[
                ("top.txt", ""),
                ("sub/nested.txt", ""),
                ("sub/deep/buried.txt", ""),
            ],
        )
    }

    /// One visible file and one dotfile at the top level.
    fn visible_and_hidden() -> TempDir {
        fixture(&[], &[("visible.txt", ""), (".hidden", "")])
    }

    /// Runs the walk under the table name used throughout these tests.
    fn walk(spec: &FileCTESpec) -> DataTable {
        walk_filesystem(spec, "files").unwrap()
    }

    #[test]
    fn test_basic_walk() {
        let tmp = fixture(&["subdir"], &[("a.txt", "hello"), ("b.csv", "1,2,3")]);

        let table = walk(&spec_for(&tmp));

        // root + a.txt + b.csv + subdir
        assert_eq!(table.row_count(), 4);
        assert_eq!(table.columns.len(), 12);
    }

    #[test]
    fn test_non_recursive_excludes_nested() {
        let tmp = nested_tree();

        let table = walk(&spec_for(&tmp));

        // root + top.txt + sub; nothing below depth 1
        assert_eq!(table.row_count(), 3);
    }

    #[test]
    fn test_recursive_walk() {
        let tmp = nested_tree();
        let spec = FileCTESpec {
            recursive: true,
            ..spec_for(&tmp)
        };

        let table = walk(&spec);

        // root + top.txt + sub + nested.txt + deep + buried.txt
        assert_eq!(table.row_count(), 6);
    }

    #[test]
    fn test_glob_filter() {
        let tmp = fixture(&[], &[("a.csv", ""), ("b.csv", ""), ("c.txt", "")]);
        let spec = FileCTESpec {
            glob: Some("*.csv".to_string()),
            ..spec_for(&tmp)
        };

        let table = walk(&spec);

        // root (directories bypass the glob) + a.csv + b.csv
        assert_eq!(table.row_count(), 3);
    }

    #[test]
    fn test_max_files_enforcement() {
        let tmp = TempDir::new().unwrap();
        (0..10).for_each(|i| {
            fs::write(tmp.path().join(format!("file_{i}.txt")), "").unwrap();
        });
        let spec = FileCTESpec {
            max_files: Some(5),
            ..spec_for(&tmp)
        };

        let outcome = walk_filesystem(&spec, "files");

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("MAX_FILES"));
    }

    #[test]
    fn test_hidden_files_excluded_by_default() {
        let tmp = visible_and_hidden();

        let table = walk(&spec_for(&tmp));

        // root + visible.txt
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_hidden_files_included() {
        let tmp = visible_and_hidden();
        let spec = FileCTESpec {
            include_hidden: true,
            ..spec_for(&tmp)
        };

        let table = walk(&spec);

        // root + visible.txt + .hidden
        assert_eq!(table.row_count(), 3);
    }

    #[test]
    fn test_max_depth() {
        let tmp = fixture(&["a/b/c"], &[("a/b/c/deep.txt", "")]);
        let spec = FileCTESpec {
            recursive: true,
            max_depth: Some(2),
            ..spec_for(&tmp)
        };

        let table = walk(&spec);

        // root (depth 0) + a (depth 1) + b (depth 2)
        assert_eq!(table.row_count(), 3);
    }

    #[test]
    fn test_invalid_path() {
        let spec = make_spec("/nonexistent/path/that/does/not/exist");

        assert!(walk_filesystem(&spec, "files").is_err());
    }

    #[test]
    fn test_column_values() {
        let tmp = fixture(&[], &[("test.csv", "hello world")]);

        let table = walk(&spec_for(&tmp));

        let wanted_name = DataValue::String("test.csv".to_string());
        let csv_row = table
            .rows
            .iter()
            .find(|row| row.values[COL_NAME] == wanted_name)
            .expect("should find test.csv row");

        assert_eq!(csv_row.values[COL_EXT], DataValue::String("csv".to_string()));
        // "hello world" is 11 bytes
        assert_eq!(csv_row.values[COL_SIZE], DataValue::Integer(11));
        assert_eq!(csv_row.values[COL_IS_DIR], DataValue::Boolean(false));
        assert_eq!(csv_row.values[COL_IS_SYMLINK], DataValue::Boolean(false));
        // direct child of the walk root
        assert_eq!(csv_row.values[COL_DEPTH], DataValue::Integer(1));
    }
}