1use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::NaiveDateTime;
12use polars::prelude::*;
13
14use crate::convert::{
15 bars_to_dataframe, dataframe_to_bars, dataframe_to_ticks, ndt_to_date_string,
16 ticks_to_dataframe,
17};
18use crate::error::{DataError, Result};
19use crate::models::{Bar, BarQueryOpts, QueryOpts, StatRow, Tick};
20
/// File-system backed store for market data, persisted as parquet files
/// under a Hive-style partition layout:
/// `ticks/exchange=<ex>/symbol=<sym>/<date>.parquet` and
/// `bars/exchange=<ex>/symbol=<sym>/timeframe=<tf>/<date>.parquet`.
pub struct ParquetStore {
    // Root directory under which the `ticks/` and `bars/` trees live.
    root: PathBuf,
}
25
26impl ParquetStore {
27 pub fn open(root: impl AsRef<Path>) -> Result<Self> {
29 let root = root.as_ref().to_path_buf();
30 fs::create_dir_all(&root)?;
31 Ok(Self { root })
32 }
33
34 pub fn insert_ticks(&self, ticks: &[Tick]) -> Result<usize> {
39 if ticks.is_empty() {
40 return Ok(0);
41 }
42
43 let mut groups: HashMap<(String, String, String), Vec<&Tick>> = HashMap::new();
45 for tick in ticks {
46 let date = ndt_to_date_string(&tick.ts);
47 let key = (tick.exchange.clone(), tick.symbol.clone(), date);
48 groups.entry(key).or_default().push(tick);
49 }
50
51 let mut total_inserted = 0usize;
52
53 for ((exchange, symbol, date), group_ticks) in &groups {
54 let dir = self.tick_dir(exchange, symbol);
55 fs::create_dir_all(&dir)?;
56 let file_path = dir.join(format!("{date}.parquet"));
57
58 let owned: Vec<Tick> = group_ticks.iter().map(|t| (*t).clone()).collect();
59 let new_df = ticks_to_dataframe(&owned)?;
60
61 if file_path.exists() {
62 let existing_df = read_parquet_file(&file_path)?;
63 let existing_count = existing_df.height();
64 let combined = concat_and_dedup_ticks(existing_df, new_df)?;
65 total_inserted += combined.height().saturating_sub(existing_count);
66 write_parquet_file(&file_path, &mut combined.clone())?;
67 } else {
68 let deduped = dedup_ticks(new_df)?;
69 total_inserted += deduped.height();
70 write_parquet_file(&file_path, &mut deduped.clone())?;
71 }
72 }
73
74 Ok(total_inserted)
75 }
76
77 pub fn insert_bars(&self, bars: &[Bar]) -> Result<usize> {
80 if bars.is_empty() {
81 return Ok(0);
82 }
83
84 let mut groups: HashMap<(String, String, String, String), Vec<&Bar>> = HashMap::new();
86 for bar in bars {
87 let date = ndt_to_date_string(&bar.ts);
88 let key = (
89 bar.exchange.clone(),
90 bar.symbol.clone(),
91 bar.timeframe.as_str().to_string(),
92 date,
93 );
94 groups.entry(key).or_default().push(bar);
95 }
96
97 let mut total_inserted = 0usize;
98
99 for ((exchange, symbol, timeframe, date), group_bars) in &groups {
100 let dir = self.bar_dir(&exchange, &symbol, &timeframe);
101 fs::create_dir_all(&dir)?;
102 let file_path = dir.join(format!("{date}.parquet"));
103
104 let owned: Vec<Bar> = group_bars.iter().map(|b| (*b).clone()).collect();
105 let new_df = bars_to_dataframe(&owned)?;
106
107 if file_path.exists() {
108 let existing_df = read_parquet_file(&file_path)?;
109 let existing_count = existing_df.height();
110 let combined = concat_and_dedup_bars(existing_df, new_df)?;
111 total_inserted += combined.height().saturating_sub(existing_count);
112 write_parquet_file(&file_path, &mut combined.clone())?;
113 } else {
114 let deduped = dedup_bars(new_df)?;
115 total_inserted += deduped.height();
116 write_parquet_file(&file_path, &mut deduped.clone())?;
117 }
118 }
119
120 Ok(total_inserted)
121 }
122
123 pub fn query_ticks(&self, opts: &QueryOpts) -> Result<(Vec<Tick>, u64)> {
128 let dir = self.tick_dir(&opts.exchange, &opts.symbol);
129 if !dir.exists() {
130 return Ok((Vec::new(), 0));
131 }
132
133 let files = list_date_files(&dir, opts.from, opts.to)?;
134 if files.is_empty() {
135 return Ok((Vec::new(), 0));
136 }
137
138 let mut all_dfs: Vec<DataFrame> = Vec::new();
139 for file in &files {
140 let df = read_parquet_file(file)?;
141 all_dfs.push(df);
142 }
143 let mut combined = concat_dataframes(all_dfs)?;
144
145 combined = apply_ts_filter(combined, opts.from, opts.to)?;
147
148 combined = combined.sort(["ts"], SortMultipleOptions::default())?;
150
151 let total = combined.height() as u64;
152
153 combined = apply_pagination(combined, opts.limit, opts.tail, opts.descending)?;
155
156 let ticks = dataframe_to_ticks(&combined)?;
157 Ok((ticks, total))
158 }
159
160 pub fn query_bars(&self, opts: &BarQueryOpts) -> Result<(Vec<Bar>, u64)> {
163 let dir = self.bar_dir(&opts.exchange, &opts.symbol, &opts.timeframe);
164 if !dir.exists() {
165 return Ok((Vec::new(), 0));
166 }
167
168 let files = list_date_files(&dir, opts.from, opts.to)?;
169 if files.is_empty() {
170 return Ok((Vec::new(), 0));
171 }
172
173 let mut all_dfs: Vec<DataFrame> = Vec::new();
174 for file in &files {
175 let df = read_parquet_file(file)?;
176 all_dfs.push(df);
177 }
178 let mut combined = concat_dataframes(all_dfs)?;
179
180 combined = apply_ts_filter(combined, opts.from, opts.to)?;
182
183 combined = combined.sort(["ts"], SortMultipleOptions::default())?;
185
186 let total = combined.height() as u64;
187
188 combined = apply_pagination(combined, opts.limit, opts.tail, opts.descending)?;
190
191 let bars = dataframe_to_bars(&combined)?;
192 Ok((bars, total))
193 }
194
195 pub fn delete_ticks(
199 &self,
200 exchange: &str,
201 symbol: &str,
202 from: Option<NaiveDateTime>,
203 to: Option<NaiveDateTime>,
204 ) -> Result<usize> {
205 let dir = self.tick_dir(exchange, symbol);
206 if !dir.exists() {
207 return Ok(0);
208 }
209 delete_from_partition(&dir, from, to)
210 }
211
212 pub fn delete_bars(
214 &self,
215 exchange: &str,
216 symbol: &str,
217 timeframe: &str,
218 from: Option<NaiveDateTime>,
219 to: Option<NaiveDateTime>,
220 ) -> Result<usize> {
221 let dir = self.bar_dir(exchange, symbol, timeframe);
222 if !dir.exists() {
223 return Ok(0);
224 }
225 delete_from_partition(&dir, from, to)
226 }
227
228 pub fn delete_symbol(&self, exchange: &str, symbol: &str) -> Result<(usize, usize)> {
230 let tick_count = self.count_rows_in_dir(&self.tick_dir(exchange, symbol));
231 let bar_count = self.count_all_bars_for_symbol(exchange, symbol);
232
233 let tick_dir = self.tick_dir(exchange, symbol);
235 if tick_dir.exists() {
236 fs::remove_dir_all(&tick_dir)?;
237 }
238
239 let bar_sym_dir = self
241 .root
242 .join("bars")
243 .join(format!("exchange={exchange}"))
244 .join(format!("symbol={symbol}"));
245 if bar_sym_dir.exists() {
246 fs::remove_dir_all(&bar_sym_dir)?;
247 }
248
249 Ok((tick_count, bar_count))
250 }
251
252 pub fn delete_exchange(&self, exchange: &str) -> Result<(usize, usize)> {
254 let tick_ex_dir = self.root.join("ticks").join(format!("exchange={exchange}"));
255 let bar_ex_dir = self.root.join("bars").join(format!("exchange={exchange}"));
256
257 let tick_count = self.count_rows_recursive(&tick_ex_dir);
258 let bar_count = self.count_rows_recursive(&bar_ex_dir);
259
260 if tick_ex_dir.exists() {
261 fs::remove_dir_all(&tick_ex_dir)?;
262 }
263 if bar_ex_dir.exists() {
264 fs::remove_dir_all(&bar_ex_dir)?;
265 }
266
267 Ok((tick_count, bar_count))
268 }
269
270 pub fn stats(&self, exchange: Option<&str>, symbol: Option<&str>) -> Result<Vec<StatRow>> {
274 let mut rows = Vec::new();
275
276 self.collect_tick_stats(&mut rows, exchange, symbol)?;
278
279 self.collect_bar_stats(&mut rows, exchange, symbol)?;
281
282 rows.sort_by(|a, b| {
284 a.exchange
285 .cmp(&b.exchange)
286 .then(a.symbol.cmp(&b.symbol))
287 .then(a.data_type.cmp(&b.data_type))
288 });
289
290 Ok(rows)
291 }
292
293 pub fn total_size(&self) -> Option<u64> {
295 let mut total = 0u64;
296 for entry in walkdir(&self.root) {
297 if entry.extension().map_or(false, |e| e == "parquet") {
298 if let Ok(meta) = fs::metadata(&entry) {
299 total += meta.len();
300 }
301 }
302 }
303 if total == 0 { None } else { Some(total) }
304 }
305
306 fn tick_dir(&self, exchange: &str, symbol: &str) -> PathBuf {
310 self.root
311 .join("ticks")
312 .join(format!("exchange={exchange}"))
313 .join(format!("symbol={symbol}"))
314 }
315
316 fn bar_dir(&self, exchange: &str, symbol: &str, timeframe: &str) -> PathBuf {
318 self.root
319 .join("bars")
320 .join(format!("exchange={exchange}"))
321 .join(format!("symbol={symbol}"))
322 .join(format!("timeframe={timeframe}"))
323 }
324
325 fn count_rows_in_dir(&self, dir: &Path) -> usize {
327 if !dir.exists() {
328 return 0;
329 }
330 let mut count = 0;
331 if let Ok(entries) = fs::read_dir(dir) {
332 for entry in entries.flatten() {
333 let path = entry.path();
334 if path.extension().map_or(false, |e| e == "parquet") {
335 if let Ok(df) = read_parquet_file(&path) {
336 count += df.height();
337 }
338 }
339 }
340 }
341 count
342 }
343
344 fn count_rows_recursive(&self, dir: &Path) -> usize {
346 if !dir.exists() {
347 return 0;
348 }
349 let mut count = 0;
350 for path in walkdir(dir) {
351 if path.extension().map_or(false, |e| e == "parquet") {
352 if let Ok(df) = read_parquet_file(&path) {
353 count += df.height();
354 }
355 }
356 }
357 count
358 }
359
360 fn count_all_bars_for_symbol(&self, exchange: &str, symbol: &str) -> usize {
362 let bar_sym_dir = self
363 .root
364 .join("bars")
365 .join(format!("exchange={exchange}"))
366 .join(format!("symbol={symbol}"));
367 self.count_rows_recursive(&bar_sym_dir)
368 }
369
370 fn collect_tick_stats(
372 &self,
373 rows: &mut Vec<StatRow>,
374 exchange_filter: Option<&str>,
375 symbol_filter: Option<&str>,
376 ) -> Result<()> {
377 let ticks_dir = self.root.join("ticks");
378 if !ticks_dir.exists() {
379 return Ok(());
380 }
381
382 for (exchange, symbol, dir) in self.iter_exchange_symbol_dirs(&ticks_dir)? {
383 if let Some(ef) = exchange_filter {
384 if exchange != ef {
385 continue;
386 }
387 }
388 if let Some(sf) = symbol_filter {
389 if symbol != sf {
390 continue;
391 }
392 }
393
394 let (count, ts_min, ts_max) = self.aggregate_parquet_stats(&dir)?;
395 if count > 0 {
396 rows.push(StatRow {
397 exchange,
398 symbol,
399 data_type: "tick".to_string(),
400 count,
401 ts_min: ts_min.unwrap_or_default(),
402 ts_max: ts_max.unwrap_or_default(),
403 });
404 }
405 }
406
407 Ok(())
408 }
409
410 fn collect_bar_stats(
412 &self,
413 rows: &mut Vec<StatRow>,
414 exchange_filter: Option<&str>,
415 symbol_filter: Option<&str>,
416 ) -> Result<()> {
417 let bars_dir = self.root.join("bars");
418 if !bars_dir.exists() {
419 return Ok(());
420 }
421
422 for (exchange, symbol, timeframe, dir) in self.iter_exchange_symbol_tf_dirs(&bars_dir)? {
423 if let Some(ef) = exchange_filter {
424 if exchange != ef {
425 continue;
426 }
427 }
428 if let Some(sf) = symbol_filter {
429 if symbol != sf {
430 continue;
431 }
432 }
433
434 let (count, ts_min, ts_max) = self.aggregate_parquet_stats(&dir)?;
435 if count > 0 {
436 rows.push(StatRow {
437 exchange,
438 symbol,
439 data_type: format!("bar ({timeframe})"),
440 count,
441 ts_min: ts_min.unwrap_or_default(),
442 ts_max: ts_max.unwrap_or_default(),
443 });
444 }
445 }
446
447 Ok(())
448 }
449
450 fn iter_exchange_symbol_dirs(&self, base: &Path) -> Result<Vec<(String, String, PathBuf)>> {
452 let mut result = Vec::new();
453 if !base.exists() {
454 return Ok(result);
455 }
456
457 for ex_entry in fs::read_dir(base)?.flatten() {
458 let ex_path = ex_entry.path();
459 if !ex_path.is_dir() {
460 continue;
461 }
462 let exchange =
463 parse_partition_value(ex_path.file_name().unwrap().to_str().unwrap_or(""));
464 if exchange.is_empty() {
465 continue;
466 }
467
468 for sym_entry in fs::read_dir(&ex_path)?.flatten() {
469 let sym_path = sym_entry.path();
470 if !sym_path.is_dir() {
471 continue;
472 }
473 let symbol =
474 parse_partition_value(sym_path.file_name().unwrap().to_str().unwrap_or(""));
475 if symbol.is_empty() {
476 continue;
477 }
478 result.push((exchange.clone(), symbol, sym_path));
479 }
480 }
481
482 Ok(result)
483 }
484
485 fn iter_exchange_symbol_tf_dirs(
487 &self,
488 base: &Path,
489 ) -> Result<Vec<(String, String, String, PathBuf)>> {
490 let mut result = Vec::new();
491 if !base.exists() {
492 return Ok(result);
493 }
494
495 for ex_entry in fs::read_dir(base)?.flatten() {
496 let ex_path = ex_entry.path();
497 if !ex_path.is_dir() {
498 continue;
499 }
500 let exchange =
501 parse_partition_value(ex_path.file_name().unwrap().to_str().unwrap_or(""));
502 if exchange.is_empty() {
503 continue;
504 }
505
506 for sym_entry in fs::read_dir(&ex_path)?.flatten() {
507 let sym_path = sym_entry.path();
508 if !sym_path.is_dir() {
509 continue;
510 }
511 let symbol =
512 parse_partition_value(sym_path.file_name().unwrap().to_str().unwrap_or(""));
513 if symbol.is_empty() {
514 continue;
515 }
516
517 for tf_entry in fs::read_dir(&sym_path)?.flatten() {
518 let tf_path = tf_entry.path();
519 if !tf_path.is_dir() {
520 continue;
521 }
522 let timeframe =
523 parse_partition_value(tf_path.file_name().unwrap().to_str().unwrap_or(""));
524 if timeframe.is_empty() {
525 continue;
526 }
527 result.push((exchange.clone(), symbol.clone(), timeframe, tf_path));
528 }
529 }
530 }
531
532 Ok(result)
533 }
534
535 fn aggregate_parquet_stats(
537 &self,
538 dir: &Path,
539 ) -> Result<(u64, Option<NaiveDateTime>, Option<NaiveDateTime>)> {
540 let mut total_count = 0u64;
541 let mut global_min: Option<i64> = None;
542 let mut global_max: Option<i64> = None;
543
544 if !dir.exists() {
545 return Ok((0, None, None));
546 }
547
548 for entry in fs::read_dir(dir)?.flatten() {
549 let path = entry.path();
550 if path.extension().map_or(false, |e| e == "parquet") {
551 let df = read_parquet_file(&path)?;
552 total_count += df.height() as u64;
553
554 if df.height() > 0 {
555 let ts_col = df.column("ts").ok().and_then(|c| c.datetime().ok());
556 if let Some(ts) = ts_col {
557 if let Some(min_val) = ts.min() {
558 global_min =
559 Some(global_min.map_or(min_val, |cur: i64| cur.min(min_val)));
560 }
561 if let Some(max_val) = ts.max() {
562 global_max =
563 Some(global_max.map_or(max_val, |cur: i64| cur.max(max_val)));
564 }
565 }
566 }
567 }
568 }
569
570 let ts_min = global_min.map(micros_to_ndt);
571 let ts_max = global_max.map(micros_to_ndt);
572
573 Ok((total_count, ts_min, ts_max))
574 }
575}
576
/// Extracts the value from a Hive-style partition directory name such as
/// `exchange=binance`. Returns an empty string when the name contains no
/// `=` separator. The split happens at the FIRST `=`, so values may
/// themselves contain `=`.
fn parse_partition_value(dir_name: &str) -> String {
    match dir_name.split_once('=') {
        Some((_, value)) => value.to_string(),
        None => String::new(),
    }
}
586
587fn list_date_files(
589 dir: &Path,
590 from: Option<NaiveDateTime>,
591 to: Option<NaiveDateTime>,
592) -> Result<Vec<PathBuf>> {
593 let mut files = Vec::new();
594 let from_date = from.map(|d| d.format("%Y-%m-%d").to_string());
595 let to_date = to.map(|d| d.format("%Y-%m-%d").to_string());
596
597 for entry in fs::read_dir(dir)?.flatten() {
598 let path = entry.path();
599 if path.extension().map_or(false, |e| e == "parquet") {
600 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
601
602 let dominated_by_from = from_date.as_ref().map_or(false, |fd| stem < fd.as_str());
604 let past_to = to_date.as_ref().map_or(false, |td| stem > td.as_str());
605
606 if !dominated_by_from && !past_to {
607 files.push(path);
608 }
609 }
610 }
611
612 files.sort();
613 Ok(files)
614}
615
616fn read_parquet_file(path: &Path) -> Result<DataFrame> {
618 let file = std::fs::File::open(path)?;
619 let df = ParquetReader::new(file).finish()?;
620 Ok(df)
621}
622
623fn write_parquet_file(path: &Path, df: &mut DataFrame) -> Result<()> {
625 let file = std::fs::File::create(path)?;
626 ParquetWriter::new(file)
627 .with_compression(ParquetCompression::Zstd(None))
628 .finish(df)?;
629 Ok(())
630}
631
632fn concat_and_dedup_ticks(existing: DataFrame, new: DataFrame) -> Result<DataFrame> {
634 let combined = concat_dataframes(vec![existing, new])?;
635 dedup_ticks(combined)
636}
637
638fn dedup_ticks(df: DataFrame) -> Result<DataFrame> {
640 let cols: Vec<String> = vec!["exchange".into(), "symbol".into(), "ts".into()];
641 let deduped = df
642 .unique_stable(Some(&cols), UniqueKeepStrategy::First, None)?
643 .sort(["ts"], SortMultipleOptions::default())?;
644 Ok(deduped)
645}
646
647fn concat_and_dedup_bars(existing: DataFrame, new: DataFrame) -> Result<DataFrame> {
649 let combined = concat_dataframes(vec![existing, new])?;
650 dedup_bars(combined)
651}
652
653fn dedup_bars(df: DataFrame) -> Result<DataFrame> {
655 let cols: Vec<String> = vec![
656 "exchange".into(),
657 "symbol".into(),
658 "timeframe".into(),
659 "ts".into(),
660 ];
661 let deduped = df
662 .unique_stable(Some(&cols), UniqueKeepStrategy::First, None)?
663 .sort(["ts"], SortMultipleOptions::default())?;
664 Ok(deduped)
665}
666
667fn concat_dataframes(dfs: Vec<DataFrame>) -> Result<DataFrame> {
669 if dfs.is_empty() {
670 return Err(DataError::Other("no dataframes to concat".into()));
671 }
672 if dfs.len() == 1 {
673 return Ok(dfs.into_iter().next().unwrap());
674 }
675 let lazy_frames: Vec<LazyFrame> = dfs.into_iter().map(|df| df.lazy()).collect();
676 let combined = polars::prelude::concat(lazy_frames, Default::default())?.collect()?;
677 Ok(combined)
678}
679
680fn apply_ts_filter(
682 df: DataFrame,
683 from: Option<NaiveDateTime>,
684 to: Option<NaiveDateTime>,
685) -> Result<DataFrame> {
686 if from.is_none() && to.is_none() {
687 return Ok(df);
688 }
689
690 let mut lf = df.lazy();
691
692 if let Some(f) = from {
693 let from_micros = f.and_utc().timestamp_micros();
694 lf = lf.filter(
695 col("ts")
696 .gt_eq(lit(from_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None))),
697 );
698 }
699 if let Some(t) = to {
700 let to_micros = t.and_utc().timestamp_micros();
701 lf = lf.filter(
702 col("ts").lt_eq(lit(to_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None))),
703 );
704 }
705
706 Ok(lf.collect()?)
707}
708
709fn apply_pagination(
711 df: DataFrame,
712 limit: usize,
713 tail: bool,
714 descending: bool,
715) -> Result<DataFrame> {
716 let result = if tail {
717 let n = limit.min(df.height());
719 let tailed = df.tail(Some(n));
720 if descending {
721 tailed.sort(
722 ["ts"],
723 SortMultipleOptions::default().with_order_descending(true),
724 )?
725 } else {
726 tailed
727 }
728 } else if descending {
729 let sorted = df.sort(
731 ["ts"],
732 SortMultipleOptions::default().with_order_descending(true),
733 )?;
734 sorted.head(Some(limit))
735 } else {
736 df.head(Some(limit))
737 };
738 Ok(result)
739}
740
741fn delete_from_partition(
743 dir: &Path,
744 from: Option<NaiveDateTime>,
745 to: Option<NaiveDateTime>,
746) -> Result<usize> {
747 if from.is_none() && to.is_none() {
748 let count = count_all_rows_in_dir(dir);
750 for entry in fs::read_dir(dir)?.flatten() {
752 let path = entry.path();
753 if path.extension().map_or(false, |e| e == "parquet") {
754 fs::remove_file(&path)?;
755 }
756 }
757 return Ok(count);
758 }
759
760 let files = list_date_files(dir, from, to)?;
761 let mut total_deleted = 0usize;
762
763 for file_path in &files {
764 let df = read_parquet_file(file_path)?;
765 let original_count = df.height();
766
767 let filtered = apply_ts_filter_inverted(df, from, to)?;
769
770 if filtered.height() == 0 {
771 fs::remove_file(file_path)?;
773 total_deleted += original_count;
774 } else if filtered.height() < original_count {
775 total_deleted += original_count - filtered.height();
777 write_parquet_file(file_path, &mut filtered.clone())?;
778 }
779 }
781
782 Ok(total_deleted)
783}
784
/// Inverse of `apply_ts_filter`: keeps only the rows whose `ts` lies
/// strictly OUTSIDE the closed window `[from, to]` — i.e. the survivors of
/// a delete over that window. With both bounds `None` the frame passes
/// through unchanged.
fn apply_ts_filter_inverted(
    df: DataFrame,
    from: Option<NaiveDateTime>,
    to: Option<NaiveDateTime>,
) -> Result<DataFrame> {
    let mut lf = df.lazy();

    match (from, to) {
        (Some(f), Some(t)) => {
            // Both bounds: survivors are before `from` OR after `to`.
            // Literals are cast to the microsecond datetime dtype of `ts`.
            let from_micros = f.and_utc().timestamp_micros();
            let to_micros = t.and_utc().timestamp_micros();
            let from_lit = lit(from_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
            let to_lit = lit(to_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
            lf = lf.filter(col("ts").lt(from_lit).or(col("ts").gt(to_lit)));
        }
        (Some(f), None) => {
            // Open-ended delete from `from` onward: keep rows before it.
            let from_micros = f.and_utc().timestamp_micros();
            let from_lit = lit(from_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
            lf = lf.filter(col("ts").lt(from_lit));
        }
        (None, Some(t)) => {
            // Open-ended delete up to `to`: keep rows after it.
            let to_micros = t.and_utc().timestamp_micros();
            let to_lit = lit(to_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
            lf = lf.filter(col("ts").gt(to_lit));
        }
        // No bounds: nothing to filter out.
        (None, None) => {}
    }

    Ok(lf.collect()?)
}
817
818fn count_all_rows_in_dir(dir: &Path) -> usize {
820 let mut count = 0;
821 if let Ok(entries) = fs::read_dir(dir) {
822 for entry in entries.flatten() {
823 let path = entry.path();
824 if path.extension().map_or(false, |e| e == "parquet") {
825 if let Ok(df) = read_parquet_file(&path) {
826 count += df.height();
827 }
828 }
829 }
830 }
831 count
832}
833
/// Recursively collects every non-directory path under `dir`.
/// A missing or unreadable directory contributes nothing.
fn walkdir(dir: &Path) -> Vec<PathBuf> {
    let mut found = Vec::new();
    if let Ok(entries) = fs::read_dir(dir) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                // Depth-first descent into subdirectories.
                found.append(&mut walkdir(&path));
            } else {
                found.push(path);
            }
        }
    }
    found
}
852
853fn micros_to_ndt(micros: i64) -> NaiveDateTime {
855 let secs = micros / 1_000_000;
856 let nsecs = ((micros % 1_000_000) * 1_000) as u32;
857 chrono::DateTime::from_timestamp(secs, nsecs)
858 .map(|dt| dt.naive_utc())
859 .unwrap_or_default()
860}